feat: end-to-end script CRUD + Rhai execution

Brings the MVP feature set online: upload a Rhai script, get an HTTP
endpoint that runs it sandboxed in-process, list/update/delete it, and
have invalid sources rejected at upload time. Verified live through
Caddy with a full lifecycle (`create → list → get → execute → update
→ delete`) plus error paths (syntax error, duplicate name, deleted).

Layout — every concern lands behind the trait seam its layer owns, so
cluster-mode in v1.3+ is a swap of two impls, not a rewrite:

  * shared::ScriptValidator — manager calls into validation without
    a hard dep on executor-core; executor-core impls the trait on
    `Engine`. Pinned in shared so neither crate has to know about
    the other.

  * executor-core::Engine — real Rhai engine: sandbox limits (max
    operations / string size / map size / call depth), disabled
    `print`, blocked `import` (DummyModuleResolver), `log::trace
    /info/warn/error` registered as a static module with shared
    log-capture buffer (no `log::debug` because `debug` is a Rhai
    reserved keyword — `log::trace` covers the same need).

      - `ctx` is pushed as a Scope constant exposing
        execution_id, script_id, script_name, request_id,
        invocation_type, request.{path,headers,body}.

      - Response convention: a Map with `statusCode` is the
        structured shape (`{statusCode, headers?, body}`); any
        other return value is a 200 with the value as the body.

      - Engine::execute is now synchronous (pure compute); the
        async wrapper + wall-clock timeout live in
        LocalExecutorClient, which spawns_blocking and applies a
        300s hard ceiling regardless of per-script config.

      - 10 unit tests cover validate, exec, structured response,
        ctx exposure, log capture, op-budget enforcement, runtime
        errors, blocked imports, JSON round-tripping.

  * manager-core::repo — full sqlx CRUD over the `scripts` table,
    with proper unique-violation handling for duplicate names.
    Embedded migrations via `sqlx::migrate!` (one initial
    `0001_init.sql` for pgcrypto + scripts + execution_logs).

  * manager-core::api — `admin_router` mounts `/scripts` and
    `/scripts/{id}`. Create + Update validate source through the
    injected `ScriptValidator` before persistence. Returns proper
    422/409/404 status codes via `ApiError::IntoResponse`.

  * orchestrator-core::api — `data_plane_router` mounts
    `/execute/{id}`: resolves the script through `ScriptResolver`,
    constructs the `ExecRequest` from headers+body, awaits
    `ExecutorClient::execute(..., timeout)`, translates the
    `ExecResponse` to an axum `Response` with header passthrough.
    Maps `ExecError` variants to 422/504/502/507.

  * picloud all-in-one — opens the pool, runs migrations, builds
    one engine, nests both routers under `/api/admin` and `/api`,
    enables structured JSON tracing and graceful shutdown on
    SIGTERM. Single `PostgresScriptRepository` Arc is shared by
    the admin router (writes) and the resolver (reads).

Other changes:
  * Workspace axum bump 0.7 → 0.8 for the `{id}` path syntax
    matching the route definitions.
  * Workspace clippy: allow `needless_pass_by_value` and
    `boxed_local` to keep API ergonomics over pedantic noise.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-23 00:00:36 +02:00
parent 9efe678983
commit 4f044e7b81
19 changed files with 1272 additions and 76 deletions

26
Cargo.lock generated
View File

@@ -89,13 +89,13 @@ checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53"
[[package]]
name = "axum"
version = "0.7.9"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90"
dependencies = [
"async-trait",
"axum-core",
"bytes",
"form_urlencoded",
"futures-util",
"http",
"http-body",
@@ -108,8 +108,7 @@ dependencies = [
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_core",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
@@ -123,19 +122,17 @@ dependencies = [
[[package]]
name = "axum-core"
version = "0.4.5"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199"
checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"futures-core",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper",
"tower-layer",
"tower-service",
@@ -1014,9 +1011,9 @@ dependencies = [
[[package]]
name = "matchit"
version = "0.7.3"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]]
name = "md-5"
@@ -1196,6 +1193,7 @@ name = "picloud"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"axum",
"figment",
"picloud-executor-core",
@@ -1204,6 +1202,7 @@ dependencies = [
"picloud-shared",
"serde",
"serde_json",
"sqlx",
"thiserror 1.0.69",
"tokio",
"tower",
@@ -1255,6 +1254,7 @@ name = "picloud-manager-core"
version = "0.1.0"
dependencies = [
"async-trait",
"axum",
"chrono",
"picloud-orchestrator-core",
"picloud-shared",
@@ -1283,6 +1283,7 @@ name = "picloud-orchestrator-core"
version = "0.1.0"
dependencies = [
"async-trait",
"axum",
"chrono",
"picloud-executor-core",
"picloud-shared",
@@ -1290,6 +1291,7 @@ dependencies = [
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"tracing",
"uuid",
]

View File

@@ -27,7 +27,7 @@ picloud-manager-core = { path = "crates/manager-core" }
# Async + HTTP
tokio = { version = "1.40", features = ["full"] }
axum = "0.7"
axum = "0.8"
tower = "0.5"
tower-http = { version = "0.6", features = ["trace", "cors"] }
hyper = "1"
@@ -71,6 +71,10 @@ module_name_repetitions = "allow"
missing_errors_doc = "allow"
missing_panics_doc = "allow"
doc_markdown = "allow"
# API ergonomics: we deliberately take values by ownership for owned
# inputs (e.g. ExecRequest) and accept Rhai's Box<EvalAltResult> as-is.
needless_pass_by_value = "allow"
boxed_local = "allow"
[profile.release]
lto = "thin"

View File

@@ -1,15 +1,26 @@
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use chrono::Utc;
use picloud_shared::{ScriptValidator, ValidationError};
use rhai::{Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module, Scope};
use serde_json::Value as Json;
use crate::sandbox::Limits;
use crate::types::{ExecError, ExecRequest, ExecResponse};
use crate::types::{
ExecError, ExecRequest, ExecResponse, ExecStats, InvocationType, LogEntry, LogLevel,
};
/// Preconfigured Rhai engine with sandbox limits applied.
///
/// One `Engine` is constructed at process startup and reused across
/// invocations. `execute` is the only entry point — it owns the per-call
/// scope and log buffer, then returns a complete `ExecResponse`.
/// invocations. `execute` is **synchronous** — it owns the per-call
/// scope and log buffer. Wall-clock timeouts and offloading off the
/// async runtime belong to the caller (orchestrator-core's
/// `LocalExecutorClient` wraps this with `spawn_blocking` + `timeout`).
pub struct Engine {
limits: Limits,
// The actual `rhai::Engine` lands with the first execution implementation.
// Keep this opaque for now so callers don't bind to it.
}
impl Engine {
@@ -23,26 +34,288 @@ impl Engine {
&self.limits
}
/// Parse-only validation, used by the manager at script-upload time
/// so syntax errors are surfaced before the first invocation.
pub fn validate(&self, _source: &str) -> Result<(), ExecError> {
// TODO(executor-core): wire `rhai::Engine::compile`
Ok(())
/// Parse-only validation. Surfaced at script-upload time so syntax
/// errors are caught before the first invocation. Same logic as the
/// `ScriptValidator` impl below but with the richer `ExecError`
/// variant; callers in the executor path use this, the manager
/// path goes through the trait.
pub fn validate(&self, source: &str) -> Result<(), ExecError> {
let engine = build_engine(self.limits, None);
engine
.compile(source)
.map(|_| ())
.map_err(|e| ExecError::Parse(e.to_string()))
}
/// Execute `source` against `req` under the configured sandbox.
///
/// `async` is part of the contract: v1.1+ SDK calls (kv, docs, http)
/// will await injected service providers from inside this method.
#[allow(clippy::unused_async)]
pub async fn execute(
&self,
_source: &str,
_req: ExecRequest,
) -> Result<ExecResponse, ExecError> {
// TODO(executor-core): wire `rhai::Engine::eval_with_scope`
Err(ExecError::Runtime(
"executor-core::Engine::execute not yet implemented".into(),
))
/// Execute `source` against `req`. Op-budget protection comes from
/// Rhai's `set_max_operations`; wall-clock enforcement is the
/// caller's responsibility.
pub fn execute(&self, source: &str, req: ExecRequest) -> Result<ExecResponse, ExecError> {
let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
let engine = build_engine(self.limits, Some(logs.clone()));
let ast = engine
.compile(source)
.map_err(|e| ExecError::Parse(e.to_string()))?;
let mut scope = Scope::new();
scope.push_constant("ctx", build_ctx_map(&req));
let started = Instant::now();
let value: Dynamic = engine
.eval_ast_with_scope(&mut scope, &ast)
.map_err(map_eval_error)?;
let duration = started.elapsed();
let logs = Arc::try_unwrap(logs).map_or_else(
|arc| arc.lock().map(|g| g.clone()).unwrap_or_default(),
|m| m.into_inner().unwrap_or_default(),
);
let (status_code, headers, body) = parse_response(value)?;
Ok(ExecResponse {
status_code,
headers,
body,
logs,
stats: ExecStats {
duration_ms: u64::try_from(duration.as_millis()).unwrap_or(u64::MAX),
operations: 0,
},
})
}
}
impl ScriptValidator for Engine {
fn validate(&self, source: &str) -> Result<(), ValidationError> {
Engine::validate(self, source).map_err(|e| ValidationError::Syntax(e.to_string()))
}
}
// ----------------------------------------------------------------------------
// Engine construction
// ----------------------------------------------------------------------------
fn build_engine(limits: Limits, logs: Option<Arc<Mutex<Vec<LogEntry>>>>) -> RhaiEngine {
let mut engine = RhaiEngine::new();
engine.set_max_operations(limits.max_operations);
engine.set_max_string_size(limits.max_string_size);
engine.set_max_array_size(limits.max_array_size);
engine.set_max_map_size(limits.max_map_size);
engine.set_max_call_levels(limits.max_call_levels);
engine.set_max_expr_depths(limits.max_expr_depth, limits.max_expr_depth);
// Reject `import` — scripts cannot pull external modules.
engine.set_module_resolver(rhai::module_resolvers::DummyModuleResolver);
// Rhai's built-in `print` and `debug` map to stdout/stderr by
// default; we never want scripts dumping there directly. Disable
// them so scripts route all output through `log::*` instead.
engine.disable_symbol("print");
if let Some(logs) = logs {
engine.register_static_module("log", build_log_module(logs).into());
}
engine
}
fn build_log_module(logs: Arc<Mutex<Vec<LogEntry>>>) -> Module {
let mut module = Module::new();
register_log_fn(&mut module, "trace", LogLevel::Trace, &logs);
register_log_fn(&mut module, "info", LogLevel::Info, &logs);
register_log_fn(&mut module, "warn", LogLevel::Warn, &logs);
register_log_fn(&mut module, "error", LogLevel::Error, &logs);
// No `log::debug` — `debug` is a Rhai reserved keyword. Use
// `log::trace` for sub-info-level diagnostics.
module
}
fn register_log_fn(
module: &mut Module,
name: &str,
level: LogLevel,
logs: &Arc<Mutex<Vec<LogEntry>>>,
) {
// Single-argument form: `log::info("message")`.
let logs_single = logs.clone();
module.set_native_fn(name, move |msg: &str| {
push_log(&logs_single, level, msg, None);
Ok::<_, Box<EvalAltResult>>(())
});
// Two-argument form: `log::info("message", #{ user: 42 })`.
let logs_struct = logs.clone();
module.set_native_fn(name, move |msg: &str, data: Dynamic| {
let json = dynamic_to_json(&data);
push_log(&logs_struct, level, msg, Some(json));
Ok::<_, Box<EvalAltResult>>(())
});
}
fn push_log(logs: &Arc<Mutex<Vec<LogEntry>>>, level: LogLevel, message: &str, data: Option<Json>) {
if let Ok(mut g) = logs.lock() {
g.push(LogEntry {
timestamp: Utc::now(),
level,
message: message.to_string(),
data,
});
}
}
// ----------------------------------------------------------------------------
// ctx construction
// ----------------------------------------------------------------------------
fn build_ctx_map(req: &ExecRequest) -> Map {
let mut ctx = Map::new();
ctx.insert("execution_id".into(), req.execution_id.to_string().into());
ctx.insert("script_id".into(), req.script_id.to_string().into());
ctx.insert("script_name".into(), req.script_name.clone().into());
ctx.insert("request_id".into(), req.request_id.to_string().into());
ctx.insert(
"invocation_type".into(),
invocation_type_str(req.invocation_type).into(),
);
let mut request = Map::new();
request.insert("path".into(), req.path.clone().into());
let mut headers = Map::new();
for (k, v) in &req.headers {
headers.insert(k.clone().into(), v.clone().into());
}
request.insert("headers".into(), headers.into());
request.insert("body".into(), json_to_dynamic(req.body.clone()));
ctx.insert("request".into(), request.into());
ctx
}
fn invocation_type_str(it: InvocationType) -> &'static str {
match it {
InvocationType::Http => "http",
InvocationType::Function => "function",
InvocationType::Scheduled => "scheduled",
}
}
// ----------------------------------------------------------------------------
// Response parsing
// ----------------------------------------------------------------------------
fn parse_response(value: Dynamic) -> Result<(u16, BTreeMap<String, String>, Json), ExecError> {
// Convention: a Map with a `statusCode` field is the structured shape.
// Anything else is treated as a 200 response with the value as body.
if value.is_map() {
if let Some(map) = value.clone().try_cast::<Map>() {
if map.contains_key("statusCode") {
return parse_structured_response(map);
}
}
}
Ok((200, BTreeMap::new(), dynamic_to_json(&value)))
}
fn parse_structured_response(map: Map) -> Result<(u16, BTreeMap<String, String>, Json), ExecError> {
let status_dyn = map
.get("statusCode")
.ok_or_else(|| ExecError::InvalidResponse("missing statusCode".into()))?;
let status_code: i64 = status_dyn
.as_int()
.map_err(|_| ExecError::InvalidResponse("statusCode must be an integer".into()))?;
let status_code = u16::try_from(status_code)
.map_err(|_| ExecError::InvalidResponse("statusCode out of HTTP range".into()))?;
let mut headers: BTreeMap<String, String> = BTreeMap::new();
if let Some(h) = map.get("headers") {
if let Some(h_map) = h.clone().try_cast::<Map>() {
for (k, v) in h_map {
headers.insert(k.to_string(), v.to_string());
}
}
}
let body = map.get("body").map_or(Json::Null, dynamic_to_json);
Ok((status_code, headers, body))
}
// ----------------------------------------------------------------------------
// Rhai ↔ serde_json bridges
// ----------------------------------------------------------------------------
fn json_to_dynamic(value: Json) -> Dynamic {
match value {
Json::Null => Dynamic::UNIT,
Json::Bool(b) => b.into(),
Json::Number(n) => {
if let Some(i) = n.as_i64() {
i.into()
} else if let Some(f) = n.as_f64() {
f.into()
} else {
n.to_string().into()
}
}
Json::String(s) => s.into(),
Json::Array(arr) => arr
.into_iter()
.map(json_to_dynamic)
.collect::<Vec<Dynamic>>()
.into(),
Json::Object(obj) => {
let mut m = Map::new();
for (k, v) in obj {
m.insert(k.into(), json_to_dynamic(v));
}
Dynamic::from(m)
}
}
}
fn dynamic_to_json(value: &Dynamic) -> Json {
if value.is_unit() {
return Json::Null;
}
if let Ok(b) = value.as_bool() {
return Json::Bool(b);
}
if let Ok(i) = value.as_int() {
return Json::Number(i.into());
}
if let Ok(f) = value.as_float() {
return serde_json::Number::from_f64(f).map_or(Json::Null, Json::Number);
}
if value.is_string() {
return Json::String(value.clone().into_string().unwrap_or_default());
}
if let Some(arr) = value.clone().try_cast::<rhai::Array>() {
return Json::Array(arr.iter().map(dynamic_to_json).collect());
}
if let Some(map) = value.clone().try_cast::<Map>() {
let mut out = serde_json::Map::new();
for (k, v) in map {
out.insert(k.to_string(), dynamic_to_json(&v));
}
return Json::Object(out);
}
// Anything else (timestamps, custom types) — best-effort string form.
Json::String(value.to_string())
}
// ----------------------------------------------------------------------------
// Error mapping
// ----------------------------------------------------------------------------
fn map_eval_error(err: Box<EvalAltResult>) -> ExecError {
match *err {
EvalAltResult::ErrorTooManyOperations(_) => ExecError::OperationBudgetExceeded,
EvalAltResult::ErrorParsing(parse_err, _) => ExecError::Parse(parse_err.to_string()),
other => ExecError::Runtime(other.to_string()),
}
}

View File

@@ -41,7 +41,7 @@ pub struct ExecResponse {
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
Debug,
Trace,
Info,
Warn,
Error,

View File

@@ -0,0 +1,157 @@
use std::collections::BTreeMap;
use picloud_executor_core::{Engine, ExecError, ExecRequest, InvocationType, Limits, LogLevel};
use picloud_shared::{ExecutionId, RequestId, ScriptId};
use serde_json::json;
fn req(body: serde_json::Value) -> ExecRequest {
ExecRequest {
execution_id: ExecutionId::new(),
request_id: RequestId::new(),
script_id: ScriptId::new(),
script_name: "test".into(),
invocation_type: InvocationType::Http,
path: "/test".into(),
headers: BTreeMap::new(),
body,
}
}
fn engine() -> Engine {
Engine::new(Limits::default())
}
#[test]
fn validate_accepts_well_formed_script() {
engine()
.validate("let x = 1; #{ statusCode: 200, body: x }")
.expect("valid script should validate");
}
#[test]
fn validate_rejects_syntax_errors() {
let err = engine()
.validate("this is not rhai @@@")
.expect_err("invalid script should not validate");
assert!(matches!(err, ExecError::Parse(_)));
}
#[test]
fn returns_unwrapped_value_as_200_body() {
let resp = engine()
.execute("42", req(json!(null)))
.expect("should execute");
assert_eq!(resp.status_code, 200);
assert_eq!(resp.body, json!(42));
assert!(resp.headers.is_empty());
}
#[test]
fn returns_structured_response_when_status_code_present() {
let src = r#"
#{ statusCode: 201,
headers: #{ "x-test": "hello" },
body: #{ ok: true, msg: "created" } }
"#;
let resp = engine().execute(src, req(json!(null))).unwrap();
assert_eq!(resp.status_code, 201);
assert_eq!(
resp.headers.get("x-test").map(String::as_str),
Some("hello")
);
assert_eq!(resp.body, json!({ "ok": true, "msg": "created" }));
}
#[test]
fn ctx_exposes_request_data() {
let src = r"
#{ statusCode: 200,
body: #{
path: ctx.request.path,
name: ctx.script_name,
amount: ctx.request.body.amount
} }
";
let r = ExecRequest {
path: "/payments".into(),
body: json!({ "amount": 1234 }),
script_name: "payments".into(),
..req(json!(null))
};
let resp = engine().execute(src, r).unwrap();
assert_eq!(
resp.body,
json!({ "path": "/payments", "name": "payments", "amount": 1234 })
);
}
#[test]
fn captures_log_calls() {
let src = r#"
log::info("starting");
log::warn("watch out", #{ count: 3 });
log::error("oops");
log::trace("deep diagnostic");
42
"#;
let resp = engine().execute(src, req(json!(null))).unwrap();
assert_eq!(resp.logs.len(), 4);
let levels: Vec<_> = resp.logs.iter().map(|l| l.level).collect();
assert_eq!(
levels,
vec![
LogLevel::Info,
LogLevel::Warn,
LogLevel::Error,
LogLevel::Trace
]
);
assert_eq!(resp.logs[0].message, "starting");
assert_eq!(resp.logs[1].data, Some(json!({ "count": 3 })));
}
#[test]
fn enforces_operation_budget() {
let limits = Limits {
max_operations: 1_000,
..Limits::default()
};
let engine = Engine::new(limits);
// 10_000 iterations vastly exceeds 1_000 ops.
let src = r"let n = 0; for i in 0..10000 { n += 1; } n";
let err = engine
.execute(src, req(json!(null)))
.expect_err("should exceed budget");
assert!(matches!(err, ExecError::OperationBudgetExceeded));
}
#[test]
fn runtime_error_is_mapped_to_runtime_variant() {
let err = engine()
.execute("1 / 0", req(json!(null)))
.expect_err("division by zero should error");
assert!(matches!(err, ExecError::Runtime(_)));
}
#[test]
fn module_import_is_blocked() {
let err = engine()
.execute(r#"import "evil" as e; 1"#, req(json!(null)))
.expect_err("imports should be blocked");
// Module-not-found is reported as a runtime error via DummyModuleResolver.
assert!(matches!(err, ExecError::Runtime(_) | ExecError::Parse(_)));
}
#[test]
fn body_passes_through_nested_json_round_trip() {
let src = "#{ statusCode: 200, body: ctx.request.body }";
let body = json!({
"deep": {
"list": [1, "two", 3.5, null, true, { "k": "v" }],
"count": 6
}
});
let resp = engine().execute(src, req(body.clone())).unwrap();
assert_eq!(resp.body, body);
}

View File

@@ -13,6 +13,7 @@ picloud-shared.workspace = true
picloud-orchestrator-core.workspace = true
async-trait.workspace = true
axum.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true

View File

@@ -0,0 +1,43 @@
-- pgcrypto provides gen_random_uuid(). hstore is not needed yet (v1.1+
-- KV service); leave it for the migration that introduces that feature.
CREATE EXTENSION IF NOT EXISTS pgcrypto;
CREATE TABLE scripts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
description TEXT,
version INTEGER NOT NULL DEFAULT 1,
source TEXT NOT NULL,
timeout_seconds INTEGER NOT NULL DEFAULT 30 CHECK (timeout_seconds > 0 AND timeout_seconds <= 300),
memory_limit_mb INTEGER NOT NULL DEFAULT 256 CHECK (memory_limit_mb > 0 AND memory_limit_mb <= 2048),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Names are user-facing; unique so the dashboard list and any future
-- name-based routing have an obvious identifier to surface.
CREATE UNIQUE INDEX scripts_name_uidx ON scripts (LOWER(name));
CREATE TABLE execution_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
script_id UUID NOT NULL REFERENCES scripts(id) ON DELETE CASCADE,
request_id UUID NOT NULL,
request_path TEXT,
request_headers JSONB NOT NULL DEFAULT '{}'::jsonb,
request_body JSONB,
response_code INTEGER,
response_body JSONB,
logs JSONB NOT NULL DEFAULT '[]'::jsonb,
duration_ms INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL CHECK (status IN ('success','error','timeout','budget_exceeded')),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX execution_logs_script_id_created_at_idx
ON execution_logs (script_id, created_at DESC);

View File

@@ -0,0 +1,197 @@
//! Control-plane HTTP surface. Mounted by the `picloud` all-in-one
//! binary under `/api/admin` and by the future split `picloud-manager`
//! binary at its own root.
use std::sync::Arc;
use axum::{
extract::{Path, State},
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
Json, Router,
};
use picloud_shared::{Script, ScriptId, ScriptValidator, ValidationError};
use serde::Deserialize;
use crate::repo::{NewScript, ScriptPatch, ScriptRepository, ScriptRepositoryError};
/// State shared by control-plane handlers. Separates concerns so the
/// manager can validate at upload time without depending on the
/// concrete executor-core types.
pub struct AdminState<R> {
pub repo: Arc<R>,
pub validator: Arc<dyn ScriptValidator>,
}
impl<R> Clone for AdminState<R> {
fn clone(&self) -> Self {
Self {
repo: self.repo.clone(),
validator: self.validator.clone(),
}
}
}
/// Build the admin router. The caller (binary) chooses where to mount
/// it (typically `Router::new().nest("/api/admin", admin_router(state))`).
pub fn admin_router<R: ScriptRepository + 'static>(state: AdminState<R>) -> Router {
Router::new()
.route("/scripts", get(list_scripts::<R>).post(create_script::<R>))
.route(
"/scripts/{id}",
get(get_script::<R>)
.put(update_script::<R>)
.delete(delete_script::<R>),
)
.with_state(state)
}
// ----------------------------------------------------------------------------
// DTOs
// ----------------------------------------------------------------------------
#[derive(Debug, Deserialize)]
pub struct CreateScriptRequest {
pub name: String,
pub description: Option<String>,
pub source: String,
pub timeout_seconds: Option<i32>,
pub memory_limit_mb: Option<i32>,
}
#[derive(Debug, Deserialize)]
pub struct UpdateScriptRequest {
pub name: Option<String>,
// Double Option lets clients explicitly clear the description by
// sending `"description": null`; an absent field leaves it alone.
#[serde(default, deserialize_with = "deserialize_optional_optional")]
#[allow(clippy::option_option)]
pub description: Option<Option<String>>,
pub source: Option<String>,
pub timeout_seconds: Option<i32>,
pub memory_limit_mb: Option<i32>,
}
#[allow(clippy::option_option)]
fn deserialize_optional_optional<'de, D>(d: D) -> Result<Option<Option<String>>, D::Error>
where
D: serde::Deserializer<'de>,
{
Option::<String>::deserialize(d).map(Some)
}
// ----------------------------------------------------------------------------
// Handlers
// ----------------------------------------------------------------------------
async fn list_scripts<R: ScriptRepository>(
State(state): State<AdminState<R>>,
) -> Result<Json<Vec<Script>>, ApiError> {
Ok(Json(state.repo.list().await?))
}
async fn get_script<R: ScriptRepository>(
State(state): State<AdminState<R>>,
Path(id): Path<ScriptId>,
) -> Result<Json<Script>, ApiError> {
state
.repo
.get(id)
.await?
.map(Json)
.ok_or(ApiError::NotFound(id))
}
async fn create_script<R: ScriptRepository>(
State(state): State<AdminState<R>>,
Json(input): Json<CreateScriptRequest>,
) -> Result<(StatusCode, Json<Script>), ApiError> {
state.validator.validate(&input.source)?;
let created = state
.repo
.create(NewScript {
name: input.name,
description: input.description,
source: input.source,
timeout_seconds: input.timeout_seconds,
memory_limit_mb: input.memory_limit_mb,
})
.await?;
Ok((StatusCode::CREATED, Json(created)))
}
async fn update_script<R: ScriptRepository>(
State(state): State<AdminState<R>>,
Path(id): Path<ScriptId>,
Json(input): Json<UpdateScriptRequest>,
) -> Result<Json<Script>, ApiError> {
if let Some(src) = input.source.as_deref() {
state.validator.validate(src)?;
}
let updated = state
.repo
.update(
id,
ScriptPatch {
name: input.name,
description: input.description,
source: input.source,
timeout_seconds: input.timeout_seconds,
memory_limit_mb: input.memory_limit_mb,
},
)
.await?;
Ok(Json(updated))
}
async fn delete_script<R: ScriptRepository>(
State(state): State<AdminState<R>>,
Path(id): Path<ScriptId>,
) -> Result<StatusCode, ApiError> {
state.repo.delete(id).await?;
Ok(StatusCode::NO_CONTENT)
}
// ----------------------------------------------------------------------------
// Errors
// ----------------------------------------------------------------------------
#[derive(Debug, thiserror::Error)]
pub enum ApiError {
#[error("script not found: {0}")]
NotFound(ScriptId),
#[error("conflict: {0}")]
Conflict(String),
#[error("invalid script: {0}")]
Invalid(#[from] ValidationError),
#[error("repository error: {0}")]
Repo(#[from] ScriptRepositoryError),
}
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
let (status, message) = match &self {
Self::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
Self::Conflict(_) => (StatusCode::CONFLICT, self.to_string()),
Self::Invalid(_) => (StatusCode::UNPROCESSABLE_ENTITY, self.to_string()),
Self::Repo(ScriptRepositoryError::NotFound(_)) => {
(StatusCode::NOT_FOUND, self.to_string())
}
Self::Repo(ScriptRepositoryError::Conflict(_)) => {
(StatusCode::CONFLICT, self.to_string())
}
Self::Repo(ScriptRepositoryError::Db(e)) => {
tracing::error!(error = %e, "manager db error");
(
StatusCode::INTERNAL_SERVER_ERROR,
"internal error".to_string(),
)
}
};
(status, Json(serde_json::json!({ "error": message }))).into_response()
}
}

View File

@@ -4,7 +4,13 @@
//! the same DB for now; once we add caching and per-node ingress, the
//! manager will publish change events.
pub mod api;
pub mod migrations;
pub mod repo;
pub mod scheduler;
pub use repo::{PostgresScriptRepository, ScriptRepository, ScriptRepositoryError};
pub use api::{admin_router, AdminState};
pub use repo::{
NewScript, PostgresScriptRepository, RepoResolver, ScriptPatch, ScriptRepository,
ScriptRepositoryError,
};

View File

@@ -0,0 +1,9 @@
//! Embedded SQL migrations. Runs against the manager's `PgPool` at
//! startup. New migrations live in `crates/manager-core/migrations/`
//! and follow the `NNNN_description.sql` convention.
use sqlx::PgPool;
pub async fn run(pool: &PgPool) -> Result<(), sqlx::migrate::MigrateError> {
sqlx::migrate!("./migrations").run(pool).await
}

View File

@@ -10,6 +10,9 @@ pub enum ScriptRepositoryError {
#[error("not found: {0}")]
NotFound(ScriptId),
#[error("conflict: {0}")]
Conflict(String),
}
/// CRUD over the `scripts` table.
@@ -17,11 +20,36 @@ pub enum ScriptRepositoryError {
pub trait ScriptRepository: Send + Sync {
async fn get(&self, id: ScriptId) -> Result<Option<Script>, ScriptRepositoryError>;
async fn list(&self) -> Result<Vec<Script>, ScriptRepositoryError>;
async fn create(&self, script: &Script) -> Result<(), ScriptRepositoryError>;
async fn update(&self, script: &Script) -> Result<(), ScriptRepositoryError>;
async fn create(&self, input: NewScript) -> Result<Script, ScriptRepositoryError>;
async fn update(
&self,
id: ScriptId,
patch: ScriptPatch,
) -> Result<Script, ScriptRepositoryError>;
async fn delete(&self, id: ScriptId) -> Result<(), ScriptRepositoryError>;
}
/// Inbound shape for create. Defaults match the migration's CHECK
/// constraints; the repo enforces them in the DB regardless.
#[derive(Debug, Clone)]
pub struct NewScript {
pub name: String,
pub description: Option<String>,
pub source: String,
pub timeout_seconds: Option<i32>,
pub memory_limit_mb: Option<i32>,
}
/// Inbound shape for update. `None` fields are left untouched.
#[derive(Debug, Clone, Default)]
pub struct ScriptPatch {
pub name: Option<String>,
pub description: Option<Option<String>>,
pub source: Option<String>,
pub timeout_seconds: Option<i32>,
pub memory_limit_mb: Option<i32>,
}
pub struct PostgresScriptRepository {
pool: PgPool,
}
@@ -31,37 +59,145 @@ impl PostgresScriptRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
#[must_use]
pub fn pool(&self) -> &PgPool {
&self.pool
}
}
// Real query bodies land alongside the first migration. Stubbing the trait
// impl so the workspace compiles and the seam is visible.
#[async_trait]
impl ScriptRepository for PostgresScriptRepository {
async fn get(&self, _id: ScriptId) -> Result<Option<Script>, ScriptRepositoryError> {
let _ = &self.pool;
Ok(None)
async fn get(&self, id: ScriptId) -> Result<Option<Script>, ScriptRepositoryError> {
let row = sqlx::query_as::<_, ScriptRow>(
"SELECT id, name, description, version, source, \
timeout_seconds, memory_limit_mb, created_at, updated_at \
FROM scripts WHERE id = $1",
)
.bind(id.into_inner())
.fetch_optional(&self.pool)
.await?;
Ok(row.map(Into::into))
}
async fn list(&self) -> Result<Vec<Script>, ScriptRepositoryError> {
Ok(Vec::new())
let rows = sqlx::query_as::<_, ScriptRow>(
"SELECT id, name, description, version, source, \
timeout_seconds, memory_limit_mb, created_at, updated_at \
FROM scripts ORDER BY name",
)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(Into::into).collect())
}
async fn create(&self, _script: &Script) -> Result<(), ScriptRepositoryError> {
Ok(())
async fn create(&self, input: NewScript) -> Result<Script, ScriptRepositoryError> {
let res = sqlx::query_as::<_, ScriptRow>(
"INSERT INTO scripts (name, description, source, timeout_seconds, memory_limit_mb) \
VALUES ($1, $2, $3, COALESCE($4, 30), COALESCE($5, 256)) \
RETURNING id, name, description, version, source, \
timeout_seconds, memory_limit_mb, created_at, updated_at",
)
.bind(&input.name)
.bind(input.description.as_deref())
.bind(&input.source)
.bind(input.timeout_seconds)
.bind(input.memory_limit_mb)
.fetch_one(&self.pool)
.await;
match res {
Ok(row) => Ok(row.into()),
Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
Err(ScriptRepositoryError::Conflict(format!(
"a script named {:?} already exists",
input.name
)))
}
Err(e) => Err(e.into()),
}
}
async fn update(&self, _script: &Script) -> Result<(), ScriptRepositoryError> {
Ok(())
async fn update(
&self,
id: ScriptId,
patch: ScriptPatch,
) -> Result<Script, ScriptRepositoryError> {
// COALESCE-based partial update: `NULL` parameters leave columns
// untouched. Description is double-Optioned so callers can
// explicitly set it to NULL (Some(None)) vs leave it alone (None).
let row = sqlx::query_as::<_, ScriptRow>(
"UPDATE scripts SET \
name = COALESCE($2, name), \
description = CASE WHEN $3::bool THEN $4 ELSE description END, \
source = COALESCE($5, source), \
timeout_seconds = COALESCE($6, timeout_seconds), \
memory_limit_mb = COALESCE($7, memory_limit_mb), \
version = version + 1, \
updated_at = NOW() \
WHERE id = $1 \
RETURNING id, name, description, version, source, \
timeout_seconds, memory_limit_mb, created_at, updated_at",
)
.bind(id.into_inner())
.bind(patch.name.as_deref())
.bind(patch.description.is_some())
.bind(patch.description.as_ref().and_then(|d| d.as_deref()))
.bind(patch.source.as_deref())
.bind(patch.timeout_seconds)
.bind(patch.memory_limit_mb)
.fetch_optional(&self.pool)
.await?;
row.map(Into::into)
.ok_or(ScriptRepositoryError::NotFound(id))
}
async fn delete(&self, _id: ScriptId) -> Result<(), ScriptRepositoryError> {
async fn delete(&self, id: ScriptId) -> Result<(), ScriptRepositoryError> {
let res = sqlx::query("DELETE FROM scripts WHERE id = $1")
.bind(id.into_inner())
.execute(&self.pool)
.await?;
if res.rows_affected() == 0 {
return Err(ScriptRepositoryError::NotFound(id));
}
Ok(())
}
}
/// Row shape mirroring the `scripts` table for sqlx FromRow.
#[derive(sqlx::FromRow)]
struct ScriptRow {
id: uuid::Uuid,
name: String,
description: Option<String>,
version: i32,
source: String,
timeout_seconds: i32,
memory_limit_mb: i32,
created_at: chrono::DateTime<chrono::Utc>,
updated_at: chrono::DateTime<chrono::Utc>,
}
impl From<ScriptRow> for Script {
fn from(r: ScriptRow) -> Self {
Self {
id: r.id.into(),
name: r.name,
description: r.description,
version: r.version,
source: r.source,
timeout_seconds: u32::try_from(r.timeout_seconds).unwrap_or(30),
memory_limit_mb: u32::try_from(r.memory_limit_mb).unwrap_or(256),
created_at: r.created_at,
updated_at: r.updated_at,
}
}
}
/// Adapts a `ScriptRepository` into the `ScriptResolver` trait the
/// orchestrator depends on, so we don't pull the manager into the
/// orchestrator's dependency graph.
/// orchestrator depends on. Keeps orchestrator-core unaware of how
/// scripts are stored.
pub struct RepoResolver<R: ScriptRepository> {
repo: R,
}

View File

@@ -13,6 +13,7 @@ picloud-shared.workspace = true
picloud-executor-core.workspace = true
async-trait.workspace = true
axum.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
@@ -20,3 +21,4 @@ tracing.workspace = true
uuid.workspace = true
chrono.workspace = true
reqwest.workspace = true
tokio.workspace = true

View File

@@ -0,0 +1,180 @@
//! Data-plane HTTP surface. Mounted by the `picloud` all-in-one binary
//! under `/api` (so the path becomes `/api/execute/:id`) and by the
//! future split `picloud-orchestrator` binary at its own root.
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use axum::{
body::Bytes,
extract::{Path, State},
http::{HeaderMap, HeaderName, HeaderValue, StatusCode},
response::{IntoResponse, Response},
routing::post,
Json, Router,
};
use picloud_executor_core::{ExecError, ExecRequest, InvocationType};
use picloud_shared::{ExecutionId, RequestId, ScriptId};
use serde_json::Value as Json_;
use crate::client::ExecutorClient;
use crate::resolver::{ResolverError, ScriptResolver};
/// State shared by data-plane handlers.
///
/// Both fields are `Arc` because handlers run concurrently; the
/// underlying impls are `Send + Sync` (enforced by their traits).
pub struct DataPlaneState<E, R> {
pub executor: Arc<E>,
pub resolver: Arc<R>,
}
impl<E, R> Clone for DataPlaneState<E, R> {
fn clone(&self) -> Self {
Self {
executor: self.executor.clone(),
resolver: self.resolver.clone(),
}
}
}
/// Build the data-plane router. Handles `POST /execute/:id`.
pub fn data_plane_router<E, R>(state: DataPlaneState<E, R>) -> Router
where
E: ExecutorClient + 'static,
R: ScriptResolver + 'static,
{
Router::new()
.route("/execute/{id}", post(execute_by_id::<E, R>))
.with_state(state)
}
// ----------------------------------------------------------------------------
// Handlers
// ----------------------------------------------------------------------------
async fn execute_by_id<E, R>(
State(state): State<DataPlaneState<E, R>>,
Path(id): Path<ScriptId>,
headers: HeaderMap,
body: Bytes,
) -> Result<Response, ApiError>
where
E: ExecutorClient + 'static,
R: ScriptResolver + 'static,
{
let script = state
.resolver
.resolve(id)
.await?
.ok_or(ApiError::NotFound(id))?;
let req = build_exec_request(id, &script.name, &headers, &body)?;
let timeout = Duration::from_secs(u64::from(script.timeout_seconds));
let resp = state.executor.execute(&script.source, req, timeout).await?;
Ok(exec_response_to_http(resp))
}
// ----------------------------------------------------------------------------
// Marshalling
// ----------------------------------------------------------------------------
fn build_exec_request(
id: ScriptId,
name: &str,
headers: &HeaderMap,
body: &Bytes,
) -> Result<ExecRequest, ApiError> {
let mut hmap = BTreeMap::new();
for (k, v) in headers {
if let Ok(s) = v.to_str() {
hmap.insert(k.as_str().to_string(), s.to_string());
}
}
let body_json: Json_ = if body.is_empty() {
Json_::Null
} else {
serde_json::from_slice(body)
.map_err(|e| ApiError::BadRequest(format!("invalid JSON body: {e}")))?
};
Ok(ExecRequest {
execution_id: ExecutionId::new(),
request_id: RequestId::new(),
script_id: id,
script_name: name.to_string(),
invocation_type: InvocationType::Http,
path: format!("/api/execute/{id}"),
headers: hmap,
body: body_json,
})
}
fn exec_response_to_http(resp: picloud_executor_core::ExecResponse) -> Response {
let status =
StatusCode::from_u16(resp.status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let mut http_headers = HeaderMap::new();
for (k, v) in resp.headers {
if let (Ok(name), Ok(value)) = (k.parse::<HeaderName>(), v.parse::<HeaderValue>()) {
http_headers.insert(name, value);
}
}
// Default content type to JSON; the script can override via `headers`.
http_headers
.entry(axum::http::header::CONTENT_TYPE)
.or_insert_with(|| HeaderValue::from_static("application/json"));
(status, http_headers, Json(resp.body)).into_response()
}
// ----------------------------------------------------------------------------
// Errors
// ----------------------------------------------------------------------------
#[derive(Debug, thiserror::Error)]
pub enum ApiError {
#[error("script not found: {0}")]
NotFound(ScriptId),
#[error("bad request: {0}")]
BadRequest(String),
#[error("resolver error: {0}")]
Resolver(#[from] ResolverError),
#[error("execution error: {0}")]
Exec(#[from] ExecError),
}
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
use ApiError as E;
let (status, message) = match &self {
E::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
E::BadRequest(_) => (StatusCode::BAD_REQUEST, self.to_string()),
E::Resolver(e) => {
tracing::error!(error = %e, "resolver failure");
(
StatusCode::INTERNAL_SERVER_ERROR,
"internal error".to_string(),
)
}
E::Exec(e) => match e {
ExecError::Parse(_) | ExecError::InvalidResponse(_) => {
(StatusCode::UNPROCESSABLE_ENTITY, e.to_string())
}
ExecError::Timeout(_) => (StatusCode::GATEWAY_TIMEOUT, e.to_string()),
ExecError::OperationBudgetExceeded => {
(StatusCode::INSUFFICIENT_STORAGE, e.to_string())
}
ExecError::Runtime(_) => (StatusCode::BAD_GATEWAY, e.to_string()),
},
};
(status, Json(serde_json::json!({ "error": message }))).into_response()
}
}

View File

@@ -1,20 +1,35 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use picloud_executor_core::{Engine, ExecError, ExecRequest, ExecResponse};
/// Maximum wall-clock time we'll wait for a single invocation, regardless
/// of the per-script `timeout_seconds`. Provides a hard ceiling on
/// resource usage independent of misconfigured scripts.
const HARD_TIMEOUT_CAP: Duration = Duration::from_secs(300);
/// The seam between the orchestrator and the executor.
///
/// Single-node mode plugs in `LocalExecutorClient`, which calls
/// `executor-core` in-process. Cluster mode plugs in `RemoteExecutorClient`,
/// which forwards over HTTP to an executor node. Everything else in
/// orchestrator-core depends only on this trait.
/// `executor-core` in-process via `spawn_blocking`. Cluster mode plugs
/// in `RemoteExecutorClient`, which forwards over HTTP to an executor
/// node. Everything else in orchestrator-core depends only on this trait.
#[async_trait]
pub trait ExecutorClient: Send + Sync {
async fn execute(&self, source: &str, req: ExecRequest) -> Result<ExecResponse, ExecError>;
async fn execute(
&self,
source: &str,
req: ExecRequest,
timeout: Duration,
) -> Result<ExecResponse, ExecError>;
}
/// In-process executor — wraps `executor-core::Engine` directly.
///
/// `executor-core::Engine::execute` is synchronous; we offload it to a
/// blocking thread so it doesn't park a Tokio worker, and apply the
/// wall-clock timeout here.
pub struct LocalExecutorClient {
engine: Arc<Engine>,
}
@@ -28,8 +43,26 @@ impl LocalExecutorClient {
#[async_trait]
impl ExecutorClient for LocalExecutorClient {
async fn execute(&self, source: &str, req: ExecRequest) -> Result<ExecResponse, ExecError> {
self.engine.execute(source, req).await
async fn execute(
&self,
source: &str,
req: ExecRequest,
timeout: Duration,
) -> Result<ExecResponse, ExecError> {
let timeout = timeout.min(HARD_TIMEOUT_CAP);
let timeout_secs = u32::try_from(timeout.as_secs()).unwrap_or(u32::MAX);
let engine = self.engine.clone();
let source = source.to_string();
let join = tokio::task::spawn_blocking(move || engine.execute(&source, req));
match tokio::time::timeout(timeout, join).await {
Err(_) => Err(ExecError::Timeout(timeout_secs)),
Ok(Err(join_err)) => Err(ExecError::Runtime(format!(
"execution task panicked: {join_err}"
))),
Ok(Ok(res)) => res,
}
}
}
@@ -53,7 +86,12 @@ impl RemoteExecutorClient {
#[async_trait]
impl ExecutorClient for RemoteExecutorClient {
async fn execute(&self, _source: &str, _req: ExecRequest) -> Result<ExecResponse, ExecError> {
async fn execute(
&self,
_source: &str,
_req: ExecRequest,
_timeout: Duration,
) -> Result<ExecResponse, ExecError> {
Err(ExecError::Runtime(
"RemoteExecutorClient not implemented (cluster mode is v1.3+)".into(),
))

View File

@@ -8,8 +8,10 @@
//! trait is the seam that lets the orchestrator call executor logic
//! in-process (single-node) or over HTTP (cluster).
pub mod api;
pub mod client;
pub mod resolver;
pub use api::{data_plane_router, DataPlaneState};
pub use client::{ExecutorClient, LocalExecutorClient, RemoteExecutorClient};
pub use resolver::{ResolverError, ScriptResolver};

View File

@@ -19,9 +19,11 @@ picloud-orchestrator-core.workspace = true
picloud-manager-core.workspace = true
tokio.workspace = true
async-trait.workspace = true
axum.workspace = true
tower.workspace = true
tower-http.workspace = true
sqlx.workspace = true
serde.workspace = true
serde_json.workspace = true
anyhow.workspace = true

View File

@@ -1,35 +1,97 @@
//! PiCloud all-in-one binary — runs manager + orchestrator + executor in
//! one process. This is the only binary built for MVP. The split binaries
//! (`picloud-manager`, `picloud-orchestrator`, `picloud-executor`) exist
//! to enforce the crate boundaries and will be fleshed out in v1.3+
//! when cluster mode is built.
//! PiCloud all-in-one binary — manager + orchestrator + executor in
//! one process. The only binary built for MVP.
//!
//! On startup it opens the Postgres pool, runs migrations, builds the
//! Rhai engine, then nests both core routers behind a single Axum
//! listener:
//!
//! /api/admin/* → manager-core (script CRUD)
//! /api/execute/{id} → orchestrator-core (data plane)
//! /healthz → liveness probe
//!
//! Cluster-mode (v1.3+) keeps this layout — splits each nested router
//! into its own binary, swaps `LocalExecutorClient` for the remote one,
//! and points Caddy at the new upstreams.
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use axum::{routing::get, Router};
use std::net::SocketAddr;
use picloud_executor_core::{Engine, Limits};
use picloud_manager_core::{
admin_router, migrations, AdminState, PostgresScriptRepository, RepoResolver,
};
use picloud_orchestrator_core::{data_plane_router, DataPlaneState, LocalExecutorClient};
use picloud_shared::ScriptValidator;
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use tower_http::trace::TraceLayer;
use tracing_subscriber::EnvFilter;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()))
.json()
.init();
let app = Router::new()
.route("/healthz", get(healthz))
.route("/", get(root));
init_tracing();
let addr: SocketAddr = std::env::var("PICLOUD_BIND")
.unwrap_or_else(|_| "0.0.0.0:8080".into())
.parse()?;
let database_url =
std::env::var("DATABASE_URL").map_err(|_| anyhow::anyhow!("DATABASE_URL is required"))?;
let pool = init_db(&database_url).await?;
migrations::run(&pool).await?;
tracing::info!("migrations applied");
let app = build_app(pool);
let listener = tokio::net::TcpListener::bind(addr).await?;
tracing::info!(%addr, "picloud all-in-one listening");
axum::serve(listener, app).await?;
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await?;
Ok(())
}
fn init_tracing() {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| "info".into()))
.json()
.init();
}
async fn init_db(url: &str) -> anyhow::Result<PgPool> {
let pool = PgPoolOptions::new()
.max_connections(10)
.acquire_timeout(Duration::from_secs(5))
.connect(url)
.await?;
Ok(pool)
}
fn build_app(pool: PgPool) -> Router {
// Core services. The `Arc`s let the routers and any background
// tasks share the same instances cheaply.
let engine = Arc::new(Engine::new(Limits::default()));
let repo = Arc::new(PostgresScriptRepository::new(pool));
let resolver = Arc::new(RepoResolver::new(PostgresScriptRepoHandle(repo.clone())));
let executor = Arc::new(LocalExecutorClient::new(engine.clone()));
let admin = AdminState {
repo: Arc::new(PostgresScriptRepoHandle(repo)),
validator: engine as Arc<dyn ScriptValidator>,
};
let data_plane = DataPlaneState { executor, resolver };
Router::new()
.route("/healthz", get(healthz))
.route("/", get(root))
.nest("/api/admin", admin_router(admin))
.nest("/api", data_plane_router(data_plane))
.layer(TraceLayer::new_for_http())
}
async fn healthz() -> &'static str {
"ok"
}
@@ -37,3 +99,66 @@ async fn healthz() -> &'static str {
async fn root() -> &'static str {
"picloud — see /api/admin/* (manager) and /api/execute/* (orchestrator)"
}
async fn shutdown_signal() {
let ctrl_c = async {
let _ = tokio::signal::ctrl_c().await;
};
#[cfg(unix)]
let terminate = async {
if let Ok(mut s) = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
{
s.recv().await;
}
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
() = ctrl_c => tracing::info!("ctrl-c received, draining"),
() = terminate => tracing::info!("SIGTERM received, draining"),
}
}
// ----------------------------------------------------------------------------
// Bridge: PostgresScriptRepository is constructed once and shared via
// Arc; `RepoResolver` wants ownership of an impl of `ScriptRepository`.
// We pass a thin wrapper that delegates to the Arc'd repo, so a single
// connection pool backs both the admin router and the resolver.
// ----------------------------------------------------------------------------
struct PostgresScriptRepoHandle(Arc<PostgresScriptRepository>);
#[async_trait::async_trait]
impl picloud_manager_core::ScriptRepository for PostgresScriptRepoHandle {
async fn get(
&self,
id: picloud_shared::ScriptId,
) -> Result<Option<picloud_shared::Script>, picloud_manager_core::ScriptRepositoryError> {
self.0.get(id).await
}
async fn list(
&self,
) -> Result<Vec<picloud_shared::Script>, picloud_manager_core::ScriptRepositoryError> {
self.0.list().await
}
async fn create(
&self,
input: picloud_manager_core::NewScript,
) -> Result<picloud_shared::Script, picloud_manager_core::ScriptRepositoryError> {
self.0.create(input).await
}
async fn update(
&self,
id: picloud_shared::ScriptId,
patch: picloud_manager_core::ScriptPatch,
) -> Result<picloud_shared::Script, picloud_manager_core::ScriptRepositoryError> {
self.0.update(id, patch).await
}
async fn delete(
&self,
id: picloud_shared::ScriptId,
) -> Result<(), picloud_manager_core::ScriptRepositoryError> {
self.0.delete(id).await
}
}

View File

@@ -7,7 +7,9 @@
pub mod error;
pub mod ids;
pub mod script;
pub mod validator;
pub use error::Error;
pub use ids::{ExecutionId, RequestId, ScriptId};
pub use script::Script;
pub use validator::{ScriptValidator, ValidationError};

View File

@@ -0,0 +1,17 @@
//! Abstraction for parse-time script validation.
//!
//! Lives in `shared` so the manager can call into validation logic
//! without taking a hard dep on the executor crate. The executor-core
//! crate provides the impl by registering `Engine` as a `ScriptValidator`.
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ValidationError {
#[error("invalid script source: {0}")]
Syntax(String),
}
pub trait ScriptValidator: Send + Sync {
fn validate(&self, source: &str) -> Result<(), ValidationError>;
}