feat(v1.1.1-gc): dead-letter + abandoned-executions retention sweepers

Two tokio tasks spawned at startup that sweep their respective
tables on a weekly cadence (design notes §3 #9 + §4 retention).
Both use `FOR UPDATE SKIP LOCKED` on the claim query so concurrent
sweepers in cluster mode (v1.3+) don't fight each other.

Defaults: 30 days for dead_letters, 7 days for abandoned_executions.
Both env-overridable via `PICLOUD_DEAD_LETTER_RETENTION_DAYS` and
`PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS` (loaded into
`TriggerConfig::from_env` from commit 5).

Per-tick batch cap (5_000 rows) so a sweep can't lock up the table
in a single transaction; the inner loop continues until 0 rows
affected, after which the outer tick waits for the next week.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-06-01 22:22:42 +02:00
parent 1795dfc98a
commit 6b7ff78730
3 changed files with 107 additions and 2 deletions

View File

@@ -0,0 +1,95 @@
//! Weekly retention sweepers for `dead_letters` + `abandoned_executions`.
//!
//! Both use the `FOR UPDATE SKIP LOCKED` claim pattern so concurrent
//! sweepers (cluster mode v1.3+) don't fight each other. Defaults
//! match design notes §3 / §4: 30 days for DL, 7 days for abandoned.
//! Both env-overridable via `PICLOUD_DEAD_LETTER_RETENTION_DAYS` and
//! `PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS` (loaded by
//! `TriggerConfig::from_env`).
//!
//! Spawned from `build_app` alongside `spawn_session_pruner`.
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use crate::abandoned_repo::AbandonedRepo;
use crate::dead_letter_repo::DeadLetterRepo;
/// Weekly sweep cadence — matches `spawn_session_pruner` shape.
const SWEEP_INTERVAL: Duration = Duration::from_secs(7 * 24 * 60 * 60);
/// Per-tick batch cap so we don't try to delete millions of rows in
/// one transaction. The loop keeps deleting batches until a tick
/// returns 0 rows affected.
const SWEEP_BATCH: i64 = 5_000;
pub fn spawn_dead_letter_gc(repo: Arc<dyn DeadLetterRepo>, retention_days: u32) {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(SWEEP_INTERVAL);
// Skip the immediate first fire — don't sweep at process start.
ticker.tick().await;
loop {
ticker.tick().await;
sweep_dead_letters(&*repo, retention_days).await;
}
});
}
pub fn spawn_abandoned_gc(repo: Arc<dyn AbandonedRepo>, retention_days: u32) {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(SWEEP_INTERVAL);
ticker.tick().await;
loop {
ticker.tick().await;
sweep_abandoned(&*repo, retention_days).await;
}
});
}
async fn sweep_dead_letters(repo: &dyn DeadLetterRepo, retention_days: u32) {
let cutoff = Utc::now() - chrono::Duration::days(i64::from(retention_days));
let mut total: u64 = 0;
loop {
match repo.gc(cutoff, SWEEP_BATCH).await {
Ok(0) => break,
Ok(n) => {
total += n;
if n < SWEEP_BATCH as u64 {
break;
}
}
Err(e) => {
tracing::warn!(?e, "dead_letters GC sweep errored");
break;
}
}
}
if total > 0 {
tracing::info!(swept = total, "dead_letters GC swept");
}
}
async fn sweep_abandoned(repo: &dyn AbandonedRepo, retention_days: u32) {
let cutoff = Utc::now() - chrono::Duration::days(i64::from(retention_days));
let mut total: u64 = 0;
loop {
match repo.gc(cutoff, SWEEP_BATCH).await {
Ok(0) => break,
Ok(n) => {
total += n;
if n < SWEEP_BATCH as u64 {
break;
}
}
Err(e) => {
tracing::warn!(?e, "abandoned_executions GC sweep errored");
break;
}
}
}
if total > 0 {
tracing::info!(swept = total, "abandoned_executions GC swept");
}
}