From 545d86319957199506962ee7e94b9dc1fd9687ce Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Mon, 1 Jun 2026 21:46:45 +0200 Subject: [PATCH] feat(v1.1.1-triggers): triggers + outbox schema + repos MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrations 0008-0011 lay down the triggers framework's storage: - `triggers` + `kv_trigger_details` + `dead_letter_trigger_details` (Layout E, design notes §2). Parent table carries common columns including `registered_by_principal` — the dispatcher uses this to run the trigger as the user that registered it (design notes §4). - `outbox`: universal async dispatch substrate. KV/cron/pubsub/queue/ email/dead-letter all write rows in the same shape; the dispatcher claims due rows via FOR UPDATE SKIP LOCKED. `reply_to` is the NATS-style inbox id for sync HTTP (commit 6) — its presence flags "don't retry" per the design. - `dead_letters`: exact schema from design notes §4 with the four- value `resolution` CHECK constraint (`replayed | ignored | handled_by_script | handler_failed`) and partial index on unresolved rows for the dashboard badge. - `abandoned_executions`: forensic table for the dispatcher's "tried to resolve a dropped inbox" edge case (design notes §3 #9). Repo surfaces with Postgres impls behind traits so unit tests can swap in-memory backings: - `TriggerRepo` — CRUD + the `list_matching_kv` / `list_matching_dead_letter` hot paths the dispatcher uses. Includes a `collection_matches` helper that handles `*`, `prefix:*`, and exact-name globs. - `OutboxRepo` — insert + claim-due + delete + reschedule. - `DeadLetterRepo` — insert + get + list + unresolved-count + resolve + GC. - `AbandonedRepo` — insert + GC. `TriggerConfig::from_env` (new module) follows the existing `SandboxCeiling` env-loading pattern for `PICLOUD_MAX_TRIGGER_DEPTH`, `PICLOUD_TRIGGER_RETRY_*`, `PICLOUD_DEAD_LETTER_RETENTION_DAYS`, and `PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS`. 
`Capability::AppManageTriggers(AppId)` and `AppDeadLetterManage(AppId)` join the enum. Both map onto the existing `Scope::AppAdmin` per the seven-scope commitment; `role_satisfies` grants them at the `AppAdmin` per-app role. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../manager-core/migrations/0008_triggers.sql | 72 ++ .../manager-core/migrations/0009_outbox.sql | 64 ++ .../migrations/0010_dead_letters.sql | 50 ++ .../migrations/0011_abandoned_executions.sql | 31 + crates/manager-core/src/abandoned_repo.rs | 128 ++++ crates/manager-core/src/authz.rs | 21 +- crates/manager-core/src/dead_letter_repo.rs | 261 ++++++++ crates/manager-core/src/lib.rs | 20 + crates/manager-core/src/outbox_repo.rs | 233 +++++++ crates/manager-core/src/trigger_config.rs | 157 +++++ crates/manager-core/src/trigger_repo.rs | 617 ++++++++++++++++++ 11 files changed, 1651 insertions(+), 3 deletions(-) create mode 100644 crates/manager-core/migrations/0008_triggers.sql create mode 100644 crates/manager-core/migrations/0009_outbox.sql create mode 100644 crates/manager-core/migrations/0010_dead_letters.sql create mode 100644 crates/manager-core/migrations/0011_abandoned_executions.sql create mode 100644 crates/manager-core/src/abandoned_repo.rs create mode 100644 crates/manager-core/src/dead_letter_repo.rs create mode 100644 crates/manager-core/src/outbox_repo.rs create mode 100644 crates/manager-core/src/trigger_config.rs create mode 100644 crates/manager-core/src/trigger_repo.rs diff --git a/crates/manager-core/migrations/0008_triggers.sql b/crates/manager-core/migrations/0008_triggers.sql new file mode 100644 index 0000000..4cf65a1 --- /dev/null +++ b/crates/manager-core/migrations/0008_triggers.sql @@ -0,0 +1,72 @@ +-- v1.1.1: Trigger framework — Layout E (design notes §2 + §7). +-- +-- A parent `triggers` table holds the common columns (script_id, retry +-- config, dispatch_mode, registered-by principal); per-kind detail +-- tables hold the kind-specific filter columns. 
v1.1.1 ships two +-- kinds: KV (collection_glob + ops) and dead_letter (source / trigger +-- / script filters). Future kinds (cron, pubsub, queue, email) extend +-- the parent and add their own detail table. +-- +-- `registered_by_principal` captures the admin user that registered +-- the trigger. The dispatcher resolves this back to a `Principal` at +-- execution time so the trigger runs as the user that set it up +-- (design notes §4: "a trigger execution runs as the principal that +-- registered the trigger"). +-- +-- HTTP routes stay in their own `routes` table for now (Phase 3 +-- production schema with its own trie-index columns); the dispatcher +-- discriminates HTTP outbox rows by `source_kind = 'http'` and +-- `trigger_id` referencing `routes.id`. Folding routes into triggers +-- is a v1.2 cleanup, not a v1.1.1 requirement. + +CREATE TABLE triggers ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE, + script_id UUID NOT NULL REFERENCES scripts(id) ON DELETE CASCADE, + kind TEXT NOT NULL CHECK (kind IN ('kv', 'dead_letter')), + enabled BOOLEAN NOT NULL DEFAULT TRUE, + -- Async by default — sync would mean the trigger fires inline with + -- the originating mutation, which v1.1.1 doesn't support. + dispatch_mode TEXT NOT NULL DEFAULT 'async' + CHECK (dispatch_mode IN ('sync', 'async')), + -- Defaults applied at write time so the row is auditable on its + -- own. Per-trigger overrides set on create; the env-defined + -- defaults provide the fallback values. + retry_max_attempts INT NOT NULL, + retry_backoff TEXT NOT NULL + CHECK (retry_backoff IN ('exponential', 'linear', 'constant')), + retry_base_ms INT NOT NULL, + registered_by_principal UUID NOT NULL REFERENCES admin_users(id) ON DELETE CASCADE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- The dispatcher's hot lookup: "all enabled triggers for app X of +-- kind Y". 
Indexed only when enabled = TRUE so disabled rows don't +-- pollute the index. +CREATE INDEX idx_triggers_app_kind_enabled + ON triggers (app_id, kind) + WHERE enabled = TRUE; + +-- One row per KV trigger. `collection_glob` accepts: +-- "*" — any collection in the app +-- "widgets" — exact match +-- "users:*" — prefix wildcard (matched in Rust, not SQL) +-- `ops` is the subset of {insert, update, delete} this trigger +-- subscribes to. Empty array means "any op" (the trigger fires on +-- every mutation; admin endpoint validates this). +CREATE TABLE kv_trigger_details ( + trigger_id UUID PRIMARY KEY REFERENCES triggers(id) ON DELETE CASCADE, + collection_glob TEXT NOT NULL, + ops TEXT[] NOT NULL +); + +-- One row per dead-letter trigger. All three filter columns are +-- nullable — NULL means "no filter on this dimension". A trigger +-- with all three filters NULL fires on every dead-letter row. +CREATE TABLE dead_letter_trigger_details ( + trigger_id UUID PRIMARY KEY REFERENCES triggers(id) ON DELETE CASCADE, + source_filter TEXT, + trigger_id_filter UUID, + script_id_filter UUID +); diff --git a/crates/manager-core/migrations/0009_outbox.sql b/crates/manager-core/migrations/0009_outbox.sql new file mode 100644 index 0000000..8c05680 --- /dev/null +++ b/crates/manager-core/migrations/0009_outbox.sql @@ -0,0 +1,64 @@ +-- v1.1.1: Universal trigger outbox — design notes §2. +-- +-- One table for every async dispatch in the system. KV/cron/pubsub/ +-- queue/email/dead-letter all write rows in this shape; the dispatcher +-- claims due rows with `FOR UPDATE SKIP LOCKED` and routes them to +-- the executor. +-- +-- Sync HTTP also writes here (NATS-style inbox, design notes §3) — +-- `reply_to` carries an `inbox_id` that the orchestrator awaits on a +-- oneshot channel. `reply_to.is_some()` is the "don't retry" signal: +-- one attempt, surface the result via the inbox.
+-- +-- `trigger_id` is a polymorphic reference discriminated by +-- `source_kind`: for `source_kind='http'` it references `routes.id`; +-- otherwise it references `triggers.id`. Polymorphism handled in +-- Rust (the dispatcher); no DB-level FK because Postgres doesn't +-- support polymorphic FKs cleanly. NULL is allowed because direct +-- admin-replay paths may not have a triggering row at all. +-- +-- `script_id` denormalized so the dispatcher resolves the target +-- script without an extra round-trip per row. + +CREATE TABLE outbox ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE, + source_kind TEXT NOT NULL + CHECK (source_kind IN ('http', 'kv', 'dead_letter')), + -- Polymorphic — see comment above. No FK constraint. + trigger_id UUID, + -- Pre-resolved at write time so the dispatcher doesn't re-look it up. + script_id UUID, + -- NULL = async (retry per policy). Some(inbox_id) = sync HTTP + -- (never retry; resolve the inbox with the result). + reply_to UUID, + -- ServiceEvent + ExecRequest scaffold serialized as JSONB. + payload JSONB NOT NULL, + -- Forensic field — the principal that triggered the originating + -- event. NOT the execution principal for trigger fan-out (that + -- comes from `triggers.registered_by_principal`). + origin_principal UUID, + -- Trigger-depth as the dispatcher will hand it to the executor. + -- Read out into ExecRequest.trigger_depth at dispatch time. + trigger_depth INT NOT NULL DEFAULT 0, + -- Originating execution id (for audit log grouping). Equals the + -- root for direct invocations; preserved across fan-out chains. + root_execution_id UUID, + attempt_count INT NOT NULL DEFAULT 0, + next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + -- Set inside the SELECT FOR UPDATE SKIP LOCKED transaction so + -- the dispatcher can't double-pick a row across concurrent loop + -- iterations. 
+ claimed_at TIMESTAMPTZ, + claimed_by TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Hot index: the dispatcher's `WHERE next_attempt_at <= NOW() AND +-- claimed_at IS NULL` claim query. Partial index keeps the hot set +-- small even if the table grows large. +CREATE INDEX idx_outbox_due + ON outbox (next_attempt_at) + WHERE claimed_at IS NULL; + +CREATE INDEX idx_outbox_app ON outbox (app_id); diff --git a/crates/manager-core/migrations/0010_dead_letters.sql b/crates/manager-core/migrations/0010_dead_letters.sql new file mode 100644 index 0000000..89ddfd2 --- /dev/null +++ b/crates/manager-core/migrations/0010_dead_letters.sql @@ -0,0 +1,50 @@ +-- v1.1.1: dead_letters — design notes §4. +-- +-- Async invocations that exhaust their retry policy land here. Each +-- row carries the original event payload verbatim plus the attempt +-- history so handlers (registered via `dead_letter` triggers) and the +-- dashboard can decide what to do. +-- +-- Schema mirrors design notes §4. The CHECK constraint on +-- `resolution` enforces the closed vocabulary used by both the SDK +-- (`dead_letters::resolve(id, reason)`) and the recursion-stop rule +-- (`handler_failed`). Sync HTTP failures (`reply_to.is_some()`) never +-- land here — they're served via the inbox channel. +-- +-- Indexes: +-- - partial index on unresolved rows: the dashboard's +-- unresolved-count badge query (`COUNT(*) WHERE app_id = $1 AND +-- resolved_at IS NULL`). +-- - GC index on `created_at`: the weekly retention sweep. + +CREATE TABLE dead_letters ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE, + -- The outbox.id row that exhausted retries. The outbox row itself + -- has been deleted at this point. + original_event_id UUID NOT NULL, + source TEXT NOT NULL, + op TEXT NOT NULL, + -- Nullable because direct admin replays may have no trigger row. 
+ trigger_id UUID, + script_id UUID, + payload JSONB NOT NULL, + attempt_count INT NOT NULL, + first_attempt_at TIMESTAMPTZ NOT NULL, + last_attempt_at TIMESTAMPTZ NOT NULL, + last_error TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + resolved_at TIMESTAMPTZ, + resolution TEXT + CHECK (resolution IN + ('replayed', 'ignored', 'handled_by_script', 'handler_failed')) +); + +-- Dashboard unresolved-count badge — partial index on the predicate +-- the query uses. +CREATE INDEX idx_dead_letters_app_unresolved + ON dead_letters (app_id) + WHERE resolved_at IS NULL; + +-- GC sweep scans by creation time. +CREATE INDEX idx_dead_letters_gc ON dead_letters (created_at); diff --git a/crates/manager-core/migrations/0011_abandoned_executions.sql b/crates/manager-core/migrations/0011_abandoned_executions.sql new file mode 100644 index 0000000..e83d5e7 --- /dev/null +++ b/crates/manager-core/migrations/0011_abandoned_executions.sql @@ -0,0 +1,31 @@ +-- v1.1.1: abandoned_executions — design notes §3 #9. +-- +-- Forensic table for the "dispatcher tried to resolve a oneshot inbox +-- but the receiver was already dropped" edge case. The orchestrator +-- timed out (returned 504 to the caller) and gave up on the channel, +-- but then the dispatcher's execution succeeded later. The caller +-- never sees the result; the row exists so the operator can +-- correlate when the abandoned-counter metric spikes. +-- +-- Only the dispatcher-after-orchestrator-timeout edge case writes +-- here; ordinary "script timed out, caller got 504" stays uneventful. +-- +-- 7-day retention, GC by `created_at`, sweep alongside dead_letters. + +CREATE TABLE abandoned_executions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE, + -- Original outbox row id (the row itself has been deleted). + outbox_id UUID NOT NULL, + script_id UUID, + -- The inbox channel id the dispatcher tried to resolve. 
+ inbox_id UUID NOT NULL, + -- The HTTP status code the dispatcher attempted to send back. + status_code INT NOT NULL, + -- Truncated body / error description (capped at write time — + -- the dispatcher doesn't need to ship megabytes here). + result_summary TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_abandoned_executions_gc ON abandoned_executions (created_at); diff --git a/crates/manager-core/src/abandoned_repo.rs b/crates/manager-core/src/abandoned_repo.rs new file mode 100644 index 0000000..4637276 --- /dev/null +++ b/crates/manager-core/src/abandoned_repo.rs @@ -0,0 +1,128 @@ +//! `AbandonedRepo` — repo over the forensic table written by the +//! dispatcher when it tries to resolve a sync-HTTP inbox channel +//! that's already been dropped (orchestrator timed out and gave up). +//! +//! Schema: see `migrations/0011_abandoned_executions.sql`. +//! +//! Tiny surface: insert + GC. Reading happens via direct SQL when +//! correlating the metric counter spike. + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use picloud_shared::{AppId, ScriptId}; +use sqlx::PgPool; +use uuid::Uuid; + +#[derive(Debug, thiserror::Error)] +pub enum AbandonedRepoError { + #[error("database error: {0}")] + Db(#[from] sqlx::Error), +} + +#[derive(Debug, Clone)] +pub struct NewAbandonedExecution { + pub app_id: AppId, + pub outbox_id: Uuid, + pub script_id: Option, + pub inbox_id: Uuid, + pub status_code: u16, + pub result_summary: Option, +} + +#[async_trait] +pub trait AbandonedRepo: Send + Sync { + async fn insert(&self, row: NewAbandonedExecution) -> Result; + + /// Retention sweep — deletes rows older than `older_than` up to + /// `limit` at a time.
+ async fn gc(&self, older_than: DateTime, limit: i64) -> Result; +} + +pub struct PostgresAbandonedRepo { + pool: PgPool, +} + +impl PostgresAbandonedRepo { + #[must_use] + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +const SUMMARY_CAP_BYTES: usize = 4096; + +#[async_trait] +impl AbandonedRepo for PostgresAbandonedRepo { + async fn insert(&self, row: NewAbandonedExecution) -> Result { + // Truncate the summary at write-time. The forensic table + // doesn't need megabytes; the original outbox row may have + // been arbitrary size but we lose nothing useful by clipping. + let summary = row.result_summary.map(|s| truncate(s, SUMMARY_CAP_BYTES)); + let (id,): (Uuid,) = sqlx::query_as( + "INSERT INTO abandoned_executions ( \ + app_id, outbox_id, script_id, inbox_id, status_code, result_summary \ + ) VALUES ($1, $2, $3, $4, $5, $6) \ + RETURNING id", + ) + .bind(row.app_id.into_inner()) + .bind(row.outbox_id) + .bind(row.script_id.map(ScriptId::into_inner)) + .bind(row.inbox_id) + .bind(i32::from(row.status_code)) + .bind(summary) + .fetch_one(&self.pool) + .await?; + Ok(id) + } + + async fn gc(&self, older_than: DateTime, limit: i64) -> Result { + let res = sqlx::query( + "DELETE FROM abandoned_executions \ + WHERE id IN ( \ + SELECT id FROM abandoned_executions \ + WHERE created_at < $1 \ + FOR UPDATE SKIP LOCKED \ + LIMIT $2 \ + )", + ) + .bind(older_than) + .bind(limit) + .execute(&self.pool) + .await?; + Ok(res.rows_affected()) + } +} + +fn truncate(mut s: String, max_bytes: usize) -> String { + if s.len() <= max_bytes { + return s; + } + // Walk back from `max_bytes` to a UTF-8 char boundary so we never + // panic on `truncate` mid-codepoint. + let mut cut = max_bytes; + while cut > 0 && !s.is_char_boundary(cut) { + cut -= 1; + } + s.truncate(cut); + s +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn truncate_respects_char_boundaries() { + // `é` is a 2-byte UTF-8 char; a cap inside it should walk + // back to its start.
+ let s = "héllo".to_string(); + let t = truncate(s, 2); + assert!(t.is_char_boundary(t.len())); + assert_eq!(t, "h"); + } + + #[test] + fn truncate_passthrough_for_short_strings() { + assert_eq!(truncate("ok".into(), 100), "ok"); + } +} diff --git a/crates/manager-core/src/authz.rs b/crates/manager-core/src/authz.rs index a19a055..e241186 100644 --- a/crates/manager-core/src/authz.rs +++ b/crates/manager-core/src/authz.rs @@ -64,6 +64,14 @@ pub enum Capability { /// Write entries to this app's KV store (v1.1.1). Granted to /// `editor`+. Maps to `script:write` on API keys. AppKvWrite(AppId), + /// Create / list / delete triggers for this app (v1.1.1). Maps to + /// `app:admin` on API keys — triggers are app-configuration acts + /// rather than data-plane access. Granted to `app_admin`+. + AppManageTriggers(AppId), + /// Replay / resolve dead-letter rows for this app (v1.1.1). Maps + /// to `app:admin` on API keys. Public-HTTP scripts (principal None) + /// fail this check — managing dead letters is an admin act. 
+ AppDeadLetterManage(AppId), } impl Capability { @@ -82,7 +90,9 @@ impl Capability { | Self::AppAdmin(id) | Self::AppLogRead(id) | Self::AppKvRead(id) - | Self::AppKvWrite(id) => Some(id), + | Self::AppKvWrite(id) + | Self::AppManageTriggers(id) + | Self::AppDeadLetterManage(id) => Some(id), } } @@ -101,7 +111,9 @@ impl Capability { Self::AppWriteScript(_) | Self::AppKvWrite(_) => Scope::ScriptWrite, Self::AppWriteRoute(_) => Scope::RouteWrite, Self::AppManageDomains(_) => Scope::DomainManage, - Self::AppAdmin(_) => Scope::AppAdmin, + Self::AppAdmin(_) | Self::AppManageTriggers(_) | Self::AppDeadLetterManage(_) => { + Scope::AppAdmin + } Self::AppLogRead(_) => Scope::LogRead, } } @@ -253,7 +265,10 @@ const fn role_satisfies(role: AppRole, cap: Capability) -> bool { let in_app_admin = in_editor || matches!( cap, - Capability::AppManageDomains(_) | Capability::AppAdmin(_) + Capability::AppManageDomains(_) + | Capability::AppAdmin(_) + | Capability::AppManageTriggers(_) + | Capability::AppDeadLetterManage(_) ); match role { AppRole::Viewer => in_viewer, diff --git a/crates/manager-core/src/dead_letter_repo.rs b/crates/manager-core/src/dead_letter_repo.rs new file mode 100644 index 0000000..8ee53b3 --- /dev/null +++ b/crates/manager-core/src/dead_letter_repo.rs @@ -0,0 +1,261 @@ +//! `DeadLetterRepo` — CRUD over the `dead_letters` table. +//! +//! The dispatcher writes new rows when an async trigger exhausts its +//! retry policy. Admin endpoints (commit 8) read for the dashboard +//! list view and write to mark rows resolved or replay them. The GC +//! sweeper (commit 10) deletes expired rows by `created_at`. 
+ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use picloud_shared::{AppId, DeadLetterId, ScriptId, TriggerId}; +use sqlx::PgPool; +use uuid::Uuid; + +#[derive(Debug, thiserror::Error)] +pub enum DeadLetterRepoError { + #[error("database error: {0}")] + Db(#[from] sqlx::Error), + + #[error("dead-letter row not found: {0}")] + NotFound(DeadLetterId), + + #[error("invalid resolution {0:?}")] + InvalidResolution(String), +} + +#[derive(Debug, Clone)] +pub struct NewDeadLetter { + pub app_id: AppId, + /// `outbox.id` that exhausted retries. Outbox row deleted at the + /// same time. + pub original_event_id: Uuid, + pub source: String, + pub op: String, + pub trigger_id: Option, + pub script_id: Option, + pub payload: serde_json::Value, + pub attempt_count: u32, + pub first_attempt_at: DateTime, + pub last_attempt_at: DateTime, + pub last_error: String, +} + +#[derive(Debug, Clone)] +pub struct DeadLetterRow { + pub id: DeadLetterId, + pub app_id: AppId, + pub original_event_id: Uuid, + pub source: String, + pub op: String, + pub trigger_id: Option, + pub script_id: Option, + pub payload: serde_json::Value, + pub attempt_count: u32, + pub first_attempt_at: DateTime, + pub last_attempt_at: DateTime, + pub last_error: String, + pub created_at: DateTime, + pub resolved_at: Option>, + pub resolution: Option, +} + +#[async_trait] +pub trait DeadLetterRepo: Send + Sync { + /// Insert a new dead-letter row. Returns the assigned id. + async fn insert(&self, row: NewDeadLetter) -> Result; + + async fn get(&self, id: DeadLetterId) -> Result, DeadLetterRepoError>; + + /// Lookup for the dashboard list view. `unresolved_only=true` + /// filters to `resolved_at IS NULL`. + async fn list_for_app( + &self, + app_id: AppId, + unresolved_only: bool, + limit: i64, + offset: i64, + ) -> Result, DeadLetterRepoError>; + + /// Hot path for the dashboard's per-app unresolved-count badge. 
+ async fn unresolved_count(&self, app_id: AppId) -> Result; + + /// Mark the row resolved with the given reason. The reason MUST + /// be one of the four CHECK-constraint values + /// (`replayed`, `ignored`, `handled_by_script`, `handler_failed`). + async fn resolve(&self, id: DeadLetterId, reason: &str) -> Result<(), DeadLetterRepoError>; + + /// Retention sweep. Deletes rows with `created_at < older_than` + /// up to `limit` at a time, using FOR UPDATE SKIP LOCKED to play + /// nicely with concurrent dispatchers. Returns the count deleted. + async fn gc(&self, older_than: DateTime, limit: i64) -> Result; +} + +pub struct PostgresDeadLetterRepo { + pool: PgPool, +} + +impl PostgresDeadLetterRepo { + #[must_use] + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +const ALLOWED_RESOLUTIONS: &[&str] = + &["replayed", "ignored", "handled_by_script", "handler_failed"]; + +#[async_trait] +impl DeadLetterRepo for PostgresDeadLetterRepo { + async fn insert(&self, row: NewDeadLetter) -> Result { + let (id,): (Uuid,) = sqlx::query_as( + "INSERT INTO dead_letters ( \ + app_id, original_event_id, source, op, trigger_id, script_id, \ + payload, attempt_count, first_attempt_at, last_attempt_at, last_error \ + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \ + RETURNING id", + ) + .bind(row.app_id.into_inner()) + .bind(row.original_event_id) + .bind(row.source) + .bind(row.op) + .bind(row.trigger_id.map(TriggerId::into_inner)) + .bind(row.script_id.map(ScriptId::into_inner)) + .bind(row.payload) + .bind(i32::try_from(row.attempt_count).unwrap_or(0)) + .bind(row.first_attempt_at) + .bind(row.last_attempt_at) + .bind(row.last_error) + .fetch_one(&self.pool) + .await?; + Ok(id.into()) + } + + async fn get(&self, id: DeadLetterId) -> Result, DeadLetterRepoError> { + let row: Option = sqlx::query_as( + "SELECT id, app_id, original_event_id, source, op, trigger_id, script_id, \ + payload, attempt_count, first_attempt_at, last_attempt_at, \ + last_error, 
created_at, resolved_at, resolution \ + FROM dead_letters WHERE id = $1", + ) + .bind(id.into_inner()) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(DeadLetterRowRaw::into_row)) + } + + async fn list_for_app( + &self, + app_id: AppId, + unresolved_only: bool, + limit: i64, + offset: i64, + ) -> Result, DeadLetterRepoError> { + let rows: Vec = sqlx::query_as( + "SELECT id, app_id, original_event_id, source, op, trigger_id, script_id, \ + payload, attempt_count, first_attempt_at, last_attempt_at, \ + last_error, created_at, resolved_at, resolution \ + FROM dead_letters \ + WHERE app_id = $1 \ + AND ($2::bool = FALSE OR resolved_at IS NULL) \ + ORDER BY created_at DESC \ + LIMIT $3 OFFSET $4", + ) + .bind(app_id.into_inner()) + .bind(unresolved_only) + .bind(limit) + .bind(offset) + .fetch_all(&self.pool) + .await?; + Ok(rows.into_iter().map(DeadLetterRowRaw::into_row).collect()) + } + + async fn unresolved_count(&self, app_id: AppId) -> Result { + let (count,): (i64,) = sqlx::query_as( + "SELECT COUNT(*) FROM dead_letters \ + WHERE app_id = $1 AND resolved_at IS NULL", + ) + .bind(app_id.into_inner()) + .fetch_one(&self.pool) + .await?; + Ok(count) + } + + async fn resolve(&self, id: DeadLetterId, reason: &str) -> Result<(), DeadLetterRepoError> { + if !ALLOWED_RESOLUTIONS.contains(&reason) { + return Err(DeadLetterRepoError::InvalidResolution(reason.to_string())); + } + let res = sqlx::query( + "UPDATE dead_letters \ + SET resolution = $2, resolved_at = NOW() \ + WHERE id = $1", + ) + .bind(id.into_inner()) + .bind(reason) + .execute(&self.pool) + .await?; + if res.rows_affected() == 0 { + return Err(DeadLetterRepoError::NotFound(id)); + } + Ok(()) + } + + async fn gc(&self, older_than: DateTime, limit: i64) -> Result { + // Tombstones picked under FOR UPDATE SKIP LOCKED so concurrent + // sweepers (cluster mode) don't fight each other. 
+ let res = sqlx::query( + "DELETE FROM dead_letters \ + WHERE id IN ( \ + SELECT id FROM dead_letters \ + WHERE created_at < $1 \ + FOR UPDATE SKIP LOCKED \ + LIMIT $2 \ + )", + ) + .bind(older_than) + .bind(limit) + .execute(&self.pool) + .await?; + Ok(res.rows_affected()) + } +} + +#[derive(sqlx::FromRow)] +struct DeadLetterRowRaw { + id: Uuid, + app_id: Uuid, + original_event_id: Uuid, + source: String, + op: String, + trigger_id: Option, + script_id: Option, + payload: serde_json::Value, + attempt_count: i32, + first_attempt_at: DateTime, + last_attempt_at: DateTime, + last_error: String, + created_at: DateTime, + resolved_at: Option>, + resolution: Option, +} + +impl DeadLetterRowRaw { + fn into_row(self) -> DeadLetterRow { + DeadLetterRow { + id: self.id.into(), + app_id: self.app_id.into(), + original_event_id: self.original_event_id, + source: self.source, + op: self.op, + trigger_id: self.trigger_id.map(Into::into), + script_id: self.script_id.map(Into::into), + payload: self.payload, + attempt_count: u32::try_from(self.attempt_count).unwrap_or(0), + first_attempt_at: self.first_attempt_at, + last_attempt_at: self.last_attempt_at, + last_error: self.last_error, + created_at: self.created_at, + resolved_at: self.resolved_at, + resolution: self.resolution, + } + } +} diff --git a/crates/manager-core/src/lib.rs b/crates/manager-core/src/lib.rs index b308916..3515684 100644 --- a/crates/manager-core/src/lib.rs +++ b/crates/manager-core/src/lib.rs @@ -4,6 +4,7 @@ //! the same DB for now; once we add caching and per-node ingress, the //! manager will publish change events. 
+pub mod abandoned_repo; pub mod admin_session_repo; pub mod admin_user_repo; pub mod admin_users_api; @@ -21,16 +22,23 @@ pub mod auth_api; pub mod auth_bootstrap; pub mod auth_middleware; pub mod authz; +pub mod dead_letter_repo; pub mod kv_repo; pub mod kv_service; pub mod log_sink; pub mod migrations; +pub mod outbox_repo; pub mod repo; pub mod route_admin; pub mod route_repo; pub mod sandbox; pub mod scheduler; +pub mod trigger_config; +pub mod trigger_repo; +pub use abandoned_repo::{ + AbandonedRepo, AbandonedRepoError, NewAbandonedExecution, PostgresAbandonedRepo, +}; pub use admin_session_repo::{ AdminSessionLookup, AdminSessionRepository, AdminSessionRepositoryError, PostgresAdminSessionRepository, @@ -65,9 +73,15 @@ pub use auth_middleware::{ API_KEY_PREFIX, API_KEY_PREFIX_LEN, SESSION_COOKIE, }; pub use authz::{can, require, AuthzDenied, AuthzError, AuthzRepo, Capability, Decision}; +pub use dead_letter_repo::{ + DeadLetterRepo, DeadLetterRepoError, DeadLetterRow, NewDeadLetter, PostgresDeadLetterRepo, +}; pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo}; pub use kv_service::KvServiceImpl; pub use log_sink::PostgresExecutionLogSink; +pub use outbox_repo::{ + NewOutboxRow, OutboxRepo, OutboxRepoError, OutboxRow, OutboxSourceKind, PostgresOutboxRepo, +}; pub use repo::{ ExecutionLogRepository, NewScript, PostgresExecutionLogRepository, PostgresScriptRepository, RepoResolver, ScriptPatch, ScriptRepository, ScriptRepositoryError, @@ -75,3 +89,9 @@ pub use repo::{ pub use route_admin::{compile_routes, route_admin_router, RouteAdminState}; pub use route_repo::{NewRoute, PostgresRouteRepository, RouteRepository}; pub use sandbox::{CeilingError, SandboxCeiling}; +pub use trigger_config::{BackoffShape, TriggerConfig}; +pub use trigger_repo::{ + collection_matches, CreateDeadLetterTrigger, CreateKvTrigger, DeadLetterTriggerMatch, + KvTriggerMatch, PostgresTriggerRepo, Trigger, TriggerDetails, TriggerDispatchMode, TriggerKind, + TriggerRepo, TriggerRepoError, 
+}; diff --git a/crates/manager-core/src/outbox_repo.rs b/crates/manager-core/src/outbox_repo.rs new file mode 100644 index 0000000..42d4812 --- /dev/null +++ b/crates/manager-core/src/outbox_repo.rs @@ -0,0 +1,233 @@ +//! `OutboxRepo` — universal trigger outbox CRUD. Hot writes come from +//! the `OutboxEventEmitter` (KV mutations fan out via this) and the +//! sync-HTTP path. Hot reads come from the dispatcher, which claims +//! due rows via `FOR UPDATE SKIP LOCKED`. + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use picloud_shared::{AdminUserId, AppId, ExecutionId, ScriptId, TriggerId}; +use sqlx::PgPool; +use uuid::Uuid; + +#[derive(Debug, thiserror::Error)] +pub enum OutboxRepoError { + #[error("database error: {0}")] + Db(#[from] sqlx::Error), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OutboxSourceKind { + Http, + Kv, + DeadLetter, +} + +impl OutboxSourceKind { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Http => "http", + Self::Kv => "kv", + Self::DeadLetter => "dead_letter", + } + } + + #[must_use] + pub fn from_wire(s: &str) -> Option { + match s { + "http" => Some(Self::Http), + "kv" => Some(Self::Kv), + "dead_letter" => Some(Self::DeadLetter), + _ => None, + } + } +} + +/// Insert payload — what each event source writes when fanning out +/// to the outbox. `payload` is the serialized `TriggerEvent` (plus +/// any extra context the dispatcher needs to reconstruct an +/// `ExecRequest`). +#[derive(Debug, Clone)] +pub struct NewOutboxRow { + pub app_id: AppId, + pub source_kind: OutboxSourceKind, + pub trigger_id: Option, + pub script_id: Option, + pub reply_to: Option, + pub payload: serde_json::Value, + pub origin_principal: Option, + pub trigger_depth: u32, + pub root_execution_id: Option, +} + +/// Row as the dispatcher sees it after a claim. 
+#[derive(Debug, Clone)] +pub struct OutboxRow { + pub id: Uuid, + pub app_id: AppId, + pub source_kind: OutboxSourceKind, + pub trigger_id: Option, + pub script_id: Option, + pub reply_to: Option, + pub payload: serde_json::Value, + pub origin_principal: Option, + pub trigger_depth: u32, + pub root_execution_id: Option, + pub attempt_count: u32, + pub next_attempt_at: DateTime, + pub created_at: DateTime, +} + +#[async_trait] +pub trait OutboxRepo: Send + Sync { + async fn insert(&self, row: NewOutboxRow) -> Result; + + /// Claim up to `limit` due rows. Wraps the claim in a single + /// transaction so two concurrent dispatchers (cluster mode) can't + /// double-pick a row. Empty Vec when nothing is due. + async fn claim_due( + &self, + claimed_by: &str, + limit: i64, + ) -> Result, OutboxRepoError>; + + /// Remove a row after a terminal outcome (success or dead-letter). + async fn delete(&self, id: Uuid) -> Result<(), OutboxRepoError>; + + /// Failure path: bump attempt_count, clear the claim, set the + /// next attempt time. The dispatcher computes the delay (with + /// backoff + jitter) and passes it in. 
+ async fn reschedule( + &self, + id: Uuid, + attempt_count: u32, + next_attempt_at: DateTime, + ) -> Result<(), OutboxRepoError>; +} + +pub struct PostgresOutboxRepo { + pool: PgPool, +} + +impl PostgresOutboxRepo { + #[must_use] + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl OutboxRepo for PostgresOutboxRepo { + async fn insert(&self, row: NewOutboxRow) -> Result { + let (id,): (Uuid,) = sqlx::query_as( + "INSERT INTO outbox ( \ + app_id, source_kind, trigger_id, script_id, reply_to, \ + payload, origin_principal, trigger_depth, root_execution_id \ + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \ + RETURNING id", + ) + .bind(row.app_id.into_inner()) + .bind(row.source_kind.as_str()) + .bind(row.trigger_id.map(TriggerId::into_inner)) + .bind(row.script_id.map(ScriptId::into_inner)) + .bind(row.reply_to) + .bind(row.payload) + .bind(row.origin_principal.map(AdminUserId::into_inner)) + .bind(i32::try_from(row.trigger_depth).unwrap_or(0)) + .bind(row.root_execution_id.map(ExecutionId::into_inner)) + .fetch_one(&self.pool) + .await?; + Ok(id) + } + + async fn claim_due( + &self, + claimed_by: &str, + limit: i64, + ) -> Result, OutboxRepoError> { + let rows: Vec = sqlx::query_as( + "WITH due AS ( \ + SELECT id FROM outbox \ + WHERE claimed_at IS NULL AND next_attempt_at <= NOW() \ + ORDER BY next_attempt_at \ + FOR UPDATE SKIP LOCKED \ + LIMIT $1 \ + ) \ + UPDATE outbox SET claimed_at = NOW(), claimed_by = $2 \ + WHERE id IN (SELECT id FROM due) \ + RETURNING id, app_id, source_kind, trigger_id, script_id, reply_to, \ + payload, origin_principal, trigger_depth, \ + root_execution_id, attempt_count, next_attempt_at, created_at", + ) + .bind(limit) + .bind(claimed_by) + .fetch_all(&self.pool) + .await?; + + Ok(rows.into_iter().filter_map(OutboxRowRaw::hydrate).collect()) + } + + async fn delete(&self, id: Uuid) -> Result<(), OutboxRepoError> { + sqlx::query("DELETE FROM outbox WHERE id = $1") + .bind(id) + .execute(&self.pool) + 
.await?; + Ok(()) + } + + async fn reschedule( + &self, + id: Uuid, + attempt_count: u32, + next_attempt_at: DateTime, + ) -> Result<(), OutboxRepoError> { + sqlx::query( + "UPDATE outbox SET attempt_count = $2, next_attempt_at = $3, \ + claimed_at = NULL, claimed_by = NULL \ + WHERE id = $1", + ) + .bind(id) + .bind(i32::try_from(attempt_count).unwrap_or(0)) + .bind(next_attempt_at) + .execute(&self.pool) + .await?; + Ok(()) + } +} + +#[derive(sqlx::FromRow)] +struct OutboxRowRaw { + id: Uuid, + app_id: Uuid, + source_kind: String, + trigger_id: Option, + script_id: Option, + reply_to: Option, + payload: serde_json::Value, + origin_principal: Option, + trigger_depth: i32, + root_execution_id: Option, + attempt_count: i32, + next_attempt_at: DateTime, + created_at: DateTime, +} + +impl OutboxRowRaw { + fn hydrate(self) -> Option { + Some(OutboxRow { + id: self.id, + app_id: self.app_id.into(), + source_kind: OutboxSourceKind::from_wire(&self.source_kind)?, + trigger_id: self.trigger_id.map(Into::into), + script_id: self.script_id.map(Into::into), + reply_to: self.reply_to, + payload: self.payload, + origin_principal: self.origin_principal.map(Into::into), + trigger_depth: u32::try_from(self.trigger_depth).unwrap_or(0), + root_execution_id: self.root_execution_id.map(Into::into), + attempt_count: u32::try_from(self.attempt_count).unwrap_or(0), + next_attempt_at: self.next_attempt_at, + created_at: self.created_at, + }) + } +} diff --git a/crates/manager-core/src/trigger_config.rs b/crates/manager-core/src/trigger_config.rs new file mode 100644 index 0000000..71ddab2 --- /dev/null +++ b/crates/manager-core/src/trigger_config.rs @@ -0,0 +1,157 @@ +//! Trigger-framework tunables. Defaults match design notes §3 (retry +//! policy) and §4 (retention). Each knob is env-overridable via a +//! `PICLOUD_*` variable following the same `tracing::warn` on parse +//! error pattern `SandboxCeiling::from_env` uses. 
+
+use std::env;
+
+use serde::{Deserialize, Serialize};
+
+/// Shape of the retry backoff curve. Serialized in lowercase.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum BackoffShape {
+    Exponential,
+    Linear,
+    Constant,
+}
+
+impl BackoffShape {
+    /// Wire/database spelling of this shape.
+    #[must_use]
+    pub const fn as_str(self) -> &'static str {
+        match self {
+            Self::Exponential => "exponential",
+            Self::Linear => "linear",
+            Self::Constant => "constant",
+        }
+    }
+
+    /// Inverse of [`BackoffShape::as_str`]. Returns `None` for any
+    /// other string (match is exact and case-sensitive).
+    #[must_use]
+    pub fn from_wire(s: &str) -> Option<Self> {
+        match s {
+            "exponential" => Some(Self::Exponential),
+            "linear" => Some(Self::Linear),
+            "constant" => Some(Self::Constant),
+            _ => None,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct TriggerConfig {
+    /// Maximum `cx.trigger_depth` before the dispatcher refuses
+    /// execution. Above this, the row is dropped + a metric bumped;
+    /// it is NOT dead-lettered (design notes §4: depth-exceeded
+    /// means "you built a loop"). Default 8.
+    pub max_trigger_depth: u32,
+
+    /// Default retry attempts (per-trigger override on the row).
+    /// Default 3.
+    pub retry_max_attempts: u32,
+    /// Default backoff shape. Default exponential.
+    pub retry_backoff: BackoffShape,
+    /// Default base retry delay in milliseconds. Default 1000.
+    pub retry_base_ms: u32,
+    /// ±jitter as a percentage of the computed delay. Applied at
+    /// dispatch time — not per-trigger. Default 20.
+    pub retry_jitter_pct: u32,
+
+    /// dead-letter retention before GC, in days. Default 30.
+    pub dead_letter_retention_days: u32,
+    /// abandoned-execution retention before GC, in days. Default 7.
+    pub abandoned_retention_days: u32,
+}
+
+impl TriggerConfig {
+    /// Built-in defaults, used when no environment override is set.
+    #[must_use]
+    pub const fn conservative() -> Self {
+        Self {
+            max_trigger_depth: 8,
+            retry_max_attempts: 3,
+            retry_backoff: BackoffShape::Exponential,
+            retry_base_ms: 1000,
+            retry_jitter_pct: 20,
+            dead_letter_retention_days: 30,
+            abandoned_retention_days: 7,
+        }
+    }
+
+    /// Start from [`TriggerConfig::conservative`] and apply every
+    /// `PICLOUD_*` override that is set and parses. Invalid values are
+    /// logged with `tracing::warn!` and the default is kept.
+    #[must_use]
+    pub fn from_env() -> Self {
+        let mut c = Self::conservative();
+        load_u32(&mut c.max_trigger_depth, "PICLOUD_MAX_TRIGGER_DEPTH");
+        load_u32(
+            &mut c.retry_max_attempts,
+            "PICLOUD_TRIGGER_RETRY_MAX_ATTEMPTS",
+        );
+        load_backoff(&mut c.retry_backoff, "PICLOUD_TRIGGER_RETRY_BACKOFF");
+        load_u32(&mut c.retry_base_ms, "PICLOUD_TRIGGER_RETRY_BASE_MS");
+        load_u32(&mut c.retry_jitter_pct, "PICLOUD_TRIGGER_RETRY_JITTER_PCT");
+        load_u32(
+            &mut c.dead_letter_retention_days,
+            "PICLOUD_DEAD_LETTER_RETENTION_DAYS",
+        );
+        load_u32(
+            &mut c.abandoned_retention_days,
+            "PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS",
+        );
+        c
+    }
+}
+
+impl Default for TriggerConfig {
+    fn default() -> Self {
+        Self::conservative()
+    }
+}
+
+/// Overwrite `dst` with the `u32` parsed from env var `key`, if the
+/// variable is set. Surrounding whitespace is ignored; a value that
+/// still fails to parse is logged and `dst` is left unchanged.
+fn load_u32(dst: &mut u32, key: &str) {
+    if let Ok(v) = env::var(key) {
+        match v.trim().parse::<u32>() {
+            Ok(n) => *dst = n,
+            Err(e) => {
+                tracing::warn!(env = key, error = %e, "ignoring invalid trigger-config value");
+            }
+        }
+    }
+}
+
+/// Overwrite `dst` with the backoff shape named by env var `key`, if
+/// the variable is set. Surrounding whitespace is ignored; an
+/// unrecognised name is logged and `dst` is left unchanged.
+fn load_backoff(dst: &mut BackoffShape, key: &str) {
+    if let Ok(v) = env::var(key) {
+        match BackoffShape::from_wire(v.trim()) {
+            Some(b) => *dst = b,
+            None => {
+                tracing::warn!(
+                    env = key,
+                    value = %v,
+                    "ignoring invalid trigger-config backoff shape (use exponential|linear|constant)"
+                );
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn conservative_defaults_match_design_notes() {
+        let c = TriggerConfig::conservative();
+        assert_eq!(c.max_trigger_depth, 8);
+        assert_eq!(c.retry_max_attempts, 3);
+        assert_eq!(c.retry_backoff, BackoffShape::Exponential);
+        assert_eq!(c.retry_base_ms, 1000);
+        assert_eq!(c.retry_jitter_pct, 20);
+        assert_eq!(c.dead_letter_retention_days, 30);
+        assert_eq!(c.abandoned_retention_days, 7);
+    }
+
+    #[test]
+    fn backoff_round_trips() {
+        for shape in [
+            BackoffShape::Exponential,
+            BackoffShape::Linear,
+            BackoffShape::Constant,
+        ] {
+            assert_eq!(BackoffShape::from_wire(shape.as_str()), Some(shape));
+        }
+        assert_eq!(BackoffShape::from_wire("garbage"), None);
+    }
+}
diff --git a/crates/manager-core/src/trigger_repo.rs b/crates/manager-core/src/trigger_repo.rs
new file mode 100644
index 0000000..15d6937
--- /dev/null
+++ b/crates/manager-core/src/trigger_repo.rs
@@ -0,0 +1,617 @@
+//! `TriggerRepo` — CRUD over the `triggers` parent + per-kind detail
+//! tables. The admin endpoints (commit 4) sit on top of this; the
+//! dispatcher (commit 5) reads `list_matching_*` to fan out events to
+//! handler scripts.
+
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+use picloud_shared::{AdminUserId, AppId, KvEventOp, ScriptId, TriggerId};
+use serde::{Deserialize, Serialize};
+use sqlx::PgPool;
+use uuid::Uuid;
+
+use crate::trigger_config::BackoffShape;
+
+/// Errors surfaced by trigger repository operations.
+#[derive(Debug, thiserror::Error)]
+pub enum TriggerRepoError {
+    #[error("database error: {0}")]
+    Db(#[from] sqlx::Error),
+
+    #[error("trigger not found: {0}")]
+    NotFound(TriggerId),
+
+    #[error("invalid trigger payload: {0}")]
+    Invalid(String),
+}
+
+/// Parent-table row plus the per-kind detail merged in. Serialized
+/// back to admin clients via the JSON API.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Trigger {
+    pub id: TriggerId,
+    pub app_id: AppId,
+    pub script_id: ScriptId,
+    pub kind: TriggerKind,
+    pub enabled: bool,
+    pub dispatch_mode: TriggerDispatchMode,
+    pub retry_max_attempts: u32,
+    pub retry_backoff: BackoffShape,
+    pub retry_base_ms: u32,
+    pub registered_by_principal: AdminUserId,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+    pub details: TriggerDetails,
+}
+
+/// Discriminator for the per-kind detail table a trigger uses.
+/// Serialized in snake_case.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum TriggerKind {
+    Kv,
+    DeadLetter,
+}
+
+impl TriggerKind {
+    /// Wire/database spelling of this kind.
+    #[must_use]
+    pub const fn as_str(self) -> &'static str {
+        match self {
+            Self::Kv => "kv",
+            Self::DeadLetter => "dead_letter",
+        }
+    }
+
+    /// Inverse of [`TriggerKind::as_str`]. Returns `None` for any
+    /// other string (match is exact and case-sensitive).
+    #[must_use]
+    pub fn from_wire(s: &str) -> Option<Self> {
+        match s {
+            "kv" => Some(Self::Kv),
+            "dead_letter" => Some(Self::DeadLetter),
+            _ => None,
+        }
+    }
+}
+
+/// How a trigger's handler is dispatched. Serialized in snake_case.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum TriggerDispatchMode {
+    Sync,
+    Async,
+}
+
+impl TriggerDispatchMode {
+    /// Wire/database spelling of this mode.
+    #[must_use]
+    pub const fn as_str(self) -> &'static str {
+        match self {
+            Self::Sync => "sync",
+            Self::Async => "async",
+        }
+    }
+
+    /// Inverse of [`TriggerDispatchMode::as_str`], mirroring the
+    /// `from_wire` helpers on the other wire enums. Returns `None` for
+    /// any other string (match is exact and case-sensitive).
+    #[must_use]
+    pub fn from_wire(s: &str) -> Option<Self> {
+        match s {
+            "sync" => Some(Self::Sync),
+            "async" => Some(Self::Async),
+            _ => None,
+        }
+    }
+}
+
+/// Kind-specific filter columns. Internally tagged: the JSON object
+/// carries a `kind` field naming the variant in snake_case.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "kind", rename_all = "snake_case")]
+pub enum TriggerDetails {
+    Kv {
+        collection_glob: String,
+        ops: Vec<KvEventOp>,
+    },
+    DeadLetter {
+        // Each filter is omitted from the JSON when `None` and
+        // defaults to `None` when absent on input.
+        #[serde(default, skip_serializing_if = "Option::is_none")]
+        source_filter: Option<String>,
+        #[serde(default, skip_serializing_if = "Option::is_none")]
+        trigger_id_filter: Option<TriggerId>,
+        #[serde(default, skip_serializing_if = "Option::is_none")]
+        script_id_filter: Option<ScriptId>,
+    },
+}
+
+/// Create payload for a KV trigger. Defaults applied at the admin
+/// layer (uses `TriggerConfig::from_env` to fill retry settings if
+/// the request omitted them — keeps the row auditable).
+#[derive(Debug, Clone)] +pub struct CreateKvTrigger { + pub script_id: ScriptId, + pub collection_glob: String, + pub ops: Vec, + pub dispatch_mode: TriggerDispatchMode, + pub retry_max_attempts: u32, + pub retry_backoff: BackoffShape, + pub retry_base_ms: u32, + pub registered_by_principal: AdminUserId, +} + +#[derive(Debug, Clone)] +pub struct CreateDeadLetterTrigger { + pub script_id: ScriptId, + pub source_filter: Option, + pub trigger_id_filter: Option, + pub script_id_filter: Option, + pub registered_by_principal: AdminUserId, +} + +/// One match for the dispatcher's "which KV triggers fire on this +/// event" lookup. Carries everything the dispatcher needs to construct +/// the outbox row. +#[derive(Debug, Clone)] +pub struct KvTriggerMatch { + pub trigger_id: TriggerId, + pub script_id: ScriptId, + pub dispatch_mode: TriggerDispatchMode, + pub retry_max_attempts: u32, + pub retry_backoff: BackoffShape, + pub retry_base_ms: u32, + pub registered_by_principal: AdminUserId, +} + +/// One match for the dispatcher's "which dead-letter triggers fire +/// on this dead-letter row" lookup. +#[derive(Debug, Clone)] +pub struct DeadLetterTriggerMatch { + pub trigger_id: TriggerId, + pub script_id: ScriptId, + pub dispatch_mode: TriggerDispatchMode, + pub registered_by_principal: AdminUserId, +} + +#[async_trait] +pub trait TriggerRepo: Send + Sync { + async fn create_kv_trigger( + &self, + app_id: AppId, + req: CreateKvTrigger, + ) -> Result; + + async fn create_dead_letter_trigger( + &self, + app_id: AppId, + req: CreateDeadLetterTrigger, + ) -> Result; + + async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError>; + + async fn get(&self, id: TriggerId) -> Result, TriggerRepoError>; + + async fn delete(&self, id: TriggerId) -> Result; + + /// Dispatcher hot path: find every enabled KV trigger in `app_id` + /// whose `collection_glob` matches `collection` and whose `ops` + /// covers `op`. 
Glob matching done in Rust (the column is plain + /// TEXT, the matcher applies "*"/"prefix:*" semantics). + async fn list_matching_kv( + &self, + app_id: AppId, + collection: &str, + op: KvEventOp, + ) -> Result, TriggerRepoError>; + + /// Dispatcher hot path for dead-letter fan-out. Filters: source + /// (or any-source), originating trigger_id (or any), originating + /// script_id (or any). Each filter is "match OR is_null". + async fn list_matching_dead_letter( + &self, + app_id: AppId, + source: &str, + trigger_id: Option, + script_id: Option, + ) -> Result, TriggerRepoError>; +} + +// ---------------------------------------------------------------------------- +// Postgres impl +// ---------------------------------------------------------------------------- + +pub struct PostgresTriggerRepo { + pool: PgPool, +} + +impl PostgresTriggerRepo { + #[must_use] + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl TriggerRepo for PostgresTriggerRepo { + async fn create_kv_trigger( + &self, + app_id: AppId, + req: CreateKvTrigger, + ) -> Result { + if req.collection_glob.is_empty() { + return Err(TriggerRepoError::Invalid( + "collection_glob must not be empty".into(), + )); + } + let mut tx = self.pool.begin().await?; + let parent: TriggerRow = sqlx::query_as( + "INSERT INTO triggers ( \ + app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, retry_base_ms, \ + registered_by_principal \ + ) VALUES ($1, $2, 'kv', TRUE, $3, $4, $5, $6, $7) \ + RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, retry_base_ms, \ + registered_by_principal, created_at, updated_at", + ) + .bind(app_id.into_inner()) + .bind(req.script_id.into_inner()) + .bind(req.dispatch_mode.as_str()) + .bind(i32::try_from(req.retry_max_attempts).unwrap_or(3)) + .bind(req.retry_backoff.as_str()) + .bind(i32::try_from(req.retry_base_ms).unwrap_or(1000)) + 
.bind(req.registered_by_principal.into_inner()) + .fetch_one(&mut *tx) + .await?; + + let ops_str: Vec = req.ops.iter().map(|o| o.as_str().to_string()).collect(); + sqlx::query( + "INSERT INTO kv_trigger_details (trigger_id, collection_glob, ops) \ + VALUES ($1, $2, $3)", + ) + .bind(parent.id) + .bind(&req.collection_glob) + .bind(&ops_str) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + Ok(Trigger { + id: parent.id.into(), + app_id: parent.app_id.into(), + script_id: parent.script_id.into(), + kind: TriggerKind::Kv, + enabled: parent.enabled, + dispatch_mode: dispatch_from_str(&parent.dispatch_mode), + retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3), + retry_backoff: BackoffShape::from_wire(&parent.retry_backoff) + .unwrap_or(BackoffShape::Exponential), + retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000), + registered_by_principal: parent.registered_by_principal.into(), + created_at: parent.created_at, + updated_at: parent.updated_at, + details: TriggerDetails::Kv { + collection_glob: req.collection_glob, + ops: req.ops, + }, + }) + } + + async fn create_dead_letter_trigger( + &self, + app_id: AppId, + req: CreateDeadLetterTrigger, + ) -> Result { + let mut tx = self.pool.begin().await?; + // Dead-letter triggers force max_attempts=1 (design notes §4 + // recursion-stop). Backoff/base_ms irrelevant but the columns + // are NOT NULL — store sensible values. 
+ let parent: TriggerRow = sqlx::query_as( + "INSERT INTO triggers ( \ + app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, retry_base_ms, \ + registered_by_principal \ + ) VALUES ($1, $2, 'dead_letter', TRUE, 'async', 1, 'constant', 0, $3) \ + RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, retry_base_ms, \ + registered_by_principal, created_at, updated_at", + ) + .bind(app_id.into_inner()) + .bind(req.script_id.into_inner()) + .bind(req.registered_by_principal.into_inner()) + .fetch_one(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO dead_letter_trigger_details \ + (trigger_id, source_filter, trigger_id_filter, script_id_filter) \ + VALUES ($1, $2, $3, $4)", + ) + .bind(parent.id) + .bind(req.source_filter.as_deref()) + .bind(req.trigger_id_filter.map(TriggerId::into_inner)) + .bind(req.script_id_filter.map(ScriptId::into_inner)) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + Ok(Trigger { + id: parent.id.into(), + app_id: parent.app_id.into(), + script_id: parent.script_id.into(), + kind: TriggerKind::DeadLetter, + enabled: parent.enabled, + dispatch_mode: dispatch_from_str(&parent.dispatch_mode), + retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(1), + retry_backoff: BackoffShape::from_wire(&parent.retry_backoff) + .unwrap_or(BackoffShape::Constant), + retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(0), + registered_by_principal: parent.registered_by_principal.into(), + created_at: parent.created_at, + updated_at: parent.updated_at, + details: TriggerDetails::DeadLetter { + source_filter: req.source_filter, + trigger_id_filter: req.trigger_id_filter, + script_id_filter: req.script_id_filter, + }, + }) + } + + async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError> { + let parents: Vec = sqlx::query_as( + "SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, 
retry_base_ms, \ + registered_by_principal, created_at, updated_at \ + FROM triggers WHERE app_id = $1 ORDER BY created_at DESC", + ) + .bind(app_id.into_inner()) + .fetch_all(&self.pool) + .await?; + + let mut out = Vec::with_capacity(parents.len()); + for p in parents { + out.push(hydrate_one(&self.pool, p).await?); + } + Ok(out) + } + + async fn get(&self, id: TriggerId) -> Result, TriggerRepoError> { + let parent: Option = sqlx::query_as( + "SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, retry_base_ms, \ + registered_by_principal, created_at, updated_at \ + FROM triggers WHERE id = $1", + ) + .bind(id.into_inner()) + .fetch_optional(&self.pool) + .await?; + match parent { + Some(p) => Ok(Some(hydrate_one(&self.pool, p).await?)), + None => Ok(None), + } + } + + async fn delete(&self, id: TriggerId) -> Result { + // ON DELETE CASCADE on the detail tables takes care of them. + let res = sqlx::query("DELETE FROM triggers WHERE id = $1") + .bind(id.into_inner()) + .execute(&self.pool) + .await?; + Ok(res.rows_affected() > 0) + } + + async fn list_matching_kv( + &self, + app_id: AppId, + collection: &str, + op: KvEventOp, + ) -> Result, TriggerRepoError> { + // Fetch all enabled KV triggers for the app — glob matching + // happens in Rust so we don't have to teach the query about + // `*` and `prefix:*`. Sets are tiny in practice (one app's + // worth of triggers, usually a handful). 
+ let rows: Vec = sqlx::query_as( + "SELECT t.id, t.script_id, t.dispatch_mode, \ + t.retry_max_attempts, t.retry_backoff, t.retry_base_ms, \ + t.registered_by_principal, \ + d.collection_glob, d.ops \ + FROM triggers t \ + JOIN kv_trigger_details d ON d.trigger_id = t.id \ + WHERE t.app_id = $1 AND t.kind = 'kv' AND t.enabled = TRUE", + ) + .bind(app_id.into_inner()) + .fetch_all(&self.pool) + .await?; + + let op_str = op.as_str(); + let mut out = Vec::new(); + for r in rows { + if !collection_matches(&r.collection_glob, collection) { + continue; + } + let any_op = r.ops.is_empty(); + if !any_op && !r.ops.iter().any(|o| o == op_str) { + continue; + } + out.push(KvTriggerMatch { + trigger_id: r.id.into(), + script_id: r.script_id.into(), + dispatch_mode: dispatch_from_str(&r.dispatch_mode), + retry_max_attempts: u32::try_from(r.retry_max_attempts).unwrap_or(3), + retry_backoff: BackoffShape::from_wire(&r.retry_backoff) + .unwrap_or(BackoffShape::Exponential), + retry_base_ms: u32::try_from(r.retry_base_ms).unwrap_or(1000), + registered_by_principal: r.registered_by_principal.into(), + }); + } + Ok(out) + } + + async fn list_matching_dead_letter( + &self, + app_id: AppId, + source: &str, + trigger_id: Option, + script_id: Option, + ) -> Result, TriggerRepoError> { + let rows: Vec = sqlx::query_as( + "SELECT t.id, t.script_id, t.dispatch_mode, t.registered_by_principal, \ + d.source_filter, d.trigger_id_filter, d.script_id_filter \ + FROM triggers t \ + JOIN dead_letter_trigger_details d ON d.trigger_id = t.id \ + WHERE t.app_id = $1 AND t.kind = 'dead_letter' AND t.enabled = TRUE \ + AND (d.source_filter IS NULL OR d.source_filter = $2) \ + AND (d.trigger_id_filter IS NULL OR d.trigger_id_filter = $3) \ + AND (d.script_id_filter IS NULL OR d.script_id_filter = $4)", + ) + .bind(app_id.into_inner()) + .bind(source) + .bind(trigger_id.map(TriggerId::into_inner)) + .bind(script_id.map(ScriptId::into_inner)) + .fetch_all(&self.pool) + .await?; + + Ok(rows + .into_iter() 
+ .map(|r| DeadLetterTriggerMatch { + trigger_id: r.id.into(), + script_id: r.script_id.into(), + dispatch_mode: dispatch_from_str(&r.dispatch_mode), + registered_by_principal: r.registered_by_principal.into(), + }) + .collect()) + } +} + +async fn hydrate_one(pool: &PgPool, parent: TriggerRow) -> Result { + let kind = TriggerKind::from_wire(&parent.kind).ok_or_else(|| { + TriggerRepoError::Invalid(format!("unknown trigger kind {}", parent.kind)) + })?; + + let details = match kind { + TriggerKind::Kv => { + let row: KvDetailRow = sqlx::query_as( + "SELECT collection_glob, ops FROM kv_trigger_details WHERE trigger_id = $1", + ) + .bind(parent.id) + .fetch_one(pool) + .await?; + let ops = row + .ops + .iter() + .filter_map(|s| KvEventOp::from_wire(s)) + .collect(); + TriggerDetails::Kv { + collection_glob: row.collection_glob, + ops, + } + } + TriggerKind::DeadLetter => { + let row: DlDetailRow = sqlx::query_as( + "SELECT source_filter, trigger_id_filter, script_id_filter \ + FROM dead_letter_trigger_details WHERE trigger_id = $1", + ) + .bind(parent.id) + .fetch_one(pool) + .await?; + TriggerDetails::DeadLetter { + source_filter: row.source_filter, + trigger_id_filter: row.trigger_id_filter.map(Into::into), + script_id_filter: row.script_id_filter.map(Into::into), + } + } + }; + + Ok(Trigger { + id: parent.id.into(), + app_id: parent.app_id.into(), + script_id: parent.script_id.into(), + kind, + enabled: parent.enabled, + dispatch_mode: dispatch_from_str(&parent.dispatch_mode), + retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3), + retry_backoff: BackoffShape::from_wire(&parent.retry_backoff) + .unwrap_or(BackoffShape::Exponential), + retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000), + registered_by_principal: parent.registered_by_principal.into(), + created_at: parent.created_at, + updated_at: parent.updated_at, + details, + }) +} + +fn dispatch_from_str(s: &str) -> TriggerDispatchMode { + match s { + "sync" => 
TriggerDispatchMode::Sync, + _ => TriggerDispatchMode::Async, + } +} + +/// Match a `collection_glob` against an actual `collection` name. +/// Supported forms (in priority order): +/// - `"*"` → matches every collection +/// - `"foo*"` → prefix match (anything starting with "foo") +/// - `"foo"` → exact match +#[must_use] +pub fn collection_matches(glob: &str, collection: &str) -> bool { + if glob == "*" { + return true; + } + if let Some(prefix) = glob.strip_suffix('*') { + return collection.starts_with(prefix); + } + glob == collection +} + +#[derive(sqlx::FromRow)] +struct TriggerRow { + id: Uuid, + app_id: Uuid, + script_id: Uuid, + kind: String, + enabled: bool, + dispatch_mode: String, + retry_max_attempts: i32, + retry_backoff: String, + retry_base_ms: i32, + registered_by_principal: Uuid, + created_at: DateTime, + updated_at: DateTime, +} + +#[derive(sqlx::FromRow)] +struct KvDetailRow { + collection_glob: String, + ops: Vec, +} + +#[derive(sqlx::FromRow)] +#[allow(clippy::struct_field_names)] +struct DlDetailRow { + source_filter: Option, + trigger_id_filter: Option, + script_id_filter: Option, +} + +#[derive(sqlx::FromRow)] +struct KvMatchRow { + id: Uuid, + script_id: Uuid, + dispatch_mode: String, + retry_max_attempts: i32, + retry_backoff: String, + retry_base_ms: i32, + registered_by_principal: Uuid, + collection_glob: String, + ops: Vec, +} + +#[derive(sqlx::FromRow)] +struct DlMatchRow { + id: Uuid, + script_id: Uuid, + dispatch_mode: String, + registered_by_principal: Uuid, + #[allow(dead_code)] + source_filter: Option, + #[allow(dead_code)] + trigger_id_filter: Option, + #[allow(dead_code)] + script_id_filter: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn collection_matcher_handles_star_prefix_exact() { + assert!(collection_matches("*", "widgets")); + assert!(collection_matches("*", "")); + assert!(collection_matches("users:*", "users:1")); + assert!(collection_matches("users:*", "users:")); + 
assert!(!collection_matches("users:*", "orgs:1")); + assert!(collection_matches("widgets", "widgets")); + assert!(!collection_matches("widgets", "Widgets")); + } +}