feat(crawler): reliability fixes — heartbeat, streaming, jitter, timeout, breaker

A1 Lease heartbeat: jobs::renew keeps a long-but-healthy job's lease fresh
so it is never stolen mid-flight nor inflated toward max_attempts.
A2 Stream chapter pages straight to storage (peak memory = one image) and
persist rows + page_count in one short transaction off the network path
(S3-ready); roll back stored blobs on failure via Storage::delete.
A3 ±20% jitter on exponential backoff to avoid a retry thundering herd.
A4 Outer per-dispatch timeout (CRAWLER_JOB_TIMEOUT_SECS, default 600) so a
hung job is acked-failed instead of wedging a worker.
A5 Metadata circuit-breaker (CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES,
default 10): abort a pass on a source outage without marking a clean exit,
so the next tick recovery-sweeps.

Adds CRAWLER_BROWSER_RESTART_THRESHOLD config (used by the upcoming
coordinated browser restart). Bumps version 0.52.0 -> 0.53.0.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-06-03 20:13:17 +02:00
parent 679abae736
commit 7a6815661f
12 changed files with 578 additions and 71 deletions

2
backend/Cargo.lock generated
View File

@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
[[package]] [[package]]
name = "mangalord" name = "mangalord"
version = "0.52.0" version = "0.53.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"argon2", "argon2",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "mangalord" name = "mangalord"
version = "0.52.0" version = "0.53.0"
edition = "2021" edition = "2021"
default-run = "mangalord" default-run = "mangalord"

View File

@@ -210,6 +210,7 @@ async fn spawn_crawler_daemon(
manga_limit: cfg.manga_limit, manga_limit: cfg.manga_limit,
download_allowlist: cfg.download_allowlist.clone(), download_allowlist: cfg.download_allowlist.clone(),
max_image_bytes: cfg.max_image_bytes, max_image_bytes: cfg.max_image_bytes,
metadata_max_consecutive_failures: cfg.metadata_max_consecutive_failures,
tor: tor.as_ref().map(Arc::clone), tor: tor.as_ref().map(Arc::clone),
}); });
m m
@@ -267,6 +268,7 @@ async fn spawn_crawler_daemon(
tz: cfg.tz, tz: cfg.tz,
retention_days: cfg.retention_days, retention_days: cfg.retention_days,
session_expired, session_expired,
job_timeout: cfg.job_timeout,
extra_tasks: vec![reaper_task, shutdown_task], extra_tasks: vec![reaper_task, shutdown_task],
}, },
); );
@@ -292,6 +294,7 @@ struct RealMetadataPass {
manga_limit: usize, manga_limit: usize,
download_allowlist: DownloadAllowlist, download_allowlist: DownloadAllowlist,
max_image_bytes: usize, max_image_bytes: usize,
metadata_max_consecutive_failures: u32,
tor: Option<Arc<crate::crawler::tor::TorController>>, tor: Option<Arc<crate::crawler::tor::TorController>>,
} }
@@ -309,6 +312,7 @@ impl MetadataPass for RealMetadataPass {
false, false,
&self.download_allowlist, &self.download_allowlist,
self.max_image_bytes, self.max_image_bytes,
self.metadata_max_consecutive_failures,
self.tor.as_deref(), self.tor.as_deref(),
) )
.await; .await;

View File

@@ -303,6 +303,9 @@ async fn run(
skip_chapters, skip_chapters,
allowlist.as_ref(), allowlist.as_ref(),
max_image_bytes, max_image_bytes,
// Circuit-breaker disabled for the operator-driven CLI: a manual
// sweep should push through transient failures, not self-abort.
0,
tor.as_deref(), tor.as_deref(),
) )
.await?; .await?;

View File

@@ -132,6 +132,19 @@ pub struct CrawlerConfig {
/// (full sweep up to the source's own bound). Sourced from /// (full sweep up to the source's own bound). Sourced from
/// `CRAWLER_LIMIT`, mirroring the CLI binary. /// `CRAWLER_LIMIT`, mirroring the CLI binary.
pub manga_limit: usize, pub manga_limit: usize,
/// Hard upper bound on a single chapter-content job dispatch. A job
/// exceeding this is acked failed (exponential backoff) instead of
/// wedging a worker. Defaults to 600s. `CRAWLER_JOB_TIMEOUT_SECS`.
pub job_timeout: Duration,
/// Consecutive `fetch_manga` failures that abort a metadata pass
/// (circuit-breaker for a source outage). The pass does NOT mark a
/// clean exit, so the next tick does a recovery sweep. Defaults to
/// 10. `CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES`.
pub metadata_max_consecutive_failures: u32,
/// Consecutive transient chapter failures (after TOR recircuit is
/// exhausted) that trigger an automatic coordinated browser restart.
/// Defaults to 3. `CRAWLER_BROWSER_RESTART_THRESHOLD`.
pub browser_restart_threshold: u32,
} }
impl Default for CrawlerConfig { impl Default for CrawlerConfig {
@@ -159,6 +172,9 @@ impl Default for CrawlerConfig {
download_allowlist: DownloadAllowlist::new(), download_allowlist: DownloadAllowlist::new(),
max_image_bytes: DEFAULT_MAX_IMAGE_BYTES, max_image_bytes: DEFAULT_MAX_IMAGE_BYTES,
manga_limit: 0, manga_limit: 0,
job_timeout: Duration::from_secs(600),
metadata_max_consecutive_failures: 10,
browser_restart_threshold: 3,
} }
} }
} }
@@ -283,6 +299,13 @@ impl CrawlerConfig {
download_allowlist, download_allowlist,
max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES), max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES),
manga_limit: env_usize("CRAWLER_LIMIT", 0), manga_limit: env_usize("CRAWLER_LIMIT", 0),
job_timeout: Duration::from_secs(env_u64("CRAWLER_JOB_TIMEOUT_SECS", 600).max(1)),
metadata_max_consecutive_failures: env_u64(
"CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES",
10,
) as u32,
browser_restart_threshold: env_u64("CRAWLER_BROWSER_RESTART_THRESHOLD", 3).max(1)
as u32,
}) })
} }
} }
@@ -384,6 +407,33 @@ mod tests {
assert_eq!(cfg.manga_limit, 0); assert_eq!(cfg.manga_limit, 0);
} }
#[test]
fn reliability_knobs_default_when_unset() {
let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner());
std::env::remove_var("CRAWLER_JOB_TIMEOUT_SECS");
std::env::remove_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES");
std::env::remove_var("CRAWLER_BROWSER_RESTART_THRESHOLD");
let cfg = CrawlerConfig::from_env().expect("from_env");
assert_eq!(cfg.job_timeout, Duration::from_secs(600));
assert_eq!(cfg.metadata_max_consecutive_failures, 10);
assert_eq!(cfg.browser_restart_threshold, 3);
}
#[test]
fn reliability_knobs_parse_from_env() {
let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner());
std::env::set_var("CRAWLER_JOB_TIMEOUT_SECS", "120");
std::env::set_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES", "5");
std::env::set_var("CRAWLER_BROWSER_RESTART_THRESHOLD", "7");
let cfg = CrawlerConfig::from_env().expect("from_env");
std::env::remove_var("CRAWLER_JOB_TIMEOUT_SECS");
std::env::remove_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES");
std::env::remove_var("CRAWLER_BROWSER_RESTART_THRESHOLD");
assert_eq!(cfg.job_timeout, Duration::from_secs(120));
assert_eq!(cfg.metadata_max_consecutive_failures, 5);
assert_eq!(cfg.browser_restart_threshold, 7);
}
#[test] #[test]
fn private_mode_env_parses_true() { fn private_mode_env_parses_true() {
let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner()); let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner());

View File

@@ -260,56 +260,118 @@ pub async fn sync_chapter_content(
// Resolve image URLs against the chapter URL (they may be relative). // Resolve image URLs against the chapter URL (they may be relative).
let base = reqwest::Url::parse(source_url).context("parse chapter URL")?; let base = reqwest::Url::parse(source_url).context("parse chapter URL")?;
// Fetch every image bytes-first into memory before writing // Stream each image straight to storage as it's fetched, capping peak
// anything. Lets us bail the whole chapter cleanly if any image // memory at a single image rather than the whole chapter. Track the
// fails — DB stays at page_count=0, no partial rows persisted. // keys written so they can be rolled back if a later page (or the
let mut fetched: Vec<(i32, Vec<u8>, &'static str)> = Vec::with_capacity(images.len()); // final DB commit) fails — preserving the all-or-nothing guarantee
// without holding a DB transaction open across the network puts
// (which matters once `Storage` is backed by S3).
let mut written_keys: Vec<String> = Vec::with_capacity(images.len());
let mut stored: Vec<StoredPage> = Vec::with_capacity(images.len());
for img in &images { for img in &images {
let url = base.join(&img.url).with_context(|| { match download_and_store_page(
format!("join image URL {} onto {source_url}", img.url) storage,
})?;
rate.wait_for(url.as_str()).await?;
let bytes = fetch_bytes_capped(
http, http,
url.as_str(), rate,
Some(source_url), &base,
source_url,
manga_id,
chapter_id,
img,
allowlist, allowlist,
max_image_bytes, max_image_bytes,
) )
.await? .await
.to_vec(); {
// Reject any non-image response: the only valid output of an Ok(page) => {
// image URL is an image. `infer` returns None on truncated written_keys.push(page.storage_key.clone());
// bytes too, which also wants to be a failure not a silent stored.push(page);
// `.bin` extension. }
if !looks_like_image(&bytes) { Err(e) => {
anyhow::bail!( cleanup_orphans(storage, &written_keys).await;
"image URL {url} returned non-image bytes \ return Err(e);
(first 16: {:?}); refusing to store as binary blob", }
&bytes.get(..16.min(bytes.len()))
);
} }
let ext = infer::get(&bytes)
.map(|k| k.extension())
.expect("looks_like_image asserted infer succeeded");
fetched.push((img.page_number, bytes, ext));
} }
// Atomic write: storage puts + page row inserts + page_count // Short transaction: page rows + page_count only, no network I/O. On
// update, all in one transaction. If anything fails, rollback + // failure, roll back the stored bytes so the chapter stays at
// the chapter is retried next run. Storage orphans the bytes; a // page_count=0 and is retried cleanly next run.
// reaper sweeps them later. if let Err(e) = persist_pages(db, chapter_id, &stored).await {
let mut tx = db.begin().await.context("open chapter sync tx")?; cleanup_orphans(storage, &written_keys).await;
for (page_number, bytes, ext) in &fetched { return Err(e);
let key = format!( }
"mangas/{manga_id}/chapters/{chapter_id}/pages/{:04}.{ext}",
page_number Ok(SyncOutcome::Fetched { pages: stored.len() })
}
/// A page image that has been written to storage and is awaiting its DB
/// row. Carries everything `persist_pages` needs.
pub(crate) struct StoredPage {
page_number: i32,
storage_key: String,
content_type: String,
}
/// Download a single page image, validate it's really an image, and write
/// it to storage. Returns the storage key + content type. Does not touch
/// the DB — persistence is batched into one short transaction afterward.
#[allow(clippy::too_many_arguments)]
async fn download_and_store_page(
storage: &dyn Storage,
http: &reqwest::Client,
rate: &HostRateLimiters,
base: &reqwest::Url,
source_url: &str,
manga_id: Uuid,
chapter_id: Uuid,
img: &ChapterImage,
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
) -> anyhow::Result<StoredPage> {
let url = base
.join(&img.url)
.with_context(|| format!("join image URL {} onto {source_url}", img.url))?;
rate.wait_for(url.as_str()).await?;
let bytes = fetch_bytes_capped(http, url.as_str(), Some(source_url), allowlist, max_image_bytes)
.await?;
// Reject any non-image response: the only valid output of an image URL
// is an image. `infer` returns None on truncated bytes too, which also
// wants to be a failure not a silent `.bin` extension.
if !looks_like_image(&bytes) {
anyhow::bail!(
"image URL {url} returned non-image bytes \
(first 16: {:?}); refusing to store as binary blob",
&bytes.get(..16.min(bytes.len()))
); );
storage }
.put(&key, bytes) let ext = infer::get(&bytes)
.await .map(|k| k.extension())
.with_context(|| format!("put {key}"))?; .expect("looks_like_image asserted infer succeeded");
// (chapter_id, page_number) is unique — re-runs idempotent. let key = format!(
"mangas/{manga_id}/chapters/{chapter_id}/pages/{:04}.{ext}",
img.page_number
);
storage
.put(&key, &bytes)
.await
.with_context(|| format!("put {key}"))?;
Ok(StoredPage {
page_number: img.page_number,
storage_key: key,
content_type: format!("image/{ext}"),
})
}
/// Persist the page rows + chapter `page_count` in one short transaction.
/// `(chapter_id, page_number)` is unique so re-runs are idempotent.
pub(crate) async fn persist_pages(
db: &PgPool,
chapter_id: Uuid,
stored: &[StoredPage],
) -> anyhow::Result<()> {
let mut tx = db.begin().await.context("open chapter sync tx")?;
for page in stored {
sqlx::query( sqlx::query(
"INSERT INTO pages (chapter_id, page_number, storage_key, content_type) "INSERT INTO pages (chapter_id, page_number, storage_key, content_type)
VALUES ($1, $2, $3, $4) VALUES ($1, $2, $3, $4)
@@ -318,22 +380,36 @@ pub async fn sync_chapter_content(
content_type = EXCLUDED.content_type", content_type = EXCLUDED.content_type",
) )
.bind(chapter_id) .bind(chapter_id)
.bind(page_number) .bind(page.page_number)
.bind(&key) .bind(&page.storage_key)
.bind(format!("image/{ext}")) .bind(&page.content_type)
.execute(&mut *tx) .execute(&mut *tx)
.await .await
.with_context(|| format!("insert page row {page_number}"))?; .with_context(|| format!("insert page row {}", page.page_number))?;
} }
sqlx::query("UPDATE chapters SET page_count = $1 WHERE id = $2") sqlx::query("UPDATE chapters SET page_count = $1 WHERE id = $2")
.bind(fetched.len() as i32) .bind(stored.len() as i32)
.bind(chapter_id) .bind(chapter_id)
.execute(&mut *tx) .execute(&mut *tx)
.await .await
.context("update page_count")?; .context("update page_count")?;
tx.commit().await.context("commit chapter sync")?; tx.commit().await.context("commit chapter sync")?;
Ok(())
}
Ok(SyncOutcome::Fetched { pages: fetched.len() }) /// Best-effort delete of partially-written page blobs after a chapter sync
/// fails, so a retry doesn't accumulate orphans. Errors are logged, not
/// raised — a leftover blob is harmless and a future reaper can sweep it.
pub(crate) async fn cleanup_orphans(storage: &dyn Storage, keys: &[String]) {
for key in keys {
if let Err(e) = storage.delete(key).await {
tracing::warn!(
%key,
error = ?e,
"failed to delete orphaned page blob after chapter sync failure"
);
}
}
} }
// Suppress unused-import warning for `session::registrable_domain` // Suppress unused-import warning for `session::registrable_domain`
@@ -347,6 +423,90 @@ fn _keep_session_in_scope() {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::storage::LocalStorage;
#[tokio::test]
async fn cleanup_orphans_deletes_written_keys() {
let dir = tempfile::tempdir().unwrap();
let storage = LocalStorage::new(dir.path());
let keys = vec![
"mangas/m/chapters/c/pages/0001.jpg".to_string(),
"mangas/m/chapters/c/pages/0002.jpg".to_string(),
];
for k in &keys {
storage.put(k, b"\xff\xd8\xff\xe0 jpeg-ish").await.unwrap();
assert!(storage.exists(k).await.unwrap());
}
cleanup_orphans(&storage, &keys).await;
for k in &keys {
assert!(!storage.exists(k).await.unwrap(), "{k} should be deleted");
}
}
#[tokio::test]
async fn cleanup_orphans_tolerates_missing_keys() {
// A key that was never written (e.g. the put itself failed) must
// not make cleanup error — it's best-effort.
let dir = tempfile::tempdir().unwrap();
let storage = LocalStorage::new(dir.path());
cleanup_orphans(&storage, &["never/written.jpg".to_string()]).await;
}
#[sqlx::test(migrations = "./migrations")]
async fn persist_pages_inserts_rows_and_sets_page_count(pool: PgPool) {
let manga_id = Uuid::new_v4();
let chapter_id = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, 'T')")
.bind(manga_id)
.execute(&pool)
.await
.unwrap();
sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)")
.bind(chapter_id)
.bind(manga_id)
.execute(&pool)
.await
.unwrap();
let stored = vec![
StoredPage {
page_number: 1,
storage_key: "k/0001.jpg".into(),
content_type: "image/jpeg".into(),
},
StoredPage {
page_number: 2,
storage_key: "k/0002.jpg".into(),
content_type: "image/jpeg".into(),
},
];
persist_pages(&pool, chapter_id, &stored).await.unwrap();
let page_count: i32 =
sqlx::query_scalar("SELECT page_count FROM chapters WHERE id = $1")
.bind(chapter_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(page_count, 2);
let rows: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM pages WHERE chapter_id = $1")
.bind(chapter_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(rows, 2);
// Idempotent re-run (force refetch path): same rows, page_count stable.
persist_pages(&pool, chapter_id, &stored).await.unwrap();
let rows2: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM pages WHERE chapter_id = $1")
.bind(chapter_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(rows2, 2, "re-run is idempotent via ON CONFLICT");
}
#[test] #[test]
fn parse_chapter_pages_skips_loader_and_sorts_by_id() { fn parse_chapter_pages_skips_loader_and_sorts_by_id() {

View File

@@ -56,6 +56,15 @@ pub const CRON_LOCK_KEY: i64 = 0x4D414E47414C5244;
const STATE_KEY_LAST_TICK: &str = "last_metadata_tick_at"; const STATE_KEY_LAST_TICK: &str = "last_metadata_tick_at";
/// Lease window handed to `jobs::lease`. Kept short, but continuously
/// extended by the per-job heartbeat (see [`WorkerContext::process_lease`])
/// so a long-but-healthy job never lapses and gets stolen.
const LEASE_DURATION: Duration = Duration::from_secs(60);
/// How often the heartbeat renews the lease while a job runs. A third of
/// the lease window leaves two missed-beat's slack before expiry.
const LEASE_HEARTBEAT: Duration = Duration::from_secs(20);
#[async_trait] #[async_trait]
pub trait MetadataPass: Send + Sync { pub trait MetadataPass: Send + Sync {
async fn run(&self) -> anyhow::Result<pipeline::MetadataStats>; async fn run(&self) -> anyhow::Result<pipeline::MetadataStats>;
@@ -77,6 +86,11 @@ pub struct DaemonConfig {
pub tz: Tz, pub tz: Tz,
pub retention_days: u32, pub retention_days: u32,
pub session_expired: Arc<AtomicBool>, pub session_expired: Arc<AtomicBool>,
/// Hard upper bound on a single job's dispatch. A job that exceeds it
/// is acked failed (exponential backoff) rather than wedging a worker
/// forever. Must exceed [`LEASE_HEARTBEAT`] and the realistic
/// single-job runtime.
pub job_timeout: Duration,
/// Tasks that should run alongside the cron + workers and be cancelled /// Tasks that should run alongside the cron + workers and be cancelled
/// on shutdown. Used to hand the daemon ownership of the browser /// on shutdown. Used to hand the daemon ownership of the browser
/// manager's idle reaper. /// manager's idle reaper.
@@ -123,6 +137,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem
tz, tz,
retention_days, retention_days,
session_expired, session_expired,
job_timeout,
extra_tasks, extra_tasks,
} = cfg; } = cfg;
@@ -146,6 +161,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem
cancel: cancel.clone(), cancel: cancel.clone(),
dispatcher: Arc::clone(&dispatcher), dispatcher: Arc::clone(&dispatcher),
session_expired: Arc::clone(&session_expired), session_expired: Arc::clone(&session_expired),
job_timeout,
id: worker_id, id: worker_id,
}; };
join.spawn(async move { ctx.run().await }); join.spawn(async move { ctx.run().await });
@@ -283,6 +299,7 @@ struct WorkerContext {
cancel: CancellationToken, cancel: CancellationToken,
dispatcher: Arc<dyn ChapterDispatcher>, dispatcher: Arc<dyn ChapterDispatcher>,
session_expired: Arc<AtomicBool>, session_expired: Arc<AtomicBool>,
job_timeout: Duration,
id: usize, id: usize,
} }
@@ -303,7 +320,7 @@ impl WorkerContext {
&self.pool, &self.pool,
Some(KIND_SYNC_CHAPTER_CONTENT), Some(KIND_SYNC_CHAPTER_CONTENT),
1, 1,
Duration::from_secs(60), LEASE_DURATION,
) )
.await .await
{ {
@@ -341,9 +358,54 @@ impl WorkerContext {
} }
} }
let outcome = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone())) // Heartbeat: keep the lease fresh while the (potentially long)
.catch_unwind() // dispatch runs, so a slow-but-healthy job is never re-leased and
.await; // never inflates `attempts` toward `max_attempts`. Stops itself
// once the job is no longer ours (renew returns false).
let heartbeat = {
let hb_pool = self.pool.clone();
let hb_id = lease.id;
tokio::spawn(async move {
loop {
tokio::time::sleep(LEASE_HEARTBEAT).await;
match jobs::renew(&hb_pool, hb_id, LEASE_DURATION).await {
Ok(true) => {}
Ok(false) => break,
Err(e) => {
tracing::warn!(lease_id = %hb_id, ?e, "heartbeat renew failed");
}
}
}
})
};
// Outer timeout: a dispatch that exceeds `job_timeout` is acked
// failed (exponential backoff) rather than wedging the worker.
let dispatch = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone()))
.catch_unwind();
let outcome = tokio::time::timeout(self.job_timeout, dispatch).await;
heartbeat.abort();
let outcome = match outcome {
Ok(o) => o,
Err(_elapsed) => {
tracing::warn!(
worker = self.id,
lease_id = %lease.id,
timeout_secs = self.job_timeout.as_secs(),
"worker: dispatch timed out — ack failed"
);
let _ = jobs::ack_failed(
&self.pool,
lease.id,
"dispatch timed out",
lease.attempts,
lease.max_attempts,
)
.await;
return;
}
};
match outcome { match outcome {
Ok(Ok(SyncOutcome::Fetched { .. } | SyncOutcome::Skipped)) => { Ok(Ok(SyncOutcome::Fetched { .. } | SyncOutcome::Skipped)) => {
let _ = jobs::ack_done(&self.pool, lease.id).await; let _ = jobs::ack_done(&self.pool, lease.id).await;

View File

@@ -66,16 +66,33 @@ pub struct Lease {
pub max_attempts: i32, pub max_attempts: i32,
} }
/// Exponential backoff for `ack_failed` retries. `attempts` is the /// Deterministic exponential backoff base for `ack_failed` retries.
/// post-increment value reported by `lease()` (so the first failure has /// `attempts` is the post-increment value reported by `lease()` (so the
/// `attempts == 1` and waits 60s, the second 120s, etc.). Capped at 1h to /// first failure has `attempts == 1` and waits 60s, the second 120s,
/// avoid runaway long sleeps that would outlive the daemon process. /// etc.). Capped at 1h to avoid runaway long sleeps that would outlive
fn backoff_for(attempts: i32) -> Duration { /// the daemon process. Jitter is applied separately by [`apply_jitter`].
fn backoff_base(attempts: i32) -> Duration {
let shift = attempts.saturating_sub(1).clamp(0, 20) as u32; let shift = attempts.saturating_sub(1).clamp(0, 20) as u32;
let secs = 60u64.saturating_mul(1u64 << shift); let secs = 60u64.saturating_mul(1u64 << shift);
Duration::from_secs(secs.min(3600)) Duration::from_secs(secs.min(3600))
} }
/// Apply ±20% jitter to a backoff duration. `jitter` is a fraction in
/// `[0.0, 1.0)` (e.g. `rand::random::<f64>()`), mapped to a multiplier in
/// `[0.8, 1.2)`. Pure so the bounds stay unit-testable. Spreading retries
/// avoids a thundering herd when a source outage fails many jobs at once.
fn apply_jitter(base: Duration, jitter: f64) -> Duration {
let frac = jitter.clamp(0.0, 1.0);
let mult = 0.8 + 0.4 * frac; // [0.8, 1.2)
Duration::from_secs((base.as_secs_f64() * mult).round() as u64)
}
/// Jittered exponential backoff for `ack_failed`. Wraps [`backoff_base`]
/// with a random ±20% spread.
fn backoff_for(attempts: i32) -> Duration {
apply_jitter(backoff_base(attempts), rand::random::<f64>())
}
/// Insert a new pending job. For `SyncChapterContent` payloads the /// Insert a new pending job. For `SyncChapterContent` payloads the
/// partial unique index `crawler_jobs_chapter_content_dedup_idx` blocks /// partial unique index `crawler_jobs_chapter_content_dedup_idx` blocks
/// a second `(pending|running)` insert per chapter_id, returning /// a second `(pending|running)` insert per chapter_id, returning
@@ -159,6 +176,35 @@ pub async fn lease(
Ok(leases) Ok(leases)
} }
/// Extend the lease on a still-owned `running` job. Returns `true` if the
/// row was updated (we still hold the lease), `false` if the job is no
/// longer `running` (re-leased after a missed heartbeat, or already
/// acked) — the caller's heartbeat loop should stop. The `state =
/// 'running'` guard mirrors [`ack_done`]'s rationale.
///
/// This is the heartbeat primitive: a worker renews periodically while a
/// long-but-healthy job runs so `leased_until` never lapses, which would
/// otherwise let another worker steal the in-flight job and spuriously
/// inflate `attempts` toward `max_attempts`.
pub async fn renew(
pool: &PgPool,
lease_id: Uuid,
lease_duration: Duration,
) -> sqlx::Result<bool> {
let lease_ms: i64 = lease_duration.as_millis().min(i64::MAX as u128) as i64;
let res = sqlx::query(
"UPDATE crawler_jobs \
SET leased_until = now() + ($2::bigint || ' milliseconds')::interval, \
updated_at = now() \
WHERE id = $1 AND state = 'running'",
)
.bind(lease_id)
.bind(lease_ms)
.execute(pool)
.await?;
Ok(res.rows_affected() > 0)
}
/// Mark a leased job as successfully completed. The `state = 'running'` /// Mark a leased job as successfully completed. The `state = 'running'`
/// predicate guards against a late ack from a worker whose lease expired /// predicate guards against a late ack from a worker whose lease expired
/// and was already re-leased by another worker: without it, the late ack /// and was already re-leased by another worker: without it, the late ack
@@ -278,19 +324,48 @@ mod tests {
use super::*; use super::*;
#[test] #[test]
fn backoff_grows_exponentially_and_caps_at_one_hour() { fn backoff_base_grows_exponentially_and_caps_at_one_hour() {
// attempts == 1 → 60s, doubling each step. // attempts == 1 → 60s, doubling each step.
assert_eq!(backoff_for(1), Duration::from_secs(60)); assert_eq!(backoff_base(1), Duration::from_secs(60));
assert_eq!(backoff_for(2), Duration::from_secs(120)); assert_eq!(backoff_base(2), Duration::from_secs(120));
assert_eq!(backoff_for(3), Duration::from_secs(240)); assert_eq!(backoff_base(3), Duration::from_secs(240));
assert_eq!(backoff_for(4), Duration::from_secs(480)); assert_eq!(backoff_base(4), Duration::from_secs(480));
assert_eq!(backoff_for(5), Duration::from_secs(960)); assert_eq!(backoff_base(5), Duration::from_secs(960));
assert_eq!(backoff_for(6), Duration::from_secs(1920)); assert_eq!(backoff_base(6), Duration::from_secs(1920));
// 7th: 60 * 64 = 3840 → capped to 3600. // 7th: 60 * 64 = 3840 → capped to 3600.
assert_eq!(backoff_for(7), Duration::from_secs(3600)); assert_eq!(backoff_base(7), Duration::from_secs(3600));
assert_eq!(backoff_for(20), Duration::from_secs(3600)); assert_eq!(backoff_base(20), Duration::from_secs(3600));
// Garbage / zero / negatives stay sane. // Garbage / zero / negatives stay sane.
assert_eq!(backoff_for(0), Duration::from_secs(60)); assert_eq!(backoff_base(0), Duration::from_secs(60));
assert_eq!(backoff_for(-5), Duration::from_secs(60)); assert_eq!(backoff_base(-5), Duration::from_secs(60));
}
#[test]
fn apply_jitter_stays_within_plus_minus_twenty_percent() {
let base = Duration::from_secs(100);
// Lower bound (jitter = 0.0) → 0.8x.
assert_eq!(apply_jitter(base, 0.0), Duration::from_secs(80));
// Midpoint (jitter = 0.5) → 1.0x.
assert_eq!(apply_jitter(base, 0.5), Duration::from_secs(100));
// Upper end (jitter → 1.0) → ~1.2x.
assert_eq!(apply_jitter(base, 1.0), Duration::from_secs(120));
// Out-of-range inputs are clamped, never panic.
assert_eq!(apply_jitter(base, -3.0), Duration::from_secs(80));
assert_eq!(apply_jitter(base, 9.0), Duration::from_secs(120));
}
#[test]
fn backoff_for_random_jitter_stays_in_band() {
// The production wrapper draws its own randomness; assert the
// result for a mid-range attempt always lands within the jitter
// band of the base, across many draws.
let base = backoff_base(3).as_secs_f64(); // 240s
for _ in 0..1000 {
let v = backoff_for(3).as_secs_f64();
assert!(
v >= base * 0.8 - 1.0 && v <= base * 1.2 + 1.0,
"jittered backoff {v} outside band of base {base}"
);
}
} }
} }

View File

@@ -65,6 +65,17 @@ pub(crate) fn should_mark_clean_exit(
walked_to_completion || hit_stop_condition walked_to_completion || hit_stop_condition
} }
/// Circuit-breaker: abort the walk once `consecutive` `fetch_manga`
/// failures reach `threshold`. A `threshold` of 0 disables the breaker
/// (unbounded — the legacy behaviour). When it fires the caller must NOT
/// mark a clean exit, so the next tick does a recovery sweep over the
/// catalog tail the aborted pass never reached.
///
/// Pure so the rule is unit-testable without the walker.
pub(crate) fn should_abort_pass(consecutive: u32, threshold: u32) -> bool {
threshold > 0 && consecutive >= threshold
}
/// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline /// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline
/// for the target source. Pure metadata; chapter content is enqueued as /// for the target source. Pure metadata; chapter content is enqueued as
/// separate `SyncChapterContent` jobs by the caller after this returns. /// separate `SyncChapterContent` jobs by the caller after this returns.
@@ -103,6 +114,7 @@ pub async fn run_metadata_pass(
skip_chapters: bool, skip_chapters: bool,
allowlist: &DownloadAllowlist, allowlist: &DownloadAllowlist,
max_image_bytes: usize, max_image_bytes: usize,
max_consecutive_failures: u32,
tor: Option<&crate::crawler::tor::TorController>, tor: Option<&crate::crawler::tor::TorController>,
) -> anyhow::Result<MetadataStats> { ) -> anyhow::Result<MetadataStats> {
let lease = browser_manager let lease = browser_manager
@@ -165,6 +177,11 @@ pub async fn run_metadata_pass(
let mut walked_to_completion = false; let mut walked_to_completion = false;
let mut hit_limit = false; let mut hit_limit = false;
let mut hit_stop_condition = false; let mut hit_stop_condition = false;
// Circuit-breaker state: consecutive fetch_manga failures. A sustained
// run abort (source outage) leaves the pass un-clean → recovery sweep
// next tick.
let mut consecutive_failures = 0u32;
let mut hit_failure_breaker = false;
'outer: loop { 'outer: loop {
let batch = match walker.next_batch(&ctx).await? { let batch = match walker.next_batch(&ctx).await? {
@@ -204,7 +221,10 @@ pub async fn run_metadata_pass(
"fetching metadata" "fetching metadata"
); );
let manga = match source.fetch_manga(&ctx, &r).await { let manga = match source.fetch_manga(&ctx, &r).await {
Ok(m) => m, Ok(m) => {
consecutive_failures = 0;
m
}
Err(e) => { Err(e) => {
tracing::warn!( tracing::warn!(
key = %r.source_manga_key, key = %r.source_manga_key,
@@ -213,6 +233,17 @@ pub async fn run_metadata_pass(
"fetch_manga failed" "fetch_manga failed"
); );
stats.mangas_failed += 1; stats.mangas_failed += 1;
consecutive_failures += 1;
if should_abort_pass(consecutive_failures, max_consecutive_failures) {
hit_failure_breaker = true;
tracing::error!(
consecutive_failures,
threshold = max_consecutive_failures,
"metadata pass: too many consecutive fetch_manga failures; \
aborting (recovery sweep on next tick)"
);
break 'outer;
}
continue; continue;
} }
}; };
@@ -390,6 +421,7 @@ pub async fn run_metadata_pass(
walked_to_completion, walked_to_completion,
hit_limit, hit_limit,
hit_stop_condition, hit_stop_condition,
hit_failure_breaker,
exited_cleanly, exited_cleanly,
"metadata pass complete" "metadata pass complete"
); );
@@ -756,6 +788,18 @@ mod tests {
assert!(!should_stop(false, UpsertStatus::New, None)); assert!(!should_stop(false, UpsertStatus::New, None));
} }
#[test]
fn abort_pass_fires_at_threshold_and_respects_disable() {
// Disabled (0) never fires, no matter how many failures.
assert!(!should_abort_pass(0, 0));
assert!(!should_abort_pass(100, 0));
// Below threshold: keep going.
assert!(!should_abort_pass(9, 10));
// At/above threshold: abort.
assert!(should_abort_pass(10, 10));
assert!(should_abort_pass(11, 10));
}
#[test] #[test]
fn clean_exit_when_walked_to_completion() { fn clean_exit_when_walked_to_completion() {
// End-of-walk reached the catalog tail — the recovery flag may // End-of-walk reached the catalog tail — the recovery flag may

View File

@@ -40,6 +40,7 @@ fn make_cfg(
tz: Tz::UTC, tz: Tz::UTC,
retention_days: 7, retention_days: 7,
session_expired, session_expired,
job_timeout: Duration::from_secs(60),
extra_tasks: Vec::new(), extra_tasks: Vec::new(),
} }
} }
@@ -88,6 +89,52 @@ impl ChapterDispatcher for PanickingDispatcher {
} }
} }
/// Never completes — used to verify the worker's outer dispatch timeout.
struct HangingDispatcher {
seen: AtomicUsize,
}
#[async_trait::async_trait]
impl ChapterDispatcher for HangingDispatcher {
async fn dispatch(&self, _payload: JobPayload) -> anyhow::Result<SyncOutcome> {
self.seen.fetch_add(1, Ordering::AcqRel);
std::future::pending::<()>().await;
unreachable!("hanging dispatcher never resolves");
}
}
#[sqlx::test(migrations = "./migrations")]
async fn worker_times_out_a_hung_dispatch_and_acks_failed(pool: PgPool) {
enqueue_chapter_job(&pool).await;
let dispatcher = Arc::new(HangingDispatcher {
seen: AtomicUsize::new(0),
});
let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false));
let cancel = CancellationToken::new();
let mut cfg = make_cfg(None, dispatcher.clone(), session_expired, 1);
cfg.job_timeout = Duration::from_millis(300);
let handle = daemon::spawn(pool.clone(), cancel.clone(), cfg);
// The hung job should time out and return to pending with backoff
// (attempts=1 < max=5). Poll for the recorded error.
let mut timed_out = false;
for _ in 0..40 {
let n: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM crawler_jobs WHERE last_error = 'dispatch timed out'",
)
.fetch_one(&pool)
.await
.unwrap();
if n == 1 {
timed_out = true;
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
handle.shutdown().await;
assert!(timed_out, "hung dispatch must be acked failed with a timeout error");
assert!(dispatcher.seen.load(Ordering::Acquire) >= 1);
}
#[sqlx::test(migrations = "./migrations")] #[sqlx::test(migrations = "./migrations")]
async fn workers_drain_jobs_through_dispatcher(pool: PgPool) { async fn workers_drain_jobs_through_dispatcher(pool: PgPool) {
enqueue_chapter_job(&pool).await; enqueue_chapter_job(&pool).await;

View File

@@ -185,6 +185,68 @@ async fn lease_marks_running_and_bumps_attempts_and_sets_leased_until(pool: PgPo
assert!(leased_until > chrono::Utc::now()); assert!(leased_until > chrono::Utc::now());
} }
#[sqlx::test(migrations = "./migrations")]
async fn renew_extends_leased_until_while_running(pool: PgPool) {
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
.await
.unwrap()
{
EnqueueResult::Inserted(id) => id,
EnqueueResult::Skipped => unreachable!(),
};
// Lease with a short window, then collapse leased_until to the recent
// past so the renew is unambiguously an extension.
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(5))
.await
.unwrap();
assert_eq!(leases.len(), 1);
sqlx::query("UPDATE crawler_jobs SET leased_until = now() - interval '1 second' WHERE id = $1")
.bind(id)
.execute(&pool)
.await
.unwrap();
let still_owned = jobs::renew(&pool, id, Duration::from_secs(120))
.await
.unwrap();
assert!(still_owned, "renew on a running job returns true");
let leased_until: chrono::DateTime<chrono::Utc> =
sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1")
.bind(id)
.fetch_one(&pool)
.await
.unwrap();
assert!(
leased_until > chrono::Utc::now() + chrono::Duration::seconds(60),
"leased_until pushed ~120s into the future"
);
assert_eq!(job_state(&pool, id).await, "running");
}
#[sqlx::test(migrations = "./migrations")]
async fn renew_is_noop_once_job_no_longer_running(pool: PgPool) {
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
.await
.unwrap()
{
EnqueueResult::Inserted(id) => id,
EnqueueResult::Skipped => unreachable!(),
};
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60))
.await
.unwrap();
// Job completes — heartbeat should now see it's no longer ours.
jobs::ack_done(&pool, leases[0].id).await.unwrap();
let still_owned = jobs::renew(&pool, id, Duration::from_secs(120))
.await
.unwrap();
assert!(!still_owned, "renew on a non-running job returns false");
assert_eq!(job_state(&pool, id).await, "done");
}
#[sqlx::test(migrations = "./migrations")] #[sqlx::test(migrations = "./migrations")]
async fn lease_with_kind_filter_only_matches_that_kind(pool: PgPool) { async fn lease_with_kind_filter_only_matches_that_kind(pool: PgPool) {
let manga_id = match jobs::enqueue(&pool, &sync_manga_payload("foo")) let manga_id = match jobs::enqueue(&pool, &sync_manga_payload("foo"))

View File

@@ -1,6 +1,6 @@
{ {
"name": "mangalord-frontend", "name": "mangalord-frontend",
"version": "0.52.0", "version": "0.53.0",
"private": true, "private": true,
"type": "module", "type": "module",
"scripts": { "scripts": {