feat(crawler): live status surface, runtime session, dead-job repo, auto-restart

Adds the in-process observability + control infrastructure the admin
dashboard consumes:

- status.rs: CrawlerStatus/Phase/WorkerState + StatusHandle. The daemon
  publishes its current phase (idle/walking/fetching-metadata/cover-backfill),
  per-worker activity, and last-pass summary. Wired through the cron,
  run_metadata_pass, and the worker loop.
- session_control.rs: SessionController refreshes PHPSESSID at runtime —
  rewrites the shared reqwest cookie jar, updates the value on_launch reads,
  persists to crawler_state (survives restart), and clears the expired flag.
  on_launch now reads the live session instead of a startup snapshot.
- RealChapterDispatcher auto-triggers a coordinated browser restart after
  CRAWLER_BROWSER_RESTART_THRESHOLD consecutive transient failures.
- repo::crawler: list_dead_jobs, requeue_dead_jobs (all/manga/job, bypassing
  the quarantine, skipping live duplicates), job_state_counts.
- AppState gains CrawlerControl bundling browser_manager + session + status
  + metadata_pass for the admin endpoints.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-06-03 20:38:54 +02:00
parent 3f91bea768
commit cd0a1e13a9
12 changed files with 787 additions and 19 deletions

View File

@@ -1,5 +1,5 @@
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use anyhow::Context;
use async_trait::async_trait;
@@ -46,6 +46,24 @@ pub struct AppState {
/// same wiring that builds the daemon's chapter dispatcher, so a
/// force resync uses the daemon's BrowserManager + rate limiters.
pub resync: Option<Arc<dyn ResyncService>>,
/// Crawler observability + control handle (live status, coordinated
/// browser restart, runtime session, manual run). `None` when the
/// daemon is disabled; admin handlers gate on `.is_some()` → 503.
pub crawler: Option<Arc<CrawlerControl>>,
}
/// Shared handle the admin crawler endpoints use to observe and control
/// the running daemon. Bundled so the handlers take one optional field on
/// `AppState` rather than many.
pub struct CrawlerControl {
pub browser_manager: Arc<BrowserManager>,
pub session: Arc<crate::crawler::session_control::SessionController>,
pub status: crate::crawler::status::StatusHandle,
/// Used by the "run metadata pass now" endpoint; `None` when no
/// `CRAWLER_START_URL` is configured (cron disabled).
pub metadata_pass: Option<Arc<dyn MetadataPass>>,
/// Drain budget for a manually-triggered coordinated browser restart.
pub drain_deadline: std::time::Duration,
}
/// Bundle returned by [`build`]. The router is what `axum::serve` consumes;
@@ -80,12 +98,12 @@ pub async fn build(config: Config) -> anyhow::Result<AppHandle> {
let storage: Arc<dyn Storage> = Arc::new(LocalStorage::new(config.storage_dir.clone()));
let (daemon, resync) = if config.crawler.daemon_enabled {
let (daemon, resync, crawler) = if config.crawler.daemon_enabled {
let spawned = spawn_crawler_daemon(db.clone(), Arc::clone(&storage), &config.crawler).await?;
(Some(spawned.handle), Some(spawned.resync))
(Some(spawned.handle), Some(spawned.resync), Some(spawned.crawler))
} else {
tracing::info!("crawler daemon disabled (CRAWLER_DAEMON=false)");
(None, None)
(None, None, None)
};
let auth_limiter = Arc::new(AuthRateLimiter::new(config.auth.rate_limit));
@@ -96,6 +114,7 @@ pub async fn build(config: Config) -> anyhow::Result<AppHandle> {
upload: config.upload.clone(),
auth_limiter,
resync,
crawler,
};
let router = router(state).layer(cors_layer(&config.cors_allowed_origins));
Ok(AppHandle { router, daemon })
@@ -108,6 +127,7 @@ pub async fn build(config: Config) -> anyhow::Result<AppHandle> {
struct SpawnedDaemon {
handle: daemon::DaemonHandle,
resync: Arc<dyn ResyncService>,
crawler: Arc<CrawlerControl>,
}
async fn spawn_crawler_daemon(
@@ -115,11 +135,17 @@ async fn spawn_crawler_daemon(
storage: Arc<dyn Storage>,
cfg: &CrawlerConfig,
) -> anyhow::Result<SpawnedDaemon> {
// Reqwest client with cookie jar pre-seeded so CDN image fetches
// include PHPSESSID. Same shape as bin/crawler.rs main().
// Reqwest client with a shared cookie jar so CDN image fetches include
// PHPSESSID. The same `Arc<Jar>` is held by the SessionController, so a
// runtime session refresh rewrites it in place. Initial value: a
// persisted runtime session (survives restart) takes precedence over
// CRAWLER_PHPSESSID env.
let cookie_jar = Arc::new(reqwest::cookie::Jar::default());
let initial_sid = crate::crawler::session_control::SessionController::load_persisted(&db)
.await
.or_else(|| cfg.phpsessid.clone());
if let (Some(sid), Some(domain), Some(start_url)) =
(&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url)
(&initial_sid, &cfg.cookie_domain, &cfg.start_url)
{
let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/");
let seed_url = reqwest::Url::parse(start_url)
@@ -129,7 +155,7 @@ async fn spawn_crawler_daemon(
let mut http_builder = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.no_proxy()
.cookie_provider(cookie_jar);
.cookie_provider(Arc::clone(&cookie_jar));
if let Some(ua) = &cfg.user_agent {
http_builder = http_builder.user_agent(ua);
}
@@ -157,6 +183,23 @@ async fn spawn_crawler_daemon(
}
let tor_recircuit_max = cfg.tor_recircuit_max_attempts;
// Session controller + sticky session-expired flag. Created before the
// browser so the on_launch hook can read the *current* session value
// (rather than a value captured at startup), and so a runtime refresh
// updates the cookie everywhere.
let session_expired = Arc::new(AtomicBool::new(false));
let session_controller = crate::crawler::session_control::SessionController::new(
initial_sid,
Arc::clone(&cookie_jar),
cfg.cookie_domain.clone(),
cfg.start_url.clone(),
db.clone(),
Arc::clone(&session_expired),
);
// Live status surface, sized to the worker count.
let status = crate::crawler::status::StatusHandle::new(cfg.chapter_workers);
// Browser manager. on_launch re-injects PHPSESSID on every fresh
// chromium spawn so an idle teardown followed by re-launch stays
// authenticated without operator action.
@@ -165,18 +208,25 @@ async fn spawn_crawler_daemon(
let chromium_proxy = crate::crawler::url_utils::chromium_proxy_arg(proxy);
launch_opts.extra_args.push(format!("--proxy-server={chromium_proxy}"));
}
let on_launch = match (&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url) {
(Some(sid), Some(domain), Some(start_url)) => {
let sid = sid.clone();
let on_launch = match (&cfg.cookie_domain, &cfg.start_url) {
(Some(domain), Some(start_url)) => {
let domain = domain.clone();
let start_url = start_url.clone();
let tor_for_launch = tor.as_ref().map(Arc::clone);
let sc = Arc::clone(&session_controller);
let on_launch: browser_manager::OnLaunch = Arc::new(move |browser| {
let sid = sid.clone();
let domain = domain.clone();
let start_url = start_url.clone();
let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone);
let sc = Arc::clone(&sc);
Box::pin(async move {
// Read the *current* session each launch so a runtime
// refresh is picked up on the next (re)launch. No session
// configured → run unauthenticated (metadata needs no auth).
let Some(sid) = sc.current().await else {
tracing::info!("on_launch: no session set — skipping inject + probe");
return Ok(());
};
session::inject_phpsessid(&browser, &sid, &domain)
.await
.context("on_launch: inject_phpsessid")?;
@@ -197,8 +247,6 @@ async fn spawn_crawler_daemon(
};
let browser_manager = BrowserManager::new(launch_opts, cfg.idle_timeout, on_launch);
let session_expired = Arc::new(AtomicBool::new(false));
let metadata_pass: Option<Arc<dyn MetadataPass>> = cfg.start_url.as_ref().map(|url| {
let m: Arc<dyn MetadataPass> = Arc::new(RealMetadataPass {
browser_manager: Arc::clone(&browser_manager),
@@ -211,6 +259,7 @@ async fn spawn_crawler_daemon(
download_allowlist: cfg.download_allowlist.clone(),
max_image_bytes: cfg.max_image_bytes,
metadata_max_consecutive_failures: cfg.metadata_max_consecutive_failures,
status: status.clone(),
tor: tor.as_ref().map(Arc::clone),
});
m
@@ -224,6 +273,9 @@ async fn spawn_crawler_daemon(
rate: Arc::clone(&rate),
download_allowlist: cfg.download_allowlist.clone(),
max_image_bytes: cfg.max_image_bytes,
transient_failures: Arc::new(AtomicU32::new(0)),
restart_threshold: cfg.browser_restart_threshold,
drain_deadline: cfg.job_timeout,
tor: tor.as_ref().map(Arc::clone),
});
@@ -261,21 +313,31 @@ async fn spawn_crawler_daemon(
db,
cancel,
DaemonConfig {
metadata_pass,
metadata_pass: metadata_pass.clone(),
dispatcher,
chapter_workers: cfg.chapter_workers,
daily_at: cfg.daily_at,
tz: cfg.tz,
retention_days: cfg.retention_days,
session_expired,
status: status.clone(),
job_timeout: cfg.job_timeout,
extra_tasks: vec![reaper_task, shutdown_task],
},
);
let crawler = Arc::new(CrawlerControl {
browser_manager: Arc::clone(&browser_manager),
session: session_controller,
status,
metadata_pass,
drain_deadline: cfg.job_timeout,
});
Ok(SpawnedDaemon {
handle: daemon_handle,
resync,
crawler,
})
}
@@ -295,6 +357,7 @@ struct RealMetadataPass {
download_allowlist: DownloadAllowlist,
max_image_bytes: usize,
metadata_max_consecutive_failures: u32,
status: crate::crawler::status::StatusHandle,
tor: Option<Arc<crate::crawler::tor::TorController>>,
}
@@ -313,6 +376,7 @@ impl MetadataPass for RealMetadataPass {
&self.download_allowlist,
self.max_image_bytes,
self.metadata_max_consecutive_failures,
Some(&self.status),
self.tor.as_deref(),
)
.await;
@@ -326,6 +390,9 @@ impl MetadataPass for RealMetadataPass {
// late, and a transient browser failure shouldn't cancel the
// residual cover backlog. The backfill has its own per-call cap
// so a runaway error stream can't monopolise the tick.
self.status
.set_phase(crate::crawler::status::Phase::CoverBackfill)
.await;
match pipeline::backfill_missing_covers(
&self.browser_manager,
&self.db,
@@ -363,6 +430,13 @@ struct RealChapterDispatcher {
rate: Arc<HostRateLimiters>,
download_allowlist: DownloadAllowlist,
max_image_bytes: usize,
/// Consecutive transient chapter failures; resets on any success.
/// Drives the automatic coordinated browser restart.
transient_failures: Arc<std::sync::atomic::AtomicU32>,
/// Consecutive-failure count that triggers an auto restart.
restart_threshold: u32,
/// How long a coordinated restart waits for in-flight leases to drain.
drain_deadline: std::time::Duration,
tor: Option<Arc<crate::crawler::tor::TorController>>,
}
@@ -400,10 +474,32 @@ impl ChapterDispatcher for RealChapterDispatcher {
.await;
drop(lease);
match result {
Ok(outcome) => Ok(outcome),
Ok(outcome) => {
// Any successful dispatch (including a clean Skipped)
// means the browser is healthy — reset the streak.
self.transient_failures.store(0, Ordering::Release);
Ok(outcome)
}
Err(e) => {
let streak = self.transient_failures.fetch_add(1, Ordering::AcqRel) + 1;
if crate::crawler::nav::anyhow_looks_browser_dead(&e) {
// Hard browser-dead: lazy invalidate (next acquire
// relaunches). Reset the streak — we're recovering.
self.browser_manager.invalidate().await;
self.transient_failures.store(0, Ordering::Release);
} else if self.restart_threshold > 0 && streak >= self.restart_threshold {
// Persistent transients that TOR recircuit couldn't
// fix — proactively restart Chromium.
tracing::warn!(
streak,
threshold = self.restart_threshold,
"auto browser restart: consecutive transient chapter failures"
);
let _ = self
.browser_manager
.coordinated_restart(self.drain_deadline)
.await;
self.transient_failures.store(0, Ordering::Release);
}
Err(e)
}

View File

@@ -306,6 +306,8 @@ async fn run(
// Circuit-breaker disabled for the operator-driven CLI: a manual
// sweep should push through transient failures, not self-abort.
0,
// No live status surface for the one-shot CLI.
None,
tor.as_deref(),
)
.await?;

View File

@@ -48,6 +48,7 @@ use tokio_util::sync::CancellationToken;
use crate::crawler::content::SyncOutcome;
use crate::crawler::jobs::{self, JobPayload, Lease, KIND_SYNC_CHAPTER_CONTENT};
use crate::crawler::pipeline;
use crate::crawler::status::{Phase, StatusHandle, WorkerState};
/// Fixed `pg_try_advisory_lock` key. ASCII "MANGALRD" interpreted as a
/// big-endian i64. Hardcoded so every replica agrees on the lock identity
@@ -86,6 +87,8 @@ pub struct DaemonConfig {
pub tz: Tz,
pub retention_days: u32,
pub session_expired: Arc<AtomicBool>,
/// Live status surface updated by the cron + workers.
pub status: StatusHandle,
/// Hard upper bound on a single job's dispatch. A job that exceeds it
/// is acked failed (exponential backoff) rather than wedging a worker
/// forever. Must exceed [`LEASE_HEARTBEAT`] and the realistic
@@ -137,6 +140,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem
tz,
retention_days,
session_expired,
status,
job_timeout,
extra_tasks,
} = cfg;
@@ -149,6 +153,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem
tz,
retention_days,
metadata,
status: status.clone(),
};
join.spawn(async move { ctx.run().await });
} else {
@@ -161,6 +166,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem
cancel: cancel.clone(),
dispatcher: Arc::clone(&dispatcher),
session_expired: Arc::clone(&session_expired),
status: status.clone(),
job_timeout,
id: worker_id,
};
@@ -185,6 +191,7 @@ struct CronContext {
tz: Tz,
retention_days: u32,
metadata: Arc<dyn MetadataPass>,
status: StatusHandle,
}
impl CronContext {
@@ -212,6 +219,11 @@ impl CronContext {
// (NTP step, suspend/resume) don't strand us on a stale instant.
let next = next_fire(Utc::now(), self.daily_at, self.tz);
let wait = (next - Utc::now()).to_std().unwrap_or(Duration::ZERO);
self.status
.set_phase(Phase::Idle {
next_fire: Some(next),
})
.await;
tracing::info!(
next_fire_utc = %next.to_rfc3339(),
wait_seconds = wait.as_secs(),
@@ -259,9 +271,13 @@ impl CronContext {
let metadata = &self.metadata;
let pool = &self.pool;
let retention_days = self.retention_days;
let status = &self.status;
let body = async move {
match metadata.run().await {
Ok(stats) => tracing::info!(?stats, "cron: metadata pass done"),
Ok(stats) => {
status.record_pass(&stats, Utc::now()).await;
tracing::info!(?stats, "cron: metadata pass done");
}
Err(e) => tracing::error!(?e, "cron: metadata pass failed"),
}
match pipeline::enqueue_bookmarked_pending(pool).await {
@@ -299,6 +315,7 @@ struct WorkerContext {
cancel: CancellationToken,
dispatcher: Arc<dyn ChapterDispatcher>,
session_expired: Arc<AtomicBool>,
status: StatusHandle,
job_timeout: Duration,
id: usize,
}
@@ -379,12 +396,25 @@ impl WorkerContext {
})
};
// Publish what this worker is doing for the live status surface.
if let JobPayload::SyncChapterContent { chapter_id, .. } = &lease.payload {
self.status
.set_worker(
self.id,
WorkerState::Working {
chapter_id: *chapter_id,
},
)
.await;
}
// Outer timeout: a dispatch that exceeds `job_timeout` is acked
// failed (exponential backoff) rather than wedging the worker.
let dispatch = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone()))
.catch_unwind();
let outcome = tokio::time::timeout(self.job_timeout, dispatch).await;
heartbeat.abort();
self.status.set_worker(self.id, WorkerState::Idle).await;
let outcome = match outcome {
Ok(o) => o,

View File

@@ -26,6 +26,8 @@ pub mod rate_limit;
pub mod resync;
pub mod safety;
pub mod session;
pub mod session_control;
pub mod source;
pub mod status;
pub mod tor;
pub mod url_utils;

View File

@@ -115,6 +115,7 @@ pub async fn run_metadata_pass(
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
max_consecutive_failures: u32,
status: Option<&crate::crawler::status::StatusHandle>,
tor: Option<&crate::crawler::tor::TorController>,
) -> anyhow::Result<MetadataStats> {
let lease = browser_manager
@@ -122,6 +123,9 @@ pub async fn run_metadata_pass(
.await
.context("acquire browser lease for metadata pass")?;
let browser_ref: &chromiumoxide::Browser = &lease;
if let Some(s) = status {
s.set_phase(crate::crawler::status::Phase::WalkingList).await;
}
let source = {
let s = TargetSource::new(start_url.to_string());
@@ -226,6 +230,14 @@ pub async fn run_metadata_pass(
continue;
}
stats.discovered += 1;
if let Some(s) = status {
s.set_phase(crate::crawler::status::Phase::FetchingMetadata {
index: stats.discovered,
total: max_refs,
title: r.title.clone(),
})
.await;
}
tracing::info!(
idx = stats.discovered,
key = %r.source_manga_key,

View File

@@ -0,0 +1,122 @@
//! Runtime-updatable crawler session (PHPSESSID).
//!
//! At startup the session comes from `CRAWLER_PHPSESSID`, but it expires
//! and previously needed a container restart to refresh. This controller
//! lets an admin push a fresh cookie at runtime: it rewrites the reqwest
//! cookie jar (CDN image fetches), updates the in-memory value the browser
//! `on_launch` hook reads, persists it to `crawler_state` (so it survives
//! a restart), and clears the sticky `session_expired` flag. A subsequent
//! coordinated browser restart re-runs `on_launch`, re-injecting the new
//! cookie into Chromium and re-probing.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use anyhow::Context;
use serde_json::json;
use sqlx::PgPool;
use tokio::sync::RwLock;
const STATE_KEY_RUNTIME_SESSION: &str = "runtime_session";
pub struct SessionController {
/// Current PHPSESSID — what `on_launch` injects into a fresh browser.
phpsessid: RwLock<Option<String>>,
/// The same `Arc<Jar>` handed to the reqwest client; updating it here
/// updates the client's cookies (the jar is internally mutable).
cookie_jar: Arc<reqwest::cookie::Jar>,
cookie_domain: Option<String>,
start_url: Option<String>,
db: PgPool,
session_expired: Arc<AtomicBool>,
}
impl SessionController {
pub fn new(
initial: Option<String>,
cookie_jar: Arc<reqwest::cookie::Jar>,
cookie_domain: Option<String>,
start_url: Option<String>,
db: PgPool,
session_expired: Arc<AtomicBool>,
) -> Arc<Self> {
Arc::new(Self {
phpsessid: RwLock::new(initial),
cookie_jar,
cookie_domain,
start_url,
db,
session_expired,
})
}
/// The PHPSESSID a fresh browser should inject (None when unset).
pub async fn current(&self) -> Option<String> {
self.phpsessid.read().await.clone()
}
/// Whether the sticky session-expired flag is set (chapter workers
/// idle while true).
pub fn is_expired(&self) -> bool {
self.session_expired.load(Ordering::Acquire)
}
/// Clear the session-expired flag without changing the cookie — used
/// when the operator knows the session is fine and wants workers to
/// resume immediately.
pub fn clear_expired(&self) {
self.session_expired.store(false, Ordering::Release);
}
/// Update the session everywhere: reqwest jar, in-memory value, and
/// persisted `crawler_state`. Clears the session-expired flag. Does
/// NOT relaunch the browser — the caller triggers a coordinated
/// restart so `on_launch` re-injects + re-probes.
pub async fn update(&self, sid: &str) -> anyhow::Result<()> {
let sid = sid.trim().to_string();
anyhow::ensure!(!sid.is_empty(), "PHPSESSID must not be empty");
if let (Some(domain), Some(start_url)) = (&self.cookie_domain, &self.start_url) {
let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/");
let seed_url =
reqwest::Url::parse(start_url).context("parse start_url for cookie seed")?;
self.cookie_jar.add_cookie_str(&cookie_str, &seed_url);
}
*self.phpsessid.write().await = Some(sid.clone());
persist(&self.db, &sid).await.context("persist runtime session")?;
self.session_expired.store(false, Ordering::Release);
tracing::info!("crawler session updated at runtime");
Ok(())
}
/// Read a persisted runtime session (if any) from `crawler_state`.
/// Called at startup so a mid-day refresh survives a restart.
pub async fn load_persisted(db: &PgPool) -> Option<String> {
let row: Option<serde_json::Value> =
sqlx::query_scalar("SELECT value FROM crawler_state WHERE key = $1")
.bind(STATE_KEY_RUNTIME_SESSION)
.fetch_optional(db)
.await
.ok()
.flatten();
row.and_then(|v| {
v.get("phpsessid")
.and_then(|s| s.as_str())
.map(|s| s.to_string())
})
}
}
async fn persist(db: &PgPool, sid: &str) -> sqlx::Result<()> {
sqlx::query(
"INSERT INTO crawler_state (key, value, updated_at) \
VALUES ($1, $2, now()) \
ON CONFLICT (key) DO UPDATE \
SET value = EXCLUDED.value, updated_at = now()",
)
.bind(STATE_KEY_RUNTIME_SESSION)
.bind(json!({ "phpsessid": sid }))
.execute(db)
.await?;
Ok(())
}

View File

@@ -0,0 +1,162 @@
//! Live, in-process crawler status.
//!
//! The metadata pass runs inline in the cron tick (it is not a
//! `crawler_jobs` row), so without this surface "what is the crawler doing
//! right now" is unanswerable from the dashboard. The daemon publishes its
//! current [`Phase`] and per-worker activity into a shared
//! [`StatusHandle`]; the admin endpoint reads a [`CrawlerStatus`] snapshot
//! and composes it with DB-derived queue counts + the session/browser
//! flags.
//!
//! NOTE: this is per-process state. The deployment is a single server
//! (see CLAUDE.md), so an in-memory handle is sufficient; durable signals
//! (last-pass summary, runtime session) are persisted in `crawler_state`.
use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::Serialize;
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::crawler::pipeline::MetadataStats;
/// What the daemon is doing right now. Serialised with an internal `state`
/// tag so the frontend can switch on it.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum Phase {
/// Sleeping until the next scheduled metadata pass.
Idle { next_fire: Option<DateTime<Utc>> },
/// Walking the source catalog list pages.
WalkingList,
/// Fetching one manga's metadata. `index`/`total` drive a progress bar
/// (`total` is `None` when the source size is unknown / uncapped).
FetchingMetadata {
index: usize,
total: Option<usize>,
title: String,
},
/// Backfilling covers that failed on first attempt.
CoverBackfill,
}
/// Per-worker activity. (Browser-restart and session-expired states are
/// surfaced as separate top-level fields by the endpoint, sourced from the
/// `BrowserManager` phase and the session flag respectively.)
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum WorkerState {
Idle,
Working { chapter_id: Uuid },
}
/// Summary of the most recent metadata pass (persisted across restarts in
/// `crawler_state` by the cron; mirrored here for the live read).
#[derive(Clone, Debug, Serialize, Default)]
pub struct LastPass {
pub at: Option<DateTime<Utc>>,
pub discovered: usize,
pub upserted: usize,
pub covers_fetched: usize,
pub mangas_failed: usize,
}
/// The mutable slice of status the daemon owns. Session/browser/queue are
/// composed at read time by the endpoint (they live elsewhere), so they
/// are not stored here.
#[derive(Clone, Debug, Serialize)]
pub struct CrawlerStatus {
pub phase: Phase,
pub workers: Vec<WorkerState>,
pub last_pass: LastPass,
}
impl CrawlerStatus {
fn new(num_workers: usize) -> Self {
Self {
phase: Phase::Idle { next_fire: None },
workers: (0..num_workers.max(1)).map(|_| WorkerState::Idle).collect(),
last_pass: LastPass::default(),
}
}
}
/// Cloneable handle the daemon tasks use to publish status. Cheap to clone
/// (`Arc`). All writers funnel through the helper methods so locking stays
/// localised.
#[derive(Clone)]
pub struct StatusHandle(Arc<RwLock<CrawlerStatus>>);
impl StatusHandle {
pub fn new(num_workers: usize) -> Self {
Self(Arc::new(RwLock::new(CrawlerStatus::new(num_workers))))
}
pub async fn set_phase(&self, phase: Phase) {
self.0.write().await.phase = phase;
}
pub async fn set_worker(&self, id: usize, state: WorkerState) {
let mut s = self.0.write().await;
if let Some(slot) = s.workers.get_mut(id) {
*slot = state;
}
}
/// Record a finished metadata pass. Stamps `at` with `now`.
pub async fn record_pass(&self, stats: &MetadataStats, at: DateTime<Utc>) {
let mut s = self.0.write().await;
s.last_pass = LastPass {
at: Some(at),
discovered: stats.discovered,
upserted: stats.upserted,
covers_fetched: stats.covers_fetched,
mangas_failed: stats.mangas_failed,
};
}
/// Seed the last-pass summary from a persisted `crawler_state` value on
/// startup so the dashboard isn't blank until the first tick.
pub async fn set_last_pass(&self, last: LastPass) {
self.0.write().await.last_pass = last;
}
pub async fn snapshot(&self) -> CrawlerStatus {
self.0.read().await.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn set_worker_updates_only_that_slot() {
let h = StatusHandle::new(2);
let cid = Uuid::new_v4();
h.set_worker(1, WorkerState::Working { chapter_id: cid }).await;
let snap = h.snapshot().await;
assert!(matches!(snap.workers[0], WorkerState::Idle));
assert!(matches!(snap.workers[1], WorkerState::Working { .. }));
// Out-of-range id is a no-op, not a panic.
h.set_worker(99, WorkerState::Idle).await;
}
#[tokio::test]
async fn record_pass_captures_stats_and_timestamp() {
let h = StatusHandle::new(1);
let stats = MetadataStats {
discovered: 5,
upserted: 3,
covers_fetched: 2,
mangas_failed: 1,
};
let at = Utc::now();
h.record_pass(&stats, at).await;
let snap = h.snapshot().await;
assert_eq!(snap.last_pass.discovered, 5);
assert_eq!(snap.last_pass.upserted, 3);
assert_eq!(snap.last_pass.at, Some(at));
}
}

View File

@@ -17,8 +17,9 @@
//! Each public function is a transaction boundary so a partial failure
//! mid-call leaves the DB in its pre-call state.
use chrono::Utc;
use sqlx::{PgPool, Postgres, Transaction};
use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::{FromRow, PgPool, Postgres, Transaction};
use uuid::Uuid;
use crate::crawler::source::{SourceChapterRef, SourceManga};
@@ -618,3 +619,169 @@ pub async fn last_run_completed_cleanly(
.unwrap_or(true))
}
// ---------------------------------------------------------------------------
// Dead-letter jobs: admin observability + requeue.
// ---------------------------------------------------------------------------
/// A `dead` crawler job joined to its chapter/manga context for the admin
/// dead-letter view. Chapter columns are `Option` because the join is
/// best-effort (the chapter may have been removed since the job died, or
/// the job may be a non-chapter kind).
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct DeadJob {
pub id: Uuid,
pub kind: String,
pub chapter_id: Option<Uuid>,
pub manga_id: Option<Uuid>,
pub manga_title: Option<String>,
pub chapter_number: Option<i32>,
pub attempts: i32,
pub max_attempts: i32,
pub last_error: Option<String>,
pub updated_at: DateTime<Utc>,
}
/// Paginated list of `dead` jobs, newest-failed first, joined to chapter +
/// manga context. `search` filters on manga title (case-insensitive
/// substring). Returns the page slice plus the unfiltered-by-page total.
pub async fn list_dead_jobs(
pool: &PgPool,
search: Option<&str>,
limit: i64,
offset: i64,
) -> sqlx::Result<(Vec<DeadJob>, i64)> {
let search_pat = search
.map(|s| format!("%{}%", s.trim()))
.filter(|p| p.len() > 2);
let items: Vec<DeadJob> = sqlx::query_as(
r#"
SELECT
cj.id,
cj.payload->>'kind' AS kind,
(cj.payload->>'chapter_id')::uuid AS chapter_id,
c.manga_id AS manga_id,
m.title AS manga_title,
c.number AS chapter_number,
cj.attempts,
cj.max_attempts,
cj.last_error,
cj.updated_at
FROM crawler_jobs cj
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
LEFT JOIN mangas m ON m.id = c.manga_id
WHERE cj.state = 'dead'
AND ($1::text IS NULL OR m.title ILIKE $1)
ORDER BY cj.updated_at DESC
LIMIT $2 OFFSET $3
"#,
)
.bind(&search_pat)
.bind(limit)
.bind(offset)
.fetch_all(pool)
.await?;
let total: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(*)
FROM crawler_jobs cj
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
LEFT JOIN mangas m ON m.id = c.manga_id
WHERE cj.state = 'dead'
AND ($1::text IS NULL OR m.title ILIKE $1)
"#,
)
.bind(&search_pat)
.fetch_one(pool)
.await?;
Ok((items, total))
}
/// Scope of a dead-job requeue.
#[derive(Debug, Clone)]
pub enum RequeueScope {
/// Every dead job.
All,
/// Dead jobs whose chapter belongs to this manga.
Manga(Uuid),
/// A single dead job by its id.
Job(Uuid),
}
/// Requeue dead jobs back to `pending` with a fresh attempt budget. This is
/// an explicit operator override, so it bypasses the dead-letter quarantine
/// the enqueue helpers honour (we act directly on the row). Skips any dead
/// job whose chapter already has a `pending`/`running` job so the partial
/// dedup index is never violated. Returns the number of rows requeued.
pub async fn requeue_dead_jobs(pool: &PgPool, scope: RequeueScope) -> sqlx::Result<u64> {
// Guard against resurrecting a dead job when a live one already covers
// the same chapter (would otherwise hit the dedup unique index).
const NO_LIVE_DUP: &str = r#"
AND NOT EXISTS (
SELECT 1 FROM crawler_jobs live
WHERE live.payload->>'kind' = 'sync_chapter_content'
AND live.payload->>'chapter_id' = crawler_jobs.payload->>'chapter_id'
AND live.state IN ('pending','running')
)
"#;
const SET: &str = "SET state = 'pending', attempts = 0, leased_until = NULL, \
last_error = NULL, scheduled_at = now(), updated_at = now()";
let affected = match scope {
RequeueScope::All => {
sqlx::query(&format!(
"UPDATE crawler_jobs {SET} WHERE state = 'dead' {NO_LIVE_DUP}"
))
.execute(pool)
.await?
.rows_affected()
}
RequeueScope::Manga(manga_id) => {
sqlx::query(&format!(
"UPDATE crawler_jobs {SET} \
WHERE state = 'dead' \
AND (payload->>'chapter_id')::uuid IN \
(SELECT id FROM chapters WHERE manga_id = $1) \
{NO_LIVE_DUP}"
))
.bind(manga_id)
.execute(pool)
.await?
.rows_affected()
}
RequeueScope::Job(job_id) => {
sqlx::query(&format!(
"UPDATE crawler_jobs {SET} WHERE state = 'dead' AND id = $1 {NO_LIVE_DUP}"
))
.bind(job_id)
.execute(pool)
.await?
.rows_affected()
}
};
Ok(affected)
}
/// Count crawler jobs grouped by state — drives the dashboard queue
/// gauges. Returns `(pending, running, dead)`.
pub async fn job_state_counts(pool: &PgPool) -> sqlx::Result<(i64, i64, i64)> {
let rows: Vec<(String, i64)> =
sqlx::query_as("SELECT state, COUNT(*) FROM crawler_jobs GROUP BY state")
.fetch_all(pool)
.await?;
let mut pending = 0;
let mut running = 0;
let mut dead = 0;
for (state, n) in rows {
match state.as_str() {
"pending" => pending = n,
"running" => running = n,
"dead" => dead = n,
_ => {}
}
}
Ok((pending, running, dead))
}