feat(v1.1.1-triggers): triggers + outbox schema + repos
Migrations 0008-0011 lay down the triggers framework's storage: - `triggers` + `kv_trigger_details` + `dead_letter_trigger_details` (Layout E, design notes §2). Parent table carries common columns including `registered_by_principal` — the dispatcher uses this to run the trigger as the user that registered it (design notes §4). - `outbox`: universal async dispatch substrate. KV/cron/pubsub/queue/ email/dead-letter all write rows in the same shape; the dispatcher claims due rows via FOR UPDATE SKIP LOCKED. `reply_to` is the NATS-style inbox id for sync HTTP (commit 6) — its presence flags "don't retry" per the design. - `dead_letters`: exact schema from design notes §4 with the four- value `resolution` CHECK constraint (`replayed | ignored | handled_by_script | handler_failed`) and partial index on unresolved rows for the dashboard badge. - `abandoned_executions`: forensic table for the dispatcher's "tried to resolve a dropped inbox" edge case (design notes §3 #9). Repo surfaces with Postgres impls behind traits so unit tests can swap in-memory backings: - `TriggerRepo` — CRUD + the `list_matching_kv` / `list_matching_dead_letter` hot paths the dispatcher uses. Includes a `collection_matches` helper that handles `*`, `prefix:*`, and exact-name globs. - `OutboxRepo` — insert + claim-due + delete + reschedule. - `DeadLetterRepo` — insert + get + list + unresolved-count + resolve + GC. - `AbandonedRepo` — insert + GC. `TriggerConfig::from_env` (new module) follows the existing `SandboxCeiling` env-loading pattern for `PICLOUD_MAX_TRIGGER_DEPTH`, `PICLOUD_TRIGGER_RETRY_*`, `PICLOUD_DEAD_LETTER_RETENTION_DAYS`, and `PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS`. `Capability::AppManageTriggers(AppId)` and `AppDeadLetterManage(AppId)` join the enum. Both map onto the existing `Scope::AppAdmin` per the seven-scope commitment; `role_satisfies` grants them at the `AppAdmin` per-app role. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
72
crates/manager-core/migrations/0008_triggers.sql
Normal file
72
crates/manager-core/migrations/0008_triggers.sql
Normal file
@@ -0,0 +1,72 @@
|
||||
-- v1.1.1: Trigger framework — Layout E (design notes §2 + §7).
|
||||
--
|
||||
-- A parent `triggers` table holds the common columns (script_id, retry
|
||||
-- config, dispatch_mode, registered-by principal); per-kind detail
|
||||
-- tables hold the kind-specific filter columns. v1.1.1 ships two
|
||||
-- kinds: KV (collection_glob + ops) and dead_letter (source / trigger
|
||||
-- / script filters). Future kinds (cron, pubsub, queue, email) extend
|
||||
-- the parent and add their own detail table.
|
||||
--
|
||||
-- `registered_by_principal` captures the admin user that registered
|
||||
-- the trigger. The dispatcher resolves this back to a `Principal` at
|
||||
-- execution time so the trigger runs as the user that set it up
|
||||
-- (design notes §4: "a trigger execution runs as the principal that
|
||||
-- registered the trigger").
|
||||
--
|
||||
-- HTTP routes stay in their own `routes` table for now (Phase 3
|
||||
-- production schema with its own trie-index columns); the dispatcher
|
||||
-- discriminates HTTP outbox rows by `source_kind = 'http'` and
|
||||
-- `trigger_id` referencing `routes.id`. Folding routes into triggers
|
||||
-- is a v1.2 cleanup, not a v1.1.1 requirement.
|
||||
|
||||
CREATE TABLE triggers (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE,
|
||||
script_id UUID NOT NULL REFERENCES scripts(id) ON DELETE CASCADE,
|
||||
kind TEXT NOT NULL CHECK (kind IN ('kv', 'dead_letter')),
|
||||
enabled BOOLEAN NOT NULL DEFAULT TRUE,
|
||||
-- Async by default — sync would mean the trigger fires inline with
|
||||
-- the originating mutation, which v1.1.1 doesn't support.
|
||||
dispatch_mode TEXT NOT NULL DEFAULT 'async'
|
||||
CHECK (dispatch_mode IN ('sync', 'async')),
|
||||
-- Defaults applied at write time so the row is auditable on its
|
||||
-- own. Per-trigger overrides set on create; the env-defined
|
||||
-- defaults provide the fallback values.
|
||||
retry_max_attempts INT NOT NULL,
|
||||
retry_backoff TEXT NOT NULL
|
||||
CHECK (retry_backoff IN ('exponential', 'linear', 'constant')),
|
||||
retry_base_ms INT NOT NULL,
|
||||
registered_by_principal UUID NOT NULL REFERENCES admin_users(id) ON DELETE CASCADE,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
-- The dispatcher's hot lookup: "all enabled triggers for app X of
|
||||
-- kind Y". Indexed only when enabled = TRUE so disabled rows don't
|
||||
-- pollute the index.
|
||||
CREATE INDEX idx_triggers_app_kind_enabled
|
||||
ON triggers (app_id, kind)
|
||||
WHERE enabled = TRUE;
|
||||
|
||||
-- One row per KV trigger. `collection_glob` accepts:
|
||||
-- "*" — any collection in the app
|
||||
-- "widgets" — exact match
|
||||
-- "users:*" — prefix wildcard (matched in Rust, not SQL)
|
||||
-- `ops` is the subset of {insert, update, delete} this trigger
|
||||
-- subscribes to. Empty array means "any op" (the trigger fires on
|
||||
-- every mutation; admin endpoint validates this).
|
||||
CREATE TABLE kv_trigger_details (
|
||||
trigger_id UUID PRIMARY KEY REFERENCES triggers(id) ON DELETE CASCADE,
|
||||
collection_glob TEXT NOT NULL,
|
||||
ops TEXT[] NOT NULL
|
||||
);
|
||||
|
||||
-- One row per dead-letter trigger. All three filter columns are
|
||||
-- nullable — NULL means "no filter on this dimension". A trigger
|
||||
-- with all three nullable filters fires on every dead-letter row.
|
||||
CREATE TABLE dead_letter_trigger_details (
|
||||
trigger_id UUID PRIMARY KEY REFERENCES triggers(id) ON DELETE CASCADE,
|
||||
source_filter TEXT,
|
||||
trigger_id_filter UUID,
|
||||
script_id_filter UUID
|
||||
);
|
||||
64
crates/manager-core/migrations/0009_outbox.sql
Normal file
64
crates/manager-core/migrations/0009_outbox.sql
Normal file
@@ -0,0 +1,64 @@
|
||||
-- v1.1.1: Universal trigger outbox — design notes §2.
|
||||
--
|
||||
-- One table for every async dispatch in the system. KV/cron/pubsub/
|
||||
-- queue/email/dead-letter all write rows in this shape; the dispatcher
|
||||
-- claims due rows with `FOR UPDATE SKIP LOCKED` and routes them to
|
||||
-- the executor.
|
||||
--
|
||||
-- Sync HTTP also writes here (NATS-style inbox, design notes §3) —
|
||||
-- `reply_to` carries an `inbox_id` that the orchestrator awaits on a
|
||||
-- oneshot channel. `reply_to.is_some()` is the "don't retry" signal:
|
||||
-- one attempt, surface the result via the inbox.
|
||||
--
|
||||
-- `trigger_id` is a polymorphic reference discriminated by
|
||||
-- `source_kind`: for `source_kind='http'` it references `routes.id`;
|
||||
-- otherwise it references `triggers.id`. Polymorphism handled in
|
||||
-- Rust (the dispatcher); no DB-level FK because Postgres doesn't
|
||||
-- support polymorphic FKs cleanly. NULL is allowed because direct
|
||||
-- admin-replay paths may not have a triggering row at all.
|
||||
--
|
||||
-- `script_id` denormalized so the dispatcher resolves the target
|
||||
-- script without an extra round-trip per row.
|
||||
|
||||
CREATE TABLE outbox (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE,
|
||||
source_kind TEXT NOT NULL
|
||||
CHECK (source_kind IN ('http', 'kv', 'dead_letter')),
|
||||
-- Polymorphic — see comment above. No FK constraint.
|
||||
trigger_id UUID,
|
||||
-- Pre-resolved at write time so the dispatcher doesn't re-look it up.
|
||||
script_id UUID,
|
||||
-- NULL = async (retry per policy). Some(inbox_id) = sync HTTP
|
||||
-- (never retry; resolve the inbox with the result).
|
||||
reply_to UUID,
|
||||
-- ServiceEvent + ExecRequest scaffold serialized as JSONB.
|
||||
payload JSONB NOT NULL,
|
||||
-- Forensic field — the principal that triggered the originating
|
||||
-- event. NOT the execution principal for trigger fan-out (that
|
||||
-- comes from `triggers.registered_by_principal`).
|
||||
origin_principal UUID,
|
||||
-- Trigger-depth as the dispatcher will hand it to the executor.
|
||||
-- Read out into ExecRequest.trigger_depth at dispatch time.
|
||||
trigger_depth INT NOT NULL DEFAULT 0,
|
||||
-- Originating execution id (for audit log grouping). Equals the
|
||||
-- root for direct invocations; preserved across fan-out chains.
|
||||
root_execution_id UUID,
|
||||
attempt_count INT NOT NULL DEFAULT 0,
|
||||
next_attempt_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
-- Set inside the SELECT FOR UPDATE SKIP LOCKED transaction so
|
||||
-- the dispatcher can't double-pick a row across concurrent loop
|
||||
-- iterations.
|
||||
claimed_at TIMESTAMPTZ,
|
||||
claimed_by TEXT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
-- Hot index: the dispatcher's `WHERE next_attempt_at <= NOW() AND
|
||||
-- claimed_at IS NULL` claim query. Partial index keeps the hot set
|
||||
-- small even if the table grows large.
|
||||
CREATE INDEX idx_outbox_due
|
||||
ON outbox (next_attempt_at)
|
||||
WHERE claimed_at IS NULL;
|
||||
|
||||
CREATE INDEX idx_outbox_app ON outbox (app_id);
|
||||
50
crates/manager-core/migrations/0010_dead_letters.sql
Normal file
50
crates/manager-core/migrations/0010_dead_letters.sql
Normal file
@@ -0,0 +1,50 @@
|
||||
-- v1.1.1: dead_letters — design notes §4.
|
||||
--
|
||||
-- Async invocations that exhaust their retry policy land here. Each
|
||||
-- row carries the original event payload verbatim plus the attempt
|
||||
-- history so handlers (registered via `dead_letter` triggers) and the
|
||||
-- dashboard can decide what to do.
|
||||
--
|
||||
-- Schema mirrors design notes §4. The CHECK constraint on
|
||||
-- `resolution` enforces the closed vocabulary used by both the SDK
|
||||
-- (`dead_letters::resolve(id, reason)`) and the recursion-stop rule
|
||||
-- (`handler_failed`). Sync HTTP failures (`reply_to.is_some()`) never
|
||||
-- land here — they're served via the inbox channel.
|
||||
--
|
||||
-- Indexes:
|
||||
-- - partial index on unresolved rows: the dashboard's
|
||||
-- unresolved-count badge query (`COUNT(*) WHERE app_id = $1 AND
|
||||
-- resolved_at IS NULL`).
|
||||
-- - GC index on `created_at`: the weekly retention sweep.
|
||||
|
||||
CREATE TABLE dead_letters (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE,
|
||||
-- The outbox.id row that exhausted retries. The outbox row itself
|
||||
-- has been deleted at this point.
|
||||
original_event_id UUID NOT NULL,
|
||||
source TEXT NOT NULL,
|
||||
op TEXT NOT NULL,
|
||||
-- Nullable because direct admin replays may have no trigger row.
|
||||
trigger_id UUID,
|
||||
script_id UUID,
|
||||
payload JSONB NOT NULL,
|
||||
attempt_count INT NOT NULL,
|
||||
first_attempt_at TIMESTAMPTZ NOT NULL,
|
||||
last_attempt_at TIMESTAMPTZ NOT NULL,
|
||||
last_error TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
resolved_at TIMESTAMPTZ,
|
||||
resolution TEXT
|
||||
CHECK (resolution IN
|
||||
('replayed', 'ignored', 'handled_by_script', 'handler_failed'))
|
||||
);
|
||||
|
||||
-- Dashboard unresolved-count badge — partial index on the predicate
|
||||
-- the query uses.
|
||||
CREATE INDEX idx_dead_letters_app_unresolved
|
||||
ON dead_letters (app_id)
|
||||
WHERE resolved_at IS NULL;
|
||||
|
||||
-- GC sweep scans by creation time.
|
||||
CREATE INDEX idx_dead_letters_gc ON dead_letters (created_at);
|
||||
31
crates/manager-core/migrations/0011_abandoned_executions.sql
Normal file
31
crates/manager-core/migrations/0011_abandoned_executions.sql
Normal file
@@ -0,0 +1,31 @@
|
||||
-- v1.1.1: abandoned_executions — design notes §3 #9.
|
||||
--
|
||||
-- Forensic table for the "dispatcher tried to resolve a oneshot inbox
|
||||
-- but the receiver was already dropped" edge case. The orchestrator
|
||||
-- timed out (returned 504 to the caller) and gave up on the channel,
|
||||
-- but then the dispatcher's execution succeeded later. The caller
|
||||
-- never sees the result; the row exists so the operator can
|
||||
-- correlate when the abandoned-counter metric spikes.
|
||||
--
|
||||
-- Only the dispatcher-after-orchestrator-timeout edge case writes
|
||||
-- here; ordinary "script timed out, caller got 504" stays uneventful.
|
||||
--
|
||||
-- 7-day retention, GC by `created_at`, sweep alongside dead_letters.
|
||||
|
||||
CREATE TABLE abandoned_executions (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE,
|
||||
-- Original outbox row id (the row itself has been deleted).
|
||||
outbox_id UUID NOT NULL,
|
||||
script_id UUID,
|
||||
-- The inbox channel id the dispatcher tried to resolve.
|
||||
inbox_id UUID NOT NULL,
|
||||
-- The HTTP status code the dispatcher attempted to send back.
|
||||
status_code INT NOT NULL,
|
||||
-- Truncated body / error description (capped at write time —
|
||||
-- the dispatcher doesn't need to ship megabytes here).
|
||||
result_summary TEXT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
|
||||
CREATE INDEX idx_abandoned_executions_gc ON abandoned_executions (created_at);
|
||||
128
crates/manager-core/src/abandoned_repo.rs
Normal file
128
crates/manager-core/src/abandoned_repo.rs
Normal file
@@ -0,0 +1,128 @@
|
||||
//! `AbandonedExecutionsRepo` — forensic table written by the
|
||||
//! dispatcher when it tries to resolve a sync-HTTP inbox channel
|
||||
//! that's already been dropped (orchestrator timed out and gave up).
|
||||
//!
|
||||
//! Schema: see `migrations/0011_abandoned_executions.sql`.
|
||||
//!
|
||||
//! Tiny surface: insert + GC. Reading happens via direct SQL when
|
||||
//! correlating the metric counter spike.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use picloud_shared::{AppId, ScriptId};
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum AbandonedRepoError {
|
||||
#[error("database error: {0}")]
|
||||
Db(#[from] sqlx::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NewAbandonedExecution {
|
||||
pub app_id: AppId,
|
||||
pub outbox_id: Uuid,
|
||||
pub script_id: Option<ScriptId>,
|
||||
pub inbox_id: Uuid,
|
||||
pub status_code: u16,
|
||||
pub result_summary: Option<String>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait AbandonedRepo: Send + Sync {
|
||||
async fn insert(&self, row: NewAbandonedExecution) -> Result<Uuid, AbandonedRepoError>;
|
||||
|
||||
/// Retention sweep — deletes rows older than `older_than` up to
|
||||
/// `limit` at a time.
|
||||
async fn gc(&self, older_than: DateTime<Utc>, limit: i64) -> Result<u64, AbandonedRepoError>;
|
||||
}
|
||||
|
||||
pub struct PostgresAbandonedRepo {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresAbandonedRepo {
|
||||
#[must_use]
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
const SUMMARY_CAP_BYTES: usize = 4096;
|
||||
|
||||
#[async_trait]
|
||||
impl AbandonedRepo for PostgresAbandonedRepo {
|
||||
async fn insert(&self, row: NewAbandonedExecution) -> Result<Uuid, AbandonedRepoError> {
|
||||
// Truncate the summary at write-time. The forensic table
|
||||
// doesn't need megabytes; the original outbox row may have
|
||||
// been arbitrary size but we lose nothing useful by clipping.
|
||||
let summary = row.result_summary.map(|s| truncate(s, SUMMARY_CAP_BYTES));
|
||||
let (id,): (Uuid,) = sqlx::query_as(
|
||||
"INSERT INTO abandoned_executions ( \
|
||||
app_id, outbox_id, script_id, inbox_id, status_code, result_summary \
|
||||
) VALUES ($1, $2, $3, $4, $5, $6) \
|
||||
RETURNING id",
|
||||
)
|
||||
.bind(row.app_id.into_inner())
|
||||
.bind(row.outbox_id)
|
||||
.bind(row.script_id.map(ScriptId::into_inner))
|
||||
.bind(row.inbox_id)
|
||||
.bind(i32::from(row.status_code))
|
||||
.bind(summary)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
async fn gc(&self, older_than: DateTime<Utc>, limit: i64) -> Result<u64, AbandonedRepoError> {
|
||||
let res = sqlx::query(
|
||||
"DELETE FROM abandoned_executions \
|
||||
WHERE id IN ( \
|
||||
SELECT id FROM abandoned_executions \
|
||||
WHERE created_at < $1 \
|
||||
FOR UPDATE SKIP LOCKED \
|
||||
LIMIT $2 \
|
||||
)",
|
||||
)
|
||||
.bind(older_than)
|
||||
.bind(limit)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(res.rows_affected())
|
||||
}
|
||||
}
|
||||
|
||||
fn truncate(mut s: String, max_bytes: usize) -> String {
|
||||
if s.len() <= max_bytes {
|
||||
return s;
|
||||
}
|
||||
// Walk back from `max_bytes` to a UTF-8 char boundary so we never
|
||||
// panic on `truncate` mid-codepoint.
|
||||
let mut cut = max_bytes;
|
||||
while cut > 0 && !s.is_char_boundary(cut) {
|
||||
cut -= 1;
|
||||
}
|
||||
s.truncate(cut);
|
||||
s
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn truncate_respects_char_boundaries() {
|
||||
// 3-byte UTF-8 chars; cap inside the middle char should walk
|
||||
// back to the start.
|
||||
let s = "héllo".to_string();
|
||||
let t = truncate(s, 2);
|
||||
assert!(t.is_char_boundary(t.len()));
|
||||
assert_eq!(t, "h");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncate_passthrough_for_short_strings() {
|
||||
assert_eq!(truncate("ok".into(), 100), "ok");
|
||||
}
|
||||
}
|
||||
@@ -64,6 +64,14 @@ pub enum Capability {
|
||||
/// Write entries to this app's KV store (v1.1.1). Granted to
|
||||
/// `editor`+. Maps to `script:write` on API keys.
|
||||
AppKvWrite(AppId),
|
||||
/// Create / list / delete triggers for this app (v1.1.1). Maps to
|
||||
/// `app:admin` on API keys — triggers are app-configuration acts
|
||||
/// rather than data-plane access. Granted to `app_admin`+.
|
||||
AppManageTriggers(AppId),
|
||||
/// Replay / resolve dead-letter rows for this app (v1.1.1). Maps
|
||||
/// to `app:admin` on API keys. Public-HTTP scripts (principal None)
|
||||
/// fail this check — managing dead letters is an admin act.
|
||||
AppDeadLetterManage(AppId),
|
||||
}
|
||||
|
||||
impl Capability {
|
||||
@@ -82,7 +90,9 @@ impl Capability {
|
||||
| Self::AppAdmin(id)
|
||||
| Self::AppLogRead(id)
|
||||
| Self::AppKvRead(id)
|
||||
| Self::AppKvWrite(id) => Some(id),
|
||||
| Self::AppKvWrite(id)
|
||||
| Self::AppManageTriggers(id)
|
||||
| Self::AppDeadLetterManage(id) => Some(id),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,7 +111,9 @@ impl Capability {
|
||||
Self::AppWriteScript(_) | Self::AppKvWrite(_) => Scope::ScriptWrite,
|
||||
Self::AppWriteRoute(_) => Scope::RouteWrite,
|
||||
Self::AppManageDomains(_) => Scope::DomainManage,
|
||||
Self::AppAdmin(_) => Scope::AppAdmin,
|
||||
Self::AppAdmin(_) | Self::AppManageTriggers(_) | Self::AppDeadLetterManage(_) => {
|
||||
Scope::AppAdmin
|
||||
}
|
||||
Self::AppLogRead(_) => Scope::LogRead,
|
||||
}
|
||||
}
|
||||
@@ -253,7 +265,10 @@ const fn role_satisfies(role: AppRole, cap: Capability) -> bool {
|
||||
let in_app_admin = in_editor
|
||||
|| matches!(
|
||||
cap,
|
||||
Capability::AppManageDomains(_) | Capability::AppAdmin(_)
|
||||
Capability::AppManageDomains(_)
|
||||
| Capability::AppAdmin(_)
|
||||
| Capability::AppManageTriggers(_)
|
||||
| Capability::AppDeadLetterManage(_)
|
||||
);
|
||||
match role {
|
||||
AppRole::Viewer => in_viewer,
|
||||
|
||||
261
crates/manager-core/src/dead_letter_repo.rs
Normal file
261
crates/manager-core/src/dead_letter_repo.rs
Normal file
@@ -0,0 +1,261 @@
|
||||
//! `DeadLetterRepo` — CRUD over the `dead_letters` table.
|
||||
//!
|
||||
//! The dispatcher writes new rows when an async trigger exhausts its
|
||||
//! retry policy. Admin endpoints (commit 8) read for the dashboard
|
||||
//! list view and write to mark rows resolved or replay them. The GC
|
||||
//! sweeper (commit 10) deletes expired rows by `created_at`.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use picloud_shared::{AppId, DeadLetterId, ScriptId, TriggerId};
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DeadLetterRepoError {
|
||||
#[error("database error: {0}")]
|
||||
Db(#[from] sqlx::Error),
|
||||
|
||||
#[error("dead-letter row not found: {0}")]
|
||||
NotFound(DeadLetterId),
|
||||
|
||||
#[error("invalid resolution {0:?}")]
|
||||
InvalidResolution(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NewDeadLetter {
|
||||
pub app_id: AppId,
|
||||
/// `outbox.id` that exhausted retries. Outbox row deleted at the
|
||||
/// same time.
|
||||
pub original_event_id: Uuid,
|
||||
pub source: String,
|
||||
pub op: String,
|
||||
pub trigger_id: Option<TriggerId>,
|
||||
pub script_id: Option<ScriptId>,
|
||||
pub payload: serde_json::Value,
|
||||
pub attempt_count: u32,
|
||||
pub first_attempt_at: DateTime<Utc>,
|
||||
pub last_attempt_at: DateTime<Utc>,
|
||||
pub last_error: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DeadLetterRow {
|
||||
pub id: DeadLetterId,
|
||||
pub app_id: AppId,
|
||||
pub original_event_id: Uuid,
|
||||
pub source: String,
|
||||
pub op: String,
|
||||
pub trigger_id: Option<TriggerId>,
|
||||
pub script_id: Option<ScriptId>,
|
||||
pub payload: serde_json::Value,
|
||||
pub attempt_count: u32,
|
||||
pub first_attempt_at: DateTime<Utc>,
|
||||
pub last_attempt_at: DateTime<Utc>,
|
||||
pub last_error: String,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub resolved_at: Option<DateTime<Utc>>,
|
||||
pub resolution: Option<String>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait DeadLetterRepo: Send + Sync {
|
||||
/// Insert a new dead-letter row. Returns the assigned id.
|
||||
async fn insert(&self, row: NewDeadLetter) -> Result<DeadLetterId, DeadLetterRepoError>;
|
||||
|
||||
async fn get(&self, id: DeadLetterId) -> Result<Option<DeadLetterRow>, DeadLetterRepoError>;
|
||||
|
||||
/// Lookup for the dashboard list view. `unresolved_only=true`
|
||||
/// filters to `resolved_at IS NULL`.
|
||||
async fn list_for_app(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
unresolved_only: bool,
|
||||
limit: i64,
|
||||
offset: i64,
|
||||
) -> Result<Vec<DeadLetterRow>, DeadLetterRepoError>;
|
||||
|
||||
/// Hot path for the dashboard's per-app unresolved-count badge.
|
||||
async fn unresolved_count(&self, app_id: AppId) -> Result<i64, DeadLetterRepoError>;
|
||||
|
||||
/// Mark the row resolved with the given reason. The reason MUST
|
||||
/// be one of the four CHECK-constraint values
|
||||
/// (`replayed`, `ignored`, `handled_by_script`, `handler_failed`).
|
||||
async fn resolve(&self, id: DeadLetterId, reason: &str) -> Result<(), DeadLetterRepoError>;
|
||||
|
||||
/// Retention sweep. Deletes rows with `created_at < older_than`
|
||||
/// up to `limit` at a time, using FOR UPDATE SKIP LOCKED to play
|
||||
/// nicely with concurrent dispatchers. Returns the count deleted.
|
||||
async fn gc(&self, older_than: DateTime<Utc>, limit: i64) -> Result<u64, DeadLetterRepoError>;
|
||||
}
|
||||
|
||||
pub struct PostgresDeadLetterRepo {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresDeadLetterRepo {
|
||||
#[must_use]
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
const ALLOWED_RESOLUTIONS: &[&str] =
|
||||
&["replayed", "ignored", "handled_by_script", "handler_failed"];
|
||||
|
||||
#[async_trait]
|
||||
impl DeadLetterRepo for PostgresDeadLetterRepo {
|
||||
async fn insert(&self, row: NewDeadLetter) -> Result<DeadLetterId, DeadLetterRepoError> {
|
||||
let (id,): (Uuid,) = sqlx::query_as(
|
||||
"INSERT INTO dead_letters ( \
|
||||
app_id, original_event_id, source, op, trigger_id, script_id, \
|
||||
payload, attempt_count, first_attempt_at, last_attempt_at, last_error \
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) \
|
||||
RETURNING id",
|
||||
)
|
||||
.bind(row.app_id.into_inner())
|
||||
.bind(row.original_event_id)
|
||||
.bind(row.source)
|
||||
.bind(row.op)
|
||||
.bind(row.trigger_id.map(TriggerId::into_inner))
|
||||
.bind(row.script_id.map(ScriptId::into_inner))
|
||||
.bind(row.payload)
|
||||
.bind(i32::try_from(row.attempt_count).unwrap_or(0))
|
||||
.bind(row.first_attempt_at)
|
||||
.bind(row.last_attempt_at)
|
||||
.bind(row.last_error)
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
Ok(id.into())
|
||||
}
|
||||
|
||||
async fn get(&self, id: DeadLetterId) -> Result<Option<DeadLetterRow>, DeadLetterRepoError> {
|
||||
let row: Option<DeadLetterRowRaw> = sqlx::query_as(
|
||||
"SELECT id, app_id, original_event_id, source, op, trigger_id, script_id, \
|
||||
payload, attempt_count, first_attempt_at, last_attempt_at, \
|
||||
last_error, created_at, resolved_at, resolution \
|
||||
FROM dead_letters WHERE id = $1",
|
||||
)
|
||||
.bind(id.into_inner())
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
Ok(row.map(DeadLetterRowRaw::into_row))
|
||||
}
|
||||
|
||||
async fn list_for_app(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
unresolved_only: bool,
|
||||
limit: i64,
|
||||
offset: i64,
|
||||
) -> Result<Vec<DeadLetterRow>, DeadLetterRepoError> {
|
||||
let rows: Vec<DeadLetterRowRaw> = sqlx::query_as(
|
||||
"SELECT id, app_id, original_event_id, source, op, trigger_id, script_id, \
|
||||
payload, attempt_count, first_attempt_at, last_attempt_at, \
|
||||
last_error, created_at, resolved_at, resolution \
|
||||
FROM dead_letters \
|
||||
WHERE app_id = $1 \
|
||||
AND ($2::bool = FALSE OR resolved_at IS NULL) \
|
||||
ORDER BY created_at DESC \
|
||||
LIMIT $3 OFFSET $4",
|
||||
)
|
||||
.bind(app_id.into_inner())
|
||||
.bind(unresolved_only)
|
||||
.bind(limit)
|
||||
.bind(offset)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
Ok(rows.into_iter().map(DeadLetterRowRaw::into_row).collect())
|
||||
}
|
||||
|
||||
async fn unresolved_count(&self, app_id: AppId) -> Result<i64, DeadLetterRepoError> {
|
||||
let (count,): (i64,) = sqlx::query_as(
|
||||
"SELECT COUNT(*) FROM dead_letters \
|
||||
WHERE app_id = $1 AND resolved_at IS NULL",
|
||||
)
|
||||
.bind(app_id.into_inner())
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
async fn resolve(&self, id: DeadLetterId, reason: &str) -> Result<(), DeadLetterRepoError> {
|
||||
if !ALLOWED_RESOLUTIONS.contains(&reason) {
|
||||
return Err(DeadLetterRepoError::InvalidResolution(reason.to_string()));
|
||||
}
|
||||
let res = sqlx::query(
|
||||
"UPDATE dead_letters \
|
||||
SET resolution = $2, resolved_at = NOW() \
|
||||
WHERE id = $1",
|
||||
)
|
||||
.bind(id.into_inner())
|
||||
.bind(reason)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
if res.rows_affected() == 0 {
|
||||
return Err(DeadLetterRepoError::NotFound(id));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn gc(&self, older_than: DateTime<Utc>, limit: i64) -> Result<u64, DeadLetterRepoError> {
|
||||
// Tombstones picked under FOR UPDATE SKIP LOCKED so concurrent
|
||||
// sweepers (cluster mode) don't fight each other.
|
||||
let res = sqlx::query(
|
||||
"DELETE FROM dead_letters \
|
||||
WHERE id IN ( \
|
||||
SELECT id FROM dead_letters \
|
||||
WHERE created_at < $1 \
|
||||
FOR UPDATE SKIP LOCKED \
|
||||
LIMIT $2 \
|
||||
)",
|
||||
)
|
||||
.bind(older_than)
|
||||
.bind(limit)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(res.rows_affected())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct DeadLetterRowRaw {
|
||||
id: Uuid,
|
||||
app_id: Uuid,
|
||||
original_event_id: Uuid,
|
||||
source: String,
|
||||
op: String,
|
||||
trigger_id: Option<Uuid>,
|
||||
script_id: Option<Uuid>,
|
||||
payload: serde_json::Value,
|
||||
attempt_count: i32,
|
||||
first_attempt_at: DateTime<Utc>,
|
||||
last_attempt_at: DateTime<Utc>,
|
||||
last_error: String,
|
||||
created_at: DateTime<Utc>,
|
||||
resolved_at: Option<DateTime<Utc>>,
|
||||
resolution: Option<String>,
|
||||
}
|
||||
|
||||
impl DeadLetterRowRaw {
|
||||
fn into_row(self) -> DeadLetterRow {
|
||||
DeadLetterRow {
|
||||
id: self.id.into(),
|
||||
app_id: self.app_id.into(),
|
||||
original_event_id: self.original_event_id,
|
||||
source: self.source,
|
||||
op: self.op,
|
||||
trigger_id: self.trigger_id.map(Into::into),
|
||||
script_id: self.script_id.map(Into::into),
|
||||
payload: self.payload,
|
||||
attempt_count: u32::try_from(self.attempt_count).unwrap_or(0),
|
||||
first_attempt_at: self.first_attempt_at,
|
||||
last_attempt_at: self.last_attempt_at,
|
||||
last_error: self.last_error,
|
||||
created_at: self.created_at,
|
||||
resolved_at: self.resolved_at,
|
||||
resolution: self.resolution,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4,6 +4,7 @@
|
||||
//! the same DB for now; once we add caching and per-node ingress, the
|
||||
//! manager will publish change events.
|
||||
|
||||
pub mod abandoned_repo;
|
||||
pub mod admin_session_repo;
|
||||
pub mod admin_user_repo;
|
||||
pub mod admin_users_api;
|
||||
@@ -21,16 +22,23 @@ pub mod auth_api;
|
||||
pub mod auth_bootstrap;
|
||||
pub mod auth_middleware;
|
||||
pub mod authz;
|
||||
pub mod dead_letter_repo;
|
||||
pub mod kv_repo;
|
||||
pub mod kv_service;
|
||||
pub mod log_sink;
|
||||
pub mod migrations;
|
||||
pub mod outbox_repo;
|
||||
pub mod repo;
|
||||
pub mod route_admin;
|
||||
pub mod route_repo;
|
||||
pub mod sandbox;
|
||||
pub mod scheduler;
|
||||
pub mod trigger_config;
|
||||
pub mod trigger_repo;
|
||||
|
||||
pub use abandoned_repo::{
|
||||
AbandonedRepo, AbandonedRepoError, NewAbandonedExecution, PostgresAbandonedRepo,
|
||||
};
|
||||
pub use admin_session_repo::{
|
||||
AdminSessionLookup, AdminSessionRepository, AdminSessionRepositoryError,
|
||||
PostgresAdminSessionRepository,
|
||||
@@ -65,9 +73,15 @@ pub use auth_middleware::{
|
||||
API_KEY_PREFIX, API_KEY_PREFIX_LEN, SESSION_COOKIE,
|
||||
};
|
||||
pub use authz::{can, require, AuthzDenied, AuthzError, AuthzRepo, Capability, Decision};
|
||||
pub use dead_letter_repo::{
|
||||
DeadLetterRepo, DeadLetterRepoError, DeadLetterRow, NewDeadLetter, PostgresDeadLetterRepo,
|
||||
};
|
||||
pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo};
|
||||
pub use kv_service::KvServiceImpl;
|
||||
pub use log_sink::PostgresExecutionLogSink;
|
||||
pub use outbox_repo::{
|
||||
NewOutboxRow, OutboxRepo, OutboxRepoError, OutboxRow, OutboxSourceKind, PostgresOutboxRepo,
|
||||
};
|
||||
pub use repo::{
|
||||
ExecutionLogRepository, NewScript, PostgresExecutionLogRepository, PostgresScriptRepository,
|
||||
RepoResolver, ScriptPatch, ScriptRepository, ScriptRepositoryError,
|
||||
@@ -75,3 +89,9 @@ pub use repo::{
|
||||
pub use route_admin::{compile_routes, route_admin_router, RouteAdminState};
|
||||
pub use route_repo::{NewRoute, PostgresRouteRepository, RouteRepository};
|
||||
pub use sandbox::{CeilingError, SandboxCeiling};
|
||||
pub use trigger_config::{BackoffShape, TriggerConfig};
|
||||
pub use trigger_repo::{
|
||||
collection_matches, CreateDeadLetterTrigger, CreateKvTrigger, DeadLetterTriggerMatch,
|
||||
KvTriggerMatch, PostgresTriggerRepo, Trigger, TriggerDetails, TriggerDispatchMode, TriggerKind,
|
||||
TriggerRepo, TriggerRepoError,
|
||||
};
|
||||
|
||||
233
crates/manager-core/src/outbox_repo.rs
Normal file
233
crates/manager-core/src/outbox_repo.rs
Normal file
@@ -0,0 +1,233 @@
|
||||
//! `OutboxRepo` — universal trigger outbox CRUD. Hot writes come from
|
||||
//! the `OutboxEventEmitter` (KV mutations fan out via this) and the
|
||||
//! sync-HTTP path. Hot reads come from the dispatcher, which claims
|
||||
//! due rows via `FOR UPDATE SKIP LOCKED`.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use picloud_shared::{AdminUserId, AppId, ExecutionId, ScriptId, TriggerId};
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum OutboxRepoError {
|
||||
#[error("database error: {0}")]
|
||||
Db(#[from] sqlx::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum OutboxSourceKind {
|
||||
Http,
|
||||
Kv,
|
||||
DeadLetter,
|
||||
}
|
||||
|
||||
impl OutboxSourceKind {
|
||||
#[must_use]
|
||||
pub const fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::Http => "http",
|
||||
Self::Kv => "kv",
|
||||
Self::DeadLetter => "dead_letter",
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn from_wire(s: &str) -> Option<Self> {
|
||||
match s {
|
||||
"http" => Some(Self::Http),
|
||||
"kv" => Some(Self::Kv),
|
||||
"dead_letter" => Some(Self::DeadLetter),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert payload — what each event source writes when fanning out
|
||||
/// to the outbox. `payload` is the serialized `TriggerEvent` (plus
|
||||
/// any extra context the dispatcher needs to reconstruct an
|
||||
/// `ExecRequest`).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NewOutboxRow {
|
||||
pub app_id: AppId,
|
||||
pub source_kind: OutboxSourceKind,
|
||||
pub trigger_id: Option<TriggerId>,
|
||||
pub script_id: Option<ScriptId>,
|
||||
pub reply_to: Option<Uuid>,
|
||||
pub payload: serde_json::Value,
|
||||
pub origin_principal: Option<AdminUserId>,
|
||||
pub trigger_depth: u32,
|
||||
pub root_execution_id: Option<ExecutionId>,
|
||||
}
|
||||
|
||||
/// Row as the dispatcher sees it after a claim.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OutboxRow {
|
||||
pub id: Uuid,
|
||||
pub app_id: AppId,
|
||||
pub source_kind: OutboxSourceKind,
|
||||
pub trigger_id: Option<TriggerId>,
|
||||
pub script_id: Option<ScriptId>,
|
||||
pub reply_to: Option<Uuid>,
|
||||
pub payload: serde_json::Value,
|
||||
pub origin_principal: Option<AdminUserId>,
|
||||
pub trigger_depth: u32,
|
||||
pub root_execution_id: Option<ExecutionId>,
|
||||
pub attempt_count: u32,
|
||||
pub next_attempt_at: DateTime<Utc>,
|
||||
pub created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait OutboxRepo: Send + Sync {
|
||||
async fn insert(&self, row: NewOutboxRow) -> Result<Uuid, OutboxRepoError>;
|
||||
|
||||
/// Claim up to `limit` due rows. Wraps the claim in a single
|
||||
/// transaction so two concurrent dispatchers (cluster mode) can't
|
||||
/// double-pick a row. Empty Vec when nothing is due.
|
||||
async fn claim_due(
|
||||
&self,
|
||||
claimed_by: &str,
|
||||
limit: i64,
|
||||
) -> Result<Vec<OutboxRow>, OutboxRepoError>;
|
||||
|
||||
/// Remove a row after a terminal outcome (success or dead-letter).
|
||||
async fn delete(&self, id: Uuid) -> Result<(), OutboxRepoError>;
|
||||
|
||||
/// Failure path: bump attempt_count, clear the claim, set the
|
||||
/// next attempt time. The dispatcher computes the delay (with
|
||||
/// backoff + jitter) and passes it in.
|
||||
async fn reschedule(
|
||||
&self,
|
||||
id: Uuid,
|
||||
attempt_count: u32,
|
||||
next_attempt_at: DateTime<Utc>,
|
||||
) -> Result<(), OutboxRepoError>;
|
||||
}
|
||||
|
||||
pub struct PostgresOutboxRepo {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresOutboxRepo {
|
||||
#[must_use]
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl OutboxRepo for PostgresOutboxRepo {
|
||||
async fn insert(&self, row: NewOutboxRow) -> Result<Uuid, OutboxRepoError> {
|
||||
let (id,): (Uuid,) = sqlx::query_as(
|
||||
"INSERT INTO outbox ( \
|
||||
app_id, source_kind, trigger_id, script_id, reply_to, \
|
||||
payload, origin_principal, trigger_depth, root_execution_id \
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
|
||||
RETURNING id",
|
||||
)
|
||||
.bind(row.app_id.into_inner())
|
||||
.bind(row.source_kind.as_str())
|
||||
.bind(row.trigger_id.map(TriggerId::into_inner))
|
||||
.bind(row.script_id.map(ScriptId::into_inner))
|
||||
.bind(row.reply_to)
|
||||
.bind(row.payload)
|
||||
.bind(row.origin_principal.map(AdminUserId::into_inner))
|
||||
.bind(i32::try_from(row.trigger_depth).unwrap_or(0))
|
||||
.bind(row.root_execution_id.map(ExecutionId::into_inner))
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
async fn claim_due(
|
||||
&self,
|
||||
claimed_by: &str,
|
||||
limit: i64,
|
||||
) -> Result<Vec<OutboxRow>, OutboxRepoError> {
|
||||
let rows: Vec<OutboxRowRaw> = sqlx::query_as(
|
||||
"WITH due AS ( \
|
||||
SELECT id FROM outbox \
|
||||
WHERE claimed_at IS NULL AND next_attempt_at <= NOW() \
|
||||
ORDER BY next_attempt_at \
|
||||
FOR UPDATE SKIP LOCKED \
|
||||
LIMIT $1 \
|
||||
) \
|
||||
UPDATE outbox SET claimed_at = NOW(), claimed_by = $2 \
|
||||
WHERE id IN (SELECT id FROM due) \
|
||||
RETURNING id, app_id, source_kind, trigger_id, script_id, reply_to, \
|
||||
payload, origin_principal, trigger_depth, \
|
||||
root_execution_id, attempt_count, next_attempt_at, created_at",
|
||||
)
|
||||
.bind(limit)
|
||||
.bind(claimed_by)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(rows.into_iter().filter_map(OutboxRowRaw::hydrate).collect())
|
||||
}
|
||||
|
||||
async fn delete(&self, id: Uuid) -> Result<(), OutboxRepoError> {
|
||||
sqlx::query("DELETE FROM outbox WHERE id = $1")
|
||||
.bind(id)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn reschedule(
|
||||
&self,
|
||||
id: Uuid,
|
||||
attempt_count: u32,
|
||||
next_attempt_at: DateTime<Utc>,
|
||||
) -> Result<(), OutboxRepoError> {
|
||||
sqlx::query(
|
||||
"UPDATE outbox SET attempt_count = $2, next_attempt_at = $3, \
|
||||
claimed_at = NULL, claimed_by = NULL \
|
||||
WHERE id = $1",
|
||||
)
|
||||
.bind(id)
|
||||
.bind(i32::try_from(attempt_count).unwrap_or(0))
|
||||
.bind(next_attempt_at)
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct OutboxRowRaw {
|
||||
id: Uuid,
|
||||
app_id: Uuid,
|
||||
source_kind: String,
|
||||
trigger_id: Option<Uuid>,
|
||||
script_id: Option<Uuid>,
|
||||
reply_to: Option<Uuid>,
|
||||
payload: serde_json::Value,
|
||||
origin_principal: Option<Uuid>,
|
||||
trigger_depth: i32,
|
||||
root_execution_id: Option<Uuid>,
|
||||
attempt_count: i32,
|
||||
next_attempt_at: DateTime<Utc>,
|
||||
created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl OutboxRowRaw {
|
||||
fn hydrate(self) -> Option<OutboxRow> {
|
||||
Some(OutboxRow {
|
||||
id: self.id,
|
||||
app_id: self.app_id.into(),
|
||||
source_kind: OutboxSourceKind::from_wire(&self.source_kind)?,
|
||||
trigger_id: self.trigger_id.map(Into::into),
|
||||
script_id: self.script_id.map(Into::into),
|
||||
reply_to: self.reply_to,
|
||||
payload: self.payload,
|
||||
origin_principal: self.origin_principal.map(Into::into),
|
||||
trigger_depth: u32::try_from(self.trigger_depth).unwrap_or(0),
|
||||
root_execution_id: self.root_execution_id.map(Into::into),
|
||||
attempt_count: u32::try_from(self.attempt_count).unwrap_or(0),
|
||||
next_attempt_at: self.next_attempt_at,
|
||||
created_at: self.created_at,
|
||||
})
|
||||
}
|
||||
}
|
||||
157
crates/manager-core/src/trigger_config.rs
Normal file
157
crates/manager-core/src/trigger_config.rs
Normal file
@@ -0,0 +1,157 @@
|
||||
//! Trigger-framework tunables. Defaults match design notes §3 (retry
|
||||
//! policy) and §4 (retention). Each knob is env-overridable via a
|
||||
//! `PICLOUD_*` variable following the same `tracing::warn` on parse
|
||||
//! error pattern `SandboxCeiling::from_env` uses.
|
||||
|
||||
use std::env;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum BackoffShape {
|
||||
Exponential,
|
||||
Linear,
|
||||
Constant,
|
||||
}
|
||||
|
||||
impl BackoffShape {
|
||||
#[must_use]
|
||||
pub const fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::Exponential => "exponential",
|
||||
Self::Linear => "linear",
|
||||
Self::Constant => "constant",
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn from_wire(s: &str) -> Option<Self> {
|
||||
match s {
|
||||
"exponential" => Some(Self::Exponential),
|
||||
"linear" => Some(Self::Linear),
|
||||
"constant" => Some(Self::Constant),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct TriggerConfig {
|
||||
/// Maximum `cx.trigger_depth` before the dispatcher refuses
|
||||
/// execution. Above this, the row is dropped + a metric bumped;
|
||||
/// it is NOT dead-lettered (design notes §4: depth-exceeded
|
||||
/// means "you built a loop"). Default 8.
|
||||
pub max_trigger_depth: u32,
|
||||
|
||||
/// Default retry attempts (per-trigger override on the row).
|
||||
pub retry_max_attempts: u32,
|
||||
pub retry_backoff: BackoffShape,
|
||||
pub retry_base_ms: u32,
|
||||
/// ±jitter as a percentage of the computed delay. Applied at
|
||||
/// dispatch time — not per-trigger.
|
||||
pub retry_jitter_pct: u32,
|
||||
|
||||
/// dead-letter retention before GC, in days. Default 30.
|
||||
pub dead_letter_retention_days: u32,
|
||||
/// abandoned-execution retention before GC, in days. Default 7.
|
||||
pub abandoned_retention_days: u32,
|
||||
}
|
||||
|
||||
impl TriggerConfig {
|
||||
#[must_use]
|
||||
pub const fn conservative() -> Self {
|
||||
Self {
|
||||
max_trigger_depth: 8,
|
||||
retry_max_attempts: 3,
|
||||
retry_backoff: BackoffShape::Exponential,
|
||||
retry_base_ms: 1000,
|
||||
retry_jitter_pct: 20,
|
||||
dead_letter_retention_days: 30,
|
||||
abandoned_retention_days: 7,
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn from_env() -> Self {
|
||||
let mut c = Self::conservative();
|
||||
load_u32(&mut c.max_trigger_depth, "PICLOUD_MAX_TRIGGER_DEPTH");
|
||||
load_u32(
|
||||
&mut c.retry_max_attempts,
|
||||
"PICLOUD_TRIGGER_RETRY_MAX_ATTEMPTS",
|
||||
);
|
||||
load_backoff(&mut c.retry_backoff, "PICLOUD_TRIGGER_RETRY_BACKOFF");
|
||||
load_u32(&mut c.retry_base_ms, "PICLOUD_TRIGGER_RETRY_BASE_MS");
|
||||
load_u32(&mut c.retry_jitter_pct, "PICLOUD_TRIGGER_RETRY_JITTER_PCT");
|
||||
load_u32(
|
||||
&mut c.dead_letter_retention_days,
|
||||
"PICLOUD_DEAD_LETTER_RETENTION_DAYS",
|
||||
);
|
||||
load_u32(
|
||||
&mut c.abandoned_retention_days,
|
||||
"PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS",
|
||||
);
|
||||
c
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TriggerConfig {
|
||||
fn default() -> Self {
|
||||
Self::conservative()
|
||||
}
|
||||
}
|
||||
|
||||
fn load_u32(dst: &mut u32, key: &str) {
|
||||
if let Ok(v) = env::var(key) {
|
||||
match v.parse::<u32>() {
|
||||
Ok(n) => *dst = n,
|
||||
Err(e) => {
|
||||
tracing::warn!(env = key, error = %e, "ignoring invalid trigger-config value");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn load_backoff(dst: &mut BackoffShape, key: &str) {
|
||||
if let Ok(v) = env::var(key) {
|
||||
match BackoffShape::from_wire(&v) {
|
||||
Some(b) => *dst = b,
|
||||
None => {
|
||||
tracing::warn!(
|
||||
env = key,
|
||||
value = %v,
|
||||
"ignoring invalid trigger-config backoff shape (use exponential|linear|constant)"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn conservative_defaults_match_design_notes() {
|
||||
let c = TriggerConfig::conservative();
|
||||
assert_eq!(c.max_trigger_depth, 8);
|
||||
assert_eq!(c.retry_max_attempts, 3);
|
||||
assert_eq!(c.retry_backoff, BackoffShape::Exponential);
|
||||
assert_eq!(c.retry_base_ms, 1000);
|
||||
assert_eq!(c.retry_jitter_pct, 20);
|
||||
assert_eq!(c.dead_letter_retention_days, 30);
|
||||
assert_eq!(c.abandoned_retention_days, 7);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backoff_round_trips() {
|
||||
for shape in [
|
||||
BackoffShape::Exponential,
|
||||
BackoffShape::Linear,
|
||||
BackoffShape::Constant,
|
||||
] {
|
||||
assert_eq!(BackoffShape::from_wire(shape.as_str()), Some(shape));
|
||||
}
|
||||
assert_eq!(BackoffShape::from_wire("garbage"), None);
|
||||
}
|
||||
}
|
||||
617
crates/manager-core/src/trigger_repo.rs
Normal file
617
crates/manager-core/src/trigger_repo.rs
Normal file
@@ -0,0 +1,617 @@
|
||||
//! `TriggerRepo` — CRUD over the `triggers` parent + per-kind detail
|
||||
//! tables. The admin endpoints (commit 4) sit on top of this; the
|
||||
//! dispatcher (commit 5) reads `list_matching_*` to fan out events to
|
||||
//! handler scripts.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use picloud_shared::{AdminUserId, AppId, KvEventOp, ScriptId, TriggerId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::trigger_config::BackoffShape;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TriggerRepoError {
|
||||
#[error("database error: {0}")]
|
||||
Db(#[from] sqlx::Error),
|
||||
|
||||
#[error("trigger not found: {0}")]
|
||||
NotFound(TriggerId),
|
||||
|
||||
#[error("invalid trigger payload: {0}")]
|
||||
Invalid(String),
|
||||
}
|
||||
|
||||
/// Parent-table row plus the per-kind detail merged in. Serialized
|
||||
/// back to admin clients via the JSON API.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Trigger {
|
||||
pub id: TriggerId,
|
||||
pub app_id: AppId,
|
||||
pub script_id: ScriptId,
|
||||
pub kind: TriggerKind,
|
||||
pub enabled: bool,
|
||||
pub dispatch_mode: TriggerDispatchMode,
|
||||
pub retry_max_attempts: u32,
|
||||
pub retry_backoff: BackoffShape,
|
||||
pub retry_base_ms: u32,
|
||||
pub registered_by_principal: AdminUserId,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
pub details: TriggerDetails,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TriggerKind {
|
||||
Kv,
|
||||
DeadLetter,
|
||||
}
|
||||
|
||||
impl TriggerKind {
|
||||
#[must_use]
|
||||
pub const fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::Kv => "kv",
|
||||
Self::DeadLetter => "dead_letter",
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn from_wire(s: &str) -> Option<Self> {
|
||||
match s {
|
||||
"kv" => Some(Self::Kv),
|
||||
"dead_letter" => Some(Self::DeadLetter),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TriggerDispatchMode {
|
||||
Sync,
|
||||
Async,
|
||||
}
|
||||
|
||||
impl TriggerDispatchMode {
|
||||
#[must_use]
|
||||
pub const fn as_str(self) -> &'static str {
|
||||
match self {
|
||||
Self::Sync => "sync",
|
||||
Self::Async => "async",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||
pub enum TriggerDetails {
|
||||
Kv {
|
||||
collection_glob: String,
|
||||
ops: Vec<KvEventOp>,
|
||||
},
|
||||
DeadLetter {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
source_filter: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
trigger_id_filter: Option<TriggerId>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
script_id_filter: Option<ScriptId>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Create payload for a KV trigger. Defaults applied at the admin
|
||||
/// layer (uses `TriggerConfig::from_env` to fill retry settings if
|
||||
/// the request omitted them — keeps the row auditable).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CreateKvTrigger {
|
||||
pub script_id: ScriptId,
|
||||
pub collection_glob: String,
|
||||
pub ops: Vec<KvEventOp>,
|
||||
pub dispatch_mode: TriggerDispatchMode,
|
||||
pub retry_max_attempts: u32,
|
||||
pub retry_backoff: BackoffShape,
|
||||
pub retry_base_ms: u32,
|
||||
pub registered_by_principal: AdminUserId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CreateDeadLetterTrigger {
|
||||
pub script_id: ScriptId,
|
||||
pub source_filter: Option<String>,
|
||||
pub trigger_id_filter: Option<TriggerId>,
|
||||
pub script_id_filter: Option<ScriptId>,
|
||||
pub registered_by_principal: AdminUserId,
|
||||
}
|
||||
|
||||
/// One match for the dispatcher's "which KV triggers fire on this
|
||||
/// event" lookup. Carries everything the dispatcher needs to construct
|
||||
/// the outbox row.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct KvTriggerMatch {
|
||||
pub trigger_id: TriggerId,
|
||||
pub script_id: ScriptId,
|
||||
pub dispatch_mode: TriggerDispatchMode,
|
||||
pub retry_max_attempts: u32,
|
||||
pub retry_backoff: BackoffShape,
|
||||
pub retry_base_ms: u32,
|
||||
pub registered_by_principal: AdminUserId,
|
||||
}
|
||||
|
||||
/// One match for the dispatcher's "which dead-letter triggers fire
|
||||
/// on this dead-letter row" lookup.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DeadLetterTriggerMatch {
|
||||
pub trigger_id: TriggerId,
|
||||
pub script_id: ScriptId,
|
||||
pub dispatch_mode: TriggerDispatchMode,
|
||||
pub registered_by_principal: AdminUserId,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait TriggerRepo: Send + Sync {
|
||||
async fn create_kv_trigger(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
req: CreateKvTrigger,
|
||||
) -> Result<Trigger, TriggerRepoError>;
|
||||
|
||||
async fn create_dead_letter_trigger(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
req: CreateDeadLetterTrigger,
|
||||
) -> Result<Trigger, TriggerRepoError>;
|
||||
|
||||
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError>;
|
||||
|
||||
async fn get(&self, id: TriggerId) -> Result<Option<Trigger>, TriggerRepoError>;
|
||||
|
||||
async fn delete(&self, id: TriggerId) -> Result<bool, TriggerRepoError>;
|
||||
|
||||
/// Dispatcher hot path: find every enabled KV trigger in `app_id`
|
||||
/// whose `collection_glob` matches `collection` and whose `ops`
|
||||
/// covers `op`. Glob matching done in Rust (the column is plain
|
||||
/// TEXT, the matcher applies "*"/"prefix:*" semantics).
|
||||
async fn list_matching_kv(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
collection: &str,
|
||||
op: KvEventOp,
|
||||
) -> Result<Vec<KvTriggerMatch>, TriggerRepoError>;
|
||||
|
||||
/// Dispatcher hot path for dead-letter fan-out. Filters: source
|
||||
/// (or any-source), originating trigger_id (or any), originating
|
||||
/// script_id (or any). Each filter is "match OR is_null".
|
||||
async fn list_matching_dead_letter(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
source: &str,
|
||||
trigger_id: Option<TriggerId>,
|
||||
script_id: Option<ScriptId>,
|
||||
) -> Result<Vec<DeadLetterTriggerMatch>, TriggerRepoError>;
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// Postgres impl
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
pub struct PostgresTriggerRepo {
|
||||
pool: PgPool,
|
||||
}
|
||||
|
||||
impl PostgresTriggerRepo {
|
||||
#[must_use]
|
||||
pub fn new(pool: PgPool) -> Self {
|
||||
Self { pool }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TriggerRepo for PostgresTriggerRepo {
|
||||
async fn create_kv_trigger(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
req: CreateKvTrigger,
|
||||
) -> Result<Trigger, TriggerRepoError> {
|
||||
if req.collection_glob.is_empty() {
|
||||
return Err(TriggerRepoError::Invalid(
|
||||
"collection_glob must not be empty".into(),
|
||||
));
|
||||
}
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let parent: TriggerRow = sqlx::query_as(
|
||||
"INSERT INTO triggers ( \
|
||||
app_id, script_id, kind, enabled, dispatch_mode, \
|
||||
retry_max_attempts, retry_backoff, retry_base_ms, \
|
||||
registered_by_principal \
|
||||
) VALUES ($1, $2, 'kv', TRUE, $3, $4, $5, $6, $7) \
|
||||
RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \
|
||||
retry_max_attempts, retry_backoff, retry_base_ms, \
|
||||
registered_by_principal, created_at, updated_at",
|
||||
)
|
||||
.bind(app_id.into_inner())
|
||||
.bind(req.script_id.into_inner())
|
||||
.bind(req.dispatch_mode.as_str())
|
||||
.bind(i32::try_from(req.retry_max_attempts).unwrap_or(3))
|
||||
.bind(req.retry_backoff.as_str())
|
||||
.bind(i32::try_from(req.retry_base_ms).unwrap_or(1000))
|
||||
.bind(req.registered_by_principal.into_inner())
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let ops_str: Vec<String> = req.ops.iter().map(|o| o.as_str().to_string()).collect();
|
||||
sqlx::query(
|
||||
"INSERT INTO kv_trigger_details (trigger_id, collection_glob, ops) \
|
||||
VALUES ($1, $2, $3)",
|
||||
)
|
||||
.bind(parent.id)
|
||||
.bind(&req.collection_glob)
|
||||
.bind(&ops_str)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(Trigger {
|
||||
id: parent.id.into(),
|
||||
app_id: parent.app_id.into(),
|
||||
script_id: parent.script_id.into(),
|
||||
kind: TriggerKind::Kv,
|
||||
enabled: parent.enabled,
|
||||
dispatch_mode: dispatch_from_str(&parent.dispatch_mode),
|
||||
retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3),
|
||||
retry_backoff: BackoffShape::from_wire(&parent.retry_backoff)
|
||||
.unwrap_or(BackoffShape::Exponential),
|
||||
retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000),
|
||||
registered_by_principal: parent.registered_by_principal.into(),
|
||||
created_at: parent.created_at,
|
||||
updated_at: parent.updated_at,
|
||||
details: TriggerDetails::Kv {
|
||||
collection_glob: req.collection_glob,
|
||||
ops: req.ops,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
async fn create_dead_letter_trigger(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
req: CreateDeadLetterTrigger,
|
||||
) -> Result<Trigger, TriggerRepoError> {
|
||||
let mut tx = self.pool.begin().await?;
|
||||
// Dead-letter triggers force max_attempts=1 (design notes §4
|
||||
// recursion-stop). Backoff/base_ms irrelevant but the columns
|
||||
// are NOT NULL — store sensible values.
|
||||
let parent: TriggerRow = sqlx::query_as(
|
||||
"INSERT INTO triggers ( \
|
||||
app_id, script_id, kind, enabled, dispatch_mode, \
|
||||
retry_max_attempts, retry_backoff, retry_base_ms, \
|
||||
registered_by_principal \
|
||||
) VALUES ($1, $2, 'dead_letter', TRUE, 'async', 1, 'constant', 0, $3) \
|
||||
RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \
|
||||
retry_max_attempts, retry_backoff, retry_base_ms, \
|
||||
registered_by_principal, created_at, updated_at",
|
||||
)
|
||||
.bind(app_id.into_inner())
|
||||
.bind(req.script_id.into_inner())
|
||||
.bind(req.registered_by_principal.into_inner())
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO dead_letter_trigger_details \
|
||||
(trigger_id, source_filter, trigger_id_filter, script_id_filter) \
|
||||
VALUES ($1, $2, $3, $4)",
|
||||
)
|
||||
.bind(parent.id)
|
||||
.bind(req.source_filter.as_deref())
|
||||
.bind(req.trigger_id_filter.map(TriggerId::into_inner))
|
||||
.bind(req.script_id_filter.map(ScriptId::into_inner))
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(Trigger {
|
||||
id: parent.id.into(),
|
||||
app_id: parent.app_id.into(),
|
||||
script_id: parent.script_id.into(),
|
||||
kind: TriggerKind::DeadLetter,
|
||||
enabled: parent.enabled,
|
||||
dispatch_mode: dispatch_from_str(&parent.dispatch_mode),
|
||||
retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(1),
|
||||
retry_backoff: BackoffShape::from_wire(&parent.retry_backoff)
|
||||
.unwrap_or(BackoffShape::Constant),
|
||||
retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(0),
|
||||
registered_by_principal: parent.registered_by_principal.into(),
|
||||
created_at: parent.created_at,
|
||||
updated_at: parent.updated_at,
|
||||
details: TriggerDetails::DeadLetter {
|
||||
source_filter: req.source_filter,
|
||||
trigger_id_filter: req.trigger_id_filter,
|
||||
script_id_filter: req.script_id_filter,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError> {
|
||||
let parents: Vec<TriggerRow> = sqlx::query_as(
|
||||
"SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \
|
||||
retry_max_attempts, retry_backoff, retry_base_ms, \
|
||||
registered_by_principal, created_at, updated_at \
|
||||
FROM triggers WHERE app_id = $1 ORDER BY created_at DESC",
|
||||
)
|
||||
.bind(app_id.into_inner())
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
let mut out = Vec::with_capacity(parents.len());
|
||||
for p in parents {
|
||||
out.push(hydrate_one(&self.pool, p).await?);
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
async fn get(&self, id: TriggerId) -> Result<Option<Trigger>, TriggerRepoError> {
|
||||
let parent: Option<TriggerRow> = sqlx::query_as(
|
||||
"SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \
|
||||
retry_max_attempts, retry_backoff, retry_base_ms, \
|
||||
registered_by_principal, created_at, updated_at \
|
||||
FROM triggers WHERE id = $1",
|
||||
)
|
||||
.bind(id.into_inner())
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
match parent {
|
||||
Some(p) => Ok(Some(hydrate_one(&self.pool, p).await?)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete(&self, id: TriggerId) -> Result<bool, TriggerRepoError> {
|
||||
// ON DELETE CASCADE on the detail tables takes care of them.
|
||||
let res = sqlx::query("DELETE FROM triggers WHERE id = $1")
|
||||
.bind(id.into_inner())
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
Ok(res.rows_affected() > 0)
|
||||
}
|
||||
|
||||
async fn list_matching_kv(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
collection: &str,
|
||||
op: KvEventOp,
|
||||
) -> Result<Vec<KvTriggerMatch>, TriggerRepoError> {
|
||||
// Fetch all enabled KV triggers for the app — glob matching
|
||||
// happens in Rust so we don't have to teach the query about
|
||||
// `*` and `prefix:*`. Sets are tiny in practice (one app's
|
||||
// worth of triggers, usually a handful).
|
||||
let rows: Vec<KvMatchRow> = sqlx::query_as(
|
||||
"SELECT t.id, t.script_id, t.dispatch_mode, \
|
||||
t.retry_max_attempts, t.retry_backoff, t.retry_base_ms, \
|
||||
t.registered_by_principal, \
|
||||
d.collection_glob, d.ops \
|
||||
FROM triggers t \
|
||||
JOIN kv_trigger_details d ON d.trigger_id = t.id \
|
||||
WHERE t.app_id = $1 AND t.kind = 'kv' AND t.enabled = TRUE",
|
||||
)
|
||||
.bind(app_id.into_inner())
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
let op_str = op.as_str();
|
||||
let mut out = Vec::new();
|
||||
for r in rows {
|
||||
if !collection_matches(&r.collection_glob, collection) {
|
||||
continue;
|
||||
}
|
||||
let any_op = r.ops.is_empty();
|
||||
if !any_op && !r.ops.iter().any(|o| o == op_str) {
|
||||
continue;
|
||||
}
|
||||
out.push(KvTriggerMatch {
|
||||
trigger_id: r.id.into(),
|
||||
script_id: r.script_id.into(),
|
||||
dispatch_mode: dispatch_from_str(&r.dispatch_mode),
|
||||
retry_max_attempts: u32::try_from(r.retry_max_attempts).unwrap_or(3),
|
||||
retry_backoff: BackoffShape::from_wire(&r.retry_backoff)
|
||||
.unwrap_or(BackoffShape::Exponential),
|
||||
retry_base_ms: u32::try_from(r.retry_base_ms).unwrap_or(1000),
|
||||
registered_by_principal: r.registered_by_principal.into(),
|
||||
});
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
async fn list_matching_dead_letter(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
source: &str,
|
||||
trigger_id: Option<TriggerId>,
|
||||
script_id: Option<ScriptId>,
|
||||
) -> Result<Vec<DeadLetterTriggerMatch>, TriggerRepoError> {
|
||||
let rows: Vec<DlMatchRow> = sqlx::query_as(
|
||||
"SELECT t.id, t.script_id, t.dispatch_mode, t.registered_by_principal, \
|
||||
d.source_filter, d.trigger_id_filter, d.script_id_filter \
|
||||
FROM triggers t \
|
||||
JOIN dead_letter_trigger_details d ON d.trigger_id = t.id \
|
||||
WHERE t.app_id = $1 AND t.kind = 'dead_letter' AND t.enabled = TRUE \
|
||||
AND (d.source_filter IS NULL OR d.source_filter = $2) \
|
||||
AND (d.trigger_id_filter IS NULL OR d.trigger_id_filter = $3) \
|
||||
AND (d.script_id_filter IS NULL OR d.script_id_filter = $4)",
|
||||
)
|
||||
.bind(app_id.into_inner())
|
||||
.bind(source)
|
||||
.bind(trigger_id.map(TriggerId::into_inner))
|
||||
.bind(script_id.map(ScriptId::into_inner))
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
|
||||
Ok(rows
|
||||
.into_iter()
|
||||
.map(|r| DeadLetterTriggerMatch {
|
||||
trigger_id: r.id.into(),
|
||||
script_id: r.script_id.into(),
|
||||
dispatch_mode: dispatch_from_str(&r.dispatch_mode),
|
||||
registered_by_principal: r.registered_by_principal.into(),
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
async fn hydrate_one(pool: &PgPool, parent: TriggerRow) -> Result<Trigger, TriggerRepoError> {
|
||||
let kind = TriggerKind::from_wire(&parent.kind).ok_or_else(|| {
|
||||
TriggerRepoError::Invalid(format!("unknown trigger kind {}", parent.kind))
|
||||
})?;
|
||||
|
||||
let details = match kind {
|
||||
TriggerKind::Kv => {
|
||||
let row: KvDetailRow = sqlx::query_as(
|
||||
"SELECT collection_glob, ops FROM kv_trigger_details WHERE trigger_id = $1",
|
||||
)
|
||||
.bind(parent.id)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
let ops = row
|
||||
.ops
|
||||
.iter()
|
||||
.filter_map(|s| KvEventOp::from_wire(s))
|
||||
.collect();
|
||||
TriggerDetails::Kv {
|
||||
collection_glob: row.collection_glob,
|
||||
ops,
|
||||
}
|
||||
}
|
||||
TriggerKind::DeadLetter => {
|
||||
let row: DlDetailRow = sqlx::query_as(
|
||||
"SELECT source_filter, trigger_id_filter, script_id_filter \
|
||||
FROM dead_letter_trigger_details WHERE trigger_id = $1",
|
||||
)
|
||||
.bind(parent.id)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
TriggerDetails::DeadLetter {
|
||||
source_filter: row.source_filter,
|
||||
trigger_id_filter: row.trigger_id_filter.map(Into::into),
|
||||
script_id_filter: row.script_id_filter.map(Into::into),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Trigger {
|
||||
id: parent.id.into(),
|
||||
app_id: parent.app_id.into(),
|
||||
script_id: parent.script_id.into(),
|
||||
kind,
|
||||
enabled: parent.enabled,
|
||||
dispatch_mode: dispatch_from_str(&parent.dispatch_mode),
|
||||
retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3),
|
||||
retry_backoff: BackoffShape::from_wire(&parent.retry_backoff)
|
||||
.unwrap_or(BackoffShape::Exponential),
|
||||
retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000),
|
||||
registered_by_principal: parent.registered_by_principal.into(),
|
||||
created_at: parent.created_at,
|
||||
updated_at: parent.updated_at,
|
||||
details,
|
||||
})
|
||||
}
|
||||
|
||||
fn dispatch_from_str(s: &str) -> TriggerDispatchMode {
|
||||
match s {
|
||||
"sync" => TriggerDispatchMode::Sync,
|
||||
_ => TriggerDispatchMode::Async,
|
||||
}
|
||||
}
|
||||
|
||||
/// Match a `collection_glob` against an actual `collection` name.
|
||||
/// Supported forms (in priority order):
|
||||
/// - `"*"` → matches every collection
|
||||
/// - `"foo*"` → prefix match (anything starting with "foo")
|
||||
/// - `"foo"` → exact match
|
||||
#[must_use]
|
||||
pub fn collection_matches(glob: &str, collection: &str) -> bool {
|
||||
if glob == "*" {
|
||||
return true;
|
||||
}
|
||||
if let Some(prefix) = glob.strip_suffix('*') {
|
||||
return collection.starts_with(prefix);
|
||||
}
|
||||
glob == collection
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct TriggerRow {
|
||||
id: Uuid,
|
||||
app_id: Uuid,
|
||||
script_id: Uuid,
|
||||
kind: String,
|
||||
enabled: bool,
|
||||
dispatch_mode: String,
|
||||
retry_max_attempts: i32,
|
||||
retry_backoff: String,
|
||||
retry_base_ms: i32,
|
||||
registered_by_principal: Uuid,
|
||||
created_at: DateTime<Utc>,
|
||||
updated_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct KvDetailRow {
|
||||
collection_glob: String,
|
||||
ops: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
#[allow(clippy::struct_field_names)]
|
||||
struct DlDetailRow {
|
||||
source_filter: Option<String>,
|
||||
trigger_id_filter: Option<Uuid>,
|
||||
script_id_filter: Option<Uuid>,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct KvMatchRow {
|
||||
id: Uuid,
|
||||
script_id: Uuid,
|
||||
dispatch_mode: String,
|
||||
retry_max_attempts: i32,
|
||||
retry_backoff: String,
|
||||
retry_base_ms: i32,
|
||||
registered_by_principal: Uuid,
|
||||
collection_glob: String,
|
||||
ops: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct DlMatchRow {
|
||||
id: Uuid,
|
||||
script_id: Uuid,
|
||||
dispatch_mode: String,
|
||||
registered_by_principal: Uuid,
|
||||
#[allow(dead_code)]
|
||||
source_filter: Option<String>,
|
||||
#[allow(dead_code)]
|
||||
trigger_id_filter: Option<Uuid>,
|
||||
#[allow(dead_code)]
|
||||
script_id_filter: Option<Uuid>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn collection_matcher_handles_star_prefix_exact() {
|
||||
assert!(collection_matches("*", "widgets"));
|
||||
assert!(collection_matches("*", ""));
|
||||
assert!(collection_matches("users:*", "users:1"));
|
||||
assert!(collection_matches("users:*", "users:"));
|
||||
assert!(!collection_matches("users:*", "orgs:1"));
|
||||
assert!(collection_matches("widgets", "widgets"));
|
||||
assert!(!collection_matches("widgets", "Widgets"));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user