From 77b2cb58bbdfb42cf4b653adb9b1aa9108a85715 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Mon, 1 Jun 2026 22:12:55 +0200 Subject: [PATCH] feat(v1.1.1-routes): outbox-routed sync HTTP + dispatch_mode=async MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Routes gain `dispatch_mode TEXT NOT NULL DEFAULT 'sync'` (migration 0012). Existing routes default to sync so the migration is non-breaking. `DispatchMode` enum lands in `picloud-shared`. The user-routes orchestrator handler now branches: - `dispatch_mode = async` → write outbox row with `reply_to = None`, return `202 Accepted` + `{accepted_at, execution_id}`. Dispatcher fires the script in the background; retries / dead-letters via the framework from commit 5. - `dispatch_mode = sync` → register an inbox channel (`tokio::sync::oneshot`), write outbox row with `reply_to = inbox_id`, `.await` on the receiver with a timeout = script.timeout_seconds + 2s buffer. Dispatcher hands the result back; orchestrator maps `InboxResult` into the HTTP response per the design-notes §3 status-code table (422/502/503/504/507/500). `InboxRegistry` (orchestrator-core/src/inbox.rs) is the in-process implementation of `InboxResolver`. `Mutex`-guarded HashMap of pending oneshot senders keyed by `inbox_id`. Tests cover register/deliver round-trip, unknown-id is abandoned, dropped-receiver is abandoned, explicit cancel. Cluster mode (v1.3+) swaps this for LISTEN/NOTIFY-keyed lookup behind the same trait. `OutboxWriter` trait lives in `picloud-shared` so orchestrator-core can write to the outbox without depending on manager-core (which would invert the dependency arrow). `PostgresOutboxRepo` implements both `OutboxRepo` (dispatcher surface) and `OutboxWriter` (orchestrator surface); the picloud binary clones the same concrete Arc into both trait views. 
The dispatcher's HTTP arm (commit 5 had a stub) now decodes the `HttpDispatchPayload` off the outbox row, looks up the script, synthesizes an `ExecRequest`, and runs it through the executor. Outcome routing reuses the same path as KV triggers — sync HTTP flows through the inbox, async dispatch gets dropped after success (or DL'd on exhaustion). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../migrations/0012_routes_dispatch_mode.sql | 16 + crates/manager-core/src/app_bootstrap.rs | 1 + crates/manager-core/src/dispatcher.rs | 137 +++++-- crates/manager-core/src/outbox_repo.rs | 27 +- crates/manager-core/src/route_admin.rs | 8 + crates/manager-core/src/route_repo.rs | 20 +- crates/orchestrator-core/src/api.rs | 344 ++++++++++++++++-- crates/orchestrator-core/src/inbox.rs | 139 +++++++ crates/orchestrator-core/src/lib.rs | 2 + .../orchestrator-core/src/routing/matcher.rs | 9 + crates/picloud/src/lib.rs | 27 +- crates/shared/src/lib.rs | 4 +- crates/shared/src/outbox_writer.rs | 72 ++++ crates/shared/src/route.rs | 39 ++ 14 files changed, 767 insertions(+), 78 deletions(-) create mode 100644 crates/manager-core/migrations/0012_routes_dispatch_mode.sql create mode 100644 crates/orchestrator-core/src/inbox.rs create mode 100644 crates/shared/src/outbox_writer.rs diff --git a/crates/manager-core/migrations/0012_routes_dispatch_mode.sql b/crates/manager-core/migrations/0012_routes_dispatch_mode.sql new file mode 100644 index 0000000..70ac2c5 --- /dev/null +++ b/crates/manager-core/migrations/0012_routes_dispatch_mode.sql @@ -0,0 +1,16 @@ +-- v1.1.1: per-route dispatch mode (design notes §2 + §3). +-- +-- `sync` (default): orchestrator awaits the executor inline and +-- returns the response in the same HTTP request — current MVP +-- behaviour. +-- `async`: orchestrator writes the request to the trigger outbox, +-- returns `202 Accepted` immediately. 
The dispatcher runs the +-- script in the background and surfaces failures via the +-- retry / dead-letter machinery — same shape as any other async +-- event. +-- +-- Existing routes default to `sync` so the migration is non-breaking. + +ALTER TABLE routes + ADD COLUMN dispatch_mode TEXT NOT NULL DEFAULT 'sync' + CHECK (dispatch_mode IN ('sync', 'async')); diff --git a/crates/manager-core/src/app_bootstrap.rs b/crates/manager-core/src/app_bootstrap.rs index 35f0ef5..8b11826 100644 --- a/crates/manager-core/src/app_bootstrap.rs +++ b/crates/manager-core/src/app_bootstrap.rs @@ -82,6 +82,7 @@ async fn seed_into( // Accept any method so both `curl /hello` and // `curl -d '{"name":"X"}' /hello` work out of the box. method: None, + dispatch_mode: picloud_shared::DispatchMode::Sync, }) .await?; diff --git a/crates/manager-core/src/dispatcher.rs b/crates/manager-core/src/dispatcher.rs index f41725a..2571f75 100644 --- a/crates/manager-core/src/dispatcher.rs +++ b/crates/manager-core/src/dispatcher.rs @@ -27,8 +27,8 @@ use chrono::Utc; use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType}; use picloud_orchestrator_core::{ExecutionGate, ExecutorClient}; use picloud_shared::{ - ExecResponseSummary, ExecutionId, InboxDeliveryOutcome, InboxFailureKind, InboxResolver, - InboxResult, RequestId, ScriptId, ScriptSandbox, TriggerEvent, + ExecResponseSummary, ExecutionId, HttpDispatchPayload, InboxDeliveryOutcome, InboxFailureKind, + InboxResolver, InboxResult, RequestId, ScriptId, ScriptSandbox, TriggerEvent, }; use rand::Rng; use uuid::Uuid; @@ -148,36 +148,36 @@ impl Dispatcher { return Ok(()); }; - // Resolve the trigger config (KV or DL) and the script. - let resolved = match row.source_kind { - OutboxSourceKind::Http => { - // Sync HTTP path lands here when commit 6 wires up - // the orchestrator -> outbox bridge. For now, this - // arm is a forward-compat stub — drop the row to - // avoid a permanent stuck state. 
- tracing::debug!(outbox_id = %row.id, "HTTP outbox row encountered; commit 6 wires this in"); - self.outbox - .delete(row.id) - .await - .map_err(|e| DispatcherError::Outbox(e.to_string()))?; - drop(permit); - return Ok(()); - } + // Resolve the trigger config (KV / DL) or pull the HTTP + // payload directly off the outbox row. + let (resolved, exec_req) = match row.source_kind { + OutboxSourceKind::Http => match self.build_http_request(&row).await { + Ok(pair) => pair, + Err(err) => { + tracing::warn!(outbox_id = %row.id, ?err, "http exec build failed; dropping"); + self.outbox + .delete(row.id) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + drop(permit); + return Ok(()); + } + }, OutboxSourceKind::Kv | OutboxSourceKind::DeadLetter => { - self.resolve_trigger(&row).await? - } - }; - - let exec_req = match self.build_exec_request(&row, &resolved).await { - Ok(req) => req, - Err(err) => { - tracing::warn!(outbox_id = %row.id, ?err, "exec request build failed; dropping row"); - self.outbox - .delete(row.id) - .await - .map_err(|e| DispatcherError::Outbox(e.to_string()))?; - drop(permit); - return Ok(()); + let resolved = self.resolve_trigger(&row).await?; + let req = match self.build_exec_request(&row, &resolved).await { + Ok(req) => req, + Err(err) => { + tracing::warn!(outbox_id = %row.id, ?err, "exec request build failed; dropping row"); + self.outbox + .delete(row.id) + .await + .map_err(|e| DispatcherError::Outbox(e.to_string()))?; + drop(permit); + return Ok(()); + } + }; + (resolved, req) } }; @@ -275,6 +275,81 @@ impl Dispatcher { }) } + /// Build an `(ResolvedTrigger, ExecRequest)` for an HTTP outbox + /// row. HTTP rows don't have a backing `triggers` row (the + /// `trigger_id` references `routes.id` instead). 
We pull the + /// script id off the outbox row, the request shape off the + /// payload, and synthesize a `ResolvedTrigger` with retry + /// settings irrelevant for HTTP (sync HTTP is never retried; + /// async HTTP uses default policy from `TriggerConfig`). + async fn build_http_request( + &self, + row: &OutboxRow, + ) -> Result<(ResolvedTrigger, ExecRequest), DispatcherError> { + let Some(script_id) = row.script_id else { + return Err(DispatcherError::ResolveTrigger( + "HTTP outbox row missing script_id".into(), + )); + }; + let script = self + .scripts + .get(script_id) + .await + .map_err(|e| DispatcherError::ResolveTrigger(e.to_string()))? + .ok_or_else(|| { + DispatcherError::ResolveTrigger(format!("script {script_id} not found")) + })?; + + let payload: HttpDispatchPayload = serde_json::from_value(row.payload.clone()) + .map_err(|e| DispatcherError::ResolveTrigger(format!("decode http payload: {e}")))?; + + let execution_id = ExecutionId::new(); + let req = ExecRequest { + execution_id, + request_id: RequestId::new(), + script_id, + script_name: payload.script_name.clone(), + invocation_type: InvocationType::Http, + path: payload.path.clone(), + headers: payload.headers, + body: payload.body, + params: payload.params, + query: payload.query, + rest: payload.rest, + sandbox_overrides: script.sandbox, + app_id: row.app_id, + // HTTP outbox rows don't run as the trigger registrant — + // they run with no principal (public ingress) or the + // attached one (origin_principal forensic field is not + // promoted to execution principal in this MVP). 
+ principal: None, + trigger_depth: row.trigger_depth, + root_execution_id: row.root_execution_id.unwrap_or(execution_id), + is_dead_letter_handler: false, + event: None, + }; + + let resolved = ResolvedTrigger { + trigger_kind: TriggerKind::Kv, // placeholder; HTTP doesn't have a kind + is_dead_letter_handler: false, + script_id, + script_source: script.source, + script_name: payload.script_name, + sandbox_overrides: script.sandbox, + // HTTP outbox rows don't carry a registered_by_principal + // — use a sentinel zero UUID since this field isn't used + // downstream for HTTP (no retries, no inbox principal). + registered_by_principal: picloud_shared::AdminUserId::from(uuid::Uuid::nil()), + // Async HTTP uses the platform default retry policy from + // TriggerConfig. Sync HTTP (reply_to.is_some) never retries + // regardless. + retry_max_attempts: self.config.retry_max_attempts, + retry_backoff: self.config.retry_backoff, + retry_base_ms: self.config.retry_base_ms, + }; + Ok((resolved, req)) + } + async fn handle_success( &self, row: &OutboxRow, diff --git a/crates/manager-core/src/outbox_repo.rs b/crates/manager-core/src/outbox_repo.rs index 42d4812..926aba6 100644 --- a/crates/manager-core/src/outbox_repo.rs +++ b/crates/manager-core/src/outbox_repo.rs @@ -5,7 +5,10 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use picloud_shared::{AdminUserId, AppId, ExecutionId, ScriptId, TriggerId}; +use picloud_shared::{ + AdminUserId, AppId, ExecutionId, NewHttpOutbox, OutboxWriter, OutboxWriterError, ScriptId, + TriggerId, +}; use sqlx::PgPool; use uuid::Uuid; @@ -195,6 +198,28 @@ impl OutboxRepo for PostgresOutboxRepo { } } +/// `OutboxWriter` implementation so orchestrator-core (which can't +/// depend on manager-core) can enqueue HTTP outbox rows through the +/// shared trait. 
+#[async_trait] +impl OutboxWriter for PostgresOutboxRepo { + async fn enqueue_http(&self, row: NewHttpOutbox) -> Result { + self.insert(NewOutboxRow { + app_id: row.app_id, + source_kind: OutboxSourceKind::Http, + trigger_id: Some(TriggerId::from(row.route_id)), + script_id: Some(row.script_id), + reply_to: row.reply_to, + payload: row.payload, + origin_principal: row.origin_principal, + trigger_depth: row.trigger_depth, + root_execution_id: row.root_execution_id, + }) + .await + .map_err(|e| OutboxWriterError::Backend(e.to_string())) + } +} + #[derive(sqlx::FromRow)] struct OutboxRowRaw { id: Uuid, diff --git a/crates/manager-core/src/route_admin.rs b/crates/manager-core/src/route_admin.rs index 240ab67..b6b1a5b 100644 --- a/crates/manager-core/src/route_admin.rs +++ b/crates/manager-core/src/route_admin.rs @@ -77,6 +77,12 @@ pub struct CreateRouteRequest { pub path_kind: PathKind, pub path: String, pub method: Option, + /// Per-route dispatch mode (v1.1.1). Defaults to `Sync` when + /// omitted so older clients aren't broken. `Async` routes return + /// `202 Accepted` immediately and run the script in the + /// background via the dispatcher. + #[serde(default)] + pub dispatch_mode: picloud_shared::DispatchMode, } #[derive(Debug, Deserialize)] @@ -211,6 +217,7 @@ async fn create_route( path_kind: input.path_kind, path: normalized_path, method: input.method, + dispatch_mode: input.dispatch_mode, }) .await?; refresh_table(&state).await?; @@ -370,6 +377,7 @@ pub fn compile_routes(rows: &[Route]) -> Result, pattern::Par host: pattern::parse_host(r.host_kind, &r.host, r.host_param_name.as_deref())?, path: pattern::parse_path(r.path_kind, &r.path)?, method: r.method.clone(), + dispatch_mode: r.dispatch_mode, }) }) .collect() diff --git a/crates/manager-core/src/route_repo.rs b/crates/manager-core/src/route_repo.rs index b914478..115d35d 100644 --- a/crates/manager-core/src/route_repo.rs +++ b/crates/manager-core/src/route_repo.rs @@ -4,7 +4,7 @@ //! 
after every write — see the route_admin module for the binding. use async_trait::async_trait; -use picloud_shared::{AppId, HostKind, PathKind, Route, ScriptId}; +use picloud_shared::{AppId, DispatchMode, HostKind, PathKind, Route, ScriptId}; use sqlx::PgPool; use uuid::Uuid; @@ -20,6 +20,7 @@ pub struct NewRoute { pub path_kind: PathKind, pub path: String, pub method: Option, + pub dispatch_mode: DispatchMode, } #[async_trait] @@ -62,7 +63,7 @@ impl RouteRepository for PostgresRouteRepository { async fn list_all(&self) -> Result, ScriptRepositoryError> { let rows = sqlx::query_as::<_, RouteRow>( "SELECT id, app_id, script_id, host_kind, host, host_param_name, \ - path_kind, path, method, created_at \ + path_kind, path, method, dispatch_mode, created_at \ FROM routes ORDER BY created_at", ) .fetch_all(&self.pool) @@ -73,7 +74,7 @@ impl RouteRepository for PostgresRouteRepository { async fn get(&self, route_id: Uuid) -> Result, ScriptRepositoryError> { let row = sqlx::query_as::<_, RouteRow>( "SELECT id, app_id, script_id, host_kind, host, host_param_name, \ - path_kind, path, method, created_at \ + path_kind, path, method, dispatch_mode, created_at \ FROM routes WHERE id = $1", ) .bind(route_id) @@ -85,7 +86,7 @@ impl RouteRepository for PostgresRouteRepository { async fn list_for_app(&self, app_id: AppId) -> Result, ScriptRepositoryError> { let rows = sqlx::query_as::<_, RouteRow>( "SELECT id, app_id, script_id, host_kind, host, host_param_name, \ - path_kind, path, method, created_at \ + path_kind, path, method, dispatch_mode, created_at \ FROM routes WHERE app_id = $1 ORDER BY created_at", ) .bind(app_id.into_inner()) @@ -100,7 +101,7 @@ impl RouteRepository for PostgresRouteRepository { ) -> Result, ScriptRepositoryError> { let rows = sqlx::query_as::<_, RouteRow>( "SELECT id, app_id, script_id, host_kind, host, host_param_name, \ - path_kind, path, method, created_at \ + path_kind, path, method, dispatch_mode, created_at \ FROM routes WHERE script_id = $1 ORDER 
BY created_at", ) .bind(script_id.into_inner()) @@ -113,10 +114,10 @@ impl RouteRepository for PostgresRouteRepository { let res = sqlx::query_as::<_, RouteRow>( "INSERT INTO routes ( \ app_id, script_id, host_kind, host, host_param_name, \ - path_kind, path, method \ - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ + path_kind, path, method, dispatch_mode \ + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \ RETURNING id, app_id, script_id, host_kind, host, host_param_name, \ - path_kind, path, method, created_at", + path_kind, path, method, dispatch_mode, created_at", ) .bind(input.app_id.into_inner()) .bind(input.script_id.into_inner()) @@ -126,6 +127,7 @@ impl RouteRepository for PostgresRouteRepository { .bind(path_kind_str(input.path_kind)) .bind(&input.path) .bind(input.method.as_deref()) + .bind(input.dispatch_mode.as_str()) .fetch_one(&self.pool) .await; @@ -198,6 +200,7 @@ struct RouteRow { path_kind: String, path: String, method: Option, + dispatch_mode: String, created_at: chrono::DateTime, } @@ -221,6 +224,7 @@ impl From for Route { }, path: r.path, method: r.method, + dispatch_mode: DispatchMode::from_wire(&r.dispatch_mode).unwrap_or(DispatchMode::Sync), created_at: r.created_at, } } diff --git a/crates/orchestrator-core/src/api.rs b/crates/orchestrator-core/src/api.rs index 8c2210f..c78f49d 100644 --- a/crates/orchestrator-core/src/api.rs +++ b/crates/orchestrator-core/src/api.rs @@ -17,13 +17,15 @@ use axum::{ use chrono::Utc; use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType}; use picloud_shared::{ - AppId, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, Principal, RequestId, - ScriptId, + AppId, DispatchMode, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, + HttpDispatchPayload, InboxFailureKind, InboxResult, NewHttpOutbox, OutboxWriter, Principal, + RequestId, ScriptId, }; use serde_json::Value as Json_; use uuid::Uuid; use crate::client::ExecutorClient; +use crate::inbox::InboxRegistry; use 
crate::resolver::{ResolverError, ScriptResolver}; use crate::routing::{AppDomainTable, RouteTable}; @@ -39,6 +41,14 @@ pub struct DataPlaneState { /// Routing table for user-defined paths, partitioned per app. /// Shared with the manager (admin router writes; this side reads). pub routes: Arc, + /// NATS-style inbox registry (v1.1.1). Used by sync HTTP via + /// outbox to await the dispatcher's delivery on a oneshot + /// channel. + pub inbox: Arc, + /// Writer for the universal trigger outbox (v1.1.1). The sync + /// HTTP path inserts a row with `reply_to = inbox_id`; the async + /// path inserts with `reply_to = None` and returns 202. + pub outbox: Arc, } impl Clone for DataPlaneState { @@ -49,6 +59,8 @@ impl Clone for DataPlaneState { log_sink: self.log_sink.clone(), app_domains: self.app_domains.clone(), routes: self.routes.clone(), + inbox: self.inbox.clone(), + outbox: self.outbox.clone(), } } } @@ -202,50 +214,312 @@ where Err(e) => return Err(ApiError::BadRequest(format!("body read failed: {e}"))), }; - let mut req = build_exec_request( - matched.matched.script_id, - &script.name, - &headers, - &body_bytes, - app_id, - principal, - )?; - req.path = path; - req.params = matched.params; - req.query = parse_query_string(&query_str); - req.rest = matched.rest.unwrap_or_default(); - req.sandbox_overrides = script.sandbox; + let body_json: Json_ = if body_bytes.is_empty() { + Json_::Null + } else { + serde_json::from_slice(&body_bytes) + .map_err(|e| ApiError::BadRequest(format!("invalid JSON body: {e}")))? 
+ }; + let header_map: BTreeMap = headers + .iter() + .filter_map(|(k, v)| { + v.to_str() + .ok() + .map(|s| (k.as_str().to_string(), s.to_string())) + }) + .collect(); + let query = parse_query_string(&query_str); + let rest = matched.rest.clone().unwrap_or_default(); - let request_id = req.request_id; - let request_path = req.path.clone(); - let request_headers = req.headers.clone(); - let request_body = req.body.clone(); + match matched.matched.dispatch_mode { + DispatchMode::Async => { + handle_async_route( + &state, + app_id, + matched.matched.route_id, + matched.matched.script_id, + &script.name, + path, + method, + header_map, + body_json, + matched.params, + query, + rest, + script.timeout_seconds, + principal, + ) + .await + } + DispatchMode::Sync => { + handle_sync_route( + &state, + app_id, + matched.matched.route_id, + matched.matched.script_id, + &script.name, + path, + method, + header_map, + body_json, + matched.params, + query, + rest, + script.timeout_seconds, + principal, + ) + .await + } + } +} - let timeout = Duration::from_secs(u64::from(script.timeout_seconds)); +#[allow(clippy::too_many_arguments)] +async fn handle_async_route( + state: &DataPlaneState, + app_id: AppId, + route_id: Uuid, + script_id: ScriptId, + script_name: &str, + path: String, + method: String, + headers: BTreeMap, + body: Json_, + params: BTreeMap, + query: BTreeMap, + rest: String, + timeout_seconds: u32, + principal: Option, +) -> Result +where + E: ExecutorClient + 'static, + R: ScriptResolver + 'static, +{ + let payload = HttpDispatchPayload { + script_name: script_name.to_string(), + path, + method, + headers, + body, + params, + query, + rest, + timeout_seconds, + }; + let payload_value = serde_json::to_value(&payload) + .map_err(|e| ApiError::BadRequest(format!("payload serialize: {e}")))?; + let execution_id = ExecutionId::new(); + state + .outbox + .enqueue_http(NewHttpOutbox { + app_id, + route_id, + script_id, + reply_to: None, + payload: payload_value, + 
origin_principal: principal.map(|p| p.user_id), + trigger_depth: 0, + root_execution_id: Some(execution_id), + }) + .await + .map_err(|e| ApiError::OutboxWrite(e.to_string()))?; + Ok(( + StatusCode::ACCEPTED, + Json(serde_json::json!({ + "accepted_at": Utc::now().to_rfc3339(), + "execution_id": execution_id.to_string(), + })), + ) + .into_response()) +} + +#[allow(clippy::too_many_arguments)] +async fn handle_sync_route( + state: &DataPlaneState, + app_id: AppId, + route_id: Uuid, + script_id: ScriptId, + script_name: &str, + path: String, + method: String, + headers: BTreeMap, + body: Json_, + params: BTreeMap, + query: BTreeMap, + rest: String, + timeout_seconds: u32, + principal: Option, +) -> Result +where + E: ExecutorClient + 'static, + R: ScriptResolver + 'static, +{ + let payload = HttpDispatchPayload { + script_name: script_name.to_string(), + path: path.clone(), + method, + headers: headers.clone(), + body: body.clone(), + params, + query, + rest, + timeout_seconds, + }; + let payload_value = serde_json::to_value(&payload) + .map_err(|e| ApiError::BadRequest(format!("payload serialize: {e}")))?; + + // Register the inbox before writing the outbox row so the + // dispatcher can't race-deliver before the orchestrator is + // listening. + let (inbox_id, rx) = state.inbox.register(); + + let execution_id = ExecutionId::new(); + let outbox_id = state + .outbox + .enqueue_http(NewHttpOutbox { + app_id, + route_id, + script_id, + reply_to: Some(inbox_id), + payload: payload_value, + origin_principal: principal.map(|p| p.user_id), + trigger_depth: 0, + root_execution_id: Some(execution_id), + }) + .await + .map_err(|e| { + // Failed outbox write — abandon the inbox so the dispatcher + // can never deliver to a stale entry. + state.inbox.cancel(inbox_id); + ApiError::OutboxWrite(e.to_string()) + })?; + + // Wait for the dispatcher's delivery. Outer timeout = script + // wall-clock + a small buffer to cover dispatcher latency. 
+ let wait_budget = Duration::from_secs(u64::from(timeout_seconds)) + Duration::from_secs(2); + let request_id = RequestId::new(); let started = Utc::now(); - let outcome = state.executor.execute(&script.source, req, timeout).await; + let result = tokio::time::timeout(wait_budget, rx).await; let finished = Utc::now(); - let log = build_execution_log( - script.app_id, - matched.matched.script_id, + // Tear down the receiver if it's still alive. `inbox.cancel` is a + // no-op when the dispatcher already delivered. + let _ = state.inbox.cancel(inbox_id); + + let response = match result { + Ok(Ok(InboxResult::Success(summary))) => http_response_from_summary(summary), + Ok(Ok(InboxResult::Failure { kind, message })) => failure_to_response(kind, &message), + Ok(Err(_recv)) => { + // Channel was closed without a value — dispatcher dropped + // the sender. Treat as platform failure. + tracing::warn!( + outbox_id = %outbox_id, + "inbox channel closed without delivery" + ); + failure_to_response( + InboxFailureKind::Platform, + "dispatcher closed inbox without delivery", + ) + } + Err(_elapsed) => { + // Outer timeout — either the script was too slow or the + // dispatcher is wedged. Returns 504 by default. 
+ failure_to_response(InboxFailureKind::Timeout, "request timed out") + } + }; + + let log = build_inbox_execution_log( + app_id, + script_id, request_id, - request_path, - request_headers, - request_body, - &outcome, + path, + headers, + body, + response.status().as_u16(), started, finished, ); if let Err(e) = state.log_sink.record(log).await { tracing::warn!( error = %e, - script_id = %matched.matched.script_id, + %script_id, "failed to persist execution log" ); } - Ok(exec_response_to_http(outcome?)) + Ok(response) +} + +fn http_response_from_summary(summary: picloud_shared::ExecResponseSummary) -> Response { + let status = + StatusCode::from_u16(summary.status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + let mut http_headers = HeaderMap::new(); + for (k, v) in summary.headers { + if let (Ok(name), Ok(value)) = (k.parse::(), v.parse::()) { + http_headers.insert(name, value); + } + } + http_headers + .entry(axum::http::header::CONTENT_TYPE) + .or_insert_with(|| HeaderValue::from_static("application/json")); + (status, http_headers, Json(summary.body)).into_response() +} + +/// Map `InboxFailureKind` onto the design-notes §3 status-code table. 
+fn failure_to_response(kind: InboxFailureKind, message: &str) -> Response { + let status = match kind { + InboxFailureKind::Validation => StatusCode::UNPROCESSABLE_ENTITY, + InboxFailureKind::Runtime => StatusCode::BAD_GATEWAY, + InboxFailureKind::Overloaded => StatusCode::SERVICE_UNAVAILABLE, + InboxFailureKind::Timeout => StatusCode::GATEWAY_TIMEOUT, + InboxFailureKind::OperationBudget => StatusCode::INSUFFICIENT_STORAGE, + InboxFailureKind::Platform => StatusCode::INTERNAL_SERVER_ERROR, + }; + let body = Json(serde_json::json!({ "error": message })); + if matches!(kind, InboxFailureKind::Overloaded) { + return (status, [(axum::http::header::RETRY_AFTER, "1")], body).into_response(); + } + (status, body).into_response() +} + +#[allow(clippy::too_many_arguments)] +fn build_inbox_execution_log( + app_id: AppId, + script_id: ScriptId, + request_id: RequestId, + request_path: String, + request_headers: BTreeMap, + request_body: Json_, + response_code: u16, + started: chrono::DateTime, + finished: chrono::DateTime, +) -> ExecutionLog { + let duration_ms = u64::try_from( + finished + .signed_duration_since(started) + .num_milliseconds() + .max(0), + ) + .unwrap_or(0); + let status = if (200..400).contains(&response_code) { + ExecutionStatus::Success + } else { + ExecutionStatus::Error + }; + ExecutionLog { + id: Uuid::new_v4(), + app_id, + script_id, + request_id, + request_path, + request_headers, + request_body, + response_code: Some(response_code), + response_body: None, + script_logs: Json_::Array(vec![]), + duration_ms, + status, + created_at: started, + } } fn parse_query_string(s: &str) -> BTreeMap { @@ -421,6 +695,9 @@ pub enum ApiError { #[error("execution error: {0}")] Exec(#[from] ExecError), + + #[error("outbox write failed: {0}")] + OutboxWrite(String), } impl IntoResponse for ApiError { @@ -444,6 +721,13 @@ impl IntoResponse for ApiError { let (status, message) = match &self { E::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()), E::BadRequest(_) 
=> (StatusCode::BAD_REQUEST, self.to_string()), + E::OutboxWrite(e) => { + tracing::error!(error = %e, "outbox write failed"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + "internal error".to_string(), + ) + } E::Resolver(e) => { tracing::error!(error = %e, "resolver failure"); ( diff --git a/crates/orchestrator-core/src/inbox.rs b/crates/orchestrator-core/src/inbox.rs new file mode 100644 index 0000000..15a7ef8 --- /dev/null +++ b/crates/orchestrator-core/src/inbox.rs @@ -0,0 +1,139 @@ +//! In-process `InboxRegistry` — the NATS-style request/reply +//! implementation for sync HTTP via the trigger outbox (design notes +//! §3). +//! +//! Workflow: +//! 1. Orchestrator allocates an `inbox_id`, calls +//! `registry.register()` to get a oneshot receiver. +//! 2. Orchestrator writes an outbox row with `reply_to = inbox_id`. +//! 3. Dispatcher picks the row, runs the script, calls +//! `registry.deliver(inbox_id, result)`. +//! 4. Orchestrator's `.await` on the receiver fires; it maps the +//! `InboxResult` back into an HTTP response. +//! +//! `Delivered` means the receiver was alive when delivery hit. If the +//! orchestrator timed out and dropped the receiver before delivery, +//! `Abandoned` comes back — the dispatcher writes an +//! `abandoned_executions` row (design notes §3 #9). +//! +//! Cluster mode (v1.3+) swaps this for a Postgres `LISTEN/NOTIFY`- +//! based resolver; the `InboxResolver` trait stays the same. + +use std::collections::HashMap; +use std::sync::Mutex; + +use async_trait::async_trait; +use picloud_shared::{InboxDeliveryOutcome, InboxResolver, InboxResult}; +use tokio::sync::oneshot; +use uuid::Uuid; + +pub struct InboxRegistry { + inner: Mutex>>, +} + +impl InboxRegistry { + #[must_use] + pub fn new() -> Self { + Self { + inner: Mutex::new(HashMap::new()), + } + } + + /// Allocate a new inbox id and register the sender side. The + /// caller awaits the returned `Receiver`; the dispatcher delivers + /// the outcome via `deliver(id, …)`. 
+ #[must_use] + pub fn register(&self) -> (Uuid, oneshot::Receiver) { + let id = Uuid::new_v4(); + let (tx, rx) = oneshot::channel(); + if let Ok(mut g) = self.inner.lock() { + g.insert(id, tx); + } + (id, rx) + } + + /// Cancel a pending inbox (orchestrator timed out and gave up). + /// Drops the sender so any future `deliver` returns `Abandoned`. + /// Returns `true` if the receiver was still registered. + pub fn cancel(&self, id: Uuid) -> bool { + self.inner + .lock() + .map(|mut g| g.remove(&id).is_some()) + .unwrap_or(false) + } +} + +impl Default for InboxRegistry { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl InboxResolver for InboxRegistry { + async fn deliver(&self, inbox_id: Uuid, result: InboxResult) -> InboxDeliveryOutcome { + let Ok(mut g) = self.inner.lock() else { + return InboxDeliveryOutcome::Abandoned; + }; + let Some(tx) = g.remove(&inbox_id) else { + return InboxDeliveryOutcome::Abandoned; + }; + // `send` returns Err iff the receiver was dropped — exactly + // the abandoned-execution case. 
+ if tx.send(result).is_err() { + InboxDeliveryOutcome::Abandoned + } else { + InboxDeliveryOutcome::Delivered + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use picloud_shared::ExecResponseSummary; + use std::collections::BTreeMap; + + fn ok_result() -> InboxResult { + InboxResult::Success(ExecResponseSummary { + status_code: 200, + headers: BTreeMap::new(), + body: serde_json::json!({ "ok": true }), + }) + } + + #[tokio::test] + async fn register_then_deliver_resolves_receiver() { + let reg = InboxRegistry::new(); + let (id, rx) = reg.register(); + let outcome = reg.deliver(id, ok_result()).await; + assert_eq!(outcome, InboxDeliveryOutcome::Delivered); + let received = rx.await.expect("receiver should fire"); + assert!(matches!(received, InboxResult::Success(_))); + } + + #[tokio::test] + async fn deliver_to_unknown_id_is_abandoned() { + let reg = InboxRegistry::new(); + let outcome = reg.deliver(Uuid::new_v4(), ok_result()).await; + assert_eq!(outcome, InboxDeliveryOutcome::Abandoned); + } + + #[tokio::test] + async fn dropping_receiver_then_delivering_is_abandoned() { + let reg = InboxRegistry::new(); + let (id, rx) = reg.register(); + drop(rx); + let outcome = reg.deliver(id, ok_result()).await; + assert_eq!(outcome, InboxDeliveryOutcome::Abandoned); + } + + #[tokio::test] + async fn cancel_removes_sender() { + let reg = InboxRegistry::new(); + let (id, _rx) = reg.register(); + assert!(reg.cancel(id)); + let outcome = reg.deliver(id, ok_result()).await; + assert_eq!(outcome, InboxDeliveryOutcome::Abandoned); + } +} diff --git a/crates/orchestrator-core/src/lib.rs b/crates/orchestrator-core/src/lib.rs index 11c0a34..1576e19 100644 --- a/crates/orchestrator-core/src/lib.rs +++ b/crates/orchestrator-core/src/lib.rs @@ -11,10 +11,12 @@ pub mod api; pub mod client; pub mod gate; +pub mod inbox; pub mod resolver; pub mod routing; pub use api::{data_plane_router, user_routes_router, DataPlaneState}; pub use client::{ExecutorClient, LocalExecutorClient, 
RemoteExecutorClient}; pub use gate::{AcquireError, ExecutionGate}; +pub use inbox::InboxRegistry; pub use resolver::{ResolverError, ScriptResolver}; diff --git a/crates/orchestrator-core/src/routing/matcher.rs b/crates/orchestrator-core/src/routing/matcher.rs index 22409f2..15d7fb1 100644 --- a/crates/orchestrator-core/src/routing/matcher.rs +++ b/crates/orchestrator-core/src/routing/matcher.rs @@ -38,6 +38,11 @@ pub struct MatchResult { pub struct Matched { pub route_id: uuid::Uuid, pub script_id: picloud_shared::ScriptId, + /// Per-route dispatch mode (v1.1.1). Forwarded to the + /// orchestrator's HTTP handler so it can pick the sync or async + /// path. Defaults to `Sync` for older routes that predate the + /// column. + pub dispatch_mode: picloud_shared::DispatchMode, } /// A single route ready for matching. `app_id` is carried so the @@ -51,6 +56,7 @@ pub struct CompiledRoute { pub host: HostPattern, pub path: PathPattern, pub method: Option, + pub dispatch_mode: picloud_shared::DispatchMode, } /// Find the best matching route for the request. 
Returns `None` if no @@ -180,6 +186,7 @@ fn match_within_bucket( matched: Matched { route_id: route.route_id, script_id: route.script_id, + dispatch_mode: route.dispatch_mode, }, params: BTreeMap::new(), rest: None, @@ -230,6 +237,7 @@ fn match_within_bucket( matched: Matched { route_id: route.route_id, script_id: route.script_id, + dispatch_mode: route.dispatch_mode, }, params, rest, @@ -312,6 +320,7 @@ mod tests { host, path: parse_path(path_kind, raw).unwrap(), method: None, + dispatch_mode: picloud_shared::DispatchMode::Sync, } } diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs index 8122513..e162ce5 100644 --- a/crates/picloud/src/lib.rs +++ b/crates/picloud/src/lib.rs @@ -26,10 +26,11 @@ use picloud_manager_core::{ }; use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable}; use picloud_orchestrator_core::{ - data_plane_router, user_routes_router, DataPlaneState, ExecutionGate, LocalExecutorClient, + data_plane_router, user_routes_router, DataPlaneState, ExecutionGate, InboxRegistry, + LocalExecutorClient, }; use picloud_shared::{ - ExecutionLogSink, InboxResolver, KvService, NoopDeadLetterService, NoopInboxResolver, + ExecutionLogSink, InboxResolver, KvService, NoopDeadLetterService, OutboxWriter, ScriptValidator, ServiceEventEmitter, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION, }; @@ -106,7 +107,14 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { // Triggers framework storage. The outbox event emitter routes // KV mutations into the outbox; the dispatcher fans them out. let trigger_repo: Arc = Arc::new(PostgresTriggerRepo::new(pool.clone())); - let outbox_repo: Arc = Arc::new(PostgresOutboxRepo::new(pool.clone())); + // PostgresOutboxRepo implements both `OutboxRepo` (the dispatcher + // surface) and `OutboxWriter` (the orchestrator surface). 
Construct + // the concrete Arc once, clone it into each trait view — same + // allocation, two vtables (mirrors how `members_concrete` above is + // used as both `AppMembersRepository` and `AuthzRepo`). + let outbox_concrete = Arc::new(PostgresOutboxRepo::new(pool.clone())); + let outbox_repo: Arc = outbox_concrete.clone(); + let outbox_writer: Arc = outbox_concrete; let dl_repo: Arc = Arc::new(PostgresDeadLetterRepo::new(pool.clone())); let abandoned_repo: Arc = Arc::new(PostgresAbandonedRepo::new(pool.clone())); let trigger_config = TriggerConfig::from_env(); @@ -159,13 +167,16 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { // Dispatcher — single tokio task that polls the outbox and routes // due rows to the executor. Shares the `ExecutionGate` with sync - // HTTP per design notes §2 (one cap for everything). NoopInboxResolver - // until commit 6 wires the real in-process inbox registry. + // HTTP per design notes §2 (one cap for everything). let dispatcher_script_repo: Arc = Arc::new(PostgresScriptRepoHandle(script_repo.clone())); let principals: Arc = Arc::new(AdminPrincipalResolver::new(auth.users.clone())); - let inbox: Arc = Arc::new(NoopInboxResolver); + // The InboxRegistry is constructed once and shared between the + // orchestrator (registers receivers, awaits) and the dispatcher + // (delivers results). Two Arc views on the same allocation. 
+ let inbox_registry = Arc::new(InboxRegistry::new()); + let inbox_resolver: Arc = inbox_registry.clone(); Dispatcher { outbox: outbox_repo.clone(), triggers: trigger_repo.clone(), @@ -175,7 +186,7 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { principals, executor: executor.clone(), gate, - inbox, + inbox: inbox_resolver, config: trigger_config, instance_id: format!("picloud-{}", std::process::id()), } @@ -202,6 +213,8 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { log_sink, app_domains: app_domain_table.clone(), routes: route_table, + inbox: inbox_registry, + outbox: outbox_writer, }; // Silence unused-import warnings for repos handed to the // dispatcher in this commit; commit 8 wires them into the diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs index edb24fa..e18fa80 100644 --- a/crates/shared/src/lib.rs +++ b/crates/shared/src/lib.rs @@ -15,6 +15,7 @@ pub mod ids; pub mod inbox; pub mod kv; pub mod log_sink; +pub mod outbox_writer; pub mod route; pub mod sandbox; pub mod script; @@ -37,7 +38,8 @@ pub use inbox::{ }; pub use kv::{KvError, KvListPage, KvService, NoopKvService}; pub use log_sink::{ExecutionLogSink, LogSinkError}; -pub use route::{HostKind, PathKind, Route}; +pub use outbox_writer::{HttpDispatchPayload, NewHttpOutbox, OutboxWriter, OutboxWriterError}; +pub use route::{DispatchMode, HostKind, PathKind, Route}; pub use sandbox::ScriptSandbox; pub use script::Script; pub use sdk_cx::SdkCallCx; diff --git a/crates/shared/src/outbox_writer.rs b/crates/shared/src/outbox_writer.rs new file mode 100644 index 0000000..84ac7fc --- /dev/null +++ b/crates/shared/src/outbox_writer.rs @@ -0,0 +1,72 @@ +//! `OutboxWriter` — minimal trait the orchestrator-core sync-HTTP path +//! uses to enqueue rows into the universal trigger outbox. The +//! manager-core `PostgresOutboxRepo` implements this in addition to +//! its richer `OutboxRepo` surface; defining it here lets +//! 
orchestrator-core depend on the trait without pulling in
+//! manager-core (which would invert the dependency arrow).
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+use uuid::Uuid;
+
+use crate::{AdminUserId, AppId, ExecutionId, ScriptId};
+
+/// What the orchestrator hands to the outbox when it ingests an HTTP
+/// request. Carries enough for the dispatcher to reconstruct the
+/// `ExecRequest` end-to-end.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct NewHttpOutbox {
+    pub app_id: AppId,
+    /// `routes.id` of the matched route. Discriminated against
+    /// `triggers.id` by `source_kind = 'http'` on the outbox row.
+    pub route_id: Uuid,
+    /// Pre-resolved script so the dispatcher doesn't re-look it up.
+    pub script_id: ScriptId,
+    /// `Some(inbox_id)` for sync HTTP (the orchestrator awaits a
+    /// channel keyed on this id). `None` for `dispatch_mode = async`
+    /// — dispatcher fires-and-forgets, no reply path.
+    pub reply_to: Option<Uuid>,
+    /// Serialized `HttpDispatchPayload` (defined below) — everything
+    /// the dispatcher needs to reconstruct an `ExecRequest`.
+    pub payload: serde_json::Value,
+    /// The principal that ingressed the HTTP request (Some when
+    /// authenticated, None for public). Forensic only; the script
+    /// executes as the route's app principal model, not this.
+    pub origin_principal: Option<AdminUserId>,
+    /// `0` for direct HTTP ingress; the dispatcher will increment
+    /// for any further fan-out triggered by the script.
+    pub trigger_depth: u32,
+    pub root_execution_id: Option<ExecutionId>,
+}
+
+/// The shape the orchestrator serializes into `NewHttpOutbox.payload`
+/// (the JSONB column). Mirrored on the dispatcher side so it can
+/// rebuild an `ExecRequest`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct HttpDispatchPayload { // NOTE(review): map type arguments below assumed to be <String, String> — confirm
+    pub script_name: String,
+    pub path: String,
+    pub method: String,
+    pub headers: std::collections::BTreeMap<String, String>,
+    pub body: serde_json::Value,
+    pub params: std::collections::BTreeMap<String, String>,
+    pub query: std::collections::BTreeMap<String, String>,
+    pub rest: String,
+    pub timeout_seconds: u32,
+}
+
+#[async_trait]
+pub trait OutboxWriter: Send + Sync {
+    /// Insert a sync- or async-HTTP outbox row. Returns the row's id
+    /// — the orchestrator stores it locally for forensics and to
+    /// correlate `abandoned_executions` rows when the dispatcher's
+    /// inbox delivery fails. NOTE(review): `Uuid` id type assumed — confirm.
+    async fn enqueue_http(&self, row: NewHttpOutbox) -> Result<Uuid, OutboxWriterError>;
+}
+
+#[derive(Debug, Error)]
+pub enum OutboxWriterError {
+    #[error("outbox write failed: {0}")]
+    Backend(String),
+}
diff --git a/crates/shared/src/route.rs b/crates/shared/src/route.rs
index 477dd0d..ee0e751 100644
--- a/crates/shared/src/route.rs
+++ b/crates/shared/src/route.rs
@@ -37,6 +37,38 @@ pub enum PathKind {
     Param,
 }
 
+/// Per-route dispatch mode (v1.1.1). `Sync` = orchestrator awaits the
+/// executor and returns the response in the same HTTP request. `Async`
+/// = orchestrator writes the request to the trigger outbox, returns
+/// `202 Accepted` immediately, and the dispatcher runs the script in
+/// the background (with retries + dead-letter).
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
+#[serde(rename_all = "lowercase")]
+pub enum DispatchMode {
+    #[default]
+    Sync,
+    Async,
+}
+
+impl DispatchMode {
+    #[must_use]
+    pub const fn as_str(self) -> &'static str { // wire spelling: "sync" / "async"
+        match self {
+            Self::Sync => "sync",
+            Self::Async => "async",
+        }
+    }
+
+    #[must_use]
+    pub fn from_wire(s: &str) -> Option<Self> { // inverse of `as_str`; `None` for any other string
+        match s {
+            "sync" => Some(Self::Sync),
+            "async" => Some(Self::Async),
+            _ => None,
+        }
+    }
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Route {
     pub id: Uuid,
@@ -60,5 +92,12 @@ pub struct Route {
     /// `None` = any method.
     pub method: Option,
 
+    /// v1.1.1: per-route dispatch mode. `Sync` (default) → orchestrator
+    /// awaits the executor inline. `Async` → orchestrator writes to
+    /// the outbox + returns `202 Accepted`; dispatcher fires the
+    /// script in the background with retries.
+    #[serde(default)]
+    pub dispatch_mode: DispatchMode,
+
     pub created_at: DateTime,
 }