fix(crawler): quarantine recently-dead chapters from re-enqueue (0.35.4)
The partial dedup index only blocks (pending|running) duplicates, so once a SyncChapterContent job transitions to 'dead' (max_attempts exhausted) the slot frees up. Every subsequent cron tick re-enqueued the chapter — page_count = 0 and dropped_at IS NULL stay true — burned another max_attempts retries, and died again. Permanent-failure chapters spun forever.

enqueue_bookmarked_pending and enqueue_pending_for_manga now skip chapters that have a sync_chapter_content job in state 'dead' updated within the last CHAPTER_DEAD_QUARANTINE_DAYS (7) days. A failed chapter goes silent for a week, then gets one more shot — long enough for a transient site issue to resolve, short enough that permanent failures don't stay permanent if conditions change.

Two integration tests pin both halves of the contract.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2
backend/Cargo.lock
generated
2
backend/Cargo.lock
generated
@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
|
||||
|
||||
[[package]]
|
||||
name = "mangalord"
|
||||
version = "0.35.3"
|
||||
version = "0.35.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"argon2",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "mangalord"
|
||||
version = "0.35.3"
|
||||
version = "0.35.4"
|
||||
edition = "2021"
|
||||
default-run = "mangalord"
|
||||
|
||||
|
||||
@@ -319,8 +319,20 @@ pub async fn run_metadata_pass(
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
/// Quarantine window for chapters whose latest `SyncChapterContent` job is
|
||||
/// `dead`. The partial dedup index `crawler_jobs_chapter_content_dedup_idx`
|
||||
/// only blocks `(pending|running)` duplicates, so without this gate a
|
||||
/// permanently-failing chapter is re-enqueued every cron tick, burns
|
||||
/// `max_attempts` retries, dies again, and spins forever. With the gate,
|
||||
/// dead chapters get a week of silence before the next attempt — long
|
||||
/// enough for a transient site issue to resolve, short enough that
|
||||
/// permanent failures don't stay permanent if conditions change.
|
||||
const CHAPTER_DEAD_QUARANTINE_DAYS: i64 = 7;
|
||||
|
||||
/// Enqueue a `SyncChapterContent` job for every chapter of *any* bookmarked
|
||||
/// manga that still has `page_count = 0` and a non-dropped source row.
|
||||
/// Chapters with a `dead` job updated within the last `CHAPTER_DEAD_QUARANTINE_DAYS` days
|
||||
/// are excluded to break the dead-letter spin.
|
||||
/// Returns an `EnqueueSummary` carrying the `inserted` and `skipped` counts. The dedup index handles repeats.
|
||||
pub async fn enqueue_bookmarked_pending(pool: &PgPool) -> anyhow::Result<EnqueueSummary> {
|
||||
let rows: Vec<(String, Uuid, String)> = sqlx::query_as(
|
||||
@@ -331,10 +343,18 @@ pub async fn enqueue_bookmarked_pending(pool: &PgPool) -> anyhow::Result<Enqueue
|
||||
JOIN chapter_sources cs ON cs.chapter_id = c.id
|
||||
WHERE c.page_count = 0
|
||||
AND cs.dropped_at IS NULL
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM crawler_jobs cj
|
||||
WHERE cj.payload->>'kind' = 'sync_chapter_content'
|
||||
AND cj.payload->>'chapter_id' = c.id::text
|
||||
AND cj.state = 'dead'
|
||||
AND cj.updated_at > now() - ($1::bigint || ' days')::interval
|
||||
)
|
||||
GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.manga_id, c.created_at
|
||||
ORDER BY c.manga_id, c.created_at ASC
|
||||
"#,
|
||||
)
|
||||
.bind(CHAPTER_DEAD_QUARANTINE_DAYS)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.context("query bookmarked-pending chapters")?;
|
||||
@@ -363,7 +383,9 @@ pub async fn enqueue_bookmarked_pending(pool: &PgPool) -> anyhow::Result<Enqueue
|
||||
}
|
||||
|
||||
/// Enqueue chapter-content jobs for a *single* manga (the bookmark-create
|
||||
/// hook). Same dedup semantics as [`enqueue_bookmarked_pending`].
|
||||
/// hook). Same dedup semantics as [`enqueue_bookmarked_pending`], including
|
||||
/// the dead-letter quarantine — a freshly bookmarked manga should not
|
||||
/// burn retries on chapters that just died on the cron tick.
|
||||
pub async fn enqueue_pending_for_manga(
|
||||
pool: &PgPool,
|
||||
manga_id: Uuid,
|
||||
@@ -376,10 +398,18 @@ pub async fn enqueue_pending_for_manga(
|
||||
WHERE c.manga_id = $1
|
||||
AND c.page_count = 0
|
||||
AND cs.dropped_at IS NULL
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM crawler_jobs cj
|
||||
WHERE cj.payload->>'kind' = 'sync_chapter_content'
|
||||
AND cj.payload->>'chapter_id' = c.id::text
|
||||
AND cj.state = 'dead'
|
||||
AND cj.updated_at > now() - ($2::bigint || ' days')::interval
|
||||
)
|
||||
ORDER BY cs.source_id, c.id
|
||||
"#,
|
||||
)
|
||||
.bind(manga_id)
|
||||
.bind(CHAPTER_DEAD_QUARANTINE_DAYS)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
.context("query pending chapters for manga")?;
|
||||
|
||||
@@ -370,3 +370,150 @@ async fn enqueue_bookmarked_pending_skips_dropped_sources(pool: PgPool) {
|
||||
);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn enqueue_bookmarked_pending_skips_recently_dead_chapters(pool: PgPool) {
|
||||
// Setup: a chapter whose last SyncChapterContent job died yesterday.
|
||||
// The cron tick must not re-enqueue — without the quarantine, the
|
||||
// chapter would spin: re-enqueue → max_attempts retries → dies again
|
||||
// → re-enqueue next tick → forever.
|
||||
let user_id: Uuid = sqlx::query_scalar(
|
||||
"INSERT INTO users (username, password_hash) VALUES ($1, $2) RETURNING id",
|
||||
)
|
||||
.bind("alice")
|
||||
.bind("not-a-real-hash")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
let manga_id: Uuid =
|
||||
sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id")
|
||||
.bind("Test")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
|
||||
)
|
||||
.bind("target")
|
||||
.bind("Target")
|
||||
.bind("https://example.com")
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
let chapter_id: Uuid = sqlx::query_scalar(
|
||||
"INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, 1, 0) RETURNING id",
|
||||
)
|
||||
.bind(manga_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url) \
|
||||
VALUES ($1, $2, $3, $4)",
|
||||
)
|
||||
.bind("target")
|
||||
.bind("ch1")
|
||||
.bind(chapter_id)
|
||||
.bind("https://example.com/ch1")
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query("INSERT INTO bookmarks (user_id, manga_id) VALUES ($1, $2)")
|
||||
.bind(user_id)
|
||||
.bind(manga_id)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
// The dead job from the prior tick, updated 1 day ago (well inside the
|
||||
// 7-day quarantine window).
|
||||
sqlx::query(
|
||||
"INSERT INTO crawler_jobs (payload, state, updated_at) \
|
||||
VALUES ($1::jsonb, 'dead', now() - interval '1 day')",
|
||||
)
|
||||
.bind(serde_json::json!({
|
||||
"kind": "sync_chapter_content",
|
||||
"source_id": "target",
|
||||
"chapter_id": chapter_id.to_string(),
|
||||
"source_chapter_key": "ch1",
|
||||
}))
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let summary = pipeline::enqueue_bookmarked_pending(&pool).await.unwrap();
|
||||
assert_eq!(summary.inserted, 0, "recently dead chapter is quarantined");
|
||||
assert_eq!(summary.skipped, 0);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn enqueue_bookmarked_pending_resumes_after_quarantine_expires(pool: PgPool) {
|
||||
// Same setup as above but the dead job is 10 days old — past the
|
||||
// 7-day quarantine. The chapter should be re-enqueued so a once-failed
|
||||
// chapter eventually gets a second shot at success.
|
||||
let user_id: Uuid = sqlx::query_scalar(
|
||||
"INSERT INTO users (username, password_hash) VALUES ($1, $2) RETURNING id",
|
||||
)
|
||||
.bind("alice")
|
||||
.bind("not-a-real-hash")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
let manga_id: Uuid =
|
||||
sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id")
|
||||
.bind("Test")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
|
||||
)
|
||||
.bind("target")
|
||||
.bind("Target")
|
||||
.bind("https://example.com")
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
let chapter_id: Uuid = sqlx::query_scalar(
|
||||
"INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, 1, 0) RETURNING id",
|
||||
)
|
||||
.bind(manga_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url) \
|
||||
VALUES ($1, $2, $3, $4)",
|
||||
)
|
||||
.bind("target")
|
||||
.bind("ch1")
|
||||
.bind(chapter_id)
|
||||
.bind("https://example.com/ch1")
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query("INSERT INTO bookmarks (user_id, manga_id) VALUES ($1, $2)")
|
||||
.bind(user_id)
|
||||
.bind(manga_id)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"INSERT INTO crawler_jobs (payload, state, updated_at) \
|
||||
VALUES ($1::jsonb, 'dead', now() - interval '10 days')",
|
||||
)
|
||||
.bind(serde_json::json!({
|
||||
"kind": "sync_chapter_content",
|
||||
"source_id": "target",
|
||||
"chapter_id": chapter_id.to_string(),
|
||||
"source_chapter_key": "ch1",
|
||||
}))
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let summary = pipeline::enqueue_bookmarked_pending(&pool).await.unwrap();
|
||||
assert_eq!(
|
||||
summary.inserted, 1,
|
||||
"dead chapter is re-enqueued after quarantine expires"
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user