feat(v1.1.6): realtime channels + v1.1.5 follow-ups + version bumps

Server-side realtime SSE on per-app pub/sub topics, plus the three
v1.1.5 follow-ups and the version bumps.

Realtime:
- topics registry (0021) + admin endpoints + Capability::AppTopicManage
  (-> app:admin; no new scope).
- GET /realtime/topics/{topic} SSE endpoint (orchestrator-core data
  plane): Host -> app, RealtimeAuthority gate (404 missing/internal,
  401 bad/absent token), broadcast::Receiver stream + heartbeat.
- RealtimeBroadcaster / RealtimeEvent / RealtimeAuthority traits
  (picloud-shared); InProcessBroadcaster + GC (orchestrator-core);
  DB-backed RealtimeAuthorityImpl (manager-core). Publish path fans out
  to in-process subscribers after the durable outbox commit (best-effort,
  panic-isolated).
- HMAC subscriber tokens (subscriber_token.rs) + app_secrets table (0022)
  + pubsub::subscriber_token SDK (schema 1.6 -> 1.7). TTL clamp + env
  overrides.
- Dashboard Topics tab (register/list/edit/delete, prominent external
  badge, flip confirmation).

v1.1.5 follow-ups:
- Empty blobs accepted (NewFile/FileUpdate::validate) + round-trip test.
- Orphan *.tmp.* sweeper (spawn_files_orphan_sweep).
- Dispatcher e2e tests, one per trigger kind (DATABASE_URL-gated).

Versions: workspace 1.1.6, SDK 1.7, dashboard 0.12.0. Schema-snapshot
golden re-blessed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-06-04 20:18:50 +02:00
parent d064681c49
commit fcbcc576a2
35 changed files with 4333 additions and 63 deletions

View File

@@ -12,6 +12,8 @@ pub mod api;
pub mod client;
pub mod gate;
pub mod inbox;
pub mod realtime;
pub mod realtime_api;
pub mod resolver;
pub mod routing;
@@ -19,4 +21,8 @@ pub use api::{data_plane_router, user_routes_router, DataPlaneState};
pub use client::{ExecutorClient, LocalExecutorClient, RemoteExecutorClient, ScriptIdentity};
pub use gate::{AcquireError, ExecutionGate};
pub use inbox::InboxRegistry;
pub use realtime::{spawn_realtime_gc, InProcessBroadcaster, DEFAULT_BROADCAST_CAPACITY};
pub use realtime_api::{
heartbeat_secs_from_env, realtime_router, RealtimeState, DEFAULT_HEARTBEAT_SECS,
};
pub use resolver::{ResolverError, ScriptResolver};

View File

@@ -0,0 +1,242 @@
//! In-process `RealtimeBroadcaster` — the SSE fan-out registry (v1.1.6).
//!
//! Sibling of [`crate::inbox::InboxRegistry`], but multi-receiver and
//! repeated-event: a `Mutex<HashMap<(AppId, topic), broadcast::Sender>>`
//! over `tokio::sync::broadcast` instead of a oneshot map. The publish
//! side ([`PubsubServiceImpl`]) and the SSE subscribe side both hold one
//! shared `Arc<InProcessBroadcaster>`.
//!
//! Delivery is best-effort: each channel has a bounded buffer
//! (`PICLOUD_REALTIME_BROADCAST_CAPACITY`, default 64); a slow consumer
//! that falls behind sees the oldest events dropped (standard
//! `broadcast` lag semantics — the receiver gets `RecvError::Lagged`).
//! SSE's transport-layer auto-reconnect is the recovery path; there's no
//! server-side replay in v1.1.6.
//!
//! Channels are created lazily on first subscribe. A periodic GC task
//! ([`spawn_realtime_gc`]) drops senders whose receiver count has fallen
//! to zero so one-shot subscribers don't grow the map unboundedly.
//!
//! Cluster mode (v1.3+) swaps this for a Postgres `LISTEN/NOTIFY`-backed
//! resolver behind the same `RealtimeBroadcaster` trait.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
use picloud_shared::{AppId, BroadcasterError, RealtimeBroadcaster, RealtimeEvent};
use tokio::sync::broadcast;
/// Default per-channel broadcast buffer depth.
pub const DEFAULT_BROADCAST_CAPACITY: usize = 64;
const ENV_CAPACITY: &str = "PICLOUD_REALTIME_BROADCAST_CAPACITY";
/// Default GC sweep interval for empty channels.
pub const DEFAULT_GC_INTERVAL_SECS: u64 = 60;
pub struct InProcessBroadcaster {
inner: Mutex<HashMap<(AppId, String), broadcast::Sender<RealtimeEvent>>>,
capacity: usize,
}
impl InProcessBroadcaster {
#[must_use]
pub fn new(capacity: usize) -> Self {
Self {
inner: Mutex::new(HashMap::new()),
capacity: capacity.max(1),
}
}
/// Build from `PICLOUD_REALTIME_BROADCAST_CAPACITY` (default 64).
#[must_use]
pub fn from_env() -> Self {
let capacity = match std::env::var(ENV_CAPACITY) {
Err(_) => DEFAULT_BROADCAST_CAPACITY,
Ok(v) => match v.parse::<usize>() {
Ok(n) if n > 0 => n,
Ok(_) => {
tracing::warn!(env = ENV_CAPACITY, value = %v, "must be > 0; using default");
DEFAULT_BROADCAST_CAPACITY
}
Err(e) => {
tracing::warn!(env = ENV_CAPACITY, value = %v, error = %e, "invalid; using default");
DEFAULT_BROADCAST_CAPACITY
}
},
};
Self::new(capacity)
}
/// Number of live channels in the map (test/observability helper).
#[must_use]
pub fn channel_count(&self) -> usize {
self.inner.lock().map(|g| g.len()).unwrap_or(0)
}
/// Drop senders with zero receivers. Returns how many were removed.
/// Called periodically by [`spawn_realtime_gc`].
pub fn gc(&self) -> usize {
let Ok(mut g) = self.inner.lock() else {
return 0;
};
let before = g.len();
g.retain(|_, tx| tx.receiver_count() > 0);
before - g.len()
}
}
#[async_trait]
impl RealtimeBroadcaster for InProcessBroadcaster {
async fn subscribe(
&self,
app_id: AppId,
topic: &str,
) -> Result<broadcast::Receiver<RealtimeEvent>, BroadcasterError> {
let mut g = self
.inner
.lock()
.map_err(|_| BroadcasterError::Unavailable("broadcaster map poisoned".into()))?;
let tx = g
.entry((app_id, topic.to_string()))
.or_insert_with(|| broadcast::channel(self.capacity).0);
Ok(tx.subscribe())
}
async fn publish(&self, app_id: AppId, topic: &str, event: RealtimeEvent) {
let Ok(g) = self.inner.lock() else {
return;
};
// Only fan out to an existing channel: a topic with no live
// subscribers has no sender (publish never creates one). `send`
// returns Err iff every receiver has dropped — a benign no-op.
if let Some(tx) = g.get(&(app_id, topic.to_string())) {
let _ = tx.send(event);
}
}
async fn drop_topic(&self, app_id: AppId, topic: &str) {
if let Ok(mut g) = self.inner.lock() {
// Removing the sender closes the channel; existing receivers
// observe `RecvError::Closed` and disconnect cleanly.
g.remove(&(app_id, topic.to_string()));
}
}
}
/// Spawn the background GC sweep that drops empty channels every
/// `interval_secs` (default [`DEFAULT_GC_INTERVAL_SECS`]). Spawned at
/// startup alongside the other housekeeping tasks.
pub fn spawn_realtime_gc(broadcaster: Arc<InProcessBroadcaster>, interval_secs: u64) {
let period = Duration::from_secs(interval_secs.max(1));
tokio::spawn(async move {
let mut ticker = tokio::time::interval(period);
ticker.tick().await; // skip the immediate first fire
loop {
ticker.tick().await;
let removed = broadcaster.gc();
if removed > 0 {
tracing::debug!(removed, "realtime broadcaster GC dropped empty channels");
}
}
});
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use serde_json::json;
fn event(topic: &str, n: i64) -> RealtimeEvent {
RealtimeEvent {
topic: topic.to_string(),
message: json!({ "n": n }),
published_at: Utc::now(),
}
}
#[tokio::test]
async fn multiple_subscribers_each_receive_each_event() {
let b = InProcessBroadcaster::new(16);
let app = AppId::new();
let mut rx1 = b.subscribe(app, "chat").await.unwrap();
let mut rx2 = b.subscribe(app, "chat").await.unwrap();
b.publish(app, "chat", event("chat", 1)).await;
b.publish(app, "chat", event("chat", 2)).await;
for rx in [&mut rx1, &mut rx2] {
assert_eq!(rx.recv().await.unwrap().message, json!({ "n": 1 }));
assert_eq!(rx.recv().await.unwrap().message, json!({ "n": 2 }));
}
}
#[tokio::test]
async fn dropped_subscriber_does_not_leak_after_gc() {
let b = InProcessBroadcaster::new(16);
let app = AppId::new();
let rx = b.subscribe(app, "t").await.unwrap();
assert_eq!(b.channel_count(), 1);
drop(rx);
// GC reclaims the now-empty channel.
assert_eq!(b.gc(), 1);
assert_eq!(b.channel_count(), 0);
}
#[tokio::test]
async fn drop_topic_disconnects_existing_subscribers() {
let b = InProcessBroadcaster::new(16);
let app = AppId::new();
let mut rx = b.subscribe(app, "t").await.unwrap();
b.drop_topic(app, "t").await;
// Sender gone → receiver observes a closed channel.
assert!(rx.recv().await.is_err());
assert_eq!(b.channel_count(), 0);
}
#[tokio::test]
async fn slow_consumer_loses_oldest_events() {
// Capacity 2: a consumer that never drains sees the oldest
// events dropped (broadcast Lagged semantics).
let b = InProcessBroadcaster::new(2);
let app = AppId::new();
let mut rx = b.subscribe(app, "t").await.unwrap();
for i in 0..5 {
b.publish(app, "t", event("t", i)).await;
}
// First recv reports the lag rather than event 0.
let first = rx.recv().await;
assert!(
matches!(first, Err(broadcast::error::RecvError::Lagged(_))),
"expected Lagged, got {first:?}"
);
// Subsequent recvs return the most recent buffered events.
let next = rx.recv().await.unwrap();
assert_eq!(next.message, json!({ "n": 3 }));
}
#[tokio::test]
async fn cross_app_isolation() {
let b = InProcessBroadcaster::new(16);
let app_a = AppId::new();
let app_b = AppId::new();
let mut rx_a = b.subscribe(app_a, "shared").await.unwrap();
let mut rx_b = b.subscribe(app_b, "shared").await.unwrap();
b.publish(app_a, "shared", event("shared", 1)).await;
// App B's subscriber must not see app A's publish.
assert_eq!(rx_a.recv().await.unwrap().message, json!({ "n": 1 }));
assert!(rx_b.try_recv().is_err());
}
#[tokio::test]
async fn publish_with_no_subscribers_is_noop() {
let b = InProcessBroadcaster::new(16);
let app = AppId::new();
// No subscriber → no sender created → no panic, nothing fanned out.
b.publish(app, "ghost", event("ghost", 1)).await;
assert_eq!(b.channel_count(), 0);
}
}

View File

@@ -0,0 +1,408 @@
//! SSE realtime endpoint — `GET /realtime/topics/{topic}` (v1.1.6).
//!
//! This is a data-plane surface, deliberately NOT under `/api/`
//! (realtime is its own versioning surface per the path scheme). It is
//! merged at the router root by the `picloud` binary alongside
//! `/healthz`, `/version`, and the user-route fallback.
//!
//! Handshake:
//! 1. Resolve `Host` → `app_id` (two-phase dispatch). No app → 404.
//! 2. Extract the token from `Authorization: Bearer <t>` OR `?token=<t>`
//! (EventSource can't set custom headers, so the query form is the
//! browser-compatible path).
//! 3. Ask the injected [`RealtimeAuthority`]: missing/internal topic →
//! 404, bad/absent token on a token-gated topic → 401, otherwise OK.
//! 4. Acquire a `broadcast::Receiver` and stream events as SSE until
//! the client disconnects (dropping the receiver — the broadcaster
//! cleans up on its own).
//!
//! Heartbeats (`:` comment lines) keep idle proxies from closing the
//! connection; interval is `PICLOUD_REALTIME_HEARTBEAT_SEC` (default 30).
use std::sync::Arc;
use std::time::Duration;
use axum::extract::{Path, Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::response::{IntoResponse, Response};
use axum::routing::get;
use axum::Router;
use picloud_shared::{RealtimeAuthority, RealtimeBroadcaster, SubscribeDenied};
use serde::Deserialize;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::{Stream, StreamExt};
use crate::routing::AppDomainTable;
/// Default heartbeat interval (seconds) for idle SSE connections.
pub const DEFAULT_HEARTBEAT_SECS: u64 = 30;
const ENV_HEARTBEAT: &str = "PICLOUD_REALTIME_HEARTBEAT_SEC";
#[derive(Clone)]
pub struct RealtimeState {
/// Host → app_id resolver (shared with the rest of the data plane).
pub app_domains: Arc<AppDomainTable>,
pub broadcaster: Arc<dyn RealtimeBroadcaster>,
pub authority: Arc<dyn RealtimeAuthority>,
pub heartbeat: Duration,
}
impl RealtimeState {
#[must_use]
pub fn new(
app_domains: Arc<AppDomainTable>,
broadcaster: Arc<dyn RealtimeBroadcaster>,
authority: Arc<dyn RealtimeAuthority>,
) -> Self {
Self {
app_domains,
broadcaster,
authority,
heartbeat: Duration::from_secs(heartbeat_secs_from_env()),
}
}
}
/// Read `PICLOUD_REALTIME_HEARTBEAT_SEC` (default 30, must be > 0).
#[must_use]
pub fn heartbeat_secs_from_env() -> u64 {
match std::env::var(ENV_HEARTBEAT) {
Err(_) => DEFAULT_HEARTBEAT_SECS,
Ok(v) => match v.parse::<u64>() {
Ok(n) if n > 0 => n,
_ => {
tracing::warn!(env = ENV_HEARTBEAT, value = %v, "invalid; using default");
DEFAULT_HEARTBEAT_SECS
}
},
}
}
/// Router for the realtime SSE surface. Merged at the router root.
#[must_use]
pub fn realtime_router(state: RealtimeState) -> Router {
Router::new()
.route("/realtime/topics/{topic}", get(sse_topic))
.with_state(state)
}
#[derive(Debug, Deserialize)]
struct TokenQuery {
token: Option<String>,
}
async fn sse_topic(
State(state): State<RealtimeState>,
Path(topic): Path<String>,
Query(q): Query<TokenQuery>,
headers: HeaderMap,
) -> Response {
// 1. Host → app.
let host = headers
.get("host")
.and_then(|h| h.to_str().ok())
.unwrap_or("");
let Some(app_id) = state.app_domains.resolve_app(host) else {
return not_found("no app claims this host");
};
// 2. Token: Authorization: Bearer <t> takes precedence, else ?token=.
let token = bearer_token(&headers).or(q.token);
// 3. Authorize.
match state
.authority
.authorize_subscribe(app_id, &topic, token.as_deref())
.await
{
Ok(()) => {}
Err(SubscribeDenied::NotFound) => return not_found("topic not found"),
Err(SubscribeDenied::Unauthorized) => return unauthorized(),
Err(SubscribeDenied::Backend(e)) => {
tracing::error!(error = %e, "realtime authority backend error");
return (
StatusCode::INTERNAL_SERVER_ERROR,
axum::Json(serde_json::json!({ "error": "internal error" })),
)
.into_response();
}
}
// 4. Subscribe + stream.
let rx = match state.broadcaster.subscribe(app_id, &topic).await {
Ok(rx) => rx,
Err(e) => {
tracing::error!(error = %e, "failed to acquire realtime subscription");
return (
StatusCode::INTERNAL_SERVER_ERROR,
axum::Json(serde_json::json!({ "error": "internal error" })),
)
.into_response();
}
};
let stream = event_stream(rx);
let sse =
Sse::new(stream).keep_alive(KeepAlive::new().interval(state.heartbeat).text("heartbeat"));
// Sse sets Content-Type: text/event-stream + Cache-Control: no-cache.
// Add X-Accel-Buffering: no so an intermediate nginx doesn't buffer
// the stream (ignored by other proxies). Connection management is
// hyper's concern (and is hop-by-hop on HTTP/1.1, server-managed on
// HTTP/2), so we don't set Connection ourselves.
let mut resp = sse.into_response();
resp.headers_mut().insert(
"X-Accel-Buffering",
axum::http::HeaderValue::from_static("no"),
);
resp
}
/// Map the broadcast receiver into a stream of SSE events. Lagged
/// notifications (slow consumer) are skipped; a closed channel
/// (`drop_topic`, or all senders gone) ends the stream and the SSE
/// connection closes cleanly.
fn event_stream(
rx: tokio::sync::broadcast::Receiver<picloud_shared::RealtimeEvent>,
) -> impl Stream<Item = Result<Event, std::convert::Infallible>> {
BroadcastStream::new(rx).filter_map(|item| {
let ev = item.ok()?; // drop Lagged errors
let payload = serde_json::json!({
"topic": ev.topic,
"message": ev.message,
"published_at": ev.published_at.to_rfc3339(),
});
Some(Ok(Event::default().data(payload.to_string())))
})
}
fn bearer_token(headers: &HeaderMap) -> Option<String> {
let raw = headers
.get(axum::http::header::AUTHORIZATION)?
.to_str()
.ok()?;
raw.strip_prefix("Bearer ")
.map(|t| t.trim().to_string())
.filter(|t| !t.is_empty())
}
fn not_found(msg: &str) -> Response {
(
StatusCode::NOT_FOUND,
axum::Json(serde_json::json!({ "error": msg })),
)
.into_response()
}
fn unauthorized() -> Response {
// Generic — never leaks which check failed.
(
StatusCode::UNAUTHORIZED,
axum::Json(serde_json::json!({ "error": "unauthorized" })),
)
.into_response()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::realtime::InProcessBroadcaster;
use crate::routing::AppDomainTable;
use async_trait::async_trait;
use axum::body::Body;
use axum::http::Request;
use picloud_shared::{AppId, RealtimeEvent};
use tower::ServiceExt; // oneshot
/// Authority stub returning a fixed verdict.
struct StubAuthority(Result<(), SubscribeDenied>);
#[async_trait]
impl RealtimeAuthority for StubAuthority {
async fn authorize_subscribe(
&self,
_: AppId,
_: &str,
_: Option<&str>,
) -> Result<(), SubscribeDenied> {
self.0.clone()
}
}
/// App-domain table that maps a fixed host to a fixed app.
fn domains(host: &str, app: AppId) -> Arc<AppDomainTable> {
use crate::routing::{parse_app_domain, CompiledAppDomain};
let d = parse_app_domain(host).unwrap();
let table = AppDomainTable::new();
table.replace(vec![CompiledAppDomain {
app_id: app,
pattern: d.pattern,
shape_key: d.shape_key,
}]);
Arc::new(table)
}
fn state(
app: AppId,
host: &str,
verdict: Result<(), SubscribeDenied>,
broadcaster: Arc<dyn RealtimeBroadcaster>,
) -> RealtimeState {
RealtimeState {
app_domains: domains(host, app),
broadcaster,
authority: Arc::new(StubAuthority(verdict)),
heartbeat: Duration::from_millis(100),
}
}
async fn get_status(state: RealtimeState, host: &str, topic: &str) -> StatusCode {
let app = realtime_router(state);
let req = Request::builder()
.uri(format!("/realtime/topics/{topic}"))
.header("host", host)
.body(Body::empty())
.unwrap();
app.oneshot(req).await.unwrap().status()
}
#[tokio::test]
async fn unknown_host_is_404() {
let app = AppId::new();
let st = state(
app,
"app.example.com",
Ok(()),
Arc::new(InProcessBroadcaster::new(8)),
);
// Request a different host → no app claims it.
assert_eq!(
get_status(st, "other.example.com", "chat").await,
StatusCode::NOT_FOUND
);
}
#[tokio::test]
async fn not_found_topic_is_404() {
let app = AppId::new();
let st = state(
app,
"app.example.com",
Err(SubscribeDenied::NotFound),
Arc::new(InProcessBroadcaster::new(8)),
);
assert_eq!(
get_status(st, "app.example.com", "ghost").await,
StatusCode::NOT_FOUND
);
}
#[tokio::test]
async fn unauthorized_token_is_401() {
let app = AppId::new();
let st = state(
app,
"app.example.com",
Err(SubscribeDenied::Unauthorized),
Arc::new(InProcessBroadcaster::new(8)),
);
assert_eq!(
get_status(st, "app.example.com", "chat").await,
StatusCode::UNAUTHORIZED
);
}
#[tokio::test]
async fn public_topic_returns_event_stream() {
let app = AppId::new();
let st = state(
app,
"app.example.com",
Ok(()),
Arc::new(InProcessBroadcaster::new(8)),
);
let appr = realtime_router(st);
let req = Request::builder()
.uri("/realtime/topics/chat")
.header("host", "app.example.com")
.body(Body::empty())
.unwrap();
let resp = appr.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let ct = resp
.headers()
.get(axum::http::header::CONTENT_TYPE)
.unwrap()
.to_str()
.unwrap();
assert!(ct.starts_with("text/event-stream"));
assert_eq!(resp.headers().get("x-accel-buffering").unwrap(), "no");
}
#[tokio::test]
async fn subscribe_receives_published_event() {
let app = AppId::new();
let broadcaster = Arc::new(InProcessBroadcaster::new(8));
let st = state(app, "app.example.com", Ok(()), broadcaster.clone());
let appr = realtime_router(st);
let req = Request::builder()
.uri("/realtime/topics/chat")
.header("host", "app.example.com")
.body(Body::empty())
.unwrap();
let resp = appr.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
// The handler has subscribed; publish and read the first chunk.
// Give the streaming task a beat to register its receiver.
let mut body = resp.into_body().into_data_stream();
tokio::time::sleep(Duration::from_millis(50)).await;
broadcaster
.publish(
app,
"chat",
RealtimeEvent {
topic: "chat".into(),
message: serde_json::json!({ "hi": 1 }),
published_at: chrono::Utc::now(),
},
)
.await;
let chunk = tokio::time::timeout(Duration::from_secs(2), body.next())
.await
.expect("a chunk within timeout")
.expect("stream item")
.expect("chunk ok");
let text = String::from_utf8_lossy(&chunk);
assert!(text.contains("data:"), "got: {text}");
assert!(text.contains("\"hi\":1"), "got: {text}");
}
#[tokio::test]
async fn heartbeat_fires_on_idle_connection() {
let app = AppId::new();
let broadcaster = Arc::new(InProcessBroadcaster::new(8));
// Hold a clone so the channel's sender outlives the router (which
// oneshot consumes) — otherwise the stream closes immediately.
let _keepalive = broadcaster.clone();
let st = state(app, "app.example.com", Ok(()), broadcaster);
let appr = realtime_router(st);
let req = Request::builder()
.uri("/realtime/topics/chat")
.header("host", "app.example.com")
.body(Body::empty())
.unwrap();
let resp = appr.oneshot(req).await.unwrap();
let mut body = resp.into_body().into_data_stream();
// No publish — with a 100ms heartbeat, a keep-alive comment must
// arrive well within a second.
let chunk = tokio::time::timeout(Duration::from_secs(1), body.next())
.await
.expect("heartbeat within timeout")
.expect("stream item")
.expect("chunk ok");
let text = String::from_utf8_lossy(&chunk);
assert!(text.starts_with(':'), "expected SSE comment, got: {text}");
}
}