Durable pub/sub through the universal outbox — the sixth trigger kind. - `pubsub::publish_durable(topic, message)` Rhai SDK (no handle; topics ARE the grouping unit). Message JSON-encoded; Blobs base64 at any depth. - `PubsubService` trait in picloud-shared with the topic matcher + validator (exact / `<prefix>.*` / `*`; mid-pattern wildcards rejected). `PostgresPubsubRepo` + `PubsubServiceImpl` in manager-core. - Publish-time fan-out: one outbox row per matching enabled pubsub trigger, all in ONE transaction (no half-fan-out on crash). No matching trigger → publish succeeds silently, zero rows. - `pubsub:*` trigger kind via Layout-E (0020: widen both CHECKs + pubsub_trigger_details + partial index), TriggerEvent::Pubsub + ctx.event.pubsub, dispatcher arm, admin endpoint POST /triggers/pubsub (validates topic pattern + reuses validate_trigger_target). - AppPubsubPublish capability → script:write (seven-scope held). - Dashboard Pub/Sub trigger form on the Triggers tab + list rendering. publish_ephemeral stays deferred to v1.2. ~18 new tests (service in-memory incl. transactional-rollback, shared matcher, bridge encoding). No DB required for the suite. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
119 lines
3.9 KiB
Rust
119 lines
3.9 KiB
Rust
//! `PubsubRepo` — publish-time fan-out for the v1.1.5 `pubsub::*` SDK.
//!
//! `publish_durable` writes one outbox row per matching enabled `pubsub`
//! trigger, all inside a single transaction so a partial fan-out (some
//! subscribers got rows, others didn't, then a crash) can't happen.
//! Each delivery row then retries / dead-letters independently through
//! the existing dispatcher — no pub/sub-specific dispatch branching.
//!
//! Topic pattern matching runs in Rust (`picloud_shared::topic_matches`)
//! against the small set of the app's enabled pubsub triggers, keeping
//! the SELECT trivial. v1.2 can add a topic-trie index if fan-out
//! becomes a hot path.
|
|
|
|
use async_trait::async_trait;
|
|
use picloud_shared::{topic_matches, AdminUserId, AppId, ExecutionId};
|
|
use sqlx::PgPool;
|
|
use uuid::Uuid;
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum PubsubRepoError {
|
|
#[error("database error: {0}")]
|
|
Db(#[from] sqlx::Error),
|
|
}
|
|
|
|
/// The execution-context bits a fan-out needs to stamp onto each outbox
|
|
/// row. Derived from the publishing script's `SdkCallCx`.
|
|
#[derive(Debug, Clone, Copy)]
|
|
pub struct PublishCtx {
|
|
pub app_id: AppId,
|
|
pub origin_principal: Option<AdminUserId>,
|
|
pub trigger_depth: u32,
|
|
pub root_execution_id: ExecutionId,
|
|
}
|
|
|
|
#[async_trait]
|
|
pub trait PubsubRepo: Send + Sync {
|
|
/// Fan out a publish to every matching enabled pubsub trigger in
|
|
/// `ctx.app_id`, inserting one outbox row each in a SINGLE
|
|
/// transaction. `event_payload` is the serialized
|
|
/// `TriggerEvent::Pubsub`. Returns the number of delivery rows
|
|
/// written (0 when no trigger matched — the publish still succeeds).
|
|
async fn fan_out_publish(
|
|
&self,
|
|
ctx: PublishCtx,
|
|
topic: &str,
|
|
event_payload: serde_json::Value,
|
|
) -> Result<u32, PubsubRepoError>;
|
|
}
|
|
|
|
pub struct PostgresPubsubRepo {
|
|
pool: PgPool,
|
|
}
|
|
|
|
impl PostgresPubsubRepo {
|
|
#[must_use]
|
|
pub fn new(pool: PgPool) -> Self {
|
|
Self { pool }
|
|
}
|
|
}
|
|
|
|
#[derive(sqlx::FromRow)]
|
|
struct PubsubTriggerRow {
|
|
id: Uuid,
|
|
script_id: Uuid,
|
|
topic_pattern: String,
|
|
}
|
|
|
|
#[async_trait]
|
|
impl PubsubRepo for PostgresPubsubRepo {
|
|
async fn fan_out_publish(
|
|
&self,
|
|
ctx: PublishCtx,
|
|
topic: &str,
|
|
event_payload: serde_json::Value,
|
|
) -> Result<u32, PubsubRepoError> {
|
|
let mut tx = self.pool.begin().await?;
|
|
|
|
// Load all enabled pubsub triggers for the app; filter by topic
|
|
// pattern in Rust (keeps the query simple, honours the
|
|
// empty/`*`/prefix semantics without teaching SQL about globs).
|
|
let rows: Vec<PubsubTriggerRow> = sqlx::query_as(
|
|
"SELECT t.id, t.script_id, d.topic_pattern \
|
|
FROM triggers t \
|
|
JOIN pubsub_trigger_details d ON d.trigger_id = t.id \
|
|
WHERE t.app_id = $1 AND t.kind = 'pubsub' AND t.enabled = TRUE",
|
|
)
|
|
.bind(ctx.app_id.into_inner())
|
|
.fetch_all(&mut *tx)
|
|
.await?;
|
|
|
|
let mut written: u32 = 0;
|
|
for r in rows {
|
|
if !topic_matches(&r.topic_pattern, topic) {
|
|
continue;
|
|
}
|
|
sqlx::query(
|
|
"INSERT INTO outbox ( \
|
|
app_id, source_kind, trigger_id, script_id, reply_to, \
|
|
payload, origin_principal, trigger_depth, root_execution_id \
|
|
) VALUES ($1, 'pubsub', $2, $3, NULL, $4, $5, $6, $7)",
|
|
)
|
|
.bind(ctx.app_id.into_inner())
|
|
.bind(r.id)
|
|
.bind(r.script_id)
|
|
.bind(&event_payload)
|
|
.bind(ctx.origin_principal.map(AdminUserId::into_inner))
|
|
.bind(i32::try_from(ctx.trigger_depth.saturating_add(1)).unwrap_or(1))
|
|
.bind(ctx.root_execution_id.into_inner())
|
|
.execute(&mut *tx)
|
|
.await?;
|
|
written += 1;
|
|
}
|
|
|
|
// Commit once — all rows or none.
|
|
tx.commit().await?;
|
|
Ok(written)
|
|
}
|
|
}
|