feat(api): admin crawler observability + control endpoints
GET  /admin/crawler                        - live status (phase, workers, last pass, session, browser, queue)
POST /admin/crawler/run                    - trigger an out-of-cycle metadata pass
POST /admin/crawler/browser/restart        - coordinated Chromium restart
POST /admin/crawler/session                - refresh PHPSESSID + re-probe
POST /admin/crawler/session/clear-expired  - clear the sticky expired flag
GET  /admin/crawler/dead-jobs              - paginated dead-letter list
POST /admin/crawler/dead-jobs/requeue      - requeue all / per-manga / single
All cookie-only via RequireAdmin; control endpoints 503 when the daemon is
disabled; mutations are audit-logged. Reads compose the live status with
DB-derived queue counts.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
345
backend/src/api/admin/crawler.rs
Normal file
345
backend/src/api/admin/crawler.rs
Normal file
@@ -0,0 +1,345 @@
|
||||
//! Admin-only crawler observability + control endpoints.
|
||||
//!
|
||||
//! Mounted under `/api/v1/admin/crawler*`, cookie-only via `RequireAdmin`.
|
||||
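//! Control endpoints that act on the live daemon (run, browser restart,
//! session update, clear-expired) return 503 when the crawler daemon is
//! disabled (`AppState.crawler == None`); the dead-job list/requeue
//! endpoints only touch the DB and keep working. The status read composes
//! the live in-process status ([`crate::crawler::status`]) with DB-derived
//! queue counts and the session/browser flags.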
|
||||
|
||||
use axum::extract::{Query, State};
|
||||
use axum::routing::{get, post};
|
||||
use axum::{Json, Router};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::app::{AppState, CrawlerControl};
|
||||
use crate::auth::extractor::RequireAdmin;
|
||||
use crate::crawler::browser_manager::RestartPhase;
|
||||
use crate::crawler::status::{LastPass, Phase, WorkerState};
|
||||
use crate::error::{AppError, AppResult};
|
||||
use crate::repo;
|
||||
use crate::repo::crawler::{DeadJob, RequeueScope};
|
||||
|
||||
pub fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/admin/crawler", get(get_status))
|
||||
.route("/admin/crawler/run", post(run_now))
|
||||
.route("/admin/crawler/browser/restart", post(restart_browser))
|
||||
.route("/admin/crawler/session", post(update_session))
|
||||
.route(
|
||||
"/admin/crawler/session/clear-expired",
|
||||
post(clear_session_expired),
|
||||
)
|
||||
.route("/admin/crawler/dead-jobs", get(list_dead_jobs))
|
||||
.route("/admin/crawler/dead-jobs/requeue", post(requeue_dead_jobs))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// GET /admin/crawler — live status
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct QueueCounts {
|
||||
pending: i64,
|
||||
running: i64,
|
||||
dead: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct SessionStatus {
|
||||
/// Whether the sticky session-expired flag is set (chapter workers idle).
|
||||
expired: bool,
|
||||
/// Whether a PHPSESSID is currently configured at all.
|
||||
configured: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct CrawlerStatusResponse {
|
||||
/// `"running"` | `"disabled"`.
|
||||
daemon: &'static str,
|
||||
phase: Option<Phase>,
|
||||
workers: Vec<WorkerState>,
|
||||
last_pass: LastPass,
|
||||
session: SessionStatus,
|
||||
/// `"healthy"` | `"draining"` | `"restarting"` | `"down"`.
|
||||
browser: &'static str,
|
||||
queue: QueueCounts,
|
||||
}
|
||||
|
||||
fn browser_phase_str(p: RestartPhase) -> &'static str {
|
||||
match p {
|
||||
RestartPhase::Healthy => "healthy",
|
||||
RestartPhase::Draining => "draining",
|
||||
RestartPhase::Restarting => "restarting",
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_status(
|
||||
State(state): State<AppState>,
|
||||
_admin: RequireAdmin,
|
||||
) -> AppResult<Json<CrawlerStatusResponse>> {
|
||||
let (pending, running, dead) = repo::crawler::job_state_counts(&state.db).await?;
|
||||
let queue = QueueCounts {
|
||||
pending,
|
||||
running,
|
||||
dead,
|
||||
};
|
||||
|
||||
let resp = match state.crawler.as_ref() {
|
||||
None => CrawlerStatusResponse {
|
||||
daemon: "disabled",
|
||||
phase: None,
|
||||
workers: Vec::new(),
|
||||
last_pass: LastPass::default(),
|
||||
session: SessionStatus {
|
||||
expired: false,
|
||||
configured: false,
|
||||
},
|
||||
browser: "down",
|
||||
queue,
|
||||
},
|
||||
Some(c) => {
|
||||
let snap = c.status.snapshot().await;
|
||||
CrawlerStatusResponse {
|
||||
daemon: "running",
|
||||
phase: Some(snap.phase),
|
||||
workers: snap.workers,
|
||||
last_pass: snap.last_pass,
|
||||
session: SessionStatus {
|
||||
expired: c.session.is_expired(),
|
||||
configured: c.session.current().await.is_some(),
|
||||
},
|
||||
browser: browser_phase_str(c.browser_manager.phase()),
|
||||
queue,
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(Json(resp))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// POST /admin/crawler/run — trigger an out-of-cycle metadata pass
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct RunResponse {
|
||||
started: bool,
|
||||
}
|
||||
|
||||
async fn run_now(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
) -> AppResult<Json<RunResponse>> {
|
||||
let c = require_crawler(&state)?;
|
||||
let mp = c.metadata_pass.as_ref().ok_or_else(|| {
|
||||
AppError::ServiceUnavailable("no source configured (CRAWLER_START_URL unset)".into())
|
||||
})?;
|
||||
let mp = std::sync::Arc::clone(mp);
|
||||
// Fire-and-forget: the pass can run for minutes; the dashboard polls
|
||||
// status for progress. Overlap with the daily cron is rare (daily) and
|
||||
// both serialise on the single browser lease.
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = mp.run().await {
|
||||
tracing::warn!(error = ?e, "manual metadata pass failed");
|
||||
}
|
||||
});
|
||||
repo::admin_audit::insert(&state.db, admin.0.id, "crawler_run", "crawler", None, json!({}))
|
||||
.await?;
|
||||
Ok(Json(RunResponse { started: true }))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// POST /admin/crawler/browser/restart — coordinated restart
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct RestartResponse {
|
||||
ok: bool,
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
async fn restart_browser(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
) -> AppResult<Json<RestartResponse>> {
|
||||
let c = require_crawler(&state)?;
|
||||
let result = c.browser_manager.coordinated_restart(c.drain_deadline).await;
|
||||
repo::admin_audit::insert(
|
||||
&state.db,
|
||||
admin.0.id,
|
||||
"crawler_browser_restart",
|
||||
"crawler",
|
||||
None,
|
||||
json!({ "ok": result.is_ok() }),
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(match result {
|
||||
Ok(()) => RestartResponse {
|
||||
ok: true,
|
||||
error: None,
|
||||
},
|
||||
Err(e) => RestartResponse {
|
||||
ok: false,
|
||||
error: Some(format!("{e:#}")),
|
||||
},
|
||||
}))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// POST /admin/crawler/session — refresh PHPSESSID
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct UpdateSessionRequest {
|
||||
phpsessid: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct UpdateSessionResponse {
|
||||
/// Whether the post-update browser relaunch + session probe succeeded.
|
||||
valid: bool,
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
async fn update_session(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
Json(body): Json<UpdateSessionRequest>,
|
||||
) -> AppResult<Json<UpdateSessionResponse>> {
|
||||
let c = require_crawler(&state)?;
|
||||
c.session
|
||||
.update(&body.phpsessid)
|
||||
.await
|
||||
.map_err(|e| AppError::InvalidInput(format!("{e:#}")))?;
|
||||
// Relaunch the browser so on_launch re-injects the new cookie and
|
||||
// re-probes — the restart's success IS the session-validity signal.
|
||||
let probe = c.browser_manager.coordinated_restart(c.drain_deadline).await;
|
||||
repo::admin_audit::insert(
|
||||
&state.db,
|
||||
admin.0.id,
|
||||
"crawler_session_update",
|
||||
"crawler",
|
||||
None,
|
||||
json!({ "valid": probe.is_ok() }),
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(match probe {
|
||||
Ok(()) => UpdateSessionResponse {
|
||||
valid: true,
|
||||
error: None,
|
||||
},
|
||||
Err(e) => UpdateSessionResponse {
|
||||
valid: false,
|
||||
error: Some(format!("{e:#}")),
|
||||
},
|
||||
}))
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct ClearExpiredResponse {
|
||||
cleared: bool,
|
||||
}
|
||||
|
||||
async fn clear_session_expired(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
) -> AppResult<Json<ClearExpiredResponse>> {
|
||||
let c = require_crawler(&state)?;
|
||||
c.session.clear_expired();
|
||||
repo::admin_audit::insert(
|
||||
&state.db,
|
||||
admin.0.id,
|
||||
"crawler_session_clear_expired",
|
||||
"crawler",
|
||||
None,
|
||||
json!({}),
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(ClearExpiredResponse { cleared: true }))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Dead jobs
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Deserialize, Default)]
|
||||
struct DeadJobsParams {
|
||||
#[serde(default)]
|
||||
search: Option<String>,
|
||||
#[serde(default = "default_limit")]
|
||||
limit: i64,
|
||||
#[serde(default)]
|
||||
offset: i64,
|
||||
}
|
||||
|
||||
/// Page size used when the dead-jobs query string omits `limit`.
const DEFAULT_DEAD_JOBS_LIMIT: i64 = 50;

/// Serde default provider for `DeadJobsParams::limit`.
fn default_limit() -> i64 {
    DEFAULT_DEAD_JOBS_LIMIT
}
|
||||
|
||||
async fn list_dead_jobs(
|
||||
State(state): State<AppState>,
|
||||
_admin: RequireAdmin,
|
||||
Query(params): Query<DeadJobsParams>,
|
||||
) -> AppResult<Json<crate::api::pagination::PagedResponse<DeadJob>>> {
|
||||
let limit = params.limit.clamp(1, 200);
|
||||
let offset = params.offset.max(0);
|
||||
let search = params.search.filter(|s| !s.trim().is_empty());
|
||||
let (items, total) =
|
||||
repo::crawler::list_dead_jobs(&state.db, search.as_deref(), limit, offset).await?;
|
||||
Ok(Json(crate::api::pagination::PagedResponse::with_total(
|
||||
items, limit, offset, total,
|
||||
)))
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(tag = "scope", rename_all = "snake_case")]
|
||||
enum RequeueRequest {
|
||||
All,
|
||||
Manga { manga_id: Uuid },
|
||||
Job { job_id: Uuid },
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct RequeueResponse {
|
||||
requeued: u64,
|
||||
}
|
||||
|
||||
async fn requeue_dead_jobs(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
Json(body): Json<RequeueRequest>,
|
||||
) -> AppResult<Json<RequeueResponse>> {
|
||||
let scope = match &body {
|
||||
RequeueRequest::All => RequeueScope::All,
|
||||
RequeueRequest::Manga { manga_id } => RequeueScope::Manga(*manga_id),
|
||||
RequeueRequest::Job { job_id } => RequeueScope::Job(*job_id),
|
||||
};
|
||||
let requeued = repo::crawler::requeue_dead_jobs(&state.db, scope).await?;
|
||||
repo::admin_audit::insert(
|
||||
&state.db,
|
||||
admin.0.id,
|
||||
"crawler_dead_jobs_requeue",
|
||||
"crawler",
|
||||
None,
|
||||
json!({ "requeued": requeued, "scope": scope_label(&body) }),
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(RequeueResponse { requeued }))
|
||||
}
|
||||
|
||||
fn scope_label(r: &RequeueRequest) -> &'static str {
|
||||
match r {
|
||||
RequeueRequest::All => "all",
|
||||
RequeueRequest::Manga { .. } => "manga",
|
||||
RequeueRequest::Job { .. } => "job",
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn require_crawler(state: &AppState) -> Result<&std::sync::Arc<CrawlerControl>, AppError> {
|
||||
state.crawler.as_ref().ok_or_else(|| {
|
||||
AppError::ServiceUnavailable("crawler daemon is disabled".into())
|
||||
})
|
||||
}
|
||||
@@ -4,6 +4,7 @@
|
||||
//! bot/API tokens cannot reach admin routes (see
|
||||
//! `crate::auth::extractor::RequireAdmin`).
|
||||
|
||||
pub mod crawler;
|
||||
pub mod mangas;
|
||||
pub mod resync;
|
||||
pub mod system;
|
||||
@@ -19,4 +20,5 @@ pub fn routes() -> Router<AppState> {
|
||||
.merge(mangas::routes())
|
||||
.merge(resync::routes())
|
||||
.merge(system::routes())
|
||||
.merge(crawler::routes())
|
||||
}
|
||||
|
||||
163
backend/tests/api_admin_crawler.rs
Normal file
163
backend/tests/api_admin_crawler.rs
Normal file
@@ -0,0 +1,163 @@
|
||||
//! Integration tests for the admin crawler observability/control API.
|
||||
//!
|
||||
//! The default test harness wires `AppState.crawler = None` (no daemon),
|
||||
//! so the daemon-backed *control* endpoints return 503 while the endpoints
//! that work off the DB alone (status shell, dead-jobs list/requeue) still function.
|
||||
//! This is exactly the production "daemon disabled" posture.
|
||||
|
||||
mod common;
|
||||
|
||||
use axum::http::StatusCode;
|
||||
use axum::Router;
|
||||
use serde_json::json;
|
||||
use sqlx::PgPool;
|
||||
use tower::ServiceExt;
|
||||
use uuid::Uuid;
|
||||
|
||||
use common::{body_json, get, get_with_cookie, post_json_with_cookie, register_user, harness};
|
||||
|
||||
async fn seed_admin(pool: &PgPool, app: &Router) -> String {
|
||||
let (username, cookie) = register_user(app).await;
|
||||
let u = mangalord::repo::user::find_by_username(pool, &username)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
mangalord::repo::user::set_is_admin_unchecked(pool, u.id, true)
|
||||
.await
|
||||
.unwrap();
|
||||
cookie
|
||||
}
|
||||
|
||||
async fn seed_dead_job(pool: &PgPool, title: &str) -> Uuid {
|
||||
let manga_id = Uuid::new_v4();
|
||||
let chapter_id = Uuid::new_v4();
|
||||
sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, $2)")
|
||||
.bind(manga_id)
|
||||
.bind(title)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)")
|
||||
.bind(chapter_id)
|
||||
.bind(manga_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
let job_id = Uuid::new_v4();
|
||||
sqlx::query(
|
||||
"INSERT INTO crawler_jobs (id, payload, state, attempts, last_error) \
|
||||
VALUES ($1, $2, 'dead', 5, 'boom')",
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(json!({
|
||||
"kind": "sync_chapter_content",
|
||||
"source_id": "target",
|
||||
"chapter_id": chapter_id,
|
||||
"source_chapter_key": "k",
|
||||
}))
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
job_id
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn get_status_requires_admin(pool: PgPool) {
|
||||
let h = harness(pool);
|
||||
// Unauthenticated → 401.
|
||||
let resp = h.app.clone().oneshot(get("/api/v1/admin/crawler")).await.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
|
||||
// Authenticated non-admin → 403.
|
||||
let (_u, cookie) = register_user(&h.app).await;
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get_with_cookie("/api/v1/admin/crawler", &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn get_status_reports_disabled_daemon_with_queue_counts(pool: PgPool) {
|
||||
seed_dead_job(&pool, "Naruto").await;
|
||||
let h = harness(pool.clone());
|
||||
let cookie = seed_admin(&pool, &h.app).await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get_with_cookie("/api/v1/admin/crawler", &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = body_json(resp).await;
|
||||
assert_eq!(body["daemon"], "disabled");
|
||||
assert_eq!(body["queue"]["dead"], 1);
|
||||
assert_eq!(body["browser"], "down");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn control_endpoints_return_503_when_daemon_disabled(pool: PgPool) {
|
||||
let h = harness(pool.clone());
|
||||
let cookie = seed_admin(&pool, &h.app).await;
|
||||
for uri in [
|
||||
"/api/v1/admin/crawler/run",
|
||||
"/api/v1/admin/crawler/browser/restart",
|
||||
"/api/v1/admin/crawler/session/clear-expired",
|
||||
] {
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(post_json_with_cookie(uri, json!({}), &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
resp.status(),
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
"{uri} should be 503 when daemon disabled"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn dead_jobs_list_and_requeue_over_http(pool: PgPool) {
|
||||
let job_id = seed_dead_job(&pool, "Bleach").await;
|
||||
let h = harness(pool.clone());
|
||||
let cookie = seed_admin(&pool, &h.app).await;
|
||||
|
||||
// List.
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get_with_cookie("/api/v1/admin/crawler/dead-jobs", &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = body_json(resp).await;
|
||||
assert_eq!(body["page"]["total"], 1);
|
||||
assert_eq!(body["items"][0]["manga_title"], "Bleach");
|
||||
|
||||
// Requeue the single job.
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(post_json_with_cookie(
|
||||
"/api/v1/admin/crawler/dead-jobs/requeue",
|
||||
json!({ "scope": "job", "job_id": job_id }),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = body_json(resp).await;
|
||||
assert_eq!(body["requeued"], 1);
|
||||
|
||||
let state: String = sqlx::query_scalar("SELECT state FROM crawler_jobs WHERE id = $1")
|
||||
.bind(job_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(state, "pending");
|
||||
}
|
||||
Reference in New Issue
Block a user