Outbound email reachable from scripts as email::send(#{...}) (plain
text) and email::send_html(#{...}) (multipart text + HTML). Backed by a
lettre SMTP relay configured from PICLOUD_SMTP_HOST/PORT/USER/PASSWORD/
TLS/TIMEOUT_SECS; if HOST/USER/PASSWORD aren't all set the service runs
in disabled mode (every send throws NotConfigured, warned at startup).
- EmailService trait + OutboundEmail DTO (picloud-shared);
EmailServiceImpl + EmailTransport seam + lettre transport
(manager-core), wired into the Services bundle and Rhai engine.
- Capability::AppEmailSend (→ script:write); seven-scope commitment held.
- Required-field + RFC5322-ish address validation; 25 MB per-message cap
(PICLOUD_EMAIL_MAX_MESSAGE_BYTES). reply_to defaults to from.
- Per-call connection (pooling deferred to v1.2); no per-app `from` address
validation (that is the operator's SMTP/SPF/DKIM concern).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
160 lines
5.3 KiB
Rust
160 lines
5.3 KiB
Rust
//! `pubsub::` SDK bridge integration tests — run a real Rhai engine
//! against an in-memory `PubsubService` that records the published
//! `(topic, message)` pair. They verify the message JSON encoding that the
//! wire contract requires: Maps, Arrays, strings, numbers, bools, null, and
//! **Blob → base64**, including nesting.
|
|
|
|
use std::collections::BTreeMap;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use async_trait::async_trait;
|
|
use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits};
|
|
use picloud_shared::{
|
|
AppId, ExecutionId, NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopFilesService,
|
|
NoopHttpService, NoopKvService, NoopModuleSource, PubsubError, PubsubService, RequestId,
|
|
ScriptId, ScriptSandbox, SdkCallCx, Services,
|
|
};
|
|
use serde_json::{json, Value};
|
|
|
|
#[derive(Default)]
|
|
struct RecordingPubsub {
|
|
last: Mutex<Option<(String, Value)>>,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl PubsubService for RecordingPubsub {
|
|
async fn publish_durable(
|
|
&self,
|
|
_cx: &SdkCallCx,
|
|
topic: &str,
|
|
message: Value,
|
|
) -> Result<(), PubsubError> {
|
|
if topic.trim().is_empty() {
|
|
return Err(PubsubError::EmptyTopic);
|
|
}
|
|
*self.last.lock().unwrap() = Some((topic.to_string(), message));
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn make_engine(svc: Arc<RecordingPubsub>) -> Arc<Engine> {
|
|
let services = Services::new(
|
|
Arc::new(NoopKvService),
|
|
Arc::new(NoopDocsService),
|
|
Arc::new(NoopDeadLetterService),
|
|
Arc::new(NoopEventEmitter),
|
|
Arc::new(NoopModuleSource),
|
|
Arc::new(NoopHttpService),
|
|
Arc::new(NoopFilesService),
|
|
svc,
|
|
Arc::new(picloud_shared::NoopSecretsService),
|
|
Arc::new(picloud_shared::NoopEmailService),
|
|
);
|
|
Arc::new(Engine::new(Limits::default(), services))
|
|
}
|
|
|
|
fn baseline_request(app_id: AppId) -> ExecRequest {
|
|
let execution_id = ExecutionId::new();
|
|
ExecRequest {
|
|
execution_id,
|
|
request_id: RequestId::new(),
|
|
script_id: ScriptId::new(),
|
|
script_name: "pubsub-test".into(),
|
|
invocation_type: InvocationType::Http,
|
|
path: "/pubsub-test".into(),
|
|
headers: BTreeMap::new(),
|
|
body: Value::Null,
|
|
params: BTreeMap::new(),
|
|
query: BTreeMap::new(),
|
|
rest: String::new(),
|
|
sandbox_overrides: ScriptSandbox::default(),
|
|
app_id,
|
|
principal: None,
|
|
trigger_depth: 0,
|
|
root_execution_id: execution_id,
|
|
is_dead_letter_handler: false,
|
|
event: None,
|
|
}
|
|
}
|
|
|
|
async fn run(engine: Arc<Engine>, src: &str, req: ExecRequest) {
|
|
let src = src.to_string();
|
|
tokio::task::spawn_blocking(move || engine.execute(&src, req))
|
|
.await
|
|
.expect("spawn_blocking should not panic")
|
|
.expect("script execution should succeed");
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn publish_map_message() {
|
|
let svc = Arc::new(RecordingPubsub::default());
|
|
let engine = make_engine(svc.clone());
|
|
run(
|
|
engine,
|
|
r#"pubsub::publish_durable("user.created", #{ user_id: "abc", n: 7, ok: true });"#,
|
|
baseline_request(AppId::new()),
|
|
)
|
|
.await;
|
|
let (topic, msg) = svc.last.lock().unwrap().clone().unwrap();
|
|
assert_eq!(topic, "user.created");
|
|
assert_eq!(msg, json!({ "user_id": "abc", "n": 7, "ok": true }));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn publish_scalar_and_array_and_null() {
|
|
let svc = Arc::new(RecordingPubsub::default());
|
|
let engine = make_engine(svc.clone());
|
|
run(
|
|
engine,
|
|
r#"pubsub::publish_durable("a", [1, "two", false, ()]);"#,
|
|
baseline_request(AppId::new()),
|
|
)
|
|
.await;
|
|
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
|
|
assert_eq!(msg, json!([1, "two", false, null]));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn publish_number_scalar() {
|
|
let svc = Arc::new(RecordingPubsub::default());
|
|
let engine = make_engine(svc.clone());
|
|
run(
|
|
engine,
|
|
r#"pubsub::publish_durable("metric", 42);"#,
|
|
baseline_request(AppId::new()),
|
|
)
|
|
.await;
|
|
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
|
|
assert_eq!(msg, json!(42));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn publish_blob_encodes_base64_including_nested() {
|
|
let svc = Arc::new(RecordingPubsub::default());
|
|
let engine = make_engine(svc.clone());
|
|
// base64("hello") = "aGVsbG8=" (STANDARD, padded).
|
|
run(
|
|
engine,
|
|
r#"
|
|
let data = base64::decode("aGVsbG8=");
|
|
pubsub::publish_durable("blobs", #{ raw: data, list: [data] });
|
|
"#,
|
|
baseline_request(AppId::new()),
|
|
)
|
|
.await;
|
|
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
|
|
assert_eq!(msg, json!({ "raw": "aGVsbG8=", "list": ["aGVsbG8="] }));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn publish_empty_topic_throws() {
|
|
let svc = Arc::new(RecordingPubsub::default());
|
|
let engine = make_engine(svc.clone());
|
|
let src = r#"pubsub::publish_durable("", 1);"#.to_string();
|
|
let req = baseline_request(AppId::new());
|
|
let res = tokio::task::spawn_blocking(move || engine.execute(&src, req))
|
|
.await
|
|
.expect("spawn_blocking should not panic");
|
|
assert!(res.is_err(), "empty topic should throw");
|
|
}
|