fix(crawler): close walker race against site reordering (0.35.1)
The target site orders by update_date DESC, and any new or updated manga pushes everyone down by one slot. The paginated walker was blind to this drift: * Backfill (page last -> 1): shifts push items into pages already finished. The displaced manga was silently missed; with mark_dropped_mangas running on a fully-completed walk, items even got false-dropped because last_seen_at was stale. * Incremental (page 1 -> last): a shift causes the slot-last item of an already-read page to reappear on the next page, leading to a redundant fetch_manga and an inflated consecutive_unchanged streak. Fix is two-pronged: 1. Backfill boundary re-check. After fetching each page P, re-fetch the previously-walked page P+1 and check where its old slot-0 key now sits. If it slid to slot K, the first K entries are items that used to live on P and slid past us; they get appended to the batch. If the anchor is gone entirely (multi-page shift or it was bumped to page 1), the whole re-fetched page is processed conservatively and the pipeline dedup absorbs the noise. The re-check must be the *last* navigation of the iteration to close the within-iteration race. 2. Run-scoped dedup in run_metadata_pass. A HashSet<String> of source_manga_keys avoids double-processing. The set uses a contains-then-insert pattern with insert firing *after* a successful upsert, so a transient fetch/upsert failure leaves the key retryable if it reappears later in the same pass (via the boundary re-check or another batch). Incremental mode does not run the re-check (shifts move in the same direction as the walk); only the dedup helps it. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2
backend/Cargo.lock
generated
2
backend/Cargo.lock
generated
@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mangalord"
|
name = "mangalord"
|
||||||
version = "0.35.0"
|
version = "0.35.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"argon2",
|
"argon2",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "mangalord"
|
name = "mangalord"
|
||||||
version = "0.35.0"
|
version = "0.35.1"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
default-run = "mangalord"
|
default-run = "mangalord"
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,8 @@
|
|||||||
//! that fan out chapter-content work. Shared between the daemon (cron tick)
|
//! that fan out chapter-content work. Shared between the daemon (cron tick)
|
||||||
//! and the CLI (`bin/crawler.rs`) so behavior stays in lockstep.
|
//! and the CLI (`bin/crawler.rs`) so behavior stays in lockstep.
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
@@ -105,6 +107,14 @@ pub async fn run_metadata_pass(
|
|||||||
.context("discover failed")?;
|
.context("discover failed")?;
|
||||||
|
|
||||||
let mut stats = MetadataStats::default();
|
let mut stats = MetadataStats::default();
|
||||||
|
// Run-scoped dedup of `source_manga_key`s already processed this pass.
|
||||||
|
// Backfill: the walker may append displaced refs that also appear on
|
||||||
|
// the page we're about to visit naturally; skipping the dup avoids
|
||||||
|
// redundant fetch_manga + upsert. Incremental: a shift causes the
|
||||||
|
// slot-last item of the page we just read to reappear at slot 0 of
|
||||||
|
// the next page; skipping it preserves the consecutive_unchanged
|
||||||
|
// streak math instead of inflating it with a re-confirm.
|
||||||
|
let mut seen: HashSet<String> = HashSet::new();
|
||||||
let mut consecutive_unchanged: usize = 0;
|
let mut consecutive_unchanged: usize = 0;
|
||||||
let mut walked_to_completion = false;
|
let mut walked_to_completion = false;
|
||||||
let mut hit_limit = false;
|
let mut hit_limit = false;
|
||||||
@@ -124,6 +134,23 @@ pub async fn run_metadata_pass(
|
|||||||
tracing::info!(cap = ?max_refs, "max_results reached; halting walk");
|
tracing::info!(cap = ?max_refs, "max_results reached; halting walk");
|
||||||
break 'outer;
|
break 'outer;
|
||||||
}
|
}
|
||||||
|
// Skip refs we've already *successfully* processed this pass.
|
||||||
|
// Checking `contains` here (rather than `insert`) keeps the key
|
||||||
|
// out of `seen` on failure paths below, so a transient fetch or
|
||||||
|
// upsert error gets a second chance if the ref reappears via the
|
||||||
|
// backfill boundary re-check or another batch. Done *before*
|
||||||
|
// counting toward `stats.discovered` (the skipped ref did no
|
||||||
|
// work) and *before* touching `consecutive_unchanged` (a
|
||||||
|
// `continue` here preserves the streak rather than resetting or
|
||||||
|
// inflating it). The matching `seen.insert(...)` lives just
|
||||||
|
// after the successful upsert below.
|
||||||
|
if seen.contains(&r.source_manga_key) {
|
||||||
|
tracing::debug!(
|
||||||
|
key = %r.source_manga_key,
|
||||||
|
"skip already-seen key in this run"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
stats.discovered += 1;
|
stats.discovered += 1;
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
idx = stats.discovered,
|
idx = stats.discovered,
|
||||||
@@ -161,6 +188,10 @@ pub async fn run_metadata_pass(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
stats.upserted += 1;
|
stats.upserted += 1;
|
||||||
|
// Record success in the dedup set. Cover and chapter-sync
|
||||||
|
// failures below are non-fatal and don't roll this back —
|
||||||
|
// metadata is the durable source of truth for the dedup.
|
||||||
|
seen.insert(r.source_manga_key.clone());
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
key = %manga.source_manga_key,
|
key = %manga.source_manga_key,
|
||||||
manga_id = %upsert.manga_id,
|
manga_id = %upsert.manga_id,
|
||||||
@@ -473,4 +504,31 @@ mod tests {
|
|||||||
};
|
};
|
||||||
assert!(should_stop(mode, 0));
|
assert!(should_stop(mode, 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn run_scoped_seen_set_skips_duplicate_source_manga_keys() {
|
||||||
|
// Pins the per-ref loop contract: `contains` gates whether work
|
||||||
|
// runs, and `insert` only fires on the success path (after upsert).
|
||||||
|
// A failed ref that reappears later in the same pass must get a
|
||||||
|
// second chance — that's why the loop uses contains-then-insert
|
||||||
|
// instead of insert-and-skip-on-collision.
|
||||||
|
let mut seen: HashSet<String> = HashSet::new();
|
||||||
|
|
||||||
|
// First sighting of a key: not yet seen → loop proceeds.
|
||||||
|
assert!(!seen.contains("manga-a"), "first sighting is unseen");
|
||||||
|
// Simulate a failed fetch_manga: do NOT insert. Next sighting must
|
||||||
|
// still be considered unseen so the loop retries it.
|
||||||
|
assert!(!seen.contains("manga-a"), "failed key is still retryable");
|
||||||
|
|
||||||
|
// Now simulate a successful upsert — insert is called.
|
||||||
|
seen.insert("manga-a".to_string());
|
||||||
|
// Subsequent sightings of the same key are skipped.
|
||||||
|
assert!(seen.contains("manga-a"), "successful key is now seen");
|
||||||
|
|
||||||
|
// Distinct keys never collide.
|
||||||
|
assert!(!seen.contains("manga-b"), "different key independent");
|
||||||
|
seen.insert("manga-b".to_string());
|
||||||
|
assert!(seen.contains("manga-b"));
|
||||||
|
assert!(seen.contains("manga-a"), "first key still recorded");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -103,6 +103,7 @@ impl Source for TargetSource {
|
|||||||
backfill,
|
backfill,
|
||||||
pages_remaining: order,
|
pages_remaining: order,
|
||||||
first_page_html: Some(first_html),
|
first_page_html: Some(first_html),
|
||||||
|
prev: None,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -160,6 +161,13 @@ struct TargetSourceWalker {
|
|||||||
backfill: bool,
|
backfill: bool,
|
||||||
pages_remaining: VecDeque<i32>,
|
pages_remaining: VecDeque<i32>,
|
||||||
first_page_html: Option<String>,
|
first_page_html: Option<String>,
|
||||||
|
/// Page number and slot-0 `source_manga_key` of the previously-walked
|
||||||
|
/// page. Updated after every batch (cheap, unconditional) but only
|
||||||
|
/// *read* by the boundary re-check, which itself runs only in backfill
|
||||||
|
/// mode. A single `Option` so the half-set state (`page_num` known but
|
||||||
|
/// `key` not, or vice versa) is unrepresentable; `None` here suppresses
|
||||||
|
/// the next iteration's re-check (no anchor to compare against).
|
||||||
|
prev: Option<(i32, String)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -206,10 +214,77 @@ impl DiscoverWalk for TargetSourceWalker {
|
|||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
};
|
};
|
||||||
|
// Capture slot-0 of the page as the site presents it (newest first)
|
||||||
|
// *before* the backfill `.reverse()` below — after reversal slot 0
|
||||||
|
// is the oldest entry, which would defeat the next iteration's
|
||||||
|
// boundary re-check.
|
||||||
|
let current_first = page_refs.first().map(|r| r.source_manga_key.clone());
|
||||||
|
|
||||||
|
// Boundary re-check (backfill only). The site orders by update_date
|
||||||
|
// DESC, so shifts during the walk push items from low-numbered pages
|
||||||
|
// to high-numbered ones — into pages backfill has already finished.
|
||||||
|
// After fetching this page, re-fetch the *previous* iteration's page
|
||||||
|
// and look for items that slid past us mid-walk. Must be the last
|
||||||
|
// navigation of the iteration to close the within-iteration race.
|
||||||
|
let mut displaced: Vec<SourceMangaRef> = Vec::new();
|
||||||
|
if self.backfill {
|
||||||
|
if let Some((prev_page_num, prev_first_key)) = self.prev.clone() {
|
||||||
|
match recheck_prev_page(ctx, &self.base_url, prev_page_num).await {
|
||||||
|
Ok(refetched) => {
|
||||||
|
let (d, outcome) = detect_displaced(&prev_first_key, &refetched);
|
||||||
|
match outcome {
|
||||||
|
DisplacementOutcome::NoShift => {}
|
||||||
|
DisplacementOutcome::Shifted(k) => {
|
||||||
|
tracing::info!(
|
||||||
|
page_num,
|
||||||
|
prev_page_num,
|
||||||
|
k,
|
||||||
|
"boundary re-check: shift detected, recovering displaced refs"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
DisplacementOutcome::NotFoundFallback => {
|
||||||
|
tracing::warn!(
|
||||||
|
page_num,
|
||||||
|
prev_page_num,
|
||||||
|
prev_first_key = %prev_first_key,
|
||||||
|
refetched_len = refetched.len(),
|
||||||
|
"boundary re-check: prev_first not found, falling back to full re-process"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
displaced = d;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(
|
||||||
|
page_num,
|
||||||
|
prev_page_num,
|
||||||
|
error = ?e,
|
||||||
|
"boundary re-check: re-fetch failed, skipping check for this boundary"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remember the boundary for the next iteration. An empty `page_refs`
|
||||||
|
// yields `None`, which intentionally suppresses the next re-check
|
||||||
|
// (no anchor to compare against).
|
||||||
|
self.prev = current_first.map(|key| (page_num, key));
|
||||||
|
|
||||||
if self.backfill {
|
if self.backfill {
|
||||||
page_refs.reverse();
|
page_refs.reverse();
|
||||||
}
|
}
|
||||||
tracing::info!(page_num, count = page_refs.len(), "page walked");
|
// Append displaced refs to the end of the batch. Order doesn't
|
||||||
|
// affect backfill semantics (no `consecutive_unchanged` streak in
|
||||||
|
// this mode), and the pipeline-level dedup set handles any overlap.
|
||||||
|
let displaced_count = displaced.len();
|
||||||
|
page_refs.extend(displaced);
|
||||||
|
tracing::info!(
|
||||||
|
page_num,
|
||||||
|
count = page_refs.len(),
|
||||||
|
displaced = displaced_count,
|
||||||
|
"page walked"
|
||||||
|
);
|
||||||
Ok(Some(page_refs))
|
Ok(Some(page_refs))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -592,6 +667,67 @@ fn compute_metadata_hash(m: &SourceManga) -> String {
|
|||||||
format!("{:x}", h.finalize())
|
format!("{:x}", h.finalize())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Outcome of a boundary re-check, surfaced for telemetry + tests. The
|
||||||
|
/// caller's recovery action is determined by the returned `Vec`; this enum
|
||||||
|
/// only labels which branch fired so logging and assertions can distinguish
|
||||||
|
/// "site is stable" from "we papered over a shift" from "we fell back".
|
||||||
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
|
enum DisplacementOutcome {
|
||||||
|
/// `prev_first` is still at slot 0 of the re-fetched page — no shift
|
||||||
|
/// happened between the prior iteration and this one's re-check.
|
||||||
|
NoShift,
|
||||||
|
/// `prev_first` slid down to slot `K`; the first `K` entries are items
|
||||||
|
/// that used to live on the page we just walked.
|
||||||
|
Shifted(usize),
|
||||||
|
/// `prev_first` is gone from the re-fetched page — multiple pages-worth
|
||||||
|
/// of shifts happened, or it was bumped to page 1. Treat all refetched
|
||||||
|
/// entries as potentially displaced; the pipeline-level dedup absorbs
|
||||||
|
/// the noise.
|
||||||
|
NotFoundFallback,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compare a previously-walked page's slot-0 key against a fresh fetch of
|
||||||
|
/// that page. Returns the entries that appear *ahead* of `prev_first` in
|
||||||
|
/// the re-fetched page — items that slid in from the page the caller is
|
||||||
|
/// currently processing.
|
||||||
|
fn detect_displaced(
|
||||||
|
prev_first: &str,
|
||||||
|
refetched: &[SourceMangaRef],
|
||||||
|
) -> (Vec<SourceMangaRef>, DisplacementOutcome) {
|
||||||
|
let Some(k) = refetched
|
||||||
|
.iter()
|
||||||
|
.position(|r| r.source_manga_key == prev_first)
|
||||||
|
else {
|
||||||
|
return (refetched.to_vec(), DisplacementOutcome::NotFoundFallback);
|
||||||
|
};
|
||||||
|
if k == 0 {
|
||||||
|
(Vec::new(), DisplacementOutcome::NoShift)
|
||||||
|
} else {
|
||||||
|
(refetched[..k].to_vec(), DisplacementOutcome::Shifted(k))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Re-fetch a previously-walked listing page to feed [`detect_displaced`].
|
||||||
|
/// Uses the same retry chain as the primary page fetch in `next_batch` so
|
||||||
|
/// a transient hiccup doesn't tank an entire backfill walk.
|
||||||
|
async fn recheck_prev_page(
|
||||||
|
ctx: &FetchContext<'_>,
|
||||||
|
base_url: &str,
|
||||||
|
page_num: i32,
|
||||||
|
) -> Result<Vec<SourceMangaRef>, PageError> {
|
||||||
|
retry_on_transient(
|
||||||
|
|| async {
|
||||||
|
let url = page_url(base_url, page_num);
|
||||||
|
let html = navigate(ctx, &url).await?;
|
||||||
|
let doc = scraper::Html::parse_document(&html);
|
||||||
|
parse_manga_list_from(&doc)
|
||||||
|
},
|
||||||
|
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
||||||
|
PAGE_TRANSIENT_RETRY_DELAY,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -990,4 +1126,100 @@ mod tests {
|
|||||||
let incremental = build_page_order(Some(1), false);
|
let incremental = build_page_order(Some(1), false);
|
||||||
assert_eq!(Vec::from(incremental), vec![1]);
|
assert_eq!(Vec::from(incremental), vec![1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn make_ref(key: &str) -> SourceMangaRef {
|
||||||
|
SourceMangaRef {
|
||||||
|
source_manga_key: key.to_string(),
|
||||||
|
title: key.to_string(),
|
||||||
|
url: format!("https://target.example/manga/{key}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn detect_displaced_no_shift_when_prev_first_still_at_slot_zero() {
|
||||||
|
let refetched = vec![make_ref("A"), make_ref("B"), make_ref("C")];
|
||||||
|
let (displaced, outcome) = detect_displaced("A", &refetched);
|
||||||
|
assert!(displaced.is_empty());
|
||||||
|
assert_eq!(outcome, DisplacementOutcome::NoShift);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn detect_displaced_one_shift_returns_single_intruder() {
|
||||||
|
let refetched = vec![make_ref("X"), make_ref("A"), make_ref("B")];
|
||||||
|
let (displaced, outcome) = detect_displaced("A", &refetched);
|
||||||
|
assert_eq!(displaced.len(), 1);
|
||||||
|
assert_eq!(displaced[0].source_manga_key, "X");
|
||||||
|
assert_eq!(outcome, DisplacementOutcome::Shifted(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn detect_displaced_multi_shift_returns_all_intruders() {
|
||||||
|
let refetched = vec![
|
||||||
|
make_ref("X1"),
|
||||||
|
make_ref("X2"),
|
||||||
|
make_ref("X3"),
|
||||||
|
make_ref("A"),
|
||||||
|
make_ref("B"),
|
||||||
|
make_ref("C"),
|
||||||
|
];
|
||||||
|
let (displaced, outcome) = detect_displaced("A", &refetched);
|
||||||
|
let keys: Vec<&str> = displaced
|
||||||
|
.iter()
|
||||||
|
.map(|r| r.source_manga_key.as_str())
|
||||||
|
.collect();
|
||||||
|
assert_eq!(keys, vec!["X1", "X2", "X3"]);
|
||||||
|
assert_eq!(outcome, DisplacementOutcome::Shifted(3));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn detect_displaced_full_page_shift_returns_all_but_last() {
|
||||||
|
// `prev_first` at the last slot — every preceding entry is an
|
||||||
|
// intruder shifted in from the page the caller is processing.
|
||||||
|
let mut refetched: Vec<_> = (0..9).map(|i| make_ref(&format!("X{i}"))).collect();
|
||||||
|
refetched.push(make_ref("A"));
|
||||||
|
let (displaced, outcome) = detect_displaced("A", &refetched);
|
||||||
|
assert_eq!(displaced.len(), 9);
|
||||||
|
assert_eq!(outcome, DisplacementOutcome::Shifted(9));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn detect_displaced_not_found_returns_full_page_for_conservative_recovery() {
|
||||||
|
// > page-worth of shifts (or `prev_first` itself was bumped to
|
||||||
|
// page 1): can't pinpoint K, fall back to "process everything";
|
||||||
|
// pipeline dedup absorbs the noise.
|
||||||
|
let refetched = vec![make_ref("Y"), make_ref("Z")];
|
||||||
|
let (displaced, outcome) = detect_displaced("A", &refetched);
|
||||||
|
let keys: Vec<&str> = displaced
|
||||||
|
.iter()
|
||||||
|
.map(|r| r.source_manga_key.as_str())
|
||||||
|
.collect();
|
||||||
|
assert_eq!(keys, vec!["Y", "Z"]);
|
||||||
|
assert_eq!(outcome, DisplacementOutcome::NotFoundFallback);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn detect_displaced_empty_page_returns_empty_with_fallback_outcome() {
|
||||||
|
// Re-fetch came back empty (transient mimicry or last-page tail).
|
||||||
|
// No anchor means we can't classify; fall back is the safe label.
|
||||||
|
let (displaced, outcome) = detect_displaced("A", &[]);
|
||||||
|
assert!(displaced.is_empty());
|
||||||
|
assert_eq!(outcome, DisplacementOutcome::NotFoundFallback);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn detect_displaced_takes_first_occurrence_when_key_repeats() {
|
||||||
|
// Defensive: if the source ever returns the same key twice on a
|
||||||
|
// page, anchoring on the first match keeps the displaced slice
|
||||||
|
// bounded and deterministic.
|
||||||
|
let refetched = vec![
|
||||||
|
make_ref("X"),
|
||||||
|
make_ref("A"),
|
||||||
|
make_ref("Y"),
|
||||||
|
make_ref("A"),
|
||||||
|
];
|
||||||
|
let (displaced, outcome) = detect_displaced("A", &refetched);
|
||||||
|
assert_eq!(displaced.len(), 1);
|
||||||
|
assert_eq!(displaced[0].source_manga_key, "X");
|
||||||
|
assert_eq!(outcome, DisplacementOutcome::Shifted(1));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "mangalord-frontend",
|
"name": "mangalord-frontend",
|
||||||
"version": "0.35.0",
|
"version": "0.35.1",
|
||||||
"private": true,
|
"private": true,
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|||||||
Reference in New Issue
Block a user