diff --git a/Cargo.lock b/Cargo.lock index 890ff57..7da1d69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1605,6 +1605,7 @@ dependencies = [ "base64", "chrono", "data-encoding", + "picloud-executor-core", "picloud-orchestrator-core", "picloud-shared", "rand 0.8.6", diff --git a/crates/manager-core/Cargo.toml b/crates/manager-core/Cargo.toml index a1feeb1..90e62fd 100644 --- a/crates/manager-core/Cargo.toml +++ b/crates/manager-core/Cargo.toml @@ -10,13 +10,16 @@ workspace = true [dependencies] picloud-shared.workspace = true +picloud-executor-core.workspace = true picloud-orchestrator-core.workspace = true async-trait.workspace = true axum.workspace = true +rand.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true +tokio.workspace = true tracing.workspace = true uuid.workspace = true chrono.workspace = true @@ -24,7 +27,6 @@ sqlx.workspace = true url.workspace = true argon2.workspace = true -rand.workspace = true sha2.workspace = true base64.workspace = true data-encoding.workspace = true diff --git a/crates/manager-core/src/dispatcher.rs b/crates/manager-core/src/dispatcher.rs new file mode 100644 index 0000000..f41725a --- /dev/null +++ b/crates/manager-core/src/dispatcher.rs @@ -0,0 +1,610 @@ +//! The triggers-framework dispatcher. +//! +//! Single tokio task that polls the outbox, claims due rows +//! (`FOR UPDATE SKIP LOCKED`), and routes each to the executor. +//! Shares the `ExecutionGate` with sync HTTP — they compete for the +//! same permit budget, matching design notes §2. +//! +//! Outcome handling per design notes §3 and §4: +//! - reply_to.is_some() (sync HTTP): never retry. Deliver to inbox +//! (or write `abandoned_executions` if the receiver dropped). +//! - is_dead_letter_handler == true: never retry, never DL. Failure +//! just annotates the original DL row with `resolution = +//! 'handler_failed'` and bumps a metric. +//! - Otherwise on failure: if `attempt_count + 1 < max_attempts`, +//! reschedule with backoff + jitter. Else, write a `dead_letters` +//! row and delete from outbox. +//! +//! Depth-limit: `trigger_depth > max_trigger_depth` skips execution +//! entirely (log + metric) and deletes the row — does NOT dead-letter +//! (design notes §4: depth-exceeded means "you built a loop", and +//! dead-lettering would just re-fire the same loop). + +use std::sync::Arc; +use std::time::Duration; + +use chrono::Utc; +use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType}; +use picloud_orchestrator_core::{ExecutionGate, ExecutorClient}; +use picloud_shared::{ + ExecResponseSummary, ExecutionId, InboxDeliveryOutcome, InboxFailureKind, InboxResolver, + InboxResult, RequestId, ScriptId, ScriptSandbox, TriggerEvent, +}; +use rand::Rng; +use uuid::Uuid; + +use crate::abandoned_repo::{AbandonedRepo, NewAbandonedExecution}; +use crate::dead_letter_repo::{DeadLetterRepo, NewDeadLetter}; +use crate::outbox_repo::{OutboxRepo, OutboxRow, OutboxSourceKind}; +use crate::principal_resolver::PrincipalResolver; +use crate::repo::ScriptRepository; +use crate::trigger_config::{BackoffShape, TriggerConfig}; +use crate::trigger_repo::{TriggerKind, TriggerRepo}; + +/// Bundle the dispatcher reads from. Each handle is `Arc` so +/// tests can substitute in-memory backings. +pub struct Dispatcher { + pub outbox: Arc, + pub triggers: Arc, + pub scripts: Arc, + pub dead_letters: Arc, + pub abandoned: Arc, + pub principals: Arc, + pub executor: Arc, + pub gate: Arc, + pub inbox: Arc, + pub config: TriggerConfig, + /// Stable id for this dispatcher instance — written into + /// `outbox.claimed_by` for forensics. In MVP this is the host's + /// pid; cluster mode (v1.3+) uses node identity. + pub instance_id: String, +} + +/// How many outbox rows the dispatcher tries to claim per tick. +/// Bounded to keep the working set small even if there's a flood. +const CLAIM_BATCH: i64 = 8; + +/// Polling cadence. Short enough that fan-out feels instant; long +/// enough that an idle dispatcher doesn't burn cycles. +const TICK_INTERVAL: Duration = Duration::from_millis(100); + +/// Hard cap on the wall-clock budget passed to the executor for an +/// async-dispatched script. Sync HTTP gets a per-script timeout via +/// the orchestrator path; async rows don't have one, so we apply a +/// platform-wide ceiling here. Matches `LocalExecutorClient`'s own +/// 5-minute cap. +const ASYNC_EXEC_TIMEOUT: Duration = Duration::from_secs(300); + +impl Dispatcher { + /// Spawn the dispatcher loop as a detached `tokio::task`. The + /// returned `JoinHandle` is dropped — the loop runs for the + /// process lifetime. + pub fn spawn(self) { + tokio::spawn(async move { + self.run().await; + }); + } + + async fn run(self) { + let mut ticker = tokio::time::interval(TICK_INTERVAL); + // Skip the immediate first fire so we don't race startup. + ticker.tick().await; + loop { + ticker.tick().await; + if let Err(err) = self.tick().await { + tracing::warn!(?err, "dispatcher tick errored"); + } + } + } + + async fn tick(&self) -> Result<(), DispatcherError> { + // Cheap gate sample so we don't claim rows we can't dispatch. + // The exact permit budget is reapplied per-row below. + let rows = self + .outbox + .claim_due(&self.instance_id, CLAIM_BATCH) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + if rows.is_empty() { + return Ok(()); + } + for row in rows { + // Process serially within a tick — the outer ticker is the + // pacing mechanism. Concurrent dispatchers are a cluster- + // mode concern; v1.1.1 MVP has one. + if let Err(err) = self.dispatch_one(row).await { + tracing::warn!(?err, "dispatch one errored"); + } + } + Ok(()) + } + + async fn dispatch_one(&self, row: OutboxRow) -> Result<(), DispatcherError> { + // Depth-limit check — design notes §4: loops aren't DL'd. + if row.trigger_depth > self.config.max_trigger_depth { + tracing::warn!( + outbox_id = %row.id, + app_id = %row.app_id, + trigger_depth = row.trigger_depth, + "trigger depth exceeded; dropping row" + ); + // TODO(metrics): bump `picloud_trigger_depth_exceeded{app_id,trigger_id}`. + self.outbox + .delete(row.id) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + return Ok(()); + } + + // Gate admission — non-blocking. If the gate is saturated, + // release the claim by rescheduling so another tick can pick + // it up. The row stays "due" essentially immediately. + let Ok(permit) = self.gate.try_acquire() else { + let next = Utc::now() + chrono::Duration::milliseconds(100); + self.outbox + .reschedule(row.id, row.attempt_count, next) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + return Ok(()); + }; + + // Resolve the trigger config (KV or DL) and the script. + let resolved = match row.source_kind { + OutboxSourceKind::Http => { + // Sync HTTP path lands here when commit 6 wires up + // the orchestrator -> outbox bridge. For now, this + // arm is a forward-compat stub — drop the row to + // avoid a permanent stuck state. + tracing::debug!(outbox_id = %row.id, "HTTP outbox row encountered; commit 6 wires this in"); + self.outbox + .delete(row.id) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + drop(permit); + return Ok(()); + } + OutboxSourceKind::Kv | OutboxSourceKind::DeadLetter => { + self.resolve_trigger(&row).await? + } + }; + + let exec_req = match self.build_exec_request(&row, &resolved).await { + Ok(req) => req, + Err(err) => { + tracing::warn!(outbox_id = %row.id, ?err, "exec request build failed; dropping row"); + self.outbox + .delete(row.id) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + drop(permit); + return Ok(()); + } + }; + + // The gate permit auto-releases when this scope ends or when + // the executor finishes. We hand control to the executor and + // wait synchronously here — sync HTTP and dispatcher share the + // semaphore so this is intentional. + let source = resolved.script_source.clone(); + let outcome = self + .executor + .execute(&source, exec_req, ASYNC_EXEC_TIMEOUT) + .await; + drop(permit); + + match outcome { + Ok(resp) => self.handle_success(&row, &resolved, resp).await, + Err(err) => self.handle_failure(&row, &resolved, err).await, + } + } + + async fn resolve_trigger(&self, row: &OutboxRow) -> Result { + // For KV and DL kinds, the outbox carries `trigger_id`. Use it + // to look up the trigger row, then resolve the script. + let Some(trigger_id) = row.trigger_id else { + return Err(DispatcherError::ResolveTrigger( + "outbox row missing trigger_id".into(), + )); + }; + let trigger = self + .triggers + .get(trigger_id) + .await + .map_err(|e| DispatcherError::ResolveTrigger(e.to_string()))? + .ok_or_else(|| { + DispatcherError::ResolveTrigger(format!("trigger {trigger_id} not found")) + })?; + + let script = self + .scripts + .get(trigger.script_id) + .await + .map_err(|e| DispatcherError::ResolveTrigger(e.to_string()))? + .ok_or_else(|| { + DispatcherError::ResolveTrigger(format!("script {} not found", trigger.script_id)) + })?; + + Ok(ResolvedTrigger { + trigger_kind: trigger.kind, + is_dead_letter_handler: matches!(trigger.kind, TriggerKind::DeadLetter), + script_id: script.id, + script_source: script.source, + script_name: script.name, + sandbox_overrides: script.sandbox, + registered_by_principal: trigger.registered_by_principal, + retry_max_attempts: trigger.retry_max_attempts, + retry_backoff: trigger.retry_backoff, + retry_base_ms: trigger.retry_base_ms, + }) + } + + async fn build_exec_request( + &self, + row: &OutboxRow, + resolved: &ResolvedTrigger, + ) -> Result { + let trigger_event: TriggerEvent = serde_json::from_value(row.payload.clone()) + .map_err(|e| DispatcherError::ResolveTrigger(format!("decode payload: {e}")))?; + + let principal = self + .principals + .resolve(resolved.registered_by_principal) + .await + .map_err(|e| DispatcherError::ResolveTrigger(e.to_string()))?; + + let execution_id = ExecutionId::new(); + Ok(ExecRequest { + execution_id, + request_id: RequestId::new(), + script_id: resolved.script_id, + script_name: resolved.script_name.clone(), + invocation_type: InvocationType::Function, + path: format!("/trigger/{}", trigger_event.source()), + headers: std::collections::BTreeMap::new(), + body: serde_json::Value::Null, + params: std::collections::BTreeMap::new(), + query: std::collections::BTreeMap::new(), + rest: String::new(), + sandbox_overrides: resolved.sandbox_overrides, + app_id: row.app_id, + principal: Some(principal), + trigger_depth: row.trigger_depth, + root_execution_id: row.root_execution_id.unwrap_or(execution_id), + is_dead_letter_handler: resolved.is_dead_letter_handler, + event: Some(trigger_event), + }) + } + + async fn handle_success( + &self, + row: &OutboxRow, + _resolved: &ResolvedTrigger, + resp: ExecResponse, + ) -> Result<(), DispatcherError> { + if let Some(inbox_id) = row.reply_to { + self.deliver_inbox(row, inbox_id, InboxResult::Success(summarize(&resp))) + .await; + } + self.outbox + .delete(row.id) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + Ok(()) + } + + async fn handle_failure( + &self, + row: &OutboxRow, + resolved: &ResolvedTrigger, + err: ExecError, + ) -> Result<(), DispatcherError> { + // Sync HTTP: always single-attempt. Always deliver outcome + // (success-or-failure) to the inbox. Never retry, never DL. + if let Some(inbox_id) = row.reply_to { + let (kind, message) = classify_exec_error(&err); + self.deliver_inbox( + row, + inbox_id, + InboxResult::Failure { + kind, + message: message.clone(), + }, + ) + .await; + self.outbox + .delete(row.id) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + return Ok(()); + } + + // Dead-letter handler: never retry, never DL. Failure + // annotates the original DL row + bumps a metric. + if resolved.is_dead_letter_handler { + tracing::error!( + outbox_id = %row.id, + app_id = %row.app_id, + ?err, + "dead-letter handler failed; not retrying" + ); + // TODO(metrics): bump `picloud_dead_letter_handler_failures{app_id}`. + // Annotate the original DL row (id is `row.payload.dead_letter.id` + // when the payload is a DeadLetter TriggerEvent). Best-effort: + // if the payload doesn't decode, just log and move on. + if let Ok(TriggerEvent::DeadLetter { dead_letter_id, .. }) = + serde_json::from_value::(row.payload.clone()) + { + if let Err(e) = self + .dead_letters + .resolve(dead_letter_id, "handler_failed") + .await + { + tracing::warn!(?e, "could not annotate DL row as handler_failed"); + } + } + self.outbox + .delete(row.id) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + return Ok(()); + } + + // Async event: retry per policy, then dead-letter. + let attempt = row.attempt_count + 1; + if attempt < resolved.retry_max_attempts { + let delay = compute_backoff( + attempt, + resolved.retry_backoff, + resolved.retry_base_ms, + self.config.retry_jitter_pct, + ); + let next = Utc::now() + chrono::Duration::milliseconds(i64::from(delay)); + tracing::info!( + outbox_id = %row.id, + attempt, + max_attempts = resolved.retry_max_attempts, + retry_in_ms = delay, + "rescheduling outbox row" + ); + self.outbox + .reschedule(row.id, attempt, next) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + return Ok(()); + } + + // Exhausted retries → dead-letter. + let (op, source) = describe_event(&row.payload); + let now = Utc::now(); + if let Err(e) = self + .dead_letters + .insert(NewDeadLetter { + app_id: row.app_id, + original_event_id: row.id, + source, + op, + trigger_id: row.trigger_id, + script_id: Some(resolved.script_id), + payload: row.payload.clone(), + attempt_count: attempt, + first_attempt_at: row.created_at, + last_attempt_at: now, + last_error: err.to_string(), + }) + .await + { + tracing::error!(?e, "failed to write dead-letter row"); + } + self.outbox + .delete(row.id) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + Ok(()) + } + + async fn deliver_inbox(&self, row: &OutboxRow, inbox_id: Uuid, result: InboxResult) { + match self.inbox.deliver(inbox_id, result.clone()).await { + InboxDeliveryOutcome::Delivered => {} + InboxDeliveryOutcome::Abandoned => { + // Receiver was dropped — record forensic row + bump + // metric. + let (status_code, summary) = match &result { + InboxResult::Success(s) => (s.status_code, None), + InboxResult::Failure { kind, message } => { + (failure_kind_to_status(*kind), Some(message.clone())) + } + }; + if let Err(e) = self + .abandoned + .insert(NewAbandonedExecution { + app_id: row.app_id, + outbox_id: row.id, + script_id: row.script_id, + inbox_id, + status_code, + result_summary: summary, + }) + .await + { + tracing::warn!(?e, "abandoned_executions insert failed"); + } + // TODO(metrics): bump `picloud_abandoned_executions_total{app_id}`. + } + } + } +} + +#[derive(Debug)] +pub struct ResolvedTrigger { + pub trigger_kind: TriggerKind, + pub is_dead_letter_handler: bool, + pub script_id: ScriptId, + pub script_source: String, + pub script_name: String, + pub sandbox_overrides: ScriptSandbox, + pub registered_by_principal: picloud_shared::AdminUserId, + pub retry_max_attempts: u32, + pub retry_backoff: BackoffShape, + pub retry_base_ms: u32, +} + +#[derive(Debug, thiserror::Error)] +pub enum DispatcherError { + #[error("outbox: {0}")] + Outbox(String), + #[error("resolve trigger: {0}")] + ResolveTrigger(String), +} + +fn summarize(resp: &ExecResponse) -> ExecResponseSummary { + ExecResponseSummary { + status_code: resp.status_code, + headers: resp.headers.clone(), + body: resp.body.clone(), + } +} + +/// Map `ExecError` onto the design-notes §3 status-code table. +fn classify_exec_error(err: &ExecError) -> (InboxFailureKind, String) { + match err { + ExecError::Parse(s) | ExecError::InvalidResponse(s) => { + (InboxFailureKind::Validation, s.clone()) + } + ExecError::Timeout(_) => (InboxFailureKind::Timeout, err.to_string()), + ExecError::OperationBudgetExceeded => (InboxFailureKind::OperationBudget, err.to_string()), + ExecError::Overloaded { .. } => (InboxFailureKind::Overloaded, err.to_string()), + ExecError::Runtime(s) => (InboxFailureKind::Runtime, s.clone()), + } +} + +fn failure_kind_to_status(k: InboxFailureKind) -> u16 { + match k { + InboxFailureKind::Validation => 422, + InboxFailureKind::Runtime => 502, + InboxFailureKind::Overloaded => 503, + InboxFailureKind::Timeout => 504, + InboxFailureKind::OperationBudget => 507, + InboxFailureKind::Platform => 500, + } +} + +/// `(op, source)` extracted from the outbox payload. Used to seed the +/// `dead_letters` row when retries exhaust. +fn describe_event(payload: &serde_json::Value) -> (String, String) { + let source = payload + .get("source") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let op = payload + .get("op") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + (op, source) +} + +/// Compute backoff (ms) for the given attempt + policy + jitter. +/// Attempt is 1-indexed (first retry = attempt 1). +#[must_use] +pub fn compute_backoff(attempt: u32, backoff: BackoffShape, base_ms: u32, jitter_pct: u32) -> u32 { + let base_ms = u64::from(base_ms); + let attempt = u64::from(attempt.saturating_sub(1)); + let raw = match backoff { + BackoffShape::Constant => base_ms, + BackoffShape::Linear => base_ms * (attempt + 1), + // 1x base, 2x base, 4x base, … (saturating). + BackoffShape::Exponential => base_ms.saturating_mul(1u64 << attempt.min(20)), + }; + let raw = u32::try_from(raw.min(u64::from(u32::MAX))).unwrap_or(u32::MAX); + apply_jitter(raw, jitter_pct) +} + +fn apply_jitter(raw: u32, pct: u32) -> u32 { + if pct == 0 { + return raw; + } + let pct = pct.min(100); + // ±span% — bounded by raw itself so we can't underflow when + // raw + offset goes below zero. + let span = u64::from(raw) * u64::from(pct) / 100; + if span == 0 { + return raw; + } + let span_i64 = i64::try_from(span).unwrap_or(i64::MAX); + let mut rng = rand::thread_rng(); + let offset = rng.gen_range(-span_i64..=span_i64); + let signed = i64::from(raw).saturating_add(offset).max(0); + u32::try_from(signed.min(i64::from(u32::MAX))).unwrap_or(u32::MAX) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn exponential_backoff_doubles_per_attempt() { + // No jitter (pct=0) for a deterministic check. + assert_eq!(compute_backoff(1, BackoffShape::Exponential, 1000, 0), 1000); + assert_eq!(compute_backoff(2, BackoffShape::Exponential, 1000, 0), 2000); + assert_eq!(compute_backoff(3, BackoffShape::Exponential, 1000, 0), 4000); + assert_eq!(compute_backoff(4, BackoffShape::Exponential, 1000, 0), 8000); + } + + #[test] + fn linear_backoff_scales_with_attempt() { + assert_eq!(compute_backoff(1, BackoffShape::Linear, 100, 0), 100); + assert_eq!(compute_backoff(2, BackoffShape::Linear, 100, 0), 200); + assert_eq!(compute_backoff(5, BackoffShape::Linear, 100, 0), 500); + } + + #[test] + fn constant_backoff_returns_base() { + for attempt in 1..=5 { + assert_eq!( + compute_backoff(attempt, BackoffShape::Constant, 750, 0), + 750 + ); + } + } + + #[test] + fn jitter_within_pct_of_base() { + for _ in 0..100 { + let v = compute_backoff(1, BackoffShape::Constant, 1000, 20); + // ±20% of 1000 = 800..=1200. + assert!((800..=1200).contains(&v), "jitter out of range: {v}"); + } + } + + #[test] + fn classify_exec_error_covers_every_variant() { + let parse = classify_exec_error(&ExecError::Parse("nope".into())); + assert!(matches!(parse.0, InboxFailureKind::Validation)); + let invalid = classify_exec_error(&ExecError::InvalidResponse("bad".into())); + assert!(matches!(invalid.0, InboxFailureKind::Validation)); + let timeout = classify_exec_error(&ExecError::Timeout(30)); + assert!(matches!(timeout.0, InboxFailureKind::Timeout)); + let budget = classify_exec_error(&ExecError::OperationBudgetExceeded); + assert!(matches!(budget.0, InboxFailureKind::OperationBudget)); + let runtime = classify_exec_error(&ExecError::Runtime("threw".into())); + assert!(matches!(runtime.0, InboxFailureKind::Runtime)); + let overload = classify_exec_error(&ExecError::Overloaded { + retry_after_secs: 1, + }); + assert!(matches!(overload.0, InboxFailureKind::Overloaded)); + } + + #[test] + fn failure_kind_status_codes_match_design_notes() { + assert_eq!(failure_kind_to_status(InboxFailureKind::Validation), 422); + assert_eq!(failure_kind_to_status(InboxFailureKind::Runtime), 502); + assert_eq!(failure_kind_to_status(InboxFailureKind::Overloaded), 503); + assert_eq!(failure_kind_to_status(InboxFailureKind::Timeout), 504); + assert_eq!( + failure_kind_to_status(InboxFailureKind::OperationBudget), + 507 + ); + assert_eq!(failure_kind_to_status(InboxFailureKind::Platform), 500); + } +} diff --git a/crates/manager-core/src/lib.rs b/crates/manager-core/src/lib.rs index 09a77bf..869d3de 100644 --- a/crates/manager-core/src/lib.rs +++ b/crates/manager-core/src/lib.rs @@ -23,11 +23,14 @@ pub mod auth_bootstrap; pub mod auth_middleware; pub mod authz; pub mod dead_letter_repo; +pub mod dispatcher; pub mod kv_repo; pub mod kv_service; pub mod log_sink; pub mod migrations; +pub mod outbox_event_emitter; pub mod outbox_repo; +pub mod principal_resolver; pub mod repo; pub mod route_admin; pub mod route_repo; @@ -77,12 +80,15 @@ pub use authz::{can, require, AuthzDenied, AuthzError, AuthzRepo, Capability, De pub use dead_letter_repo::{ DeadLetterRepo, DeadLetterRepoError, DeadLetterRow, NewDeadLetter, PostgresDeadLetterRepo, }; +pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError}; pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo}; pub use kv_service::KvServiceImpl; pub use log_sink::PostgresExecutionLogSink; +pub use outbox_event_emitter::OutboxEventEmitter; pub use outbox_repo::{ NewOutboxRow, OutboxRepo, OutboxRepoError, OutboxRow, OutboxSourceKind, PostgresOutboxRepo, }; +pub use principal_resolver::{AdminPrincipalResolver, PrincipalResolver, PrincipalResolverError}; pub use repo::{ ExecutionLogRepository, NewScript, PostgresExecutionLogRepository, PostgresScriptRepository, RepoResolver, ScriptPatch, ScriptRepository, ScriptRepositoryError, diff --git a/crates/manager-core/src/outbox_event_emitter.rs b/crates/manager-core/src/outbox_event_emitter.rs new file mode 100644 index 0000000..176c10e --- /dev/null +++ b/crates/manager-core/src/outbox_event_emitter.rs @@ -0,0 +1,103 @@ +//! `OutboxEventEmitter` — the real `ServiceEventEmitter` that replaces +//! v1.1.0's `NoopEventEmitter` once the triggers framework lands. +//! +//! On each `emit` (a KV mutation, future doc/file/pubsub event, etc.): +//! 1. Look up matching triggers for the event's (app_id, source, op, +//! collection) tuple via `TriggerRepo::list_matching_*`. +//! 2. For each match, write one outbox row carrying the event payload +//! serialized as a `TriggerEvent`. +//! +//! Defaults applied at write time so `OutboxRow.payload` carries +//! everything the dispatcher needs to reconstruct the executor +//! invocation without joining back to the trigger row. +//! +//! Non-KV `ServiceEvent` sources are silently dropped in v1.1.1 — the +//! dispatcher only knows how to fire KV triggers this release. Future +//! sources (docs/files/pubsub) add their own dispatch arm. + +use std::sync::Arc; + +use async_trait::async_trait; +use picloud_shared::{ + EmitError, KvEventOp, SdkCallCx, ServiceEvent, ServiceEventEmitter, TriggerEvent, +}; + +use crate::outbox_repo::{NewOutboxRow, OutboxRepo, OutboxSourceKind}; +use crate::trigger_repo::TriggerRepo; + +pub struct OutboxEventEmitter { + triggers: Arc, + outbox: Arc, +} + +impl OutboxEventEmitter { + #[must_use] + pub fn new(triggers: Arc, outbox: Arc) -> Self { + Self { triggers, outbox } + } +} + +#[async_trait] +impl ServiceEventEmitter for OutboxEventEmitter { + async fn emit(&self, cx: &SdkCallCx, event: ServiceEvent) -> Result<(), EmitError> { + match event.source { + "kv" => self.emit_kv(cx, event).await, + // Future sources land here. For now, silently drop — the + // SDK calls `events.emit(...)` unconditionally for forward + // compat, so swallowing without an error is correct. + _ => Ok(()), + } + } +} + +impl OutboxEventEmitter { + async fn emit_kv(&self, cx: &SdkCallCx, event: ServiceEvent) -> Result<(), EmitError> { + let Some(op) = KvEventOp::from_wire(event.op) else { + return Ok(()); // unknown op — drop quietly + }; + let Some(collection) = event.collection.clone() else { + return Ok(()); // KV events always carry a collection — defensively skip + }; + let key = event.key.clone().unwrap_or_default(); + + let matches = self + .triggers + .list_matching_kv(cx.app_id, &collection, op) + .await + .map_err(|e| EmitError::Unavailable(format!("trigger lookup: {e}")))?; + + if matches.is_empty() { + return Ok(()); + } + + // Serialize the originating event as a TriggerEvent so the + // dispatcher can hand it to the script as `ctx.event` without + // round-tripping back to the trigger row. + let trigger_event = TriggerEvent::Kv { + op, + collection, + key, + value: event.payload.clone(), + }; + let payload = serde_json::to_value(&trigger_event) + .map_err(|e| EmitError::Rejected(format!("event serialize: {e}")))?; + + for m in matches { + self.outbox + .insert(NewOutboxRow { + app_id: cx.app_id, + source_kind: OutboxSourceKind::Kv, + trigger_id: Some(m.trigger_id), + script_id: Some(m.script_id), + reply_to: None, + payload: payload.clone(), + origin_principal: cx.principal.as_ref().map(|p| p.user_id), + trigger_depth: cx.trigger_depth.saturating_add(1), + root_execution_id: Some(cx.root_execution_id), + }) + .await + .map_err(|e| EmitError::Unavailable(format!("outbox insert: {e}")))?; + } + Ok(()) + } +} diff --git a/crates/manager-core/src/principal_resolver.rs b/crates/manager-core/src/principal_resolver.rs new file mode 100644 index 0000000..2674b51 --- /dev/null +++ b/crates/manager-core/src/principal_resolver.rs @@ -0,0 +1,62 @@ +//! `PrincipalResolver` — turns a `registered_by_principal` user id from +//! a trigger row into the `Principal` the dispatcher passes through to +//! the executor. Per design notes §4, a trigger execution runs as the +//! user that registered the trigger; the original event's caller is +//! recorded elsewhere (on the outbox row, for forensics) and does not +//! become the execution principal. + +use async_trait::async_trait; +use picloud_shared::{AdminUserId, Principal}; + +use crate::admin_user_repo::{AdminUserRepository, AdminUserRepositoryError}; + +#[derive(Debug, thiserror::Error)] +pub enum PrincipalResolverError { + #[error("user not found: {0}")] + NotFound(AdminUserId), + #[error("user is inactive: {0}")] + Inactive(AdminUserId), + #[error("admin user repo error: {0}")] + Backend(String), +} + +#[async_trait] +pub trait PrincipalResolver: Send + Sync { + async fn resolve(&self, user_id: AdminUserId) -> Result; +} + +pub struct AdminPrincipalResolver { + users: std::sync::Arc, +} + +impl AdminPrincipalResolver { + #[must_use] + pub fn new(users: std::sync::Arc) -> Self { + Self { users } + } +} + +#[async_trait] +impl PrincipalResolver for AdminPrincipalResolver { + async fn resolve(&self, user_id: AdminUserId) -> Result { + let row = self + .users + .get(user_id) + .await + .map_err(|e: AdminUserRepositoryError| PrincipalResolverError::Backend(e.to_string()))? + .ok_or(PrincipalResolverError::NotFound(user_id))?; + if !row.is_active { + return Err(PrincipalResolverError::Inactive(user_id)); + } + Ok(Principal { + user_id, + instance_role: row.instance_role, + // Trigger executions are cookie-session-style (no API key + // scope restriction). Per-app permissions are evaluated + // via `authz::can` against the `app_id` of the resource + // the script touches, exactly like an admin invocation. + scopes: None, + app_binding: None, + }) + } +} diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs index 9f8c697..8122513 100644 --- a/crates/picloud/src/lib.rs +++ b/crates/picloud/src/lib.rs @@ -12,22 +12,26 @@ use picloud_executor_core::{Engine, Limits}; use picloud_manager_core::{ admin_router, admins_router, api_keys_router, app_members_router, apps_api, apps_router, attach_principal_if_present, auth_router, compile_routes, migrations, require_authenticated, - route_admin_router, triggers_router, AdminSessionRepository, AdminState, AdminUserRepository, - AdminsState, ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository, - AppMembersState, AppRepository, AppsState, AuthState, AuthzRepo, KvServiceImpl, - PostgresAdminSessionRepository, PostgresAdminUserRepository, PostgresApiKeyRepository, - PostgresAppDomainRepository, PostgresAppMembersRepository, PostgresAppRepository, - PostgresExecutionLogRepository, PostgresExecutionLogSink, PostgresKvRepo, - PostgresRouteRepository, PostgresScriptRepository, PostgresTriggerRepo, RepoResolver, - RouteAdminState, RouteRepository, SandboxCeiling, TriggerConfig, TriggerRepo, TriggersState, + route_admin_router, triggers_router, AbandonedRepo, AdminPrincipalResolver, + AdminSessionRepository, AdminState, AdminUserRepository, AdminsState, ApiKeyRepository, + ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState, AppRepository, + AppsState, AuthState, AuthzRepo, DeadLetterRepo, Dispatcher, KvServiceImpl, OutboxEventEmitter, + OutboxRepo, PostgresAbandonedRepo, PostgresAdminSessionRepository, PostgresAdminUserRepository, + PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository, + PostgresAppRepository, PostgresDeadLetterRepo, PostgresExecutionLogRepository, + PostgresExecutionLogSink, PostgresKvRepo, PostgresOutboxRepo, PostgresRouteRepository, + PostgresScriptRepository, PostgresTriggerRepo, PrincipalResolver, RepoResolver, + RouteAdminState, RouteRepository, SandboxCeiling, ScriptRepository, TriggerConfig, TriggerRepo, + TriggersState, }; use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable}; use picloud_orchestrator_core::{ data_plane_router, user_routes_router, DataPlaneState, ExecutionGate, LocalExecutorClient, }; use picloud_shared::{ - ExecutionLogSink, KvService, NoopDeadLetterService, NoopEventEmitter, ScriptValidator, - ServiceEventEmitter, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION, + ExecutionLogSink, InboxResolver, KvService, NoopDeadLetterService, NoopInboxResolver, + ScriptValidator, ServiceEventEmitter, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION, + WIRE_VERSION, }; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; @@ -99,23 +103,28 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { let members: Arc = members_concrete.clone(); let authz: Arc = members_concrete; - // SDK services bundle. v1.1.1 ships the KV store; the outbox-backed - // event emitter replaces `NoopEventEmitter` once the triggers - // dispatcher lands. `NoopDeadLetterService` is a v1.1.1 stub that - // errors loudly until the real `PostgresDeadLetterService` ships. - let kv_repo = Arc::new(PostgresKvRepo::new(pool.clone())); - let events: Arc = Arc::new(NoopEventEmitter); + // Triggers framework storage. The outbox event emitter routes + // KV mutations into the outbox; the dispatcher fans them out. + let trigger_repo: Arc = Arc::new(PostgresTriggerRepo::new(pool.clone())); + let outbox_repo: Arc = Arc::new(PostgresOutboxRepo::new(pool.clone())); + let dl_repo: Arc = Arc::new(PostgresDeadLetterRepo::new(pool.clone())); + let abandoned_repo: Arc = Arc::new(PostgresAbandonedRepo::new(pool.clone())); + let trigger_config = TriggerConfig::from_env(); + + // SDK services bundle. v1.1.1 ships the KV store and the + // outbox-backed event emitter; `NoopDeadLetterService` is a v1.1.1 + // stub that errors loudly until the real `PostgresDeadLetterService` + // ships (commit 8). + let kv_repo = Arc::new(PostgresKvRepo::new(pool)); + let events: Arc = Arc::new(OutboxEventEmitter::new( + trigger_repo.clone(), + outbox_repo.clone(), + )); let kv: Arc = Arc::new(KvServiceImpl::new(kv_repo, authz.clone(), events.clone())); let services = Services::new(kv, Arc::new(NoopDeadLetterService), events); let engine = Arc::new(Engine::new(Limits::default(), services)); - // Trigger repo + config, shared between the admin endpoint and the - // dispatcher (commit 5). Read defaults from env so operators can - // tune retry / depth without rebuilding the binary. - let trigger_repo: Arc = Arc::new(PostgresTriggerRepo::new(pool)); - let trigger_config = TriggerConfig::from_env(); - // Compile the routes table once at startup; admin writes refresh it. let route_table = Arc::new(RouteTable::new()); let initial = route_repo.list_all().await?; @@ -146,7 +155,31 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { // Single global gate — overflow is rejected with 503 + Retry-After. // See `ExecutionGate` docs and `PICLOUD_MAX_CONCURRENT_EXECUTIONS`. let gate = Arc::new(ExecutionGate::from_env()); - let executor = Arc::new(LocalExecutorClient::new(engine.clone(), gate)); + let executor = Arc::new(LocalExecutorClient::new(engine.clone(), gate.clone())); + + // Dispatcher — single tokio task that polls the outbox and routes + // due rows to the executor. Shares the `ExecutionGate` with sync + // HTTP per design notes §2 (one cap for everything). NoopInboxResolver + // until commit 6 wires the real in-process inbox registry. + let dispatcher_script_repo: Arc = + Arc::new(PostgresScriptRepoHandle(script_repo.clone())); + let principals: Arc = + Arc::new(AdminPrincipalResolver::new(auth.users.clone())); + let inbox: Arc = Arc::new(NoopInboxResolver); + Dispatcher { + outbox: outbox_repo.clone(), + triggers: trigger_repo.clone(), + scripts: dispatcher_script_repo, + dead_letters: dl_repo.clone(), + abandoned: abandoned_repo.clone(), + principals, + executor: executor.clone(), + gate, + inbox, + config: trigger_config, + instance_id: format!("picloud-{}", std::process::id()), + } + .spawn(); let admin = AdminState { repo: Arc::new(PostgresScriptRepoHandle(script_repo.clone())), @@ -170,6 +203,10 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { app_domains: app_domain_table.clone(), routes: route_table, }; + // Silence unused-import warnings for repos handed to the + // dispatcher in this commit; commit 8 wires them into the + // dead-letters admin endpoints and commit 10 into the GC sweeper. + let _ = (&dl_repo, &abandoned_repo); let triggers_state = TriggersState { triggers: trigger_repo, apps: apps_repo.clone(), diff --git a/crates/shared/src/exec_summary.rs b/crates/shared/src/exec_summary.rs new file mode 100644 index 0000000..3ba96a9 --- /dev/null +++ b/crates/shared/src/exec_summary.rs @@ -0,0 +1,16 @@ +//! `ExecResponseSummary` — a flattened, crate-portable view of an +//! `ExecResponse` for use by `InboxResult`. Lives in +//! `picloud-shared` because the dispatcher (manager-core) and the +//! orchestrator-core inbox registry both need to read it, and +//! `executor-core::ExecResponse` is owned by a leaf crate. + +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ExecResponseSummary { + pub status_code: u16, + pub headers: BTreeMap, + pub body: serde_json::Value, +} diff --git a/crates/shared/src/inbox.rs b/crates/shared/src/inbox.rs new file mode 100644 index 0000000..ed4c096 --- /dev/null +++ b/crates/shared/src/inbox.rs @@ -0,0 +1,86 @@ +//! `InboxResolver` — abstraction the dispatcher uses to deliver sync +//! HTTP results back to the orchestrator that's awaiting them on a +//! oneshot channel. Lives in `picloud-shared` because the dispatcher +//! (manager-core) and the registry impl (orchestrator-core) live in +//! different crates and need a shared trait surface. +//! +//! v1.1.1 ships an in-process implementation in `orchestrator-core` +//! that keeps a `HashMap>`. Cluster +//! mode (v1.3+) swaps this for a Postgres `LISTEN/NOTIFY`-based +//! resolver without touching the dispatcher code (design notes §3 +//! implementation table). +//! +//! Until commit 6 wires up the real registry, `NoopInboxResolver` +//! (`Abandoned` for every attempt) keeps the dispatcher able to run. + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::ExecResponseSummary; + +/// Result of trying to hand back a sync-HTTP outcome. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum InboxDeliveryOutcome { + /// Receiver still attached; result was delivered. Dispatcher + /// deletes the outbox row. + Delivered, + /// Receiver was dropped (orchestrator timed out). Dispatcher + /// writes an `abandoned_executions` row. + Abandoned, +} + +/// Outcome shape the dispatcher delivers to the inbox. Carries enough +/// to reconstruct an HTTP response — full body via JSON, optional +/// error string when the executor reported a failure. +#[derive(Debug, Clone)] +pub enum InboxResult { + /// Successful execution. `response` is the `ExecResponse` summary + /// (status code + body + headers + logs). + Success(ExecResponseSummary), + /// Failure modes — script threw, op-budget, timeout, etc. The + /// orchestrator maps these to the design-notes §3 status codes + /// (422/502/503/504/507/500) when responding to the HTTP caller. + Failure { + kind: InboxFailureKind, + message: String, + }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum InboxFailureKind { + /// Script's Rhai code threw or hit a runtime error → 502. + Runtime, + /// Wall-clock exceeded → 504. + Timeout, + /// Operation budget exceeded → 507. + OperationBudget, + /// Gate refused admission → 503. + Overloaded, + /// Script parse failure / bad-request → 422. + Validation, + /// Platform problem (executor crashed, dispatcher crashed, etc.) → 500. + Platform, +} + +#[async_trait] +pub trait InboxResolver: Send + Sync { + /// Attempt to deliver `result` to the receiver registered under + /// `inbox_id`. Returns `Delivered` if the channel was alive, + /// `Abandoned` if the receiver was already dropped (the + /// orchestrator's timeout fired before the dispatcher got here). + async fn deliver(&self, inbox_id: Uuid, result: InboxResult) -> InboxDeliveryOutcome; +} + +/// Bootstrap impl used before the real registry is wired in. Every +/// delivery is treated as abandoned — the dispatcher records an +/// abandoned-execution row and moves on. Replaced in `build_app` with +/// the in-process `InboxRegistry` from orchestrator-core. +#[derive(Debug, Default, Clone, Copy)] +pub struct NoopInboxResolver; + +#[async_trait] +impl InboxResolver for NoopInboxResolver { + async fn deliver(&self, _inbox_id: Uuid, _result: InboxResult) -> InboxDeliveryOutcome { + InboxDeliveryOutcome::Abandoned + } +} diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs index 4246974..edb24fa 100644 --- a/crates/shared/src/lib.rs +++ b/crates/shared/src/lib.rs @@ -9,8 +9,10 @@ pub mod auth; pub mod dead_letters; pub mod error; pub mod events; +pub mod exec_summary; pub mod execution_log; pub mod ids; +pub mod inbox; pub mod kv; pub mod log_sink; pub mod route; @@ -27,8 +29,12 @@ pub use auth::{AppRole, InstanceRole, Principal, Scope, UserId}; pub use dead_letters::{DeadLetterError, DeadLetterId, DeadLetterService, NoopDeadLetterService}; pub use error::Error; pub use events::{EmitError, NoopEventEmitter, ServiceEvent, ServiceEventEmitter}; +pub use exec_summary::ExecResponseSummary; pub use execution_log::{ExecutionLog, ExecutionStatus}; pub use ids::{AdminUserId, ApiKeyId, AppId, ExecutionId, RequestId, ScriptId, TriggerId}; +pub use inbox::{ + InboxDeliveryOutcome, InboxFailureKind, InboxResolver, InboxResult, NoopInboxResolver, +}; pub use kv::{KvError, KvListPage, KvService, NoopKvService}; pub use log_sink::{ExecutionLogSink, LogSinkError}; pub use route::{HostKind, PathKind, Route};