feat: incremental crawl mode with seed-completion gate (0.33.0)

Daemon now auto-detects mode per source: Backfill until the first
full walk records `seed_completed:<source>` in `crawler_state`, then
Incremental (newest-first, stops after N consecutive Unchanged
upserts). `CRAWLER_MODE` overrides to a fixed mode; CLI rejects
`auto` since it has no pre-run DB state.

`Source::discover` returns a lazy `DiscoverWalk` so Incremental can
break out mid-walk without prefetching pages. The drop pass and seed
marker are now gated on a true full walk — fixes a latent soft-drop
of the index tail under partial sweeps.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-28 06:41:16 +02:00
parent 51f42b03e9
commit 9b4f3525f6
11 changed files with 761 additions and 162 deletions

2
backend/Cargo.lock generated
View File

@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
[[package]]
name = "mangalord"
version = "0.32.0"
version = "0.33.0"
dependencies = [
"anyhow",
"argon2",

View File

@@ -1,6 +1,6 @@
[package]
name = "mangalord"
version = "0.32.0"
version = "0.33.0"
edition = "2021"
default-run = "mangalord"

View File

@@ -12,7 +12,7 @@ use tokio_util::sync::CancellationToken;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;
use crate::config::{AuthConfig, Config, CrawlerConfig, UploadConfig};
use crate::config::{AuthConfig, Config, CrawlerConfig, CrawlerModePref, UploadConfig};
use crate::crawler::browser_manager::{self, BrowserManager};
use crate::crawler::content::{self, SyncOutcome};
use crate::crawler::daemon::{self, ChapterDispatcher, DaemonConfig, MetadataPass};
@@ -20,6 +20,8 @@ use crate::crawler::jobs::JobPayload;
use crate::crawler::pipeline::{self, MetadataStats};
use crate::crawler::rate_limit::HostRateLimiters;
use crate::crawler::session;
use crate::crawler::source::{target as target_source, DiscoverMode};
use crate::repo;
use crate::storage::{LocalStorage, Storage};
#[derive(Clone)]
@@ -149,6 +151,8 @@ async fn spawn_crawler_daemon(
http: http.clone(),
rate: Arc::clone(&rate),
start_url: url.clone(),
mode_pref: cfg.mode,
incremental_stop_after: cfg.incremental_stop_after,
});
m
});
@@ -210,11 +214,20 @@ struct RealMetadataPass {
http: reqwest::Client,
rate: Arc<HostRateLimiters>,
start_url: String,
mode_pref: CrawlerModePref,
incremental_stop_after: usize,
}
#[async_trait]
impl MetadataPass for RealMetadataPass {
async fn run(&self) -> anyhow::Result<MetadataStats> {
let mode = resolve_mode(
&self.db,
target_source::SOURCE_ID,
self.mode_pref,
self.incremental_stop_after,
)
.await?;
pipeline::run_metadata_pass(
&self.browser_manager,
&self.db,
@@ -224,11 +237,56 @@ impl MetadataPass for RealMetadataPass {
&self.start_url,
0,
false,
mode,
)
.await
}
}
/// Pick the active mode for this tick. `Explicit` short-circuits the
/// DB lookup. `Auto` reads `seed_completed_at`: missing → Backfill
/// (initial seed for this source), present → Incremental with the
/// configured threshold.
///
/// A DB error during the Auto lookup propagates as `Err` rather than
/// silently degrading to Backfill — the daemon's `run_tick` catches
/// the error, logs, and skips the tick. That's safer than running a
/// full re-backfill (including a drop pass against stale-looking rows)
/// when the DB is flaky.
async fn resolve_mode(
db: &PgPool,
source_id: &str,
pref: CrawlerModePref,
incremental_stop_after: usize,
) -> anyhow::Result<DiscoverMode> {
match pref {
CrawlerModePref::Explicit(m) => {
tracing::info!(?m, "crawler mode: explicit (CRAWLER_MODE override)");
Ok(m)
}
CrawlerModePref::Auto => {
let seeded = repo::crawler::seed_completed_at(db, source_id)
.await
.context("seed_completed_at lookup for mode auto-detection")?;
match seeded {
Some(at) => {
tracing::info!(
seed_completed_at = %at.to_rfc3339(),
"crawler mode: auto → incremental (seed previously completed)"
);
Ok(DiscoverMode::Incremental {
stop_after_unchanged: incremental_stop_after,
})
}
None => {
tracing::info!("crawler mode: auto → backfill (no seed marker for source)");
Ok(DiscoverMode::Backfill)
}
}
}
}
}
struct RealChapterDispatcher {
browser_manager: Arc<BrowserManager>,
db: PgPool,

View File

@@ -31,6 +31,7 @@ use mangalord::crawler::content::{self, SyncOutcome};
use mangalord::crawler::pipeline;
use mangalord::crawler::rate_limit::HostRateLimiters;
use mangalord::crawler::session;
use mangalord::crawler::source::DiscoverMode;
use mangalord::storage::{LocalStorage, Storage};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
@@ -62,6 +63,8 @@ async fn main() -> anyhow::Result<()> {
let cdn_rate_ms = env_u64("CRAWLER_CDN_RATE_MS", rate_ms);
let limit = env_u64("CRAWLER_LIMIT", 0) as usize;
let skip_chapters = env_bool("CRAWLER_SKIP_CHAPTERS", false);
let incremental_stop_after = env_u64("CRAWLER_INCREMENTAL_STOP_AFTER", 20).max(1) as usize;
let mode = parse_crawler_mode(incremental_stop_after)?;
let skip_chapter_content = env_bool("CRAWLER_SKIP_CHAPTER_CONTENT", false);
let chapter_workers = env_u64("CRAWLER_CHAPTER_WORKERS", 1).max(1) as usize;
let force_refetch_chapters = env_bool("CRAWLER_FORCE_REFETCH_CHAPTERS", false);
@@ -140,6 +143,7 @@ async fn main() -> anyhow::Result<()> {
user_agent = ?user_agent,
proxy = ?proxy_url,
keep_open,
?mode,
storage_dir = %storage_dir.display(),
"starting crawler"
);
@@ -187,6 +191,7 @@ async fn main() -> anyhow::Result<()> {
skip_chapter_content || !session_ready,
chapter_workers,
force_refetch_chapters,
mode,
)
.await;
@@ -216,6 +221,7 @@ async fn run(
skip_chapter_content: bool,
chapter_workers: usize,
force_refetch_chapters: bool,
mode: DiscoverMode,
) -> anyhow::Result<()> {
let mut rate = HostRateLimiters::new(Duration::from_millis(rate_ms));
if let Some(host) = cdn_host {
@@ -232,6 +238,7 @@ async fn run(
start_url,
limit,
skip_chapters,
mode,
)
.await?;
tracing::info!(?stats, "metadata pass complete");
@@ -390,6 +397,38 @@ fn resolve_start_url() -> anyhow::Result<String> {
})
}
/// Parse the CLI's `CRAWLER_MODE`. Defaults to `backfill` because the
/// binary is operator-driven (manual reseeds, force-refetches) — the
/// auto-detect logic lives in the daemon. `auto` is rejected because
/// the CLI has no DB state to consult before the run.
fn parse_crawler_mode(incremental_stop_after: usize) -> anyhow::Result<DiscoverMode> {
parse_crawler_mode_str(
std::env::var("CRAWLER_MODE").ok().as_deref(),
incremental_stop_after,
)
}
/// Pure variant of [`parse_crawler_mode`] — testable without env-var
/// mutation.
fn parse_crawler_mode_str(
raw: Option<&str>,
incremental_stop_after: usize,
) -> anyhow::Result<DiscoverMode> {
match raw.map(|s| s.trim().to_ascii_lowercase()).as_deref() {
None | Some("") | Some("backfill") => Ok(DiscoverMode::Backfill),
Some("incremental") => Ok(DiscoverMode::Incremental {
stop_after_unchanged: incremental_stop_after,
}),
Some("auto") => Err(anyhow!(
"CRAWLER_MODE=auto isn't supported by the CLI (use backfill or incremental); \
the daemon does auto-detection"
)),
Some(other) => Err(anyhow!(
"CRAWLER_MODE must be one of: backfill, incremental (got {other:?})"
)),
}
}
fn env_u64(name: &str, default: u64) -> u64 {
std::env::var(name)
.ok()
@@ -405,3 +444,55 @@ fn env_bool(name: &str, default: bool) -> bool {
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cli_mode_defaults_to_backfill_when_unset_or_blank() {
let none = parse_crawler_mode_str(None, 20).unwrap();
assert!(matches!(none, DiscoverMode::Backfill));
let blank = parse_crawler_mode_str(Some(""), 20).unwrap();
assert!(matches!(blank, DiscoverMode::Backfill));
}
#[test]
fn cli_mode_recognizes_backfill_and_incremental() {
let backfill = parse_crawler_mode_str(Some("backfill"), 20).unwrap();
assert!(matches!(backfill, DiscoverMode::Backfill));
let incremental = parse_crawler_mode_str(Some("incremental"), 9).unwrap();
assert!(matches!(
incremental,
DiscoverMode::Incremental { stop_after_unchanged: 9 }
));
}
#[test]
fn cli_mode_rejects_auto_explicitly() {
let err = parse_crawler_mode_str(Some("auto"), 20).unwrap_err();
let msg = format!("{err}");
assert!(
msg.contains("daemon"),
"rejection should point operator at the daemon: {msg}"
);
}
#[test]
fn cli_mode_rejects_unknown_value() {
let err = parse_crawler_mode_str(Some("garbage"), 20).unwrap_err();
let msg = format!("{err}");
assert!(msg.contains("backfill"));
assert!(msg.contains("incremental"));
}
#[test]
fn cli_mode_is_case_insensitive_and_trims() {
let mixed = parse_crawler_mode_str(Some(" Incremental "), 4).unwrap();
assert!(matches!(
mixed,
DiscoverMode::Incremental { stop_after_unchanged: 4 }
));
}
}

View File

@@ -5,6 +5,16 @@ use chrono::NaiveTime;
use chrono_tz::Tz;
use crate::crawler::browser::LaunchOptions;
use crate::crawler::source::DiscoverMode;
/// What `CRAWLER_MODE` was set to. `Auto` is the daemon's default —
/// pick Backfill until `seed_completed_at` is written, then flip to
/// Incremental. `Explicit` forces a single mode regardless.
#[derive(Clone, Copy, Debug)]
pub enum CrawlerModePref {
Auto,
Explicit(DiscoverMode),
}
#[derive(Clone, Debug)]
pub struct AuthConfig {
@@ -77,6 +87,12 @@ pub struct CrawlerConfig {
pub user_agent: Option<String>,
pub proxy: Option<String>,
pub browser: LaunchOptions,
/// Mode preference for the metadata pass. Daemon default is `Auto`
/// (Backfill until `seed_completed_at` is written, then Incremental).
pub mode: CrawlerModePref,
/// `stop_after_unchanged` threshold supplied to Incremental in both
/// `Auto` (post-seed) and `Explicit(Incremental)` modes.
pub incremental_stop_after: usize,
}
impl Default for CrawlerConfig {
@@ -97,6 +113,8 @@ impl Default for CrawlerConfig {
user_agent: None,
proxy: None,
browser: LaunchOptions::headless(),
mode: CrawlerModePref::Auto,
incremental_stop_after: 20,
}
}
}
@@ -151,6 +169,9 @@ impl CrawlerConfig {
.parse()
.map_err(|e| anyhow::anyhow!("CRAWLER_TZ must be a valid IANA TZ (got {raw:?}): {e}"))?,
};
let incremental_stop_after =
env_u64("CRAWLER_INCREMENTAL_STOP_AFTER", 20).max(1) as usize;
let mode = parse_mode_env(incremental_stop_after)?;
Ok(Self {
daemon_enabled: env_bool("CRAWLER_DAEMON", true),
daily_at,
@@ -179,10 +200,38 @@ impl CrawlerConfig {
.ok()
.filter(|s| !s.trim().is_empty()),
browser: LaunchOptions::from_env(),
mode,
incremental_stop_after,
})
}
}
/// Parse `CRAWLER_MODE`. Empty/unset → `Auto`. Recognized values are
/// `auto`, `backfill`, and `incremental` (case-insensitive). Anything
/// else is a hard error so a typo can't silently fall through to the
/// default and mask itself.
fn parse_mode_env(incremental_stop_after: usize) -> anyhow::Result<CrawlerModePref> {
parse_mode_str(std::env::var("CRAWLER_MODE").ok().as_deref(), incremental_stop_after)
}
/// Pure variant of [`parse_mode_env`] — testable without env-var
/// mutation. Takes the raw value (or `None` if unset).
pub(crate) fn parse_mode_str(
raw: Option<&str>,
incremental_stop_after: usize,
) -> anyhow::Result<CrawlerModePref> {
match raw.map(|s| s.trim().to_ascii_lowercase()).as_deref() {
None | Some("") | Some("auto") => Ok(CrawlerModePref::Auto),
Some("backfill") => Ok(CrawlerModePref::Explicit(DiscoverMode::Backfill)),
Some("incremental") => Ok(CrawlerModePref::Explicit(DiscoverMode::Incremental {
stop_after_unchanged: incremental_stop_after,
})),
Some(other) => Err(anyhow::anyhow!(
"CRAWLER_MODE must be one of: auto, backfill, incremental (got {other:?})"
)),
}
}
fn env_u64(name: &str, default: u64) -> u64 {
std::env::var(name)
.ok()
@@ -211,3 +260,63 @@ fn env_usize(name: &str, default: usize) -> usize {
.and_then(|s| s.parse().ok())
.unwrap_or(default)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_mode_str_defaults_to_auto_when_unset_or_blank() {
let none = parse_mode_str(None, 20).unwrap();
assert!(matches!(none, CrawlerModePref::Auto));
let blank = parse_mode_str(Some(""), 20).unwrap();
assert!(matches!(blank, CrawlerModePref::Auto));
let whitespace = parse_mode_str(Some(" "), 20).unwrap();
assert!(matches!(whitespace, CrawlerModePref::Auto));
}
#[test]
fn parse_mode_str_recognizes_each_keyword() {
let auto = parse_mode_str(Some("auto"), 20).unwrap();
assert!(matches!(auto, CrawlerModePref::Auto));
let backfill = parse_mode_str(Some("backfill"), 20).unwrap();
assert!(matches!(
backfill,
CrawlerModePref::Explicit(DiscoverMode::Backfill)
));
let incremental = parse_mode_str(Some("incremental"), 7).unwrap();
assert!(matches!(
incremental,
CrawlerModePref::Explicit(DiscoverMode::Incremental {
stop_after_unchanged: 7
})
));
}
#[test]
fn parse_mode_str_is_case_insensitive_and_trims_whitespace() {
let mixed = parse_mode_str(Some(" Incremental "), 5).unwrap();
assert!(matches!(
mixed,
CrawlerModePref::Explicit(DiscoverMode::Incremental {
stop_after_unchanged: 5
})
));
let upper = parse_mode_str(Some("BACKFILL"), 5).unwrap();
assert!(matches!(
upper,
CrawlerModePref::Explicit(DiscoverMode::Backfill)
));
}
#[test]
fn parse_mode_str_hard_errors_on_unknown_value() {
let err = parse_mode_str(Some("backfil"), 20).unwrap_err();
let msg = format!("{err}");
assert!(msg.contains("backfill"), "error should list valid values: {msg}");
assert!(msg.contains("auto"));
assert!(msg.contains("incremental"));
}
}

View File

@@ -23,14 +23,34 @@ pub struct MetadataStats {
pub mangas_failed: usize,
}
/// Decide whether the per-ref loop should stop based on the Incremental
/// streak counter. Pulled out as a pure function so the rule is unit-
/// testable without standing up the walker or DB.
pub(crate) fn should_stop(mode: DiscoverMode, consecutive_unchanged: usize) -> bool {
match mode {
DiscoverMode::Backfill => false,
DiscoverMode::Incremental { stop_after_unchanged } => {
consecutive_unchanged >= stop_after_unchanged
}
}
}
/// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline
/// for the target source. Pure metadata; chapter content is enqueued as
/// separate `SyncChapterContent` jobs by the caller after this returns.
///
/// `limit == 0` means no cap (full backfill). `skip_chapters == true` is
/// the "metadata-only" mode (parser doesn't extract chapters, and
/// `sync_manga_chapters` is skipped — otherwise an empty chapter list
/// would soft-drop existing rows).
/// `limit == 0` means no cap (full sweep up to the source's own bound).
/// `skip_chapters == true` is the "metadata-only" mode (parser doesn't
/// extract chapters, and `sync_manga_chapters` is skipped — otherwise an
/// empty chapter list would soft-drop existing rows).
///
/// `mode` controls the walk:
/// - `Backfill` — oldest-first, no early exit. The only mode that runs
/// the end-of-walk drop pass + writes `seed_completed_at`.
/// - `Incremental { stop_after_unchanged }` — newest-first, breaks out
/// after N consecutive Unchanged upserts. Drop pass is skipped (the
/// tail of the index is never visited, so its `last_seen_at` is
/// stale and using it to soft-drop would be unsafe).
#[allow(clippy::too_many_arguments)]
pub async fn run_metadata_pass(
browser_manager: &BrowserManager,
@@ -41,6 +61,7 @@ pub async fn run_metadata_pass(
start_url: &str,
limit: usize,
skip_chapters: bool,
mode: DiscoverMode,
) -> anyhow::Result<MetadataStats> {
let lease = browser_manager
.acquire()
@@ -74,26 +95,39 @@ pub async fn run_metadata_pass(
let run_started_at = chrono::Utc::now();
let max_refs = (limit > 0).then_some(limit);
tracing::info!(?max_refs, "discovering manga list");
let refs = source
.discover(&ctx, DiscoverMode::Backfill, max_refs)
tracing::info!(?mode, ?max_refs, "starting metadata pass");
let mut walker = source
.discover(&ctx, mode)
.await
.context("discover failed")?;
tracing::info!(count = refs.len(), "discovered manga list");
let mut stats = MetadataStats {
discovered: refs.len(),
..MetadataStats::default()
let mut stats = MetadataStats::default();
let mut consecutive_unchanged: usize = 0;
let mut walked_to_completion = false;
let mut hit_limit = false;
let mut hit_incremental_stop = false;
'outer: loop {
let batch = match walker.next_batch(&ctx).await? {
Some(b) => b,
None => {
walked_to_completion = true;
break;
}
};
for (i, r) in refs.iter().enumerate() {
for r in batch {
if max_refs.map(|m| stats.discovered >= m).unwrap_or(false) {
hit_limit = true;
tracing::info!(cap = ?max_refs, "max_results reached; halting walk");
break 'outer;
}
stats.discovered += 1;
tracing::info!(
idx = i + 1,
total = stats.discovered,
idx = stats.discovered,
key = %r.source_manga_key,
"fetching metadata"
);
let manga = match source.fetch_manga(&ctx, r).await {
let manga = match source.fetch_manga(&ctx, &r).await {
Ok(m) => m,
Err(e) => {
tracing::warn!(
@@ -107,7 +141,9 @@ pub async fn run_metadata_pass(
}
};
let upsert = match repo::crawler::upsert_manga_from_source(db, source_id, &r.url, &manga)
let upsert = match repo::crawler::upsert_manga_from_source(
db, source_id, &r.url, &manga,
)
.await
{
Ok(u) => u,
@@ -181,16 +217,67 @@ pub async fn run_metadata_pass(
),
}
}
// Incremental stop: count consecutive Unchanged upserts and
// bail once the threshold is reached. New/Updated resets the
// streak so a fresh entry mid-batch doesn't accidentally trip
// the cutoff.
match upsert.status {
repo::crawler::UpsertStatus::Unchanged => {
consecutive_unchanged += 1;
}
repo::crawler::UpsertStatus::New | repo::crawler::UpsertStatus::Updated => {
consecutive_unchanged = 0;
}
}
if should_stop(mode, consecutive_unchanged) {
hit_incremental_stop = true;
tracing::info!(
consecutive_unchanged,
"incremental stop threshold reached; halting walk"
);
break 'outer;
}
}
}
if limit == 0 {
// Drop pass: only when the walk truly covered everything the source
// surfaces. `last_seen_at` on un-visited rows is stale, so running
// the drop on a partial walk would soft-drop the tail of the index.
let full_walk = walked_to_completion && !hit_limit && !hit_incremental_stop;
let backfill_complete = full_walk && matches!(mode, DiscoverMode::Backfill);
if full_walk {
match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await {
Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"),
Err(e) => tracing::warn!(error = ?e, "drop-pass failed"),
}
} else {
tracing::info!(limit, "partial sync — skipping drop pass");
tracing::info!(
?mode,
hit_limit,
hit_incremental_stop,
"partial sync — skipping drop pass"
);
}
if backfill_complete {
if let Err(e) = repo::crawler::mark_seed_completed(db, source_id, run_started_at).await {
tracing::warn!(error = ?e, "mark_seed_completed failed");
} else {
tracing::info!(source_id, "seed marked complete");
}
}
tracing::info!(
?mode,
discovered = stats.discovered,
upserted = stats.upserted,
covers_fetched = stats.covers_fetched,
mangas_failed = stats.mangas_failed,
walked_to_completion,
hit_limit,
hit_incremental_stop,
"metadata pass complete"
);
drop(lease);
Ok(stats)
@@ -345,3 +432,36 @@ fn origin_of(url: &str) -> Option<String> {
let host = rest.split('/').next()?;
Some(format!("{scheme}://{host}"))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn backfill_never_stops_regardless_of_streak() {
assert!(!should_stop(DiscoverMode::Backfill, 0));
assert!(!should_stop(DiscoverMode::Backfill, 100));
assert!(!should_stop(DiscoverMode::Backfill, usize::MAX));
}
#[test]
fn incremental_stops_when_streak_meets_threshold() {
let mode = DiscoverMode::Incremental {
stop_after_unchanged: 3,
};
assert!(!should_stop(mode, 0));
assert!(!should_stop(mode, 2));
assert!(should_stop(mode, 3), "stops at exactly the threshold");
assert!(should_stop(mode, 100), "stops at anything past threshold");
}
#[test]
fn incremental_with_zero_threshold_stops_immediately() {
// A nonsensical config (no Unchanged needed to stop) shouldn't
// panic — it just means the very first ref triggers the bail.
let mode = DiscoverMode::Incremental {
stop_after_unchanged: 0,
};
assert!(should_stop(mode, 0));
}
}

View File

@@ -82,21 +82,42 @@ pub struct FetchContext<'a> {
pub rate: &'a crate::crawler::rate_limit::HostRateLimiters,
}
/// Lazy iterator over discovered manga refs. The caller drives the
/// walk one batch at a time, so it can break out as soon as a
/// downstream stop condition is met (e.g. N consecutive Unchanged
/// upserts in Incremental mode) without paying for pages it won't use.
///
/// Batches are typically one source-index page each. Within a batch
/// refs are already in the right per-page order for the active mode
/// (Backfill reverses each page to oldest-first; Incremental leaves
/// the source's natural newest-first ordering).
#[async_trait]
pub trait DiscoverWalk: Send {
/// Return the next batch of refs, or `Ok(None)` when the source has
/// no more pages. The walker is single-use; calling `next_batch`
/// after `None` is allowed and continues to return `None`.
async fn next_batch(
&mut self,
ctx: &FetchContext<'_>,
) -> anyhow::Result<Option<Vec<SourceMangaRef>>>;
}
#[async_trait]
pub trait Source: Send + Sync {
/// Stable identifier — also the row key in the `sources` table.
fn id(&self) -> &'static str;
/// Returns up to `max_results` manga refs in source order. Pass
/// `None` for an uncapped walk (full backfill / incremental sweep).
/// Implementations should stop paginating as soon as the cap is
/// reached so partial runs don't pay for pages they won't use.
/// Begin discovery in `mode`. Returns a walker the caller drives
/// page-by-page via `next_batch`. The initial page-1 probe (used
/// to determine `last_page` and warm the cache for sites that
/// can't be paged without knowing the bound) happens inside this
/// call, so a fresh walker is ready to yield its first batch
/// without further setup.
async fn discover(
&self,
ctx: &FetchContext<'_>,
mode: DiscoverMode,
max_results: Option<usize>,
) -> anyhow::Result<Vec<SourceMangaRef>>;
) -> anyhow::Result<Box<dyn DiscoverWalk + Send>>;
async fn fetch_manga(
&self,

View File

@@ -7,6 +7,7 @@
//! (`td:has(label:contains("Author:"))`) are implemented by walking
//! the parsed tree.
use std::collections::VecDeque;
use std::time::Duration;
use anyhow::Context;
@@ -14,13 +15,18 @@ use async_trait::async_trait;
use sha2::{Digest, Sha256};
use super::{
DiscoverMode, FetchContext, Source, SourceChapter, SourceChapterRef, SourceManga,
SourceMangaRef,
DiscoverMode, DiscoverWalk, FetchContext, Source, SourceChapter, SourceChapterRef,
SourceManga, SourceMangaRef,
};
use crate::crawler::detect::{
has_logo_sentinel, is_broken_page_body, retry_on_transient, PageError,
};
/// `sources.id` value for this Source impl. Exposed as a const so the
/// daemon can look up per-source state (e.g. `seed_completed_at`)
/// before constructing the Source itself.
pub const SOURCE_ID: &str = "target";
/// In-loop retry budget for transient pages encountered during a single
/// `discover` walk. Bounded small because the job system itself retries
/// the whole `Discover` job on failure — these inline retries only need
@@ -60,15 +66,14 @@ impl TargetSource {
#[async_trait]
impl Source for TargetSource {
fn id(&self) -> &'static str {
"target"
SOURCE_ID
}
async fn discover(
&self,
ctx: &FetchContext<'_>,
mode: DiscoverMode,
max_results: Option<usize>,
) -> anyhow::Result<Vec<SourceMangaRef>> {
) -> anyhow::Result<Box<dyn DiscoverWalk + Send>> {
// Always visit page 1 first because that's the only way to
// discover `last_page`. Retry it on transient — a broken first
// page would otherwise abort the whole walk before we've even
@@ -85,15 +90,7 @@ impl Source for TargetSource {
};
let backfill = matches!(mode, DiscoverMode::Backfill);
let order: Vec<i32> = match (last_page, backfill) {
(None, _) => vec![1],
// Backfill = oldest-first: walk pages last → 1, then
// reverse within each page (the listing is update_date
// DESC, so the bottom of the last page is the oldest
// entry the source still surfaces).
(Some(last), true) => (1..=last).rev().collect(),
(Some(last), false) => (1..=last).collect(),
};
let order = build_page_order(last_page, backfill);
tracing::info!(
?mode,
last_page = ?last_page,
@@ -101,40 +98,12 @@ impl Source for TargetSource {
"walking pagination"
);
let mut all = Vec::new();
for page_num in order {
// Page 1 is already cached from the last_page probe — reuse
// it rather than navigating twice. Every other page goes
// through the retry helper so a single broken page mid-walk
// doesn't silently drop its mangas from the result.
let mut page_refs = if page_num == 1 {
let doc = scraper::Html::parse_document(&first_html);
parse_manga_list_from(&doc)?
} else {
retry_on_transient(
|| async {
let url = page_url(&self.base_url, page_num);
let html = navigate(ctx, &url).await?;
let doc = scraper::Html::parse_document(&html);
parse_manga_list_from(&doc)
},
PAGE_TRANSIENT_RETRY_ATTEMPTS,
PAGE_TRANSIENT_RETRY_DELAY,
)
.await?
};
if backfill {
page_refs.reverse();
}
tracing::info!(page_num, count = page_refs.len(), "page walked");
all.extend(page_refs);
if cap_reached(&all, max_results) {
tracing::info!(cap = ?max_results, "max_results reached; halting pagination");
break;
}
}
Ok(truncate_to_cap(all, max_results))
Ok(Box::new(TargetSourceWalker {
base_url: self.base_url.clone(),
backfill,
pages_remaining: order,
first_page_html: Some(first_html),
}))
}
async fn fetch_manga(
@@ -168,15 +137,81 @@ impl Source for TargetSource {
}
}
fn cap_reached<T>(buf: &[T], max: Option<usize>) -> bool {
matches!(max, Some(m) if buf.len() >= m)
/// Build the queue of page numbers `TargetSource::discover` will walk.
/// Backfill is oldest-first: pages `last..=1` (within each page the
/// walker reverses entries, since the source orders by update_date
/// DESC). Incremental is newest-first: pages `1..=last` in natural
/// order. If `last_page` is unknown (source surfaces no pagination)
/// only page 1 is visited.
fn build_page_order(last_page: Option<i32>, backfill: bool) -> VecDeque<i32> {
match (last_page, backfill) {
(None, _) => VecDeque::from([1]),
(Some(last), true) => (1..=last).rev().collect(),
(Some(last), false) => (1..=last).collect(),
}
}
fn truncate_to_cap<T>(mut buf: Vec<T>, max: Option<usize>) -> Vec<T> {
if let Some(m) = max {
buf.truncate(m);
/// Walker returned by [`TargetSource::discover`]. Pops one source-index
/// page per `next_batch` call. Page 1's HTML is cached at construction
/// time (the discover call needed it to read `last_page` anyway) so the
/// batch covering page 1 doesn't re-fetch.
struct TargetSourceWalker {
base_url: String,
backfill: bool,
pages_remaining: VecDeque<i32>,
first_page_html: Option<String>,
}
#[async_trait]
impl DiscoverWalk for TargetSourceWalker {
async fn next_batch(
&mut self,
ctx: &FetchContext<'_>,
) -> anyhow::Result<Option<Vec<SourceMangaRef>>> {
let Some(page_num) = self.pages_remaining.pop_front() else {
return Ok(None);
};
let mut page_refs = if page_num == 1 {
// Reuse the cached page-1 HTML from the initial probe. Take
// it (rather than clone) so a malformed page-order queue
// that re-visits page 1 still falls back to a real fetch.
match self.first_page_html.take() {
Some(html) => {
let doc = scraper::Html::parse_document(&html);
parse_manga_list_from(&doc)?
}
None => {
retry_on_transient(
|| async {
let html = navigate(ctx, self.base_url.as_str()).await?;
let doc = scraper::Html::parse_document(&html);
parse_manga_list_from(&doc)
},
PAGE_TRANSIENT_RETRY_ATTEMPTS,
PAGE_TRANSIENT_RETRY_DELAY,
)
.await?
}
}
} else {
retry_on_transient(
|| async {
let url = page_url(&self.base_url, page_num);
let html = navigate(ctx, &url).await?;
let doc = scraper::Html::parse_document(&html);
parse_manga_list_from(&doc)
},
PAGE_TRANSIENT_RETRY_ATTEMPTS,
PAGE_TRANSIENT_RETRY_DELAY,
)
.await?
};
if self.backfill {
page_refs.reverse();
}
tracing::info!(page_num, count = page_refs.len(), "page walked");
Ok(Some(page_refs))
}
buf
}
/// Single point of rate-limited navigation. Every Source request goes
@@ -922,4 +957,37 @@ mod tests {
let err = parse_manga_detail(html, "x", true).expect_err("expected Transient");
assert!(err.is_transient(), "got non-transient: {err}");
}
#[test]
fn build_page_order_backfill_is_last_to_one() {
// Backfill walks pages oldest-first: queue is [last, last-1, ..., 1]
// so popping from the front yields the last page first.
let order = build_page_order(Some(3), true);
assert_eq!(Vec::from(order), vec![3, 2, 1]);
}
#[test]
fn build_page_order_incremental_is_one_to_last() {
// Incremental walks newest-first in natural source order.
let order = build_page_order(Some(3), false);
assert_eq!(Vec::from(order), vec![1, 2, 3]);
}
#[test]
fn build_page_order_falls_back_to_page_one_only_without_pagination() {
let backfill = build_page_order(None, true);
assert_eq!(Vec::from(backfill), vec![1]);
let incremental = build_page_order(None, false);
assert_eq!(Vec::from(incremental), vec![1]);
}
#[test]
fn build_page_order_single_page_index_yields_one_entry() {
// Sources with exactly one page should not yield duplicates
// regardless of mode.
let backfill = build_page_order(Some(1), true);
assert_eq!(Vec::from(backfill), vec![1]);
let incremental = build_page_order(Some(1), false);
assert_eq!(Vec::from(incremental), vec![1]);
}
}

View File

@@ -412,6 +412,53 @@ pub async fn sync_manga_chapters(
Ok(diff)
}
/// Record that a complete Backfill walk has finished for `source_id`.
/// The presence of this row is what the daemon's mode auto-detection
/// uses to flip from Backfill to Incremental on subsequent ticks.
///
/// Keyed `seed_completed:<source_id>` in `crawler_state`. JSON payload
/// stores the timestamp so we can surface "last fully reseeded at" in
/// future ops tooling without another migration.
pub async fn mark_seed_completed(
pool: &PgPool,
source_id: &str,
at: DateTime<Utc>,
) -> sqlx::Result<()> {
let key = format!("seed_completed:{source_id}");
sqlx::query(
"INSERT INTO crawler_state (key, value, updated_at) \
VALUES ($1, $2, now()) \
ON CONFLICT (key) DO UPDATE \
SET value = EXCLUDED.value, updated_at = now()",
)
.bind(&key)
.bind(serde_json::json!({ "at": at.to_rfc3339() }))
.execute(pool)
.await?;
Ok(())
}
/// Read the timestamp written by [`mark_seed_completed`], if any.
/// `None` means no complete Backfill has ever finished for this
/// source — the daemon should run Backfill on the next tick.
pub async fn seed_completed_at(
pool: &PgPool,
source_id: &str,
) -> sqlx::Result<Option<DateTime<Utc>>> {
let key = format!("seed_completed:{source_id}");
let row: Option<serde_json::Value> =
sqlx::query_scalar("SELECT value FROM crawler_state WHERE key = $1")
.bind(&key)
.fetch_optional(pool)
.await?;
Ok(row.and_then(|v| {
v.get("at")
.and_then(|s| s.as_str())
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
}))
}
pub async fn mark_dropped_mangas(
pool: &PgPool,
source_id: &str,

View File

@@ -0,0 +1,85 @@
//! Integration tests for the incremental-mode coordination state:
//! `mark_seed_completed` / `seed_completed_at` round-trip via the
//! `crawler_state` table.
//!
//! End-to-end pipeline behavior (walker + stop-on-Unchanged) requires
//! a real `chromiumoxide::Browser` to construct a `FetchContext`, so
//! the live integration of that path is covered by
//! `crawler_browser_smoke.rs` instead. The pure stop logic itself is
//! unit-tested in `crawler::pipeline::tests`.
use chrono::Utc;
use mangalord::repo::crawler;
use sqlx::PgPool;
#[sqlx::test(migrations = "./migrations")]
async fn seed_completed_at_none_before_any_run(pool: PgPool) {
crawler::ensure_source(&pool, "target", "T", "https://x.example")
.await
.unwrap();
let res = crawler::seed_completed_at(&pool, "target").await.unwrap();
assert!(res.is_none(), "fresh source has no seed marker");
}
#[sqlx::test(migrations = "./migrations")]
async fn mark_seed_completed_then_read_round_trips_timestamp(pool: PgPool) {
crawler::ensure_source(&pool, "target", "T", "https://x.example")
.await
.unwrap();
let at = Utc::now();
crawler::mark_seed_completed(&pool, "target", at)
.await
.unwrap();
let read = crawler::seed_completed_at(&pool, "target")
.await
.unwrap()
.expect("marker present after mark");
// RFC3339 round-trip is millisecond-precise on chrono::Utc; allow a
// 1ms tolerance to absorb postgres jsonb whitespace canonicalization.
let drift = (read - at).num_milliseconds().abs();
assert!(drift <= 1, "round-trip drift: {drift}ms (at={at}, read={read})");
}
#[sqlx::test(migrations = "./migrations")]
async fn mark_seed_completed_overwrites_previous_value(pool: PgPool) {
crawler::ensure_source(&pool, "target", "T", "https://x.example")
.await
.unwrap();
let first = Utc::now() - chrono::Duration::hours(1);
let second = Utc::now();
crawler::mark_seed_completed(&pool, "target", first)
.await
.unwrap();
crawler::mark_seed_completed(&pool, "target", second)
.await
.unwrap();
let read = crawler::seed_completed_at(&pool, "target")
.await
.unwrap()
.expect("marker present");
let drift = (read - second).num_milliseconds().abs();
assert!(drift <= 1, "should reflect the latest mark, not the first");
}
#[sqlx::test(migrations = "./migrations")]
async fn seed_completed_is_per_source(pool: PgPool) {
// Two sources, only one is marked complete. The other must still
// report None — the key is namespaced by source_id.
crawler::ensure_source(&pool, "target", "T", "https://x.example")
.await
.unwrap();
crawler::ensure_source(&pool, "other", "O", "https://y.example")
.await
.unwrap();
crawler::mark_seed_completed(&pool, "target", Utc::now())
.await
.unwrap();
assert!(crawler::seed_completed_at(&pool, "target")
.await
.unwrap()
.is_some());
assert!(crawler::seed_completed_at(&pool, "other")
.await
.unwrap()
.is_none());
}

View File

@@ -1,6 +1,6 @@
{
"name": "mangalord-frontend",
"version": "0.32.0",
"version": "0.33.0",
"private": true,
"type": "module",
"scripts": {