feat(v1.1.1-kv): Rhai kv:: SDK module + ctx.event wiring

Wires the KV store into Rhai scripts via the handle pattern:

    let widgets = kv::collection("widgets");
    widgets.set("k", #{ n: 1 });
    let v = widgets.get("k");          // value or () if absent
    widgets.has("k") / widgets.delete("k")
    let page = widgets.list();          // cursor-style pagination

`KvHandle` is a custom Rhai type holding `Arc<dyn KvService>` + the
per-call `Arc<SdkCallCx>`. Methods route async service calls through
`tokio::Handle::current().block_on(...)` — works because
`LocalExecutorClient` runs the script under `spawn_blocking` so a
runtime is reachable. The bridge surfaces `app_id` exclusively
through `cx.app_id`; no public-facing argument can spoof an app.

`TriggerEvent` lands in `picloud-shared` as the wire shape the
dispatcher will emit (KV + DeadLetter variants — KV exercised now,
DL hooks up with the dispatcher in commit 5/8). `SdkCallCx` and
`ExecRequest` grow `is_dead_letter_handler: bool` and
`event: Option<TriggerEvent>`. `engine.rs::build_ctx_map` flattens
the event into `ctx.event` for triggered handlers; direct ingress
leaves the key absent so scripts can `if "event" in ctx`.

Tests:
- 7 `sdk_kv.rs` integration tests covering the full Rhai surface
  (round-trip, missing-key unit, has bool, delete was-present,
  empty-collection rejection, cursor pagination, cross-app
  isolation through the bridge).
- 3 new `engine.rs` tests pinning `ctx.event` shape per
  design notes §4 (KV insert with value, delete with unit value,
  direct invocations have no `event` key).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-06-01 21:38:41 +02:00
parent 434fb63cd2
commit 6b99f74c48
15 changed files with 767 additions and 13 deletions

View File

@@ -3,7 +3,9 @@ use std::sync::{Arc, Mutex};
use std::time::Instant;
use chrono::Utc;
use picloud_shared::{ScriptValidator, SdkCallCx, Services, ValidationError, SDK_VERSION};
use picloud_shared::{
ScriptValidator, SdkCallCx, Services, TriggerEvent, ValidationError, SDK_VERSION,
};
use rhai::{Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module, Scope};
use serde_json::Value as Json;
@@ -75,6 +77,8 @@ impl Engine {
request_id: req.request_id,
trigger_depth: req.trigger_depth,
root_execution_id: req.root_execution_id,
is_dead_letter_handler: req.is_dead_letter_handler,
event: req.event.clone(),
});
sdk::register_all(&mut engine, &self.services, cx);
@@ -239,9 +243,82 @@ fn build_ctx_map(req: &ExecRequest) -> Map {
request.insert("rest".into(), req.rest.clone().into());
ctx.insert("request".into(), request.into());
// Triggered invocations: surface the originating event as
// `ctx.event`. Direct ingress (HTTP request, manual run) leaves
// the key absent so scripts can test `if "event" in ctx`.
if let Some(event) = req.event.as_ref() {
ctx.insert("event".into(), trigger_event_to_dynamic(event));
}
ctx
}
/// Convert a `TriggerEvent` into the `ctx.event` Rhai shape defined in
/// `docs/v1.1.x-design-notes.md` §4 (the dead-letter sub-shape) and
/// §2/blueprint §9 (KV). Each variant becomes a Rhai map with a
/// `source` discriminant plus per-source fields.
fn trigger_event_to_dynamic(event: &TriggerEvent) -> Dynamic {
let mut m = Map::new();
m.insert("source".into(), event.source().into());
match event {
TriggerEvent::Kv {
op,
collection,
key,
value,
} => {
m.insert("op".into(), op.as_str().into());
let mut kv_map = Map::new();
kv_map.insert("collection".into(), collection.clone().into());
kv_map.insert("key".into(), key.clone().into());
kv_map.insert(
"value".into(),
value.clone().map_or(Dynamic::UNIT, json_to_dynamic),
);
m.insert("kv".into(), kv_map.into());
}
TriggerEvent::DeadLetter {
dead_letter_id,
original,
attempts,
last_error,
trigger_id,
script_id,
first_attempt_at,
last_attempt_at,
} => {
let mut dl = Map::new();
dl.insert("id".into(), dead_letter_id.to_string().into());
dl.insert("original".into(), trigger_event_to_dynamic(original));
dl.insert("attempts".into(), i64::from(*attempts).into());
dl.insert("last_error".into(), last_error.clone().into());
dl.insert(
"trigger_id".into(),
trigger_id
.map(|id| Dynamic::from(id.to_string()))
.unwrap_or(Dynamic::UNIT),
);
dl.insert(
"script_id".into(),
script_id
.map(|id| Dynamic::from(id.to_string()))
.unwrap_or(Dynamic::UNIT),
);
dl.insert(
"first_attempt_at".into(),
first_attempt_at.to_rfc3339().into(),
);
dl.insert(
"last_attempt_at".into(),
last_attempt_at.to_rfc3339().into(),
);
m.insert("dead_letter".into(), dl.into());
}
}
m.into()
}
fn invocation_type_str(it: InvocationType) -> &'static str {
match it {
InvocationType::Http => "http",

View File

@@ -0,0 +1,193 @@
//! `kv::` Rhai bridge — collection-scoped handle pattern.
//!
//! ```rhai
//! let widgets = kv::collection("widgets");
//! widgets.set("k", #{ n: 1 });
//! let v = widgets.get("k"); // value or () if absent
//! if widgets.has("k") { ... }
//! widgets.delete("k"); // bool (was-present)
//! let page = widgets.list(); // returns #{ keys: [...], next_cursor: () }
//! ```
//!
//! The `KvHandle` custom Rhai type captures the collection name once
//! and routes each call through the injected `Arc<dyn KvService>` with
//! the per-call `Arc<SdkCallCx>`. **The service derives `app_id` from
//! `cx.app_id` — `app_id` never appears in any function signature
//! script-side, preserving cross-app isolation.**
//!
//! Sync↔async bridge: Rhai is synchronous; the underlying service is
//! async. Closures wrap each call in `Handle::current().block_on(...)`
//! — safe because `LocalExecutorClient` runs the script under
//! `spawn_blocking`, so a runtime handle is reachable and blocking on
//! it doesn't park an async worker.
//!
//! Error convention (per `docs/sdk-shape.md`):
//! - throw on failure (Rhai runtime error string)
//! - `()` for absent values (`get` on a missing key)
//! - `bool` for predicates (`has`; also `delete` returns was-present)
use std::sync::Arc;
use picloud_shared::{KvError, KvService, SdkCallCx, Services};
use rhai::{Array, Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module};
use tokio::runtime::Handle as TokioHandle;
use super::bridge::{dynamic_to_json, json_to_dynamic};
/// Per-call handle captured by the Rhai SDK. Cheap to clone (two Arcs
/// plus an owned string).
#[derive(Clone)]
pub struct KvHandle {
collection: String,
service: Arc<dyn KvService>,
cx: Arc<SdkCallCx>,
}
pub(super) fn register(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCallCx>) {
let kv_service = services.kv.clone();
// `kv::collection(name)` — handle constructor lives in the `kv`
// static module so the script-visible call is `kv::collection(...)`.
let mut module = Module::new();
{
let kv_service = kv_service.clone();
let cx = cx.clone();
module.set_native_fn(
"collection",
move |name: &str| -> Result<KvHandle, Box<EvalAltResult>> {
if name.is_empty() {
return Err("kv::collection name must not be empty".into());
}
Ok(KvHandle {
collection: name.to_string(),
service: kv_service.clone(),
cx: cx.clone(),
})
},
);
}
engine.register_static_module("kv", module.into());
// Methods on KvHandle — `register_fn` with `&mut KvHandle` first
// argument lets Rhai dispatch them as `handle.get(k)` /
// `handle.set(k, v)` / etc. through the dot-notation.
engine.register_type_with_name::<KvHandle>("KvHandle");
register_get(engine);
register_set(engine);
register_has(engine);
register_delete(engine);
register_list(engine);
}
fn register_get(engine: &mut RhaiEngine) {
engine.register_fn(
"get",
|handle: &mut KvHandle, key: &str| -> Result<Dynamic, Box<EvalAltResult>> {
let h = handle.clone();
block_on(async move { h.service.get(&h.cx, &h.collection, key).await })
.map(|opt| opt.map_or(Dynamic::UNIT, json_to_dynamic))
},
);
}
fn register_set(engine: &mut RhaiEngine) {
engine.register_fn(
"set",
|handle: &mut KvHandle, key: &str, value: Dynamic| -> Result<(), Box<EvalAltResult>> {
let h = handle.clone();
let json = dynamic_to_json(&value);
block_on(async move { h.service.set(&h.cx, &h.collection, key, json).await })
},
);
}
fn register_has(engine: &mut RhaiEngine) {
engine.register_fn(
"has",
|handle: &mut KvHandle, key: &str| -> Result<bool, Box<EvalAltResult>> {
let h = handle.clone();
block_on(async move { h.service.has(&h.cx, &h.collection, key).await })
},
);
}
fn register_delete(engine: &mut RhaiEngine) {
engine.register_fn(
"delete",
|handle: &mut KvHandle, key: &str| -> Result<bool, Box<EvalAltResult>> {
let h = handle.clone();
block_on(async move { h.service.delete(&h.cx, &h.collection, key).await })
},
);
}
fn register_list(engine: &mut RhaiEngine) {
// Zero-arg form — full page, no cursor.
engine.register_fn(
"list",
|handle: &mut KvHandle| -> Result<Map, Box<EvalAltResult>> { list_call(handle, None, 0) },
);
// One-arg form — cursor only.
engine.register_fn(
"list",
|handle: &mut KvHandle, cursor: &str| -> Result<Map, Box<EvalAltResult>> {
list_call(handle, Some(cursor.to_string()), 0)
},
);
// Two-arg form — cursor + limit.
engine.register_fn(
"list",
|handle: &mut KvHandle, cursor: &str, limit: i64| -> Result<Map, Box<EvalAltResult>> {
let limit = u32::try_from(limit.max(0)).unwrap_or(0);
list_call(handle, Some(cursor.to_string()), limit)
},
);
}
fn list_call(
handle: &KvHandle,
cursor: Option<String>,
limit: u32,
) -> Result<Map, Box<EvalAltResult>> {
let h = handle.clone();
let page = block_on(async move {
h.service
.list(&h.cx, &h.collection, cursor.as_deref(), limit)
.await
})?;
let mut m = Map::new();
let keys: Array = page.keys.into_iter().map(Dynamic::from).collect();
m.insert("keys".into(), keys.into());
m.insert(
"next_cursor".into(),
page.next_cursor.map_or(Dynamic::UNIT, Dynamic::from),
);
Ok(m)
}
/// Run an async future inside the synchronous Rhai context.
///
/// `LocalExecutorClient` wraps script execution in `spawn_blocking`, so
/// the current Tokio runtime is reachable via `Handle::current()`. We
/// block on it directly; we are NOT calling this from an async task,
/// so blocking is the correct primitive (`block_in_place` would also
/// work, but we're already on a blocking worker).
fn block_on<F, T>(fut: F) -> Result<T, Box<EvalAltResult>>
where
F: std::future::Future<Output = Result<T, KvError>> + Send,
T: Send,
{
let handle = TokioHandle::try_current().map_err(|e| -> Box<EvalAltResult> {
EvalAltResult::ErrorRuntime(
format!("kv: no tokio runtime available: {e}").into(),
rhai::Position::NONE,
)
.into()
})?;
handle.block_on(fut).map_err(|err| -> Box<EvalAltResult> {
EvalAltResult::ErrorRuntime(format!("kv: {err}").into(), rhai::Position::NONE).into()
})
}

View File

@@ -13,6 +13,7 @@
pub mod bridge;
pub mod cx;
pub mod kv;
pub mod stdlib;
pub use bridge::{dynamic_to_json, json_to_dynamic};
@@ -27,14 +28,10 @@ use rhai::Engine as RhaiEngine;
/// once per invocation, just after `build_engine` constructs the
/// sandboxed Rhai engine and just before script compilation.
///
/// v1.1.0 ships an intentionally empty body — the call site exists so
/// future PRs (KV first) drop their registration logic here rather
/// than reaching into `engine.rs::build_engine`. The signature is
/// locked: subsequent PRs MUST keep the same parameter shape so that
/// hosts don't have to re-thread the plumbing.
/// v1.1.1 wires the first stateful service (KV). Subsequent PRs add a
/// single `<service>::register(...)` line per service.
pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCallCx>) {
// Intentionally inert in v1.1.0. The unused-suppression below is a
// load-bearing placeholder: future PRs replace this `let _` with
// real `register_kv(engine, services, cx.clone())` calls etc.
let _ = (engine, services, cx);
kv::register(engine, services, cx.clone());
// v1.1.1 commit 8: dead_letters::register(engine, services, cx.clone());
let _ = cx;
}

View File

@@ -1,7 +1,9 @@
use std::collections::BTreeMap;
use chrono::{DateTime, Utc};
use picloud_shared::{AppId, ExecutionId, Principal, RequestId, ScriptId, ScriptSandbox};
use picloud_shared::{
AppId, ExecutionId, Principal, RequestId, ScriptId, ScriptSandbox, TriggerEvent,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -79,6 +81,20 @@ pub struct ExecRequest {
/// `execution_id` for direct invocations; preserves the root
/// across fan-out for audit log grouping.
pub root_execution_id: ExecutionId,
/// `true` only when the dispatcher resolved this invocation
/// against a `dead_letter` trigger. The retry / dead-letter
/// machinery short-circuits when this is set so handler failures
/// cannot themselves be dead-lettered (design notes §4
/// recursion-stop rule).
#[serde(default)]
pub is_dead_letter_handler: bool,
/// The originating event for a triggered invocation. `None` for
/// direct ingress (sync HTTP, manual admin run). Flattened into
/// `ctx.event` by the executor's per-call ctx builder.
#[serde(default)]
pub event: Option<TriggerEvent>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]