fix(crawler): review findings — requeue dedup, restart result, session validation

- requeue_dead_jobs: when a chapter has multiple dead jobs, revive only the
  newest (DISTINCT ON the chapter key) so a single UPDATE can't flip two
  dead rows for one chapter to pending and violate the partial unique dedup
  index (was a 500 that requeued nothing). Non-chapter jobs fall back to row
  id. Regression test added. (critical)
- coordinated_restart: a caller that coalesces into an in-progress restart
  now reports that restart's real outcome instead of a blind success, so the
  session-update "valid" / restart "ok" signal can't be falsely positive.
- SessionController::update: reject control chars / ';' / ',' in PHPSESSID
  before it reaches the cookie string + CDP cookie. Test added.
- Add non-admin 403 test on a mutating crawler endpoint; fix stale
  stream-to-storage doc comment.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-06-03 21:07:10 +02:00
parent ec0a8f2b5d
commit 832042d2b7
6 changed files with 169 additions and 67 deletions

View File

@@ -13,7 +13,7 @@
//! until [`BrowserManager::shutdown`]. //! until [`BrowserManager::shutdown`].
use std::ops::Deref; use std::ops::Deref;
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@@ -103,6 +103,10 @@ pub struct BrowserManager {
/// Serialises coordinated restarts so concurrent requests collapse into /// Serialises coordinated restarts so concurrent requests collapse into
/// a single relaunch. /// a single relaunch.
restart_lock: Mutex<()>, restart_lock: Mutex<()>,
/// Result of the most recent relaunch, so a caller that coalesced into
/// an in-progress restart reports that restart's real outcome instead
/// of a blind success.
last_restart_ok: AtomicBool,
} }
struct Inner { struct Inner {
@@ -128,6 +132,7 @@ impl BrowserManager {
phase: AtomicU8::new(PHASE_HEALTHY), phase: AtomicU8::new(PHASE_HEALTHY),
resume: Notify::new(), resume: Notify::new(),
restart_lock: Mutex::new(()), restart_lock: Mutex::new(()),
last_restart_ok: AtomicBool::new(true),
}) })
} }
@@ -211,12 +216,17 @@ impl BrowserManager {
/// relaunch errors — so a failed restart never permanently wedges /// relaunch errors — so a failed restart never permanently wedges
/// acquisition (the next acquire retries the launch lazily). /// acquisition (the next acquire retries the launch lazily).
pub async fn coordinated_restart(&self, drain_deadline: Duration) -> anyhow::Result<()> { pub async fn coordinated_restart(&self, drain_deadline: Duration) -> anyhow::Result<()> {
// Dedup: if a restart is already running, wait for it and return. // Dedup: if a restart is already running, wait for it and report
// that restart's real outcome (not a blind success).
let _restart_guard = match self.restart_lock.try_lock() { let _restart_guard = match self.restart_lock.try_lock() {
Ok(g) => g, Ok(g) => g,
Err(_) => { Err(_) => {
let _ = self.restart_lock.lock().await; let _ = self.restart_lock.lock().await;
return Ok(()); return if self.last_restart_ok.load(Ordering::Acquire) {
Ok(())
} else {
Err(anyhow::anyhow!("a concurrent coordinated browser restart failed"))
};
} }
}; };
@@ -233,6 +243,7 @@ impl BrowserManager {
self.launch_into(&mut guard).await self.launch_into(&mut guard).await
}; };
self.last_restart_ok.store(relaunch.is_ok(), Ordering::Release);
self.set_phase(RestartPhase::Healthy); self.set_phase(RestartPhase::Healthy);
self.resume.notify_waiters(); self.resume.notify_waiters();
match &relaunch { match &relaunch {

View File

@@ -186,11 +186,12 @@ where
} }
} }
/// Fetch all images for one chapter and persist them atomically. On /// Fetch one chapter's images and persist them. Each image is streamed to
/// any error after the first storage put, the DB transaction rolls /// storage as it's fetched (peak memory ≈ one image, not the whole
/// back so the chapter stays at `page_count = 0` and is retried on the /// chapter); the page rows + `page_count` are then written in one short
/// next run. Bytes already written to storage become orphans; a future /// transaction. On any failure the chapter stays at `page_count = 0` (no
/// reaper sweeps them. /// partial rows) and the blobs already written are deleted best-effort by
/// [`cleanup_orphans`], so a retry starts clean.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn sync_chapter_content( pub async fn sync_chapter_content(
browser: &chromiumoxide::Browser, browser: &chromiumoxide::Browser,

View File

@@ -75,6 +75,13 @@ impl SessionController {
pub async fn update(&self, sid: &str) -> anyhow::Result<()> { pub async fn update(&self, sid: &str) -> anyhow::Result<()> {
let sid = sid.trim().to_string(); let sid = sid.trim().to_string();
anyhow::ensure!(!sid.is_empty(), "PHPSESSID must not be empty"); anyhow::ensure!(!sid.is_empty(), "PHPSESSID must not be empty");
// The value is spliced into a cookie string and a CDP CookieParam.
// Reject control chars and cookie delimiters so a pasted value
// can't smuggle extra attributes / break out of the cookie.
anyhow::ensure!(
sid.chars().all(|c| !c.is_control() && c != ';' && c != ','),
"PHPSESSID contains invalid characters"
);
if let (Some(domain), Some(start_url)) = (&self.cookie_domain, &self.start_url) { if let (Some(domain), Some(start_url)) = (&self.cookie_domain, &self.start_url) {
let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/"); let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/");
@@ -120,3 +127,41 @@ async fn persist(db: &PgPool, sid: &str) -> sqlx::Result<()> {
.await?; .await?;
Ok(()) Ok(())
} }
#[cfg(test)]
mod tests {
use super::*;
fn controller(db: PgPool) -> Arc<SessionController> {
SessionController::new(
None,
Arc::new(reqwest::cookie::Jar::default()),
Some("example.com".into()),
Some("https://example.com/".into()),
db,
Arc::new(AtomicBool::new(true)),
)
}
#[sqlx::test(migrations = "./migrations")]
async fn update_rejects_empty_and_control_chars(pool: PgPool) {
let c = controller(pool);
assert!(c.update(" ").await.is_err(), "empty rejected");
assert!(c.update("abc\r\ndef").await.is_err(), "CRLF rejected");
assert!(c.update("ab;Domain=evil").await.is_err(), "semicolon rejected");
assert!(c.update("x,y").await.is_err(), "comma rejected");
}
#[sqlx::test(migrations = "./migrations")]
async fn update_persists_and_clears_expired_then_round_trips(pool: PgPool) {
let c = controller(pool.clone());
c.update("good-sid-123").await.unwrap();
assert_eq!(c.current().await.as_deref(), Some("good-sid-123"));
assert!(!c.is_expired(), "update clears the expired flag");
// Persisted to crawler_state and readable by a fresh load.
assert_eq!(
SessionController::load_persisted(&pool).await.as_deref(),
Some("good-sid-123")
);
}
}

View File

@@ -714,68 +714,63 @@ pub enum RequeueScope {
/// Requeue dead jobs back to `pending` with a fresh attempt budget. This is /// Requeue dead jobs back to `pending` with a fresh attempt budget. This is
/// an explicit operator override, so it bypasses the dead-letter quarantine /// an explicit operator override, so it bypasses the dead-letter quarantine
/// the enqueue helpers honour (we act directly on the row). Skips any dead /// the enqueue helpers honour (we act directly on the row). Returns the
/// job whose chapter already has a `pending`/`running` job so the partial /// number of rows requeued.
/// dedup index is never violated. Returns the number of rows requeued. ///
/// Two invariants protect the partial unique dedup index
/// `crawler_jobs_chapter_content_dedup_idx` (one `pending|running`
/// sync_chapter_content job per chapter):
/// 1. A chapter that already has a live (`pending|running`) job is
/// skipped entirely (`NO_LIVE_DUP`).
/// 2. When a chapter has *multiple* dead jobs, only the newest is
/// revived (`DISTINCT ON` the chapter key) — without this, flipping
/// two dead rows for the same chapter to `pending` in one statement
/// would violate the index and abort the whole requeue. Non-chapter
/// jobs fall back to their row id so each stays distinct.
pub async fn requeue_dead_jobs(pool: &PgPool, scope: RequeueScope) -> sqlx::Result<u64> { pub async fn requeue_dead_jobs(pool: &PgPool, scope: RequeueScope) -> sqlx::Result<u64> {
// Guard against resurrecting a dead job when a live one already covers // Scope predicate spliced into the `pick` CTE. Only compile-time
// the same chapter (would otherwise hit the dedup unique index). // literals are interpolated; all values are bound below.
const NO_LIVE_DUP: &str = r#" let scope_pred: &str = match scope {
RequeueScope::All => "",
RequeueScope::Manga(_) => {
"AND (cj.payload->>'chapter_id')::uuid IN \
(SELECT id FROM chapters WHERE manga_id = $1)"
}
RequeueScope::Chapter(_) => "AND (cj.payload->>'chapter_id')::uuid = $1",
RequeueScope::Job(_) => "AND cj.id = $1",
};
let sql = format!(
r#"
WITH pick AS (
SELECT DISTINCT ON (COALESCE(cj.payload->>'chapter_id', cj.id::text)) cj.id
FROM crawler_jobs cj
WHERE cj.state = 'dead'
{scope_pred}
AND NOT EXISTS ( AND NOT EXISTS (
SELECT 1 FROM crawler_jobs live SELECT 1 FROM crawler_jobs live
WHERE live.payload->>'kind' = 'sync_chapter_content' WHERE live.payload->>'kind' = 'sync_chapter_content'
AND live.payload->>'chapter_id' = crawler_jobs.payload->>'chapter_id' AND live.payload->>'chapter_id' = cj.payload->>'chapter_id'
AND live.state IN ('pending','running') AND live.state IN ('pending','running')
) )
"#; ORDER BY COALESCE(cj.payload->>'chapter_id', cj.id::text), cj.updated_at DESC
const SET: &str = "SET state = 'pending', attempts = 0, leased_until = NULL, \ )
last_error = NULL, scheduled_at = now(), updated_at = now()"; UPDATE crawler_jobs
SET state = 'pending', attempts = 0, leased_until = NULL,
last_error = NULL, scheduled_at = now(), updated_at = now()
FROM pick
WHERE crawler_jobs.id = pick.id
"#
);
let affected = match scope { let mut q = sqlx::query(&sql);
RequeueScope::All => { match scope {
sqlx::query(&format!( RequeueScope::All => {}
"UPDATE crawler_jobs {SET} WHERE state = 'dead' {NO_LIVE_DUP}" RequeueScope::Manga(id) | RequeueScope::Chapter(id) | RequeueScope::Job(id) => {
)) q = q.bind(id);
.execute(pool)
.await?
.rows_affected()
} }
RequeueScope::Manga(manga_id) => {
sqlx::query(&format!(
"UPDATE crawler_jobs {SET} \
WHERE state = 'dead' \
AND (payload->>'chapter_id')::uuid IN \
(SELECT id FROM chapters WHERE manga_id = $1) \
{NO_LIVE_DUP}"
))
.bind(manga_id)
.execute(pool)
.await?
.rows_affected()
} }
RequeueScope::Chapter(chapter_id) => { Ok(q.execute(pool).await?.rows_affected())
sqlx::query(&format!(
"UPDATE crawler_jobs {SET} \
WHERE state = 'dead' \
AND (payload->>'chapter_id')::uuid = $1 \
{NO_LIVE_DUP}"
))
.bind(chapter_id)
.execute(pool)
.await?
.rows_affected()
}
RequeueScope::Job(job_id) => {
sqlx::query(&format!(
"UPDATE crawler_jobs {SET} WHERE state = 'dead' AND id = $1 {NO_LIVE_DUP}"
))
.bind(job_id)
.execute(pool)
.await?
.rows_affected()
}
};
Ok(affected)
} }
/// Count crawler jobs grouped by state — drives the dashboard queue /// Count crawler jobs grouped by state — drives the dashboard queue

View File

@@ -121,6 +121,24 @@ async fn control_endpoints_return_503_when_daemon_disabled(pool: PgPool) {
} }
} }
#[sqlx::test(migrations = "./migrations")]
async fn mutating_endpoints_reject_non_admin(pool: PgPool) {
let h = harness(pool);
// A logged-in non-admin must be forbidden from a mutating endpoint.
let (_u, cookie) = register_user(&h.app).await;
let resp = h
.app
.clone()
.oneshot(post_json_with_cookie(
"/api/v1/admin/crawler/dead-jobs/requeue",
json!({ "scope": "all" }),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
}
#[sqlx::test(migrations = "./migrations")] #[sqlx::test(migrations = "./migrations")]
async fn dead_jobs_list_and_requeue_over_http(pool: PgPool) { async fn dead_jobs_list_and_requeue_over_http(pool: PgPool) {
let job_id = seed_dead_job(&pool, "Bleach").await; let job_id = seed_dead_job(&pool, "Bleach").await;

View File

@@ -170,6 +170,38 @@ async fn requeue_skips_dead_when_live_job_exists_for_same_chapter(pool: PgPool)
assert_eq!(state_of(&pool, dead).await, "dead"); assert_eq!(state_of(&pool, dead).await, "dead");
} }
#[sqlx::test(migrations = "./migrations")]
async fn requeue_with_two_dead_jobs_for_one_chapter_revives_one_not_500(pool: PgPool) {
// Regression: two dead jobs for the SAME chapter must not both flip to
// pending in one statement — that would violate the partial unique
// dedup index and abort the whole requeue.
let (manga_id, c1) = seed_chapter(&pool, "A", 1).await;
let older = insert_job(&pool, c1, "dead", 5).await;
let newer = insert_job(&pool, c1, "dead", 5).await;
// Make `newer` unambiguously newer.
sqlx::query("UPDATE crawler_jobs SET updated_at = now() - interval '1 hour' WHERE id = $1")
.bind(older)
.execute(&pool)
.await
.unwrap();
for scope in [RequeueScope::All, RequeueScope::Manga(manga_id), RequeueScope::Chapter(c1)] {
// Reset to two-dead before each scope variant.
sqlx::query("UPDATE crawler_jobs SET state = 'dead' WHERE id = ANY($1)")
.bind(vec![older, newer])
.execute(&pool)
.await
.unwrap();
let n = crawler::requeue_dead_jobs(&pool, scope)
.await
.expect("requeue must not error on duplicate dead jobs");
assert_eq!(n, 1, "exactly one dead job per chapter is revived");
// The newest one is the survivor; the other stays dead.
assert_eq!(state_of(&pool, newer).await, "pending");
assert_eq!(state_of(&pool, older).await, "dead");
}
}
#[sqlx::test(migrations = "./migrations")] #[sqlx::test(migrations = "./migrations")]
async fn job_state_counts_groups_by_state(pool: PgPool) { async fn job_state_counts_groups_by_state(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "A", 1).await; let (_m, c1) = seed_chapter(&pool, "A", 1).await;