feat(crawler): single-mode walker gated by recovery flag (0.36.0)

Collapses the crawler to a single newest-first walker and replaces the
N-consecutive-unchanged streak with a per-manga rule: stop on the first
manga where metadata is Unchanged AND chapter sync reports zero new
chapters. The early stop is gated by a per-source recovery flag stored
in `crawler_state` — set to `false` when a run starts, back to `true`
only on a clean exit (end-of-walk or intentional stop). A crashed run
leaves the flag `false` automatically (no shutdown code runs), so the
next tick walks the full catalog instead of bailing at the first
caught-up manga.

This means a crashed mid-walk run self-heals on the next tick: the
flag stays `false`, the next walk visits every page (recovering
anything the crash missed past its crash point), and steady state
resumes once the recovery sweep reaches end-of-walk.

Removed:
- DiscoverMode enum, Backfill mode, the boundary re-check +
  displaced-refs machinery in TargetSourceWalker.
- Drop-pass (mark_dropped_mangas) and seed-completion plumbing
  (mark_seed_completed / seed_completed_at). The recovery flag
  subsumes the seed-completion signal; drop detection was explicitly
  opted out.
- JobPayload::Discover (no production callers).
- CRAWLER_MODE / CRAWLER_INCREMENTAL_STOP_AFTER env vars and the
  CrawlerModePref config type.

`should_mark_clean_exit(walked_to_completion, hit_stop_condition)`
encodes the clean-exit truth table in its signature — `hit_limit` is
deliberately absent so a future edit cannot accidentally count a
caller-imposed cap as a clean exit.

Net -501 lines, 261 backend tests passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-29 23:49:28 +02:00
parent 33f7e19077
commit 9f56f283d4
15 changed files with 387 additions and 888 deletions

View File

@@ -1,4 +1,4 @@
//! Persistent job queue and the four job kinds.
//! Persistent job queue and its job kinds.
//!
//! Backed by Postgres (the `crawler_jobs` table). Workers lease rows
//! with `SELECT ... FOR UPDATE SKIP LOCKED`, heartbeat via
@@ -12,16 +12,9 @@ use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
use super::source::DiscoverMode;
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum JobPayload {
/// Walk the source index and enqueue `SyncManga` jobs.
Discover {
source_id: String,
mode: DiscoverMode,
},
/// Fetch one manga's detail page, upsert metadata, enqueue
/// `SyncChapterList`.
SyncManga {

View File

@@ -13,8 +13,9 @@ use crate::crawler::jobs::{self, EnqueueResult, JobPayload};
use crate::crawler::rate_limit::HostRateLimiters;
use crate::crawler::safety::{fetch_bytes_capped, looks_like_image, DownloadAllowlist};
use crate::crawler::source::target::TargetSource;
use crate::crawler::source::{DiscoverMode, FetchContext, Source};
use crate::crawler::source::{FetchContext, Source};
use crate::repo;
use crate::repo::crawler::UpsertStatus;
use crate::storage::Storage;
/// Coarse counters surfaced for logging at the end of a metadata pass.
@@ -26,16 +27,42 @@ pub struct MetadataStats {
pub mangas_failed: usize,
}
/// Decide whether the per-ref loop should stop based on the Incremental
/// streak counter. Pulled out as a pure function so the rule is unit-
/// testable without standing up the walker or DB.
pub(crate) fn should_stop(mode: DiscoverMode, consecutive_unchanged: usize) -> bool {
match mode {
DiscoverMode::Backfill => false,
DiscoverMode::Incremental { stop_after_unchanged } => {
consecutive_unchanged >= stop_after_unchanged
}
}
/// Decide whether the per-ref loop should stop on the manga just
/// processed. The walk halts only when (a) the previous run exited
/// cleanly — so the index tail is known to be caught up and we're not
/// in a recovery sweep — AND (b) this manga's metadata hash matched
/// storage (`Unchanged`) AND (c) the chapter sync confirmed zero new
/// chapters. A `None` chapter count (skip_chapters, or a chapter-sync
/// error we logged-and-swallowed) refuses the stop because we can't
/// verify the tail is unchanged from a single piece of evidence.
///
/// Pure function so the rule is unit-testable without the walker, DB,
/// or browser.
pub(crate) fn should_stop(
was_clean: bool,
status: UpsertStatus,
chapters_new: Option<usize>,
) -> bool {
was_clean
&& matches!(status, UpsertStatus::Unchanged)
&& chapters_new == Some(0)
}
/// Whether the just-finished walk should be recorded as a clean exit.
/// `true` writes the recovery flag back to `completed: true`; `false`
/// leaves it `false` so the next tick treats this run as crashed and
/// does a recovery sweep.
///
/// `hit_limit` (the caller-imposed `CRAWLER_LIMIT` cap) is *not* an
/// argument: a limit cap by definition does not reach the catalog tail,
/// so it can never count as a clean exit. Encoding that in the type
/// (rather than as an `&& !hit_limit` clause inline) prevents a future
/// edit from accidentally adding it back to the truth table.
pub(crate) fn should_mark_clean_exit(
walked_to_completion: bool,
hit_stop_condition: bool,
) -> bool {
walked_to_completion || hit_stop_condition
}
/// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline
@@ -45,15 +72,25 @@ pub(crate) fn should_stop(mode: DiscoverMode, consecutive_unchanged: usize) -> b
/// `limit == 0` means no cap (full sweep up to the source's own bound).
/// `skip_chapters == true` is the "metadata-only" mode (parser doesn't
/// extract chapters, and `sync_manga_chapters` is skipped — otherwise an
/// empty chapter list would soft-drop existing rows).
/// empty chapter list would soft-drop existing rows). In this mode the
/// stop condition never fires because chapter freshness can't be
/// confirmed, so the walk always runs to end-of-source.
///
/// `mode` controls the walk:
/// - `Backfill` — oldest-first, no early exit. The only mode that runs
/// the end-of-walk drop pass + writes `seed_completed_at`.
/// - `Incremental { stop_after_unchanged }` — newest-first, breaks out
/// after N consecutive Unchanged upserts. Drop pass is skipped (the
/// tail of the index is never visited, so its `last_seen_at` is
/// stale and using it to soft-drop would be unsafe).
/// The walk is always newest-first. Steady-state runs stop on the first
/// manga where metadata is `Unchanged` AND chapter sync reports zero
/// new chapters — the source orders by `update_date DESC`, so anything
/// with a fresh chapter or fresh metadata is bumped to the top and will
/// be processed before we hit a fully-caught-up manga.
///
/// A per-source recovery flag stored in `crawler_state`
/// (`last_run_completed:<source_id>`) gates the early stop: it's set to
/// `false` right after `ensure_source` and back to `true` only when the
/// run exits via end-of-walk OR the intentional stop. A crash, panic,
/// or SIGKILL leaves the flag at `false`, so the next tick reads it,
/// recognizes the previous run did not exit cleanly, and walks the
/// full catalog (ignoring the stop condition) to re-cover anything the
/// crashed run missed past its crash point. Once that recovery sweep
/// reaches end-of-walk, steady-state resumes.
#[allow(clippy::too_many_arguments)]
pub async fn run_metadata_pass(
browser_manager: &BrowserManager,
@@ -64,7 +101,6 @@ pub async fn run_metadata_pass(
start_url: &str,
limit: usize,
skip_chapters: bool,
mode: DiscoverMode,
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
) -> anyhow::Result<MetadataStats> {
@@ -97,28 +133,36 @@ pub async fn run_metadata_pass(
.await
.context("ensure_source")?;
let run_started_at = chrono::Utc::now();
// Read BEFORE flipping to "in-flight" — a `false` here means the
// previous run didn't reach a clean exit, and this run must walk
// the full catalog (recovery sweep) instead of bailing on the
// first caught-up manga.
let was_clean = repo::crawler::last_run_completed_cleanly(db, source_id)
.await
.context("read last_run_completed_cleanly")?;
repo::crawler::mark_run_started(db, source_id)
.await
.context("mark_run_started")?;
let max_refs = (limit > 0).then_some(limit);
tracing::info!(?mode, ?max_refs, "starting metadata pass");
tracing::info!(was_clean, ?max_refs, "starting metadata pass");
let mut walker = source
.discover(&ctx, mode)
.discover(&ctx)
.await
.context("discover failed")?;
let mut stats = MetadataStats::default();
// Run-scoped dedup of `source_manga_key`s already processed this pass.
// Backfill: the walker may append displaced refs that also appear on
// the page we're about to visit naturally; skipping the dup avoids
// redundant fetch_manga + upsert. Incremental: a shift causes the
// slot-last item of the page we just read to reappear at slot 0 of
// the next page; skipping it preserves the consecutive_unchanged
// streak math instead of inflating it with a re-confirm.
// A shift in the source index causes the slot-last item of the page
// we just read to reappear at slot 0 of the next page; skipping it
// here prevents redundant fetch_manga + upsert and avoids spuriously
// tripping the stop condition with a re-confirm of an entry we
// already counted.
let mut seen: HashSet<String> = HashSet::new();
let mut consecutive_unchanged: usize = 0;
let mut walked_to_completion = false;
let mut hit_limit = false;
let mut hit_incremental_stop = false;
let mut hit_stop_condition = false;
'outer: loop {
let batch = match walker.next_batch(&ctx).await? {
@@ -137,13 +181,13 @@ pub async fn run_metadata_pass(
// Skip refs we've already *successfully* processed this pass.
// Checking `contains` here (rather than `insert`) keeps the key
// out of `seen` on failure paths below, so a transient fetch or
// upsert error gets a second chance if the ref reappears via the
// backfill boundary re-check or another batch. Done *before*
// counting toward `stats.discovered` (the skipped ref did no
// work) and *before* touching `consecutive_unchanged` (a
// `continue` here preserves the streak rather than resetting or
// inflating it). The matching `seen.insert(...)` lives just
// after the successful upsert below.
// upsert error gets a second chance if the ref reappears in
// another batch. Done *before* counting toward
// `stats.discovered` (the skipped ref did no work) and *before*
// touching the stop check (a `continue` here doesn't let a
// re-confirm trip the stop condition). The matching
// `seen.insert(...)` lives just after the successful upsert
// below.
if seen.contains(&r.source_manga_key) {
tracing::debug!(
key = %r.source_manga_key,
@@ -230,7 +274,13 @@ pub async fn run_metadata_pass(
}
}
if !skip_chapters {
// Chapter sync. `chapters_new` feeds the stop check below:
// `None` (skip_chapters mode, or a logged-and-swallowed sync
// error) refuses to stop on this manga because we can't
// confirm "no new chapters."
let chapters_new: Option<usize> = if skip_chapters {
None
} else {
match repo::crawler::sync_manga_chapters(
db,
source_id,
@@ -239,79 +289,64 @@ pub async fn run_metadata_pass(
)
.await
{
Ok(diff) => tracing::info!(
manga_id = %upsert.manga_id,
new = diff.new,
refreshed = diff.refreshed,
dropped = diff.dropped,
"chapters synced"
),
Err(e) => tracing::warn!(
manga_id = %upsert.manga_id,
error = ?e,
"chapter sync failed"
),
Ok(diff) => {
tracing::info!(
manga_id = %upsert.manga_id,
new = diff.new,
refreshed = diff.refreshed,
dropped = diff.dropped,
"chapters synced"
);
Some(diff.new)
}
Err(e) => {
tracing::warn!(
manga_id = %upsert.manga_id,
error = ?e,
"chapter sync failed"
);
None
}
}
}
};
// Incremental stop: count consecutive Unchanged upserts and
// bail once the threshold is reached. New/Updated resets the
// streak so a fresh entry mid-batch doesn't accidentally trip
// the cutoff.
match upsert.status {
repo::crawler::UpsertStatus::Unchanged => {
consecutive_unchanged += 1;
}
repo::crawler::UpsertStatus::New | repo::crawler::UpsertStatus::Updated => {
consecutive_unchanged = 0;
}
}
if should_stop(mode, consecutive_unchanged) {
hit_incremental_stop = true;
if should_stop(was_clean, upsert.status, chapters_new) {
hit_stop_condition = true;
tracing::info!(
consecutive_unchanged,
"incremental stop threshold reached; halting walk"
key = %manga.source_manga_key,
"stop condition met (Unchanged metadata + 0 new chapters); halting walk"
);
break 'outer;
}
}
}
// Drop pass: only when the walk truly covered everything the source
// surfaces. `last_seen_at` on un-visited rows is stale, so running
// the drop on a partial walk would soft-drop the tail of the index.
let full_walk = walked_to_completion && !hit_limit && !hit_incremental_stop;
let backfill_complete = full_walk && matches!(mode, DiscoverMode::Backfill);
if full_walk {
match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await {
Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"),
Err(e) => tracing::warn!(error = ?e, "drop-pass failed"),
}
} else {
tracing::info!(
?mode,
hit_limit,
hit_incremental_stop,
"partial sync — skipping drop pass"
);
}
if backfill_complete {
if let Err(e) = repo::crawler::mark_seed_completed(db, source_id, run_started_at).await {
tracing::warn!(error = ?e, "mark_seed_completed failed");
} else {
tracing::info!(source_id, "seed marked complete");
// Recovery-flag write. Only on a clean exit (end-of-walk OR the
// intentional stop). `hit_limit` is a caller-imposed early break
// and does NOT count — the catalog tail wasn't reached, so a future
// tick still needs to walk past where we stopped. The truth table is
// pinned by `should_mark_clean_exit` so a future edit that adds
// `hit_limit` back into the disjunction trips its unit test. Flag-
// write errors are warned and swallowed: the run already did its
// work, and a stale `false` flag just buys a recovery sweep on the
// next tick.
let exited_cleanly = should_mark_clean_exit(walked_to_completion, hit_stop_condition);
if exited_cleanly {
if let Err(e) = repo::crawler::mark_run_completed(db, source_id).await {
tracing::warn!(error = ?e, "mark_run_completed failed");
}
}
tracing::info!(
?mode,
was_clean,
discovered = stats.discovered,
upserted = stats.upserted,
covers_fetched = stats.covers_fetched,
mangas_failed = stats.mangas_failed,
walked_to_completion,
hit_limit,
hit_incremental_stop,
hit_stop_condition,
exited_cleanly,
"metadata pass complete"
);
@@ -508,31 +543,79 @@ mod tests {
use super::*;
#[test]
fn backfill_never_stops_regardless_of_streak() {
assert!(!should_stop(DiscoverMode::Backfill, 0));
assert!(!should_stop(DiscoverMode::Backfill, 100));
assert!(!should_stop(DiscoverMode::Backfill, usize::MAX));
fn stop_condition_fires_on_unchanged_metadata_and_zero_new_chapters() {
// The whole point of the rule: in steady state, a manga whose
// metadata hash matches AND whose chapter list gained no new
// entries proves we've reached the caught-up tail of a
// newest-first index.
assert!(should_stop(true, UpsertStatus::Unchanged, Some(0)));
}
#[test]
fn incremental_stops_when_streak_meets_threshold() {
let mode = DiscoverMode::Incremental {
stop_after_unchanged: 3,
};
assert!(!should_stop(mode, 0));
assert!(!should_stop(mode, 2));
assert!(should_stop(mode, 3), "stops at exactly the threshold");
assert!(should_stop(mode, 100), "stops at anything past threshold");
fn stop_condition_refuses_when_chapters_added() {
// Unchanged metadata + N new chapters means the source bumped
// this manga because of the chapter add; the rest of the index
// is still ahead of us. Don't bail.
assert!(!should_stop(true, UpsertStatus::Unchanged, Some(1)));
assert!(!should_stop(true, UpsertStatus::Unchanged, Some(42)));
}
#[test]
fn incremental_with_zero_threshold_stops_immediately() {
// A nonsensical config (no Unchanged needed to stop) shouldn't
// panic — it just means the very first ref triggers the bail.
let mode = DiscoverMode::Incremental {
stop_after_unchanged: 0,
};
assert!(should_stop(mode, 0));
fn stop_condition_refuses_when_metadata_changed() {
// Updated or New metadata always continues — even with zero new
// chapters — because the change-of-metadata bump itself is what
// the walk is following.
assert!(!should_stop(true, UpsertStatus::Updated, Some(0)));
assert!(!should_stop(true, UpsertStatus::New, Some(0)));
}
#[test]
fn stop_condition_refuses_when_chapter_count_unknown() {
// skip_chapters mode (CLI metadata-only sweep) or a
// logged-and-swallowed chapter sync error: we can't claim "no
// new chapters" from absence of evidence, so don't stop. The
// operator who runs metadata-only intentionally wants a full
// walk anyway.
assert!(!should_stop(true, UpsertStatus::Unchanged, None));
}
#[test]
fn stop_condition_disabled_in_recovery_mode() {
// was_clean = false means the previous run did not exit cleanly;
// the catalog past its crash point is potentially un-synced. Walk
// to end-of-source no matter what individual mangas report.
assert!(!should_stop(false, UpsertStatus::Unchanged, Some(0)));
assert!(!should_stop(false, UpsertStatus::Unchanged, Some(1)));
assert!(!should_stop(false, UpsertStatus::Updated, Some(0)));
assert!(!should_stop(false, UpsertStatus::New, None));
}
#[test]
fn clean_exit_when_walked_to_completion() {
// End-of-walk reached the catalog tail — the recovery flag may
// safely flip back to `true`.
assert!(should_mark_clean_exit(true, false));
}
#[test]
fn clean_exit_when_stop_condition_fired() {
// First Unchanged + 0-new-chapter manga is a complete steady-
// state exit: every manga newer than this point was synced, and
// by source-side `update_date DESC` ordering everything past
// this point is at least as caught-up.
assert!(should_mark_clean_exit(false, true));
}
#[test]
fn dirty_exit_when_neither_completion_nor_stop_fired() {
// The walk ended for some other reason — including the
// caller-imposed `hit_limit` cap, which is the regression case
// this test exists for. `should_mark_clean_exit` does not take
// `hit_limit` as a parameter, so a future edit that adds
// `|| hit_limit` to the inline expression in `run_metadata_pass`
// would need to also touch this helper, and would fail this
// assertion when it did.
assert!(!should_mark_clean_exit(false, false));
}
#[test]

View File

@@ -8,19 +8,6 @@ pub mod target;
use async_trait::async_trait;
use chromiumoxide::browser::Browser;
use serde::{Deserialize, Serialize};
/// How a `discover` job should walk the source's index.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum DiscoverMode {
/// Walk every index page from last back to first. Used for the
/// initial seed of a source.
Backfill,
/// Walk index pages from page 1 forward, stopping after
/// `stop_after_unchanged` consecutive mangas whose `metadata_hash`
/// matches storage. Used for the recurring cron tick.
Incremental { stop_after_unchanged: usize },
}
/// Pointer at a manga in the source's index, before we've fetched the
/// detail page. The `source_manga_key` is whatever stable id the source
@@ -83,14 +70,14 @@ pub struct FetchContext<'a> {
}
/// Lazy iterator over discovered manga refs. The caller drives the
/// walk one batch at a time, so it can break out as soon as a
/// downstream stop condition is met (e.g. N consecutive Unchanged
/// upserts in Incremental mode) without paying for pages it won't use.
/// walk one batch at a time, so it can break out as soon as the
/// downstream stop condition is met (the first manga where metadata is
/// `Unchanged` and chapter sync reports zero new chapters) without
/// paying for pages it won't use.
///
/// Batches are typically one source-index page each. Within a batch
/// refs are already in the right per-page order for the active mode
/// (Backfill reverses each page to oldest-first; Incremental leaves
/// the source's natural newest-first ordering).
/// refs are in the source's natural newest-first ordering — the same
/// `update_date DESC` sort that makes the stop condition meaningful.
#[async_trait]
pub trait DiscoverWalk: Send {
/// Return the next batch of refs, or `Ok(None)` when the source has
@@ -107,16 +94,14 @@ pub trait Source: Send + Sync {
/// Stable identifier — also the row key in the `sources` table.
fn id(&self) -> &'static str;
/// Begin discovery in `mode`. Returns a walker the caller drives
/// page-by-page via `next_batch`. The initial page-1 probe (used
/// to determine `last_page` and warm the cache for sites that
/// can't be paged without knowing the bound) happens inside this
/// call, so a fresh walker is ready to yield its first batch
/// without further setup.
/// Begin discovery. Returns a walker the caller drives page-by-page
/// via `next_batch`. The initial page-1 probe (used to determine
/// `last_page` and warm the cache for sites that can't be paged
/// without knowing the bound) happens inside this call, so a fresh
/// walker is ready to yield its first batch without further setup.
async fn discover(
&self,
ctx: &FetchContext<'_>,
mode: DiscoverMode,
) -> anyhow::Result<Box<dyn DiscoverWalk + Send>>;
async fn fetch_manga(

View File

@@ -15,22 +15,23 @@ use async_trait::async_trait;
use sha2::{Digest, Sha256};
use super::{
DiscoverMode, DiscoverWalk, FetchContext, Source, SourceChapter, SourceChapterRef,
SourceManga, SourceMangaRef,
DiscoverWalk, FetchContext, Source, SourceChapter, SourceChapterRef, SourceManga,
SourceMangaRef,
};
use crate::crawler::detect::{
has_logo_sentinel, is_broken_page_body, retry_on_transient, PageError,
};
/// `sources.id` value for this Source impl. Exposed as a const so the
/// daemon can look up per-source state (e.g. `seed_completed_at`)
/// before constructing the Source itself.
/// daemon can look up per-source state (e.g. the recovery flag) before
/// constructing the Source itself.
pub const SOURCE_ID: &str = "target";
/// In-loop retry budget for transient pages encountered during a single
/// `discover` walk. Bounded small because the job system itself retries
/// the whole `Discover` job on failure — these inline retries only need
/// to absorb a brief site hiccup mid-walk.
/// `discover` walk. Bounded small because the next cron tick will pick up
/// where this run left off via the recovery flag — these inline retries
/// only need to absorb a brief site hiccup mid-walk, not a sustained
/// outage.
const PAGE_TRANSIENT_RETRY_ATTEMPTS: u32 = 3;
const PAGE_TRANSIENT_RETRY_DELAY: Duration = Duration::from_secs(2);
@@ -72,7 +73,6 @@ impl Source for TargetSource {
async fn discover(
&self,
ctx: &FetchContext<'_>,
mode: DiscoverMode,
) -> anyhow::Result<Box<dyn DiscoverWalk + Send>> {
// Always visit page 1 first because that's the only way to
// discover `last_page`. Retry it on transient — a broken first
@@ -89,10 +89,8 @@ impl Source for TargetSource {
parse_last_page(&doc)
};
let backfill = matches!(mode, DiscoverMode::Backfill);
let order = build_page_order(last_page, backfill);
let order = build_page_order(last_page);
tracing::info!(
?mode,
last_page = ?last_page,
page_count = order.len(),
"walking pagination"
@@ -100,10 +98,8 @@ impl Source for TargetSource {
Ok(Box::new(TargetSourceWalker {
base_url: self.base_url.clone(),
backfill,
pages_remaining: order,
first_page_html: Some(first_html),
prev: None,
}))
}
@@ -139,16 +135,13 @@ impl Source for TargetSource {
}
/// Build the queue of page numbers `TargetSource::discover` will walk.
/// Backfill is oldest-first: pages `last..=1` (within each page the
/// walker reverses entries, since the source orders by update_date
/// DESC). Incremental is newest-first: pages `1..=last` in natural
/// order. If `last_page` is unknown (source surfaces no pagination)
/// only page 1 is visited.
fn build_page_order(last_page: Option<i32>, backfill: bool) -> VecDeque<i32> {
match (last_page, backfill) {
(None, _) => VecDeque::from([1]),
(Some(last), true) => (1..=last).rev().collect(),
(Some(last), false) => (1..=last).collect(),
/// The site orders by `update_date DESC`, so newest-first is just the
/// natural page order: `1..=last`. If `last_page` is unknown (source
/// surfaces no pagination) only page 1 is visited.
fn build_page_order(last_page: Option<i32>) -> VecDeque<i32> {
match last_page {
None => VecDeque::from([1]),
Some(last) => (1..=last).collect(),
}
}
@@ -158,16 +151,8 @@ fn build_page_order(last_page: Option<i32>, backfill: bool) -> VecDeque<i32> {
/// batch covering page 1 doesn't re-fetch.
struct TargetSourceWalker {
base_url: String,
backfill: bool,
pages_remaining: VecDeque<i32>,
first_page_html: Option<String>,
/// Page number and slot-0 `source_manga_key` of the previously-walked
/// page. Updated after every batch (cheap, unconditional) but only
/// *read* by the boundary re-check, which itself runs only in backfill
/// mode. A single `Option` so the half-set state (`page_num` known but
/// `key` not, or vice versa) is unrepresentable; `None` here suppresses
/// the next iteration's re-check (no anchor to compare against).
prev: Option<(i32, String)>,
}
#[async_trait]
@@ -179,7 +164,7 @@ impl DiscoverWalk for TargetSourceWalker {
let Some(page_num) = self.pages_remaining.pop_front() else {
return Ok(None);
};
let mut page_refs = if page_num == 1 {
let page_refs = if page_num == 1 {
// Reuse the cached page-1 HTML from the initial probe. Take
// it (rather than clone) so a malformed page-order queue
// that re-visits page 1 still falls back to a real fetch.
@@ -214,77 +199,7 @@ impl DiscoverWalk for TargetSourceWalker {
)
.await?
};
// Capture slot-0 of the page as the site presents it (newest first)
// *before* the backfill `.reverse()` below — after reversal slot 0
// is the oldest entry, which would defeat the next iteration's
// boundary re-check.
let current_first = page_refs.first().map(|r| r.source_manga_key.clone());
// Boundary re-check (backfill only). The site orders by update_date
// DESC, so shifts during the walk push items from low-numbered pages
// to high-numbered ones — into pages backfill has already finished.
// After fetching this page, re-fetch the *previous* iteration's page
// and look for items that slid past us mid-walk. Must be the last
// navigation of the iteration to close the within-iteration race.
let mut displaced: Vec<SourceMangaRef> = Vec::new();
if self.backfill {
if let Some((prev_page_num, prev_first_key)) = self.prev.clone() {
match recheck_prev_page(ctx, &self.base_url, prev_page_num).await {
Ok(refetched) => {
let (d, outcome) = detect_displaced(&prev_first_key, &refetched);
match outcome {
DisplacementOutcome::NoShift => {}
DisplacementOutcome::Shifted(k) => {
tracing::info!(
page_num,
prev_page_num,
k,
"boundary re-check: shift detected, recovering displaced refs"
);
}
DisplacementOutcome::NotFoundFallback => {
tracing::warn!(
page_num,
prev_page_num,
prev_first_key = %prev_first_key,
refetched_len = refetched.len(),
"boundary re-check: prev_first not found, falling back to full re-process"
);
}
}
displaced = d;
}
Err(e) => {
tracing::warn!(
page_num,
prev_page_num,
error = ?e,
"boundary re-check: re-fetch failed, skipping check for this boundary"
);
}
}
}
}
// Remember the boundary for the next iteration. An empty `page_refs`
// yields `None`, which intentionally suppresses the next re-check
// (no anchor to compare against).
self.prev = current_first.map(|key| (page_num, key));
if self.backfill {
page_refs.reverse();
}
// Append displaced refs to the end of the batch. Order doesn't
// affect backfill semantics (no `consecutive_unchanged` streak in
// this mode), and the pipeline-level dedup set handles any overlap.
let displaced_count = displaced.len();
page_refs.extend(displaced);
tracing::info!(
page_num,
count = page_refs.len(),
displaced = displaced_count,
"page walked"
);
tracing::info!(page_num, count = page_refs.len(), "page walked");
Ok(Some(page_refs))
}
}
@@ -689,67 +604,6 @@ fn compute_metadata_hash(m: &SourceManga) -> String {
format!("{:x}", h.finalize())
}
/// Outcome of a boundary re-check, surfaced for telemetry + tests. The
/// caller's recovery action is determined by the returned `Vec`; this enum
/// only labels which branch fired so logging and assertions can distinguish
/// "site is stable" from "we papered over a shift" from "we fell back".
#[derive(Debug, PartialEq, Eq)]
enum DisplacementOutcome {
/// `prev_first` is still at slot 0 of the re-fetched page — no shift
/// happened between the prior iteration and this one's re-check.
NoShift,
/// `prev_first` slid down to slot `K`; the first `K` entries are items
/// that used to live on the page we just walked.
Shifted(usize),
/// `prev_first` is gone from the re-fetched page — multiple pages-worth
/// of shifts happened, or it was bumped to page 1. Treat all refetched
/// entries as potentially displaced; the pipeline-level dedup absorbs
/// the noise.
NotFoundFallback,
}
/// Compare a previously-walked page's slot-0 key against a fresh fetch of
/// that page. Returns the entries that appear *ahead* of `prev_first` in
/// the re-fetched page — items that slid in from the page the caller is
/// currently processing.
fn detect_displaced(
prev_first: &str,
refetched: &[SourceMangaRef],
) -> (Vec<SourceMangaRef>, DisplacementOutcome) {
let Some(k) = refetched
.iter()
.position(|r| r.source_manga_key == prev_first)
else {
return (refetched.to_vec(), DisplacementOutcome::NotFoundFallback);
};
if k == 0 {
(Vec::new(), DisplacementOutcome::NoShift)
} else {
(refetched[..k].to_vec(), DisplacementOutcome::Shifted(k))
}
}
/// Re-fetch a previously-walked listing page to feed [`detect_displaced`].
/// Uses the same retry chain as the primary page fetch in `next_batch` so
/// a transient hiccup doesn't tank an entire backfill walk.
async fn recheck_prev_page(
ctx: &FetchContext<'_>,
base_url: &str,
page_num: i32,
) -> Result<Vec<SourceMangaRef>, PageError> {
retry_on_transient(
|| async {
let url = page_url(base_url, page_num);
let html = navigate(ctx, &url).await?;
let doc = scraper::Html::parse_document(&html);
parse_manga_list_from(&doc)
},
PAGE_TRANSIENT_RETRY_ATTEMPTS,
PAGE_TRANSIENT_RETRY_DELAY,
)
.await
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1125,132 +979,25 @@ mod tests {
}
#[test]
fn build_page_order_backfill_is_last_to_one() {
// Backfill walks pages oldest-first: queue is [last, last-1, ..., 1]
// so popping from the front yields the last page first.
let order = build_page_order(Some(3), true);
assert_eq!(Vec::from(order), vec![3, 2, 1]);
}
#[test]
fn build_page_order_incremental_is_one_to_last() {
// Incremental walks newest-first in natural source order.
let order = build_page_order(Some(3), false);
fn build_page_order_is_natural_one_to_last() {
// Newest-first is just the source's natural pagination order:
// (update_date DESC) lives at page 1, oldest at the last page.
let order = build_page_order(Some(3));
assert_eq!(Vec::from(order), vec![1, 2, 3]);
}
#[test]
fn build_page_order_falls_back_to_page_one_only_without_pagination() {
let backfill = build_page_order(None, true);
assert_eq!(Vec::from(backfill), vec![1]);
let incremental = build_page_order(None, false);
assert_eq!(Vec::from(incremental), vec![1]);
// Source surfaced no pagination control — visit page 1 alone
// and let the walk end after one batch.
let order = build_page_order(None);
assert_eq!(Vec::from(order), vec![1]);
}
#[test]
fn build_page_order_single_page_index_yields_one_entry() {
// Sources with exactly one page should not yield duplicates
// regardless of mode.
let backfill = build_page_order(Some(1), true);
assert_eq!(Vec::from(backfill), vec![1]);
let incremental = build_page_order(Some(1), false);
assert_eq!(Vec::from(incremental), vec![1]);
}
fn make_ref(key: &str) -> SourceMangaRef {
SourceMangaRef {
source_manga_key: key.to_string(),
title: key.to_string(),
url: format!("https://target.example/manga/{key}"),
}
}
#[test]
fn detect_displaced_no_shift_when_prev_first_still_at_slot_zero() {
let refetched = vec![make_ref("A"), make_ref("B"), make_ref("C")];
let (displaced, outcome) = detect_displaced("A", &refetched);
assert!(displaced.is_empty());
assert_eq!(outcome, DisplacementOutcome::NoShift);
}
#[test]
fn detect_displaced_one_shift_returns_single_intruder() {
let refetched = vec![make_ref("X"), make_ref("A"), make_ref("B")];
let (displaced, outcome) = detect_displaced("A", &refetched);
assert_eq!(displaced.len(), 1);
assert_eq!(displaced[0].source_manga_key, "X");
assert_eq!(outcome, DisplacementOutcome::Shifted(1));
}
#[test]
fn detect_displaced_multi_shift_returns_all_intruders() {
let refetched = vec![
make_ref("X1"),
make_ref("X2"),
make_ref("X3"),
make_ref("A"),
make_ref("B"),
make_ref("C"),
];
let (displaced, outcome) = detect_displaced("A", &refetched);
let keys: Vec<&str> = displaced
.iter()
.map(|r| r.source_manga_key.as_str())
.collect();
assert_eq!(keys, vec!["X1", "X2", "X3"]);
assert_eq!(outcome, DisplacementOutcome::Shifted(3));
}
#[test]
fn detect_displaced_full_page_shift_returns_all_but_last() {
// `prev_first` at the last slot — every preceding entry is an
// intruder shifted in from the page the caller is processing.
let mut refetched: Vec<_> = (0..9).map(|i| make_ref(&format!("X{i}"))).collect();
refetched.push(make_ref("A"));
let (displaced, outcome) = detect_displaced("A", &refetched);
assert_eq!(displaced.len(), 9);
assert_eq!(outcome, DisplacementOutcome::Shifted(9));
}
#[test]
fn detect_displaced_not_found_returns_full_page_for_conservative_recovery() {
// > page-worth of shifts (or `prev_first` itself was bumped to
// page 1): can't pinpoint K, fall back to "process everything";
// pipeline dedup absorbs the noise.
let refetched = vec![make_ref("Y"), make_ref("Z")];
let (displaced, outcome) = detect_displaced("A", &refetched);
let keys: Vec<&str> = displaced
.iter()
.map(|r| r.source_manga_key.as_str())
.collect();
assert_eq!(keys, vec!["Y", "Z"]);
assert_eq!(outcome, DisplacementOutcome::NotFoundFallback);
}
#[test]
fn detect_displaced_empty_page_returns_empty_with_fallback_outcome() {
// Re-fetch came back empty (transient mimicry or last-page tail).
// No anchor means we can't classify; fall back is the safe label.
let (displaced, outcome) = detect_displaced("A", &[]);
assert!(displaced.is_empty());
assert_eq!(outcome, DisplacementOutcome::NotFoundFallback);
}
#[test]
fn detect_displaced_takes_first_occurrence_when_key_repeats() {
// Defensive: if the source ever returns the same key twice on a
// page, anchoring on the first match keeps the displaced slice
// bounded and deterministic.
let refetched = vec![
make_ref("X"),
make_ref("A"),
make_ref("Y"),
make_ref("A"),
];
let (displaced, outcome) = detect_displaced("A", &refetched);
assert_eq!(displaced.len(), 1);
assert_eq!(displaced[0].source_manga_key, "X");
assert_eq!(outcome, DisplacementOutcome::Shifted(1));
let order = build_page_order(Some(1));
assert_eq!(Vec::from(order), vec![1]);
}
#[test]