Files
PiCloud/crates/manager-core/src/dead_letter_repo.rs
MechaCat02 545d863199 feat(v1.1.1-triggers): triggers + outbox schema + repos
Migrations 0008-0011 lay down the triggers framework's storage:

- `triggers` + `kv_trigger_details` + `dead_letter_trigger_details`
  (Layout E, design notes §2). Parent table carries common columns
  including `registered_by_principal` — the dispatcher uses this to
  run the trigger as the user that registered it (design notes §4).
- `outbox`: universal async dispatch substrate. KV/cron/pubsub/queue/
  email/dead-letter all write rows in the same shape; the dispatcher
  claims due rows via FOR UPDATE SKIP LOCKED. `reply_to` is the
  NATS-style inbox id for sync HTTP (commit 6) — its presence flags
  "don't retry" per the design.
- `dead_letters`: exact schema from design notes §4 with the four-
  value `resolution` CHECK constraint (`replayed | ignored |
  handled_by_script | handler_failed`) and partial index on
  unresolved rows for the dashboard badge.
- `abandoned_executions`: forensic table for the dispatcher's
  "tried to resolve a dropped inbox" edge case (design notes §3 #9).

Repo surfaces with Postgres impls behind traits so unit tests can
swap in-memory backings:
- `TriggerRepo` — CRUD + the `list_matching_kv` /
  `list_matching_dead_letter` hot paths the dispatcher uses.
  Includes a `collection_matches` helper that handles `*`, `prefix:*`,
  and exact-name globs.
- `OutboxRepo` — insert + claim-due + delete + reschedule.
- `DeadLetterRepo` — insert + get + list + unresolved-count +
  resolve + GC.
- `AbandonedRepo` — insert + GC.

`TriggerConfig::from_env` (new module) follows the existing
`SandboxCeiling` env-loading pattern for `PICLOUD_MAX_TRIGGER_DEPTH`,
`PICLOUD_TRIGGER_RETRY_*`, `PICLOUD_DEAD_LETTER_RETENTION_DAYS`, and
`PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS`.

`Capability::AppManageTriggers(AppId)` and `AppDeadLetterManage(AppId)`
join the enum. Both map onto the existing `Scope::AppAdmin` per the
seven-scope commitment; `role_satisfies` grants them at the
`AppAdmin` per-app role.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-01 21:46:45 +02:00

262 lines
8.7 KiB
Rust

//! `DeadLetterRepo` — CRUD over the `dead_letters` table.
//!
//! The dispatcher writes new rows when an async trigger exhausts its
//! retry policy. Admin endpoints (commit 8) read for the dashboard
//! list view and write to mark rows resolved or replay them. The GC
//! sweeper (commit 10) deletes expired rows by `created_at`.
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use picloud_shared::{AppId, DeadLetterId, ScriptId, TriggerId};
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Debug, thiserror::Error)]
pub enum DeadLetterRepoError {
#[error("database error: {0}")]
Db(#[from] sqlx::Error),
#[error("dead-letter row not found: {0}")]
NotFound(DeadLetterId),
#[error("invalid resolution {0:?}")]
InvalidResolution(String),
}
#[derive(Debug, Clone)]
pub struct NewDeadLetter {
pub app_id: AppId,
/// `outbox.id` that exhausted retries. Outbox row deleted at the
/// same time.
pub original_event_id: Uuid,
pub source: String,
pub op: String,
pub trigger_id: Option<TriggerId>,
pub script_id: Option<ScriptId>,
pub payload: serde_json::Value,
pub attempt_count: u32,
pub first_attempt_at: DateTime<Utc>,
pub last_attempt_at: DateTime<Utc>,
pub last_error: String,
}
#[derive(Debug, Clone)]
pub struct DeadLetterRow {
pub id: DeadLetterId,
pub app_id: AppId,
pub original_event_id: Uuid,
pub source: String,
pub op: String,
pub trigger_id: Option<TriggerId>,
pub script_id: Option<ScriptId>,
pub payload: serde_json::Value,
pub attempt_count: u32,
pub first_attempt_at: DateTime<Utc>,
pub last_attempt_at: DateTime<Utc>,
pub last_error: String,
pub created_at: DateTime<Utc>,
pub resolved_at: Option<DateTime<Utc>>,
pub resolution: Option<String>,
}
#[async_trait]
pub trait DeadLetterRepo: Send + Sync {
/// Insert a new dead-letter row. Returns the assigned id.
async fn insert(&self, row: NewDeadLetter) -> Result<DeadLetterId, DeadLetterRepoError>;
async fn get(&self, id: DeadLetterId) -> Result<Option<DeadLetterRow>, DeadLetterRepoError>;
/// Lookup for the dashboard list view. `unresolved_only=true`
/// filters to `resolved_at IS NULL`.
async fn list_for_app(
&self,
app_id: AppId,
unresolved_only: bool,
limit: i64,
offset: i64,
) -> Result<Vec<DeadLetterRow>, DeadLetterRepoError>;
/// Hot path for the dashboard's per-app unresolved-count badge.
async fn unresolved_count(&self, app_id: AppId) -> Result<i64, DeadLetterRepoError>;
/// Mark the row resolved with the given reason. The reason MUST
/// be one of the four CHECK-constraint values
/// (`replayed`, `ignored`, `handled_by_script`, `handler_failed`).
async fn resolve(&self, id: DeadLetterId, reason: &str) -> Result<(), DeadLetterRepoError>;
/// Retention sweep. Deletes rows with `created_at < older_than`
/// up to `limit` at a time, using FOR UPDATE SKIP LOCKED to play
/// nicely with concurrent dispatchers. Returns the count deleted.
async fn gc(&self, older_than: DateTime<Utc>, limit: i64) -> Result<u64, DeadLetterRepoError>;
}
pub struct PostgresDeadLetterRepo {
pool: PgPool,
}
impl PostgresDeadLetterRepo {
#[must_use]
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
const ALLOWED_RESOLUTIONS: &[&str] =
&["replayed", "ignored", "handled_by_script", "handler_failed"];
#[async_trait]
impl DeadLetterRepo for PostgresDeadLetterRepo {
async fn insert(&self, row: NewDeadLetter) -> Result<DeadLetterId, DeadLetterRepoError> {
let (id,): (Uuid,) = sqlx::query_as(
"INSERT INTO dead_letters ( \
app_id, original_event_id, source, op, trigger_id, script_id, \
payload, attempt_count, first_attempt_at, last_attempt_at, last_error \
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \
RETURNING id",
)
.bind(row.app_id.into_inner())
.bind(row.original_event_id)
.bind(row.source)
.bind(row.op)
.bind(row.trigger_id.map(TriggerId::into_inner))
.bind(row.script_id.map(ScriptId::into_inner))
.bind(row.payload)
.bind(i32::try_from(row.attempt_count).unwrap_or(0))
.bind(row.first_attempt_at)
.bind(row.last_attempt_at)
.bind(row.last_error)
.fetch_one(&self.pool)
.await?;
Ok(id.into())
}
async fn get(&self, id: DeadLetterId) -> Result<Option<DeadLetterRow>, DeadLetterRepoError> {
let row: Option<DeadLetterRowRaw> = sqlx::query_as(
"SELECT id, app_id, original_event_id, source, op, trigger_id, script_id, \
payload, attempt_count, first_attempt_at, last_attempt_at, \
last_error, created_at, resolved_at, resolution \
FROM dead_letters WHERE id = $1",
)
.bind(id.into_inner())
.fetch_optional(&self.pool)
.await?;
Ok(row.map(DeadLetterRowRaw::into_row))
}
async fn list_for_app(
&self,
app_id: AppId,
unresolved_only: bool,
limit: i64,
offset: i64,
) -> Result<Vec<DeadLetterRow>, DeadLetterRepoError> {
let rows: Vec<DeadLetterRowRaw> = sqlx::query_as(
"SELECT id, app_id, original_event_id, source, op, trigger_id, script_id, \
payload, attempt_count, first_attempt_at, last_attempt_at, \
last_error, created_at, resolved_at, resolution \
FROM dead_letters \
WHERE app_id = $1 \
AND ($2::bool = FALSE OR resolved_at IS NULL) \
ORDER BY created_at DESC \
LIMIT $3 OFFSET $4",
)
.bind(app_id.into_inner())
.bind(unresolved_only)
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(DeadLetterRowRaw::into_row).collect())
}
async fn unresolved_count(&self, app_id: AppId) -> Result<i64, DeadLetterRepoError> {
let (count,): (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM dead_letters \
WHERE app_id = $1 AND resolved_at IS NULL",
)
.bind(app_id.into_inner())
.fetch_one(&self.pool)
.await?;
Ok(count)
}
async fn resolve(&self, id: DeadLetterId, reason: &str) -> Result<(), DeadLetterRepoError> {
if !ALLOWED_RESOLUTIONS.contains(&reason) {
return Err(DeadLetterRepoError::InvalidResolution(reason.to_string()));
}
let res = sqlx::query(
"UPDATE dead_letters \
SET resolution = $2, resolved_at = NOW() \
WHERE id = $1",
)
.bind(id.into_inner())
.bind(reason)
.execute(&self.pool)
.await?;
if res.rows_affected() == 0 {
return Err(DeadLetterRepoError::NotFound(id));
}
Ok(())
}
async fn gc(&self, older_than: DateTime<Utc>, limit: i64) -> Result<u64, DeadLetterRepoError> {
// Tombstones picked under FOR UPDATE SKIP LOCKED so concurrent
// sweepers (cluster mode) don't fight each other.
let res = sqlx::query(
"DELETE FROM dead_letters \
WHERE id IN ( \
SELECT id FROM dead_letters \
WHERE created_at < $1 \
FOR UPDATE SKIP LOCKED \
LIMIT $2 \
)",
)
.bind(older_than)
.bind(limit)
.execute(&self.pool)
.await?;
Ok(res.rows_affected())
}
}
#[derive(sqlx::FromRow)]
struct DeadLetterRowRaw {
id: Uuid,
app_id: Uuid,
original_event_id: Uuid,
source: String,
op: String,
trigger_id: Option<Uuid>,
script_id: Option<Uuid>,
payload: serde_json::Value,
attempt_count: i32,
first_attempt_at: DateTime<Utc>,
last_attempt_at: DateTime<Utc>,
last_error: String,
created_at: DateTime<Utc>,
resolved_at: Option<DateTime<Utc>>,
resolution: Option<String>,
}
impl DeadLetterRowRaw {
fn into_row(self) -> DeadLetterRow {
DeadLetterRow {
id: self.id.into(),
app_id: self.app_id.into(),
original_event_id: self.original_event_id,
source: self.source,
op: self.op,
trigger_id: self.trigger_id.map(Into::into),
script_id: self.script_id.map(Into::into),
payload: self.payload,
attempt_count: u32::try_from(self.attempt_count).unwrap_or(0),
first_attempt_at: self.first_attempt_at,
last_attempt_at: self.last_attempt_at,
last_error: self.last_error,
created_at: self.created_at,
resolved_at: self.resolved_at,
resolution: self.resolution,
}
}
}