//! The triggers-framework dispatcher. //! //! Single tokio task that polls the outbox, claims due rows //! (`FOR UPDATE SKIP LOCKED`), and routes each to the executor. //! Shares the `ExecutionGate` with sync HTTP — they compete for the //! same permit budget, matching design notes §2. //! //! Outcome handling per design notes §3 and §4: //! - reply_to.is_some() (sync HTTP): never retry. Deliver to inbox //! (or write `abandoned_executions` if the receiver dropped). //! - is_dead_letter_handler == true: never retry, never DL. Failure //! just annotates the original DL row with `resolution = //! 'handler_failed'` and bumps a metric. //! - Otherwise on failure: if `attempt_count + 1 < max_attempts`, //! reschedule with backoff + jitter. Else, write a `dead_letters` //! row and delete from outbox. //! //! Depth-limit: `trigger_depth > max_trigger_depth` skips execution //! entirely (log + metric) and deletes the row — does NOT dead-letter //! (design notes §4: depth-exceeded means "you built a loop", and //! dead-lettering would just re-fire the same loop). use std::sync::Arc; use std::time::Duration; use chrono::Utc; use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType}; use picloud_orchestrator_core::{ExecutionGate, ExecutorClient}; use picloud_shared::{ ExecResponseSummary, ExecutionId, HttpDispatchPayload, InboxDeliveryOutcome, InboxFailureKind, InboxResolver, InboxResult, RequestId, ScriptId, ScriptSandbox, TriggerEvent, }; use rand::Rng; use uuid::Uuid; use crate::abandoned_repo::{AbandonedRepo, NewAbandonedExecution}; use crate::dead_letter_repo::{DeadLetterRepo, NewDeadLetter}; use crate::outbox_repo::{OutboxRepo, OutboxRow, OutboxSourceKind}; use crate::principal_resolver::PrincipalResolver; use crate::repo::ScriptRepository; use crate::trigger_config::{BackoffShape, TriggerConfig}; use crate::trigger_repo::{TriggerKind, TriggerRepo}; /// Bundle the dispatcher reads from. Each handle is `Arc` so /// tests can substitute in-memory backings. pub struct Dispatcher { pub outbox: Arc, pub triggers: Arc, pub scripts: Arc, pub dead_letters: Arc, pub abandoned: Arc, pub principals: Arc, pub executor: Arc, pub gate: Arc, pub inbox: Arc, pub config: TriggerConfig, /// Stable id for this dispatcher instance — written into /// `outbox.claimed_by` for forensics. In MVP this is the host's /// pid; cluster mode (v1.3+) uses node identity. pub instance_id: String, } /// How many outbox rows the dispatcher tries to claim per tick. /// Bounded to keep the working set small even if there's a flood. const CLAIM_BATCH: i64 = 8; /// Polling cadence. Short enough that fan-out feels instant; long /// enough that an idle dispatcher doesn't burn cycles. const TICK_INTERVAL: Duration = Duration::from_millis(100); /// Hard cap on the wall-clock budget passed to the executor for an /// async-dispatched script. Sync HTTP gets a per-script timeout via /// the orchestrator path; async rows don't have one, so we apply a /// platform-wide ceiling here. Matches `LocalExecutorClient`'s own /// 5-minute cap. const ASYNC_EXEC_TIMEOUT: Duration = Duration::from_secs(300); impl Dispatcher { /// Spawn the dispatcher loop as a detached `tokio::task`. The /// returned `JoinHandle` is dropped — the loop runs for the /// process lifetime. pub fn spawn(self) { tokio::spawn(async move { self.run().await; }); } async fn run(self) { let mut ticker = tokio::time::interval(TICK_INTERVAL); // Skip the immediate first fire so we don't race startup. ticker.tick().await; loop { ticker.tick().await; if let Err(err) = self.tick().await { tracing::warn!(?err, "dispatcher tick errored"); } } } async fn tick(&self) -> Result<(), DispatcherError> { // Cheap gate sample so we don't claim rows we can't dispatch. // The exact permit budget is reapplied per-row below. let rows = self .outbox .claim_due(&self.instance_id, CLAIM_BATCH) .await .map_err(|e| DispatcherError::Outbox(e.to_string()))?; if rows.is_empty() { return Ok(()); } for row in rows { // Process serially within a tick — the outer ticker is the // pacing mechanism. Concurrent dispatchers are a cluster- // mode concern; v1.1.1 MVP has one. if let Err(err) = self.dispatch_one(row).await { tracing::warn!(?err, "dispatch one errored"); } } Ok(()) } async fn dispatch_one(&self, row: OutboxRow) -> Result<(), DispatcherError> { // Depth-limit check — design notes §4: loops aren't DL'd. if row.trigger_depth > self.config.max_trigger_depth { tracing::warn!( outbox_id = %row.id, app_id = %row.app_id, trigger_depth = row.trigger_depth, "trigger depth exceeded; dropping row" ); // TODO(metrics): bump `picloud_trigger_depth_exceeded{app_id,trigger_id}`. self.outbox .delete(row.id) .await .map_err(|e| DispatcherError::Outbox(e.to_string()))?; return Ok(()); } // Gate admission — non-blocking. If the gate is saturated, // release the claim by rescheduling so another tick can pick // it up. The row stays "due" essentially immediately. let Ok(permit) = self.gate.try_acquire() else { let next = Utc::now() + chrono::Duration::milliseconds(100); self.outbox .reschedule(row.id, row.attempt_count, next) .await .map_err(|e| DispatcherError::Outbox(e.to_string()))?; return Ok(()); }; // Resolve the trigger config (KV / DL) or pull the HTTP // payload directly off the outbox row. let (resolved, exec_req) = match row.source_kind { OutboxSourceKind::Http => match self.build_http_request(&row).await { Ok(pair) => pair, Err(err) => { tracing::warn!(outbox_id = %row.id, ?err, "http exec build failed; dropping"); self.outbox .delete(row.id) .await .map_err(|e| DispatcherError::Outbox(e.to_string()))?; drop(permit); return Ok(()); } }, OutboxSourceKind::Kv | OutboxSourceKind::Docs | OutboxSourceKind::DeadLetter | OutboxSourceKind::Cron | OutboxSourceKind::Files => { let resolved = self.resolve_trigger(&row).await?; let req = match self.build_exec_request(&row, &resolved).await { Ok(req) => req, Err(err) => { tracing::warn!(outbox_id = %row.id, ?err, "exec request build failed; dropping row"); self.outbox .delete(row.id) .await .map_err(|e| DispatcherError::Outbox(e.to_string()))?; drop(permit); return Ok(()); } }; (resolved, req) } }; // The gate permit auto-releases when this scope ends or when // the executor finishes. We hand control to the executor and // wait synchronously here — sync HTTP and dispatcher share the // semaphore so this is intentional. let source = resolved.script_source.clone(); let identity = picloud_orchestrator_core::ScriptIdentity { script_id: resolved.script_id, updated_at: resolved.script_updated_at, }; let outcome = self .executor .execute_with_identity(identity, &source, exec_req, ASYNC_EXEC_TIMEOUT) .await; drop(permit); match outcome { Ok(resp) => self.handle_success(&row, &resolved, resp).await, Err(err) => self.handle_failure(&row, &resolved, err).await, } } async fn resolve_trigger(&self, row: &OutboxRow) -> Result { // For KV and DL kinds, the outbox carries `trigger_id`. Use it // to look up the trigger row, then resolve the script. let Some(trigger_id) = row.trigger_id else { return Err(DispatcherError::ResolveTrigger( "outbox row missing trigger_id".into(), )); }; let trigger = self .triggers .get(trigger_id) .await .map_err(|e| DispatcherError::ResolveTrigger(e.to_string()))? .ok_or_else(|| { DispatcherError::ResolveTrigger(format!("trigger {trigger_id} not found")) })?; let script = self .scripts .get(trigger.script_id) .await .map_err(|e| DispatcherError::ResolveTrigger(e.to_string()))? .ok_or_else(|| { DispatcherError::ResolveTrigger(format!("script {} not found", trigger.script_id)) })?; Ok(ResolvedTrigger { trigger_kind: trigger.kind, is_dead_letter_handler: matches!(trigger.kind, TriggerKind::DeadLetter), script_id: script.id, script_source: script.source, script_name: script.name, script_updated_at: script.updated_at, sandbox_overrides: script.sandbox, registered_by_principal: trigger.registered_by_principal, retry_max_attempts: trigger.retry_max_attempts, retry_backoff: trigger.retry_backoff, retry_base_ms: trigger.retry_base_ms, }) } async fn build_exec_request( &self, row: &OutboxRow, resolved: &ResolvedTrigger, ) -> Result { let trigger_event: TriggerEvent = serde_json::from_value(row.payload.clone()) .map_err(|e| DispatcherError::ResolveTrigger(format!("decode payload: {e}")))?; let principal = self .principals .resolve(resolved.registered_by_principal) .await .map_err(|e| DispatcherError::ResolveTrigger(e.to_string()))?; let execution_id = ExecutionId::new(); Ok(ExecRequest { execution_id, request_id: RequestId::new(), script_id: resolved.script_id, script_name: resolved.script_name.clone(), invocation_type: InvocationType::Function, path: format!("/trigger/{}", trigger_event.source()), headers: std::collections::BTreeMap::new(), body: serde_json::Value::Null, params: std::collections::BTreeMap::new(), query: std::collections::BTreeMap::new(), rest: String::new(), sandbox_overrides: resolved.sandbox_overrides, app_id: row.app_id, principal: Some(principal), trigger_depth: row.trigger_depth, root_execution_id: row.root_execution_id.unwrap_or(execution_id), is_dead_letter_handler: resolved.is_dead_letter_handler, event: Some(trigger_event), }) } /// Build an `(ResolvedTrigger, ExecRequest)` for an HTTP outbox /// row. HTTP rows don't have a backing `triggers` row (the /// `trigger_id` references `routes.id` instead). We pull the /// script id off the outbox row, the request shape off the /// payload, and synthesize a `ResolvedTrigger` with retry /// settings irrelevant for HTTP (sync HTTP is never retried; /// async HTTP uses default policy from `TriggerConfig`). async fn build_http_request( &self, row: &OutboxRow, ) -> Result<(ResolvedTrigger, ExecRequest), DispatcherError> { let Some(script_id) = row.script_id else { return Err(DispatcherError::ResolveTrigger( "HTTP outbox row missing script_id".into(), )); }; let script = self .scripts .get(script_id) .await .map_err(|e| DispatcherError::ResolveTrigger(e.to_string()))? .ok_or_else(|| { DispatcherError::ResolveTrigger(format!("script {script_id} not found")) })?; let payload: HttpDispatchPayload = serde_json::from_value(row.payload.clone()) .map_err(|e| DispatcherError::ResolveTrigger(format!("decode http payload: {e}")))?; let execution_id = ExecutionId::new(); let req = ExecRequest { execution_id, request_id: RequestId::new(), script_id, script_name: payload.script_name.clone(), invocation_type: InvocationType::Http, path: payload.path.clone(), headers: payload.headers, body: payload.body, params: payload.params, query: payload.query, rest: payload.rest, sandbox_overrides: script.sandbox, app_id: row.app_id, // HTTP outbox rows don't run as the trigger registrant — // they run with no principal (public ingress) or the // attached one (origin_principal forensic field is not // promoted to execution principal in this MVP). principal: None, trigger_depth: row.trigger_depth, root_execution_id: row.root_execution_id.unwrap_or(execution_id), is_dead_letter_handler: false, event: None, }; let resolved = ResolvedTrigger { trigger_kind: TriggerKind::Kv, // placeholder; HTTP doesn't have a kind is_dead_letter_handler: false, script_id, script_source: script.source, script_name: payload.script_name, script_updated_at: script.updated_at, sandbox_overrides: script.sandbox, // HTTP outbox rows don't carry a registered_by_principal // — use a sentinel zero UUID since this field isn't used // downstream for HTTP (no retries, no inbox principal). registered_by_principal: picloud_shared::AdminUserId::from(uuid::Uuid::nil()), // Async HTTP uses the platform default retry policy from // TriggerConfig. Sync HTTP (reply_to.is_some) never retries // regardless. retry_max_attempts: self.config.retry_max_attempts, retry_backoff: self.config.retry_backoff, retry_base_ms: self.config.retry_base_ms, }; Ok((resolved, req)) } async fn handle_success( &self, row: &OutboxRow, _resolved: &ResolvedTrigger, resp: ExecResponse, ) -> Result<(), DispatcherError> { if let Some(inbox_id) = row.reply_to { self.deliver_inbox(row, inbox_id, InboxResult::Success(summarize(&resp))) .await; } self.outbox .delete(row.id) .await .map_err(|e| DispatcherError::Outbox(e.to_string()))?; Ok(()) } async fn handle_failure( &self, row: &OutboxRow, resolved: &ResolvedTrigger, err: ExecError, ) -> Result<(), DispatcherError> { // Sync HTTP: always single-attempt. Always deliver outcome // (success-or-failure) to the inbox. Never retry, never DL. if let Some(inbox_id) = row.reply_to { let (kind, message) = classify_exec_error(&err); self.deliver_inbox( row, inbox_id, InboxResult::Failure { kind, message: message.clone(), }, ) .await; self.outbox .delete(row.id) .await .map_err(|e| DispatcherError::Outbox(e.to_string()))?; return Ok(()); } // Dead-letter handler: never retry, never DL. Failure // annotates the original DL row + bumps a metric. if resolved.is_dead_letter_handler { tracing::error!( outbox_id = %row.id, app_id = %row.app_id, ?err, "dead-letter handler failed; not retrying" ); // TODO(metrics): bump `picloud_dead_letter_handler_failures{app_id}`. // Annotate the original DL row (id is `row.payload.dead_letter.id` // when the payload is a DeadLetter TriggerEvent). Best-effort: // if the payload doesn't decode, just log and move on. if let Ok(TriggerEvent::DeadLetter { dead_letter_id, .. }) = serde_json::from_value::(row.payload.clone()) { if let Err(e) = self .dead_letters .resolve(dead_letter_id, "handler_failed") .await { tracing::warn!(?e, "could not annotate DL row as handler_failed"); } } self.outbox .delete(row.id) .await .map_err(|e| DispatcherError::Outbox(e.to_string()))?; return Ok(()); } // Async event: retry per policy, then dead-letter. let attempt = row.attempt_count + 1; if attempt < resolved.retry_max_attempts { let delay = compute_backoff( attempt, resolved.retry_backoff, resolved.retry_base_ms, self.config.retry_jitter_pct, ); let next = Utc::now() + chrono::Duration::milliseconds(i64::from(delay)); tracing::info!( outbox_id = %row.id, attempt, max_attempts = resolved.retry_max_attempts, retry_in_ms = delay, "rescheduling outbox row" ); self.outbox .reschedule(row.id, attempt, next) .await .map_err(|e| DispatcherError::Outbox(e.to_string()))?; return Ok(()); } // Exhausted retries → dead-letter. let (op, source) = describe_event(&row.payload); let now = Utc::now(); if let Err(e) = self .dead_letters .insert(NewDeadLetter { app_id: row.app_id, original_event_id: row.id, source, op, trigger_id: row.trigger_id, script_id: Some(resolved.script_id), payload: row.payload.clone(), attempt_count: attempt, first_attempt_at: row.created_at, last_attempt_at: now, last_error: err.to_string(), }) .await { tracing::error!(?e, "failed to write dead-letter row"); } self.outbox .delete(row.id) .await .map_err(|e| DispatcherError::Outbox(e.to_string()))?; Ok(()) } async fn deliver_inbox(&self, row: &OutboxRow, inbox_id: Uuid, result: InboxResult) { match self.inbox.deliver(inbox_id, result.clone()).await { InboxDeliveryOutcome::Delivered => {} InboxDeliveryOutcome::Abandoned => { // Receiver was dropped — record forensic row + bump // metric. let (status_code, summary) = match &result { InboxResult::Success(s) => (s.status_code, None), InboxResult::Failure { kind, message } => { (failure_kind_to_status(*kind), Some(message.clone())) } }; if let Err(e) = self .abandoned .insert(NewAbandonedExecution { app_id: row.app_id, outbox_id: row.id, script_id: row.script_id, inbox_id, status_code, result_summary: summary, }) .await { tracing::warn!(?e, "abandoned_executions insert failed"); } // TODO(metrics): bump `picloud_abandoned_executions_total{app_id}`. } } } } #[derive(Debug)] pub struct ResolvedTrigger { pub trigger_kind: TriggerKind, pub is_dead_letter_handler: bool, pub script_id: ScriptId, pub script_source: String, pub script_name: String, /// v1.1.3: freshness comparator for the orchestrator's top-level /// script cache. The dispatcher hands `(script_id, updated_at)` /// in alongside the source so cached ASTs can be reused across /// triggered invocations. pub script_updated_at: chrono::DateTime, pub sandbox_overrides: ScriptSandbox, pub registered_by_principal: picloud_shared::AdminUserId, pub retry_max_attempts: u32, pub retry_backoff: BackoffShape, pub retry_base_ms: u32, } #[derive(Debug, thiserror::Error)] pub enum DispatcherError { #[error("outbox: {0}")] Outbox(String), #[error("resolve trigger: {0}")] ResolveTrigger(String), } fn summarize(resp: &ExecResponse) -> ExecResponseSummary { ExecResponseSummary { status_code: resp.status_code, headers: resp.headers.clone(), body: resp.body.clone(), } } /// Map `ExecError` onto the design-notes §3 status-code table. fn classify_exec_error(err: &ExecError) -> (InboxFailureKind, String) { match err { ExecError::Parse(s) | ExecError::InvalidResponse(s) => { (InboxFailureKind::Validation, s.clone()) } ExecError::Timeout(_) => (InboxFailureKind::Timeout, err.to_string()), ExecError::OperationBudgetExceeded => (InboxFailureKind::OperationBudget, err.to_string()), ExecError::Overloaded { .. } => (InboxFailureKind::Overloaded, err.to_string()), ExecError::Runtime(s) => (InboxFailureKind::Runtime, s.clone()), } } fn failure_kind_to_status(k: InboxFailureKind) -> u16 { match k { InboxFailureKind::Validation => 422, InboxFailureKind::Runtime => 502, InboxFailureKind::Overloaded => 503, InboxFailureKind::Timeout => 504, InboxFailureKind::OperationBudget => 507, InboxFailureKind::Platform => 500, } } /// `(op, source)` extracted from the outbox payload. Used to seed the /// `dead_letters` row when retries exhaust. fn describe_event(payload: &serde_json::Value) -> (String, String) { let source = payload .get("source") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); let op = payload .get("op") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); (op, source) } /// Compute backoff (ms) for the given attempt + policy + jitter. /// Attempt is 1-indexed (first retry = attempt 1). #[must_use] pub fn compute_backoff(attempt: u32, backoff: BackoffShape, base_ms: u32, jitter_pct: u32) -> u32 { let base_ms = u64::from(base_ms); let attempt = u64::from(attempt.saturating_sub(1)); let raw = match backoff { BackoffShape::Constant => base_ms, BackoffShape::Linear => base_ms * (attempt + 1), // 1x base, 2x base, 4x base, … (saturating). BackoffShape::Exponential => base_ms.saturating_mul(1u64 << attempt.min(20)), }; let raw = u32::try_from(raw.min(u64::from(u32::MAX))).unwrap_or(u32::MAX); apply_jitter(raw, jitter_pct) } fn apply_jitter(raw: u32, pct: u32) -> u32 { if pct == 0 { return raw; } let pct = pct.min(100); // ±span% — bounded by raw itself so we can't underflow when // raw + offset goes below zero. let span = u64::from(raw) * u64::from(pct) / 100; if span == 0 { return raw; } let span_i64 = i64::try_from(span).unwrap_or(i64::MAX); let mut rng = rand::thread_rng(); let offset = rng.gen_range(-span_i64..=span_i64); let signed = i64::from(raw).saturating_add(offset).max(0); u32::try_from(signed.min(i64::from(u32::MAX))).unwrap_or(u32::MAX) } #[cfg(test)] mod tests { use super::*; #[test] fn exponential_backoff_doubles_per_attempt() { // No jitter (pct=0) for a deterministic check. assert_eq!(compute_backoff(1, BackoffShape::Exponential, 1000, 0), 1000); assert_eq!(compute_backoff(2, BackoffShape::Exponential, 1000, 0), 2000); assert_eq!(compute_backoff(3, BackoffShape::Exponential, 1000, 0), 4000); assert_eq!(compute_backoff(4, BackoffShape::Exponential, 1000, 0), 8000); } #[test] fn linear_backoff_scales_with_attempt() { assert_eq!(compute_backoff(1, BackoffShape::Linear, 100, 0), 100); assert_eq!(compute_backoff(2, BackoffShape::Linear, 100, 0), 200); assert_eq!(compute_backoff(5, BackoffShape::Linear, 100, 0), 500); } #[test] fn constant_backoff_returns_base() { for attempt in 1..=5 { assert_eq!( compute_backoff(attempt, BackoffShape::Constant, 750, 0), 750 ); } } #[test] fn jitter_within_pct_of_base() { for _ in 0..100 { let v = compute_backoff(1, BackoffShape::Constant, 1000, 20); // ±20% of 1000 = 800..=1200. assert!((800..=1200).contains(&v), "jitter out of range: {v}"); } } #[test] fn classify_exec_error_covers_every_variant() { let parse = classify_exec_error(&ExecError::Parse("nope".into())); assert!(matches!(parse.0, InboxFailureKind::Validation)); let invalid = classify_exec_error(&ExecError::InvalidResponse("bad".into())); assert!(matches!(invalid.0, InboxFailureKind::Validation)); let timeout = classify_exec_error(&ExecError::Timeout(30)); assert!(matches!(timeout.0, InboxFailureKind::Timeout)); let budget = classify_exec_error(&ExecError::OperationBudgetExceeded); assert!(matches!(budget.0, InboxFailureKind::OperationBudget)); let runtime = classify_exec_error(&ExecError::Runtime("threw".into())); assert!(matches!(runtime.0, InboxFailureKind::Runtime)); let overload = classify_exec_error(&ExecError::Overloaded { retry_after_secs: 1, }); assert!(matches!(overload.0, InboxFailureKind::Overloaded)); } #[test] fn failure_kind_status_codes_match_design_notes() { assert_eq!(failure_kind_to_status(InboxFailureKind::Validation), 422); assert_eq!(failure_kind_to_status(InboxFailureKind::Runtime), 502); assert_eq!(failure_kind_to_status(InboxFailureKind::Overloaded), 503); assert_eq!(failure_kind_to_status(InboxFailureKind::Timeout), 504); assert_eq!( failure_kind_to_status(InboxFailureKind::OperationBudget), 507 ); assert_eq!(failure_kind_to_status(InboxFailureKind::Platform), 500); } }