feat(v1.1.1-kv): migrations + KvService trait + Postgres impl
First v1.1.1 commit. Adds the KV store the design notes commit to: `(app_id, collection, key)` identity with JSONB value and a per-app index. Trait lives in `picloud-shared` so the executor-core Rhai bridge (next commit), the Postgres impl, and tests all depend on the same surface without coupling crates. The `Services` bundle grows from empty to three fields: `kv`, `dead_letters` (NoopDeadLetterService stub — replaced by the Postgres impl in commit 8), and `events` (NoopEventEmitter until the outbox emitter lands with the dispatcher). Tests use `Services::default()` for an all-noop bundle. New capabilities `AppKvRead` / `AppKvWrite` join the Capability enum. They map onto the existing seven-value `Scope` (script:read / script:write) — the scope vocabulary stays locked per the `docs/versioning.md` commitment. Script-as-gate semantics in `KvServiceImpl`: capability check runs when `cx.principal.is_some()`, skipped when None (public HTTP). Cross-app isolation is enforced independently by deriving every row's `app_id` from `cx.app_id` rather than a script-passed argument. In-memory `KvRepo` impl + unit tests cover the round-trips, the cross-app isolation property, empty-collection rejection, script-as-gate behaviour for both anonymous and authed contexts, and cursor-style pagination. Postgres impl exists; integration testing waits for a real DB harness (see HANDBACK). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
519
crates/manager-core/src/kv_service.rs
Normal file
519
crates/manager-core/src/kv_service.rs
Normal file
@@ -0,0 +1,519 @@
|
||||
//! `KvServiceImpl` — wires the `KvRepo` underneath the
|
||||
//! `picloud_shared::KvService` trait that scripts see via the Rhai
|
||||
//! bridge.
|
||||
//!
|
||||
//! Layers added here (vs the raw repo):
|
||||
//!
|
||||
//! 1. Empty-collection rejection at the SDK boundary
|
||||
//! (`docs/sdk-shape.md`).
|
||||
//! 2. **Script-as-gate authz**: when `cx.principal.is_some()` we run
|
||||
//! `authz::require(...)`; when it's `None` (public unauthenticated
|
||||
//! HTTP — the common case for public routes) we skip the check.
|
||||
//! Cross-app isolation isn't affected — every query is keyed by
|
||||
//! `cx.app_id`, never an argument.
|
||||
//! 3. `ServiceEvent` emission after each mutation (`insert` / `update`
|
||||
//! / `delete`). v1.1.0 ships a `NoopEventEmitter` so this is a
|
||||
//! no-op until the outbox emitter lands later in v1.1.1.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use picloud_shared::{
|
||||
KvError, KvListPage, KvService, SdkCallCx, ServiceEvent, ServiceEventEmitter,
|
||||
};
|
||||
|
||||
use crate::authz::{self, AuthzRepo, Capability};
|
||||
use crate::kv_repo::{KvRepo, KvRepoError};
|
||||
|
||||
pub struct KvServiceImpl {
|
||||
repo: Arc<dyn KvRepo>,
|
||||
authz: Arc<dyn AuthzRepo>,
|
||||
events: Arc<dyn ServiceEventEmitter>,
|
||||
}
|
||||
|
||||
impl KvServiceImpl {
|
||||
#[must_use]
|
||||
pub fn new(
|
||||
repo: Arc<dyn KvRepo>,
|
||||
authz: Arc<dyn AuthzRepo>,
|
||||
events: Arc<dyn ServiceEventEmitter>,
|
||||
) -> Self {
|
||||
Self {
|
||||
repo,
|
||||
authz,
|
||||
events,
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_read(&self, cx: &SdkCallCx) -> Result<(), KvError> {
|
||||
if let Some(ref principal) = cx.principal {
|
||||
authz::require(&*self.authz, principal, Capability::AppKvRead(cx.app_id))
|
||||
.await
|
||||
.map_err(|_| KvError::Forbidden)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn check_write(&self, cx: &SdkCallCx) -> Result<(), KvError> {
|
||||
if let Some(ref principal) = cx.principal {
|
||||
authz::require(&*self.authz, principal, Capability::AppKvWrite(cx.app_id))
|
||||
.await
|
||||
.map_err(|_| KvError::Forbidden)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_collection(collection: &str) -> Result<(), KvError> {
|
||||
if collection.is_empty() {
|
||||
return Err(KvError::InvalidCollection);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl From<KvRepoError> for KvError {
|
||||
fn from(e: KvRepoError) -> Self {
|
||||
Self::Backend(e.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl KvService for KvServiceImpl {
|
||||
async fn get(
|
||||
&self,
|
||||
cx: &SdkCallCx,
|
||||
collection: &str,
|
||||
key: &str,
|
||||
) -> Result<Option<serde_json::Value>, KvError> {
|
||||
validate_collection(collection)?;
|
||||
self.check_read(cx).await?;
|
||||
Ok(self.repo.get(cx.app_id, collection, key).await?)
|
||||
}
|
||||
|
||||
async fn set(
|
||||
&self,
|
||||
cx: &SdkCallCx,
|
||||
collection: &str,
|
||||
key: &str,
|
||||
value: serde_json::Value,
|
||||
) -> Result<(), KvError> {
|
||||
validate_collection(collection)?;
|
||||
self.check_write(cx).await?;
|
||||
let previous = self
|
||||
.repo
|
||||
.set(cx.app_id, collection, key, value.clone())
|
||||
.await?;
|
||||
let op = if previous.is_some() {
|
||||
"update"
|
||||
} else {
|
||||
"insert"
|
||||
};
|
||||
// Emit unconditionally; the noop emitter drops it, the outbox
|
||||
// emitter persists it. Best-effort: a failed emit is logged
|
||||
// but does not roll back the write.
|
||||
if let Err(e) = self
|
||||
.events
|
||||
.emit(
|
||||
cx,
|
||||
ServiceEvent {
|
||||
source: "kv",
|
||||
op,
|
||||
collection: Some(collection.to_string()),
|
||||
key: Some(key.to_string()),
|
||||
payload: Some(value),
|
||||
old_payload: previous,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(error = %e, source = "kv", op, "event emit failed");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&self, cx: &SdkCallCx, collection: &str, key: &str) -> Result<bool, KvError> {
|
||||
validate_collection(collection)?;
|
||||
self.check_write(cx).await?;
|
||||
let previous = self.repo.delete(cx.app_id, collection, key).await?;
|
||||
let was_present = previous.is_some();
|
||||
if was_present {
|
||||
if let Err(e) = self
|
||||
.events
|
||||
.emit(
|
||||
cx,
|
||||
ServiceEvent {
|
||||
source: "kv",
|
||||
op: "delete",
|
||||
collection: Some(collection.to_string()),
|
||||
key: Some(key.to_string()),
|
||||
payload: None,
|
||||
old_payload: previous,
|
||||
},
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(error = %e, source = "kv", op = "delete", "event emit failed");
|
||||
}
|
||||
}
|
||||
Ok(was_present)
|
||||
}
|
||||
|
||||
async fn has(&self, cx: &SdkCallCx, collection: &str, key: &str) -> Result<bool, KvError> {
|
||||
validate_collection(collection)?;
|
||||
self.check_read(cx).await?;
|
||||
Ok(self.repo.has(cx.app_id, collection, key).await?)
|
||||
}
|
||||
|
||||
async fn list(
|
||||
&self,
|
||||
cx: &SdkCallCx,
|
||||
collection: &str,
|
||||
cursor: Option<&str>,
|
||||
limit: u32,
|
||||
) -> Result<KvListPage, KvError> {
|
||||
validate_collection(collection)?;
|
||||
self.check_read(cx).await?;
|
||||
Ok(self.repo.list(cx.app_id, collection, cursor, limit).await?)
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// Tests — in-memory KvRepo so unit tests don't need Postgres.
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::authz::{AuthzError, AuthzRepo};
|
||||
use async_trait::async_trait;
|
||||
use picloud_shared::{
|
||||
AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, NoopEventEmitter, Principal,
|
||||
RequestId, UserId,
|
||||
};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[derive(Default)]
|
||||
struct InMemoryKvRepo {
|
||||
data: Mutex<BTreeMap<(AppId, String, String), serde_json::Value>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl KvRepo for InMemoryKvRepo {
|
||||
async fn get(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
collection: &str,
|
||||
key: &str,
|
||||
) -> Result<Option<serde_json::Value>, KvRepoError> {
|
||||
Ok(self
|
||||
.data
|
||||
.lock()
|
||||
.await
|
||||
.get(&(app_id, collection.to_string(), key.to_string()))
|
||||
.cloned())
|
||||
}
|
||||
|
||||
async fn set(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
collection: &str,
|
||||
key: &str,
|
||||
value: serde_json::Value,
|
||||
) -> Result<Option<serde_json::Value>, KvRepoError> {
|
||||
Ok(self
|
||||
.data
|
||||
.lock()
|
||||
.await
|
||||
.insert((app_id, collection.to_string(), key.to_string()), value))
|
||||
}
|
||||
|
||||
async fn delete(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
collection: &str,
|
||||
key: &str,
|
||||
) -> Result<Option<serde_json::Value>, KvRepoError> {
|
||||
Ok(self
|
||||
.data
|
||||
.lock()
|
||||
.await
|
||||
.remove(&(app_id, collection.to_string(), key.to_string())))
|
||||
}
|
||||
|
||||
async fn has(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
collection: &str,
|
||||
key: &str,
|
||||
) -> Result<bool, KvRepoError> {
|
||||
Ok(self.data.lock().await.contains_key(&(
|
||||
app_id,
|
||||
collection.to_string(),
|
||||
key.to_string(),
|
||||
)))
|
||||
}
|
||||
|
||||
async fn list(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
collection: &str,
|
||||
cursor: Option<&str>,
|
||||
limit: u32,
|
||||
) -> Result<KvListPage, KvRepoError> {
|
||||
let data = self.data.lock().await;
|
||||
let last_key = cursor.map(std::string::ToString::to_string);
|
||||
let mut keys: Vec<String> = data
|
||||
.iter()
|
||||
.filter(|((a, c, _), _)| *a == app_id && c == collection)
|
||||
.map(|((_, _, k), _)| k.clone())
|
||||
.filter(|k| last_key.as_ref().is_none_or(|lk| k > lk))
|
||||
.collect();
|
||||
keys.sort();
|
||||
let take = (limit as usize).max(1);
|
||||
let next_cursor = if keys.len() > take {
|
||||
keys.truncate(take);
|
||||
keys.last().cloned()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
Ok(KvListPage { keys, next_cursor })
|
||||
}
|
||||
}
|
||||
|
||||
/// AuthzRepo that always denies — used to confirm the service
|
||||
/// short-circuits on cx.principal.is_some() with a denial, and
|
||||
/// that it does NOT call into authz when cx.principal is None.
|
||||
#[derive(Default)]
|
||||
struct DenyingAuthzRepo;
|
||||
|
||||
#[async_trait]
|
||||
impl AuthzRepo for DenyingAuthzRepo {
|
||||
async fn membership(
|
||||
&self,
|
||||
_user_id: UserId,
|
||||
_app_id: AppId,
|
||||
) -> Result<Option<AppRole>, AuthzError> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn anon_cx(app_id: AppId) -> SdkCallCx {
|
||||
SdkCallCx {
|
||||
app_id,
|
||||
principal: None,
|
||||
execution_id: ExecutionId::new(),
|
||||
request_id: RequestId::new(),
|
||||
trigger_depth: 0,
|
||||
root_execution_id: ExecutionId::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn owner_cx(app_id: AppId) -> SdkCallCx {
|
||||
SdkCallCx {
|
||||
app_id,
|
||||
principal: Some(Principal {
|
||||
user_id: AdminUserId::new(),
|
||||
instance_role: InstanceRole::Owner,
|
||||
scopes: None,
|
||||
app_binding: None,
|
||||
}),
|
||||
execution_id: ExecutionId::new(),
|
||||
request_id: RequestId::new(),
|
||||
trigger_depth: 0,
|
||||
root_execution_id: ExecutionId::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn member_no_role_cx(app_id: AppId) -> SdkCallCx {
|
||||
SdkCallCx {
|
||||
app_id,
|
||||
principal: Some(Principal {
|
||||
user_id: AdminUserId::new(),
|
||||
instance_role: InstanceRole::Member,
|
||||
scopes: None,
|
||||
app_binding: None,
|
||||
}),
|
||||
execution_id: ExecutionId::new(),
|
||||
request_id: RequestId::new(),
|
||||
trigger_depth: 0,
|
||||
root_execution_id: ExecutionId::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn svc() -> KvServiceImpl {
|
||||
KvServiceImpl::new(
|
||||
Arc::new(InMemoryKvRepo::default()),
|
||||
Arc::new(DenyingAuthzRepo),
|
||||
Arc::new(NoopEventEmitter),
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_then_get_round_trips() {
|
||||
let kv = svc();
|
||||
let cx = anon_cx(AppId::new());
|
||||
kv.set(&cx, "widgets", "k1", serde_json::json!({"n": 1}))
|
||||
.await
|
||||
.unwrap();
|
||||
let v = kv.get(&cx, "widgets", "k1").await.unwrap();
|
||||
assert_eq!(v, Some(serde_json::json!({"n": 1})));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_missing_returns_none() {
|
||||
let kv = svc();
|
||||
let cx = anon_cx(AppId::new());
|
||||
let v = kv.get(&cx, "widgets", "nope").await.unwrap();
|
||||
assert_eq!(v, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn has_returns_bool() {
|
||||
let kv = svc();
|
||||
let cx = anon_cx(AppId::new());
|
||||
assert!(!kv.has(&cx, "widgets", "k1").await.unwrap());
|
||||
kv.set(&cx, "widgets", "k1", serde_json::json!(true))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(kv.has(&cx, "widgets", "k1").await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn delete_returns_was_present() {
|
||||
let kv = svc();
|
||||
let cx = anon_cx(AppId::new());
|
||||
assert!(!kv.delete(&cx, "widgets", "missing").await.unwrap());
|
||||
kv.set(&cx, "widgets", "k1", serde_json::json!(1))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(kv.delete(&cx, "widgets", "k1").await.unwrap());
|
||||
// Idempotent — second delete returns false.
|
||||
assert!(!kv.delete(&cx, "widgets", "k1").await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn empty_collection_rejected() {
|
||||
let kv = svc();
|
||||
let cx = anon_cx(AppId::new());
|
||||
let err = kv.get(&cx, "", "k1").await.unwrap_err();
|
||||
assert!(matches!(err, KvError::InvalidCollection));
|
||||
}
|
||||
|
||||
/// Load-bearing: a script with `cx.app_id = A` must NOT see
|
||||
/// entries inserted under `cx.app_id = B`. This is the cross-app
|
||||
/// isolation boundary; getting this wrong is a security
|
||||
/// vulnerability.
|
||||
#[tokio::test]
|
||||
async fn cross_app_isolation_via_cx_app_id() {
|
||||
let kv = svc();
|
||||
let app_a = AppId::new();
|
||||
let app_b = AppId::new();
|
||||
let cx_a = anon_cx(app_a);
|
||||
let cx_b = anon_cx(app_b);
|
||||
|
||||
kv.set(&cx_a, "shared", "k", serde_json::json!("from-a"))
|
||||
.await
|
||||
.unwrap();
|
||||
kv.set(&cx_b, "shared", "k", serde_json::json!("from-b"))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
kv.get(&cx_a, "shared", "k").await.unwrap(),
|
||||
Some(serde_json::json!("from-a"))
|
||||
);
|
||||
assert_eq!(
|
||||
kv.get(&cx_b, "shared", "k").await.unwrap(),
|
||||
Some(serde_json::json!("from-b"))
|
||||
);
|
||||
}
|
||||
|
||||
/// Script-as-gate: an `anon_cx` (principal = None) skips the
|
||||
/// capability check entirely. Even with a denying authz repo,
|
||||
/// the write succeeds.
|
||||
#[tokio::test]
|
||||
async fn anonymous_cx_skips_authz() {
|
||||
let kv = svc();
|
||||
let cx = anon_cx(AppId::new());
|
||||
kv.set(&cx, "widgets", "k", serde_json::json!(1))
|
||||
.await
|
||||
.unwrap();
|
||||
// No panic, no Forbidden.
|
||||
}
|
||||
|
||||
/// Authenticated principal with no role on the app: the
|
||||
/// `DenyingAuthzRepo` returns no membership, so the capability
|
||||
/// check denies. Set must surface KvError::Forbidden.
|
||||
#[tokio::test]
|
||||
async fn authed_cx_with_no_role_is_forbidden() {
|
||||
let kv = svc();
|
||||
let cx = member_no_role_cx(AppId::new());
|
||||
let err = kv
|
||||
.set(&cx, "widgets", "k", serde_json::json!(1))
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, KvError::Forbidden));
|
||||
}
|
||||
|
||||
/// Owner principal: instance-role grants kick in inside `authz::can`
|
||||
/// (Owner -> implicit AppAdmin which covers KvWrite).
|
||||
#[tokio::test]
|
||||
async fn owner_principal_can_write() {
|
||||
let kv = svc();
|
||||
let cx = owner_cx(AppId::new());
|
||||
kv.set(&cx, "widgets", "k", serde_json::json!(1))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn list_cursor_pagination() {
|
||||
let kv = svc();
|
||||
let cx = anon_cx(AppId::new());
|
||||
for i in 0..5 {
|
||||
kv.set(
|
||||
&cx,
|
||||
"widgets",
|
||||
&format!("k{i:02}"),
|
||||
serde_json::json!({"i": i}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
// page 1 — 2 keys
|
||||
let p1 = kv.list(&cx, "widgets", None, 2).await.unwrap();
|
||||
assert_eq!(p1.keys, vec!["k00".to_string(), "k01".to_string()]);
|
||||
assert!(p1.next_cursor.is_some());
|
||||
// page 2 — 2 keys
|
||||
let p2 = kv
|
||||
.list(&cx, "widgets", p1.next_cursor.as_deref(), 2)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(p2.keys, vec!["k02".to_string(), "k03".to_string()]);
|
||||
// final page — 1 key, no cursor
|
||||
let p3 = kv
|
||||
.list(&cx, "widgets", p2.next_cursor.as_deref(), 2)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(p3.keys, vec!["k04".to_string()]);
|
||||
assert!(p3.next_cursor.is_none());
|
||||
}
|
||||
|
||||
/// Pinning the v1.1.0 contract: services hold the emitter as a
|
||||
/// dyn Arc and call `emit().await` unconditionally. This test
|
||||
/// proves the call site doesn't blow up against the noop impl —
|
||||
/// the outbox emitter (v1.1.1) drops in transparently.
|
||||
#[tokio::test]
|
||||
async fn noop_emitter_does_not_block_mutations() {
|
||||
let kv = svc();
|
||||
let cx = anon_cx(AppId::new());
|
||||
kv.set(&cx, "widgets", "k", serde_json::json!(1))
|
||||
.await
|
||||
.unwrap();
|
||||
kv.delete(&cx, "widgets", "k").await.unwrap();
|
||||
// Reaching here means emit() returned Ok and didn't panic.
|
||||
// Suppress unused-import warning when run alone:
|
||||
let _ = HashMap::<String, String>::new();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user