From 6b7ff78730854a20256529c4e5531e15c978dffb Mon Sep 17 00:00:00 2001
From: MechaCat02
Date: Mon, 1 Jun 2026 22:22:42 +0200
Subject: [PATCH] feat(v1.1.1-gc): dead-letter + abandoned-executions retention sweepers
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two tokio tasks spawned at startup that sweep their respective tables
on a weekly cadence (design notes §3 #9 + §4 retention). Both use
`FOR UPDATE SKIP LOCKED` on the claim query so concurrent sweepers in
cluster mode (v1.3+) don't fight each other.

Defaults: 30 days for dead_letters, 7 days for abandoned_executions.
Both env-overridable via `PICLOUD_DEAD_LETTER_RETENTION_DAYS` and
`PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS` (loaded into
`TriggerConfig::from_env` from commit 5).

Per-tick batch cap (5_000 rows) so a sweep can't lock up the table in
a single transaction; the inner loop continues until a batch comes back
short (or empty) or errors, after which the outer tick waits for the
next week.

A retention override so large that the cutoff falls outside the range
`DateTime<Utc>` can represent skips the sweep with a warning instead of
panicking (and silently killing) the detached sweeper task.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 crates/manager-core/src/gc.rs  | 147 +++++++++++++++++++++++++++++++++
 crates/manager-core/src/lib.rs |   2 +
 crates/picloud/src/lib.rs      |  12 ++-
 3 files changed, 159 insertions(+), 2 deletions(-)
 create mode 100644 crates/manager-core/src/gc.rs

diff --git a/crates/manager-core/src/gc.rs b/crates/manager-core/src/gc.rs
new file mode 100644
index 0000000..8572bad
--- /dev/null
+++ b/crates/manager-core/src/gc.rs
@@ -0,0 +1,147 @@
+//! Weekly retention sweepers for `dead_letters` + `abandoned_executions`.
+//!
+//! Both use the `FOR UPDATE SKIP LOCKED` claim pattern so concurrent
+//! sweepers (cluster mode v1.3+) don't fight each other. Defaults
+//! match design notes §3 / §4: 30 days for DL, 7 days for abandoned.
+//! Both env-overridable via `PICLOUD_DEAD_LETTER_RETENTION_DAYS` and
+//! `PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS` (loaded by
+//! `TriggerConfig::from_env`).
+//!
+//! Spawned from `build_app` alongside `spawn_session_pruner`.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use chrono::{DateTime, Utc};
+
+use crate::abandoned_repo::AbandonedRepo;
+use crate::dead_letter_repo::DeadLetterRepo;
+
+/// Weekly sweep cadence — matches `spawn_session_pruner` shape.
+const SWEEP_INTERVAL: Duration = Duration::from_secs(7 * 24 * 60 * 60);
+
+/// Per-call batch cap so we don't try to delete millions of rows in
+/// one transaction. A sweep keeps requesting batches until one comes
+/// back short (fewer than `SWEEP_BATCH` rows, including zero).
+const SWEEP_BATCH: i64 = 5_000;
+
+/// `SWEEP_BATCH` in the unsigned row-count type that `gc` reports.
+const SWEEP_BATCH_ROWS: u64 = SWEEP_BATCH.unsigned_abs();
+
+/// Computes `now - retention_days`, the cutoff handed to the repos' `gc`.
+///
+/// Returns `None` when the result falls outside the range `DateTime<Utc>`
+/// can represent (an absurdly large retention override). The plain `-`
+/// operator panics in that case, which would silently kill the detached
+/// sweeper task.
+fn retention_cutoff(now: DateTime<Utc>, retention_days: u32) -> Option<DateTime<Utc>> {
+    now.checked_sub_signed(chrono::Duration::days(i64::from(retention_days)))
+}
+
+/// Spawns a detached Tokio task that runs the `dead_letters` sweep once
+/// per `SWEEP_INTERVAL`, using `retention_days` to derive the cutoff.
+///
+/// The first sweep runs one full interval after startup.
+///
+/// # Panics
+///
+/// Panics if called outside a Tokio runtime (`tokio::spawn` requirement).
+pub fn spawn_dead_letter_gc(repo: Arc<dyn DeadLetterRepo>, retention_days: u32) {
+    tokio::spawn(async move {
+        let mut ticker = tokio::time::interval(SWEEP_INTERVAL);
+        // Skip the immediate first fire — don't sweep at process start.
+        ticker.tick().await;
+        loop {
+            ticker.tick().await;
+            sweep_dead_letters(&*repo, retention_days).await;
+        }
+    });
+}
+
+/// Spawns a detached Tokio task that runs the `abandoned_executions`
+/// sweep once per `SWEEP_INTERVAL`, using `retention_days` to derive
+/// the cutoff.
+///
+/// The first sweep runs one full interval after startup.
+///
+/// # Panics
+///
+/// Panics if called outside a Tokio runtime (`tokio::spawn` requirement).
+pub fn spawn_abandoned_gc(repo: Arc<dyn AbandonedRepo>, retention_days: u32) {
+    tokio::spawn(async move {
+        let mut ticker = tokio::time::interval(SWEEP_INTERVAL);
+        // Skip the immediate first fire — don't sweep at process start.
+        ticker.tick().await;
+        loop {
+            ticker.tick().await;
+            sweep_abandoned(&*repo, retention_days).await;
+        }
+    });
+}
+
+/// Runs one `dead_letters` sweep: calls `repo.gc` in `SWEEP_BATCH`-sized
+/// batches until a batch comes back short or errors, then logs the total.
+///
+/// Errors are logged and end the sweep; the next tick starts a fresh one.
+async fn sweep_dead_letters(repo: &dyn DeadLetterRepo, retention_days: u32) {
+    let cutoff = match retention_cutoff(Utc::now(), retention_days) {
+        Some(cutoff) => cutoff,
+        None => {
+            tracing::warn!(retention_days, "dead_letters GC skipped: cutoff out of range");
+            return;
+        }
+    };
+    let mut total: u64 = 0;
+    loop {
+        match repo.gc(cutoff, SWEEP_BATCH).await {
+            Ok(n) => {
+                total += n;
+                // A short (or empty) batch: nothing more to claim right now.
+                if n < SWEEP_BATCH_ROWS {
+                    break;
+                }
+            }
+            Err(e) => {
+                tracing::warn!(?e, "dead_letters GC sweep errored");
+                break;
+            }
+        }
+    }
+    if total > 0 {
+        tracing::info!(swept = total, "dead_letters GC swept");
+    }
+}
+
+/// Runs one `abandoned_executions` sweep: calls `repo.gc` in
+/// `SWEEP_BATCH`-sized batches until a batch comes back short or errors,
+/// then logs the total.
+///
+/// Errors are logged and end the sweep; the next tick starts a fresh one.
+async fn sweep_abandoned(repo: &dyn AbandonedRepo, retention_days: u32) {
+    let cutoff = match retention_cutoff(Utc::now(), retention_days) {
+        Some(cutoff) => cutoff,
+        None => {
+            tracing::warn!(retention_days, "abandoned_executions GC skipped: cutoff out of range");
+            return;
+        }
+    };
+    let mut total: u64 = 0;
+    loop {
+        match repo.gc(cutoff, SWEEP_BATCH).await {
+            Ok(n) => {
+                total += n;
+                // A short (or empty) batch: nothing more to claim right now.
+                if n < SWEEP_BATCH_ROWS {
+                    break;
+                }
+            }
+            Err(e) => {
+                tracing::warn!(?e, "abandoned_executions GC sweep errored");
+                break;
+            }
+        }
+    }
+    if total > 0 {
+        tracing::info!(swept = total, "abandoned_executions GC swept");
+    }
+}
diff --git a/crates/manager-core/src/lib.rs b/crates/manager-core/src/lib.rs
index f796d16..6ff6869 100644
--- a/crates/manager-core/src/lib.rs
+++ b/crates/manager-core/src/lib.rs
@@ -26,6 +26,7 @@ pub mod dead_letter_repo;
 pub mod dead_letter_service;
 pub mod dead_letters_api;
 pub mod dispatcher;
+pub mod gc;
 pub mod kv_repo;
 pub mod kv_service;
 pub mod log_sink;
@@ -85,6 +86,7 @@ pub use dead_letter_repo::{
 pub use dead_letter_service::PostgresDeadLetterService;
 pub use dead_letters_api::{dead_letters_router, DeadLettersApiError, DeadLettersState};
 pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError};
+pub use gc::{spawn_abandoned_gc, spawn_dead_letter_gc};
 pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo};
 pub use kv_service::KvServiceImpl;
 pub use log_sink::PostgresExecutionLogSink;
diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs
index 7ccb36b..4629ea1 100644
--- a/crates/picloud/src/lib.rs
+++ b/crates/picloud/src/lib.rs
@@ -220,8 +220,16 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result {
         inbox: inbox_registry,
         outbox: outbox_writer,
     };
-    // Commit 10 wires abandoned_repo into the GC sweeper.
-    let _ = &abandoned_repo;
+    // Weekly retention sweepers for dead_letters + abandoned_executions.
+    // Defaults: 30 days / 7 days (design notes §3 #9 + §4 retention).
+    picloud_manager_core::spawn_dead_letter_gc(
+        dl_repo.clone(),
+        trigger_config.dead_letter_retention_days,
+    );
+    picloud_manager_core::spawn_abandoned_gc(
+        abandoned_repo.clone(),
+        trigger_config.abandoned_retention_days,
+    );
     let triggers_state = TriggersState {
         triggers: trigger_repo,
         apps: apps_repo.clone(),