diff --git a/backend/src/app.rs b/backend/src/app.rs index 7132674..f83e42b 100644 --- a/backend/src/app.rs +++ b/backend/src/app.rs @@ -123,6 +123,18 @@ async fn spawn_crawler_daemon( } let rate = Arc::new(rate); + let tor = crate::crawler::tor::TorController::from_parts( + cfg.tor_control_url.as_deref(), + cfg.tor_control_password.as_deref(), + cfg.tor_control_cookie_path.as_deref(), + ) + .context("build TorController from CRAWLER_TOR_CONTROL_* env")? + .map(Arc::new); + if let Some(t) = &tor { + tracing::info!(?t, "TOR control configured; transient pages will trigger NEWNYM"); + } + let tor_recircuit_max = cfg.tor_recircuit_max_attempts; + // Browser manager. on_launch re-injects PHPSESSID on every fresh // chromium spawn so an idle teardown followed by re-launch stays // authenticated without operator action. @@ -135,17 +147,24 @@ async fn spawn_crawler_daemon( let sid = sid.clone(); let domain = domain.clone(); let start_url = start_url.clone(); + let tor_for_launch = tor.as_ref().map(Arc::clone); let on_launch: browser_manager::OnLaunch = Arc::new(move |browser| { let sid = sid.clone(); let domain = domain.clone(); let start_url = start_url.clone(); + let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone); Box::pin(async move { session::inject_phpsessid(&browser, &sid, &domain) .await .context("on_launch: inject_phpsessid")?; - session::verify_session(&browser, &start_url) - .await - .context("on_launch: verify_session")?; + session::verify_session_with_recircuit( + &browser, + &start_url, + tor_for_launch.as_deref(), + tor_recircuit_max, + ) + .await + .context("on_launch: verify_session")?; Ok(()) }) }); @@ -157,17 +176,6 @@ async fn spawn_crawler_daemon( let session_expired = Arc::new(AtomicBool::new(false)); - let tor = crate::crawler::tor::TorController::from_parts( - cfg.tor_control_url.as_deref(), - cfg.tor_control_password.as_deref(), - cfg.tor_control_cookie_path.as_deref(), - ) - .context("build TorController from CRAWLER_TOR_CONTROL_* env")? - .map(Arc::new); - if let Some(t) = &tor { - tracing::info!(?t, "TOR control configured; transient pages will trigger NEWNYM"); - } - let metadata_pass: Option> = cfg.start_url.as_ref().map(|url| { let m: Arc = Arc::new(RealMetadataPass { browser_manager: Arc::clone(&browser_manager), diff --git a/backend/src/bin/crawler.rs b/backend/src/bin/crawler.rs index f47b4f2..1a62a10 100644 --- a/backend/src/bin/crawler.rs +++ b/backend/src/bin/crawler.rs @@ -88,6 +88,11 @@ async fn main() -> anyhow::Result<()> { .ok() .filter(|s| !s.trim().is_empty()) .map(std::path::PathBuf::from); + let tor_recircuit_max_attempts: u32 = std::env::var("CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(3) + .max(1); let keep_browser_open = env_bool("CRAWLER_KEEP_BROWSER_OPEN", false); let db = PgPoolOptions::new() @@ -154,35 +159,6 @@ async fn main() -> anyhow::Result<()> { "starting crawler" ); - // BrowserManager with idle_timeout = ZERO so the CLI keeps Chromium - // alive for the entire run — same lifecycle as the old direct - // `browser::launch()` flow. on_launch re-injects PHPSESSID + runs the - // session probe; bad cookies fail fast before any real work happens. - let on_launch: browser_manager::OnLaunch = match (&phpsessid, &cookie_domain) { - (Some(sid), Some(domain)) => { - let sid = sid.clone(); - let domain = domain.clone(); - let start_url_clone = start_url.clone(); - Arc::new(move |browser| { - let sid = sid.clone(); - let domain = domain.clone(); - let start_url = start_url_clone.clone(); - Box::pin(async move { - session::inject_phpsessid(&browser, &sid, &domain) - .await - .context("inject_phpsessid")?; - session::verify_session(&browser, &start_url) - .await - .context("verify_session")?; - Ok(()) - }) - }) - } - _ => browser_manager::noop_on_launch(), - }; - let session_ready = phpsessid.is_some() && cookie_domain.is_some(); - let manager = BrowserManager::new(options, Duration::ZERO, on_launch); - let tor = mangalord::crawler::tor::TorController::from_parts( tor_control_url.as_deref(), tor_control_password.as_deref(), @@ -194,6 +170,42 @@ async fn main() -> anyhow::Result<()> { tracing::info!(?t, "TOR control configured"); } + // BrowserManager with idle_timeout = ZERO so the CLI keeps Chromium + // alive for the entire run — same lifecycle as the old direct + // `browser::launch()` flow. on_launch re-injects PHPSESSID + runs the + // session probe; bad cookies fail fast before any real work happens. + let on_launch: browser_manager::OnLaunch = match (&phpsessid, &cookie_domain) { + (Some(sid), Some(domain)) => { + let sid = sid.clone(); + let domain = domain.clone(); + let start_url_clone = start_url.clone(); + let tor_for_launch = tor.as_ref().map(Arc::clone); + Arc::new(move |browser| { + let sid = sid.clone(); + let domain = domain.clone(); + let start_url = start_url_clone.clone(); + let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone); + Box::pin(async move { + session::inject_phpsessid(&browser, &sid, &domain) + .await + .context("inject_phpsessid")?; + session::verify_session_with_recircuit( + &browser, + &start_url, + tor_for_launch.as_deref(), + tor_recircuit_max_attempts, + ) + .await + .context("verify_session")?; + Ok(()) + }) + }) + } + _ => browser_manager::noop_on_launch(), + }; + let session_ready = phpsessid.is_some() && cookie_domain.is_some(); + let manager = BrowserManager::new(options, Duration::ZERO, on_launch); + let result = run( Arc::clone(&manager), &db, diff --git a/backend/src/crawler/content.rs b/backend/src/crawler/content.rs index 7cdb0f8..4406ce9 100644 --- a/backend/src/crawler/content.rs +++ b/backend/src/crawler/content.rs @@ -73,40 +73,35 @@ pub enum SyncOutcome { SessionExpired, } -/// Fetch all images for one chapter and persist them atomically. On -/// any error after the first storage put, the DB transaction rolls -/// back so the chapter stays at `page_count = 0` and is retried on the -/// next run. Bytes already written to storage become orphans; a future -/// reaper sweeps them. -#[allow(clippy::too_many_arguments)] -pub async fn sync_chapter_content( - browser: &chromiumoxide::Browser, - db: &PgPool, - storage: &dyn Storage, - http: &reqwest::Client, - rate: &HostRateLimiters, - chapter_id: Uuid, - manga_id: Uuid, - source_url: &str, - force_refetch: bool, - allowlist: &DownloadAllowlist, - max_image_bytes: usize, - _tor: Option<&crate::crawler::tor::TorController>, -) -> anyhow::Result { - // Skip if already fetched, unless caller explicitly forces. - if !force_refetch { - let (page_count,): (i32,) = - sqlx::query_as("SELECT page_count FROM chapters WHERE id = $1") - .bind(chapter_id) - .fetch_one(db) - .await - .context("read chapter page_count")?; - if page_count > 0 { - return Ok(SyncOutcome::Skipped); - } - } +/// Per-chapter recircuit budget for both transient pages and +/// `Unauthenticated` outcomes. When TOR is not configured the budget +/// is effectively 0 (no recircuit attempted; original behavior). +const CHAPTER_RECIRCUIT_MAX_ATTEMPTS: u32 = 3; - // Nav to chapter page (rate-limited per host). +/// Outcome of [`fetch_chapter_html_with_recircuit`]. `Ok` carries the +/// final reader HTML; the other two map to `sync_chapter_content`'s +/// existing failure modes. +#[derive(Debug)] +enum ChapterFetchOutcome { + Ok(String), + /// `ChapterProbe::Unauthenticated` after exhausting recircuit + /// budget (or with budget=0). Caller returns + /// `SyncOutcome::SessionExpired`. + SessionExpired, + /// `ChapterProbe::Transient` after exhausting recircuit budget + /// (or with budget=0). Caller bails so the dispatcher does + /// exponential backoff. + PersistentTransient, +} + +/// Single rate-limited Chromium navigation to the chapter URL, +/// returning the page HTML. Extracted from `sync_chapter_content` so +/// the recircuit loop can call it once per attempt. +async fn fetch_chapter_html_once( + browser: &chromiumoxide::Browser, + rate: &HostRateLimiters, + source_url: &str, +) -> anyhow::Result { rate.wait_for(source_url).await?; let page = browser .new_page(source_url) @@ -125,28 +120,128 @@ pub async fn sync_chapter_content( crate::crawler::nav::SELECTOR_TIMEOUT, ) .await; - let html = page.content().await.context("read chapter html")?; page.close().await.ok(); + Ok(html) +} - // Three-way session classification: distinguishes a transient - // hiccup (broken-page body or logged-in-but-no-reader) from a - // genuine PHPSESSID expiry (no reader and no avatar widget). The - // earlier binary `#avatar_menu` check conflated both and froze - // every worker on a layout shift. - match session::classify_chapter_probe(&html) { - ChapterProbe::Unauthenticated => return Ok(SyncOutcome::SessionExpired), - ChapterProbe::Transient => { +/// Pure-over-IO loop: fetch + classify, with up to `recircuit_budget` +/// NEWNYM-and-retry cycles after a `Transient` or `Unauthenticated` +/// outcome. `recircuit_budget = 0` collapses to the original +/// single-shot behavior — `Unauthenticated` → `SessionExpired`, +/// `Transient` → `PersistentTransient` on the first hit, no recircuit. +async fn fetch_chapter_html_with_recircuit( + mut fetch: F, + mut recircuit: R, + recircuit_budget: u32, + source_url_for_msg: &str, +) -> anyhow::Result +where + F: FnMut() -> Fut, + Fut: std::future::Future>, + R: FnMut() -> RFut, + RFut: std::future::Future, +{ + let mut recircuits = 0u32; + loop { + let html = fetch().await?; + match session::classify_chapter_probe(&html) { + ChapterProbe::Ok => return Ok(ChapterFetchOutcome::Ok(html)), + ChapterProbe::Unauthenticated => { + if recircuits < recircuit_budget { + recircuits += 1; + tracing::warn!( + attempt = recircuits, + max = recircuit_budget, + url = source_url_for_msg, + "chapter probe Unauthenticated; signaling TOR NEWNYM and retrying" + ); + recircuit().await; + continue; + } + return Ok(ChapterFetchOutcome::SessionExpired); + } + ChapterProbe::Transient => { + if recircuits < recircuit_budget { + recircuits += 1; + tracing::warn!( + attempt = recircuits, + max = recircuit_budget, + url = source_url_for_msg, + "chapter probe Transient; signaling TOR NEWNYM and retrying" + ); + recircuit().await; + continue; + } + return Ok(ChapterFetchOutcome::PersistentTransient); + } + } + } +} + +/// Fetch all images for one chapter and persist them atomically. On +/// any error after the first storage put, the DB transaction rolls +/// back so the chapter stays at `page_count = 0` and is retried on the +/// next run. Bytes already written to storage become orphans; a future +/// reaper sweeps them. +#[allow(clippy::too_many_arguments)] +pub async fn sync_chapter_content( + browser: &chromiumoxide::Browser, + db: &PgPool, + storage: &dyn Storage, + http: &reqwest::Client, + rate: &HostRateLimiters, + chapter_id: Uuid, + manga_id: Uuid, + source_url: &str, + force_refetch: bool, + allowlist: &DownloadAllowlist, + max_image_bytes: usize, + tor: Option<&crate::crawler::tor::TorController>, +) -> anyhow::Result { + // Skip if already fetched, unless caller explicitly forces. + if !force_refetch { + let (page_count,): (i32,) = + sqlx::query_as("SELECT page_count FROM chapters WHERE id = $1") + .bind(chapter_id) + .fetch_one(db) + .await + .context("read chapter page_count")?; + if page_count > 0 { + return Ok(SyncOutcome::Skipped); + } + } + + // Fetch + classify with a recircuit budget when TOR is configured. + // Without TOR the closure-recircuit is a no-op and the loop reduces + // to the original single-attempt behavior. + let recircuit_budget = if tor.is_some() { CHAPTER_RECIRCUIT_MAX_ATTEMPTS } else { 0 }; + let html = match fetch_chapter_html_with_recircuit( + || fetch_chapter_html_once(browser, rate, source_url), + || async { + if let Some(t) = tor { + if let Err(e) = t.new_identity().await { + tracing::warn!(error = %e, "TOR NEWNYM failed; continuing with same circuit"); + } + } + }, + recircuit_budget, + source_url, + ) + .await? + { + ChapterFetchOutcome::Ok(html) => html, + ChapterFetchOutcome::SessionExpired => return Ok(SyncOutcome::SessionExpired), + ChapterFetchOutcome::PersistentTransient => { // Surface as a typed Err so the dispatcher path runs // ack_failed with exponential backoff (rather than the // session-expired sticky flag). anyhow::bail!( - "chapter page at {source_url} returned a transient response \ - (broken-page body or reader didn't render); will retry" + "chapter page at {source_url} returned a transient response after \ + {recircuit_budget} TOR recircuit(s); will retry" ); } - ChapterProbe::Ok => {} - } + }; let images = parse_chapter_pages(&html) .with_context(|| format!("parse chapter pages at {source_url}"))?; @@ -305,4 +400,181 @@ mod tests { let err = parse_chapter_pages(html).expect_err("expected Transient"); assert!(err.is_transient(), "got non-transient: {err}"); } + + // --- fetch_chapter_html_with_recircuit ------------------------------- + + const OK_HTML: &str = r#""#; + const UNAUTH_HTML: &str = r#"
please log in
"#; + const TRANSIENT_HTML: &str = "

we're sorry, the request file are not found.

"; + + #[tokio::test] + async fn recircuit_loop_ok_first_attempt() { + let mut recircuits = 0u32; + let mut fetches = 0u32; + let outcome = fetch_chapter_html_with_recircuit( + || { + fetches += 1; + async { Ok(OK_HTML.to_string()) } + }, + || { + recircuits += 1; + async {} + }, + 3, + "https://example/c", + ) + .await + .expect("ok"); + assert!(matches!(outcome, ChapterFetchOutcome::Ok(_))); + assert_eq!(fetches, 1); + assert_eq!(recircuits, 0); + } + + #[tokio::test] + async fn recircuit_loop_unauth_with_zero_budget_returns_session_expired() { + let mut recircuits = 0u32; + let mut fetches = 0u32; + let outcome = fetch_chapter_html_with_recircuit( + || { + fetches += 1; + async { Ok(UNAUTH_HTML.to_string()) } + }, + || { + recircuits += 1; + async {} + }, + 0, + "https://example/c", + ) + .await + .expect("ok-result"); + assert!(matches!(outcome, ChapterFetchOutcome::SessionExpired)); + assert_eq!(fetches, 1); + assert_eq!(recircuits, 0, "no recircuit when budget is 0 (TOR disabled)"); + } + + #[tokio::test] + async fn recircuit_loop_unauth_then_ok_within_budget() { + let mut recircuits = 0u32; + let mut fetch_n = 0u32; + let outcome = fetch_chapter_html_with_recircuit( + || { + fetch_n += 1; + let n = fetch_n; + async move { + if n == 1 { + Ok(UNAUTH_HTML.to_string()) + } else { + Ok(OK_HTML.to_string()) + } + } + }, + || { + recircuits += 1; + async {} + }, + 3, + "https://example/c", + ) + .await + .expect("ok"); + assert!(matches!(outcome, ChapterFetchOutcome::Ok(_))); + assert_eq!(fetch_n, 2); + assert_eq!(recircuits, 1); + } + + #[tokio::test] + async fn recircuit_loop_unauth_exhausts_budget_returns_session_expired() { + let mut recircuits = 0u32; + let mut fetch_n = 0u32; + let outcome = fetch_chapter_html_with_recircuit( + || { + fetch_n += 1; + async { Ok(UNAUTH_HTML.to_string()) } + }, + || { + recircuits += 1; + async {} + }, + 2, + "https://example/c", + ) + .await + .expect("ok-result"); + assert!(matches!(outcome, ChapterFetchOutcome::SessionExpired)); + // budget=2 → initial + 2 recircuit-and-retry = 3 fetches. + assert_eq!(fetch_n, 3); + assert_eq!(recircuits, 2); + } + + #[tokio::test] + async fn recircuit_loop_transient_then_ok_within_budget() { + let mut recircuits = 0u32; + let mut fetch_n = 0u32; + let outcome = fetch_chapter_html_with_recircuit( + || { + fetch_n += 1; + let n = fetch_n; + async move { + if n < 3 { + Ok(TRANSIENT_HTML.to_string()) + } else { + Ok(OK_HTML.to_string()) + } + } + }, + || { + recircuits += 1; + async {} + }, + 3, + "https://example/c", + ) + .await + .expect("ok"); + assert!(matches!(outcome, ChapterFetchOutcome::Ok(_))); + assert_eq!(fetch_n, 3); + assert_eq!(recircuits, 2); + } + + #[tokio::test] + async fn recircuit_loop_transient_exhausts_budget_returns_persistent() { + let mut recircuits = 0u32; + let mut fetch_n = 0u32; + let outcome = fetch_chapter_html_with_recircuit( + || { + fetch_n += 1; + async { Ok(TRANSIENT_HTML.to_string()) } + }, + || { + recircuits += 1; + async {} + }, + 3, + "https://example/c", + ) + .await + .expect("ok-result"); + assert!(matches!(outcome, ChapterFetchOutcome::PersistentTransient)); + assert_eq!(fetch_n, 4, "budget=3 → 1 initial + 3 retries"); + assert_eq!(recircuits, 3); + } + + #[tokio::test] + async fn recircuit_loop_propagates_fetch_errors() { + let mut fetch_n = 0u32; + let err = fetch_chapter_html_with_recircuit( + || { + fetch_n += 1; + async { Err(anyhow::anyhow!("nav timeout")) } + }, + || async {}, + 3, + "https://example/c", + ) + .await + .expect_err("fetch error bubbles"); + assert_eq!(fetch_n, 1); + assert!(format!("{err:#}").contains("nav timeout")); + } } diff --git a/backend/src/crawler/session.rs b/backend/src/crawler/session.rs index 060b4ae..39d3835 100644 --- a/backend/src/crawler/session.rs +++ b/backend/src/crawler/session.rs @@ -162,37 +162,117 @@ const PROBE_RETRY_DELAY: Duration = Duration::from_secs(2); /// limiter. The trade is worth it — failing here costs ~1s; failing 30 /// minutes into a backfill costs 30 minutes. pub async fn verify_session(browser: &Browser, probe_url: &str) -> anyhow::Result<()> { - let mut attempt = 0u32; + verify_session_with_recircuit(browser, probe_url, None, 0).await +} + +/// Like [`verify_session`] but, when `tor` is `Some`, signals +/// `SIGNAL NEWNYM` between retries on transient pages AND treats +/// `Unauthenticated` as a recoverable failure (up to +/// `unauth_max_recircuit` recircuit cycles before giving up). The bare +/// `verify_session` is `verify_session_with_recircuit(..., None, 0)`. +/// +/// When `tor` is `None`, `unauth_max_recircuit` is ignored — `Unauth` +/// stays a hard fail, matching the original behavior. +pub async fn verify_session_with_recircuit( + browser: &Browser, + probe_url: &str, + tor: Option<&crate::crawler::tor::TorController>, + unauth_max_recircuit: u32, +) -> anyhow::Result<()> { + let effective_unauth_budget = if tor.is_some() { unauth_max_recircuit } else { 0 }; + run_session_probe_loop( + || fetch_probe_html(browser, probe_url), + || async { + if let Some(t) = tor { + if let Err(e) = t.new_identity().await { + tracing::warn!(error = %e, "TOR NEWNYM failed; continuing with same circuit"); + } + } + }, + PROBE_MAX_ATTEMPTS, + effective_unauth_budget, + PROBE_RETRY_DELAY, + probe_url, + ) + .await +} + +/// Pure-over-IO loop body for the session probe. Generic over the +/// fetch and recircuit closures so it can be unit-tested without a +/// real browser or TOR daemon. +/// +/// Semantics: +/// - `SessionProbe::Ok` → return `Ok(())`. +/// - `SessionProbe::Unauthenticated` → if `unauth_max_recircuit > 0` +/// and budget remaining, call `recircuit` + sleep + retry. Otherwise +/// bail with the "PHPSESSID expired" diagnostic, mentioning the +/// recircuit count so a TOR-misconfig diagnosis is easier. +/// - `SessionProbe::Transient` → up to `transient_max_attempts` total +/// tries, calling `recircuit` between each. After the cap, bail with +/// the "site down or rate-limiting" diagnostic. +async fn run_session_probe_loop( + mut fetch_html: F, + mut recircuit: R, + transient_max_attempts: u32, + unauth_max_recircuit: u32, + retry_delay: Duration, + probe_url_for_msg: &str, +) -> anyhow::Result<()> +where + F: FnMut() -> Fut, + Fut: std::future::Future>, + R: FnMut() -> RFut, + RFut: std::future::Future, +{ + let mut transient_attempts = 0u32; + let mut unauth_recircuits = 0u32; loop { - attempt += 1; - let html = fetch_probe_html(browser, probe_url).await?; + let html = fetch_html().await?; match classify_probe(&html) { SessionProbe::Ok => { - tracing::info!(attempt, "session probe ok — #logo + #avatar_menu present"); + tracing::info!( + transient_attempts, + unauth_recircuits, + "session probe ok — #logo + #avatar_menu present" + ); return Ok(()); } SessionProbe::Unauthenticated => { + if unauth_recircuits < unauth_max_recircuit { + unauth_recircuits += 1; + tracing::warn!( + attempt = unauth_recircuits, + max = unauth_max_recircuit, + "session probe Unauthenticated despite PHPSESSID; signaling TOR \ + NEWNYM and retrying" + ); + recircuit().await; + tokio::time::sleep(retry_delay).await; + continue; + } return Err(anyhow!( - "session probe failed — #avatar_menu not present at {probe_url} \ - (page rendered the normal layout); PHPSESSID is missing, expired, \ - or revoked. Refresh CRAWLER_PHPSESSID and re-run." + "session probe failed — #avatar_menu not present at {probe_url_for_msg} \ + after {unauth_recircuits} TOR recircuit(s); PHPSESSID is missing, \ + expired, or revoked. Refresh CRAWLER_PHPSESSID and re-run." )); } - SessionProbe::Transient if attempt < PROBE_MAX_ATTEMPTS => { - tracing::warn!( - attempt, - max_attempts = PROBE_MAX_ATTEMPTS, - "session probe got a transient page; retrying" - ); - tokio::time::sleep(PROBE_RETRY_DELAY).await; - } SessionProbe::Transient => { - return Err(anyhow!( - "session probe failed — probe page at {probe_url} returned a \ - broken-page response after {PROBE_MAX_ATTEMPTS} attempts. \ - The site appears to be down or rate-limiting us; try again \ - later before refreshing CRAWLER_PHPSESSID." - )); + transient_attempts += 1; + if transient_attempts >= transient_max_attempts { + return Err(anyhow!( + "session probe failed — probe page at {probe_url_for_msg} returned \ + a broken-page response after {transient_max_attempts} attempts. \ + The site appears to be down or rate-limiting us; try again \ + later before refreshing CRAWLER_PHPSESSID." + )); + } + tracing::warn!( + attempt = transient_attempts, + max_attempts = transient_max_attempts, + "session probe got a transient page; recircuit + retry" + ); + recircuit().await; + tokio::time::sleep(retry_delay).await; } } } @@ -336,6 +416,202 @@ mod tests { assert_eq!(classify_chapter_probe(html), ChapterProbe::Ok); } + // --- run_session_probe_loop ----------------------------------------- + // + // These tests exercise the recircuit-aware loop without a real + // browser. The fetch and recircuit closures are mocked over Vecs of + // canned outcomes / counters. + + const OK_HTML: &str = r#"
"#; + const UNAUTH_HTML: &str = r#""#; + const TRANSIENT_HTML: &str = "

we're sorry, the request file are not found.

"; + + #[tokio::test] + async fn probe_loop_ok_on_first_attempt_does_not_recircuit() { + let mut recircuits = 0u32; + let mut fetched = 0u32; + run_session_probe_loop( + || { + fetched += 1; + async { Ok(OK_HTML.to_string()) } + }, + || { + recircuits += 1; + async {} + }, + 3, + 3, + Duration::from_millis(0), + "https://example/probe", + ) + .await + .expect("ok on first attempt"); + assert_eq!(fetched, 1); + assert_eq!(recircuits, 0); + } + + #[tokio::test] + async fn probe_loop_unauth_then_ok_when_recircuit_budget_available() { + let mut recircuits = 0u32; + let mut call = 0u32; + run_session_probe_loop( + || { + call += 1; + let n = call; + async move { + if n == 1 { + Ok(UNAUTH_HTML.to_string()) + } else { + Ok(OK_HTML.to_string()) + } + } + }, + || { + recircuits += 1; + async {} + }, + 3, + 3, + Duration::from_millis(0), + "https://example/probe", + ) + .await + .expect("recovers after one recircuit"); + assert_eq!(call, 2); + assert_eq!(recircuits, 1); + } + + #[tokio::test] + async fn probe_loop_unauth_with_zero_recircuit_budget_fails_fast() { + let mut recircuits = 0u32; + let mut call = 0u32; + let err = run_session_probe_loop( + || { + call += 1; + async { Ok(UNAUTH_HTML.to_string()) } + }, + || { + recircuits += 1; + async {} + }, + 3, + 0, + Duration::from_millis(0), + "https://example/probe", + ) + .await + .expect_err("zero budget → fail"); + assert_eq!(call, 1, "no retry when budget is 0"); + assert_eq!(recircuits, 0); + let msg = format!("{err:#}"); + assert!(msg.contains("Refresh CRAWLER_PHPSESSID"), "msg: {msg}"); + } + + #[tokio::test] + async fn probe_loop_unauth_after_exhausting_budget_emits_recircuit_count() { + let mut recircuits = 0u32; + let mut call = 0u32; + let err = run_session_probe_loop( + || { + call += 1; + async { Ok(UNAUTH_HTML.to_string()) } + }, + || { + recircuits += 1; + async {} + }, + 10, // transient budget irrelevant here + 2, + Duration::from_millis(0), + "https://example/probe", + ) + .await + .expect_err("exhausts unauth budget"); + // 3 fetches total: initial + 2 recircuit-and-retry + assert_eq!(call, 3); + assert_eq!(recircuits, 2); + let msg = format!("{err:#}"); + assert!(msg.contains("2 TOR recircuit"), "expected recircuit count in error, got: {msg}"); + } + + #[tokio::test] + async fn probe_loop_transient_repeats_until_max_then_errors() { + let mut recircuits = 0u32; + let mut call = 0u32; + let err = run_session_probe_loop( + || { + call += 1; + async { Ok(TRANSIENT_HTML.to_string()) } + }, + || { + recircuits += 1; + async {} + }, + 3, + 0, + Duration::from_millis(0), + "https://example/probe", + ) + .await + .expect_err("transient until max → fail"); + assert_eq!(call, 3); + // recircuit fires between attempts: 3 attempts → 2 recircuits. + assert_eq!(recircuits, 2); + let msg = format!("{err:#}"); + assert!(msg.contains("broken-page response after 3 attempts"), "msg: {msg}"); + } + + #[tokio::test] + async fn probe_loop_transient_then_ok_returns_ok_after_one_recircuit() { + let mut recircuits = 0u32; + let mut call = 0u32; + run_session_probe_loop( + || { + call += 1; + let n = call; + async move { + if n == 1 { + Ok(TRANSIENT_HTML.to_string()) + } else { + Ok(OK_HTML.to_string()) + } + } + }, + || { + recircuits += 1; + async {} + }, + 3, + 0, + Duration::from_millis(0), + "https://example/probe", + ) + .await + .expect("ok on second try"); + assert_eq!(call, 2); + assert_eq!(recircuits, 1); + } + + #[tokio::test] + async fn probe_loop_propagates_fetch_errors_immediately() { + let mut call = 0u32; + let err = run_session_probe_loop( + || { + call += 1; + async { Err(anyhow!("nav timeout")) } + }, + || async {}, + 5, + 5, + Duration::from_millis(0), + "https://example/probe", + ) + .await + .expect_err("fetch error bubbles"); + assert_eq!(call, 1); + assert!(format!("{err:#}").contains("nav timeout")); + } + #[test] fn classify_probe_trusts_broken_body_over_stray_avatar_match() { // Defensive: if a broken-page body somehow contains an diff --git a/backend/src/crawler/source/target.rs b/backend/src/crawler/source/target.rs index 1be0a38..eb2e888 100644 --- a/backend/src/crawler/source/target.rs +++ b/backend/src/crawler/source/target.rs @@ -18,7 +18,7 @@ use super::{ SourceMangaRef, }; use crate::crawler::detect::{ - has_logo_sentinel, is_broken_page_body, retry_on_transient, PageError, + has_logo_sentinel, is_broken_page_body, retry_on_transient_with_hook, PageError, }; use crate::crawler::nav::{wait_for_nav, wait_for_selector, NavError, SELECTOR_TIMEOUT}; @@ -79,12 +79,13 @@ impl Source for TargetSource { // and the HTML is handed straight to the first `next_batch` call // so the walker doesn't re-fetch it. Page count is discovered // incrementally — see `TargetSourceWalker::next_batch`. - let first_html = retry_on_transient( + let first_html = retry_on_transient_with_hook( || async { navigate(ctx, self.base_url.as_str(), LIST_PAGE_MARKER).await }, PAGE_TRANSIENT_RETRY_ATTEMPTS, PAGE_TRANSIENT_RETRY_DELAY, + || async { recircuit_if_configured(ctx.tor).await }, ) .await?; @@ -169,7 +170,7 @@ impl DiscoverWalk for TargetSourceWalker { parse_manga_list_from(&doc)? } None => { - retry_on_transient( + retry_on_transient_with_hook( || async { let html = navigate( ctx, @@ -182,12 +183,13 @@ impl DiscoverWalk for TargetSourceWalker { }, PAGE_TRANSIENT_RETRY_ATTEMPTS, PAGE_TRANSIENT_RETRY_DELAY, + || async { recircuit_if_configured(ctx.tor).await }, ) .await? } } } else { - retry_on_transient( + retry_on_transient_with_hook( || async { let url = page_url(&self.base_url, page_num); let html = navigate(ctx, &url, LIST_PAGE_MARKER).await?; @@ -196,6 +198,7 @@ impl DiscoverWalk for TargetSourceWalker { }, PAGE_TRANSIENT_RETRY_ATTEMPTS, PAGE_TRANSIENT_RETRY_DELAY, + || async { recircuit_if_configured(ctx.tor).await }, ) .await? }; @@ -274,6 +277,20 @@ fn classify_navigate_html(html: String) -> Result { Ok(html) } +/// Hook for [`retry_on_transient_with_hook`]: when TOR is configured, +/// signal `NEWNYM` so the next navigation draws a fresh exit. Errors +/// from the controller are logged and swallowed — failing to recircuit +/// shouldn't take down the crawl, the next attempt just runs on the +/// same circuit as before. +async fn recircuit_if_configured(tor: Option<&crate::crawler::tor::TorController>) { + if let Some(t) = tor { + if let Err(e) = t.new_identity().await { + tracing::warn!(error = %e, "TOR NEWNYM failed; retrying on same circuit"); + } + } +} + + /// Substitutes the first `/N/` path segment with the target page /// number. Source impls that paginate via a different URL shape can /// override this — for the modeled site the segment is always present.