//! Persistence for crawled mangas.
//!
//! High-level operations:
//! - [`ensure_source`]: idempotent registration of a source row.
//! - [`upsert_manga_from_source`]: end-to-end "I saw this manga" —
//!   creates or updates the `mangas` row, threads `manga_sources`, and
//!   refreshes authors/genres/tags. Returns whether the manga is new,
//!   updated (metadata_hash changed), or unchanged.
//! - [`sync_manga_chapters`]: per-manga chapter reconciliation. Adds
//!   new ones, refreshes title/position/URL on existing ones, and
//!   soft-drops vanished ones.
//! - [`live_chapter_count_for_source_manga`]: live chapter count used to
//!   guard against mass soft-drops on a partially rendered page.
//! - [`mark_run_started`] / [`mark_run_completed`] /
//!   [`last_run_completed_cleanly`]: per-source recovery flag in
//!   `crawler_state`. A `false` flag on tick start means the previous
//!   run did not exit cleanly and the next walk should ignore the
//!   early-stop condition.
//! - [`list_missing_covers`]: mangas that still need a cover download.
//! - [`list_dead_jobs`] / [`requeue_dead_jobs`] / [`job_state_counts`]:
//!   dead-letter observability and requeue for `crawler_jobs`.
//!
//! Every write either runs as a single SQL statement or, for the
//! multi-statement operations ([`upsert_manga_from_source`],
//! [`sync_manga_chapters`]), inside one explicit transaction, so a
//! failure mid-call leaves the DB in its pre-call state.

use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::{FromRow, PgPool, Postgres, Transaction};
use uuid::Uuid;

use crate::crawler::source::{SourceChapterRef, SourceManga};

/// What [`upsert_manga_from_source`] did to the `mangas` row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpsertStatus {
    /// No `manga_sources` row matched `(source_id, source_manga_key)`;
    /// a fresh `mangas` row was inserted.
    New,
    /// A row matched but its stored `metadata_hash` differed from the
    /// incoming one; the `mangas` columns were rewritten.
    Updated,
    /// A row matched with the same `metadata_hash`; the `mangas` columns
    /// were left untouched (authors/genres/tags are still re-synced).
    Unchanged,
}

/// Result of [`upsert_manga_from_source`].
#[derive(Debug, Clone)]
pub struct UpsertedManga {
    /// Id of the `mangas` row the source entry is attached to.
    pub manga_id: Uuid,
    /// Whether the `mangas` row was created, rewritten, or left alone.
    pub status: UpsertStatus,
    /// Current value of `mangas.cover_image_path` after the upsert.
    /// `None` means the cover hasn't been downloaded yet — the caller
    /// uses this to backfill covers for mangas that were synced before
    /// cover-download support existed.
    pub cover_image_path: Option<String>,
}

/// Per-call counters returned by [`sync_manga_chapters`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ChapterDiff {
    /// Incoming chapters with no matching `chapter_sources` row; inserted.
    pub new: usize,
    /// Incoming chapters that already existed; title/position/URL refreshed.
    pub refreshed: usize,
    /// Previously-live `chapter_sources` rows whose key was absent from the
    /// incoming list; soft-dropped.
    pub dropped: usize,
}

/// Idempotently registers a source row.
///
/// Inserts `(id, name, base_url)` with `enabled = true`. If `id` already
/// exists, only `name` and `base_url` are refreshed — the existing
/// `enabled` value is left untouched.
pub async fn ensure_source(
    pool: &PgPool,
    id: &str,
    name: &str,
    base_url: &str,
) -> sqlx::Result<()> {
    sqlx::query(
        r#"
        INSERT INTO sources (id, name, base_url, enabled)
        VALUES ($1, $2, $3, true)
        ON CONFLICT (id) DO UPDATE
        SET name = EXCLUDED.name, base_url = EXCLUDED.base_url
        "#,
    )
    .bind(id)
    .bind(name)
    .bind(base_url)
    .execute(pool)
    .await?;
    Ok(())
}

/// Records that `source_id` surfaced `sm` at `source_url`, creating or
/// updating the backing `mangas` row. Everything runs in one transaction.
///
/// Flow:
/// 1. Look up `manga_sources` by `(source_id, sm.source_manga_key)`.
/// 2. No row → insert a new `mangas` row ([`UpsertStatus::New`]).
///    Row with an identical `metadata_hash` → leave `mangas` alone
///    ([`UpsertStatus::Unchanged`]). Otherwise → rewrite title,
///    description, status and alt titles ([`UpsertStatus::Updated`]).
/// 3. Upsert the `manga_sources` row: refresh `source_url`,
///    `metadata_hash`, `last_seen_at`, and clear `dropped_at`.
/// 4. Replace the crawler-owned author/genre/tag attachments. This runs
///    on every path, including `Unchanged`.
/// 5. Read back `cover_image_path` for the caller.
///
/// A missing `sm.status` is stored as `"ongoing"`.
///
/// # Errors
/// Propagates any database error. The transaction is dropped without
/// commit in that case, so no partial writes persist.
pub async fn upsert_manga_from_source(
    pool: &PgPool,
    source_id: &str,
    source_url: &str,
    sm: &SourceManga,
) -> sqlx::Result<UpsertedManga> {
    let mut tx = pool.begin().await?;

    let existing: Option<(Uuid, Option<String>)> = sqlx::query_as(
        r#"
        SELECT manga_id, metadata_hash
        FROM manga_sources
        WHERE source_id = $1 AND source_manga_key = $2
        "#,
    )
    .bind(source_id)
    .bind(&sm.source_manga_key)
    .fetch_optional(&mut *tx)
    .await?;

    let status_db = sm.status.as_deref().unwrap_or("ongoing");

    // Note: `cover_image_path` is intentionally not written here.
    // The repo layer doesn't know about the storage backend, so the
    // caller (crawler binary) downloads the cover via the `Storage`
    // trait and sets the path with `repo::manga::set_cover_image_path`
    // once the bytes have landed.
    let (manga_id, status) = match existing {
        None => {
            let (id,): (Uuid,) = sqlx::query_as(
                r#"
                INSERT INTO mangas (title, description, status, alt_titles)
                VALUES ($1, $2, $3, $4)
                RETURNING id
                "#,
            )
            .bind(&sm.title)
            .bind(sm.summary.as_deref())
            .bind(status_db)
            .bind(&sm.alternative_titles)
            .fetch_one(&mut *tx)
            .await?;
            (id, UpsertStatus::New)
        }
        // Hash match: metadata is unchanged, skip the UPDATE entirely.
        Some((id, prev_hash)) if prev_hash.as_deref() == Some(&sm.metadata_hash) => {
            (id, UpsertStatus::Unchanged)
        }
        Some((id, _)) => {
            sqlx::query(
                r#"
                UPDATE mangas
                SET title = $1, description = $2, status = $3,
                    alt_titles = $4, updated_at = NOW()
                WHERE id = $5
                "#,
            )
            .bind(&sm.title)
            .bind(sm.summary.as_deref())
            .bind(status_db)
            .bind(&sm.alternative_titles)
            .bind(id)
            .execute(&mut *tx)
            .await?;
            (id, UpsertStatus::Updated)
        }
    };

    // Always refresh the source link, even on `Unchanged`: it bumps
    // `last_seen_at` and revives a previously soft-dropped link.
    sqlx::query(
        r#"
        INSERT INTO manga_sources
            (source_id, source_manga_key, manga_id, source_url, metadata_hash, last_seen_at, dropped_at)
        VALUES ($1, $2, $3, $4, $5, NOW(), NULL)
        ON CONFLICT (source_id, source_manga_key) DO UPDATE
        SET source_url = EXCLUDED.source_url,
            metadata_hash = EXCLUDED.metadata_hash,
            last_seen_at = NOW(),
            dropped_at = NULL
        "#,
    )
    .bind(source_id)
    .bind(&sm.source_manga_key)
    .bind(manga_id)
    .bind(source_url)
    .bind(&sm.metadata_hash)
    .execute(&mut *tx)
    .await?;

    sync_authors(&mut tx, manga_id, &sm.authors).await?;
    sync_genres(&mut tx, manga_id, &sm.genres).await?;
    sync_tags(&mut tx, manga_id, &sm.tags).await?;

    // Read inside the transaction so the value reflects this call's writes.
    let cover_image_path: Option<String> =
        sqlx::query_scalar("SELECT cover_image_path FROM mangas WHERE id = $1")
            .bind(manga_id)
            .fetch_one(&mut *tx)
            .await?;

    tx.commit().await?;

    Ok(UpsertedManga {
        manga_id,
        status,
        cover_image_path,
    })
}

/// Replaces every `manga_authors` row for `manga_id` with `authors`.
///
/// Names are trimmed and blank entries skipped. Authors are matched
/// case-insensitively (via the `lower(name)` conflict target) and created
/// on first sight. `position` is the entry's index in the input slice,
/// so skipped blanks leave gaps in the numbering. A repeated author keeps
/// its first position (`ON CONFLICT DO NOTHING`).
async fn sync_authors(
    tx: &mut Transaction<'_, Postgres>,
    manga_id: Uuid,
    authors: &[String],
) -> sqlx::Result<()> {
    sqlx::query("DELETE FROM manga_authors WHERE manga_id = $1")
        .bind(manga_id)
        .execute(&mut **tx)
        .await?;

    for (i, name) in authors.iter().enumerate() {
        let trimmed = name.trim();
        if trimmed.is_empty() {
            continue;
        }
        // Self-update on conflict so the row id is always returned —
        // can't use DO NOTHING because that suppresses RETURNING.
        let (author_id,): (Uuid,) = sqlx::query_as(
            r#"
            INSERT INTO authors (name) VALUES ($1)
            ON CONFLICT (lower(name)) DO UPDATE SET name = authors.name
            RETURNING id
            "#,
        )
        .bind(trimmed)
        .fetch_one(&mut **tx)
        .await?;

        sqlx::query(
            r#"
            INSERT INTO manga_authors (manga_id, author_id, position)
            VALUES ($1, $2, $3)
            ON CONFLICT DO NOTHING
            "#,
        )
        .bind(manga_id)
        .bind(author_id)
        .bind(i as i32)
        .execute(&mut **tx)
        .await?;
    }
    Ok(())
}

/// Replaces every `manga_genres` row for `manga_id` with `genres`.
///
/// Names are trimmed and blank entries skipped. Existing genres are found
/// case-insensitively first. Unknown names are inserted and logged at
/// `info` level.
async fn sync_genres(
    tx: &mut Transaction<'_, Postgres>,
    manga_id: Uuid,
    genres: &[String],
) -> sqlx::Result<()> {
    sqlx::query("DELETE FROM manga_genres WHERE manga_id = $1")
        .bind(manga_id)
        .execute(&mut **tx)
        .await?;

    for name in genres {
        let trimmed = name.trim();
        if trimmed.is_empty() {
            continue;
        }
        // Case-insensitive lookup so a source-supplied "action"
        // attaches to the seeded "Action" rather than creating a
        // second row.
        let existing: Option<(Uuid,)> =
            sqlx::query_as("SELECT id FROM genres WHERE lower(name) = lower($1)")
                .bind(trimmed)
                .fetch_optional(&mut **tx)
                .await?;

        let genre_id = match existing {
            Some((id,)) => id,
            None => {
                // Self-update on conflict so RETURNING always yields the id.
                let (id,): (Uuid,) = sqlx::query_as(
                    r#"
                    INSERT INTO genres (name) VALUES ($1)
                    ON CONFLICT (name) DO UPDATE SET name = genres.name
                    RETURNING id
                    "#,
                )
                .bind(trimmed)
                .fetch_one(&mut **tx)
                .await?;
                tracing::info!(genre = trimmed, "added new genre from source");
                id
            }
        };

        sqlx::query(
            "INSERT INTO manga_genres (manga_id, genre_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
        )
        .bind(manga_id)
        .bind(genre_id)
        .execute(&mut **tx)
        .await?;
    }
    Ok(())
}

/// Replaces the crawler-owned (`added_by IS NULL`) tag attachments for
/// `manga_id` with `tags`. User-attached tags are left in place.
///
/// Names are trimmed and blank entries skipped. Tags are matched
/// case-insensitively via the `lower(name)` conflict target and created on
/// first sight. Re-attaching a tag that a user already attached is a
/// no-op (`ON CONFLICT DO NOTHING`).
async fn sync_tags(
    tx: &mut Transaction<'_, Postgres>,
    manga_id: Uuid,
    tags: &[String],
) -> sqlx::Result<()> {
    // Only clear crawler-owned attachments (added_by IS NULL). User-
    // attached tags are owned by the attaching user and must survive
    // the recurring metadata pass — see manga_tags.added_by in
    // migration 0009.
    //
    // Note on orphans: `manga_tags.added_by` is `ON DELETE SET NULL`,
    // so an attachment whose user was deleted becomes
    // indistinguishable from a crawler-owned row and is cleaned up
    // here. That mirrors how `api::mangas::detach_tag` already treats
    // orphans ("nobody owns it, refuse to let anyone but admin clear
    // them") — the crawler now becomes the eventual reaper. Tracked
    // by `sync_tags_garbage_collects_orphan_user_attachments` in
    // backend/tests/crawler_sync.rs.
    sqlx::query("DELETE FROM manga_tags WHERE manga_id = $1 AND added_by IS NULL")
        .bind(manga_id)
        .execute(&mut **tx)
        .await?;

    for name in tags {
        let trimmed = name.trim();
        if trimmed.is_empty() {
            continue;
        }
        // Self-update on conflict so RETURNING always yields the id.
        let (tag_id,): (Uuid,) = sqlx::query_as(
            r#"
            INSERT INTO tags (name) VALUES ($1)
            ON CONFLICT (lower(name)) DO UPDATE SET name = tags.name
            RETURNING id
            "#,
        )
        .bind(trimmed)
        .fetch_one(&mut **tx)
        .await?;

        sqlx::query(
            r#"
            INSERT INTO manga_tags (manga_id, tag_id, added_by)
            VALUES ($1, $2, NULL)
            ON CONFLICT DO NOTHING
            "#,
        )
        .bind(manga_id)
        .bind(tag_id)
        .execute(&mut **tx)
        .await?;
    }
    Ok(())
}

/// Reconciles `source_id`'s chapter list for `manga_id` in one
/// transaction, serialized per manga by a transaction-scoped advisory lock.
///
/// For each entry of `chapters` (its index becomes `source_index`):
/// - no `chapter_sources` row for `(source_id, key)` under this manga →
///   insert a `chapters` row (with `page_count = 0`) plus its
///   `chapter_sources` row; counted in `new`;
/// - otherwise → refresh the chapter's `title` and `source_index` (not
///   its `number`), and the source row's `source_url` and `last_seen_at`,
///   clearing `dropped_at`; counted in `refreshed`.
///
/// Finally, live `chapter_sources` rows for this manga/source whose key is
/// absent from `chapters` get `dropped_at = NOW()`; counted in `dropped`.
///
/// Notes:
/// - An empty `chapters` slice soft-drops every live row. Callers guard
///   against partial renders with [`live_chapter_count_for_source_manga`].
/// - A key repeated within `chapters` is inserted on its first occurrence
///   and refreshed on later ones, so it counts toward both `new` and
///   `refreshed`.
pub async fn sync_manga_chapters(
    pool: &PgPool,
    source_id: &str,
    manga_id: Uuid,
    chapters: &[SourceChapterRef],
) -> sqlx::Result<ChapterDiff> {
    let mut tx = pool.begin().await?;

    // Per-manga advisory lock. Two concurrent calls for the same manga
    // would otherwise both read `seen_keys`, both run the drop UPDATE
    // filtered on `NOT (key = ANY $3)`, and the later commit could soft-
    // drop a chapter the earlier commit had just inserted (lost-update
    // shape under MVCC). `pg_advisory_xact_lock` is scoped to this
    // transaction: it auto-releases on COMMIT/ROLLBACK so a Rust-side
    // panic mid-call doesn't strand the lock. The single-arg int8 form
    // keyed by `hashtextextended(manga_id::text, 0)` shares Postgres'
    // global advisory-lock namespace with `CRON_LOCK_KEY`, but collision
    // is 2^-64 per pair (a UUID-derived hash hitting the fixed cron key
    // is effectively impossible).
    sqlx::query("SELECT pg_advisory_xact_lock(hashtextextended($1::text, 0))")
        .bind(manga_id)
        .execute(&mut *tx)
        .await?;

    let mut diff = ChapterDiff::default();
    let seen_keys: Vec<String> = chapters
        .iter()
        .map(|c| c.source_chapter_key.clone())
        .collect();

    for (idx, c) in chapters.iter().enumerate() {
        // `source_index` captures the chapter's position in the source
        // DOM (0 = first = newest on this site) so the list query can
        // reverse it for the user-facing list — see migration 0021.
        // Every sync overwrites the value on both branches, so a new
        // chapter inserted at the top of the source shifts every other
        // row down by one on the next tick.
        let source_index = idx as i32;

        // Lookup is constrained by manga_id (via the chapters join) so a
        // source whose chapter slugs collide across mangas (e.g.
        // "chapter-1" appearing under two different mangas) attributes
        // each row to the correct manga. Migration 0017 dropped the
        // (source_id, source_chapter_key) PK in favour of
        // (source_id, chapter_id) for exactly this reason.
        let existing: Option<(Uuid,)> = sqlx::query_as(
            "SELECT cs.chapter_id \
             FROM chapter_sources cs \
             JOIN chapters ch ON ch.id = cs.chapter_id \
             WHERE cs.source_id = $1 \
               AND cs.source_chapter_key = $2 \
               AND ch.manga_id = $3",
        )
        .bind(source_id)
        .bind(&c.source_chapter_key)
        .bind(manga_id)
        .fetch_optional(&mut *tx)
        .await?;

        match existing {
            None => {
                // New chapter row. As of 0013 there's no (manga_id,
                // number) UNIQUE, so duplicate-numbered chapters from
                // the source (different uploaders, notices, alt
                // translations) each get their own row — chapter
                // identity is the UUID, not the number.
                let (chapter_id,): (Uuid,) = sqlx::query_as(
                    r#"
                    INSERT INTO chapters (manga_id, number, title, page_count, source_index)
                    VALUES ($1, $2, $3, 0, $4)
                    RETURNING id
                    "#,
                )
                .bind(manga_id)
                .bind(c.number)
                .bind(c.title.as_deref())
                .bind(source_index)
                .fetch_one(&mut *tx)
                .await?;

                sqlx::query(
                    r#"
                    INSERT INTO chapter_sources
                        (source_id, source_chapter_key, chapter_id, source_url, last_seen_at, dropped_at)
                    VALUES ($1, $2, $3, $4, NOW(), NULL)
                    "#,
                )
                .bind(source_id)
                .bind(&c.source_chapter_key)
                .bind(chapter_id)
                .bind(&c.url)
                .execute(&mut *tx)
                .await?;

                diff.new += 1;
            }
            Some((chapter_id,)) => {
                sqlx::query(
                    "UPDATE chapters SET title = $1, source_index = $2 WHERE id = $3",
                )
                .bind(c.title.as_deref())
                .bind(source_index)
                .bind(chapter_id)
                .execute(&mut *tx)
                .await?;

                // chapter_id is now the natural per-(source, chapter)
                // identifier — use it directly instead of re-keying on
                // (source_id, source_chapter_key) which may not be unique.
                sqlx::query(
                    r#"
                    UPDATE chapter_sources
                    SET source_url = $1, last_seen_at = NOW(), dropped_at = NULL
                    WHERE source_id = $2 AND chapter_id = $3
                    "#,
                )
                .bind(&c.url)
                .bind(source_id)
                .bind(chapter_id)
                .execute(&mut *tx)
                .await?;

                diff.refreshed += 1;
            }
        }
    }

    // Soft-drop any chapter previously seen from this source for this
    // manga that's not in the current list.
    let result = sqlx::query(
        r#"
        UPDATE chapter_sources cs
        SET dropped_at = NOW()
        FROM chapters ch
        WHERE cs.chapter_id = ch.id
          AND ch.manga_id = $1
          AND cs.source_id = $2
          AND cs.dropped_at IS NULL
          AND NOT (cs.source_chapter_key = ANY($3))
        "#,
    )
    .bind(manga_id)
    .bind(source_id)
    .bind(&seen_keys)
    .execute(&mut *tx)
    .await?;
    diff.dropped = result.rows_affected() as usize;

    tx.commit().await?;
    Ok(diff)
}

/// Count the chapters that the source `(source_id, source_manga_key)`
/// is currently known to attach to — i.e. the number of `chapter_sources`
/// rows for the manga identified by the (source_id, source_manga_key)
/// pair, restricted to live (`dropped_at IS NULL`) rows.
///
/// This feeds the metadata pass's partial-render guard. Suppose
/// `fetch_manga` comes back with an empty `chapters` Vec even though this
/// source has surfaced chapters before. The likely cause is a chromium
/// snapshot taken after the `#chapter_table` wrapper rendered but before
/// its rows did. In that case the caller should skip
/// `sync_manga_chapters`, so its soft-drop step doesn't mark every
/// existing chapter with `dropped_at`.
///
/// A brand-new manga (no `manga_sources` row yet) yields `Ok(0)`. That is
/// the legitimate "no chapters yet" case and must NOT trip the guard.
pub async fn live_chapter_count_for_source_manga(
    pool: &PgPool,
    source_id: &str,
    source_manga_key: &str,
) -> sqlx::Result<i64> {
    // An aggregate without GROUP BY always produces one row, so an
    // unknown manga simply counts as zero.
    let live_count: Option<i64> = sqlx::query_scalar(
        "SELECT COUNT(*) \
         FROM chapter_sources cs \
         JOIN chapters c ON c.id = cs.chapter_id \
         JOIN manga_sources ms \
           ON ms.manga_id = c.manga_id \
          AND ms.source_id = cs.source_id \
         WHERE ms.source_id = $1 \
           AND ms.source_manga_key = $2 \
           AND cs.dropped_at IS NULL",
    )
    .bind(source_id)
    .bind(source_manga_key)
    .fetch_optional(pool)
    .await?;

    Ok(live_count.unwrap_or(0))
}

/// Flag a metadata pass for `source_id` as in flight.
///
/// Writes `{"completed": false, "at": <now>}` under the
/// `last_run_completed:<source_id>` key of `crawler_state`. If the process
/// crashes, panics or is SIGKILLed before [`mark_run_completed`] runs,
/// the flag stays `false`. The next tick then treats the previous run as
/// unclean and walks the full catalog (recovery sweep).
pub async fn mark_run_started(pool: &PgPool, source_id: &str) -> sqlx::Result<()> {
    let in_flight = serde_json::json!({
        "completed": false,
        "at": Utc::now().to_rfc3339(),
    });

    sqlx::query(
        "INSERT INTO crawler_state (key, value, updated_at) \
         VALUES ($1, $2, now()) \
         ON CONFLICT (key) DO UPDATE \
         SET value = EXCLUDED.value, updated_at = now()",
    )
    .bind(format!("last_run_completed:{source_id}"))
    .bind(in_flight)
    .execute(pool)
    .await
    .map(|_| ())
}

/// Mark a metadata pass as completed cleanly for `source_id`. Called
Called /// from the same place a run decides it reached end-of-walk or hit the /// intentional stop. The next tick reads `true` and applies the normal /// stop condition. pub async fn mark_run_completed(pool: &PgPool, source_id: &str) -> sqlx::Result<()> { let key = format!("last_run_completed:{source_id}"); sqlx::query( "INSERT INTO crawler_state (key, value, updated_at) \ VALUES ($1, $2, now()) \ ON CONFLICT (key) DO UPDATE \ SET value = EXCLUDED.value, updated_at = now()", ) .bind(&key) .bind(serde_json::json!({ "completed": true, "at": Utc::now().to_rfc3339(), })) .execute(pool) .await?; Ok(()) } /// List mangas whose `cover_image_path IS NULL` but a live /// `manga_sources` row still attaches them to a source. The bounded /// result feeds the cover-backfill pass in [`crate::crawler::pipeline`]: /// each entry is one (manga, freshest source row) pair where a cover /// re-download is in order. /// /// Per-manga deduplication uses `DISTINCT ON (m.id)` keyed on the row /// with the newest `last_seen_at`, so a manga that's surfaced by /// multiple sources only produces one row (the freshest). Sort is /// stable for tests. pub async fn list_missing_covers( pool: &PgPool, max: i64, ) -> sqlx::Result> { let rows: Vec<(Uuid, String, String)> = sqlx::query_as( r#" SELECT DISTINCT ON (m.id) m.id, ms.source_manga_key, ms.source_url FROM mangas m JOIN manga_sources ms ON ms.manga_id = m.id WHERE m.cover_image_path IS NULL AND ms.dropped_at IS NULL ORDER BY m.id, ms.last_seen_at DESC LIMIT $1 "#, ) .bind(max) .fetch_all(pool) .await?; Ok(rows .into_iter() .map(|(manga_id, source_manga_key, source_url)| MissingCoverEntry { manga_id, source_manga_key, source_url, }) .collect()) } #[derive(Debug, Clone, PartialEq, Eq)] pub struct MissingCoverEntry { pub manga_id: Uuid, pub source_manga_key: String, pub source_url: String, } /// Read the recovery flag for `source_id`. 
A missing row OR an /// unparseable value reads as `true` ("clean") — the former covers the /// first-ever run on a virgin DB (no recovery needed), the latter /// covers forward-compat against future schema changes; both fail-safe /// toward not making an operator pay for an unnecessary full sweep. pub async fn last_run_completed_cleanly( pool: &PgPool, source_id: &str, ) -> sqlx::Result { let key = format!("last_run_completed:{source_id}"); let row: Option = sqlx::query_scalar("SELECT value FROM crawler_state WHERE key = $1") .bind(&key) .fetch_optional(pool) .await?; Ok(row .and_then(|v| v.get("completed").and_then(|b| b.as_bool())) .unwrap_or(true)) } // --------------------------------------------------------------------------- // Dead-letter jobs: admin observability + requeue. // --------------------------------------------------------------------------- /// A `dead` crawler job joined to its chapter/manga context for the admin /// dead-letter view. Chapter columns are `Option` because the join is /// best-effort (the chapter may have been removed since the job died, or /// the job may be a non-chapter kind). #[derive(Debug, Clone, Serialize, FromRow)] pub struct DeadJob { pub id: Uuid, pub kind: String, pub chapter_id: Option, pub manga_id: Option, pub manga_title: Option, pub chapter_number: Option, pub attempts: i32, pub max_attempts: i32, pub last_error: Option, pub updated_at: DateTime, } /// Paginated list of `dead` jobs, newest-failed first, joined to chapter + /// manga context. `search` filters on manga title (case-insensitive /// substring). Returns the page slice plus the unfiltered-by-page total. 
pub async fn list_dead_jobs( pool: &PgPool, search: Option<&str>, limit: i64, offset: i64, ) -> sqlx::Result<(Vec, i64)> { let search_pat = search .map(|s| format!("%{}%", s.trim())) .filter(|p| p.len() > 2); let items: Vec = sqlx::query_as( r#" SELECT cj.id, cj.payload->>'kind' AS kind, (cj.payload->>'chapter_id')::uuid AS chapter_id, c.manga_id AS manga_id, m.title AS manga_title, c.number AS chapter_number, cj.attempts, cj.max_attempts, cj.last_error, cj.updated_at FROM crawler_jobs cj LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid LEFT JOIN mangas m ON m.id = c.manga_id WHERE cj.state = 'dead' AND ($1::text IS NULL OR m.title ILIKE $1) ORDER BY cj.updated_at DESC LIMIT $2 OFFSET $3 "#, ) .bind(&search_pat) .bind(limit) .bind(offset) .fetch_all(pool) .await?; let total: i64 = sqlx::query_scalar( r#" SELECT COUNT(*) FROM crawler_jobs cj LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid LEFT JOIN mangas m ON m.id = c.manga_id WHERE cj.state = 'dead' AND ($1::text IS NULL OR m.title ILIKE $1) "#, ) .bind(&search_pat) .fetch_one(pool) .await?; Ok((items, total)) } /// Scope of a dead-job requeue. #[derive(Debug, Clone)] pub enum RequeueScope { /// Every dead job. All, /// Dead jobs whose chapter belongs to this manga. Manga(Uuid), /// Dead jobs for a single chapter. Chapter(Uuid), /// A single dead job by its id. Job(Uuid), } /// Requeue dead jobs back to `pending` with a fresh attempt budget. This is /// an explicit operator override, so it bypasses the dead-letter quarantine /// the enqueue helpers honour (we act directly on the row). Skips any dead /// job whose chapter already has a `pending`/`running` job so the partial /// dedup index is never violated. Returns the number of rows requeued. pub async fn requeue_dead_jobs(pool: &PgPool, scope: RequeueScope) -> sqlx::Result { // Guard against resurrecting a dead job when a live one already covers // the same chapter (would otherwise hit the dedup unique index). 
const NO_LIVE_DUP: &str = r#" AND NOT EXISTS ( SELECT 1 FROM crawler_jobs live WHERE live.payload->>'kind' = 'sync_chapter_content' AND live.payload->>'chapter_id' = crawler_jobs.payload->>'chapter_id' AND live.state IN ('pending','running') ) "#; const SET: &str = "SET state = 'pending', attempts = 0, leased_until = NULL, \ last_error = NULL, scheduled_at = now(), updated_at = now()"; let affected = match scope { RequeueScope::All => { sqlx::query(&format!( "UPDATE crawler_jobs {SET} WHERE state = 'dead' {NO_LIVE_DUP}" )) .execute(pool) .await? .rows_affected() } RequeueScope::Manga(manga_id) => { sqlx::query(&format!( "UPDATE crawler_jobs {SET} \ WHERE state = 'dead' \ AND (payload->>'chapter_id')::uuid IN \ (SELECT id FROM chapters WHERE manga_id = $1) \ {NO_LIVE_DUP}" )) .bind(manga_id) .execute(pool) .await? .rows_affected() } RequeueScope::Chapter(chapter_id) => { sqlx::query(&format!( "UPDATE crawler_jobs {SET} \ WHERE state = 'dead' \ AND (payload->>'chapter_id')::uuid = $1 \ {NO_LIVE_DUP}" )) .bind(chapter_id) .execute(pool) .await? .rows_affected() } RequeueScope::Job(job_id) => { sqlx::query(&format!( "UPDATE crawler_jobs {SET} WHERE state = 'dead' AND id = $1 {NO_LIVE_DUP}" )) .bind(job_id) .execute(pool) .await? .rows_affected() } }; Ok(affected) } /// Count crawler jobs grouped by state — drives the dashboard queue /// gauges. Returns `(pending, running, dead)`. pub async fn job_state_counts(pool: &PgPool) -> sqlx::Result<(i64, i64, i64)> { let rows: Vec<(String, i64)> = sqlx::query_as("SELECT state, COUNT(*) FROM crawler_jobs GROUP BY state") .fetch_all(pool) .await?; let mut pending = 0; let mut running = 0; let mut dead = 0; for (state, n) in rows { match state.as_str() { "pending" => pending = n, "running" => running = n, "dead" => dead = n, _ => {} } } Ok((pending, running, dead)) }