//! In-process `RealtimeBroadcaster` — the SSE fan-out registry (v1.1.6). //! //! Sibling of [`crate::inbox::InboxRegistry`], but multi-receiver and //! repeated-event: a `Mutex>` //! over `tokio::sync::broadcast` instead of a oneshot map. The publish //! side ([`PubsubServiceImpl`]) and the SSE subscribe side both hold one //! shared `Arc`. //! //! Delivery is best-effort: each channel has a bounded buffer //! (`PICLOUD_REALTIME_BROADCAST_CAPACITY`, default 64); a slow consumer //! that falls behind sees the oldest events dropped (standard //! `broadcast` lag semantics — the receiver gets `RecvError::Lagged`). //! SSE's transport-layer auto-reconnect is the recovery path; there's no //! server-side replay in v1.1.6. //! //! Channels are created lazily on first subscribe. A periodic GC task //! ([`spawn_realtime_gc`]) drops senders whose receiver count has fallen //! to zero so one-shot subscribers don't grow the map unboundedly. //! //! Cluster mode (v1.3+) swaps this for a Postgres `LISTEN/NOTIFY`-backed //! resolver behind the same `RealtimeBroadcaster` trait. use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time::Duration; use async_trait::async_trait; use picloud_shared::{AppId, BroadcasterError, RealtimeBroadcaster, RealtimeEvent}; use tokio::sync::broadcast; /// Default per-channel broadcast buffer depth. pub const DEFAULT_BROADCAST_CAPACITY: usize = 64; const ENV_CAPACITY: &str = "PICLOUD_REALTIME_BROADCAST_CAPACITY"; /// Default GC sweep interval for empty channels. pub const DEFAULT_GC_INTERVAL_SECS: u64 = 60; pub struct InProcessBroadcaster { inner: Mutex>>, capacity: usize, } impl InProcessBroadcaster { #[must_use] pub fn new(capacity: usize) -> Self { Self { inner: Mutex::new(HashMap::new()), capacity: capacity.max(1), } } /// Build from `PICLOUD_REALTIME_BROADCAST_CAPACITY` (default 64). #[must_use] pub fn from_env() -> Self { let capacity = match std::env::var(ENV_CAPACITY) { Err(_) => DEFAULT_BROADCAST_CAPACITY, Ok(v) => match v.parse::() { Ok(n) if n > 0 => n, Ok(_) => { tracing::warn!(env = ENV_CAPACITY, value = %v, "must be > 0; using default"); DEFAULT_BROADCAST_CAPACITY } Err(e) => { tracing::warn!(env = ENV_CAPACITY, value = %v, error = %e, "invalid; using default"); DEFAULT_BROADCAST_CAPACITY } }, }; Self::new(capacity) } /// Number of live channels in the map (test/observability helper). #[must_use] pub fn channel_count(&self) -> usize { self.inner.lock().map(|g| g.len()).unwrap_or(0) } /// Drop senders with zero receivers. Returns how many were removed. /// Called periodically by [`spawn_realtime_gc`]. pub fn gc(&self) -> usize { let Ok(mut g) = self.inner.lock() else { return 0; }; let before = g.len(); g.retain(|_, tx| tx.receiver_count() > 0); before - g.len() } } #[async_trait] impl RealtimeBroadcaster for InProcessBroadcaster { async fn subscribe( &self, app_id: AppId, topic: &str, ) -> Result, BroadcasterError> { let mut g = self .inner .lock() .map_err(|_| BroadcasterError::Unavailable("broadcaster map poisoned".into()))?; let tx = g .entry((app_id, topic.to_string())) .or_insert_with(|| broadcast::channel(self.capacity).0); Ok(tx.subscribe()) } async fn publish(&self, app_id: AppId, topic: &str, event: RealtimeEvent) { let Ok(g) = self.inner.lock() else { return; }; // Only fan out to an existing channel: a topic with no live // subscribers has no sender (publish never creates one). `send` // returns Err iff every receiver has dropped — a benign no-op. if let Some(tx) = g.get(&(app_id, topic.to_string())) { let _ = tx.send(event); } } async fn drop_topic(&self, app_id: AppId, topic: &str) { if let Ok(mut g) = self.inner.lock() { // Removing the sender closes the channel; existing receivers // observe `RecvError::Closed` and disconnect cleanly. g.remove(&(app_id, topic.to_string())); } } } /// Spawn the background GC sweep that drops empty channels every /// `interval_secs` (default [`DEFAULT_GC_INTERVAL_SECS`]). Spawned at /// startup alongside the other housekeeping tasks. pub fn spawn_realtime_gc(broadcaster: Arc, interval_secs: u64) { let period = Duration::from_secs(interval_secs.max(1)); tokio::spawn(async move { let mut ticker = tokio::time::interval(period); ticker.tick().await; // skip the immediate first fire loop { ticker.tick().await; let removed = broadcaster.gc(); if removed > 0 { tracing::debug!(removed, "realtime broadcaster GC dropped empty channels"); } } }); } #[cfg(test)] mod tests { use super::*; use chrono::Utc; use serde_json::json; fn event(topic: &str, n: i64) -> RealtimeEvent { RealtimeEvent { topic: topic.to_string(), message: json!({ "n": n }), published_at: Utc::now(), } } #[tokio::test] async fn multiple_subscribers_each_receive_each_event() { let b = InProcessBroadcaster::new(16); let app = AppId::new(); let mut rx1 = b.subscribe(app, "chat").await.unwrap(); let mut rx2 = b.subscribe(app, "chat").await.unwrap(); b.publish(app, "chat", event("chat", 1)).await; b.publish(app, "chat", event("chat", 2)).await; for rx in [&mut rx1, &mut rx2] { assert_eq!(rx.recv().await.unwrap().message, json!({ "n": 1 })); assert_eq!(rx.recv().await.unwrap().message, json!({ "n": 2 })); } } #[tokio::test] async fn dropped_subscriber_does_not_leak_after_gc() { let b = InProcessBroadcaster::new(16); let app = AppId::new(); let rx = b.subscribe(app, "t").await.unwrap(); assert_eq!(b.channel_count(), 1); drop(rx); // GC reclaims the now-empty channel. assert_eq!(b.gc(), 1); assert_eq!(b.channel_count(), 0); } #[tokio::test] async fn drop_topic_disconnects_existing_subscribers() { let b = InProcessBroadcaster::new(16); let app = AppId::new(); let mut rx = b.subscribe(app, "t").await.unwrap(); b.drop_topic(app, "t").await; // Sender gone → receiver observes a closed channel. assert!(rx.recv().await.is_err()); assert_eq!(b.channel_count(), 0); } #[tokio::test] async fn slow_consumer_loses_oldest_events() { // Capacity 2: a consumer that never drains sees the oldest // events dropped (broadcast Lagged semantics). let b = InProcessBroadcaster::new(2); let app = AppId::new(); let mut rx = b.subscribe(app, "t").await.unwrap(); for i in 0..5 { b.publish(app, "t", event("t", i)).await; } // First recv reports the lag rather than event 0. let first = rx.recv().await; assert!( matches!(first, Err(broadcast::error::RecvError::Lagged(_))), "expected Lagged, got {first:?}" ); // Subsequent recvs return the most recent buffered events. let next = rx.recv().await.unwrap(); assert_eq!(next.message, json!({ "n": 3 })); } #[tokio::test] async fn cross_app_isolation() { let b = InProcessBroadcaster::new(16); let app_a = AppId::new(); let app_b = AppId::new(); let mut rx_a = b.subscribe(app_a, "shared").await.unwrap(); let mut rx_b = b.subscribe(app_b, "shared").await.unwrap(); b.publish(app_a, "shared", event("shared", 1)).await; // App B's subscriber must not see app A's publish. assert_eq!(rx_a.recv().await.unwrap().message, json!({ "n": 1 })); assert!(rx_b.try_recv().is_err()); } #[tokio::test] async fn publish_with_no_subscribers_is_noop() { let b = InProcessBroadcaster::new(16); let app = AppId::new(); // No subscriber → no sender created → no panic, nothing fanned out. b.publish(app, "ghost", event("ghost", 1)).await; assert_eq!(b.channel_count(), 0); } }