Files
PiCloud/crates/executor-core/src/engine.rs
MechaCat02 6b99f74c48 feat(v1.1.1-kv): Rhai kv:: SDK module + ctx.event wiring
Wires the KV store into Rhai scripts via the handle pattern:

    let widgets = kv::collection("widgets");
    widgets.set("k", #{ n: 1 });
    let v = widgets.get("k");          // value or () if absent
    widgets.has("k") / widgets.delete("k")
    let page = widgets.list();          // cursor-style pagination

`KvHandle` is a custom Rhai type holding `Arc<dyn KvService>` + the
per-call `Arc<SdkCallCx>`. Methods route async service calls through
`tokio::Handle::current().block_on(...)` — works because
`LocalExecutorClient` runs the script under `spawn_blocking` so a
runtime is reachable. The bridge surfaces `app_id` exclusively
through `cx.app_id`; no public-facing argument can spoof an app.

`TriggerEvent` lands in `picloud-shared` as the wire shape the
dispatcher will emit (KV + DeadLetter variants — KV exercised now,
DL hooks up with the dispatcher in commit 5/8). `SdkCallCx` and
`ExecRequest` grow `is_dead_letter_handler: bool` and
`event: Option<TriggerEvent>`. `engine.rs::build_ctx_map` flattens
the event into `ctx.event` for triggered handlers; direct ingress
leaves the key absent so scripts can `if "event" in ctx`.

Tests:
- 7 `sdk_kv.rs` integration tests covering the full Rhai surface
  (round-trip, missing-key unit, has bool, delete was-present,
  empty-collection rejection, cursor pagination, cross-app
  isolation through the bridge).
- 3 new `engine.rs` tests pinning `ctx.event` shape per
  design notes §4 (KV insert with value, delete with unit value,
  direct invocations have no `event` key).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-01 21:38:41 +02:00

382 lines
14 KiB
Rust

use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use chrono::Utc;
use picloud_shared::{
ScriptValidator, SdkCallCx, Services, TriggerEvent, ValidationError, SDK_VERSION,
};
use rhai::{Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module, Scope};
use serde_json::Value as Json;
use crate::sandbox::Limits;
use crate::sdk;
use crate::sdk::bridge::{dynamic_to_json, json_to_dynamic};
use crate::types::{
ExecError, ExecRequest, ExecResponse, ExecStats, InvocationType, LogEntry, LogLevel,
};
/// Preconfigured Rhai engine with sandbox limits applied and the SDK
/// `Services` bundle attached.
///
/// One `Engine` is constructed at process startup and reused across
/// invocations. `execute` is **synchronous** — it owns the per-call
/// scope and log buffer. Wall-clock timeouts and offloading off the
/// async runtime belong to the caller (orchestrator-core's
/// `LocalExecutorClient` wraps this with `spawn_blocking` + `timeout`).
///
/// The `Services` bundle is empty in v1.1.0; subsequent v1.1.x PRs add
/// service handles (KV, docs, …) and `sdk::register_all` wires them
/// into each per-call Rhai engine.
pub struct Engine {
limits: Limits,
services: Services,
}
impl Engine {
#[must_use]
pub fn new(limits: Limits, services: Services) -> Self {
Self { limits, services }
}
#[must_use]
pub fn limits(&self) -> &Limits {
&self.limits
}
/// Parse-only validation. Surfaced at script-upload time so syntax
/// errors are caught before the first invocation. Same logic as the
/// `ScriptValidator` impl below but with the richer `ExecError`
/// variant; callers in the executor path use this, the manager
/// path goes through the trait.
pub fn validate(&self, source: &str) -> Result<(), ExecError> {
let engine = build_engine(self.limits, None);
engine
.compile(source)
.map(|_| ())
.map_err(|e| ExecError::Parse(e.to_string()))
}
/// Execute `source` against `req`. Op-budget protection comes from
/// Rhai's `set_max_operations`; wall-clock enforcement is the
/// caller's responsibility. Per-script sandbox overrides on the
/// request replace the engine's defaults field-by-field; the
/// manager already clamped them against the admin ceiling.
pub fn execute(&self, source: &str, req: ExecRequest) -> Result<ExecResponse, ExecError> {
let effective_limits = self.limits.with_overrides(&req.sandbox_overrides);
let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
let mut engine = build_engine(effective_limits, Some(logs.clone()));
// Per-call context handed to every stateful SDK service via the
// `sdk::register_all` hook. The Arc lets future service closures
// capture cheap clones of the cx for use at script-call time.
let cx = Arc::new(SdkCallCx {
app_id: req.app_id,
principal: req.principal.clone(),
execution_id: req.execution_id,
request_id: req.request_id,
trigger_depth: req.trigger_depth,
root_execution_id: req.root_execution_id,
is_dead_letter_handler: req.is_dead_letter_handler,
event: req.event.clone(),
});
sdk::register_all(&mut engine, &self.services, cx);
let ast = engine
.compile(source)
.map_err(|e| ExecError::Parse(e.to_string()))?;
let mut scope = Scope::new();
scope.push_constant("ctx", build_ctx_map(&req));
let started = Instant::now();
let value: Dynamic = engine
.eval_ast_with_scope(&mut scope, &ast)
.map_err(map_eval_error)?;
let duration = started.elapsed();
let logs = Arc::try_unwrap(logs).map_or_else(
|arc| arc.lock().map(|g| g.clone()).unwrap_or_default(),
|m| m.into_inner().unwrap_or_default(),
);
let (status_code, headers, body) = parse_response(value)?;
Ok(ExecResponse {
status_code,
headers,
body,
logs,
stats: ExecStats {
duration_ms: u64::try_from(duration.as_millis()).unwrap_or(u64::MAX),
operations: 0,
},
})
}
}
impl ScriptValidator for Engine {
fn validate(&self, source: &str) -> Result<(), ValidationError> {
Engine::validate(self, source).map_err(|e| ValidationError::Syntax(e.to_string()))
}
}
// ----------------------------------------------------------------------------
// Engine construction
// ----------------------------------------------------------------------------
fn build_engine(limits: Limits, logs: Option<Arc<Mutex<Vec<LogEntry>>>>) -> RhaiEngine {
let mut engine = RhaiEngine::new();
engine.set_max_operations(limits.max_operations);
engine.set_max_string_size(limits.max_string_size);
engine.set_max_array_size(limits.max_array_size);
engine.set_max_map_size(limits.max_map_size);
engine.set_max_call_levels(limits.max_call_levels);
engine.set_max_expr_depths(limits.max_expr_depth, limits.max_expr_depth);
// Reject `import` — scripts cannot pull external modules.
engine.set_module_resolver(rhai::module_resolvers::DummyModuleResolver);
// Rhai's built-in `print` and `debug` map to stdout/stderr by
// default; we never want scripts dumping there directly. Disable
// them so scripts route all output through `log::*` instead.
engine.disable_symbol("print");
if let Some(logs) = logs {
engine.register_static_module("log", build_log_module(logs).into());
}
// Stateless utility modules — regex::/random::/time::/json::/base64::/
// hex::/url::. Always registered, including in the parse-only validate
// path, so script authors get consistent surface in both phases.
sdk::stdlib::register_stdlib(&mut engine);
engine
}
fn build_log_module(logs: Arc<Mutex<Vec<LogEntry>>>) -> Module {
let mut module = Module::new();
register_log_fn(&mut module, "trace", LogLevel::Trace, &logs);
register_log_fn(&mut module, "info", LogLevel::Info, &logs);
register_log_fn(&mut module, "warn", LogLevel::Warn, &logs);
register_log_fn(&mut module, "error", LogLevel::Error, &logs);
// No `log::debug` — `debug` is a Rhai reserved keyword. Use
// `log::trace` for sub-info-level diagnostics.
module
}
fn register_log_fn(
module: &mut Module,
name: &str,
level: LogLevel,
logs: &Arc<Mutex<Vec<LogEntry>>>,
) {
// Single-argument form: `log::info("message")`.
let logs_single = logs.clone();
module.set_native_fn(name, move |msg: &str| {
push_log(&logs_single, level, msg, None);
Ok::<_, Box<EvalAltResult>>(())
});
// Two-argument form: `log::info("message", #{ user: 42 })`.
let logs_struct = logs.clone();
module.set_native_fn(name, move |msg: &str, data: Dynamic| {
let json = dynamic_to_json(&data);
push_log(&logs_struct, level, msg, Some(json));
Ok::<_, Box<EvalAltResult>>(())
});
}
fn push_log(logs: &Arc<Mutex<Vec<LogEntry>>>, level: LogLevel, message: &str, data: Option<Json>) {
if let Ok(mut g) = logs.lock() {
g.push(LogEntry {
timestamp: Utc::now(),
level,
message: message.to_string(),
data,
});
}
}
// ----------------------------------------------------------------------------
// ctx construction
// ----------------------------------------------------------------------------
fn build_ctx_map(req: &ExecRequest) -> Map {
let mut ctx = Map::new();
ctx.insert("sdk_version".into(), SDK_VERSION.into());
ctx.insert("execution_id".into(), req.execution_id.to_string().into());
ctx.insert("script_id".into(), req.script_id.to_string().into());
ctx.insert("script_name".into(), req.script_name.clone().into());
ctx.insert("request_id".into(), req.request_id.to_string().into());
ctx.insert(
"invocation_type".into(),
invocation_type_str(req.invocation_type).into(),
);
let mut request = Map::new();
request.insert("path".into(), req.path.clone().into());
let mut headers = Map::new();
for (k, v) in &req.headers {
headers.insert(k.clone().into(), v.clone().into());
}
request.insert("headers".into(), headers.into());
request.insert("body".into(), json_to_dynamic(req.body.clone()));
// SDK 1.1 additions — route-captured params, query string, prefix
// tail. Empty when not applicable so scripts can always read them.
let mut params = Map::new();
for (k, v) in &req.params {
params.insert(k.clone().into(), v.clone().into());
}
request.insert("params".into(), params.into());
let mut query = Map::new();
for (k, v) in &req.query {
query.insert(k.clone().into(), v.clone().into());
}
request.insert("query".into(), query.into());
request.insert("rest".into(), req.rest.clone().into());
ctx.insert("request".into(), request.into());
// Triggered invocations: surface the originating event as
// `ctx.event`. Direct ingress (HTTP request, manual run) leaves
// the key absent so scripts can test `if "event" in ctx`.
if let Some(event) = req.event.as_ref() {
ctx.insert("event".into(), trigger_event_to_dynamic(event));
}
ctx
}
/// Convert a `TriggerEvent` into the `ctx.event` Rhai shape defined in
/// `docs/v1.1.x-design-notes.md` §4 (the dead-letter sub-shape) and
/// §2/blueprint §9 (KV). Each variant becomes a Rhai map with a
/// `source` discriminant plus per-source fields.
fn trigger_event_to_dynamic(event: &TriggerEvent) -> Dynamic {
let mut m = Map::new();
m.insert("source".into(), event.source().into());
match event {
TriggerEvent::Kv {
op,
collection,
key,
value,
} => {
m.insert("op".into(), op.as_str().into());
let mut kv_map = Map::new();
kv_map.insert("collection".into(), collection.clone().into());
kv_map.insert("key".into(), key.clone().into());
kv_map.insert(
"value".into(),
value.clone().map_or(Dynamic::UNIT, json_to_dynamic),
);
m.insert("kv".into(), kv_map.into());
}
TriggerEvent::DeadLetter {
dead_letter_id,
original,
attempts,
last_error,
trigger_id,
script_id,
first_attempt_at,
last_attempt_at,
} => {
let mut dl = Map::new();
dl.insert("id".into(), dead_letter_id.to_string().into());
dl.insert("original".into(), trigger_event_to_dynamic(original));
dl.insert("attempts".into(), i64::from(*attempts).into());
dl.insert("last_error".into(), last_error.clone().into());
dl.insert(
"trigger_id".into(),
trigger_id
.map(|id| Dynamic::from(id.to_string()))
.unwrap_or(Dynamic::UNIT),
);
dl.insert(
"script_id".into(),
script_id
.map(|id| Dynamic::from(id.to_string()))
.unwrap_or(Dynamic::UNIT),
);
dl.insert(
"first_attempt_at".into(),
first_attempt_at.to_rfc3339().into(),
);
dl.insert(
"last_attempt_at".into(),
last_attempt_at.to_rfc3339().into(),
);
m.insert("dead_letter".into(), dl.into());
}
}
m.into()
}
fn invocation_type_str(it: InvocationType) -> &'static str {
match it {
InvocationType::Http => "http",
InvocationType::Function => "function",
InvocationType::Scheduled => "scheduled",
}
}
// ----------------------------------------------------------------------------
// Response parsing
// ----------------------------------------------------------------------------
fn parse_response(value: Dynamic) -> Result<(u16, BTreeMap<String, String>, Json), ExecError> {
// Convention: a Map with a `statusCode` field is the structured shape.
// Anything else is treated as a 200 response with the value as body.
if value.is_map() {
if let Some(map) = value.clone().try_cast::<Map>() {
if map.contains_key("statusCode") {
return parse_structured_response(map);
}
}
}
Ok((200, BTreeMap::new(), dynamic_to_json(&value)))
}
fn parse_structured_response(map: Map) -> Result<(u16, BTreeMap<String, String>, Json), ExecError> {
let status_dyn = map
.get("statusCode")
.ok_or_else(|| ExecError::InvalidResponse("missing statusCode".into()))?;
let status_code: i64 = status_dyn
.as_int()
.map_err(|_| ExecError::InvalidResponse("statusCode must be an integer".into()))?;
let status_code = u16::try_from(status_code)
.map_err(|_| ExecError::InvalidResponse("statusCode out of HTTP range".into()))?;
let mut headers: BTreeMap<String, String> = BTreeMap::new();
if let Some(h) = map.get("headers") {
if let Some(h_map) = h.clone().try_cast::<Map>() {
for (k, v) in h_map {
headers.insert(k.to_string(), v.to_string());
}
}
}
let body = map.get("body").map_or(Json::Null, dynamic_to_json);
Ok((status_code, headers, body))
}
// ----------------------------------------------------------------------------
// Error mapping
// ----------------------------------------------------------------------------
fn map_eval_error(err: Box<EvalAltResult>) -> ExecError {
match *err {
EvalAltResult::ErrorTooManyOperations(_) => ExecError::OperationBudgetExceeded,
EvalAltResult::ErrorParsing(parse_err, _) => ExecError::Parse(parse_err.to_string()),
other => ExecError::Runtime(other.to_string()),
}
}