//! `TriggerRepo` — CRUD over the `triggers` parent + per-kind detail //! tables. The admin endpoints (commit 4) sit on top of this; the //! dispatcher (commit 5) reads `list_matching_*` to fan out events to //! handler scripts. use async_trait::async_trait; use chrono::{DateTime, Utc}; use picloud_shared::{AdminUserId, AppId, KvEventOp, ScriptId, TriggerId}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; use uuid::Uuid; use crate::trigger_config::BackoffShape; #[derive(Debug, thiserror::Error)] pub enum TriggerRepoError { #[error("database error: {0}")] Db(#[from] sqlx::Error), #[error("trigger not found: {0}")] NotFound(TriggerId), #[error("invalid trigger payload: {0}")] Invalid(String), } /// Parent-table row plus the per-kind detail merged in. Serialized /// back to admin clients via the JSON API. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Trigger { pub id: TriggerId, pub app_id: AppId, pub script_id: ScriptId, pub kind: TriggerKind, pub enabled: bool, pub dispatch_mode: TriggerDispatchMode, pub retry_max_attempts: u32, pub retry_backoff: BackoffShape, pub retry_base_ms: u32, pub registered_by_principal: AdminUserId, pub created_at: DateTime, pub updated_at: DateTime, pub details: TriggerDetails, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum TriggerKind { Kv, DeadLetter, } impl TriggerKind { #[must_use] pub const fn as_str(self) -> &'static str { match self { Self::Kv => "kv", Self::DeadLetter => "dead_letter", } } #[must_use] pub fn from_wire(s: &str) -> Option { match s { "kv" => Some(Self::Kv), "dead_letter" => Some(Self::DeadLetter), _ => None, } } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum TriggerDispatchMode { Sync, Async, } impl TriggerDispatchMode { #[must_use] pub const fn as_str(self) -> &'static str { match self { Self::Sync => "sync", Self::Async => "async", } } } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum TriggerDetails { Kv { collection_glob: String, ops: Vec, }, DeadLetter { #[serde(default, skip_serializing_if = "Option::is_none")] source_filter: Option, #[serde(default, skip_serializing_if = "Option::is_none")] trigger_id_filter: Option, #[serde(default, skip_serializing_if = "Option::is_none")] script_id_filter: Option, }, } /// Create payload for a KV trigger. Defaults applied at the admin /// layer (uses `TriggerConfig::from_env` to fill retry settings if /// the request omitted them — keeps the row auditable). #[derive(Debug, Clone)] pub struct CreateKvTrigger { pub script_id: ScriptId, pub collection_glob: String, pub ops: Vec, pub dispatch_mode: TriggerDispatchMode, pub retry_max_attempts: u32, pub retry_backoff: BackoffShape, pub retry_base_ms: u32, pub registered_by_principal: AdminUserId, } #[derive(Debug, Clone)] pub struct CreateDeadLetterTrigger { pub script_id: ScriptId, pub source_filter: Option, pub trigger_id_filter: Option, pub script_id_filter: Option, pub registered_by_principal: AdminUserId, } /// One match for the dispatcher's "which KV triggers fire on this /// event" lookup. Carries everything the dispatcher needs to construct /// the outbox row. #[derive(Debug, Clone)] pub struct KvTriggerMatch { pub trigger_id: TriggerId, pub script_id: ScriptId, pub dispatch_mode: TriggerDispatchMode, pub retry_max_attempts: u32, pub retry_backoff: BackoffShape, pub retry_base_ms: u32, pub registered_by_principal: AdminUserId, } /// One match for the dispatcher's "which dead-letter triggers fire /// on this dead-letter row" lookup. #[derive(Debug, Clone)] pub struct DeadLetterTriggerMatch { pub trigger_id: TriggerId, pub script_id: ScriptId, pub dispatch_mode: TriggerDispatchMode, pub registered_by_principal: AdminUserId, } #[async_trait] pub trait TriggerRepo: Send + Sync { async fn create_kv_trigger( &self, app_id: AppId, req: CreateKvTrigger, ) -> Result; async fn create_dead_letter_trigger( &self, app_id: AppId, req: CreateDeadLetterTrigger, ) -> Result; async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError>; async fn get(&self, id: TriggerId) -> Result, TriggerRepoError>; async fn delete(&self, id: TriggerId) -> Result; /// Dispatcher hot path: find every enabled KV trigger in `app_id` /// whose `collection_glob` matches `collection` and whose `ops` /// covers `op`. Glob matching done in Rust (the column is plain /// TEXT, the matcher applies "*"/"prefix:*" semantics). async fn list_matching_kv( &self, app_id: AppId, collection: &str, op: KvEventOp, ) -> Result, TriggerRepoError>; /// Dispatcher hot path for dead-letter fan-out. Filters: source /// (or any-source), originating trigger_id (or any), originating /// script_id (or any). Each filter is "match OR is_null". async fn list_matching_dead_letter( &self, app_id: AppId, source: &str, trigger_id: Option, script_id: Option, ) -> Result, TriggerRepoError>; } // ---------------------------------------------------------------------------- // Postgres impl // ---------------------------------------------------------------------------- pub struct PostgresTriggerRepo { pool: PgPool, } impl PostgresTriggerRepo { #[must_use] pub fn new(pool: PgPool) -> Self { Self { pool } } } #[async_trait] impl TriggerRepo for PostgresTriggerRepo { async fn create_kv_trigger( &self, app_id: AppId, req: CreateKvTrigger, ) -> Result { if req.collection_glob.is_empty() { return Err(TriggerRepoError::Invalid( "collection_glob must not be empty".into(), )); } let mut tx = self.pool.begin().await?; let parent: TriggerRow = sqlx::query_as( "INSERT INTO triggers ( \ app_id, script_id, kind, enabled, dispatch_mode, \ retry_max_attempts, retry_backoff, retry_base_ms, \ registered_by_principal \ ) VALUES ($1, $2, 'kv', TRUE, $3, $4, $5, $6, $7) \ RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \ retry_max_attempts, retry_backoff, retry_base_ms, \ registered_by_principal, created_at, updated_at", ) .bind(app_id.into_inner()) .bind(req.script_id.into_inner()) .bind(req.dispatch_mode.as_str()) .bind(i32::try_from(req.retry_max_attempts).unwrap_or(3)) .bind(req.retry_backoff.as_str()) .bind(i32::try_from(req.retry_base_ms).unwrap_or(1000)) .bind(req.registered_by_principal.into_inner()) .fetch_one(&mut *tx) .await?; let ops_str: Vec = req.ops.iter().map(|o| o.as_str().to_string()).collect(); sqlx::query( "INSERT INTO kv_trigger_details (trigger_id, collection_glob, ops) \ VALUES ($1, $2, $3)", ) .bind(parent.id) .bind(&req.collection_glob) .bind(&ops_str) .execute(&mut *tx) .await?; tx.commit().await?; Ok(Trigger { id: parent.id.into(), app_id: parent.app_id.into(), script_id: parent.script_id.into(), kind: TriggerKind::Kv, enabled: parent.enabled, dispatch_mode: dispatch_from_str(&parent.dispatch_mode), retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3), retry_backoff: BackoffShape::from_wire(&parent.retry_backoff) .unwrap_or(BackoffShape::Exponential), retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000), registered_by_principal: parent.registered_by_principal.into(), created_at: parent.created_at, updated_at: parent.updated_at, details: TriggerDetails::Kv { collection_glob: req.collection_glob, ops: req.ops, }, }) } async fn create_dead_letter_trigger( &self, app_id: AppId, req: CreateDeadLetterTrigger, ) -> Result { let mut tx = self.pool.begin().await?; // Dead-letter triggers force max_attempts=1 (design notes §4 // recursion-stop). Backoff/base_ms irrelevant but the columns // are NOT NULL — store sensible values. let parent: TriggerRow = sqlx::query_as( "INSERT INTO triggers ( \ app_id, script_id, kind, enabled, dispatch_mode, \ retry_max_attempts, retry_backoff, retry_base_ms, \ registered_by_principal \ ) VALUES ($1, $2, 'dead_letter', TRUE, 'async', 1, 'constant', 0, $3) \ RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \ retry_max_attempts, retry_backoff, retry_base_ms, \ registered_by_principal, created_at, updated_at", ) .bind(app_id.into_inner()) .bind(req.script_id.into_inner()) .bind(req.registered_by_principal.into_inner()) .fetch_one(&mut *tx) .await?; sqlx::query( "INSERT INTO dead_letter_trigger_details \ (trigger_id, source_filter, trigger_id_filter, script_id_filter) \ VALUES ($1, $2, $3, $4)", ) .bind(parent.id) .bind(req.source_filter.as_deref()) .bind(req.trigger_id_filter.map(TriggerId::into_inner)) .bind(req.script_id_filter.map(ScriptId::into_inner)) .execute(&mut *tx) .await?; tx.commit().await?; Ok(Trigger { id: parent.id.into(), app_id: parent.app_id.into(), script_id: parent.script_id.into(), kind: TriggerKind::DeadLetter, enabled: parent.enabled, dispatch_mode: dispatch_from_str(&parent.dispatch_mode), retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(1), retry_backoff: BackoffShape::from_wire(&parent.retry_backoff) .unwrap_or(BackoffShape::Constant), retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(0), registered_by_principal: parent.registered_by_principal.into(), created_at: parent.created_at, updated_at: parent.updated_at, details: TriggerDetails::DeadLetter { source_filter: req.source_filter, trigger_id_filter: req.trigger_id_filter, script_id_filter: req.script_id_filter, }, }) } async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError> { let parents: Vec = sqlx::query_as( "SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \ retry_max_attempts, retry_backoff, retry_base_ms, \ registered_by_principal, created_at, updated_at \ FROM triggers WHERE app_id = $1 ORDER BY created_at DESC", ) .bind(app_id.into_inner()) .fetch_all(&self.pool) .await?; let mut out = Vec::with_capacity(parents.len()); for p in parents { out.push(hydrate_one(&self.pool, p).await?); } Ok(out) } async fn get(&self, id: TriggerId) -> Result, TriggerRepoError> { let parent: Option = sqlx::query_as( "SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \ retry_max_attempts, retry_backoff, retry_base_ms, \ registered_by_principal, created_at, updated_at \ FROM triggers WHERE id = $1", ) .bind(id.into_inner()) .fetch_optional(&self.pool) .await?; match parent { Some(p) => Ok(Some(hydrate_one(&self.pool, p).await?)), None => Ok(None), } } async fn delete(&self, id: TriggerId) -> Result { // ON DELETE CASCADE on the detail tables takes care of them. let res = sqlx::query("DELETE FROM triggers WHERE id = $1") .bind(id.into_inner()) .execute(&self.pool) .await?; Ok(res.rows_affected() > 0) } async fn list_matching_kv( &self, app_id: AppId, collection: &str, op: KvEventOp, ) -> Result, TriggerRepoError> { // Fetch all enabled KV triggers for the app — glob matching // happens in Rust so we don't have to teach the query about // `*` and `prefix:*`. Sets are tiny in practice (one app's // worth of triggers, usually a handful). let rows: Vec = sqlx::query_as( "SELECT t.id, t.script_id, t.dispatch_mode, \ t.retry_max_attempts, t.retry_backoff, t.retry_base_ms, \ t.registered_by_principal, \ d.collection_glob, d.ops \ FROM triggers t \ JOIN kv_trigger_details d ON d.trigger_id = t.id \ WHERE t.app_id = $1 AND t.kind = 'kv' AND t.enabled = TRUE", ) .bind(app_id.into_inner()) .fetch_all(&self.pool) .await?; let op_str = op.as_str(); let mut out = Vec::new(); for r in rows { if !collection_matches(&r.collection_glob, collection) { continue; } let any_op = r.ops.is_empty(); if !any_op && !r.ops.iter().any(|o| o == op_str) { continue; } out.push(KvTriggerMatch { trigger_id: r.id.into(), script_id: r.script_id.into(), dispatch_mode: dispatch_from_str(&r.dispatch_mode), retry_max_attempts: u32::try_from(r.retry_max_attempts).unwrap_or(3), retry_backoff: BackoffShape::from_wire(&r.retry_backoff) .unwrap_or(BackoffShape::Exponential), retry_base_ms: u32::try_from(r.retry_base_ms).unwrap_or(1000), registered_by_principal: r.registered_by_principal.into(), }); } Ok(out) } async fn list_matching_dead_letter( &self, app_id: AppId, source: &str, trigger_id: Option, script_id: Option, ) -> Result, TriggerRepoError> { let rows: Vec = sqlx::query_as( "SELECT t.id, t.script_id, t.dispatch_mode, t.registered_by_principal, \ d.source_filter, d.trigger_id_filter, d.script_id_filter \ FROM triggers t \ JOIN dead_letter_trigger_details d ON d.trigger_id = t.id \ WHERE t.app_id = $1 AND t.kind = 'dead_letter' AND t.enabled = TRUE \ AND (d.source_filter IS NULL OR d.source_filter = $2) \ AND (d.trigger_id_filter IS NULL OR d.trigger_id_filter = $3) \ AND (d.script_id_filter IS NULL OR d.script_id_filter = $4)", ) .bind(app_id.into_inner()) .bind(source) .bind(trigger_id.map(TriggerId::into_inner)) .bind(script_id.map(ScriptId::into_inner)) .fetch_all(&self.pool) .await?; Ok(rows .into_iter() .map(|r| DeadLetterTriggerMatch { trigger_id: r.id.into(), script_id: r.script_id.into(), dispatch_mode: dispatch_from_str(&r.dispatch_mode), registered_by_principal: r.registered_by_principal.into(), }) .collect()) } } async fn hydrate_one(pool: &PgPool, parent: TriggerRow) -> Result { let kind = TriggerKind::from_wire(&parent.kind).ok_or_else(|| { TriggerRepoError::Invalid(format!("unknown trigger kind {}", parent.kind)) })?; let details = match kind { TriggerKind::Kv => { let row: KvDetailRow = sqlx::query_as( "SELECT collection_glob, ops FROM kv_trigger_details WHERE trigger_id = $1", ) .bind(parent.id) .fetch_one(pool) .await?; let ops = row .ops .iter() .filter_map(|s| KvEventOp::from_wire(s)) .collect(); TriggerDetails::Kv { collection_glob: row.collection_glob, ops, } } TriggerKind::DeadLetter => { let row: DlDetailRow = sqlx::query_as( "SELECT source_filter, trigger_id_filter, script_id_filter \ FROM dead_letter_trigger_details WHERE trigger_id = $1", ) .bind(parent.id) .fetch_one(pool) .await?; TriggerDetails::DeadLetter { source_filter: row.source_filter, trigger_id_filter: row.trigger_id_filter.map(Into::into), script_id_filter: row.script_id_filter.map(Into::into), } } }; Ok(Trigger { id: parent.id.into(), app_id: parent.app_id.into(), script_id: parent.script_id.into(), kind, enabled: parent.enabled, dispatch_mode: dispatch_from_str(&parent.dispatch_mode), retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3), retry_backoff: BackoffShape::from_wire(&parent.retry_backoff) .unwrap_or(BackoffShape::Exponential), retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000), registered_by_principal: parent.registered_by_principal.into(), created_at: parent.created_at, updated_at: parent.updated_at, details, }) } fn dispatch_from_str(s: &str) -> TriggerDispatchMode { match s { "sync" => TriggerDispatchMode::Sync, _ => TriggerDispatchMode::Async, } } /// Match a `collection_glob` against an actual `collection` name. /// Supported forms (in priority order): /// - `"*"` → matches every collection /// - `"foo*"` → prefix match (anything starting with "foo") /// - `"foo"` → exact match #[must_use] pub fn collection_matches(glob: &str, collection: &str) -> bool { if glob == "*" { return true; } if let Some(prefix) = glob.strip_suffix('*') { return collection.starts_with(prefix); } glob == collection } #[derive(sqlx::FromRow)] struct TriggerRow { id: Uuid, app_id: Uuid, script_id: Uuid, kind: String, enabled: bool, dispatch_mode: String, retry_max_attempts: i32, retry_backoff: String, retry_base_ms: i32, registered_by_principal: Uuid, created_at: DateTime, updated_at: DateTime, } #[derive(sqlx::FromRow)] struct KvDetailRow { collection_glob: String, ops: Vec, } #[derive(sqlx::FromRow)] #[allow(clippy::struct_field_names)] struct DlDetailRow { source_filter: Option, trigger_id_filter: Option, script_id_filter: Option, } #[derive(sqlx::FromRow)] struct KvMatchRow { id: Uuid, script_id: Uuid, dispatch_mode: String, retry_max_attempts: i32, retry_backoff: String, retry_base_ms: i32, registered_by_principal: Uuid, collection_glob: String, ops: Vec, } #[derive(sqlx::FromRow)] struct DlMatchRow { id: Uuid, script_id: Uuid, dispatch_mode: String, registered_by_principal: Uuid, #[allow(dead_code)] source_filter: Option, #[allow(dead_code)] trigger_id_filter: Option, #[allow(dead_code)] script_id_filter: Option, } #[cfg(test)] mod tests { use super::*; #[test] fn collection_matcher_handles_star_prefix_exact() { assert!(collection_matches("*", "widgets")); assert!(collection_matches("*", "")); assert!(collection_matches("users:*", "users:1")); assert!(collection_matches("users:*", "users:")); assert!(!collection_matches("users:*", "orgs:1")); assert!(collection_matches("widgets", "widgets")); assert!(!collection_matches("widgets", "Widgets")); } }