From 93c7fd63fcbc25ae4bbd9f3921f5667ff2496e55 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Mon, 25 May 2026 19:59:09 +0200 Subject: [PATCH] feat: crawler job queue ops and dedup index (0.27.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds enqueue / lease / ack_done / ack_failed / release / reap_done on crawler::jobs, backed by the existing crawler_jobs table. lease() uses a single FOR UPDATE SKIP LOCKED CTE that also re-claims stale running rows (crashed-worker recovery), and ack_failed applies an exponential backoff capped at 1h before retrying. Migration 0014 adds a partial unique index on (payload->>'chapter_id') restricted to (pending|running) sync_chapter_content jobs, so producers can just INSERT ... ON CONFLICT DO NOTHING without racing each other. The slot frees again the moment the job leaves the in-flight states, so a future force-refetch can re-enqueue. Library-only — no daemon, no API hook. Those land in the next two phases. Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/Cargo.lock | 2 +- backend/Cargo.toml | 2 +- .../0014_crawler_jobs_dedup_index.sql | 15 + backend/src/crawler/jobs.rs | 220 ++++++++- backend/tests/crawler_jobs.rs | 441 ++++++++++++++++++ frontend/package.json | 2 +- 6 files changed, 676 insertions(+), 6 deletions(-) create mode 100644 backend/migrations/0014_crawler_jobs_dedup_index.sql create mode 100644 backend/tests/crawler_jobs.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index db763fc..60c050a 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1448,7 +1448,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "mangalord" -version = "0.26.1" +version = "0.27.0" dependencies = [ "anyhow", "argon2", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 418ddb1..0b7a503 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mangalord" -version = "0.26.1" +version = "0.27.0" edition = "2021" default-run = "mangalord" diff --git a/backend/migrations/0014_crawler_jobs_dedup_index.sql b/backend/migrations/0014_crawler_jobs_dedup_index.sql new file mode 100644 index 0000000..6ce90e7 --- /dev/null +++ b/backend/migrations/0014_crawler_jobs_dedup_index.sql @@ -0,0 +1,15 @@ +-- Dedup SyncChapterContent jobs in flight. +-- +-- Without this, the daemon's bookmark/cron enqueue paths would have to do a +-- pre-check + insert race that's incorrect under concurrency. The partial +-- unique index lets both producers use plain `INSERT ... ON CONFLICT DO +-- NOTHING`: at most one (pending|running) job per chapter_id exists, and the +-- slot frees again as soon as the job transitions to done/failed/dead so a +-- re-enqueue is possible after the row is reaped or a force-refetch is wanted. +-- +-- Scoped to sync_chapter_content payloads only so Discover / SyncManga / +-- SyncChapterList jobs (which don't carry a chapter_id) remain un-deduped. +CREATE UNIQUE INDEX crawler_jobs_chapter_content_dedup_idx + ON crawler_jobs ((payload->>'chapter_id')) + WHERE state IN ('pending', 'running') + AND payload->>'kind' = 'sync_chapter_content'; diff --git a/backend/src/crawler/jobs.rs b/backend/src/crawler/jobs.rs index 8b1dc26..1f3e0d1 100644 --- a/backend/src/crawler/jobs.rs +++ b/backend/src/crawler/jobs.rs @@ -5,11 +5,11 @@ //! `leased_until`, and ack by transitioning to `done` (or backoff / //! `dead`). Handlers are idempotent so a crash mid-run is recoverable //! by replay. -//! -//! Scaffold only — the actual queue wrapper and handler dispatch land -//! once we have the first `Source` impl exercising the pipeline. + +use std::time::Duration; use serde::{Deserialize, Serialize}; +use sqlx::PgPool; use uuid::Uuid; use super::source::DiscoverMode; @@ -53,3 +53,217 @@ pub enum JobState { Failed, Dead, } + +/// Kind discriminator stored in `payload->>'kind'`. Public so callers +/// (daemon worker, bookmark hook) can filter `lease()` to a single kind +/// without re-spelling the literal. +pub const KIND_SYNC_CHAPTER_CONTENT: &str = "sync_chapter_content"; + +#[derive(Debug)] +pub enum EnqueueResult { + Inserted(Uuid), + Skipped, +} + +#[derive(Debug, Clone)] +pub struct Lease { + pub id: Uuid, + pub payload: JobPayload, + pub attempts: i32, + pub max_attempts: i32, +} + +/// Exponential backoff for `ack_failed` retries. `attempts` is the +/// post-increment value reported by `lease()` (so the first failure has +/// `attempts == 1` and waits 60s, the second 120s, etc.). Capped at 1h to +/// avoid runaway long sleeps that would outlive the daemon process. +fn backoff_for(attempts: i32) -> Duration { + let shift = attempts.saturating_sub(1).clamp(0, 20) as u32; + let secs = 60u64.saturating_mul(1u64 << shift); + Duration::from_secs(secs.min(3600)) +} + +/// Insert a new pending job. For `SyncChapterContent` payloads the +/// partial unique index `crawler_jobs_chapter_content_dedup_idx` blocks +/// a second `(pending|running)` insert per chapter_id, returning +/// `Skipped`. The slot frees again once the previous job leaves the +/// in-flight states (done/failed/dead), so a re-enqueue after a force +/// refetch succeeds. +pub async fn enqueue(pool: &PgPool, payload: &JobPayload) -> sqlx::Result { + let json = serde_json::to_value(payload).expect("JobPayload is always serializable"); + let id: Option = sqlx::query_scalar( + "INSERT INTO crawler_jobs (payload) VALUES ($1) \ + ON CONFLICT DO NOTHING RETURNING id", + ) + .bind(json) + .fetch_optional(pool) + .await?; + Ok(match id { + Some(id) => EnqueueResult::Inserted(id), + None => EnqueueResult::Skipped, + }) +} + +/// Lease up to `max` rows whose `state` is `pending`, or `running` with +/// an expired `leased_until` (the crashed-worker recovery path). The +/// inner CTE uses `FOR UPDATE SKIP LOCKED` so concurrent leasers don't +/// block each other and each row is handed to exactly one worker. +/// +/// `kind_filter` matches against `payload->>'kind'`; `None` means +/// any kind. +pub async fn lease( + pool: &PgPool, + kind_filter: Option<&str>, + max: i64, + lease_duration: Duration, +) -> sqlx::Result> { + let lease_ms: i64 = lease_duration.as_millis().min(i64::MAX as u128) as i64; + let rows: Vec<(Uuid, serde_json::Value, i32, i32)> = sqlx::query_as( + r#" + WITH leased AS ( + SELECT id FROM crawler_jobs + WHERE (state = 'pending' OR (state = 'running' AND leased_until < now())) + AND scheduled_at <= now() + AND ($1::text IS NULL OR payload->>'kind' = $1) + ORDER BY scheduled_at + LIMIT $2 + FOR UPDATE SKIP LOCKED + ) + UPDATE crawler_jobs j + SET state = 'running', + attempts = j.attempts + 1, + leased_until = now() + ($3::bigint || ' milliseconds')::interval, + updated_at = now() + FROM leased l + WHERE j.id = l.id + RETURNING j.id, j.payload, j.attempts, j.max_attempts + "#, + ) + .bind(kind_filter) + .bind(max) + .bind(lease_ms) + .fetch_all(pool) + .await?; + + let mut leases = Vec::with_capacity(rows.len()); + for (id, payload_json, attempts, max_attempts) in rows { + let payload: JobPayload = serde_json::from_value(payload_json).map_err(|e| { + sqlx::Error::Decode(format!("invalid JobPayload JSON for job {id}: {e}").into()) + })?; + leases.push(Lease { + id, + payload, + attempts, + max_attempts, + }); + } + Ok(leases) +} + +/// Mark a leased job as successfully completed. +pub async fn ack_done(pool: &PgPool, lease_id: Uuid) -> sqlx::Result<()> { + sqlx::query( + "UPDATE crawler_jobs \ + SET state = 'done', leased_until = NULL, updated_at = now() \ + WHERE id = $1", + ) + .bind(lease_id) + .execute(pool) + .await?; + Ok(()) +} + +/// Mark a leased job as failed. If the current attempt count has reached +/// `max_attempts` the job is terminally dead and stops retrying; +/// otherwise it goes back to `pending` with `scheduled_at` pushed into +/// the future by the exponential backoff. +pub async fn ack_failed( + pool: &PgPool, + lease_id: Uuid, + error: &str, + attempts: i32, + max_attempts: i32, +) -> sqlx::Result<()> { + if attempts >= max_attempts { + sqlx::query( + "UPDATE crawler_jobs \ + SET state = 'dead', last_error = $2, leased_until = NULL, updated_at = now() \ + WHERE id = $1", + ) + .bind(lease_id) + .bind(error) + .execute(pool) + .await?; + } else { + let backoff_ms: i64 = backoff_for(attempts).as_millis().min(i64::MAX as u128) as i64; + sqlx::query( + "UPDATE crawler_jobs \ + SET state = 'pending', last_error = $2, leased_until = NULL, \ + scheduled_at = now() + ($3::bigint || ' milliseconds')::interval, \ + updated_at = now() \ + WHERE id = $1", + ) + .bind(lease_id) + .bind(error) + .bind(backoff_ms) + .execute(pool) + .await?; + } + Ok(()) +} + +/// Return a leased job to `pending` without burning a retry attempt. +/// Used on graceful shutdown and on session-expired aborts where the +/// failure isn't the job's fault. +pub async fn release(pool: &PgPool, lease_id: Uuid) -> sqlx::Result<()> { + sqlx::query( + "UPDATE crawler_jobs \ + SET state = 'pending', leased_until = NULL, \ + attempts = GREATEST(0, attempts - 1), updated_at = now() \ + WHERE id = $1", + ) + .bind(lease_id) + .execute(pool) + .await?; + Ok(()) +} + +/// Delete `done` jobs whose `updated_at` is older than `retention_days` +/// days. `0` disables the reaper without touching the table. Returns the +/// number of rows removed. +pub async fn reap_done(pool: &PgPool, retention_days: u32) -> sqlx::Result { + if retention_days == 0 { + return Ok(0); + } + let result = sqlx::query( + "DELETE FROM crawler_jobs \ + WHERE state = 'done' \ + AND updated_at < now() - ($1::bigint || ' days')::interval", + ) + .bind(retention_days as i64) + .execute(pool) + .await?; + Ok(result.rows_affected()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn backoff_grows_exponentially_and_caps_at_one_hour() { + // attempts == 1 → 60s, doubling each step. + assert_eq!(backoff_for(1), Duration::from_secs(60)); + assert_eq!(backoff_for(2), Duration::from_secs(120)); + assert_eq!(backoff_for(3), Duration::from_secs(240)); + assert_eq!(backoff_for(4), Duration::from_secs(480)); + assert_eq!(backoff_for(5), Duration::from_secs(960)); + assert_eq!(backoff_for(6), Duration::from_secs(1920)); + // 7th: 60 * 64 = 3840 → capped to 3600. + assert_eq!(backoff_for(7), Duration::from_secs(3600)); + assert_eq!(backoff_for(20), Duration::from_secs(3600)); + // Garbage / zero / negatives stay sane. + assert_eq!(backoff_for(0), Duration::from_secs(60)); + assert_eq!(backoff_for(-5), Duration::from_secs(60)); + } +} diff --git a/backend/tests/crawler_jobs.rs b/backend/tests/crawler_jobs.rs new file mode 100644 index 0000000..5a3db9b --- /dev/null +++ b/backend/tests/crawler_jobs.rs @@ -0,0 +1,441 @@ +//! Integration tests for `crawler::jobs` queue operations. +//! +//! Uses `#[sqlx::test(migrations = "./migrations")]` which provisions a fresh +//! migrated DB per test. No browser, no axum router — these exercise the SQL +//! shape and dedup-index semantics directly against Postgres. + +use std::time::Duration; + +use mangalord::crawler::jobs::{ + self, EnqueueResult, JobPayload, KIND_SYNC_CHAPTER_CONTENT, +}; +use mangalord::crawler::source::DiscoverMode; +use sqlx::PgPool; +use uuid::Uuid; + +fn chapter_content_payload(chapter_id: Uuid) -> JobPayload { + JobPayload::SyncChapterContent { + source_id: "target".into(), + chapter_id, + source_chapter_key: format!("ch-{chapter_id}"), + } +} + +fn discover_payload() -> JobPayload { + JobPayload::Discover { + source_id: "target".into(), + mode: DiscoverMode::Backfill, + } +} + +async fn job_state(pool: &PgPool, id: Uuid) -> String { + sqlx::query_scalar::<_, String>("SELECT state FROM crawler_jobs WHERE id = $1") + .bind(id) + .fetch_one(pool) + .await + .unwrap() +} + +async fn job_attempts(pool: &PgPool, id: Uuid) -> i32 { + sqlx::query_scalar::<_, i32>("SELECT attempts FROM crawler_jobs WHERE id = $1") + .bind(id) + .fetch_one(pool) + .await + .unwrap() +} + +async fn job_count(pool: &PgPool) -> i64 { + sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM crawler_jobs") + .fetch_one(pool) + .await + .unwrap() +} + +#[sqlx::test(migrations = "./migrations")] +async fn enqueue_inserts_pending_row_with_round_trip_payload(pool: PgPool) { + let chapter_id = Uuid::new_v4(); + let payload = chapter_content_payload(chapter_id); + + let result = jobs::enqueue(&pool, &payload).await.unwrap(); + let id = match result { + EnqueueResult::Inserted(id) => id, + EnqueueResult::Skipped => panic!("expected Inserted on first enqueue"), + }; + + assert_eq!(job_state(&pool, id).await, "pending"); + assert_eq!(job_attempts(&pool, id).await, 0); + + let raw_payload: serde_json::Value = + sqlx::query_scalar("SELECT payload FROM crawler_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + let decoded: JobPayload = serde_json::from_value(raw_payload).unwrap(); + match decoded { + JobPayload::SyncChapterContent { + source_id, + chapter_id: c, + source_chapter_key, + } => { + assert_eq!(source_id, "target"); + assert_eq!(c, chapter_id); + assert_eq!(source_chapter_key, format!("ch-{chapter_id}")); + } + _ => panic!("payload variant mismatch"), + } +} + +#[sqlx::test(migrations = "./migrations")] +async fn duplicate_chapter_content_while_pending_is_skipped(pool: PgPool) { + let chapter_id = Uuid::new_v4(); + let p = chapter_content_payload(chapter_id); + + let first = jobs::enqueue(&pool, &p).await.unwrap(); + assert!(matches!(first, EnqueueResult::Inserted(_))); + + let second = jobs::enqueue(&pool, &p).await.unwrap(); + assert!(matches!(second, EnqueueResult::Skipped)); + + assert_eq!(job_count(&pool).await, 1); +} + +#[sqlx::test(migrations = "./migrations")] +async fn duplicate_after_done_releases_dedup_slot(pool: PgPool) { + let chapter_id = Uuid::new_v4(); + let p = chapter_content_payload(chapter_id); + + let first_id = match jobs::enqueue(&pool, &p).await.unwrap() { + EnqueueResult::Inserted(id) => id, + EnqueueResult::Skipped => panic!("first enqueue should insert"), + }; + // Move the first job out of (pending|running) so the partial index drops it. + sqlx::query("UPDATE crawler_jobs SET state = 'done' WHERE id = $1") + .bind(first_id) + .execute(&pool) + .await + .unwrap(); + + let second = jobs::enqueue(&pool, &p).await.unwrap(); + assert!( + matches!(second, EnqueueResult::Inserted(_)), + "after done the chapter_id slot is free again" + ); + assert_eq!(job_count(&pool).await, 2); +} + +#[sqlx::test(migrations = "./migrations")] +async fn different_chapter_ids_can_coexist(pool: PgPool) { + let p1 = chapter_content_payload(Uuid::new_v4()); + let p2 = chapter_content_payload(Uuid::new_v4()); + assert!(matches!( + jobs::enqueue(&pool, &p1).await.unwrap(), + EnqueueResult::Inserted(_) + )); + assert!(matches!( + jobs::enqueue(&pool, &p2).await.unwrap(), + EnqueueResult::Inserted(_) + )); + assert_eq!(job_count(&pool).await, 2); +} + +#[sqlx::test(migrations = "./migrations")] +async fn non_chapter_content_payloads_are_never_deduped(pool: PgPool) { + let p = discover_payload(); + assert!(matches!( + jobs::enqueue(&pool, &p).await.unwrap(), + EnqueueResult::Inserted(_) + )); + assert!(matches!( + jobs::enqueue(&pool, &p).await.unwrap(), + EnqueueResult::Inserted(_) + )); + assert_eq!(job_count(&pool).await, 2); +} + +#[sqlx::test(migrations = "./migrations")] +async fn lease_marks_running_and_bumps_attempts_and_sets_leased_until(pool: PgPool) { + let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + EnqueueResult::Skipped => unreachable!(), + }; + + let leases = jobs::lease(&pool, None, 10, Duration::from_secs(60)) + .await + .unwrap(); + assert_eq!(leases.len(), 1); + let lease = &leases[0]; + assert_eq!(lease.id, id); + assert_eq!(lease.attempts, 1); + + assert_eq!(job_state(&pool, id).await, "running"); + + let leased_until: Option> = + sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + let leased_until = leased_until.expect("leased_until set"); + assert!(leased_until > chrono::Utc::now()); +} + +#[sqlx::test(migrations = "./migrations")] +async fn lease_with_kind_filter_only_matches_that_kind(pool: PgPool) { + let discover_id = match jobs::enqueue(&pool, &discover_payload()).await.unwrap() { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + let chapter_id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + + let leases = jobs::lease( + &pool, + Some(KIND_SYNC_CHAPTER_CONTENT), + 10, + Duration::from_secs(60), + ) + .await + .unwrap(); + assert_eq!(leases.len(), 1, "only chapter content payload leases"); + assert_eq!(leases[0].id, chapter_id); + // discover is still pending + assert_eq!(job_state(&pool, discover_id).await, "pending"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn concurrent_leases_under_skip_locked_return_disjoint_ids(pool: PgPool) { + // 4 pending jobs, two concurrent calls each asking for up to 2. + let mut ids = Vec::new(); + for _ in 0..4 { + let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + ids.push(id); + } + + let (a, b) = tokio::join!( + jobs::lease(&pool, None, 2, Duration::from_secs(60)), + jobs::lease(&pool, None, 2, Duration::from_secs(60)), + ); + let a = a.unwrap(); + let b = b.unwrap(); + let mut seen: Vec = a.iter().chain(b.iter()).map(|l| l.id).collect(); + seen.sort(); + seen.dedup(); + let count = a.len() + b.len(); + assert_eq!( + seen.len(), + count, + "no id appears in both lease results (SKIP LOCKED)" + ); + assert!(count >= 2, "at least one lease saw work"); + assert!(count <= 4); +} + +#[sqlx::test(migrations = "./migrations")] +async fn stale_running_lease_can_be_reclaimed(pool: PgPool) { + let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + + let first = jobs::lease(&pool, None, 1, Duration::from_secs(60)) + .await + .unwrap(); + assert_eq!(first.len(), 1); + // Pretend the worker crashed: rewind leased_until into the past. + sqlx::query("UPDATE crawler_jobs SET leased_until = now() - interval '1 minute' WHERE id = $1") + .bind(id) + .execute(&pool) + .await + .unwrap(); + + let second = jobs::lease(&pool, None, 1, Duration::from_secs(60)) + .await + .unwrap(); + assert_eq!(second.len(), 1, "stale running row was re-leased"); + assert_eq!(second[0].id, id); + assert_eq!(second[0].attempts, 2, "attempts bumped again"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn ack_done_transitions_state_and_clears_lease(pool: PgPool) { + let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60)) + .await + .unwrap(); + jobs::ack_done(&pool, leases[0].id).await.unwrap(); + + assert_eq!(job_state(&pool, id).await, "done"); + let leased_until: Option> = + sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + assert!(leased_until.is_none()); +} + +#[sqlx::test(migrations = "./migrations")] +async fn ack_failed_under_max_returns_to_pending_with_future_schedule(pool: PgPool) { + let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60)) + .await + .unwrap(); + let lease = &leases[0]; + jobs::ack_failed(&pool, lease.id, "boom", lease.attempts, lease.max_attempts) + .await + .unwrap(); + + assert_eq!(job_state(&pool, id).await, "pending"); + + let (scheduled_at, last_error): (chrono::DateTime, Option) = + sqlx::query_as("SELECT scheduled_at, last_error FROM crawler_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + assert!(scheduled_at > chrono::Utc::now()); + assert_eq!(last_error.as_deref(), Some("boom")); +} + +#[sqlx::test(migrations = "./migrations")] +async fn ack_failed_at_max_marks_dead(pool: PgPool) { + let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + // Force a single lease then mark "this was attempt N where N == max_attempts". + let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60)) + .await + .unwrap(); + let lease = &leases[0]; + jobs::ack_failed(&pool, lease.id, "final boom", lease.max_attempts, lease.max_attempts) + .await + .unwrap(); + + assert_eq!(job_state(&pool, id).await, "dead"); + let last_error: Option = + sqlx::query_scalar("SELECT last_error FROM crawler_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(last_error.as_deref(), Some("final boom")); +} + +#[sqlx::test(migrations = "./migrations")] +async fn release_returns_to_pending_and_undoes_attempt_increment(pool: PgPool) { + let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60)) + .await + .unwrap(); + assert_eq!(leases[0].attempts, 1); + jobs::release(&pool, leases[0].id).await.unwrap(); + + assert_eq!(job_state(&pool, id).await, "pending"); + assert_eq!(job_attempts(&pool, id).await, 0); + let leased_until: Option> = + sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + assert!(leased_until.is_none()); +} + +#[sqlx::test(migrations = "./migrations")] +async fn reap_done_deletes_old_rows_keeps_fresh(pool: PgPool) { + // Two done rows: one old (updated_at 10 days ago), one fresh. + let old_id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + let fresh_id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + + sqlx::query("UPDATE crawler_jobs SET state='done', updated_at = now() - interval '10 days' WHERE id = $1") + .bind(old_id) + .execute(&pool) + .await + .unwrap(); + sqlx::query("UPDATE crawler_jobs SET state='done' WHERE id = $1") + .bind(fresh_id) + .execute(&pool) + .await + .unwrap(); + + let deleted = jobs::reap_done(&pool, 7).await.unwrap(); + assert_eq!(deleted, 1); + + let remaining: Vec = sqlx::query_scalar("SELECT id FROM crawler_jobs ORDER BY id") + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!(remaining, vec![fresh_id], "only fresh row remains"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn reap_done_zero_is_a_no_op(pool: PgPool) { + let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + sqlx::query("UPDATE crawler_jobs SET state='done', updated_at = now() - interval '999 days' WHERE id = $1") + .bind(id) + .execute(&pool) + .await + .unwrap(); + + let deleted = jobs::reap_done(&pool, 0).await.unwrap(); + assert_eq!(deleted, 0); + assert_eq!(job_count(&pool).await, 1); +} diff --git a/frontend/package.json b/frontend/package.json index c995e89..cb0535c 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "mangalord-frontend", - "version": "0.26.1", + "version": "0.27.0", "private": true, "type": "module", "scripts": {