Files
Mangalord/backend/src/crawler/daemon.rs
MechaCat02 5c04b0532b fix(crawler): panic-isolate the cron tick body (0.36.5)
Worker dispatch was already wrapped in AssertUnwindSafe(...)
.catch_unwind() — a panicking handler ack's the job failed and the
worker keeps going. The cron tick had no such guard: a panic in
metadata.run, enqueue_bookmarked_pending, reap_done, or
write_last_tick would kill the cron task. The JoinSet would drop it,
workers would keep running, and no future metadata pass would ever
fire until daemon restart.

Wrap the tick body (between advisory-lock acquire and unlock) in the
same AssertUnwindSafe(...).catch_unwind() pattern. The unlock and
connection drop run unconditionally so a panicked tick doesn't leave
the lock held for another replica.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-30 20:08:11 +02:00

659 lines
24 KiB
Rust

//! In-process crawler daemon.
//!
//! Owns a cron task that fires a daily metadata pass and N worker tasks
//! that drain `SyncChapterContent` jobs from `crawler_jobs`. The dispatch
//! seams ([`MetadataPass`], [`ChapterDispatcher`]) are traits so tests can
//! inject stubs without standing up a real Chromium / `Source` impl.
//!
//! ## Cron
//!
//! Each tick:
//! 1. Acquire a Postgres advisory lock on a dedicated pool connection
//! (multi-replica safety). Skip the tick on contention.
//! 2. Call [`MetadataPass::run`] (typically `pipeline::run_metadata_pass`).
//! 3. Enqueue `SyncChapterContent` jobs for any bookmarked manga whose
//! chapters still have `page_count = 0`.
//! 4. Reap `done` jobs older than `retention_days`.
//! 5. Persist `last_metadata_tick_at` and release the lock.
//!
//! If the last persisted tick is older than the most recent scheduled slot
//! (e.g. backend was down at midnight), the daemon fires immediately on
//! startup before resuming the regular schedule.
//!
//! ## Workers
//!
//! Each worker leases one chapter-content job at a time, dispatches via the
//! [`ChapterDispatcher`], and acks `done` / `failed` / re-`pending` based on
//! the outcome. A `SessionExpired` outcome flips the sticky
//! `session_expired` flag — all workers idle while it's set (until operator
//! restart with a refreshed PHPSESSID).
//!
//! Worker dispatch is wrapped in `catch_unwind` so a panicking handler
//! marks the job failed instead of taking down the worker task.
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use chrono::{DateTime, Datelike, NaiveTime, TimeZone, Timelike, Utc};
use chrono_tz::Tz;
use futures_util::FutureExt;
use serde_json::json;
use sqlx::PgPool;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use crate::crawler::content::SyncOutcome;
use crate::crawler::jobs::{self, JobPayload, Lease, KIND_SYNC_CHAPTER_CONTENT};
use crate::crawler::pipeline;
/// Fixed `pg_try_advisory_lock` key. ASCII "MANGALRD" interpreted as a
/// big-endian i64. Hardcoded so every replica agrees on the lock identity
/// without consulting config.
pub const CRON_LOCK_KEY: i64 = 0x4D414E47414C5244;
const STATE_KEY_LAST_TICK: &str = "last_metadata_tick_at";
#[async_trait]
pub trait MetadataPass: Send + Sync {
async fn run(&self) -> anyhow::Result<pipeline::MetadataStats>;
}
#[async_trait]
pub trait ChapterDispatcher: Send + Sync {
async fn dispatch(&self, payload: JobPayload) -> anyhow::Result<SyncOutcome>;
}
/// Configuration for [`spawn`]. Use `None` for `metadata_pass` to disable
/// the cron entirely (worker-pool-only mode — useful when only the
/// bookmark-triggered enqueue path is wanted).
pub struct DaemonConfig {
pub metadata_pass: Option<Arc<dyn MetadataPass>>,
pub dispatcher: Arc<dyn ChapterDispatcher>,
pub chapter_workers: usize,
pub daily_at: NaiveTime,
pub tz: Tz,
pub retention_days: u32,
pub session_expired: Arc<AtomicBool>,
/// Tasks that should run alongside the cron + workers and be cancelled
/// on shutdown. Used to hand the daemon ownership of the browser
/// manager's idle reaper.
pub extra_tasks: Vec<tokio::task::JoinHandle<()>>,
}
pub struct DaemonHandle {
cancel: CancellationToken,
join: JoinSet<()>,
extra: Vec<tokio::task::JoinHandle<()>>,
}
impl DaemonHandle {
/// Trigger shutdown and await all worker / cron / extra tasks.
pub async fn shutdown(mut self) {
self.cancel.cancel();
while self.join.join_next().await.is_some() {}
for task in self.extra.drain(..) {
let _ = task.await;
}
}
/// Cancellation token that drives shutdown — exposed so callers
/// (`app::spawn_crawler_daemon`) can hand the same token to auxiliary
/// tasks (e.g. the BrowserManager idle reaper) and have them stop on
/// the daemon's signal.
pub fn cancel_token(&self) -> CancellationToken {
self.cancel.clone()
}
}
/// Spawn the daemon. Returns immediately; tasks run in the background.
/// Pass an external [`CancellationToken`] so auxiliary tasks (e.g. a
/// BrowserManager idle reaper) can share the same shutdown signal —
/// typically created in the caller, cloned into both spawns.
pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> DaemonHandle {
let mut join = JoinSet::new();
let DaemonConfig {
metadata_pass,
dispatcher,
chapter_workers,
daily_at,
tz,
retention_days,
session_expired,
extra_tasks,
} = cfg;
if let Some(metadata) = metadata_pass {
let ctx = CronContext {
pool: pool.clone(),
cancel: cancel.clone(),
daily_at,
tz,
retention_days,
metadata,
};
join.spawn(async move { ctx.run().await });
} else {
tracing::info!("crawler daemon: no metadata_pass — cron disabled");
}
for worker_id in 0..chapter_workers.max(1) {
let ctx = WorkerContext {
pool: pool.clone(),
cancel: cancel.clone(),
dispatcher: Arc::clone(&dispatcher),
session_expired: Arc::clone(&session_expired),
id: worker_id,
};
join.spawn(async move { ctx.run().await });
}
DaemonHandle {
cancel,
join,
extra: extra_tasks,
}
}
// ---------------------------------------------------------------------------
// Cron
// ---------------------------------------------------------------------------
struct CronContext {
pool: PgPool,
cancel: CancellationToken,
daily_at: NaiveTime,
tz: Tz,
retention_days: u32,
metadata: Arc<dyn MetadataPass>,
}
impl CronContext {
async fn run(self) {
// On startup, fire immediately if the most recent slot has already
// passed and we never recorded a tick for it.
let now = Utc::now();
let mut catchup = match read_last_tick(&self.pool).await {
Ok(Some(last)) => previous_fire(now, self.daily_at, self.tz) > last,
Ok(None) => true,
Err(e) => {
tracing::warn!(?e, "cron: read_last_tick failed; assuming no catch-up");
false
}
};
loop {
if catchup {
tracing::info!("cron: catch-up tick (missed scheduled slot)");
self.run_tick().await;
catchup = false;
continue;
}
// Recompute next-fire from now() each iteration so clock jumps
// (NTP step, suspend/resume) don't strand us on a stale instant.
let next = next_fire(Utc::now(), self.daily_at, self.tz);
let wait = (next - Utc::now()).to_std().unwrap_or(Duration::ZERO);
tracing::info!(
next_fire_utc = %next.to_rfc3339(),
wait_seconds = wait.as_secs(),
"cron: sleeping until next slot"
);
tokio::select! {
_ = tokio::time::sleep(wait) => {}
_ = self.cancel.cancelled() => {
tracing::info!("cron: shutdown");
return;
}
}
self.run_tick().await;
}
}
async fn run_tick(&self) {
let mut conn = match self.pool.acquire().await {
Ok(c) => c,
Err(e) => {
tracing::error!(?e, "cron: acquire conn failed; skipping tick");
return;
}
};
// pg_try_advisory_lock is session-scoped — we must hold the same
// connection for the unlock or the call silently no-ops on a
// different connection from the pool.
let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_lock($1)")
.bind(CRON_LOCK_KEY)
.fetch_one(&mut *conn)
.await
.unwrap_or(false);
if !acquired {
tracing::info!("cron: tick skipped — another replica holds the lock");
return;
}
// Panic-isolate the tick body the same way `process_lease` does
// for worker dispatch. Without this, a panic in metadata.run
// (or any of the follow-on steps) would kill the cron task and
// no future tick would ever run — workers would keep going but
// no new metadata work would be scheduled until daemon restart.
// The advisory unlock below runs unconditionally so a panicked
// tick doesn't leave the lock held for another replica.
let metadata = &self.metadata;
let pool = &self.pool;
let retention_days = self.retention_days;
let body = async move {
match metadata.run().await {
Ok(stats) => tracing::info!(?stats, "cron: metadata pass done"),
Err(e) => tracing::error!(?e, "cron: metadata pass failed"),
}
match pipeline::enqueue_bookmarked_pending(pool).await {
Ok(summary) => {
tracing::info!(?summary, "cron: enqueued bookmarked-pending");
}
Err(e) => tracing::error!(?e, "cron: enqueue_bookmarked_pending failed"),
}
match jobs::reap_done(pool, retention_days).await {
Ok(n) => tracing::info!(reaped = n, "cron: done-job reaper finished"),
Err(e) => tracing::error!(?e, "cron: done-job reaper failed"),
}
if let Err(e) = write_last_tick(pool, Utc::now()).await {
tracing::warn!(?e, "cron: persist last_metadata_tick_at failed");
}
};
if let Err(_panic) = AssertUnwindSafe(body).catch_unwind().await {
tracing::error!("cron: tick body panicked — continuing");
}
let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
.bind(CRON_LOCK_KEY)
.execute(&mut *conn)
.await;
drop(conn);
}
}
// ---------------------------------------------------------------------------
// Workers
// ---------------------------------------------------------------------------
struct WorkerContext {
pool: PgPool,
cancel: CancellationToken,
dispatcher: Arc<dyn ChapterDispatcher>,
session_expired: Arc<AtomicBool>,
id: usize,
}
impl WorkerContext {
async fn run(self) {
loop {
if self.cancel.is_cancelled() {
tracing::info!(worker = self.id, "worker: shutdown");
return;
}
if self.session_expired.load(Ordering::Acquire) {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(30)) => continue,
_ = self.cancel.cancelled() => return,
}
}
let leases = match jobs::lease(
&self.pool,
Some(KIND_SYNC_CHAPTER_CONTENT),
1,
Duration::from_secs(60),
)
.await
{
Ok(v) => v,
Err(e) => {
tracing::warn!(worker = self.id, ?e, "worker: lease failed");
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(5)) => continue,
_ = self.cancel.cancelled() => return,
}
}
};
let Some(lease) = leases.into_iter().next() else {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => continue,
_ = self.cancel.cancelled() => return,
}
};
self.process_lease(lease).await;
}
}
async fn process_lease(&self, lease: Lease) {
// Consumer-side dedup safety net: if the chapter already has pages
// (because a force-refetch race or a job that was re-enqueued
// after a previous one finished), ack done without re-fetching.
if let JobPayload::SyncChapterContent { chapter_id, .. } = &lease.payload {
let page_count = crate::repo::chapter::page_count(&self.pool, *chapter_id)
.await
.ok()
.flatten();
if matches!(page_count, Some(n) if n > 0) {
let _ = jobs::ack_done(&self.pool, lease.id).await;
return;
}
}
let outcome = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone()))
.catch_unwind()
.await;
match outcome {
Ok(Ok(SyncOutcome::Fetched { .. } | SyncOutcome::Skipped)) => {
let _ = jobs::ack_done(&self.pool, lease.id).await;
}
Ok(Ok(SyncOutcome::SessionExpired)) => {
tracing::error!(
worker = self.id,
lease_id = %lease.id,
"session expired — workers will idle until restart"
);
self.session_expired.store(true, Ordering::Release);
let _ = jobs::release(&self.pool, lease.id).await;
}
Ok(Err(e)) => {
tracing::warn!(
worker = self.id,
lease_id = %lease.id,
error = ?e,
"worker: dispatch error — ack failed"
);
let _ = jobs::ack_failed(
&self.pool,
lease.id,
&format!("{e:#}"),
lease.attempts,
lease.max_attempts,
)
.await;
}
Err(_panic) => {
tracing::error!(
worker = self.id,
lease_id = %lease.id,
"worker: dispatcher panicked — ack failed"
);
let _ = jobs::ack_failed(
&self.pool,
lease.id,
"worker panicked",
lease.attempts,
lease.max_attempts,
)
.await;
}
}
}
}
// ---------------------------------------------------------------------------
// Cron timing primitives
// ---------------------------------------------------------------------------
/// Compute the next UTC instant when `daily_at` (interpreted in `tz`) will
/// fire, strictly after `now`. Handles DST gaps (spring-forward) by
/// advancing past the gap; on DST overlap (fall-back) picks the later
/// instant so the job runs once, not twice.
pub fn next_fire(now: DateTime<Utc>, daily_at: NaiveTime, tz: Tz) -> DateTime<Utc> {
let now_local = now.with_timezone(&tz);
// Start with today's slot in the local TZ.
let mut candidate = local_at(now_local.date_naive(), daily_at, tz);
// If today's slot is in the past (or now), roll forward day-by-day.
while candidate <= now {
let next_day = candidate
.with_timezone(&tz)
.date_naive()
.succ_opt()
.unwrap_or_else(|| {
// Defensive: succ_opt only fails at chrono's max date.
chrono::NaiveDate::from_ymd_opt(
candidate.year(),
candidate.month(),
candidate.day(),
)
.expect("valid date")
});
candidate = local_at(next_day, daily_at, tz);
}
candidate
}
/// The most recent fire instant at or before `now`. Used to detect missed
/// slots after a restart.
pub fn previous_fire(now: DateTime<Utc>, daily_at: NaiveTime, tz: Tz) -> DateTime<Utc> {
let now_local = now.with_timezone(&tz);
let today = local_at(now_local.date_naive(), daily_at, tz);
if today <= now {
return today;
}
let yesterday = now_local
.date_naive()
.pred_opt()
.expect("a day before now");
local_at(yesterday, daily_at, tz)
}
/// Resolve a local date+time to a UTC instant in `tz`, navigating DST
/// edges deterministically:
/// - `LocalResult::Single` → that instant.
/// - `LocalResult::Ambiguous(_, latest)` → the later instant (fall-back
/// hour). Picking latest means a daily job fires once across the
/// repeated hour, not twice.
/// - `LocalResult::None` → spring-forward gap. Advance the local time
/// by 1 minute and try again, repeating up to 120 times (so the worst
/// case is still well inside an hour-long gap).
fn local_at(date: chrono::NaiveDate, time: NaiveTime, tz: Tz) -> DateTime<Utc> {
use chrono::LocalResult;
for offset_minutes in 0..120 {
let mut t = time;
if offset_minutes > 0 {
let added = chrono::NaiveTime::from_num_seconds_from_midnight_opt(
((time.num_seconds_from_midnight() as i64 + offset_minutes * 60) % 86_400) as u32,
0,
)
.unwrap_or(time);
t = added;
}
let naive = date.and_time(t);
match tz.from_local_datetime(&naive) {
LocalResult::Single(dt) => return dt.with_timezone(&Utc),
LocalResult::Ambiguous(_, latest) => return latest.with_timezone(&Utc),
LocalResult::None => continue,
}
}
// Should be unreachable — DST gaps are always less than an hour.
Utc.from_utc_datetime(&date.and_time(time))
}
// ---------------------------------------------------------------------------
// crawler_state I/O
// ---------------------------------------------------------------------------
async fn read_last_tick(pool: &PgPool) -> sqlx::Result<Option<DateTime<Utc>>> {
let row: Option<serde_json::Value> = sqlx::query_scalar(
"SELECT value FROM crawler_state WHERE key = $1",
)
.bind(STATE_KEY_LAST_TICK)
.fetch_optional(pool)
.await?;
Ok(row.and_then(|v| {
v.get("at")
.and_then(|s| s.as_str())
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
}))
}
async fn write_last_tick(pool: &PgPool, at: DateTime<Utc>) -> sqlx::Result<()> {
sqlx::query(
"INSERT INTO crawler_state (key, value, updated_at) \
VALUES ($1, $2, now()) \
ON CONFLICT (key) DO UPDATE \
SET value = EXCLUDED.value, updated_at = now()",
)
.bind(STATE_KEY_LAST_TICK)
.bind(json!({ "at": at.to_rfc3339() }))
.execute(pool)
.await?;
Ok(())
}
// ---------------------------------------------------------------------------
// Test helpers (not gated on cfg(test) — integration tests in tests/ dir
// need them too).
// ---------------------------------------------------------------------------
pub mod test_support {
//! Lightweight stubs the daemon tests use. Public because integration
//! tests live outside this module.
use super::*;
use std::sync::atomic::AtomicUsize;
pub struct CountingMetadataPass {
pub count: AtomicUsize,
}
impl Default for CountingMetadataPass {
fn default() -> Self {
Self {
count: AtomicUsize::new(0),
}
}
}
#[async_trait]
impl MetadataPass for CountingMetadataPass {
async fn run(&self) -> anyhow::Result<pipeline::MetadataStats> {
self.count.fetch_add(1, Ordering::AcqRel);
Ok(pipeline::MetadataStats::default())
}
}
pub type DispatchFn = Arc<
dyn Fn(JobPayload) -> futures_util::future::BoxFuture<'static, anyhow::Result<SyncOutcome>>
+ Send
+ Sync,
>;
pub struct StubDispatcher {
pub handler: DispatchFn,
}
#[async_trait]
impl ChapterDispatcher for StubDispatcher {
async fn dispatch(&self, payload: JobPayload) -> anyhow::Result<SyncOutcome> {
(self.handler)(payload).await
}
}
pub fn always_done() -> Arc<StubDispatcher> {
Arc::new(StubDispatcher {
handler: Arc::new(|_| Box::pin(async { Ok(SyncOutcome::Fetched { pages: 1 }) })),
})
}
pub fn panicking_dispatcher() -> Arc<StubDispatcher> {
Arc::new(StubDispatcher {
handler: Arc::new(|_| Box::pin(async { panic!("intentional dispatcher panic") })),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Duration as ChronoDuration;
fn dt_utc(y: i32, mo: u32, d: u32, h: u32, mi: u32) -> DateTime<Utc> {
Utc.with_ymd_and_hms(y, mo, d, h, mi, 0).unwrap()
}
#[test]
fn next_fire_in_utc_at_midnight_advances_one_day() {
let now = dt_utc(2026, 5, 25, 12, 0); // noon UTC
let at = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
let next = next_fire(now, at, Tz::UTC);
// Next midnight is May 26 00:00 UTC.
assert_eq!(next, dt_utc(2026, 5, 26, 0, 0));
}
#[test]
fn next_fire_before_today_slot_returns_today() {
let now = dt_utc(2026, 5, 25, 23, 0); // 23:00 UTC
let at = NaiveTime::from_hms_opt(23, 30, 0).unwrap();
let next = next_fire(now, at, Tz::UTC);
assert_eq!(next, dt_utc(2026, 5, 25, 23, 30));
}
#[test]
fn next_fire_skips_spring_forward_gap_in_europe_berlin() {
// 2024-03-31: clocks jump 02:00 -> 03:00 in Berlin (CET -> CEST).
// Asking for daily_at = 02:30 on the morning of the jump should
// land on the *next valid* local instant past the gap. We test
// by computing `next_fire` at 2024-03-31 00:30 UTC (= 01:30 CET,
// i.e. just before the gap). The next 02:30 local does not exist,
// so the helper advances past it.
let now = dt_utc(2024, 3, 31, 0, 30); // 01:30 local Berlin (CET = UTC+1)
let at = NaiveTime::from_hms_opt(2, 30, 0).unwrap();
let next = next_fire(now, at, Tz::Europe__Berlin);
// Local Berlin time skips from 02:00 -> 03:00. After the +1 minute
// search, the first valid slot is 03:00 local on 2024-03-31, which
// is 01:00 UTC (CEST = UTC+2).
// We assert the result is strictly between (now) and 1h later
// and is in UTC — the exact minute depends on how many +1m steps
// were required.
assert!(next > now);
assert!(next < now + ChronoDuration::hours(2));
}
#[test]
fn next_fire_on_fall_back_picks_later_instant() {
// 2024-10-27: clocks jump 03:00 -> 02:00 (CEST -> CET) in Berlin.
// 02:30 happens twice on that day. We pick the later one.
let now = dt_utc(2024, 10, 26, 12, 0); // day before, noon UTC
let at = NaiveTime::from_hms_opt(2, 30, 0).unwrap();
let next = next_fire(now, at, Tz::Europe__Berlin);
// First 02:30 local is 00:30 UTC (CEST = UTC+2).
// Second 02:30 local is 01:30 UTC (CET = UTC+1).
// We expect the later instant: 01:30 UTC on 2024-10-27.
assert_eq!(next, dt_utc(2024, 10, 27, 1, 30));
}
#[test]
fn previous_fire_returns_today_when_now_is_after_slot() {
let now = dt_utc(2026, 5, 25, 12, 0); // noon UTC
let at = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
let prev = previous_fire(now, at, Tz::UTC);
assert_eq!(prev, dt_utc(2026, 5, 25, 0, 0));
}
#[test]
fn previous_fire_returns_yesterday_when_now_is_before_today_slot() {
let now = dt_utc(2026, 5, 25, 8, 0); // 08:00 UTC
let at = NaiveTime::from_hms_opt(23, 30, 0).unwrap();
let prev = previous_fire(now, at, Tz::UTC);
assert_eq!(prev, dt_utc(2026, 5, 24, 23, 30));
}
/// Documents the panic-isolation pattern `run_tick` now relies on:
/// `AssertUnwindSafe(...).catch_unwind().await` must yield `Err(_)`
/// when the wrapped future panics, so the surrounding loop (or in
/// our case, the unconditional advisory-unlock that follows) keeps
/// running. The shape of this test mirrors the production callsite.
#[tokio::test]
async fn assert_unwind_safe_catches_a_panicking_future() {
let result = AssertUnwindSafe(async {
panic!("boom");
})
.catch_unwind()
.await;
assert!(result.is_err(), "panicking future must yield Err");
}
}