Migrations 0008-0011 lay down the triggers framework's storage: - `triggers` + `kv_trigger_details` + `dead_letter_trigger_details` (Layout E, design notes §2). Parent table carries common columns including `registered_by_principal` — the dispatcher uses this to run the trigger as the user that registered it (design notes §4). - `outbox`: universal async dispatch substrate. KV/cron/pubsub/queue/email/dead-letter all write rows in the same shape; the dispatcher claims due rows via FOR UPDATE SKIP LOCKED. `reply_to` is the NATS-style inbox id for sync HTTP (commit 6) — its presence flags "don't retry" per the design. - `dead_letters`: exact schema from design notes §4 with the four-value `resolution` CHECK constraint (`replayed | ignored | handled_by_script | handler_failed`) and partial index on unresolved rows for the dashboard badge. - `abandoned_executions`: forensic table for the dispatcher's "tried to resolve a dropped inbox" edge case (design notes §3 #9). Repo surfaces with Postgres impls behind traits so unit tests can swap in-memory backings: - `TriggerRepo` — CRUD + the `list_matching_kv` / `list_matching_dead_letter` hot paths the dispatcher uses. Includes a `collection_matches` helper that handles `*`, `prefix:*`, and exact-name globs. - `OutboxRepo` — insert + claim-due + delete + reschedule. - `DeadLetterRepo` — insert + get + list + unresolved-count + resolve + GC. - `AbandonedRepo` — insert + GC. `TriggerConfig::from_env` (new module) follows the existing `SandboxCeiling` env-loading pattern for `PICLOUD_MAX_TRIGGER_DEPTH`, `PICLOUD_TRIGGER_RETRY_*`, `PICLOUD_DEAD_LETTER_RETENTION_DAYS`, and `PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS`. `Capability::AppManageTriggers(AppId)` and `AppDeadLetterManage(AppId)` join the enum. Both map onto the existing `Scope::AppAdmin` per the seven-scope commitment; `role_satisfies` grants them at the `AppAdmin` per-app role. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
158 lines
4.6 KiB
Rust
158 lines
4.6 KiB
Rust
//! Trigger-framework tunables. Defaults match design notes §3 (retry
//! policy) and §4 (retention). Each knob is env-overridable via a
//! `PICLOUD_*` variable, following the same warn-on-parse-error
//! (`tracing::warn`) pattern that `SandboxCeiling::from_env` uses.
|
|
|
|
use std::env;
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
|
#[serde(rename_all = "lowercase")]
|
|
pub enum BackoffShape {
|
|
Exponential,
|
|
Linear,
|
|
Constant,
|
|
}
|
|
|
|
impl BackoffShape {
|
|
#[must_use]
|
|
pub const fn as_str(self) -> &'static str {
|
|
match self {
|
|
Self::Exponential => "exponential",
|
|
Self::Linear => "linear",
|
|
Self::Constant => "constant",
|
|
}
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn from_wire(s: &str) -> Option<Self> {
|
|
match s {
|
|
"exponential" => Some(Self::Exponential),
|
|
"linear" => Some(Self::Linear),
|
|
"constant" => Some(Self::Constant),
|
|
_ => None,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy)]
|
|
pub struct TriggerConfig {
|
|
/// Maximum `cx.trigger_depth` before the dispatcher refuses
|
|
/// execution. Above this, the row is dropped + a metric bumped;
|
|
/// it is NOT dead-lettered (design notes §4: depth-exceeded
|
|
/// means "you built a loop"). Default 8.
|
|
pub max_trigger_depth: u32,
|
|
|
|
/// Default retry attempts (per-trigger override on the row).
|
|
pub retry_max_attempts: u32,
|
|
pub retry_backoff: BackoffShape,
|
|
pub retry_base_ms: u32,
|
|
/// ±jitter as a percentage of the computed delay. Applied at
|
|
/// dispatch time — not per-trigger.
|
|
pub retry_jitter_pct: u32,
|
|
|
|
/// dead-letter retention before GC, in days. Default 30.
|
|
pub dead_letter_retention_days: u32,
|
|
/// abandoned-execution retention before GC, in days. Default 7.
|
|
pub abandoned_retention_days: u32,
|
|
}
|
|
|
|
impl TriggerConfig {
|
|
#[must_use]
|
|
pub const fn conservative() -> Self {
|
|
Self {
|
|
max_trigger_depth: 8,
|
|
retry_max_attempts: 3,
|
|
retry_backoff: BackoffShape::Exponential,
|
|
retry_base_ms: 1000,
|
|
retry_jitter_pct: 20,
|
|
dead_letter_retention_days: 30,
|
|
abandoned_retention_days: 7,
|
|
}
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn from_env() -> Self {
|
|
let mut c = Self::conservative();
|
|
load_u32(&mut c.max_trigger_depth, "PICLOUD_MAX_TRIGGER_DEPTH");
|
|
load_u32(
|
|
&mut c.retry_max_attempts,
|
|
"PICLOUD_TRIGGER_RETRY_MAX_ATTEMPTS",
|
|
);
|
|
load_backoff(&mut c.retry_backoff, "PICLOUD_TRIGGER_RETRY_BACKOFF");
|
|
load_u32(&mut c.retry_base_ms, "PICLOUD_TRIGGER_RETRY_BASE_MS");
|
|
load_u32(&mut c.retry_jitter_pct, "PICLOUD_TRIGGER_RETRY_JITTER_PCT");
|
|
load_u32(
|
|
&mut c.dead_letter_retention_days,
|
|
"PICLOUD_DEAD_LETTER_RETENTION_DAYS",
|
|
);
|
|
load_u32(
|
|
&mut c.abandoned_retention_days,
|
|
"PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS",
|
|
);
|
|
c
|
|
}
|
|
}
|
|
|
|
impl Default for TriggerConfig {
|
|
fn default() -> Self {
|
|
Self::conservative()
|
|
}
|
|
}
|
|
|
|
fn load_u32(dst: &mut u32, key: &str) {
|
|
if let Ok(v) = env::var(key) {
|
|
match v.parse::<u32>() {
|
|
Ok(n) => *dst = n,
|
|
Err(e) => {
|
|
tracing::warn!(env = key, error = %e, "ignoring invalid trigger-config value");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
fn load_backoff(dst: &mut BackoffShape, key: &str) {
|
|
if let Ok(v) = env::var(key) {
|
|
match BackoffShape::from_wire(&v) {
|
|
Some(b) => *dst = b,
|
|
None => {
|
|
tracing::warn!(
|
|
env = key,
|
|
value = %v,
|
|
"ignoring invalid trigger-config backoff shape (use exponential|linear|constant)"
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins every compiled-in default so a drift from the design notes
    /// shows up as a test failure.
    #[test]
    fn conservative_defaults_match_design_notes() {
        let TriggerConfig {
            max_trigger_depth,
            retry_max_attempts,
            retry_backoff,
            retry_base_ms,
            retry_jitter_pct,
            dead_letter_retention_days,
            abandoned_retention_days,
        } = TriggerConfig::conservative();

        assert_eq!(max_trigger_depth, 8);
        assert_eq!(retry_max_attempts, 3);
        assert_eq!(retry_backoff, BackoffShape::Exponential);
        assert_eq!(retry_base_ms, 1000);
        assert_eq!(retry_jitter_pct, 20);
        assert_eq!(dead_letter_retention_days, 30);
        assert_eq!(abandoned_retention_days, 7);
    }

    /// Every shape survives `as_str` -> `from_wire`; unknown names are
    /// rejected.
    #[test]
    fn backoff_round_trips() {
        let shapes = [
            BackoffShape::Exponential,
            BackoffShape::Linear,
            BackoffShape::Constant,
        ];
        for shape in shapes.iter().copied() {
            let wire = shape.as_str();
            assert_eq!(BackoffShape::from_wire(wire), Some(shape));
        }

        assert_eq!(BackoffShape::from_wire("garbage"), None);
    }
}
|