fix(crawler): queue chapter content in ascending number order (0.51.1)
Both enqueue paths now order by chapters.number so the cron tick and the bookmark hook insert jobs from chapter 1 upward instead of source-discovery or random-UUID order. The lease query tiebreaks on created_at so jobs sharing a batch's scheduled_at come off the queue in insertion order, propagating the enqueue intent through to dequeue. Concurrent workers and per-CDN latency can still drift actual completion order. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2
backend/Cargo.lock
generated
2
backend/Cargo.lock
generated
@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
|
||||
|
||||
[[package]]
|
||||
name = "mangalord"
|
||||
version = "0.51.0"
|
||||
version = "0.51.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"argon2",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "mangalord"
|
||||
version = "0.51.0"
|
||||
version = "0.51.1"
|
||||
edition = "2021"
|
||||
default-run = "mangalord"
|
||||
|
||||
|
||||
@@ -104,6 +104,12 @@ pub async fn enqueue(pool: &PgPool, payload: &JobPayload) -> sqlx::Result<Enqueu
|
||||
///
|
||||
/// `kind_filter` matches against `payload->>'kind'`; `None` means
|
||||
/// any kind.
|
||||
///
|
||||
/// Ties on `scheduled_at` (the common case: a cron batch enqueues
|
||||
/// everything with the same default `now()`) break by `created_at`, so
|
||||
/// jobs come off the queue in insertion order. The enqueue paths insert
|
||||
/// chapter-content jobs in ascending `chapters.number` order, so this
|
||||
/// tiebreaker is what propagates that intent through to dequeue.
|
||||
pub async fn lease(
|
||||
pool: &PgPool,
|
||||
kind_filter: Option<&str>,
|
||||
@@ -118,7 +124,7 @@ pub async fn lease(
|
||||
WHERE (state = 'pending' OR (state = 'running' AND leased_until < now()))
|
||||
AND scheduled_at <= now()
|
||||
AND ($1::text IS NULL OR payload->>'kind' = $1)
|
||||
ORDER BY scheduled_at
|
||||
ORDER BY scheduled_at, created_at
|
||||
LIMIT $2
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
|
||||
@@ -429,8 +429,8 @@ pub async fn enqueue_bookmarked_pending(pool: &PgPool) -> anyhow::Result<Enqueue
|
||||
AND cj.state = 'dead'
|
||||
AND cj.updated_at > now() - ($1::bigint || ' days')::interval
|
||||
)
|
||||
GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.manga_id, c.created_at
|
||||
ORDER BY c.manga_id, c.created_at ASC
|
||||
GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.manga_id, c.number, c.created_at
|
||||
ORDER BY c.manga_id, c.number ASC, c.created_at ASC
|
||||
"#,
|
||||
)
|
||||
.bind(CHAPTER_DEAD_QUARANTINE_DAYS)
|
||||
@@ -471,7 +471,7 @@ pub async fn enqueue_pending_for_manga(
|
||||
) -> anyhow::Result<EnqueueSummary> {
|
||||
let rows: Vec<(String, Uuid, String)> = sqlx::query_as(
|
||||
r#"
|
||||
SELECT DISTINCT cs.source_id, c.id AS chapter_id, cs.source_chapter_key
|
||||
SELECT cs.source_id, c.id AS chapter_id, cs.source_chapter_key
|
||||
FROM chapters c
|
||||
JOIN chapter_sources cs ON cs.chapter_id = c.id
|
||||
WHERE c.manga_id = $1
|
||||
@@ -484,7 +484,8 @@ pub async fn enqueue_pending_for_manga(
|
||||
AND cj.state = 'dead'
|
||||
AND cj.updated_at > now() - ($2::bigint || ' days')::interval
|
||||
)
|
||||
ORDER BY cs.source_id, c.id
|
||||
GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.number, c.created_at
|
||||
ORDER BY c.number ASC, c.created_at ASC, cs.source_id
|
||||
"#,
|
||||
)
|
||||
.bind(manga_id)
|
||||
|
||||
@@ -517,3 +517,132 @@ async fn enqueue_bookmarked_pending_resumes_after_quarantine_expires(pool: PgPoo
|
||||
);
|
||||
}
|
||||
|
||||
/// Helper: insert a chapter with the given `number` and a non-dropped
|
||||
/// source row, returning the chapter id. Used by the ordering tests so
|
||||
/// the setup boilerplate doesn't drown the assertion.
|
||||
async fn insert_pending_chapter(
|
||||
pool: &PgPool,
|
||||
manga_id: Uuid,
|
||||
number: i32,
|
||||
source_chapter_key: &str,
|
||||
) -> Uuid {
|
||||
let chapter_id: Uuid = sqlx::query_scalar(
|
||||
"INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, $2, 0) RETURNING id",
|
||||
)
|
||||
.bind(manga_id)
|
||||
.bind(number)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url) \
|
||||
VALUES ($1, $2, $3, $4)",
|
||||
)
|
||||
.bind("target")
|
||||
.bind(source_chapter_key)
|
||||
.bind(chapter_id)
|
||||
.bind(format!("https://example.com/{source_chapter_key}"))
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
chapter_id
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn enqueue_bookmarked_pending_queues_chapters_in_ascending_number_order(pool: PgPool) {
|
||||
// Insert chapters with `number` values 3, 1, 2 in that insertion
|
||||
// order — so `created_at` order (the previous tiebreaker) does NOT
|
||||
// match number order. After enqueue + lease, the worker should see
|
||||
// chapters 1, 2, 3 in that sequence.
|
||||
let user_id: Uuid = sqlx::query_scalar(
|
||||
"INSERT INTO users (username, password_hash) VALUES ($1, $2) RETURNING id",
|
||||
)
|
||||
.bind("alice")
|
||||
.bind("not-a-real-hash")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
let manga_id: Uuid = sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id")
|
||||
.bind("Test")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
|
||||
)
|
||||
.bind("target")
|
||||
.bind("Target")
|
||||
.bind("https://example.com")
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
let c3 = insert_pending_chapter(&pool, manga_id, 3, "ch3").await;
|
||||
let c1 = insert_pending_chapter(&pool, manga_id, 1, "ch1").await;
|
||||
let c2 = insert_pending_chapter(&pool, manga_id, 2, "ch2").await;
|
||||
sqlx::query("INSERT INTO bookmarks (user_id, manga_id) VALUES ($1, $2)")
|
||||
.bind(user_id)
|
||||
.bind(manga_id)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let summary = pipeline::enqueue_bookmarked_pending(&pool).await.unwrap();
|
||||
assert_eq!(summary.inserted, 3);
|
||||
|
||||
let leases = jobs::lease(&pool, None, 10, std::time::Duration::from_secs(60))
|
||||
.await
|
||||
.unwrap();
|
||||
let leased_chapter_ids: Vec<Uuid> = leases
|
||||
.iter()
|
||||
.map(|l| match &l.payload {
|
||||
JobPayload::SyncChapterContent { chapter_id, .. } => *chapter_id,
|
||||
other => panic!("unexpected payload kind: {other:?}"),
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(
|
||||
leased_chapter_ids,
|
||||
vec![c1, c2, c3],
|
||||
"chapters must be leased in ascending chapter-number order, not insertion order"
|
||||
);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn enqueue_pending_for_manga_queues_chapters_in_ascending_number_order(pool: PgPool) {
|
||||
// Same scenario as above but exercising the bookmark-create hook path
|
||||
// (`enqueue_pending_for_manga`) which has its own ORDER BY.
|
||||
let manga_id: Uuid = sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id")
|
||||
.bind("Test")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
|
||||
)
|
||||
.bind("target")
|
||||
.bind("Target")
|
||||
.bind("https://example.com")
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
let c3 = insert_pending_chapter(&pool, manga_id, 3, "ch3").await;
|
||||
let c1 = insert_pending_chapter(&pool, manga_id, 1, "ch1").await;
|
||||
let c2 = insert_pending_chapter(&pool, manga_id, 2, "ch2").await;
|
||||
|
||||
let summary = pipeline::enqueue_pending_for_manga(&pool, manga_id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(summary.inserted, 3);
|
||||
|
||||
let leases = jobs::lease(&pool, None, 10, std::time::Duration::from_secs(60))
|
||||
.await
|
||||
.unwrap();
|
||||
let leased_chapter_ids: Vec<Uuid> = leases
|
||||
.iter()
|
||||
.map(|l| match &l.payload {
|
||||
JobPayload::SyncChapterContent { chapter_id, .. } => *chapter_id,
|
||||
other => panic!("unexpected payload kind: {other:?}"),
|
||||
})
|
||||
.collect();
|
||||
assert_eq!(leased_chapter_ids, vec![c1, c2, c3]);
|
||||
}
|
||||
|
||||
|
||||
@@ -531,6 +531,89 @@ async fn reap_done_deletes_old_rows_keeps_fresh(pool: PgPool) {
|
||||
assert_eq!(remaining, vec![fresh_id], "only fresh row remains");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn lease_ties_on_scheduled_at_break_by_created_at(pool: PgPool) {
|
||||
// Locks in the tiebreaker that lets enqueue order survive the lease
|
||||
// step: when many jobs share `scheduled_at` (the common cron-batch
|
||||
// case), the worker must pick the earliest-inserted row, not whatever
|
||||
// Postgres returns in heap order. The enqueue path inserts chapters
|
||||
// in chapter-number order, so this tiebreaker is what makes "queue
|
||||
// in rising order" observable at the dequeue side too.
|
||||
let a = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
||||
.await
|
||||
.unwrap()
|
||||
{
|
||||
EnqueueResult::Inserted(id) => id,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let b = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
||||
.await
|
||||
.unwrap()
|
||||
{
|
||||
EnqueueResult::Inserted(id) => id,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let c = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
||||
.await
|
||||
.unwrap()
|
||||
{
|
||||
EnqueueResult::Inserted(id) => id,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
// Pin `scheduled_at` to a single literal instant (shared across all
|
||||
// three rows — `now()` would yield a different microsecond per UPDATE
|
||||
// and make scheduled_at the actual sort key). Reverse `created_at`
|
||||
// against insertion order so heap order would give the wrong answer.
|
||||
let shared_scheduled = chrono::Utc::now() - chrono::Duration::hours(1);
|
||||
sqlx::query(
|
||||
"UPDATE crawler_jobs \
|
||||
SET scheduled_at = $2, \
|
||||
created_at = $3 \
|
||||
WHERE id = $1",
|
||||
)
|
||||
.bind(a)
|
||||
.bind(shared_scheduled)
|
||||
.bind(chrono::Utc::now() - chrono::Duration::seconds(10))
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"UPDATE crawler_jobs \
|
||||
SET scheduled_at = $2, \
|
||||
created_at = $3 \
|
||||
WHERE id = $1",
|
||||
)
|
||||
.bind(b)
|
||||
.bind(shared_scheduled)
|
||||
.bind(chrono::Utc::now() - chrono::Duration::seconds(20))
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"UPDATE crawler_jobs \
|
||||
SET scheduled_at = $2, \
|
||||
created_at = $3 \
|
||||
WHERE id = $1",
|
||||
)
|
||||
.bind(c)
|
||||
.bind(shared_scheduled)
|
||||
.bind(chrono::Utc::now() - chrono::Duration::seconds(30))
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let leases = jobs::lease(&pool, None, 10, Duration::from_secs(60))
|
||||
.await
|
||||
.unwrap();
|
||||
let order: Vec<Uuid> = leases.iter().map(|l| l.id).collect();
|
||||
assert_eq!(
|
||||
order,
|
||||
vec![c, b, a],
|
||||
"lease must return jobs in created_at order when scheduled_at ties"
|
||||
);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn reap_done_zero_is_a_no_op(pool: PgPool) {
|
||||
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
||||
|
||||
Reference in New Issue
Block a user