//! Persistent job queue and the four job kinds. //! //! Backed by Postgres (the `crawler_jobs` table). Workers lease rows //! with `SELECT ... FOR UPDATE SKIP LOCKED`, heartbeat via //! `leased_until`, and ack by transitioning to `done` (or backoff / //! `dead`). Handlers are idempotent so a crash mid-run is recoverable //! by replay. use std::time::Duration; use serde::{Deserialize, Serialize}; use sqlx::PgPool; use uuid::Uuid; use super::source::DiscoverMode; #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum JobPayload { /// Walk the source index and enqueue `SyncManga` jobs. Discover { source_id: String, mode: DiscoverMode, }, /// Fetch one manga's detail page, upsert metadata, enqueue /// `SyncChapterList`. SyncManga { source_id: String, source_manga_key: String, }, /// Diff the chapter list, enqueue `SyncChapterContent` for new /// chapters, soft-drop vanished ones. SyncChapterList { source_id: String, manga_id: Uuid, source_manga_key: String, }, /// Download a single chapter's page images into storage. SyncChapterContent { source_id: String, chapter_id: Uuid, source_chapter_key: String, }, } #[derive(Clone, Copy, Debug, sqlx::Type, Serialize, Deserialize)] #[sqlx(type_name = "text", rename_all = "snake_case")] #[serde(rename_all = "snake_case")] pub enum JobState { Pending, Running, Done, Failed, Dead, } /// Kind discriminator stored in `payload->>'kind'`. Public so callers /// (daemon worker, bookmark hook) can filter `lease()` to a single kind /// without re-spelling the literal. pub const KIND_SYNC_CHAPTER_CONTENT: &str = "sync_chapter_content"; #[derive(Debug)] pub enum EnqueueResult { Inserted(Uuid), Skipped, } #[derive(Debug, Clone)] pub struct Lease { pub id: Uuid, pub payload: JobPayload, pub attempts: i32, pub max_attempts: i32, } /// Exponential backoff for `ack_failed` retries. `attempts` is the /// post-increment value reported by `lease()` (so the first failure has /// `attempts == 1` and waits 60s, the second 120s, etc.). Capped at 1h to /// avoid runaway long sleeps that would outlive the daemon process. fn backoff_for(attempts: i32) -> Duration { let shift = attempts.saturating_sub(1).clamp(0, 20) as u32; let secs = 60u64.saturating_mul(1u64 << shift); Duration::from_secs(secs.min(3600)) } /// Insert a new pending job. For `SyncChapterContent` payloads the /// partial unique index `crawler_jobs_chapter_content_dedup_idx` blocks /// a second `(pending|running)` insert per chapter_id, returning /// `Skipped`. The slot frees again once the previous job leaves the /// in-flight states (done/failed/dead), so a re-enqueue after a force /// refetch succeeds. pub async fn enqueue(pool: &PgPool, payload: &JobPayload) -> sqlx::Result { let json = serde_json::to_value(payload).expect("JobPayload is always serializable"); let id: Option = sqlx::query_scalar( "INSERT INTO crawler_jobs (payload) VALUES ($1) \ ON CONFLICT DO NOTHING RETURNING id", ) .bind(json) .fetch_optional(pool) .await?; Ok(match id { Some(id) => EnqueueResult::Inserted(id), None => EnqueueResult::Skipped, }) } /// Lease up to `max` rows whose `state` is `pending`, or `running` with /// an expired `leased_until` (the crashed-worker recovery path). The /// inner CTE uses `FOR UPDATE SKIP LOCKED` so concurrent leasers don't /// block each other and each row is handed to exactly one worker. /// /// `kind_filter` matches against `payload->>'kind'`; `None` means /// any kind. pub async fn lease( pool: &PgPool, kind_filter: Option<&str>, max: i64, lease_duration: Duration, ) -> sqlx::Result> { let lease_ms: i64 = lease_duration.as_millis().min(i64::MAX as u128) as i64; let rows: Vec<(Uuid, serde_json::Value, i32, i32)> = sqlx::query_as( r#" WITH leased AS ( SELECT id FROM crawler_jobs WHERE (state = 'pending' OR (state = 'running' AND leased_until < now())) AND scheduled_at <= now() AND ($1::text IS NULL OR payload->>'kind' = $1) ORDER BY scheduled_at LIMIT $2 FOR UPDATE SKIP LOCKED ) UPDATE crawler_jobs j SET state = 'running', attempts = j.attempts + 1, leased_until = now() + ($3::bigint || ' milliseconds')::interval, updated_at = now() FROM leased l WHERE j.id = l.id RETURNING j.id, j.payload, j.attempts, j.max_attempts "#, ) .bind(kind_filter) .bind(max) .bind(lease_ms) .fetch_all(pool) .await?; let mut leases = Vec::with_capacity(rows.len()); for (id, payload_json, attempts, max_attempts) in rows { let payload: JobPayload = serde_json::from_value(payload_json).map_err(|e| { sqlx::Error::Decode(format!("invalid JobPayload JSON for job {id}: {e}").into()) })?; leases.push(Lease { id, payload, attempts, max_attempts, }); } Ok(leases) } /// Mark a leased job as successfully completed. pub async fn ack_done(pool: &PgPool, lease_id: Uuid) -> sqlx::Result<()> { sqlx::query( "UPDATE crawler_jobs \ SET state = 'done', leased_until = NULL, updated_at = now() \ WHERE id = $1", ) .bind(lease_id) .execute(pool) .await?; Ok(()) } /// Mark a leased job as failed. If the current attempt count has reached /// `max_attempts` the job is terminally dead and stops retrying; /// otherwise it goes back to `pending` with `scheduled_at` pushed into /// the future by the exponential backoff. pub async fn ack_failed( pool: &PgPool, lease_id: Uuid, error: &str, attempts: i32, max_attempts: i32, ) -> sqlx::Result<()> { if attempts >= max_attempts { sqlx::query( "UPDATE crawler_jobs \ SET state = 'dead', last_error = $2, leased_until = NULL, updated_at = now() \ WHERE id = $1", ) .bind(lease_id) .bind(error) .execute(pool) .await?; } else { let backoff_ms: i64 = backoff_for(attempts).as_millis().min(i64::MAX as u128) as i64; sqlx::query( "UPDATE crawler_jobs \ SET state = 'pending', last_error = $2, leased_until = NULL, \ scheduled_at = now() + ($3::bigint || ' milliseconds')::interval, \ updated_at = now() \ WHERE id = $1", ) .bind(lease_id) .bind(error) .bind(backoff_ms) .execute(pool) .await?; } Ok(()) } /// Return a leased job to `pending` without burning a retry attempt. /// Used on graceful shutdown and on session-expired aborts where the /// failure isn't the job's fault. pub async fn release(pool: &PgPool, lease_id: Uuid) -> sqlx::Result<()> { sqlx::query( "UPDATE crawler_jobs \ SET state = 'pending', leased_until = NULL, \ attempts = GREATEST(0, attempts - 1), updated_at = now() \ WHERE id = $1", ) .bind(lease_id) .execute(pool) .await?; Ok(()) } /// Delete `done` jobs whose `updated_at` is older than `retention_days` /// days. `0` disables the reaper without touching the table. Returns the /// number of rows removed. pub async fn reap_done(pool: &PgPool, retention_days: u32) -> sqlx::Result { if retention_days == 0 { return Ok(0); } let result = sqlx::query( "DELETE FROM crawler_jobs \ WHERE state = 'done' \ AND updated_at < now() - ($1::bigint || ' days')::interval", ) .bind(retention_days as i64) .execute(pool) .await?; Ok(result.rows_affected()) } #[cfg(test)] mod tests { use super::*; #[test] fn backoff_grows_exponentially_and_caps_at_one_hour() { // attempts == 1 → 60s, doubling each step. assert_eq!(backoff_for(1), Duration::from_secs(60)); assert_eq!(backoff_for(2), Duration::from_secs(120)); assert_eq!(backoff_for(3), Duration::from_secs(240)); assert_eq!(backoff_for(4), Duration::from_secs(480)); assert_eq!(backoff_for(5), Duration::from_secs(960)); assert_eq!(backoff_for(6), Duration::from_secs(1920)); // 7th: 60 * 64 = 3840 → capped to 3600. assert_eq!(backoff_for(7), Duration::from_secs(3600)); assert_eq!(backoff_for(20), Duration::from_secs(3600)); // Garbage / zero / negatives stay sane. assert_eq!(backoff_for(0), Duration::from_secs(60)); assert_eq!(backoff_for(-5), Duration::from_secs(60)); } }