diff --git a/crates/executor-core/src/engine.rs b/crates/executor-core/src/engine.rs index b45abbe..c9cb7e4 100644 --- a/crates/executor-core/src/engine.rs +++ b/crates/executor-core/src/engine.rs @@ -434,6 +434,20 @@ fn trigger_event_to_dynamic(event: &TriggerEvent) -> Dynamic { ); m.insert("files".into(), files_map.into()); } + TriggerEvent::Pubsub { + topic, + message, + published_at, + } => { + // `ctx.event.op` is always "publish" for pub/sub (the only + // op a publish produces). + m.insert("op".into(), "publish".into()); + let mut ps = Map::new(); + ps.insert("topic".into(), topic.clone().into()); + ps.insert("message".into(), json_to_dynamic(message.clone())); + ps.insert("published_at".into(), published_at.to_rfc3339().into()); + m.insert("pubsub".into(), ps.into()); + } TriggerEvent::DeadLetter { dead_letter_id, original, diff --git a/crates/executor-core/src/sdk/mod.rs b/crates/executor-core/src/sdk/mod.rs index b1387f7..74a319c 100644 --- a/crates/executor-core/src/sdk/mod.rs +++ b/crates/executor-core/src/sdk/mod.rs @@ -18,6 +18,7 @@ pub mod docs; pub mod files; pub mod http; pub mod kv; +pub mod pubsub; pub mod stdlib; pub use bridge::{dynamic_to_json, json_to_dynamic}; @@ -39,5 +40,6 @@ pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc) { + let svc = services.pubsub.clone(); + let mut module = Module::new(); + { + let svc = svc.clone(); + let cx = cx.clone(); + module.set_native_fn( + "publish_durable", + move |topic: &str, message: Dynamic| -> Result<(), Box> { + let json = message_to_json(&message); + let svc = svc.clone(); + let cx = cx.clone(); + block_on(async move { svc.publish_durable(&cx, topic, json).await }) + }, + ); + } + engine.register_static_module("pubsub", module.into()); +} + +/// Convert a Rhai `Dynamic` message into JSON, base64-encoding any +/// `Blob` (at any nesting depth). Mirrors `bridge::dynamic_to_json` but +/// adds the blob arm the pub/sub wire contract requires. +fn message_to_json(value: &Dynamic) -> Json { + // Blob must be checked before the generic array path (a Blob is a + // `Vec`, distinct from a Rhai `Array`). + if value.is_blob() { + let blob = value.clone().into_blob().unwrap_or_default(); + return Json::String(STANDARD.encode(&blob)); + } + if value.is_unit() { + return Json::Null; + } + if let Ok(b) = value.as_bool() { + return Json::Bool(b); + } + if let Ok(i) = value.as_int() { + return Json::Number(i.into()); + } + if let Ok(f) = value.as_float() { + return serde_json::Number::from_f64(f).map_or(Json::Null, Json::Number); + } + if value.is_string() { + return Json::String(value.clone().into_string().unwrap_or_default()); + } + if let Some(arr) = value.clone().try_cast::() { + return Json::Array(arr.iter().map(message_to_json).collect()); + } + if let Some(map) = value.clone().try_cast::() { + let mut out = serde_json::Map::new(); + for (k, v) in map { + out.insert(k.to_string(), message_to_json(&v)); + } + return Json::Object(out); + } + Json::String(value.to_string()) +} + +/// Run an async future inside the synchronous Rhai context. Mirrors +/// `kv::block_on`. +fn block_on(fut: F) -> Result<(), Box> +where + F: std::future::Future> + Send, +{ + let handle = TokioHandle::try_current().map_err(|e| -> Box { + EvalAltResult::ErrorRuntime( + format!("pubsub: no tokio runtime available: {e}").into(), + rhai::Position::NONE, + ) + .into() + })?; + handle.block_on(fut).map_err(|err| -> Box { + EvalAltResult::ErrorRuntime(format!("pubsub: {err}").into(), rhai::Position::NONE).into() + }) +} diff --git a/crates/executor-core/tests/module_redaction_logging.rs b/crates/executor-core/tests/module_redaction_logging.rs index 8e09c1f..f39e7b7 100644 --- a/crates/executor-core/tests/module_redaction_logging.rs +++ b/crates/executor-core/tests/module_redaction_logging.rs @@ -100,6 +100,7 @@ async fn original_backend_error_is_logged_at_error_level() { Arc::new(FailingSource), Arc::new(NoopHttpService), Arc::new(picloud_shared::NoopFilesService), + Arc::new(picloud_shared::NoopPubsubService), ); let engine = Engine::new(Limits::default(), services); diff --git a/crates/executor-core/tests/modules.rs b/crates/executor-core/tests/modules.rs index 282db48..94d8f05 100644 --- a/crates/executor-core/tests/modules.rs +++ b/crates/executor-core/tests/modules.rs @@ -98,6 +98,7 @@ fn services_with(modules: Arc) -> Services { modules, Arc::new(NoopHttpService), Arc::new(picloud_shared::NoopFilesService), + Arc::new(picloud_shared::NoopPubsubService), ) } diff --git a/crates/executor-core/tests/sdk_docs.rs b/crates/executor-core/tests/sdk_docs.rs index 44512e2..6b2d76c 100644 --- a/crates/executor-core/tests/sdk_docs.rs +++ b/crates/executor-core/tests/sdk_docs.rs @@ -229,6 +229,7 @@ fn make_engine() -> Arc { Arc::new(NoopModuleSource), Arc::new(NoopHttpService), Arc::new(picloud_shared::NoopFilesService), + Arc::new(picloud_shared::NoopPubsubService), ); Arc::new(Engine::new(Limits::default(), services)) } diff --git a/crates/executor-core/tests/sdk_files.rs b/crates/executor-core/tests/sdk_files.rs index 6bf15ab..9e29f0c 100644 --- a/crates/executor-core/tests/sdk_files.rs +++ b/crates/executor-core/tests/sdk_files.rs @@ -166,6 +166,7 @@ fn make_engine() -> Arc { Arc::new(NoopModuleSource), Arc::new(NoopHttpService), Arc::new(InMemoryFiles::default()), + Arc::new(picloud_shared::NoopPubsubService), ); Arc::new(Engine::new(Limits::default(), services)) } diff --git a/crates/executor-core/tests/sdk_http.rs b/crates/executor-core/tests/sdk_http.rs index 447246a..c44510f 100644 --- a/crates/executor-core/tests/sdk_http.rs +++ b/crates/executor-core/tests/sdk_http.rs @@ -89,6 +89,7 @@ fn engine_with(http: Arc) -> Arc { Arc::new(NoopModuleSource), http, Arc::new(picloud_shared::NoopFilesService), + Arc::new(picloud_shared::NoopPubsubService), ); Arc::new(Engine::new(Limits::default(), services)) } diff --git a/crates/executor-core/tests/sdk_kv.rs b/crates/executor-core/tests/sdk_kv.rs index a8706cd..ce8f51f 100644 --- a/crates/executor-core/tests/sdk_kv.rs +++ b/crates/executor-core/tests/sdk_kv.rs @@ -108,6 +108,7 @@ fn make_engine() -> Arc { Arc::new(NoopModuleSource), Arc::new(NoopHttpService), Arc::new(picloud_shared::NoopFilesService), + Arc::new(picloud_shared::NoopPubsubService), ); Arc::new(Engine::new(Limits::default(), services)) } diff --git a/crates/executor-core/tests/sdk_pubsub.rs b/crates/executor-core/tests/sdk_pubsub.rs new file mode 100644 index 0000000..9c64252 --- /dev/null +++ b/crates/executor-core/tests/sdk_pubsub.rs @@ -0,0 +1,157 @@ +//! `pubsub::` SDK bridge integration tests — runs a real Rhai engine +//! against an in-memory `PubsubService` that records the published +//! `(topic, message)`. Verifies the message JSON encoding the wire +//! contract requires: Maps, Arrays, strings, numbers, bool, null, and +//! **Blob → base64**, including nesting. + +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits}; +use picloud_shared::{ + AppId, ExecutionId, NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopFilesService, + NoopHttpService, NoopKvService, NoopModuleSource, PubsubError, PubsubService, RequestId, + ScriptId, ScriptSandbox, SdkCallCx, Services, +}; +use serde_json::{json, Value}; + +#[derive(Default)] +struct RecordingPubsub { + last: Mutex>, +} + +#[async_trait] +impl PubsubService for RecordingPubsub { + async fn publish_durable( + &self, + _cx: &SdkCallCx, + topic: &str, + message: Value, + ) -> Result<(), PubsubError> { + if topic.trim().is_empty() { + return Err(PubsubError::EmptyTopic); + } + *self.last.lock().unwrap() = Some((topic.to_string(), message)); + Ok(()) + } +} + +fn make_engine(svc: Arc) -> Arc { + let services = Services::new( + Arc::new(NoopKvService), + Arc::new(NoopDocsService), + Arc::new(NoopDeadLetterService), + Arc::new(NoopEventEmitter), + Arc::new(NoopModuleSource), + Arc::new(NoopHttpService), + Arc::new(NoopFilesService), + svc, + ); + Arc::new(Engine::new(Limits::default(), services)) +} + +fn baseline_request(app_id: AppId) -> ExecRequest { + let execution_id = ExecutionId::new(); + ExecRequest { + execution_id, + request_id: RequestId::new(), + script_id: ScriptId::new(), + script_name: "pubsub-test".into(), + invocation_type: InvocationType::Http, + path: "/pubsub-test".into(), + headers: BTreeMap::new(), + body: Value::Null, + params: BTreeMap::new(), + query: BTreeMap::new(), + rest: String::new(), + sandbox_overrides: ScriptSandbox::default(), + app_id, + principal: None, + trigger_depth: 0, + root_execution_id: execution_id, + is_dead_letter_handler: false, + event: None, + } +} + +async fn run(engine: Arc, src: &str, req: ExecRequest) { + let src = src.to_string(); + tokio::task::spawn_blocking(move || engine.execute(&src, req)) + .await + .expect("spawn_blocking should not panic") + .expect("script execution should succeed"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn publish_map_message() { + let svc = Arc::new(RecordingPubsub::default()); + let engine = make_engine(svc.clone()); + run( + engine, + r#"pubsub::publish_durable("user.created", #{ user_id: "abc", n: 7, ok: true });"#, + baseline_request(AppId::new()), + ) + .await; + let (topic, msg) = svc.last.lock().unwrap().clone().unwrap(); + assert_eq!(topic, "user.created"); + assert_eq!(msg, json!({ "user_id": "abc", "n": 7, "ok": true })); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn publish_scalar_and_array_and_null() { + let svc = Arc::new(RecordingPubsub::default()); + let engine = make_engine(svc.clone()); + run( + engine, + r#"pubsub::publish_durable("a", [1, "two", false, ()]);"#, + baseline_request(AppId::new()), + ) + .await; + let (_t, msg) = svc.last.lock().unwrap().clone().unwrap(); + assert_eq!(msg, json!([1, "two", false, null])); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn publish_number_scalar() { + let svc = Arc::new(RecordingPubsub::default()); + let engine = make_engine(svc.clone()); + run( + engine, + r#"pubsub::publish_durable("metric", 42);"#, + baseline_request(AppId::new()), + ) + .await; + let (_t, msg) = svc.last.lock().unwrap().clone().unwrap(); + assert_eq!(msg, json!(42)); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn publish_blob_encodes_base64_including_nested() { + let svc = Arc::new(RecordingPubsub::default()); + let engine = make_engine(svc.clone()); + // base64("hello") = "aGVsbG8=" (STANDARD, padded). + run( + engine, + r#" + let data = base64::decode("aGVsbG8="); + pubsub::publish_durable("blobs", #{ raw: data, list: [data] }); + "#, + baseline_request(AppId::new()), + ) + .await; + let (_t, msg) = svc.last.lock().unwrap().clone().unwrap(); + assert_eq!(msg, json!({ "raw": "aGVsbG8=", "list": ["aGVsbG8="] })); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn publish_empty_topic_throws() { + let svc = Arc::new(RecordingPubsub::default()); + let engine = make_engine(svc.clone()); + let src = r#"pubsub::publish_durable("", 1);"#.to_string(); + let req = baseline_request(AppId::new()); + let res = tokio::task::spawn_blocking(move || engine.execute(&src, req)) + .await + .expect("spawn_blocking should not panic"); + assert!(res.is_err(), "empty topic should throw"); +} diff --git a/crates/manager-core/migrations/0020_pubsub_triggers.sql b/crates/manager-core/migrations/0020_pubsub_triggers.sql new file mode 100644 index 0000000..02752f6 --- /dev/null +++ b/crates/manager-core/migrations/0020_pubsub_triggers.sql @@ -0,0 +1,34 @@ +-- v1.1.5: extend the triggers framework to recognise `pubsub` as the +-- sixth concrete kind. Same Layout-E shape as files (0019): two CHECK +-- constraints widen, one new detail table. +-- +-- Pub/sub fans out at PUBLISH time (one outbox row per matching trigger, +-- written by the PubsubServiceImpl), so the dispatcher needs no pubsub- +-- specific branching — a pubsub outbox row dispatches like any other +-- async trigger. + +-- Extend triggers.kind to include 'pubsub'. +ALTER TABLE triggers DROP CONSTRAINT triggers_kind_check; +ALTER TABLE triggers ADD CONSTRAINT triggers_kind_check + CHECK (kind IN ('kv', 'dead_letter', 'docs', 'cron', 'files', 'pubsub')); + +-- Extend outbox.source_kind to include 'pubsub'. +ALTER TABLE outbox DROP CONSTRAINT outbox_source_kind_check; +ALTER TABLE outbox ADD CONSTRAINT outbox_source_kind_check + CHECK (source_kind IN ('http', 'kv', 'dead_letter', 'docs', + 'cron', 'files', 'pubsub')); + +-- One row per pubsub trigger. `topic_pattern` is "exact", "prefix.*", +-- or "*" — validated in Rust at trigger creation. Topics are implicit +-- on first publish; the external-subscribable `topics` table is v1.1.6. +CREATE TABLE pubsub_trigger_details ( + trigger_id UUID PRIMARY KEY REFERENCES triggers(id) ON DELETE CASCADE, + topic_pattern TEXT NOT NULL +); + +-- Hot lookup for fan-out: "all enabled pubsub triggers in app X". +-- Third partial index of its kind (after v1.1.1's idx_triggers_app_kind_ +-- enabled); partial indexes are tiny and the planner picks the narrowest. +CREATE INDEX idx_triggers_app_pubsub_enabled + ON triggers (app_id, kind) + WHERE enabled = TRUE AND kind = 'pubsub'; diff --git a/crates/manager-core/src/dispatcher.rs b/crates/manager-core/src/dispatcher.rs index af105ca..a377160 100644 --- a/crates/manager-core/src/dispatcher.rs +++ b/crates/manager-core/src/dispatcher.rs @@ -167,7 +167,8 @@ impl Dispatcher { | OutboxSourceKind::Docs | OutboxSourceKind::DeadLetter | OutboxSourceKind::Cron - | OutboxSourceKind::Files => { + | OutboxSourceKind::Files + | OutboxSourceKind::Pubsub => { let resolved = self.resolve_trigger(&row).await?; let req = match self.build_exec_request(&row, &resolved).await { Ok(req) => req, diff --git a/crates/manager-core/src/lib.rs b/crates/manager-core/src/lib.rs index 94e5399..b39bf20 100644 --- a/crates/manager-core/src/lib.rs +++ b/crates/manager-core/src/lib.rs @@ -43,6 +43,8 @@ pub mod module_source; pub mod outbox_event_emitter; pub mod outbox_repo; pub mod principal_resolver; +pub mod pubsub_repo; +pub mod pubsub_service; pub mod repo; pub mod route_admin; pub mod route_repo; @@ -113,6 +115,8 @@ pub use outbox_repo::{ NewOutboxRow, OutboxRepo, OutboxRepoError, OutboxRow, OutboxSourceKind, PostgresOutboxRepo, }; pub use principal_resolver::{AdminPrincipalResolver, PrincipalResolver, PrincipalResolverError}; +pub use pubsub_repo::{PostgresPubsubRepo, PublishCtx, PubsubRepo, PubsubRepoError}; +pub use pubsub_service::PubsubServiceImpl; pub use repo::{ ExecutionLogRepository, NewScript, PostgresExecutionLogRepository, PostgresScriptRepository, RepoResolver, ScriptPatch, ScriptRepository, ScriptRepositoryError, @@ -123,8 +127,8 @@ pub use sandbox::{CeilingError, SandboxCeiling}; pub use trigger_config::{BackoffShape, TriggerConfig}; pub use trigger_repo::{ collection_matches, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger, - CreateKvTrigger, DeadLetterTriggerMatch, DocsTriggerMatch, FilesTriggerMatch, KvTriggerMatch, - PostgresTriggerRepo, Trigger, TriggerDetails, TriggerDispatchMode, TriggerKind, TriggerRepo, - TriggerRepoError, + CreateKvTrigger, CreatePubsubTrigger, DeadLetterTriggerMatch, DocsTriggerMatch, + FilesTriggerMatch, KvTriggerMatch, PostgresTriggerRepo, Trigger, TriggerDetails, + TriggerDispatchMode, TriggerKind, TriggerRepo, TriggerRepoError, }; pub use triggers_api::{triggers_router, TriggersApiError, TriggersState}; diff --git a/crates/manager-core/src/outbox_repo.rs b/crates/manager-core/src/outbox_repo.rs index 10a9c9d..0ad88be 100644 --- a/crates/manager-core/src/outbox_repo.rs +++ b/crates/manager-core/src/outbox_repo.rs @@ -29,6 +29,8 @@ pub enum OutboxSourceKind { Cron, /// v1.1.5. Files, + /// v1.1.5. + Pubsub, } impl OutboxSourceKind { @@ -41,6 +43,7 @@ impl OutboxSourceKind { Self::DeadLetter => "dead_letter", Self::Cron => "cron", Self::Files => "files", + Self::Pubsub => "pubsub", } } @@ -53,6 +56,7 @@ impl OutboxSourceKind { "dead_letter" => Some(Self::DeadLetter), "cron" => Some(Self::Cron), "files" => Some(Self::Files), + "pubsub" => Some(Self::Pubsub), _ => None, } } diff --git a/crates/manager-core/src/pubsub_repo.rs b/crates/manager-core/src/pubsub_repo.rs new file mode 100644 index 0000000..58a1f52 --- /dev/null +++ b/crates/manager-core/src/pubsub_repo.rs @@ -0,0 +1,118 @@ +//! `PubsubRepo` — publish-time fan-out for the v1.1.5 `pubsub::*` SDK. +//! +//! `publish_durable` writes one outbox row per matching enabled `pubsub` +//! trigger, all inside a single transaction so a partial fan-out (some +//! subscribers got rows, others didn't, then a crash) can't happen. +//! Each delivery row then retries / dead-letters independently through +//! the existing dispatcher — no pub/sub-specific dispatch branching. +//! +//! Topic pattern matching runs in Rust (`picloud_shared::topic_matches`) +//! against the small set of the app's enabled pubsub triggers, keeping +//! the SELECT trivial. v1.2 can add a topic-trie index if fan-out +//! becomes a hot path. + +use async_trait::async_trait; +use picloud_shared::{topic_matches, AdminUserId, AppId, ExecutionId}; +use sqlx::PgPool; +use uuid::Uuid; + +#[derive(Debug, thiserror::Error)] +pub enum PubsubRepoError { + #[error("database error: {0}")] + Db(#[from] sqlx::Error), +} + +/// The execution-context bits a fan-out needs to stamp onto each outbox +/// row. Derived from the publishing script's `SdkCallCx`. +#[derive(Debug, Clone, Copy)] +pub struct PublishCtx { + pub app_id: AppId, + pub origin_principal: Option, + pub trigger_depth: u32, + pub root_execution_id: ExecutionId, +} + +#[async_trait] +pub trait PubsubRepo: Send + Sync { + /// Fan out a publish to every matching enabled pubsub trigger in + /// `ctx.app_id`, inserting one outbox row each in a SINGLE + /// transaction. `event_payload` is the serialized + /// `TriggerEvent::Pubsub`. Returns the number of delivery rows + /// written (0 when no trigger matched — the publish still succeeds). + async fn fan_out_publish( + &self, + ctx: PublishCtx, + topic: &str, + event_payload: serde_json::Value, + ) -> Result; +} + +pub struct PostgresPubsubRepo { + pool: PgPool, +} + +impl PostgresPubsubRepo { + #[must_use] + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +#[derive(sqlx::FromRow)] +struct PubsubTriggerRow { + id: Uuid, + script_id: Uuid, + topic_pattern: String, +} + +#[async_trait] +impl PubsubRepo for PostgresPubsubRepo { + async fn fan_out_publish( + &self, + ctx: PublishCtx, + topic: &str, + event_payload: serde_json::Value, + ) -> Result { + let mut tx = self.pool.begin().await?; + + // Load all enabled pubsub triggers for the app; filter by topic + // pattern in Rust (keeps the query simple, honours the + // empty/`*`/prefix semantics without teaching SQL about globs). + let rows: Vec = sqlx::query_as( + "SELECT t.id, t.script_id, d.topic_pattern \ + FROM triggers t \ + JOIN pubsub_trigger_details d ON d.trigger_id = t.id \ + WHERE t.app_id = $1 AND t.kind = 'pubsub' AND t.enabled = TRUE", + ) + .bind(ctx.app_id.into_inner()) + .fetch_all(&mut *tx) + .await?; + + let mut written: u32 = 0; + for r in rows { + if !topic_matches(&r.topic_pattern, topic) { + continue; + } + sqlx::query( + "INSERT INTO outbox ( \ + app_id, source_kind, trigger_id, script_id, reply_to, \ + payload, origin_principal, trigger_depth, root_execution_id \ + ) VALUES ($1, 'pubsub', $2, $3, NULL, $4, $5, $6, $7)", + ) + .bind(ctx.app_id.into_inner()) + .bind(r.id) + .bind(r.script_id) + .bind(&event_payload) + .bind(ctx.origin_principal.map(AdminUserId::into_inner)) + .bind(i32::try_from(ctx.trigger_depth.saturating_add(1)).unwrap_or(1)) + .bind(ctx.root_execution_id.into_inner()) + .execute(&mut *tx) + .await?; + written += 1; + } + + // Commit once — all rows or none. + tx.commit().await?; + Ok(written) + } +} diff --git a/crates/manager-core/src/pubsub_service.rs b/crates/manager-core/src/pubsub_service.rs new file mode 100644 index 0000000..71a45d4 --- /dev/null +++ b/crates/manager-core/src/pubsub_service.rs @@ -0,0 +1,320 @@ +//! `PubsubServiceImpl` — wires `PubsubRepo` underneath the +//! `picloud_shared::PubsubService` trait scripts see via the Rhai +//! bridge. +//! +//! Mirrors the other stateful services: script-as-gate authz +//! (`AppPubsubPublish`, skipped when `cx.principal` is `None`), with the +//! backend doing a publish-time outbox fan-out instead of a row write. +//! No `ServiceEventEmitter` here — pub/sub publishes directly to the +//! outbox; it doesn't mutate local data that other triggers observe. + +use std::sync::Arc; + +use async_trait::async_trait; +use picloud_shared::{PubsubError, PubsubService, SdkCallCx, TriggerEvent}; + +use crate::authz::{self, AuthzRepo, Capability}; +use crate::pubsub_repo::{PublishCtx, PubsubRepo, PubsubRepoError}; + +pub struct PubsubServiceImpl { + repo: Arc, + authz: Arc, +} + +impl PubsubServiceImpl { + #[must_use] + pub fn new(repo: Arc, authz: Arc) -> Self { + Self { repo, authz } + } + + async fn check_publish(&self, cx: &SdkCallCx) -> Result<(), PubsubError> { + if let Some(ref principal) = cx.principal { + authz::require( + &*self.authz, + principal, + Capability::AppPubsubPublish(cx.app_id), + ) + .await + .map_err(|_| PubsubError::Forbidden)?; + } + Ok(()) + } +} + +impl From for PubsubError { + fn from(e: PubsubRepoError) -> Self { + Self::Unavailable(e.to_string()) + } +} + +#[async_trait] +impl PubsubService for PubsubServiceImpl { + async fn publish_durable( + &self, + cx: &SdkCallCx, + topic: &str, + message: serde_json::Value, + ) -> Result<(), PubsubError> { + if topic.trim().is_empty() { + return Err(PubsubError::EmptyTopic); + } + self.check_publish(cx).await?; + + // `published_at` is stamped on the manager side so every + // delivery agrees on one instant. + let event = TriggerEvent::Pubsub { + topic: topic.to_string(), + message, + published_at: chrono::Utc::now(), + }; + let payload = serde_json::to_value(&event) + .map_err(|e| PubsubError::Rejected(format!("event serialize: {e}")))?; + + let publish_ctx = PublishCtx { + app_id: cx.app_id, + origin_principal: cx.principal.as_ref().map(|p| p.user_id), + trigger_depth: cx.trigger_depth, + root_execution_id: cx.root_execution_id, + }; + // No matching triggers → 0 rows written, publish still succeeds. + self.repo + .fan_out_publish(publish_ctx, topic, payload) + .await?; + Ok(()) + } +} + +// ---------------------------------------------------------------------------- +// Tests — in-memory PubsubRepo so unit tests don't need Postgres. The +// real transactional fan-out is covered against a live DB by the +// integration suite; the in-memory fake models the all-or-nothing +// commit so the rollback semantics can be asserted without a DB. +// ---------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::authz::{AuthzError, AuthzRepo}; + use async_trait::async_trait; + use picloud_shared::{ + topic_matches, AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, Principal, + RequestId, ScriptId, UserId, + }; + use std::sync::Mutex; + + /// In-memory pubsub repo. Holds a set of `(app, pattern)` + /// subscriptions and records the outbox rows a publish would write. + /// `fail_at` simulates a mid-fan-out INSERT failure: when set to + /// `Some(n)`, the n-th (1-indexed) matching row errors and NOTHING + /// is recorded — modelling the single-transaction rollback. + struct InMemoryPubsubRepo { + subs: Vec<(AppId, String)>, + written: Mutex>, + fail_at: Option, + } + + impl InMemoryPubsubRepo { + fn new(subs: Vec<(AppId, String)>) -> Self { + Self { + subs, + written: Mutex::new(Vec::new()), + fail_at: None, + } + } + fn written_count(&self) -> usize { + self.written.lock().unwrap().len() + } + } + + #[async_trait] + impl PubsubRepo for InMemoryPubsubRepo { + async fn fan_out_publish( + &self, + ctx: PublishCtx, + topic: &str, + _event_payload: serde_json::Value, + ) -> Result { + let matches: Vec<&(AppId, String)> = self + .subs + .iter() + .filter(|(a, pat)| *a == ctx.app_id && topic_matches(pat, topic)) + .collect(); + let mut staged = Vec::new(); + for (i, _) in matches.iter().enumerate() { + if self.fail_at == Some(i + 1) { + // Rollback: nothing was committed. + return Err(PubsubRepoError::Db(sqlx::Error::Protocol( + "simulated insert failure".into(), + ))); + } + staged.push((ctx.app_id, topic.to_string())); + } + let n = staged.len(); + self.written.lock().unwrap().extend(staged); + Ok(u32::try_from(n).unwrap_or(u32::MAX)) + } + } + + #[derive(Default)] + struct DenyingAuthzRepo; + #[async_trait] + impl AuthzRepo for DenyingAuthzRepo { + async fn membership( + &self, + _user_id: UserId, + _app_id: AppId, + ) -> Result, AuthzError> { + Ok(None) + } + } + + #[derive(Default)] + struct EditorAuthzRepo; + #[async_trait] + impl AuthzRepo for EditorAuthzRepo { + async fn membership( + &self, + _user_id: UserId, + _app_id: AppId, + ) -> Result, AuthzError> { + Ok(Some(AppRole::Editor)) + } + } + + fn anon_cx(app_id: AppId) -> SdkCallCx { + SdkCallCx { + app_id, + script_id: ScriptId::new(), + principal: None, + execution_id: ExecutionId::new(), + request_id: RequestId::new(), + trigger_depth: 0, + root_execution_id: ExecutionId::new(), + is_dead_letter_handler: false, + event: None, + } + } + + fn member_cx(app_id: AppId) -> SdkCallCx { + SdkCallCx { + principal: Some(Principal { + user_id: AdminUserId::new(), + instance_role: InstanceRole::Member, + scopes: None, + app_binding: None, + }), + ..anon_cx(app_id) + } + } + + fn svc(repo: Arc, authz: Arc) -> PubsubServiceImpl { + PubsubServiceImpl::new(repo, authz) + } + + #[tokio::test] + async fn publish_writes_one_row_per_matching_trigger() { + let app = AppId::new(); + let repo = Arc::new(InMemoryPubsubRepo::new(vec![ + (app, "user.*".into()), + (app, "user.created".into()), + (app, "order.*".into()), // does not match + ])); + let files = svc(repo.clone(), Arc::new(DenyingAuthzRepo)); + files + .publish_durable(&anon_cx(app), "user.created", serde_json::json!({"id": 1})) + .await + .unwrap(); + // Two of the three subscriptions match "user.created". + assert_eq!(repo.written_count(), 2); + } + + #[tokio::test] + async fn no_matching_trigger_succeeds_silently() { + let app = AppId::new(); + let repo = Arc::new(InMemoryPubsubRepo::new(vec![(app, "order.*".into())])); + let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo)); + svc.publish_durable(&anon_cx(app), "user.created", serde_json::json!(1)) + .await + .unwrap(); + assert_eq!(repo.written_count(), 0); + } + + #[tokio::test] + async fn empty_topic_rejected() { + let app = AppId::new(); + let repo = Arc::new(InMemoryPubsubRepo::new(vec![])); + let svc = svc(repo, Arc::new(DenyingAuthzRepo)); + let err = svc + .publish_durable(&anon_cx(app), " ", serde_json::json!(1)) + .await + .unwrap_err(); + assert!(matches!(err, PubsubError::EmptyTopic)); + } + + #[tokio::test] + async fn cross_app_isolation() { + let app_a = AppId::new(); + let app_b = AppId::new(); + // The only subscription belongs to app B. + let repo = Arc::new(InMemoryPubsubRepo::new(vec![(app_b, "*".into())])); + let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo)); + // App A publishes — app B's trigger must NOT fire. + svc.publish_durable(&anon_cx(app_a), "user.created", serde_json::json!(1)) + .await + .unwrap(); + assert_eq!(repo.written_count(), 0); + } + + #[tokio::test] + async fn fan_out_is_transactional_all_or_nothing() { + let app = AppId::new(); + let mut repo = InMemoryPubsubRepo::new(vec![ + (app, "*".into()), + (app, "user.*".into()), + (app, "user.created".into()), + ]); + repo.fail_at = Some(3); // fail on the 3rd matching insert + let repo = Arc::new(repo); + let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo)); + let err = svc + .publish_durable(&anon_cx(app), "user.created", serde_json::json!(1)) + .await + .unwrap_err(); + assert!(matches!(err, PubsubError::Unavailable(_))); + // Rollback: no partial fan-out survived. + assert_eq!(repo.written_count(), 0); + } + + #[tokio::test] + async fn anonymous_cx_skips_authz() { + let app = AppId::new(); + let repo = Arc::new(InMemoryPubsubRepo::new(vec![])); + let svc = svc(repo, Arc::new(DenyingAuthzRepo)); + // No principal → no authz check even with a denying repo. + svc.publish_durable(&anon_cx(app), "t", serde_json::json!(1)) + .await + .unwrap(); + } + + #[tokio::test] + async fn member_without_role_is_forbidden() { + let app = AppId::new(); + let repo = Arc::new(InMemoryPubsubRepo::new(vec![])); + let svc = svc(repo, Arc::new(DenyingAuthzRepo)); + let err = svc + .publish_durable(&member_cx(app), "t", serde_json::json!(1)) + .await + .unwrap_err(); + assert!(matches!(err, PubsubError::Forbidden)); + } + + #[tokio::test] + async fn member_with_editor_role_allowed() { + let app = AppId::new(); + let repo = Arc::new(InMemoryPubsubRepo::new(vec![])); + let svc = svc(repo, Arc::new(EditorAuthzRepo)); + svc.publish_durable(&member_cx(app), "t", serde_json::json!(1)) + .await + .unwrap(); + } +} diff --git a/crates/manager-core/src/trigger_repo.rs b/crates/manager-core/src/trigger_repo.rs index 9476e55..0b601ae 100644 --- a/crates/manager-core/src/trigger_repo.rs +++ b/crates/manager-core/src/trigger_repo.rs @@ -55,6 +55,8 @@ pub enum TriggerKind { Cron, /// v1.1.5. Files, + /// v1.1.5. + Pubsub, } impl TriggerKind { @@ -66,6 +68,7 @@ impl TriggerKind { Self::DeadLetter => "dead_letter", Self::Cron => "cron", Self::Files => "files", + Self::Pubsub => "pubsub", } } @@ -77,6 +80,7 @@ impl TriggerKind { "dead_letter" => Some(Self::DeadLetter), "cron" => Some(Self::Cron), "files" => Some(Self::Files), + "pubsub" => Some(Self::Pubsub), _ => None, } } @@ -131,6 +135,8 @@ pub enum TriggerDetails { collection_glob: String, ops: Vec, }, + /// v1.1.5. A topic pattern: exact, `.*`, or `*`. + Pubsub { topic_pattern: String }, } /// Create payload for a KV trigger. Defaults applied at the admin @@ -213,6 +219,19 @@ pub struct FilesTriggerMatch { pub registered_by_principal: AdminUserId, } +/// Create payload for a pubsub trigger (v1.1.5). `topic_pattern` is +/// validated (exact / `.*` / `*`) before insert. +#[derive(Debug, Clone)] +pub struct CreatePubsubTrigger { + pub script_id: ScriptId, + pub topic_pattern: String, + pub dispatch_mode: TriggerDispatchMode, + pub retry_max_attempts: u32, + pub retry_backoff: BackoffShape, + pub retry_base_ms: u32, + pub registered_by_principal: AdminUserId, +} + /// One match for the dispatcher's "which KV triggers fire on this /// event" lookup. Carries everything the dispatcher needs to construct /// the outbox row. @@ -287,6 +306,13 @@ pub trait TriggerRepo: Send + Sync { req: CreateFilesTrigger, ) -> Result; + /// v1.1.5. `topic_pattern` is validated before insert. + async fn create_pubsub_trigger( + &self, + app_id: AppId, + req: CreatePubsubTrigger, + ) -> Result; + async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError>; async fn get(&self, id: TriggerId) -> Result, TriggerRepoError>; @@ -675,6 +701,66 @@ impl TriggerRepo for PostgresTriggerRepo { }) } + async fn create_pubsub_trigger( + &self, + app_id: AppId, + req: CreatePubsubTrigger, + ) -> Result { + // Defense-in-depth validation (the admin endpoint validates too). + picloud_shared::validate_topic_pattern(&req.topic_pattern) + .map_err(TriggerRepoError::Invalid)?; + + let mut tx = self.pool.begin().await?; + let parent: TriggerRow = sqlx::query_as( + "INSERT INTO triggers ( \ + app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, retry_base_ms, \ + registered_by_principal \ + ) VALUES ($1, $2, 'pubsub', TRUE, $3, $4, $5, $6, $7) \ + RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, retry_base_ms, \ + registered_by_principal, created_at, updated_at", + ) + .bind(app_id.into_inner()) + .bind(req.script_id.into_inner()) + .bind(req.dispatch_mode.as_str()) + .bind(i32::try_from(req.retry_max_attempts).unwrap_or(3)) + .bind(req.retry_backoff.as_str()) + .bind(i32::try_from(req.retry_base_ms).unwrap_or(1000)) + .bind(req.registered_by_principal.into_inner()) + .fetch_one(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO pubsub_trigger_details (trigger_id, topic_pattern) VALUES ($1, $2)", + ) + .bind(parent.id) + .bind(&req.topic_pattern) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + Ok(Trigger { + id: parent.id.into(), + app_id: parent.app_id.into(), + script_id: parent.script_id.into(), + kind: TriggerKind::Pubsub, + enabled: parent.enabled, + dispatch_mode: dispatch_from_str(&parent.dispatch_mode), + retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3), + retry_backoff: BackoffShape::from_wire(&parent.retry_backoff) + .unwrap_or(BackoffShape::Exponential), + retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000), + registered_by_principal: parent.registered_by_principal.into(), + created_at: parent.created_at, + updated_at: parent.updated_at, + details: TriggerDetails::Pubsub { + topic_pattern: req.topic_pattern, + }, + }) + } + async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError> { let parents: Vec = sqlx::query_as( "SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \ @@ -980,6 +1066,17 @@ async fn hydrate_one(pool: &PgPool, parent: TriggerRow) -> Result { + let row: PubsubDetailRow = sqlx::query_as( + "SELECT topic_pattern FROM pubsub_trigger_details WHERE trigger_id = $1", + ) + .bind(parent.id) + .fetch_one(pool) + .await?; + TriggerDetails::Pubsub { + topic_pattern: row.topic_pattern, + } + } }; Ok(Trigger { @@ -1052,6 +1149,11 @@ struct CronDetailRow { last_fired_at: Option>, } +#[derive(sqlx::FromRow)] +struct PubsubDetailRow { + topic_pattern: String, +} + #[derive(sqlx::FromRow)] #[allow(clippy::struct_field_names)] struct DlDetailRow { diff --git a/crates/manager-core/src/triggers_api.rs b/crates/manager-core/src/triggers_api.rs index fffe695..a9ba63a 100644 --- a/crates/manager-core/src/triggers_api.rs +++ b/crates/manager-core/src/triggers_api.rs @@ -28,7 +28,8 @@ use crate::repo::{ScriptRepository, ScriptRepositoryError}; use crate::trigger_config::{BackoffShape, TriggerConfig}; use crate::trigger_repo::{ CreateCronTrigger, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger, - CreateKvTrigger, Trigger, TriggerDispatchMode, TriggerRepo, TriggerRepoError, + CreateKvTrigger, CreatePubsubTrigger, Trigger, TriggerDispatchMode, TriggerRepo, + TriggerRepoError, }; #[derive(Clone)] @@ -57,6 +58,10 @@ pub fn triggers_router(state: TriggersState) -> Router { .route("/apps/{app_id}/triggers/docs", post(create_docs_trigger)) .route("/apps/{app_id}/triggers/cron", post(create_cron_trigger)) .route("/apps/{app_id}/triggers/files", post(create_files_trigger)) + .route( + "/apps/{app_id}/triggers/pubsub", + post(create_pubsub_trigger), + ) .route( "/apps/{app_id}/triggers/dead_letter", post(create_dl_trigger), @@ -349,6 +354,57 @@ async fn create_cron_trigger( Ok((StatusCode::CREATED, Json(created))) } +/// v1.1.5 pubsub trigger. `topic_pattern` is validated to be exact / +/// `.*` / `*`. +#[derive(Debug, Deserialize)] +pub struct CreatePubsubTriggerRequest { + pub script_id: ScriptId, + pub topic_pattern: String, + #[serde(default = "default_dispatch")] + pub dispatch_mode: TriggerDispatchMode, + #[serde(default)] + pub retry_max_attempts: Option, + #[serde(default)] + pub retry_backoff: Option, + #[serde(default)] + pub retry_base_ms: Option, +} + +async fn create_pubsub_trigger( + State(s): State, + Extension(principal): Extension, + Path(app_id): Path, + Json(input): Json, +) -> Result<(StatusCode, Json), TriggersApiError> { + ensure_app_exists(&*s.apps, app_id).await?; + require( + s.authz.as_ref(), + &principal, + Capability::AppManageTriggers(app_id), + ) + .await?; + + // Validate the topic pattern before touching the script repo so a + // bad pattern fails fast with a clear 422. + picloud_shared::validate_topic_pattern(&input.topic_pattern) + .map_err(TriggersApiError::Invalid)?; + validate_trigger_target(&*s.scripts, app_id, input.script_id).await?; + + let req = CreatePubsubTrigger { + script_id: input.script_id, + topic_pattern: input.topic_pattern, + dispatch_mode: input.dispatch_mode, + retry_max_attempts: input + .retry_max_attempts + .unwrap_or(s.config.retry_max_attempts), + retry_backoff: input.retry_backoff.unwrap_or(s.config.retry_backoff), + retry_base_ms: input.retry_base_ms.unwrap_or(s.config.retry_base_ms), + registered_by_principal: principal.user_id, + }; + let created = s.triggers.create_pubsub_trigger(app_id, req).await?; + Ok((StatusCode::CREATED, Json(created))) +} + async fn create_files_trigger( State(s): State, Extension(principal): Extension, @@ -542,8 +598,9 @@ mod tests { use super::*; use crate::app_repo::{AppLookup, AppRepository}; use crate::trigger_repo::{ - CreateCronTrigger, CreateFilesTrigger, DeadLetterTriggerMatch, DocsTriggerMatch, - FilesTriggerMatch, KvTriggerMatch, Trigger, TriggerDetails, TriggerRepo, TriggerRepoError, + CreateCronTrigger, CreateFilesTrigger, CreatePubsubTrigger, DeadLetterTriggerMatch, + DocsTriggerMatch, FilesTriggerMatch, KvTriggerMatch, Trigger, TriggerDetails, TriggerRepo, + TriggerRepoError, }; use async_trait::async_trait; use chrono::Utc; @@ -703,6 +760,33 @@ mod tests { self.inner.lock().await.insert(id, trigger.clone()); Ok(trigger) } + async fn create_pubsub_trigger( + &self, + app_id: AppId, + req: CreatePubsubTrigger, + ) -> Result { + let now = Utc::now(); + let id = TriggerId::new(); + let trigger = Trigger { + id, + app_id, + script_id: req.script_id, + kind: crate::trigger_repo::TriggerKind::Pubsub, + enabled: true, + dispatch_mode: req.dispatch_mode, + retry_max_attempts: req.retry_max_attempts, + retry_backoff: req.retry_backoff, + retry_base_ms: req.retry_base_ms, + registered_by_principal: req.registered_by_principal, + created_at: now, + updated_at: now, + details: TriggerDetails::Pubsub { + topic_pattern: req.topic_pattern, + }, + }; + self.inner.lock().await.insert(id, trigger.clone()); + Ok(trigger) + } async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError> { Ok(self .inner diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs index 55197e6..8e05ff4 100644 --- a/crates/picloud/src/lib.rs +++ b/crates/picloud/src/lib.rs @@ -22,9 +22,9 @@ use picloud_manager_core::{ PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository, PostgresAppRepository, PostgresDeadLetterRepo, PostgresDeadLetterService, PostgresDocsRepo, PostgresExecutionLogRepository, PostgresExecutionLogSink, PostgresKvRepo, PostgresOutboxRepo, - PostgresRouteRepository, PostgresScriptRepository, PostgresTriggerRepo, PrincipalResolver, - RepoResolver, RouteAdminState, RouteRepository, SandboxCeiling, ScriptRepository, - TriggerConfig, TriggerRepo, TriggersState, + PostgresPubsubRepo, PostgresRouteRepository, PostgresScriptRepository, PostgresTriggerRepo, + PrincipalResolver, PubsubServiceImpl, RepoResolver, RouteAdminState, RouteRepository, + SandboxCeiling, ScriptRepository, TriggerConfig, TriggerRepo, TriggersState, }; use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable}; use picloud_orchestrator_core::{ @@ -33,8 +33,8 @@ use picloud_orchestrator_core::{ }; use picloud_shared::{ DeadLetterService, DocsService, ExecutionLogSink, FilesService, HttpService, InboxResolver, - KvService, OutboxWriter, ScriptValidator, ServiceEventEmitter, Services, API_VERSION, - PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION, + KvService, OutboxWriter, PubsubService, ScriptValidator, ServiceEventEmitter, Services, + API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION, }; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; @@ -169,7 +169,22 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { events.clone(), files_max_size, )); - let services = Services::new(kv, docs, dl_service.clone(), events, modules, http, files); + // v1.1.5 durable pub/sub. Publishes fan out to matching pubsub + // triggers at publish time (one outbox row each), delivered by the + // same dispatcher as every other async trigger. + let pubsub_repo = Arc::new(PostgresPubsubRepo::new(pool.clone())); + let pubsub: Arc = + Arc::new(PubsubServiceImpl::new(pubsub_repo, authz.clone())); + let services = Services::new( + kv, + docs, + dl_service.clone(), + events, + modules, + http, + files, + pubsub, + ); let engine = Arc::new(Engine::new(Limits::default(), services)); // Compile the routes table once at startup; admin writes refresh it. diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs index aee6c12..64fab48 100644 --- a/crates/shared/src/lib.rs +++ b/crates/shared/src/lib.rs @@ -20,6 +20,7 @@ pub mod kv; pub mod log_sink; pub mod modules; pub mod outbox_writer; +pub mod pubsub; pub mod route; pub mod sandbox; pub mod script; @@ -50,6 +51,9 @@ pub use kv::{KvError, KvListPage, KvService, NoopKvService}; pub use log_sink::{ExecutionLogSink, LogSinkError}; pub use modules::{ModuleScript, ModuleSource, ModuleSourceError, NoopModuleSource}; pub use outbox_writer::{HttpDispatchPayload, NewHttpOutbox, OutboxWriter, OutboxWriterError}; +pub use pubsub::{ + topic_matches, validate_topic_pattern, NoopPubsubService, PubsubError, PubsubService, +}; pub use route::{DispatchMode, HostKind, PathKind, Route}; pub use sandbox::ScriptSandbox; pub use script::{Script, ScriptKind}; diff --git a/crates/shared/src/pubsub.rs b/crates/shared/src/pubsub.rs new file mode 100644 index 0000000..2f78d60 --- /dev/null +++ b/crates/shared/src/pubsub.rs @@ -0,0 +1,161 @@ +//! `PubsubService` — the v1.1.5 durable pub/sub contract. +//! +//! `pubsub::publish_durable(topic, message)` writes to the universal +//! outbox; the publish-time fan-out inserts one delivery row per +//! matching `pubsub` trigger, and each delivery retries / dead-letters +//! independently (the dispatcher already handles one-row-equals-one- +//! dispatch — no dispatcher changes for pub/sub). +//! +//! `publish_ephemeral` is committed as a v1.2 addition — the suffix +//! naming exists now so users learn "durable by default" from day one. +//! +//! Topic pattern matching runs in Rust (not SQL) so the trigger-select +//! query stays simple. The matcher + validator live here in +//! `picloud-shared` so the manager-core publish path, the admin trigger +//! endpoint, and tests all agree on the rules. + +use async_trait::async_trait; +use thiserror::Error; + +use crate::SdkCallCx; + +#[async_trait] +pub trait PubsubService: Send + Sync { + /// Durable publish: writes the message to the outbox, fanned out to + /// every matching enabled `pubsub` trigger in `cx.app_id`. Succeeds + /// silently (zero rows written) when no trigger matches the topic. + async fn publish_durable( + &self, + cx: &SdkCallCx, + topic: &str, + message: serde_json::Value, + ) -> Result<(), PubsubError>; +} + +#[derive(Debug, Error)] +pub enum PubsubError { + /// Empty topic; rejected at the SDK boundary. + #[error("topic must not be empty")] + EmptyTopic, + + /// Caller principal lacked the required capability. Only raised when + /// `cx.principal.is_some()` (script-as-gate; public HTTP skips it). + #[error("forbidden")] + Forbidden, + + /// Serialization / validation failure on the message. + #[error("pubsub rejected: {0}")] + Rejected(String), + + /// Anything else — Postgres unavailable, etc. + #[error("pubsub backend error: {0}")] + Unavailable(String), +} + +/// Match a stored `topic_pattern` against a published `topic`. +/// +/// - `"*"` matches every topic. +/// - `".*"` matches any topic starting with `"."`. +/// - anything else is an exact match. +/// +/// Mid-pattern wildcards (`*.created`, `a.*.b`) are NOT supported — they +/// are rejected at trigger creation by [`validate_topic_pattern`], so +/// the only patterns reaching this matcher are exact / prefix / `*`. +#[must_use] +pub fn topic_matches(pattern: &str, topic: &str) -> bool { + if pattern == "*" { + return true; + } + if let Some(prefix) = pattern.strip_suffix('*') { + // `prefix` retains the trailing '.', e.g. "user." for "user.*". + return topic.starts_with(prefix); + } + pattern == topic +} + +/// Validate a subscription topic pattern. Accepts exactly: `"*"` +/// (universal), `".*"` (prefix wildcard, single trailing star), +/// or a literal with no `*` (exact). Everything else — mid-pattern +/// wildcards, multiple stars, a star not at the end — is rejected. +/// +/// # Errors +/// +/// Returns `Err(message)` with `"unsupported pubsub topic pattern: …"` +/// for any unsupported shape (or an empty pattern). +pub fn validate_topic_pattern(pattern: &str) -> Result<(), String> { + if pattern.is_empty() { + return Err("unsupported pubsub topic pattern: ".to_string()); + } + if pattern == "*" { + return Ok(()); + } + let stars = pattern.matches('*').count(); + if stars == 0 { + return Ok(()); // exact + } + if stars == 1 && pattern.ends_with(".*") { + return Ok(()); // prefix wildcard + } + Err(format!("unsupported pubsub topic pattern: {pattern}")) +} + +/// Stub for the test harness so executor-core integration tests can +/// build a `Services` bundle without a database. Every call errors. +#[derive(Debug, Default, Clone, Copy)] +pub struct NoopPubsubService; + +#[async_trait] +impl PubsubService for NoopPubsubService { + async fn publish_durable( + &self, + _cx: &SdkCallCx, + _topic: &str, + _message: serde_json::Value, + ) -> Result<(), PubsubError> { + Err(PubsubError::Unavailable("pubsub is not wired in".into())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn exact_match() { + assert!(topic_matches("user.created", "user.created")); + assert!(!topic_matches("user.created", "user.deleted")); + assert!(!topic_matches("user.created", "user.created.x")); + } + + #[test] + fn prefix_wildcard() { + assert!(topic_matches("user.*", "user.created")); + assert!(topic_matches("user.*", "user.deleted")); + assert!(!topic_matches("user.*", "users.created")); + assert!(!topic_matches("user.*", "order.created")); + } + + #[test] + fn universal() { + assert!(topic_matches("*", "anything")); + assert!(topic_matches("*", "a.b.c")); + } + + #[test] + fn validation_accepts_supported_shapes() { + assert!(validate_topic_pattern("*").is_ok()); + assert!(validate_topic_pattern("user.created").is_ok()); + assert!(validate_topic_pattern("user.*").is_ok()); + assert!(validate_topic_pattern("a.b.c").is_ok()); + } + + #[test] + fn validation_rejects_unsupported_shapes() { + for bad in ["*.created", "**", "a.*.b", "user.*x", "*user", ""] { + assert!( + validate_topic_pattern(bad).is_err(), + "expected {bad:?} to be rejected" + ); + } + } +} diff --git a/crates/shared/src/services.rs b/crates/shared/src/services.rs index 97328ab..2f41f65 100644 --- a/crates/shared/src/services.rs +++ b/crates/shared/src/services.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use crate::{ DeadLetterService, DocsService, FilesService, HttpService, KvService, ModuleSource, NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopFilesService, NoopHttpService, - NoopKvService, NoopModuleSource, ServiceEventEmitter, + NoopKvService, NoopModuleSource, NoopPubsubService, PubsubService, ServiceEventEmitter, }; /// SDK service bundle. See module docs for the lifecycle and the v1.1.x @@ -67,6 +67,12 @@ pub struct Services { /// picloud binary; `NoopFilesService` in tests that don't touch /// files. pub files: Arc, + + /// Durable pub/sub (v1.1.5). Scripts get + /// `pubsub::publish_durable(topic, message)`. Backed by a + /// publish-time outbox fan-out in the picloud binary; + /// `NoopPubsubService` in tests that don't publish. + pub pubsub: Arc, } impl Services { @@ -74,6 +80,7 @@ impl Services { /// The picloud binary's `main` wires this up after the DB pool is /// open; tests build it from in-memory fakes. #[must_use] + #[allow(clippy::too_many_arguments)] // one Arc per stateful service; a builder would just move the noise pub fn new( kv: Arc, docs: Arc, @@ -82,6 +89,7 @@ impl Services { modules: Arc, http: Arc, files: Arc, + pubsub: Arc, ) -> Self { Self { kv, @@ -91,6 +99,7 @@ impl Services { modules, http, files, + pubsub, } } @@ -109,6 +118,7 @@ impl Services { Arc::new(NoopModuleSource), Arc::new(NoopHttpService), Arc::new(NoopFilesService), + Arc::new(NoopPubsubService), ) } } diff --git a/crates/shared/src/trigger_event.rs b/crates/shared/src/trigger_event.rs index 0b399b9..ac79813 100644 --- a/crates/shared/src/trigger_event.rs +++ b/crates/shared/src/trigger_event.rs @@ -177,6 +177,15 @@ pub enum TriggerEvent { prev: Option, }, + /// A durable pub/sub publish fired this handler. v1.1.5. Carries + /// the topic, the JSON-decoded message, and the publish instant. + /// Surfaced to scripts as `ctx.event.pubsub`. + Pubsub { + topic: String, + message: serde_json::Value, + published_at: DateTime, + }, + /// A dead-letter row fired this handler. The original event is /// nested verbatim plus the dead-letter metadata the design notes /// §4 require. @@ -203,6 +212,7 @@ impl TriggerEvent { Self::Docs { .. } => "docs", Self::Cron { .. } => "cron", Self::Files { .. } => "files", + Self::Pubsub { .. } => "pubsub", Self::DeadLetter { .. } => "dead_letter", } } diff --git a/dashboard/src/lib/api.ts b/dashboard/src/lib/api.ts index d9e1108..2540c4f 100644 --- a/dashboard/src/lib/api.ts +++ b/dashboard/src/lib/api.ts @@ -261,6 +261,15 @@ export interface CreateCronTriggerInput { retry_base_ms?: number; } +export interface CreatePubsubTriggerInput { + script_id: string; + topic_pattern: string; + dispatch_mode?: TriggerDispatchMode; + retry_max_attempts?: number; + retry_backoff?: 'exponential' | 'linear' | 'constant'; + retry_base_ms?: number; +} + export interface ExecutionResult { status: number; headers: Record; @@ -632,6 +641,11 @@ export const api = { `/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/cron`, { method: 'POST', body: JSON.stringify(input) } ), + createPubsub: (idOrSlug: string, input: CreatePubsubTriggerInput) => + adminRequest( + `/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/pubsub`, + { method: 'POST', body: JSON.stringify(input) } + ), remove: (idOrSlug: string, triggerId: string) => adminRequest( `/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/${triggerId}`, diff --git a/dashboard/src/routes/apps/[slug]/+page.svelte b/dashboard/src/routes/apps/[slug]/+page.svelte index 77203a9..3c83274 100644 --- a/dashboard/src/routes/apps/[slug]/+page.svelte +++ b/dashboard/src/routes/apps/[slug]/+page.svelte @@ -118,6 +118,11 @@ let createCronTimezone = $state('UTC'); let creatingCron = $state(false); let createCronError = $state(null); + // Pub/Sub triggers (v1.1.5). + let createPubsubScriptId = $state(''); + let createPubsubTopic = $state(''); + let creatingPubsub = $state(false); + let createPubsubError = $state(null); let triggerToRemove = $state(null); let removingTrigger = $state(false); // Endpoint scripts only — modules can't be trigger targets. @@ -153,6 +158,27 @@ } } + async function submitCreatePubsub(e: SubmitEvent) { + e.preventDefault(); + if (!app) return; + creatingPubsub = true; + createPubsubError = null; + try { + await api.triggers.createPubsub(app.id, { + script_id: createPubsubScriptId, + topic_pattern: createPubsubTopic.trim() + }); + createPubsubScriptId = ''; + createPubsubTopic = ''; + await loadTriggers(app.id); + } catch (err) { + createPubsubError = + err instanceof ApiError ? err.message : err instanceof Error ? err.message : String(err); + } finally { + creatingPubsub = false; + } + } + async function confirmRemoveTrigger() { if (!app || !triggerToRemove) return; removingTrigger = true; @@ -843,6 +869,42 @@ +

Pub/Sub triggers

+

+ Subscribe an endpoint script to durable pub/sub messages. Topic + patterns are an exact topic (user.created), a prefix + wildcard (user.*), or * for every topic. +

+ +
+
+ + +
+ {#if createPubsubError} +
{createPubsubError}
+ {/if} +
+ +
+
+ {#if triggers.length === 0}

No triggers in this app yet.

{:else} @@ -857,9 +919,11 @@ last fired: {t.details.last_fired_at ?? 'never'} - {:else if t.details.kind === 'kv' || t.details.kind === 'docs'} + {:else if t.details.kind === 'kv' || t.details.kind === 'docs' || t.details.kind === 'files'} {t.details.collection_glob} — {t.details.ops.join(', ') || 'any op'} + {:else if t.details.kind === 'pubsub'} + {t.details.topic_pattern} {/if} → {t.script_id}