From 4f044e7b817aa86f2e918eca044e99b792c03ab4 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 23 May 2026 00:00:36 +0200 Subject: [PATCH] feat: end-to-end script CRUD + Rhai execution MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Brings the MVP feature set online: upload a Rhai script, get an HTTP endpoint that runs it sandboxed in-process, list/update/delete it, and have invalid sources rejected at upload time. Verified live through Caddy with a full lifecycle (`create → list → get → execute → update → delete`) plus error paths (syntax error, duplicate name, deleted). Layout — every concern lands behind the trait seam its layer owns, so cluster-mode in v1.3+ is a swap of two impls, not a rewrite: * shared::ScriptValidator — manager calls into validation without a hard dep on executor-core; executor-core impls the trait on `Engine`. Pinned in shared so neither crate has to know about the other. * executor-core::Engine — real Rhai engine: sandbox limits (max operations / string size / map size / call depth), disabled `print`, blocked `import` (DummyModuleResolver), `log::trace /info/warn/error` registered as a static module with shared log-capture buffer (no `log::debug` because `debug` is a Rhai reserved keyword — `log::trace` covers the same need). - `ctx` is pushed as a Scope constant exposing execution_id, script_id, script_name, request_id, invocation_type, request.{path,headers,body}. - Response convention: a Map with `statusCode` is the structured shape (`{statusCode, headers?, body}`); any other return value is a 200 with the value as the body. - Engine::execute is now synchronous (pure compute); the async wrapper + wall-clock timeout live in LocalExecutorClient, which spawns_blocking and applies a 300s hard ceiling regardless of per-script config. - 10 unit tests cover validate, exec, structured response, ctx exposure, log capture, op-budget enforcement, runtime errors, blocked imports, JSON round-tripping. * manager-core::repo — full sqlx CRUD over the `scripts` table, with proper unique-violation handling for duplicate names. Embedded migrations via `sqlx::migrate!` (one initial `0001_init.sql` for pgcrypto + scripts + execution_logs). * manager-core::api — `admin_router` mounts `/scripts` and `/scripts/{id}`. Create + Update validate source through the injected `ScriptValidator` before persistence. Returns proper 422/409/404 status codes via `ApiError::IntoResponse`. * orchestrator-core::api — `data_plane_router` mounts `/execute/{id}`: resolves the script through `ScriptResolver`, constructs the `ExecRequest` from headers+body, awaits `ExecutorClient::execute(..., timeout)`, translates the `ExecResponse` to an axum `Response` with header passthrough. Maps `ExecError` variants to 422/504/502/507. * picloud all-in-one — opens the pool, runs migrations, builds one engine, nests both routers under `/api/admin` and `/api`, enables structured JSON tracing and graceful shutdown on SIGTERM. Single `PostgresScriptRepository` Arc is shared by the admin router (writes) and the resolver (reads). Other changes: * Workspace axum bump 0.7 → 0.8 for the `{id}` path syntax matching the route definitions. * Workspace clippy: allow `needless_pass_by_value` and `boxed_local` to keep API ergonomics over pedantic noise. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 26 +- Cargo.toml | 6 +- crates/executor-core/src/engine.rs | 321 +++++++++++++++++-- crates/executor-core/src/types.rs | 2 +- crates/executor-core/tests/engine.rs | 157 +++++++++ crates/manager-core/Cargo.toml | 1 + crates/manager-core/migrations/0001_init.sql | 43 +++ crates/manager-core/src/api.rs | 197 ++++++++++++ crates/manager-core/src/lib.rs | 8 +- crates/manager-core/src/migrations.rs | 9 + crates/manager-core/src/repo.rs | 166 +++++++++- crates/orchestrator-core/Cargo.toml | 2 + crates/orchestrator-core/src/api.rs | 180 +++++++++++ crates/orchestrator-core/src/client.rs | 52 ++- crates/orchestrator-core/src/lib.rs | 2 + crates/picloud/Cargo.toml | 2 + crates/picloud/src/main.rs | 155 ++++++++- crates/shared/src/lib.rs | 2 + crates/shared/src/validator.rs | 17 + 19 files changed, 1272 insertions(+), 76 deletions(-) create mode 100644 crates/executor-core/tests/engine.rs create mode 100644 crates/manager-core/migrations/0001_init.sql create mode 100644 crates/manager-core/src/api.rs create mode 100644 crates/manager-core/src/migrations.rs create mode 100644 crates/orchestrator-core/src/api.rs create mode 100644 crates/shared/src/validator.rs diff --git a/Cargo.lock b/Cargo.lock index eaf60ef..6954294 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -89,13 +89,13 @@ checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53" [[package]] name = "axum" -version = "0.7.9" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" dependencies = [ - "async-trait", "axum-core", "bytes", + "form_urlencoded", "futures-util", "http", "http-body", @@ -108,8 +108,7 @@ dependencies = [ "mime", "percent-encoding", "pin-project-lite", - "rustversion", - "serde", + "serde_core", "serde_json", "serde_path_to_error", "serde_urlencoded", @@ -123,19 +122,17 @@ dependencies = [ [[package]] name = "axum-core" -version = "0.4.5" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" dependencies = [ - "async-trait", "bytes", - "futures-util", + "futures-core", "http", "http-body", "http-body-util", "mime", "pin-project-lite", - "rustversion", "sync_wrapper", "tower-layer", "tower-service", @@ -1014,9 +1011,9 @@ dependencies = [ [[package]] name = "matchit" -version = "0.7.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" [[package]] name = "md-5" @@ -1196,6 +1193,7 @@ name = "picloud" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "axum", "figment", "picloud-executor-core", @@ -1204,6 +1202,7 @@ dependencies = [ "picloud-shared", "serde", "serde_json", + "sqlx", "thiserror 1.0.69", "tokio", "tower", @@ -1255,6 +1254,7 @@ name = "picloud-manager-core" version = "0.1.0" dependencies = [ "async-trait", + "axum", "chrono", "picloud-orchestrator-core", "picloud-shared", @@ -1283,6 +1283,7 @@ name = "picloud-orchestrator-core" version = "0.1.0" dependencies = [ "async-trait", + "axum", "chrono", "picloud-executor-core", "picloud-shared", @@ -1290,6 +1291,7 @@ dependencies = [ "serde", "serde_json", "thiserror 1.0.69", + "tokio", "tracing", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 50ed9ee..2719b5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ picloud-manager-core = { path = "crates/manager-core" } # Async + HTTP tokio = { version = "1.40", features = ["full"] } -axum = "0.7" +axum = "0.8" tower = "0.5" tower-http = { version = "0.6", features = ["trace", "cors"] } hyper = "1" @@ -71,6 +71,10 @@ module_name_repetitions = "allow" missing_errors_doc = "allow" missing_panics_doc = "allow" doc_markdown = "allow" +# API ergonomics: we deliberately take values by ownership for owned +# inputs (e.g. ExecRequest) and accept Rhai's Box as-is. +needless_pass_by_value = "allow" +boxed_local = "allow" [profile.release] lto = "thin" diff --git a/crates/executor-core/src/engine.rs b/crates/executor-core/src/engine.rs index 94eda21..2193f82 100644 --- a/crates/executor-core/src/engine.rs +++ b/crates/executor-core/src/engine.rs @@ -1,15 +1,26 @@ +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; +use std::time::Instant; + +use chrono::Utc; +use picloud_shared::{ScriptValidator, ValidationError}; +use rhai::{Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module, Scope}; +use serde_json::Value as Json; + use crate::sandbox::Limits; -use crate::types::{ExecError, ExecRequest, ExecResponse}; +use crate::types::{ + ExecError, ExecRequest, ExecResponse, ExecStats, InvocationType, LogEntry, LogLevel, +}; /// Preconfigured Rhai engine with sandbox limits applied. /// /// One `Engine` is constructed at process startup and reused across -/// invocations. `execute` is the only entry point — it owns the per-call -/// scope and log buffer, then returns a complete `ExecResponse`. +/// invocations. `execute` is **synchronous** — it owns the per-call +/// scope and log buffer. Wall-clock timeouts and offloading off the +/// async runtime belong to the caller (orchestrator-core's +/// `LocalExecutorClient` wraps this with `spawn_blocking` + `timeout`). pub struct Engine { limits: Limits, - // The actual `rhai::Engine` lands with the first execution implementation. - // Keep this opaque for now so callers don't bind to it. } impl Engine { @@ -23,26 +34,288 @@ impl Engine { &self.limits } - /// Parse-only validation, used by the manager at script-upload time - /// so syntax errors are surfaced before the first invocation. - pub fn validate(&self, _source: &str) -> Result<(), ExecError> { - // TODO(executor-core): wire `rhai::Engine::compile` - Ok(()) + /// Parse-only validation. Surfaced at script-upload time so syntax + /// errors are caught before the first invocation. Same logic as the + /// `ScriptValidator` impl below but with the richer `ExecError` + /// variant; callers in the executor path use this, the manager + /// path goes through the trait. + pub fn validate(&self, source: &str) -> Result<(), ExecError> { + let engine = build_engine(self.limits, None); + engine + .compile(source) + .map(|_| ()) + .map_err(|e| ExecError::Parse(e.to_string())) } - /// Execute `source` against `req` under the configured sandbox. - /// - /// `async` is part of the contract: v1.1+ SDK calls (kv, docs, http) - /// will await injected service providers from inside this method. - #[allow(clippy::unused_async)] - pub async fn execute( - &self, - _source: &str, - _req: ExecRequest, - ) -> Result { - // TODO(executor-core): wire `rhai::Engine::eval_with_scope` - Err(ExecError::Runtime( - "executor-core::Engine::execute not yet implemented".into(), - )) + /// Execute `source` against `req`. Op-budget protection comes from + /// Rhai's `set_max_operations`; wall-clock enforcement is the + /// caller's responsibility. + pub fn execute(&self, source: &str, req: ExecRequest) -> Result { + let logs: Arc>> = Arc::new(Mutex::new(Vec::new())); + let engine = build_engine(self.limits, Some(logs.clone())); + + let ast = engine + .compile(source) + .map_err(|e| ExecError::Parse(e.to_string()))?; + + let mut scope = Scope::new(); + scope.push_constant("ctx", build_ctx_map(&req)); + + let started = Instant::now(); + let value: Dynamic = engine + .eval_ast_with_scope(&mut scope, &ast) + .map_err(map_eval_error)?; + let duration = started.elapsed(); + + let logs = Arc::try_unwrap(logs).map_or_else( + |arc| arc.lock().map(|g| g.clone()).unwrap_or_default(), + |m| m.into_inner().unwrap_or_default(), + ); + + let (status_code, headers, body) = parse_response(value)?; + + Ok(ExecResponse { + status_code, + headers, + body, + logs, + stats: ExecStats { + duration_ms: u64::try_from(duration.as_millis()).unwrap_or(u64::MAX), + operations: 0, + }, + }) + } +} + +impl ScriptValidator for Engine { + fn validate(&self, source: &str) -> Result<(), ValidationError> { + Engine::validate(self, source).map_err(|e| ValidationError::Syntax(e.to_string())) + } +} + +// ---------------------------------------------------------------------------- +// Engine construction +// ---------------------------------------------------------------------------- + +fn build_engine(limits: Limits, logs: Option>>>) -> RhaiEngine { + let mut engine = RhaiEngine::new(); + + engine.set_max_operations(limits.max_operations); + engine.set_max_string_size(limits.max_string_size); + engine.set_max_array_size(limits.max_array_size); + engine.set_max_map_size(limits.max_map_size); + engine.set_max_call_levels(limits.max_call_levels); + engine.set_max_expr_depths(limits.max_expr_depth, limits.max_expr_depth); + + // Reject `import` — scripts cannot pull external modules. + engine.set_module_resolver(rhai::module_resolvers::DummyModuleResolver); + + // Rhai's built-in `print` and `debug` map to stdout/stderr by + // default; we never want scripts dumping there directly. Disable + // them so scripts route all output through `log::*` instead. + engine.disable_symbol("print"); + + if let Some(logs) = logs { + engine.register_static_module("log", build_log_module(logs).into()); + } + + engine +} + +fn build_log_module(logs: Arc>>) -> Module { + let mut module = Module::new(); + register_log_fn(&mut module, "trace", LogLevel::Trace, &logs); + register_log_fn(&mut module, "info", LogLevel::Info, &logs); + register_log_fn(&mut module, "warn", LogLevel::Warn, &logs); + register_log_fn(&mut module, "error", LogLevel::Error, &logs); + // No `log::debug` — `debug` is a Rhai reserved keyword. Use + // `log::trace` for sub-info-level diagnostics. + module +} + +fn register_log_fn( + module: &mut Module, + name: &str, + level: LogLevel, + logs: &Arc>>, +) { + // Single-argument form: `log::info("message")`. + let logs_single = logs.clone(); + module.set_native_fn(name, move |msg: &str| { + push_log(&logs_single, level, msg, None); + Ok::<_, Box>(()) + }); + + // Two-argument form: `log::info("message", #{ user: 42 })`. + let logs_struct = logs.clone(); + module.set_native_fn(name, move |msg: &str, data: Dynamic| { + let json = dynamic_to_json(&data); + push_log(&logs_struct, level, msg, Some(json)); + Ok::<_, Box>(()) + }); +} + +fn push_log(logs: &Arc>>, level: LogLevel, message: &str, data: Option) { + if let Ok(mut g) = logs.lock() { + g.push(LogEntry { + timestamp: Utc::now(), + level, + message: message.to_string(), + data, + }); + } +} + +// ---------------------------------------------------------------------------- +// ctx construction +// ---------------------------------------------------------------------------- + +fn build_ctx_map(req: &ExecRequest) -> Map { + let mut ctx = Map::new(); + ctx.insert("execution_id".into(), req.execution_id.to_string().into()); + ctx.insert("script_id".into(), req.script_id.to_string().into()); + ctx.insert("script_name".into(), req.script_name.clone().into()); + ctx.insert("request_id".into(), req.request_id.to_string().into()); + ctx.insert( + "invocation_type".into(), + invocation_type_str(req.invocation_type).into(), + ); + + let mut request = Map::new(); + request.insert("path".into(), req.path.clone().into()); + + let mut headers = Map::new(); + for (k, v) in &req.headers { + headers.insert(k.clone().into(), v.clone().into()); + } + request.insert("headers".into(), headers.into()); + + request.insert("body".into(), json_to_dynamic(req.body.clone())); + + ctx.insert("request".into(), request.into()); + ctx +} + +fn invocation_type_str(it: InvocationType) -> &'static str { + match it { + InvocationType::Http => "http", + InvocationType::Function => "function", + InvocationType::Scheduled => "scheduled", + } +} + +// ---------------------------------------------------------------------------- +// Response parsing +// ---------------------------------------------------------------------------- + +fn parse_response(value: Dynamic) -> Result<(u16, BTreeMap, Json), ExecError> { + // Convention: a Map with a `statusCode` field is the structured shape. + // Anything else is treated as a 200 response with the value as body. + if value.is_map() { + if let Some(map) = value.clone().try_cast::() { + if map.contains_key("statusCode") { + return parse_structured_response(map); + } + } + } + Ok((200, BTreeMap::new(), dynamic_to_json(&value))) +} + +fn parse_structured_response(map: Map) -> Result<(u16, BTreeMap, Json), ExecError> { + let status_dyn = map + .get("statusCode") + .ok_or_else(|| ExecError::InvalidResponse("missing statusCode".into()))?; + let status_code: i64 = status_dyn + .as_int() + .map_err(|_| ExecError::InvalidResponse("statusCode must be an integer".into()))?; + let status_code = u16::try_from(status_code) + .map_err(|_| ExecError::InvalidResponse("statusCode out of HTTP range".into()))?; + + let mut headers: BTreeMap = BTreeMap::new(); + if let Some(h) = map.get("headers") { + if let Some(h_map) = h.clone().try_cast::() { + for (k, v) in h_map { + headers.insert(k.to_string(), v.to_string()); + } + } + } + + let body = map.get("body").map_or(Json::Null, dynamic_to_json); + + Ok((status_code, headers, body)) +} + +// ---------------------------------------------------------------------------- +// Rhai ↔ serde_json bridges +// ---------------------------------------------------------------------------- + +fn json_to_dynamic(value: Json) -> Dynamic { + match value { + Json::Null => Dynamic::UNIT, + Json::Bool(b) => b.into(), + Json::Number(n) => { + if let Some(i) = n.as_i64() { + i.into() + } else if let Some(f) = n.as_f64() { + f.into() + } else { + n.to_string().into() + } + } + Json::String(s) => s.into(), + Json::Array(arr) => arr + .into_iter() + .map(json_to_dynamic) + .collect::>() + .into(), + Json::Object(obj) => { + let mut m = Map::new(); + for (k, v) in obj { + m.insert(k.into(), json_to_dynamic(v)); + } + Dynamic::from(m) + } + } +} + +fn dynamic_to_json(value: &Dynamic) -> Json { + if value.is_unit() { + return Json::Null; + } + if let Ok(b) = value.as_bool() { + return Json::Bool(b); + } + if let Ok(i) = value.as_int() { + return Json::Number(i.into()); + } + if let Ok(f) = value.as_float() { + return serde_json::Number::from_f64(f).map_or(Json::Null, Json::Number); + } + if value.is_string() { + return Json::String(value.clone().into_string().unwrap_or_default()); + } + if let Some(arr) = value.clone().try_cast::() { + return Json::Array(arr.iter().map(dynamic_to_json).collect()); + } + if let Some(map) = value.clone().try_cast::() { + let mut out = serde_json::Map::new(); + for (k, v) in map { + out.insert(k.to_string(), dynamic_to_json(&v)); + } + return Json::Object(out); + } + // Anything else (timestamps, custom types) — best-effort string form. + Json::String(value.to_string()) +} + +// ---------------------------------------------------------------------------- +// Error mapping +// ---------------------------------------------------------------------------- + +fn map_eval_error(err: Box) -> ExecError { + match *err { + EvalAltResult::ErrorTooManyOperations(_) => ExecError::OperationBudgetExceeded, + EvalAltResult::ErrorParsing(parse_err, _) => ExecError::Parse(parse_err.to_string()), + other => ExecError::Runtime(other.to_string()), } } diff --git a/crates/executor-core/src/types.rs b/crates/executor-core/src/types.rs index 1524843..4a53cec 100644 --- a/crates/executor-core/src/types.rs +++ b/crates/executor-core/src/types.rs @@ -41,7 +41,7 @@ pub struct ExecResponse { #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum LogLevel { - Debug, + Trace, Info, Warn, Error, diff --git a/crates/executor-core/tests/engine.rs b/crates/executor-core/tests/engine.rs new file mode 100644 index 0000000..1ee37de --- /dev/null +++ b/crates/executor-core/tests/engine.rs @@ -0,0 +1,157 @@ +use std::collections::BTreeMap; + +use picloud_executor_core::{Engine, ExecError, ExecRequest, InvocationType, Limits, LogLevel}; +use picloud_shared::{ExecutionId, RequestId, ScriptId}; +use serde_json::json; + +fn req(body: serde_json::Value) -> ExecRequest { + ExecRequest { + execution_id: ExecutionId::new(), + request_id: RequestId::new(), + script_id: ScriptId::new(), + script_name: "test".into(), + invocation_type: InvocationType::Http, + path: "/test".into(), + headers: BTreeMap::new(), + body, + } +} + +fn engine() -> Engine { + Engine::new(Limits::default()) +} + +#[test] +fn validate_accepts_well_formed_script() { + engine() + .validate("let x = 1; #{ statusCode: 200, body: x }") + .expect("valid script should validate"); +} + +#[test] +fn validate_rejects_syntax_errors() { + let err = engine() + .validate("this is not rhai @@@") + .expect_err("invalid script should not validate"); + assert!(matches!(err, ExecError::Parse(_))); +} + +#[test] +fn returns_unwrapped_value_as_200_body() { + let resp = engine() + .execute("42", req(json!(null))) + .expect("should execute"); + assert_eq!(resp.status_code, 200); + assert_eq!(resp.body, json!(42)); + assert!(resp.headers.is_empty()); +} + +#[test] +fn returns_structured_response_when_status_code_present() { + let src = r#" + #{ statusCode: 201, + headers: #{ "x-test": "hello" }, + body: #{ ok: true, msg: "created" } } + "#; + let resp = engine().execute(src, req(json!(null))).unwrap(); + assert_eq!(resp.status_code, 201); + assert_eq!( + resp.headers.get("x-test").map(String::as_str), + Some("hello") + ); + assert_eq!(resp.body, json!({ "ok": true, "msg": "created" })); +} + +#[test] +fn ctx_exposes_request_data() { + let src = r" + #{ statusCode: 200, + body: #{ + path: ctx.request.path, + name: ctx.script_name, + amount: ctx.request.body.amount + } } + "; + let r = ExecRequest { + path: "/payments".into(), + body: json!({ "amount": 1234 }), + script_name: "payments".into(), + ..req(json!(null)) + }; + let resp = engine().execute(src, r).unwrap(); + assert_eq!( + resp.body, + json!({ "path": "/payments", "name": "payments", "amount": 1234 }) + ); +} + +#[test] +fn captures_log_calls() { + let src = r#" + log::info("starting"); + log::warn("watch out", #{ count: 3 }); + log::error("oops"); + log::trace("deep diagnostic"); + 42 + "#; + let resp = engine().execute(src, req(json!(null))).unwrap(); + assert_eq!(resp.logs.len(), 4); + + let levels: Vec<_> = resp.logs.iter().map(|l| l.level).collect(); + assert_eq!( + levels, + vec![ + LogLevel::Info, + LogLevel::Warn, + LogLevel::Error, + LogLevel::Trace + ] + ); + assert_eq!(resp.logs[0].message, "starting"); + assert_eq!(resp.logs[1].data, Some(json!({ "count": 3 }))); +} + +#[test] +fn enforces_operation_budget() { + let limits = Limits { + max_operations: 1_000, + ..Limits::default() + }; + let engine = Engine::new(limits); + // 10_000 iterations vastly exceeds 1_000 ops. + let src = r"let n = 0; for i in 0..10000 { n += 1; } n"; + let err = engine + .execute(src, req(json!(null))) + .expect_err("should exceed budget"); + assert!(matches!(err, ExecError::OperationBudgetExceeded)); +} + +#[test] +fn runtime_error_is_mapped_to_runtime_variant() { + let err = engine() + .execute("1 / 0", req(json!(null))) + .expect_err("division by zero should error"); + assert!(matches!(err, ExecError::Runtime(_))); +} + +#[test] +fn module_import_is_blocked() { + let err = engine() + .execute(r#"import "evil" as e; 1"#, req(json!(null))) + .expect_err("imports should be blocked"); + // Module-not-found is reported as a runtime error via DummyModuleResolver. + assert!(matches!(err, ExecError::Runtime(_) | ExecError::Parse(_))); +} + +#[test] +fn body_passes_through_nested_json_round_trip() { + let src = "#{ statusCode: 200, body: ctx.request.body }"; + let body = json!({ + "deep": { + "list": [1, "two", 3.5, null, true, { "k": "v" }], + "count": 6 + } + }); + let resp = engine().execute(src, req(body.clone())).unwrap(); + assert_eq!(resp.body, body); +} diff --git a/crates/manager-core/Cargo.toml b/crates/manager-core/Cargo.toml index 16d2026..bc8d725 100644 --- a/crates/manager-core/Cargo.toml +++ b/crates/manager-core/Cargo.toml @@ -13,6 +13,7 @@ picloud-shared.workspace = true picloud-orchestrator-core.workspace = true async-trait.workspace = true +axum.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true diff --git a/crates/manager-core/migrations/0001_init.sql b/crates/manager-core/migrations/0001_init.sql new file mode 100644 index 0000000..4874d50 --- /dev/null +++ b/crates/manager-core/migrations/0001_init.sql @@ -0,0 +1,43 @@ +-- pgcrypto provides gen_random_uuid(). hstore is not needed yet (v1.1+ +-- KV service); leave it for the migration that introduces that feature. +CREATE EXTENSION IF NOT EXISTS pgcrypto; + +CREATE TABLE scripts ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + name TEXT NOT NULL, + description TEXT, + version INTEGER NOT NULL DEFAULT 1, + source TEXT NOT NULL, + + timeout_seconds INTEGER NOT NULL DEFAULT 30 CHECK (timeout_seconds > 0 AND timeout_seconds <= 300), + memory_limit_mb INTEGER NOT NULL DEFAULT 256 CHECK (memory_limit_mb > 0 AND memory_limit_mb <= 2048), + + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Names are user-facing; unique so the dashboard list and any future +-- name-based routing have an obvious identifier to surface. +CREATE UNIQUE INDEX scripts_name_uidx ON scripts (LOWER(name)); + +CREATE TABLE execution_logs ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + script_id UUID NOT NULL REFERENCES scripts(id) ON DELETE CASCADE, + request_id UUID NOT NULL, + + request_path TEXT, + request_headers JSONB NOT NULL DEFAULT '{}'::jsonb, + request_body JSONB, + + response_code INTEGER, + response_body JSONB, + + logs JSONB NOT NULL DEFAULT '[]'::jsonb, + duration_ms INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL CHECK (status IN ('success','error','timeout','budget_exceeded')), + + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX execution_logs_script_id_created_at_idx + ON execution_logs (script_id, created_at DESC); diff --git a/crates/manager-core/src/api.rs b/crates/manager-core/src/api.rs new file mode 100644 index 0000000..d6cb3b7 --- /dev/null +++ b/crates/manager-core/src/api.rs @@ -0,0 +1,197 @@ +//! Control-plane HTTP surface. Mounted by the `picloud` all-in-one +//! binary under `/api/admin` and by the future split `picloud-manager` +//! binary at its own root. + +use std::sync::Arc; + +use axum::{ + extract::{Path, State}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::get, + Json, Router, +}; +use picloud_shared::{Script, ScriptId, ScriptValidator, ValidationError}; +use serde::Deserialize; + +use crate::repo::{NewScript, ScriptPatch, ScriptRepository, ScriptRepositoryError}; + +/// State shared by control-plane handlers. Separates concerns so the +/// manager can validate at upload time without depending on the +/// concrete executor-core types. +pub struct AdminState { + pub repo: Arc, + pub validator: Arc, +} + +impl Clone for AdminState { + fn clone(&self) -> Self { + Self { + repo: self.repo.clone(), + validator: self.validator.clone(), + } + } +} + +/// Build the admin router. The caller (binary) chooses where to mount +/// it (typically `Router::new().nest("/api/admin", admin_router(state))`). +pub fn admin_router(state: AdminState) -> Router { + Router::new() + .route("/scripts", get(list_scripts::).post(create_script::)) + .route( + "/scripts/{id}", + get(get_script::) + .put(update_script::) + .delete(delete_script::), + ) + .with_state(state) +} + +// ---------------------------------------------------------------------------- +// DTOs +// ---------------------------------------------------------------------------- + +#[derive(Debug, Deserialize)] +pub struct CreateScriptRequest { + pub name: String, + pub description: Option, + pub source: String, + pub timeout_seconds: Option, + pub memory_limit_mb: Option, +} + +#[derive(Debug, Deserialize)] +pub struct UpdateScriptRequest { + pub name: Option, + // Double Option lets clients explicitly clear the description by + // sending `"description": null`; an absent field leaves it alone. + #[serde(default, deserialize_with = "deserialize_optional_optional")] + #[allow(clippy::option_option)] + pub description: Option>, + pub source: Option, + pub timeout_seconds: Option, + pub memory_limit_mb: Option, +} + +#[allow(clippy::option_option)] +fn deserialize_optional_optional<'de, D>(d: D) -> Result>, D::Error> +where + D: serde::Deserializer<'de>, +{ + Option::::deserialize(d).map(Some) +} + +// ---------------------------------------------------------------------------- +// Handlers +// ---------------------------------------------------------------------------- + +async fn list_scripts( + State(state): State>, +) -> Result>, ApiError> { + Ok(Json(state.repo.list().await?)) +} + +async fn get_script( + State(state): State>, + Path(id): Path, +) -> Result, ApiError> { + state + .repo + .get(id) + .await? + .map(Json) + .ok_or(ApiError::NotFound(id)) +} + +async fn create_script( + State(state): State>, + Json(input): Json, +) -> Result<(StatusCode, Json