//! Admin-facing read queries that join manga/chapter with the crawler //! signals (`manga_sources`, `chapter_sources`, `crawler_jobs`) to //! derive a sync state per row at query time. //! //! Priority order for `MangaSyncState`: //! 1. `InProgress` — any pending/running `sync_manga` or //! `sync_chapter_list` job matches this manga. //! 2. `Dropped` — manga has source rows AND every one of them is //! `dropped_at IS NOT NULL`. //! 3. `Synced` — default (includes user-uploaded mangas with no //! `manga_sources` rows at all). //! //! Priority order for `ChapterSyncState`: //! 1. `Downloading` — pending/running `sync_chapter_content` for this id //! 2. `Dropped` — chapter has source rows AND all are dropped //! 3. `Failed` — `page_count = 0` AND a `dead` `sync_chapter_content` //! row exists for this chapter. Constrained to `page_count = 0` //! because once pages are on disk the chapter IS synced — a //! historical dead job (likely from a re-download attempt that //! crashed) is noise that gets reaped after retention. Surfacing //! "Failed" when content is present would contradict //! `ChapterSyncState::Synced`'s "downloaded at some point" contract. //! 4. `NotDownloaded` — `page_count = 0`, no in-flight, no dead job //! 5. `Synced` — `page_count > 0` //! //! Reminder: `done` jobs are reaped after `CRAWLER_JOB_RETENTION_DAYS`, //! so `chapters.page_count > 0` is the durable "this is synced" signal, //! not the job table. use chrono::{DateTime, Utc}; use serde::Serialize; use sqlx::{FromRow, PgPool}; use uuid::Uuid; use crate::domain::{ChapterSyncState, MangaSyncState}; use crate::error::AppResult; #[derive(Debug, Serialize, FromRow)] pub struct AdminMangaRow { pub id: Uuid, pub title: String, pub status: String, pub cover_image_path: Option, pub created_at: DateTime, pub updated_at: DateTime, pub sync_state: MangaSyncState, pub chapter_count: i64, pub latest_seen_at: Option>, } #[derive(Debug, Default)] pub struct ListAdminMangasQuery { pub search: Option, pub sync_state: Option, pub limit: i64, pub offset: i64, } const MANGA_SYNC_STATE_CASE: &str = r#" CASE WHEN EXISTS ( SELECT 1 FROM crawler_jobs cj WHERE cj.state IN ('pending','running') AND ( (cj.payload->>'kind' = 'sync_chapter_list' AND (cj.payload->>'manga_id')::uuid = m.id) OR (cj.payload->>'kind' = 'sync_manga' AND EXISTS ( SELECT 1 FROM manga_sources ms WHERE ms.manga_id = m.id AND ms.source_id = cj.payload->>'source_id' AND ms.source_manga_key = cj.payload->>'source_manga_key' )) ) ) THEN 'in_progress' WHEN EXISTS (SELECT 1 FROM manga_sources ms WHERE ms.manga_id = m.id) AND NOT EXISTS ( SELECT 1 FROM manga_sources ms WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL ) THEN 'dropped' ELSE 'synced' END "#; /// Paginated admin manga list with derived sync state and total count. /// Filters by `search` (substring on title, case-insensitive) and /// `sync_state` (post-derivation). The CTE keeps the case expression /// in one place — the same projection feeds both the page rows and the /// totals count under the same filter. pub async fn list_mangas_with_sync_state( pool: &PgPool, q: &ListAdminMangasQuery, ) -> AppResult<(Vec, i64)> { let search_pat = q .search .as_ref() .map(|s| format!("%{}%", s.trim())) .filter(|p| p.len() > 2); // sqlx::Type → text: bind the snake_case representation manually so // the SQL can compare it as text without an explicit cast. let sync_filter = q.sync_state.map(|s| match s { MangaSyncState::InProgress => "in_progress", MangaSyncState::Dropped => "dropped", MangaSyncState::Synced => "synced", }); let sql = format!( r#" WITH classified AS ( SELECT m.id, m.title, m.status, m.cover_image_path, m.created_at, m.updated_at, {case} AS sync_state, (SELECT COUNT(*) FROM chapters c WHERE c.manga_id = m.id) AS chapter_count, (SELECT MAX(last_seen_at) FROM manga_sources ms WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL) AS latest_seen_at FROM mangas m WHERE ($1::text IS NULL OR m.title ILIKE $1) ) SELECT * FROM classified WHERE ($2::text IS NULL OR sync_state = $2) ORDER BY updated_at DESC LIMIT $3 OFFSET $4 "#, case = MANGA_SYNC_STATE_CASE ); let items: Vec = sqlx::query_as(&sql) .bind(&search_pat) .bind(sync_filter) .bind(q.limit) .bind(q.offset) .fetch_all(pool) .await?; let total_sql = format!( r#" WITH classified AS ( SELECT {case} AS sync_state FROM mangas m WHERE ($1::text IS NULL OR m.title ILIKE $1) ) SELECT COUNT(*) FROM classified WHERE ($2::text IS NULL OR sync_state = $2) "#, case = MANGA_SYNC_STATE_CASE ); let total: i64 = sqlx::query_scalar(&total_sql) .bind(&search_pat) .bind(sync_filter) .fetch_one(pool) .await?; Ok((items, total)) } #[derive(Debug, Serialize, FromRow)] pub struct AdminChapterRow { pub id: Uuid, pub manga_id: Uuid, pub number: i32, pub title: Option, pub page_count: i32, pub created_at: DateTime, pub sync_state: ChapterSyncState, pub latest_seen_at: Option>, } #[derive(Debug, Default)] pub struct ListAdminChaptersQuery { pub manga_id: Uuid, pub limit: i64, pub offset: i64, } /// Paginated chapter list with derived sync state. Pagination is non- /// optional — long-runners can have thousands of chapters and the /// per-row scalar subqueries make the unbounded variant a real /// stall risk even behind an admin guard. Returns the page slice plus /// the unfiltered total so the UI can render "showing N of M". pub async fn list_chapters_with_sync_state( pool: &PgPool, q: &ListAdminChaptersQuery, ) -> AppResult<(Vec, i64)> { let items: Vec = sqlx::query_as( r#" SELECT c.id, c.manga_id, c.number, c.title, c.page_count, c.created_at, CASE WHEN EXISTS ( SELECT 1 FROM crawler_jobs cj WHERE cj.state IN ('pending','running') AND cj.payload->>'kind' = 'sync_chapter_content' AND (cj.payload->>'chapter_id')::uuid = c.id ) THEN 'downloading' WHEN EXISTS (SELECT 1 FROM chapter_sources cs WHERE cs.chapter_id = c.id) AND NOT EXISTS ( SELECT 1 FROM chapter_sources cs WHERE cs.chapter_id = c.id AND cs.dropped_at IS NULL ) THEN 'dropped' WHEN c.page_count = 0 AND EXISTS ( SELECT 1 FROM crawler_jobs cj WHERE cj.state = 'dead' AND cj.payload->>'kind' = 'sync_chapter_content' AND (cj.payload->>'chapter_id')::uuid = c.id ) THEN 'failed' WHEN c.page_count = 0 THEN 'not_downloaded' ELSE 'synced' END AS sync_state, (SELECT MAX(last_seen_at) FROM chapter_sources cs WHERE cs.chapter_id = c.id AND cs.dropped_at IS NULL) AS latest_seen_at FROM chapters c WHERE c.manga_id = $1 ORDER BY c.number ASC LIMIT $2 OFFSET $3 "#, ) .bind(q.manga_id) .bind(q.limit) .bind(q.offset) .fetch_all(pool) .await?; let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM chapters WHERE manga_id = $1") .bind(q.manga_id) .fetch_one(pool) .await?; Ok((items, total)) }