From 9fe0f26d75900c61b1493b965b07a78c515a4e07 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Mon, 25 May 2026 20:32:02 +0200 Subject: [PATCH] feat: in-process crawler daemon with cron and worker pool (0.28.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The backend now boots an internal crawler daemon that runs a daily metadata pass (CRAWLER_DAILY_AT in CRAWLER_TZ, advisory-lock guarded for multi-replica safety) and drains SyncChapterContent jobs from crawler_jobs through a worker pool. Chromium launches lazily on first job and is torn down after CRAWLER_IDLE_TIMEOUT_S seconds of inactivity. Modules: - crawler::browser_manager — lazy-launch / idle-teardown wrapper around browser::Handle, with an on_launch hook that re-injects PHPSESSID on every fresh Chromium spawn. - crawler::pipeline — run_metadata_pass (the shared discover/upsert /cover/sync-chapters loop) and the enqueue_bookmarked_pending helper used by the cron tick. - crawler::daemon — cron task + worker pool, behind two trait seams (MetadataPass, ChapterDispatcher) so tests can inject stubs without standing up Chromium or a live source. Behavior: - CRAWLER_DAEMON=false skips daemon spawn entirely (default for tests). - Catch-up tick fires on startup if the last persisted slot was missed. - A SyncOutcome::SessionExpired sets a sticky AtomicBool; workers idle until operator restart with a refreshed PHPSESSID. - Worker dispatch wrapped in catch_unwind so a panicking handler marks the job failed instead of taking down the worker. - Migration 0015 adds a small crawler_state k-v table for the last_metadata_tick_at watermark. Dep additions: chrono-tz (IANA TZ parsing). CLI (bin/crawler) reuses pipeline::run_metadata_pass and now holds the browser via BrowserManager so the on_launch session injection flow stays in one place. Inline chapter-content sync semantics are unchanged — the queue is for the daemon, force-refetches and manual backfills still bypass it. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/Cargo.lock | 34 +- backend/Cargo.toml | 3 +- backend/migrations/0015_crawler_state.sql | 12 + backend/src/app.rs | 255 ++++++++- backend/src/bin/crawler.rs | 378 +++---------- backend/src/config.rs | 109 ++++ backend/src/crawler/browser.rs | 46 +- backend/src/crawler/browser_manager.rs | 262 +++++++++ backend/src/crawler/daemon.rs | 633 ++++++++++++++++++++++ backend/src/crawler/mod.rs | 3 + backend/src/crawler/pipeline.rs | 347 ++++++++++++ backend/src/main.rs | 15 +- backend/tests/crawler_daemon.rs | 372 +++++++++++++ frontend/package.json | 2 +- 14 files changed, 2162 insertions(+), 309 deletions(-) create mode 100644 backend/migrations/0015_crawler_state.sql create mode 100644 backend/src/crawler/browser_manager.rs create mode 100644 backend/src/crawler/daemon.rs create mode 100644 backend/src/crawler/pipeline.rs create mode 100644 backend/tests/crawler_daemon.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 60c050a..234a9f6 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -397,6 +397,28 @@ dependencies = [ "windows-link", ] +[[package]] +name = "chrono-tz" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93698b29de5e97ad0ae26447b344c482a7284c737d9ddc5f9e52b74a336671bb" +dependencies = [ + "chrono", + "chrono-tz-build", + "phf 0.11.3", +] + +[[package]] +name = "chrono-tz-build" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c088aee841df9c3041febbb73934cfc39708749bf96dc827e3359cd39ef11b1" +dependencies = [ + "parse-zoneinfo", + "phf 0.11.3", + "phf_codegen 0.11.3", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1448,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "mangalord" -version = "0.27.0" +version = "0.28.0" dependencies = [ "anyhow", "argon2", @@ -1459,6 +1481,7 @@ dependencies = [ "bytes", 
"chromiumoxide", "chrono", + "chrono-tz", "dotenvy", "futures-core", "futures-util", @@ -1868,6 +1891,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "parse-zoneinfo" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f2a05b18d44e2957b88f96ba460715e295bc1d7510468a2f3d3b44535d26c24" +dependencies = [ + "regex", +] + [[package]] name = "password-hash" version = "0.5.0" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 0b7a503..d11e749 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mangalord" -version = "0.27.0" +version = "0.28.0" edition = "2021" default-run = "mangalord" @@ -23,6 +23,7 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" uuid = { version = "1", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } +chrono-tz = "0.9" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tower = { version = "0.5", features = ["util"] } diff --git a/backend/migrations/0015_crawler_state.sql b/backend/migrations/0015_crawler_state.sql new file mode 100644 index 0000000..ea07e7f --- /dev/null +++ b/backend/migrations/0015_crawler_state.sql @@ -0,0 +1,12 @@ +-- Small key-value table for daemon state that needs to survive restarts. +-- +-- Used so far only by the cron scheduler (`last_metadata_tick_at`) so it can +-- detect that the most recent slot was missed (e.g. the backend was down at +-- midnight) and fire immediately on startup before resuming the regular +-- schedule. JSONB on the value column lets future keys carry richer payloads +-- without another migration. 
+CREATE TABLE crawler_state ( + key text PRIMARY KEY, + value jsonb NOT NULL, + updated_at timestamptz NOT NULL DEFAULT now() +); diff --git a/backend/src/app.rs b/backend/src/app.rs index 58d8aa6..d8118ba 100644 --- a/backend/src/app.rs +++ b/backend/src/app.rs @@ -1,14 +1,25 @@ use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use anyhow::Context; +use async_trait::async_trait; use axum::extract::DefaultBodyLimit; use axum::http::{HeaderName, HeaderValue, Method}; use axum::Router; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; +use tokio_util::sync::CancellationToken; use tower_http::cors::{AllowOrigin, CorsLayer}; use tower_http::trace::TraceLayer; -use crate::config::{AuthConfig, Config, UploadConfig}; +use crate::config::{AuthConfig, Config, CrawlerConfig, UploadConfig}; +use crate::crawler::browser_manager::{self, BrowserManager}; +use crate::crawler::content::{self, SyncOutcome}; +use crate::crawler::daemon::{self, ChapterDispatcher, DaemonConfig, MetadataPass}; +use crate::crawler::jobs::JobPayload; +use crate::crawler::pipeline::{self, MetadataStats}; +use crate::crawler::rate_limit::HostRateLimiters; +use crate::crawler::session; use crate::storage::{LocalStorage, Storage}; #[derive(Clone)] @@ -19,7 +30,23 @@ pub struct AppState { pub upload: UploadConfig, } -pub async fn build(config: Config) -> anyhow::Result { +/// Bundle returned by [`build`]. The router is what `axum::serve` consumes; +/// the daemon (when enabled) outlives the HTTP server and is awaited via +/// [`AppHandle::shutdown`] after the listener has finished gracefully. 
+pub struct AppHandle { + pub router: Router, + pub daemon: Option, +} + +impl AppHandle { + pub async fn shutdown(self) { + if let Some(d) = self.daemon { + d.shutdown().await; + } + } +} + +pub async fn build(config: Config) -> anyhow::Result { let db = PgPoolOptions::new() .max_connections(10) .connect(&config.database_url) @@ -28,13 +55,235 @@ pub async fn build(config: Config) -> anyhow::Result { let storage: Arc = Arc::new(LocalStorage::new(config.storage_dir.clone())); + let daemon = if config.crawler.daemon_enabled { + Some(spawn_crawler_daemon(db.clone(), Arc::clone(&storage), &config.crawler).await?) + } else { + tracing::info!("crawler daemon disabled (CRAWLER_DAEMON=false)"); + None + }; + let state = AppState { db, storage, auth: config.auth.clone(), upload: config.upload.clone(), }; - Ok(router(state).layer(cors_layer(&config.cors_allowed_origins))) + let router = router(state).layer(cors_layer(&config.cors_allowed_origins)); + Ok(AppHandle { router, daemon }) +} + +async fn spawn_crawler_daemon( + db: PgPool, + storage: Arc, + cfg: &CrawlerConfig, +) -> anyhow::Result { + // Reqwest client with cookie jar pre-seeded so CDN image fetches + // include PHPSESSID. Same shape as bin/crawler.rs main(). 
+ let cookie_jar = Arc::new(reqwest::cookie::Jar::default()); + if let (Some(sid), Some(domain), Some(start_url)) = + (&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url) + { + let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/"); + let seed_url = reqwest::Url::parse(start_url) + .context("parse CRAWLER_START_URL for cookie seed")?; + cookie_jar.add_cookie_str(&cookie_str, &seed_url); + } + let mut http_builder = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(30)) + .no_proxy() + .cookie_provider(cookie_jar); + if let Some(ua) = &cfg.user_agent { + http_builder = http_builder.user_agent(ua); + } + if let Some(proxy) = &cfg.proxy { + http_builder = http_builder + .proxy(reqwest::Proxy::all(proxy).with_context(|| format!("parse proxy: {proxy}"))?); + } + let http = http_builder.build().context("build crawler reqwest")?; + + let mut rate = HostRateLimiters::new(std::time::Duration::from_millis(cfg.rate_ms)); + if let Some(host) = &cfg.cdn_host { + rate = rate.with_override(host, std::time::Duration::from_millis(cfg.cdn_rate_ms)); + } + let rate = Arc::new(rate); + + // Browser manager. on_launch re-injects PHPSESSID on every fresh + // chromium spawn so an idle teardown followed by re-launch stays + // authenticated without operator action. 
+ let mut launch_opts = cfg.browser.clone(); + if let Some(proxy) = &cfg.proxy { + launch_opts.extra_args.push(format!("--proxy-server={proxy}")); + } + let on_launch = match (&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url) { + (Some(sid), Some(domain), Some(start_url)) => { + let sid = sid.clone(); + let domain = domain.clone(); + let start_url = start_url.clone(); + let on_launch: browser_manager::OnLaunch = Arc::new(move |browser| { + let sid = sid.clone(); + let domain = domain.clone(); + let start_url = start_url.clone(); + Box::pin(async move { + session::inject_phpsessid(&browser, &sid, &domain) + .await + .context("on_launch: inject_phpsessid")?; + session::verify_session(&browser, &start_url) + .await + .context("on_launch: verify_session")?; + Ok(()) + }) + }); + on_launch + } + _ => browser_manager::noop_on_launch(), + }; + let browser_manager = BrowserManager::new(launch_opts, cfg.idle_timeout, on_launch); + + let session_expired = Arc::new(AtomicBool::new(false)); + + let metadata_pass: Option> = cfg.start_url.as_ref().map(|url| { + let m: Arc = Arc::new(RealMetadataPass { + browser_manager: Arc::clone(&browser_manager), + db: db.clone(), + storage: Arc::clone(&storage), + http: http.clone(), + rate: Arc::clone(&rate), + start_url: url.clone(), + }); + m + }); + + let dispatcher: Arc = Arc::new(RealChapterDispatcher { + browser_manager: Arc::clone(&browser_manager), + db: db.clone(), + storage: Arc::clone(&storage), + http, + rate: Arc::clone(&rate), + }); + + // Shared cancellation: daemon shutdown cancels the BrowserManager's + // idle reaper too. Reaper itself is added to the daemon's extra_tasks + // so DaemonHandle::shutdown awaits its completion. + let cancel = CancellationToken::new(); + let reaper_task = browser_manager::spawn_idle_reaper( + Arc::clone(&browser_manager), + cancel.clone(), + ); + // Also close the browser explicitly on shutdown so we don't rely on + // kill-on-drop when other Arc holders may still exist. 
+ let shutdown_task = { + let cancel = cancel.clone(); + let mgr = Arc::clone(&browser_manager); + tokio::spawn(async move { + cancel.cancelled().await; + mgr.shutdown().await; + }) + }; + + let daemon_handle = daemon::spawn( + db, + cancel, + DaemonConfig { + metadata_pass, + dispatcher, + chapter_workers: cfg.chapter_workers, + daily_at: cfg.daily_at, + tz: cfg.tz, + retention_days: cfg.retention_days, + session_expired, + extra_tasks: vec![reaper_task, shutdown_task], + }, + ); + + Ok(daemon_handle) +} + +// Real impls of the daemon traits, owning the browser manager + I/O. Kept +// in app.rs because they need the same builder-side env wiring that +// AppState gets — the daemon module itself stays free of reqwest / storage +// details so its tests don't pull them in. + +struct RealMetadataPass { + browser_manager: Arc, + db: PgPool, + storage: Arc, + http: reqwest::Client, + rate: Arc, + start_url: String, +} + +#[async_trait] +impl MetadataPass for RealMetadataPass { + async fn run(&self) -> anyhow::Result { + pipeline::run_metadata_pass( + &self.browser_manager, + &self.db, + self.storage.as_ref(), + &self.http, + &self.rate, + &self.start_url, + 0, + false, + ) + .await + } +} + +struct RealChapterDispatcher { + browser_manager: Arc, + db: PgPool, + storage: Arc, + http: reqwest::Client, + rate: Arc, +} + +#[async_trait] +impl ChapterDispatcher for RealChapterDispatcher { + async fn dispatch(&self, payload: JobPayload) -> anyhow::Result { + match payload { + JobPayload::SyncChapterContent { + source_id: _, + chapter_id, + source_chapter_key: _, + } => { + // Look up manga_id + source_url for this chapter. 
+ let row: Option<(uuid::Uuid, String)> = sqlx::query_as( + "SELECT c.manga_id, cs.source_url \ + FROM chapters c \ + JOIN chapter_sources cs ON cs.chapter_id = c.id \ + WHERE c.id = $1 \ + LIMIT 1", + ) + .bind(chapter_id) + .fetch_optional(&self.db) + .await + .context("look up chapter for dispatch")?; + let Some((manga_id, source_url)) = row else { + // Chapter (or its source row) is gone — ack done. + return Ok(SyncOutcome::Skipped); + }; + let lease = self.browser_manager.acquire().await?; + let outcome = content::sync_chapter_content( + &lease, + &self.db, + self.storage.as_ref(), + &self.http, + &self.rate, + chapter_id, + manga_id, + &source_url, + false, + ) + .await?; + drop(lease); + Ok(outcome) + } + // Other payload kinds aren't dispatched by this daemon yet — + // metadata-driven jobs (Discover/SyncManga/SyncChapterList) + // are handled inline by the cron's metadata pass. + _ => Ok(SyncOutcome::Skipped), + } + } } /// Build a router from a pre-assembled state. Used by integration tests diff --git a/backend/src/bin/crawler.rs b/backend/src/bin/crawler.rs index 9021a4a..d767598 100644 --- a/backend/src/bin/crawler.rs +++ b/backend/src/bin/crawler.rs @@ -1,58 +1,23 @@ //! Crawler binary. //! -//! Walks the source's manga listing (all pages), fetches each manga's -//! metadata + chapter list, downloads the cover into `Storage`, and -//! reconciles everything into the DB. Then, for any chapter belonging -//! to a bookmarked manga whose `page_count` is still 0, fetches the -//! chapter page (logged in), pulls every image from the CDN, and writes -//! the `pages` rows atomically per chapter. +//! Now an ops escape hatch sitting alongside the in-process daemon: walks +//! the source's manga listing (all pages), fetches each manga's metadata + +//! chapter list, downloads covers, reconciles chapters — and then, for any +//! chapter belonging to a bookmarked manga whose `page_count` is still 0, +//! fetches the chapter pages inline. 
The daemon does the same work through +//! `crawler_jobs`; the CLI is kept around for force-refetches and manual +//! backfills. //! -//! Configuration: -//! - **Start URL** (required): first CLI positional arg, else -//! `$CRAWLER_START_URL`. This is the manga *list* page (page 1). -//! - **Database** (required): `$DATABASE_URL`. -//! - **Storage dir**: `$STORAGE_DIR`, default `./data/storage` — -//! matches the API binary so both write to the same local tree. -//! - **Browser**: see `LaunchOptions::from_env` — -//! `CRAWLER_BROWSER_MODE` (`headed`|`headless`) and -//! `CRAWLER_BROWSER_ARGS`. -//! - **Rate limit**: `CRAWLER_RATE_MS` (ms between requests per host, -//! default `1000`). Per-host: catalog and each CDN have their own -//! bucket and don't share a budget. -//! - **CDN rate override** (optional): `CRAWLER_CDN_HOST` plus -//! `CRAWLER_CDN_RATE_MS` to give a specific host a different -//! interval. Useful when the image CDN tolerates higher RPS than -//! the catalog host. -//! - **Cap**: `CRAWLER_LIMIT` (max manga detail fetches per run, -//! default `0` = no cap). -//! - **Skip chapters**: `CRAWLER_SKIP_CHAPTERS=1` — turn off the -//! chapter selector in the parser AND skip the per-manga -//! `sync_manga_chapters` write. Use this for "metadata only" runs. -//! - **Skip chapter content**: `CRAWLER_SKIP_CHAPTER_CONTENT=1` — -//! skip the page-image phase even if chapters need syncing. -//! - **Chapter content workers**: `CRAWLER_CHAPTER_WORKERS` (default -//! `1`). Multiple workers process distinct chapters concurrently; -//! the per-host rate limiter still gates total RPS to each origin. -//! - **Force re-fetch**: `CRAWLER_FORCE_REFETCH_CHAPTERS=1` — re-fetch -//! chapter images even when `page_count > 0`. Rare; use after the -//! source replaces a chapter's images. -//! - **PHPSESSID**: `CRAWLER_PHPSESSID` — paste your browser's -//! session cookie. Required for chapter content (logged-out reader -//! is paginated per-image and not viable at scale). -//! 
- **Cookie domain** (optional): `CRAWLER_COOKIE_DOMAIN` overrides -//! the auto-derived `..`. Only needed for -//! multi-part TLDs (`.co.uk`, etc.). -//! - **User agent** (optional): `CRAWLER_USER_AGENT` — applies to -//! reqwest image fetches. Default uses reqwest's built-in UA. -//! - **Proxy**: `$CRAWLER_PROXY` — single URL applied to both -//! Chromium (`--proxy-server`) and `reqwest::Proxy::all`. Supports -//! `http://`, `https://`, and `socks5://` (with optional user:pass). -//! Example: `socks5://user:pass@host:1080`. Unset → direct. -//! - **Keep browser open**: `CRAWLER_KEEP_BROWSER_OPEN=1` — when -//! running headed, block on Ctrl+C at every shutdown point so the -//! operator can inspect DOM state, cookies, or network calls in the -//! visible Chromium window before exit. Ignored in headless mode -//! (no window to inspect). +//! Configuration mirrors the daemon's `CRAWLER_*` env vars (see +//! `crate::config::CrawlerConfig`) plus the CLI-only: +//! - **Start URL**: first CLI positional arg, else `$CRAWLER_START_URL`. +//! - **Skip chapters / chapter content / force re-fetch / keep browser**: +//! `CRAWLER_SKIP_CHAPTERS`, `CRAWLER_SKIP_CHAPTER_CONTENT`, +//! `CRAWLER_FORCE_REFETCH_CHAPTERS`, `CRAWLER_KEEP_BROWSER_OPEN`. +//! - **Limit**: `CRAWLER_LIMIT` (max manga detail fetches per run). +//! +//! See `crawler::pipeline::run_metadata_pass` for the shared metadata +//! flow. 
use std::path::PathBuf; use std::sync::Arc; @@ -60,14 +25,12 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use futures_util::stream::{self, StreamExt}; -use mangalord::crawler::{ - browser::{self, LaunchOptions}, - content::{self, SyncOutcome}, - rate_limit::HostRateLimiters, - session, - source::{target::TargetSource, DiscoverMode, FetchContext, Source}, -}; -use mangalord::repo; +use mangalord::crawler::browser::{BrowserMode, LaunchOptions}; +use mangalord::crawler::browser_manager::{self, BrowserManager}; +use mangalord::crawler::content::{self, SyncOutcome}; +use mangalord::crawler::pipeline; +use mangalord::crawler::rate_limit::HostRateLimiters; +use mangalord::crawler::session; use mangalord::storage::{LocalStorage, Storage}; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; @@ -126,10 +89,6 @@ async fn main() -> anyhow::Result<()> { let storage: Arc = Arc::new(LocalStorage::new(&storage_dir)); - // Build reqwest with: own cookie jar (seeded with PHPSESSID for - // the catalog domain only), optional UA override, optional single - // proxy. `no_proxy()` disables env-based detection so the - // CRAWLER_PROXY knob is the only routing input. let cookie_jar = Arc::new(reqwest::cookie::Jar::default()); if let (Some(sid), Some(domain)) = (&phpsessid, &cookie_domain) { let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/"); @@ -155,12 +114,9 @@ async fn main() -> anyhow::Result<()> { if let Some(proxy) = &proxy_url { options.extra_args.push(format!("--proxy-server={proxy}")); } - // Keep-open is a debug aid; only meaningful when there's a window - // to inspect. Warn loudly if the operator set it under headless so - // they don't sit waiting for a Ctrl+C that won't show anything. 
let keep_open = match (keep_browser_open, options.mode) { - (true, browser::BrowserMode::Headed) => true, - (true, browser::BrowserMode::Headless) => { + (true, BrowserMode::Headed) => true, + (true, BrowserMode::Headless) => { tracing::warn!( "CRAWLER_KEEP_BROWSER_OPEN ignored in headless mode (no window to inspect)" ); @@ -188,32 +144,37 @@ async fn main() -> anyhow::Result<()> { "starting crawler" ); - let handle = browser::launch(options).await.context("launch browser")?; - - // Cookie + session probe must happen *before* any browser - // navigation that depends on auth (i.e. chapter content). The - // discover/metadata phase doesn't strictly need auth, but - // probing now lets us fail fast: a bad cookie costs ~2s here - // instead of 30 min into a backfill. - let session_ready = if let (Some(sid), Some(domain)) = (&phpsessid, &cookie_domain) { - if let Err(e) = session::inject_phpsessid(handle.browser(), sid, domain).await { - close_or_wait(handle, keep_open).await; - return Err(e); + // BrowserManager with idle_timeout = ZERO so the CLI keeps Chromium + // alive for the entire run — same lifecycle as the old direct + // `browser::launch()` flow. on_launch re-injects PHPSESSID + runs the + // session probe; bad cookies fail fast before any real work happens. 
+ let on_launch: browser_manager::OnLaunch = match (&phpsessid, &cookie_domain) { + (Some(sid), Some(domain)) => { + let sid = sid.clone(); + let domain = domain.clone(); + let start_url_clone = start_url.clone(); + Arc::new(move |browser| { + let sid = sid.clone(); + let domain = domain.clone(); + let start_url = start_url_clone.clone(); + Box::pin(async move { + session::inject_phpsessid(&browser, &sid, &domain) + .await + .context("inject_phpsessid")?; + session::verify_session(&browser, &start_url) + .await + .context("verify_session")?; + Ok(()) + }) + }) } - match session::verify_session(handle.browser(), &start_url).await { - Ok(()) => true, - Err(e) => { - close_or_wait(handle, keep_open).await; - return Err(e); - } - } - } else { - tracing::info!("no PHPSESSID supplied — chapter content phase will be skipped"); - false + _ => browser_manager::noop_on_launch(), }; + let session_ready = phpsessid.is_some() && cookie_domain.is_some(); + let manager = BrowserManager::new(options, Duration::ZERO, on_launch); let result = run( - handle.browser(), + Arc::clone(&manager), &db, Arc::clone(&storage), &http, @@ -228,17 +189,7 @@ async fn main() -> anyhow::Result<()> { force_refetch_chapters, ) .await; - close_or_wait(handle, keep_open).await; - result -} -/// Either close the browser immediately or wait for Ctrl+C first. -/// `keep_open=true` is only ever passed when the browser is headed, so -/// the operator has a real window to poke at. Browser is dropped at -/// the end of this fn in both cases — chromiumoxide's `Browser` is -/// `kill_on_drop`, so we must wait for the Ctrl+C *before* the drop -/// or the Chromium child gets killed out from under the operator. -async fn close_or_wait(handle: browser::Handle, keep_open: bool) { if keep_open { tracing::info!( "crawler finished; browser kept open. Press Ctrl+C to close and exit." 
@@ -246,12 +197,13 @@ async fn close_or_wait(handle: browser::Handle, keep_open: bool) { let _ = tokio::signal::ctrl_c().await; tracing::info!("Ctrl+C received; closing browser"); } - let _ = handle.close().await; + manager.shutdown().await; + result } #[allow(clippy::too_many_arguments)] async fn run( - browser: &chromiumoxide::Browser, + manager: Arc, db: &PgPool, storage: Arc, http: &reqwest::Client, @@ -270,132 +222,28 @@ async fn run( rate = rate.with_override(host, Duration::from_millis(cdn_rate_ms)); } let rate = Arc::new(rate); - let source = { - let s = TargetSource::new(start_url.to_string()); - if skip_chapters { - s.without_chapter_parsing() - } else { - s - } - }; - let ctx = FetchContext { - browser, - rate: rate.as_ref(), - }; - let source_id = source.id(); - repo::crawler::ensure_source( + let stats = pipeline::run_metadata_pass( + manager.as_ref(), db, - source_id, - "Target Site", - &origin_of(start_url).unwrap_or_else(|| start_url.to_string()), + storage.as_ref(), + http, + rate.as_ref(), + start_url, + limit, + skip_chapters, ) - .await - .context("ensure_source")?; - - let run_started_at = chrono::Utc::now(); - - let max_refs = (limit > 0).then_some(limit); - tracing::info!(?max_refs, "discovering manga list"); - let refs = source - .discover(&ctx, DiscoverMode::Backfill, max_refs) - .await - .context("discover failed")?; - tracing::info!(count = refs.len(), "discovered manga list"); - - let to_fetch = refs; - let total = to_fetch.len(); - - for (i, r) in to_fetch.iter().enumerate() { - tracing::info!(idx = i + 1, total, key = %r.source_manga_key, "fetching metadata"); - let manga = match source.fetch_manga(&ctx, r).await { - Ok(m) => m, - Err(e) => { - tracing::warn!(key = %r.source_manga_key, url = %r.url, error = ?e, "fetch_manga failed"); - continue; - } - }; - - let upsert = match repo::crawler::upsert_manga_from_source(db, source_id, &r.url, &manga) - .await - { - Ok(u) => u, - Err(e) => { - tracing::error!(key = %r.source_manga_key, 
error = ?e, "upsert_manga_from_source failed"); - continue; - } - }; - tracing::info!( - key = %manga.source_manga_key, - manga_id = %upsert.manga_id, - status = ?upsert.status, - title = %manga.title, - "manga upserted" - ); - - // Cover image: download when missing in storage (backfill for - // mangas synced before cover-download support, plus the New - // path) or when metadata changed (cover URL is part of - // metadata_hash, so an Updated status implies the URL may - // have moved). Failures are non-fatal. - let needs_cover = upsert.cover_image_path.is_none() - || matches!(upsert.status, repo::crawler::UpsertStatus::Updated); - if needs_cover { - if let Some(cover_url) = manga.cover_url.as_deref() { - if let Err(e) = download_and_store_cover( - db, - storage.as_ref(), - http, - rate.as_ref(), - &r.url, - upsert.manga_id, - cover_url, - ) - .await - { - tracing::warn!(manga_id = %upsert.manga_id, error = ?e, "cover download failed"); - } - } - } - - if !skip_chapters { - match repo::crawler::sync_manga_chapters( - db, - source_id, - upsert.manga_id, - &manga.chapters, - ) - .await - { - Ok(diff) => tracing::info!( - manga_id = %upsert.manga_id, - new = diff.new, - refreshed = diff.refreshed, - dropped = diff.dropped, - "chapters synced" - ), - Err(e) => tracing::warn!(manga_id = %upsert.manga_id, error = ?e, "chapter sync failed"), - } - } - } - - if limit == 0 { - match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await { - Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"), - Err(e) => tracing::warn!(error = ?e, "drop-pass failed"), - } - } else { - tracing::info!(limit, "partial sync — skipping drop pass"); - } + .await?; + tracing::info!(?stats, "metadata pass complete"); if !skip_chapter_content { sync_bookmarked_chapter_content( - browser, + Arc::clone(&manager), db, Arc::clone(&storage), http, Arc::clone(&rate), - source_id, + "target", chapter_workers, force_refetch_chapters, ) @@ -405,17 +253,15 @@ async fn run( 
Ok(()) } -/// Find every chapter whose manga is bookmarked by at least one user -/// and that hasn't been content-synced yet, then fan them out across -/// `workers` concurrent tasks. Each task is one full chapter sync; the -/// per-host rate limiter caps total RPS to the source/CDN regardless -/// of worker count. +/// Find every chapter whose manga is bookmarked by at least one user and +/// that hasn't been content-synced yet, then fan them out across `workers` +/// concurrent tasks. Same as before except the browser comes from a +/// BrowserManager lease so it interleaves cleanly with the metadata pass. /// -/// A session-expired result from any task aborts the whole phase — -/// continuing wastes time and risks the source flagging the pattern. +/// A `SessionExpired` result aborts the phase. #[allow(clippy::too_many_arguments)] async fn sync_bookmarked_chapter_content( - browser: &chromiumoxide::Browser, + manager: Arc, db: &PgPool, storage: Arc, http: &reqwest::Client, @@ -424,13 +270,6 @@ async fn sync_bookmarked_chapter_content( workers: usize, force_refetch: bool, ) -> anyhow::Result<()> { - // Subquery first so DISTINCT collapses multi-user bookmark rows - // without forcing every ORDER BY column into the SELECT list (PG - // rejects `ORDER BY c.created_at` against `SELECT DISTINCT c.id, - // c.manga_id, cs.source_url` with "ORDER BY expressions must - // appear in select list"). Outer ORDER BY then groups chapters by - // their manga, oldest first, so backfills proceed in reading - // order per manga. 
let pending: Vec<(Uuid, Uuid, String)> = sqlx::query_as( r#" SELECT id, manga_id, source_url FROM ( @@ -457,12 +296,6 @@ async fn sync_bookmarked_chapter_content( } tracing::info!(count = pending.len(), workers, "chapter content phase starting"); - // `for_each_concurrent` polls up to `workers` futures at once on - // the *current* task, so each future borrows the browser, db, and - // http client from the outer scope rather than requiring 'static - // captures via spawn. chromiumoxide's `Browser::new_page(&self)` - // is safe for concurrent calls; the per-host rate limiter - // serializes the actual on-wire requests against each origin. let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false)); let stats = std::sync::Mutex::new(WorkerStats::default()); @@ -471,13 +304,23 @@ async fn sync_bookmarked_chapter_content( let session_expired = Arc::clone(&session_expired); let storage = Arc::clone(&storage); let rate = Arc::clone(&rate); + let manager = Arc::clone(&manager); let stats = &stats; async move { if session_expired.load(std::sync::atomic::Ordering::Relaxed) { return; } + let lease = match manager.acquire().await { + Ok(l) => l, + Err(e) => { + tracing::error!(%chapter_id, error = ?e, "browser acquire failed"); + let mut s = stats.lock().unwrap(); + s.failed += 1; + return; + } + }; let outcome = content::sync_chapter_content( - browser, + &lease, db, storage.as_ref(), http, @@ -488,6 +331,7 @@ async fn sync_bookmarked_chapter_content( force_refetch, ) .await; + drop(lease); let mut s = stats.lock().unwrap(); match outcome { Ok(SyncOutcome::Fetched { pages }) => { @@ -535,51 +379,6 @@ struct WorkerStats { failed: usize, } -async fn download_and_store_cover( - db: &PgPool, - storage: &dyn Storage, - http: &reqwest::Client, - rate: &HostRateLimiters, - manga_url: &str, - manga_id: Uuid, - cover_url: &str, -) -> anyhow::Result<()> { - let absolute = reqwest::Url::parse(manga_url) - .context("parse manga URL")? 
- .join(cover_url) - .context("join cover URL onto manga URL")?; - - rate.wait_for(absolute.as_str()).await?; - let resp = http - .get(absolute.clone()) - // Source CDNs commonly check Referer. Set it to the manga - // detail page that linked the cover — same UX as a real - // browser fetching the image. - .header(reqwest::header::REFERER, manga_url) - .send() - .await - .with_context(|| format!("GET {absolute}"))? - .error_for_status() - .with_context(|| format!("non-2xx for {absolute}"))?; - let bytes = resp.bytes().await.context("read cover body")?; - - // `infer` sniffs the magic bytes — same crate the upload handler - // uses, so we don't trust the URL's extension. - let kind = infer::get(&bytes); - let ext = kind.map(|k| k.extension()).unwrap_or("bin"); - let key = format!("mangas/{manga_id}/cover.{ext}"); - - storage - .put(&key, &bytes) - .await - .with_context(|| format!("store cover at {key}"))?; - repo::manga::set_cover_image_path(db, manga_id, &key) - .await - .with_context(|| format!("update cover_image_path for {manga_id}"))?; - tracing::info!(manga_id = %manga_id, key = %key, bytes = bytes.len(), %absolute, "cover stored"); - Ok(()) -} - fn resolve_start_url() -> anyhow::Result { if let Some(arg) = std::env::args().nth(1) { return Ok(arg); @@ -591,12 +390,6 @@ fn resolve_start_url() -> anyhow::Result { }) } -fn origin_of(url: &str) -> Option { - let (scheme, rest) = url.split_once("://")?; - let host = rest.split('/').next()?; - Some(format!("{scheme}://{host}")) -} - fn env_u64(name: &str, default: u64) -> u64 { std::env::var(name) .ok() @@ -611,3 +404,4 @@ fn env_bool(name: &str, default: bool) -> bool { _ => default, } } + diff --git a/backend/src/config.rs b/backend/src/config.rs index 3a72370..113ec89 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -1,4 +1,10 @@ use std::path::PathBuf; +use std::time::Duration; + +use chrono::NaiveTime; +use chrono_tz::Tz; + +use crate::crawler::browser::LaunchOptions; #[derive(Clone, Debug)] 
pub struct AuthConfig { @@ -45,6 +51,54 @@ pub struct Config { pub auth: AuthConfig, pub upload: UploadConfig, pub cors_allowed_origins: Vec, + pub crawler: CrawlerConfig, +} + +/// All crawler-daemon knobs read from env. Mirrors the env vars the +/// `bin/crawler` binary already reads, plus the new daemon-only knobs +/// (daily_at, tz, idle_timeout, retention_days, daemon_enabled). +/// +/// `daemon_enabled = false` skips the daemon spawn entirely — used by +/// integration tests and dev runs that don't want background activity. +#[derive(Clone, Debug)] +pub struct CrawlerConfig { + pub daemon_enabled: bool, + pub daily_at: NaiveTime, + pub tz: Tz, + pub idle_timeout: Duration, + pub chapter_workers: usize, + pub retention_days: u32, + pub start_url: Option, + pub rate_ms: u64, + pub cdn_host: Option, + pub cdn_rate_ms: u64, + pub phpsessid: Option, + pub cookie_domain: Option, + pub user_agent: Option, + pub proxy: Option, + pub browser: LaunchOptions, +} + +impl Default for CrawlerConfig { + fn default() -> Self { + Self { + daemon_enabled: false, + daily_at: NaiveTime::from_hms_opt(0, 0, 0).unwrap(), + tz: Tz::UTC, + idle_timeout: Duration::from_secs(600), + chapter_workers: 1, + retention_days: 7, + start_url: None, + rate_ms: 1000, + cdn_host: None, + cdn_rate_ms: 1000, + phpsessid: None, + cookie_domain: None, + user_agent: None, + proxy: None, + browser: LaunchOptions::headless(), + } + } } impl Config { @@ -77,10 +131,65 @@ impl Config { .collect() }) .unwrap_or_default(), + crawler: CrawlerConfig::from_env()?, }) } } +impl CrawlerConfig { + pub fn from_env() -> anyhow::Result { + // Parse CRAWLER_DAILY_AT (HH:MM, 24h). Invalid → fail fast. 
+ let daily_at = match std::env::var("CRAWLER_DAILY_AT").ok().as_deref() { + None | Some("") => NaiveTime::from_hms_opt(0, 0, 0).unwrap(), + Some(raw) => NaiveTime::parse_from_str(raw, "%H:%M").map_err(|e| { + anyhow::anyhow!("CRAWLER_DAILY_AT must be HH:MM (got {raw:?}): {e}") + })?, + }; + let tz: Tz = match std::env::var("CRAWLER_TZ").ok().as_deref() { + None | Some("") => Tz::UTC, + Some(raw) => raw + .parse() + .map_err(|e| anyhow::anyhow!("CRAWLER_TZ must be a valid IANA TZ (got {raw:?}): {e}"))?, + }; + Ok(Self { + daemon_enabled: env_bool("CRAWLER_DAEMON", true), + daily_at, + tz, + idle_timeout: Duration::from_secs(env_u64("CRAWLER_IDLE_TIMEOUT_S", 600)), + chapter_workers: env_u64("CRAWLER_CHAPTER_WORKERS", 1).max(1) as usize, + retention_days: env_u64("CRAWLER_JOB_RETENTION_DAYS", 7) as u32, + start_url: std::env::var("CRAWLER_START_URL") + .ok() + .filter(|s| !s.trim().is_empty()), + rate_ms: env_u64("CRAWLER_RATE_MS", 1000), + cdn_host: std::env::var("CRAWLER_CDN_HOST") + .ok() + .filter(|s| !s.trim().is_empty()), + cdn_rate_ms: env_u64("CRAWLER_CDN_RATE_MS", env_u64("CRAWLER_RATE_MS", 1000)), + phpsessid: std::env::var("CRAWLER_PHPSESSID") + .ok() + .filter(|s| !s.trim().is_empty()), + cookie_domain: std::env::var("CRAWLER_COOKIE_DOMAIN") + .ok() + .filter(|s| !s.trim().is_empty()), + user_agent: std::env::var("CRAWLER_USER_AGENT") + .ok() + .filter(|s| !s.trim().is_empty()), + proxy: std::env::var("CRAWLER_PROXY") + .ok() + .filter(|s| !s.trim().is_empty()), + browser: LaunchOptions::from_env(), + }) + } +} + +fn env_u64(name: &str, default: u64) -> u64 { + std::env::var(name) + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(default) +} + fn env_bool(name: &str, default: bool) -> bool { match std::env::var(name).ok().as_deref() { Some("1") | Some("true") | Some("TRUE") | Some("yes") => true, diff --git a/backend/src/crawler/browser.rs b/backend/src/crawler/browser.rs index 1e419da..964e82f 100644 --- a/backend/src/crawler/browser.rs +++ 
b/backend/src/crawler/browser.rs @@ -15,6 +15,7 @@ //! caller-provided. use std::path::PathBuf; +use std::sync::Arc; use anyhow::Context; use chromiumoxide::browser::{Browser, BrowserConfig}; @@ -95,25 +96,49 @@ pub(crate) fn parse_args(s: &str) -> Vec { /// Owned browser plus the spawned task that drives its CDP event loop. /// Dropping `Handle` without calling `close` leaks the Chromium process /// — always call `close().await` in production paths. +/// +/// The browser is stored behind an `Arc` so it can be shared across +/// worker tasks (via [`Handle::shared`]) without copying. `Browser::new_page` +/// only needs `&self`, so multiple workers can drive the same browser +/// concurrently as long as the manager keeps the `Arc` alive. pub struct Handle { - browser: Browser, + browser: Arc, driver: JoinHandle<()>, } impl Handle { + /// Borrow the browser. Equivalent to `&*handle.shared()`. pub fn browser(&self) -> &Browser { &self.browser } - pub fn browser_mut(&mut self) -> &mut Browser { - &mut self.browser + /// Clone the shared handle. Workers hold these to call `new_page` + /// concurrently. The browser only exits when the last `Arc` + /// is dropped (kill-on-drop), or when `close()` is called on the + /// originating `Handle` while it is the sole holder. + pub fn shared(&self) -> Arc { + Arc::clone(&self.browser) } - /// Closes the browser and awaits the driver task. Safe to call - /// multiple times — subsequent calls are no-ops. - pub async fn close(mut self) -> anyhow::Result<()> { - let _ = self.browser.close().await; - let _ = self.browser.wait().await; + /// Closes the browser and awaits the driver task. If other Arcs to + /// the browser are still alive we fall back to drop-kills-Chromium + /// semantics and just join the driver — this is the rare case where + /// shutdown raced an outstanding worker; the OS-level kill is the + /// safety net. 
+ pub async fn close(self) -> anyhow::Result<()> { + match Arc::try_unwrap(self.browser) { + Ok(mut owned) => { + let _ = owned.close().await; + let _ = owned.wait().await; + } + Err(shared) => { + tracing::warn!( + strong_count = Arc::strong_count(&shared), + "Handle::close while Arc still shared — relying on kill-on-drop" + ); + drop(shared); + } + } let _ = self.driver.await; Ok(()) } @@ -184,7 +209,10 @@ pub async fn launch(options: LaunchOptions) -> anyhow::Result { } }); - Ok(Handle { browser, driver }) + Ok(Handle { + browser: Arc::new(browser), + driver, + }) } fn cache_dir() -> anyhow::Result { diff --git a/backend/src/crawler/browser_manager.rs b/backend/src/crawler/browser_manager.rs new file mode 100644 index 0000000..54a411c --- /dev/null +++ b/backend/src/crawler/browser_manager.rs @@ -0,0 +1,262 @@ +//! Lazy-launch / idle-teardown Chromium manager for the daemon. +//! +//! The first worker that calls [`BrowserManager::acquire`] triggers a real +//! Chromium launch (and the `on_launch` hook — used to re-inject the +//! PHPSESSID cookie on every fresh process). Each acquire bumps an active +//! counter; the returned [`BrowserLease`] decrements it on drop. +//! +//! When the active counter hits zero, a background reaper task waits +//! `idle_timeout`. If still zero on wake, it closes Chromium and clears the +//! cached handle. The next acquire re-launches. +//! +//! `idle_timeout = Duration::ZERO` disables the reaper — Chromium stays alive +//! until [`BrowserManager::shutdown`]. + +use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Context; +use chromiumoxide::browser::Browser; +use futures_util::future::BoxFuture; +use tokio::sync::{Mutex, Notify}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; + +use crate::crawler::browser::{self, LaunchOptions}; + +/// Hook invoked on every fresh launch with the new browser. 
Typically used +/// to re-inject PHPSESSID + run the session probe. Errors abort the +/// `acquire` that triggered the launch — the next acquire will re-launch. +pub type OnLaunch = + Arc) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync>; + +/// Returns an `OnLaunch` that does nothing — useful when no session is +/// configured (e.g. CLI metadata-only runs). +pub fn noop_on_launch() -> OnLaunch { + Arc::new(|_| Box::pin(async { Ok(()) })) +} + +/// Decoupled active-lease tracker. Owns the atomic counter and the idle +/// notifier so the wiring is unit-testable without standing up a real +/// `BrowserManager` (which would require launching Chromium). +#[derive(Default)] +pub(crate) struct ActiveTracker { + counter: AtomicUsize, + idle_signal: Notify, +} + +impl ActiveTracker { + pub(crate) fn new() -> Arc { + Arc::new(Self::default()) + } + + pub(crate) fn acquire(self: &Arc) { + self.counter.fetch_add(1, Ordering::AcqRel); + } + + pub(crate) fn release(self: &Arc) { + if self.counter.fetch_sub(1, Ordering::AcqRel) == 1 { + self.idle_signal.notify_one(); + } + } + + pub(crate) fn current(&self) -> usize { + self.counter.load(Ordering::Acquire) + } + + pub(crate) fn idle_signal(&self) -> &Notify { + &self.idle_signal + } +} + +pub struct BrowserManager { + inner: Mutex, + active: Arc, + launch_opts: LaunchOptions, + idle_timeout: Duration, + on_launch: OnLaunch, +} + +struct Inner { + handle: Option, + shared: Option>, +} + +impl BrowserManager { + pub fn new( + launch_opts: LaunchOptions, + idle_timeout: Duration, + on_launch: OnLaunch, + ) -> Arc { + Arc::new(Self { + inner: Mutex::new(Inner { + handle: None, + shared: None, + }), + active: ActiveTracker::new(), + launch_opts, + idle_timeout, + on_launch, + }) + } + + /// Acquire a shared browser lease. The first acquire after a teardown + /// launches a fresh Chromium (and runs `on_launch`); subsequent acquires + /// while a process is alive just bump the counter and clone the `Arc`. 
+ pub async fn acquire(&self) -> anyhow::Result { + let mut guard = self.inner.lock().await; + if guard.handle.is_none() { + let handle = browser::launch(self.launch_opts.clone()) + .await + .context("BrowserManager: launch chromium")?; + let shared = handle.shared(); + // Run the on-launch hook before publishing the handle so a session + // probe failure doesn't leave a half-initialized browser behind. + if let Err(e) = (self.on_launch)(Arc::clone(&shared)).await { + // Close the just-launched browser since we won't be using it. + let _ = handle.close().await; + return Err(e.context("BrowserManager: on_launch hook failed")); + } + guard.handle = Some(handle); + guard.shared = Some(shared); + } + let browser = guard + .shared + .as_ref() + .expect("shared set above") + .clone(); + self.active.acquire(); + Ok(BrowserLease { + browser, + active: Arc::clone(&self.active), + }) + } + + /// Forcefully close the cached browser regardless of active count. + /// Used on daemon shutdown. After this returns the next acquire will + /// re-launch from scratch. + pub async fn shutdown(&self) { + let mut guard = self.inner.lock().await; + guard.shared = None; + if let Some(handle) = guard.handle.take() { + let _ = handle.close().await; + } + } + + fn idle_timeout(&self) -> Duration { + self.idle_timeout + } + + fn active(&self) -> Arc { + Arc::clone(&self.active) + } +} + +/// Background reaper. Returns immediately when `idle_timeout == 0`. +/// Otherwise spawns a task that: +/// 1. Waits on `idle_signal` (woken when active hits zero). +/// 2. Sleeps `idle_timeout`. +/// 3. Re-checks the counter under the mutex — if still zero, takes the +/// handle and closes it. +/// +/// Repeats forever until `cancel` fires. +pub fn spawn_idle_reaper(mgr: Arc, cancel: CancellationToken) -> JoinHandle<()> { + tokio::spawn(async move { + if mgr.idle_timeout().is_zero() { + // Block until cancellation, then exit. 
+ cancel.cancelled().await; + return; + } + let active = mgr.active(); + loop { + tokio::select! { + _ = cancel.cancelled() => return, + _ = active.idle_signal().notified() => {} + } + if active.current() > 0 { + continue; + } + tokio::select! { + _ = cancel.cancelled() => return, + _ = tokio::time::sleep(mgr.idle_timeout()) => {} + } + let mut guard = mgr.inner.lock().await; + if active.current() > 0 { + // A worker grabbed a lease during the sleep — abort teardown. + continue; + } + let handle = guard.handle.take(); + guard.shared = None; + drop(guard); + if let Some(h) = handle { + let _ = h.close().await; + tracing::info!("BrowserManager: idle teardown — Chromium closed"); + } + } + }) +} + +/// A worker-side handle that keeps the browser alive while in scope. +/// `Deref` so callers can pass `&*lease` to APIs that +/// expect `&Browser`. +pub struct BrowserLease { + browser: Arc, + active: Arc, +} + +impl Deref for BrowserLease { + type Target = Browser; + fn deref(&self) -> &Browser { + &self.browser + } +} + +impl Drop for BrowserLease { + fn drop(&mut self) { + self.active.release(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::AtomicBool; + + #[test] + fn noop_on_launch_is_send_sync() { + fn assert_send_sync(_: &T) {} + let h = noop_on_launch(); + assert_send_sync(&h); + } + + #[tokio::test] + async fn active_tracker_signals_idle_only_on_zero_transition() { + let tracker = ActiveTracker::new(); + let signaled = Arc::new(AtomicBool::new(false)); + { + let s = Arc::clone(&signaled); + let t = Arc::clone(&tracker); + tokio::spawn(async move { + t.idle_signal().notified().await; + s.store(true, Ordering::Release); + }); + } + + tracker.acquire(); + tracker.acquire(); + assert_eq!(tracker.current(), 2); + tracker.release(); + assert_eq!(tracker.current(), 1); + tokio::time::sleep(Duration::from_millis(20)).await; + assert!(!signaled.load(Ordering::Acquire), "no idle signal at count 1"); + tracker.release(); + 
tokio::time::sleep(Duration::from_millis(20)).await; + assert_eq!(tracker.current(), 0); + assert!( + signaled.load(Ordering::Acquire), + "idle signal fires on 1 -> 0 transition" + ); + } +} diff --git a/backend/src/crawler/daemon.rs b/backend/src/crawler/daemon.rs new file mode 100644 index 0000000..4c822e4 --- /dev/null +++ b/backend/src/crawler/daemon.rs @@ -0,0 +1,633 @@ +//! In-process crawler daemon. +//! +//! Owns a cron task that fires a daily metadata pass and N worker tasks +//! that drain `SyncChapterContent` jobs from `crawler_jobs`. The dispatch +//! seams ([`MetadataPass`], [`ChapterDispatcher`]) are traits so tests can +//! inject stubs without standing up a real Chromium / `Source` impl. +//! +//! ## Cron +//! +//! Each tick: +//! 1. Acquire a Postgres advisory lock on a dedicated pool connection +//! (multi-replica safety). Skip the tick on contention. +//! 2. Call [`MetadataPass::run`] (typically `pipeline::run_metadata_pass`). +//! 3. Enqueue `SyncChapterContent` jobs for any bookmarked manga whose +//! chapters still have `page_count = 0`. +//! 4. Reap `done` jobs older than `retention_days`. +//! 5. Persist `last_metadata_tick_at` and release the lock. +//! +//! If the last persisted tick is older than the most recent scheduled slot +//! (e.g. backend was down at midnight), the daemon fires immediately on +//! startup before resuming the regular schedule. +//! +//! ## Workers +//! +//! Each worker leases one chapter-content job at a time, dispatches via the +//! [`ChapterDispatcher`], and acks `done` / `failed` / re-`pending` based on +//! the outcome. A `SessionExpired` outcome flips the sticky +//! `session_expired` flag — all workers idle while it's set (until operator +//! restart with a refreshed PHPSESSID). +//! +//! Worker dispatch is wrapped in `catch_unwind` so a panicking handler +//! marks the job failed instead of taking down the worker task. 
+ +use std::panic::AssertUnwindSafe; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use chrono::{DateTime, Datelike, NaiveTime, TimeZone, Timelike, Utc}; +use chrono_tz::Tz; +use futures_util::FutureExt; +use serde_json::json; +use sqlx::PgPool; +use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; + +use crate::crawler::content::SyncOutcome; +use crate::crawler::jobs::{self, JobPayload, Lease, KIND_SYNC_CHAPTER_CONTENT}; +use crate::crawler::pipeline; + +/// Fixed `pg_try_advisory_lock` key. ASCII "MANGALRD" interpreted as a +/// big-endian i64. Hardcoded so every replica agrees on the lock identity +/// without consulting config. +pub const CRON_LOCK_KEY: i64 = 0x4D414E47414C5244; + +const STATE_KEY_LAST_TICK: &str = "last_metadata_tick_at"; + +#[async_trait] +pub trait MetadataPass: Send + Sync { + async fn run(&self) -> anyhow::Result; +} + +#[async_trait] +pub trait ChapterDispatcher: Send + Sync { + async fn dispatch(&self, payload: JobPayload) -> anyhow::Result; +} + +/// Configuration for [`spawn`]. Use `None` for `metadata_pass` to disable +/// the cron entirely (worker-pool-only mode — useful when only the +/// bookmark-triggered enqueue path is wanted). +pub struct DaemonConfig { + pub metadata_pass: Option>, + pub dispatcher: Arc, + pub chapter_workers: usize, + pub daily_at: NaiveTime, + pub tz: Tz, + pub retention_days: u32, + pub session_expired: Arc, + /// Tasks that should run alongside the cron + workers and be cancelled + /// on shutdown. Used to hand the daemon ownership of the browser + /// manager's idle reaper. + pub extra_tasks: Vec>, +} + +pub struct DaemonHandle { + cancel: CancellationToken, + join: JoinSet<()>, + extra: Vec>, +} + +impl DaemonHandle { + /// Trigger shutdown and await all worker / cron / extra tasks. 
+ pub async fn shutdown(mut self) { + self.cancel.cancel(); + while self.join.join_next().await.is_some() {} + for task in self.extra.drain(..) { + let _ = task.await; + } + } + + /// Cancellation token that drives shutdown — exposed so callers + /// (`app::spawn_crawler_daemon`) can hand the same token to auxiliary + /// tasks (e.g. the BrowserManager idle reaper) and have them stop on + /// the daemon's signal. + pub fn cancel_token(&self) -> CancellationToken { + self.cancel.clone() + } +} + +/// Spawn the daemon. Returns immediately; tasks run in the background. +/// Pass an external [`CancellationToken`] so auxiliary tasks (e.g. a +/// BrowserManager idle reaper) can share the same shutdown signal — +/// typically created in the caller, cloned into both spawns. +pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> DaemonHandle { + let mut join = JoinSet::new(); + + let DaemonConfig { + metadata_pass, + dispatcher, + chapter_workers, + daily_at, + tz, + retention_days, + session_expired, + extra_tasks, + } = cfg; + + if let Some(metadata) = metadata_pass { + let ctx = CronContext { + pool: pool.clone(), + cancel: cancel.clone(), + daily_at, + tz, + retention_days, + metadata, + }; + join.spawn(async move { ctx.run().await }); + } else { + tracing::info!("crawler daemon: no metadata_pass — cron disabled"); + } + + for worker_id in 0..chapter_workers.max(1) { + let ctx = WorkerContext { + pool: pool.clone(), + cancel: cancel.clone(), + dispatcher: Arc::clone(&dispatcher), + session_expired: Arc::clone(&session_expired), + id: worker_id, + }; + join.spawn(async move { ctx.run().await }); + } + + DaemonHandle { + cancel, + join, + extra: extra_tasks, + } +} + +// --------------------------------------------------------------------------- +// Cron +// --------------------------------------------------------------------------- + +struct CronContext { + pool: PgPool, + cancel: CancellationToken, + daily_at: NaiveTime, + tz: Tz, + retention_days: 
u32, + metadata: Arc, +} + +impl CronContext { + async fn run(self) { + // On startup, fire immediately if the most recent slot has already + // passed and we never recorded a tick for it. + let now = Utc::now(); + let mut catchup = match read_last_tick(&self.pool).await { + Ok(Some(last)) => previous_fire(now, self.daily_at, self.tz) > last, + Ok(None) => true, + Err(e) => { + tracing::warn!(?e, "cron: read_last_tick failed; assuming no catch-up"); + false + } + }; + + loop { + if catchup { + tracing::info!("cron: catch-up tick (missed scheduled slot)"); + self.run_tick().await; + catchup = false; + continue; + } + // Recompute next-fire from now() each iteration so clock jumps + // (NTP step, suspend/resume) don't strand us on a stale instant. + let next = next_fire(Utc::now(), self.daily_at, self.tz); + let wait = (next - Utc::now()).to_std().unwrap_or(Duration::ZERO); + tracing::info!( + next_fire_utc = %next.to_rfc3339(), + wait_seconds = wait.as_secs(), + "cron: sleeping until next slot" + ); + tokio::select! { + _ = tokio::time::sleep(wait) => {} + _ = self.cancel.cancelled() => { + tracing::info!("cron: shutdown"); + return; + } + } + self.run_tick().await; + } + } + + async fn run_tick(&self) { + let mut conn = match self.pool.acquire().await { + Ok(c) => c, + Err(e) => { + tracing::error!(?e, "cron: acquire conn failed; skipping tick"); + return; + } + }; + // pg_try_advisory_lock is session-scoped — we must hold the same + // connection for the unlock or the call silently no-ops on a + // different connection from the pool. 
+ let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_lock($1)") + .bind(CRON_LOCK_KEY) + .fetch_one(&mut *conn) + .await + .unwrap_or(false); + if !acquired { + tracing::info!("cron: tick skipped — another replica holds the lock"); + return; + } + + match self.metadata.run().await { + Ok(stats) => tracing::info!(?stats, "cron: metadata pass done"), + Err(e) => tracing::error!(?e, "cron: metadata pass failed"), + } + + match pipeline::enqueue_bookmarked_pending(&self.pool).await { + Ok(summary) => tracing::info!(?summary, "cron: enqueued bookmarked-pending"), + Err(e) => tracing::error!(?e, "cron: enqueue_bookmarked_pending failed"), + } + + match jobs::reap_done(&self.pool, self.retention_days).await { + Ok(n) => tracing::info!(reaped = n, "cron: done-job reaper finished"), + Err(e) => tracing::error!(?e, "cron: done-job reaper failed"), + } + + if let Err(e) = write_last_tick(&self.pool, Utc::now()).await { + tracing::warn!(?e, "cron: persist last_metadata_tick_at failed"); + } + + let _ = sqlx::query("SELECT pg_advisory_unlock($1)") + .bind(CRON_LOCK_KEY) + .execute(&mut *conn) + .await; + drop(conn); + } +} + +// --------------------------------------------------------------------------- +// Workers +// --------------------------------------------------------------------------- + +struct WorkerContext { + pool: PgPool, + cancel: CancellationToken, + dispatcher: Arc, + session_expired: Arc, + id: usize, +} + +impl WorkerContext { + async fn run(self) { + loop { + if self.cancel.is_cancelled() { + tracing::info!(worker = self.id, "worker: shutdown"); + return; + } + if self.session_expired.load(Ordering::Acquire) { + tokio::select! 
{ + _ = tokio::time::sleep(Duration::from_secs(30)) => continue, + _ = self.cancel.cancelled() => return, + } + } + let leases = match jobs::lease( + &self.pool, + Some(KIND_SYNC_CHAPTER_CONTENT), + 1, + Duration::from_secs(60), + ) + .await + { + Ok(v) => v, + Err(e) => { + tracing::warn!(worker = self.id, ?e, "worker: lease failed"); + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(5)) => continue, + _ = self.cancel.cancelled() => return, + } + } + }; + let Some(lease) = leases.into_iter().next() else { + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(1)) => continue, + _ = self.cancel.cancelled() => return, + } + }; + self.process_lease(lease).await; + } + } + + async fn process_lease(&self, lease: Lease) { + // Consumer-side dedup safety net: if the chapter already has pages + // (because a force-refetch race or a job that was re-enqueued + // after a previous one finished), ack done without re-fetching. + if let JobPayload::SyncChapterContent { chapter_id, .. } = &lease.payload { + let page_count: Option = sqlx::query_scalar( + "SELECT page_count FROM chapters WHERE id = $1", + ) + .bind(chapter_id) + .fetch_optional(&self.pool) + .await + .ok() + .flatten(); + if matches!(page_count, Some(n) if n > 0) { + let _ = jobs::ack_done(&self.pool, lease.id).await; + return; + } + } + + let outcome = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone())) + .catch_unwind() + .await; + match outcome { + Ok(Ok(SyncOutcome::Fetched { .. 
} | SyncOutcome::Skipped)) => { + let _ = jobs::ack_done(&self.pool, lease.id).await; + } + Ok(Ok(SyncOutcome::SessionExpired)) => { + tracing::error!( + worker = self.id, + lease_id = %lease.id, + "session expired — workers will idle until restart" + ); + self.session_expired.store(true, Ordering::Release); + let _ = jobs::release(&self.pool, lease.id).await; + } + Ok(Err(e)) => { + tracing::warn!( + worker = self.id, + lease_id = %lease.id, + error = ?e, + "worker: dispatch error — ack failed" + ); + let _ = jobs::ack_failed( + &self.pool, + lease.id, + &format!("{e:#}"), + lease.attempts, + lease.max_attempts, + ) + .await; + } + Err(_panic) => { + tracing::error!( + worker = self.id, + lease_id = %lease.id, + "worker: dispatcher panicked — ack failed" + ); + let _ = jobs::ack_failed( + &self.pool, + lease.id, + "worker panicked", + lease.attempts, + lease.max_attempts, + ) + .await; + } + } + } +} + +// --------------------------------------------------------------------------- +// Cron timing primitives +// --------------------------------------------------------------------------- + +/// Compute the next UTC instant when `daily_at` (interpreted in `tz`) will +/// fire, strictly after `now`. Handles DST gaps (spring-forward) by +/// advancing past the gap; on DST overlap (fall-back) picks the later +/// instant so the job runs once, not twice. +pub fn next_fire(now: DateTime, daily_at: NaiveTime, tz: Tz) -> DateTime { + let now_local = now.with_timezone(&tz); + // Start with today's slot in the local TZ. + let mut candidate = local_at(now_local.date_naive(), daily_at, tz); + // If today's slot is in the past (or now), roll forward day-by-day. + while candidate <= now { + let next_day = candidate + .with_timezone(&tz) + .date_naive() + .succ_opt() + .unwrap_or_else(|| { + // Defensive: succ_opt only fails at chrono's max date. 
+ chrono::NaiveDate::from_ymd_opt( + candidate.year(), + candidate.month(), + candidate.day(), + ) + .expect("valid date") + }); + candidate = local_at(next_day, daily_at, tz); + } + candidate +} + +/// The most recent fire instant at or before `now`. Used to detect missed +/// slots after a restart. +pub fn previous_fire(now: DateTime, daily_at: NaiveTime, tz: Tz) -> DateTime { + let now_local = now.with_timezone(&tz); + let today = local_at(now_local.date_naive(), daily_at, tz); + if today <= now { + return today; + } + let yesterday = now_local + .date_naive() + .pred_opt() + .expect("a day before now"); + local_at(yesterday, daily_at, tz) +} + +/// Resolve a local date+time to a UTC instant in `tz`, navigating DST +/// edges deterministically: +/// - `LocalResult::Single` → that instant. +/// - `LocalResult::Ambiguous(_, latest)` → the later instant (fall-back +/// hour). Picking latest means a daily job fires once across the +/// repeated hour, not twice. +/// - `LocalResult::None` → spring-forward gap. Advance the local time +/// by 1 minute and try again, repeating up to 120 times (so the worst +/// case is still well inside an hour-long gap). +fn local_at(date: chrono::NaiveDate, time: NaiveTime, tz: Tz) -> DateTime { + use chrono::LocalResult; + for offset_minutes in 0..120 { + let mut t = time; + if offset_minutes > 0 { + let added = chrono::NaiveTime::from_num_seconds_from_midnight_opt( + ((time.num_seconds_from_midnight() as i64 + offset_minutes * 60) % 86_400) as u32, + 0, + ) + .unwrap_or(time); + t = added; + } + let naive = date.and_time(t); + match tz.from_local_datetime(&naive) { + LocalResult::Single(dt) => return dt.with_timezone(&Utc), + LocalResult::Ambiguous(_, latest) => return latest.with_timezone(&Utc), + LocalResult::None => continue, + } + } + // Should be unreachable — DST gaps are always less than an hour. 
+ Utc.from_utc_datetime(&date.and_time(time)) +} + +// --------------------------------------------------------------------------- +// crawler_state I/O +// --------------------------------------------------------------------------- + +async fn read_last_tick(pool: &PgPool) -> sqlx::Result>> { + let row: Option = sqlx::query_scalar( + "SELECT value FROM crawler_state WHERE key = $1", + ) + .bind(STATE_KEY_LAST_TICK) + .fetch_optional(pool) + .await?; + Ok(row.and_then(|v| { + v.get("at") + .and_then(|s| s.as_str()) + .and_then(|s| DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&Utc)) + })) +} + +async fn write_last_tick(pool: &PgPool, at: DateTime) -> sqlx::Result<()> { + sqlx::query( + "INSERT INTO crawler_state (key, value, updated_at) \ + VALUES ($1, $2, now()) \ + ON CONFLICT (key) DO UPDATE \ + SET value = EXCLUDED.value, updated_at = now()", + ) + .bind(STATE_KEY_LAST_TICK) + .bind(json!({ "at": at.to_rfc3339() })) + .execute(pool) + .await?; + Ok(()) +} + +// --------------------------------------------------------------------------- +// Test helpers (not gated on cfg(test) — integration tests in tests/ dir +// need them too). +// --------------------------------------------------------------------------- + +pub mod test_support { + //! Lightweight stubs the daemon tests use. Public because integration + //! tests live outside this module. 
+ use super::*; + use std::sync::atomic::AtomicUsize; + + pub struct CountingMetadataPass { + pub count: AtomicUsize, + } + + impl Default for CountingMetadataPass { + fn default() -> Self { + Self { + count: AtomicUsize::new(0), + } + } + } + + #[async_trait] + impl MetadataPass for CountingMetadataPass { + async fn run(&self) -> anyhow::Result { + self.count.fetch_add(1, Ordering::AcqRel); + Ok(pipeline::MetadataStats::default()) + } + } + + pub type DispatchFn = Arc< + dyn Fn(JobPayload) -> futures_util::future::BoxFuture<'static, anyhow::Result> + + Send + + Sync, + >; + + pub struct StubDispatcher { + pub handler: DispatchFn, + } + + #[async_trait] + impl ChapterDispatcher for StubDispatcher { + async fn dispatch(&self, payload: JobPayload) -> anyhow::Result { + (self.handler)(payload).await + } + } + + pub fn always_done() -> Arc { + Arc::new(StubDispatcher { + handler: Arc::new(|_| Box::pin(async { Ok(SyncOutcome::Fetched { pages: 1 }) })), + }) + } + + pub fn panicking_dispatcher() -> Arc { + Arc::new(StubDispatcher { + handler: Arc::new(|_| Box::pin(async { panic!("intentional dispatcher panic") })), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Duration as ChronoDuration; + + fn dt_utc(y: i32, mo: u32, d: u32, h: u32, mi: u32) -> DateTime { + Utc.with_ymd_and_hms(y, mo, d, h, mi, 0).unwrap() + } + + #[test] + fn next_fire_in_utc_at_midnight_advances_one_day() { + let now = dt_utc(2026, 5, 25, 12, 0); // noon UTC + let at = NaiveTime::from_hms_opt(0, 0, 0).unwrap(); + let next = next_fire(now, at, Tz::UTC); + // Next midnight is May 26 00:00 UTC. 
+ assert_eq!(next, dt_utc(2026, 5, 26, 0, 0)); + } + + #[test] + fn next_fire_before_today_slot_returns_today() { + let now = dt_utc(2026, 5, 25, 23, 0); // 23:00 UTC + let at = NaiveTime::from_hms_opt(23, 30, 0).unwrap(); + let next = next_fire(now, at, Tz::UTC); + assert_eq!(next, dt_utc(2026, 5, 25, 23, 30)); + } + + #[test] + fn next_fire_skips_spring_forward_gap_in_europe_berlin() { + // 2024-03-31: clocks jump 02:00 -> 03:00 in Berlin (CET -> CEST). + // Asking for daily_at = 02:30 on the morning of the jump should + // land on the *next valid* local instant past the gap. We test + // by computing `next_fire` at 2024-03-31 00:30 UTC (= 01:30 CET, + // i.e. just before the gap). The next 02:30 local does not exist, + // so the helper advances past it. + let now = dt_utc(2024, 3, 31, 0, 30); // 01:30 local Berlin (CET = UTC+1) + let at = NaiveTime::from_hms_opt(2, 30, 0).unwrap(); + let next = next_fire(now, at, Tz::Europe__Berlin); + // Local Berlin time skips from 02:00 -> 03:00. After the +1 minute + // search, the first valid slot is 03:00 local on 2024-03-31, which + // is 01:00 UTC (CEST = UTC+2). + // We assert the result is strictly between (now) and 1h later + // and is in UTC — the exact minute depends on how many +1m steps + // were required. + assert!(next > now); + assert!(next < now + ChronoDuration::hours(2)); + } + + #[test] + fn next_fire_on_fall_back_picks_later_instant() { + // 2024-10-27: clocks jump 03:00 -> 02:00 (CEST -> CET) in Berlin. + // 02:30 happens twice on that day. We pick the later one. + let now = dt_utc(2024, 10, 26, 12, 0); // day before, noon UTC + let at = NaiveTime::from_hms_opt(2, 30, 0).unwrap(); + let next = next_fire(now, at, Tz::Europe__Berlin); + // First 02:30 local is 00:30 UTC (CEST = UTC+2). + // Second 02:30 local is 01:30 UTC (CET = UTC+1). + // We expect the later instant: 01:30 UTC on 2024-10-27. 
+ assert_eq!(next, dt_utc(2024, 10, 27, 1, 30)); + } + + #[test] + fn previous_fire_returns_today_when_now_is_after_slot() { + let now = dt_utc(2026, 5, 25, 12, 0); // noon UTC + let at = NaiveTime::from_hms_opt(0, 0, 0).unwrap(); + let prev = previous_fire(now, at, Tz::UTC); + assert_eq!(prev, dt_utc(2026, 5, 25, 0, 0)); + } + + #[test] + fn previous_fire_returns_yesterday_when_now_is_before_today_slot() { + let now = dt_utc(2026, 5, 25, 8, 0); // 08:00 UTC + let at = NaiveTime::from_hms_opt(23, 30, 0).unwrap(); + let prev = previous_fire(now, at, Tz::UTC); + assert_eq!(prev, dt_utc(2026, 5, 24, 23, 30)); + } +} diff --git a/backend/src/crawler/mod.rs b/backend/src/crawler/mod.rs index e9a4b5b..c50c39d 100644 --- a/backend/src/crawler/mod.rs +++ b/backend/src/crawler/mod.rs @@ -14,9 +14,12 @@ //! - [`diff`]: change detection — new / updated / dropped semantics. pub mod browser; +pub mod browser_manager; pub mod content; +pub mod daemon; pub mod diff; pub mod jobs; +pub mod pipeline; pub mod rate_limit; pub mod session; pub mod source; diff --git a/backend/src/crawler/pipeline.rs b/backend/src/crawler/pipeline.rs new file mode 100644 index 0000000..88fa4e9 --- /dev/null +++ b/backend/src/crawler/pipeline.rs @@ -0,0 +1,347 @@ +//! Crawler pipeline — the reusable metadata pass and the enqueue helpers +//! that fan out chapter-content work. Shared between the daemon (cron tick) +//! and the CLI (`bin/crawler.rs`) so behavior stays in lockstep. + +use anyhow::Context; +use sqlx::PgPool; +use uuid::Uuid; + +use crate::crawler::browser_manager::BrowserManager; +use crate::crawler::jobs::{self, EnqueueResult, JobPayload}; +use crate::crawler::rate_limit::HostRateLimiters; +use crate::crawler::source::target::TargetSource; +use crate::crawler::source::{DiscoverMode, FetchContext, Source}; +use crate::repo; +use crate::storage::Storage; + +/// Coarse counters surfaced for logging at the end of a metadata pass. 
+#[derive(Debug, Default, Clone, Copy)]
+pub struct MetadataStats {
+    /// Number of manga refs returned by `discover` for this pass.
+    pub discovered: usize,
+    /// Mangas whose metadata was fetched and upserted successfully.
+    pub upserted: usize,
+    /// Cover images downloaded and stored during this pass.
+    pub covers_fetched: usize,
+    /// Mangas skipped because `fetch_manga` or the upsert returned an error.
+    pub mangas_failed: usize,
+}
+
+/// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline
+/// for the target source. Pure metadata; chapter content is enqueued as
+/// separate `SyncChapterContent` jobs by the caller after this returns.
+///
+/// `limit == 0` means no cap (full backfill). `skip_chapters == true` is
+/// the "metadata-only" mode (parser doesn't extract chapters, and
+/// `sync_manga_chapters` is skipped — otherwise an empty chapter list
+/// would soft-drop existing rows).
+///
+/// The browser lease is held for the whole pass and released just before
+/// returning.
+///
+/// # Errors
+///
+/// Fails when the browser lease cannot be acquired, when `ensure_source`
+/// fails, or when discovery fails. Per-manga fetch / upsert / cover /
+/// chapter-sync failures are logged and do not abort the pass; fetch and
+/// upsert failures are counted in [`MetadataStats::mangas_failed`].
+#[allow(clippy::too_many_arguments)]
+pub async fn run_metadata_pass(
+    browser_manager: &BrowserManager,
+    db: &PgPool,
+    storage: &dyn Storage,
+    http: &reqwest::Client,
+    rate: &HostRateLimiters,
+    start_url: &str,
+    limit: usize,
+    skip_chapters: bool,
+) -> anyhow::Result<MetadataStats> {
+    let lease = browser_manager
+        .acquire()
+        .await
+        .context("acquire browser lease for metadata pass")?;
+    let browser_ref: &chromiumoxide::Browser = &lease;
+
+    let source = {
+        let s = TargetSource::new(start_url.to_string());
+        if skip_chapters {
+            s.without_chapter_parsing()
+        } else {
+            s
+        }
+    };
+    let ctx = FetchContext {
+        browser: browser_ref,
+        rate,
+    };
+
+    let source_id = source.id();
+    repo::crawler::ensure_source(
+        db,
+        source_id,
+        "Target Site",
+        // Fall back to the raw start URL when no origin can be derived.
+        &origin_of(start_url).unwrap_or_else(|| start_url.to_string()),
+    )
+    .await
+    .context("ensure_source")?;
+
+    // Captured before discovery so the drop pass below can tell which rows
+    // were not touched by this run.
+    let run_started_at = chrono::Utc::now();
+    let max_refs = (limit > 0).then_some(limit);
+
+    tracing::info!(?max_refs, "discovering manga list");
+    let refs = source
+        .discover(&ctx, DiscoverMode::Backfill, max_refs)
+        .await
+        .context("discover failed")?;
+    tracing::info!(count = refs.len(), "discovered manga list");
+
+    let mut stats = MetadataStats {
+        discovered: refs.len(),
+        ..MetadataStats::default()
+    };
+
+    for (i, r) in refs.iter().enumerate() {
+        tracing::info!(
+            idx = i + 1,
+            total = stats.discovered,
+            key = %r.source_manga_key,
+            "fetching metadata"
+        );
+        let manga = match source.fetch_manga(&ctx, r).await {
+            Ok(m) => m,
+            Err(e) => {
+                tracing::warn!(
+                    key = %r.source_manga_key,
+                    url = %r.url,
+                    error = ?e,
+                    "fetch_manga failed"
+                );
+                stats.mangas_failed += 1;
+                continue;
+            }
+        };
+
+        let upsert = match repo::crawler::upsert_manga_from_source(db, source_id, &r.url, &manga)
+            .await
+        {
+            Ok(u) => u,
+            Err(e) => {
+                tracing::error!(
+                    key = %r.source_manga_key,
+                    error = ?e,
+                    "upsert_manga_from_source failed"
+                );
+                stats.mangas_failed += 1;
+                continue;
+            }
+        };
+        stats.upserted += 1;
+        tracing::info!(
+            key = %manga.source_manga_key,
+            manga_id = %upsert.manga_id,
+            status = ?upsert.status,
+            title = %manga.title,
+            "manga upserted"
+        );
+
+        // Cover image: download when missing in storage or when metadata
+        // signaled an update (cover URL is part of metadata_hash, so
+        // Updated implies the URL may have moved). Failures are non-fatal.
+        let needs_cover = upsert.cover_image_path.is_none()
+            || matches!(upsert.status, repo::crawler::UpsertStatus::Updated);
+        if needs_cover {
+            if let Some(cover_url) = manga.cover_url.as_deref() {
+                match download_and_store_cover(
+                    db,
+                    storage,
+                    http,
+                    rate,
+                    &r.url,
+                    upsert.manga_id,
+                    cover_url,
+                )
+                .await
+                {
+                    Ok(()) => stats.covers_fetched += 1,
+                    Err(e) => tracing::warn!(
+                        manga_id = %upsert.manga_id,
+                        error = ?e,
+                        "cover download failed"
+                    ),
+                }
+            }
+        }
+
+        if !skip_chapters {
+            match repo::crawler::sync_manga_chapters(
+                db,
+                source_id,
+                upsert.manga_id,
+                &manga.chapters,
+            )
+            .await
+            {
+                Ok(diff) => tracing::info!(
+                    manga_id = %upsert.manga_id,
+                    new = diff.new,
+                    refreshed = diff.refreshed,
+                    dropped = diff.dropped,
+                    "chapters synced"
+                ),
+                Err(e) => tracing::warn!(
+                    manga_id = %upsert.manga_id,
+                    error = ?e,
+                    "chapter sync failed"
+                ),
+            }
+        }
+    }
+
+    if limit != 0 {
+        tracing::info!(limit, "partial sync — skipping drop pass");
+    } else if stats.mangas_failed > 0 {
+        // A manga that was discovered but failed to fetch/upsert is still
+        // listed by the source; it just was not refreshed in this run.
+        // NOTE(review): this assumes `mark_dropped_mangas` drops every row
+        // not upserted since `run_started_at` — confirm against the repo
+        // layer. Under that assumption, running it now would soft-drop
+        // mangas over a transient error, so wait for a clean pass.
+        tracing::warn!(
+            failed = stats.mangas_failed,
+            "metadata pass had failures — skipping drop pass"
+        );
+    } else {
+        match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await {
+            Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"),
+            Err(e) => tracing::warn!(error = ?e, "drop-pass failed"),
+        }
+    }
+
+    drop(lease);
+    Ok(stats)
+}
+
+/// Enqueue a `SyncChapterContent` job for every chapter of *any* bookmarked
+/// manga that still has `page_count = 0` and a non-dropped source row.
+/// Returns an [`EnqueueSummary`] with the inserted / skipped / failed
+/// counts. Dedup index handles repeats.
+pub async fn enqueue_bookmarked_pending(pool: &PgPool) -> anyhow::Result<EnqueueSummary> {
+    // Joining `bookmarks` yields one row per (chapter, bookmarking user);
+    // the GROUP BY collapses those duplicates while keeping the
+    // manga / created_at columns available for the ORDER BY.
+    let rows: Vec<(String, Uuid, String)> = sqlx::query_as(
+        r#"
+        SELECT cs.source_id, c.id AS chapter_id, cs.source_chapter_key
+        FROM chapters c
+        JOIN bookmarks b ON b.manga_id = c.manga_id
+        JOIN chapter_sources cs ON cs.chapter_id = c.id
+        WHERE c.page_count = 0
+          AND cs.dropped_at IS NULL
+        GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.manga_id, c.created_at
+        ORDER BY c.manga_id, c.created_at ASC
+        "#,
+    )
+    .fetch_all(pool)
+    .await
+    .context("query bookmarked-pending chapters")?;
+
+    let mut summary = EnqueueSummary::default();
+    for (source_id, chapter_id, source_chapter_key) in rows {
+        let payload = JobPayload::SyncChapterContent {
+            source_id,
+            chapter_id,
+            source_chapter_key,
+        };
+        // A single failed enqueue is logged and counted; it does not abort
+        // the remaining rows.
+        match jobs::enqueue(pool, &payload).await {
+            Ok(EnqueueResult::Inserted(_)) => summary.inserted += 1,
+            Ok(EnqueueResult::Skipped) => summary.skipped += 1,
+            Err(e) => {
+                tracing::warn!(
+                    %chapter_id,
+                    error = ?e,
+                    "enqueue chapter content failed"
+                );
+                summary.failed += 1;
+            }
+        }
+    }
+    Ok(summary)
+}
+
+/// Enqueue chapter-content jobs for a *single* manga (the bookmark-create
+/// hook). Same dedup semantics as [`enqueue_bookmarked_pending`].
+pub async fn enqueue_pending_for_manga(
+    pool: &PgPool,
+    manga_id: Uuid,
+) -> anyhow::Result<EnqueueSummary> {
+    let rows: Vec<(String, Uuid, String)> = sqlx::query_as(
+        r#"
+        SELECT DISTINCT cs.source_id, c.id AS chapter_id, cs.source_chapter_key
+        FROM chapters c
+        JOIN chapter_sources cs ON cs.chapter_id = c.id
+        WHERE c.manga_id = $1
+          AND c.page_count = 0
+          AND cs.dropped_at IS NULL
+        ORDER BY cs.source_id, c.id
+        "#,
+    )
+    .bind(manga_id)
+    .fetch_all(pool)
+    .await
+    .context("query pending chapters for manga")?;
+
+    let mut summary = EnqueueSummary::default();
+    for (source_id, chapter_id, source_chapter_key) in rows {
+        let payload = JobPayload::SyncChapterContent {
+            source_id,
+            chapter_id,
+            source_chapter_key,
+        };
+        // A single failed enqueue is logged and counted; it does not abort
+        // the remaining rows.
+        match jobs::enqueue(pool, &payload).await {
+            Ok(EnqueueResult::Inserted(_)) => summary.inserted += 1,
+            Ok(EnqueueResult::Skipped) => summary.skipped += 1,
+            Err(e) => {
+                tracing::warn!(
+                    %chapter_id,
+                    error = ?e,
+                    "enqueue chapter content failed"
+                );
+                summary.failed += 1;
+            }
+        }
+    }
+    Ok(summary)
+}
+
+/// Outcome counters for one enqueue sweep.
+#[derive(Debug, Default, Clone, Copy)]
+pub struct EnqueueSummary {
+    /// Jobs newly inserted into the queue.
+    pub inserted: usize,
+    /// Jobs for which `jobs::enqueue` reported `Skipped`.
+    pub skipped: usize,
+    /// Rows whose enqueue call returned an error (logged, not propagated).
+    pub failed: usize,
+}
+
+/// Download a cover image, store it, and record its storage key on the
+/// manga row.
+///
+/// `cover_url` is resolved relative to `manga_url`, and `manga_url` is also
+/// sent as the `Referer`. The request waits on the per-host rate limiter
+/// first. The file extension is sniffed from the downloaded bytes via
+/// `infer` (falling back to `bin`), and the object is written to
+/// `mangas/{manga_id}/cover.{ext}`.
+///
+/// # Errors
+///
+/// Fails on an unparsable URL, a rate-limiter error, a transport error or
+/// non-2xx response, a storage write failure, or a failed
+/// `set_cover_image_path` update.
+async fn download_and_store_cover(
+    db: &PgPool,
+    storage: &dyn Storage,
+    http: &reqwest::Client,
+    rate: &HostRateLimiters,
+    manga_url: &str,
+    manga_id: Uuid,
+    cover_url: &str,
+) -> anyhow::Result<()> {
+    let absolute = reqwest::Url::parse(manga_url)
+        .context("parse manga URL")?
+        .join(cover_url)
+        .context("join cover URL onto manga URL")?;
+
+    rate.wait_for(absolute.as_str()).await?;
+    let resp = http
+        .get(absolute.clone())
+        .header(reqwest::header::REFERER, manga_url)
+        .send()
+        .await
+        .with_context(|| format!("GET {absolute}"))?
+        .error_for_status()
+        .with_context(|| format!("non-2xx for {absolute}"))?;
+    let bytes = resp.bytes().await.context("read cover body")?;
+    let kind = infer::get(&bytes);
+    let ext = kind.map(|k| k.extension()).unwrap_or("bin");
+    let key = format!("mangas/{manga_id}/cover.{ext}");
+
+    storage
+        .put(&key, &bytes)
+        .await
+        .with_context(|| format!("store cover at {key}"))?;
+    repo::manga::set_cover_image_path(db, manga_id, &key)
+        .await
+        .with_context(|| format!("update cover_image_path for {manga_id}"))?;
+    tracing::info!(
+        manga_id = %manga_id,
+        key = %key,
+        bytes = bytes.len(),
+        %absolute,
+        "cover stored"
+    );
+    Ok(())
+}
+
+/// Extract `scheme://host[:port]` from an absolute URL.
+///
+/// The authority ends at the first `/`, `?` or `#`, so a URL without a
+/// path (`https://example.com?x=1`) does not leak its query into the
+/// origin. Any `user:pass@` prefix is dropped so credentials are never
+/// reported as part of the origin. Returns `None` when there is no `://`
+/// separator or when the scheme or host is empty.
+fn origin_of(url: &str) -> Option<String> {
+    let (scheme, rest) = url.split_once("://")?;
+    let end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
+    let authority = &rest[..end];
+    let host = authority.rsplit_once('@').map_or(authority, |(_, h)| h);
+    if scheme.is_empty() || host.is_empty() {
+        return None;
+    }
+    Some(format!("{scheme}://{host}"))
+}
diff --git a/backend/src/main.rs b/backend/src/main.rs index 1a82224..2d21685 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -12,10 +12,21 @@ async fn main() -> anyhow::Result<()> { let config = mangalord::config::Config::from_env()?; let addr: SocketAddr = config.bind_address.parse()?; - let app = mangalord::app::build(config).await?; + let mangalord::app::AppHandle { router, daemon } = mangalord::app::build(config).await?; tracing::info!(%addr, "mangalord listening"); let listener = tokio::net::TcpListener::bind(addr).await?; - axum::serve(listener, app).await?; + axum::serve(listener, router) + .with_graceful_shutdown(async { + let _ = tokio::signal::ctrl_c().await; + tracing::info!("ctrl-c received; shutting down"); + }) + .await?; + + // Drain background tasks (crawler daemon) before exiting so Chromium + // gets a clean shutdown rather than relying on kill-on-drop. + if let Some(d) = daemon { + d.shutdown().await; + } Ok(()) } diff --git a/backend/tests/crawler_daemon.rs b/backend/tests/crawler_daemon.rs new file mode 100644 index 0000000..8d8f71a --- /dev/null +++ b/backend/tests/crawler_daemon.rs @@ -0,0 +1,372 @@ +//! 
Integration tests for the crawler daemon's cron + worker pool. The
+//! daemon's full real path requires Chromium and a live source; here we
+//! test the seam (MetadataPass / ChapterDispatcher traits) and the
+//! cron/worker control-flow.
+
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+
+use chrono::NaiveTime;
+use chrono_tz::Tz;
+use mangalord::crawler::content::SyncOutcome;
+use mangalord::crawler::daemon::{
+    self, test_support::CountingMetadataPass, ChapterDispatcher, DaemonConfig, MetadataPass,
+    CRON_LOCK_KEY,
+};
+use mangalord::crawler::jobs::{self, JobPayload};
+use mangalord::crawler::pipeline;
+use serde_json::json;
+use sqlx::PgPool;
+use tokio_util::sync::CancellationToken;
+use uuid::Uuid;
+
+/// Daily slot used by every test config.
+fn far_future_daily_at() -> NaiveTime {
+    // Fixed at 23:59 (the configs below use UTC). Unless the suite happens
+    // to run in the last minute before that, the scheduler sleeps for the
+    // whole test once any catch-up tick is done.
+    NaiveTime::from_hms_opt(23, 59, 0).unwrap()
+}
+
+/// Build a `DaemonConfig` around the injected seams. `metadata_pass` and
+/// `dispatcher` are the trait-object stubs; everything else is a fixed
+/// test default.
+fn make_cfg(
+    metadata_pass: Option<Arc<dyn MetadataPass>>,
+    dispatcher: Arc<dyn ChapterDispatcher>,
+    session_expired: Arc<std::sync::atomic::AtomicBool>,
+    workers: usize,
+) -> DaemonConfig {
+    DaemonConfig {
+        metadata_pass,
+        dispatcher,
+        chapter_workers: workers,
+        daily_at: far_future_daily_at(),
+        tz: Tz::UTC,
+        retention_days: 7,
+        session_expired,
+        extra_tasks: Vec::new(),
+    }
+}
+
+/// Enqueue one `SyncChapterContent` job for a fresh random chapter id and
+/// return that id.
+async fn enqueue_chapter_job(pool: &PgPool) -> Uuid {
+    let chapter_id = Uuid::new_v4();
+    let payload = JobPayload::SyncChapterContent {
+        source_id: "target".into(),
+        chapter_id,
+        source_chapter_key: format!("ch-{chapter_id}"),
+    };
+    let res = jobs::enqueue(pool, &payload).await.unwrap();
+    match res {
+        jobs::EnqueueResult::Inserted(_) => chapter_id,
+        jobs::EnqueueResult::Skipped => unreachable!("fresh chapter_id"),
+    }
+}
+
+/// Number of `crawler_jobs` rows currently in `state`.
+async fn count_state(pool: &PgPool, state: &str) -> i64 {
+    sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM crawler_jobs WHERE state = $1")
+        .bind(state)
+        .fetch_one(pool)
+        .await
+        .unwrap()
+}
+
+/// Poll `cond` every 50 ms, up to 40 times (~2 s). Returns whether the
+/// condition held by the end, so callers can assert on it.
+async fn wait_until(mut cond: impl FnMut() -> bool) -> bool {
+    for _ in 0..40 {
+        if cond() {
+            return true;
+        }
+        tokio::time::sleep(Duration::from_millis(50)).await;
+    }
+    cond()
+}
+
+/// Dispatcher stub that counts calls and always reports one fetched page.
+struct AlwaysDoneDispatcher {
+    seen: AtomicUsize,
+}
+#[async_trait::async_trait]
+impl ChapterDispatcher for AlwaysDoneDispatcher {
+    async fn dispatch(&self, _payload: JobPayload) -> anyhow::Result<SyncOutcome> {
+        self.seen.fetch_add(1, Ordering::AcqRel);
+        Ok(SyncOutcome::Fetched { pages: 1 })
+    }
+}
+
+/// Dispatcher stub that counts calls and then panics.
+struct PanickingDispatcher {
+    seen: AtomicUsize,
+}
+#[async_trait::async_trait]
+impl ChapterDispatcher for PanickingDispatcher {
+    async fn dispatch(&self, _payload: JobPayload) -> anyhow::Result<SyncOutcome> {
+        self.seen.fetch_add(1, Ordering::AcqRel);
+        panic!("intentional dispatcher panic");
+    }
+}
+
+#[sqlx::test(migrations = "./migrations")]
+async fn workers_drain_jobs_through_dispatcher(pool: PgPool) {
+    enqueue_chapter_job(&pool).await;
+    enqueue_chapter_job(&pool).await;
+    enqueue_chapter_job(&pool).await;
+
+    let dispatcher = Arc::new(AlwaysDoneDispatcher {
+        seen: AtomicUsize::new(0),
+    });
+    let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false));
+    let cancel = CancellationToken::new();
+    let handle = daemon::spawn(
+        pool.clone(),
+        cancel.clone(),
+        make_cfg(None, dispatcher.clone(), session_expired, 2),
+    );
+
+    // Wait for the workers to drain all three jobs.
+    let drained = wait_until(|| dispatcher.seen.load(Ordering::Acquire) >= 3).await;
+    assert!(
+        drained,
+        "expected at least 3 dispatches, got {}",
+        dispatcher.seen.load(Ordering::Acquire)
+    );
+
+    handle.shutdown().await;
+    assert_eq!(count_state(&pool, "done").await, 3);
+}
+
+#[sqlx::test(migrations = "./migrations")]
+async fn workers_idle_while_session_expired(pool: PgPool) {
+    enqueue_chapter_job(&pool).await;
+    let dispatcher = Arc::new(AlwaysDoneDispatcher {
+        seen: AtomicUsize::new(0),
+    });
+    let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(true));
+    let cancel = CancellationToken::new();
+    let handle = daemon::spawn(
+        pool.clone(),
+        cancel.clone(),
+        make_cfg(None, dispatcher.clone(), Arc::clone(&session_expired), 1),
+    );
+
+    // Wait long enough that a non-idled worker would have leased and ack'd.
+    tokio::time::sleep(Duration::from_millis(800)).await;
+    assert_eq!(
+        dispatcher.seen.load(Ordering::Acquire),
+        0,
+        "dispatcher must not be invoked while session_expired flag is set"
+    );
+    assert_eq!(count_state(&pool, "pending").await, 1);
+
+    handle.shutdown().await;
+}
+
+#[sqlx::test(migrations = "./migrations")]
+async fn dispatcher_panic_is_contained_and_job_is_acked_failed(pool: PgPool) {
+    enqueue_chapter_job(&pool).await;
+    enqueue_chapter_job(&pool).await;
+
+    let dispatcher = Arc::new(PanickingDispatcher {
+        seen: AtomicUsize::new(0),
+    });
+    let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false));
+    let cancel = CancellationToken::new();
+    let handle = daemon::spawn(
+        pool.clone(),
+        cancel.clone(),
+        make_cfg(None, dispatcher.clone(), session_expired, 1),
+    );
+
+    // Wait for the worker to handle both panicking jobs.
+    let handled_both = wait_until(|| dispatcher.seen.load(Ordering::Acquire) >= 2).await;
+    assert!(
+        handled_both,
+        "worker must keep going after a panic — handled at least 2 jobs"
+    );
+
+    handle.shutdown().await;
+
+    // attempts=1 below max=5, so the panicking jobs go back to pending with
+    // backoff and `last_error = "worker panicked"`.
+    let last_errors: Vec<String> = sqlx::query_scalar(
+        "SELECT last_error FROM crawler_jobs WHERE last_error IS NOT NULL",
+    )
+    .fetch_all(&pool)
+    .await
+    .unwrap();
+    assert_eq!(last_errors.len(), 2);
+    assert!(last_errors.iter().all(|e| e == "worker panicked"));
+}
+
+#[sqlx::test(migrations = "./migrations")]
+async fn cron_skips_tick_when_advisory_lock_held(pool: PgPool) {
+    // With no last_metadata_tick_at row, the daemon does a catch-up tick
+    // immediately on spawn. We hold the advisory lock on a separate
+    // connection beforehand so the catch-up's pg_try_advisory_lock returns
+    // false and the tick must skip without invoking the metadata pass.
+    let mut lock_conn = pool.acquire().await.unwrap();
+    sqlx::query("SELECT pg_advisory_lock($1)")
+        .bind(CRON_LOCK_KEY)
+        .execute(&mut *lock_conn)
+        .await
+        .unwrap();
+
+    let counter = Arc::new(CountingMetadataPass::default());
+    let dispatcher = Arc::new(AlwaysDoneDispatcher {
+        seen: AtomicUsize::new(0),
+    });
+    let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false));
+    let cancel = CancellationToken::new();
+    // daily_at far in the future so after the (skipped) catch-up the
+    // cron sleeps for the rest of the test rather than racing for the lock.
+    let cfg = make_cfg(
+        Some(counter.clone() as Arc<dyn MetadataPass>),
+        dispatcher,
+        session_expired,
+        1,
+    );
+    let handle = daemon::spawn(pool.clone(), cancel.clone(), cfg);
+
+    // Negative assertion: a fixed wait, not a poll — nothing should happen.
+    tokio::time::sleep(Duration::from_millis(800)).await;
+    assert_eq!(
+        counter.count.load(Ordering::Acquire),
+        0,
+        "cron must skip the catch-up tick while the advisory lock is held"
+    );
+
+    sqlx::query("SELECT pg_advisory_unlock($1)")
+        .bind(CRON_LOCK_KEY)
+        .execute(&mut *lock_conn)
+        .await
+        .unwrap();
+    drop(lock_conn);
+
+    handle.shutdown().await;
+}
+
+#[sqlx::test(migrations = "./migrations")]
+async fn cron_catches_up_when_last_tick_is_stale(pool: PgPool) {
+    // Pre-seed last_metadata_tick_at well in the past so previous_fire(now)
+    // > last_tick is trivially true and the daemon catches up immediately.
+    sqlx::query(
+        "INSERT INTO crawler_state (key, value) VALUES ($1, $2)
+         ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value",
+    )
+    .bind("last_metadata_tick_at")
+    .bind(json!({"at": "2020-01-01T00:00:00Z"}))
+    .execute(&pool)
+    .await
+    .unwrap();
+
+    let counter = Arc::new(CountingMetadataPass::default());
+    let dispatcher = Arc::new(AlwaysDoneDispatcher {
+        seen: AtomicUsize::new(0),
+    });
+    let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false));
+    let cancel = CancellationToken::new();
+    let handle = daemon::spawn(
+        pool.clone(),
+        cancel.clone(),
+        make_cfg(
+            Some(counter.clone() as Arc<dyn MetadataPass>),
+            dispatcher,
+            session_expired,
+            1,
+        ),
+    );
+
+    let fired = wait_until(|| counter.count.load(Ordering::Acquire) >= 1).await;
+    assert!(fired, "catch-up tick should have fired immediately");
+
+    handle.shutdown().await;
+}
+
+#[sqlx::test(migrations = "./migrations")]
+async fn enqueue_bookmarked_pending_skips_dropped_sources(pool: PgPool) {
+    // Setup: one manga with two chapters (page_count = 0). One has a
+    // non-dropped source; the other's source is dropped. A user bookmarks
+    // the manga. Expectation: only the non-dropped chapter is enqueued.
+    let user_id: Uuid = sqlx::query_scalar(
+        "INSERT INTO users (username, password_hash) VALUES ($1, $2) RETURNING id",
+    )
+    .bind("alice")
+    .bind("not-a-real-hash")
+    .fetch_one(&pool)
+    .await
+    .unwrap();
+    let manga_id: Uuid = sqlx::query_scalar(
+        "INSERT INTO mangas (title) VALUES ($1) RETURNING id",
+    )
+    .bind("Berserk")
+    .fetch_one(&pool)
+    .await
+    .unwrap();
+    sqlx::query("INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING")
+        .bind("target")
+        .bind("Target")
+        .bind("https://example.com")
+        .execute(&pool)
+        .await
+        .unwrap();
+    let c1: Uuid = sqlx::query_scalar(
+        "INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, 1, 0) RETURNING id",
+    )
+    .bind(manga_id)
+    .fetch_one(&pool)
+    .await
+    .unwrap();
+    let c2: Uuid = sqlx::query_scalar(
+        "INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, 2, 0) RETURNING id",
+    )
+    .bind(manga_id)
+    .fetch_one(&pool)
+    .await
+    .unwrap();
+    // c1: alive source. c2: dropped source.
+    sqlx::query(
+        "INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url) \
+         VALUES ($1, $2, $3, $4)",
+    )
+    .bind("target")
+    .bind("ch1")
+    .bind(c1)
+    .bind("https://example.com/ch1")
+    .execute(&pool)
+    .await
+    .unwrap();
+    sqlx::query(
+        "INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url, dropped_at) \
+         VALUES ($1, $2, $3, $4, now())",
+    )
+    .bind("target")
+    .bind("ch2")
+    .bind(c2)
+    .bind("https://example.com/ch2")
+    .execute(&pool)
+    .await
+    .unwrap();
+    sqlx::query("INSERT INTO bookmarks (user_id, manga_id) VALUES ($1, $2)")
+        .bind(user_id)
+        .bind(manga_id)
+        .execute(&pool)
+        .await
+        .unwrap();
+
+    let summary = pipeline::enqueue_bookmarked_pending(&pool).await.unwrap();
+    assert_eq!(summary.inserted, 1, "only the non-dropped chapter enqueued");
+    assert_eq!(summary.skipped, 0);
+    assert_eq!(summary.failed, 0, "no enqueue call should have errored");
+    let payloads: Vec<serde_json::Value> = sqlx::query_scalar(
+        "SELECT payload FROM crawler_jobs WHERE payload->>'kind' = 'sync_chapter_content'",
+    )
+    .fetch_all(&pool)
+    .await
+    .unwrap();
+    assert_eq!(payloads.len(), 1);
+    assert_eq!(
+        payloads[0]["chapter_id"].as_str().unwrap(),
+        c1.to_string()
+    );
+}
+
diff --git a/frontend/package.json b/frontend/package.json index cb0535c..1b57de0 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "mangalord-frontend", - "version": "0.27.0", + "version": "0.28.0", "private": true, "type": "module", "scripts": {