feat(executor-core): plumb app_id/principal/depth through ExecRequest

Adds the four internal-only fields every v1.1.x stateful service needs
to isolate by app and audit by caller:

  - app_id            — owning app for this invocation
  - principal         — Option<Principal>; data-plane is unauthenticated
                        today so the orchestrator passes None until the
                        opportunistic middleware lands in the next commit
  - trigger_depth     — 0 for direct invocations; the triggers framework
                        (v1.1.1) bounds runaway feedback loops via this
  - root_execution_id — equal to execution_id for direct invocations;
                        preserved across trigger fan-out for audit grouping

ExecRequest stays serializable (cluster mode still has to ship it across
processes when v1.3+ arrives). principal is `#[serde(skip)]` because
shared::Principal has no wire derivation today — when cluster mode lands
the wire-Principal question gets revisited properly.

Engine now carries a Services bundle (empty in v1.1.0). Engine::execute
constructs an SdkCallCx from the request and hands it to sdk::register_all
just after the per-call Rhai engine is built. The hook is a no-op in v1.1.0;
v1.1.1 KV registers its first native fns there.

Adds ExecError::Overloaded { retry_after_secs } and the matching 503 +
Retry-After mapping in orchestrator-core's IntoResponse. The gate that
actually produces this variant lands in the next commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-30 18:48:39 +02:00
parent aaba58dee1
commit fe1dd90836
6 changed files with 119 additions and 18 deletions

View File

@@ -3,31 +3,38 @@ use std::sync::{Arc, Mutex};
use std::time::Instant;
use chrono::Utc;
use picloud_shared::{ScriptValidator, ValidationError, SDK_VERSION};
use picloud_shared::{ScriptValidator, SdkCallCx, Services, ValidationError, SDK_VERSION};
use rhai::{Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module, Scope};
use serde_json::Value as Json;
use crate::sandbox::Limits;
use crate::sdk;
use crate::sdk::bridge::{dynamic_to_json, json_to_dynamic};
use crate::types::{
ExecError, ExecRequest, ExecResponse, ExecStats, InvocationType, LogEntry, LogLevel,
};
/// Preconfigured Rhai engine with sandbox limits applied.
/// Preconfigured Rhai engine with sandbox limits applied and the SDK
/// `Services` bundle attached.
///
/// One `Engine` is constructed at process startup and reused across
/// invocations. `execute` is **synchronous** — it owns the per-call
/// scope and log buffer. Wall-clock timeouts and offloading off the
/// async runtime belong to the caller (orchestrator-core's
/// `LocalExecutorClient` wraps this with `spawn_blocking` + `timeout`).
///
/// The `Services` bundle is empty in v1.1.0; subsequent v1.1.x PRs add
/// service handles (KV, docs, …) and `sdk::register_all` wires them
/// into each per-call Rhai engine.
pub struct Engine {
limits: Limits,
services: Services,
}
impl Engine {
#[must_use]
pub fn new(limits: Limits) -> Self {
Self { limits }
pub fn new(limits: Limits, services: Services) -> Self {
Self { limits, services }
}
#[must_use]
@@ -56,7 +63,20 @@ impl Engine {
pub fn execute(&self, source: &str, req: ExecRequest) -> Result<ExecResponse, ExecError> {
let effective_limits = self.limits.with_overrides(&req.sandbox_overrides);
let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
let engine = build_engine(effective_limits, Some(logs.clone()));
let mut engine = build_engine(effective_limits, Some(logs.clone()));
// Per-call context handed to every stateful SDK service via the
// `sdk::register_all` hook. The Arc lets future service closures
// capture cheap clones of the cx for use at script-call time.
let cx = Arc::new(SdkCallCx {
app_id: req.app_id,
principal: req.principal.clone(),
execution_id: req.execution_id,
request_id: req.request_id,
trigger_depth: req.trigger_depth,
root_execution_id: req.root_execution_id,
});
sdk::register_all(&mut engine, &self.services, cx);
let ast = engine
.compile(source)

View File

@@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use chrono::{DateTime, Utc};
use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox};
use picloud_shared::{AppId, ExecutionId, Principal, RequestId, ScriptId, ScriptSandbox};
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -50,6 +50,35 @@ pub struct ExecRequest {
/// override) before the Rhai engine is built.
#[serde(default)]
pub sandbox_overrides: ScriptSandbox,
/// Owning application. Source of truth for every `(app_id, …)`
/// storage lookup the script makes via stateful SDK services.
/// Internal-only; not surfaced via `ctx` (which the script sees).
pub app_id: AppId,
/// Caller identity, when authenticated. `None` for unauthenticated
/// data-plane HTTP requests (the common case for public scripts);
/// `Some` when a bearer token or session cookie was resolved.
/// Internal-only — exposed via `SdkCallCx` to service trait impls.
///
/// `#[serde(skip)]`: `ExecRequest` is serializable so cluster mode
/// (v1.3+) can ship invocations to remote executors over HTTP, but
/// `Principal` has no wire derivation today. Skipping here keeps
/// v1.1.0 compiling; the cluster-mode PR will introduce a wire-safe
/// snapshot then.
#[serde(skip)]
pub principal: Option<Principal>,
/// Triggers-framework depth. `0` for direct invocations. The
/// dispatcher (v1.1.1) increments on each indirection to bound
/// runaway feedback loops.
#[serde(default)]
pub trigger_depth: u32,
/// Originating execution id of a trigger chain. Equal to
/// `execution_id` for direct invocations; preserves the root
/// across fan-out for audit log grouping.
pub root_execution_id: ExecutionId,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -100,4 +129,11 @@ pub enum ExecError {
#[error("script runtime error: {0}")]
Runtime(String),
/// Concurrency gate (orchestrator-core::ExecutionGate) refused
/// admission. Surfaced as HTTP 503 with a `Retry-After` header.
/// The gate enforces a global cap so a script storm can't park
/// every blocking thread.
#[error("execution declined: server at capacity (retry after {retry_after_secs}s)")]
Overloaded { retry_after_secs: u32 },
}

View File

@@ -1,12 +1,13 @@
use std::collections::BTreeMap;
use picloud_executor_core::{Engine, ExecError, ExecRequest, InvocationType, Limits, LogLevel};
use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox};
use picloud_shared::{AppId, ExecutionId, RequestId, ScriptId, ScriptSandbox, Services};
use serde_json::json;
fn req(body: serde_json::Value) -> ExecRequest {
let execution_id = ExecutionId::new();
ExecRequest {
execution_id: ExecutionId::new(),
execution_id,
request_id: RequestId::new(),
script_id: ScriptId::new(),
script_name: "test".into(),
@@ -18,11 +19,15 @@ fn req(body: serde_json::Value) -> ExecRequest {
query: BTreeMap::new(),
rest: String::new(),
sandbox_overrides: ScriptSandbox::default(),
app_id: AppId::new(),
principal: None,
trigger_depth: 0,
root_execution_id: execution_id,
}
}
fn engine() -> Engine {
Engine::new(Limits::default())
Engine::new(Limits::default(), Services::new())
}
#[test]
@@ -121,7 +126,7 @@ fn enforces_operation_budget() {
max_operations: 1_000,
..Limits::default()
};
let engine = Engine::new(limits);
let engine = Engine::new(limits, Services::new());
// 10_000 iterations vastly exceeds 1_000 ops.
let src = r"let n = 0; for i in 0..10000 { n += 1; } n";
let err = engine

View File

@@ -23,7 +23,7 @@
use std::collections::BTreeMap;
use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits, LogLevel};
use picloud_shared::{ExecutionId, RequestId, ScriptId, ScriptSandbox};
use picloud_shared::{AppId, ExecutionId, RequestId, ScriptId, ScriptSandbox, Services};
use serde_json::{json, Value};
// ----------------------------------------------------------------------------
@@ -31,12 +31,13 @@ use serde_json::{json, Value};
// ----------------------------------------------------------------------------
fn engine() -> Engine {
Engine::new(Limits::default())
Engine::new(Limits::default(), Services::new())
}
fn baseline_request() -> ExecRequest {
let execution_id = ExecutionId::new();
ExecRequest {
execution_id: ExecutionId::new(),
execution_id,
request_id: RequestId::new(),
script_id: ScriptId::new(),
script_name: "contract".into(),
@@ -48,6 +49,10 @@ fn baseline_request() -> ExecRequest {
query: BTreeMap::new(),
rest: String::new(),
sandbox_overrides: ScriptSandbox::default(),
app_id: AppId::new(),
principal: None,
trigger_depth: 0,
root_execution_id: execution_id,
}
}

View File

@@ -17,7 +17,8 @@ use axum::{
use chrono::Utc;
use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType};
use picloud_shared::{
AppId, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, RequestId, ScriptId,
AppId, ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, Principal, RequestId,
ScriptId,
};
use serde_json::Value as Json_;
use uuid::Uuid;
@@ -97,7 +98,10 @@ where
.await?
.ok_or(ApiError::NotFound(id))?;
let mut req = build_exec_request(id, &script.name, &headers, &body)?;
// Principal stays `None` until the data-plane `attach_principal_if_present`
// middleware lands in the picloud-wiring commit. Both shapes are
// valid against `ExecRequest.principal: Option<Principal>`.
let mut req = build_exec_request(id, &script.name, &headers, &body, script.app_id, None)?;
req.sandbox_overrides = script.sandbox;
let request_id = req.request_id;
let request_path = req.path.clone();
@@ -195,6 +199,8 @@ where
&script.name,
&headers,
&body_bytes,
app_id,
None,
)?;
req.path = path;
req.params = matched.params;
@@ -264,6 +270,8 @@ fn build_exec_request(
name: &str,
headers: &HeaderMap,
body: &Bytes,
app_id: AppId,
principal: Option<Principal>,
) -> Result<ExecRequest, ApiError> {
let mut hmap = BTreeMap::new();
for (k, v) in headers {
@@ -279,8 +287,9 @@ fn build_exec_request(
.map_err(|e| ApiError::BadRequest(format!("invalid JSON body: {e}")))?
};
let execution_id = ExecutionId::new();
Ok(ExecRequest {
execution_id: ExecutionId::new(),
execution_id,
request_id: RequestId::new(),
script_id: id,
script_name: name.to_string(),
@@ -293,6 +302,13 @@ fn build_exec_request(
rest: String::new(),
// Overwritten by the handler after the script is resolved.
sandbox_overrides: picloud_shared::ScriptSandbox::default(),
app_id,
principal,
// Direct invocations are at depth 0 with a self-referential
// root. The triggers framework (v1.1.1) increments depth and
// preserves the original root for chained executions.
trigger_depth: 0,
root_execution_id: execution_id,
})
}
@@ -396,6 +412,21 @@ pub enum ApiError {
impl IntoResponse for ApiError {
fn into_response(self) -> Response {
// Overloaded is the only variant that needs to attach an HTTP
// header (Retry-After), so it short-circuits the (status, body)
// reduction below. Axum's tuple builder makes per-arm header
// injection awkward otherwise.
if let ApiError::Exec(ExecError::Overloaded { retry_after_secs }) = &self {
let retry = retry_after_secs.to_string();
let body = Json(serde_json::json!({ "error": self.to_string() }));
return (
StatusCode::SERVICE_UNAVAILABLE,
[(axum::http::header::RETRY_AFTER, retry)],
body,
)
.into_response();
}
use ApiError as E;
let (status, message) = match &self {
E::NotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
@@ -416,6 +447,7 @@ impl IntoResponse for ApiError {
(StatusCode::INSUFFICIENT_STORAGE, e.to_string())
}
ExecError::Runtime(_) => (StatusCode::BAD_GATEWAY, e.to_string()),
ExecError::Overloaded { .. } => unreachable!("handled above"),
},
};
(status, Json(serde_json::json!({ "error": message }))).into_response()

View File

@@ -25,7 +25,8 @@ use picloud_orchestrator_core::{
data_plane_router, user_routes_router, DataPlaneState, LocalExecutorClient,
};
use picloud_shared::{
ExecutionLogSink, ScriptValidator, API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION,
ExecutionLogSink, ScriptValidator, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION,
WIRE_VERSION,
};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
@@ -82,7 +83,9 @@ fn read_session_ttl() -> Duration {
/// `/version`) stays open — it's the public ingress for user scripts.
#[allow(clippy::too_many_lines)]
pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
let engine = Arc::new(Engine::new(Limits::default()));
// `Services` is the SDK service bundle. Empty in v1.1.0; the
// v1.1.1 KV PR will populate it with `kv: Arc::new(...)` here.
let engine = Arc::new(Engine::new(Limits::default(), Services::new()));
let script_repo = Arc::new(PostgresScriptRepository::new(pool.clone()));
let log_repo = Arc::new(PostgresExecutionLogRepository::new(pool.clone()));