//! `pubsub::` SDK bridge integration tests — runs a real Rhai engine //! against an in-memory `PubsubService` that records the published //! `(topic, message)`. Verifies the message JSON encoding the wire //! contract requires: Maps, Arrays, strings, numbers, bool, null, and //! **Blob → base64**, including nesting. use std::collections::BTreeMap; use std::sync::{Arc, Mutex}; use async_trait::async_trait; use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits}; use picloud_shared::{ AppId, ExecutionId, NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopFilesService, NoopHttpService, NoopKvService, NoopModuleSource, PubsubError, PubsubService, RequestId, ScriptId, ScriptSandbox, SdkCallCx, Services, }; use serde_json::{json, Value}; #[derive(Default)] struct RecordingPubsub { last: Mutex>, } #[async_trait] impl PubsubService for RecordingPubsub { async fn publish_durable( &self, _cx: &SdkCallCx, topic: &str, message: Value, ) -> Result<(), PubsubError> { if topic.trim().is_empty() { return Err(PubsubError::EmptyTopic); } *self.last.lock().unwrap() = Some((topic.to_string(), message)); Ok(()) } } fn make_engine(svc: Arc) -> Arc { let services = Services::new( Arc::new(NoopKvService), Arc::new(NoopDocsService), Arc::new(NoopDeadLetterService), Arc::new(NoopEventEmitter), Arc::new(NoopModuleSource), Arc::new(NoopHttpService), Arc::new(NoopFilesService), svc, Arc::new(picloud_shared::NoopSecretsService), Arc::new(picloud_shared::NoopEmailService), ); Arc::new(Engine::new(Limits::default(), services)) } fn baseline_request(app_id: AppId) -> ExecRequest { let execution_id = ExecutionId::new(); ExecRequest { execution_id, request_id: RequestId::new(), script_id: ScriptId::new(), script_name: "pubsub-test".into(), invocation_type: InvocationType::Http, path: "/pubsub-test".into(), headers: BTreeMap::new(), body: Value::Null, params: BTreeMap::new(), query: BTreeMap::new(), rest: String::new(), sandbox_overrides: ScriptSandbox::default(), app_id, principal: None, trigger_depth: 0, root_execution_id: execution_id, is_dead_letter_handler: false, event: None, } } async fn run(engine: Arc, src: &str, req: ExecRequest) { let src = src.to_string(); tokio::task::spawn_blocking(move || engine.execute(&src, req)) .await .expect("spawn_blocking should not panic") .expect("script execution should succeed"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn publish_map_message() { let svc = Arc::new(RecordingPubsub::default()); let engine = make_engine(svc.clone()); run( engine, r#"pubsub::publish_durable("user.created", #{ user_id: "abc", n: 7, ok: true });"#, baseline_request(AppId::new()), ) .await; let (topic, msg) = svc.last.lock().unwrap().clone().unwrap(); assert_eq!(topic, "user.created"); assert_eq!(msg, json!({ "user_id": "abc", "n": 7, "ok": true })); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn publish_scalar_and_array_and_null() { let svc = Arc::new(RecordingPubsub::default()); let engine = make_engine(svc.clone()); run( engine, r#"pubsub::publish_durable("a", [1, "two", false, ()]);"#, baseline_request(AppId::new()), ) .await; let (_t, msg) = svc.last.lock().unwrap().clone().unwrap(); assert_eq!(msg, json!([1, "two", false, null])); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn publish_number_scalar() { let svc = Arc::new(RecordingPubsub::default()); let engine = make_engine(svc.clone()); run( engine, r#"pubsub::publish_durable("metric", 42);"#, baseline_request(AppId::new()), ) .await; let (_t, msg) = svc.last.lock().unwrap().clone().unwrap(); assert_eq!(msg, json!(42)); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn publish_blob_encodes_base64_including_nested() { let svc = Arc::new(RecordingPubsub::default()); let engine = make_engine(svc.clone()); // base64("hello") = "aGVsbG8=" (STANDARD, padded). run( engine, r#" let data = base64::decode("aGVsbG8="); pubsub::publish_durable("blobs", #{ raw: data, list: [data] }); "#, baseline_request(AppId::new()), ) .await; let (_t, msg) = svc.last.lock().unwrap().clone().unwrap(); assert_eq!(msg, json!({ "raw": "aGVsbG8=", "list": ["aGVsbG8="] })); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn publish_empty_topic_throws() { let svc = Arc::new(RecordingPubsub::default()); let engine = make_engine(svc.clone()); let src = r#"pubsub::publish_durable("", 1);"#.to_string(); let req = baseline_request(AppId::new()); let res = tokio::task::spawn_blocking(move || engine.execute(&src, req)) .await .expect("spawn_blocking should not panic"); assert!(res.is_err(), "empty topic should throw"); }