diff --git a/backend/src/api/admin/crawler.rs b/backend/src/api/admin/crawler.rs new file mode 100644 index 0000000..8529fbd --- /dev/null +++ b/backend/src/api/admin/crawler.rs @@ -0,0 +1,345 @@ +//! Admin-only crawler observability + control endpoints. +//! +//! Mounted under `/api/v1/admin/crawler*`, cookie-only via `RequireAdmin`. +//! All control endpoints return 503 when the crawler daemon is disabled +//! (`AppState.crawler == None`). Reads compose the live in-process status +//! ([`crate::crawler::status`]) with DB-derived queue counts and the +//! session/browser flags. + +use axum::extract::{Query, State}; +use axum::routing::{get, post}; +use axum::{Json, Router}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use uuid::Uuid; + +use crate::app::{AppState, CrawlerControl}; +use crate::auth::extractor::RequireAdmin; +use crate::crawler::browser_manager::RestartPhase; +use crate::crawler::status::{LastPass, Phase, WorkerState}; +use crate::error::{AppError, AppResult}; +use crate::repo; +use crate::repo::crawler::{DeadJob, RequeueScope}; + +pub fn routes() -> Router { + Router::new() + .route("/admin/crawler", get(get_status)) + .route("/admin/crawler/run", post(run_now)) + .route("/admin/crawler/browser/restart", post(restart_browser)) + .route("/admin/crawler/session", post(update_session)) + .route( + "/admin/crawler/session/clear-expired", + post(clear_session_expired), + ) + .route("/admin/crawler/dead-jobs", get(list_dead_jobs)) + .route("/admin/crawler/dead-jobs/requeue", post(requeue_dead_jobs)) +} + +// --------------------------------------------------------------------------- +// GET /admin/crawler — live status +// --------------------------------------------------------------------------- + +#[derive(Debug, Serialize)] +struct QueueCounts { + pending: i64, + running: i64, + dead: i64, +} + +#[derive(Debug, Serialize)] +struct SessionStatus { + /// Whether the sticky session-expired flag is set (chapter workers idle). + expired: bool, + /// Whether a PHPSESSID is currently configured at all. + configured: bool, +} + +#[derive(Debug, Serialize)] +struct CrawlerStatusResponse { + /// `"running"` | `"disabled"`. + daemon: &'static str, + phase: Option, + workers: Vec, + last_pass: LastPass, + session: SessionStatus, + /// `"healthy"` | `"draining"` | `"restarting"` | `"down"`. + browser: &'static str, + queue: QueueCounts, +} + +fn browser_phase_str(p: RestartPhase) -> &'static str { + match p { + RestartPhase::Healthy => "healthy", + RestartPhase::Draining => "draining", + RestartPhase::Restarting => "restarting", + } +} + +async fn get_status( + State(state): State, + _admin: RequireAdmin, +) -> AppResult> { + let (pending, running, dead) = repo::crawler::job_state_counts(&state.db).await?; + let queue = QueueCounts { + pending, + running, + dead, + }; + + let resp = match state.crawler.as_ref() { + None => CrawlerStatusResponse { + daemon: "disabled", + phase: None, + workers: Vec::new(), + last_pass: LastPass::default(), + session: SessionStatus { + expired: false, + configured: false, + }, + browser: "down", + queue, + }, + Some(c) => { + let snap = c.status.snapshot().await; + CrawlerStatusResponse { + daemon: "running", + phase: Some(snap.phase), + workers: snap.workers, + last_pass: snap.last_pass, + session: SessionStatus { + expired: c.session.is_expired(), + configured: c.session.current().await.is_some(), + }, + browser: browser_phase_str(c.browser_manager.phase()), + queue, + } + } + }; + Ok(Json(resp)) +} + +// --------------------------------------------------------------------------- +// POST /admin/crawler/run — trigger an out-of-cycle metadata pass +// --------------------------------------------------------------------------- + +#[derive(Debug, Serialize)] +struct RunResponse { + started: bool, +} + +async fn run_now( + State(state): State, + admin: RequireAdmin, +) -> AppResult> { + let c = require_crawler(&state)?; + let mp = c.metadata_pass.as_ref().ok_or_else(|| { + AppError::ServiceUnavailable("no source configured (CRAWLER_START_URL unset)".into()) + })?; + let mp = std::sync::Arc::clone(mp); + // Fire-and-forget: the pass can run for minutes; the dashboard polls + // status for progress. Overlap with the daily cron is rare (daily) and + // both serialise on the single browser lease. + tokio::spawn(async move { + if let Err(e) = mp.run().await { + tracing::warn!(error = ?e, "manual metadata pass failed"); + } + }); + repo::admin_audit::insert(&state.db, admin.0.id, "crawler_run", "crawler", None, json!({})) + .await?; + Ok(Json(RunResponse { started: true })) +} + +// --------------------------------------------------------------------------- +// POST /admin/crawler/browser/restart — coordinated restart +// --------------------------------------------------------------------------- + +#[derive(Debug, Serialize)] +struct RestartResponse { + ok: bool, + error: Option, +} + +async fn restart_browser( + State(state): State, + admin: RequireAdmin, +) -> AppResult> { + let c = require_crawler(&state)?; + let result = c.browser_manager.coordinated_restart(c.drain_deadline).await; + repo::admin_audit::insert( + &state.db, + admin.0.id, + "crawler_browser_restart", + "crawler", + None, + json!({ "ok": result.is_ok() }), + ) + .await?; + Ok(Json(match result { + Ok(()) => RestartResponse { + ok: true, + error: None, + }, + Err(e) => RestartResponse { + ok: false, + error: Some(format!("{e:#}")), + }, + })) +} + +// --------------------------------------------------------------------------- +// POST /admin/crawler/session — refresh PHPSESSID +// --------------------------------------------------------------------------- + +#[derive(Debug, Deserialize)] +struct UpdateSessionRequest { + phpsessid: String, +} + +#[derive(Debug, Serialize)] +struct UpdateSessionResponse { + /// Whether the post-update browser relaunch + session probe succeeded. + valid: bool, + error: Option, +} + +async fn update_session( + State(state): State, + admin: RequireAdmin, + Json(body): Json, +) -> AppResult> { + let c = require_crawler(&state)?; + c.session + .update(&body.phpsessid) + .await + .map_err(|e| AppError::InvalidInput(format!("{e:#}")))?; + // Relaunch the browser so on_launch re-injects the new cookie and + // re-probes — the restart's success IS the session-validity signal. + let probe = c.browser_manager.coordinated_restart(c.drain_deadline).await; + repo::admin_audit::insert( + &state.db, + admin.0.id, + "crawler_session_update", + "crawler", + None, + json!({ "valid": probe.is_ok() }), + ) + .await?; + Ok(Json(match probe { + Ok(()) => UpdateSessionResponse { + valid: true, + error: None, + }, + Err(e) => UpdateSessionResponse { + valid: false, + error: Some(format!("{e:#}")), + }, + })) +} + +#[derive(Debug, Serialize)] +struct ClearExpiredResponse { + cleared: bool, +} + +async fn clear_session_expired( + State(state): State, + admin: RequireAdmin, +) -> AppResult> { + let c = require_crawler(&state)?; + c.session.clear_expired(); + repo::admin_audit::insert( + &state.db, + admin.0.id, + "crawler_session_clear_expired", + "crawler", + None, + json!({}), + ) + .await?; + Ok(Json(ClearExpiredResponse { cleared: true })) +} + +// --------------------------------------------------------------------------- +// Dead jobs +// --------------------------------------------------------------------------- + +#[derive(Debug, Deserialize, Default)] +struct DeadJobsParams { + #[serde(default)] + search: Option, + #[serde(default = "default_limit")] + limit: i64, + #[serde(default)] + offset: i64, +} + +fn default_limit() -> i64 { + 50 +} + +async fn list_dead_jobs( + State(state): State, + _admin: RequireAdmin, + Query(params): Query, +) -> AppResult>> { + let limit = params.limit.clamp(1, 200); + let offset = params.offset.max(0); + let search = params.search.filter(|s| !s.trim().is_empty()); + let (items, total) = + repo::crawler::list_dead_jobs(&state.db, search.as_deref(), limit, offset).await?; + Ok(Json(crate::api::pagination::PagedResponse::with_total( + items, limit, offset, total, + ))) +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "scope", rename_all = "snake_case")] +enum RequeueRequest { + All, + Manga { manga_id: Uuid }, + Job { job_id: Uuid }, +} + +#[derive(Debug, Serialize)] +struct RequeueResponse { + requeued: u64, +} + +async fn requeue_dead_jobs( + State(state): State, + admin: RequireAdmin, + Json(body): Json, +) -> AppResult> { + let scope = match &body { + RequeueRequest::All => RequeueScope::All, + RequeueRequest::Manga { manga_id } => RequeueScope::Manga(*manga_id), + RequeueRequest::Job { job_id } => RequeueScope::Job(*job_id), + }; + let requeued = repo::crawler::requeue_dead_jobs(&state.db, scope).await?; + repo::admin_audit::insert( + &state.db, + admin.0.id, + "crawler_dead_jobs_requeue", + "crawler", + None, + json!({ "requeued": requeued, "scope": scope_label(&body) }), + ) + .await?; + Ok(Json(RequeueResponse { requeued })) +} + +fn scope_label(r: &RequeueRequest) -> &'static str { + match r { + RequeueRequest::All => "all", + RequeueRequest::Manga { .. } => "manga", + RequeueRequest::Job { .. } => "job", + } +} + +// --------------------------------------------------------------------------- + +fn require_crawler(state: &AppState) -> Result<&std::sync::Arc, AppError> { + state.crawler.as_ref().ok_or_else(|| { + AppError::ServiceUnavailable("crawler daemon is disabled".into()) + }) +} diff --git a/backend/src/api/admin/mod.rs b/backend/src/api/admin/mod.rs index 0154665..2048cf1 100644 --- a/backend/src/api/admin/mod.rs +++ b/backend/src/api/admin/mod.rs @@ -4,6 +4,7 @@ //! bot/API tokens cannot reach admin routes (see //! `crate::auth::extractor::RequireAdmin`). +pub mod crawler; pub mod mangas; pub mod resync; pub mod system; @@ -19,4 +20,5 @@ pub fn routes() -> Router { .merge(mangas::routes()) .merge(resync::routes()) .merge(system::routes()) + .merge(crawler::routes()) } diff --git a/backend/tests/api_admin_crawler.rs b/backend/tests/api_admin_crawler.rs new file mode 100644 index 0000000..e7dcd41 --- /dev/null +++ b/backend/tests/api_admin_crawler.rs @@ -0,0 +1,163 @@ +//! Integration tests for the admin crawler observability/control API. +//! +//! The default test harness wires `AppState.crawler = None` (no daemon), +//! so the *control* endpoints return 503 and the *read* endpoints that +//! work off the DB (status shell, dead-jobs list/requeue) still function. +//! This is exactly the production "daemon disabled" posture. + +mod common; + +use axum::http::StatusCode; +use axum::Router; +use serde_json::json; +use sqlx::PgPool; +use tower::ServiceExt; +use uuid::Uuid; + +use common::{body_json, get, get_with_cookie, post_json_with_cookie, register_user, harness}; + +async fn seed_admin(pool: &PgPool, app: &Router) -> String { + let (username, cookie) = register_user(app).await; + let u = mangalord::repo::user::find_by_username(pool, &username) + .await + .unwrap() + .unwrap(); + mangalord::repo::user::set_is_admin_unchecked(pool, u.id, true) + .await + .unwrap(); + cookie +} + +async fn seed_dead_job(pool: &PgPool, title: &str) -> Uuid { + let manga_id = Uuid::new_v4(); + let chapter_id = Uuid::new_v4(); + sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, $2)") + .bind(manga_id) + .bind(title) + .execute(pool) + .await + .unwrap(); + sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)") + .bind(chapter_id) + .bind(manga_id) + .execute(pool) + .await + .unwrap(); + let job_id = Uuid::new_v4(); + sqlx::query( + "INSERT INTO crawler_jobs (id, payload, state, attempts, last_error) \ + VALUES ($1, $2, 'dead', 5, 'boom')", + ) + .bind(job_id) + .bind(json!({ + "kind": "sync_chapter_content", + "source_id": "target", + "chapter_id": chapter_id, + "source_chapter_key": "k", + })) + .execute(pool) + .await + .unwrap(); + job_id +} + +#[sqlx::test(migrations = "./migrations")] +async fn get_status_requires_admin(pool: PgPool) { + let h = harness(pool); + // Unauthenticated → 401. + let resp = h.app.clone().oneshot(get("/api/v1/admin/crawler")).await.unwrap(); + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); + + // Authenticated non-admin → 403. + let (_u, cookie) = register_user(&h.app).await; + let resp = h + .app + .clone() + .oneshot(get_with_cookie("/api/v1/admin/crawler", &cookie)) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::FORBIDDEN); +} + +#[sqlx::test(migrations = "./migrations")] +async fn get_status_reports_disabled_daemon_with_queue_counts(pool: PgPool) { + seed_dead_job(&pool, "Naruto").await; + let h = harness(pool.clone()); + let cookie = seed_admin(&pool, &h.app).await; + + let resp = h + .app + .clone() + .oneshot(get_with_cookie("/api/v1/admin/crawler", &cookie)) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = body_json(resp).await; + assert_eq!(body["daemon"], "disabled"); + assert_eq!(body["queue"]["dead"], 1); + assert_eq!(body["browser"], "down"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn control_endpoints_return_503_when_daemon_disabled(pool: PgPool) { + let h = harness(pool.clone()); + let cookie = seed_admin(&pool, &h.app).await; + for uri in [ + "/api/v1/admin/crawler/run", + "/api/v1/admin/crawler/browser/restart", + "/api/v1/admin/crawler/session/clear-expired", + ] { + let resp = h + .app + .clone() + .oneshot(post_json_with_cookie(uri, json!({}), &cookie)) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::SERVICE_UNAVAILABLE, + "{uri} should be 503 when daemon disabled" + ); + } +} + +#[sqlx::test(migrations = "./migrations")] +async fn dead_jobs_list_and_requeue_over_http(pool: PgPool) { + let job_id = seed_dead_job(&pool, "Bleach").await; + let h = harness(pool.clone()); + let cookie = seed_admin(&pool, &h.app).await; + + // List. + let resp = h + .app + .clone() + .oneshot(get_with_cookie("/api/v1/admin/crawler/dead-jobs", &cookie)) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = body_json(resp).await; + assert_eq!(body["page"]["total"], 1); + assert_eq!(body["items"][0]["manga_title"], "Bleach"); + + // Requeue the single job. + let resp = h + .app + .clone() + .oneshot(post_json_with_cookie( + "/api/v1/admin/crawler/dead-jobs/requeue", + json!({ "scope": "job", "job_id": job_id }), + &cookie, + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = body_json(resp).await; + assert_eq!(body["requeued"], 1); + + let state: String = sqlx::query_scalar("SELECT state FROM crawler_jobs WHERE id = $1") + .bind(job_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(state, "pending"); +}