HTTP (`http::*`):
- `HttpService` trait (picloud-shared) + reqwest-backed `HttpServiceImpl`
(manager-core), wired into the `Services` bundle.
- SSRF deny-list applied to the resolved IP via a custom reqwest
`dns_resolver` (covers every redirect hop + defeats DNS rebinding) plus
a literal-IP check at URL-parse time. Scheme/port restrictions, request
+ response body caps (stream-with-cap), layered timeout. Error reason is
a CIDR category, never the IP. `PICLOUD_HTTP_ALLOW_PRIVATE` dev override
(logs a startup warning).
- Rhai bridge with three-arg split `verb(url, body, opts)` (resolves the
brief's body-vs-opts contradiction; unknown opt keys throw). Body
dispatch by type; response `#{status,headers,body,body_raw}` with JSON
auto-parse; non-2xx does not throw.
- `Capability::AppHttpRequest` → existing `script:write` scope (no new
Scope variant). `SdkCallCx` gains `script_id` (attribution + User-Agent).
Cron triggers (4th trigger kind):
- Migration 0017 widens the kind/source_kind CHECKs and adds
`cron_trigger_details`. `cron`/`chrono-tz` parse + validate 6-field
schedules and IANA timezones.
- `spawn_cron_scheduler` polls due triggers and enqueues to the universal
outbox; the dispatcher delivers them (one-line match-arm extension).
Catch-up fires exactly once per trigger per tick, not once per missed
window. `ctx.event.cron` for handlers.
- `POST /api/v1/admin/apps/{id}/triggers/cron` reuses the v1.1.3
cross-app + kind!=module target check.
- Dashboard: admin-gated Triggers tab (cron create form + list).
Follow-ups: redact module backend errors at the resolver boundary (log
original at error level); pin `rhai = "=1.24"`; CHANGELOG incl. retroactive
v1.1.3 cross-app-trigger security note. Version bumps: workspace 1.1.4,
SDK 1.5, dashboard 0.10.0.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
265 lines
8.1 KiB
Rust
265 lines
8.1 KiB
Rust
//! `kv::` SDK bridge integration tests — runs a real Rhai engine
//! against an in-memory `KvService` impl. Mirrors how
//! `orchestrator-core::LocalExecutorClient` invokes the engine: under
//! `tokio::task::spawn_blocking` so the bridge's `block_on` has a
//! reachable runtime.
|
|
|
|
use std::collections::{BTreeMap, HashMap};
|
|
use std::sync::Arc;
|
|
|
|
use async_trait::async_trait;
|
|
use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits};
|
|
use picloud_shared::{
|
|
AppId, ExecutionId, KvError, KvListPage, KvService, NoopDeadLetterService, NoopDocsService,
|
|
NoopEventEmitter, NoopHttpService, NoopModuleSource, RequestId, ScriptId, ScriptSandbox,
|
|
SdkCallCx, Services,
|
|
};
|
|
use serde_json::{json, Value};
|
|
use tokio::sync::Mutex;
|
|
|
|
#[derive(Default)]
|
|
struct InMemoryKv {
|
|
data: Mutex<HashMap<(AppId, String, String), Value>>,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl KvService for InMemoryKv {
|
|
async fn get(
|
|
&self,
|
|
cx: &SdkCallCx,
|
|
collection: &str,
|
|
key: &str,
|
|
) -> Result<Option<Value>, KvError> {
|
|
Ok(self
|
|
.data
|
|
.lock()
|
|
.await
|
|
.get(&(cx.app_id, collection.to_string(), key.to_string()))
|
|
.cloned())
|
|
}
|
|
|
|
async fn set(
|
|
&self,
|
|
cx: &SdkCallCx,
|
|
collection: &str,
|
|
key: &str,
|
|
value: Value,
|
|
) -> Result<(), KvError> {
|
|
self.data
|
|
.lock()
|
|
.await
|
|
.insert((cx.app_id, collection.to_string(), key.to_string()), value);
|
|
Ok(())
|
|
}
|
|
|
|
async fn delete(&self, cx: &SdkCallCx, collection: &str, key: &str) -> Result<bool, KvError> {
|
|
Ok(self
|
|
.data
|
|
.lock()
|
|
.await
|
|
.remove(&(cx.app_id, collection.to_string(), key.to_string()))
|
|
.is_some())
|
|
}
|
|
|
|
async fn has(&self, cx: &SdkCallCx, collection: &str, key: &str) -> Result<bool, KvError> {
|
|
Ok(self.data.lock().await.contains_key(&(
|
|
cx.app_id,
|
|
collection.to_string(),
|
|
key.to_string(),
|
|
)))
|
|
}
|
|
|
|
async fn list(
|
|
&self,
|
|
cx: &SdkCallCx,
|
|
collection: &str,
|
|
cursor: Option<&str>,
|
|
limit: u32,
|
|
) -> Result<KvListPage, KvError> {
|
|
let data = self.data.lock().await;
|
|
let mut keys: Vec<String> = data
|
|
.iter()
|
|
.filter(|((a, c, _), _)| *a == cx.app_id && c == collection)
|
|
.map(|((_, _, k), _)| k.clone())
|
|
.filter(|k| cursor.is_none_or(|c| k.as_str() > c))
|
|
.collect();
|
|
keys.sort();
|
|
let take = if limit == 0 {
|
|
usize::MAX
|
|
} else {
|
|
limit as usize
|
|
};
|
|
let next_cursor = if keys.len() > take {
|
|
keys.truncate(take);
|
|
keys.last().cloned()
|
|
} else {
|
|
None
|
|
};
|
|
Ok(KvListPage { keys, next_cursor })
|
|
}
|
|
}
|
|
|
|
fn make_engine() -> Arc<Engine> {
|
|
let services = Services::new(
|
|
Arc::new(InMemoryKv::default()),
|
|
Arc::new(NoopDocsService),
|
|
Arc::new(NoopDeadLetterService),
|
|
Arc::new(NoopEventEmitter),
|
|
Arc::new(NoopModuleSource),
|
|
Arc::new(NoopHttpService),
|
|
);
|
|
Arc::new(Engine::new(Limits::default(), services))
|
|
}
|
|
|
|
fn baseline_request(app_id: AppId) -> ExecRequest {
|
|
let execution_id = ExecutionId::new();
|
|
ExecRequest {
|
|
execution_id,
|
|
request_id: RequestId::new(),
|
|
script_id: ScriptId::new(),
|
|
script_name: "kv-test".into(),
|
|
invocation_type: InvocationType::Http,
|
|
path: "/kv-test".into(),
|
|
headers: BTreeMap::new(),
|
|
body: Value::Null,
|
|
params: BTreeMap::new(),
|
|
query: BTreeMap::new(),
|
|
rest: String::new(),
|
|
sandbox_overrides: ScriptSandbox::default(),
|
|
app_id,
|
|
principal: None,
|
|
trigger_depth: 0,
|
|
root_execution_id: execution_id,
|
|
is_dead_letter_handler: false,
|
|
event: None,
|
|
}
|
|
}
|
|
|
|
async fn run_script(engine: Arc<Engine>, src: &str, req: ExecRequest) -> Value {
|
|
let src = src.to_string();
|
|
tokio::task::spawn_blocking(move || engine.execute(&src, req))
|
|
.await
|
|
.expect("spawn_blocking should not panic")
|
|
.expect("script execution should succeed")
|
|
.body
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn kv_set_then_get_round_trip() {
|
|
let engine = make_engine();
|
|
let app = AppId::new();
|
|
let src = r#"
|
|
let widgets = kv::collection("widgets");
|
|
widgets.set("k1", #{ n: 1 });
|
|
widgets.get("k1")
|
|
"#;
|
|
let body = run_script(engine, src, baseline_request(app)).await;
|
|
assert_eq!(body, json!({ "n": 1 }));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn kv_get_missing_returns_unit() {
|
|
let engine = make_engine();
|
|
let app = AppId::new();
|
|
let src = r#"
|
|
let c = kv::collection("widgets");
|
|
let v = c.get("nope");
|
|
v == ()
|
|
"#;
|
|
let body = run_script(engine, src, baseline_request(app)).await;
|
|
assert_eq!(body, json!(true));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn kv_has_returns_bool() {
|
|
let engine = make_engine();
|
|
let app = AppId::new();
|
|
let src = r#"
|
|
let c = kv::collection("widgets");
|
|
let before = c.has("k");
|
|
c.set("k", "v");
|
|
let after = c.has("k");
|
|
#{ before: before, after: after }
|
|
"#;
|
|
let body = run_script(engine, src, baseline_request(app)).await;
|
|
assert_eq!(body, json!({ "before": false, "after": true }));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn kv_delete_returns_was_present() {
|
|
let engine = make_engine();
|
|
let app = AppId::new();
|
|
let src = r#"
|
|
let c = kv::collection("widgets");
|
|
let nope = c.delete("missing");
|
|
c.set("k", 1);
|
|
let yep = c.delete("k");
|
|
#{ nope: nope, yep: yep }
|
|
"#;
|
|
let body = run_script(engine, src, baseline_request(app)).await;
|
|
assert_eq!(body, json!({ "nope": false, "yep": true }));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn kv_empty_collection_name_throws() {
|
|
let engine = make_engine();
|
|
let app = AppId::new();
|
|
let src = r#"kv::collection("")"#;
|
|
let req = baseline_request(app);
|
|
let err = tokio::task::spawn_blocking(move || engine.execute(src, req))
|
|
.await
|
|
.unwrap()
|
|
.expect_err("empty collection should throw");
|
|
assert!(format!("{err:?}").contains("kv::collection"));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn kv_list_pages_via_cursor() {
|
|
let engine = make_engine();
|
|
let app = AppId::new();
|
|
let src = r#"
|
|
let c = kv::collection("widgets");
|
|
for i in 0..5 { c.set(`k${i}`, i); }
|
|
let p1 = c.list("", 2);
|
|
let p2 = c.list(p1.next_cursor, 2);
|
|
#{
|
|
p1_keys: p1.keys,
|
|
p1_cursor: p1.next_cursor,
|
|
p2_keys: p2.keys,
|
|
}
|
|
"#;
|
|
let body = run_script(engine, src, baseline_request(app)).await;
|
|
let obj = body.as_object().unwrap();
|
|
let p1_keys = obj["p1_keys"].as_array().unwrap();
|
|
let p2_keys = obj["p2_keys"].as_array().unwrap();
|
|
assert_eq!(p1_keys.len(), 2);
|
|
assert_eq!(p2_keys.len(), 2);
|
|
assert!(obj["p1_cursor"].is_string());
|
|
}
|
|
|
|
/// Cross-app isolation via `cx.app_id` — script with `app_id = A`
|
|
/// cannot see entries from `app_id = B`. The kv:: bridge never
|
|
/// surfaces `app_id` to the script, so this is enforced purely by the
|
|
/// service deriving it from the captured `Arc<SdkCallCx>`.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn kv_bridge_preserves_cross_app_isolation() {
|
|
let engine = make_engine();
|
|
let app_a = AppId::new();
|
|
let app_b = AppId::new();
|
|
|
|
let writer = r#"
|
|
let c = kv::collection("shared");
|
|
c.set("k", "from-a");
|
|
"ok"
|
|
"#;
|
|
let _ = run_script(engine.clone(), writer, baseline_request(app_a)).await;
|
|
|
|
// App B sees nothing under the same collection/key.
|
|
let reader = r#"
|
|
let c = kv::collection("shared");
|
|
c.get("k")
|
|
"#;
|
|
let body = run_script(engine, reader, baseline_request(app_b)).await;
|
|
assert_eq!(body, Value::Null);
|
|
}
|