//! `OutboxRepo` — universal trigger outbox CRUD. Hot writes come from //! the `OutboxEventEmitter` (KV mutations fan out via this) and the //! sync-HTTP path. Hot reads come from the dispatcher, which claims //! due rows via `FOR UPDATE SKIP LOCKED`. use async_trait::async_trait; use chrono::{DateTime, Utc}; use picloud_shared::{ AdminUserId, AppId, ExecutionId, NewHttpOutbox, OutboxWriter, OutboxWriterError, ScriptId, TriggerId, }; use sqlx::PgPool; use uuid::Uuid; #[derive(Debug, thiserror::Error)] pub enum OutboxRepoError { #[error("database error: {0}")] Db(#[from] sqlx::Error), } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum OutboxSourceKind { Http, Kv, /// v1.1.2. Docs, DeadLetter, /// v1.1.4. Cron, /// v1.1.5. Files, /// v1.1.5. Pubsub, /// v1.1.7. Inbound email POSTed to the webhook receiver. Email, } impl OutboxSourceKind { #[must_use] pub const fn as_str(self) -> &'static str { match self { Self::Http => "http", Self::Kv => "kv", Self::Docs => "docs", Self::DeadLetter => "dead_letter", Self::Cron => "cron", Self::Files => "files", Self::Pubsub => "pubsub", Self::Email => "email", } } #[must_use] pub fn from_wire(s: &str) -> Option { match s { "http" => Some(Self::Http), "kv" => Some(Self::Kv), "docs" => Some(Self::Docs), "dead_letter" => Some(Self::DeadLetter), "cron" => Some(Self::Cron), "files" => Some(Self::Files), "pubsub" => Some(Self::Pubsub), "email" => Some(Self::Email), _ => None, } } } /// Insert payload — what each event source writes when fanning out /// to the outbox. `payload` is the serialized `TriggerEvent` (plus /// any extra context the dispatcher needs to reconstruct an /// `ExecRequest`). #[derive(Debug, Clone)] pub struct NewOutboxRow { pub app_id: AppId, pub source_kind: OutboxSourceKind, pub trigger_id: Option, pub script_id: Option, pub reply_to: Option, pub payload: serde_json::Value, pub origin_principal: Option, pub trigger_depth: u32, pub root_execution_id: Option, } /// Row as the dispatcher sees it after a claim. #[derive(Debug, Clone)] pub struct OutboxRow { pub id: Uuid, pub app_id: AppId, pub source_kind: OutboxSourceKind, pub trigger_id: Option, pub script_id: Option, pub reply_to: Option, pub payload: serde_json::Value, pub origin_principal: Option, pub trigger_depth: u32, pub root_execution_id: Option, pub attempt_count: u32, pub next_attempt_at: DateTime, pub created_at: DateTime, } #[async_trait] pub trait OutboxRepo: Send + Sync { async fn insert(&self, row: NewOutboxRow) -> Result; /// Claim up to `limit` due rows. Wraps the claim in a single /// transaction so two concurrent dispatchers (cluster mode) can't /// double-pick a row. Empty Vec when nothing is due. async fn claim_due( &self, claimed_by: &str, limit: i64, ) -> Result, OutboxRepoError>; /// Remove a row after a terminal outcome (success or dead-letter). async fn delete(&self, id: Uuid) -> Result<(), OutboxRepoError>; /// Failure path: bump attempt_count, clear the claim, set the /// next attempt time. The dispatcher computes the delay (with /// backoff + jitter) and passes it in. async fn reschedule( &self, id: Uuid, attempt_count: u32, next_attempt_at: DateTime, ) -> Result<(), OutboxRepoError>; } pub struct PostgresOutboxRepo { pool: PgPool, } impl PostgresOutboxRepo { #[must_use] pub fn new(pool: PgPool) -> Self { Self { pool } } } #[async_trait] impl OutboxRepo for PostgresOutboxRepo { async fn insert(&self, row: NewOutboxRow) -> Result { let (id,): (Uuid,) = sqlx::query_as( "INSERT INTO outbox ( \ app_id, source_kind, trigger_id, script_id, reply_to, \ payload, origin_principal, trigger_depth, root_execution_id \ ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \ RETURNING id", ) .bind(row.app_id.into_inner()) .bind(row.source_kind.as_str()) .bind(row.trigger_id.map(TriggerId::into_inner)) .bind(row.script_id.map(ScriptId::into_inner)) .bind(row.reply_to) .bind(row.payload) .bind(row.origin_principal.map(AdminUserId::into_inner)) .bind(i32::try_from(row.trigger_depth).unwrap_or(0)) .bind(row.root_execution_id.map(ExecutionId::into_inner)) .fetch_one(&self.pool) .await?; Ok(id) } async fn claim_due( &self, claimed_by: &str, limit: i64, ) -> Result, OutboxRepoError> { let rows: Vec = sqlx::query_as( "WITH due AS ( \ SELECT id FROM outbox \ WHERE claimed_at IS NULL AND next_attempt_at <= NOW() \ ORDER BY next_attempt_at \ FOR UPDATE SKIP LOCKED \ LIMIT $1 \ ) \ UPDATE outbox SET claimed_at = NOW(), claimed_by = $2 \ WHERE id IN (SELECT id FROM due) \ RETURNING id, app_id, source_kind, trigger_id, script_id, reply_to, \ payload, origin_principal, trigger_depth, \ root_execution_id, attempt_count, next_attempt_at, created_at", ) .bind(limit) .bind(claimed_by) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().filter_map(OutboxRowRaw::hydrate).collect()) } async fn delete(&self, id: Uuid) -> Result<(), OutboxRepoError> { sqlx::query("DELETE FROM outbox WHERE id = $1") .bind(id) .execute(&self.pool) .await?; Ok(()) } async fn reschedule( &self, id: Uuid, attempt_count: u32, next_attempt_at: DateTime, ) -> Result<(), OutboxRepoError> { sqlx::query( "UPDATE outbox SET attempt_count = $2, next_attempt_at = $3, \ claimed_at = NULL, claimed_by = NULL \ WHERE id = $1", ) .bind(id) .bind(i32::try_from(attempt_count).unwrap_or(0)) .bind(next_attempt_at) .execute(&self.pool) .await?; Ok(()) } } /// `OutboxWriter` implementation so orchestrator-core (which can't /// depend on manager-core) can enqueue HTTP outbox rows through the /// shared trait. #[async_trait] impl OutboxWriter for PostgresOutboxRepo { async fn enqueue_http(&self, row: NewHttpOutbox) -> Result { self.insert(NewOutboxRow { app_id: row.app_id, source_kind: OutboxSourceKind::Http, trigger_id: Some(TriggerId::from(row.route_id)), script_id: Some(row.script_id), reply_to: row.reply_to, payload: row.payload, origin_principal: row.origin_principal, trigger_depth: row.trigger_depth, root_execution_id: row.root_execution_id, }) .await .map_err(|e| OutboxWriterError::Backend(e.to_string())) } } #[derive(sqlx::FromRow)] struct OutboxRowRaw { id: Uuid, app_id: Uuid, source_kind: String, trigger_id: Option, script_id: Option, reply_to: Option, payload: serde_json::Value, origin_principal: Option, trigger_depth: i32, root_execution_id: Option, attempt_count: i32, next_attempt_at: DateTime, created_at: DateTime, } impl OutboxRowRaw { fn hydrate(self) -> Option { Some(OutboxRow { id: self.id, app_id: self.app_id.into(), source_kind: OutboxSourceKind::from_wire(&self.source_kind)?, trigger_id: self.trigger_id.map(Into::into), script_id: self.script_id.map(Into::into), reply_to: self.reply_to, payload: self.payload, origin_principal: self.origin_principal.map(Into::into), trigger_depth: u32::try_from(self.trigger_depth).unwrap_or(0), root_execution_id: self.root_execution_id.map(Into::into), attempt_count: u32::try_from(self.attempt_count).unwrap_or(0), next_attempt_at: self.next_attempt_at, created_at: self.created_at, }) } }