PiCloud/crates/orchestrator-core/src/api.rs
MechaCat02 4c41374db4 feat(manager-core,orchestrator-core): multi-app scoping (Phase 3b)
Apps become the isolation boundary for scripts, routes, domains, and
later data. Doing this now — while the surface is small — avoids
several migrations on populated tables once v1.1 data-plane services
ship.

Schema (migration 0005_apps.sql):
- New tables: apps, app_domains (with shape_key UNIQUE for collision
  detection), app_slug_history (for permanent slug-rename redirects).
- app_id added to scripts, routes, execution_logs (non-null, cascading
  rules per row).
- Script-name uniqueness becomes per-app; the route unique index is
  swapped for an app-scoped version.
- The "default" app is seeded unconditionally with a localhost claim;
  existing scripts/routes backfill into it. Fresh installs additionally
  get the Hello World seed via seed_hello_world_if_fresh after
  migrations run (idempotent — only fires when the default app has no
  scripts).

Orchestrator dispatch is two-phase: AppDomainTable resolves Host →
app_id (most-specific match wins, exact beats wildcard), then the
existing route matcher runs against that app's partitioned slice via
RouteTable. Unknown hosts return 404 at the app layer with a clear
message; /api/v1/execute/{id} still works as the implicit
__internal__ claim, decoupled from any public domain.
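
The first dispatch phase can be sketched roughly as follows. This is a minimal illustration, not the actual `AppDomainTable`: the `HostTable` type, its `claim` and `resolve` methods, the `*.suffix` wildcard syntax, the port stripping, and the `u32`-backed `AppId` are all assumptions made for the example. It shows one way to get "exact beats wildcard, most-specific match wins" by checking exact claims first and then walking wildcard suffixes from longest to shortest.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the real `AppId` type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct AppId(u32);

/// Illustrative host table: exact claims plus `*.suffix` wildcard claims.
#[derive(Default)]
struct HostTable {
    exact: HashMap<String, AppId>,
    /// Keyed by the part after `*.`, e.g. "example.com".
    wildcard: HashMap<String, AppId>,
}

impl HostTable {
    /// Register a claim; patterns starting with `*.` are wildcards.
    fn claim(&mut self, pattern: &str, app: AppId) {
        let pattern = pattern.to_ascii_lowercase();
        match pattern.strip_prefix("*.") {
            Some(suffix) => self.wildcard.insert(suffix.to_string(), app),
            None => self.exact.insert(pattern.clone(), app),
        };
    }

    /// Resolve a Host header value to the owning app, if any.
    fn resolve(&self, host: &str) -> Option<AppId> {
        // Simplification: drop an optional `:port` (ignores IPv6 literals)
        // and compare case-insensitively.
        let host = host.split(':').next().unwrap_or("").to_ascii_lowercase();

        // Exact claims always beat wildcard claims.
        if let Some(app) = self.exact.get(&host) {
            return Some(*app);
        }

        // Strip one leading label at a time, so the longest (most
        // specific) wildcard suffix is tried first.
        let mut rest = host.as_str();
        while let Some((_, suffix)) = rest.split_once('.') {
            if let Some(app) = self.wildcard.get(suffix) {
                return Some(*app);
            }
            rest = suffix;
        }
        None
    }
}
```

In this sketch a `None` result corresponds to the app-layer 404 for unknown hosts; the route matcher would only ever run after `resolve` returns an app.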

Manager API: full CRUD for /api/v1/admin/apps/* and
/api/v1/admin/apps/{id_or_slug}/domains/*, with slug:check + force
takeover semantics implementing the rename-history flow (two-step
check → confirm, never a single endpoint). Script create requires
app_id; list accepts ?app= filter. Route create validates host
against the parent app's claims; conflict detection stays strictly
intra-app.
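
One possible shape for the two-step slug flow is sketched below. Everything here is hypothetical: the `SlugRegistry` type, the `SlugCheck` variants, the `u32` app ids, and the rule that `force` may only take over a slug held in rename history are assumptions for illustration, not the shipped manager API. The point is only that the check is a read-only step and the rename is a separate confirming step.

```rust
use std::collections::HashMap;

/// Hypothetical result of the read-only check step.
#[derive(Debug, PartialEq, Eq)]
enum SlugCheck {
    Available,
    /// Slug only exists in rename history; taking it requires `force`.
    HeldByHistory { previous_owner: u32 },
    /// Slug is the live slug of an app; it cannot be taken.
    InUse { owner: u32 },
}

#[derive(Default)]
struct SlugRegistry {
    live: HashMap<String, u32>,    // slug -> app id
    history: HashMap<String, u32>, // old slug -> app id it redirects to
}

impl SlugRegistry {
    /// Step 1: report what a rename to `slug` would collide with.
    fn check(&self, slug: &str) -> SlugCheck {
        if let Some(&owner) = self.live.get(slug) {
            SlugCheck::InUse { owner }
        } else if let Some(&previous_owner) = self.history.get(slug) {
            SlugCheck::HeldByHistory { previous_owner }
        } else {
            SlugCheck::Available
        }
    }

    /// Step 2: confirm the rename. `force` acknowledges a history takeover.
    fn rename(&mut self, app: u32, new_slug: &str, force: bool) -> Result<(), SlugCheck> {
        match self.check(new_slug) {
            SlugCheck::Available => {}
            SlugCheck::HeldByHistory { .. } if force => {
                // The old redirect is dropped in favour of the new owner.
                self.history.remove(new_slug);
            }
            blocked => return Err(blocked),
        }
        // Move the app's current slug (if any) into history so old links
        // can keep redirecting.
        let old = self
            .live
            .iter()
            .find(|(_, id)| **id == app)
            .map(|(slug, _)| slug.clone());
        if let Some(old) = old {
            self.live.remove(&old);
            self.history.insert(old, app);
        }
        self.live.insert(new_slug.to_string(), app);
        Ok(())
    }
}
```

A caller would run `check` first, show the outcome to the operator, and only then call `rename` with `force` set if a history takeover was explicitly confirmed.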

Dashboard: /admin/apps and /admin/apps/{slug} (overview + scripts +
domains + settings tabs, with slug-history-aware redirects). Root
path redirects to the apps list. Script detail page gains an app
breadcrumb and threads app_id into the route preview.

Deferred per design: per-app admin roles. The require_admin middleware
remains the seam where role checks will slot in later.

Blueprint §11.5 and roadmap updated to reflect what shipped;
docs/versioning.md notes the schema 3 → 5 bump.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 21:03:05 +02:00


//! Data-plane HTTP surface. Mounted by the `picloud` all-in-one binary
//! under `/api` (so the path becomes `/api/execute/:id`) and by the
//! future split `picloud-orchestrator` binary at its own root.

use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;

use axum::{
    body::Bytes,
    extract::{Path, Request, State},
    http::{HeaderMap, HeaderName, HeaderValue, StatusCode},
    response::{IntoResponse, Response},
    routing::post,
    Json, Router,
};
use chrono::Utc;
use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType};
use picloud_shared::{
    AppId, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, RequestId, ScriptId,
};
use serde_json::Value as Json_;
use uuid::Uuid;

use crate::client::ExecutorClient;
use crate::resolver::{ResolverError, ScriptResolver};
use crate::routing::{AppDomainTable, RouteTable};

/// State shared by data-plane handlers.
pub struct DataPlaneState<E, R> {
    pub executor: Arc<E>,
    pub resolver: Arc<R>,
    pub log_sink: Arc<dyn ExecutionLogSink>,
    /// Host → app_id resolver. Run before `routes` to filter to the
    /// owning app's slice. Shared with the manager (writes invalidate
    /// the cache by replacing the table).
    pub app_domains: Arc<AppDomainTable>,
    /// Routing table for user-defined paths, partitioned per app.
    /// Shared with the manager (admin router writes; this side reads).
    pub routes: Arc<RouteTable>,
}

// Manual impl: `#[derive(Clone)]` would add `E: Clone` and `R: Clone`
// bounds, but only the `Arc` handles are cloned here.
impl<E, R> Clone for DataPlaneState<E, R> {
    fn clone(&self) -> Self {
        Self {
            executor: self.executor.clone(),
            resolver: self.resolver.clone(),
            log_sink: self.log_sink.clone(),
            app_domains: self.app_domains.clone(),
            routes: self.routes.clone(),
        }
    }
}

/// Build the data-plane router. Handles `POST /execute/:id`, the
/// always-available ID-based bypass.
pub fn data_plane_router<E, R>(state: DataPlaneState<E, R>) -> Router
where
    E: ExecutorClient + 'static,
    R: ScriptResolver + 'static,
{
    Router::new()
        .route("/execute/{id}", post(execute_by_id::<E, R>))
        .with_state(state)
}

/// Build a router that handles ALL paths via the user-defined routing
/// table. Intended to be merged into the picloud app router as a
/// fallback (after the system routes are mounted).
pub fn user_routes_router<E, R>(state: DataPlaneState<E, R>) -> Router
where
    E: ExecutorClient + 'static,
    R: ScriptResolver + 'static,
{
    Router::new()
        .fallback(user_route_handler::<E, R>)
        .with_state(state)
}

// ----------------------------------------------------------------------------
// Handlers
// ----------------------------------------------------------------------------

/// Run a script addressed directly by its ID, bypassing host and route
/// matching.
async fn execute_by_id<E, R>(
    State(state): State<DataPlaneState<E, R>>,
    Path(id): Path<ScriptId>,
    headers: HeaderMap,
    body: Bytes,
) -> Result<Response, ApiError>
where
    E: ExecutorClient + 'static,
    R: ScriptResolver + 'static,
{
    let script = state
        .resolver
        .resolve(id)
        .await?
        .ok_or(ApiError::NotFound(id))?;

    let mut req = build_exec_request(id, &script.name, &headers, &body)?;
    req.sandbox_overrides = script.sandbox;

    // Copy what the audit log needs before `req` moves into the executor.
    let request_id = req.request_id;
    let request_path = req.path.clone();
    let request_headers = req.headers.clone();
    let request_body = req.body.clone();

    let timeout = Duration::from_secs(u64::from(script.timeout_seconds));
    let started = Utc::now();
    let outcome = state.executor.execute(&script.source, req, timeout).await;
    let finished = Utc::now();

    // Build and dispatch the audit log regardless of outcome. We await
    // the sink because recording the trail is part of correctness for an
    // audit-visible platform. A sink failure must not mask the
    // user-facing result, so we only log a warning if it fails.
    let log = build_execution_log(
        script.app_id,
        id,
        request_id,
        request_path,
        request_headers,
        request_body,
        &outcome,
        started,
        finished,
    );
    if let Err(e) = state.log_sink.record(log).await {
        tracing::warn!(error = %e, script_id = %id, "failed to persist execution log");
    }

    Ok(exec_response_to_http(outcome?))
}

/// Fallback handler for user-defined routes: resolve the Host header to
/// an app, match the request against that app's routes, then execute
/// the matched script.
async fn user_route_handler<E, R>(
    State(state): State<DataPlaneState<E, R>>,
    request: Request,
) -> Result<Response, ApiError>
where
    E: ExecutorClient + 'static,
    R: ScriptResolver + 'static,
{
    let method = request.method().as_str().to_string();
    let uri = request.uri().clone();
    let path = uri.path().to_string();
    let query_str = uri.query().unwrap_or("").to_string();
    // A missing or non-ASCII Host header is treated as the empty host.
    let host = request
        .headers()
        .get("host")
        .and_then(|h| h.to_str().ok())
        .unwrap_or("")
        .to_string();
    let headers = request.headers().clone();

    // Two-phase dispatch (blueprint §11.5): first resolve Host → app_id,
    // then run the existing matcher on that app's slice. If no app claims
    // this host, return a flat 404; the path never gets a chance to match.
    let Some(app_id) = state.app_domains.resolve_app(&host) else {
        return Ok((
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": format!("no app claims host {host:?}")
            })),
        )
            .into_response());
    };

    let Some(matched) = state
        .routes
        .match_request_for_app(app_id, &host, &method, &path)
    else {
        return Ok((
            StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": format!("no route matches {method} {path}")
            })),
        )
            .into_response());
    };

    let script = state
        .resolver
        .resolve(matched.matched.script_id)
        .await?
        .ok_or(ApiError::NotFound(matched.matched.script_id))?;

    // Drain the body now that we know we'll execute. 10 MiB cap matches
    // the conservative default response/request size in the blueprint.
    let body_bytes = match axum::body::to_bytes(request.into_body(), 10 * 1024 * 1024).await {
        Ok(b) => b,
        Err(e) => return Err(ApiError::BadRequest(format!("body read failed: {e}"))),
    };

    // `build_exec_request` fills in the ID-based defaults; overwrite the
    // routing-related fields with what the matcher produced.
    let mut req = build_exec_request(
        matched.matched.script_id,
        &script.name,
        &headers,
        &body_bytes,
    )?;
    req.path = path;
    req.params = matched.params;
    req.query = parse_query_string(&query_str);
    req.rest = matched.rest.unwrap_or_default();
    req.sandbox_overrides = script.sandbox;

    // Copy what the audit log needs before `req` moves into the executor.
    let request_id = req.request_id;
    let request_path = req.path.clone();
    let request_headers = req.headers.clone();
    let request_body = req.body.clone();

    let timeout = Duration::from_secs(u64::from(script.timeout_seconds));
    let started = Utc::now();
    let outcome = state.executor.execute(&script.source, req, timeout).await;
    let finished = Utc::now();

    let log = build_execution_log(
        script.app_id,
        matched.matched.script_id,
        request_id,
        request_path,
        request_headers,
        request_body,
        &outcome,
        started,
        finished,
    );
    if let Err(e) = state.log_sink.record(log).await {
        tracing::warn!(
            error = %e,
            script_id = %matched.matched.script_id,
            "failed to persist execution log"
        );
    }

    Ok(exec_response_to_http(outcome?))
}

/// Parse a raw query string into a key → value map. A key without `=`
/// maps to the empty string, a repeated key keeps its last value, and a
/// component that fails to decode becomes the empty string.
fn parse_query_string(s: &str) -> BTreeMap<String, String> {
    let mut out = BTreeMap::new();
    if s.is_empty() {
        return out;
    }
    for pair in s.split('&') {
        let (k, v) = match pair.split_once('=') {
            Some((k, v)) => (k, v),
            None => (pair, ""),
        };
        let key = urlencoding::decode(k)
            .map(std::borrow::Cow::into_owned)
            .unwrap_or_default();
        let val = urlencoding::decode(v)
            .map(std::borrow::Cow::into_owned)
            .unwrap_or_default();
        out.insert(key, val);
    }
    out
}

// ----------------------------------------------------------------------------
// Marshalling
// ----------------------------------------------------------------------------

/// Build an `ExecRequest` from the raw HTTP parts. Header values that are
/// not visible ASCII are dropped, and an empty body becomes JSON `null`.
/// `path` defaults to the ID-based endpoint; the route handler overwrites
/// it along with `params`, `query`, and `rest`.
fn build_exec_request(
    id: ScriptId,
    name: &str,
    headers: &HeaderMap,
    body: &Bytes,
) -> Result<ExecRequest, ApiError> {
    let mut hmap = BTreeMap::new();
    for (k, v) in headers {
        if let Ok(s) = v.to_str() {
            hmap.insert(k.as_str().to_string(), s.to_string());
        }
    }

    let body_json: Json_ = if body.is_empty() {
        Json_::Null
    } else {
        serde_json::from_slice(body)
            .map_err(|e| ApiError::BadRequest(format!("invalid JSON body: {e}")))?
    };

    Ok(ExecRequest {
        execution_id: ExecutionId::new(),
        request_id: RequestId::new(),
        script_id: id,
        script_name: name.to_string(),
        invocation_type: InvocationType::Http,
        path: format!("/api/execute/{id}"),
        headers: hmap,
        body: body_json,
        params: BTreeMap::new(),
        query: BTreeMap::new(),
        rest: String::new(),
        // Overwritten by the handler after the script is resolved.
        sandbox_overrides: picloud_shared::ScriptSandbox::default(),
    })
}

/// Convert an executor response into an HTTP response. An invalid status
/// code becomes 500, header pairs that do not parse are skipped, and
/// `Content-Type` defaults to `application/json` if the script set none.
fn exec_response_to_http(resp: ExecResponse) -> Response {
    let status =
        StatusCode::from_u16(resp.status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);

    let mut http_headers = HeaderMap::new();
    for (k, v) in resp.headers {
        if let (Ok(name), Ok(value)) = (k.parse::<HeaderName>(), v.parse::<HeaderValue>()) {
            http_headers.insert(name, value);
        }
    }
    http_headers
        .entry(axum::http::header::CONTENT_TYPE)
        .or_insert_with(|| HeaderValue::from_static("application/json"));

    (status, http_headers, Json(resp.body)).into_response()
}

/// Assemble the audit record for one execution, whether it succeeded or
/// failed. On failure the response body holds the error message and the
/// script logs are empty.
#[allow(clippy::too_many_arguments)]
fn build_execution_log(
    app_id: AppId,
    script_id: ScriptId,
    request_id: RequestId,
    request_path: String,
    request_headers: BTreeMap<String, String>,
    request_body: Json_,
    outcome: &Result<ExecResponse, ExecError>,
    started: chrono::DateTime<Utc>,
    finished: chrono::DateTime<Utc>,
) -> ExecutionLog {
    // Clamp at zero in case the wall clock stepped backwards between
    // `started` and `finished`.
    let duration_ms = u64::try_from(
        finished
            .signed_duration_since(started)
            .num_milliseconds()
            .max(0),
    )
    .unwrap_or(0);

    let (status, response_code, response_body, script_logs) = match outcome {
        Ok(resp) => {
            let logs = serde_json::to_value(&resp.logs).unwrap_or(Json_::Array(vec![]));
            (
                ExecutionStatus::Success,
                Some(resp.status_code),
                Some(resp.body.clone()),
                logs,
            )
        }
        Err(e) => {
            let status = match e {
                ExecError::Timeout(_) => ExecutionStatus::Timeout,
                ExecError::OperationBudgetExceeded => ExecutionStatus::BudgetExceeded,
                _ => ExecutionStatus::Error,
            };
            (
                status,
                None,
                Some(serde_json::json!({ "error": e.to_string() })),
                Json_::Array(vec![]),
            )
        }
    };

    ExecutionLog {
        id: Uuid::new_v4(),
        app_id,
        script_id,
        request_id,
        request_path,
        request_headers,
        request_body,
        response_code,
        response_body,
        script_logs,
        duration_ms,
        status,
        created_at: started,
    }
}

// ----------------------------------------------------------------------------
// Errors
// ----------------------------------------------------------------------------

/// Errors surfaced by the data-plane handlers. Each variant is rendered
/// as a JSON object with a single `error` field.
#[derive(Debug, thiserror::Error)]
pub enum ApiError {
    #[error("script not found: {0}")]
    NotFound(ScriptId),
    #[error("bad request: {0}")]
    BadRequest(String),
    #[error("resolver error: {0}")]
    Resolver(#[from] ResolverError),
    #[error("execution error: {0}")]
    Exec(#[from] ExecError),
}

impl IntoResponse for ApiError {
    fn into_response(self) -> Response {
        use ApiError as E;
        let (status, message) = match &self {
            E::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
            E::BadRequest(_) => (StatusCode::BAD_REQUEST, self.to_string()),
            E::Resolver(e) => {
                // Resolver details go to the log, not to the client.
                tracing::error!(error = %e, "resolver failure");
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    "internal error".to_string(),
                )
            }
            E::Exec(e) => match e {
                ExecError::Parse(_) | ExecError::InvalidResponse(_) => {
                    (StatusCode::UNPROCESSABLE_ENTITY, e.to_string())
                }
                ExecError::Timeout(_) => (StatusCode::GATEWAY_TIMEOUT, e.to_string()),
                ExecError::OperationBudgetExceeded => {
                    (StatusCode::INSUFFICIENT_STORAGE, e.to_string())
                }
                ExecError::Runtime(_) => (StatusCode::BAD_GATEWAY, e.to_string()),
            },
        };
        (status, Json(serde_json::json!({ "error": message }))).into_response()
    }
}
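
The splitting rules of `parse_query_string` above can be tried in isolation with a std-only approximation. This sketch is not the function from this file: `parse_query_plain` is a hypothetical name, and percent-decoding is deliberately left out so the example needs no external crate. It only illustrates how pairs are split and how duplicate keys collapse.

```rust
use std::collections::BTreeMap;

/// Std-only approximation of the query parser. Percent-decoding is
/// omitted, so keys and values are stored exactly as they appear.
fn parse_query_plain(s: &str) -> BTreeMap<String, String> {
    let mut out = BTreeMap::new();
    if s.is_empty() {
        return out;
    }
    for pair in s.split('&') {
        // Split on the first `=` only; a bare key maps to "".
        let (k, v) = pair.split_once('=').unwrap_or((pair, ""));
        // Later occurrences of a key overwrite earlier ones.
        out.insert(k.to_string(), v.to_string());
    }
    out
}
```

Because the map keeps one value per key, a script that needs repeated parameters (for example `tag=a&tag=b`) would only see the last one under this scheme.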