diff --git a/backend/src/crawler/browser_manager.rs b/backend/src/crawler/browser_manager.rs index 8bc7cd4..47c5ac0 100644 --- a/backend/src/crawler/browser_manager.rs +++ b/backend/src/crawler/browser_manager.rs @@ -13,7 +13,7 @@ //! until [`BrowserManager::shutdown`]. use std::ops::Deref; -use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -103,6 +103,10 @@ pub struct BrowserManager { /// Serialises coordinated restarts so concurrent requests collapse into /// a single relaunch. restart_lock: Mutex<()>, + /// Result of the most recent relaunch, so a caller that coalesced into + /// an in-progress restart reports that restart's real outcome instead + /// of a blind success. + last_restart_ok: AtomicBool, } struct Inner { @@ -128,6 +132,7 @@ impl BrowserManager { phase: AtomicU8::new(PHASE_HEALTHY), resume: Notify::new(), restart_lock: Mutex::new(()), + last_restart_ok: AtomicBool::new(true), }) } @@ -211,12 +216,17 @@ impl BrowserManager { /// relaunch errors — so a failed restart never permanently wedges /// acquisition (the next acquire retries the launch lazily). pub async fn coordinated_restart(&self, drain_deadline: Duration) -> anyhow::Result<()> { - // Dedup: if a restart is already running, wait for it and return. + // Dedup: if a restart is already running, wait for it and report + // that restart's real outcome (not a blind success). 
let _restart_guard = match self.restart_lock.try_lock() { Ok(g) => g, Err(_) => { let _ = self.restart_lock.lock().await; - return Ok(()); + return if self.last_restart_ok.load(Ordering::Acquire) { + Ok(()) + } else { + Err(anyhow::anyhow!("a concurrent coordinated browser restart failed")) + }; } }; @@ -233,6 +243,7 @@ impl BrowserManager { self.launch_into(&mut guard).await }; + self.last_restart_ok.store(relaunch.is_ok(), Ordering::Release); self.set_phase(RestartPhase::Healthy); self.resume.notify_waiters(); match &relaunch { diff --git a/backend/src/crawler/content.rs b/backend/src/crawler/content.rs index 42e42d1..4039469 100644 --- a/backend/src/crawler/content.rs +++ b/backend/src/crawler/content.rs @@ -186,11 +186,12 @@ where } } -/// Fetch all images for one chapter and persist them atomically. On -/// any error after the first storage put, the DB transaction rolls -/// back so the chapter stays at `page_count = 0` and is retried on the -/// next run. Bytes already written to storage become orphans; a future -/// reaper sweeps them. +/// Fetch one chapter's images and persist them. Each image is streamed to +/// storage as it's fetched (peak memory ≈ one image, not the whole +/// chapter); the page rows + `page_count` are then written in one short +/// transaction. On any failure the chapter stays at `page_count = 0` (no +/// partial rows) and the blobs already written are deleted best-effort by +/// [`cleanup_orphans`], so a retry starts clean. 
#[allow(clippy::too_many_arguments)] pub async fn sync_chapter_content( browser: &chromiumoxide::Browser, diff --git a/backend/src/crawler/session_control.rs b/backend/src/crawler/session_control.rs index 91e060a..6558316 100644 --- a/backend/src/crawler/session_control.rs +++ b/backend/src/crawler/session_control.rs @@ -75,6 +75,13 @@ impl SessionController { pub async fn update(&self, sid: &str) -> anyhow::Result<()> { let sid = sid.trim().to_string(); anyhow::ensure!(!sid.is_empty(), "PHPSESSID must not be empty"); + // The value is spliced into a cookie string and a CDP CookieParam. + // Reject control chars and cookie delimiters so a pasted value + // can't smuggle extra attributes / break out of the cookie. + anyhow::ensure!( + sid.chars().all(|c| !c.is_control() && c != ';' && c != ','), + "PHPSESSID contains invalid characters" + ); if let (Some(domain), Some(start_url)) = (&self.cookie_domain, &self.start_url) { let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/"); @@ -120,3 +127,41 @@ async fn persist(db: &PgPool, sid: &str) -> sqlx::Result<()> { .await?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + + fn controller(db: PgPool) -> Arc<SessionController> { + SessionController::new( + None, + Arc::new(reqwest::cookie::Jar::default()), + Some("example.com".into()), + Some("https://example.com/".into()), + db, + Arc::new(AtomicBool::new(true)), + ) + } + + #[sqlx::test(migrations = "./migrations")] + async fn update_rejects_empty_and_control_chars(pool: PgPool) { + let c = controller(pool); + assert!(c.update(" ").await.is_err(), "empty rejected"); + assert!(c.update("abc\r\ndef").await.is_err(), "CRLF rejected"); + assert!(c.update("ab;Domain=evil").await.is_err(), "semicolon rejected"); + assert!(c.update("x,y").await.is_err(), "comma rejected"); + } + + #[sqlx::test(migrations = "./migrations")] + async fn update_persists_and_clears_expired_then_round_trips(pool: PgPool) { + let c = controller(pool.clone());
+ c.update("good-sid-123").await.unwrap(); + assert_eq!(c.current().await.as_deref(), Some("good-sid-123")); + assert!(!c.is_expired(), "update clears the expired flag"); + // Persisted to crawler_state and readable by a fresh load. + assert_eq!( + SessionController::load_persisted(&pool).await.as_deref(), + Some("good-sid-123") + ); + } +} diff --git a/backend/src/repo/crawler.rs b/backend/src/repo/crawler.rs index 86f032c..84d3a43 100644 --- a/backend/src/repo/crawler.rs +++ b/backend/src/repo/crawler.rs @@ -714,68 +714,63 @@ pub enum RequeueScope { /// Requeue dead jobs back to `pending` with a fresh attempt budget. This is /// an explicit operator override, so it bypasses the dead-letter quarantine -/// the enqueue helpers honour (we act directly on the row). Skips any dead -/// job whose chapter already has a `pending`/`running` job so the partial -/// dedup index is never violated. Returns the number of rows requeued. +/// the enqueue helpers honour (we act directly on the row). Returns the +/// number of rows requeued. +/// +/// Two invariants protect the partial unique dedup index +/// `crawler_jobs_chapter_content_dedup_idx` (one `pending|running` +/// sync_chapter_content job per chapter): +/// 1. A chapter that already has a live (`pending|running`) job is +/// skipped entirely (the `NOT EXISTS` guard in the `pick` CTE). +/// 2. When a chapter has *multiple* dead jobs, only the newest is +/// revived (`DISTINCT ON` the chapter key) — without this, flipping +/// two dead rows for the same chapter to `pending` in one statement +/// would violate the index and abort the whole requeue. Non-chapter +/// jobs fall back to their row id so each stays distinct. pub async fn requeue_dead_jobs(pool: &PgPool, scope: RequeueScope) -> sqlx::Result<u64> { - // Guard against resurrecting a dead job when a live one already covers - // the same chapter (would otherwise hit the dedup unique index).
- const NO_LIVE_DUP: &str = r#" - AND NOT EXISTS ( - SELECT 1 FROM crawler_jobs live - WHERE live.payload->>'kind' = 'sync_chapter_content' - AND live.payload->>'chapter_id' = crawler_jobs.payload->>'chapter_id' - AND live.state IN ('pending','running') - ) - "#; - const SET: &str = "SET state = 'pending', attempts = 0, leased_until = NULL, \ - last_error = NULL, scheduled_at = now(), updated_at = now()"; - - let affected = match scope { - RequeueScope::All => { - sqlx::query(&format!( - "UPDATE crawler_jobs {SET} WHERE state = 'dead' {NO_LIVE_DUP}" - )) - .execute(pool) - .await? - .rows_affected() - } - RequeueScope::Manga(manga_id) => { - sqlx::query(&format!( - "UPDATE crawler_jobs {SET} \ - WHERE state = 'dead' \ - AND (payload->>'chapter_id')::uuid IN \ - (SELECT id FROM chapters WHERE manga_id = $1) \ - {NO_LIVE_DUP}" - )) - .bind(manga_id) - .execute(pool) - .await? - .rows_affected() - } - RequeueScope::Chapter(chapter_id) => { - sqlx::query(&format!( - "UPDATE crawler_jobs {SET} \ - WHERE state = 'dead' \ - AND (payload->>'chapter_id')::uuid = $1 \ - {NO_LIVE_DUP}" - )) - .bind(chapter_id) - .execute(pool) - .await? - .rows_affected() - } - RequeueScope::Job(job_id) => { - sqlx::query(&format!( - "UPDATE crawler_jobs {SET} WHERE state = 'dead' AND id = $1 {NO_LIVE_DUP}" - )) - .bind(job_id) - .execute(pool) - .await? - .rows_affected() + // Scope predicate spliced into the `pick` CTE. Only compile-time + // literals are interpolated; all values are bound below. 
+ let scope_pred: &str = match scope { + RequeueScope::All => "", + RequeueScope::Manga(_) => { + "AND (cj.payload->>'chapter_id')::uuid IN \ + (SELECT id FROM chapters WHERE manga_id = $1)" } + RequeueScope::Chapter(_) => "AND (cj.payload->>'chapter_id')::uuid = $1", + RequeueScope::Job(_) => "AND cj.id = $1", }; - Ok(affected) + + let sql = format!( + r#" + WITH pick AS ( + SELECT DISTINCT ON (COALESCE(cj.payload->>'chapter_id', cj.id::text)) cj.id + FROM crawler_jobs cj + WHERE cj.state = 'dead' + {scope_pred} + AND NOT EXISTS ( + SELECT 1 FROM crawler_jobs live + WHERE live.payload->>'kind' = 'sync_chapter_content' + AND live.payload->>'chapter_id' = cj.payload->>'chapter_id' + AND live.state IN ('pending','running') + ) + ORDER BY COALESCE(cj.payload->>'chapter_id', cj.id::text), cj.updated_at DESC + ) + UPDATE crawler_jobs + SET state = 'pending', attempts = 0, leased_until = NULL, + last_error = NULL, scheduled_at = now(), updated_at = now() + FROM pick + WHERE crawler_jobs.id = pick.id + "# + ); + + let mut q = sqlx::query(&sql); + match scope { + RequeueScope::All => {} + RequeueScope::Manga(id) | RequeueScope::Chapter(id) | RequeueScope::Job(id) => { + q = q.bind(id); + } + } + Ok(q.execute(pool).await?.rows_affected()) } /// Count crawler jobs grouped by state — drives the dashboard queue diff --git a/backend/tests/api_admin_crawler.rs b/backend/tests/api_admin_crawler.rs index e7dcd41..bbedf77 100644 --- a/backend/tests/api_admin_crawler.rs +++ b/backend/tests/api_admin_crawler.rs @@ -121,6 +121,24 @@ async fn control_endpoints_return_503_when_daemon_disabled(pool: PgPool) { } } +#[sqlx::test(migrations = "./migrations")] +async fn mutating_endpoints_reject_non_admin(pool: PgPool) { + let h = harness(pool); + // A logged-in non-admin must be forbidden from a mutating endpoint. 
+ let (_u, cookie) = register_user(&h.app).await; + let resp = h + .app + .clone() + .oneshot(post_json_with_cookie( + "/api/v1/admin/crawler/dead-jobs/requeue", + json!({ "scope": "all" }), + &cookie, + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::FORBIDDEN); +} + #[sqlx::test(migrations = "./migrations")] async fn dead_jobs_list_and_requeue_over_http(pool: PgPool) { let job_id = seed_dead_job(&pool, "Bleach").await; diff --git a/backend/tests/crawler_dead_jobs.rs b/backend/tests/crawler_dead_jobs.rs index 3642728..8e32095 100644 --- a/backend/tests/crawler_dead_jobs.rs +++ b/backend/tests/crawler_dead_jobs.rs @@ -170,6 +170,38 @@ async fn requeue_skips_dead_when_live_job_exists_for_same_chapter(pool: PgPool) assert_eq!(state_of(&pool, dead).await, "dead"); } +#[sqlx::test(migrations = "./migrations")] +async fn requeue_with_two_dead_jobs_for_one_chapter_revives_one_not_500(pool: PgPool) { + // Regression: two dead jobs for the SAME chapter must not both flip to + // pending in one statement — that would violate the partial unique + // dedup index and abort the whole requeue. + let (manga_id, c1) = seed_chapter(&pool, "A", 1).await; + let older = insert_job(&pool, c1, "dead", 5).await; + let newer = insert_job(&pool, c1, "dead", 5).await; + // Make `newer` unambiguously newer. + sqlx::query("UPDATE crawler_jobs SET updated_at = now() - interval '1 hour' WHERE id = $1") + .bind(older) + .execute(&pool) + .await + .unwrap(); + + for scope in [RequeueScope::All, RequeueScope::Manga(manga_id), RequeueScope::Chapter(c1)] { + // Reset to two-dead before each scope variant. 
+ sqlx::query("UPDATE crawler_jobs SET state = 'dead' WHERE id = ANY($1)") + .bind(vec![older, newer]) + .execute(&pool) + .await + .unwrap(); + let n = crawler::requeue_dead_jobs(&pool, scope) + .await + .expect("requeue must not error on duplicate dead jobs"); + assert_eq!(n, 1, "exactly one dead job per chapter is revived"); + // The newest one is the survivor; the other stays dead. + assert_eq!(state_of(&pool, newer).await, "pending"); + assert_eq!(state_of(&pool, older).await, "dead"); + } +} + #[sqlx::test(migrations = "./migrations")] async fn job_state_counts_groups_by_state(pool: PgPool) { let (_m, c1) = seed_chapter(&pool, "A", 1).await;