Files
PiCloud/crates/executor-core/tests/sdk_pubsub.rs
MechaCat02 2d11090d1a feat(v1.1.7-secrets): secrets SDK + table + admin API + dashboard
Encrypted per-app secrets, reachable from scripts as
secrets::{get,set,delete,list}(name) and managed from the dashboard
Secrets tab. Values are AES-256-GCM-sealed with the process master key
(picloud_shared::crypto) before they touch Postgres; the repo only ever
sees ciphertext + nonce. JSON round-trip preserves Rhai types.

- migration 0023_secrets.sql (PRIMARY KEY (app_id, name)).
- SecretsService trait (picloud-shared) + SecretsServiceImpl + repo
  (manager-core), wired into the Services bundle and Rhai engine.
- Capability::AppSecretsRead/Write (→ script:read / script:write); no
  new Scope variants (seven-scope commitment).
- Admin API GET/POST/DELETE /apps/{id}/secrets (list returns names +
  updated_at, never values).
- build_app now takes a MasterKey, sourced from PICLOUD_SECRET_KEY in
  main.rs; test callers pass a fixed test key.
- 64 KB value cap (PICLOUD_SECRET_MAX_VALUE_BYTES); no ServiceEvent
  emission (secret writes don't fire triggers, by design).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-04 21:37:17 +02:00

159 lines
5.2 KiB
Rust

//! `pubsub::` SDK bridge integration tests — runs a real Rhai engine
//! against an in-memory `PubsubService` that records the published
//! `(topic, message)`. Verifies the message JSON encoding the wire
//! contract requires: Maps, Arrays, strings, numbers, bool, null, and
//! **Blob → base64**, including nesting.
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits};
use picloud_shared::{
AppId, ExecutionId, NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopFilesService,
NoopHttpService, NoopKvService, NoopModuleSource, PubsubError, PubsubService, RequestId,
ScriptId, ScriptSandbox, SdkCallCx, Services,
};
use serde_json::{json, Value};
#[derive(Default)]
struct RecordingPubsub {
last: Mutex<Option<(String, Value)>>,
}
#[async_trait]
impl PubsubService for RecordingPubsub {
async fn publish_durable(
&self,
_cx: &SdkCallCx,
topic: &str,
message: Value,
) -> Result<(), PubsubError> {
if topic.trim().is_empty() {
return Err(PubsubError::EmptyTopic);
}
*self.last.lock().unwrap() = Some((topic.to_string(), message));
Ok(())
}
}
fn make_engine(svc: Arc<RecordingPubsub>) -> Arc<Engine> {
let services = Services::new(
Arc::new(NoopKvService),
Arc::new(NoopDocsService),
Arc::new(NoopDeadLetterService),
Arc::new(NoopEventEmitter),
Arc::new(NoopModuleSource),
Arc::new(NoopHttpService),
Arc::new(NoopFilesService),
svc,
Arc::new(picloud_shared::NoopSecretsService),
);
Arc::new(Engine::new(Limits::default(), services))
}
fn baseline_request(app_id: AppId) -> ExecRequest {
let execution_id = ExecutionId::new();
ExecRequest {
execution_id,
request_id: RequestId::new(),
script_id: ScriptId::new(),
script_name: "pubsub-test".into(),
invocation_type: InvocationType::Http,
path: "/pubsub-test".into(),
headers: BTreeMap::new(),
body: Value::Null,
params: BTreeMap::new(),
query: BTreeMap::new(),
rest: String::new(),
sandbox_overrides: ScriptSandbox::default(),
app_id,
principal: None,
trigger_depth: 0,
root_execution_id: execution_id,
is_dead_letter_handler: false,
event: None,
}
}
async fn run(engine: Arc<Engine>, src: &str, req: ExecRequest) {
let src = src.to_string();
tokio::task::spawn_blocking(move || engine.execute(&src, req))
.await
.expect("spawn_blocking should not panic")
.expect("script execution should succeed");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_map_message() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
run(
engine,
r#"pubsub::publish_durable("user.created", #{ user_id: "abc", n: 7, ok: true });"#,
baseline_request(AppId::new()),
)
.await;
let (topic, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(topic, "user.created");
assert_eq!(msg, json!({ "user_id": "abc", "n": 7, "ok": true }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_scalar_and_array_and_null() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
run(
engine,
r#"pubsub::publish_durable("a", [1, "two", false, ()]);"#,
baseline_request(AppId::new()),
)
.await;
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(msg, json!([1, "two", false, null]));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_number_scalar() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
run(
engine,
r#"pubsub::publish_durable("metric", 42);"#,
baseline_request(AppId::new()),
)
.await;
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(msg, json!(42));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_blob_encodes_base64_including_nested() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
// base64("hello") = "aGVsbG8=" (STANDARD, padded).
run(
engine,
r#"
let data = base64::decode("aGVsbG8=");
pubsub::publish_durable("blobs", #{ raw: data, list: [data] });
"#,
baseline_request(AppId::new()),
)
.await;
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(msg, json!({ "raw": "aGVsbG8=", "list": ["aGVsbG8="] }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_empty_topic_throws() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
let src = r#"pubsub::publish_durable("", 1);"#.to_string();
let req = baseline_request(AppId::new());
let res = tokio::task::spawn_blocking(move || engine.execute(&src, req))
.await
.expect("spawn_blocking should not panic");
assert!(res.is_err(), "empty topic should throw");
}