feat: incremental crawl mode with seed-completion gate (0.33.0)

Daemon now auto-detects mode per source: Backfill until the first
full walk records `seed_completed:<source>` in `crawler_state`, then
Incremental (newest-first, stops after N consecutive Unchanged
upserts). `CRAWLER_MODE` overrides to a fixed mode; CLI rejects
`auto` since it has no pre-run DB state.

`Source::discover` returns a lazy `DiscoverWalk` so Incremental can
break out mid-walk without prefetching pages. The drop pass and seed
marker are now gated on a true full walk — fixes a latent soft-drop
of the index tail under partial sweeps.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-28 06:41:26 +02:00
parent 51f42b03e9
commit 45ce0d8f12
11 changed files with 761 additions and 162 deletions

View File

@@ -23,14 +23,34 @@ pub struct MetadataStats {
pub mangas_failed: usize,
}
/// Decide whether the per-ref loop should stop based on the Incremental
/// streak counter. Pulled out as a pure function so the rule is unit-
/// testable without standing up the walker or DB.
pub(crate) fn should_stop(mode: DiscoverMode, consecutive_unchanged: usize) -> bool {
match mode {
DiscoverMode::Backfill => false,
DiscoverMode::Incremental { stop_after_unchanged } => {
consecutive_unchanged >= stop_after_unchanged
}
}
}
/// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline
/// for the target source. Pure metadata; chapter content is enqueued as
/// separate `SyncChapterContent` jobs by the caller after this returns.
///
/// `limit == 0` means no cap (full backfill). `skip_chapters == true` is
/// the "metadata-only" mode (parser doesn't extract chapters, and
/// `sync_manga_chapters` is skipped — otherwise an empty chapter list
/// would soft-drop existing rows).
/// `limit == 0` means no cap (full sweep up to the source's own bound).
/// `skip_chapters == true` is the "metadata-only" mode (parser doesn't
/// extract chapters, and `sync_manga_chapters` is skipped — otherwise an
/// empty chapter list would soft-drop existing rows).
///
/// `mode` controls the walk:
/// - `Backfill` — oldest-first, no early exit. The only mode that runs
/// the end-of-walk drop pass + writes `seed_completed_at`.
/// - `Incremental { stop_after_unchanged }` — newest-first, breaks out
/// after N consecutive Unchanged upserts. Drop pass is skipped (the
/// tail of the index is never visited, so its `last_seen_at` is
/// stale and using it to soft-drop would be unsafe).
#[allow(clippy::too_many_arguments)]
pub async fn run_metadata_pass(
browser_manager: &BrowserManager,
@@ -41,6 +61,7 @@ pub async fn run_metadata_pass(
start_url: &str,
limit: usize,
skip_chapters: bool,
mode: DiscoverMode,
) -> anyhow::Result<MetadataStats> {
let lease = browser_manager
.acquire()
@@ -74,123 +95,189 @@ pub async fn run_metadata_pass(
let run_started_at = chrono::Utc::now();
let max_refs = (limit > 0).then_some(limit);
tracing::info!(?max_refs, "discovering manga list");
let refs = source
.discover(&ctx, DiscoverMode::Backfill, max_refs)
tracing::info!(?mode, ?max_refs, "starting metadata pass");
let mut walker = source
.discover(&ctx, mode)
.await
.context("discover failed")?;
tracing::info!(count = refs.len(), "discovered manga list");
let mut stats = MetadataStats {
discovered: refs.len(),
..MetadataStats::default()
};
let mut stats = MetadataStats::default();
let mut consecutive_unchanged: usize = 0;
let mut walked_to_completion = false;
let mut hit_limit = false;
let mut hit_incremental_stop = false;
for (i, r) in refs.iter().enumerate() {
tracing::info!(
idx = i + 1,
total = stats.discovered,
key = %r.source_manga_key,
"fetching metadata"
);
let manga = match source.fetch_manga(&ctx, r).await {
Ok(m) => m,
Err(e) => {
tracing::warn!(
key = %r.source_manga_key,
url = %r.url,
error = ?e,
"fetch_manga failed"
);
stats.mangas_failed += 1;
continue;
'outer: loop {
let batch = match walker.next_batch(&ctx).await? {
Some(b) => b,
None => {
walked_to_completion = true;
break;
}
};
let upsert = match repo::crawler::upsert_manga_from_source(db, source_id, &r.url, &manga)
.await
{
Ok(u) => u,
Err(e) => {
tracing::error!(
key = %r.source_manga_key,
error = ?e,
"upsert_manga_from_source failed"
);
stats.mangas_failed += 1;
continue;
for r in batch {
if max_refs.map(|m| stats.discovered >= m).unwrap_or(false) {
hit_limit = true;
tracing::info!(cap = ?max_refs, "max_results reached; halting walk");
break 'outer;
}
};
stats.upserted += 1;
tracing::info!(
key = %manga.source_manga_key,
manga_id = %upsert.manga_id,
status = ?upsert.status,
title = %manga.title,
"manga upserted"
);
// Cover image: download when missing in storage or when metadata
// signaled an update (cover URL is part of metadata_hash, so
// Updated implies the URL may have moved). Failures are non-fatal.
let needs_cover = upsert.cover_image_path.is_none()
|| matches!(upsert.status, repo::crawler::UpsertStatus::Updated);
if needs_cover {
if let Some(cover_url) = manga.cover_url.as_deref() {
match download_and_store_cover(
db,
storage,
http,
rate,
&r.url,
upsert.manga_id,
cover_url,
)
.await
{
Ok(()) => stats.covers_fetched += 1,
Err(e) => tracing::warn!(
manga_id = %upsert.manga_id,
stats.discovered += 1;
tracing::info!(
idx = stats.discovered,
key = %r.source_manga_key,
"fetching metadata"
);
let manga = match source.fetch_manga(&ctx, &r).await {
Ok(m) => m,
Err(e) => {
tracing::warn!(
key = %r.source_manga_key,
url = %r.url,
error = ?e,
"cover download failed"
),
"fetch_manga failed"
);
stats.mangas_failed += 1;
continue;
}
}
}
};
if !skip_chapters {
match repo::crawler::sync_manga_chapters(
db,
source_id,
upsert.manga_id,
&manga.chapters,
let upsert = match repo::crawler::upsert_manga_from_source(
db, source_id, &r.url, &manga,
)
.await
{
Ok(diff) => tracing::info!(
manga_id = %upsert.manga_id,
new = diff.new,
refreshed = diff.refreshed,
dropped = diff.dropped,
"chapters synced"
),
Err(e) => tracing::warn!(
manga_id = %upsert.manga_id,
error = ?e,
"chapter sync failed"
),
Ok(u) => u,
Err(e) => {
tracing::error!(
key = %r.source_manga_key,
error = ?e,
"upsert_manga_from_source failed"
);
stats.mangas_failed += 1;
continue;
}
};
stats.upserted += 1;
tracing::info!(
key = %manga.source_manga_key,
manga_id = %upsert.manga_id,
status = ?upsert.status,
title = %manga.title,
"manga upserted"
);
// Cover image: download when missing in storage or when metadata
// signaled an update (cover URL is part of metadata_hash, so
// Updated implies the URL may have moved). Failures are non-fatal.
let needs_cover = upsert.cover_image_path.is_none()
|| matches!(upsert.status, repo::crawler::UpsertStatus::Updated);
if needs_cover {
if let Some(cover_url) = manga.cover_url.as_deref() {
match download_and_store_cover(
db,
storage,
http,
rate,
&r.url,
upsert.manga_id,
cover_url,
)
.await
{
Ok(()) => stats.covers_fetched += 1,
Err(e) => tracing::warn!(
manga_id = %upsert.manga_id,
error = ?e,
"cover download failed"
),
}
}
}
if !skip_chapters {
match repo::crawler::sync_manga_chapters(
db,
source_id,
upsert.manga_id,
&manga.chapters,
)
.await
{
Ok(diff) => tracing::info!(
manga_id = %upsert.manga_id,
new = diff.new,
refreshed = diff.refreshed,
dropped = diff.dropped,
"chapters synced"
),
Err(e) => tracing::warn!(
manga_id = %upsert.manga_id,
error = ?e,
"chapter sync failed"
),
}
}
// Incremental stop: count consecutive Unchanged upserts and
// bail once the threshold is reached. New/Updated resets the
// streak so a fresh entry mid-batch doesn't accidentally trip
// the cutoff.
match upsert.status {
repo::crawler::UpsertStatus::Unchanged => {
consecutive_unchanged += 1;
}
repo::crawler::UpsertStatus::New | repo::crawler::UpsertStatus::Updated => {
consecutive_unchanged = 0;
}
}
if should_stop(mode, consecutive_unchanged) {
hit_incremental_stop = true;
tracing::info!(
consecutive_unchanged,
"incremental stop threshold reached; halting walk"
);
break 'outer;
}
}
}
if limit == 0 {
// Drop pass: only when the walk truly covered everything the source
// surfaces. `last_seen_at` on un-visited rows is stale, so running
// the drop on a partial walk would soft-drop the tail of the index.
let full_walk = walked_to_completion && !hit_limit && !hit_incremental_stop;
let backfill_complete = full_walk && matches!(mode, DiscoverMode::Backfill);
if full_walk {
match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await {
Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"),
Err(e) => tracing::warn!(error = ?e, "drop-pass failed"),
}
} else {
tracing::info!(limit, "partial sync — skipping drop pass");
tracing::info!(
?mode,
hit_limit,
hit_incremental_stop,
"partial sync — skipping drop pass"
);
}
if backfill_complete {
if let Err(e) = repo::crawler::mark_seed_completed(db, source_id, run_started_at).await {
tracing::warn!(error = ?e, "mark_seed_completed failed");
} else {
tracing::info!(source_id, "seed marked complete");
}
}
tracing::info!(
?mode,
discovered = stats.discovered,
upserted = stats.upserted,
covers_fetched = stats.covers_fetched,
mangas_failed = stats.mangas_failed,
walked_to_completion,
hit_limit,
hit_incremental_stop,
"metadata pass complete"
);
drop(lease);
Ok(stats)
@@ -345,3 +432,36 @@ fn origin_of(url: &str) -> Option<String> {
let host = rest.split('/').next()?;
Some(format!("{scheme}://{host}"))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn backfill_never_stops_regardless_of_streak() {
assert!(!should_stop(DiscoverMode::Backfill, 0));
assert!(!should_stop(DiscoverMode::Backfill, 100));
assert!(!should_stop(DiscoverMode::Backfill, usize::MAX));
}
#[test]
fn incremental_stops_when_streak_meets_threshold() {
let mode = DiscoverMode::Incremental {
stop_after_unchanged: 3,
};
assert!(!should_stop(mode, 0));
assert!(!should_stop(mode, 2));
assert!(should_stop(mode, 3), "stops at exactly the threshold");
assert!(should_stop(mode, 100), "stops at anything past threshold");
}
#[test]
fn incremental_with_zero_threshold_stops_immediately() {
// A nonsensical config (no Unchanged needed to stop) shouldn't
// panic — it just means the very first ref triggers the bail.
let mode = DiscoverMode::Incremental {
stop_after_unchanged: 0,
};
assert!(should_stop(mode, 0));
}
}

View File

@@ -82,21 +82,42 @@ pub struct FetchContext<'a> {
pub rate: &'a crate::crawler::rate_limit::HostRateLimiters,
}
/// Lazy iterator over discovered manga refs. The caller drives the
/// walk one batch at a time, so it can break out as soon as a
/// downstream stop condition is met (e.g. N consecutive Unchanged
/// upserts in Incremental mode) without paying for pages it won't use.
///
/// Batches are typically one source-index page each. Within a batch
/// refs are already in the right per-page order for the active mode
/// (Backfill reverses each page to oldest-first; Incremental leaves
/// the source's natural newest-first ordering).
#[async_trait]
pub trait DiscoverWalk: Send {
/// Return the next batch of refs, or `Ok(None)` when the source has
/// no more pages. The walker is single-use; calling `next_batch`
/// after `None` is allowed and continues to return `None`.
async fn next_batch(
&mut self,
ctx: &FetchContext<'_>,
) -> anyhow::Result<Option<Vec<SourceMangaRef>>>;
}
#[async_trait]
pub trait Source: Send + Sync {
/// Stable identifier — also the row key in the `sources` table.
fn id(&self) -> &'static str;
/// Returns up to `max_results` manga refs in source order. Pass
/// `None` for an uncapped walk (full backfill / incremental sweep).
/// Implementations should stop paginating as soon as the cap is
/// reached so partial runs don't pay for pages they won't use.
/// Begin discovery in `mode`. Returns a walker the caller drives
/// page-by-page via `next_batch`. The initial page-1 probe (used
/// to determine `last_page` and warm the cache for sites that
/// can't be paged without knowing the bound) happens inside this
/// call, so a fresh walker is ready to yield its first batch
/// without further setup.
async fn discover(
&self,
ctx: &FetchContext<'_>,
mode: DiscoverMode,
max_results: Option<usize>,
) -> anyhow::Result<Vec<SourceMangaRef>>;
) -> anyhow::Result<Box<dyn DiscoverWalk + Send>>;
async fn fetch_manga(
&self,

View File

@@ -7,6 +7,7 @@
//! (`td:has(label:contains("Author:"))`) are implemented by walking
//! the parsed tree.
use std::collections::VecDeque;
use std::time::Duration;
use anyhow::Context;
@@ -14,13 +15,18 @@ use async_trait::async_trait;
use sha2::{Digest, Sha256};
use super::{
DiscoverMode, FetchContext, Source, SourceChapter, SourceChapterRef, SourceManga,
SourceMangaRef,
DiscoverMode, DiscoverWalk, FetchContext, Source, SourceChapter, SourceChapterRef,
SourceManga, SourceMangaRef,
};
use crate::crawler::detect::{
has_logo_sentinel, is_broken_page_body, retry_on_transient, PageError,
};
/// `sources.id` value for this Source impl. Exposed as a const so the
/// daemon can look up per-source state (e.g. `seed_completed_at`)
/// before constructing the Source itself.
pub const SOURCE_ID: &str = "target";
/// In-loop retry budget for transient pages encountered during a single
/// `discover` walk. Bounded small because the job system itself retries
/// the whole `Discover` job on failure — these inline retries only need
@@ -60,15 +66,14 @@ impl TargetSource {
#[async_trait]
impl Source for TargetSource {
fn id(&self) -> &'static str {
"target"
SOURCE_ID
}
async fn discover(
&self,
ctx: &FetchContext<'_>,
mode: DiscoverMode,
max_results: Option<usize>,
) -> anyhow::Result<Vec<SourceMangaRef>> {
) -> anyhow::Result<Box<dyn DiscoverWalk + Send>> {
// Always visit page 1 first because that's the only way to
// discover `last_page`. Retry it on transient — a broken first
// page would otherwise abort the whole walk before we've even
@@ -85,15 +90,7 @@ impl Source for TargetSource {
};
let backfill = matches!(mode, DiscoverMode::Backfill);
let order: Vec<i32> = match (last_page, backfill) {
(None, _) => vec![1],
// Backfill = oldest-first: walk pages last → 1, then
// reverse within each page (the listing is update_date
// DESC, so the bottom of the last page is the oldest
// entry the source still surfaces).
(Some(last), true) => (1..=last).rev().collect(),
(Some(last), false) => (1..=last).collect(),
};
let order = build_page_order(last_page, backfill);
tracing::info!(
?mode,
last_page = ?last_page,
@@ -101,40 +98,12 @@ impl Source for TargetSource {
"walking pagination"
);
let mut all = Vec::new();
for page_num in order {
// Page 1 is already cached from the last_page probe — reuse
// it rather than navigating twice. Every other page goes
// through the retry helper so a single broken page mid-walk
// doesn't silently drop its mangas from the result.
let mut page_refs = if page_num == 1 {
let doc = scraper::Html::parse_document(&first_html);
parse_manga_list_from(&doc)?
} else {
retry_on_transient(
|| async {
let url = page_url(&self.base_url, page_num);
let html = navigate(ctx, &url).await?;
let doc = scraper::Html::parse_document(&html);
parse_manga_list_from(&doc)
},
PAGE_TRANSIENT_RETRY_ATTEMPTS,
PAGE_TRANSIENT_RETRY_DELAY,
)
.await?
};
if backfill {
page_refs.reverse();
}
tracing::info!(page_num, count = page_refs.len(), "page walked");
all.extend(page_refs);
if cap_reached(&all, max_results) {
tracing::info!(cap = ?max_results, "max_results reached; halting pagination");
break;
}
}
Ok(truncate_to_cap(all, max_results))
Ok(Box::new(TargetSourceWalker {
base_url: self.base_url.clone(),
backfill,
pages_remaining: order,
first_page_html: Some(first_html),
}))
}
async fn fetch_manga(
@@ -168,15 +137,81 @@ impl Source for TargetSource {
}
}
fn cap_reached<T>(buf: &[T], max: Option<usize>) -> bool {
matches!(max, Some(m) if buf.len() >= m)
/// Build the queue of page numbers `TargetSource::discover` will walk.
/// Backfill is oldest-first: pages `last..=1` (within each page the
/// walker reverses entries, since the source orders by update_date
/// DESC). Incremental is newest-first: pages `1..=last` in natural
/// order. If `last_page` is unknown (source surfaces no pagination)
/// only page 1 is visited.
fn build_page_order(last_page: Option<i32>, backfill: bool) -> VecDeque<i32> {
match (last_page, backfill) {
(None, _) => VecDeque::from([1]),
(Some(last), true) => (1..=last).rev().collect(),
(Some(last), false) => (1..=last).collect(),
}
}
fn truncate_to_cap<T>(mut buf: Vec<T>, max: Option<usize>) -> Vec<T> {
if let Some(m) = max {
buf.truncate(m);
/// Walker returned by [`TargetSource::discover`]. Pops one source-index
/// page per `next_batch` call. Page 1's HTML is cached at construction
/// time (the discover call needed it to read `last_page` anyway) so the
/// batch covering page 1 doesn't re-fetch.
struct TargetSourceWalker {
base_url: String,
backfill: bool,
pages_remaining: VecDeque<i32>,
first_page_html: Option<String>,
}
#[async_trait]
impl DiscoverWalk for TargetSourceWalker {
async fn next_batch(
&mut self,
ctx: &FetchContext<'_>,
) -> anyhow::Result<Option<Vec<SourceMangaRef>>> {
let Some(page_num) = self.pages_remaining.pop_front() else {
return Ok(None);
};
let mut page_refs = if page_num == 1 {
// Reuse the cached page-1 HTML from the initial probe. Take
// it (rather than clone) so a malformed page-order queue
// that re-visits page 1 still falls back to a real fetch.
match self.first_page_html.take() {
Some(html) => {
let doc = scraper::Html::parse_document(&html);
parse_manga_list_from(&doc)?
}
None => {
retry_on_transient(
|| async {
let html = navigate(ctx, self.base_url.as_str()).await?;
let doc = scraper::Html::parse_document(&html);
parse_manga_list_from(&doc)
},
PAGE_TRANSIENT_RETRY_ATTEMPTS,
PAGE_TRANSIENT_RETRY_DELAY,
)
.await?
}
}
} else {
retry_on_transient(
|| async {
let url = page_url(&self.base_url, page_num);
let html = navigate(ctx, &url).await?;
let doc = scraper::Html::parse_document(&html);
parse_manga_list_from(&doc)
},
PAGE_TRANSIENT_RETRY_ATTEMPTS,
PAGE_TRANSIENT_RETRY_DELAY,
)
.await?
};
if self.backfill {
page_refs.reverse();
}
tracing::info!(page_num, count = page_refs.len(), "page walked");
Ok(Some(page_refs))
}
buf
}
/// Single point of rate-limited navigation. Every Source request goes
@@ -922,4 +957,37 @@ mod tests {
let err = parse_manga_detail(html, "x", true).expect_err("expected Transient");
assert!(err.is_transient(), "got non-transient: {err}");
}
#[test]
fn build_page_order_backfill_is_last_to_one() {
// Backfill walks pages oldest-first: queue is [last, last-1, ..., 1]
// so popping from the front yields the last page first.
let order = build_page_order(Some(3), true);
assert_eq!(Vec::from(order), vec![3, 2, 1]);
}
#[test]
fn build_page_order_incremental_is_one_to_last() {
// Incremental walks newest-first in natural source order.
let order = build_page_order(Some(3), false);
assert_eq!(Vec::from(order), vec![1, 2, 3]);
}
#[test]
fn build_page_order_falls_back_to_page_one_only_without_pagination() {
let backfill = build_page_order(None, true);
assert_eq!(Vec::from(backfill), vec![1]);
let incremental = build_page_order(None, false);
assert_eq!(Vec::from(incremental), vec![1]);
}
#[test]
fn build_page_order_single_page_index_yields_one_entry() {
// Sources with exactly one page should not yield duplicates
// regardless of mode.
let backfill = build_page_order(Some(1), true);
assert_eq!(Vec::from(backfill), vec![1]);
let incremental = build_page_order(Some(1), false);
assert_eq!(Vec::from(incremental), vec![1]);
}
}