diff --git a/crates/manager-core/src/authz.rs b/crates/manager-core/src/authz.rs index e241186..a27861f 100644 --- a/crates/manager-core/src/authz.rs +++ b/crates/manager-core/src/authz.rs @@ -64,6 +64,14 @@ pub enum Capability { /// Write entries to this app's KV store (v1.1.1). Granted to /// `editor`+. Maps to `script:write` on API keys. AppKvWrite(AppId), + /// Read documents from this app's docs store (v1.1.2). Same trust + /// shape as KV read — granted to `viewer`+, maps to `script:read` + /// on API keys. Honors the seven-scope commitment. + AppDocsRead(AppId), + /// Write documents to this app's docs store (v1.1.2). Same trust + /// shape as KV write — granted to `editor`+, maps to + /// `script:write` on API keys. + AppDocsWrite(AppId), /// Create / list / delete triggers for this app (v1.1.1). Maps to /// `app:admin` on API keys — triggers are app-configuration acts /// rather than data-plane access. Granted to `app_admin`+. @@ -91,6 +99,8 @@ impl Capability { | Self::AppLogRead(id) | Self::AppKvRead(id) | Self::AppKvWrite(id) + | Self::AppDocsRead(id) + | Self::AppDocsWrite(id) | Self::AppManageTriggers(id) | Self::AppDeadLetterManage(id) => Some(id), } @@ -107,8 +117,10 @@ impl Capability { Self::InstanceCreateApp | Self::InstanceManageUsers | Self::InstanceManageSettings => { Scope::InstanceAdmin } - Self::AppRead(_) | Self::AppKvRead(_) => Scope::ScriptRead, - Self::AppWriteScript(_) | Self::AppKvWrite(_) => Scope::ScriptWrite, + Self::AppRead(_) | Self::AppKvRead(_) | Self::AppDocsRead(_) => Scope::ScriptRead, + Self::AppWriteScript(_) | Self::AppKvWrite(_) | Self::AppDocsWrite(_) => { + Scope::ScriptWrite + } Self::AppWriteRoute(_) => Scope::RouteWrite, Self::AppManageDomains(_) => Scope::DomainManage, Self::AppAdmin(_) | Self::AppManageTriggers(_) | Self::AppDeadLetterManage(_) => { @@ -253,7 +265,10 @@ async fn member_grants( const fn role_satisfies(role: AppRole, cap: Capability) -> bool { let in_viewer = matches!( cap, - Capability::AppRead(_) | Capability::AppLogRead(_) | Capability::AppKvRead(_) + Capability::AppRead(_) + | Capability::AppLogRead(_) + | Capability::AppKvRead(_) + | Capability::AppDocsRead(_) ); let in_editor = in_viewer || matches!( @@ -261,6 +276,7 @@ const fn role_satisfies(role: AppRole, cap: Capability) -> bool { Capability::AppWriteScript(_) | Capability::AppWriteRoute(_) | Capability::AppKvWrite(_) + | Capability::AppDocsWrite(_) ); let in_app_admin = in_editor || matches!( diff --git a/crates/manager-core/src/docs_filter.rs b/crates/manager-core/src/docs_filter.rs new file mode 100644 index 0000000..368fb57 --- /dev/null +++ b/crates/manager-core/src/docs_filter.rs @@ -0,0 +1,598 @@ +//! v1.1.2 query DSL parser + AST for `docs::find` / `docs::find_one`. +//! +//! Sets the precedent v1.2's `dead_letters::list` will follow (see +//! `docs/v1.1.x-design-notes.md` §4 #13). When that lands we promote +//! this module to `picloud-shared` and rename to +//! `picloud_shared::query::{Filter, FieldPath, ComparisonOp}`; until +//! then keeping it private to manager-core avoids over-engineering. +//! +//! Parse stage is deliberately strict: any unrecognized `$xxx` +//! operator surfaces as `FilterParseError::UnsupportedOperator` with +//! a script-visible message naming the offending key + pointing at +//! v1.2. The error strings become part of the SDK contract once +//! scripts depend on them; pin them with snapshot tests in the test +//! module below before changing. +//! +//! ## DSL surface (v1.1.2 subset) +//! +//! ```rhai +//! // implicit equality (top-level) +//! users.find(#{ tier: "gold", status: "active" }) +//! +//! // operator object on a field +//! users.find(#{ created_at: #{ "$gt": "2026-01-01T00:00:00Z" } }) +//! +//! // dotted paths (max 5 segments) +//! users.find(#{ "user.email": "a@b" }) +//! +//! // sort + limit as filter modifiers +//! users.find(#{ tier: "gold", "$sort": #{ created_at: -1 }, "$limit": 10 }) +//! ``` +//! +//! ## Out of scope (v1.2) +//! +//! `$or`, `$and`, `$not`, `$exists`, `$regex`, `$type`, `$size`, +//! `$all`, `$elemMatch`, multi-field sort, projection, aggregations. + +use serde_json::Value; + +/// Maximum nesting depth for dotted field paths. `"a.b.c.d.e"` is the +/// deepest path allowed (5 segments). Deeper paths reject at parse +/// time with `InvalidFilter` — prevents pathological JSONB navigation +/// chains from a script. +pub const MAX_FIELD_PATH_DEPTH: usize = 5; + +/// Hard cap on `$limit` values — script-side limits are silently +/// clamped here so the Postgres query is always bounded. Mirrors the +/// `find` repo's own internal cap. +pub const MAX_FIND_LIMIT: u32 = 1_000; + +/// Parsed `docs::find` filter. +#[derive(Debug, Clone, PartialEq)] +pub struct DocsFilter { + pub conditions: Vec, + pub sort: Option, + pub limit: Option, +} + +impl DocsFilter { + /// Empty filter — matches every document in the collection. + #[must_use] + pub const fn empty() -> Self { + Self { + conditions: Vec::new(), + sort: None, + limit: None, + } + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct FieldCondition { + pub path: FieldPath, + pub op: ComparisonOp, + pub value: Value, +} + +/// Validated dotted path. Construct only via `FieldPath::parse` so the +/// segment invariants (non-empty, no `..`, no `$` prefix, depth ≤ 5) +/// are guaranteed. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FieldPath { + segments: Vec, +} + +impl FieldPath { + /// Parse a dotted path from a JSON object key. + pub fn parse(raw: &str) -> Result { + if raw.is_empty() { + return Err(FilterParseError::InvalidFilter( + "docs::find: field path must not be empty".into(), + )); + } + let segments: Vec<&str> = raw.split('.').collect(); + if segments.len() > MAX_FIELD_PATH_DEPTH { + return Err(FilterParseError::InvalidFilter(format!( + "docs::find: field path '{raw}' exceeds max depth {MAX_FIELD_PATH_DEPTH}" + ))); + } + for seg in &segments { + if seg.is_empty() { + return Err(FilterParseError::InvalidFilter(format!( + "docs::find: field path '{raw}' has an empty segment (leading/trailing dot or '..')" + ))); + } + if seg.starts_with('$') { + return Err(FilterParseError::InvalidFilter(format!( + "docs::find: field path segment '{seg}' must not start with '$'" + ))); + } + } + Ok(Self { + segments: segments.into_iter().map(ToString::to_string).collect(), + }) + } + + /// Path segments in order. The Postgres impl binds each as a + /// separate text parameter to `jsonb_extract_path_text`, so no + /// segment ever appears in the SQL string verbatim. + #[must_use] + pub fn segments(&self) -> &[String] { + &self.segments + } + + /// Display form for error messages — joined back with `.`. + #[must_use] + pub fn as_str(&self) -> String { + self.segments.join(".") + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ComparisonOp { + /// Implicit equality at top level OR explicit `$eq`. Maps to + /// `jsonb_extract_path_text(...) = $M`. + Eq, + /// `$ne` — uses Postgres `IS DISTINCT FROM` so JSON nulls and + /// missing paths are correctly included (`<>` returns NULL on + /// either operand being NULL, which would silently exclude rows + /// the user expects to see). + Ne, + /// `$gt` / `$gte` / `$lt` / `$lte` — text-lex comparison per the + /// brief's contract. Known limitation: lex breaks across + /// digit-count boundaries (`'10' < '9'` is TRUE). Documented in + /// CHANGELOG; v1.2 advanced query will add numeric-aware + /// operators. + Gt, + Gte, + Lt, + Lte, + /// `$in` — `= ANY($M::text[])` where the value list is bound as + /// a TEXT[]. + In, +} + +impl ComparisonOp { + /// Decode an operator key like `"$gt"`. Returns `None` for any + /// non-`$` key; returns `Some(Err(...))` for `$`-prefixed keys + /// not in the v1.1.2 allowlist (caller surfaces the + /// UnsupportedOperator error). + fn from_dollar_key(key: &str) -> Option> { + if !key.starts_with('$') { + return None; + } + Some(match key { + "$eq" => Ok(Self::Eq), + "$ne" => Ok(Self::Ne), + "$gt" => Ok(Self::Gt), + "$gte" => Ok(Self::Gte), + "$lt" => Ok(Self::Lt), + "$lte" => Ok(Self::Lte), + "$in" => Ok(Self::In), + other => Err(FilterParseError::UnsupportedOperator(format!( + "docs::find: operator '{other}' is not supported in v1.1.2; planned for v1.2 advanced query" + ))), + }) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct Sort { + pub path: FieldPath, + pub direction: SortDir, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SortDir { + Asc, + Desc, +} + +#[derive(Debug, thiserror::Error)] +pub enum FilterParseError { + /// Bad path syntax, malformed operator value, multi-field sort, + /// etc. The string is the script-visible message. + #[error("{0}")] + InvalidFilter(String), + + /// Filter used an operator not in the v1.1.2 allowlist. The + /// string includes the offending operator + v1.2 pointer. + #[error("{0}")] + UnsupportedOperator(String), +} + +/// Parse a `serde_json::Value` filter into `DocsFilter`. The bridge +/// converts the script's Rhai map into a `Value` via +/// `executor-core::sdk::bridge::dynamic_to_json` and passes it through +/// `DocsService::find`; the service calls this parser before touching +/// the repo. +pub fn parse_filter(filter: &Value) -> Result { + let obj = filter.as_object().ok_or_else(|| { + FilterParseError::InvalidFilter("docs::find: filter must be a map/object".into()) + })?; + + let mut out = DocsFilter::empty(); + + for (key, value) in obj { + if let Some(stripped) = key.strip_prefix('$') { + // Top-level modifier — `$sort` / `$limit`. Any other + // dollar-key at top level is unsupported. + match stripped { + "sort" => out.sort = Some(parse_sort(value)?), + "limit" => out.limit = Some(parse_limit(value)?), + other => { + return Err(FilterParseError::UnsupportedOperator(format!( + "docs::find: top-level modifier '${other}' is not supported in v1.1.2; planned for v1.2 advanced query" + ))); + } + } + continue; + } + + // Field path → either implicit equality OR operator-object. + let path = FieldPath::parse(key)?; + match value { + Value::Object(inner) if is_operator_object(inner) => { + for (op_key, op_val) in inner { + let Some(op_res) = ComparisonOp::from_dollar_key(op_key) else { + // This shouldn't trigger — is_operator_object + // already guarantees every key is $-prefixed. + return Err(FilterParseError::InvalidFilter(format!( + "docs::find: operator object for '{}' has non-$ key '{op_key}'", + path.as_str() + ))); + }; + let op = op_res?; + validate_op_value(op, op_val, &path)?; + out.conditions.push(FieldCondition { + path: path.clone(), + op, + value: op_val.clone(), + }); + } + } + // Any non-object value is implicit equality. + // (Object values with non-$ keys are user data, not an + // operator object — reject so the user doesn't accidentally + // match against a literal `{ name: "Alice" }` shape that + // would never compare meaningfully under JSONB text.) + Value::Object(_) => { + return Err(FilterParseError::InvalidFilter(format!( + "docs::find: value for '{}' must be a scalar (implicit equality) or an operator map (keys starting with '$')", + path.as_str() + ))); + } + _ => { + out.conditions.push(FieldCondition { + path, + op: ComparisonOp::Eq, + value: value.clone(), + }); + } + } + } + + Ok(out) +} + +/// True when every key in the map starts with `$`. Mixed-shape maps +/// (some `$key`, some user-data key) are rejected to avoid silent +/// surprise — the user almost certainly meant an operator object. +fn is_operator_object(map: &serde_json::Map) -> bool { + !map.is_empty() && map.keys().all(|k| k.starts_with('$')) +} + +fn validate_op_value( + op: ComparisonOp, + value: &Value, + path: &FieldPath, +) -> Result<(), FilterParseError> { + match op { + ComparisonOp::In => { + if !value.is_array() { + return Err(FilterParseError::InvalidFilter(format!( + "docs::find: '$in' on '{}' requires an array value", + path.as_str() + ))); + } + } + _ => { + // For the scalar-comparison ops, the value must be a JSON + // scalar (no arrays / no nested objects). JSON null is + // allowed — `$ne` against null is a valid query. + if value.is_array() || value.is_object() { + return Err(FilterParseError::InvalidFilter(format!( + "docs::find: '{op_name}' on '{path}' requires a scalar value", + op_name = op_name(op), + path = path.as_str() + ))); + } + } + } + Ok(()) +} + +const fn op_name(op: ComparisonOp) -> &'static str { + match op { + ComparisonOp::Eq => "$eq", + ComparisonOp::Ne => "$ne", + ComparisonOp::Gt => "$gt", + ComparisonOp::Gte => "$gte", + ComparisonOp::Lt => "$lt", + ComparisonOp::Lte => "$lte", + ComparisonOp::In => "$in", + } +} + +fn parse_sort(value: &Value) -> Result { + let map = value.as_object().ok_or_else(|| { + FilterParseError::InvalidFilter("docs::find: '$sort' must be a map".into()) + })?; + if map.is_empty() { + return Err(FilterParseError::InvalidFilter( + "docs::find: '$sort' must name at least one field".into(), + )); + } + if map.len() > 1 { + return Err(FilterParseError::InvalidFilter( + "docs::find: multi-field '$sort' is not supported in v1.1.2; planned for v1.2 advanced query" + .into(), + )); + } + let (field, dir_val) = map.iter().next().unwrap(); + let path = FieldPath::parse(field)?; + let direction = match dir_val.as_i64() { + Some(1) => SortDir::Asc, + Some(-1) => SortDir::Desc, + _ => { + return Err(FilterParseError::InvalidFilter(format!( + "docs::find: '$sort' direction for '{field}' must be 1 (ascending) or -1 (descending)" + ))); + } + }; + Ok(Sort { path, direction }) +} + +fn parse_limit(value: &Value) -> Result { + let n = value.as_i64().ok_or_else(|| { + FilterParseError::InvalidFilter("docs::find: '$limit' must be an integer".into()) + })?; + if n < 0 { + return Err(FilterParseError::InvalidFilter( + "docs::find: '$limit' must be non-negative".into(), + )); + } + Ok(u32::try_from(n) + .unwrap_or(MAX_FIND_LIMIT) + .min(MAX_FIND_LIMIT)) +} + +// ---------------------------------------------------------------------------- +// Tests — error messages are part of the SDK contract once scripts +// depend on them; the snapshot-style asserts pin the exact strings. +// ---------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn parse(v: Value) -> Result { + parse_filter(&v) + } + + #[test] + fn empty_object_has_no_conditions() { + let f = parse(json!({})).unwrap(); + assert!(f.conditions.is_empty()); + assert!(f.sort.is_none()); + assert!(f.limit.is_none()); + } + + #[test] + fn single_equality_top_level() { + let f = parse(json!({ "tier": "gold" })).unwrap(); + assert_eq!(f.conditions.len(), 1); + assert_eq!(f.conditions[0].path.segments(), &["tier".to_string()]); + assert_eq!(f.conditions[0].op, ComparisonOp::Eq); + assert_eq!(f.conditions[0].value, json!("gold")); + } + + #[test] + fn multi_field_equality_is_conjunctive() { + let f = parse(json!({ "tier": "gold", "status": "active" })).unwrap(); + assert_eq!(f.conditions.len(), 2); + } + + #[test] + fn nested_dotted_path() { + let f = parse(json!({ "user.email": "a@b" })).unwrap(); + let cond = &f.conditions[0]; + assert_eq!( + cond.path.segments(), + &["user".to_string(), "email".to_string()] + ); + } + + #[test] + fn depth_limit_rejects_six_segments() { + let err = parse(json!({ "a.b.c.d.e.f": "x" })).unwrap_err(); + let msg = err.to_string(); + assert!(msg.contains("exceeds max depth"), "msg: {msg}"); + assert!(msg.contains('5'), "msg: {msg}"); + } + + #[test] + fn double_dot_rejected() { + let err = parse(json!({ "a..b": "x" })).unwrap_err(); + assert!(err.to_string().contains("empty segment")); + } + + #[test] + fn leading_dot_rejected() { + let err = parse(json!({ ".a": "x" })).unwrap_err(); + assert!(err.to_string().contains("empty segment")); + } + + #[test] + fn trailing_dot_rejected() { + let err = parse(json!({ "a.": "x" })).unwrap_err(); + assert!(err.to_string().contains("empty segment")); + } + + #[test] + fn dollar_prefix_in_path_segment_rejected() { + // (The top-level $foo would route to operator dispatch; this + // tests deeper segments which should never start with $.) + let err = parse(json!({ "x.$inner": "v" })).unwrap_err(); + assert!(err.to_string().contains("must not start with '$'")); + } + + #[test] + fn each_supported_operator_parses() { + for (key, expected_op) in [ + ("$eq", ComparisonOp::Eq), + ("$ne", ComparisonOp::Ne), + ("$gt", ComparisonOp::Gt), + ("$gte", ComparisonOp::Gte), + ("$lt", ComparisonOp::Lt), + ("$lte", ComparisonOp::Lte), + ] { + let v = json!({ "field": { key: "v" } }); + let f = parse(v).unwrap(); + assert_eq!(f.conditions[0].op, expected_op, "key {key}"); + } + // $in needs an array. + let f = parse(json!({ "tier": { "$in": ["gold", "platinum"] } })).unwrap(); + assert_eq!(f.conditions[0].op, ComparisonOp::In); + } + + #[test] + fn dollar_in_with_non_array_value_rejected() { + let err = parse(json!({ "tier": { "$in": "gold" } })).unwrap_err(); + assert!(err.to_string().contains("'$in'")); + assert!(err.to_string().contains("array")); + } + + #[test] + fn scalar_op_with_object_value_rejected() { + let err = parse(json!({ "tier": { "$gt": { "nested": true } } })).unwrap_err(); + assert!(err.to_string().contains("'$gt'")); + assert!(err.to_string().contains("scalar")); + } + + /// Snapshot: the v1.2-deferred operator error string is part of + /// the SDK contract. Don't change it without a major-version bump. + #[test] + fn unsupported_operator_message_pins_v1_2_pointer() { + let err = parse(json!({ "name": { "$regex": "^A" } })).unwrap_err(); + assert_eq!( + err.to_string(), + "docs::find: operator '$regex' is not supported in v1.1.2; planned for v1.2 advanced query" + ); + } + + #[test] + fn unsupported_top_level_modifier_rejected() { + let err = parse(json!({ "$or": [{ "x": 1 }] })).unwrap_err(); + assert!(err.to_string().contains("'$or'")); + assert!(err.to_string().contains("v1.2")); + } + + /// Snapshot: depth-limit error string. Pinned per the SDK contract. + #[test] + fn depth_limit_message_pinned() { + let err = parse(json!({ "a.b.c.d.e.f": 1 })).unwrap_err(); + assert_eq!( + err.to_string(), + "docs::find: field path 'a.b.c.d.e.f' exceeds max depth 5" + ); + } + + #[test] + fn mixed_shape_operator_object_rejected() { + // Object value where some keys are $-prefixed and some aren't + // — treated as user data + invalid (the user almost certainly + // meant an operator object). + let err = parse(json!({ "x": { "$gt": 1, "other": 2 } })).unwrap_err(); + assert!(err + .to_string() + .contains("scalar (implicit equality) or an operator map")); + } + + #[test] + fn sort_asc_and_desc_parse() { + let f = parse(json!({ "$sort": { "created_at": 1 } })).unwrap(); + let sort = f.sort.unwrap(); + assert_eq!(sort.direction, SortDir::Asc); + assert_eq!(sort.path.segments(), &["created_at".to_string()]); + + let f = parse(json!({ "$sort": { "created_at": -1 } })).unwrap(); + assert_eq!(f.sort.unwrap().direction, SortDir::Desc); + } + + #[test] + fn sort_with_bad_direction_rejected() { + let err = parse(json!({ "$sort": { "x": 2 } })).unwrap_err(); + assert!(err.to_string().contains("1 (ascending)")); + } + + /// Snapshot: multi-field sort error string. Pinned. + #[test] + fn multi_field_sort_rejected_with_v1_2_pointer() { + let err = parse(json!({ "$sort": { "a": 1, "b": -1 } })).unwrap_err(); + assert_eq!( + err.to_string(), + "docs::find: multi-field '$sort' is not supported in v1.1.2; planned for v1.2 advanced query" + ); + } + + #[test] + fn limit_accepts_non_negative_integer() { + let f = parse(json!({ "$limit": 50 })).unwrap(); + assert_eq!(f.limit, Some(50)); + } + + #[test] + fn limit_clamps_to_max() { + let f = parse(json!({ "$limit": 10_000 })).unwrap(); + assert_eq!(f.limit, Some(MAX_FIND_LIMIT)); + } + + #[test] + fn limit_rejects_negative() { + let err = parse(json!({ "$limit": -1 })).unwrap_err(); + assert!(err.to_string().contains("non-negative")); + } + + #[test] + fn limit_rejects_non_integer() { + let err = parse(json!({ "$limit": "twenty" })).unwrap_err(); + assert!(err.to_string().contains("integer")); + } + + #[test] + fn non_object_filter_rejected() { + let err = parse(json!("not a map")).unwrap_err(); + assert!(err.to_string().contains("filter must be a map/object")); + } + + #[test] + fn dollar_eq_value_can_be_null() { + // $ne against null is a valid query (returns docs where field + // exists and is not null OR is missing) — so null must be an + // accepted scalar. + let f = parse(json!({ "deleted_at": { "$ne": null } })).unwrap(); + assert_eq!(f.conditions[0].op, ComparisonOp::Ne); + assert_eq!(f.conditions[0].value, Value::Null); + } + + #[test] + fn implicit_equality_with_array_value_accepts() { + // `{ "tags": ["a", "b"] }` is implicit equality against the + // literal array shape. The Postgres query will compare the + // text encoding under JSONB; this is valid v1.1.2. + let f = parse(json!({ "tags": ["a", "b"] })).unwrap(); + assert_eq!(f.conditions[0].op, ComparisonOp::Eq); + } +} diff --git a/crates/manager-core/src/docs_repo.rs b/crates/manager-core/src/docs_repo.rs new file mode 100644 index 0000000..0ded31a --- /dev/null +++ b/crates/manager-core/src/docs_repo.rs @@ -0,0 +1,556 @@ +//! Low-level Postgres CRUD + filter-query builder over the `docs` +//! table (migration 0013). Stays storage-only; authorization, event +//! emission, and empty-collection validation live one layer up in +//! `DocsServiceImpl`. +//! +//! The `find` SQL builder is the security-critical surface. **Every +//! field-path segment and every comparison value is bound as a +//! `$N` parameter — never interpolated into the SQL string.** The base +//! `WHERE app_id = $1 AND collection = $2` clause is fixed and +//! prepended to every query so cross-app isolation can't be widened by +//! any operator. See `sql_starts_with_app_collection_predicate` +//! assertion in tests for the load-bearing guarantee. + +use async_trait::async_trait; +use base64::engine::general_purpose::URL_SAFE_NO_PAD; +use base64::Engine as _; +use chrono::{DateTime, Utc}; +use picloud_shared::{AppId, DocId, DocRow, DocsListPage}; +use serde_json::Value; +use sqlx::postgres::PgRow; +use sqlx::{PgPool, Postgres, QueryBuilder, Row}; +use uuid::Uuid; + +use crate::docs_filter::{ComparisonOp, DocsFilter, SortDir}; + +#[derive(Debug, thiserror::Error)] +pub enum DocsRepoError { + #[error("database error: {0}")] + Db(#[from] sqlx::Error), + + #[error("invalid pagination cursor")] + InvalidCursor, +} + +/// Repo surface. The trait is exposed so the service unit tests can +/// substitute an in-memory backing without spinning up Postgres. +#[async_trait] +pub trait DocsRepo: Send + Sync { + /// Create a new doc with a server-generated UUID. Returns the + /// fully-materialised `DocRow` so the caller has timestamps too + /// (no separate select-back round-trip). + async fn create( + &self, + app_id: AppId, + collection: &str, + data: Value, + ) -> Result; + + async fn get( + &self, + app_id: AppId, + collection: &str, + id: DocId, + ) -> Result, DocsRepoError>; + + /// Filter-based query. The parsed `DocsFilter` ensures every + /// field-path segment and operator value is bound as a parameter. + async fn find( + &self, + app_id: AppId, + collection: &str, + filter: &DocsFilter, + ) -> Result, DocsRepoError>; + + /// Full document replace. Returns `Some(previous_data)` on + /// success, `None` if no doc matched (the service maps that to + /// `DocsError::NotFound`). The prev value is the input to the + /// emitted update event's `old_payload`. + async fn update( + &self, + app_id: AppId, + collection: &str, + id: DocId, + data: Value, + ) -> Result, DocsRepoError>; + + /// Returns the deleted doc's data if it existed, `None` if no + /// such doc. The caller converts `Some` → `Ok(true)` for the SDK's + /// was-present return; the `Value` feeds the delete event's + /// `old_payload`. + async fn delete( + &self, + app_id: AppId, + collection: &str, + id: DocId, + ) -> Result, DocsRepoError>; + + async fn list( + &self, + app_id: AppId, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result; +} + +pub struct PostgresDocsRepo { + pool: PgPool, +} + +impl PostgresDocsRepo { + #[must_use] + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +/// Hard ceiling on `list` page size — mirrors KV's `KV_LIST_MAX_LIMIT`. +/// Scripts that pass anything larger get silently clamped. +const DOCS_LIST_MAX_LIMIT: u32 = 1_000; +const DOCS_LIST_DEFAULT_LIMIT: u32 = 100; + +#[async_trait] +impl DocsRepo for PostgresDocsRepo { + async fn create( + &self, + app_id: AppId, + collection: &str, + data: Value, + ) -> Result { + let id = Uuid::new_v4(); + let row: (DateTime, DateTime) = sqlx::query_as( + "INSERT INTO docs (app_id, collection, id, data) \ + VALUES ($1, $2, $3, $4) \ + RETURNING created_at, updated_at", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(id) + .bind(&data) + .fetch_one(&self.pool) + .await?; + Ok(DocRow { + id, + data, + created_at: row.0, + updated_at: row.1, + }) + } + + async fn get( + &self, + app_id: AppId, + collection: &str, + id: DocId, + ) -> Result, DocsRepoError> { + let row: Option<(Value, DateTime, DateTime)> = sqlx::query_as( + "SELECT data, created_at, updated_at FROM docs \ + WHERE app_id = $1 AND collection = $2 AND id = $3", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(id) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(|(data, created_at, updated_at)| DocRow { + id, + data, + created_at, + updated_at, + })) + } + + async fn find( + &self, + app_id: AppId, + collection: &str, + filter: &DocsFilter, + ) -> Result, DocsRepoError> { + let mut qb = build_find_query(app_id, collection, filter); + let rows = qb.build().fetch_all(&self.pool).await?; + rows.into_iter().map(row_to_doc).collect() + } + + async fn update( + &self, + app_id: AppId, + collection: &str, + id: DocId, + data: Value, + ) -> Result, DocsRepoError> { + // Same CTE shape as KV's set ([kv_repo.rs:101-132]): SELECT the + // previous data before the UPDATE so the service can emit + // `prev_data` in the update ServiceEvent. Single statement, no + // explicit transaction. Inherits KV's last-writer-wins race + // under concurrent writers; documented as a known limitation + // for v1.1.2. + let row: Option<(Option,)> = sqlx::query_as( + "WITH prev AS ( \ + SELECT data FROM docs \ + WHERE app_id = $1 AND collection = $2 AND id = $3 \ + ), \ + updated AS ( \ + UPDATE docs SET data = $4, updated_at = NOW() \ + WHERE app_id = $1 AND collection = $2 AND id = $3 \ + RETURNING 1 \ + ) \ + SELECT (SELECT data FROM prev) FROM updated", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(id) + .bind(&data) + .fetch_optional(&self.pool) + .await?; + // `row` is None when the UPDATE matched no rows (missing doc); + // Some((Some(prev),)) on success. `data` is JSONB NOT NULL so + // the inner Option is always Some when prev exists. + Ok(row.and_then(|(v,)| v)) + } + + async fn delete( + &self, + app_id: AppId, + collection: &str, + id: DocId, + ) -> Result, DocsRepoError> { + let row: Option<(Value,)> = sqlx::query_as( + "DELETE FROM docs \ + WHERE app_id = $1 AND collection = $2 AND id = $3 \ + RETURNING data", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(id) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(|(v,)| v)) + } + + async fn list( + &self, + app_id: AppId, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result { + let limit = if limit == 0 { + DOCS_LIST_DEFAULT_LIMIT + } else { + limit.min(DOCS_LIST_MAX_LIMIT) + }; + + let last_id = match cursor { + Some(c) => Some(decode_cursor(c)?), + None => None, + }; + + let take = i64::from(limit) + 1; + let rows: Vec<(Uuid, Value, DateTime, DateTime)> = sqlx::query_as( + "SELECT id, data, created_at, updated_at FROM docs \ + WHERE app_id = $1 AND collection = $2 \ + AND ($3::uuid IS NULL OR id > $3) \ + ORDER BY id ASC \ + LIMIT $4", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(last_id) + .bind(take) + .fetch_all(&self.pool) + .await?; + + let mut docs: Vec = rows + .into_iter() + .map(|(id, data, created_at, updated_at)| DocRow { + id, + data, + created_at, + updated_at, + }) + .collect(); + let next_cursor = if docs.len() > limit as usize { + docs.truncate(limit as usize); + docs.last().map(|d| encode_cursor(&d.id)) + } else { + None + }; + + Ok(DocsListPage { docs, next_cursor }) + } +} + +fn row_to_doc(row: PgRow) -> Result { + Ok(DocRow { + id: row.try_get("id")?, + data: row.try_get("data")?, + created_at: row.try_get("created_at")?, + updated_at: row.try_get("updated_at")?, + }) +} + +fn encode_cursor(last_id: &Uuid) -> String { + URL_SAFE_NO_PAD.encode(last_id.as_bytes()) +} + +fn decode_cursor(cursor: &str) -> Result { + let bytes = URL_SAFE_NO_PAD + .decode(cursor) + .map_err(|_| DocsRepoError::InvalidCursor)?; + let arr: [u8; 16] = bytes + .as_slice() + .try_into() + .map_err(|_| DocsRepoError::InvalidCursor)?; + Ok(Uuid::from_bytes(arr)) +} + +// ---------------------------------------------------------------------------- +// SQL builder — the load-bearing security surface. +// +// Every field-path segment + every comparison value goes through +// `QueryBuilder::push_bind`, which appends `$N` to the SQL string and +// binds the value as a parameter. The only literal strings appended to +// the SQL are: hardcoded SQL fragments (SELECT/WHERE/AND/etc.) and +// hardcoded operator strings ("=", "IS DISTINCT FROM", ">", "ASC", …). +// **No user input ever lands in the SQL text unparameterized.** +// ---------------------------------------------------------------------------- + +fn build_find_query<'a>( + app_id: AppId, + collection: &'a str, + filter: &'a DocsFilter, +) -> QueryBuilder<'a, Postgres> { + let mut qb = + QueryBuilder::new("SELECT id, data, created_at, updated_at FROM docs WHERE app_id = "); + qb.push_bind(app_id.into_inner()); + qb.push(" AND collection = "); + qb.push_bind(collection); + + for cond in &filter.conditions { + qb.push(" AND "); + emit_condition(&mut qb, cond); + } + + qb.push(" ORDER BY "); + if let Some(sort) = &filter.sort { + push_jsonb_path(&mut qb, sort.path.segments()); + qb.push(match sort.direction { + SortDir::Asc => " ASC", + SortDir::Desc => " DESC", + }); + qb.push(", id ASC"); + } else { + qb.push("id ASC"); + } + + let limit = filter + .limit + .map_or(DOCS_LIST_MAX_LIMIT, |l| l.min(DOCS_LIST_MAX_LIMIT)); + qb.push(" LIMIT "); + qb.push_bind(i64::from(limit)); + + qb +} + +fn emit_condition<'a>( + qb: &mut QueryBuilder<'a, Postgres>, + cond: &'a crate::docs_filter::FieldCondition, +) { + push_jsonb_path(qb, cond.path.segments()); + match cond.op { + ComparisonOp::Eq => { + if cond.value.is_null() { + qb.push(" IS NULL"); + } else { + qb.push(" = "); + qb.push_bind(value_to_text(&cond.value)); + } + } + ComparisonOp::Ne => { + // IS DISTINCT FROM correctly handles NULL on either side + // (would otherwise silently exclude rows with missing + // paths). Holds for the literal-NULL case too. + if cond.value.is_null() { + qb.push(" IS NOT NULL"); + } else { + qb.push(" IS DISTINCT FROM "); + qb.push_bind(value_to_text(&cond.value)); + } + } + ComparisonOp::Gt => { + qb.push(" > "); + qb.push_bind(value_to_text(&cond.value)); + } + ComparisonOp::Gte => { + qb.push(" >= "); + qb.push_bind(value_to_text(&cond.value)); + } + ComparisonOp::Lt => { + qb.push(" < "); + qb.push_bind(value_to_text(&cond.value)); + } + ComparisonOp::Lte => { + qb.push(" <= "); + qb.push_bind(value_to_text(&cond.value)); + } + ComparisonOp::In => { + qb.push(" = ANY("); + let texts: Vec> = cond + .value + .as_array() + .map(|arr| arr.iter().map(value_to_text).collect()) + .unwrap_or_default(); + qb.push_bind(texts); + qb.push(")"); + } + } +} + +/// Append `jsonb_extract_path_text(data, $N1, $N2, …)` with each +/// segment bound as a separate text parameter. Variadic path lengths +/// (1–5) all flow through this single helper. +fn push_jsonb_path<'a>(qb: &mut QueryBuilder<'a, Postgres>, segments: &'a [String]) { + qb.push("jsonb_extract_path_text(data"); + for seg in segments { + qb.push(", "); + qb.push_bind(seg.as_str()); + } + qb.push(")"); +} + +/// JSON scalar → TEXT for binding. `Value::Null` is preserved as +/// `None` so the binding lands as SQL NULL (handled specially above for +/// `Eq` / `Ne`). Arrays + objects serialize to compact JSON; the user +/// is comparing against the JSONB text rendering, which is consistent +/// with `jsonb_extract_path_text`'s output for those types. +fn value_to_text(v: &Value) -> Option { + match v { + Value::Null => None, + Value::String(s) => Some(s.clone()), + Value::Bool(b) => Some(b.to_string()), + Value::Number(n) => Some(n.to_string()), + Value::Array(_) | Value::Object(_) => Some(v.to_string()), + } +} + +// ---------------------------------------------------------------------------- +// SQL-shape guardrail tests — pure (no DB) so they run in the default +// test suite. These are the highest-stakes tests in the release: they +// pin the cross-app isolation invariant at the SQL level. +// ---------------------------------------------------------------------------- + +#[cfg(test)] +mod sql_shape_tests { + use super::*; + use crate::docs_filter::parse_filter; + use serde_json::json; + + fn sql_for(filter_json: serde_json::Value) -> String { + let filter = parse_filter(&filter_json).unwrap(); + let qb = build_find_query(AppId::new(), "users", &filter); + qb.sql().to_string() + } + + /// **Load-bearing**: every generated SELECT begins + /// `WHERE app_id = $1 AND collection = $2`. The app_id parameter + /// is the cross-app isolation gate. No user-supplied filter + /// fragment can ever appear before these clauses. + #[test] + fn every_query_starts_with_app_id_and_collection_predicate() { + let cases = vec![ + json!({}), + json!({ "tier": "gold" }), + json!({ "created_at": { "$gt": "2026-01-01" } }), + json!({ "tier": { "$in": ["gold", "platinum"] } }), + json!({ "tier": "gold", "status": "active" }), + json!({ "$sort": { "created_at": -1 }, "$limit": 5 }), + json!({ "tier": "gold", "$sort": { "created_at": 1 } }), + json!({ "deleted_at": { "$ne": null } }), + ]; + for case in cases { + let sql = sql_for(case.clone()); + assert!( + sql.starts_with( + "SELECT id, data, created_at, updated_at FROM docs WHERE app_id = $1 AND collection = $2" + ), + "filter {case} produced SQL: {sql}" + ); + } + } + + /// Every comparison value lands as a `$N` placeholder — there + /// should be NO double-quoted string literal in the SQL after the + /// fixed prefix. (This guards against an accidental `format!` + /// regression.) + #[test] + fn no_user_string_literal_in_sql() { + let sql = sql_for(json!({ "tier": "gold; DROP TABLE docs;--" })); + assert!(!sql.contains("gold"), "value leaked into SQL string: {sql}"); + assert!(!sql.contains("DROP"), "value leaked into SQL string: {sql}"); + } + + /// Field-path segments also bind as parameters. A user passing a + /// path that looks like SQL keywords doesn't change the structure. + #[test] + fn no_user_path_literal_in_sql() { + let sql = sql_for(json!({ "drop_table_users": "v" })); + assert!( + !sql.contains("drop_table_users"), + "path leaked into SQL string: {sql}" + ); + } + + #[test] + fn empty_filter_sql_has_no_extra_conditions() { + let sql = sql_for(json!({})); + // After the fixed prefix, only ORDER BY + LIMIT — no `AND`s. + let suffix = sql + .trim_start_matches( + "SELECT id, data, created_at, updated_at FROM docs WHERE app_id = $1 AND collection = $2", + ) + .trim(); + assert!( + suffix.starts_with("ORDER BY"), + "expected ORDER BY immediately after base WHERE; got: {suffix}" + ); + } + + #[test] + fn eq_with_null_emits_is_null() { + let sql = sql_for(json!({ "x": null })); + assert!(sql.contains("IS NULL"), "sql: {sql}"); + } + + #[test] + fn ne_with_null_emits_is_not_null() { + let sql = sql_for(json!({ "x": { "$ne": null } })); + assert!(sql.contains("IS NOT NULL"), "sql: {sql}"); + } + + #[test] + fn ne_with_value_uses_is_distinct_from() { + // IS DISTINCT FROM, NOT <> — see ComparisonOp::Ne comment. + let sql = sql_for(json!({ "x": { "$ne": "v" } })); + assert!(sql.contains("IS DISTINCT FROM"), "sql: {sql}"); + assert!(!sql.contains(" <> "), "sql: {sql}"); + } + + #[test] + fn in_emits_any_array() { + let sql = sql_for(json!({ "x": { "$in": ["a", "b"] } })); + assert!(sql.contains("= ANY"), "sql: {sql}"); + } + + #[test] + fn sort_appends_tiebreaker_id_asc() { + let sql = sql_for(json!({ "$sort": { "created_at": -1 } })); + assert!(sql.contains("DESC, id ASC"), "sql: {sql}"); + } + + #[test] + fn jsonb_extract_path_used_for_field_access() { + let sql = sql_for(json!({ "user.email": "a@b" })); + assert!(sql.contains("jsonb_extract_path_text(data"), "sql: {sql}"); + } +} diff --git a/crates/manager-core/src/docs_service.rs b/crates/manager-core/src/docs_service.rs new file mode 100644 index 0000000..7c58351 --- /dev/null +++ b/crates/manager-core/src/docs_service.rs @@ -0,0 +1,890 @@ +//! `DocsServiceImpl` — wires the `DocsRepo` underneath the +//! `picloud_shared::DocsService` trait that scripts see via the Rhai +//! bridge. +//! +//! Layers added here (vs the raw repo): +//! +//! 1. Empty-collection rejection at the SDK boundary +//! (`docs/sdk-shape.md`). +//! 2. `data` must be a JSON object for create + update. (The repo +//! accepts anything serde_json can serialise; the SDK contract +//! pins documents to map shape so dotted-path queries make sense.) +//! 3. **Script-as-gate authz**: when `cx.principal.is_some()` we run +//! `authz::require(...)`; when it's `None` (public unauthenticated +//! HTTP — the common case for public routes) we skip the check. +//! Cross-app isolation isn't affected — every query is keyed by +//! `cx.app_id`, never an argument. +//! 4. Query DSL parse — `find`/`find_one` parse the opaque filter +//! into `DocsFilter` before passing it down. Parse errors map to +//! `DocsError::InvalidFilter` / `UnsupportedOperator` with the +//! parser's message verbatim (script-visible). +//! 5. `ServiceEvent` emission after each mutation (`create` / `update` +//! / `delete`). The outbox emitter (when wired) turns these into +//! docs-trigger fan-out via `OutboxEventEmitter::emit_docs`. + +use std::sync::Arc; + +use async_trait::async_trait; +use picloud_shared::{ + DocId, DocRow, DocsError, DocsListPage, DocsService, SdkCallCx, ServiceEvent, + ServiceEventEmitter, +}; + +use crate::authz::{self, AuthzRepo, Capability}; +use crate::docs_filter::{parse_filter, FilterParseError}; +use crate::docs_repo::{DocsRepo, DocsRepoError}; + +pub struct DocsServiceImpl { + repo: Arc, + authz: Arc, + events: Arc, +} + +impl DocsServiceImpl { + #[must_use] + pub fn new( + repo: Arc, + authz: Arc, + events: Arc, + ) -> Self { + Self { + repo, + authz, + events, + } + } + + async fn check_read(&self, cx: &SdkCallCx) -> Result<(), DocsError> { + if let Some(ref principal) = cx.principal { + authz::require(&*self.authz, principal, Capability::AppDocsRead(cx.app_id)) + .await + .map_err(|_| DocsError::Forbidden)?; + } + Ok(()) + } + + async fn check_write(&self, cx: &SdkCallCx) -> Result<(), DocsError> { + if let Some(ref principal) = cx.principal { + authz::require(&*self.authz, principal, Capability::AppDocsWrite(cx.app_id)) + .await + .map_err(|_| DocsError::Forbidden)?; + } + Ok(()) + } +} + +fn validate_collection(collection: &str) -> Result<(), DocsError> { + if collection.is_empty() { + return Err(DocsError::InvalidCollection); + } + Ok(()) +} + +fn validate_data(data: &serde_json::Value) -> Result<(), DocsError> { + if !data.is_object() { + return Err(DocsError::InvalidData); + } + Ok(()) +} + +impl From for DocsError { + fn from(e: DocsRepoError) -> Self { + Self::Backend(e.to_string()) + } +} + +impl From for DocsError { + fn from(e: FilterParseError) -> Self { + match e { + FilterParseError::InvalidFilter(s) => Self::InvalidFilter(s), + FilterParseError::UnsupportedOperator(s) => Self::UnsupportedOperator(s), + } + } +} + +#[async_trait] +impl DocsService for DocsServiceImpl { + async fn create( + &self, + cx: &SdkCallCx, + collection: &str, + data: serde_json::Value, + ) -> Result { + validate_collection(collection)?; + validate_data(&data)?; + self.check_write(cx).await?; + let row = self + .repo + .create(cx.app_id, collection, data.clone()) + .await?; + // Best-effort emit — a failed emit logs but does not roll back + // the write (mirrors KV's pattern). + if let Err(e) = self + .events + .emit( + cx, + ServiceEvent { + source: "docs", + op: "create", + collection: Some(collection.to_string()), + key: Some(row.id.to_string()), + payload: Some(data), + old_payload: None, + }, + ) + .await + { + tracing::warn!(error = %e, source = "docs", op = "create", "event emit failed"); + } + Ok(row.id) + } + + async fn get( + &self, + cx: &SdkCallCx, + collection: &str, + id: DocId, + ) -> Result, DocsError> { + validate_collection(collection)?; + self.check_read(cx).await?; + Ok(self.repo.get(cx.app_id, collection, id).await?) + } + + async fn find( + &self, + cx: &SdkCallCx, + collection: &str, + filter: serde_json::Value, + ) -> Result, DocsError> { + validate_collection(collection)?; + self.check_read(cx).await?; + let parsed = parse_filter(&filter)?; + Ok(self.repo.find(cx.app_id, collection, &parsed).await?) + } + + async fn find_one( + &self, + cx: &SdkCallCx, + collection: &str, + filter: serde_json::Value, + ) -> Result, DocsError> { + validate_collection(collection)?; + self.check_read(cx).await?; + let mut parsed = parse_filter(&filter)?; + // Inject the implicit `LIMIT 1` for find_one — explicit + // caller-supplied `$limit` wins. + if parsed.limit.is_none() { + parsed.limit = Some(1); + } + let rows = self.repo.find(cx.app_id, collection, &parsed).await?; + Ok(rows.into_iter().next()) + } + + async fn update( + &self, + cx: &SdkCallCx, + collection: &str, + id: DocId, + data: serde_json::Value, + ) -> Result<(), DocsError> { + validate_collection(collection)?; + validate_data(&data)?; + self.check_write(cx).await?; + let previous = self + .repo + .update(cx.app_id, collection, id, data.clone()) + .await?; + match previous { + Some(prev) => { + if let Err(e) = self + .events + .emit( + cx, + ServiceEvent { + source: "docs", + op: "update", + collection: Some(collection.to_string()), + key: Some(id.to_string()), + payload: Some(data), + old_payload: Some(prev), + }, + ) + .await + { + tracing::warn!(error = %e, source = "docs", op = "update", "event emit failed"); + } + Ok(()) + } + None => Err(DocsError::NotFound), + } + } + + async fn delete(&self, cx: &SdkCallCx, collection: &str, id: DocId) -> Result { + validate_collection(collection)?; + self.check_write(cx).await?; + let previous = self.repo.delete(cx.app_id, collection, id).await?; + let was_present = previous.is_some(); + if let Some(prev) = previous { + if let Err(e) = self + .events + .emit( + cx, + ServiceEvent { + source: "docs", + op: "delete", + collection: Some(collection.to_string()), + key: Some(id.to_string()), + payload: None, + old_payload: Some(prev), + }, + ) + .await + { + tracing::warn!(error = %e, source = "docs", op = "delete", "event emit failed"); + } + } + Ok(was_present) + } + + async fn list( + &self, + cx: &SdkCallCx, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result { + validate_collection(collection)?; + self.check_read(cx).await?; + Ok(self.repo.list(cx.app_id, collection, cursor, limit).await?) + } +} + +// ---------------------------------------------------------------------------- +// Tests — in-memory DocsRepo so unit tests don't need Postgres. +// ---------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::authz::{AuthzError, AuthzRepo}; + use crate::docs_filter::DocsFilter; + use async_trait::async_trait; + use chrono::Utc; + use picloud_shared::{ + AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, NoopEventEmitter, Principal, + RequestId, UserId, + }; + use serde_json::json; + use std::collections::BTreeMap; + use std::sync::Arc; + use tokio::sync::Mutex; + use uuid::Uuid; + + /// In-memory backing: BTreeMap keyed by `(app_id, collection, id)` + /// so iteration is naturally ordered for stable cursor pagination + /// (matches the Postgres `ORDER BY id ASC`). + #[derive(Default)] + struct InMemoryDocsRepo { + data: Mutex>, + } + + #[async_trait] + impl DocsRepo for InMemoryDocsRepo { + async fn create( + &self, + app_id: AppId, + collection: &str, + data: serde_json::Value, + ) -> Result { + let id = Uuid::new_v4(); + let now = Utc::now(); + let row = DocRow { + id, + data, + created_at: now, + updated_at: now, + }; + self.data + .lock() + .await + .insert((app_id, collection.to_string(), id), row.clone()); + Ok(row) + } + + async fn get( + &self, + app_id: AppId, + collection: &str, + id: DocId, + ) -> Result, DocsRepoError> { + Ok(self + .data + .lock() + .await + .get(&(app_id, collection.to_string(), id)) + .cloned()) + } + + async fn find( + &self, + app_id: AppId, + collection: &str, + filter: &DocsFilter, + ) -> Result, DocsRepoError> { + let map = self.data.lock().await; + let mut out: Vec = map + .iter() + .filter(|((a, c, _), _)| *a == app_id && c == collection) + .map(|(_, v)| v.clone()) + .filter(|row| in_memory_matches(row, filter)) + .collect(); + if let Some(sort) = &filter.sort { + let path = sort.path.segments().to_vec(); + let dir = sort.direction; + out.sort_by(|a, b| { + let av = extract_path_str(&a.data, &path); + let bv = extract_path_str(&b.data, &path); + let ord = av.cmp(&bv); + match dir { + crate::docs_filter::SortDir::Asc => ord, + crate::docs_filter::SortDir::Desc => ord.reverse(), + } + }); + } else { + out.sort_by_key(|d| d.id); + } + if let Some(limit) = filter.limit { + out.truncate(limit as usize); + } + Ok(out) + } + + async fn update( + &self, + app_id: AppId, + collection: &str, + id: DocId, + data: serde_json::Value, + ) -> Result, DocsRepoError> { + let mut map = self.data.lock().await; + let key = (app_id, collection.to_string(), id); + let Some(existing) = map.get_mut(&key) else { + return Ok(None); + }; + let prev = std::mem::replace(&mut existing.data, data); + existing.updated_at = Utc::now(); + Ok(Some(prev)) + } + + async fn delete( + &self, + app_id: AppId, + collection: &str, + id: DocId, + ) -> Result, DocsRepoError> { + Ok(self + .data + .lock() + .await + .remove(&(app_id, collection.to_string(), id)) + .map(|row| row.data)) + } + + async fn list( + &self, + app_id: AppId, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result { + let map = self.data.lock().await; + let last_id = cursor + .map(|c| Uuid::parse_str(c).map_err(|_| DocsRepoError::InvalidCursor)) + .transpose()?; + let mut docs: Vec = map + .iter() + .filter(|((a, c, _), _)| *a == app_id && c == collection) + .map(|(_, v)| v.clone()) + .filter(|d| last_id.is_none_or(|lid| d.id > lid)) + .collect(); + docs.sort_by_key(|d| d.id); + let take = if limit == 0 { + usize::MAX + } else { + limit as usize + }; + let next_cursor = if docs.len() > take { + docs.truncate(take); + docs.last().map(|d| d.id.to_string()) + } else { + None + }; + Ok(DocsListPage { docs, next_cursor }) + } + } + + /// Best-effort in-memory filter eval mirroring the Postgres + /// semantics: extract each field path as a text-form string, then + /// apply the operator. Good enough for the unit tests; production + /// always goes through the Postgres impl. + fn in_memory_matches(row: &DocRow, filter: &DocsFilter) -> bool { + for cond in &filter.conditions { + let actual = extract_path_str(&row.data, cond.path.segments()); + if !cond_matches(actual.as_ref(), cond) { + return false; + } + } + true + } + + fn cond_matches(actual: Option<&String>, cond: &crate::docs_filter::FieldCondition) -> bool { + use crate::docs_filter::ComparisonOp::*; + let actual: Option<&str> = actual.map(String::as_str); + let want = json_text(&cond.value); + let want_ref: Option<&str> = want.as_deref(); + match cond.op { + Eq => actual == want_ref, + Ne => actual != want_ref, + Gt => actual.zip(want_ref).is_some_and(|(a, b)| a > b), + Gte => actual.zip(want_ref).is_some_and(|(a, b)| a >= b), + Lt => actual.zip(want_ref).is_some_and(|(a, b)| a < b), + Lte => actual.zip(want_ref).is_some_and(|(a, b)| a <= b), + In => { + let Some(arr) = cond.value.as_array() else { + return false; + }; + arr.iter() + .any(|v| actual == json_text(v).as_deref()) + } + } + } + + fn extract_path_str(value: &serde_json::Value, segments: &[String]) -> Option { + let mut cur = value; + for seg in segments { + cur = cur.as_object()?.get(seg)?; + } + json_text(cur) + } + + fn json_text(v: &serde_json::Value) -> Option { + match v { + serde_json::Value::Null => None, + serde_json::Value::String(s) => Some(s.clone()), + serde_json::Value::Bool(b) => Some(b.to_string()), + serde_json::Value::Number(n) => Some(n.to_string()), + serde_json::Value::Array(_) | serde_json::Value::Object(_) => Some(v.to_string()), + } + } + + #[derive(Default)] + struct DenyingAuthzRepo; + + #[async_trait] + impl AuthzRepo for DenyingAuthzRepo { + async fn membership( + &self, + _user_id: UserId, + _app_id: AppId, + ) -> Result, AuthzError> { + Ok(None) + } + } + + #[derive(Default)] + struct AllowingAuthzRepo; + + #[async_trait] + impl AuthzRepo for AllowingAuthzRepo { + async fn membership( + &self, + _user_id: UserId, + _app_id: AppId, + ) -> Result, AuthzError> { + Ok(Some(AppRole::Editor)) + } + } + + fn anon_cx(app_id: AppId) -> SdkCallCx { + SdkCallCx { + app_id, + principal: None, + execution_id: ExecutionId::new(), + request_id: RequestId::new(), + trigger_depth: 0, + root_execution_id: ExecutionId::new(), + is_dead_letter_handler: false, + event: None, + } + } + + fn owner_cx(app_id: AppId) -> SdkCallCx { + SdkCallCx { + app_id, + principal: Some(Principal { + user_id: AdminUserId::new(), + instance_role: InstanceRole::Owner, + scopes: None, + app_binding: None, + }), + execution_id: ExecutionId::new(), + request_id: RequestId::new(), + trigger_depth: 0, + root_execution_id: ExecutionId::new(), + is_dead_letter_handler: false, + event: None, + } + } + + fn member_no_role_cx(app_id: AppId) -> SdkCallCx { + SdkCallCx { + app_id, + principal: Some(Principal { + user_id: AdminUserId::new(), + instance_role: InstanceRole::Member, + scopes: None, + app_binding: None, + }), + execution_id: ExecutionId::new(), + request_id: RequestId::new(), + trigger_depth: 0, + root_execution_id: ExecutionId::new(), + is_dead_letter_handler: false, + event: None, + } + } + + fn svc() -> DocsServiceImpl { + DocsServiceImpl::new( + Arc::new(InMemoryDocsRepo::default()), + Arc::new(DenyingAuthzRepo), + Arc::new(NoopEventEmitter), + ) + } + + fn svc_allowing() -> DocsServiceImpl { + DocsServiceImpl::new( + Arc::new(InMemoryDocsRepo::default()), + Arc::new(AllowingAuthzRepo), + Arc::new(NoopEventEmitter), + ) + } + + #[tokio::test] + async fn create_then_get_round_trips() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let id = s + .create(&cx, "users", json!({ "name": "Alice" })) + .await + .unwrap(); + let row = s.get(&cx, "users", id).await.unwrap().unwrap(); + assert_eq!(row.id, id); + assert_eq!(row.data, json!({ "name": "Alice" })); + } + + #[tokio::test] + async fn get_missing_returns_none() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let v = s.get(&cx, "users", Uuid::new_v4()).await.unwrap(); + assert!(v.is_none()); + } + + #[tokio::test] + async fn update_missing_returns_not_found() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let err = s + .update(&cx, "users", Uuid::new_v4(), json!({ "x": 1 })) + .await + .unwrap_err(); + assert!(matches!(err, DocsError::NotFound)); + } + + #[tokio::test] + async fn delete_missing_returns_false() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let was_present = s.delete(&cx, "users", Uuid::new_v4()).await.unwrap(); + assert!(!was_present); + } + + #[tokio::test] + async fn delete_present_returns_true() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let id = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap(); + let was_present = s.delete(&cx, "users", id).await.unwrap(); + assert!(was_present); + } + + #[tokio::test] + async fn update_present_succeeds() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let id = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap(); + s.update(&cx, "users", id, json!({ "x": 2 })).await.unwrap(); + let row = s.get(&cx, "users", id).await.unwrap().unwrap(); + assert_eq!(row.data, json!({ "x": 2 })); + } + + #[tokio::test] + async fn empty_collection_rejected() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let err = s.create(&cx, "", json!({})).await.unwrap_err(); + assert!(matches!(err, DocsError::InvalidCollection)); + } + + #[tokio::test] + async fn create_with_non_object_data_rejected() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let err = s.create(&cx, "users", json!(42)).await.unwrap_err(); + assert!(matches!(err, DocsError::InvalidData)); + } + + #[tokio::test] + async fn update_with_non_object_data_rejected() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let id = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap(); + let err = s + .update(&cx, "users", id, json!("not an object")) + .await + .unwrap_err(); + assert!(matches!(err, DocsError::InvalidData)); + } + + /// Load-bearing: a script with `cx.app_id = A` must NOT see + /// documents created under `cx.app_id = B`. Cross-app isolation + /// boundary; tested through both `get` and `find` because each + /// path could conceivably leak independently. + #[tokio::test] + async fn cross_app_isolation_via_cx_app_id() { + let s = svc(); + let app_a = AppId::new(); + let app_b = AppId::new(); + let cx_a = anon_cx(app_a); + let cx_b = anon_cx(app_b); + + let id_a = s + .create(&cx_a, "shared", json!({ "from": "a" })) + .await + .unwrap(); + let id_b = s + .create(&cx_b, "shared", json!({ "from": "b" })) + .await + .unwrap(); + assert_ne!(id_a, id_b); + + // Each app sees only its own doc via get. + assert!(s.get(&cx_a, "shared", id_b).await.unwrap().is_none()); + assert!(s.get(&cx_b, "shared", id_a).await.unwrap().is_none()); + + // And via find. + let from_a = s.find(&cx_a, "shared", json!({})).await.unwrap(); + assert_eq!(from_a.len(), 1); + assert_eq!(from_a[0].id, id_a); + + let from_b = s.find(&cx_b, "shared", json!({})).await.unwrap(); + assert_eq!(from_b.len(), 1); + assert_eq!(from_b[0].id, id_b); + } + + #[tokio::test] + async fn anonymous_cx_skips_authz() { + // Denying authz repo + anon cx (no principal) ⇒ writes still + // succeed under script-as-gate. + let s = svc(); + let cx = anon_cx(AppId::new()); + let id = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap(); + let _ = s.delete(&cx, "users", id).await.unwrap(); + } + + #[tokio::test] + async fn authed_cx_with_no_role_is_forbidden_on_write() { + let s = svc(); + let cx = member_no_role_cx(AppId::new()); + let err = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap_err(); + assert!(matches!(err, DocsError::Forbidden)); + } + + #[tokio::test] + async fn authed_cx_with_no_role_is_forbidden_on_read() { + let s = svc(); + let cx = member_no_role_cx(AppId::new()); + let err = s.get(&cx, "users", Uuid::new_v4()).await.unwrap_err(); + assert!(matches!(err, DocsError::Forbidden)); + } + + #[tokio::test] + async fn owner_principal_can_write() { + let s = svc(); + let cx = owner_cx(AppId::new()); + let _ = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap(); + } + + #[tokio::test] + async fn editor_member_can_write_via_role() { + // AllowingAuthzRepo grants Editor — should be able to write + // (AppDocsWrite is in_editor in role_satisfies). + let s = svc_allowing(); + let cx = member_no_role_cx(AppId::new()); + let _ = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap(); + } + + #[tokio::test] + async fn find_with_equality_returns_matches() { + let s = svc(); + let cx = anon_cx(AppId::new()); + s.create(&cx, "users", json!({ "tier": "gold" })) + .await + .unwrap(); + s.create(&cx, "users", json!({ "tier": "silver" })) + .await + .unwrap(); + s.create(&cx, "users", json!({ "tier": "gold" })) + .await + .unwrap(); + + let golds = s + .find(&cx, "users", json!({ "tier": "gold" })) + .await + .unwrap(); + assert_eq!(golds.len(), 2); + } + + #[tokio::test] + async fn find_one_returns_first_or_none() { + let s = svc(); + let cx = anon_cx(AppId::new()); + s.create(&cx, "users", json!({ "tier": "gold" })) + .await + .unwrap(); + + let hit = s + .find_one(&cx, "users", json!({ "tier": "gold" })) + .await + .unwrap(); + assert!(hit.is_some()); + + let miss = s + .find_one(&cx, "users", json!({ "tier": "platinum" })) + .await + .unwrap(); + assert!(miss.is_none()); + } + + #[tokio::test] + async fn find_with_unsupported_operator_throws() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let err = s + .find(&cx, "users", json!({ "name": { "$regex": "^A" } })) + .await + .unwrap_err(); + match err { + DocsError::UnsupportedOperator(m) => { + assert!(m.contains("$regex")); + assert!(m.contains("v1.2")); + } + other => panic!("expected UnsupportedOperator, got {other:?}"), + } + } + + #[tokio::test] + async fn find_with_invalid_filter_throws() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let err = s + .find(&cx, "users", json!({ "a.b.c.d.e.f": "x" })) + .await + .unwrap_err(); + assert!(matches!(err, DocsError::InvalidFilter(_))); + } + + #[tokio::test] + async fn find_with_dollar_in_returns_subset() { + let s = svc(); + let cx = anon_cx(AppId::new()); + s.create(&cx, "users", json!({ "tier": "gold" })) + .await + .unwrap(); + s.create(&cx, "users", json!({ "tier": "silver" })) + .await + .unwrap(); + s.create(&cx, "users", json!({ "tier": "platinum" })) + .await + .unwrap(); + + let hits = s + .find( + &cx, + "users", + json!({ "tier": { "$in": ["gold", "platinum"] } }), + ) + .await + .unwrap(); + assert_eq!(hits.len(), 2); + } + + #[tokio::test] + async fn find_one_explicit_limit_is_honoured() { + // The service injects limit=1 ONLY when caller didn't set + // $limit. An explicit `$limit: 5` survives — and find_one + // still returns the first. + let s = svc(); + let cx = anon_cx(AppId::new()); + for _ in 0..3 { + s.create(&cx, "users", json!({ "tier": "gold" })) + .await + .unwrap(); + } + let hit = s + .find_one(&cx, "users", json!({ "tier": "gold", "$limit": 5 })) + .await + .unwrap(); + assert!(hit.is_some()); + } + + #[tokio::test] + async fn list_cursor_pagination() { + let s = svc(); + let cx = anon_cx(AppId::new()); + let mut ids = Vec::new(); + for _ in 0..5 { + ids.push(s.create(&cx, "users", json!({})).await.unwrap()); + } + ids.sort(); + + let p1 = s.list(&cx, "users", None, 2).await.unwrap(); + assert_eq!(p1.docs.len(), 2); + assert!(p1.next_cursor.is_some()); + + let p2 = s + .list(&cx, "users", p1.next_cursor.as_deref(), 2) + .await + .unwrap(); + assert_eq!(p2.docs.len(), 2); + + let p3 = s + .list(&cx, "users", p2.next_cursor.as_deref(), 2) + .await + .unwrap(); + assert_eq!(p3.docs.len(), 1); + assert!(p3.next_cursor.is_none()); + } + + #[tokio::test] + async fn noop_emitter_does_not_block_mutations() { + // Pins v1.1.0 contract: services hold an Arc + // and call emit().await unconditionally. The noop drops it. + let s = svc(); + let cx = anon_cx(AppId::new()); + let id = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap(); + s.update(&cx, "users", id, json!({ "x": 2 })).await.unwrap(); + let _ = s.delete(&cx, "users", id).await.unwrap(); + } +} diff --git a/crates/manager-core/src/lib.rs b/crates/manager-core/src/lib.rs index 6ff6869..4a7c3bc 100644 --- a/crates/manager-core/src/lib.rs +++ b/crates/manager-core/src/lib.rs @@ -26,6 +26,9 @@ pub mod dead_letter_repo; pub mod dead_letter_service; pub mod dead_letters_api; pub mod dispatcher; +pub mod docs_filter; +pub mod docs_repo; +pub mod docs_service; pub mod gc; pub mod kv_repo; pub mod kv_service; @@ -86,6 +89,8 @@ pub use dead_letter_repo::{ pub use dead_letter_service::PostgresDeadLetterService; pub use dead_letters_api::{dead_letters_router, DeadLettersApiError, DeadLettersState}; pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError}; +pub use docs_repo::{DocsRepo, DocsRepoError, PostgresDocsRepo}; +pub use docs_service::DocsServiceImpl; pub use gc::{spawn_abandoned_gc, spawn_dead_letter_gc}; pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo}; pub use kv_service::KvServiceImpl; @@ -104,8 +109,8 @@ pub use route_repo::{NewRoute, PostgresRouteRepository, RouteRepository}; pub use sandbox::{CeilingError, SandboxCeiling}; pub use trigger_config::{BackoffShape, TriggerConfig}; pub use trigger_repo::{ - collection_matches, CreateDeadLetterTrigger, CreateKvTrigger, DeadLetterTriggerMatch, - KvTriggerMatch, PostgresTriggerRepo, Trigger, TriggerDetails, TriggerDispatchMode, TriggerKind, - TriggerRepo, TriggerRepoError, + collection_matches, CreateDeadLetterTrigger, CreateDocsTrigger, CreateKvTrigger, + DeadLetterTriggerMatch, DocsTriggerMatch, KvTriggerMatch, PostgresTriggerRepo, Trigger, + TriggerDetails, TriggerDispatchMode, TriggerKind, TriggerRepo, TriggerRepoError, }; pub use triggers_api::{triggers_router, TriggersApiError, TriggersState};