From 45ce0d8f12222696c894b8f1bb81cad462201dda Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Thu, 28 May 2026 06:41:26 +0200 Subject: [PATCH] feat: incremental crawl mode with seed-completion gate (0.33.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Daemon now auto-detects mode per source: Backfill until the first full walk records `seed_completed:<source_id>` in `crawler_state`, then Incremental (newest-first, stops after N consecutive Unchanged upserts). `CRAWLER_MODE` overrides to a fixed mode; CLI rejects `auto` since it has no pre-run DB state. `Source::discover` returns a lazy `DiscoverWalk` so Incremental can break out mid-walk without prefetching pages. The drop pass and seed marker are now gated on a true full walk — fixes a latent soft-drop of the index tail under partial sweeps. Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/Cargo.lock | 2 +- backend/Cargo.toml | 2 +- backend/src/app.rs | 60 ++++- backend/src/bin/crawler.rs | 91 ++++++++ backend/src/config.rs | 109 +++++++++ backend/src/crawler/pipeline.rs | 316 ++++++++++++++++++--------- backend/src/crawler/source.rs | 33 ++- backend/src/crawler/source/target.rs | 176 ++++++++++----- backend/src/repo/crawler.rs | 47 ++++ backend/tests/crawler_incremental.rs | 85 +++++++ frontend/package.json | 2 +- 11 files changed, 761 insertions(+), 162 deletions(-) create mode 100644 backend/tests/crawler_incremental.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index f599f82..c5ded69 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "mangalord" -version = "0.32.0" +version = "0.33.0" dependencies = [ "anyhow", "argon2", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 3bc4dca..30934f2 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mangalord" -version = "0.32.0" +version = "0.33.0" 
edition = "2021" default-run = "mangalord" diff --git a/backend/src/app.rs b/backend/src/app.rs index d8118ba..27734bb 100644 --- a/backend/src/app.rs +++ b/backend/src/app.rs @@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken; use tower_http::cors::{AllowOrigin, CorsLayer}; use tower_http::trace::TraceLayer; -use crate::config::{AuthConfig, Config, CrawlerConfig, UploadConfig}; +use crate::config::{AuthConfig, Config, CrawlerConfig, CrawlerModePref, UploadConfig}; use crate::crawler::browser_manager::{self, BrowserManager}; use crate::crawler::content::{self, SyncOutcome}; use crate::crawler::daemon::{self, ChapterDispatcher, DaemonConfig, MetadataPass}; @@ -20,6 +20,8 @@ use crate::crawler::jobs::JobPayload; use crate::crawler::pipeline::{self, MetadataStats}; use crate::crawler::rate_limit::HostRateLimiters; use crate::crawler::session; +use crate::crawler::source::{target as target_source, DiscoverMode}; +use crate::repo; use crate::storage::{LocalStorage, Storage}; #[derive(Clone)] @@ -149,6 +151,8 @@ async fn spawn_crawler_daemon( http: http.clone(), rate: Arc::clone(&rate), start_url: url.clone(), + mode_pref: cfg.mode, + incremental_stop_after: cfg.incremental_stop_after, }); m }); @@ -210,11 +214,20 @@ struct RealMetadataPass { http: reqwest::Client, rate: Arc, start_url: String, + mode_pref: CrawlerModePref, + incremental_stop_after: usize, } #[async_trait] impl MetadataPass for RealMetadataPass { async fn run(&self) -> anyhow::Result { + let mode = resolve_mode( + &self.db, + target_source::SOURCE_ID, + self.mode_pref, + self.incremental_stop_after, + ) + .await?; pipeline::run_metadata_pass( &self.browser_manager, &self.db, @@ -224,11 +237,56 @@ impl MetadataPass for RealMetadataPass { &self.start_url, 0, false, + mode, ) .await } } +/// Pick the active mode for this tick. `Explicit` short-circuits the +/// DB lookup. 
`Auto` reads `seed_completed_at`: missing → Backfill +/// (initial seed for this source), present → Incremental with the +/// configured threshold. +/// +/// A DB error during the Auto lookup propagates as `Err` rather than +/// silently degrading to Backfill — the daemon's `run_tick` catches +/// the error, logs, and skips the tick. That's safer than running a +/// full re-backfill (including a drop pass against stale-looking rows) +/// when the DB is flaky. +async fn resolve_mode( + db: &PgPool, + source_id: &str, + pref: CrawlerModePref, + incremental_stop_after: usize, +) -> anyhow::Result { + match pref { + CrawlerModePref::Explicit(m) => { + tracing::info!(?m, "crawler mode: explicit (CRAWLER_MODE override)"); + Ok(m) + } + CrawlerModePref::Auto => { + let seeded = repo::crawler::seed_completed_at(db, source_id) + .await + .context("seed_completed_at lookup for mode auto-detection")?; + match seeded { + Some(at) => { + tracing::info!( + seed_completed_at = %at.to_rfc3339(), + "crawler mode: auto → incremental (seed previously completed)" + ); + Ok(DiscoverMode::Incremental { + stop_after_unchanged: incremental_stop_after, + }) + } + None => { + tracing::info!("crawler mode: auto → backfill (no seed marker for source)"); + Ok(DiscoverMode::Backfill) + } + } + } + } +} + struct RealChapterDispatcher { browser_manager: Arc, db: PgPool, diff --git a/backend/src/bin/crawler.rs b/backend/src/bin/crawler.rs index d767598..58c80e7 100644 --- a/backend/src/bin/crawler.rs +++ b/backend/src/bin/crawler.rs @@ -31,6 +31,7 @@ use mangalord::crawler::content::{self, SyncOutcome}; use mangalord::crawler::pipeline; use mangalord::crawler::rate_limit::HostRateLimiters; use mangalord::crawler::session; +use mangalord::crawler::source::DiscoverMode; use mangalord::storage::{LocalStorage, Storage}; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; @@ -62,6 +63,8 @@ async fn main() -> anyhow::Result<()> { let cdn_rate_ms = env_u64("CRAWLER_CDN_RATE_MS", rate_ms); let limit = 
env_u64("CRAWLER_LIMIT", 0) as usize; let skip_chapters = env_bool("CRAWLER_SKIP_CHAPTERS", false); + let incremental_stop_after = env_u64("CRAWLER_INCREMENTAL_STOP_AFTER", 20).max(1) as usize; + let mode = parse_crawler_mode(incremental_stop_after)?; let skip_chapter_content = env_bool("CRAWLER_SKIP_CHAPTER_CONTENT", false); let chapter_workers = env_u64("CRAWLER_CHAPTER_WORKERS", 1).max(1) as usize; let force_refetch_chapters = env_bool("CRAWLER_FORCE_REFETCH_CHAPTERS", false); @@ -140,6 +143,7 @@ async fn main() -> anyhow::Result<()> { user_agent = ?user_agent, proxy = ?proxy_url, keep_open, + ?mode, storage_dir = %storage_dir.display(), "starting crawler" ); @@ -187,6 +191,7 @@ async fn main() -> anyhow::Result<()> { skip_chapter_content || !session_ready, chapter_workers, force_refetch_chapters, + mode, ) .await; @@ -216,6 +221,7 @@ async fn run( skip_chapter_content: bool, chapter_workers: usize, force_refetch_chapters: bool, + mode: DiscoverMode, ) -> anyhow::Result<()> { let mut rate = HostRateLimiters::new(Duration::from_millis(rate_ms)); if let Some(host) = cdn_host { @@ -232,6 +238,7 @@ async fn run( start_url, limit, skip_chapters, + mode, ) .await?; tracing::info!(?stats, "metadata pass complete"); @@ -390,6 +397,38 @@ fn resolve_start_url() -> anyhow::Result { }) } +/// Parse the CLI's `CRAWLER_MODE`. Defaults to `backfill` because the +/// binary is operator-driven (manual reseeds, force-refetches) — the +/// auto-detect logic lives in the daemon. `auto` is rejected because +/// the CLI has no DB state to consult before the run. +fn parse_crawler_mode(incremental_stop_after: usize) -> anyhow::Result { + parse_crawler_mode_str( + std::env::var("CRAWLER_MODE").ok().as_deref(), + incremental_stop_after, + ) +} + +/// Pure variant of [`parse_crawler_mode`] — testable without env-var +/// mutation. 
+fn parse_crawler_mode_str( + raw: Option<&str>, + incremental_stop_after: usize, +) -> anyhow::Result { + match raw.map(|s| s.trim().to_ascii_lowercase()).as_deref() { + None | Some("") | Some("backfill") => Ok(DiscoverMode::Backfill), + Some("incremental") => Ok(DiscoverMode::Incremental { + stop_after_unchanged: incremental_stop_after, + }), + Some("auto") => Err(anyhow!( + "CRAWLER_MODE=auto isn't supported by the CLI (use backfill or incremental); \ + the daemon does auto-detection" + )), + Some(other) => Err(anyhow!( + "CRAWLER_MODE must be one of: backfill, incremental (got {other:?})" + )), + } +} + fn env_u64(name: &str, default: u64) -> u64 { std::env::var(name) .ok() @@ -405,3 +444,55 @@ fn env_bool(name: &str, default: bool) -> bool { } } +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn cli_mode_defaults_to_backfill_when_unset_or_blank() { + let none = parse_crawler_mode_str(None, 20).unwrap(); + assert!(matches!(none, DiscoverMode::Backfill)); + let blank = parse_crawler_mode_str(Some(""), 20).unwrap(); + assert!(matches!(blank, DiscoverMode::Backfill)); + } + + #[test] + fn cli_mode_recognizes_backfill_and_incremental() { + let backfill = parse_crawler_mode_str(Some("backfill"), 20).unwrap(); + assert!(matches!(backfill, DiscoverMode::Backfill)); + + let incremental = parse_crawler_mode_str(Some("incremental"), 9).unwrap(); + assert!(matches!( + incremental, + DiscoverMode::Incremental { stop_after_unchanged: 9 } + )); + } + + #[test] + fn cli_mode_rejects_auto_explicitly() { + let err = parse_crawler_mode_str(Some("auto"), 20).unwrap_err(); + let msg = format!("{err}"); + assert!( + msg.contains("daemon"), + "rejection should point operator at the daemon: {msg}" + ); + } + + #[test] + fn cli_mode_rejects_unknown_value() { + let err = parse_crawler_mode_str(Some("garbage"), 20).unwrap_err(); + let msg = format!("{err}"); + assert!(msg.contains("backfill")); + assert!(msg.contains("incremental")); + } + + #[test] + fn 
cli_mode_is_case_insensitive_and_trims() { + let mixed = parse_crawler_mode_str(Some(" Incremental "), 4).unwrap(); + assert!(matches!( + mixed, + DiscoverMode::Incremental { stop_after_unchanged: 4 } + )); + } +} + diff --git a/backend/src/config.rs b/backend/src/config.rs index 113ec89..7f0180a 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -5,6 +5,16 @@ use chrono::NaiveTime; use chrono_tz::Tz; use crate::crawler::browser::LaunchOptions; +use crate::crawler::source::DiscoverMode; + +/// What `CRAWLER_MODE` was set to. `Auto` is the daemon's default — +/// pick Backfill until `seed_completed_at` is written, then flip to +/// Incremental. `Explicit` forces a single mode regardless. +#[derive(Clone, Copy, Debug)] +pub enum CrawlerModePref { + Auto, + Explicit(DiscoverMode), +} #[derive(Clone, Debug)] pub struct AuthConfig { @@ -77,6 +87,12 @@ pub struct CrawlerConfig { pub user_agent: Option, pub proxy: Option, pub browser: LaunchOptions, + /// Mode preference for the metadata pass. Daemon default is `Auto` + /// (Backfill until `seed_completed_at` is written, then Incremental). + pub mode: CrawlerModePref, + /// `stop_after_unchanged` threshold supplied to Incremental in both + /// `Auto` (post-seed) and `Explicit(Incremental)` modes. 
+ pub incremental_stop_after: usize, } impl Default for CrawlerConfig { @@ -97,6 +113,8 @@ impl Default for CrawlerConfig { user_agent: None, proxy: None, browser: LaunchOptions::headless(), + mode: CrawlerModePref::Auto, + incremental_stop_after: 20, } } } @@ -151,6 +169,9 @@ impl CrawlerConfig { .parse() .map_err(|e| anyhow::anyhow!("CRAWLER_TZ must be a valid IANA TZ (got {raw:?}): {e}"))?, }; + let incremental_stop_after = + env_u64("CRAWLER_INCREMENTAL_STOP_AFTER", 20).max(1) as usize; + let mode = parse_mode_env(incremental_stop_after)?; Ok(Self { daemon_enabled: env_bool("CRAWLER_DAEMON", true), daily_at, @@ -179,10 +200,38 @@ impl CrawlerConfig { .ok() .filter(|s| !s.trim().is_empty()), browser: LaunchOptions::from_env(), + mode, + incremental_stop_after, }) } } +/// Parse `CRAWLER_MODE`. Empty/unset → `Auto`. Recognized values are +/// `auto`, `backfill`, and `incremental` (case-insensitive). Anything +/// else is a hard error so a typo can't silently fall through to the +/// default and mask itself. +fn parse_mode_env(incremental_stop_after: usize) -> anyhow::Result { + parse_mode_str(std::env::var("CRAWLER_MODE").ok().as_deref(), incremental_stop_after) +} + +/// Pure variant of [`parse_mode_env`] — testable without env-var +/// mutation. Takes the raw value (or `None` if unset). 
+pub(crate) fn parse_mode_str( + raw: Option<&str>, + incremental_stop_after: usize, +) -> anyhow::Result { + match raw.map(|s| s.trim().to_ascii_lowercase()).as_deref() { + None | Some("") | Some("auto") => Ok(CrawlerModePref::Auto), + Some("backfill") => Ok(CrawlerModePref::Explicit(DiscoverMode::Backfill)), + Some("incremental") => Ok(CrawlerModePref::Explicit(DiscoverMode::Incremental { + stop_after_unchanged: incremental_stop_after, + })), + Some(other) => Err(anyhow::anyhow!( + "CRAWLER_MODE must be one of: auto, backfill, incremental (got {other:?})" + )), + } +} + fn env_u64(name: &str, default: u64) -> u64 { std::env::var(name) .ok() @@ -211,3 +260,63 @@ fn env_usize(name: &str, default: usize) -> usize { .and_then(|s| s.parse().ok()) .unwrap_or(default) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_mode_str_defaults_to_auto_when_unset_or_blank() { + let none = parse_mode_str(None, 20).unwrap(); + assert!(matches!(none, CrawlerModePref::Auto)); + let blank = parse_mode_str(Some(""), 20).unwrap(); + assert!(matches!(blank, CrawlerModePref::Auto)); + let whitespace = parse_mode_str(Some(" "), 20).unwrap(); + assert!(matches!(whitespace, CrawlerModePref::Auto)); + } + + #[test] + fn parse_mode_str_recognizes_each_keyword() { + let auto = parse_mode_str(Some("auto"), 20).unwrap(); + assert!(matches!(auto, CrawlerModePref::Auto)); + + let backfill = parse_mode_str(Some("backfill"), 20).unwrap(); + assert!(matches!( + backfill, + CrawlerModePref::Explicit(DiscoverMode::Backfill) + )); + + let incremental = parse_mode_str(Some("incremental"), 7).unwrap(); + assert!(matches!( + incremental, + CrawlerModePref::Explicit(DiscoverMode::Incremental { + stop_after_unchanged: 7 + }) + )); + } + + #[test] + fn parse_mode_str_is_case_insensitive_and_trims_whitespace() { + let mixed = parse_mode_str(Some(" Incremental "), 5).unwrap(); + assert!(matches!( + mixed, + CrawlerModePref::Explicit(DiscoverMode::Incremental { + stop_after_unchanged: 5 + }) + 
)); + let upper = parse_mode_str(Some("BACKFILL"), 5).unwrap(); + assert!(matches!( + upper, + CrawlerModePref::Explicit(DiscoverMode::Backfill) + )); + } + + #[test] + fn parse_mode_str_hard_errors_on_unknown_value() { + let err = parse_mode_str(Some("backfil"), 20).unwrap_err(); + let msg = format!("{err}"); + assert!(msg.contains("backfill"), "error should list valid values: {msg}"); + assert!(msg.contains("auto")); + assert!(msg.contains("incremental")); + } +} diff --git a/backend/src/crawler/pipeline.rs b/backend/src/crawler/pipeline.rs index 88fa4e9..e1050e9 100644 --- a/backend/src/crawler/pipeline.rs +++ b/backend/src/crawler/pipeline.rs @@ -23,14 +23,34 @@ pub struct MetadataStats { pub mangas_failed: usize, } +/// Decide whether the per-ref loop should stop based on the Incremental +/// streak counter. Pulled out as a pure function so the rule is unit- +/// testable without standing up the walker or DB. +pub(crate) fn should_stop(mode: DiscoverMode, consecutive_unchanged: usize) -> bool { + match mode { + DiscoverMode::Backfill => false, + DiscoverMode::Incremental { stop_after_unchanged } => { + consecutive_unchanged >= stop_after_unchanged + } + } +} + /// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline /// for the target source. Pure metadata; chapter content is enqueued as /// separate `SyncChapterContent` jobs by the caller after this returns. /// -/// `limit == 0` means no cap (full backfill). `skip_chapters == true` is -/// the "metadata-only" mode (parser doesn't extract chapters, and -/// `sync_manga_chapters` is skipped — otherwise an empty chapter list -/// would soft-drop existing rows). +/// `limit == 0` means no cap (full sweep up to the source's own bound). +/// `skip_chapters == true` is the "metadata-only" mode (parser doesn't +/// extract chapters, and `sync_manga_chapters` is skipped — otherwise an +/// empty chapter list would soft-drop existing rows). 
+/// +/// `mode` controls the walk: +/// - `Backfill` — oldest-first, no early exit. The only mode that runs +/// the end-of-walk drop pass + writes `seed_completed_at`. +/// - `Incremental { stop_after_unchanged }` — newest-first, breaks out +/// after N consecutive Unchanged upserts. Drop pass is skipped (the +/// tail of the index is never visited, so its `last_seen_at` is +/// stale and using it to soft-drop would be unsafe). #[allow(clippy::too_many_arguments)] pub async fn run_metadata_pass( browser_manager: &BrowserManager, @@ -41,6 +61,7 @@ pub async fn run_metadata_pass( start_url: &str, limit: usize, skip_chapters: bool, + mode: DiscoverMode, ) -> anyhow::Result { let lease = browser_manager .acquire() @@ -74,123 +95,189 @@ pub async fn run_metadata_pass( let run_started_at = chrono::Utc::now(); let max_refs = (limit > 0).then_some(limit); - tracing::info!(?max_refs, "discovering manga list"); - let refs = source - .discover(&ctx, DiscoverMode::Backfill, max_refs) + tracing::info!(?mode, ?max_refs, "starting metadata pass"); + let mut walker = source + .discover(&ctx, mode) .await .context("discover failed")?; - tracing::info!(count = refs.len(), "discovered manga list"); - let mut stats = MetadataStats { - discovered: refs.len(), - ..MetadataStats::default() - }; + let mut stats = MetadataStats::default(); + let mut consecutive_unchanged: usize = 0; + let mut walked_to_completion = false; + let mut hit_limit = false; + let mut hit_incremental_stop = false; - for (i, r) in refs.iter().enumerate() { - tracing::info!( - idx = i + 1, - total = stats.discovered, - key = %r.source_manga_key, - "fetching metadata" - ); - let manga = match source.fetch_manga(&ctx, r).await { - Ok(m) => m, - Err(e) => { - tracing::warn!( - key = %r.source_manga_key, - url = %r.url, - error = ?e, - "fetch_manga failed" - ); - stats.mangas_failed += 1; - continue; + 'outer: loop { + let batch = match walker.next_batch(&ctx).await? 
{ + Some(b) => b, + None => { + walked_to_completion = true; + break; } }; - - let upsert = match repo::crawler::upsert_manga_from_source(db, source_id, &r.url, &manga) - .await - { - Ok(u) => u, - Err(e) => { - tracing::error!( - key = %r.source_manga_key, - error = ?e, - "upsert_manga_from_source failed" - ); - stats.mangas_failed += 1; - continue; + for r in batch { + if max_refs.map(|m| stats.discovered >= m).unwrap_or(false) { + hit_limit = true; + tracing::info!(cap = ?max_refs, "max_results reached; halting walk"); + break 'outer; } - }; - stats.upserted += 1; - tracing::info!( - key = %manga.source_manga_key, - manga_id = %upsert.manga_id, - status = ?upsert.status, - title = %manga.title, - "manga upserted" - ); - - // Cover image: download when missing in storage or when metadata - // signaled an update (cover URL is part of metadata_hash, so - // Updated implies the URL may have moved). Failures are non-fatal. - let needs_cover = upsert.cover_image_path.is_none() - || matches!(upsert.status, repo::crawler::UpsertStatus::Updated); - if needs_cover { - if let Some(cover_url) = manga.cover_url.as_deref() { - match download_and_store_cover( - db, - storage, - http, - rate, - &r.url, - upsert.manga_id, - cover_url, - ) - .await - { - Ok(()) => stats.covers_fetched += 1, - Err(e) => tracing::warn!( - manga_id = %upsert.manga_id, + stats.discovered += 1; + tracing::info!( + idx = stats.discovered, + key = %r.source_manga_key, + "fetching metadata" + ); + let manga = match source.fetch_manga(&ctx, &r).await { + Ok(m) => m, + Err(e) => { + tracing::warn!( + key = %r.source_manga_key, + url = %r.url, error = ?e, - "cover download failed" - ), + "fetch_manga failed" + ); + stats.mangas_failed += 1; + continue; } - } - } + }; - if !skip_chapters { - match repo::crawler::sync_manga_chapters( - db, - source_id, - upsert.manga_id, - &manga.chapters, + let upsert = match repo::crawler::upsert_manga_from_source( + db, source_id, &r.url, &manga, ) .await { - Ok(diff) => 
tracing::info!( - manga_id = %upsert.manga_id, - new = diff.new, - refreshed = diff.refreshed, - dropped = diff.dropped, - "chapters synced" - ), - Err(e) => tracing::warn!( - manga_id = %upsert.manga_id, - error = ?e, - "chapter sync failed" - ), + Ok(u) => u, + Err(e) => { + tracing::error!( + key = %r.source_manga_key, + error = ?e, + "upsert_manga_from_source failed" + ); + stats.mangas_failed += 1; + continue; + } + }; + stats.upserted += 1; + tracing::info!( + key = %manga.source_manga_key, + manga_id = %upsert.manga_id, + status = ?upsert.status, + title = %manga.title, + "manga upserted" + ); + + // Cover image: download when missing in storage or when metadata + // signaled an update (cover URL is part of metadata_hash, so + // Updated implies the URL may have moved). Failures are non-fatal. + let needs_cover = upsert.cover_image_path.is_none() + || matches!(upsert.status, repo::crawler::UpsertStatus::Updated); + if needs_cover { + if let Some(cover_url) = manga.cover_url.as_deref() { + match download_and_store_cover( + db, + storage, + http, + rate, + &r.url, + upsert.manga_id, + cover_url, + ) + .await + { + Ok(()) => stats.covers_fetched += 1, + Err(e) => tracing::warn!( + manga_id = %upsert.manga_id, + error = ?e, + "cover download failed" + ), + } + } + } + + if !skip_chapters { + match repo::crawler::sync_manga_chapters( + db, + source_id, + upsert.manga_id, + &manga.chapters, + ) + .await + { + Ok(diff) => tracing::info!( + manga_id = %upsert.manga_id, + new = diff.new, + refreshed = diff.refreshed, + dropped = diff.dropped, + "chapters synced" + ), + Err(e) => tracing::warn!( + manga_id = %upsert.manga_id, + error = ?e, + "chapter sync failed" + ), + } + } + + // Incremental stop: count consecutive Unchanged upserts and + // bail once the threshold is reached. New/Updated resets the + // streak so a fresh entry mid-batch doesn't accidentally trip + // the cutoff. 
+ match upsert.status { + repo::crawler::UpsertStatus::Unchanged => { + consecutive_unchanged += 1; + } + repo::crawler::UpsertStatus::New | repo::crawler::UpsertStatus::Updated => { + consecutive_unchanged = 0; + } + } + if should_stop(mode, consecutive_unchanged) { + hit_incremental_stop = true; + tracing::info!( + consecutive_unchanged, + "incremental stop threshold reached; halting walk" + ); + break 'outer; } } } - if limit == 0 { + // Drop pass: only when the walk truly covered everything the source + // surfaces. `last_seen_at` on un-visited rows is stale, so running + // the drop on a partial walk would soft-drop the tail of the index. + let full_walk = walked_to_completion && !hit_limit && !hit_incremental_stop; + let backfill_complete = full_walk && matches!(mode, DiscoverMode::Backfill); + if full_walk { match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await { Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"), Err(e) => tracing::warn!(error = ?e, "drop-pass failed"), } } else { - tracing::info!(limit, "partial sync — skipping drop pass"); + tracing::info!( + ?mode, + hit_limit, + hit_incremental_stop, + "partial sync — skipping drop pass" + ); } + if backfill_complete { + if let Err(e) = repo::crawler::mark_seed_completed(db, source_id, run_started_at).await { + tracing::warn!(error = ?e, "mark_seed_completed failed"); + } else { + tracing::info!(source_id, "seed marked complete"); + } + } + + tracing::info!( + ?mode, + discovered = stats.discovered, + upserted = stats.upserted, + covers_fetched = stats.covers_fetched, + mangas_failed = stats.mangas_failed, + walked_to_completion, + hit_limit, + hit_incremental_stop, + "metadata pass complete" + ); drop(lease); Ok(stats) @@ -345,3 +432,36 @@ fn origin_of(url: &str) -> Option { let host = rest.split('/').next()?; Some(format!("{scheme}://{host}")) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn backfill_never_stops_regardless_of_streak() { + 
assert!(!should_stop(DiscoverMode::Backfill, 0)); + assert!(!should_stop(DiscoverMode::Backfill, 100)); + assert!(!should_stop(DiscoverMode::Backfill, usize::MAX)); + } + + #[test] + fn incremental_stops_when_streak_meets_threshold() { + let mode = DiscoverMode::Incremental { + stop_after_unchanged: 3, + }; + assert!(!should_stop(mode, 0)); + assert!(!should_stop(mode, 2)); + assert!(should_stop(mode, 3), "stops at exactly the threshold"); + assert!(should_stop(mode, 100), "stops at anything past threshold"); + } + + #[test] + fn incremental_with_zero_threshold_stops_immediately() { + // A nonsensical config (no Unchanged needed to stop) shouldn't + // panic — it just means the very first ref triggers the bail. + let mode = DiscoverMode::Incremental { + stop_after_unchanged: 0, + }; + assert!(should_stop(mode, 0)); + } +} diff --git a/backend/src/crawler/source.rs b/backend/src/crawler/source.rs index cec6613..b11c52c 100644 --- a/backend/src/crawler/source.rs +++ b/backend/src/crawler/source.rs @@ -82,21 +82,42 @@ pub struct FetchContext<'a> { pub rate: &'a crate::crawler::rate_limit::HostRateLimiters, } +/// Lazy iterator over discovered manga refs. The caller drives the +/// walk one batch at a time, so it can break out as soon as a +/// downstream stop condition is met (e.g. N consecutive Unchanged +/// upserts in Incremental mode) without paying for pages it won't use. +/// +/// Batches are typically one source-index page each. Within a batch +/// refs are already in the right per-page order for the active mode +/// (Backfill reverses each page to oldest-first; Incremental leaves +/// the source's natural newest-first ordering). +#[async_trait] +pub trait DiscoverWalk: Send { + /// Return the next batch of refs, or `Ok(None)` when the source has + /// no more pages. The walker is single-use; calling `next_batch` + /// after `None` is allowed and continues to return `None`. 
+ async fn next_batch( + &mut self, + ctx: &FetchContext<'_>, + ) -> anyhow::Result>>; +} + #[async_trait] pub trait Source: Send + Sync { /// Stable identifier — also the row key in the `sources` table. fn id(&self) -> &'static str; - /// Returns up to `max_results` manga refs in source order. Pass - /// `None` for an uncapped walk (full backfill / incremental sweep). - /// Implementations should stop paginating as soon as the cap is - /// reached so partial runs don't pay for pages they won't use. + /// Begin discovery in `mode`. Returns a walker the caller drives + /// page-by-page via `next_batch`. The initial page-1 probe (used + /// to determine `last_page` and warm the cache for sites that + /// can't be paged without knowing the bound) happens inside this + /// call, so a fresh walker is ready to yield its first batch + /// without further setup. async fn discover( &self, ctx: &FetchContext<'_>, mode: DiscoverMode, - max_results: Option, - ) -> anyhow::Result>; + ) -> anyhow::Result>; async fn fetch_manga( &self, diff --git a/backend/src/crawler/source/target.rs b/backend/src/crawler/source/target.rs index 9288bee..dba59aa 100644 --- a/backend/src/crawler/source/target.rs +++ b/backend/src/crawler/source/target.rs @@ -7,6 +7,7 @@ //! (`td:has(label:contains("Author:"))`) are implemented by walking //! the parsed tree. +use std::collections::VecDeque; use std::time::Duration; use anyhow::Context; @@ -14,13 +15,18 @@ use async_trait::async_trait; use sha2::{Digest, Sha256}; use super::{ - DiscoverMode, FetchContext, Source, SourceChapter, SourceChapterRef, SourceManga, - SourceMangaRef, + DiscoverMode, DiscoverWalk, FetchContext, Source, SourceChapter, SourceChapterRef, + SourceManga, SourceMangaRef, }; use crate::crawler::detect::{ has_logo_sentinel, is_broken_page_body, retry_on_transient, PageError, }; +/// `sources.id` value for this Source impl. Exposed as a const so the +/// daemon can look up per-source state (e.g. 
`seed_completed_at`) +/// before constructing the Source itself. +pub const SOURCE_ID: &str = "target"; + /// In-loop retry budget for transient pages encountered during a single /// `discover` walk. Bounded small because the job system itself retries /// the whole `Discover` job on failure — these inline retries only need @@ -60,15 +66,14 @@ impl TargetSource { #[async_trait] impl Source for TargetSource { fn id(&self) -> &'static str { - "target" + SOURCE_ID } async fn discover( &self, ctx: &FetchContext<'_>, mode: DiscoverMode, - max_results: Option, - ) -> anyhow::Result> { + ) -> anyhow::Result> { // Always visit page 1 first because that's the only way to // discover `last_page`. Retry it on transient — a broken first // page would otherwise abort the whole walk before we've even @@ -85,15 +90,7 @@ impl Source for TargetSource { }; let backfill = matches!(mode, DiscoverMode::Backfill); - let order: Vec = match (last_page, backfill) { - (None, _) => vec![1], - // Backfill = oldest-first: walk pages last → 1, then - // reverse within each page (the listing is update_date - // DESC, so the bottom of the last page is the oldest - // entry the source still surfaces). - (Some(last), true) => (1..=last).rev().collect(), - (Some(last), false) => (1..=last).collect(), - }; + let order = build_page_order(last_page, backfill); tracing::info!( ?mode, last_page = ?last_page, @@ -101,40 +98,12 @@ impl Source for TargetSource { "walking pagination" ); - let mut all = Vec::new(); - for page_num in order { - // Page 1 is already cached from the last_page probe — reuse - // it rather than navigating twice. Every other page goes - // through the retry helper so a single broken page mid-walk - // doesn't silently drop its mangas from the result. - let mut page_refs = if page_num == 1 { - let doc = scraper::Html::parse_document(&first_html); - parse_manga_list_from(&doc)? 
- } else { - retry_on_transient( - || async { - let url = page_url(&self.base_url, page_num); - let html = navigate(ctx, &url).await?; - let doc = scraper::Html::parse_document(&html); - parse_manga_list_from(&doc) - }, - PAGE_TRANSIENT_RETRY_ATTEMPTS, - PAGE_TRANSIENT_RETRY_DELAY, - ) - .await? - }; - if backfill { - page_refs.reverse(); - } - tracing::info!(page_num, count = page_refs.len(), "page walked"); - all.extend(page_refs); - if cap_reached(&all, max_results) { - tracing::info!(cap = ?max_results, "max_results reached; halting pagination"); - break; - } - } - - Ok(truncate_to_cap(all, max_results)) + Ok(Box::new(TargetSourceWalker { + base_url: self.base_url.clone(), + backfill, + pages_remaining: order, + first_page_html: Some(first_html), + })) } async fn fetch_manga( @@ -168,15 +137,81 @@ impl Source for TargetSource { } } -fn cap_reached(buf: &[T], max: Option) -> bool { - matches!(max, Some(m) if buf.len() >= m) +/// Build the queue of page numbers `TargetSource::discover` will walk. +/// Backfill is oldest-first: pages `last..=1` (within each page the +/// walker reverses entries, since the source orders by update_date +/// DESC). Incremental is newest-first: pages `1..=last` in natural +/// order. If `last_page` is unknown (source surfaces no pagination) +/// only page 1 is visited. +fn build_page_order(last_page: Option, backfill: bool) -> VecDeque { + match (last_page, backfill) { + (None, _) => VecDeque::from([1]), + (Some(last), true) => (1..=last).rev().collect(), + (Some(last), false) => (1..=last).collect(), + } } -fn truncate_to_cap(mut buf: Vec, max: Option) -> Vec { - if let Some(m) = max { - buf.truncate(m); +/// Walker returned by [`TargetSource::discover`]. Pops one source-index +/// page per `next_batch` call. Page 1's HTML is cached at construction +/// time (the discover call needed it to read `last_page` anyway) so the +/// batch covering page 1 doesn't re-fetch. 
+struct TargetSourceWalker { + base_url: String, + backfill: bool, + pages_remaining: VecDeque, + first_page_html: Option, +} + +#[async_trait] +impl DiscoverWalk for TargetSourceWalker { + async fn next_batch( + &mut self, + ctx: &FetchContext<'_>, + ) -> anyhow::Result>> { + let Some(page_num) = self.pages_remaining.pop_front() else { + return Ok(None); + }; + let mut page_refs = if page_num == 1 { + // Reuse the cached page-1 HTML from the initial probe. Take + // it (rather than clone) so a malformed page-order queue + // that re-visits page 1 still falls back to a real fetch. + match self.first_page_html.take() { + Some(html) => { + let doc = scraper::Html::parse_document(&html); + parse_manga_list_from(&doc)? + } + None => { + retry_on_transient( + || async { + let html = navigate(ctx, self.base_url.as_str()).await?; + let doc = scraper::Html::parse_document(&html); + parse_manga_list_from(&doc) + }, + PAGE_TRANSIENT_RETRY_ATTEMPTS, + PAGE_TRANSIENT_RETRY_DELAY, + ) + .await? + } + } + } else { + retry_on_transient( + || async { + let url = page_url(&self.base_url, page_num); + let html = navigate(ctx, &url).await?; + let doc = scraper::Html::parse_document(&html); + parse_manga_list_from(&doc) + }, + PAGE_TRANSIENT_RETRY_ATTEMPTS, + PAGE_TRANSIENT_RETRY_DELAY, + ) + .await? + }; + if self.backfill { + page_refs.reverse(); + } + tracing::info!(page_num, count = page_refs.len(), "page walked"); + Ok(Some(page_refs)) } - buf } /// Single point of rate-limited navigation. Every Source request goes @@ -922,4 +957,37 @@ mod tests { let err = parse_manga_detail(html, "x", true).expect_err("expected Transient"); assert!(err.is_transient(), "got non-transient: {err}"); } + + #[test] + fn build_page_order_backfill_is_last_to_one() { + // Backfill walks pages oldest-first: queue is [last, last-1, ..., 1] + // so popping from the front yields the last page first. 
+ let order = build_page_order(Some(3), true); + assert_eq!(Vec::from(order), vec![3, 2, 1]); + } + + #[test] + fn build_page_order_incremental_is_one_to_last() { + // Incremental walks newest-first in natural source order. + let order = build_page_order(Some(3), false); + assert_eq!(Vec::from(order), vec![1, 2, 3]); + } + + #[test] + fn build_page_order_falls_back_to_page_one_only_without_pagination() { + let backfill = build_page_order(None, true); + assert_eq!(Vec::from(backfill), vec![1]); + let incremental = build_page_order(None, false); + assert_eq!(Vec::from(incremental), vec![1]); + } + + #[test] + fn build_page_order_single_page_index_yields_one_entry() { + // Sources with exactly one page should not yield duplicates + // regardless of mode. + let backfill = build_page_order(Some(1), true); + assert_eq!(Vec::from(backfill), vec![1]); + let incremental = build_page_order(Some(1), false); + assert_eq!(Vec::from(incremental), vec![1]); + } } diff --git a/backend/src/repo/crawler.rs b/backend/src/repo/crawler.rs index 2645323..eb83877 100644 --- a/backend/src/repo/crawler.rs +++ b/backend/src/repo/crawler.rs @@ -412,6 +412,53 @@ pub async fn sync_manga_chapters( Ok(diff) } +/// Record that a complete Backfill walk has finished for `source_id`. +/// The presence of this row is what the daemon's mode auto-detection +/// uses to flip from Backfill to Incremental on subsequent ticks. +/// +/// Keyed `seed_completed:<source_id>` in `crawler_state`. JSON payload +/// stores the timestamp so we can surface "last fully reseeded at" in +/// future ops tooling without another migration.
+pub async fn mark_seed_completed( + pool: &PgPool, + source_id: &str, + at: DateTime, +) -> sqlx::Result<()> { + let key = format!("seed_completed:{source_id}"); + sqlx::query( + "INSERT INTO crawler_state (key, value, updated_at) \ + VALUES ($1, $2, now()) \ + ON CONFLICT (key) DO UPDATE \ + SET value = EXCLUDED.value, updated_at = now()", + ) + .bind(&key) + .bind(serde_json::json!({ "at": at.to_rfc3339() })) + .execute(pool) + .await?; + Ok(()) +} + +/// Read the timestamp written by [`mark_seed_completed`], if any. +/// `None` means no complete Backfill has ever finished for this +/// source — the daemon should run Backfill on the next tick. +pub async fn seed_completed_at( + pool: &PgPool, + source_id: &str, +) -> sqlx::Result>> { + let key = format!("seed_completed:{source_id}"); + let row: Option = + sqlx::query_scalar("SELECT value FROM crawler_state WHERE key = $1") + .bind(&key) + .fetch_optional(pool) + .await?; + Ok(row.and_then(|v| { + v.get("at") + .and_then(|s| s.as_str()) + .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&Utc)) + })) +} + pub async fn mark_dropped_mangas( pool: &PgPool, source_id: &str, diff --git a/backend/tests/crawler_incremental.rs b/backend/tests/crawler_incremental.rs new file mode 100644 index 0000000..65e0710 --- /dev/null +++ b/backend/tests/crawler_incremental.rs @@ -0,0 +1,85 @@ +//! Integration tests for the incremental-mode coordination state: +//! `mark_seed_completed` / `seed_completed_at` round-trip via the +//! `crawler_state` table. +//! +//! End-to-end pipeline behavior (walker + stop-on-Unchanged) requires +//! a real `chromiumoxide::Browser` to construct a `FetchContext`, so +//! the live integration of that path is covered by +//! `crawler_browser_smoke.rs` instead. The pure stop logic itself is +//! unit-tested in `crawler::pipeline::tests`. 
+ +use chrono::Utc; +use mangalord::repo::crawler; +use sqlx::PgPool; + +#[sqlx::test(migrations = "./migrations")] +async fn seed_completed_at_none_before_any_run(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let res = crawler::seed_completed_at(&pool, "target").await.unwrap(); + assert!(res.is_none(), "fresh source has no seed marker"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn mark_seed_completed_then_read_round_trips_timestamp(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let at = Utc::now(); + crawler::mark_seed_completed(&pool, "target", at) + .await + .unwrap(); + let read = crawler::seed_completed_at(&pool, "target") + .await + .unwrap() + .expect("marker present after mark"); + // The marker is written via `to_rfc3339` and parsed back on read; allow + // a 1ms tolerance in case that round trip drops sub-millisecond digits. + let drift = (read - at).num_milliseconds().abs(); + assert!(drift <= 1, "round-trip drift: {drift}ms (at={at}, read={read})"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn mark_seed_completed_overwrites_previous_value(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let first = Utc::now() - chrono::Duration::hours(1); + let second = Utc::now(); + crawler::mark_seed_completed(&pool, "target", first) + .await + .unwrap(); + crawler::mark_seed_completed(&pool, "target", second) + .await + .unwrap(); + let read = crawler::seed_completed_at(&pool, "target") + .await + .unwrap() + .expect("marker present"); + let drift = (read - second).num_milliseconds().abs(); + assert!(drift <= 1, "should reflect the latest mark, not the first"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn seed_completed_is_per_source(pool: PgPool) { + // Two sources, only one is marked complete.
The other must still + // report None — the key is namespaced by source_id. + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + crawler::ensure_source(&pool, "other", "O", "https://y.example") + .await + .unwrap(); + crawler::mark_seed_completed(&pool, "target", Utc::now()) + .await + .unwrap(); + assert!(crawler::seed_completed_at(&pool, "target") + .await + .unwrap() + .is_some()); + assert!(crawler::seed_completed_at(&pool, "other") + .await + .unwrap() + .is_none()); +} diff --git a/frontend/package.json b/frontend/package.json index 18b626e..7624415 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "mangalord-frontend", - "version": "0.32.0", + "version": "0.33.0", "private": true, "type": "module", "scripts": {