Merge branch 'feat/v1.1.0-sdk-foundation'

v1.1.0 PR #0 — SDK Foundation.

Lands the architectural shape every v1.1.x stateful service hangs off,
without shipping any user-visible service. After this PR, subsequent
service PRs (KV v1.1.1, docs v1.1.2, …) are mechanical fill-in:

  - picloud_shared::{SdkCallCx, Services, ServiceEventEmitter +
    NoopEventEmitter} lock the per-call context, service bundle,
    and event-emission trait shape.
  - executor-core::sdk/ — register_all hook called per invocation;
    json↔dynamic bridge moved here from engine.rs.
  - ExecRequest gained app_id, principal, trigger_depth,
    root_execution_id (the last two reserved for v1.1.1's triggers
    framework).
  - orchestrator-core::gate::ExecutionGate — single global semaphore
    (PICLOUD_MAX_CONCURRENT_EXECUTIONS, default 32). Overflow returns
    503 + Retry-After: 1 immediately, no queue.
  - manager-core::attach_principal_if_present — opportunistic,
    fail-open middleware wired on data-plane + user-routes.
  - docs/sdk-shape.md — developer-facing reference for the
    conventions every future service PR implements against.
  - Blueprint revisions: Phase 3.5 marked ✓ Shipped, §8.1 KV switched
    from hstore to JSONB, new §7.5 SDK Architecture section and §7.5.1
    trigger sketch, §12 Phase 4 restructured into v1.1.0 → v1.1.8.
  - CLAUDE.md: current focus → v1.1.0, JSONB note, handle-pattern
    Working Rule, Runtime Configuration table with the new env var.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-30 20:11:08 +02:00
22 changed files with 1028 additions and 153 deletions

View File

@@ -8,7 +8,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
Authoritative design: [serverless_cloud_blueprint.md](serverless_cloud_blueprint.md). The blueprint is a living document — when architecture decisions are made in conversation that contradict it, treat the latest decision as truth and update the blueprint. Authoritative design: [serverless_cloud_blueprint.md](serverless_cloud_blueprint.md). The blueprint is a living document — when architecture decisions are made in conversation that contradict it, treat the latest decision as truth and update the blueprint.
**Current focus (Phase 4, v1.1):** data-plane SDKs — KV store, then document store, then HTTP client, then cron triggers. See blueprint §12. Phase 3 (admin auth + multi-app scoping) shipped; every v1.1+ table starts with `app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE` and every Rhai SDK call resolves its app from the execution context. **Current focus (Phase 4, v1.1.0):** SDK foundation + stdlib utilities — the shape every v1.1.x service module hangs off, see [docs/sdk-shape.md](docs/sdk-shape.md). Subsequent v1.1.x releases (KV in v1.1.1, docs in v1.1.2, …) fill it in; see blueprint §12 for the full table. Phase 3 shipped end-to-end: admin auth, multi-app scoping, and Phase 3.5 capability gating (`manager-core::authz::{can, require, Capability}` + migration `0006_users_authz.sql`). Every v1.1+ table starts with `app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE` and every Rhai SDK call resolves its app from the execution context.
## Three-Service Architecture ## Three-Service Architecture
@@ -48,7 +48,7 @@ Caddy fronts everything. Same Caddyfile shape works for single-node and cluster
- **Rust 1.92+** workspace, pinned via `rust-toolchain.toml` - **Rust 1.92+** workspace, pinned via `rust-toolchain.toml`
- **Axum** for HTTP, **Tokio** async, **sqlx** for Postgres - **Axum** for HTTP, **Tokio** async, **sqlx** for Postgres
- **Rhai** embedded scripting (in `executor-core`) - **Rhai** embedded scripting (in `executor-core`)
- **PostgreSQL 15+** with `pgcrypto` and (v1.1+) `hstore` - **PostgreSQL 15+** with `pgcrypto`. v1.1+ data-plane tables use JSONB for value columns (hstore was considered for KV and rejected — see blueprint §8.1).
- **SvelteKit** dashboard, static adapter, CodeMirror 6 for the script editor - **SvelteKit** dashboard, static adapter, CodeMirror 6 for the script editor
- **Caddy 2** reverse proxy (auto-HTTPS in prod) - **Caddy 2** reverse proxy (auto-HTTPS in prod)
- **Docker Compose** for dev and single-node prod - **Docker Compose** for dev and single-node prod
@@ -103,9 +103,22 @@ docs/
- **Honor the three-service boundary.** Don't reach across `*-core` crates. If `orchestrator-core` needs something from `manager-core`, define a trait in `shared` and inject the impl. - **Honor the three-service boundary.** Don't reach across `*-core` crates. If `orchestrator-core` needs something from `manager-core`, define a trait in `shared` and inject the impl.
- **`executor-core` has no Postgres dependency.** Data-plane services (kv, docs, users — v1.1+) come in via injected `ServiceProvider` traits. - **`executor-core` has no Postgres dependency.** Data-plane services (kv, docs, users — v1.1+) come in via injected `ServiceProvider` traits.
- **Database writes only from `manager-core`.** `orchestrator-core` reads scripts (cached); `executor-core` doesn't touch the DB. - **Database writes only from `manager-core`.** `orchestrator-core` reads scripts (cached); `executor-core` doesn't touch the DB.
- **Stateful SDK services use the handle pattern + `SdkCallCx`.** Collection-scoped surfaces look like `kv::collection("x").get(k)`, not `kv::get("x", k)`. Every service trait method takes `&SdkCallCx` and **MUST** derive `app_id` from `cx.app_id` — never trust a script-passed `app_id`. That is the cross-app isolation boundary. See [docs/sdk-shape.md](docs/sdk-shape.md).
- **MVP builds only the `picloud` all-in-one binary.** The three split binaries exist as skeletons so the crate boundaries stay honest; flesh them out only when cluster mode is being implemented. - **MVP builds only the `picloud` all-in-one binary.** The three split binaries exist as skeletons so the crate boundaries stay honest; flesh them out only when cluster mode is being implemented.
- **Trunk-based dev.** See [docs/git-workflow.md](docs/git-workflow.md). No long-lived branches. Feature flags for incomplete work. - **Trunk-based dev.** See [docs/git-workflow.md](docs/git-workflow.md). No long-lived branches. Feature flags for incomplete work.
## Runtime configuration
Environment variables consumed by the `picloud` binary:
| Variable | Default | Purpose |
|---|---|---|
| `PICLOUD_BIND` | `0.0.0.0:8080` | HTTP listen address. Port 8080 is owned by another process on this host — override locally. |
| `PICLOUD_MAX_CONCURRENT_EXECUTIONS` | `32` | Global concurrency cap on data-plane script executions. Overflow returns HTTP 503 with `Retry-After: 1` immediately (no queue). |
| `DATABASE_URL` | — | Required. Postgres connection string. |
| `PICLOUD_SESSION_TTL_HOURS` | `24` | Sliding-window session lifetime. |
| `PICLOUD_SANDBOX_MAX_*` | conservative defaults | Per-knob admin ceilings on Rhai sandbox overrides. See `manager-core::sandbox::SandboxCeiling`. |
## Out of MVP ## Out of MVP
Queue triggers, cron triggers, SMTP ingress, KV / docs / email / users / HTTP SDKs in scripts, interceptors, workflows, function-to-function `invoke()`, secrets, metrics dashboard. All deferred to v1.1+ per the blueprint. Don't pre-build for them — but don't make decisions that close the door on them either. Queue triggers, cron triggers, SMTP ingress, KV / docs / email / users / HTTP SDKs in scripts, interceptors, workflows, function-to-function `invoke()`, secrets, metrics dashboard. All deferred to v1.1+ per the blueprint. Don't pre-build for them — but don't make decisions that close the door on them either.

View File

@@ -3,30 +3,38 @@ use std::sync::{Arc, Mutex};
use std::time::Instant; use std::time::Instant;
use chrono::Utc; use chrono::Utc;
use picloud_shared::{ScriptValidator, ValidationError, SDK_VERSION}; use picloud_shared::{ScriptValidator, SdkCallCx, Services, ValidationError, SDK_VERSION};
use rhai::{Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module, Scope}; use rhai::{Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module, Scope};
use serde_json::Value as Json; use serde_json::Value as Json;
use crate::sandbox::Limits; use crate::sandbox::Limits;
use crate::sdk;
use crate::sdk::bridge::{dynamic_to_json, json_to_dynamic};
use crate::types::{ use crate::types::{
ExecError, ExecRequest, ExecResponse, ExecStats, InvocationType, LogEntry, LogLevel, ExecError, ExecRequest, ExecResponse, ExecStats, InvocationType, LogEntry, LogLevel,
}; };
/// Preconfigured Rhai engine with sandbox limits applied. /// Preconfigured Rhai engine with sandbox limits applied and the SDK
/// `Services` bundle attached.
/// ///
/// One `Engine` is constructed at process startup and reused across /// One `Engine` is constructed at process startup and reused across
/// invocations. `execute` is **synchronous** — it owns the per-call /// invocations. `execute` is **synchronous** — it owns the per-call
/// scope and log buffer. Wall-clock timeouts and offloading off the /// scope and log buffer. Wall-clock timeouts and offloading off the
/// async runtime belong to the caller (orchestrator-core's /// async runtime belong to the caller (orchestrator-core's
/// `LocalExecutorClient` wraps this with `spawn_blocking` + `timeout`). /// `LocalExecutorClient` wraps this with `spawn_blocking` + `timeout`).
///
/// The `Services` bundle is empty in v1.1.0; subsequent v1.1.x PRs add
/// service handles (KV, docs, …) and `sdk::register_all` wires them
/// into each per-call Rhai engine.
pub struct Engine { pub struct Engine {
limits: Limits, limits: Limits,
services: Services,
} }
impl Engine { impl Engine {
#[must_use] #[must_use]
pub fn new(limits: Limits) -> Self { pub fn new(limits: Limits, services: Services) -> Self {
Self { limits } Self { limits, services }
} }
#[must_use] #[must_use]
@@ -55,7 +63,20 @@ impl Engine {
pub fn execute(&self, source: &str, req: ExecRequest) -> Result<ExecResponse, ExecError> { pub fn execute(&self, source: &str, req: ExecRequest) -> Result<ExecResponse, ExecError> {
let effective_limits = self.limits.with_overrides(&req.sandbox_overrides); let effective_limits = self.limits.with_overrides(&req.sandbox_overrides);
let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new())); let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
let engine = build_engine(effective_limits, Some(logs.clone())); let mut engine = build_engine(effective_limits, Some(logs.clone()));
// Per-call context handed to every stateful SDK service via the
// `sdk::register_all` hook. The Arc lets future service closures
// capture cheap clones of the cx for use at script-call time.
let cx = Arc::new(SdkCallCx {
app_id: req.app_id,
principal: req.principal.clone(),
execution_id: req.execution_id,
request_id: req.request_id,
trigger_depth: req.trigger_depth,
root_execution_id: req.root_execution_id,
});
sdk::register_all(&mut engine, &self.services, cx);
let ast = engine let ast = engine
.compile(source) .compile(source)
@@ -265,69 +286,6 @@ fn parse_structured_response(map: Map) -> Result<(u16, BTreeMap<String, String>,
Ok((status_code, headers, body)) Ok((status_code, headers, body))
} }
// ----------------------------------------------------------------------------
// Rhai ↔ serde_json bridges
// ----------------------------------------------------------------------------
fn json_to_dynamic(value: Json) -> Dynamic {
match value {
Json::Null => Dynamic::UNIT,
Json::Bool(b) => b.into(),
Json::Number(n) => {
if let Some(i) = n.as_i64() {
i.into()
} else if let Some(f) = n.as_f64() {
f.into()
} else {
n.to_string().into()
}
}
Json::String(s) => s.into(),
Json::Array(arr) => arr
.into_iter()
.map(json_to_dynamic)
.collect::<Vec<Dynamic>>()
.into(),
Json::Object(obj) => {
let mut m = Map::new();
for (k, v) in obj {
m.insert(k.into(), json_to_dynamic(v));
}
Dynamic::from(m)
}
}
}
fn dynamic_to_json(value: &Dynamic) -> Json {
if value.is_unit() {
return Json::Null;
}
if let Ok(b) = value.as_bool() {
return Json::Bool(b);
}
if let Ok(i) = value.as_int() {
return Json::Number(i.into());
}
if let Ok(f) = value.as_float() {
return serde_json::Number::from_f64(f).map_or(Json::Null, Json::Number);
}
if value.is_string() {
return Json::String(value.clone().into_string().unwrap_or_default());
}
if let Some(arr) = value.clone().try_cast::<rhai::Array>() {
return Json::Array(arr.iter().map(dynamic_to_json).collect());
}
if let Some(map) = value.clone().try_cast::<Map>() {
let mut out = serde_json::Map::new();
for (k, v) in map {
out.insert(k.to_string(), dynamic_to_json(&v));
}
return Json::Object(out);
}
// Anything else (timestamps, custom types) — best-effort string form.
Json::String(value.to_string())
}
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Error mapping // Error mapping
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------

View File

@@ -8,6 +8,7 @@ pub mod context;
pub mod engine; pub mod engine;
pub mod logging; pub mod logging;
pub mod sandbox; pub mod sandbox;
pub mod sdk;
pub mod types; pub mod types;
pub use engine::Engine; pub use engine::Engine;

View File

@@ -0,0 +1,77 @@
//! JSON ↔ Rhai `Dynamic` value bridge.
//!
//! Originally inline in `engine.rs`; moved here for v1.1.0 so future
//! service modules (KV in v1.1.1, docs in v1.1.2, …) can convert
//! values without `engine.rs` being the only owner of the conversions.
//! Behaviour is unchanged from the pre-extraction implementation —
//! `sdk_contract.rs::json_round_trip_preserves_nested_shapes` pins the
//! observable round-trip.
use rhai::{Dynamic, Map};
use serde_json::Value as Json;
/// Convert a `serde_json::Value` into a Rhai `Dynamic` suitable for
/// pushing into a script's scope. Numbers prefer the narrowest type
/// (`i64` over `f64`); anything that can't round-trip falls back to a
/// string so the script always sees a defined value.
pub fn json_to_dynamic(value: Json) -> Dynamic {
match value {
Json::Null => Dynamic::UNIT,
Json::Bool(b) => b.into(),
Json::Number(n) => {
if let Some(i) = n.as_i64() {
i.into()
} else if let Some(f) = n.as_f64() {
f.into()
} else {
n.to_string().into()
}
}
Json::String(s) => s.into(),
Json::Array(arr) => arr
.into_iter()
.map(json_to_dynamic)
.collect::<Vec<Dynamic>>()
.into(),
Json::Object(obj) => {
let mut m = Map::new();
for (k, v) in obj {
m.insert(k.into(), json_to_dynamic(v));
}
Dynamic::from(m)
}
}
}
/// Convert a Rhai `Dynamic` back to a `serde_json::Value`. Custom Rhai
/// types (timestamps, user-registered modules) fall back to their
/// `Display` form so they appear as strings in JSON output rather than
/// failing the response build.
pub fn dynamic_to_json(value: &Dynamic) -> Json {
if value.is_unit() {
return Json::Null;
}
if let Ok(b) = value.as_bool() {
return Json::Bool(b);
}
if let Ok(i) = value.as_int() {
return Json::Number(i.into());
}
if let Ok(f) = value.as_float() {
return serde_json::Number::from_f64(f).map_or(Json::Null, Json::Number);
}
if value.is_string() {
return Json::String(value.clone().into_string().unwrap_or_default());
}
if let Some(arr) = value.clone().try_cast::<rhai::Array>() {
return Json::Array(arr.iter().map(dynamic_to_json).collect());
}
if let Some(map) = value.clone().try_cast::<Map>() {
let mut out = serde_json::Map::new();
for (k, v) in map {
out.insert(k.to_string(), dynamic_to_json(&v));
}
return Json::Object(out);
}
Json::String(value.to_string())
}

View File

@@ -0,0 +1,10 @@
//! Re-export of `picloud_shared::SdkCallCx`.
//!
//! The type itself lives in `picloud-shared` because future stateful
//! service impls live in `manager-core` (which `executor-core` must
//! not depend on) and need to reference the same cx shape. This
//! re-export lets executor-side code write
//! `use picloud_executor_core::sdk::SdkCallCx;` instead of reaching
//! into `picloud_shared` for one type.
pub use picloud_shared::SdkCallCx;

View File

@@ -0,0 +1,39 @@
//! SDK plumbing — types and the per-call registration entry point.
//!
//! `executor-core` is responsible for building the per-invocation Rhai
//! engine and wiring stateful services into it. v1.1.0 ships the
//! shapes (`Services` bundle, `SdkCallCx`, `register_all` entry point)
//! but no actual services — subsequent v1.1.x PRs (KV in v1.1.1,
//! docs in v1.1.2, …) extend `register_all` rather than re-threading
//! plumbing through `engine.rs`.
//!
//! Bridge functions (`json_to_dynamic` / `dynamic_to_json`) also live
//! here so service modules can convert values without `engine.rs`
//! being the only home for the conversion logic.
pub mod bridge;
pub mod cx;
pub use bridge::{dynamic_to_json, json_to_dynamic};
pub use cx::SdkCallCx;
use std::sync::Arc;
use picloud_shared::Services;
use rhai::Engine as RhaiEngine;
/// Single hook every v1.1.x stateful service registers into. Called
/// once per invocation, just after `build_engine` constructs the
/// sandboxed Rhai engine and just before script compilation.
///
/// v1.1.0 ships an intentionally empty body — the call site exists so
/// future PRs (KV first) drop their registration logic here rather
/// than reaching into `engine.rs::build_engine`. The signature is
/// locked: subsequent PRs MUST keep the same parameter shape so that
/// hosts don't have to re-thread the plumbing.
pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCallCx>) {
// Intentionally inert in v1.1.0. The unused-suppression below is a
// load-bearing placeholder: future PRs replace this `let _` with
// real `register_kv(engine, services, cx.clone())` calls etc.
let _ = (engine, services, cx);
}

View File

@@ -1,7 +1,7 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox}; use picloud_shared::{AppId, ExecutionId, Principal, RequestId, ScriptId, ScriptSandbox};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use thiserror::Error; use thiserror::Error;
@@ -50,6 +50,35 @@ pub struct ExecRequest {
/// override) before the Rhai engine is built. /// override) before the Rhai engine is built.
#[serde(default)] #[serde(default)]
pub sandbox_overrides: ScriptSandbox, pub sandbox_overrides: ScriptSandbox,
/// Owning application. Source of truth for every `(app_id, …)`
/// storage lookup the script makes via stateful SDK services.
/// Internal-only; not surfaced via `ctx` (which the script sees).
pub app_id: AppId,
/// Caller identity, when authenticated. `None` for unauthenticated
/// data-plane HTTP requests (the common case for public scripts);
/// `Some` when a bearer token or session cookie was resolved.
/// Internal-only — exposed via `SdkCallCx` to service trait impls.
///
/// `#[serde(skip)]`: `ExecRequest` is serializable so cluster mode
/// (v1.3+) can ship invocations to remote executors over HTTP, but
/// `Principal` has no wire derivation today. Skipping here keeps
/// v1.1.0 compiling; the cluster-mode PR will introduce a wire-safe
/// snapshot then.
#[serde(skip)]
pub principal: Option<Principal>,
/// Triggers-framework depth. `0` for direct invocations. The
/// dispatcher (v1.1.1) increments on each indirection to bound
/// runaway feedback loops.
#[serde(default)]
pub trigger_depth: u32,
/// Originating execution id of a trigger chain. Equal to
/// `execution_id` for direct invocations; preserves the root
/// across fan-out for audit log grouping.
pub root_execution_id: ExecutionId,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -100,4 +129,11 @@ pub enum ExecError {
#[error("script runtime error: {0}")] #[error("script runtime error: {0}")]
Runtime(String), Runtime(String),
/// Concurrency gate (orchestrator-core::ExecutionGate) refused
/// admission. Surfaced as HTTP 503 with a `Retry-After` header.
/// The gate enforces a global cap so a script storm can't park
/// every blocking thread.
#[error("execution declined: server at capacity (retry after {retry_after_secs}s)")]
Overloaded { retry_after_secs: u32 },
} }

View File

@@ -1,12 +1,13 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use picloud_executor_core::{Engine, ExecError, ExecRequest, InvocationType, Limits, LogLevel}; use picloud_executor_core::{Engine, ExecError, ExecRequest, InvocationType, Limits, LogLevel};
use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox}; use picloud_shared::{AppId, ExecutionId, RequestId, ScriptId, ScriptSandbox, Services};
use serde_json::json; use serde_json::json;
fn req(body: serde_json::Value) -> ExecRequest { fn req(body: serde_json::Value) -> ExecRequest {
let execution_id = ExecutionId::new();
ExecRequest { ExecRequest {
execution_id: ExecutionId::new(), execution_id,
request_id: RequestId::new(), request_id: RequestId::new(),
script_id: ScriptId::new(), script_id: ScriptId::new(),
script_name: "test".into(), script_name: "test".into(),
@@ -18,11 +19,15 @@ fn req(body: serde_json::Value) -> ExecRequest {
query: BTreeMap::new(), query: BTreeMap::new(),
rest: String::new(), rest: String::new(),
sandbox_overrides: ScriptSandbox::default(), sandbox_overrides: ScriptSandbox::default(),
app_id: AppId::new(),
principal: None,
trigger_depth: 0,
root_execution_id: execution_id,
} }
} }
fn engine() -> Engine { fn engine() -> Engine {
Engine::new(Limits::default()) Engine::new(Limits::default(), Services::new())
} }
#[test] #[test]
@@ -121,7 +126,7 @@ fn enforces_operation_budget() {
max_operations: 1_000, max_operations: 1_000,
..Limits::default() ..Limits::default()
}; };
let engine = Engine::new(limits); let engine = Engine::new(limits, Services::new());
// 10_000 iterations vastly exceeds 1_000 ops. // 10_000 iterations vastly exceeds 1_000 ops.
let src = r"let n = 0; for i in 0..10000 { n += 1; } n"; let src = r"let n = 0; for i in 0..10000 { n += 1; } n";
let err = engine let err = engine

View File

@@ -23,7 +23,7 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits, LogLevel}; use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits, LogLevel};
use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox}; use picloud_shared::{AppId, ExecutionId, RequestId, ScriptId, ScriptSandbox, Services};
use serde_json::{json, Value}; use serde_json::{json, Value};
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
@@ -31,12 +31,13 @@ use serde_json::{json, Value};
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
fn engine() -> Engine { fn engine() -> Engine {
Engine::new(Limits::default()) Engine::new(Limits::default(), Services::new())
} }
fn baseline_request() -> ExecRequest { fn baseline_request() -> ExecRequest {
let execution_id = ExecutionId::new();
ExecRequest { ExecRequest {
execution_id: ExecutionId::new(), execution_id,
request_id: RequestId::new(), request_id: RequestId::new(),
script_id: ScriptId::new(), script_id: ScriptId::new(),
script_name: "contract".into(), script_name: "contract".into(),
@@ -48,6 +49,10 @@ fn baseline_request() -> ExecRequest {
query: BTreeMap::new(), query: BTreeMap::new(),
rest: String::new(), rest: String::new(),
sandbox_overrides: ScriptSandbox::default(), sandbox_overrides: ScriptSandbox::default(),
app_id: AppId::new(),
principal: None,
trigger_depth: 0,
root_execution_id: execution_id,
} }
} }

View File

@@ -100,6 +100,35 @@ pub async fn require_admin(state: State<AuthState>, req: Request<Body>, next: Ne
require_authenticated(state, req, next).await require_authenticated(state, req, next).await
} }
/// Opportunistic data-plane variant: always inserts an
/// `Extension<Option<Principal>>` and forwards the request. Used on
/// `/execute/{id}` and the user-route fallback, where most invocations
/// are anonymous public HTTP and the few authed ones (dashboard
/// test-runs, API keys) should still let scripts see the caller via
/// `cx.principal` once services consume it.
///
/// Failure modes — all degrade to `None` rather than rejecting:
/// * No bearer / cookie → `None`.
/// * Malformed or unknown token → `None`.
/// * DB blip while resolving → `None` (fail-open; the data plane
/// should not 500 on transient infra failures for an *optional*
/// identity check).
///
/// Admin-side routes that REQUIRE an identity keep using
/// `require_authenticated`.
pub async fn attach_principal_if_present(
State(state): State<AuthState>,
mut req: Request<Body>,
next: Next,
) -> Response {
let principal: Option<Principal> = match extract_token(&req) {
Some(token) => resolve_principal(&state, &token).await.unwrap_or(None),
None => None,
};
req.extensions_mut().insert(principal);
next.run(req).await
}
/// Decide whether the token is an API key (pic_ prefix) or a session /// Decide whether the token is an API key (pic_ prefix) or a session
/// token, then resolve the corresponding `Principal`. `Ok(None)` /// token, then resolve the corresponding `Principal`. `Ok(None)`
/// means the token was structurally valid but didn't match any active /// means the token was structurally valid but didn't match any active

View File

@@ -59,8 +59,8 @@ pub use auth_bootstrap::{
}; };
#[allow(deprecated)] #[allow(deprecated)]
pub use auth_middleware::{ pub use auth_middleware::{
require_admin, require_authenticated, AuthState, AuthedAdmin, API_KEY_PREFIX, attach_principal_if_present, require_admin, require_authenticated, AuthState, AuthedAdmin,
API_KEY_PREFIX_LEN, SESSION_COOKIE, API_KEY_PREFIX, API_KEY_PREFIX_LEN, SESSION_COOKIE,
}; };
pub use authz::{can, require, AuthzDenied, AuthzError, AuthzRepo, Capability, Decision}; pub use authz::{can, require, AuthzDenied, AuthzError, AuthzRepo, Capability, Decision};
pub use log_sink::PostgresExecutionLogSink; pub use log_sink::PostgresExecutionLogSink;

View File

@@ -12,12 +12,13 @@ use axum::{
http::{HeaderMap, HeaderName, HeaderValue, StatusCode}, http::{HeaderMap, HeaderName, HeaderValue, StatusCode},
response::{IntoResponse, Response}, response::{IntoResponse, Response},
routing::post, routing::post,
Json, Router, Extension, Json, Router,
}; };
use chrono::Utc; use chrono::Utc;
use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType}; use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType};
use picloud_shared::{ use picloud_shared::{
AppId, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, RequestId, ScriptId, AppId, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, Principal, RequestId,
ScriptId,
}; };
use serde_json::Value as Json_; use serde_json::Value as Json_;
use uuid::Uuid; use uuid::Uuid;
@@ -54,6 +55,11 @@ impl<E, R> Clone for DataPlaneState<E, R> {
/// Build the data-plane router. Handles `POST /execute/:id` — the /// Build the data-plane router. Handles `POST /execute/:id` — the
/// always-available ID-based bypass. /// always-available ID-based bypass.
///
/// Handlers expect an `Extension<Option<Principal>>` to be attached by
/// upstream middleware (`manager-core::attach_principal_if_present`);
/// requests without that extension panic at extraction time. The
/// picloud binary wires this in `build_app`.
pub fn data_plane_router<E, R>(state: DataPlaneState<E, R>) -> Router pub fn data_plane_router<E, R>(state: DataPlaneState<E, R>) -> Router
where where
E: ExecutorClient + 'static, E: ExecutorClient + 'static,
@@ -67,6 +73,10 @@ where
/// Build a router that handles ALL paths via the user-defined routing /// Build a router that handles ALL paths via the user-defined routing
/// table. Intended to be merged into the picloud app router as a /// table. Intended to be merged into the picloud app router as a
/// fallback (after the system routes are mounted). /// fallback (after the system routes are mounted).
///
/// Same middleware expectation as `data_plane_router` — wrap with
/// `attach_principal_if_present` so handlers can extract
/// `Extension<Option<Principal>>`.
pub fn user_routes_router<E, R>(state: DataPlaneState<E, R>) -> Router pub fn user_routes_router<E, R>(state: DataPlaneState<E, R>) -> Router
where where
E: ExecutorClient + 'static, E: ExecutorClient + 'static,
@@ -84,6 +94,7 @@ where
async fn execute_by_id<E, R>( async fn execute_by_id<E, R>(
State(state): State<DataPlaneState<E, R>>, State(state): State<DataPlaneState<E, R>>,
Path(id): Path<ScriptId>, Path(id): Path<ScriptId>,
Extension(principal): Extension<Option<Principal>>,
headers: HeaderMap, headers: HeaderMap,
body: Bytes, body: Bytes,
) -> Result<Response, ApiError> ) -> Result<Response, ApiError>
@@ -97,7 +108,7 @@ where
.await? .await?
.ok_or(ApiError::NotFound(id))?; .ok_or(ApiError::NotFound(id))?;
let mut req = build_exec_request(id, &script.name, &headers, &body)?; let mut req = build_exec_request(id, &script.name, &headers, &body, script.app_id, principal)?;
req.sandbox_overrides = script.sandbox; req.sandbox_overrides = script.sandbox;
let request_id = req.request_id; let request_id = req.request_id;
let request_path = req.path.clone(); let request_path = req.path.clone();
@@ -133,6 +144,7 @@ where
async fn user_route_handler<E, R>( async fn user_route_handler<E, R>(
State(state): State<DataPlaneState<E, R>>, State(state): State<DataPlaneState<E, R>>,
Extension(principal): Extension<Option<Principal>>,
request: Request, request: Request,
) -> Result<Response, ApiError> ) -> Result<Response, ApiError>
where where
@@ -195,6 +207,8 @@ where
&script.name, &script.name,
&headers, &headers,
&body_bytes, &body_bytes,
app_id,
principal,
)?; )?;
req.path = path; req.path = path;
req.params = matched.params; req.params = matched.params;
@@ -264,6 +278,8 @@ fn build_exec_request(
name: &str, name: &str,
headers: &HeaderMap, headers: &HeaderMap,
body: &Bytes, body: &Bytes,
app_id: AppId,
principal: Option<Principal>,
) -> Result<ExecRequest, ApiError> { ) -> Result<ExecRequest, ApiError> {
let mut hmap = BTreeMap::new(); let mut hmap = BTreeMap::new();
for (k, v) in headers { for (k, v) in headers {
@@ -279,8 +295,9 @@ fn build_exec_request(
.map_err(|e| ApiError::BadRequest(format!("invalid JSON body: {e}")))? .map_err(|e| ApiError::BadRequest(format!("invalid JSON body: {e}")))?
}; };
let execution_id = ExecutionId::new();
Ok(ExecRequest { Ok(ExecRequest {
execution_id: ExecutionId::new(), execution_id,
request_id: RequestId::new(), request_id: RequestId::new(),
script_id: id, script_id: id,
script_name: name.to_string(), script_name: name.to_string(),
@@ -293,6 +310,13 @@ fn build_exec_request(
rest: String::new(), rest: String::new(),
// Overwritten by the handler after the script is resolved. // Overwritten by the handler after the script is resolved.
sandbox_overrides: picloud_shared::ScriptSandbox::default(), sandbox_overrides: picloud_shared::ScriptSandbox::default(),
app_id,
principal,
// Direct invocations are at depth 0 with a self-referential
// root. The triggers framework (v1.1.1) increments depth and
// preserves the original root for chained executions.
trigger_depth: 0,
root_execution_id: execution_id,
}) })
} }
@@ -396,7 +420,22 @@ pub enum ApiError {
impl IntoResponse for ApiError { impl IntoResponse for ApiError {
fn into_response(self) -> Response { fn into_response(self) -> Response {
// Overloaded is the only variant that needs to attach an HTTP
// header (Retry-After), so it short-circuits the (status, body)
// reduction below. Axum's tuple builder makes per-arm header
// injection awkward otherwise.
use ApiError as E; use ApiError as E;
if let E::Exec(ExecError::Overloaded { retry_after_secs }) = &self {
let retry = retry_after_secs.to_string();
let body = Json(serde_json::json!({ "error": self.to_string() }));
return (
StatusCode::SERVICE_UNAVAILABLE,
[(axum::http::header::RETRY_AFTER, retry)],
body,
)
.into_response();
}
let (status, message) = match &self { let (status, message) = match &self {
E::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()), E::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
E::BadRequest(_) => (StatusCode::BAD_REQUEST, self.to_string()), E::BadRequest(_) => (StatusCode::BAD_REQUEST, self.to_string()),
@@ -416,6 +455,7 @@ impl IntoResponse for ApiError {
(StatusCode::INSUFFICIENT_STORAGE, e.to_string()) (StatusCode::INSUFFICIENT_STORAGE, e.to_string())
} }
ExecError::Runtime(_) => (StatusCode::BAD_GATEWAY, e.to_string()), ExecError::Runtime(_) => (StatusCode::BAD_GATEWAY, e.to_string()),
ExecError::Overloaded { .. } => unreachable!("handled above"),
}, },
}; };
(status, Json(serde_json::json!({ "error": message }))).into_response() (status, Json(serde_json::json!({ "error": message }))).into_response()

View File

@@ -4,6 +4,8 @@ use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use picloud_executor_core::{Engine, ExecError, ExecRequest, ExecResponse}; use picloud_executor_core::{Engine, ExecError, ExecRequest, ExecResponse};
use crate::gate::{AcquireError, ExecutionGate};
/// Maximum wall-clock time we'll wait for a single invocation, regardless /// Maximum wall-clock time we'll wait for a single invocation, regardless
/// of the per-script `timeout_seconds`. Provides a hard ceiling on /// of the per-script `timeout_seconds`. Provides a hard ceiling on
/// resource usage independent of misconfigured scripts. /// resource usage independent of misconfigured scripts.
@@ -30,14 +32,19 @@ pub trait ExecutorClient: Send + Sync {
/// `executor-core::Engine::execute` is synchronous; we offload it to a /// `executor-core::Engine::execute` is synchronous; we offload it to a
/// blocking thread so it doesn't park a Tokio worker, and apply the /// blocking thread so it doesn't park a Tokio worker, and apply the
/// wall-clock timeout here. /// wall-clock timeout here.
///
/// Holds an `ExecutionGate` and acquires a permit before `spawn_blocking`
/// so a script storm can't drain the blocking-thread pool. The permit
/// drops with the future, returning the slot.
pub struct LocalExecutorClient { pub struct LocalExecutorClient {
engine: Arc<Engine>, engine: Arc<Engine>,
gate: Arc<ExecutionGate>,
} }
impl LocalExecutorClient { impl LocalExecutorClient {
#[must_use] #[must_use]
pub fn new(engine: Arc<Engine>) -> Self { pub fn new(engine: Arc<Engine>, gate: Arc<ExecutionGate>) -> Self {
Self { engine } Self { engine, gate }
} }
} }
@@ -49,6 +56,24 @@ impl ExecutorClient for LocalExecutorClient {
req: ExecRequest, req: ExecRequest,
timeout: Duration, timeout: Duration,
) -> Result<ExecResponse, ExecError> { ) -> Result<ExecResponse, ExecError> {
// Acquire before spending any wall-clock budget. The permit is
// held by this future; on `tokio::time::timeout` firing, the
// future drops and the permit returns to the pool — but the
// detached `spawn_blocking` thread keeps running until the
// Rhai script finishes (or panics). So in-use blocking threads
// can briefly exceed the gate's permit count after a timeout.
// That is intentional: a new admission can be served while the
// already-doomed script winds down, which is preferable to
// wedging the slot for the worst-case timeout duration.
let _permit =
self.gate
.try_acquire()
.map_err(
|AcquireError::Overloaded { retry_after_secs }| ExecError::Overloaded {
retry_after_secs,
},
)?;
let timeout = timeout.min(HARD_TIMEOUT_CAP); let timeout = timeout.min(HARD_TIMEOUT_CAP);
let timeout_secs = u32::try_from(timeout.as_secs()).unwrap_or(u32::MAX); let timeout_secs = u32::try_from(timeout.as_secs()).unwrap_or(u32::MAX);

View File

@@ -0,0 +1,155 @@
//! Global concurrency gate for the data plane.
//!
//! Wraps a single `tokio::sync::Semaphore` so the executor can refuse
//! admission immediately when too many invocations are already in
//! flight. Designed for v1.1.0's single-node MVP — one cap across all
//! apps and scripts. Per-app or per-script caps come later when a real
//! workload surfaces the need.
//!
//! Policy: **non-blocking, no queue**. If a permit isn't free right
//! now, the call returns `AcquireError::Overloaded` and the data-plane
//! HTTP layer translates that to a 503 with `Retry-After: 1`. Pushing
//! back hard beats letting requests pile up against a finite pool of
//! blocking threads (executor work runs under `spawn_blocking`).
//!
//! Configured via the `PICLOUD_MAX_CONCURRENT_EXECUTIONS` env var.
//! Default is 32 — comfortable for a single-node Pi, low enough that
//! a script storm doesn't park every blocking thread.
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
/// Env var consulted by `from_env`.
pub const ENV_MAX_CONCURRENT: &str = "PICLOUD_MAX_CONCURRENT_EXECUTIONS";
/// Default cap when the env var is unset or invalid.
pub const DEFAULT_MAX_CONCURRENT: u32 = 32;
/// `Retry-After` header value (seconds) returned alongside the 503
/// when the gate refuses. Fixed for v1.1.0; later versions may compute
/// a smarter value from in-flight latency.
pub const DEFAULT_RETRY_AFTER_SECS: u32 = 1;
/// Refused admission. The HTTP layer translates this to 503 with a
/// `Retry-After` header.
#[derive(Debug, Error)]
pub enum AcquireError {
#[error("at capacity (retry after {retry_after_secs}s)")]
Overloaded { retry_after_secs: u32 },
}
/// Global execution gate. Constructed once at orchestrator startup and
/// shared via `Arc`. Holds an inner `Arc<Semaphore>` so permits are
/// owned (they release on drop independent of the gate's lifetime).
pub struct ExecutionGate {
permits: Arc<Semaphore>,
max_permits: u32,
}
impl ExecutionGate {
/// Construct with an explicit cap. Mostly for tests; production
/// uses `from_env`.
#[must_use]
pub fn new(max_permits: u32) -> Self {
Self {
permits: Arc::new(Semaphore::new(max_permits as usize)),
max_permits,
}
}
/// Read `PICLOUD_MAX_CONCURRENT_EXECUTIONS` from the environment.
/// Falls back to `DEFAULT_MAX_CONCURRENT` on absence; warns and
/// falls back on parse failure or non-positive value. Mirrors the
/// `SandboxCeiling::from_env` ergonomics so operators see a
/// consistent shape across the env-tunables.
#[must_use]
pub fn from_env() -> Self {
let max = match std::env::var(ENV_MAX_CONCURRENT) {
Err(_) => DEFAULT_MAX_CONCURRENT,
Ok(v) => match v.parse::<u32>() {
Ok(n) if n > 0 => n,
Ok(_) => {
tracing::warn!(
env = ENV_MAX_CONCURRENT,
value = %v,
"value must be > 0; using default {DEFAULT_MAX_CONCURRENT}"
);
DEFAULT_MAX_CONCURRENT
}
Err(e) => {
tracing::warn!(
env = ENV_MAX_CONCURRENT,
value = %v,
error = %e,
"invalid value; using default {DEFAULT_MAX_CONCURRENT}"
);
DEFAULT_MAX_CONCURRENT
}
},
};
Self::new(max)
}
/// Maximum concurrent permits this gate was configured for. Useful
/// for diagnostics / future metrics.
#[must_use]
pub fn max_permits(&self) -> u32 {
self.max_permits
}
/// Non-blocking permit acquisition. Returns the owned permit on
/// success (drop releases the slot) or `AcquireError::Overloaded`
/// when saturated. Sync because the semaphore's non-blocking try is
/// sync — no runtime hop needed.
pub fn try_acquire(&self) -> Result<OwnedSemaphorePermit, AcquireError> {
self.permits
.clone()
.try_acquire_owned()
.map_err(|err| match err {
TryAcquireError::NoPermits | TryAcquireError::Closed => AcquireError::Overloaded {
retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
},
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn acquire_succeeds_under_capacity() {
let gate = ExecutionGate::new(2);
let _p1 = gate.try_acquire().expect("first permit available");
let _p2 = gate.try_acquire().expect("second permit available");
}
#[test]
fn acquire_overloaded_when_saturated() {
let gate = ExecutionGate::new(1);
let _p = gate.try_acquire().expect("first permit available");
let AcquireError::Overloaded { retry_after_secs } = gate
.try_acquire()
.expect_err("second permit must be refused");
assert!(retry_after_secs > 0, "retry-after must be positive");
}
#[test]
fn permit_drop_releases_slot() {
let gate = ExecutionGate::new(1);
{
let _p = gate.try_acquire().expect("first permit available");
}
let _ = gate
.try_acquire()
.expect("slot must be returned after permit drops");
}
#[test]
fn max_permits_exposed() {
let gate = ExecutionGate::new(7);
assert_eq!(gate.max_permits(), 7);
}
}

View File

@@ -10,9 +10,11 @@
pub mod api; pub mod api;
pub mod client; pub mod client;
pub mod gate;
pub mod resolver; pub mod resolver;
pub mod routing; pub mod routing;
pub use api::{data_plane_router, user_routes_router, DataPlaneState}; pub use api::{data_plane_router, user_routes_router, DataPlaneState};
pub use client::{ExecutorClient, LocalExecutorClient, RemoteExecutorClient}; pub use client::{ExecutorClient, LocalExecutorClient, RemoteExecutorClient};
pub use gate::{AcquireError, ExecutionGate};
pub use resolver::{ResolverError, ScriptResolver}; pub use resolver::{ResolverError, ScriptResolver};

View File

@@ -11,21 +11,22 @@ use axum::{routing::get, Json, Router};
use picloud_executor_core::{Engine, Limits}; use picloud_executor_core::{Engine, Limits};
use picloud_manager_core::{ use picloud_manager_core::{
admin_router, admins_router, api_keys_router, app_members_router, apps_api, apps_router, admin_router, admins_router, api_keys_router, app_members_router, apps_api, apps_router,
auth_router, compile_routes, migrations, require_authenticated, route_admin_router, attach_principal_if_present, auth_router, compile_routes, migrations, require_authenticated,
AdminSessionRepository, AdminState, AdminUserRepository, AdminsState, ApiKeyRepository, route_admin_router, AdminSessionRepository, AdminState, AdminUserRepository, AdminsState,
ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState, AppRepository, ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState,
AppsState, AuthState, AuthzRepo, PostgresAdminSessionRepository, PostgresAdminUserRepository, AppRepository, AppsState, AuthState, AuthzRepo, PostgresAdminSessionRepository,
PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository, PostgresAdminUserRepository, PostgresApiKeyRepository, PostgresAppDomainRepository,
PostgresAppRepository, PostgresExecutionLogRepository, PostgresExecutionLogSink, PostgresAppMembersRepository, PostgresAppRepository, PostgresExecutionLogRepository,
PostgresRouteRepository, PostgresScriptRepository, RepoResolver, RouteAdminState, PostgresExecutionLogSink, PostgresRouteRepository, PostgresScriptRepository, RepoResolver,
RouteRepository, SandboxCeiling, RouteAdminState, RouteRepository, SandboxCeiling,
}; };
use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable}; use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable};
use picloud_orchestrator_core::{ use picloud_orchestrator_core::{
data_plane_router, user_routes_router, DataPlaneState, LocalExecutorClient, data_plane_router, user_routes_router, DataPlaneState, ExecutionGate, LocalExecutorClient,
}; };
use picloud_shared::{ use picloud_shared::{
ExecutionLogSink, ScriptValidator, API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION, ExecutionLogSink, ScriptValidator, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION,
WIRE_VERSION,
}; };
use sqlx::postgres::PgPoolOptions; use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool; use sqlx::PgPool;
@@ -82,7 +83,9 @@ fn read_session_ttl() -> Duration {
/// `/version`) stays open — it's the public ingress for user scripts. /// `/version`) stays open — it's the public ingress for user scripts.
#[allow(clippy::too_many_lines)] #[allow(clippy::too_many_lines)]
pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> { pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
let engine = Arc::new(Engine::new(Limits::default())); // `Services` is the SDK service bundle. Empty in v1.1.0; the
// v1.1.1 KV PR will populate it with `kv: Arc::new(...)` here.
let engine = Arc::new(Engine::new(Limits::default(), Services::new()));
let script_repo = Arc::new(PostgresScriptRepository::new(pool.clone())); let script_repo = Arc::new(PostgresScriptRepository::new(pool.clone()));
let log_repo = Arc::new(PostgresExecutionLogRepository::new(pool.clone())); let log_repo = Arc::new(PostgresExecutionLogRepository::new(pool.clone()));
@@ -126,7 +129,10 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
let resolver = Arc::new(RepoResolver::new(PostgresScriptRepoHandle( let resolver = Arc::new(RepoResolver::new(PostgresScriptRepoHandle(
script_repo.clone(), script_repo.clone(),
))); )));
let executor = Arc::new(LocalExecutorClient::new(engine.clone())); // Single global gate — overflow is rejected with 503 + Retry-After.
// See `ExecutionGate` docs and `PICLOUD_MAX_CONCURRENT_EXECUTIONS`.
let gate = Arc::new(ExecutionGate::from_env());
let executor = Arc::new(LocalExecutorClient::new(engine.clone(), gate));
let admin = AdminState { let admin = AdminState {
repo: Arc::new(PostgresScriptRepoHandle(script_repo.clone())), repo: Arc::new(PostgresScriptRepoHandle(script_repo.clone())),
@@ -200,16 +206,31 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
// facade above; the bare module path is retained so it's discoverable. // facade above; the bare module path is retained so it's discoverable.
let _ = apps_api::AppsState::clone; let _ = apps_api::AppsState::clone;
// Opportunistic principal extraction on every data-plane request.
// Always inserts `Extension<Option<Principal>>`: Some for authed
// ingress (bearer / cookie), None otherwise. Handlers depend on
// this layer being applied — scoped to the data-plane routers so
// the admin path (which uses `require_authenticated`) doesn't
// double-resolve the same token.
let data_plane_routed = data_plane_router(data_plane.clone()).layer(from_fn_with_state(
auth_state.clone(),
attach_principal_if_present,
));
let user_routes = user_routes_router(data_plane).layer(from_fn_with_state(
auth_state.clone(),
attach_principal_if_present,
));
let api_v1 = Router::new() let api_v1 = Router::new()
.nest("/admin", auth_router(auth_state)) .nest("/admin", auth_router(auth_state))
.nest("/admin", guarded_admin) .nest("/admin", guarded_admin)
.merge(data_plane_router(data_plane.clone())); .merge(data_plane_routed);
Ok(Router::new() Ok(Router::new()
.route("/healthz", get(healthz)) .route("/healthz", get(healthz))
.route("/version", get(version)) .route("/version", get(version))
.nest(&format!("/api/v{API_VERSION}"), api_v1) .nest(&format!("/api/v{API_VERSION}"), api_v1)
.merge(user_routes_router(data_plane)) .merge(user_routes)
.layer(TraceLayer::new_for_http())) .layer(TraceLayer::new_for_http()))
} }

119
crates/shared/src/events.rs Normal file
View File

@@ -0,0 +1,119 @@
//! `ServiceEventEmitter` — the contract every stateful SDK service uses
//! to publish events into the (future) triggers framework.
//!
//! v1.1.0 ships only the trait shape and a `NoopEventEmitter` that
//! drops every event. The real outbox-backed implementation lands with
//! the triggers PR in v1.1.1; locking the trait now means services
//! written in subsequent v1.1.x PRs (KV, docs, files, …) don't have to
//! re-thread their plumbing when the dispatcher arrives.
//!
//! Design rationale (full discussion: `docs/sdk-shape.md`):
//! * Async — outbox writes hit Postgres.
//! * Cx is passed in so the emitter can attribute the event to the
//! `app_id` / `principal` / `execution_id` that produced it.
//! * Events carry their semantic identity (`source` + `op`) plus
//! optional locator (`collection` + `key`) and optional payloads
//! (`payload` for the new value, `old_payload` for the previous on
//! updates). The dispatcher matches on (source, op, collection)
//! filters to decide which scripts to fan out to.
use async_trait::async_trait;
use thiserror::Error;
use crate::SdkCallCx;
/// Trait every stateful service depends on to emit events. The host
/// binary constructs one instance and clones the Arc into each service.
#[async_trait]
pub trait ServiceEventEmitter: Send + Sync {
/// Publish a single event. Implementations are expected to be
/// fire-and-forget from the caller's perspective: the outbox impl
/// will return `Ok(())` once the event is durably persisted, the
/// dispatcher reads it out-of-band.
async fn emit(&self, cx: &SdkCallCx, event: ServiceEvent) -> Result<(), EmitError>;
}
/// One service event. `source` and `op` are `&'static str` because they
/// come from a fixed enumeration baked into each service (`"kv"` +
/// `"insert"`/`"update"`/`"delete"`, etc.) — never from user data.
/// `collection`/`key`/payloads come from user data and are owned.
#[derive(Debug, Clone)]
pub struct ServiceEvent {
/// Service namespace. Matches the Rhai module name: `"kv"`,
/// `"docs"`, `"files"`, etc.
pub source: &'static str,
/// Operation verb. Each service defines its own vocabulary;
/// dispatcher filters match on the literal string.
pub op: &'static str,
/// Affected collection, when the service is collection-scoped
/// (`kv`, `docs`, `files`). `None` for collection-less events.
pub collection: Option<String>,
/// Affected key/id within the collection, when applicable.
pub key: Option<String>,
/// New value after the operation, when carrying it is cheap and
/// useful. `None` for deletes.
pub payload: Option<serde_json::Value>,
/// Previous value before the operation, populated on `update` /
/// `delete` so triggers can diff. `None` on `insert`.
pub old_payload: Option<serde_json::Value>,
}
/// Errors an emitter can surface upward. The noop impl never returns
/// these; the v1.1.1 outbox impl uses `Unavailable` for pool/connection
/// failures and `Rejected` for malformed payloads (e.g. event JSON too
/// large for the outbox row).
#[derive(Debug, Error)]
pub enum EmitError {
#[error("event sink unavailable: {0}")]
Unavailable(String),
#[error("event sink rejected event: {0}")]
Rejected(String),
}
/// Default emitter for v1.1.0. Accepts every event, persists nothing,
/// always returns `Ok(())`. Wired in the picloud binary; the v1.1.1
/// triggers PR swaps this for a Postgres outbox writer.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopEventEmitter;
#[async_trait]
impl ServiceEventEmitter for NoopEventEmitter {
async fn emit(&self, _cx: &SdkCallCx, _event: ServiceEvent) -> Result<(), EmitError> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
// Compile-time check that ServiceEventEmitter is dyn-safe — every
// service holds it as `Arc<dyn ServiceEventEmitter>` and would
// silently break the workspace if a non-object-safe method snuck
// in. Behavioural tests for the noop impl come for free once a
// service exercises it (v1.1.1+); avoid pulling tokio into
// `picloud-shared` just for a one-line `emit().await` check.
#[allow(dead_code)]
fn assert_dyn_compatible(_e: &dyn ServiceEventEmitter) {}
#[test]
fn service_event_construction_is_explicit() {
// Pin the field layout so a re-ordering in a future PR causes a
// compile failure here rather than silently misattributing
// events. Hash-derive isn't appropriate (serde_json::Value isn't
// Hash), so structural construction is the assertion.
let _ = ServiceEvent {
source: "kv",
op: "insert",
collection: Some("widgets".into()),
key: Some("k1".into()),
payload: Some(serde_json::json!({"v": 1})),
old_payload: None,
};
}
}

View File

@@ -7,23 +7,29 @@
pub mod app; pub mod app;
pub mod auth; pub mod auth;
pub mod error; pub mod error;
pub mod events;
pub mod execution_log; pub mod execution_log;
pub mod ids; pub mod ids;
pub mod log_sink; pub mod log_sink;
pub mod route; pub mod route;
pub mod sandbox; pub mod sandbox;
pub mod script; pub mod script;
pub mod sdk_cx;
pub mod services;
pub mod validator; pub mod validator;
pub mod version; pub mod version;
pub use app::{App, AppDomain, DomainShape}; pub use app::{App, AppDomain, DomainShape};
pub use auth::{AppRole, InstanceRole, Principal, Scope, UserId}; pub use auth::{AppRole, InstanceRole, Principal, Scope, UserId};
pub use error::Error; pub use error::Error;
pub use events::{EmitError, NoopEventEmitter, ServiceEvent, ServiceEventEmitter};
pub use execution_log::{ExecutionLog, ExecutionStatus}; pub use execution_log::{ExecutionLog, ExecutionStatus};
pub use ids::{AdminUserId, ApiKeyId, AppId, ExecutionId, RequestId, ScriptId}; pub use ids::{AdminUserId, ApiKeyId, AppId, ExecutionId, RequestId, ScriptId};
pub use log_sink::{ExecutionLogSink, LogSinkError}; pub use log_sink::{ExecutionLogSink, LogSinkError};
pub use route::{HostKind, PathKind, Route}; pub use route::{HostKind, PathKind, Route};
pub use sandbox::ScriptSandbox; pub use sandbox::ScriptSandbox;
pub use script::Script; pub use script::Script;
pub use sdk_cx::SdkCallCx;
pub use services::Services;
pub use validator::{ScriptValidator, ValidationError}; pub use validator::{ScriptValidator, ValidationError};
pub use version::{API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION}; pub use version::{API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION};

View File

@@ -0,0 +1,54 @@
//! `SdkCallCx` — per-call context every stateful SDK service receives.
//!
//! Service trait methods (added by subsequent v1.1.x PRs starting with
//! KV) all take `&SdkCallCx` so they can:
//! * scope by `app_id` for cross-app isolation,
//! * audit `principal` when authenticated,
//! * carry `execution_id` / `request_id` into emitted events,
//! * bound trigger chains via `trigger_depth` / `root_execution_id`.
//!
//! The struct lives in `picloud-shared` (not `executor-core`) because
//! future service impls live in `manager-core` and the trait that hands
//! the cx in is shared by both sides. Pure value type — no handles, no
//! DB pool references, no allocations beyond what's in `Principal`.
use crate::{AppId, ExecutionId, Principal, RequestId};
/// Per-invocation context for every stateful SDK service call.
///
/// Constructed once at the start of an invocation by `executor-core`
/// from the incoming `ExecRequest`, then handed (by reference) to every
/// service trait method the script triggers during execution. Services
/// MUST derive `app_id` from this struct — never from script-passed
/// arguments — to preserve cross-app isolation.
#[derive(Debug, Clone)]
pub struct SdkCallCx {
/// Owning application for this invocation. Source of truth for
/// every `(app_id, …)` storage lookup the script makes.
pub app_id: AppId,
/// Caller identity, when authenticated. `None` for unauthenticated
/// data-plane HTTP requests (the common case for public endpoints);
/// `Some` when the call came in via the dashboard, an API key, or a
/// future authed surface.
pub principal: Option<Principal>,
/// Unique id for THIS execution. Matches `ExecRequest.execution_id`.
pub execution_id: ExecutionId,
/// Unique id for the ingress request that started the chain. The
/// same `request_id` is shared across every execution triggered by
/// the same request (direct + trigger fan-out).
pub request_id: RequestId,
/// `0` for direct invocations (HTTP request, manual run). Each
/// indirect invocation through the triggers framework (v1.1.1)
/// increments this; the dispatcher rejects beyond a configured
/// ceiling to prevent runaway feedback loops.
pub trigger_depth: u32,
/// `== execution_id` when `trigger_depth == 0`; otherwise the
/// `execution_id` of the original ingress execution. Lets the audit
/// log group every fan-out execution under the originating event.
pub root_execution_id: ExecutionId,
}

View File

@@ -0,0 +1,38 @@
//! `Services` — bundle of stateful SDK service handles plumbed from the
//! host binary into every Rhai execution.
//!
//! v1.1.0 ships this struct empty. Subsequent PRs in the v1.1.x series
//! add one field per service:
//!
//! ```ignore
//! pub kv: Arc<dyn KvService>, // v1.1.1
//! pub docs: Arc<dyn DocsService>, // v1.1.2
//! pub http: Arc<dyn HttpService>, // v1.1.4
//! // …
//! ```
//!
//! The bundle is cheap to clone (`Arc` per service) and is constructed
//! once at startup in the picloud binary. The executor takes it by
//! reference per invocation, hands it (alongside an `SdkCallCx`) to
//! `executor-core::sdk::register_all`, which wires the corresponding
//! Rhai `::` namespace per service.
//!
//! `#[non_exhaustive]` so adding fields is a non-breaking change for
//! consumers that only *pattern-match* a `&Services`; only crates that
//! *construct* a `Services` (in practice, just the picloud binary) need
//! to update their constructor when new services land.
/// SDK service bundle. See module docs for the lifecycle and the v1.1.x
/// expansion plan.
#[non_exhaustive]
#[derive(Default)]
pub struct Services {}
impl Services {
/// Construct an empty bundle. Replaced by a fielded `::new(...)`
/// once the first service (KV, v1.1.1) lands.
#[must_use]
pub fn new() -> Self {
Self {}
}
}

227
docs/sdk-shape.md Normal file
View File

@@ -0,0 +1,227 @@
# SDK shape (v1.1.x stateful services)
This document describes the architectural shape every v1.1.x SDK
service follows. It is **not** a feature reference for any particular
service — those live in their own docs as each PR lands (KV in v1.1.1,
docs in v1.1.2, …). What follows is the contract those PRs implement
against, so the surface stays consistent and the build doesn't drift.
The shape was laid down in v1.1.0 (the SDK foundation PR). If you find
yourself re-litigating any of it inside a service PR, push back and
update this doc explicitly first.
## Two kinds of Rhai modules
**Stateless utility modules** (regex, time, json, base64, hex, url —
landing as v1.1.0's stdlib PR) are registered once at engine build.
They have no per-call state and no cross-app sensitivity. Implementation
goes in `executor-core::engine::build_engine` next to the existing
`log::` registration. They use Rhai's `register_static_module`.
**Stateful service modules** (kv, docs, http, cron, files, pubsub,
secrets, email, users, queue, invoke) are registered **per call** by
`executor-core::sdk::register_all`. They need:
- A service handle bundled in `picloud_shared::Services` (constructed
once at startup, cloned cheaply per call).
- A per-call `SdkCallCx` carrying the calling app, principal,
execution ids, and trigger depth.
- Closures that capture both, registered as Rhai native functions
inside a per-call `rhai::Module`.
Mixing the two categories in one module is wrong — services that
internally consult per-call context are stateful, period.
## `::` namespace style
Every SDK module exposes itself under a `::` namespace, mirroring the
existing `log::`:
```rhai
log::info("hello"); // v1.0 — present
let value = kv::collection("widgets").get("k"); // v1.1.1
let resp = http::get("https://example.com"); // v1.1.4
```
Dotted-object syntax (`kv.get("widgets", "k")`) is **not** used.
Rationale: `::` is consistent with Rust import syntax, doesn't
require a wrapper "module object" in Rhai's scope, and keeps the
module boundary obvious in scripts.
## Handle pattern for collection-scoped services
Services that operate on collections expose a **collection handle**
returned by an `::collection(name)` constructor:
```rhai
let widgets = kv::collection("widgets");
widgets.set("k", "v");
let v = widgets.get("k");
```
Not `kv::set("widgets", "k", "v")`. The handle is a Rhai custom type
the service registers; method calls bind to that type. This:
- Removes the "did I get the collection-name argument right?" foot-gun.
- Lets the implementation cache per-collection state on the handle
(prepared statements, connection affinity) without leaking that
into the call signature.
- Pre-empts the "collection is implicit" failure mode where two
services in the same script accidentally share a default collection.
`(app_id, collection, key)` is the identity tuple for KV; `(app_id,
collection, id)` for docs. Collections are **mandatory**, not optional
— even single-collection apps name their collection. The service layer
rejects requests with empty collection names.
## Error convention
- **Throw on failure.** `widgets.set("k", "v")` throws a Rhai runtime
error on any operational problem (DB unavailable, payload too large,
authz denied). Scripts opting into error handling use Rhai's
`try/catch`.
- **`()` for absent.** `widgets.get("missing")` returns `()` (Rhai
unit). Scripts test absence with `if v == () { ... }` or use the
matching `has(k)` predicate.
- **`bool` for predicates.** `widgets.has(k)` is the cheap existence
check that doesn't deserialize the value.
This convention is uniform across every v1.1.x service. Adding
`Result`-flavoured variants is a design departure that requires a doc
update before implementation.
## `SdkCallCx` and cross-app isolation
Every stateful service trait method takes `&SdkCallCx` as its first
non-self argument. The cx carries:
```rust
pub struct SdkCallCx {
pub app_id: AppId,
pub principal: Option<Principal>,
pub execution_id: ExecutionId,
pub request_id: RequestId,
pub trigger_depth: u32,
pub root_execution_id: ExecutionId,
}
```
**The service implementation MUST derive `app_id` from `cx.app_id`
never from a script-passed argument.** Scripts cannot name another
app's data, period. The closure registered into Rhai captures the
`Arc<SdkCallCx>` for the call; the script never sees or passes
`app_id`.
Why this matters: a `kv::set("widgets", "k", v)` call with a
script-supplied `app_id` would be a tenant-isolation vulnerability if
that arg ever leaked into the storage query. By deriving from the
host-attached cx, the service can't be tricked.
`principal` is `Option<Principal>` because the data plane is
unauthenticated by default — public HTTP scripts run with `None`.
Services that need an authenticated identity (e.g., `users::*`) check
`cx.principal.is_some()` and throw if missing.
## Sync ↔ async bridge
Rhai is synchronous; service trait methods (KV writes, HTTP calls) are
async. The bridge runs *inside the `spawn_blocking` thread* that
already wraps `Engine::execute` (orchestrator-core's
`LocalExecutorClient`):
```rust
// Inside a Rhai-registered closure.
let runtime = tokio::runtime::Handle::current();
let result = runtime.block_on(service.do_thing(&cx, args));
```
`Handle::current()` finds the same Tokio runtime that scheduled the
`spawn_blocking`, so the `block_on` doesn't construct a fresh runtime.
The thread is already off the async worker pool (that's what
`spawn_blocking` does), so blocking inside it is safe.
This pattern goes in every stateful service's registered Rhai closure.
The first service PR (KV, v1.1.1) lands a helper so subsequent services
don't reinvent the boilerplate.
## `ServiceEventEmitter`
Every stateful service that mutates data also emits events for the
(future) triggers framework:
```rust
emitter.emit(&cx, ServiceEvent {
source: "kv",
op: "insert",
collection: Some("widgets".into()),
key: Some("k".into()),
payload: Some(new_value_json),
old_payload: None,
}).await?;
```
v1.1.0 ships only `NoopEventEmitter`. The v1.1.1 triggers PR replaces
that with an outbox-backed implementation: events land in a Postgres
outbox table; a dispatcher worker reads them out-of-band, matches
against registered triggers, and fans out script executions. The
dispatcher enforces a depth limit via `cx.trigger_depth` so a
trigger-fires-its-own-trigger chain can't run away.
Services hold `Arc<dyn ServiceEventEmitter>` and emit unconditionally;
the noop drops events, the real impl persists them. From the service's
perspective the emission is fire-and-forget.
## `ExecutionGate` and `PICLOUD_MAX_CONCURRENT_EXECUTIONS`
A single global semaphore caps concurrent script executions. Default
is 32; override via the `PICLOUD_MAX_CONCURRENT_EXECUTIONS` env var.
Acquisition is **non-blocking, no queue** — if a permit isn't free,
the request is refused immediately with HTTP 503 and a `Retry-After:
1` header.
Rationale: Rhai execution runs under `spawn_blocking`, which uses a
finite pool of blocking threads (defaults to 512 in current Tokio).
Without a cap, a script storm parks every blocking thread and starves
every other workload (DB writes, log sinks, audit emission). Hard
pushback is preferable to silent degradation.
Per-app or per-script caps are deferred until a real workload demands
them. The gate lives in `orchestrator-core::gate::ExecutionGate` and
is constructed once in the picloud binary's `build_app`.
## Registration: where future services hook in
```rust
// orchestrator-core / executor-core internal call path —
// you do not implement this; you implement registration helpers
// that future PRs call from here.
pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCallCx>) {
// v1.1.1: register_kv(engine, services, cx.clone());
// v1.1.2: register_docs(engine, services, cx.clone());
// …
}
```
Each service PR adds:
1. A `Service` trait + impl in `manager-core` (since that's where the
DB-backed implementations live).
2. A field on `picloud_shared::Services` (`pub kv: Arc<dyn KvService>`).
3. A `register_kv` helper inside `executor-core::sdk::kv` that takes
the engine, the service, and the cx, then registers the Rhai
`::collection(...)` constructor and method bindings.
4. A new `Capability` variant in `manager-core::authz` (e.g.
`AppKvRead(AppId)`) and a check inside the service impl.
That sequence is the entire mechanical pattern; nothing here should
require architecture-level discussion past v1.1.0.
## What this doc does NOT cover
- Service-specific schemas (KV table layout, docs query DSL, etc.) —
in each service PR.
- Authentication and the admin auth model — see blueprint §11.5,
§11.6 and Phase 3.5.
- The trigger dispatch design (outbox row layout, fan-out semantics,
trigger CRUD endpoints) — comes with v1.1.1.
- Cluster mode considerations — deferred to v1.3+.

View File

@@ -661,7 +661,7 @@ users.set_permissions(user_id, {
|-------|-----------|-----------| |-------|-----------|-----------|
| **Orchestrator** | Rust + Axum | Performance, safety, async-first; minimal overhead | | **Orchestrator** | Rust + Axum | Performance, safety, async-first; minimal overhead |
| **Dashboard** | Alpine.js + vanilla HTML/CSS | Zero dependencies, simple to deploy, fast enough for MVP | | **Dashboard** | Alpine.js + vanilla HTML/CSS | Zero dependencies, simple to deploy, fast enough for MVP |
| **Database** | PostgreSQL + hstore | Robust ACID database; hstore extension for lightweight KV (v1.1) | | **Database** | PostgreSQL 15+ (`pgcrypto`) | Robust ACID database; JSONB carries data-plane values (v1.1+). See §8.1. |
| **Container Runtime** | Docker (Docker daemon) | Industry standard, simple CLI | | **Container Runtime** | Docker (Docker daemon) | Industry standard, simple CLI |
| **Executor Image** | Alpine Linux + Rhai | Minimal image size (~50-100MB), fast startup | | **Executor Image** | Alpine Linux + Rhai | Minimal image size (~50-100MB), fast startup |
| **Scripting** | Rhai | Lightweight, embedded-friendly, safe by default | | **Scripting** | Rhai | Lightweight, embedded-friendly, safe by default |
@@ -1022,9 +1022,9 @@ The scripts and routes endpoints keep their existing shape — this avoids forci
--- ---
## 11.6 Users, roles, and bearer-token auth (Phase 3.5) — Pending ## 11.6 Users, roles, and bearer-token auth (Phase 3.5) — ✓ Shipped
**Status**: pending. Targets `crates/manager-core/src/{authz,api_keys_api,api_key_repo}.rs`, an extended `auth_middleware.rs`, new shared types under `crates/shared/src/auth.rs`, migration `0006_users_authz.sql`. **Status**: shipped, ahead of the originally planned slot. Lives in `crates/manager-core/src/{authz,api_keys_api,api_key_repo}.rs`, the extended `auth_middleware.rs`, shared types under `crates/shared/src/auth.rs`, and migration `0006_users_authz.sql`. `can(principal, capability)` and `require(principal, capability)` are the single gate every admin handler goes through.
**Purpose**: bridge Phase 3b → Phase 4. Phase 4's v1.1 SDKs (KV, docs, HTTP, cron) each gate access on the calling principal. Without a real authorization model in place, every SDK addition has to either invent its own gate or stay open. Phase 3.5 lands `can(principal, capability)` as the single check every future SDK + admin endpoint goes through, so v1.1 work focuses on data plane shape, not on re-litigating auth. **Purpose**: bridge Phase 3b → Phase 4. Phase 4's v1.1 SDKs (KV, docs, HTTP, cron) each gate access on the calling principal. Without a real authorization model in place, every SDK addition has to either invent its own gate or stay open. Phase 3.5 lands `can(principal, capability)` as the single check every future SDK + admin endpoint goes through, so v1.1 work focuses on data plane shape, not on re-litigating auth.
@@ -1223,7 +1223,7 @@ Defer to follow-up sessions: dashboard surfaces for invites / key minting (curl
--- ---
### Phase 3: v1.0.x — Foundations (Current focus) ### Phase 3: v1.0.x — Foundations ✓ (Shipped)
Three foundation pieces that must land before the v1.1 service expansion, because retrofitting them later is expensive. Three foundation pieces that must land before the v1.1 service expansion, because retrofitting them later is expensive.
@@ -1231,24 +1231,27 @@ Three foundation pieces that must land before the v1.1 service expansion, becaus
**3b. Multi-app scoping** — ✓ shipped. See section 11.5. `apps`, `app_domains`, `app_slug_history` tables; `app_id` columns on `scripts`, `routes`, `execution_logs`. Migration assigns existing data to a `default` app and always claims `localhost`; a Rust-side bootstrap inserts a `Hello World` script + `/hello` route when the default app is empty. Orchestrator dispatch is two-phase (Host → app → route trie). `/api/v1/execute/{id}/*` continues to work without a public domain claim. Dashboard is app-hierarchical (`/admin/apps`, `/admin/apps/{slug}/...`); API stays flat with new endpoints under `/api/v1/admin/apps/*` and a `?app=` filter on script listing. Per-app admin roles deferred. **3b. Multi-app scoping** — ✓ shipped. See section 11.5. `apps`, `app_domains`, `app_slug_history` tables; `app_id` columns on `scripts`, `routes`, `execution_logs`. Migration assigns existing data to a `default` app and always claims `localhost`; a Rust-side bootstrap inserts a `Hello World` script + `/hello` route when the default app is empty. Orchestrator dispatch is two-phase (Host → app → route trie). `/api/v1/execute/{id}/*` continues to work without a public domain claim. Dashboard is app-hierarchical (`/admin/apps`, `/admin/apps/{slug}/...`); API stays flat with new endpoints under `/api/v1/admin/apps/*` and a `?app=` filter on script listing. Per-app admin roles deferred.
**3c. Users, roles, and bearer-token auth** — pending. See section 11.6. Adds `instance_role` to `admin_users` (`owner`/`admin`/`member`), `app_members` for per-app `app_admin`/`editor`/`viewer` grants, and `api_keys` for `Authorization: Bearer pic_…` credentials. Unifies cookie-session and API-key paths behind a single `can(principal, capability)` gate; list endpoints filter by membership at SQL for `member` users. Dashboard surfaces, invites, MFA, service accounts, and the `picloud` CLI binary are deferred — schema room only. **3c. Users, roles, and bearer-token auth (Phase 3.5)** — ✓ shipped. See section 11.6. Adds `instance_role` to `admin_users` (`owner`/`admin`/`member`), `app_members` for per-app `app_admin`/`editor`/`viewer` grants, and `api_keys` for `Authorization: Bearer pic_…` credentials. Unifies cookie-session and API-key paths behind a single `can(principal, capability)` gate; list endpoints filter by membership at SQL for `member` users. Dashboard surfaces, invites, MFA, service accounts, and the `picloud` CLI binary are deferred — schema room only.
**Why all three before v1.1**: every v1.1 service (KV, docs, users, etc.) needs both an `app_id` scoping key in its schema and a `Principal` to authorize against. Adding both now is one migration each on a small surface; adding them after the SDKs ship is many migrations on populated data plus a re-gate of every SDK call. **Why all three before v1.1**: every v1.1 service (KV, docs, users, etc.) needs both an `app_id` scoping key in its schema and a `Principal` to authorize against. Adding both now is one migration each on a small surface; adding them after the SDKs ship is many migrations on populated data plus a re-gate of every SDK call.
--- ---
### Phase 4: v1.1 (Expand Capabilities & Services) ### Phase 4: v1.1 (Expand Capabilities & Services) — Current focus
Ordered roughly by foundation value: each row enables the rows below it.
1. **Rhai SDK: KV Store** (`kv.get/set/delete/has` with collections, scoped per app) Released in patch steps (v1.1.0 → v1.1.8), each landing one focused capability. The split lets each release ship behind tests + docs without long-lived branches. SDK shape (handle pattern, `::` namespace, error convention, `ExecutionGate`, `SdkCallCx`, `ServiceEventEmitter` — see §7.5 and [docs/sdk-shape.md](../docs/sdk-shape.md)) is fixed in v1.1.0; every subsequent release fills in the contents without re-litigating the shape.
2. **Rhai SDK: Document Store** (`docs.create/find/update/delete/list/query`, scoped per app)
3. **Rhai SDK: HTTP** (`http.get/post/put/delete` with SSRF deny-list) | Version | Capability |
4. **Cron triggers** (manager scheduler skeleton already exists; needs schedules table + `FOR UPDATE SKIP LOCKED` dispatch) |---------|------------|
5. **Rhai SDK: Email** (`email.send` via SMTP; needs per-deploy config) | **v1.1.0** | **Foundation & Standard Library** — SDK shape (`Services` bundle, `SdkCallCx`, `ExecutionGate`, `ServiceEventEmitter` trait shape); stdlib utilities (regex, random, time, json, base64, hex, url). |
6. **Rhai SDK: User Management** (auth, CRUD, roles, permissions, invitations, password reset; depends on email for invites; scoped per app) | **v1.1.1** | **Storage & Events** — KV store keyed `(app_id, collection, key)`; triggers framework (outbox + dispatcher + trigger CRUD + `ctx.event` + depth limit); KV trigger kinds. |
7. **Queue triggers** (start with Postgres LISTEN/NOTIFY; RabbitMQ/Redis later if needed) | **v1.1.2** | **Documents**`docs::collection(name).create/find/update/delete/list` with `docs:*` triggers. |
8. **`invoke()` + `retry::*`** (function-to-function calls; execution_logs gain `parent_execution_id`) | **v1.1.3** | **Modules**`scripts.kind`, per-app resolver replaces `DummyModuleResolver`, AST cache + dep-graph invalidation. |
9. **Secrets management** (encrypted env vars, per app) | **v1.1.4** | **Outbound HTTP & Scheduled Tasks**`http::*` with SSRF deny-list; cron triggers. |
| **v1.1.5** | **Files & Messaging** — filesystem-backed blobs with `files:*` triggers; pub/sub via LISTEN/NOTIFY with `pubsub:*` triggers. |
| **v1.1.6** | **Configuration & Email** — encrypted per-app secrets; outbound `email::send` / `send_html` + inbound `email:receive` trigger. |
| **v1.1.7** | **User Management**`users::*` for in-script CRUD, auth, roles, invites, password reset. |
| **v1.1.8** | **Durable Queues & Function Composition**`queue::*` with `queue:receive` trigger; `invoke()` + `retry::*` (closures-as-args, re-entrant Rhai). |
--- ---
@@ -1309,59 +1312,71 @@ Ordered roughly by foundation value: each row enables the rows below it.
| **ctx** (global) | `ctx.execution_id`, `ctx.script_id`, `ctx.script_name`, `ctx.request_id`, `ctx.trace_id`, `ctx.invocation_type`, `ctx.parent_execution_id`, `ctx.request.path`, `ctx.request.headers`, `ctx.request.body` | MVP+ | | **ctx** (global) | `ctx.execution_id`, `ctx.script_id`, `ctx.script_name`, `ctx.request_id`, `ctx.trace_id`, `ctx.invocation_type`, `ctx.parent_execution_id`, `ctx.request.path`, `ctx.request.headers`, `ctx.request.body` | MVP+ |
| **Response** | Return `{ statusCode, headers?, body }` | MVP | | **Response** | Return `{ statusCode, headers?, body }` | MVP |
## 7.5 SDK Architecture (v1.1.x foundation)
Stateful Rhai SDK services (KV, docs, HTTP, …) hang off a common shape laid down by the v1.1.0 SDK foundation PR. Full reference lives in [docs/sdk-shape.md](../docs/sdk-shape.md); this section sketches the moving parts so other sections can refer to them by name.
**`Services` bundle** (`picloud_shared::Services`) — an `#[non_exhaustive]` struct constructed once at startup. v1.1.0 ships it empty; each subsequent v1.1.x PR adds one `Arc<dyn KvService>` / `Arc<dyn DocsService>` / … field. Held on `Engine`, passed by reference to the per-call registration hook.
**Per-call context** (`picloud_shared::SdkCallCx`) — every stateful service trait method takes `&SdkCallCx` as its first non-self argument. Carries `app_id`, `Option<Principal>`, `execution_id`, `request_id`, and the `trigger_depth` / `root_execution_id` slots that the triggers framework populates. Services derive `app_id` from the cx — never from script-passed args. **That rule is the cross-app isolation boundary**; scripts cannot name another app's data.
**Handle pattern** — collection-scoped services expose `kv::collection("widgets").get("k")`, not `kv::get("widgets", "k")`. Removes the wrong-collection-name foot-gun and lets implementations cache per-collection state. `(app_id, collection, key)` is the identity tuple for KV; `(app_id, collection, id)` for docs. Collections are mandatory.
**Error convention** — throw on failure, `()` for absent, `bool` for predicates. Uniform across every v1.1.x service. Scripts opt into handling errors via Rhai's `try/catch`.
**`ExecutionGate`** (`orchestrator-core::gate::ExecutionGate`) — single global semaphore capping concurrent script executions. Default 32, override via the `PICLOUD_MAX_CONCURRENT_EXECUTIONS` env var. Non-blocking — on overflow, the orchestrator returns HTTP 503 with `Retry-After: 1` immediately. No queue. Rationale: Rhai runs under `spawn_blocking`, so unbounded concurrency would park every blocking thread and starve every other workload.
**`ServiceEventEmitter`** (`picloud_shared::ServiceEventEmitter`) — every mutating service method emits a `ServiceEvent { source, op, collection, key, payload, old_payload }`. v1.1.0 ships `NoopEventEmitter`; the real outbox-backed dispatcher lands with v1.1.1 (see 7.5.1).
### 7.5.1 Trigger architecture (sketch)
Triggers fire scripts in response to service events. Three locked properties; full design and CRUD endpoints land with v1.1.1.
1. **Async outbox**: services emit events synchronously into a Postgres outbox table; a separate dispatcher worker reads, matches them against registered triggers, and fans out script executions. Service writes don't block on trigger fan-out.
2. **Depth-limited**: each trigger-spawned execution increments `cx.trigger_depth`. The dispatcher refuses to fan out beyond a configured ceiling to prevent runaway feedback loops. `cx.root_execution_id` preserves the originating execution id for audit grouping.
3. **Trigger model**: a trigger is `(service, event, filter) → script`, stored in a `triggers` table. The filter is the dispatcher's match predicate on the emitted `ServiceEvent`.
### 8.1 KV Store Service ### 8.1 KV Store Service
**Purpose**: Simple key-value persistence organized by collections, shared across script invocations and scripts. **Purpose**: Simple key-value persistence organized by collections, scoped per app and shared across script invocations and scripts within that app.
**PostgreSQL Setup:** **PostgreSQL Schema:**
```sql ```sql
-- Enable hstore extension (one-time setup)
CREATE EXTENSION IF NOT EXISTS hstore;
-- Create KV table with collection support
CREATE TABLE kv_store ( CREATE TABLE kv_store (
app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE,
collection TEXT NOT NULL, collection TEXT NOT NULL,
key TEXT NOT NULL, key TEXT NOT NULL,
value hstore NOT NULL, value JSONB NOT NULL,
expires_at TIMESTAMP, expires_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(), created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW(),
PRIMARY KEY (collection, key) PRIMARY KEY (app_id, collection, key)
); );
CREATE INDEX idx_kv_collection ON kv_store(collection); CREATE INDEX idx_kv_app_collection ON kv_store(app_id, collection);
CREATE INDEX idx_kv_expires ON kv_store(expires_at) CREATE INDEX idx_kv_expires ON kv_store(expires_at)
WHERE expires_at IS NOT NULL; WHERE expires_at IS NOT NULL;
``` ```
**Why hstore + collections?** **Why JSONB + mandatory collections + `app_id` first:**
- Lightweight, purpose-built for key-value storage - `(app_id, collection, key)` is the identity tuple. The PK begins with `app_id` so the index is naturally per-app; cross-app reads can't happen even if the service layer has a bug.
- Collections allow logical grouping (e.g., `kv:sessions`, `kv:counters`, `kv:flags`) - Collections are **mandatory** — every set / get / delete names one. The same key can legitimately live in multiple collections within one app (`sessions:abc` and `counters:abc` are distinct rows).
- Faster than JSONB for simple KV use cases - JSONB carries arbitrary script-side values (nested objects, arrays) without a separate serialization step. `hstore` was considered and ruled out — it doesn't carry nested types and would force a second JSONB column the moment a script writes a structured value.
- Built-in indexing support
- Keeps all data in one database (no Redis dependency)
**Rhai SDK:** **Value-size cap:** 64 KiB per value, enforced at the service layer (script-visible error on overflow). The cap keeps KV "small fast values, not blob storage"; the v1.1.5 files SDK is the right home for large payloads.
**Rhai SDK (handle pattern — see [docs/sdk-shape.md](docs/sdk-shape.md)):**
```rhai ```rhai
// Get a value from a collection let sessions = kv::collection("sessions");
let val = kv.get("sessions", "user:123"); // Returns object or null sessions.set("user:123", #{ token: "abc", created: "2026-04-10" });
let val = sessions.get("user:123"); // value or () if absent
sessions.delete("user:123");
sessions.set("user:123", #{ token: "xyz" }, 3600); // TTL in seconds
if sessions.has("user:123") { ... }
// Set a value in a collection // Distinct collections in one script — different handles.
kv.set("sessions", "user:123", { token: "abc", created: "2026-04-10" }); let counters = kv::collection("counters");
counters.set("api:calls", 42);
// Delete a key from a collection
kv.delete("sessions", "user:123");
// Set with TTL (seconds)
kv.set("sessions", "user:123", { token: "xyz" }, 3600);
// Check if key exists in a collection
if kv.has("sessions", "user:123") { ... }
// Use different collections for different purposes
kv.set("counters", "api:calls", 42);
kv.set("flags", "feature:beta", true);
kv.set("cache", "page:home", { html: "..." });
``` ```
**Use Cases:** **Use Cases:**