//! Crawler pipeline — the reusable metadata pass and the enqueue helpers
//! that fan out chapter-content work. Shared between the daemon (cron tick)
//! and the CLI (`bin/crawler.rs`) so behavior stays in lockstep.

use anyhow::Context;
use sqlx::PgPool;
use uuid::Uuid;

use crate::crawler::browser_manager::BrowserManager;
use crate::crawler::jobs::{self, EnqueueResult, JobPayload};
use crate::crawler::rate_limit::HostRateLimiters;
use crate::crawler::source::target::TargetSource;
use crate::crawler::source::{DiscoverMode, FetchContext, Source};
use crate::repo;
use crate::storage::Storage;

/// Coarse counters surfaced for logging at the end of a metadata pass.
#[derive(Debug, Default, Clone, Copy)]
pub struct MetadataStats {
    /// Refs taken from the walker and attempted (incremented before the fetch).
    pub discovered: usize,
    /// Refs whose fetch and upsert both succeeded.
    pub upserted: usize,
    /// Cover downloads that completed successfully.
    pub covers_fetched: usize,
    /// Refs skipped because either the fetch or the upsert failed.
    pub mangas_failed: usize,
}

/// Decide whether the per-ref loop should stop based on the Incremental
/// streak counter. Pulled out as a pure function so the rule is unit-
/// testable without standing up the walker or DB.
///
/// `Backfill` never stops early; `Incremental` stops once
/// `consecutive_unchanged` reaches `stop_after_unchanged` (inclusive).
pub(crate) fn should_stop(mode: DiscoverMode, consecutive_unchanged: usize) -> bool {
    match mode {
        DiscoverMode::Backfill => false,
        DiscoverMode::Incremental {
            stop_after_unchanged,
        } => consecutive_unchanged >= stop_after_unchanged,
    }
}

/// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline
/// for the target source. Pure metadata; chapter content is enqueued as
/// separate `SyncChapterContent` jobs by the caller after this returns.
///
/// `limit == 0` means no cap (full sweep up to the source's own bound).
/// `skip_chapters == true` is the "metadata-only" mode (parser doesn't
/// extract chapters, and `sync_manga_chapters` is skipped — otherwise an
/// empty chapter list would soft-drop existing rows).
///
/// `mode` controls the walk:
/// - `Backfill` — oldest-first, no early exit. The only mode that writes
///   `seed_completed_at`, and only when the walk ran to completion.
/// - `Incremental { stop_after_unchanged }` — newest-first, breaks out
///   after N consecutive Unchanged upserts.
///
/// The end-of-walk drop pass runs only when the walker was exhausted
/// without hitting `limit` or the incremental stop. On a partial walk the
/// tail of the index is never visited, so its `last_seen_at` is stale and
/// using it to soft-drop would be unsafe.
///
/// # Errors
///
/// Fails if the browser lease cannot be acquired, if `ensure_source` or
/// `discover` fails, or if the walker errors while producing a batch.
/// Per-manga fetch/upsert failures, cover failures, chapter-sync failures
/// and drop-pass/seed-marking failures are logged and do not abort the pass.
// The argument list mirrors the shared wiring the module docs describe for the
// daemon and the CLI.
#[allow(clippy::too_many_arguments)]
pub async fn run_metadata_pass(
    browser_manager: &BrowserManager,
    db: &PgPool,
    storage: &dyn Storage,
    http: &reqwest::Client,
    rate: &HostRateLimiters,
    start_url: &str,
    limit: usize,
    skip_chapters: bool,
    mode: DiscoverMode,
) -> anyhow::Result<MetadataStats> {
    let lease = browser_manager
        .acquire()
        .await
        .context("acquire browser lease for metadata pass")?;
    let browser_ref: &chromiumoxide::Browser = &lease;

    let source = {
        let s = TargetSource::new(start_url.to_string());
        if skip_chapters {
            s.without_chapter_parsing()
        } else {
            s
        }
    };
    let ctx = FetchContext {
        browser: browser_ref,
        rate,
    };
    let source_id = source.id();
    repo::crawler::ensure_source(
        db,
        source_id,
        "Target Site",
        // Fall back to the raw start URL when no origin can be derived.
        &origin_of(start_url).unwrap_or_else(|| start_url.to_string()),
    )
    .await
    .context("ensure_source")?;

    // Captured before the walk and handed to the drop pass / seed marker
    // as the reference timestamp for this run.
    let run_started_at = chrono::Utc::now();
    let max_refs = (limit > 0).then_some(limit);

    tracing::info!(?mode, ?max_refs, "starting metadata pass");

    let mut walker = source
        .discover(&ctx, mode)
        .await
        .context("discover failed")?;

    let mut stats = MetadataStats::default();
    let mut consecutive_unchanged: usize = 0;
    let mut walked_to_completion = false;
    let mut hit_limit = false;
    let mut hit_incremental_stop = false;

    'outer: loop {
        let batch = match walker.next_batch(&ctx).await? {
            Some(b) => b,
            None => {
                walked_to_completion = true;
                break;
            }
        };

        for r in batch {
            // Checked before counting the ref so exactly `limit` refs are attempted.
            if matches!(max_refs, Some(cap) if stats.discovered >= cap) {
                hit_limit = true;
                tracing::info!(cap = ?max_refs, "max_results reached; halting walk");
                break 'outer;
            }
            stats.discovered += 1;

            tracing::info!(
                idx = stats.discovered,
                key = %r.source_manga_key,
                "fetching metadata"
            );
            let manga = match source.fetch_manga(&ctx, &r).await {
                Ok(m) => m,
                Err(e) => {
                    tracing::warn!(
                        key = %r.source_manga_key,
                        url = %r.url,
                        error = ?e,
                        "fetch_manga failed"
                    );
                    stats.mangas_failed += 1;
                    continue;
                }
            };

            let upsert = match repo::crawler::upsert_manga_from_source(
                db, source_id, &r.url, &manga,
            )
            .await
            {
                Ok(u) => u,
                Err(e) => {
                    tracing::error!(
                        key = %r.source_manga_key,
                        error = ?e,
                        "upsert_manga_from_source failed"
                    );
                    stats.mangas_failed += 1;
                    continue;
                }
            };
            stats.upserted += 1;

            tracing::info!(
                key = %manga.source_manga_key,
                manga_id = %upsert.manga_id,
                status = ?upsert.status,
                title = %manga.title,
                "manga upserted"
            );

            // Cover image: download when missing in storage or when metadata
            // signaled an update (cover URL is part of metadata_hash, so
            // Updated implies the URL may have moved). Failures are non-fatal.
            let needs_cover = upsert.cover_image_path.is_none()
                || matches!(upsert.status, repo::crawler::UpsertStatus::Updated);
            if needs_cover {
                if let Some(cover_url) = manga.cover_url.as_deref() {
                    match download_and_store_cover(
                        db,
                        storage,
                        http,
                        rate,
                        &r.url,
                        upsert.manga_id,
                        cover_url,
                    )
                    .await
                    {
                        Ok(()) => stats.covers_fetched += 1,
                        Err(e) => tracing::warn!(
                            manga_id = %upsert.manga_id,
                            error = ?e,
                            "cover download failed"
                        ),
                    }
                }
            }

            if !skip_chapters {
                match repo::crawler::sync_manga_chapters(
                    db,
                    source_id,
                    upsert.manga_id,
                    &manga.chapters,
                )
                .await
                {
                    Ok(diff) => tracing::info!(
                        manga_id = %upsert.manga_id,
                        new = diff.new,
                        refreshed = diff.refreshed,
                        dropped = diff.dropped,
                        "chapters synced"
                    ),
                    Err(e) => tracing::warn!(
                        manga_id = %upsert.manga_id,
                        error = ?e,
                        "chapter sync failed"
                    ),
                }
            }

            // Incremental stop: count consecutive Unchanged upserts and
            // bail once the threshold is reached. New/Updated resets the
            // streak so a fresh entry mid-batch doesn't accidentally trip
            // the cutoff. Failed refs `continue` above and therefore
            // neither extend nor reset the streak.
            match upsert.status {
                repo::crawler::UpsertStatus::Unchanged => {
                    consecutive_unchanged += 1;
                }
                repo::crawler::UpsertStatus::New | repo::crawler::UpsertStatus::Updated => {
                    consecutive_unchanged = 0;
                }
            }
            if should_stop(mode, consecutive_unchanged) {
                hit_incremental_stop = true;
                tracing::info!(
                    consecutive_unchanged,
                    "incremental stop threshold reached; halting walk"
                );
                break 'outer;
            }
        }
    }

    // Drop pass: only when the walk truly covered everything the source
    // surfaces. `last_seen_at` on un-visited rows is stale, so running
    // the drop on a partial walk would soft-drop the tail of the index.
    //
    // NOTE(review): refs whose fetch or upsert failed in this run were
    // never upserted. If `mark_dropped_mangas` keys off `last_seen_at`, a
    // transient failure could soft-drop those rows — confirm against the
    // repo implementation.
    let full_walk = walked_to_completion && !hit_limit && !hit_incremental_stop;
    let backfill_complete = full_walk && matches!(mode, DiscoverMode::Backfill);

    if full_walk {
        match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await {
            Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"),
            Err(e) => tracing::warn!(error = ?e, "drop-pass failed"),
        }
    } else {
        tracing::info!(
            ?mode,
            hit_limit,
            hit_incremental_stop,
            "partial sync — skipping drop pass"
        );
    }

    if backfill_complete {
        if let Err(e) = repo::crawler::mark_seed_completed(db, source_id, run_started_at).await {
            tracing::warn!(error = ?e, "mark_seed_completed failed");
        } else {
            tracing::info!(source_id, "seed marked complete");
        }
    }

    tracing::info!(
        ?mode,
        discovered = stats.discovered,
        upserted = stats.upserted,
        covers_fetched = stats.covers_fetched,
        mangas_failed = stats.mangas_failed,
        walked_to_completion,
        hit_limit,
        hit_incremental_stop,
        "metadata pass complete"
    );

    // Release the browser lease explicitly now that the walk is finished.
    drop(lease);
    Ok(stats)
}

/// Enqueue a `SyncChapterContent` job for every chapter of *any* bookmarked
/// manga that still has `page_count = 0` and a non-dropped source row.
/// Returns an [`EnqueueSummary`] with the inserted / skipped / failed
/// counts. Dedup index handles repeats.
pub async fn enqueue_bookmarked_pending(pool: &PgPool) -> anyhow::Result<EnqueueSummary> {
    // The GROUP BY collapses the duplicate rows the `bookmarks` join yields
    // when several bookmarks point at the same manga.
    let rows: Vec<(String, Uuid, String)> = sqlx::query_as(
        r#" SELECT cs.source_id, c.id AS chapter_id, cs.source_chapter_key FROM chapters c JOIN bookmarks b ON b.manga_id = c.manga_id JOIN chapter_sources cs ON cs.chapter_id = c.id WHERE c.page_count = 0 AND cs.dropped_at IS NULL GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.manga_id, c.created_at ORDER BY c.manga_id, c.created_at ASC "#,
    )
    .fetch_all(pool)
    .await
    .context("query bookmarked-pending chapters")?;

    let mut summary = EnqueueSummary::default();
    for (source_id, chapter_id, source_chapter_key) in rows {
        let payload = JobPayload::SyncChapterContent {
            source_id,
            chapter_id,
            source_chapter_key,
        };
        // A failed enqueue is counted and logged, never propagated, so one
        // bad row cannot block the remaining chapters.
        match jobs::enqueue(pool, &payload).await {
            Ok(EnqueueResult::Inserted(_)) => summary.inserted += 1,
            Ok(EnqueueResult::Skipped) => summary.skipped += 1,
            Err(e) => {
                tracing::warn!(
                    %chapter_id,
                    error = ?e,
                    "enqueue chapter content failed"
                );
                summary.failed += 1;
            }
        }
    }
    Ok(summary)
}

/// Enqueue chapter-content jobs for a *single* manga (the bookmark-create
/// hook). Same dedup semantics as [`enqueue_bookmarked_pending`].
pub async fn enqueue_pending_for_manga( pool: &PgPool, manga_id: Uuid, ) -> anyhow::Result { let rows: Vec<(String, Uuid, String)> = sqlx::query_as( r#" SELECT DISTINCT cs.source_id, c.id AS chapter_id, cs.source_chapter_key FROM chapters c JOIN chapter_sources cs ON cs.chapter_id = c.id WHERE c.manga_id = $1 AND c.page_count = 0 AND cs.dropped_at IS NULL ORDER BY cs.source_id, c.id "#, ) .bind(manga_id) .fetch_all(pool) .await .context("query pending chapters for manga")?; let mut summary = EnqueueSummary::default(); for (source_id, chapter_id, source_chapter_key) in rows { let payload = JobPayload::SyncChapterContent { source_id, chapter_id, source_chapter_key, }; match jobs::enqueue(pool, &payload).await { Ok(EnqueueResult::Inserted(_)) => summary.inserted += 1, Ok(EnqueueResult::Skipped) => summary.skipped += 1, Err(e) => { tracing::warn!( %chapter_id, error = ?e, "enqueue chapter content failed" ); summary.failed += 1; } } } Ok(summary) } #[derive(Debug, Default, Clone, Copy)] pub struct EnqueueSummary { pub inserted: usize, pub skipped: usize, pub failed: usize, } /// Download a cover image and persist its storage path. Local to the /// pipeline because the CLI still calls it from its inline chapter-content /// loop; once the worker pool fully replaces that path we can fold this /// into `pipeline` proper. async fn download_and_store_cover( db: &PgPool, storage: &dyn Storage, http: &reqwest::Client, rate: &HostRateLimiters, manga_url: &str, manga_id: Uuid, cover_url: &str, ) -> anyhow::Result<()> { let absolute = reqwest::Url::parse(manga_url) .context("parse manga URL")? .join(cover_url) .context("join cover URL onto manga URL")?; rate.wait_for(absolute.as_str()).await?; let resp = http .get(absolute.clone()) .header(reqwest::header::REFERER, manga_url) .send() .await .with_context(|| format!("GET {absolute}"))? 
.error_for_status() .with_context(|| format!("non-2xx for {absolute}"))?; let bytes = resp.bytes().await.context("read cover body")?; let kind = infer::get(&bytes); let ext = kind.map(|k| k.extension()).unwrap_or("bin"); let key = format!("mangas/{manga_id}/cover.{ext}"); storage .put(&key, &bytes) .await .with_context(|| format!("store cover at {key}"))?; repo::manga::set_cover_image_path(db, manga_id, &key) .await .with_context(|| format!("update cover_image_path for {manga_id}"))?; tracing::info!( manga_id = %manga_id, key = %key, bytes = bytes.len(), %absolute, "cover stored" ); Ok(()) } fn origin_of(url: &str) -> Option { let (scheme, rest) = url.split_once("://")?; let host = rest.split('/').next()?; Some(format!("{scheme}://{host}")) } #[cfg(test)] mod tests { use super::*; #[test] fn backfill_never_stops_regardless_of_streak() { assert!(!should_stop(DiscoverMode::Backfill, 0)); assert!(!should_stop(DiscoverMode::Backfill, 100)); assert!(!should_stop(DiscoverMode::Backfill, usize::MAX)); } #[test] fn incremental_stops_when_streak_meets_threshold() { let mode = DiscoverMode::Incremental { stop_after_unchanged: 3, }; assert!(!should_stop(mode, 0)); assert!(!should_stop(mode, 2)); assert!(should_stop(mode, 3), "stops at exactly the threshold"); assert!(should_stop(mode, 100), "stops at anything past threshold"); } #[test] fn incremental_with_zero_threshold_stops_immediately() { // A nonsensical config (no Unchanged needed to stop) shouldn't // panic — it just means the very first ref triggers the bail. let mode = DiscoverMode::Incremental { stop_after_unchanged: 0, }; assert!(should_stop(mode, 0)); } }