feat(v1.1.1-dispatcher): dispatcher loop + retry + depth limit + outbox emitter

`OutboxEventEmitter` replaces `NoopEventEmitter` in the picloud
binary's `Services` bundle. KV mutations now fan out to the outbox
via `TriggerRepo::list_matching_kv` — one row per matching trigger,
carrying the serialized `TriggerEvent` payload + the matching
trigger's retry policy.

`Dispatcher` is the single tokio task that polls the outbox every
100ms, claims due rows via FOR UPDATE SKIP LOCKED (with a batch cap),
and routes each to the executor. Shares the `ExecutionGate` with
sync HTTP per design notes §2 — gate saturation reschedules the
row instead of dropping it.

Outcome handling matches design notes §3 and §4:
- reply_to.is_some() (sync HTTP): never retry. Deliver via
  `InboxResolver`; if the receiver was dropped, write an
  `abandoned_executions` row.
- is_dead_letter_handler == true: never retry, never DL. On
  failure, annotate the original DL row with
  `resolution = 'handler_failed'`. Stops the recursion that would
  otherwise re-fire a broken handler script.
- Otherwise async: bump attempt_count, reschedule with exponential
  backoff + ±jitter; once max_attempts is reached, write a
  `dead_letters` row and drop from outbox.
- Trigger-depth limit: `cx.trigger_depth > max_trigger_depth` skips
  execution entirely (log + future metric), NEVER dead-letters.
  Loops are not retried via the DL chain — they're terminated.

`InboxResolver` trait lands in `picloud-shared` with a
`NoopInboxResolver` bootstrap that flags every delivery as
`Abandoned`. Commit 6 replaces the noop with the real
in-process registry in `orchestrator-core`.

`AdminPrincipalResolver` builds a `Principal` from a trigger's
`registered_by_principal` user id so the dispatched script executes
as the trigger registrant (design notes §4).

Unit tests cover backoff math (exponential/linear/constant) +
jitter range + ExecError → InboxFailureKind classification + the
status-code table mapping. Integration tests for the full
dispatcher loop need a real Postgres + executor; reviewer runs them
via the manual smoke flow in the plan / HANDBACK.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-06-01 22:01:42 +02:00
parent 2e92691ee1
commit 6a2971ac70
10 changed files with 953 additions and 24 deletions

View File

@@ -0,0 +1,610 @@
//! The triggers-framework dispatcher.
//!
//! Single tokio task that polls the outbox, claims due rows
//! (`FOR UPDATE SKIP LOCKED`), and routes each to the executor.
//! Shares the `ExecutionGate` with sync HTTP — they compete for the
//! same permit budget, matching design notes §2.
//!
//! Outcome handling per design notes §3 and §4:
//! - reply_to.is_some() (sync HTTP): never retry. Deliver to inbox
//! (or write `abandoned_executions` if the receiver dropped).
//! - is_dead_letter_handler == true: never retry, never DL. Failure
//! just annotates the original DL row with `resolution =
//! 'handler_failed'` and bumps a metric.
//! - Otherwise on failure: if `attempt_count + 1 < max_attempts`,
//! reschedule with backoff + jitter. Else, write a `dead_letters`
//! row and delete from outbox.
//!
//! Depth-limit: `trigger_depth > max_trigger_depth` skips execution
//! entirely (log + metric) and deletes the row — does NOT dead-letter
//! (design notes §4: depth-exceeded means "you built a loop", and
//! dead-lettering would just re-fire the same loop).
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType};
use picloud_orchestrator_core::{ExecutionGate, ExecutorClient};
use picloud_shared::{
ExecResponseSummary, ExecutionId, InboxDeliveryOutcome, InboxFailureKind, InboxResolver,
InboxResult, RequestId, ScriptId, ScriptSandbox, TriggerEvent,
};
use rand::Rng;
use uuid::Uuid;
use crate::abandoned_repo::{AbandonedRepo, NewAbandonedExecution};
use crate::dead_letter_repo::{DeadLetterRepo, NewDeadLetter};
use crate::outbox_repo::{OutboxRepo, OutboxRow, OutboxSourceKind};
use crate::principal_resolver::PrincipalResolver;
use crate::repo::ScriptRepository;
use crate::trigger_config::{BackoffShape, TriggerConfig};
use crate::trigger_repo::{TriggerKind, TriggerRepo};
/// Bundle the dispatcher reads from. Each handle is `Arc<dyn …>` so
/// tests can substitute in-memory backings.
pub struct Dispatcher {
pub outbox: Arc<dyn OutboxRepo>,
pub triggers: Arc<dyn TriggerRepo>,
pub scripts: Arc<dyn ScriptRepository>,
pub dead_letters: Arc<dyn DeadLetterRepo>,
pub abandoned: Arc<dyn AbandonedRepo>,
pub principals: Arc<dyn PrincipalResolver>,
pub executor: Arc<dyn ExecutorClient>,
pub gate: Arc<ExecutionGate>,
pub inbox: Arc<dyn InboxResolver>,
pub config: TriggerConfig,
/// Stable id for this dispatcher instance — written into
/// `outbox.claimed_by` for forensics. In MVP this is the host's
/// pid; cluster mode (v1.3+) uses node identity.
pub instance_id: String,
}
/// How many outbox rows the dispatcher tries to claim per tick.
/// Bounded to keep the working set small even if there's a flood.
const CLAIM_BATCH: i64 = 8;
/// Polling cadence. Short enough that fan-out feels instant; long
/// enough that an idle dispatcher doesn't burn cycles.
const TICK_INTERVAL: Duration = Duration::from_millis(100);
/// Hard cap on the wall-clock budget passed to the executor for an
/// async-dispatched script. Sync HTTP gets a per-script timeout via
/// the orchestrator path; async rows don't have one, so we apply a
/// platform-wide ceiling here. Matches `LocalExecutorClient`'s own
/// 5-minute cap.
const ASYNC_EXEC_TIMEOUT: Duration = Duration::from_secs(300);
impl Dispatcher {
/// Spawn the dispatcher loop as a detached `tokio::task`. The
/// returned `JoinHandle` is dropped — the loop runs for the
/// process lifetime.
pub fn spawn(self) {
tokio::spawn(async move {
self.run().await;
});
}
async fn run(self) {
let mut ticker = tokio::time::interval(TICK_INTERVAL);
// Skip the immediate first fire so we don't race startup.
ticker.tick().await;
loop {
ticker.tick().await;
if let Err(err) = self.tick().await {
tracing::warn!(?err, "dispatcher tick errored");
}
}
}
async fn tick(&self) -> Result<(), DispatcherError> {
// Cheap gate sample so we don't claim rows we can't dispatch.
// The exact permit budget is reapplied per-row below.
let rows = self
.outbox
.claim_due(&self.instance_id, CLAIM_BATCH)
.await
.map_err(|e| DispatcherError::Outbox(e.to_string()))?;
if rows.is_empty() {
return Ok(());
}
for row in rows {
// Process serially within a tick — the outer ticker is the
// pacing mechanism. Concurrent dispatchers are a cluster-
// mode concern; v1.1.1 MVP has one.
if let Err(err) = self.dispatch_one(row).await {
tracing::warn!(?err, "dispatch one errored");
}
}
Ok(())
}
async fn dispatch_one(&self, row: OutboxRow) -> Result<(), DispatcherError> {
// Depth-limit check — design notes §4: loops aren't DL'd.
if row.trigger_depth > self.config.max_trigger_depth {
tracing::warn!(
outbox_id = %row.id,
app_id = %row.app_id,
trigger_depth = row.trigger_depth,
"trigger depth exceeded; dropping row"
);
// TODO(metrics): bump `picloud_trigger_depth_exceeded{app_id,trigger_id}`.
self.outbox
.delete(row.id)
.await
.map_err(|e| DispatcherError::Outbox(e.to_string()))?;
return Ok(());
}
// Gate admission — non-blocking. If the gate is saturated,
// release the claim by rescheduling so another tick can pick
// it up. The row stays "due" essentially immediately.
let Ok(permit) = self.gate.try_acquire() else {
let next = Utc::now() + chrono::Duration::milliseconds(100);
self.outbox
.reschedule(row.id, row.attempt_count, next)
.await
.map_err(|e| DispatcherError::Outbox(e.to_string()))?;
return Ok(());
};
// Resolve the trigger config (KV or DL) and the script.
let resolved = match row.source_kind {
OutboxSourceKind::Http => {
// Sync HTTP path lands here when commit 6 wires up
// the orchestrator -> outbox bridge. For now, this
// arm is a forward-compat stub — drop the row to
// avoid a permanent stuck state.
tracing::debug!(outbox_id = %row.id, "HTTP outbox row encountered; commit 6 wires this in");
self.outbox
.delete(row.id)
.await
.map_err(|e| DispatcherError::Outbox(e.to_string()))?;
drop(permit);
return Ok(());
}
OutboxSourceKind::Kv | OutboxSourceKind::DeadLetter => {
self.resolve_trigger(&row).await?
}
};
let exec_req = match self.build_exec_request(&row, &resolved).await {
Ok(req) => req,
Err(err) => {
tracing::warn!(outbox_id = %row.id, ?err, "exec request build failed; dropping row");
self.outbox
.delete(row.id)
.await
.map_err(|e| DispatcherError::Outbox(e.to_string()))?;
drop(permit);
return Ok(());
}
};
// The gate permit auto-releases when this scope ends or when
// the executor finishes. We hand control to the executor and
// wait synchronously here — sync HTTP and dispatcher share the
// semaphore so this is intentional.
let source = resolved.script_source.clone();
let outcome = self
.executor
.execute(&source, exec_req, ASYNC_EXEC_TIMEOUT)
.await;
drop(permit);
match outcome {
Ok(resp) => self.handle_success(&row, &resolved, resp).await,
Err(err) => self.handle_failure(&row, &resolved, err).await,
}
}
async fn resolve_trigger(&self, row: &OutboxRow) -> Result<ResolvedTrigger, DispatcherError> {
// For KV and DL kinds, the outbox carries `trigger_id`. Use it
// to look up the trigger row, then resolve the script.
let Some(trigger_id) = row.trigger_id else {
return Err(DispatcherError::ResolveTrigger(
"outbox row missing trigger_id".into(),
));
};
let trigger = self
.triggers
.get(trigger_id)
.await
.map_err(|e| DispatcherError::ResolveTrigger(e.to_string()))?
.ok_or_else(|| {
DispatcherError::ResolveTrigger(format!("trigger {trigger_id} not found"))
})?;
let script = self
.scripts
.get(trigger.script_id)
.await
.map_err(|e| DispatcherError::ResolveTrigger(e.to_string()))?
.ok_or_else(|| {
DispatcherError::ResolveTrigger(format!("script {} not found", trigger.script_id))
})?;
Ok(ResolvedTrigger {
trigger_kind: trigger.kind,
is_dead_letter_handler: matches!(trigger.kind, TriggerKind::DeadLetter),
script_id: script.id,
script_source: script.source,
script_name: script.name,
sandbox_overrides: script.sandbox,
registered_by_principal: trigger.registered_by_principal,
retry_max_attempts: trigger.retry_max_attempts,
retry_backoff: trigger.retry_backoff,
retry_base_ms: trigger.retry_base_ms,
})
}
async fn build_exec_request(
&self,
row: &OutboxRow,
resolved: &ResolvedTrigger,
) -> Result<ExecRequest, DispatcherError> {
let trigger_event: TriggerEvent = serde_json::from_value(row.payload.clone())
.map_err(|e| DispatcherError::ResolveTrigger(format!("decode payload: {e}")))?;
let principal = self
.principals
.resolve(resolved.registered_by_principal)
.await
.map_err(|e| DispatcherError::ResolveTrigger(e.to_string()))?;
let execution_id = ExecutionId::new();
Ok(ExecRequest {
execution_id,
request_id: RequestId::new(),
script_id: resolved.script_id,
script_name: resolved.script_name.clone(),
invocation_type: InvocationType::Function,
path: format!("/trigger/{}", trigger_event.source()),
headers: std::collections::BTreeMap::new(),
body: serde_json::Value::Null,
params: std::collections::BTreeMap::new(),
query: std::collections::BTreeMap::new(),
rest: String::new(),
sandbox_overrides: resolved.sandbox_overrides,
app_id: row.app_id,
principal: Some(principal),
trigger_depth: row.trigger_depth,
root_execution_id: row.root_execution_id.unwrap_or(execution_id),
is_dead_letter_handler: resolved.is_dead_letter_handler,
event: Some(trigger_event),
})
}
async fn handle_success(
&self,
row: &OutboxRow,
_resolved: &ResolvedTrigger,
resp: ExecResponse,
) -> Result<(), DispatcherError> {
if let Some(inbox_id) = row.reply_to {
self.deliver_inbox(row, inbox_id, InboxResult::Success(summarize(&resp)))
.await;
}
self.outbox
.delete(row.id)
.await
.map_err(|e| DispatcherError::Outbox(e.to_string()))?;
Ok(())
}
async fn handle_failure(
&self,
row: &OutboxRow,
resolved: &ResolvedTrigger,
err: ExecError,
) -> Result<(), DispatcherError> {
// Sync HTTP: always single-attempt. Always deliver outcome
// (success-or-failure) to the inbox. Never retry, never DL.
if let Some(inbox_id) = row.reply_to {
let (kind, message) = classify_exec_error(&err);
self.deliver_inbox(
row,
inbox_id,
InboxResult::Failure {
kind,
message: message.clone(),
},
)
.await;
self.outbox
.delete(row.id)
.await
.map_err(|e| DispatcherError::Outbox(e.to_string()))?;
return Ok(());
}
// Dead-letter handler: never retry, never DL. Failure
// annotates the original DL row + bumps a metric.
if resolved.is_dead_letter_handler {
tracing::error!(
outbox_id = %row.id,
app_id = %row.app_id,
?err,
"dead-letter handler failed; not retrying"
);
// TODO(metrics): bump `picloud_dead_letter_handler_failures{app_id}`.
// Annotate the original DL row (id is `row.payload.dead_letter.id`
// when the payload is a DeadLetter TriggerEvent). Best-effort:
// if the payload doesn't decode, just log and move on.
if let Ok(TriggerEvent::DeadLetter { dead_letter_id, .. }) =
serde_json::from_value::<TriggerEvent>(row.payload.clone())
{
if let Err(e) = self
.dead_letters
.resolve(dead_letter_id, "handler_failed")
.await
{
tracing::warn!(?e, "could not annotate DL row as handler_failed");
}
}
self.outbox
.delete(row.id)
.await
.map_err(|e| DispatcherError::Outbox(e.to_string()))?;
return Ok(());
}
// Async event: retry per policy, then dead-letter.
let attempt = row.attempt_count + 1;
if attempt < resolved.retry_max_attempts {
let delay = compute_backoff(
attempt,
resolved.retry_backoff,
resolved.retry_base_ms,
self.config.retry_jitter_pct,
);
let next = Utc::now() + chrono::Duration::milliseconds(i64::from(delay));
tracing::info!(
outbox_id = %row.id,
attempt,
max_attempts = resolved.retry_max_attempts,
retry_in_ms = delay,
"rescheduling outbox row"
);
self.outbox
.reschedule(row.id, attempt, next)
.await
.map_err(|e| DispatcherError::Outbox(e.to_string()))?;
return Ok(());
}
// Exhausted retries → dead-letter.
let (op, source) = describe_event(&row.payload);
let now = Utc::now();
if let Err(e) = self
.dead_letters
.insert(NewDeadLetter {
app_id: row.app_id,
original_event_id: row.id,
source,
op,
trigger_id: row.trigger_id,
script_id: Some(resolved.script_id),
payload: row.payload.clone(),
attempt_count: attempt,
first_attempt_at: row.created_at,
last_attempt_at: now,
last_error: err.to_string(),
})
.await
{
tracing::error!(?e, "failed to write dead-letter row");
}
self.outbox
.delete(row.id)
.await
.map_err(|e| DispatcherError::Outbox(e.to_string()))?;
Ok(())
}
async fn deliver_inbox(&self, row: &OutboxRow, inbox_id: Uuid, result: InboxResult) {
match self.inbox.deliver(inbox_id, result.clone()).await {
InboxDeliveryOutcome::Delivered => {}
InboxDeliveryOutcome::Abandoned => {
// Receiver was dropped — record forensic row + bump
// metric.
let (status_code, summary) = match &result {
InboxResult::Success(s) => (s.status_code, None),
InboxResult::Failure { kind, message } => {
(failure_kind_to_status(*kind), Some(message.clone()))
}
};
if let Err(e) = self
.abandoned
.insert(NewAbandonedExecution {
app_id: row.app_id,
outbox_id: row.id,
script_id: row.script_id,
inbox_id,
status_code,
result_summary: summary,
})
.await
{
tracing::warn!(?e, "abandoned_executions insert failed");
}
// TODO(metrics): bump `picloud_abandoned_executions_total{app_id}`.
}
}
}
}
#[derive(Debug)]
pub struct ResolvedTrigger {
pub trigger_kind: TriggerKind,
pub is_dead_letter_handler: bool,
pub script_id: ScriptId,
pub script_source: String,
pub script_name: String,
pub sandbox_overrides: ScriptSandbox,
pub registered_by_principal: picloud_shared::AdminUserId,
pub retry_max_attempts: u32,
pub retry_backoff: BackoffShape,
pub retry_base_ms: u32,
}
#[derive(Debug, thiserror::Error)]
pub enum DispatcherError {
#[error("outbox: {0}")]
Outbox(String),
#[error("resolve trigger: {0}")]
ResolveTrigger(String),
}
fn summarize(resp: &ExecResponse) -> ExecResponseSummary {
ExecResponseSummary {
status_code: resp.status_code,
headers: resp.headers.clone(),
body: resp.body.clone(),
}
}
/// Map `ExecError` onto the design-notes §3 status-code table.
fn classify_exec_error(err: &ExecError) -> (InboxFailureKind, String) {
match err {
ExecError::Parse(s) | ExecError::InvalidResponse(s) => {
(InboxFailureKind::Validation, s.clone())
}
ExecError::Timeout(_) => (InboxFailureKind::Timeout, err.to_string()),
ExecError::OperationBudgetExceeded => (InboxFailureKind::OperationBudget, err.to_string()),
ExecError::Overloaded { .. } => (InboxFailureKind::Overloaded, err.to_string()),
ExecError::Runtime(s) => (InboxFailureKind::Runtime, s.clone()),
}
}
fn failure_kind_to_status(k: InboxFailureKind) -> u16 {
match k {
InboxFailureKind::Validation => 422,
InboxFailureKind::Runtime => 502,
InboxFailureKind::Overloaded => 503,
InboxFailureKind::Timeout => 504,
InboxFailureKind::OperationBudget => 507,
InboxFailureKind::Platform => 500,
}
}
/// `(op, source)` extracted from the outbox payload. Used to seed the
/// `dead_letters` row when retries exhaust.
fn describe_event(payload: &serde_json::Value) -> (String, String) {
let source = payload
.get("source")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let op = payload
.get("op")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
(op, source)
}
/// Compute backoff (ms) for the given attempt + policy + jitter.
/// Attempt is 1-indexed (first retry = attempt 1).
#[must_use]
pub fn compute_backoff(attempt: u32, backoff: BackoffShape, base_ms: u32, jitter_pct: u32) -> u32 {
let base_ms = u64::from(base_ms);
let attempt = u64::from(attempt.saturating_sub(1));
let raw = match backoff {
BackoffShape::Constant => base_ms,
BackoffShape::Linear => base_ms * (attempt + 1),
// 1x base, 2x base, 4x base, … (saturating).
BackoffShape::Exponential => base_ms.saturating_mul(1u64 << attempt.min(20)),
};
let raw = u32::try_from(raw.min(u64::from(u32::MAX))).unwrap_or(u32::MAX);
apply_jitter(raw, jitter_pct)
}
fn apply_jitter(raw: u32, pct: u32) -> u32 {
if pct == 0 {
return raw;
}
let pct = pct.min(100);
// ±span% — bounded by raw itself so we can't underflow when
// raw + offset goes below zero.
let span = u64::from(raw) * u64::from(pct) / 100;
if span == 0 {
return raw;
}
let span_i64 = i64::try_from(span).unwrap_or(i64::MAX);
let mut rng = rand::thread_rng();
let offset = rng.gen_range(-span_i64..=span_i64);
let signed = i64::from(raw).saturating_add(offset).max(0);
u32::try_from(signed.min(i64::from(u32::MAX))).unwrap_or(u32::MAX)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn exponential_backoff_doubles_per_attempt() {
// No jitter (pct=0) for a deterministic check.
assert_eq!(compute_backoff(1, BackoffShape::Exponential, 1000, 0), 1000);
assert_eq!(compute_backoff(2, BackoffShape::Exponential, 1000, 0), 2000);
assert_eq!(compute_backoff(3, BackoffShape::Exponential, 1000, 0), 4000);
assert_eq!(compute_backoff(4, BackoffShape::Exponential, 1000, 0), 8000);
}
#[test]
fn linear_backoff_scales_with_attempt() {
assert_eq!(compute_backoff(1, BackoffShape::Linear, 100, 0), 100);
assert_eq!(compute_backoff(2, BackoffShape::Linear, 100, 0), 200);
assert_eq!(compute_backoff(5, BackoffShape::Linear, 100, 0), 500);
}
#[test]
fn constant_backoff_returns_base() {
for attempt in 1..=5 {
assert_eq!(
compute_backoff(attempt, BackoffShape::Constant, 750, 0),
750
);
}
}
#[test]
fn jitter_within_pct_of_base() {
for _ in 0..100 {
let v = compute_backoff(1, BackoffShape::Constant, 1000, 20);
// ±20% of 1000 = 800..=1200.
assert!((800..=1200).contains(&v), "jitter out of range: {v}");
}
}
#[test]
fn classify_exec_error_covers_every_variant() {
let parse = classify_exec_error(&ExecError::Parse("nope".into()));
assert!(matches!(parse.0, InboxFailureKind::Validation));
let invalid = classify_exec_error(&ExecError::InvalidResponse("bad".into()));
assert!(matches!(invalid.0, InboxFailureKind::Validation));
let timeout = classify_exec_error(&ExecError::Timeout(30));
assert!(matches!(timeout.0, InboxFailureKind::Timeout));
let budget = classify_exec_error(&ExecError::OperationBudgetExceeded);
assert!(matches!(budget.0, InboxFailureKind::OperationBudget));
let runtime = classify_exec_error(&ExecError::Runtime("threw".into()));
assert!(matches!(runtime.0, InboxFailureKind::Runtime));
let overload = classify_exec_error(&ExecError::Overloaded {
retry_after_secs: 1,
});
assert!(matches!(overload.0, InboxFailureKind::Overloaded));
}
#[test]
fn failure_kind_status_codes_match_design_notes() {
assert_eq!(failure_kind_to_status(InboxFailureKind::Validation), 422);
assert_eq!(failure_kind_to_status(InboxFailureKind::Runtime), 502);
assert_eq!(failure_kind_to_status(InboxFailureKind::Overloaded), 503);
assert_eq!(failure_kind_to_status(InboxFailureKind::Timeout), 504);
assert_eq!(
failure_kind_to_status(InboxFailureKind::OperationBudget),
507
);
assert_eq!(failure_kind_to_status(InboxFailureKind::Platform), 500);
}
}