feat(crawler): retry_on_transient_with_hook for between-retry side effects
Adds a sibling function, retry_on_transient_with_hook, that runs a caller-supplied async hook between a transient failure and the next attempt. The existing retry_on_transient becomes a thin wrapper over it (passing a no-op hook), so no call sites churn yet.

Hook contract: the hook fires only between attempts (at most N-1 times for N attempts), never after a non-transient error or after the final attempt.

Designed for TOR NEWNYM, but the signature is generic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -80,13 +80,36 @@ pub fn has_logo_sentinel(doc: &scraper::Html) -> bool {
|
||||
/// caller can fall back on the job system's retry/backoff once the
|
||||
/// inline budget is exhausted.
|
||||
pub async fn retry_on_transient<F, Fut, T>(
|
||||
mut op: F,
|
||||
op: F,
|
||||
max_attempts: u32,
|
||||
delay: Duration,
|
||||
) -> Result<T, PageError>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: Future<Output = Result<T, PageError>>,
|
||||
{
|
||||
retry_on_transient_with_hook(op, max_attempts, delay, || async {}).await
|
||||
}
|
||||
|
||||
/// Like [`retry_on_transient`] but invokes `on_retry` between a
|
||||
/// transient failure and the subsequent sleep+retry. The hook does
|
||||
/// **not** fire on the first attempt, after a non-transient error, or
|
||||
/// after the final attempt (no retry follows). Hook failures are not
|
||||
/// propagated — return `()` from the future and log inside if needed.
|
||||
///
|
||||
/// Wire the TOR controller's `new_identity` here to rotate circuits
|
||||
/// between page-fetch retries; see [`crate::crawler::tor`].
|
||||
pub async fn retry_on_transient_with_hook<F, Fut, T, H, HFut>(
|
||||
mut op: F,
|
||||
max_attempts: u32,
|
||||
delay: Duration,
|
||||
mut on_retry: H,
|
||||
) -> Result<T, PageError>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: Future<Output = Result<T, PageError>>,
|
||||
H: FnMut() -> HFut,
|
||||
HFut: Future<Output = ()>,
|
||||
{
|
||||
debug_assert!(max_attempts >= 1, "max_attempts must be at least 1");
|
||||
let mut attempt = 0u32;
|
||||
@@ -101,8 +124,9 @@ where
|
||||
attempt,
|
||||
max_attempts,
|
||||
error = %e,
|
||||
"transient error; sleeping before retry"
|
||||
"transient error; running on-retry hook and sleeping before retry"
|
||||
);
|
||||
on_retry().await;
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
}
|
||||
@@ -247,4 +271,92 @@ mod tests {
|
||||
assert_eq!(result.unwrap(), 7);
|
||||
assert_eq!(attempt, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn hook_fires_once_between_transient_and_success() {
|
||||
let mut attempt = 0u32;
|
||||
let mut hook_calls = 0u32;
|
||||
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|
||||
|| {
|
||||
attempt += 1;
|
||||
let n = attempt;
|
||||
async move {
|
||||
if n < 2 {
|
||||
Err(PageError::transient("once"))
|
||||
} else {
|
||||
Ok(99)
|
||||
}
|
||||
}
|
||||
},
|
||||
5,
|
||||
Duration::from_millis(0),
|
||||
|| {
|
||||
hook_calls += 1;
|
||||
async {}
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert_eq!(result.unwrap(), 99);
|
||||
assert_eq!(attempt, 2);
|
||||
assert_eq!(hook_calls, 1, "hook fires exactly once between attempts");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn hook_does_not_fire_when_first_attempt_succeeds() {
|
||||
let mut hook_calls = 0u32;
|
||||
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|
||||
|| async { Ok(1) },
|
||||
5,
|
||||
Duration::from_millis(0),
|
||||
|| {
|
||||
hook_calls += 1;
|
||||
async {}
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(hook_calls, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn hook_does_not_fire_after_non_transient_error() {
|
||||
let mut hook_calls = 0u32;
|
||||
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|
||||
|| async { Err(PageError::Other(anyhow::anyhow!("permanent"))) },
|
||||
5,
|
||||
Duration::from_millis(0),
|
||||
|| {
|
||||
hook_calls += 1;
|
||||
async {}
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
assert_eq!(hook_calls, 0, "non-transient must short-circuit before hook");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn hook_does_not_fire_after_final_failed_attempt() {
|
||||
// With max_attempts=3 and three persistent transients, the hook
|
||||
// should run twice (between 1→2 and 2→3) — never a third time,
|
||||
// because no retry follows attempt 3.
|
||||
let mut attempt = 0u32;
|
||||
let mut hook_calls = 0u32;
|
||||
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|
||||
|| {
|
||||
attempt += 1;
|
||||
async { Err(PageError::transient("always")) }
|
||||
},
|
||||
3,
|
||||
Duration::from_millis(0),
|
||||
|| {
|
||||
hook_calls += 1;
|
||||
async {}
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
assert_eq!(attempt, 3);
|
||||
assert_eq!(hook_calls, 2, "hook fires N-1 times for N attempts that all fail transient");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user