Server-side realtime SSE on per-app pub/sub topics, plus the three
v1.1.5 follow-ups and the version bumps.
Realtime:
- topics registry (0021) + admin endpoints + Capability::AppTopicManage
(-> app:admin; no new scope).
- GET /realtime/topics/{topic} SSE endpoint (orchestrator-core data
plane): Host -> app, RealtimeAuthority gate (404 missing/internal,
401 bad/absent token), broadcast::Receiver stream + heartbeat.
- RealtimeBroadcaster / RealtimeEvent / RealtimeAuthority traits
(picloud-shared); InProcessBroadcaster + GC (orchestrator-core);
DB-backed RealtimeAuthorityImpl (manager-core). Publish path fans out
to in-process subscribers after the durable outbox commit (best-effort,
panic-isolated).
- HMAC subscriber tokens (subscriber_token.rs) + app_secrets table (0022)
+ pubsub::subscriber_token SDK (schema 1.6 -> 1.7). TTL clamp + env
overrides.
- Dashboard Topics tab (register/list/edit/delete, prominent external
badge, flip confirmation).
v1.1.5 follow-ups:
- Empty blobs accepted (NewFile/FileUpdate::validate) + round-trip test.
- Orphan *.tmp.* sweeper (spawn_files_orphan_sweep).
- Dispatcher e2e tests, one per trigger kind (DATABASE_URL-gated).
Versions: workspace 1.1.6, SDK 1.7, dashboard 0.12.0. Schema-snapshot
golden re-blessed.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
213 lines
6.6 KiB
Rust
213 lines
6.6 KiB
Rust
//! `TopicRepo` — CRUD for the `topics` table (v1.1.6).
|
|
//!
|
|
//! This table holds ONLY topics that have been explicitly externalized
|
|
//! for SSE subscription (design notes §5). Internal-only pub/sub topics
|
|
//! stay implicit — they never get a row here, and the publish path never
|
|
//! consults this table. The two readers are the topic admin endpoints
|
|
//! ([`crate::topics_api`]) and the SSE subscribe authorization
|
|
//! ([`crate::realtime_authority`]).
|
|
|
|
use async_trait::async_trait;
|
|
use chrono::{DateTime, Utc};
|
|
use picloud_shared::AppId;
|
|
use serde::{Deserialize, Serialize};
|
|
use sqlx::PgPool;
|
|
|
|
/// External-subscriber auth gate for a topic. `'public'` + `'token'` in
|
|
/// v1.1.6; `'session'` (v1.1.8) and `'script'` (v1.2) extend the DB
|
|
/// CHECK constraint and this enum later.
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
|
#[serde(rename_all = "lowercase")]
|
|
pub enum TopicAuthMode {
|
|
Public,
|
|
Token,
|
|
}
|
|
|
|
impl TopicAuthMode {
|
|
#[must_use]
|
|
pub const fn as_str(self) -> &'static str {
|
|
match self {
|
|
Self::Public => "public",
|
|
Self::Token => "token",
|
|
}
|
|
}
|
|
|
|
fn from_db(s: &str) -> Result<Self, TopicRepoError> {
|
|
match s {
|
|
"public" => Ok(Self::Public),
|
|
"token" => Ok(Self::Token),
|
|
other => Err(TopicRepoError::Backend(format!(
|
|
"unknown auth_mode in DB: {other}"
|
|
))),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// A registered, externally-subscribable topic row.
|
|
#[derive(Debug, Clone, Serialize)]
|
|
pub struct Topic {
|
|
pub name: String,
|
|
pub external_subscribable: bool,
|
|
pub auth_mode: TopicAuthMode,
|
|
pub created_at: DateTime<Utc>,
|
|
pub updated_at: DateTime<Utc>,
|
|
}
|
|
|
|
#[derive(Debug, thiserror::Error)]
|
|
pub enum TopicRepoError {
|
|
#[error("a topic named {0:?} already exists in this app")]
|
|
AlreadyExists(String),
|
|
#[error("database error: {0}")]
|
|
Db(#[from] sqlx::Error),
|
|
#[error("topic backend error: {0}")]
|
|
Backend(String),
|
|
}
|
|
|
|
#[async_trait]
|
|
pub trait TopicRepo: Send + Sync {
|
|
/// Register a topic. Errors `AlreadyExists` on PK conflict.
|
|
async fn create(
|
|
&self,
|
|
app_id: AppId,
|
|
name: &str,
|
|
external_subscribable: bool,
|
|
auth_mode: TopicAuthMode,
|
|
) -> Result<Topic, TopicRepoError>;
|
|
|
|
/// List every registered topic in the app, ordered by name.
|
|
async fn list(&self, app_id: AppId) -> Result<Vec<Topic>, TopicRepoError>;
|
|
|
|
/// Fetch one topic by name, `None` if not registered.
|
|
async fn get(&self, app_id: AppId, name: &str) -> Result<Option<Topic>, TopicRepoError>;
|
|
|
|
/// Update `external_subscribable` and/or `auth_mode` (each `None`
|
|
/// leaves the column unchanged). `None` return = no such topic.
|
|
async fn update(
|
|
&self,
|
|
app_id: AppId,
|
|
name: &str,
|
|
external_subscribable: Option<bool>,
|
|
auth_mode: Option<TopicAuthMode>,
|
|
) -> Result<Option<Topic>, TopicRepoError>;
|
|
|
|
/// Unregister a topic. Returns `true` if a row was removed.
|
|
async fn delete(&self, app_id: AppId, name: &str) -> Result<bool, TopicRepoError>;
|
|
}
|
|
|
|
#[derive(sqlx::FromRow)]
|
|
struct TopicRow {
|
|
name: String,
|
|
external_subscribable: bool,
|
|
auth_mode: String,
|
|
created_at: DateTime<Utc>,
|
|
updated_at: DateTime<Utc>,
|
|
}
|
|
|
|
impl TopicRow {
|
|
fn into_topic(self) -> Result<Topic, TopicRepoError> {
|
|
Ok(Topic {
|
|
auth_mode: TopicAuthMode::from_db(&self.auth_mode)?,
|
|
name: self.name,
|
|
external_subscribable: self.external_subscribable,
|
|
created_at: self.created_at,
|
|
updated_at: self.updated_at,
|
|
})
|
|
}
|
|
}
|
|
|
|
const SELECT_COLS: &str = "name, external_subscribable, auth_mode, created_at, updated_at";
|
|
|
|
pub struct PostgresTopicRepo {
|
|
pool: PgPool,
|
|
}
|
|
|
|
impl PostgresTopicRepo {
|
|
#[must_use]
|
|
pub fn new(pool: PgPool) -> Self {
|
|
Self { pool }
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl TopicRepo for PostgresTopicRepo {
|
|
async fn create(
|
|
&self,
|
|
app_id: AppId,
|
|
name: &str,
|
|
external_subscribable: bool,
|
|
auth_mode: TopicAuthMode,
|
|
) -> Result<Topic, TopicRepoError> {
|
|
let row: Option<TopicRow> = sqlx::query_as(&format!(
|
|
"INSERT INTO topics (app_id, name, external_subscribable, auth_mode) \
|
|
VALUES ($1, $2, $3, $4) \
|
|
ON CONFLICT (app_id, name) DO NOTHING \
|
|
RETURNING {SELECT_COLS}"
|
|
))
|
|
.bind(app_id.into_inner())
|
|
.bind(name)
|
|
.bind(external_subscribable)
|
|
.bind(auth_mode.as_str())
|
|
.fetch_optional(&self.pool)
|
|
.await?;
|
|
match row {
|
|
Some(r) => r.into_topic(),
|
|
None => Err(TopicRepoError::AlreadyExists(name.to_string())),
|
|
}
|
|
}
|
|
|
|
async fn list(&self, app_id: AppId) -> Result<Vec<Topic>, TopicRepoError> {
|
|
let rows: Vec<TopicRow> = sqlx::query_as(&format!(
|
|
"SELECT {SELECT_COLS} FROM topics WHERE app_id = $1 ORDER BY name"
|
|
))
|
|
.bind(app_id.into_inner())
|
|
.fetch_all(&self.pool)
|
|
.await?;
|
|
rows.into_iter().map(TopicRow::into_topic).collect()
|
|
}
|
|
|
|
async fn get(&self, app_id: AppId, name: &str) -> Result<Option<Topic>, TopicRepoError> {
|
|
let row: Option<TopicRow> = sqlx::query_as(&format!(
|
|
"SELECT {SELECT_COLS} FROM topics WHERE app_id = $1 AND name = $2"
|
|
))
|
|
.bind(app_id.into_inner())
|
|
.bind(name)
|
|
.fetch_optional(&self.pool)
|
|
.await?;
|
|
row.map(TopicRow::into_topic).transpose()
|
|
}
|
|
|
|
async fn update(
|
|
&self,
|
|
app_id: AppId,
|
|
name: &str,
|
|
external_subscribable: Option<bool>,
|
|
auth_mode: Option<TopicAuthMode>,
|
|
) -> Result<Option<Topic>, TopicRepoError> {
|
|
// COALESCE leaves a column untouched when its bind is NULL.
|
|
let row: Option<TopicRow> = sqlx::query_as(&format!(
|
|
"UPDATE topics SET \
|
|
external_subscribable = COALESCE($3, external_subscribable), \
|
|
auth_mode = COALESCE($4, auth_mode), \
|
|
updated_at = NOW() \
|
|
WHERE app_id = $1 AND name = $2 \
|
|
RETURNING {SELECT_COLS}"
|
|
))
|
|
.bind(app_id.into_inner())
|
|
.bind(name)
|
|
.bind(external_subscribable)
|
|
.bind(auth_mode.map(|m| m.as_str()))
|
|
.fetch_optional(&self.pool)
|
|
.await?;
|
|
row.map(TopicRow::into_topic).transpose()
|
|
}
|
|
|
|
async fn delete(&self, app_id: AppId, name: &str) -> Result<bool, TopicRepoError> {
|
|
let res = sqlx::query("DELETE FROM topics WHERE app_id = $1 AND name = $2")
|
|
.bind(app_id.into_inner())
|
|
.bind(name)
|
|
.execute(&self.pool)
|
|
.await?;
|
|
Ok(res.rows_affected() > 0)
|
|
}
|
|
}
|