feat(crawler): recircuit TOR on transient pages and unauthenticated probes
- target.rs swaps retry_on_transient → retry_on_transient_with_hook, signaling NEWNYM via ctx.tor between attempts when configured. - session.rs gains verify_session_with_recircuit; the bare verify_session is now a one-line wrapper passing tor=None, unauth_max_recircuit=0. The inner run_session_probe_loop is pure-over-IO and unit-tested with closure-based fakes. - content.rs extracts fetch_chapter_html_once + the closure-driven fetch_chapter_html_with_recircuit, used by sync_chapter_content to retry on Transient or Unauthenticated up to a recircuit_budget. Budget = 0 (no TOR) preserves original behavior bit-for-bit. - app.rs and bin/crawler.rs construct the controller before on_launch and pass it into verify_session_with_recircuit, so a transient hiccup at startup no longer requires PHPSESSID rotation. Recircuit budget defaults to CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS (3). Errors from NEWNYM are logged and swallowed — failing to recircuit should not take down the crawl. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -123,6 +123,18 @@ async fn spawn_crawler_daemon(
|
||||
}
|
||||
let rate = Arc::new(rate);
|
||||
|
||||
let tor = crate::crawler::tor::TorController::from_parts(
|
||||
cfg.tor_control_url.as_deref(),
|
||||
cfg.tor_control_password.as_deref(),
|
||||
cfg.tor_control_cookie_path.as_deref(),
|
||||
)
|
||||
.context("build TorController from CRAWLER_TOR_CONTROL_* env")?
|
||||
.map(Arc::new);
|
||||
if let Some(t) = &tor {
|
||||
tracing::info!(?t, "TOR control configured; transient pages will trigger NEWNYM");
|
||||
}
|
||||
let tor_recircuit_max = cfg.tor_recircuit_max_attempts;
|
||||
|
||||
// Browser manager. on_launch re-injects PHPSESSID on every fresh
|
||||
// chromium spawn so an idle teardown followed by re-launch stays
|
||||
// authenticated without operator action.
|
||||
@@ -135,17 +147,24 @@ async fn spawn_crawler_daemon(
|
||||
let sid = sid.clone();
|
||||
let domain = domain.clone();
|
||||
let start_url = start_url.clone();
|
||||
let tor_for_launch = tor.as_ref().map(Arc::clone);
|
||||
let on_launch: browser_manager::OnLaunch = Arc::new(move |browser| {
|
||||
let sid = sid.clone();
|
||||
let domain = domain.clone();
|
||||
let start_url = start_url.clone();
|
||||
let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone);
|
||||
Box::pin(async move {
|
||||
session::inject_phpsessid(&browser, &sid, &domain)
|
||||
.await
|
||||
.context("on_launch: inject_phpsessid")?;
|
||||
session::verify_session(&browser, &start_url)
|
||||
.await
|
||||
.context("on_launch: verify_session")?;
|
||||
session::verify_session_with_recircuit(
|
||||
&browser,
|
||||
&start_url,
|
||||
tor_for_launch.as_deref(),
|
||||
tor_recircuit_max,
|
||||
)
|
||||
.await
|
||||
.context("on_launch: verify_session")?;
|
||||
Ok(())
|
||||
})
|
||||
});
|
||||
@@ -157,17 +176,6 @@ async fn spawn_crawler_daemon(
|
||||
|
||||
let session_expired = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let tor = crate::crawler::tor::TorController::from_parts(
|
||||
cfg.tor_control_url.as_deref(),
|
||||
cfg.tor_control_password.as_deref(),
|
||||
cfg.tor_control_cookie_path.as_deref(),
|
||||
)
|
||||
.context("build TorController from CRAWLER_TOR_CONTROL_* env")?
|
||||
.map(Arc::new);
|
||||
if let Some(t) = &tor {
|
||||
tracing::info!(?t, "TOR control configured; transient pages will trigger NEWNYM");
|
||||
}
|
||||
|
||||
let metadata_pass: Option<Arc<dyn MetadataPass>> = cfg.start_url.as_ref().map(|url| {
|
||||
let m: Arc<dyn MetadataPass> = Arc::new(RealMetadataPass {
|
||||
browser_manager: Arc::clone(&browser_manager),
|
||||
|
||||
@@ -88,6 +88,11 @@ async fn main() -> anyhow::Result<()> {
|
||||
.ok()
|
||||
.filter(|s| !s.trim().is_empty())
|
||||
.map(std::path::PathBuf::from);
|
||||
let tor_recircuit_max_attempts: u32 = std::env::var("CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(3)
|
||||
.max(1);
|
||||
let keep_browser_open = env_bool("CRAWLER_KEEP_BROWSER_OPEN", false);
|
||||
|
||||
let db = PgPoolOptions::new()
|
||||
@@ -154,35 +159,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
"starting crawler"
|
||||
);
|
||||
|
||||
// BrowserManager with idle_timeout = ZERO so the CLI keeps Chromium
|
||||
// alive for the entire run — same lifecycle as the old direct
|
||||
// `browser::launch()` flow. on_launch re-injects PHPSESSID + runs the
|
||||
// session probe; bad cookies fail fast before any real work happens.
|
||||
let on_launch: browser_manager::OnLaunch = match (&phpsessid, &cookie_domain) {
|
||||
(Some(sid), Some(domain)) => {
|
||||
let sid = sid.clone();
|
||||
let domain = domain.clone();
|
||||
let start_url_clone = start_url.clone();
|
||||
Arc::new(move |browser| {
|
||||
let sid = sid.clone();
|
||||
let domain = domain.clone();
|
||||
let start_url = start_url_clone.clone();
|
||||
Box::pin(async move {
|
||||
session::inject_phpsessid(&browser, &sid, &domain)
|
||||
.await
|
||||
.context("inject_phpsessid")?;
|
||||
session::verify_session(&browser, &start_url)
|
||||
.await
|
||||
.context("verify_session")?;
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
}
|
||||
_ => browser_manager::noop_on_launch(),
|
||||
};
|
||||
let session_ready = phpsessid.is_some() && cookie_domain.is_some();
|
||||
let manager = BrowserManager::new(options, Duration::ZERO, on_launch);
|
||||
|
||||
let tor = mangalord::crawler::tor::TorController::from_parts(
|
||||
tor_control_url.as_deref(),
|
||||
tor_control_password.as_deref(),
|
||||
@@ -194,6 +170,42 @@ async fn main() -> anyhow::Result<()> {
|
||||
tracing::info!(?t, "TOR control configured");
|
||||
}
|
||||
|
||||
// BrowserManager with idle_timeout = ZERO so the CLI keeps Chromium
|
||||
// alive for the entire run — same lifecycle as the old direct
|
||||
// `browser::launch()` flow. on_launch re-injects PHPSESSID + runs the
|
||||
// session probe; bad cookies fail fast before any real work happens.
|
||||
let on_launch: browser_manager::OnLaunch = match (&phpsessid, &cookie_domain) {
|
||||
(Some(sid), Some(domain)) => {
|
||||
let sid = sid.clone();
|
||||
let domain = domain.clone();
|
||||
let start_url_clone = start_url.clone();
|
||||
let tor_for_launch = tor.as_ref().map(Arc::clone);
|
||||
Arc::new(move |browser| {
|
||||
let sid = sid.clone();
|
||||
let domain = domain.clone();
|
||||
let start_url = start_url_clone.clone();
|
||||
let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone);
|
||||
Box::pin(async move {
|
||||
session::inject_phpsessid(&browser, &sid, &domain)
|
||||
.await
|
||||
.context("inject_phpsessid")?;
|
||||
session::verify_session_with_recircuit(
|
||||
&browser,
|
||||
&start_url,
|
||||
tor_for_launch.as_deref(),
|
||||
tor_recircuit_max_attempts,
|
||||
)
|
||||
.await
|
||||
.context("verify_session")?;
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
}
|
||||
_ => browser_manager::noop_on_launch(),
|
||||
};
|
||||
let session_ready = phpsessid.is_some() && cookie_domain.is_some();
|
||||
let manager = BrowserManager::new(options, Duration::ZERO, on_launch);
|
||||
|
||||
let result = run(
|
||||
Arc::clone(&manager),
|
||||
&db,
|
||||
|
||||
@@ -73,40 +73,35 @@ pub enum SyncOutcome {
|
||||
SessionExpired,
|
||||
}
|
||||
|
||||
/// Fetch all images for one chapter and persist them atomically. On
|
||||
/// any error after the first storage put, the DB transaction rolls
|
||||
/// back so the chapter stays at `page_count = 0` and is retried on the
|
||||
/// next run. Bytes already written to storage become orphans; a future
|
||||
/// reaper sweeps them.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn sync_chapter_content(
|
||||
browser: &chromiumoxide::Browser,
|
||||
db: &PgPool,
|
||||
storage: &dyn Storage,
|
||||
http: &reqwest::Client,
|
||||
rate: &HostRateLimiters,
|
||||
chapter_id: Uuid,
|
||||
manga_id: Uuid,
|
||||
source_url: &str,
|
||||
force_refetch: bool,
|
||||
allowlist: &DownloadAllowlist,
|
||||
max_image_bytes: usize,
|
||||
_tor: Option<&crate::crawler::tor::TorController>,
|
||||
) -> anyhow::Result<SyncOutcome> {
|
||||
// Skip if already fetched, unless caller explicitly forces.
|
||||
if !force_refetch {
|
||||
let (page_count,): (i32,) =
|
||||
sqlx::query_as("SELECT page_count FROM chapters WHERE id = $1")
|
||||
.bind(chapter_id)
|
||||
.fetch_one(db)
|
||||
.await
|
||||
.context("read chapter page_count")?;
|
||||
if page_count > 0 {
|
||||
return Ok(SyncOutcome::Skipped);
|
||||
}
|
||||
}
|
||||
/// Per-chapter recircuit budget for both transient pages and
|
||||
/// `Unauthenticated` outcomes. When TOR is not configured the budget
|
||||
/// is effectively 0 (no recircuit attempted; original behavior).
|
||||
const CHAPTER_RECIRCUIT_MAX_ATTEMPTS: u32 = 3;
|
||||
|
||||
// Nav to chapter page (rate-limited per host).
|
||||
/// Outcome of [`fetch_chapter_html_with_recircuit`]. `Ok` carries the
|
||||
/// final reader HTML; the other two map to `sync_chapter_content`'s
|
||||
/// existing failure modes.
|
||||
#[derive(Debug)]
|
||||
enum ChapterFetchOutcome {
|
||||
Ok(String),
|
||||
/// `ChapterProbe::Unauthenticated` after exhausting recircuit
|
||||
/// budget (or with budget=0). Caller returns
|
||||
/// `SyncOutcome::SessionExpired`.
|
||||
SessionExpired,
|
||||
/// `ChapterProbe::Transient` after exhausting recircuit budget
|
||||
/// (or with budget=0). Caller bails so the dispatcher does
|
||||
/// exponential backoff.
|
||||
PersistentTransient,
|
||||
}
|
||||
|
||||
/// Single rate-limited Chromium navigation to the chapter URL,
|
||||
/// returning the page HTML. Extracted from `sync_chapter_content` so
|
||||
/// the recircuit loop can call it once per attempt.
|
||||
async fn fetch_chapter_html_once(
|
||||
browser: &chromiumoxide::Browser,
|
||||
rate: &HostRateLimiters,
|
||||
source_url: &str,
|
||||
) -> anyhow::Result<String> {
|
||||
rate.wait_for(source_url).await?;
|
||||
let page = browser
|
||||
.new_page(source_url)
|
||||
@@ -125,28 +120,128 @@ pub async fn sync_chapter_content(
|
||||
crate::crawler::nav::SELECTOR_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
|
||||
let html = page.content().await.context("read chapter html")?;
|
||||
page.close().await.ok();
|
||||
Ok(html)
|
||||
}
|
||||
|
||||
// Three-way session classification: distinguishes a transient
|
||||
// hiccup (broken-page body or logged-in-but-no-reader) from a
|
||||
// genuine PHPSESSID expiry (no reader and no avatar widget). The
|
||||
// earlier binary `#avatar_menu` check conflated both and froze
|
||||
// every worker on a layout shift.
|
||||
match session::classify_chapter_probe(&html) {
|
||||
ChapterProbe::Unauthenticated => return Ok(SyncOutcome::SessionExpired),
|
||||
ChapterProbe::Transient => {
|
||||
/// Pure-over-IO loop: fetch + classify, with up to `recircuit_budget`
|
||||
/// NEWNYM-and-retry cycles after a `Transient` or `Unauthenticated`
|
||||
/// outcome. `recircuit_budget = 0` collapses to the original
|
||||
/// single-shot behavior — `Unauthenticated` → `SessionExpired`,
|
||||
/// `Transient` → `PersistentTransient` on the first hit, no recircuit.
|
||||
async fn fetch_chapter_html_with_recircuit<F, Fut, R, RFut>(
|
||||
mut fetch: F,
|
||||
mut recircuit: R,
|
||||
recircuit_budget: u32,
|
||||
source_url_for_msg: &str,
|
||||
) -> anyhow::Result<ChapterFetchOutcome>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: std::future::Future<Output = anyhow::Result<String>>,
|
||||
R: FnMut() -> RFut,
|
||||
RFut: std::future::Future<Output = ()>,
|
||||
{
|
||||
let mut recircuits = 0u32;
|
||||
loop {
|
||||
let html = fetch().await?;
|
||||
match session::classify_chapter_probe(&html) {
|
||||
ChapterProbe::Ok => return Ok(ChapterFetchOutcome::Ok(html)),
|
||||
ChapterProbe::Unauthenticated => {
|
||||
if recircuits < recircuit_budget {
|
||||
recircuits += 1;
|
||||
tracing::warn!(
|
||||
attempt = recircuits,
|
||||
max = recircuit_budget,
|
||||
url = source_url_for_msg,
|
||||
"chapter probe Unauthenticated; signaling TOR NEWNYM and retrying"
|
||||
);
|
||||
recircuit().await;
|
||||
continue;
|
||||
}
|
||||
return Ok(ChapterFetchOutcome::SessionExpired);
|
||||
}
|
||||
ChapterProbe::Transient => {
|
||||
if recircuits < recircuit_budget {
|
||||
recircuits += 1;
|
||||
tracing::warn!(
|
||||
attempt = recircuits,
|
||||
max = recircuit_budget,
|
||||
url = source_url_for_msg,
|
||||
"chapter probe Transient; signaling TOR NEWNYM and retrying"
|
||||
);
|
||||
recircuit().await;
|
||||
continue;
|
||||
}
|
||||
return Ok(ChapterFetchOutcome::PersistentTransient);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch all images for one chapter and persist them atomically. On
|
||||
/// any error after the first storage put, the DB transaction rolls
|
||||
/// back so the chapter stays at `page_count = 0` and is retried on the
|
||||
/// next run. Bytes already written to storage become orphans; a future
|
||||
/// reaper sweeps them.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn sync_chapter_content(
|
||||
browser: &chromiumoxide::Browser,
|
||||
db: &PgPool,
|
||||
storage: &dyn Storage,
|
||||
http: &reqwest::Client,
|
||||
rate: &HostRateLimiters,
|
||||
chapter_id: Uuid,
|
||||
manga_id: Uuid,
|
||||
source_url: &str,
|
||||
force_refetch: bool,
|
||||
allowlist: &DownloadAllowlist,
|
||||
max_image_bytes: usize,
|
||||
tor: Option<&crate::crawler::tor::TorController>,
|
||||
) -> anyhow::Result<SyncOutcome> {
|
||||
// Skip if already fetched, unless caller explicitly forces.
|
||||
if !force_refetch {
|
||||
let (page_count,): (i32,) =
|
||||
sqlx::query_as("SELECT page_count FROM chapters WHERE id = $1")
|
||||
.bind(chapter_id)
|
||||
.fetch_one(db)
|
||||
.await
|
||||
.context("read chapter page_count")?;
|
||||
if page_count > 0 {
|
||||
return Ok(SyncOutcome::Skipped);
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch + classify with a recircuit budget when TOR is configured.
|
||||
// Without TOR the closure-recircuit is a no-op and the loop reduces
|
||||
// to the original single-attempt behavior.
|
||||
let recircuit_budget = if tor.is_some() { CHAPTER_RECIRCUIT_MAX_ATTEMPTS } else { 0 };
|
||||
let html = match fetch_chapter_html_with_recircuit(
|
||||
|| fetch_chapter_html_once(browser, rate, source_url),
|
||||
|| async {
|
||||
if let Some(t) = tor {
|
||||
if let Err(e) = t.new_identity().await {
|
||||
tracing::warn!(error = %e, "TOR NEWNYM failed; continuing with same circuit");
|
||||
}
|
||||
}
|
||||
},
|
||||
recircuit_budget,
|
||||
source_url,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
ChapterFetchOutcome::Ok(html) => html,
|
||||
ChapterFetchOutcome::SessionExpired => return Ok(SyncOutcome::SessionExpired),
|
||||
ChapterFetchOutcome::PersistentTransient => {
|
||||
// Surface as a typed Err so the dispatcher path runs
|
||||
// ack_failed with exponential backoff (rather than the
|
||||
// session-expired sticky flag).
|
||||
anyhow::bail!(
|
||||
"chapter page at {source_url} returned a transient response \
|
||||
(broken-page body or reader didn't render); will retry"
|
||||
"chapter page at {source_url} returned a transient response after \
|
||||
{recircuit_budget} TOR recircuit(s); will retry"
|
||||
);
|
||||
}
|
||||
ChapterProbe::Ok => {}
|
||||
}
|
||||
};
|
||||
|
||||
let images = parse_chapter_pages(&html)
|
||||
.with_context(|| format!("parse chapter pages at {source_url}"))?;
|
||||
@@ -305,4 +400,181 @@ mod tests {
|
||||
let err = parse_chapter_pages(html).expect_err("expected Transient");
|
||||
assert!(err.is_transient(), "got non-transient: {err}");
|
||||
}
|
||||
|
||||
// --- fetch_chapter_html_with_recircuit -------------------------------
|
||||
|
||||
const OK_HTML: &str = r#"<html><body><a id="pic_container"><img id="page1" src="x"/></a></body></html>"#;
|
||||
const UNAUTH_HTML: &str = r#"<html><body><header><div id="logo">x</div></header><main>please log in</main></body></html>"#;
|
||||
const TRANSIENT_HTML: &str = "<html><body><p>we're sorry, the request file are not found.</p></body></html>";
|
||||
|
||||
#[tokio::test]
|
||||
async fn recircuit_loop_ok_first_attempt() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut fetches = 0u32;
|
||||
let outcome = fetch_chapter_html_with_recircuit(
|
||||
|| {
|
||||
fetches += 1;
|
||||
async { Ok(OK_HTML.to_string()) }
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
3,
|
||||
"https://example/c",
|
||||
)
|
||||
.await
|
||||
.expect("ok");
|
||||
assert!(matches!(outcome, ChapterFetchOutcome::Ok(_)));
|
||||
assert_eq!(fetches, 1);
|
||||
assert_eq!(recircuits, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recircuit_loop_unauth_with_zero_budget_returns_session_expired() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut fetches = 0u32;
|
||||
let outcome = fetch_chapter_html_with_recircuit(
|
||||
|| {
|
||||
fetches += 1;
|
||||
async { Ok(UNAUTH_HTML.to_string()) }
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
0,
|
||||
"https://example/c",
|
||||
)
|
||||
.await
|
||||
.expect("ok-result");
|
||||
assert!(matches!(outcome, ChapterFetchOutcome::SessionExpired));
|
||||
assert_eq!(fetches, 1);
|
||||
assert_eq!(recircuits, 0, "no recircuit when budget is 0 (TOR disabled)");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recircuit_loop_unauth_then_ok_within_budget() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut fetch_n = 0u32;
|
||||
let outcome = fetch_chapter_html_with_recircuit(
|
||||
|| {
|
||||
fetch_n += 1;
|
||||
let n = fetch_n;
|
||||
async move {
|
||||
if n == 1 {
|
||||
Ok(UNAUTH_HTML.to_string())
|
||||
} else {
|
||||
Ok(OK_HTML.to_string())
|
||||
}
|
||||
}
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
3,
|
||||
"https://example/c",
|
||||
)
|
||||
.await
|
||||
.expect("ok");
|
||||
assert!(matches!(outcome, ChapterFetchOutcome::Ok(_)));
|
||||
assert_eq!(fetch_n, 2);
|
||||
assert_eq!(recircuits, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recircuit_loop_unauth_exhausts_budget_returns_session_expired() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut fetch_n = 0u32;
|
||||
let outcome = fetch_chapter_html_with_recircuit(
|
||||
|| {
|
||||
fetch_n += 1;
|
||||
async { Ok(UNAUTH_HTML.to_string()) }
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
2,
|
||||
"https://example/c",
|
||||
)
|
||||
.await
|
||||
.expect("ok-result");
|
||||
assert!(matches!(outcome, ChapterFetchOutcome::SessionExpired));
|
||||
// budget=2 → initial + 2 recircuit-and-retry = 3 fetches.
|
||||
assert_eq!(fetch_n, 3);
|
||||
assert_eq!(recircuits, 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recircuit_loop_transient_then_ok_within_budget() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut fetch_n = 0u32;
|
||||
let outcome = fetch_chapter_html_with_recircuit(
|
||||
|| {
|
||||
fetch_n += 1;
|
||||
let n = fetch_n;
|
||||
async move {
|
||||
if n < 3 {
|
||||
Ok(TRANSIENT_HTML.to_string())
|
||||
} else {
|
||||
Ok(OK_HTML.to_string())
|
||||
}
|
||||
}
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
3,
|
||||
"https://example/c",
|
||||
)
|
||||
.await
|
||||
.expect("ok");
|
||||
assert!(matches!(outcome, ChapterFetchOutcome::Ok(_)));
|
||||
assert_eq!(fetch_n, 3);
|
||||
assert_eq!(recircuits, 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recircuit_loop_transient_exhausts_budget_returns_persistent() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut fetch_n = 0u32;
|
||||
let outcome = fetch_chapter_html_with_recircuit(
|
||||
|| {
|
||||
fetch_n += 1;
|
||||
async { Ok(TRANSIENT_HTML.to_string()) }
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
3,
|
||||
"https://example/c",
|
||||
)
|
||||
.await
|
||||
.expect("ok-result");
|
||||
assert!(matches!(outcome, ChapterFetchOutcome::PersistentTransient));
|
||||
assert_eq!(fetch_n, 4, "budget=3 → 1 initial + 3 retries");
|
||||
assert_eq!(recircuits, 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recircuit_loop_propagates_fetch_errors() {
|
||||
let mut fetch_n = 0u32;
|
||||
let err = fetch_chapter_html_with_recircuit(
|
||||
|| {
|
||||
fetch_n += 1;
|
||||
async { Err(anyhow::anyhow!("nav timeout")) }
|
||||
},
|
||||
|| async {},
|
||||
3,
|
||||
"https://example/c",
|
||||
)
|
||||
.await
|
||||
.expect_err("fetch error bubbles");
|
||||
assert_eq!(fetch_n, 1);
|
||||
assert!(format!("{err:#}").contains("nav timeout"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,37 +162,117 @@ const PROBE_RETRY_DELAY: Duration = Duration::from_secs(2);
|
||||
/// limiter. The trade is worth it — failing here costs ~1s; failing 30
|
||||
/// minutes into a backfill costs 30 minutes.
|
||||
pub async fn verify_session(browser: &Browser, probe_url: &str) -> anyhow::Result<()> {
|
||||
let mut attempt = 0u32;
|
||||
verify_session_with_recircuit(browser, probe_url, None, 0).await
|
||||
}
|
||||
|
||||
/// Like [`verify_session`] but, when `tor` is `Some`, signals
|
||||
/// `SIGNAL NEWNYM` between retries on transient pages AND treats
|
||||
/// `Unauthenticated` as a recoverable failure (up to
|
||||
/// `unauth_max_recircuit` recircuit cycles before giving up). The bare
|
||||
/// `verify_session` is `verify_session_with_recircuit(..., None, 0)`.
|
||||
///
|
||||
/// When `tor` is `None`, `unauth_max_recircuit` is ignored — `Unauth`
|
||||
/// stays a hard fail, matching the original behavior.
|
||||
pub async fn verify_session_with_recircuit(
|
||||
browser: &Browser,
|
||||
probe_url: &str,
|
||||
tor: Option<&crate::crawler::tor::TorController>,
|
||||
unauth_max_recircuit: u32,
|
||||
) -> anyhow::Result<()> {
|
||||
let effective_unauth_budget = if tor.is_some() { unauth_max_recircuit } else { 0 };
|
||||
run_session_probe_loop(
|
||||
|| fetch_probe_html(browser, probe_url),
|
||||
|| async {
|
||||
if let Some(t) = tor {
|
||||
if let Err(e) = t.new_identity().await {
|
||||
tracing::warn!(error = %e, "TOR NEWNYM failed; continuing with same circuit");
|
||||
}
|
||||
}
|
||||
},
|
||||
PROBE_MAX_ATTEMPTS,
|
||||
effective_unauth_budget,
|
||||
PROBE_RETRY_DELAY,
|
||||
probe_url,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Pure-over-IO loop body for the session probe. Generic over the
|
||||
/// fetch and recircuit closures so it can be unit-tested without a
|
||||
/// real browser or TOR daemon.
|
||||
///
|
||||
/// Semantics:
|
||||
/// - `SessionProbe::Ok` → return `Ok(())`.
|
||||
/// - `SessionProbe::Unauthenticated` → if `unauth_max_recircuit > 0`
|
||||
/// and budget remaining, call `recircuit` + sleep + retry. Otherwise
|
||||
/// bail with the "PHPSESSID expired" diagnostic, mentioning the
|
||||
/// recircuit count so a TOR-misconfig diagnosis is easier.
|
||||
/// - `SessionProbe::Transient` → up to `transient_max_attempts` total
|
||||
/// tries, calling `recircuit` between each. After the cap, bail with
|
||||
/// the "site down or rate-limiting" diagnostic.
|
||||
async fn run_session_probe_loop<F, Fut, R, RFut>(
|
||||
mut fetch_html: F,
|
||||
mut recircuit: R,
|
||||
transient_max_attempts: u32,
|
||||
unauth_max_recircuit: u32,
|
||||
retry_delay: Duration,
|
||||
probe_url_for_msg: &str,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: std::future::Future<Output = anyhow::Result<String>>,
|
||||
R: FnMut() -> RFut,
|
||||
RFut: std::future::Future<Output = ()>,
|
||||
{
|
||||
let mut transient_attempts = 0u32;
|
||||
let mut unauth_recircuits = 0u32;
|
||||
loop {
|
||||
attempt += 1;
|
||||
let html = fetch_probe_html(browser, probe_url).await?;
|
||||
let html = fetch_html().await?;
|
||||
match classify_probe(&html) {
|
||||
SessionProbe::Ok => {
|
||||
tracing::info!(attempt, "session probe ok — #logo + #avatar_menu present");
|
||||
tracing::info!(
|
||||
transient_attempts,
|
||||
unauth_recircuits,
|
||||
"session probe ok — #logo + #avatar_menu present"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
SessionProbe::Unauthenticated => {
|
||||
if unauth_recircuits < unauth_max_recircuit {
|
||||
unauth_recircuits += 1;
|
||||
tracing::warn!(
|
||||
attempt = unauth_recircuits,
|
||||
max = unauth_max_recircuit,
|
||||
"session probe Unauthenticated despite PHPSESSID; signaling TOR \
|
||||
NEWNYM and retrying"
|
||||
);
|
||||
recircuit().await;
|
||||
tokio::time::sleep(retry_delay).await;
|
||||
continue;
|
||||
}
|
||||
return Err(anyhow!(
|
||||
"session probe failed — #avatar_menu not present at {probe_url} \
|
||||
(page rendered the normal layout); PHPSESSID is missing, expired, \
|
||||
or revoked. Refresh CRAWLER_PHPSESSID and re-run."
|
||||
"session probe failed — #avatar_menu not present at {probe_url_for_msg} \
|
||||
after {unauth_recircuits} TOR recircuit(s); PHPSESSID is missing, \
|
||||
expired, or revoked. Refresh CRAWLER_PHPSESSID and re-run."
|
||||
));
|
||||
}
|
||||
SessionProbe::Transient if attempt < PROBE_MAX_ATTEMPTS => {
|
||||
tracing::warn!(
|
||||
attempt,
|
||||
max_attempts = PROBE_MAX_ATTEMPTS,
|
||||
"session probe got a transient page; retrying"
|
||||
);
|
||||
tokio::time::sleep(PROBE_RETRY_DELAY).await;
|
||||
}
|
||||
SessionProbe::Transient => {
|
||||
return Err(anyhow!(
|
||||
"session probe failed — probe page at {probe_url} returned a \
|
||||
broken-page response after {PROBE_MAX_ATTEMPTS} attempts. \
|
||||
The site appears to be down or rate-limiting us; try again \
|
||||
later before refreshing CRAWLER_PHPSESSID."
|
||||
));
|
||||
transient_attempts += 1;
|
||||
if transient_attempts >= transient_max_attempts {
|
||||
return Err(anyhow!(
|
||||
"session probe failed — probe page at {probe_url_for_msg} returned \
|
||||
a broken-page response after {transient_max_attempts} attempts. \
|
||||
The site appears to be down or rate-limiting us; try again \
|
||||
later before refreshing CRAWLER_PHPSESSID."
|
||||
));
|
||||
}
|
||||
tracing::warn!(
|
||||
attempt = transient_attempts,
|
||||
max_attempts = transient_max_attempts,
|
||||
"session probe got a transient page; recircuit + retry"
|
||||
);
|
||||
recircuit().await;
|
||||
tokio::time::sleep(retry_delay).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -336,6 +416,202 @@ mod tests {
|
||||
assert_eq!(classify_chapter_probe(html), ChapterProbe::Ok);
|
||||
}
|
||||
|
||||
// --- run_session_probe_loop -----------------------------------------
|
||||
//
|
||||
// These tests exercise the recircuit-aware loop without a real
|
||||
// browser. The fetch and recircuit closures are mocked over Vecs of
|
||||
// canned outcomes / counters.
|
||||
|
||||
const OK_HTML: &str = r#"<html><body><div id="logo"></div><div id="avatar_menu"></div></body></html>"#;
|
||||
const UNAUTH_HTML: &str = r#"<html><body><div id="logo"></div></body></html>"#;
|
||||
const TRANSIENT_HTML: &str = "<html><body><p>we're sorry, the request file are not found.</p></body></html>";
|
||||
|
||||
#[tokio::test]
|
||||
async fn probe_loop_ok_on_first_attempt_does_not_recircuit() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut fetched = 0u32;
|
||||
run_session_probe_loop(
|
||||
|| {
|
||||
fetched += 1;
|
||||
async { Ok(OK_HTML.to_string()) }
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
3,
|
||||
3,
|
||||
Duration::from_millis(0),
|
||||
"https://example/probe",
|
||||
)
|
||||
.await
|
||||
.expect("ok on first attempt");
|
||||
assert_eq!(fetched, 1);
|
||||
assert_eq!(recircuits, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn probe_loop_unauth_then_ok_when_recircuit_budget_available() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut call = 0u32;
|
||||
run_session_probe_loop(
|
||||
|| {
|
||||
call += 1;
|
||||
let n = call;
|
||||
async move {
|
||||
if n == 1 {
|
||||
Ok(UNAUTH_HTML.to_string())
|
||||
} else {
|
||||
Ok(OK_HTML.to_string())
|
||||
}
|
||||
}
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
3,
|
||||
3,
|
||||
Duration::from_millis(0),
|
||||
"https://example/probe",
|
||||
)
|
||||
.await
|
||||
.expect("recovers after one recircuit");
|
||||
assert_eq!(call, 2);
|
||||
assert_eq!(recircuits, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn probe_loop_unauth_with_zero_recircuit_budget_fails_fast() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut call = 0u32;
|
||||
let err = run_session_probe_loop(
|
||||
|| {
|
||||
call += 1;
|
||||
async { Ok(UNAUTH_HTML.to_string()) }
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
3,
|
||||
0,
|
||||
Duration::from_millis(0),
|
||||
"https://example/probe",
|
||||
)
|
||||
.await
|
||||
.expect_err("zero budget → fail");
|
||||
assert_eq!(call, 1, "no retry when budget is 0");
|
||||
assert_eq!(recircuits, 0);
|
||||
let msg = format!("{err:#}");
|
||||
assert!(msg.contains("Refresh CRAWLER_PHPSESSID"), "msg: {msg}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn probe_loop_unauth_after_exhausting_budget_emits_recircuit_count() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut call = 0u32;
|
||||
let err = run_session_probe_loop(
|
||||
|| {
|
||||
call += 1;
|
||||
async { Ok(UNAUTH_HTML.to_string()) }
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
10, // transient budget irrelevant here
|
||||
2,
|
||||
Duration::from_millis(0),
|
||||
"https://example/probe",
|
||||
)
|
||||
.await
|
||||
.expect_err("exhausts unauth budget");
|
||||
// 3 fetches total: initial + 2 recircuit-and-retry
|
||||
assert_eq!(call, 3);
|
||||
assert_eq!(recircuits, 2);
|
||||
let msg = format!("{err:#}");
|
||||
assert!(msg.contains("2 TOR recircuit"), "expected recircuit count in error, got: {msg}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn probe_loop_transient_repeats_until_max_then_errors() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut call = 0u32;
|
||||
let err = run_session_probe_loop(
|
||||
|| {
|
||||
call += 1;
|
||||
async { Ok(TRANSIENT_HTML.to_string()) }
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
3,
|
||||
0,
|
||||
Duration::from_millis(0),
|
||||
"https://example/probe",
|
||||
)
|
||||
.await
|
||||
.expect_err("transient until max → fail");
|
||||
assert_eq!(call, 3);
|
||||
// recircuit fires between attempts: 3 attempts → 2 recircuits.
|
||||
assert_eq!(recircuits, 2);
|
||||
let msg = format!("{err:#}");
|
||||
assert!(msg.contains("broken-page response after 3 attempts"), "msg: {msg}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn probe_loop_transient_then_ok_returns_ok_after_one_recircuit() {
|
||||
let mut recircuits = 0u32;
|
||||
let mut call = 0u32;
|
||||
run_session_probe_loop(
|
||||
|| {
|
||||
call += 1;
|
||||
let n = call;
|
||||
async move {
|
||||
if n == 1 {
|
||||
Ok(TRANSIENT_HTML.to_string())
|
||||
} else {
|
||||
Ok(OK_HTML.to_string())
|
||||
}
|
||||
}
|
||||
},
|
||||
|| {
|
||||
recircuits += 1;
|
||||
async {}
|
||||
},
|
||||
3,
|
||||
0,
|
||||
Duration::from_millis(0),
|
||||
"https://example/probe",
|
||||
)
|
||||
.await
|
||||
.expect("ok on second try");
|
||||
assert_eq!(call, 2);
|
||||
assert_eq!(recircuits, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn probe_loop_propagates_fetch_errors_immediately() {
|
||||
let mut call = 0u32;
|
||||
let err = run_session_probe_loop(
|
||||
|| {
|
||||
call += 1;
|
||||
async { Err(anyhow!("nav timeout")) }
|
||||
},
|
||||
|| async {},
|
||||
5,
|
||||
5,
|
||||
Duration::from_millis(0),
|
||||
"https://example/probe",
|
||||
)
|
||||
.await
|
||||
.expect_err("fetch error bubbles");
|
||||
assert_eq!(call, 1);
|
||||
assert!(format!("{err:#}").contains("nav timeout"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn classify_probe_trusts_broken_body_over_stray_avatar_match() {
|
||||
// Defensive: if a broken-page body somehow contains an
|
||||
|
||||
@@ -18,7 +18,7 @@ use super::{
|
||||
SourceMangaRef,
|
||||
};
|
||||
use crate::crawler::detect::{
|
||||
has_logo_sentinel, is_broken_page_body, retry_on_transient, PageError,
|
||||
has_logo_sentinel, is_broken_page_body, retry_on_transient_with_hook, PageError,
|
||||
};
|
||||
use crate::crawler::nav::{wait_for_nav, wait_for_selector, NavError, SELECTOR_TIMEOUT};
|
||||
|
||||
@@ -79,12 +79,13 @@ impl Source for TargetSource {
|
||||
// and the HTML is handed straight to the first `next_batch` call
|
||||
// so the walker doesn't re-fetch it. Page count is discovered
|
||||
// incrementally — see `TargetSourceWalker::next_batch`.
|
||||
let first_html = retry_on_transient(
|
||||
let first_html = retry_on_transient_with_hook(
|
||||
|| async {
|
||||
navigate(ctx, self.base_url.as_str(), LIST_PAGE_MARKER).await
|
||||
},
|
||||
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
||||
PAGE_TRANSIENT_RETRY_DELAY,
|
||||
|| async { recircuit_if_configured(ctx.tor).await },
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -169,7 +170,7 @@ impl DiscoverWalk for TargetSourceWalker {
|
||||
parse_manga_list_from(&doc)?
|
||||
}
|
||||
None => {
|
||||
retry_on_transient(
|
||||
retry_on_transient_with_hook(
|
||||
|| async {
|
||||
let html = navigate(
|
||||
ctx,
|
||||
@@ -182,12 +183,13 @@ impl DiscoverWalk for TargetSourceWalker {
|
||||
},
|
||||
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
||||
PAGE_TRANSIENT_RETRY_DELAY,
|
||||
|| async { recircuit_if_configured(ctx.tor).await },
|
||||
)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
} else {
|
||||
retry_on_transient(
|
||||
retry_on_transient_with_hook(
|
||||
|| async {
|
||||
let url = page_url(&self.base_url, page_num);
|
||||
let html = navigate(ctx, &url, LIST_PAGE_MARKER).await?;
|
||||
@@ -196,6 +198,7 @@ impl DiscoverWalk for TargetSourceWalker {
|
||||
},
|
||||
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
||||
PAGE_TRANSIENT_RETRY_DELAY,
|
||||
|| async { recircuit_if_configured(ctx.tor).await },
|
||||
)
|
||||
.await?
|
||||
};
|
||||
@@ -274,6 +277,20 @@ fn classify_navigate_html(html: String) -> Result<String, PageError> {
|
||||
Ok(html)
|
||||
}
|
||||
|
||||
/// Hook for [`retry_on_transient_with_hook`]: when TOR is configured,
|
||||
/// signal `NEWNYM` so the next navigation draws a fresh exit. Errors
|
||||
/// from the controller are logged and swallowed — failing to recircuit
|
||||
/// shouldn't take down the crawl, the next attempt just runs on the
|
||||
/// same circuit as before.
|
||||
async fn recircuit_if_configured(tor: Option<&crate::crawler::tor::TorController>) {
|
||||
if let Some(t) = tor {
|
||||
if let Err(e) = t.new_identity().await {
|
||||
tracing::warn!(error = %e, "TOR NEWNYM failed; retrying on same circuit");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Substitutes the first `/N/` path segment with the target page
|
||||
/// number. Source impls that paginate via a different URL shape can
|
||||
/// override this — for the modeled site the segment is always present.
|
||||
|
||||
Reference in New Issue
Block a user