use std::sync::Arc; use std::sync::atomic::AtomicBool; use anyhow::Context; use async_trait::async_trait; use axum::extract::DefaultBodyLimit; use axum::http::{HeaderName, HeaderValue, Method}; use axum::Router; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; use tokio_util::sync::CancellationToken; use tower_http::cors::{AllowOrigin, CorsLayer}; use tower_http::trace::TraceLayer; use crate::config::{AuthConfig, Config, CrawlerConfig, CrawlerModePref, UploadConfig}; use crate::crawler::browser_manager::{self, BrowserManager}; use crate::crawler::content::{self, SyncOutcome}; use crate::crawler::daemon::{self, ChapterDispatcher, DaemonConfig, MetadataPass}; use crate::crawler::jobs::JobPayload; use crate::crawler::pipeline::{self, MetadataStats}; use crate::crawler::rate_limit::HostRateLimiters; use crate::crawler::safety::DownloadAllowlist; use crate::crawler::session; use crate::crawler::source::{target as target_source, DiscoverMode}; use crate::repo; use crate::storage::{LocalStorage, Storage}; #[derive(Clone)] pub struct AppState { pub db: PgPool, pub storage: Arc, pub auth: AuthConfig, pub upload: UploadConfig, } /// Bundle returned by [`build`]. The router is what `axum::serve` consumes; /// the daemon (when enabled) outlives the HTTP server and is awaited via /// [`AppHandle::shutdown`] after the listener has finished gracefully. pub struct AppHandle { pub router: Router, pub daemon: Option, } impl AppHandle { pub async fn shutdown(self) { if let Some(d) = self.daemon { d.shutdown().await; } } } pub async fn build(config: Config) -> anyhow::Result { let db = PgPoolOptions::new() .max_connections(10) .connect(&config.database_url) .await?; sqlx::migrate!("./migrations").run(&db).await?; let storage: Arc = Arc::new(LocalStorage::new(config.storage_dir.clone())); let daemon = if config.crawler.daemon_enabled { Some(spawn_crawler_daemon(db.clone(), Arc::clone(&storage), &config.crawler).await?) } else { tracing::info!("crawler daemon disabled (CRAWLER_DAEMON=false)"); None }; let state = AppState { db, storage, auth: config.auth.clone(), upload: config.upload.clone(), }; let router = router(state).layer(cors_layer(&config.cors_allowed_origins)); Ok(AppHandle { router, daemon }) } async fn spawn_crawler_daemon( db: PgPool, storage: Arc, cfg: &CrawlerConfig, ) -> anyhow::Result { // Reqwest client with cookie jar pre-seeded so CDN image fetches // include PHPSESSID. Same shape as bin/crawler.rs main(). let cookie_jar = Arc::new(reqwest::cookie::Jar::default()); if let (Some(sid), Some(domain), Some(start_url)) = (&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url) { let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/"); let seed_url = reqwest::Url::parse(start_url) .context("parse CRAWLER_START_URL for cookie seed")?; cookie_jar.add_cookie_str(&cookie_str, &seed_url); } let mut http_builder = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(30)) .no_proxy() .cookie_provider(cookie_jar); if let Some(ua) = &cfg.user_agent { http_builder = http_builder.user_agent(ua); } if let Some(proxy) = &cfg.proxy { http_builder = http_builder .proxy(reqwest::Proxy::all(proxy).with_context(|| format!("parse proxy: {proxy}"))?); } let http = http_builder.build().context("build crawler reqwest")?; let mut rate = HostRateLimiters::new(std::time::Duration::from_millis(cfg.rate_ms)); if let Some(host) = &cfg.cdn_host { rate = rate.with_override(host, std::time::Duration::from_millis(cfg.cdn_rate_ms)); } let rate = Arc::new(rate); // Browser manager. on_launch re-injects PHPSESSID on every fresh // chromium spawn so an idle teardown followed by re-launch stays // authenticated without operator action. let mut launch_opts = cfg.browser.clone(); if let Some(proxy) = &cfg.proxy { launch_opts.extra_args.push(format!("--proxy-server={proxy}")); } let on_launch = match (&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url) { (Some(sid), Some(domain), Some(start_url)) => { let sid = sid.clone(); let domain = domain.clone(); let start_url = start_url.clone(); let on_launch: browser_manager::OnLaunch = Arc::new(move |browser| { let sid = sid.clone(); let domain = domain.clone(); let start_url = start_url.clone(); Box::pin(async move { session::inject_phpsessid(&browser, &sid, &domain) .await .context("on_launch: inject_phpsessid")?; session::verify_session(&browser, &start_url) .await .context("on_launch: verify_session")?; Ok(()) }) }); on_launch } _ => browser_manager::noop_on_launch(), }; let browser_manager = BrowserManager::new(launch_opts, cfg.idle_timeout, on_launch); let session_expired = Arc::new(AtomicBool::new(false)); let metadata_pass: Option> = cfg.start_url.as_ref().map(|url| { let m: Arc = Arc::new(RealMetadataPass { browser_manager: Arc::clone(&browser_manager), db: db.clone(), storage: Arc::clone(&storage), http: http.clone(), rate: Arc::clone(&rate), start_url: url.clone(), mode_pref: cfg.mode, incremental_stop_after: cfg.incremental_stop_after, download_allowlist: cfg.download_allowlist.clone(), max_image_bytes: cfg.max_image_bytes, }); m }); let dispatcher: Arc = Arc::new(RealChapterDispatcher { browser_manager: Arc::clone(&browser_manager), db: db.clone(), storage: Arc::clone(&storage), http, rate: Arc::clone(&rate), download_allowlist: cfg.download_allowlist.clone(), max_image_bytes: cfg.max_image_bytes, }); // Shared cancellation: daemon shutdown cancels the BrowserManager's // idle reaper too. Reaper itself is added to the daemon's extra_tasks // so DaemonHandle::shutdown awaits its completion. let cancel = CancellationToken::new(); let reaper_task = browser_manager::spawn_idle_reaper( Arc::clone(&browser_manager), cancel.clone(), ); // Also close the browser explicitly on shutdown so we don't rely on // kill-on-drop when other Arc holders may still exist. let shutdown_task = { let cancel = cancel.clone(); let mgr = Arc::clone(&browser_manager); tokio::spawn(async move { cancel.cancelled().await; mgr.shutdown().await; }) }; let daemon_handle = daemon::spawn( db, cancel, DaemonConfig { metadata_pass, dispatcher, chapter_workers: cfg.chapter_workers, daily_at: cfg.daily_at, tz: cfg.tz, retention_days: cfg.retention_days, session_expired, extra_tasks: vec![reaper_task, shutdown_task], }, ); Ok(daemon_handle) } // Real impls of the daemon traits, owning the browser manager + I/O. Kept // in app.rs because they need the same builder-side env wiring that // AppState gets — the daemon module itself stays free of reqwest / storage // details so its tests don't pull them in. struct RealMetadataPass { browser_manager: Arc, db: PgPool, storage: Arc, http: reqwest::Client, rate: Arc, start_url: String, mode_pref: CrawlerModePref, incremental_stop_after: usize, download_allowlist: DownloadAllowlist, max_image_bytes: usize, } #[async_trait] impl MetadataPass for RealMetadataPass { async fn run(&self) -> anyhow::Result { let mode = resolve_mode( &self.db, target_source::SOURCE_ID, self.mode_pref, self.incremental_stop_after, ) .await?; pipeline::run_metadata_pass( &self.browser_manager, &self.db, self.storage.as_ref(), &self.http, &self.rate, &self.start_url, 0, false, mode, &self.download_allowlist, self.max_image_bytes, ) .await } } /// Pick the active mode for this tick. `Explicit` short-circuits the /// DB lookup. `Auto` reads `seed_completed_at`: missing → Backfill /// (initial seed for this source), present → Incremental with the /// configured threshold. /// /// A DB error during the Auto lookup propagates as `Err` rather than /// silently degrading to Backfill — the daemon's `run_tick` catches /// the error, logs, and skips the tick. That's safer than running a /// full re-backfill (including a drop pass against stale-looking rows) /// when the DB is flaky. async fn resolve_mode( db: &PgPool, source_id: &str, pref: CrawlerModePref, incremental_stop_after: usize, ) -> anyhow::Result { match pref { CrawlerModePref::Explicit(m) => { tracing::info!(?m, "crawler mode: explicit (CRAWLER_MODE override)"); Ok(m) } CrawlerModePref::Auto => { let seeded = repo::crawler::seed_completed_at(db, source_id) .await .context("seed_completed_at lookup for mode auto-detection")?; match seeded { Some(at) => { tracing::info!( seed_completed_at = %at.to_rfc3339(), "crawler mode: auto → incremental (seed previously completed)" ); Ok(DiscoverMode::Incremental { stop_after_unchanged: incremental_stop_after, }) } None => { tracing::info!("crawler mode: auto → backfill (no seed marker for source)"); Ok(DiscoverMode::Backfill) } } } } } struct RealChapterDispatcher { browser_manager: Arc, db: PgPool, storage: Arc, http: reqwest::Client, rate: Arc, download_allowlist: DownloadAllowlist, max_image_bytes: usize, } #[async_trait] impl ChapterDispatcher for RealChapterDispatcher { async fn dispatch(&self, payload: JobPayload) -> anyhow::Result { match payload { JobPayload::SyncChapterContent { source_id: _, chapter_id, source_chapter_key: _, } => { let row = repo::chapter::dispatch_target(&self.db, chapter_id) .await .context("look up chapter for dispatch")?; let Some((manga_id, source_url)) = row else { // Chapter (or its source row) is gone — ack done. return Ok(SyncOutcome::Skipped); }; let lease = self.browser_manager.acquire().await?; let outcome = content::sync_chapter_content( &lease, &self.db, self.storage.as_ref(), &self.http, &self.rate, chapter_id, manga_id, &source_url, false, &self.download_allowlist, self.max_image_bytes, ) .await?; drop(lease); Ok(outcome) } // Other payload kinds aren't dispatched by this daemon yet — // metadata-driven jobs (Discover/SyncManga/SyncChapterList) // are handled inline by the cron's metadata pass. _ => Ok(SyncOutcome::Skipped), } } } /// Build a router from a pre-assembled state. Used by integration tests /// so they can swap in a test DB pool and a `tempfile`-backed storage. pub fn router(state: AppState) -> Router { let max_request_bytes = state.upload.max_request_bytes; Router::new() .nest("/api/v1", crate::api::routes()) .layer(DefaultBodyLimit::max(max_request_bytes)) .with_state(state) .layer(TraceLayer::new_for_http()) } pub(crate) fn cors_layer(allowed_origins: &[String]) -> CorsLayer { if allowed_origins.is_empty() { // Same-origin only — no CORS headers emitted. return CorsLayer::new(); } let origins: Vec = allowed_origins .iter() .filter_map(|o| HeaderValue::from_str(o).ok()) .collect(); CorsLayer::new() .allow_origin(AllowOrigin::list(origins)) .allow_credentials(true) .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE]) .allow_headers([ HeaderName::from_static("content-type"), HeaderName::from_static("authorization"), ]) } #[cfg(test)] mod tests { use super::*; use axum::body::Body; use axum::http::Request; use axum::routing::get; use tower::ServiceExt; fn test_router() -> Router { Router::new().route("/", get(|| async { "ok" })) } #[tokio::test] async fn allowlist_preflight_emits_credentialed_headers() { let app = test_router().layer(cors_layer(&["https://app.example.com".to_string()])); let resp = app .oneshot( Request::builder() .method(Method::OPTIONS) .uri("/") .header("origin", "https://app.example.com") .header("access-control-request-method", "POST") .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert_eq!( resp.headers().get("access-control-allow-origin").unwrap(), "https://app.example.com" ); assert_eq!( resp.headers().get("access-control-allow-credentials").unwrap(), "true" ); } #[tokio::test] async fn allowlist_rejects_unlisted_origin() { let app = test_router().layer(cors_layer(&["https://app.example.com".to_string()])); let resp = app .oneshot( Request::builder() .method(Method::OPTIONS) .uri("/") .header("origin", "https://evil.example.org") .header("access-control-request-method", "POST") .body(Body::empty()) .unwrap(), ) .await .unwrap(); // Browsers will refuse the response when the allow-origin header // is absent (or doesn't echo the requesting origin). assert!(resp.headers().get("access-control-allow-origin").is_none()); } #[tokio::test] async fn empty_allowlist_is_same_origin_only() { let app = test_router().layer(cors_layer(&[])); let resp = app .oneshot( Request::builder() .method(Method::OPTIONS) .uri("/") .header("origin", "https://app.example.com") .header("access-control-request-method", "POST") .body(Body::empty()) .unwrap(), ) .await .unwrap(); assert!(resp.headers().get("access-control-allow-origin").is_none()); assert!(resp.headers().get("access-control-allow-credentials").is_none()); } }