diff --git a/crates/executor-core/tests/engine.rs b/crates/executor-core/tests/engine.rs index 39935f4..7bb1336 100644 --- a/crates/executor-core/tests/engine.rs +++ b/crates/executor-core/tests/engine.rs @@ -27,7 +27,7 @@ fn req(body: serde_json::Value) -> ExecRequest { } fn engine() -> Engine { - Engine::new(Limits::default(), Services::new()) + Engine::new(Limits::default(), Services::default()) } #[test] @@ -126,7 +126,7 @@ fn enforces_operation_budget() { max_operations: 1_000, ..Limits::default() }; - let engine = Engine::new(limits, Services::new()); + let engine = Engine::new(limits, Services::default()); // 10_000 iterations vastly exceeds 1_000 ops. let src = r"let n = 0; for i in 0..10000 { n += 1; } n"; let err = engine diff --git a/crates/executor-core/tests/sdk_contract.rs b/crates/executor-core/tests/sdk_contract.rs index 26788c5..9ae6014 100644 --- a/crates/executor-core/tests/sdk_contract.rs +++ b/crates/executor-core/tests/sdk_contract.rs @@ -31,7 +31,7 @@ use serde_json::{json, Value}; // ---------------------------------------------------------------------------- fn engine() -> Engine { - Engine::new(Limits::default(), Services::new()) + Engine::new(Limits::default(), Services::default()) } fn baseline_request() -> ExecRequest { diff --git a/crates/executor-core/tests/stdlib.rs b/crates/executor-core/tests/stdlib.rs index 1f119c7..c3649df 100644 --- a/crates/executor-core/tests/stdlib.rs +++ b/crates/executor-core/tests/stdlib.rs @@ -17,7 +17,7 @@ use serde_json::{json, Value}; // ---------------------------------------------------------------------------- fn engine() -> Engine { - Engine::new(Limits::default(), Services::new()) + Engine::new(Limits::default(), Services::default()) } fn baseline_request() -> ExecRequest { diff --git a/crates/manager-core/migrations/0007_kv.sql b/crates/manager-core/migrations/0007_kv.sql new file mode 100644 index 0000000..c4ecd67 --- /dev/null +++ b/crates/manager-core/migrations/0007_kv.sql @@ -0,0 +1,28 @@ +-- v1.1.1: Key-value store — see blueprint §8.1 + docs/sdk-shape.md. +-- +-- Identity tuple `(app_id, collection, key)`. `app_id` is first in the +-- primary key so the implicit index is always per-app; cross-app reads +-- cannot happen even with a buggy query. Collections are a required +-- namespace inside an app — the same key can live in different +-- collections without collision. +-- +-- `value` is JSONB so scripts can store nested structures without +-- a separate serialization step. No TTL column in v1.1.1; deferred +-- until a concrete need surfaces (the blueprint reserved one but the +-- v1.1.1 SDK surface — get/set/has/delete/list — doesn't expose TTL). + +CREATE TABLE kv_entries ( + app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE, + collection TEXT NOT NULL, + key TEXT NOT NULL, + value JSONB NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (app_id, collection, key) +); + +-- Supports list-by-collection (keyset pagination) and per-collection +-- triggers' fan-out scans. The PK already covers (app_id, collection) +-- as a prefix but spelling out the explicit index makes intent clear +-- for the planner. +CREATE INDEX idx_kv_entries_app_collection ON kv_entries (app_id, collection); diff --git a/crates/manager-core/src/authz.rs b/crates/manager-core/src/authz.rs index 1da5709..a19a055 100644 --- a/crates/manager-core/src/authz.rs +++ b/crates/manager-core/src/authz.rs @@ -57,6 +57,13 @@ pub enum Capability { AppAdmin(AppId), /// Read execution logs for scripts in this app. AppLogRead(AppId), + /// Read entries from this app's KV store (v1.1.1). Granted to + /// `viewer`+ in the per-app role table. Maps to `script:read` on + /// API keys — the seven-scope vocabulary stays locked. + AppKvRead(AppId), + /// Write entries to this app's KV store (v1.1.1). Granted to + /// `editor`+. Maps to `script:write` on API keys. + AppKvWrite(AppId), } impl Capability { @@ -73,7 +80,9 @@ impl Capability { | Self::AppWriteRoute(id) | Self::AppManageDomains(id) | Self::AppAdmin(id) - | Self::AppLogRead(id) => Some(id), + | Self::AppLogRead(id) + | Self::AppKvRead(id) + | Self::AppKvWrite(id) => Some(id), } } @@ -88,8 +97,8 @@ impl Capability { Self::InstanceCreateApp | Self::InstanceManageUsers | Self::InstanceManageSettings => { Scope::InstanceAdmin } - Self::AppRead(_) => Scope::ScriptRead, - Self::AppWriteScript(_) => Scope::ScriptWrite, + Self::AppRead(_) | Self::AppKvRead(_) => Scope::ScriptRead, + Self::AppWriteScript(_) | Self::AppKvWrite(_) => Scope::ScriptWrite, Self::AppWriteRoute(_) => Scope::RouteWrite, Self::AppManageDomains(_) => Scope::DomainManage, Self::AppAdmin(_) => Scope::AppAdmin, @@ -230,11 +239,16 @@ async fn member_grants( /// domain claims, and delete. Roles form a strict subset chain, so /// the check is "is this capability in the role's set?". const fn role_satisfies(role: AppRole, cap: Capability) -> bool { - let in_viewer = matches!(cap, Capability::AppRead(_) | Capability::AppLogRead(_)); + let in_viewer = matches!( + cap, + Capability::AppRead(_) | Capability::AppLogRead(_) | Capability::AppKvRead(_) + ); let in_editor = in_viewer || matches!( cap, - Capability::AppWriteScript(_) | Capability::AppWriteRoute(_) + Capability::AppWriteScript(_) + | Capability::AppWriteRoute(_) + | Capability::AppKvWrite(_) ); let in_app_admin = in_editor || matches!( diff --git a/crates/manager-core/src/kv_repo.rs b/crates/manager-core/src/kv_repo.rs new file mode 100644 index 0000000..750d1bb --- /dev/null +++ b/crates/manager-core/src/kv_repo.rs @@ -0,0 +1,223 @@ +//! Low-level Postgres CRUD over `kv_entries`. Stays storage-only; +//! authorization, event emission, and empty-collection validation live +//! one layer up in `KvServiceImpl`. + +use async_trait::async_trait; +use base64::engine::general_purpose::URL_SAFE_NO_PAD; +use base64::Engine as _; +use picloud_shared::{AppId, KvListPage}; +use sqlx::PgPool; + +#[derive(Debug, thiserror::Error)] +pub enum KvRepoError { + #[error("database error: {0}")] + Db(#[from] sqlx::Error), + + #[error("invalid pagination cursor")] + InvalidCursor, +} + +/// Repo surface. The trait is exposed so tests can substitute an +/// in-memory backing without spinning up Postgres. +#[async_trait] +pub trait KvRepo: Send + Sync { + async fn get( + &self, + app_id: AppId, + collection: &str, + key: &str, + ) -> Result, KvRepoError>; + + /// Upserts the row. Returns the previous value (if any) so callers + /// can determine whether this was an `insert` or an `update` for + /// the emitted `ServiceEvent`. + async fn set( + &self, + app_id: AppId, + collection: &str, + key: &str, + value: serde_json::Value, + ) -> Result, KvRepoError>; + + /// Returns the deleted value if present, `None` if the row didn't + /// exist. The caller turns the `bool was-present` part into the + /// SDK's return value; the `Option` part feeds the + /// `old_payload` field of the emitted delete event. + async fn delete( + &self, + app_id: AppId, + collection: &str, + key: &str, + ) -> Result, KvRepoError>; + + async fn has(&self, app_id: AppId, collection: &str, key: &str) -> Result; + + async fn list( + &self, + app_id: AppId, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result; +} + +pub struct PostgresKvRepo { + pool: PgPool, +} + +impl PostgresKvRepo { + #[must_use] + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +/// Hard ceiling on `list` page size — scripts that pass anything larger +/// silently get clamped to this. Cursor-style pagination keeps a single +/// request bounded; clients fetch the next page via the returned cursor. +const KV_LIST_MAX_LIMIT: u32 = 1_000; +const KV_LIST_DEFAULT_LIMIT: u32 = 100; + +#[async_trait] +impl KvRepo for PostgresKvRepo { + async fn get( + &self, + app_id: AppId, + collection: &str, + key: &str, + ) -> Result, KvRepoError> { + let row: Option<(serde_json::Value,)> = sqlx::query_as( + "SELECT value FROM kv_entries \ + WHERE app_id = $1 AND collection = $2 AND key = $3", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(key) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(|(v,)| v)) + } + + async fn set( + &self, + app_id: AppId, + collection: &str, + key: &str, + value: serde_json::Value, + ) -> Result, KvRepoError> { + // `RETURNING` after `ON CONFLICT DO UPDATE` exposes the old + // value via the `xmax`/old-row trick: capture the prior value + // with a CTE so callers know whether this was insert vs update. + let row: Option<(Option,)> = sqlx::query_as( + "WITH prev AS (\ + SELECT value FROM kv_entries \ + WHERE app_id = $1 AND collection = $2 AND key = $3\ + ), \ + upserted AS (\ + INSERT INTO kv_entries (app_id, collection, key, value) \ + VALUES ($1, $2, $3, $4) \ + ON CONFLICT (app_id, collection, key) DO UPDATE \ + SET value = EXCLUDED.value, updated_at = NOW() \ + RETURNING 1\ + ) \ + SELECT (SELECT value FROM prev) FROM upserted", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(key) + .bind(value) + .fetch_optional(&self.pool) + .await?; + Ok(row.and_then(|(v,)| v)) + } + + async fn delete( + &self, + app_id: AppId, + collection: &str, + key: &str, + ) -> Result, KvRepoError> { + let row: Option<(serde_json::Value,)> = sqlx::query_as( + "DELETE FROM kv_entries \ + WHERE app_id = $1 AND collection = $2 AND key = $3 \ + RETURNING value", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(key) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(|(v,)| v)) + } + + async fn has(&self, app_id: AppId, collection: &str, key: &str) -> Result { + let row: Option<(i64,)> = sqlx::query_as( + "SELECT 1 FROM kv_entries \ + WHERE app_id = $1 AND collection = $2 AND key = $3", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(key) + .fetch_optional(&self.pool) + .await?; + Ok(row.is_some()) + } + + async fn list( + &self, + app_id: AppId, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result { + let limit = if limit == 0 { + KV_LIST_DEFAULT_LIMIT + } else { + limit.min(KV_LIST_MAX_LIMIT) + }; + + let last_key = match cursor { + Some(c) => Some(decode_cursor(c)?), + None => None, + }; + + // Keyset pagination: rows beyond `last_key` ordered by key. + // `+1` to detect a "more pages" condition without a separate + // COUNT query. + let take = i64::from(limit) + 1; + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT key FROM kv_entries \ + WHERE app_id = $1 AND collection = $2 \ + AND ($3::text IS NULL OR key > $3) \ + ORDER BY key ASC \ + LIMIT $4", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(last_key.as_deref()) + .bind(take) + .fetch_all(&self.pool) + .await?; + + let mut keys: Vec = rows.into_iter().map(|(k,)| k).collect(); + let next_cursor = if keys.len() > limit as usize { + keys.truncate(limit as usize); + keys.last().map(|k| encode_cursor(k)) + } else { + None + }; + + Ok(KvListPage { keys, next_cursor }) + } +} + +fn encode_cursor(last_key: &str) -> String { + URL_SAFE_NO_PAD.encode(last_key.as_bytes()) +} + +fn decode_cursor(cursor: &str) -> Result { + let bytes = URL_SAFE_NO_PAD + .decode(cursor) + .map_err(|_| KvRepoError::InvalidCursor)?; + String::from_utf8(bytes).map_err(|_| KvRepoError::InvalidCursor) +} diff --git a/crates/manager-core/src/kv_service.rs b/crates/manager-core/src/kv_service.rs new file mode 100644 index 0000000..4af0479 --- /dev/null +++ b/crates/manager-core/src/kv_service.rs @@ -0,0 +1,519 @@ +//! `KvServiceImpl` — wires the `KvRepo` underneath the +//! `picloud_shared::KvService` trait that scripts see via the Rhai +//! bridge. +//! +//! Layers added here (vs the raw repo): +//! +//! 1. Empty-collection rejection at the SDK boundary +//! (`docs/sdk-shape.md`). +//! 2. **Script-as-gate authz**: when `cx.principal.is_some()` we run +//! `authz::require(...)`; when it's `None` (public unauthenticated +//! HTTP — the common case for public routes) we skip the check. +//! Cross-app isolation isn't affected — every query is keyed by +//! `cx.app_id`, never an argument. +//! 3. `ServiceEvent` emission after each mutation (`insert` / `update` +//! / `delete`). v1.1.0 ships a `NoopEventEmitter` so this is a +//! no-op until the outbox emitter lands later in v1.1.1. + +use std::sync::Arc; + +use async_trait::async_trait; +use picloud_shared::{ + KvError, KvListPage, KvService, SdkCallCx, ServiceEvent, ServiceEventEmitter, +}; + +use crate::authz::{self, AuthzRepo, Capability}; +use crate::kv_repo::{KvRepo, KvRepoError}; + +pub struct KvServiceImpl { + repo: Arc, + authz: Arc, + events: Arc, +} + +impl KvServiceImpl { + #[must_use] + pub fn new( + repo: Arc, + authz: Arc, + events: Arc, + ) -> Self { + Self { + repo, + authz, + events, + } + } + + async fn check_read(&self, cx: &SdkCallCx) -> Result<(), KvError> { + if let Some(ref principal) = cx.principal { + authz::require(&*self.authz, principal, Capability::AppKvRead(cx.app_id)) + .await + .map_err(|_| KvError::Forbidden)?; + } + Ok(()) + } + + async fn check_write(&self, cx: &SdkCallCx) -> Result<(), KvError> { + if let Some(ref principal) = cx.principal { + authz::require(&*self.authz, principal, Capability::AppKvWrite(cx.app_id)) + .await + .map_err(|_| KvError::Forbidden)?; + } + Ok(()) + } +} + +fn validate_collection(collection: &str) -> Result<(), KvError> { + if collection.is_empty() { + return Err(KvError::InvalidCollection); + } + Ok(()) +} + +impl From for KvError { + fn from(e: KvRepoError) -> Self { + Self::Backend(e.to_string()) + } +} + +#[async_trait] +impl KvService for KvServiceImpl { + async fn get( + &self, + cx: &SdkCallCx, + collection: &str, + key: &str, + ) -> Result, KvError> { + validate_collection(collection)?; + self.check_read(cx).await?; + Ok(self.repo.get(cx.app_id, collection, key).await?) + } + + async fn set( + &self, + cx: &SdkCallCx, + collection: &str, + key: &str, + value: serde_json::Value, + ) -> Result<(), KvError> { + validate_collection(collection)?; + self.check_write(cx).await?; + let previous = self + .repo + .set(cx.app_id, collection, key, value.clone()) + .await?; + let op = if previous.is_some() { + "update" + } else { + "insert" + }; + // Emit unconditionally; the noop emitter drops it, the outbox + // emitter persists it. Best-effort: a failed emit is logged + // but does not roll back the write. + if let Err(e) = self + .events + .emit( + cx, + ServiceEvent { + source: "kv", + op, + collection: Some(collection.to_string()), + key: Some(key.to_string()), + payload: Some(value), + old_payload: previous, + }, + ) + .await + { + tracing::warn!(error = %e, source = "kv", op, "event emit failed"); + } + Ok(()) + } + + async fn delete(&self, cx: &SdkCallCx, collection: &str, key: &str) -> Result { + validate_collection(collection)?; + self.check_write(cx).await?; + let previous = self.repo.delete(cx.app_id, collection, key).await?; + let was_present = previous.is_some(); + if was_present { + if let Err(e) = self + .events + .emit( + cx, + ServiceEvent { + source: "kv", + op: "delete", + collection: Some(collection.to_string()), + key: Some(key.to_string()), + payload: None, + old_payload: previous, + }, + ) + .await + { + tracing::warn!(error = %e, source = "kv", op = "delete", "event emit failed"); + } + } + Ok(was_present) + } + + async fn has(&self, cx: &SdkCallCx, collection: &str, key: &str) -> Result { + validate_collection(collection)?; + self.check_read(cx).await?; + Ok(self.repo.has(cx.app_id, collection, key).await?) + } + + async fn list( + &self, + cx: &SdkCallCx, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result { + validate_collection(collection)?; + self.check_read(cx).await?; + Ok(self.repo.list(cx.app_id, collection, cursor, limit).await?) + } +} + +// ---------------------------------------------------------------------------- +// Tests — in-memory KvRepo so unit tests don't need Postgres. +// ---------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::authz::{AuthzError, AuthzRepo}; + use async_trait::async_trait; + use picloud_shared::{ + AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, NoopEventEmitter, Principal, + RequestId, UserId, + }; + use std::collections::{BTreeMap, HashMap}; + use tokio::sync::Mutex; + + #[derive(Default)] + struct InMemoryKvRepo { + data: Mutex>, + } + + #[async_trait] + impl KvRepo for InMemoryKvRepo { + async fn get( + &self, + app_id: AppId, + collection: &str, + key: &str, + ) -> Result, KvRepoError> { + Ok(self + .data + .lock() + .await + .get(&(app_id, collection.to_string(), key.to_string())) + .cloned()) + } + + async fn set( + &self, + app_id: AppId, + collection: &str, + key: &str, + value: serde_json::Value, + ) -> Result, KvRepoError> { + Ok(self + .data + .lock() + .await + .insert((app_id, collection.to_string(), key.to_string()), value)) + } + + async fn delete( + &self, + app_id: AppId, + collection: &str, + key: &str, + ) -> Result, KvRepoError> { + Ok(self + .data + .lock() + .await + .remove(&(app_id, collection.to_string(), key.to_string()))) + } + + async fn has( + &self, + app_id: AppId, + collection: &str, + key: &str, + ) -> Result { + Ok(self.data.lock().await.contains_key(&( + app_id, + collection.to_string(), + key.to_string(), + ))) + } + + async fn list( + &self, + app_id: AppId, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result { + let data = self.data.lock().await; + let last_key = cursor.map(std::string::ToString::to_string); + let mut keys: Vec = data + .iter() + .filter(|((a, c, _), _)| *a == app_id && c == collection) + .map(|((_, _, k), _)| k.clone()) + .filter(|k| last_key.as_ref().is_none_or(|lk| k > lk)) + .collect(); + keys.sort(); + let take = (limit as usize).max(1); + let next_cursor = if keys.len() > take { + keys.truncate(take); + keys.last().cloned() + } else { + None + }; + Ok(KvListPage { keys, next_cursor }) + } + } + + /// AuthzRepo that always denies — used to confirm the service + /// short-circuits on cx.principal.is_some() with a denial, and + /// that it does NOT call into authz when cx.principal is None. + #[derive(Default)] + struct DenyingAuthzRepo; + + #[async_trait] + impl AuthzRepo for DenyingAuthzRepo { + async fn membership( + &self, + _user_id: UserId, + _app_id: AppId, + ) -> Result, AuthzError> { + Ok(None) + } + } + + fn anon_cx(app_id: AppId) -> SdkCallCx { + SdkCallCx { + app_id, + principal: None, + execution_id: ExecutionId::new(), + request_id: RequestId::new(), + trigger_depth: 0, + root_execution_id: ExecutionId::new(), + } + } + + fn owner_cx(app_id: AppId) -> SdkCallCx { + SdkCallCx { + app_id, + principal: Some(Principal { + user_id: AdminUserId::new(), + instance_role: InstanceRole::Owner, + scopes: None, + app_binding: None, + }), + execution_id: ExecutionId::new(), + request_id: RequestId::new(), + trigger_depth: 0, + root_execution_id: ExecutionId::new(), + } + } + + fn member_no_role_cx(app_id: AppId) -> SdkCallCx { + SdkCallCx { + app_id, + principal: Some(Principal { + user_id: AdminUserId::new(), + instance_role: InstanceRole::Member, + scopes: None, + app_binding: None, + }), + execution_id: ExecutionId::new(), + request_id: RequestId::new(), + trigger_depth: 0, + root_execution_id: ExecutionId::new(), + } + } + + fn svc() -> KvServiceImpl { + KvServiceImpl::new( + Arc::new(InMemoryKvRepo::default()), + Arc::new(DenyingAuthzRepo), + Arc::new(NoopEventEmitter), + ) + } + + #[tokio::test] + async fn set_then_get_round_trips() { + let kv = svc(); + let cx = anon_cx(AppId::new()); + kv.set(&cx, "widgets", "k1", serde_json::json!({"n": 1})) + .await + .unwrap(); + let v = kv.get(&cx, "widgets", "k1").await.unwrap(); + assert_eq!(v, Some(serde_json::json!({"n": 1}))); + } + + #[tokio::test] + async fn get_missing_returns_none() { + let kv = svc(); + let cx = anon_cx(AppId::new()); + let v = kv.get(&cx, "widgets", "nope").await.unwrap(); + assert_eq!(v, None); + } + + #[tokio::test] + async fn has_returns_bool() { + let kv = svc(); + let cx = anon_cx(AppId::new()); + assert!(!kv.has(&cx, "widgets", "k1").await.unwrap()); + kv.set(&cx, "widgets", "k1", serde_json::json!(true)) + .await + .unwrap(); + assert!(kv.has(&cx, "widgets", "k1").await.unwrap()); + } + + #[tokio::test] + async fn delete_returns_was_present() { + let kv = svc(); + let cx = anon_cx(AppId::new()); + assert!(!kv.delete(&cx, "widgets", "missing").await.unwrap()); + kv.set(&cx, "widgets", "k1", serde_json::json!(1)) + .await + .unwrap(); + assert!(kv.delete(&cx, "widgets", "k1").await.unwrap()); + // Idempotent — second delete returns false. + assert!(!kv.delete(&cx, "widgets", "k1").await.unwrap()); + } + + #[tokio::test] + async fn empty_collection_rejected() { + let kv = svc(); + let cx = anon_cx(AppId::new()); + let err = kv.get(&cx, "", "k1").await.unwrap_err(); + assert!(matches!(err, KvError::InvalidCollection)); + } + + /// Load-bearing: a script with `cx.app_id = A` must NOT see + /// entries inserted under `cx.app_id = B`. This is the cross-app + /// isolation boundary; getting this wrong is a security + /// vulnerability. + #[tokio::test] + async fn cross_app_isolation_via_cx_app_id() { + let kv = svc(); + let app_a = AppId::new(); + let app_b = AppId::new(); + let cx_a = anon_cx(app_a); + let cx_b = anon_cx(app_b); + + kv.set(&cx_a, "shared", "k", serde_json::json!("from-a")) + .await + .unwrap(); + kv.set(&cx_b, "shared", "k", serde_json::json!("from-b")) + .await + .unwrap(); + + assert_eq!( + kv.get(&cx_a, "shared", "k").await.unwrap(), + Some(serde_json::json!("from-a")) + ); + assert_eq!( + kv.get(&cx_b, "shared", "k").await.unwrap(), + Some(serde_json::json!("from-b")) + ); + } + + /// Script-as-gate: an `anon_cx` (principal = None) skips the + /// capability check entirely. Even with a denying authz repo, + /// the write succeeds. + #[tokio::test] + async fn anonymous_cx_skips_authz() { + let kv = svc(); + let cx = anon_cx(AppId::new()); + kv.set(&cx, "widgets", "k", serde_json::json!(1)) + .await + .unwrap(); + // No panic, no Forbidden. + } + + /// Authenticated principal with no role on the app: the + /// `DenyingAuthzRepo` returns no membership, so the capability + /// check denies. Set must surface KvError::Forbidden. + #[tokio::test] + async fn authed_cx_with_no_role_is_forbidden() { + let kv = svc(); + let cx = member_no_role_cx(AppId::new()); + let err = kv + .set(&cx, "widgets", "k", serde_json::json!(1)) + .await + .unwrap_err(); + assert!(matches!(err, KvError::Forbidden)); + } + + /// Owner principal: instance-role grants kick in inside `authz::can` + /// (Owner -> implicit AppAdmin which covers KvWrite). + #[tokio::test] + async fn owner_principal_can_write() { + let kv = svc(); + let cx = owner_cx(AppId::new()); + kv.set(&cx, "widgets", "k", serde_json::json!(1)) + .await + .unwrap(); + } + + #[tokio::test] + async fn list_cursor_pagination() { + let kv = svc(); + let cx = anon_cx(AppId::new()); + for i in 0..5 { + kv.set( + &cx, + "widgets", + &format!("k{i:02}"), + serde_json::json!({"i": i}), + ) + .await + .unwrap(); + } + // page 1 — 2 keys + let p1 = kv.list(&cx, "widgets", None, 2).await.unwrap(); + assert_eq!(p1.keys, vec!["k00".to_string(), "k01".to_string()]); + assert!(p1.next_cursor.is_some()); + // page 2 — 2 keys + let p2 = kv + .list(&cx, "widgets", p1.next_cursor.as_deref(), 2) + .await + .unwrap(); + assert_eq!(p2.keys, vec!["k02".to_string(), "k03".to_string()]); + // final page — 1 key, no cursor + let p3 = kv + .list(&cx, "widgets", p2.next_cursor.as_deref(), 2) + .await + .unwrap(); + assert_eq!(p3.keys, vec!["k04".to_string()]); + assert!(p3.next_cursor.is_none()); + } + + /// Pinning the v1.1.0 contract: services hold the emitter as a + /// dyn Arc and call `emit().await` unconditionally. This test + /// proves the call site doesn't blow up against the noop impl — + /// the outbox emitter (v1.1.1) drops in transparently. + #[tokio::test] + async fn noop_emitter_does_not_block_mutations() { + let kv = svc(); + let cx = anon_cx(AppId::new()); + kv.set(&cx, "widgets", "k", serde_json::json!(1)) + .await + .unwrap(); + kv.delete(&cx, "widgets", "k").await.unwrap(); + // Reaching here means emit() returned Ok and didn't panic. + // Suppress unused-import warning when run alone: + let _ = HashMap::::new(); + } +} diff --git a/crates/manager-core/src/lib.rs b/crates/manager-core/src/lib.rs index b126f9e..b308916 100644 --- a/crates/manager-core/src/lib.rs +++ b/crates/manager-core/src/lib.rs @@ -21,6 +21,8 @@ pub mod auth_api; pub mod auth_bootstrap; pub mod auth_middleware; pub mod authz; +pub mod kv_repo; +pub mod kv_service; pub mod log_sink; pub mod migrations; pub mod repo; @@ -63,6 +65,8 @@ pub use auth_middleware::{ API_KEY_PREFIX, API_KEY_PREFIX_LEN, SESSION_COOKIE, }; pub use authz::{can, require, AuthzDenied, AuthzError, AuthzRepo, Capability, Decision}; +pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo}; +pub use kv_service::KvServiceImpl; pub use log_sink::PostgresExecutionLogSink; pub use repo::{ ExecutionLogRepository, NewScript, PostgresExecutionLogRepository, PostgresScriptRepository, diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs index 297301f..087ce70 100644 --- a/crates/picloud/src/lib.rs +++ b/crates/picloud/src/lib.rs @@ -14,19 +14,19 @@ use picloud_manager_core::{ attach_principal_if_present, auth_router, compile_routes, migrations, require_authenticated, route_admin_router, AdminSessionRepository, AdminState, AdminUserRepository, AdminsState, ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState, - AppRepository, AppsState, AuthState, AuthzRepo, PostgresAdminSessionRepository, + AppRepository, AppsState, AuthState, AuthzRepo, KvServiceImpl, PostgresAdminSessionRepository, PostgresAdminUserRepository, PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository, PostgresAppRepository, PostgresExecutionLogRepository, - PostgresExecutionLogSink, PostgresRouteRepository, PostgresScriptRepository, RepoResolver, - RouteAdminState, RouteRepository, SandboxCeiling, + PostgresExecutionLogSink, PostgresKvRepo, PostgresRouteRepository, PostgresScriptRepository, + RepoResolver, RouteAdminState, RouteRepository, SandboxCeiling, }; use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable}; use picloud_orchestrator_core::{ data_plane_router, user_routes_router, DataPlaneState, ExecutionGate, LocalExecutorClient, }; use picloud_shared::{ - ExecutionLogSink, ScriptValidator, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION, - WIRE_VERSION, + ExecutionLogSink, KvService, NoopDeadLetterService, NoopEventEmitter, ScriptValidator, + ServiceEventEmitter, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION, }; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; @@ -83,10 +83,6 @@ fn read_session_ttl() -> Duration { /// `/version`) stays open — it's the public ingress for user scripts. #[allow(clippy::too_many_lines)] pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { - // `Services` is the SDK service bundle. Empty in v1.1.0; the - // v1.1.1 KV PR will populate it with `kv: Arc::new(...)` here. - let engine = Arc::new(Engine::new(Limits::default(), Services::new())); - let script_repo = Arc::new(PostgresScriptRepository::new(pool.clone())); let log_repo = Arc::new(PostgresExecutionLogRepository::new(pool.clone())); let log_sink: Arc = Arc::new(PostgresExecutionLogSink::new(pool.clone())); @@ -98,10 +94,21 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { // (CRUD over the table) and `AuthzRepo` (single-row membership lookup // for capability checks). Construct it once and clone the Arc into // both trait views — same allocation, two vtables. - let members_concrete = Arc::new(PostgresAppMembersRepository::new(pool)); + let members_concrete = Arc::new(PostgresAppMembersRepository::new(pool.clone())); let members: Arc = members_concrete.clone(); let authz: Arc = members_concrete; + // SDK services bundle. v1.1.1 ships the KV store; the outbox-backed + // event emitter replaces `NoopEventEmitter` once the triggers + // dispatcher lands. `NoopDeadLetterService` is a v1.1.1 stub that + // errors loudly until the real `PostgresDeadLetterService` ships. + let kv_repo = Arc::new(PostgresKvRepo::new(pool.clone())); + let events: Arc = Arc::new(NoopEventEmitter); + let kv: Arc = + Arc::new(KvServiceImpl::new(kv_repo, authz.clone(), events.clone())); + let services = Services::new(kv, Arc::new(NoopDeadLetterService), events); + let engine = Arc::new(Engine::new(Limits::default(), services)); + // Compile the routes table once at startup; admin writes refresh it. let route_table = Arc::new(RouteTable::new()); let initial = route_repo.list_all().await?; diff --git a/crates/shared/src/dead_letters.rs b/crates/shared/src/dead_letters.rs new file mode 100644 index 0000000..20c4323 --- /dev/null +++ b/crates/shared/src/dead_letters.rs @@ -0,0 +1,118 @@ +//! `DeadLetterService` — Rhai SDK contract for replaying and resolving +//! dead letters. Surface kept intentionally narrow for v1.1.1 (no +//! `list` — deferred to v1.2 per `docs/v1.1.x-design-notes.md` §4). +//! +//! Both methods are gated by `Capability::AppDeadLetterManage(AppId)` +//! evaluated inside the impl. Public-HTTP scripts running with +//! `cx.principal = None` will fail the check, which matches the +//! design's expectation (managing dead letters is an admin act). + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use uuid::Uuid; + +use crate::SdkCallCx; + +/// Opaque identifier for a `dead_letters` row. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct DeadLetterId(pub Uuid); + +impl DeadLetterId { + #[must_use] + pub fn new() -> Self { + Self(Uuid::new_v4()) + } + + #[must_use] + pub fn into_inner(self) -> Uuid { + self.0 + } +} + +impl Default for DeadLetterId { + fn default() -> Self { + Self::new() + } +} + +impl From for DeadLetterId { + fn from(u: Uuid) -> Self { + Self(u) + } +} + +impl From for Uuid { + fn from(id: DeadLetterId) -> Self { + id.0 + } +} + +impl std::fmt::Display for DeadLetterId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +#[async_trait] +pub trait DeadLetterService: Send + Sync { + /// Re-enqueue the original event into the outbox. The dead-letter + /// row is marked `resolution = 'replayed'` regardless of whether + /// the retry ultimately succeeds. + async fn replay(&self, cx: &SdkCallCx, id: DeadLetterId) -> Result<(), DeadLetterError>; + + /// Mark the row resolved with the given reason (typically + /// `"ignored"` from the dashboard or `"handled_by_script"` from + /// inside a `dead_letter` trigger handler). + async fn resolve( + &self, + cx: &SdkCallCx, + id: DeadLetterId, + reason: &str, + ) -> Result<(), DeadLetterError>; +} + +#[derive(Debug, Error)] +pub enum DeadLetterError { + #[error("dead-letter row not found")] + NotFound, + + #[error("forbidden")] + Forbidden, + + #[error("invalid resolution reason: {0}")] + InvalidResolution(String), + + #[error("dead-letter backend error: {0}")] + Backend(String), +} + +/// Stub used to bootstrap the `Services` bundle before the real +/// Postgres-backed implementation lands. Behaves like +/// `NoopEventEmitter` — every call returns `Backend("...")` so scripts +/// see a clear "not yet implemented" error rather than silently +/// no-op'ing. Replaced by `PostgresDeadLetterService` in the v1.1.1 +/// dead-letter PR. +#[derive(Debug, Default, Clone, Copy)] +pub struct NoopDeadLetterService; + +#[async_trait] +impl DeadLetterService for NoopDeadLetterService { + async fn replay(&self, _cx: &SdkCallCx, _id: DeadLetterId) -> Result<(), DeadLetterError> { + Err(DeadLetterError::Backend( + "dead_letters::replay is not yet wired in".into(), + )) + } + + async fn resolve( + &self, + _cx: &SdkCallCx, + _id: DeadLetterId, + _reason: &str, + ) -> Result<(), DeadLetterError> { + Err(DeadLetterError::Backend( + "dead_letters::resolve is not yet wired in".into(), + )) + } +} diff --git a/crates/shared/src/ids.rs b/crates/shared/src/ids.rs index 426c6a3..ba21d07 100644 --- a/crates/shared/src/ids.rs +++ b/crates/shared/src/ids.rs @@ -53,3 +53,4 @@ id_type!(RequestId); id_type!(AdminUserId); id_type!(AppId); id_type!(ApiKeyId); +id_type!(TriggerId); diff --git a/crates/shared/src/kv.rs b/crates/shared/src/kv.rs new file mode 100644 index 0000000..57dec3e --- /dev/null +++ b/crates/shared/src/kv.rs @@ -0,0 +1,140 @@ +//! `KvService` — the v1.1.1 key-value store contract. +//! +//! Lives in `picloud-shared` (not `executor-core`) so the Rhai bridge, +//! the manager-core Postgres impl, and any future in-memory test impl +//! can all depend on the same trait without dragging +//! `executor-core` into `manager-core`'s dep graph. +//! +//! Implementations MUST derive every storage `app_id` from `cx.app_id` +//! — never from a script-passed argument. That is the cross-app +//! isolation boundary; see `docs/sdk-shape.md`. + +use async_trait::async_trait; +use thiserror::Error; + +use crate::SdkCallCx; + +/// `KvService` is collection-scoped. Scripts get a handle via +/// `kv::collection(name)` and call `get`/`set`/`has`/`delete`/`list` +/// on it. The trait surface accepts the collection by name so the +/// Postgres impl can avoid an extra round-trip to materialize the +/// collection (collections are namespaces, not first-class rows). +#[async_trait] +pub trait KvService: Send + Sync { + async fn get( + &self, + cx: &SdkCallCx, + collection: &str, + key: &str, + ) -> Result, KvError>; + + async fn set( + &self, + cx: &SdkCallCx, + collection: &str, + key: &str, + value: serde_json::Value, + ) -> Result<(), KvError>; + + async fn delete(&self, cx: &SdkCallCx, collection: &str, key: &str) -> Result; + + async fn has(&self, cx: &SdkCallCx, collection: &str, key: &str) -> Result; + + /// Cursor-style pagination. `cursor` is opaque to the caller; + /// implementations encode the resume key inside. `None` cursor + /// starts from the beginning. Implementations cap `limit` at a + /// reasonable ceiling internally (script can't request an unbounded + /// page). + async fn list( + &self, + cx: &SdkCallCx, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result; +} + +/// One page of keys from `KvService::list`. `next_cursor` is `Some` +/// when more pages exist, `None` when exhausted. The cursor encoding +/// is implementation-defined (the Postgres impl base64-encodes the +/// last key). +#[derive(Debug, Clone)] +pub struct KvListPage { + pub keys: Vec, + pub next_cursor: Option, +} + +/// Stub used by the test harness so executor-core integration tests +/// (which don't touch KV) can construct a `Services` bundle without +/// spinning up Postgres. Every call returns +/// `KvError::Backend("...")` so accidental KV use surfaces clearly. +#[derive(Debug, Default, Clone, Copy)] +pub struct NoopKvService; + +#[async_trait] +impl KvService for NoopKvService { + async fn get( + &self, + _cx: &SdkCallCx, + _collection: &str, + _key: &str, + ) -> Result, KvError> { + Err(KvError::Backend("kv is not wired in".into())) + } + + async fn set( + &self, + _cx: &SdkCallCx, + _collection: &str, + _key: &str, + _value: serde_json::Value, + ) -> Result<(), KvError> { + Err(KvError::Backend("kv is not wired in".into())) + } + + async fn delete( + &self, + _cx: &SdkCallCx, + _collection: &str, + _key: &str, + ) -> Result { + Err(KvError::Backend("kv is not wired in".into())) + } + + async fn has(&self, _cx: &SdkCallCx, _collection: &str, _key: &str) -> Result { + Err(KvError::Backend("kv is not wired in".into())) + } + + async fn list( + &self, + _cx: &SdkCallCx, + _collection: &str, + _cursor: Option<&str>, + _limit: u32, + ) -> Result { + Err(KvError::Backend("kv is not wired in".into())) + } +} + +/// Failure modes surfaced to the Rhai bridge. The bridge converts each +/// to a Rhai runtime error string; the discriminants exist so internal +/// callers (admin endpoints, tests, GC) can react more precisely. +#[derive(Debug, Error)] +pub enum KvError { + /// Empty collection name; rejected at the SDK boundary per + /// `docs/sdk-shape.md`. + #[error("collection name must not be empty")] + InvalidCollection, + + /// Caller principal lacked the required capability. Only raised + /// when `cx.principal.is_some()` — scripts running with + /// `principal: None` (public HTTP) operate under script-as-gate + /// semantics and skip the capability check. + #[error("forbidden")] + Forbidden, + + /// Anything else — Postgres unavailable, serialization failure, + /// etc. The string is safe to surface to a script. + #[error("kv backend error: {0}")] + Backend(String), +} diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs index 5714861..6540a79 100644 --- a/crates/shared/src/lib.rs +++ b/crates/shared/src/lib.rs @@ -6,10 +6,12 @@ pub mod app; pub mod auth; +pub mod dead_letters; pub mod error; pub mod events; pub mod execution_log; pub mod ids; +pub mod kv; pub mod log_sink; pub mod route; pub mod sandbox; @@ -21,10 +23,12 @@ pub mod version; pub use app::{App, AppDomain, DomainShape}; pub use auth::{AppRole, InstanceRole, Principal, Scope, UserId}; +pub use dead_letters::{DeadLetterError, DeadLetterId, DeadLetterService, NoopDeadLetterService}; pub use error::Error; pub use events::{EmitError, NoopEventEmitter, ServiceEvent, ServiceEventEmitter}; pub use execution_log::{ExecutionLog, ExecutionStatus}; -pub use ids::{AdminUserId, ApiKeyId, AppId, ExecutionId, RequestId, ScriptId}; +pub use ids::{AdminUserId, ApiKeyId, AppId, ExecutionId, RequestId, ScriptId, TriggerId}; +pub use kv::{KvError, KvListPage, KvService, NoopKvService}; pub use log_sink::{ExecutionLogSink, LogSinkError}; pub use route::{HostKind, PathKind, Route}; pub use sandbox::ScriptSandbox; diff --git a/crates/shared/src/services.rs b/crates/shared/src/services.rs index 6900c63..29e7c9d 100644 --- a/crates/shared/src/services.rs +++ b/crates/shared/src/services.rs @@ -1,38 +1,81 @@ //! `Services` — bundle of stateful SDK service handles plumbed from the //! host binary into every Rhai execution. //! -//! v1.1.0 ships this struct empty. Subsequent PRs in the v1.1.x series -//! add one field per service: +//! Constructed once at startup in the picloud binary; cloned (cheap — +//! every field is an `Arc`) into the per-call sdk bridge so script +//! invocations don't need to re-resolve dependencies. The bundle is +//! handed to `executor-core::sdk::register_all` alongside an +//! `SdkCallCx` to wire each `::` namespace. //! -//! ```ignore -//! pub kv: Arc, // v1.1.1 -//! pub docs: Arc, // v1.1.2 -//! pub http: Arc, // v1.1.4 -//! // … -//! ``` -//! -//! The bundle is cheap to clone (`Arc` per service) and is constructed -//! once at startup in the picloud binary. The executor takes it by -//! reference per invocation, hands it (alongside an `SdkCallCx`) to -//! `executor-core::sdk::register_all`, which wires the corresponding -//! Rhai `::` namespace per service. +//! v1.1.0 shipped this empty; v1.1.1 adds the first two service fields +//! (`kv`, `dead_letters`) plus the `events` emitter that bound services +//! use to publish events into the triggers outbox. //! //! `#[non_exhaustive]` so adding fields is a non-breaking change for //! consumers that only *pattern-match* a `&Services`; only crates that -//! *construct* a `Services` (in practice, just the picloud binary) need -//! to update their constructor when new services land. +//! *construct* a `Services` (the picloud binary and tests) update. + +use std::sync::Arc; + +use crate::{ + DeadLetterService, KvService, NoopDeadLetterService, NoopEventEmitter, NoopKvService, + ServiceEventEmitter, +}; /// SDK service bundle. See module docs for the lifecycle and the v1.1.x /// expansion plan. #[non_exhaustive] -#[derive(Default)] -pub struct Services {} +pub struct Services { + /// KV store (v1.1.1). Backed by Postgres in the picloud binary; + /// in-memory in tests. + pub kv: Arc, + + /// Dead-letter management (v1.1.1). Scripts get + /// `dead_letters::replay(id)` and `dead_letters::resolve(id, reason)`. + pub dead_letters: Arc, + + /// Event emitter for the triggers outbox. Mutating service methods + /// (`KvService::set/delete`, future `docs::*`, `files::*`, etc.) + /// call `events.emit(cx, event)` after the write succeeds. The + /// outbox-backed impl in `manager-core::outbox_event_emitter` + /// replaces v1.1.0's `NoopEventEmitter`. + pub events: Arc, +} impl Services { - /// Construct an empty bundle. Replaced by a fielded `::new(...)` - /// once the first service (KV, v1.1.1) lands. + /// Construct a bundle from already-constructed `Arc` handles. + /// The picloud binary's `main` wires this up after the DB pool is + /// open; tests build it from in-memory fakes. #[must_use] - pub fn new() -> Self { - Self {} + pub fn new( + kv: Arc, + dead_letters: Arc, + events: Arc, + ) -> Self { + Self { + kv, + dead_letters, + events, + } + } + + /// All-noop bundle for tests that build an `Engine` but don't + /// exercise the stateful services. Returns the same shape as + /// `Services::new` so callers can't accidentally rely on a stub + /// silently doing the right thing — every call into a noop + /// service surfaces an explicit error. + #[must_use] + pub fn with_noop_services() -> Self { + Self::new( + Arc::new(NoopKvService), + Arc::new(NoopDeadLetterService), + Arc::new(NoopEventEmitter), + ) + } +} + +impl Default for Services { + fn default() -> Self { + Self::with_noop_services() } }