From 6b99f74c48b81c95fedd1ab28bf5385e501a3a05 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Mon, 1 Jun 2026 21:38:41 +0200 Subject: [PATCH] feat(v1.1.1-kv): Rhai kv:: SDK module + ctx.event wiring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the KV store into Rhai scripts via the handle pattern: let widgets = kv::collection("widgets"); widgets.set("k", #{ n: 1 }); let v = widgets.get("k"); // value or () if absent widgets.has("k") / widgets.delete("k") let page = widgets.list(); // cursor-style pagination `KvHandle` is a custom Rhai type holding `Arc` + the per-call `Arc`. Methods route async service calls through `tokio::Handle::current().block_on(...)` — works because `LocalExecutorClient` runs the script under `spawn_blocking` so a runtime is reachable. The bridge surfaces `app_id` exclusively through `cx.app_id`; no public-facing argument can spoof an app. `TriggerEvent` lands in `picloud-shared` as the wire shape the dispatcher will emit (KV + DeadLetter variants — KV exercised now, DL hooks up with the dispatcher in commit 5/8). `SdkCallCx` and `ExecRequest` grow `is_dead_letter_handler: bool` and `event: Option`. `engine.rs::build_ctx_map` flattens the event into `ctx.event` for triggered handlers; direct ingress leaves the key absent so scripts can `if "event" in ctx`. Tests: - 7 `sdk_kv.rs` integration tests covering the full Rhai surface (round-trip, missing-key unit, has bool, delete was-present, empty-collection rejection, cursor pagination, cross-app isolation through the bridge). - 3 new `engine.rs` tests pinning `ctx.event` shape per design notes §4 (KV insert with value, delete with unit value, direct invocations have no `event` key). Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 2 + crates/executor-core/Cargo.toml | 4 + crates/executor-core/src/engine.rs | 79 ++++++- crates/executor-core/src/sdk/kv.rs | 193 +++++++++++++++ crates/executor-core/src/sdk/mod.rs | 15 +- crates/executor-core/src/types.rs | 18 +- crates/executor-core/tests/engine.rs | 70 +++++- crates/executor-core/tests/sdk_contract.rs | 2 + crates/executor-core/tests/sdk_kv.rs | 260 +++++++++++++++++++++ crates/executor-core/tests/stdlib.rs | 2 + crates/manager-core/src/kv_service.rs | 6 + crates/orchestrator-core/src/api.rs | 5 + crates/shared/src/lib.rs | 2 + crates/shared/src/sdk_cx.rs | 17 +- crates/shared/src/trigger_event.rs | 105 +++++++++ 15 files changed, 767 insertions(+), 13 deletions(-) create mode 100644 crates/executor-core/src/sdk/kv.rs create mode 100644 crates/executor-core/tests/sdk_kv.rs create mode 100644 crates/shared/src/trigger_event.rs diff --git a/Cargo.lock b/Cargo.lock index 535af93..890ff57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1566,6 +1566,7 @@ dependencies = [ name = "picloud-executor-core" version = "1.1.0" dependencies = [ + "async-trait", "base64", "chrono", "hex", @@ -1577,6 +1578,7 @@ dependencies = [ "serde", "serde_json", "thiserror 1.0.69", + "tokio", "tracing", "uuid", ] diff --git a/crates/executor-core/Cargo.toml b/crates/executor-core/Cargo.toml index 1b13667..4af4552 100644 --- a/crates/executor-core/Cargo.toml +++ b/crates/executor-core/Cargo.toml @@ -14,6 +14,7 @@ picloud-shared.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true +tokio.workspace = true tracing.workspace = true uuid.workspace = true chrono.workspace = true @@ -25,3 +26,6 @@ rand.workspace = true base64.workspace = true hex.workspace = true percent-encoding.workspace = true + +[dev-dependencies] +async-trait.workspace = true diff --git a/crates/executor-core/src/engine.rs b/crates/executor-core/src/engine.rs index d580cc4..71477ed 100644 --- a/crates/executor-core/src/engine.rs +++ b/crates/executor-core/src/engine.rs @@ -3,7 +3,9 @@ use std::sync::{Arc, Mutex}; use std::time::Instant; use chrono::Utc; -use picloud_shared::{ScriptValidator, SdkCallCx, Services, ValidationError, SDK_VERSION}; +use picloud_shared::{ + ScriptValidator, SdkCallCx, Services, TriggerEvent, ValidationError, SDK_VERSION, +}; use rhai::{Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module, Scope}; use serde_json::Value as Json; @@ -75,6 +77,8 @@ impl Engine { request_id: req.request_id, trigger_depth: req.trigger_depth, root_execution_id: req.root_execution_id, + is_dead_letter_handler: req.is_dead_letter_handler, + event: req.event.clone(), }); sdk::register_all(&mut engine, &self.services, cx); @@ -239,9 +243,82 @@ fn build_ctx_map(req: &ExecRequest) -> Map { request.insert("rest".into(), req.rest.clone().into()); ctx.insert("request".into(), request.into()); + + // Triggered invocations: surface the originating event as + // `ctx.event`. Direct ingress (HTTP request, manual run) leaves + // the key absent so scripts can test `if "event" in ctx`. + if let Some(event) = req.event.as_ref() { + ctx.insert("event".into(), trigger_event_to_dynamic(event)); + } + ctx } +/// Convert a `TriggerEvent` into the `ctx.event` Rhai shape defined in +/// `docs/v1.1.x-design-notes.md` §4 (the dead-letter sub-shape) and +/// §2/blueprint §9 (KV). Each variant becomes a Rhai map with a +/// `source` discriminant plus per-source fields. +fn trigger_event_to_dynamic(event: &TriggerEvent) -> Dynamic { + let mut m = Map::new(); + m.insert("source".into(), event.source().into()); + match event { + TriggerEvent::Kv { + op, + collection, + key, + value, + } => { + m.insert("op".into(), op.as_str().into()); + let mut kv_map = Map::new(); + kv_map.insert("collection".into(), collection.clone().into()); + kv_map.insert("key".into(), key.clone().into()); + kv_map.insert( + "value".into(), + value.clone().map_or(Dynamic::UNIT, json_to_dynamic), + ); + m.insert("kv".into(), kv_map.into()); + } + TriggerEvent::DeadLetter { + dead_letter_id, + original, + attempts, + last_error, + trigger_id, + script_id, + first_attempt_at, + last_attempt_at, + } => { + let mut dl = Map::new(); + dl.insert("id".into(), dead_letter_id.to_string().into()); + dl.insert("original".into(), trigger_event_to_dynamic(original)); + dl.insert("attempts".into(), i64::from(*attempts).into()); + dl.insert("last_error".into(), last_error.clone().into()); + dl.insert( + "trigger_id".into(), + trigger_id + .map(|id| Dynamic::from(id.to_string())) + .unwrap_or(Dynamic::UNIT), + ); + dl.insert( + "script_id".into(), + script_id + .map(|id| Dynamic::from(id.to_string())) + .unwrap_or(Dynamic::UNIT), + ); + dl.insert( + "first_attempt_at".into(), + first_attempt_at.to_rfc3339().into(), + ); + dl.insert( + "last_attempt_at".into(), + last_attempt_at.to_rfc3339().into(), + ); + m.insert("dead_letter".into(), dl.into()); + } + } + m.into() +} + fn invocation_type_str(it: InvocationType) -> &'static str { match it { InvocationType::Http => "http", diff --git a/crates/executor-core/src/sdk/kv.rs b/crates/executor-core/src/sdk/kv.rs new file mode 100644 index 0000000..40b0efc --- /dev/null +++ b/crates/executor-core/src/sdk/kv.rs @@ -0,0 +1,193 @@ +//! `kv::` Rhai bridge — collection-scoped handle pattern. +//! +//! ```rhai +//! let widgets = kv::collection("widgets"); +//! widgets.set("k", #{ n: 1 }); +//! let v = widgets.get("k"); // value or () if absent +//! if widgets.has("k") { ... } +//! widgets.delete("k"); // bool (was-present) +//! let page = widgets.list(); // returns #{ keys: [...], next_cursor: () } +//! ``` +//! +//! The `KvHandle` custom Rhai type captures the collection name once +//! and routes each call through the injected `Arc` with +//! the per-call `Arc`. **The service derives `app_id` from +//! `cx.app_id` — `app_id` never appears in any function signature +//! script-side, preserving cross-app isolation.** +//! +//! Sync↔async bridge: Rhai is synchronous; the underlying service is +//! async. Closures wrap each call in `Handle::current().block_on(...)` +//! — safe because `LocalExecutorClient` runs the script under +//! `spawn_blocking`, so a runtime handle is reachable and blocking on +//! it doesn't park an async worker. +//! +//! Error convention (per `docs/sdk-shape.md`): +//! - throw on failure (Rhai runtime error string) +//! - `()` for absent values (`get` on a missing key) +//! - `bool` for predicates (`has`; also `delete` returns was-present) + +use std::sync::Arc; + +use picloud_shared::{KvError, KvService, SdkCallCx, Services}; +use rhai::{Array, Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module}; +use tokio::runtime::Handle as TokioHandle; + +use super::bridge::{dynamic_to_json, json_to_dynamic}; + +/// Per-call handle captured by the Rhai SDK. Cheap to clone (two Arcs +/// plus an owned string). +#[derive(Clone)] +pub struct KvHandle { + collection: String, + service: Arc, + cx: Arc, +} + +pub(super) fn register(engine: &mut RhaiEngine, services: &Services, cx: Arc) { + let kv_service = services.kv.clone(); + + // `kv::collection(name)` — handle constructor lives in the `kv` + // static module so the script-visible call is `kv::collection(...)`. + let mut module = Module::new(); + { + let kv_service = kv_service.clone(); + let cx = cx.clone(); + module.set_native_fn( + "collection", + move |name: &str| -> Result> { + if name.is_empty() { + return Err("kv::collection name must not be empty".into()); + } + Ok(KvHandle { + collection: name.to_string(), + service: kv_service.clone(), + cx: cx.clone(), + }) + }, + ); + } + engine.register_static_module("kv", module.into()); + + // Methods on KvHandle — `register_fn` with `&mut KvHandle` first + // argument lets Rhai dispatch them as `handle.get(k)` / + // `handle.set(k, v)` / etc. through the dot-notation. + engine.register_type_with_name::("KvHandle"); + + register_get(engine); + register_set(engine); + register_has(engine); + register_delete(engine); + register_list(engine); +} + +fn register_get(engine: &mut RhaiEngine) { + engine.register_fn( + "get", + |handle: &mut KvHandle, key: &str| -> Result> { + let h = handle.clone(); + block_on(async move { h.service.get(&h.cx, &h.collection, key).await }) + .map(|opt| opt.map_or(Dynamic::UNIT, json_to_dynamic)) + }, + ); +} + +fn register_set(engine: &mut RhaiEngine) { + engine.register_fn( + "set", + |handle: &mut KvHandle, key: &str, value: Dynamic| -> Result<(), Box> { + let h = handle.clone(); + let json = dynamic_to_json(&value); + block_on(async move { h.service.set(&h.cx, &h.collection, key, json).await }) + }, + ); +} + +fn register_has(engine: &mut RhaiEngine) { + engine.register_fn( + "has", + |handle: &mut KvHandle, key: &str| -> Result> { + let h = handle.clone(); + block_on(async move { h.service.has(&h.cx, &h.collection, key).await }) + }, + ); +} + +fn register_delete(engine: &mut RhaiEngine) { + engine.register_fn( + "delete", + |handle: &mut KvHandle, key: &str| -> Result> { + let h = handle.clone(); + block_on(async move { h.service.delete(&h.cx, &h.collection, key).await }) + }, + ); +} + +fn register_list(engine: &mut RhaiEngine) { + // Zero-arg form — full page, no cursor. + engine.register_fn( + "list", + |handle: &mut KvHandle| -> Result> { list_call(handle, None, 0) }, + ); + + // One-arg form — cursor only. + engine.register_fn( + "list", + |handle: &mut KvHandle, cursor: &str| -> Result> { + list_call(handle, Some(cursor.to_string()), 0) + }, + ); + + // Two-arg form — cursor + limit. + engine.register_fn( + "list", + |handle: &mut KvHandle, cursor: &str, limit: i64| -> Result> { + let limit = u32::try_from(limit.max(0)).unwrap_or(0); + list_call(handle, Some(cursor.to_string()), limit) + }, + ); +} + +fn list_call( + handle: &KvHandle, + cursor: Option, + limit: u32, +) -> Result> { + let h = handle.clone(); + let page = block_on(async move { + h.service + .list(&h.cx, &h.collection, cursor.as_deref(), limit) + .await + })?; + let mut m = Map::new(); + let keys: Array = page.keys.into_iter().map(Dynamic::from).collect(); + m.insert("keys".into(), keys.into()); + m.insert( + "next_cursor".into(), + page.next_cursor.map_or(Dynamic::UNIT, Dynamic::from), + ); + Ok(m) +} + +/// Run an async future inside the synchronous Rhai context. +/// +/// `LocalExecutorClient` wraps script execution in `spawn_blocking`, so +/// the current Tokio runtime is reachable via `Handle::current()`. We +/// block on it directly; we are NOT calling this from an async task, +/// so blocking is the correct primitive (`block_in_place` would also +/// work, but we're already on a blocking worker). +fn block_on(fut: F) -> Result> +where + F: std::future::Future> + Send, + T: Send, +{ + let handle = TokioHandle::try_current().map_err(|e| -> Box { + EvalAltResult::ErrorRuntime( + format!("kv: no tokio runtime available: {e}").into(), + rhai::Position::NONE, + ) + .into() + })?; + handle.block_on(fut).map_err(|err| -> Box { + EvalAltResult::ErrorRuntime(format!("kv: {err}").into(), rhai::Position::NONE).into() + }) +} diff --git a/crates/executor-core/src/sdk/mod.rs b/crates/executor-core/src/sdk/mod.rs index cff56be..a445414 100644 --- a/crates/executor-core/src/sdk/mod.rs +++ b/crates/executor-core/src/sdk/mod.rs @@ -13,6 +13,7 @@ pub mod bridge; pub mod cx; +pub mod kv; pub mod stdlib; pub use bridge::{dynamic_to_json, json_to_dynamic}; @@ -27,14 +28,10 @@ use rhai::Engine as RhaiEngine; /// once per invocation, just after `build_engine` constructs the /// sandboxed Rhai engine and just before script compilation. /// -/// v1.1.0 ships an intentionally empty body — the call site exists so -/// future PRs (KV first) drop their registration logic here rather -/// than reaching into `engine.rs::build_engine`. The signature is -/// locked: subsequent PRs MUST keep the same parameter shape so that -/// hosts don't have to re-thread the plumbing. +/// v1.1.1 wires the first stateful service (KV). Subsequent PRs add a +/// single `::register(...)` line per service. pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc) { - // Intentionally inert in v1.1.0. The unused-suppression below is a - // load-bearing placeholder: future PRs replace this `let _` with - // real `register_kv(engine, services, cx.clone())` calls etc. - let _ = (engine, services, cx); + kv::register(engine, services, cx.clone()); + // v1.1.1 commit 8: dead_letters::register(engine, services, cx.clone()); + let _ = cx; } diff --git a/crates/executor-core/src/types.rs b/crates/executor-core/src/types.rs index d974a17..62f9cd4 100644 --- a/crates/executor-core/src/types.rs +++ b/crates/executor-core/src/types.rs @@ -1,7 +1,9 @@ use std::collections::BTreeMap; use chrono::{DateTime, Utc}; -use picloud_shared::{AppId, ExecutionId, Principal, RequestId, ScriptId, ScriptSandbox}; +use picloud_shared::{ + AppId, ExecutionId, Principal, RequestId, ScriptId, ScriptSandbox, TriggerEvent, +}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -79,6 +81,20 @@ pub struct ExecRequest { /// `execution_id` for direct invocations; preserves the root /// across fan-out for audit log grouping. pub root_execution_id: ExecutionId, + + /// `true` only when the dispatcher resolved this invocation + /// against a `dead_letter` trigger. The retry / dead-letter + /// machinery short-circuits when this is set so handler failures + /// cannot themselves be dead-lettered (design notes §4 + /// recursion-stop rule). + #[serde(default)] + pub is_dead_letter_handler: bool, + + /// The originating event for a triggered invocation. `None` for + /// direct ingress (sync HTTP, manual admin run). Flattened into + /// `ctx.event` by the executor's per-call ctx builder. + #[serde(default)] + pub event: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/executor-core/tests/engine.rs b/crates/executor-core/tests/engine.rs index 7bb1336..daff713 100644 --- a/crates/executor-core/tests/engine.rs +++ b/crates/executor-core/tests/engine.rs @@ -1,7 +1,9 @@ use std::collections::BTreeMap; use picloud_executor_core::{Engine, ExecError, ExecRequest, InvocationType, Limits, LogLevel}; -use picloud_shared::{AppId, ExecutionId, RequestId, ScriptId, ScriptSandbox, Services}; +use picloud_shared::{ + AppId, ExecutionId, KvEventOp, RequestId, ScriptId, ScriptSandbox, Services, TriggerEvent, +}; use serde_json::json; fn req(body: serde_json::Value) -> ExecRequest { @@ -23,6 +25,8 @@ fn req(body: serde_json::Value) -> ExecRequest { principal: None, trigger_depth: 0, root_execution_id: execution_id, + is_dead_letter_handler: false, + event: None, } } @@ -235,3 +239,67 @@ fn body_passes_through_nested_json_round_trip() { let resp = engine().execute(src, req(body.clone())).unwrap(); assert_eq!(resp.body, body); } + +#[test] +fn ctx_event_absent_for_direct_invocations() { + // Scripts not fired through the triggers framework see no + // `ctx.event` key — they can use `"event" in ctx` to detect. + let src = r#" + if "event" in ctx { #{ statusCode: 500, body: "should be absent" } } + else { "absent" } + "#; + let resp = engine().execute(src, req(json!(null))).unwrap(); + assert_eq!(resp.body, json!("absent")); +} + +#[test] +fn ctx_event_kv_shape_matches_design_notes() { + // Build an ExecRequest mimicking what the dispatcher hands a + // KV-triggered handler — `event = Some(TriggerEvent::Kv { … })`. + let mut r = req(json!(null)); + r.event = Some(TriggerEvent::Kv { + op: KvEventOp::Insert, + collection: "widgets".into(), + key: "k1".into(), + value: Some(json!({ "n": 1 })), + }); + let src = r" + #{ + source: ctx.event.source, + op: ctx.event.op, + collection: ctx.event.kv.collection, + key: ctx.event.kv.key, + value: ctx.event.kv.value + } + "; + let resp = engine().execute(src, r).unwrap(); + assert_eq!( + resp.body, + json!({ + "source": "kv", + "op": "insert", + "collection": "widgets", + "key": "k1", + "value": { "n": 1 } + }) + ); +} + +#[test] +fn ctx_event_kv_delete_has_unit_value() { + let mut r = req(json!(null)); + r.event = Some(TriggerEvent::Kv { + op: KvEventOp::Delete, + collection: "widgets".into(), + key: "k1".into(), + value: None, + }); + let src = r" + #{ + op: ctx.event.op, + value_is_unit: ctx.event.kv.value == () + } + "; + let resp = engine().execute(src, r).unwrap(); + assert_eq!(resp.body, json!({ "op": "delete", "value_is_unit": true })); +} diff --git a/crates/executor-core/tests/sdk_contract.rs b/crates/executor-core/tests/sdk_contract.rs index 9ae6014..4c27889 100644 --- a/crates/executor-core/tests/sdk_contract.rs +++ b/crates/executor-core/tests/sdk_contract.rs @@ -53,6 +53,8 @@ fn baseline_request() -> ExecRequest { principal: None, trigger_depth: 0, root_execution_id: execution_id, + is_dead_letter_handler: false, + event: None, } } diff --git a/crates/executor-core/tests/sdk_kv.rs b/crates/executor-core/tests/sdk_kv.rs new file mode 100644 index 0000000..462e407 --- /dev/null +++ b/crates/executor-core/tests/sdk_kv.rs @@ -0,0 +1,260 @@ +//! `kv::` SDK bridge integration tests — runs a real Rhai engine +//! against an in-memory `KvService` impl. Mirrors how +//! `orchestrator-core::LocalExecutorClient` invokes the engine: under +//! `tokio::task::spawn_blocking` so the bridge's `block_on` has a +//! reachable runtime. + +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; + +use async_trait::async_trait; +use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits}; +use picloud_shared::{ + AppId, ExecutionId, KvError, KvListPage, KvService, NoopDeadLetterService, NoopEventEmitter, + RequestId, ScriptId, ScriptSandbox, SdkCallCx, Services, +}; +use serde_json::{json, Value}; +use tokio::sync::Mutex; + +#[derive(Default)] +struct InMemoryKv { + data: Mutex>, +} + +#[async_trait] +impl KvService for InMemoryKv { + async fn get( + &self, + cx: &SdkCallCx, + collection: &str, + key: &str, + ) -> Result, KvError> { + Ok(self + .data + .lock() + .await + .get(&(cx.app_id, collection.to_string(), key.to_string())) + .cloned()) + } + + async fn set( + &self, + cx: &SdkCallCx, + collection: &str, + key: &str, + value: Value, + ) -> Result<(), KvError> { + self.data + .lock() + .await + .insert((cx.app_id, collection.to_string(), key.to_string()), value); + Ok(()) + } + + async fn delete(&self, cx: &SdkCallCx, collection: &str, key: &str) -> Result { + Ok(self + .data + .lock() + .await + .remove(&(cx.app_id, collection.to_string(), key.to_string())) + .is_some()) + } + + async fn has(&self, cx: &SdkCallCx, collection: &str, key: &str) -> Result { + Ok(self.data.lock().await.contains_key(&( + cx.app_id, + collection.to_string(), + key.to_string(), + ))) + } + + async fn list( + &self, + cx: &SdkCallCx, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result { + let data = self.data.lock().await; + let mut keys: Vec = data + .iter() + .filter(|((a, c, _), _)| *a == cx.app_id && c == collection) + .map(|((_, _, k), _)| k.clone()) + .filter(|k| cursor.is_none_or(|c| k.as_str() > c)) + .collect(); + keys.sort(); + let take = if limit == 0 { + usize::MAX + } else { + limit as usize + }; + let next_cursor = if keys.len() > take { + keys.truncate(take); + keys.last().cloned() + } else { + None + }; + Ok(KvListPage { keys, next_cursor }) + } +} + +fn make_engine() -> Arc { + let services = Services::new( + Arc::new(InMemoryKv::default()), + Arc::new(NoopDeadLetterService), + Arc::new(NoopEventEmitter), + ); + Arc::new(Engine::new(Limits::default(), services)) +} + +fn baseline_request(app_id: AppId) -> ExecRequest { + let execution_id = ExecutionId::new(); + ExecRequest { + execution_id, + request_id: RequestId::new(), + script_id: ScriptId::new(), + script_name: "kv-test".into(), + invocation_type: InvocationType::Http, + path: "/kv-test".into(), + headers: BTreeMap::new(), + body: Value::Null, + params: BTreeMap::new(), + query: BTreeMap::new(), + rest: String::new(), + sandbox_overrides: ScriptSandbox::default(), + app_id, + principal: None, + trigger_depth: 0, + root_execution_id: execution_id, + is_dead_letter_handler: false, + event: None, + } +} + +async fn run_script(engine: Arc, src: &str, req: ExecRequest) -> Value { + let src = src.to_string(); + tokio::task::spawn_blocking(move || engine.execute(&src, req)) + .await + .expect("spawn_blocking should not panic") + .expect("script execution should succeed") + .body +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn kv_set_then_get_round_trip() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#" + let widgets = kv::collection("widgets"); + widgets.set("k1", #{ n: 1 }); + widgets.get("k1") + "#; + let body = run_script(engine, src, baseline_request(app)).await; + assert_eq!(body, json!({ "n": 1 })); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn kv_get_missing_returns_unit() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#" + let c = kv::collection("widgets"); + let v = c.get("nope"); + v == () + "#; + let body = run_script(engine, src, baseline_request(app)).await; + assert_eq!(body, json!(true)); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn kv_has_returns_bool() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#" + let c = kv::collection("widgets"); + let before = c.has("k"); + c.set("k", "v"); + let after = c.has("k"); + #{ before: before, after: after } + "#; + let body = run_script(engine, src, baseline_request(app)).await; + assert_eq!(body, json!({ "before": false, "after": true })); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn kv_delete_returns_was_present() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#" + let c = kv::collection("widgets"); + let nope = c.delete("missing"); + c.set("k", 1); + let yep = c.delete("k"); + #{ nope: nope, yep: yep } + "#; + let body = run_script(engine, src, baseline_request(app)).await; + assert_eq!(body, json!({ "nope": false, "yep": true })); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn kv_empty_collection_name_throws() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#"kv::collection("")"#; + let req = baseline_request(app); + let err = tokio::task::spawn_blocking(move || engine.execute(src, req)) + .await + .unwrap() + .expect_err("empty collection should throw"); + assert!(format!("{err:?}").contains("kv::collection")); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn kv_list_pages_via_cursor() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#" + let c = kv::collection("widgets"); + for i in 0..5 { c.set(`k${i}`, i); } + let p1 = c.list("", 2); + let p2 = c.list(p1.next_cursor, 2); + #{ + p1_keys: p1.keys, + p1_cursor: p1.next_cursor, + p2_keys: p2.keys, + } + "#; + let body = run_script(engine, src, baseline_request(app)).await; + let obj = body.as_object().unwrap(); + let p1_keys = obj["p1_keys"].as_array().unwrap(); + let p2_keys = obj["p2_keys"].as_array().unwrap(); + assert_eq!(p1_keys.len(), 2); + assert_eq!(p2_keys.len(), 2); + assert!(obj["p1_cursor"].is_string()); +} + +/// Cross-app isolation via `cx.app_id` — script with `app_id = A` +/// cannot see entries from `app_id = B`. The kv:: bridge never +/// surfaces `app_id` to the script, so this is enforced purely by the +/// service deriving it from the captured `Arc`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn kv_bridge_preserves_cross_app_isolation() { + let engine = make_engine(); + let app_a = AppId::new(); + let app_b = AppId::new(); + + let writer = r#" + let c = kv::collection("shared"); + c.set("k", "from-a"); + "ok" + "#; + let _ = run_script(engine.clone(), writer, baseline_request(app_a)).await; + + // App B sees nothing under the same collection/key. + let reader = r#" + let c = kv::collection("shared"); + c.get("k") + "#; + let body = run_script(engine, reader, baseline_request(app_b)).await; + assert_eq!(body, Value::Null); +} diff --git a/crates/executor-core/tests/stdlib.rs b/crates/executor-core/tests/stdlib.rs index c3649df..5c61733 100644 --- a/crates/executor-core/tests/stdlib.rs +++ b/crates/executor-core/tests/stdlib.rs @@ -39,6 +39,8 @@ fn baseline_request() -> ExecRequest { principal: None, trigger_depth: 0, root_execution_id: execution_id, + is_dead_letter_handler: false, + event: None, } } diff --git a/crates/manager-core/src/kv_service.rs b/crates/manager-core/src/kv_service.rs index 4af0479..881a86c 100644 --- a/crates/manager-core/src/kv_service.rs +++ b/crates/manager-core/src/kv_service.rs @@ -306,6 +306,8 @@ mod tests { request_id: RequestId::new(), trigger_depth: 0, root_execution_id: ExecutionId::new(), + is_dead_letter_handler: false, + event: None, } } @@ -322,6 +324,8 @@ mod tests { request_id: RequestId::new(), trigger_depth: 0, root_execution_id: ExecutionId::new(), + is_dead_letter_handler: false, + event: None, } } @@ -338,6 +342,8 @@ mod tests { request_id: RequestId::new(), trigger_depth: 0, root_execution_id: ExecutionId::new(), + is_dead_letter_handler: false, + event: None, } } diff --git a/crates/orchestrator-core/src/api.rs b/crates/orchestrator-core/src/api.rs index a412ff5..8c2210f 100644 --- a/crates/orchestrator-core/src/api.rs +++ b/crates/orchestrator-core/src/api.rs @@ -317,6 +317,11 @@ fn build_exec_request( // preserves the original root for chained executions. trigger_depth: 0, root_execution_id: execution_id, + // Direct invocations are never DL handlers — that flag is only + // set by the dispatcher when it picks a dead_letter trigger row. + is_dead_letter_handler: false, + // No originating trigger event for direct ingress. + event: None, }) } diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs index 6540a79..4246974 100644 --- a/crates/shared/src/lib.rs +++ b/crates/shared/src/lib.rs @@ -18,6 +18,7 @@ pub mod sandbox; pub mod script; pub mod sdk_cx; pub mod services; +pub mod trigger_event; pub mod validator; pub mod version; @@ -35,5 +36,6 @@ pub use sandbox::ScriptSandbox; pub use script::Script; pub use sdk_cx::SdkCallCx; pub use services::Services; +pub use trigger_event::{DeadLetterEventDetail, KvEventOp, TriggerEvent}; pub use validator::{ScriptValidator, ValidationError}; pub use version::{API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION}; diff --git a/crates/shared/src/sdk_cx.rs b/crates/shared/src/sdk_cx.rs index 3fa4b35..43e3d48 100644 --- a/crates/shared/src/sdk_cx.rs +++ b/crates/shared/src/sdk_cx.rs @@ -12,7 +12,7 @@ //! the cx in is shared by both sides. Pure value type — no handles, no //! DB pool references, no allocations beyond what's in `Principal`. -use crate::{AppId, ExecutionId, Principal, RequestId}; +use crate::{AppId, ExecutionId, Principal, RequestId, TriggerEvent}; /// Per-invocation context for every stateful SDK service call. /// @@ -51,4 +51,19 @@ pub struct SdkCallCx { /// `execution_id` of the original ingress execution. Lets the audit /// log group every fan-out execution under the originating event. pub root_execution_id: ExecutionId, + + /// `true` only when this invocation is a `dead_letter` trigger + /// handler. Set by the dispatcher when it picks an outbox row + /// whose trigger has `kind = 'dead_letter'`. The retry / dead- + /// letter machinery short-circuits when this is set: handlers + /// execute once, with no retry, and a failed run can NEVER be + /// dead-lettered itself (design notes §4 recursion-stop rule). + /// `false` for every other invocation, including the script + /// being used as a non-DL trigger handler. + pub is_dead_letter_handler: bool, + + /// The event that fired this script, when it's a triggered + /// invocation. `None` for direct ingress (HTTP request, manual + /// run). Surfaced to scripts as `ctx.event`. + pub event: Option, } diff --git a/crates/shared/src/trigger_event.rs b/crates/shared/src/trigger_event.rs new file mode 100644 index 0000000..86c9e73 --- /dev/null +++ b/crates/shared/src/trigger_event.rs @@ -0,0 +1,105 @@ +//! `TriggerEvent` — the description of the event that fired a script. +//! +//! Built by the dispatcher (in `manager-core`) from the outbox row and +//! attached to the `ExecRequest` that's handed to `executor-core`. The +//! Rhai bridge in `executor-core::engine::build_ctx_map` flattens this +//! into `ctx.event` for the script. +//! +//! Living in `picloud-shared` so the dispatcher and the executor agree +//! on the wire shape. Serializable so cluster mode (v1.3+) can ship +//! ExecRequests over HTTP without rewriting this type. + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +use crate::{DeadLetterId, ScriptId, TriggerId}; + +/// Operations a KV trigger can fire on. Stored as a lowercase string +/// in `kv_trigger_details.ops` (Postgres `text[]`). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum KvEventOp { + Insert, + Update, + Delete, +} + +impl KvEventOp { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Insert => "insert", + Self::Update => "update", + Self::Delete => "delete", + } + } + + #[must_use] + pub fn from_wire(s: &str) -> Option { + match s { + "insert" => Some(Self::Insert), + "update" => Some(Self::Update), + "delete" => Some(Self::Delete), + _ => None, + } + } +} + +/// Discriminated description of a triggering event. Lifted from the +/// outbox row's payload at dispatch time. Each variant carries the +/// fields the corresponding `ctx.event` shape exposes to the script. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "source", rename_all = "snake_case")] +pub enum TriggerEvent { + /// A KV insert / update / delete fired this handler. + Kv { + op: KvEventOp, + collection: String, + key: String, + /// Present on `insert` and `update`. Absent on `delete`. + #[serde(default, skip_serializing_if = "Option::is_none")] + value: Option, + }, + + /// A dead-letter row fired this handler. The original event is + /// nested verbatim plus the dead-letter metadata the design notes + /// §4 require. + DeadLetter { + dead_letter_id: DeadLetterId, + original: Box, + attempts: u32, + last_error: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + trigger_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + script_id: Option, + first_attempt_at: DateTime, + last_attempt_at: DateTime, + }, +} + +impl TriggerEvent { + /// The `source` discriminant the script sees on `ctx.event.source`. + #[must_use] + pub const fn source(&self) -> &'static str { + match self { + Self::Kv { .. } => "kv", + Self::DeadLetter { .. } => "dead_letter", + } + } +} + +/// Convenience accessor on the dead-letter variant for places that +/// already know they're handling a DL event. Pulled out so the +/// dispatcher and the dashboard don't have to repeat the match. +#[derive(Debug, Clone)] +pub struct DeadLetterEventDetail { + pub dead_letter_id: DeadLetterId, + pub original: TriggerEvent, + pub attempts: u32, + pub last_error: String, + pub trigger_id: Option, + pub script_id: Option, + pub first_attempt_at: DateTime, + pub last_attempt_at: DateTime, +}