diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 441ecc1..7037e40 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "mangalord" -version = "0.52.0" +version = "0.53.0" dependencies = [ "anyhow", "argon2", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index f62838d..161dd14 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mangalord" -version = "0.52.0" +version = "0.53.0" edition = "2021" default-run = "mangalord" diff --git a/backend/src/app.rs b/backend/src/app.rs index 18d1192..bf0908b 100644 --- a/backend/src/app.rs +++ b/backend/src/app.rs @@ -210,6 +210,7 @@ async fn spawn_crawler_daemon( manga_limit: cfg.manga_limit, download_allowlist: cfg.download_allowlist.clone(), max_image_bytes: cfg.max_image_bytes, + metadata_max_consecutive_failures: cfg.metadata_max_consecutive_failures, tor: tor.as_ref().map(Arc::clone), }); m @@ -267,6 +268,7 @@ async fn spawn_crawler_daemon( tz: cfg.tz, retention_days: cfg.retention_days, session_expired, + job_timeout: cfg.job_timeout, extra_tasks: vec![reaper_task, shutdown_task], }, ); @@ -292,6 +294,7 @@ struct RealMetadataPass { manga_limit: usize, download_allowlist: DownloadAllowlist, max_image_bytes: usize, + metadata_max_consecutive_failures: u32, tor: Option>, } @@ -309,6 +312,7 @@ impl MetadataPass for RealMetadataPass { false, &self.download_allowlist, self.max_image_bytes, + self.metadata_max_consecutive_failures, self.tor.as_deref(), ) .await; diff --git a/backend/src/bin/crawler.rs b/backend/src/bin/crawler.rs index 15ca8d4..9892048 100644 --- a/backend/src/bin/crawler.rs +++ b/backend/src/bin/crawler.rs @@ -303,6 +303,9 @@ async fn run( skip_chapters, allowlist.as_ref(), max_image_bytes, + // Circuit-breaker disabled for the operator-driven CLI: a manual + // sweep should push through transient failures, not self-abort. + 0, tor.as_deref(), ) .await?; diff --git a/backend/src/config.rs b/backend/src/config.rs index b54aadc..9c62bec 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -132,6 +132,19 @@ pub struct CrawlerConfig { /// (full sweep up to the source's own bound). Sourced from /// `CRAWLER_LIMIT`, mirroring the CLI binary. pub manga_limit: usize, + /// Hard upper bound on a single chapter-content job dispatch. A job + /// exceeding this is acked failed (exponential backoff) instead of + /// wedging a worker. Defaults to 600s. `CRAWLER_JOB_TIMEOUT_SECS`. + pub job_timeout: Duration, + /// Consecutive `fetch_manga` failures that abort a metadata pass + /// (circuit-breaker for a source outage). The pass does NOT mark a + /// clean exit, so the next tick does a recovery sweep. Defaults to + /// 10. `CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES`. + pub metadata_max_consecutive_failures: u32, + /// Consecutive transient chapter failures (after TOR recircuit is + /// exhausted) that trigger an automatic coordinated browser restart. + /// Defaults to 3. `CRAWLER_BROWSER_RESTART_THRESHOLD`. + pub browser_restart_threshold: u32, } impl Default for CrawlerConfig { @@ -159,6 +172,9 @@ impl Default for CrawlerConfig { download_allowlist: DownloadAllowlist::new(), max_image_bytes: DEFAULT_MAX_IMAGE_BYTES, manga_limit: 0, + job_timeout: Duration::from_secs(600), + metadata_max_consecutive_failures: 10, + browser_restart_threshold: 3, } } } @@ -283,6 +299,13 @@ impl CrawlerConfig { download_allowlist, max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES), manga_limit: env_usize("CRAWLER_LIMIT", 0), + job_timeout: Duration::from_secs(env_u64("CRAWLER_JOB_TIMEOUT_SECS", 600).max(1)), + metadata_max_consecutive_failures: env_u64( + "CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES", + 10, + ) as u32, + browser_restart_threshold: env_u64("CRAWLER_BROWSER_RESTART_THRESHOLD", 3).max(1) + as u32, }) } } @@ -384,6 +407,33 @@ mod tests { assert_eq!(cfg.manga_limit, 0); } + #[test] + fn reliability_knobs_default_when_unset() { + let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner()); + std::env::remove_var("CRAWLER_JOB_TIMEOUT_SECS"); + std::env::remove_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES"); + std::env::remove_var("CRAWLER_BROWSER_RESTART_THRESHOLD"); + let cfg = CrawlerConfig::from_env().expect("from_env"); + assert_eq!(cfg.job_timeout, Duration::from_secs(600)); + assert_eq!(cfg.metadata_max_consecutive_failures, 10); + assert_eq!(cfg.browser_restart_threshold, 3); + } + + #[test] + fn reliability_knobs_parse_from_env() { + let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner()); + std::env::set_var("CRAWLER_JOB_TIMEOUT_SECS", "120"); + std::env::set_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES", "5"); + std::env::set_var("CRAWLER_BROWSER_RESTART_THRESHOLD", "7"); + let cfg = CrawlerConfig::from_env().expect("from_env"); + std::env::remove_var("CRAWLER_JOB_TIMEOUT_SECS"); + std::env::remove_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES"); + std::env::remove_var("CRAWLER_BROWSER_RESTART_THRESHOLD"); + assert_eq!(cfg.job_timeout, Duration::from_secs(120)); + assert_eq!(cfg.metadata_max_consecutive_failures, 5); + assert_eq!(cfg.browser_restart_threshold, 7); + } + #[test] fn private_mode_env_parses_true() { let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner()); diff --git a/backend/src/crawler/content.rs b/backend/src/crawler/content.rs index 1549786..42e42d1 100644 --- a/backend/src/crawler/content.rs +++ b/backend/src/crawler/content.rs @@ -260,56 +260,118 @@ pub async fn sync_chapter_content( // Resolve image URLs against the chapter URL (they may be relative). let base = reqwest::Url::parse(source_url).context("parse chapter URL")?; - // Fetch every image bytes-first into memory before writing - // anything. Lets us bail the whole chapter cleanly if any image - // fails — DB stays at page_count=0, no partial rows persisted. - let mut fetched: Vec<(i32, Vec, &'static str)> = Vec::with_capacity(images.len()); + // Stream each image straight to storage as it's fetched, capping peak + // memory at a single image rather than the whole chapter. Track the + // keys written so they can be rolled back if a later page (or the + // final DB commit) fails — preserving the all-or-nothing guarantee + // without holding a DB transaction open across the network puts + // (which matters once `Storage` is backed by S3). + let mut written_keys: Vec = Vec::with_capacity(images.len()); + let mut stored: Vec = Vec::with_capacity(images.len()); for img in &images { - let url = base.join(&img.url).with_context(|| { - format!("join image URL {} onto {source_url}", img.url) - })?; - rate.wait_for(url.as_str()).await?; - let bytes = fetch_bytes_capped( + match download_and_store_page( + storage, http, - url.as_str(), - Some(source_url), + rate, + &base, + source_url, + manga_id, + chapter_id, + img, allowlist, max_image_bytes, ) - .await? - .to_vec(); - // Reject any non-image response: the only valid output of an - // image URL is an image. `infer` returns None on truncated - // bytes too, which also wants to be a failure not a silent - // `.bin` extension. - if !looks_like_image(&bytes) { - anyhow::bail!( - "image URL {url} returned non-image bytes \ - (first 16: {:?}); refusing to store as binary blob", - &bytes.get(..16.min(bytes.len())) - ); + .await + { + Ok(page) => { + written_keys.push(page.storage_key.clone()); + stored.push(page); + } + Err(e) => { + cleanup_orphans(storage, &written_keys).await; + return Err(e); + } } - let ext = infer::get(&bytes) - .map(|k| k.extension()) - .expect("looks_like_image asserted infer succeeded"); - fetched.push((img.page_number, bytes, ext)); } - // Atomic write: storage puts + page row inserts + page_count - // update, all in one transaction. If anything fails, rollback + - // the chapter is retried next run. Storage orphans the bytes; a - // reaper sweeps them later. - let mut tx = db.begin().await.context("open chapter sync tx")?; - for (page_number, bytes, ext) in &fetched { - let key = format!( - "mangas/{manga_id}/chapters/{chapter_id}/pages/{:04}.{ext}", - page_number + // Short transaction: page rows + page_count only, no network I/O. On + // failure, roll back the stored bytes so the chapter stays at + // page_count=0 and is retried cleanly next run. + if let Err(e) = persist_pages(db, chapter_id, &stored).await { + cleanup_orphans(storage, &written_keys).await; + return Err(e); + } + + Ok(SyncOutcome::Fetched { pages: stored.len() }) +} + +/// A page image that has been written to storage and is awaiting its DB +/// row. Carries everything `persist_pages` needs. +pub(crate) struct StoredPage { + page_number: i32, + storage_key: String, + content_type: String, +} + +/// Download a single page image, validate it's really an image, and write +/// it to storage. Returns the storage key + content type. Does not touch +/// the DB — persistence is batched into one short transaction afterward. +#[allow(clippy::too_many_arguments)] +async fn download_and_store_page( + storage: &dyn Storage, + http: &reqwest::Client, + rate: &HostRateLimiters, + base: &reqwest::Url, + source_url: &str, + manga_id: Uuid, + chapter_id: Uuid, + img: &ChapterImage, + allowlist: &DownloadAllowlist, + max_image_bytes: usize, +) -> anyhow::Result { + let url = base + .join(&img.url) + .with_context(|| format!("join image URL {} onto {source_url}", img.url))?; + rate.wait_for(url.as_str()).await?; + let bytes = fetch_bytes_capped(http, url.as_str(), Some(source_url), allowlist, max_image_bytes) + .await?; + // Reject any non-image response: the only valid output of an image URL + // is an image. `infer` returns None on truncated bytes too, which also + // wants to be a failure not a silent `.bin` extension. + if !looks_like_image(&bytes) { + anyhow::bail!( + "image URL {url} returned non-image bytes \ + (first 16: {:?}); refusing to store as binary blob", + &bytes.get(..16.min(bytes.len())) ); - storage - .put(&key, bytes) - .await - .with_context(|| format!("put {key}"))?; - // (chapter_id, page_number) is unique — re-runs idempotent. + } + let ext = infer::get(&bytes) + .map(|k| k.extension()) + .expect("looks_like_image asserted infer succeeded"); + let key = format!( + "mangas/{manga_id}/chapters/{chapter_id}/pages/{:04}.{ext}", + img.page_number + ); + storage + .put(&key, &bytes) + .await + .with_context(|| format!("put {key}"))?; + Ok(StoredPage { + page_number: img.page_number, + storage_key: key, + content_type: format!("image/{ext}"), + }) +} + +/// Persist the page rows + chapter `page_count` in one short transaction. +/// `(chapter_id, page_number)` is unique so re-runs are idempotent. +pub(crate) async fn persist_pages( + db: &PgPool, + chapter_id: Uuid, + stored: &[StoredPage], +) -> anyhow::Result<()> { + let mut tx = db.begin().await.context("open chapter sync tx")?; + for page in stored { sqlx::query( "INSERT INTO pages (chapter_id, page_number, storage_key, content_type) VALUES ($1, $2, $3, $4) @@ -318,22 +380,36 @@ pub async fn sync_chapter_content( content_type = EXCLUDED.content_type", ) .bind(chapter_id) - .bind(page_number) - .bind(&key) - .bind(format!("image/{ext}")) + .bind(page.page_number) + .bind(&page.storage_key) + .bind(&page.content_type) .execute(&mut *tx) .await - .with_context(|| format!("insert page row {page_number}"))?; + .with_context(|| format!("insert page row {}", page.page_number))?; } sqlx::query("UPDATE chapters SET page_count = $1 WHERE id = $2") - .bind(fetched.len() as i32) + .bind(stored.len() as i32) .bind(chapter_id) .execute(&mut *tx) .await .context("update page_count")?; tx.commit().await.context("commit chapter sync")?; + Ok(()) +} - Ok(SyncOutcome::Fetched { pages: fetched.len() }) +/// Best-effort delete of partially-written page blobs after a chapter sync +/// fails, so a retry doesn't accumulate orphans. Errors are logged, not +/// raised — a leftover blob is harmless and a future reaper can sweep it. +pub(crate) async fn cleanup_orphans(storage: &dyn Storage, keys: &[String]) { + for key in keys { + if let Err(e) = storage.delete(key).await { + tracing::warn!( + %key, + error = ?e, + "failed to delete orphaned page blob after chapter sync failure" + ); + } + } } // Suppress unused-import warning for `session::registrable_domain` @@ -347,6 +423,90 @@ fn _keep_session_in_scope() { #[cfg(test)] mod tests { use super::*; + use crate::storage::LocalStorage; + + #[tokio::test] + async fn cleanup_orphans_deletes_written_keys() { + let dir = tempfile::tempdir().unwrap(); + let storage = LocalStorage::new(dir.path()); + let keys = vec![ + "mangas/m/chapters/c/pages/0001.jpg".to_string(), + "mangas/m/chapters/c/pages/0002.jpg".to_string(), + ]; + for k in &keys { + storage.put(k, b"\xff\xd8\xff\xe0 jpeg-ish").await.unwrap(); + assert!(storage.exists(k).await.unwrap()); + } + cleanup_orphans(&storage, &keys).await; + for k in &keys { + assert!(!storage.exists(k).await.unwrap(), "{k} should be deleted"); + } + } + + #[tokio::test] + async fn cleanup_orphans_tolerates_missing_keys() { + // A key that was never written (e.g. the put itself failed) must + // not make cleanup error — it's best-effort. + let dir = tempfile::tempdir().unwrap(); + let storage = LocalStorage::new(dir.path()); + cleanup_orphans(&storage, &["never/written.jpg".to_string()]).await; + } + + #[sqlx::test(migrations = "./migrations")] + async fn persist_pages_inserts_rows_and_sets_page_count(pool: PgPool) { + let manga_id = Uuid::new_v4(); + let chapter_id = Uuid::new_v4(); + sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, 'T')") + .bind(manga_id) + .execute(&pool) + .await + .unwrap(); + sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)") + .bind(chapter_id) + .bind(manga_id) + .execute(&pool) + .await + .unwrap(); + + let stored = vec![ + StoredPage { + page_number: 1, + storage_key: "k/0001.jpg".into(), + content_type: "image/jpeg".into(), + }, + StoredPage { + page_number: 2, + storage_key: "k/0002.jpg".into(), + content_type: "image/jpeg".into(), + }, + ]; + persist_pages(&pool, chapter_id, &stored).await.unwrap(); + + let page_count: i32 = + sqlx::query_scalar("SELECT page_count FROM chapters WHERE id = $1") + .bind(chapter_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(page_count, 2); + let rows: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM pages WHERE chapter_id = $1") + .bind(chapter_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(rows, 2); + + // Idempotent re-run (force refetch path): same rows, page_count stable. + persist_pages(&pool, chapter_id, &stored).await.unwrap(); + let rows2: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM pages WHERE chapter_id = $1") + .bind(chapter_id) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(rows2, 2, "re-run is idempotent via ON CONFLICT"); + } #[test] fn parse_chapter_pages_skips_loader_and_sorts_by_id() { diff --git a/backend/src/crawler/daemon.rs b/backend/src/crawler/daemon.rs index eb3e512..f9d3611 100644 --- a/backend/src/crawler/daemon.rs +++ b/backend/src/crawler/daemon.rs @@ -56,6 +56,15 @@ pub const CRON_LOCK_KEY: i64 = 0x4D414E47414C5244; const STATE_KEY_LAST_TICK: &str = "last_metadata_tick_at"; +/// Lease window handed to `jobs::lease`. Kept short, but continuously +/// extended by the per-job heartbeat (see [`WorkerContext::process_lease`]) +/// so a long-but-healthy job never lapses and gets stolen. +const LEASE_DURATION: Duration = Duration::from_secs(60); + +/// How often the heartbeat renews the lease while a job runs. A third of +/// the lease window leaves two missed-beat's slack before expiry. +const LEASE_HEARTBEAT: Duration = Duration::from_secs(20); + #[async_trait] pub trait MetadataPass: Send + Sync { async fn run(&self) -> anyhow::Result; @@ -77,6 +86,11 @@ pub struct DaemonConfig { pub tz: Tz, pub retention_days: u32, pub session_expired: Arc, + /// Hard upper bound on a single job's dispatch. A job that exceeds it + /// is acked failed (exponential backoff) rather than wedging a worker + /// forever. Must exceed [`LEASE_HEARTBEAT`] and the realistic + /// single-job runtime. + pub job_timeout: Duration, /// Tasks that should run alongside the cron + workers and be cancelled /// on shutdown. Used to hand the daemon ownership of the browser /// manager's idle reaper. @@ -123,6 +137,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem tz, retention_days, session_expired, + job_timeout, extra_tasks, } = cfg; @@ -146,6 +161,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem cancel: cancel.clone(), dispatcher: Arc::clone(&dispatcher), session_expired: Arc::clone(&session_expired), + job_timeout, id: worker_id, }; join.spawn(async move { ctx.run().await }); @@ -283,6 +299,7 @@ struct WorkerContext { cancel: CancellationToken, dispatcher: Arc, session_expired: Arc, + job_timeout: Duration, id: usize, } @@ -303,7 +320,7 @@ impl WorkerContext { &self.pool, Some(KIND_SYNC_CHAPTER_CONTENT), 1, - Duration::from_secs(60), + LEASE_DURATION, ) .await { @@ -341,9 +358,54 @@ impl WorkerContext { } } - let outcome = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone())) - .catch_unwind() - .await; + // Heartbeat: keep the lease fresh while the (potentially long) + // dispatch runs, so a slow-but-healthy job is never re-leased and + // never inflates `attempts` toward `max_attempts`. Stops itself + // once the job is no longer ours (renew returns false). + let heartbeat = { + let hb_pool = self.pool.clone(); + let hb_id = lease.id; + tokio::spawn(async move { + loop { + tokio::time::sleep(LEASE_HEARTBEAT).await; + match jobs::renew(&hb_pool, hb_id, LEASE_DURATION).await { + Ok(true) => {} + Ok(false) => break, + Err(e) => { + tracing::warn!(lease_id = %hb_id, ?e, "heartbeat renew failed"); + } + } + } + }) + }; + + // Outer timeout: a dispatch that exceeds `job_timeout` is acked + // failed (exponential backoff) rather than wedging the worker. + let dispatch = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone())) + .catch_unwind(); + let outcome = tokio::time::timeout(self.job_timeout, dispatch).await; + heartbeat.abort(); + + let outcome = match outcome { + Ok(o) => o, + Err(_elapsed) => { + tracing::warn!( + worker = self.id, + lease_id = %lease.id, + timeout_secs = self.job_timeout.as_secs(), + "worker: dispatch timed out — ack failed" + ); + let _ = jobs::ack_failed( + &self.pool, + lease.id, + "dispatch timed out", + lease.attempts, + lease.max_attempts, + ) + .await; + return; + } + }; match outcome { Ok(Ok(SyncOutcome::Fetched { .. } | SyncOutcome::Skipped)) => { let _ = jobs::ack_done(&self.pool, lease.id).await; diff --git a/backend/src/crawler/jobs.rs b/backend/src/crawler/jobs.rs index ea7eaa9..9f65d1b 100644 --- a/backend/src/crawler/jobs.rs +++ b/backend/src/crawler/jobs.rs @@ -66,16 +66,33 @@ pub struct Lease { pub max_attempts: i32, } -/// Exponential backoff for `ack_failed` retries. `attempts` is the -/// post-increment value reported by `lease()` (so the first failure has -/// `attempts == 1` and waits 60s, the second 120s, etc.). Capped at 1h to -/// avoid runaway long sleeps that would outlive the daemon process. -fn backoff_for(attempts: i32) -> Duration { +/// Deterministic exponential backoff base for `ack_failed` retries. +/// `attempts` is the post-increment value reported by `lease()` (so the +/// first failure has `attempts == 1` and waits 60s, the second 120s, +/// etc.). Capped at 1h to avoid runaway long sleeps that would outlive +/// the daemon process. Jitter is applied separately by [`apply_jitter`]. +fn backoff_base(attempts: i32) -> Duration { let shift = attempts.saturating_sub(1).clamp(0, 20) as u32; let secs = 60u64.saturating_mul(1u64 << shift); Duration::from_secs(secs.min(3600)) } +/// Apply ±20% jitter to a backoff duration. `jitter` is a fraction in +/// `[0.0, 1.0)` (e.g. `rand::random::()`), mapped to a multiplier in +/// `[0.8, 1.2)`. Pure so the bounds stay unit-testable. Spreading retries +/// avoids a thundering herd when a source outage fails many jobs at once. +fn apply_jitter(base: Duration, jitter: f64) -> Duration { + let frac = jitter.clamp(0.0, 1.0); + let mult = 0.8 + 0.4 * frac; // [0.8, 1.2) + Duration::from_secs((base.as_secs_f64() * mult).round() as u64) +} + +/// Jittered exponential backoff for `ack_failed`. Wraps [`backoff_base`] +/// with a random ±20% spread. +fn backoff_for(attempts: i32) -> Duration { + apply_jitter(backoff_base(attempts), rand::random::()) +} + /// Insert a new pending job. For `SyncChapterContent` payloads the /// partial unique index `crawler_jobs_chapter_content_dedup_idx` blocks /// a second `(pending|running)` insert per chapter_id, returning @@ -159,6 +176,35 @@ pub async fn lease( Ok(leases) } +/// Extend the lease on a still-owned `running` job. Returns `true` if the +/// row was updated (we still hold the lease), `false` if the job is no +/// longer `running` (re-leased after a missed heartbeat, or already +/// acked) — the caller's heartbeat loop should stop. The `state = +/// 'running'` guard mirrors [`ack_done`]'s rationale. +/// +/// This is the heartbeat primitive: a worker renews periodically while a +/// long-but-healthy job runs so `leased_until` never lapses, which would +/// otherwise let another worker steal the in-flight job and spuriously +/// inflate `attempts` toward `max_attempts`. +pub async fn renew( + pool: &PgPool, + lease_id: Uuid, + lease_duration: Duration, +) -> sqlx::Result { + let lease_ms: i64 = lease_duration.as_millis().min(i64::MAX as u128) as i64; + let res = sqlx::query( + "UPDATE crawler_jobs \ + SET leased_until = now() + ($2::bigint || ' milliseconds')::interval, \ + updated_at = now() \ + WHERE id = $1 AND state = 'running'", + ) + .bind(lease_id) + .bind(lease_ms) + .execute(pool) + .await?; + Ok(res.rows_affected() > 0) +} + /// Mark a leased job as successfully completed. The `state = 'running'` /// predicate guards against a late ack from a worker whose lease expired /// and was already re-leased by another worker: without it, the late ack @@ -278,19 +324,48 @@ mod tests { use super::*; #[test] - fn backoff_grows_exponentially_and_caps_at_one_hour() { + fn backoff_base_grows_exponentially_and_caps_at_one_hour() { // attempts == 1 → 60s, doubling each step. - assert_eq!(backoff_for(1), Duration::from_secs(60)); - assert_eq!(backoff_for(2), Duration::from_secs(120)); - assert_eq!(backoff_for(3), Duration::from_secs(240)); - assert_eq!(backoff_for(4), Duration::from_secs(480)); - assert_eq!(backoff_for(5), Duration::from_secs(960)); - assert_eq!(backoff_for(6), Duration::from_secs(1920)); + assert_eq!(backoff_base(1), Duration::from_secs(60)); + assert_eq!(backoff_base(2), Duration::from_secs(120)); + assert_eq!(backoff_base(3), Duration::from_secs(240)); + assert_eq!(backoff_base(4), Duration::from_secs(480)); + assert_eq!(backoff_base(5), Duration::from_secs(960)); + assert_eq!(backoff_base(6), Duration::from_secs(1920)); // 7th: 60 * 64 = 3840 → capped to 3600. - assert_eq!(backoff_for(7), Duration::from_secs(3600)); - assert_eq!(backoff_for(20), Duration::from_secs(3600)); + assert_eq!(backoff_base(7), Duration::from_secs(3600)); + assert_eq!(backoff_base(20), Duration::from_secs(3600)); // Garbage / zero / negatives stay sane. - assert_eq!(backoff_for(0), Duration::from_secs(60)); - assert_eq!(backoff_for(-5), Duration::from_secs(60)); + assert_eq!(backoff_base(0), Duration::from_secs(60)); + assert_eq!(backoff_base(-5), Duration::from_secs(60)); + } + + #[test] + fn apply_jitter_stays_within_plus_minus_twenty_percent() { + let base = Duration::from_secs(100); + // Lower bound (jitter = 0.0) → 0.8x. + assert_eq!(apply_jitter(base, 0.0), Duration::from_secs(80)); + // Midpoint (jitter = 0.5) → 1.0x. + assert_eq!(apply_jitter(base, 0.5), Duration::from_secs(100)); + // Upper end (jitter → 1.0) → ~1.2x. + assert_eq!(apply_jitter(base, 1.0), Duration::from_secs(120)); + // Out-of-range inputs are clamped, never panic. + assert_eq!(apply_jitter(base, -3.0), Duration::from_secs(80)); + assert_eq!(apply_jitter(base, 9.0), Duration::from_secs(120)); + } + + #[test] + fn backoff_for_random_jitter_stays_in_band() { + // The production wrapper draws its own randomness; assert the + // result for a mid-range attempt always lands within the jitter + // band of the base, across many draws. + let base = backoff_base(3).as_secs_f64(); // 240s + for _ in 0..1000 { + let v = backoff_for(3).as_secs_f64(); + assert!( + v >= base * 0.8 - 1.0 && v <= base * 1.2 + 1.0, + "jittered backoff {v} outside band of base {base}" + ); + } } } diff --git a/backend/src/crawler/pipeline.rs b/backend/src/crawler/pipeline.rs index 0dc5516..3e17c3c 100644 --- a/backend/src/crawler/pipeline.rs +++ b/backend/src/crawler/pipeline.rs @@ -65,6 +65,17 @@ pub(crate) fn should_mark_clean_exit( walked_to_completion || hit_stop_condition } +/// Circuit-breaker: abort the walk once `consecutive` `fetch_manga` +/// failures reach `threshold`. A `threshold` of 0 disables the breaker +/// (unbounded — the legacy behaviour). When it fires the caller must NOT +/// mark a clean exit, so the next tick does a recovery sweep over the +/// catalog tail the aborted pass never reached. +/// +/// Pure so the rule is unit-testable without the walker. +pub(crate) fn should_abort_pass(consecutive: u32, threshold: u32) -> bool { + threshold > 0 && consecutive >= threshold +} + /// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline /// for the target source. Pure metadata; chapter content is enqueued as /// separate `SyncChapterContent` jobs by the caller after this returns. @@ -103,6 +114,7 @@ pub async fn run_metadata_pass( skip_chapters: bool, allowlist: &DownloadAllowlist, max_image_bytes: usize, + max_consecutive_failures: u32, tor: Option<&crate::crawler::tor::TorController>, ) -> anyhow::Result { let lease = browser_manager @@ -165,6 +177,11 @@ pub async fn run_metadata_pass( let mut walked_to_completion = false; let mut hit_limit = false; let mut hit_stop_condition = false; + // Circuit-breaker state: consecutive fetch_manga failures. A sustained + // run abort (source outage) leaves the pass un-clean → recovery sweep + // next tick. + let mut consecutive_failures = 0u32; + let mut hit_failure_breaker = false; 'outer: loop { let batch = match walker.next_batch(&ctx).await? { @@ -204,7 +221,10 @@ pub async fn run_metadata_pass( "fetching metadata" ); let manga = match source.fetch_manga(&ctx, &r).await { - Ok(m) => m, + Ok(m) => { + consecutive_failures = 0; + m + } Err(e) => { tracing::warn!( key = %r.source_manga_key, @@ -213,6 +233,17 @@ pub async fn run_metadata_pass( "fetch_manga failed" ); stats.mangas_failed += 1; + consecutive_failures += 1; + if should_abort_pass(consecutive_failures, max_consecutive_failures) { + hit_failure_breaker = true; + tracing::error!( + consecutive_failures, + threshold = max_consecutive_failures, + "metadata pass: too many consecutive fetch_manga failures; \ + aborting (recovery sweep on next tick)" + ); + break 'outer; + } continue; } }; @@ -390,6 +421,7 @@ pub async fn run_metadata_pass( walked_to_completion, hit_limit, hit_stop_condition, + hit_failure_breaker, exited_cleanly, "metadata pass complete" ); @@ -756,6 +788,18 @@ mod tests { assert!(!should_stop(false, UpsertStatus::New, None)); } + #[test] + fn abort_pass_fires_at_threshold_and_respects_disable() { + // Disabled (0) never fires, no matter how many failures. + assert!(!should_abort_pass(0, 0)); + assert!(!should_abort_pass(100, 0)); + // Below threshold: keep going. + assert!(!should_abort_pass(9, 10)); + // At/above threshold: abort. + assert!(should_abort_pass(10, 10)); + assert!(should_abort_pass(11, 10)); + } + #[test] fn clean_exit_when_walked_to_completion() { // End-of-walk reached the catalog tail — the recovery flag may diff --git a/backend/tests/crawler_daemon.rs b/backend/tests/crawler_daemon.rs index 16b308d..69c41c5 100644 --- a/backend/tests/crawler_daemon.rs +++ b/backend/tests/crawler_daemon.rs @@ -40,6 +40,7 @@ fn make_cfg( tz: Tz::UTC, retention_days: 7, session_expired, + job_timeout: Duration::from_secs(60), extra_tasks: Vec::new(), } } @@ -88,6 +89,52 @@ impl ChapterDispatcher for PanickingDispatcher { } } +/// Never completes — used to verify the worker's outer dispatch timeout. +struct HangingDispatcher { + seen: AtomicUsize, +} +#[async_trait::async_trait] +impl ChapterDispatcher for HangingDispatcher { + async fn dispatch(&self, _payload: JobPayload) -> anyhow::Result { + self.seen.fetch_add(1, Ordering::AcqRel); + std::future::pending::<()>().await; + unreachable!("hanging dispatcher never resolves"); + } +} + +#[sqlx::test(migrations = "./migrations")] +async fn worker_times_out_a_hung_dispatch_and_acks_failed(pool: PgPool) { + enqueue_chapter_job(&pool).await; + let dispatcher = Arc::new(HangingDispatcher { + seen: AtomicUsize::new(0), + }); + let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let cancel = CancellationToken::new(); + let mut cfg = make_cfg(None, dispatcher.clone(), session_expired, 1); + cfg.job_timeout = Duration::from_millis(300); + let handle = daemon::spawn(pool.clone(), cancel.clone(), cfg); + + // The hung job should time out and return to pending with backoff + // (attempts=1 < max=5). Poll for the recorded error. + let mut timed_out = false; + for _ in 0..40 { + let n: i64 = sqlx::query_scalar( + "SELECT COUNT(*) FROM crawler_jobs WHERE last_error = 'dispatch timed out'", + ) + .fetch_one(&pool) + .await + .unwrap(); + if n == 1 { + timed_out = true; + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + handle.shutdown().await; + assert!(timed_out, "hung dispatch must be acked failed with a timeout error"); + assert!(dispatcher.seen.load(Ordering::Acquire) >= 1); +} + #[sqlx::test(migrations = "./migrations")] async fn workers_drain_jobs_through_dispatcher(pool: PgPool) { enqueue_chapter_job(&pool).await; diff --git a/backend/tests/crawler_jobs.rs b/backend/tests/crawler_jobs.rs index 7c1fd3b..14a33de 100644 --- a/backend/tests/crawler_jobs.rs +++ b/backend/tests/crawler_jobs.rs @@ -185,6 +185,68 @@ async fn lease_marks_running_and_bumps_attempts_and_sets_leased_until(pool: PgPo assert!(leased_until > chrono::Utc::now()); } +#[sqlx::test(migrations = "./migrations")] +async fn renew_extends_leased_until_while_running(pool: PgPool) { + let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + EnqueueResult::Skipped => unreachable!(), + }; + + // Lease with a short window, then collapse leased_until to the recent + // past so the renew is unambiguously an extension. + let leases = jobs::lease(&pool, None, 1, Duration::from_secs(5)) + .await + .unwrap(); + assert_eq!(leases.len(), 1); + sqlx::query("UPDATE crawler_jobs SET leased_until = now() - interval '1 second' WHERE id = $1") + .bind(id) + .execute(&pool) + .await + .unwrap(); + + let still_owned = jobs::renew(&pool, id, Duration::from_secs(120)) + .await + .unwrap(); + assert!(still_owned, "renew on a running job returns true"); + + let leased_until: chrono::DateTime = + sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1") + .bind(id) + .fetch_one(&pool) + .await + .unwrap(); + assert!( + leased_until > chrono::Utc::now() + chrono::Duration::seconds(60), + "leased_until pushed ~120s into the future" + ); + assert_eq!(job_state(&pool, id).await, "running"); +} + +#[sqlx::test(migrations = "./migrations")] +async fn renew_is_noop_once_job_no_longer_running(pool: PgPool) { + let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4())) + .await + .unwrap() + { + EnqueueResult::Inserted(id) => id, + EnqueueResult::Skipped => unreachable!(), + }; + let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60)) + .await + .unwrap(); + // Job completes — heartbeat should now see it's no longer ours. + jobs::ack_done(&pool, leases[0].id).await.unwrap(); + + let still_owned = jobs::renew(&pool, id, Duration::from_secs(120)) + .await + .unwrap(); + assert!(!still_owned, "renew on a non-running job returns false"); + assert_eq!(job_state(&pool, id).await, "done"); +} + #[sqlx::test(migrations = "./migrations")] async fn lease_with_kind_filter_only_matches_that_kind(pool: PgPool) { let manga_id = match jobs::enqueue(&pool, &sync_manga_payload("foo")) diff --git a/frontend/package.json b/frontend/package.json index f3942ff..00793c0 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "mangalord-frontend", - "version": "0.52.0", + "version": "0.53.0", "private": true, "type": "module", "scripts": {