//! Chapter content sync — fetch a logged-in chapter page, extract its //! image URLs in `pageN` order, download each to storage, and atomically //! persist a `pages` row per image plus the chapter's `page_count`. //! //! Only chapters belonging to a manga someone has bookmarked are //! candidates. The crawler scans bookmarks at the start of each run and //! enqueues unfetched chapters; the API also enqueues at bookmark-time //! so users get instant feedback. Both feed into the same queue and //! dedup by chapter id. // Implementation lands in the next commits in this branch. Module is // declared so other crates can `use crawler::content` without breaking // builds while iteration is in progress. use anyhow::Context; use sqlx::PgPool; use uuid::Uuid; use crate::crawler::detect::PageError; use crate::crawler::rate_limit::HostRateLimiters; use crate::crawler::session; use crate::storage::Storage; /// Parse the chapter page DOM and return the page images in `pageN` /// order. Filters out the loader `` and any /// `` without a numeric `id="pageN"`. /// /// Reader pages don't render the site's `#logo` element, so the /// universal logo-sentinel can't apply here — instead we assert /// `a#pic_container` is present. Its absence means the response is the /// transient broken-page response (or a redirect to some other layout) /// and the caller should retry. pub fn parse_chapter_pages(html: &str) -> Result, PageError> { let doc = scraper::Html::parse_document(html); let container_sel = scraper::Selector::parse("a#pic_container").unwrap(); if doc.select(&container_sel).next().is_none() { return Err(PageError::transient("reader: a#pic_container missing")); } let sel = scraper::Selector::parse("a#pic_container img:not(.loading)").unwrap(); let mut pages: Vec = doc .select(&sel) .filter_map(|img| { let id = img.value().id()?; let n: i32 = id.strip_prefix("page")?.parse().ok()?; let src = img.value().attr("src")?.trim().to_string(); if src.is_empty() { return None; } Some(ChapterImage { page_number: n, url: src }) }) .collect(); pages.sort_by_key(|p| p.page_number); Ok(pages) } #[derive(Debug, Clone, PartialEq, Eq)] pub struct ChapterImage { pub page_number: i32, pub url: String, } /// Outcome of a single chapter sync — surfaced to callers for logging /// and exit-code decisions. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum SyncOutcome { /// All images downloaded and stored, chapter row updated. Fetched { pages: usize }, /// `page_count > 0` already — no-op unless force_refetch is set. Skipped, /// Session probe failed mid-sync (avatar selector missing on the /// chapter page). Caller should abort the whole crawler run. SessionExpired, } /// Fetch all images for one chapter and persist them atomically. On /// any error after the first storage put, the DB transaction rolls /// back so the chapter stays at `page_count = 0` and is retried on the /// next run. Bytes already written to storage become orphans; a future /// reaper sweeps them. #[allow(clippy::too_many_arguments)] pub async fn sync_chapter_content( browser: &chromiumoxide::Browser, db: &PgPool, storage: &dyn Storage, http: &reqwest::Client, rate: &HostRateLimiters, chapter_id: Uuid, manga_id: Uuid, source_url: &str, force_refetch: bool, ) -> anyhow::Result { // Skip if already fetched, unless caller explicitly forces. if !force_refetch { let (page_count,): (i32,) = sqlx::query_as("SELECT page_count FROM chapters WHERE id = $1") .bind(chapter_id) .fetch_one(db) .await .context("read chapter page_count")?; if page_count > 0 { return Ok(SyncOutcome::Skipped); } } // Nav to chapter page (rate-limited per host). rate.wait_for(source_url).await?; let page = browser .new_page(source_url) .await .with_context(|| format!("open chapter page {source_url}"))?; page.wait_for_navigation().await.context("wait for chapter nav")?; // Session probe: avatar present == still logged in. Missing means // PHPSESSID expired; bail the entire crawler run. if page.find_element("#avatar_menu").await.is_err() { page.close().await.ok(); return Ok(SyncOutcome::SessionExpired); } let html = page.content().await.context("read chapter html")?; page.close().await.ok(); let images = parse_chapter_pages(&html) .with_context(|| format!("parse chapter pages at {source_url}"))?; if images.is_empty() { anyhow::bail!("no page images parsed from {source_url}"); } // Resolve image URLs against the chapter URL (they may be relative). let base = reqwest::Url::parse(source_url).context("parse chapter URL")?; // Fetch every image bytes-first into memory before writing // anything. Lets us bail the whole chapter cleanly if any image // fails — DB stays at page_count=0, no partial rows persisted. let mut fetched: Vec<(i32, Vec, &'static str)> = Vec::with_capacity(images.len()); for img in &images { let url = base.join(&img.url).with_context(|| { format!("join image URL {} onto {source_url}", img.url) })?; rate.wait_for(url.as_str()).await?; let resp = http .get(url.clone()) // Source CDNs commonly check Referer. Set it to the // chapter page — matches what the browser would send. .header(reqwest::header::REFERER, source_url) .send() .await .with_context(|| format!("GET {url}"))? .error_for_status() .with_context(|| format!("non-2xx for {url}"))?; let bytes = resp.bytes().await.context("read image body")?.to_vec(); let ext = infer::get(&bytes).map(|k| k.extension()).unwrap_or("bin"); fetched.push((img.page_number, bytes, ext)); } // Atomic write: storage puts + page row inserts + page_count // update, all in one transaction. If anything fails, rollback + // the chapter is retried next run. Storage orphans the bytes; a // reaper sweeps them later. let mut tx = db.begin().await.context("open chapter sync tx")?; for (page_number, bytes, ext) in &fetched { let key = format!( "mangas/{manga_id}/chapters/{chapter_id}/pages/{:04}.{ext}", page_number ); storage .put(&key, bytes) .await .with_context(|| format!("put {key}"))?; // (chapter_id, page_number) is unique — re-runs idempotent. sqlx::query( "INSERT INTO pages (chapter_id, page_number, storage_key, content_type) VALUES ($1, $2, $3, $4) ON CONFLICT (chapter_id, page_number) DO UPDATE SET storage_key = EXCLUDED.storage_key, content_type = EXCLUDED.content_type", ) .bind(chapter_id) .bind(page_number) .bind(&key) .bind(format!("image/{ext}")) .execute(&mut *tx) .await .with_context(|| format!("insert page row {page_number}"))?; } sqlx::query("UPDATE chapters SET page_count = $1 WHERE id = $2") .bind(fetched.len() as i32) .bind(chapter_id) .execute(&mut *tx) .await .context("update page_count")?; tx.commit().await.context("commit chapter sync")?; Ok(SyncOutcome::Fetched { pages: fetched.len() }) } // Suppress unused-import warning for `session` until the bin/crawler // wiring lands in this branch and uses it through this module. #[allow(dead_code)] fn _keep_session_in_scope() { let _ = session::registrable_domain; } #[cfg(test)] mod tests { use super::*; #[test] fn parse_chapter_pages_skips_loader_and_sorts_by_id() { // Loader image, two real pages out of order, and one with no id. let html = r#" "#; let pages = parse_chapter_pages(html).expect("parse"); assert_eq!(pages.len(), 2); assert_eq!(pages[0].page_number, 1); assert_eq!(pages[0].url, "https://cdn/1.jpg"); assert_eq!(pages[1].page_number, 2); assert_eq!(pages[1].url, "https://cdn/2.jpg"); } #[test] fn parse_chapter_pages_drops_images_without_src() { let html = r#" "#; let pages = parse_chapter_pages(html).expect("parse"); assert_eq!(pages.len(), 1); assert_eq!(pages[0].page_number, 2); } #[test] fn parse_chapter_pages_handles_three_digit_page_ids() { let html = r#" "#; let pages = parse_chapter_pages(html).expect("parse"); assert_eq!( pages.iter().map(|p| p.page_number).collect::>(), vec![9, 50, 126] ); } #[test] fn parse_chapter_pages_returns_transient_when_container_missing() { // Reader doesn't render #logo, so the universal logo sentinel // can't be used here — a#pic_container is the reader-specific // marker. Broken-page response trips this. let html = "\

we're sorry, the request file are not found.

\ "; let err = parse_chapter_pages(html).expect_err("expected Transient"); assert!(err.is_transient(), "got non-transient: {err}"); } }