diff --git a/backend/Cargo.lock b/backend/Cargo.lock index a222205..2d4352c 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "mangalord" -version = "0.35.0" +version = "0.35.1" dependencies = [ "anyhow", "argon2", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 1682be4..10022ab 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mangalord" -version = "0.35.0" +version = "0.35.1" edition = "2021" default-run = "mangalord" diff --git a/backend/src/crawler/pipeline.rs b/backend/src/crawler/pipeline.rs index e2b08a9..f8afd4b 100644 --- a/backend/src/crawler/pipeline.rs +++ b/backend/src/crawler/pipeline.rs @@ -2,6 +2,8 @@ //! that fan out chapter-content work. Shared between the daemon (cron tick) //! and the CLI (`bin/crawler.rs`) so behavior stays in lockstep. +use std::collections::HashSet; + use anyhow::Context; use sqlx::PgPool; use uuid::Uuid; @@ -105,6 +107,14 @@ pub async fn run_metadata_pass( .context("discover failed")?; let mut stats = MetadataStats::default(); + // Run-scoped dedup of `source_manga_key`s already processed this pass. + // Backfill: the walker may append displaced refs that also appear on + // the page we're about to visit naturally; skipping the dup avoids + // redundant fetch_manga + upsert. Incremental: a shift causes the + // slot-last item of the page we just read to reappear at slot 0 of + // the next page; skipping it preserves the consecutive_unchanged + // streak math instead of inflating it with a re-confirm. + let mut seen: HashSet = HashSet::new(); let mut consecutive_unchanged: usize = 0; let mut walked_to_completion = false; let mut hit_limit = false; @@ -124,6 +134,23 @@ pub async fn run_metadata_pass( tracing::info!(cap = ?max_refs, "max_results reached; halting walk"); break 'outer; } + // Skip refs we've already *successfully* processed this pass. + // Checking `contains` here (rather than `insert`) keeps the key + // out of `seen` on failure paths below, so a transient fetch or + // upsert error gets a second chance if the ref reappears via the + // backfill boundary re-check or another batch. Done *before* + // counting toward `stats.discovered` (the skipped ref did no + // work) and *before* touching `consecutive_unchanged` (a + // `continue` here preserves the streak rather than resetting or + // inflating it). The matching `seen.insert(...)` lives just + // after the successful upsert below. + if seen.contains(&r.source_manga_key) { + tracing::debug!( + key = %r.source_manga_key, + "skip already-seen key in this run" + ); + continue; + } stats.discovered += 1; tracing::info!( idx = stats.discovered, @@ -161,6 +188,10 @@ pub async fn run_metadata_pass( } }; stats.upserted += 1; + // Record success in the dedup set. Cover and chapter-sync + // failures below are non-fatal and don't roll this back — + // metadata is the durable source of truth for the dedup. + seen.insert(r.source_manga_key.clone()); tracing::info!( key = %manga.source_manga_key, manga_id = %upsert.manga_id, @@ -473,4 +504,31 @@ mod tests { }; assert!(should_stop(mode, 0)); } + + #[test] + fn run_scoped_seen_set_skips_duplicate_source_manga_keys() { + // Pins the per-ref loop contract: `contains` gates whether work + // runs, and `insert` only fires on the success path (after upsert). + // A failed ref that reappears later in the same pass must get a + // second chance — that's why the loop uses contains-then-insert + // instead of insert-and-skip-on-collision. + let mut seen: HashSet = HashSet::new(); + + // First sighting of a key: not yet seen → loop proceeds. + assert!(!seen.contains("manga-a"), "first sighting is unseen"); + // Simulate a failed fetch_manga: do NOT insert. Next sighting must + // still be considered unseen so the loop retries it. + assert!(!seen.contains("manga-a"), "failed key is still retryable"); + + // Now simulate a successful upsert — insert is called. + seen.insert("manga-a".to_string()); + // Subsequent sightings of the same key are skipped. + assert!(seen.contains("manga-a"), "successful key is now seen"); + + // Distinct keys never collide. + assert!(!seen.contains("manga-b"), "different key independent"); + seen.insert("manga-b".to_string()); + assert!(seen.contains("manga-b")); + assert!(seen.contains("manga-a"), "first key still recorded"); + } } diff --git a/backend/src/crawler/source/target.rs b/backend/src/crawler/source/target.rs index dba59aa..de1ed42 100644 --- a/backend/src/crawler/source/target.rs +++ b/backend/src/crawler/source/target.rs @@ -103,6 +103,7 @@ impl Source for TargetSource { backfill, pages_remaining: order, first_page_html: Some(first_html), + prev: None, })) } @@ -160,6 +161,13 @@ struct TargetSourceWalker { backfill: bool, pages_remaining: VecDeque, first_page_html: Option, + /// Page number and slot-0 `source_manga_key` of the previously-walked + /// page. Updated after every batch (cheap, unconditional) but only + /// *read* by the boundary re-check, which itself runs only in backfill + /// mode. A single `Option` so the half-set state (`page_num` known but + /// `key` not, or vice versa) is unrepresentable; `None` here suppresses + /// the next iteration's re-check (no anchor to compare against). + prev: Option<(i32, String)>, } #[async_trait] @@ -206,10 +214,77 @@ impl DiscoverWalk for TargetSourceWalker { ) .await? }; + // Capture slot-0 of the page as the site presents it (newest first) + // *before* the backfill `.reverse()` below — after reversal slot 0 + // is the oldest entry, which would defeat the next iteration's + // boundary re-check. + let current_first = page_refs.first().map(|r| r.source_manga_key.clone()); + + // Boundary re-check (backfill only). The site orders by update_date + // DESC, so shifts during the walk push items from low-numbered pages + // to high-numbered ones — into pages backfill has already finished. + // After fetching this page, re-fetch the *previous* iteration's page + // and look for items that slid past us mid-walk. Must be the last + // navigation of the iteration to close the within-iteration race. + let mut displaced: Vec = Vec::new(); + if self.backfill { + if let Some((prev_page_num, prev_first_key)) = self.prev.clone() { + match recheck_prev_page(ctx, &self.base_url, prev_page_num).await { + Ok(refetched) => { + let (d, outcome) = detect_displaced(&prev_first_key, &refetched); + match outcome { + DisplacementOutcome::NoShift => {} + DisplacementOutcome::Shifted(k) => { + tracing::info!( + page_num, + prev_page_num, + k, + "boundary re-check: shift detected, recovering displaced refs" + ); + } + DisplacementOutcome::NotFoundFallback => { + tracing::warn!( + page_num, + prev_page_num, + prev_first_key = %prev_first_key, + refetched_len = refetched.len(), + "boundary re-check: prev_first not found, falling back to full re-process" + ); + } + } + displaced = d; + } + Err(e) => { + tracing::warn!( + page_num, + prev_page_num, + error = ?e, + "boundary re-check: re-fetch failed, skipping check for this boundary" + ); + } + } + } + } + + // Remember the boundary for the next iteration. An empty `page_refs` + // yields `None`, which intentionally suppresses the next re-check + // (no anchor to compare against). + self.prev = current_first.map(|key| (page_num, key)); + if self.backfill { page_refs.reverse(); } - tracing::info!(page_num, count = page_refs.len(), "page walked"); + // Append displaced refs to the end of the batch. Order doesn't + // affect backfill semantics (no `consecutive_unchanged` streak in + // this mode), and the pipeline-level dedup set handles any overlap. + let displaced_count = displaced.len(); + page_refs.extend(displaced); + tracing::info!( + page_num, + count = page_refs.len(), + displaced = displaced_count, + "page walked" + ); Ok(Some(page_refs)) } } @@ -592,6 +667,67 @@ fn compute_metadata_hash(m: &SourceManga) -> String { format!("{:x}", h.finalize()) } +/// Outcome of a boundary re-check, surfaced for telemetry + tests. The +/// caller's recovery action is determined by the returned `Vec`; this enum +/// only labels which branch fired so logging and assertions can distinguish +/// "site is stable" from "we papered over a shift" from "we fell back". +#[derive(Debug, PartialEq, Eq)] +enum DisplacementOutcome { + /// `prev_first` is still at slot 0 of the re-fetched page — no shift + /// happened between the prior iteration and this one's re-check. + NoShift, + /// `prev_first` slid down to slot `K`; the first `K` entries are items + /// that used to live on the page we just walked. + Shifted(usize), + /// `prev_first` is gone from the re-fetched page — multiple pages-worth + /// of shifts happened, or it was bumped to page 1. Treat all refetched + /// entries as potentially displaced; the pipeline-level dedup absorbs + /// the noise. + NotFoundFallback, +} + +/// Compare a previously-walked page's slot-0 key against a fresh fetch of +/// that page. Returns the entries that appear *ahead* of `prev_first` in +/// the re-fetched page — items that slid in from the page the caller is +/// currently processing. +fn detect_displaced( + prev_first: &str, + refetched: &[SourceMangaRef], +) -> (Vec, DisplacementOutcome) { + let Some(k) = refetched + .iter() + .position(|r| r.source_manga_key == prev_first) + else { + return (refetched.to_vec(), DisplacementOutcome::NotFoundFallback); + }; + if k == 0 { + (Vec::new(), DisplacementOutcome::NoShift) + } else { + (refetched[..k].to_vec(), DisplacementOutcome::Shifted(k)) + } +} + +/// Re-fetch a previously-walked listing page to feed [`detect_displaced`]. +/// Uses the same retry chain as the primary page fetch in `next_batch` so +/// a transient hiccup doesn't tank an entire backfill walk. +async fn recheck_prev_page( + ctx: &FetchContext<'_>, + base_url: &str, + page_num: i32, +) -> Result, PageError> { + retry_on_transient( + || async { + let url = page_url(base_url, page_num); + let html = navigate(ctx, &url).await?; + let doc = scraper::Html::parse_document(&html); + parse_manga_list_from(&doc) + }, + PAGE_TRANSIENT_RETRY_ATTEMPTS, + PAGE_TRANSIENT_RETRY_DELAY, + ) + .await +} + #[cfg(test)] mod tests { use super::*; @@ -990,4 +1126,100 @@ mod tests { let incremental = build_page_order(Some(1), false); assert_eq!(Vec::from(incremental), vec![1]); } + + fn make_ref(key: &str) -> SourceMangaRef { + SourceMangaRef { + source_manga_key: key.to_string(), + title: key.to_string(), + url: format!("https://target.example/manga/{key}"), + } + } + + #[test] + fn detect_displaced_no_shift_when_prev_first_still_at_slot_zero() { + let refetched = vec![make_ref("A"), make_ref("B"), make_ref("C")]; + let (displaced, outcome) = detect_displaced("A", &refetched); + assert!(displaced.is_empty()); + assert_eq!(outcome, DisplacementOutcome::NoShift); + } + + #[test] + fn detect_displaced_one_shift_returns_single_intruder() { + let refetched = vec![make_ref("X"), make_ref("A"), make_ref("B")]; + let (displaced, outcome) = detect_displaced("A", &refetched); + assert_eq!(displaced.len(), 1); + assert_eq!(displaced[0].source_manga_key, "X"); + assert_eq!(outcome, DisplacementOutcome::Shifted(1)); + } + + #[test] + fn detect_displaced_multi_shift_returns_all_intruders() { + let refetched = vec![ + make_ref("X1"), + make_ref("X2"), + make_ref("X3"), + make_ref("A"), + make_ref("B"), + make_ref("C"), + ]; + let (displaced, outcome) = detect_displaced("A", &refetched); + let keys: Vec<&str> = displaced + .iter() + .map(|r| r.source_manga_key.as_str()) + .collect(); + assert_eq!(keys, vec!["X1", "X2", "X3"]); + assert_eq!(outcome, DisplacementOutcome::Shifted(3)); + } + + #[test] + fn detect_displaced_full_page_shift_returns_all_but_last() { + // `prev_first` at the last slot — every preceding entry is an + // intruder shifted in from the page the caller is processing. + let mut refetched: Vec<_> = (0..9).map(|i| make_ref(&format!("X{i}"))).collect(); + refetched.push(make_ref("A")); + let (displaced, outcome) = detect_displaced("A", &refetched); + assert_eq!(displaced.len(), 9); + assert_eq!(outcome, DisplacementOutcome::Shifted(9)); + } + + #[test] + fn detect_displaced_not_found_returns_full_page_for_conservative_recovery() { + // > page-worth of shifts (or `prev_first` itself was bumped to + // page 1): can't pinpoint K, fall back to "process everything"; + // pipeline dedup absorbs the noise. + let refetched = vec![make_ref("Y"), make_ref("Z")]; + let (displaced, outcome) = detect_displaced("A", &refetched); + let keys: Vec<&str> = displaced + .iter() + .map(|r| r.source_manga_key.as_str()) + .collect(); + assert_eq!(keys, vec!["Y", "Z"]); + assert_eq!(outcome, DisplacementOutcome::NotFoundFallback); + } + + #[test] + fn detect_displaced_empty_page_returns_empty_with_fallback_outcome() { + // Re-fetch came back empty (transient mimicry or last-page tail). + // No anchor means we can't classify; fall back is the safe label. + let (displaced, outcome) = detect_displaced("A", &[]); + assert!(displaced.is_empty()); + assert_eq!(outcome, DisplacementOutcome::NotFoundFallback); + } + + #[test] + fn detect_displaced_takes_first_occurrence_when_key_repeats() { + // Defensive: if the source ever returns the same key twice on a + // page, anchoring on the first match keeps the displaced slice + // bounded and deterministic. + let refetched = vec![ + make_ref("X"), + make_ref("A"), + make_ref("Y"), + make_ref("A"), + ]; + let (displaced, outcome) = detect_displaced("A", &refetched); + assert_eq!(displaced.len(), 1); + assert_eq!(displaced[0].source_manga_key, "X"); + assert_eq!(outcome, DisplacementOutcome::Shifted(1)); + } } diff --git a/frontend/package.json b/frontend/package.json index 6a36df7..0d0ad92 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "mangalord-frontend", - "version": "0.35.0", + "version": "0.35.1", "private": true, "type": "module", "scripts": {