From e02d125f513cf9990cd59e1189ffa3b5a6606f1e Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Thu, 4 Jun 2026 20:41:51 +0200 Subject: [PATCH] feat(crawler): live cover + chapter-content observability with realtime page counts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends the live dashboard so an operator can see exactly what's being fetched, in realtime: - Chapters being crawled now are tracked in the status as `active_chapters` (manga title · ch.N) with a live page counter that climbs per stored page (set_chapter_pages, pushed via the existing watch→SSE). The dispatcher registers each via an RAII ChapterGuard (sync Mutex) that removes the entry on completion, panic, or timeout-drop — replacing the old per-worker slot model. - Covers: status now carries the cover being fetched now (`current_cover`, set around download_and_store_cover in both the metadata pass and backfill) and a `covers_queued` backlog count; CoverBackfill phase gains index/total. - Two paginated backlog endpoints (fetched on demand, auto-refreshed when the live counts change): GET /admin/crawler/active-jobs (which chapters of which mangas are queued/running) and GET /admin/crawler/covers (mangas missing a cover). repo: list_active_jobs, list_missing_cover_mangas, count_missing_covers. - dispatch_target now also returns manga title + chapter number. Frontend: the crawler page replaces the Workers table with an Active-chapters table (live page bars), adds a current-cover line + covers-queued figure, and two backlog sections (Queued chapters / Queued covers) with search + Pager, auto-refetched via $effect on the live counts. Tests: status guard/page + cover unit tests; repo list/count tests; endpoint tests; frontend api tests. Version 0.53.1 -> 0.54.0. Co-Authored-By: Claude Opus 4.8 --- backend/Cargo.lock | 2 +- backend/Cargo.toml | 2 +- backend/src/api/admin/crawler.rs | 72 ++++- backend/src/app.rs | 25 +- backend/src/bin/crawler.rs | 2 + backend/src/crawler/content.rs | 17 +- backend/src/crawler/daemon.rs | 18 +- backend/src/crawler/pipeline.rs | 46 ++- backend/src/crawler/resync.rs | 4 +- backend/src/crawler/status.rs | 265 ++++++++++++++---- backend/src/repo/chapter.rs | 8 +- backend/src/repo/crawler.rs | 149 ++++++++++ backend/tests/api_admin_crawler.rs | 96 +++++++ backend/tests/crawler_dead_jobs.rs | 86 ++++++ backend/tests/repo_chapter.rs | 4 +- frontend/package.json | 2 +- frontend/src/lib/api/admin.test.ts | 43 ++- frontend/src/lib/api/admin.ts | 65 ++++- .../src/routes/admin/crawler/+page.svelte | 224 +++++++++++++-- 19 files changed, 1005 insertions(+), 125 deletions(-) diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 92937d9..2028f1b 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "mangalord" -version = "0.53.1" +version = "0.54.0" dependencies = [ "anyhow", "argon2", diff --git a/backend/Cargo.toml b/backend/Cargo.toml index ecdaedd..a670052 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mangalord" -version = "0.53.1" +version = "0.54.0" edition = "2021" default-run = "mangalord" diff --git a/backend/src/api/admin/crawler.rs b/backend/src/api/admin/crawler.rs index c3e8ad4..6e3f248 100644 --- a/backend/src/api/admin/crawler.rs +++ b/backend/src/api/admin/crawler.rs @@ -21,10 +21,10 @@ use uuid::Uuid; use crate::app::{AppState, CrawlerControl}; use crate::auth::extractor::RequireAdmin; use crate::crawler::browser_manager::RestartPhase; -use crate::crawler::status::{LastPass, Phase, WorkerState}; +use crate::crawler::status::{ActiveChapter, CoverTarget, LastPass, Phase}; use crate::error::{AppError, AppResult}; use crate::repo; -use crate::repo::crawler::{DeadJob, RequeueScope}; +use crate::repo::crawler::{ActiveJob, DeadJob, MissingCoverRow, RequeueScope}; /// Backstop recompose interval for the SSE stream. Phase/worker/session /// changes push instantly via the status `watch`; this only bounds the @@ -45,6 +45,8 @@ pub fn routes() -> Router { ) .route("/admin/crawler/dead-jobs", get(list_dead_jobs)) .route("/admin/crawler/dead-jobs/requeue", post(requeue_dead_jobs)) + .route("/admin/crawler/active-jobs", get(list_active_jobs)) + .route("/admin/crawler/covers", get(list_covers)) } // --------------------------------------------------------------------------- @@ -71,7 +73,14 @@ struct CrawlerStatusResponse { /// `"running"` | `"disabled"`. daemon: &'static str, phase: Option, - workers: Vec, + /// Configured chapter-worker count (for "N busy / M workers"). + worker_count: usize, + /// Chapters being crawled right now, with live page counts. + active_chapters: Vec, + /// The cover being fetched right now, if any. + current_cover: Option, + /// Mangas still queued for a cover fetch. + covers_queued: i64, last_pass: LastPass, session: SessionStatus, /// `"healthy"` | `"draining"` | `"restarting"` | `"down"`. @@ -97,12 +106,16 @@ async fn compose_status(state: &AppState) -> AppResult { running, dead, }; + let covers_queued = repo::crawler::count_missing_covers(&state.db).await?; Ok(match state.crawler.as_ref() { None => CrawlerStatusResponse { daemon: "disabled", phase: None, - workers: Vec::new(), + worker_count: 0, + active_chapters: Vec::new(), + current_cover: None, + covers_queued, last_pass: LastPass::default(), session: SessionStatus { expired: false, @@ -116,7 +129,10 @@ async fn compose_status(state: &AppState) -> AppResult { CrawlerStatusResponse { daemon: "running", phase: Some(snap.phase), - workers: snap.workers, + worker_count: snap.worker_count, + active_chapters: snap.active_chapters, + current_cover: snap.current_cover, + covers_queued, last_pass: snap.last_pass, session: SessionStatus { expired: c.session.is_expired(), @@ -420,6 +436,52 @@ fn scope_label(r: &RequeueRequest) -> &'static str { } } +// --------------------------------------------------------------------------- +// Queued-chapters + queued-covers backlogs (paginated, fetched on demand) +// --------------------------------------------------------------------------- + +/// Pagination + title-search params shared by the backlog list endpoints. +#[derive(Debug, Deserialize, Default)] +struct ListParams { + #[serde(default)] + search: Option, + #[serde(default = "default_limit")] + limit: i64, + #[serde(default)] + offset: i64, +} + +async fn list_active_jobs( + State(state): State, + _admin: RequireAdmin, + Query(params): Query, +) -> AppResult>> { + let limit = params.limit.clamp(1, 200); + let offset = params.offset.max(0); + let search = params.search.filter(|s| !s.trim().is_empty()); + let (items, total) = + repo::crawler::list_active_jobs(&state.db, search.as_deref(), limit, offset).await?; + Ok(Json(crate::api::pagination::PagedResponse::with_total( + items, limit, offset, total, + ))) +} + +async fn list_covers( + State(state): State, + _admin: RequireAdmin, + Query(params): Query, +) -> AppResult>> { + let limit = params.limit.clamp(1, 200); + let offset = params.offset.max(0); + let search = params.search.filter(|s| !s.trim().is_empty()); + let (items, total) = + repo::crawler::list_missing_cover_mangas(&state.db, search.as_deref(), limit, offset) + .await?; + Ok(Json(crate::api::pagination::PagedResponse::with_total( + items, limit, offset, total, + ))) +} + // --------------------------------------------------------------------------- fn require_crawler(state: &AppState) -> Result<&std::sync::Arc, AppError> { diff --git a/backend/src/app.rs b/backend/src/app.rs index a3be869..eae348d 100644 --- a/backend/src/app.rs +++ b/backend/src/app.rs @@ -276,6 +276,7 @@ async fn spawn_crawler_daemon( transient_failures: Arc::new(AtomicU32::new(0)), restart_threshold: cfg.browser_restart_threshold, drain_deadline: cfg.job_timeout, + status: status.clone(), tor: tor.as_ref().map(Arc::clone), }); @@ -389,10 +390,8 @@ impl MetadataPass for RealMetadataPass { // errored — the early-stop walk can complete its work and bail // late, and a transient browser failure shouldn't cancel the // residual cover backlog. The backfill has its own per-call cap - // so a runaway error stream can't monopolise the tick. - self.status - .set_phase(crate::crawler::status::Phase::CoverBackfill) - .await; + // so a runaway error stream can't monopolise the tick. It sets the + // CoverBackfill{index,total} phase + current_cover per entry. match pipeline::backfill_missing_covers( &self.browser_manager, &self.db, @@ -402,6 +401,7 @@ impl MetadataPass for RealMetadataPass { pipeline::COVER_BACKFILL_DEFAULT_MAX, &self.download_allowlist, self.max_image_bytes, + Some(&self.status), self.tor.as_deref(), ) .await @@ -437,6 +437,9 @@ struct RealChapterDispatcher { restart_threshold: u32, /// How long a coordinated restart waits for in-flight leases to drain. drain_deadline: std::time::Duration, + /// Live status surface — the dispatcher registers each chapter it + /// crawls (with a realtime page count) here. + status: crate::crawler::status::StatusHandle, tor: Option>, } @@ -452,10 +455,21 @@ impl ChapterDispatcher for RealChapterDispatcher { let row = repo::chapter::dispatch_target(&self.db, chapter_id) .await .context("look up chapter for dispatch")?; - let Some((manga_id, source_url)) = row else { + let Some((manga_id, source_url, manga_title, chapter_number)) = row else { // Chapter (or its source row) is gone — ack done. return Ok(SyncOutcome::Skipped); }; + // Register the chapter as crawling now (live status). The + // guard removes it on every exit path — success, panic, or + // the worker's outer-timeout drop. + let _active = self.status.begin_chapter(crate::crawler::status::ActiveChapter { + manga_id, + manga_title, + chapter_id, + chapter_number, + pages_done: 0, + pages_total: None, + }); let lease = self.browser_manager.acquire().await?; let result = content::sync_chapter_content( &lease, @@ -470,6 +484,7 @@ impl ChapterDispatcher for RealChapterDispatcher { &self.download_allowlist, self.max_image_bytes, self.tor.as_deref(), + Some(&self.status), ) .await; drop(lease); diff --git a/backend/src/bin/crawler.rs b/backend/src/bin/crawler.rs index 730c1e9..f36b3b8 100644 --- a/backend/src/bin/crawler.rs +++ b/backend/src/bin/crawler.rs @@ -417,6 +417,8 @@ async fn sync_bookmarked_chapter_content( allowlist.as_ref(), max_image_bytes, tor.as_deref(), + // CLI one-shot — no live status surface. + None, ) .await; drop(lease); diff --git a/backend/src/crawler/content.rs b/backend/src/crawler/content.rs index 4039469..8992102 100644 --- a/backend/src/crawler/content.rs +++ b/backend/src/crawler/content.rs @@ -206,6 +206,10 @@ pub async fn sync_chapter_content( allowlist: &DownloadAllowlist, max_image_bytes: usize, tor: Option<&crate::crawler::tor::TorController>, + // Optional live-status sink for the realtime page counter. The daemon + // dispatcher passes the shared handle (the chapter has already been + // registered via `begin_chapter`); the CLI / admin resync pass `None`. + progress: Option<&crate::crawler::status::StatusHandle>, ) -> anyhow::Result { // Skip if already fetched, unless caller explicitly forces. if !force_refetch { @@ -267,8 +271,13 @@ pub async fn sync_chapter_content( // final DB commit) fails — preserving the all-or-nothing guarantee // without holding a DB transaction open across the network puts // (which matters once `Storage` is backed by S3). - let mut written_keys: Vec = Vec::with_capacity(images.len()); - let mut stored: Vec = Vec::with_capacity(images.len()); + let total = images.len(); + // Publish the now-known page total so the dashboard shows "0/N". + if let Some(p) = progress { + p.set_chapter_pages(chapter_id, 0, Some(total)); + } + let mut written_keys: Vec = Vec::with_capacity(total); + let mut stored: Vec = Vec::with_capacity(total); for img in &images { match download_and_store_page( storage, @@ -287,6 +296,10 @@ pub async fn sync_chapter_content( Ok(page) => { written_keys.push(page.storage_key.clone()); stored.push(page); + // Live page counter: push the climbing count to subscribers. + if let Some(p) = progress { + p.set_chapter_pages(chapter_id, stored.len(), Some(total)); + } } Err(e) => { cleanup_orphans(storage, &written_keys).await; diff --git a/backend/src/crawler/daemon.rs b/backend/src/crawler/daemon.rs index 2120a3f..2c808d9 100644 --- a/backend/src/crawler/daemon.rs +++ b/backend/src/crawler/daemon.rs @@ -48,7 +48,7 @@ use tokio_util::sync::CancellationToken; use crate::crawler::content::SyncOutcome; use crate::crawler::jobs::{self, JobPayload, Lease, KIND_SYNC_CHAPTER_CONTENT}; use crate::crawler::pipeline; -use crate::crawler::status::{Phase, StatusHandle, WorkerState}; +use crate::crawler::status::{Phase, StatusHandle}; /// Fixed `pg_try_advisory_lock` key. ASCII "MANGALRD" interpreted as a /// big-endian i64. Hardcoded so every replica agrees on the lock identity @@ -396,17 +396,10 @@ impl WorkerContext { }) }; - // Publish what this worker is doing for the live status surface. - if let JobPayload::SyncChapterContent { chapter_id, .. } = &lease.payload { - self.status - .set_worker( - self.id, - WorkerState::Working { - chapter_id: *chapter_id, - }, - ) - .await; - } + // The "currently crawling" chapter (with its live page count) is + // registered by the dispatcher itself (RealChapterDispatcher) so it + // carries the manga/chapter identity + page progress and is removed + // via an RAII guard on every exit path. // Outer timeout: a dispatch that exceeds `job_timeout` is acked // failed (exponential backoff) rather than wedging the worker. @@ -414,7 +407,6 @@ impl WorkerContext { .catch_unwind(); let outcome = tokio::time::timeout(self.job_timeout, dispatch).await; heartbeat.abort(); - self.status.set_worker(self.id, WorkerState::Idle).await; let outcome = match outcome { Ok(o) => o, diff --git a/backend/src/crawler/pipeline.rs b/backend/src/crawler/pipeline.rs index 6341c96..338359e 100644 --- a/backend/src/crawler/pipeline.rs +++ b/backend/src/crawler/pipeline.rs @@ -349,7 +349,14 @@ pub async fn run_metadata_pass( || matches!(upsert.status, repo::crawler::UpsertStatus::Updated); if needs_cover { if let Some(cover_url) = manga.cover_url.as_deref() { - match download_and_store_cover( + if let Some(s) = status { + s.set_current_cover(Some(crate::crawler::status::CoverTarget { + manga_id: upsert.manga_id, + manga_title: manga.title.clone(), + })) + .await; + } + let cover_result = download_and_store_cover( db, storage, http, @@ -360,8 +367,11 @@ pub async fn run_metadata_pass( allowlist, max_image_bytes, ) - .await - { + .await; + if let Some(s) = status { + s.set_current_cover(None).await; + } + match cover_result { Ok(()) => stats.covers_fetched += 1, Err(e) => tracing::warn!( manga_id = %upsert.manga_id, @@ -615,6 +625,7 @@ pub async fn backfill_missing_covers( max_mangas: usize, allowlist: &DownloadAllowlist, max_image_bytes: usize, + status: Option<&crate::crawler::status::StatusHandle>, tor: Option<&crate::crawler::tor::TorController>, ) -> anyhow::Result { let mut stats = CoverBackfillStats::default(); @@ -637,8 +648,13 @@ pub async fn backfill_missing_covers( let browser_ref: &chromiumoxide::Browser = &lease; let ctx = FetchContext { browser: browser_ref, rate, tor }; - for entry in entries { + let total = entries.len(); + for (index, entry) in entries.into_iter().enumerate() { stats.considered += 1; + if let Some(s) = status { + s.set_phase(crate::crawler::status::Phase::CoverBackfill { index, total }) + .await; + } // Metadata-only TargetSource: skip chapter-list parsing so a // missing-cover refetch doesn't soft-drop chapters on a partial // render. Cover URL alone is what we need. @@ -648,8 +664,8 @@ pub async fn backfill_missing_covers( title: String::new(), url: entry.source_url.clone(), }; - let cover_url = match source.fetch_manga(&ctx, &r).await { - Ok(manga) => manga.cover_url, + let manga = match source.fetch_manga(&ctx, &r).await { + Ok(manga) => manga, Err(e) => { tracing::warn!( manga_id = %entry.manga_id, @@ -661,7 +677,7 @@ pub async fn backfill_missing_covers( continue; } }; - let Some(cover_url) = cover_url else { + let Some(cover_url) = manga.cover_url.clone() else { tracing::warn!( manga_id = %entry.manga_id, url = %entry.source_url, @@ -670,7 +686,14 @@ pub async fn backfill_missing_covers( stats.failed += 1; continue; }; - match download_and_store_cover( + if let Some(s) = status { + s.set_current_cover(Some(crate::crawler::status::CoverTarget { + manga_id: entry.manga_id, + manga_title: manga.title.clone(), + })) + .await; + } + let cover_result = download_and_store_cover( db, storage, http, @@ -681,8 +704,11 @@ pub async fn backfill_missing_covers( allowlist, max_image_bytes, ) - .await - { + .await; + if let Some(s) = status { + s.set_current_cover(None).await; + } + match cover_result { Ok(()) => stats.fetched += 1, Err(e) => { tracing::warn!( diff --git a/backend/src/crawler/resync.rs b/backend/src/crawler/resync.rs index 1e77333..f964c93 100644 --- a/backend/src/crawler/resync.rs +++ b/backend/src/crawler/resync.rs @@ -235,7 +235,7 @@ impl ResyncService for RealResyncService { let row = repo::chapter::dispatch_target(&self.db, chapter_id) .await .context("look up chapter_sources for resync")?; - let Some((manga_id, source_url)) = row else { + let Some((manga_id, source_url, _title, _number)) = row else { return Err(ResyncError::NoChapterSource.into()); }; @@ -257,6 +257,8 @@ impl ResyncService for RealResyncService { &self.download_allowlist, self.max_image_bytes, self.tor.as_deref(), + // Admin resync isn't a daemon worker slot — no live status. + None, ) .await; drop(lease); diff --git a/backend/src/crawler/status.rs b/backend/src/crawler/status.rs index 077676e..c0db8e1 100644 --- a/backend/src/crawler/status.rs +++ b/backend/src/crawler/status.rs @@ -3,16 +3,17 @@ //! The metadata pass runs inline in the cron tick (it is not a //! `crawler_jobs` row), so without this surface "what is the crawler doing //! right now" is unanswerable from the dashboard. The daemon publishes its -//! current [`Phase`] and per-worker activity into a shared -//! [`StatusHandle`]; the admin endpoint reads a [`CrawlerStatus`] snapshot -//! and composes it with DB-derived queue counts + the session/browser -//! flags. +//! current [`Phase`], the chapters being crawled right now (with a live +//! page count), and the cover being fetched into a shared [`StatusHandle`]; +//! the admin endpoint reads a [`CrawlerStatus`] snapshot and composes it +//! with DB-derived counts + the session/browser flags. //! //! NOTE: this is per-process state. The deployment is a single server //! (see CLAUDE.md), so an in-memory handle is sufficient; durable signals //! (last-pass summary, runtime session) are persisted in `crawler_state`. -use std::sync::Arc; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use chrono::{DateTime, Utc}; use serde::Serialize; @@ -21,8 +22,8 @@ use uuid::Uuid; use crate::crawler::pipeline::MetadataStats; -/// What the daemon is doing right now. Serialised with an internal `state` -/// tag so the frontend can switch on it. +/// What the daemon's metadata pass is doing right now. Serialised with an +/// internal `state` tag so the frontend can switch on it. #[derive(Clone, Debug, Serialize)] #[serde(tag = "state", rename_all = "snake_case")] pub enum Phase { @@ -37,18 +38,30 @@ pub enum Phase { total: Option, title: String, }, - /// Backfilling covers that failed on first attempt. - CoverBackfill, + /// Backfilling covers that failed on first attempt. `index`/`total` + /// track progress through this tick's batch. + CoverBackfill { index: usize, total: usize }, } -/// Per-worker activity. (Browser-restart and session-expired states are -/// surfaced as separate top-level fields by the endpoint, sourced from the -/// `BrowserManager` phase and the session flag respectively.) +/// A chapter being downloaded right now, with a live page count. Keyed in +/// the status by `chapter_id`; inserted by the dispatcher when a job starts +/// and removed (via an RAII guard) when it finishes, panics, or times out. #[derive(Clone, Debug, Serialize)] -#[serde(tag = "state", rename_all = "snake_case")] -pub enum WorkerState { - Idle, - Working { chapter_id: Uuid }, +pub struct ActiveChapter { + pub manga_id: Uuid, + pub manga_title: String, + pub chapter_id: Uuid, + pub chapter_number: i32, + pub pages_done: usize, + /// `None` until the chapter page list has been parsed. + pub pages_total: Option, +} + +/// The manga whose cover is being downloaded right now. +#[derive(Clone, Debug, Serialize)] +pub struct CoverTarget { + pub manga_id: Uuid, + pub manga_title: String, } /// Summary of the most recent metadata pass (persisted across restarts in @@ -62,24 +75,30 @@ pub struct LastPass { pub mangas_failed: usize, } -/// The mutable slice of status the daemon owns. Session/browser/queue are -/// composed at read time by the endpoint (they live elsewhere), so they -/// are not stored here. +/// A point-in-time snapshot returned by [`StatusHandle::snapshot`]. The +/// session/browser/queue fields are composed at read time by the endpoint +/// (they live elsewhere), so they are not stored here. #[derive(Clone, Debug, Serialize)] pub struct CrawlerStatus { pub phase: Phase, - pub workers: Vec, + /// Number of configured chapter workers (for "N busy / M workers"). + pub worker_count: usize, + /// Chapters being downloaded right now, with live page counts. + pub active_chapters: Vec, pub last_pass: LastPass, + /// The cover being downloaded right now, if any. + pub current_cover: Option, } -impl CrawlerStatus { - fn new(num_workers: usize) -> Self { - Self { - phase: Phase::Idle { next_fire: None }, - workers: (0..num_workers.max(1)).map(|_| WorkerState::Idle).collect(), - last_pass: LastPass::default(), - } - } +/// Scalar status state held under the async `RwLock`. Active chapters live +/// in a separate sync map so per-page updates and RAII removal don't need +/// to `.await` (removal happens in `Drop`). +#[derive(Clone, Debug)] +struct Scalar { + phase: Phase, + worker_count: usize, + last_pass: LastPass, + current_cover: Option, } /// Cloneable handle the daemon tasks use to publish status. Cheap to clone @@ -88,18 +107,35 @@ impl CrawlerStatus { /// get pushed an update instead of polling. #[derive(Clone)] pub struct StatusHandle { - inner: Arc>, - /// Monotonic version bumped on every change. SSE handlers - /// `subscribe()` and `await .changed()` for instant pushes; `watch` - /// has no lost-wakeup so a change between snapshots is never missed. + scalar: Arc>, + /// Currently-downloading chapters keyed by `chapter_id`. A sync mutex so + /// the RAII [`ChapterGuard`]'s `Drop` can remove without `.await`. + active: Arc>>, + /// Monotonic version bumped on every change. SSE handlers `subscribe()` + /// and `await .changed()` for instant pushes; `watch` has no + /// lost-wakeup so a change between snapshots is never missed. version: Arc>, } +/// Lock the active map, recovering from a poisoned mutex (we never hold the +/// lock across a panic-prone section, so the data is still consistent). +fn lock_active( + m: &Mutex>, +) -> std::sync::MutexGuard<'_, HashMap> { + m.lock().unwrap_or_else(|e| e.into_inner()) +} + impl StatusHandle { pub fn new(num_workers: usize) -> Self { let (version, _rx) = watch::channel(0u64); Self { - inner: Arc::new(RwLock::new(CrawlerStatus::new(num_workers))), + scalar: Arc::new(RwLock::new(Scalar { + phase: Phase::Idle { next_fire: None }, + worker_count: num_workers.max(1), + last_pass: LastPass::default(), + current_cover: None, + })), + active: Arc::new(Mutex::new(HashMap::new())), version: Arc::new(version), } } @@ -122,15 +158,37 @@ impl StatusHandle { } pub async fn set_phase(&self, phase: Phase) { - self.inner.write().await.phase = phase; + self.scalar.write().await.phase = phase; self.bump(); } - pub async fn set_worker(&self, id: usize, state: WorkerState) { + /// Set (or clear) the cover being downloaded right now. + pub async fn set_current_cover(&self, cover: Option) { + self.scalar.write().await.current_cover = cover; + self.bump(); + } + + /// Register a chapter as crawling now; returns a guard that removes it + /// when dropped (on completion, panic-unwind, or timeout-drop). + pub fn begin_chapter(&self, chapter: ActiveChapter) -> ChapterGuard { + let id = chapter.chapter_id; + lock_active(&self.active).insert(id, chapter); + self.bump(); + ChapterGuard { + active: Arc::clone(&self.active), + version: Arc::clone(&self.version), + chapter_id: id, + } + } + + /// Update the live page count of an in-flight chapter. Sync (no + /// `.await`) so it's cheap to call once per stored page. + pub fn set_chapter_pages(&self, chapter_id: Uuid, done: usize, total: Option) { { - let mut s = self.inner.write().await; - if let Some(slot) = s.workers.get_mut(id) { - *slot = state; + let mut map = lock_active(&self.active); + if let Some(c) = map.get_mut(&chapter_id) { + c.pages_done = done; + c.pages_total = total; } } self.bump(); @@ -138,28 +196,55 @@ impl StatusHandle { /// Record a finished metadata pass. Stamps `at` with `now`. pub async fn record_pass(&self, stats: &MetadataStats, at: DateTime) { - { - let mut s = self.inner.write().await; - s.last_pass = LastPass { - at: Some(at), - discovered: stats.discovered, - upserted: stats.upserted, - covers_fetched: stats.covers_fetched, - mangas_failed: stats.mangas_failed, - }; - } + self.scalar.write().await.last_pass = LastPass { + at: Some(at), + discovered: stats.discovered, + upserted: stats.upserted, + covers_fetched: stats.covers_fetched, + mangas_failed: stats.mangas_failed, + }; self.bump(); } /// Seed the last-pass summary from a persisted `crawler_state` value on /// startup so the dashboard isn't blank until the first tick. pub async fn set_last_pass(&self, last: LastPass) { - self.inner.write().await.last_pass = last; + self.scalar.write().await.last_pass = last; self.bump(); } pub async fn snapshot(&self) -> CrawlerStatus { - self.inner.read().await.clone() + let scalar = self.scalar.read().await.clone(); + let mut active_chapters: Vec = + lock_active(&self.active).values().cloned().collect(); + // Stable, readable order: by chapter number then id. + active_chapters.sort_by(|a, b| { + a.chapter_number + .cmp(&b.chapter_number) + .then(a.chapter_id.cmp(&b.chapter_id)) + }); + CrawlerStatus { + phase: scalar.phase, + worker_count: scalar.worker_count, + active_chapters, + last_pass: scalar.last_pass, + current_cover: scalar.current_cover, + } + } +} + +/// RAII handle removing an [`ActiveChapter`] from the live status when the +/// chapter dispatch finishes, panics, or is dropped on timeout. +pub struct ChapterGuard { + active: Arc>>, + version: Arc>, + chapter_id: Uuid, +} + +impl Drop for ChapterGuard { + fn drop(&mut self) { + lock_active(&self.active).remove(&self.chapter_id); + self.version.send_modify(|v| *v = v.wrapping_add(1)); } } @@ -167,16 +252,73 @@ impl StatusHandle { mod tests { use super::*; + fn sample_chapter(n: i32) -> ActiveChapter { + ActiveChapter { + manga_id: Uuid::new_v4(), + manga_title: "M".into(), + chapter_id: Uuid::new_v4(), + chapter_number: n, + pages_done: 0, + pages_total: None, + } + } + #[tokio::test] - async fn set_worker_updates_only_that_slot() { + async fn begin_chapter_shows_in_snapshot_and_guard_removes_on_drop() { let h = StatusHandle::new(2); - let cid = Uuid::new_v4(); - h.set_worker(1, WorkerState::Working { chapter_id: cid }).await; + let chap = sample_chapter(7); + let cid = chap.chapter_id; + { + let _guard = h.begin_chapter(chap); + let snap = h.snapshot().await; + assert_eq!(snap.active_chapters.len(), 1); + assert_eq!(snap.active_chapters[0].chapter_id, cid); + assert_eq!(snap.worker_count, 2); + } + // Guard dropped → entry removed. let snap = h.snapshot().await; - assert!(matches!(snap.workers[0], WorkerState::Idle)); - assert!(matches!(snap.workers[1], WorkerState::Working { .. })); - // Out-of-range id is a no-op, not a panic. - h.set_worker(99, WorkerState::Idle).await; + assert!(snap.active_chapters.is_empty()); + } + + #[tokio::test] + async fn set_chapter_pages_updates_live_count() { + let h = StatusHandle::new(1); + let chap = sample_chapter(1); + let cid = chap.chapter_id; + let _guard = h.begin_chapter(chap); + h.set_chapter_pages(cid, 3, Some(20)); + let snap = h.snapshot().await; + assert_eq!(snap.active_chapters[0].pages_done, 3); + assert_eq!(snap.active_chapters[0].pages_total, Some(20)); + // Updating an unknown chapter is a no-op, not a panic. + h.set_chapter_pages(Uuid::new_v4(), 9, Some(9)); + } + + #[tokio::test] + async fn snapshot_sorts_active_chapters_by_number() { + let h = StatusHandle::new(2); + let _g1 = h.begin_chapter(sample_chapter(5)); + let _g2 = h.begin_chapter(sample_chapter(2)); + let snap = h.snapshot().await; + assert_eq!(snap.active_chapters[0].chapter_number, 2); + assert_eq!(snap.active_chapters[1].chapter_number, 5); + } + + #[tokio::test] + async fn set_current_cover_round_trips() { + let h = StatusHandle::new(1); + let mid = Uuid::new_v4(); + h.set_current_cover(Some(CoverTarget { + manga_id: mid, + manga_title: "One Piece".into(), + })) + .await; + assert_eq!( + h.snapshot().await.current_cover.map(|c| c.manga_id), + Some(mid) + ); + h.set_current_cover(None).await; + assert!(h.snapshot().await.current_cover.is_none()); } #[tokio::test] @@ -197,14 +339,17 @@ mod tests { } #[tokio::test] - async fn subscribe_resolves_on_mutation_and_poke() { + async fn subscribe_resolves_on_mutation_poke_and_chapter_change() { let h = StatusHandle::new(1); let mut rx = h.subscribe(); - // A mutation wakes the subscriber. h.set_phase(Phase::WalkingList).await; rx.changed().await.unwrap(); - // A bare poke (external signal) also wakes it. h.poke(); rx.changed().await.unwrap(); + // begin_chapter + guard drop each bump the version. + let g = h.begin_chapter(sample_chapter(1)); + rx.changed().await.unwrap(); + drop(g); + rx.changed().await.unwrap(); } } diff --git a/backend/src/repo/chapter.rs b/backend/src/repo/chapter.rs index 7bd3fe7..cc1065a 100644 --- a/backend/src/repo/chapter.rs +++ b/backend/src/repo/chapter.rs @@ -138,14 +138,18 @@ pub async fn page_count(pool: &PgPool, id: Uuid) -> sqlx::Result> { /// filter — this resolver stays in lockstep so a chapter that was /// dropped between enqueue and lease isn't dispatched against a stale /// URL. +/// Returns `(manga_id, source_url, manga_title, chapter_number)`. The +/// title + number feed the live "currently crawling" status; the rest is +/// what the dispatcher needs to do the work. pub async fn dispatch_target( pool: &PgPool, chapter_id: Uuid, -) -> sqlx::Result> { +) -> sqlx::Result> { sqlx::query_as( - "SELECT c.manga_id, cs.source_url \ + "SELECT c.manga_id, cs.source_url, m.title, c.number \ FROM chapters c \ JOIN chapter_sources cs ON cs.chapter_id = c.id \ + JOIN mangas m ON m.id = c.manga_id \ WHERE c.id = $1 \ AND cs.dropped_at IS NULL \ ORDER BY cs.last_seen_at DESC \ diff --git a/backend/src/repo/crawler.rs b/backend/src/repo/crawler.rs index 84d3a43..8911152 100644 --- a/backend/src/repo/crawler.rs +++ b/backend/src/repo/crawler.rs @@ -699,6 +699,155 @@ pub async fn list_dead_jobs( Ok((items, total)) } +/// An in-flight chapter-content job (`pending` or `running`) joined to its +/// chapter + manga, for the "queued chapters" admin view. +#[derive(Debug, Clone, Serialize, FromRow)] +pub struct ActiveJob { + pub id: Uuid, + pub chapter_id: Option, + pub manga_id: Option, + pub manga_title: Option, + pub chapter_number: Option, + /// `"pending"` or `"running"`. + pub state: String, + pub attempts: i32, + pub max_attempts: i32, + pub updated_at: DateTime, +} + +/// Paginated list of `pending`/`running` chapter-content jobs (which +/// chapters of which mangas are queued or being crawled). Running first, +/// then by scheduled order. `search` filters on manga title. +pub async fn list_active_jobs( + pool: &PgPool, + search: Option<&str>, + limit: i64, + offset: i64, +) -> sqlx::Result<(Vec, i64)> { + let search_pat = search + .map(|s| format!("%{}%", s.trim())) + .filter(|p| p.len() > 2); + + let items: Vec = sqlx::query_as( + r#" + SELECT + cj.id, + (cj.payload->>'chapter_id')::uuid AS chapter_id, + c.manga_id AS manga_id, + m.title AS manga_title, + c.number AS chapter_number, + cj.state, + cj.attempts, + cj.max_attempts, + cj.updated_at + FROM crawler_jobs cj + LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid + LEFT JOIN mangas m ON m.id = c.manga_id + WHERE cj.state IN ('pending','running') + AND cj.payload->>'kind' = 'sync_chapter_content' + AND ($1::text IS NULL OR m.title ILIKE $1) + ORDER BY (cj.state = 'running') DESC, cj.scheduled_at, cj.created_at + LIMIT $2 OFFSET $3 + "#, + ) + .bind(&search_pat) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await?; + + let total: i64 = sqlx::query_scalar( + r#" + SELECT COUNT(*) + FROM crawler_jobs cj + LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid + LEFT JOIN mangas m ON m.id = c.manga_id + WHERE cj.state IN ('pending','running') + AND cj.payload->>'kind' = 'sync_chapter_content' + AND ($1::text IS NULL OR m.title ILIKE $1) + "#, + ) + .bind(&search_pat) + .fetch_one(pool) + .await?; + + Ok((items, total)) +} + +/// A manga whose cover is still missing (queued for cover fetch). +#[derive(Debug, Clone, Serialize, FromRow)] +pub struct MissingCoverRow { + pub manga_id: Uuid, + pub manga_title: String, +} + +/// Count mangas with no cover yet but a live source row — the cover +/// backlog the metadata pass + backfill drain. +pub async fn count_missing_covers(pool: &PgPool) -> sqlx::Result { + sqlx::query_scalar( + r#" + SELECT COUNT(*) FROM mangas m + WHERE m.cover_image_path IS NULL + AND EXISTS ( + SELECT 1 FROM manga_sources ms + WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL + ) + "#, + ) + .fetch_one(pool) + .await +} + +/// Paginated list of mangas queued for a cover fetch (no cover yet + a live +/// source), with titles. `search` filters on title. Freshest source first. +pub async fn list_missing_cover_mangas( + pool: &PgPool, + search: Option<&str>, + limit: i64, + offset: i64, +) -> sqlx::Result<(Vec, i64)> { + let search_pat = search + .map(|s| format!("%{}%", s.trim())) + .filter(|p| p.len() > 2); + + let items: Vec = sqlx::query_as( + r#" + SELECT m.id AS manga_id, m.title AS manga_title + FROM mangas m + WHERE m.cover_image_path IS NULL + AND EXISTS ( + SELECT 1 FROM manga_sources ms + WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL + ) + AND ($1::text IS NULL OR m.title ILIKE $1) + ORDER BY m.updated_at DESC + LIMIT $2 OFFSET $3 + "#, + ) + .bind(&search_pat) + .bind(limit) + .bind(offset) + .fetch_all(pool) + .await?; + + let total: i64 = sqlx::query_scalar( + r#" + SELECT COUNT(*) FROM mangas m + WHERE m.cover_image_path IS NULL + AND EXISTS ( + SELECT 1 FROM manga_sources ms + WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL + ) + AND ($1::text IS NULL OR m.title ILIKE $1) + "#, + ) + .bind(&search_pat) + .fetch_one(pool) + .await?; + + Ok((items, total)) +} + /// Scope of a dead-job requeue. #[derive(Debug, Clone)] pub enum RequeueScope { diff --git a/backend/tests/api_admin_crawler.rs b/backend/tests/api_admin_crawler.rs index d1b8b9f..2f0caaf 100644 --- a/backend/tests/api_admin_crawler.rs +++ b/backend/tests/api_admin_crawler.rs @@ -64,6 +64,102 @@ async fn seed_dead_job(pool: &PgPool, title: &str) -> Uuid { job_id } +/// Seed a chapter-content job in a given state ('pending'/'running'). +async fn seed_job(pool: &PgPool, title: &str, state: &str) { + let manga_id = Uuid::new_v4(); + let chapter_id = Uuid::new_v4(); + sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, $2)") + .bind(manga_id) + .bind(title) + .execute(pool) + .await + .unwrap(); + sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)") + .bind(chapter_id) + .bind(manga_id) + .execute(pool) + .await + .unwrap(); + sqlx::query("INSERT INTO crawler_jobs (id, payload, state) VALUES ($1, $2, $3)") + .bind(Uuid::new_v4()) + .bind(json!({ + "kind": "sync_chapter_content", + "source_id": "target", + "chapter_id": chapter_id, + "source_chapter_key": "k", + })) + .bind(state) + .execute(pool) + .await + .unwrap(); +} + +/// Seed a manga with no cover + a live source row (queued for cover fetch). +async fn seed_missing_cover(pool: &PgPool, title: &str) { + let manga_id = Uuid::new_v4(); + sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, $2, NULL)") + .bind(manga_id) + .bind(title) + .execute(pool) + .await + .unwrap(); + sqlx::query("INSERT INTO sources (id, name, base_url) VALUES ('target','T','http://x') ON CONFLICT DO NOTHING") + .execute(pool) + .await + .unwrap(); + sqlx::query( + "INSERT INTO manga_sources (source_id, source_manga_key, manga_id, source_url) \ + VALUES ('target', $1, $2, 'http://x/m')", + ) + .bind(format!("k-{manga_id}")) + .bind(manga_id) + .execute(pool) + .await + .unwrap(); +} + +#[sqlx::test(migrations = "./migrations")] +async fn active_jobs_and_covers_lists_over_http(pool: PgPool) { + seed_job(&pool, "Naruto", "pending").await; + seed_job(&pool, "Bleach", "running").await; + seed_missing_cover(&pool, "One Piece").await; + let h = harness(pool.clone()); + let cookie = seed_admin(&pool, &h.app).await; + + // Queued/active chapters. + let resp = h + .app + .clone() + .oneshot(get_with_cookie("/api/v1/admin/crawler/active-jobs", &cookie)) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = body_json(resp).await; + assert_eq!(body["page"]["total"], 2); + + // Queued covers. + let resp = h + .app + .clone() + .oneshot(get_with_cookie("/api/v1/admin/crawler/covers", &cookie)) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = body_json(resp).await; + assert_eq!(body["page"]["total"], 1); + assert_eq!(body["items"][0]["manga_title"], "One Piece"); + + // Both are admin-gated. + let (_u, plain) = register_user(&h.app).await; + let resp = h + .app + .clone() + .oneshot(get_with_cookie("/api/v1/admin/crawler/active-jobs", &plain)) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::FORBIDDEN); +} + #[sqlx::test(migrations = "./migrations")] async fn get_status_requires_admin(pool: PgPool) { let h = harness(pool); diff --git a/backend/tests/crawler_dead_jobs.rs b/backend/tests/crawler_dead_jobs.rs index 8e32095..5972f4f 100644 --- a/backend/tests/crawler_dead_jobs.rs +++ b/backend/tests/crawler_dead_jobs.rs @@ -7,6 +7,32 @@ use serde_json::json; use sqlx::PgPool; use uuid::Uuid; +/// Seed a manga with no cover + a live source row (so it's "queued for a +/// cover fetch"). Returns the manga id. +async fn seed_missing_cover(pool: &PgPool, title: &str) -> Uuid { + let manga_id = Uuid::new_v4(); + sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, $2, NULL)") + .bind(manga_id) + .bind(title) + .execute(pool) + .await + .unwrap(); + sqlx::query("INSERT INTO sources (id, name, base_url) VALUES ('target', 'T', 'http://x') ON CONFLICT DO NOTHING") + .execute(pool) + .await + .unwrap(); + sqlx::query( + "INSERT INTO manga_sources (source_id, source_manga_key, manga_id, source_url) \ + VALUES ('target', $1, $2, 'http://x/m')", + ) + .bind(format!("k-{manga_id}")) + .bind(manga_id) + .execute(pool) + .await + .unwrap(); + manga_id +} + /// Seed a manga + chapter and return their ids. async fn seed_chapter(pool: &PgPool, title: &str, number: i32) -> (Uuid, Uuid) { let manga_id = Uuid::new_v4(); @@ -202,6 +228,66 @@ async fn requeue_with_two_dead_jobs_for_one_chapter_revives_one_not_500(pool: Pg } } +#[sqlx::test(migrations = "./migrations")] +async fn list_active_jobs_returns_pending_and_running_running_first(pool: PgPool) { + let (_m, c1) = seed_chapter(&pool, "Naruto", 700).await; + let (_m2, c2) = seed_chapter(&pool, "Bleach", 10).await; + insert_job(&pool, c1, "pending", 0).await; + insert_job(&pool, c2, "running", 1).await; + // A dead + a done job must NOT appear. + let (_m3, c3) = seed_chapter(&pool, "Gone", 1).await; + insert_job(&pool, c3, "dead", 5).await; + + let (items, total) = crawler::list_active_jobs(&pool, None, 50, 0).await.unwrap(); + assert_eq!(total, 2); + assert_eq!(items.len(), 2); + // Running first. + assert_eq!(items[0].state, "running"); + assert_eq!(items[0].manga_title.as_deref(), Some("Bleach")); + assert_eq!(items[1].state, "pending"); + assert_eq!(items[1].chapter_number, Some(700)); +} + +#[sqlx::test(migrations = "./migrations")] +async fn list_active_jobs_filters_by_title(pool: PgPool) { + let (_m, c1) = seed_chapter(&pool, "Naruto", 1).await; + let (_m2, c2) = seed_chapter(&pool, "One Piece", 1).await; + insert_job(&pool, c1, "pending", 0).await; + insert_job(&pool, c2, "pending", 0).await; + let (items, total) = crawler::list_active_jobs(&pool, Some("piece"), 50, 0) + .await + .unwrap(); + assert_eq!(total, 1); + assert_eq!(items[0].manga_title.as_deref(), Some("One Piece")); +} + +#[sqlx::test(migrations = "./migrations")] +async fn missing_covers_count_and_list(pool: PgPool) { + seed_missing_cover(&pool, "Naruto").await; + seed_missing_cover(&pool, "Bleach").await; + // A manga WITH a cover must not be counted. + let with_cover = Uuid::new_v4(); + sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, 'Done', 'k.jpg')") + .bind(with_cover) + .execute(&pool) + .await + .unwrap(); + + assert_eq!(crawler::count_missing_covers(&pool).await.unwrap(), 2); + + let (items, total) = crawler::list_missing_cover_mangas(&pool, None, 50, 0) + .await + .unwrap(); + assert_eq!(total, 2); + assert_eq!(items.len(), 2); + + let (items, total) = crawler::list_missing_cover_mangas(&pool, Some("naru"), 50, 0) + .await + .unwrap(); + assert_eq!(total, 1); + assert_eq!(items[0].manga_title, "Naruto"); +} + #[sqlx::test(migrations = "./migrations")] async fn job_state_counts_groups_by_state(pool: PgPool) { let (_m, c1) = seed_chapter(&pool, "A", 1).await; diff --git a/backend/tests/repo_chapter.rs b/backend/tests/repo_chapter.rs index 1ed9f05..c2777e8 100644 --- a/backend/tests/repo_chapter.rs +++ b/backend/tests/repo_chapter.rs @@ -109,7 +109,7 @@ async fn dispatch_target_prefers_most_recent_live_source(pool: PgPool) { seed_chapter_with_two_live_sources(&pool).await; let row = dispatch_target(&pool, chapter_id).await.unwrap(); - let (_manga_id, source_url) = + let (_manga_id, source_url, _title, _number) = row.expect("two live sources should yield a dispatch target"); assert_eq!( source_url, new_url, @@ -133,7 +133,7 @@ async fn dispatch_target_skips_dropped_sources(pool: PgPool) { .unwrap(); let row = dispatch_target(&pool, chapter_id).await.unwrap(); - let (_manga_id, source_url) = + let (_manga_id, source_url, _title, _number) = row.expect("a single live source should still yield a dispatch target"); assert!( source_url != new_url, diff --git a/frontend/package.json b/frontend/package.json index 95a2575..7b622fd 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -1,6 +1,6 @@ { "name": "mangalord-frontend", - "version": "0.53.1", + "version": "0.54.0", "private": true, "type": "module", "scripts": { diff --git a/frontend/src/lib/api/admin.test.ts b/frontend/src/lib/api/admin.test.ts index 99381da..6d14d09 100644 --- a/frontend/src/lib/api/admin.test.ts +++ b/frontend/src/lib/api/admin.test.ts @@ -24,7 +24,9 @@ import { updateCrawlerSession, clearCrawlerSessionExpired, listDeadJobs, - requeueDeadJobs + requeueDeadJobs, + listActiveJobs, + listMissingCovers } from './admin'; function ok(body: unknown, status = 200): Response { @@ -350,7 +352,19 @@ describe('admin crawler api client', () => { const statusFixture = { daemon: 'running', phase: { state: 'fetching_metadata', index: 3, total: 10, title: 'One Piece' }, - workers: [{ state: 'idle' }], + worker_count: 2, + active_chapters: [ + { + manga_id: 'm-1', + manga_title: 'Bleach', + chapter_id: 'c-1', + chapter_number: 12, + pages_done: 4, + pages_total: 20 + } + ], + current_cover: { manga_id: 'm-2', manga_title: 'Naruto' }, + covers_queued: 7, last_pass: { at: null, discovered: 0, upserted: 0, covers_fetched: 0, mangas_failed: 0 }, session: { expired: false, configured: true }, browser: 'healthy', @@ -361,15 +375,38 @@ describe('admin crawler api client', () => { expect(crawlerStatusStreamUrl()).toMatch(/\/v1\/admin\/crawler\/stream$/); }); - it('getCrawlerStatus GETs /v1/admin/crawler', async () => { + it('getCrawlerStatus GETs /v1/admin/crawler with live chapter/cover fields', async () => { fetchSpy.mockResolvedValueOnce(ok(statusFixture)); const s = await getCrawlerStatus(); expect(s.queue.dead).toBe(4); expect(s.phase?.state).toBe('fetching_metadata'); + expect(s.active_chapters[0].pages_done).toBe(4); + expect(s.active_chapters[0].pages_total).toBe(20); + expect(s.current_cover?.manga_title).toBe('Naruto'); + expect(s.covers_queued).toBe(7); const url = fetchSpy.mock.calls[0][0] as string; expect(url).toMatch(/\/v1\/admin\/crawler$/); }); + it('listActiveJobs GETs /v1/admin/crawler/active-jobs with search', async () => { + fetchSpy.mockResolvedValueOnce( + ok({ items: [], page: { limit: 20, offset: 0, total: 0 } }) + ); + await listActiveJobs({ search: 'bleach' }); + const url = fetchSpy.mock.calls[0][0] as string; + expect(url).toMatch(/\/v1\/admin\/crawler\/active-jobs\?/); + expect(url).toContain('search=bleach'); + }); + + it('listMissingCovers GETs /v1/admin/crawler/covers', async () => { + fetchSpy.mockResolvedValueOnce( + ok({ items: [{ manga_id: 'm-1', manga_title: 'X' }], page: { limit: 20, offset: 0, total: 1 } }) + ); + const r = await listMissingCovers(); + expect(r.items[0].manga_title).toBe('X'); + expect(fetchSpy.mock.calls[0][0]).toMatch(/\/v1\/admin\/crawler\/covers$/); + }); + it('runCrawlerPass POSTs /v1/admin/crawler/run', async () => { fetchSpy.mockResolvedValueOnce(ok({ started: true })); const r = await runCrawlerPass(); diff --git a/frontend/src/lib/api/admin.ts b/frontend/src/lib/api/admin.ts index 238b28e..ff1c826 100644 --- a/frontend/src/lib/api/admin.ts +++ b/frontend/src/lib/api/admin.ts @@ -222,9 +222,17 @@ export type CrawlerPhase = | { state: 'idle'; next_fire: string | null } | { state: 'walking_list' } | { state: 'fetching_metadata'; index: number; total: number | null; title: string } - | { state: 'cover_backfill' }; + | { state: 'cover_backfill'; index: number; total: number }; -export type CrawlerWorker = { state: 'idle' } | { state: 'working'; chapter_id: string }; +/** A chapter being crawled right now, with a live page count. */ +export type ActiveChapter = { + manga_id: string; + manga_title: string; + chapter_id: string; + chapter_number: number; + pages_done: number; + pages_total: number | null; +}; export type CrawlerLastPass = { at: string | null; @@ -237,7 +245,10 @@ export type CrawlerLastPass = { export type CrawlerStatus = { daemon: 'running' | 'disabled'; phase: CrawlerPhase | null; - workers: CrawlerWorker[]; + worker_count: number; + active_chapters: ActiveChapter[]; + current_cover: { manga_id: string; manga_title: string } | null; + covers_queued: number; last_pass: CrawlerLastPass; session: { expired: boolean; configured: boolean }; browser: 'healthy' | 'draining' | 'restarting' | 'down'; @@ -324,3 +335,51 @@ export async function requeueDeadJobs(scope: RequeueScope): Promise<{ requeued: body: JSON.stringify(scope) }); } + +/** A queued/running chapter-content job (which chapters are queued). */ +export type ActiveJob = { + id: string; + chapter_id: string | null; + manga_id: string | null; + manga_title: string | null; + chapter_number: number | null; + state: 'pending' | 'running'; + attempts: number; + max_attempts: number; + updated_at: string; +}; + +export type ActiveJobsPage = { items: ActiveJob[]; page: Page }; + +/** GET /v1/admin/crawler/active-jobs — which chapters of which mangas are + * queued or running now. */ +export async function listActiveJobs(opts?: { + search?: string; + limit?: number; + offset?: number; +}): Promise { + const params = new URLSearchParams(); + if (opts?.search) params.set('search', opts.search); + if (opts?.limit != null) params.set('limit', String(opts.limit)); + if (opts?.offset != null) params.set('offset', String(opts.offset)); + const qs = params.toString(); + return request(`/v1/admin/crawler/active-jobs${qs ? `?${qs}` : ''}`); +} + +/** A manga queued for a cover fetch (no cover yet + a live source). */ +export type MissingCover = { manga_id: string; manga_title: string }; +export type MissingCoversPage = { items: MissingCover[]; page: Page }; + +/** GET /v1/admin/crawler/covers — which manga covers are queued. */ +export async function listMissingCovers(opts?: { + search?: string; + limit?: number; + offset?: number; +}): Promise { + const params = new URLSearchParams(); + if (opts?.search) params.set('search', opts.search); + if (opts?.limit != null) params.set('limit', String(opts.limit)); + if (opts?.offset != null) params.set('offset', String(opts.offset)); + const qs = params.toString(); + return request(`/v1/admin/crawler/covers${qs ? `?${qs}` : ''}`); +} diff --git a/frontend/src/routes/admin/crawler/+page.svelte b/frontend/src/routes/admin/crawler/+page.svelte index ee53f48..d62cc85 100644 --- a/frontend/src/routes/admin/crawler/+page.svelte +++ b/frontend/src/routes/admin/crawler/+page.svelte @@ -11,9 +11,13 @@ clearCrawlerSessionExpired, listDeadJobs, requeueDeadJobs, + listActiveJobs, + listMissingCovers, type CrawlerStatus, type CrawlerPhase, type DeadJob, + type ActiveJob, + type MissingCover, type RequeueScope } from '$lib/api/admin'; @@ -31,6 +35,20 @@ let deadPage = $state(1); const DEAD_LIMIT = 20; + // Queued chapters (pending/running) + let activeJobs: ActiveJob[] = $state([]); + let activeTotal = $state(0); + let activeSearch = $state(''); + let activePage = $state(1); + const ACTIVE_LIMIT = 20; + + // Queued covers (mangas missing a cover) + let covers: MissingCover[] = $state([]); + let coversTotal = $state(0); + let coversSearch = $state(''); + let coversPage = $state(1); + const COVERS_LIMIT = 20; + // Modals let sessionModalOpen = $state(false); let restartModalOpen = $state(false); @@ -60,6 +78,55 @@ } } + async function loadActiveJobs() { + try { + const resp = await listActiveJobs({ + search: activeSearch.trim() || undefined, + limit: ACTIVE_LIMIT, + offset: (activePage - 1) * ACTIVE_LIMIT + }); + activeJobs = resp.items; + activeTotal = resp.page.total ?? resp.items.length; + } catch (e) { + error = e instanceof Error ? e.message : 'failed to load queued chapters'; + } + } + + async function loadCovers() { + try { + const resp = await listMissingCovers({ + search: coversSearch.trim() || undefined, + limit: COVERS_LIMIT, + offset: (coversPage - 1) * COVERS_LIMIT + }); + covers = resp.items; + coversTotal = resp.page.total ?? resp.items.length; + } catch (e) { + error = e instanceof Error ? e.message : 'failed to load queued covers'; + } + } + + // Auto-refresh the (fetched, not streamed) backlog lists when the live + // status shows the relevant counts moved — keeps the lists feeling live + // without pushing big payloads over SSE. `$effect` re-runs when these + // tracked values change. + let lastQueueKey = $state(''); + let lastCoversKey = $state(-1); + $effect(() => { + const k = `${status?.queue.pending ?? 0}:${status?.queue.running ?? 0}`; + if (k !== lastQueueKey) { + lastQueueKey = k; + loadActiveJobs(); + } + }); + $effect(() => { + const c = status?.covers_queued ?? -1; + if (c !== lastCoversKey) { + lastCoversKey = c; + loadCovers(); + } + }); + // Live updates via Server-Sent Events instead of polling. The // EventSource is opened on mount and closed on destroy, so the // subscription exists only while this page is showing live data. @@ -171,6 +238,23 @@ loadDeadJobs(); } + function onSearchActive() { + activePage = 1; + loadActiveJobs(); + } + function onActivePageChange(p: number) { + activePage = p; + loadActiveJobs(); + } + function onSearchCovers() { + coversPage = 1; + loadCovers(); + } + function onCoversPageChange(p: number) { + coversPage = p; + loadCovers(); + } + // ---- display helpers ---- function phaseLabel(p: CrawlerPhase | null): string { if (!p) return 'Daemon disabled'; @@ -184,7 +268,7 @@ case 'fetching_metadata': return `Fetching metadata · ${p.index}/${p.total ?? '?'} · ${p.title}`; case 'cover_backfill': - return 'Backfilling covers'; + return `Backfilling covers · ${p.index + 1}/${p.total}`; } } @@ -215,6 +299,14 @@ } const deadTotalPages = $derived(Math.max(1, Math.ceil(deadTotal / DEAD_LIMIT))); + const activeTotalPages = $derived(Math.max(1, Math.ceil(activeTotal / ACTIVE_LIMIT))); + const coversTotalPages = $derived(Math.max(1, Math.ceil(coversTotal / COVERS_LIMIT))); + + function chapterPercent(c: { pages_done: number; pages_total: number | null }): number | null { + return c.pages_total && c.pages_total > 0 + ? Math.min(100, (c.pages_done / c.pages_total) * 100) + : null; + }
@@ -260,6 +352,12 @@

{/if} + {#if status.current_cover} +

+ 🖼 Fetching cover: {status.current_cover.manga_title} +

+ {/if} +

Last pass: {#if status.last_pass.at} @@ -288,7 +386,7 @@ {/if} - +

Queue

@@ -299,27 +397,28 @@
{status.queue.running}
Dead
{status.queue.dead}
+
Covers queued
+
{status.covers_queued}
-

Workers

- {#if status.workers.length === 0} -

none

+

Active chapters ({status.active_chapters.length}/{status.worker_count})

+ {#if status.active_chapters.length === 0} +

idle — no chapters downloading

{:else} - +
- {#each status.workers as w, i (i)} + {#each status.active_chapters as c (c.chapter_id)} - - + + - {/each} @@ -331,6 +430,82 @@

Loading…

{/if} + +
+
+

Queued chapters ({activeTotal})

+
+ e.key === 'Enter' && onSearchActive()} + /> + +
+
+ {#if activeJobs.length === 0} +

No chapters queued.

+ {:else} +
#{i} - {w.state} + {c.manga_title} · ch.{c.chapter_number} + {c.pages_done}/{c.pages_total ?? '?'} + + {#if chapterPercent(c) !== null} + {@render Bar({ percent: chapterPercent(c) ?? 0 })} + {/if} {w.state === 'working' ? w.chapter_id : '—'}
+ + + + + + + + + {#each activeJobs as j (j.id)} + + + + + + {/each} + +
Manga / ChapterStateAtt.
+ {j.manga_title ?? '(unknown)'} + {#if j.chapter_number != null}· ch.{j.chapter_number}{/if} + + {j.state} + {j.attempts}/{j.max_attempts}
+ + {/if} +
+ + +
+
+

Queued covers ({coversTotal})

+
+ e.key === 'Enter' && onSearchCovers()} + /> + +
+
+ {#if covers.length === 0} +

No covers queued 🎉

+ {:else} + + + + + + {#each covers as c (c.manga_id)} + + {/each} + +
Manga
{c.manga_title}
+ + {/if} +
+
@@ -643,4 +818,21 @@ .muted { color: var(--text-muted); } + .cover { + font-size: var(--font-sm); + } + .backlog { + margin-top: var(--space-4); + } + .pagecount { + font-family: var(--font-mono, monospace); + font-size: var(--font-xs); + white-space: nowrap; + } + .pagebar { + width: 8rem; + } + table.active td { + vertical-align: middle; + }