diff --git a/backend/src/app.rs b/backend/src/app.rs index bf0908b..a3be869 100644 --- a/backend/src/app.rs +++ b/backend/src/app.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use anyhow::Context; use async_trait::async_trait; @@ -46,6 +46,24 @@ pub struct AppState { /// same wiring that builds the daemon's chapter dispatcher, so a /// force resync uses the daemon's BrowserManager + rate limiters. pub resync: Option>, + /// Crawler observability + control handle (live status, coordinated + /// browser restart, runtime session, manual run). `None` when the + /// daemon is disabled; admin handlers gate on `.is_some()` → 503. + pub crawler: Option>, +} + +/// Shared handle the admin crawler endpoints use to observe and control +/// the running daemon. Bundled so the handlers take one optional field on +/// `AppState` rather than many. +pub struct CrawlerControl { + pub browser_manager: Arc, + pub session: Arc, + pub status: crate::crawler::status::StatusHandle, + /// Used by the "run metadata pass now" endpoint; `None` when no + /// `CRAWLER_START_URL` is configured (cron disabled). + pub metadata_pass: Option>, + /// Drain budget for a manually-triggered coordinated browser restart. + pub drain_deadline: std::time::Duration, } /// Bundle returned by [`build`]. The router is what `axum::serve` consumes; @@ -80,12 +98,12 @@ pub async fn build(config: Config) -> anyhow::Result { let storage: Arc = Arc::new(LocalStorage::new(config.storage_dir.clone())); - let (daemon, resync) = if config.crawler.daemon_enabled { + let (daemon, resync, crawler) = if config.crawler.daemon_enabled { let spawned = spawn_crawler_daemon(db.clone(), Arc::clone(&storage), &config.crawler).await?; - (Some(spawned.handle), Some(spawned.resync)) + (Some(spawned.handle), Some(spawned.resync), Some(spawned.crawler)) } else { tracing::info!("crawler daemon disabled (CRAWLER_DAEMON=false)"); - (None, None) + (None, None, None) }; let auth_limiter = Arc::new(AuthRateLimiter::new(config.auth.rate_limit)); @@ -96,6 +114,7 @@ pub async fn build(config: Config) -> anyhow::Result { upload: config.upload.clone(), auth_limiter, resync, + crawler, }; let router = router(state).layer(cors_layer(&config.cors_allowed_origins)); Ok(AppHandle { router, daemon }) @@ -108,6 +127,7 @@ pub async fn build(config: Config) -> anyhow::Result { struct SpawnedDaemon { handle: daemon::DaemonHandle, resync: Arc, + crawler: Arc, } async fn spawn_crawler_daemon( @@ -115,11 +135,17 @@ async fn spawn_crawler_daemon( storage: Arc, cfg: &CrawlerConfig, ) -> anyhow::Result { - // Reqwest client with cookie jar pre-seeded so CDN image fetches - // include PHPSESSID. Same shape as bin/crawler.rs main(). + // Reqwest client with a shared cookie jar so CDN image fetches include + // PHPSESSID. The same `Arc` is held by the SessionController, so a + // runtime session refresh rewrites it in place. Initial value: a + // persisted runtime session (survives restart) takes precedence over + // CRAWLER_PHPSESSID env. let cookie_jar = Arc::new(reqwest::cookie::Jar::default()); + let initial_sid = crate::crawler::session_control::SessionController::load_persisted(&db) + .await + .or_else(|| cfg.phpsessid.clone()); if let (Some(sid), Some(domain), Some(start_url)) = - (&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url) + (&initial_sid, &cfg.cookie_domain, &cfg.start_url) { let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/"); let seed_url = reqwest::Url::parse(start_url) @@ -129,7 +155,7 @@ async fn spawn_crawler_daemon( let mut http_builder = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(30)) .no_proxy() - .cookie_provider(cookie_jar); + .cookie_provider(Arc::clone(&cookie_jar)); if let Some(ua) = &cfg.user_agent { http_builder = http_builder.user_agent(ua); } @@ -157,6 +183,23 @@ async fn spawn_crawler_daemon( } let tor_recircuit_max = cfg.tor_recircuit_max_attempts; + // Session controller + sticky session-expired flag. Created before the + // browser so the on_launch hook can read the *current* session value + // (rather than a value captured at startup), and so a runtime refresh + // updates the cookie everywhere. + let session_expired = Arc::new(AtomicBool::new(false)); + let session_controller = crate::crawler::session_control::SessionController::new( + initial_sid, + Arc::clone(&cookie_jar), + cfg.cookie_domain.clone(), + cfg.start_url.clone(), + db.clone(), + Arc::clone(&session_expired), + ); + + // Live status surface, sized to the worker count. + let status = crate::crawler::status::StatusHandle::new(cfg.chapter_workers); + // Browser manager. on_launch re-injects PHPSESSID on every fresh // chromium spawn so an idle teardown followed by re-launch stays // authenticated without operator action. @@ -165,18 +208,25 @@ async fn spawn_crawler_daemon( let chromium_proxy = crate::crawler::url_utils::chromium_proxy_arg(proxy); launch_opts.extra_args.push(format!("--proxy-server={chromium_proxy}")); } - let on_launch = match (&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url) { - (Some(sid), Some(domain), Some(start_url)) => { - let sid = sid.clone(); + let on_launch = match (&cfg.cookie_domain, &cfg.start_url) { + (Some(domain), Some(start_url)) => { let domain = domain.clone(); let start_url = start_url.clone(); let tor_for_launch = tor.as_ref().map(Arc::clone); + let sc = Arc::clone(&session_controller); let on_launch: browser_manager::OnLaunch = Arc::new(move |browser| { - let sid = sid.clone(); let domain = domain.clone(); let start_url = start_url.clone(); let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone); + let sc = Arc::clone(&sc); Box::pin(async move { + // Read the *current* session each launch so a runtime + // refresh is picked up on the next (re)launch. No session + // configured → run unauthenticated (metadata needs no auth). + let Some(sid) = sc.current().await else { + tracing::info!("on_launch: no session set — skipping inject + probe"); + return Ok(()); + }; session::inject_phpsessid(&browser, &sid, &domain) .await .context("on_launch: inject_phpsessid")?; @@ -197,8 +247,6 @@ async fn spawn_crawler_daemon( }; let browser_manager = BrowserManager::new(launch_opts, cfg.idle_timeout, on_launch); - let session_expired = Arc::new(AtomicBool::new(false)); - let metadata_pass: Option> = cfg.start_url.as_ref().map(|url| { let m: Arc = Arc::new(RealMetadataPass { browser_manager: Arc::clone(&browser_manager), @@ -211,6 +259,7 @@ async fn spawn_crawler_daemon( download_allowlist: cfg.download_allowlist.clone(), max_image_bytes: cfg.max_image_bytes, metadata_max_consecutive_failures: cfg.metadata_max_consecutive_failures, + status: status.clone(), tor: tor.as_ref().map(Arc::clone), }); m @@ -224,6 +273,9 @@ async fn spawn_crawler_daemon( rate: Arc::clone(&rate), download_allowlist: cfg.download_allowlist.clone(), max_image_bytes: cfg.max_image_bytes, + transient_failures: Arc::new(AtomicU32::new(0)), + restart_threshold: cfg.browser_restart_threshold, + drain_deadline: cfg.job_timeout, tor: tor.as_ref().map(Arc::clone), }); @@ -261,21 +313,31 @@ async fn spawn_crawler_daemon( db, cancel, DaemonConfig { - metadata_pass, + metadata_pass: metadata_pass.clone(), dispatcher, chapter_workers: cfg.chapter_workers, daily_at: cfg.daily_at, tz: cfg.tz, retention_days: cfg.retention_days, session_expired, + status: status.clone(), job_timeout: cfg.job_timeout, extra_tasks: vec![reaper_task, shutdown_task], }, ); + let crawler = Arc::new(CrawlerControl { + browser_manager: Arc::clone(&browser_manager), + session: session_controller, + status, + metadata_pass, + drain_deadline: cfg.job_timeout, + }); + Ok(SpawnedDaemon { handle: daemon_handle, resync, + crawler, }) } @@ -295,6 +357,7 @@ struct RealMetadataPass { download_allowlist: DownloadAllowlist, max_image_bytes: usize, metadata_max_consecutive_failures: u32, + status: crate::crawler::status::StatusHandle, tor: Option>, } @@ -313,6 +376,7 @@ impl MetadataPass for RealMetadataPass { &self.download_allowlist, self.max_image_bytes, self.metadata_max_consecutive_failures, + Some(&self.status), self.tor.as_deref(), ) .await; @@ -326,6 +390,9 @@ impl MetadataPass for RealMetadataPass { // late, and a transient browser failure shouldn't cancel the // residual cover backlog. The backfill has its own per-call cap // so a runaway error stream can't monopolise the tick. + self.status + .set_phase(crate::crawler::status::Phase::CoverBackfill) + .await; match pipeline::backfill_missing_covers( &self.browser_manager, &self.db, @@ -363,6 +430,13 @@ struct RealChapterDispatcher { rate: Arc, download_allowlist: DownloadAllowlist, max_image_bytes: usize, + /// Consecutive transient chapter failures; resets on any success. + /// Drives the automatic coordinated browser restart. + transient_failures: Arc, + /// Consecutive-failure count that triggers an auto restart. + restart_threshold: u32, + /// How long a coordinated restart waits for in-flight leases to drain. + drain_deadline: std::time::Duration, tor: Option>, } @@ -400,10 +474,32 @@ impl ChapterDispatcher for RealChapterDispatcher { .await; drop(lease); match result { - Ok(outcome) => Ok(outcome), + Ok(outcome) => { + // Any successful dispatch (including a clean Skipped) + // means the browser is healthy — reset the streak. + self.transient_failures.store(0, Ordering::Release); + Ok(outcome) + } Err(e) => { + let streak = self.transient_failures.fetch_add(1, Ordering::AcqRel) + 1; if crate::crawler::nav::anyhow_looks_browser_dead(&e) { + // Hard browser-dead: lazy invalidate (next acquire + // relaunches). Reset the streak — we're recovering. self.browser_manager.invalidate().await; + self.transient_failures.store(0, Ordering::Release); + } else if self.restart_threshold > 0 && streak >= self.restart_threshold { + // Persistent transients that TOR recircuit couldn't + // fix — proactively restart Chromium. + tracing::warn!( + streak, + threshold = self.restart_threshold, + "auto browser restart: consecutive transient chapter failures" + ); + let _ = self + .browser_manager + .coordinated_restart(self.drain_deadline) + .await; + self.transient_failures.store(0, Ordering::Release); } Err(e) } diff --git a/backend/src/bin/crawler.rs b/backend/src/bin/crawler.rs index 9892048..730c1e9 100644 --- a/backend/src/bin/crawler.rs +++ b/backend/src/bin/crawler.rs @@ -306,6 +306,8 @@ async fn run( // Circuit-breaker disabled for the operator-driven CLI: a manual // sweep should push through transient failures, not self-abort. 0, + // No live status surface for the one-shot CLI. + None, tor.as_deref(), ) .await?; diff --git a/backend/src/crawler/daemon.rs b/backend/src/crawler/daemon.rs index f9d3611..602790a 100644 --- a/backend/src/crawler/daemon.rs +++ b/backend/src/crawler/daemon.rs @@ -48,6 +48,7 @@ use tokio_util::sync::CancellationToken; use crate::crawler::content::SyncOutcome; use crate::crawler::jobs::{self, JobPayload, Lease, KIND_SYNC_CHAPTER_CONTENT}; use crate::crawler::pipeline; +use crate::crawler::status::{Phase, StatusHandle, WorkerState}; /// Fixed `pg_try_advisory_lock` key. ASCII "MANGALRD" interpreted as a /// big-endian i64. Hardcoded so every replica agrees on the lock identity @@ -86,6 +87,8 @@ pub struct DaemonConfig { pub tz: Tz, pub retention_days: u32, pub session_expired: Arc, + /// Live status surface updated by the cron + workers. + pub status: StatusHandle, /// Hard upper bound on a single job's dispatch. A job that exceeds it /// is acked failed (exponential backoff) rather than wedging a worker /// forever. Must exceed [`LEASE_HEARTBEAT`] and the realistic @@ -137,6 +140,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem tz, retention_days, session_expired, + status, job_timeout, extra_tasks, } = cfg; @@ -149,6 +153,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem tz, retention_days, metadata, + status: status.clone(), }; join.spawn(async move { ctx.run().await }); } else { @@ -161,6 +166,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem cancel: cancel.clone(), dispatcher: Arc::clone(&dispatcher), session_expired: Arc::clone(&session_expired), + status: status.clone(), job_timeout, id: worker_id, }; @@ -185,6 +191,7 @@ struct CronContext { tz: Tz, retention_days: u32, metadata: Arc, + status: StatusHandle, } impl CronContext { @@ -212,6 +219,11 @@ impl CronContext { // (NTP step, suspend/resume) don't strand us on a stale instant. let next = next_fire(Utc::now(), self.daily_at, self.tz); let wait = (next - Utc::now()).to_std().unwrap_or(Duration::ZERO); + self.status + .set_phase(Phase::Idle { + next_fire: Some(next), + }) + .await; tracing::info!( next_fire_utc = %next.to_rfc3339(), wait_seconds = wait.as_secs(), @@ -259,9 +271,13 @@ impl CronContext { let metadata = &self.metadata; let pool = &self.pool; let retention_days = self.retention_days; + let status = &self.status; let body = async move { match metadata.run().await { - Ok(stats) => tracing::info!(?stats, "cron: metadata pass done"), + Ok(stats) => { + status.record_pass(&stats, Utc::now()).await; + tracing::info!(?stats, "cron: metadata pass done"); + } Err(e) => tracing::error!(?e, "cron: metadata pass failed"), } match pipeline::enqueue_bookmarked_pending(pool).await { @@ -299,6 +315,7 @@ struct WorkerContext { cancel: CancellationToken, dispatcher: Arc, session_expired: Arc, + status: StatusHandle, job_timeout: Duration, id: usize, } @@ -379,12 +396,25 @@ impl WorkerContext { }) }; + // Publish what this worker is doing for the live status surface. + if let JobPayload::SyncChapterContent { chapter_id, .. } = &lease.payload { + self.status + .set_worker( + self.id, + WorkerState::Working { + chapter_id: *chapter_id, + }, + ) + .await; + } + // Outer timeout: a dispatch that exceeds `job_timeout` is acked // failed (exponential backoff) rather than wedging the worker. let dispatch = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone())) .catch_unwind(); let outcome = tokio::time::timeout(self.job_timeout, dispatch).await; heartbeat.abort(); + self.status.set_worker(self.id, WorkerState::Idle).await; let outcome = match outcome { Ok(o) => o, diff --git a/backend/src/crawler/mod.rs b/backend/src/crawler/mod.rs index 650a20a..a3465f6 100644 --- a/backend/src/crawler/mod.rs +++ b/backend/src/crawler/mod.rs @@ -26,6 +26,8 @@ pub mod rate_limit; pub mod resync; pub mod safety; pub mod session; +pub mod session_control; pub mod source; +pub mod status; pub mod tor; pub mod url_utils; diff --git a/backend/src/crawler/pipeline.rs b/backend/src/crawler/pipeline.rs index ffa37e7..6341c96 100644 --- a/backend/src/crawler/pipeline.rs +++ b/backend/src/crawler/pipeline.rs @@ -115,6 +115,7 @@ pub async fn run_metadata_pass( allowlist: &DownloadAllowlist, max_image_bytes: usize, max_consecutive_failures: u32, + status: Option<&crate::crawler::status::StatusHandle>, tor: Option<&crate::crawler::tor::TorController>, ) -> anyhow::Result { let lease = browser_manager @@ -122,6 +123,9 @@ pub async fn run_metadata_pass( .await .context("acquire browser lease for metadata pass")?; let browser_ref: &chromiumoxide::Browser = &lease; + if let Some(s) = status { + s.set_phase(crate::crawler::status::Phase::WalkingList).await; + } let source = { let s = TargetSource::new(start_url.to_string()); @@ -226,6 +230,14 @@ pub async fn run_metadata_pass( continue; } stats.discovered += 1; + if let Some(s) = status { + s.set_phase(crate::crawler::status::Phase::FetchingMetadata { + index: stats.discovered, + total: max_refs, + title: r.title.clone(), + }) + .await; + } tracing::info!( idx = stats.discovered, key = %r.source_manga_key, diff --git a/backend/src/crawler/session_control.rs b/backend/src/crawler/session_control.rs new file mode 100644 index 0000000..91e060a --- /dev/null +++ b/backend/src/crawler/session_control.rs @@ -0,0 +1,122 @@ +//! Runtime-updatable crawler session (PHPSESSID). +//! +//! At startup the session comes from `CRAWLER_PHPSESSID`, but it expires +//! and previously needed a container restart to refresh. This controller +//! lets an admin push a fresh cookie at runtime: it rewrites the reqwest +//! cookie jar (CDN image fetches), updates the in-memory value the browser +//! `on_launch` hook reads, persists it to `crawler_state` (so it survives +//! a restart), and clears the sticky `session_expired` flag. A subsequent +//! coordinated browser restart re-runs `on_launch`, re-injecting the new +//! cookie into Chromium and re-probing. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use anyhow::Context; +use serde_json::json; +use sqlx::PgPool; +use tokio::sync::RwLock; + +const STATE_KEY_RUNTIME_SESSION: &str = "runtime_session"; + +pub struct SessionController { + /// Current PHPSESSID — what `on_launch` injects into a fresh browser. + phpsessid: RwLock>, + /// The same `Arc` handed to the reqwest client; updating it here + /// updates the client's cookies (the jar is internally mutable). + cookie_jar: Arc, + cookie_domain: Option, + start_url: Option, + db: PgPool, + session_expired: Arc, +} + +impl SessionController { + pub fn new( + initial: Option, + cookie_jar: Arc, + cookie_domain: Option, + start_url: Option, + db: PgPool, + session_expired: Arc, + ) -> Arc { + Arc::new(Self { + phpsessid: RwLock::new(initial), + cookie_jar, + cookie_domain, + start_url, + db, + session_expired, + }) + } + + /// The PHPSESSID a fresh browser should inject (None when unset). + pub async fn current(&self) -> Option { + self.phpsessid.read().await.clone() + } + + /// Whether the sticky session-expired flag is set (chapter workers + /// idle while true). + pub fn is_expired(&self) -> bool { + self.session_expired.load(Ordering::Acquire) + } + + /// Clear the session-expired flag without changing the cookie — used + /// when the operator knows the session is fine and wants workers to + /// resume immediately. + pub fn clear_expired(&self) { + self.session_expired.store(false, Ordering::Release); + } + + /// Update the session everywhere: reqwest jar, in-memory value, and + /// persisted `crawler_state`. Clears the session-expired flag. Does + /// NOT relaunch the browser — the caller triggers a coordinated + /// restart so `on_launch` re-injects + re-probes. + pub async fn update(&self, sid: &str) -> anyhow::Result<()> { + let sid = sid.trim().to_string(); + anyhow::ensure!(!sid.is_empty(), "PHPSESSID must not be empty"); + + if let (Some(domain), Some(start_url)) = (&self.cookie_domain, &self.start_url) { + let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/"); + let seed_url = + reqwest::Url::parse(start_url).context("parse start_url for cookie seed")?; + self.cookie_jar.add_cookie_str(&cookie_str, &seed_url); + } + *self.phpsessid.write().await = Some(sid.clone()); + persist(&self.db, &sid).await.context("persist runtime session")?; + self.session_expired.store(false, Ordering::Release); + tracing::info!("crawler session updated at runtime"); + Ok(()) + } + + /// Read a persisted runtime session (if any) from `crawler_state`. + /// Called at startup so a mid-day refresh survives a restart. + pub async fn load_persisted(db: &PgPool) -> Option { + let row: Option = + sqlx::query_scalar("SELECT value FROM crawler_state WHERE key = $1") + .bind(STATE_KEY_RUNTIME_SESSION) + .fetch_optional(db) + .await + .ok() + .flatten(); + row.and_then(|v| { + v.get("phpsessid") + .and_then(|s| s.as_str()) + .map(|s| s.to_string()) + }) + } +} + +async fn persist(db: &PgPool, sid: &str) -> sqlx::Result<()> { + sqlx::query( + "INSERT INTO crawler_state (key, value, updated_at) \ + VALUES ($1, $2, now()) \ + ON CONFLICT (key) DO UPDATE \ + SET value = EXCLUDED.value, updated_at = now()", + ) + .bind(STATE_KEY_RUNTIME_SESSION) + .bind(json!({ "phpsessid": sid })) + .execute(db) + .await?; + Ok(()) +} diff --git a/backend/src/crawler/status.rs b/backend/src/crawler/status.rs new file mode 100644 index 0000000..aaffca3 --- /dev/null +++ b/backend/src/crawler/status.rs @@ -0,0 +1,162 @@ +//! Live, in-process crawler status. +//! +//! The metadata pass runs inline in the cron tick (it is not a +//! `crawler_jobs` row), so without this surface "what is the crawler doing +//! right now" is unanswerable from the dashboard. The daemon publishes its +//! current [`Phase`] and per-worker activity into a shared +//! [`StatusHandle`]; the admin endpoint reads a [`CrawlerStatus`] snapshot +//! and composes it with DB-derived queue counts + the session/browser +//! flags. +//! +//! NOTE: this is per-process state. The deployment is a single server +//! (see CLAUDE.md), so an in-memory handle is sufficient; durable signals +//! (last-pass summary, runtime session) are persisted in `crawler_state`. + +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use serde::Serialize; +use tokio::sync::RwLock; +use uuid::Uuid; + +use crate::crawler::pipeline::MetadataStats; + +/// What the daemon is doing right now. Serialised with an internal `state` +/// tag so the frontend can switch on it. +#[derive(Clone, Debug, Serialize)] +#[serde(tag = "state", rename_all = "snake_case")] +pub enum Phase { + /// Sleeping until the next scheduled metadata pass. + Idle { next_fire: Option> }, + /// Walking the source catalog list pages. + WalkingList, + /// Fetching one manga's metadata. `index`/`total` drive a progress bar + /// (`total` is `None` when the source size is unknown / uncapped). + FetchingMetadata { + index: usize, + total: Option, + title: String, + }, + /// Backfilling covers that failed on first attempt. + CoverBackfill, +} + +/// Per-worker activity. (Browser-restart and session-expired states are +/// surfaced as separate top-level fields by the endpoint, sourced from the +/// `BrowserManager` phase and the session flag respectively.) +#[derive(Clone, Debug, Serialize)] +#[serde(tag = "state", rename_all = "snake_case")] +pub enum WorkerState { + Idle, + Working { chapter_id: Uuid }, +} + +/// Summary of the most recent metadata pass (persisted across restarts in +/// `crawler_state` by the cron; mirrored here for the live read). +#[derive(Clone, Debug, Serialize, Default)] +pub struct LastPass { + pub at: Option>, + pub discovered: usize, + pub upserted: usize, + pub covers_fetched: usize, + pub mangas_failed: usize, +} + +/// The mutable slice of status the daemon owns. Session/browser/queue are +/// composed at read time by the endpoint (they live elsewhere), so they +/// are not stored here. +#[derive(Clone, Debug, Serialize)] +pub struct CrawlerStatus { + pub phase: Phase, + pub workers: Vec, + pub last_pass: LastPass, +} + +impl CrawlerStatus { + fn new(num_workers: usize) -> Self { + Self { + phase: Phase::Idle { next_fire: None }, + workers: (0..num_workers.max(1)).map(|_| WorkerState::Idle).collect(), + last_pass: LastPass::default(), + } + } +} + +/// Cloneable handle the daemon tasks use to publish status. Cheap to clone +/// (`Arc`). All writers funnel through the helper methods so locking stays +/// localised. +#[derive(Clone)] +pub struct StatusHandle(Arc>); + +impl StatusHandle { + pub fn new(num_workers: usize) -> Self { + Self(Arc::new(RwLock::new(CrawlerStatus::new(num_workers)))) + } + + pub async fn set_phase(&self, phase: Phase) { + self.0.write().await.phase = phase; + } + + pub async fn set_worker(&self, id: usize, state: WorkerState) { + let mut s = self.0.write().await; + if let Some(slot) = s.workers.get_mut(id) { + *slot = state; + } + } + + /// Record a finished metadata pass. Stamps `at` with `now`. + pub async fn record_pass(&self, stats: &MetadataStats, at: DateTime) { + let mut s = self.0.write().await; + s.last_pass = LastPass { + at: Some(at), + discovered: stats.discovered, + upserted: stats.upserted, + covers_fetched: stats.covers_fetched, + mangas_failed: stats.mangas_failed, + }; + } + + /// Seed the last-pass summary from a persisted `crawler_state` value on + /// startup so the dashboard isn't blank until the first tick. + pub async fn set_last_pass(&self, last: LastPass) { + self.0.write().await.last_pass = last; + } + + pub async fn snapshot(&self) -> CrawlerStatus { + self.0.read().await.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn set_worker_updates_only_that_slot() { + let h = StatusHandle::new(2); + let cid = Uuid::new_v4(); + h.set_worker(1, WorkerState::Working { chapter_id: cid }).await; + let snap = h.snapshot().await; + assert!(matches!(snap.workers[0], WorkerState::Idle)); + assert!(matches!(snap.workers[1], WorkerState::Working { .. })); + // Out-of-range id is a no-op, not a panic. + h.set_worker(99, WorkerState::Idle).await; + } + + #[tokio::test] + async fn record_pass_captures_stats_and_timestamp() { + let h = StatusHandle::new(1); + let stats = MetadataStats { + discovered: 5, + upserted: 3, + covers_fetched: 2, + mangas_failed: 1, + }; + let at = Utc::now(); + h.record_pass(&stats, at).await; + let snap = h.snapshot().await; + assert_eq!(snap.last_pass.discovered, 5); + assert_eq!(snap.last_pass.upserted, 3); + assert_eq!(snap.last_pass.at, Some(at)); + } +} diff --git a/backend/src/repo/crawler.rs b/backend/src/repo/crawler.rs index 2d8dbe6..2560b1b 100644 --- a/backend/src/repo/crawler.rs +++ b/backend/src/repo/crawler.rs @@ -17,8 +17,9 @@ //! Each public function is a transaction boundary so a partial failure //! mid-call leaves the DB in its pre-call state. -use chrono::Utc; -use sqlx::{PgPool, Postgres, Transaction}; +use chrono::{DateTime, Utc}; +use serde::Serialize; +use sqlx::{FromRow, PgPool, Postgres, Transaction}; use uuid::Uuid; use crate::crawler::source::{SourceChapterRef, SourceManga}; @@ -618,3 +619,169 @@ pub async fn last_run_completed_cleanly( .unwrap_or(true)) } +// --------------------------------------------------------------------------- +// Dead-letter jobs: admin observability + requeue. +// --------------------------------------------------------------------------- + +/// A `dead` crawler job joined to its chapter/manga context for the admin +/// dead-letter view. Chapter columns are `Option` because the join is +/// best-effort (the chapter may have been removed since the job died, or +/// the job may be a non-chapter kind). +#[derive(Debug, Clone, Serialize, FromRow)] +pub struct DeadJob { + pub id: Uuid, + pub kind: String, + pub chapter_id: Option, + pub manga_id: Option, + pub manga_title: Option, + pub chapter_number: Option, + pub attempts: i32, + pub max_attempts: i32, + pub last_error: Option, + pub updated_at: DateTime, +} + +/// Paginated list of `dead` jobs, newest-failed first, joined to chapter + +/// manga context. `search` filters on manga title (case-insensitive +/// substring). Returns the page slice plus the unfiltered-by-page total. +pub async fn list_dead_jobs( + pool: &PgPool, + search: Option<&str>, + limit: i64, + offset: i64, +) -> sqlx::Result<(Vec, i64)> { + let search_pat = search + .map(|s| format!("%{}%", s.trim())) + .filter(|p| p.len() > 2); + + let items: Vec = sqlx::query_as( + r#" + SELECT + cj.id, + cj.payload->>'kind' AS kind, + (cj.payload->>'chapter_id')::uuid AS chapter_id, + c.manga_id AS manga_id, + m.title AS manga_title, + c.number AS chapter_number, + cj.attempts, + cj.max_attempts, + cj.last_error, + cj.updated_at + FROM crawler_jobs cj + LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid + LEFT JOIN mangas m ON m.id = c.manga_id + WHERE cj.state = 'dead' + AND ($1::text IS NULL OR m.title ILIKE $1) + ORDER BY cj.updated_at DESC + LIMIT $2 OFFSET $3 + "#, + ) + .bind(&search_pat) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await?; + + let total: i64 = sqlx::query_scalar( + r#" + SELECT COUNT(*) + FROM crawler_jobs cj + LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid + LEFT JOIN mangas m ON m.id = c.manga_id + WHERE cj.state = 'dead' + AND ($1::text IS NULL OR m.title ILIKE $1) + "#, + ) + .bind(&search_pat) + .fetch_one(pool) + .await?; + + Ok((items, total)) +} + +/// Scope of a dead-job requeue. +#[derive(Debug, Clone)] +pub enum RequeueScope { + /// Every dead job. + All, + /// Dead jobs whose chapter belongs to this manga. + Manga(Uuid), + /// A single dead job by its id. + Job(Uuid), +} + +/// Requeue dead jobs back to `pending` with a fresh attempt budget. This is +/// an explicit operator override, so it bypasses the dead-letter quarantine +/// the enqueue helpers honour (we act directly on the row). Skips any dead +/// job whose chapter already has a `pending`/`running` job so the partial +/// dedup index is never violated. Returns the number of rows requeued. +pub async fn requeue_dead_jobs(pool: &PgPool, scope: RequeueScope) -> sqlx::Result { + // Guard against resurrecting a dead job when a live one already covers + // the same chapter (would otherwise hit the dedup unique index). + const NO_LIVE_DUP: &str = r#" + AND NOT EXISTS ( + SELECT 1 FROM crawler_jobs live + WHERE live.payload->>'kind' = 'sync_chapter_content' + AND live.payload->>'chapter_id' = crawler_jobs.payload->>'chapter_id' + AND live.state IN ('pending','running') + ) + "#; + const SET: &str = "SET state = 'pending', attempts = 0, leased_until = NULL, \ + last_error = NULL, scheduled_at = now(), updated_at = now()"; + + let affected = match scope { + RequeueScope::All => { + sqlx::query(&format!( + "UPDATE crawler_jobs {SET} WHERE state = 'dead' {NO_LIVE_DUP}" + )) + .execute(pool) + .await? + .rows_affected() + } + RequeueScope::Manga(manga_id) => { + sqlx::query(&format!( + "UPDATE crawler_jobs {SET} \ + WHERE state = 'dead' \ + AND (payload->>'chapter_id')::uuid IN \ + (SELECT id FROM chapters WHERE manga_id = $1) \ + {NO_LIVE_DUP}" + )) + .bind(manga_id) + .execute(pool) + .await? + .rows_affected() + } + RequeueScope::Job(job_id) => { + sqlx::query(&format!( + "UPDATE crawler_jobs {SET} WHERE state = 'dead' AND id = $1 {NO_LIVE_DUP}" + )) + .bind(job_id) + .execute(pool) + .await? + .rows_affected() + } + }; + Ok(affected) +} + +/// Count crawler jobs grouped by state — drives the dashboard queue +/// gauges. Returns `(pending, running, dead)`. +pub async fn job_state_counts(pool: &PgPool) -> sqlx::Result<(i64, i64, i64)> { + let rows: Vec<(String, i64)> = + sqlx::query_as("SELECT state, COUNT(*) FROM crawler_jobs GROUP BY state") + .fetch_all(pool) + .await?; + let mut pending = 0; + let mut running = 0; + let mut dead = 0; + for (state, n) in rows { + match state.as_str() { + "pending" => pending = n, + "running" => running = n, + "dead" => dead = n, + _ => {} + } + } + Ok((pending, running, dead)) +} + diff --git a/backend/tests/api_admin_role.rs b/backend/tests/api_admin_role.rs index a88f503..f7930d7 100644 --- a/backend/tests/api_admin_role.rs +++ b/backend/tests/api_admin_role.rs @@ -50,6 +50,7 @@ fn admin_test_router(pool: PgPool) -> (Router, TempDir) { upload: UploadConfig::default(), auth_limiter, resync: None, + crawler: None, }; let app = Router::new() .nest("/api/v1", api::routes()) diff --git a/backend/tests/common/mod.rs b/backend/tests/common/mod.rs index 12315b3..4d1bc53 100644 --- a/backend/tests/common/mod.rs +++ b/backend/tests/common/mod.rs @@ -78,6 +78,7 @@ fn harness_with_auth_config( // handlers return 503 in this config. Tests that need a stub // resync service swap it in via `harness_with_resync`. resync: None, + crawler: None, }; Harness { app: router(state), _storage_dir: storage_dir } } @@ -152,6 +153,7 @@ pub fn harness_with_resync( }, auth_limiter, resync: Some(resync), + crawler: None, }; Harness { app: router(state), diff --git a/backend/tests/crawler_daemon.rs b/backend/tests/crawler_daemon.rs index 69c41c5..f2d7a2e 100644 --- a/backend/tests/crawler_daemon.rs +++ b/backend/tests/crawler_daemon.rs @@ -40,6 +40,7 @@ fn make_cfg( tz: Tz::UTC, retention_days: 7, session_expired, + status: mangalord::crawler::status::StatusHandle::new(workers), job_timeout: Duration::from_secs(60), extra_tasks: Vec::new(), } diff --git a/backend/tests/crawler_dead_jobs.rs b/backend/tests/crawler_dead_jobs.rs new file mode 100644 index 0000000..7165ca3 --- /dev/null +++ b/backend/tests/crawler_dead_jobs.rs @@ -0,0 +1,171 @@ +//! Integration tests for the dead-letter admin queries in +//! `repo::crawler`: listing dead jobs with manga/chapter context and the +//! scoped requeue (all / per-manga / single) used by the admin dashboard. + +use mangalord::repo::crawler::{self, RequeueScope}; +use serde_json::json; +use sqlx::PgPool; +use uuid::Uuid; + +/// Seed a manga + chapter and return their ids. +async fn seed_chapter(pool: &PgPool, title: &str, number: i32) -> (Uuid, Uuid) { + let manga_id = Uuid::new_v4(); + let chapter_id = Uuid::new_v4(); + sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, $2)") + .bind(manga_id) + .bind(title) + .execute(pool) + .await + .unwrap(); + sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, $3)") + .bind(chapter_id) + .bind(manga_id) + .bind(number) + .execute(pool) + .await + .unwrap(); + (manga_id, chapter_id) +} + +/// Insert a crawler_jobs row in a given state for a chapter-content job. +async fn insert_job(pool: &PgPool, chapter_id: Uuid, state: &str, attempts: i32) -> Uuid { + let id = Uuid::new_v4(); + let payload = json!({ + "kind": "sync_chapter_content", + "source_id": "target", + "chapter_id": chapter_id, + "source_chapter_key": "k", + }); + sqlx::query( + "INSERT INTO crawler_jobs (id, payload, state, attempts, last_error) \ + VALUES ($1, $2, $3, $4, 'boom')", + ) + .bind(id) + .bind(payload) + .bind(state) + .bind(attempts) + .execute(pool) + .await + .unwrap(); + id +} + +async fn state_of(pool: &PgPool, id: Uuid) -> String { + sqlx::query_scalar::<_, String>("SELECT state FROM crawler_jobs WHERE id = $1") + .bind(id) + .fetch_one(pool) + .await + .unwrap() +} + +#[sqlx::test(migrations = "./migrations")] +async fn list_dead_jobs_returns_context_and_total(pool: PgPool) { + let (_m, c1) = seed_chapter(&pool, "Naruto", 700).await; + insert_job(&pool, c1, "dead", 5).await; + // A non-dead job must not appear. + let (_m2, c2) = seed_chapter(&pool, "Bleach", 1).await; + insert_job(&pool, c2, "pending", 0).await; + + let (items, total) = crawler::list_dead_jobs(&pool, None, 50, 0).await.unwrap(); + assert_eq!(total, 1); + assert_eq!(items.len(), 1); + let row = &items[0]; + assert_eq!(row.manga_title.as_deref(), Some("Naruto")); + assert_eq!(row.chapter_number, Some(700)); + assert_eq!(row.attempts, 5); + assert_eq!(row.last_error.as_deref(), Some("boom")); +} + +#[sqlx::test(migrations = "./migrations")] +async fn list_dead_jobs_filters_by_title_search(pool: PgPool) { + let (_m, c1) = seed_chapter(&pool, "Naruto", 700).await; + insert_job(&pool, c1, "dead", 5).await; + let (_m2, c2) = seed_chapter(&pool, "One Piece", 1).await; + insert_job(&pool, c2, "dead", 5).await; + + let (items, total) = crawler::list_dead_jobs(&pool, Some("piece"), 50, 0) + .await + .unwrap(); + assert_eq!(total, 1); + assert_eq!(items[0].manga_title.as_deref(), Some("One Piece")); +} + +#[sqlx::test(migrations = "./migrations")] +async fn requeue_all_resets_dead_jobs_to_pending(pool: PgPool) { + let (_m, c1) = seed_chapter(&pool, "A", 1).await; + let (_m2, c2) = seed_chapter(&pool, "B", 1).await; + let j1 = insert_job(&pool, c1, "dead", 5).await; + let j2 = insert_job(&pool, c2, "dead", 5).await; + + let n = crawler::requeue_dead_jobs(&pool, RequeueScope::All) + .await + .unwrap(); + assert_eq!(n, 2); + assert_eq!(state_of(&pool, j1).await, "pending"); + assert_eq!(state_of(&pool, j2).await, "pending"); + let attempts: i32 = sqlx::query_scalar("SELECT attempts FROM crawler_jobs WHERE id = $1") + .bind(j1) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(attempts, 0, "attempts reset on requeue"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn requeue_by_manga_scopes_to_that_manga(pool: PgPool) { + let (m1, c1) = seed_chapter(&pool, "A", 1).await; + let (_m2, c2) = seed_chapter(&pool, "B", 1).await; + let j1 = insert_job(&pool, c1, "dead", 5).await; + let j2 = insert_job(&pool, c2, "dead", 5).await; + + let n = crawler::requeue_dead_jobs(&pool, RequeueScope::Manga(m1)) + .await + .unwrap(); + assert_eq!(n, 1); + assert_eq!(state_of(&pool, j1).await, "pending"); + assert_eq!(state_of(&pool, j2).await, "dead", "other manga untouched"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn requeue_single_job(pool: PgPool) { + let (_m, c1) = seed_chapter(&pool, "A", 1).await; + let (_m2, c2) = seed_chapter(&pool, "B", 1).await; + let j1 = insert_job(&pool, c1, "dead", 5).await; + let j2 = insert_job(&pool, c2, "dead", 5).await; + + let n = crawler::requeue_dead_jobs(&pool, RequeueScope::Job(j1)) + .await + .unwrap(); + assert_eq!(n, 1); + assert_eq!(state_of(&pool, j1).await, "pending"); + assert_eq!(state_of(&pool, j2).await, "dead"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn requeue_skips_dead_when_live_job_exists_for_same_chapter(pool: PgPool) { + let (_m, c1) = seed_chapter(&pool, "A", 1).await; + let dead = insert_job(&pool, c1, "dead", 5).await; + // A live pending job for the SAME chapter already exists. + insert_job(&pool, c1, "pending", 0).await; + + let n = crawler::requeue_dead_jobs(&pool, RequeueScope::All) + .await + .unwrap(); + assert_eq!(n, 0, "must not resurrect a dead job that has a live counterpart"); + assert_eq!(state_of(&pool, dead).await, "dead"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn job_state_counts_groups_by_state(pool: PgPool) { + let (_m, c1) = seed_chapter(&pool, "A", 1).await; + let (_m2, c2) = seed_chapter(&pool, "B", 1).await; + let (_m3, c3) = seed_chapter(&pool, "C", 1).await; + insert_job(&pool, c1, "pending", 0).await; + insert_job(&pool, c2, "dead", 5).await; + insert_job(&pool, c3, "dead", 5).await; + + let (pending, running, dead) = crawler::job_state_counts(&pool).await.unwrap(); + assert_eq!(pending, 1); + assert_eq!(running, 0); + assert_eq!(dead, 2); +}