diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 668d33a..ba42dde 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "mangalord" -version = "0.35.6" +version = "0.36.0" dependencies = [ "anyhow", "argon2", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index a463758..be56adf 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mangalord" -version = "0.35.6" +version = "0.36.0" edition = "2021" default-run = "mangalord" diff --git a/backend/src/app.rs b/backend/src/app.rs index 3302c75..b3c5ef2 100644 --- a/backend/src/app.rs +++ b/backend/src/app.rs @@ -13,7 +13,7 @@ use tower_http::cors::{AllowOrigin, CorsLayer}; use tower_http::trace::TraceLayer; use crate::auth::rate_limit::AuthRateLimiter; -use crate::config::{AuthConfig, Config, CrawlerConfig, CrawlerModePref, UploadConfig}; +use crate::config::{AuthConfig, Config, CrawlerConfig, UploadConfig}; use crate::crawler::browser_manager::{self, BrowserManager}; use crate::crawler::content::{self, SyncOutcome}; use crate::crawler::daemon::{self, ChapterDispatcher, DaemonConfig, MetadataPass}; @@ -22,7 +22,6 @@ use crate::crawler::pipeline::{self, MetadataStats}; use crate::crawler::rate_limit::HostRateLimiters; use crate::crawler::safety::DownloadAllowlist; use crate::crawler::session; -use crate::crawler::source::{target as target_source, DiscoverMode}; use crate::repo; use crate::storage::{LocalStorage, Storage}; @@ -159,8 +158,6 @@ async fn spawn_crawler_daemon( http: http.clone(), rate: Arc::clone(&rate), start_url: url.clone(), - mode_pref: cfg.mode, - incremental_stop_after: cfg.incremental_stop_after, download_allowlist: cfg.download_allowlist.clone(), max_image_bytes: cfg.max_image_bytes, }); @@ -226,8 +223,6 @@ struct RealMetadataPass { http: reqwest::Client, rate: Arc, start_url: String, - mode_pref: CrawlerModePref, - incremental_stop_after: usize, download_allowlist: DownloadAllowlist, max_image_bytes: usize, } @@ -235,13 +230,6 @@ struct RealMetadataPass { #[async_trait] impl MetadataPass for RealMetadataPass { async fn run(&self) -> anyhow::Result { - let mode = resolve_mode( - &self.db, - target_source::SOURCE_ID, - self.mode_pref, - self.incremental_stop_after, - ) - .await?; pipeline::run_metadata_pass( &self.browser_manager, &self.db, @@ -251,7 +239,6 @@ impl MetadataPass for RealMetadataPass { &self.start_url, 0, false, - mode, &self.download_allowlist, self.max_image_bytes, ) @@ -259,50 +246,6 @@ impl MetadataPass for RealMetadataPass { } } -/// Pick the active mode for this tick. `Explicit` short-circuits the -/// DB lookup. `Auto` reads `seed_completed_at`: missing → Backfill -/// (initial seed for this source), present → Incremental with the -/// configured threshold. -/// -/// A DB error during the Auto lookup propagates as `Err` rather than -/// silently degrading to Backfill — the daemon's `run_tick` catches -/// the error, logs, and skips the tick. That's safer than running a -/// full re-backfill (including a drop pass against stale-looking rows) -/// when the DB is flaky. -async fn resolve_mode( - db: &PgPool, - source_id: &str, - pref: CrawlerModePref, - incremental_stop_after: usize, -) -> anyhow::Result { - match pref { - CrawlerModePref::Explicit(m) => { - tracing::info!(?m, "crawler mode: explicit (CRAWLER_MODE override)"); - Ok(m) - } - CrawlerModePref::Auto => { - let seeded = repo::crawler::seed_completed_at(db, source_id) - .await - .context("seed_completed_at lookup for mode auto-detection")?; - match seeded { - Some(at) => { - tracing::info!( - seed_completed_at = %at.to_rfc3339(), - "crawler mode: auto → incremental (seed previously completed)" - ); - Ok(DiscoverMode::Incremental { - stop_after_unchanged: incremental_stop_after, - }) - } - None => { - tracing::info!("crawler mode: auto → backfill (no seed marker for source)"); - Ok(DiscoverMode::Backfill) - } - } - } - } -} - struct RealChapterDispatcher { browser_manager: Arc, db: PgPool, @@ -348,8 +291,8 @@ impl ChapterDispatcher for RealChapterDispatcher { Ok(outcome) } // Other payload kinds aren't dispatched by this daemon yet — - // metadata-driven jobs (Discover/SyncManga/SyncChapterList) - // are handled inline by the cron's metadata pass. + // SyncManga / SyncChapterList are handled inline by the cron's + // metadata pass. _ => Ok(SyncOutcome::Skipped), } } diff --git a/backend/src/bin/crawler.rs b/backend/src/bin/crawler.rs index 346e838..702e218 100644 --- a/backend/src/bin/crawler.rs +++ b/backend/src/bin/crawler.rs @@ -31,7 +31,6 @@ use mangalord::crawler::content::{self, SyncOutcome}; use mangalord::crawler::pipeline; use mangalord::crawler::rate_limit::HostRateLimiters; use mangalord::crawler::session; -use mangalord::crawler::source::DiscoverMode; use mangalord::storage::{LocalStorage, Storage}; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; @@ -63,8 +62,6 @@ async fn main() -> anyhow::Result<()> { let cdn_rate_ms = env_u64("CRAWLER_CDN_RATE_MS", rate_ms); let limit = env_u64("CRAWLER_LIMIT", 0) as usize; let skip_chapters = env_bool("CRAWLER_SKIP_CHAPTERS", false); - let incremental_stop_after = env_u64("CRAWLER_INCREMENTAL_STOP_AFTER", 20).max(1) as usize; - let mode = parse_crawler_mode(incremental_stop_after)?; let skip_chapter_content = env_bool("CRAWLER_SKIP_CHAPTER_CONTENT", false); let chapter_workers = env_u64("CRAWLER_CHAPTER_WORKERS", 1).max(1) as usize; let force_refetch_chapters = env_bool("CRAWLER_FORCE_REFETCH_CHAPTERS", false); @@ -143,7 +140,6 @@ async fn main() -> anyhow::Result<()> { user_agent = ?user_agent, proxy = ?proxy_url, keep_open, - ?mode, storage_dir = %storage_dir.display(), "starting crawler" ); @@ -191,7 +187,6 @@ async fn main() -> anyhow::Result<()> { skip_chapter_content || !session_ready, chapter_workers, force_refetch_chapters, - mode, ) .await; @@ -221,7 +216,6 @@ async fn run( skip_chapter_content: bool, chapter_workers: usize, force_refetch_chapters: bool, - mode: DiscoverMode, ) -> anyhow::Result<()> { let mut rate = HostRateLimiters::new(Duration::from_millis(rate_ms)); if let Some(host) = cdn_host { @@ -265,7 +259,6 @@ async fn run( start_url, limit, skip_chapters, - mode, allowlist.as_ref(), max_image_bytes, ) @@ -433,38 +426,6 @@ fn resolve_start_url() -> anyhow::Result { }) } -/// Parse the CLI's `CRAWLER_MODE`. Defaults to `backfill` because the -/// binary is operator-driven (manual reseeds, force-refetches) — the -/// auto-detect logic lives in the daemon. `auto` is rejected because -/// the CLI has no DB state to consult before the run. -fn parse_crawler_mode(incremental_stop_after: usize) -> anyhow::Result { - parse_crawler_mode_str( - std::env::var("CRAWLER_MODE").ok().as_deref(), - incremental_stop_after, - ) -} - -/// Pure variant of [`parse_crawler_mode`] — testable without env-var -/// mutation. -fn parse_crawler_mode_str( - raw: Option<&str>, - incremental_stop_after: usize, -) -> anyhow::Result { - match raw.map(|s| s.trim().to_ascii_lowercase()).as_deref() { - None | Some("") | Some("backfill") => Ok(DiscoverMode::Backfill), - Some("incremental") => Ok(DiscoverMode::Incremental { - stop_after_unchanged: incremental_stop_after, - }), - Some("auto") => Err(anyhow!( - "CRAWLER_MODE=auto isn't supported by the CLI (use backfill or incremental); \ - the daemon does auto-detection" - )), - Some(other) => Err(anyhow!( - "CRAWLER_MODE must be one of: backfill, incremental (got {other:?})" - )), - } -} - fn env_u64(name: &str, default: u64) -> u64 { std::env::var(name) .ok() @@ -480,55 +441,3 @@ fn env_bool(name: &str, default: bool) -> bool { } } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn cli_mode_defaults_to_backfill_when_unset_or_blank() { - let none = parse_crawler_mode_str(None, 20).unwrap(); - assert!(matches!(none, DiscoverMode::Backfill)); - let blank = parse_crawler_mode_str(Some(""), 20).unwrap(); - assert!(matches!(blank, DiscoverMode::Backfill)); - } - - #[test] - fn cli_mode_recognizes_backfill_and_incremental() { - let backfill = parse_crawler_mode_str(Some("backfill"), 20).unwrap(); - assert!(matches!(backfill, DiscoverMode::Backfill)); - - let incremental = parse_crawler_mode_str(Some("incremental"), 9).unwrap(); - assert!(matches!( - incremental, - DiscoverMode::Incremental { stop_after_unchanged: 9 } - )); - } - - #[test] - fn cli_mode_rejects_auto_explicitly() { - let err = parse_crawler_mode_str(Some("auto"), 20).unwrap_err(); - let msg = format!("{err}"); - assert!( - msg.contains("daemon"), - "rejection should point operator at the daemon: {msg}" - ); - } - - #[test] - fn cli_mode_rejects_unknown_value() { - let err = parse_crawler_mode_str(Some("garbage"), 20).unwrap_err(); - let msg = format!("{err}"); - assert!(msg.contains("backfill")); - assert!(msg.contains("incremental")); - } - - #[test] - fn cli_mode_is_case_insensitive_and_trims() { - let mixed = parse_crawler_mode_str(Some(" Incremental "), 4).unwrap(); - assert!(matches!( - mixed, - DiscoverMode::Incremental { stop_after_unchanged: 4 } - )); - } -} - diff --git a/backend/src/config.rs b/backend/src/config.rs index 1b11d03..28438d1 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -6,16 +6,6 @@ use chrono_tz::Tz; use crate::crawler::browser::LaunchOptions; use crate::crawler::safety::{DownloadAllowlist, DEFAULT_MAX_IMAGE_BYTES}; -use crate::crawler::source::DiscoverMode; - -/// What `CRAWLER_MODE` was set to. `Auto` is the daemon's default — -/// pick Backfill until `seed_completed_at` is written, then flip to -/// Incremental. `Explicit` forces a single mode regardless. -#[derive(Clone, Copy, Debug)] -pub enum CrawlerModePref { - Auto, - Explicit(DiscoverMode), -} #[derive(Clone, Debug)] pub struct AuthConfig { @@ -94,12 +84,6 @@ pub struct CrawlerConfig { pub user_agent: Option, pub proxy: Option, pub browser: LaunchOptions, - /// Mode preference for the metadata pass. Daemon default is `Auto` - /// (Backfill until `seed_completed_at` is written, then Incremental). - pub mode: CrawlerModePref, - /// `stop_after_unchanged` threshold supplied to Incremental in both - /// `Auto` (post-seed) and `Explicit(Incremental)` modes. - pub incremental_stop_after: usize, /// Hosts the crawler is allowed to download images / covers from. /// Always seeded with the host of `start_url` and (when set) the /// configured `cdn_host`. Additional hosts can be added via @@ -127,8 +111,6 @@ impl Default for CrawlerConfig { user_agent: None, proxy: None, browser: LaunchOptions::headless(), - mode: CrawlerModePref::Auto, - incremental_stop_after: 20, download_allowlist: DownloadAllowlist::new(), max_image_bytes: DEFAULT_MAX_IMAGE_BYTES, } @@ -195,9 +177,6 @@ impl CrawlerConfig { .parse() .map_err(|e| anyhow::anyhow!("CRAWLER_TZ must be a valid IANA TZ (got {raw:?}): {e}"))?, }; - let incremental_stop_after = - env_u64("CRAWLER_INCREMENTAL_STOP_AFTER", 20).max(1) as usize; - let mode = parse_mode_env(incremental_stop_after)?; let start_url = std::env::var("CRAWLER_START_URL") .ok() .filter(|s| !s.trim().is_empty()); @@ -230,8 +209,6 @@ impl CrawlerConfig { .ok() .filter(|s| !s.trim().is_empty()), browser: LaunchOptions::from_env(), - mode, - incremental_stop_after, download_allowlist, max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES), }) @@ -271,32 +248,6 @@ fn build_download_allowlist( allow } -/// Parse `CRAWLER_MODE`. Empty/unset → `Auto`. Recognized values are -/// `auto`, `backfill`, and `incremental` (case-insensitive). Anything -/// else is a hard error so a typo can't silently fall through to the -/// default and mask itself. -fn parse_mode_env(incremental_stop_after: usize) -> anyhow::Result { - parse_mode_str(std::env::var("CRAWLER_MODE").ok().as_deref(), incremental_stop_after) -} - -/// Pure variant of [`parse_mode_env`] — testable without env-var -/// mutation. Takes the raw value (or `None` if unset). -pub(crate) fn parse_mode_str( - raw: Option<&str>, - incremental_stop_after: usize, -) -> anyhow::Result { - match raw.map(|s| s.trim().to_ascii_lowercase()).as_deref() { - None | Some("") | Some("auto") => Ok(CrawlerModePref::Auto), - Some("backfill") => Ok(CrawlerModePref::Explicit(DiscoverMode::Backfill)), - Some("incremental") => Ok(CrawlerModePref::Explicit(DiscoverMode::Incremental { - stop_after_unchanged: incremental_stop_after, - })), - Some(other) => Err(anyhow::anyhow!( - "CRAWLER_MODE must be one of: auto, backfill, incremental (got {other:?})" - )), - } -} - fn env_u64(name: &str, default: u64) -> u64 { std::env::var(name) .ok() @@ -326,62 +277,3 @@ fn env_usize(name: &str, default: usize) -> usize { .unwrap_or(default) } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn parse_mode_str_defaults_to_auto_when_unset_or_blank() { - let none = parse_mode_str(None, 20).unwrap(); - assert!(matches!(none, CrawlerModePref::Auto)); - let blank = parse_mode_str(Some(""), 20).unwrap(); - assert!(matches!(blank, CrawlerModePref::Auto)); - let whitespace = parse_mode_str(Some(" "), 20).unwrap(); - assert!(matches!(whitespace, CrawlerModePref::Auto)); - } - - #[test] - fn parse_mode_str_recognizes_each_keyword() { - let auto = parse_mode_str(Some("auto"), 20).unwrap(); - assert!(matches!(auto, CrawlerModePref::Auto)); - - let backfill = parse_mode_str(Some("backfill"), 20).unwrap(); - assert!(matches!( - backfill, - CrawlerModePref::Explicit(DiscoverMode::Backfill) - )); - - let incremental = parse_mode_str(Some("incremental"), 7).unwrap(); - assert!(matches!( - incremental, - CrawlerModePref::Explicit(DiscoverMode::Incremental { - stop_after_unchanged: 7 - }) - )); - } - - #[test] - fn parse_mode_str_is_case_insensitive_and_trims_whitespace() { - let mixed = parse_mode_str(Some(" Incremental "), 5).unwrap(); - assert!(matches!( - mixed, - CrawlerModePref::Explicit(DiscoverMode::Incremental { - stop_after_unchanged: 5 - }) - )); - let upper = parse_mode_str(Some("BACKFILL"), 5).unwrap(); - assert!(matches!( - upper, - CrawlerModePref::Explicit(DiscoverMode::Backfill) - )); - } - - #[test] - fn parse_mode_str_hard_errors_on_unknown_value() { - let err = parse_mode_str(Some("backfil"), 20).unwrap_err(); - let msg = format!("{err}"); - assert!(msg.contains("backfill"), "error should list valid values: {msg}"); - assert!(msg.contains("auto")); - assert!(msg.contains("incremental")); - } -} diff --git a/backend/src/crawler/jobs.rs b/backend/src/crawler/jobs.rs index 97a1452..0d84931 100644 --- a/backend/src/crawler/jobs.rs +++ b/backend/src/crawler/jobs.rs @@ -1,4 +1,4 @@ -//! Persistent job queue and the four job kinds. +//! Persistent job queue and its job kinds. //! //! Backed by Postgres (the `crawler_jobs` table). Workers lease rows //! with `SELECT ... FOR UPDATE SKIP LOCKED`, heartbeat via @@ -12,16 +12,9 @@ use serde::{Deserialize, Serialize}; use sqlx::PgPool; use uuid::Uuid; -use super::source::DiscoverMode; - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum JobPayload { - /// Walk the source index and enqueue `SyncManga` jobs. - Discover { - source_id: String, - mode: DiscoverMode, - }, /// Fetch one manga's detail page, upsert metadata, enqueue /// `SyncChapterList`. SyncManga { diff --git a/backend/src/crawler/pipeline.rs b/backend/src/crawler/pipeline.rs index 8367351..3ed2d0c 100644 --- a/backend/src/crawler/pipeline.rs +++ b/backend/src/crawler/pipeline.rs @@ -13,8 +13,9 @@ use crate::crawler::jobs::{self, EnqueueResult, JobPayload}; use crate::crawler::rate_limit::HostRateLimiters; use crate::crawler::safety::{fetch_bytes_capped, looks_like_image, DownloadAllowlist}; use crate::crawler::source::target::TargetSource; -use crate::crawler::source::{DiscoverMode, FetchContext, Source}; +use crate::crawler::source::{FetchContext, Source}; use crate::repo; +use crate::repo::crawler::UpsertStatus; use crate::storage::Storage; /// Coarse counters surfaced for logging at the end of a metadata pass. @@ -26,16 +27,42 @@ pub struct MetadataStats { pub mangas_failed: usize, } -/// Decide whether the per-ref loop should stop based on the Incremental -/// streak counter. Pulled out as a pure function so the rule is unit- -/// testable without standing up the walker or DB. -pub(crate) fn should_stop(mode: DiscoverMode, consecutive_unchanged: usize) -> bool { - match mode { - DiscoverMode::Backfill => false, - DiscoverMode::Incremental { stop_after_unchanged } => { - consecutive_unchanged >= stop_after_unchanged - } - } +/// Decide whether the per-ref loop should stop on the manga just +/// processed. The walk halts only when (a) the previous run exited +/// cleanly — so the index tail is known to be caught up and we're not +/// in a recovery sweep — AND (b) this manga's metadata hash matched +/// storage (`Unchanged`) AND (c) the chapter sync confirmed zero new +/// chapters. A `None` chapter count (skip_chapters, or a chapter-sync +/// error we logged-and-swallowed) refuses the stop because we can't +/// verify the tail is unchanged from a single piece of evidence. +/// +/// Pure function so the rule is unit-testable without the walker, DB, +/// or browser. +pub(crate) fn should_stop( + was_clean: bool, + status: UpsertStatus, + chapters_new: Option, +) -> bool { + was_clean + && matches!(status, UpsertStatus::Unchanged) + && chapters_new == Some(0) +} + +/// Whether the just-finished walk should be recorded as a clean exit. +/// `true` writes the recovery flag back to `completed: true`; `false` +/// leaves it `false` so the next tick treats this run as crashed and +/// does a recovery sweep. +/// +/// `hit_limit` (the caller-imposed `CRAWLER_LIMIT` cap) is *not* an +/// argument: a limit cap by definition does not reach the catalog tail, +/// so it can never count as a clean exit. Encoding that in the type +/// (rather than as an `&& !hit_limit` clause inline) prevents a future +/// edit from accidentally adding it back to the truth table. +pub(crate) fn should_mark_clean_exit( + walked_to_completion: bool, + hit_stop_condition: bool, +) -> bool { + walked_to_completion || hit_stop_condition } /// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline @@ -45,15 +72,25 @@ pub(crate) fn should_stop(mode: DiscoverMode, consecutive_unchanged: usize) -> b /// `limit == 0` means no cap (full sweep up to the source's own bound). /// `skip_chapters == true` is the "metadata-only" mode (parser doesn't /// extract chapters, and `sync_manga_chapters` is skipped — otherwise an -/// empty chapter list would soft-drop existing rows). +/// empty chapter list would soft-drop existing rows). In this mode the +/// stop condition never fires because chapter freshness can't be +/// confirmed, so the walk always runs to end-of-source. /// -/// `mode` controls the walk: -/// - `Backfill` — oldest-first, no early exit. The only mode that runs -/// the end-of-walk drop pass + writes `seed_completed_at`. -/// - `Incremental { stop_after_unchanged }` — newest-first, breaks out -/// after N consecutive Unchanged upserts. Drop pass is skipped (the -/// tail of the index is never visited, so its `last_seen_at` is -/// stale and using it to soft-drop would be unsafe). +/// The walk is always newest-first. Steady-state runs stop on the first +/// manga where metadata is `Unchanged` AND chapter sync reports zero +/// new chapters — the source orders by `update_date DESC`, so anything +/// with a fresh chapter or fresh metadata is bumped to the top and will +/// be processed before we hit a fully-caught-up manga. +/// +/// A per-source recovery flag stored in `crawler_state` +/// (`last_run_completed:`) gates the early stop: it's set to +/// `false` right after `ensure_source` and back to `true` only when the +/// run exits via end-of-walk OR the intentional stop. A crash, panic, +/// or SIGKILL leaves the flag at `false`, so the next tick reads it, +/// recognizes the previous run did not exit cleanly, and walks the +/// full catalog (ignoring the stop condition) to re-cover anything the +/// crashed run missed past its crash point. Once that recovery sweep +/// reaches end-of-walk, steady-state resumes. #[allow(clippy::too_many_arguments)] pub async fn run_metadata_pass( browser_manager: &BrowserManager, @@ -64,7 +101,6 @@ pub async fn run_metadata_pass( start_url: &str, limit: usize, skip_chapters: bool, - mode: DiscoverMode, allowlist: &DownloadAllowlist, max_image_bytes: usize, ) -> anyhow::Result { @@ -97,28 +133,36 @@ pub async fn run_metadata_pass( .await .context("ensure_source")?; - let run_started_at = chrono::Utc::now(); + // Read BEFORE flipping to "in-flight" — a `false` here means the + // previous run didn't reach a clean exit, and this run must walk + // the full catalog (recovery sweep) instead of bailing on the + // first caught-up manga. + let was_clean = repo::crawler::last_run_completed_cleanly(db, source_id) + .await + .context("read last_run_completed_cleanly")?; + repo::crawler::mark_run_started(db, source_id) + .await + .context("mark_run_started")?; + let max_refs = (limit > 0).then_some(limit); - tracing::info!(?mode, ?max_refs, "starting metadata pass"); + tracing::info!(was_clean, ?max_refs, "starting metadata pass"); let mut walker = source - .discover(&ctx, mode) + .discover(&ctx) .await .context("discover failed")?; let mut stats = MetadataStats::default(); // Run-scoped dedup of `source_manga_key`s already processed this pass. - // Backfill: the walker may append displaced refs that also appear on - // the page we're about to visit naturally; skipping the dup avoids - // redundant fetch_manga + upsert. Incremental: a shift causes the - // slot-last item of the page we just read to reappear at slot 0 of - // the next page; skipping it preserves the consecutive_unchanged - // streak math instead of inflating it with a re-confirm. + // A shift in the source index causes the slot-last item of the page + // we just read to reappear at slot 0 of the next page; skipping it + // here prevents redundant fetch_manga + upsert and avoids spuriously + // tripping the stop condition with a re-confirm of an entry we + // already counted. let mut seen: HashSet = HashSet::new(); - let mut consecutive_unchanged: usize = 0; let mut walked_to_completion = false; let mut hit_limit = false; - let mut hit_incremental_stop = false; + let mut hit_stop_condition = false; 'outer: loop { let batch = match walker.next_batch(&ctx).await? { @@ -137,13 +181,13 @@ pub async fn run_metadata_pass( // Skip refs we've already *successfully* processed this pass. // Checking `contains` here (rather than `insert`) keeps the key // out of `seen` on failure paths below, so a transient fetch or - // upsert error gets a second chance if the ref reappears via the - // backfill boundary re-check or another batch. Done *before* - // counting toward `stats.discovered` (the skipped ref did no - // work) and *before* touching `consecutive_unchanged` (a - // `continue` here preserves the streak rather than resetting or - // inflating it). The matching `seen.insert(...)` lives just - // after the successful upsert below. + // upsert error gets a second chance if the ref reappears in + // another batch. Done *before* counting toward + // `stats.discovered` (the skipped ref did no work) and *before* + // touching the stop check (a `continue` here doesn't let a + // re-confirm trip the stop condition). The matching + // `seen.insert(...)` lives just after the successful upsert + // below. if seen.contains(&r.source_manga_key) { tracing::debug!( key = %r.source_manga_key, @@ -230,7 +274,13 @@ pub async fn run_metadata_pass( } } - if !skip_chapters { + // Chapter sync. `chapters_new` feeds the stop check below: + // `None` (skip_chapters mode, or a logged-and-swallowed sync + // error) refuses to stop on this manga because we can't + // confirm "no new chapters." + let chapters_new: Option = if skip_chapters { + None + } else { match repo::crawler::sync_manga_chapters( db, source_id, @@ -239,79 +289,64 @@ pub async fn run_metadata_pass( ) .await { - Ok(diff) => tracing::info!( - manga_id = %upsert.manga_id, - new = diff.new, - refreshed = diff.refreshed, - dropped = diff.dropped, - "chapters synced" - ), - Err(e) => tracing::warn!( - manga_id = %upsert.manga_id, - error = ?e, - "chapter sync failed" - ), + Ok(diff) => { + tracing::info!( + manga_id = %upsert.manga_id, + new = diff.new, + refreshed = diff.refreshed, + dropped = diff.dropped, + "chapters synced" + ); + Some(diff.new) + } + Err(e) => { + tracing::warn!( + manga_id = %upsert.manga_id, + error = ?e, + "chapter sync failed" + ); + None + } } - } + }; - // Incremental stop: count consecutive Unchanged upserts and - // bail once the threshold is reached. New/Updated resets the - // streak so a fresh entry mid-batch doesn't accidentally trip - // the cutoff. - match upsert.status { - repo::crawler::UpsertStatus::Unchanged => { - consecutive_unchanged += 1; - } - repo::crawler::UpsertStatus::New | repo::crawler::UpsertStatus::Updated => { - consecutive_unchanged = 0; - } - } - if should_stop(mode, consecutive_unchanged) { - hit_incremental_stop = true; + if should_stop(was_clean, upsert.status, chapters_new) { + hit_stop_condition = true; tracing::info!( - consecutive_unchanged, - "incremental stop threshold reached; halting walk" + key = %manga.source_manga_key, + "stop condition met (Unchanged metadata + 0 new chapters); halting walk" ); break 'outer; } } } - // Drop pass: only when the walk truly covered everything the source - // surfaces. `last_seen_at` on un-visited rows is stale, so running - // the drop on a partial walk would soft-drop the tail of the index. - let full_walk = walked_to_completion && !hit_limit && !hit_incremental_stop; - let backfill_complete = full_walk && matches!(mode, DiscoverMode::Backfill); - if full_walk { - match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await { - Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"), - Err(e) => tracing::warn!(error = ?e, "drop-pass failed"), - } - } else { - tracing::info!( - ?mode, - hit_limit, - hit_incremental_stop, - "partial sync — skipping drop pass" - ); - } - if backfill_complete { - if let Err(e) = repo::crawler::mark_seed_completed(db, source_id, run_started_at).await { - tracing::warn!(error = ?e, "mark_seed_completed failed"); - } else { - tracing::info!(source_id, "seed marked complete"); + // Recovery-flag write. Only on a clean exit (end-of-walk OR the + // intentional stop). `hit_limit` is a caller-imposed early break + // and does NOT count — the catalog tail wasn't reached, so a future + // tick still needs to walk past where we stopped. The truth table is + // pinned by `should_mark_clean_exit` so a future edit that adds + // `hit_limit` back into the disjunction trips its unit test. Flag- + // write errors are warned and swallowed: the run already did its + // work, and a stale `false` flag just buys a recovery sweep on the + // next tick. + let exited_cleanly = should_mark_clean_exit(walked_to_completion, hit_stop_condition); + if exited_cleanly { + if let Err(e) = repo::crawler::mark_run_completed(db, source_id).await { + tracing::warn!(error = ?e, "mark_run_completed failed"); } } tracing::info!( - ?mode, + was_clean, discovered = stats.discovered, upserted = stats.upserted, covers_fetched = stats.covers_fetched, mangas_failed = stats.mangas_failed, walked_to_completion, hit_limit, - hit_incremental_stop, + hit_stop_condition, + exited_cleanly, "metadata pass complete" ); @@ -508,31 +543,79 @@ mod tests { use super::*; #[test] - fn backfill_never_stops_regardless_of_streak() { - assert!(!should_stop(DiscoverMode::Backfill, 0)); - assert!(!should_stop(DiscoverMode::Backfill, 100)); - assert!(!should_stop(DiscoverMode::Backfill, usize::MAX)); + fn stop_condition_fires_on_unchanged_metadata_and_zero_new_chapters() { + // The whole point of the rule: in steady state, a manga whose + // metadata hash matches AND whose chapter list gained no new + // entries proves we've reached the caught-up tail of a + // newest-first index. + assert!(should_stop(true, UpsertStatus::Unchanged, Some(0))); } #[test] - fn incremental_stops_when_streak_meets_threshold() { - let mode = DiscoverMode::Incremental { - stop_after_unchanged: 3, - }; - assert!(!should_stop(mode, 0)); - assert!(!should_stop(mode, 2)); - assert!(should_stop(mode, 3), "stops at exactly the threshold"); - assert!(should_stop(mode, 100), "stops at anything past threshold"); + fn stop_condition_refuses_when_chapters_added() { + // Unchanged metadata + N new chapters means the source bumped + // this manga because of the chapter add; the rest of the index + // is still ahead of us. Don't bail. + assert!(!should_stop(true, UpsertStatus::Unchanged, Some(1))); + assert!(!should_stop(true, UpsertStatus::Unchanged, Some(42))); } #[test] - fn incremental_with_zero_threshold_stops_immediately() { - // A nonsensical config (no Unchanged needed to stop) shouldn't - // panic — it just means the very first ref triggers the bail. - let mode = DiscoverMode::Incremental { - stop_after_unchanged: 0, - }; - assert!(should_stop(mode, 0)); + fn stop_condition_refuses_when_metadata_changed() { + // Updated or New metadata always continues — even with zero new + // chapters — because the change-of-metadata bump itself is what + // the walk is following. + assert!(!should_stop(true, UpsertStatus::Updated, Some(0))); + assert!(!should_stop(true, UpsertStatus::New, Some(0))); + } + + #[test] + fn stop_condition_refuses_when_chapter_count_unknown() { + // skip_chapters mode (CLI metadata-only sweep) or a + // logged-and-swallowed chapter sync error: we can't claim "no + // new chapters" from absence of evidence, so don't stop. The + // operator who runs metadata-only intentionally wants a full + // walk anyway. + assert!(!should_stop(true, UpsertStatus::Unchanged, None)); + } + + #[test] + fn stop_condition_disabled_in_recovery_mode() { + // was_clean = false means the previous run did not exit cleanly; + // the catalog past its crash point is potentially un-synced. Walk + // to end-of-source no matter what individual mangas report. + assert!(!should_stop(false, UpsertStatus::Unchanged, Some(0))); + assert!(!should_stop(false, UpsertStatus::Unchanged, Some(1))); + assert!(!should_stop(false, UpsertStatus::Updated, Some(0))); + assert!(!should_stop(false, UpsertStatus::New, None)); + } + + #[test] + fn clean_exit_when_walked_to_completion() { + // End-of-walk reached the catalog tail — the recovery flag may + // safely flip back to `true`. + assert!(should_mark_clean_exit(true, false)); + } + + #[test] + fn clean_exit_when_stop_condition_fired() { + // First Unchanged + 0-new-chapter manga is a complete steady- + // state exit: every manga newer than this point was synced, and + // by source-side `update_date DESC` ordering everything past + // this point is at least as caught-up. + assert!(should_mark_clean_exit(false, true)); + } + + #[test] + fn dirty_exit_when_neither_completion_nor_stop_fired() { + // The walk ended for some other reason — including the + // caller-imposed `hit_limit` cap, which is the regression case + // this test exists for. `should_mark_clean_exit` does not take + // `hit_limit` as a parameter, so a future edit that adds + // `|| hit_limit` to the inline expression in `run_metadata_pass` + // would need to also touch this helper, and would fail this + // assertion when it did. + assert!(!should_mark_clean_exit(false, false)); } #[test] diff --git a/backend/src/crawler/source.rs b/backend/src/crawler/source.rs index b11c52c..edbbf3a 100644 --- a/backend/src/crawler/source.rs +++ b/backend/src/crawler/source.rs @@ -8,19 +8,6 @@ pub mod target; use async_trait::async_trait; use chromiumoxide::browser::Browser; -use serde::{Deserialize, Serialize}; - -/// How a `discover` job should walk the source's index. -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -pub enum DiscoverMode { - /// Walk every index page from last back to first. Used for the - /// initial seed of a source. - Backfill, - /// Walk index pages from page 1 forward, stopping after - /// `stop_after_unchanged` consecutive mangas whose `metadata_hash` - /// matches storage. Used for the recurring cron tick. - Incremental { stop_after_unchanged: usize }, -} /// Pointer at a manga in the source's index, before we've fetched the /// detail page. The `source_manga_key` is whatever stable id the source @@ -83,14 +70,14 @@ pub struct FetchContext<'a> { } /// Lazy iterator over discovered manga refs. The caller drives the -/// walk one batch at a time, so it can break out as soon as a -/// downstream stop condition is met (e.g. N consecutive Unchanged -/// upserts in Incremental mode) without paying for pages it won't use. +/// walk one batch at a time, so it can break out as soon as the +/// downstream stop condition is met (the first manga where metadata is +/// `Unchanged` and chapter sync reports zero new chapters) without +/// paying for pages it won't use. /// /// Batches are typically one source-index page each. Within a batch -/// refs are already in the right per-page order for the active mode -/// (Backfill reverses each page to oldest-first; Incremental leaves -/// the source's natural newest-first ordering). +/// refs are in the source's natural newest-first ordering — the same +/// `update_date DESC` sort that makes the stop condition meaningful. #[async_trait] pub trait DiscoverWalk: Send { /// Return the next batch of refs, or `Ok(None)` when the source has @@ -107,16 +94,14 @@ pub trait Source: Send + Sync { /// Stable identifier — also the row key in the `sources` table. fn id(&self) -> &'static str; - /// Begin discovery in `mode`. Returns a walker the caller drives - /// page-by-page via `next_batch`. The initial page-1 probe (used - /// to determine `last_page` and warm the cache for sites that - /// can't be paged without knowing the bound) happens inside this - /// call, so a fresh walker is ready to yield its first batch - /// without further setup. + /// Begin discovery. Returns a walker the caller drives page-by-page + /// via `next_batch`. The initial page-1 probe (used to determine + /// `last_page` and warm the cache for sites that can't be paged + /// without knowing the bound) happens inside this call, so a fresh + /// walker is ready to yield its first batch without further setup. async fn discover( &self, ctx: &FetchContext<'_>, - mode: DiscoverMode, ) -> anyhow::Result>; async fn fetch_manga( diff --git a/backend/src/crawler/source/target.rs b/backend/src/crawler/source/target.rs index 9ce6643..a608530 100644 --- a/backend/src/crawler/source/target.rs +++ b/backend/src/crawler/source/target.rs @@ -15,22 +15,23 @@ use async_trait::async_trait; use sha2::{Digest, Sha256}; use super::{ - DiscoverMode, DiscoverWalk, FetchContext, Source, SourceChapter, SourceChapterRef, - SourceManga, SourceMangaRef, + DiscoverWalk, FetchContext, Source, SourceChapter, SourceChapterRef, SourceManga, + SourceMangaRef, }; use crate::crawler::detect::{ has_logo_sentinel, is_broken_page_body, retry_on_transient, PageError, }; /// `sources.id` value for this Source impl. Exposed as a const so the -/// daemon can look up per-source state (e.g. `seed_completed_at`) -/// before constructing the Source itself. +/// daemon can look up per-source state (e.g. the recovery flag) before +/// constructing the Source itself. pub const SOURCE_ID: &str = "target"; /// In-loop retry budget for transient pages encountered during a single -/// `discover` walk. Bounded small because the job system itself retries -/// the whole `Discover` job on failure — these inline retries only need -/// to absorb a brief site hiccup mid-walk. +/// `discover` walk. Bounded small because the next cron tick will pick up +/// where this run left off via the recovery flag — these inline retries +/// only need to absorb a brief site hiccup mid-walk, not a sustained +/// outage. const PAGE_TRANSIENT_RETRY_ATTEMPTS: u32 = 3; const PAGE_TRANSIENT_RETRY_DELAY: Duration = Duration::from_secs(2); @@ -72,7 +73,6 @@ impl Source for TargetSource { async fn discover( &self, ctx: &FetchContext<'_>, - mode: DiscoverMode, ) -> anyhow::Result> { // Always visit page 1 first because that's the only way to // discover `last_page`. Retry it on transient — a broken first @@ -89,10 +89,8 @@ impl Source for TargetSource { parse_last_page(&doc) }; - let backfill = matches!(mode, DiscoverMode::Backfill); - let order = build_page_order(last_page, backfill); + let order = build_page_order(last_page); tracing::info!( - ?mode, last_page = ?last_page, page_count = order.len(), "walking pagination" @@ -100,10 +98,8 @@ impl Source for TargetSource { Ok(Box::new(TargetSourceWalker { base_url: self.base_url.clone(), - backfill, pages_remaining: order, first_page_html: Some(first_html), - prev: None, })) } @@ -139,16 +135,13 @@ impl Source for TargetSource { } /// Build the queue of page numbers `TargetSource::discover` will walk. -/// Backfill is oldest-first: pages `last..=1` (within each page the -/// walker reverses entries, since the source orders by update_date -/// DESC). Incremental is newest-first: pages `1..=last` in natural -/// order. If `last_page` is unknown (source surfaces no pagination) -/// only page 1 is visited. -fn build_page_order(last_page: Option, backfill: bool) -> VecDeque { - match (last_page, backfill) { - (None, _) => VecDeque::from([1]), - (Some(last), true) => (1..=last).rev().collect(), - (Some(last), false) => (1..=last).collect(), +/// The site orders by `update_date DESC`, so newest-first is just the +/// natural page order: `1..=last`. If `last_page` is unknown (source +/// surfaces no pagination) only page 1 is visited. +fn build_page_order(last_page: Option) -> VecDeque { + match last_page { + None => VecDeque::from([1]), + Some(last) => (1..=last).collect(), } } @@ -158,16 +151,8 @@ fn build_page_order(last_page: Option, backfill: bool) -> VecDeque { /// batch covering page 1 doesn't re-fetch. struct TargetSourceWalker { base_url: String, - backfill: bool, pages_remaining: VecDeque, first_page_html: Option, - /// Page number and slot-0 `source_manga_key` of the previously-walked - /// page. Updated after every batch (cheap, unconditional) but only - /// *read* by the boundary re-check, which itself runs only in backfill - /// mode. A single `Option` so the half-set state (`page_num` known but - /// `key` not, or vice versa) is unrepresentable; `None` here suppresses - /// the next iteration's re-check (no anchor to compare against). - prev: Option<(i32, String)>, } #[async_trait] @@ -179,7 +164,7 @@ impl DiscoverWalk for TargetSourceWalker { let Some(page_num) = self.pages_remaining.pop_front() else { return Ok(None); }; - let mut page_refs = if page_num == 1 { + let page_refs = if page_num == 1 { // Reuse the cached page-1 HTML from the initial probe. Take // it (rather than clone) so a malformed page-order queue // that re-visits page 1 still falls back to a real fetch. @@ -214,77 +199,7 @@ impl DiscoverWalk for TargetSourceWalker { ) .await? }; - // Capture slot-0 of the page as the site presents it (newest first) - // *before* the backfill `.reverse()` below — after reversal slot 0 - // is the oldest entry, which would defeat the next iteration's - // boundary re-check. - let current_first = page_refs.first().map(|r| r.source_manga_key.clone()); - - // Boundary re-check (backfill only). The site orders by update_date - // DESC, so shifts during the walk push items from low-numbered pages - // to high-numbered ones — into pages backfill has already finished. - // After fetching this page, re-fetch the *previous* iteration's page - // and look for items that slid past us mid-walk. Must be the last - // navigation of the iteration to close the within-iteration race. - let mut displaced: Vec = Vec::new(); - if self.backfill { - if let Some((prev_page_num, prev_first_key)) = self.prev.clone() { - match recheck_prev_page(ctx, &self.base_url, prev_page_num).await { - Ok(refetched) => { - let (d, outcome) = detect_displaced(&prev_first_key, &refetched); - match outcome { - DisplacementOutcome::NoShift => {} - DisplacementOutcome::Shifted(k) => { - tracing::info!( - page_num, - prev_page_num, - k, - "boundary re-check: shift detected, recovering displaced refs" - ); - } - DisplacementOutcome::NotFoundFallback => { - tracing::warn!( - page_num, - prev_page_num, - prev_first_key = %prev_first_key, - refetched_len = refetched.len(), - "boundary re-check: prev_first not found, falling back to full re-process" - ); - } - } - displaced = d; - } - Err(e) => { - tracing::warn!( - page_num, - prev_page_num, - error = ?e, - "boundary re-check: re-fetch failed, skipping check for this boundary" - ); - } - } - } - } - - // Remember the boundary for the next iteration. An empty `page_refs` - // yields `None`, which intentionally suppresses the next re-check - // (no anchor to compare against). - self.prev = current_first.map(|key| (page_num, key)); - - if self.backfill { - page_refs.reverse(); - } - // Append displaced refs to the end of the batch. Order doesn't - // affect backfill semantics (no `consecutive_unchanged` streak in - // this mode), and the pipeline-level dedup set handles any overlap. - let displaced_count = displaced.len(); - page_refs.extend(displaced); - tracing::info!( - page_num, - count = page_refs.len(), - displaced = displaced_count, - "page walked" - ); + tracing::info!(page_num, count = page_refs.len(), "page walked"); Ok(Some(page_refs)) } } @@ -689,67 +604,6 @@ fn compute_metadata_hash(m: &SourceManga) -> String { format!("{:x}", h.finalize()) } -/// Outcome of a boundary re-check, surfaced for telemetry + tests. The -/// caller's recovery action is determined by the returned `Vec`; this enum -/// only labels which branch fired so logging and assertions can distinguish -/// "site is stable" from "we papered over a shift" from "we fell back". -#[derive(Debug, PartialEq, Eq)] -enum DisplacementOutcome { - /// `prev_first` is still at slot 0 of the re-fetched page — no shift - /// happened between the prior iteration and this one's re-check. - NoShift, - /// `prev_first` slid down to slot `K`; the first `K` entries are items - /// that used to live on the page we just walked. - Shifted(usize), - /// `prev_first` is gone from the re-fetched page — multiple pages-worth - /// of shifts happened, or it was bumped to page 1. Treat all refetched - /// entries as potentially displaced; the pipeline-level dedup absorbs - /// the noise. - NotFoundFallback, -} - -/// Compare a previously-walked page's slot-0 key against a fresh fetch of -/// that page. Returns the entries that appear *ahead* of `prev_first` in -/// the re-fetched page — items that slid in from the page the caller is -/// currently processing. -fn detect_displaced( - prev_first: &str, - refetched: &[SourceMangaRef], -) -> (Vec, DisplacementOutcome) { - let Some(k) = refetched - .iter() - .position(|r| r.source_manga_key == prev_first) - else { - return (refetched.to_vec(), DisplacementOutcome::NotFoundFallback); - }; - if k == 0 { - (Vec::new(), DisplacementOutcome::NoShift) - } else { - (refetched[..k].to_vec(), DisplacementOutcome::Shifted(k)) - } -} - -/// Re-fetch a previously-walked listing page to feed [`detect_displaced`]. -/// Uses the same retry chain as the primary page fetch in `next_batch` so -/// a transient hiccup doesn't tank an entire backfill walk. -async fn recheck_prev_page( - ctx: &FetchContext<'_>, - base_url: &str, - page_num: i32, -) -> Result, PageError> { - retry_on_transient( - || async { - let url = page_url(base_url, page_num); - let html = navigate(ctx, &url).await?; - let doc = scraper::Html::parse_document(&html); - parse_manga_list_from(&doc) - }, - PAGE_TRANSIENT_RETRY_ATTEMPTS, - PAGE_TRANSIENT_RETRY_DELAY, - ) - .await -} - #[cfg(test)] mod tests { use super::*; @@ -1125,132 +979,25 @@ mod tests { } #[test] - fn build_page_order_backfill_is_last_to_one() { - // Backfill walks pages oldest-first: queue is [last, last-1, ..., 1] - // so popping from the front yields the last page first. - let order = build_page_order(Some(3), true); - assert_eq!(Vec::from(order), vec![3, 2, 1]); - } - - #[test] - fn build_page_order_incremental_is_one_to_last() { - // Incremental walks newest-first in natural source order. - let order = build_page_order(Some(3), false); + fn build_page_order_is_natural_one_to_last() { + // Newest-first is just the source's natural pagination order: + // (update_date DESC) lives at page 1, oldest at the last page. + let order = build_page_order(Some(3)); assert_eq!(Vec::from(order), vec![1, 2, 3]); } #[test] fn build_page_order_falls_back_to_page_one_only_without_pagination() { - let backfill = build_page_order(None, true); - assert_eq!(Vec::from(backfill), vec![1]); - let incremental = build_page_order(None, false); - assert_eq!(Vec::from(incremental), vec![1]); + // Source surfaced no pagination control — visit page 1 alone + // and let the walk end after one batch. + let order = build_page_order(None); + assert_eq!(Vec::from(order), vec![1]); } #[test] fn build_page_order_single_page_index_yields_one_entry() { - // Sources with exactly one page should not yield duplicates - // regardless of mode. - let backfill = build_page_order(Some(1), true); - assert_eq!(Vec::from(backfill), vec![1]); - let incremental = build_page_order(Some(1), false); - assert_eq!(Vec::from(incremental), vec![1]); - } - - fn make_ref(key: &str) -> SourceMangaRef { - SourceMangaRef { - source_manga_key: key.to_string(), - title: key.to_string(), - url: format!("https://target.example/manga/{key}"), - } - } - - #[test] - fn detect_displaced_no_shift_when_prev_first_still_at_slot_zero() { - let refetched = vec![make_ref("A"), make_ref("B"), make_ref("C")]; - let (displaced, outcome) = detect_displaced("A", &refetched); - assert!(displaced.is_empty()); - assert_eq!(outcome, DisplacementOutcome::NoShift); - } - - #[test] - fn detect_displaced_one_shift_returns_single_intruder() { - let refetched = vec![make_ref("X"), make_ref("A"), make_ref("B")]; - let (displaced, outcome) = detect_displaced("A", &refetched); - assert_eq!(displaced.len(), 1); - assert_eq!(displaced[0].source_manga_key, "X"); - assert_eq!(outcome, DisplacementOutcome::Shifted(1)); - } - - #[test] - fn detect_displaced_multi_shift_returns_all_intruders() { - let refetched = vec![ - make_ref("X1"), - make_ref("X2"), - make_ref("X3"), - make_ref("A"), - make_ref("B"), - make_ref("C"), - ]; - let (displaced, outcome) = detect_displaced("A", &refetched); - let keys: Vec<&str> = displaced - .iter() - .map(|r| r.source_manga_key.as_str()) - .collect(); - assert_eq!(keys, vec!["X1", "X2", "X3"]); - assert_eq!(outcome, DisplacementOutcome::Shifted(3)); - } - - #[test] - fn detect_displaced_full_page_shift_returns_all_but_last() { - // `prev_first` at the last slot — every preceding entry is an - // intruder shifted in from the page the caller is processing. - let mut refetched: Vec<_> = (0..9).map(|i| make_ref(&format!("X{i}"))).collect(); - refetched.push(make_ref("A")); - let (displaced, outcome) = detect_displaced("A", &refetched); - assert_eq!(displaced.len(), 9); - assert_eq!(outcome, DisplacementOutcome::Shifted(9)); - } - - #[test] - fn detect_displaced_not_found_returns_full_page_for_conservative_recovery() { - // > page-worth of shifts (or `prev_first` itself was bumped to - // page 1): can't pinpoint K, fall back to "process everything"; - // pipeline dedup absorbs the noise. - let refetched = vec![make_ref("Y"), make_ref("Z")]; - let (displaced, outcome) = detect_displaced("A", &refetched); - let keys: Vec<&str> = displaced - .iter() - .map(|r| r.source_manga_key.as_str()) - .collect(); - assert_eq!(keys, vec!["Y", "Z"]); - assert_eq!(outcome, DisplacementOutcome::NotFoundFallback); - } - - #[test] - fn detect_displaced_empty_page_returns_empty_with_fallback_outcome() { - // Re-fetch came back empty (transient mimicry or last-page tail). - // No anchor means we can't classify; fall back is the safe label. - let (displaced, outcome) = detect_displaced("A", &[]); - assert!(displaced.is_empty()); - assert_eq!(outcome, DisplacementOutcome::NotFoundFallback); - } - - #[test] - fn detect_displaced_takes_first_occurrence_when_key_repeats() { - // Defensive: if the source ever returns the same key twice on a - // page, anchoring on the first match keeps the displaced slice - // bounded and deterministic. - let refetched = vec![ - make_ref("X"), - make_ref("A"), - make_ref("Y"), - make_ref("A"), - ]; - let (displaced, outcome) = detect_displaced("A", &refetched); - assert_eq!(displaced.len(), 1); - assert_eq!(displaced[0].source_manga_key, "X"); - assert_eq!(outcome, DisplacementOutcome::Shifted(1)); + let order = build_page_order(Some(1)); + assert_eq!(Vec::from(order), vec![1]); } #[test] diff --git a/backend/src/repo/crawler.rs b/backend/src/repo/crawler.rs index d04def2..0cb7d2d 100644 --- a/backend/src/repo/crawler.rs +++ b/backend/src/repo/crawler.rs @@ -8,14 +8,16 @@ //! updated (metadata_hash changed), or unchanged. //! - [`sync_manga_chapters`]: per-manga chapter reconciliation. Adds //! new ones, refreshes URLs on existing ones, soft-drops vanished. -//! - [`mark_dropped_mangas`]: end-of-run pass. Any manga from this -//! source whose `last_seen_at` is older than the run start is -//! soft-dropped. +//! - [`mark_run_started`] / [`mark_run_completed`] / +//! [`last_run_completed_cleanly`]: per-source recovery flag in +//! `crawler_state`. A `false` flag on tick start means the previous +//! run did not exit cleanly and the next walk should ignore the +//! early-stop condition. //! //! Each public function is a transaction boundary so a partial failure //! mid-call leaves the DB in its pre-call state. -use chrono::{DateTime, Utc}; +use chrono::Utc; use sqlx::{PgPool, Postgres, Transaction}; use uuid::Uuid; @@ -456,19 +458,14 @@ pub async fn sync_manga_chapters( Ok(diff) } -/// Record that a complete Backfill walk has finished for `source_id`. -/// The presence of this row is what the daemon's mode auto-detection -/// uses to flip from Backfill to Incremental on subsequent ticks. -/// -/// Keyed `seed_completed:` in `crawler_state`. JSON payload -/// stores the timestamp so we can surface "last fully reseeded at" in -/// future ops tooling without another migration. -pub async fn mark_seed_completed( - pool: &PgPool, - source_id: &str, - at: DateTime, -) -> sqlx::Result<()> { - let key = format!("seed_completed:{source_id}"); +/// Mark a metadata pass as in-flight for `source_id`. Stamps +/// `last_run_completed:` in `crawler_state` with +/// `{"completed": false, "at": now}`. A crash, panic, or SIGKILL after +/// this point leaves the flag at `false`, which the next tick reads as +/// "previous run did not exit cleanly — walk the full catalog this +/// time" (recovery sweep). +pub async fn mark_run_started(pool: &PgPool, source_id: &str) -> sqlx::Result<()> { + let key = format!("last_run_completed:{source_id}"); sqlx::query( "INSERT INTO crawler_state (key, value, updated_at) \ VALUES ($1, $2, now()) \ @@ -476,50 +473,54 @@ pub async fn mark_seed_completed( SET value = EXCLUDED.value, updated_at = now()", ) .bind(&key) - .bind(serde_json::json!({ "at": at.to_rfc3339() })) + .bind(serde_json::json!({ + "completed": false, + "at": Utc::now().to_rfc3339(), + })) .execute(pool) .await?; Ok(()) } -/// Read the timestamp written by [`mark_seed_completed`], if any. -/// `None` means no complete Backfill has ever finished for this -/// source — the daemon should run Backfill on the next tick. -pub async fn seed_completed_at( +/// Mark a metadata pass as completed cleanly for `source_id`. Called +/// from the same place a run decides it reached end-of-walk or hit the +/// intentional stop. The next tick reads `true` and applies the normal +/// stop condition. +pub async fn mark_run_completed(pool: &PgPool, source_id: &str) -> sqlx::Result<()> { + let key = format!("last_run_completed:{source_id}"); + sqlx::query( + "INSERT INTO crawler_state (key, value, updated_at) \ + VALUES ($1, $2, now()) \ + ON CONFLICT (key) DO UPDATE \ + SET value = EXCLUDED.value, updated_at = now()", + ) + .bind(&key) + .bind(serde_json::json!({ + "completed": true, + "at": Utc::now().to_rfc3339(), + })) + .execute(pool) + .await?; + Ok(()) +} + +/// Read the recovery flag for `source_id`. A missing row OR an +/// unparseable value reads as `true` ("clean") — the former covers the +/// first-ever run on a virgin DB (no recovery needed), the latter +/// covers forward-compat against future schema changes; both fail-safe +/// toward not making an operator pay for an unnecessary full sweep. +pub async fn last_run_completed_cleanly( pool: &PgPool, source_id: &str, -) -> sqlx::Result>> { - let key = format!("seed_completed:{source_id}"); +) -> sqlx::Result { + let key = format!("last_run_completed:{source_id}"); let row: Option = sqlx::query_scalar("SELECT value FROM crawler_state WHERE key = $1") .bind(&key) .fetch_optional(pool) .await?; - Ok(row.and_then(|v| { - v.get("at") - .and_then(|s| s.as_str()) - .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) - .map(|dt| dt.with_timezone(&Utc)) - })) + Ok(row + .and_then(|v| v.get("completed").and_then(|b| b.as_bool())) + .unwrap_or(true)) } -pub async fn mark_dropped_mangas( - pool: &PgPool, - source_id: &str, - run_started_at: DateTime, -) -> sqlx::Result { - let res = sqlx::query( - r#" - UPDATE manga_sources - SET dropped_at = NOW() - WHERE source_id = $1 - AND last_seen_at < $2 - AND dropped_at IS NULL - "#, - ) - .bind(source_id) - .bind(run_started_at) - .execute(pool) - .await?; - Ok(res.rows_affected()) -} diff --git a/backend/tests/crawler_incremental.rs b/backend/tests/crawler_incremental.rs deleted file mode 100644 index 65e0710..0000000 --- a/backend/tests/crawler_incremental.rs +++ /dev/null @@ -1,85 +0,0 @@ -//! Integration tests for the incremental-mode coordination state: -//! `mark_seed_completed` / `seed_completed_at` round-trip via the -//! `crawler_state` table. -//! -//! End-to-end pipeline behavior (walker + stop-on-Unchanged) requires -//! a real `chromiumoxide::Browser` to construct a `FetchContext`, so -//! the live integration of that path is covered by -//! `crawler_browser_smoke.rs` instead. The pure stop logic itself is -//! unit-tested in `crawler::pipeline::tests`. - -use chrono::Utc; -use mangalord::repo::crawler; -use sqlx::PgPool; - -#[sqlx::test(migrations = "./migrations")] -async fn seed_completed_at_none_before_any_run(pool: PgPool) { - crawler::ensure_source(&pool, "target", "T", "https://x.example") - .await - .unwrap(); - let res = crawler::seed_completed_at(&pool, "target").await.unwrap(); - assert!(res.is_none(), "fresh source has no seed marker"); -} - -#[sqlx::test(migrations = "./migrations")] -async fn mark_seed_completed_then_read_round_trips_timestamp(pool: PgPool) { - crawler::ensure_source(&pool, "target", "T", "https://x.example") - .await - .unwrap(); - let at = Utc::now(); - crawler::mark_seed_completed(&pool, "target", at) - .await - .unwrap(); - let read = crawler::seed_completed_at(&pool, "target") - .await - .unwrap() - .expect("marker present after mark"); - // RFC3339 round-trip is millisecond-precise on chrono::Utc; allow a - // 1ms tolerance to absorb postgres jsonb whitespace canonicalization. - let drift = (read - at).num_milliseconds().abs(); - assert!(drift <= 1, "round-trip drift: {drift}ms (at={at}, read={read})"); -} - -#[sqlx::test(migrations = "./migrations")] -async fn mark_seed_completed_overwrites_previous_value(pool: PgPool) { - crawler::ensure_source(&pool, "target", "T", "https://x.example") - .await - .unwrap(); - let first = Utc::now() - chrono::Duration::hours(1); - let second = Utc::now(); - crawler::mark_seed_completed(&pool, "target", first) - .await - .unwrap(); - crawler::mark_seed_completed(&pool, "target", second) - .await - .unwrap(); - let read = crawler::seed_completed_at(&pool, "target") - .await - .unwrap() - .expect("marker present"); - let drift = (read - second).num_milliseconds().abs(); - assert!(drift <= 1, "should reflect the latest mark, not the first"); -} - -#[sqlx::test(migrations = "./migrations")] -async fn seed_completed_is_per_source(pool: PgPool) { - // Two sources, only one is marked complete. The other must still - // report None — the key is namespaced by source_id. - crawler::ensure_source(&pool, "target", "T", "https://x.example") - .await - .unwrap(); - crawler::ensure_source(&pool, "other", "O", "https://y.example") - .await - .unwrap(); - crawler::mark_seed_completed(&pool, "target", Utc::now()) - .await - .unwrap(); - assert!(crawler::seed_completed_at(&pool, "target") - .await - .unwrap() - .is_some()); - assert!(crawler::seed_completed_at(&pool, "other") - .await - .unwrap() - .is_none()); -} diff --git a/backend/tests/crawler_jobs.rs b/backend/tests/crawler_jobs.rs index 02d4c37..5a2e77b 100644 --- a/backend/tests/crawler_jobs.rs +++ b/backend/tests/crawler_jobs.rs @@ -9,7 +9,6 @@ use std::time::Duration; use mangalord::crawler::jobs::{ self, EnqueueResult, JobPayload, KIND_SYNC_CHAPTER_CONTENT, }; -use mangalord::crawler::source::DiscoverMode; use sqlx::PgPool; use uuid::Uuid; @@ -21,10 +20,13 @@ fn chapter_content_payload(chapter_id: Uuid) -> JobPayload { } } -fn discover_payload() -> JobPayload { - JobPayload::Discover { +/// A non-`SyncChapterContent` payload, used to assert that only the +/// chapter-content kind is deduplicated by the partial index and that +/// `lease`'s kind filter correctly excludes other kinds. +fn sync_manga_payload(key: &str) -> JobPayload { + JobPayload::SyncManga { source_id: "target".into(), - mode: DiscoverMode::Backfill, + source_manga_key: key.into(), } } @@ -141,7 +143,7 @@ async fn different_chapter_ids_can_coexist(pool: PgPool) { #[sqlx::test(migrations = "./migrations")] async fn non_chapter_content_payloads_are_never_deduped(pool: PgPool) { - let p = discover_payload(); + let p = sync_manga_payload("foo"); assert!(matches!( jobs::enqueue(&pool, &p).await.unwrap(), EnqueueResult::Inserted(_) @@ -185,7 +187,10 @@ async fn lease_marks_running_and_bumps_attempts_and_sets_leased_until(pool: PgPo #[sqlx::test(migrations = "./migrations")] async fn lease_with_kind_filter_only_matches_that_kind(pool: PgPool) { - let discover_id = match jobs::enqueue(&pool, &discover_payload()).await.unwrap() { + let manga_id = match jobs::enqueue(&pool, &sync_manga_payload("foo")) + .await + .unwrap() + { EnqueueResult::Inserted(id) => id, _ => unreachable!(), }; @@ -207,8 +212,8 @@ async fn lease_with_kind_filter_only_matches_that_kind(pool: PgPool) { .unwrap(); assert_eq!(leases.len(), 1, "only chapter content payload leases"); assert_eq!(leases[0].id, chapter_id); - // discover is still pending - assert_eq!(job_state(&pool, discover_id).await, "pending"); + // sync_manga is still pending + assert_eq!(job_state(&pool, manga_id).await, "pending"); } #[sqlx::test(migrations = "./migrations")] diff --git a/backend/tests/crawler_recovery_flag.rs b/backend/tests/crawler_recovery_flag.rs new file mode 100644 index 0000000..3d19765 --- /dev/null +++ b/backend/tests/crawler_recovery_flag.rs @@ -0,0 +1,82 @@ +//! Integration tests for the per-source recovery flag: +//! `mark_run_started` / `mark_run_completed` / `last_run_completed_cleanly` +//! round-trip via the `crawler_state` table. +//! +//! End-to-end pipeline behavior (a crashed run forcing a recovery sweep +//! on the next tick) requires a real `chromiumoxide::Browser` to drive +//! the walker, so that path is covered by `crawler_browser_smoke.rs`. +//! The pure stop-condition logic itself is unit-tested in +//! `crawler::pipeline::tests`. + +use mangalord::repo::crawler; +use sqlx::PgPool; + +#[sqlx::test(migrations = "./migrations")] +async fn defaults_to_clean_when_no_marker(pool: PgPool) { + // First-ever run semantics: absence of the key must NOT trigger a + // recovery walk on a virgin DB. Treat missing as "previous run + // completed cleanly" so the first tick can take the early-stop path. + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let clean = crawler::last_run_completed_cleanly(&pool, "target") + .await + .unwrap(); + assert!(clean, "absent marker must read as clean"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn mark_run_started_flips_to_false(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + crawler::mark_run_started(&pool, "target").await.unwrap(); + let clean = crawler::last_run_completed_cleanly(&pool, "target") + .await + .unwrap(); + assert!(!clean, "after mark_run_started, flag must read false"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn started_then_completed_round_trips_to_clean(pool: PgPool) { + // Steady-state: a run starts (flag → false) and exits cleanly + // (flag → true). The next tick should see "clean" and apply the + // normal stop condition. + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + crawler::mark_run_started(&pool, "target").await.unwrap(); + crawler::mark_run_completed(&pool, "target").await.unwrap(); + let clean = crawler::last_run_completed_cleanly(&pool, "target") + .await + .unwrap(); + assert!( + clean, + "after start → complete the flag must round-trip to clean" + ); +} + +#[sqlx::test(migrations = "./migrations")] +async fn flag_is_per_source(pool: PgPool) { + // Two sources, only one is mid-run. The other must still report + // clean — the crawler_state key is namespaced by source_id. + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + crawler::ensure_source(&pool, "other", "O", "https://y.example") + .await + .unwrap(); + crawler::mark_run_started(&pool, "target").await.unwrap(); + assert!( + !crawler::last_run_completed_cleanly(&pool, "target") + .await + .unwrap(), + "target is mid-run" + ); + assert!( + crawler::last_run_completed_cleanly(&pool, "other") + .await + .unwrap(), + "other source is untouched and reads clean" + ); +} diff --git a/backend/tests/crawler_sync.rs b/backend/tests/crawler_sync.rs index 0baee4f..2c2be30 100644 --- a/backend/tests/crawler_sync.rs +++ b/backend/tests/crawler_sync.rs @@ -528,62 +528,6 @@ async fn sync_chapters_serializes_concurrent_calls_for_same_manga(pool: PgPool) ); } -#[sqlx::test(migrations = "./migrations")] -async fn mark_dropped_mangas_only_drops_unseen(pool: PgPool) { - crawler::ensure_source(&pool, "target", "T", "https://x.example") - .await - .unwrap(); - // Seed two mangas before "now" so a later run_started_at sees them as stale. - let _ = crawler::upsert_manga_from_source( - &pool, - "target", - "https://x.example/foo", - &sample_manga("foo", "Foo", "hf"), - ) - .await - .unwrap(); - let _ = crawler::upsert_manga_from_source( - &pool, - "target", - "https://x.example/bar", - &sample_manga("bar", "Bar", "hb"), - ) - .await - .unwrap(); - - // Now mark a new "run" beginning. Re-upsert only `foo` — `bar` - // should be the one flagged dropped. - let run_started = chrono::Utc::now(); - // Sleep briefly so the second upsert's NOW() > run_started_at. - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - let _ = crawler::upsert_manga_from_source( - &pool, - "target", - "https://x.example/foo", - &sample_manga("foo", "Foo", "hf"), - ) - .await - .unwrap(); - - let n = crawler::mark_dropped_mangas(&pool, "target", run_started) - .await - .unwrap(); - assert_eq!(n, 1, "only bar should have been dropped"); - - let foo_dropped: (Option>,) = - sqlx::query_as("SELECT dropped_at FROM manga_sources WHERE source_manga_key = 'foo'") - .fetch_one(&pool) - .await - .unwrap(); - assert!(foo_dropped.0.is_none(), "foo seen this run, must not be dropped"); - let bar_dropped: (Option>,) = - sqlx::query_as("SELECT dropped_at FROM manga_sources WHERE source_manga_key = 'bar'") - .fetch_one(&pool) - .await - .unwrap(); - assert!(bar_dropped.0.is_some()); -} - #[sqlx::test(migrations = "./migrations")] async fn upsert_surfaces_cover_image_path_for_backfill_decisions(pool: PgPool) { crawler::ensure_source(&pool, "target", "T", "https://x.example") diff --git a/frontend/package.json b/frontend/package.json index 640b047..6db3d00 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "mangalord-frontend", - "version": "0.35.6", + "version": "0.36.0", "private": true, "type": "module", "scripts": {