feat(v1.1.5): pubsub::publish_durable SDK + pubsub:* triggers

Durable pub/sub through the universal outbox — the sixth trigger kind.

- `pubsub::publish_durable(topic, message)` Rhai SDK (no handle; topics
  ARE the grouping unit). Message JSON-encoded; Blobs base64 at any
  depth.
- `PubsubService` trait in picloud-shared with the topic matcher +
  validator (exact / `<prefix>.*` / `*`; mid-pattern wildcards
  rejected). `PostgresPubsubRepo` + `PubsubServiceImpl` in manager-core.
- Publish-time fan-out: one outbox row per matching enabled pubsub
  trigger, all in ONE transaction (no half-fan-out on crash). No
  matching trigger → publish succeeds silently, zero rows.
- `pubsub:*` trigger kind via Layout-E (0020: widen both CHECKs +
  pubsub_trigger_details + partial index), TriggerEvent::Pubsub +
  ctx.event.pubsub, dispatcher arm, admin endpoint POST /triggers/pubsub
  (validates topic pattern + reuses validate_trigger_target).
- AppPubsubPublish capability → script:write (seven-scope held).
- Dashboard Pub/Sub trigger form on the Triggers tab + list rendering.

publish_ephemeral stays deferred to v1.2. ~18 new tests (service
in-memory incl. transactional-rollback, shared matcher, bridge
encoding). No DB required for the suite.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-06-03 21:37:06 +02:00
parent 6e132b6ee0
commit 834c787ee1
25 changed files with 1240 additions and 16 deletions

View File

@@ -434,6 +434,20 @@ fn trigger_event_to_dynamic(event: &TriggerEvent) -> Dynamic {
); );
m.insert("files".into(), files_map.into()); m.insert("files".into(), files_map.into());
} }
TriggerEvent::Pubsub {
topic,
message,
published_at,
} => {
// `ctx.event.op` is always "publish" for pub/sub (the only
// op a publish produces).
m.insert("op".into(), "publish".into());
let mut ps = Map::new();
ps.insert("topic".into(), topic.clone().into());
ps.insert("message".into(), json_to_dynamic(message.clone()));
ps.insert("published_at".into(), published_at.to_rfc3339().into());
m.insert("pubsub".into(), ps.into());
}
TriggerEvent::DeadLetter { TriggerEvent::DeadLetter {
dead_letter_id, dead_letter_id,
original, original,

View File

@@ -18,6 +18,7 @@ pub mod docs;
pub mod files; pub mod files;
pub mod http; pub mod http;
pub mod kv; pub mod kv;
pub mod pubsub;
pub mod stdlib; pub mod stdlib;
pub use bridge::{dynamic_to_json, json_to_dynamic}; pub use bridge::{dynamic_to_json, json_to_dynamic};
@@ -39,5 +40,6 @@ pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCal
docs::register(engine, services, cx.clone()); docs::register(engine, services, cx.clone());
dead_letters::register(engine, services, cx.clone()); dead_letters::register(engine, services, cx.clone());
http::register(engine, services, cx.clone()); http::register(engine, services, cx.clone());
files::register(engine, services, cx); files::register(engine, services, cx.clone());
pubsub::register(engine, services, cx);
} }

View File

@@ -0,0 +1,100 @@
//! `pubsub::` Rhai bridge — durable publish (v1.1.5).
//!
//! ```rhai
//! pubsub::publish_durable("user.created", #{ user_id: "abc" });
//! pubsub::publish_durable("metric", 42);
//! ```
//!
//! No handle pattern (topics ARE the grouping unit, so there's no
//! `::collection(...)`). The message is any JSON-serializable Rhai value
//! — Maps, Arrays, strings, numbers, bools, unit, and **Blobs (which
//! encode as base64 strings** so trigger handlers see them as base64 on
//! the wire). Nested blobs are encoded at any depth.
//!
//! `app_id` is derived from `cx.app_id` in the service — it never
//! appears in the script-side signature, preserving cross-app
//! isolation.
use std::sync::Arc;
use base64::engine::general_purpose::STANDARD;
use base64::Engine as _;
use picloud_shared::{PubsubError, SdkCallCx, Services};
use rhai::{Array, Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module};
use serde_json::Value as Json;
use tokio::runtime::Handle as TokioHandle;
pub(super) fn register(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCallCx>) {
let svc = services.pubsub.clone();
let mut module = Module::new();
{
let svc = svc.clone();
let cx = cx.clone();
module.set_native_fn(
"publish_durable",
move |topic: &str, message: Dynamic| -> Result<(), Box<EvalAltResult>> {
let json = message_to_json(&message);
let svc = svc.clone();
let cx = cx.clone();
block_on(async move { svc.publish_durable(&cx, topic, json).await })
},
);
}
engine.register_static_module("pubsub", module.into());
}
/// Convert a Rhai `Dynamic` message into JSON, base64-encoding any
/// `Blob` (at any nesting depth). Mirrors `bridge::dynamic_to_json` but
/// adds the blob arm the pub/sub wire contract requires.
fn message_to_json(value: &Dynamic) -> Json {
// Blob must be checked before the generic array path (a Blob is a
// `Vec<u8>`, distinct from a Rhai `Array`).
if value.is_blob() {
let blob = value.clone().into_blob().unwrap_or_default();
return Json::String(STANDARD.encode(&blob));
}
if value.is_unit() {
return Json::Null;
}
if let Ok(b) = value.as_bool() {
return Json::Bool(b);
}
if let Ok(i) = value.as_int() {
return Json::Number(i.into());
}
if let Ok(f) = value.as_float() {
return serde_json::Number::from_f64(f).map_or(Json::Null, Json::Number);
}
if value.is_string() {
return Json::String(value.clone().into_string().unwrap_or_default());
}
if let Some(arr) = value.clone().try_cast::<Array>() {
return Json::Array(arr.iter().map(message_to_json).collect());
}
if let Some(map) = value.clone().try_cast::<Map>() {
let mut out = serde_json::Map::new();
for (k, v) in map {
out.insert(k.to_string(), message_to_json(&v));
}
return Json::Object(out);
}
Json::String(value.to_string())
}
/// Run an async future inside the synchronous Rhai context. Mirrors
/// `kv::block_on`.
fn block_on<F>(fut: F) -> Result<(), Box<EvalAltResult>>
where
F: std::future::Future<Output = Result<(), PubsubError>> + Send,
{
let handle = TokioHandle::try_current().map_err(|e| -> Box<EvalAltResult> {
EvalAltResult::ErrorRuntime(
format!("pubsub: no tokio runtime available: {e}").into(),
rhai::Position::NONE,
)
.into()
})?;
handle.block_on(fut).map_err(|err| -> Box<EvalAltResult> {
EvalAltResult::ErrorRuntime(format!("pubsub: {err}").into(), rhai::Position::NONE).into()
})
}

View File

@@ -100,6 +100,7 @@ async fn original_backend_error_is_logged_at_error_level() {
Arc::new(FailingSource), Arc::new(FailingSource),
Arc::new(NoopHttpService), Arc::new(NoopHttpService),
Arc::new(picloud_shared::NoopFilesService), Arc::new(picloud_shared::NoopFilesService),
Arc::new(picloud_shared::NoopPubsubService),
); );
let engine = Engine::new(Limits::default(), services); let engine = Engine::new(Limits::default(), services);

View File

@@ -98,6 +98,7 @@ fn services_with(modules: Arc<dyn ModuleSource>) -> Services {
modules, modules,
Arc::new(NoopHttpService), Arc::new(NoopHttpService),
Arc::new(picloud_shared::NoopFilesService), Arc::new(picloud_shared::NoopFilesService),
Arc::new(picloud_shared::NoopPubsubService),
) )
} }

View File

@@ -229,6 +229,7 @@ fn make_engine() -> Arc<Engine> {
Arc::new(NoopModuleSource), Arc::new(NoopModuleSource),
Arc::new(NoopHttpService), Arc::new(NoopHttpService),
Arc::new(picloud_shared::NoopFilesService), Arc::new(picloud_shared::NoopFilesService),
Arc::new(picloud_shared::NoopPubsubService),
); );
Arc::new(Engine::new(Limits::default(), services)) Arc::new(Engine::new(Limits::default(), services))
} }

View File

@@ -166,6 +166,7 @@ fn make_engine() -> Arc<Engine> {
Arc::new(NoopModuleSource), Arc::new(NoopModuleSource),
Arc::new(NoopHttpService), Arc::new(NoopHttpService),
Arc::new(InMemoryFiles::default()), Arc::new(InMemoryFiles::default()),
Arc::new(picloud_shared::NoopPubsubService),
); );
Arc::new(Engine::new(Limits::default(), services)) Arc::new(Engine::new(Limits::default(), services))
} }

View File

@@ -89,6 +89,7 @@ fn engine_with(http: Arc<dyn HttpService>) -> Arc<Engine> {
Arc::new(NoopModuleSource), Arc::new(NoopModuleSource),
http, http,
Arc::new(picloud_shared::NoopFilesService), Arc::new(picloud_shared::NoopFilesService),
Arc::new(picloud_shared::NoopPubsubService),
); );
Arc::new(Engine::new(Limits::default(), services)) Arc::new(Engine::new(Limits::default(), services))
} }

View File

@@ -108,6 +108,7 @@ fn make_engine() -> Arc<Engine> {
Arc::new(NoopModuleSource), Arc::new(NoopModuleSource),
Arc::new(NoopHttpService), Arc::new(NoopHttpService),
Arc::new(picloud_shared::NoopFilesService), Arc::new(picloud_shared::NoopFilesService),
Arc::new(picloud_shared::NoopPubsubService),
); );
Arc::new(Engine::new(Limits::default(), services)) Arc::new(Engine::new(Limits::default(), services))
} }

View File

@@ -0,0 +1,157 @@
//! `pubsub::` SDK bridge integration tests — runs a real Rhai engine
//! against an in-memory `PubsubService` that records the published
//! `(topic, message)`. Verifies the message JSON encoding the wire
//! contract requires: Maps, Arrays, strings, numbers, bool, null, and
//! **Blob → base64**, including nesting.
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits};
use picloud_shared::{
AppId, ExecutionId, NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopFilesService,
NoopHttpService, NoopKvService, NoopModuleSource, PubsubError, PubsubService, RequestId,
ScriptId, ScriptSandbox, SdkCallCx, Services,
};
use serde_json::{json, Value};
#[derive(Default)]
struct RecordingPubsub {
last: Mutex<Option<(String, Value)>>,
}
#[async_trait]
impl PubsubService for RecordingPubsub {
async fn publish_durable(
&self,
_cx: &SdkCallCx,
topic: &str,
message: Value,
) -> Result<(), PubsubError> {
if topic.trim().is_empty() {
return Err(PubsubError::EmptyTopic);
}
*self.last.lock().unwrap() = Some((topic.to_string(), message));
Ok(())
}
}
fn make_engine(svc: Arc<RecordingPubsub>) -> Arc<Engine> {
let services = Services::new(
Arc::new(NoopKvService),
Arc::new(NoopDocsService),
Arc::new(NoopDeadLetterService),
Arc::new(NoopEventEmitter),
Arc::new(NoopModuleSource),
Arc::new(NoopHttpService),
Arc::new(NoopFilesService),
svc,
);
Arc::new(Engine::new(Limits::default(), services))
}
fn baseline_request(app_id: AppId) -> ExecRequest {
let execution_id = ExecutionId::new();
ExecRequest {
execution_id,
request_id: RequestId::new(),
script_id: ScriptId::new(),
script_name: "pubsub-test".into(),
invocation_type: InvocationType::Http,
path: "/pubsub-test".into(),
headers: BTreeMap::new(),
body: Value::Null,
params: BTreeMap::new(),
query: BTreeMap::new(),
rest: String::new(),
sandbox_overrides: ScriptSandbox::default(),
app_id,
principal: None,
trigger_depth: 0,
root_execution_id: execution_id,
is_dead_letter_handler: false,
event: None,
}
}
async fn run(engine: Arc<Engine>, src: &str, req: ExecRequest) {
let src = src.to_string();
tokio::task::spawn_blocking(move || engine.execute(&src, req))
.await
.expect("spawn_blocking should not panic")
.expect("script execution should succeed");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_map_message() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
run(
engine,
r#"pubsub::publish_durable("user.created", #{ user_id: "abc", n: 7, ok: true });"#,
baseline_request(AppId::new()),
)
.await;
let (topic, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(topic, "user.created");
assert_eq!(msg, json!({ "user_id": "abc", "n": 7, "ok": true }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_scalar_and_array_and_null() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
run(
engine,
r#"pubsub::publish_durable("a", [1, "two", false, ()]);"#,
baseline_request(AppId::new()),
)
.await;
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(msg, json!([1, "two", false, null]));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_number_scalar() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
run(
engine,
r#"pubsub::publish_durable("metric", 42);"#,
baseline_request(AppId::new()),
)
.await;
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(msg, json!(42));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_blob_encodes_base64_including_nested() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
// base64("hello") = "aGVsbG8=" (STANDARD, padded).
run(
engine,
r#"
let data = base64::decode("aGVsbG8=");
pubsub::publish_durable("blobs", #{ raw: data, list: [data] });
"#,
baseline_request(AppId::new()),
)
.await;
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(msg, json!({ "raw": "aGVsbG8=", "list": ["aGVsbG8="] }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_empty_topic_throws() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
let src = r#"pubsub::publish_durable("", 1);"#.to_string();
let req = baseline_request(AppId::new());
let res = tokio::task::spawn_blocking(move || engine.execute(&src, req))
.await
.expect("spawn_blocking should not panic");
assert!(res.is_err(), "empty topic should throw");
}

View File

@@ -0,0 +1,34 @@
-- v1.1.5: extend the triggers framework to recognise `pubsub` as the
-- sixth concrete kind. Same Layout-E shape as files (0019): two CHECK
-- constraints widen, one new detail table.
--
-- Pub/sub fans out at PUBLISH time (one outbox row per matching trigger,
-- written by the PubsubServiceImpl), so the dispatcher needs no pubsub-
-- specific branching — a pubsub outbox row dispatches like any other
-- async trigger.
-- Extend triggers.kind to include 'pubsub'.
ALTER TABLE triggers DROP CONSTRAINT triggers_kind_check;
ALTER TABLE triggers ADD CONSTRAINT triggers_kind_check
CHECK (kind IN ('kv', 'dead_letter', 'docs', 'cron', 'files', 'pubsub'));
-- Extend outbox.source_kind to include 'pubsub'.
ALTER TABLE outbox DROP CONSTRAINT outbox_source_kind_check;
ALTER TABLE outbox ADD CONSTRAINT outbox_source_kind_check
CHECK (source_kind IN ('http', 'kv', 'dead_letter', 'docs',
'cron', 'files', 'pubsub'));
-- One row per pubsub trigger. `topic_pattern` is "exact", "prefix.*",
-- or "*" — validated in Rust at trigger creation. Topics are implicit
-- on first publish; the external-subscribable `topics` table is v1.1.6.
CREATE TABLE pubsub_trigger_details (
trigger_id UUID PRIMARY KEY REFERENCES triggers(id) ON DELETE CASCADE,
topic_pattern TEXT NOT NULL
);
-- Hot lookup for fan-out: "all enabled pubsub triggers in app X".
-- Third partial index of its kind (after v1.1.1's idx_triggers_app_kind_
-- enabled); partial indexes are tiny and the planner picks the narrowest.
CREATE INDEX idx_triggers_app_pubsub_enabled
ON triggers (app_id, kind)
WHERE enabled = TRUE AND kind = 'pubsub';

View File

@@ -167,7 +167,8 @@ impl Dispatcher {
| OutboxSourceKind::Docs | OutboxSourceKind::Docs
| OutboxSourceKind::DeadLetter | OutboxSourceKind::DeadLetter
| OutboxSourceKind::Cron | OutboxSourceKind::Cron
| OutboxSourceKind::Files => { | OutboxSourceKind::Files
| OutboxSourceKind::Pubsub => {
let resolved = self.resolve_trigger(&row).await?; let resolved = self.resolve_trigger(&row).await?;
let req = match self.build_exec_request(&row, &resolved).await { let req = match self.build_exec_request(&row, &resolved).await {
Ok(req) => req, Ok(req) => req,

View File

@@ -43,6 +43,8 @@ pub mod module_source;
pub mod outbox_event_emitter; pub mod outbox_event_emitter;
pub mod outbox_repo; pub mod outbox_repo;
pub mod principal_resolver; pub mod principal_resolver;
pub mod pubsub_repo;
pub mod pubsub_service;
pub mod repo; pub mod repo;
pub mod route_admin; pub mod route_admin;
pub mod route_repo; pub mod route_repo;
@@ -113,6 +115,8 @@ pub use outbox_repo::{
NewOutboxRow, OutboxRepo, OutboxRepoError, OutboxRow, OutboxSourceKind, PostgresOutboxRepo, NewOutboxRow, OutboxRepo, OutboxRepoError, OutboxRow, OutboxSourceKind, PostgresOutboxRepo,
}; };
pub use principal_resolver::{AdminPrincipalResolver, PrincipalResolver, PrincipalResolverError}; pub use principal_resolver::{AdminPrincipalResolver, PrincipalResolver, PrincipalResolverError};
pub use pubsub_repo::{PostgresPubsubRepo, PublishCtx, PubsubRepo, PubsubRepoError};
pub use pubsub_service::PubsubServiceImpl;
pub use repo::{ pub use repo::{
ExecutionLogRepository, NewScript, PostgresExecutionLogRepository, PostgresScriptRepository, ExecutionLogRepository, NewScript, PostgresExecutionLogRepository, PostgresScriptRepository,
RepoResolver, ScriptPatch, ScriptRepository, ScriptRepositoryError, RepoResolver, ScriptPatch, ScriptRepository, ScriptRepositoryError,
@@ -123,8 +127,8 @@ pub use sandbox::{CeilingError, SandboxCeiling};
pub use trigger_config::{BackoffShape, TriggerConfig}; pub use trigger_config::{BackoffShape, TriggerConfig};
pub use trigger_repo::{ pub use trigger_repo::{
collection_matches, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger, collection_matches, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger,
CreateKvTrigger, DeadLetterTriggerMatch, DocsTriggerMatch, FilesTriggerMatch, KvTriggerMatch, CreateKvTrigger, CreatePubsubTrigger, DeadLetterTriggerMatch, DocsTriggerMatch,
PostgresTriggerRepo, Trigger, TriggerDetails, TriggerDispatchMode, TriggerKind, TriggerRepo, FilesTriggerMatch, KvTriggerMatch, PostgresTriggerRepo, Trigger, TriggerDetails,
TriggerRepoError, TriggerDispatchMode, TriggerKind, TriggerRepo, TriggerRepoError,
}; };
pub use triggers_api::{triggers_router, TriggersApiError, TriggersState}; pub use triggers_api::{triggers_router, TriggersApiError, TriggersState};

View File

@@ -29,6 +29,8 @@ pub enum OutboxSourceKind {
Cron, Cron,
/// v1.1.5. /// v1.1.5.
Files, Files,
/// v1.1.5.
Pubsub,
} }
impl OutboxSourceKind { impl OutboxSourceKind {
@@ -41,6 +43,7 @@ impl OutboxSourceKind {
Self::DeadLetter => "dead_letter", Self::DeadLetter => "dead_letter",
Self::Cron => "cron", Self::Cron => "cron",
Self::Files => "files", Self::Files => "files",
Self::Pubsub => "pubsub",
} }
} }
@@ -53,6 +56,7 @@ impl OutboxSourceKind {
"dead_letter" => Some(Self::DeadLetter), "dead_letter" => Some(Self::DeadLetter),
"cron" => Some(Self::Cron), "cron" => Some(Self::Cron),
"files" => Some(Self::Files), "files" => Some(Self::Files),
"pubsub" => Some(Self::Pubsub),
_ => None, _ => None,
} }
} }

View File

@@ -0,0 +1,118 @@
//! `PubsubRepo` — publish-time fan-out for the v1.1.5 `pubsub::*` SDK.
//!
//! `publish_durable` writes one outbox row per matching enabled `pubsub`
//! trigger, all inside a single transaction so a partial fan-out (some
//! subscribers got rows, others didn't, then a crash) can't happen.
//! Each delivery row then retries / dead-letters independently through
//! the existing dispatcher — no pub/sub-specific dispatch branching.
//!
//! Topic pattern matching runs in Rust (`picloud_shared::topic_matches`)
//! against the small set of the app's enabled pubsub triggers, keeping
//! the SELECT trivial. v1.2 can add a topic-trie index if fan-out
//! becomes a hot path.
use async_trait::async_trait;
use picloud_shared::{topic_matches, AdminUserId, AppId, ExecutionId};
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Debug, thiserror::Error)]
pub enum PubsubRepoError {
#[error("database error: {0}")]
Db(#[from] sqlx::Error),
}
/// The execution-context bits a fan-out needs to stamp onto each outbox
/// row. Derived from the publishing script's `SdkCallCx`.
#[derive(Debug, Clone, Copy)]
pub struct PublishCtx {
pub app_id: AppId,
pub origin_principal: Option<AdminUserId>,
pub trigger_depth: u32,
pub root_execution_id: ExecutionId,
}
#[async_trait]
pub trait PubsubRepo: Send + Sync {
/// Fan out a publish to every matching enabled pubsub trigger in
/// `ctx.app_id`, inserting one outbox row each in a SINGLE
/// transaction. `event_payload` is the serialized
/// `TriggerEvent::Pubsub`. Returns the number of delivery rows
/// written (0 when no trigger matched — the publish still succeeds).
async fn fan_out_publish(
&self,
ctx: PublishCtx,
topic: &str,
event_payload: serde_json::Value,
) -> Result<u32, PubsubRepoError>;
}
pub struct PostgresPubsubRepo {
pool: PgPool,
}
impl PostgresPubsubRepo {
#[must_use]
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[derive(sqlx::FromRow)]
struct PubsubTriggerRow {
id: Uuid,
script_id: Uuid,
topic_pattern: String,
}
#[async_trait]
impl PubsubRepo for PostgresPubsubRepo {
async fn fan_out_publish(
&self,
ctx: PublishCtx,
topic: &str,
event_payload: serde_json::Value,
) -> Result<u32, PubsubRepoError> {
let mut tx = self.pool.begin().await?;
// Load all enabled pubsub triggers for the app; filter by topic
// pattern in Rust (keeps the query simple, honours the
// empty/`*`/prefix semantics without teaching SQL about globs).
let rows: Vec<PubsubTriggerRow> = sqlx::query_as(
"SELECT t.id, t.script_id, d.topic_pattern \
FROM triggers t \
JOIN pubsub_trigger_details d ON d.trigger_id = t.id \
WHERE t.app_id = $1 AND t.kind = 'pubsub' AND t.enabled = TRUE",
)
.bind(ctx.app_id.into_inner())
.fetch_all(&mut *tx)
.await?;
let mut written: u32 = 0;
for r in rows {
if !topic_matches(&r.topic_pattern, topic) {
continue;
}
sqlx::query(
"INSERT INTO outbox ( \
app_id, source_kind, trigger_id, script_id, reply_to, \
payload, origin_principal, trigger_depth, root_execution_id \
) VALUES ($1, 'pubsub', $2, $3, NULL, $4, $5, $6, $7)",
)
.bind(ctx.app_id.into_inner())
.bind(r.id)
.bind(r.script_id)
.bind(&event_payload)
.bind(ctx.origin_principal.map(AdminUserId::into_inner))
.bind(i32::try_from(ctx.trigger_depth.saturating_add(1)).unwrap_or(1))
.bind(ctx.root_execution_id.into_inner())
.execute(&mut *tx)
.await?;
written += 1;
}
// Commit once — all rows or none.
tx.commit().await?;
Ok(written)
}
}

View File

@@ -0,0 +1,320 @@
//! `PubsubServiceImpl` — wires `PubsubRepo` underneath the
//! `picloud_shared::PubsubService` trait scripts see via the Rhai
//! bridge.
//!
//! Mirrors the other stateful services: script-as-gate authz
//! (`AppPubsubPublish`, skipped when `cx.principal` is `None`), with the
//! backend doing a publish-time outbox fan-out instead of a row write.
//! No `ServiceEventEmitter` here — pub/sub publishes directly to the
//! outbox; it doesn't mutate local data that other triggers observe.
use std::sync::Arc;
use async_trait::async_trait;
use picloud_shared::{PubsubError, PubsubService, SdkCallCx, TriggerEvent};
use crate::authz::{self, AuthzRepo, Capability};
use crate::pubsub_repo::{PublishCtx, PubsubRepo, PubsubRepoError};
pub struct PubsubServiceImpl {
repo: Arc<dyn PubsubRepo>,
authz: Arc<dyn AuthzRepo>,
}
impl PubsubServiceImpl {
#[must_use]
pub fn new(repo: Arc<dyn PubsubRepo>, authz: Arc<dyn AuthzRepo>) -> Self {
Self { repo, authz }
}
async fn check_publish(&self, cx: &SdkCallCx) -> Result<(), PubsubError> {
if let Some(ref principal) = cx.principal {
authz::require(
&*self.authz,
principal,
Capability::AppPubsubPublish(cx.app_id),
)
.await
.map_err(|_| PubsubError::Forbidden)?;
}
Ok(())
}
}
impl From<PubsubRepoError> for PubsubError {
fn from(e: PubsubRepoError) -> Self {
Self::Unavailable(e.to_string())
}
}
#[async_trait]
impl PubsubService for PubsubServiceImpl {
async fn publish_durable(
&self,
cx: &SdkCallCx,
topic: &str,
message: serde_json::Value,
) -> Result<(), PubsubError> {
if topic.trim().is_empty() {
return Err(PubsubError::EmptyTopic);
}
self.check_publish(cx).await?;
// `published_at` is stamped on the manager side so every
// delivery agrees on one instant.
let event = TriggerEvent::Pubsub {
topic: topic.to_string(),
message,
published_at: chrono::Utc::now(),
};
let payload = serde_json::to_value(&event)
.map_err(|e| PubsubError::Rejected(format!("event serialize: {e}")))?;
let publish_ctx = PublishCtx {
app_id: cx.app_id,
origin_principal: cx.principal.as_ref().map(|p| p.user_id),
trigger_depth: cx.trigger_depth,
root_execution_id: cx.root_execution_id,
};
// No matching triggers → 0 rows written, publish still succeeds.
self.repo
.fan_out_publish(publish_ctx, topic, payload)
.await?;
Ok(())
}
}
// ----------------------------------------------------------------------------
// Tests — in-memory PubsubRepo so unit tests don't need Postgres. The
// real transactional fan-out is covered against a live DB by the
// integration suite; the in-memory fake models the all-or-nothing
// commit so the rollback semantics can be asserted without a DB.
// ----------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use crate::authz::{AuthzError, AuthzRepo};
use async_trait::async_trait;
use picloud_shared::{
topic_matches, AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, Principal,
RequestId, ScriptId, UserId,
};
use std::sync::Mutex;
/// In-memory pubsub repo. Holds a set of `(app, pattern)`
/// subscriptions and records the outbox rows a publish would write.
/// `fail_at` simulates a mid-fan-out INSERT failure: when set to
/// `Some(n)`, the n-th (1-indexed) matching row errors and NOTHING
/// is recorded — modelling the single-transaction rollback.
struct InMemoryPubsubRepo {
subs: Vec<(AppId, String)>,
written: Mutex<Vec<(AppId, String)>>,
fail_at: Option<usize>,
}
impl InMemoryPubsubRepo {
fn new(subs: Vec<(AppId, String)>) -> Self {
Self {
subs,
written: Mutex::new(Vec::new()),
fail_at: None,
}
}
fn written_count(&self) -> usize {
self.written.lock().unwrap().len()
}
}
#[async_trait]
impl PubsubRepo for InMemoryPubsubRepo {
async fn fan_out_publish(
&self,
ctx: PublishCtx,
topic: &str,
_event_payload: serde_json::Value,
) -> Result<u32, PubsubRepoError> {
let matches: Vec<&(AppId, String)> = self
.subs
.iter()
.filter(|(a, pat)| *a == ctx.app_id && topic_matches(pat, topic))
.collect();
let mut staged = Vec::new();
for (i, _) in matches.iter().enumerate() {
if self.fail_at == Some(i + 1) {
// Rollback: nothing was committed.
return Err(PubsubRepoError::Db(sqlx::Error::Protocol(
"simulated insert failure".into(),
)));
}
staged.push((ctx.app_id, topic.to_string()));
}
let n = staged.len();
self.written.lock().unwrap().extend(staged);
Ok(u32::try_from(n).unwrap_or(u32::MAX))
}
}
#[derive(Default)]
struct DenyingAuthzRepo;
#[async_trait]
impl AuthzRepo for DenyingAuthzRepo {
async fn membership(
&self,
_user_id: UserId,
_app_id: AppId,
) -> Result<Option<AppRole>, AuthzError> {
Ok(None)
}
}
#[derive(Default)]
struct EditorAuthzRepo;
#[async_trait]
impl AuthzRepo for EditorAuthzRepo {
async fn membership(
&self,
_user_id: UserId,
_app_id: AppId,
) -> Result<Option<AppRole>, AuthzError> {
Ok(Some(AppRole::Editor))
}
}
fn anon_cx(app_id: AppId) -> SdkCallCx {
SdkCallCx {
app_id,
script_id: ScriptId::new(),
principal: None,
execution_id: ExecutionId::new(),
request_id: RequestId::new(),
trigger_depth: 0,
root_execution_id: ExecutionId::new(),
is_dead_letter_handler: false,
event: None,
}
}
fn member_cx(app_id: AppId) -> SdkCallCx {
SdkCallCx {
principal: Some(Principal {
user_id: AdminUserId::new(),
instance_role: InstanceRole::Member,
scopes: None,
app_binding: None,
}),
..anon_cx(app_id)
}
}
fn svc(repo: Arc<dyn PubsubRepo>, authz: Arc<dyn AuthzRepo>) -> PubsubServiceImpl {
PubsubServiceImpl::new(repo, authz)
}
#[tokio::test]
async fn publish_writes_one_row_per_matching_trigger() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![
(app, "user.*".into()),
(app, "user.created".into()),
(app, "order.*".into()), // does not match
]));
let files = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
files
.publish_durable(&anon_cx(app), "user.created", serde_json::json!({"id": 1}))
.await
.unwrap();
// Two of the three subscriptions match "user.created".
assert_eq!(repo.written_count(), 2);
}
#[tokio::test]
async fn no_matching_trigger_succeeds_silently() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![(app, "order.*".into())]));
let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
svc.publish_durable(&anon_cx(app), "user.created", serde_json::json!(1))
.await
.unwrap();
assert_eq!(repo.written_count(), 0);
}
#[tokio::test]
async fn empty_topic_rejected() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
let svc = svc(repo, Arc::new(DenyingAuthzRepo));
let err = svc
.publish_durable(&anon_cx(app), " ", serde_json::json!(1))
.await
.unwrap_err();
assert!(matches!(err, PubsubError::EmptyTopic));
}
#[tokio::test]
async fn cross_app_isolation() {
let app_a = AppId::new();
let app_b = AppId::new();
// The only subscription belongs to app B.
let repo = Arc::new(InMemoryPubsubRepo::new(vec![(app_b, "*".into())]));
let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
// App A publishes — app B's trigger must NOT fire.
svc.publish_durable(&anon_cx(app_a), "user.created", serde_json::json!(1))
.await
.unwrap();
assert_eq!(repo.written_count(), 0);
}
#[tokio::test]
async fn fan_out_is_transactional_all_or_nothing() {
let app = AppId::new();
let mut repo = InMemoryPubsubRepo::new(vec![
(app, "*".into()),
(app, "user.*".into()),
(app, "user.created".into()),
]);
repo.fail_at = Some(3); // fail on the 3rd matching insert
let repo = Arc::new(repo);
let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
let err = svc
.publish_durable(&anon_cx(app), "user.created", serde_json::json!(1))
.await
.unwrap_err();
assert!(matches!(err, PubsubError::Unavailable(_)));
// Rollback: no partial fan-out survived.
assert_eq!(repo.written_count(), 0);
}
#[tokio::test]
async fn anonymous_cx_skips_authz() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
let svc = svc(repo, Arc::new(DenyingAuthzRepo));
// No principal → no authz check even with a denying repo.
svc.publish_durable(&anon_cx(app), "t", serde_json::json!(1))
.await
.unwrap();
}
#[tokio::test]
async fn member_without_role_is_forbidden() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
let svc = svc(repo, Arc::new(DenyingAuthzRepo));
let err = svc
.publish_durable(&member_cx(app), "t", serde_json::json!(1))
.await
.unwrap_err();
assert!(matches!(err, PubsubError::Forbidden));
}
#[tokio::test]
async fn member_with_editor_role_allowed() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
let svc = svc(repo, Arc::new(EditorAuthzRepo));
svc.publish_durable(&member_cx(app), "t", serde_json::json!(1))
.await
.unwrap();
}
}

View File

@@ -55,6 +55,8 @@ pub enum TriggerKind {
Cron, Cron,
/// v1.1.5. /// v1.1.5.
Files, Files,
/// v1.1.5.
Pubsub,
} }
impl TriggerKind { impl TriggerKind {
@@ -66,6 +68,7 @@ impl TriggerKind {
Self::DeadLetter => "dead_letter", Self::DeadLetter => "dead_letter",
Self::Cron => "cron", Self::Cron => "cron",
Self::Files => "files", Self::Files => "files",
Self::Pubsub => "pubsub",
} }
} }
@@ -77,6 +80,7 @@ impl TriggerKind {
"dead_letter" => Some(Self::DeadLetter), "dead_letter" => Some(Self::DeadLetter),
"cron" => Some(Self::Cron), "cron" => Some(Self::Cron),
"files" => Some(Self::Files), "files" => Some(Self::Files),
"pubsub" => Some(Self::Pubsub),
_ => None, _ => None,
} }
} }
@@ -131,6 +135,8 @@ pub enum TriggerDetails {
collection_glob: String, collection_glob: String,
ops: Vec<FilesEventOp>, ops: Vec<FilesEventOp>,
}, },
/// v1.1.5. A topic pattern: exact, `<prefix>.*`, or `*`.
Pubsub { topic_pattern: String },
} }
/// Create payload for a KV trigger. Defaults applied at the admin /// Create payload for a KV trigger. Defaults applied at the admin
@@ -213,6 +219,19 @@ pub struct FilesTriggerMatch {
pub registered_by_principal: AdminUserId, pub registered_by_principal: AdminUserId,
} }
/// Create payload for a pubsub trigger (v1.1.5). `topic_pattern` is
/// validated (exact / `<prefix>.*` / `*`) before insert.
#[derive(Debug, Clone)]
pub struct CreatePubsubTrigger {
pub script_id: ScriptId,
pub topic_pattern: String,
pub dispatch_mode: TriggerDispatchMode,
pub retry_max_attempts: u32,
pub retry_backoff: BackoffShape,
pub retry_base_ms: u32,
pub registered_by_principal: AdminUserId,
}
/// One match for the dispatcher's "which KV triggers fire on this /// One match for the dispatcher's "which KV triggers fire on this
/// event" lookup. Carries everything the dispatcher needs to construct /// event" lookup. Carries everything the dispatcher needs to construct
/// the outbox row. /// the outbox row.
@@ -287,6 +306,13 @@ pub trait TriggerRepo: Send + Sync {
req: CreateFilesTrigger, req: CreateFilesTrigger,
) -> Result<Trigger, TriggerRepoError>; ) -> Result<Trigger, TriggerRepoError>;
/// v1.1.5. `topic_pattern` is validated before insert.
async fn create_pubsub_trigger(
&self,
app_id: AppId,
req: CreatePubsubTrigger,
) -> Result<Trigger, TriggerRepoError>;
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError>; async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError>;
async fn get(&self, id: TriggerId) -> Result<Option<Trigger>, TriggerRepoError>; async fn get(&self, id: TriggerId) -> Result<Option<Trigger>, TriggerRepoError>;
@@ -675,6 +701,66 @@ impl TriggerRepo for PostgresTriggerRepo {
}) })
} }
async fn create_pubsub_trigger(
&self,
app_id: AppId,
req: CreatePubsubTrigger,
) -> Result<Trigger, TriggerRepoError> {
// Defense-in-depth validation (the admin endpoint validates too).
picloud_shared::validate_topic_pattern(&req.topic_pattern)
.map_err(TriggerRepoError::Invalid)?;
let mut tx = self.pool.begin().await?;
let parent: TriggerRow = sqlx::query_as(
"INSERT INTO triggers ( \
app_id, script_id, kind, enabled, dispatch_mode, \
retry_max_attempts, retry_backoff, retry_base_ms, \
registered_by_principal \
) VALUES ($1, $2, 'pubsub', TRUE, $3, $4, $5, $6, $7) \
RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \
retry_max_attempts, retry_backoff, retry_base_ms, \
registered_by_principal, created_at, updated_at",
)
.bind(app_id.into_inner())
.bind(req.script_id.into_inner())
.bind(req.dispatch_mode.as_str())
.bind(i32::try_from(req.retry_max_attempts).unwrap_or(3))
.bind(req.retry_backoff.as_str())
.bind(i32::try_from(req.retry_base_ms).unwrap_or(1000))
.bind(req.registered_by_principal.into_inner())
.fetch_one(&mut *tx)
.await?;
sqlx::query(
"INSERT INTO pubsub_trigger_details (trigger_id, topic_pattern) VALUES ($1, $2)",
)
.bind(parent.id)
.bind(&req.topic_pattern)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(Trigger {
id: parent.id.into(),
app_id: parent.app_id.into(),
script_id: parent.script_id.into(),
kind: TriggerKind::Pubsub,
enabled: parent.enabled,
dispatch_mode: dispatch_from_str(&parent.dispatch_mode),
retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3),
retry_backoff: BackoffShape::from_wire(&parent.retry_backoff)
.unwrap_or(BackoffShape::Exponential),
retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000),
registered_by_principal: parent.registered_by_principal.into(),
created_at: parent.created_at,
updated_at: parent.updated_at,
details: TriggerDetails::Pubsub {
topic_pattern: req.topic_pattern,
},
})
}
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError> { async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError> {
let parents: Vec<TriggerRow> = sqlx::query_as( let parents: Vec<TriggerRow> = sqlx::query_as(
"SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \ "SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \
@@ -980,6 +1066,17 @@ async fn hydrate_one(pool: &PgPool, parent: TriggerRow) -> Result<Trigger, Trigg
ops, ops,
} }
} }
TriggerKind::Pubsub => {
let row: PubsubDetailRow = sqlx::query_as(
"SELECT topic_pattern FROM pubsub_trigger_details WHERE trigger_id = $1",
)
.bind(parent.id)
.fetch_one(pool)
.await?;
TriggerDetails::Pubsub {
topic_pattern: row.topic_pattern,
}
}
}; };
Ok(Trigger { Ok(Trigger {
@@ -1052,6 +1149,11 @@ struct CronDetailRow {
last_fired_at: Option<DateTime<Utc>>, last_fired_at: Option<DateTime<Utc>>,
} }
#[derive(sqlx::FromRow)]
struct PubsubDetailRow {
topic_pattern: String,
}
#[derive(sqlx::FromRow)] #[derive(sqlx::FromRow)]
#[allow(clippy::struct_field_names)] #[allow(clippy::struct_field_names)]
struct DlDetailRow { struct DlDetailRow {

View File

@@ -28,7 +28,8 @@ use crate::repo::{ScriptRepository, ScriptRepositoryError};
use crate::trigger_config::{BackoffShape, TriggerConfig}; use crate::trigger_config::{BackoffShape, TriggerConfig};
use crate::trigger_repo::{ use crate::trigger_repo::{
CreateCronTrigger, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger, CreateCronTrigger, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger,
CreateKvTrigger, Trigger, TriggerDispatchMode, TriggerRepo, TriggerRepoError, CreateKvTrigger, CreatePubsubTrigger, Trigger, TriggerDispatchMode, TriggerRepo,
TriggerRepoError,
}; };
#[derive(Clone)] #[derive(Clone)]
@@ -57,6 +58,10 @@ pub fn triggers_router(state: TriggersState) -> Router {
.route("/apps/{app_id}/triggers/docs", post(create_docs_trigger)) .route("/apps/{app_id}/triggers/docs", post(create_docs_trigger))
.route("/apps/{app_id}/triggers/cron", post(create_cron_trigger)) .route("/apps/{app_id}/triggers/cron", post(create_cron_trigger))
.route("/apps/{app_id}/triggers/files", post(create_files_trigger)) .route("/apps/{app_id}/triggers/files", post(create_files_trigger))
.route(
"/apps/{app_id}/triggers/pubsub",
post(create_pubsub_trigger),
)
.route( .route(
"/apps/{app_id}/triggers/dead_letter", "/apps/{app_id}/triggers/dead_letter",
post(create_dl_trigger), post(create_dl_trigger),
@@ -349,6 +354,57 @@ async fn create_cron_trigger(
Ok((StatusCode::CREATED, Json(created))) Ok((StatusCode::CREATED, Json(created)))
} }
/// v1.1.5 pubsub trigger. `topic_pattern` is validated to be exact /
/// `<prefix>.*` / `*`.
#[derive(Debug, Deserialize)]
pub struct CreatePubsubTriggerRequest {
pub script_id: ScriptId,
pub topic_pattern: String,
#[serde(default = "default_dispatch")]
pub dispatch_mode: TriggerDispatchMode,
#[serde(default)]
pub retry_max_attempts: Option<u32>,
#[serde(default)]
pub retry_backoff: Option<BackoffShape>,
#[serde(default)]
pub retry_base_ms: Option<u32>,
}
async fn create_pubsub_trigger(
State(s): State<TriggersState>,
Extension(principal): Extension<Principal>,
Path(app_id): Path<AppId>,
Json(input): Json<CreatePubsubTriggerRequest>,
) -> Result<(StatusCode, Json<Trigger>), TriggersApiError> {
ensure_app_exists(&*s.apps, app_id).await?;
require(
s.authz.as_ref(),
&principal,
Capability::AppManageTriggers(app_id),
)
.await?;
// Validate the topic pattern before touching the script repo so a
// bad pattern fails fast with a clear 422.
picloud_shared::validate_topic_pattern(&input.topic_pattern)
.map_err(TriggersApiError::Invalid)?;
validate_trigger_target(&*s.scripts, app_id, input.script_id).await?;
let req = CreatePubsubTrigger {
script_id: input.script_id,
topic_pattern: input.topic_pattern,
dispatch_mode: input.dispatch_mode,
retry_max_attempts: input
.retry_max_attempts
.unwrap_or(s.config.retry_max_attempts),
retry_backoff: input.retry_backoff.unwrap_or(s.config.retry_backoff),
retry_base_ms: input.retry_base_ms.unwrap_or(s.config.retry_base_ms),
registered_by_principal: principal.user_id,
};
let created = s.triggers.create_pubsub_trigger(app_id, req).await?;
Ok((StatusCode::CREATED, Json(created)))
}
async fn create_files_trigger( async fn create_files_trigger(
State(s): State<TriggersState>, State(s): State<TriggersState>,
Extension(principal): Extension<Principal>, Extension(principal): Extension<Principal>,
@@ -542,8 +598,9 @@ mod tests {
use super::*; use super::*;
use crate::app_repo::{AppLookup, AppRepository}; use crate::app_repo::{AppLookup, AppRepository};
use crate::trigger_repo::{ use crate::trigger_repo::{
CreateCronTrigger, CreateFilesTrigger, DeadLetterTriggerMatch, DocsTriggerMatch, CreateCronTrigger, CreateFilesTrigger, CreatePubsubTrigger, DeadLetterTriggerMatch,
FilesTriggerMatch, KvTriggerMatch, Trigger, TriggerDetails, TriggerRepo, TriggerRepoError, DocsTriggerMatch, FilesTriggerMatch, KvTriggerMatch, Trigger, TriggerDetails, TriggerRepo,
TriggerRepoError,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use chrono::Utc; use chrono::Utc;
@@ -703,6 +760,33 @@ mod tests {
self.inner.lock().await.insert(id, trigger.clone()); self.inner.lock().await.insert(id, trigger.clone());
Ok(trigger) Ok(trigger)
} }
async fn create_pubsub_trigger(
&self,
app_id: AppId,
req: CreatePubsubTrigger,
) -> Result<Trigger, TriggerRepoError> {
let now = Utc::now();
let id = TriggerId::new();
let trigger = Trigger {
id,
app_id,
script_id: req.script_id,
kind: crate::trigger_repo::TriggerKind::Pubsub,
enabled: true,
dispatch_mode: req.dispatch_mode,
retry_max_attempts: req.retry_max_attempts,
retry_backoff: req.retry_backoff,
retry_base_ms: req.retry_base_ms,
registered_by_principal: req.registered_by_principal,
created_at: now,
updated_at: now,
details: TriggerDetails::Pubsub {
topic_pattern: req.topic_pattern,
},
};
self.inner.lock().await.insert(id, trigger.clone());
Ok(trigger)
}
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError> { async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError> {
Ok(self Ok(self
.inner .inner

View File

@@ -22,9 +22,9 @@ use picloud_manager_core::{
PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository, PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository,
PostgresAppRepository, PostgresDeadLetterRepo, PostgresDeadLetterService, PostgresDocsRepo, PostgresAppRepository, PostgresDeadLetterRepo, PostgresDeadLetterService, PostgresDocsRepo,
PostgresExecutionLogRepository, PostgresExecutionLogSink, PostgresKvRepo, PostgresOutboxRepo, PostgresExecutionLogRepository, PostgresExecutionLogSink, PostgresKvRepo, PostgresOutboxRepo,
PostgresRouteRepository, PostgresScriptRepository, PostgresTriggerRepo, PrincipalResolver, PostgresPubsubRepo, PostgresRouteRepository, PostgresScriptRepository, PostgresTriggerRepo,
RepoResolver, RouteAdminState, RouteRepository, SandboxCeiling, ScriptRepository, PrincipalResolver, PubsubServiceImpl, RepoResolver, RouteAdminState, RouteRepository,
TriggerConfig, TriggerRepo, TriggersState, SandboxCeiling, ScriptRepository, TriggerConfig, TriggerRepo, TriggersState,
}; };
use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable}; use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable};
use picloud_orchestrator_core::{ use picloud_orchestrator_core::{
@@ -33,8 +33,8 @@ use picloud_orchestrator_core::{
}; };
use picloud_shared::{ use picloud_shared::{
DeadLetterService, DocsService, ExecutionLogSink, FilesService, HttpService, InboxResolver, DeadLetterService, DocsService, ExecutionLogSink, FilesService, HttpService, InboxResolver,
KvService, OutboxWriter, ScriptValidator, ServiceEventEmitter, Services, API_VERSION, KvService, OutboxWriter, PubsubService, ScriptValidator, ServiceEventEmitter, Services,
PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION, API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION,
}; };
use sqlx::postgres::PgPoolOptions; use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool; use sqlx::PgPool;
@@ -169,7 +169,22 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
events.clone(), events.clone(),
files_max_size, files_max_size,
)); ));
let services = Services::new(kv, docs, dl_service.clone(), events, modules, http, files); // v1.1.5 durable pub/sub. Publishes fan out to matching pubsub
// triggers at publish time (one outbox row each), delivered by the
// same dispatcher as every other async trigger.
let pubsub_repo = Arc::new(PostgresPubsubRepo::new(pool.clone()));
let pubsub: Arc<dyn PubsubService> =
Arc::new(PubsubServiceImpl::new(pubsub_repo, authz.clone()));
let services = Services::new(
kv,
docs,
dl_service.clone(),
events,
modules,
http,
files,
pubsub,
);
let engine = Arc::new(Engine::new(Limits::default(), services)); let engine = Arc::new(Engine::new(Limits::default(), services));
// Compile the routes table once at startup; admin writes refresh it. // Compile the routes table once at startup; admin writes refresh it.

View File

@@ -20,6 +20,7 @@ pub mod kv;
pub mod log_sink; pub mod log_sink;
pub mod modules; pub mod modules;
pub mod outbox_writer; pub mod outbox_writer;
pub mod pubsub;
pub mod route; pub mod route;
pub mod sandbox; pub mod sandbox;
pub mod script; pub mod script;
@@ -50,6 +51,9 @@ pub use kv::{KvError, KvListPage, KvService, NoopKvService};
pub use log_sink::{ExecutionLogSink, LogSinkError}; pub use log_sink::{ExecutionLogSink, LogSinkError};
pub use modules::{ModuleScript, ModuleSource, ModuleSourceError, NoopModuleSource}; pub use modules::{ModuleScript, ModuleSource, ModuleSourceError, NoopModuleSource};
pub use outbox_writer::{HttpDispatchPayload, NewHttpOutbox, OutboxWriter, OutboxWriterError}; pub use outbox_writer::{HttpDispatchPayload, NewHttpOutbox, OutboxWriter, OutboxWriterError};
pub use pubsub::{
topic_matches, validate_topic_pattern, NoopPubsubService, PubsubError, PubsubService,
};
pub use route::{DispatchMode, HostKind, PathKind, Route}; pub use route::{DispatchMode, HostKind, PathKind, Route};
pub use sandbox::ScriptSandbox; pub use sandbox::ScriptSandbox;
pub use script::{Script, ScriptKind}; pub use script::{Script, ScriptKind};

161
crates/shared/src/pubsub.rs Normal file
View File

@@ -0,0 +1,161 @@
//! `PubsubService` — the v1.1.5 durable pub/sub contract.
//!
//! `pubsub::publish_durable(topic, message)` writes to the universal
//! outbox; the publish-time fan-out inserts one delivery row per
//! matching `pubsub` trigger, and each delivery retries / dead-letters
//! independently (the dispatcher already handles one-row-equals-one-
//! dispatch — no dispatcher changes for pub/sub).
//!
//! `publish_ephemeral` is committed as a v1.2 addition — the suffix
//! naming exists now so users learn "durable by default" from day one.
//!
//! Topic pattern matching runs in Rust (not SQL) so the trigger-select
//! query stays simple. The matcher + validator live here in
//! `picloud-shared` so the manager-core publish path, the admin trigger
//! endpoint, and tests all agree on the rules.
use async_trait::async_trait;
use thiserror::Error;
use crate::SdkCallCx;
#[async_trait]
pub trait PubsubService: Send + Sync {
/// Durable publish: writes the message to the outbox, fanned out to
/// every matching enabled `pubsub` trigger in `cx.app_id`. Succeeds
/// silently (zero rows written) when no trigger matches the topic.
async fn publish_durable(
&self,
cx: &SdkCallCx,
topic: &str,
message: serde_json::Value,
) -> Result<(), PubsubError>;
}
#[derive(Debug, Error)]
pub enum PubsubError {
/// Empty topic; rejected at the SDK boundary.
#[error("topic must not be empty")]
EmptyTopic,
/// Caller principal lacked the required capability. Only raised when
/// `cx.principal.is_some()` (script-as-gate; public HTTP skips it).
#[error("forbidden")]
Forbidden,
/// Serialization / validation failure on the message.
#[error("pubsub rejected: {0}")]
Rejected(String),
/// Anything else — Postgres unavailable, etc.
#[error("pubsub backend error: {0}")]
Unavailable(String),
}
/// Match a stored `topic_pattern` against a published `topic`.
///
/// - `"*"` matches every topic.
/// - `"<prefix>.*"` matches any topic starting with `"<prefix>."`.
/// - anything else is an exact match.
///
/// Mid-pattern wildcards (`*.created`, `a.*.b`) are NOT supported — they
/// are rejected at trigger creation by [`validate_topic_pattern`], so
/// the only patterns reaching this matcher are exact / prefix / `*`.
#[must_use]
pub fn topic_matches(pattern: &str, topic: &str) -> bool {
if pattern == "*" {
return true;
}
if let Some(prefix) = pattern.strip_suffix('*') {
// `prefix` retains the trailing '.', e.g. "user." for "user.*".
return topic.starts_with(prefix);
}
pattern == topic
}
/// Validate a subscription topic pattern. Accepts exactly: `"*"`
/// (universal), `"<prefix>.*"` (prefix wildcard, single trailing star),
/// or a literal with no `*` (exact). Everything else — mid-pattern
/// wildcards, multiple stars, a star not at the end — is rejected.
///
/// # Errors
///
/// Returns `Err(message)` with `"unsupported pubsub topic pattern: …"`
/// for any unsupported shape (or an empty pattern).
pub fn validate_topic_pattern(pattern: &str) -> Result<(), String> {
if pattern.is_empty() {
return Err("unsupported pubsub topic pattern: <empty>".to_string());
}
if pattern == "*" {
return Ok(());
}
let stars = pattern.matches('*').count();
if stars == 0 {
return Ok(()); // exact
}
if stars == 1 && pattern.ends_with(".*") {
return Ok(()); // prefix wildcard
}
Err(format!("unsupported pubsub topic pattern: {pattern}"))
}
/// Stub for the test harness so executor-core integration tests can
/// build a `Services` bundle without a database. Every call errors.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopPubsubService;
#[async_trait]
impl PubsubService for NoopPubsubService {
async fn publish_durable(
&self,
_cx: &SdkCallCx,
_topic: &str,
_message: serde_json::Value,
) -> Result<(), PubsubError> {
Err(PubsubError::Unavailable("pubsub is not wired in".into()))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn exact_match() {
assert!(topic_matches("user.created", "user.created"));
assert!(!topic_matches("user.created", "user.deleted"));
assert!(!topic_matches("user.created", "user.created.x"));
}
#[test]
fn prefix_wildcard() {
assert!(topic_matches("user.*", "user.created"));
assert!(topic_matches("user.*", "user.deleted"));
assert!(!topic_matches("user.*", "users.created"));
assert!(!topic_matches("user.*", "order.created"));
}
#[test]
fn universal() {
assert!(topic_matches("*", "anything"));
assert!(topic_matches("*", "a.b.c"));
}
#[test]
fn validation_accepts_supported_shapes() {
assert!(validate_topic_pattern("*").is_ok());
assert!(validate_topic_pattern("user.created").is_ok());
assert!(validate_topic_pattern("user.*").is_ok());
assert!(validate_topic_pattern("a.b.c").is_ok());
}
#[test]
fn validation_rejects_unsupported_shapes() {
for bad in ["*.created", "**", "a.*.b", "user.*x", "*user", ""] {
assert!(
validate_topic_pattern(bad).is_err(),
"expected {bad:?} to be rejected"
);
}
}
}

View File

@@ -22,7 +22,7 @@ use std::sync::Arc;
use crate::{ use crate::{
DeadLetterService, DocsService, FilesService, HttpService, KvService, ModuleSource, DeadLetterService, DocsService, FilesService, HttpService, KvService, ModuleSource,
NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopFilesService, NoopHttpService, NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopFilesService, NoopHttpService,
NoopKvService, NoopModuleSource, ServiceEventEmitter, NoopKvService, NoopModuleSource, NoopPubsubService, PubsubService, ServiceEventEmitter,
}; };
/// SDK service bundle. See module docs for the lifecycle and the v1.1.x /// SDK service bundle. See module docs for the lifecycle and the v1.1.x
@@ -67,6 +67,12 @@ pub struct Services {
/// picloud binary; `NoopFilesService` in tests that don't touch /// picloud binary; `NoopFilesService` in tests that don't touch
/// files. /// files.
pub files: Arc<dyn FilesService>, pub files: Arc<dyn FilesService>,
/// Durable pub/sub (v1.1.5). Scripts get
/// `pubsub::publish_durable(topic, message)`. Backed by a
/// publish-time outbox fan-out in the picloud binary;
/// `NoopPubsubService` in tests that don't publish.
pub pubsub: Arc<dyn PubsubService>,
} }
impl Services { impl Services {
@@ -74,6 +80,7 @@ impl Services {
/// The picloud binary's `main` wires this up after the DB pool is /// The picloud binary's `main` wires this up after the DB pool is
/// open; tests build it from in-memory fakes. /// open; tests build it from in-memory fakes.
#[must_use] #[must_use]
#[allow(clippy::too_many_arguments)] // one Arc per stateful service; a builder would just move the noise
pub fn new( pub fn new(
kv: Arc<dyn KvService>, kv: Arc<dyn KvService>,
docs: Arc<dyn DocsService>, docs: Arc<dyn DocsService>,
@@ -82,6 +89,7 @@ impl Services {
modules: Arc<dyn ModuleSource>, modules: Arc<dyn ModuleSource>,
http: Arc<dyn HttpService>, http: Arc<dyn HttpService>,
files: Arc<dyn FilesService>, files: Arc<dyn FilesService>,
pubsub: Arc<dyn PubsubService>,
) -> Self { ) -> Self {
Self { Self {
kv, kv,
@@ -91,6 +99,7 @@ impl Services {
modules, modules,
http, http,
files, files,
pubsub,
} }
} }
@@ -109,6 +118,7 @@ impl Services {
Arc::new(NoopModuleSource), Arc::new(NoopModuleSource),
Arc::new(NoopHttpService), Arc::new(NoopHttpService),
Arc::new(NoopFilesService), Arc::new(NoopFilesService),
Arc::new(NoopPubsubService),
) )
} }
} }

View File

@@ -177,6 +177,15 @@ pub enum TriggerEvent {
prev: Option<serde_json::Value>, prev: Option<serde_json::Value>,
}, },
/// A durable pub/sub publish fired this handler. v1.1.5. Carries
/// the topic, the JSON-decoded message, and the publish instant.
/// Surfaced to scripts as `ctx.event.pubsub`.
Pubsub {
topic: String,
message: serde_json::Value,
published_at: DateTime<Utc>,
},
/// A dead-letter row fired this handler. The original event is /// A dead-letter row fired this handler. The original event is
/// nested verbatim plus the dead-letter metadata the design notes /// nested verbatim plus the dead-letter metadata the design notes
/// §4 require. /// §4 require.
@@ -203,6 +212,7 @@ impl TriggerEvent {
Self::Docs { .. } => "docs", Self::Docs { .. } => "docs",
Self::Cron { .. } => "cron", Self::Cron { .. } => "cron",
Self::Files { .. } => "files", Self::Files { .. } => "files",
Self::Pubsub { .. } => "pubsub",
Self::DeadLetter { .. } => "dead_letter", Self::DeadLetter { .. } => "dead_letter",
} }
} }

View File

@@ -261,6 +261,15 @@ export interface CreateCronTriggerInput {
retry_base_ms?: number; retry_base_ms?: number;
} }
export interface CreatePubsubTriggerInput {
script_id: string;
topic_pattern: string;
dispatch_mode?: TriggerDispatchMode;
retry_max_attempts?: number;
retry_backoff?: 'exponential' | 'linear' | 'constant';
retry_base_ms?: number;
}
export interface ExecutionResult { export interface ExecutionResult {
status: number; status: number;
headers: Record<string, string>; headers: Record<string, string>;
@@ -632,6 +641,11 @@ export const api = {
`/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/cron`, `/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/cron`,
{ method: 'POST', body: JSON.stringify(input) } { method: 'POST', body: JSON.stringify(input) }
), ),
createPubsub: (idOrSlug: string, input: CreatePubsubTriggerInput) =>
adminRequest<Trigger>(
`/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/pubsub`,
{ method: 'POST', body: JSON.stringify(input) }
),
remove: (idOrSlug: string, triggerId: string) => remove: (idOrSlug: string, triggerId: string) =>
adminRequest<null>( adminRequest<null>(
`/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/${triggerId}`, `/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/${triggerId}`,

View File

@@ -118,6 +118,11 @@
let createCronTimezone = $state('UTC'); let createCronTimezone = $state('UTC');
let creatingCron = $state(false); let creatingCron = $state(false);
let createCronError = $state<string | null>(null); let createCronError = $state<string | null>(null);
// Pub/Sub triggers (v1.1.5).
let createPubsubScriptId = $state('');
let createPubsubTopic = $state('');
let creatingPubsub = $state(false);
let createPubsubError = $state<string | null>(null);
let triggerToRemove = $state<Trigger | null>(null); let triggerToRemove = $state<Trigger | null>(null);
let removingTrigger = $state(false); let removingTrigger = $state(false);
// Endpoint scripts only — modules can't be trigger targets. // Endpoint scripts only — modules can't be trigger targets.
@@ -153,6 +158,27 @@
} }
} }
async function submitCreatePubsub(e: SubmitEvent) {
e.preventDefault();
if (!app) return;
creatingPubsub = true;
createPubsubError = null;
try {
await api.triggers.createPubsub(app.id, {
script_id: createPubsubScriptId,
topic_pattern: createPubsubTopic.trim()
});
createPubsubScriptId = '';
createPubsubTopic = '';
await loadTriggers(app.id);
} catch (err) {
createPubsubError =
err instanceof ApiError ? err.message : err instanceof Error ? err.message : String(err);
} finally {
creatingPubsub = false;
}
}
async function confirmRemoveTrigger() { async function confirmRemoveTrigger() {
if (!app || !triggerToRemove) return; if (!app || !triggerToRemove) return;
removingTrigger = true; removingTrigger = true;
@@ -843,6 +869,42 @@
</div> </div>
</form> </form>
<h2>Pub/Sub triggers</h2>
<p class="muted">
Subscribe an endpoint script to durable pub/sub messages. Topic
patterns are an exact topic (<code>user.created</code>), a prefix
wildcard (<code>user.*</code>), or <code>*</code> for every topic.
</p>
<form class="create-form" onsubmit={submitCreatePubsub}>
<div class="row">
<label>
<span>Target script</span>
<select bind:value={createPubsubScriptId} required>
<option value="" disabled>Select an endpoint script…</option>
{#each endpointScripts as s (s.id)}
<option value={s.id}>{s.name}</option>
{/each}
</select>
</label>
<label>
<span>Topic pattern</span>
<input bind:value={createPubsubTopic} required placeholder="user.*" />
</label>
</div>
{#if createPubsubError}
<div class="error">{createPubsubError}</div>
{/if}
<div class="actions">
<button
type="submit"
disabled={creatingPubsub || !createPubsubScriptId || !createPubsubTopic.trim()}
>
{creatingPubsub ? 'Creating…' : 'Create pub/sub trigger'}
</button>
</div>
</form>
{#if triggers.length === 0} {#if triggers.length === 0}
<p class="muted">No triggers in this app yet.</p> <p class="muted">No triggers in this app yet.</p>
{:else} {:else}
@@ -857,9 +919,11 @@
<span class="muted small"> <span class="muted small">
last fired: {t.details.last_fired_at ?? 'never'} last fired: {t.details.last_fired_at ?? 'never'}
</span> </span>
{:else if t.details.kind === 'kv' || t.details.kind === 'docs'} {:else if t.details.kind === 'kv' || t.details.kind === 'docs' || t.details.kind === 'files'}
<code>{t.details.collection_glob}</code> <code>{t.details.collection_glob}</code>
<span class="muted">{t.details.ops.join(', ') || 'any op'}</span> <span class="muted">{t.details.ops.join(', ') || 'any op'}</span>
{:else if t.details.kind === 'pubsub'}
<code>{t.details.topic_pattern}</code>
{/if} {/if}
<span class="muted small">{t.script_id}</span> <span class="muted small">{t.script_id}</span>
</div> </div>