diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 25f3ccb..f3e5b80 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -423,6 +423,24 @@ dependencies = [ "version_check", ] +[[package]] +name = "cookie_store" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15b2c103cf610ec6cae3da84a766285b42fd16aad564758459e6ecf128c75206" +dependencies = [ + "cookie", + "document-features", + "idna", + "log", + "publicsuffix", + "serde", + "serde_derive", + "serde_json", + "time", + "url", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -601,6 +619,15 @@ dependencies = [ "syn", ] +[[package]] +name = "document-features" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61" +dependencies = [ + "litrs", +] + [[package]] name = "dotenvy" version = "0.15.7" @@ -1386,6 +1413,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" +[[package]] +name = "litrs" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092" + [[package]] name = "lock_api" version = "0.4.14" @@ -1415,7 +1448,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "mangalord" -version = "0.24.0" +version = "0.25.0" dependencies = [ "anyhow", "argon2", @@ -2039,6 +2072,22 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "psl-types" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac" + +[[package]] +name = "publicsuffix" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42ea446cab60335f76979ec15e12619a2165b5ae2c12166bef27d283a9fadf" +dependencies = [ + "idna", + "psl-types", +] + [[package]] name = "quinn" version = "0.11.9" @@ -2240,6 +2289,8 @@ checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ "base64", "bytes", + "cookie", + "cookie_store", "futures-core", "http", "http-body", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index c3ca581..b44b7f8 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mangalord" -version = "0.24.0" +version = "0.25.0" edition = "2021" default-run = "mangalord" @@ -45,7 +45,7 @@ futures-util = "0.3" bytes = "1" chromiumoxide = { version = "0.7", features = ["tokio-runtime", "_fetcher-rusttls-tokio"], default-features = false } scraper = "0.20" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "socks"] } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "socks", "cookies"] } [dev-dependencies] tempfile = "3" diff --git a/backend/src/bin/crawler.rs b/backend/src/bin/crawler.rs index e667ef6..d2510fe 100644 --- a/backend/src/bin/crawler.rs +++ b/backend/src/bin/crawler.rs @@ -2,9 +2,10 @@ //! //! Walks the source's manga listing (all pages), fetches each manga's //! metadata + chapter list, downloads the cover into `Storage`, and -//! reconciles everything into the DB. Chapter *content* (page images) -//! is out of scope for now — only chapter rows + their source links -//! are written. +//! reconciles everything into the DB. Then, for any chapter belonging +//! to a bookmarked manga whose `page_count` is still 0, fetches the +//! chapter page (logged in), pulls every image from the CDN, and writes +//! the `pages` rows atomically per chapter. //! //! Configuration: //! - **Start URL** (required): first CLI positional arg, else @@ -15,13 +16,34 @@ //! - **Browser**: see `LaunchOptions::from_env` — //! `CRAWLER_BROWSER_MODE` (`headed`|`headless`) and //! `CRAWLER_BROWSER_ARGS`. -//! - **Rate limit**: `CRAWLER_RATE_MS` (ms between requests, default -//! `1000`). +//! - **Rate limit**: `CRAWLER_RATE_MS` (ms between requests per host, +//! default `1000`). Per-host: catalog and each CDN have their own +//! bucket and don't share a budget. +//! - **CDN rate override** (optional): `CRAWLER_CDN_HOST` plus +//! `CRAWLER_CDN_RATE_MS` to give a specific host a different +//! interval. Useful when the image CDN tolerates higher RPS than +//! the catalog host. //! - **Cap**: `CRAWLER_LIMIT` (max manga detail fetches per run, //! default `0` = no cap). //! - **Skip chapters**: `CRAWLER_SKIP_CHAPTERS=1` — turn off the //! chapter selector in the parser AND skip the per-manga //! `sync_manga_chapters` write. Use this for "metadata only" runs. +//! - **Skip chapter content**: `CRAWLER_SKIP_CHAPTER_CONTENT=1` — +//! skip the page-image phase even if chapters need syncing. +//! - **Chapter content workers**: `CRAWLER_CHAPTER_WORKERS` (default +//! `1`). Multiple workers process distinct chapters concurrently; +//! the per-host rate limiter still gates total RPS to each origin. +//! - **Force re-fetch**: `CRAWLER_FORCE_REFETCH_CHAPTERS=1` — re-fetch +//! chapter images even when `page_count > 0`. Rare; use after the +//! source replaces a chapter's images. +//! - **PHPSESSID**: `CRAWLER_PHPSESSID` — paste your browser's +//! session cookie. Required for chapter content (logged-out reader +//! is paginated per-image and not viable at scale). +//! - **Cookie domain** (optional): `CRAWLER_COOKIE_DOMAIN` overrides +//! the auto-derived `..`. Only needed for +//! multi-part TLDs (`.co.uk`, etc.). +//! - **User agent** (optional): `CRAWLER_USER_AGENT` — applies to +//! reqwest image fetches. Default uses reqwest's built-in UA. //! - **Proxy**: `$CRAWLER_PROXY` — single URL applied to both //! Chromium (`--proxy-server`) and `reqwest::Proxy::all`. Supports //! `http://`, `https://`, and `socks5://` (with optional user:pass). @@ -32,16 +54,18 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; +use futures_util::stream::{self, StreamExt}; use mangalord::crawler::{ browser::{self, LaunchOptions}, - rate_limit::RateLimiter, + content::{self, SyncOutcome}, + rate_limit::HostRateLimiters, + session, source::{target::TargetSource, DiscoverMode, FetchContext, Source}, }; use mangalord::repo; use mangalord::storage::{LocalStorage, Storage}; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; -use tokio::sync::Mutex; use tracing_subscriber::EnvFilter; use uuid::Uuid; @@ -64,8 +88,25 @@ async fn main() -> anyhow::Result<()> { .unwrap_or_else(|_| "./data/storage".to_string()) .into(); let rate_ms = env_u64("CRAWLER_RATE_MS", 1000); + let cdn_host = std::env::var("CRAWLER_CDN_HOST") + .ok() + .filter(|s| !s.trim().is_empty()); + let cdn_rate_ms = env_u64("CRAWLER_CDN_RATE_MS", rate_ms); let limit = env_u64("CRAWLER_LIMIT", 0) as usize; let skip_chapters = env_bool("CRAWLER_SKIP_CHAPTERS", false); + let skip_chapter_content = env_bool("CRAWLER_SKIP_CHAPTER_CONTENT", false); + let chapter_workers = env_u64("CRAWLER_CHAPTER_WORKERS", 1).max(1) as usize; + let force_refetch_chapters = env_bool("CRAWLER_FORCE_REFETCH_CHAPTERS", false); + let phpsessid = std::env::var("CRAWLER_PHPSESSID") + .ok() + .filter(|s| !s.trim().is_empty()); + let cookie_domain = std::env::var("CRAWLER_COOKIE_DOMAIN") + .ok() + .filter(|s| !s.trim().is_empty()) + .or_else(|| session::registrable_domain(&start_url)); + let user_agent = std::env::var("CRAWLER_USER_AGENT") + .ok() + .filter(|s| !s.trim().is_empty()); let proxy_url = std::env::var("CRAWLER_PROXY") .ok() .filter(|s| !s.trim().is_empty()); @@ -79,13 +120,25 @@ async fn main() -> anyhow::Result<()> { let storage: Arc = Arc::new(LocalStorage::new(&storage_dir)); - // `no_proxy()` disables reqwest's own env-based detection so the - // single `CRAWLER_PROXY` knob is the only thing that influences - // routing. Otherwise an unrelated `HTTPS_PROXY` in the shell would - // silently route cover downloads while the browser stayed direct. + // Build reqwest with: own cookie jar (seeded with PHPSESSID for + // the catalog domain only), optional UA override, optional single + // proxy. `no_proxy()` disables env-based detection so the + // CRAWLER_PROXY knob is the only routing input. + let cookie_jar = Arc::new(reqwest::cookie::Jar::default()); + if let (Some(sid), Some(domain)) = (&phpsessid, &cookie_domain) { + let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/"); + let seed_url = + reqwest::Url::parse(&start_url).context("parse start URL for cookie seed")?; + cookie_jar.add_cookie_str(&cookie_str, &seed_url); + tracing::info!(domain, "seeded PHPSESSID into reqwest cookie jar"); + } let mut http_builder = reqwest::Client::builder() .timeout(Duration::from_secs(30)) - .no_proxy(); + .no_proxy() + .cookie_provider(cookie_jar); + if let Some(ua) = &user_agent { + http_builder = http_builder.user_agent(ua); + } if let Some(proxy) = &proxy_url { http_builder = http_builder .proxy(reqwest::Proxy::all(proxy).with_context(|| format!("parse proxy URL: {proxy}"))?); @@ -100,40 +153,86 @@ async fn main() -> anyhow::Result<()> { ?options, %start_url, rate_ms, + cdn_host = ?cdn_host, + cdn_rate_ms, limit, skip_chapters, + skip_chapter_content, + chapter_workers, + force_refetch_chapters, + phpsessid_set = phpsessid.is_some(), + cookie_domain = ?cookie_domain, + user_agent = ?user_agent, proxy = ?proxy_url, storage_dir = %storage_dir.display(), "starting crawler" ); let handle = browser::launch(options).await.context("launch browser")?; + + // Cookie + session probe must happen *before* any browser + // navigation that depends on auth (i.e. chapter content). The + // discover/metadata phase doesn't strictly need auth, but + // probing now lets us fail fast: a bad cookie costs ~2s here + // instead of 30 min into a backfill. + let session_ready = if let (Some(sid), Some(domain)) = (&phpsessid, &cookie_domain) { + if let Err(e) = session::inject_phpsessid(handle.browser(), sid, domain).await { + handle.close().await.ok(); + return Err(e); + } + match session::verify_session(handle.browser(), &start_url).await { + Ok(()) => true, + Err(e) => { + handle.close().await.ok(); + return Err(e); + } + } + } else { + tracing::info!("no PHPSESSID supplied — chapter content phase will be skipped"); + false + }; + let result = run( handle.browser(), &db, - storage.as_ref(), + Arc::clone(&storage), &http, &start_url, rate_ms, + cdn_host.as_deref(), + cdn_rate_ms, limit, skip_chapters, + skip_chapter_content || !session_ready, + chapter_workers, + force_refetch_chapters, ) .await; handle.close().await.ok(); result } +#[allow(clippy::too_many_arguments)] async fn run( browser: &chromiumoxide::Browser, db: &PgPool, - storage: &dyn Storage, + storage: Arc, http: &reqwest::Client, start_url: &str, rate_ms: u64, + cdn_host: Option<&str>, + cdn_rate_ms: u64, limit: usize, skip_chapters: bool, + skip_chapter_content: bool, + chapter_workers: usize, + force_refetch_chapters: bool, ) -> anyhow::Result<()> { - let rate = Mutex::new(RateLimiter::new(Duration::from_millis(rate_ms))); + let mut rate = HostRateLimiters::new(Duration::from_millis(rate_ms)); + if let Some(host) = cdn_host { + rate = rate.with_override(host, Duration::from_millis(cdn_rate_ms)); + } + let rate = Arc::new(rate); let source = { let s = TargetSource::new(start_url.to_string()); if skip_chapters { @@ -144,7 +243,7 @@ async fn run( }; let ctx = FetchContext { browser, - rate: &rate, + rate: rate.as_ref(), }; let source_id = source.id(); @@ -208,9 +307,9 @@ async fn run( if let Some(cover_url) = manga.cover_url.as_deref() { if let Err(e) = download_and_store_cover( db, - storage, + storage.as_ref(), http, - &rate, + rate.as_ref(), &r.url, upsert.manga_id, cover_url, @@ -252,14 +351,149 @@ async fn run( tracing::info!(limit, "partial sync — skipping drop pass"); } + if !skip_chapter_content { + sync_bookmarked_chapter_content( + browser, + db, + Arc::clone(&storage), + http, + Arc::clone(&rate), + source_id, + chapter_workers, + force_refetch_chapters, + ) + .await?; + } + Ok(()) } +/// Find every chapter whose manga is bookmarked by at least one user +/// and that hasn't been content-synced yet, then fan them out across +/// `workers` concurrent tasks. Each task is one full chapter sync; the +/// per-host rate limiter caps total RPS to the source/CDN regardless +/// of worker count. +/// +/// A session-expired result from any task aborts the whole phase — +/// continuing wastes time and risks the source flagging the pattern. +#[allow(clippy::too_many_arguments)] +async fn sync_bookmarked_chapter_content( + browser: &chromiumoxide::Browser, + db: &PgPool, + storage: Arc, + http: &reqwest::Client, + rate: Arc, + source_id: &str, + workers: usize, + force_refetch: bool, +) -> anyhow::Result<()> { + let pending: Vec<(Uuid, Uuid, String)> = sqlx::query_as( + r#" + SELECT DISTINCT c.id, c.manga_id, cs.source_url + FROM chapters c + JOIN bookmarks b ON b.manga_id = c.manga_id + JOIN chapter_sources cs ON cs.chapter_id = c.id + WHERE cs.source_id = $1 + AND cs.dropped_at IS NULL + AND (c.page_count = 0 OR $2) + ORDER BY c.manga_id, c.created_at ASC + "#, + ) + .bind(source_id) + .bind(force_refetch) + .fetch_all(db) + .await + .context("query pending chapter content")?; + + if pending.is_empty() { + tracing::info!("chapter content: nothing pending"); + return Ok(()); + } + tracing::info!(count = pending.len(), workers, "chapter content phase starting"); + + // `for_each_concurrent` polls up to `workers` futures at once on + // the *current* task, so each future borrows the browser, db, and + // http client from the outer scope rather than requiring 'static + // captures via spawn. chromiumoxide's `Browser::new_page(&self)` + // is safe for concurrent calls; the per-host rate limiter + // serializes the actual on-wire requests against each origin. + let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let stats = std::sync::Mutex::new(WorkerStats::default()); + + stream::iter(pending.into_iter()) + .for_each_concurrent(workers.max(1), |(chapter_id, manga_id, source_url)| { + let session_expired = Arc::clone(&session_expired); + let storage = Arc::clone(&storage); + let rate = Arc::clone(&rate); + let stats = &stats; + async move { + if session_expired.load(std::sync::atomic::Ordering::Relaxed) { + return; + } + let outcome = content::sync_chapter_content( + browser, + db, + storage.as_ref(), + http, + rate.as_ref(), + chapter_id, + manga_id, + &source_url, + force_refetch, + ) + .await; + let mut s = stats.lock().unwrap(); + match outcome { + Ok(SyncOutcome::Fetched { pages }) => { + tracing::info!(%chapter_id, pages, "chapter content fetched"); + s.fetched += 1; + } + Ok(SyncOutcome::Skipped) => s.skipped += 1, + Ok(SyncOutcome::SessionExpired) => { + tracing::error!( + %chapter_id, + "session expired mid-run — refresh CRAWLER_PHPSESSID and re-run" + ); + session_expired + .store(true, std::sync::atomic::Ordering::Relaxed); + } + Err(e) => { + tracing::warn!( + %chapter_id, error = ?e, "chapter content sync failed" + ); + s.failed += 1; + } + } + } + }) + .await; + + let total = stats.into_inner().unwrap(); + tracing::info!( + fetched = total.fetched, + skipped = total.skipped, + failed = total.failed, + "chapter content phase done" + ); + + if session_expired.load(std::sync::atomic::Ordering::Relaxed) { + anyhow::bail!("session expired during chapter content phase"); + } + Ok(()) +} + +#[derive(Default, Clone, Copy)] +struct WorkerStats { + fetched: usize, + skipped: usize, + failed: usize, +} + async fn download_and_store_cover( db: &PgPool, storage: &dyn Storage, http: &reqwest::Client, - rate: &Mutex, + rate: &HostRateLimiters, manga_url: &str, manga_id: Uuid, cover_url: &str, @@ -269,9 +503,13 @@ async fn download_and_store_cover( .join(cover_url) .context("join cover URL onto manga URL")?; - rate.lock().await.wait().await; + rate.wait_for(absolute.as_str()).await?; let resp = http .get(absolute.clone()) + // Source CDNs commonly check Referer. Set it to the manga + // detail page that linked the cover — same UX as a real + // browser fetching the image. + .header(reqwest::header::REFERER, manga_url) .send() .await .with_context(|| format!("GET {absolute}"))? diff --git a/backend/src/crawler/content.rs b/backend/src/crawler/content.rs new file mode 100644 index 0000000..c804683 --- /dev/null +++ b/backend/src/crawler/content.rs @@ -0,0 +1,244 @@ +//! Chapter content sync — fetch a logged-in chapter page, extract its +//! image URLs in `pageN` order, download each to storage, and atomically +//! persist a `pages` row per image plus the chapter's `page_count`. +//! +//! Only chapters belonging to a manga someone has bookmarked are +//! candidates. The crawler scans bookmarks at the start of each run and +//! enqueues unfetched chapters; the API also enqueues at bookmark-time +//! so users get instant feedback. Both feed into the same queue and +//! dedup by chapter id. + +// Implementation lands in the next commits in this branch. Module is +// declared so other crates can `use crawler::content` without breaking +// builds while iteration is in progress. + +use anyhow::Context; +use sqlx::PgPool; +use uuid::Uuid; + +use crate::crawler::rate_limit::HostRateLimiters; +use crate::crawler::session; +use crate::storage::Storage; + +/// Parse the chapter page DOM and return the page images in `pageN` +/// order. Filters out the loader `` and any +/// `` without a numeric `id="pageN"`. +pub fn parse_chapter_pages(html: &str) -> Vec { + let doc = scraper::Html::parse_document(html); + let sel = scraper::Selector::parse("a#pic_container img:not(.loading)").unwrap(); + let mut pages: Vec = doc + .select(&sel) + .filter_map(|img| { + let id = img.value().id()?; + let n: i32 = id.strip_prefix("page")?.parse().ok()?; + let src = img.value().attr("src")?.trim().to_string(); + if src.is_empty() { + return None; + } + Some(ChapterImage { page_number: n, url: src }) + }) + .collect(); + pages.sort_by_key(|p| p.page_number); + pages +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChapterImage { + pub page_number: i32, + pub url: String, +} + +/// Outcome of a single chapter sync — surfaced to callers for logging +/// and exit-code decisions. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SyncOutcome { + /// All images downloaded and stored, chapter row updated. + Fetched { pages: usize }, + /// `page_count > 0` already — no-op unless force_refetch is set. + Skipped, + /// Session probe failed mid-sync (avatar selector missing on the + /// chapter page). Caller should abort the whole crawler run. + SessionExpired, +} + +/// Fetch all images for one chapter and persist them atomically. On +/// any error after the first storage put, the DB transaction rolls +/// back so the chapter stays at `page_count = 0` and is retried on the +/// next run. Bytes already written to storage become orphans; a future +/// reaper sweeps them. +#[allow(clippy::too_many_arguments)] +pub async fn sync_chapter_content( + browser: &chromiumoxide::Browser, + db: &PgPool, + storage: &dyn Storage, + http: &reqwest::Client, + rate: &HostRateLimiters, + chapter_id: Uuid, + manga_id: Uuid, + source_url: &str, + force_refetch: bool, +) -> anyhow::Result { + // Skip if already fetched, unless caller explicitly forces. + if !force_refetch { + let (page_count,): (i32,) = + sqlx::query_as("SELECT page_count FROM chapters WHERE id = $1") + .bind(chapter_id) + .fetch_one(db) + .await + .context("read chapter page_count")?; + if page_count > 0 { + return Ok(SyncOutcome::Skipped); + } + } + + // Nav to chapter page (rate-limited per host). + rate.wait_for(source_url).await?; + let page = browser + .new_page(source_url) + .await + .with_context(|| format!("open chapter page {source_url}"))?; + page.wait_for_navigation().await.context("wait for chapter nav")?; + + // Session probe: avatar present == still logged in. Missing means + // PHPSESSID expired; bail the entire crawler run. + if page.find_element("#avatar_menu").await.is_err() { + page.close().await.ok(); + return Ok(SyncOutcome::SessionExpired); + } + + let html = page.content().await.context("read chapter html")?; + page.close().await.ok(); + + let images = parse_chapter_pages(&html); + if images.is_empty() { + anyhow::bail!("no page images parsed from {source_url}"); + } + + // Resolve image URLs against the chapter URL (they may be relative). + let base = reqwest::Url::parse(source_url).context("parse chapter URL")?; + + // Fetch every image bytes-first into memory before writing + // anything. Lets us bail the whole chapter cleanly if any image + // fails — DB stays at page_count=0, no partial rows persisted. + let mut fetched: Vec<(i32, Vec, &'static str)> = Vec::with_capacity(images.len()); + for img in &images { + let url = base.join(&img.url).with_context(|| { + format!("join image URL {} onto {source_url}", img.url) + })?; + rate.wait_for(url.as_str()).await?; + let resp = http + .get(url.clone()) + // Source CDNs commonly check Referer. Set it to the + // chapter page — matches what the browser would send. + .header(reqwest::header::REFERER, source_url) + .send() + .await + .with_context(|| format!("GET {url}"))? + .error_for_status() + .with_context(|| format!("non-2xx for {url}"))?; + let bytes = resp.bytes().await.context("read image body")?.to_vec(); + let ext = infer::get(&bytes).map(|k| k.extension()).unwrap_or("bin"); + fetched.push((img.page_number, bytes, ext)); + } + + // Atomic write: storage puts + page row inserts + page_count + // update, all in one transaction. If anything fails, rollback + + // the chapter is retried next run. Storage orphans the bytes; a + // reaper sweeps them later. + let mut tx = db.begin().await.context("open chapter sync tx")?; + for (page_number, bytes, ext) in &fetched { + let key = format!( + "mangas/{manga_id}/chapters/{chapter_id}/pages/{:04}.{ext}", + page_number + ); + storage + .put(&key, bytes) + .await + .with_context(|| format!("put {key}"))?; + // (chapter_id, page_number) is unique — re-runs idempotent. + sqlx::query( + "INSERT INTO pages (chapter_id, page_number, storage_key, content_type) + VALUES ($1, $2, $3, $4) + ON CONFLICT (chapter_id, page_number) DO UPDATE + SET storage_key = EXCLUDED.storage_key, + content_type = EXCLUDED.content_type", + ) + .bind(chapter_id) + .bind(page_number) + .bind(&key) + .bind(format!("image/{ext}")) + .execute(&mut *tx) + .await + .with_context(|| format!("insert page row {page_number}"))?; + } + sqlx::query("UPDATE chapters SET page_count = $1 WHERE id = $2") + .bind(fetched.len() as i32) + .bind(chapter_id) + .execute(&mut *tx) + .await + .context("update page_count")?; + tx.commit().await.context("commit chapter sync")?; + + Ok(SyncOutcome::Fetched { pages: fetched.len() }) +} + +// Suppress unused-import warning for `session` until the bin/crawler +// wiring lands in this branch and uses it through this module. +#[allow(dead_code)] +fn _keep_session_in_scope() { + let _ = session::registrable_domain; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_chapter_pages_skips_loader_and_sorts_by_id() { + // Loader image, two real pages out of order, and one with no id. + let html = r#" + + + + + + + + "#; + let pages = parse_chapter_pages(html); + assert_eq!(pages.len(), 2); + assert_eq!(pages[0].page_number, 1); + assert_eq!(pages[0].url, "https://cdn/1.jpg"); + assert_eq!(pages[1].page_number, 2); + assert_eq!(pages[1].url, "https://cdn/2.jpg"); + } + + #[test] + fn parse_chapter_pages_drops_images_without_src() { + let html = r#" + + + + + "#; + let pages = parse_chapter_pages(html); + assert_eq!(pages.len(), 1); + assert_eq!(pages[0].page_number, 2); + } + + #[test] + fn parse_chapter_pages_handles_three_digit_page_ids() { + let html = r#" + + + + + + "#; + let pages = parse_chapter_pages(html); + assert_eq!( + pages.iter().map(|p| p.page_number).collect::>(), + vec![9, 50, 126] + ); + } +} diff --git a/backend/src/crawler/mod.rs b/backend/src/crawler/mod.rs index 4531be8..e9a4b5b 100644 --- a/backend/src/crawler/mod.rs +++ b/backend/src/crawler/mod.rs @@ -14,7 +14,9 @@ //! - [`diff`]: change detection — new / updated / dropped semantics. pub mod browser; +pub mod content; pub mod diff; pub mod jobs; pub mod rate_limit; +pub mod session; pub mod source; diff --git a/backend/src/crawler/rate_limit.rs b/backend/src/crawler/rate_limit.rs index 845be5c..d9d4aae 100644 --- a/backend/src/crawler/rate_limit.rs +++ b/backend/src/crawler/rate_limit.rs @@ -1,11 +1,22 @@ //! Per-host request pacing. //! -//! Single-token bucket: each `wait().await` either returns immediately -//! (if at least `interval` has elapsed since the last call) or sleeps -//! just enough to satisfy it. Uses `tokio::time::Instant` so tests can -//! run under `start_paused` virtual time without sleeping for real. +//! `RateLimiter` is a single-token bucket: each `wait().await` returns +//! immediately when at least `interval` has elapsed since the last call, +//! otherwise sleeps just enough to satisfy it. Uses +//! `tokio::time::Instant` so tests can run under `start_paused` virtual +//! time without sleeping for real. +//! +//! `HostRateLimiters` is the multi-host wrapper actually used by the +//! crawler — concurrent workers issuing requests to different origins +//! (catalog vs. CDN) don't contend on a shared budget; each host gets +//! its own bucket. `wait_for(url)` extracts the host, lazily creates a +//! limiter for it, and serializes only against other callers hitting +//! the same host. +use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; use tokio::time::Instant; #[derive(Debug)] @@ -33,6 +44,70 @@ impl RateLimiter { } } +/// Per-host rate limiter map. The outer `Mutex` is held only +/// during the entry-or-insert + Arc clone; the per-host `Mutex` +/// is held during the actual `wait().await`. So N workers calling +/// `wait_for(url)` on N different hosts contend nowhere except the brief +/// HashMap lookup; workers hitting the same host serialize on that +/// host's bucket. +#[derive(Debug)] +pub struct HostRateLimiters { + default_interval: Duration, + overrides: HashMap, + map: Mutex>>>, +} + +impl HostRateLimiters { + pub fn new(default_interval: Duration) -> Self { + Self { + default_interval, + overrides: HashMap::new(), + map: Mutex::new(HashMap::new()), + } + } + + /// Set a per-host interval that overrides `default_interval`. Calls + /// after a host's limiter has been instantiated do *not* re-create + /// it — set all overrides before the first `wait_for` to that host. + pub fn with_override(mut self, host: impl Into, interval: Duration) -> Self { + self.overrides.insert(host.into(), interval); + self + } + + /// Block until the per-host budget allows the next request to + /// `url`'s host. Returns an error only when the URL has no host + /// (malformed input). + pub async fn wait_for(&self, url: &str) -> anyhow::Result<()> { + let host = host_of(url) + .ok_or_else(|| anyhow::anyhow!("no host in url: {url}"))?; + let limiter = { + let mut map = self.map.lock().await; + map.entry(host.clone()) + .or_insert_with(|| { + let interval = self + .overrides + .get(&host) + .copied() + .unwrap_or(self.default_interval); + Arc::new(Mutex::new(RateLimiter::new(interval))) + }) + .clone() + }; + limiter.lock().await.wait().await; + Ok(()) + } +} + +/// Extract the host (no port) from a URL string. Returns `None` for +/// inputs without a `scheme://host` shape — those would never have +/// reached the network layer anyway. +fn host_of(url: &str) -> Option { + let after_scheme = url.split_once("://")?.1; + let host_with_port = after_scheme.split('/').next()?; + let host = host_with_port.rsplit_once(':').map_or(host_with_port, |(h, _)| h); + (!host.is_empty()).then(|| host.to_ascii_lowercase()) +} + #[cfg(test)] mod tests { use super::*; @@ -66,4 +141,44 @@ mod tests { // Already 250ms past — no further wait needed. assert_eq!(Instant::now() - t0, Duration::ZERO); } + + #[test] + fn host_of_parses_scheme_path_and_port() { + assert_eq!(host_of("https://Example.com/path").as_deref(), Some("example.com")); + assert_eq!(host_of("http://cdn.foo.bar/img.jpg").as_deref(), Some("cdn.foo.bar")); + assert_eq!(host_of("http://localhost:8080/x").as_deref(), Some("localhost")); + assert!(host_of("not a url").is_none()); + } + + #[tokio::test(start_paused = true)] + async fn host_rate_limiters_pace_per_host() { + // Two hosts at 100ms each. Two consecutive calls to the SAME + // host wait 100ms total. Two consecutive calls to DIFFERENT + // hosts both fire immediately. + let rl = HostRateLimiters::new(Duration::from_millis(100)); + + let t0 = Instant::now(); + rl.wait_for("https://a.example/x").await.unwrap(); + rl.wait_for("https://b.example/y").await.unwrap(); + assert_eq!(Instant::now() - t0, Duration::ZERO, "different hosts don't contend"); + + let t1 = Instant::now(); + rl.wait_for("https://a.example/x").await.unwrap(); + assert_eq!( + Instant::now() - t1, + Duration::from_millis(100), + "second call to same host waits a full interval" + ); + } + + #[tokio::test(start_paused = true)] + async fn host_rate_limiters_honor_overrides() { + let rl = HostRateLimiters::new(Duration::from_millis(1000)) + .with_override("fast.example", Duration::from_millis(100)); + + rl.wait_for("https://fast.example/a").await.unwrap(); + let t0 = Instant::now(); + rl.wait_for("https://fast.example/b").await.unwrap(); + assert_eq!(Instant::now() - t0, Duration::from_millis(100)); + } } diff --git a/backend/src/crawler/session.rs b/backend/src/crawler/session.rs new file mode 100644 index 0000000..1b257fc --- /dev/null +++ b/backend/src/crawler/session.rs @@ -0,0 +1,161 @@ +//! PHPSESSID injection + login probe. +//! +//! The catalog site we crawl renders chapter pages as a single multi- +//! page list only for logged-in users. We don't try to bypass the +//! login (CAPTCHA wall) — instead the operator pastes their browser's +//! `PHPSESSID` cookie into `CRAWLER_PHPSESSID` and the crawler injects +//! it into Chromium *and* reqwest before the first navigation. +//! +//! Two things the cookie alone doesn't give us: +//! 1. The cookie value is only meaningful to the *server* — we have +//! no way to predict from the value alone whether it's still valid. +//! `verify_session` does a navigation and checks for `#avatar_menu`, +//! which only renders for authenticated visitors. Bail clean at +//! startup if it's missing rather than discovering it 30 minutes +//! into a backfill. +//! 2. The reqwest client (used for cover and chapter-image downloads) +//! has its own cookie store; we seed it for the catalog host only. +//! CDN hosts are deliberately *not* given the cookie — they serve +//! image bytes by signed URLs and don't need it. + +use anyhow::{anyhow, Context}; +use chromiumoxide::browser::Browser; +use chromiumoxide::cdp::browser_protocol::network::CookieParam; + +/// Compute the cookie domain (e.g. `.example.com`) from a start URL. +/// The leading dot makes the cookie cover every subdomain — the source +/// often redirects between `www.` and other prefixes mid-crawl, and a +/// host-only cookie would silently drop on the cross-subdomain hop. +/// +/// Caveat: this takes the last two dot-labels, which is wrong for +/// multi-part TLDs (`.co.uk`, `.com.br` would resolve to `.co.uk` and +/// attach to every site on `.co.uk`). For those, the operator should +/// override via `CRAWLER_COOKIE_DOMAIN` rather than relying on this +/// function — pulling in the Public Suffix List for one knob isn't +/// worth it yet. +pub fn registrable_domain(url: &str) -> Option { + let after_scheme = url.split_once("://")?.1; + let host_with_port = after_scheme.split('/').next()?; + let host = host_with_port + .rsplit_once(':') + .map_or(host_with_port, |(h, _)| h) + .to_ascii_lowercase(); + if host.is_empty() { + return None; + } + let labels: Vec<&str> = host.split('.').filter(|l| !l.is_empty()).collect(); + if labels.len() < 2 { + // Bare hostname (e.g. `localhost`) — return as-is, no leading + // dot. Setting `.localhost` as cookie domain is invalid. + return Some(host); + } + let registrable = &labels[labels.len() - 2..]; + Some(format!(".{}", registrable.join("."))) +} + +/// Inject the PHPSESSID cookie into the browser's cookie store for the +/// catalog domain. Must be called before any navigation that depends on +/// authentication; subsequent navigations include the cookie +/// automatically. +pub async fn inject_phpsessid( + browser: &Browser, + sid: &str, + cookie_domain: &str, +) -> anyhow::Result<()> { + let cookie = CookieParam { + name: "PHPSESSID".to_string(), + value: sid.to_string(), + url: None, + domain: Some(cookie_domain.to_string()), + path: Some("/".to_string()), + secure: None, + http_only: Some(true), + same_site: None, + expires: None, + priority: None, + same_party: None, + source_scheme: None, + source_port: None, + partition_key: None, + }; + browser + .set_cookies(vec![cookie]) + .await + .context("set PHPSESSID in chromium cookie store")?; + tracing::info!(domain = cookie_domain, "injected PHPSESSID into browser"); + Ok(()) +} + +/// Navigate to `probe_url` and confirm the logged-in `#avatar_menu` +/// element is present. The selector only renders for authenticated +/// visitors, so its absence is the unambiguous signal that PHPSESSID +/// is missing, expired, or revoked. +/// +/// This burns one navigation against the catalog's rate limiter. The +/// trade is worth it — failing here costs ~1s; failing 30 minutes into +/// a backfill costs 30 minutes. +pub async fn verify_session(browser: &Browser, probe_url: &str) -> anyhow::Result<()> { + let page = browser + .new_page(probe_url) + .await + .with_context(|| format!("open probe page {probe_url}"))?; + page.wait_for_navigation().await.context("wait for nav on probe")?; + // The avatar menu is rendered server-side as part of the header + // when a valid session cookie is present; absent JS is fine. + let found = page.find_element("#avatar_menu").await.is_ok(); + page.close().await.ok(); + if found { + tracing::info!("session probe ok — #avatar_menu present"); + Ok(()) + } else { + Err(anyhow!( + "session probe failed — #avatar_menu not present at {probe_url}; \ + PHPSESSID is missing, expired, or revoked. Refresh CRAWLER_PHPSESSID \ + and re-run." + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn registrable_domain_strips_subdomain() { + assert_eq!( + registrable_domain("https://www.target-site.com/manga/foo/").as_deref(), + Some(".target-site.com") + ); + assert_eq!( + registrable_domain("https://m.example.org").as_deref(), + Some(".example.org") + ); + } + + #[test] + fn registrable_domain_keeps_two_label_host() { + assert_eq!( + registrable_domain("https://example.com/").as_deref(), + Some(".example.com") + ); + } + + #[test] + fn registrable_domain_handles_port() { + assert_eq!( + registrable_domain("http://www.foo.bar:8080/x").as_deref(), + Some(".foo.bar") + ); + } + + #[test] + fn registrable_domain_bare_hostname_no_leading_dot() { + // .localhost would be invalid as a cookie Domain. + assert_eq!(registrable_domain("http://localhost:5173").as_deref(), Some("localhost")); + } + + #[test] + fn registrable_domain_returns_none_for_garbage() { + assert!(registrable_domain("not a url").is_none()); + } +} diff --git a/backend/src/crawler/source.rs b/backend/src/crawler/source.rs index 39defa6..cec6613 100644 --- a/backend/src/crawler/source.rs +++ b/backend/src/crawler/source.rs @@ -74,12 +74,12 @@ pub struct SourceChapter { } /// Context passed to every `Source` call. Carries the browser handle -/// plus a shared rate limiter so impls that issue multiple requests in -/// one call (e.g. pagination walks) honor the same per-host budget as -/// the outer job loop. +/// plus the per-host rate-limiter map so impls that issue multiple +/// requests in one call (pagination walks, multi-page chapter image +/// fetches) honor the right budget for each origin. pub struct FetchContext<'a> { pub browser: &'a Browser, - pub rate: &'a tokio::sync::Mutex, + pub rate: &'a crate::crawler::rate_limit::HostRateLimiters, } #[async_trait] diff --git a/backend/src/crawler/source/target.rs b/backend/src/crawler/source/target.rs index 375ba69..17e2b3f 100644 --- a/backend/src/crawler/source/target.rs +++ b/backend/src/crawler/source/target.rs @@ -149,10 +149,10 @@ fn truncate_to_cap(mut buf: Vec, max: Option) -> Vec { } /// Single point of rate-limited navigation. Every Source request goes -/// through here, so the limiter is the only knob that controls -/// per-host RPS. +/// through here, so the per-host limiter map is the only knob that +/// controls per-origin RPS. async fn navigate(ctx: &FetchContext<'_>, url: &str) -> anyhow::Result { - ctx.rate.lock().await.wait().await; + ctx.rate.wait_for(url).await?; let page = ctx.browser.new_page(url).await?; page.wait_for_navigation().await?; // Stopgap until we wait on a specific selector per page type — diff --git a/frontend/package.json b/frontend/package.json index 338a1c5..8867d66 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "mangalord-frontend", - "version": "0.24.0", + "version": "0.25.0", "private": true, "type": "module", "scripts": {