From e93eec89e50c1491a3edca48815e6d84a8b9abe5 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Tue, 2 Jun 2026 21:13:51 +0200 Subject: [PATCH] fix(crawler): queue chapter content in ascending number order (0.51.1) Both enqueue paths now order by chapters.number so the cron tick and the bookmark hook insert jobs from chapter 1 upward instead of source-discovery or random-UUID order. The lease query tiebreaks on created_at so jobs sharing a batch's scheduled_at come off the queue in insertion order, propagating the enqueue intent through to dequeue. Concurrent workers and per-CDN latency can still drift actual completion order. Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/Cargo.lock | 2 +- backend/Cargo.toml | 2 +- backend/src/crawler/jobs.rs | 8 +- backend/src/crawler/pipeline.rs | 9 ++- backend/tests/crawler_daemon.rs | 129 ++++++++++++++++++++++++++++++++ backend/tests/crawler_jobs.rs | 83 ++++++++++++++++++++ frontend/package.json | 2 +- 7 files changed, 227 insertions(+), 8 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index f70a05b..24810b3 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "mangalord" -version = "0.51.0" +version = "0.51.1" dependencies = [ "anyhow", "argon2", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index e2246ba..dbfbc5d 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mangalord" -version = "0.51.0" +version = "0.51.1" edition = "2021" default-run = "mangalord" diff --git a/backend/src/crawler/jobs.rs b/backend/src/crawler/jobs.rs index 0d84931..ea7eaa9 100644 --- a/backend/src/crawler/jobs.rs +++ b/backend/src/crawler/jobs.rs @@ -104,6 +104,12 @@ pub async fn enqueue(pool: &PgPool, payload: &JobPayload) -> sqlx::Result>'kind'`; `None` means /// any kind. +/// +/// Ties on `scheduled_at` (the common case: a cron batch enqueues +/// everything with the same default `now()`) break by `created_at`, so +/// jobs come off the queue in insertion order. The enqueue paths insert +/// chapter-content jobs in ascending `chapters.number` order, so this +/// tiebreaker is what propagates that intent through to dequeue. pub async fn lease( pool: &PgPool, kind_filter: Option<&str>, @@ -118,7 +124,7 @@ pub async fn lease( WHERE (state = 'pending' OR (state = 'running' AND leased_until < now())) AND scheduled_at <= now() AND ($1::text IS NULL OR payload->>'kind' = $1) - ORDER BY scheduled_at + ORDER BY scheduled_at, created_at LIMIT $2 FOR UPDATE SKIP LOCKED ) diff --git a/backend/src/crawler/pipeline.rs b/backend/src/crawler/pipeline.rs index c6422c0..0dc5516 100644 --- a/backend/src/crawler/pipeline.rs +++ b/backend/src/crawler/pipeline.rs @@ -429,8 +429,8 @@ pub async fn enqueue_bookmarked_pending(pool: &PgPool) -> anyhow::Result now() - ($1::bigint || ' days')::interval ) - GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.manga_id, c.created_at - ORDER BY c.manga_id, c.created_at ASC + GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.manga_id, c.number, c.created_at + ORDER BY c.manga_id, c.number ASC, c.created_at ASC "#, ) .bind(CHAPTER_DEAD_QUARANTINE_DAYS) @@ -471,7 +471,7 @@ pub async fn enqueue_pending_for_manga( ) -> anyhow::Result { let rows: Vec<(String, Uuid, String)> = sqlx::query_as( r#" - SELECT DISTINCT cs.source_id, c.id AS chapter_id, cs.source_chapter_key + SELECT cs.source_id, c.id AS chapter_id, cs.source_chapter_key FROM chapters c JOIN chapter_sources cs ON cs.chapter_id = c.id WHERE c.manga_id = $1 @@ -484,7 +484,8 @@ pub async fn enqueue_pending_for_manga( AND cj.state = 'dead' AND cj.updated_at > now() - ($2::bigint || ' days')::interval ) - ORDER BY cs.source_id, c.id + GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.number, c.created_at + ORDER BY c.number ASC, c.created_at ASC, cs.source_id "#, ) .bind(manga_id) diff --git a/backend/tests/crawler_daemon.rs b/backend/tests/crawler_daemon.rs index e57ec59..16b308d 100644 --- a/backend/tests/crawler_daemon.rs +++ b/backend/tests/crawler_daemon.rs @@ -517,3 +517,132 @@ async fn enqueue_bookmarked_pending_resumes_after_quarantine_expires(pool: PgPoo ); } +/// Helper: insert a chapter with the given `number` and a non-dropped +/// source row, returning the chapter id. Used by the ordering tests so +/// the setup boilerplate doesn't drown the assertion. +async fn insert_pending_chapter( + pool: &PgPool, + manga_id: Uuid, + number: i32, + source_chapter_key: &str, +) -> Uuid { + let chapter_id: Uuid = sqlx::query_scalar( + "INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, $2, 0) RETURNING id", + ) + .bind(manga_id) + .bind(number) + .fetch_one(pool) + .await + .unwrap(); + sqlx::query( + "INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url) \ + VALUES ($1, $2, $3, $4)", + ) + .bind("target") + .bind(source_chapter_key) + .bind(chapter_id) + .bind(format!("https://example.com/{source_chapter_key}")) + .execute(pool) + .await + .unwrap(); + chapter_id +} + +#[sqlx::test(migrations = "./migrations")] +async fn enqueue_bookmarked_pending_queues_chapters_in_ascending_number_order(pool: PgPool) { + // Insert chapters with `number` values 3, 1, 2 in that insertion + // order — so `created_at` order (the previous tiebreaker) does NOT + // match number order. After enqueue + lease, the worker should see + // chapters 1, 2, 3 in that sequence. + let user_id: Uuid = sqlx::query_scalar( + "INSERT INTO users (username, password_hash) VALUES ($1, $2) RETURNING id", + ) + .bind("alice") + .bind("not-a-real-hash") + .fetch_one(&pool) + .await + .unwrap(); + let manga_id: Uuid = sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id") + .bind("Test") + .fetch_one(&pool) + .await + .unwrap(); + sqlx::query( + "INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", + ) + .bind("target") + .bind("Target") + .bind("https://example.com") + .execute(&pool) + .await + .unwrap(); + let c3 = insert_pending_chapter(&pool, manga_id, 3, "ch3").await; + let c1 = insert_pending_chapter(&pool, manga_id, 1, "ch1").await; + let c2 = insert_pending_chapter(&pool, manga_id, 2, "ch2").await; + sqlx::query("INSERT INTO bookmarks (user_id, manga_id) VALUES ($1, $2)") + .bind(user_id) + .bind(manga_id) + .execute(&pool) + .await + .unwrap(); + + let summary = pipeline::enqueue_bookmarked_pending(&pool).await.unwrap(); + assert_eq!(summary.inserted, 3); + + let leases = jobs::lease(&pool, None, 10, std::time::Duration::from_secs(60)) + .await + .unwrap(); + let leased_chapter_ids: Vec = leases + .iter() + .map(|l| match &l.payload { + JobPayload::SyncChapterContent { chapter_id, .. } => *chapter_id, + other => panic!("unexpected payload kind: {other:?}"), + }) + .collect(); + assert_eq!( + leased_chapter_ids, + vec![c1, c2, c3], + "chapters must be leased in ascending chapter-number order, not insertion order" + ); +} + +#[sqlx::test(migrations = "./migrations")] +async fn enqueue_pending_for_manga_queues_chapters_in_ascending_number_order(pool: PgPool) { + // Same scenario as above but exercising the bookmark-create hook path + // (`enqueue_pending_for_manga`) which has its own ORDER BY. + let manga_id: Uuid = sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id") + .bind("Test") + .fetch_one(&pool) + .await + .unwrap(); + sqlx::query( + "INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", + ) + .bind("target") + .bind("Target") + .bind("https://example.com") + .execute(&pool) + .await + .unwrap(); + let c3 = insert_pending_chapter(&pool, manga_id, 3, "ch3").await; + let c1 = insert_pending_chapter(&pool, manga_id, 1, "ch1").await; + let c2 = insert_pending_chapter(&pool, manga_id, 2, "ch2").await; + + let summary = pipeline::enqueue_pending_for_manga(&pool, manga_id) + .await + .unwrap(); + assert_eq!(summary.inserted, 3); + + let leases = jobs::lease(&pool, None, 10, std::time::Duration::from_secs(60)) + .await + .unwrap(); + let leased_chapter_ids: Vec = leases + .iter() + .map(|l| match &l.payload { + JobPayload::SyncChapterContent { chapter_id, .. } => *chapter_id, + other => panic!("unexpected payload kind: {other:?}"), + }) + .collect(); + assert_eq!(leased_chapter_ids, vec![c1, c2, c3]); +} + diff --git a/backend/tests/crawler_jobs.rs b/backend/tests/crawler_jobs.rs index 5a2e77b..7c1fd3b 100644 --- a/backend/tests/crawler_jobs.rs +++ b/backend/tests/crawler_jobs.rs @@ -531,6 +531,89 @@ async fn reap_done_deletes_old_rows_keeps_fresh(pool: PgPool) { assert_eq!(remaining, vec![fresh_id], "only fresh row remains"); } +#[sqlx::test(migrations = "./migrations")] +async fn lease_ties_on_scheduled_at_break_by_created_at(pool: PgPool) { + // Locks in the tiebreaker that lets enqueue order survive the lease + // step: when many jobs share `scheduled_at` (the common cron-batch + // case), the worker must pick the earliest-inserted row, not whatever + // Postgres returns in heap order. The enqueue path inserts chapters + // in chapter-number order, so this tiebreaker is what makes "queue + // in rising order" observable at the dequeue side too. + let a = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + let b = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + let c = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + _ => unreachable!(), + }; + + // Pin `scheduled_at` to a single literal instant (shared across all + // three rows — `now()` would yield a different microsecond per UPDATE + // and make scheduled_at the actual sort key). Reverse `created_at` + // against insertion order so heap order would give the wrong answer. + let shared_scheduled = chrono::Utc::now() - chrono::Duration::hours(1); + sqlx::query( + "UPDATE crawler_jobs \ + SET scheduled_at = $2, \ + created_at = $3 \ + WHERE id = $1", + ) + .bind(a) + .bind(shared_scheduled) + .bind(chrono::Utc::now() - chrono::Duration::seconds(10)) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "UPDATE crawler_jobs \ + SET scheduled_at = $2, \ + created_at = $3 \ + WHERE id = $1", + ) + .bind(b) + .bind(shared_scheduled) + .bind(chrono::Utc::now() - chrono::Duration::seconds(20)) + .execute(&pool) + .await + .unwrap(); + sqlx::query( + "UPDATE crawler_jobs \ + SET scheduled_at = $2, \ + created_at = $3 \ + WHERE id = $1", + ) + .bind(c) + .bind(shared_scheduled) + .bind(chrono::Utc::now() - chrono::Duration::seconds(30)) + .execute(&pool) + .await + .unwrap(); + + let leases = jobs::lease(&pool, None, 10, Duration::from_secs(60)) + .await + .unwrap(); + let order: Vec = leases.iter().map(|l| l.id).collect(); + assert_eq!( + order, + vec![c, b, a], + "lease must return jobs in created_at order when scheduled_at ties" + ); +} + #[sqlx::test(migrations = "./migrations")] async fn reap_done_zero_is_a_no_op(pool: PgPool) { let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) diff --git a/frontend/package.json b/frontend/package.json index 8a0188f..ea84563 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "mangalord-frontend", - "version": "0.51.0", + "version": "0.51.1", "private": true, "type": "module", "scripts": {