//! Data-plane HTTP surface. Mounted by the `picloud` all-in-one binary //! under `/api` (so the path becomes `/api/execute/:id`) and by the //! future split `picloud-orchestrator` binary at its own root. use std::collections::BTreeMap; use std::sync::Arc; use std::time::Duration; use axum::{ body::Bytes, extract::{Path, Request, State}, http::{HeaderMap, HeaderName, HeaderValue, StatusCode}, response::{IntoResponse, Response}, routing::post, Extension, Json, Router, }; use chrono::Utc; use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType}; use picloud_shared::{ AppId, DispatchMode, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, HttpDispatchPayload, InboxFailureKind, InboxResult, NewHttpOutbox, OutboxWriter, Principal, RequestId, ScriptId, }; use serde_json::Value as Json_; use uuid::Uuid; use crate::client::ExecutorClient; use crate::inbox::InboxRegistry; use crate::resolver::{ResolverError, ScriptResolver}; use crate::routing::{AppDomainTable, RouteTable}; /// State shared by data-plane handlers. pub struct DataPlaneState { pub executor: Arc, pub resolver: Arc, pub log_sink: Arc, /// Host → app_id resolver. Run before `routes` to filter to the /// owning app's slice. Shared with the manager (writes invalidate /// the cache by replacing the table). pub app_domains: Arc, /// Routing table for user-defined paths, partitioned per app. /// Shared with the manager (admin router writes; this side reads). pub routes: Arc, /// NATS-style inbox registry (v1.1.1). Used by sync HTTP via /// outbox to await the dispatcher's delivery on a oneshot /// channel. pub inbox: Arc, /// Writer for the universal trigger outbox (v1.1.1). The sync /// HTTP path inserts a row with `reply_to = inbox_id`; the async /// path inserts with `reply_to = None` and returns 202. pub outbox: Arc, } impl Clone for DataPlaneState { fn clone(&self) -> Self { Self { executor: self.executor.clone(), resolver: self.resolver.clone(), log_sink: self.log_sink.clone(), app_domains: self.app_domains.clone(), routes: self.routes.clone(), inbox: self.inbox.clone(), outbox: self.outbox.clone(), } } } /// Build the data-plane router. Handles `POST /execute/:id` — the /// always-available ID-based bypass. /// /// Handlers expect an `Extension>` to be attached by /// upstream middleware (`manager-core::attach_principal_if_present`); /// requests without that extension panic at extraction time. The /// picloud binary wires this in `build_app`. pub fn data_plane_router(state: DataPlaneState) -> Router where E: ExecutorClient + 'static, R: ScriptResolver + 'static, { Router::new() .route("/execute/{id}", post(execute_by_id::)) .with_state(state) } /// Build a router that handles ALL paths via the user-defined routing /// table. Intended to be merged into the picloud app router as a /// fallback (after the system routes are mounted). /// /// Same middleware expectation as `data_plane_router` — wrap with /// `attach_principal_if_present` so handlers can extract /// `Extension>`. pub fn user_routes_router(state: DataPlaneState) -> Router where E: ExecutorClient + 'static, R: ScriptResolver + 'static, { Router::new() .fallback(user_route_handler::) .with_state(state) } // ---------------------------------------------------------------------------- // Handlers // ---------------------------------------------------------------------------- async fn execute_by_id( State(state): State>, Path(id): Path, Extension(principal): Extension>, headers: HeaderMap, body: Bytes, ) -> Result where E: ExecutorClient + 'static, R: ScriptResolver + 'static, { let script = state .resolver .resolve(id) .await? .ok_or(ApiError::NotFound(id))?; let mut req = build_exec_request(id, &script.name, &headers, &body, script.app_id, principal)?; req.sandbox_overrides = script.sandbox; let request_id = req.request_id; let request_path = req.path.clone(); let request_headers = req.headers.clone(); let request_body = req.body.clone(); let timeout = Duration::from_secs(u64::from(script.timeout_seconds)); let started = Utc::now(); let outcome = state.executor.execute(&script.source, req, timeout).await; let finished = Utc::now(); // Build and dispatch the audit log regardless of outcome. We await // the sink — recording the trail is part of correctness for an // audit-visible platform — but a sink failure must not mask the // user-facing result, so we only log a warning if it fails. let log = build_execution_log( script.app_id, id, request_id, request_path, request_headers, request_body, &outcome, started, finished, ); if let Err(e) = state.log_sink.record(log).await { tracing::warn!(error = %e, script_id = %id, "failed to persist execution log"); } Ok(exec_response_to_http(outcome?)) } async fn user_route_handler( State(state): State>, Extension(principal): Extension>, request: Request, ) -> Result where E: ExecutorClient + 'static, R: ScriptResolver + 'static, { let method = request.method().as_str().to_string(); let uri = request.uri().clone(); let path = uri.path().to_string(); let query_str = uri.query().unwrap_or("").to_string(); let host = request .headers() .get("host") .and_then(|h| h.to_str().ok()) .unwrap_or("") .to_string(); let headers = request.headers().clone(); // Two-phase dispatch (blueprint §11.5): first resolve Host → app_id, // then run the existing matcher on that app's slice. No app claims // this host → flat 404; the path doesn't get the chance to fire. let Some(app_id) = state.app_domains.resolve_app(&host) else { return Ok(( StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": format!("no app claims host {host:?}") })), ) .into_response()); }; let Some(matched) = state .routes .match_request_for_app(app_id, &host, &method, &path) else { return Ok(( StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": format!("no route matches {method} {path}") })), ) .into_response()); }; let script = state .resolver .resolve(matched.matched.script_id) .await? .ok_or(ApiError::NotFound(matched.matched.script_id))?; // Drain the body now that we know we'll execute. 10 MiB cap matches // the conservative default response/request size in the blueprint. let body_bytes = match axum::body::to_bytes(request.into_body(), 10 * 1024 * 1024).await { Ok(b) => b, Err(e) => return Err(ApiError::BadRequest(format!("body read failed: {e}"))), }; let body_json: Json_ = if body_bytes.is_empty() { Json_::Null } else { serde_json::from_slice(&body_bytes) .map_err(|e| ApiError::BadRequest(format!("invalid JSON body: {e}")))? }; let header_map: BTreeMap = headers .iter() .filter_map(|(k, v)| { v.to_str() .ok() .map(|s| (k.as_str().to_string(), s.to_string())) }) .collect(); let query = parse_query_string(&query_str); let rest = matched.rest.clone().unwrap_or_default(); match matched.matched.dispatch_mode { DispatchMode::Async => { handle_async_route( &state, app_id, matched.matched.route_id, matched.matched.script_id, &script.name, path, method, header_map, body_json, matched.params, query, rest, script.timeout_seconds, principal, ) .await } DispatchMode::Sync => { handle_sync_route( &state, app_id, matched.matched.route_id, matched.matched.script_id, &script.name, path, method, header_map, body_json, matched.params, query, rest, script.timeout_seconds, principal, ) .await } } } #[allow(clippy::too_many_arguments)] async fn handle_async_route( state: &DataPlaneState, app_id: AppId, route_id: Uuid, script_id: ScriptId, script_name: &str, path: String, method: String, headers: BTreeMap, body: Json_, params: BTreeMap, query: BTreeMap, rest: String, timeout_seconds: u32, principal: Option, ) -> Result where E: ExecutorClient + 'static, R: ScriptResolver + 'static, { let payload = HttpDispatchPayload { script_name: script_name.to_string(), path, method, headers, body, params, query, rest, timeout_seconds, }; let payload_value = serde_json::to_value(&payload) .map_err(|e| ApiError::BadRequest(format!("payload serialize: {e}")))?; let execution_id = ExecutionId::new(); state .outbox .enqueue_http(NewHttpOutbox { app_id, route_id, script_id, reply_to: None, payload: payload_value, origin_principal: principal.map(|p| p.user_id), trigger_depth: 0, root_execution_id: Some(execution_id), }) .await .map_err(|e| ApiError::OutboxWrite(e.to_string()))?; Ok(( StatusCode::ACCEPTED, Json(serde_json::json!({ "accepted_at": Utc::now().to_rfc3339(), "execution_id": execution_id.to_string(), })), ) .into_response()) } #[allow(clippy::too_many_arguments)] async fn handle_sync_route( state: &DataPlaneState, app_id: AppId, route_id: Uuid, script_id: ScriptId, script_name: &str, path: String, method: String, headers: BTreeMap, body: Json_, params: BTreeMap, query: BTreeMap, rest: String, timeout_seconds: u32, principal: Option, ) -> Result where E: ExecutorClient + 'static, R: ScriptResolver + 'static, { let payload = HttpDispatchPayload { script_name: script_name.to_string(), path: path.clone(), method, headers: headers.clone(), body: body.clone(), params, query, rest, timeout_seconds, }; let payload_value = serde_json::to_value(&payload) .map_err(|e| ApiError::BadRequest(format!("payload serialize: {e}")))?; // Register the inbox before writing the outbox row so the // dispatcher can't race-deliver before the orchestrator is // listening. let (inbox_id, rx) = state.inbox.register(); let execution_id = ExecutionId::new(); let outbox_id = state .outbox .enqueue_http(NewHttpOutbox { app_id, route_id, script_id, reply_to: Some(inbox_id), payload: payload_value, origin_principal: principal.map(|p| p.user_id), trigger_depth: 0, root_execution_id: Some(execution_id), }) .await .map_err(|e| { // Failed outbox write — abandon the inbox so the dispatcher // can never deliver to a stale entry. state.inbox.cancel(inbox_id); ApiError::OutboxWrite(e.to_string()) })?; // Wait for the dispatcher's delivery. Outer timeout = script // wall-clock + a small buffer to cover dispatcher latency. let wait_budget = Duration::from_secs(u64::from(timeout_seconds)) + Duration::from_secs(2); let request_id = RequestId::new(); let started = Utc::now(); let result = tokio::time::timeout(wait_budget, rx).await; let finished = Utc::now(); // Tear down the receiver if it's still alive. `inbox.cancel` is a // no-op when the dispatcher already delivered. let _ = state.inbox.cancel(inbox_id); let response = match result { Ok(Ok(InboxResult::Success(summary))) => http_response_from_summary(summary), Ok(Ok(InboxResult::Failure { kind, message })) => failure_to_response(kind, &message), Ok(Err(_recv)) => { // Channel was closed without a value — dispatcher dropped // the sender. Treat as platform failure. tracing::warn!( outbox_id = %outbox_id, "inbox channel closed without delivery" ); failure_to_response( InboxFailureKind::Platform, "dispatcher closed inbox without delivery", ) } Err(_elapsed) => { // Outer timeout — either the script was too slow or the // dispatcher is wedged. Returns 504 by default. failure_to_response(InboxFailureKind::Timeout, "request timed out") } }; let log = build_inbox_execution_log( app_id, script_id, request_id, path, headers, body, response.status().as_u16(), started, finished, ); if let Err(e) = state.log_sink.record(log).await { tracing::warn!( error = %e, %script_id, "failed to persist execution log" ); } Ok(response) } fn http_response_from_summary(summary: picloud_shared::ExecResponseSummary) -> Response { let status = StatusCode::from_u16(summary.status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); let mut http_headers = HeaderMap::new(); for (k, v) in summary.headers { if let (Ok(name), Ok(value)) = (k.parse::(), v.parse::()) { http_headers.insert(name, value); } } http_headers .entry(axum::http::header::CONTENT_TYPE) .or_insert_with(|| HeaderValue::from_static("application/json")); (status, http_headers, Json(summary.body)).into_response() } /// Map `InboxFailureKind` onto the design-notes §3 status-code table. fn failure_to_response(kind: InboxFailureKind, message: &str) -> Response { let status = match kind { InboxFailureKind::Validation => StatusCode::UNPROCESSABLE_ENTITY, InboxFailureKind::Runtime => StatusCode::BAD_GATEWAY, InboxFailureKind::Overloaded => StatusCode::SERVICE_UNAVAILABLE, InboxFailureKind::Timeout => StatusCode::GATEWAY_TIMEOUT, InboxFailureKind::OperationBudget => StatusCode::INSUFFICIENT_STORAGE, InboxFailureKind::Platform => StatusCode::INTERNAL_SERVER_ERROR, }; let body = Json(serde_json::json!({ "error": message })); if matches!(kind, InboxFailureKind::Overloaded) { return (status, [(axum::http::header::RETRY_AFTER, "1")], body).into_response(); } (status, body).into_response() } #[allow(clippy::too_many_arguments)] fn build_inbox_execution_log( app_id: AppId, script_id: ScriptId, request_id: RequestId, request_path: String, request_headers: BTreeMap, request_body: Json_, response_code: u16, started: chrono::DateTime, finished: chrono::DateTime, ) -> ExecutionLog { let duration_ms = u64::try_from( finished .signed_duration_since(started) .num_milliseconds() .max(0), ) .unwrap_or(0); let status = if (200..400).contains(&response_code) { ExecutionStatus::Success } else { ExecutionStatus::Error }; ExecutionLog { id: Uuid::new_v4(), app_id, script_id, request_id, request_path, request_headers, request_body, response_code: Some(response_code), response_body: None, script_logs: Json_::Array(vec![]), duration_ms, status, created_at: started, } } fn parse_query_string(s: &str) -> BTreeMap { let mut out = BTreeMap::new(); if s.is_empty() { return out; } for pair in s.split('&') { let (k, v) = match pair.split_once('=') { Some((k, v)) => (k, v), None => (pair, ""), }; let key = urlencoding::decode(k) .map(std::borrow::Cow::into_owned) .unwrap_or_default(); let val = urlencoding::decode(v) .map(std::borrow::Cow::into_owned) .unwrap_or_default(); out.insert(key, val); } out } // ---------------------------------------------------------------------------- // Marshalling // ---------------------------------------------------------------------------- fn build_exec_request( id: ScriptId, name: &str, headers: &HeaderMap, body: &Bytes, app_id: AppId, principal: Option, ) -> Result { let mut hmap = BTreeMap::new(); for (k, v) in headers { if let Ok(s) = v.to_str() { hmap.insert(k.as_str().to_string(), s.to_string()); } } let body_json: Json_ = if body.is_empty() { Json_::Null } else { serde_json::from_slice(body) .map_err(|e| ApiError::BadRequest(format!("invalid JSON body: {e}")))? }; let execution_id = ExecutionId::new(); Ok(ExecRequest { execution_id, request_id: RequestId::new(), script_id: id, script_name: name.to_string(), invocation_type: InvocationType::Http, path: format!("/api/execute/{id}"), headers: hmap, body: body_json, params: BTreeMap::new(), query: BTreeMap::new(), rest: String::new(), // Overwritten by the handler after the script is resolved. sandbox_overrides: picloud_shared::ScriptSandbox::default(), app_id, principal, // Direct invocations are at depth 0 with a self-referential // root. The triggers framework (v1.1.1) increments depth and // preserves the original root for chained executions. trigger_depth: 0, root_execution_id: execution_id, // Direct invocations are never DL handlers — that flag is only // set by the dispatcher when it picks a dead_letter trigger row. is_dead_letter_handler: false, // No originating trigger event for direct ingress. event: None, }) } fn exec_response_to_http(resp: ExecResponse) -> Response { let status = StatusCode::from_u16(resp.status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); let mut http_headers = HeaderMap::new(); for (k, v) in resp.headers { if let (Ok(name), Ok(value)) = (k.parse::(), v.parse::()) { http_headers.insert(name, value); } } http_headers .entry(axum::http::header::CONTENT_TYPE) .or_insert_with(|| HeaderValue::from_static("application/json")); (status, http_headers, Json(resp.body)).into_response() } #[allow(clippy::too_many_arguments)] fn build_execution_log( app_id: AppId, script_id: ScriptId, request_id: RequestId, request_path: String, request_headers: BTreeMap, request_body: Json_, outcome: &Result, started: chrono::DateTime, finished: chrono::DateTime, ) -> ExecutionLog { let duration_ms = u64::try_from( finished .signed_duration_since(started) .num_milliseconds() .max(0), ) .unwrap_or(0); let (status, response_code, response_body, script_logs) = match outcome { Ok(resp) => { let logs = serde_json::to_value(&resp.logs).unwrap_or(Json_::Array(vec![])); ( ExecutionStatus::Success, Some(resp.status_code), Some(resp.body.clone()), logs, ) } Err(e) => { let status = match e { ExecError::Timeout(_) => ExecutionStatus::Timeout, ExecError::OperationBudgetExceeded => ExecutionStatus::BudgetExceeded, _ => ExecutionStatus::Error, }; ( status, None, Some(serde_json::json!({ "error": e.to_string() })), Json_::Array(vec![]), ) } }; ExecutionLog { id: Uuid::new_v4(), app_id, script_id, request_id, request_path, request_headers, request_body, response_code, response_body, script_logs, duration_ms, status, created_at: started, } } // ---------------------------------------------------------------------------- // Errors // ---------------------------------------------------------------------------- #[derive(Debug, thiserror::Error)] pub enum ApiError { #[error("script not found: {0}")] NotFound(ScriptId), #[error("bad request: {0}")] BadRequest(String), #[error("resolver error: {0}")] Resolver(#[from] ResolverError), #[error("execution error: {0}")] Exec(#[from] ExecError), #[error("outbox write failed: {0}")] OutboxWrite(String), } impl IntoResponse for ApiError { fn into_response(self) -> Response { // Overloaded is the only variant that needs to attach an HTTP // header (Retry-After), so it short-circuits the (status, body) // reduction below. Axum's tuple builder makes per-arm header // injection awkward otherwise. use ApiError as E; if let E::Exec(ExecError::Overloaded { retry_after_secs }) = &self { let retry = retry_after_secs.to_string(); let body = Json(serde_json::json!({ "error": self.to_string() })); return ( StatusCode::SERVICE_UNAVAILABLE, [(axum::http::header::RETRY_AFTER, retry)], body, ) .into_response(); } let (status, message) = match &self { E::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()), E::BadRequest(_) => (StatusCode::BAD_REQUEST, self.to_string()), E::OutboxWrite(e) => { tracing::error!(error = %e, "outbox write failed"); ( StatusCode::INTERNAL_SERVER_ERROR, "internal error".to_string(), ) } E::Resolver(e) => { tracing::error!(error = %e, "resolver failure"); ( StatusCode::INTERNAL_SERVER_ERROR, "internal error".to_string(), ) } E::Exec(e) => match e { ExecError::Parse(_) | ExecError::InvalidResponse(_) => { (StatusCode::UNPROCESSABLE_ENTITY, e.to_string()) } ExecError::Timeout(_) => (StatusCode::GATEWAY_TIMEOUT, e.to_string()), ExecError::OperationBudgetExceeded => { (StatusCode::INSUFFICIENT_STORAGE, e.to_string()) } ExecError::Runtime(_) => (StatusCode::BAD_GATEWAY, e.to_string()), ExecError::Overloaded { .. } => unreachable!("handled above"), }, }; (status, Json(serde_json::json!({ "error": message }))).into_response() } }