feat: chapter content sync via PHPSESSID + per-host pacing (0.25.0)

After the metadata pass, the crawler now fetches per-chapter image
content for chapters belonging to bookmarked mangas. Logged-in chapter
pages render every page image at once (no per-page navigation), so the
crawler reuses the operator's browser session via a pasted PHPSESSID
cookie. Each chapter sync is a single transaction: storage puts + page
row inserts + page_count update commit together, or roll back together
on any image error so the chapter stays at page_count=0 and is retried
next run.

New crawler modules:

- `rate_limit::HostRateLimiters`: per-host buckets keyed by URL host,
  with optional per-host overrides. Replaces the single shared
  `Mutex<RateLimiter>`. Catalog and CDN no longer share a budget;
  default 1 req/s per host.
- `session`: derives `.<registrable>.<tld>` from the start URL
  (override via `CRAWLER_COOKIE_DOMAIN` for multi-part TLDs), injects
  PHPSESSID into the Chromium cookie store, probes `#avatar_menu` at
  startup to fail fast on a bad/expired cookie.
- `content`: parses `a#pic_container img:not(.loading)` with `pageN`
  id-based sorting (DOM order isn't trusted), then performs the
  atomic chapter sync.

bin/crawler additions:

- Concurrent chapter content phase via `futures_util::for_each_concurrent`
  (`CRAWLER_CHAPTER_WORKERS`, default 1). Browser is borrowed across
  workers — chromiumoxide allows concurrent `new_page` on `&self` —
  and per-host rate limit gates total RPS regardless of worker count.
- reqwest gets the `cookies` feature, a `Jar` seeded with PHPSESSID
  for the catalog domain only (CDN intentionally not given the
  cookie), and `Referer` is set on cover + chapter image fetches.
- New env knobs: `CRAWLER_PHPSESSID`, `CRAWLER_COOKIE_DOMAIN`,
  `CRAWLER_USER_AGENT`, `CRAWLER_CHAPTER_WORKERS`,
  `CRAWLER_SKIP_CHAPTER_CONTENT`, `CRAWLER_FORCE_REFETCH_CHAPTERS`,
  `CRAWLER_CDN_HOST` + `CRAWLER_CDN_RATE_MS`.
- Mid-run session-expired detection: `#avatar_menu` is re-checked on
  every chapter page nav; first failure aborts the phase with a
  cookie-refresh message.

Bookmark-driven enqueueing is sync-on-crawl-tick only: the bookmarked
chapters with `page_count = 0` are queried at the start of the
chapter-content phase. Sync-on-bookmark via an API hook is deferred
to a follow-up branch — that needs a daemon consumer of crawler_jobs,
which doesn't exist yet.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-23 00:28:36 +02:00
parent 51346227dd
commit d24e68c78d
10 changed files with 846 additions and 35 deletions

View File

@@ -0,0 +1,244 @@
//! Chapter content sync — fetch a logged-in chapter page, extract its
//! image URLs in `pageN` order, download each to storage, and atomically
//! persist a `pages` row per image plus the chapter's `page_count`.
//!
//! Only chapters belonging to a manga someone has bookmarked are
//! candidates. The crawler scans bookmarks at the start of each run and
//! enqueues unfetched chapters; the API also enqueues at bookmark-time
//! so users get instant feedback. Both feed into the same queue and
//! dedup by chapter id.
// Implementation lands in the next commits in this branch. Module is
// declared so other crates can `use crawler::content` without breaking
// builds while iteration is in progress.
use anyhow::Context;
use sqlx::PgPool;
use uuid::Uuid;
use crate::crawler::rate_limit::HostRateLimiters;
use crate::crawler::session;
use crate::storage::Storage;
/// Parse the chapter page DOM and return the page images in `pageN`
/// order. Filters out the loader `<img class="loading">` and any
/// `<img>` without a numeric `id="pageN"`.
pub fn parse_chapter_pages(html: &str) -> Vec<ChapterImage> {
let doc = scraper::Html::parse_document(html);
let sel = scraper::Selector::parse("a#pic_container img:not(.loading)").unwrap();
let mut pages: Vec<ChapterImage> = doc
.select(&sel)
.filter_map(|img| {
let id = img.value().id()?;
let n: i32 = id.strip_prefix("page")?.parse().ok()?;
let src = img.value().attr("src")?.trim().to_string();
if src.is_empty() {
return None;
}
Some(ChapterImage { page_number: n, url: src })
})
.collect();
pages.sort_by_key(|p| p.page_number);
pages
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChapterImage {
pub page_number: i32,
pub url: String,
}
/// Outcome of a single chapter sync — surfaced to callers for logging
/// and exit-code decisions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncOutcome {
/// All images downloaded and stored, chapter row updated.
Fetched { pages: usize },
/// `page_count > 0` already — no-op unless force_refetch is set.
Skipped,
/// Session probe failed mid-sync (avatar selector missing on the
/// chapter page). Caller should abort the whole crawler run.
SessionExpired,
}
/// Fetch all images for one chapter and persist them atomically. On
/// any error after the first storage put, the DB transaction rolls
/// back so the chapter stays at `page_count = 0` and is retried on the
/// next run. Bytes already written to storage become orphans; a future
/// reaper sweeps them.
#[allow(clippy::too_many_arguments)]
pub async fn sync_chapter_content(
browser: &chromiumoxide::Browser,
db: &PgPool,
storage: &dyn Storage,
http: &reqwest::Client,
rate: &HostRateLimiters,
chapter_id: Uuid,
manga_id: Uuid,
source_url: &str,
force_refetch: bool,
) -> anyhow::Result<SyncOutcome> {
// Skip if already fetched, unless caller explicitly forces.
if !force_refetch {
let (page_count,): (i32,) =
sqlx::query_as("SELECT page_count FROM chapters WHERE id = $1")
.bind(chapter_id)
.fetch_one(db)
.await
.context("read chapter page_count")?;
if page_count > 0 {
return Ok(SyncOutcome::Skipped);
}
}
// Nav to chapter page (rate-limited per host).
rate.wait_for(source_url).await?;
let page = browser
.new_page(source_url)
.await
.with_context(|| format!("open chapter page {source_url}"))?;
page.wait_for_navigation().await.context("wait for chapter nav")?;
// Session probe: avatar present == still logged in. Missing means
// PHPSESSID expired; bail the entire crawler run.
if page.find_element("#avatar_menu").await.is_err() {
page.close().await.ok();
return Ok(SyncOutcome::SessionExpired);
}
let html = page.content().await.context("read chapter html")?;
page.close().await.ok();
let images = parse_chapter_pages(&html);
if images.is_empty() {
anyhow::bail!("no page images parsed from {source_url}");
}
// Resolve image URLs against the chapter URL (they may be relative).
let base = reqwest::Url::parse(source_url).context("parse chapter URL")?;
// Fetch every image bytes-first into memory before writing
// anything. Lets us bail the whole chapter cleanly if any image
// fails — DB stays at page_count=0, no partial rows persisted.
let mut fetched: Vec<(i32, Vec<u8>, &'static str)> = Vec::with_capacity(images.len());
for img in &images {
let url = base.join(&img.url).with_context(|| {
format!("join image URL {} onto {source_url}", img.url)
})?;
rate.wait_for(url.as_str()).await?;
let resp = http
.get(url.clone())
// Source CDNs commonly check Referer. Set it to the
// chapter page — matches what the browser would send.
.header(reqwest::header::REFERER, source_url)
.send()
.await
.with_context(|| format!("GET {url}"))?
.error_for_status()
.with_context(|| format!("non-2xx for {url}"))?;
let bytes = resp.bytes().await.context("read image body")?.to_vec();
let ext = infer::get(&bytes).map(|k| k.extension()).unwrap_or("bin");
fetched.push((img.page_number, bytes, ext));
}
// Atomic write: storage puts + page row inserts + page_count
// update, all in one transaction. If anything fails, rollback +
// the chapter is retried next run. Storage orphans the bytes; a
// reaper sweeps them later.
let mut tx = db.begin().await.context("open chapter sync tx")?;
for (page_number, bytes, ext) in &fetched {
let key = format!(
"mangas/{manga_id}/chapters/{chapter_id}/pages/{:04}.{ext}",
page_number
);
storage
.put(&key, bytes)
.await
.with_context(|| format!("put {key}"))?;
// (chapter_id, page_number) is unique — re-runs idempotent.
sqlx::query(
"INSERT INTO pages (chapter_id, page_number, storage_key, content_type)
VALUES ($1, $2, $3, $4)
ON CONFLICT (chapter_id, page_number) DO UPDATE
SET storage_key = EXCLUDED.storage_key,
content_type = EXCLUDED.content_type",
)
.bind(chapter_id)
.bind(page_number)
.bind(&key)
.bind(format!("image/{ext}"))
.execute(&mut *tx)
.await
.with_context(|| format!("insert page row {page_number}"))?;
}
sqlx::query("UPDATE chapters SET page_count = $1 WHERE id = $2")
.bind(fetched.len() as i32)
.bind(chapter_id)
.execute(&mut *tx)
.await
.context("update page_count")?;
tx.commit().await.context("commit chapter sync")?;
Ok(SyncOutcome::Fetched { pages: fetched.len() })
}
// Suppress unused-import warning for `session` until the bin/crawler
// wiring lands in this branch and uses it through this module.
#[allow(dead_code)]
fn _keep_session_in_scope() {
let _ = session::registrable_domain;
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_chapter_pages_skips_loader_and_sorts_by_id() {
// Loader image, two real pages out of order, and one with no id.
let html = r#"
<html><body id="body"><a id="pic_container">
<img class="loading" src="/images/ajax-loader2.gif">
<img id="page2" class="page2" src="https://cdn/2.jpg">
<img id="page1" class="page1" src="https://cdn/1.jpg">
<img src="https://cdn/orphan.jpg">
<img id="not-a-page" src="https://cdn/not-a-page.jpg">
</a></body></html>
"#;
let pages = parse_chapter_pages(html);
assert_eq!(pages.len(), 2);
assert_eq!(pages[0].page_number, 1);
assert_eq!(pages[0].url, "https://cdn/1.jpg");
assert_eq!(pages[1].page_number, 2);
assert_eq!(pages[1].url, "https://cdn/2.jpg");
}
#[test]
fn parse_chapter_pages_drops_images_without_src() {
let html = r#"
<a id="pic_container">
<img id="page1" src="">
<img id="page2" src="https://cdn/2.jpg">
</a>
"#;
let pages = parse_chapter_pages(html);
assert_eq!(pages.len(), 1);
assert_eq!(pages[0].page_number, 2);
}
#[test]
fn parse_chapter_pages_handles_three_digit_page_ids() {
let html = r#"
<a id="pic_container">
<img id="page126" src="https://cdn/126.jpg">
<img id="page9" src="https://cdn/9.jpg">
<img id="page50" src="https://cdn/50.jpg">
</a>
"#;
let pages = parse_chapter_pages(html);
assert_eq!(
pages.iter().map(|p| p.page_number).collect::<Vec<_>>(),
vec![9, 50, 126]
);
}
}

View File

@@ -14,7 +14,9 @@
//! - [`diff`]: change detection — new / updated / dropped semantics.
pub mod browser;
pub mod content;
pub mod diff;
pub mod jobs;
pub mod rate_limit;
pub mod session;
pub mod source;

View File

@@ -1,11 +1,22 @@
//! Per-host request pacing.
//!
//! Single-token bucket: each `wait().await` either returns immediately
//! (if at least `interval` has elapsed since the last call) or sleeps
//! just enough to satisfy it. Uses `tokio::time::Instant` so tests can
//! run under `start_paused` virtual time without sleeping for real.
//! `RateLimiter` is a single-token bucket: each `wait().await` returns
//! immediately when at least `interval` has elapsed since the last call,
//! otherwise sleeps just enough to satisfy it. Uses
//! `tokio::time::Instant` so tests can run under `start_paused` virtual
//! time without sleeping for real.
//!
//! `HostRateLimiters` is the multi-host wrapper actually used by the
//! crawler — concurrent workers issuing requests to different origins
//! (catalog vs. CDN) don't contend on a shared budget; each host gets
//! its own bucket. `wait_for(url)` extracts the host, lazily creates a
//! limiter for it, and serializes only against other callers hitting
//! the same host.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::Instant;
#[derive(Debug)]
@@ -33,6 +44,70 @@ impl RateLimiter {
}
}
/// Per-host rate limiter map. The outer `Mutex<HashMap>` is held only
/// during the entry-or-insert + Arc clone; the per-host `Mutex<RateLimiter>`
/// is held during the actual `wait().await`. So N workers calling
/// `wait_for(url)` on N different hosts contend nowhere except the brief
/// HashMap lookup; workers hitting the same host serialize on that
/// host's bucket.
#[derive(Debug)]
pub struct HostRateLimiters {
default_interval: Duration,
overrides: HashMap<String, Duration>,
map: Mutex<HashMap<String, Arc<Mutex<RateLimiter>>>>,
}
impl HostRateLimiters {
pub fn new(default_interval: Duration) -> Self {
Self {
default_interval,
overrides: HashMap::new(),
map: Mutex::new(HashMap::new()),
}
}
/// Set a per-host interval that overrides `default_interval`. Calls
/// after a host's limiter has been instantiated do *not* re-create
/// it — set all overrides before the first `wait_for` to that host.
pub fn with_override(mut self, host: impl Into<String>, interval: Duration) -> Self {
self.overrides.insert(host.into(), interval);
self
}
/// Block until the per-host budget allows the next request to
/// `url`'s host. Returns an error only when the URL has no host
/// (malformed input).
pub async fn wait_for(&self, url: &str) -> anyhow::Result<()> {
let host = host_of(url)
.ok_or_else(|| anyhow::anyhow!("no host in url: {url}"))?;
let limiter = {
let mut map = self.map.lock().await;
map.entry(host.clone())
.or_insert_with(|| {
let interval = self
.overrides
.get(&host)
.copied()
.unwrap_or(self.default_interval);
Arc::new(Mutex::new(RateLimiter::new(interval)))
})
.clone()
};
limiter.lock().await.wait().await;
Ok(())
}
}
/// Extract the host (no port) from a URL string. Returns `None` for
/// inputs without a `scheme://host` shape — those would never have
/// reached the network layer anyway.
fn host_of(url: &str) -> Option<String> {
let after_scheme = url.split_once("://")?.1;
let host_with_port = after_scheme.split('/').next()?;
let host = host_with_port.rsplit_once(':').map_or(host_with_port, |(h, _)| h);
(!host.is_empty()).then(|| host.to_ascii_lowercase())
}
#[cfg(test)]
mod tests {
use super::*;
@@ -66,4 +141,44 @@ mod tests {
// Already 250ms past — no further wait needed.
assert_eq!(Instant::now() - t0, Duration::ZERO);
}
#[test]
fn host_of_parses_scheme_path_and_port() {
assert_eq!(host_of("https://Example.com/path").as_deref(), Some("example.com"));
assert_eq!(host_of("http://cdn.foo.bar/img.jpg").as_deref(), Some("cdn.foo.bar"));
assert_eq!(host_of("http://localhost:8080/x").as_deref(), Some("localhost"));
assert!(host_of("not a url").is_none());
}
#[tokio::test(start_paused = true)]
async fn host_rate_limiters_pace_per_host() {
// Two hosts at 100ms each. Two consecutive calls to the SAME
// host wait 100ms total. Two consecutive calls to DIFFERENT
// hosts both fire immediately.
let rl = HostRateLimiters::new(Duration::from_millis(100));
let t0 = Instant::now();
rl.wait_for("https://a.example/x").await.unwrap();
rl.wait_for("https://b.example/y").await.unwrap();
assert_eq!(Instant::now() - t0, Duration::ZERO, "different hosts don't contend");
let t1 = Instant::now();
rl.wait_for("https://a.example/x").await.unwrap();
assert_eq!(
Instant::now() - t1,
Duration::from_millis(100),
"second call to same host waits a full interval"
);
}
#[tokio::test(start_paused = true)]
async fn host_rate_limiters_honor_overrides() {
let rl = HostRateLimiters::new(Duration::from_millis(1000))
.with_override("fast.example", Duration::from_millis(100));
rl.wait_for("https://fast.example/a").await.unwrap();
let t0 = Instant::now();
rl.wait_for("https://fast.example/b").await.unwrap();
assert_eq!(Instant::now() - t0, Duration::from_millis(100));
}
}

View File

@@ -0,0 +1,161 @@
//! PHPSESSID injection + login probe.
//!
//! The catalog site we crawl renders chapter pages as a single multi-
//! page list only for logged-in users. We don't try to bypass the
//! login (CAPTCHA wall) — instead the operator pastes their browser's
//! `PHPSESSID` cookie into `CRAWLER_PHPSESSID` and the crawler injects
//! it into Chromium *and* reqwest before the first navigation.
//!
//! Two things the cookie alone doesn't give us:
//! 1. The cookie value is only meaningful to the *server* — we have
//! no way to predict from the value alone whether it's still valid.
//! `verify_session` does a navigation and checks for `#avatar_menu`,
//! which only renders for authenticated visitors. Bail clean at
//! startup if it's missing rather than discovering it 30 minutes
//! into a backfill.
//! 2. The reqwest client (used for cover and chapter-image downloads)
//! has its own cookie store; we seed it for the catalog host only.
//! CDN hosts are deliberately *not* given the cookie — they serve
//! image bytes by signed URLs and don't need it.
use anyhow::{anyhow, Context};
use chromiumoxide::browser::Browser;
use chromiumoxide::cdp::browser_protocol::network::CookieParam;
/// Compute the cookie domain (e.g. `.example.com`) from a start URL.
/// The leading dot makes the cookie cover every subdomain — the source
/// often redirects between `www.` and other prefixes mid-crawl, and a
/// host-only cookie would silently drop on the cross-subdomain hop.
///
/// Caveat: this takes the last two dot-labels, which is wrong for
/// multi-part TLDs (`.co.uk`, `.com.br` would resolve to `.co.uk` and
/// attach to every site on `.co.uk`). For those, the operator should
/// override via `CRAWLER_COOKIE_DOMAIN` rather than relying on this
/// function — pulling in the Public Suffix List for one knob isn't
/// worth it yet.
pub fn registrable_domain(url: &str) -> Option<String> {
let after_scheme = url.split_once("://")?.1;
let host_with_port = after_scheme.split('/').next()?;
let host = host_with_port
.rsplit_once(':')
.map_or(host_with_port, |(h, _)| h)
.to_ascii_lowercase();
if host.is_empty() {
return None;
}
let labels: Vec<&str> = host.split('.').filter(|l| !l.is_empty()).collect();
if labels.len() < 2 {
// Bare hostname (e.g. `localhost`) — return as-is, no leading
// dot. Setting `.localhost` as cookie domain is invalid.
return Some(host);
}
let registrable = &labels[labels.len() - 2..];
Some(format!(".{}", registrable.join(".")))
}
/// Inject the PHPSESSID cookie into the browser's cookie store for the
/// catalog domain. Must be called before any navigation that depends on
/// authentication; subsequent navigations include the cookie
/// automatically.
pub async fn inject_phpsessid(
browser: &Browser,
sid: &str,
cookie_domain: &str,
) -> anyhow::Result<()> {
let cookie = CookieParam {
name: "PHPSESSID".to_string(),
value: sid.to_string(),
url: None,
domain: Some(cookie_domain.to_string()),
path: Some("/".to_string()),
secure: None,
http_only: Some(true),
same_site: None,
expires: None,
priority: None,
same_party: None,
source_scheme: None,
source_port: None,
partition_key: None,
};
browser
.set_cookies(vec![cookie])
.await
.context("set PHPSESSID in chromium cookie store")?;
tracing::info!(domain = cookie_domain, "injected PHPSESSID into browser");
Ok(())
}
/// Navigate to `probe_url` and confirm the logged-in `#avatar_menu`
/// element is present. The selector only renders for authenticated
/// visitors, so its absence is the unambiguous signal that PHPSESSID
/// is missing, expired, or revoked.
///
/// This burns one navigation against the catalog's rate limiter. The
/// trade is worth it — failing here costs ~1s; failing 30 minutes into
/// a backfill costs 30 minutes.
pub async fn verify_session(browser: &Browser, probe_url: &str) -> anyhow::Result<()> {
let page = browser
.new_page(probe_url)
.await
.with_context(|| format!("open probe page {probe_url}"))?;
page.wait_for_navigation().await.context("wait for nav on probe")?;
// The avatar menu is rendered server-side as part of the header
// when a valid session cookie is present; absent JS is fine.
let found = page.find_element("#avatar_menu").await.is_ok();
page.close().await.ok();
if found {
tracing::info!("session probe ok — #avatar_menu present");
Ok(())
} else {
Err(anyhow!(
"session probe failed — #avatar_menu not present at {probe_url}; \
PHPSESSID is missing, expired, or revoked. Refresh CRAWLER_PHPSESSID \
and re-run."
))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn registrable_domain_strips_subdomain() {
assert_eq!(
registrable_domain("https://www.target-site.com/manga/foo/").as_deref(),
Some(".target-site.com")
);
assert_eq!(
registrable_domain("https://m.example.org").as_deref(),
Some(".example.org")
);
}
#[test]
fn registrable_domain_keeps_two_label_host() {
assert_eq!(
registrable_domain("https://example.com/").as_deref(),
Some(".example.com")
);
}
#[test]
fn registrable_domain_handles_port() {
assert_eq!(
registrable_domain("http://www.foo.bar:8080/x").as_deref(),
Some(".foo.bar")
);
}
#[test]
fn registrable_domain_bare_hostname_no_leading_dot() {
// .localhost would be invalid as a cookie Domain.
assert_eq!(registrable_domain("http://localhost:5173").as_deref(), Some("localhost"));
}
#[test]
fn registrable_domain_returns_none_for_garbage() {
assert!(registrable_domain("not a url").is_none());
}
}

View File

@@ -74,12 +74,12 @@ pub struct SourceChapter {
}
/// Context passed to every `Source` call. Carries the browser handle
/// plus a shared rate limiter so impls that issue multiple requests in
/// one call (e.g. pagination walks) honor the same per-host budget as
/// the outer job loop.
/// plus the per-host rate-limiter map so impls that issue multiple
/// requests in one call (pagination walks, multi-page chapter image
/// fetches) honor the right budget for each origin.
pub struct FetchContext<'a> {
pub browser: &'a Browser,
pub rate: &'a tokio::sync::Mutex<crate::crawler::rate_limit::RateLimiter>,
pub rate: &'a crate::crawler::rate_limit::HostRateLimiters,
}
#[async_trait]

View File

@@ -149,10 +149,10 @@ fn truncate_to_cap<T>(mut buf: Vec<T>, max: Option<usize>) -> Vec<T> {
}
/// Single point of rate-limited navigation. Every Source request goes
/// through here, so the limiter is the only knob that controls
/// per-host RPS.
/// through here, so the per-host limiter map is the only knob that
/// controls per-origin RPS.
async fn navigate(ctx: &FetchContext<'_>, url: &str) -> anyhow::Result<String> {
ctx.rate.lock().await.wait().await;
ctx.rate.wait_for(url).await?;
let page = ctx.browser.new_page(url).await?;
page.wait_for_navigation().await?;
// Stopgap until we wait on a specific selector per page type —