Compare commits

15 Commits

Author SHA1 Message Date
MechaCat02
e02d125f51 feat(crawler): live cover + chapter-content observability with realtime page counts
Extends the live dashboard so an operator can see exactly what's being
fetched, in realtime:

- Chapters currently being crawled are tracked in the status as `active_chapters`
  (manga title · ch.N) with a live page counter that climbs per stored page
  (set_chapter_pages, pushed via the existing watch→SSE). The dispatcher
  registers each via an RAII ChapterGuard (sync Mutex) that removes the
  entry on completion, panic, or timeout-drop — replacing the old per-worker
  slot model.
- Covers: status now carries the cover currently being fetched (`current_cover`,
  set around download_and_store_cover in both the metadata pass and backfill)
  and a `covers_queued` backlog count; CoverBackfill phase gains index/total.
- Two paginated backlog endpoints (fetched on demand, auto-refreshed when the
  live counts change): GET /admin/crawler/active-jobs (which chapters of which
  mangas are queued/running) and GET /admin/crawler/covers (mangas missing a
  cover). repo: list_active_jobs, list_missing_cover_mangas, count_missing_covers.
- dispatch_target now also returns manga title + chapter number.
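
The RAII registration described in the first bullet can be sketched roughly as below. This is a std-only illustration, not the project's code: `Registry`, `track`, the `u64` key, and the field layout are hypothetical. Only the idea comes from the commit message: a guard registers an entry in a sync `Mutex`-protected map and removes it in `Drop`, so completion, panic, and timeout-drop all clean up the same way.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for one `active_chapters` entry.
#[derive(Debug, Clone, PartialEq)]
pub struct ActiveChapter {
    pub title: String,
    pub number: f64,
    pub pages: u32,
}

/// Hypothetical shared registry keyed by an illustrative chapter id.
#[derive(Clone, Default)]
pub struct Registry {
    inner: Arc<Mutex<HashMap<u64, ActiveChapter>>>,
}

impl Registry {
    /// Register a chapter and return a guard that unregisters it on drop.
    pub fn track(&self, id: u64, title: &str, number: f64) -> ChapterGuard {
        let entry = ActiveChapter { title: title.to_string(), number, pages: 0 };
        self.inner
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(id, entry);
        ChapterGuard { registry: self.clone(), id }
    }

    /// Copy of the current entries, as a status snapshot would expose them.
    pub fn snapshot(&self) -> Vec<ActiveChapter> {
        self.inner
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .values()
            .cloned()
            .collect()
    }
}

pub struct ChapterGuard {
    registry: Registry,
    id: u64,
}

impl ChapterGuard {
    /// Update the live page counter for this chapter.
    pub fn set_pages(&self, pages: u32) {
        let mut map = self.registry.inner.lock().unwrap_or_else(|e| e.into_inner());
        if let Some(entry) = map.get_mut(&self.id) {
            entry.pages = pages;
        }
    }
}

impl Drop for ChapterGuard {
    fn drop(&mut self) {
        // A sync Mutex can be locked inside Drop (no .await needed). Poisoning
        // is tolerated so unwinding from a panic still removes the entry.
        let mut map = self.registry.inner.lock().unwrap_or_else(|e| e.into_inner());
        map.remove(&self.id);
    }
}
```

A sync `Mutex` is the natural choice in a sketch like this because `Drop` cannot await; an async lock would need a spawned cleanup task instead.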

Frontend: the crawler page replaces the Workers table with an Active-chapters
table (live page bars), adds a current-cover line + covers-queued figure, and
two backlog sections (Queued chapters / Queued covers) with search + Pager,
auto-refetched via $effect on the live counts.

Tests: status guard/page + cover unit tests; repo list/count tests; endpoint
tests; frontend api tests. Version 0.53.1 -> 0.54.0.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 20:41:51 +02:00
MechaCat02
fb4182f68d fix(admin): clear session-expired flag on successful browser restart (0.53.1)
A successful `coordinated_restart` re-runs `on_launch`, which re-injects
PHPSESSID and re-probes via `verify_session_with_recircuit` — so reaching
`Ok(())` proves the session is live. But the handler never dropped the
sticky `session_expired` flag, so the admin UI continued to report
"Session Expired" and chapter workers kept idling until the operator
made a second click on "Clear expired" (or pushed a new cookie).

The fix is one line in `restart_browser`: on `Ok(())`, call
`c.session.clear_expired()`. The error path still leaves the flag set
since a failed restart means the probe didn't confirm.

Adds a focused `clear_expired_flips_sticky_flag_without_touching_session`
unit test to pin the controller-side semantic; the existing
`update_persists_and_clears_expired_then_round_trips` test continues to
cover the cookie-refresh path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-04 19:49:28 +02:00
MechaCat02
da6e320836 feat(crawler): live status via SSE instead of polling
Replace the dashboard's 5s polling with a Server-Sent Events stream:

- StatusHandle gains a tokio `watch` version bumped on every mutation;
  GET /admin/crawler/stream subscribes and pushes a composed snapshot
  immediately on connect, then on every status change (instant, no
  lost-wakeup) plus a 5s backstop for DB queue counts / browser phase.
- Non-status signals poke the notifier so they push immediately too:
  session-expired (worker), session update / clear-expired / browser
  restart (endpoints).
- compose_status is shared by the one-shot GET and the stream; the stream
  tolerates transient DB errors with a keep-alive comment instead of
  tearing down.
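
The "version bumped on every mutation, plus a backstop" idea from the bullets above can be illustrated with a std-only analogue. This is a sketch under stated assumptions, not the project's implementation (which uses tokio's `watch` channel): `Notifier`, `Subscriber`, and `changed_or_timeout` are hypothetical names, and a `Condvar` stands in for the async wait.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Hypothetical std-only analogue of a status handle's version counter.
#[derive(Clone, Default)]
pub struct Notifier {
    inner: Arc<(Mutex<u64>, Condvar)>,
}

impl Notifier {
    /// Bump the version; called on every status mutation or explicit poke.
    pub fn poke(&self) {
        let (lock, cvar) = &*self.inner;
        *lock.lock().unwrap() += 1;
        cvar.notify_all();
    }

    /// Subscribe at the current version.
    pub fn subscribe(&self) -> Subscriber {
        let seen = *self.inner.0.lock().unwrap();
        Subscriber { notifier: self.clone(), seen }
    }
}

pub struct Subscriber {
    notifier: Notifier,
    seen: u64,
}

impl Subscriber {
    /// Wait until the version moves past the last one seen, or until the
    /// backstop elapses. Returns true if a change was observed.
    pub fn changed_or_timeout(&mut self, backstop: Duration) -> bool {
        let (lock, cvar) = &*self.notifier.inner;
        let seen = self.seen;
        let guard = lock.lock().unwrap();
        // Comparing versions (rather than waiting for a one-shot signal)
        // means a poke that landed before this call is still noticed.
        let (guard, _timeout) = cvar
            .wait_timeout_while(guard, backstop, |v| *v == seen)
            .unwrap();
        let changed = *guard != seen;
        self.seen = *guard;
        changed
    }
}
```

Because the subscriber compares version numbers, subscribing before the first emit is enough to avoid a lost wakeup: any mutation in between shows up as a version mismatch on the next wait.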

Frontend: the crawler page opens an EventSource on mount and closes it on
destroy, so the subscription is scoped to the active page (no global
subscription). A one-shot fetch still paints initial state / serves as a
fallback if SSE is blocked; a live/reconnecting indicator reflects the
connection. The existing reverse proxy already streams SSE (its abort
timer is cleared once response headers arrive), so no proxy change needed.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-04 19:13:50 +02:00
MechaCat02
832042d2b7 fix(crawler): review findings — requeue dedup, restart result, session validation
- requeue_dead_jobs: when a chapter has multiple dead jobs, revive only the
  newest (DISTINCT ON the chapter key) so a single UPDATE can't flip two
  dead rows for one chapter to pending and violate the partial unique dedup
  index (was a 500 that requeued nothing). Non-chapter jobs fall back to row
  id. Regression test added. (critical)
- coordinated_restart: a caller that coalesces into an in-progress restart
  now reports that restart's real outcome instead of a blind success, so the
  session-update "valid" / restart "ok" signal can't be falsely positive.
- SessionController::update: reject control chars / ';' / ',' in PHPSESSID
  before it reaches the cookie string + CDP cookie. Test added.
- Add non-admin 403 test on a mutating crawler endpoint; fix stale
  stream-to-storage doc comment.
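
The PHPSESSID check in the third bullet might look roughly like the sketch below. The function name and error text are hypothetical; only the rejected character classes (control characters, `;`, `,`) come from the commit message.

```rust
/// Hypothetical validator: reject characters that could break out of a
/// `Cookie:` header value or a CDP cookie before the value is used.
pub fn validate_phpsessid(value: &str) -> Result<(), String> {
    for ch in value.chars() {
        if ch.is_control() || ch == ';' || ch == ',' {
            return Err(format!("PHPSESSID contains a forbidden character: {ch:?}"));
        }
    }
    Ok(())
}
```

Rejecting at the controller boundary keeps every downstream consumer (cookie jar, browser cookie injection, persisted state) from having to re-validate.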

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 21:07:10 +02:00
MechaCat02
ec0a8f2b5d feat(admin): Crawler dashboard — live status, controls, dead-job requeue
New /admin/crawler tab (5s-polled): status hero (daemon/session/browser
pills, phase line + progress bar, session-expired banner, last-pass),
controls (run pass, restart browser w/ confirm, manage session modal,
clear expired), queue gauges + worker table, and a dead-jobs table with
search, Pager, and per-job / per-manga / all requeue.

Adds inline "requeue" on failed chapters in the admin manga page, the
typed api-client functions in lib/api/admin.ts (+ tests), and the Crawler
nav tab. Version 0.52.0 -> 0.53.0.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:48:13 +02:00
MechaCat02
6f0a8d88c9 feat(api): add per-chapter requeue scope for dead jobs
Lets the admin manga page requeue a single failed chapter's dead job(s)
inline, without a job id. Adds RequeueScope::Chapter + the matching
request variant and a repo test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:48:13 +02:00
MechaCat02
41bf9455a1 feat(api): admin crawler observability + control endpoints
GET  /admin/crawler                      live status (phase, workers,
                                          last pass, session, browser, queue)
POST /admin/crawler/run                  trigger an out-of-cycle metadata pass
POST /admin/crawler/browser/restart      coordinated Chromium restart
POST /admin/crawler/session              refresh PHPSESSID + re-probe
POST /admin/crawler/session/clear-expired clear the sticky expired flag
GET  /admin/crawler/dead-jobs            paginated dead-letter list
POST /admin/crawler/dead-jobs/requeue    requeue all / per-manga / single

All cookie-only via RequireAdmin; control endpoints 503 when the daemon is
disabled; mutations are audit-logged. Reads compose the live status with
DB-derived queue counts.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:39:04 +02:00
MechaCat02
cd0a1e13a9 feat(crawler): live status surface, runtime session, dead-job repo, auto-restart
Adds the in-process observability + control infrastructure the admin
dashboard consumes:

- status.rs: CrawlerStatus/Phase/WorkerState + StatusHandle. The daemon
  publishes its current phase (idle/walking/fetching-metadata/cover-backfill),
  per-worker activity, and last-pass summary. Wired through the cron,
  run_metadata_pass, and the worker loop.
- session_control.rs: SessionController refreshes PHPSESSID at runtime —
  rewrites the shared reqwest cookie jar, updates the value on_launch reads,
  persists to crawler_state (survives restart), and clears the expired flag.
  on_launch now reads the live session instead of a startup snapshot.
- RealChapterDispatcher auto-triggers a coordinated browser restart after
  CRAWLER_BROWSER_RESTART_THRESHOLD consecutive transient failures.
- repo::crawler: list_dead_jobs, requeue_dead_jobs (all/manga/job, bypassing
  the quarantine, skipping live duplicates), job_state_counts.
- AppState gains CrawlerControl bundling browser_manager + session + status
  + metadata_pass for the admin endpoints.
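
The auto-restart trigger in the list above (a restart after `CRAWLER_BROWSER_RESTART_THRESHOLD` consecutive transient failures) can be sketched as a small counter. `FailureStreak` and its methods are hypothetical, and two behaviours are assumptions rather than facts from the source: a threshold of 0 disables the trigger, and the streak resets after it fires.

```rust
/// Hypothetical consecutive-failure tracker for the dispatcher.
pub struct FailureStreak {
    threshold: u32,
    consecutive: u32,
}

impl FailureStreak {
    pub fn new(threshold: u32) -> Self {
        FailureStreak { threshold, consecutive: 0 }
    }

    /// Any success breaks the streak.
    pub fn record_success(&mut self) {
        self.consecutive = 0;
    }

    /// Record a transient failure. Returns true when the caller should
    /// trigger a coordinated browser restart.
    pub fn record_transient_failure(&mut self) -> bool {
        self.consecutive = self.consecutive.saturating_add(1);
        if self.threshold > 0 && self.consecutive >= self.threshold {
            // Assumption: start a fresh streak once a restart is requested.
            self.consecutive = 0;
            return true;
        }
        false
    }
}
```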

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:38:54 +02:00
MechaCat02
3f91bea768 feat(crawler): coordinated browser restart gate in BrowserManager
Adds a Healthy/Draining/Restarting lifecycle gate. `acquire()` parks while
a restart is in progress; `coordinated_restart(deadline)` blocks new
acquires, drains in-flight leases (bounded, then forces), closes +
relaunches Chromium (re-running on_launch → re-inject session + probe),
then resumes parked acquirers. Concurrent restart requests collapse into
one relaunch; the phase always returns to Healthy so a failed relaunch
never wedges acquisition.

The metadata pass cooperates via is_restart_pending() at its per-manga
checkpoint, yielding its long-lived lease (recovery sweep next tick)
instead of stalling the drain.
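
The Healthy/Draining/Restarting gate can be illustrated with a blocking, std-only sketch. This is not the project's `BrowserManager`: `Gate`, `release`, and the `relaunch` closure are hypothetical, the real code is async, and coalescing of concurrent restart requests is deliberately left out here (a second caller simply gets an error).

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartPhase {
    Healthy,
    Draining,
    Restarting,
}

struct GateState {
    phase: RestartPhase,
    leases: usize,
}

/// Hypothetical blocking gate; names and shape are illustrative only.
pub struct Gate {
    state: Mutex<GateState>,
    cvar: Condvar,
}

impl Gate {
    pub fn new() -> Self {
        Gate {
            state: Mutex::new(GateState { phase: RestartPhase::Healthy, leases: 0 }),
            cvar: Condvar::new(),
        }
    }

    pub fn phase(&self) -> RestartPhase {
        self.state.lock().unwrap().phase
    }

    /// Park while a restart is in progress, then take a lease.
    pub fn acquire(&self) {
        let guard = self.state.lock().unwrap();
        let mut guard = self
            .cvar
            .wait_while(guard, |s| s.phase != RestartPhase::Healthy)
            .unwrap();
        guard.leases += 1;
    }

    /// Return a lease and wake a restart that may be draining.
    pub fn release(&self) {
        let mut guard = self.state.lock().unwrap();
        guard.leases = guard.leases.saturating_sub(1);
        self.cvar.notify_all();
    }

    pub fn coordinated_restart<F>(&self, drain_deadline: Duration, relaunch: F) -> Result<(), String>
    where
        F: FnOnce() -> Result<(), String>,
    {
        let mut guard = self.state.lock().unwrap();
        if guard.phase != RestartPhase::Healthy {
            // Simplification: the real gate coalesces concurrent callers.
            return Err("restart already in progress".to_string());
        }
        guard.phase = RestartPhase::Draining; // new acquires now park
        let (mut guard, _timeout) = self
            .cvar
            .wait_timeout_while(guard, drain_deadline, |s| s.leases > 0)
            .unwrap();
        guard.leases = 0; // bounded drain: anything still held is forced
        guard.phase = RestartPhase::Restarting;
        drop(guard);

        let result = relaunch(); // close + relaunch would happen here

        // Always return to Healthy so a failed relaunch never wedges acquire().
        self.state.lock().unwrap().phase = RestartPhase::Healthy;
        self.cvar.notify_all();
        result
    }
}
```

Resetting the phase unconditionally after `relaunch` is the part that matters most: the error is reported to the caller, but parked acquirers are always released.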

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:18:09 +02:00
MechaCat02
7a6815661f feat(crawler): reliability fixes — heartbeat, streaming, jitter, timeout, breaker
A1 Lease heartbeat: jobs::renew keeps a long-but-healthy job's lease fresh
so it is never stolen mid-flight nor inflated toward max_attempts.
A2 Stream chapter pages straight to storage (peak memory = one image) and
persist rows + page_count in one short transaction off the network path
(S3-ready); roll back stored blobs on failure via Storage::delete.
A3 ±20% jitter on exponential backoff to avoid a retry thundering herd.
A4 Outer per-dispatch timeout (CRAWLER_JOB_TIMEOUT_SECS, default 600) so a
hung job is acked-failed instead of wedging a worker.
A5 Metadata circuit-breaker (CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES,
default 10): abort a pass on a source outage without marking a clean exit,
so the next tick recovery-sweeps.
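
As an illustration of A3, ±20% jitter on exponential backoff could be computed as below. This is a sketch, not the crawler's code: the function name, the doubling schedule, the cap, and the choice to apply jitter after capping are assumptions. The random sample is passed in (`unit`, expected in [0, 1]) so the function stays deterministic and testable.

```rust
use std::time::Duration;

/// Hypothetical backoff helper: `base * 2^attempt`, capped, then scaled by a
/// multiplier in [0.8, 1.2] derived from a caller-supplied random sample.
pub fn backoff_with_jitter(attempt: u32, base: Duration, cap: Duration, unit: f64) -> Duration {
    // Exponential part, saturating so large attempt counts cannot overflow.
    let factor = 2u32.saturating_pow(attempt);
    let nominal = base.saturating_mul(factor).min(cap);
    // Map the sample onto a multiplier between 0.8 and 1.2 (that is, ±20%).
    let multiplier = 0.8 + 0.4 * unit.clamp(0.0, 1.0);
    nominal.mul_f64(multiplier)
}
```

Spreading retries over a ±20% window means jobs that failed together (for example during a source outage) do not all come due at the same instant.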

Adds CRAWLER_BROWSER_RESTART_THRESHOLD config (used by the upcoming
coordinated browser restart). Bumps version 0.52.0 -> 0.53.0.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 20:13:17 +02:00
MechaCat02
679abae736 feat(chapter): preserve source-site order in chapter list (0.52.0)
Some checks failed
deploy / test-backend (push) Failing after 11m48s
deploy / test-frontend (push) Successful in 9m45s
deploy / build-and-push (push) Has been skipped
deploy / deploy (push) Has been skipped
The user-facing chapter list was ordered by (number ASC, created_at ASC),
which broke the source site's order in two ways: non-numeric entries
("notice. : Officials") parsed to number=0 and clustered at the top,
even though the site placed them mid-list, and variants sharing a
number ("Ch.14 : PH" / "Ch.14 : Official") were torn apart by the
created_at tiebreak.

Capture each chapter's position in the source DOM as `source_index`
(0 = first = newest on this site) on every crawler sync, including the
UPDATE branch so a new chapter prepended on the source shifts every
existing row down by one on the next tick. The list query reverses
this with `ORDER BY source_index DESC NULLS LAST, number ASC,
created_at ASC` so the oldest chapter appears first, variants stay
adjacent in the order the site shows them, and non-numeric entries
land where the site placed them. User-uploaded chapters and pre-
migration rows keep their NULL source_index and fall through to the
prior number/created_at tiebreak via NULLS LAST.
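
The effect of `ORDER BY source_index DESC NULLS LAST, number ASC, created_at ASC` can be mirrored in an in-memory comparator. This is only an illustration of the ordering: `ChapterRow` and its field types are hypothetical, and the real sort happens in SQL.

```rust
use std::cmp::Ordering;

/// Hypothetical row shape; `created_at` is an illustrative epoch timestamp.
#[derive(Debug, Clone, PartialEq)]
pub struct ChapterRow {
    pub title: &'static str,
    pub source_index: Option<i32>,
    pub number: f64,
    pub created_at: i64,
}

/// Mirrors: source_index DESC NULLS LAST, number ASC, created_at ASC.
pub fn list_order(a: &ChapterRow, b: &ChapterRow) -> Ordering {
    let by_source = match (a.source_index, b.source_index) {
        (Some(x), Some(y)) => y.cmp(&x),      // DESC: higher index (older) first
        (Some(_), None) => Ordering::Less,    // NULLS LAST
        (None, Some(_)) => Ordering::Greater,
        (None, None) => Ordering::Equal,
    };
    by_source
        .then_with(|| a.number.total_cmp(&b.number))
        .then_with(|| a.created_at.cmp(&b.created_at))
}
```

Sorting the commit message's own examples with this comparator puts a plain older chapter first, keeps "notice. : Officials" where the site placed it, leaves the two "Ch.14" variants adjacent, and sends rows without a `source_index` to the end. `NULLS LAST` has to be spelled out in the query because PostgreSQL places NULLs first by default under `DESC`.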

The reader's client-side `[...chapters].sort((a,b) => a.number - b.number)`
is dropped; prev/next now walks the server-ordered array positionally
so it traverses variants and non-numeric entries in display order.

Existing data populates on the next cron tick or via admin force-resync.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-03 07:25:09 +02:00
MechaCat02
b812c6d16c fix(reader): drop "Chapter N:" prefix from chapter title display (0.51.2)
The chapter list on the manga detail page, the reader's chapter-select
dropdown, the continuous-mode chapter bar, the browser tab title, and
the profile upload-history entries all prepended "Chapter {number}:"
in front of the crawled site title. Source titles already include
"Ch.N" themselves and the manga page renders chapters inside an <ol>,
so the prefix duplicated information the user could already see.

A small chapterLabel(c) helper in $lib/api/chapters returns the site
title as-is, falling back to "Chapter {number}" only when the
crawler captured an empty title (link/option stays non-empty). The
five render sites now call it. The previous-/next-chapter nav
buttons still read "Previous chapter (Ch. N)" / "Next chapter (Ch. N)"
since those are wayfinding labels, not title display.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-03 07:22:17 +02:00
MechaCat02
e93eec89e5 fix(crawler): queue chapter content in ascending number order (0.51.1)
Both enqueue paths now order by chapters.number so the cron tick and the
bookmark hook insert jobs from chapter 1 upward instead of source-discovery
or random-UUID order. The lease query tiebreaks on created_at so jobs
sharing a batch's scheduled_at come off the queue in insertion order,
propagating the enqueue intent through to dequeue. Concurrent workers
and per-CDN latency can still drift actual completion order.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-02 21:13:51 +02:00
MechaCat02
8818c890c5 feat(reader): chapter select dropdown for direct chapter jumps (0.51.0)
Adds a chapter `<select>` to the reader's top nav listing every chapter
of the current manga, defaulting to the open chapter; picking another
entry navigates straight to it without going back to the manga detail
page. Options use the "Ch. N — Title" form to match the existing
chapter tile and prev/next buttons in the reader bar.

Covered by a new Playwright spec.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-02 07:09:30 +02:00
MechaCat02
c134bdbbde feat: cover retry backfill + admin force-resync for manga & chapter (0.50.0)
Adds a per-tick cover-backfill pass to the crawler daemon so mangas whose
cover download failed on first attempt get retried — the metadata pass's
early-stop optimisation otherwise prevents the walk from revisiting them.

Adds admin-only POST /admin/mangas/:id/resync and POST /admin/chapters/:id/resync
that refetch metadata + cover (or chapter content with force_refetch) from the
crawler source synchronously and return the refreshed row. Surfaced in the
UI as "Force resync" buttons on the manga detail and reader pages,
admin-only via session.user.is_admin.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-01 22:00:09 +02:00
44 changed files with 6489 additions and 163 deletions

backend/Cargo.lock generated

@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
[[package]]
name = "mangalord"
-version = "0.49.1"
+version = "0.54.0"
dependencies = [
"anyhow",
"argon2",


@@ -1,6 +1,6 @@
[package]
name = "mangalord"
-version = "0.49.1"
+version = "0.54.0"
edition = "2021"
default-run = "mangalord"


@@ -0,0 +1,18 @@
-- Capture each chapter's position in the source site's chapter list so
-- the user-facing list can preserve site order: variants of the same
-- chapter number (e.g. "Ch.14 : PH" next to "Ch.14 : Official") stay
-- adjacent, and non-numeric entries like "notice. : Officials" land
-- where the site placed them rather than clustering at the top under
-- number = 0.
--
-- Lower source_index = closer to the top of the source DOM = newer
-- chapter on this site (it renders newest-first). The list query
-- reverses this with ORDER BY source_index DESC so the oldest chapter
-- appears first in our UI.
--
-- NULL is the sentinel for user-uploaded chapters (no source row) and
-- for crawled rows that pre-date this migration. The list query keeps
-- the existing (number, created_at) tiebreak via NULLS LAST so those
-- fall through to the prior behaviour until the next crawler tick
-- populates the column.
ALTER TABLE chapters ADD COLUMN source_index INTEGER;


@@ -0,0 +1,491 @@
//! Admin-only crawler observability + control endpoints.
//!
//! Mounted under `/api/v1/admin/crawler*`, cookie-only via `RequireAdmin`.
//! All control endpoints return 503 when the crawler daemon is disabled
//! (`AppState.crawler == None`). Reads compose the live in-process status
//! ([`crate::crawler::status`]) with DB-derived queue counts and the
//! session/browser flags.
use std::convert::Infallible;
use std::time::Duration;
use axum::extract::{Query, State};
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::routing::{get, post};
use axum::{Json, Router};
use futures_util::stream::Stream;
use serde::{Deserialize, Serialize};
use serde_json::json;
use uuid::Uuid;
use crate::app::{AppState, CrawlerControl};
use crate::auth::extractor::RequireAdmin;
use crate::crawler::browser_manager::RestartPhase;
use crate::crawler::status::{ActiveChapter, CoverTarget, LastPass, Phase};
use crate::error::{AppError, AppResult};
use crate::repo;
use crate::repo::crawler::{ActiveJob, DeadJob, MissingCoverRow, RequeueScope};
/// Backstop recompose interval for the SSE stream. Phase/worker/session
/// changes push instantly via the status `watch`; this only bounds the
/// staleness of DB-derived queue counts and the browser phase when those
/// change without an accompanying status poke.
const SSE_BACKSTOP: Duration = Duration::from_secs(5);
pub fn routes() -> Router<AppState> {
    Router::new()
        .route("/admin/crawler", get(get_status))
        .route("/admin/crawler/stream", get(stream_status))
        .route("/admin/crawler/run", post(run_now))
        .route("/admin/crawler/browser/restart", post(restart_browser))
        .route("/admin/crawler/session", post(update_session))
        .route(
            "/admin/crawler/session/clear-expired",
            post(clear_session_expired),
        )
        .route("/admin/crawler/dead-jobs", get(list_dead_jobs))
        .route("/admin/crawler/dead-jobs/requeue", post(requeue_dead_jobs))
        .route("/admin/crawler/active-jobs", get(list_active_jobs))
        .route("/admin/crawler/covers", get(list_covers))
}
// ---------------------------------------------------------------------------
// GET /admin/crawler — live status
// ---------------------------------------------------------------------------
#[derive(Debug, Serialize)]
struct QueueCounts {
    pending: i64,
    running: i64,
    dead: i64,
}

#[derive(Debug, Serialize)]
struct SessionStatus {
    /// Whether the sticky session-expired flag is set (chapter workers idle).
    expired: bool,
    /// Whether a PHPSESSID is currently configured at all.
    configured: bool,
}

#[derive(Debug, Serialize)]
struct CrawlerStatusResponse {
    /// `"running"` | `"disabled"`.
    daemon: &'static str,
    phase: Option<Phase>,
    /// Configured chapter-worker count (for "N busy / M workers").
    worker_count: usize,
    /// Chapters being crawled right now, with live page counts.
    active_chapters: Vec<ActiveChapter>,
    /// The cover being fetched right now, if any.
    current_cover: Option<CoverTarget>,
    /// Mangas still queued for a cover fetch.
    covers_queued: i64,
    last_pass: LastPass,
    session: SessionStatus,
    /// `"healthy"` | `"draining"` | `"restarting"` | `"down"`.
    browser: &'static str,
    queue: QueueCounts,
}

fn browser_phase_str(p: RestartPhase) -> &'static str {
    match p {
        RestartPhase::Healthy => "healthy",
        RestartPhase::Draining => "draining",
        RestartPhase::Restarting => "restarting",
    }
}
/// Compose a full status snapshot from the in-memory status, the
/// browser/session flags, and a fresh DB queue-count query. Shared by the
/// one-shot `get_status` and the SSE `stream_status`.
async fn compose_status(state: &AppState) -> AppResult<CrawlerStatusResponse> {
    let (pending, running, dead) = repo::crawler::job_state_counts(&state.db).await?;
    let queue = QueueCounts {
        pending,
        running,
        dead,
    };
    let covers_queued = repo::crawler::count_missing_covers(&state.db).await?;
    Ok(match state.crawler.as_ref() {
        None => CrawlerStatusResponse {
            daemon: "disabled",
            phase: None,
            worker_count: 0,
            active_chapters: Vec::new(),
            current_cover: None,
            covers_queued,
            last_pass: LastPass::default(),
            session: SessionStatus {
                expired: false,
                configured: false,
            },
            browser: "down",
            queue,
        },
        Some(c) => {
            let snap = c.status.snapshot().await;
            CrawlerStatusResponse {
                daemon: "running",
                phase: Some(snap.phase),
                worker_count: snap.worker_count,
                active_chapters: snap.active_chapters,
                current_cover: snap.current_cover,
                covers_queued,
                last_pass: snap.last_pass,
                session: SessionStatus {
                    expired: c.session.is_expired(),
                    configured: c.session.current().await.is_some(),
                },
                browser: browser_phase_str(c.browser_manager.phase()),
                queue,
            }
        }
    })
}

async fn get_status(
    State(state): State<AppState>,
    _admin: RequireAdmin,
) -> AppResult<Json<CrawlerStatusResponse>> {
    Ok(Json(compose_status(&state).await?))
}
// ---------------------------------------------------------------------------
// GET /admin/crawler/stream — Server-Sent Events live status
// ---------------------------------------------------------------------------
/// Push live status to the dashboard instead of polling. Emits a snapshot
/// immediately on connect, then on every status change (instant, via the
/// `watch` notifier) and on a [`SSE_BACKSTOP`] tick (to refresh DB queue
/// counts / browser phase that change without a status poke). The browser
/// opens this only while the crawler page is mounted and closes it on
/// navigate-away, so the subscription is scoped to the active page.
async fn stream_status(
    State(state): State<AppState>,
    _admin: RequireAdmin,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Subscribe before the first emit so no change between the initial
    // snapshot and the first await is lost.
    let rx = state.crawler.as_ref().map(|c| c.status.subscribe());
    let stream = futures_util::stream::unfold(
        (state, rx, true),
        |(state, mut rx, first)| async move {
            // After the first immediate emit, wait for a change or the
            // backstop tick before recomposing.
            if !first {
                match rx.as_mut() {
                    Some(rx) => {
                        tokio::select! {
                            _ = rx.changed() => {}
                            _ = tokio::time::sleep(SSE_BACKSTOP) => {}
                        }
                    }
                    None => tokio::time::sleep(SSE_BACKSTOP).await,
                }
            }
            // Compose; on a transient DB error, emit a keep-alive comment
            // rather than tearing down the stream.
            let event = match compose_status(&state).await {
                Ok(resp) => Event::default()
                    .event("status")
                    .json_data(&resp)
                    .unwrap_or_else(|_| Event::default().comment("serialize error")),
                Err(_) => Event::default().comment("status unavailable"),
            };
            Some((Ok(event), (state, rx, false)))
        },
    );
    Sse::new(stream).keep_alive(KeepAlive::default())
}
// ---------------------------------------------------------------------------
// POST /admin/crawler/run — trigger an out-of-cycle metadata pass
// ---------------------------------------------------------------------------
#[derive(Debug, Serialize)]
struct RunResponse {
    started: bool,
}

async fn run_now(
    State(state): State<AppState>,
    admin: RequireAdmin,
) -> AppResult<Json<RunResponse>> {
    let c = require_crawler(&state)?;
    let mp = c.metadata_pass.as_ref().ok_or_else(|| {
        AppError::ServiceUnavailable("no source configured (CRAWLER_START_URL unset)".into())
    })?;
    let mp = std::sync::Arc::clone(mp);
    // Fire-and-forget: the pass can run for minutes; the dashboard follows
    // progress through the live status. Overlap with the daily cron is rare
    // and both serialise on the single browser lease.
    tokio::spawn(async move {
        if let Err(e) = mp.run().await {
            tracing::warn!(error = ?e, "manual metadata pass failed");
        }
    });
    repo::admin_audit::insert(&state.db, admin.0.id, "crawler_run", "crawler", None, json!({}))
        .await?;
    Ok(Json(RunResponse { started: true }))
}
// ---------------------------------------------------------------------------
// POST /admin/crawler/browser/restart — coordinated restart
// ---------------------------------------------------------------------------
#[derive(Debug, Serialize)]
struct RestartResponse {
    ok: bool,
    error: Option<String>,
}

async fn restart_browser(
    State(state): State<AppState>,
    admin: RequireAdmin,
) -> AppResult<Json<RestartResponse>> {
    let c = require_crawler(&state)?;
    let result = c.browser_manager.coordinated_restart(c.drain_deadline).await;
    // A successful coordinated_restart re-runs on_launch, which re-injects
    // PHPSESSID and re-probes, i.e. the session is live. Drop the sticky
    // `session_expired` flag so chapter workers stop idling without
    // requiring a second click on "Clear expired".
    if result.is_ok() {
        c.session.clear_expired();
    }
    // Push the post-restart browser phase to live subscribers immediately.
    c.status.poke();
    repo::admin_audit::insert(
        &state.db,
        admin.0.id,
        "crawler_browser_restart",
        "crawler",
        None,
        json!({ "ok": result.is_ok() }),
    )
    .await?;
    Ok(Json(match result {
        Ok(()) => RestartResponse {
            ok: true,
            error: None,
        },
        Err(e) => RestartResponse {
            ok: false,
            error: Some(format!("{e:#}")),
        },
    }))
}
// ---------------------------------------------------------------------------
// POST /admin/crawler/session — refresh PHPSESSID
// ---------------------------------------------------------------------------
#[derive(Debug, Deserialize)]
struct UpdateSessionRequest {
    phpsessid: String,
}

#[derive(Debug, Serialize)]
struct UpdateSessionResponse {
    /// Whether the post-update browser relaunch + session probe succeeded.
    valid: bool,
    error: Option<String>,
}

async fn update_session(
    State(state): State<AppState>,
    admin: RequireAdmin,
    Json(body): Json<UpdateSessionRequest>,
) -> AppResult<Json<UpdateSessionResponse>> {
    let c = require_crawler(&state)?;
    c.session
        .update(&body.phpsessid)
        .await
        .map_err(|e| AppError::InvalidInput(format!("{e:#}")))?;
    // Relaunch the browser so on_launch re-injects the new cookie and
    // re-probes; the restart's success IS the session-validity signal.
    let probe = c.browser_manager.coordinated_restart(c.drain_deadline).await;
    // Session + browser state changed; push to live subscribers.
    c.status.poke();
    repo::admin_audit::insert(
        &state.db,
        admin.0.id,
        "crawler_session_update",
        "crawler",
        None,
        json!({ "valid": probe.is_ok() }),
    )
    .await?;
    Ok(Json(match probe {
        Ok(()) => UpdateSessionResponse {
            valid: true,
            error: None,
        },
        Err(e) => UpdateSessionResponse {
            valid: false,
            error: Some(format!("{e:#}")),
        },
    }))
}
#[derive(Debug, Serialize)]
struct ClearExpiredResponse {
    cleared: bool,
}

async fn clear_session_expired(
    State(state): State<AppState>,
    admin: RequireAdmin,
) -> AppResult<Json<ClearExpiredResponse>> {
    let c = require_crawler(&state)?;
    c.session.clear_expired();
    // session.expired flipped; push to live subscribers.
    c.status.poke();
    repo::admin_audit::insert(
        &state.db,
        admin.0.id,
        "crawler_session_clear_expired",
        "crawler",
        None,
        json!({}),
    )
    .await?;
    Ok(Json(ClearExpiredResponse { cleared: true }))
}
// ---------------------------------------------------------------------------
// Dead jobs
// ---------------------------------------------------------------------------
#[derive(Debug, Deserialize, Default)]
struct DeadJobsParams {
    #[serde(default)]
    search: Option<String>,
    #[serde(default = "default_limit")]
    limit: i64,
    #[serde(default)]
    offset: i64,
}

fn default_limit() -> i64 {
    50
}

async fn list_dead_jobs(
    State(state): State<AppState>,
    _admin: RequireAdmin,
    Query(params): Query<DeadJobsParams>,
) -> AppResult<Json<crate::api::pagination::PagedResponse<DeadJob>>> {
    let limit = params.limit.clamp(1, 200);
    let offset = params.offset.max(0);
    let search = params.search.filter(|s| !s.trim().is_empty());
    let (items, total) =
        repo::crawler::list_dead_jobs(&state.db, search.as_deref(), limit, offset).await?;
    Ok(Json(crate::api::pagination::PagedResponse::with_total(
        items, limit, offset, total,
    )))
}
#[derive(Debug, Deserialize)]
#[serde(tag = "scope", rename_all = "snake_case")]
enum RequeueRequest {
    All,
    Manga { manga_id: Uuid },
    Chapter { chapter_id: Uuid },
    Job { job_id: Uuid },
}

#[derive(Debug, Serialize)]
struct RequeueResponse {
    requeued: u64,
}

async fn requeue_dead_jobs(
    State(state): State<AppState>,
    admin: RequireAdmin,
    Json(body): Json<RequeueRequest>,
) -> AppResult<Json<RequeueResponse>> {
    let scope = match &body {
        RequeueRequest::All => RequeueScope::All,
        RequeueRequest::Manga { manga_id } => RequeueScope::Manga(*manga_id),
        RequeueRequest::Chapter { chapter_id } => RequeueScope::Chapter(*chapter_id),
        RequeueRequest::Job { job_id } => RequeueScope::Job(*job_id),
    };
    let requeued = repo::crawler::requeue_dead_jobs(&state.db, scope).await?;
    repo::admin_audit::insert(
        &state.db,
        admin.0.id,
        "crawler_dead_jobs_requeue",
        "crawler",
        None,
        json!({ "requeued": requeued, "scope": scope_label(&body) }),
    )
    .await?;
    Ok(Json(RequeueResponse { requeued }))
}

fn scope_label(r: &RequeueRequest) -> &'static str {
    match r {
        RequeueRequest::All => "all",
        RequeueRequest::Manga { .. } => "manga",
        RequeueRequest::Chapter { .. } => "chapter",
        RequeueRequest::Job { .. } => "job",
    }
}
// ---------------------------------------------------------------------------
// Queued-chapters + queued-covers backlogs (paginated, fetched on demand)
// ---------------------------------------------------------------------------
/// Pagination + title-search params shared by the backlog list endpoints.
#[derive(Debug, Deserialize, Default)]
struct ListParams {
    #[serde(default)]
    search: Option<String>,
    #[serde(default = "default_limit")]
    limit: i64,
    #[serde(default)]
    offset: i64,
}

async fn list_active_jobs(
    State(state): State<AppState>,
    _admin: RequireAdmin,
    Query(params): Query<ListParams>,
) -> AppResult<Json<crate::api::pagination::PagedResponse<ActiveJob>>> {
    let limit = params.limit.clamp(1, 200);
    let offset = params.offset.max(0);
    let search = params.search.filter(|s| !s.trim().is_empty());
    let (items, total) =
        repo::crawler::list_active_jobs(&state.db, search.as_deref(), limit, offset).await?;
    Ok(Json(crate::api::pagination::PagedResponse::with_total(
        items, limit, offset, total,
    )))
}

async fn list_covers(
    State(state): State<AppState>,
    _admin: RequireAdmin,
    Query(params): Query<ListParams>,
) -> AppResult<Json<crate::api::pagination::PagedResponse<MissingCoverRow>>> {
    let limit = params.limit.clamp(1, 200);
    let offset = params.offset.max(0);
    let search = params.search.filter(|s| !s.trim().is_empty());
    let (items, total) =
        repo::crawler::list_missing_cover_mangas(&state.db, search.as_deref(), limit, offset)
            .await?;
    Ok(Json(crate::api::pagination::PagedResponse::with_total(
        items, limit, offset, total,
    )))
}
// ---------------------------------------------------------------------------
fn require_crawler(state: &AppState) -> Result<&std::sync::Arc<CrawlerControl>, AppError> {
    state.crawler.as_ref().ok_or_else(|| {
        AppError::ServiceUnavailable("crawler daemon is disabled".into())
    })
}


@@ -4,7 +4,9 @@
//! bot/API tokens cannot reach admin routes (see
//! `crate::auth::extractor::RequireAdmin`).
pub mod crawler;
pub mod mangas;
pub mod resync;
pub mod system;
pub mod users;
@@ -16,5 +18,7 @@ pub fn routes() -> Router<AppState> {
    Router::new()
        .merge(users::routes())
        .merge(mangas::routes())
        .merge(resync::routes())
        .merge(system::routes())
        .merge(crawler::routes())
}


@@ -0,0 +1,176 @@
//! Admin-triggered force resync of a single manga's metadata + cover,
//! or a single chapter's content.
//!
//! Both endpoints are admin-only (`RequireAdmin`, cookie-only) and run
//! synchronously with the request — the response carries the refreshed
//! resource so the UI can swap it in without a follow-up GET. The work
//! itself is delegated to [`ResyncService`] (set on AppState by
//! `app::build` when the crawler daemon is enabled); when the daemon
//! is disabled, both handlers return 503.
use axum::extract::{Path, State};
use axum::routing::post;
use axum::{Json, Router};
use serde::Serialize;
use serde_json::json;
use uuid::Uuid;
use crate::app::AppState;
use crate::auth::extractor::RequireAdmin;
use crate::crawler::resync::{ChapterResyncOutcome, ResyncError};
use crate::domain::manga::MangaDetail;
use crate::domain::Chapter;
use crate::error::{AppError, AppResult};
use crate::repo;
use crate::repo::crawler::UpsertStatus;
pub fn routes() -> Router<AppState> {
Router::new()
.route("/admin/mangas/:id/resync", post(resync_manga))
.route("/admin/chapters/:id/resync", post(resync_chapter))
}
#[derive(Debug, Serialize)]
pub struct MangaResyncResponse {
pub manga: MangaDetail,
/// `"new" | "updated" | "unchanged"` — mirrors [`UpsertStatus`].
pub metadata_status: &'static str,
pub cover_fetched: bool,
}
#[derive(Debug, Serialize)]
pub struct ChapterResyncResponse {
pub chapter: Chapter,
/// `"fetched" | "skipped"` — whether new pages landed or the
/// service short-circuited (e.g. chapter already had pages and the
/// session was lost so force was downgraded).
pub outcome: &'static str,
/// Page count when `outcome == "fetched"`. `None` for `skipped`.
pub pages: Option<usize>,
}
async fn resync_manga(
State(state): State<AppState>,
admin: RequireAdmin,
Path(manga_id): Path<Uuid>,
) -> AppResult<Json<MangaResyncResponse>> {
if !repo::manga::exists(&state.db, manga_id).await? {
return Err(AppError::NotFound);
}
let resync = state
.resync
.as_ref()
.ok_or_else(|| AppError::ServiceUnavailable(
"crawler daemon is disabled; force resync unavailable".into(),
))?;
let outcome = resync.resync_manga(manga_id).await.map_err(map_resync_err)?;
// Audit the action with the actor + the resync outcome so an
// operator-of-operators can answer "who refetched this manga, and
// did the cover land?" from the log alone.
repo::admin_audit::insert(
&state.db,
admin.0.id,
"manga_resync",
"manga",
Some(manga_id),
json!({
"metadata_status": status_str(outcome.metadata_status),
"cover_fetched": outcome.cover_fetched,
}),
)
.await?;
let manga = repo::manga::get_detail(&state.db, manga_id).await?;
Ok(Json(MangaResyncResponse {
manga,
metadata_status: status_str(outcome.metadata_status),
cover_fetched: outcome.cover_fetched,
}))
}
async fn resync_chapter(
State(state): State<AppState>,
admin: RequireAdmin,
Path(chapter_id): Path<Uuid>,
) -> AppResult<Json<ChapterResyncResponse>> {
let resync = state
.resync
.as_ref()
.ok_or_else(|| AppError::ServiceUnavailable(
"crawler daemon is disabled; force resync unavailable".into(),
))?;
// Look up the manga the chapter belongs to so we can return the
// refreshed chapter row in the response and 404 for unknown ids.
let manga_id: Option<Uuid> =
sqlx::query_scalar("SELECT manga_id FROM chapters WHERE id = $1")
.bind(chapter_id)
.fetch_optional(&state.db)
.await?;
let Some(manga_id) = manga_id else {
return Err(AppError::NotFound);
};
let outcome = resync
.resync_chapter(chapter_id)
.await
.map_err(map_resync_err)?;
let (outcome_str, pages) = match &outcome {
ChapterResyncOutcome::Fetched { pages, .. } => ("fetched", Some(*pages)),
ChapterResyncOutcome::Skipped { .. } => ("skipped", None),
};
repo::admin_audit::insert(
&state.db,
admin.0.id,
"chapter_resync",
"chapter",
Some(chapter_id),
json!({
"outcome": outcome_str,
"pages": pages,
}),
)
.await?;
let chapter = repo::chapter::find_by_id_in_manga(&state.db, manga_id, chapter_id)
.await?
.ok_or(AppError::NotFound)?;
Ok(Json(ChapterResyncResponse {
chapter,
outcome: outcome_str,
pages,
}))
}
fn status_str(s: UpsertStatus) -> &'static str {
match s {
UpsertStatus::New => "new",
UpsertStatus::Updated => "updated",
UpsertStatus::Unchanged => "unchanged",
}
}
/// Map [`ResyncError`] (and the anyhow envelopes wrapping it) onto the
/// right [`AppError`]. Anything else surfaces as a generic 500 via the
/// `Other` arm: the operator sees the underlying anyhow chain in the
/// server logs, while the client sees a clean envelope.
fn map_resync_err(err: anyhow::Error) -> AppError {
if let Some(rerr) = err.downcast_ref::<ResyncError>() {
match rerr {
ResyncError::NoMangaSource => AppError::ValidationFailed {
message: "manga has no live crawler source — cannot resync".into(),
details: json!({ "manga": "no_source" }),
},
ResyncError::NoChapterSource => AppError::ValidationFailed {
message: "chapter has no live crawler source — cannot resync".into(),
details: json!({ "chapter": "no_source" }),
},
}
} else {
AppError::Other(err)
}
}
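The downcast-then-fallback shape of `map_resync_err` can be tried without `anyhow`. The sketch below swaps `anyhow::Error` for a plain `dyn std::error::Error`, uses a stand-in `ResyncError` with only the two variants visible in this diff, and maps to bare status codes. The `422` is an assumption; the diff does not show which status `AppError::ValidationFailed` produces.

```rust
use std::error::Error;
use std::fmt;

/// Stand-in for the crate's `ResyncError` (only the variants shown in the diff).
#[derive(Debug)]
enum ResyncError {
    NoMangaSource,
    NoChapterSource,
}

impl fmt::Display for ResyncError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ResyncError::NoMangaSource => write!(f, "manga has no live crawler source"),
            ResyncError::NoChapterSource => write!(f, "chapter has no live crawler source"),
        }
    }
}

impl Error for ResyncError {}

/// Hypothetical mapping to an HTTP status. Known domain errors become a
/// client error (422 is assumed here); anything else falls through to 500.
fn status_for(err: &(dyn Error + 'static)) -> u16 {
    match err.downcast_ref::<ResyncError>() {
        Some(ResyncError::NoMangaSource) | Some(ResyncError::NoChapterSource) => 422,
        None => 500,
    }
}
```

The point of the pattern is that only errors the service deliberately raised become client-visible validation failures; unexpected errors keep their full chain for the logs.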


@@ -1,5 +1,5 @@
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use anyhow::Context;
use async_trait::async_trait;
@@ -24,6 +24,7 @@ use crate::crawler::daemon::{self, ChapterDispatcher, DaemonConfig, MetadataPass
use crate::crawler::jobs::JobPayload;
use crate::crawler::pipeline::{self, MetadataStats};
use crate::crawler::rate_limit::HostRateLimiters;
use crate::crawler::resync::{RealResyncService, ResyncService};
use crate::crawler::safety::DownloadAllowlist;
use crate::crawler::session;
use crate::repo;
@@ -39,6 +40,30 @@ pub struct AppState {
/// One instance per AppState so tests stay isolated across the
/// same process.
pub auth_limiter: Arc<AuthRateLimiter>,
/// Admin-triggered force resync. `None` when the crawler daemon
/// is disabled (`CRAWLER_DAEMON=false`); admin handlers gate on
/// `.is_some()` and return 503 otherwise. Set by [`build`] from the
/// same wiring that builds the daemon's chapter dispatcher, so a
/// force resync uses the daemon's BrowserManager + rate limiters.
pub resync: Option<Arc<dyn ResyncService>>,
/// Crawler observability + control handle (live status, coordinated
/// browser restart, runtime session, manual run). `None` when the
/// daemon is disabled; admin handlers gate on `.is_some()` → 503.
pub crawler: Option<Arc<CrawlerControl>>,
}
/// Shared handle the admin crawler endpoints use to observe and control
/// the running daemon. Bundled so the handlers take one optional field on
/// `AppState` rather than many.
pub struct CrawlerControl {
pub browser_manager: Arc<BrowserManager>,
pub session: Arc<crate::crawler::session_control::SessionController>,
pub status: crate::crawler::status::StatusHandle,
/// Used by the "run metadata pass now" endpoint; `None` when no
/// `CRAWLER_START_URL` is configured (cron disabled).
pub metadata_pass: Option<Arc<dyn MetadataPass>>,
/// Drain budget for a manually-triggered coordinated browser restart.
pub drain_deadline: std::time::Duration,
}
/// Bundle returned by [`build`]. The router is what `axum::serve` consumes;
@@ -73,11 +98,12 @@ pub async fn build(config: Config) -> anyhow::Result<AppHandle> {
let storage: Arc<dyn Storage> = Arc::new(LocalStorage::new(config.storage_dir.clone()));
let daemon = if config.crawler.daemon_enabled {
Some(spawn_crawler_daemon(db.clone(), Arc::clone(&storage), &config.crawler).await?)
let (daemon, resync, crawler) = if config.crawler.daemon_enabled {
let spawned = spawn_crawler_daemon(db.clone(), Arc::clone(&storage), &config.crawler).await?;
(Some(spawned.handle), Some(spawned.resync), Some(spawned.crawler))
} else {
tracing::info!("crawler daemon disabled (CRAWLER_DAEMON=false)");
None
(None, None, None)
};
let auth_limiter = Arc::new(AuthRateLimiter::new(config.auth.rate_limit));
@@ -87,21 +113,39 @@ pub async fn build(config: Config) -> anyhow::Result<AppHandle> {
auth: config.auth.clone(),
upload: config.upload.clone(),
auth_limiter,
resync,
crawler,
};
let router = router(state).layer(cors_layer(&config.cors_allowed_origins));
Ok(AppHandle { router, daemon })
}
/// Bundle returned by [`spawn_crawler_daemon`]. The handle owns the
/// daemon's tasks; `resync` is the operator-trigger service shared with
/// `AppState` so admin endpoints can call into the same browser /
/// rate-limit machinery; `crawler` is the observability + control handle
/// that the admin crawler endpoints read from `AppState`.
struct SpawnedDaemon {
handle: daemon::DaemonHandle,
resync: Arc<dyn ResyncService>,
crawler: Arc<CrawlerControl>,
}
async fn spawn_crawler_daemon(
db: PgPool,
storage: Arc<dyn Storage>,
cfg: &CrawlerConfig,
) -> anyhow::Result<daemon::DaemonHandle> {
// Reqwest client with cookie jar pre-seeded so CDN image fetches
// include PHPSESSID. Same shape as bin/crawler.rs main().
) -> anyhow::Result<SpawnedDaemon> {
// Reqwest client with a shared cookie jar so CDN image fetches include
// PHPSESSID. The same `Arc<Jar>` is held by the SessionController, so a
// runtime session refresh rewrites it in place. Initial value: a
// persisted runtime session (survives restart) takes precedence over
// CRAWLER_PHPSESSID env.
let cookie_jar = Arc::new(reqwest::cookie::Jar::default());
let initial_sid = crate::crawler::session_control::SessionController::load_persisted(&db)
.await
.or_else(|| cfg.phpsessid.clone());
if let (Some(sid), Some(domain), Some(start_url)) =
(&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url)
(&initial_sid, &cfg.cookie_domain, &cfg.start_url)
{
let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/");
let seed_url = reqwest::Url::parse(start_url)
@@ -111,7 +155,7 @@ async fn spawn_crawler_daemon(
let mut http_builder = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.no_proxy()
.cookie_provider(cookie_jar);
.cookie_provider(Arc::clone(&cookie_jar));
if let Some(ua) = &cfg.user_agent {
http_builder = http_builder.user_agent(ua);
}
@@ -139,6 +183,23 @@ async fn spawn_crawler_daemon(
}
let tor_recircuit_max = cfg.tor_recircuit_max_attempts;
// Session controller + sticky session-expired flag. Created before the
// browser so the on_launch hook can read the *current* session value
// (rather than a value captured at startup), and so a runtime refresh
// updates the cookie everywhere.
let session_expired = Arc::new(AtomicBool::new(false));
let session_controller = crate::crawler::session_control::SessionController::new(
initial_sid,
Arc::clone(&cookie_jar),
cfg.cookie_domain.clone(),
cfg.start_url.clone(),
db.clone(),
Arc::clone(&session_expired),
);
// Live status surface, sized to the worker count.
let status = crate::crawler::status::StatusHandle::new(cfg.chapter_workers);
// Browser manager. on_launch re-injects PHPSESSID on every fresh
// chromium spawn so an idle teardown followed by re-launch stays
// authenticated without operator action.
@@ -147,18 +208,25 @@ async fn spawn_crawler_daemon(
let chromium_proxy = crate::crawler::url_utils::chromium_proxy_arg(proxy);
launch_opts.extra_args.push(format!("--proxy-server={chromium_proxy}"));
}
let on_launch = match (&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url) {
(Some(sid), Some(domain), Some(start_url)) => {
let sid = sid.clone();
let on_launch = match (&cfg.cookie_domain, &cfg.start_url) {
(Some(domain), Some(start_url)) => {
let domain = domain.clone();
let start_url = start_url.clone();
let tor_for_launch = tor.as_ref().map(Arc::clone);
let sc = Arc::clone(&session_controller);
let on_launch: browser_manager::OnLaunch = Arc::new(move |browser| {
let sid = sid.clone();
let domain = domain.clone();
let start_url = start_url.clone();
let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone);
let sc = Arc::clone(&sc);
Box::pin(async move {
// Read the *current* session each launch so a runtime
// refresh is picked up on the next (re)launch. No session
// configured → run unauthenticated (metadata needs no auth).
let Some(sid) = sc.current().await else {
tracing::info!("on_launch: no session set — skipping inject + probe");
return Ok(());
};
session::inject_phpsessid(&browser, &sid, &domain)
.await
.context("on_launch: inject_phpsessid")?;
@@ -179,8 +247,6 @@ async fn spawn_crawler_daemon(
};
let browser_manager = BrowserManager::new(launch_opts, cfg.idle_timeout, on_launch);
let session_expired = Arc::new(AtomicBool::new(false));
let metadata_pass: Option<Arc<dyn MetadataPass>> = cfg.start_url.as_ref().map(|url| {
let m: Arc<dyn MetadataPass> = Arc::new(RealMetadataPass {
browser_manager: Arc::clone(&browser_manager),
@@ -192,12 +258,29 @@ async fn spawn_crawler_daemon(
manga_limit: cfg.manga_limit,
download_allowlist: cfg.download_allowlist.clone(),
max_image_bytes: cfg.max_image_bytes,
metadata_max_consecutive_failures: cfg.metadata_max_consecutive_failures,
status: status.clone(),
tor: tor.as_ref().map(Arc::clone),
});
m
});
let dispatcher: Arc<dyn ChapterDispatcher> = Arc::new(RealChapterDispatcher {
browser_manager: Arc::clone(&browser_manager),
db: db.clone(),
storage: Arc::clone(&storage),
http: http.clone(),
rate: Arc::clone(&rate),
download_allowlist: cfg.download_allowlist.clone(),
max_image_bytes: cfg.max_image_bytes,
transient_failures: Arc::new(AtomicU32::new(0)),
restart_threshold: cfg.browser_restart_threshold,
drain_deadline: cfg.job_timeout,
status: status.clone(),
tor: tor.as_ref().map(Arc::clone),
});
let resync: Arc<dyn ResyncService> = Arc::new(RealResyncService {
browser_manager: Arc::clone(&browser_manager),
db: db.clone(),
storage: Arc::clone(&storage),
@@ -231,18 +314,32 @@ async fn spawn_crawler_daemon(
db,
cancel,
DaemonConfig {
metadata_pass,
metadata_pass: metadata_pass.clone(),
dispatcher,
chapter_workers: cfg.chapter_workers,
daily_at: cfg.daily_at,
tz: cfg.tz,
retention_days: cfg.retention_days,
session_expired,
status: status.clone(),
job_timeout: cfg.job_timeout,
extra_tasks: vec![reaper_task, shutdown_task],
},
);
Ok(daemon_handle)
let crawler = Arc::new(CrawlerControl {
browser_manager: Arc::clone(&browser_manager),
session: session_controller,
status,
metadata_pass,
drain_deadline: cfg.job_timeout,
});
Ok(SpawnedDaemon {
handle: daemon_handle,
resync,
crawler,
})
}
// Real impls of the daemon traits, owning the browser manager + I/O. Kept
@@ -260,6 +357,8 @@ struct RealMetadataPass {
manga_limit: usize,
download_allowlist: DownloadAllowlist,
max_image_bytes: usize,
metadata_max_consecutive_failures: u32,
status: crate::crawler::status::StatusHandle,
tor: Option<Arc<crate::crawler::tor::TorController>>,
}
@@ -277,6 +376,8 @@ impl MetadataPass for RealMetadataPass {
false,
&self.download_allowlist,
self.max_image_bytes,
self.metadata_max_consecutive_failures,
Some(&self.status),
self.tor.as_deref(),
)
.await;
@@ -285,6 +386,38 @@ impl MetadataPass for RealMetadataPass {
self.browser_manager.invalidate().await;
}
}
// Cover backfill follows the metadata pass even when the pass
// errored — the early-stop walk can complete its work and bail
// late, and a transient browser failure shouldn't cancel the
// residual cover backlog. The backfill has its own per-call cap
// so a runaway error stream can't monopolise the tick. It sets the
// CoverBackfill{index,total} phase + current_cover per entry.
match pipeline::backfill_missing_covers(
&self.browser_manager,
&self.db,
self.storage.as_ref(),
&self.http,
&self.rate,
pipeline::COVER_BACKFILL_DEFAULT_MAX,
&self.download_allowlist,
self.max_image_bytes,
Some(&self.status),
self.tor.as_deref(),
)
.await
{
Ok(stats) => {
if stats.considered > 0 {
tracing::info!(?stats, "cover backfill complete");
}
}
Err(e) => {
tracing::warn!(error = ?e, "cover backfill failed");
if crate::crawler::nav::anyhow_looks_browser_dead(&e) {
self.browser_manager.invalidate().await;
}
}
}
result
}
}
@@ -297,6 +430,16 @@ struct RealChapterDispatcher {
rate: Arc<HostRateLimiters>,
download_allowlist: DownloadAllowlist,
max_image_bytes: usize,
/// Consecutive transient chapter failures; resets on any success and
/// after a browser invalidate or coordinated restart. Drives the
/// automatic coordinated browser restart.
transient_failures: Arc<std::sync::atomic::AtomicU32>,
/// Consecutive-failure count that triggers an auto restart.
restart_threshold: u32,
/// How long a coordinated restart waits for in-flight leases to drain.
drain_deadline: std::time::Duration,
/// Live status surface — the dispatcher registers each chapter it
/// crawls (with a realtime page count) here.
status: crate::crawler::status::StatusHandle,
tor: Option<Arc<crate::crawler::tor::TorController>>,
}
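The RAII registration this field supports (register on entry, remove on every exit path) can be sketched with std only. `Status`, `ChapterGuard` and the `u32` chapter ids below are simplified stand-ins, not the crate's `StatusHandle` / `ActiveChapter` types; the real handle also pushes updates to subscribers, which is omitted here.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, MutexGuard};

/// Hypothetical, simplified status surface: chapter id -> pages stored so far.
#[derive(Clone, Default)]
struct Status {
    active: Arc<Mutex<HashMap<u32, usize>>>,
}

/// Removes its chapter from the active set when dropped.
struct ChapterGuard {
    status: Status,
    chapter_id: u32,
}

impl Status {
    fn lock(&self) -> MutexGuard<'_, HashMap<u32, usize>> {
        // Recover from poisoning so one panicking worker cannot wedge the map.
        self.active.lock().unwrap_or_else(|p| p.into_inner())
    }

    /// Register a chapter as in flight and hand back the guard that owns it.
    fn begin_chapter(&self, chapter_id: u32) -> ChapterGuard {
        self.lock().insert(chapter_id, 0);
        ChapterGuard { status: self.clone(), chapter_id }
    }

    /// Update the live page counter; a no-op if the chapter is not registered.
    fn set_pages(&self, chapter_id: u32, pages_done: usize) {
        if let Some(p) = self.lock().get_mut(&chapter_id) {
            *p = pages_done;
        }
    }

    fn pages(&self, chapter_id: u32) -> Option<usize> {
        self.lock().get(&chapter_id).copied()
    }

    fn active_count(&self) -> usize {
        self.lock().len()
    }
}

impl Drop for ChapterGuard {
    fn drop(&mut self) {
        // Runs on normal return, early return, panic unwind, or future drop.
        self.status.lock().remove(&self.chapter_id);
    }
}
```

A synchronous mutex is what makes this work from `Drop`, which cannot await; the critical sections are a single map operation, so holding it briefly inside async code is a reasonable trade.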
@@ -312,10 +455,21 @@ impl ChapterDispatcher for RealChapterDispatcher {
let row = repo::chapter::dispatch_target(&self.db, chapter_id)
.await
.context("look up chapter for dispatch")?;
let Some((manga_id, source_url)) = row else {
let Some((manga_id, source_url, manga_title, chapter_number)) = row else {
// Chapter (or its source row) is gone — ack done.
return Ok(SyncOutcome::Skipped);
};
// Register the chapter as crawling now (live status). The
// guard removes it on every exit path — success, panic, or
// the worker's outer-timeout drop.
let _active = self.status.begin_chapter(crate::crawler::status::ActiveChapter {
manga_id,
manga_title,
chapter_id,
chapter_number,
pages_done: 0,
pages_total: None,
});
let lease = self.browser_manager.acquire().await?;
let result = content::sync_chapter_content(
&lease,
@@ -330,14 +484,37 @@ impl ChapterDispatcher for RealChapterDispatcher {
&self.download_allowlist,
self.max_image_bytes,
self.tor.as_deref(),
Some(&self.status),
)
.await;
drop(lease);
match result {
Ok(outcome) => Ok(outcome),
Ok(outcome) => {
// Any successful dispatch (including a clean Skipped)
// means the browser is healthy — reset the streak.
self.transient_failures.store(0, Ordering::Release);
Ok(outcome)
}
Err(e) => {
let streak = self.transient_failures.fetch_add(1, Ordering::AcqRel) + 1;
if crate::crawler::nav::anyhow_looks_browser_dead(&e) {
// Hard browser-dead: lazy invalidate (next acquire
// relaunches). Reset the streak — we're recovering.
self.browser_manager.invalidate().await;
self.transient_failures.store(0, Ordering::Release);
} else if self.restart_threshold > 0 && streak >= self.restart_threshold {
// Persistent transients that TOR recircuit couldn't
// fix — proactively restart Chromium.
tracing::warn!(
streak,
threshold = self.restart_threshold,
"auto browser restart: consecutive transient chapter failures"
);
let _ = self
.browser_manager
.coordinated_restart(self.drain_deadline)
.await;
self.transient_failures.store(0, Ordering::Release);
}
Err(e)
}
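The streak logic in the error arm above reduces to a small state machine, sketched here with std only. `FailureStreak` and `Action` are hypothetical names for illustration; the real dispatcher calls `invalidate()` and `coordinated_restart()` directly instead of returning an action.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// What the caller should do after a failed dispatch (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum Action {
    /// Keep going; the streak is below the threshold.
    Continue,
    /// The browser looks dead: drop it and let the next acquire relaunch.
    Invalidate,
    /// Too many consecutive transient failures: restart proactively.
    Restart,
}

struct FailureStreak {
    count: AtomicU32,
    /// 0 disables the automatic restart.
    threshold: u32,
}

impl FailureStreak {
    fn new(threshold: u32) -> Self {
        FailureStreak { count: AtomicU32::new(0), threshold }
    }

    /// Any success means the browser is healthy, so the streak resets.
    fn on_success(&self) {
        self.count.store(0, Ordering::Release);
    }

    fn on_failure(&self, browser_dead: bool) -> Action {
        // fetch_add returns the previous value, hence the +1.
        let streak = self.count.fetch_add(1, Ordering::AcqRel) + 1;
        if browser_dead {
            self.count.store(0, Ordering::Release);
            Action::Invalidate
        } else if self.threshold > 0 && streak >= self.threshold {
            self.count.store(0, Ordering::Release);
            Action::Restart
        } else {
            Action::Continue
        }
    }
}
```

Resetting the counter after either recovery action prevents a second restart from firing on the very next failure while the first recovery is still taking effect.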


@@ -303,6 +303,11 @@ async fn run(
skip_chapters,
allowlist.as_ref(),
max_image_bytes,
// Circuit-breaker disabled for the operator-driven CLI: a manual
// sweep should push through transient failures, not self-abort.
0,
// No live status surface for the one-shot CLI.
None,
tor.as_deref(),
)
.await?;
@@ -412,6 +417,8 @@ async fn sync_bookmarked_chapter_content(
allowlist.as_ref(),
max_image_bytes,
tor.as_deref(),
// CLI one-shot — no live status surface.
None,
)
.await;
drop(lease);


@@ -132,6 +132,19 @@ pub struct CrawlerConfig {
/// (full sweep up to the source's own bound). Sourced from
/// `CRAWLER_LIMIT`, mirroring the CLI binary.
pub manga_limit: usize,
/// Hard upper bound on a single chapter-content job dispatch. A job
/// exceeding this is acked as failed (exponential backoff) instead of
/// wedging a worker. Defaults to 600s; values below 1s read from the
/// environment are raised to 1s. `CRAWLER_JOB_TIMEOUT_SECS`.
pub job_timeout: Duration,
/// Consecutive `fetch_manga` failures that abort a metadata pass
/// (circuit-breaker for a source outage). The pass does NOT mark a
/// clean exit, so the next tick does a recovery sweep. `0` disables
/// the breaker (the CLI passes `0` for this reason). Defaults to
/// 10. `CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES`.
pub metadata_max_consecutive_failures: u32,
/// Consecutive transient chapter failures (after TOR recircuit is
/// exhausted) that trigger an automatic coordinated browser restart.
/// Defaults to 3; values below 1 read from the environment are raised
/// to 1. `CRAWLER_BROWSER_RESTART_THRESHOLD`.
pub browser_restart_threshold: u32,
}
impl Default for CrawlerConfig {
@@ -159,6 +172,9 @@ impl Default for CrawlerConfig {
download_allowlist: DownloadAllowlist::new(),
max_image_bytes: DEFAULT_MAX_IMAGE_BYTES,
manga_limit: 0,
job_timeout: Duration::from_secs(600),
metadata_max_consecutive_failures: 10,
browser_restart_threshold: 3,
}
}
}
@@ -283,6 +299,13 @@ impl CrawlerConfig {
download_allowlist,
max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES),
manga_limit: env_usize("CRAWLER_LIMIT", 0),
job_timeout: Duration::from_secs(env_u64("CRAWLER_JOB_TIMEOUT_SECS", 600).max(1)),
metadata_max_consecutive_failures: env_u64(
"CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES",
10,
) as u32,
browser_restart_threshold: env_u64("CRAWLER_BROWSER_RESTART_THRESHOLD", 3).max(1)
as u32,
})
}
}
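The parse, default, floor and narrow steps applied to these knobs can be sketched as pure functions. Both helpers below are hypothetical; the diff does not show `env_u64`, so falling back to the default on unparseable input is an assumption. Unlike the `as u32` cast in the diff, which wraps on oversized values, this sketch saturates at `u32::MAX`.

```rust
/// Hypothetical pure variant of an `env_u64`-style helper: parse `raw`,
/// falling back to `default` when the variable is unset or not a number.
fn parse_u64_or(raw: Option<&str>, default: u64) -> u64 {
    raw.and_then(|s| s.trim().parse::<u64>().ok())
        .unwrap_or(default)
}

/// Apply a lower bound, then narrow to `u32` without wrapping.
fn threshold_from(raw: Option<&str>, default: u64, floor: u64) -> u32 {
    let value = parse_u64_or(raw, default).max(floor);
    // `try_from` fails above u32::MAX; saturate instead of truncating bits.
    u32::try_from(value).unwrap_or(u32::MAX)
}
```

Taking the raw string as an argument, instead of reading the process environment inside the helper, lets tests run in parallel without the `ENV_GUARD` mutex the existing tests need.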
@@ -384,6 +407,33 @@ mod tests {
assert_eq!(cfg.manga_limit, 0);
}
#[test]
fn reliability_knobs_default_when_unset() {
let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner());
std::env::remove_var("CRAWLER_JOB_TIMEOUT_SECS");
std::env::remove_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES");
std::env::remove_var("CRAWLER_BROWSER_RESTART_THRESHOLD");
let cfg = CrawlerConfig::from_env().expect("from_env");
assert_eq!(cfg.job_timeout, Duration::from_secs(600));
assert_eq!(cfg.metadata_max_consecutive_failures, 10);
assert_eq!(cfg.browser_restart_threshold, 3);
}
#[test]
fn reliability_knobs_parse_from_env() {
let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner());
std::env::set_var("CRAWLER_JOB_TIMEOUT_SECS", "120");
std::env::set_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES", "5");
std::env::set_var("CRAWLER_BROWSER_RESTART_THRESHOLD", "7");
let cfg = CrawlerConfig::from_env().expect("from_env");
std::env::remove_var("CRAWLER_JOB_TIMEOUT_SECS");
std::env::remove_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES");
std::env::remove_var("CRAWLER_BROWSER_RESTART_THRESHOLD");
assert_eq!(cfg.job_timeout, Duration::from_secs(120));
assert_eq!(cfg.metadata_max_consecutive_failures, 5);
assert_eq!(cfg.browser_restart_threshold, 7);
}
#[test]
fn private_mode_env_parses_true() {
let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner());


@@ -13,7 +13,7 @@
//! until [`BrowserManager::shutdown`].
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -71,12 +71,42 @@ impl ActiveTracker {
}
}
/// Lifecycle gate for a coordinated browser restart. `acquire()` parks
/// while not [`RestartPhase::Healthy`] so no new navigation starts
/// mid-restart; long-lived lease holders (the metadata pass) cooperate by
/// checking [`BrowserManager::is_restart_pending`] at safe boundaries.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum RestartPhase {
/// Normal operation — acquires proceed.
Healthy,
/// Restart requested; new acquires park, waiting for in-flight leases
/// to drain.
Draining,
/// Chromium is being closed + relaunched.
Restarting,
}
const PHASE_HEALTHY: u8 = 0;
const PHASE_DRAINING: u8 = 1;
const PHASE_RESTARTING: u8 = 2;
pub struct BrowserManager {
inner: Mutex<Inner>,
active: Arc<ActiveTracker>,
launch_opts: LaunchOptions,
idle_timeout: Duration,
on_launch: OnLaunch,
/// Coarse lifecycle phase (one of the `PHASE_*` constants).
phase: AtomicU8,
/// Woken when the phase returns to `Healthy` so parked acquires resume.
resume: Notify,
/// Serialises coordinated restarts so concurrent requests collapse into
/// a single relaunch.
restart_lock: Mutex<()>,
/// Result of the most recent relaunch, so a caller that coalesced into
/// an in-progress restart reports that restart's real outcome instead
/// of a blind success.
last_restart_ok: AtomicBool,
}
struct Inner {
@@ -99,28 +129,72 @@ impl BrowserManager {
launch_opts,
idle_timeout,
on_launch,
phase: AtomicU8::new(PHASE_HEALTHY),
resume: Notify::new(),
restart_lock: Mutex::new(()),
last_restart_ok: AtomicBool::new(true),
})
}
/// Current restart phase.
pub fn phase(&self) -> RestartPhase {
match self.phase.load(Ordering::Acquire) {
PHASE_DRAINING => RestartPhase::Draining,
PHASE_RESTARTING => RestartPhase::Restarting,
_ => RestartPhase::Healthy,
}
}
fn set_phase(&self, phase: RestartPhase) {
let v = match phase {
RestartPhase::Healthy => PHASE_HEALTHY,
RestartPhase::Draining => PHASE_DRAINING,
RestartPhase::Restarting => PHASE_RESTARTING,
};
self.phase.store(v, Ordering::Release);
}
/// Whether a coordinated restart is in progress. Long-lived lease
/// holders poll this at safe boundaries and yield their lease so the
/// drain can complete promptly.
pub fn is_restart_pending(&self) -> bool {
self.phase() != RestartPhase::Healthy
}
/// Launch Chromium into `guard`, running the `on_launch` hook before
/// publishing the handle so a probe failure doesn't leave a
/// half-initialised browser behind.
async fn launch_into(&self, guard: &mut Inner) -> anyhow::Result<()> {
let handle = browser::launch(self.launch_opts.clone())
.await
.context("BrowserManager: launch chromium")?;
let shared = handle.shared();
if let Err(e) = (self.on_launch)(Arc::clone(&shared)).await {
let _ = handle.close().await;
return Err(e.context("BrowserManager: on_launch hook failed"));
}
guard.handle = Some(handle);
guard.shared = Some(shared);
Ok(())
}
/// Acquire a shared browser lease. The first acquire after a teardown
/// launches a fresh Chromium (and runs `on_launch`); subsequent acquires
/// while a process is alive just bump the counter and clone the `Arc`.
pub async fn acquire(&self) -> anyhow::Result<BrowserLease> {
// Park while a coordinated restart is draining/relaunching so no new
// navigation starts against a browser that's about to be torn down.
// The short sleep fallback guarantees liveness even if a `resume`
// notification is missed (classic Notify lost-wakeup).
while self.phase() != RestartPhase::Healthy {
tokio::select! {
_ = self.resume.notified() => {}
_ = tokio::time::sleep(Duration::from_millis(100)) => {}
}
}
let mut guard = self.inner.lock().await;
if guard.handle.is_none() {
let handle = browser::launch(self.launch_opts.clone())
.await
.context("BrowserManager: launch chromium")?;
let shared = handle.shared();
// Run the on-launch hook before publishing the handle so a session
// probe failure doesn't leave a half-initialized browser behind.
if let Err(e) = (self.on_launch)(Arc::clone(&shared)).await {
// Close the just-launched browser since we won't be using it.
let _ = handle.close().await;
return Err(e.context("BrowserManager: on_launch hook failed"));
}
guard.handle = Some(handle);
guard.shared = Some(shared);
self.launch_into(&mut guard).await?;
}
let browser = guard
.shared
@@ -134,6 +208,51 @@ impl BrowserManager {
})
}
/// Coordinated restart: block new acquires, wait for in-flight leases
/// to drain (up to `drain_deadline`, then force), close + relaunch
/// Chromium (re-running `on_launch` → re-inject session + probe), then
/// resume parked acquirers. Concurrent calls collapse into one
/// relaunch. The phase is always returned to `Healthy` — even if the
/// relaunch errors — so a failed restart never permanently wedges
/// acquisition (the next acquire retries the launch lazily).
pub async fn coordinated_restart(&self, drain_deadline: Duration) -> anyhow::Result<()> {
// Dedup: if a restart is already running, wait for it and report
// that restart's real outcome (not a blind success).
let _restart_guard = match self.restart_lock.try_lock() {
Ok(g) => g,
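Err(_) => {
// Block until the in-flight restart releases the lock. The guard
// is dropped immediately: we only need to wait, not to hold it.
let _ = self.restart_lock.lock().await;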
return if self.last_restart_ok.load(Ordering::Acquire) {
Ok(())
} else {
Err(anyhow::anyhow!("a concurrent coordinated browser restart failed"))
};
}
};
self.set_phase(RestartPhase::Draining);
await_drain(&self.active, drain_deadline).await;
self.set_phase(RestartPhase::Restarting);
let relaunch = {
let mut guard = self.inner.lock().await;
guard.shared = None;
if let Some(handle) = guard.handle.take() {
let _ = handle.close().await;
}
self.launch_into(&mut guard).await
};
self.last_restart_ok.store(relaunch.is_ok(), Ordering::Release);
self.set_phase(RestartPhase::Healthy);
self.resume.notify_waiters();
match &relaunch {
Ok(()) => tracing::info!("BrowserManager: coordinated restart complete"),
Err(e) => tracing::error!(error = ?e, "BrowserManager: coordinated restart relaunch failed"),
}
relaunch.context("coordinated_restart: relaunch")
}
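The dedup step at the top of `coordinated_restart` (try the lock; on contention wait and report the other caller's result) can be exercised with std threads. This sketch replaces tokio's async `Mutex` with `std::sync::Mutex`, and `Restarter`, `relaunches` and `demo_coalesce` are hypothetical names. Because std mutexes can be poisoned, the poisoned case is handled explicitly here; the tokio version has no equivalent.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{mpsc, Arc, Mutex, TryLockError};
use std::thread;
use std::time::Duration;

struct Restarter {
    lock: Mutex<()>,
    last_ok: AtomicBool,
    relaunches: AtomicU32,
}

impl Restarter {
    fn new() -> Self {
        Restarter {
            lock: Mutex::new(()),
            last_ok: AtomicBool::new(true),
            relaunches: AtomicU32::new(0),
        }
    }

    /// Runs `relaunch` unless a restart is already in flight; in that case
    /// waits for it and reports that restart's outcome instead.
    fn restart(&self, relaunch: impl FnOnce() -> bool) -> bool {
        let _guard = match self.lock.try_lock() {
            Ok(g) => g,
            // An earlier relaunch panicked; the lock is ours, so run a fresh one.
            Err(TryLockError::Poisoned(p)) => p.into_inner(),
            Err(TryLockError::WouldBlock) => {
                let _wait = self.lock.lock(); // blocks until the other restart ends
                return self.last_ok.load(Ordering::Acquire);
            }
        };
        self.relaunches.fetch_add(1, Ordering::AcqRel);
        let ok = relaunch();
        self.last_ok.store(ok, Ordering::Release);
        ok
    }
}

/// Two callers, one relaunch: returns (first result, second result, relaunch count).
fn demo_coalesce() -> (bool, bool, u32) {
    let r = Arc::new(Restarter::new());
    let (started_tx, started_rx) = mpsc::channel();
    let first = {
        let r = Arc::clone(&r);
        thread::spawn(move || {
            r.restart(|| {
                started_tx.send(()).unwrap(); // signal: lock is held
                thread::sleep(Duration::from_millis(100));
                false // simulated relaunch failure
            })
        })
    };
    started_rx.recv().unwrap();
    let second = r.restart(|| true); // coalesces; this closure never runs
    (first.join().unwrap(), second, r.relaunches.load(Ordering::Acquire))
}
```

The second caller sees the first restart's failure even though its own closure would have succeeded, which is the "real outcome, not a blind success" behaviour that `last_restart_ok` provides.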
/// Forcefully close the cached browser regardless of active count.
/// Used on daemon shutdown. After this returns the next acquire will
/// re-launch from scratch.
@@ -176,6 +295,29 @@ impl BrowserManager {
}
}
/// Wait for the active-lease count to reach zero, up to `deadline`. Wakes
/// on the tracker's idle signal and re-checks on a short poll so a missed
/// signal can't strand the drain. Returns when drained or when the
/// deadline elapses (the caller then force-restarts). Extracted as a free
/// fn so the timing logic is unit-testable without launching Chromium.
async fn await_drain(active: &Arc<ActiveTracker>, deadline: Duration) {
let start = tokio::time::Instant::now();
while active.current() > 0 {
let Some(remaining) = deadline.checked_sub(start.elapsed()) else {
tracing::warn!(
active = active.current(),
"coordinated_restart: drain deadline exceeded — forcing relaunch"
);
return;
};
let nap = remaining.min(Duration::from_millis(250));
tokio::select! {
_ = active.idle_signal().notified() => {}
_ = tokio::time::sleep(nap) => {}
}
}
}
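The deadline arithmetic inside `await_drain` is easiest to check as a pure function. `next_nap` and `POLL` are hypothetical names; the 250 ms poll interval is the one used above.

```rust
use std::time::Duration;

/// Upper bound on a single sleep, so a missed idle signal is noticed quickly.
const POLL: Duration = Duration::from_millis(250);

/// `None` means the deadline has passed and the caller should force the
/// restart; `Some(nap)` is how long to sleep before re-checking.
fn next_nap(deadline: Duration, elapsed: Duration) -> Option<Duration> {
    // `checked_sub` yields None instead of panicking when elapsed > deadline.
    deadline
        .checked_sub(elapsed)
        .map(|remaining| remaining.min(POLL))
}
```

Capping the nap at the remaining time means the loop wakes at the deadline itself instead of overshooting by up to one poll interval.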
/// Background reaper. Returns immediately when `idle_timeout == 0`.
/// Otherwise spawns a task that:
/// 1. Waits on `idle_signal` (woken when active hits zero).
@@ -270,6 +412,63 @@ mod tests {
mgr.invalidate().await;
}
#[tokio::test]
async fn await_drain_returns_immediately_when_already_idle() {
let active = ActiveTracker::new();
let start = tokio::time::Instant::now();
await_drain(&active, Duration::from_secs(5)).await;
assert!(start.elapsed() < Duration::from_millis(200), "no wait when idle");
}
#[tokio::test]
async fn await_drain_completes_when_lease_released() {
let active = ActiveTracker::new();
active.acquire();
let bg = {
let a = Arc::clone(&active);
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
a.release();
})
};
// Generous deadline; should return shortly after the release, not
// at the deadline.
let start = tokio::time::Instant::now();
await_drain(&active, Duration::from_secs(5)).await;
assert!(start.elapsed() < Duration::from_secs(2), "drained on release");
assert_eq!(active.current(), 0);
bg.await.unwrap();
}
#[tokio::test]
async fn await_drain_force_returns_after_deadline_when_stuck() {
let active = ActiveTracker::new();
active.acquire(); // never released
let start = tokio::time::Instant::now();
await_drain(&active, Duration::from_millis(300)).await;
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(250), "waited ~deadline: {elapsed:?}");
assert!(elapsed < Duration::from_secs(2), "but not forever: {elapsed:?}");
assert_eq!(active.current(), 1, "still held — caller force-restarts");
}
#[test]
fn phase_transitions_reflect_is_restart_pending() {
let mgr = BrowserManager::new(
crate::crawler::browser::LaunchOptions::default(),
Duration::ZERO,
noop_on_launch(),
);
assert_eq!(mgr.phase(), RestartPhase::Healthy);
assert!(!mgr.is_restart_pending());
mgr.set_phase(RestartPhase::Draining);
assert!(mgr.is_restart_pending());
mgr.set_phase(RestartPhase::Restarting);
assert!(mgr.is_restart_pending());
mgr.set_phase(RestartPhase::Healthy);
assert!(!mgr.is_restart_pending());
}
#[tokio::test]
async fn active_tracker_signals_idle_only_on_zero_transition() {
let tracker = ActiveTracker::new();


@@ -186,11 +186,12 @@ where
}
}
/// Fetch all images for one chapter and persist them atomically. On
/// any error after the first storage put, the DB transaction rolls
/// back so the chapter stays at `page_count = 0` and is retried on the
/// next run. Bytes already written to storage become orphans; a future
/// reaper sweeps them.
/// Fetch one chapter's images and persist them. Each image is streamed to
/// storage as it's fetched (peak memory ≈ one image, not the whole
/// chapter); the page rows + `page_count` are then written in one short
/// transaction. On any failure the chapter stays at `page_count = 0` (no
/// partial rows) and the blobs already written are deleted best-effort by
/// [`cleanup_orphans`], so a retry starts clean.
#[allow(clippy::too_many_arguments)]
pub async fn sync_chapter_content(
browser: &chromiumoxide::Browser,
@@ -205,6 +206,10 @@ pub async fn sync_chapter_content(
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
tor: Option<&crate::crawler::tor::TorController>,
// Optional live-status sink for the realtime page counter. The daemon
// dispatcher passes the shared handle (the chapter has already been
// registered via `begin_chapter`); the CLI / admin resync pass `None`.
progress: Option<&crate::crawler::status::StatusHandle>,
) -> anyhow::Result<SyncOutcome> {
// Skip if already fetched, unless caller explicitly forces.
if !force_refetch {
@@ -260,28 +265,93 @@ pub async fn sync_chapter_content(
// Resolve image URLs against the chapter URL (they may be relative).
let base = reqwest::Url::parse(source_url).context("parse chapter URL")?;
// Fetch every image bytes-first into memory before writing
// anything. Lets us bail the whole chapter cleanly if any image
// fails — DB stays at page_count=0, no partial rows persisted.
let mut fetched: Vec<(i32, Vec<u8>, &'static str)> = Vec::with_capacity(images.len());
// Stream each image straight to storage as it's fetched, capping peak
// memory at a single image rather than the whole chapter. Track the
// keys written so they can be rolled back if a later page (or the
// final DB commit) fails — preserving the all-or-nothing guarantee
// without holding a DB transaction open across the network puts
// (which matters once `Storage` is backed by S3).
let total = images.len();
// Publish the now-known page total so the dashboard shows "0/N".
if let Some(p) = progress {
p.set_chapter_pages(chapter_id, 0, Some(total));
}
let mut written_keys: Vec<String> = Vec::with_capacity(total);
let mut stored: Vec<StoredPage> = Vec::with_capacity(total);
for img in &images {
let url = base.join(&img.url).with_context(|| {
format!("join image URL {} onto {source_url}", img.url)
})?;
rate.wait_for(url.as_str()).await?;
let bytes = fetch_bytes_capped(
match download_and_store_page(
storage,
http,
url.as_str(),
Some(source_url),
rate,
&base,
source_url,
manga_id,
chapter_id,
img,
allowlist,
max_image_bytes,
)
.await?
.to_vec();
// Reject any non-image response: the only valid output of an
// image URL is an image. `infer` returns None on truncated
// bytes too, which also wants to be a failure not a silent
// `.bin` extension.
.await
{
Ok(page) => {
written_keys.push(page.storage_key.clone());
stored.push(page);
// Live page counter: push the climbing count to subscribers.
if let Some(p) = progress {
p.set_chapter_pages(chapter_id, stored.len(), Some(total));
}
}
Err(e) => {
cleanup_orphans(storage, &written_keys).await;
return Err(e);
}
}
}
// Short transaction: page rows + page_count only, no network I/O. On
// failure, roll back the stored bytes so the chapter stays at
// page_count=0 and is retried cleanly next run.
if let Err(e) = persist_pages(db, chapter_id, &stored).await {
cleanup_orphans(storage, &written_keys).await;
return Err(e);
}
Ok(SyncOutcome::Fetched { pages: stored.len() })
}
/// A page image that has been written to storage and is awaiting its DB
/// row. Carries everything `persist_pages` needs.
pub(crate) struct StoredPage {
page_number: i32,
storage_key: String,
content_type: String,
}
/// Download a single page image, validate it's really an image, and write
/// it to storage. Returns the storage key + content type. Does not touch
/// the DB — persistence is batched into one short transaction afterward.
#[allow(clippy::too_many_arguments)]
async fn download_and_store_page(
storage: &dyn Storage,
http: &reqwest::Client,
rate: &HostRateLimiters,
base: &reqwest::Url,
source_url: &str,
manga_id: Uuid,
chapter_id: Uuid,
img: &ChapterImage,
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
) -> anyhow::Result<StoredPage> {
let url = base
.join(&img.url)
.with_context(|| format!("join image URL {} onto {source_url}", img.url))?;
rate.wait_for(url.as_str()).await?;
let bytes = fetch_bytes_capped(http, url.as_str(), Some(source_url), allowlist, max_image_bytes)
.await?;
// Reject any non-image response: the only valid output of an image URL
// is an image. `infer` returns None on truncated bytes too, which must
// also be a failure rather than a silent `.bin` extension.
if !looks_like_image(&bytes) {
anyhow::bail!(
"image URL {url} returned non-image bytes \
@@ -292,24 +362,30 @@ pub async fn sync_chapter_content(
let ext = infer::get(&bytes)
.map(|k| k.extension())
.expect("looks_like_image asserted infer succeeded");
fetched.push((img.page_number, bytes, ext));
}
// Atomic write: storage puts + page row inserts + page_count
// update, all in one transaction. If anything fails, rollback +
// the chapter is retried next run. Storage orphans the bytes; a
// reaper sweeps them later.
let mut tx = db.begin().await.context("open chapter sync tx")?;
for (page_number, bytes, ext) in &fetched {
let key = format!(
"mangas/{manga_id}/chapters/{chapter_id}/pages/{:04}.{ext}",
page_number
img.page_number
);
storage
.put(&key, bytes)
.put(&key, &bytes)
.await
.with_context(|| format!("put {key}"))?;
// (chapter_id, page_number) is unique — re-runs idempotent.
Ok(StoredPage {
page_number: img.page_number,
storage_key: key,
content_type: format!("image/{ext}"),
})
}
/// Persist the page rows + chapter `page_count` in one short transaction.
/// `(chapter_id, page_number)` is unique so re-runs are idempotent.
pub(crate) async fn persist_pages(
db: &PgPool,
chapter_id: Uuid,
stored: &[StoredPage],
) -> anyhow::Result<()> {
let mut tx = db.begin().await.context("open chapter sync tx")?;
for page in stored {
sqlx::query(
"INSERT INTO pages (chapter_id, page_number, storage_key, content_type)
VALUES ($1, $2, $3, $4)
@@ -318,22 +394,36 @@ pub async fn sync_chapter_content(
content_type = EXCLUDED.content_type",
)
.bind(chapter_id)
.bind(page_number)
.bind(&key)
.bind(format!("image/{ext}"))
.bind(page.page_number)
.bind(&page.storage_key)
.bind(&page.content_type)
.execute(&mut *tx)
.await
.with_context(|| format!("insert page row {page_number}"))?;
.with_context(|| format!("insert page row {}", page.page_number))?;
}
sqlx::query("UPDATE chapters SET page_count = $1 WHERE id = $2")
.bind(fetched.len() as i32)
.bind(stored.len() as i32)
.bind(chapter_id)
.execute(&mut *tx)
.await
.context("update page_count")?;
tx.commit().await.context("commit chapter sync")?;
Ok(())
}
Ok(SyncOutcome::Fetched { pages: fetched.len() })
/// Best-effort delete of partially-written page blobs after a chapter sync
/// fails, so a retry doesn't accumulate orphans. Errors are logged, not
/// raised — a leftover blob is harmless and a future reaper can sweep it.
pub(crate) async fn cleanup_orphans(storage: &dyn Storage, keys: &[String]) {
for key in keys {
if let Err(e) = storage.delete(key).await {
tracing::warn!(
%key,
error = ?e,
"failed to delete orphaned page blob after chapter sync failure"
);
}
}
}
// Suppress unused-import warning for `session::registrable_domain`
@@ -347,6 +437,90 @@ fn _keep_session_in_scope() {
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::LocalStorage;
#[tokio::test]
async fn cleanup_orphans_deletes_written_keys() {
let dir = tempfile::tempdir().unwrap();
let storage = LocalStorage::new(dir.path());
let keys = vec![
"mangas/m/chapters/c/pages/0001.jpg".to_string(),
"mangas/m/chapters/c/pages/0002.jpg".to_string(),
];
for k in &keys {
storage.put(k, b"\xff\xd8\xff\xe0 jpeg-ish").await.unwrap();
assert!(storage.exists(k).await.unwrap());
}
cleanup_orphans(&storage, &keys).await;
for k in &keys {
assert!(!storage.exists(k).await.unwrap(), "{k} should be deleted");
}
}
#[tokio::test]
async fn cleanup_orphans_tolerates_missing_keys() {
// A key that was never written (e.g. the put itself failed) must
// not make cleanup error — it's best-effort.
let dir = tempfile::tempdir().unwrap();
let storage = LocalStorage::new(dir.path());
cleanup_orphans(&storage, &["never/written.jpg".to_string()]).await;
}
#[sqlx::test(migrations = "./migrations")]
async fn persist_pages_inserts_rows_and_sets_page_count(pool: PgPool) {
let manga_id = Uuid::new_v4();
let chapter_id = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, 'T')")
.bind(manga_id)
.execute(&pool)
.await
.unwrap();
sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)")
.bind(chapter_id)
.bind(manga_id)
.execute(&pool)
.await
.unwrap();
let stored = vec![
StoredPage {
page_number: 1,
storage_key: "k/0001.jpg".into(),
content_type: "image/jpeg".into(),
},
StoredPage {
page_number: 2,
storage_key: "k/0002.jpg".into(),
content_type: "image/jpeg".into(),
},
];
persist_pages(&pool, chapter_id, &stored).await.unwrap();
let page_count: i32 =
sqlx::query_scalar("SELECT page_count FROM chapters WHERE id = $1")
.bind(chapter_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(page_count, 2);
let rows: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM pages WHERE chapter_id = $1")
.bind(chapter_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(rows, 2);
// Idempotent re-run (force refetch path): same rows, page_count stable.
persist_pages(&pool, chapter_id, &stored).await.unwrap();
let rows2: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM pages WHERE chapter_id = $1")
.bind(chapter_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(rows2, 2, "re-run is idempotent via ON CONFLICT");
}
#[test]
fn parse_chapter_pages_skips_loader_and_sorts_by_id() {

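The write-then-roll-back flow in `sync_chapter_content` can be sketched without a browser, database, or real storage backend. The block below is a minimal model under stated assumptions: `MemStorage` and `store_all` are hypothetical names that stand in for the `Storage` trait and the per-page loop, the fetch result for each page is passed in rather than downloaded, and the key format is shortened for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory stand-in for the real `Storage` trait.
#[derive(Default)]
struct MemStorage {
    blobs: BTreeMap<String, Vec<u8>>,
}

impl MemStorage {
    fn put(&mut self, key: &str, bytes: &[u8]) {
        self.blobs.insert(key.to_string(), bytes.to_vec());
    }

    fn delete(&mut self, key: &str) {
        self.blobs.remove(key);
    }
}

/// Store pages one at a time, remembering every key written. On the first
/// failed page, delete the keys written so far and return the error, so the
/// caller never sees a half-stored chapter.
fn store_all(
    storage: &mut MemStorage,
    pages: &[(i32, Result<Vec<u8>, String>)],
) -> Result<usize, String> {
    let mut written: Vec<String> = Vec::with_capacity(pages.len());
    for (page_number, fetched) in pages {
        match fetched {
            Ok(bytes) => {
                // Illustrative key layout only; the real keys carry manga and chapter ids.
                let key = format!("pages/{:04}.jpg", page_number);
                storage.put(&key, bytes);
                written.push(key);
            }
            Err(e) => {
                // Roll back: mirrors the role of `cleanup_orphans`.
                for key in &written {
                    storage.delete(key);
                }
                return Err(e.clone());
            }
        }
    }
    Ok(written.len())
}
```

Calling `store_all` with two successful pages leaves two blobs behind. Calling it with one success followed by one error leaves the store empty, which is the all-or-nothing property the diff keeps without holding a transaction open across network writes.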

@@ -48,6 +48,7 @@ use tokio_util::sync::CancellationToken;
use crate::crawler::content::SyncOutcome;
use crate::crawler::jobs::{self, JobPayload, Lease, KIND_SYNC_CHAPTER_CONTENT};
use crate::crawler::pipeline;
use crate::crawler::status::{Phase, StatusHandle};
/// Fixed `pg_try_advisory_lock` key. ASCII "MANGALRD" interpreted as a
/// big-endian i64. Hardcoded so every replica agrees on the lock identity
@@ -56,6 +57,15 @@ pub const CRON_LOCK_KEY: i64 = 0x4D414E47414C5244;
const STATE_KEY_LAST_TICK: &str = "last_metadata_tick_at";
/// Lease window handed to `jobs::lease`. Kept short, but continuously
/// extended by the per-job heartbeat (see [`WorkerContext::process_lease`])
/// so a long-but-healthy job never lapses and gets stolen.
const LEASE_DURATION: Duration = Duration::from_secs(60);
/// How often the heartbeat renews the lease while a job runs. A third of
/// the lease window leaves slack for two missed beats before expiry.
const LEASE_HEARTBEAT: Duration = Duration::from_secs(20);
#[async_trait]
pub trait MetadataPass: Send + Sync {
async fn run(&self) -> anyhow::Result<pipeline::MetadataStats>;
@@ -77,6 +87,13 @@ pub struct DaemonConfig {
pub tz: Tz,
pub retention_days: u32,
pub session_expired: Arc<AtomicBool>,
/// Live status surface updated by the cron + workers.
pub status: StatusHandle,
/// Hard upper bound on a single job's dispatch. A job that exceeds it
/// is acked failed (exponential backoff) rather than wedging a worker
/// forever. Must exceed [`LEASE_HEARTBEAT`] and the realistic
/// single-job runtime.
pub job_timeout: Duration,
/// Tasks that should run alongside the cron + workers and be cancelled
/// on shutdown. Used to hand the daemon ownership of the browser
/// manager's idle reaper.
@@ -123,6 +140,8 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem
tz,
retention_days,
session_expired,
status,
job_timeout,
extra_tasks,
} = cfg;
@@ -134,6 +153,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem
tz,
retention_days,
metadata,
status: status.clone(),
};
join.spawn(async move { ctx.run().await });
} else {
@@ -146,6 +166,8 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem
cancel: cancel.clone(),
dispatcher: Arc::clone(&dispatcher),
session_expired: Arc::clone(&session_expired),
status: status.clone(),
job_timeout,
id: worker_id,
};
join.spawn(async move { ctx.run().await });
@@ -169,6 +191,7 @@ struct CronContext {
tz: Tz,
retention_days: u32,
metadata: Arc<dyn MetadataPass>,
status: StatusHandle,
}
impl CronContext {
@@ -196,6 +219,11 @@ impl CronContext {
// (NTP step, suspend/resume) don't strand us on a stale instant.
let next = next_fire(Utc::now(), self.daily_at, self.tz);
let wait = (next - Utc::now()).to_std().unwrap_or(Duration::ZERO);
self.status
.set_phase(Phase::Idle {
next_fire: Some(next),
})
.await;
tracing::info!(
next_fire_utc = %next.to_rfc3339(),
wait_seconds = wait.as_secs(),
@@ -243,9 +271,13 @@ impl CronContext {
let metadata = &self.metadata;
let pool = &self.pool;
let retention_days = self.retention_days;
let status = &self.status;
let body = async move {
match metadata.run().await {
Ok(stats) => tracing::info!(?stats, "cron: metadata pass done"),
Ok(stats) => {
status.record_pass(&stats, Utc::now()).await;
tracing::info!(?stats, "cron: metadata pass done");
}
Err(e) => tracing::error!(?e, "cron: metadata pass failed"),
}
match pipeline::enqueue_bookmarked_pending(pool).await {
@@ -283,6 +315,8 @@ struct WorkerContext {
cancel: CancellationToken,
dispatcher: Arc<dyn ChapterDispatcher>,
session_expired: Arc<AtomicBool>,
status: StatusHandle,
job_timeout: Duration,
id: usize,
}
@@ -303,7 +337,7 @@ impl WorkerContext {
&self.pool,
Some(KIND_SYNC_CHAPTER_CONTENT),
1,
Duration::from_secs(60),
LEASE_DURATION,
)
.await
{
@@ -341,9 +375,59 @@ impl WorkerContext {
}
}
let outcome = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone()))
.catch_unwind()
// Heartbeat: keep the lease fresh while the (potentially long)
// dispatch runs, so a slow-but-healthy job is never re-leased and
// never inflates `attempts` toward `max_attempts`. Stops itself
// once the job is no longer ours (renew returns false).
let heartbeat = {
let hb_pool = self.pool.clone();
let hb_id = lease.id;
tokio::spawn(async move {
loop {
tokio::time::sleep(LEASE_HEARTBEAT).await;
match jobs::renew(&hb_pool, hb_id, LEASE_DURATION).await {
Ok(true) => {}
Ok(false) => break,
Err(e) => {
tracing::warn!(lease_id = %hb_id, ?e, "heartbeat renew failed");
}
}
}
})
};
// The "currently crawling" chapter (with its live page count) is
// registered by the dispatcher itself (RealChapterDispatcher) so it
// carries the manga/chapter identity + page progress and is removed
// via an RAII guard on every exit path.
// Outer timeout: a dispatch that exceeds `job_timeout` is acked
// failed (exponential backoff) rather than wedging the worker.
let dispatch = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone()))
.catch_unwind();
let outcome = tokio::time::timeout(self.job_timeout, dispatch).await;
heartbeat.abort();
let outcome = match outcome {
Ok(o) => o,
Err(_elapsed) => {
tracing::warn!(
worker = self.id,
lease_id = %lease.id,
timeout_secs = self.job_timeout.as_secs(),
"worker: dispatch timed out — ack failed"
);
let _ = jobs::ack_failed(
&self.pool,
lease.id,
"dispatch timed out",
lease.attempts,
lease.max_attempts,
)
.await;
return;
}
};
match outcome {
Ok(Ok(SyncOutcome::Fetched { .. } | SyncOutcome::Skipped)) => {
let _ = jobs::ack_done(&self.pool, lease.id).await;
@@ -355,6 +439,8 @@ impl WorkerContext {
"session expired — workers will idle until restart"
);
self.session_expired.store(true, Ordering::Release);
// Push the session-expired flip to live status subscribers.
self.status.poke();
let _ = jobs::release(&self.pool, lease.id).await;
}
Ok(Err(e)) => {

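The relationship between `LEASE_DURATION` (60s) and `LEASE_HEARTBEAT` (20s) can be checked with a small timeline model. This is a sketch under assumptions, not the daemon's code: `first_lapse` is a hypothetical helper, time is in whole seconds, beats fire exactly on schedule with zero latency, and a lapsed lease is treated as stolen immediately (the strict `leased_until < now()` comparison from the `lease` query).

```rust
/// Returns the instant (seconds since the lease was taken) at which the
/// lease lapses, or `None` if it survives every beat. `beats[i]` records
/// whether the i-th renew attempt succeeded.
fn first_lapse(lease_secs: u64, beat_secs: u64, beats: &[bool]) -> Option<u64> {
    let mut leased_until = lease_secs; // lease taken at t = 0
    for (i, ok) in beats.iter().enumerate() {
        let now = beat_secs * (i as u64 + 1);
        if leased_until < now {
            // Expired before this beat had a chance to renew it.
            return Some(leased_until);
        }
        if *ok {
            leased_until = now + lease_secs;
        }
    }
    None
}
```

With a 60s lease and a 20s beat, `first_lapse(60, 20, &[false, false, true])` returns `None`: two consecutive missed beats are survived, but only because the third beat lands exactly on the expiry instant. In this model any scheduling delay on that third beat would let the lease lapse, and a third miss, as in `&[false, false, false, true]`, lapses at t = 60.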

@@ -66,16 +66,33 @@ pub struct Lease {
pub max_attempts: i32,
}
/// Exponential backoff for `ack_failed` retries. `attempts` is the
/// post-increment value reported by `lease()` (so the first failure has
/// `attempts == 1` and waits 60s, the second 120s, etc.). Capped at 1h to
/// avoid runaway long sleeps that would outlive the daemon process.
fn backoff_for(attempts: i32) -> Duration {
/// Deterministic exponential backoff base for `ack_failed` retries.
/// `attempts` is the post-increment value reported by `lease()` (so the
/// first failure has `attempts == 1` and waits 60s, the second 120s,
/// etc.). Capped at 1h to avoid runaway long sleeps that would outlive
/// the daemon process. Jitter is applied separately by [`apply_jitter`].
fn backoff_base(attempts: i32) -> Duration {
let shift = attempts.saturating_sub(1).clamp(0, 20) as u32;
let secs = 60u64.saturating_mul(1u64 << shift);
Duration::from_secs(secs.min(3600))
}
/// Apply ±20% jitter to a backoff duration. `jitter` is a fraction in
/// `[0.0, 1.0)` (e.g. `rand::random::<f64>()`), mapped to a multiplier in
/// `[0.8, 1.2)`; out-of-range inputs are clamped to `[0.0, 1.0]`. Pure so
/// the bounds stay unit-testable. Spreading retries avoids a thundering
/// herd when a source outage fails many jobs at once.
fn apply_jitter(base: Duration, jitter: f64) -> Duration {
let frac = jitter.clamp(0.0, 1.0);
let mult = 0.8 + 0.4 * frac; // [0.8, 1.2)
Duration::from_secs((base.as_secs_f64() * mult).round() as u64)
}
/// Jittered exponential backoff for `ack_failed`. Wraps [`backoff_base`]
/// with a random ±20% spread.
fn backoff_for(attempts: i32) -> Duration {
apply_jitter(backoff_base(attempts), rand::random::<f64>())
}
/// Insert a new pending job. For `SyncChapterContent` payloads the
/// partial unique index `crawler_jobs_chapter_content_dedup_idx` blocks
/// a second `(pending|running)` insert per chapter_id, returning
@@ -104,6 +121,12 @@ pub async fn enqueue(pool: &PgPool, payload: &JobPayload) -> sqlx::Result<Enqueu
///
/// `kind_filter` matches against `payload->>'kind'`; `None` means
/// any kind.
///
/// Ties on `scheduled_at` (the common case: a cron batch enqueues
/// everything with the same default `now()`) break by `created_at`, so
/// jobs come off the queue in insertion order. The enqueue paths insert
/// chapter-content jobs in ascending `chapters.number` order, so this
/// tiebreaker is what propagates that intent through to dequeue.
pub async fn lease(
pool: &PgPool,
kind_filter: Option<&str>,
@@ -118,7 +141,7 @@ pub async fn lease(
WHERE (state = 'pending' OR (state = 'running' AND leased_until < now()))
AND scheduled_at <= now()
AND ($1::text IS NULL OR payload->>'kind' = $1)
ORDER BY scheduled_at
ORDER BY scheduled_at, created_at
LIMIT $2
FOR UPDATE SKIP LOCKED
)
@@ -153,6 +176,35 @@ pub async fn lease(
Ok(leases)
}
/// Extend the lease on a still-owned `running` job. Returns `true` if the
/// row was updated (we still hold the lease), `false` if the job is no
/// longer `running` (re-leased after a missed heartbeat, or already
/// acked) — the caller's heartbeat loop should stop. The `state =
/// 'running'` guard mirrors [`ack_done`]'s rationale.
///
/// This is the heartbeat primitive: a worker renews periodically while a
/// long-but-healthy job runs so `leased_until` never lapses, which would
/// otherwise let another worker steal the in-flight job and spuriously
/// inflate `attempts` toward `max_attempts`.
pub async fn renew(
pool: &PgPool,
lease_id: Uuid,
lease_duration: Duration,
) -> sqlx::Result<bool> {
let lease_ms: i64 = lease_duration.as_millis().min(i64::MAX as u128) as i64;
let res = sqlx::query(
"UPDATE crawler_jobs \
SET leased_until = now() + ($2::bigint || ' milliseconds')::interval, \
updated_at = now() \
WHERE id = $1 AND state = 'running'",
)
.bind(lease_id)
.bind(lease_ms)
.execute(pool)
.await?;
Ok(res.rows_affected() > 0)
}
/// Mark a leased job as successfully completed. The `state = 'running'`
/// predicate guards against a late ack from a worker whose lease expired
/// and was already re-leased by another worker: without it, the late ack
@@ -272,19 +324,48 @@ mod tests {
use super::*;
#[test]
fn backoff_grows_exponentially_and_caps_at_one_hour() {
fn backoff_base_grows_exponentially_and_caps_at_one_hour() {
// attempts == 1 → 60s, doubling each step.
assert_eq!(backoff_for(1), Duration::from_secs(60));
assert_eq!(backoff_for(2), Duration::from_secs(120));
assert_eq!(backoff_for(3), Duration::from_secs(240));
assert_eq!(backoff_for(4), Duration::from_secs(480));
assert_eq!(backoff_for(5), Duration::from_secs(960));
assert_eq!(backoff_for(6), Duration::from_secs(1920));
assert_eq!(backoff_base(1), Duration::from_secs(60));
assert_eq!(backoff_base(2), Duration::from_secs(120));
assert_eq!(backoff_base(3), Duration::from_secs(240));
assert_eq!(backoff_base(4), Duration::from_secs(480));
assert_eq!(backoff_base(5), Duration::from_secs(960));
assert_eq!(backoff_base(6), Duration::from_secs(1920));
// 7th: 60 * 64 = 3840 → capped to 3600.
assert_eq!(backoff_for(7), Duration::from_secs(3600));
assert_eq!(backoff_for(20), Duration::from_secs(3600));
assert_eq!(backoff_base(7), Duration::from_secs(3600));
assert_eq!(backoff_base(20), Duration::from_secs(3600));
// Garbage / zero / negatives stay sane.
assert_eq!(backoff_for(0), Duration::from_secs(60));
assert_eq!(backoff_for(-5), Duration::from_secs(60));
assert_eq!(backoff_base(0), Duration::from_secs(60));
assert_eq!(backoff_base(-5), Duration::from_secs(60));
}
#[test]
fn apply_jitter_stays_within_plus_minus_twenty_percent() {
let base = Duration::from_secs(100);
// Lower bound (jitter = 0.0) → 0.8x.
assert_eq!(apply_jitter(base, 0.0), Duration::from_secs(80));
// Midpoint (jitter = 0.5) → 1.0x.
assert_eq!(apply_jitter(base, 0.5), Duration::from_secs(100));
// Upper end (jitter → 1.0) → ~1.2x.
assert_eq!(apply_jitter(base, 1.0), Duration::from_secs(120));
// Out-of-range inputs are clamped, never panic.
assert_eq!(apply_jitter(base, -3.0), Duration::from_secs(80));
assert_eq!(apply_jitter(base, 9.0), Duration::from_secs(120));
}
#[test]
fn backoff_for_random_jitter_stays_in_band() {
// The production wrapper draws its own randomness; assert the
// result for a mid-range attempt always lands within the jitter
// band of the base, across many draws.
let base = backoff_base(3).as_secs_f64(); // 240s
for _ in 0..1000 {
let v = backoff_for(3).as_secs_f64();
assert!(
v >= base * 0.8 - 1.0 && v <= base * 1.2 + 1.0,
"jittered backoff {v} outside band of base {base}"
);
}
}
}
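To see what the jitter does to the retry schedule, the sketch below computes the delay band per attempt. `backoff_base` is copied from the diff above. `jitter_band` is a hypothetical helper added only for illustration: it evaluates the two ends of the ±20% spread directly instead of drawing randomness.

```rust
use std::time::Duration;

/// Base delay: 60s doubling per attempt, capped at one hour (as in the diff).
fn backoff_base(attempts: i32) -> Duration {
    let shift = attempts.saturating_sub(1).clamp(0, 20) as u32;
    let secs = 60u64.saturating_mul(1u64 << shift);
    Duration::from_secs(secs.min(3600))
}

/// Hypothetical helper: the (min, max) delay the ±20% jitter can yield
/// for a given attempt, rounded to whole seconds like `apply_jitter`.
fn jitter_band(attempts: i32) -> (Duration, Duration) {
    let base = backoff_base(attempts).as_secs_f64();
    (
        Duration::from_secs((base * 0.8).round() as u64),
        Duration::from_secs((base * 1.2).round() as u64),
    )
}
```

For the first failure the band is 48s to 72s. From the seventh failure on, the base sits at the 3600s cap, so the band is 2880s to 4320s. The one-hour cap therefore applies to the base only, and a jittered delay can run up to 72 minutes.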


@@ -23,8 +23,11 @@ pub mod jobs;
pub mod nav;
pub mod pipeline;
pub mod rate_limit;
pub mod resync;
pub mod safety;
pub mod session;
pub mod session_control;
pub mod source;
pub mod status;
pub mod tor;
pub mod url_utils;


@@ -13,7 +13,7 @@ use crate::crawler::jobs::{self, EnqueueResult, JobPayload};
use crate::crawler::rate_limit::HostRateLimiters;
use crate::crawler::safety::{fetch_bytes_capped, looks_like_image, DownloadAllowlist};
use crate::crawler::source::target::TargetSource;
use crate::crawler::source::{FetchContext, Source};
use crate::crawler::source::{FetchContext, Source, SourceMangaRef};
use crate::repo;
use crate::repo::crawler::UpsertStatus;
use crate::storage::Storage;
@@ -65,6 +65,17 @@ pub(crate) fn should_mark_clean_exit(
walked_to_completion || hit_stop_condition
}
/// Circuit-breaker: abort the walk once `consecutive` `fetch_manga`
/// failures reach `threshold`. A `threshold` of 0 disables the breaker
/// (unbounded — the legacy behaviour). When it fires the caller must NOT
/// mark a clean exit, so the next tick does a recovery sweep over the
/// catalog tail the aborted pass never reached.
///
/// Pure so the rule is unit-testable without the walker.
pub(crate) fn should_abort_pass(consecutive: u32, threshold: u32) -> bool {
threshold > 0 && consecutive >= threshold
}
/// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline
/// for the target source. Pure metadata; chapter content is enqueued as
/// separate `SyncChapterContent` jobs by the caller after this returns.
@@ -103,6 +114,8 @@ pub async fn run_metadata_pass(
skip_chapters: bool,
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
max_consecutive_failures: u32,
status: Option<&crate::crawler::status::StatusHandle>,
tor: Option<&crate::crawler::tor::TorController>,
) -> anyhow::Result<MetadataStats> {
let lease = browser_manager
@@ -110,6 +123,9 @@ pub async fn run_metadata_pass(
.await
.context("acquire browser lease for metadata pass")?;
let browser_ref: &chromiumoxide::Browser = &lease;
if let Some(s) = status {
s.set_phase(crate::crawler::status::Phase::WalkingList).await;
}
let source = {
let s = TargetSource::new(start_url.to_string());
@@ -165,6 +181,11 @@ pub async fn run_metadata_pass(
let mut walked_to_completion = false;
let mut hit_limit = false;
let mut hit_stop_condition = false;
// Circuit-breaker state: consecutive fetch_manga failures. A sustained
// run of failures (source outage) aborts the pass and leaves it
// un-clean → recovery sweep next tick.
let mut consecutive_failures = 0u32;
let mut hit_failure_breaker = false;
'outer: loop {
let batch = match walker.next_batch(&ctx).await? {
@@ -175,6 +196,17 @@ pub async fn run_metadata_pass(
}
};
for r in batch {
// Cooperative checkpoint: if a coordinated browser restart is
// pending, yield our (long-lived) lease so the drain can
// proceed instead of stalling for the rest of the walk. The
// pass exits un-clean, so the next tick recovery-sweeps the
// tail we didn't reach.
if browser_manager.is_restart_pending() {
tracing::info!(
"metadata pass: browser restart pending — yielding (recovery sweep next tick)"
);
break 'outer;
}
if max_refs.map(|m| stats.discovered >= m).unwrap_or(false) {
hit_limit = true;
tracing::info!(cap = ?max_refs, "max_results reached; halting walk");
@@ -198,13 +230,24 @@ pub async fn run_metadata_pass(
continue;
}
stats.discovered += 1;
if let Some(s) = status {
s.set_phase(crate::crawler::status::Phase::FetchingMetadata {
index: stats.discovered,
total: max_refs,
title: r.title.clone(),
})
.await;
}
tracing::info!(
idx = stats.discovered,
key = %r.source_manga_key,
"fetching metadata"
);
let manga = match source.fetch_manga(&ctx, &r).await {
Ok(m) => m,
Ok(m) => {
consecutive_failures = 0;
m
}
Err(e) => {
tracing::warn!(
key = %r.source_manga_key,
@@ -213,6 +256,17 @@ pub async fn run_metadata_pass(
"fetch_manga failed"
);
stats.mangas_failed += 1;
consecutive_failures += 1;
if should_abort_pass(consecutive_failures, max_consecutive_failures) {
hit_failure_breaker = true;
tracing::error!(
consecutive_failures,
threshold = max_consecutive_failures,
"metadata pass: too many consecutive fetch_manga failures; \
aborting (recovery sweep on next tick)"
);
break 'outer;
}
continue;
}
};
@@ -295,7 +349,14 @@ pub async fn run_metadata_pass(
|| matches!(upsert.status, repo::crawler::UpsertStatus::Updated);
if needs_cover {
if let Some(cover_url) = manga.cover_url.as_deref() {
match download_and_store_cover(
if let Some(s) = status {
s.set_current_cover(Some(crate::crawler::status::CoverTarget {
manga_id: upsert.manga_id,
manga_title: manga.title.clone(),
}))
.await;
}
let cover_result = download_and_store_cover(
db,
storage,
http,
@@ -306,8 +367,11 @@ pub async fn run_metadata_pass(
allowlist,
max_image_bytes,
)
.await
{
.await;
if let Some(s) = status {
s.set_current_cover(None).await;
}
match cover_result {
Ok(()) => stats.covers_fetched += 1,
Err(e) => tracing::warn!(
manga_id = %upsert.manga_id,
@@ -390,6 +454,7 @@ pub async fn run_metadata_pass(
walked_to_completion,
hit_limit,
hit_stop_condition,
hit_failure_breaker,
exited_cleanly,
"metadata pass complete"
);
@@ -429,8 +494,8 @@ pub async fn enqueue_bookmarked_pending(pool: &PgPool) -> anyhow::Result<Enqueue
AND cj.state = 'dead'
AND cj.updated_at > now() - ($1::bigint || ' days')::interval
)
GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.manga_id, c.created_at
ORDER BY c.manga_id, c.created_at ASC
GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.manga_id, c.number, c.created_at
ORDER BY c.manga_id, c.number ASC, c.created_at ASC
"#,
)
.bind(CHAPTER_DEAD_QUARANTINE_DAYS)
@@ -471,7 +536,7 @@ pub async fn enqueue_pending_for_manga(
) -> anyhow::Result<EnqueueSummary> {
let rows: Vec<(String, Uuid, String)> = sqlx::query_as(
r#"
SELECT DISTINCT cs.source_id, c.id AS chapter_id, cs.source_chapter_key
SELECT cs.source_id, c.id AS chapter_id, cs.source_chapter_key
FROM chapters c
JOIN chapter_sources cs ON cs.chapter_id = c.id
WHERE c.manga_id = $1
@@ -484,7 +549,8 @@ pub async fn enqueue_pending_for_manga(
AND cj.state = 'dead'
AND cj.updated_at > now() - ($2::bigint || ' days')::interval
)
ORDER BY cs.source_id, c.id
GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.number, c.created_at
ORDER BY c.number ASC, c.created_at ASC, cs.source_id
"#,
)
.bind(manga_id)
@@ -523,12 +589,149 @@ pub struct EnqueueSummary {
pub failed: usize,
}
#[derive(Debug, Default, Clone, Copy)]
pub struct CoverBackfillStats {
pub considered: usize,
pub fetched: usize,
pub failed: usize,
}
/// Default per-tick cap for [`backfill_missing_covers`]. The metadata pass
/// already retries covers when its walk reaches the affected manga; this
/// backfill exists to catch the residual case where the early-stop
/// optimisation prevents the walk from reaching mangas whose cover failed
/// on first attempt. A small cap is enough because the backlog only grows
/// from sporadic download failures, not from systematic misses.
pub const COVER_BACKFILL_DEFAULT_MAX: usize = 10;
/// Re-attempt cover downloads for mangas where `cover_image_path IS NULL`
/// but a live `manga_sources` row exists. Refetches the source detail
/// page (which is where the cover URL lives) and downloads the cover.
///
/// Bounded by `max_mangas` per call so a steady stream of failing covers
/// (e.g. a CDN host that persistently returns 502) can't monopolise a
/// cron tick. Orders by `manga_sources.last_seen_at DESC` so the freshest
/// missing-cover mangas are addressed first.
///
/// Failures are logged and counted, not raised: a single bad cover URL
/// must not stall every other backfill behind it.
#[allow(clippy::too_many_arguments)]
pub async fn backfill_missing_covers(
browser_manager: &BrowserManager,
db: &PgPool,
storage: &dyn Storage,
http: &reqwest::Client,
rate: &HostRateLimiters,
max_mangas: usize,
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
status: Option<&crate::crawler::status::StatusHandle>,
tor: Option<&crate::crawler::tor::TorController>,
) -> anyhow::Result<CoverBackfillStats> {
let mut stats = CoverBackfillStats::default();
if max_mangas == 0 {
return Ok(stats);
}
let entries = repo::crawler::list_missing_covers(db, max_mangas as i64)
.await
.context("list_missing_covers")?;
if entries.is_empty() {
return Ok(stats);
}
let lease = browser_manager
.acquire()
.await
.context("acquire browser lease for cover backfill")?;
let browser_ref: &chromiumoxide::Browser = &lease;
let ctx = FetchContext { browser: browser_ref, rate, tor };
let total = entries.len();
for (index, entry) in entries.into_iter().enumerate() {
stats.considered += 1;
if let Some(s) = status {
s.set_phase(crate::crawler::status::Phase::CoverBackfill { index, total })
.await;
}
// Metadata-only TargetSource: skip chapter-list parsing so a
// missing-cover refetch doesn't soft-drop chapters on a partial
// render. Cover URL alone is what we need.
let source = TargetSource::new(entry.source_url.clone()).without_chapter_parsing();
let r = SourceMangaRef {
source_manga_key: entry.source_manga_key.clone(),
title: String::new(),
url: entry.source_url.clone(),
};
let manga = match source.fetch_manga(&ctx, &r).await {
Ok(manga) => manga,
Err(e) => {
tracing::warn!(
manga_id = %entry.manga_id,
url = %entry.source_url,
error = ?e,
"cover backfill: fetch_manga failed"
);
stats.failed += 1;
continue;
}
};
let Some(cover_url) = manga.cover_url.clone() else {
tracing::warn!(
manga_id = %entry.manga_id,
url = %entry.source_url,
"cover backfill: source returned no cover_url"
);
stats.failed += 1;
continue;
};
if let Some(s) = status {
s.set_current_cover(Some(crate::crawler::status::CoverTarget {
manga_id: entry.manga_id,
manga_title: manga.title.clone(),
}))
.await;
}
let cover_result = download_and_store_cover(
db,
storage,
http,
rate,
&entry.source_url,
entry.manga_id,
&cover_url,
allowlist,
max_image_bytes,
)
.await;
if let Some(s) = status {
s.set_current_cover(None).await;
}
match cover_result {
Ok(()) => stats.fetched += 1,
Err(e) => {
tracing::warn!(
manga_id = %entry.manga_id,
url = %entry.source_url,
error = ?e,
"cover backfill: download failed"
);
stats.failed += 1;
}
}
}
drop(lease);
Ok(stats)
}
/// Download a cover image and persist its storage path. Local to the
/// pipeline because the CLI still calls it from its inline chapter-content
/// loop; once the worker pool fully replaces that path we can fold this
/// into `pipeline` proper.
#[allow(clippy::too_many_arguments)]
async fn download_and_store_cover(
pub(crate) async fn download_and_store_cover(
db: &PgPool,
storage: &dyn Storage,
http: &reqwest::Client,
@@ -634,6 +837,18 @@ mod tests {
assert!(!should_stop(false, UpsertStatus::New, None));
}
#[test]
fn abort_pass_fires_at_threshold_and_respects_disable() {
// Disabled (0) never fires, no matter how many failures.
assert!(!should_abort_pass(0, 0));
assert!(!should_abort_pass(100, 0));
// Below threshold: keep going.
assert!(!should_abort_pass(9, 10));
// At/above threshold: abort.
assert!(should_abort_pass(10, 10));
assert!(should_abort_pass(11, 10));
}
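Reviewer sketch, not part of the commit: the assertions above pin down the contract of `should_abort_pass` (a threshold of 0 disables the check; otherwise it fires once the failure count reaches the threshold). One body consistent with that contract is shown below. The name `should_abort_pass_sketch`, the parameter names and the `u32` types are assumptions; the real function lives outside this hunk and may differ.

```rust
/// Hypothetical stand-in for `should_abort_pass`, inferred only from its test.
/// `failures` is the number of failures seen so far in the pass;
/// `threshold` is the configured limit, where 0 means "never abort".
fn should_abort_pass_sketch(failures: u32, threshold: u32) -> bool {
    // A zero threshold disables the guard entirely.
    if threshold == 0 {
        return false;
    }
    // Fire at the threshold and for every count above it.
    failures >= threshold
}
```

Treating 0 as "disabled" keeps the setting a plain integer instead of an `Option`, at the cost of making "abort on the very first failure" expressible only as a threshold of 1.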
#[test]
fn clean_exit_when_walked_to_completion() {
// End-of-walk reached the catalog tail — the recovery flag may

@@ -0,0 +1,279 @@
//! Admin-triggered resync of a single manga's metadata + cover, or a
//! single chapter's content.
//!
//! The cron tick already retries covers and chapter content on its own
//! schedule. This module exists for the operator-controlled path:
//! "this manga's metadata is stale / its cover never landed / this
//! chapter is broken — pull from source now, not at the next daily
//! tick." Wired into the admin API, never into the queue, so the work
//! happens synchronously with the HTTP request and the admin sees the
//! refreshed row in the response.
//!
//! Shares the daemon's [`BrowserManager`], rate limiter, HTTP client,
//! and TOR controller so a force resync respects the same per-host
//! pacing and recircuit budget the daily crawl uses — admin actions
//! must not let an operator accidentally hammer the source.
use std::sync::Arc;
use anyhow::Context;
use async_trait::async_trait;
use sqlx::PgPool;
use uuid::Uuid;
use crate::crawler::browser_manager::BrowserManager;
use crate::crawler::content::{self, SyncOutcome};
use crate::crawler::pipeline;
use crate::crawler::rate_limit::HostRateLimiters;
use crate::crawler::safety::DownloadAllowlist;
use crate::crawler::source::target::TargetSource;
use crate::crawler::source::{FetchContext, Source, SourceMangaRef};
use crate::crawler::tor::TorController;
use crate::repo;
use crate::repo::crawler::UpsertStatus;
use crate::storage::Storage;
/// Outcome of [`ResyncService::resync_manga`]. Mirrors the bits the
/// admin UI cares about — was the row actually re-upserted, did the
/// cover land — so the response can show "metadata refreshed, cover
/// re-downloaded" or "metadata unchanged" without a second round-trip.
#[derive(Debug, Clone, Copy)]
pub struct MangaResyncOutcome {
pub manga_id: Uuid,
pub metadata_status: UpsertStatus,
pub cover_fetched: bool,
}
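/// Outcome of [`ResyncService::resync_chapter`]. `Fetched { pages }` is the
/// success case; `Skipped` carries a human-readable `reason` (in this
/// implementation, that the chapter already had pages on disk). A chapter
/// with no live source row is reported as [`ResyncError::NoChapterSource`],
/// not as `Skipped`.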
#[derive(Debug, Clone)]
pub enum ChapterResyncOutcome {
Fetched { chapter_id: Uuid, pages: usize },
Skipped { chapter_id: Uuid, reason: String },
}
/// Service exposed by the daemon to the admin API. Optional on
/// [`AppState`] — `None` when the crawler daemon is disabled
/// (`CRAWLER_DAEMON=false`), in which case admin handlers return 503.
#[async_trait]
pub trait ResyncService: Send + Sync {
async fn resync_manga(&self, manga_id: Uuid) -> anyhow::Result<MangaResyncOutcome>;
async fn resync_chapter(&self, chapter_id: Uuid) -> anyhow::Result<ChapterResyncOutcome>;
}
/// Errors with a stable shape so the API layer can map them to the
/// right HTTP status (404 vs 422 vs 5xx). Anything else surfaces as a
/// generic 500.
#[derive(Debug, thiserror::Error)]
pub enum ResyncError {
#[error("manga has no source to resync from")]
NoMangaSource,
#[error("chapter has no source to resync from")]
NoChapterSource,
}
pub struct RealResyncService {
pub browser_manager: Arc<BrowserManager>,
pub db: PgPool,
pub storage: Arc<dyn Storage>,
pub http: reqwest::Client,
pub rate: Arc<HostRateLimiters>,
pub download_allowlist: DownloadAllowlist,
pub max_image_bytes: usize,
pub tor: Option<Arc<TorController>>,
}
#[async_trait]
impl ResyncService for RealResyncService {
async fn resync_manga(&self, manga_id: Uuid) -> anyhow::Result<MangaResyncOutcome> {
// Pick the freshest live source row. Multi-source mangas
// (theoretical — only one Source impl today) get the row whose
// `last_seen_at` is newest; soft-dropped rows are skipped.
let row: Option<(String, String, String)> = sqlx::query_as(
"SELECT source_id, source_manga_key, source_url \
FROM manga_sources \
WHERE manga_id = $1 AND dropped_at IS NULL \
ORDER BY last_seen_at DESC \
LIMIT 1",
)
.bind(manga_id)
.fetch_optional(&self.db)
.await
.context("look up manga_sources for resync")?;
let Some((_source_id, source_manga_key, source_url)) = row else {
return Err(ResyncError::NoMangaSource.into());
};
let lease = self
.browser_manager
.acquire()
.await
.context("acquire browser lease for manga resync")?;
let browser_ref: &chromiumoxide::Browser = &lease;
let ctx = FetchContext {
browser: browser_ref,
rate: &self.rate,
tor: self.tor.as_deref(),
};
// Parse chapters too — a force resync is "make this manga fully
// current," not just metadata. The full pipeline handles the
// partial-render guard for us; we replicate the same caution
// here by skipping the chapter sync when the parser returned
// empty but the manga previously had chapters.
let source = TargetSource::new(source_url.clone());
let r = SourceMangaRef {
source_manga_key: source_manga_key.clone(),
title: String::new(),
url: source_url.clone(),
};
let manga = source
.fetch_manga(&ctx, &r)
.await
.with_context(|| format!("fetch_manga during resync of {manga_id}"))?;
// Partial-render guard: same logic as run_metadata_pass. If the new
// fetch surfaced chapters, or there were none before either, chapter
// sync is safe to run; the prior count is only queried when the fetch
// came back empty.
let source_id = source.id();
if manga.chapters.is_empty() {
let prior = repo::crawler::live_chapter_count_for_source_manga(
&self.db,
source_id,
&source_manga_key,
)
.await
.unwrap_or(0);
if prior != 0 {
tracing::warn!(
%manga_id,
source_url = %source_url,
"resync_manga: fetch returned empty chapters but prior count > 0; skipping chapter sync to avoid soft-drop"
);
}
}
let upsert = repo::crawler::upsert_manga_from_source(
&self.db,
source_id,
&source_url,
&manga,
)
.await
.with_context(|| format!("upsert_manga_from_source during resync of {manga_id}"))?;
// Cover refetch: force-download regardless of UpsertStatus.
// Admin clicked "resync" because they want the cover too.
let mut cover_fetched = false;
if let Some(cover_url) = manga.cover_url.as_deref() {
match pipeline::download_and_store_cover(
&self.db,
self.storage.as_ref(),
&self.http,
&self.rate,
&source_url,
upsert.manga_id,
cover_url,
&self.download_allowlist,
self.max_image_bytes,
)
.await
{
Ok(()) => cover_fetched = true,
Err(e) => tracing::warn!(
%manga_id,
error = ?e,
"resync_manga: cover download failed"
),
}
}
// Chapter sync — only when the partial-render guard above
// didn't bail.
let prior_chapter_count = repo::crawler::live_chapter_count_for_source_manga(
&self.db,
source_id,
&source_manga_key,
)
.await
.unwrap_or(0);
if !manga.chapters.is_empty() || prior_chapter_count == 0 {
match repo::crawler::sync_manga_chapters(
&self.db,
source_id,
upsert.manga_id,
&manga.chapters,
)
.await
{
Ok(diff) => tracing::info!(
%manga_id,
new = diff.new,
refreshed = diff.refreshed,
dropped = diff.dropped,
"resync_manga: chapters synced"
),
Err(e) => tracing::warn!(
%manga_id,
error = ?e,
"resync_manga: chapter sync failed"
),
}
}
drop(lease);
Ok(MangaResyncOutcome {
manga_id: upsert.manga_id,
metadata_status: upsert.status,
cover_fetched,
})
}
async fn resync_chapter(&self, chapter_id: Uuid) -> anyhow::Result<ChapterResyncOutcome> {
let row = repo::chapter::dispatch_target(&self.db, chapter_id)
.await
.context("look up chapter_sources for resync")?;
let Some((manga_id, source_url, _title, _number)) = row else {
return Err(ResyncError::NoChapterSource.into());
};
let lease = self
.browser_manager
.acquire()
.await
.context("acquire browser lease for chapter resync")?;
let result = content::sync_chapter_content(
&lease,
&self.db,
self.storage.as_ref(),
&self.http,
&self.rate,
chapter_id,
manga_id,
&source_url,
true,
&self.download_allowlist,
self.max_image_bytes,
self.tor.as_deref(),
// Admin resync isn't a daemon worker slot — no live status.
None,
)
.await;
drop(lease);
match result? {
SyncOutcome::Fetched { pages } => {
Ok(ChapterResyncOutcome::Fetched { chapter_id, pages })
}
SyncOutcome::Skipped => Ok(ChapterResyncOutcome::Skipped {
chapter_id,
reason: "chapter already had pages on disk".to_string(),
}),
SyncOutcome::SessionExpired => {
anyhow::bail!("source session expired — operator must refresh PHPSESSID")
}
}
}
}
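Reviewer sketch, not part of the commit: the partial-render guard in `resync_manga` reduces to a two-input decision. The helper below isolates it so the three cases are easy to see. The name `chapter_sync_is_safe` and the `i64` type for the prior count are assumptions made for illustration; the diff does not show the return type of `live_chapter_count_for_source_manga`.

```rust
/// Hypothetical helper mirroring the guard in `resync_manga`.
/// `fetched` is how many chapters the parser returned this time;
/// `prior_live` is how many live chapters the manga already had.
fn chapter_sync_is_safe(fetched: usize, prior_live: i64) -> bool {
    // Safe when the fetch produced chapters, or when there was nothing
    // stored beforehand that an empty result could soft-drop.
    fetched > 0 || prior_live == 0
}
```

The only unsafe combination is an empty fetch against a manga that previously had chapters, which is the case the warning log in `resync_manga` reports.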

@@ -0,0 +1,180 @@
//! Runtime-updatable crawler session (PHPSESSID).
//!
//! At startup the session comes from `CRAWLER_PHPSESSID`, but it expires
//! and previously needed a container restart to refresh. This controller
//! lets an admin push a fresh cookie at runtime: it rewrites the reqwest
//! cookie jar (CDN image fetches), updates the in-memory value the browser
//! `on_launch` hook reads, persists it to `crawler_state` (so it survives
//! a restart), and clears the sticky `session_expired` flag. A subsequent
//! coordinated browser restart re-runs `on_launch`, re-injecting the new
//! cookie into Chromium and re-probing.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use anyhow::Context;
use serde_json::json;
use sqlx::PgPool;
use tokio::sync::RwLock;
const STATE_KEY_RUNTIME_SESSION: &str = "runtime_session";
pub struct SessionController {
/// Current PHPSESSID — what `on_launch` injects into a fresh browser.
phpsessid: RwLock<Option<String>>,
/// The same `Arc<Jar>` handed to the reqwest client; updating it here
/// updates the client's cookies (the jar is internally mutable).
cookie_jar: Arc<reqwest::cookie::Jar>,
cookie_domain: Option<String>,
start_url: Option<String>,
db: PgPool,
session_expired: Arc<AtomicBool>,
}
impl SessionController {
pub fn new(
initial: Option<String>,
cookie_jar: Arc<reqwest::cookie::Jar>,
cookie_domain: Option<String>,
start_url: Option<String>,
db: PgPool,
session_expired: Arc<AtomicBool>,
) -> Arc<Self> {
Arc::new(Self {
phpsessid: RwLock::new(initial),
cookie_jar,
cookie_domain,
start_url,
db,
session_expired,
})
}
/// The PHPSESSID a fresh browser should inject (None when unset).
pub async fn current(&self) -> Option<String> {
self.phpsessid.read().await.clone()
}
/// Whether the sticky session-expired flag is set (chapter workers
/// idle while true).
pub fn is_expired(&self) -> bool {
self.session_expired.load(Ordering::Acquire)
}
/// Clear the session-expired flag without changing the cookie — used
/// when the operator knows the session is fine and wants workers to
/// resume immediately.
pub fn clear_expired(&self) {
self.session_expired.store(false, Ordering::Release);
}
/// Update the session everywhere: reqwest jar, in-memory value, and
/// persisted `crawler_state`. Clears the session-expired flag. Does
/// NOT relaunch the browser — the caller triggers a coordinated
/// restart so `on_launch` re-injects + re-probes.
pub async fn update(&self, sid: &str) -> anyhow::Result<()> {
let sid = sid.trim().to_string();
anyhow::ensure!(!sid.is_empty(), "PHPSESSID must not be empty");
// The value is spliced into a cookie string and a CDP CookieParam.
// Reject control chars and cookie delimiters so a pasted value
// can't smuggle extra attributes / break out of the cookie.
anyhow::ensure!(
sid.chars().all(|c| !c.is_control() && c != ';' && c != ','),
"PHPSESSID contains invalid characters"
);
if let (Some(domain), Some(start_url)) = (&self.cookie_domain, &self.start_url) {
let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/");
let seed_url =
reqwest::Url::parse(start_url).context("parse start_url for cookie seed")?;
self.cookie_jar.add_cookie_str(&cookie_str, &seed_url);
}
*self.phpsessid.write().await = Some(sid.clone());
persist(&self.db, &sid).await.context("persist runtime session")?;
self.session_expired.store(false, Ordering::Release);
tracing::info!("crawler session updated at runtime");
Ok(())
}
/// Read a persisted runtime session (if any) from `crawler_state`.
/// Called at startup so a mid-day refresh survives a restart.
pub async fn load_persisted(db: &PgPool) -> Option<String> {
let row: Option<serde_json::Value> =
sqlx::query_scalar("SELECT value FROM crawler_state WHERE key = $1")
.bind(STATE_KEY_RUNTIME_SESSION)
.fetch_optional(db)
.await
.ok()
.flatten();
row.and_then(|v| {
v.get("phpsessid")
.and_then(|s| s.as_str())
.map(|s| s.to_string())
})
}
}
async fn persist(db: &PgPool, sid: &str) -> sqlx::Result<()> {
sqlx::query(
"INSERT INTO crawler_state (key, value, updated_at) \
VALUES ($1, $2, now()) \
ON CONFLICT (key) DO UPDATE \
SET value = EXCLUDED.value, updated_at = now()",
)
.bind(STATE_KEY_RUNTIME_SESSION)
.bind(json!({ "phpsessid": sid }))
.execute(db)
.await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn controller(db: PgPool) -> Arc<SessionController> {
SessionController::new(
None,
Arc::new(reqwest::cookie::Jar::default()),
Some("example.com".into()),
Some("https://example.com/".into()),
db,
Arc::new(AtomicBool::new(true)),
)
}
#[sqlx::test(migrations = "./migrations")]
async fn update_rejects_empty_and_control_chars(pool: PgPool) {
let c = controller(pool);
assert!(c.update(" ").await.is_err(), "empty rejected");
assert!(c.update("abc\r\ndef").await.is_err(), "CRLF rejected");
assert!(c.update("ab;Domain=evil").await.is_err(), "semicolon rejected");
assert!(c.update("x,y").await.is_err(), "comma rejected");
}
#[sqlx::test(migrations = "./migrations")]
async fn update_persists_and_clears_expired_then_round_trips(pool: PgPool) {
let c = controller(pool.clone());
c.update("good-sid-123").await.unwrap();
assert_eq!(c.current().await.as_deref(), Some("good-sid-123"));
assert!(!c.is_expired(), "update clears the expired flag");
// Persisted to crawler_state and readable by a fresh load.
assert_eq!(
SessionController::load_persisted(&pool).await.as_deref(),
Some("good-sid-123")
);
}
#[sqlx::test(migrations = "./migrations")]
async fn clear_expired_flips_sticky_flag_without_touching_session(pool: PgPool) {
// The flag starts `true` per `controller(pool)`'s test wiring.
let c = controller(pool);
assert!(c.is_expired(), "test fixture starts with the flag set");
c.clear_expired();
assert!(!c.is_expired(), "clear_expired flips the sticky flag to false");
assert!(
c.current().await.is_none(),
"clear_expired does not invent a session"
);
}
}
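Reviewer sketch, not part of the commit: the validation at the top of `SessionController::update` can be read as a pure function, which makes the rejected inputs from the tests easy to check without a database. `validate_session_id` and `cookie_string` are hypothetical names, and the `&'static str` error type replaces `anyhow` only to keep the example dependency-free.

```rust
/// Hypothetical standalone version of the checks in `SessionController::update`.
/// Returns the trimmed value, or a message describing why it was rejected.
fn validate_session_id(raw: &str) -> Result<String, &'static str> {
    let sid = raw.trim();
    if sid.is_empty() {
        return Err("PHPSESSID must not be empty");
    }
    // Control characters (CR and LF included) and the delimiters `;` and `,`
    // could end the value early or append attributes such as `Domain=`.
    if sid.chars().any(|c| c.is_control() || c == ';' || c == ',') {
        return Err("PHPSESSID contains invalid characters");
    }
    Ok(sid.to_string())
}

/// Builds the cookie string in the same shape `update` hands to the jar.
fn cookie_string(sid: &str, domain: &str) -> String {
    format!("PHPSESSID={sid}; Domain={domain}; Path=/")
}
```

Because the value is interpolated directly into the cookie string, the check has to run before `cookie_string` is called; validating afterwards would be too late to stop an injected attribute.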

@@ -0,0 +1,355 @@
//! Live, in-process crawler status.
//!
//! The metadata pass runs inline in the cron tick (it is not a
//! `crawler_jobs` row), so without this surface "what is the crawler doing
//! right now" is unanswerable from the dashboard. The daemon publishes its
//! current [`Phase`], the chapters being crawled right now (with a live
//! page count), and the cover being fetched into a shared [`StatusHandle`];
//! the admin endpoint reads a [`CrawlerStatus`] snapshot and composes it
//! with DB-derived counts + the session/browser flags.
//!
//! NOTE: this is per-process state. The deployment is a single server
//! (see CLAUDE.md), so an in-memory handle is sufficient; durable signals
//! (last-pass summary, runtime session) are persisted in `crawler_state`.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use chrono::{DateTime, Utc};
use serde::Serialize;
use tokio::sync::{watch, RwLock};
use uuid::Uuid;
use crate::crawler::pipeline::MetadataStats;
/// What the daemon's metadata pass is doing right now. Serialised with an
/// internal `state` tag so the frontend can switch on it.
#[derive(Clone, Debug, Serialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum Phase {
/// Sleeping until the next scheduled metadata pass.
Idle { next_fire: Option<DateTime<Utc>> },
/// Walking the source catalog list pages.
WalkingList,
/// Fetching one manga's metadata. `index`/`total` drive a progress bar
/// (`total` is `None` when the source size is unknown / uncapped).
FetchingMetadata {
index: usize,
total: Option<usize>,
title: String,
},
/// Backfilling covers that failed on first attempt. `index`/`total`
/// track progress through this tick's batch.
CoverBackfill { index: usize, total: usize },
}
/// A chapter being downloaded right now, with a live page count. Keyed in
/// the status by `chapter_id`; inserted by the dispatcher when a job starts
/// and removed (via an RAII guard) when it finishes, panics, or times out.
#[derive(Clone, Debug, Serialize)]
pub struct ActiveChapter {
pub manga_id: Uuid,
pub manga_title: String,
pub chapter_id: Uuid,
pub chapter_number: i32,
pub pages_done: usize,
/// `None` until the chapter page list has been parsed.
pub pages_total: Option<usize>,
}
/// The manga whose cover is being downloaded right now.
#[derive(Clone, Debug, Serialize)]
pub struct CoverTarget {
pub manga_id: Uuid,
pub manga_title: String,
}
/// Summary of the most recent metadata pass (persisted across restarts in
/// `crawler_state` by the cron; mirrored here for the live read).
#[derive(Clone, Debug, Serialize, Default)]
pub struct LastPass {
pub at: Option<DateTime<Utc>>,
pub discovered: usize,
pub upserted: usize,
pub covers_fetched: usize,
pub mangas_failed: usize,
}
/// A point-in-time snapshot returned by [`StatusHandle::snapshot`]. The
/// session/browser/queue fields are composed at read time by the endpoint
/// (they live elsewhere), so they are not stored here.
#[derive(Clone, Debug, Serialize)]
pub struct CrawlerStatus {
pub phase: Phase,
/// Number of configured chapter workers (for "N busy / M workers").
pub worker_count: usize,
/// Chapters being downloaded right now, with live page counts.
pub active_chapters: Vec<ActiveChapter>,
pub last_pass: LastPass,
/// The cover being downloaded right now, if any.
pub current_cover: Option<CoverTarget>,
}
/// Scalar status state held under the async `RwLock`. Active chapters live
/// in a separate sync map so per-page updates and RAII removal don't need
/// to `.await` (removal happens in `Drop`).
#[derive(Clone, Debug)]
struct Scalar {
phase: Phase,
worker_count: usize,
last_pass: LastPass,
current_cover: Option<CoverTarget>,
}
/// Cloneable handle the daemon tasks use to publish status. Cheap to clone
/// (`Arc`). All writers funnel through the helper methods so locking stays
/// localised. Every mutation bumps a `watch` version so SSE subscribers
/// get pushed an update instead of polling.
#[derive(Clone)]
pub struct StatusHandle {
scalar: Arc<RwLock<Scalar>>,
/// Currently-downloading chapters keyed by `chapter_id`. A sync mutex so
/// the RAII [`ChapterGuard`]'s `Drop` can remove without `.await`.
active: Arc<Mutex<HashMap<Uuid, ActiveChapter>>>,
/// Monotonic version bumped on every change. SSE handlers `subscribe()`
/// and `await .changed()` for instant pushes. A `watch` receiver tracks
/// the last version it saw, so a change that lands between snapshots is
/// never missed (rapid changes may coalesce into one wake-up).
version: Arc<watch::Sender<u64>>,
}
/// Lock the active map, recovering from a poisoned mutex (we never hold the
/// lock across a panic-prone section, so the data is still consistent).
fn lock_active(
m: &Mutex<HashMap<Uuid, ActiveChapter>>,
) -> std::sync::MutexGuard<'_, HashMap<Uuid, ActiveChapter>> {
m.lock().unwrap_or_else(|e| e.into_inner())
}
impl StatusHandle {
pub fn new(num_workers: usize) -> Self {
let (version, _rx) = watch::channel(0u64);
Self {
scalar: Arc::new(RwLock::new(Scalar {
phase: Phase::Idle { next_fire: None },
worker_count: num_workers.max(1),
last_pass: LastPass::default(),
current_cover: None,
})),
active: Arc::new(Mutex::new(HashMap::new())),
version: Arc::new(version),
}
}
fn bump(&self) {
self.version.send_modify(|v| *v = v.wrapping_add(1));
}
/// A receiver whose `.changed()` resolves on the next status change.
pub fn subscribe(&self) -> watch::Receiver<u64> {
self.version.subscribe()
}
/// Signal a change without mutating in-memory state — used when an
/// *external* signal the live snapshot reflects (browser phase,
/// session-expired flag, queue counts) has changed, so subscribers
/// recompose promptly.
pub fn poke(&self) {
self.bump();
}
pub async fn set_phase(&self, phase: Phase) {
self.scalar.write().await.phase = phase;
self.bump();
}
/// Set (or clear) the cover being downloaded right now.
pub async fn set_current_cover(&self, cover: Option<CoverTarget>) {
self.scalar.write().await.current_cover = cover;
self.bump();
}
/// Register a chapter as crawling now; returns a guard that removes it
/// when dropped (on completion, panic-unwind, or timeout-drop).
pub fn begin_chapter(&self, chapter: ActiveChapter) -> ChapterGuard {
let id = chapter.chapter_id;
lock_active(&self.active).insert(id, chapter);
self.bump();
ChapterGuard {
active: Arc::clone(&self.active),
version: Arc::clone(&self.version),
chapter_id: id,
}
}
/// Update the live page count of an in-flight chapter. Sync (no
/// `.await`) so it's cheap to call once per stored page.
pub fn set_chapter_pages(&self, chapter_id: Uuid, done: usize, total: Option<usize>) {
{
let mut map = lock_active(&self.active);
if let Some(c) = map.get_mut(&chapter_id) {
c.pages_done = done;
c.pages_total = total;
}
}
self.bump();
}
/// Record a finished metadata pass, stamped with the caller-supplied `at`.
pub async fn record_pass(&self, stats: &MetadataStats, at: DateTime<Utc>) {
self.scalar.write().await.last_pass = LastPass {
at: Some(at),
discovered: stats.discovered,
upserted: stats.upserted,
covers_fetched: stats.covers_fetched,
mangas_failed: stats.mangas_failed,
};
self.bump();
}
/// Seed the last-pass summary from a persisted `crawler_state` value on
/// startup so the dashboard isn't blank until the first tick.
pub async fn set_last_pass(&self, last: LastPass) {
self.scalar.write().await.last_pass = last;
self.bump();
}
pub async fn snapshot(&self) -> CrawlerStatus {
let scalar = self.scalar.read().await.clone();
let mut active_chapters: Vec<ActiveChapter> =
lock_active(&self.active).values().cloned().collect();
// Stable, readable order: by chapter number then id.
active_chapters.sort_by(|a, b| {
a.chapter_number
.cmp(&b.chapter_number)
.then(a.chapter_id.cmp(&b.chapter_id))
});
CrawlerStatus {
phase: scalar.phase,
worker_count: scalar.worker_count,
active_chapters,
last_pass: scalar.last_pass,
current_cover: scalar.current_cover,
}
}
}
/// RAII handle removing an [`ActiveChapter`] from the live status when the
/// chapter dispatch finishes, panics, or is dropped on timeout.
pub struct ChapterGuard {
active: Arc<Mutex<HashMap<Uuid, ActiveChapter>>>,
version: Arc<watch::Sender<u64>>,
chapter_id: Uuid,
}
impl Drop for ChapterGuard {
fn drop(&mut self) {
lock_active(&self.active).remove(&self.chapter_id);
self.version.send_modify(|v| *v = v.wrapping_add(1));
}
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_chapter(n: i32) -> ActiveChapter {
ActiveChapter {
manga_id: Uuid::new_v4(),
manga_title: "M".into(),
chapter_id: Uuid::new_v4(),
chapter_number: n,
pages_done: 0,
pages_total: None,
}
}
#[tokio::test]
async fn begin_chapter_shows_in_snapshot_and_guard_removes_on_drop() {
let h = StatusHandle::new(2);
let chap = sample_chapter(7);
let cid = chap.chapter_id;
{
let _guard = h.begin_chapter(chap);
let snap = h.snapshot().await;
assert_eq!(snap.active_chapters.len(), 1);
assert_eq!(snap.active_chapters[0].chapter_id, cid);
assert_eq!(snap.worker_count, 2);
}
// Guard dropped → entry removed.
let snap = h.snapshot().await;
assert!(snap.active_chapters.is_empty());
}
#[tokio::test]
async fn set_chapter_pages_updates_live_count() {
let h = StatusHandle::new(1);
let chap = sample_chapter(1);
let cid = chap.chapter_id;
let _guard = h.begin_chapter(chap);
h.set_chapter_pages(cid, 3, Some(20));
let snap = h.snapshot().await;
assert_eq!(snap.active_chapters[0].pages_done, 3);
assert_eq!(snap.active_chapters[0].pages_total, Some(20));
// Updating an unknown chapter is a no-op, not a panic.
h.set_chapter_pages(Uuid::new_v4(), 9, Some(9));
}
#[tokio::test]
async fn snapshot_sorts_active_chapters_by_number() {
let h = StatusHandle::new(2);
let _g1 = h.begin_chapter(sample_chapter(5));
let _g2 = h.begin_chapter(sample_chapter(2));
let snap = h.snapshot().await;
assert_eq!(snap.active_chapters[0].chapter_number, 2);
assert_eq!(snap.active_chapters[1].chapter_number, 5);
}
#[tokio::test]
async fn set_current_cover_round_trips() {
let h = StatusHandle::new(1);
let mid = Uuid::new_v4();
h.set_current_cover(Some(CoverTarget {
manga_id: mid,
manga_title: "One Piece".into(),
}))
.await;
assert_eq!(
h.snapshot().await.current_cover.map(|c| c.manga_id),
Some(mid)
);
h.set_current_cover(None).await;
assert!(h.snapshot().await.current_cover.is_none());
}
#[tokio::test]
async fn record_pass_captures_stats_and_timestamp() {
let h = StatusHandle::new(1);
let stats = MetadataStats {
discovered: 5,
upserted: 3,
covers_fetched: 2,
mangas_failed: 1,
};
let at = Utc::now();
h.record_pass(&stats, at).await;
let snap = h.snapshot().await;
assert_eq!(snap.last_pass.discovered, 5);
assert_eq!(snap.last_pass.upserted, 3);
assert_eq!(snap.last_pass.at, Some(at));
}
#[tokio::test]
async fn subscribe_resolves_on_mutation_poke_and_chapter_change() {
let h = StatusHandle::new(1);
let mut rx = h.subscribe();
h.set_phase(Phase::WalkingList).await;
rx.changed().await.unwrap();
h.poke();
rx.changed().await.unwrap();
// begin_chapter + guard drop each bump the version.
let g = h.begin_chapter(sample_chapter(1));
rx.changed().await.unwrap();
drop(g);
rx.changed().await.unwrap();
}
}
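Reviewer sketch, not part of the commit: the `ChapterGuard` pattern works because removal happens in `Drop`, which cannot `.await`, so the map sits behind a synchronous mutex. The std-only example below shows the same shape with a plain integer key. `Registry` and `EntryGuard` are hypothetical names, and the `watch` version bump from the real code is left out.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, MutexGuard};

/// Shared registry of in-flight work, keyed by a plain integer id.
#[derive(Clone, Default)]
struct Registry {
    active: Arc<Mutex<HashMap<u32, String>>>,
}

impl Registry {
    /// Insert an entry and return a guard that removes it when dropped.
    fn begin(&self, id: u32, label: String) -> EntryGuard {
        self.lock().insert(id, label);
        EntryGuard {
            active: Arc::clone(&self.active),
            id,
        }
    }

    /// Number of entries currently registered.
    fn len(&self) -> usize {
        self.lock().len()
    }

    fn lock(&self) -> MutexGuard<'_, HashMap<u32, String>> {
        // Recover the map even if another thread panicked while holding the lock.
        self.active.lock().unwrap_or_else(|e| e.into_inner())
    }
}

/// Removes its entry from the registry when it goes out of scope.
struct EntryGuard {
    active: Arc<Mutex<HashMap<u32, String>>>,
    id: u32,
}

impl Drop for EntryGuard {
    fn drop(&mut self) {
        // `Drop` is synchronous, which is why the map uses a sync mutex.
        self.active
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&self.id);
    }
}
```

Binding the result of `begin` to a named variable such as `_guard` keeps the entry alive until the end of the scope; binding it to a bare `_` would drop the guard immediately and remove the entry at once.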

@@ -21,6 +21,11 @@ pub enum AppError {
PayloadTooLarge(String),
#[error("unsupported media type: {0}")]
UnsupportedMediaType(String),
/// 503: a feature is currently unavailable, as opposed to a 500
/// internal error. Used when admin actions require the crawler
/// daemon but it's been disabled (`CRAWLER_DAEMON=false`).
#[error("service unavailable: {0}")]
ServiceUnavailable(String),
/// 429 with an optional `Retry-After` header value (in seconds).
#[error("too many requests")]
TooManyRequests {
@@ -56,6 +61,7 @@ impl AppError {
AppError::Conflict(_) => "conflict",
AppError::PayloadTooLarge(_) => "payload_too_large",
AppError::UnsupportedMediaType(_) => "unsupported_media_type",
AppError::ServiceUnavailable(_) => "service_unavailable",
AppError::TooManyRequests { .. } => "too_many_requests",
AppError::ValidationFailed { .. } => "validation_failed",
AppError::Database(sqlx::Error::RowNotFound) => "not_found",
@@ -85,6 +91,9 @@ impl IntoResponse for AppError {
AppError::UnsupportedMediaType(msg) => {
(StatusCode::UNSUPPORTED_MEDIA_TYPE, msg.clone(), None)
}
AppError::ServiceUnavailable(msg) => {
(StatusCode::SERVICE_UNAVAILABLE, msg.clone(), None)
}
AppError::TooManyRequests { retry_after_secs } => {
// Emit `Retry-After: N` (RFC 6585 §4) so a well-behaved
// client can back off correctly. Done by building the

@@ -12,15 +12,20 @@ pub async fn list_for_manga(
limit: i64,
offset: i64,
) -> AppResult<Vec<Chapter>> {
// Secondary sort by created_at gives duplicate-numbered chapters
// (multiple uploaders/translations of the same number) a stable
// order in lists and prev/next reader navigation.
// Display order = source-site order reversed. The crawler stamps
// `source_index` = position in the source DOM (0 = first = newest
// on this site, see migration 0021), so DESC puts the oldest
// chapter first and keeps the site's variant grouping and the
// placement of non-numeric entries (e.g. "notice. : Officials")
// intact. NULLS LAST keeps user-uploaded chapters (no source row)
// and rows that pre-date the migration below crawled rows; the
// (number, created_at) tail then orders them deterministically.
let rows = sqlx::query_as::<_, Chapter>(
r#"
SELECT id, manga_id, number, title, page_count, created_at
FROM chapters
WHERE manga_id = $1
ORDER BY number ASC, created_at ASC
ORDER BY source_index DESC NULLS LAST, number ASC, created_at ASC
LIMIT $2 OFFSET $3
"#,
)
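Reviewer sketch, not part of the commit: the new `ORDER BY source_index DESC NULLS LAST, number ASC, created_at ASC` clause can be mirrored as an in-memory comparator, which makes the placement of rows without a `source_index` explicit. `ChapterRow` and `display_order` are hypothetical, and `created_at` is simplified to an integer timestamp.

```rust
use std::cmp::Ordering;

/// Hypothetical, trimmed-down chapter row holding only the sort keys.
#[derive(Debug, Clone, PartialEq)]
struct ChapterRow {
    source_index: Option<i32>,
    number: i32,
    created_at: i64, // Unix seconds stand in for the timestamp column
}

/// Comparator equivalent to
/// `ORDER BY source_index DESC NULLS LAST, number ASC, created_at ASC`.
fn display_order(a: &ChapterRow, b: &ChapterRow) -> Ordering {
    let by_source = match (a.source_index, b.source_index) {
        (Some(x), Some(y)) => y.cmp(&x),      // DESC: larger index first
        (Some(_), None) => Ordering::Less,    // NULLS LAST
        (None, Some(_)) => Ordering::Greater, // NULLS LAST
        (None, None) => Ordering::Equal,
    };
    by_source
        .then(a.number.cmp(&b.number))
        .then(a.created_at.cmp(&b.created_at))
}
```

Passing it to `rows.sort_by(display_order)` puts crawled chapters first, oldest to newest in source terms, followed by rows with no `source_index` ordered by number and then creation time.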
@@ -133,14 +138,18 @@ pub async fn page_count(pool: &PgPool, id: Uuid) -> sqlx::Result<Option<i32>> {
/// filter — this resolver stays in lockstep so a chapter that was
/// dropped between enqueue and lease isn't dispatched against a stale
/// URL.
/// Returns `(manga_id, source_url, manga_title, chapter_number)`. The
/// title + number feed the live "currently crawling" status; the rest is
/// what the dispatcher needs to do the work.
pub async fn dispatch_target(
pool: &PgPool,
chapter_id: Uuid,
) -> sqlx::Result<Option<(Uuid, String)>> {
) -> sqlx::Result<Option<(Uuid, String, String, i32)>> {
sqlx::query_as(
"SELECT c.manga_id, cs.source_url \
"SELECT c.manga_id, cs.source_url, m.title, c.number \
FROM chapters c \
JOIN chapter_sources cs ON cs.chapter_id = c.id \
JOIN mangas m ON m.id = c.manga_id \
WHERE c.id = $1 \
AND cs.dropped_at IS NULL \
ORDER BY cs.last_seen_at DESC \

@@ -17,8 +17,9 @@
//! Each public function is a transaction boundary so a partial failure
//! mid-call leaves the DB in its pre-call state.
use chrono::Utc;
use sqlx::{PgPool, Postgres, Transaction};
use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::{FromRow, PgPool, Postgres, Transaction};
use uuid::Uuid;
use crate::crawler::source::{SourceChapterRef, SourceManga};
@@ -352,7 +353,14 @@ pub async fn sync_manga_chapters(
.map(|c| c.source_chapter_key.clone())
.collect();
for c in chapters {
for (idx, c) in chapters.iter().enumerate() {
// `source_index` captures the chapter's position in the source
// DOM (0 = first = newest on this site) so the list query can
// reverse it for the user-facing list — see migration 0021.
// Every sync overwrites the value on both branches, so a new
// chapter inserted at the top of the source shifts every other
// row down by one on the next tick.
let source_index = idx as i32;
// Lookup is constrained by manga_id (via the chapters join) so a
// source whose chapter slugs collide across mangas (e.g.
// "chapter-1" appearing under two different mangas) attributes
@@ -382,14 +390,15 @@ pub async fn sync_manga_chapters(
// identity is the UUID, not the number.
let (chapter_id,): (Uuid,) = sqlx::query_as(
r#"
INSERT INTO chapters (manga_id, number, title, page_count)
VALUES ($1, $2, $3, 0)
INSERT INTO chapters (manga_id, number, title, page_count, source_index)
VALUES ($1, $2, $3, 0, $4)
RETURNING id
"#,
)
.bind(manga_id)
.bind(c.number)
.bind(c.title.as_deref())
.bind(source_index)
.fetch_one(&mut *tx)
.await?;
sqlx::query(
@@ -408,8 +417,11 @@ pub async fn sync_manga_chapters(
diff.new += 1;
}
Some((chapter_id,)) => {
sqlx::query("UPDATE chapters SET title = $1 WHERE id = $2")
sqlx::query(
"UPDATE chapters SET title = $1, source_index = $2 WHERE id = $3",
)
.bind(c.title.as_deref())
.bind(source_index)
.bind(chapter_id)
.execute(&mut *tx)
.await?;
@@ -542,6 +554,51 @@ pub async fn mark_run_completed(pool: &PgPool, source_id: &str) -> sqlx::Result<
Ok(())
}
/// List mangas whose `cover_image_path IS NULL` but a live
/// `manga_sources` row still attaches them to a source. The bounded
/// result feeds the cover-backfill pass in [`crate::crawler::pipeline`]:
/// each entry is one (manga, freshest source row) pair where a cover
/// re-download is in order.
///
/// Per-manga deduplication uses `DISTINCT ON (m.id)` keyed on the row
/// with the newest `last_seen_at`, so a manga that's surfaced by
/// multiple sources only produces one row (the freshest). Rows come back
/// ordered by `m.id`, which keeps the output deterministic for tests.
pub async fn list_missing_covers(
pool: &PgPool,
max: i64,
) -> sqlx::Result<Vec<MissingCoverEntry>> {
let rows: Vec<(Uuid, String, String)> = sqlx::query_as(
r#"
SELECT DISTINCT ON (m.id) m.id, ms.source_manga_key, ms.source_url
FROM mangas m
JOIN manga_sources ms ON ms.manga_id = m.id
WHERE m.cover_image_path IS NULL
AND ms.dropped_at IS NULL
ORDER BY m.id, ms.last_seen_at DESC
LIMIT $1
"#,
)
.bind(max)
.fetch_all(pool)
.await?;
Ok(rows
.into_iter()
.map(|(manga_id, source_manga_key, source_url)| MissingCoverEntry {
manga_id,
source_manga_key,
source_url,
})
.collect())
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MissingCoverEntry {
pub manga_id: Uuid,
pub source_manga_key: String,
pub source_url: String,
}
/// Read the recovery flag for `source_id`. A missing row OR an
/// unparseable value reads as `true` ("clean") — the former covers the
/// first-ever run on a virgin DB (no recovery needed), the latter
@@ -562,3 +619,327 @@ pub async fn last_run_completed_cleanly(
.unwrap_or(true))
}
// ---------------------------------------------------------------------------
// Dead-letter jobs: admin observability + requeue.
// ---------------------------------------------------------------------------
/// A `dead` crawler job joined to its chapter/manga context for the admin
/// dead-letter view. Chapter columns are `Option` because the join is
/// best-effort (the chapter may have been removed since the job died, or
/// the job may be a non-chapter kind).
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct DeadJob {
pub id: Uuid,
pub kind: String,
pub chapter_id: Option<Uuid>,
pub manga_id: Option<Uuid>,
pub manga_title: Option<String>,
pub chapter_number: Option<i32>,
pub attempts: i32,
pub max_attempts: i32,
pub last_error: Option<String>,
pub updated_at: DateTime<Utc>,
}
/// Paginated list of `dead` jobs, newest-failed first, joined to chapter +
/// manga context. `search` filters on manga title (case-insensitive
/// substring). Returns the page slice plus the total number of matching
/// rows, ignoring `limit`/`offset`.
pub async fn list_dead_jobs(
pool: &PgPool,
search: Option<&str>,
limit: i64,
offset: i64,
) -> sqlx::Result<(Vec<DeadJob>, i64)> {
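// Wrap the trimmed term in `%` wildcards for ILIKE. A bare "%%" (length 2)
// means the term was empty or whitespace, so it is dropped and no title
// filter applies.
let search_pat = search
.map(|s| format!("%{}%", s.trim()))
.filter(|p| p.len() > 2);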
let items: Vec<DeadJob> = sqlx::query_as(
r#"
SELECT
cj.id,
cj.payload->>'kind' AS kind,
(cj.payload->>'chapter_id')::uuid AS chapter_id,
c.manga_id AS manga_id,
m.title AS manga_title,
c.number AS chapter_number,
cj.attempts,
cj.max_attempts,
cj.last_error,
cj.updated_at
FROM crawler_jobs cj
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
LEFT JOIN mangas m ON m.id = c.manga_id
WHERE cj.state = 'dead'
AND ($1::text IS NULL OR m.title ILIKE $1)
ORDER BY cj.updated_at DESC
LIMIT $2 OFFSET $3
"#,
)
.bind(&search_pat)
.bind(limit)
.bind(offset)
.fetch_all(pool)
.await?;
let total: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(*)
FROM crawler_jobs cj
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
LEFT JOIN mangas m ON m.id = c.manga_id
WHERE cj.state = 'dead'
AND ($1::text IS NULL OR m.title ILIKE $1)
"#,
)
.bind(&search_pat)
.fetch_one(pool)
.await?;
Ok((items, total))
}
/// An in-flight chapter-content job (`pending` or `running`) joined to its
/// chapter + manga, for the "queued chapters" admin view.
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct ActiveJob {
pub id: Uuid,
pub chapter_id: Option<Uuid>,
pub manga_id: Option<Uuid>,
pub manga_title: Option<String>,
pub chapter_number: Option<i32>,
/// `"pending"` or `"running"`.
pub state: String,
pub attempts: i32,
pub max_attempts: i32,
pub updated_at: DateTime<Utc>,
}
/// Paginated list of `pending`/`running` chapter-content jobs (which
/// chapters of which mangas are queued or being crawled). Running first,
/// then by scheduled order. `search` filters on manga title.
pub async fn list_active_jobs(
pool: &PgPool,
search: Option<&str>,
limit: i64,
offset: i64,
) -> sqlx::Result<(Vec<ActiveJob>, i64)> {
let search_pat = search
.map(|s| format!("%{}%", s.trim()))
.filter(|p| p.len() > 2);
let items: Vec<ActiveJob> = sqlx::query_as(
r#"
SELECT
cj.id,
(cj.payload->>'chapter_id')::uuid AS chapter_id,
c.manga_id AS manga_id,
m.title AS manga_title,
c.number AS chapter_number,
cj.state,
cj.attempts,
cj.max_attempts,
cj.updated_at
FROM crawler_jobs cj
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
LEFT JOIN mangas m ON m.id = c.manga_id
WHERE cj.state IN ('pending','running')
AND cj.payload->>'kind' = 'sync_chapter_content'
AND ($1::text IS NULL OR m.title ILIKE $1)
ORDER BY (cj.state = 'running') DESC, cj.scheduled_at, cj.created_at
LIMIT $2 OFFSET $3
"#,
)
.bind(&search_pat)
.bind(limit)
.bind(offset)
.fetch_all(pool)
.await?;
let total: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(*)
FROM crawler_jobs cj
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
LEFT JOIN mangas m ON m.id = c.manga_id
WHERE cj.state IN ('pending','running')
AND cj.payload->>'kind' = 'sync_chapter_content'
AND ($1::text IS NULL OR m.title ILIKE $1)
"#,
)
.bind(&search_pat)
.fetch_one(pool)
.await?;
Ok((items, total))
}
/// A manga whose cover is still missing (queued for cover fetch).
#[derive(Debug, Clone, Serialize, FromRow)]
pub struct MissingCoverRow {
pub manga_id: Uuid,
pub manga_title: String,
}
/// Count mangas with no cover yet but a live source row — the cover
/// backlog the metadata pass + backfill drain.
pub async fn count_missing_covers(pool: &PgPool) -> sqlx::Result<i64> {
sqlx::query_scalar(
r#"
SELECT COUNT(*) FROM mangas m
WHERE m.cover_image_path IS NULL
AND EXISTS (
SELECT 1 FROM manga_sources ms
WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL
)
"#,
)
.fetch_one(pool)
.await
}
/// Paginated list of mangas queued for a cover fetch (no cover yet + a live
/// source), with titles. `search` filters on title. Most recently updated
/// manga first (`m.updated_at DESC`).
pub async fn list_missing_cover_mangas(
pool: &PgPool,
search: Option<&str>,
limit: i64,
offset: i64,
) -> sqlx::Result<(Vec<MissingCoverRow>, i64)> {
let search_pat = search
.map(|s| format!("%{}%", s.trim()))
.filter(|p| p.len() > 2);
let items: Vec<MissingCoverRow> = sqlx::query_as(
r#"
SELECT m.id AS manga_id, m.title AS manga_title
FROM mangas m
WHERE m.cover_image_path IS NULL
AND EXISTS (
SELECT 1 FROM manga_sources ms
WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL
)
AND ($1::text IS NULL OR m.title ILIKE $1)
ORDER BY m.updated_at DESC
LIMIT $2 OFFSET $3
"#,
)
.bind(&search_pat)
.bind(limit)
.bind(offset)
.fetch_all(pool)
.await?;
let total: i64 = sqlx::query_scalar(
r#"
SELECT COUNT(*) FROM mangas m
WHERE m.cover_image_path IS NULL
AND EXISTS (
SELECT 1 FROM manga_sources ms
WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL
)
AND ($1::text IS NULL OR m.title ILIKE $1)
"#,
)
.bind(&search_pat)
.fetch_one(pool)
.await?;
Ok((items, total))
}
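Review note on the `search_pat` expression shared by `list_dead_jobs`, `list_active_jobs` and `list_missing_cover_mangas`: the sketch below pulls that normalisation into a helper so the "blank search means no filter" rule is visible in one place. It is std-only and illustrative. `search_pattern` and `escape_like` are hypothetical names that do not appear in this diff, and the escaping step is an optional hardening that assumes PostgreSQL's default backslash escape character for `LIKE`/`ILIKE`.

```rust
/// Hypothetical helper mirroring the inline `search_pat` expression.
fn search_pattern(search: Option<&str>) -> Option<String> {
    search
        .map(|s| format!("%{}%", s.trim()))
        // "%%" has length 2: the trimmed term was empty, so apply no filter.
        .filter(|p| p.len() > 2)
}

/// Hypothetical hardening (not in the diff): escape LIKE metacharacters so a
/// term such as "50%" matches literally instead of acting as a wildcard.
fn escape_like(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for ch in raw.chars() {
        if matches!(ch, '\\' | '%' | '_') {
            out.push('\\');
        }
        out.push(ch);
    }
    out
}
```

As written in the diff, a search term containing `%` or `_` is passed through as a wildcard; whether that matters for an admin-only view is a judgment call.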
/// Scope of a dead-job requeue.
#[derive(Debug, Clone)]
pub enum RequeueScope {
/// Every dead job.
All,
/// Dead jobs whose chapter belongs to this manga.
Manga(Uuid),
/// Dead jobs for a single chapter.
Chapter(Uuid),
/// A single dead job by its id.
Job(Uuid),
}
/// Requeue dead jobs back to `pending` with a fresh attempt budget. This is
/// an explicit operator override, so it bypasses the dead-letter quarantine
/// the enqueue helpers honour (we act directly on the row). Returns the
/// number of rows requeued.
///
/// Two invariants protect the partial unique dedup index
/// `crawler_jobs_chapter_content_dedup_idx` (one `pending|running`
/// sync_chapter_content job per chapter):
/// 1. A chapter that already has a live (`pending|running`) job is
/// skipped entirely (the `NOT EXISTS` guard in the `pick` CTE).
/// 2. When a chapter has *multiple* dead jobs, only the newest is
/// revived (`DISTINCT ON` the chapter key) — without this, flipping
/// two dead rows for the same chapter to `pending` in one statement
/// would violate the index and abort the whole requeue. Non-chapter
/// jobs fall back to their row id so each stays distinct.
pub async fn requeue_dead_jobs(pool: &PgPool, scope: RequeueScope) -> sqlx::Result<u64> {
// Scope predicate spliced into the `pick` CTE. Only compile-time
// literals are interpolated; all values are bound below.
let scope_pred: &str = match scope {
RequeueScope::All => "",
RequeueScope::Manga(_) => {
"AND (cj.payload->>'chapter_id')::uuid IN \
(SELECT id FROM chapters WHERE manga_id = $1)"
}
RequeueScope::Chapter(_) => "AND (cj.payload->>'chapter_id')::uuid = $1",
RequeueScope::Job(_) => "AND cj.id = $1",
};
let sql = format!(
r#"
WITH pick AS (
SELECT DISTINCT ON (COALESCE(cj.payload->>'chapter_id', cj.id::text)) cj.id
FROM crawler_jobs cj
WHERE cj.state = 'dead'
{scope_pred}
AND NOT EXISTS (
SELECT 1 FROM crawler_jobs live
WHERE live.payload->>'kind' = 'sync_chapter_content'
AND live.payload->>'chapter_id' = cj.payload->>'chapter_id'
AND live.state IN ('pending','running')
)
ORDER BY COALESCE(cj.payload->>'chapter_id', cj.id::text), cj.updated_at DESC
)
UPDATE crawler_jobs
SET state = 'pending', attempts = 0, leased_until = NULL,
last_error = NULL, scheduled_at = now(), updated_at = now()
FROM pick
WHERE crawler_jobs.id = pick.id
"#
);
let mut q = sqlx::query(&sql);
match scope {
RequeueScope::All => {}
RequeueScope::Manga(id) | RequeueScope::Chapter(id) | RequeueScope::Job(id) => {
q = q.bind(id);
}
}
Ok(q.execute(pool).await?.rows_affected())
}
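Review note on the two invariants documented for `requeue_dead_jobs`: the in-memory model below is a rough sketch of what the `pick` CTE selects, written against plain structs rather than SQL. The `Job` struct, the integer ids and `pick_requeue` are all hypothetical, and the model assumes every job with a chapter is a `sync_chapter_content` job.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for a `crawler_jobs` row.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Job {
    id: u32,
    /// `None` models a non-chapter job (no `chapter_id` in the payload).
    chapter: Option<u32>,
    state: &'static str,
    updated_at: u64,
}

/// Returns the ids of dead jobs that would be revived, sorted ascending.
fn pick_requeue(jobs: &[Job]) -> Vec<u32> {
    // Invariant 1: chapters that already have a live job are skipped.
    let live: HashSet<u32> = jobs
        .iter()
        .filter(|j| j.state == "pending" || j.state == "running")
        .filter_map(|j| j.chapter)
        .collect();

    // Invariant 2: keep only the newest dead job per dedup key. The key is
    // the chapter id when present, otherwise the job's own id.
    let mut newest: HashMap<(bool, u32), &Job> = HashMap::new();
    for j in jobs.iter().filter(|j| j.state == "dead") {
        if let Some(ch) = j.chapter {
            if live.contains(&ch) {
                continue;
            }
        }
        let key = match j.chapter {
            Some(ch) => (true, ch),
            None => (false, j.id),
        };
        newest
            .entry(key)
            .and_modify(|cur| {
                if j.updated_at > cur.updated_at {
                    *cur = j;
                }
            })
            .or_insert(j);
    }

    let mut ids: Vec<u32> = newest.values().map(|j| j.id).collect();
    ids.sort_unstable();
    ids
}
```

With two dead jobs for one chapter, only the later `updated_at` survives; non-chapter jobs never collapse into each other because each falls back to its own id as the key.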
/// Count crawler jobs grouped by state — drives the dashboard queue
/// gauges. Returns `(pending, running, dead)`.
pub async fn job_state_counts(pool: &PgPool) -> sqlx::Result<(i64, i64, i64)> {
let rows: Vec<(String, i64)> =
sqlx::query_as("SELECT state, COUNT(*) FROM crawler_jobs GROUP BY state")
.fetch_all(pool)
.await?;
let mut pending = 0;
let mut running = 0;
let mut dead = 0;
for (state, n) in rows {
match state.as_str() {
"pending" => pending = n,
"running" => running = n,
"dead" => dead = n,
_ => {}
}
}
Ok((pending, running, dead))
}
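Review note on `job_state_counts`: a state that has no rows is absent from the `GROUP BY` result, so its gauge stays at the initial zero, and any state outside the three tracked ones is ignored. The std-only sketch below isolates that fold; `fold_counts` is a hypothetical name and the `"done"` state in the usage is an assumed example of an untracked state.

```rust
/// Folds `(state, count)` rows into `(pending, running, dead)`.
fn fold_counts(rows: &[(&str, i64)]) -> (i64, i64, i64) {
    let (mut pending, mut running, mut dead) = (0, 0, 0);
    for &(state, n) in rows {
        match state {
            "pending" => pending = n,
            "running" => running = n,
            "dead" => dead = n,
            // Any other state is not shown on the dashboard gauges.
            _ => {}
        }
    }
    (pending, running, dead)
}
```

For example, rows of `("dead", 3)` and `("done", 40)` fold to three dead jobs with both other gauges at zero.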


@@ -0,0 +1,344 @@
//! Integration tests for the admin crawler observability/control API.
//!
//! The default test harness wires `AppState.crawler = None` (no daemon),
//! so the *control* endpoints return 503 while the endpoints that work
//! purely off the DB (status shell, dead-jobs list/requeue, active-jobs
//! and covers listings) still function.
//! This is exactly the production "daemon disabled" posture.
mod common;
use std::time::Duration;
use axum::http::StatusCode;
use axum::Router;
use http_body_util::BodyExt;
use serde_json::json;
use sqlx::PgPool;
use tower::ServiceExt;
use uuid::Uuid;
use common::{body_json, get, get_with_cookie, post_json_with_cookie, register_user, harness};
async fn seed_admin(pool: &PgPool, app: &Router) -> String {
let (username, cookie) = register_user(app).await;
let u = mangalord::repo::user::find_by_username(pool, &username)
.await
.unwrap()
.unwrap();
mangalord::repo::user::set_is_admin_unchecked(pool, u.id, true)
.await
.unwrap();
cookie
}
async fn seed_dead_job(pool: &PgPool, title: &str) -> Uuid {
let manga_id = Uuid::new_v4();
let chapter_id = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, $2)")
.bind(manga_id)
.bind(title)
.execute(pool)
.await
.unwrap();
sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)")
.bind(chapter_id)
.bind(manga_id)
.execute(pool)
.await
.unwrap();
let job_id = Uuid::new_v4();
sqlx::query(
"INSERT INTO crawler_jobs (id, payload, state, attempts, last_error) \
VALUES ($1, $2, 'dead', 5, 'boom')",
)
.bind(job_id)
.bind(json!({
"kind": "sync_chapter_content",
"source_id": "target",
"chapter_id": chapter_id,
"source_chapter_key": "k",
}))
.execute(pool)
.await
.unwrap();
job_id
}
/// Seed a chapter-content job in a given state ('pending'/'running').
async fn seed_job(pool: &PgPool, title: &str, state: &str) {
let manga_id = Uuid::new_v4();
let chapter_id = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, $2)")
.bind(manga_id)
.bind(title)
.execute(pool)
.await
.unwrap();
sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)")
.bind(chapter_id)
.bind(manga_id)
.execute(pool)
.await
.unwrap();
sqlx::query("INSERT INTO crawler_jobs (id, payload, state) VALUES ($1, $2, $3)")
.bind(Uuid::new_v4())
.bind(json!({
"kind": "sync_chapter_content",
"source_id": "target",
"chapter_id": chapter_id,
"source_chapter_key": "k",
}))
.bind(state)
.execute(pool)
.await
.unwrap();
}
/// Seed a manga with no cover + a live source row (queued for cover fetch).
async fn seed_missing_cover(pool: &PgPool, title: &str) {
let manga_id = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, $2, NULL)")
.bind(manga_id)
.bind(title)
.execute(pool)
.await
.unwrap();
sqlx::query("INSERT INTO sources (id, name, base_url) VALUES ('target','T','http://x') ON CONFLICT DO NOTHING")
.execute(pool)
.await
.unwrap();
sqlx::query(
"INSERT INTO manga_sources (source_id, source_manga_key, manga_id, source_url) \
VALUES ('target', $1, $2, 'http://x/m')",
)
.bind(format!("k-{manga_id}"))
.bind(manga_id)
.execute(pool)
.await
.unwrap();
}
#[sqlx::test(migrations = "./migrations")]
async fn active_jobs_and_covers_lists_over_http(pool: PgPool) {
seed_job(&pool, "Naruto", "pending").await;
seed_job(&pool, "Bleach", "running").await;
seed_missing_cover(&pool, "One Piece").await;
let h = harness(pool.clone());
let cookie = seed_admin(&pool, &h.app).await;
// Queued/active chapters.
let resp = h
.app
.clone()
.oneshot(get_with_cookie("/api/v1/admin/crawler/active-jobs", &cookie))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = body_json(resp).await;
assert_eq!(body["page"]["total"], 2);
// Queued covers.
let resp = h
.app
.clone()
.oneshot(get_with_cookie("/api/v1/admin/crawler/covers", &cookie))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = body_json(resp).await;
assert_eq!(body["page"]["total"], 1);
assert_eq!(body["items"][0]["manga_title"], "One Piece");
// Both are admin-gated.
let (_u, plain) = register_user(&h.app).await;
let resp = h
.app
.clone()
.oneshot(get_with_cookie("/api/v1/admin/crawler/active-jobs", &plain))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
}
#[sqlx::test(migrations = "./migrations")]
async fn get_status_requires_admin(pool: PgPool) {
let h = harness(pool);
// Unauthenticated → 401.
let resp = h.app.clone().oneshot(get("/api/v1/admin/crawler")).await.unwrap();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
// Authenticated non-admin → 403.
let (_u, cookie) = register_user(&h.app).await;
let resp = h
.app
.clone()
.oneshot(get_with_cookie("/api/v1/admin/crawler", &cookie))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
}
#[sqlx::test(migrations = "./migrations")]
async fn get_status_reports_disabled_daemon_with_queue_counts(pool: PgPool) {
seed_dead_job(&pool, "Naruto").await;
let h = harness(pool.clone());
let cookie = seed_admin(&pool, &h.app).await;
let resp = h
.app
.clone()
.oneshot(get_with_cookie("/api/v1/admin/crawler", &cookie))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = body_json(resp).await;
assert_eq!(body["daemon"], "disabled");
assert_eq!(body["queue"]["dead"], 1);
assert_eq!(body["browser"], "down");
}
#[sqlx::test(migrations = "./migrations")]
async fn control_endpoints_return_503_when_daemon_disabled(pool: PgPool) {
let h = harness(pool.clone());
let cookie = seed_admin(&pool, &h.app).await;
for uri in [
"/api/v1/admin/crawler/run",
"/api/v1/admin/crawler/browser/restart",
"/api/v1/admin/crawler/session/clear-expired",
] {
let resp = h
.app
.clone()
.oneshot(post_json_with_cookie(uri, json!({}), &cookie))
.await
.unwrap();
assert_eq!(
resp.status(),
StatusCode::SERVICE_UNAVAILABLE,
"{uri} should be 503 when daemon disabled"
);
}
}
#[sqlx::test(migrations = "./migrations")]
async fn status_stream_requires_admin(pool: PgPool) {
let h = harness(pool);
// Unauthenticated → 401.
let resp = h
.app
.clone()
.oneshot(get("/api/v1/admin/crawler/stream"))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
// Non-admin → 403.
let (_u, cookie) = register_user(&h.app).await;
let resp = h
.app
.clone()
.oneshot(get_with_cookie("/api/v1/admin/crawler/stream", &cookie))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
}
#[sqlx::test(migrations = "./migrations")]
async fn status_stream_emits_initial_event(pool: PgPool) {
let h = harness(pool.clone());
let cookie = seed_admin(&pool, &h.app).await;
let resp = h
.app
.clone()
.oneshot(get_with_cookie("/api/v1/admin/crawler/stream", &cookie))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let ct = resp
.headers()
.get(axum::http::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or_default()
.to_string();
assert!(ct.starts_with("text/event-stream"), "content-type was {ct:?}");
// Accumulate frames (the immediate snapshot may arrive split across
// frames) until the status payload appears, with an overall timeout so
// the never-ending stream can't hang the test.
let mut body = resp.into_body();
let mut acc = String::new();
let deadline = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let Some(frame) = body.frame().await else { break };
if let Ok(data) = frame.expect("frame ok").into_data() {
acc.push_str(&String::from_utf8_lossy(&data));
if acc.contains("\"daemon\"") {
break;
}
}
}
})
.await;
assert!(deadline.is_ok(), "did not receive status within 5s; got: {acc:?}");
assert!(acc.contains("\"daemon\""), "missing status payload: {acc}");
assert!(acc.contains("status"), "missing SSE event name: {acc}");
}
#[sqlx::test(migrations = "./migrations")]
async fn mutating_endpoints_reject_non_admin(pool: PgPool) {
let h = harness(pool);
// A logged-in non-admin must be forbidden from a mutating endpoint.
let (_u, cookie) = register_user(&h.app).await;
let resp = h
.app
.clone()
.oneshot(post_json_with_cookie(
"/api/v1/admin/crawler/dead-jobs/requeue",
json!({ "scope": "all" }),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
}
#[sqlx::test(migrations = "./migrations")]
async fn dead_jobs_list_and_requeue_over_http(pool: PgPool) {
let job_id = seed_dead_job(&pool, "Bleach").await;
let h = harness(pool.clone());
let cookie = seed_admin(&pool, &h.app).await;
// List.
let resp = h
.app
.clone()
.oneshot(get_with_cookie("/api/v1/admin/crawler/dead-jobs", &cookie))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = body_json(resp).await;
assert_eq!(body["page"]["total"], 1);
assert_eq!(body["items"][0]["manga_title"], "Bleach");
// Requeue the single job.
let resp = h
.app
.clone()
.oneshot(post_json_with_cookie(
"/api/v1/admin/crawler/dead-jobs/requeue",
json!({ "scope": "job", "job_id": job_id }),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = body_json(resp).await;
assert_eq!(body["requeued"], 1);
let state: String = sqlx::query_scalar("SELECT state FROM crawler_jobs WHERE id = $1")
.bind(job_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(state, "pending");
}
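Review note on the frame loop in `status_stream_emits_initial_event`: the marker is checked against the accumulated text rather than each frame because the snapshot may be split mid-token. A std-only sketch of that idea follows; `accumulate_until` is a hypothetical helper, and the `event: status` / `data: {...}` wire shape used in the usage is an assumption, not something this diff shows.

```rust
/// Appends chunks to a buffer and returns it as soon as `marker` appears.
/// Returns `None` if the chunks run out first.
fn accumulate_until<'a, I>(chunks: I, marker: &str) -> Option<String>
where
    I: IntoIterator<Item = &'a [u8]>,
{
    let mut acc = String::new();
    for chunk in chunks {
        // Lossy decoding per chunk, as in the test; fine for ASCII markers.
        acc.push_str(&String::from_utf8_lossy(chunk));
        // Search the whole buffer so a marker split across chunks is found.
        if acc.contains(marker) {
            return Some(acc);
        }
    }
    None
}
```

Feeding it a payload cut between `"dae` and `mon"` still finds `"daemon"` after the second chunk, whereas a per-frame check would miss it.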


@@ -0,0 +1,350 @@
//! Integration tests for the admin force-resync endpoints.
//!
//! Real resync work requires Chromium, so these tests swap in a stub
//! [`ResyncService`] to assert the handler-level contract: routing,
//! admin gate, 503 when the daemon is disabled, 404 / 422 mapping for
//! missing-resource / no-source cases, and the audit-log side effect.
mod common;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use axum::http::StatusCode;
use serde_json::json;
use sqlx::PgPool;
use tower::ServiceExt;
use uuid::Uuid;
use mangalord::crawler::resync::{
ChapterResyncOutcome, MangaResyncOutcome, ResyncError, ResyncService,
};
use mangalord::repo;
use mangalord::repo::crawler::UpsertStatus;
/// Stub that records call counts and returns a canned outcome.
struct StubResync {
manga_calls: AtomicUsize,
chapter_calls: AtomicUsize,
/// When true, returns NoMangaSource / NoChapterSource.
no_source: bool,
}
impl StubResync {
fn new() -> Arc<Self> {
Arc::new(Self {
manga_calls: AtomicUsize::new(0),
chapter_calls: AtomicUsize::new(0),
no_source: false,
})
}
fn no_source() -> Arc<Self> {
Arc::new(Self {
manga_calls: AtomicUsize::new(0),
chapter_calls: AtomicUsize::new(0),
no_source: true,
})
}
}
#[async_trait]
impl ResyncService for StubResync {
async fn resync_manga(&self, manga_id: Uuid) -> anyhow::Result<MangaResyncOutcome> {
self.manga_calls.fetch_add(1, Ordering::SeqCst);
if self.no_source {
return Err(ResyncError::NoMangaSource.into());
}
Ok(MangaResyncOutcome {
manga_id,
metadata_status: UpsertStatus::Updated,
cover_fetched: true,
})
}
async fn resync_chapter(&self, chapter_id: Uuid) -> anyhow::Result<ChapterResyncOutcome> {
self.chapter_calls.fetch_add(1, Ordering::SeqCst);
if self.no_source {
return Err(ResyncError::NoChapterSource.into());
}
Ok(ChapterResyncOutcome::Fetched {
chapter_id,
pages: 7,
})
}
}
async fn promote_admin(pool: &PgPool, username: &str) {
let u = repo::user::find_by_username(pool, username)
.await
.unwrap()
.unwrap();
repo::user::set_is_admin_unchecked(pool, u.id, true)
.await
.unwrap();
}
async fn insert_manga(pool: &PgPool, title: &str) -> Uuid {
let (id,): (Uuid,) = sqlx::query_as(
"INSERT INTO mangas (title, status, alt_titles) VALUES ($1, 'ongoing', ARRAY[]::text[]) RETURNING id",
)
.bind(title)
.fetch_one(pool)
.await
.unwrap();
id
}
async fn insert_chapter(pool: &PgPool, manga_id: Uuid, number: i32, pages: i32) -> Uuid {
let (id,): (Uuid,) = sqlx::query_as(
"INSERT INTO chapters (manga_id, number, title, page_count) VALUES ($1, $2, NULL, $3) RETURNING id",
)
.bind(manga_id)
.bind(number)
.bind(pages)
.fetch_one(pool)
.await
.unwrap();
id
}
// ----- manga resync ---------------------------------------------------------
#[sqlx::test(migrations = "./migrations")]
async fn manga_resync_calls_service_and_returns_refreshed_detail(pool: PgPool) {
let stub = StubResync::new();
let h = common::harness_with_resync(pool.clone(), stub.clone());
let (username, cookie) = common::register_user(&h.app).await;
promote_admin(&pool, &username).await;
let manga_id = insert_manga(&pool, "Hello").await;
let resp = h
.app
.oneshot(common::post_json_with_cookie(
&format!("/api/v1/admin/mangas/{manga_id}/resync"),
json!({}),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = common::body_json(resp).await;
// Stub returned Updated + cover_fetched=true.
assert_eq!(body["metadata_status"], "updated");
assert_eq!(body["cover_fetched"], true);
// Response includes the refreshed manga detail.
assert_eq!(body["manga"]["id"], manga_id.to_string());
assert_eq!(body["manga"]["title"], "Hello");
assert_eq!(stub.manga_calls.load(Ordering::SeqCst), 1);
// Audit row written.
let (audit_count,): (i64,) =
sqlx::query_as("SELECT count(*) FROM admin_audit WHERE action = 'manga_resync' AND target_id = $1")
.bind(manga_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(audit_count, 1);
}
#[sqlx::test(migrations = "./migrations")]
async fn manga_resync_returns_404_for_unknown_id(pool: PgPool) {
let stub = StubResync::new();
let h = common::harness_with_resync(pool.clone(), stub.clone());
let (username, cookie) = common::register_user(&h.app).await;
promote_admin(&pool, &username).await;
let resp = h
.app
.oneshot(common::post_json_with_cookie(
&format!("/api/v1/admin/mangas/{}/resync", Uuid::new_v4()),
json!({}),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
// Service must not have been called when the manga doesn't exist.
assert_eq!(stub.manga_calls.load(Ordering::SeqCst), 0);
}
#[sqlx::test(migrations = "./migrations")]
async fn manga_resync_maps_no_source_to_422(pool: PgPool) {
let stub = StubResync::no_source();
let h = common::harness_with_resync(pool.clone(), stub);
let (username, cookie) = common::register_user(&h.app).await;
promote_admin(&pool, &username).await;
let manga_id = insert_manga(&pool, "Manual upload, no crawler source").await;
let resp = h
.app
.oneshot(common::post_json_with_cookie(
&format!("/api/v1/admin/mangas/{manga_id}/resync"),
json!({}),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::UNPROCESSABLE_ENTITY);
let body = common::body_json(resp).await;
assert_eq!(body["error"]["details"]["manga"], "no_source");
}
#[sqlx::test(migrations = "./migrations")]
async fn manga_resync_returns_503_when_daemon_disabled(pool: PgPool) {
let h = common::harness(pool.clone());
let (username, cookie) = common::register_user(&h.app).await;
promote_admin(&pool, &username).await;
let manga_id = insert_manga(&pool, "Z").await;
let resp = h
.app
.oneshot(common::post_json_with_cookie(
&format!("/api/v1/admin/mangas/{manga_id}/resync"),
json!({}),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
let body = common::body_json(resp).await;
assert_eq!(body["error"]["code"], "service_unavailable");
}
#[sqlx::test(migrations = "./migrations")]
async fn manga_resync_requires_admin(pool: PgPool) {
let stub = StubResync::new();
let h = common::harness_with_resync(pool.clone(), stub);
// Non-admin user.
let (_u, cookie) = common::register_user(&h.app).await;
let manga_id = insert_manga(&pool, "M").await;
let resp = h
.app
.oneshot(common::post_json_with_cookie(
&format!("/api/v1/admin/mangas/{manga_id}/resync"),
json!({}),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
}
// ----- chapter resync -------------------------------------------------------
#[sqlx::test(migrations = "./migrations")]
async fn chapter_resync_calls_service_and_returns_refreshed_chapter(pool: PgPool) {
let stub = StubResync::new();
let h = common::harness_with_resync(pool.clone(), stub.clone());
let (username, cookie) = common::register_user(&h.app).await;
promote_admin(&pool, &username).await;
let manga_id = insert_manga(&pool, "M").await;
let chapter_id = insert_chapter(&pool, manga_id, 1, 0).await;
let resp = h
.app
.oneshot(common::post_json_with_cookie(
&format!("/api/v1/admin/chapters/{chapter_id}/resync"),
json!({}),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body = common::body_json(resp).await;
assert_eq!(body["outcome"], "fetched");
assert_eq!(body["pages"], 7);
assert_eq!(body["chapter"]["id"], chapter_id.to_string());
assert_eq!(stub.chapter_calls.load(Ordering::SeqCst), 1);
let (audit_count,): (i64,) = sqlx::query_as(
"SELECT count(*) FROM admin_audit WHERE action = 'chapter_resync' AND target_id = $1",
)
.bind(chapter_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(audit_count, 1);
}
#[sqlx::test(migrations = "./migrations")]
async fn chapter_resync_returns_404_for_unknown_id(pool: PgPool) {
let stub = StubResync::new();
let h = common::harness_with_resync(pool.clone(), stub.clone());
let (username, cookie) = common::register_user(&h.app).await;
promote_admin(&pool, &username).await;
let resp = h
.app
.oneshot(common::post_json_with_cookie(
&format!("/api/v1/admin/chapters/{}/resync", Uuid::new_v4()),
json!({}),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_eq!(stub.chapter_calls.load(Ordering::SeqCst), 0);
}
#[sqlx::test(migrations = "./migrations")]
async fn chapter_resync_maps_no_source_to_422(pool: PgPool) {
let stub = StubResync::no_source();
let h = common::harness_with_resync(pool.clone(), stub);
let (username, cookie) = common::register_user(&h.app).await;
promote_admin(&pool, &username).await;
let manga_id = insert_manga(&pool, "M").await;
let chapter_id = insert_chapter(&pool, manga_id, 1, 0).await;
let resp = h
.app
.oneshot(common::post_json_with_cookie(
&format!("/api/v1/admin/chapters/{chapter_id}/resync"),
json!({}),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::UNPROCESSABLE_ENTITY);
let body = common::body_json(resp).await;
assert_eq!(body["error"]["details"]["chapter"], "no_source");
}
#[sqlx::test(migrations = "./migrations")]
async fn chapter_resync_returns_503_when_daemon_disabled(pool: PgPool) {
let h = common::harness(pool.clone());
let (username, cookie) = common::register_user(&h.app).await;
promote_admin(&pool, &username).await;
let manga_id = insert_manga(&pool, "M").await;
let chapter_id = insert_chapter(&pool, manga_id, 1, 0).await;
let resp = h
.app
.oneshot(common::post_json_with_cookie(
&format!("/api/v1/admin/chapters/{chapter_id}/resync"),
json!({}),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
}
#[sqlx::test(migrations = "./migrations")]
async fn chapter_resync_requires_admin(pool: PgPool) {
let stub = StubResync::new();
let h = common::harness_with_resync(pool.clone(), stub);
let (_u, cookie) = common::register_user(&h.app).await;
let manga_id = insert_manga(&pool, "M").await;
let chapter_id = insert_chapter(&pool, manga_id, 1, 0).await;
let resp = h
.app
.oneshot(common::post_json_with_cookie(
&format!("/api/v1/admin/chapters/{chapter_id}/resync"),
json!({}),
&cookie,
))
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
}
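Review note on the `StubResync` pattern used throughout these tests: a call counter behind a trait object lets a test assert both the HTTP mapping and whether the service was reached at all. The sketch below reduces that to std-only, synchronous code. The `Resync` trait, `CountingStub` and `handle` are hypothetical simplifications; the real trait is async and the real handler returns structured responses.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Simplified, synchronous stand-in for the resync service trait.
trait Resync {
    fn resync(&self, id: u32) -> Result<u32, String>;
}

/// Records how often it was called and returns a canned outcome.
struct CountingStub {
    calls: AtomicUsize,
    no_source: bool,
}

impl Resync for CountingStub {
    fn resync(&self, id: u32) -> Result<u32, String> {
        self.calls.fetch_add(1, Ordering::SeqCst);
        if self.no_source {
            return Err("no_source".to_string());
        }
        Ok(id)
    }
}

/// Hypothetical handler: unknown ids are rejected before the service runs,
/// and a service error maps to 422, mirroring the contract the tests assert.
fn handle(service: &dyn Resync, known_ids: &[u32], id: u32) -> u16 {
    if !known_ids.contains(&id) {
        return 404;
    }
    match service.resync(id) {
        Ok(_) => 200,
        Err(_) => 422,
    }
}
```

Keeping one `Arc` to the concrete stub and handing a second, coerced `Arc<dyn Resync>` to the code under test is what makes the zero-calls assertion in the 404 tests possible.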


@@ -49,6 +49,8 @@ fn admin_test_router(pool: PgPool) -> (Router, TempDir) {
auth,
upload: UploadConfig::default(),
auth_limiter,
resync: None,
crawler: None,
};
let app = Router::new()
.nest("/api/v1", api::routes())


@@ -74,6 +74,11 @@ fn harness_with_auth_config(
max_file_bytes: 256 * 1024,
},
auth_limiter,
// Default harness has no crawler daemon wired up; admin resync
// handlers return 503 in this config. Tests that need a stub
// resync service swap it in via `harness_with_resync`.
resync: None,
crawler: None,
};
Harness { app: router(state), _storage_dir: storage_dir }
}
@@ -124,6 +129,38 @@ pub fn harness_with_auth_rate_limit(
harness_with_auth_config(pool, storage, storage_dir, auth)
}
/// Like [`harness`] but slots a caller-supplied [`ResyncService`] stub
/// into `AppState.resync`. Used by the admin resync tests so the
/// endpoint path is exercised without standing up a real Chromium.
pub fn harness_with_resync(
pool: PgPool,
resync: Arc<dyn mangalord::crawler::resync::ResyncService>,
) -> Harness {
let storage_dir = tempfile::tempdir().expect("tempdir");
let storage = Arc::new(LocalStorage::new(storage_dir.path()));
let auth = AuthConfig {
cookie_secure: false,
..AuthConfig::default()
};
let auth_limiter = Arc::new(AuthRateLimiter::new(auth.rate_limit));
let state = AppState {
db: pool,
storage,
auth,
upload: UploadConfig {
max_request_bytes: 4 * 1024 * 1024,
max_file_bytes: 256 * 1024,
},
auth_limiter,
resync: Some(resync),
crawler: None,
};
Harness {
app: router(state),
_storage_dir: storage_dir,
}
}
/// Wraps a real `Storage` and fails on the N-th `put` call so tests can
/// assert that handlers roll their DB writes back when storage errors
/// mid-upload. Reads and other operations delegate to `inner`.


@@ -40,6 +40,8 @@ fn make_cfg(
tz: Tz::UTC,
retention_days: 7,
session_expired,
status: mangalord::crawler::status::StatusHandle::new(workers),
job_timeout: Duration::from_secs(60),
extra_tasks: Vec::new(),
}
}
@@ -88,6 +90,52 @@ impl ChapterDispatcher for PanickingDispatcher {
}
}
/// Never completes — used to verify the worker's outer dispatch timeout.
struct HangingDispatcher {
seen: AtomicUsize,
}
#[async_trait::async_trait]
impl ChapterDispatcher for HangingDispatcher {
async fn dispatch(&self, _payload: JobPayload) -> anyhow::Result<SyncOutcome> {
self.seen.fetch_add(1, Ordering::AcqRel);
std::future::pending::<()>().await;
unreachable!("hanging dispatcher never resolves");
}
}
#[sqlx::test(migrations = "./migrations")]
async fn worker_times_out_a_hung_dispatch_and_acks_failed(pool: PgPool) {
enqueue_chapter_job(&pool).await;
let dispatcher = Arc::new(HangingDispatcher {
seen: AtomicUsize::new(0),
});
let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false));
let cancel = CancellationToken::new();
let mut cfg = make_cfg(None, dispatcher.clone(), session_expired, 1);
cfg.job_timeout = Duration::from_millis(300);
let handle = daemon::spawn(pool.clone(), cancel.clone(), cfg);
// The hung job should time out and return to pending with backoff
// (attempts=1 < max=5). Poll for the recorded error.
let mut timed_out = false;
for _ in 0..40 {
let n: i64 = sqlx::query_scalar(
"SELECT COUNT(*) FROM crawler_jobs WHERE last_error = 'dispatch timed out'",
)
.fetch_one(&pool)
.await
.unwrap();
if n == 1 {
timed_out = true;
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
handle.shutdown().await;
assert!(timed_out, "hung dispatch must be acked failed with a timeout error");
assert!(dispatcher.seen.load(Ordering::Acquire) >= 1);
}
#[sqlx::test(migrations = "./migrations")]
async fn workers_drain_jobs_through_dispatcher(pool: PgPool) {
enqueue_chapter_job(&pool).await;
@@ -517,3 +565,132 @@ async fn enqueue_bookmarked_pending_resumes_after_quarantine_expires(pool: PgPoo
);
}
/// Helper: insert a chapter with the given `number` and a non-dropped
/// source row, returning the chapter id. Used by the ordering tests so
/// the setup boilerplate doesn't drown the assertion.
async fn insert_pending_chapter(
pool: &PgPool,
manga_id: Uuid,
number: i32,
source_chapter_key: &str,
) -> Uuid {
let chapter_id: Uuid = sqlx::query_scalar(
"INSERT INTO chapters (manga_id, number, page_count) VALUES ($1, $2, 0) RETURNING id",
)
.bind(manga_id)
.bind(number)
.fetch_one(pool)
.await
.unwrap();
sqlx::query(
"INSERT INTO chapter_sources (source_id, source_chapter_key, chapter_id, source_url) \
VALUES ($1, $2, $3, $4)",
)
.bind("target")
.bind(source_chapter_key)
.bind(chapter_id)
.bind(format!("https://example.com/{source_chapter_key}"))
.execute(pool)
.await
.unwrap();
chapter_id
}
#[sqlx::test(migrations = "./migrations")]
async fn enqueue_bookmarked_pending_queues_chapters_in_ascending_number_order(pool: PgPool) {
// Insert chapters with `number` values 3, 1, 2 in that insertion
// order — so `created_at` order (the previous tiebreaker) does NOT
// match number order. After enqueue + lease, the worker should see
// chapters 1, 2, 3 in that sequence.
let user_id: Uuid = sqlx::query_scalar(
"INSERT INTO users (username, password_hash) VALUES ($1, $2) RETURNING id",
)
.bind("alice")
.bind("not-a-real-hash")
.fetch_one(&pool)
.await
.unwrap();
let manga_id: Uuid = sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id")
.bind("Test")
.fetch_one(&pool)
.await
.unwrap();
sqlx::query(
"INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
)
.bind("target")
.bind("Target")
.bind("https://example.com")
.execute(&pool)
.await
.unwrap();
let c3 = insert_pending_chapter(&pool, manga_id, 3, "ch3").await;
let c1 = insert_pending_chapter(&pool, manga_id, 1, "ch1").await;
let c2 = insert_pending_chapter(&pool, manga_id, 2, "ch2").await;
sqlx::query("INSERT INTO bookmarks (user_id, manga_id) VALUES ($1, $2)")
.bind(user_id)
.bind(manga_id)
.execute(&pool)
.await
.unwrap();
let summary = pipeline::enqueue_bookmarked_pending(&pool).await.unwrap();
assert_eq!(summary.inserted, 3);
let leases = jobs::lease(&pool, None, 10, std::time::Duration::from_secs(60))
.await
.unwrap();
let leased_chapter_ids: Vec<Uuid> = leases
.iter()
.map(|l| match &l.payload {
JobPayload::SyncChapterContent { chapter_id, .. } => *chapter_id,
other => panic!("unexpected payload kind: {other:?}"),
})
.collect();
assert_eq!(
leased_chapter_ids,
vec![c1, c2, c3],
"chapters must be leased in ascending chapter-number order, not insertion order"
);
}
#[sqlx::test(migrations = "./migrations")]
async fn enqueue_pending_for_manga_queues_chapters_in_ascending_number_order(pool: PgPool) {
// Same scenario as above but exercising the bookmark-create hook path
// (`enqueue_pending_for_manga`) which has its own ORDER BY.
let manga_id: Uuid = sqlx::query_scalar("INSERT INTO mangas (title) VALUES ($1) RETURNING id")
.bind("Test")
.fetch_one(&pool)
.await
.unwrap();
sqlx::query(
"INSERT INTO sources (id, name, base_url) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
)
.bind("target")
.bind("Target")
.bind("https://example.com")
.execute(&pool)
.await
.unwrap();
let c3 = insert_pending_chapter(&pool, manga_id, 3, "ch3").await;
let c1 = insert_pending_chapter(&pool, manga_id, 1, "ch1").await;
let c2 = insert_pending_chapter(&pool, manga_id, 2, "ch2").await;
let summary = pipeline::enqueue_pending_for_manga(&pool, manga_id)
.await
.unwrap();
assert_eq!(summary.inserted, 3);
let leases = jobs::lease(&pool, None, 10, std::time::Duration::from_secs(60))
.await
.unwrap();
let leased_chapter_ids: Vec<Uuid> = leases
.iter()
.map(|l| match &l.payload {
JobPayload::SyncChapterContent { chapter_id, .. } => *chapter_id,
other => panic!("unexpected payload kind: {other:?}"),
})
.collect();
assert_eq!(leased_chapter_ids, vec![c1, c2, c3]);
}
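The `worker_times_out_a_hung_dispatch_and_acks_failed` test earlier in this file waits for the timeout by polling the database a bounded number of times with a fixed sleep. A minimal sketch of that pattern as a reusable helper follows. It assumes a synchronous `check` closure and `std::thread::sleep` to stay dependency-free, whereas the real test is async and uses `tokio::time::sleep`. `poll_until` is a hypothetical name, not something this change adds.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper (not part of this change): call `check` up to
/// `attempts` times, sleeping `interval` between failed tries.
/// Returns true as soon as `check` does, false if every try fails.
fn poll_until<F: FnMut() -> bool>(attempts: usize, interval: Duration, mut check: F) -> bool {
    for attempt in 0..attempts {
        if check() {
            return true;
        }
        // Skip the sleep after the final failed try; nothing is left to wait for.
        if attempt + 1 < attempts {
            sleep(interval);
        }
    }
    false
}
```

With 40 attempts and a 50 ms interval this gives roughly the same two-second budget as the loop in the test, while keeping the "give up" path explicit in the return value.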


@@ -0,0 +1,304 @@
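//! Integration tests for the crawler admin queries in `repo::crawler`:
//! listing dead jobs with manga/chapter context, the scoped requeue
//! (all / per-manga / per-chapter / single job), the active-job and
//! missing-cover backlog listings, and the per-state job counts, all
//! used by the admin dashboard.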
use mangalord::repo::crawler::{self, RequeueScope};
use serde_json::json;
use sqlx::PgPool;
use uuid::Uuid;
/// Seed a manga with no cover + a live source row (so it's "queued for a
/// cover fetch"). Returns the manga id.
async fn seed_missing_cover(pool: &PgPool, title: &str) -> Uuid {
let manga_id = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, $2, NULL)")
.bind(manga_id)
.bind(title)
.execute(pool)
.await
.unwrap();
sqlx::query("INSERT INTO sources (id, name, base_url) VALUES ('target', 'T', 'http://x') ON CONFLICT DO NOTHING")
.execute(pool)
.await
.unwrap();
sqlx::query(
"INSERT INTO manga_sources (source_id, source_manga_key, manga_id, source_url) \
VALUES ('target', $1, $2, 'http://x/m')",
)
.bind(format!("k-{manga_id}"))
.bind(manga_id)
.execute(pool)
.await
.unwrap();
manga_id
}
/// Seed a manga + chapter and return their ids.
async fn seed_chapter(pool: &PgPool, title: &str, number: i32) -> (Uuid, Uuid) {
let manga_id = Uuid::new_v4();
let chapter_id = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, $2)")
.bind(manga_id)
.bind(title)
.execute(pool)
.await
.unwrap();
sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, $3)")
.bind(chapter_id)
.bind(manga_id)
.bind(number)
.execute(pool)
.await
.unwrap();
(manga_id, chapter_id)
}
/// Insert a crawler_jobs row in a given state for a chapter-content job.
async fn insert_job(pool: &PgPool, chapter_id: Uuid, state: &str, attempts: i32) -> Uuid {
let id = Uuid::new_v4();
let payload = json!({
"kind": "sync_chapter_content",
"source_id": "target",
"chapter_id": chapter_id,
"source_chapter_key": "k",
});
sqlx::query(
"INSERT INTO crawler_jobs (id, payload, state, attempts, last_error) \
VALUES ($1, $2, $3, $4, 'boom')",
)
.bind(id)
.bind(payload)
.bind(state)
.bind(attempts)
.execute(pool)
.await
.unwrap();
id
}
async fn state_of(pool: &PgPool, id: Uuid) -> String {
sqlx::query_scalar::<_, String>("SELECT state FROM crawler_jobs WHERE id = $1")
.bind(id)
.fetch_one(pool)
.await
.unwrap()
}
#[sqlx::test(migrations = "./migrations")]
async fn list_dead_jobs_returns_context_and_total(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "Naruto", 700).await;
insert_job(&pool, c1, "dead", 5).await;
// A non-dead job must not appear.
let (_m2, c2) = seed_chapter(&pool, "Bleach", 1).await;
insert_job(&pool, c2, "pending", 0).await;
let (items, total) = crawler::list_dead_jobs(&pool, None, 50, 0).await.unwrap();
assert_eq!(total, 1);
assert_eq!(items.len(), 1);
let row = &items[0];
assert_eq!(row.manga_title.as_deref(), Some("Naruto"));
assert_eq!(row.chapter_number, Some(700));
assert_eq!(row.attempts, 5);
assert_eq!(row.last_error.as_deref(), Some("boom"));
}
#[sqlx::test(migrations = "./migrations")]
async fn list_dead_jobs_filters_by_title_search(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "Naruto", 700).await;
insert_job(&pool, c1, "dead", 5).await;
let (_m2, c2) = seed_chapter(&pool, "One Piece", 1).await;
insert_job(&pool, c2, "dead", 5).await;
let (items, total) = crawler::list_dead_jobs(&pool, Some("piece"), 50, 0)
.await
.unwrap();
assert_eq!(total, 1);
assert_eq!(items[0].manga_title.as_deref(), Some("One Piece"));
}
#[sqlx::test(migrations = "./migrations")]
async fn requeue_all_resets_dead_jobs_to_pending(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "A", 1).await;
let (_m2, c2) = seed_chapter(&pool, "B", 1).await;
let j1 = insert_job(&pool, c1, "dead", 5).await;
let j2 = insert_job(&pool, c2, "dead", 5).await;
let n = crawler::requeue_dead_jobs(&pool, RequeueScope::All)
.await
.unwrap();
assert_eq!(n, 2);
assert_eq!(state_of(&pool, j1).await, "pending");
assert_eq!(state_of(&pool, j2).await, "pending");
let attempts: i32 = sqlx::query_scalar("SELECT attempts FROM crawler_jobs WHERE id = $1")
.bind(j1)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(attempts, 0, "attempts reset on requeue");
}
#[sqlx::test(migrations = "./migrations")]
async fn requeue_by_manga_scopes_to_that_manga(pool: PgPool) {
let (m1, c1) = seed_chapter(&pool, "A", 1).await;
let (_m2, c2) = seed_chapter(&pool, "B", 1).await;
let j1 = insert_job(&pool, c1, "dead", 5).await;
let j2 = insert_job(&pool, c2, "dead", 5).await;
let n = crawler::requeue_dead_jobs(&pool, RequeueScope::Manga(m1))
.await
.unwrap();
assert_eq!(n, 1);
assert_eq!(state_of(&pool, j1).await, "pending");
assert_eq!(state_of(&pool, j2).await, "dead", "other manga untouched");
}
#[sqlx::test(migrations = "./migrations")]
async fn requeue_by_chapter_scopes_to_that_chapter(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "A", 1).await;
let (_m2, c2) = seed_chapter(&pool, "A", 2).await;
let j1 = insert_job(&pool, c1, "dead", 5).await;
let j2 = insert_job(&pool, c2, "dead", 5).await;
let n = crawler::requeue_dead_jobs(&pool, RequeueScope::Chapter(c1))
.await
.unwrap();
assert_eq!(n, 1);
assert_eq!(state_of(&pool, j1).await, "pending");
assert_eq!(state_of(&pool, j2).await, "dead", "other chapter untouched");
}
#[sqlx::test(migrations = "./migrations")]
async fn requeue_single_job(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "A", 1).await;
let (_m2, c2) = seed_chapter(&pool, "B", 1).await;
let j1 = insert_job(&pool, c1, "dead", 5).await;
let j2 = insert_job(&pool, c2, "dead", 5).await;
let n = crawler::requeue_dead_jobs(&pool, RequeueScope::Job(j1))
.await
.unwrap();
assert_eq!(n, 1);
assert_eq!(state_of(&pool, j1).await, "pending");
assert_eq!(state_of(&pool, j2).await, "dead");
}
#[sqlx::test(migrations = "./migrations")]
async fn requeue_skips_dead_when_live_job_exists_for_same_chapter(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "A", 1).await;
let dead = insert_job(&pool, c1, "dead", 5).await;
// A live pending job for the SAME chapter already exists.
insert_job(&pool, c1, "pending", 0).await;
let n = crawler::requeue_dead_jobs(&pool, RequeueScope::All)
.await
.unwrap();
assert_eq!(n, 0, "must not resurrect a dead job that has a live counterpart");
assert_eq!(state_of(&pool, dead).await, "dead");
}
#[sqlx::test(migrations = "./migrations")]
async fn requeue_with_two_dead_jobs_for_one_chapter_revives_one_not_500(pool: PgPool) {
// Regression: two dead jobs for the SAME chapter must not both flip to
// pending in one statement — that would violate the partial unique
// dedup index and abort the whole requeue.
let (manga_id, c1) = seed_chapter(&pool, "A", 1).await;
let older = insert_job(&pool, c1, "dead", 5).await;
let newer = insert_job(&pool, c1, "dead", 5).await;
// Backdate `older` by an hour so `newer` is unambiguously the newest.
sqlx::query("UPDATE crawler_jobs SET updated_at = now() - interval '1 hour' WHERE id = $1")
.bind(older)
.execute(&pool)
.await
.unwrap();
for scope in [RequeueScope::All, RequeueScope::Manga(manga_id), RequeueScope::Chapter(c1)] {
// Reset to two-dead before each scope variant.
sqlx::query("UPDATE crawler_jobs SET state = 'dead' WHERE id = ANY($1)")
.bind(vec![older, newer])
.execute(&pool)
.await
.unwrap();
let n = crawler::requeue_dead_jobs(&pool, scope)
.await
.expect("requeue must not error on duplicate dead jobs");
assert_eq!(n, 1, "exactly one dead job per chapter is revived");
// The newest one is the survivor; the other stays dead.
assert_eq!(state_of(&pool, newer).await, "pending");
assert_eq!(state_of(&pool, older).await, "dead");
}
}
#[sqlx::test(migrations = "./migrations")]
async fn list_active_jobs_returns_pending_and_running_running_first(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "Naruto", 700).await;
let (_m2, c2) = seed_chapter(&pool, "Bleach", 10).await;
insert_job(&pool, c1, "pending", 0).await;
insert_job(&pool, c2, "running", 1).await;
// A dead job must NOT appear.
let (_m3, c3) = seed_chapter(&pool, "Gone", 1).await;
insert_job(&pool, c3, "dead", 5).await;
let (items, total) = crawler::list_active_jobs(&pool, None, 50, 0).await.unwrap();
assert_eq!(total, 2);
assert_eq!(items.len(), 2);
// Running first.
assert_eq!(items[0].state, "running");
assert_eq!(items[0].manga_title.as_deref(), Some("Bleach"));
assert_eq!(items[1].state, "pending");
assert_eq!(items[1].chapter_number, Some(700));
}
#[sqlx::test(migrations = "./migrations")]
async fn list_active_jobs_filters_by_title(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "Naruto", 1).await;
let (_m2, c2) = seed_chapter(&pool, "One Piece", 1).await;
insert_job(&pool, c1, "pending", 0).await;
insert_job(&pool, c2, "pending", 0).await;
let (items, total) = crawler::list_active_jobs(&pool, Some("piece"), 50, 0)
.await
.unwrap();
assert_eq!(total, 1);
assert_eq!(items[0].manga_title.as_deref(), Some("One Piece"));
}
#[sqlx::test(migrations = "./migrations")]
async fn missing_covers_count_and_list(pool: PgPool) {
seed_missing_cover(&pool, "Naruto").await;
seed_missing_cover(&pool, "Bleach").await;
// A manga WITH a cover must not be counted.
let with_cover = Uuid::new_v4();
sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, 'Done', 'k.jpg')")
.bind(with_cover)
.execute(&pool)
.await
.unwrap();
assert_eq!(crawler::count_missing_covers(&pool).await.unwrap(), 2);
let (items, total) = crawler::list_missing_cover_mangas(&pool, None, 50, 0)
.await
.unwrap();
assert_eq!(total, 2);
assert_eq!(items.len(), 2);
let (items, total) = crawler::list_missing_cover_mangas(&pool, Some("naru"), 50, 0)
.await
.unwrap();
assert_eq!(total, 1);
assert_eq!(items[0].manga_title, "Naruto");
}
#[sqlx::test(migrations = "./migrations")]
async fn job_state_counts_groups_by_state(pool: PgPool) {
let (_m, c1) = seed_chapter(&pool, "A", 1).await;
let (_m2, c2) = seed_chapter(&pool, "B", 1).await;
let (_m3, c3) = seed_chapter(&pool, "C", 1).await;
insert_job(&pool, c1, "pending", 0).await;
insert_job(&pool, c2, "dead", 5).await;
insert_job(&pool, c3, "dead", 5).await;
let (pending, running, dead) = crawler::job_state_counts(&pool).await.unwrap();
assert_eq!(pending, 1);
assert_eq!(running, 0);
assert_eq!(dead, 2);
}
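The requeue tests above pin down two rules: a dead job is not revived while a live job exists for the same chapter, and when several dead jobs share a chapter only the newest one is revived. A minimal in-memory sketch of that selection rule follows. It assumes "live" means pending or running and that recency is judged by `updated_at`, as the regression test suggests. `Job`, `State`, and `pick_requeue` are hypothetical names for illustration; the real logic lives in SQL inside `repo::crawler` and is not shown in this diff.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    Pending,
    Running,
    Dead,
    Done,
}

#[derive(Clone, Debug)]
struct Job {
    id: u32,
    chapter_id: u32,
    state: State,
    updated_at: u64, // larger = more recent (illustrative unit)
}

/// Returns the ids of dead jobs that are safe to flip back to pending:
/// at most one per chapter (the newest), and none for a chapter that
/// already has a pending or running job.
fn pick_requeue(jobs: &[Job]) -> Vec<u32> {
    // Chapters that already have a live job (assumption: pending or running).
    let live: HashSet<u32> = jobs
        .iter()
        .filter(|j| matches!(j.state, State::Pending | State::Running))
        .map(|j| j.chapter_id)
        .collect();

    // Newest dead job per remaining chapter.
    let mut newest: HashMap<u32, &Job> = HashMap::new();
    for job in jobs.iter().filter(|j| j.state == State::Dead) {
        if live.contains(&job.chapter_id) {
            continue;
        }
        newest
            .entry(job.chapter_id)
            .and_modify(|cur| {
                if job.updated_at > cur.updated_at {
                    *cur = job;
                }
            })
            .or_insert(job);
    }

    let mut ids: Vec<u32> = newest.values().map(|j| j.id).collect();
    ids.sort_unstable();
    ids
}
```

Picking the survivors before any state changes is what keeps a bulk requeue from tripping the dedup index: no chapter ever ends up with two pending rows.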


@@ -185,6 +185,68 @@ async fn lease_marks_running_and_bumps_attempts_and_sets_leased_until(pool: PgPo
assert!(leased_until > chrono::Utc::now());
}
#[sqlx::test(migrations = "./migrations")]
async fn renew_extends_leased_until_while_running(pool: PgPool) {
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
.await
.unwrap()
{
EnqueueResult::Inserted(id) => id,
EnqueueResult::Skipped => unreachable!(),
};
// Lease with a short window, then collapse leased_until to the recent
// past so the renew is unambiguously an extension.
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(5))
.await
.unwrap();
assert_eq!(leases.len(), 1);
sqlx::query("UPDATE crawler_jobs SET leased_until = now() - interval '1 second' WHERE id = $1")
.bind(id)
.execute(&pool)
.await
.unwrap();
let still_owned = jobs::renew(&pool, id, Duration::from_secs(120))
.await
.unwrap();
assert!(still_owned, "renew on a running job returns true");
let leased_until: chrono::DateTime<chrono::Utc> =
sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1")
.bind(id)
.fetch_one(&pool)
.await
.unwrap();
assert!(
leased_until > chrono::Utc::now() + chrono::Duration::seconds(60),
"leased_until pushed ~120s into the future"
);
assert_eq!(job_state(&pool, id).await, "running");
}
#[sqlx::test(migrations = "./migrations")]
async fn renew_is_noop_once_job_no_longer_running(pool: PgPool) {
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
.await
.unwrap()
{
EnqueueResult::Inserted(id) => id,
EnqueueResult::Skipped => unreachable!(),
};
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60))
.await
.unwrap();
// Job completes — heartbeat should now see it's no longer ours.
jobs::ack_done(&pool, leases[0].id).await.unwrap();
let still_owned = jobs::renew(&pool, id, Duration::from_secs(120))
.await
.unwrap();
assert!(!still_owned, "renew on a non-running job returns false");
assert_eq!(job_state(&pool, id).await, "done");
}
#[sqlx::test(migrations = "./migrations")]
async fn lease_with_kind_filter_only_matches_that_kind(pool: PgPool) {
let manga_id = match jobs::enqueue(&pool, &sync_manga_payload("foo"))
@@ -531,6 +593,89 @@ async fn reap_done_deletes_old_rows_keeps_fresh(pool: PgPool) {
assert_eq!(remaining, vec![fresh_id], "only fresh row remains");
}
#[sqlx::test(migrations = "./migrations")]
async fn lease_ties_on_scheduled_at_break_by_created_at(pool: PgPool) {
// Locks in the tiebreaker that lets enqueue order survive the lease
// step: when many jobs share `scheduled_at` (the common cron-batch
// case), the worker must pick the earliest-inserted row, not whatever
// Postgres returns in heap order. The enqueue path inserts chapters
// in chapter-number order, so this tiebreaker is what makes "queue
// in rising order" observable at the dequeue side too.
let a = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
.await
.unwrap()
{
EnqueueResult::Inserted(id) => id,
_ => unreachable!(),
};
let b = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
.await
.unwrap()
{
EnqueueResult::Inserted(id) => id,
_ => unreachable!(),
};
let c = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
.await
.unwrap()
{
EnqueueResult::Inserted(id) => id,
_ => unreachable!(),
};
// Pin `scheduled_at` to a single literal instant (shared across all
// three rows — `now()` would yield a different microsecond per UPDATE
// and make scheduled_at the actual sort key). Reverse `created_at`
// against insertion order so heap order would give the wrong answer.
let shared_scheduled = chrono::Utc::now() - chrono::Duration::hours(1);
sqlx::query(
"UPDATE crawler_jobs \
SET scheduled_at = $2, \
created_at = $3 \
WHERE id = $1",
)
.bind(a)
.bind(shared_scheduled)
.bind(chrono::Utc::now() - chrono::Duration::seconds(10))
.execute(&pool)
.await
.unwrap();
sqlx::query(
"UPDATE crawler_jobs \
SET scheduled_at = $2, \
created_at = $3 \
WHERE id = $1",
)
.bind(b)
.bind(shared_scheduled)
.bind(chrono::Utc::now() - chrono::Duration::seconds(20))
.execute(&pool)
.await
.unwrap();
sqlx::query(
"UPDATE crawler_jobs \
SET scheduled_at = $2, \
created_at = $3 \
WHERE id = $1",
)
.bind(c)
.bind(shared_scheduled)
.bind(chrono::Utc::now() - chrono::Duration::seconds(30))
.execute(&pool)
.await
.unwrap();
let leases = jobs::lease(&pool, None, 10, Duration::from_secs(60))
.await
.unwrap();
let order: Vec<Uuid> = leases.iter().map(|l| l.id).collect();
assert_eq!(
order,
vec![c, b, a],
"lease must return jobs in created_at order when scheduled_at ties"
);
}
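The tiebreak test above expects jobs that share a `scheduled_at` to come back ordered by `created_at`. A minimal sketch of that ordering over in-memory rows follows. It assumes both keys sort ascending and ignores the "is this job due yet" filter a real lease query would apply. `QueuedJob` and `lease_order` are hypothetical names, and timestamps are plain integers for brevity.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
struct QueuedJob {
    id: &'static str,
    scheduled_at: i64, // illustrative: seconds since some epoch
    created_at: i64,
}

/// Orders jobs by (scheduled_at, created_at) ascending and returns the
/// ids of the first `limit` entries, mimicking what the test expects
/// from `jobs::lease`.
fn lease_order(mut jobs: Vec<QueuedJob>, limit: usize) -> Vec<&'static str> {
    // Tuple keys compare left to right, so created_at only matters
    // when scheduled_at is equal.
    jobs.sort_by_key(|j| (j.scheduled_at, j.created_at));
    jobs.into_iter().take(limit).map(|j| j.id).collect()
}
```

Feeding it the scenario from the test (three rows with one shared `scheduled_at` and `created_at` reversed against insertion order) yields `c, b, a`, the same order the assertion checks.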
#[sqlx::test(migrations = "./migrations")]
async fn reap_done_zero_is_a_no_op(pool: PgPool) {
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))


@@ -6,6 +6,7 @@
use mangalord::crawler::source::{SourceChapterRef, SourceManga};
use mangalord::repo::crawler::{self, ChapterDiff, UpsertStatus};
use mangalord::repo::chapter as chapter_repo;
use sqlx::PgPool;
use uuid::Uuid;
@@ -829,6 +830,107 @@ async fn sync_tags_garbage_collects_orphan_user_attachments(pool: PgPool) {
assert_eq!(orphan_rows, 0, "orphan user-attached tag should be reaped");
}
// ---- list_missing_covers ---------------------------------------------------
#[sqlx::test(migrations = "./migrations")]
async fn list_missing_covers_only_returns_rows_without_cover(pool: PgPool) {
crawler::ensure_source(&pool, "target", "T", "https://x.example")
.await
.unwrap();
let with_cover = sample_manga("with", "With Cover", "h1");
let without_cover = sample_manga("without", "No Cover", "h2");
let w = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/with", &with_cover)
.await
.unwrap();
let nc = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/without", &without_cover)
.await
.unwrap();
// Manually set a cover for `with` only.
sqlx::query("UPDATE mangas SET cover_image_path = 'mangas/x/cover.jpg' WHERE id = $1")
.bind(w.manga_id)
.execute(&pool)
.await
.unwrap();
let entries = crawler::list_missing_covers(&pool, 50).await.unwrap();
assert_eq!(entries.len(), 1, "exactly the manga without a cover");
assert_eq!(entries[0].manga_id, nc.manga_id);
assert_eq!(entries[0].source_manga_key, "without");
assert_eq!(entries[0].source_url, "https://x.example/without");
}
#[sqlx::test(migrations = "./migrations")]
async fn list_missing_covers_skips_dropped_source_rows(pool: PgPool) {
crawler::ensure_source(&pool, "target", "T", "https://x.example")
.await
.unwrap();
let m = sample_manga("foo", "Foo", "h1");
let up = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m)
.await
.unwrap();
sqlx::query("UPDATE manga_sources SET dropped_at = NOW() WHERE manga_id = $1")
.bind(up.manga_id)
.execute(&pool)
.await
.unwrap();
let entries = crawler::list_missing_covers(&pool, 50).await.unwrap();
assert!(
entries.is_empty(),
"dropped-source mangas must not be backfilled — no live source to fetch from"
);
}
#[sqlx::test(migrations = "./migrations")]
async fn list_missing_covers_respects_limit(pool: PgPool) {
crawler::ensure_source(&pool, "target", "T", "https://x.example")
.await
.unwrap();
for i in 0..5 {
let key = format!("m{i}");
let url = format!("https://x.example/{key}");
let m = sample_manga(&key, &format!("M{i}"), &format!("h{i}"));
let _ = crawler::upsert_manga_from_source(&pool, "target", &url, &m)
.await
.unwrap();
}
let entries = crawler::list_missing_covers(&pool, 3).await.unwrap();
assert_eq!(entries.len(), 3, "limit caps the result set");
}
#[sqlx::test(migrations = "./migrations")]
async fn list_missing_covers_deduplicates_per_manga(pool: PgPool) {
// A manga surfaced by two sources should produce ONE backfill
// entry, not two — otherwise the per-tick cap could be eaten by
// duplicates and starve other mangas.
crawler::ensure_source(&pool, "src-a", "A", "https://a.example")
.await
.unwrap();
crawler::ensure_source(&pool, "src-b", "B", "https://b.example")
.await
.unwrap();
let m = sample_manga("foo", "Foo", "h1");
let up = crawler::upsert_manga_from_source(&pool, "src-a", "https://a.example/foo", &m)
.await
.unwrap();
// Second source attaches to the SAME manga row.
sqlx::query(
"INSERT INTO manga_sources (source_id, source_manga_key, manga_id, source_url) \
VALUES ($1, $2, $3, $4)",
)
.bind("src-b")
.bind("foo-on-b")
.bind(up.manga_id)
.bind("https://b.example/foo")
.execute(&pool)
.await
.unwrap();
let entries = crawler::list_missing_covers(&pool, 50).await.unwrap();
assert_eq!(entries.len(), 1, "DISTINCT ON (m.id) collapses duplicate source rows");
}
#[sqlx::test(migrations = "./migrations")]
async fn re_appearing_manga_clears_dropped_at(pool: PgPool) {
crawler::ensure_source(&pool, "target", "T", "https://x.example")
@@ -860,3 +962,261 @@ async fn re_appearing_manga_clears_dropped_at(pool: PgPool) {
assert!(dropped.0.is_none());
assert_eq!(dropped.1, up.manga_id);
}
// ---- source_index: site-order preservation ----
//
// The user-facing chapter list reverses the source-site order so that
// the oldest chapter appears first. The crawler records each row's DOM
// position in `chapters.source_index` (0 = first in source DOM = newest
// on this site) on every sync; the list query orders by source_index
// DESC NULLS LAST, falling through to number/created_at for rows with
// no source row (e.g. user uploads).
#[sqlx::test(migrations = "./migrations")]
async fn source_index_set_on_insert_matches_dom_order(pool: PgPool) {
crawler::ensure_source(&pool, "target", "T", "https://x.example")
.await
.unwrap();
let m = sample_manga("foo", "Foo Manga", "hash-1");
let up = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m)
.await
.unwrap();
let chapters = vec![
SourceChapterRef {
source_chapter_key: "a".into(),
number: 30,
title: Some("Ch.30".into()),
url: "https://x.example/foo/a".into(),
},
SourceChapterRef {
source_chapter_key: "b".into(),
number: 29,
title: Some("Ch.29".into()),
url: "https://x.example/foo/b".into(),
},
SourceChapterRef {
source_chapter_key: "c".into(),
number: 28,
title: Some("Ch.28".into()),
url: "https://x.example/foo/c".into(),
},
];
crawler::sync_manga_chapters(&pool, "target", up.manga_id, &chapters)
.await
.unwrap();
let rows: Vec<(String, Option<i32>)> = sqlx::query_as(
"SELECT cs.source_chapter_key, c.source_index \
FROM chapters c \
JOIN chapter_sources cs ON cs.chapter_id = c.id \
WHERE c.manga_id = $1 \
ORDER BY cs.source_chapter_key",
)
.bind(up.manga_id)
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(
rows,
vec![
("a".to_string(), Some(0)),
("b".to_string(), Some(1)),
("c".to_string(), Some(2)),
],
"source_index reflects enumerate() position in the input slice",
);
}
#[sqlx::test(migrations = "./migrations")]
async fn source_index_rewritten_on_resync_when_new_chapter_prepended(pool: PgPool) {
crawler::ensure_source(&pool, "target", "T", "https://x.example")
.await
.unwrap();
let m = sample_manga("foo", "Foo Manga", "hash-1");
let up = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m)
.await
.unwrap();
let first = vec![
SourceChapterRef {
source_chapter_key: "a".into(),
number: 1,
title: Some("Ch.1".into()),
url: "https://x.example/foo/a".into(),
},
SourceChapterRef {
source_chapter_key: "b".into(),
number: 2,
title: Some("Ch.2".into()),
url: "https://x.example/foo/b".into(),
},
];
crawler::sync_manga_chapters(&pool, "target", up.manga_id, &first)
.await
.unwrap();
// Second sync: a brand-new chapter appears at the top of the source
// (newest first on the site). Every existing row's source_index must
// increase by one so the display order stays correct.
let second = vec![
SourceChapterRef {
source_chapter_key: "new".into(),
number: 3,
title: Some("Ch.3".into()),
url: "https://x.example/foo/new".into(),
},
SourceChapterRef {
source_chapter_key: "a".into(),
number: 1,
title: Some("Ch.1".into()),
url: "https://x.example/foo/a".into(),
},
SourceChapterRef {
source_chapter_key: "b".into(),
number: 2,
title: Some("Ch.2".into()),
url: "https://x.example/foo/b".into(),
},
];
crawler::sync_manga_chapters(&pool, "target", up.manga_id, &second)
.await
.unwrap();
let rows: Vec<(String, Option<i32>)> = sqlx::query_as(
"SELECT cs.source_chapter_key, c.source_index \
FROM chapters c \
JOIN chapter_sources cs ON cs.chapter_id = c.id \
WHERE c.manga_id = $1 \
ORDER BY cs.source_chapter_key",
)
.bind(up.manga_id)
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(
rows,
vec![
("a".to_string(), Some(1)),
("b".to_string(), Some(2)),
("new".to_string(), Some(0)),
],
"new chapter takes index 0; existing rows' indexes increase by one on UPDATE",
);
}
#[sqlx::test(migrations = "./migrations")]
async fn list_for_manga_returns_source_order_reversed(pool: PgPool) {
crawler::ensure_source(&pool, "target", "T", "https://x.example")
.await
.unwrap();
let m = sample_manga("foo", "Foo Manga", "hash-1");
let up = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m)
.await
.unwrap();
// Site DOM order (top-down = newest-first):
// ch11 (number = 11)
// notice (number = 0, non-numeric label on the site)
// ch10 (number = 10)
// Numbers deliberately disagree with DOM order: a number-based sort
// would put notice first, but the site places it between ch10 and
// ch11. Reversed-DOM display should yield [ch10, notice, ch11].
let chapters = vec![
SourceChapterRef {
source_chapter_key: "ch11".into(),
number: 11,
title: Some("Ch.11 : Official".into()),
url: "https://x.example/foo/11".into(),
},
SourceChapterRef {
source_chapter_key: "notice".into(),
number: 0,
title: Some("notice. : Officials".into()),
url: "https://x.example/foo/notice".into(),
},
SourceChapterRef {
source_chapter_key: "ch10".into(),
number: 10,
title: Some("Ch.10 : Official".into()),
url: "https://x.example/foo/10".into(),
},
];
crawler::sync_manga_chapters(&pool, "target", up.manga_id, &chapters)
.await
.unwrap();
let listed = chapter_repo::list_for_manga(&pool, up.manga_id, 50, 0)
.await
.unwrap();
let titles: Vec<String> = listed
.iter()
.map(|c| c.title.clone().unwrap_or_default())
.collect();
assert_eq!(
titles,
vec![
"Ch.10 : Official".to_string(),
"notice. : Officials".to_string(),
"Ch.11 : Official".to_string(),
],
"list returns chapters in reversed source-DOM order, so the \
oldest appears first and non-numeric entries land where the \
site placed them",
);
}
#[sqlx::test(migrations = "./migrations")]
async fn list_for_manga_places_null_source_index_last(pool: PgPool) {
crawler::ensure_source(&pool, "target", "T", "https://x.example")
.await
.unwrap();
let m = sample_manga("foo", "Foo Manga", "hash-1");
let up = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m)
.await
.unwrap();
// Crawled chapters get source_index 0 and 1; the upload path leaves
// it NULL. NULLS LAST plus the (number, created_at) tail means the
// upload sits after both crawled rows even though its number is in
// the middle.
let crawled = vec![
SourceChapterRef {
source_chapter_key: "a".into(),
number: 1,
title: Some("Ch.1".into()),
url: "https://x.example/foo/a".into(),
},
SourceChapterRef {
source_chapter_key: "b".into(),
number: 3,
title: Some("Ch.3".into()),
url: "https://x.example/foo/b".into(),
},
];
crawler::sync_manga_chapters(&pool, "target", up.manga_id, &crawled)
.await
.unwrap();
chapter_repo::create(&pool, up.manga_id, 2, Some("User upload Ch.2"), None)
.await
.unwrap();
let listed = chapter_repo::list_for_manga(&pool, up.manga_id, 50, 0)
.await
.unwrap();
let titles: Vec<String> = listed
.iter()
.map(|c| c.title.clone().unwrap_or_default())
.collect();
assert_eq!(
titles,
vec![
"Ch.3".to_string(),
"Ch.1".to_string(),
"User upload Ch.2".to_string(),
],
"crawled rows ordered by reversed source_index; user upload \
(NULL source_index) falls through to the end",
);
}
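The two `list_for_manga` tests above describe the display order as `source_index DESC NULLS LAST`, falling through to number and `created_at`. A minimal sketch of that comparison as a Rust sort key follows. It assumes the fallback columns sort ascending, which the source comment does not spell out. `ChapterRow` and `display_order` are hypothetical names for illustration, not the repo's types.

```rust
use std::cmp::Reverse;

#[derive(Debug, Clone)]
struct ChapterRow {
    title: &'static str,
    number: i32,
    source_index: Option<i32>, // None for rows with no source row, e.g. user uploads
    created_at: i64,
}

/// Sorts rows the way the tests expect the chapter list to come back
/// and returns the titles in display order.
fn display_order(mut rows: Vec<ChapterRow>) -> Vec<&'static str> {
    rows.sort_by_key(|r| {
        (
            r.source_index.is_none(), // false < true, so indexed rows come first (NULLS LAST)
            Reverse(r.source_index),  // among indexed rows, higher index first (DESC)
            r.number,                 // assumed ascending tail
            r.created_at,             // assumed ascending tail
        )
    });
    rows.into_iter().map(|r| r.title).collect()
}
```

Because index 0 is the top of the source page (newest on this site), the descending sort puts the oldest crawled chapter first, and rows without an index trail behind regardless of their number.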


@@ -109,7 +109,7 @@ async fn dispatch_target_prefers_most_recent_live_source(pool: PgPool) {
seed_chapter_with_two_live_sources(&pool).await;
let row = dispatch_target(&pool, chapter_id).await.unwrap();
-let (_manga_id, source_url) =
+let (_manga_id, source_url, _title, _number) =
row.expect("two live sources should yield a dispatch target");
assert_eq!(
source_url, new_url,
@@ -133,7 +133,7 @@ async fn dispatch_target_skips_dropped_sources(pool: PgPool) {
.unwrap();
let row = dispatch_target(&pool, chapter_id).await.unwrap();
-let (_manga_id, source_url) =
+let (_manga_id, source_url, _title, _number) =
row.expect("a single live source should still yield a dispatch target");
assert!(
source_url != new_url,


@@ -0,0 +1,167 @@
import { test, expect, type Page } from '@playwright/test';
const mangaId = '33333333-3333-3333-3333-333333333333';
const chapter1Id = 'c1111111-3333-3333-3333-333333333333';
const chapter2Id = 'c2222222-3333-3333-3333-333333333333';
const chapter3Id = 'c3333333-3333-3333-3333-333333333333';
const mangaFixture = {
id: mangaId,
title: 'Vinland Saga',
author: 'Makoto Yukimura',
description: null,
cover_image_path: null,
created_at: '2026-01-01T00:00:00Z',
updated_at: '2026-01-01T00:00:00Z'
};
const chaptersFixture = [
{
id: chapter1Id,
manga_id: mangaId,
number: 1,
title: 'Somewhere, Not Here',
page_count: 1,
created_at: '2026-01-01T00:00:00Z'
},
{
id: chapter2Id,
manga_id: mangaId,
number: 2,
title: null,
page_count: 1,
created_at: '2026-01-02T00:00:00Z'
},
{
id: chapter3Id,
manga_id: mangaId,
number: 3,
title: 'Sword Dance',
page_count: 1,
created_at: '2026-01-03T00:00:00Z'
}
];
function pageFixture(chapterId: string) {
return [
{
id: `p1111111-${chapterId.slice(1, 8)}-3333-3333-333333333333`,
chapter_id: chapterId,
page_number: 1,
storage_key: `mangas/${mangaId}/chapters/${chapterId}/pages/0001.png`,
content_type: 'image/png'
}
];
}
async function mockReaderApis(page: Page) {
// Force public mode so the layout doesn't bounce anonymous visitors
// to /login (the dev backend on this machine runs with
// PRIVATE_MODE=true, which the layout's universal load respects).
await page.route('**/api/v1/auth/config', (route) =>
route.fulfill({
status: 200,
contentType: 'application/json',
body: JSON.stringify({ self_register_enabled: true, private_mode: false })
})
);
await page.route('**/api/v1/auth/me', (route) =>
route.fulfill({
status: 401,
contentType: 'application/json',
body: JSON.stringify({ error: { code: 'unauthenticated', message: '' } })
})
);
await page.route('**/api/v1/auth/me/preferences', (route) =>
route.fulfill({
status: 401,
contentType: 'application/json',
body: JSON.stringify({ error: { code: 'unauthenticated', message: '' } })
})
);
await page.route('**/api/v1/me/bookmarks*', (route) =>
route.fulfill({
status: 401,
contentType: 'application/json',
body: JSON.stringify({ error: { code: 'unauthenticated', message: '' } })
})
);
await page.route(`**/api/v1/mangas/${mangaId}`, (route) =>
route.fulfill({
status: 200,
contentType: 'application/json',
body: JSON.stringify(mangaFixture)
})
);
await page.route(new RegExp(`/api/v1/mangas/${mangaId}/chapters(\\?.*)?$`), (route) =>
route.fulfill({
status: 200,
contentType: 'application/json',
body: JSON.stringify({
items: chaptersFixture,
page: { limit: 200, offset: 0, total: chaptersFixture.length }
})
})
);
for (const c of chaptersFixture) {
await page.route(`**/api/v1/mangas/${mangaId}/chapters/${c.id}`, (route) =>
route.fulfill({
status: 200,
contentType: 'application/json',
body: JSON.stringify(c)
})
);
await page.route(
`**/api/v1/mangas/${mangaId}/chapters/${c.id}/pages`,
(route) =>
route.fulfill({
status: 200,
contentType: 'application/json',
body: JSON.stringify({ pages: pageFixture(c.id) })
})
);
}
const png = Buffer.from(
'89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c4890000000d49444154789c63000100000005000158a3b62a0000000049454e44ae426082',
'hex'
);
await page.route('**/api/v1/files/**', (route) =>
route.fulfill({ status: 200, contentType: 'image/png', body: png })
);
}
test('reader chapter select lists every chapter with the manga-detail-style label', async ({
page
}) => {
await mockReaderApis(page);
await page.goto(`/manga/${mangaId}/chapter/${chapter2Id}`);
const select = page.getByTestId('reader-chapter-select');
await expect(select).toBeVisible();
// The current chapter is preselected.
await expect(select).toHaveValue(chapter2Id);
// Each chapter rendered as "Ch. N — Title" (or "Ch. N" when title is null),
// in ascending number order — matching the prev/next sort.
const labels = await select.locator('option').allTextContents();
expect(labels.map((l) => l.trim())).toEqual([
'Ch. 1 — Somewhere, Not Here',
'Ch. 2',
'Ch. 3 — Sword Dance'
]);
});
test('choosing a chapter from the select navigates to that chapter', async ({ page }) => {
await mockReaderApis(page);
await page.goto(`/manga/${mangaId}/chapter/${chapter1Id}`);
await expect(page.getByTestId('reader-chapter-select')).toHaveValue(chapter1Id);
await page.getByTestId('reader-chapter-select').selectOption(chapter3Id);
await expect(page).toHaveURL(
new RegExp(`/manga/${mangaId}/chapter/${chapter3Id}$`)
);
await expect(page.getByTestId('reader-chapter-select')).toHaveValue(chapter3Id);
});


@@ -120,7 +120,7 @@ test('manga overview shows title, cover, and a chapter list', async ({ page }) =
await expect(page.getByTestId('manga-title')).toHaveText('Berserk');
await expect(page.getByTestId('manga-author')).toContainText('Kentaro Miura');
await expect(page.getByTestId('manga-cover')).toBeVisible();
await expect(page.getByTestId('chapter-list')).toContainText('Chapter 1');
await expect(page.getByTestId('chapter-list')).toContainText('The Brand');
await expect(page.getByTestId('bookmark-signin')).toBeVisible();
});


@@ -1,6 +1,6 @@
{
"name": "mangalord-frontend",
"version": "0.49.1",
"version": "0.54.0",
"private": true,
"type": "module",
"scripts": {


@@ -14,7 +14,19 @@ import {
createAdminUser,
listAdminMangas,
listAdminChapters,
getSystemStats
getSystemStats,
resyncManga,
resyncChapter,
getCrawlerStatus,
crawlerStatusStreamUrl,
runCrawlerPass,
restartCrawlerBrowser,
updateCrawlerSession,
clearCrawlerSessionExpired,
listDeadJobs,
requeueDeadJobs,
listActiveJobs,
listMissingCovers
} from './admin';
function ok(body: unknown, status = 200): Response {
@@ -242,4 +254,211 @@ describe('admin api client', () => {
const s = await getSystemStats();
expect(s.disk).toBeNull();
});
// ---- force resync ----
it('resyncManga POSTs to /v1/admin/mangas/{id}/resync and returns the envelope', async () => {
const resp = {
manga: {
id: 'm-1',
title: 'T',
status: 'ongoing',
alt_titles: [],
description: null,
cover_image_path: 'mangas/m-1/cover.jpg',
created_at: '2026-01-01T00:00:00Z',
updated_at: '2026-01-02T00:00:00Z',
authors: [],
genres: [],
tags: []
},
metadata_status: 'updated',
cover_fetched: true
};
fetchSpy.mockResolvedValueOnce(ok(resp));
const got = await resyncManga('m-1');
expect(got.metadata_status).toBe('updated');
expect(got.cover_fetched).toBe(true);
expect(got.manga.id).toBe('m-1');
const url = fetchSpy.mock.calls[0][0] as string;
expect(url).toMatch(/\/v1\/admin\/mangas\/m-1\/resync$/);
const init = fetchSpy.mock.calls[0][1] as RequestInit;
expect(init.method).toBe('POST');
});
it('resyncManga surfaces 503 service_unavailable when the daemon is off', async () => {
fetchSpy.mockResolvedValueOnce(
envelope(503, 'service_unavailable', 'crawler daemon is disabled')
);
await expect(resyncManga('m-1')).rejects.toMatchObject({
status: 503,
code: 'service_unavailable'
});
});
it('resyncChapter POSTs to /v1/admin/chapters/{id}/resync and returns the envelope', async () => {
const resp = {
chapter: {
id: 'c-1',
manga_id: 'm-1',
number: 1,
title: 'Foo',
page_count: 7,
created_at: '2026-01-01T00:00:00Z'
},
outcome: 'fetched',
pages: 7
};
fetchSpy.mockResolvedValueOnce(ok(resp));
const got = await resyncChapter('c-1');
expect(got.outcome).toBe('fetched');
expect(got.pages).toBe(7);
expect(got.chapter.page_count).toBe(7);
const url = fetchSpy.mock.calls[0][0] as string;
expect(url).toMatch(/\/v1\/admin\/chapters\/c-1\/resync$/);
const init = fetchSpy.mock.calls[0][1] as RequestInit;
expect(init.method).toBe('POST');
});
it('resyncChapter handles the "skipped" outcome envelope', async () => {
const resp = {
chapter: {
id: 'c-1',
manga_id: 'm-1',
number: 1,
title: null,
page_count: 7,
created_at: '2026-01-01T00:00:00Z'
},
outcome: 'skipped',
pages: null
};
fetchSpy.mockResolvedValueOnce(ok(resp));
const got = await resyncChapter('c-1');
expect(got.outcome).toBe('skipped');
expect(got.pages).toBeNull();
});
});
describe('admin crawler api client', () => {
let fetchSpy: MockInstance<typeof globalThis.fetch>;
beforeEach(() => {
fetchSpy = vi.spyOn(globalThis, 'fetch');
});
afterEach(() => {
vi.restoreAllMocks();
});
const statusFixture = {
daemon: 'running',
phase: { state: 'fetching_metadata', index: 3, total: 10, title: 'One Piece' },
worker_count: 2,
active_chapters: [
{
manga_id: 'm-1',
manga_title: 'Bleach',
chapter_id: 'c-1',
chapter_number: 12,
pages_done: 4,
pages_total: 20
}
],
current_cover: { manga_id: 'm-2', manga_title: 'Naruto' },
covers_queued: 7,
last_pass: { at: null, discovered: 0, upserted: 0, covers_fetched: 0, mangas_failed: 0 },
session: { expired: false, configured: true },
browser: 'healthy',
queue: { pending: 2, running: 1, dead: 4 }
};
it('crawlerStatusStreamUrl points at the SSE endpoint under the API base', () => {
expect(crawlerStatusStreamUrl()).toMatch(/\/v1\/admin\/crawler\/stream$/);
});
it('getCrawlerStatus GETs /v1/admin/crawler with live chapter/cover fields', async () => {
fetchSpy.mockResolvedValueOnce(ok(statusFixture));
const s = await getCrawlerStatus();
expect(s.queue.dead).toBe(4);
expect(s.phase?.state).toBe('fetching_metadata');
expect(s.active_chapters[0].pages_done).toBe(4);
expect(s.active_chapters[0].pages_total).toBe(20);
expect(s.current_cover?.manga_title).toBe('Naruto');
expect(s.covers_queued).toBe(7);
const url = fetchSpy.mock.calls[0][0] as string;
expect(url).toMatch(/\/v1\/admin\/crawler$/);
});
it('listActiveJobs GETs /v1/admin/crawler/active-jobs with search', async () => {
fetchSpy.mockResolvedValueOnce(
ok({ items: [], page: { limit: 20, offset: 0, total: 0 } })
);
await listActiveJobs({ search: 'bleach' });
const url = fetchSpy.mock.calls[0][0] as string;
expect(url).toMatch(/\/v1\/admin\/crawler\/active-jobs\?/);
expect(url).toContain('search=bleach');
});
it('listMissingCovers GETs /v1/admin/crawler/covers', async () => {
fetchSpy.mockResolvedValueOnce(
ok({ items: [{ manga_id: 'm-1', manga_title: 'X' }], page: { limit: 20, offset: 0, total: 1 } })
);
const r = await listMissingCovers();
expect(r.items[0].manga_title).toBe('X');
expect(fetchSpy.mock.calls[0][0]).toMatch(/\/v1\/admin\/crawler\/covers$/);
});
it('runCrawlerPass POSTs /v1/admin/crawler/run', async () => {
fetchSpy.mockResolvedValueOnce(ok({ started: true }));
const r = await runCrawlerPass();
expect(r.started).toBe(true);
const init = fetchSpy.mock.calls[0][1] as RequestInit;
expect(init.method).toBe('POST');
expect(fetchSpy.mock.calls[0][0]).toMatch(/\/v1\/admin\/crawler\/run$/);
});
it('restartCrawlerBrowser POSTs the restart endpoint', async () => {
fetchSpy.mockResolvedValueOnce(ok({ ok: true, error: null }));
const r = await restartCrawlerBrowser();
expect(r.ok).toBe(true);
expect(fetchSpy.mock.calls[0][0]).toMatch(/\/v1\/admin\/crawler\/browser\/restart$/);
});
it('updateCrawlerSession POSTs the phpsessid body', async () => {
fetchSpy.mockResolvedValueOnce(ok({ valid: true, error: null }));
const r = await updateCrawlerSession('abc123');
expect(r.valid).toBe(true);
const init = fetchSpy.mock.calls[0][1] as RequestInit;
expect(init.method).toBe('POST');
expect(JSON.parse(init.body as string)).toEqual({ phpsessid: 'abc123' });
});
it('clearCrawlerSessionExpired POSTs clear-expired', async () => {
fetchSpy.mockResolvedValueOnce(ok({ cleared: true }));
const r = await clearCrawlerSessionExpired();
expect(r.cleared).toBe(true);
expect(fetchSpy.mock.calls[0][0]).toMatch(/\/v1\/admin\/crawler\/session\/clear-expired$/);
});
it('listDeadJobs forwards search + pagination', async () => {
fetchSpy.mockResolvedValueOnce(
ok({ items: [], page: { limit: 20, offset: 20, total: 0 } })
);
await listDeadJobs({ search: 'naruto', limit: 20, offset: 20 });
const url = fetchSpy.mock.calls[0][0] as string;
expect(url).toContain('search=naruto');
expect(url).toContain('offset=20');
});
it('requeueDeadJobs POSTs the scope body', async () => {
fetchSpy.mockResolvedValueOnce(ok({ requeued: 3 }));
const r = await requeueDeadJobs({ scope: 'manga', manga_id: 'm-9' });
expect(r.requeued).toBe(3);
const init = fetchSpy.mock.calls[0][1] as RequestInit;
expect(JSON.parse(init.body as string)).toEqual({ scope: 'manga', manga_id: 'm-9' });
});
it('surfaces a 503 as ApiError', async () => {
fetchSpy.mockResolvedValueOnce(envelope(503, 'service_unavailable', 'disabled'));
await expect(runCrawlerPass()).rejects.toMatchObject({ status: 503 });
});
});


@@ -3,8 +3,10 @@
// won't reach these routes). 403s thrown here propagate up to the
// /admin layout, which renders the framework error page.
import { request, type Page } from './client';
import { request, apiUrl, type Page } from './client';
import type { User } from './auth';
import type { MangaDetail } from './mangas';
import type { Chapter } from './chapters';
// ---- users -----------------------------------------------------------------
@@ -176,3 +178,208 @@ export type SystemStats = {
export async function getSystemStats(): Promise<SystemStats> {
return request<SystemStats>('/v1/admin/system');
}
// ---- force resync ----------------------------------------------------------
export type MangaResyncResponse = {
manga: MangaDetail;
metadata_status: 'new' | 'updated' | 'unchanged';
cover_fetched: boolean;
};
export type ChapterResyncResponse = {
chapter: Chapter;
outcome: 'fetched' | 'skipped';
/** Page count when `outcome === 'fetched'`; null when skipped. */
pages: number | null;
};
/** POST /v1/admin/mangas/:id/resync: refetches metadata and cover from
* the manga's live crawler source. Long-running, because the single HTTP
* request waits on a Chromium navigation plus an image download, so the
* UI should disable the trigger and surface progress. */
export async function resyncManga(id: string): Promise<MangaResyncResponse> {
return request<MangaResyncResponse>(
`/v1/admin/mangas/${encodeURIComponent(id)}/resync`,
{ method: 'POST' }
);
}
/** POST /v1/admin/chapters/:id/resync — force-refetches a chapter's
* pages even if `page_count > 0`. Same long-running caveat as
* `resyncManga`. */
export async function resyncChapter(id: string): Promise<ChapterResyncResponse> {
return request<ChapterResyncResponse>(
`/v1/admin/chapters/${encodeURIComponent(id)}/resync`,
{ method: 'POST' }
);
}
// ---- crawler observability + control ---------------------------------------
/** Current daemon activity. Discriminated on `state`. */
export type CrawlerPhase =
| { state: 'idle'; next_fire: string | null }
| { state: 'walking_list' }
| { state: 'fetching_metadata'; index: number; total: number | null; title: string }
| { state: 'cover_backfill'; index: number; total: number };
/** A chapter being crawled right now, with a live page count. */
export type ActiveChapter = {
manga_id: string;
manga_title: string;
chapter_id: string;
chapter_number: number;
pages_done: number;
pages_total: number | null;
};
export type CrawlerLastPass = {
at: string | null;
discovered: number;
upserted: number;
covers_fetched: number;
mangas_failed: number;
};
export type CrawlerStatus = {
daemon: 'running' | 'disabled';
phase: CrawlerPhase | null;
worker_count: number;
active_chapters: ActiveChapter[];
current_cover: { manga_id: string; manga_title: string } | null;
covers_queued: number;
last_pass: CrawlerLastPass;
session: { expired: boolean; configured: boolean };
browser: 'healthy' | 'draining' | 'restarting' | 'down';
queue: { pending: number; running: number; dead: number };
};
export async function getCrawlerStatus(): Promise<CrawlerStatus> {
return request<CrawlerStatus>('/v1/admin/crawler');
}
/** URL of the Server-Sent Events live-status stream. Open with
* `new EventSource(...)` while the crawler page is mounted and close it on
* navigate-away so the subscription is scoped to the active page. Each
* message is a named `status` event whose `data` is a {@link CrawlerStatus}. */
export function crawlerStatusStreamUrl(): string {
return apiUrl('/v1/admin/crawler/stream');
}
/** POST /v1/admin/crawler/run — trigger an out-of-cycle metadata pass. */
export async function runCrawlerPass(): Promise<{ started: boolean }> {
return request('/v1/admin/crawler/run', { method: 'POST' });
}
/** POST /v1/admin/crawler/browser/restart — coordinated Chromium restart. */
export async function restartCrawlerBrowser(): Promise<{ ok: boolean; error: string | null }> {
return request('/v1/admin/crawler/browser/restart', { method: 'POST' });
}
/** POST /v1/admin/crawler/session — refresh PHPSESSID and re-probe. */
export async function updateCrawlerSession(
phpsessid: string
): Promise<{ valid: boolean; error: string | null }> {
return request('/v1/admin/crawler/session', {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ phpsessid })
});
}
/** POST /v1/admin/crawler/session/clear-expired — resume idled workers. */
export async function clearCrawlerSessionExpired(): Promise<{ cleared: boolean }> {
return request('/v1/admin/crawler/session/clear-expired', { method: 'POST' });
}
export type DeadJob = {
id: string;
kind: string;
chapter_id: string | null;
manga_id: string | null;
manga_title: string | null;
chapter_number: number | null;
attempts: number;
max_attempts: number;
last_error: string | null;
updated_at: string;
};
export type DeadJobsPage = { items: DeadJob[]; page: Page };
export async function listDeadJobs(opts?: {
search?: string;
limit?: number;
offset?: number;
}): Promise<DeadJobsPage> {
const params = new URLSearchParams();
if (opts?.search) params.set('search', opts.search);
if (opts?.limit != null) params.set('limit', String(opts.limit));
if (opts?.offset != null) params.set('offset', String(opts.offset));
const qs = params.toString();
return request<DeadJobsPage>(`/v1/admin/crawler/dead-jobs${qs ? `?${qs}` : ''}`);
}
/** Requeue scope: all dead jobs, one manga's, one chapter's, or a single job. */
export type RequeueScope =
| { scope: 'all' }
| { scope: 'manga'; manga_id: string }
| { scope: 'chapter'; chapter_id: string }
| { scope: 'job'; job_id: string };
export async function requeueDeadJobs(scope: RequeueScope): Promise<{ requeued: number }> {
return request('/v1/admin/crawler/dead-jobs/requeue', {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(scope)
});
}
/** A pending or running chapter-content job (one row of the queued-chapters backlog). */
export type ActiveJob = {
id: string;
chapter_id: string | null;
manga_id: string | null;
manga_title: string | null;
chapter_number: number | null;
state: 'pending' | 'running';
attempts: number;
max_attempts: number;
updated_at: string;
};
export type ActiveJobsPage = { items: ActiveJob[]; page: Page };
/** GET /v1/admin/crawler/active-jobs — which chapters of which mangas are
* queued or running now. */
export async function listActiveJobs(opts?: {
search?: string;
limit?: number;
offset?: number;
}): Promise<ActiveJobsPage> {
const params = new URLSearchParams();
if (opts?.search) params.set('search', opts.search);
if (opts?.limit != null) params.set('limit', String(opts.limit));
if (opts?.offset != null) params.set('offset', String(opts.offset));
const qs = params.toString();
return request<ActiveJobsPage>(`/v1/admin/crawler/active-jobs${qs ? `?${qs}` : ''}`);
}
/** A manga queued for a cover fetch: it has no cover yet and has a live source. */
export type MissingCover = { manga_id: string; manga_title: string };
export type MissingCoversPage = { items: MissingCover[]; page: Page };
/** GET /v1/admin/crawler/covers — which manga covers are queued. */
export async function listMissingCovers(opts?: {
search?: string;
limit?: number;
offset?: number;
}): Promise<MissingCoversPage> {
const params = new URLSearchParams();
if (opts?.search) params.set('search', opts.search);
if (opts?.limit != null) params.set('limit', String(opts.limit));
if (opts?.offset != null) params.set('offset', String(opts.offset));
const qs = params.toString();
return request<MissingCoversPage>(`/v1/admin/crawler/covers${qs ? `?${qs}` : ''}`);
}
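
The three list helpers in this file (`listDeadJobs`, `listActiveJobs`, `listMissingCovers`) assemble `search`, `limit`, and `offset` in exactly the same way. As a minimal sketch, and not part of this change, the shared logic could be pulled into one function. `ListQuery` and `withListQuery` are hypothetical names introduced here for illustration only.

```typescript
// Hypothetical option shape mirroring the `opts` parameter of the list helpers.
type ListQuery = { search?: string; limit?: number; offset?: number };

// Appends whichever options are set to `path` as a query string.
// An empty `search` is dropped (truthiness check), while `limit` and
// `offset` use `!= null` so that an explicit 0 is still sent.
function withListQuery(path: string, opts?: ListQuery): string {
    const params = new URLSearchParams();
    if (opts?.search) params.set('search', opts.search);
    if (opts?.limit != null) params.set('limit', String(opts.limit));
    if (opts?.offset != null) params.set('offset', String(opts.offset));
    const qs = params.toString();
    return qs ? `${path}?${qs}` : path;
}
```

With such a helper each list function would reduce to a single `request(withListQuery(...))` call, and the existing URL assertions in the client tests would apply unchanged.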


@@ -11,7 +11,8 @@ import {
listChapters,
getChapter,
getChapterPages,
createChapter
createChapter,
chapterLabel
} from './chapters';
function ok(body: unknown): Response {
@@ -129,6 +130,18 @@ describe('chapters api client', () => {
}
});
describe('chapterLabel', () => {
it('returns the site title verbatim when present', () => {
expect(chapterLabel({ number: 7, title: 'Ch.7 : Official' })).toBe(
'Ch.7 : Official'
);
});
it('falls back to "Chapter {number}" when title is null', () => {
expect(chapterLabel({ number: 3, title: null })).toBe('Chapter 3');
});
});
it('getChapterPages unwraps the {pages} envelope into the array', async () => {
fetchSpy.mockResolvedValueOnce(
ok({


@@ -14,6 +14,10 @@ export type ChaptersPage = {
page: Page;
};
export function chapterLabel(c: Pick<Chapter, 'number' | 'title'>): string {
return c.title ?? `Chapter ${c.number}`;
}
export type ListOptions = {
limit?: number;
offset?: number;
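
`chapterLabel` relies on `??`, which falls back only when the left side is `null` or `undefined`. The sketch below spells out the three cases. The `Chapter` type here is an assumed subset of the real one, and whether the API can ever return an empty-string title is not stated in this diff.

```typescript
// Assumed subset of the real `Chapter` type, for illustration only.
type Chapter = { number: number; title: string | null };

function chapterLabel(c: Pick<Chapter, 'number' | 'title'>): string {
    return c.title ?? `Chapter ${c.number}`;
}

// A present title is returned verbatim, even if it already contains the number.
const titled = chapterLabel({ number: 7, title: 'Ch.7 : Official' });
// A null title falls back to the chapter number.
const untitled = chapterLabel({ number: 3, title: null });
// An empty string is not nullish, so it is returned as-is.
const empty = chapterLabel({ number: 5, title: '' });
```

If blank titles should also fall back to the number, `||` or an explicit `trim()` check would be needed instead of `??`.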


@@ -12,6 +12,15 @@ export function fileUrl(key: string): string {
return `${BASE}/v1/files/${key}`;
}
/**
* Builds an API URL for non-`fetch` consumers (e.g. `EventSource` for SSE),
* applying the same `VITE_API_BASE` prefix as `request()`. `path` is the
* route after the base, e.g. `/v1/admin/crawler/stream`.
*/
export function apiUrl(path: string): string {
return `${BASE}${path}`;
}
export class ApiError extends Error {
constructor(
public readonly status: number,
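
`apiUrl` exists so that non-`fetch` consumers such as `EventSource` get the same base prefix as `request()`. A minimal sketch of the two pure pieces involved is shown here: building the URL and defensively parsing one SSE frame. The `BASE` value and the `parseFrame` helper are assumptions made for illustration; the real `BASE` is derived from `VITE_API_BASE` in the client module.

```typescript
// Assumed value for the sketch; not the project's actual configuration.
const BASE = '/api';

function apiUrl(path: string): string {
    return `${BASE}${path}`;
}

// Hypothetical helper: parses the `data` field of one SSE message as JSON.
// Returns null for a malformed frame so the caller can skip it and wait
// for the next one.
function parseFrame<T>(data: string): T | null {
    try {
        return JSON.parse(data) as T;
    } catch {
        return null;
    }
}
```

In the browser the URL would be passed to `new EventSource(apiUrl('/v1/admin/crawler/stream'), { withCredentials: true })`, with each `status` event's `data` run through a parser like the one above.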


@@ -6,6 +6,7 @@
{ href: '/admin', label: 'Overview' },
{ href: '/admin/users', label: 'Users' },
{ href: '/admin/mangas', label: 'Mangas' },
{ href: '/admin/crawler', label: 'Crawler' },
{ href: '/admin/system', label: 'System' }
];
</script>


@@ -0,0 +1,838 @@
<script lang="ts">
import { onMount, onDestroy } from 'svelte';
import Modal from '$lib/components/Modal.svelte';
import Pager from '$lib/components/Pager.svelte';
import {
getCrawlerStatus,
crawlerStatusStreamUrl,
runCrawlerPass,
restartCrawlerBrowser,
updateCrawlerSession,
clearCrawlerSessionExpired,
listDeadJobs,
requeueDeadJobs,
listActiveJobs,
listMissingCovers,
type CrawlerStatus,
type CrawlerPhase,
type DeadJob,
type ActiveJob,
type MissingCover,
type RequeueScope
} from '$lib/api/admin';
let status: CrawlerStatus | null = $state(null);
let error: string | null = $state(null);
let notice: string | null = $state(null);
let live = $state(false);
let source: EventSource | null = null;
let busy = $state(false);
// Dead jobs
let deadJobs: DeadJob[] = $state([]);
let deadTotal = $state(0);
let deadSearch = $state('');
let deadPage = $state(1);
const DEAD_LIMIT = 20;
// Queued chapters (pending/running)
let activeJobs: ActiveJob[] = $state([]);
let activeTotal = $state(0);
let activeSearch = $state('');
let activePage = $state(1);
const ACTIVE_LIMIT = 20;
// Queued covers (mangas missing a cover)
let covers: MissingCover[] = $state([]);
let coversTotal = $state(0);
let coversSearch = $state('');
let coversPage = $state(1);
const COVERS_LIMIT = 20;
// Modals
let sessionModalOpen = $state(false);
let restartModalOpen = $state(false);
let phpsessid = $state('');
let sessionResult: string | null = $state(null);
async function refresh() {
try {
status = await getCrawlerStatus();
error = null;
} catch (e) {
error = e instanceof Error ? e.message : 'refresh failed';
}
}
async function loadDeadJobs() {
try {
const resp = await listDeadJobs({
search: deadSearch.trim() || undefined,
limit: DEAD_LIMIT,
offset: (deadPage - 1) * DEAD_LIMIT
});
deadJobs = resp.items;
deadTotal = resp.page.total ?? resp.items.length;
} catch (e) {
error = e instanceof Error ? e.message : 'failed to load dead jobs';
}
}
async function loadActiveJobs() {
try {
const resp = await listActiveJobs({
search: activeSearch.trim() || undefined,
limit: ACTIVE_LIMIT,
offset: (activePage - 1) * ACTIVE_LIMIT
});
activeJobs = resp.items;
activeTotal = resp.page.total ?? resp.items.length;
} catch (e) {
error = e instanceof Error ? e.message : 'failed to load queued chapters';
}
}
async function loadCovers() {
try {
const resp = await listMissingCovers({
search: coversSearch.trim() || undefined,
limit: COVERS_LIMIT,
offset: (coversPage - 1) * COVERS_LIMIT
});
covers = resp.items;
coversTotal = resp.page.total ?? resp.items.length;
} catch (e) {
error = e instanceof Error ? e.message : 'failed to load queued covers';
}
}
// Auto-refresh the (fetched, not streamed) backlog lists when the live
// status shows the relevant counts moved — keeps the lists feeling live
// without pushing big payloads over SSE. `$effect` re-runs when these
// tracked values change.
let lastQueueKey = $state('');
let lastCoversKey = $state(-1);
$effect(() => {
const k = `${status?.queue.pending ?? 0}:${status?.queue.running ?? 0}`;
if (k !== lastQueueKey) {
lastQueueKey = k;
loadActiveJobs();
}
});
$effect(() => {
const c = status?.covers_queued ?? -1;
if (c !== lastCoversKey) {
lastCoversKey = c;
loadCovers();
}
});
// Live updates via Server-Sent Events instead of polling. The
// EventSource is opened on mount and closed on destroy, so the
// subscription exists only while this page is showing live data.
function openStream() {
const es = new EventSource(crawlerStatusStreamUrl(), { withCredentials: true });
es.addEventListener('status', (e) => {
try {
status = JSON.parse((e as MessageEvent).data) as CrawlerStatus;
error = null;
live = true;
} catch {
// ignore a malformed frame; the next one will replace it
}
});
es.onopen = () => {
live = true;
};
es.onerror = () => {
// The browser auto-reconnects; reflect the gap in the UI.
live = false;
};
source = es;
}
onMount(() => {
// One-shot fetch for instant initial paint + resilience if SSE is
// blocked; the stream then drives subsequent updates.
refresh();
loadDeadJobs();
openStream();
});
onDestroy(() => {
source?.close();
source = null;
});
async function withBusy(label: string, fn: () => Promise<void>) {
busy = true;
notice = null;
error = null;
try {
await fn();
} catch (e) {
error = e instanceof Error ? e.message : `${label} failed`;
} finally {
busy = false;
await refresh();
}
}
async function onRunPass() {
await withBusy('run pass', async () => {
await runCrawlerPass();
notice = 'Metadata pass started.';
});
}
async function onConfirmRestart() {
restartModalOpen = false;
await withBusy('restart browser', async () => {
const r = await restartCrawlerBrowser();
notice = r.ok ? 'Browser restarted.' : `Restart failed: ${r.error ?? 'unknown'}`;
});
}
async function onSaveSession() {
sessionResult = null;
busy = true;
try {
const r = await updateCrawlerSession(phpsessid);
sessionResult = r.valid
? '✓ Session valid — workers resumed.'
: `✕ Probe failed: ${r.error ?? 'unauthenticated'}`;
if (r.valid) {
sessionModalOpen = false;
phpsessid = '';
notice = 'Session updated.';
}
} catch (e) {
sessionResult = e instanceof Error ? e.message : 'update failed';
} finally {
busy = false;
await refresh();
}
}
async function onClearExpired() {
await withBusy('clear expired', async () => {
await clearCrawlerSessionExpired();
notice = 'Session-expired flag cleared.';
});
}
async function requeue(scope: RequeueScope) {
await withBusy('requeue', async () => {
const r = await requeueDeadJobs(scope);
notice = `Requeued ${r.requeued} job(s).`;
await loadDeadJobs();
});
}
function onSearchDead() {
deadPage = 1;
loadDeadJobs();
}
function onDeadPageChange(p: number) {
deadPage = p;
loadDeadJobs();
}
function onSearchActive() {
activePage = 1;
loadActiveJobs();
}
function onActivePageChange(p: number) {
activePage = p;
loadActiveJobs();
}
function onSearchCovers() {
coversPage = 1;
loadCovers();
}
function onCoversPageChange(p: number) {
coversPage = p;
loadCovers();
}
// ---- display helpers ----
function phaseLabel(p: CrawlerPhase | null): string {
if (!p) return 'Daemon disabled';
switch (p.state) {
case 'idle':
return p.next_fire
? `Idle — next pass ${new Date(p.next_fire).toLocaleString()}`
: 'Idle';
case 'walking_list':
return 'Walking source list';
case 'fetching_metadata':
return `Fetching metadata · ${p.index}/${p.total ?? '?'} · ${p.title}`;
case 'cover_backfill':
return `Backfilling covers · ${p.index + 1}/${p.total}`;
}
}
function phasePercent(p: CrawlerPhase | null): number | null {
if (p && p.state === 'fetching_metadata' && p.total && p.total > 0) {
return Math.min(100, (p.index / p.total) * 100);
}
return null;
}
function sessionPill(s: CrawlerStatus): { cls: string; text: string } {
if (s.daemon === 'disabled') return { cls: 'badge-not_downloaded', text: 'n/a' };
if (s.session.expired) return { cls: 'badge-in_progress', text: 'Expired' };
if (!s.session.configured) return { cls: 'badge-not_downloaded', text: 'Not set' };
return { cls: 'badge-synced', text: 'OK' };
}
function browserPill(s: CrawlerStatus): { cls: string; text: string } {
switch (s.browser) {
case 'healthy':
return { cls: 'badge-synced', text: 'Up' };
case 'draining':
case 'restarting':
return { cls: 'badge-in_progress', text: s.browser };
default:
return { cls: 'badge-not_downloaded', text: 'Down' };
}
}
const deadTotalPages = $derived(Math.max(1, Math.ceil(deadTotal / DEAD_LIMIT)));
const activeTotalPages = $derived(Math.max(1, Math.ceil(activeTotal / ACTIVE_LIMIT)));
const coversTotalPages = $derived(Math.max(1, Math.ceil(coversTotal / COVERS_LIMIT)));
function chapterPercent(c: { pages_done: number; pages_total: number | null }): number | null {
return c.pages_total && c.pages_total > 0
? Math.min(100, (c.pages_done / c.pages_total) * 100)
: null;
}
</script>
<div class="titlebar">
<h1>Crawler</h1>
<span class="livedot" class:on={live} title={live ? 'Live (SSE)' : 'Reconnecting…'}>
{live ? '● live' : '○ reconnecting…'}
</span>
</div>
{#if error}
<p class="error" role="alert">{error}</p>
{/if}
{#if notice}
<p class="notice" role="status">{notice}</p>
{/if}
{#if status}
<!-- Status hero -->
<section class="hero" data-testid="crawler-hero">
<div class="pills">
<span class="pill"
>Daemon
<span class={`badge ${status.daemon === 'running' ? 'badge-synced' : 'badge-not_downloaded'}`}
>{status.daemon}</span
></span
>
<span class="pill"
>Session <span class={`badge ${sessionPill(status).cls}`}>{sessionPill(status).text}</span></span
>
<span class="pill"
>Browser <span class={`badge ${browserPill(status).cls}`}>{browserPill(status).text}</span></span
>
</div>
<p class="phase" data-testid="crawler-phase">{phaseLabel(status.phase)}</p>
{#if phasePercent(status.phase) !== null}
{@render Bar({ percent: phasePercent(status.phase) ?? 0 })}
{/if}
{#if status.session.expired}
<p class="warn">
⚠ Chapter downloads paused — session expired. Metadata + list crawl continue.
</p>
{/if}
{#if status.current_cover}
<p class="cover" data-testid="current-cover">
🖼 Fetching cover: <strong>{status.current_cover.manga_title}</strong>
</p>
{/if}
<p class="lastpass">
Last pass:
{#if status.last_pass.at}
{new Date(status.last_pass.at).toLocaleString()} ·
{status.last_pass.discovered} seen · {status.last_pass.upserted} upserted ·
{status.last_pass.mangas_failed} failed
{:else}
— none yet this session
{/if}
</p>
</section>
<!-- Controls -->
<section class="controls">
<button onclick={onRunPass} disabled={busy || status.daemon !== 'running'}
>Run metadata pass now</button
>
<button onclick={() => (restartModalOpen = true)} disabled={busy || status.daemon !== 'running'}
>Restart browser</button
>
<button onclick={() => { sessionModalOpen = true; sessionResult = null; }} disabled={busy || status.daemon !== 'running'}
>Manage session…</button
>
{#if status.session.expired}
<button onclick={onClearExpired} disabled={busy}>Clear expired flag</button>
{/if}
</section>
<!-- Queue + covers stats -->
<section class="grid2">
<article>
<h2>Queue</h2>
<dl>
<dt>Pending</dt>
<dd>{status.queue.pending}</dd>
<dt>Running</dt>
<dd>{status.queue.running}</dd>
<dt>Dead</dt>
<dd>{status.queue.dead}</dd>
<dt>Covers queued</dt>
<dd>{status.covers_queued}</dd>
</dl>
</article>
<article>
<h2>Active chapters ({status.active_chapters.length}/{status.worker_count})</h2>
{#if status.active_chapters.length === 0}
<p class="muted">idle — no chapters downloading</p>
{:else}
<table class="active">
<tbody>
{#each status.active_chapters as c (c.chapter_id)}
<tr>
<td>{c.manga_title} · ch.{c.chapter_number}</td>
<td class="pagecount" data-testid="active-pages">
{c.pages_done}/{c.pages_total ?? '?'}
</td>
<td class="pagebar">
{#if chapterPercent(c) !== null}
{@render Bar({ percent: chapterPercent(c) ?? 0 })}
{/if}
</td>
</tr>
{/each}
</tbody>
</table>
{/if}
</article>
</section>
{:else}
<p>Loading…</p>
{/if}
<!-- Queued chapters (pending/running backlog) -->
<section class="backlog">
<div class="deadhead">
<h2>Queued chapters ({activeTotal})</h2>
<div class="deadtools">
<input
placeholder="Search manga…"
bind:value={activeSearch}
onkeydown={(e) => e.key === 'Enter' && onSearchActive()}
/>
<button onclick={onSearchActive}>Search</button>
</div>
</div>
{#if activeJobs.length === 0}
<p class="muted">No chapters queued.</p>
{:else}
<table class="dead">
<thead>
<tr>
<th>Manga / Chapter</th>
<th>State</th>
<th>Att.</th>
</tr>
</thead>
<tbody>
{#each activeJobs as j (j.id)}
<tr>
<td>
{j.manga_title ?? '(unknown)'}
{#if j.chapter_number != null}· ch.{j.chapter_number}{/if}
</td>
<td>
<span
class={`badge ${j.state === 'running' ? 'badge-downloading' : 'badge-not_downloaded'}`}
>{j.state}</span
>
</td>
<td>{j.attempts}/{j.max_attempts}</td>
</tr>
{/each}
</tbody>
</table>
<Pager page={activePage} totalPages={activeTotalPages} onChange={onActivePageChange} />
{/if}
</section>
<!-- Queued covers (mangas missing a cover) -->
<section class="backlog">
<div class="deadhead">
<h2>Queued covers ({coversTotal})</h2>
<div class="deadtools">
<input
placeholder="Search manga…"
bind:value={coversSearch}
onkeydown={(e) => e.key === 'Enter' && onSearchCovers()}
/>
<button onclick={onSearchCovers}>Search</button>
</div>
</div>
{#if covers.length === 0}
<p class="muted">No covers queued 🎉</p>
{:else}
<table class="dead">
<thead>
<tr><th>Manga</th></tr>
</thead>
<tbody>
{#each covers as c (c.manga_id)}
<tr><td>{c.manga_title}</td></tr>
{/each}
</tbody>
</table>
<Pager page={coversPage} totalPages={coversTotalPages} onChange={onCoversPageChange} />
{/if}
</section>
<!-- Dead jobs -->
<section class="deadjobs">
<div class="deadhead">
<h2>Dead jobs ({deadTotal})</h2>
<div class="deadtools">
<input
placeholder="Search manga…"
bind:value={deadSearch}
onkeydown={(e) => e.key === 'Enter' && onSearchDead()}
/>
<button onclick={onSearchDead}>Search</button>
<button
onclick={() => requeue({ scope: 'all' })}
disabled={busy || deadTotal === 0}>Requeue all ({deadTotal})</button
>
</div>
</div>
{#if deadJobs.length === 0}
<p class="muted">No dead jobs 🎉</p>
{:else}
<table class="dead">
<thead>
<tr>
<th>Manga / Chapter</th>
<th>Att.</th>
<th>Failed</th>
<th>Last error</th>
<th class="actions">Action</th>
</tr>
</thead>
<tbody>
{#each deadJobs as j (j.id)}
<tr>
<td>
{j.manga_title ?? '(unknown)'}
{#if j.chapter_number != null}· ch.{j.chapter_number}{/if}
</td>
<td>{j.attempts}/{j.max_attempts}</td>
<td>{new Date(j.updated_at).toLocaleDateString()}</td>
<td class="err" title={j.last_error ?? ''}>{j.last_error ?? '—'}</td>
<td class="actions">
<button onclick={() => requeue({ scope: 'job', job_id: j.id })} disabled={busy}
>Requeue</button
>
{#if j.manga_id}
<button
class="secondary"
onclick={() => requeue({ scope: 'manga', manga_id: j.manga_id! })}
disabled={busy}>Manga</button
>
{/if}
</td>
</tr>
{/each}
</tbody>
</table>
<Pager page={deadPage} totalPages={deadTotalPages} onChange={onDeadPageChange} />
{/if}
</section>
<!-- Restart confirm modal -->
<Modal open={restartModalOpen} title="Restart browser" onClose={() => (restartModalOpen = false)} size="sm">
{#snippet children()}
<p>This relaunches Chromium and re-injects the session cookie.</p>
<ul class="coord">
<li>In-flight jobs are allowed to finish (bounded), then forced.</li>
<li>New jobs pause until the relaunch completes.</li>
<li>The metadata pass yields at its next checkpoint.</li>
</ul>
{/snippet}
{#snippet footer()}
<button onclick={() => (restartModalOpen = false)}>Cancel</button>
<button class="primary" onclick={onConfirmRestart} disabled={busy}>Restart</button>
{/snippet}
</Modal>
<!-- Session modal -->
<Modal open={sessionModalOpen} title="Manage crawler session" onClose={() => (sessionModalOpen = false)} size="md">
{#snippet children()}
<label for="phpsessid">PHPSESSID</label>
<input id="phpsessid" type="password" bind:value={phpsessid} autocomplete="off" />
<p class="hint">
Saving rewrites the cookie everywhere, persists it, restarts the browser, and re-probes.
</p>
{#if sessionResult}
<p class="sessionresult">{sessionResult}</p>
{/if}
{/snippet}
{#snippet footer()}
<button onclick={() => (sessionModalOpen = false)}>Cancel</button>
<button class="primary" onclick={onSaveSession} disabled={busy || phpsessid.trim() === ''}
>Save &amp; validate</button
>
{/snippet}
</Modal>
{#snippet Bar({ percent }: { percent: number })}
{@const clamped = Math.min(100, Math.max(0, percent))}
<div class="bar" role="progressbar" aria-valuenow={clamped} aria-valuemin="0" aria-valuemax="100">
<div class="fill" style:width="{clamped}%"></div>
<span class="label">{clamped.toFixed(0)}%</span>
</div>
{/snippet}
<style>
h1 {
margin: 0;
}
.titlebar {
display: flex;
align-items: baseline;
gap: var(--space-3);
margin-bottom: var(--space-4);
}
.livedot {
font-size: var(--font-sm);
color: var(--text-muted);
}
.livedot.on {
color: var(--success, #0a7d2c);
}
h2 {
margin: 0 0 var(--space-3) 0;
font-size: var(--font-sm);
color: var(--text-muted);
text-transform: uppercase;
letter-spacing: 0.04em;
}
.hero {
padding: var(--space-4);
border: 1px solid var(--border);
border-radius: var(--radius-md);
background: var(--surface);
margin-bottom: var(--space-4);
}
.pills {
display: flex;
gap: var(--space-4);
flex-wrap: wrap;
margin-bottom: var(--space-3);
}
.pill {
font-size: var(--font-sm);
color: var(--text-muted);
display: inline-flex;
align-items: center;
gap: var(--space-2);
}
.phase {
font-size: var(--font-lg);
font-weight: var(--weight-semibold);
margin: var(--space-2) 0;
}
.lastpass,
.hint {
color: var(--text-muted);
font-size: var(--font-sm);
}
.warn {
color: #92400e;
background: #fef3c7;
border: 1px solid #fcd34d;
padding: var(--space-2) var(--space-3);
border-radius: var(--radius-md);
font-size: var(--font-sm);
}
.controls {
display: flex;
gap: var(--space-2);
flex-wrap: wrap;
margin-bottom: var(--space-4);
}
.grid2 {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(16rem, 1fr));
gap: var(--space-3);
margin-bottom: var(--space-4);
}
article {
padding: var(--space-3);
border: 1px solid var(--border);
border-radius: var(--radius-md);
background: var(--surface);
}
dl {
display: grid;
grid-template-columns: max-content 1fr;
gap: var(--space-1) var(--space-3);
margin: 0;
font-size: var(--font-sm);
}
dt {
color: var(--text-muted);
}
dd {
margin: 0;
font-family: var(--font-mono, monospace);
}
table {
width: 100%;
border-collapse: collapse;
}
th,
td {
padding: var(--space-2);
text-align: left;
border-bottom: 1px solid var(--border);
font-size: var(--font-sm);
}
.actions {
text-align: right;
}
.mono {
font-family: var(--font-mono, monospace);
font-size: var(--font-xs);
}
.err {
max-width: 22rem;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
color: var(--text-muted);
}
.deadhead {
display: flex;
justify-content: space-between;
align-items: center;
gap: var(--space-3);
flex-wrap: wrap;
}
.deadtools {
display: flex;
gap: var(--space-2);
}
button.secondary {
background: var(--surface-elevated);
}
.notice {
color: var(--success, #0a7d2c);
padding: var(--space-2) var(--space-3);
border: 1px solid var(--success, #0a7d2c);
border-radius: var(--radius-md);
margin-bottom: var(--space-3);
}
.sessionresult {
margin-top: var(--space-2);
font-size: var(--font-sm);
}
.coord {
margin: var(--space-2) 0;
padding-left: var(--space-4);
font-size: var(--font-sm);
color: var(--text-muted);
}
/* badges (shared convention with admin/mangas) */
:global(.badge) {
display: inline-block;
padding: 0 var(--space-2);
border-radius: var(--radius-sm, 4px);
font-size: var(--font-xs);
font-weight: var(--weight-semibold);
text-transform: uppercase;
letter-spacing: 0.04em;
border: 1px solid var(--border);
background: var(--surface);
}
:global(.badge-synced) {
background: #dcfce7;
color: #166534;
border-color: #86efac;
}
:global(.badge-in_progress),
:global(.badge-downloading) {
background: #fef3c7;
color: #92400e;
border-color: #fcd34d;
}
:global(.badge-not_downloaded) {
background: var(--surface-elevated);
color: var(--text-muted);
}
.bar {
position: relative;
background: var(--surface-elevated);
border-radius: var(--radius-sm, 4px);
height: 1.5rem;
margin: var(--space-2) 0;
overflow: hidden;
}
.fill {
height: 100%;
background: #22c55e;
transition: width 0.3s ease;
}
.label {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
font-size: var(--font-xs);
font-weight: var(--weight-semibold);
}
.error {
color: var(--danger, #dc2626);
padding: var(--space-2) var(--space-3);
border: 1px solid var(--danger, #dc2626);
border-radius: var(--radius-md);
margin-bottom: var(--space-3);
}
.muted {
color: var(--text-muted);
}
.cover {
font-size: var(--font-sm);
}
.backlog {
margin-top: var(--space-4);
}
.pagecount {
font-family: var(--font-mono, monospace);
font-size: var(--font-xs);
white-space: nowrap;
}
.pagebar {
width: 8rem;
}
table.active td {
vertical-align: middle;
}
</style>
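
`chapterPercent` is called by the Active-chapters table above, but its body is not part of this excerpt. One way such a helper could behave, assuming `pages_total` may be null until the page count is known (as the `?? '?'` fallback in the template suggests). The name `chapterPercentSketch` and the `ActiveChapter` shape are illustrative assumptions, not code from this commit:

```typescript
// Assumed shape: only the two fields the template reads for the page counter.
interface ActiveChapter {
  pages_done: number;
  pages_total: number | null;
}

// Returns a 0..100 percentage, or null when the total is unknown or
// non-positive, so the caller can skip rendering the bar entirely.
function chapterPercentSketch(c: ActiveChapter): number | null {
  if (c.pages_total == null || c.pages_total <= 0) return null;
  const raw = (c.pages_done / c.pages_total) * 100;
  // Clamp in case the counter briefly overshoots the reported total.
  return Math.min(100, Math.max(0, raw));
}

console.log(chapterPercentSketch({ pages_done: 3, pages_total: 12 }));
console.log(chapterPercentSketch({ pages_done: 5, pages_total: null }));
```

Returning null rather than 0 for an unknown total keeps "not started" and "total not yet known" distinguishable in the table.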

@@ -3,6 +3,7 @@
import {
listAdminMangas,
listAdminChapters,
requeueDeadJobs,
type AdminMangasPage,
type AdminChapterRow,
type MangaSyncState
@@ -59,6 +60,27 @@
function badgeClass(state: string): string {
return `badge badge-${state}`;
}
let requeuingChapter: string | null = $state(null);
/** Requeue the dead job(s) for a single failed chapter, then refresh
* that manga's chapter list so the pill updates. */
async function requeueChapter(mangaId: string, chapterId: string) {
requeuingChapter = chapterId;
error = null;
try {
await requeueDeadJobs({ scope: 'chapter', chapter_id: chapterId });
const resp = await listAdminChapters(mangaId, { limit: 500 });
chaptersByManga[mangaId] = {
items: resp.items,
total: resp.page.total ?? resp.items.length
};
} catch (e) {
error = e instanceof ApiError ? e.message : 'requeue failed';
} finally {
requeuingChapter = null;
}
}
</script>
<h1>Mangas</h1>
@@ -153,6 +175,16 @@
<span class={badgeClass(c.sync_state)}>
{c.sync_state}
</span>
{#if c.sync_state === 'failed'}
<button
class="requeue"
onclick={() => requeueChapter(m.id, c.id)}
disabled={requeuingChapter === c.id}
title="Requeue this chapter"
>
↻ requeue
</button>
{/if}
</td>
</tr>
{/each}
@@ -272,6 +304,11 @@
color: #991b1b;
border-color: #fca5a5;
}
.requeue {
margin-left: var(--space-2);
font-size: var(--font-xs);
padding: 0 var(--space-2);
}
.badge-not_downloaded {
background: var(--surface-elevated);
color: var(--text-muted);

@@ -1,13 +1,16 @@
<script lang="ts">
-import { fileUrl } from '$lib/api/client';
+import { fileUrl, ApiError } from '$lib/api/client';
import { createBookmark, deleteBookmark, type Bookmark } from '$lib/api/bookmarks';
import {
attachTag,
detachTag,
type AuthorRef,
type GenreRef,
type MangaDetail,
type TagRef
} from '$lib/api/mangas';
import { resyncManga } from '$lib/api/admin';
import { chapterLabel } from '$lib/api/chapters';
import { listTags, type Tag } from '$lib/api/tags';
import { session } from '$lib/session.svelte';
import Chip from '$lib/components/Chip.svelte';
@@ -16,9 +19,15 @@
import FolderPlus from '@lucide/svelte/icons/folder-plus';
import Pencil from '@lucide/svelte/icons/pencil';
import UploadCloud from '@lucide/svelte/icons/upload-cloud';
import RefreshCw from '@lucide/svelte/icons/refresh-cw';
let { data } = $props();
-const manga = $derived(data.manga);
+// `manga` is locally overridable so a successful force resync can
+// swap in the refreshed detail (new cover URL, refreshed status,
+// etc.) without a router reload. Falls back to the server-loaded
+// data otherwise.
+let mangaOverride = $state<MangaDetail | null>(null);
+const manga = $derived<MangaDetail>(mangaOverride ?? data.manga);
const chapters = $derived(data.chapters);
const readProgress = $derived(data.readProgress);
/** Chapter row from the local chapters list when present (so we
@@ -37,6 +46,11 @@
continueChapter?.number ?? readProgress?.chapter_number ?? null
);
const continueChapterTitle = $derived(continueChapter?.title ?? null);
const continueLabel = $derived(
continueChapterNumber != null
? chapterLabel({ number: continueChapterNumber, title: continueChapterTitle })
: null
);
const authors = $derived<AuthorRef[]>(manga.authors);
const genres = $derived<GenreRef[]>(manga.genres);
@@ -171,6 +185,31 @@
const statusLabel = $derived(manga.status === 'completed' ? 'Completed' : 'Ongoing');
let collectionModalOpen = $state(false);
// ---- Admin force resync ----
let resyncBusy = $state(false);
let resyncMessage = $state<{ kind: 'ok' | 'err'; text: string } | null>(null);
async function forceResync() {
if (!session.user?.is_admin || resyncBusy) return;
resyncBusy = true;
resyncMessage = null;
try {
const r = await resyncManga(manga.id);
mangaOverride = r.manga;
const coverNote = r.cover_fetched
? ' Cover re-downloaded.'
: ' Cover unchanged.';
resyncMessage = {
kind: 'ok',
text: `Metadata ${r.metadata_status}.${coverNote}`
};
} catch (e) {
const msg = e instanceof ApiError || e instanceof Error ? e.message : String(e);
resyncMessage = { kind: 'err', text: msg };
} finally {
resyncBusy = false;
}
}
</script>
<svelte:head>
@@ -344,7 +383,34 @@
<UploadCloud size={16} aria-hidden="true" />
<span>Upload chapter</span>
</a>
{#if session.user.is_admin}
<button
type="button"
class="action"
onclick={forceResync}
disabled={resyncBusy}
title="Refetch metadata + cover from the crawler source"
data-testid="force-resync-manga"
>
<RefreshCw
size={16}
aria-hidden="true"
class={resyncBusy ? 'spin' : ''}
/>
<span>{resyncBusy ? 'Resyncing…' : 'Force resync'}</span>
</button>
{/if}
</div>
{#if resyncMessage}
<p
class="resync-msg"
class:err={resyncMessage.kind === 'err'}
role="status"
data-testid="force-resync-message"
>
{resyncMessage.text}
</p>
{/if}
{:else}
<a class="action" href="/login" data-testid="bookmark-signin">
Sign in to bookmark or collect
@@ -371,7 +437,7 @@
>
<span class="continue-label">Continue reading</span>
<span class="continue-target">
-Chapter {continueChapterNumber}{#if continueChapterTitle}: {continueChapterTitle}{/if}
+{continueLabel}
{#if readProgress && readProgress.page > 1}
— page {readProgress.page}
{/if}
@@ -385,7 +451,7 @@
{#each chapters as c (c.id)}
<li>
<a href="/manga/{manga.id}/chapter/{c.id}">
-Chapter {c.number}{#if c.title}: {c.title}{/if}
+{chapterLabel(c)}
</a>
<span class="pages">({c.page_count} pages)</span>
</li>
@@ -586,6 +652,29 @@
color: var(--text);
}
.resync-msg {
margin-top: var(--space-2);
color: var(--text-muted);
font-size: var(--font-sm);
}
.resync-msg.err {
color: var(--danger);
}
:global(.spin) {
animation: spin 0.9s linear infinite;
}
@keyframes spin {
from {
transform: rotate(0deg);
}
to {
transform: rotate(360deg);
}
}
.continue {
display: flex;
flex-direction: column;

@@ -1,10 +1,12 @@
<script lang="ts">
import { onMount, onDestroy } from 'svelte';
-import { goto } from '$app/navigation';
-import { fileUrl } from '$lib/api/client';
+import { goto, invalidateAll } from '$app/navigation';
+import { fileUrl, ApiError } from '$lib/api/client';
import { GAP_PX, type ReaderPageGap } from '$lib/api/preferences';
import { preferences } from '$lib/preferences.svelte';
import { updateReadProgress } from '$lib/api/read_progress';
import { chapterLabel } from '$lib/api/chapters';
import { resyncChapter } from '$lib/api/admin';
import { readerFullscreen } from '$lib/reader-fullscreen.svelte';
import { session } from '$lib/session.svelte';
import ChevronLeft from '@lucide/svelte/icons/chevron-left';
@@ -15,6 +17,7 @@
import ScrollText from '@lucide/svelte/icons/scroll-text';
import Maximize2 from '@lucide/svelte/icons/maximize-2';
import Minimize2 from '@lucide/svelte/icons/minimize-2';
import RefreshCw from '@lucide/svelte/icons/refresh-cw';
let { data } = $props();
const manga = $derived(data.manga);
@@ -26,28 +29,25 @@
const gapPx = $derived(GAP_PX[preferences.readerPageGap]);
const pageTitle = $derived(
-chapter.title
-? `Mangalord | ${manga.title} · Ch. ${chapter.number}: ${chapter.title}`
-: `Mangalord | ${manga.title} · Ch. ${chapter.number}`
+`Mangalord | ${manga.title} · ${chapterLabel(chapter)}`
);
// Prev/next chapter computed from the chapter list. listChapters
-// returns chapters in number ASC order; we still resolve via find
-// rather than index because the current chapter's position may
-// not be `chapter.number - 1` (sparse numbering / chapter 0.5 /
-// future skipped numbers).
-const sortedChapters = $derived(
-[...chapters].sort((a, b) => a.number - b.number)
-);
+// returns chapters in display order (reversed source-site order, so
+// oldest first — see backend repo::chapter::list_for_manga), and
+// prev/next walks that order positionally. Resolving the current
+// index via `find` rather than `chapter.number - 1` matters because
+// numbers aren't a reliable index: variants share numbers, non-
+// numeric entries pin to 0, and uploads can sparse-fill.
const currentIdx = $derived(
-sortedChapters.findIndex((c) => c.id === chapter.id)
+chapters.findIndex((c) => c.id === chapter.id)
);
const prevChapter = $derived(
-currentIdx > 0 ? sortedChapters[currentIdx - 1] : null
+currentIdx > 0 ? chapters[currentIdx - 1] : null
);
const nextChapter = $derived(
-currentIdx >= 0 && currentIdx < sortedChapters.length - 1
-? sortedChapters[currentIdx + 1]
+currentIdx >= 0 && currentIdx < chapters.length - 1
+? chapters[currentIdx + 1]
: null
);
@@ -256,6 +256,36 @@
if (typeof window !== 'undefined') window.removeEventListener('keydown', onKeydown);
});
// ---- Admin force resync (current chapter) ----
let resyncBusy = $state(false);
let resyncMessage = $state<{ kind: 'ok' | 'err'; text: string } | null>(null);
async function forceResync() {
if (!session.user?.is_admin || resyncBusy) return;
resyncBusy = true;
resyncMessage = null;
try {
const r = await resyncChapter(chapter.id);
if (r.outcome === 'fetched') {
resyncMessage = {
kind: 'ok',
text: `Refetched ${r.pages} page${r.pages === 1 ? '' : 's'}. Reloading…`
};
// Re-run all loaders for this route so the reader picks
// up the freshly-downloaded pages. The page.ts loader
// doesn't `depends()` on anything explicitly, so
// invalidateAll is the right brush here.
await invalidateAll();
} else {
resyncMessage = { kind: 'ok', text: 'No new pages — source had nothing fresh.' };
}
} catch (e) {
const msg = e instanceof ApiError || e instanceof Error ? e.message : String(e);
resyncMessage = { kind: 'err', text: msg };
} finally {
resyncBusy = false;
}
}
// ---- Reading progress tracking ----
//
// High-water mark seeded from the server: progress only ever moves
@@ -427,6 +457,27 @@
</a>
<div class="controls" role="group" aria-label="reader options">
<label class="chapter-field">
<span class="visually-hidden">Jump to chapter</span>
<select
class="chapter-select"
value={chapter.id}
onchange={(e) => {
const target = (e.currentTarget as HTMLSelectElement).value;
if (target && target !== chapter.id) {
void goto(`/manga/${manga.id}/chapter/${target}`);
}
}}
data-testid="reader-chapter-select"
>
{#each chapters as c (c.id)}
<option value={c.id}>
{chapterLabel(c)}
</option>
{/each}
</select>
</label>
<div class="mode-toggle" role="radiogroup" aria-label="layout">
<button
type="button"
@@ -481,6 +532,23 @@
{/if}
</span>
{#if session.user?.is_admin}
<button
type="button"
class="reader-resync"
onclick={forceResync}
disabled={resyncBusy}
title={resyncMessage?.kind === 'err'
? resyncMessage.text
: 'Force refetch this chapter from the crawler source'}
aria-label="Force resync chapter"
data-testid="force-resync-chapter"
>
<RefreshCw size={16} aria-hidden="true" class={resyncBusy ? 'spin' : ''} />
<span>{resyncBusy ? 'Resyncing…' : 'Force resync'}</span>
</button>
{/if}
<button
type="button"
class="fullscreen-toggle"
@@ -494,6 +562,17 @@
</button>
</nav>
{#if resyncMessage}
<p
class="resync-toast"
class:err={resyncMessage.kind === 'err'}
role="status"
data-testid="force-resync-message"
>
{resyncMessage.text}
</p>
{/if}
<!--
Floating exit affordance — only rendered while focus mode is on.
Lives in the top-right corner with a low resting opacity so it
@@ -604,7 +683,7 @@
</span>
</button>
<span class="chapter-bar-current" aria-hidden="true">
-Ch. {chapter.number}{#if chapter.title}{chapter.title}{/if}
+{chapterLabel(chapter)}
</span>
<button
type="button"
@@ -741,7 +820,8 @@
outline-offset: -2px;
}
-.gap-field select {
+.gap-field select,
+.chapter-select {
height: 32px;
padding: 0 var(--space-2);
background: var(--surface);
@@ -751,6 +831,13 @@
font-size: var(--font-sm);
}
/* Cap the chapter dropdown's resting width so long titles don't
push the rest of the nav off-screen; the native control's
expanded menu still shows full option text on focus. */
.chapter-select {
max-width: 16rem;
}
.visually-hidden {
position: absolute;
width: 1px;
@@ -911,7 +998,8 @@
}
/* ===== Focus-mode controls ===== */
-.fullscreen-toggle {
+.fullscreen-toggle,
+.reader-resync {
display: inline-flex;
align-items: center;
gap: var(--space-1);
@@ -925,12 +1013,52 @@
font-size: var(--font-xs);
}
-.fullscreen-toggle:hover {
+.fullscreen-toggle:hover,
+.reader-resync:hover:not(:disabled) {
background: var(--surface-elevated);
color: var(--text);
border-color: var(--primary);
}
.reader-resync:disabled {
opacity: 0.7;
cursor: progress;
}
.resync-toast {
position: fixed;
top: calc(var(--app-header-h) + var(--reader-nav-h, 48px) + var(--space-2));
right: var(--space-3);
z-index: 11;
margin: 0;
padding: var(--space-2) var(--space-3);
max-width: min(420px, calc(100vw - 2 * var(--space-3)));
background: var(--surface);
color: var(--text);
border: 1px solid var(--primary);
border-radius: var(--radius-md);
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.12);
font-size: var(--font-sm);
}
.resync-toast.err {
border-color: var(--danger);
color: var(--danger);
}
:global(.spin) {
animation: spin 0.9s linear infinite;
}
@keyframes spin {
from {
transform: rotate(0deg);
}
to {
transform: rotate(360deg);
}
}
/* Small floating exit affordance — corner-pinned, low resting
opacity so it doesn't sit on the chapter image too aggressively
but is still findable without hover. */
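
The reader change above resolves prev/next by position in the list returned by `listChapters`, locating the current chapter by id instead of deriving an index from its number. A minimal standalone sketch of that lookup, assuming only that each chapter carries an `id`. `chapterNeighbors` and `ChapterRef` are hypothetical names used for illustration and do not appear in this diff:

```typescript
// Hypothetical minimal row; the real chapter objects carry more fields.
interface ChapterRef {
  id: string;
  number: number;
}

interface Neighbors<T> {
  prev: T | null;
  next: T | null;
}

// Walk the list in the order it was given. The caller is assumed to pass
// chapters already in display order, so no re-sorting by number happens here.
function chapterNeighbors<T extends { id: string }>(
  chapters: readonly T[],
  currentId: string
): Neighbors<T> {
  const idx = chapters.findIndex((c) => c.id === currentId);
  if (idx < 0) return { prev: null, next: null };
  return {
    prev: idx > 0 ? (chapters[idx - 1] ?? null) : null,
    next: idx < chapters.length - 1 ? (chapters[idx + 1] ?? null) : null
  };
}

// Two entries share number 1 (a variant), which is why a number-derived
// index or a sort by number would not give a stable neighbour.
const sample: ChapterRef[] = [
  { id: 'a', number: 0 },
  { id: 'b', number: 1 },
  { id: 'c', number: 1 },
  { id: 'd', number: 3 }
];
const around = chapterNeighbors(sample, 'c');
console.log(around.prev?.id, around.next?.id);
```

Keeping the lookup positional means the reader's prev/next buttons and the chapter dropdown agree on ordering, since both iterate the same `chapters` array.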

@@ -1,5 +1,6 @@
<script lang="ts">
import { fileUrl } from '$lib/api/client';
import { chapterLabel } from '$lib/api/chapters';
import { clearReadProgress, type ReadProgressSummary } from '$lib/api/read_progress';
import BookImage from '@lucide/svelte/icons/book-image';
import Trash2 from '@lucide/svelte/icons/trash-2';
@@ -186,7 +187,7 @@
<a href="/manga/{u.manga_id}" class="title">{u.manga_title}</a>
<span class="target">
<a href="/manga/{u.manga_id}/chapter/{u.chapter.id}">
-Chapter {u.chapter.number}{#if u.chapter.title}: {u.chapter.title}{/if}
+{chapterLabel(u.chapter)}
</a>
<span class="muted">({u.chapter.page_count} pages)</span>
</span>