//! Data-plane HTTP surface. Mounted by the `picloud` all-in-one binary //! under `/api` (so the path becomes `/api/execute/:id`) and by the //! future split `picloud-orchestrator` binary at its own root. use std::collections::BTreeMap; use std::sync::Arc; use std::time::Duration; use axum::{ body::Bytes, extract::{Path, Request, State}, http::{HeaderMap, HeaderName, HeaderValue, StatusCode}, response::{IntoResponse, Response}, routing::post, Json, Router, }; use chrono::Utc; use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType}; use picloud_shared::{ ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, RequestId, ScriptId, }; use serde_json::Value as Json_; use uuid::Uuid; use crate::client::ExecutorClient; use crate::resolver::{ResolverError, ScriptResolver}; use crate::routing::RouteTable; /// State shared by data-plane handlers. pub struct DataPlaneState { pub executor: Arc, pub resolver: Arc, pub log_sink: Arc, /// Routing table for user-defined paths. Shared with the manager /// (admin router writes; this side reads). pub routes: Arc, } impl Clone for DataPlaneState { fn clone(&self) -> Self { Self { executor: self.executor.clone(), resolver: self.resolver.clone(), log_sink: self.log_sink.clone(), routes: self.routes.clone(), } } } /// Build the data-plane router. Handles `POST /execute/:id` — the /// always-available ID-based bypass. pub fn data_plane_router(state: DataPlaneState) -> Router where E: ExecutorClient + 'static, R: ScriptResolver + 'static, { Router::new() .route("/execute/{id}", post(execute_by_id::)) .with_state(state) } /// Build a router that handles ALL paths via the user-defined routing /// table. Intended to be merged into the picloud app router as a /// fallback (after the system routes are mounted). pub fn user_routes_router(state: DataPlaneState) -> Router where E: ExecutorClient + 'static, R: ScriptResolver + 'static, { Router::new() .fallback(user_route_handler::) .with_state(state) } // ---------------------------------------------------------------------------- // Handlers // ---------------------------------------------------------------------------- async fn execute_by_id( State(state): State>, Path(id): Path, headers: HeaderMap, body: Bytes, ) -> Result where E: ExecutorClient + 'static, R: ScriptResolver + 'static, { let script = state .resolver .resolve(id) .await? .ok_or(ApiError::NotFound(id))?; let mut req = build_exec_request(id, &script.name, &headers, &body)?; req.sandbox_overrides = script.sandbox; let request_id = req.request_id; let request_path = req.path.clone(); let request_headers = req.headers.clone(); let request_body = req.body.clone(); let timeout = Duration::from_secs(u64::from(script.timeout_seconds)); let started = Utc::now(); let outcome = state.executor.execute(&script.source, req, timeout).await; let finished = Utc::now(); // Build and dispatch the audit log regardless of outcome. We await // the sink — recording the trail is part of correctness for an // audit-visible platform — but a sink failure must not mask the // user-facing result, so we only log a warning if it fails. let log = build_execution_log( id, request_id, request_path, request_headers, request_body, &outcome, started, finished, ); if let Err(e) = state.log_sink.record(log).await { tracing::warn!(error = %e, script_id = %id, "failed to persist execution log"); } Ok(exec_response_to_http(outcome?)) } async fn user_route_handler( State(state): State>, request: Request, ) -> Result where E: ExecutorClient + 'static, R: ScriptResolver + 'static, { let method = request.method().as_str().to_string(); let uri = request.uri().clone(); let path = uri.path().to_string(); let query_str = uri.query().unwrap_or("").to_string(); let host = request .headers() .get("host") .and_then(|h| h.to_str().ok()) .unwrap_or("") .to_string(); let headers = request.headers().clone(); let Some(matched) = state.routes.match_request(&host, &method, &path) else { return Ok(( StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": format!("no route matches {method} {path}") })), ) .into_response()); }; let script = state .resolver .resolve(matched.matched.script_id) .await? .ok_or(ApiError::NotFound(matched.matched.script_id))?; // Drain the body now that we know we'll execute. 10 MiB cap matches // the conservative default response/request size in the blueprint. let body_bytes = match axum::body::to_bytes(request.into_body(), 10 * 1024 * 1024).await { Ok(b) => b, Err(e) => return Err(ApiError::BadRequest(format!("body read failed: {e}"))), }; let mut req = build_exec_request( matched.matched.script_id, &script.name, &headers, &body_bytes, )?; req.path = path; req.params = matched.params; req.query = parse_query_string(&query_str); req.rest = matched.rest.unwrap_or_default(); req.sandbox_overrides = script.sandbox; let request_id = req.request_id; let request_path = req.path.clone(); let request_headers = req.headers.clone(); let request_body = req.body.clone(); let timeout = Duration::from_secs(u64::from(script.timeout_seconds)); let started = Utc::now(); let outcome = state.executor.execute(&script.source, req, timeout).await; let finished = Utc::now(); let log = build_execution_log( matched.matched.script_id, request_id, request_path, request_headers, request_body, &outcome, started, finished, ); if let Err(e) = state.log_sink.record(log).await { tracing::warn!( error = %e, script_id = %matched.matched.script_id, "failed to persist execution log" ); } Ok(exec_response_to_http(outcome?)) } fn parse_query_string(s: &str) -> BTreeMap { let mut out = BTreeMap::new(); if s.is_empty() { return out; } for pair in s.split('&') { let (k, v) = match pair.split_once('=') { Some((k, v)) => (k, v), None => (pair, ""), }; let key = urlencoding::decode(k) .map(std::borrow::Cow::into_owned) .unwrap_or_default(); let val = urlencoding::decode(v) .map(std::borrow::Cow::into_owned) .unwrap_or_default(); out.insert(key, val); } out } // ---------------------------------------------------------------------------- // Marshalling // ---------------------------------------------------------------------------- fn build_exec_request( id: ScriptId, name: &str, headers: &HeaderMap, body: &Bytes, ) -> Result { let mut hmap = BTreeMap::new(); for (k, v) in headers { if let Ok(s) = v.to_str() { hmap.insert(k.as_str().to_string(), s.to_string()); } } let body_json: Json_ = if body.is_empty() { Json_::Null } else { serde_json::from_slice(body) .map_err(|e| ApiError::BadRequest(format!("invalid JSON body: {e}")))? }; Ok(ExecRequest { execution_id: ExecutionId::new(), request_id: RequestId::new(), script_id: id, script_name: name.to_string(), invocation_type: InvocationType::Http, path: format!("/api/execute/{id}"), headers: hmap, body: body_json, params: BTreeMap::new(), query: BTreeMap::new(), rest: String::new(), // Overwritten by the handler after the script is resolved. sandbox_overrides: picloud_shared::ScriptSandbox::default(), }) } fn exec_response_to_http(resp: ExecResponse) -> Response { let status = StatusCode::from_u16(resp.status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); let mut http_headers = HeaderMap::new(); for (k, v) in resp.headers { if let (Ok(name), Ok(value)) = (k.parse::(), v.parse::()) { http_headers.insert(name, value); } } http_headers .entry(axum::http::header::CONTENT_TYPE) .or_insert_with(|| HeaderValue::from_static("application/json")); (status, http_headers, Json(resp.body)).into_response() } #[allow(clippy::too_many_arguments)] fn build_execution_log( script_id: ScriptId, request_id: RequestId, request_path: String, request_headers: BTreeMap, request_body: Json_, outcome: &Result, started: chrono::DateTime, finished: chrono::DateTime, ) -> ExecutionLog { let duration_ms = u64::try_from( finished .signed_duration_since(started) .num_milliseconds() .max(0), ) .unwrap_or(0); let (status, response_code, response_body, script_logs) = match outcome { Ok(resp) => { let logs = serde_json::to_value(&resp.logs).unwrap_or(Json_::Array(vec![])); ( ExecutionStatus::Success, Some(resp.status_code), Some(resp.body.clone()), logs, ) } Err(e) => { let status = match e { ExecError::Timeout(_) => ExecutionStatus::Timeout, ExecError::OperationBudgetExceeded => ExecutionStatus::BudgetExceeded, _ => ExecutionStatus::Error, }; ( status, None, Some(serde_json::json!({ "error": e.to_string() })), Json_::Array(vec![]), ) } }; ExecutionLog { id: Uuid::new_v4(), script_id, request_id, request_path, request_headers, request_body, response_code, response_body, script_logs, duration_ms, status, created_at: started, } } // ---------------------------------------------------------------------------- // Errors // ---------------------------------------------------------------------------- #[derive(Debug, thiserror::Error)] pub enum ApiError { #[error("script not found: {0}")] NotFound(ScriptId), #[error("bad request: {0}")] BadRequest(String), #[error("resolver error: {0}")] Resolver(#[from] ResolverError), #[error("execution error: {0}")] Exec(#[from] ExecError), } impl IntoResponse for ApiError { fn into_response(self) -> Response { use ApiError as E; let (status, message) = match &self { E::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()), E::BadRequest(_) => (StatusCode::BAD_REQUEST, self.to_string()), E::Resolver(e) => { tracing::error!(error = %e, "resolver failure"); ( StatusCode::INTERNAL_SERVER_ERROR, "internal error".to_string(), ) } E::Exec(e) => match e { ExecError::Parse(_) | ExecError::InvalidResponse(_) => { (StatusCode::UNPROCESSABLE_ENTITY, e.to_string()) } ExecError::Timeout(_) => (StatusCode::GATEWAY_TIMEOUT, e.to_string()), ExecError::OperationBudgetExceeded => { (StatusCode::INSUFFICIENT_STORAGE, e.to_string()) } ExecError::Runtime(_) => (StatusCode::BAD_GATEWAY, e.to_string()), }, }; (status, Json(serde_json::json!({ "error": message }))).into_response() } }