From 2669714a518373409ed4a5c0249e5bb0c7d2a949 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 30 May 2026 18:40:09 +0200 Subject: [PATCH 1/9] feat(shared): SdkCallCx, Services bundle, ServiceEventEmitter trait shape MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for the v1.1.x stateful SDK services. Lands the shape only: - SdkCallCx — per-call context plumbed into every future service trait method (app_id, principal, execution/request ids, trigger depth slots). - Services — empty non_exhaustive bundle; v1.1.1 (KV) adds the first field, subsequent PRs follow. - ServiceEventEmitter — async trait future services emit through; real outbox-backed impl lands with triggers in v1.1.1. NoopEventEmitter is the v1.1.0 default. No behaviour change. Subsequent commits in this PR plumb these types through executor-core and the orchestrator dispatch path. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/shared/src/events.rs | 119 ++++++++++++++++++++++++++++++++++ crates/shared/src/lib.rs | 6 ++ crates/shared/src/sdk_cx.rs | 54 +++++++++++++++ crates/shared/src/services.rs | 38 +++++++++++ 4 files changed, 217 insertions(+) create mode 100644 crates/shared/src/events.rs create mode 100644 crates/shared/src/sdk_cx.rs create mode 100644 crates/shared/src/services.rs diff --git a/crates/shared/src/events.rs b/crates/shared/src/events.rs new file mode 100644 index 0000000..1720925 --- /dev/null +++ b/crates/shared/src/events.rs @@ -0,0 +1,119 @@ +//! `ServiceEventEmitter` — the contract every stateful SDK service uses +//! to publish events into the (future) triggers framework. +//! +//! v1.1.0 ships only the trait shape and a `NoopEventEmitter` that +//! drops every event. The real outbox-backed implementation lands with +//! the triggers PR in v1.1.1; locking the trait now means services +//! written in subsequent v1.1.x PRs (KV, docs, files, …) don't have to +//! re-thread their plumbing when the dispatcher arrives. +//! +//! Design rationale (full discussion: `docs/sdk-shape.md`): +//! * Async — outbox writes hit Postgres. +//! * Cx is passed in so the emitter can attribute the event to the +//! `app_id` / `principal` / `execution_id` that produced it. +//! * Events carry their semantic identity (`source` + `op`) plus +//! optional locator (`collection` + `key`) and optional payloads +//! (`payload` for the new value, `old_payload` for the previous on +//! updates). The dispatcher matches on (source, op, collection) +//! filters to decide which scripts to fan out to. + +use async_trait::async_trait; +use thiserror::Error; + +use crate::SdkCallCx; + +/// Trait every stateful service depends on to emit events. The host +/// binary constructs one instance and clones the Arc into each service. +#[async_trait] +pub trait ServiceEventEmitter: Send + Sync { + /// Publish a single event. Implementations are expected to be + /// fire-and-forget from the caller's perspective: the outbox impl + /// will return `Ok(())` once the event is durably persisted, the + /// dispatcher reads it out-of-band. + async fn emit(&self, cx: &SdkCallCx, event: ServiceEvent) -> Result<(), EmitError>; +} + +/// One service event. `source` and `op` are `&'static str` because they +/// come from a fixed enumeration baked into each service (`"kv"` + +/// `"insert"`/`"update"`/`"delete"`, etc.) — never from user data. +/// `collection`/`key`/payloads come from user data and are owned. +#[derive(Debug, Clone)] +pub struct ServiceEvent { + /// Service namespace. Matches the Rhai module name: `"kv"`, + /// `"docs"`, `"files"`, etc. + pub source: &'static str, + + /// Operation verb. Each service defines its own vocabulary; + /// dispatcher filters match on the literal string. + pub op: &'static str, + + /// Affected collection, when the service is collection-scoped + /// (`kv`, `docs`, `files`). `None` for collection-less events. + pub collection: Option, + + /// Affected key/id within the collection, when applicable. + pub key: Option, + + /// New value after the operation, when carrying it is cheap and + /// useful. `None` for deletes. + pub payload: Option, + + /// Previous value before the operation, populated on `update` / + /// `delete` so triggers can diff. `None` on `insert`. + pub old_payload: Option, +} + +/// Errors an emitter can surface upward. The noop impl never returns +/// these; the v1.1.1 outbox impl uses `Unavailable` for pool/connection +/// failures and `Rejected` for malformed payloads (e.g. event JSON too +/// large for the outbox row). +#[derive(Debug, Error)] +pub enum EmitError { + #[error("event sink unavailable: {0}")] + Unavailable(String), + #[error("event sink rejected event: {0}")] + Rejected(String), +} + +/// Default emitter for v1.1.0. Accepts every event, persists nothing, +/// always returns `Ok(())`. Wired in the picloud binary; the v1.1.1 +/// triggers PR swaps this for a Postgres outbox writer. +#[derive(Debug, Default, Clone, Copy)] +pub struct NoopEventEmitter; + +#[async_trait] +impl ServiceEventEmitter for NoopEventEmitter { + async fn emit(&self, _cx: &SdkCallCx, _event: ServiceEvent) -> Result<(), EmitError> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // Compile-time check that ServiceEventEmitter is dyn-safe — every + // service holds it as `Arc` and would + // silently break the workspace if a non-object-safe method snuck + // in. Behavioural tests for the noop impl come for free once a + // service exercises it (v1.1.1+); avoid pulling tokio into + // `picloud-shared` just for a one-line `emit().await` check. + #[allow(dead_code)] + fn assert_dyn_compatible(_e: &dyn ServiceEventEmitter) {} + + #[test] + fn service_event_construction_is_explicit() { + // Pin the field layout so a re-ordering in a future PR causes a + // compile failure here rather than silently misattributing + // events. Hash-derive isn't appropriate (serde_json::Value isn't + // Hash), so structural construction is the assertion. + let _ = ServiceEvent { + source: "kv", + op: "insert", + collection: Some("widgets".into()), + key: Some("k1".into()), + payload: Some(serde_json::json!({"v": 1})), + old_payload: None, + }; + } +} diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs index 53b864a..5714861 100644 --- a/crates/shared/src/lib.rs +++ b/crates/shared/src/lib.rs @@ -7,23 +7,29 @@ pub mod app; pub mod auth; pub mod error; +pub mod events; pub mod execution_log; pub mod ids; pub mod log_sink; pub mod route; pub mod sandbox; pub mod script; +pub mod sdk_cx; +pub mod services; pub mod validator; pub mod version; pub use app::{App, AppDomain, DomainShape}; pub use auth::{AppRole, InstanceRole, Principal, Scope, UserId}; pub use error::Error; +pub use events::{EmitError, NoopEventEmitter, ServiceEvent, ServiceEventEmitter}; pub use execution_log::{ExecutionLog, ExecutionStatus}; pub use ids::{AdminUserId, ApiKeyId, AppId, ExecutionId, RequestId, ScriptId}; pub use log_sink::{ExecutionLogSink, LogSinkError}; pub use route::{HostKind, PathKind, Route}; pub use sandbox::ScriptSandbox; pub use script::Script; +pub use sdk_cx::SdkCallCx; +pub use services::Services; pub use validator::{ScriptValidator, ValidationError}; pub use version::{API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION}; diff --git a/crates/shared/src/sdk_cx.rs b/crates/shared/src/sdk_cx.rs new file mode 100644 index 0000000..3fa4b35 --- /dev/null +++ b/crates/shared/src/sdk_cx.rs @@ -0,0 +1,54 @@ +//! `SdkCallCx` — per-call context every stateful SDK service receives. +//! +//! Service trait methods (added by subsequent v1.1.x PRs starting with +//! KV) all take `&SdkCallCx` so they can: +//! * scope by `app_id` for cross-app isolation, +//! * audit `principal` when authenticated, +//! * carry `execution_id` / `request_id` into emitted events, +//! * bound trigger chains via `trigger_depth` / `root_execution_id`. +//! +//! The struct lives in `picloud-shared` (not `executor-core`) because +//! future service impls live in `manager-core` and the trait that hands +//! the cx in is shared by both sides. Pure value type — no handles, no +//! DB pool references, no allocations beyond what's in `Principal`. + +use crate::{AppId, ExecutionId, Principal, RequestId}; + +/// Per-invocation context for every stateful SDK service call. +/// +/// Constructed once at the start of an invocation by `executor-core` +/// from the incoming `ExecRequest`, then handed (by reference) to every +/// service trait method the script triggers during execution. Services +/// MUST derive `app_id` from this struct — never from script-passed +/// arguments — to preserve cross-app isolation. +#[derive(Debug, Clone)] +pub struct SdkCallCx { + /// Owning application for this invocation. Source of truth for + /// every `(app_id, …)` storage lookup the script makes. + pub app_id: AppId, + + /// Caller identity, when authenticated. `None` for unauthenticated + /// data-plane HTTP requests (the common case for public endpoints); + /// `Some` when the call came in via the dashboard, an API key, or a + /// future authed surface. + pub principal: Option, + + /// Unique id for THIS execution. Matches `ExecRequest.execution_id`. + pub execution_id: ExecutionId, + + /// Unique id for the ingress request that started the chain. The + /// same `request_id` is shared across every execution triggered by + /// the same request (direct + trigger fan-out). + pub request_id: RequestId, + + /// `0` for direct invocations (HTTP request, manual run). Each + /// indirect invocation through the triggers framework (v1.1.1) + /// increments this; the dispatcher rejects beyond a configured + /// ceiling to prevent runaway feedback loops. + pub trigger_depth: u32, + + /// `== execution_id` when `trigger_depth == 0`; otherwise the + /// `execution_id` of the original ingress execution. Lets the audit + /// log group every fan-out execution under the originating event. + pub root_execution_id: ExecutionId, +} diff --git a/crates/shared/src/services.rs b/crates/shared/src/services.rs new file mode 100644 index 0000000..6900c63 --- /dev/null +++ b/crates/shared/src/services.rs @@ -0,0 +1,38 @@ +//! `Services` — bundle of stateful SDK service handles plumbed from the +//! host binary into every Rhai execution. +//! +//! v1.1.0 ships this struct empty. Subsequent PRs in the v1.1.x series +//! add one field per service: +//! +//! ```ignore +//! pub kv: Arc, // v1.1.1 +//! pub docs: Arc, // v1.1.2 +//! pub http: Arc, // v1.1.4 +//! // … +//! ``` +//! +//! The bundle is cheap to clone (`Arc` per service) and is constructed +//! once at startup in the picloud binary. The executor takes it by +//! reference per invocation, hands it (alongside an `SdkCallCx`) to +//! `executor-core::sdk::register_all`, which wires the corresponding +//! Rhai `::` namespace per service. +//! +//! `#[non_exhaustive]` so adding fields is a non-breaking change for +//! consumers that only *pattern-match* a `&Services`; only crates that +//! *construct* a `Services` (in practice, just the picloud binary) need +//! to update their constructor when new services land. + +/// SDK service bundle. See module docs for the lifecycle and the v1.1.x +/// expansion plan. +#[non_exhaustive] +#[derive(Default)] +pub struct Services {} + +impl Services { + /// Construct an empty bundle. Replaced by a fielded `::new(...)` + /// once the first service (KV, v1.1.1) lands. + #[must_use] + pub fn new() -> Self { + Self {} + } +} From aaba58dee127c19e30bb501f0659e859aaa19811 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 30 May 2026 18:43:03 +0200 Subject: [PATCH 2/9] =?UTF-8?q?refactor(executor-core):=20extract=20sdk/?= =?UTF-8?q?=20module=20+=20move=20json=E2=86=94dynamic=20bridge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hoist the json_to_dynamic / dynamic_to_json helpers out of engine.rs into a new sdk/bridge.rs so the v1.1.x service modules (KV, docs, …) can use them without engine.rs being the sole owner. No behavioural change — the sdk_contract round-trip test pins the observable JSON fidelity. Also lands the structural shape that subsequent v1.1.x PRs hook into: - sdk::register_all(engine, services, cx) — single per-call hook every stateful service registers through. Body is a no-op for v1.1.0; SdkCallCx construction inside Engine::execute lands in the next commit alongside the new ExecRequest fields it reads. - sdk::cx re-exports picloud_shared::SdkCallCx so SDK callers don't cross-import shared for one type. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/executor-core/src/engine.rs | 64 +-------------------- crates/executor-core/src/lib.rs | 1 + crates/executor-core/src/sdk/bridge.rs | 79 ++++++++++++++++++++++++++ crates/executor-core/src/sdk/cx.rs | 10 ++++ crates/executor-core/src/sdk/mod.rs | 39 +++++++++++++ 5 files changed, 130 insertions(+), 63 deletions(-) create mode 100644 crates/executor-core/src/sdk/bridge.rs create mode 100644 crates/executor-core/src/sdk/cx.rs create mode 100644 crates/executor-core/src/sdk/mod.rs diff --git a/crates/executor-core/src/engine.rs b/crates/executor-core/src/engine.rs index ee6b0ee..6e2e745 100644 --- a/crates/executor-core/src/engine.rs +++ b/crates/executor-core/src/engine.rs @@ -8,6 +8,7 @@ use rhai::{Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module, Scope}; use serde_json::Value as Json; use crate::sandbox::Limits; +use crate::sdk::bridge::{dynamic_to_json, json_to_dynamic}; use crate::types::{ ExecError, ExecRequest, ExecResponse, ExecStats, InvocationType, LogEntry, LogLevel, }; @@ -265,69 +266,6 @@ fn parse_structured_response(map: Map) -> Result<(u16, BTreeMap, Ok((status_code, headers, body)) } -// ---------------------------------------------------------------------------- -// Rhai ↔ serde_json bridges -// ---------------------------------------------------------------------------- - -fn json_to_dynamic(value: Json) -> Dynamic { - match value { - Json::Null => Dynamic::UNIT, - Json::Bool(b) => b.into(), - Json::Number(n) => { - if let Some(i) = n.as_i64() { - i.into() - } else if let Some(f) = n.as_f64() { - f.into() - } else { - n.to_string().into() - } - } - Json::String(s) => s.into(), - Json::Array(arr) => arr - .into_iter() - .map(json_to_dynamic) - .collect::>() - .into(), - Json::Object(obj) => { - let mut m = Map::new(); - for (k, v) in obj { - m.insert(k.into(), json_to_dynamic(v)); - } - Dynamic::from(m) - } - } -} - -fn dynamic_to_json(value: &Dynamic) -> Json { - if value.is_unit() { - return Json::Null; - } - if let Ok(b) = value.as_bool() { - return Json::Bool(b); - } - if let Ok(i) = value.as_int() { - return Json::Number(i.into()); - } - if let Ok(f) = value.as_float() { - return serde_json::Number::from_f64(f).map_or(Json::Null, Json::Number); - } - if value.is_string() { - return Json::String(value.clone().into_string().unwrap_or_default()); - } - if let Some(arr) = value.clone().try_cast::() { - return Json::Array(arr.iter().map(dynamic_to_json).collect()); - } - if let Some(map) = value.clone().try_cast::() { - let mut out = serde_json::Map::new(); - for (k, v) in map { - out.insert(k.to_string(), dynamic_to_json(&v)); - } - return Json::Object(out); - } - // Anything else (timestamps, custom types) — best-effort string form. - Json::String(value.to_string()) -} - // ---------------------------------------------------------------------------- // Error mapping // ---------------------------------------------------------------------------- diff --git a/crates/executor-core/src/lib.rs b/crates/executor-core/src/lib.rs index 5e64a51..384a161 100644 --- a/crates/executor-core/src/lib.rs +++ b/crates/executor-core/src/lib.rs @@ -8,6 +8,7 @@ pub mod context; pub mod engine; pub mod logging; pub mod sandbox; +pub mod sdk; pub mod types; pub use engine::Engine; diff --git a/crates/executor-core/src/sdk/bridge.rs b/crates/executor-core/src/sdk/bridge.rs new file mode 100644 index 0000000..07d223d --- /dev/null +++ b/crates/executor-core/src/sdk/bridge.rs @@ -0,0 +1,79 @@ +//! JSON ↔ Rhai `Dynamic` value bridge. +//! +//! Originally inline in `engine.rs`; moved here for v1.1.0 so future +//! service modules (KV in v1.1.1, docs in v1.1.2, …) can convert +//! values without `engine.rs` being the only owner of the conversions. +//! Behaviour is unchanged from the pre-extraction implementation — +//! `sdk_contract.rs::json_round_trip_preserves_nested_shapes` pins the +//! observable round-trip. + +use rhai::{Dynamic, Map}; +use serde_json::Value as Json; + +/// Convert a `serde_json::Value` into a Rhai `Dynamic` suitable for +/// pushing into a script's scope. Numbers prefer the narrowest type +/// (`i64` over `f64`); anything that can't round-trip falls back to a +/// string so the script always sees a defined value. +#[must_use] +pub fn json_to_dynamic(value: Json) -> Dynamic { + match value { + Json::Null => Dynamic::UNIT, + Json::Bool(b) => b.into(), + Json::Number(n) => { + if let Some(i) = n.as_i64() { + i.into() + } else if let Some(f) = n.as_f64() { + f.into() + } else { + n.to_string().into() + } + } + Json::String(s) => s.into(), + Json::Array(arr) => arr + .into_iter() + .map(json_to_dynamic) + .collect::>() + .into(), + Json::Object(obj) => { + let mut m = Map::new(); + for (k, v) in obj { + m.insert(k.into(), json_to_dynamic(v)); + } + Dynamic::from(m) + } + } +} + +/// Convert a Rhai `Dynamic` back to a `serde_json::Value`. Custom Rhai +/// types (timestamps, user-registered modules) fall back to their +/// `Display` form so they appear as strings in JSON output rather than +/// failing the response build. +#[must_use] +pub fn dynamic_to_json(value: &Dynamic) -> Json { + if value.is_unit() { + return Json::Null; + } + if let Ok(b) = value.as_bool() { + return Json::Bool(b); + } + if let Ok(i) = value.as_int() { + return Json::Number(i.into()); + } + if let Ok(f) = value.as_float() { + return serde_json::Number::from_f64(f).map_or(Json::Null, Json::Number); + } + if value.is_string() { + return Json::String(value.clone().into_string().unwrap_or_default()); + } + if let Some(arr) = value.clone().try_cast::() { + return Json::Array(arr.iter().map(dynamic_to_json).collect()); + } + if let Some(map) = value.clone().try_cast::() { + let mut out = serde_json::Map::new(); + for (k, v) in map { + out.insert(k.to_string(), dynamic_to_json(&v)); + } + return Json::Object(out); + } + Json::String(value.to_string()) +} diff --git a/crates/executor-core/src/sdk/cx.rs b/crates/executor-core/src/sdk/cx.rs new file mode 100644 index 0000000..5820041 --- /dev/null +++ b/crates/executor-core/src/sdk/cx.rs @@ -0,0 +1,10 @@ +//! Re-export of `picloud_shared::SdkCallCx`. +//! +//! The type itself lives in `picloud-shared` because future stateful +//! service impls live in `manager-core` (which `executor-core` must +//! not depend on) and need to reference the same cx shape. This +//! re-export lets executor-side code write +//! `use picloud_executor_core::sdk::SdkCallCx;` instead of reaching +//! into `picloud_shared` for one type. + +pub use picloud_shared::SdkCallCx; diff --git a/crates/executor-core/src/sdk/mod.rs b/crates/executor-core/src/sdk/mod.rs new file mode 100644 index 0000000..bbb478c --- /dev/null +++ b/crates/executor-core/src/sdk/mod.rs @@ -0,0 +1,39 @@ +//! SDK plumbing — types and the per-call registration entry point. +//! +//! `executor-core` is responsible for building the per-invocation Rhai +//! engine and wiring stateful services into it. v1.1.0 ships the +//! shapes (`Services` bundle, `SdkCallCx`, `register_all` entry point) +//! but no actual services — subsequent v1.1.x PRs (KV in v1.1.1, +//! docs in v1.1.2, …) extend `register_all` rather than re-threading +//! plumbing through `engine.rs`. +//! +//! Bridge functions (`json_to_dynamic` / `dynamic_to_json`) also live +//! here so service modules can convert values without `engine.rs` +//! being the only home for the conversion logic. + +pub mod bridge; +pub mod cx; + +pub use bridge::{dynamic_to_json, json_to_dynamic}; +pub use cx::SdkCallCx; + +use std::sync::Arc; + +use picloud_shared::Services; +use rhai::Engine as RhaiEngine; + +/// Single hook every v1.1.x stateful service registers into. Called +/// once per invocation, just after `build_engine` constructs the +/// sandboxed Rhai engine and just before script compilation. +/// +/// v1.1.0 ships an intentionally empty body — the call site exists so +/// future PRs (KV first) drop their registration logic here rather +/// than reaching into `engine.rs::build_engine`. The signature is +/// locked: subsequent PRs MUST keep the same parameter shape so that +/// hosts don't have to re-thread the plumbing. +pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc) { + // Intentionally inert in v1.1.0. The unused-suppression below is a + // load-bearing placeholder: future PRs replace this `let _` with + // real `register_kv(engine, services, cx.clone())` calls etc. + let _ = (engine, services, cx); +} From fe1dd9083678dfb2ee701122058a854d5f37adb7 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 30 May 2026 18:48:39 +0200 Subject: [PATCH 3/9] feat(executor-core): plumb app_id/principal/depth through ExecRequest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the four internal-only fields every v1.1.x stateful service needs to isolate by app and audit by caller: - app_id — owning app for this invocation - principal — Option; data-plane is unauthenticated today so the orchestrator passes None until the opportunistic middleware lands in the next commit - trigger_depth — 0 for direct invocations; the triggers framework (v1.1.1) bounds runaway feedback loops via this - root_execution_id — equal to execution_id for direct invocations; preserved across trigger fan-out for audit grouping ExecRequest stays serializable (cluster mode still has to ship it across processes when v1.3+ arrives). principal is `#[serde(skip)]` because shared::Principal has no wire derivation today — when cluster mode lands the wire-Principal question gets revisited properly. Engine now carries a Services bundle (empty in v1.1.0). Engine::execute constructs an SdkCallCx from the request and hands it to sdk::register_all just after the per-call Rhai engine is built. The hook is a no-op in v1.1.0; v1.1.1 KV registers its first native fns there. Adds ExecError::Overloaded { retry_after_secs } and the matching 503 + Retry-After mapping in orchestrator-core's IntoResponse. The gate that actually produces this variant lands in the next commit. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/executor-core/src/engine.rs | 30 ++++++++++++++--- crates/executor-core/src/types.rs | 38 +++++++++++++++++++++- crates/executor-core/tests/engine.rs | 13 +++++--- crates/executor-core/tests/sdk_contract.rs | 11 +++++-- crates/orchestrator-core/src/api.rs | 38 ++++++++++++++++++++-- crates/picloud/src/lib.rs | 7 ++-- 6 files changed, 119 insertions(+), 18 deletions(-) diff --git a/crates/executor-core/src/engine.rs b/crates/executor-core/src/engine.rs index 6e2e745..f2849a8 100644 --- a/crates/executor-core/src/engine.rs +++ b/crates/executor-core/src/engine.rs @@ -3,31 +3,38 @@ use std::sync::{Arc, Mutex}; use std::time::Instant; use chrono::Utc; -use picloud_shared::{ScriptValidator, ValidationError, SDK_VERSION}; +use picloud_shared::{ScriptValidator, SdkCallCx, Services, ValidationError, SDK_VERSION}; use rhai::{Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module, Scope}; use serde_json::Value as Json; use crate::sandbox::Limits; +use crate::sdk; use crate::sdk::bridge::{dynamic_to_json, json_to_dynamic}; use crate::types::{ ExecError, ExecRequest, ExecResponse, ExecStats, InvocationType, LogEntry, LogLevel, }; -/// Preconfigured Rhai engine with sandbox limits applied. +/// Preconfigured Rhai engine with sandbox limits applied and the SDK +/// `Services` bundle attached. /// /// One `Engine` is constructed at process startup and reused across /// invocations. `execute` is **synchronous** — it owns the per-call /// scope and log buffer. Wall-clock timeouts and offloading off the /// async runtime belong to the caller (orchestrator-core's /// `LocalExecutorClient` wraps this with `spawn_blocking` + `timeout`). +/// +/// The `Services` bundle is empty in v1.1.0; subsequent v1.1.x PRs add +/// service handles (KV, docs, …) and `sdk::register_all` wires them +/// into each per-call Rhai engine. pub struct Engine { limits: Limits, + services: Services, } impl Engine { #[must_use] - pub fn new(limits: Limits) -> Self { - Self { limits } + pub fn new(limits: Limits, services: Services) -> Self { + Self { limits, services } } #[must_use] @@ -56,7 +63,20 @@ impl Engine { pub fn execute(&self, source: &str, req: ExecRequest) -> Result { let effective_limits = self.limits.with_overrides(&req.sandbox_overrides); let logs: Arc>> = Arc::new(Mutex::new(Vec::new())); - let engine = build_engine(effective_limits, Some(logs.clone())); + let mut engine = build_engine(effective_limits, Some(logs.clone())); + + // Per-call context handed to every stateful SDK service via the + // `sdk::register_all` hook. The Arc lets future service closures + // capture cheap clones of the cx for use at script-call time. + let cx = Arc::new(SdkCallCx { + app_id: req.app_id, + principal: req.principal.clone(), + execution_id: req.execution_id, + request_id: req.request_id, + trigger_depth: req.trigger_depth, + root_execution_id: req.root_execution_id, + }); + sdk::register_all(&mut engine, &self.services, cx); let ast = engine .compile(source) diff --git a/crates/executor-core/src/types.rs b/crates/executor-core/src/types.rs index fd57dce..d974a17 100644 --- a/crates/executor-core/src/types.rs +++ b/crates/executor-core/src/types.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use chrono::{DateTime, Utc}; -use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox}; +use picloud_shared::{AppId, ExecutionId, Principal, RequestId, ScriptId, ScriptSandbox}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -50,6 +50,35 @@ pub struct ExecRequest { /// override) before the Rhai engine is built. #[serde(default)] pub sandbox_overrides: ScriptSandbox, + + /// Owning application. Source of truth for every `(app_id, …)` + /// storage lookup the script makes via stateful SDK services. + /// Internal-only; not surfaced via `ctx` (which the script sees). + pub app_id: AppId, + + /// Caller identity, when authenticated. `None` for unauthenticated + /// data-plane HTTP requests (the common case for public scripts); + /// `Some` when a bearer token or session cookie was resolved. + /// Internal-only — exposed via `SdkCallCx` to service trait impls. + /// + /// `#[serde(skip)]`: `ExecRequest` is serializable so cluster mode + /// (v1.3+) can ship invocations to remote executors over HTTP, but + /// `Principal` has no wire derivation today. Skipping here keeps + /// v1.1.0 compiling; the cluster-mode PR will introduce a wire-safe + /// snapshot then. + #[serde(skip)] + pub principal: Option, + + /// Triggers-framework depth. `0` for direct invocations. The + /// dispatcher (v1.1.1) increments on each indirection to bound + /// runaway feedback loops. + #[serde(default)] + pub trigger_depth: u32, + + /// Originating execution id of a trigger chain. Equal to + /// `execution_id` for direct invocations; preserves the root + /// across fan-out for audit log grouping. + pub root_execution_id: ExecutionId, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -100,4 +129,11 @@ pub enum ExecError { #[error("script runtime error: {0}")] Runtime(String), + + /// Concurrency gate (orchestrator-core::ExecutionGate) refused + /// admission. Surfaced as HTTP 503 with a `Retry-After` header. + /// The gate enforces a global cap so a script storm can't park + /// every blocking thread. + #[error("execution declined: server at capacity (retry after {retry_after_secs}s)")] + Overloaded { retry_after_secs: u32 }, } diff --git a/crates/executor-core/tests/engine.rs b/crates/executor-core/tests/engine.rs index c888ae4..39935f4 100644 --- a/crates/executor-core/tests/engine.rs +++ b/crates/executor-core/tests/engine.rs @@ -1,12 +1,13 @@ use std::collections::BTreeMap; use picloud_executor_core::{Engine, ExecError, ExecRequest, InvocationType, Limits, LogLevel}; -use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox}; +use picloud_shared::{AppId, ExecutionId, RequestId, ScriptId, ScriptSandbox, Services}; use serde_json::json; fn req(body: serde_json::Value) -> ExecRequest { + let execution_id = ExecutionId::new(); ExecRequest { - execution_id: ExecutionId::new(), + execution_id, request_id: RequestId::new(), script_id: ScriptId::new(), script_name: "test".into(), @@ -18,11 +19,15 @@ fn req(body: serde_json::Value) -> ExecRequest { query: BTreeMap::new(), rest: String::new(), sandbox_overrides: ScriptSandbox::default(), + app_id: AppId::new(), + principal: None, + trigger_depth: 0, + root_execution_id: execution_id, } } fn engine() -> Engine { - Engine::new(Limits::default()) + Engine::new(Limits::default(), Services::new()) } #[test] @@ -121,7 +126,7 @@ fn enforces_operation_budget() { max_operations: 1_000, ..Limits::default() }; - let engine = Engine::new(limits); + let engine = Engine::new(limits, Services::new()); // 10_000 iterations vastly exceeds 1_000 ops. let src = r"let n = 0; for i in 0..10000 { n += 1; } n"; let err = engine diff --git a/crates/executor-core/tests/sdk_contract.rs b/crates/executor-core/tests/sdk_contract.rs index af7fbb1..26788c5 100644 --- a/crates/executor-core/tests/sdk_contract.rs +++ b/crates/executor-core/tests/sdk_contract.rs @@ -23,7 +23,7 @@ use std::collections::BTreeMap; use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits, LogLevel}; -use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox}; +use picloud_shared::{AppId, ExecutionId, RequestId, ScriptId, ScriptSandbox, Services}; use serde_json::{json, Value}; // ---------------------------------------------------------------------------- @@ -31,12 +31,13 @@ use serde_json::{json, Value}; // ---------------------------------------------------------------------------- fn engine() -> Engine { - Engine::new(Limits::default()) + Engine::new(Limits::default(), Services::new()) } fn baseline_request() -> ExecRequest { + let execution_id = ExecutionId::new(); ExecRequest { - execution_id: ExecutionId::new(), + execution_id, request_id: RequestId::new(), script_id: ScriptId::new(), script_name: "contract".into(), @@ -48,6 +49,10 @@ fn baseline_request() -> ExecRequest { query: BTreeMap::new(), rest: String::new(), sandbox_overrides: ScriptSandbox::default(), + app_id: AppId::new(), + principal: None, + trigger_depth: 0, + root_execution_id: execution_id, } } diff --git a/crates/orchestrator-core/src/api.rs b/crates/orchestrator-core/src/api.rs index c11454b..82008e2 100644 --- a/crates/orchestrator-core/src/api.rs +++ b/crates/orchestrator-core/src/api.rs @@ -17,7 +17,8 @@ use axum::{ use chrono::Utc; use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType}; use picloud_shared::{ - AppId, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, RequestId, ScriptId, + AppId, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, Principal, RequestId, + ScriptId, }; use serde_json::Value as Json_; use uuid::Uuid; @@ -97,7 +98,10 @@ where .await? .ok_or(ApiError::NotFound(id))?; - let mut req = build_exec_request(id, &script.name, &headers, &body)?; + // Principal stays `None` until the data-plane `attach_principal_if_present` + // middleware lands in the picloud-wiring commit. Both shapes are + // valid against `ExecRequest.principal: Option`. + let mut req = build_exec_request(id, &script.name, &headers, &body, script.app_id, None)?; req.sandbox_overrides = script.sandbox; let request_id = req.request_id; let request_path = req.path.clone(); @@ -195,6 +199,8 @@ where &script.name, &headers, &body_bytes, + app_id, + None, )?; req.path = path; req.params = matched.params; @@ -264,6 +270,8 @@ fn build_exec_request( name: &str, headers: &HeaderMap, body: &Bytes, + app_id: AppId, + principal: Option, ) -> Result { let mut hmap = BTreeMap::new(); for (k, v) in headers { @@ -279,8 +287,9 @@ fn build_exec_request( .map_err(|e| ApiError::BadRequest(format!("invalid JSON body: {e}")))? }; + let execution_id = ExecutionId::new(); Ok(ExecRequest { - execution_id: ExecutionId::new(), + execution_id, request_id: RequestId::new(), script_id: id, script_name: name.to_string(), @@ -293,6 +302,13 @@ fn build_exec_request( rest: String::new(), // Overwritten by the handler after the script is resolved. sandbox_overrides: picloud_shared::ScriptSandbox::default(), + app_id, + principal, + // Direct invocations are at depth 0 with a self-referential + // root. The triggers framework (v1.1.1) increments depth and + // preserves the original root for chained executions. + trigger_depth: 0, + root_execution_id: execution_id, }) } @@ -396,6 +412,21 @@ pub enum ApiError { impl IntoResponse for ApiError { fn into_response(self) -> Response { + // Overloaded is the only variant that needs to attach an HTTP + // header (Retry-After), so it short-circuits the (status, body) + // reduction below. Axum's tuple builder makes per-arm header + // injection awkward otherwise. + if let ApiError::Exec(ExecError::Overloaded { retry_after_secs }) = &self { + let retry = retry_after_secs.to_string(); + let body = Json(serde_json::json!({ "error": self.to_string() })); + return ( + StatusCode::SERVICE_UNAVAILABLE, + [(axum::http::header::RETRY_AFTER, retry)], + body, + ) + .into_response(); + } + use ApiError as E; let (status, message) = match &self { E::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()), @@ -416,6 +447,7 @@ impl IntoResponse for ApiError { (StatusCode::INSUFFICIENT_STORAGE, e.to_string()) } ExecError::Runtime(_) => (StatusCode::BAD_GATEWAY, e.to_string()), + ExecError::Overloaded { .. } => unreachable!("handled above"), }, }; (status, Json(serde_json::json!({ "error": message }))).into_response() diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs index ef59a73..9ae47c5 100644 --- a/crates/picloud/src/lib.rs +++ b/crates/picloud/src/lib.rs @@ -25,7 +25,8 @@ use picloud_orchestrator_core::{ data_plane_router, user_routes_router, DataPlaneState, LocalExecutorClient, }; use picloud_shared::{ - ExecutionLogSink, ScriptValidator, API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION, + ExecutionLogSink, ScriptValidator, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION, + WIRE_VERSION, }; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; @@ -82,7 +83,9 @@ fn read_session_ttl() -> Duration { /// `/version`) stays open — it's the public ingress for user scripts. #[allow(clippy::too_many_lines)] pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { - let engine = Arc::new(Engine::new(Limits::default())); + // `Services` is the SDK service bundle. Empty in v1.1.0; the + // v1.1.1 KV PR will populate it with `kv: Arc::new(...)` here. + let engine = Arc::new(Engine::new(Limits::default(), Services::new())); let script_repo = Arc::new(PostgresScriptRepository::new(pool.clone())); let log_repo = Arc::new(PostgresExecutionLogRepository::new(pool.clone())); From dea776b2a31fedfcdac31fc637cde08d82c81590 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 30 May 2026 18:50:44 +0200 Subject: [PATCH 4/9] feat(orchestrator-core): ExecutionGate + 503/Retry-After on overflow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a single global concurrency cap on the data-plane dispatch path: - orchestrator-core::gate::ExecutionGate wraps tokio::Semaphore. Non-blocking try_acquire — no queue. PICLOUD_MAX_CONCURRENT_EXECUTIONS env var (default 32) sets the cap. - LocalExecutorClient acquires a permit before spawn_blocking; the permit drops with the future so the slot returns automatically. - On refusal, ExecError::Overloaded { retry_after_secs: 1 } surfaces upward. ApiError::IntoResponse already maps that to 503 with a Retry-After header (landed in the previous commit alongside the variant itself). - picloud binary constructs the gate once at build_app and shares it with LocalExecutorClient. The cap exists so a Rhai script storm can't drain the blocking-thread pool — pushing back hard beats letting requests pile up against a finite worker count. Per-app / per-script caps stay deferred until a real workload demands them. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/orchestrator-core/src/client.rs | 23 +++- crates/orchestrator-core/src/gate.rs | 154 +++++++++++++++++++++++++ crates/orchestrator-core/src/lib.rs | 2 + crates/picloud/src/lib.rs | 7 +- 4 files changed, 182 insertions(+), 4 deletions(-) create mode 100644 crates/orchestrator-core/src/gate.rs diff --git a/crates/orchestrator-core/src/client.rs b/crates/orchestrator-core/src/client.rs index e64c49a..feaca81 100644 --- a/crates/orchestrator-core/src/client.rs +++ b/crates/orchestrator-core/src/client.rs @@ -4,6 +4,8 @@ use std::time::Duration; use async_trait::async_trait; use picloud_executor_core::{Engine, ExecError, ExecRequest, ExecResponse}; +use crate::gate::{AcquireError, ExecutionGate}; + /// Maximum wall-clock time we'll wait for a single invocation, regardless /// of the per-script `timeout_seconds`. Provides a hard ceiling on /// resource usage independent of misconfigured scripts. @@ -30,14 +32,19 @@ pub trait ExecutorClient: Send + Sync { /// `executor-core::Engine::execute` is synchronous; we offload it to a /// blocking thread so it doesn't park a Tokio worker, and apply the /// wall-clock timeout here. +/// +/// Holds an `ExecutionGate` and acquires a permit before `spawn_blocking` +/// so a script storm can't drain the blocking-thread pool. The permit +/// drops with the future, returning the slot. pub struct LocalExecutorClient { engine: Arc, + gate: Arc, } impl LocalExecutorClient { #[must_use] - pub fn new(engine: Arc) -> Self { - Self { engine } + pub fn new(engine: Arc, gate: Arc) -> Self { + Self { engine, gate } } } @@ -49,6 +56,18 @@ impl ExecutorClient for LocalExecutorClient { req: ExecRequest, timeout: Duration, ) -> Result { + // Acquire before spending any wall-clock budget. The permit is + // held until this future returns; spawn_blocking inherits the + // gating via the captured `_permit`. + let _permit = + self.gate + .try_acquire() + .map_err( + |AcquireError::Overloaded { retry_after_secs }| ExecError::Overloaded { + retry_after_secs, + }, + )?; + let timeout = timeout.min(HARD_TIMEOUT_CAP); let timeout_secs = u32::try_from(timeout.as_secs()).unwrap_or(u32::MAX); diff --git a/crates/orchestrator-core/src/gate.rs b/crates/orchestrator-core/src/gate.rs new file mode 100644 index 0000000..5fd6ce3 --- /dev/null +++ b/crates/orchestrator-core/src/gate.rs @@ -0,0 +1,154 @@ +//! Global concurrency gate for the data plane. +//! +//! Wraps a single `tokio::sync::Semaphore` so the executor can refuse +//! admission immediately when too many invocations are already in +//! flight. Designed for v1.1.0's single-node MVP — one cap across all +//! apps and scripts. Per-app or per-script caps come later when a real +//! workload surfaces the need. +//! +//! Policy: **non-blocking, no queue**. If a permit isn't free right +//! now, the call returns `AcquireError::Overloaded` and the data-plane +//! HTTP layer translates that to a 503 with `Retry-After: 1`. Pushing +//! back hard beats letting requests pile up against a finite pool of +//! blocking threads (executor work runs under `spawn_blocking`). +//! +//! Configured via the `PICLOUD_MAX_CONCURRENT_EXECUTIONS` env var. +//! Default is 32 — comfortable for a single-node Pi, low enough that +//! a script storm doesn't park every blocking thread. + +use std::sync::Arc; + +use thiserror::Error; +use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError}; + +/// Env var consulted by `from_env`. +pub const ENV_MAX_CONCURRENT: &str = "PICLOUD_MAX_CONCURRENT_EXECUTIONS"; + +/// Default cap when the env var is unset or invalid. +pub const DEFAULT_MAX_CONCURRENT: u32 = 32; + +/// `Retry-After` header value (seconds) returned alongside the 503 +/// when the gate refuses. Fixed for v1.1.0; later versions may compute +/// a smarter value from in-flight latency. +pub const DEFAULT_RETRY_AFTER_SECS: u32 = 1; + +/// Refused admission. The HTTP layer translates this to 503 with a +/// `Retry-After` header. +#[derive(Debug, Error)] +pub enum AcquireError { + #[error("at capacity (retry after {retry_after_secs}s)")] + Overloaded { retry_after_secs: u32 }, +} + +/// Global execution gate. Constructed once at orchestrator startup and +/// shared via `Arc`. Holds an inner `Arc` so permits are +/// owned (they release on drop independent of the gate's lifetime). +pub struct ExecutionGate { + permits: Arc, + max_permits: u32, +} + +impl ExecutionGate { + /// Construct with an explicit cap. Mostly for tests; production + /// uses `from_env`. + #[must_use] + pub fn new(max_permits: u32) -> Self { + Self { + permits: Arc::new(Semaphore::new(max_permits as usize)), + max_permits, + } + } + + /// Read `PICLOUD_MAX_CONCURRENT_EXECUTIONS` from the environment. + /// Falls back to `DEFAULT_MAX_CONCURRENT` on absence; warns and + /// falls back on parse failure or non-positive value. Mirrors the + /// `SandboxCeiling::from_env` ergonomics so operators see a + /// consistent shape across the env-tunables. + #[must_use] + pub fn from_env() -> Self { + let max = match std::env::var(ENV_MAX_CONCURRENT) { + Err(_) => DEFAULT_MAX_CONCURRENT, + Ok(v) => match v.parse::() { + Ok(n) if n > 0 => n, + Ok(_) => { + tracing::warn!( + env = ENV_MAX_CONCURRENT, + value = %v, + "value must be > 0; using default {DEFAULT_MAX_CONCURRENT}" + ); + DEFAULT_MAX_CONCURRENT + } + Err(e) => { + tracing::warn!( + env = ENV_MAX_CONCURRENT, + value = %v, + error = %e, + "invalid value; using default {DEFAULT_MAX_CONCURRENT}" + ); + DEFAULT_MAX_CONCURRENT + } + }, + }; + Self::new(max) + } + + /// Maximum concurrent permits this gate was configured for. Useful + /// for diagnostics / future metrics. + #[must_use] + pub fn max_permits(&self) -> u32 { + self.max_permits + } + + /// Non-blocking permit acquisition. Returns the owned permit on + /// success (drop releases the slot) or `AcquireError::Overloaded` + /// when saturated. Sync because the semaphore's non-blocking try is + /// sync — no runtime hop needed. + pub fn try_acquire(&self) -> Result { + self.permits + .clone() + .try_acquire_owned() + .map_err(|err| match err { + TryAcquireError::NoPermits | TryAcquireError::Closed => AcquireError::Overloaded { + retry_after_secs: DEFAULT_RETRY_AFTER_SECS, + }, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn acquire_succeeds_under_capacity() { + let gate = ExecutionGate::new(2); + let _p1 = gate.try_acquire().expect("first permit available"); + let _p2 = gate.try_acquire().expect("second permit available"); + } + + #[test] + fn acquire_overloaded_when_saturated() { + let gate = ExecutionGate::new(1); + let _p = gate.try_acquire().expect("first permit available"); + let AcquireError::Overloaded { retry_after_secs } = gate + .try_acquire() + .expect_err("second permit must be refused"); + assert!(retry_after_secs > 0, "retry-after must be positive"); + } + + #[test] + fn permit_drop_releases_slot() { + let gate = ExecutionGate::new(1); + { + let _p = gate.try_acquire().expect("first permit available"); + } + gate.try_acquire() + .expect("slot must be returned after permit drops"); + } + + #[test] + fn max_permits_exposed() { + let gate = ExecutionGate::new(7); + assert_eq!(gate.max_permits(), 7); + } +} diff --git a/crates/orchestrator-core/src/lib.rs b/crates/orchestrator-core/src/lib.rs index 3d07e4e..11c0a34 100644 --- a/crates/orchestrator-core/src/lib.rs +++ b/crates/orchestrator-core/src/lib.rs @@ -10,9 +10,11 @@ pub mod api; pub mod client; +pub mod gate; pub mod resolver; pub mod routing; pub use api::{data_plane_router, user_routes_router, DataPlaneState}; pub use client::{ExecutorClient, LocalExecutorClient, RemoteExecutorClient}; +pub use gate::{AcquireError, ExecutionGate}; pub use resolver::{ResolverError, ScriptResolver}; diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs index 9ae47c5..1203f8d 100644 --- a/crates/picloud/src/lib.rs +++ b/crates/picloud/src/lib.rs @@ -22,7 +22,7 @@ use picloud_manager_core::{ }; use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable}; use picloud_orchestrator_core::{ - data_plane_router, user_routes_router, DataPlaneState, LocalExecutorClient, + data_plane_router, user_routes_router, DataPlaneState, ExecutionGate, LocalExecutorClient, }; use picloud_shared::{ ExecutionLogSink, ScriptValidator, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION, @@ -129,7 +129,10 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { let resolver = Arc::new(RepoResolver::new(PostgresScriptRepoHandle( script_repo.clone(), ))); - let executor = Arc::new(LocalExecutorClient::new(engine.clone())); + // Single global gate — overflow is rejected with 503 + Retry-After. + // See `ExecutionGate` docs and `PICLOUD_MAX_CONCURRENT_EXECUTIONS`. + let gate = Arc::new(ExecutionGate::from_env()); + let executor = Arc::new(LocalExecutorClient::new(engine.clone(), gate)); let admin = AdminState { repo: Arc::new(PostgresScriptRepoHandle(script_repo.clone())), From 902dd78027731698b98fe6be37039b40ca744cb9 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 30 May 2026 18:53:27 +0200 Subject: [PATCH 5/9] feat(picloud): opportunistic principal middleware on the data plane MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The data-plane (POST /execute/{id} + user-route fallback) is unauthenticated by default — public scripts get hit by anonymous HTTP traffic. But some calls are authed (dashboard test-runs, API-key invocations) and v1.1.x services will want to see the caller via `cx.principal` for audit / authz once those features land. - New manager-core::attach_principal_if_present middleware. Always inserts Extension>: Some on resolved bearer/cookie, None on absent or malformed token. Fail-open on DB blip so a transient infra failure can't 500 anonymous traffic. - Wired in picloud build_app, scoped to the data-plane and user-routes routers only. The admin path keeps using require_authenticated; no double-resolve on the same token. - orchestrator-core handlers (execute_by_id, user_route_handler) now extract Extension> and pass it to build_exec_request. Replaces the temporary `None` placeholders from the previous commit. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/manager-core/src/auth_middleware.rs | 29 ++++++++++++++++++ crates/manager-core/src/lib.rs | 4 +-- crates/orchestrator-core/src/api.rs | 20 +++++++++---- crates/picloud/src/lib.rs | 35 +++++++++++++++------- 4 files changed, 70 insertions(+), 18 deletions(-) diff --git a/crates/manager-core/src/auth_middleware.rs b/crates/manager-core/src/auth_middleware.rs index 754b5ce..2136534 100644 --- a/crates/manager-core/src/auth_middleware.rs +++ b/crates/manager-core/src/auth_middleware.rs @@ -100,6 +100,35 @@ pub async fn require_admin(state: State, req: Request, next: Ne require_authenticated(state, req, next).await } +/// Opportunistic data-plane variant: always inserts an +/// `Extension>` and forwards the request. Used on +/// `/execute/{id}` and the user-route fallback, where most invocations +/// are anonymous public HTTP and the few authed ones (dashboard +/// test-runs, API keys) should still let scripts see the caller via +/// `cx.principal` once services consume it. +/// +/// Failure modes — all degrade to `None` rather than rejecting: +/// * No bearer / cookie → `None`. +/// * Malformed or unknown token → `None`. +/// * DB blip while resolving → `None` (fail-open; the data plane +/// should not 500 on transient infra failures for an *optional* +/// identity check). +/// +/// Admin-side routes that REQUIRE an identity keep using +/// `require_authenticated`. +pub async fn attach_principal_if_present( + State(state): State, + mut req: Request, + next: Next, +) -> Response { + let principal: Option = match extract_token(&req) { + Some(token) => resolve_principal(&state, &token).await.unwrap_or(None), + None => None, + }; + req.extensions_mut().insert(principal); + next.run(req).await +} + /// Decide whether the token is an API key (pic_ prefix) or a session /// token, then resolve the corresponding `Principal`. `Ok(None)` /// means the token was structurally valid but didn't match any active diff --git a/crates/manager-core/src/lib.rs b/crates/manager-core/src/lib.rs index b2c249f..b126f9e 100644 --- a/crates/manager-core/src/lib.rs +++ b/crates/manager-core/src/lib.rs @@ -59,8 +59,8 @@ pub use auth_bootstrap::{ }; #[allow(deprecated)] pub use auth_middleware::{ - require_admin, require_authenticated, AuthState, AuthedAdmin, API_KEY_PREFIX, - API_KEY_PREFIX_LEN, SESSION_COOKIE, + attach_principal_if_present, require_admin, require_authenticated, AuthState, AuthedAdmin, + API_KEY_PREFIX, API_KEY_PREFIX_LEN, SESSION_COOKIE, }; pub use authz::{can, require, AuthzDenied, AuthzError, AuthzRepo, Capability, Decision}; pub use log_sink::PostgresExecutionLogSink; diff --git a/crates/orchestrator-core/src/api.rs b/crates/orchestrator-core/src/api.rs index 82008e2..779b93e 100644 --- a/crates/orchestrator-core/src/api.rs +++ b/crates/orchestrator-core/src/api.rs @@ -12,7 +12,7 @@ use axum::{ http::{HeaderMap, HeaderName, HeaderValue, StatusCode}, response::{IntoResponse, Response}, routing::post, - Json, Router, + Extension, Json, Router, }; use chrono::Utc; use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType}; @@ -55,6 +55,11 @@ impl Clone for DataPlaneState { /// Build the data-plane router. Handles `POST /execute/:id` — the /// always-available ID-based bypass. +/// +/// Handlers expect an `Extension>` to be attached by +/// upstream middleware (`manager-core::attach_principal_if_present`); +/// requests without that extension panic at extraction time. The +/// picloud binary wires this in `build_app`. pub fn data_plane_router(state: DataPlaneState) -> Router where E: ExecutorClient + 'static, @@ -68,6 +73,10 @@ where /// Build a router that handles ALL paths via the user-defined routing /// table. Intended to be merged into the picloud app router as a /// fallback (after the system routes are mounted). +/// +/// Same middleware expectation as `data_plane_router` — wrap with +/// `attach_principal_if_present` so handlers can extract +/// `Extension>`. pub fn user_routes_router(state: DataPlaneState) -> Router where E: ExecutorClient + 'static, @@ -85,6 +94,7 @@ where async fn execute_by_id( State(state): State>, Path(id): Path, + Extension(principal): Extension>, headers: HeaderMap, body: Bytes, ) -> Result @@ -98,10 +108,7 @@ where .await? .ok_or(ApiError::NotFound(id))?; - // Principal stays `None` until the data-plane `attach_principal_if_present` - // middleware lands in the picloud-wiring commit. Both shapes are - // valid against `ExecRequest.principal: Option`. - let mut req = build_exec_request(id, &script.name, &headers, &body, script.app_id, None)?; + let mut req = build_exec_request(id, &script.name, &headers, &body, script.app_id, principal)?; req.sandbox_overrides = script.sandbox; let request_id = req.request_id; let request_path = req.path.clone(); @@ -137,6 +144,7 @@ where async fn user_route_handler( State(state): State>, + Extension(principal): Extension>, request: Request, ) -> Result where @@ -200,7 +208,7 @@ where &headers, &body_bytes, app_id, - None, + principal, )?; req.path = path; req.params = matched.params; diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs index 1203f8d..297301f 100644 --- a/crates/picloud/src/lib.rs +++ b/crates/picloud/src/lib.rs @@ -11,14 +11,14 @@ use axum::{routing::get, Json, Router}; use picloud_executor_core::{Engine, Limits}; use picloud_manager_core::{ admin_router, admins_router, api_keys_router, app_members_router, apps_api, apps_router, - auth_router, compile_routes, migrations, require_authenticated, route_admin_router, - AdminSessionRepository, AdminState, AdminUserRepository, AdminsState, ApiKeyRepository, - ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState, AppRepository, - AppsState, AuthState, AuthzRepo, PostgresAdminSessionRepository, PostgresAdminUserRepository, - PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository, - PostgresAppRepository, PostgresExecutionLogRepository, PostgresExecutionLogSink, - PostgresRouteRepository, PostgresScriptRepository, RepoResolver, RouteAdminState, - RouteRepository, SandboxCeiling, + attach_principal_if_present, auth_router, compile_routes, migrations, require_authenticated, + route_admin_router, AdminSessionRepository, AdminState, AdminUserRepository, AdminsState, + ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState, + AppRepository, AppsState, AuthState, AuthzRepo, PostgresAdminSessionRepository, + PostgresAdminUserRepository, PostgresApiKeyRepository, PostgresAppDomainRepository, + PostgresAppMembersRepository, PostgresAppRepository, PostgresExecutionLogRepository, + PostgresExecutionLogSink, PostgresRouteRepository, PostgresScriptRepository, RepoResolver, + RouteAdminState, RouteRepository, SandboxCeiling, }; use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable}; use picloud_orchestrator_core::{ @@ -206,16 +206,31 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { // facade above; the bare module path is retained so it's discoverable. let _ = apps_api::AppsState::clone; + // Opportunistic principal extraction on every data-plane request. + // Always inserts `Extension>`: Some for authed + // ingress (bearer / cookie), None otherwise. Handlers depend on + // this layer being applied — scoped to the data-plane routers so + // the admin path (which uses `require_authenticated`) doesn't + // double-resolve the same token. + let data_plane_routed = data_plane_router(data_plane.clone()).layer(from_fn_with_state( + auth_state.clone(), + attach_principal_if_present, + )); + let user_routes = user_routes_router(data_plane).layer(from_fn_with_state( + auth_state.clone(), + attach_principal_if_present, + )); + let api_v1 = Router::new() .nest("/admin", auth_router(auth_state)) .nest("/admin", guarded_admin) - .merge(data_plane_router(data_plane.clone())); + .merge(data_plane_routed); Ok(Router::new() .route("/healthz", get(healthz)) .route("/version", get(version)) .nest(&format!("/api/v{API_VERSION}"), api_v1) - .merge(user_routes_router(data_plane)) + .merge(user_routes) .layer(TraceLayer::new_for_http())) } From 5302bd3192de0526ff7ea719a98e74b71cdff826 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 30 May 2026 18:57:44 +0200 Subject: [PATCH 6/9] docs(sdk): SDK-shape reference + blueprint updates for v1.1.x MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lands the developer-facing reference for the SDK shape every v1.1.x service implements against, plus the blueprint changes the shape and the recently-shipped Phase 3.5 imply: - New docs/sdk-shape.md — covers handle pattern, :: namespace, throw/() error convention, sync↔async bridge, cross-app isolation rule, ServiceEventEmitter, ExecutionGate + env var, stateless vs stateful module registration. - Blueprint §11.6 (Phase 3.5): Pending → ✓ Shipped, with a note that it landed ahead of the originally planned slot. - Blueprint §8.1 (KV Store): replace hstore schema + rationale with JSONB. PK becomes (app_id, collection, key); cross-app isolation is enforced at the index, not just the service layer. Note 64 KiB per-value cap enforced at the service layer (lands with the KV PR in v1.1.1). - Blueprint new §7.5 (SDK Architecture): brief overview pointing to docs/sdk-shape.md. Includes §7.5.1 sketch of the trigger architecture (outbox + depth limit + (service, event, filter) → script). - Blueprint §12 Phase 4: restructured to enumerate v1.1.0 through v1.1.8 with one focused capability per release. Current focus moves to Phase 4 (v1.1.0) now that Phase 3.5 is done. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/sdk-shape.md | 227 ++++++++++++++++++++++++++++++++++ serverless_cloud_blueprint.md | 119 ++++++++++-------- 2 files changed, 294 insertions(+), 52 deletions(-) create mode 100644 docs/sdk-shape.md diff --git a/docs/sdk-shape.md b/docs/sdk-shape.md new file mode 100644 index 0000000..15e0a7f --- /dev/null +++ b/docs/sdk-shape.md @@ -0,0 +1,227 @@ +# SDK shape (v1.1.x stateful services) + +This document describes the architectural shape every v1.1.x SDK +service follows. It is **not** a feature reference for any particular +service — those live in their own docs as each PR lands (KV in v1.1.1, +docs in v1.1.2, …). What follows is the contract those PRs implement +against, so the surface stays consistent and the build doesn't drift. + +The shape was laid down in v1.1.0 (the SDK foundation PR). If you find +yourself re-litigating any of it inside a service PR, push back and +update this doc explicitly first. + +## Two kinds of Rhai modules + +**Stateless utility modules** (regex, time, json, base64, hex, url — +landing as v1.1.0's stdlib PR) are registered once at engine build. +They have no per-call state and no cross-app sensitivity. Implementation +goes in `executor-core::engine::build_engine` next to the existing +`log::` registration. They use Rhai's `register_static_module`. + +**Stateful service modules** (kv, docs, http, cron, files, pubsub, +secrets, email, users, queue, invoke) are registered **per call** by +`executor-core::sdk::register_all`. They need: + +- A service handle bundled in `picloud_shared::Services` (constructed + once at startup, cloned cheaply per call). +- A per-call `SdkCallCx` carrying the calling app, principal, + execution ids, and trigger depth. +- Closures that capture both, registered as Rhai native functions + inside a per-call `rhai::Module`. + +Mixing the two categories in one module is wrong — services that +internally consult per-call context are stateful, period. + +## `::` namespace style + +Every SDK module exposes itself under a `::` namespace, mirroring the +existing `log::`: + +```rhai +log::info("hello"); // v1.0 — present +let value = kv::collection("widgets").get("k"); // v1.1.1 +let resp = http::get("https://example.com"); // v1.1.4 +``` + +Dotted-object syntax (`kv.get("widgets", "k")`) is **not** used. +Rationale: `::` is consistent with Rust import syntax, doesn't +require a wrapper "module object" in Rhai's scope, and keeps the +module boundary obvious in scripts. + +## Handle pattern for collection-scoped services + +Services that operate on collections expose a **collection handle** +returned by an `::collection(name)` constructor: + +```rhai +let widgets = kv::collection("widgets"); +widgets.set("k", "v"); +let v = widgets.get("k"); +``` + +Not `kv::set("widgets", "k", "v")`. The handle is a Rhai custom type +the service registers; method calls bind to that type. This: + +- Removes the "did I get the collection-name argument right?" foot-gun. +- Lets the implementation cache per-collection state on the handle + (prepared statements, connection affinity) without leaking that + into the call signature. +- Pre-empts the "collection is implicit" failure mode where two + services in the same script accidentally share a default collection. + +`(app_id, collection, key)` is the identity tuple for KV; `(app_id, +collection, id)` for docs. Collections are **mandatory**, not optional +— even single-collection apps name their collection. The service layer +rejects requests with empty collection names. + +## Error convention + +- **Throw on failure.** `widgets.set("k", "v")` throws a Rhai runtime + error on any operational problem (DB unavailable, payload too large, + authz denied). Scripts opting into error handling use Rhai's + `try/catch`. +- **`()` for absent.** `widgets.get("missing")` returns `()` (Rhai + unit). Scripts test absence with `if v == () { ... }` or use the + matching `has(k)` predicate. +- **`bool` for predicates.** `widgets.has(k)` is the cheap existence + check that doesn't deserialize the value. + +This convention is uniform across every v1.1.x service. Adding +`Result`-flavoured variants is a design departure that requires a doc +update before implementation. + +## `SdkCallCx` and cross-app isolation + +Every stateful service trait method takes `&SdkCallCx` as its first +non-self argument. The cx carries: + +```rust +pub struct SdkCallCx { + pub app_id: AppId, + pub principal: Option, + pub execution_id: ExecutionId, + pub request_id: RequestId, + pub trigger_depth: u32, + pub root_execution_id: ExecutionId, +} +``` + +**The service implementation MUST derive `app_id` from `cx.app_id` — +never from a script-passed argument.** Scripts cannot name another +app's data, period. The closure registered into Rhai captures the +`Arc` for the call; the script never sees or passes +`app_id`. + +Why this matters: a `kv::set("widgets", "k", v)` call with a +script-supplied `app_id` would be a tenant-isolation vulnerability if +that arg ever leaked into the storage query. By deriving from the +host-attached cx, the service can't be tricked. + +`principal` is `Option` because the data plane is +unauthenticated by default — public HTTP scripts run with `None`. +Services that need an authenticated identity (e.g., `users::*`) check +`cx.principal.is_some()` and throw if missing. + +## Sync ↔ async bridge + +Rhai is synchronous; service trait methods (KV writes, HTTP calls) are +async. The bridge runs *inside the `spawn_blocking` thread* that +already wraps `Engine::execute` (orchestrator-core's +`LocalExecutorClient`): + +```rust +// Inside a Rhai-registered closure. +let runtime = tokio::runtime::Handle::current(); +let result = runtime.block_on(service.do_thing(&cx, args)); +``` + +`Handle::current()` finds the same Tokio runtime that scheduled the +`spawn_blocking`, so the `block_on` doesn't construct a fresh runtime. +The thread is already off the async worker pool (that's what +`spawn_blocking` does), so blocking inside it is safe. + +This pattern goes in every stateful service's registered Rhai closure. +The first service PR (KV, v1.1.1) lands a helper so subsequent services +don't reinvent the boilerplate. + +## `ServiceEventEmitter` + +Every stateful service that mutates data also emits events for the +(future) triggers framework: + +```rust +emitter.emit(&cx, ServiceEvent { + source: "kv", + op: "insert", + collection: Some("widgets".into()), + key: Some("k".into()), + payload: Some(new_value_json), + old_payload: None, +}).await?; +``` + +v1.1.0 ships only `NoopEventEmitter`. The v1.1.1 triggers PR replaces +that with an outbox-backed implementation: events land in a Postgres +outbox table; a dispatcher worker reads them out-of-band, matches +against registered triggers, and fans out script executions. The +dispatcher enforces a depth limit via `cx.trigger_depth` so a +trigger-fires-its-own-trigger chain can't run away. + +Services hold `Arc` and emit unconditionally; +the noop drops events, the real impl persists them. From the service's +perspective the emission is fire-and-forget. + +## `ExecutionGate` and `PICLOUD_MAX_CONCURRENT_EXECUTIONS` + +A single global semaphore caps concurrent script executions. Default +is 32; override via the `PICLOUD_MAX_CONCURRENT_EXECUTIONS` env var. +Acquisition is **non-blocking, no queue** — if a permit isn't free, +the request is refused immediately with HTTP 503 and a `Retry-After: +1` header. + +Rationale: Rhai execution runs under `spawn_blocking`, which uses a +finite pool of blocking threads (defaults to 512 in current Tokio). +Without a cap, a script storm parks every blocking thread and starves +every other workload (DB writes, log sinks, audit emission). Hard +pushback is preferable to silent degradation. + +Per-app or per-script caps are deferred until a real workload demands +them. The gate lives in `orchestrator-core::gate::ExecutionGate` and +is constructed once in the picloud binary's `build_app`. + +## Registration: where future services hook in + +```rust +// orchestrator-core / executor-core internal call path — +// you do not implement this; you implement registration helpers +// that future PRs call from here. +pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc) { + // v1.1.1: register_kv(engine, services, cx.clone()); + // v1.1.2: register_docs(engine, services, cx.clone()); + // … +} +``` + +Each service PR adds: + +1. A `Service` trait + impl in `manager-core` (since that's where the + DB-backed implementations live). +2. A field on `picloud_shared::Services` (`pub kv: Arc`). +3. A `register_kv` helper inside `executor-core::sdk::kv` that takes + the engine, the service, and the cx, then registers the Rhai + `::collection(...)` constructor and method bindings. +4. A new `Capability` variant in `manager-core::authz` (e.g. + `AppKvRead(AppId)`) and a check inside the service impl. + +That sequence is the entire mechanical pattern; nothing here should +require architecture-level discussion past v1.1.0. + +## What this doc does NOT cover + +- Service-specific schemas (KV table layout, docs query DSL, etc.) — + in each service PR. +- Authentication and the admin auth model — see blueprint §11.5, + §11.6 and Phase 3.5. +- The trigger dispatch design (outbox row layout, fan-out semantics, + trigger CRUD endpoints) — comes with v1.1.1. +- Cluster mode considerations — deferred to v1.3+. diff --git a/serverless_cloud_blueprint.md b/serverless_cloud_blueprint.md index 4dee843..f3c2341 100644 --- a/serverless_cloud_blueprint.md +++ b/serverless_cloud_blueprint.md @@ -1022,9 +1022,9 @@ The scripts and routes endpoints keep their existing shape — this avoids forci --- -## 11.6 Users, roles, and bearer-token auth (Phase 3.5) — Pending +## 11.6 Users, roles, and bearer-token auth (Phase 3.5) — ✓ Shipped -**Status**: pending. Targets `crates/manager-core/src/{authz,api_keys_api,api_key_repo}.rs`, an extended `auth_middleware.rs`, new shared types under `crates/shared/src/auth.rs`, migration `0006_users_authz.sql`. +**Status**: shipped, ahead of the originally planned slot. Lives in `crates/manager-core/src/{authz,api_keys_api,api_key_repo}.rs`, the extended `auth_middleware.rs`, shared types under `crates/shared/src/auth.rs`, and migration `0006_users_authz.sql`. `can(principal, capability)` and `require(principal, capability)` are the single gate every admin handler goes through. **Purpose**: bridge Phase 3b → Phase 4. Phase 4's v1.1 SDKs (KV, docs, HTTP, cron) each gate access on the calling principal. Without a real authorization model in place, every SDK addition has to either invent its own gate or stay open. Phase 3.5 lands `can(principal, capability)` as the single check every future SDK + admin endpoint goes through, so v1.1 work focuses on data plane shape, not on re-litigating auth. @@ -1223,7 +1223,7 @@ Defer to follow-up sessions: dashboard surfaces for invites / key minting (curl --- -### Phase 3: v1.0.x — Foundations (Current focus) +### Phase 3: v1.0.x — Foundations ✓ (Shipped) Three foundation pieces that must land before the v1.1 service expansion, because retrofitting them later is expensive. @@ -1231,24 +1231,27 @@ Three foundation pieces that must land before the v1.1 service expansion, becaus **3b. Multi-app scoping** — ✓ shipped. See section 11.5. `apps`, `app_domains`, `app_slug_history` tables; `app_id` columns on `scripts`, `routes`, `execution_logs`. Migration assigns existing data to a `default` app and always claims `localhost`; a Rust-side bootstrap inserts a `Hello World` script + `/hello` route when the default app is empty. Orchestrator dispatch is two-phase (Host → app → route trie). `/api/v1/execute/{id}/*` continues to work without a public domain claim. Dashboard is app-hierarchical (`/admin/apps`, `/admin/apps/{slug}/...`); API stays flat with new endpoints under `/api/v1/admin/apps/*` and a `?app=` filter on script listing. Per-app admin roles deferred. -**3c. Users, roles, and bearer-token auth** — pending. See section 11.6. Adds `instance_role` to `admin_users` (`owner`/`admin`/`member`), `app_members` for per-app `app_admin`/`editor`/`viewer` grants, and `api_keys` for `Authorization: Bearer pic_…` credentials. Unifies cookie-session and API-key paths behind a single `can(principal, capability)` gate; list endpoints filter by membership at SQL for `member` users. Dashboard surfaces, invites, MFA, service accounts, and the `picloud` CLI binary are deferred — schema room only. +**3c. Users, roles, and bearer-token auth (Phase 3.5)** — ✓ shipped. See section 11.6. Adds `instance_role` to `admin_users` (`owner`/`admin`/`member`), `app_members` for per-app `app_admin`/`editor`/`viewer` grants, and `api_keys` for `Authorization: Bearer pic_…` credentials. Unifies cookie-session and API-key paths behind a single `can(principal, capability)` gate; list endpoints filter by membership at SQL for `member` users. Dashboard surfaces, invites, MFA, service accounts, and the `picloud` CLI binary are deferred — schema room only. **Why all three before v1.1**: every v1.1 service (KV, docs, users, etc.) needs both an `app_id` scoping key in its schema and a `Principal` to authorize against. Adding both now is one migration each on a small surface; adding them after the SDKs ship is many migrations on populated data plus a re-gate of every SDK call. --- -### Phase 4: v1.1 (Expand Capabilities & Services) -Ordered roughly by foundation value: each row enables the rows below it. +### Phase 4: v1.1 (Expand Capabilities & Services) — Current focus -1. **Rhai SDK: KV Store** (`kv.get/set/delete/has` with collections, scoped per app) -2. **Rhai SDK: Document Store** (`docs.create/find/update/delete/list/query`, scoped per app) -3. **Rhai SDK: HTTP** (`http.get/post/put/delete` with SSRF deny-list) -4. **Cron triggers** (manager scheduler skeleton already exists; needs schedules table + `FOR UPDATE SKIP LOCKED` dispatch) -5. **Rhai SDK: Email** (`email.send` via SMTP; needs per-deploy config) -6. **Rhai SDK: User Management** (auth, CRUD, roles, permissions, invitations, password reset; depends on email for invites; scoped per app) -7. **Queue triggers** (start with Postgres LISTEN/NOTIFY; RabbitMQ/Redis later if needed) -8. **`invoke()` + `retry::*`** (function-to-function calls; execution_logs gain `parent_execution_id`) -9. **Secrets management** (encrypted env vars, per app) +Released in patch steps (v1.1.0 → v1.1.8), each landing one focused capability. The split lets each release ship behind tests + docs without long-lived branches. SDK shape (handle pattern, `::` namespace, error convention, `ExecutionGate`, `SdkCallCx`, `ServiceEventEmitter` — see §7.5 and [docs/sdk-shape.md](../docs/sdk-shape.md)) is fixed in v1.1.0; every subsequent release fills in the contents without re-litigating the shape. + +| Version | Capability | +|---------|------------| +| **v1.1.0** | **Foundation & Standard Library** — SDK shape (`Services` bundle, `SdkCallCx`, `ExecutionGate`, `ServiceEventEmitter` trait shape); stdlib utilities (regex, random, time, json, base64, hex, url). | +| **v1.1.1** | **Storage & Events** — KV store keyed `(app_id, collection, key)`; triggers framework (outbox + dispatcher + trigger CRUD + `ctx.event` + depth limit); KV trigger kinds. | +| **v1.1.2** | **Documents** — `docs::collection(name).create/find/update/delete/list` with `docs:*` triggers. | +| **v1.1.3** | **Modules** — `scripts.kind`, per-app resolver replaces `DummyModuleResolver`, AST cache + dep-graph invalidation. | +| **v1.1.4** | **Outbound HTTP & Scheduled Tasks** — `http::*` with SSRF deny-list; cron triggers. | +| **v1.1.5** | **Files & Messaging** — filesystem-backed blobs with `files:*` triggers; pub/sub via LISTEN/NOTIFY with `pubsub:*` triggers. | +| **v1.1.6** | **Configuration & Email** — encrypted per-app secrets; outbound `email::send` / `send_html` + inbound `email:receive` trigger. | +| **v1.1.7** | **User Management** — `users::*` for in-script CRUD, auth, roles, invites, password reset. | +| **v1.1.8** | **Durable Queues & Function Composition** — `queue::*` with `queue:receive` trigger; `invoke()` + `retry::*` (closures-as-args, re-entrant Rhai). | --- @@ -1309,59 +1312,71 @@ Ordered roughly by foundation value: each row enables the rows below it. | **ctx** (global) | `ctx.execution_id`, `ctx.script_id`, `ctx.script_name`, `ctx.request_id`, `ctx.trace_id`, `ctx.invocation_type`, `ctx.parent_execution_id`, `ctx.request.path`, `ctx.request.headers`, `ctx.request.body` | MVP+ | | **Response** | Return `{ statusCode, headers?, body }` | MVP | +## 7.5 SDK Architecture (v1.1.x foundation) + +Stateful Rhai SDK services (KV, docs, HTTP, …) hang off a common shape laid down by the v1.1.0 SDK foundation PR. Full reference lives in [docs/sdk-shape.md](../docs/sdk-shape.md); this section sketches the moving parts so other sections can refer to them by name. + +**`Services` bundle** (`picloud_shared::Services`) — an `#[non_exhaustive]` struct constructed once at startup. v1.1.0 ships it empty; each subsequent v1.1.x PR adds one `Arc` / `Arc` / … field. Held on `Engine`, passed by reference to the per-call registration hook. + +**Per-call context** (`picloud_shared::SdkCallCx`) — every stateful service trait method takes `&SdkCallCx` as its first non-self argument. Carries `app_id`, `Option`, `execution_id`, `request_id`, and the `trigger_depth` / `root_execution_id` slots that the triggers framework populates. Services derive `app_id` from the cx — never from script-passed args. **That rule is the cross-app isolation boundary**; scripts cannot name another app's data. + +**Handle pattern** — collection-scoped services expose `kv::collection("widgets").get("k")`, not `kv::get("widgets", "k")`. Removes the wrong-collection-name foot-gun and lets implementations cache per-collection state. `(app_id, collection, key)` is the identity tuple for KV; `(app_id, collection, id)` for docs. Collections are mandatory. + +**Error convention** — throw on failure, `()` for absent, `bool` for predicates. Uniform across every v1.1.x service. Scripts opt into handling errors via Rhai's `try/catch`. + +**`ExecutionGate`** (`orchestrator-core::gate::ExecutionGate`) — single global semaphore capping concurrent script executions. Default 32, override via the `PICLOUD_MAX_CONCURRENT_EXECUTIONS` env var. Non-blocking — on overflow, the orchestrator returns HTTP 503 with `Retry-After: 1` immediately. No queue. Rationale: Rhai runs under `spawn_blocking`, so unbounded concurrency would park every blocking thread and starve every other workload. + +**`ServiceEventEmitter`** (`picloud_shared::ServiceEventEmitter`) — every mutating service method emits a `ServiceEvent { source, op, collection, key, payload, old_payload }`. v1.1.0 ships `NoopEventEmitter`; the real outbox-backed dispatcher lands with v1.1.1 (see 7.5.1). + +### 7.5.1 Trigger architecture (sketch) + +Triggers fire scripts in response to service events. Three locked properties; full design and CRUD endpoints land with v1.1.1. + +1. **Async outbox**: services emit events synchronously into a Postgres outbox table; a separate dispatcher worker reads, matches them against registered triggers, and fans out script executions. Service writes don't block on trigger fan-out. +2. **Depth-limited**: each trigger-spawned execution increments `cx.trigger_depth`. The dispatcher refuses to fan out beyond a configured ceiling to prevent runaway feedback loops. `cx.root_execution_id` preserves the originating execution id for audit grouping. +3. **Trigger model**: a trigger is `(service, event, filter) → script`, stored in a `triggers` table. The filter is the dispatcher's match predicate on the emitted `ServiceEvent`. + ### 8.1 KV Store Service -**Purpose**: Simple key-value persistence organized by collections, shared across script invocations and scripts. +**Purpose**: Simple key-value persistence organized by collections, scoped per app and shared across script invocations and scripts within that app. -**PostgreSQL Setup:** +**PostgreSQL Schema:** ```sql --- Enable hstore extension (one-time setup) -CREATE EXTENSION IF NOT EXISTS hstore; - --- Create KV table with collection support CREATE TABLE kv_store ( + app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE, collection TEXT NOT NULL, - key TEXT NOT NULL, - value hstore NOT NULL, + key TEXT NOT NULL, + value JSONB NOT NULL, expires_at TIMESTAMP, created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW(), - - PRIMARY KEY (collection, key) + + PRIMARY KEY (app_id, collection, key) ); -CREATE INDEX idx_kv_collection ON kv_store(collection); -CREATE INDEX idx_kv_expires ON kv_store(expires_at) +CREATE INDEX idx_kv_app_collection ON kv_store(app_id, collection); +CREATE INDEX idx_kv_expires ON kv_store(expires_at) WHERE expires_at IS NOT NULL; ``` -**Why hstore + collections?** -- Lightweight, purpose-built for key-value storage -- Collections allow logical grouping (e.g., `kv:sessions`, `kv:counters`, `kv:flags`) -- Faster than JSONB for simple KV use cases -- Built-in indexing support -- Keeps all data in one database (no Redis dependency) +**Why JSONB + mandatory collections + `app_id` first:** +- `(app_id, collection, key)` is the identity tuple. The PK begins with `app_id` so the index is naturally per-app; cross-app reads can't happen even if the service layer has a bug. +- Collections are **mandatory** — every set / get / delete names one. The same key can legitimately live in multiple collections within one app (`sessions:abc` and `counters:abc` are distinct rows). +- JSONB carries arbitrary script-side values (nested objects, arrays) without a separate serialization step. `hstore` was considered and ruled out — it doesn't carry nested types and would force a second JSONB column the moment a script writes a structured value. -**Rhai SDK:** +**Value-size cap:** 64 KiB per value, enforced at the service layer (script-visible error on overflow). The cap keeps KV "small fast values, not blob storage"; the v1.1.5 files SDK is the right home for large payloads. + +**Rhai SDK (handle pattern — see [docs/sdk-shape.md](docs/sdk-shape.md)):** ```rhai -// Get a value from a collection -let val = kv.get("sessions", "user:123"); // Returns object or null +let sessions = kv::collection("sessions"); +sessions.set("user:123", #{ token: "abc", created: "2026-04-10" }); +let val = sessions.get("user:123"); // value or () if absent +sessions.delete("user:123"); +sessions.set("user:123", #{ token: "xyz" }, 3600); // TTL in seconds +if sessions.has("user:123") { ... } -// Set a value in a collection -kv.set("sessions", "user:123", { token: "abc", created: "2026-04-10" }); - -// Delete a key from a collection -kv.delete("sessions", "user:123"); - -// Set with TTL (seconds) -kv.set("sessions", "user:123", { token: "xyz" }, 3600); - -// Check if key exists in a collection -if kv.has("sessions", "user:123") { ... } - -// Use different collections for different purposes -kv.set("counters", "api:calls", 42); -kv.set("flags", "feature:beta", true); -kv.set("cache", "page:home", { html: "..." }); +// Distinct collections in one script — different handles. +let counters = kv::collection("counters"); +counters.set("api:calls", 42); ``` **Use Cases:** From 9b4a8346272a61ef8b1f0827d69b6884d0cf1bac Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 30 May 2026 18:58:29 +0200 Subject: [PATCH 7/9] =?UTF-8?q?chore(claude-md):=20refresh=20for=20v1.1.0?= =?UTF-8?q?=20=E2=80=94=20focus,=20working=20rules,=20env=20vars?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Current focus moves to v1.1.0 (SDK foundation + stdlib) with a pointer to docs/sdk-shape.md. Notes Phase 3.5 capability gating is shipped end-to-end. - Tech-stack line drops the misleading "v1.1+ hstore" mention; v1.1+ data-plane tables now use JSONB (see blueprint §8.1). - New Working Rules bullet for the handle pattern + SdkCallCx rule: services derive app_id from cx.app_id, never from a script-passed arg. That is the cross-app isolation boundary. - New "Runtime configuration" table documenting every env var the picloud binary consumes — including the new PICLOUD_MAX_CONCURRENT_EXECUTIONS alongside the existing PICLOUD_BIND, DATABASE_URL, session TTL, and sandbox knobs. Co-Authored-By: Claude Opus 4.7 (1M context) --- CLAUDE.md | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index bd5ec44..9e27af9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -8,7 +8,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co Authoritative design: [serverless_cloud_blueprint.md](serverless_cloud_blueprint.md). The blueprint is a living document — when architecture decisions are made in conversation that contradict it, treat the latest decision as truth and update the blueprint. -**Current focus (Phase 4, v1.1):** data-plane SDKs — KV store, then document store, then HTTP client, then cron triggers. See blueprint §12. Phase 3 (admin auth + multi-app scoping) shipped; every v1.1+ table starts with `app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE` and every Rhai SDK call resolves its app from the execution context. +**Current focus (Phase 4, v1.1.0):** SDK foundation + stdlib utilities — the shape every v1.1.x service module hangs off, see [docs/sdk-shape.md](docs/sdk-shape.md). Subsequent v1.1.x releases (KV in v1.1.1, docs in v1.1.2, …) fill it in; see blueprint §12 for the full table. Phase 3 shipped end-to-end: admin auth, multi-app scoping, and Phase 3.5 capability gating (`manager-core::authz::{can, require, Capability}` + migration `0006_users_authz.sql`). Every v1.1+ table starts with `app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE` and every Rhai SDK call resolves its app from the execution context. ## Three-Service Architecture @@ -48,7 +48,7 @@ Caddy fronts everything. Same Caddyfile shape works for single-node and cluster - **Rust 1.92+** workspace, pinned via `rust-toolchain.toml` - **Axum** for HTTP, **Tokio** async, **sqlx** for Postgres - **Rhai** embedded scripting (in `executor-core`) -- **PostgreSQL 15+** with `pgcrypto` and (v1.1+) `hstore` +- **PostgreSQL 15+** with `pgcrypto`. v1.1+ data-plane tables use JSONB for value columns (hstore was considered for KV and rejected — see blueprint §8.1). - **SvelteKit** dashboard, static adapter, CodeMirror 6 for the script editor - **Caddy 2** reverse proxy (auto-HTTPS in prod) - **Docker Compose** for dev and single-node prod @@ -103,9 +103,22 @@ docs/ - **Honor the three-service boundary.** Don't reach across `*-core` crates. If `orchestrator-core` needs something from `manager-core`, define a trait in `shared` and inject the impl. - **`executor-core` has no Postgres dependency.** Data-plane services (kv, docs, users — v1.1+) come in via injected `ServiceProvider` traits. - **Database writes only from `manager-core`.** `orchestrator-core` reads scripts (cached); `executor-core` doesn't touch the DB. +- **Stateful SDK services use the handle pattern + `SdkCallCx`.** Collection-scoped surfaces look like `kv::collection("x").get(k)`, not `kv::get("x", k)`. Every service trait method takes `&SdkCallCx` and **MUST** derive `app_id` from `cx.app_id` — never trust a script-passed `app_id`. That is the cross-app isolation boundary. See [docs/sdk-shape.md](docs/sdk-shape.md). - **MVP builds only the `picloud` all-in-one binary.** The three split binaries exist as skeletons so the crate boundaries stay honest; flesh them out only when cluster mode is being implemented. - **Trunk-based dev.** See [docs/git-workflow.md](docs/git-workflow.md). No long-lived branches. Feature flags for incomplete work. +## Runtime configuration + +Environment variables consumed by the `picloud` binary: + +| Variable | Default | Purpose | +|---|---|---| +| `PICLOUD_BIND` | `0.0.0.0:8080` | HTTP listen address. Port 8080 is owned by another process on this host — override locally. | +| `PICLOUD_MAX_CONCURRENT_EXECUTIONS` | `32` | Global concurrency cap on data-plane script executions. Overflow returns HTTP 503 with `Retry-After: 1` immediately (no queue). | +| `DATABASE_URL` | — | Required. Postgres connection string. | +| `PICLOUD_SESSION_TTL_HOURS` | `24` | Sliding-window session lifetime. | +| `PICLOUD_SANDBOX_MAX_*` | conservative defaults | Per-knob admin ceilings on Rhai sandbox overrides. See `manager-core::sandbox::SandboxCeiling`. | + ## Out of MVP Queue triggers, cron triggers, SMTP ingress, KV / docs / email / users / HTTP SDKs in scripts, interceptors, workflows, function-to-function `invoke()`, secrets, metrics dashboard. All deferred to v1.1+ per the blueprint. Don't pre-build for them — but don't make decisions that close the door on them either. From 098e18a989e2e3a457bc02cc76244bf28b3c9605 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 30 May 2026 19:00:35 +0200 Subject: [PATCH 8/9] chore(clippy): silence three v1.1.0-foundation lints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - sdk/bridge.rs: drop #[must_use] on the bridge fns — `Dynamic` and `serde_json::Value` are both #[must_use] already; the wrapper attribute is double-must-use noise. - api.rs IntoResponse: hoist `use ApiError as E;` above the early Overloaded branch so `E::Exec(...)` works in the if-let too (clippy::items_after_statements). - gate.rs test: bind the returned permit with `let _ =` so the OwnedSemaphorePermit doesn't trip unused-must-use. No behaviour change. Caught by `cargo clippy --all-targets --all-features -- -D warnings`. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/executor-core/src/sdk/bridge.rs | 2 -- crates/orchestrator-core/src/api.rs | 4 ++-- crates/orchestrator-core/src/gate.rs | 3 ++- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/crates/executor-core/src/sdk/bridge.rs b/crates/executor-core/src/sdk/bridge.rs index 07d223d..57b6f8f 100644 --- a/crates/executor-core/src/sdk/bridge.rs +++ b/crates/executor-core/src/sdk/bridge.rs @@ -14,7 +14,6 @@ use serde_json::Value as Json; /// pushing into a script's scope. Numbers prefer the narrowest type /// (`i64` over `f64`); anything that can't round-trip falls back to a /// string so the script always sees a defined value. -#[must_use] pub fn json_to_dynamic(value: Json) -> Dynamic { match value { Json::Null => Dynamic::UNIT, @@ -48,7 +47,6 @@ pub fn json_to_dynamic(value: Json) -> Dynamic { /// types (timestamps, user-registered modules) fall back to their /// `Display` form so they appear as strings in JSON output rather than /// failing the response build. -#[must_use] pub fn dynamic_to_json(value: &Dynamic) -> Json { if value.is_unit() { return Json::Null; diff --git a/crates/orchestrator-core/src/api.rs b/crates/orchestrator-core/src/api.rs index 779b93e..a412ff5 100644 --- a/crates/orchestrator-core/src/api.rs +++ b/crates/orchestrator-core/src/api.rs @@ -424,7 +424,8 @@ impl IntoResponse for ApiError { // header (Retry-After), so it short-circuits the (status, body) // reduction below. Axum's tuple builder makes per-arm header // injection awkward otherwise. - if let ApiError::Exec(ExecError::Overloaded { retry_after_secs }) = &self { + use ApiError as E; + if let E::Exec(ExecError::Overloaded { retry_after_secs }) = &self { let retry = retry_after_secs.to_string(); let body = Json(serde_json::json!({ "error": self.to_string() })); return ( @@ -435,7 +436,6 @@ impl IntoResponse for ApiError { .into_response(); } - use ApiError as E; let (status, message) = match &self { E::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()), E::BadRequest(_) => (StatusCode::BAD_REQUEST, self.to_string()), diff --git a/crates/orchestrator-core/src/gate.rs b/crates/orchestrator-core/src/gate.rs index 5fd6ce3..ba6698e 100644 --- a/crates/orchestrator-core/src/gate.rs +++ b/crates/orchestrator-core/src/gate.rs @@ -142,7 +142,8 @@ mod tests { { let _p = gate.try_acquire().expect("first permit available"); } - gate.try_acquire() + let _ = gate + .try_acquire() .expect("slot must be returned after permit drops"); } From e3757357963f8007ed3a0f874ac3a0017749795a Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 30 May 2026 20:10:05 +0200 Subject: [PATCH 9/9] docs(blueprint+gate): drop hstore from Tech Stack; note gate-vs-timeout interaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two review-pass nits from the v1.1.0-foundation review: - Blueprint §6 Tech Stack table still listed the database as "PostgreSQL + hstore" with an hstore-for-KV rationale — directly contradicting the §8.1 KV rewrite that explicitly rejected hstore in favour of JSONB. Updates the row so the high-level summary matches the §8.1 reasoning. - LocalExecutorClient::execute now documents the permit-vs-timeout interaction: when tokio::time::timeout fires the future drops and the permit returns, but the detached spawn_blocking thread keeps running until the Rhai script winds down. In-use blocking threads can briefly exceed the gate's permit count after a timeout. Calling it out so future readers don't read the implementation as buggy. No behaviour change. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/orchestrator-core/src/client.rs | 10 ++++++++-- serverless_cloud_blueprint.md | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/orchestrator-core/src/client.rs b/crates/orchestrator-core/src/client.rs index feaca81..3b2b218 100644 --- a/crates/orchestrator-core/src/client.rs +++ b/crates/orchestrator-core/src/client.rs @@ -57,8 +57,14 @@ impl ExecutorClient for LocalExecutorClient { timeout: Duration, ) -> Result { // Acquire before spending any wall-clock budget. The permit is - // held until this future returns; spawn_blocking inherits the - // gating via the captured `_permit`. + // held by this future; on `tokio::time::timeout` firing, the + // future drops and the permit returns to the pool — but the + // detached `spawn_blocking` thread keeps running until the + // Rhai script finishes (or panics). So in-use blocking threads + // can briefly exceed the gate's permit count after a timeout. + // That is intentional: a new admission can be served while the + // already-doomed script winds down, which is preferable to + // wedging the slot for the worst-case timeout duration. let _permit = self.gate .try_acquire() diff --git a/serverless_cloud_blueprint.md b/serverless_cloud_blueprint.md index f3c2341..75877c0 100644 --- a/serverless_cloud_blueprint.md +++ b/serverless_cloud_blueprint.md @@ -661,7 +661,7 @@ users.set_permissions(user_id, { |-------|-----------|-----------| | **Orchestrator** | Rust + Axum | Performance, safety, async-first; minimal overhead | | **Dashboard** | Alpine.js + vanilla HTML/CSS | Zero dependencies, simple to deploy, fast enough for MVP | -| **Database** | PostgreSQL + hstore | Robust ACID database; hstore extension for lightweight KV (v1.1) | +| **Database** | PostgreSQL 15+ (`pgcrypto`) | Robust ACID database; JSONB carries data-plane values (v1.1+). See §8.1. | | **Container Runtime** | Docker (Docker daemon) | Industry standard, simple CLI | | **Executor Image** | Alpine Linux + Rhai | Minimal image size (~50-100MB), fast startup | | **Scripting** | Rhai | Lightweight, embedded-friendly, safe by default |