Compare commits
10 Commits
docs/hando
...
feat/crawl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e02d125f51 | ||
|
|
fb4182f68d | ||
|
|
da6e320836 | ||
|
|
832042d2b7 | ||
|
|
ec0a8f2b5d | ||
|
|
6f0a8d88c9 | ||
|
|
41bf9455a1 | ||
|
|
cd0a1e13a9 | ||
|
|
3f91bea768 | ||
|
|
7a6815661f |
2
backend/Cargo.lock
generated
2
backend/Cargo.lock
generated
@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
|
||||
|
||||
[[package]]
|
||||
name = "mangalord"
|
||||
version = "0.52.0"
|
||||
version = "0.54.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"argon2",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "mangalord"
|
||||
version = "0.52.0"
|
||||
version = "0.54.0"
|
||||
edition = "2021"
|
||||
default-run = "mangalord"
|
||||
|
||||
|
||||
491
backend/src/api/admin/crawler.rs
Normal file
491
backend/src/api/admin/crawler.rs
Normal file
@@ -0,0 +1,491 @@
|
||||
//! Admin-only crawler observability + control endpoints.
|
||||
//!
|
||||
//! Mounted under `/api/v1/admin/crawler*`, cookie-only via `RequireAdmin`.
|
||||
//! All control endpoints return 503 when the crawler daemon is disabled
|
||||
//! (`AppState.crawler == None`). Reads compose the live in-process status
|
||||
//! ([`crate::crawler::status`]) with DB-derived queue counts and the
|
||||
//! session/browser flags.
|
||||
|
||||
use std::convert::Infallible;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::extract::{Query, State};
|
||||
use axum::response::sse::{Event, KeepAlive, Sse};
|
||||
use axum::routing::{get, post};
|
||||
use axum::{Json, Router};
|
||||
use futures_util::stream::Stream;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::app::{AppState, CrawlerControl};
|
||||
use crate::auth::extractor::RequireAdmin;
|
||||
use crate::crawler::browser_manager::RestartPhase;
|
||||
use crate::crawler::status::{ActiveChapter, CoverTarget, LastPass, Phase};
|
||||
use crate::error::{AppError, AppResult};
|
||||
use crate::repo;
|
||||
use crate::repo::crawler::{ActiveJob, DeadJob, MissingCoverRow, RequeueScope};
|
||||
|
||||
/// Backstop recompose interval for the SSE stream. Phase/worker/session
|
||||
/// changes push instantly via the status `watch`; this only bounds the
|
||||
/// staleness of DB-derived queue counts and the browser phase when those
|
||||
/// change without an accompanying status poke.
|
||||
const SSE_BACKSTOP: Duration = Duration::from_secs(5);
|
||||
|
||||
pub fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/admin/crawler", get(get_status))
|
||||
.route("/admin/crawler/stream", get(stream_status))
|
||||
.route("/admin/crawler/run", post(run_now))
|
||||
.route("/admin/crawler/browser/restart", post(restart_browser))
|
||||
.route("/admin/crawler/session", post(update_session))
|
||||
.route(
|
||||
"/admin/crawler/session/clear-expired",
|
||||
post(clear_session_expired),
|
||||
)
|
||||
.route("/admin/crawler/dead-jobs", get(list_dead_jobs))
|
||||
.route("/admin/crawler/dead-jobs/requeue", post(requeue_dead_jobs))
|
||||
.route("/admin/crawler/active-jobs", get(list_active_jobs))
|
||||
.route("/admin/crawler/covers", get(list_covers))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// GET /admin/crawler — live status
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct QueueCounts {
|
||||
pending: i64,
|
||||
running: i64,
|
||||
dead: i64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct SessionStatus {
|
||||
/// Whether the sticky session-expired flag is set (chapter workers idle).
|
||||
expired: bool,
|
||||
/// Whether a PHPSESSID is currently configured at all.
|
||||
configured: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct CrawlerStatusResponse {
|
||||
/// `"running"` | `"disabled"`.
|
||||
daemon: &'static str,
|
||||
phase: Option<Phase>,
|
||||
/// Configured chapter-worker count (for "N busy / M workers").
|
||||
worker_count: usize,
|
||||
/// Chapters being crawled right now, with live page counts.
|
||||
active_chapters: Vec<ActiveChapter>,
|
||||
/// The cover being fetched right now, if any.
|
||||
current_cover: Option<CoverTarget>,
|
||||
/// Mangas still queued for a cover fetch.
|
||||
covers_queued: i64,
|
||||
last_pass: LastPass,
|
||||
session: SessionStatus,
|
||||
/// `"healthy"` | `"draining"` | `"restarting"` | `"down"`.
|
||||
browser: &'static str,
|
||||
queue: QueueCounts,
|
||||
}
|
||||
|
||||
fn browser_phase_str(p: RestartPhase) -> &'static str {
|
||||
match p {
|
||||
RestartPhase::Healthy => "healthy",
|
||||
RestartPhase::Draining => "draining",
|
||||
RestartPhase::Restarting => "restarting",
|
||||
}
|
||||
}
|
||||
|
||||
/// Compose a full status snapshot from the in-memory status, the
|
||||
/// browser/session flags, and a fresh DB queue-count query. Shared by the
|
||||
/// one-shot `get_status` and the SSE `stream_status`.
|
||||
async fn compose_status(state: &AppState) -> AppResult<CrawlerStatusResponse> {
|
||||
let (pending, running, dead) = repo::crawler::job_state_counts(&state.db).await?;
|
||||
let queue = QueueCounts {
|
||||
pending,
|
||||
running,
|
||||
dead,
|
||||
};
|
||||
let covers_queued = repo::crawler::count_missing_covers(&state.db).await?;
|
||||
|
||||
Ok(match state.crawler.as_ref() {
|
||||
None => CrawlerStatusResponse {
|
||||
daemon: "disabled",
|
||||
phase: None,
|
||||
worker_count: 0,
|
||||
active_chapters: Vec::new(),
|
||||
current_cover: None,
|
||||
covers_queued,
|
||||
last_pass: LastPass::default(),
|
||||
session: SessionStatus {
|
||||
expired: false,
|
||||
configured: false,
|
||||
},
|
||||
browser: "down",
|
||||
queue,
|
||||
},
|
||||
Some(c) => {
|
||||
let snap = c.status.snapshot().await;
|
||||
CrawlerStatusResponse {
|
||||
daemon: "running",
|
||||
phase: Some(snap.phase),
|
||||
worker_count: snap.worker_count,
|
||||
active_chapters: snap.active_chapters,
|
||||
current_cover: snap.current_cover,
|
||||
covers_queued,
|
||||
last_pass: snap.last_pass,
|
||||
session: SessionStatus {
|
||||
expired: c.session.is_expired(),
|
||||
configured: c.session.current().await.is_some(),
|
||||
},
|
||||
browser: browser_phase_str(c.browser_manager.phase()),
|
||||
queue,
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_status(
|
||||
State(state): State<AppState>,
|
||||
_admin: RequireAdmin,
|
||||
) -> AppResult<Json<CrawlerStatusResponse>> {
|
||||
Ok(Json(compose_status(&state).await?))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// GET /admin/crawler/stream — Server-Sent Events live status
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Push live status to the dashboard instead of polling. Emits a snapshot
|
||||
/// immediately on connect, then on every status change (instant, via the
|
||||
/// `watch` notifier) and on a [`SSE_BACKSTOP`] tick (to refresh DB queue
|
||||
/// counts / browser phase that change without a status poke). The browser
|
||||
/// opens this only while the crawler page is mounted and closes it on
|
||||
/// navigate-away, so the subscription is scoped to the active page.
|
||||
async fn stream_status(
|
||||
State(state): State<AppState>,
|
||||
_admin: RequireAdmin,
|
||||
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
|
||||
// Subscribe before the first emit so no change between the initial
|
||||
// snapshot and the first await is lost.
|
||||
let rx = state.crawler.as_ref().map(|c| c.status.subscribe());
|
||||
|
||||
let stream = futures_util::stream::unfold(
|
||||
(state, rx, true),
|
||||
|(state, mut rx, first)| async move {
|
||||
// After the first immediate emit, wait for a change or the
|
||||
// backstop tick before recomposing.
|
||||
if !first {
|
||||
match rx.as_mut() {
|
||||
Some(rx) => {
|
||||
tokio::select! {
|
||||
_ = rx.changed() => {}
|
||||
_ = tokio::time::sleep(SSE_BACKSTOP) => {}
|
||||
}
|
||||
}
|
||||
None => tokio::time::sleep(SSE_BACKSTOP).await,
|
||||
}
|
||||
}
|
||||
// Compose; on a transient DB error, emit a keep-alive comment
|
||||
// rather than tearing down the stream.
|
||||
let event = match compose_status(&state).await {
|
||||
Ok(resp) => Event::default()
|
||||
.event("status")
|
||||
.json_data(&resp)
|
||||
.unwrap_or_else(|_| Event::default().comment("serialize error")),
|
||||
Err(_) => Event::default().comment("status unavailable"),
|
||||
};
|
||||
Some((Ok(event), (state, rx, false)))
|
||||
},
|
||||
);
|
||||
|
||||
Sse::new(stream).keep_alive(KeepAlive::default())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// POST /admin/crawler/run — trigger an out-of-cycle metadata pass
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct RunResponse {
|
||||
started: bool,
|
||||
}
|
||||
|
||||
async fn run_now(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
) -> AppResult<Json<RunResponse>> {
|
||||
let c = require_crawler(&state)?;
|
||||
let mp = c.metadata_pass.as_ref().ok_or_else(|| {
|
||||
AppError::ServiceUnavailable("no source configured (CRAWLER_START_URL unset)".into())
|
||||
})?;
|
||||
let mp = std::sync::Arc::clone(mp);
|
||||
// Fire-and-forget: the pass can run for minutes; the dashboard polls
|
||||
// status for progress. Overlap with the daily cron is rare (daily) and
|
||||
// both serialise on the single browser lease.
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = mp.run().await {
|
||||
tracing::warn!(error = ?e, "manual metadata pass failed");
|
||||
}
|
||||
});
|
||||
repo::admin_audit::insert(&state.db, admin.0.id, "crawler_run", "crawler", None, json!({}))
|
||||
.await?;
|
||||
Ok(Json(RunResponse { started: true }))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// POST /admin/crawler/browser/restart — coordinated restart
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct RestartResponse {
|
||||
ok: bool,
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
async fn restart_browser(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
) -> AppResult<Json<RestartResponse>> {
|
||||
let c = require_crawler(&state)?;
|
||||
let result = c.browser_manager.coordinated_restart(c.drain_deadline).await;
|
||||
// A successful coordinated_restart re-runs on_launch, which re-injects
|
||||
// PHPSESSID and re-probes — i.e. the session is live. Drop the sticky
|
||||
// `session_expired` flag so chapter workers stop idling without
|
||||
// requiring a second click on "Clear expired".
|
||||
if result.is_ok() {
|
||||
c.session.clear_expired();
|
||||
}
|
||||
// Push the post-restart browser phase to live subscribers immediately.
|
||||
c.status.poke();
|
||||
repo::admin_audit::insert(
|
||||
&state.db,
|
||||
admin.0.id,
|
||||
"crawler_browser_restart",
|
||||
"crawler",
|
||||
None,
|
||||
json!({ "ok": result.is_ok() }),
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(match result {
|
||||
Ok(()) => RestartResponse {
|
||||
ok: true,
|
||||
error: None,
|
||||
},
|
||||
Err(e) => RestartResponse {
|
||||
ok: false,
|
||||
error: Some(format!("{e:#}")),
|
||||
},
|
||||
}))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// POST /admin/crawler/session — refresh PHPSESSID
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct UpdateSessionRequest {
|
||||
phpsessid: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct UpdateSessionResponse {
|
||||
/// Whether the post-update browser relaunch + session probe succeeded.
|
||||
valid: bool,
|
||||
error: Option<String>,
|
||||
}
|
||||
|
||||
async fn update_session(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
Json(body): Json<UpdateSessionRequest>,
|
||||
) -> AppResult<Json<UpdateSessionResponse>> {
|
||||
let c = require_crawler(&state)?;
|
||||
c.session
|
||||
.update(&body.phpsessid)
|
||||
.await
|
||||
.map_err(|e| AppError::InvalidInput(format!("{e:#}")))?;
|
||||
// Relaunch the browser so on_launch re-injects the new cookie and
|
||||
// re-probes — the restart's success IS the session-validity signal.
|
||||
let probe = c.browser_manager.coordinated_restart(c.drain_deadline).await;
|
||||
// Session + browser state changed — push to live subscribers.
|
||||
c.status.poke();
|
||||
repo::admin_audit::insert(
|
||||
&state.db,
|
||||
admin.0.id,
|
||||
"crawler_session_update",
|
||||
"crawler",
|
||||
None,
|
||||
json!({ "valid": probe.is_ok() }),
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(match probe {
|
||||
Ok(()) => UpdateSessionResponse {
|
||||
valid: true,
|
||||
error: None,
|
||||
},
|
||||
Err(e) => UpdateSessionResponse {
|
||||
valid: false,
|
||||
error: Some(format!("{e:#}")),
|
||||
},
|
||||
}))
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct ClearExpiredResponse {
|
||||
cleared: bool,
|
||||
}
|
||||
|
||||
async fn clear_session_expired(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
) -> AppResult<Json<ClearExpiredResponse>> {
|
||||
let c = require_crawler(&state)?;
|
||||
c.session.clear_expired();
|
||||
// session.expired flipped — push to live subscribers.
|
||||
c.status.poke();
|
||||
repo::admin_audit::insert(
|
||||
&state.db,
|
||||
admin.0.id,
|
||||
"crawler_session_clear_expired",
|
||||
"crawler",
|
||||
None,
|
||||
json!({}),
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(ClearExpiredResponse { cleared: true }))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Dead jobs
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Deserialize, Default)]
|
||||
struct DeadJobsParams {
|
||||
#[serde(default)]
|
||||
search: Option<String>,
|
||||
#[serde(default = "default_limit")]
|
||||
limit: i64,
|
||||
#[serde(default)]
|
||||
offset: i64,
|
||||
}
|
||||
|
||||
fn default_limit() -> i64 {
|
||||
50
|
||||
}
|
||||
|
||||
async fn list_dead_jobs(
|
||||
State(state): State<AppState>,
|
||||
_admin: RequireAdmin,
|
||||
Query(params): Query<DeadJobsParams>,
|
||||
) -> AppResult<Json<crate::api::pagination::PagedResponse<DeadJob>>> {
|
||||
let limit = params.limit.clamp(1, 200);
|
||||
let offset = params.offset.max(0);
|
||||
let search = params.search.filter(|s| !s.trim().is_empty());
|
||||
let (items, total) =
|
||||
repo::crawler::list_dead_jobs(&state.db, search.as_deref(), limit, offset).await?;
|
||||
Ok(Json(crate::api::pagination::PagedResponse::with_total(
|
||||
items, limit, offset, total,
|
||||
)))
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[serde(tag = "scope", rename_all = "snake_case")]
|
||||
enum RequeueRequest {
|
||||
All,
|
||||
Manga { manga_id: Uuid },
|
||||
Chapter { chapter_id: Uuid },
|
||||
Job { job_id: Uuid },
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct RequeueResponse {
|
||||
requeued: u64,
|
||||
}
|
||||
|
||||
async fn requeue_dead_jobs(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
Json(body): Json<RequeueRequest>,
|
||||
) -> AppResult<Json<RequeueResponse>> {
|
||||
let scope = match &body {
|
||||
RequeueRequest::All => RequeueScope::All,
|
||||
RequeueRequest::Manga { manga_id } => RequeueScope::Manga(*manga_id),
|
||||
RequeueRequest::Chapter { chapter_id } => RequeueScope::Chapter(*chapter_id),
|
||||
RequeueRequest::Job { job_id } => RequeueScope::Job(*job_id),
|
||||
};
|
||||
let requeued = repo::crawler::requeue_dead_jobs(&state.db, scope).await?;
|
||||
repo::admin_audit::insert(
|
||||
&state.db,
|
||||
admin.0.id,
|
||||
"crawler_dead_jobs_requeue",
|
||||
"crawler",
|
||||
None,
|
||||
json!({ "requeued": requeued, "scope": scope_label(&body) }),
|
||||
)
|
||||
.await?;
|
||||
Ok(Json(RequeueResponse { requeued }))
|
||||
}
|
||||
|
||||
fn scope_label(r: &RequeueRequest) -> &'static str {
|
||||
match r {
|
||||
RequeueRequest::All => "all",
|
||||
RequeueRequest::Manga { .. } => "manga",
|
||||
RequeueRequest::Chapter { .. } => "chapter",
|
||||
RequeueRequest::Job { .. } => "job",
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Queued-chapters + queued-covers backlogs (paginated, fetched on demand)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Pagination + title-search params shared by the backlog list endpoints.
|
||||
#[derive(Debug, Deserialize, Default)]
|
||||
struct ListParams {
|
||||
#[serde(default)]
|
||||
search: Option<String>,
|
||||
#[serde(default = "default_limit")]
|
||||
limit: i64,
|
||||
#[serde(default)]
|
||||
offset: i64,
|
||||
}
|
||||
|
||||
async fn list_active_jobs(
|
||||
State(state): State<AppState>,
|
||||
_admin: RequireAdmin,
|
||||
Query(params): Query<ListParams>,
|
||||
) -> AppResult<Json<crate::api::pagination::PagedResponse<ActiveJob>>> {
|
||||
let limit = params.limit.clamp(1, 200);
|
||||
let offset = params.offset.max(0);
|
||||
let search = params.search.filter(|s| !s.trim().is_empty());
|
||||
let (items, total) =
|
||||
repo::crawler::list_active_jobs(&state.db, search.as_deref(), limit, offset).await?;
|
||||
Ok(Json(crate::api::pagination::PagedResponse::with_total(
|
||||
items, limit, offset, total,
|
||||
)))
|
||||
}
|
||||
|
||||
async fn list_covers(
|
||||
State(state): State<AppState>,
|
||||
_admin: RequireAdmin,
|
||||
Query(params): Query<ListParams>,
|
||||
) -> AppResult<Json<crate::api::pagination::PagedResponse<MissingCoverRow>>> {
|
||||
let limit = params.limit.clamp(1, 200);
|
||||
let offset = params.offset.max(0);
|
||||
let search = params.search.filter(|s| !s.trim().is_empty());
|
||||
let (items, total) =
|
||||
repo::crawler::list_missing_cover_mangas(&state.db, search.as_deref(), limit, offset)
|
||||
.await?;
|
||||
Ok(Json(crate::api::pagination::PagedResponse::with_total(
|
||||
items, limit, offset, total,
|
||||
)))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn require_crawler(state: &AppState) -> Result<&std::sync::Arc<CrawlerControl>, AppError> {
|
||||
state.crawler.as_ref().ok_or_else(|| {
|
||||
AppError::ServiceUnavailable("crawler daemon is disabled".into())
|
||||
})
|
||||
}
|
||||
@@ -4,6 +4,7 @@
|
||||
//! bot/API tokens cannot reach admin routes (see
|
||||
//! `crate::auth::extractor::RequireAdmin`).
|
||||
|
||||
pub mod crawler;
|
||||
pub mod mangas;
|
||||
pub mod resync;
|
||||
pub mod system;
|
||||
@@ -19,4 +20,5 @@ pub fn routes() -> Router<AppState> {
|
||||
.merge(mangas::routes())
|
||||
.merge(resync::routes())
|
||||
.merge(system::routes())
|
||||
.merge(crawler::routes())
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
||||
|
||||
use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
@@ -46,6 +46,24 @@ pub struct AppState {
|
||||
/// same wiring that builds the daemon's chapter dispatcher, so a
|
||||
/// force resync uses the daemon's BrowserManager + rate limiters.
|
||||
pub resync: Option<Arc<dyn ResyncService>>,
|
||||
/// Crawler observability + control handle (live status, coordinated
|
||||
/// browser restart, runtime session, manual run). `None` when the
|
||||
/// daemon is disabled; admin handlers gate on `.is_some()` → 503.
|
||||
pub crawler: Option<Arc<CrawlerControl>>,
|
||||
}
|
||||
|
||||
/// Shared handle the admin crawler endpoints use to observe and control
|
||||
/// the running daemon. Bundled so the handlers take one optional field on
|
||||
/// `AppState` rather than many.
|
||||
pub struct CrawlerControl {
|
||||
pub browser_manager: Arc<BrowserManager>,
|
||||
pub session: Arc<crate::crawler::session_control::SessionController>,
|
||||
pub status: crate::crawler::status::StatusHandle,
|
||||
/// Used by the "run metadata pass now" endpoint; `None` when no
|
||||
/// `CRAWLER_START_URL` is configured (cron disabled).
|
||||
pub metadata_pass: Option<Arc<dyn MetadataPass>>,
|
||||
/// Drain budget for a manually-triggered coordinated browser restart.
|
||||
pub drain_deadline: std::time::Duration,
|
||||
}
|
||||
|
||||
/// Bundle returned by [`build`]. The router is what `axum::serve` consumes;
|
||||
@@ -80,12 +98,12 @@ pub async fn build(config: Config) -> anyhow::Result<AppHandle> {
|
||||
|
||||
let storage: Arc<dyn Storage> = Arc::new(LocalStorage::new(config.storage_dir.clone()));
|
||||
|
||||
let (daemon, resync) = if config.crawler.daemon_enabled {
|
||||
let (daemon, resync, crawler) = if config.crawler.daemon_enabled {
|
||||
let spawned = spawn_crawler_daemon(db.clone(), Arc::clone(&storage), &config.crawler).await?;
|
||||
(Some(spawned.handle), Some(spawned.resync))
|
||||
(Some(spawned.handle), Some(spawned.resync), Some(spawned.crawler))
|
||||
} else {
|
||||
tracing::info!("crawler daemon disabled (CRAWLER_DAEMON=false)");
|
||||
(None, None)
|
||||
(None, None, None)
|
||||
};
|
||||
|
||||
let auth_limiter = Arc::new(AuthRateLimiter::new(config.auth.rate_limit));
|
||||
@@ -96,6 +114,7 @@ pub async fn build(config: Config) -> anyhow::Result<AppHandle> {
|
||||
upload: config.upload.clone(),
|
||||
auth_limiter,
|
||||
resync,
|
||||
crawler,
|
||||
};
|
||||
let router = router(state).layer(cors_layer(&config.cors_allowed_origins));
|
||||
Ok(AppHandle { router, daemon })
|
||||
@@ -108,6 +127,7 @@ pub async fn build(config: Config) -> anyhow::Result<AppHandle> {
|
||||
struct SpawnedDaemon {
|
||||
handle: daemon::DaemonHandle,
|
||||
resync: Arc<dyn ResyncService>,
|
||||
crawler: Arc<CrawlerControl>,
|
||||
}
|
||||
|
||||
async fn spawn_crawler_daemon(
|
||||
@@ -115,11 +135,17 @@ async fn spawn_crawler_daemon(
|
||||
storage: Arc<dyn Storage>,
|
||||
cfg: &CrawlerConfig,
|
||||
) -> anyhow::Result<SpawnedDaemon> {
|
||||
// Reqwest client with cookie jar pre-seeded so CDN image fetches
|
||||
// include PHPSESSID. Same shape as bin/crawler.rs main().
|
||||
// Reqwest client with a shared cookie jar so CDN image fetches include
|
||||
// PHPSESSID. The same `Arc<Jar>` is held by the SessionController, so a
|
||||
// runtime session refresh rewrites it in place. Initial value: a
|
||||
// persisted runtime session (survives restart) takes precedence over
|
||||
// CRAWLER_PHPSESSID env.
|
||||
let cookie_jar = Arc::new(reqwest::cookie::Jar::default());
|
||||
let initial_sid = crate::crawler::session_control::SessionController::load_persisted(&db)
|
||||
.await
|
||||
.or_else(|| cfg.phpsessid.clone());
|
||||
if let (Some(sid), Some(domain), Some(start_url)) =
|
||||
(&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url)
|
||||
(&initial_sid, &cfg.cookie_domain, &cfg.start_url)
|
||||
{
|
||||
let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/");
|
||||
let seed_url = reqwest::Url::parse(start_url)
|
||||
@@ -129,7 +155,7 @@ async fn spawn_crawler_daemon(
|
||||
let mut http_builder = reqwest::Client::builder()
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.no_proxy()
|
||||
.cookie_provider(cookie_jar);
|
||||
.cookie_provider(Arc::clone(&cookie_jar));
|
||||
if let Some(ua) = &cfg.user_agent {
|
||||
http_builder = http_builder.user_agent(ua);
|
||||
}
|
||||
@@ -157,6 +183,23 @@ async fn spawn_crawler_daemon(
|
||||
}
|
||||
let tor_recircuit_max = cfg.tor_recircuit_max_attempts;
|
||||
|
||||
// Session controller + sticky session-expired flag. Created before the
|
||||
// browser so the on_launch hook can read the *current* session value
|
||||
// (rather than a value captured at startup), and so a runtime refresh
|
||||
// updates the cookie everywhere.
|
||||
let session_expired = Arc::new(AtomicBool::new(false));
|
||||
let session_controller = crate::crawler::session_control::SessionController::new(
|
||||
initial_sid,
|
||||
Arc::clone(&cookie_jar),
|
||||
cfg.cookie_domain.clone(),
|
||||
cfg.start_url.clone(),
|
||||
db.clone(),
|
||||
Arc::clone(&session_expired),
|
||||
);
|
||||
|
||||
// Live status surface, sized to the worker count.
|
||||
let status = crate::crawler::status::StatusHandle::new(cfg.chapter_workers);
|
||||
|
||||
// Browser manager. on_launch re-injects PHPSESSID on every fresh
|
||||
// chromium spawn so an idle teardown followed by re-launch stays
|
||||
// authenticated without operator action.
|
||||
@@ -165,18 +208,25 @@ async fn spawn_crawler_daemon(
|
||||
let chromium_proxy = crate::crawler::url_utils::chromium_proxy_arg(proxy);
|
||||
launch_opts.extra_args.push(format!("--proxy-server={chromium_proxy}"));
|
||||
}
|
||||
let on_launch = match (&cfg.phpsessid, &cfg.cookie_domain, &cfg.start_url) {
|
||||
(Some(sid), Some(domain), Some(start_url)) => {
|
||||
let sid = sid.clone();
|
||||
let on_launch = match (&cfg.cookie_domain, &cfg.start_url) {
|
||||
(Some(domain), Some(start_url)) => {
|
||||
let domain = domain.clone();
|
||||
let start_url = start_url.clone();
|
||||
let tor_for_launch = tor.as_ref().map(Arc::clone);
|
||||
let sc = Arc::clone(&session_controller);
|
||||
let on_launch: browser_manager::OnLaunch = Arc::new(move |browser| {
|
||||
let sid = sid.clone();
|
||||
let domain = domain.clone();
|
||||
let start_url = start_url.clone();
|
||||
let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone);
|
||||
let sc = Arc::clone(&sc);
|
||||
Box::pin(async move {
|
||||
// Read the *current* session each launch so a runtime
|
||||
// refresh is picked up on the next (re)launch. No session
|
||||
// configured → run unauthenticated (metadata needs no auth).
|
||||
let Some(sid) = sc.current().await else {
|
||||
tracing::info!("on_launch: no session set — skipping inject + probe");
|
||||
return Ok(());
|
||||
};
|
||||
session::inject_phpsessid(&browser, &sid, &domain)
|
||||
.await
|
||||
.context("on_launch: inject_phpsessid")?;
|
||||
@@ -197,8 +247,6 @@ async fn spawn_crawler_daemon(
|
||||
};
|
||||
let browser_manager = BrowserManager::new(launch_opts, cfg.idle_timeout, on_launch);
|
||||
|
||||
let session_expired = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let metadata_pass: Option<Arc<dyn MetadataPass>> = cfg.start_url.as_ref().map(|url| {
|
||||
let m: Arc<dyn MetadataPass> = Arc::new(RealMetadataPass {
|
||||
browser_manager: Arc::clone(&browser_manager),
|
||||
@@ -210,6 +258,8 @@ async fn spawn_crawler_daemon(
|
||||
manga_limit: cfg.manga_limit,
|
||||
download_allowlist: cfg.download_allowlist.clone(),
|
||||
max_image_bytes: cfg.max_image_bytes,
|
||||
metadata_max_consecutive_failures: cfg.metadata_max_consecutive_failures,
|
||||
status: status.clone(),
|
||||
tor: tor.as_ref().map(Arc::clone),
|
||||
});
|
||||
m
|
||||
@@ -223,6 +273,10 @@ async fn spawn_crawler_daemon(
|
||||
rate: Arc::clone(&rate),
|
||||
download_allowlist: cfg.download_allowlist.clone(),
|
||||
max_image_bytes: cfg.max_image_bytes,
|
||||
transient_failures: Arc::new(AtomicU32::new(0)),
|
||||
restart_threshold: cfg.browser_restart_threshold,
|
||||
drain_deadline: cfg.job_timeout,
|
||||
status: status.clone(),
|
||||
tor: tor.as_ref().map(Arc::clone),
|
||||
});
|
||||
|
||||
@@ -260,20 +314,31 @@ async fn spawn_crawler_daemon(
|
||||
db,
|
||||
cancel,
|
||||
DaemonConfig {
|
||||
metadata_pass,
|
||||
metadata_pass: metadata_pass.clone(),
|
||||
dispatcher,
|
||||
chapter_workers: cfg.chapter_workers,
|
||||
daily_at: cfg.daily_at,
|
||||
tz: cfg.tz,
|
||||
retention_days: cfg.retention_days,
|
||||
session_expired,
|
||||
status: status.clone(),
|
||||
job_timeout: cfg.job_timeout,
|
||||
extra_tasks: vec![reaper_task, shutdown_task],
|
||||
},
|
||||
);
|
||||
|
||||
let crawler = Arc::new(CrawlerControl {
|
||||
browser_manager: Arc::clone(&browser_manager),
|
||||
session: session_controller,
|
||||
status,
|
||||
metadata_pass,
|
||||
drain_deadline: cfg.job_timeout,
|
||||
});
|
||||
|
||||
Ok(SpawnedDaemon {
|
||||
handle: daemon_handle,
|
||||
resync,
|
||||
crawler,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -292,6 +357,8 @@ struct RealMetadataPass {
|
||||
manga_limit: usize,
|
||||
download_allowlist: DownloadAllowlist,
|
||||
max_image_bytes: usize,
|
||||
metadata_max_consecutive_failures: u32,
|
||||
status: crate::crawler::status::StatusHandle,
|
||||
tor: Option<Arc<crate::crawler::tor::TorController>>,
|
||||
}
|
||||
|
||||
@@ -309,6 +376,8 @@ impl MetadataPass for RealMetadataPass {
|
||||
false,
|
||||
&self.download_allowlist,
|
||||
self.max_image_bytes,
|
||||
self.metadata_max_consecutive_failures,
|
||||
Some(&self.status),
|
||||
self.tor.as_deref(),
|
||||
)
|
||||
.await;
|
||||
@@ -321,7 +390,8 @@ impl MetadataPass for RealMetadataPass {
|
||||
// errored — the early-stop walk can complete its work and bail
|
||||
// late, and a transient browser failure shouldn't cancel the
|
||||
// residual cover backlog. The backfill has its own per-call cap
|
||||
// so a runaway error stream can't monopolise the tick.
|
||||
// so a runaway error stream can't monopolise the tick. It sets the
|
||||
// CoverBackfill{index,total} phase + current_cover per entry.
|
||||
match pipeline::backfill_missing_covers(
|
||||
&self.browser_manager,
|
||||
&self.db,
|
||||
@@ -331,6 +401,7 @@ impl MetadataPass for RealMetadataPass {
|
||||
pipeline::COVER_BACKFILL_DEFAULT_MAX,
|
||||
&self.download_allowlist,
|
||||
self.max_image_bytes,
|
||||
Some(&self.status),
|
||||
self.tor.as_deref(),
|
||||
)
|
||||
.await
|
||||
@@ -359,6 +430,16 @@ struct RealChapterDispatcher {
|
||||
rate: Arc<HostRateLimiters>,
|
||||
download_allowlist: DownloadAllowlist,
|
||||
max_image_bytes: usize,
|
||||
/// Consecutive transient chapter failures; resets on any success.
|
||||
/// Drives the automatic coordinated browser restart.
|
||||
transient_failures: Arc<std::sync::atomic::AtomicU32>,
|
||||
/// Consecutive-failure count that triggers an auto restart.
|
||||
restart_threshold: u32,
|
||||
/// How long a coordinated restart waits for in-flight leases to drain.
|
||||
drain_deadline: std::time::Duration,
|
||||
/// Live status surface — the dispatcher registers each chapter it
|
||||
/// crawls (with a realtime page count) here.
|
||||
status: crate::crawler::status::StatusHandle,
|
||||
tor: Option<Arc<crate::crawler::tor::TorController>>,
|
||||
}
|
||||
|
||||
@@ -374,10 +455,21 @@ impl ChapterDispatcher for RealChapterDispatcher {
|
||||
let row = repo::chapter::dispatch_target(&self.db, chapter_id)
|
||||
.await
|
||||
.context("look up chapter for dispatch")?;
|
||||
let Some((manga_id, source_url)) = row else {
|
||||
let Some((manga_id, source_url, manga_title, chapter_number)) = row else {
|
||||
// Chapter (or its source row) is gone — ack done.
|
||||
return Ok(SyncOutcome::Skipped);
|
||||
};
|
||||
// Register the chapter as crawling now (live status). The
|
||||
// guard removes it on every exit path — success, panic, or
|
||||
// the worker's outer-timeout drop.
|
||||
let _active = self.status.begin_chapter(crate::crawler::status::ActiveChapter {
|
||||
manga_id,
|
||||
manga_title,
|
||||
chapter_id,
|
||||
chapter_number,
|
||||
pages_done: 0,
|
||||
pages_total: None,
|
||||
});
|
||||
let lease = self.browser_manager.acquire().await?;
|
||||
let result = content::sync_chapter_content(
|
||||
&lease,
|
||||
@@ -392,14 +484,37 @@ impl ChapterDispatcher for RealChapterDispatcher {
|
||||
&self.download_allowlist,
|
||||
self.max_image_bytes,
|
||||
self.tor.as_deref(),
|
||||
Some(&self.status),
|
||||
)
|
||||
.await;
|
||||
drop(lease);
|
||||
match result {
|
||||
Ok(outcome) => Ok(outcome),
|
||||
Ok(outcome) => {
|
||||
// Any successful dispatch (including a clean Skipped)
|
||||
// means the browser is healthy — reset the streak.
|
||||
self.transient_failures.store(0, Ordering::Release);
|
||||
Ok(outcome)
|
||||
}
|
||||
Err(e) => {
|
||||
let streak = self.transient_failures.fetch_add(1, Ordering::AcqRel) + 1;
|
||||
if crate::crawler::nav::anyhow_looks_browser_dead(&e) {
|
||||
// Hard browser-dead: lazy invalidate (next acquire
|
||||
// relaunches). Reset the streak — we're recovering.
|
||||
self.browser_manager.invalidate().await;
|
||||
self.transient_failures.store(0, Ordering::Release);
|
||||
} else if self.restart_threshold > 0 && streak >= self.restart_threshold {
|
||||
// Persistent transients that TOR recircuit couldn't
|
||||
// fix — proactively restart Chromium.
|
||||
tracing::warn!(
|
||||
streak,
|
||||
threshold = self.restart_threshold,
|
||||
"auto browser restart: consecutive transient chapter failures"
|
||||
);
|
||||
let _ = self
|
||||
.browser_manager
|
||||
.coordinated_restart(self.drain_deadline)
|
||||
.await;
|
||||
self.transient_failures.store(0, Ordering::Release);
|
||||
}
|
||||
Err(e)
|
||||
}
|
||||
|
||||
@@ -303,6 +303,11 @@ async fn run(
|
||||
skip_chapters,
|
||||
allowlist.as_ref(),
|
||||
max_image_bytes,
|
||||
// Circuit-breaker disabled for the operator-driven CLI: a manual
|
||||
// sweep should push through transient failures, not self-abort.
|
||||
0,
|
||||
// No live status surface for the one-shot CLI.
|
||||
None,
|
||||
tor.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
@@ -412,6 +417,8 @@ async fn sync_bookmarked_chapter_content(
|
||||
allowlist.as_ref(),
|
||||
max_image_bytes,
|
||||
tor.as_deref(),
|
||||
// CLI one-shot — no live status surface.
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
drop(lease);
|
||||
|
||||
@@ -132,6 +132,19 @@ pub struct CrawlerConfig {
|
||||
/// (full sweep up to the source's own bound). Sourced from
|
||||
/// `CRAWLER_LIMIT`, mirroring the CLI binary.
|
||||
pub manga_limit: usize,
|
||||
/// Hard upper bound on a single chapter-content job dispatch. A job
|
||||
/// exceeding this is acked failed (exponential backoff) instead of
|
||||
/// wedging a worker. Defaults to 600s. `CRAWLER_JOB_TIMEOUT_SECS`.
|
||||
pub job_timeout: Duration,
|
||||
/// Consecutive `fetch_manga` failures that abort a metadata pass
|
||||
/// (circuit-breaker for a source outage). The pass does NOT mark a
|
||||
/// clean exit, so the next tick does a recovery sweep. Defaults to
|
||||
/// 10. `CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES`.
|
||||
pub metadata_max_consecutive_failures: u32,
|
||||
/// Consecutive transient chapter failures (after TOR recircuit is
|
||||
/// exhausted) that trigger an automatic coordinated browser restart.
|
||||
/// Defaults to 3. `CRAWLER_BROWSER_RESTART_THRESHOLD`.
|
||||
pub browser_restart_threshold: u32,
|
||||
}
|
||||
|
||||
impl Default for CrawlerConfig {
|
||||
@@ -159,6 +172,9 @@ impl Default for CrawlerConfig {
|
||||
download_allowlist: DownloadAllowlist::new(),
|
||||
max_image_bytes: DEFAULT_MAX_IMAGE_BYTES,
|
||||
manga_limit: 0,
|
||||
job_timeout: Duration::from_secs(600),
|
||||
metadata_max_consecutive_failures: 10,
|
||||
browser_restart_threshold: 3,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -283,6 +299,13 @@ impl CrawlerConfig {
|
||||
download_allowlist,
|
||||
max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES),
|
||||
manga_limit: env_usize("CRAWLER_LIMIT", 0),
|
||||
job_timeout: Duration::from_secs(env_u64("CRAWLER_JOB_TIMEOUT_SECS", 600).max(1)),
|
||||
metadata_max_consecutive_failures: env_u64(
|
||||
"CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES",
|
||||
10,
|
||||
) as u32,
|
||||
browser_restart_threshold: env_u64("CRAWLER_BROWSER_RESTART_THRESHOLD", 3).max(1)
|
||||
as u32,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -384,6 +407,33 @@ mod tests {
|
||||
assert_eq!(cfg.manga_limit, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reliability_knobs_default_when_unset() {
|
||||
let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner());
|
||||
std::env::remove_var("CRAWLER_JOB_TIMEOUT_SECS");
|
||||
std::env::remove_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES");
|
||||
std::env::remove_var("CRAWLER_BROWSER_RESTART_THRESHOLD");
|
||||
let cfg = CrawlerConfig::from_env().expect("from_env");
|
||||
assert_eq!(cfg.job_timeout, Duration::from_secs(600));
|
||||
assert_eq!(cfg.metadata_max_consecutive_failures, 10);
|
||||
assert_eq!(cfg.browser_restart_threshold, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reliability_knobs_parse_from_env() {
|
||||
let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner());
|
||||
std::env::set_var("CRAWLER_JOB_TIMEOUT_SECS", "120");
|
||||
std::env::set_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES", "5");
|
||||
std::env::set_var("CRAWLER_BROWSER_RESTART_THRESHOLD", "7");
|
||||
let cfg = CrawlerConfig::from_env().expect("from_env");
|
||||
std::env::remove_var("CRAWLER_JOB_TIMEOUT_SECS");
|
||||
std::env::remove_var("CRAWLER_METADATA_MAX_CONSECUTIVE_FAILURES");
|
||||
std::env::remove_var("CRAWLER_BROWSER_RESTART_THRESHOLD");
|
||||
assert_eq!(cfg.job_timeout, Duration::from_secs(120));
|
||||
assert_eq!(cfg.metadata_max_consecutive_failures, 5);
|
||||
assert_eq!(cfg.browser_restart_threshold, 7);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn private_mode_env_parses_true() {
|
||||
let _g = ENV_GUARD.lock().unwrap_or_else(|p| p.into_inner());
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
//! until [`BrowserManager::shutdown`].
|
||||
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -71,12 +71,42 @@ impl ActiveTracker {
|
||||
}
|
||||
}
|
||||
|
||||
/// Lifecycle gate for a coordinated browser restart. `acquire()` parks
|
||||
/// while not [`RestartPhase::Healthy`] so no new navigation starts mid-
|
||||
/// restart; long-lived lease holders (the metadata pass) cooperate by
|
||||
/// checking [`BrowserManager::is_restart_pending`] at safe boundaries.
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
||||
pub enum RestartPhase {
|
||||
/// Normal operation — acquires proceed.
|
||||
Healthy,
|
||||
/// Restart requested; new acquires park, waiting for in-flight leases
|
||||
/// to drain.
|
||||
Draining,
|
||||
/// Chromium is being closed + relaunched.
|
||||
Restarting,
|
||||
}
|
||||
|
||||
const PHASE_HEALTHY: u8 = 0;
|
||||
const PHASE_DRAINING: u8 = 1;
|
||||
const PHASE_RESTARTING: u8 = 2;
|
||||
|
||||
pub struct BrowserManager {
|
||||
inner: Mutex<Inner>,
|
||||
active: Arc<ActiveTracker>,
|
||||
launch_opts: LaunchOptions,
|
||||
idle_timeout: Duration,
|
||||
on_launch: OnLaunch,
|
||||
/// Coarse lifecycle phase (one of the `PHASE_*` constants).
|
||||
phase: AtomicU8,
|
||||
/// Woken when the phase returns to `Healthy` so parked acquires resume.
|
||||
resume: Notify,
|
||||
/// Serialises coordinated restarts so concurrent requests collapse into
|
||||
/// a single relaunch.
|
||||
restart_lock: Mutex<()>,
|
||||
/// Result of the most recent relaunch, so a caller that coalesced into
|
||||
/// an in-progress restart reports that restart's real outcome instead
|
||||
/// of a blind success.
|
||||
last_restart_ok: AtomicBool,
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
@@ -99,28 +129,72 @@ impl BrowserManager {
|
||||
launch_opts,
|
||||
idle_timeout,
|
||||
on_launch,
|
||||
phase: AtomicU8::new(PHASE_HEALTHY),
|
||||
resume: Notify::new(),
|
||||
restart_lock: Mutex::new(()),
|
||||
last_restart_ok: AtomicBool::new(true),
|
||||
})
|
||||
}
|
||||
|
||||
/// Current restart phase.
|
||||
pub fn phase(&self) -> RestartPhase {
|
||||
match self.phase.load(Ordering::Acquire) {
|
||||
PHASE_DRAINING => RestartPhase::Draining,
|
||||
PHASE_RESTARTING => RestartPhase::Restarting,
|
||||
_ => RestartPhase::Healthy,
|
||||
}
|
||||
}
|
||||
|
||||
fn set_phase(&self, phase: RestartPhase) {
|
||||
let v = match phase {
|
||||
RestartPhase::Healthy => PHASE_HEALTHY,
|
||||
RestartPhase::Draining => PHASE_DRAINING,
|
||||
RestartPhase::Restarting => PHASE_RESTARTING,
|
||||
};
|
||||
self.phase.store(v, Ordering::Release);
|
||||
}
|
||||
|
||||
/// Whether a coordinated restart is in progress. Long-lived lease
|
||||
/// holders poll this at safe boundaries and yield their lease so the
|
||||
/// drain can complete promptly.
|
||||
pub fn is_restart_pending(&self) -> bool {
|
||||
self.phase() != RestartPhase::Healthy
|
||||
}
|
||||
|
||||
/// Launch Chromium into `guard`, running the `on_launch` hook before
|
||||
/// publishing the handle so a probe failure doesn't leave a half-
|
||||
/// initialised browser behind.
|
||||
async fn launch_into(&self, guard: &mut Inner) -> anyhow::Result<()> {
|
||||
let handle = browser::launch(self.launch_opts.clone())
|
||||
.await
|
||||
.context("BrowserManager: launch chromium")?;
|
||||
let shared = handle.shared();
|
||||
if let Err(e) = (self.on_launch)(Arc::clone(&shared)).await {
|
||||
let _ = handle.close().await;
|
||||
return Err(e.context("BrowserManager: on_launch hook failed"));
|
||||
}
|
||||
guard.handle = Some(handle);
|
||||
guard.shared = Some(shared);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Acquire a shared browser lease. The first acquire after a teardown
|
||||
/// launches a fresh Chromium (and runs `on_launch`); subsequent acquires
|
||||
/// while a process is alive just bump the counter and clone the `Arc`.
|
||||
pub async fn acquire(&self) -> anyhow::Result<BrowserLease> {
|
||||
// Park while a coordinated restart is draining/relaunching so no new
|
||||
// navigation starts against a browser that's about to be torn down.
|
||||
// The short sleep fallback guarantees liveness even if a `resume`
|
||||
// notification is missed (classic Notify lost-wakeup).
|
||||
while self.phase() != RestartPhase::Healthy {
|
||||
tokio::select! {
|
||||
_ = self.resume.notified() => {}
|
||||
_ = tokio::time::sleep(Duration::from_millis(100)) => {}
|
||||
}
|
||||
}
|
||||
let mut guard = self.inner.lock().await;
|
||||
if guard.handle.is_none() {
|
||||
let handle = browser::launch(self.launch_opts.clone())
|
||||
.await
|
||||
.context("BrowserManager: launch chromium")?;
|
||||
let shared = handle.shared();
|
||||
// Run the on-launch hook before publishing the handle so a session
|
||||
// probe failure doesn't leave a half-initialized browser behind.
|
||||
if let Err(e) = (self.on_launch)(Arc::clone(&shared)).await {
|
||||
// Close the just-launched browser since we won't be using it.
|
||||
let _ = handle.close().await;
|
||||
return Err(e.context("BrowserManager: on_launch hook failed"));
|
||||
}
|
||||
guard.handle = Some(handle);
|
||||
guard.shared = Some(shared);
|
||||
self.launch_into(&mut guard).await?;
|
||||
}
|
||||
let browser = guard
|
||||
.shared
|
||||
@@ -134,6 +208,51 @@ impl BrowserManager {
|
||||
})
|
||||
}
|
||||
|
||||
/// Coordinated restart: block new acquires, wait for in-flight leases
|
||||
/// to drain (up to `drain_deadline`, then force), close + relaunch
|
||||
/// Chromium (re-running `on_launch` → re-inject session + probe), then
|
||||
/// resume parked acquirers. Concurrent calls collapse into one
|
||||
/// relaunch. The phase is always returned to `Healthy` — even if the
|
||||
/// relaunch errors — so a failed restart never permanently wedges
|
||||
/// acquisition (the next acquire retries the launch lazily).
|
||||
pub async fn coordinated_restart(&self, drain_deadline: Duration) -> anyhow::Result<()> {
|
||||
// Dedup: if a restart is already running, wait for it and report
|
||||
// that restart's real outcome (not a blind success).
|
||||
let _restart_guard = match self.restart_lock.try_lock() {
|
||||
Ok(g) => g,
|
||||
Err(_) => {
|
||||
let _ = self.restart_lock.lock().await;
|
||||
return if self.last_restart_ok.load(Ordering::Acquire) {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow::anyhow!("a concurrent coordinated browser restart failed"))
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
self.set_phase(RestartPhase::Draining);
|
||||
await_drain(&self.active, drain_deadline).await;
|
||||
|
||||
self.set_phase(RestartPhase::Restarting);
|
||||
let relaunch = {
|
||||
let mut guard = self.inner.lock().await;
|
||||
guard.shared = None;
|
||||
if let Some(handle) = guard.handle.take() {
|
||||
let _ = handle.close().await;
|
||||
}
|
||||
self.launch_into(&mut guard).await
|
||||
};
|
||||
|
||||
self.last_restart_ok.store(relaunch.is_ok(), Ordering::Release);
|
||||
self.set_phase(RestartPhase::Healthy);
|
||||
self.resume.notify_waiters();
|
||||
match &relaunch {
|
||||
Ok(()) => tracing::info!("BrowserManager: coordinated restart complete"),
|
||||
Err(e) => tracing::error!(error = ?e, "BrowserManager: coordinated restart relaunch failed"),
|
||||
}
|
||||
relaunch.context("coordinated_restart: relaunch")
|
||||
}
|
||||
|
||||
/// Forcefully close the cached browser regardless of active count.
|
||||
/// Used on daemon shutdown. After this returns the next acquire will
|
||||
/// re-launch from scratch.
|
||||
@@ -176,6 +295,29 @@ impl BrowserManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the active-lease count to reach zero, up to `deadline`. Wakes
|
||||
/// on the tracker's idle signal and re-checks on a short poll so a missed
|
||||
/// signal can't strand the drain. Returns when drained or when the
|
||||
/// deadline elapses (the caller then force-restarts). Extracted as a free
|
||||
/// fn so the timing logic is unit-testable without launching Chromium.
|
||||
async fn await_drain(active: &Arc<ActiveTracker>, deadline: Duration) {
|
||||
let start = tokio::time::Instant::now();
|
||||
while active.current() > 0 {
|
||||
let Some(remaining) = deadline.checked_sub(start.elapsed()) else {
|
||||
tracing::warn!(
|
||||
active = active.current(),
|
||||
"coordinated_restart: drain deadline exceeded — forcing relaunch"
|
||||
);
|
||||
return;
|
||||
};
|
||||
let nap = remaining.min(Duration::from_millis(250));
|
||||
tokio::select! {
|
||||
_ = active.idle_signal().notified() => {}
|
||||
_ = tokio::time::sleep(nap) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Background reaper. Returns immediately when `idle_timeout == 0`.
|
||||
/// Otherwise spawns a task that:
|
||||
/// 1. Waits on `idle_signal` (woken when active hits zero).
|
||||
@@ -270,6 +412,63 @@ mod tests {
|
||||
mgr.invalidate().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn await_drain_returns_immediately_when_already_idle() {
|
||||
let active = ActiveTracker::new();
|
||||
let start = tokio::time::Instant::now();
|
||||
await_drain(&active, Duration::from_secs(5)).await;
|
||||
assert!(start.elapsed() < Duration::from_millis(200), "no wait when idle");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn await_drain_completes_when_lease_released() {
|
||||
let active = ActiveTracker::new();
|
||||
active.acquire();
|
||||
let bg = {
|
||||
let a = Arc::clone(&active);
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
a.release();
|
||||
})
|
||||
};
|
||||
// Generous deadline; should return shortly after the release, not
|
||||
// at the deadline.
|
||||
let start = tokio::time::Instant::now();
|
||||
await_drain(&active, Duration::from_secs(5)).await;
|
||||
assert!(start.elapsed() < Duration::from_secs(2), "drained on release");
|
||||
assert_eq!(active.current(), 0);
|
||||
bg.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn await_drain_force_returns_after_deadline_when_stuck() {
|
||||
let active = ActiveTracker::new();
|
||||
active.acquire(); // never released
|
||||
let start = tokio::time::Instant::now();
|
||||
await_drain(&active, Duration::from_millis(300)).await;
|
||||
let elapsed = start.elapsed();
|
||||
assert!(elapsed >= Duration::from_millis(250), "waited ~deadline: {elapsed:?}");
|
||||
assert!(elapsed < Duration::from_secs(2), "but not forever: {elapsed:?}");
|
||||
assert_eq!(active.current(), 1, "still held — caller force-restarts");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn phase_transitions_reflect_is_restart_pending() {
|
||||
let mgr = BrowserManager::new(
|
||||
crate::crawler::browser::LaunchOptions::default(),
|
||||
Duration::ZERO,
|
||||
noop_on_launch(),
|
||||
);
|
||||
assert_eq!(mgr.phase(), RestartPhase::Healthy);
|
||||
assert!(!mgr.is_restart_pending());
|
||||
mgr.set_phase(RestartPhase::Draining);
|
||||
assert!(mgr.is_restart_pending());
|
||||
mgr.set_phase(RestartPhase::Restarting);
|
||||
assert!(mgr.is_restart_pending());
|
||||
mgr.set_phase(RestartPhase::Healthy);
|
||||
assert!(!mgr.is_restart_pending());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn active_tracker_signals_idle_only_on_zero_transition() {
|
||||
let tracker = ActiveTracker::new();
|
||||
|
||||
@@ -186,11 +186,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch all images for one chapter and persist them atomically. On
|
||||
/// any error after the first storage put, the DB transaction rolls
|
||||
/// back so the chapter stays at `page_count = 0` and is retried on the
|
||||
/// next run. Bytes already written to storage become orphans; a future
|
||||
/// reaper sweeps them.
|
||||
/// Fetch one chapter's images and persist them. Each image is streamed to
|
||||
/// storage as it's fetched (peak memory ≈ one image, not the whole
|
||||
/// chapter); the page rows + `page_count` are then written in one short
|
||||
/// transaction. On any failure the chapter stays at `page_count = 0` (no
|
||||
/// partial rows) and the blobs already written are deleted best-effort by
|
||||
/// [`cleanup_orphans`], so a retry starts clean.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn sync_chapter_content(
|
||||
browser: &chromiumoxide::Browser,
|
||||
@@ -205,6 +206,10 @@ pub async fn sync_chapter_content(
|
||||
allowlist: &DownloadAllowlist,
|
||||
max_image_bytes: usize,
|
||||
tor: Option<&crate::crawler::tor::TorController>,
|
||||
// Optional live-status sink for the realtime page counter. The daemon
|
||||
// dispatcher passes the shared handle (the chapter has already been
|
||||
// registered via `begin_chapter`); the CLI / admin resync pass `None`.
|
||||
progress: Option<&crate::crawler::status::StatusHandle>,
|
||||
) -> anyhow::Result<SyncOutcome> {
|
||||
// Skip if already fetched, unless caller explicitly forces.
|
||||
if !force_refetch {
|
||||
@@ -260,56 +265,127 @@ pub async fn sync_chapter_content(
|
||||
// Resolve image URLs against the chapter URL (they may be relative).
|
||||
let base = reqwest::Url::parse(source_url).context("parse chapter URL")?;
|
||||
|
||||
// Fetch every image bytes-first into memory before writing
|
||||
// anything. Lets us bail the whole chapter cleanly if any image
|
||||
// fails — DB stays at page_count=0, no partial rows persisted.
|
||||
let mut fetched: Vec<(i32, Vec<u8>, &'static str)> = Vec::with_capacity(images.len());
|
||||
// Stream each image straight to storage as it's fetched, capping peak
|
||||
// memory at a single image rather than the whole chapter. Track the
|
||||
// keys written so they can be rolled back if a later page (or the
|
||||
// final DB commit) fails — preserving the all-or-nothing guarantee
|
||||
// without holding a DB transaction open across the network puts
|
||||
// (which matters once `Storage` is backed by S3).
|
||||
let total = images.len();
|
||||
// Publish the now-known page total so the dashboard shows "0/N".
|
||||
if let Some(p) = progress {
|
||||
p.set_chapter_pages(chapter_id, 0, Some(total));
|
||||
}
|
||||
let mut written_keys: Vec<String> = Vec::with_capacity(total);
|
||||
let mut stored: Vec<StoredPage> = Vec::with_capacity(total);
|
||||
for img in &images {
|
||||
let url = base.join(&img.url).with_context(|| {
|
||||
format!("join image URL {} onto {source_url}", img.url)
|
||||
})?;
|
||||
rate.wait_for(url.as_str()).await?;
|
||||
let bytes = fetch_bytes_capped(
|
||||
match download_and_store_page(
|
||||
storage,
|
||||
http,
|
||||
url.as_str(),
|
||||
Some(source_url),
|
||||
rate,
|
||||
&base,
|
||||
source_url,
|
||||
manga_id,
|
||||
chapter_id,
|
||||
img,
|
||||
allowlist,
|
||||
max_image_bytes,
|
||||
)
|
||||
.await?
|
||||
.to_vec();
|
||||
// Reject any non-image response: the only valid output of an
|
||||
// image URL is an image. `infer` returns None on truncated
|
||||
// bytes too, which also wants to be a failure not a silent
|
||||
// `.bin` extension.
|
||||
if !looks_like_image(&bytes) {
|
||||
anyhow::bail!(
|
||||
"image URL {url} returned non-image bytes \
|
||||
(first 16: {:?}); refusing to store as binary blob",
|
||||
&bytes.get(..16.min(bytes.len()))
|
||||
);
|
||||
.await
|
||||
{
|
||||
Ok(page) => {
|
||||
written_keys.push(page.storage_key.clone());
|
||||
stored.push(page);
|
||||
// Live page counter: push the climbing count to subscribers.
|
||||
if let Some(p) = progress {
|
||||
p.set_chapter_pages(chapter_id, stored.len(), Some(total));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
cleanup_orphans(storage, &written_keys).await;
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
let ext = infer::get(&bytes)
|
||||
.map(|k| k.extension())
|
||||
.expect("looks_like_image asserted infer succeeded");
|
||||
fetched.push((img.page_number, bytes, ext));
|
||||
}
|
||||
|
||||
// Atomic write: storage puts + page row inserts + page_count
|
||||
// update, all in one transaction. If anything fails, rollback +
|
||||
// the chapter is retried next run. Storage orphans the bytes; a
|
||||
// reaper sweeps them later.
|
||||
let mut tx = db.begin().await.context("open chapter sync tx")?;
|
||||
for (page_number, bytes, ext) in &fetched {
|
||||
let key = format!(
|
||||
"mangas/{manga_id}/chapters/{chapter_id}/pages/{:04}.{ext}",
|
||||
page_number
|
||||
// Short transaction: page rows + page_count only, no network I/O. On
|
||||
// failure, roll back the stored bytes so the chapter stays at
|
||||
// page_count=0 and is retried cleanly next run.
|
||||
if let Err(e) = persist_pages(db, chapter_id, &stored).await {
|
||||
cleanup_orphans(storage, &written_keys).await;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Ok(SyncOutcome::Fetched { pages: stored.len() })
|
||||
}
|
||||
|
||||
/// A page image that has been written to storage and is awaiting its DB
|
||||
/// row. Carries everything `persist_pages` needs.
|
||||
pub(crate) struct StoredPage {
|
||||
page_number: i32,
|
||||
storage_key: String,
|
||||
content_type: String,
|
||||
}
|
||||
|
||||
/// Download a single page image, validate it's really an image, and write
|
||||
/// it to storage. Returns the storage key + content type. Does not touch
|
||||
/// the DB — persistence is batched into one short transaction afterward.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn download_and_store_page(
|
||||
storage: &dyn Storage,
|
||||
http: &reqwest::Client,
|
||||
rate: &HostRateLimiters,
|
||||
base: &reqwest::Url,
|
||||
source_url: &str,
|
||||
manga_id: Uuid,
|
||||
chapter_id: Uuid,
|
||||
img: &ChapterImage,
|
||||
allowlist: &DownloadAllowlist,
|
||||
max_image_bytes: usize,
|
||||
) -> anyhow::Result<StoredPage> {
|
||||
let url = base
|
||||
.join(&img.url)
|
||||
.with_context(|| format!("join image URL {} onto {source_url}", img.url))?;
|
||||
rate.wait_for(url.as_str()).await?;
|
||||
let bytes = fetch_bytes_capped(http, url.as_str(), Some(source_url), allowlist, max_image_bytes)
|
||||
.await?;
|
||||
// Reject any non-image response: the only valid output of an image URL
|
||||
// is an image. `infer` returns None on truncated bytes too, which also
|
||||
// wants to be a failure not a silent `.bin` extension.
|
||||
if !looks_like_image(&bytes) {
|
||||
anyhow::bail!(
|
||||
"image URL {url} returned non-image bytes \
|
||||
(first 16: {:?}); refusing to store as binary blob",
|
||||
&bytes.get(..16.min(bytes.len()))
|
||||
);
|
||||
storage
|
||||
.put(&key, bytes)
|
||||
.await
|
||||
.with_context(|| format!("put {key}"))?;
|
||||
// (chapter_id, page_number) is unique — re-runs idempotent.
|
||||
}
|
||||
let ext = infer::get(&bytes)
|
||||
.map(|k| k.extension())
|
||||
.expect("looks_like_image asserted infer succeeded");
|
||||
let key = format!(
|
||||
"mangas/{manga_id}/chapters/{chapter_id}/pages/{:04}.{ext}",
|
||||
img.page_number
|
||||
);
|
||||
storage
|
||||
.put(&key, &bytes)
|
||||
.await
|
||||
.with_context(|| format!("put {key}"))?;
|
||||
Ok(StoredPage {
|
||||
page_number: img.page_number,
|
||||
storage_key: key,
|
||||
content_type: format!("image/{ext}"),
|
||||
})
|
||||
}
|
||||
|
||||
/// Persist the page rows + chapter `page_count` in one short transaction.
|
||||
/// `(chapter_id, page_number)` is unique so re-runs are idempotent.
|
||||
pub(crate) async fn persist_pages(
|
||||
db: &PgPool,
|
||||
chapter_id: Uuid,
|
||||
stored: &[StoredPage],
|
||||
) -> anyhow::Result<()> {
|
||||
let mut tx = db.begin().await.context("open chapter sync tx")?;
|
||||
for page in stored {
|
||||
sqlx::query(
|
||||
"INSERT INTO pages (chapter_id, page_number, storage_key, content_type)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
@@ -318,22 +394,36 @@ pub async fn sync_chapter_content(
|
||||
content_type = EXCLUDED.content_type",
|
||||
)
|
||||
.bind(chapter_id)
|
||||
.bind(page_number)
|
||||
.bind(&key)
|
||||
.bind(format!("image/{ext}"))
|
||||
.bind(page.page_number)
|
||||
.bind(&page.storage_key)
|
||||
.bind(&page.content_type)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.with_context(|| format!("insert page row {page_number}"))?;
|
||||
.with_context(|| format!("insert page row {}", page.page_number))?;
|
||||
}
|
||||
sqlx::query("UPDATE chapters SET page_count = $1 WHERE id = $2")
|
||||
.bind(fetched.len() as i32)
|
||||
.bind(stored.len() as i32)
|
||||
.bind(chapter_id)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.context("update page_count")?;
|
||||
tx.commit().await.context("commit chapter sync")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Ok(SyncOutcome::Fetched { pages: fetched.len() })
|
||||
/// Best-effort delete of partially-written page blobs after a chapter sync
|
||||
/// fails, so a retry doesn't accumulate orphans. Errors are logged, not
|
||||
/// raised — a leftover blob is harmless and a future reaper can sweep it.
|
||||
pub(crate) async fn cleanup_orphans(storage: &dyn Storage, keys: &[String]) {
|
||||
for key in keys {
|
||||
if let Err(e) = storage.delete(key).await {
|
||||
tracing::warn!(
|
||||
%key,
|
||||
error = ?e,
|
||||
"failed to delete orphaned page blob after chapter sync failure"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Suppress unused-import warning for `session::registrable_domain`
|
||||
@@ -347,6 +437,90 @@ fn _keep_session_in_scope() {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::storage::LocalStorage;
|
||||
|
||||
#[tokio::test]
|
||||
async fn cleanup_orphans_deletes_written_keys() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let storage = LocalStorage::new(dir.path());
|
||||
let keys = vec![
|
||||
"mangas/m/chapters/c/pages/0001.jpg".to_string(),
|
||||
"mangas/m/chapters/c/pages/0002.jpg".to_string(),
|
||||
];
|
||||
for k in &keys {
|
||||
storage.put(k, b"\xff\xd8\xff\xe0 jpeg-ish").await.unwrap();
|
||||
assert!(storage.exists(k).await.unwrap());
|
||||
}
|
||||
cleanup_orphans(&storage, &keys).await;
|
||||
for k in &keys {
|
||||
assert!(!storage.exists(k).await.unwrap(), "{k} should be deleted");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cleanup_orphans_tolerates_missing_keys() {
|
||||
// A key that was never written (e.g. the put itself failed) must
|
||||
// not make cleanup error — it's best-effort.
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let storage = LocalStorage::new(dir.path());
|
||||
cleanup_orphans(&storage, &["never/written.jpg".to_string()]).await;
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn persist_pages_inserts_rows_and_sets_page_count(pool: PgPool) {
|
||||
let manga_id = Uuid::new_v4();
|
||||
let chapter_id = Uuid::new_v4();
|
||||
sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, 'T')")
|
||||
.bind(manga_id)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)")
|
||||
.bind(chapter_id)
|
||||
.bind(manga_id)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let stored = vec![
|
||||
StoredPage {
|
||||
page_number: 1,
|
||||
storage_key: "k/0001.jpg".into(),
|
||||
content_type: "image/jpeg".into(),
|
||||
},
|
||||
StoredPage {
|
||||
page_number: 2,
|
||||
storage_key: "k/0002.jpg".into(),
|
||||
content_type: "image/jpeg".into(),
|
||||
},
|
||||
];
|
||||
persist_pages(&pool, chapter_id, &stored).await.unwrap();
|
||||
|
||||
let page_count: i32 =
|
||||
sqlx::query_scalar("SELECT page_count FROM chapters WHERE id = $1")
|
||||
.bind(chapter_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(page_count, 2);
|
||||
let rows: i64 =
|
||||
sqlx::query_scalar("SELECT COUNT(*) FROM pages WHERE chapter_id = $1")
|
||||
.bind(chapter_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(rows, 2);
|
||||
|
||||
// Idempotent re-run (force refetch path): same rows, page_count stable.
|
||||
persist_pages(&pool, chapter_id, &stored).await.unwrap();
|
||||
let rows2: i64 =
|
||||
sqlx::query_scalar("SELECT COUNT(*) FROM pages WHERE chapter_id = $1")
|
||||
.bind(chapter_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(rows2, 2, "re-run is idempotent via ON CONFLICT");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_chapter_pages_skips_loader_and_sorts_by_id() {
|
||||
|
||||
@@ -48,6 +48,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use crate::crawler::content::SyncOutcome;
|
||||
use crate::crawler::jobs::{self, JobPayload, Lease, KIND_SYNC_CHAPTER_CONTENT};
|
||||
use crate::crawler::pipeline;
|
||||
use crate::crawler::status::{Phase, StatusHandle};
|
||||
|
||||
/// Fixed `pg_try_advisory_lock` key. ASCII "MANGALRD" interpreted as a
|
||||
/// big-endian i64. Hardcoded so every replica agrees on the lock identity
|
||||
@@ -56,6 +57,15 @@ pub const CRON_LOCK_KEY: i64 = 0x4D414E47414C5244;
|
||||
|
||||
const STATE_KEY_LAST_TICK: &str = "last_metadata_tick_at";
|
||||
|
||||
/// Lease window handed to `jobs::lease`. Kept short, but continuously
|
||||
/// extended by the per-job heartbeat (see [`WorkerContext::process_lease`])
|
||||
/// so a long-but-healthy job never lapses and gets stolen.
|
||||
const LEASE_DURATION: Duration = Duration::from_secs(60);
|
||||
|
||||
/// How often the heartbeat renews the lease while a job runs. A third of
|
||||
/// the lease window leaves two missed-beat's slack before expiry.
|
||||
const LEASE_HEARTBEAT: Duration = Duration::from_secs(20);
|
||||
|
||||
#[async_trait]
|
||||
pub trait MetadataPass: Send + Sync {
|
||||
async fn run(&self) -> anyhow::Result<pipeline::MetadataStats>;
|
||||
@@ -77,6 +87,13 @@ pub struct DaemonConfig {
|
||||
pub tz: Tz,
|
||||
pub retention_days: u32,
|
||||
pub session_expired: Arc<AtomicBool>,
|
||||
/// Live status surface updated by the cron + workers.
|
||||
pub status: StatusHandle,
|
||||
/// Hard upper bound on a single job's dispatch. A job that exceeds it
|
||||
/// is acked failed (exponential backoff) rather than wedging a worker
|
||||
/// forever. Must exceed [`LEASE_HEARTBEAT`] and the realistic
|
||||
/// single-job runtime.
|
||||
pub job_timeout: Duration,
|
||||
/// Tasks that should run alongside the cron + workers and be cancelled
|
||||
/// on shutdown. Used to hand the daemon ownership of the browser
|
||||
/// manager's idle reaper.
|
||||
@@ -123,6 +140,8 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem
|
||||
tz,
|
||||
retention_days,
|
||||
session_expired,
|
||||
status,
|
||||
job_timeout,
|
||||
extra_tasks,
|
||||
} = cfg;
|
||||
|
||||
@@ -134,6 +153,7 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem
|
||||
tz,
|
||||
retention_days,
|
||||
metadata,
|
||||
status: status.clone(),
|
||||
};
|
||||
join.spawn(async move { ctx.run().await });
|
||||
} else {
|
||||
@@ -146,6 +166,8 @@ pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> Daem
|
||||
cancel: cancel.clone(),
|
||||
dispatcher: Arc::clone(&dispatcher),
|
||||
session_expired: Arc::clone(&session_expired),
|
||||
status: status.clone(),
|
||||
job_timeout,
|
||||
id: worker_id,
|
||||
};
|
||||
join.spawn(async move { ctx.run().await });
|
||||
@@ -169,6 +191,7 @@ struct CronContext {
|
||||
tz: Tz,
|
||||
retention_days: u32,
|
||||
metadata: Arc<dyn MetadataPass>,
|
||||
status: StatusHandle,
|
||||
}
|
||||
|
||||
impl CronContext {
|
||||
@@ -196,6 +219,11 @@ impl CronContext {
|
||||
// (NTP step, suspend/resume) don't strand us on a stale instant.
|
||||
let next = next_fire(Utc::now(), self.daily_at, self.tz);
|
||||
let wait = (next - Utc::now()).to_std().unwrap_or(Duration::ZERO);
|
||||
self.status
|
||||
.set_phase(Phase::Idle {
|
||||
next_fire: Some(next),
|
||||
})
|
||||
.await;
|
||||
tracing::info!(
|
||||
next_fire_utc = %next.to_rfc3339(),
|
||||
wait_seconds = wait.as_secs(),
|
||||
@@ -243,9 +271,13 @@ impl CronContext {
|
||||
let metadata = &self.metadata;
|
||||
let pool = &self.pool;
|
||||
let retention_days = self.retention_days;
|
||||
let status = &self.status;
|
||||
let body = async move {
|
||||
match metadata.run().await {
|
||||
Ok(stats) => tracing::info!(?stats, "cron: metadata pass done"),
|
||||
Ok(stats) => {
|
||||
status.record_pass(&stats, Utc::now()).await;
|
||||
tracing::info!(?stats, "cron: metadata pass done");
|
||||
}
|
||||
Err(e) => tracing::error!(?e, "cron: metadata pass failed"),
|
||||
}
|
||||
match pipeline::enqueue_bookmarked_pending(pool).await {
|
||||
@@ -283,6 +315,8 @@ struct WorkerContext {
|
||||
cancel: CancellationToken,
|
||||
dispatcher: Arc<dyn ChapterDispatcher>,
|
||||
session_expired: Arc<AtomicBool>,
|
||||
status: StatusHandle,
|
||||
job_timeout: Duration,
|
||||
id: usize,
|
||||
}
|
||||
|
||||
@@ -303,7 +337,7 @@ impl WorkerContext {
|
||||
&self.pool,
|
||||
Some(KIND_SYNC_CHAPTER_CONTENT),
|
||||
1,
|
||||
Duration::from_secs(60),
|
||||
LEASE_DURATION,
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -341,9 +375,59 @@ impl WorkerContext {
|
||||
}
|
||||
}
|
||||
|
||||
let outcome = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone()))
|
||||
.catch_unwind()
|
||||
.await;
|
||||
// Heartbeat: keep the lease fresh while the (potentially long)
|
||||
// dispatch runs, so a slow-but-healthy job is never re-leased and
|
||||
// never inflates `attempts` toward `max_attempts`. Stops itself
|
||||
// once the job is no longer ours (renew returns false).
|
||||
let heartbeat = {
|
||||
let hb_pool = self.pool.clone();
|
||||
let hb_id = lease.id;
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(LEASE_HEARTBEAT).await;
|
||||
match jobs::renew(&hb_pool, hb_id, LEASE_DURATION).await {
|
||||
Ok(true) => {}
|
||||
Ok(false) => break,
|
||||
Err(e) => {
|
||||
tracing::warn!(lease_id = %hb_id, ?e, "heartbeat renew failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
// The "currently crawling" chapter (with its live page count) is
|
||||
// registered by the dispatcher itself (RealChapterDispatcher) so it
|
||||
// carries the manga/chapter identity + page progress and is removed
|
||||
// via an RAII guard on every exit path.
|
||||
|
||||
// Outer timeout: a dispatch that exceeds `job_timeout` is acked
|
||||
// failed (exponential backoff) rather than wedging the worker.
|
||||
let dispatch = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone()))
|
||||
.catch_unwind();
|
||||
let outcome = tokio::time::timeout(self.job_timeout, dispatch).await;
|
||||
heartbeat.abort();
|
||||
|
||||
let outcome = match outcome {
|
||||
Ok(o) => o,
|
||||
Err(_elapsed) => {
|
||||
tracing::warn!(
|
||||
worker = self.id,
|
||||
lease_id = %lease.id,
|
||||
timeout_secs = self.job_timeout.as_secs(),
|
||||
"worker: dispatch timed out — ack failed"
|
||||
);
|
||||
let _ = jobs::ack_failed(
|
||||
&self.pool,
|
||||
lease.id,
|
||||
"dispatch timed out",
|
||||
lease.attempts,
|
||||
lease.max_attempts,
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
match outcome {
|
||||
Ok(Ok(SyncOutcome::Fetched { .. } | SyncOutcome::Skipped)) => {
|
||||
let _ = jobs::ack_done(&self.pool, lease.id).await;
|
||||
@@ -355,6 +439,8 @@ impl WorkerContext {
|
||||
"session expired — workers will idle until restart"
|
||||
);
|
||||
self.session_expired.store(true, Ordering::Release);
|
||||
// Push the session-expired flip to live status subscribers.
|
||||
self.status.poke();
|
||||
let _ = jobs::release(&self.pool, lease.id).await;
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
|
||||
@@ -66,16 +66,33 @@ pub struct Lease {
|
||||
pub max_attempts: i32,
|
||||
}
|
||||
|
||||
/// Exponential backoff for `ack_failed` retries. `attempts` is the
|
||||
/// post-increment value reported by `lease()` (so the first failure has
|
||||
/// `attempts == 1` and waits 60s, the second 120s, etc.). Capped at 1h to
|
||||
/// avoid runaway long sleeps that would outlive the daemon process.
|
||||
fn backoff_for(attempts: i32) -> Duration {
|
||||
/// Deterministic exponential backoff base for `ack_failed` retries.
|
||||
/// `attempts` is the post-increment value reported by `lease()` (so the
|
||||
/// first failure has `attempts == 1` and waits 60s, the second 120s,
|
||||
/// etc.). Capped at 1h to avoid runaway long sleeps that would outlive
|
||||
/// the daemon process. Jitter is applied separately by [`apply_jitter`].
|
||||
fn backoff_base(attempts: i32) -> Duration {
|
||||
let shift = attempts.saturating_sub(1).clamp(0, 20) as u32;
|
||||
let secs = 60u64.saturating_mul(1u64 << shift);
|
||||
Duration::from_secs(secs.min(3600))
|
||||
}
|
||||
|
||||
/// Apply ±20% jitter to a backoff duration. `jitter` is a fraction in
|
||||
/// `[0.0, 1.0)` (e.g. `rand::random::<f64>()`), mapped to a multiplier in
|
||||
/// `[0.8, 1.2)`. Pure so the bounds stay unit-testable. Spreading retries
|
||||
/// avoids a thundering herd when a source outage fails many jobs at once.
|
||||
fn apply_jitter(base: Duration, jitter: f64) -> Duration {
|
||||
let frac = jitter.clamp(0.0, 1.0);
|
||||
let mult = 0.8 + 0.4 * frac; // [0.8, 1.2)
|
||||
Duration::from_secs((base.as_secs_f64() * mult).round() as u64)
|
||||
}
|
||||
|
||||
/// Jittered exponential backoff for `ack_failed`. Wraps [`backoff_base`]
|
||||
/// with a random ±20% spread.
|
||||
fn backoff_for(attempts: i32) -> Duration {
|
||||
apply_jitter(backoff_base(attempts), rand::random::<f64>())
|
||||
}
|
||||
|
||||
/// Insert a new pending job. For `SyncChapterContent` payloads the
|
||||
/// partial unique index `crawler_jobs_chapter_content_dedup_idx` blocks
|
||||
/// a second `(pending|running)` insert per chapter_id, returning
|
||||
@@ -159,6 +176,35 @@ pub async fn lease(
|
||||
Ok(leases)
|
||||
}
|
||||
|
||||
/// Extend the lease on a still-owned `running` job. Returns `true` if the
|
||||
/// row was updated (we still hold the lease), `false` if the job is no
|
||||
/// longer `running` (re-leased after a missed heartbeat, or already
|
||||
/// acked) — the caller's heartbeat loop should stop. The `state =
|
||||
/// 'running'` guard mirrors [`ack_done`]'s rationale.
|
||||
///
|
||||
/// This is the heartbeat primitive: a worker renews periodically while a
|
||||
/// long-but-healthy job runs so `leased_until` never lapses, which would
|
||||
/// otherwise let another worker steal the in-flight job and spuriously
|
||||
/// inflate `attempts` toward `max_attempts`.
|
||||
pub async fn renew(
|
||||
pool: &PgPool,
|
||||
lease_id: Uuid,
|
||||
lease_duration: Duration,
|
||||
) -> sqlx::Result<bool> {
|
||||
let lease_ms: i64 = lease_duration.as_millis().min(i64::MAX as u128) as i64;
|
||||
let res = sqlx::query(
|
||||
"UPDATE crawler_jobs \
|
||||
SET leased_until = now() + ($2::bigint || ' milliseconds')::interval, \
|
||||
updated_at = now() \
|
||||
WHERE id = $1 AND state = 'running'",
|
||||
)
|
||||
.bind(lease_id)
|
||||
.bind(lease_ms)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(res.rows_affected() > 0)
|
||||
}
|
||||
|
||||
/// Mark a leased job as successfully completed. The `state = 'running'`
|
||||
/// predicate guards against a late ack from a worker whose lease expired
|
||||
/// and was already re-leased by another worker: without it, the late ack
|
||||
@@ -278,19 +324,48 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn backoff_grows_exponentially_and_caps_at_one_hour() {
|
||||
fn backoff_base_grows_exponentially_and_caps_at_one_hour() {
|
||||
// attempts == 1 → 60s, doubling each step.
|
||||
assert_eq!(backoff_for(1), Duration::from_secs(60));
|
||||
assert_eq!(backoff_for(2), Duration::from_secs(120));
|
||||
assert_eq!(backoff_for(3), Duration::from_secs(240));
|
||||
assert_eq!(backoff_for(4), Duration::from_secs(480));
|
||||
assert_eq!(backoff_for(5), Duration::from_secs(960));
|
||||
assert_eq!(backoff_for(6), Duration::from_secs(1920));
|
||||
assert_eq!(backoff_base(1), Duration::from_secs(60));
|
||||
assert_eq!(backoff_base(2), Duration::from_secs(120));
|
||||
assert_eq!(backoff_base(3), Duration::from_secs(240));
|
||||
assert_eq!(backoff_base(4), Duration::from_secs(480));
|
||||
assert_eq!(backoff_base(5), Duration::from_secs(960));
|
||||
assert_eq!(backoff_base(6), Duration::from_secs(1920));
|
||||
// 7th: 60 * 64 = 3840 → capped to 3600.
|
||||
assert_eq!(backoff_for(7), Duration::from_secs(3600));
|
||||
assert_eq!(backoff_for(20), Duration::from_secs(3600));
|
||||
assert_eq!(backoff_base(7), Duration::from_secs(3600));
|
||||
assert_eq!(backoff_base(20), Duration::from_secs(3600));
|
||||
// Garbage / zero / negatives stay sane.
|
||||
assert_eq!(backoff_for(0), Duration::from_secs(60));
|
||||
assert_eq!(backoff_for(-5), Duration::from_secs(60));
|
||||
assert_eq!(backoff_base(0), Duration::from_secs(60));
|
||||
assert_eq!(backoff_base(-5), Duration::from_secs(60));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn apply_jitter_stays_within_plus_minus_twenty_percent() {
|
||||
let base = Duration::from_secs(100);
|
||||
// Lower bound (jitter = 0.0) → 0.8x.
|
||||
assert_eq!(apply_jitter(base, 0.0), Duration::from_secs(80));
|
||||
// Midpoint (jitter = 0.5) → 1.0x.
|
||||
assert_eq!(apply_jitter(base, 0.5), Duration::from_secs(100));
|
||||
// Upper end (jitter → 1.0) → ~1.2x.
|
||||
assert_eq!(apply_jitter(base, 1.0), Duration::from_secs(120));
|
||||
// Out-of-range inputs are clamped, never panic.
|
||||
assert_eq!(apply_jitter(base, -3.0), Duration::from_secs(80));
|
||||
assert_eq!(apply_jitter(base, 9.0), Duration::from_secs(120));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backoff_for_random_jitter_stays_in_band() {
|
||||
// The production wrapper draws its own randomness; assert the
|
||||
// result for a mid-range attempt always lands within the jitter
|
||||
// band of the base, across many draws.
|
||||
let base = backoff_base(3).as_secs_f64(); // 240s
|
||||
for _ in 0..1000 {
|
||||
let v = backoff_for(3).as_secs_f64();
|
||||
assert!(
|
||||
v >= base * 0.8 - 1.0 && v <= base * 1.2 + 1.0,
|
||||
"jittered backoff {v} outside band of base {base}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,8 @@ pub mod rate_limit;
|
||||
pub mod resync;
|
||||
pub mod safety;
|
||||
pub mod session;
|
||||
pub mod session_control;
|
||||
pub mod source;
|
||||
pub mod status;
|
||||
pub mod tor;
|
||||
pub mod url_utils;
|
||||
|
||||
@@ -65,6 +65,17 @@ pub(crate) fn should_mark_clean_exit(
|
||||
walked_to_completion || hit_stop_condition
|
||||
}
|
||||
|
||||
/// Circuit-breaker: abort the walk once `consecutive` `fetch_manga`
|
||||
/// failures reach `threshold`. A `threshold` of 0 disables the breaker
|
||||
/// (unbounded — the legacy behaviour). When it fires the caller must NOT
|
||||
/// mark a clean exit, so the next tick does a recovery sweep over the
|
||||
/// catalog tail the aborted pass never reached.
|
||||
///
|
||||
/// Pure so the rule is unit-testable without the walker.
|
||||
pub(crate) fn should_abort_pass(consecutive: u32, threshold: u32) -> bool {
|
||||
threshold > 0 && consecutive >= threshold
|
||||
}
|
||||
|
||||
/// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline
|
||||
/// for the target source. Pure metadata; chapter content is enqueued as
|
||||
/// separate `SyncChapterContent` jobs by the caller after this returns.
|
||||
@@ -103,6 +114,8 @@ pub async fn run_metadata_pass(
|
||||
skip_chapters: bool,
|
||||
allowlist: &DownloadAllowlist,
|
||||
max_image_bytes: usize,
|
||||
max_consecutive_failures: u32,
|
||||
status: Option<&crate::crawler::status::StatusHandle>,
|
||||
tor: Option<&crate::crawler::tor::TorController>,
|
||||
) -> anyhow::Result<MetadataStats> {
|
||||
let lease = browser_manager
|
||||
@@ -110,6 +123,9 @@ pub async fn run_metadata_pass(
|
||||
.await
|
||||
.context("acquire browser lease for metadata pass")?;
|
||||
let browser_ref: &chromiumoxide::Browser = &lease;
|
||||
if let Some(s) = status {
|
||||
s.set_phase(crate::crawler::status::Phase::WalkingList).await;
|
||||
}
|
||||
|
||||
let source = {
|
||||
let s = TargetSource::new(start_url.to_string());
|
||||
@@ -165,6 +181,11 @@ pub async fn run_metadata_pass(
|
||||
let mut walked_to_completion = false;
|
||||
let mut hit_limit = false;
|
||||
let mut hit_stop_condition = false;
|
||||
// Circuit-breaker state: consecutive fetch_manga failures. A sustained
|
||||
// run abort (source outage) leaves the pass un-clean → recovery sweep
|
||||
// next tick.
|
||||
let mut consecutive_failures = 0u32;
|
||||
let mut hit_failure_breaker = false;
|
||||
|
||||
'outer: loop {
|
||||
let batch = match walker.next_batch(&ctx).await? {
|
||||
@@ -175,6 +196,17 @@ pub async fn run_metadata_pass(
|
||||
}
|
||||
};
|
||||
for r in batch {
|
||||
// Cooperative checkpoint: if a coordinated browser restart is
|
||||
// pending, yield our (long-lived) lease so the drain can
|
||||
// proceed instead of stalling for the rest of the walk. The
|
||||
// pass exits un-clean, so the next tick recovery-sweeps the
|
||||
// tail we didn't reach.
|
||||
if browser_manager.is_restart_pending() {
|
||||
tracing::info!(
|
||||
"metadata pass: browser restart pending — yielding (recovery sweep next tick)"
|
||||
);
|
||||
break 'outer;
|
||||
}
|
||||
if max_refs.map(|m| stats.discovered >= m).unwrap_or(false) {
|
||||
hit_limit = true;
|
||||
tracing::info!(cap = ?max_refs, "max_results reached; halting walk");
|
||||
@@ -198,13 +230,24 @@ pub async fn run_metadata_pass(
|
||||
continue;
|
||||
}
|
||||
stats.discovered += 1;
|
||||
if let Some(s) = status {
|
||||
s.set_phase(crate::crawler::status::Phase::FetchingMetadata {
|
||||
index: stats.discovered,
|
||||
total: max_refs,
|
||||
title: r.title.clone(),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
tracing::info!(
|
||||
idx = stats.discovered,
|
||||
key = %r.source_manga_key,
|
||||
"fetching metadata"
|
||||
);
|
||||
let manga = match source.fetch_manga(&ctx, &r).await {
|
||||
Ok(m) => m,
|
||||
Ok(m) => {
|
||||
consecutive_failures = 0;
|
||||
m
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
key = %r.source_manga_key,
|
||||
@@ -213,6 +256,17 @@ pub async fn run_metadata_pass(
|
||||
"fetch_manga failed"
|
||||
);
|
||||
stats.mangas_failed += 1;
|
||||
consecutive_failures += 1;
|
||||
if should_abort_pass(consecutive_failures, max_consecutive_failures) {
|
||||
hit_failure_breaker = true;
|
||||
tracing::error!(
|
||||
consecutive_failures,
|
||||
threshold = max_consecutive_failures,
|
||||
"metadata pass: too many consecutive fetch_manga failures; \
|
||||
aborting (recovery sweep on next tick)"
|
||||
);
|
||||
break 'outer;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -295,7 +349,14 @@ pub async fn run_metadata_pass(
|
||||
|| matches!(upsert.status, repo::crawler::UpsertStatus::Updated);
|
||||
if needs_cover {
|
||||
if let Some(cover_url) = manga.cover_url.as_deref() {
|
||||
match download_and_store_cover(
|
||||
if let Some(s) = status {
|
||||
s.set_current_cover(Some(crate::crawler::status::CoverTarget {
|
||||
manga_id: upsert.manga_id,
|
||||
manga_title: manga.title.clone(),
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
let cover_result = download_and_store_cover(
|
||||
db,
|
||||
storage,
|
||||
http,
|
||||
@@ -306,8 +367,11 @@ pub async fn run_metadata_pass(
|
||||
allowlist,
|
||||
max_image_bytes,
|
||||
)
|
||||
.await
|
||||
{
|
||||
.await;
|
||||
if let Some(s) = status {
|
||||
s.set_current_cover(None).await;
|
||||
}
|
||||
match cover_result {
|
||||
Ok(()) => stats.covers_fetched += 1,
|
||||
Err(e) => tracing::warn!(
|
||||
manga_id = %upsert.manga_id,
|
||||
@@ -390,6 +454,7 @@ pub async fn run_metadata_pass(
|
||||
walked_to_completion,
|
||||
hit_limit,
|
||||
hit_stop_condition,
|
||||
hit_failure_breaker,
|
||||
exited_cleanly,
|
||||
"metadata pass complete"
|
||||
);
|
||||
@@ -560,6 +625,7 @@ pub async fn backfill_missing_covers(
|
||||
max_mangas: usize,
|
||||
allowlist: &DownloadAllowlist,
|
||||
max_image_bytes: usize,
|
||||
status: Option<&crate::crawler::status::StatusHandle>,
|
||||
tor: Option<&crate::crawler::tor::TorController>,
|
||||
) -> anyhow::Result<CoverBackfillStats> {
|
||||
let mut stats = CoverBackfillStats::default();
|
||||
@@ -582,8 +648,13 @@ pub async fn backfill_missing_covers(
|
||||
let browser_ref: &chromiumoxide::Browser = &lease;
|
||||
let ctx = FetchContext { browser: browser_ref, rate, tor };
|
||||
|
||||
for entry in entries {
|
||||
let total = entries.len();
|
||||
for (index, entry) in entries.into_iter().enumerate() {
|
||||
stats.considered += 1;
|
||||
if let Some(s) = status {
|
||||
s.set_phase(crate::crawler::status::Phase::CoverBackfill { index, total })
|
||||
.await;
|
||||
}
|
||||
// Metadata-only TargetSource: skip chapter-list parsing so a
|
||||
// missing-cover refetch doesn't soft-drop chapters on a partial
|
||||
// render. Cover URL alone is what we need.
|
||||
@@ -593,8 +664,8 @@ pub async fn backfill_missing_covers(
|
||||
title: String::new(),
|
||||
url: entry.source_url.clone(),
|
||||
};
|
||||
let cover_url = match source.fetch_manga(&ctx, &r).await {
|
||||
Ok(manga) => manga.cover_url,
|
||||
let manga = match source.fetch_manga(&ctx, &r).await {
|
||||
Ok(manga) => manga,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
manga_id = %entry.manga_id,
|
||||
@@ -606,7 +677,7 @@ pub async fn backfill_missing_covers(
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let Some(cover_url) = cover_url else {
|
||||
let Some(cover_url) = manga.cover_url.clone() else {
|
||||
tracing::warn!(
|
||||
manga_id = %entry.manga_id,
|
||||
url = %entry.source_url,
|
||||
@@ -615,7 +686,14 @@ pub async fn backfill_missing_covers(
|
||||
stats.failed += 1;
|
||||
continue;
|
||||
};
|
||||
match download_and_store_cover(
|
||||
if let Some(s) = status {
|
||||
s.set_current_cover(Some(crate::crawler::status::CoverTarget {
|
||||
manga_id: entry.manga_id,
|
||||
manga_title: manga.title.clone(),
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
let cover_result = download_and_store_cover(
|
||||
db,
|
||||
storage,
|
||||
http,
|
||||
@@ -626,8 +704,11 @@ pub async fn backfill_missing_covers(
|
||||
allowlist,
|
||||
max_image_bytes,
|
||||
)
|
||||
.await
|
||||
{
|
||||
.await;
|
||||
if let Some(s) = status {
|
||||
s.set_current_cover(None).await;
|
||||
}
|
||||
match cover_result {
|
||||
Ok(()) => stats.fetched += 1,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
@@ -756,6 +837,18 @@ mod tests {
|
||||
assert!(!should_stop(false, UpsertStatus::New, None));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn abort_pass_fires_at_threshold_and_respects_disable() {
|
||||
// Disabled (0) never fires, no matter how many failures.
|
||||
assert!(!should_abort_pass(0, 0));
|
||||
assert!(!should_abort_pass(100, 0));
|
||||
// Below threshold: keep going.
|
||||
assert!(!should_abort_pass(9, 10));
|
||||
// At/above threshold: abort.
|
||||
assert!(should_abort_pass(10, 10));
|
||||
assert!(should_abort_pass(11, 10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clean_exit_when_walked_to_completion() {
|
||||
// End-of-walk reached the catalog tail — the recovery flag may
|
||||
|
||||
@@ -235,7 +235,7 @@ impl ResyncService for RealResyncService {
|
||||
let row = repo::chapter::dispatch_target(&self.db, chapter_id)
|
||||
.await
|
||||
.context("look up chapter_sources for resync")?;
|
||||
let Some((manga_id, source_url)) = row else {
|
||||
let Some((manga_id, source_url, _title, _number)) = row else {
|
||||
return Err(ResyncError::NoChapterSource.into());
|
||||
};
|
||||
|
||||
@@ -257,6 +257,8 @@ impl ResyncService for RealResyncService {
|
||||
&self.download_allowlist,
|
||||
self.max_image_bytes,
|
||||
self.tor.as_deref(),
|
||||
// Admin resync isn't a daemon worker slot — no live status.
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
drop(lease);
|
||||
|
||||
180
backend/src/crawler/session_control.rs
Normal file
180
backend/src/crawler/session_control.rs
Normal file
@@ -0,0 +1,180 @@
|
||||
//! Runtime-updatable crawler session (PHPSESSID).
|
||||
//!
|
||||
//! At startup the session comes from `CRAWLER_PHPSESSID`, but it expires
|
||||
//! and previously needed a container restart to refresh. This controller
|
||||
//! lets an admin push a fresh cookie at runtime: it rewrites the reqwest
|
||||
//! cookie jar (CDN image fetches), updates the in-memory value the browser
|
||||
//! `on_launch` hook reads, persists it to `crawler_state` (so it survives
|
||||
//! a restart), and clears the sticky `session_expired` flag. A subsequent
|
||||
//! coordinated browser restart re-runs `on_launch`, re-injecting the new
|
||||
//! cookie into Chromium and re-probing.
|
||||
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use serde_json::json;
|
||||
use sqlx::PgPool;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
const STATE_KEY_RUNTIME_SESSION: &str = "runtime_session";
|
||||
|
||||
pub struct SessionController {
|
||||
/// Current PHPSESSID — what `on_launch` injects into a fresh browser.
|
||||
phpsessid: RwLock<Option<String>>,
|
||||
/// The same `Arc<Jar>` handed to the reqwest client; updating it here
|
||||
/// updates the client's cookies (the jar is internally mutable).
|
||||
cookie_jar: Arc<reqwest::cookie::Jar>,
|
||||
cookie_domain: Option<String>,
|
||||
start_url: Option<String>,
|
||||
db: PgPool,
|
||||
session_expired: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl SessionController {
|
||||
pub fn new(
|
||||
initial: Option<String>,
|
||||
cookie_jar: Arc<reqwest::cookie::Jar>,
|
||||
cookie_domain: Option<String>,
|
||||
start_url: Option<String>,
|
||||
db: PgPool,
|
||||
session_expired: Arc<AtomicBool>,
|
||||
) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
phpsessid: RwLock::new(initial),
|
||||
cookie_jar,
|
||||
cookie_domain,
|
||||
start_url,
|
||||
db,
|
||||
session_expired,
|
||||
})
|
||||
}
|
||||
|
||||
/// The PHPSESSID a fresh browser should inject (None when unset).
|
||||
pub async fn current(&self) -> Option<String> {
|
||||
self.phpsessid.read().await.clone()
|
||||
}
|
||||
|
||||
/// Whether the sticky session-expired flag is set (chapter workers
|
||||
/// idle while true).
|
||||
pub fn is_expired(&self) -> bool {
|
||||
self.session_expired.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
/// Clear the session-expired flag without changing the cookie — used
|
||||
/// when the operator knows the session is fine and wants workers to
|
||||
/// resume immediately.
|
||||
pub fn clear_expired(&self) {
|
||||
self.session_expired.store(false, Ordering::Release);
|
||||
}
|
||||
|
||||
/// Update the session everywhere: reqwest jar, in-memory value, and
|
||||
/// persisted `crawler_state`. Clears the session-expired flag. Does
|
||||
/// NOT relaunch the browser — the caller triggers a coordinated
|
||||
/// restart so `on_launch` re-injects + re-probes.
|
||||
pub async fn update(&self, sid: &str) -> anyhow::Result<()> {
|
||||
let sid = sid.trim().to_string();
|
||||
anyhow::ensure!(!sid.is_empty(), "PHPSESSID must not be empty");
|
||||
// The value is spliced into a cookie string and a CDP CookieParam.
|
||||
// Reject control chars and cookie delimiters so a pasted value
|
||||
// can't smuggle extra attributes / break out of the cookie.
|
||||
anyhow::ensure!(
|
||||
sid.chars().all(|c| !c.is_control() && c != ';' && c != ','),
|
||||
"PHPSESSID contains invalid characters"
|
||||
);
|
||||
|
||||
if let (Some(domain), Some(start_url)) = (&self.cookie_domain, &self.start_url) {
|
||||
let cookie_str = format!("PHPSESSID={sid}; Domain={domain}; Path=/");
|
||||
let seed_url =
|
||||
reqwest::Url::parse(start_url).context("parse start_url for cookie seed")?;
|
||||
self.cookie_jar.add_cookie_str(&cookie_str, &seed_url);
|
||||
}
|
||||
*self.phpsessid.write().await = Some(sid.clone());
|
||||
persist(&self.db, &sid).await.context("persist runtime session")?;
|
||||
self.session_expired.store(false, Ordering::Release);
|
||||
tracing::info!("crawler session updated at runtime");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read a persisted runtime session (if any) from `crawler_state`.
|
||||
/// Called at startup so a mid-day refresh survives a restart.
|
||||
pub async fn load_persisted(db: &PgPool) -> Option<String> {
|
||||
let row: Option<serde_json::Value> =
|
||||
sqlx::query_scalar("SELECT value FROM crawler_state WHERE key = $1")
|
||||
.bind(STATE_KEY_RUNTIME_SESSION)
|
||||
.fetch_optional(db)
|
||||
.await
|
||||
.ok()
|
||||
.flatten();
|
||||
row.and_then(|v| {
|
||||
v.get("phpsessid")
|
||||
.and_then(|s| s.as_str())
|
||||
.map(|s| s.to_string())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn persist(db: &PgPool, sid: &str) -> sqlx::Result<()> {
|
||||
sqlx::query(
|
||||
"INSERT INTO crawler_state (key, value, updated_at) \
|
||||
VALUES ($1, $2, now()) \
|
||||
ON CONFLICT (key) DO UPDATE \
|
||||
SET value = EXCLUDED.value, updated_at = now()",
|
||||
)
|
||||
.bind(STATE_KEY_RUNTIME_SESSION)
|
||||
.bind(json!({ "phpsessid": sid }))
|
||||
.execute(db)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn controller(db: PgPool) -> Arc<SessionController> {
|
||||
SessionController::new(
|
||||
None,
|
||||
Arc::new(reqwest::cookie::Jar::default()),
|
||||
Some("example.com".into()),
|
||||
Some("https://example.com/".into()),
|
||||
db,
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
)
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn update_rejects_empty_and_control_chars(pool: PgPool) {
|
||||
let c = controller(pool);
|
||||
assert!(c.update(" ").await.is_err(), "empty rejected");
|
||||
assert!(c.update("abc\r\ndef").await.is_err(), "CRLF rejected");
|
||||
assert!(c.update("ab;Domain=evil").await.is_err(), "semicolon rejected");
|
||||
assert!(c.update("x,y").await.is_err(), "comma rejected");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn update_persists_and_clears_expired_then_round_trips(pool: PgPool) {
|
||||
let c = controller(pool.clone());
|
||||
c.update("good-sid-123").await.unwrap();
|
||||
assert_eq!(c.current().await.as_deref(), Some("good-sid-123"));
|
||||
assert!(!c.is_expired(), "update clears the expired flag");
|
||||
// Persisted to crawler_state and readable by a fresh load.
|
||||
assert_eq!(
|
||||
SessionController::load_persisted(&pool).await.as_deref(),
|
||||
Some("good-sid-123")
|
||||
);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn clear_expired_flips_sticky_flag_without_touching_session(pool: PgPool) {
|
||||
// The flag starts `true` per `controller(pool)`'s test wiring.
|
||||
let c = controller(pool);
|
||||
assert!(c.is_expired(), "test fixture starts with the flag set");
|
||||
c.clear_expired();
|
||||
assert!(!c.is_expired(), "clear_expired flips the sticky flag to false");
|
||||
assert!(
|
||||
c.current().await.is_none(),
|
||||
"clear_expired does not invent a session"
|
||||
);
|
||||
}
|
||||
}
|
||||
355
backend/src/crawler/status.rs
Normal file
355
backend/src/crawler/status.rs
Normal file
@@ -0,0 +1,355 @@
|
||||
//! Live, in-process crawler status.
|
||||
//!
|
||||
//! The metadata pass runs inline in the cron tick (it is not a
|
||||
//! `crawler_jobs` row), so without this surface "what is the crawler doing
|
||||
//! right now" is unanswerable from the dashboard. The daemon publishes its
|
||||
//! current [`Phase`], the chapters being crawled right now (with a live
|
||||
//! page count), and the cover being fetched into a shared [`StatusHandle`];
|
||||
//! the admin endpoint reads a [`CrawlerStatus`] snapshot and composes it
|
||||
//! with DB-derived counts + the session/browser flags.
|
||||
//!
|
||||
//! NOTE: this is per-process state. The deployment is a single server
|
||||
//! (see CLAUDE.md), so an in-memory handle is sufficient; durable signals
|
||||
//! (last-pass summary, runtime session) are persisted in `crawler_state`.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::Serialize;
|
||||
use tokio::sync::{watch, RwLock};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::crawler::pipeline::MetadataStats;
|
||||
|
||||
/// What the daemon's metadata pass is doing right now. Serialised with an
|
||||
/// internal `state` tag so the frontend can switch on it.
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
#[serde(tag = "state", rename_all = "snake_case")]
|
||||
pub enum Phase {
|
||||
/// Sleeping until the next scheduled metadata pass.
|
||||
Idle { next_fire: Option<DateTime<Utc>> },
|
||||
/// Walking the source catalog list pages.
|
||||
WalkingList,
|
||||
/// Fetching one manga's metadata. `index`/`total` drive a progress bar
|
||||
/// (`total` is `None` when the source size is unknown / uncapped).
|
||||
FetchingMetadata {
|
||||
index: usize,
|
||||
total: Option<usize>,
|
||||
title: String,
|
||||
},
|
||||
/// Backfilling covers that failed on first attempt. `index`/`total`
|
||||
/// track progress through this tick's batch.
|
||||
CoverBackfill { index: usize, total: usize },
|
||||
}
|
||||
|
||||
/// A chapter being downloaded right now, with a live page count. Keyed in
|
||||
/// the status by `chapter_id`; inserted by the dispatcher when a job starts
|
||||
/// and removed (via an RAII guard) when it finishes, panics, or times out.
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct ActiveChapter {
|
||||
pub manga_id: Uuid,
|
||||
pub manga_title: String,
|
||||
pub chapter_id: Uuid,
|
||||
pub chapter_number: i32,
|
||||
pub pages_done: usize,
|
||||
/// `None` until the chapter page list has been parsed.
|
||||
pub pages_total: Option<usize>,
|
||||
}
|
||||
|
||||
/// The manga whose cover is being downloaded right now.
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct CoverTarget {
|
||||
pub manga_id: Uuid,
|
||||
pub manga_title: String,
|
||||
}
|
||||
|
||||
/// Summary of the most recent metadata pass (persisted across restarts in
|
||||
/// `crawler_state` by the cron; mirrored here for the live read).
|
||||
#[derive(Clone, Debug, Serialize, Default)]
|
||||
pub struct LastPass {
|
||||
pub at: Option<DateTime<Utc>>,
|
||||
pub discovered: usize,
|
||||
pub upserted: usize,
|
||||
pub covers_fetched: usize,
|
||||
pub mangas_failed: usize,
|
||||
}
|
||||
|
||||
/// A point-in-time snapshot returned by [`StatusHandle::snapshot`]. The
|
||||
/// session/browser/queue fields are composed at read time by the endpoint
|
||||
/// (they live elsewhere), so they are not stored here.
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct CrawlerStatus {
|
||||
pub phase: Phase,
|
||||
/// Number of configured chapter workers (for "N busy / M workers").
|
||||
pub worker_count: usize,
|
||||
/// Chapters being downloaded right now, with live page counts.
|
||||
pub active_chapters: Vec<ActiveChapter>,
|
||||
pub last_pass: LastPass,
|
||||
/// The cover being downloaded right now, if any.
|
||||
pub current_cover: Option<CoverTarget>,
|
||||
}
|
||||
|
||||
/// Scalar status state held under the async `RwLock`. Active chapters live
|
||||
/// in a separate sync map so per-page updates and RAII removal don't need
|
||||
/// to `.await` (removal happens in `Drop`).
|
||||
#[derive(Clone, Debug)]
|
||||
struct Scalar {
|
||||
phase: Phase,
|
||||
worker_count: usize,
|
||||
last_pass: LastPass,
|
||||
current_cover: Option<CoverTarget>,
|
||||
}
|
||||
|
||||
/// Cloneable handle the daemon tasks use to publish status. Cheap to clone
|
||||
/// (`Arc`). All writers funnel through the helper methods so locking stays
|
||||
/// localised. Every mutation bumps a `watch` version so SSE subscribers
|
||||
/// get pushed an update instead of polling.
|
||||
#[derive(Clone)]
|
||||
pub struct StatusHandle {
|
||||
scalar: Arc<RwLock<Scalar>>,
|
||||
/// Currently-downloading chapters keyed by `chapter_id`. A sync mutex so
|
||||
/// the RAII [`ChapterGuard`]'s `Drop` can remove without `.await`.
|
||||
active: Arc<Mutex<HashMap<Uuid, ActiveChapter>>>,
|
||||
/// Monotonic version bumped on every change. SSE handlers `subscribe()`
|
||||
/// and `await .changed()` for instant pushes; `watch` has no
|
||||
/// lost-wakeup so a change between snapshots is never missed.
|
||||
version: Arc<watch::Sender<u64>>,
|
||||
}
|
||||
|
||||
/// Lock the active map, recovering from a poisoned mutex (we never hold the
|
||||
/// lock across a panic-prone section, so the data is still consistent).
|
||||
fn lock_active(
|
||||
m: &Mutex<HashMap<Uuid, ActiveChapter>>,
|
||||
) -> std::sync::MutexGuard<'_, HashMap<Uuid, ActiveChapter>> {
|
||||
m.lock().unwrap_or_else(|e| e.into_inner())
|
||||
}
|
||||
|
||||
impl StatusHandle {
|
||||
pub fn new(num_workers: usize) -> Self {
|
||||
let (version, _rx) = watch::channel(0u64);
|
||||
Self {
|
||||
scalar: Arc::new(RwLock::new(Scalar {
|
||||
phase: Phase::Idle { next_fire: None },
|
||||
worker_count: num_workers.max(1),
|
||||
last_pass: LastPass::default(),
|
||||
current_cover: None,
|
||||
})),
|
||||
active: Arc::new(Mutex::new(HashMap::new())),
|
||||
version: Arc::new(version),
|
||||
}
|
||||
}
|
||||
|
||||
fn bump(&self) {
|
||||
self.version.send_modify(|v| *v = v.wrapping_add(1));
|
||||
}
|
||||
|
||||
/// A receiver whose `.changed()` resolves on the next status change.
|
||||
pub fn subscribe(&self) -> watch::Receiver<u64> {
|
||||
self.version.subscribe()
|
||||
}
|
||||
|
||||
/// Signal a change without mutating in-memory state — used when an
|
||||
/// *external* signal the live snapshot reflects (browser phase,
|
||||
/// session-expired flag, queue counts) has changed, so subscribers
|
||||
/// recompose promptly.
|
||||
pub fn poke(&self) {
|
||||
self.bump();
|
||||
}
|
||||
|
||||
pub async fn set_phase(&self, phase: Phase) {
|
||||
self.scalar.write().await.phase = phase;
|
||||
self.bump();
|
||||
}
|
||||
|
||||
/// Set (or clear) the cover being downloaded right now.
|
||||
pub async fn set_current_cover(&self, cover: Option<CoverTarget>) {
|
||||
self.scalar.write().await.current_cover = cover;
|
||||
self.bump();
|
||||
}
|
||||
|
||||
/// Register a chapter as crawling now; returns a guard that removes it
|
||||
/// when dropped (on completion, panic-unwind, or timeout-drop).
|
||||
pub fn begin_chapter(&self, chapter: ActiveChapter) -> ChapterGuard {
|
||||
let id = chapter.chapter_id;
|
||||
lock_active(&self.active).insert(id, chapter);
|
||||
self.bump();
|
||||
ChapterGuard {
|
||||
active: Arc::clone(&self.active),
|
||||
version: Arc::clone(&self.version),
|
||||
chapter_id: id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the live page count of an in-flight chapter. Sync (no
|
||||
/// `.await`) so it's cheap to call once per stored page.
|
||||
pub fn set_chapter_pages(&self, chapter_id: Uuid, done: usize, total: Option<usize>) {
|
||||
{
|
||||
let mut map = lock_active(&self.active);
|
||||
if let Some(c) = map.get_mut(&chapter_id) {
|
||||
c.pages_done = done;
|
||||
c.pages_total = total;
|
||||
}
|
||||
}
|
||||
self.bump();
|
||||
}
|
||||
|
||||
/// Record a finished metadata pass. Stamps `at` with `now`.
|
||||
pub async fn record_pass(&self, stats: &MetadataStats, at: DateTime<Utc>) {
|
||||
self.scalar.write().await.last_pass = LastPass {
|
||||
at: Some(at),
|
||||
discovered: stats.discovered,
|
||||
upserted: stats.upserted,
|
||||
covers_fetched: stats.covers_fetched,
|
||||
mangas_failed: stats.mangas_failed,
|
||||
};
|
||||
self.bump();
|
||||
}
|
||||
|
||||
/// Seed the last-pass summary from a persisted `crawler_state` value on
|
||||
/// startup so the dashboard isn't blank until the first tick.
|
||||
pub async fn set_last_pass(&self, last: LastPass) {
|
||||
self.scalar.write().await.last_pass = last;
|
||||
self.bump();
|
||||
}
|
||||
|
||||
pub async fn snapshot(&self) -> CrawlerStatus {
|
||||
let scalar = self.scalar.read().await.clone();
|
||||
let mut active_chapters: Vec<ActiveChapter> =
|
||||
lock_active(&self.active).values().cloned().collect();
|
||||
// Stable, readable order: by chapter number then id.
|
||||
active_chapters.sort_by(|a, b| {
|
||||
a.chapter_number
|
||||
.cmp(&b.chapter_number)
|
||||
.then(a.chapter_id.cmp(&b.chapter_id))
|
||||
});
|
||||
CrawlerStatus {
|
||||
phase: scalar.phase,
|
||||
worker_count: scalar.worker_count,
|
||||
active_chapters,
|
||||
last_pass: scalar.last_pass,
|
||||
current_cover: scalar.current_cover,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII handle removing an [`ActiveChapter`] from the live status when the
|
||||
/// chapter dispatch finishes, panics, or is dropped on timeout.
|
||||
pub struct ChapterGuard {
|
||||
active: Arc<Mutex<HashMap<Uuid, ActiveChapter>>>,
|
||||
version: Arc<watch::Sender<u64>>,
|
||||
chapter_id: Uuid,
|
||||
}
|
||||
|
||||
impl Drop for ChapterGuard {
|
||||
fn drop(&mut self) {
|
||||
lock_active(&self.active).remove(&self.chapter_id);
|
||||
self.version.send_modify(|v| *v = v.wrapping_add(1));
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn sample_chapter(n: i32) -> ActiveChapter {
|
||||
ActiveChapter {
|
||||
manga_id: Uuid::new_v4(),
|
||||
manga_title: "M".into(),
|
||||
chapter_id: Uuid::new_v4(),
|
||||
chapter_number: n,
|
||||
pages_done: 0,
|
||||
pages_total: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn begin_chapter_shows_in_snapshot_and_guard_removes_on_drop() {
|
||||
let h = StatusHandle::new(2);
|
||||
let chap = sample_chapter(7);
|
||||
let cid = chap.chapter_id;
|
||||
{
|
||||
let _guard = h.begin_chapter(chap);
|
||||
let snap = h.snapshot().await;
|
||||
assert_eq!(snap.active_chapters.len(), 1);
|
||||
assert_eq!(snap.active_chapters[0].chapter_id, cid);
|
||||
assert_eq!(snap.worker_count, 2);
|
||||
}
|
||||
// Guard dropped → entry removed.
|
||||
let snap = h.snapshot().await;
|
||||
assert!(snap.active_chapters.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_chapter_pages_updates_live_count() {
|
||||
let h = StatusHandle::new(1);
|
||||
let chap = sample_chapter(1);
|
||||
let cid = chap.chapter_id;
|
||||
let _guard = h.begin_chapter(chap);
|
||||
h.set_chapter_pages(cid, 3, Some(20));
|
||||
let snap = h.snapshot().await;
|
||||
assert_eq!(snap.active_chapters[0].pages_done, 3);
|
||||
assert_eq!(snap.active_chapters[0].pages_total, Some(20));
|
||||
// Updating an unknown chapter is a no-op, not a panic.
|
||||
h.set_chapter_pages(Uuid::new_v4(), 9, Some(9));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn snapshot_sorts_active_chapters_by_number() {
|
||||
let h = StatusHandle::new(2);
|
||||
let _g1 = h.begin_chapter(sample_chapter(5));
|
||||
let _g2 = h.begin_chapter(sample_chapter(2));
|
||||
let snap = h.snapshot().await;
|
||||
assert_eq!(snap.active_chapters[0].chapter_number, 2);
|
||||
assert_eq!(snap.active_chapters[1].chapter_number, 5);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_current_cover_round_trips() {
|
||||
let h = StatusHandle::new(1);
|
||||
let mid = Uuid::new_v4();
|
||||
h.set_current_cover(Some(CoverTarget {
|
||||
manga_id: mid,
|
||||
manga_title: "One Piece".into(),
|
||||
}))
|
||||
.await;
|
||||
assert_eq!(
|
||||
h.snapshot().await.current_cover.map(|c| c.manga_id),
|
||||
Some(mid)
|
||||
);
|
||||
h.set_current_cover(None).await;
|
||||
assert!(h.snapshot().await.current_cover.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_pass_captures_stats_and_timestamp() {
|
||||
let h = StatusHandle::new(1);
|
||||
let stats = MetadataStats {
|
||||
discovered: 5,
|
||||
upserted: 3,
|
||||
covers_fetched: 2,
|
||||
mangas_failed: 1,
|
||||
};
|
||||
let at = Utc::now();
|
||||
h.record_pass(&stats, at).await;
|
||||
let snap = h.snapshot().await;
|
||||
assert_eq!(snap.last_pass.discovered, 5);
|
||||
assert_eq!(snap.last_pass.upserted, 3);
|
||||
assert_eq!(snap.last_pass.at, Some(at));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_resolves_on_mutation_poke_and_chapter_change() {
|
||||
let h = StatusHandle::new(1);
|
||||
let mut rx = h.subscribe();
|
||||
h.set_phase(Phase::WalkingList).await;
|
||||
rx.changed().await.unwrap();
|
||||
h.poke();
|
||||
rx.changed().await.unwrap();
|
||||
// begin_chapter + guard drop each bump the version.
|
||||
let g = h.begin_chapter(sample_chapter(1));
|
||||
rx.changed().await.unwrap();
|
||||
drop(g);
|
||||
rx.changed().await.unwrap();
|
||||
}
|
||||
}
|
||||
@@ -138,14 +138,18 @@ pub async fn page_count(pool: &PgPool, id: Uuid) -> sqlx::Result<Option<i32>> {
|
||||
/// filter — this resolver stays in lockstep so a chapter that was
|
||||
/// dropped between enqueue and lease isn't dispatched against a stale
|
||||
/// URL.
|
||||
/// Returns `(manga_id, source_url, manga_title, chapter_number)`. The
|
||||
/// title + number feed the live "currently crawling" status; the rest is
|
||||
/// what the dispatcher needs to do the work.
|
||||
pub async fn dispatch_target(
|
||||
pool: &PgPool,
|
||||
chapter_id: Uuid,
|
||||
) -> sqlx::Result<Option<(Uuid, String)>> {
|
||||
) -> sqlx::Result<Option<(Uuid, String, String, i32)>> {
|
||||
sqlx::query_as(
|
||||
"SELECT c.manga_id, cs.source_url \
|
||||
"SELECT c.manga_id, cs.source_url, m.title, c.number \
|
||||
FROM chapters c \
|
||||
JOIN chapter_sources cs ON cs.chapter_id = c.id \
|
||||
JOIN mangas m ON m.id = c.manga_id \
|
||||
WHERE c.id = $1 \
|
||||
AND cs.dropped_at IS NULL \
|
||||
ORDER BY cs.last_seen_at DESC \
|
||||
|
||||
@@ -17,8 +17,9 @@
|
||||
//! Each public function is a transaction boundary so a partial failure
|
||||
//! mid-call leaves the DB in its pre-call state.
|
||||
|
||||
use chrono::Utc;
|
||||
use sqlx::{PgPool, Postgres, Transaction};
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::Serialize;
|
||||
use sqlx::{FromRow, PgPool, Postgres, Transaction};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::crawler::source::{SourceChapterRef, SourceManga};
|
||||
@@ -618,3 +619,327 @@ pub async fn last_run_completed_cleanly(
|
||||
.unwrap_or(true))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Dead-letter jobs: admin observability + requeue.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// A `dead` crawler job joined to its chapter/manga context for the admin
|
||||
/// dead-letter view. Chapter columns are `Option` because the join is
|
||||
/// best-effort (the chapter may have been removed since the job died, or
|
||||
/// the job may be a non-chapter kind).
|
||||
#[derive(Debug, Clone, Serialize, FromRow)]
|
||||
pub struct DeadJob {
|
||||
pub id: Uuid,
|
||||
pub kind: String,
|
||||
pub chapter_id: Option<Uuid>,
|
||||
pub manga_id: Option<Uuid>,
|
||||
pub manga_title: Option<String>,
|
||||
pub chapter_number: Option<i32>,
|
||||
pub attempts: i32,
|
||||
pub max_attempts: i32,
|
||||
pub last_error: Option<String>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
/// Paginated list of `dead` jobs, newest-failed first, joined to chapter +
|
||||
/// manga context. `search` filters on manga title (case-insensitive
|
||||
/// substring). Returns the page slice plus the unfiltered-by-page total.
|
||||
pub async fn list_dead_jobs(
|
||||
pool: &PgPool,
|
||||
search: Option<&str>,
|
||||
limit: i64,
|
||||
offset: i64,
|
||||
) -> sqlx::Result<(Vec<DeadJob>, i64)> {
|
||||
let search_pat = search
|
||||
.map(|s| format!("%{}%", s.trim()))
|
||||
.filter(|p| p.len() > 2);
|
||||
|
||||
let items: Vec<DeadJob> = sqlx::query_as(
|
||||
r#"
|
||||
SELECT
|
||||
cj.id,
|
||||
cj.payload->>'kind' AS kind,
|
||||
(cj.payload->>'chapter_id')::uuid AS chapter_id,
|
||||
c.manga_id AS manga_id,
|
||||
m.title AS manga_title,
|
||||
c.number AS chapter_number,
|
||||
cj.attempts,
|
||||
cj.max_attempts,
|
||||
cj.last_error,
|
||||
cj.updated_at
|
||||
FROM crawler_jobs cj
|
||||
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
|
||||
LEFT JOIN mangas m ON m.id = c.manga_id
|
||||
WHERE cj.state = 'dead'
|
||||
AND ($1::text IS NULL OR m.title ILIKE $1)
|
||||
ORDER BY cj.updated_at DESC
|
||||
LIMIT $2 OFFSET $3
|
||||
"#,
|
||||
)
|
||||
.bind(&search_pat)
|
||||
.bind(limit)
|
||||
.bind(offset)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
let total: i64 = sqlx::query_scalar(
|
||||
r#"
|
||||
SELECT COUNT(*)
|
||||
FROM crawler_jobs cj
|
||||
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
|
||||
LEFT JOIN mangas m ON m.id = c.manga_id
|
||||
WHERE cj.state = 'dead'
|
||||
AND ($1::text IS NULL OR m.title ILIKE $1)
|
||||
"#,
|
||||
)
|
||||
.bind(&search_pat)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok((items, total))
|
||||
}
|
||||
|
||||
/// An in-flight chapter-content job (`pending` or `running`) joined to its
|
||||
/// chapter + manga, for the "queued chapters" admin view.
|
||||
#[derive(Debug, Clone, Serialize, FromRow)]
|
||||
pub struct ActiveJob {
|
||||
pub id: Uuid,
|
||||
pub chapter_id: Option<Uuid>,
|
||||
pub manga_id: Option<Uuid>,
|
||||
pub manga_title: Option<String>,
|
||||
pub chapter_number: Option<i32>,
|
||||
/// `"pending"` or `"running"`.
|
||||
pub state: String,
|
||||
pub attempts: i32,
|
||||
pub max_attempts: i32,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
/// Paginated list of `pending`/`running` chapter-content jobs (which
|
||||
/// chapters of which mangas are queued or being crawled). Running first,
|
||||
/// then by scheduled order. `search` filters on manga title.
|
||||
pub async fn list_active_jobs(
|
||||
pool: &PgPool,
|
||||
search: Option<&str>,
|
||||
limit: i64,
|
||||
offset: i64,
|
||||
) -> sqlx::Result<(Vec<ActiveJob>, i64)> {
|
||||
let search_pat = search
|
||||
.map(|s| format!("%{}%", s.trim()))
|
||||
.filter(|p| p.len() > 2);
|
||||
|
||||
let items: Vec<ActiveJob> = sqlx::query_as(
|
||||
r#"
|
||||
SELECT
|
||||
cj.id,
|
||||
(cj.payload->>'chapter_id')::uuid AS chapter_id,
|
||||
c.manga_id AS manga_id,
|
||||
m.title AS manga_title,
|
||||
c.number AS chapter_number,
|
||||
cj.state,
|
||||
cj.attempts,
|
||||
cj.max_attempts,
|
||||
cj.updated_at
|
||||
FROM crawler_jobs cj
|
||||
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
|
||||
LEFT JOIN mangas m ON m.id = c.manga_id
|
||||
WHERE cj.state IN ('pending','running')
|
||||
AND cj.payload->>'kind' = 'sync_chapter_content'
|
||||
AND ($1::text IS NULL OR m.title ILIKE $1)
|
||||
ORDER BY (cj.state = 'running') DESC, cj.scheduled_at, cj.created_at
|
||||
LIMIT $2 OFFSET $3
|
||||
"#,
|
||||
)
|
||||
.bind(&search_pat)
|
||||
.bind(limit)
|
||||
.bind(offset)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
let total: i64 = sqlx::query_scalar(
|
||||
r#"
|
||||
SELECT COUNT(*)
|
||||
FROM crawler_jobs cj
|
||||
LEFT JOIN chapters c ON c.id = (cj.payload->>'chapter_id')::uuid
|
||||
LEFT JOIN mangas m ON m.id = c.manga_id
|
||||
WHERE cj.state IN ('pending','running')
|
||||
AND cj.payload->>'kind' = 'sync_chapter_content'
|
||||
AND ($1::text IS NULL OR m.title ILIKE $1)
|
||||
"#,
|
||||
)
|
||||
.bind(&search_pat)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok((items, total))
|
||||
}
|
||||
|
||||
/// A manga whose cover is still missing (queued for cover fetch).
|
||||
#[derive(Debug, Clone, Serialize, FromRow)]
|
||||
pub struct MissingCoverRow {
|
||||
pub manga_id: Uuid,
|
||||
pub manga_title: String,
|
||||
}
|
||||
|
||||
/// Count mangas with no cover yet but a live source row — the cover
|
||||
/// backlog the metadata pass + backfill drain.
|
||||
pub async fn count_missing_covers(pool: &PgPool) -> sqlx::Result<i64> {
|
||||
sqlx::query_scalar(
|
||||
r#"
|
||||
SELECT COUNT(*) FROM mangas m
|
||||
WHERE m.cover_image_path IS NULL
|
||||
AND EXISTS (
|
||||
SELECT 1 FROM manga_sources ms
|
||||
WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Paginated list of mangas queued for a cover fetch (no cover yet + a live
|
||||
/// source), with titles. `search` filters on title. Freshest source first.
|
||||
pub async fn list_missing_cover_mangas(
|
||||
pool: &PgPool,
|
||||
search: Option<&str>,
|
||||
limit: i64,
|
||||
offset: i64,
|
||||
) -> sqlx::Result<(Vec<MissingCoverRow>, i64)> {
|
||||
let search_pat = search
|
||||
.map(|s| format!("%{}%", s.trim()))
|
||||
.filter(|p| p.len() > 2);
|
||||
|
||||
let items: Vec<MissingCoverRow> = sqlx::query_as(
|
||||
r#"
|
||||
SELECT m.id AS manga_id, m.title AS manga_title
|
||||
FROM mangas m
|
||||
WHERE m.cover_image_path IS NULL
|
||||
AND EXISTS (
|
||||
SELECT 1 FROM manga_sources ms
|
||||
WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL
|
||||
)
|
||||
AND ($1::text IS NULL OR m.title ILIKE $1)
|
||||
ORDER BY m.updated_at DESC
|
||||
LIMIT $2 OFFSET $3
|
||||
"#,
|
||||
)
|
||||
.bind(&search_pat)
|
||||
.bind(limit)
|
||||
.bind(offset)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
let total: i64 = sqlx::query_scalar(
|
||||
r#"
|
||||
SELECT COUNT(*) FROM mangas m
|
||||
WHERE m.cover_image_path IS NULL
|
||||
AND EXISTS (
|
||||
SELECT 1 FROM manga_sources ms
|
||||
WHERE ms.manga_id = m.id AND ms.dropped_at IS NULL
|
||||
)
|
||||
AND ($1::text IS NULL OR m.title ILIKE $1)
|
||||
"#,
|
||||
)
|
||||
.bind(&search_pat)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok((items, total))
|
||||
}
|
||||
|
||||
/// Scope of a dead-job requeue.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RequeueScope {
|
||||
/// Every dead job.
|
||||
All,
|
||||
/// Dead jobs whose chapter belongs to this manga.
|
||||
Manga(Uuid),
|
||||
/// Dead jobs for a single chapter.
|
||||
Chapter(Uuid),
|
||||
/// A single dead job by its id.
|
||||
Job(Uuid),
|
||||
}
|
||||
|
||||
/// Requeue dead jobs back to `pending` with a fresh attempt budget. This is
|
||||
/// an explicit operator override, so it bypasses the dead-letter quarantine
|
||||
/// the enqueue helpers honour (we act directly on the row). Returns the
|
||||
/// number of rows requeued.
|
||||
///
|
||||
/// Two invariants protect the partial unique dedup index
|
||||
/// `crawler_jobs_chapter_content_dedup_idx` (one `pending|running`
|
||||
/// sync_chapter_content job per chapter):
|
||||
/// 1. A chapter that already has a live (`pending|running`) job is
|
||||
/// skipped entirely (`NO_LIVE_DUP`).
|
||||
/// 2. When a chapter has *multiple* dead jobs, only the newest is
|
||||
/// revived (`DISTINCT ON` the chapter key) — without this, flipping
|
||||
/// two dead rows for the same chapter to `pending` in one statement
|
||||
/// would violate the index and abort the whole requeue. Non-chapter
|
||||
/// jobs fall back to their row id so each stays distinct.
|
||||
pub async fn requeue_dead_jobs(pool: &PgPool, scope: RequeueScope) -> sqlx::Result<u64> {
|
||||
// Scope predicate spliced into the `pick` CTE. Only compile-time
|
||||
// literals are interpolated; all values are bound below.
|
||||
let scope_pred: &str = match scope {
|
||||
RequeueScope::All => "",
|
||||
RequeueScope::Manga(_) => {
|
||||
"AND (cj.payload->>'chapter_id')::uuid IN \
|
||||
(SELECT id FROM chapters WHERE manga_id = $1)"
|
||||
}
|
||||
RequeueScope::Chapter(_) => "AND (cj.payload->>'chapter_id')::uuid = $1",
|
||||
RequeueScope::Job(_) => "AND cj.id = $1",
|
||||
};
|
||||
|
||||
let sql = format!(
|
||||
r#"
|
||||
WITH pick AS (
|
||||
SELECT DISTINCT ON (COALESCE(cj.payload->>'chapter_id', cj.id::text)) cj.id
|
||||
FROM crawler_jobs cj
|
||||
WHERE cj.state = 'dead'
|
||||
{scope_pred}
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM crawler_jobs live
|
||||
WHERE live.payload->>'kind' = 'sync_chapter_content'
|
||||
AND live.payload->>'chapter_id' = cj.payload->>'chapter_id'
|
||||
AND live.state IN ('pending','running')
|
||||
)
|
||||
ORDER BY COALESCE(cj.payload->>'chapter_id', cj.id::text), cj.updated_at DESC
|
||||
)
|
||||
UPDATE crawler_jobs
|
||||
SET state = 'pending', attempts = 0, leased_until = NULL,
|
||||
last_error = NULL, scheduled_at = now(), updated_at = now()
|
||||
FROM pick
|
||||
WHERE crawler_jobs.id = pick.id
|
||||
"#
|
||||
);
|
||||
|
||||
let mut q = sqlx::query(&sql);
|
||||
match scope {
|
||||
RequeueScope::All => {}
|
||||
RequeueScope::Manga(id) | RequeueScope::Chapter(id) | RequeueScope::Job(id) => {
|
||||
q = q.bind(id);
|
||||
}
|
||||
}
|
||||
Ok(q.execute(pool).await?.rows_affected())
|
||||
}
|
||||
|
||||
/// Count crawler jobs grouped by state — drives the dashboard queue
|
||||
/// gauges. Returns `(pending, running, dead)`.
|
||||
pub async fn job_state_counts(pool: &PgPool) -> sqlx::Result<(i64, i64, i64)> {
|
||||
let rows: Vec<(String, i64)> =
|
||||
sqlx::query_as("SELECT state, COUNT(*) FROM crawler_jobs GROUP BY state")
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
let mut pending = 0;
|
||||
let mut running = 0;
|
||||
let mut dead = 0;
|
||||
for (state, n) in rows {
|
||||
match state.as_str() {
|
||||
"pending" => pending = n,
|
||||
"running" => running = n,
|
||||
"dead" => dead = n,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Ok((pending, running, dead))
|
||||
}
|
||||
|
||||
|
||||
344
backend/tests/api_admin_crawler.rs
Normal file
344
backend/tests/api_admin_crawler.rs
Normal file
@@ -0,0 +1,344 @@
|
||||
//! Integration tests for the admin crawler observability/control API.
|
||||
//!
|
||||
//! The default test harness wires `AppState.crawler = None` (no daemon),
|
||||
//! so the *control* endpoints return 503 and the *read* endpoints that
|
||||
//! work off the DB (status shell, dead-jobs list/requeue) still function.
|
||||
//! This is exactly the production "daemon disabled" posture.
|
||||
|
||||
mod common;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::http::StatusCode;
|
||||
use axum::Router;
|
||||
use http_body_util::BodyExt;
|
||||
use serde_json::json;
|
||||
use sqlx::PgPool;
|
||||
use tower::ServiceExt;
|
||||
use uuid::Uuid;
|
||||
|
||||
use common::{body_json, get, get_with_cookie, post_json_with_cookie, register_user, harness};
|
||||
|
||||
async fn seed_admin(pool: &PgPool, app: &Router) -> String {
|
||||
let (username, cookie) = register_user(app).await;
|
||||
let u = mangalord::repo::user::find_by_username(pool, &username)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
mangalord::repo::user::set_is_admin_unchecked(pool, u.id, true)
|
||||
.await
|
||||
.unwrap();
|
||||
cookie
|
||||
}
|
||||
|
||||
async fn seed_dead_job(pool: &PgPool, title: &str) -> Uuid {
|
||||
let manga_id = Uuid::new_v4();
|
||||
let chapter_id = Uuid::new_v4();
|
||||
sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, $2)")
|
||||
.bind(manga_id)
|
||||
.bind(title)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)")
|
||||
.bind(chapter_id)
|
||||
.bind(manga_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
let job_id = Uuid::new_v4();
|
||||
sqlx::query(
|
||||
"INSERT INTO crawler_jobs (id, payload, state, attempts, last_error) \
|
||||
VALUES ($1, $2, 'dead', 5, 'boom')",
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(json!({
|
||||
"kind": "sync_chapter_content",
|
||||
"source_id": "target",
|
||||
"chapter_id": chapter_id,
|
||||
"source_chapter_key": "k",
|
||||
}))
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
job_id
|
||||
}
|
||||
|
||||
/// Seed a chapter-content job in a given state ('pending'/'running').
|
||||
async fn seed_job(pool: &PgPool, title: &str, state: &str) {
|
||||
let manga_id = Uuid::new_v4();
|
||||
let chapter_id = Uuid::new_v4();
|
||||
sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, $2)")
|
||||
.bind(manga_id)
|
||||
.bind(title)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, 1)")
|
||||
.bind(chapter_id)
|
||||
.bind(manga_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query("INSERT INTO crawler_jobs (id, payload, state) VALUES ($1, $2, $3)")
|
||||
.bind(Uuid::new_v4())
|
||||
.bind(json!({
|
||||
"kind": "sync_chapter_content",
|
||||
"source_id": "target",
|
||||
"chapter_id": chapter_id,
|
||||
"source_chapter_key": "k",
|
||||
}))
|
||||
.bind(state)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/// Seed a manga with no cover + a live source row (queued for cover fetch).
|
||||
async fn seed_missing_cover(pool: &PgPool, title: &str) {
|
||||
let manga_id = Uuid::new_v4();
|
||||
sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, $2, NULL)")
|
||||
.bind(manga_id)
|
||||
.bind(title)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query("INSERT INTO sources (id, name, base_url) VALUES ('target','T','http://x') ON CONFLICT DO NOTHING")
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"INSERT INTO manga_sources (source_id, source_manga_key, manga_id, source_url) \
|
||||
VALUES ('target', $1, $2, 'http://x/m')",
|
||||
)
|
||||
.bind(format!("k-{manga_id}"))
|
||||
.bind(manga_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn active_jobs_and_covers_lists_over_http(pool: PgPool) {
|
||||
seed_job(&pool, "Naruto", "pending").await;
|
||||
seed_job(&pool, "Bleach", "running").await;
|
||||
seed_missing_cover(&pool, "One Piece").await;
|
||||
let h = harness(pool.clone());
|
||||
let cookie = seed_admin(&pool, &h.app).await;
|
||||
|
||||
// Queued/active chapters.
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get_with_cookie("/api/v1/admin/crawler/active-jobs", &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = body_json(resp).await;
|
||||
assert_eq!(body["page"]["total"], 2);
|
||||
|
||||
// Queued covers.
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get_with_cookie("/api/v1/admin/crawler/covers", &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = body_json(resp).await;
|
||||
assert_eq!(body["page"]["total"], 1);
|
||||
assert_eq!(body["items"][0]["manga_title"], "One Piece");
|
||||
|
||||
// Both are admin-gated.
|
||||
let (_u, plain) = register_user(&h.app).await;
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get_with_cookie("/api/v1/admin/crawler/active-jobs", &plain))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn get_status_requires_admin(pool: PgPool) {
|
||||
let h = harness(pool);
|
||||
// Unauthenticated → 401.
|
||||
let resp = h.app.clone().oneshot(get("/api/v1/admin/crawler")).await.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
|
||||
// Authenticated non-admin → 403.
|
||||
let (_u, cookie) = register_user(&h.app).await;
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get_with_cookie("/api/v1/admin/crawler", &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn get_status_reports_disabled_daemon_with_queue_counts(pool: PgPool) {
|
||||
seed_dead_job(&pool, "Naruto").await;
|
||||
let h = harness(pool.clone());
|
||||
let cookie = seed_admin(&pool, &h.app).await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get_with_cookie("/api/v1/admin/crawler", &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = body_json(resp).await;
|
||||
assert_eq!(body["daemon"], "disabled");
|
||||
assert_eq!(body["queue"]["dead"], 1);
|
||||
assert_eq!(body["browser"], "down");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn control_endpoints_return_503_when_daemon_disabled(pool: PgPool) {
|
||||
let h = harness(pool.clone());
|
||||
let cookie = seed_admin(&pool, &h.app).await;
|
||||
for uri in [
|
||||
"/api/v1/admin/crawler/run",
|
||||
"/api/v1/admin/crawler/browser/restart",
|
||||
"/api/v1/admin/crawler/session/clear-expired",
|
||||
] {
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(post_json_with_cookie(uri, json!({}), &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
resp.status(),
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
"{uri} should be 503 when daemon disabled"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn status_stream_requires_admin(pool: PgPool) {
|
||||
let h = harness(pool);
|
||||
// Unauthenticated → 401.
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get("/api/v1/admin/crawler/stream"))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
// Non-admin → 403.
|
||||
let (_u, cookie) = register_user(&h.app).await;
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get_with_cookie("/api/v1/admin/crawler/stream", &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn status_stream_emits_initial_event(pool: PgPool) {
|
||||
let h = harness(pool.clone());
|
||||
let cookie = seed_admin(&pool, &h.app).await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get_with_cookie("/api/v1/admin/crawler/stream", &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let ct = resp
|
||||
.headers()
|
||||
.get(axum::http::header::CONTENT_TYPE)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
assert!(ct.starts_with("text/event-stream"), "content-type was {ct:?}");
|
||||
|
||||
// Accumulate frames (the immediate snapshot may arrive split across
|
||||
// frames) until the status payload appears, with an overall timeout so
|
||||
// the never-ending stream can't hang the test.
|
||||
let mut body = resp.into_body();
|
||||
let mut acc = String::new();
|
||||
let deadline = tokio::time::timeout(Duration::from_secs(5), async {
|
||||
loop {
|
||||
let Some(frame) = body.frame().await else { break };
|
||||
if let Ok(data) = frame.expect("frame ok").into_data() {
|
||||
acc.push_str(&String::from_utf8_lossy(&data));
|
||||
if acc.contains("\"daemon\"") {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
assert!(deadline.is_ok(), "did not receive status within 5s; got: {acc:?}");
|
||||
assert!(acc.contains("\"daemon\""), "missing status payload: {acc}");
|
||||
assert!(acc.contains("status"), "missing SSE event name: {acc}");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn mutating_endpoints_reject_non_admin(pool: PgPool) {
|
||||
let h = harness(pool);
|
||||
// A logged-in non-admin must be forbidden from a mutating endpoint.
|
||||
let (_u, cookie) = register_user(&h.app).await;
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(post_json_with_cookie(
|
||||
"/api/v1/admin/crawler/dead-jobs/requeue",
|
||||
json!({ "scope": "all" }),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn dead_jobs_list_and_requeue_over_http(pool: PgPool) {
|
||||
let job_id = seed_dead_job(&pool, "Bleach").await;
|
||||
let h = harness(pool.clone());
|
||||
let cookie = seed_admin(&pool, &h.app).await;
|
||||
|
||||
// List.
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(get_with_cookie("/api/v1/admin/crawler/dead-jobs", &cookie))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = body_json(resp).await;
|
||||
assert_eq!(body["page"]["total"], 1);
|
||||
assert_eq!(body["items"][0]["manga_title"], "Bleach");
|
||||
|
||||
// Requeue the single job.
|
||||
let resp = h
|
||||
.app
|
||||
.clone()
|
||||
.oneshot(post_json_with_cookie(
|
||||
"/api/v1/admin/crawler/dead-jobs/requeue",
|
||||
json!({ "scope": "job", "job_id": job_id }),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = body_json(resp).await;
|
||||
assert_eq!(body["requeued"], 1);
|
||||
|
||||
let state: String = sqlx::query_scalar("SELECT state FROM crawler_jobs WHERE id = $1")
|
||||
.bind(job_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(state, "pending");
|
||||
}
|
||||
@@ -50,6 +50,7 @@ fn admin_test_router(pool: PgPool) -> (Router, TempDir) {
|
||||
upload: UploadConfig::default(),
|
||||
auth_limiter,
|
||||
resync: None,
|
||||
crawler: None,
|
||||
};
|
||||
let app = Router::new()
|
||||
.nest("/api/v1", api::routes())
|
||||
|
||||
@@ -78,6 +78,7 @@ fn harness_with_auth_config(
|
||||
// handlers return 503 in this config. Tests that need a stub
|
||||
// resync service swap it in via `harness_with_resync`.
|
||||
resync: None,
|
||||
crawler: None,
|
||||
};
|
||||
Harness { app: router(state), _storage_dir: storage_dir }
|
||||
}
|
||||
@@ -152,6 +153,7 @@ pub fn harness_with_resync(
|
||||
},
|
||||
auth_limiter,
|
||||
resync: Some(resync),
|
||||
crawler: None,
|
||||
};
|
||||
Harness {
|
||||
app: router(state),
|
||||
|
||||
@@ -40,6 +40,8 @@ fn make_cfg(
|
||||
tz: Tz::UTC,
|
||||
retention_days: 7,
|
||||
session_expired,
|
||||
status: mangalord::crawler::status::StatusHandle::new(workers),
|
||||
job_timeout: Duration::from_secs(60),
|
||||
extra_tasks: Vec::new(),
|
||||
}
|
||||
}
|
||||
@@ -88,6 +90,52 @@ impl ChapterDispatcher for PanickingDispatcher {
|
||||
}
|
||||
}
|
||||
|
||||
/// Never completes — used to verify the worker's outer dispatch timeout.
|
||||
struct HangingDispatcher {
|
||||
seen: AtomicUsize,
|
||||
}
|
||||
#[async_trait::async_trait]
|
||||
impl ChapterDispatcher for HangingDispatcher {
|
||||
async fn dispatch(&self, _payload: JobPayload) -> anyhow::Result<SyncOutcome> {
|
||||
self.seen.fetch_add(1, Ordering::AcqRel);
|
||||
std::future::pending::<()>().await;
|
||||
unreachable!("hanging dispatcher never resolves");
|
||||
}
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn worker_times_out_a_hung_dispatch_and_acks_failed(pool: PgPool) {
|
||||
enqueue_chapter_job(&pool).await;
|
||||
let dispatcher = Arc::new(HangingDispatcher {
|
||||
seen: AtomicUsize::new(0),
|
||||
});
|
||||
let session_expired = Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||
let cancel = CancellationToken::new();
|
||||
let mut cfg = make_cfg(None, dispatcher.clone(), session_expired, 1);
|
||||
cfg.job_timeout = Duration::from_millis(300);
|
||||
let handle = daemon::spawn(pool.clone(), cancel.clone(), cfg);
|
||||
|
||||
// The hung job should time out and return to pending with backoff
|
||||
// (attempts=1 < max=5). Poll for the recorded error.
|
||||
let mut timed_out = false;
|
||||
for _ in 0..40 {
|
||||
let n: i64 = sqlx::query_scalar(
|
||||
"SELECT COUNT(*) FROM crawler_jobs WHERE last_error = 'dispatch timed out'",
|
||||
)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
if n == 1 {
|
||||
timed_out = true;
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
handle.shutdown().await;
|
||||
assert!(timed_out, "hung dispatch must be acked failed with a timeout error");
|
||||
assert!(dispatcher.seen.load(Ordering::Acquire) >= 1);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn workers_drain_jobs_through_dispatcher(pool: PgPool) {
|
||||
enqueue_chapter_job(&pool).await;
|
||||
|
||||
304
backend/tests/crawler_dead_jobs.rs
Normal file
304
backend/tests/crawler_dead_jobs.rs
Normal file
@@ -0,0 +1,304 @@
|
||||
//! Integration tests for the dead-letter admin queries in
|
||||
//! `repo::crawler`: listing dead jobs with manga/chapter context and the
|
||||
//! scoped requeue (all / per-manga / single) used by the admin dashboard.
|
||||
|
||||
use mangalord::repo::crawler::{self, RequeueScope};
|
||||
use serde_json::json;
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Seed a manga with no cover + a live source row (so it's "queued for a
|
||||
/// cover fetch"). Returns the manga id.
|
||||
async fn seed_missing_cover(pool: &PgPool, title: &str) -> Uuid {
|
||||
let manga_id = Uuid::new_v4();
|
||||
sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, $2, NULL)")
|
||||
.bind(manga_id)
|
||||
.bind(title)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query("INSERT INTO sources (id, name, base_url) VALUES ('target', 'T', 'http://x') ON CONFLICT DO NOTHING")
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query(
|
||||
"INSERT INTO manga_sources (source_id, source_manga_key, manga_id, source_url) \
|
||||
VALUES ('target', $1, $2, 'http://x/m')",
|
||||
)
|
||||
.bind(format!("k-{manga_id}"))
|
||||
.bind(manga_id)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
manga_id
|
||||
}
|
||||
|
||||
/// Seed a manga + chapter and return their ids.
|
||||
async fn seed_chapter(pool: &PgPool, title: &str, number: i32) -> (Uuid, Uuid) {
|
||||
let manga_id = Uuid::new_v4();
|
||||
let chapter_id = Uuid::new_v4();
|
||||
sqlx::query("INSERT INTO mangas (id, title) VALUES ($1, $2)")
|
||||
.bind(manga_id)
|
||||
.bind(title)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query("INSERT INTO chapters (id, manga_id, number) VALUES ($1, $2, $3)")
|
||||
.bind(chapter_id)
|
||||
.bind(manga_id)
|
||||
.bind(number)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
(manga_id, chapter_id)
|
||||
}
|
||||
|
||||
/// Insert a crawler_jobs row in a given state for a chapter-content job.
|
||||
async fn insert_job(pool: &PgPool, chapter_id: Uuid, state: &str, attempts: i32) -> Uuid {
|
||||
let id = Uuid::new_v4();
|
||||
let payload = json!({
|
||||
"kind": "sync_chapter_content",
|
||||
"source_id": "target",
|
||||
"chapter_id": chapter_id,
|
||||
"source_chapter_key": "k",
|
||||
});
|
||||
sqlx::query(
|
||||
"INSERT INTO crawler_jobs (id, payload, state, attempts, last_error) \
|
||||
VALUES ($1, $2, $3, $4, 'boom')",
|
||||
)
|
||||
.bind(id)
|
||||
.bind(payload)
|
||||
.bind(state)
|
||||
.bind(attempts)
|
||||
.execute(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
id
|
||||
}
|
||||
|
||||
async fn state_of(pool: &PgPool, id: Uuid) -> String {
|
||||
sqlx::query_scalar::<_, String>("SELECT state FROM crawler_jobs WHERE id = $1")
|
||||
.bind(id)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn list_dead_jobs_returns_context_and_total(pool: PgPool) {
|
||||
let (_m, c1) = seed_chapter(&pool, "Naruto", 700).await;
|
||||
insert_job(&pool, c1, "dead", 5).await;
|
||||
// A non-dead job must not appear.
|
||||
let (_m2, c2) = seed_chapter(&pool, "Bleach", 1).await;
|
||||
insert_job(&pool, c2, "pending", 0).await;
|
||||
|
||||
let (items, total) = crawler::list_dead_jobs(&pool, None, 50, 0).await.unwrap();
|
||||
assert_eq!(total, 1);
|
||||
assert_eq!(items.len(), 1);
|
||||
let row = &items[0];
|
||||
assert_eq!(row.manga_title.as_deref(), Some("Naruto"));
|
||||
assert_eq!(row.chapter_number, Some(700));
|
||||
assert_eq!(row.attempts, 5);
|
||||
assert_eq!(row.last_error.as_deref(), Some("boom"));
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn list_dead_jobs_filters_by_title_search(pool: PgPool) {
|
||||
let (_m, c1) = seed_chapter(&pool, "Naruto", 700).await;
|
||||
insert_job(&pool, c1, "dead", 5).await;
|
||||
let (_m2, c2) = seed_chapter(&pool, "One Piece", 1).await;
|
||||
insert_job(&pool, c2, "dead", 5).await;
|
||||
|
||||
let (items, total) = crawler::list_dead_jobs(&pool, Some("piece"), 50, 0)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(total, 1);
|
||||
assert_eq!(items[0].manga_title.as_deref(), Some("One Piece"));
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn requeue_all_resets_dead_jobs_to_pending(pool: PgPool) {
|
||||
let (_m, c1) = seed_chapter(&pool, "A", 1).await;
|
||||
let (_m2, c2) = seed_chapter(&pool, "B", 1).await;
|
||||
let j1 = insert_job(&pool, c1, "dead", 5).await;
|
||||
let j2 = insert_job(&pool, c2, "dead", 5).await;
|
||||
|
||||
let n = crawler::requeue_dead_jobs(&pool, RequeueScope::All)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(n, 2);
|
||||
assert_eq!(state_of(&pool, j1).await, "pending");
|
||||
assert_eq!(state_of(&pool, j2).await, "pending");
|
||||
let attempts: i32 = sqlx::query_scalar("SELECT attempts FROM crawler_jobs WHERE id = $1")
|
||||
.bind(j1)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(attempts, 0, "attempts reset on requeue");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn requeue_by_manga_scopes_to_that_manga(pool: PgPool) {
|
||||
let (m1, c1) = seed_chapter(&pool, "A", 1).await;
|
||||
let (_m2, c2) = seed_chapter(&pool, "B", 1).await;
|
||||
let j1 = insert_job(&pool, c1, "dead", 5).await;
|
||||
let j2 = insert_job(&pool, c2, "dead", 5).await;
|
||||
|
||||
let n = crawler::requeue_dead_jobs(&pool, RequeueScope::Manga(m1))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(n, 1);
|
||||
assert_eq!(state_of(&pool, j1).await, "pending");
|
||||
assert_eq!(state_of(&pool, j2).await, "dead", "other manga untouched");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn requeue_by_chapter_scopes_to_that_chapter(pool: PgPool) {
|
||||
let (_m, c1) = seed_chapter(&pool, "A", 1).await;
|
||||
let (_m2, c2) = seed_chapter(&pool, "A", 2).await;
|
||||
let j1 = insert_job(&pool, c1, "dead", 5).await;
|
||||
let j2 = insert_job(&pool, c2, "dead", 5).await;
|
||||
|
||||
let n = crawler::requeue_dead_jobs(&pool, RequeueScope::Chapter(c1))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(n, 1);
|
||||
assert_eq!(state_of(&pool, j1).await, "pending");
|
||||
assert_eq!(state_of(&pool, j2).await, "dead", "other chapter untouched");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn requeue_single_job(pool: PgPool) {
|
||||
let (_m, c1) = seed_chapter(&pool, "A", 1).await;
|
||||
let (_m2, c2) = seed_chapter(&pool, "B", 1).await;
|
||||
let j1 = insert_job(&pool, c1, "dead", 5).await;
|
||||
let j2 = insert_job(&pool, c2, "dead", 5).await;
|
||||
|
||||
let n = crawler::requeue_dead_jobs(&pool, RequeueScope::Job(j1))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(n, 1);
|
||||
assert_eq!(state_of(&pool, j1).await, "pending");
|
||||
assert_eq!(state_of(&pool, j2).await, "dead");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn requeue_skips_dead_when_live_job_exists_for_same_chapter(pool: PgPool) {
|
||||
let (_m, c1) = seed_chapter(&pool, "A", 1).await;
|
||||
let dead = insert_job(&pool, c1, "dead", 5).await;
|
||||
// A live pending job for the SAME chapter already exists.
|
||||
insert_job(&pool, c1, "pending", 0).await;
|
||||
|
||||
let n = crawler::requeue_dead_jobs(&pool, RequeueScope::All)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(n, 0, "must not resurrect a dead job that has a live counterpart");
|
||||
assert_eq!(state_of(&pool, dead).await, "dead");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn requeue_with_two_dead_jobs_for_one_chapter_revives_one_not_500(pool: PgPool) {
|
||||
// Regression: two dead jobs for the SAME chapter must not both flip to
|
||||
// pending in one statement — that would violate the partial unique
|
||||
// dedup index and abort the whole requeue.
|
||||
let (manga_id, c1) = seed_chapter(&pool, "A", 1).await;
|
||||
let older = insert_job(&pool, c1, "dead", 5).await;
|
||||
let newer = insert_job(&pool, c1, "dead", 5).await;
|
||||
// Make `newer` unambiguously newer.
|
||||
sqlx::query("UPDATE crawler_jobs SET updated_at = now() - interval '1 hour' WHERE id = $1")
|
||||
.bind(older)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for scope in [RequeueScope::All, RequeueScope::Manga(manga_id), RequeueScope::Chapter(c1)] {
|
||||
// Reset to two-dead before each scope variant.
|
||||
sqlx::query("UPDATE crawler_jobs SET state = 'dead' WHERE id = ANY($1)")
|
||||
.bind(vec![older, newer])
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
let n = crawler::requeue_dead_jobs(&pool, scope)
|
||||
.await
|
||||
.expect("requeue must not error on duplicate dead jobs");
|
||||
assert_eq!(n, 1, "exactly one dead job per chapter is revived");
|
||||
// The newest one is the survivor; the other stays dead.
|
||||
assert_eq!(state_of(&pool, newer).await, "pending");
|
||||
assert_eq!(state_of(&pool, older).await, "dead");
|
||||
}
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn list_active_jobs_returns_pending_and_running_running_first(pool: PgPool) {
|
||||
let (_m, c1) = seed_chapter(&pool, "Naruto", 700).await;
|
||||
let (_m2, c2) = seed_chapter(&pool, "Bleach", 10).await;
|
||||
insert_job(&pool, c1, "pending", 0).await;
|
||||
insert_job(&pool, c2, "running", 1).await;
|
||||
// A dead + a done job must NOT appear.
|
||||
let (_m3, c3) = seed_chapter(&pool, "Gone", 1).await;
|
||||
insert_job(&pool, c3, "dead", 5).await;
|
||||
|
||||
let (items, total) = crawler::list_active_jobs(&pool, None, 50, 0).await.unwrap();
|
||||
assert_eq!(total, 2);
|
||||
assert_eq!(items.len(), 2);
|
||||
// Running first.
|
||||
assert_eq!(items[0].state, "running");
|
||||
assert_eq!(items[0].manga_title.as_deref(), Some("Bleach"));
|
||||
assert_eq!(items[1].state, "pending");
|
||||
assert_eq!(items[1].chapter_number, Some(700));
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn list_active_jobs_filters_by_title(pool: PgPool) {
|
||||
let (_m, c1) = seed_chapter(&pool, "Naruto", 1).await;
|
||||
let (_m2, c2) = seed_chapter(&pool, "One Piece", 1).await;
|
||||
insert_job(&pool, c1, "pending", 0).await;
|
||||
insert_job(&pool, c2, "pending", 0).await;
|
||||
let (items, total) = crawler::list_active_jobs(&pool, Some("piece"), 50, 0)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(total, 1);
|
||||
assert_eq!(items[0].manga_title.as_deref(), Some("One Piece"));
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn missing_covers_count_and_list(pool: PgPool) {
|
||||
seed_missing_cover(&pool, "Naruto").await;
|
||||
seed_missing_cover(&pool, "Bleach").await;
|
||||
// A manga WITH a cover must not be counted.
|
||||
let with_cover = Uuid::new_v4();
|
||||
sqlx::query("INSERT INTO mangas (id, title, cover_image_path) VALUES ($1, 'Done', 'k.jpg')")
|
||||
.bind(with_cover)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(crawler::count_missing_covers(&pool).await.unwrap(), 2);
|
||||
|
||||
let (items, total) = crawler::list_missing_cover_mangas(&pool, None, 50, 0)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(total, 2);
|
||||
assert_eq!(items.len(), 2);
|
||||
|
||||
let (items, total) = crawler::list_missing_cover_mangas(&pool, Some("naru"), 50, 0)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(total, 1);
|
||||
assert_eq!(items[0].manga_title, "Naruto");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn job_state_counts_groups_by_state(pool: PgPool) {
|
||||
let (_m, c1) = seed_chapter(&pool, "A", 1).await;
|
||||
let (_m2, c2) = seed_chapter(&pool, "B", 1).await;
|
||||
let (_m3, c3) = seed_chapter(&pool, "C", 1).await;
|
||||
insert_job(&pool, c1, "pending", 0).await;
|
||||
insert_job(&pool, c2, "dead", 5).await;
|
||||
insert_job(&pool, c3, "dead", 5).await;
|
||||
|
||||
let (pending, running, dead) = crawler::job_state_counts(&pool).await.unwrap();
|
||||
assert_eq!(pending, 1);
|
||||
assert_eq!(running, 0);
|
||||
assert_eq!(dead, 2);
|
||||
}
|
||||
@@ -185,6 +185,68 @@ async fn lease_marks_running_and_bumps_attempts_and_sets_leased_until(pool: PgPo
|
||||
assert!(leased_until > chrono::Utc::now());
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn renew_extends_leased_until_while_running(pool: PgPool) {
|
||||
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
||||
.await
|
||||
.unwrap()
|
||||
{
|
||||
EnqueueResult::Inserted(id) => id,
|
||||
EnqueueResult::Skipped => unreachable!(),
|
||||
};
|
||||
|
||||
// Lease with a short window, then collapse leased_until to the recent
|
||||
// past so the renew is unambiguously an extension.
|
||||
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(5))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(leases.len(), 1);
|
||||
sqlx::query("UPDATE crawler_jobs SET leased_until = now() - interval '1 second' WHERE id = $1")
|
||||
.bind(id)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let still_owned = jobs::renew(&pool, id, Duration::from_secs(120))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(still_owned, "renew on a running job returns true");
|
||||
|
||||
let leased_until: chrono::DateTime<chrono::Utc> =
|
||||
sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1")
|
||||
.bind(id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(
|
||||
leased_until > chrono::Utc::now() + chrono::Duration::seconds(60),
|
||||
"leased_until pushed ~120s into the future"
|
||||
);
|
||||
assert_eq!(job_state(&pool, id).await, "running");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn renew_is_noop_once_job_no_longer_running(pool: PgPool) {
|
||||
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
||||
.await
|
||||
.unwrap()
|
||||
{
|
||||
EnqueueResult::Inserted(id) => id,
|
||||
EnqueueResult::Skipped => unreachable!(),
|
||||
};
|
||||
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60))
|
||||
.await
|
||||
.unwrap();
|
||||
// Job completes — heartbeat should now see it's no longer ours.
|
||||
jobs::ack_done(&pool, leases[0].id).await.unwrap();
|
||||
|
||||
let still_owned = jobs::renew(&pool, id, Duration::from_secs(120))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(!still_owned, "renew on a non-running job returns false");
|
||||
assert_eq!(job_state(&pool, id).await, "done");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn lease_with_kind_filter_only_matches_that_kind(pool: PgPool) {
|
||||
let manga_id = match jobs::enqueue(&pool, &sync_manga_payload("foo"))
|
||||
|
||||
@@ -109,7 +109,7 @@ async fn dispatch_target_prefers_most_recent_live_source(pool: PgPool) {
|
||||
seed_chapter_with_two_live_sources(&pool).await;
|
||||
|
||||
let row = dispatch_target(&pool, chapter_id).await.unwrap();
|
||||
let (_manga_id, source_url) =
|
||||
let (_manga_id, source_url, _title, _number) =
|
||||
row.expect("two live sources should yield a dispatch target");
|
||||
assert_eq!(
|
||||
source_url, new_url,
|
||||
@@ -133,7 +133,7 @@ async fn dispatch_target_skips_dropped_sources(pool: PgPool) {
|
||||
.unwrap();
|
||||
|
||||
let row = dispatch_target(&pool, chapter_id).await.unwrap();
|
||||
let (_manga_id, source_url) =
|
||||
let (_manga_id, source_url, _title, _number) =
|
||||
row.expect("a single live source should still yield a dispatch target");
|
||||
assert!(
|
||||
source_url != new_url,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "mangalord-frontend",
|
||||
"version": "0.52.0",
|
||||
"version": "0.54.0",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
|
||||
@@ -16,7 +16,17 @@ import {
|
||||
listAdminChapters,
|
||||
getSystemStats,
|
||||
resyncManga,
|
||||
resyncChapter
|
||||
resyncChapter,
|
||||
getCrawlerStatus,
|
||||
crawlerStatusStreamUrl,
|
||||
runCrawlerPass,
|
||||
restartCrawlerBrowser,
|
||||
updateCrawlerSession,
|
||||
clearCrawlerSessionExpired,
|
||||
listDeadJobs,
|
||||
requeueDeadJobs,
|
||||
listActiveJobs,
|
||||
listMissingCovers
|
||||
} from './admin';
|
||||
|
||||
function ok(body: unknown, status = 200): Response {
|
||||
@@ -329,3 +339,126 @@ describe('admin api client', () => {
|
||||
expect(got.pages).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe('admin crawler api client', () => {
|
||||
let fetchSpy: MockInstance<typeof globalThis.fetch>;
|
||||
beforeEach(() => {
|
||||
fetchSpy = vi.spyOn(globalThis, 'fetch');
|
||||
});
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
const statusFixture = {
|
||||
daemon: 'running',
|
||||
phase: { state: 'fetching_metadata', index: 3, total: 10, title: 'One Piece' },
|
||||
worker_count: 2,
|
||||
active_chapters: [
|
||||
{
|
||||
manga_id: 'm-1',
|
||||
manga_title: 'Bleach',
|
||||
chapter_id: 'c-1',
|
||||
chapter_number: 12,
|
||||
pages_done: 4,
|
||||
pages_total: 20
|
||||
}
|
||||
],
|
||||
current_cover: { manga_id: 'm-2', manga_title: 'Naruto' },
|
||||
covers_queued: 7,
|
||||
last_pass: { at: null, discovered: 0, upserted: 0, covers_fetched: 0, mangas_failed: 0 },
|
||||
session: { expired: false, configured: true },
|
||||
browser: 'healthy',
|
||||
queue: { pending: 2, running: 1, dead: 4 }
|
||||
};
|
||||
|
||||
it('crawlerStatusStreamUrl points at the SSE endpoint under the API base', () => {
|
||||
expect(crawlerStatusStreamUrl()).toMatch(/\/v1\/admin\/crawler\/stream$/);
|
||||
});
|
||||
|
||||
it('getCrawlerStatus GETs /v1/admin/crawler with live chapter/cover fields', async () => {
|
||||
fetchSpy.mockResolvedValueOnce(ok(statusFixture));
|
||||
const s = await getCrawlerStatus();
|
||||
expect(s.queue.dead).toBe(4);
|
||||
expect(s.phase?.state).toBe('fetching_metadata');
|
||||
expect(s.active_chapters[0].pages_done).toBe(4);
|
||||
expect(s.active_chapters[0].pages_total).toBe(20);
|
||||
expect(s.current_cover?.manga_title).toBe('Naruto');
|
||||
expect(s.covers_queued).toBe(7);
|
||||
const url = fetchSpy.mock.calls[0][0] as string;
|
||||
expect(url).toMatch(/\/v1\/admin\/crawler$/);
|
||||
});
|
||||
|
||||
it('listActiveJobs GETs /v1/admin/crawler/active-jobs with search', async () => {
|
||||
fetchSpy.mockResolvedValueOnce(
|
||||
ok({ items: [], page: { limit: 20, offset: 0, total: 0 } })
|
||||
);
|
||||
await listActiveJobs({ search: 'bleach' });
|
||||
const url = fetchSpy.mock.calls[0][0] as string;
|
||||
expect(url).toMatch(/\/v1\/admin\/crawler\/active-jobs\?/);
|
||||
expect(url).toContain('search=bleach');
|
||||
});
|
||||
|
||||
it('listMissingCovers GETs /v1/admin/crawler/covers', async () => {
|
||||
fetchSpy.mockResolvedValueOnce(
|
||||
ok({ items: [{ manga_id: 'm-1', manga_title: 'X' }], page: { limit: 20, offset: 0, total: 1 } })
|
||||
);
|
||||
const r = await listMissingCovers();
|
||||
expect(r.items[0].manga_title).toBe('X');
|
||||
expect(fetchSpy.mock.calls[0][0]).toMatch(/\/v1\/admin\/crawler\/covers$/);
|
||||
});
|
||||
|
||||
it('runCrawlerPass POSTs /v1/admin/crawler/run', async () => {
|
||||
fetchSpy.mockResolvedValueOnce(ok({ started: true }));
|
||||
const r = await runCrawlerPass();
|
||||
expect(r.started).toBe(true);
|
||||
const init = fetchSpy.mock.calls[0][1] as RequestInit;
|
||||
expect(init.method).toBe('POST');
|
||||
expect(fetchSpy.mock.calls[0][0]).toMatch(/\/v1\/admin\/crawler\/run$/);
|
||||
});
|
||||
|
||||
it('restartCrawlerBrowser POSTs the restart endpoint', async () => {
|
||||
fetchSpy.mockResolvedValueOnce(ok({ ok: true, error: null }));
|
||||
const r = await restartCrawlerBrowser();
|
||||
expect(r.ok).toBe(true);
|
||||
expect(fetchSpy.mock.calls[0][0]).toMatch(/\/v1\/admin\/crawler\/browser\/restart$/);
|
||||
});
|
||||
|
||||
it('updateCrawlerSession POSTs the phpsessid body', async () => {
|
||||
fetchSpy.mockResolvedValueOnce(ok({ valid: true, error: null }));
|
||||
const r = await updateCrawlerSession('abc123');
|
||||
expect(r.valid).toBe(true);
|
||||
const init = fetchSpy.mock.calls[0][1] as RequestInit;
|
||||
expect(init.method).toBe('POST');
|
||||
expect(JSON.parse(init.body as string)).toEqual({ phpsessid: 'abc123' });
|
||||
});
|
||||
|
||||
it('clearCrawlerSessionExpired POSTs clear-expired', async () => {
|
||||
fetchSpy.mockResolvedValueOnce(ok({ cleared: true }));
|
||||
const r = await clearCrawlerSessionExpired();
|
||||
expect(r.cleared).toBe(true);
|
||||
expect(fetchSpy.mock.calls[0][0]).toMatch(/\/v1\/admin\/crawler\/session\/clear-expired$/);
|
||||
});
|
||||
|
||||
it('listDeadJobs forwards search + pagination', async () => {
|
||||
fetchSpy.mockResolvedValueOnce(
|
||||
ok({ items: [], page: { limit: 20, offset: 20, total: 0 } })
|
||||
);
|
||||
await listDeadJobs({ search: 'naruto', limit: 20, offset: 20 });
|
||||
const url = fetchSpy.mock.calls[0][0] as string;
|
||||
expect(url).toContain('search=naruto');
|
||||
expect(url).toContain('offset=20');
|
||||
});
|
||||
|
||||
it('requeueDeadJobs POSTs the scope body', async () => {
|
||||
fetchSpy.mockResolvedValueOnce(ok({ requeued: 3 }));
|
||||
const r = await requeueDeadJobs({ scope: 'manga', manga_id: 'm-9' });
|
||||
expect(r.requeued).toBe(3);
|
||||
const init = fetchSpy.mock.calls[0][1] as RequestInit;
|
||||
expect(JSON.parse(init.body as string)).toEqual({ scope: 'manga', manga_id: 'm-9' });
|
||||
});
|
||||
|
||||
it('surfaces a 503 as ApiError', async () => {
|
||||
fetchSpy.mockResolvedValueOnce(envelope(503, 'service_unavailable', 'disabled'));
|
||||
await expect(runCrawlerPass()).rejects.toMatchObject({ status: 503 });
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
// won't reach these routes). 403s thrown here propagate up to the
|
||||
// /admin layout, which renders the framework error page.
|
||||
|
||||
import { request, type Page } from './client';
|
||||
import { request, apiUrl, type Page } from './client';
|
||||
import type { User } from './auth';
|
||||
import type { MangaDetail } from './mangas';
|
||||
import type { Chapter } from './chapters';
|
||||
@@ -214,3 +214,172 @@ export async function resyncChapter(id: string): Promise<ChapterResyncResponse>
|
||||
{ method: 'POST' }
|
||||
);
|
||||
}
|
||||
|
||||
// ---- crawler observability + control ---------------------------------------
|
||||
|
||||
/** Current daemon activity. Discriminated on `state`. */
|
||||
export type CrawlerPhase =
|
||||
| { state: 'idle'; next_fire: string | null }
|
||||
| { state: 'walking_list' }
|
||||
| { state: 'fetching_metadata'; index: number; total: number | null; title: string }
|
||||
| { state: 'cover_backfill'; index: number; total: number };
|
||||
|
||||
/** A chapter being crawled right now, with a live page count. */
|
||||
export type ActiveChapter = {
|
||||
manga_id: string;
|
||||
manga_title: string;
|
||||
chapter_id: string;
|
||||
chapter_number: number;
|
||||
pages_done: number;
|
||||
pages_total: number | null;
|
||||
};
|
||||
|
||||
export type CrawlerLastPass = {
|
||||
at: string | null;
|
||||
discovered: number;
|
||||
upserted: number;
|
||||
covers_fetched: number;
|
||||
mangas_failed: number;
|
||||
};
|
||||
|
||||
export type CrawlerStatus = {
|
||||
daemon: 'running' | 'disabled';
|
||||
phase: CrawlerPhase | null;
|
||||
worker_count: number;
|
||||
active_chapters: ActiveChapter[];
|
||||
current_cover: { manga_id: string; manga_title: string } | null;
|
||||
covers_queued: number;
|
||||
last_pass: CrawlerLastPass;
|
||||
session: { expired: boolean; configured: boolean };
|
||||
browser: 'healthy' | 'draining' | 'restarting' | 'down';
|
||||
queue: { pending: number; running: number; dead: number };
|
||||
};
|
||||
|
||||
export async function getCrawlerStatus(): Promise<CrawlerStatus> {
|
||||
return request<CrawlerStatus>('/v1/admin/crawler');
|
||||
}
|
||||
|
||||
/** URL of the Server-Sent Events live-status stream. Open with
|
||||
* `new EventSource(...)` while the crawler page is mounted and close it on
|
||||
* navigate-away so the subscription is scoped to the active page. Each
|
||||
* message is a named `status` event whose `data` is a {@link CrawlerStatus}. */
|
||||
export function crawlerStatusStreamUrl(): string {
|
||||
return apiUrl('/v1/admin/crawler/stream');
|
||||
}
|
||||
|
||||
/** POST /v1/admin/crawler/run — trigger an out-of-cycle metadata pass. */
|
||||
export async function runCrawlerPass(): Promise<{ started: boolean }> {
|
||||
return request('/v1/admin/crawler/run', { method: 'POST' });
|
||||
}
|
||||
|
||||
/** POST /v1/admin/crawler/browser/restart — coordinated Chromium restart. */
|
||||
export async function restartCrawlerBrowser(): Promise<{ ok: boolean; error: string | null }> {
|
||||
return request('/v1/admin/crawler/browser/restart', { method: 'POST' });
|
||||
}
|
||||
|
||||
/** POST /v1/admin/crawler/session — refresh PHPSESSID and re-probe. */
|
||||
export async function updateCrawlerSession(
|
||||
phpsessid: string
|
||||
): Promise<{ valid: boolean; error: string | null }> {
|
||||
return request('/v1/admin/crawler/session', {
|
||||
method: 'POST',
|
||||
headers: { 'content-type': 'application/json' },
|
||||
body: JSON.stringify({ phpsessid })
|
||||
});
|
||||
}
|
||||
|
||||
/** POST /v1/admin/crawler/session/clear-expired — resume idled workers. */
|
||||
export async function clearCrawlerSessionExpired(): Promise<{ cleared: boolean }> {
|
||||
return request('/v1/admin/crawler/session/clear-expired', { method: 'POST' });
|
||||
}
|
||||
|
||||
export type DeadJob = {
|
||||
id: string;
|
||||
kind: string;
|
||||
chapter_id: string | null;
|
||||
manga_id: string | null;
|
||||
manga_title: string | null;
|
||||
chapter_number: number | null;
|
||||
attempts: number;
|
||||
max_attempts: number;
|
||||
last_error: string | null;
|
||||
updated_at: string;
|
||||
};
|
||||
|
||||
export type DeadJobsPage = { items: DeadJob[]; page: Page };
|
||||
|
||||
export async function listDeadJobs(opts?: {
|
||||
search?: string;
|
||||
limit?: number;
|
||||
offset?: number;
|
||||
}): Promise<DeadJobsPage> {
|
||||
const params = new URLSearchParams();
|
||||
if (opts?.search) params.set('search', opts.search);
|
||||
if (opts?.limit != null) params.set('limit', String(opts.limit));
|
||||
if (opts?.offset != null) params.set('offset', String(opts.offset));
|
||||
const qs = params.toString();
|
||||
return request<DeadJobsPage>(`/v1/admin/crawler/dead-jobs${qs ? `?${qs}` : ''}`);
|
||||
}
|
||||
|
||||
/** Requeue scope: all dead jobs, one manga's, one chapter's, or a single job. */
|
||||
export type RequeueScope =
|
||||
| { scope: 'all' }
|
||||
| { scope: 'manga'; manga_id: string }
|
||||
| { scope: 'chapter'; chapter_id: string }
|
||||
| { scope: 'job'; job_id: string };
|
||||
|
||||
export async function requeueDeadJobs(scope: RequeueScope): Promise<{ requeued: number }> {
|
||||
return request('/v1/admin/crawler/dead-jobs/requeue', {
|
||||
method: 'POST',
|
||||
headers: { 'content-type': 'application/json' },
|
||||
body: JSON.stringify(scope)
|
||||
});
|
||||
}
|
||||
|
||||
/** A queued/running chapter-content job (which chapters are queued). */
|
||||
export type ActiveJob = {
|
||||
id: string;
|
||||
chapter_id: string | null;
|
||||
manga_id: string | null;
|
||||
manga_title: string | null;
|
||||
chapter_number: number | null;
|
||||
state: 'pending' | 'running';
|
||||
attempts: number;
|
||||
max_attempts: number;
|
||||
updated_at: string;
|
||||
};
|
||||
|
||||
export type ActiveJobsPage = { items: ActiveJob[]; page: Page };
|
||||
|
||||
/** GET /v1/admin/crawler/active-jobs — which chapters of which mangas are
|
||||
* queued or running now. */
|
||||
export async function listActiveJobs(opts?: {
|
||||
search?: string;
|
||||
limit?: number;
|
||||
offset?: number;
|
||||
}): Promise<ActiveJobsPage> {
|
||||
const params = new URLSearchParams();
|
||||
if (opts?.search) params.set('search', opts.search);
|
||||
if (opts?.limit != null) params.set('limit', String(opts.limit));
|
||||
if (opts?.offset != null) params.set('offset', String(opts.offset));
|
||||
const qs = params.toString();
|
||||
return request<ActiveJobsPage>(`/v1/admin/crawler/active-jobs${qs ? `?${qs}` : ''}`);
|
||||
}
|
||||
|
||||
/** A manga queued for a cover fetch (no cover yet + a live source). */
|
||||
export type MissingCover = { manga_id: string; manga_title: string };
|
||||
export type MissingCoversPage = { items: MissingCover[]; page: Page };
|
||||
|
||||
/** GET /v1/admin/crawler/covers — which manga covers are queued. */
|
||||
export async function listMissingCovers(opts?: {
|
||||
search?: string;
|
||||
limit?: number;
|
||||
offset?: number;
|
||||
}): Promise<MissingCoversPage> {
|
||||
const params = new URLSearchParams();
|
||||
if (opts?.search) params.set('search', opts.search);
|
||||
if (opts?.limit != null) params.set('limit', String(opts.limit));
|
||||
if (opts?.offset != null) params.set('offset', String(opts.offset));
|
||||
const qs = params.toString();
|
||||
return request<MissingCoversPage>(`/v1/admin/crawler/covers${qs ? `?${qs}` : ''}`);
|
||||
}
|
||||
|
||||
@@ -12,6 +12,15 @@ export function fileUrl(key: string): string {
|
||||
return `${BASE}/v1/files/${key}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds an API URL for non-`fetch` consumers (e.g. `EventSource` for SSE),
|
||||
* applying the same `VITE_API_BASE` prefix as `request()`. `path` is the
|
||||
* route after the base, e.g. `/v1/admin/crawler/stream`.
|
||||
*/
|
||||
export function apiUrl(path: string): string {
|
||||
return `${BASE}${path}`;
|
||||
}
|
||||
|
||||
export class ApiError extends Error {
|
||||
constructor(
|
||||
public readonly status: number,
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
{ href: '/admin', label: 'Overview' },
|
||||
{ href: '/admin/users', label: 'Users' },
|
||||
{ href: '/admin/mangas', label: 'Mangas' },
|
||||
{ href: '/admin/crawler', label: 'Crawler' },
|
||||
{ href: '/admin/system', label: 'System' }
|
||||
];
|
||||
</script>
|
||||
|
||||
838
frontend/src/routes/admin/crawler/+page.svelte
Normal file
838
frontend/src/routes/admin/crawler/+page.svelte
Normal file
@@ -0,0 +1,838 @@
|
||||
<script lang="ts">
|
||||
import { onMount, onDestroy } from 'svelte';
|
||||
import Modal from '$lib/components/Modal.svelte';
|
||||
import Pager from '$lib/components/Pager.svelte';
|
||||
import {
|
||||
getCrawlerStatus,
|
||||
crawlerStatusStreamUrl,
|
||||
runCrawlerPass,
|
||||
restartCrawlerBrowser,
|
||||
updateCrawlerSession,
|
||||
clearCrawlerSessionExpired,
|
||||
listDeadJobs,
|
||||
requeueDeadJobs,
|
||||
listActiveJobs,
|
||||
listMissingCovers,
|
||||
type CrawlerStatus,
|
||||
type CrawlerPhase,
|
||||
type DeadJob,
|
||||
type ActiveJob,
|
||||
type MissingCover,
|
||||
type RequeueScope
|
||||
} from '$lib/api/admin';
|
||||
|
||||
let status: CrawlerStatus | null = $state(null);
|
||||
let error: string | null = $state(null);
|
||||
let notice: string | null = $state(null);
|
||||
let live = $state(false);
|
||||
let source: EventSource | null = null;
|
||||
let busy = $state(false);
|
||||
|
||||
// Dead jobs
|
||||
let deadJobs: DeadJob[] = $state([]);
|
||||
let deadTotal = $state(0);
|
||||
let deadSearch = $state('');
|
||||
let deadPage = $state(1);
|
||||
const DEAD_LIMIT = 20;
|
||||
|
||||
// Queued chapters (pending/running)
|
||||
let activeJobs: ActiveJob[] = $state([]);
|
||||
let activeTotal = $state(0);
|
||||
let activeSearch = $state('');
|
||||
let activePage = $state(1);
|
||||
const ACTIVE_LIMIT = 20;
|
||||
|
||||
// Queued covers (mangas missing a cover)
|
||||
let covers: MissingCover[] = $state([]);
|
||||
let coversTotal = $state(0);
|
||||
let coversSearch = $state('');
|
||||
let coversPage = $state(1);
|
||||
const COVERS_LIMIT = 20;
|
||||
|
||||
// Modals
|
||||
let sessionModalOpen = $state(false);
|
||||
let restartModalOpen = $state(false);
|
||||
let phpsessid = $state('');
|
||||
let sessionResult: string | null = $state(null);
|
||||
|
||||
async function refresh() {
|
||||
try {
|
||||
status = await getCrawlerStatus();
|
||||
error = null;
|
||||
} catch (e) {
|
||||
error = e instanceof Error ? e.message : 'refresh failed';
|
||||
}
|
||||
}
|
||||
|
||||
async function loadDeadJobs() {
|
||||
try {
|
||||
const resp = await listDeadJobs({
|
||||
search: deadSearch.trim() || undefined,
|
||||
limit: DEAD_LIMIT,
|
||||
offset: (deadPage - 1) * DEAD_LIMIT
|
||||
});
|
||||
deadJobs = resp.items;
|
||||
deadTotal = resp.page.total ?? resp.items.length;
|
||||
} catch (e) {
|
||||
error = e instanceof Error ? e.message : 'failed to load dead jobs';
|
||||
}
|
||||
}
|
||||
|
||||
async function loadActiveJobs() {
|
||||
try {
|
||||
const resp = await listActiveJobs({
|
||||
search: activeSearch.trim() || undefined,
|
||||
limit: ACTIVE_LIMIT,
|
||||
offset: (activePage - 1) * ACTIVE_LIMIT
|
||||
});
|
||||
activeJobs = resp.items;
|
||||
activeTotal = resp.page.total ?? resp.items.length;
|
||||
} catch (e) {
|
||||
error = e instanceof Error ? e.message : 'failed to load queued chapters';
|
||||
}
|
||||
}
|
||||
|
||||
async function loadCovers() {
|
||||
try {
|
||||
const resp = await listMissingCovers({
|
||||
search: coversSearch.trim() || undefined,
|
||||
limit: COVERS_LIMIT,
|
||||
offset: (coversPage - 1) * COVERS_LIMIT
|
||||
});
|
||||
covers = resp.items;
|
||||
coversTotal = resp.page.total ?? resp.items.length;
|
||||
} catch (e) {
|
||||
error = e instanceof Error ? e.message : 'failed to load queued covers';
|
||||
}
|
||||
}
|
||||
|
||||
// Auto-refresh the (fetched, not streamed) backlog lists when the live
|
||||
// status shows the relevant counts moved — keeps the lists feeling live
|
||||
// without pushing big payloads over SSE. `$effect` re-runs when these
|
||||
// tracked values change.
|
||||
let lastQueueKey = $state('');
|
||||
let lastCoversKey = $state(-1);
|
||||
$effect(() => {
|
||||
const k = `${status?.queue.pending ?? 0}:${status?.queue.running ?? 0}`;
|
||||
if (k !== lastQueueKey) {
|
||||
lastQueueKey = k;
|
||||
loadActiveJobs();
|
||||
}
|
||||
});
|
||||
$effect(() => {
|
||||
const c = status?.covers_queued ?? -1;
|
||||
if (c !== lastCoversKey) {
|
||||
lastCoversKey = c;
|
||||
loadCovers();
|
||||
}
|
||||
});
|
||||
|
||||
// Live updates via Server-Sent Events instead of polling. The
|
||||
// EventSource is opened on mount and closed on destroy, so the
|
||||
// subscription exists only while this page is showing live data.
|
||||
function openStream() {
|
||||
const es = new EventSource(crawlerStatusStreamUrl(), { withCredentials: true });
|
||||
es.addEventListener('status', (e) => {
|
||||
try {
|
||||
status = JSON.parse((e as MessageEvent).data) as CrawlerStatus;
|
||||
error = null;
|
||||
live = true;
|
||||
} catch {
|
||||
// ignore a malformed frame; the next one will replace it
|
||||
}
|
||||
});
|
||||
es.onopen = () => {
|
||||
live = true;
|
||||
};
|
||||
es.onerror = () => {
|
||||
// The browser auto-reconnects; reflect the gap in the UI.
|
||||
live = false;
|
||||
};
|
||||
source = es;
|
||||
}
|
||||
|
||||
onMount(() => {
|
||||
// One-shot fetch for instant initial paint + resilience if SSE is
|
||||
// blocked; the stream then drives subsequent updates.
|
||||
refresh();
|
||||
loadDeadJobs();
|
||||
openStream();
|
||||
});
|
||||
onDestroy(() => {
|
||||
source?.close();
|
||||
source = null;
|
||||
});
|
||||
|
||||
async function withBusy(label: string, fn: () => Promise<void>) {
|
||||
busy = true;
|
||||
notice = null;
|
||||
error = null;
|
||||
try {
|
||||
await fn();
|
||||
} catch (e) {
|
||||
error = e instanceof Error ? e.message : `${label} failed`;
|
||||
} finally {
|
||||
busy = false;
|
||||
await refresh();
|
||||
}
|
||||
}
|
||||
|
||||
async function onRunPass() {
|
||||
await withBusy('run pass', async () => {
|
||||
await runCrawlerPass();
|
||||
notice = 'Metadata pass started.';
|
||||
});
|
||||
}
|
||||
|
||||
async function onConfirmRestart() {
|
||||
restartModalOpen = false;
|
||||
await withBusy('restart browser', async () => {
|
||||
const r = await restartCrawlerBrowser();
|
||||
notice = r.ok ? 'Browser restarted.' : `Restart failed: ${r.error ?? 'unknown'}`;
|
||||
});
|
||||
}
|
||||
|
||||
async function onSaveSession() {
|
||||
sessionResult = null;
|
||||
busy = true;
|
||||
try {
|
||||
const r = await updateCrawlerSession(phpsessid);
|
||||
sessionResult = r.valid
|
||||
? '✓ Session valid — workers resumed.'
|
||||
: `✕ Probe failed: ${r.error ?? 'unauthenticated'}`;
|
||||
if (r.valid) {
|
||||
sessionModalOpen = false;
|
||||
phpsessid = '';
|
||||
notice = 'Session updated.';
|
||||
}
|
||||
} catch (e) {
|
||||
sessionResult = e instanceof Error ? e.message : 'update failed';
|
||||
} finally {
|
||||
busy = false;
|
||||
await refresh();
|
||||
}
|
||||
}
|
||||
|
||||
async function onClearExpired() {
|
||||
await withBusy('clear expired', async () => {
|
||||
await clearCrawlerSessionExpired();
|
||||
notice = 'Session-expired flag cleared.';
|
||||
});
|
||||
}
|
||||
|
||||
async function requeue(scope: RequeueScope) {
|
||||
await withBusy('requeue', async () => {
|
||||
const r = await requeueDeadJobs(scope);
|
||||
notice = `Requeued ${r.requeued} job(s).`;
|
||||
await loadDeadJobs();
|
||||
});
|
||||
}
|
||||
|
||||
function onSearchDead() {
|
||||
deadPage = 1;
|
||||
loadDeadJobs();
|
||||
}
|
||||
|
||||
function onDeadPageChange(p: number) {
|
||||
deadPage = p;
|
||||
loadDeadJobs();
|
||||
}
|
||||
|
||||
function onSearchActive() {
|
||||
activePage = 1;
|
||||
loadActiveJobs();
|
||||
}
|
||||
function onActivePageChange(p: number) {
|
||||
activePage = p;
|
||||
loadActiveJobs();
|
||||
}
|
||||
function onSearchCovers() {
|
||||
coversPage = 1;
|
||||
loadCovers();
|
||||
}
|
||||
function onCoversPageChange(p: number) {
|
||||
coversPage = p;
|
||||
loadCovers();
|
||||
}
|
||||
|
||||
// ---- display helpers ----
|
||||
function phaseLabel(p: CrawlerPhase | null): string {
|
||||
if (!p) return 'Daemon disabled';
|
||||
switch (p.state) {
|
||||
case 'idle':
|
||||
return p.next_fire
|
||||
? `Idle — next pass ${new Date(p.next_fire).toLocaleString()}`
|
||||
: 'Idle';
|
||||
case 'walking_list':
|
||||
return 'Walking source list';
|
||||
case 'fetching_metadata':
|
||||
return `Fetching metadata · ${p.index}/${p.total ?? '?'} · ${p.title}`;
|
||||
case 'cover_backfill':
|
||||
return `Backfilling covers · ${p.index + 1}/${p.total}`;
|
||||
}
|
||||
}
|
||||
|
||||
function phasePercent(p: CrawlerPhase | null): number | null {
|
||||
if (p && p.state === 'fetching_metadata' && p.total && p.total > 0) {
|
||||
return Math.min(100, (p.index / p.total) * 100);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function sessionPill(s: CrawlerStatus): { cls: string; text: string } {
|
||||
if (s.daemon === 'disabled') return { cls: 'badge-not_downloaded', text: 'n/a' };
|
||||
if (s.session.expired) return { cls: 'badge-in_progress', text: 'Expired' };
|
||||
if (!s.session.configured) return { cls: 'badge-not_downloaded', text: 'Not set' };
|
||||
return { cls: 'badge-synced', text: 'OK' };
|
||||
}
|
||||
|
||||
function browserPill(s: CrawlerStatus): { cls: string; text: string } {
|
||||
switch (s.browser) {
|
||||
case 'healthy':
|
||||
return { cls: 'badge-synced', text: 'Up' };
|
||||
case 'draining':
|
||||
case 'restarting':
|
||||
return { cls: 'badge-in_progress', text: s.browser };
|
||||
default:
|
||||
return { cls: 'badge-not_downloaded', text: 'Down' };
|
||||
}
|
||||
}
|
||||
|
||||
const deadTotalPages = $derived(Math.max(1, Math.ceil(deadTotal / DEAD_LIMIT)));
|
||||
const activeTotalPages = $derived(Math.max(1, Math.ceil(activeTotal / ACTIVE_LIMIT)));
|
||||
const coversTotalPages = $derived(Math.max(1, Math.ceil(coversTotal / COVERS_LIMIT)));
|
||||
|
||||
function chapterPercent(c: { pages_done: number; pages_total: number | null }): number | null {
|
||||
return c.pages_total && c.pages_total > 0
|
||||
? Math.min(100, (c.pages_done / c.pages_total) * 100)
|
||||
: null;
|
||||
}
|
||||
</script>
|
||||
|
||||
<div class="titlebar">
|
||||
<h1>Crawler</h1>
|
||||
<span class="livedot" class:on={live} title={live ? 'Live (SSE)' : 'Reconnecting…'}>
|
||||
{live ? '● live' : '○ reconnecting…'}
|
||||
</span>
|
||||
</div>
|
||||
|
||||
{#if error}
|
||||
<p class="error" role="alert">{error}</p>
|
||||
{/if}
|
||||
{#if notice}
|
||||
<p class="notice" role="status">{notice}</p>
|
||||
{/if}
|
||||
|
||||
{#if status}
|
||||
<!-- Status hero -->
|
||||
<section class="hero" data-testid="crawler-hero">
|
||||
<div class="pills">
|
||||
<span class="pill"
|
||||
>Daemon
|
||||
<span class={`badge ${status.daemon === 'running' ? 'badge-synced' : 'badge-not_downloaded'}`}
|
||||
>{status.daemon}</span
|
||||
></span
|
||||
>
|
||||
<span class="pill"
|
||||
>Session <span class={`badge ${sessionPill(status).cls}`}>{sessionPill(status).text}</span></span
|
||||
>
|
||||
<span class="pill"
|
||||
>Browser <span class={`badge ${browserPill(status).cls}`}>{browserPill(status).text}</span></span
|
||||
>
|
||||
</div>
|
||||
|
||||
<p class="phase" data-testid="crawler-phase">{phaseLabel(status.phase)}</p>
|
||||
{#if phasePercent(status.phase) !== null}
|
||||
{@render Bar({ percent: phasePercent(status.phase) ?? 0 })}
|
||||
{/if}
|
||||
|
||||
{#if status.session.expired}
|
||||
<p class="warn">
|
||||
⚠ Chapter downloads paused — session expired. Metadata + list crawl continue.
|
||||
</p>
|
||||
{/if}
|
||||
|
||||
{#if status.current_cover}
|
||||
<p class="cover" data-testid="current-cover">
|
||||
🖼 Fetching cover: <strong>{status.current_cover.manga_title}</strong>
|
||||
</p>
|
||||
{/if}
|
||||
|
||||
<p class="lastpass">
|
||||
Last pass:
|
||||
{#if status.last_pass.at}
|
||||
{new Date(status.last_pass.at).toLocaleString()} ·
|
||||
{status.last_pass.discovered} seen · {status.last_pass.upserted} upserted ·
|
||||
{status.last_pass.mangas_failed} failed
|
||||
{:else}
|
||||
— none yet this session
|
||||
{/if}
|
||||
</p>
|
||||
</section>
|
||||
|
||||
<!-- Controls -->
|
||||
<section class="controls">
|
||||
<button onclick={onRunPass} disabled={busy || status.daemon !== 'running'}
|
||||
>Run metadata pass now</button
|
||||
>
|
||||
<button onclick={() => (restartModalOpen = true)} disabled={busy || status.daemon !== 'running'}
|
||||
>Restart browser</button
|
||||
>
|
||||
<button onclick={() => { sessionModalOpen = true; sessionResult = null; }} disabled={busy || status.daemon !== 'running'}
|
||||
>Manage session…</button
|
||||
>
|
||||
{#if status.session.expired}
|
||||
<button onclick={onClearExpired} disabled={busy}>Clear expired flag</button>
|
||||
{/if}
|
||||
</section>
|
||||
|
||||
<!-- Queue + covers stats -->
|
||||
<section class="grid2">
|
||||
<article>
|
||||
<h2>Queue</h2>
|
||||
<dl>
|
||||
<dt>Pending</dt>
|
||||
<dd>{status.queue.pending}</dd>
|
||||
<dt>Running</dt>
|
||||
<dd>{status.queue.running}</dd>
|
||||
<dt>Dead</dt>
|
||||
<dd>{status.queue.dead}</dd>
|
||||
<dt>Covers queued</dt>
|
||||
<dd>{status.covers_queued}</dd>
|
||||
</dl>
|
||||
</article>
|
||||
<article>
|
||||
<h2>Active chapters ({status.active_chapters.length}/{status.worker_count})</h2>
|
||||
{#if status.active_chapters.length === 0}
|
||||
<p class="muted">idle — no chapters downloading</p>
|
||||
{:else}
|
||||
<table class="active">
|
||||
<tbody>
|
||||
{#each status.active_chapters as c (c.chapter_id)}
|
||||
<tr>
|
||||
<td>{c.manga_title} · ch.{c.chapter_number}</td>
|
||||
<td class="pagecount" data-testid="active-pages">
|
||||
{c.pages_done}/{c.pages_total ?? '?'}
|
||||
</td>
|
||||
<td class="pagebar">
|
||||
{#if chapterPercent(c) !== null}
|
||||
{@render Bar({ percent: chapterPercent(c) ?? 0 })}
|
||||
{/if}
|
||||
</td>
|
||||
</tr>
|
||||
{/each}
|
||||
</tbody>
|
||||
</table>
|
||||
{/if}
|
||||
</article>
|
||||
</section>
|
||||
{:else}
|
||||
<p>Loading…</p>
|
||||
{/if}
|
||||
|
||||
<!-- Queued chapters (pending/running backlog) -->
|
||||
<section class="backlog">
|
||||
<div class="deadhead">
|
||||
<h2>Queued chapters ({activeTotal})</h2>
|
||||
<div class="deadtools">
|
||||
<input
|
||||
placeholder="Search manga…"
|
||||
bind:value={activeSearch}
|
||||
onkeydown={(e) => e.key === 'Enter' && onSearchActive()}
|
||||
/>
|
||||
<button onclick={onSearchActive}>Search</button>
|
||||
</div>
|
||||
</div>
|
||||
{#if activeJobs.length === 0}
|
||||
<p class="muted">No chapters queued.</p>
|
||||
{:else}
|
||||
<table class="dead">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Manga / Chapter</th>
|
||||
<th>State</th>
|
||||
<th>Att.</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{#each activeJobs as j (j.id)}
|
||||
<tr>
|
||||
<td>
|
||||
{j.manga_title ?? '(unknown)'}
|
||||
{#if j.chapter_number != null}· ch.{j.chapter_number}{/if}
|
||||
</td>
|
||||
<td>
|
||||
<span
|
||||
class={`badge ${j.state === 'running' ? 'badge-downloading' : 'badge-not_downloaded'}`}
|
||||
>{j.state}</span
|
||||
>
|
||||
</td>
|
||||
<td>{j.attempts}/{j.max_attempts}</td>
|
||||
</tr>
|
||||
{/each}
|
||||
</tbody>
|
||||
</table>
|
||||
<Pager page={activePage} totalPages={activeTotalPages} onChange={onActivePageChange} />
|
||||
{/if}
|
||||
</section>
|
||||
|
||||
<!-- Queued covers (mangas missing a cover) -->
|
||||
<section class="backlog">
|
||||
<div class="deadhead">
|
||||
<h2>Queued covers ({coversTotal})</h2>
|
||||
<div class="deadtools">
|
||||
<input
|
||||
placeholder="Search manga…"
|
||||
bind:value={coversSearch}
|
||||
onkeydown={(e) => e.key === 'Enter' && onSearchCovers()}
|
||||
/>
|
||||
<button onclick={onSearchCovers}>Search</button>
|
||||
</div>
|
||||
</div>
|
||||
{#if covers.length === 0}
|
||||
<p class="muted">No covers queued 🎉</p>
|
||||
{:else}
|
||||
<table class="dead">
|
||||
<thead>
|
||||
<tr><th>Manga</th></tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{#each covers as c (c.manga_id)}
|
||||
<tr><td>{c.manga_title}</td></tr>
|
||||
{/each}
|
||||
</tbody>
|
||||
</table>
|
||||
<Pager page={coversPage} totalPages={coversTotalPages} onChange={onCoversPageChange} />
|
||||
{/if}
|
||||
</section>
|
||||
|
||||
<!-- Dead jobs -->
|
||||
<section class="deadjobs">
|
||||
<div class="deadhead">
|
||||
<h2>Dead jobs ({deadTotal})</h2>
|
||||
<div class="deadtools">
|
||||
<input
|
||||
placeholder="Search manga…"
|
||||
bind:value={deadSearch}
|
||||
onkeydown={(e) => e.key === 'Enter' && onSearchDead()}
|
||||
/>
|
||||
<button onclick={onSearchDead}>Search</button>
|
||||
<button
|
||||
onclick={() => requeue({ scope: 'all' })}
|
||||
disabled={busy || deadTotal === 0}>Requeue all ({deadTotal})</button
|
||||
>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{#if deadJobs.length === 0}
|
||||
<p class="muted">No dead jobs 🎉</p>
|
||||
{:else}
|
||||
<table class="dead">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Manga / Chapter</th>
|
||||
<th>Att.</th>
|
||||
<th>Failed</th>
|
||||
<th>Last error</th>
|
||||
<th class="actions">Action</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{#each deadJobs as j (j.id)}
|
||||
<tr>
|
||||
<td>
|
||||
{j.manga_title ?? '(unknown)'}
|
||||
{#if j.chapter_number != null}· ch.{j.chapter_number}{/if}
|
||||
</td>
|
||||
<td>{j.attempts}/{j.max_attempts}</td>
|
||||
<td>{new Date(j.updated_at).toLocaleDateString()}</td>
|
||||
<td class="err" title={j.last_error ?? ''}>{j.last_error ?? '—'}</td>
|
||||
<td class="actions">
|
||||
<button onclick={() => requeue({ scope: 'job', job_id: j.id })} disabled={busy}
|
||||
>Requeue</button
|
||||
>
|
||||
{#if j.manga_id}
|
||||
<button
|
||||
class="secondary"
|
||||
onclick={() => requeue({ scope: 'manga', manga_id: j.manga_id! })}
|
||||
disabled={busy}>Manga</button
|
||||
>
|
||||
{/if}
|
||||
</td>
|
||||
</tr>
|
||||
{/each}
|
||||
</tbody>
|
||||
</table>
|
||||
<Pager page={deadPage} totalPages={deadTotalPages} onChange={onDeadPageChange} />
|
||||
{/if}
|
||||
</section>
|
||||
|
||||
<!-- Restart confirm modal -->
|
||||
<Modal open={restartModalOpen} title="Restart browser" onClose={() => (restartModalOpen = false)} size="sm">
|
||||
{#snippet children()}
|
||||
<p>This relaunches Chromium and re-injects the session cookie.</p>
|
||||
<ul class="coord">
|
||||
<li>In-flight jobs are allowed to finish (bounded), then forced.</li>
|
||||
<li>New jobs pause until the relaunch completes.</li>
|
||||
<li>The metadata pass yields at its next checkpoint.</li>
|
||||
</ul>
|
||||
{/snippet}
|
||||
{#snippet footer()}
|
||||
<button onclick={() => (restartModalOpen = false)}>Cancel</button>
|
||||
<button class="primary" onclick={onConfirmRestart} disabled={busy}>Restart</button>
|
||||
{/snippet}
|
||||
</Modal>
|
||||
|
||||
<!-- Session modal -->
|
||||
<Modal open={sessionModalOpen} title="Manage crawler session" onClose={() => (sessionModalOpen = false)} size="md">
|
||||
{#snippet children()}
|
||||
<label for="phpsessid">PHPSESSID</label>
|
||||
<input id="phpsessid" type="password" bind:value={phpsessid} autocomplete="off" />
|
||||
<p class="hint">
|
||||
Saving rewrites the cookie everywhere, persists it, restarts the browser, and re-probes.
|
||||
</p>
|
||||
{#if sessionResult}
|
||||
<p class="sessionresult">{sessionResult}</p>
|
||||
{/if}
|
||||
{/snippet}
|
||||
{#snippet footer()}
|
||||
<button onclick={() => (sessionModalOpen = false)}>Cancel</button>
|
||||
<button class="primary" onclick={onSaveSession} disabled={busy || phpsessid.trim() === ''}
|
||||
>Save & validate</button
|
||||
>
|
||||
{/snippet}
|
||||
</Modal>
|
||||
|
||||
{#snippet Bar({ percent }: { percent: number })}
|
||||
<div class="bar" role="progressbar" aria-valuenow={percent} aria-valuemin="0" aria-valuemax="100">
|
||||
<div class="fill" style:width="{Math.min(100, Math.max(0, percent))}%"></div>
|
||||
<span class="label">{percent.toFixed(0)}%</span>
|
||||
</div>
|
||||
{/snippet}
|
||||
|
||||
<style>
|
||||
h1 {
|
||||
margin: 0;
|
||||
}
|
||||
.titlebar {
|
||||
display: flex;
|
||||
align-items: baseline;
|
||||
gap: var(--space-3);
|
||||
margin-bottom: var(--space-4);
|
||||
}
|
||||
.livedot {
|
||||
font-size: var(--font-sm);
|
||||
color: var(--text-muted);
|
||||
}
|
||||
.livedot.on {
|
||||
color: var(--success, #0a7d2c);
|
||||
}
|
||||
h2 {
|
||||
margin: 0 0 var(--space-3) 0;
|
||||
font-size: var(--font-sm);
|
||||
color: var(--text-muted);
|
||||
text-transform: uppercase;
|
||||
letter-spacing: 0.04em;
|
||||
}
|
||||
.hero {
|
||||
padding: var(--space-4);
|
||||
border: 1px solid var(--border);
|
||||
border-radius: var(--radius-md);
|
||||
background: var(--surface);
|
||||
margin-bottom: var(--space-4);
|
||||
}
|
||||
.pills {
|
||||
display: flex;
|
||||
gap: var(--space-4);
|
||||
flex-wrap: wrap;
|
||||
margin-bottom: var(--space-3);
|
||||
}
|
||||
.pill {
|
||||
font-size: var(--font-sm);
|
||||
color: var(--text-muted);
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
gap: var(--space-2);
|
||||
}
|
||||
.phase {
|
||||
font-size: var(--font-lg);
|
||||
font-weight: var(--weight-semibold);
|
||||
margin: var(--space-2) 0;
|
||||
}
|
||||
.lastpass,
|
||||
.hint {
|
||||
color: var(--text-muted);
|
||||
font-size: var(--font-sm);
|
||||
}
|
||||
.warn {
|
||||
color: #92400e;
|
||||
background: #fef3c7;
|
||||
border: 1px solid #fcd34d;
|
||||
padding: var(--space-2) var(--space-3);
|
||||
border-radius: var(--radius-md);
|
||||
font-size: var(--font-sm);
|
||||
}
|
||||
.controls {
|
||||
display: flex;
|
||||
gap: var(--space-2);
|
||||
flex-wrap: wrap;
|
||||
margin-bottom: var(--space-4);
|
||||
}
|
||||
.grid2 {
|
||||
display: grid;
|
||||
grid-template-columns: repeat(auto-fit, minmax(16rem, 1fr));
|
||||
gap: var(--space-3);
|
||||
margin-bottom: var(--space-4);
|
||||
}
|
||||
article {
|
||||
padding: var(--space-3);
|
||||
border: 1px solid var(--border);
|
||||
border-radius: var(--radius-md);
|
||||
background: var(--surface);
|
||||
}
|
||||
dl {
|
||||
display: grid;
|
||||
grid-template-columns: max-content 1fr;
|
||||
gap: var(--space-1) var(--space-3);
|
||||
margin: 0;
|
||||
font-size: var(--font-sm);
|
||||
}
|
||||
dt {
|
||||
color: var(--text-muted);
|
||||
}
|
||||
dd {
|
||||
margin: 0;
|
||||
font-family: var(--font-mono, monospace);
|
||||
}
|
||||
table {
|
||||
width: 100%;
|
||||
border-collapse: collapse;
|
||||
}
|
||||
th,
|
||||
td {
|
||||
padding: var(--space-2);
|
||||
text-align: left;
|
||||
border-bottom: 1px solid var(--border);
|
||||
font-size: var(--font-sm);
|
||||
}
|
||||
.actions {
|
||||
text-align: right;
|
||||
}
|
||||
.mono {
|
||||
font-family: var(--font-mono, monospace);
|
||||
font-size: var(--font-xs);
|
||||
}
|
||||
.err {
|
||||
max-width: 22rem;
|
||||
overflow: hidden;
|
||||
text-overflow: ellipsis;
|
||||
white-space: nowrap;
|
||||
color: var(--text-muted);
|
||||
}
|
||||
.deadhead {
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
align-items: center;
|
||||
gap: var(--space-3);
|
||||
flex-wrap: wrap;
|
||||
}
|
||||
.deadtools {
|
||||
display: flex;
|
||||
gap: var(--space-2);
|
||||
}
|
||||
button.secondary {
|
||||
background: var(--surface-elevated);
|
||||
}
|
||||
.notice {
|
||||
color: var(--success, #0a7d2c);
|
||||
padding: var(--space-2) var(--space-3);
|
||||
border: 1px solid var(--success, #0a7d2c);
|
||||
border-radius: var(--radius-md);
|
||||
margin-bottom: var(--space-3);
|
||||
}
|
||||
.sessionresult {
|
||||
margin-top: var(--space-2);
|
||||
font-size: var(--font-sm);
|
||||
}
|
||||
.coord {
|
||||
margin: var(--space-2) 0;
|
||||
padding-left: var(--space-4);
|
||||
font-size: var(--font-sm);
|
||||
color: var(--text-muted);
|
||||
}
|
||||
/* badges (shared convention with admin/mangas) */
|
||||
:global(.badge) {
|
||||
display: inline-block;
|
||||
padding: 0 var(--space-2);
|
||||
border-radius: var(--radius-sm, 4px);
|
||||
font-size: var(--font-xs);
|
||||
font-weight: var(--weight-semibold);
|
||||
text-transform: uppercase;
|
||||
letter-spacing: 0.04em;
|
||||
border: 1px solid var(--border);
|
||||
background: var(--surface);
|
||||
}
|
||||
:global(.badge-synced) {
|
||||
background: #dcfce7;
|
||||
color: #166534;
|
||||
border-color: #86efac;
|
||||
}
|
||||
:global(.badge-in_progress),
|
||||
:global(.badge-downloading) {
|
||||
background: #fef3c7;
|
||||
color: #92400e;
|
||||
border-color: #fcd34d;
|
||||
}
|
||||
:global(.badge-not_downloaded) {
|
||||
background: var(--surface-elevated);
|
||||
color: var(--text-muted);
|
||||
}
|
||||
.bar {
|
||||
position: relative;
|
||||
background: var(--surface-elevated);
|
||||
border-radius: var(--radius-sm, 4px);
|
||||
height: 1.5rem;
|
||||
margin: var(--space-2) 0;
|
||||
overflow: hidden;
|
||||
}
|
||||
.fill {
|
||||
height: 100%;
|
||||
background: #22c55e;
|
||||
transition: width 0.3s ease;
|
||||
}
|
||||
.label {
|
||||
position: absolute;
|
||||
top: 50%;
|
||||
left: 50%;
|
||||
transform: translate(-50%, -50%);
|
||||
font-size: var(--font-xs);
|
||||
font-weight: var(--weight-semibold);
|
||||
}
|
||||
.error {
|
||||
color: var(--danger, #dc2626);
|
||||
padding: var(--space-2) var(--space-3);
|
||||
border: 1px solid var(--danger, #dc2626);
|
||||
border-radius: var(--radius-md);
|
||||
margin-bottom: var(--space-3);
|
||||
}
|
||||
.muted {
|
||||
color: var(--text-muted);
|
||||
}
|
||||
.cover {
|
||||
font-size: var(--font-sm);
|
||||
}
|
||||
.backlog {
|
||||
margin-top: var(--space-4);
|
||||
}
|
||||
.pagecount {
|
||||
font-family: var(--font-mono, monospace);
|
||||
font-size: var(--font-xs);
|
||||
white-space: nowrap;
|
||||
}
|
||||
.pagebar {
|
||||
width: 8rem;
|
||||
}
|
||||
table.active td {
|
||||
vertical-align: middle;
|
||||
}
|
||||
</style>
|
||||
@@ -3,6 +3,7 @@
|
||||
import {
|
||||
listAdminMangas,
|
||||
listAdminChapters,
|
||||
requeueDeadJobs,
|
||||
type AdminMangasPage,
|
||||
type AdminChapterRow,
|
||||
type MangaSyncState
|
||||
@@ -59,6 +60,27 @@
|
||||
function badgeClass(state: string): string {
|
||||
return `badge badge-${state}`;
|
||||
}
|
||||
|
||||
let requeuingChapter: string | null = $state(null);
|
||||
|
||||
/** Requeue the dead job(s) for a single failed chapter, then refresh
|
||||
* that manga's chapter list so the pill updates. */
|
||||
async function requeueChapter(mangaId: string, chapterId: string) {
|
||||
requeuingChapter = chapterId;
|
||||
error = null;
|
||||
try {
|
||||
await requeueDeadJobs({ scope: 'chapter', chapter_id: chapterId });
|
||||
const resp = await listAdminChapters(mangaId, { limit: 500 });
|
||||
chaptersByManga[mangaId] = {
|
||||
items: resp.items,
|
||||
total: resp.page.total ?? resp.items.length
|
||||
};
|
||||
} catch (e) {
|
||||
error = e instanceof ApiError ? e.message : 'requeue failed';
|
||||
} finally {
|
||||
requeuingChapter = null;
|
||||
}
|
||||
}
|
||||
</script>
|
||||
|
||||
<h1>Mangas</h1>
|
||||
@@ -153,6 +175,16 @@
|
||||
<span class={badgeClass(c.sync_state)}>
|
||||
{c.sync_state}
|
||||
</span>
|
||||
{#if c.sync_state === 'failed'}
|
||||
<button
|
||||
class="requeue"
|
||||
onclick={() => requeueChapter(m.id, c.id)}
|
||||
disabled={requeuingChapter === c.id}
|
||||
title="Requeue this chapter"
|
||||
>
|
||||
↻ requeue
|
||||
</button>
|
||||
{/if}
|
||||
</td>
|
||||
</tr>
|
||||
{/each}
|
||||
@@ -272,6 +304,11 @@
|
||||
color: #991b1b;
|
||||
border-color: #fca5a5;
|
||||
}
|
||||
.requeue {
|
||||
margin-left: var(--space-2);
|
||||
font-size: var(--font-xs);
|
||||
padding: 0 var(--space-2);
|
||||
}
|
||||
.badge-not_downloaded {
|
||||
background: var(--surface-elevated);
|
||||
color: var(--text-muted);
|
||||
|
||||
Reference in New Issue
Block a user