feat: in-process crawler daemon with cron and worker pool (0.28.0)

The backend now boots an internal crawler daemon that runs a daily
metadata pass (CRAWLER_DAILY_AT in CRAWLER_TZ, advisory-lock guarded
for multi-replica safety) and drains SyncChapterContent jobs from
crawler_jobs through a worker pool. Chromium launches lazily on first
job and is torn down after CRAWLER_IDLE_TIMEOUT_S seconds of inactivity.

Modules:
- crawler::browser_manager — lazy-launch / idle-teardown wrapper
  around browser::Handle, with an on_launch hook that re-injects
  PHPSESSID on every fresh Chromium spawn.
- crawler::pipeline — run_metadata_pass (the shared discover/upsert
  /cover/sync-chapters loop) and the enqueue_bookmarked_pending helper
  used by the cron tick.
- crawler::daemon — cron task + worker pool, behind two trait seams
  (MetadataPass, ChapterDispatcher) so tests can inject stubs without
  standing up Chromium or a live source.

Behavior:
- CRAWLER_DAEMON=false skips daemon spawn entirely (default for tests).
- Catch-up tick fires on startup if the last persisted slot was missed.
- A SyncOutcome::SessionExpired sets a sticky AtomicBool; workers
  idle until operator restart with a refreshed PHPSESSID.
- Worker dispatch wrapped in catch_unwind so a panicking handler
  marks the job failed instead of taking down the worker.
- Migration 0015 adds a small crawler_state k-v table for the
  last_metadata_tick_at watermark.

Dep additions: chrono-tz (IANA TZ parsing).

CLI (bin/crawler) reuses pipeline::run_metadata_pass and now holds
the browser via BrowserManager so the on_launch session injection
flow stays in one place. Inline chapter-content sync semantics are
unchanged — the queue is for the daemon, force-refetches and manual
backfills still bypass it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-25 20:32:02 +02:00
parent 93c7fd63fc
commit 9fe0f26d75
14 changed files with 2162 additions and 309 deletions

View File

@@ -15,6 +15,7 @@
//! caller-provided.
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use chromiumoxide::browser::{Browser, BrowserConfig};
@@ -95,25 +96,49 @@ pub(crate) fn parse_args(s: &str) -> Vec<String> {
/// Owned browser plus the spawned task that drives its CDP event loop.
/// Dropping `Handle` without calling `close` leaks the Chromium process
/// — always call `close().await` in production paths.
///
/// The browser is stored behind an `Arc` so it can be shared across
/// worker tasks (via [`Handle::shared`]) without copying. `Browser::new_page`
/// only needs `&self`, so multiple workers can drive the same browser
/// concurrently as long as the manager keeps the `Arc` alive.
pub struct Handle {
browser: Browser,
browser: Arc<Browser>,
driver: JoinHandle<()>,
}
impl Handle {
/// Borrow the browser. Equivalent to `&*handle.shared()`.
pub fn browser(&self) -> &Browser {
&self.browser
}
pub fn browser_mut(&mut self) -> &mut Browser {
&mut self.browser
/// Clone the shared handle. Workers hold these to call `new_page`
/// concurrently. The browser only exits when the last `Arc<Browser>`
/// is dropped (kill-on-drop), or when `close()` is called on the
/// originating `Handle` while it is the sole holder.
pub fn shared(&self) -> Arc<Browser> {
Arc::clone(&self.browser)
}
/// Closes the browser and awaits the driver task. Safe to call
/// multiple times — subsequent calls are no-ops.
pub async fn close(mut self) -> anyhow::Result<()> {
let _ = self.browser.close().await;
let _ = self.browser.wait().await;
/// Closes the browser and awaits the driver task. If other Arcs to
/// the browser are still alive we fall back to drop-kills-Chromium
/// semantics and just join the driver — this is the rare case where
/// shutdown raced an outstanding worker; the OS-level kill is the
/// safety net.
pub async fn close(self) -> anyhow::Result<()> {
match Arc::try_unwrap(self.browser) {
Ok(mut owned) => {
let _ = owned.close().await;
let _ = owned.wait().await;
}
Err(shared) => {
tracing::warn!(
strong_count = Arc::strong_count(&shared),
"Handle::close while Arc<Browser> still shared — relying on kill-on-drop"
);
drop(shared);
}
}
let _ = self.driver.await;
Ok(())
}
@@ -184,7 +209,10 @@ pub async fn launch(options: LaunchOptions) -> anyhow::Result<Handle> {
}
});
Ok(Handle { browser, driver })
Ok(Handle {
browser: Arc::new(browser),
driver,
})
}
fn cache_dir() -> anyhow::Result<PathBuf> {

View File

@@ -0,0 +1,262 @@
//! Lazy-launch / idle-teardown Chromium manager for the daemon.
//!
//! The first worker that calls [`BrowserManager::acquire`] triggers a real
//! Chromium launch (and the `on_launch` hook — used to re-inject the
//! PHPSESSID cookie on every fresh process). Each acquire bumps an active
//! counter; the returned [`BrowserLease`] decrements it on drop.
//!
//! When the active counter hits zero, a background reaper task waits
//! `idle_timeout`. If still zero on wake, it closes Chromium and clears the
//! cached handle. The next acquire re-launches.
//!
//! `idle_timeout = Duration::ZERO` disables the reaper — Chromium stays alive
//! until [`BrowserManager::shutdown`].
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use chromiumoxide::browser::Browser;
use futures_util::future::BoxFuture;
use tokio::sync::{Mutex, Notify};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::crawler::browser::{self, LaunchOptions};
/// Hook invoked on every fresh launch with the new browser. Typically used
/// to re-inject PHPSESSID + run the session probe. Errors abort the
/// `acquire` that triggered the launch — the next acquire will re-launch.
pub type OnLaunch =
Arc<dyn Fn(Arc<Browser>) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync>;
/// Returns an `OnLaunch` that does nothing — useful when no session is
/// configured (e.g. CLI metadata-only runs).
pub fn noop_on_launch() -> OnLaunch {
Arc::new(|_| Box::pin(async { Ok(()) }))
}
/// Decoupled active-lease tracker. Owns the atomic counter and the idle
/// notifier so the wiring is unit-testable without standing up a real
/// `BrowserManager` (which would require launching Chromium).
#[derive(Default)]
pub(crate) struct ActiveTracker {
counter: AtomicUsize,
idle_signal: Notify,
}
impl ActiveTracker {
pub(crate) fn new() -> Arc<Self> {
Arc::new(Self::default())
}
pub(crate) fn acquire(self: &Arc<Self>) {
self.counter.fetch_add(1, Ordering::AcqRel);
}
pub(crate) fn release(self: &Arc<Self>) {
if self.counter.fetch_sub(1, Ordering::AcqRel) == 1 {
self.idle_signal.notify_one();
}
}
pub(crate) fn current(&self) -> usize {
self.counter.load(Ordering::Acquire)
}
pub(crate) fn idle_signal(&self) -> &Notify {
&self.idle_signal
}
}
pub struct BrowserManager {
inner: Mutex<Inner>,
active: Arc<ActiveTracker>,
launch_opts: LaunchOptions,
idle_timeout: Duration,
on_launch: OnLaunch,
}
struct Inner {
handle: Option<browser::Handle>,
shared: Option<Arc<Browser>>,
}
impl BrowserManager {
pub fn new(
launch_opts: LaunchOptions,
idle_timeout: Duration,
on_launch: OnLaunch,
) -> Arc<Self> {
Arc::new(Self {
inner: Mutex::new(Inner {
handle: None,
shared: None,
}),
active: ActiveTracker::new(),
launch_opts,
idle_timeout,
on_launch,
})
}
/// Acquire a shared browser lease. The first acquire after a teardown
/// launches a fresh Chromium (and runs `on_launch`); subsequent acquires
/// while a process is alive just bump the counter and clone the `Arc`.
pub async fn acquire(&self) -> anyhow::Result<BrowserLease> {
let mut guard = self.inner.lock().await;
if guard.handle.is_none() {
let handle = browser::launch(self.launch_opts.clone())
.await
.context("BrowserManager: launch chromium")?;
let shared = handle.shared();
// Run the on-launch hook before publishing the handle so a session
// probe failure doesn't leave a half-initialized browser behind.
if let Err(e) = (self.on_launch)(Arc::clone(&shared)).await {
// Close the just-launched browser since we won't be using it.
let _ = handle.close().await;
return Err(e.context("BrowserManager: on_launch hook failed"));
}
guard.handle = Some(handle);
guard.shared = Some(shared);
}
let browser = guard
.shared
.as_ref()
.expect("shared set above")
.clone();
self.active.acquire();
Ok(BrowserLease {
browser,
active: Arc::clone(&self.active),
})
}
/// Forcefully close the cached browser regardless of active count.
/// Used on daemon shutdown. After this returns the next acquire will
/// re-launch from scratch.
pub async fn shutdown(&self) {
let mut guard = self.inner.lock().await;
guard.shared = None;
if let Some(handle) = guard.handle.take() {
let _ = handle.close().await;
}
}
fn idle_timeout(&self) -> Duration {
self.idle_timeout
}
fn active(&self) -> Arc<ActiveTracker> {
Arc::clone(&self.active)
}
}
/// Background reaper. Returns immediately when `idle_timeout == 0`.
/// Otherwise spawns a task that:
/// 1. Waits on `idle_signal` (woken when active hits zero).
/// 2. Sleeps `idle_timeout`.
/// 3. Re-checks the counter under the mutex — if still zero, takes the
/// handle and closes it.
///
/// Repeats forever until `cancel` fires.
pub fn spawn_idle_reaper(mgr: Arc<BrowserManager>, cancel: CancellationToken) -> JoinHandle<()> {
tokio::spawn(async move {
if mgr.idle_timeout().is_zero() {
// Block until cancellation, then exit.
cancel.cancelled().await;
return;
}
let active = mgr.active();
loop {
tokio::select! {
_ = cancel.cancelled() => return,
_ = active.idle_signal().notified() => {}
}
if active.current() > 0 {
continue;
}
tokio::select! {
_ = cancel.cancelled() => return,
_ = tokio::time::sleep(mgr.idle_timeout()) => {}
}
let mut guard = mgr.inner.lock().await;
if active.current() > 0 {
// A worker grabbed a lease during the sleep — abort teardown.
continue;
}
let handle = guard.handle.take();
guard.shared = None;
drop(guard);
if let Some(h) = handle {
let _ = h.close().await;
tracing::info!("BrowserManager: idle teardown — Chromium closed");
}
}
})
}
/// A worker-side handle that keeps the browser alive while in scope.
/// `Deref<Target = Browser>` so callers can pass `&*lease` to APIs that
/// expect `&Browser`.
pub struct BrowserLease {
browser: Arc<Browser>,
active: Arc<ActiveTracker>,
}
impl Deref for BrowserLease {
type Target = Browser;
fn deref(&self) -> &Browser {
&self.browser
}
}
impl Drop for BrowserLease {
fn drop(&mut self) {
self.active.release();
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::AtomicBool;
#[test]
fn noop_on_launch_is_send_sync() {
fn assert_send_sync<T: Send + Sync>(_: &T) {}
let h = noop_on_launch();
assert_send_sync(&h);
}
#[tokio::test]
async fn active_tracker_signals_idle_only_on_zero_transition() {
let tracker = ActiveTracker::new();
let signaled = Arc::new(AtomicBool::new(false));
{
let s = Arc::clone(&signaled);
let t = Arc::clone(&tracker);
tokio::spawn(async move {
t.idle_signal().notified().await;
s.store(true, Ordering::Release);
});
}
tracker.acquire();
tracker.acquire();
assert_eq!(tracker.current(), 2);
tracker.release();
assert_eq!(tracker.current(), 1);
tokio::time::sleep(Duration::from_millis(20)).await;
assert!(!signaled.load(Ordering::Acquire), "no idle signal at count 1");
tracker.release();
tokio::time::sleep(Duration::from_millis(20)).await;
assert_eq!(tracker.current(), 0);
assert!(
signaled.load(Ordering::Acquire),
"idle signal fires on 1 -> 0 transition"
);
}
}

View File

@@ -0,0 +1,633 @@
//! In-process crawler daemon.
//!
//! Owns a cron task that fires a daily metadata pass and N worker tasks
//! that drain `SyncChapterContent` jobs from `crawler_jobs`. The dispatch
//! seams ([`MetadataPass`], [`ChapterDispatcher`]) are traits so tests can
//! inject stubs without standing up a real Chromium / `Source` impl.
//!
//! ## Cron
//!
//! Each tick:
//! 1. Acquire a Postgres advisory lock on a dedicated pool connection
//! (multi-replica safety). Skip the tick on contention.
//! 2. Call [`MetadataPass::run`] (typically `pipeline::run_metadata_pass`).
//! 3. Enqueue `SyncChapterContent` jobs for any bookmarked manga whose
//! chapters still have `page_count = 0`.
//! 4. Reap `done` jobs older than `retention_days`.
//! 5. Persist `last_metadata_tick_at` and release the lock.
//!
//! If the last persisted tick is older than the most recent scheduled slot
//! (e.g. backend was down at midnight), the daemon fires immediately on
//! startup before resuming the regular schedule.
//!
//! ## Workers
//!
//! Each worker leases one chapter-content job at a time, dispatches via the
//! [`ChapterDispatcher`], and acks `done` / `failed` / re-`pending` based on
//! the outcome. A `SessionExpired` outcome flips the sticky
//! `session_expired` flag — all workers idle while it's set (until operator
//! restart with a refreshed PHPSESSID).
//!
//! Worker dispatch is wrapped in `catch_unwind` so a panicking handler
//! marks the job failed instead of taking down the worker task.
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use chrono::{DateTime, Datelike, NaiveTime, TimeZone, Timelike, Utc};
use chrono_tz::Tz;
use futures_util::FutureExt;
use serde_json::json;
use sqlx::PgPool;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use crate::crawler::content::SyncOutcome;
use crate::crawler::jobs::{self, JobPayload, Lease, KIND_SYNC_CHAPTER_CONTENT};
use crate::crawler::pipeline;
/// Fixed `pg_try_advisory_lock` key. ASCII "MANGALRD" interpreted as a
/// big-endian i64. Hardcoded so every replica agrees on the lock identity
/// without consulting config.
pub const CRON_LOCK_KEY: i64 = 0x4D414E47414C5244;
const STATE_KEY_LAST_TICK: &str = "last_metadata_tick_at";
#[async_trait]
pub trait MetadataPass: Send + Sync {
async fn run(&self) -> anyhow::Result<pipeline::MetadataStats>;
}
#[async_trait]
pub trait ChapterDispatcher: Send + Sync {
async fn dispatch(&self, payload: JobPayload) -> anyhow::Result<SyncOutcome>;
}
/// Configuration for [`spawn`]. Use `None` for `metadata_pass` to disable
/// the cron entirely (worker-pool-only mode — useful when only the
/// bookmark-triggered enqueue path is wanted).
pub struct DaemonConfig {
pub metadata_pass: Option<Arc<dyn MetadataPass>>,
pub dispatcher: Arc<dyn ChapterDispatcher>,
pub chapter_workers: usize,
pub daily_at: NaiveTime,
pub tz: Tz,
pub retention_days: u32,
pub session_expired: Arc<AtomicBool>,
/// Tasks that should run alongside the cron + workers and be cancelled
/// on shutdown. Used to hand the daemon ownership of the browser
/// manager's idle reaper.
pub extra_tasks: Vec<tokio::task::JoinHandle<()>>,
}
pub struct DaemonHandle {
cancel: CancellationToken,
join: JoinSet<()>,
extra: Vec<tokio::task::JoinHandle<()>>,
}
impl DaemonHandle {
/// Trigger shutdown and await all worker / cron / extra tasks.
pub async fn shutdown(mut self) {
self.cancel.cancel();
while self.join.join_next().await.is_some() {}
for task in self.extra.drain(..) {
let _ = task.await;
}
}
/// Cancellation token that drives shutdown — exposed so callers
/// (`app::spawn_crawler_daemon`) can hand the same token to auxiliary
/// tasks (e.g. the BrowserManager idle reaper) and have them stop on
/// the daemon's signal.
pub fn cancel_token(&self) -> CancellationToken {
self.cancel.clone()
}
}
/// Spawn the daemon. Returns immediately; tasks run in the background.
/// Pass an external [`CancellationToken`] so auxiliary tasks (e.g. a
/// BrowserManager idle reaper) can share the same shutdown signal —
/// typically created in the caller, cloned into both spawns.
pub fn spawn(pool: PgPool, cancel: CancellationToken, cfg: DaemonConfig) -> DaemonHandle {
let mut join = JoinSet::new();
let DaemonConfig {
metadata_pass,
dispatcher,
chapter_workers,
daily_at,
tz,
retention_days,
session_expired,
extra_tasks,
} = cfg;
if let Some(metadata) = metadata_pass {
let ctx = CronContext {
pool: pool.clone(),
cancel: cancel.clone(),
daily_at,
tz,
retention_days,
metadata,
};
join.spawn(async move { ctx.run().await });
} else {
tracing::info!("crawler daemon: no metadata_pass — cron disabled");
}
for worker_id in 0..chapter_workers.max(1) {
let ctx = WorkerContext {
pool: pool.clone(),
cancel: cancel.clone(),
dispatcher: Arc::clone(&dispatcher),
session_expired: Arc::clone(&session_expired),
id: worker_id,
};
join.spawn(async move { ctx.run().await });
}
DaemonHandle {
cancel,
join,
extra: extra_tasks,
}
}
// ---------------------------------------------------------------------------
// Cron
// ---------------------------------------------------------------------------
struct CronContext {
pool: PgPool,
cancel: CancellationToken,
daily_at: NaiveTime,
tz: Tz,
retention_days: u32,
metadata: Arc<dyn MetadataPass>,
}
impl CronContext {
async fn run(self) {
// On startup, fire immediately if the most recent slot has already
// passed and we never recorded a tick for it.
let now = Utc::now();
let mut catchup = match read_last_tick(&self.pool).await {
Ok(Some(last)) => previous_fire(now, self.daily_at, self.tz) > last,
Ok(None) => true,
Err(e) => {
tracing::warn!(?e, "cron: read_last_tick failed; assuming no catch-up");
false
}
};
loop {
if catchup {
tracing::info!("cron: catch-up tick (missed scheduled slot)");
self.run_tick().await;
catchup = false;
continue;
}
// Recompute next-fire from now() each iteration so clock jumps
// (NTP step, suspend/resume) don't strand us on a stale instant.
let next = next_fire(Utc::now(), self.daily_at, self.tz);
let wait = (next - Utc::now()).to_std().unwrap_or(Duration::ZERO);
tracing::info!(
next_fire_utc = %next.to_rfc3339(),
wait_seconds = wait.as_secs(),
"cron: sleeping until next slot"
);
tokio::select! {
_ = tokio::time::sleep(wait) => {}
_ = self.cancel.cancelled() => {
tracing::info!("cron: shutdown");
return;
}
}
self.run_tick().await;
}
}
async fn run_tick(&self) {
let mut conn = match self.pool.acquire().await {
Ok(c) => c,
Err(e) => {
tracing::error!(?e, "cron: acquire conn failed; skipping tick");
return;
}
};
// pg_try_advisory_lock is session-scoped — we must hold the same
// connection for the unlock or the call silently no-ops on a
// different connection from the pool.
let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_lock($1)")
.bind(CRON_LOCK_KEY)
.fetch_one(&mut *conn)
.await
.unwrap_or(false);
if !acquired {
tracing::info!("cron: tick skipped — another replica holds the lock");
return;
}
match self.metadata.run().await {
Ok(stats) => tracing::info!(?stats, "cron: metadata pass done"),
Err(e) => tracing::error!(?e, "cron: metadata pass failed"),
}
match pipeline::enqueue_bookmarked_pending(&self.pool).await {
Ok(summary) => tracing::info!(?summary, "cron: enqueued bookmarked-pending"),
Err(e) => tracing::error!(?e, "cron: enqueue_bookmarked_pending failed"),
}
match jobs::reap_done(&self.pool, self.retention_days).await {
Ok(n) => tracing::info!(reaped = n, "cron: done-job reaper finished"),
Err(e) => tracing::error!(?e, "cron: done-job reaper failed"),
}
if let Err(e) = write_last_tick(&self.pool, Utc::now()).await {
tracing::warn!(?e, "cron: persist last_metadata_tick_at failed");
}
let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
.bind(CRON_LOCK_KEY)
.execute(&mut *conn)
.await;
drop(conn);
}
}
// ---------------------------------------------------------------------------
// Workers
// ---------------------------------------------------------------------------
struct WorkerContext {
pool: PgPool,
cancel: CancellationToken,
dispatcher: Arc<dyn ChapterDispatcher>,
session_expired: Arc<AtomicBool>,
id: usize,
}
impl WorkerContext {
async fn run(self) {
loop {
if self.cancel.is_cancelled() {
tracing::info!(worker = self.id, "worker: shutdown");
return;
}
if self.session_expired.load(Ordering::Acquire) {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(30)) => continue,
_ = self.cancel.cancelled() => return,
}
}
let leases = match jobs::lease(
&self.pool,
Some(KIND_SYNC_CHAPTER_CONTENT),
1,
Duration::from_secs(60),
)
.await
{
Ok(v) => v,
Err(e) => {
tracing::warn!(worker = self.id, ?e, "worker: lease failed");
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(5)) => continue,
_ = self.cancel.cancelled() => return,
}
}
};
let Some(lease) = leases.into_iter().next() else {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => continue,
_ = self.cancel.cancelled() => return,
}
};
self.process_lease(lease).await;
}
}
async fn process_lease(&self, lease: Lease) {
// Consumer-side dedup safety net: if the chapter already has pages
// (because a force-refetch race or a job that was re-enqueued
// after a previous one finished), ack done without re-fetching.
if let JobPayload::SyncChapterContent { chapter_id, .. } = &lease.payload {
let page_count: Option<i32> = sqlx::query_scalar(
"SELECT page_count FROM chapters WHERE id = $1",
)
.bind(chapter_id)
.fetch_optional(&self.pool)
.await
.ok()
.flatten();
if matches!(page_count, Some(n) if n > 0) {
let _ = jobs::ack_done(&self.pool, lease.id).await;
return;
}
}
let outcome = AssertUnwindSafe(self.dispatcher.dispatch(lease.payload.clone()))
.catch_unwind()
.await;
match outcome {
Ok(Ok(SyncOutcome::Fetched { .. } | SyncOutcome::Skipped)) => {
let _ = jobs::ack_done(&self.pool, lease.id).await;
}
Ok(Ok(SyncOutcome::SessionExpired)) => {
tracing::error!(
worker = self.id,
lease_id = %lease.id,
"session expired — workers will idle until restart"
);
self.session_expired.store(true, Ordering::Release);
let _ = jobs::release(&self.pool, lease.id).await;
}
Ok(Err(e)) => {
tracing::warn!(
worker = self.id,
lease_id = %lease.id,
error = ?e,
"worker: dispatch error — ack failed"
);
let _ = jobs::ack_failed(
&self.pool,
lease.id,
&format!("{e:#}"),
lease.attempts,
lease.max_attempts,
)
.await;
}
Err(_panic) => {
tracing::error!(
worker = self.id,
lease_id = %lease.id,
"worker: dispatcher panicked — ack failed"
);
let _ = jobs::ack_failed(
&self.pool,
lease.id,
"worker panicked",
lease.attempts,
lease.max_attempts,
)
.await;
}
}
}
}
// ---------------------------------------------------------------------------
// Cron timing primitives
// ---------------------------------------------------------------------------
/// Compute the next UTC instant when `daily_at` (interpreted in `tz`) will
/// fire, strictly after `now`. Handles DST gaps (spring-forward) by
/// advancing past the gap; on DST overlap (fall-back) picks the later
/// instant so the job runs once, not twice.
pub fn next_fire(now: DateTime<Utc>, daily_at: NaiveTime, tz: Tz) -> DateTime<Utc> {
let now_local = now.with_timezone(&tz);
// Start with today's slot in the local TZ.
let mut candidate = local_at(now_local.date_naive(), daily_at, tz);
// If today's slot is in the past (or now), roll forward day-by-day.
while candidate <= now {
let next_day = candidate
.with_timezone(&tz)
.date_naive()
.succ_opt()
.unwrap_or_else(|| {
// Defensive: succ_opt only fails at chrono's max date.
chrono::NaiveDate::from_ymd_opt(
candidate.year(),
candidate.month(),
candidate.day(),
)
.expect("valid date")
});
candidate = local_at(next_day, daily_at, tz);
}
candidate
}
/// The most recent fire instant at or before `now`. Used to detect missed
/// slots after a restart.
pub fn previous_fire(now: DateTime<Utc>, daily_at: NaiveTime, tz: Tz) -> DateTime<Utc> {
let now_local = now.with_timezone(&tz);
let today = local_at(now_local.date_naive(), daily_at, tz);
if today <= now {
return today;
}
let yesterday = now_local
.date_naive()
.pred_opt()
.expect("a day before now");
local_at(yesterday, daily_at, tz)
}
/// Resolve a local date+time to a UTC instant in `tz`, navigating DST
/// edges deterministically:
/// - `LocalResult::Single` → that instant.
/// - `LocalResult::Ambiguous(_, latest)` → the later instant (fall-back
/// hour). Picking latest means a daily job fires once across the
/// repeated hour, not twice.
/// - `LocalResult::None` → spring-forward gap. Advance the local time
/// by 1 minute and try again, repeating up to 120 times (so the worst
/// case is still well inside an hour-long gap).
fn local_at(date: chrono::NaiveDate, time: NaiveTime, tz: Tz) -> DateTime<Utc> {
use chrono::LocalResult;
for offset_minutes in 0..120 {
let mut t = time;
if offset_minutes > 0 {
let added = chrono::NaiveTime::from_num_seconds_from_midnight_opt(
((time.num_seconds_from_midnight() as i64 + offset_minutes * 60) % 86_400) as u32,
0,
)
.unwrap_or(time);
t = added;
}
let naive = date.and_time(t);
match tz.from_local_datetime(&naive) {
LocalResult::Single(dt) => return dt.with_timezone(&Utc),
LocalResult::Ambiguous(_, latest) => return latest.with_timezone(&Utc),
LocalResult::None => continue,
}
}
// Should be unreachable — DST gaps are always less than an hour.
Utc.from_utc_datetime(&date.and_time(time))
}
// ---------------------------------------------------------------------------
// crawler_state I/O
// ---------------------------------------------------------------------------
async fn read_last_tick(pool: &PgPool) -> sqlx::Result<Option<DateTime<Utc>>> {
let row: Option<serde_json::Value> = sqlx::query_scalar(
"SELECT value FROM crawler_state WHERE key = $1",
)
.bind(STATE_KEY_LAST_TICK)
.fetch_optional(pool)
.await?;
Ok(row.and_then(|v| {
v.get("at")
.and_then(|s| s.as_str())
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
}))
}
async fn write_last_tick(pool: &PgPool, at: DateTime<Utc>) -> sqlx::Result<()> {
sqlx::query(
"INSERT INTO crawler_state (key, value, updated_at) \
VALUES ($1, $2, now()) \
ON CONFLICT (key) DO UPDATE \
SET value = EXCLUDED.value, updated_at = now()",
)
.bind(STATE_KEY_LAST_TICK)
.bind(json!({ "at": at.to_rfc3339() }))
.execute(pool)
.await?;
Ok(())
}
// ---------------------------------------------------------------------------
// Test helpers (not gated on cfg(test) — integration tests in tests/ dir
// need them too).
// ---------------------------------------------------------------------------
pub mod test_support {
//! Lightweight stubs the daemon tests use. Public because integration
//! tests live outside this module.
use super::*;
use std::sync::atomic::AtomicUsize;
pub struct CountingMetadataPass {
pub count: AtomicUsize,
}
impl Default for CountingMetadataPass {
fn default() -> Self {
Self {
count: AtomicUsize::new(0),
}
}
}
#[async_trait]
impl MetadataPass for CountingMetadataPass {
async fn run(&self) -> anyhow::Result<pipeline::MetadataStats> {
self.count.fetch_add(1, Ordering::AcqRel);
Ok(pipeline::MetadataStats::default())
}
}
pub type DispatchFn = Arc<
dyn Fn(JobPayload) -> futures_util::future::BoxFuture<'static, anyhow::Result<SyncOutcome>>
+ Send
+ Sync,
>;
pub struct StubDispatcher {
pub handler: DispatchFn,
}
#[async_trait]
impl ChapterDispatcher for StubDispatcher {
async fn dispatch(&self, payload: JobPayload) -> anyhow::Result<SyncOutcome> {
(self.handler)(payload).await
}
}
pub fn always_done() -> Arc<StubDispatcher> {
Arc::new(StubDispatcher {
handler: Arc::new(|_| Box::pin(async { Ok(SyncOutcome::Fetched { pages: 1 }) })),
})
}
pub fn panicking_dispatcher() -> Arc<StubDispatcher> {
Arc::new(StubDispatcher {
handler: Arc::new(|_| Box::pin(async { panic!("intentional dispatcher panic") })),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Duration as ChronoDuration;
fn dt_utc(y: i32, mo: u32, d: u32, h: u32, mi: u32) -> DateTime<Utc> {
Utc.with_ymd_and_hms(y, mo, d, h, mi, 0).unwrap()
}
#[test]
fn next_fire_in_utc_at_midnight_advances_one_day() {
let now = dt_utc(2026, 5, 25, 12, 0); // noon UTC
let at = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
let next = next_fire(now, at, Tz::UTC);
// Next midnight is May 26 00:00 UTC.
assert_eq!(next, dt_utc(2026, 5, 26, 0, 0));
}
#[test]
fn next_fire_before_today_slot_returns_today() {
let now = dt_utc(2026, 5, 25, 23, 0); // 23:00 UTC
let at = NaiveTime::from_hms_opt(23, 30, 0).unwrap();
let next = next_fire(now, at, Tz::UTC);
assert_eq!(next, dt_utc(2026, 5, 25, 23, 30));
}
#[test]
fn next_fire_skips_spring_forward_gap_in_europe_berlin() {
// 2024-03-31: clocks jump 02:00 -> 03:00 in Berlin (CET -> CEST).
// Asking for daily_at = 02:30 on the morning of the jump should
// land on the *next valid* local instant past the gap. We test
// by computing `next_fire` at 2024-03-31 00:30 UTC (= 01:30 CET,
// i.e. just before the gap). The next 02:30 local does not exist,
// so the helper advances past it.
let now = dt_utc(2024, 3, 31, 0, 30); // 01:30 local Berlin (CET = UTC+1)
let at = NaiveTime::from_hms_opt(2, 30, 0).unwrap();
let next = next_fire(now, at, Tz::Europe__Berlin);
// Local Berlin time skips from 02:00 -> 03:00. After the +1 minute
// search, the first valid slot is 03:00 local on 2024-03-31, which
// is 01:00 UTC (CEST = UTC+2).
// We assert the result is strictly between (now) and 1h later
// and is in UTC — the exact minute depends on how many +1m steps
// were required.
assert!(next > now);
assert!(next < now + ChronoDuration::hours(2));
}
#[test]
fn next_fire_on_fall_back_picks_later_instant() {
// 2024-10-27: clocks jump 03:00 -> 02:00 (CEST -> CET) in Berlin.
// 02:30 happens twice on that day. We pick the later one.
let now = dt_utc(2024, 10, 26, 12, 0); // day before, noon UTC
let at = NaiveTime::from_hms_opt(2, 30, 0).unwrap();
let next = next_fire(now, at, Tz::Europe__Berlin);
// First 02:30 local is 00:30 UTC (CEST = UTC+2).
// Second 02:30 local is 01:30 UTC (CET = UTC+1).
// We expect the later instant: 01:30 UTC on 2024-10-27.
assert_eq!(next, dt_utc(2024, 10, 27, 1, 30));
}
#[test]
fn previous_fire_returns_today_when_now_is_after_slot() {
let now = dt_utc(2026, 5, 25, 12, 0); // noon UTC
let at = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
let prev = previous_fire(now, at, Tz::UTC);
assert_eq!(prev, dt_utc(2026, 5, 25, 0, 0));
}
#[test]
fn previous_fire_returns_yesterday_when_now_is_before_today_slot() {
let now = dt_utc(2026, 5, 25, 8, 0); // 08:00 UTC
let at = NaiveTime::from_hms_opt(23, 30, 0).unwrap();
let prev = previous_fire(now, at, Tz::UTC);
assert_eq!(prev, dt_utc(2026, 5, 24, 23, 30));
}
}

View File

@@ -14,9 +14,12 @@
//! - [`diff`]: change detection — new / updated / dropped semantics.
pub mod browser;
pub mod browser_manager;
pub mod content;
pub mod daemon;
pub mod diff;
pub mod jobs;
pub mod pipeline;
pub mod rate_limit;
pub mod session;
pub mod source;

View File

@@ -0,0 +1,347 @@
//! Crawler pipeline — the reusable metadata pass and the enqueue helpers
//! that fan out chapter-content work. Shared between the daemon (cron tick)
//! and the CLI (`bin/crawler.rs`) so behavior stays in lockstep.
use anyhow::Context;
use sqlx::PgPool;
use uuid::Uuid;
use crate::crawler::browser_manager::BrowserManager;
use crate::crawler::jobs::{self, EnqueueResult, JobPayload};
use crate::crawler::rate_limit::HostRateLimiters;
use crate::crawler::source::target::TargetSource;
use crate::crawler::source::{DiscoverMode, FetchContext, Source};
use crate::repo;
use crate::storage::Storage;
/// Coarse counters surfaced for logging at the end of a metadata pass.
#[derive(Debug, Default, Clone, Copy)]
pub struct MetadataStats {
pub discovered: usize,
pub upserted: usize,
pub covers_fetched: usize,
pub mangas_failed: usize,
}
/// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline
/// for the target source. Pure metadata; chapter content is enqueued as
/// separate `SyncChapterContent` jobs by the caller after this returns.
///
/// `limit == 0` means no cap (full backfill). `skip_chapters == true` is
/// the "metadata-only" mode (parser doesn't extract chapters, and
/// `sync_manga_chapters` is skipped — otherwise an empty chapter list
/// would soft-drop existing rows).
#[allow(clippy::too_many_arguments)]
pub async fn run_metadata_pass(
browser_manager: &BrowserManager,
db: &PgPool,
storage: &dyn Storage,
http: &reqwest::Client,
rate: &HostRateLimiters,
start_url: &str,
limit: usize,
skip_chapters: bool,
) -> anyhow::Result<MetadataStats> {
let lease = browser_manager
.acquire()
.await
.context("acquire browser lease for metadata pass")?;
let browser_ref: &chromiumoxide::Browser = &lease;
let source = {
let s = TargetSource::new(start_url.to_string());
if skip_chapters {
s.without_chapter_parsing()
} else {
s
}
};
let ctx = FetchContext {
browser: browser_ref,
rate,
};
let source_id = source.id();
repo::crawler::ensure_source(
db,
source_id,
"Target Site",
&origin_of(start_url).unwrap_or_else(|| start_url.to_string()),
)
.await
.context("ensure_source")?;
let run_started_at = chrono::Utc::now();
let max_refs = (limit > 0).then_some(limit);
tracing::info!(?max_refs, "discovering manga list");
let refs = source
.discover(&ctx, DiscoverMode::Backfill, max_refs)
.await
.context("discover failed")?;
tracing::info!(count = refs.len(), "discovered manga list");
let mut stats = MetadataStats {
discovered: refs.len(),
..MetadataStats::default()
};
for (i, r) in refs.iter().enumerate() {
tracing::info!(
idx = i + 1,
total = stats.discovered,
key = %r.source_manga_key,
"fetching metadata"
);
let manga = match source.fetch_manga(&ctx, r).await {
Ok(m) => m,
Err(e) => {
tracing::warn!(
key = %r.source_manga_key,
url = %r.url,
error = ?e,
"fetch_manga failed"
);
stats.mangas_failed += 1;
continue;
}
};
let upsert = match repo::crawler::upsert_manga_from_source(db, source_id, &r.url, &manga)
.await
{
Ok(u) => u,
Err(e) => {
tracing::error!(
key = %r.source_manga_key,
error = ?e,
"upsert_manga_from_source failed"
);
stats.mangas_failed += 1;
continue;
}
};
stats.upserted += 1;
tracing::info!(
key = %manga.source_manga_key,
manga_id = %upsert.manga_id,
status = ?upsert.status,
title = %manga.title,
"manga upserted"
);
// Cover image: download when missing in storage or when metadata
// signaled an update (cover URL is part of metadata_hash, so
// Updated implies the URL may have moved). Failures are non-fatal.
let needs_cover = upsert.cover_image_path.is_none()
|| matches!(upsert.status, repo::crawler::UpsertStatus::Updated);
if needs_cover {
if let Some(cover_url) = manga.cover_url.as_deref() {
match download_and_store_cover(
db,
storage,
http,
rate,
&r.url,
upsert.manga_id,
cover_url,
)
.await
{
Ok(()) => stats.covers_fetched += 1,
Err(e) => tracing::warn!(
manga_id = %upsert.manga_id,
error = ?e,
"cover download failed"
),
}
}
}
if !skip_chapters {
match repo::crawler::sync_manga_chapters(
db,
source_id,
upsert.manga_id,
&manga.chapters,
)
.await
{
Ok(diff) => tracing::info!(
manga_id = %upsert.manga_id,
new = diff.new,
refreshed = diff.refreshed,
dropped = diff.dropped,
"chapters synced"
),
Err(e) => tracing::warn!(
manga_id = %upsert.manga_id,
error = ?e,
"chapter sync failed"
),
}
}
}
if limit == 0 {
match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await {
Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"),
Err(e) => tracing::warn!(error = ?e, "drop-pass failed"),
}
} else {
tracing::info!(limit, "partial sync — skipping drop pass");
}
drop(lease);
Ok(stats)
}
/// Enqueue a `SyncChapterContent` job for every chapter of *any* bookmarked
/// manga that still has `page_count = 0` and a non-dropped source row.
/// Returns `(inserted, skipped)` counts. Dedup index handles repeats.
pub async fn enqueue_bookmarked_pending(pool: &PgPool) -> anyhow::Result<EnqueueSummary> {
let rows: Vec<(String, Uuid, String)> = sqlx::query_as(
r#"
SELECT cs.source_id, c.id AS chapter_id, cs.source_chapter_key
FROM chapters c
JOIN bookmarks b ON b.manga_id = c.manga_id
JOIN chapter_sources cs ON cs.chapter_id = c.id
WHERE c.page_count = 0
AND cs.dropped_at IS NULL
GROUP BY cs.source_id, c.id, cs.source_chapter_key, c.manga_id, c.created_at
ORDER BY c.manga_id, c.created_at ASC
"#,
)
.fetch_all(pool)
.await
.context("query bookmarked-pending chapters")?;
let mut summary = EnqueueSummary::default();
for (source_id, chapter_id, source_chapter_key) in rows {
let payload = JobPayload::SyncChapterContent {
source_id,
chapter_id,
source_chapter_key,
};
match jobs::enqueue(pool, &payload).await {
Ok(EnqueueResult::Inserted(_)) => summary.inserted += 1,
Ok(EnqueueResult::Skipped) => summary.skipped += 1,
Err(e) => {
tracing::warn!(
%chapter_id,
error = ?e,
"enqueue chapter content failed"
);
summary.failed += 1;
}
}
}
Ok(summary)
}
/// Enqueue chapter-content jobs for a *single* manga (the bookmark-create
/// hook). Same dedup semantics as [`enqueue_bookmarked_pending`].
pub async fn enqueue_pending_for_manga(
pool: &PgPool,
manga_id: Uuid,
) -> anyhow::Result<EnqueueSummary> {
let rows: Vec<(String, Uuid, String)> = sqlx::query_as(
r#"
SELECT DISTINCT cs.source_id, c.id AS chapter_id, cs.source_chapter_key
FROM chapters c
JOIN chapter_sources cs ON cs.chapter_id = c.id
WHERE c.manga_id = $1
AND c.page_count = 0
AND cs.dropped_at IS NULL
ORDER BY cs.source_id, c.id
"#,
)
.bind(manga_id)
.fetch_all(pool)
.await
.context("query pending chapters for manga")?;
let mut summary = EnqueueSummary::default();
for (source_id, chapter_id, source_chapter_key) in rows {
let payload = JobPayload::SyncChapterContent {
source_id,
chapter_id,
source_chapter_key,
};
match jobs::enqueue(pool, &payload).await {
Ok(EnqueueResult::Inserted(_)) => summary.inserted += 1,
Ok(EnqueueResult::Skipped) => summary.skipped += 1,
Err(e) => {
tracing::warn!(
%chapter_id,
error = ?e,
"enqueue chapter content failed"
);
summary.failed += 1;
}
}
}
Ok(summary)
}
#[derive(Debug, Default, Clone, Copy)]
pub struct EnqueueSummary {
pub inserted: usize,
pub skipped: usize,
pub failed: usize,
}
/// Download a cover image and persist its storage path. Local to the
/// pipeline because the CLI still calls it from its inline chapter-content
/// loop; once the worker pool fully replaces that path we can fold this
/// into `pipeline` proper.
async fn download_and_store_cover(
db: &PgPool,
storage: &dyn Storage,
http: &reqwest::Client,
rate: &HostRateLimiters,
manga_url: &str,
manga_id: Uuid,
cover_url: &str,
) -> anyhow::Result<()> {
let absolute = reqwest::Url::parse(manga_url)
.context("parse manga URL")?
.join(cover_url)
.context("join cover URL onto manga URL")?;
rate.wait_for(absolute.as_str()).await?;
let resp = http
.get(absolute.clone())
.header(reqwest::header::REFERER, manga_url)
.send()
.await
.with_context(|| format!("GET {absolute}"))?
.error_for_status()
.with_context(|| format!("non-2xx for {absolute}"))?;
let bytes = resp.bytes().await.context("read cover body")?;
let kind = infer::get(&bytes);
let ext = kind.map(|k| k.extension()).unwrap_or("bin");
let key = format!("mangas/{manga_id}/cover.{ext}");
storage
.put(&key, &bytes)
.await
.with_context(|| format!("store cover at {key}"))?;
repo::manga::set_cover_image_path(db, manga_id, &key)
.await
.with_context(|| format!("update cover_image_path for {manga_id}"))?;
tracing::info!(
manga_id = %manga_id,
key = %key,
bytes = bytes.len(),
%absolute,
"cover stored"
);
Ok(())
}
fn origin_of(url: &str) -> Option<String> {
let (scheme, rest) = url.split_once("://")?;
let host = rest.split('/').next()?;
Some(format!("{scheme}://{host}"))
}