//! Crawler pipeline — the reusable metadata pass and the enqueue helpers
//! that fan out chapter-content work. Shared between the daemon (cron tick)
//! and the CLI (`bin/crawler.rs`) so behavior stays in lockstep.

use std::collections::HashSet;

use anyhow::Context;
use sqlx::PgPool;
use uuid::Uuid;

use crate::crawler::browser_manager::BrowserManager;
use crate::crawler::jobs::{self, EnqueueResult, JobPayload};
use crate::crawler::rate_limit::HostRateLimiters;
use crate::crawler::safety::{fetch_bytes_capped, looks_like_image, DownloadAllowlist};
use crate::crawler::source::target::TargetSource;
use crate::crawler::source::{FetchContext, Source};
use crate::repo;
use crate::repo::crawler::UpsertStatus;
use crate::storage::Storage;

/// Coarse counters surfaced for logging at the end of a metadata pass.
#[derive(Debug, Default, Clone, Copy)]
pub struct MetadataStats {
    /// Refs taken off the walker and attempted this pass (run-scoped
    /// duplicates that were skipped are not counted).
    pub discovered: usize,
    /// Mangas whose metadata upsert succeeded.
    pub upserted: usize,
    /// Cover images downloaded and stored successfully.
    pub covers_fetched: usize,
    /// Refs abandoned this pass (fetch failure, partial-render guard,
    /// prior-count lookup failure, or upsert failure).
    pub mangas_failed: usize,
}

/// Decide whether the per-ref loop should stop on the manga just
/// processed. The walk halts only when (a) the previous run exited
/// cleanly — so the index tail is known to be caught up and we're not
/// in a recovery sweep — AND (b) this manga's metadata hash matched
/// storage (`Unchanged`) AND (c) the chapter sync confirmed zero new
/// chapters. A `None` chapter count (skip_chapters, or a chapter-sync
/// error we logged-and-swallowed) refuses the stop because we can't
/// verify the tail is unchanged from a single piece of evidence.
///
/// Pure function so the rule is unit-testable without the walker, DB,
/// or browser.
///
/// NOTE(review): the element type of `chapters_new` is taken to be
/// `usize` to match the other counters in this module — confirm against
/// the `new` field of the chapter-sync diff.
pub(crate) fn should_stop(
    was_clean: bool,
    status: UpsertStatus,
    chapters_new: Option<usize>,
) -> bool {
    was_clean && matches!(status, UpsertStatus::Unchanged) && chapters_new == Some(0)
}

/// Whether the just-finished walk should be recorded as a clean exit.
/// `true` writes the recovery flag back to `completed: true`; `false` /// leaves it `false` so the next tick treats this run as crashed and /// does a recovery sweep. /// /// `hit_limit` (the caller-imposed `CRAWLER_LIMIT` cap) is *not* an /// argument: a limit cap by definition does not reach the catalog tail, /// so it can never count as a clean exit. Encoding that in the type /// (rather than as an `&& !hit_limit` clause inline) prevents a future /// edit from accidentally adding it back to the truth table. pub(crate) fn should_mark_clean_exit( walked_to_completion: bool, hit_stop_condition: bool, ) -> bool { walked_to_completion || hit_stop_condition } /// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline /// for the target source. Pure metadata; chapter content is enqueued as /// separate `SyncChapterContent` jobs by the caller after this returns. /// /// `limit == 0` means no cap (full sweep up to the source's own bound). /// `skip_chapters == true` is the "metadata-only" mode (parser doesn't /// extract chapters, and `sync_manga_chapters` is skipped — otherwise an /// empty chapter list would soft-drop existing rows). In this mode the /// stop condition never fires because chapter freshness can't be /// confirmed, so the walk always runs to end-of-source. /// /// The walk is always newest-first. Steady-state runs stop on the first /// manga where metadata is `Unchanged` AND chapter sync reports zero /// new chapters — the source orders by `update_date DESC`, so anything /// with a fresh chapter or fresh metadata is bumped to the top and will /// be processed before we hit a fully-caught-up manga. /// /// A per-source recovery flag stored in `crawler_state` /// (`last_run_completed:`) gates the early stop: it's set to /// `false` right after `ensure_source` and back to `true` only when the /// run exits via end-of-walk OR the intentional stop. 
A crash, panic, /// or SIGKILL leaves the flag at `false`, so the next tick reads it, /// recognizes the previous run did not exit cleanly, and walks the /// full catalog (ignoring the stop condition) to re-cover anything the /// crashed run missed past its crash point. Once that recovery sweep /// reaches end-of-walk, steady-state resumes. #[allow(clippy::too_many_arguments)] pub async fn run_metadata_pass( browser_manager: &BrowserManager, db: &PgPool, storage: &dyn Storage, http: &reqwest::Client, rate: &HostRateLimiters, start_url: &str, limit: usize, skip_chapters: bool, allowlist: &DownloadAllowlist, max_image_bytes: usize, ) -> anyhow::Result { let lease = browser_manager .acquire() .await .context("acquire browser lease for metadata pass")?; let browser_ref: &chromiumoxide::Browser = &lease; let source = { let s = TargetSource::new(start_url.to_string()); if skip_chapters { s.without_chapter_parsing() } else { s } }; let ctx = FetchContext { browser: browser_ref, rate, }; let source_id = source.id(); repo::crawler::ensure_source( db, source_id, "Target Site", &origin_of(start_url).unwrap_or_else(|| start_url.to_string()), ) .await .context("ensure_source")?; // Read BEFORE flipping to "in-flight" — a `false` here means the // previous run didn't reach a clean exit, and this run must walk // the full catalog (recovery sweep) instead of bailing on the // first caught-up manga. let was_clean = repo::crawler::last_run_completed_cleanly(db, source_id) .await .context("read last_run_completed_cleanly")?; repo::crawler::mark_run_started(db, source_id) .await .context("mark_run_started")?; let max_refs = (limit > 0).then_some(limit); tracing::info!(was_clean, ?max_refs, "starting metadata pass"); let mut walker = source .discover(&ctx) .await .context("discover failed")?; let mut stats = MetadataStats::default(); // Run-scoped dedup of `source_manga_key`s already processed this pass. 
// A shift in the source index causes the slot-last item of the page // we just read to reappear at slot 0 of the next page; skipping it // here prevents redundant fetch_manga + upsert and avoids spuriously // tripping the stop condition with a re-confirm of an entry we // already counted. let mut seen: HashSet = HashSet::new(); let mut walked_to_completion = false; let mut hit_limit = false; let mut hit_stop_condition = false; 'outer: loop { let batch = match walker.next_batch(&ctx).await? { Some(b) => b, None => { walked_to_completion = true; break; } }; for r in batch { if max_refs.map(|m| stats.discovered >= m).unwrap_or(false) { hit_limit = true; tracing::info!(cap = ?max_refs, "max_results reached; halting walk"); break 'outer; } // Skip refs we've already *successfully* processed this pass. // Checking `contains` here (rather than `insert`) keeps the key // out of `seen` on failure paths below, so a transient fetch or // upsert error gets a second chance if the ref reappears in // another batch. Done *before* counting toward // `stats.discovered` (the skipped ref did no work) and *before* // touching the stop check (a `continue` here doesn't let a // re-confirm trip the stop condition). The matching // `seen.insert(...)` lives just after the successful upsert // below. if seen.contains(&r.source_manga_key) { tracing::debug!( key = %r.source_manga_key, "skip already-seen key in this run" ); continue; } stats.discovered += 1; tracing::info!( idx = stats.discovered, key = %r.source_manga_key, "fetching metadata" ); let manga = match source.fetch_manga(&ctx, &r).await { Ok(m) => m, Err(e) => { tracing::warn!( key = %r.source_manga_key, url = %r.url, error = ?e, "fetch_manga failed" ); stats.mangas_failed += 1; continue; } }; // Partial-render guard: an empty chapter list paired with a // prior count > 0 is overwhelmingly a chromium snapshot // taken between the #chapter_table wrapper render and its // rows render. 
The wait_for_selector wait in `navigate` // narrows this window but cannot close it for slow renders // beyond the selector budget. Treat as a transient failure // here — skip upsert, skip seen.insert — so the next batch // (or the next tick) retries. Skipped in `skip_chapters` // mode because the parser is configured to return an empty // Vec by design there. if !skip_chapters && manga.chapters.is_empty() { match repo::crawler::live_chapter_count_for_source_manga( db, source_id, &r.source_manga_key, ) .await { Ok(prior) if prior > 0 => { tracing::warn!( key = %r.source_manga_key, url = %r.url, prior_chapter_count = prior, "fetch_manga returned empty chapters but prior count > 0; treating as partial-render transient and skipping" ); stats.mangas_failed += 1; continue; } Ok(_) => {} Err(e) => { // DB lookup failed — fail safe: skip rather // than risk a soft-drop on a manga whose prior // count we couldn't confirm. tracing::warn!( key = %r.source_manga_key, error = ?e, "live_chapter_count_for_source_manga failed; skipping cautiously" ); stats.mangas_failed += 1; continue; } } } let upsert = match repo::crawler::upsert_manga_from_source( db, source_id, &r.url, &manga, ) .await { Ok(u) => u, Err(e) => { tracing::error!( key = %r.source_manga_key, error = ?e, "upsert_manga_from_source failed" ); stats.mangas_failed += 1; continue; } }; stats.upserted += 1; // Record success in the dedup set. Cover and chapter-sync // failures below are non-fatal and don't roll this back — // metadata is the durable source of truth for the dedup. seen.insert(r.source_manga_key.clone()); tracing::info!( key = %manga.source_manga_key, manga_id = %upsert.manga_id, status = ?upsert.status, title = %manga.title, "manga upserted" ); // Cover image: download when missing in storage or when metadata // signaled an update (cover URL is part of metadata_hash, so // Updated implies the URL may have moved). Failures are non-fatal. 
let needs_cover = upsert.cover_image_path.is_none() || matches!(upsert.status, repo::crawler::UpsertStatus::Updated); if needs_cover { if let Some(cover_url) = manga.cover_url.as_deref() { match download_and_store_cover( db, storage, http, rate, &r.url, upsert.manga_id, cover_url, allowlist, max_image_bytes, ) .await { Ok(()) => stats.covers_fetched += 1, Err(e) => tracing::warn!( manga_id = %upsert.manga_id, error = ?e, "cover download failed" ), } } } // Chapter sync. `chapters_new` feeds the stop check below: // `None` (skip_chapters mode, or a logged-and-swallowed sync // error) refuses to stop on this manga because we can't // confirm "no new chapters." let chapters_new: Option = if skip_chapters { None } else { match repo::crawler::sync_manga_chapters( db, source_id, upsert.manga_id, &manga.chapters, ) .await { Ok(diff) => { tracing::info!( manga_id = %upsert.manga_id, new = diff.new, refreshed = diff.refreshed, dropped = diff.dropped, "chapters synced" ); Some(diff.new) } Err(e) => { tracing::warn!( manga_id = %upsert.manga_id, error = ?e, "chapter sync failed" ); None } } }; if should_stop(was_clean, upsert.status, chapters_new) { hit_stop_condition = true; tracing::info!( key = %manga.source_manga_key, "stop condition met (Unchanged metadata + 0 new chapters); halting walk" ); break 'outer; } } } // Recovery-flag write. Only on a clean exit (end-of-walk OR the // intentional stop). `hit_limit` is a caller-imposed early break // and does NOT count — the catalog tail wasn't reached, so a future // tick still needs to walk past where we stopped. The truth table is // pinned by `should_mark_clean_exit` so a future edit that adds // `hit_limit` back into the disjunction trips its unit test. Flag- // write errors are warned and swallowed: the run already did its // work, and a stale `false` flag just buys a recovery sweep on the // next tick. 
let exited_cleanly = should_mark_clean_exit(walked_to_completion, hit_stop_condition); if exited_cleanly { if let Err(e) = repo::crawler::mark_run_completed(db, source_id).await { tracing::warn!(error = ?e, "mark_run_completed failed"); } } tracing::info!( was_clean, discovered = stats.discovered, upserted = stats.upserted, covers_fetched = stats.covers_fetched, mangas_failed = stats.mangas_failed, walked_to_completion, hit_limit, hit_stop_condition, exited_cleanly, "metadata pass complete" ); drop(lease); Ok(stats) } /// Quarantine window for chapters whose latest `SyncChapterContent` job is /// `dead`. The partial dedup index `crawler_jobs_chapter_content_dedup_idx` /// only blocks `(pending|running)` duplicates, so without this gate a /// permanently-failing chapter is re-enqueued every cron tick, burns /// `max_attempts` retries, dies again, and spins forever. With the gate, /// dead chapters get a week of silence before the next attempt — long /// enough for a transient site issue to resolve, short enough that /// permanent failures don't stay permanent if conditions change. const CHAPTER_DEAD_QUARANTINE_DAYS: i64 = 7; /// Enqueue a `SyncChapterContent` job for every chapter of *any* bookmarked /// manga that still has `page_count = 0` and a non-dropped source row. /// Chapters whose latest job is `dead` within `CHAPTER_DEAD_QUARANTINE_DAYS` /// are excluded to break the dead-letter spin. /// Returns `(inserted, skipped)` counts. Dedup index handles repeats. 
///
/// # Errors
///
/// Fails only when the selection query fails. Individual enqueue errors
/// are logged and tallied in the summary's `failed` counter instead of
/// aborting the loop.
pub async fn enqueue_bookmarked_pending(pool: &PgPool) -> anyhow::Result<EnqueueSummary> {
    // The `bookmarks` join can repeat a chapter once per bookmark row; the
    // GROUP BY collapses those repeats while still allowing the ORDER BY
    // on `c.manga_id` / `c.created_at`.
    let rows: Vec<(String, Uuid, String)> = sqlx::query_as(
        r#"
        SELECT cs.source_id, c.id AS chapter_id, cs.source_chapter_key
        FROM chapters c
        JOIN bookmarks b ON b.manga_id = c.manga_id
        JOIN chapter_sources cs ON cs.chapter_id = c.id
        WHERE c.page_count = 0
          AND cs.dropped_at IS NULL
          AND NOT EXISTS (
            SELECT 1 FROM crawler_jobs cj
            WHERE cj.payload->>'kind' = 'sync_chapter_content'
              AND cj.payload->>'chapter_id' = c.id::text
              AND cj.state = 'dead'
              AND cj.updated_at > now() - ($1::bigint || ' days')::interval
          )
        GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.manga_id, c.created_at
        ORDER BY c.manga_id, c.created_at ASC
        "#,
    )
    .bind(CHAPTER_DEAD_QUARANTINE_DAYS)
    .fetch_all(pool)
    .await
    .context("query bookmarked-pending chapters")?;

    let mut summary = EnqueueSummary::default();
    for (source_id, chapter_id, source_chapter_key) in rows {
        let payload = JobPayload::SyncChapterContent {
            source_id,
            chapter_id,
            source_chapter_key,
        };
        match jobs::enqueue(pool, &payload).await {
            Ok(EnqueueResult::Inserted(_)) => summary.inserted += 1,
            Ok(EnqueueResult::Skipped) => summary.skipped += 1,
            Err(e) => {
                // Best-effort fan-out: one bad chapter must not block the rest.
                tracing::warn!(
                    %chapter_id,
                    error = ?e,
                    "enqueue chapter content failed"
                );
                summary.failed += 1;
            }
        }
    }
    Ok(summary)
}

/// Enqueue chapter-content jobs for a *single* manga (the bookmark-create
/// hook). Same dedup semantics as [`enqueue_bookmarked_pending`], including
/// the dead-letter quarantine — a freshly bookmarked manga should not
/// burn retries on chapters that just died on the cron tick.
pub async fn enqueue_pending_for_manga( pool: &PgPool, manga_id: Uuid, ) -> anyhow::Result { let rows: Vec<(String, Uuid, String)> = sqlx::query_as( r#" SELECT DISTINCT cs.source_id, c.id AS chapter_id, cs.source_chapter_key FROM chapters c JOIN chapter_sources cs ON cs.chapter_id = c.id WHERE c.manga_id = $1 AND c.page_count = 0 AND cs.dropped_at IS NULL AND NOT EXISTS ( SELECT 1 FROM crawler_jobs cj WHERE cj.payload->>'kind' = 'sync_chapter_content' AND cj.payload->>'chapter_id' = c.id::text AND cj.state = 'dead' AND cj.updated_at > now() - ($2::bigint || ' days')::interval ) ORDER BY cs.source_id, c.id "#, ) .bind(manga_id) .bind(CHAPTER_DEAD_QUARANTINE_DAYS) .fetch_all(pool) .await .context("query pending chapters for manga")?; let mut summary = EnqueueSummary::default(); for (source_id, chapter_id, source_chapter_key) in rows { let payload = JobPayload::SyncChapterContent { source_id, chapter_id, source_chapter_key, }; match jobs::enqueue(pool, &payload).await { Ok(EnqueueResult::Inserted(_)) => summary.inserted += 1, Ok(EnqueueResult::Skipped) => summary.skipped += 1, Err(e) => { tracing::warn!( %chapter_id, error = ?e, "enqueue chapter content failed" ); summary.failed += 1; } } } Ok(summary) } #[derive(Debug, Default, Clone, Copy)] pub struct EnqueueSummary { pub inserted: usize, pub skipped: usize, pub failed: usize, } /// Download a cover image and persist its storage path. Local to the /// pipeline because the CLI still calls it from its inline chapter-content /// loop; once the worker pool fully replaces that path we can fold this /// into `pipeline` proper. #[allow(clippy::too_many_arguments)] async fn download_and_store_cover( db: &PgPool, storage: &dyn Storage, http: &reqwest::Client, rate: &HostRateLimiters, manga_url: &str, manga_id: Uuid, cover_url: &str, allowlist: &DownloadAllowlist, max_image_bytes: usize, ) -> anyhow::Result<()> { let absolute = reqwest::Url::parse(manga_url) .context("parse manga URL")? 
.join(cover_url) .context("join cover URL onto manga URL")?; rate.wait_for(absolute.as_str()).await?; let bytes = fetch_bytes_capped( http, absolute.as_str(), Some(manga_url), allowlist, max_image_bytes, ) .await?; if !looks_like_image(&bytes) { anyhow::bail!( "cover URL {absolute} returned non-image bytes; refusing to store as binary blob" ); } let ext = infer::get(&bytes) .map(|k| k.extension()) .expect("looks_like_image asserted infer succeeded"); let key = format!("mangas/{manga_id}/cover.{ext}"); storage .put(&key, &bytes) .await .with_context(|| format!("store cover at {key}"))?; repo::manga::set_cover_image_path(db, manga_id, &key) .await .with_context(|| format!("update cover_image_path for {manga_id}"))?; tracing::info!( manga_id = %manga_id, key = %key, bytes = bytes.len(), %absolute, "cover stored" ); Ok(()) } use crate::crawler::url_utils::origin_of; #[cfg(test)] mod tests { use super::*; #[test] fn stop_condition_fires_on_unchanged_metadata_and_zero_new_chapters() { // The whole point of the rule: in steady state, a manga whose // metadata hash matches AND whose chapter list gained no new // entries proves we've reached the caught-up tail of a // newest-first index. assert!(should_stop(true, UpsertStatus::Unchanged, Some(0))); } #[test] fn stop_condition_refuses_when_chapters_added() { // Unchanged metadata + N new chapters means the source bumped // this manga because of the chapter add; the rest of the index // is still ahead of us. Don't bail. assert!(!should_stop(true, UpsertStatus::Unchanged, Some(1))); assert!(!should_stop(true, UpsertStatus::Unchanged, Some(42))); } #[test] fn stop_condition_refuses_when_metadata_changed() { // Updated or New metadata always continues — even with zero new // chapters — because the change-of-metadata bump itself is what // the walk is following. 
assert!(!should_stop(true, UpsertStatus::Updated, Some(0))); assert!(!should_stop(true, UpsertStatus::New, Some(0))); } #[test] fn stop_condition_refuses_when_chapter_count_unknown() { // skip_chapters mode (CLI metadata-only sweep) or a // logged-and-swallowed chapter sync error: we can't claim "no // new chapters" from absence of evidence, so don't stop. The // operator who runs metadata-only intentionally wants a full // walk anyway. assert!(!should_stop(true, UpsertStatus::Unchanged, None)); } #[test] fn stop_condition_disabled_in_recovery_mode() { // was_clean = false means the previous run did not exit cleanly; // the catalog past its crash point is potentially un-synced. Walk // to end-of-source no matter what individual mangas report. assert!(!should_stop(false, UpsertStatus::Unchanged, Some(0))); assert!(!should_stop(false, UpsertStatus::Unchanged, Some(1))); assert!(!should_stop(false, UpsertStatus::Updated, Some(0))); assert!(!should_stop(false, UpsertStatus::New, None)); } #[test] fn clean_exit_when_walked_to_completion() { // End-of-walk reached the catalog tail — the recovery flag may // safely flip back to `true`. assert!(should_mark_clean_exit(true, false)); } #[test] fn clean_exit_when_stop_condition_fired() { // First Unchanged + 0-new-chapter manga is a complete steady- // state exit: every manga newer than this point was synced, and // by source-side `update_date DESC` ordering everything past // this point is at least as caught-up. assert!(should_mark_clean_exit(false, true)); } #[test] fn dirty_exit_when_neither_completion_nor_stop_fired() { // The walk ended for some other reason — including the // caller-imposed `hit_limit` cap, which is the regression case // this test exists for. `should_mark_clean_exit` does not take // `hit_limit` as a parameter, so a future edit that adds // `|| hit_limit` to the inline expression in `run_metadata_pass` // would need to also touch this helper, and would fail this // assertion when it did. 
assert!(!should_mark_clean_exit(false, false)); } #[test] fn run_scoped_seen_set_skips_duplicate_source_manga_keys() { // Pins the per-ref loop contract: `contains` gates whether work // runs, and `insert` only fires on the success path (after upsert). // A failed ref that reappears later in the same pass must get a // second chance — that's why the loop uses contains-then-insert // instead of insert-and-skip-on-collision. let mut seen: HashSet = HashSet::new(); // First sighting of a key: not yet seen → loop proceeds. assert!(!seen.contains("manga-a"), "first sighting is unseen"); // Simulate a failed fetch_manga: do NOT insert. Next sighting must // still be considered unseen so the loop retries it. assert!(!seen.contains("manga-a"), "failed key is still retryable"); // Now simulate a successful upsert — insert is called. seen.insert("manga-a".to_string()); // Subsequent sightings of the same key are skipped. assert!(seen.contains("manga-a"), "successful key is now seen"); // Distinct keys never collide. assert!(!seen.contains("manga-b"), "different key independent"); seen.insert("manga-b".to_string()); assert!(seen.contains("manga-b")); assert!(seen.contains("manga-a"), "first key still recorded"); } }