feat(crawler): live cover + chapter-content observability with realtime page counts

Extends the live dashboard so an operator can see exactly what's being
fetched, in realtime:

- Chapters being crawled now are tracked in the status as `active_chapters`
  (manga title · ch.N) with a live page counter that climbs per stored page
  (set_chapter_pages, pushed via the existing watch→SSE). The dispatcher
  registers each via an RAII ChapterGuard (sync Mutex) that removes the
  entry on completion, panic, or timeout-drop — replacing the old per-worker
  slot model.
- Covers: status now carries the cover being fetched now (`current_cover`,
  set around download_and_store_cover in both the metadata pass and backfill)
  and a `covers_queued` backlog count; CoverBackfill phase gains index/total.
- Two paginated backlog endpoints (fetched on demand, auto-refreshed when the
  live counts change): GET /admin/crawler/active-jobs (which chapters of which
  mangas are queued/running) and GET /admin/crawler/covers (mangas missing a
  cover). repo: list_active_jobs, list_missing_cover_mangas, count_missing_covers.
- dispatch_target now also returns manga title + chapter number.

Frontend: the crawler page replaces the Workers table with an Active-chapters
table (live page bars), adds a current-cover line + covers-queued figure, and
two backlog sections (Queued chapters / Queued covers) with search + Pager,
auto-refetched via $effect on the live counts.

Tests: status guard/page + cover unit tests; repo list/count tests; endpoint
tests; frontend api tests. Version 0.53.1 -> 0.54.0.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-06-04 20:41:51 +02:00
parent fb4182f68d
commit e02d125f51
19 changed files with 1005 additions and 125 deletions

2
backend/Cargo.lock generated
View File

@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
[[package]]
name = "mangalord"
version = "0.53.1"
version = "0.54.0"
dependencies = [
"anyhow",
"argon2",

View File

@@ -1,6 +1,6 @@
[package]
name = "mangalord"
version = "0.53.1"
version = "0.54.0"
edition = "2021"
default-run = "mangalord"

View File

@@ -21,10 +21,10 @@ use uuid::Uuid;
use crate::app::{AppState, CrawlerControl};
use crate::auth::extractor::RequireAdmin;
use crate::crawler::browser_manager::RestartPhase;
use crate::crawler::status::{LastPass, Phase, WorkerState};
use crate::crawler::status::{ActiveChapter, CoverTarget, LastPass, Phase};
use crate::error::{AppError, AppResult};
use crate::repo;
use crate::repo::crawler::{DeadJob, RequeueScope};
use crate::repo::crawler::{ActiveJob, DeadJob, MissingCoverRow, RequeueScope};
/// Backstop recompose interval for the SSE stream. Phase/worker/session
/// changes push instantly via the status `watch`; this only bounds the
@@ -45,6 +45,8 @@ pub fn routes() -> Router<AppState> {
)
.route("/admin/crawler/dead-jobs", get(list_dead_jobs))
.route("/admin/crawler/dead-jobs/requeue", post(requeue_dead_jobs))
.route("/admin/crawler/active-jobs", get(list_active_jobs))
.route("/admin/crawler/covers", get(list_covers))
}
// ---------------------------------------------------------------------------
@@ -71,7 +73,14 @@ struct CrawlerStatusResponse {
/// `"running"` | `"disabled"`.
daemon: &'static str,
phase: Option<Phase>,
workers: Vec<WorkerState>,
/// Configured chapter-worker count (for "N busy / M workers").
worker_count: usize,
/// Chapters being crawled right now, with live page counts.
active_chapters: Vec<ActiveChapter>,
/// The cover being fetched right now, if any.
current_cover: Option<CoverTarget>,
/// Mangas still queued for a cover fetch.
covers_queued: i64,
last_pass: LastPass,
session: SessionStatus,
/// `"healthy"` | `"draining"` | `"restarting"` | `"down"`.
@@ -97,12 +106,16 @@ async fn compose_status(state: &AppState) -> AppResult<CrawlerStatusResponse> {
running,
dead,
};
let covers_queued = repo::crawler::count_missing_covers(&state.db).await?;
Ok(match state.crawler.as_ref() {
None => CrawlerStatusResponse {
daemon: "disabled",
phase: None,
workers: Vec::new(),
worker_count: 0,
active_chapters: Vec::new(),
current_cover: None,
covers_queued,
last_pass: LastPass::default(),
session: SessionStatus {
expired: false,
@@ -116,7 +129,10 @@ async fn compose_status(state: &AppState) -> AppResult<CrawlerStatusResponse> {
CrawlerStatusResponse {
daemon: "running",
phase: Some(snap.phase),
workers: snap.workers,
worker_count: snap.worker_count,
active_chapters: snap.active_chapters,
current_cover: snap.current_cover,
covers_queued,
last_pass: snap.last_pass,
session: SessionStatus {
expired: c.session.is_expired(),
@@ -420,6 +436,52 @@ fn scope_label(r: &RequeueRequest) -> &'static str {
}
}
// ---------------------------------------------------------------------------
// Queued-chapters + queued-covers backlogs (paginated, fetched on demand)
// ---------------------------------------------------------------------------
/// Pagination + title-search params shared by the backlog list endpoints.
#[derive(Debug, Deserialize, Default)]
struct ListParams {
#[serde(default)]
search: Option<String>,
#[serde(default = "default_limit")]
limit: i64,
#[serde(default)]
offset: i64,
}
async fn list_active_jobs(
State(state): State<AppState>,
_admin: RequireAdmin,
Query(params): Query<ListParams>,
) -> AppResult<Json<crate::api::pagination::PagedResponse<ActiveJob>>> {
let limit = params.limit.clamp(1, 200);
let offset = params.offset.max(0);
let search = params.search.filter(|s| !s.trim().is_empty());
let (items, total) =
repo::crawler::list_active_jobs(&state.db, search.as_deref(), limit, offset).await?;
Ok(Json(crate::api::pagination::PagedResponse::with_total(
items, limit, offset, total,
)))
}
async fn list_covers(
State(state): State<AppState>,
_admin: RequireAdmin,
Query(params): Query<ListParams>,
) -> AppResult<Json<crate::api::pagination::PagedResponse<MissingCoverRow>>> {
let limit = params.limit.clamp(1, 200);
let offset = params.offset.max(0);
let search = params.search.filter(|s| !s.trim().is_empty());
let (items, total) =
repo::crawler::list_missing_cover_mangas(&state.db, search.as_deref(), limit, offset)
.await?;
Ok(Json(crate::api::pagination::PagedResponse::with_total(
items, limit, offset, total,
)))
}
// ---------------------------------------------------------------------------
fn require_crawler(state: &AppState) -> Result<&std::sync::Arc<CrawlerControl>, AppError> {

View File

@@ -276,6 +276,7 @@ async fn spawn_crawler_daemon(
transient_failures: Arc::new(AtomicU32::new(0)),
restart_threshold: cfg.browser_restart_threshold,
drain_deadline: cfg.job_timeout,
status: status.clone(),
tor: tor.as_ref().map(Arc::clone),
});
@@ -389,10 +390,8 @@ impl MetadataPass for RealMetadataPass {
// errored — the early-stop walk can complete its work and bail
// late, and a transient browser failure shouldn't cancel the
// residual cover backlog. The backfill has its own per-call cap
// so a runaway error stream can't monopolise the tick.
self.status
.set_phase(crate::crawler::status::Phase::CoverBackfill)
.await;
// so a runaway error stream can't monopolise the tick. It sets the
// CoverBackfill{index,total} phase + current_cover per entry.
match pipeline::backfill_missing_covers(
&self.browser_manager,
&self.db,
@@ -402,6 +401,7 @@ impl MetadataPass for RealMetadataPass {
pipeline::COVER_BACKFILL_DEFAULT_MAX,
&self.download_allowlist,
self.max_image_bytes,
Some(&self.status),
self.tor.as_deref(),
)
.await
@@ -437,6 +437,9 @@ struct RealChapterDispatcher {
restart_threshold: u32,
/// How long a coordinated restart waits for in-flight leases to drain.
drain_deadline: std::time::Duration,
/// Live status surface — the dispatcher registers each chapter it
/// crawls (with a realtime page count) here.
status: crate::crawler::status::StatusHandle,
tor: Option<Arc<crate::crawler::tor::TorController>>,
}
@@ -452,10 +455,21 @@ impl ChapterDispatcher for RealChapterDispatcher {
let row = repo::chapter::dispatch_target(&self.db, chapter_id)
.await
.context("look up chapter for dispatch")?;
let Some((manga_id, source_url)) = row else {
let Some((manga_id, source_url, manga_title, chapter_number)) = row else {
// Chapter (or its source row) is gone — ack done.
return Ok(SyncOutcome::Skipped);
};
// Register the chapter as crawling now (live status). The
// guard removes it on every exit path — success, panic, or
// the worker's outer-timeout drop.
let _active = self.status.begin_chapter(crate::crawler::status::ActiveChapter {
manga_id,
manga_title,
chapter_id,
chapter_number,
pages_done: 0,
pages_total: None,
});
let lease = self.browser_manager.acquire().await?;
let result = content::sync_chapter_content(
&lease,
@@ -470,6 +484,7 @@ impl ChapterDispatcher for RealChapterDispatcher {
&self.download_allowlist,
self.max_image_bytes,
self.tor.as_deref(),
Some(&self.status),
)
.await;
drop(lease);

View File

@@ -417,6 +417,8 @@ async fn sync_bookmarked_chapter_content(
allowlist.as_ref(),
max_image_bytes,
tor.as_deref(),
// CLI one-shot — no live status surface.
None,
)
.await;
drop(lease);

View File

@@ -206,6 +206,10 @@ pub async fn sync_chapter_content(
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
tor: Option<&crate::crawler::tor::TorController>,
// Optional live-status sink for the realtime page counter. The daemon
// dispatcher passes the shared handle (the chapter has already been
// registered via `begin_chapter`); the CLI / admin resync pass `None`.
progress: Option<&crate::crawler::status::StatusHandle>,
) -> anyhow::Result<SyncOutcome> {
// Skip if already fetched, unless caller explicitly forces.
if !force_refetch {
@@ -267,8 +271,13 @@ pub async fn sync_chapter_content(
// final DB commit) fails — preserving the all-or-nothing guarantee
// without holding a DB transaction open across the network puts
// (which matters once `Storage` is backed by S3).
let mut written_keys: Vec<String> = Vec::with_capacity(images.len());
let mut stored: Vec<StoredPage> = Vec::with_capacity(images.len());
let total = images.len();
// Publish the now-known page total so the dashboard shows "0/N".
if let Some(p) = progress {
p.set_chapter_pages(chapter_id, 0, Some(total));
}
let mut written_keys: Vec<String> = Vec::with_capacity(total);
let mut stored: Vec<StoredPage> = Vec::with_capacity(total);
for img in &images {
match download_and_store_page(
storage,
@@ -287,6 +296,10 @@ pub async fn sync_chapter_content(
Ok(page) => {
written_keys.push(page.storage_key.clone());
stored.push(page);
// Live page counter: push the climbing count to subscribers.
if let Some(p) = progress {
p.set_chapter_pages(chapter_id, stored.len(), Some(total));
}
}
Err(e) => {
cleanup_orphans(storage, &written_keys).await;

View File

@@ -48,7 +48,7 @@ use tokio_util::sync::CancellationToken;
use crate::crawler::content::SyncOutcome;
use crate::crawler::jobs::{self, JobPayload, Lease, KIND_SYNC_CHAPTER_CONTENT};
use crate::crawler::pipeline;
use crate::crawler::status::{Phase, StatusHandle, WorkerState};
use crate::crawler::status::{Phase, StatusHandle};
/// Fixed `pg_try_advisory_lock` key. ASCII "MANGALRD" interpreted as a
/// big-endian i64. Hardcoded so every replica agrees on the lock identity
@@ -396,17 +396,10 @@ impl WorkerContext {
})
};
// Publish what this worker is doing for the live status surface.
if let JobPayload::SyncChapterContent { chapter_id, .. } = &lease.payload {
self.status
.set_worker(
self.id,
WorkerState::Working {
chapter_id: *chapter_id,
},
)
.await;
}
// The "currently crawling" chapter (with its live page count) is
// registered by the dispatcher itself (RealChapterDispatcher) so it
// carries the manga/chapter identity + page progress and is removed
// via an RAII guard on every exit path.
// Outer timeout: a dispatch that exceeds `job_timeout` is acked
// failed (exponential backoff) rather than wedging the worker.
@@ -414,7 +407,6 @@ impl WorkerContext {
.catch_unwind();
let outcome = tokio::time::timeout(self.job_timeout, dispatch).await;
heartbeat.abort();
self.status.set_worker(self.id, WorkerState::Idle).await;
let outcome = match outcome {
Ok(o) => o,

View File

@@ -349,7 +349,14 @@ pub async fn run_metadata_pass(
|| matches!(upsert.status, repo::crawler::UpsertStatus::Updated);
if needs_cover {
if let Some(cover_url) = manga.cover_url.as_deref() {
match download_and_store_cover(
if let Some(s) = status {
s.set_current_cover(Some(crate::crawler::status::CoverTarget {
manga_id: upsert.manga_id,
manga_title: manga.title.clone(),
}))
.await;
}
let cover_result = download_and_store_cover(
db,
storage,
http,
@@ -360,8 +367,11 @@ pub async fn run_metadata_pass(
allowlist,
max_image_bytes,
)
.await
{
.await;
if let Some(s) = status {
s.set_current_cover(None).await;
}
match cover_result {
Ok(()) => stats.covers_fetched += 1,
Err(e) => tracing::warn!(
manga_id = %upsert.manga_id,
@@ -615,6 +625,7 @@ pub async fn backfill_missing_covers(
max_mangas: usize,
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
status: Option<&crate::crawler::status::StatusHandle>,
tor: Option<&crate::crawler::tor::TorController>,
) -> anyhow::Result<CoverBackfillStats> {
let mut stats = CoverBackfillStats::default();
@@ -637,8 +648,13 @@ pub async fn backfill_missing_covers(
let browser_ref: &chromiumoxide::Browser = &lease;
let ctx = FetchContext { browser: browser_ref, rate, tor };
for entry in entries {
let total = entries.len();
for (index, entry) in entries.into_iter().enumerate() {
stats.considered += 1;
if let Some(s) = status {
s.set_phase(crate::crawler::status::Phase::CoverBackfill { index, total })
.await;
}
// Metadata-only TargetSource: skip chapter-list parsing so a
// missing-cover refetch doesn't soft-drop chapters on a partial
// render. Cover URL alone is what we need.
@@ -648,8 +664,8 @@ pub async fn backfill_missing_covers(
title: String::new(),
url: entry.source_url.clone(),
};
let cover_url = match source.fetch_manga(&ctx, &r).await {
Ok(manga) => manga.cover_url,
let manga = match source.fetch_manga(&ctx, &r).await {
Ok(manga) => manga,
Err(e) => {
tracing::warn!(
manga_id = %entry.manga_id,
@@ -661,7 +677,7 @@ pub async fn backfill_missing_covers(
continue;
}
};
let Some(cover_url) = cover_url else {
let Some(cover_url) = manga.cover_url.clone() else {
tracing::warn!(
manga_id = %entry.manga_id,
url = %entry.source_url,
@@ -670,7 +686,14 @@ pub async fn backfill_missing_covers(
stats.failed += 1;
continue;
};
match download_and_store_cover(
if let Some(s) = status {
s.set_current_cover(Some(crate::crawler::status::CoverTarget {
manga_id: entry.manga_id,
manga_title: manga.title.clone(),
}))
.await;
}
let cover_result = download_and_store_cover(
db,
storage,
http,
@@ -681,8 +704,11 @@ pub async fn backfill_missing_covers(
allowlist,
max_image_bytes,
)
.await
{
.await;
if let Some(s) = status {
s.set_current_cover(None).await;
}
match cover_result {
Ok(()) => stats.fetched += 1,
Err(e) => {
tracing::warn!(

View File

@@ -235,7 +235,7 @@ impl ResyncService for RealResyncService {
let row = repo::chapter::dispatch_target(&self.db, chapter_id)
.await
.context("look up chapter_sources for resync")?;
let Some((manga_id, source_url)) = row else {
let Some((manga_id, source_url, _title, _number)) = row else {
return Err(ResyncError::NoChapterSource.into());
};
@@ -257,6 +257,8 @@ impl ResyncService for RealResyncService {
&self.download_allowlist,
self.max_image_bytes,
self.tor.as_deref(),
// Admin resync isn't a daemon worker slot — no live status.
None,
)
.await;
drop(lease);

View File

@@ -3,16 +3,17 @@
//! The metadata pass runs inline in the cron tick (it is not a
//! `crawler_jobs` row), so without this surface "what is the crawler doing
//! right now" is unanswerable from the dashboard. The daemon publishes its
//! current [`Phase`] and per-worker activity into a shared
//! [`StatusHandle`]; the admin endpoint reads a [`CrawlerStatus`] snapshot
//! and composes it with DB-derived queue counts + the session/browser
//! flags.
//! current [`Phase`], the chapters being crawled right now (with a live
//! page count), and the cover being fetched into a shared [`StatusHandle`];
//! the admin endpoint reads a [`CrawlerStatus`] snapshot and composes it
//! with DB-derived counts + the session/browser flags.
//!
//! NOTE: this is per-process state. The deployment is a single server
//! (see CLAUDE.md), so an in-memory handle is sufficient; durable signals
//! (last-pass summary, runtime session) are persisted in `crawler_state`.
use std::sync::Arc;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use chrono::{DateTime, Utc};
use serde::Serialize;
@@ -21,8 +22,8 @@ use uuid::Uuid;
use crate::crawler::pipeline::MetadataStats;
/// What the daemon is doing right now. Serialised with an internal `state`
/// tag so the frontend can switch on it.
/// What the daemon's metadata pass is doing right now. Serialised with an
/// internal `state` tag so the frontend can switch on it.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum Phase {
@@ -37,18 +38,30 @@ pub enum Phase {
total: Option<usize>,
title: String,
},
/// Backfilling covers that failed on first attempt.
CoverBackfill,
/// Backfilling covers that failed on first attempt. `index`/`total`
/// track progress through this tick's batch.
CoverBackfill { index: usize, total: usize },
}
/// Per-worker activity. (Browser-restart and session-expired states are
/// surfaced as separate top-level fields by the endpoint, sourced from the
/// `BrowserManager` phase and the session flag respectively.)
/// A chapter being downloaded right now, with a live page count. Keyed in
/// the status by `chapter_id`; inserted by the dispatcher when a job starts
/// and removed (via an RAII guard) when it finishes, panics, or times out.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum WorkerState {
Idle,
Working { chapter_id: Uuid },
pub struct ActiveChapter {
pub manga_id: Uuid,
pub manga_title: String,
pub chapter_id: Uuid,
pub chapter_number: i32,
pub pages_done: usize,
/// `None` until the chapter page list has been parsed.
pub pages_total: Option<usize>,
}
/// The manga whose cover is being downloaded right now.
#[derive(Clone, Debug, Serialize)]
pub struct CoverTarget {
pub manga_id: Uuid,
pub manga_title: String,
}
/// Summary of the most recent metadata pass (persisted across restarts in
@@ -62,24 +75,30 @@ pub struct LastPass {
pub mangas_failed: usize,
}
/// The mutable slice of status the daemon owns. Session/browser/queue are
/// composed at read time by the endpoint (they live elsewhere), so they
/// are not stored here.
/// A point-in-time snapshot returned by [`StatusHandle::snapshot`]. The
/// session/browser/queue fields are composed at read time by the endpoint
/// (they live elsewhere), so they are not stored here.
#[derive(Clone, Debug, Serialize)]
pub struct CrawlerStatus {
pub phase: Phase,
pub workers: Vec<WorkerState>,
/// Number of configured chapter workers (for "N busy / M workers").
pub worker_count: usize,
/// Chapters being downloaded right now, with live page counts.
pub active_chapters: Vec<ActiveChapter>,
pub last_pass: LastPass,
/// The cover being downloaded right now, if any.
pub current_cover: Option<CoverTarget>,
}
impl CrawlerStatus {
fn new(num_workers: usize) -> Self {
Self {
phase: Phase::Idle { next_fire: None },
workers: (0..num_workers.max(1)).map(|_| WorkerState::Idle).collect(),
last_pass: LastPass::default(),
}
}
/// Scalar status state held under the async `RwLock`. Active chapters live
/// in a separate sync map so per-page updates and RAII removal don't need
/// to `.await` (removal happens in `Drop`).
#[derive(Clone, Debug)]
struct Scalar {
phase: Phase,
worker_count: usize,
last_pass: LastPass,
current_cover: Option<CoverTarget>,
}
/// Cloneable handle the daemon tasks use to publish status. Cheap to clone
@@ -88,18 +107,35 @@ impl CrawlerStatus {
/// get pushed an update instead of polling.
#[derive(Clone)]
pub struct StatusHandle {
inner: Arc<RwLock<CrawlerStatus>>,
/// Monotonic version bumped on every change. SSE handlers
/// `subscribe()` and `await .changed()` for instant pushes; `watch`
/// has no lost-wakeup so a change between snapshots is never missed.
scalar: Arc<RwLock<Scalar>>,
/// Currently-downloading chapters keyed by `chapter_id`. A sync mutex so
/// the RAII [`ChapterGuard`]'s `Drop` can remove without `.await`.
active: Arc<Mutex<HashMap<Uuid, ActiveChapter>>>,
/// Monotonic version bumped on every change. SSE handlers `subscribe()`
/// and `await .changed()` for instant pushes; `watch` has no
/// lost-wakeup so a change between snapshots is never missed.
version: Arc<watch::Sender<u64>>,
}
/// Lock the active map, recovering from a poisoned mutex (we never hold the
/// lock across a panic-prone section, so the data is still consistent).
fn lock_active(
m: &Mutex<HashMap<Uuid, ActiveChapter>>,
) -> std::sync::MutexGuard<'_, HashMap<Uuid, ActiveChapter>> {
m.lock().unwrap_or_else(|e| e.into_inner())
}
impl StatusHandle {
pub fn new(num_workers: usize) -> Self {
let (version, _rx) = watch::channel(0u64);
Self {
inner: Arc::new(RwLock::new(CrawlerStatus::new(num_workers))),
scalar: Arc::new(RwLock::new(Scalar {
phase: Phase::Idle { next_fire: None },
worker_count: num_workers.max(1),
last_pass: LastPass::default(),
current_cover: None,
})),
active: Arc::new(Mutex::new(HashMap::new())),
version: Arc::new(version),
}
}
@@ -122,15 +158,37 @@ impl StatusHandle {
}
pub async fn set_phase(&self, phase: Phase) {
self.inner.write().await.phase = phase;
self.scalar.write().await.phase = phase;
self.bump();
}
pub async fn set_worker(&self, id: usize, state: WorkerState) {
/// Set (or clear) the cover being downloaded right now.
pub async fn set_current_cover(&self, cover: Option<CoverTarget>) {
self.scalar.write().await.current_cover = cover;
self.bump();
}
/// Register a chapter as crawling now; returns a guard that removes it
/// when dropped (on completion, panic-unwind, or timeout-drop).
pub fn begin_chapter(&self, chapter: ActiveChapter) -> ChapterGuard {
let id = chapter.chapter_id;
lock_active(&self.active).insert(id, chapter);
self.bump();
ChapterGuard {
active: Arc::clone(&self.active),
version: Arc::clone(&self.version),
chapter_id: id,
}
}
/// Update the live page count of an in-flight chapter. Sync (no
/// `.await`) so it's cheap to call once per stored page.
pub fn set_chapter_pages(&self, chapter_id: Uuid, done: usize, total: Option<usize>) {
{
let mut s = self.inner.write().await;
if let Some(slot) = s.workers.get_mut(id) {
*slot = state;
let mut map = lock_active(&self.active);
if let Some(c) = map.get_mut(&chapter_id) {
c.pages_done = done;
c.pages_total = total;
}
}
self.bump();
@@ -138,28 +196,55 @@ impl StatusHandle {
/// Record a finished metadata pass. Stamps `at` with `now`.
pub async fn record_pass(&self, stats: &MetadataStats, at: DateTime<Utc>) {
{
let mut s = self.inner.write().await;
s.last_pass = LastPass {
at: Some(at),
discovered: stats.discovered,
upserted: stats.upserted,
covers_fetched: stats.covers_fetched,
mangas_failed: stats.mangas_failed,
};
}
self.scalar.write().await.last_pass = LastPass {
at: Some(at),
discovered: stats.discovered,
upserted: stats.upserted,
covers_fetched: stats.covers_fetched,
mangas_failed: stats.mangas_failed,
};
self.bump();
}
/// Seed the last-pass summary from a persisted `crawler_state` value on
/// startup so the dashboard isn't blank until the first tick.
pub async fn set_last_pass(&self, last: LastPass) {
self.inner.write().await.last_pass = last;
self.scalar.write().await.last_pass = last;
self.bump();
}
pub async fn snapshot(&self) -> CrawlerStatus {
self.inner.read().await.clone()
let scalar = self.scalar.read().await.clone();
let mut active_chapters: Vec<ActiveChapter> =
lock_active(&self.active).values().cloned().collect();
// Stable, readable order: by chapter number then id.
active_chapters.sort_by(|a, b| {
a.chapter_number
.cmp(&b.chapter_number)
.then(a.chapter_id.cmp(&b.chapter_id))
});
CrawlerStatus {
phase: scalar.phase,
worker_count: scalar.worker_count,
active_chapters,
last_pass: scalar.last_pass,
current_cover: scalar.current_cover,
}
}
}
/// RAII handle removing an [`ActiveChapter`] from the live status when the
/// chapter dispatch finishes, panics, or is dropped on timeout.
pub struct ChapterGuard {
active: Arc<Mutex<HashMap<Uuid, ActiveChapter>>>,
version: Arc<watch::Sender<u64>>,
chapter_id: Uuid,
}
impl Drop for ChapterGuard {
fn drop(&mut self) {
lock_active(&self.active).remove(&self.chapter_id);
self.version.send_modify(|v| *v = v.wrapping_add(1));
}
}
@@ -167,16 +252,73 @@ impl StatusHandle {
mod tests {
use super::*;
fn sample_chapter(n: i32) -> ActiveChapter {
ActiveChapter {
manga_id: Uuid::new_v4(),
manga_title: "M".into(),
chapter_id: Uuid::new_v4(),
chapter_number: n,
pages_done: 0,
pages_total: None,
}
}
#[tokio::test]
async fn set_worker_updates_only_that_slot() {
async fn begin_chapter_shows_in_snapshot_and_guard_removes_on_drop() {
let h = StatusHandle::new(2);
let cid = Uuid::new_v4();
h.set_worker(1, WorkerState::Working { chapter_id: cid }).await;
let chap = sample_chapter(7);
let cid = chap.chapter_id;
{
let _guard = h.begin_chapter(chap);
let snap = h.snapshot().await;
assert_eq!(snap.active_chapters.len(), 1);
assert_eq!(snap.active_chapters[0].chapter_id, cid);
assert_eq!(snap.worker_count, 2);
}
// Guard dropped → entry removed.
let snap = h.snapshot().await;
assert!(matches!(snap.workers[0], WorkerState::Idle));
assert!(matches!(snap.workers[1], WorkerState::Working { .. }));
// Out-of-range id is a no-op, not a panic.
h.set_worker(99, WorkerState::Idle).await;
assert!(snap.active_chapters.is_empty());
}
#[tokio::test]
async fn set_chapter_pages_updates_live_count() {
let h = StatusHandle::new(1);
let chap = sample_chapter(1);
let cid = chap.chapter_id;
let _guard = h.begin_chapter(chap);
h.set_chapter_pages(cid, 3, Some(20));
let snap = h.snapshot().await;
assert_eq!(snap.active_chapters[0].pages_done, 3);
assert_eq!(snap.active_chapters[0].pages_total, Some(20));
// Updating an unknown chapter is a no-op, not a panic.
h.set_chapter_pages(Uuid::new_v4(), 9, Some(9));
}
#[tokio::test]
async fn snapshot_sorts_active_chapters_by_number() {
let h = StatusHandle::new(2);
let _g1 = h.begin_chapter(sample_chapter(5));
let _g2 = h.begin_chapter(sample_chapter(2));
let snap = h.snapshot().await;
assert_eq!(snap.active_chapters[0].chapter_number, 2);
assert_eq!(snap.active_chapters[1].chapter_number, 5);
}
#[tokio::test]
async fn set_current_cover_round_trips() {
let h = StatusHandle::new(1);
let mid = Uuid::new_v4();
h.set_current_cover(Some(CoverTarget {
manga_id: mid,
manga_title: "One Piece".into(),
}))
.await;
assert_eq!(
h.snapshot().await.current_cover.map(|c| c.manga_id),
Some(mid)
);
h.set_current_cover(None).await;
assert!(h.snapshot().await.current_cover.is_none());
}
#[tokio::test]
@@ -197,14 +339,17 @@ mod tests {
}
#[tokio::test]
async fn subscribe_resolves_on_mutation_and_poke() {
async fn subscribe_resolves_on_mutation_poke_and_chapter_change() {
let h = StatusHandle::new(1);
let mut rx = h.subscribe();
// A mutation wakes the subscriber.
h.set_phase(Phase::WalkingList).await;
rx.changed().await.unwrap();
// A bare poke (external signal) also wakes it.
h.poke();
rx.changed().await.unwrap();
// begin_chapter + guard drop each bump the version.
let g = h.begin_chapter(sample_chapter(1));
rx.changed().await.unwrap();
drop(g);
rx.changed().await.unwrap();
}
}

View File

@@ -138,14 +138,18 @@ pub async fn page_count(pool: &PgPool, id: Uuid) -> sqlx::Result<Option<i32>> {
/// filter — this resolver stays in lockstep so a chapter that was
/// dropped between enqueue and lease isn't dispatched against a stale
/// URL.
/// Returns `(manga_id, source_url, manga_title, chapter_number)`. The
/// title + number feed the live "currently crawling" status; the rest is
/// what the dispatcher needs to do the work.
pub async fn dispatch_target(
pool: &PgPool,
chapter_id: Uuid,
) -> sqlx::Result<Option<(Uuid, String)>> {
) -> sqlx::Result<Option<(Uuid, String, String, i32)>> {
sqlx::query_as(
"SELECT c.manga_id, cs.source_url \
"SELECT c.manga_id, cs.source_url, m.title, c.number \
FROM chapters c \
JOIN chapter_sources cs ON cs.chapter_id = c.id \
JOIN mangas m ON m.id = c.manga_id \
WHERE c.id = $1 \
AND cs.dropped_at IS NULL \
ORDER BY cs.last_seen_at DESC \

View File

@@ -699,6 +699,155 @@ pub async fn list_dead_jobs(
Ok((items, total))
}
/// An in-flight chapter-content job (`pending` or `running`) joined to its
/// chapter + manga, for the "queued chapters" admin view.
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct ActiveJob {
pub id: Uuid,
pub chapter_id: Option<Uuid>,
pub manga_id: Option<Uuid>,
pub manga_title: Option<String>,
pub chapter_number: Option<i32>,
/// `"pending"` or `"running"`.
pub state: String,
pub attempts: i32,
pub max_attempts: i32,
pub updated_at: DateTime<Utc>,
}
/// Paginated list of `pending`/`running` chapter-content jobs (which
/// chapters of which mangas are queued or being crawled). Running first,
/// then by scheduled order. `search` filters on manga title.
pub async fn list_active_jobs(
pool: &PgPool,
search: Option<&str>,
limit: i64,
offset: i64,
) -> sqlx::Result<(Vec<ActiveJob>, i64)> {
let search_pat = search
.map(|s| format!("%{}%", s.trim()))
.filter(|p| p.len() > 2);
let items: Vec<ActiveJob> = sqlx::query_as(
r#"
SELECT
cj.id,
(cj.payload->>'chapter_id')::uuid AS chapter_id,
c.manga_id AS manga_id,
m.title AS manga_title,
c.number AS chapter_number,
cj.state,
cj.attempts,
cj.max_attempts,
cj.updated_at
FROM crawler_jobs cj
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
LEFT JOIN mangas m ON m.id = c.manga_id
WHERE cj.state IN ('pending','running')
AND cj.payload->>'kind' = 'sync_chapter_content'
AND ($1::text IS NULL OR m.title ILIKE $1)
ORDER BY (cj.state = 'running') DESC, cj.scheduled_at, cj.created_at
LIMIT $2 OFFSET $3
"#,
)
.bind(&search_pat)
.bind(limit)
.bind(offset)
.fetch_all(pool)
.await?;
let total: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(*)
FROM crawler_jobs cj
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
LEFT JOIN mangas m ON m.id = c.manga_id
WHERE cj.state IN ('pending','running')
AND cj.payload->>'kind' = 'sync_chapter_content'
AND ($1::text IS NULL OR m.title ILIKE $1)
"#,
)
.bind(&search_pat)
.fetch_one(pool)
.await?;
Ok((items, total))
}
/// A manga whose cover is still missing (queued for cover fetch).
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct MissingCoverRow {
pub manga_id: Uuid,
pub manga_title: String,
}
/// Count mangas with no cover yet but a live source row — the cover
/// backlog the metadata pass + backfill drain.
pub async fn count_missing_covers(pool: &PgPool) -> sqlx::Result<i64> {
sqlx::query_scalar(
r#"
SELECT COUNT(*) FROM mangas m
WHERE m.cover_image_path IS NULL
AND EXISTS (
SELECT 1 FROM manga_sources ms
WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL
)
"#,
)
.fetch_one(pool)
.await
}
/// Paginated list of mangas queued for a cover fetch (no cover yet + a live
/// source), with titles. `search` filters on title. Freshest source first.
pub async fn list_missing_cover_mangas(
pool: &PgPool,
search: Option<&str>,
limit: i64,
offset: i64,
) -> sqlx::Result<(Vec<MissingCoverRow>, i64)> {
let search_pat = search
.map(|s| format!("%{}%", s.trim()))
.filter(|p| p.len() > 2);
let items: Vec<MissingCoverRow> = sqlx::query_as(
r#"
SELECT m.id AS manga_id, m.title AS manga_title
FROM mangas m
WHERE m.cover_image_path IS NULL
AND EXISTS (
SELECT 1 FROM manga_sources ms
WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL
)
AND ($1::text IS NULL OR m.title ILIKE $1)
ORDER BY m.updated_at DESC
LIMIT $2 OFFSET $3
"#,
)
.bind(&search_pat)
.bind(limit)
.bind(offset)
.fetch_all(pool)
.await?;
let total: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(*) FROM mangas m
WHERE m.cover_image_path IS NULL
AND EXISTS (
SELECT 1 FROM manga_sources ms
WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL
)
AND ($1::text IS NULL OR m.title ILIKE $1)
"#,
)
.bind(&search_pat)
.fetch_one(pool)
.await?;
Ok((items, total))
}
/// Scope of a dead-job requeue.
#[derive(Debug, Clone)]
pub enum RequeueScope {

View File

@@ -64,6 +64,102 @@ async fn seed_dead_job(pool: &PgPool, title: &str) -> Uuid {
job_id
}
/// Seed a chapter-content job in a given state ('pending'/'running').
async fn seed_job(pool: &PgPool, title: &str, state: &str) {
let manga_id = Uuid::new_v4();
let chapter_id = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, $2)")
.bind(manga_id)
.bind(title)
.execute(pool)
.await
.unwrap();
sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)")
.bind(chapter_id)
.bind(manga_id)
.execute(pool)
.await
.unwrap();
sqlx::query("INSERT INTO crawler_jobs (id, payload, state) VALUES ($1, $2, $3)")
.bind(Uuid::new_v4())
.bind(json!({
"kind": "sync_chapter_content",
"source_id": "target",
"chapter_id": chapter_id,
"source_chapter_key": "k",
}))
.bind(state)
.execute(pool)
.await
.unwrap();
}
/// Seed a manga with no cover + a live source row (queued for cover fetch).
async fn seed_missing_cover(pool: &PgPool, title: &str) {
let manga_id = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, $2, NULL)")
.bind(manga_id)
.bind(title)
.execute(pool)
.await
.unwrap();
sqlx::query("INSERT INTO sources (id, name, base_url) VALUES ('target','T','http://x') ON CONFLICT DO NOTHING")
.execute(pool)
.await
.unwrap();
sqlx::query(
"INSERT INTO manga_sources (source_id, source_manga_key, manga_id, source_url) \
VALUES ('target', $1, $2, 'http://x/m')",
)
.bind(format!("k-{manga_id}"))
.bind(manga_id)
.execute(pool)
.await
.unwrap();
}
#[sqlx::test(migrations = "./migrations")]
async fn active_jobs_and_covers_lists_over_http(pool: PgPool) {
seed_job(&pool, "Naruto", "pending").await;
seed_job(&pool, "Bleach", "running").await;
seed_missing_cover(&pool, "One Piece").await;
let h = harness(pool.clone());
let cookie = seed_admin(&pool, &h.app).await;
// Queued/active chapters.
let resp = h
.app
.clone()
.oneshot(get_with_cookie("/api/v1/admin/crawler/active-jobs", &cookie))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = body_json(resp).await;
assert_eq!(body["page"]["total"], 2);
// Queued covers.
let resp = h
.app
.clone()
.oneshot(get_with_cookie("/api/v1/admin/crawler/covers", &cookie))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = body_json(resp).await;
assert_eq!(body["page"]["total"], 1);
assert_eq!(body["items"][0]["manga_title"], "One Piece");
// Both are admin-gated.
let (_u, plain) = register_user(&h.app).await;
let resp = h
.app
.clone()
.oneshot(get_with_cookie("/api/v1/admin/crawler/active-jobs", &plain))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
}
#[sqlx::test(migrations = "./migrations")]
async fn get_status_requires_admin(pool: PgPool) {
let h = harness(pool);

View File

@@ -7,6 +7,32 @@ use serde_json::json;
use sqlx::PgPool;
use uuid::Uuid;
/// Seed a manga with no cover + a live source row (so it's "queued for a
/// cover fetch"). Returns the manga id.
async fn seed_missing_cover(pool: &PgPool, title: &str) -> Uuid {
let manga_id = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, $2, NULL)")
.bind(manga_id)
.bind(title)
.execute(pool)
.await
.unwrap();
sqlx::query("INSERT INTO sources (id, name, base_url) VALUES ('target', 'T', 'http://x') ON CONFLICT DO NOTHING")
.execute(pool)
.await
.unwrap();
sqlx::query(
"INSERT INTO manga_sources (source_id, source_manga_key, manga_id, source_url) \
VALUES ('target', $1, $2, 'http://x/m')",
)
.bind(format!("k-{manga_id}"))
.bind(manga_id)
.execute(pool)
.await
.unwrap();
manga_id
}
/// Seed a manga + chapter and return their ids.
async fn seed_chapter(pool: &PgPool, title: &str, number: i32) -> (Uuid, Uuid) {
let manga_id = Uuid::new_v4();
@@ -202,6 +228,66 @@ async fn requeue_with_two_dead_jobs_for_one_chapter_revives_one_not_500(pool: Pg
}
}
#[sqlx::test(migrations = "./migrations")]
async fn list_active_jobs_returns_pending_and_running_running_first(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "Naruto", 700).await;
let (_m2, c2) = seed_chapter(&pool, "Bleach", 10).await;
insert_job(&pool, c1, "pending", 0).await;
insert_job(&pool, c2, "running", 1).await;
// A dead + a done job must NOT appear.
let (_m3, c3) = seed_chapter(&pool, "Gone", 1).await;
insert_job(&pool, c3, "dead", 5).await;
let (items, total) = crawler::list_active_jobs(&pool, None, 50, 0).await.unwrap();
assert_eq!(total, 2);
assert_eq!(items.len(), 2);
// Running first.
assert_eq!(items[0].state, "running");
assert_eq!(items[0].manga_title.as_deref(), Some("Bleach"));
assert_eq!(items[1].state, "pending");
assert_eq!(items[1].chapter_number, Some(700));
}
#[sqlx::test(migrations = "./migrations")]
async fn list_active_jobs_filters_by_title(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "Naruto", 1).await;
let (_m2, c2) = seed_chapter(&pool, "One Piece", 1).await;
insert_job(&pool, c1, "pending", 0).await;
insert_job(&pool, c2, "pending", 0).await;
let (items, total) = crawler::list_active_jobs(&pool, Some("piece"), 50, 0)
.await
.unwrap();
assert_eq!(total, 1);
assert_eq!(items[0].manga_title.as_deref(), Some("One Piece"));
}
#[sqlx::test(migrations = "./migrations")]
async fn missing_covers_count_and_list(pool: PgPool) {
seed_missing_cover(&pool, "Naruto").await;
seed_missing_cover(&pool, "Bleach").await;
// A manga WITH a cover must not be counted.
let with_cover = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, 'Done', 'k.jpg')")
.bind(with_cover)
.execute(&pool)
.await
.unwrap();
assert_eq!(crawler::count_missing_covers(&pool).await.unwrap(), 2);
let (items, total) = crawler::list_missing_cover_mangas(&pool, None, 50, 0)
.await
.unwrap();
assert_eq!(total, 2);
assert_eq!(items.len(), 2);
let (items, total) = crawler::list_missing_cover_mangas(&pool, Some("naru"), 50, 0)
.await
.unwrap();
assert_eq!(total, 1);
assert_eq!(items[0].manga_title, "Naruto");
}
#[sqlx::test(migrations = "./migrations")]
async fn job_state_counts_groups_by_state(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "A", 1).await;

View File

@@ -109,7 +109,7 @@ async fn dispatch_target_prefers_most_recent_live_source(pool: PgPool) {
seed_chapter_with_two_live_sources(&pool).await;
let row = dispatch_target(&pool, chapter_id).await.unwrap();
let (_manga_id, source_url) =
let (_manga_id, source_url, _title, _number) =
row.expect("two live sources should yield a dispatch target");
assert_eq!(
source_url, new_url,
@@ -133,7 +133,7 @@ async fn dispatch_target_skips_dropped_sources(pool: PgPool) {
.unwrap();
let row = dispatch_target(&pool, chapter_id).await.unwrap();
let (_manga_id, source_url) =
let (_manga_id, source_url, _title, _number) =
row.expect("a single live source should still yield a dispatch target");
assert!(
source_url != new_url,