From fe1dd9083678dfb2ee701122058a854d5f37adb7 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 30 May 2026 18:48:39 +0200 Subject: [PATCH] feat(executor-core): plumb app_id/principal/depth through ExecRequest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the four internal-only fields every v1.1.x stateful service needs to isolate by app and audit by caller: - app_id — owning app for this invocation - principal — Option<Principal>; data-plane is unauthenticated today so the orchestrator passes None until the opportunistic middleware lands in the next commit - trigger_depth — 0 for direct invocations; the triggers framework (v1.1.1) bounds runaway feedback loops via this - root_execution_id — equal to execution_id for direct invocations; preserved across trigger fan-out for audit grouping ExecRequest stays serializable (cluster mode still has to ship it across processes when v1.3+ arrives). principal is `#[serde(skip)]` because shared::Principal has no wire derivation today — when cluster mode lands the wire-Principal question gets revisited properly. Engine now carries a Services bundle (empty in v1.1.0). Engine::execute constructs an SdkCallCx from the request and hands it to sdk::register_all just after the per-call Rhai engine is built. The hook is a no-op in v1.1.0; v1.1.1 KV registers its first native fns there. Adds ExecError::Overloaded { retry_after_secs } and the matching 503 + Retry-After mapping in orchestrator-core's IntoResponse. The gate that actually produces this variant lands in the next commit. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/executor-core/src/engine.rs | 30 ++++++++++++++--- crates/executor-core/src/types.rs | 38 +++++++++++++++++++++- crates/executor-core/tests/engine.rs | 13 +++++--- crates/executor-core/tests/sdk_contract.rs | 11 +++++-- crates/orchestrator-core/src/api.rs | 38 ++++++++++++++++++++-- crates/picloud/src/lib.rs | 7 ++-- 6 files changed, 119 insertions(+), 18 deletions(-) diff --git a/crates/executor-core/src/engine.rs b/crates/executor-core/src/engine.rs index 6e2e745..f2849a8 100644 --- a/crates/executor-core/src/engine.rs +++ b/crates/executor-core/src/engine.rs @@ -3,31 +3,38 @@ use std::sync::{Arc, Mutex}; use std::time::Instant; use chrono::Utc; -use picloud_shared::{ScriptValidator, ValidationError, SDK_VERSION}; +use picloud_shared::{ScriptValidator, SdkCallCx, Services, ValidationError, SDK_VERSION}; use rhai::{Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module, Scope}; use serde_json::Value as Json; use crate::sandbox::Limits; +use crate::sdk; use crate::sdk::bridge::{dynamic_to_json, json_to_dynamic}; use crate::types::{ ExecError, ExecRequest, ExecResponse, ExecStats, InvocationType, LogEntry, LogLevel, }; -/// Preconfigured Rhai engine with sandbox limits applied. +/// Preconfigured Rhai engine with sandbox limits applied and the SDK +/// `Services` bundle attached. /// /// One `Engine` is constructed at process startup and reused across /// invocations. `execute` is **synchronous** — it owns the per-call /// scope and log buffer. Wall-clock timeouts and offloading off the /// async runtime belong to the caller (orchestrator-core's /// `LocalExecutorClient` wraps this with `spawn_blocking` + `timeout`). +/// +/// The `Services` bundle is empty in v1.1.0; subsequent v1.1.x PRs add +/// service handles (KV, docs, …) and `sdk::register_all` wires them +/// into each per-call Rhai engine. 
pub struct Engine { limits: Limits, + services: Services, } impl Engine { #[must_use] - pub fn new(limits: Limits) -> Self { - Self { limits } + pub fn new(limits: Limits, services: Services) -> Self { + Self { limits, services } } #[must_use] @@ -56,7 +63,20 @@ impl Engine { pub fn execute(&self, source: &str, req: ExecRequest) -> Result<ExecResponse, ExecError> { let effective_limits = self.limits.with_overrides(&req.sandbox_overrides); let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new())); - let engine = build_engine(effective_limits, Some(logs.clone())); + let mut engine = build_engine(effective_limits, Some(logs.clone())); + + // Per-call context handed to every stateful SDK service via the + // `sdk::register_all` hook. The Arc lets future service closures + // capture cheap clones of the cx for use at script-call time. + let cx = Arc::new(SdkCallCx { + app_id: req.app_id, + principal: req.principal.clone(), + execution_id: req.execution_id, + request_id: req.request_id, + trigger_depth: req.trigger_depth, + root_execution_id: req.root_execution_id, + }); + sdk::register_all(&mut engine, &self.services, cx); let ast = engine .compile(source) diff --git a/crates/executor-core/src/types.rs b/crates/executor-core/src/types.rs index fd57dce..d974a17 100644 --- a/crates/executor-core/src/types.rs +++ b/crates/executor-core/src/types.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use chrono::{DateTime, Utc}; -use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox}; +use picloud_shared::{AppId, ExecutionId, Principal, RequestId, ScriptId, ScriptSandbox}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -50,6 +50,35 @@ pub struct ExecRequest { /// override) before the Rhai engine is built. #[serde(default)] pub sandbox_overrides: ScriptSandbox, + + /// Owning application. Source of truth for every `(app_id, …)` + /// storage lookup the script makes via stateful SDK services. + /// Internal-only; not surfaced via `ctx` (which the script sees). 
+ pub app_id: AppId, + + /// Caller identity, when authenticated. `None` for unauthenticated + /// data-plane HTTP requests (the common case for public scripts); + /// `Some` when a bearer token or session cookie was resolved. + /// Internal-only — exposed via `SdkCallCx` to service trait impls. + /// + /// `#[serde(skip)]`: `ExecRequest` is serializable so cluster mode + /// (v1.3+) can ship invocations to remote executors over HTTP, but + /// `Principal` has no wire derivation today. Skipping here keeps + /// v1.1.0 compiling; the cluster-mode PR will introduce a wire-safe + /// snapshot then. + #[serde(skip)] + pub principal: Option<Principal>, + + /// Triggers-framework depth. `0` for direct invocations. The + /// dispatcher (v1.1.1) increments on each indirection to bound + /// runaway feedback loops. + #[serde(default)] + pub trigger_depth: u32, + + /// Originating execution id of a trigger chain. Equal to + /// `execution_id` for direct invocations; preserves the root + /// across fan-out for audit log grouping. + pub root_execution_id: ExecutionId, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -100,4 +129,11 @@ pub enum ExecError { #[error("script runtime error: {0}")] Runtime(String), + + /// Concurrency gate (orchestrator-core::ExecutionGate) refused + /// admission. Surfaced as HTTP 503 with a `Retry-After` header. + /// The gate enforces a global cap so a script storm can't park + /// every blocking thread. 
+ #[error("execution declined: server at capacity (retry after {retry_after_secs}s)")] + Overloaded { retry_after_secs: u32 }, } diff --git a/crates/executor-core/tests/engine.rs b/crates/executor-core/tests/engine.rs index c888ae4..39935f4 100644 --- a/crates/executor-core/tests/engine.rs +++ b/crates/executor-core/tests/engine.rs @@ -1,12 +1,13 @@ use std::collections::BTreeMap; use picloud_executor_core::{Engine, ExecError, ExecRequest, InvocationType, Limits, LogLevel}; -use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox}; +use picloud_shared::{AppId, ExecutionId, RequestId, ScriptId, ScriptSandbox, Services}; use serde_json::json; fn req(body: serde_json::Value) -> ExecRequest { + let execution_id = ExecutionId::new(); ExecRequest { - execution_id: ExecutionId::new(), + execution_id, request_id: RequestId::new(), script_id: ScriptId::new(), script_name: "test".into(), @@ -18,11 +19,15 @@ fn req(body: serde_json::Value) -> ExecRequest { query: BTreeMap::new(), rest: String::new(), sandbox_overrides: ScriptSandbox::default(), + app_id: AppId::new(), + principal: None, + trigger_depth: 0, + root_execution_id: execution_id, } } fn engine() -> Engine { - Engine::new(Limits::default()) + Engine::new(Limits::default(), Services::new()) } #[test] @@ -121,7 +126,7 @@ fn enforces_operation_budget() { max_operations: 1_000, ..Limits::default() }; - let engine = Engine::new(limits); + let engine = Engine::new(limits, Services::new()); // 10_000 iterations vastly exceeds 1_000 ops. 
let src = r"let n = 0; for i in 0..10000 { n += 1; } n"; let err = engine diff --git a/crates/executor-core/tests/sdk_contract.rs b/crates/executor-core/tests/sdk_contract.rs index af7fbb1..26788c5 100644 --- a/crates/executor-core/tests/sdk_contract.rs +++ b/crates/executor-core/tests/sdk_contract.rs @@ -23,7 +23,7 @@ use std::collections::BTreeMap; use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits, LogLevel}; -use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox}; +use picloud_shared::{AppId, ExecutionId, RequestId, ScriptId, ScriptSandbox, Services}; use serde_json::{json, Value}; // ---------------------------------------------------------------------------- @@ -31,12 +31,13 @@ use serde_json::{json, Value}; // ---------------------------------------------------------------------------- fn engine() -> Engine { - Engine::new(Limits::default()) + Engine::new(Limits::default(), Services::new()) } fn baseline_request() -> ExecRequest { + let execution_id = ExecutionId::new(); ExecRequest { - execution_id: ExecutionId::new(), + execution_id, request_id: RequestId::new(), script_id: ScriptId::new(), script_name: "contract".into(), @@ -48,6 +49,10 @@ fn baseline_request() -> ExecRequest { query: BTreeMap::new(), rest: String::new(), sandbox_overrides: ScriptSandbox::default(), + app_id: AppId::new(), + principal: None, + trigger_depth: 0, + root_execution_id: execution_id, } } diff --git a/crates/orchestrator-core/src/api.rs b/crates/orchestrator-core/src/api.rs index c11454b..82008e2 100644 --- a/crates/orchestrator-core/src/api.rs +++ b/crates/orchestrator-core/src/api.rs @@ -17,7 +17,8 @@ use axum::{ use chrono::Utc; use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType}; use picloud_shared::{ - AppId, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, RequestId, ScriptId, + AppId, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, Principal, RequestId, + ScriptId, }; use 
serde_json::Value as Json_; use uuid::Uuid; @@ -97,7 +98,10 @@ where .await? .ok_or(ApiError::NotFound(id))?; - let mut req = build_exec_request(id, &script.name, &headers, &body)?; + // Principal stays `None` until the data-plane `attach_principal_if_present` + // middleware lands in the picloud-wiring commit. Both shapes are + // valid against `ExecRequest.principal: Option<Principal>`. + let mut req = build_exec_request(id, &script.name, &headers, &body, script.app_id, None)?; req.sandbox_overrides = script.sandbox; let request_id = req.request_id; let request_path = req.path.clone(); @@ -195,6 +199,8 @@ where &script.name, &headers, &body_bytes, + app_id, + None, )?; req.path = path; req.params = matched.params; @@ -264,6 +270,8 @@ fn build_exec_request( name: &str, headers: &HeaderMap, body: &Bytes, + app_id: AppId, + principal: Option<Principal>, ) -> Result<ExecRequest, ApiError> { let mut hmap = BTreeMap::new(); for (k, v) in headers { @@ -279,8 +287,9 @@ fn build_exec_request( .map_err(|e| ApiError::BadRequest(format!("invalid JSON body: {e}")))? }; + let execution_id = ExecutionId::new(); Ok(ExecRequest { - execution_id: ExecutionId::new(), + execution_id, request_id: RequestId::new(), script_id: id, script_name: name.to_string(), @@ -293,6 +302,13 @@ fn build_exec_request( rest: String::new(), // Overwritten by the handler after the script is resolved. sandbox_overrides: picloud_shared::ScriptSandbox::default(), + app_id, + principal, + // Direct invocations are at depth 0 with a self-referential + // root. The triggers framework (v1.1.1) increments depth and + // preserves the original root for chained executions. + trigger_depth: 0, + root_execution_id: execution_id, }) } @@ -396,6 +412,21 @@ pub enum ApiError { impl IntoResponse for ApiError { fn into_response(self) -> Response { + // Overloaded is the only variant that needs to attach an HTTP + // header (Retry-After), so it short-circuits the (status, body) + // reduction below. 
Axum's tuple builder makes per-arm header + // injection awkward otherwise. + if let ApiError::Exec(ExecError::Overloaded { retry_after_secs }) = &self { + let retry = retry_after_secs.to_string(); + let body = Json(serde_json::json!({ "error": self.to_string() })); + return ( + StatusCode::SERVICE_UNAVAILABLE, + [(axum::http::header::RETRY_AFTER, retry)], + body, + ) + .into_response(); + } + use ApiError as E; let (status, message) = match &self { E::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()), @@ -416,6 +447,7 @@ impl IntoResponse for ApiError { (StatusCode::INSUFFICIENT_STORAGE, e.to_string()) } ExecError::Runtime(_) => (StatusCode::BAD_GATEWAY, e.to_string()), + ExecError::Overloaded { .. } => unreachable!("handled above"), }, }; (status, Json(serde_json::json!({ "error": message }))).into_response() diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs index ef59a73..9ae47c5 100644 --- a/crates/picloud/src/lib.rs +++ b/crates/picloud/src/lib.rs @@ -25,7 +25,8 @@ use picloud_orchestrator_core::{ data_plane_router, user_routes_router, DataPlaneState, LocalExecutorClient, }; use picloud_shared::{ - ExecutionLogSink, ScriptValidator, API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION, + ExecutionLogSink, ScriptValidator, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION, + WIRE_VERSION, }; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; @@ -82,7 +83,9 @@ fn read_session_ttl() -> Duration { /// `/version`) stays open — it's the public ingress for user scripts. #[allow(clippy::too_many_lines)] pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { - let engine = Arc::new(Engine::new(Limits::default())); + // `Services` is the SDK service bundle. Empty in v1.1.0; the + // v1.1.1 KV PR will populate it with `kv: Arc::new(...)` here. 
+ let engine = Arc::new(Engine::new(Limits::default(), Services::new())); let script_repo = Arc::new(PostgresScriptRepository::new(pool.clone())); let log_repo = Arc::new(PostgresExecutionLogRepository::new(pool.clone()));