//! In-process crawler daemon.
//!
//! Owns a cron task that fires a daily metadata pass and N worker tasks
//! that drain `SyncChapterContent` jobs from `crawler_jobs`. The dispatch
//! seams ([`MetadataPass`], [`ChapterDispatcher`]) are traits so tests can
//! inject stubs without standing up a real Chromium / `Source` impl.
//!
//! ## Cron
//!
//! Each tick:
//! 1. Acquire a Postgres advisory lock on a dedicated pool connection so
//!    that only one replica runs the tick. Skip the tick on contention.
//! 2. Call [`MetadataPass::run`] (typically `pipeline::run_metadata_pass`).
//! 3. Enqueue `SyncChapterContent` jobs for any bookmarked manga whose
//!    chapters still have `page_count = 0`.
//! 4. Reap `done` jobs older than `retention_days`.
//! 5. Persist `last_metadata_tick_at` (even if an earlier step failed) and
//!    release the lock.
//!
//! If the last persisted tick is older than the most recent scheduled slot
//! (e.g. the backend was down when the slot came due), the daemon fires
//! immediately on startup before resuming the regular schedule.
//!
//! ## Workers
//!
//! Each worker leases one chapter-content job at a time, dispatches it via
//! the [`ChapterDispatcher`], and then acks it as `done` or `failed`, or
//! releases it back to `pending`, depending on the outcome. A
//! `SessionExpired` outcome sets the sticky `session_expired` flag; all
//! workers idle while it is set (until an operator restarts the process
//! with a refreshed PHPSESSID).
//!
//! Worker dispatch is wrapped in `catch_unwind` so a panicking handler
//! marks the job failed instead of taking down the worker task.
use std::panic::AssertUnwindSafe; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use chrono::{DateTime, Datelike, NaiveTime, TimeZone, Timelike, Utc}; use chrono_tz::Tz; use futures_util::FutureExt; use serde_json::json; use sqlx::PgPool; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use crate::crawler::content::SyncOutcome; use crate::crawler::jobs::{self, JobPayload, Lease, KIND_SYNC_CHAPTER_CONTENT}; use crate::crawler::pipeline; /// Fixed `pg_try_advisory_lock` key. ASCII "MANGALRD" interpreted as a /// big-endian i64. Hardcoded so every replica agrees on the lock identity /// without consulting config. pub const CRON_LOCK_KEY: i64 = 0x4D414E47414C5244; const STATE_KEY_LAST_TICK: &str = "last_metadata_tick_at"; #[async_trait] pub trait MetadataPass: Send + Sync { async fn run(&self) -> anyhow::Result; } #[async_trait] pub trait ChapterDispatcher: Send + Sync { async fn dispatch(&self, payload: JobPayload) -> anyhow::Result; } /// Configuration for [`spawn`]. Use `None` for `metadata_pass` to disable /// the cron entirely (worker-pool-only mode — useful when only the /// bookmark-triggered enqueue path is wanted). pub struct DaemonConfig { pub metadata_pass: Option>, pub dispatcher: Arc, pub chapter_workers: usize, pub daily_at: NaiveTime, pub tz: Tz, pub retention_days: u32, pub session_expired: Arc, /// Tasks that should run alongside the cron + workers and be cancelled /// on shutdown. Used to hand the daemon ownership of the browser /// manager's idle reaper. pub extra_tasks: Vec>, } pub struct DaemonHandle { cancel: CancellationToken, join: JoinSet<()>, extra: Vec>, } impl DaemonHandle { /// Trigger shutdown and await all worker / cron / extra tasks. pub async fn shutdown(mut self) { self.cancel.cancel(); while self.join.join_next().await.is_some() {} for task in self.extra.drain(..) 
{ let _ = task.await; } } /// Cancellation token that drives shutdown — exposed so callers /// (`app::spawn_crawler_daemon`) can hand the same token to auxiliary /// tasks (e.g. the BrowserManager idle reaper) and have them stop on /// the daemon's signal. pub fn cancel_token(&self) -> CancellationToken { self.cancel.clone() } } /// Spawn the daemon. Returns immediately; tasks run in the background. /// Pass an external [`CancellationToken`] so auxiliary tasks (e.g. a /// BrowserManager idle reaper) can share the same shutdown signal — /// typically created in the caller, cloned into both spawns. pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> DaemonHandle { let mut join = JoinSet::new(); let DaemonConfig { metadata_pass, dispatcher, chapter_workers, daily_at, tz, retention_days, session_expired, extra_tasks, } = cfg; if let Some(metadata) = metadata_pass { let ctx = CronContext { pool: pool.clone(), cancel: cancel.clone(), daily_at, tz, retention_days, metadata, }; join.spawn(async move { ctx.run().await }); } else { tracing::info!("crawler daemon: no metadata_pass — cron disabled"); } for worker_id in 0..chapter_workers.max(1) { let ctx = WorkerContext { pool: pool.clone(), cancel: cancel.clone(), dispatcher: Arc::clone(&dispatcher), session_expired: Arc::clone(&session_expired), id: worker_id, }; join.spawn(async move { ctx.run().await }); } DaemonHandle { cancel, join, extra: extra_tasks, } } // --------------------------------------------------------------------------- // Cron // --------------------------------------------------------------------------- struct CronContext { pool: PgPool, cancel: CancellationToken, daily_at: NaiveTime, tz: Tz, retention_days: u32, metadata: Arc, } impl CronContext { async fn run(self) { // On startup, fire immediately if the most recent slot has already // passed and we never recorded a tick for it. 
let now = Utc::now(); let mut catchup = match read_last_tick(&self.pool).await { Ok(Some(last)) => previous_fire(now, self.daily_at, self.tz) > last, Ok(None) => true, Err(e) => { tracing::warn!(?e, "cron: read_last_tick failed; assuming no catch-up"); false } }; loop { if catchup { tracing::info!("cron: catch-up tick (missed scheduled slot)"); self.run_tick().await; catchup = false; continue; } // Recompute next-fire from now() each iteration so clock jumps // (NTP step, suspend/resume) don't strand us on a stale instant. let next = next_fire(Utc::now(), self.daily_at, self.tz); let wait = (next - Utc::now()).to_std().unwrap_or(Duration::ZERO); tracing::info!( next_fire_utc = %next.to_rfc3339(), wait_seconds = wait.as_secs(), "cron: sleeping until next slot" ); tokio::select! { _ = tokio::time::sleep(wait) => {} _ = self.cancel.cancelled() => { tracing::info!("cron: shutdown"); return; } } self.run_tick().await; } } async fn run_tick(&self) { let mut conn = match self.pool.acquire().await { Ok(c) => c, Err(e) => { tracing::error!(?e, "cron: acquire conn failed; skipping tick"); return; } }; // pg_try_advisory_lock is session-scoped — we must hold the same // connection for the unlock or the call silently no-ops on a // different connection from the pool. 
let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_lock($1)") .bind(CRON_LOCK_KEY) .fetch_one(&mut *conn) .await .unwrap_or(false); if !acquired { tracing::info!("cron: tick skipped — another replica holds the lock"); return; } match self.metadata.run().await { Ok(stats) => tracing::info!(?stats, "cron: metadata pass done"), Err(e) => tracing::error!(?e, "cron: metadata pass failed"), } match pipeline::enqueue_bookmarked_pending(&self.pool).await { Ok(summary) => tracing::info!(?summary, "cron: enqueued bookmarked-pending"), Err(e) => tracing::error!(?e, "cron: enqueue_bookmarked_pending failed"), } match jobs::reap_done(&self.pool, self.retention_days).await { Ok(n) => tracing::info!(reaped = n, "cron: done-job reaper finished"), Err(e) => tracing::error!(?e, "cron: done-job reaper failed"), } if let Err(e) = write_last_tick(&self.pool, Utc::now()).await { tracing::warn!(?e, "cron: persist last_metadata_tick_at failed"); } let _ = sqlx::query("SELECT pg_advisory_unlock($1)") .bind(CRON_LOCK_KEY) .execute(&mut *conn) .await; drop(conn); } } // --------------------------------------------------------------------------- // Workers // --------------------------------------------------------------------------- struct WorkerContext { pool: PgPool, cancel: CancellationToken, dispatcher: Arc, session_expired: Arc, id: usize, } impl WorkerContext { async fn run(self) { loop { if self.cancel.is_cancelled() { tracing::info!(worker = self.id, "worker: shutdown"); return; } if self.session_expired.load(Ordering::Acquire) { tokio::select! { _ = tokio::time::sleep(Duration::from_secs(30)) => continue, _ = self.cancel.cancelled() => return, } } let leases = match jobs::lease( &self.pool, Some(KIND_SYNC_CHAPTER_CONTENT), 1, Duration::from_secs(60), ) .await { Ok(v) => v, Err(e) => { tracing::warn!(worker = self.id, ?e, "worker: lease failed"); tokio::select! 
{ _ = tokio::time::sleep(Duration::from_secs(5)) => continue, _ = self.cancel.cancelled() => return, } } }; let Some(lease) = leases.into_iter().next() else { tokio::select! { _ = tokio::time::sleep(Duration::from_secs(1)) => continue, _ = self.cancel.cancelled() => return, } }; self.process_lease(lease).await; } } async fn process_lease(&self, lease: Lease) { // Consumer-side dedup safety net: if the chapter already has pages // (because a force-refetch race or a job that was re-enqueued // after a previous one finished), ack done without re-fetching. if let JobPayload::SyncChapterContent { chapter_id, .. } = &lease.payload { let page_count: Option = sqlx::query_scalar( "SELECT page_count FROM chapters WHERE id = $1", ) .bind(chapter_id) .fetch_optional(&self.pool) .await .ok() .flatten(); if matches!(page_count, Some(n) if n > 0) { let _ = jobs::ack_done(&self.pool, lease.id).await; return; } } let outcome = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone())) .catch_unwind() .await; match outcome { Ok(Ok(SyncOutcome::Fetched { .. 
} | SyncOutcome::Skipped)) => { let _ = jobs::ack_done(&self.pool, lease.id).await; } Ok(Ok(SyncOutcome::SessionExpired)) => { tracing::error!( worker = self.id, lease_id = %lease.id, "session expired — workers will idle until restart" ); self.session_expired.store(true, Ordering::Release); let _ = jobs::release(&self.pool, lease.id).await; } Ok(Err(e)) => { tracing::warn!( worker = self.id, lease_id = %lease.id, error = ?e, "worker: dispatch error — ack failed" ); let _ = jobs::ack_failed( &self.pool, lease.id, &format!("{e:#}"), lease.attempts, lease.max_attempts, ) .await; } Err(_panic) => { tracing::error!( worker = self.id, lease_id = %lease.id, "worker: dispatcher panicked — ack failed" ); let _ = jobs::ack_failed( &self.pool, lease.id, "worker panicked", lease.attempts, lease.max_attempts, ) .await; } } } } // --------------------------------------------------------------------------- // Cron timing primitives // --------------------------------------------------------------------------- /// Compute the next UTC instant when `daily_at` (interpreted in `tz`) will /// fire, strictly after `now`. Handles DST gaps (spring-forward) by /// advancing past the gap; on DST overlap (fall-back) picks the later /// instant so the job runs once, not twice. pub fn next_fire(now: DateTime, daily_at: NaiveTime, tz: Tz) -> DateTime { let now_local = now.with_timezone(&tz); // Start with today's slot in the local TZ. let mut candidate = local_at(now_local.date_naive(), daily_at, tz); // If today's slot is in the past (or now), roll forward day-by-day. while candidate <= now { let next_day = candidate .with_timezone(&tz) .date_naive() .succ_opt() .unwrap_or_else(|| { // Defensive: succ_opt only fails at chrono's max date. chrono::NaiveDate::from_ymd_opt( candidate.year(), candidate.month(), candidate.day(), ) .expect("valid date") }); candidate = local_at(next_day, daily_at, tz); } candidate } /// The most recent fire instant at or before `now`. 
Used to detect missed /// slots after a restart. pub fn previous_fire(now: DateTime, daily_at: NaiveTime, tz: Tz) -> DateTime { let now_local = now.with_timezone(&tz); let today = local_at(now_local.date_naive(), daily_at, tz); if today <= now { return today; } let yesterday = now_local .date_naive() .pred_opt() .expect("a day before now"); local_at(yesterday, daily_at, tz) } /// Resolve a local date+time to a UTC instant in `tz`, navigating DST /// edges deterministically: /// - `LocalResult::Single` → that instant. /// - `LocalResult::Ambiguous(_, latest)` → the later instant (fall-back /// hour). Picking latest means a daily job fires once across the /// repeated hour, not twice. /// - `LocalResult::None` → spring-forward gap. Advance the local time /// by 1 minute and try again, repeating up to 120 times (so the worst /// case is still well inside an hour-long gap). fn local_at(date: chrono::NaiveDate, time: NaiveTime, tz: Tz) -> DateTime { use chrono::LocalResult; for offset_minutes in 0..120 { let mut t = time; if offset_minutes > 0 { let added = chrono::NaiveTime::from_num_seconds_from_midnight_opt( ((time.num_seconds_from_midnight() as i64 + offset_minutes * 60) % 86_400) as u32, 0, ) .unwrap_or(time); t = added; } let naive = date.and_time(t); match tz.from_local_datetime(&naive) { LocalResult::Single(dt) => return dt.with_timezone(&Utc), LocalResult::Ambiguous(_, latest) => return latest.with_timezone(&Utc), LocalResult::None => continue, } } // Should be unreachable — DST gaps are always less than an hour. 
Utc.from_utc_datetime(&date.and_time(time)) } // --------------------------------------------------------------------------- // crawler_state I/O // --------------------------------------------------------------------------- async fn read_last_tick(pool: &PgPool) -> sqlx::Result>> { let row: Option = sqlx::query_scalar( "SELECT value FROM crawler_state WHERE key = $1", ) .bind(STATE_KEY_LAST_TICK) .fetch_optional(pool) .await?; Ok(row.and_then(|v| { v.get("at") .and_then(|s| s.as_str()) .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) .map(|dt| dt.with_timezone(&Utc)) })) } async fn write_last_tick(pool: &PgPool, at: DateTime) -> sqlx::Result<()> { sqlx::query( "INSERT INTO crawler_state (key, value, updated_at) \ VALUES ($1, $2, now()) \ ON CONFLICT (key) DO UPDATE \ SET value = EXCLUDED.value, updated_at = now()", ) .bind(STATE_KEY_LAST_TICK) .bind(json!({ "at": at.to_rfc3339() })) .execute(pool) .await?; Ok(()) } // --------------------------------------------------------------------------- // Test helpers (not gated on cfg(test) — integration tests in tests/ dir // need them too). // --------------------------------------------------------------------------- pub mod test_support { //! Lightweight stubs the daemon tests use. Public because integration //! tests live outside this module. 
use super::*; use std::sync::atomic::AtomicUsize; pub struct CountingMetadataPass { pub count: AtomicUsize, } impl Default for CountingMetadataPass { fn default() -> Self { Self { count: AtomicUsize::new(0), } } } #[async_trait] impl MetadataPass for CountingMetadataPass { async fn run(&self) -> anyhow::Result { self.count.fetch_add(1, Ordering::AcqRel); Ok(pipeline::MetadataStats::default()) } } pub type DispatchFn = Arc< dyn Fn(JobPayload) -> futures_util::future::BoxFuture<'static, anyhow::Result> + Send + Sync, >; pub struct StubDispatcher { pub handler: DispatchFn, } #[async_trait] impl ChapterDispatcher for StubDispatcher { async fn dispatch(&self, payload: JobPayload) -> anyhow::Result { (self.handler)(payload).await } } pub fn always_done() -> Arc { Arc::new(StubDispatcher { handler: Arc::new(|_| Box::pin(async { Ok(SyncOutcome::Fetched { pages: 1 }) })), }) } pub fn panicking_dispatcher() -> Arc { Arc::new(StubDispatcher { handler: Arc::new(|_| Box::pin(async { panic!("intentional dispatcher panic") })), }) } } #[cfg(test)] mod tests { use super::*; use chrono::Duration as ChronoDuration; fn dt_utc(y: i32, mo: u32, d: u32, h: u32, mi: u32) -> DateTime { Utc.with_ymd_and_hms(y, mo, d, h, mi, 0).unwrap() } #[test] fn next_fire_in_utc_at_midnight_advances_one_day() { let now = dt_utc(2026, 5, 25, 12, 0); // noon UTC let at = NaiveTime::from_hms_opt(0, 0, 0).unwrap(); let next = next_fire(now, at, Tz::UTC); // Next midnight is May 26 00:00 UTC. assert_eq!(next, dt_utc(2026, 5, 26, 0, 0)); } #[test] fn next_fire_before_today_slot_returns_today() { let now = dt_utc(2026, 5, 25, 23, 0); // 23:00 UTC let at = NaiveTime::from_hms_opt(23, 30, 0).unwrap(); let next = next_fire(now, at, Tz::UTC); assert_eq!(next, dt_utc(2026, 5, 25, 23, 30)); } #[test] fn next_fire_skips_spring_forward_gap_in_europe_berlin() { // 2024-03-31: clocks jump 02:00 -> 03:00 in Berlin (CET -> CEST). 
// Asking for daily_at = 02:30 on the morning of the jump should // land on the *next valid* local instant past the gap. We test // by computing `next_fire` at 2024-03-31 00:30 UTC (= 01:30 CET, // i.e. just before the gap). The next 02:30 local does not exist, // so the helper advances past it. let now = dt_utc(2024, 3, 31, 0, 30); // 01:30 local Berlin (CET = UTC+1) let at = NaiveTime::from_hms_opt(2, 30, 0).unwrap(); let next = next_fire(now, at, Tz::Europe__Berlin); // Local Berlin time skips from 02:00 -> 03:00. After the +1 minute // search, the first valid slot is 03:00 local on 2024-03-31, which // is 01:00 UTC (CEST = UTC+2). // We assert the result is strictly between (now) and 1h later // and is in UTC — the exact minute depends on how many +1m steps // were required. assert!(next > now); assert!(next < now + ChronoDuration::hours(2)); } #[test] fn next_fire_on_fall_back_picks_later_instant() { // 2024-10-27: clocks jump 03:00 -> 02:00 (CEST -> CET) in Berlin. // 02:30 happens twice on that day. We pick the later one. let now = dt_utc(2024, 10, 26, 12, 0); // day before, noon UTC let at = NaiveTime::from_hms_opt(2, 30, 0).unwrap(); let next = next_fire(now, at, Tz::Europe__Berlin); // First 02:30 local is 00:30 UTC (CEST = UTC+2). // Second 02:30 local is 01:30 UTC (CET = UTC+1). // We expect the later instant: 01:30 UTC on 2024-10-27. assert_eq!(next, dt_utc(2024, 10, 27, 1, 30)); } #[test] fn previous_fire_returns_today_when_now_is_after_slot() { let now = dt_utc(2026, 5, 25, 12, 0); // noon UTC let at = NaiveTime::from_hms_opt(0, 0, 0).unwrap(); let prev = previous_fire(now, at, Tz::UTC); assert_eq!(prev, dt_utc(2026, 5, 25, 0, 0)); } #[test] fn previous_fire_returns_yesterday_when_now_is_before_today_slot() { let now = dt_utc(2026, 5, 25, 8, 0); // 08:00 UTC let at = NaiveTime::from_hms_opt(23, 30, 0).unwrap(); let prev = previous_fire(now, at, Tz::UTC); assert_eq!(prev, dt_utc(2026, 5, 24, 23, 30)); } }