//! Integration tests for the crawler daemon's cron + worker pool. The //! daemon's full real path requires Chromium and a live source; here we //! test the seam (MetadataPass / ChapterDispatcher traits) and the //! cron/worker control-flow. use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use chrono::NaiveTime; use chrono_tz::Tz; use mangalord::crawler::content::SyncOutcome; use mangalord::crawler::daemon::{ self, test_support::CountingMetadataPass, ChapterDispatcher, DaemonConfig, MetadataPass, CRON_LOCK_KEY, }; use mangalord::crawler::jobs::{self, JobPayload}; use mangalord::crawler::pipeline; use serde_json::json; use sqlx::PgPool; use tokio_util::sync::CancellationToken; use uuid::Uuid; fn far_future_daily_at() -> NaiveTime { // Some time hours from "now" so the scheduler sleeps for the whole test. NaiveTime::from_hms_opt(23, 59, 0).unwrap() } fn make_cfg( metadata_pass: Option>, dispatcher: Arc, session_expired: Arc, workers: usize, ) -> DaemonConfig { DaemonConfig { metadata_pass, dispatcher, chapter_workers: workers, daily_at: far_future_daily_at(), tz: Tz::UTC, retention_days: 7, session_expired, status: mangalord::crawler::status::StatusHandle::new(workers), job_timeout: Duration::from_secs(60), extra_tasks: Vec::new(), } } async fn enqueue_chapter_job(pool: &PgPool) -> Uuid { let chapter_id = Uuid::new_v4(); let payload = JobPayload::SyncChapterContent { source_id: "target".into(), chapter_id, source_chapter_key: format!("ch-{chapter_id}"), }; let res = jobs::enqueue(pool, &payload).await.unwrap(); match res { jobs::EnqueueResult::Inserted(_) => chapter_id, jobs::EnqueueResult::Skipped => unreachable!("fresh chapter_id"), } } async fn count_state(pool: &PgPool, state: &str) -> i64 { sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM crawler_jobs WHERE state = $1") .bind(state) .fetch_one(pool) .await .unwrap() } struct AlwaysDoneDispatcher { seen: AtomicUsize, } #[async_trait::async_trait] impl ChapterDispatcher for AlwaysDoneDispatcher { async fn dispatch(&self, _payload: JobPayload) -> anyhow::Result { self.seen.fetch_add(1, Ordering::AcqRel); Ok(SyncOutcome::Fetched { pages: 1 }) } } struct PanickingDispatcher { seen: AtomicUsize, } #[async_trait::async_trait] impl ChapterDispatcher for PanickingDispatcher { async fn dispatch(&self, _payload: JobPayload) -> anyhow::Result { self.seen.fetch_add(1, Ordering::AcqRel); panic!("intentional dispatcher panic"); } } /// Never completes — used to verify the worker's outer dispatch timeout. struct HangingDispatcher { seen: AtomicUsize, } #[async_trait::async_trait] impl ChapterDispatcher for HangingDispatcher { async fn dispatch(&self, _payload: JobPayload) -> anyhow::Result { self.seen.fetch_add(1, Ordering::AcqRel); std::future::pending::<()>().await; unreachable!("hanging dispatcher never resolves"); } } #[sqlx::test(migrations = "./migrations")] async fn worker_times_out_a_hung_dispatch_and_acks_failed(pool: PgPool) { enqueue_chapter_job(&pool).await; let dispatcher = Arc::new(HangingDispatcher { seen: AtomicUsize::new(0), }); let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false)); let cancel = CancellationToken::new(); let mut cfg = make_cfg(None, dispatcher.clone(), session_expired, 1); cfg.job_timeout = Duration::from_millis(300); let handle = daemon::spawn(pool.clone(), cancel.clone(), cfg); // The hung job should time out and return to pending with backoff // (attempts=1 < max=5). Poll for the recorded error. let mut timed_out = false; for _ in 0..40 { let n: i64 = sqlx::query_scalar( "SELECT COUNT(*) FROM crawler_jobs WHERE last_error = 'dispatch timed out'", ) .fetch_one(&pool) .await .unwrap(); if n == 1 { timed_out = true; break; } tokio::time::sleep(Duration::from_millis(50)).await; } handle.shutdown().await; assert!(timed_out, "hung dispatch must be acked failed with a timeout error"); assert!(dispatcher.seen.load(Ordering::Acquire) >= 1); } #[sqlx::test(migrations = "./migrations")] async fn workers_drain_jobs_through_dispatcher(pool: PgPool) { enqueue_chapter_job(&pool).await; enqueue_chapter_job(&pool).await; enqueue_chapter_job(&pool).await; let dispatcher = Arc::new(AlwaysDoneDispatcher { seen: AtomicUsize::new(0), }); let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false)); let cancel = CancellationToken::new(); let handle = daemon::spawn( pool.clone(), cancel.clone(), make_cfg(None, dispatcher.clone(), session_expired, 2), ); // Wait for the workers to drain all three jobs. let dispatcher_seen = || dispatcher.seen.load(Ordering::Acquire); for _ in 0..40 { if dispatcher_seen() >= 3 { break; } tokio::time::sleep(Duration::from_millis(50)).await; } assert!( dispatcher_seen() >= 3, "expected at least 3 dispatches, got {}", dispatcher_seen() ); handle.shutdown().await; assert_eq!(count_state(&pool, "done").await, 3); } #[sqlx::test(migrations = "./migrations")] async fn workers_idle_while_session_expired(pool: PgPool) { let id = enqueue_chapter_job(&pool).await; let dispatcher = Arc::new(AlwaysDoneDispatcher { seen: AtomicUsize::new(0), }); let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(true)); let cancel = CancellationToken::new(); let handle = daemon::spawn( pool.clone(), cancel.clone(), make_cfg(None, dispatcher.clone(), Arc::clone(&session_expired), 1), ); // Wait long enough that a non-idled worker would have leased and ack'd. tokio::time::sleep(Duration::from_millis(800)).await; assert_eq!( dispatcher.seen.load(Ordering::Acquire), 0, "dispatcher must not be invoked while session_expired flag is set" ); assert_eq!(count_state(&pool, "pending").await, 1); let _ = id; handle.shutdown().await; } #[sqlx::test(migrations = "./migrations")] async fn dispatcher_panic_is_contained_and_job_is_acked_failed(pool: PgPool) { enqueue_chapter_job(&pool).await; enqueue_chapter_job(&pool).await; let dispatcher = Arc::new(PanickingDispatcher { seen: AtomicUsize::new(0), }); let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false)); let cancel = CancellationToken::new(); let handle = daemon::spawn( pool.clone(), cancel.clone(), make_cfg(None, dispatcher.clone(), session_expired, 1), ); // Wait for the worker to handle both panicking jobs. for _ in 0..40 { if dispatcher.seen.load(Ordering::Acquire) >= 2 { break; } tokio::time::sleep(Duration::from_millis(50)).await; } assert!( dispatcher.seen.load(Ordering::Acquire) >= 2, "worker must keep going after a panic — handled at least 2 jobs" ); handle.shutdown().await; // attempts=1 below max=5, so the panicking jobs go back to pending with // backoff and `last_error = "worker panicked"`. let last_errors: Vec = sqlx::query_scalar( "SELECT last_error FROM crawler_jobs WHERE last_error IS NOT NULL", ) .fetch_all(&pool) .await .unwrap(); assert_eq!(last_errors.len(), 2); assert!(last_errors.iter().all(|e| e == "worker panicked")); } #[sqlx::test(migrations = "./migrations")] async fn cron_skips_tick_when_advisory_lock_held(pool: PgPool) { // With no last_metadata_tick_at row, the daemon does a catch-up tick // immediately on spawn. We hold the advisory lock on a separate // connection beforehand so the catch-up's pg_try_advisory_lock returns // false and the tick must skip without invoking the metadata pass. let mut lock_conn = pool.acquire().await.unwrap(); sqlx::query("SELECT pg_advisory_lock($1)") .bind(CRON_LOCK_KEY) .execute(&mut *lock_conn) .await .unwrap(); let counter = Arc::new(CountingMetadataPass::default()); let dispatcher = Arc::new(AlwaysDoneDispatcher { seen: AtomicUsize::new(0), }); let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false)); let cancel = CancellationToken::new(); // daily_at far in the future so after the (skipped) catch-up the // cron sleeps for the rest of the test rather than racing for the lock. let cfg = make_cfg( Some(counter.clone() as Arc), dispatcher, session_expired, 1, ); let handle = daemon::spawn(pool.clone(), cancel.clone(), cfg); tokio::time::sleep(Duration::from_millis(800)).await; assert_eq!( counter.count.load(Ordering::Acquire), 0, "cron must skip the catch-up tick while the advisory lock is held" ); sqlx::query("SELECT pg_advisory_unlock($1)") .bind(CRON_LOCK_KEY) .execute(&mut *lock_conn) .await .unwrap(); drop(lock_conn); handle.shutdown().await; } #[sqlx::test(migrations = "./migrations")] async fn cron_catches_up_when_last_tick_is_stale(pool: PgPool) { // Pre-seed last_metadata_tick_at well in the past so previous_fire(now) // > last_tick is trivially true and the daemon catches up immediately. sqlx::query( "INSERT INTO crawler_state (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value", ) .bind("last_metadata_tick_at") .bind(json!({"at": "2020-01-01T00:00:00Z"})) .execute(&pool) .await .unwrap(); let counter = Arc::new(CountingMetadataPass::default()); let dispatcher = Arc::new(AlwaysDoneDispatcher { seen: AtomicUsize::new(0), }); let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false)); let cancel = CancellationToken::new(); let handle = daemon::spawn( pool.clone(), cancel.clone(), make_cfg( Some(counter.clone() as Arc), dispatcher, session_expired, 1, ), ); for _ in 0..40 { if counter.count.load(Ordering::Acquire) >= 1 { break; } tokio::time::sleep(Duration::from_millis(50)).await; } assert!( counter.count.load(Ordering::Acquire) >= 1, "catch-up tick should have fired immediately" ); handle.shutdown().await; } #[sqlx::test(migrations = "./migrations")] async fn enqueue_bookmarked_pending_skips_dropped_sources(pool: PgPool) { // Setup: one manga with two chapters (page_count = 0). One has a // non-dropped source; the other's source is dropped. A user bookmarks // the manga. Expectation: only the non-dropped chapter is enqueued. let user_id: Uuid = sqlx::query_scalar( "INSERT INTO users (username, password_hash) VALUES ($1, $2) RETURNING id", ) .bind("alice") .bind("not-a-real-hash") .fetch_one(&pool) .await .unwrap(); let manga_id: Uuid = sqlx::query_scalar( "INSERT INTO mangas (title) VALUES ($1) RETURNING id", ) .bind("Berserk") .fetch_one(&pool) .await .unwrap(); sqlx::query("INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING") .bind("target") .bind("Target") .bind("https://example.com") .execute(&pool) .await .unwrap(); let c1: Uuid = sqlx::query_scalar( "INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, 1, 0) RETURNING id", ) .bind(manga_id) .fetch_one(&pool) .await .unwrap(); let c2: Uuid = sqlx::query_scalar( "INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, 2, 0) RETURNING id", ) .bind(manga_id) .fetch_one(&pool) .await .unwrap(); // c1: alive source. c2: dropped source. sqlx::query( "INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url) \ VALUES ($1, $2, $3, $4)", ) .bind("target") .bind("ch1") .bind(c1) .bind("https://example.com/ch1") .execute(&pool) .await .unwrap(); sqlx::query( "INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url, dropped_at) \ VALUES ($1, $2, $3, $4, now())", ) .bind("target") .bind("ch2") .bind(c2) .bind("https://example.com/ch2") .execute(&pool) .await .unwrap(); sqlx::query("INSERT INTO bookmarks (user_id, manga_id) VALUES ($1, $2)") .bind(user_id) .bind(manga_id) .execute(&pool) .await .unwrap(); let summary = pipeline::enqueue_bookmarked_pending(&pool).await.unwrap(); assert_eq!(summary.inserted, 1, "only the non-dropped chapter enqueued"); assert_eq!(summary.skipped, 0); let payloads: Vec = sqlx::query_scalar( "SELECT payload FROM crawler_jobs WHERE payload->>'kind' = 'sync_chapter_content'", ) .fetch_all(&pool) .await .unwrap(); assert_eq!(payloads.len(), 1); assert_eq!( payloads[0]["chapter_id"].as_str().unwrap(), c1.to_string() ); } #[sqlx::test(migrations = "./migrations")] async fn enqueue_bookmarked_pending_skips_recently_dead_chapters(pool: PgPool) { // Setup: a chapter whose last SyncChapterContent job died yesterday. // The cron tick must not re-enqueue — without the quarantine, the // chapter would spin: re-enqueue → max_attempts retries → dies again // → re-enqueue next tick → forever. let user_id: Uuid = sqlx::query_scalar( "INSERT INTO users (username, password_hash) VALUES ($1, $2) RETURNING id", ) .bind("alice") .bind("not-a-real-hash") .fetch_one(&pool) .await .unwrap(); let manga_id: Uuid = sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id") .bind("Test") .fetch_one(&pool) .await .unwrap(); sqlx::query( "INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", ) .bind("target") .bind("Target") .bind("https://example.com") .execute(&pool) .await .unwrap(); let chapter_id: Uuid = sqlx::query_scalar( "INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, 1, 0) RETURNING id", ) .bind(manga_id) .fetch_one(&pool) .await .unwrap(); sqlx::query( "INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url) \ VALUES ($1, $2, $3, $4)", ) .bind("target") .bind("ch1") .bind(chapter_id) .bind("https://example.com/ch1") .execute(&pool) .await .unwrap(); sqlx::query("INSERT INTO bookmarks (user_id, manga_id) VALUES ($1, $2)") .bind(user_id) .bind(manga_id) .execute(&pool) .await .unwrap(); // The dead job from the prior tick, updated 1 day ago (well inside the // 7-day quarantine window). sqlx::query( "INSERT INTO crawler_jobs (payload, state, updated_at) \ VALUES ($1::jsonb, 'dead', now() - interval '1 day')", ) .bind(serde_json::json!({ "kind": "sync_chapter_content", "source_id": "target", "chapter_id": chapter_id.to_string(), "source_chapter_key": "ch1", })) .execute(&pool) .await .unwrap(); let summary = pipeline::enqueue_bookmarked_pending(&pool).await.unwrap(); assert_eq!(summary.inserted, 0, "recently dead chapter is quarantined"); assert_eq!(summary.skipped, 0); } #[sqlx::test(migrations = "./migrations")] async fn enqueue_bookmarked_pending_resumes_after_quarantine_expires(pool: PgPool) { // Same setup as above but the dead job is 10 days old — past the // 7-day quarantine. The chapter should be re-enqueued so a once-failed // chapter eventually gets a second shot at success. let user_id: Uuid = sqlx::query_scalar( "INSERT INTO users (username, password_hash) VALUES ($1, $2) RETURNING id", ) .bind("alice") .bind("not-a-real-hash") .fetch_one(&pool) .await .unwrap(); let manga_id: Uuid = sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id") .bind("Test") .fetch_one(&pool) .await .unwrap(); sqlx::query( "INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", ) .bind("target") .bind("Target") .bind("https://example.com") .execute(&pool) .await .unwrap(); let chapter_id: Uuid = sqlx::query_scalar( "INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, 1, 0) RETURNING id", ) .bind(manga_id) .fetch_one(&pool) .await .unwrap(); sqlx::query( "INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url) \ VALUES ($1, $2, $3, $4)", ) .bind("target") .bind("ch1") .bind(chapter_id) .bind("https://example.com/ch1") .execute(&pool) .await .unwrap(); sqlx::query("INSERT INTO bookmarks (user_id, manga_id) VALUES ($1, $2)") .bind(user_id) .bind(manga_id) .execute(&pool) .await .unwrap(); sqlx::query( "INSERT INTO crawler_jobs (payload, state, updated_at) \ VALUES ($1::jsonb, 'dead', now() - interval '10 days')", ) .bind(serde_json::json!({ "kind": "sync_chapter_content", "source_id": "target", "chapter_id": chapter_id.to_string(), "source_chapter_key": "ch1", })) .execute(&pool) .await .unwrap(); let summary = pipeline::enqueue_bookmarked_pending(&pool).await.unwrap(); assert_eq!( summary.inserted, 1, "dead chapter is re-enqueued after quarantine expires" ); } /// Helper: insert a chapter with the given `number` and a non-dropped /// source row, returning the chapter id. Used by the ordering tests so /// the setup boilerplate doesn't drown the assertion. async fn insert_pending_chapter( pool: &PgPool, manga_id: Uuid, number: i32, source_chapter_key: &str, ) -> Uuid { let chapter_id: Uuid = sqlx::query_scalar( "INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, $2, 0) RETURNING id", ) .bind(manga_id) .bind(number) .fetch_one(pool) .await .unwrap(); sqlx::query( "INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url) \ VALUES ($1, $2, $3, $4)", ) .bind("target") .bind(source_chapter_key) .bind(chapter_id) .bind(format!("https://example.com/{source_chapter_key}")) .execute(pool) .await .unwrap(); chapter_id } #[sqlx::test(migrations = "./migrations")] async fn enqueue_bookmarked_pending_queues_chapters_in_ascending_number_order(pool: PgPool) { // Insert chapters with `number` values 3, 1, 2 in that insertion // order — so `created_at` order (the previous tiebreaker) does NOT // match number order. After enqueue + lease, the worker should see // chapters 1, 2, 3 in that sequence. let user_id: Uuid = sqlx::query_scalar( "INSERT INTO users (username, password_hash) VALUES ($1, $2) RETURNING id", ) .bind("alice") .bind("not-a-real-hash") .fetch_one(&pool) .await .unwrap(); let manga_id: Uuid = sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id") .bind("Test") .fetch_one(&pool) .await .unwrap(); sqlx::query( "INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", ) .bind("target") .bind("Target") .bind("https://example.com") .execute(&pool) .await .unwrap(); let c3 = insert_pending_chapter(&pool, manga_id, 3, "ch3").await; let c1 = insert_pending_chapter(&pool, manga_id, 1, "ch1").await; let c2 = insert_pending_chapter(&pool, manga_id, 2, "ch2").await; sqlx::query("INSERT INTO bookmarks (user_id, manga_id) VALUES ($1, $2)") .bind(user_id) .bind(manga_id) .execute(&pool) .await .unwrap(); let summary = pipeline::enqueue_bookmarked_pending(&pool).await.unwrap(); assert_eq!(summary.inserted, 3); let leases = jobs::lease(&pool, None, 10, std::time::Duration::from_secs(60)) .await .unwrap(); let leased_chapter_ids: Vec = leases .iter() .map(|l| match &l.payload { JobPayload::SyncChapterContent { chapter_id, .. } => *chapter_id, other => panic!("unexpected payload kind: {other:?}"), }) .collect(); assert_eq!( leased_chapter_ids, vec![c1, c2, c3], "chapters must be leased in ascending chapter-number order, not insertion order" ); } #[sqlx::test(migrations = "./migrations")] async fn enqueue_pending_for_manga_queues_chapters_in_ascending_number_order(pool: PgPool) { // Same scenario as above but exercising the bookmark-create hook path // (`enqueue_pending_for_manga`) which has its own ORDER BY. let manga_id: Uuid = sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id") .bind("Test") .fetch_one(&pool) .await .unwrap(); sqlx::query( "INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", ) .bind("target") .bind("Target") .bind("https://example.com") .execute(&pool) .await .unwrap(); let c3 = insert_pending_chapter(&pool, manga_id, 3, "ch3").await; let c1 = insert_pending_chapter(&pool, manga_id, 1, "ch1").await; let c2 = insert_pending_chapter(&pool, manga_id, 2, "ch2").await; let summary = pipeline::enqueue_pending_for_manga(&pool, manga_id) .await .unwrap(); assert_eq!(summary.inserted, 3); let leases = jobs::lease(&pool, None, 10, std::time::Duration::from_secs(60)) .await .unwrap(); let leased_chapter_ids: Vec = leases .iter() .map(|l| match &l.payload { JobPayload::SyncChapterContent { chapter_id, .. } => *chapter_id, other => panic!("unexpected payload kind: {other:?}"), }) .collect(); assert_eq!(leased_chapter_ids, vec![c1, c2, c3]); }