//! `PubsubServiceImpl` — wires `PubsubRepo` underneath the //! `picloud_shared::PubsubService` trait scripts see via the Rhai //! bridge. //! //! Mirrors the other stateful services: script-as-gate authz //! (`AppPubsubPublish`, skipped when `cx.principal` is `None`), with the //! backend doing a publish-time outbox fan-out instead of a row write. //! No `ServiceEventEmitter` here — pub/sub publishes directly to the //! outbox; it doesn't mutate local data that other triggers observe. use std::sync::Arc; use async_trait::async_trait; use picloud_shared::subscriber_token::{self, TokenClaims}; use picloud_shared::{ PubsubError, PubsubService, RealtimeBroadcaster, RealtimeEvent, SdkCallCx, TriggerEvent, }; use crate::app_secrets_repo::AppSecretsRepo; use crate::authz::{self, AuthzRepo, Capability}; use crate::pubsub_repo::{PublishCtx, PubsubRepo, PubsubRepoError}; use crate::topic_repo::TopicRepo; /// TTL bounds (seconds) for `pubsub::subscriber_token`. Env-overridable /// via `PICLOUD_SUBSCRIBER_TOKEN_TTL_{MIN,MAX,DEFAULT}_SEC`. #[derive(Debug, Clone, Copy)] pub struct SubscriberTokenConfig { pub min_ttl: i64, pub max_ttl: i64, pub default_ttl: i64, } impl SubscriberTokenConfig { #[must_use] pub const fn conservative() -> Self { Self { min_ttl: 10, max_ttl: 86_400, default_ttl: 3_600, } } /// Load from env, falling back to the conservative defaults for any /// missing / invalid value. 
#[must_use] pub fn from_env() -> Self { let mut c = Self::conservative(); load_i64(&mut c.min_ttl, "PICLOUD_SUBSCRIBER_TOKEN_TTL_MIN_SEC"); load_i64(&mut c.max_ttl, "PICLOUD_SUBSCRIBER_TOKEN_TTL_MAX_SEC"); load_i64( &mut c.default_ttl, "PICLOUD_SUBSCRIBER_TOKEN_TTL_DEFAULT_SEC", ); c } } impl Default for SubscriberTokenConfig { fn default() -> Self { Self::conservative() } } fn load_i64(dst: &mut i64, key: &str) { if let Ok(v) = std::env::var(key) { match v.parse::() { Ok(n) if n > 0 => *dst = n, _ => tracing::warn!(env = key, value = %v, "ignoring invalid token-ttl value"), } } } pub struct PubsubServiceImpl { repo: Arc, authz: Arc, // Realtime extras (v1.1.6) — optional so the existing two-arg // constructor (and its unit tests) keep working without them. The // production binary attaches them via `with_realtime`. realtime: Option>, topics: Option>, secrets: Option>, token_config: SubscriberTokenConfig, } impl PubsubServiceImpl { #[must_use] pub fn new(repo: Arc, authz: Arc) -> Self { Self { repo, authz, realtime: None, topics: None, secrets: None, token_config: SubscriberTokenConfig::conservative(), } } /// Attach the v1.1.6 realtime surface: the in-process broadcaster /// (publish fan-out to SSE subscribers), the topic registry + /// app-secrets repo (subscriber-token minting), and the TTL config. 
#[must_use] pub fn with_realtime( mut self, broadcaster: Arc, topics: Arc, secrets: Arc, token_config: SubscriberTokenConfig, ) -> Self { self.realtime = Some(broadcaster); self.topics = Some(topics); self.secrets = Some(secrets); self.token_config = token_config; self } async fn check_publish(&self, cx: &SdkCallCx) -> Result<(), PubsubError> { if let Some(ref principal) = cx.principal { authz::require( &*self.authz, principal, Capability::AppPubsubPublish(cx.app_id), ) .await .map_err(|_| PubsubError::Forbidden)?; } Ok(()) } } impl From for PubsubError { fn from(e: PubsubRepoError) -> Self { Self::Unavailable(e.to_string()) } } #[async_trait] impl PubsubService for PubsubServiceImpl { async fn publish_durable( &self, cx: &SdkCallCx, topic: &str, message: serde_json::Value, ) -> Result<(), PubsubError> { if topic.trim().is_empty() { return Err(PubsubError::EmptyTopic); } self.check_publish(cx).await?; // `published_at` is stamped once on the manager side so every // delivery path — durable triggers AND the realtime broadcast — // agrees on one instant. The message is cloned into the trigger // event so the realtime path can reuse the original. let published_at = chrono::Utc::now(); let event = TriggerEvent::Pubsub { topic: topic.to_string(), message: message.clone(), published_at, }; let payload = serde_json::to_value(&event) .map_err(|e| PubsubError::Rejected(format!("event serialize: {e}")))?; let publish_ctx = PublishCtx { app_id: cx.app_id, origin_principal: cx.principal.as_ref().map(|p| p.user_id), trigger_depth: cx.trigger_depth, root_execution_id: cx.root_execution_id, }; // Order (design notes §8): transactional outbox fan-out + commit // FIRST; only then the best-effort realtime broadcast. If the // fan-out fails, the publish throws and no broadcast happens. If // the broadcast fails after a committed fan-out, trigger // deliveries still happen and only SSE subscribers miss this // event (no replay in v1.1.6). 
// // No matching triggers → 0 rows written, publish still succeeds. self.repo .fan_out_publish(publish_ctx, topic, payload) .await?; // Non-transactional, best-effort fan-out to in-process SSE // subscribers. Run on a child task so a panicking broadcaster // (or a future cluster-mode resolver fault) becomes a warn log, // never a failed publish — the durable deliveries already // committed above. if let Some(realtime) = self.realtime.clone() { let app_id = cx.app_id; let topic_owned = topic.to_string(); let realtime_event = RealtimeEvent { topic: topic_owned.clone(), message, published_at, }; let handle = tokio::spawn(async move { realtime.publish(app_id, &topic_owned, realtime_event).await; }); if let Err(e) = handle.await { tracing::warn!(error = %e, "realtime broadcast failed; publish unaffected"); } } Ok(()) } async fn mint_subscriber_token( &self, cx: &SdkCallCx, topics: Vec, ttl_seconds: Option, ) -> Result { // Anonymous (public-HTTP) scripts can't mint — that would bypass // the token-minting authz boundary. let Some(principal) = cx.principal.as_ref() else { return Err(PubsubError::SubscriberToken( "pubsub::subscriber_token: requires an authenticated principal \ (a script on a public route cannot mint tokens)" .into(), )); }; // Minting reuses the existing pub/sub publish capability (no new // scope — the seven-scope commitment holds). 
authz::require( &*self.authz, principal, Capability::AppPubsubPublish(cx.app_id), ) .await .map_err(|_| PubsubError::Forbidden)?; let (Some(topic_repo), Some(secrets)) = (self.topics.as_ref(), self.secrets.as_ref()) else { return Err(PubsubError::Unavailable( "subscriber tokens are not wired in".into(), )); }; if topics.is_empty() { return Err(PubsubError::SubscriberToken( "pubsub::subscriber_token: topics list must not be empty".into(), )); } let ttl = ttl_seconds.unwrap_or(self.token_config.default_ttl); if ttl < self.token_config.min_ttl || ttl > self.token_config.max_ttl { return Err(PubsubError::SubscriberToken(format!( "pubsub::subscriber_token: ttl_seconds must be between {} and {}", self.token_config.min_ttl, self.token_config.max_ttl ))); } // Every requested topic must be registered as externally // subscribable in this app — fail fast rather than mint a token // that won't work. for name in &topics { let registered = topic_repo .get(cx.app_id, name) .await .map_err(|e| PubsubError::Unavailable(e.to_string()))?; if !registered.is_some_and(|t| t.external_subscribable) { return Err(PubsubError::SubscriberToken(format!( "pubsub::subscriber_token: topic {name} is not externally subscribable" ))); } } let key = secrets .get_or_create_signing_key(cx.app_id) .await .map_err(|e| PubsubError::Unavailable(e.to_string()))?; let now = chrono::Utc::now().timestamp(); let claims = TokenClaims { app_id: cx.app_id, topics, exp: now.saturating_add(ttl), iat: now, }; Ok(subscriber_token::sign(&key, &claims)) } } // ---------------------------------------------------------------------------- // Tests — in-memory PubsubRepo so unit tests don't need Postgres. The // real transactional fan-out is covered against a live DB by the // integration suite; the in-memory fake models the all-or-nothing // commit so the rollback semantics can be asserted without a DB. 
// ----------------------------------------------------------------------------

// NOTE(review): generic type arguments appear to have been stripped from
// this listing (e.g. `Mutex>`, `Option,`, `Result {`, `Arc,`). Several of
// them — the receiver type returned by `RealtimeBroadcaster::subscribe`,
// the `TopicRepo::update` option parameters, the `TopicRepo::delete`
// return — cannot be recovered from this file alone, so the code tokens
// below are left exactly as found. Restore them from the trait
// declarations before compiling.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::authz::{AuthzError, AuthzRepo};
    use async_trait::async_trait;
    use picloud_shared::{
        topic_matches, AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, Principal,
        RequestId, ScriptId, UserId,
    };
    use std::sync::Mutex;

    /// In-memory pubsub repo. Holds a set of `(app, pattern)`
    /// subscriptions and records the outbox rows a publish would write.
    /// `fail_at` simulates a mid-fan-out INSERT failure: when set to
    /// `Some(n)`, the n-th (1-indexed) matching row errors and NOTHING
    /// is recorded — modelling the single-transaction rollback.
    struct InMemoryPubsubRepo {
        subs: Vec<(AppId, String)>,
        written: Mutex>,
        fail_at: Option,
    }

    impl InMemoryPubsubRepo {
        /// Repo with the given subscriptions, nothing written yet and no
        /// simulated failure.
        fn new(subs: Vec<(AppId, String)>) -> Self {
            Self {
                subs,
                written: Mutex::new(Vec::new()),
                fail_at: None,
            }
        }

        /// Number of rows recorded so far across all publishes.
        fn written_count(&self) -> usize {
            self.written.lock().unwrap().len()
        }
    }

    #[async_trait]
    impl PubsubRepo for InMemoryPubsubRepo {
        /// Stages one row per subscription whose app equals `ctx.app_id`
        /// and whose pattern matches `topic`; the staged rows are only
        /// appended to `written` once the whole loop has succeeded.
        async fn fan_out_publish(
            &self,
            ctx: PublishCtx,
            topic: &str,
            _event_payload: serde_json::Value,
        ) -> Result {
            let matches: Vec<&(AppId, String)> = self
                .subs
                .iter()
                .filter(|(a, pat)| *a == ctx.app_id && topic_matches(pat, topic))
                .collect();
            let mut staged = Vec::new();
            for (i, _) in matches.iter().enumerate() {
                if self.fail_at == Some(i + 1) {
                    // Rollback: nothing was committed.
                    return Err(PubsubRepoError::Db(sqlx::Error::Protocol(
                        "simulated insert failure".into(),
                    )));
                }
                staged.push((ctx.app_id, topic.to_string()));
            }
            let n = staged.len();
            self.written.lock().unwrap().extend(staged);
            Ok(u32::try_from(n).unwrap_or(u32::MAX))
        }
    }

    /// Authz fake whose `membership` lookup always yields `Ok(None)`.
    #[derive(Default)]
    struct DenyingAuthzRepo;

    #[async_trait]
    impl AuthzRepo for DenyingAuthzRepo {
        async fn membership(
            &self,
            _user_id: UserId,
            _app_id: AppId,
        ) -> Result, AuthzError> {
            Ok(None)
        }
    }

    /// Authz fake whose `membership` lookup always yields
    /// `Ok(Some(AppRole::Editor))`.
    #[derive(Default)]
    struct EditorAuthzRepo;

    #[async_trait]
    impl AuthzRepo for EditorAuthzRepo {
        async fn membership(
            &self,
            _user_id: UserId,
            _app_id: AppId,
        ) -> Result, AuthzError> {
            Ok(Some(AppRole::Editor))
        }
    }

    /// Call context for `app_id` with `principal: None`, depth 0 and
    /// freshly generated ids.
    fn anon_cx(app_id: AppId) -> SdkCallCx {
        SdkCallCx {
            app_id,
            script_id: ScriptId::new(),
            principal: None,
            execution_id: ExecutionId::new(),
            request_id: RequestId::new(),
            trigger_depth: 0,
            root_execution_id: ExecutionId::new(),
            is_dead_letter_handler: false,
            event: None,
        }
    }

    /// Same as `anon_cx` but carrying a principal with
    /// `InstanceRole::Member`, no scopes and no app binding.
    fn member_cx(app_id: AppId) -> SdkCallCx {
        SdkCallCx {
            principal: Some(Principal {
                user_id: AdminUserId::new(),
                instance_role: InstanceRole::Member,
                scopes: None,
                app_binding: None,
            }),
            ..anon_cx(app_id)
        }
    }

    /// Service built through the two-arg constructor (no realtime extras).
    fn svc(repo: Arc, authz: Arc) -> PubsubServiceImpl {
        PubsubServiceImpl::new(repo, authz)
    }

    // A publish records exactly one row for each matching subscription.
    #[tokio::test]
    async fn publish_writes_one_row_per_matching_trigger() {
        let app = AppId::new();
        let repo = Arc::new(InMemoryPubsubRepo::new(vec![
            (app, "user.*".into()),
            (app, "user.created".into()),
            (app, "order.*".into()), // does not match
        ]));
        let files = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
        files
            .publish_durable(&anon_cx(app), "user.created", serde_json::json!({"id": 1}))
            .await
            .unwrap();
        // Two of the three subscriptions match "user.created".
        assert_eq!(repo.written_count(), 2);
    }

    // A publish with no matching subscription succeeds and writes nothing.
    #[tokio::test]
    async fn no_matching_trigger_succeeds_silently() {
        let app = AppId::new();
        let repo = Arc::new(InMemoryPubsubRepo::new(vec![(app, "order.*".into())]));
        let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
        svc.publish_durable(&anon_cx(app), "user.created", serde_json::json!(1))
            .await
            .unwrap();
        assert_eq!(repo.written_count(), 0);
    }

    // A whitespace-only topic is rejected with `EmptyTopic`.
    #[tokio::test]
    async fn empty_topic_rejected() {
        let app = AppId::new();
        let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
        let svc = svc(repo, Arc::new(DenyingAuthzRepo));
        let err = svc
            .publish_durable(&anon_cx(app), "  ", serde_json::json!(1))
            .await
            .unwrap_err();
        assert!(matches!(err, PubsubError::EmptyTopic));
    }

    // A subscription owned by one app is never hit by another app's publish.
    #[tokio::test]
    async fn cross_app_isolation() {
        let app_a = AppId::new();
        let app_b = AppId::new();
        // The only subscription belongs to app B.
        let repo = Arc::new(InMemoryPubsubRepo::new(vec![(app_b, "*".into())]));
        let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
        // App A publishes — app B's trigger must NOT fire.
        svc.publish_durable(&anon_cx(app_a), "user.created", serde_json::json!(1))
            .await
            .unwrap();
        assert_eq!(repo.written_count(), 0);
    }

    // A failure part-way through the fan-out leaves no rows behind and
    // surfaces as `Unavailable`.
    #[tokio::test]
    async fn fan_out_is_transactional_all_or_nothing() {
        let app = AppId::new();
        let mut repo = InMemoryPubsubRepo::new(vec![
            (app, "*".into()),
            (app, "user.*".into()),
            (app, "user.created".into()),
        ]);
        repo.fail_at = Some(3); // fail on the 3rd matching insert
        let repo = Arc::new(repo);
        let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
        let err = svc
            .publish_durable(&anon_cx(app), "user.created", serde_json::json!(1))
            .await
            .unwrap_err();
        assert!(matches!(err, PubsubError::Unavailable(_)));
        // Rollback: no partial fan-out survived.
        assert_eq!(repo.written_count(), 0);
    }

    // Without a principal the publish succeeds even against a denying repo.
    #[tokio::test]
    async fn anonymous_cx_skips_authz() {
        let app = AppId::new();
        let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
        let svc = svc(repo, Arc::new(DenyingAuthzRepo));
        // No principal → no authz check even with a denying repo.
        svc.publish_durable(&anon_cx(app), "t", serde_json::json!(1))
            .await
            .unwrap();
    }

    // A principal with no membership gets `Forbidden`.
    #[tokio::test]
    async fn member_without_role_is_forbidden() {
        let app = AppId::new();
        let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
        let svc = svc(repo, Arc::new(DenyingAuthzRepo));
        let err = svc
            .publish_durable(&member_cx(app), "t", serde_json::json!(1))
            .await
            .unwrap_err();
        assert!(matches!(err, PubsubError::Forbidden));
    }

    // A principal whose membership is `Editor` may publish.
    #[tokio::test]
    async fn member_with_editor_role_allowed() {
        let app = AppId::new();
        let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
        let svc = svc(repo, Arc::new(EditorAuthzRepo));
        svc.publish_durable(&member_cx(app), "t", serde_json::json!(1))
            .await
            .unwrap();
    }

    // ------------------------------------------------------------------
    // v1.1.6 realtime broadcast + subscriber-token minting
    // ------------------------------------------------------------------

    use crate::app_secrets_repo::{AppSecretsRepo, AppSecretsRepoError};
    use crate::topic_repo::{Topic, TopicAuthMode, TopicRepo, TopicRepoError};
    use picloud_orchestrator_core::InProcessBroadcaster;
    use picloud_shared::{RealtimeBroadcaster, RealtimeEvent};

    /// Topic repo fake: returns the configured topics as registered +
    /// externally-subscribable (unless absent).
    struct FakeTopicRepo(Vec);

    // Only `get` is implemented; every other method is `unimplemented!()`
    // and panics if a test reaches it.
    #[async_trait]
    impl TopicRepo for FakeTopicRepo {
        async fn create(
            &self,
            _: AppId,
            _: &str,
            _: bool,
            _: TopicAuthMode,
        ) -> Result {
            unimplemented!()
        }
        async fn list(&self, _: AppId) -> Result, TopicRepoError> {
            unimplemented!()
        }
        async fn get(&self, _: AppId, name: &str) -> Result, TopicRepoError> {
            Ok(self.0.iter().any(|t| t == name).then(|| Topic {
                name: name.to_string(),
                external_subscribable: true,
                auth_mode: TopicAuthMode::Token,
                created_at: chrono::Utc::now(),
                updated_at: chrono::Utc::now(),
            }))
        }
        async fn update(
            &self,
            _: AppId,
            _: &str,
            _: Option,
            _: Option,
        ) -> Result, TopicRepoError> {
            unimplemented!()
        }
        async fn delete(&self, _: AppId, _: &str) -> Result {
            unimplemented!()
        }
    }

    /// Secrets fake: every app's signing key is 32 bytes of `42`.
    #[derive(Default)]
    struct FakeSecrets;

    #[async_trait]
    impl AppSecretsRepo for FakeSecrets {
        async fn get_or_create_signing_key(
            &self,
            _: AppId,
        ) -> Result, AppSecretsRepoError> {
            Ok(vec![42u8; 32])
        }
        async fn signing_key(&self, _: AppId) -> Result>, AppSecretsRepoError> {
            Ok(Some(vec![42u8; 32]))
        }
    }

    /// Broadcaster that panics on publish — proves a broadcast fault
    /// can't fail the publish.
    struct PanicBroadcaster;

    #[async_trait]
    impl RealtimeBroadcaster for PanicBroadcaster {
        async fn subscribe(
            &self,
            _: AppId,
            _: &str,
        ) -> Result, picloud_shared::BroadcasterError> {
            unimplemented!()
        }
        async fn publish(&self, _: AppId, _: &str, _: RealtimeEvent) {
            panic!("boom");
        }
        async fn drop_topic(&self, _: AppId, _: &str) {}
    }

    /// Service with the realtime surface attached: editor authz, the
    /// given broadcaster, `topics` as the registered topic names, the
    /// fake secrets repo and the conservative TTL config.
    fn realtime_svc(
        repo: Arc,
        broadcaster: Arc,
        topics: Vec,
    ) -> PubsubServiceImpl {
        PubsubServiceImpl::new(repo, Arc::new(EditorAuthzRepo)).with_realtime(
            broadcaster,
            Arc::new(FakeTopicRepo(topics)),
            Arc::new(FakeSecrets),
            SubscriberTokenConfig::conservative(),
        )
    }

    // A subscriber registered before the publish receives the event with
    // the published topic and message.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn publish_broadcasts_to_in_process_subscribers() {
        let app = AppId::new();
        let broadcaster = Arc::new(InProcessBroadcaster::new(16));
        let mut rx = broadcaster.subscribe(app, "chat").await.unwrap();
        let svc = realtime_svc(
            Arc::new(InMemoryPubsubRepo::new(vec![])),
            broadcaster,
            vec![],
        );
        svc.publish_durable(&anon_cx(app), "chat", serde_json::json!({ "hi": 1 }))
            .await
            .unwrap();
        let ev = rx.recv().await.unwrap();
        assert_eq!(ev.topic, "chat");
        assert_eq!(ev.message, serde_json::json!({ "hi": 1 }));
    }

    // A broadcaster that panics inside `publish` does not turn the
    // publish into an error.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn panicking_broadcaster_does_not_fail_publish() {
        let app = AppId::new();
        let svc = realtime_svc(
            Arc::new(InMemoryPubsubRepo::new(vec![])),
            Arc::new(PanicBroadcaster),
            vec![],
        );
        // The outbox fan-out committed; the broadcast panic is swallowed.
        svc.publish_durable(&anon_cx(app), "chat", serde_json::json!(1))
            .await
            .expect("publish must succeed despite broadcast panic");
    }

    /// Realtime service for the minting tests: empty pubsub repo, the
    /// no-op broadcaster, and `topics` as the registered topic names.
    fn mint_svc(topics: Vec) -> PubsubServiceImpl {
        realtime_svc(
            Arc::new(InMemoryPubsubRepo::new(vec![])),
            Arc::new(picloud_shared::NoopRealtimeBroadcaster),
            topics,
        )
    }

    // A minted token verifies under the fake key and carries the app id,
    // the requested topics and an expiry later than its issue time.
    #[tokio::test]
    async fn mint_returns_token_scoped_to_topics() {
        let app = AppId::new();
        let svc = mint_svc(vec!["chat".into(), "notify".into()]);
        let token = svc
            .mint_subscriber_token(
                &member_cx(app),
                vec!["chat".into(), "notify".into()],
                Some(120),
            )
            .await
            .unwrap();
        // Verify with the fake key; claims carry the topics + expiry.
        let claims = subscriber_token::verify(&[42u8; 32], &token, chrono::Utc::now().timestamp())
            .expect("token verifies");
        assert_eq!(claims.app_id, app);
        assert!(claims.allows_topic("chat") && claims.allows_topic("notify"));
        assert!(claims.exp > claims.iat);
    }

    // Minting without a principal fails with `SubscriberToken`.
    #[tokio::test]
    async fn mint_anonymous_principal_throws() {
        let app = AppId::new();
        let svc = mint_svc(vec!["chat".into()]);
        let err = svc
            .mint_subscriber_token(&anon_cx(app), vec!["chat".into()], None)
            .await
            .unwrap_err();
        assert!(matches!(err, PubsubError::SubscriberToken(_)));
    }

    // Minting with an empty topic list fails with `SubscriberToken`.
    #[tokio::test]
    async fn mint_empty_topics_throws() {
        let app = AppId::new();
        let svc = mint_svc(vec!["chat".into()]);
        let err = svc
            .mint_subscriber_token(&member_cx(app), vec![], None)
            .await
            .unwrap_err();
        assert!(matches!(err, PubsubError::SubscriberToken(_)));
    }

    // TTLs of 5 s (below the conservative minimum of 10) and 90 000 s
    // (above the conservative maximum of 86 400) are both rejected.
    #[tokio::test]
    async fn mint_ttl_below_min_and_above_max_throw() {
        let app = AppId::new();
        let svc = mint_svc(vec!["chat".into()]);
        for bad in [Some(5), Some(90_000)] {
            let err = svc
                .mint_subscriber_token(&member_cx(app), vec!["chat".into()], bad)
                .await
                .unwrap_err();
            assert!(
                matches!(err, PubsubError::SubscriberToken(_)),
                "ttl {bad:?}"
            );
        }
    }

    // Requesting a topic the fake repo does not know fails, and the
    // error message names that topic.
    #[tokio::test]
    async fn mint_unregistered_topic_throws_with_message() {
        let app = AppId::new();
        // "chat" registered; "secret" is not.
        let svc = mint_svc(vec!["chat".into()]);
        let err = svc
            .mint_subscriber_token(&member_cx(app), vec!["chat".into(), "secret".into()], None)
            .await
            .unwrap_err();
        match err {
            PubsubError::SubscriberToken(msg) => {
                assert!(
                    msg.contains("topic secret is not externally subscribable"),
                    "got: {msg}"
                );
            }
            other => panic!("expected SubscriberToken, got {other:?}"),
        }
    }
}