diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 18c40f1..0c14ad0 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1415,7 +1415,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "mangalord" -version = "0.22.0" +version = "0.23.0" dependencies = [ "anyhow", "argon2", @@ -1433,6 +1433,7 @@ dependencies = [ "infer", "mime", "rand 0.8.6", + "reqwest", "scraper", "serde", "serde_json", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 19e08c2..fbf29c9 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,7 +1,8 @@ [package] name = "mangalord" -version = "0.22.0" +version = "0.23.0" edition = "2021" +default-run = "mangalord" [lib] path = "src/lib.rs" @@ -44,6 +45,7 @@ futures-util = "0.3" bytes = "1" chromiumoxide = { version = "0.7", features = ["tokio-runtime", "_fetcher-rusttls-tokio"], default-features = false } scraper = "0.20" +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "socks"] } [dev-dependencies] tempfile = "3" @@ -51,3 +53,4 @@ tower = { version = "0.5", features = ["util"] } http-body-util = "0.1" mime = "0.3" futures-util = "0.3" +tokio = { version = "1", features = ["test-util"] } diff --git a/backend/src/bin/crawler.rs b/backend/src/bin/crawler.rs index 42e8227..e667ef6 100644 --- a/backend/src/bin/crawler.rs +++ b/backend/src/bin/crawler.rs @@ -1,29 +1,329 @@ //! Crawler binary. //! -//! Today: a thin shell that launches Chromium via the shared -//! `crawler::browser` module and exits. Useful as an ad-hoc smoke test -//! for the launcher in addition to the integration test in -//! `tests/crawler_browser_smoke.rs`. +//! Walks the source's manga listing (all pages), fetches each manga's +//! metadata + chapter list, downloads the cover into `Storage`, and +//! reconciles everything into the DB. Chapter *content* (page images) +//! is out of scope for now — only chapter rows + their source links +//! are written. //! -//! Future: reads config, picks `Source` impls, runs the job loop. +//! Configuration: +//! - **Start URL** (required): first CLI positional arg, else +//! `$CRAWLER_START_URL`. This is the manga *list* page (page 1). +//! - **Database** (required): `$DATABASE_URL`. +//! - **Storage dir**: `$STORAGE_DIR`, default `./data/storage` — +//! matches the API binary so both write to the same local tree. +//! - **Browser**: see `LaunchOptions::from_env` — +//! `CRAWLER_BROWSER_MODE` (`headed`|`headless`) and +//! `CRAWLER_BROWSER_ARGS`. +//! - **Rate limit**: `CRAWLER_RATE_MS` (ms between requests, default +//! `1000`). +//! - **Cap**: `CRAWLER_LIMIT` (max manga detail fetches per run, +//! default `0` = no cap). +//! - **Skip chapters**: `CRAWLER_SKIP_CHAPTERS=1` — turn off the +//! chapter selector in the parser AND skip the per-manga +//! `sync_manga_chapters` write. Use this for "metadata only" runs. +//! - **Proxy**: `$CRAWLER_PROXY` — single URL applied to both +//! Chromium (`--proxy-server`) and `reqwest::Proxy::all`. Supports +//! `http://`, `https://`, and `socks5://` (with optional user:pass). +//! Example: `socks5://user:pass@host:1080`. Unset → direct. -use mangalord::crawler::browser::{self, LaunchOptions}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::{anyhow, Context}; +use mangalord::crawler::{ + browser::{self, LaunchOptions}, + rate_limit::RateLimiter, + source::{target::TargetSource, DiscoverMode, FetchContext, Source}, +}; +use mangalord::repo; +use mangalord::storage::{LocalStorage, Storage}; +use sqlx::postgres::PgPoolOptions; +use sqlx::PgPool; +use tokio::sync::Mutex; use tracing_subscriber::EnvFilter; +use uuid::Uuid; #[tokio::main] async fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); tracing_subscriber::fmt() .with_env_filter( - EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "info,mangalord=debug".into()), + EnvFilter::try_from_default_env().unwrap_or_else(|_| { + "info,mangalord=debug,chromiumoxide::conn=off,chromiumoxide::handler=off" + .into() + }), ) .init(); - let options = LaunchOptions::from_env(); - tracing::info!(?options, "launching browser"); - let handle = browser::launch(options).await?; - tracing::info!("browser launched; closing"); - handle.close().await?; + let start_url = resolve_start_url()?; + let database_url = std::env::var("DATABASE_URL") + .map_err(|_| anyhow!("DATABASE_URL must be set"))?; + let storage_dir: PathBuf = std::env::var("STORAGE_DIR") + .unwrap_or_else(|_| "./data/storage".to_string()) + .into(); + let rate_ms = env_u64("CRAWLER_RATE_MS", 1000); + let limit = env_u64("CRAWLER_LIMIT", 0) as usize; + let skip_chapters = env_bool("CRAWLER_SKIP_CHAPTERS", false); + let proxy_url = std::env::var("CRAWLER_PROXY") + .ok() + .filter(|s| !s.trim().is_empty()); + + let db = PgPoolOptions::new() + .max_connections(5) + .connect(&database_url) + .await + .context("connect to database")?; + sqlx::migrate!("./migrations").run(&db).await?; + + let storage: Arc = Arc::new(LocalStorage::new(&storage_dir)); + + // `no_proxy()` disables reqwest's own env-based detection so the + // single `CRAWLER_PROXY` knob is the only thing that influences + // routing. Otherwise an unrelated `HTTPS_PROXY` in the shell would + // silently route cover downloads while the browser stayed direct. + let mut http_builder = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .no_proxy(); + if let Some(proxy) = &proxy_url { + http_builder = http_builder + .proxy(reqwest::Proxy::all(proxy).with_context(|| format!("parse proxy URL: {proxy}"))?); + } + let http = http_builder.build().context("build http client")?; + + let mut options = LaunchOptions::from_env(); + if let Some(proxy) = &proxy_url { + options.extra_args.push(format!("--proxy-server={proxy}")); + } + tracing::info!( + ?options, + %start_url, + rate_ms, + limit, + skip_chapters, + proxy = ?proxy_url, + storage_dir = %storage_dir.display(), + "starting crawler" + ); + + let handle = browser::launch(options).await.context("launch browser")?; + let result = run( + handle.browser(), + &db, + storage.as_ref(), + &http, + &start_url, + rate_ms, + limit, + skip_chapters, + ) + .await; + handle.close().await.ok(); + result +} + +async fn run( + browser: &chromiumoxide::Browser, + db: &PgPool, + storage: &dyn Storage, + http: &reqwest::Client, + start_url: &str, + rate_ms: u64, + limit: usize, + skip_chapters: bool, +) -> anyhow::Result<()> { + let rate = Mutex::new(RateLimiter::new(Duration::from_millis(rate_ms))); + let source = { + let s = TargetSource::new(start_url.to_string()); + if skip_chapters { + s.without_chapter_parsing() + } else { + s + } + }; + let ctx = FetchContext { + browser, + rate: &rate, + }; + + let source_id = source.id(); + repo::crawler::ensure_source( + db, + source_id, + "Target Site", + &origin_of(start_url).unwrap_or_else(|| start_url.to_string()), + ) + .await + .context("ensure_source")?; + + let run_started_at = chrono::Utc::now(); + + let max_refs = (limit > 0).then_some(limit); + tracing::info!(?max_refs, "discovering manga list"); + let refs = source + .discover(&ctx, DiscoverMode::Backfill, max_refs) + .await + .context("discover failed")?; + tracing::info!(count = refs.len(), "discovered manga list"); + + let to_fetch = refs; + let total = to_fetch.len(); + + for (i, r) in to_fetch.iter().enumerate() { + tracing::info!(idx = i + 1, total, key = %r.source_manga_key, "fetching metadata"); + let manga = match source.fetch_manga(&ctx, r).await { + Ok(m) => m, + Err(e) => { + tracing::warn!(key = %r.source_manga_key, url = %r.url, error = ?e, "fetch_manga failed"); + continue; + } + }; + + let upsert = match repo::crawler::upsert_manga_from_source(db, source_id, &r.url, &manga) + .await + { + Ok(u) => u, + Err(e) => { + tracing::error!(key = %r.source_manga_key, error = ?e, "upsert_manga_from_source failed"); + continue; + } + }; + tracing::info!( + key = %manga.source_manga_key, + manga_id = %upsert.manga_id, + status = ?upsert.status, + title = %manga.title, + "manga upserted" + ); + + // Cover image: download when missing in storage (backfill for + // mangas synced before cover-download support, plus the New + // path) or when metadata changed (cover URL is part of + // metadata_hash, so an Updated status implies the URL may + // have moved). Failures are non-fatal. + let needs_cover = upsert.cover_image_path.is_none() + || matches!(upsert.status, repo::crawler::UpsertStatus::Updated); + if needs_cover { + if let Some(cover_url) = manga.cover_url.as_deref() { + if let Err(e) = download_and_store_cover( + db, + storage, + http, + &rate, + &r.url, + upsert.manga_id, + cover_url, + ) + .await + { + tracing::warn!(manga_id = %upsert.manga_id, error = ?e, "cover download failed"); + } + } + } + + if !skip_chapters { + match repo::crawler::sync_manga_chapters( + db, + source_id, + upsert.manga_id, + &manga.chapters, + ) + .await + { + Ok(diff) => tracing::info!( + manga_id = %upsert.manga_id, + new = diff.new, + refreshed = diff.refreshed, + dropped = diff.dropped, + "chapters synced" + ), + Err(e) => tracing::warn!(manga_id = %upsert.manga_id, error = ?e, "chapter sync failed"), + } + } + } + + if limit == 0 { + match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await { + Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"), + Err(e) => tracing::warn!(error = ?e, "drop-pass failed"), + } + } else { + tracing::info!(limit, "partial sync — skipping drop pass"); + } + Ok(()) } + +async fn download_and_store_cover( + db: &PgPool, + storage: &dyn Storage, + http: &reqwest::Client, + rate: &Mutex, + manga_url: &str, + manga_id: Uuid, + cover_url: &str, +) -> anyhow::Result<()> { + let absolute = reqwest::Url::parse(manga_url) + .context("parse manga URL")? + .join(cover_url) + .context("join cover URL onto manga URL")?; + + rate.lock().await.wait().await; + let resp = http + .get(absolute.clone()) + .send() + .await + .with_context(|| format!("GET {absolute}"))? + .error_for_status() + .with_context(|| format!("non-2xx for {absolute}"))?; + let bytes = resp.bytes().await.context("read cover body")?; + + // `infer` sniffs the magic bytes — same crate the upload handler + // uses, so we don't trust the URL's extension. + let kind = infer::get(&bytes); + let ext = kind.map(|k| k.extension()).unwrap_or("bin"); + let key = format!("mangas/{manga_id}/cover.{ext}"); + + storage + .put(&key, &bytes) + .await + .with_context(|| format!("store cover at {key}"))?; + repo::manga::set_cover_image_path(db, manga_id, &key) + .await + .with_context(|| format!("update cover_image_path for {manga_id}"))?; + tracing::info!(manga_id = %manga_id, key = %key, bytes = bytes.len(), %absolute, "cover stored"); + Ok(()) +} + +fn resolve_start_url() -> anyhow::Result { + if let Some(arg) = std::env::args().nth(1) { + return Ok(arg); + } + std::env::var("CRAWLER_START_URL").map_err(|_| { + anyhow!( + "start URL is required — pass as first CLI arg or set $CRAWLER_START_URL" + ) + }) +} + +fn origin_of(url: &str) -> Option { + let (scheme, rest) = url.split_once("://")?; + let host = rest.split('/').next()?; + Some(format!("{scheme}://{host}")) +} + +fn env_u64(name: &str, default: u64) -> u64 { + std::env::var(name) + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(default) +} + +fn env_bool(name: &str, default: bool) -> bool { + match std::env::var(name).ok().as_deref() { + Some("1") | Some("true") | Some("TRUE") | Some("yes") => true, + Some("0") | Some("false") | Some("FALSE") | Some("no") => false, + _ => default, + } +} diff --git a/backend/src/crawler/browser.rs b/backend/src/crawler/browser.rs index 68e1846..1e419da 100644 --- a/backend/src/crawler/browser.rs +++ b/backend/src/crawler/browser.rs @@ -18,6 +18,7 @@ use std::path::PathBuf; use anyhow::Context; use chromiumoxide::browser::{Browser, BrowserConfig}; +use chromiumoxide::error::CdpError; use chromiumoxide::fetcher::{BrowserFetcher, BrowserFetcherOptions}; use futures_util::StreamExt; use tokio::task::JoinHandle; @@ -169,8 +170,16 @@ pub async fn launch(options: LaunchOptions) -> anyhow::Result { let driver = tokio::spawn(async move { while let Some(event) = handler.next().await { - if let Err(err) = event { - tracing::warn!(?err, "chromium handler event error"); + match event { + Ok(_) => {} + // chromiumoxide 0.7 ships fixed CDP type bindings, so any + // CDP event Chrome added later fails to deserialize. The + // connection is unaffected — these are noise. Suppress + // them so real failures stay visible. + Err(CdpError::Serde(_)) => { + tracing::trace!("chromium emitted an unrecognized CDP event"); + } + Err(err) => tracing::warn!(?err, "chromium handler event error"), } } }); diff --git a/backend/src/crawler/mod.rs b/backend/src/crawler/mod.rs index f6a962e..4531be8 100644 --- a/backend/src/crawler/mod.rs +++ b/backend/src/crawler/mod.rs @@ -16,4 +16,5 @@ pub mod browser; pub mod diff; pub mod jobs; +pub mod rate_limit; pub mod source; diff --git a/backend/src/crawler/rate_limit.rs b/backend/src/crawler/rate_limit.rs new file mode 100644 index 0000000..845be5c --- /dev/null +++ b/backend/src/crawler/rate_limit.rs @@ -0,0 +1,69 @@ +//! Per-host request pacing. +//! +//! Single-token bucket: each `wait().await` either returns immediately +//! (if at least `interval` has elapsed since the last call) or sleeps +//! just enough to satisfy it. Uses `tokio::time::Instant` so tests can +//! run under `start_paused` virtual time without sleeping for real. + +use std::time::Duration; +use tokio::time::Instant; + +#[derive(Debug)] +pub struct RateLimiter { + interval: Duration, + last: Option, +} + +impl RateLimiter { + pub fn new(interval: Duration) -> Self { + Self { + interval, + last: None, + } + } + + pub async fn wait(&mut self) { + if let Some(last) = self.last { + let elapsed = last.elapsed(); + if elapsed < self.interval { + tokio::time::sleep(self.interval - elapsed).await; + } + } + self.last = Some(Instant::now()); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test(start_paused = true)] + async fn first_call_does_not_sleep() { + let mut rl = RateLimiter::new(Duration::from_millis(100)); + let t0 = Instant::now(); + rl.wait().await; + assert_eq!(Instant::now() - t0, Duration::ZERO); + } + + #[tokio::test(start_paused = true)] + async fn second_call_sleeps_to_fill_interval() { + let mut rl = RateLimiter::new(Duration::from_millis(100)); + let t0 = Instant::now(); + rl.wait().await; + rl.wait().await; + // Second call had to wait the full 100ms after the (instant) + // first call. + assert_eq!(Instant::now() - t0, Duration::from_millis(100)); + } + + #[tokio::test(start_paused = true)] + async fn no_sleep_if_interval_already_elapsed() { + let mut rl = RateLimiter::new(Duration::from_millis(100)); + rl.wait().await; + tokio::time::sleep(Duration::from_millis(250)).await; + let t0 = Instant::now(); + rl.wait().await; + // Already 250ms past — no further wait needed. + assert_eq!(Instant::now() - t0, Duration::ZERO); + } +} diff --git a/backend/src/crawler/source.rs b/backend/src/crawler/source.rs index 23e3279..39defa6 100644 --- a/backend/src/crawler/source.rs +++ b/backend/src/crawler/source.rs @@ -3,9 +3,8 @@ //! Job handlers depend on this trait, not on a concrete site. Adding a //! new site is: implement `Source`, register it in a `sources` table //! row, and the existing job pipeline picks it up unchanged. -//! -//! Scaffold only — the first concrete impl lands in a follow-up commit -//! once the target site is locked in. + +pub mod target; use async_trait::async_trait; use chromiumoxide::browser::Browser; @@ -34,8 +33,10 @@ pub struct SourceMangaRef { } /// Full metadata returned by `fetch_manga`. The hash is computed by the -/// source impl (typically over the normalized field set) and is the -/// signal `diff` uses to detect metadata updates. +/// source impl over the metadata-only field set (title through +/// cover_url) — chapter changes are tracked separately via +/// `chapter_sources`, so they intentionally do not affect +/// `metadata_hash`. #[derive(Clone, Debug)] pub struct SourceManga { pub source_manga_key: String, @@ -47,6 +48,10 @@ pub struct SourceManga { pub status: Option, pub summary: Option, pub cover_url: Option, + /// Chapters surfaced on the same page as the metadata. Sources + /// where the chapter list lives elsewhere can leave this empty + /// and supply it via `fetch_chapter_list` instead. + pub chapters: Vec, pub metadata_hash: String, } @@ -68,10 +73,13 @@ pub struct SourceChapter { pub page_urls: Vec, } -/// Context passed to every `Source` call. Owns the browser handle, so -/// impls can `browser.new_page(...)` without bringing their own. +/// Context passed to every `Source` call. Carries the browser handle +/// plus a shared rate limiter so impls that issue multiple requests in +/// one call (e.g. pagination walks) honor the same per-host budget as +/// the outer job loop. pub struct FetchContext<'a> { pub browser: &'a Browser, + pub rate: &'a tokio::sync::Mutex, } #[async_trait] @@ -79,10 +87,15 @@ pub trait Source: Send + Sync { /// Stable identifier — also the row key in the `sources` table. fn id(&self) -> &'static str; + /// Returns up to `max_results` manga refs in source order. Pass + /// `None` for an uncapped walk (full backfill / incremental sweep). + /// Implementations should stop paginating as soon as the cap is + /// reached so partial runs don't pay for pages they won't use. async fn discover( &self, ctx: &FetchContext<'_>, mode: DiscoverMode, + max_results: Option, ) -> anyhow::Result>; async fn fetch_manga( diff --git a/backend/src/crawler/source/target.rs b/backend/src/crawler/source/target.rs new file mode 100644 index 0000000..633611d --- /dev/null +++ b/backend/src/crawler/source/target.rs @@ -0,0 +1,675 @@ +//! First concrete [`Source`] impl, modeled on the selectors of the +//! old Puppeteer crawler. The name "target" is a placeholder — rename +//! once the site is officially identified. +//! +//! `scraper`'s selector parser does not support `:has()` or +//! `:contains()`, so the labelled-`td` lookups from the old script +//! (`td:has(label:contains("Author:"))`) are implemented by walking +//! the parsed tree. + +use std::time::Duration; + +use anyhow::Context; +use async_trait::async_trait; +use sha2::{Digest, Sha256}; + +use super::{ + DiscoverMode, FetchContext, Source, SourceChapter, SourceChapterRef, SourceManga, + SourceMangaRef, +}; + +pub struct TargetSource { + base_url: String, + parse_chapters: bool, +} + +impl TargetSource { + pub fn new(base_url: impl Into) -> Self { + Self { + base_url: base_url.into(), + parse_chapters: true, + } + } + + pub fn base_url(&self) -> &str { + &self.base_url + } + + /// Skip the chapter-list selector when parsing detail pages. + /// The returned `SourceManga.chapters` will be empty even when the + /// page has a chapter table. Caller must also avoid calling + /// `repo::crawler::sync_manga_chapters` for these mangas — an + /// empty list would otherwise soft-drop the manga's existing + /// chapter rows. + pub fn without_chapter_parsing(mut self) -> Self { + self.parse_chapters = false; + self + } +} + +#[async_trait] +impl Source for TargetSource { + fn id(&self) -> &'static str { + "target" + } + + async fn discover( + &self, + ctx: &FetchContext<'_>, + mode: DiscoverMode, + max_results: Option, + ) -> anyhow::Result> { + // Always visit page 1 first because that's the only way to + // discover `last_page`. We cache the HTML so we don't have to + // re-navigate when the iteration reaches page 1 again. + let first_html = navigate(ctx, self.base_url.as_str()).await?; + let last_page = { + let doc = scraper::Html::parse_document(&first_html); + parse_last_page(&doc) + }; + + let backfill = matches!(mode, DiscoverMode::Backfill); + let order: Vec = match (last_page, backfill) { + (None, _) => vec![1], + // Backfill = oldest-first: walk pages last → 1, then + // reverse within each page (the listing is update_date + // DESC, so the bottom of the last page is the oldest + // entry the source still surfaces). + (Some(last), true) => (1..=last).rev().collect(), + (Some(last), false) => (1..=last).collect(), + }; + tracing::info!( + ?mode, + last_page = ?last_page, + page_count = order.len(), + "walking pagination" + ); + + let mut all = Vec::new(); + for page_num in order { + let html = if page_num == 1 { + first_html.clone() + } else { + navigate(ctx, &page_url(&self.base_url, page_num)).await? + }; + let mut page_refs = { + let doc = scraper::Html::parse_document(&html); + parse_manga_list_from(&doc) + }; + if backfill { + page_refs.reverse(); + } + tracing::info!(page_num, count = page_refs.len(), "page walked"); + all.extend(page_refs); + if cap_reached(&all, max_results) { + tracing::info!(cap = ?max_results, "max_results reached; halting pagination"); + break; + } + } + + Ok(truncate_to_cap(all, max_results)) + } + + async fn fetch_manga( + &self, + ctx: &FetchContext<'_>, + r: &SourceMangaRef, + ) -> anyhow::Result { + let html = navigate(ctx, r.url.as_str()).await?; + parse_manga_detail(&html, &r.source_manga_key, self.parse_chapters) + .with_context(|| format!("parse manga detail at {}", r.url)) + } + + async fn fetch_chapter_list( + &self, + _ctx: &FetchContext<'_>, + _manga: &SourceManga, + ) -> anyhow::Result> { + anyhow::bail!("fetch_chapter_list not implemented yet") + } + + async fn fetch_chapter( + &self, + _ctx: &FetchContext<'_>, + _r: &SourceChapterRef, + ) -> anyhow::Result { + anyhow::bail!("fetch_chapter not implemented yet") + } +} + +fn cap_reached(buf: &[T], max: Option) -> bool { + matches!(max, Some(m) if buf.len() >= m) +} + +fn truncate_to_cap(mut buf: Vec, max: Option) -> Vec { + if let Some(m) = max { + buf.truncate(m); + } + buf +} + +/// Single point of rate-limited navigation. Every Source request goes +/// through here, so the limiter is the only knob that controls +/// per-host RPS. +async fn navigate(ctx: &FetchContext<'_>, url: &str) -> anyhow::Result { + ctx.rate.lock().await.wait().await; + let page = ctx.browser.new_page(url).await?; + page.wait_for_navigation().await?; + // Stopgap until we wait on a specific selector per page type — + // gives any post-load JS a beat to finish injecting content. + tokio::time::sleep(Duration::from_secs(1)).await; + let html = page.content().await?; + page.close().await?; + Ok(html) +} + +fn parse_last_page(doc: &scraper::Html) -> Option { + // Pagination links carry their page number as text. Take the + // numeric maximum so we don't depend on a specific layout (Prev, + // Next, ellipses, etc. all get filtered out by .parse). + let sel = scraper::Selector::parse("#left_side .pagination a").unwrap(); + doc.select(&sel) + .filter_map(|a| { + collapse_whitespace(&a.text().collect::()) + .parse::() + .ok() + }) + .max() +} + +/// Substitutes the first `/N/` path segment with the target page +/// number. Source impls that paginate via a different URL shape can +/// override this — for the modeled site the segment is always present. +fn page_url(template_url: &str, page: i32) -> String { + let bytes = template_url.as_bytes(); + let mut i = 0; + while i + 1 < bytes.len() { + if bytes[i] == b'/' && bytes[i + 1].is_ascii_digit() { + let start = i; + let mut j = i + 1; + while j < bytes.len() && bytes[j].is_ascii_digit() { + j += 1; + } + if j < bytes.len() && bytes[j] == b'/' { + let mut out = String::with_capacity(template_url.len() + 4); + out.push_str(&template_url[..start]); + out.push_str(&format!("/{page}/")); + out.push_str(&template_url[j + 1..]); + return out; + } + } + i += 1; + } + template_url.to_string() +} + +#[cfg(test)] +fn parse_manga_list(html: &str) -> Vec { + let doc = scraper::Html::parse_document(html); + parse_manga_list_from(&doc) +} + +fn parse_manga_list_from(doc: &scraper::Html) -> Vec { + let sel = scraper::Selector::parse("#left_side .pic_list .updatesli span a").unwrap(); + doc.select(&sel) + .filter_map(|a| { + let url = a.value().attr("href")?.trim().to_string(); + if url.is_empty() { + return None; + } + let title = collapse_whitespace(&a.text().collect::()); + if title.is_empty() { + return None; + } + Some(SourceMangaRef { + source_manga_key: derive_key_from_url(&url), + title, + url, + }) + }) + .collect() +} + +fn parse_manga_detail( + html: &str, + key: &str, + include_chapters: bool, +) -> anyhow::Result { + let doc = scraper::Html::parse_document(html); + + let title = first_text(&doc, ".w-title h1").context("missing .w-title h1")?; + let summary = first_text(&doc, ".manga_summary"); + let cover_url = first_attr(&doc, ".cover > img:nth-child(1)", "src"); + + let authors = links_in_labelled_td(&doc, "Author"); + let genres = links_in_labelled_td(&doc, "Genre"); + let raw_status = labelled_td_child_text(&doc, "Status", "span"); + let status = normalize_status(raw_status.as_deref(), key); + + let alternative_titles = labelled_td_value_after_label(&doc, "Alternative") + .map(|s| { + s.split([';', ',', '|']) + .map(str::trim) + .filter(|p| !p.is_empty()) + .map(String::from) + .collect() + }) + .unwrap_or_default(); + + let tag_sel = scraper::Selector::parse(".aside-body a.tag").unwrap(); + let tags: Vec = doc + .select(&tag_sel) + .map(|a| collapse_whitespace(&a.text().collect::())) + .map(|s| strip_tag_count(&s)) + .filter(|s| !s.is_empty()) + .collect(); + + let chapters = if include_chapters { + parse_chapter_list(&doc) + } else { + Vec::new() + }; + + let mut manga = SourceManga { + source_manga_key: key.to_string(), + title, + alternative_titles, + authors, + genres, + tags, + status, + summary, + cover_url, + chapters, + metadata_hash: String::new(), + }; + manga.metadata_hash = compute_metadata_hash(&manga); + Ok(manga) +} + +/// Source advertises status as "Ongoing" or "Completed"; we normalize +/// to the lowercase form the `mangas.status` CHECK constraint accepts. +/// Anything else is a parse miss (selector drift, new value, etc.) and +/// returns `None` after logging — the manga sync continues regardless. +fn normalize_status(raw: Option<&str>, key: &str) -> Option { + let trimmed = raw.map(str::trim).filter(|s| !s.is_empty())?; + if trimmed.eq_ignore_ascii_case("ongoing") { + Some("ongoing".to_string()) + } else if trimmed.eq_ignore_ascii_case("completed") { + Some("completed".to_string()) + } else { + tracing::error!( + key, + raw_status = trimmed, + "unknown manga status (expected 'Ongoing' or 'Completed'); continuing with status=None" + ); + None + } +} + +/// Strips a trailing digit-only `(NN)` suffix from a tag name, the form +/// the source uses to display tag counts. Non-numeric parentheses are +/// preserved. +fn strip_tag_count(s: &str) -> String { + let trimmed = s.trim(); + if trimmed.ends_with(')') { + if let Some(open) = trimmed.rfind('(') { + let inside = &trimmed[open + 1..trimmed.len() - 1]; + if !inside.is_empty() && inside.chars().all(|c| c.is_ascii_digit()) { + return trimmed[..open].trim().to_string(); + } + } + } + trimmed.to_string() +} + +fn parse_chapter_list(doc: &scraper::Html) -> Vec { + let sel = scraper::Selector::parse("#chapter_table td h4 a.chico").unwrap(); + doc.select(&sel) + .filter_map(|a| { + let url = a.value().attr("href")?.trim().to_string(); + if url.is_empty() { + return None; + } + let title_text = collapse_whitespace(&a.text().collect::()); + let number = parse_chapter_number(&title_text).unwrap_or(0); + Some(SourceChapterRef { + source_chapter_key: derive_key_from_url(&url), + number, + title: (!title_text.is_empty()).then_some(title_text), + url, + }) + }) + .collect() +} + +fn parse_chapter_number(text: &str) -> Option { + let mut buf = String::new(); + for c in text.chars() { + if c.is_ascii_digit() { + buf.push(c); + } else if !buf.is_empty() { + break; + } + } + buf.parse().ok() +} + +fn derive_key_from_url(url: &str) -> String { + url.split('?') + .next() + .unwrap_or(url) + .trim_end_matches('/') + .rsplit('/') + .find(|s| !s.is_empty()) + .unwrap_or(url) + .to_string() +} + +fn first_text(doc: &scraper::Html, sel: &str) -> Option { + let s = scraper::Selector::parse(sel).ok()?; + let el = doc.select(&s).next()?; + let text = collapse_whitespace(&el.text().collect::()); + (!text.is_empty()).then_some(text) +} + +fn first_attr(doc: &scraper::Html, sel: &str, attr: &str) -> Option { + let s = scraper::Selector::parse(sel).ok()?; + let el = doc.select(&s).next()?; + el.value().attr(attr).map(str::to_string) +} + +/// `td` whose contained `label` text begins with `label_prefix` — the +/// `scraper`-friendly equivalent of `td:has(label:contains("Foo"))`. +fn td_with_label<'a>( + doc: &'a scraper::Html, + label_prefix: &str, +) -> Option> { + let td_sel = scraper::Selector::parse("td").unwrap(); + let label_sel = scraper::Selector::parse("label").unwrap(); + for td in doc.select(&td_sel) { + for label in td.select(&label_sel) { + let text: String = label.text().collect(); + if text.trim().starts_with(label_prefix) { + return Some(td); + } + } + } + None +} + +fn links_in_labelled_td(doc: &scraper::Html, label_prefix: &str) -> Vec { + let Some(td) = td_with_label(doc, label_prefix) else { + return Vec::new(); + }; + let a_sel = scraper::Selector::parse("a").unwrap(); + td.select(&a_sel) + .map(|a| collapse_whitespace(&a.text().collect::())) + .filter(|s| !s.is_empty()) + .collect() +} + +fn labelled_td_child_text( + doc: &scraper::Html, + label_prefix: &str, + child_sel: &str, +) -> Option { + let td = td_with_label(doc, label_prefix)?; + let child = scraper::Selector::parse(child_sel).ok()?; + let el = td.select(&child).next()?; + let text = collapse_whitespace(&el.text().collect::()); + (!text.is_empty()).then_some(text) +} + +/// Returns the text content of the labelled `td` with the leading +/// "Label:" portion stripped — used for "Alternative:" which puts the +/// value directly in the cell rather than in a child element. +fn labelled_td_value_after_label( + doc: &scraper::Html, + label_prefix: &str, +) -> Option { + let td = td_with_label(doc, label_prefix)?; + let full: String = td.text().collect(); + let after = full.split_once(':').map(|(_, r)| r).unwrap_or(&full); + let trimmed = collapse_whitespace(after); + (!trimmed.is_empty()).then_some(trimmed) +} + +fn collapse_whitespace(s: &str) -> String { + s.split_whitespace().collect::>().join(" ") +} + +fn compute_metadata_hash(m: &SourceManga) -> String { + // Field separators are ASCII unit/record separators so a field + // containing a delimiter character can't be mistaken for two + // smaller fields. + let mut h = Sha256::new(); + fn feed(h: &mut Sha256, s: &str) { + h.update(s.as_bytes()); + h.update(b"\x1F"); + } + fn feed_list(h: &mut Sha256, xs: &[String]) { + for s in xs { + feed(h, s); + } + h.update(b"\x1E"); + } + feed(&mut h, &m.title); + feed_list(&mut h, &m.alternative_titles); + feed_list(&mut h, &m.authors); + feed_list(&mut h, &m.genres); + feed_list(&mut h, &m.tags); + feed(&mut h, m.status.as_deref().unwrap_or("")); + feed(&mut h, m.summary.as_deref().unwrap_or("")); + feed(&mut h, m.cover_url.as_deref().unwrap_or("")); + format!("{:x}", h.finalize()) +} + +#[cfg(test)] +mod tests { + use super::*; + + const LISTING_HTML: &str = r#" + +
+ +
+ + "#; + + const DETAIL_HTML: &str = r#" + +

Test Manga Title

+
+
A summary of the manga.
+ + + + + +
Author OneAuthor Two
ActionDrama
Ongoing
Alt Title 1; Alt Title 2
+ + + + + +

Ch.1

Ch.2 - The Beginning

Chapter 3: Onward

+ + "#; + + #[test] + fn parse_manga_list_extracts_title_url_and_derives_key() { + let refs = parse_manga_list(LISTING_HTML); + assert_eq!(refs.len(), 2, "third entry has empty href and is skipped"); + assert_eq!(refs[0].title, "Foo Manga"); + assert_eq!(refs[0].url, "https://target.example/manga/foo"); + assert_eq!(refs[0].source_manga_key, "foo"); + assert_eq!(refs[1].title, "Bar Baz"); + assert_eq!(refs[1].source_manga_key, "bar-baz"); + } + + #[test] + fn parse_manga_detail_pulls_all_fields() { + let m = parse_manga_detail(DETAIL_HTML, "test-key", true).expect("parse"); + assert_eq!(m.source_manga_key, "test-key"); + assert_eq!(m.title, "Test Manga Title"); + assert_eq!(m.summary.as_deref(), Some("A summary of the manga.")); + assert_eq!(m.authors, vec!["Author One", "Author Two"]); + assert_eq!(m.genres, vec!["Action", "Drama"]); + assert_eq!(m.status.as_deref(), Some("ongoing")); + assert_eq!(m.alternative_titles, vec!["Alt Title 1", "Alt Title 2"]); + // Counts in parentheses are stripped — "Fantasy (21)" → "Fantasy". + assert_eq!(m.tags, vec!["Fantasy", "Romance", "Action"]); + assert_eq!(m.cover_url.as_deref(), Some("/cover.jpg")); + assert!(!m.metadata_hash.is_empty()); + + assert_eq!(m.chapters.len(), 3); + assert_eq!(m.chapters[0].number, 1); + assert_eq!(m.chapters[0].title.as_deref(), Some("Ch.1")); + assert_eq!(m.chapters[0].url, "/manga/foo/chapter/1"); + assert_eq!(m.chapters[0].source_chapter_key, "1"); + assert_eq!(m.chapters[1].number, 2); + assert_eq!(m.chapters[1].title.as_deref(), Some("Ch.2 - The Beginning")); + assert_eq!(m.chapters[2].number, 3); + assert_eq!(m.chapters[2].title.as_deref(), Some("Chapter 3: Onward")); + } + + #[test] + fn status_normalized_case_insensitively() { + assert_eq!(normalize_status(Some("Ongoing"), "k").as_deref(), Some("ongoing")); + assert_eq!(normalize_status(Some("ONGOING"), "k").as_deref(), Some("ongoing")); + assert_eq!(normalize_status(Some(" completed "), "k").as_deref(), Some("completed")); + } + + #[test] + fn unknown_status_logs_and_returns_none() { + // Logging is observable in test output via tracing-test, but + // here we just assert the contract: unknown becomes None + // (and the manga is therefore still synced by the caller). + assert!(normalize_status(Some("Hiatus"), "k").is_none()); + assert!(normalize_status(Some(""), "k").is_none()); + assert!(normalize_status(None, "k").is_none()); + } + + #[test] + fn strip_tag_count_drops_trailing_digit_parens_only() { + assert_eq!(strip_tag_count("Fantasy (21)"), "Fantasy"); + assert_eq!(strip_tag_count(" Action (5) "), "Action"); + assert_eq!(strip_tag_count("Romance"), "Romance"); + // Non-numeric parens stay put. + assert_eq!(strip_tag_count("Slice of Life (sub)"), "Slice of Life (sub)"); + // Only the trailing paren is considered. + assert_eq!(strip_tag_count("Tag (a) (12)"), "Tag (a)"); + } + + #[test] + fn parse_chapter_number_grabs_first_integer_run() { + assert_eq!(parse_chapter_number("Ch.1"), Some(1)); + assert_eq!(parse_chapter_number("Chapter 12"), Some(12)); + assert_eq!(parse_chapter_number("Ch.2 - The Beginning"), Some(2)); + // Decimal chapters keep the integer part (i32 storage). + assert_eq!(parse_chapter_number("Ch.12.5"), Some(12)); + assert_eq!(parse_chapter_number("Special"), None); + } + + #[test] + fn parse_last_page_picks_highest_pagination_link() { + let html = r#" +
+ "#; + let doc = scraper::Html::parse_document(html); + assert_eq!(parse_last_page(&doc), Some(47)); + } + + #[test] + fn parse_last_page_none_when_no_pagination() { + let doc = scraper::Html::parse_document(""); + assert!(parse_last_page(&doc).is_none()); + } + + #[test] + fn page_url_substitutes_numeric_path_segment() { + assert_eq!( + page_url("https://site.example/list/1/?f=1&o=1&sortby=update_date&e=", 5), + "https://site.example/list/5/?f=1&o=1&sortby=update_date&e=" + ); + // No numeric segment → URL returned unchanged. + assert_eq!( + page_url("https://site.example/list/?f=1", 5), + "https://site.example/list/?f=1" + ); + } + + #[test] + fn derive_key_strips_trailing_slash_and_query() { + assert_eq!(derive_key_from_url("https://x.example/manga/foo/"), "foo"); + assert_eq!(derive_key_from_url("https://x.example/manga/foo?p=1"), "foo"); + assert_eq!(derive_key_from_url("/manga/bar"), "bar"); + } + + #[test] + fn metadata_hash_is_stable_and_field_sensitive() { + let base = parse_manga_detail(DETAIL_HTML, "k", true).unwrap(); + let again = parse_manga_detail(DETAIL_HTML, "k", true).unwrap(); + assert_eq!(base.metadata_hash, again.metadata_hash); + + // Same fields except status flipped — hash must change. + let altered_html = DETAIL_HTML.replace("Ongoing", "Completed"); + let altered = parse_manga_detail(&altered_html, "k", true).unwrap(); + assert_ne!(base.metadata_hash, altered.metadata_hash); + } + + #[test] + fn missing_optional_fields_parse_to_none() { + let html = r#"

Minimal

"#; + let m = parse_manga_detail(html, "min", true).unwrap(); + assert_eq!(m.title, "Minimal"); + assert!(m.summary.is_none()); + assert!(m.status.is_none()); + assert!(m.authors.is_empty()); + assert!(m.genres.is_empty()); + assert!(m.tags.is_empty()); + assert!(m.alternative_titles.is_empty()); + assert!(m.chapters.is_empty()); + } + + #[test] + fn parse_manga_detail_skips_chapters_when_disabled() { + // Same fixture that yields 3 chapters above; with include_chapters=false + // the chapter table is ignored and the rest of the metadata still parses. + let m = parse_manga_detail(DETAIL_HTML, "k", false).unwrap(); + assert!(m.chapters.is_empty(), "chapters should be empty when disabled"); + assert_eq!(m.title, "Test Manga Title", "other fields still parse"); + assert_eq!(m.authors, vec!["Author One", "Author Two"]); + } + + #[test] + fn parse_manga_detail_errors_on_missing_title() { + let html = "

nothing

"; + let err = parse_manga_detail(html, "x", true).unwrap_err(); + assert!(err.to_string().contains("missing .w-title h1")); + } +} diff --git a/backend/src/repo/crawler.rs b/backend/src/repo/crawler.rs new file mode 100644 index 0000000..611949f --- /dev/null +++ b/backend/src/repo/crawler.rs @@ -0,0 +1,434 @@ +//! Persistence for crawled mangas. +//! +//! High-level operations: +//! - [`ensure_source`]: idempotent registration of a source row. +//! - [`upsert_manga_from_source`]: end-to-end "I saw this manga" — +//! creates or updates the `mangas` row, threads `manga_sources`, and +//! refreshes authors/genres/tags. Returns whether the manga is new, +//! updated (metadata_hash changed), or unchanged. +//! - [`sync_manga_chapters`]: per-manga chapter reconciliation. Adds +//! new ones, refreshes URLs on existing ones, soft-drops vanished. +//! - [`mark_dropped_mangas`]: end-of-run pass. Any manga from this +//! source whose `last_seen_at` is older than the run start is +//! soft-dropped. +//! +//! Each public function is a transaction boundary so a partial failure +//! mid-call leaves the DB in its pre-call state. + +use chrono::{DateTime, Utc}; +use sqlx::{PgPool, Postgres, Transaction}; +use uuid::Uuid; + +use crate::crawler::source::{SourceChapterRef, SourceManga}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum UpsertStatus { + New, + Updated, + Unchanged, +} + +#[derive(Debug, Clone)] +pub struct UpsertedManga { + pub manga_id: Uuid, + pub status: UpsertStatus, + /// Current value of `mangas.cover_image_path` after the upsert. + /// `None` means the cover hasn't been downloaded yet — the caller + /// uses this to backfill covers for mangas that were synced before + /// cover-download support existed. + pub cover_image_path: Option, +} + +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] +pub struct ChapterDiff { + pub new: usize, + pub refreshed: usize, + pub dropped: usize, +} + +pub async fn ensure_source( + pool: &PgPool, + id: &str, + name: &str, + base_url: &str, +) -> sqlx::Result<()> { + sqlx::query( + r#" + INSERT INTO sources (id, name, base_url, enabled) + VALUES ($1, $2, $3, true) + ON CONFLICT (id) DO UPDATE + SET name = EXCLUDED.name, + base_url = EXCLUDED.base_url + "#, + ) + .bind(id) + .bind(name) + .bind(base_url) + .execute(pool) + .await?; + Ok(()) +} + +pub async fn upsert_manga_from_source( + pool: &PgPool, + source_id: &str, + source_url: &str, + sm: &SourceManga, +) -> sqlx::Result { + let mut tx = pool.begin().await?; + + let existing: Option<(Uuid, Option)> = sqlx::query_as( + r#" + SELECT manga_id, metadata_hash + FROM manga_sources + WHERE source_id = $1 AND source_manga_key = $2 + "#, + ) + .bind(source_id) + .bind(&sm.source_manga_key) + .fetch_optional(&mut *tx) + .await?; + + let status_db = sm.status.as_deref().unwrap_or("ongoing"); + + // Note: `cover_image_path` is intentionally not written here. + // The repo layer doesn't know about the storage backend, so the + // caller (crawler binary) downloads the cover via the `Storage` + // trait and sets the path with `repo::manga::set_cover_image_path` + // once the bytes have landed. + let (manga_id, status) = match existing { + None => { + let (id,): (Uuid,) = sqlx::query_as( + r#" + INSERT INTO mangas (title, description, status, alt_titles) + VALUES ($1, $2, $3, $4) + RETURNING id + "#, + ) + .bind(&sm.title) + .bind(sm.summary.as_deref()) + .bind(status_db) + .bind(&sm.alternative_titles) + .fetch_one(&mut *tx) + .await?; + (id, UpsertStatus::New) + } + Some((id, prev_hash)) if prev_hash.as_deref() == Some(&sm.metadata_hash) => { + (id, UpsertStatus::Unchanged) + } + Some((id, _)) => { + sqlx::query( + r#" + UPDATE mangas + SET title = $1, + description = $2, + status = $3, + alt_titles = $4, + updated_at = NOW() + WHERE id = $5 + "#, + ) + .bind(&sm.title) + .bind(sm.summary.as_deref()) + .bind(status_db) + .bind(&sm.alternative_titles) + .bind(id) + .execute(&mut *tx) + .await?; + (id, UpsertStatus::Updated) + } + }; + + sqlx::query( + r#" + INSERT INTO manga_sources + (source_id, source_manga_key, manga_id, source_url, metadata_hash, last_seen_at, dropped_at) + VALUES ($1, $2, $3, $4, $5, NOW(), NULL) + ON CONFLICT (source_id, source_manga_key) DO UPDATE + SET source_url = EXCLUDED.source_url, + metadata_hash = EXCLUDED.metadata_hash, + last_seen_at = NOW(), + dropped_at = NULL + "#, + ) + .bind(source_id) + .bind(&sm.source_manga_key) + .bind(manga_id) + .bind(source_url) + .bind(&sm.metadata_hash) + .execute(&mut *tx) + .await?; + + sync_authors(&mut tx, manga_id, &sm.authors).await?; + sync_genres(&mut tx, manga_id, &sm.genres).await?; + sync_tags(&mut tx, manga_id, &sm.tags).await?; + + let cover_image_path: Option = + sqlx::query_scalar("SELECT cover_image_path FROM mangas WHERE id = $1") + .bind(manga_id) + .fetch_one(&mut *tx) + .await?; + + tx.commit().await?; + Ok(UpsertedManga { + manga_id, + status, + cover_image_path, + }) +} + +async fn sync_authors( + tx: &mut Transaction<'_, Postgres>, + manga_id: Uuid, + authors: &[String], +) -> sqlx::Result<()> { + sqlx::query("DELETE FROM manga_authors WHERE manga_id = $1") + .bind(manga_id) + .execute(&mut **tx) + .await?; + for (i, name) in authors.iter().enumerate() { + let trimmed = name.trim(); + if trimmed.is_empty() { + continue; + } + // Self-update on conflict so the row id is always returned — + // can't use DO NOTHING because that suppresses RETURNING. + let (author_id,): (Uuid,) = sqlx::query_as( + r#" + INSERT INTO authors (name) VALUES ($1) + ON CONFLICT (lower(name)) DO UPDATE SET name = authors.name + RETURNING id + "#, + ) + .bind(trimmed) + .fetch_one(&mut **tx) + .await?; + sqlx::query( + r#" + INSERT INTO manga_authors (manga_id, author_id, position) + VALUES ($1, $2, $3) + ON CONFLICT DO NOTHING + "#, + ) + .bind(manga_id) + .bind(author_id) + .bind(i as i32) + .execute(&mut **tx) + .await?; + } + Ok(()) +} + +async fn sync_genres( + tx: &mut Transaction<'_, Postgres>, + manga_id: Uuid, + genres: &[String], +) -> sqlx::Result<()> { + sqlx::query("DELETE FROM manga_genres WHERE manga_id = $1") + .bind(manga_id) + .execute(&mut **tx) + .await?; + for name in genres { + let trimmed = name.trim(); + if trimmed.is_empty() { + continue; + } + // Case-insensitive lookup so a source-supplied "action" + // attaches to the seeded "Action" rather than creating a + // second row. + let existing: Option<(Uuid,)> = + sqlx::query_as("SELECT id FROM genres WHERE lower(name) = lower($1)") + .bind(trimmed) + .fetch_optional(&mut **tx) + .await?; + let genre_id = match existing { + Some((id,)) => id, + None => { + let (id,): (Uuid,) = sqlx::query_as( + r#" + INSERT INTO genres (name) VALUES ($1) + ON CONFLICT (name) DO UPDATE SET name = genres.name + RETURNING id + "#, + ) + .bind(trimmed) + .fetch_one(&mut **tx) + .await?; + tracing::info!(genre = trimmed, "added new genre from source"); + id + } + }; + sqlx::query( + "INSERT INTO manga_genres (manga_id, genre_id) VALUES ($1, $2) ON CONFLICT DO NOTHING", + ) + .bind(manga_id) + .bind(genre_id) + .execute(&mut **tx) + .await?; + } + Ok(()) +} + +async fn sync_tags( + tx: &mut Transaction<'_, Postgres>, + manga_id: Uuid, + tags: &[String], +) -> sqlx::Result<()> { + sqlx::query("DELETE FROM manga_tags WHERE manga_id = $1") + .bind(manga_id) + .execute(&mut **tx) + .await?; + for name in tags { + let trimmed = name.trim(); + if trimmed.is_empty() { + continue; + } + let (tag_id,): (Uuid,) = sqlx::query_as( + r#" + INSERT INTO tags (name) VALUES ($1) + ON CONFLICT (lower(name)) DO UPDATE SET name = tags.name + RETURNING id + "#, + ) + .bind(trimmed) + .fetch_one(&mut **tx) + .await?; + sqlx::query( + r#" + INSERT INTO manga_tags (manga_id, tag_id, added_by) + VALUES ($1, $2, NULL) + ON CONFLICT DO NOTHING + "#, + ) + .bind(manga_id) + .bind(tag_id) + .execute(&mut **tx) + .await?; + } + Ok(()) +} + +pub async fn sync_manga_chapters( + pool: &PgPool, + source_id: &str, + manga_id: Uuid, + chapters: &[SourceChapterRef], +) -> sqlx::Result { + let mut tx = pool.begin().await?; + let mut diff = ChapterDiff::default(); + let seen_keys: Vec = chapters + .iter() + .map(|c| c.source_chapter_key.clone()) + .collect(); + + for c in chapters { + let existing: Option<(Uuid,)> = sqlx::query_as( + "SELECT chapter_id FROM chapter_sources WHERE source_id = $1 AND source_chapter_key = $2", + ) + .bind(source_id) + .bind(&c.source_chapter_key) + .fetch_optional(&mut *tx) + .await?; + + match existing { + None => { + // New chapter row. The (manga_id, number) unique + // constraint protects against re-inserts if the same + // number arrives via a different source_chapter_key. + let (chapter_id,): (Uuid,) = sqlx::query_as( + r#" + INSERT INTO chapters (manga_id, number, title, page_count) + VALUES ($1, $2, $3, 0) + ON CONFLICT (manga_id, number) DO UPDATE + SET title = EXCLUDED.title + RETURNING id + "#, + ) + .bind(manga_id) + .bind(c.number) + .bind(c.title.as_deref()) + .fetch_one(&mut *tx) + .await?; + sqlx::query( + r#" + INSERT INTO chapter_sources + (source_id, source_chapter_key, chapter_id, source_url, last_seen_at, dropped_at) + VALUES ($1, $2, $3, $4, NOW(), NULL) + "#, + ) + .bind(source_id) + .bind(&c.source_chapter_key) + .bind(chapter_id) + .bind(&c.url) + .execute(&mut *tx) + .await?; + diff.new += 1; + } + Some((chapter_id,)) => { + sqlx::query("UPDATE chapters SET title = $1 WHERE id = $2") + .bind(c.title.as_deref()) + .bind(chapter_id) + .execute(&mut *tx) + .await?; + sqlx::query( + r#" + UPDATE chapter_sources + SET source_url = $1, last_seen_at = NOW(), dropped_at = NULL + WHERE source_id = $2 AND source_chapter_key = $3 + "#, + ) + .bind(&c.url) + .bind(source_id) + .bind(&c.source_chapter_key) + .execute(&mut *tx) + .await?; + diff.refreshed += 1; + } + } + } + + // Soft-drop any chapter previously seen from this source for this + // manga that's not in the current list. + let result = sqlx::query( + r#" + UPDATE chapter_sources cs + SET dropped_at = NOW() + FROM chapters ch + WHERE cs.chapter_id = ch.id + AND ch.manga_id = $1 + AND cs.source_id = $2 + AND cs.dropped_at IS NULL + AND NOT (cs.source_chapter_key = ANY($3)) + "#, + ) + .bind(manga_id) + .bind(source_id) + .bind(&seen_keys) + .execute(&mut *tx) + .await?; + diff.dropped = result.rows_affected() as usize; + + tx.commit().await?; + Ok(diff) +} + +pub async fn mark_dropped_mangas( + pool: &PgPool, + source_id: &str, + run_started_at: DateTime, +) -> sqlx::Result { + let res = sqlx::query( + r#" + UPDATE manga_sources + SET dropped_at = NOW() + WHERE source_id = $1 + AND last_seen_at < $2 + AND dropped_at IS NULL + "#, + ) + .bind(source_id) + .bind(run_started_at) + .execute(pool) + .await?; + Ok(res.rows_affected()) +} diff --git a/backend/src/repo/mod.rs b/backend/src/repo/mod.rs index 450c2f3..d179484 100644 --- a/backend/src/repo/mod.rs +++ b/backend/src/repo/mod.rs @@ -3,6 +3,7 @@ pub mod author; pub mod bookmark; pub mod chapter; pub mod collection; +pub mod crawler; pub mod genre; pub mod manga; pub mod page; diff --git a/backend/tests/crawler_sync.rs b/backend/tests/crawler_sync.rs new file mode 100644 index 0000000..6981a24 --- /dev/null +++ b/backend/tests/crawler_sync.rs @@ -0,0 +1,397 @@ +//! Integration tests for `repo::crawler`. +//! +//! Each test runs against a fresh, migrated DB via `#[sqlx::test]`. +//! `DATABASE_URL` must point to a Postgres where the test user can +//! `CREATEDB`. + +use mangalord::crawler::source::{SourceChapterRef, SourceManga}; +use mangalord::repo::crawler::{self, ChapterDiff, UpsertStatus}; +use sqlx::PgPool; +use uuid::Uuid; + +/// Helper to spin up a `SourceManga` fixture with a stable shape so +/// each test can tweak just the fields it cares about. +fn sample_manga(key: &str, title: &str, hash: &str) -> SourceManga { + SourceManga { + source_manga_key: key.to_string(), + title: title.to_string(), + alternative_titles: vec!["Alt 1".into()], + authors: vec!["Author One".into()], + // Action is in the seeded `genres` table; Fantasy is too. + genres: vec!["Action".into(), "Fantasy".into()], + tags: vec!["popular".into()], + status: Some("ongoing".into()), + summary: Some("Sample summary.".into()), + cover_url: Some("/cover.jpg".into()), + chapters: vec![], + metadata_hash: hash.to_string(), + } +} + +#[sqlx::test(migrations = "./migrations")] +async fn ensure_source_is_idempotent(pool: PgPool) { + crawler::ensure_source(&pool, "target", "Target Site", "https://x.example") + .await + .unwrap(); + crawler::ensure_source(&pool, "target", "Target Site v2", "https://x.example") + .await + .unwrap(); + let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM sources WHERE id = 'target'") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(count.0, 1); + let name: (String,) = sqlx::query_as("SELECT name FROM sources WHERE id = 'target'") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(name.0, "Target Site v2", "name updates on re-call"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn first_upsert_inserts_manga_and_links_metadata(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let m = sample_manga("foo", "Foo Manga", "hash-1"); + + let res = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + assert_eq!(res.status, UpsertStatus::New); + + // mangas row created + let row: (String, String, Vec) = + sqlx::query_as("SELECT title, status, alt_titles FROM mangas WHERE id = $1") + .bind(res.manga_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(row.0, "Foo Manga"); + assert_eq!(row.1, "ongoing"); + assert_eq!(row.2, vec!["Alt 1"]); + + // manga_sources row links the two + let link: (String, Uuid, Option) = sqlx::query_as( + "SELECT source_id, manga_id, metadata_hash FROM manga_sources WHERE source_manga_key = $1", + ) + .bind("foo") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(link.0, "target"); + assert_eq!(link.1, res.manga_id); + assert_eq!(link.2.as_deref(), Some("hash-1")); + + // Authors, genres, tags M2M populated + let n_authors: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM manga_authors WHERE manga_id = $1") + .bind(res.manga_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(n_authors.0, 1); + let n_genres: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM manga_genres WHERE manga_id = $1") + .bind(res.manga_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(n_genres.0, 2, "Action + Fantasy"); + let n_tags: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM manga_tags WHERE manga_id = $1") + .bind(res.manga_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(n_tags.0, 1); +} + +#[sqlx::test(migrations = "./migrations")] +async fn second_upsert_with_same_hash_reports_unchanged(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let m = sample_manga("foo", "Foo Manga", "hash-1"); + let first = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + let second = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + assert_eq!(second.status, UpsertStatus::Unchanged); + assert_eq!(second.manga_id, first.manga_id); +} + +#[sqlx::test(migrations = "./migrations")] +async fn upsert_with_changed_hash_updates_fields(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let mut m = sample_manga("foo", "Foo Manga", "hash-1"); + let first = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + + m.title = "Foo Manga (Revised)".into(); + m.status = Some("completed".into()); + m.metadata_hash = "hash-2".into(); + let second = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + + assert_eq!(second.status, UpsertStatus::Updated); + assert_eq!(second.manga_id, first.manga_id); + + let row: (String, String) = + sqlx::query_as("SELECT title, status FROM mangas WHERE id = $1") + .bind(first.manga_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(row.0, "Foo Manga (Revised)"); + assert_eq!(row.1, "completed"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn sync_chapters_adds_new_refreshes_existing_and_drops_vanished(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let m = sample_manga("foo", "Foo Manga", "hash-1"); + let up = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + + let initial = vec![ + SourceChapterRef { + source_chapter_key: "1".into(), + number: 1, + title: Some("Ch.1".into()), + url: "https://x.example/foo/1".into(), + }, + SourceChapterRef { + source_chapter_key: "2".into(), + number: 2, + title: Some("Ch.2".into()), + url: "https://x.example/foo/2".into(), + }, + ]; + let diff = crawler::sync_manga_chapters(&pool, "target", up.manga_id, &initial) + .await + .unwrap(); + assert_eq!( + diff, + ChapterDiff { + new: 2, + refreshed: 0, + dropped: 0 + } + ); + + // Second run: keep ch1, replace ch2 with ch3 — ch2 should be dropped. + let second = vec![ + SourceChapterRef { + source_chapter_key: "1".into(), + number: 1, + title: Some("Ch.1 (renamed)".into()), + url: "https://x.example/foo/1".into(), + }, + SourceChapterRef { + source_chapter_key: "3".into(), + number: 3, + title: Some("Ch.3".into()), + url: "https://x.example/foo/3".into(), + }, + ]; + let diff = crawler::sync_manga_chapters(&pool, "target", up.manga_id, &second) + .await + .unwrap(); + assert_eq!( + diff, + ChapterDiff { + new: 1, + refreshed: 1, + dropped: 1 + } + ); + + // Renamed title propagated to chapters.title + let title: (Option,) = + sqlx::query_as("SELECT c.title FROM chapters c JOIN chapter_sources cs ON cs.chapter_id = c.id WHERE cs.source_chapter_key = '1'") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(title.0.as_deref(), Some("Ch.1 (renamed)")); + + // Vanished chapter is soft-dropped (row still exists, dropped_at set). + let dropped: (Option>,) = + sqlx::query_as("SELECT dropped_at FROM chapter_sources WHERE source_chapter_key = '2'") + .fetch_one(&pool) + .await + .unwrap(); + assert!(dropped.0.is_some(), "ch2 should be soft-dropped"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn mark_dropped_mangas_only_drops_unseen(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + // Seed two mangas before "now" so a later run_started_at sees them as stale. + let _ = crawler::upsert_manga_from_source( + &pool, + "target", + "https://x.example/foo", + &sample_manga("foo", "Foo", "hf"), + ) + .await + .unwrap(); + let _ = crawler::upsert_manga_from_source( + &pool, + "target", + "https://x.example/bar", + &sample_manga("bar", "Bar", "hb"), + ) + .await + .unwrap(); + + // Now mark a new "run" beginning. Re-upsert only `foo` — `bar` + // should be the one flagged dropped. + let run_started = chrono::Utc::now(); + // Sleep briefly so the second upsert's NOW() > run_started_at. + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + let _ = crawler::upsert_manga_from_source( + &pool, + "target", + "https://x.example/foo", + &sample_manga("foo", "Foo", "hf"), + ) + .await + .unwrap(); + + let n = crawler::mark_dropped_mangas(&pool, "target", run_started) + .await + .unwrap(); + assert_eq!(n, 1, "only bar should have been dropped"); + + let foo_dropped: (Option>,) = + sqlx::query_as("SELECT dropped_at FROM manga_sources WHERE source_manga_key = 'foo'") + .fetch_one(&pool) + .await + .unwrap(); + assert!(foo_dropped.0.is_none(), "foo seen this run, must not be dropped"); + let bar_dropped: (Option>,) = + sqlx::query_as("SELECT dropped_at FROM manga_sources WHERE source_manga_key = 'bar'") + .fetch_one(&pool) + .await + .unwrap(); + assert!(bar_dropped.0.is_some()); +} + +#[sqlx::test(migrations = "./migrations")] +async fn upsert_surfaces_cover_image_path_for_backfill_decisions(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let m = sample_manga("foo", "Foo", "h1"); + + // First upsert: row is brand new, no cover stored yet. + let first = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + assert!(first.cover_image_path.is_none(), "new manga has no cover yet"); + + // Simulate cover landing in storage post-upsert. + sqlx::query("UPDATE mangas SET cover_image_path = $1 WHERE id = $2") + .bind("mangas/foo/cover.jpg") + .bind(first.manga_id) + .execute(&pool) + .await + .unwrap(); + + // Second upsert with same hash → Unchanged, but cover path is now + // surfaced so the caller knows the backfill is done. + let second = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + assert_eq!(second.status, UpsertStatus::Unchanged); + assert_eq!( + second.cover_image_path.as_deref(), + Some("mangas/foo/cover.jpg") + ); +} + +#[sqlx::test(migrations = "./migrations")] +async fn arbitrary_genres_from_source_get_inserted(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let mut m = sample_manga("foo", "Foo", "h"); + // "Action" is seeded by migration 0009. "Webtoons" is not. + m.genres = vec!["Action".into(), "Webtoons".into()]; + + let up = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + + let n_genre_links: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM manga_genres WHERE manga_id = $1") + .bind(up.manga_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(n_genre_links.0, 2, "both seeded and source-added genres attach"); + + let webtoons: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM genres WHERE name = 'Webtoons'") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(webtoons.0, 1, "non-seeded genre was inserted"); + + // Case-insensitive de-dup: a second sync with the genre re-cased + // attaches the existing row, not a new one. + let mut m2 = sample_manga("bar", "Bar", "h2"); + m2.genres = vec!["webtoons".into()]; + let _ = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/bar", &m2) + .await + .unwrap(); + let webtoons_count: (i64,) = + sqlx::query_as("SELECT COUNT(*) FROM genres WHERE lower(name) = 'webtoons'") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(webtoons_count.0, 1, "case-insensitive lookup reuses the existing row"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn re_appearing_manga_clears_dropped_at(pool: PgPool) { + crawler::ensure_source(&pool, "target", "T", "https://x.example") + .await + .unwrap(); + let m = sample_manga("foo", "Foo", "h1"); + let up = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + + // Drop it manually. + sqlx::query( + "UPDATE manga_sources SET dropped_at = NOW() WHERE source_manga_key = 'foo'", + ) + .execute(&pool) + .await + .unwrap(); + + // Re-upsert: the link should un-drop. + let _ = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m) + .await + .unwrap(); + let dropped: (Option>, Uuid) = sqlx::query_as( + "SELECT dropped_at, manga_id FROM manga_sources WHERE source_manga_key = 'foo'", + ) + .fetch_one(&pool) + .await + .unwrap(); + assert!(dropped.0.is_none()); + assert_eq!(dropped.1, up.manga_id); +} diff --git a/frontend/package-lock.json b/frontend/package-lock.json index e5245a0..044e890 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -1,12 +1,12 @@ { "name": "mangalord-frontend", - "version": "0.12.0", + "version": "0.23.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "mangalord-frontend", - "version": "0.12.0", + "version": "0.23.0", "devDependencies": { "@lucide/svelte": "^1.16.0", "@playwright/test": "^1.48.0", @@ -169,7 +169,6 @@ } ], "license": "MIT", - "peer": true, "engines": { "node": ">=18" }, @@ -193,7 +192,6 @@ } ], "license": "MIT", - "peer": true, "engines": { "node": ">=18" } @@ -1157,7 +1155,6 @@ "integrity": "sha512-mQjlkNo+rJvpln7V2IGY2j99BqhcFbS4UN0AQNKNYfhBAFZTuCDAdW3a1sgf330mvtNvsBXn3HpAhcmvdJTcIQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@standard-schema/spec": "^1.0.0", "@sveltejs/acorn-typescript": "^1.0.5", @@ -1200,7 +1197,6 @@ "integrity": "sha512-0ba1RQ/PHen5FGpdSrW7Y3fAMQjrXantECALeOiOdBdzR5+5vPP6HVZRLmZaQL+W8m++o+haIAKq5qT+MiZ7VA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@sveltejs/vite-plugin-svelte-inspector": "^3.0.0-next.0||^3.0.0", "debug": "^4.3.7", @@ -1359,7 +1355,6 @@ "integrity": "sha512-dyh/xO2Fh5bYrfWaaqGrRQQGkNdmYw6AmaAUvYeUMNTWQtvb796ikLdmTchRmOlOiIJ1TDXfWgVx1QkUlQ6Hew==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~6.21.0" } @@ -1507,7 +1502,6 @@ "integrity": "sha512-UVJyE9MttOsBQIDKw1skb9nAwQuR5wuGD3+82K6JgJlm/Y+KI92oNsMNGZCYdDsVtRHSak0pcV5Dno5+4jh9sw==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -2249,7 +2243,6 @@ "integrity": "sha512-8i7LzZj7BF8uplX+ZyOlIz86V6TAsSs+np6m1kpW9u0JWi4z/1t+FzcK1aek+ybTnAC4KhBL4uXCNT0wcUIeCw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "cssstyle": "^4.1.0", "data-urls": "^5.0.0", @@ -2638,7 +2631,6 @@ "integrity": "sha512-WHeFSbZYsPu3+bLoNRUuAO+wavNlocOPf3wSHTP7hcFKVnJeWsYlCDbr3mTS14FCizf9ccIxXA8sGL8zKeQN3g==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@types/estree": "1.0.8" }, @@ -2810,7 +2802,6 @@ "integrity": "sha512-ymI5ykLPwIHW839E053FQbI1G+jnRFJEw3Kv5Y4njixVWywQBx+NUFpkkKyk5LIb36Fg9DVXSYpqiGekLD0hyw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@jridgewell/remapping": "^2.3.4", "@jridgewell/sourcemap-codec": "^1.5.0", @@ -2997,7 +2988,6 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -3019,7 +3009,6 @@ "integrity": "sha512-o5a9xKjbtuhY6Bi5S3+HvbRERmouabWbyUcpXXUA1u+GNUKoROi9byOJ8M0nHbHYHkYICiMlqxkg1KkYmm25Sw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.21.3", "postcss": "^8.4.43", @@ -3138,7 +3127,6 @@ "integrity": "sha512-MSmPM9REYqDGBI8439mA4mWhV5sKmDlBKWIYbA3lRb2PTHACE0mgKwA8yQ2xq9vxDTuk4iPrECBAEW2aoFXY0Q==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@vitest/expect": "2.1.9", "@vitest/mocker": "2.1.9", diff --git a/frontend/package.json b/frontend/package.json index e2e4e84..813cb2d 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "mangalord-frontend", - "version": "0.22.0", + "version": "0.23.0", "private": true, "type": "module", "scripts": {