//! Admin-only crawler observability + control endpoints.
//!
//! Mounted under `/api/v1/admin/crawler*`, cookie-only via `RequireAdmin`.
//! All control endpoints return 503 when the crawler daemon is disabled
//! (`AppState.crawler == None`). Reads compose the live in-process status
//! ([`crate::crawler::status`]) with DB-derived queue counts and the
//! session/browser flags.

use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use axum::extract::{Query, State};
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::routing::{get, post};
use axum::{Json, Router};
use futures_util::stream::Stream;
use serde::{Deserialize, Serialize};
use serde_json::json;
use uuid::Uuid;

use crate::app::{AppState, CrawlerControl};
use crate::auth::extractor::RequireAdmin;
use crate::crawler::browser_manager::RestartPhase;
use crate::crawler::status::{LastPass, Phase, WorkerState};
use crate::error::{AppError, AppResult};
use crate::repo;
use crate::repo::crawler::{DeadJob, RequeueScope};

/// Backstop recompose interval for the SSE stream. Phase/worker/session
/// changes push instantly via the status `watch`; this only bounds the
/// staleness of DB-derived queue counts and the browser phase when those
/// change without an accompanying status poke.
const SSE_BACKSTOP: Duration = Duration::from_secs(5); pub fn routes() -> Router { Router::new() .route("/admin/crawler", get(get_status)) .route("/admin/crawler/stream", get(stream_status)) .route("/admin/crawler/run", post(run_now)) .route("/admin/crawler/browser/restart", post(restart_browser)) .route("/admin/crawler/session", post(update_session)) .route( "/admin/crawler/session/clear-expired", post(clear_session_expired), ) .route("/admin/crawler/dead-jobs", get(list_dead_jobs)) .route("/admin/crawler/dead-jobs/requeue", post(requeue_dead_jobs)) } // --------------------------------------------------------------------------- // GET /admin/crawler — live status // --------------------------------------------------------------------------- #[derive(Debug, Serialize)] struct QueueCounts { pending: i64, running: i64, dead: i64, } #[derive(Debug, Serialize)] struct SessionStatus { /// Whether the sticky session-expired flag is set (chapter workers idle). expired: bool, /// Whether a PHPSESSID is currently configured at all. configured: bool, } #[derive(Debug, Serialize)] struct CrawlerStatusResponse { /// `"running"` | `"disabled"`. daemon: &'static str, phase: Option, workers: Vec, last_pass: LastPass, session: SessionStatus, /// `"healthy"` | `"draining"` | `"restarting"` | `"down"`. browser: &'static str, queue: QueueCounts, } fn browser_phase_str(p: RestartPhase) -> &'static str { match p { RestartPhase::Healthy => "healthy", RestartPhase::Draining => "draining", RestartPhase::Restarting => "restarting", } } /// Compose a full status snapshot from the in-memory status, the /// browser/session flags, and a fresh DB queue-count query. Shared by the /// one-shot `get_status` and the SSE `stream_status`. 
async fn compose_status(state: &AppState) -> AppResult { let (pending, running, dead) = repo::crawler::job_state_counts(&state.db).await?; let queue = QueueCounts { pending, running, dead, }; Ok(match state.crawler.as_ref() { None => CrawlerStatusResponse { daemon: "disabled", phase: None, workers: Vec::new(), last_pass: LastPass::default(), session: SessionStatus { expired: false, configured: false, }, browser: "down", queue, }, Some(c) => { let snap = c.status.snapshot().await; CrawlerStatusResponse { daemon: "running", phase: Some(snap.phase), workers: snap.workers, last_pass: snap.last_pass, session: SessionStatus { expired: c.session.is_expired(), configured: c.session.current().await.is_some(), }, browser: browser_phase_str(c.browser_manager.phase()), queue, } } }) } async fn get_status( State(state): State, _admin: RequireAdmin, ) -> AppResult> { Ok(Json(compose_status(&state).await?)) } // --------------------------------------------------------------------------- // GET /admin/crawler/stream — Server-Sent Events live status // --------------------------------------------------------------------------- /// Push live status to the dashboard instead of polling. Emits a snapshot /// immediately on connect, then on every status change (instant, via the /// `watch` notifier) and on a [`SSE_BACKSTOP`] tick (to refresh DB queue /// counts / browser phase that change without a status poke). The browser /// opens this only while the crawler page is mounted and closes it on /// navigate-away, so the subscription is scoped to the active page. async fn stream_status( State(state): State, _admin: RequireAdmin, ) -> Sse>> { // Subscribe before the first emit so no change between the initial // snapshot and the first await is lost. 
let rx = state.crawler.as_ref().map(|c| c.status.subscribe()); let stream = futures_util::stream::unfold( (state, rx, true), |(state, mut rx, first)| async move { // After the first immediate emit, wait for a change or the // backstop tick before recomposing. if !first { match rx.as_mut() { Some(rx) => { tokio::select! { _ = rx.changed() => {} _ = tokio::time::sleep(SSE_BACKSTOP) => {} } } None => tokio::time::sleep(SSE_BACKSTOP).await, } } // Compose; on a transient DB error, emit a keep-alive comment // rather than tearing down the stream. let event = match compose_status(&state).await { Ok(resp) => Event::default() .event("status") .json_data(&resp) .unwrap_or_else(|_| Event::default().comment("serialize error")), Err(_) => Event::default().comment("status unavailable"), }; Some((Ok(event), (state, rx, false))) }, ); Sse::new(stream).keep_alive(KeepAlive::default()) } // --------------------------------------------------------------------------- // POST /admin/crawler/run — trigger an out-of-cycle metadata pass // --------------------------------------------------------------------------- #[derive(Debug, Serialize)] struct RunResponse { started: bool, } async fn run_now( State(state): State, admin: RequireAdmin, ) -> AppResult> { let c = require_crawler(&state)?; let mp = c.metadata_pass.as_ref().ok_or_else(|| { AppError::ServiceUnavailable("no source configured (CRAWLER_START_URL unset)".into()) })?; let mp = std::sync::Arc::clone(mp); // Fire-and-forget: the pass can run for minutes; the dashboard polls // status for progress. Overlap with the daily cron is rare (daily) and // both serialise on the single browser lease. 
tokio::spawn(async move { if let Err(e) = mp.run().await { tracing::warn!(error = ?e, "manual metadata pass failed"); } }); repo::admin_audit::insert(&state.db, admin.0.id, "crawler_run", "crawler", None, json!({})) .await?; Ok(Json(RunResponse { started: true })) } // --------------------------------------------------------------------------- // POST /admin/crawler/browser/restart — coordinated restart // --------------------------------------------------------------------------- #[derive(Debug, Serialize)] struct RestartResponse { ok: bool, error: Option, } async fn restart_browser( State(state): State, admin: RequireAdmin, ) -> AppResult> { let c = require_crawler(&state)?; let result = c.browser_manager.coordinated_restart(c.drain_deadline).await; // A successful coordinated_restart re-runs on_launch, which re-injects // PHPSESSID and re-probes — i.e. the session is live. Drop the sticky // `session_expired` flag so chapter workers stop idling without // requiring a second click on "Clear expired". if result.is_ok() { c.session.clear_expired(); } // Push the post-restart browser phase to live subscribers immediately. c.status.poke(); repo::admin_audit::insert( &state.db, admin.0.id, "crawler_browser_restart", "crawler", None, json!({ "ok": result.is_ok() }), ) .await?; Ok(Json(match result { Ok(()) => RestartResponse { ok: true, error: None, }, Err(e) => RestartResponse { ok: false, error: Some(format!("{e:#}")), }, })) } // --------------------------------------------------------------------------- // POST /admin/crawler/session — refresh PHPSESSID // --------------------------------------------------------------------------- #[derive(Debug, Deserialize)] struct UpdateSessionRequest { phpsessid: String, } #[derive(Debug, Serialize)] struct UpdateSessionResponse { /// Whether the post-update browser relaunch + session probe succeeded. 
valid: bool, error: Option, } async fn update_session( State(state): State, admin: RequireAdmin, Json(body): Json, ) -> AppResult> { let c = require_crawler(&state)?; c.session .update(&body.phpsessid) .await .map_err(|e| AppError::InvalidInput(format!("{e:#}")))?; // Relaunch the browser so on_launch re-injects the new cookie and // re-probes — the restart's success IS the session-validity signal. let probe = c.browser_manager.coordinated_restart(c.drain_deadline).await; // Session + browser state changed — push to live subscribers. c.status.poke(); repo::admin_audit::insert( &state.db, admin.0.id, "crawler_session_update", "crawler", None, json!({ "valid": probe.is_ok() }), ) .await?; Ok(Json(match probe { Ok(()) => UpdateSessionResponse { valid: true, error: None, }, Err(e) => UpdateSessionResponse { valid: false, error: Some(format!("{e:#}")), }, })) } #[derive(Debug, Serialize)] struct ClearExpiredResponse { cleared: bool, } async fn clear_session_expired( State(state): State, admin: RequireAdmin, ) -> AppResult> { let c = require_crawler(&state)?; c.session.clear_expired(); // session.expired flipped — push to live subscribers. 
c.status.poke(); repo::admin_audit::insert( &state.db, admin.0.id, "crawler_session_clear_expired", "crawler", None, json!({}), ) .await?; Ok(Json(ClearExpiredResponse { cleared: true })) } // --------------------------------------------------------------------------- // Dead jobs // --------------------------------------------------------------------------- #[derive(Debug, Deserialize, Default)] struct DeadJobsParams { #[serde(default)] search: Option, #[serde(default = "default_limit")] limit: i64, #[serde(default)] offset: i64, } fn default_limit() -> i64 { 50 } async fn list_dead_jobs( State(state): State, _admin: RequireAdmin, Query(params): Query, ) -> AppResult>> { let limit = params.limit.clamp(1, 200); let offset = params.offset.max(0); let search = params.search.filter(|s| !s.trim().is_empty()); let (items, total) = repo::crawler::list_dead_jobs(&state.db, search.as_deref(), limit, offset).await?; Ok(Json(crate::api::pagination::PagedResponse::with_total( items, limit, offset, total, ))) } #[derive(Debug, Deserialize)] #[serde(tag = "scope", rename_all = "snake_case")] enum RequeueRequest { All, Manga { manga_id: Uuid }, Chapter { chapter_id: Uuid }, Job { job_id: Uuid }, } #[derive(Debug, Serialize)] struct RequeueResponse { requeued: u64, } async fn requeue_dead_jobs( State(state): State, admin: RequireAdmin, Json(body): Json, ) -> AppResult> { let scope = match &body { RequeueRequest::All => RequeueScope::All, RequeueRequest::Manga { manga_id } => RequeueScope::Manga(*manga_id), RequeueRequest::Chapter { chapter_id } => RequeueScope::Chapter(*chapter_id), RequeueRequest::Job { job_id } => RequeueScope::Job(*job_id), }; let requeued = repo::crawler::requeue_dead_jobs(&state.db, scope).await?; repo::admin_audit::insert( &state.db, admin.0.id, "crawler_dead_jobs_requeue", "crawler", None, json!({ "requeued": requeued, "scope": scope_label(&body) }), ) .await?; Ok(Json(RequeueResponse { requeued })) } fn scope_label(r: &RequeueRequest) -> &'static str { 
match r { RequeueRequest::All => "all", RequeueRequest::Manga { .. } => "manga", RequeueRequest::Chapter { .. } => "chapter", RequeueRequest::Job { .. } => "job", } } // --------------------------------------------------------------------------- fn require_crawler(state: &AppState) -> Result<&std::sync::Arc, AppError> { state.crawler.as_ref().ok_or_else(|| { AppError::ServiceUnavailable("crawler daemon is disabled".into()) }) }