From 33f7e1907732b2b12be3393899c97a06c86f098f Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Fri, 29 May 2026 20:45:01 +0200 Subject: [PATCH] fix(crawler): serialize sync_manga_chapters per-manga (0.35.6) Two concurrent calls of sync_manga_chapters for the same manga both read seen_keys, both run the drop UPDATE filtered on `NOT (key = ANY $3)`, and the later commit can soft-drop a chapter the earlier had just inserted (lost-update under MVCC). Today the cron tick is the only caller and the daemon-level advisory lock keeps it single-flight, but that lock is held on one pool connection and doesn't actually serialize the *function*: any future caller (bookmark hook, admin-triggered re-sync, parallel worker) would race against the cron. Add `pg_advisory_xact_lock(hashtextextended(manga_id::text, 0))` at the start of the transaction. Auto-releases on commit/rollback so a panic mid-call can't strand the lock. Lock keyed per-manga so calls for different mangas still parallelize. Test sync_chapters_serializes_concurrent_calls_for_same_manga spawns two tokio tasks calling the function concurrently with overlapping chapter lists and asserts every chapter survives. Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/Cargo.lock | 2 +- backend/Cargo.toml | 2 +- backend/src/repo/crawler.rs | 16 ++++++ backend/tests/crawler_sync.rs | 105 ++++++++++++++++++++++++++++++++++ frontend/package.json | 2 +- 5 files changed, 124 insertions(+), 3 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 4b3ce7b..668d33a 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "mangalord" -version = "0.35.5" +version = "0.35.6" dependencies = [ "anyhow", "argon2", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 41bc9d0..a463758 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mangalord" -version = "0.35.5" +version = "0.35.6" edition = "2021" default-run = "mangalord" diff --git a/backend/src/repo/crawler.rs b/backend/src/repo/crawler.rs index cfca00c..d04def2 100644 --- a/backend/src/repo/crawler.rs +++ b/backend/src/repo/crawler.rs @@ -328,6 +328,22 @@ pub async fn sync_manga_chapters( chapters: &[SourceChapterRef], ) -> sqlx::Result { let mut tx = pool.begin().await?; + // Per-manga advisory lock. Two concurrent calls for the same manga + // would otherwise both read `seen_keys`, both run the drop UPDATE + // filtered on `NOT (key = ANY $3)`, and the later commit could soft- + // drop a chapter the earlier commit had just inserted (lost-update + // shape under MVCC). `pg_advisory_xact_lock` is scoped to this + // transaction: it auto-releases on COMMIT/ROLLBACK so a Rust-side + // panic mid-call doesn't strand the lock. The single-arg int8 form + // keyed by `hashtextextended(manga_id::text, 0)` shares Postgres' + // global advisory-lock namespace with `CRON_LOCK_KEY`, but collision + // is 2^-64 per pair (a UUID-derived hash hitting the fixed cron key + // is effectively impossible). + sqlx::query("SELECT pg_advisory_xact_lock(hashtextextended($1::text, 0))") + .bind(manga_id) + .execute(&mut *tx) + .await?; + let mut diff = ChapterDiff::default(); let seen_keys: Vec = chapters .iter() diff --git a/backend/tests/crawler_sync.rs b/backend/tests/crawler_sync.rs index 725f2af..0baee4f 100644 --- a/backend/tests/crawler_sync.rs +++ b/backend/tests/crawler_sync.rs @@ -423,6 +423,111 @@ async fn sync_chapters_isolates_colliding_keys_across_mangas(pool: PgPool) { ); } +#[sqlx::test(migrations = "./migrations")] +async fn sync_chapters_serializes_concurrent_calls_for_same_manga(pool: PgPool) { + // Without the per-manga advisory lock, two concurrent calls would + // both read `seen_keys`, both run the drop UPDATE filtered on `NOT + // (key = ANY $3)`, and the later commit could soft-drop a chapter + // the earlier had just inserted. The lock makes the calls strictly + // sequential per-manga: whichever runs second sees the first one's + // committed chapters and treats their absence as a "dropped" signal + // only if the second list legitimately omits them. + // + // Concretely: pre-state [A]. Call X syncs [A, B]; call Y syncs + // [A, B, C]. Whatever the schedule, the final state must include + // *all three* chapters because neither call legitimately omits the + // other's contribution — both lists are supersets of each other's + // pre-existing rows. + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let m = sample_manga("foo", "Foo Manga", "hash-1"); + let up = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + let manga_id = up.manga_id; + + // Pre-state: [A]. + let pre = vec![SourceChapterRef { + source_chapter_key: "A".into(), + number: 1, + title: Some("Ch.A".into()), + url: "https://x.example/foo/A".into(), + }]; + crawler::sync_manga_chapters(&pool, "target", manga_id, &pre) + .await + .unwrap(); + + // Two concurrent calls. Call X adds B; call Y adds B + C. Both keep + // A. Their drop branches would otherwise race against each other. + let list_x = vec![ + SourceChapterRef { + source_chapter_key: "A".into(), + number: 1, + title: Some("Ch.A".into()), + url: "https://x.example/foo/A".into(), + }, + SourceChapterRef { + source_chapter_key: "B".into(), + number: 2, + title: Some("Ch.B".into()), + url: "https://x.example/foo/B".into(), + }, + ]; + let list_y = vec![ + SourceChapterRef { + source_chapter_key: "A".into(), + number: 1, + title: Some("Ch.A".into()), + url: "https://x.example/foo/A".into(), + }, + SourceChapterRef { + source_chapter_key: "B".into(), + number: 2, + title: Some("Ch.B".into()), + url: "https://x.example/foo/B".into(), + }, + SourceChapterRef { + source_chapter_key: "C".into(), + number: 3, + title: Some("Ch.C".into()), + url: "https://x.example/foo/C".into(), + }, + ]; + let pool_x = pool.clone(); + let pool_y = pool.clone(); + let (rx, ry) = tokio::join!( + tokio::spawn(async move { + crawler::sync_manga_chapters(&pool_x, "target", manga_id, &list_x).await + }), + tokio::spawn(async move { + crawler::sync_manga_chapters(&pool_y, "target", manga_id, &list_y).await + }), + ); + rx.unwrap().expect("call X"); + ry.unwrap().expect("call Y"); + + // All three keys must survive with dropped_at NULL — the lock + // ensures the later call sees the earlier one's INSERTs and the + // drop UPDATE finds nothing to drop. + let alive: Vec = sqlx::query_scalar( + "SELECT cs.source_chapter_key \ + FROM chapter_sources cs \ + JOIN chapters ch ON ch.id = cs.chapter_id \ + WHERE ch.manga_id = $1 AND cs.dropped_at IS NULL \ + ORDER BY cs.source_chapter_key", + ) + .bind(manga_id) + .fetch_all(&pool) + .await + .unwrap(); + assert_eq!( + alive, + vec!["A".to_string(), "B".to_string(), "C".to_string()], + "all chapters survive concurrent syncs that both contain them" + ); +} + #[sqlx::test(migrations = "./migrations")] async fn mark_dropped_mangas_only_drops_unseen(pool: PgPool) { crawler::ensure_source(&pool, "target", "T", "https://x.example") diff --git a/frontend/package.json b/frontend/package.json index 14ff174..640b047 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "mangalord-frontend", - "version": "0.35.5", + "version": "0.35.6", "private": true, "type": "module", "scripts": {