Server-side realtime SSE on per-app pub/sub topics, plus the three
v1.1.5 follow-ups and the version bumps.
Realtime:
- topics registry (0021) + admin endpoints + Capability::AppTopicManage
(-> app:admin; no new scope).
- GET /realtime/topics/{topic} SSE endpoint (orchestrator-core data
plane): Host -> app, RealtimeAuthority gate (404 missing/internal,
401 bad/absent token), broadcast::Receiver stream + heartbeat.
- RealtimeBroadcaster / RealtimeEvent / RealtimeAuthority traits
(picloud-shared); InProcessBroadcaster + GC (orchestrator-core);
DB-backed RealtimeAuthorityImpl (manager-core). Publish path fans out
to in-process subscribers after the durable outbox commit (best-effort,
panic-isolated).
- HMAC subscriber tokens (subscriber_token.rs) + app_secrets table (0022)
+ pubsub::subscriber_token SDK (schema 1.6 -> 1.7). TTL clamp + env
overrides.
- Dashboard Topics tab (register/list/edit/delete, prominent external
badge, flip confirmation).
v1.1.5 follow-ups:
- Empty blobs accepted (NewFile/FileUpdate::validate) + round-trip test.
- Orphan *.tmp.* sweeper (spawn_files_orphan_sweep).
- Dispatcher e2e tests, one per trigger kind (DATABASE_URL-gated).
Versions: workspace 1.1.6, SDK 1.7, dashboard 0.12.0. Schema-snapshot
golden re-blessed.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
339 lines
11 KiB
Rust
339 lines
11 KiB
Rust
//! `RealtimeAuthorityImpl` — the DB-backed SSE subscribe gate (v1.1.6).
|
|
//!
|
|
//! Backs the [`picloud_shared::RealtimeAuthority`] trait the SSE handler
|
|
//! in orchestrator-core calls. All `topics`-table reads and signing-key
|
|
//! material stay inside this impl so the data-plane crate never touches
|
|
//! the key.
|
|
//!
|
|
//! Verdict mapping (see [`SubscribeDenied`]):
|
|
//! * topic missing OR not externally subscribable → `NotFound` (404).
|
|
//! Both collapse to 404 so the endpoint can't probe internal topics.
|
|
//! * `auth_mode = 'public'` → allow.
|
|
//! * `auth_mode = 'token'` → verify the HMAC token (present, signed by
|
|
//! this app's key, unexpired, scoped to this topic) → allow, else
|
|
//! `Unauthorized` (401, generic — never says which check failed).
|
|
//!
|
|
//! Signing keys never change in v1.1.6 (no rotation API), so a small
|
|
//! in-memory cache avoids a per-subscribe DB read once an app's key has
|
|
//! been seen. The cache is purely an optimization — a cold miss reads
|
|
//! the row.
|
|
|
|
use std::collections::HashMap;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use async_trait::async_trait;
|
|
use picloud_shared::{subscriber_token, AppId, RealtimeAuthority, SubscribeDenied};
|
|
|
|
use crate::app_secrets_repo::AppSecretsRepo;
|
|
use crate::topic_repo::{TopicAuthMode, TopicRepo};
|
|
|
|
pub struct RealtimeAuthorityImpl {
|
|
topics: Arc<dyn TopicRepo>,
|
|
secrets: Arc<dyn AppSecretsRepo>,
|
|
key_cache: Mutex<HashMap<AppId, Vec<u8>>>,
|
|
}
|
|
|
|
impl RealtimeAuthorityImpl {
|
|
#[must_use]
|
|
pub fn new(topics: Arc<dyn TopicRepo>, secrets: Arc<dyn AppSecretsRepo>) -> Self {
|
|
Self {
|
|
topics,
|
|
secrets,
|
|
key_cache: Mutex::new(HashMap::new()),
|
|
}
|
|
}
|
|
|
|
/// Fetch the app's signing key, consulting the cache first. Returns
|
|
/// `None` when the app has no key (no token ever minted) — which the
|
|
/// caller maps to `Unauthorized`.
|
|
async fn signing_key(&self, app_id: AppId) -> Result<Option<Vec<u8>>, SubscribeDenied> {
|
|
if let Ok(cache) = self.key_cache.lock() {
|
|
if let Some(k) = cache.get(&app_id) {
|
|
return Ok(Some(k.clone()));
|
|
}
|
|
}
|
|
let key = self
|
|
.secrets
|
|
.signing_key(app_id)
|
|
.await
|
|
.map_err(|e| SubscribeDenied::Backend(e.to_string()))?;
|
|
if let Some(ref k) = key {
|
|
if let Ok(mut cache) = self.key_cache.lock() {
|
|
cache.insert(app_id, k.clone());
|
|
}
|
|
}
|
|
Ok(key)
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl RealtimeAuthority for RealtimeAuthorityImpl {
|
|
async fn authorize_subscribe(
|
|
&self,
|
|
app_id: AppId,
|
|
topic: &str,
|
|
token: Option<&str>,
|
|
) -> Result<(), SubscribeDenied> {
|
|
let registered = self
|
|
.topics
|
|
.get(app_id, topic)
|
|
.await
|
|
.map_err(|e| SubscribeDenied::Backend(e.to_string()))?;
|
|
|
|
// Missing topic AND internal-only topic both 404 — don't leak
|
|
// which internal topics exist.
|
|
let Some(t) = registered.filter(|t| t.external_subscribable) else {
|
|
return Err(SubscribeDenied::NotFound);
|
|
};
|
|
|
|
match t.auth_mode {
|
|
TopicAuthMode::Public => Ok(()),
|
|
TopicAuthMode::Token => {
|
|
let token = token.ok_or(SubscribeDenied::Unauthorized)?;
|
|
let key = self
|
|
.signing_key(app_id)
|
|
.await?
|
|
.ok_or(SubscribeDenied::Unauthorized)?;
|
|
let now = chrono::Utc::now().timestamp();
|
|
let claims = subscriber_token::verify(&key, token, now)
|
|
.map_err(|_| SubscribeDenied::Unauthorized)?;
|
|
// Per-app key already makes a cross-app token fail the
|
|
// signature check; this is belt-and-suspenders.
|
|
if claims.app_id != app_id || !claims.allows_topic(topic) {
|
|
return Err(SubscribeDenied::Unauthorized);
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::app_secrets_repo::AppSecretsRepoError;
    use crate::topic_repo::{Topic, TopicRepoError};
    use chrono::Utc;
    use picloud_shared::subscriber_token::{sign, TokenClaims};

    /// In-memory topic registry: a flat list of `(owning app, topic)` rows.
    /// Only `get` is backed by data; the other methods are not exercised by
    /// these tests and panic if called.
    struct FakeTopics {
        rows: Vec<(AppId, Topic)>,
    }

    #[async_trait]
    impl TopicRepo for FakeTopics {
        async fn create(
            &self,
            _app: AppId,
            _name: &str,
            _external: bool,
            _mode: TopicAuthMode,
        ) -> Result<Topic, TopicRepoError> {
            unimplemented!()
        }

        async fn list(&self, _app: AppId) -> Result<Vec<Topic>, TopicRepoError> {
            unimplemented!()
        }

        async fn get(&self, app_id: AppId, name: &str) -> Result<Option<Topic>, TopicRepoError> {
            let hit = self.rows.iter().find_map(|(owner, row)| {
                (*owner == app_id && row.name == name).then(|| row.clone())
            });
            Ok(hit)
        }

        async fn update(
            &self,
            _app: AppId,
            _name: &str,
            _external: Option<bool>,
            _mode: Option<TopicAuthMode>,
        ) -> Result<Option<Topic>, TopicRepoError> {
            unimplemented!()
        }

        async fn delete(&self, _app: AppId, _name: &str) -> Result<bool, TopicRepoError> {
            unimplemented!()
        }
    }

    /// Secrets store that knows exactly one app's signing key.
    struct FakeSecrets {
        owner: AppId,
        key: Vec<u8>,
    }

    #[async_trait]
    impl AppSecretsRepo for FakeSecrets {
        async fn get_or_create_signing_key(
            &self,
            _app: AppId,
        ) -> Result<Vec<u8>, AppSecretsRepoError> {
            Ok(self.key.clone())
        }

        async fn signing_key(&self, app_id: AppId) -> Result<Option<Vec<u8>>, AppSecretsRepoError> {
            if app_id == self.owner {
                Ok(Some(self.key.clone()))
            } else {
                Ok(None)
            }
        }
    }

    /// Build a topic row with the given name, visibility and auth mode.
    fn topic(name: &str, external: bool, mode: TopicAuthMode) -> Topic {
        Topic {
            name: name.to_owned(),
            external_subscribable: external,
            auth_mode: mode,
            created_at: Utc::now(),
            updated_at: Utc::now(),
        }
    }

    /// Wire an authority to the fakes: `topics` is the registry content and
    /// `key` is the signing key held for `key_app`.
    fn authority(
        topics: Vec<(AppId, Topic)>,
        key_app: AppId,
        key: Vec<u8>,
    ) -> RealtimeAuthorityImpl {
        let registry = Arc::new(FakeTopics { rows: topics });
        let secrets = Arc::new(FakeSecrets {
            owner: key_app,
            key,
        });
        RealtimeAuthorityImpl::new(registry, secrets)
    }

    /// Claims for a single topic, with issue and expiry times expressed as
    /// second offsets from the current time.
    fn claims_for(
        app_id: AppId,
        topic: &'static str,
        iat_offset: i64,
        exp_offset: i64,
    ) -> TokenClaims {
        TokenClaims {
            app_id,
            topics: vec![topic.into()],
            iat: Utc::now().timestamp() + iat_offset,
            exp: Utc::now().timestamp() + exp_offset,
        }
    }

    #[tokio::test]
    async fn missing_topic_is_not_found() {
        let app = AppId::new();
        let gate = authority(Vec::new(), app, vec![0u8; 32]);

        let verdict = gate.authorize_subscribe(app, "ghost", None).await;

        assert_eq!(verdict, Err(SubscribeDenied::NotFound));
    }

    #[tokio::test]
    async fn internal_only_topic_is_not_found() {
        let app = AppId::new();
        let registry = vec![(app, topic("internal", false, TopicAuthMode::Public))];
        let gate = authority(registry, app, vec![0u8; 32]);

        let verdict = gate.authorize_subscribe(app, "internal", None).await;

        assert_eq!(verdict, Err(SubscribeDenied::NotFound));
    }

    #[tokio::test]
    async fn public_topic_allows_without_token() {
        let app = AppId::new();
        let registry = vec![(app, topic("news", true, TopicAuthMode::Public))];
        let gate = authority(registry, app, vec![0u8; 32]);

        let verdict = gate.authorize_subscribe(app, "news", None).await;

        assert!(verdict.is_ok());
    }

    #[tokio::test]
    async fn token_topic_without_token_is_unauthorized() {
        let app = AppId::new();
        let registry = vec![(app, topic("chat", true, TopicAuthMode::Token))];
        let gate = authority(registry, app, vec![7u8; 32]);

        let verdict = gate.authorize_subscribe(app, "chat", None).await;

        assert_eq!(verdict, Err(SubscribeDenied::Unauthorized));
    }

    #[tokio::test]
    async fn token_topic_with_valid_token_allows() {
        let app = AppId::new();
        let key = vec![9u8; 32];
        let registry = vec![(app, topic("chat", true, TopicAuthMode::Token))];
        let gate = authority(registry, app, key.clone());
        let token = sign(&key, &claims_for(app, "chat", 0, 60));

        let verdict = gate.authorize_subscribe(app, "chat", Some(&token)).await;

        assert!(verdict.is_ok());
    }

    #[tokio::test]
    async fn token_for_other_topic_is_unauthorized() {
        let app = AppId::new();
        let key = vec![9u8; 32];
        let registry = vec![(app, topic("chat", true, TopicAuthMode::Token))];
        let gate = authority(registry, app, key.clone());
        // Correctly signed and unexpired, but scoped to a different topic.
        let token = sign(&key, &claims_for(app, "other", 0, 60));

        let verdict = gate.authorize_subscribe(app, "chat", Some(&token)).await;

        assert_eq!(verdict, Err(SubscribeDenied::Unauthorized));
    }

    #[tokio::test]
    async fn expired_token_is_unauthorized() {
        let app = AppId::new();
        let key = vec![9u8; 32];
        let registry = vec![(app, topic("chat", true, TopicAuthMode::Token))];
        let gate = authority(registry, app, key.clone());
        // Issued two minutes ago, expired one minute ago.
        let token = sign(&key, &claims_for(app, "chat", -120, -60));

        let verdict = gate.authorize_subscribe(app, "chat", Some(&token)).await;

        assert_eq!(verdict, Err(SubscribeDenied::Unauthorized));
    }

    #[tokio::test]
    async fn token_signed_by_other_app_key_is_unauthorized() {
        let app_a = AppId::new();
        let app_b = AppId::new();
        let key_a = vec![1u8; 32];
        let key_b = vec![2u8; 32];
        // The authority serves app B and holds app B's key.
        let registry = vec![(app_b, topic("chat", true, TopicAuthMode::Token))];
        let gate = authority(registry, app_b, key_b);
        // The token is signed with app A's key and claims app A.
        let token = sign(&key_a, &claims_for(app_a, "chat", 0, 60));

        let verdict = gate.authorize_subscribe(app_b, "chat", Some(&token)).await;

        assert_eq!(verdict, Err(SubscribeDenied::Unauthorized));
    }
}
|