//! Integration tests for `crawler::jobs` queue operations. //! //! Uses `#[sqlx::test(migrations = "./migrations")]` which provisions a fresh //! migrated DB per test. No browser, no axum router — these exercise the SQL //! shape and dedup-index semantics directly against Postgres. use std::time::Duration; use mangalord::crawler::jobs::{ self, EnqueueResult, JobPayload, KIND_SYNC_CHAPTER_CONTENT, }; use mangalord::crawler::source::DiscoverMode; use sqlx::PgPool; use uuid::Uuid; fn chapter_content_payload(chapter_id: Uuid) -> JobPayload { JobPayload::SyncChapterContent { source_id: "target".into(), chapter_id, source_chapter_key: format!("ch-{chapter_id}"), } } fn discover_payload() -> JobPayload { JobPayload::Discover { source_id: "target".into(), mode: DiscoverMode::Backfill, } } async fn job_state(pool: &PgPool, id: Uuid) -> String { sqlx::query_scalar::<_, String>("SELECT state FROM crawler_jobs WHERE id = $1") .bind(id) .fetch_one(pool) .await .unwrap() } async fn job_attempts(pool: &PgPool, id: Uuid) -> i32 { sqlx::query_scalar::<_, i32>("SELECT attempts FROM crawler_jobs WHERE id = $1") .bind(id) .fetch_one(pool) .await .unwrap() } async fn job_count(pool: &PgPool) -> i64 { sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM crawler_jobs") .fetch_one(pool) .await .unwrap() } #[sqlx::test(migrations = "./migrations")] async fn enqueue_inserts_pending_row_with_round_trip_payload(pool: PgPool) { let chapter_id = Uuid::new_v4(); let payload = chapter_content_payload(chapter_id); let result = jobs::enqueue(&pool, &payload).await.unwrap(); let id = match result { EnqueueResult::Inserted(id) => id, EnqueueResult::Skipped => panic!("expected Inserted on first enqueue"), }; assert_eq!(job_state(&pool, id).await, "pending"); assert_eq!(job_attempts(&pool, id).await, 0); let raw_payload: serde_json::Value = sqlx::query_scalar("SELECT payload FROM crawler_jobs WHERE id = $1") .bind(id) .fetch_one(&pool) .await .unwrap(); let decoded: JobPayload = serde_json::from_value(raw_payload).unwrap(); match decoded { JobPayload::SyncChapterContent { source_id, chapter_id: c, source_chapter_key, } => { assert_eq!(source_id, "target"); assert_eq!(c, chapter_id); assert_eq!(source_chapter_key, format!("ch-{chapter_id}")); } _ => panic!("payload variant mismatch"), } } #[sqlx::test(migrations = "./migrations")] async fn duplicate_chapter_content_while_pending_is_skipped(pool: PgPool) { let chapter_id = Uuid::new_v4(); let p = chapter_content_payload(chapter_id); let first = jobs::enqueue(&pool, &p).await.unwrap(); assert!(matches!(first, EnqueueResult::Inserted(_))); let second = jobs::enqueue(&pool, &p).await.unwrap(); assert!(matches!(second, EnqueueResult::Skipped)); assert_eq!(job_count(&pool).await, 1); } #[sqlx::test(migrations = "./migrations")] async fn duplicate_after_done_releases_dedup_slot(pool: PgPool) { let chapter_id = Uuid::new_v4(); let p = chapter_content_payload(chapter_id); let first_id = match jobs::enqueue(&pool, &p).await.unwrap() { EnqueueResult::Inserted(id) => id, EnqueueResult::Skipped => panic!("first enqueue should insert"), }; // Move the first job out of (pending|running) so the partial index drops it. sqlx::query("UPDATE crawler_jobs SET state = 'done' WHERE id = $1") .bind(first_id) .execute(&pool) .await .unwrap(); let second = jobs::enqueue(&pool, &p).await.unwrap(); assert!( matches!(second, EnqueueResult::Inserted(_)), "after done the chapter_id slot is free again" ); assert_eq!(job_count(&pool).await, 2); } #[sqlx::test(migrations = "./migrations")] async fn different_chapter_ids_can_coexist(pool: PgPool) { let p1 = chapter_content_payload(Uuid::new_v4()); let p2 = chapter_content_payload(Uuid::new_v4()); assert!(matches!( jobs::enqueue(&pool, &p1).await.unwrap(), EnqueueResult::Inserted(_) )); assert!(matches!( jobs::enqueue(&pool, &p2).await.unwrap(), EnqueueResult::Inserted(_) )); assert_eq!(job_count(&pool).await, 2); } #[sqlx::test(migrations = "./migrations")] async fn non_chapter_content_payloads_are_never_deduped(pool: PgPool) { let p = discover_payload(); assert!(matches!( jobs::enqueue(&pool, &p).await.unwrap(), EnqueueResult::Inserted(_) )); assert!(matches!( jobs::enqueue(&pool, &p).await.unwrap(), EnqueueResult::Inserted(_) )); assert_eq!(job_count(&pool).await, 2); } #[sqlx::test(migrations = "./migrations")] async fn lease_marks_running_and_bumps_attempts_and_sets_leased_until(pool: PgPool) { let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) .await .unwrap() { EnqueueResult::Inserted(id) => id, EnqueueResult::Skipped => unreachable!(), }; let leases = jobs::lease(&pool, None, 10, Duration::from_secs(60)) .await .unwrap(); assert_eq!(leases.len(), 1); let lease = &leases[0]; assert_eq!(lease.id, id); assert_eq!(lease.attempts, 1); assert_eq!(job_state(&pool, id).await, "running"); let leased_until: Option> = sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1") .bind(id) .fetch_one(&pool) .await .unwrap(); let leased_until = leased_until.expect("leased_until set"); assert!(leased_until > chrono::Utc::now()); } #[sqlx::test(migrations = "./migrations")] async fn lease_with_kind_filter_only_matches_that_kind(pool: PgPool) { let discover_id = match jobs::enqueue(&pool, &discover_payload()).await.unwrap() { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; let chapter_id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) .await .unwrap() { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; let leases = jobs::lease( &pool, Some(KIND_SYNC_CHAPTER_CONTENT), 10, Duration::from_secs(60), ) .await .unwrap(); assert_eq!(leases.len(), 1, "only chapter content payload leases"); assert_eq!(leases[0].id, chapter_id); // discover is still pending assert_eq!(job_state(&pool, discover_id).await, "pending"); } #[sqlx::test(migrations = "./migrations")] async fn concurrent_leases_under_skip_locked_return_disjoint_ids(pool: PgPool) { // 4 pending jobs, two concurrent calls each asking for up to 2. let mut ids = Vec::new(); for _ in 0..4 { let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) .await .unwrap() { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; ids.push(id); } let (a, b) = tokio::join!( jobs::lease(&pool, None, 2, Duration::from_secs(60)), jobs::lease(&pool, None, 2, Duration::from_secs(60)), ); let a = a.unwrap(); let b = b.unwrap(); let mut seen: Vec = a.iter().chain(b.iter()).map(|l| l.id).collect(); seen.sort(); seen.dedup(); let count = a.len() + b.len(); assert_eq!( seen.len(), count, "no id appears in both lease results (SKIP LOCKED)" ); assert!(count >= 2, "at least one lease saw work"); assert!(count <= 4); } #[sqlx::test(migrations = "./migrations")] async fn stale_running_lease_can_be_reclaimed(pool: PgPool) { let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) .await .unwrap() { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; let first = jobs::lease(&pool, None, 1, Duration::from_secs(60)) .await .unwrap(); assert_eq!(first.len(), 1); // Pretend the worker crashed: rewind leased_until into the past. sqlx::query("UPDATE crawler_jobs SET leased_until = now() - interval '1 minute' WHERE id = $1") .bind(id) .execute(&pool) .await .unwrap(); let second = jobs::lease(&pool, None, 1, Duration::from_secs(60)) .await .unwrap(); assert_eq!(second.len(), 1, "stale running row was re-leased"); assert_eq!(second[0].id, id); assert_eq!(second[0].attempts, 2, "attempts bumped again"); } #[sqlx::test(migrations = "./migrations")] async fn ack_done_transitions_state_and_clears_lease(pool: PgPool) { let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) .await .unwrap() { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60)) .await .unwrap(); jobs::ack_done(&pool, leases[0].id).await.unwrap(); assert_eq!(job_state(&pool, id).await, "done"); let leased_until: Option> = sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1") .bind(id) .fetch_one(&pool) .await .unwrap(); assert!(leased_until.is_none()); } #[sqlx::test(migrations = "./migrations")] async fn ack_failed_under_max_returns_to_pending_with_future_schedule(pool: PgPool) { let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) .await .unwrap() { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60)) .await .unwrap(); let lease = &leases[0]; jobs::ack_failed(&pool, lease.id, "boom", lease.attempts, lease.max_attempts) .await .unwrap(); assert_eq!(job_state(&pool, id).await, "pending"); let (scheduled_at, last_error): (chrono::DateTime, Option) = sqlx::query_as("SELECT scheduled_at, last_error FROM crawler_jobs WHERE id = $1") .bind(id) .fetch_one(&pool) .await .unwrap(); assert!(scheduled_at > chrono::Utc::now()); assert_eq!(last_error.as_deref(), Some("boom")); } #[sqlx::test(migrations = "./migrations")] async fn ack_failed_at_max_marks_dead(pool: PgPool) { let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) .await .unwrap() { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; // Force a single lease then mark "this was attempt N where N == max_attempts". let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60)) .await .unwrap(); let lease = &leases[0]; jobs::ack_failed(&pool, lease.id, "final boom", lease.max_attempts, lease.max_attempts) .await .unwrap(); assert_eq!(job_state(&pool, id).await, "dead"); let last_error: Option = sqlx::query_scalar("SELECT last_error FROM crawler_jobs WHERE id = $1") .bind(id) .fetch_one(&pool) .await .unwrap(); assert_eq!(last_error.as_deref(), Some("final boom")); } #[sqlx::test(migrations = "./migrations")] async fn release_returns_to_pending_and_undoes_attempt_increment(pool: PgPool) { let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) .await .unwrap() { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60)) .await .unwrap(); assert_eq!(leases[0].attempts, 1); jobs::release(&pool, leases[0].id).await.unwrap(); assert_eq!(job_state(&pool, id).await, "pending"); assert_eq!(job_attempts(&pool, id).await, 0); let leased_until: Option> = sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1") .bind(id) .fetch_one(&pool) .await .unwrap(); assert!(leased_until.is_none()); } #[sqlx::test(migrations = "./migrations")] async fn reap_done_deletes_old_rows_keeps_fresh(pool: PgPool) { // Two done rows: one old (updated_at 10 days ago), one fresh. let old_id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) .await .unwrap() { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; let fresh_id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) .await .unwrap() { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; sqlx::query("UPDATE crawler_jobs SET state='done', updated_at = now() - interval '10 days' WHERE id = $1") .bind(old_id) .execute(&pool) .await .unwrap(); sqlx::query("UPDATE crawler_jobs SET state='done' WHERE id = $1") .bind(fresh_id) .execute(&pool) .await .unwrap(); let deleted = jobs::reap_done(&pool, 7).await.unwrap(); assert_eq!(deleted, 1); let remaining: Vec = sqlx::query_scalar("SELECT id FROM crawler_jobs ORDER BY id") .fetch_all(&pool) .await .unwrap(); assert_eq!(remaining, vec![fresh_id], "only fresh row remains"); } #[sqlx::test(migrations = "./migrations")] async fn reap_done_zero_is_a_no_op(pool: PgPool) { let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) .await .unwrap() { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; sqlx::query("UPDATE crawler_jobs SET state='done', updated_at = now() - interval '999 days' WHERE id = $1") .bind(id) .execute(&pool) .await .unwrap(); let deleted = jobs::reap_done(&pool, 0).await.unwrap(); assert_eq!(deleted, 0); assert_eq!(job_count(&pool).await, 1); }