feat: cover retry backfill + admin force-resync for manga & chapter (0.50.0)
Adds a per-tick cover-backfill pass to the crawler daemon so mangas whose cover download failed on first attempt get retried — the metadata pass's early-stop optimisation otherwise prevents the walk from revisiting them. Adds admin-only POST /admin/mangas/:id/resync and POST /admin/chapters/:id/resync that refetch metadata + cover (or chapter content with force_refetch) from the crawler source synchronously and return the refreshed row. Surfaced in the UI as "Force resync" buttons on the manga detail and reader pages, admin-only via session.user.is_admin. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2
backend/Cargo.lock
generated
2
backend/Cargo.lock
generated
@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
|
||||
|
||||
[[package]]
|
||||
name = "mangalord"
|
||||
version = "0.49.1"
|
||||
version = "0.50.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"argon2",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "mangalord"
|
||||
version = "0.49.1"
|
||||
version = "0.50.0"
|
||||
edition = "2021"
|
||||
default-run = "mangalord"
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
//! `crate::auth::extractor::RequireAdmin`).
|
||||
|
||||
pub mod mangas;
|
||||
pub mod resync;
|
||||
pub mod system;
|
||||
pub mod users;
|
||||
|
||||
@@ -16,5 +17,6 @@ pub fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.merge(users::routes())
|
||||
.merge(mangas::routes())
|
||||
.merge(resync::routes())
|
||||
.merge(system::routes())
|
||||
}
|
||||
|
||||
176
backend/src/api/admin/resync.rs
Normal file
176
backend/src/api/admin/resync.rs
Normal file
@@ -0,0 +1,176 @@
|
||||
//! Admin-triggered force resync of a single manga's metadata + cover,
|
||||
//! or a single chapter's content.
|
||||
//!
|
||||
//! Both endpoints are admin-only (`RequireAdmin`, cookie-only) and run
|
||||
//! synchronously with the request — the response carries the refreshed
|
||||
//! resource so the UI can swap it in without a follow-up GET. The work
|
||||
//! itself is delegated to [`ResyncService`] (set on AppState by
|
||||
//! `app::build` when the crawler daemon is enabled); when the daemon
|
||||
//! is disabled, both handlers return 503.
|
||||
|
||||
use axum::extract::{Path, State};
|
||||
use axum::routing::post;
|
||||
use axum::{Json, Router};
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::app::AppState;
|
||||
use crate::auth::extractor::RequireAdmin;
|
||||
use crate::crawler::resync::{ChapterResyncOutcome, ResyncError};
|
||||
use crate::domain::manga::MangaDetail;
|
||||
use crate::domain::Chapter;
|
||||
use crate::error::{AppError, AppResult};
|
||||
use crate::repo;
|
||||
use crate::repo::crawler::UpsertStatus;
|
||||
|
||||
pub fn routes() -> Router<AppState> {
|
||||
Router::new()
|
||||
.route("/admin/mangas/:id/resync", post(resync_manga))
|
||||
.route("/admin/chapters/:id/resync", post(resync_chapter))
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct MangaResyncResponse {
|
||||
pub manga: MangaDetail,
|
||||
/// `"new" | "updated" | "unchanged"` — mirrors [`UpsertStatus`].
|
||||
pub metadata_status: &'static str,
|
||||
pub cover_fetched: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct ChapterResyncResponse {
|
||||
pub chapter: Chapter,
|
||||
/// `"fetched" | "skipped"` — whether new pages landed or the
|
||||
/// service short-circuited (e.g. chapter already had pages and the
|
||||
/// session was lost so force was downgraded).
|
||||
pub outcome: &'static str,
|
||||
/// Page count when `outcome == "fetched"`. `None` for `skipped`.
|
||||
pub pages: Option<usize>,
|
||||
}
|
||||
|
||||
async fn resync_manga(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
Path(manga_id): Path<Uuid>,
|
||||
) -> AppResult<Json<MangaResyncResponse>> {
|
||||
if !repo::manga::exists(&state.db, manga_id).await? {
|
||||
return Err(AppError::NotFound);
|
||||
}
|
||||
let resync = state
|
||||
.resync
|
||||
.as_ref()
|
||||
.ok_or_else(|| AppError::ServiceUnavailable(
|
||||
"crawler daemon is disabled; force resync unavailable".into(),
|
||||
))?;
|
||||
|
||||
let outcome = resync.resync_manga(manga_id).await.map_err(map_resync_err)?;
|
||||
|
||||
// Audit the action with the actor + the resync outcome so an
|
||||
// operator-of-operators can answer "who refetched this manga, and
|
||||
// did the cover land?" from the log alone.
|
||||
repo::admin_audit::insert(
|
||||
&state.db,
|
||||
admin.0.id,
|
||||
"manga_resync",
|
||||
"manga",
|
||||
Some(manga_id),
|
||||
json!({
|
||||
"metadata_status": status_str(outcome.metadata_status),
|
||||
"cover_fetched": outcome.cover_fetched,
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let manga = repo::manga::get_detail(&state.db, manga_id).await?;
|
||||
Ok(Json(MangaResyncResponse {
|
||||
manga,
|
||||
metadata_status: status_str(outcome.metadata_status),
|
||||
cover_fetched: outcome.cover_fetched,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn resync_chapter(
|
||||
State(state): State<AppState>,
|
||||
admin: RequireAdmin,
|
||||
Path(chapter_id): Path<Uuid>,
|
||||
) -> AppResult<Json<ChapterResyncResponse>> {
|
||||
let resync = state
|
||||
.resync
|
||||
.as_ref()
|
||||
.ok_or_else(|| AppError::ServiceUnavailable(
|
||||
"crawler daemon is disabled; force resync unavailable".into(),
|
||||
))?;
|
||||
|
||||
// Look up the manga the chapter belongs to so we can return the
|
||||
// refreshed chapter row in the response and 404 for unknown ids.
|
||||
let manga_id: Option<Uuid> =
|
||||
sqlx::query_scalar("SELECT manga_id FROM chapters WHERE id = $1")
|
||||
.bind(chapter_id)
|
||||
.fetch_optional(&state.db)
|
||||
.await?;
|
||||
let Some(manga_id) = manga_id else {
|
||||
return Err(AppError::NotFound);
|
||||
};
|
||||
|
||||
let outcome = resync
|
||||
.resync_chapter(chapter_id)
|
||||
.await
|
||||
.map_err(map_resync_err)?;
|
||||
|
||||
let (outcome_str, pages) = match &outcome {
|
||||
ChapterResyncOutcome::Fetched { pages, .. } => ("fetched", Some(*pages)),
|
||||
ChapterResyncOutcome::Skipped { .. } => ("skipped", None),
|
||||
};
|
||||
|
||||
repo::admin_audit::insert(
|
||||
&state.db,
|
||||
admin.0.id,
|
||||
"chapter_resync",
|
||||
"chapter",
|
||||
Some(chapter_id),
|
||||
json!({
|
||||
"outcome": outcome_str,
|
||||
"pages": pages,
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let chapter = repo::chapter::find_by_id_in_manga(&state.db, manga_id, chapter_id)
|
||||
.await?
|
||||
.ok_or(AppError::NotFound)?;
|
||||
Ok(Json(ChapterResyncResponse {
|
||||
chapter,
|
||||
outcome: outcome_str,
|
||||
pages,
|
||||
}))
|
||||
}
|
||||
|
||||
fn status_str(s: UpsertStatus) -> &'static str {
|
||||
match s {
|
||||
UpsertStatus::New => "new",
|
||||
UpsertStatus::Updated => "updated",
|
||||
UpsertStatus::Unchanged => "unchanged",
|
||||
}
|
||||
}
|
||||
|
||||
/// Map [`ResyncError`] (and the anyhow envelopes wrapping it) onto the
|
||||
/// right [`AppError`]. Anything else surfaces as a generic 500 via the
|
||||
/// `Other` arm — the operator sees the underlying anyhow chain in
|
||||
/// server logs, the client sees a clean envelope.
|
||||
fn map_resync_err(err: anyhow::Error) -> AppError {
|
||||
if let Some(rerr) = err.downcast_ref::<ResyncError>() {
|
||||
match rerr {
|
||||
ResyncError::NoMangaSource => AppError::ValidationFailed {
|
||||
message: "manga has no live crawler source — cannot resync".into(),
|
||||
details: json!({ "manga": "no_source" }),
|
||||
},
|
||||
ResyncError::NoChapterSource => AppError::ValidationFailed {
|
||||
message: "chapter has no live crawler source — cannot resync".into(),
|
||||
details: json!({ "chapter": "no_source" }),
|
||||
},
|
||||
}
|
||||
} else {
|
||||
AppError::Other(err)
|
||||
}
|
||||
}
|
||||
@@ -24,6 +24,7 @@ use crate::crawler::daemon::{self, ChapterDispatcher, DaemonConfig, MetadataPass
|
||||
use crate::crawler::jobs::JobPayload;
|
||||
use crate::crawler::pipeline::{self, MetadataStats};
|
||||
use crate::crawler::rate_limit::HostRateLimiters;
|
||||
use crate::crawler::resync::{RealResyncService, ResyncService};
|
||||
use crate::crawler::safety::DownloadAllowlist;
|
||||
use crate::crawler::session;
|
||||
use crate::repo;
|
||||
@@ -39,6 +40,12 @@ pub struct AppState {
|
||||
/// One instance per AppState so tests stay isolated across the
|
||||
/// same process.
|
||||
pub auth_limiter: Arc<AuthRateLimiter>,
|
||||
/// Admin-triggered force resync. `None` when the crawler daemon
|
||||
/// is disabled (`CRAWLER_DAEMON=false`); admin handlers gate on
|
||||
/// `.is_some()` and return 503 otherwise. Set by [`build`] from the
|
||||
/// same wiring that builds the daemon's chapter dispatcher, so a
|
||||
/// force resync uses the daemon's BrowserManager + rate limiters.
|
||||
pub resync: Option<Arc<dyn ResyncService>>,
|
||||
}
|
||||
|
||||
/// Bundle returned by [`build`]. The router is what `axum::serve` consumes;
|
||||
@@ -73,11 +80,12 @@ pub async fn build(config: Config) -> anyhow::Result<AppHandle> {
|
||||
|
||||
let storage: Arc<dyn Storage> = Arc::new(LocalStorage::new(config.storage_dir.clone()));
|
||||
|
||||
let daemon = if config.crawler.daemon_enabled {
|
||||
Some(spawn_crawler_daemon(db.clone(), Arc::clone(&storage), &config.crawler).await?)
|
||||
let (daemon, resync) = if config.crawler.daemon_enabled {
|
||||
let spawned = spawn_crawler_daemon(db.clone(), Arc::clone(&storage), &config.crawler).await?;
|
||||
(Some(spawned.handle), Some(spawned.resync))
|
||||
} else {
|
||||
tracing::info!("crawler daemon disabled (CRAWLER_DAEMON=false)");
|
||||
None
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let auth_limiter = Arc::new(AuthRateLimiter::new(config.auth.rate_limit));
|
||||
@@ -87,16 +95,26 @@ pub async fn build(config: Config) -> anyhow::Result<AppHandle> {
|
||||
auth: config.auth.clone(),
|
||||
upload: config.upload.clone(),
|
||||
auth_limiter,
|
||||
resync,
|
||||
};
|
||||
let router = router(state).layer(cors_layer(&config.cors_allowed_origins));
|
||||
Ok(AppHandle { router, daemon })
|
||||
}
|
||||
|
||||
/// Bundle returned by [`spawn_crawler_daemon`]. The handle owns the
|
||||
/// daemon's tasks; `resync` is the operator-trigger service shared with
|
||||
/// `AppState` so admin endpoints can call into the same browser /
|
||||
/// rate-limit machinery.
|
||||
struct SpawnedDaemon {
|
||||
handle: daemon::DaemonHandle,
|
||||
resync: Arc<dyn ResyncService>,
|
||||
}
|
||||
|
||||
async fn spawn_crawler_daemon(
|
||||
db: PgPool,
|
||||
storage: Arc<dyn Storage>,
|
||||
cfg: &CrawlerConfig,
|
||||
) -> anyhow::Result<daemon::DaemonHandle> {
|
||||
) -> anyhow::Result<SpawnedDaemon> {
|
||||
// Reqwest client with cookie jar pre-seeded so CDN image fetches
|
||||
// include PHPSESSID. Same shape as bin/crawler.rs main().
|
||||
let cookie_jar = Arc::new(reqwest::cookie::Jar::default());
|
||||
@@ -198,6 +216,17 @@ async fn spawn_crawler_daemon(
|
||||
});
|
||||
|
||||
let dispatcher: Arc<dyn ChapterDispatcher> = Arc::new(RealChapterDispatcher {
|
||||
browser_manager: Arc::clone(&browser_manager),
|
||||
db: db.clone(),
|
||||
storage: Arc::clone(&storage),
|
||||
http: http.clone(),
|
||||
rate: Arc::clone(&rate),
|
||||
download_allowlist: cfg.download_allowlist.clone(),
|
||||
max_image_bytes: cfg.max_image_bytes,
|
||||
tor: tor.as_ref().map(Arc::clone),
|
||||
});
|
||||
|
||||
let resync: Arc<dyn ResyncService> = Arc::new(RealResyncService {
|
||||
browser_manager: Arc::clone(&browser_manager),
|
||||
db: db.clone(),
|
||||
storage: Arc::clone(&storage),
|
||||
@@ -242,7 +271,10 @@ async fn spawn_crawler_daemon(
|
||||
},
|
||||
);
|
||||
|
||||
Ok(daemon_handle)
|
||||
Ok(SpawnedDaemon {
|
||||
handle: daemon_handle,
|
||||
resync,
|
||||
})
|
||||
}
|
||||
|
||||
// Real impls of the daemon traits, owning the browser manager + I/O. Kept
|
||||
@@ -285,6 +317,36 @@ impl MetadataPass for RealMetadataPass {
|
||||
self.browser_manager.invalidate().await;
|
||||
}
|
||||
}
|
||||
// Cover backfill follows the metadata pass even when the pass
|
||||
// errored — the early-stop walk can complete its work and bail
|
||||
// late, and a transient browser failure shouldn't cancel the
|
||||
// residual cover backlog. The backfill has its own per-call cap
|
||||
// so a runaway error stream can't monopolise the tick.
|
||||
match pipeline::backfill_missing_covers(
|
||||
&self.browser_manager,
|
||||
&self.db,
|
||||
self.storage.as_ref(),
|
||||
&self.http,
|
||||
&self.rate,
|
||||
pipeline::COVER_BACKFILL_DEFAULT_MAX,
|
||||
&self.download_allowlist,
|
||||
self.max_image_bytes,
|
||||
self.tor.as_deref(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(stats) => {
|
||||
if stats.considered > 0 {
|
||||
tracing::info!(?stats, "cover backfill complete");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(error = ?e, "cover backfill failed");
|
||||
if crate::crawler::nav::anyhow_looks_browser_dead(&e) {
|
||||
self.browser_manager.invalidate().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ pub mod jobs;
|
||||
pub mod nav;
|
||||
pub mod pipeline;
|
||||
pub mod rate_limit;
|
||||
pub mod resync;
|
||||
pub mod safety;
|
||||
pub mod session;
|
||||
pub mod source;
|
||||
|
||||
@@ -13,7 +13,7 @@ use crate::crawler::jobs::{self, EnqueueResult, JobPayload};
|
||||
use crate::crawler::rate_limit::HostRateLimiters;
|
||||
use crate::crawler::safety::{fetch_bytes_capped, looks_like_image, DownloadAllowlist};
|
||||
use crate::crawler::source::target::TargetSource;
|
||||
use crate::crawler::source::{FetchContext, Source};
|
||||
use crate::crawler::source::{FetchContext, Source, SourceMangaRef};
|
||||
use crate::repo;
|
||||
use crate::repo::crawler::UpsertStatus;
|
||||
use crate::storage::Storage;
|
||||
@@ -523,12 +523,133 @@ pub struct EnqueueSummary {
|
||||
pub failed: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
pub struct CoverBackfillStats {
|
||||
pub considered: usize,
|
||||
pub fetched: usize,
|
||||
pub failed: usize,
|
||||
}
|
||||
|
||||
/// Default per-tick cap for [`backfill_missing_covers`]. The metadata pass
|
||||
/// already retries covers when its walk reaches the affected manga; this
|
||||
/// backfill exists to catch the residual case where the early-stop
|
||||
/// optimisation prevents the walk from reaching mangas whose cover failed
|
||||
/// on first attempt. A small cap is enough because the backlog only grows
|
||||
/// from sporadic download failures, not from systematic misses.
|
||||
pub const COVER_BACKFILL_DEFAULT_MAX: usize = 10;
|
||||
|
||||
/// Re-attempt cover downloads for mangas where `cover_image_path IS NULL`
|
||||
/// but a live `manga_sources` row exists. Refetches the source detail
|
||||
/// page (which is where the cover URL lives) and downloads the cover.
|
||||
///
|
||||
/// Bounded by `max_mangas` per call so a steady stream of failing covers
|
||||
/// — e.g. a CDN host that's persistently 502 — can't monopolise a cron
|
||||
/// tick. Orders by `manga_sources.last_seen_at DESC` so the freshest
|
||||
/// missing-cover mangas are addressed first.
|
||||
///
|
||||
/// Failures are logged and counted, not raised: a single bad cover URL
|
||||
/// must not stall every other backfill behind it.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn backfill_missing_covers(
|
||||
browser_manager: &BrowserManager,
|
||||
db: &PgPool,
|
||||
storage: &dyn Storage,
|
||||
http: &reqwest::Client,
|
||||
rate: &HostRateLimiters,
|
||||
max_mangas: usize,
|
||||
allowlist: &DownloadAllowlist,
|
||||
max_image_bytes: usize,
|
||||
tor: Option<&crate::crawler::tor::TorController>,
|
||||
) -> anyhow::Result<CoverBackfillStats> {
|
||||
let mut stats = CoverBackfillStats::default();
|
||||
if max_mangas == 0 {
|
||||
return Ok(stats);
|
||||
}
|
||||
|
||||
let entries = repo::crawler::list_missing_covers(db, max_mangas as i64)
|
||||
.await
|
||||
.context("list_missing_covers")?;
|
||||
|
||||
if entries.is_empty() {
|
||||
return Ok(stats);
|
||||
}
|
||||
|
||||
let lease = browser_manager
|
||||
.acquire()
|
||||
.await
|
||||
.context("acquire browser lease for cover backfill")?;
|
||||
let browser_ref: &chromiumoxide::Browser = &lease;
|
||||
let ctx = FetchContext { browser: browser_ref, rate, tor };
|
||||
|
||||
for entry in entries {
|
||||
stats.considered += 1;
|
||||
// Metadata-only TargetSource: skip chapter-list parsing so a
|
||||
// missing-cover refetch doesn't soft-drop chapters on a partial
|
||||
// render. Cover URL alone is what we need.
|
||||
let source = TargetSource::new(entry.source_url.clone()).without_chapter_parsing();
|
||||
let r = SourceMangaRef {
|
||||
source_manga_key: entry.source_manga_key.clone(),
|
||||
title: String::new(),
|
||||
url: entry.source_url.clone(),
|
||||
};
|
||||
let cover_url = match source.fetch_manga(&ctx, &r).await {
|
||||
Ok(manga) => manga.cover_url,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
manga_id = %entry.manga_id,
|
||||
url = %entry.source_url,
|
||||
error = ?e,
|
||||
"cover backfill: fetch_manga failed"
|
||||
);
|
||||
stats.failed += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let Some(cover_url) = cover_url else {
|
||||
tracing::warn!(
|
||||
manga_id = %entry.manga_id,
|
||||
url = %entry.source_url,
|
||||
"cover backfill: source returned no cover_url"
|
||||
);
|
||||
stats.failed += 1;
|
||||
continue;
|
||||
};
|
||||
match download_and_store_cover(
|
||||
db,
|
||||
storage,
|
||||
http,
|
||||
rate,
|
||||
&entry.source_url,
|
||||
entry.manga_id,
|
||||
&cover_url,
|
||||
allowlist,
|
||||
max_image_bytes,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => stats.fetched += 1,
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
manga_id = %entry.manga_id,
|
||||
url = %entry.source_url,
|
||||
error = ?e,
|
||||
"cover backfill: download failed"
|
||||
);
|
||||
stats.failed += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(lease);
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
/// Download a cover image and persist its storage path. Local to the
|
||||
/// pipeline because the CLI still calls it from its inline chapter-content
|
||||
/// loop; once the worker pool fully replaces that path we can fold this
|
||||
/// into `pipeline` proper.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn download_and_store_cover(
|
||||
pub(crate) async fn download_and_store_cover(
|
||||
db: &PgPool,
|
||||
storage: &dyn Storage,
|
||||
http: &reqwest::Client,
|
||||
|
||||
277
backend/src/crawler/resync.rs
Normal file
277
backend/src/crawler/resync.rs
Normal file
@@ -0,0 +1,277 @@
|
||||
//! Admin-triggered resync of a single manga's metadata + cover, or a
|
||||
//! single chapter's content.
|
||||
//!
|
||||
//! The cron tick already retries covers and chapter content on its own
|
||||
//! schedule. This module exists for the operator-controlled path:
|
||||
//! "this manga's metadata is stale / its cover never landed / this
|
||||
//! chapter is broken — pull from source now, not at the next daily
|
||||
//! tick." Wired into the admin API, never into the queue, so the work
|
||||
//! happens synchronously with the HTTP request and the admin sees the
|
||||
//! refreshed row in the response.
|
||||
//!
|
||||
//! Shares the daemon's [`BrowserManager`], rate limiter, HTTP client,
|
||||
//! and TOR controller so a force resync respects the same per-host
|
||||
//! pacing and recircuit budget the daily crawl uses — admin actions
|
||||
//! must not let an operator accidentally hammer the source.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::crawler::browser_manager::BrowserManager;
|
||||
use crate::crawler::content::{self, SyncOutcome};
|
||||
use crate::crawler::pipeline;
|
||||
use crate::crawler::rate_limit::HostRateLimiters;
|
||||
use crate::crawler::safety::DownloadAllowlist;
|
||||
use crate::crawler::source::target::TargetSource;
|
||||
use crate::crawler::source::{FetchContext, Source, SourceMangaRef};
|
||||
use crate::crawler::tor::TorController;
|
||||
use crate::repo;
|
||||
use crate::repo::crawler::UpsertStatus;
|
||||
use crate::storage::Storage;
|
||||
|
||||
/// Outcome of [`ResyncService::resync_manga`]. Mirrors the bits the
|
||||
/// admin UI cares about — was the row actually re-upserted, did the
|
||||
/// cover land — so the response can show "metadata refreshed, cover
|
||||
/// re-downloaded" or "metadata unchanged" without a second round-trip.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct MangaResyncOutcome {
|
||||
pub manga_id: Uuid,
|
||||
pub metadata_status: UpsertStatus,
|
||||
pub cover_fetched: bool,
|
||||
}
|
||||
|
||||
/// Outcome of [`ResyncService::resync_chapter`]. `Fetched(pages)` is the
|
||||
/// success case; `Skipped` means the source row was already gone or the
|
||||
/// chapter had no live source.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ChapterResyncOutcome {
|
||||
Fetched { chapter_id: Uuid, pages: usize },
|
||||
Skipped { chapter_id: Uuid, reason: String },
|
||||
}
|
||||
|
||||
/// Service exposed by the daemon to the admin API. Optional on
|
||||
/// [`AppState`] — `None` when the crawler daemon is disabled
|
||||
/// (`CRAWLER_DAEMON=false`), in which case admin handlers return 503.
|
||||
#[async_trait]
|
||||
pub trait ResyncService: Send + Sync {
|
||||
async fn resync_manga(&self, manga_id: Uuid) -> anyhow::Result<MangaResyncOutcome>;
|
||||
async fn resync_chapter(&self, chapter_id: Uuid) -> anyhow::Result<ChapterResyncOutcome>;
|
||||
}
|
||||
|
||||
/// Errors with a stable shape so the API layer can map them to the
|
||||
/// right HTTP status (404 vs 422 vs 5xx). Anything else surfaces as a
|
||||
/// generic 500.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ResyncError {
|
||||
#[error("manga has no source to resync from")]
|
||||
NoMangaSource,
|
||||
#[error("chapter has no source to resync from")]
|
||||
NoChapterSource,
|
||||
}
|
||||
|
||||
pub struct RealResyncService {
|
||||
pub browser_manager: Arc<BrowserManager>,
|
||||
pub db: PgPool,
|
||||
pub storage: Arc<dyn Storage>,
|
||||
pub http: reqwest::Client,
|
||||
pub rate: Arc<HostRateLimiters>,
|
||||
pub download_allowlist: DownloadAllowlist,
|
||||
pub max_image_bytes: usize,
|
||||
pub tor: Option<Arc<TorController>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ResyncService for RealResyncService {
|
||||
async fn resync_manga(&self, manga_id: Uuid) -> anyhow::Result<MangaResyncOutcome> {
|
||||
// Pick the freshest live source row. Multi-source mangas
|
||||
// (theoretical — only one Source impl today) get the row whose
|
||||
// `last_seen_at` is newest; soft-dropped rows are skipped.
|
||||
let row: Option<(String, String, String)> = sqlx::query_as(
|
||||
"SELECT source_id, source_manga_key, source_url \
|
||||
FROM manga_sources \
|
||||
WHERE manga_id = $1 AND dropped_at IS NULL \
|
||||
ORDER BY last_seen_at DESC \
|
||||
LIMIT 1",
|
||||
)
|
||||
.bind(manga_id)
|
||||
.fetch_optional(&self.db)
|
||||
.await
|
||||
.context("look up manga_sources for resync")?;
|
||||
let Some((_source_id, source_manga_key, source_url)) = row else {
|
||||
return Err(ResyncError::NoMangaSource.into());
|
||||
};
|
||||
|
||||
let lease = self
|
||||
.browser_manager
|
||||
.acquire()
|
||||
.await
|
||||
.context("acquire browser lease for manga resync")?;
|
||||
let browser_ref: &chromiumoxide::Browser = &lease;
|
||||
let ctx = FetchContext {
|
||||
browser: browser_ref,
|
||||
rate: &self.rate,
|
||||
tor: self.tor.as_deref(),
|
||||
};
|
||||
|
||||
// Parse chapters too — a force resync is "make this manga fully
|
||||
// current," not just metadata. The full pipeline handles the
|
||||
// partial-render guard for us; we replicate the same caution
|
||||
// here by skipping the chapter sync when the parser returned
|
||||
// empty but the manga previously had chapters.
|
||||
let source = TargetSource::new(source_url.clone());
|
||||
let r = SourceMangaRef {
|
||||
source_manga_key: source_manga_key.clone(),
|
||||
title: String::new(),
|
||||
url: source_url.clone(),
|
||||
};
|
||||
let manga = source
|
||||
.fetch_manga(&ctx, &r)
|
||||
.await
|
||||
.with_context(|| format!("fetch_manga during resync of {manga_id}"))?;
|
||||
|
||||
// Partial-render guard: same logic as run_metadata_pass.
|
||||
let source_id = source.id();
|
||||
if !manga.chapters.is_empty() || {
|
||||
let prior = repo::crawler::live_chapter_count_for_source_manga(
|
||||
&self.db,
|
||||
source_id,
|
||||
&source_manga_key,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
prior == 0
|
||||
} {
|
||||
// Either the new fetch surfaced chapters, or there were
|
||||
// none before either — chapter sync is safe to run.
|
||||
} else {
|
||||
tracing::warn!(
|
||||
%manga_id,
|
||||
source_url = %source_url,
|
||||
"resync_manga: fetch returned empty chapters but prior count > 0; skipping chapter sync to avoid soft-drop"
|
||||
);
|
||||
}
|
||||
|
||||
let upsert = repo::crawler::upsert_manga_from_source(
|
||||
&self.db,
|
||||
source_id,
|
||||
&source_url,
|
||||
&manga,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("upsert_manga_from_source during resync of {manga_id}"))?;
|
||||
|
||||
// Cover refetch: force-download regardless of UpsertStatus.
|
||||
// Admin clicked "resync" because they want the cover too.
|
||||
let mut cover_fetched = false;
|
||||
if let Some(cover_url) = manga.cover_url.as_deref() {
|
||||
match pipeline::download_and_store_cover(
|
||||
&self.db,
|
||||
self.storage.as_ref(),
|
||||
&self.http,
|
||||
&self.rate,
|
||||
&source_url,
|
||||
upsert.manga_id,
|
||||
cover_url,
|
||||
&self.download_allowlist,
|
||||
self.max_image_bytes,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => cover_fetched = true,
|
||||
Err(e) => tracing::warn!(
|
||||
%manga_id,
|
||||
error = ?e,
|
||||
"resync_manga: cover download failed"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// Chapter sync — only when the partial-render guard above
|
||||
// didn't bail.
|
||||
let prior_chapter_count = repo::crawler::live_chapter_count_for_source_manga(
|
||||
&self.db,
|
||||
source_id,
|
||||
&source_manga_key,
|
||||
)
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
if !manga.chapters.is_empty() || prior_chapter_count == 0 {
|
||||
match repo::crawler::sync_manga_chapters(
|
||||
&self.db,
|
||||
source_id,
|
||||
upsert.manga_id,
|
||||
&manga.chapters,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(diff) => tracing::info!(
|
||||
%manga_id,
|
||||
new = diff.new,
|
||||
refreshed = diff.refreshed,
|
||||
dropped = diff.dropped,
|
||||
"resync_manga: chapters synced"
|
||||
),
|
||||
Err(e) => tracing::warn!(
|
||||
%manga_id,
|
||||
error = ?e,
|
||||
"resync_manga: chapter sync failed"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
drop(lease);
|
||||
Ok(MangaResyncOutcome {
|
||||
manga_id: upsert.manga_id,
|
||||
metadata_status: upsert.status,
|
||||
cover_fetched,
|
||||
})
|
||||
}
|
||||
|
||||
async fn resync_chapter(&self, chapter_id: Uuid) -> anyhow::Result<ChapterResyncOutcome> {
|
||||
let row = repo::chapter::dispatch_target(&self.db, chapter_id)
|
||||
.await
|
||||
.context("look up chapter_sources for resync")?;
|
||||
let Some((manga_id, source_url)) = row else {
|
||||
return Err(ResyncError::NoChapterSource.into());
|
||||
};
|
||||
|
||||
let lease = self
|
||||
.browser_manager
|
||||
.acquire()
|
||||
.await
|
||||
.context("acquire browser lease for chapter resync")?;
|
||||
let result = content::sync_chapter_content(
|
||||
&lease,
|
||||
&self.db,
|
||||
self.storage.as_ref(),
|
||||
&self.http,
|
||||
&self.rate,
|
||||
chapter_id,
|
||||
manga_id,
|
||||
&source_url,
|
||||
true,
|
||||
&self.download_allowlist,
|
||||
self.max_image_bytes,
|
||||
self.tor.as_deref(),
|
||||
)
|
||||
.await;
|
||||
drop(lease);
|
||||
|
||||
match result? {
|
||||
SyncOutcome::Fetched { pages } => {
|
||||
Ok(ChapterResyncOutcome::Fetched { chapter_id, pages })
|
||||
}
|
||||
SyncOutcome::Skipped => Ok(ChapterResyncOutcome::Skipped {
|
||||
chapter_id,
|
||||
reason: "chapter already had pages on disk".to_string(),
|
||||
}),
|
||||
SyncOutcome::SessionExpired => {
|
||||
anyhow::bail!("source session expired — operator must refresh PHPSESSID")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,11 @@ pub enum AppError {
|
||||
PayloadTooLarge(String),
|
||||
#[error("unsupported media type: {0}")]
|
||||
UnsupportedMediaType(String),
|
||||
/// 503 — a feature is currently unavailable, distinct from a 5xx
|
||||
/// internal error. Used when admin actions require the crawler
|
||||
/// daemon but it's been disabled (`CRAWLER_DAEMON=false`).
|
||||
#[error("service unavailable: {0}")]
|
||||
ServiceUnavailable(String),
|
||||
/// 429 with an optional `Retry-After` header value (in seconds).
|
||||
#[error("too many requests")]
|
||||
TooManyRequests {
|
||||
@@ -56,6 +61,7 @@ impl AppError {
|
||||
AppError::Conflict(_) => "conflict",
|
||||
AppError::PayloadTooLarge(_) => "payload_too_large",
|
||||
AppError::UnsupportedMediaType(_) => "unsupported_media_type",
|
||||
AppError::ServiceUnavailable(_) => "service_unavailable",
|
||||
AppError::TooManyRequests { .. } => "too_many_requests",
|
||||
AppError::ValidationFailed { .. } => "validation_failed",
|
||||
AppError::Database(sqlx::Error::RowNotFound) => "not_found",
|
||||
@@ -85,6 +91,9 @@ impl IntoResponse for AppError {
|
||||
AppError::UnsupportedMediaType(msg) => {
|
||||
(StatusCode::UNSUPPORTED_MEDIA_TYPE, msg.clone(), None)
|
||||
}
|
||||
AppError::ServiceUnavailable(msg) => {
|
||||
(StatusCode::SERVICE_UNAVAILABLE, msg.clone(), None)
|
||||
}
|
||||
AppError::TooManyRequests { retry_after_secs } => {
|
||||
// Emit `Retry-After: N` (RFC 6585 §4) so a well-behaved
|
||||
// client can back off correctly. Done by building the
|
||||
|
||||
@@ -542,6 +542,51 @@ pub async fn mark_run_completed(pool: &PgPool, source_id: &str) -> sqlx::Result<
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List mangas whose `cover_image_path IS NULL` but a live
|
||||
/// `manga_sources` row still attaches them to a source. The bounded
|
||||
/// result feeds the cover-backfill pass in [`crate::crawler::pipeline`]:
|
||||
/// each entry is one (manga, freshest source row) pair where a cover
|
||||
/// re-download is in order.
|
||||
///
|
||||
/// Per-manga deduplication uses `DISTINCT ON (m.id)` keyed on the row
|
||||
/// with the newest `last_seen_at`, so a manga that's surfaced by
|
||||
/// multiple sources only produces one row (the freshest). Sort is
|
||||
/// stable for tests.
|
||||
pub async fn list_missing_covers(
|
||||
pool: &PgPool,
|
||||
max: i64,
|
||||
) -> sqlx::Result<Vec<MissingCoverEntry>> {
|
||||
let rows: Vec<(Uuid, String, String)> = sqlx::query_as(
|
||||
r#"
|
||||
SELECT DISTINCT ON (m.id) m.id, ms.source_manga_key, ms.source_url
|
||||
FROM mangas m
|
||||
JOIN manga_sources ms ON ms.manga_id = m.id
|
||||
WHERE m.cover_image_path IS NULL
|
||||
AND ms.dropped_at IS NULL
|
||||
ORDER BY m.id, ms.last_seen_at DESC
|
||||
LIMIT $1
|
||||
"#,
|
||||
)
|
||||
.bind(max)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.map(|(manga_id, source_manga_key, source_url)| MissingCoverEntry {
|
||||
manga_id,
|
||||
source_manga_key,
|
||||
source_url,
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct MissingCoverEntry {
|
||||
pub manga_id: Uuid,
|
||||
pub source_manga_key: String,
|
||||
pub source_url: String,
|
||||
}
|
||||
|
||||
/// Read the recovery flag for `source_id`. A missing row OR an
|
||||
/// unparseable value reads as `true` ("clean") — the former covers the
|
||||
/// first-ever run on a virgin DB (no recovery needed), the latter
|
||||
|
||||
350
backend/tests/api_admin_resync.rs
Normal file
350
backend/tests/api_admin_resync.rs
Normal file
@@ -0,0 +1,350 @@
|
||||
//! Integration tests for the admin force-resync endpoints.
|
||||
//!
|
||||
//! Real resync work requires Chromium, so these tests swap in a stub
|
||||
//! [`ResyncService`] to assert the handler-level contract: routing,
|
||||
//! admin gate, 503 when the daemon is disabled, 404 / 422 mapping for
|
||||
//! missing-resource / no-source cases, and the audit-log side effect.
|
||||
|
||||
mod common;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use axum::http::StatusCode;
|
||||
use serde_json::json;
|
||||
use sqlx::PgPool;
|
||||
use tower::ServiceExt;
|
||||
use uuid::Uuid;
|
||||
|
||||
use mangalord::crawler::resync::{
|
||||
ChapterResyncOutcome, MangaResyncOutcome, ResyncError, ResyncService,
|
||||
};
|
||||
use mangalord::repo;
|
||||
use mangalord::repo::crawler::UpsertStatus;
|
||||
|
||||
/// Stub that records call counts and returns a canned outcome.
|
||||
struct StubResync {
|
||||
manga_calls: AtomicUsize,
|
||||
chapter_calls: AtomicUsize,
|
||||
/// When true, returns NoMangaSource / NoChapterSource.
|
||||
no_source: bool,
|
||||
}
|
||||
|
||||
impl StubResync {
|
||||
fn new() -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
manga_calls: AtomicUsize::new(0),
|
||||
chapter_calls: AtomicUsize::new(0),
|
||||
no_source: false,
|
||||
})
|
||||
}
|
||||
fn no_source() -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
manga_calls: AtomicUsize::new(0),
|
||||
chapter_calls: AtomicUsize::new(0),
|
||||
no_source: true,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ResyncService for StubResync {
|
||||
async fn resync_manga(&self, manga_id: Uuid) -> anyhow::Result<MangaResyncOutcome> {
|
||||
self.manga_calls.fetch_add(1, Ordering::SeqCst);
|
||||
if self.no_source {
|
||||
return Err(ResyncError::NoMangaSource.into());
|
||||
}
|
||||
Ok(MangaResyncOutcome {
|
||||
manga_id,
|
||||
metadata_status: UpsertStatus::Updated,
|
||||
cover_fetched: true,
|
||||
})
|
||||
}
|
||||
async fn resync_chapter(&self, chapter_id: Uuid) -> anyhow::Result<ChapterResyncOutcome> {
|
||||
self.chapter_calls.fetch_add(1, Ordering::SeqCst);
|
||||
if self.no_source {
|
||||
return Err(ResyncError::NoChapterSource.into());
|
||||
}
|
||||
Ok(ChapterResyncOutcome::Fetched {
|
||||
chapter_id,
|
||||
pages: 7,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
async fn promote_admin(pool: &PgPool, username: &str) {
|
||||
let u = repo::user::find_by_username(pool, username)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
repo::user::set_is_admin_unchecked(pool, u.id, true)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn insert_manga(pool: &PgPool, title: &str) -> Uuid {
|
||||
let (id,): (Uuid,) = sqlx::query_as(
|
||||
"INSERT INTO mangas (title, status, alt_titles) VALUES ($1, 'ongoing', ARRAY[]::text[]) RETURNING id",
|
||||
)
|
||||
.bind(title)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
id
|
||||
}
|
||||
|
||||
async fn insert_chapter(pool: &PgPool, manga_id: Uuid, number: i32, pages: i32) -> Uuid {
|
||||
let (id,): (Uuid,) = sqlx::query_as(
|
||||
"INSERT INTO chapters (manga_id, number, title, page_count) VALUES ($1, $2, NULL, $3) RETURNING id",
|
||||
)
|
||||
.bind(manga_id)
|
||||
.bind(number)
|
||||
.bind(pages)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
.unwrap();
|
||||
id
|
||||
}
|
||||
|
||||
// ----- manga resync ---------------------------------------------------------
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn manga_resync_calls_service_and_returns_refreshed_detail(pool: PgPool) {
|
||||
let stub = StubResync::new();
|
||||
let h = common::harness_with_resync(pool.clone(), stub.clone());
|
||||
let (username, cookie) = common::register_user(&h.app).await;
|
||||
promote_admin(&pool, &username).await;
|
||||
let manga_id = insert_manga(&pool, "Hello").await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.oneshot(common::post_json_with_cookie(
|
||||
&format!("/api/v1/admin/mangas/{manga_id}/resync"),
|
||||
json!({}),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = common::body_json(resp).await;
|
||||
// Stub returned Updated + cover_fetched=true.
|
||||
assert_eq!(body["metadata_status"], "updated");
|
||||
assert_eq!(body["cover_fetched"], true);
|
||||
// Response includes the refreshed manga detail.
|
||||
assert_eq!(body["manga"]["id"], manga_id.to_string());
|
||||
assert_eq!(body["manga"]["title"], "Hello");
|
||||
|
||||
assert_eq!(stub.manga_calls.load(Ordering::SeqCst), 1);
|
||||
|
||||
// Audit row written.
|
||||
let (audit_count,): (i64,) =
|
||||
sqlx::query_as("SELECT count(*) FROM admin_audit WHERE action = 'manga_resync' AND target_id = $1")
|
||||
.bind(manga_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(audit_count, 1);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn manga_resync_returns_404_for_unknown_id(pool: PgPool) {
|
||||
let stub = StubResync::new();
|
||||
let h = common::harness_with_resync(pool.clone(), stub.clone());
|
||||
let (username, cookie) = common::register_user(&h.app).await;
|
||||
promote_admin(&pool, &username).await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.oneshot(common::post_json_with_cookie(
|
||||
&format!("/api/v1/admin/mangas/{}/resync", Uuid::new_v4()),
|
||||
json!({}),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
||||
// Service must not have been called when the manga doesn't exist.
|
||||
assert_eq!(stub.manga_calls.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn manga_resync_maps_no_source_to_422(pool: PgPool) {
|
||||
let stub = StubResync::no_source();
|
||||
let h = common::harness_with_resync(pool.clone(), stub);
|
||||
let (username, cookie) = common::register_user(&h.app).await;
|
||||
promote_admin(&pool, &username).await;
|
||||
let manga_id = insert_manga(&pool, "Manual upload, no crawler source").await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.oneshot(common::post_json_with_cookie(
|
||||
&format!("/api/v1/admin/mangas/{manga_id}/resync"),
|
||||
json!({}),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::UNPROCESSABLE_ENTITY);
|
||||
let body = common::body_json(resp).await;
|
||||
assert_eq!(body["error"]["details"]["manga"], "no_source");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn manga_resync_returns_503_when_daemon_disabled(pool: PgPool) {
|
||||
let h = common::harness(pool.clone());
|
||||
let (username, cookie) = common::register_user(&h.app).await;
|
||||
promote_admin(&pool, &username).await;
|
||||
let manga_id = insert_manga(&pool, "Z").await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.oneshot(common::post_json_with_cookie(
|
||||
&format!("/api/v1/admin/mangas/{manga_id}/resync"),
|
||||
json!({}),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
|
||||
let body = common::body_json(resp).await;
|
||||
assert_eq!(body["error"]["code"], "service_unavailable");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn manga_resync_requires_admin(pool: PgPool) {
|
||||
let stub = StubResync::new();
|
||||
let h = common::harness_with_resync(pool.clone(), stub);
|
||||
// Non-admin user.
|
||||
let (_u, cookie) = common::register_user(&h.app).await;
|
||||
let manga_id = insert_manga(&pool, "M").await;
|
||||
let resp = h
|
||||
.app
|
||||
.oneshot(common::post_json_with_cookie(
|
||||
&format!("/api/v1/admin/mangas/{manga_id}/resync"),
|
||||
json!({}),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
|
||||
}
|
||||
|
||||
// ----- chapter resync -------------------------------------------------------
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn chapter_resync_calls_service_and_returns_refreshed_chapter(pool: PgPool) {
|
||||
let stub = StubResync::new();
|
||||
let h = common::harness_with_resync(pool.clone(), stub.clone());
|
||||
let (username, cookie) = common::register_user(&h.app).await;
|
||||
promote_admin(&pool, &username).await;
|
||||
let manga_id = insert_manga(&pool, "M").await;
|
||||
let chapter_id = insert_chapter(&pool, manga_id, 1, 0).await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.oneshot(common::post_json_with_cookie(
|
||||
&format!("/api/v1/admin/chapters/{chapter_id}/resync"),
|
||||
json!({}),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let body = common::body_json(resp).await;
|
||||
assert_eq!(body["outcome"], "fetched");
|
||||
assert_eq!(body["pages"], 7);
|
||||
assert_eq!(body["chapter"]["id"], chapter_id.to_string());
|
||||
assert_eq!(stub.chapter_calls.load(Ordering::SeqCst), 1);
|
||||
|
||||
let (audit_count,): (i64,) = sqlx::query_as(
|
||||
"SELECT count(*) FROM admin_audit WHERE action = 'chapter_resync' AND target_id = $1",
|
||||
)
|
||||
.bind(chapter_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(audit_count, 1);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn chapter_resync_returns_404_for_unknown_id(pool: PgPool) {
|
||||
let stub = StubResync::new();
|
||||
let h = common::harness_with_resync(pool.clone(), stub.clone());
|
||||
let (username, cookie) = common::register_user(&h.app).await;
|
||||
promote_admin(&pool, &username).await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.oneshot(common::post_json_with_cookie(
|
||||
&format!("/api/v1/admin/chapters/{}/resync", Uuid::new_v4()),
|
||||
json!({}),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
||||
assert_eq!(stub.chapter_calls.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn chapter_resync_maps_no_source_to_422(pool: PgPool) {
|
||||
let stub = StubResync::no_source();
|
||||
let h = common::harness_with_resync(pool.clone(), stub);
|
||||
let (username, cookie) = common::register_user(&h.app).await;
|
||||
promote_admin(&pool, &username).await;
|
||||
let manga_id = insert_manga(&pool, "M").await;
|
||||
let chapter_id = insert_chapter(&pool, manga_id, 1, 0).await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.oneshot(common::post_json_with_cookie(
|
||||
&format!("/api/v1/admin/chapters/{chapter_id}/resync"),
|
||||
json!({}),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::UNPROCESSABLE_ENTITY);
|
||||
let body = common::body_json(resp).await;
|
||||
assert_eq!(body["error"]["details"]["chapter"], "no_source");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn chapter_resync_returns_503_when_daemon_disabled(pool: PgPool) {
|
||||
let h = common::harness(pool.clone());
|
||||
let (username, cookie) = common::register_user(&h.app).await;
|
||||
promote_admin(&pool, &username).await;
|
||||
let manga_id = insert_manga(&pool, "M").await;
|
||||
let chapter_id = insert_chapter(&pool, manga_id, 1, 0).await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.oneshot(common::post_json_with_cookie(
|
||||
&format!("/api/v1/admin/chapters/{chapter_id}/resync"),
|
||||
json!({}),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn chapter_resync_requires_admin(pool: PgPool) {
|
||||
let stub = StubResync::new();
|
||||
let h = common::harness_with_resync(pool.clone(), stub);
|
||||
let (_u, cookie) = common::register_user(&h.app).await;
|
||||
let manga_id = insert_manga(&pool, "M").await;
|
||||
let chapter_id = insert_chapter(&pool, manga_id, 1, 0).await;
|
||||
let resp = h
|
||||
.app
|
||||
.oneshot(common::post_json_with_cookie(
|
||||
&format!("/api/v1/admin/chapters/{chapter_id}/resync"),
|
||||
json!({}),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::FORBIDDEN);
|
||||
}
|
||||
@@ -49,6 +49,7 @@ fn admin_test_router(pool: PgPool) -> (Router, TempDir) {
|
||||
auth,
|
||||
upload: UploadConfig::default(),
|
||||
auth_limiter,
|
||||
resync: None,
|
||||
};
|
||||
let app = Router::new()
|
||||
.nest("/api/v1", api::routes())
|
||||
|
||||
@@ -74,6 +74,10 @@ fn harness_with_auth_config(
|
||||
max_file_bytes: 256 * 1024,
|
||||
},
|
||||
auth_limiter,
|
||||
// Default harness has no crawler daemon wired up; admin resync
|
||||
// handlers return 503 in this config. Tests that need a stub
|
||||
// resync service swap it in via `harness_with_resync`.
|
||||
resync: None,
|
||||
};
|
||||
Harness { app: router(state), _storage_dir: storage_dir }
|
||||
}
|
||||
@@ -124,6 +128,37 @@ pub fn harness_with_auth_rate_limit(
|
||||
harness_with_auth_config(pool, storage, storage_dir, auth)
|
||||
}
|
||||
|
||||
/// Like [`harness`] but slots a caller-supplied [`ResyncService`] stub
|
||||
/// into `AppState.resync`. Used by the admin resync tests so the
|
||||
/// endpoint path is exercised without standing up a real Chromium.
|
||||
pub fn harness_with_resync(
|
||||
pool: PgPool,
|
||||
resync: Arc<dyn mangalord::crawler::resync::ResyncService>,
|
||||
) -> Harness {
|
||||
let storage_dir = tempfile::tempdir().expect("tempdir");
|
||||
let storage = Arc::new(LocalStorage::new(storage_dir.path()));
|
||||
let auth = AuthConfig {
|
||||
cookie_secure: false,
|
||||
..AuthConfig::default()
|
||||
};
|
||||
let auth_limiter = Arc::new(AuthRateLimiter::new(auth.rate_limit));
|
||||
let state = AppState {
|
||||
db: pool,
|
||||
storage,
|
||||
auth,
|
||||
upload: UploadConfig {
|
||||
max_request_bytes: 4 * 1024 * 1024,
|
||||
max_file_bytes: 256 * 1024,
|
||||
},
|
||||
auth_limiter,
|
||||
resync: Some(resync),
|
||||
};
|
||||
Harness {
|
||||
app: router(state),
|
||||
_storage_dir: storage_dir,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps a real `Storage` and fails on the N-th `put` call so tests can
|
||||
/// assert that handlers roll their DB writes back when storage errors
|
||||
/// mid-upload. Reads and other operations delegate to `inner`.
|
||||
|
||||
@@ -829,6 +829,107 @@ async fn sync_tags_garbage_collects_orphan_user_attachments(pool: PgPool) {
|
||||
assert_eq!(orphan_rows, 0, "orphan user-attached tag should be reaped");
|
||||
}
|
||||
|
||||
// ---- list_missing_covers ---------------------------------------------------
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn list_missing_covers_only_returns_rows_without_cover(pool: PgPool) {
|
||||
crawler::ensure_source(&pool, "target", "T", "https://x.example")
|
||||
.await
|
||||
.unwrap();
|
||||
let with_cover = sample_manga("with", "With Cover", "h1");
|
||||
let without_cover = sample_manga("without", "No Cover", "h2");
|
||||
let _w = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/with", &with_cover)
|
||||
.await
|
||||
.unwrap();
|
||||
let nc = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/without", &without_cover)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Manually set a cover for `with` only.
|
||||
sqlx::query("UPDATE mangas SET cover_image_path = 'mangas/x/cover.jpg' WHERE id = $1")
|
||||
.bind(_w.manga_id)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let entries = crawler::list_missing_covers(&pool, 50).await.unwrap();
|
||||
assert_eq!(entries.len(), 1, "exactly the manga without a cover");
|
||||
assert_eq!(entries[0].manga_id, nc.manga_id);
|
||||
assert_eq!(entries[0].source_manga_key, "without");
|
||||
assert_eq!(entries[0].source_url, "https://x.example/without");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn list_missing_covers_skips_dropped_source_rows(pool: PgPool) {
|
||||
crawler::ensure_source(&pool, "target", "T", "https://x.example")
|
||||
.await
|
||||
.unwrap();
|
||||
let m = sample_manga("foo", "Foo", "h1");
|
||||
let up = crawler::upsert_manga_from_source(&pool, "target", "https://x.example/foo", &m)
|
||||
.await
|
||||
.unwrap();
|
||||
sqlx::query("UPDATE manga_sources SET dropped_at = NOW() WHERE manga_id = $1")
|
||||
.bind(up.manga_id)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let entries = crawler::list_missing_covers(&pool, 50).await.unwrap();
|
||||
assert!(
|
||||
entries.is_empty(),
|
||||
"dropped-source mangas must not be backfilled — no live source to fetch from"
|
||||
);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn list_missing_covers_respects_limit(pool: PgPool) {
|
||||
crawler::ensure_source(&pool, "target", "T", "https://x.example")
|
||||
.await
|
||||
.unwrap();
|
||||
for i in 0..5 {
|
||||
let key = format!("m{i}");
|
||||
let url = format!("https://x.example/{key}");
|
||||
let m = sample_manga(&key, &format!("M{i}"), &format!("h{i}"));
|
||||
let _ = crawler::upsert_manga_from_source(&pool, "target", &url, &m)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let entries = crawler::list_missing_covers(&pool, 3).await.unwrap();
|
||||
assert_eq!(entries.len(), 3, "limit caps the result set");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn list_missing_covers_deduplicates_per_manga(pool: PgPool) {
|
||||
// A manga surfaced by two sources should produce ONE backfill
|
||||
// entry, not two — otherwise the per-tick cap could be eaten by
|
||||
// duplicates and starve other mangas.
|
||||
crawler::ensure_source(&pool, "src-a", "A", "https://a.example")
|
||||
.await
|
||||
.unwrap();
|
||||
crawler::ensure_source(&pool, "src-b", "B", "https://b.example")
|
||||
.await
|
||||
.unwrap();
|
||||
let m = sample_manga("foo", "Foo", "h1");
|
||||
let up = crawler::upsert_manga_from_source(&pool, "src-a", "https://a.example/foo", &m)
|
||||
.await
|
||||
.unwrap();
|
||||
// Second source attaches to the SAME manga row.
|
||||
sqlx::query(
|
||||
"INSERT INTO manga_sources (source_id, source_manga_key, manga_id, source_url) \
|
||||
VALUES ($1, $2, $3, $4)",
|
||||
)
|
||||
.bind("src-b")
|
||||
.bind("foo-on-b")
|
||||
.bind(up.manga_id)
|
||||
.bind("https://b.example/foo")
|
||||
.execute(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let entries = crawler::list_missing_covers(&pool, 50).await.unwrap();
|
||||
assert_eq!(entries.len(), 1, "DISTINCT ON (m.id) collapses duplicate source rows");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn re_appearing_manga_clears_dropped_at(pool: PgPool) {
|
||||
crawler::ensure_source(&pool, "target", "T", "https://x.example")
|
||||
|
||||
Reference in New Issue
Block a user