fix(crawler): serialize sync_manga_chapters per-manga (0.35.6)
Two concurrent calls of sync_manga_chapters for the same manga both read seen_keys, both run the drop UPDATE filtered on `NOT (key = ANY $3)`, and the later commit can soft-drop a chapter the earlier had just inserted (lost-update under MVCC). Today the cron tick is the only caller and the daemon-level advisory lock keeps it single-flight, but that lock is held on one pool connection and doesn't actually serialize the *function*: any future caller (bookmark hook, admin-triggered re-sync, parallel worker) would race against the cron. Add `pg_advisory_xact_lock(hashtextextended(manga_id::text, 0))` at the start of the transaction. Auto-releases on commit/rollback so a panic mid-call can't strand the lock. Lock keyed per-manga so calls for different mangas still parallelize. Test sync_chapters_serializes_concurrent_calls_for_same_manga spawns two tokio tasks calling the function concurrently with overlapping chapter lists and asserts every chapter survives. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -423,6 +423,111 @@ async fn sync_chapters_isolates_colliding_keys_across_mangas(pool: PgPool) {
|
||||
);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn sync_chapters_serializes_concurrent_calls_for_same_manga(pool: PgPool) {
|
||||
// Without the per-manga advisory lock, two concurrent calls would
|
||||
// both read `seen_keys`, both run the drop UPDATE filtered on `NOT
|
||||
// (key = ANY $3)`, and the later commit could soft-drop a chapter
|
||||
// the earlier had just inserted. The lock makes the calls strictly
|
||||
// sequential per-manga: whichever runs second sees the first one's
|
||||
// committed chapters and treats their absence as a "dropped" signal
|
||||
// only if the second list legitimately omits them.
|
||||
//
|
||||
// Concretely: pre-state [A]. Call X syncs [A, B]; call Y syncs
|
||||
// [A, B, C]. Whatever the schedule, the final state must include
|
||||
// *all three* chapters because neither call legitimately omits the
|
||||
// other's contribution — both lists are supersets of each other's
|
||||
// pre-existing rows.
|
||||
crawler::ensure_source(&pool, "target", "T", "https://x.example")
|
||||
.await
|
||||
.unwrap();
|
||||
let m = sample_manga("foo", "Foo Manga", "hash-1");
|
||||
let up = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m)
|
||||
.await
|
||||
.unwrap();
|
||||
let manga_id = up.manga_id;
|
||||
|
||||
// Pre-state: [A].
|
||||
let pre = vec![SourceChapterRef {
|
||||
source_chapter_key: "A".into(),
|
||||
number: 1,
|
||||
title: Some("Ch.A".into()),
|
||||
url: "https://x.example/foo/A".into(),
|
||||
}];
|
||||
crawler::sync_manga_chapters(&pool, "target", manga_id, &pre)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Two concurrent calls. Call X adds B; call Y adds B + C. Both keep
|
||||
// A. Their drop branches would otherwise race against each other.
|
||||
let list_x = vec![
|
||||
SourceChapterRef {
|
||||
source_chapter_key: "A".into(),
|
||||
number: 1,
|
||||
title: Some("Ch.A".into()),
|
||||
url: "https://x.example/foo/A".into(),
|
||||
},
|
||||
SourceChapterRef {
|
||||
source_chapter_key: "B".into(),
|
||||
number: 2,
|
||||
title: Some("Ch.B".into()),
|
||||
url: "https://x.example/foo/B".into(),
|
||||
},
|
||||
];
|
||||
let list_y = vec![
|
||||
SourceChapterRef {
|
||||
source_chapter_key: "A".into(),
|
||||
number: 1,
|
||||
title: Some("Ch.A".into()),
|
||||
url: "https://x.example/foo/A".into(),
|
||||
},
|
||||
SourceChapterRef {
|
||||
source_chapter_key: "B".into(),
|
||||
number: 2,
|
||||
title: Some("Ch.B".into()),
|
||||
url: "https://x.example/foo/B".into(),
|
||||
},
|
||||
SourceChapterRef {
|
||||
source_chapter_key: "C".into(),
|
||||
number: 3,
|
||||
title: Some("Ch.C".into()),
|
||||
url: "https://x.example/foo/C".into(),
|
||||
},
|
||||
];
|
||||
let pool_x = pool.clone();
|
||||
let pool_y = pool.clone();
|
||||
let (rx, ry) = tokio::join!(
|
||||
tokio::spawn(async move {
|
||||
crawler::sync_manga_chapters(&pool_x, "target", manga_id, &list_x).await
|
||||
}),
|
||||
tokio::spawn(async move {
|
||||
crawler::sync_manga_chapters(&pool_y, "target", manga_id, &list_y).await
|
||||
}),
|
||||
);
|
||||
rx.unwrap().expect("call X");
|
||||
ry.unwrap().expect("call Y");
|
||||
|
||||
// All three keys must survive with dropped_at NULL — the lock
|
||||
// ensures the later call sees the earlier one's INSERTs and the
|
||||
// drop UPDATE finds nothing to drop.
|
||||
let alive: Vec<String> = sqlx::query_scalar(
|
||||
"SELECT cs.source_chapter_key \
|
||||
FROM chapter_sources cs \
|
||||
JOIN chapters ch ON ch.id = cs.chapter_id \
|
||||
WHERE ch.manga_id = $1 AND cs.dropped_at IS NULL \
|
||||
ORDER BY cs.source_chapter_key",
|
||||
)
|
||||
.bind(manga_id)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
alive,
|
||||
vec!["A".to_string(), "B".to_string(), "C".to_string()],
|
||||
"all chapters survive concurrent syncs that both contain them"
|
||||
);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn mark_dropped_mangas_only_drops_unseen(pool: PgPool) {
|
||||
crawler::ensure_source(&pool, "target", "T", "https://x.example")
|
||||
|
||||
Reference in New Issue
Block a user