Collapses the crawler to a single newest-first walker and replaces the N-consecutive-unchanged streak with a per-manga rule: stop on the first manga where metadata is Unchanged AND chapter sync reports zero new chapters. The early stop is gated by a per-source recovery flag stored in `crawler_state` — set to `false` when a run starts, back to `true` only on a clean exit (end-of-walk or intentional stop). A crashed run leaves the flag `false` automatically (no shutdown code runs), so the next tick walks the full catalog instead of bailing at the first caught-up manga. This means a crashed mid-walk run self-heals on the next tick: the flag stays `false`, the next walk visits every page (recovering anything the crash missed past its crash point), and steady state resumes once the recovery sweep reaches end-of-walk. Removed: - DiscoverMode enum, Backfill mode, the boundary re-check + displaced-refs machinery in TargetSourceWalker. - Drop-pass (mark_dropped_mangas) and seed-completion plumbing (mark_seed_completed / seed_completed_at). The recovery flag subsumes the seed-completion signal; drop detection was explicitly opted out. - JobPayload::Discover (no production callers). - CRAWLER_MODE / CRAWLER_INCREMENTAL_STOP_AFTER env vars and the CrawlerModePref config type. `should_mark_clean_exit(walked_to_completion, hit_stop_condition)` encodes the clean-exit truth table in its signature — `hit_limit` is deliberately absent so a future edit cannot accidentally count a caller-imposed cap as a clean exit. Net -501 lines, 261 backend tests passing. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
553 lines
18 KiB
Rust
553 lines
18 KiB
Rust
//! Integration tests for `crawler::jobs` queue operations.
//!
//! Uses `#[sqlx::test(migrations = "./migrations")]` which provisions a fresh
//! migrated DB per test. No browser, no axum router — these exercise the SQL
//! shape and dedup-index semantics directly against Postgres.
|
|
|
|
use std::time::Duration;
|
|
|
|
use mangalord::crawler::jobs::{
|
|
self, EnqueueResult, JobPayload, KIND_SYNC_CHAPTER_CONTENT,
|
|
};
|
|
use sqlx::PgPool;
|
|
use uuid::Uuid;
|
|
|
|
fn chapter_content_payload(chapter_id: Uuid) -> JobPayload {
|
|
JobPayload::SyncChapterContent {
|
|
source_id: "target".into(),
|
|
chapter_id,
|
|
source_chapter_key: format!("ch-{chapter_id}"),
|
|
}
|
|
}
|
|
|
|
/// A non-`SyncChapterContent` payload, used to assert that only the
|
|
/// chapter-content kind is deduplicated by the partial index and that
|
|
/// `lease`'s kind filter correctly excludes other kinds.
|
|
fn sync_manga_payload(key: &str) -> JobPayload {
|
|
JobPayload::SyncManga {
|
|
source_id: "target".into(),
|
|
source_manga_key: key.into(),
|
|
}
|
|
}
|
|
|
|
async fn job_state(pool: &PgPool, id: Uuid) -> String {
|
|
sqlx::query_scalar::<_, String>("SELECT state FROM crawler_jobs WHERE id = $1")
|
|
.bind(id)
|
|
.fetch_one(pool)
|
|
.await
|
|
.unwrap()
|
|
}
|
|
|
|
async fn job_attempts(pool: &PgPool, id: Uuid) -> i32 {
|
|
sqlx::query_scalar::<_, i32>("SELECT attempts FROM crawler_jobs WHERE id = $1")
|
|
.bind(id)
|
|
.fetch_one(pool)
|
|
.await
|
|
.unwrap()
|
|
}
|
|
|
|
async fn job_count(pool: &PgPool) -> i64 {
|
|
sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM crawler_jobs")
|
|
.fetch_one(pool)
|
|
.await
|
|
.unwrap()
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn enqueue_inserts_pending_row_with_round_trip_payload(pool: PgPool) {
|
|
let chapter_id = Uuid::new_v4();
|
|
let payload = chapter_content_payload(chapter_id);
|
|
|
|
let result = jobs::enqueue(&pool, &payload).await.unwrap();
|
|
let id = match result {
|
|
EnqueueResult::Inserted(id) => id,
|
|
EnqueueResult::Skipped => panic!("expected Inserted on first enqueue"),
|
|
};
|
|
|
|
assert_eq!(job_state(&pool, id).await, "pending");
|
|
assert_eq!(job_attempts(&pool, id).await, 0);
|
|
|
|
let raw_payload: serde_json::Value =
|
|
sqlx::query_scalar("SELECT payload FROM crawler_jobs WHERE id = $1")
|
|
.bind(id)
|
|
.fetch_one(&pool)
|
|
.await
|
|
.unwrap();
|
|
let decoded: JobPayload = serde_json::from_value(raw_payload).unwrap();
|
|
match decoded {
|
|
JobPayload::SyncChapterContent {
|
|
source_id,
|
|
chapter_id: c,
|
|
source_chapter_key,
|
|
} => {
|
|
assert_eq!(source_id, "target");
|
|
assert_eq!(c, chapter_id);
|
|
assert_eq!(source_chapter_key, format!("ch-{chapter_id}"));
|
|
}
|
|
_ => panic!("payload variant mismatch"),
|
|
}
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn duplicate_chapter_content_while_pending_is_skipped(pool: PgPool) {
|
|
let chapter_id = Uuid::new_v4();
|
|
let p = chapter_content_payload(chapter_id);
|
|
|
|
let first = jobs::enqueue(&pool, &p).await.unwrap();
|
|
assert!(matches!(first, EnqueueResult::Inserted(_)));
|
|
|
|
let second = jobs::enqueue(&pool, &p).await.unwrap();
|
|
assert!(matches!(second, EnqueueResult::Skipped));
|
|
|
|
assert_eq!(job_count(&pool).await, 1);
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn duplicate_after_done_releases_dedup_slot(pool: PgPool) {
|
|
let chapter_id = Uuid::new_v4();
|
|
let p = chapter_content_payload(chapter_id);
|
|
|
|
let first_id = match jobs::enqueue(&pool, &p).await.unwrap() {
|
|
EnqueueResult::Inserted(id) => id,
|
|
EnqueueResult::Skipped => panic!("first enqueue should insert"),
|
|
};
|
|
// Move the first job out of (pending|running) so the partial index drops it.
|
|
sqlx::query("UPDATE crawler_jobs SET state = 'done' WHERE id = $1")
|
|
.bind(first_id)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
let second = jobs::enqueue(&pool, &p).await.unwrap();
|
|
assert!(
|
|
matches!(second, EnqueueResult::Inserted(_)),
|
|
"after done the chapter_id slot is free again"
|
|
);
|
|
assert_eq!(job_count(&pool).await, 2);
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn different_chapter_ids_can_coexist(pool: PgPool) {
|
|
let p1 = chapter_content_payload(Uuid::new_v4());
|
|
let p2 = chapter_content_payload(Uuid::new_v4());
|
|
assert!(matches!(
|
|
jobs::enqueue(&pool, &p1).await.unwrap(),
|
|
EnqueueResult::Inserted(_)
|
|
));
|
|
assert!(matches!(
|
|
jobs::enqueue(&pool, &p2).await.unwrap(),
|
|
EnqueueResult::Inserted(_)
|
|
));
|
|
assert_eq!(job_count(&pool).await, 2);
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn non_chapter_content_payloads_are_never_deduped(pool: PgPool) {
|
|
let p = sync_manga_payload("foo");
|
|
assert!(matches!(
|
|
jobs::enqueue(&pool, &p).await.unwrap(),
|
|
EnqueueResult::Inserted(_)
|
|
));
|
|
assert!(matches!(
|
|
jobs::enqueue(&pool, &p).await.unwrap(),
|
|
EnqueueResult::Inserted(_)
|
|
));
|
|
assert_eq!(job_count(&pool).await, 2);
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn lease_marks_running_and_bumps_attempts_and_sets_leased_until(pool: PgPool) {
|
|
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
EnqueueResult::Skipped => unreachable!(),
|
|
};
|
|
|
|
let leases = jobs::lease(&pool, None, 10, Duration::from_secs(60))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(leases.len(), 1);
|
|
let lease = &leases[0];
|
|
assert_eq!(lease.id, id);
|
|
assert_eq!(lease.attempts, 1);
|
|
|
|
assert_eq!(job_state(&pool, id).await, "running");
|
|
|
|
let leased_until: Option<chrono::DateTime<chrono::Utc>> =
|
|
sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1")
|
|
.bind(id)
|
|
.fetch_one(&pool)
|
|
.await
|
|
.unwrap();
|
|
let leased_until = leased_until.expect("leased_until set");
|
|
assert!(leased_until > chrono::Utc::now());
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn lease_with_kind_filter_only_matches_that_kind(pool: PgPool) {
|
|
let manga_id = match jobs::enqueue(&pool, &sync_manga_payload("foo"))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
let chapter_id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
|
|
let leases = jobs::lease(
|
|
&pool,
|
|
Some(KIND_SYNC_CHAPTER_CONTENT),
|
|
10,
|
|
Duration::from_secs(60),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(leases.len(), 1, "only chapter content payload leases");
|
|
assert_eq!(leases[0].id, chapter_id);
|
|
// sync_manga is still pending
|
|
assert_eq!(job_state(&pool, manga_id).await, "pending");
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn concurrent_leases_under_skip_locked_return_disjoint_ids(pool: PgPool) {
|
|
// 4 pending jobs, two concurrent calls each asking for up to 2.
|
|
let mut ids = Vec::new();
|
|
for _ in 0..4 {
|
|
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
ids.push(id);
|
|
}
|
|
|
|
let (a, b) = tokio::join!(
|
|
jobs::lease(&pool, None, 2, Duration::from_secs(60)),
|
|
jobs::lease(&pool, None, 2, Duration::from_secs(60)),
|
|
);
|
|
let a = a.unwrap();
|
|
let b = b.unwrap();
|
|
let mut seen: Vec<Uuid> = a.iter().chain(b.iter()).map(|l| l.id).collect();
|
|
seen.sort();
|
|
seen.dedup();
|
|
let count = a.len() + b.len();
|
|
assert_eq!(
|
|
seen.len(),
|
|
count,
|
|
"no id appears in both lease results (SKIP LOCKED)"
|
|
);
|
|
assert!(count >= 2, "at least one lease saw work");
|
|
assert!(count <= 4);
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn stale_running_lease_can_be_reclaimed(pool: PgPool) {
|
|
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
|
|
let first = jobs::lease(&pool, None, 1, Duration::from_secs(60))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(first.len(), 1);
|
|
// Pretend the worker crashed: rewind leased_until into the past.
|
|
sqlx::query("UPDATE crawler_jobs SET leased_until = now() - interval '1 minute' WHERE id = $1")
|
|
.bind(id)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
let second = jobs::lease(&pool, None, 1, Duration::from_secs(60))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(second.len(), 1, "stale running row was re-leased");
|
|
assert_eq!(second[0].id, id);
|
|
assert_eq!(second[0].attempts, 2, "attempts bumped again");
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn ack_done_transitions_state_and_clears_lease(pool: PgPool) {
|
|
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60))
|
|
.await
|
|
.unwrap();
|
|
jobs::ack_done(&pool, leases[0].id).await.unwrap();
|
|
|
|
assert_eq!(job_state(&pool, id).await, "done");
|
|
let leased_until: Option<chrono::DateTime<chrono::Utc>> =
|
|
sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1")
|
|
.bind(id)
|
|
.fetch_one(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert!(leased_until.is_none());
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn ack_failed_under_max_returns_to_pending_with_future_schedule(pool: PgPool) {
|
|
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60))
|
|
.await
|
|
.unwrap();
|
|
let lease = &leases[0];
|
|
jobs::ack_failed(&pool, lease.id, "boom", lease.attempts, lease.max_attempts)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(job_state(&pool, id).await, "pending");
|
|
|
|
let (scheduled_at, last_error): (chrono::DateTime<chrono::Utc>, Option<String>) =
|
|
sqlx::query_as("SELECT scheduled_at, last_error FROM crawler_jobs WHERE id = $1")
|
|
.bind(id)
|
|
.fetch_one(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert!(scheduled_at > chrono::Utc::now());
|
|
assert_eq!(last_error.as_deref(), Some("boom"));
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn ack_failed_at_max_marks_dead(pool: PgPool) {
|
|
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
// Force a single lease then mark "this was attempt N where N == max_attempts".
|
|
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60))
|
|
.await
|
|
.unwrap();
|
|
let lease = &leases[0];
|
|
jobs::ack_failed(&pool, lease.id, "final boom", lease.max_attempts, lease.max_attempts)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(job_state(&pool, id).await, "dead");
|
|
let last_error: Option<String> =
|
|
sqlx::query_scalar("SELECT last_error FROM crawler_jobs WHERE id = $1")
|
|
.bind(id)
|
|
.fetch_one(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(last_error.as_deref(), Some("final boom"));
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn ack_done_no_ops_when_lease_was_stolen(pool: PgPool) {
|
|
// Worker A's lease expires, worker B re-leases the job (state stays
|
|
// 'running' but attempts++ and leased_until refreshed). A late
|
|
// ack_done from worker A must not clobber B's progress.
|
|
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
// Worker A grabs the lease, but its lease expires immediately.
|
|
let _a_leases = jobs::lease(&pool, None, 1, Duration::from_secs(60))
|
|
.await
|
|
.unwrap();
|
|
sqlx::query("UPDATE crawler_jobs SET leased_until = now() - interval '1 minute' WHERE id = $1")
|
|
.bind(id)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
// Worker B re-leases the expired-but-still-running job.
|
|
let b_leases = jobs::lease(&pool, None, 1, Duration::from_secs(60))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(b_leases.len(), 1);
|
|
assert_eq!(b_leases[0].attempts, 2, "re-lease bumps attempts");
|
|
|
|
// Worker A's late ack_done — guarded by `state = 'running'` + lease_id
|
|
// but in the simplest implementation the guard is state-only. Either
|
|
// way, the job stays 'running' with worker B's progress intact.
|
|
jobs::ack_done(&pool, id).await.unwrap();
|
|
// Worker B is still working; until B acks, the job remains 'running'
|
|
// with its leased_until in the future and attempts == 2.
|
|
// (We can't make ack_done's lease_id distinguish A from B today —
|
|
// both share the same `id` — so the strongest current guarantee is
|
|
// that a late ack_done doesn't fire when state is already 'done',
|
|
// exercised below.)
|
|
// Finalize: worker B acks done.
|
|
jobs::ack_done(&pool, b_leases[0].id).await.unwrap();
|
|
assert_eq!(job_state(&pool, id).await, "done");
|
|
assert_eq!(job_attempts(&pool, id).await, 2);
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn ack_failed_no_ops_when_state_is_not_running(pool: PgPool) {
|
|
// After a job transitions to 'done', a stale ack_failed (e.g. a
|
|
// worker that finished work and queued its ack but then handed off
|
|
// before the SQL ran) must not flip the state back to 'pending' or
|
|
// 'dead'. The `state = 'running'` predicate enforces this.
|
|
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60))
|
|
.await
|
|
.unwrap();
|
|
jobs::ack_done(&pool, leases[0].id).await.unwrap();
|
|
assert_eq!(job_state(&pool, id).await, "done");
|
|
|
|
// Late ack_failed arrives. Must be a no-op.
|
|
jobs::ack_failed(&pool, leases[0].id, "late", 1, 5)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(
|
|
job_state(&pool, id).await,
|
|
"done",
|
|
"late ack_failed must not resurrect a done job"
|
|
);
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn release_no_ops_when_state_is_not_running(pool: PgPool) {
|
|
// Mirror of ack_failed_no_ops_when_state_is_not_running. release also
|
|
// decrements `attempts`, which would corrupt a re-leased job's
|
|
// attempt count if the guard were missing.
|
|
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60))
|
|
.await
|
|
.unwrap();
|
|
jobs::ack_done(&pool, leases[0].id).await.unwrap();
|
|
let attempts_before = job_attempts(&pool, id).await;
|
|
|
|
// Late release arrives.
|
|
jobs::release(&pool, leases[0].id).await.unwrap();
|
|
assert_eq!(
|
|
job_state(&pool, id).await,
|
|
"done",
|
|
"late release must not flip a done job back to pending"
|
|
);
|
|
assert_eq!(
|
|
job_attempts(&pool, id).await,
|
|
attempts_before,
|
|
"late release must not decrement attempts of a non-running job"
|
|
);
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn release_returns_to_pending_and_undoes_attempt_increment(pool: PgPool) {
|
|
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
let leases = jobs::lease(&pool, None, 1, Duration::from_secs(60))
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(leases[0].attempts, 1);
|
|
jobs::release(&pool, leases[0].id).await.unwrap();
|
|
|
|
assert_eq!(job_state(&pool, id).await, "pending");
|
|
assert_eq!(job_attempts(&pool, id).await, 0);
|
|
let leased_until: Option<chrono::DateTime<chrono::Utc>> =
|
|
sqlx::query_scalar("SELECT leased_until FROM crawler_jobs WHERE id = $1")
|
|
.bind(id)
|
|
.fetch_one(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert!(leased_until.is_none());
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn reap_done_deletes_old_rows_keeps_fresh(pool: PgPool) {
|
|
// Two done rows: one old (updated_at 10 days ago), one fresh.
|
|
let old_id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
let fresh_id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
|
|
sqlx::query("UPDATE crawler_jobs SET state='done', updated_at = now() - interval '10 days' WHERE id = $1")
|
|
.bind(old_id)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
sqlx::query("UPDATE crawler_jobs SET state='done' WHERE id = $1")
|
|
.bind(fresh_id)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
let deleted = jobs::reap_done(&pool, 7).await.unwrap();
|
|
assert_eq!(deleted, 1);
|
|
|
|
let remaining: Vec<Uuid> = sqlx::query_scalar("SELECT id FROM crawler_jobs ORDER BY id")
|
|
.fetch_all(&pool)
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(remaining, vec![fresh_id], "only fresh row remains");
|
|
}
|
|
|
|
#[sqlx::test(migrations = "./migrations")]
|
|
async fn reap_done_zero_is_a_no_op(pool: PgPool) {
|
|
let id = match jobs::enqueue(&pool, &chapter_content_payload(Uuid::new_v4()))
|
|
.await
|
|
.unwrap()
|
|
{
|
|
EnqueueResult::Inserted(id) => id,
|
|
_ => unreachable!(),
|
|
};
|
|
sqlx::query("UPDATE crawler_jobs SET state='done', updated_at = now() - interval '999 days' WHERE id = $1")
|
|
.bind(id)
|
|
.execute(&pool)
|
|
.await
|
|
.unwrap();
|
|
|
|
let deleted = jobs::reap_done(&pool, 0).await.unwrap();
|
|
assert_eq!(deleted, 0);
|
|
assert_eq!(job_count(&pool).await, 1);
|
|
}
|