feat(v1.1.2-docs): manager-core docs service + repo + query DSL parser

DocsServiceImpl mirrors KvServiceImpl's script-as-gate authz pattern,
the empty-collection rejection, and the best-effort emitter call —
adding "data must be a JSON object" validation, NotFound on update of
a missing doc, and prev_data plumbing via repo.update returning the
prior data.

PostgresDocsRepo handles CRUD against the docs table. The find path
runs through the v1.1.2 query DSL parser (docs_filter::parse_filter)
before building parameterised SQL via sqlx::QueryBuilder:

  * Every field-path segment + comparison value is bound as $N.
  * jsonb_extract_path_text(data, $N1, $N2, ...) handles variable
    depth without segment interpolation.
  * Base WHERE is fixed: WHERE app_id = $1 AND collection = $2.
    Filter conditions can only narrow, never widen. Load-bearing
    test in sql_shape_tests pins this prefix on every emitted query
    + asserts no user string ever lands in the SQL text.
  * $ne uses IS DISTINCT FROM (not <>) so missing paths + JSON nulls
    are correctly included.
  * $in binds the value list as TEXT[] via = ANY($N::text[]).
  * $sort always appends a ", id ASC" tiebreaker for stable cursor
    pagination semantics; $limit is clamped to MAX_FIND_LIMIT.

docs_filter is the AST + parser for the DSL. Operator allowlist is
explicit; any non-v1.1.2 operator throws UnsupportedOperator with a
v1.2 pointer. Snapshot tests pin the SDK-contract error strings so
changing them is a deliberate act.

Two new Capability variants — AppDocsRead and AppDocsWrite — map to
the existing Scope::ScriptRead and ScriptWrite per the seven-scope
commitment from v1.1.0. role_satisfies grants read at Viewer,
write at Editor (same trust shape as KV).

59 unit tests added across the three new files. All pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-06-02 19:55:14 +02:00
parent 3af8cc38c9
commit 06678f4496
5 changed files with 2071 additions and 6 deletions

View File

@@ -64,6 +64,14 @@ pub enum Capability {
/// Write entries to this app's KV store (v1.1.1). Granted to /// Write entries to this app's KV store (v1.1.1). Granted to
/// `editor`+. Maps to `script:write` on API keys. /// `editor`+. Maps to `script:write` on API keys.
AppKvWrite(AppId), AppKvWrite(AppId),
/// Read documents from this app's docs store (v1.1.2). Same trust
/// shape as KV read — granted to `viewer`+, maps to `script:read`
/// on API keys. Honors the seven-scope commitment.
AppDocsRead(AppId),
/// Write documents to this app's docs store (v1.1.2). Same trust
/// shape as KV write — granted to `editor`+, maps to
/// `script:write` on API keys.
AppDocsWrite(AppId),
/// Create / list / delete triggers for this app (v1.1.1). Maps to /// Create / list / delete triggers for this app (v1.1.1). Maps to
/// `app:admin` on API keys — triggers are app-configuration acts /// `app:admin` on API keys — triggers are app-configuration acts
/// rather than data-plane access. Granted to `app_admin`+. /// rather than data-plane access. Granted to `app_admin`+.
@@ -91,6 +99,8 @@ impl Capability {
| Self::AppLogRead(id) | Self::AppLogRead(id)
| Self::AppKvRead(id) | Self::AppKvRead(id)
| Self::AppKvWrite(id) | Self::AppKvWrite(id)
| Self::AppDocsRead(id)
| Self::AppDocsWrite(id)
| Self::AppManageTriggers(id) | Self::AppManageTriggers(id)
| Self::AppDeadLetterManage(id) => Some(id), | Self::AppDeadLetterManage(id) => Some(id),
} }
@@ -107,8 +117,10 @@ impl Capability {
Self::InstanceCreateApp | Self::InstanceManageUsers | Self::InstanceManageSettings => { Self::InstanceCreateApp | Self::InstanceManageUsers | Self::InstanceManageSettings => {
Scope::InstanceAdmin Scope::InstanceAdmin
} }
Self::AppRead(_) | Self::AppKvRead(_) => Scope::ScriptRead, Self::AppRead(_) | Self::AppKvRead(_) | Self::AppDocsRead(_) => Scope::ScriptRead,
Self::AppWriteScript(_) | Self::AppKvWrite(_) => Scope::ScriptWrite, Self::AppWriteScript(_) | Self::AppKvWrite(_) | Self::AppDocsWrite(_) => {
Scope::ScriptWrite
}
Self::AppWriteRoute(_) => Scope::RouteWrite, Self::AppWriteRoute(_) => Scope::RouteWrite,
Self::AppManageDomains(_) => Scope::DomainManage, Self::AppManageDomains(_) => Scope::DomainManage,
Self::AppAdmin(_) | Self::AppManageTriggers(_) | Self::AppDeadLetterManage(_) => { Self::AppAdmin(_) | Self::AppManageTriggers(_) | Self::AppDeadLetterManage(_) => {
@@ -253,7 +265,10 @@ async fn member_grants(
const fn role_satisfies(role: AppRole, cap: Capability) -> bool { const fn role_satisfies(role: AppRole, cap: Capability) -> bool {
let in_viewer = matches!( let in_viewer = matches!(
cap, cap,
Capability::AppRead(_) | Capability::AppLogRead(_) | Capability::AppKvRead(_) Capability::AppRead(_)
| Capability::AppLogRead(_)
| Capability::AppKvRead(_)
| Capability::AppDocsRead(_)
); );
let in_editor = in_viewer let in_editor = in_viewer
|| matches!( || matches!(
@@ -261,6 +276,7 @@ const fn role_satisfies(role: AppRole, cap: Capability) -> bool {
Capability::AppWriteScript(_) Capability::AppWriteScript(_)
| Capability::AppWriteRoute(_) | Capability::AppWriteRoute(_)
| Capability::AppKvWrite(_) | Capability::AppKvWrite(_)
| Capability::AppDocsWrite(_)
); );
let in_app_admin = in_editor let in_app_admin = in_editor
|| matches!( || matches!(

View File

@@ -0,0 +1,598 @@
//! v1.1.2 query DSL parser + AST for `docs::find` / `docs::find_one`.
//!
//! Sets the precedent v1.2's `dead_letters::list` will follow (see
//! `docs/v1.1.x-design-notes.md` §4 #13). When that lands we promote
//! this module to `picloud-shared` and rename to
//! `picloud_shared::query::{Filter, FieldPath, ComparisonOp}`; until
//! then keeping it private to manager-core avoids over-engineering.
//!
//! Parse stage is deliberately strict: any unrecognized `$xxx`
//! operator surfaces as `FilterParseError::UnsupportedOperator` with
//! a script-visible message naming the offending key + pointing at
//! v1.2. The error strings become part of the SDK contract once
//! scripts depend on them; pin them with snapshot tests in the test
//! module below before changing.
//!
//! ## DSL surface (v1.1.2 subset)
//!
//! ```rhai
//! // implicit equality (top-level)
//! users.find(#{ tier: "gold", status: "active" })
//!
//! // operator object on a field
//! users.find(#{ created_at: #{ "$gt": "2026-01-01T00:00:00Z" } })
//!
//! // dotted paths (max 5 segments)
//! users.find(#{ "user.email": "a@b" })
//!
//! // sort + limit as filter modifiers
//! users.find(#{ tier: "gold", "$sort": #{ created_at: -1 }, "$limit": 10 })
//! ```
//!
//! ## Out of scope (v1.2)
//!
//! `$or`, `$and`, `$not`, `$exists`, `$regex`, `$type`, `$size`,
//! `$all`, `$elemMatch`, multi-field sort, projection, aggregations.
use serde_json::Value;
/// Maximum nesting depth for dotted field paths. `"a.b.c.d.e"` is the
/// deepest path allowed (5 segments). Deeper paths reject at parse
/// time with `InvalidFilter` — prevents pathological JSONB navigation
/// chains from a script.
pub const MAX_FIELD_PATH_DEPTH: usize = 5;
/// Hard cap on `$limit` values — script-side limits are silently
/// clamped here so the Postgres query is always bounded. Mirrors the
/// `find` repo's own internal cap.
pub const MAX_FIND_LIMIT: u32 = 1_000;
/// Parsed `docs::find` filter.
#[derive(Debug, Clone, PartialEq)]
pub struct DocsFilter {
pub conditions: Vec<FieldCondition>,
pub sort: Option<Sort>,
pub limit: Option<u32>,
}
impl DocsFilter {
/// Empty filter — matches every document in the collection.
#[must_use]
pub const fn empty() -> Self {
Self {
conditions: Vec::new(),
sort: None,
limit: None,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct FieldCondition {
pub path: FieldPath,
pub op: ComparisonOp,
pub value: Value,
}
/// Validated dotted path. Construct only via `FieldPath::parse` so the
/// segment invariants (non-empty, no `..`, no `$` prefix, depth ≤ 5)
/// are guaranteed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldPath {
segments: Vec<String>,
}
impl FieldPath {
/// Parse a dotted path from a JSON object key.
pub fn parse(raw: &str) -> Result<Self, FilterParseError> {
if raw.is_empty() {
return Err(FilterParseError::InvalidFilter(
"docs::find: field path must not be empty".into(),
));
}
let segments: Vec<&str> = raw.split('.').collect();
if segments.len() > MAX_FIELD_PATH_DEPTH {
return Err(FilterParseError::InvalidFilter(format!(
"docs::find: field path '{raw}' exceeds max depth {MAX_FIELD_PATH_DEPTH}"
)));
}
for seg in &segments {
if seg.is_empty() {
return Err(FilterParseError::InvalidFilter(format!(
"docs::find: field path '{raw}' has an empty segment (leading/trailing dot or '..')"
)));
}
if seg.starts_with('$') {
return Err(FilterParseError::InvalidFilter(format!(
"docs::find: field path segment '{seg}' must not start with '$'"
)));
}
}
Ok(Self {
segments: segments.into_iter().map(ToString::to_string).collect(),
})
}
/// Path segments in order. The Postgres impl binds each as a
/// separate text parameter to `jsonb_extract_path_text`, so no
/// segment ever appears in the SQL string verbatim.
#[must_use]
pub fn segments(&self) -> &[String] {
&self.segments
}
/// Display form for error messages — joined back with `.`.
#[must_use]
pub fn as_str(&self) -> String {
self.segments.join(".")
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComparisonOp {
/// Implicit equality at top level OR explicit `$eq`. Maps to
/// `jsonb_extract_path_text(...) = $M`.
Eq,
/// `$ne` — uses Postgres `IS DISTINCT FROM` so JSON nulls and
/// missing paths are correctly included (`<>` returns NULL on
/// either operand being NULL, which would silently exclude rows
/// the user expects to see).
Ne,
/// `$gt` / `$gte` / `$lt` / `$lte` — text-lex comparison per the
/// brief's contract. Known limitation: lex breaks across
/// digit-count boundaries (`'10' < '9'` is TRUE). Documented in
/// CHANGELOG; v1.2 advanced query will add numeric-aware
/// operators.
Gt,
Gte,
Lt,
Lte,
/// `$in` — `= ANY($M::text[])` where the value list is bound as
/// a TEXT[].
In,
}
impl ComparisonOp {
/// Decode an operator key like `"$gt"`. Returns `None` for any
/// non-`$` key; returns `Some(Err(...))` for `$`-prefixed keys
/// not in the v1.1.2 allowlist (caller surfaces the
/// UnsupportedOperator error).
fn from_dollar_key(key: &str) -> Option<Result<Self, FilterParseError>> {
if !key.starts_with('$') {
return None;
}
Some(match key {
"$eq" => Ok(Self::Eq),
"$ne" => Ok(Self::Ne),
"$gt" => Ok(Self::Gt),
"$gte" => Ok(Self::Gte),
"$lt" => Ok(Self::Lt),
"$lte" => Ok(Self::Lte),
"$in" => Ok(Self::In),
other => Err(FilterParseError::UnsupportedOperator(format!(
"docs::find: operator '{other}' is not supported in v1.1.2; planned for v1.2 advanced query"
))),
})
}
}
#[derive(Debug, Clone, PartialEq)]
pub struct Sort {
pub path: FieldPath,
pub direction: SortDir,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortDir {
Asc,
Desc,
}
#[derive(Debug, thiserror::Error)]
pub enum FilterParseError {
/// Bad path syntax, malformed operator value, multi-field sort,
/// etc. The string is the script-visible message.
#[error("{0}")]
InvalidFilter(String),
/// Filter used an operator not in the v1.1.2 allowlist. The
/// string includes the offending operator + v1.2 pointer.
#[error("{0}")]
UnsupportedOperator(String),
}
/// Parse a `serde_json::Value` filter into `DocsFilter`. The bridge
/// converts the script's Rhai map into a `Value` via
/// `executor-core::sdk::bridge::dynamic_to_json` and passes it through
/// `DocsService::find`; the service calls this parser before touching
/// the repo.
pub fn parse_filter(filter: &Value) -> Result<DocsFilter, FilterParseError> {
let obj = filter.as_object().ok_or_else(|| {
FilterParseError::InvalidFilter("docs::find: filter must be a map/object".into())
})?;
let mut out = DocsFilter::empty();
for (key, value) in obj {
if let Some(stripped) = key.strip_prefix('$') {
// Top-level modifier — `$sort` / `$limit`. Any other
// dollar-key at top level is unsupported.
match stripped {
"sort" => out.sort = Some(parse_sort(value)?),
"limit" => out.limit = Some(parse_limit(value)?),
other => {
return Err(FilterParseError::UnsupportedOperator(format!(
"docs::find: top-level modifier '${other}' is not supported in v1.1.2; planned for v1.2 advanced query"
)));
}
}
continue;
}
// Field path → either implicit equality OR operator-object.
let path = FieldPath::parse(key)?;
match value {
Value::Object(inner) if is_operator_object(inner) => {
for (op_key, op_val) in inner {
let Some(op_res) = ComparisonOp::from_dollar_key(op_key) else {
// This shouldn't trigger — is_operator_object
// already guarantees every key is $-prefixed.
return Err(FilterParseError::InvalidFilter(format!(
"docs::find: operator object for '{}' has non-$ key '{op_key}'",
path.as_str()
)));
};
let op = op_res?;
validate_op_value(op, op_val, &path)?;
out.conditions.push(FieldCondition {
path: path.clone(),
op,
value: op_val.clone(),
});
}
}
// Any non-object value is implicit equality.
// (Object values with non-$ keys are user data, not an
// operator object — reject so the user doesn't accidentally
// match against a literal `{ name: "Alice" }` shape that
// would never compare meaningfully under JSONB text.)
Value::Object(_) => {
return Err(FilterParseError::InvalidFilter(format!(
"docs::find: value for '{}' must be a scalar (implicit equality) or an operator map (keys starting with '$')",
path.as_str()
)));
}
_ => {
out.conditions.push(FieldCondition {
path,
op: ComparisonOp::Eq,
value: value.clone(),
});
}
}
}
Ok(out)
}
/// True when every key in the map starts with `$`. Mixed-shape maps
/// (some `$key`, some user-data key) are rejected to avoid silent
/// surprise — the user almost certainly meant an operator object.
fn is_operator_object(map: &serde_json::Map<String, Value>) -> bool {
!map.is_empty() && map.keys().all(|k| k.starts_with('$'))
}
fn validate_op_value(
op: ComparisonOp,
value: &Value,
path: &FieldPath,
) -> Result<(), FilterParseError> {
match op {
ComparisonOp::In => {
if !value.is_array() {
return Err(FilterParseError::InvalidFilter(format!(
"docs::find: '$in' on '{}' requires an array value",
path.as_str()
)));
}
}
_ => {
// For the scalar-comparison ops, the value must be a JSON
// scalar (no arrays / no nested objects). JSON null is
// allowed — `$ne` against null is a valid query.
if value.is_array() || value.is_object() {
return Err(FilterParseError::InvalidFilter(format!(
"docs::find: '{op_name}' on '{path}' requires a scalar value",
op_name = op_name(op),
path = path.as_str()
)));
}
}
}
Ok(())
}
const fn op_name(op: ComparisonOp) -> &'static str {
match op {
ComparisonOp::Eq => "$eq",
ComparisonOp::Ne => "$ne",
ComparisonOp::Gt => "$gt",
ComparisonOp::Gte => "$gte",
ComparisonOp::Lt => "$lt",
ComparisonOp::Lte => "$lte",
ComparisonOp::In => "$in",
}
}
fn parse_sort(value: &Value) -> Result<Sort, FilterParseError> {
let map = value.as_object().ok_or_else(|| {
FilterParseError::InvalidFilter("docs::find: '$sort' must be a map".into())
})?;
if map.is_empty() {
return Err(FilterParseError::InvalidFilter(
"docs::find: '$sort' must name at least one field".into(),
));
}
if map.len() > 1 {
return Err(FilterParseError::InvalidFilter(
"docs::find: multi-field '$sort' is not supported in v1.1.2; planned for v1.2 advanced query"
.into(),
));
}
let (field, dir_val) = map.iter().next().unwrap();
let path = FieldPath::parse(field)?;
let direction = match dir_val.as_i64() {
Some(1) => SortDir::Asc,
Some(-1) => SortDir::Desc,
_ => {
return Err(FilterParseError::InvalidFilter(format!(
"docs::find: '$sort' direction for '{field}' must be 1 (ascending) or -1 (descending)"
)));
}
};
Ok(Sort { path, direction })
}
fn parse_limit(value: &Value) -> Result<u32, FilterParseError> {
let n = value.as_i64().ok_or_else(|| {
FilterParseError::InvalidFilter("docs::find: '$limit' must be an integer".into())
})?;
if n < 0 {
return Err(FilterParseError::InvalidFilter(
"docs::find: '$limit' must be non-negative".into(),
));
}
Ok(u32::try_from(n)
.unwrap_or(MAX_FIND_LIMIT)
.min(MAX_FIND_LIMIT))
}
// ----------------------------------------------------------------------------
// Tests — error messages are part of the SDK contract once scripts
// depend on them; the snapshot-style asserts pin the exact strings.
// ----------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn parse(v: Value) -> Result<DocsFilter, FilterParseError> {
parse_filter(&v)
}
#[test]
fn empty_object_has_no_conditions() {
let f = parse(json!({})).unwrap();
assert!(f.conditions.is_empty());
assert!(f.sort.is_none());
assert!(f.limit.is_none());
}
#[test]
fn single_equality_top_level() {
let f = parse(json!({ "tier": "gold" })).unwrap();
assert_eq!(f.conditions.len(), 1);
assert_eq!(f.conditions[0].path.segments(), &["tier".to_string()]);
assert_eq!(f.conditions[0].op, ComparisonOp::Eq);
assert_eq!(f.conditions[0].value, json!("gold"));
}
#[test]
fn multi_field_equality_is_conjunctive() {
let f = parse(json!({ "tier": "gold", "status": "active" })).unwrap();
assert_eq!(f.conditions.len(), 2);
}
#[test]
fn nested_dotted_path() {
let f = parse(json!({ "user.email": "a@b" })).unwrap();
let cond = &f.conditions[0];
assert_eq!(
cond.path.segments(),
&["user".to_string(), "email".to_string()]
);
}
#[test]
fn depth_limit_rejects_six_segments() {
let err = parse(json!({ "a.b.c.d.e.f": "x" })).unwrap_err();
let msg = err.to_string();
assert!(msg.contains("exceeds max depth"), "msg: {msg}");
assert!(msg.contains('5'), "msg: {msg}");
}
#[test]
fn double_dot_rejected() {
let err = parse(json!({ "a..b": "x" })).unwrap_err();
assert!(err.to_string().contains("empty segment"));
}
#[test]
fn leading_dot_rejected() {
let err = parse(json!({ ".a": "x" })).unwrap_err();
assert!(err.to_string().contains("empty segment"));
}
#[test]
fn trailing_dot_rejected() {
let err = parse(json!({ "a.": "x" })).unwrap_err();
assert!(err.to_string().contains("empty segment"));
}
#[test]
fn dollar_prefix_in_path_segment_rejected() {
// (The top-level $foo would route to operator dispatch; this
// tests deeper segments which should never start with $.)
let err = parse(json!({ "x.$inner": "v" })).unwrap_err();
assert!(err.to_string().contains("must not start with '$'"));
}
#[test]
fn each_supported_operator_parses() {
for (key, expected_op) in [
("$eq", ComparisonOp::Eq),
("$ne", ComparisonOp::Ne),
("$gt", ComparisonOp::Gt),
("$gte", ComparisonOp::Gte),
("$lt", ComparisonOp::Lt),
("$lte", ComparisonOp::Lte),
] {
let v = json!({ "field": { key: "v" } });
let f = parse(v).unwrap();
assert_eq!(f.conditions[0].op, expected_op, "key {key}");
}
// $in needs an array.
let f = parse(json!({ "tier": { "$in": ["gold", "platinum"] } })).unwrap();
assert_eq!(f.conditions[0].op, ComparisonOp::In);
}
#[test]
fn dollar_in_with_non_array_value_rejected() {
let err = parse(json!({ "tier": { "$in": "gold" } })).unwrap_err();
assert!(err.to_string().contains("'$in'"));
assert!(err.to_string().contains("array"));
}
#[test]
fn scalar_op_with_object_value_rejected() {
let err = parse(json!({ "tier": { "$gt": { "nested": true } } })).unwrap_err();
assert!(err.to_string().contains("'$gt'"));
assert!(err.to_string().contains("scalar"));
}
/// Snapshot: the v1.2-deferred operator error string is part of
/// the SDK contract. Don't change it without a major-version bump.
#[test]
fn unsupported_operator_message_pins_v1_2_pointer() {
let err = parse(json!({ "name": { "$regex": "^A" } })).unwrap_err();
assert_eq!(
err.to_string(),
"docs::find: operator '$regex' is not supported in v1.1.2; planned for v1.2 advanced query"
);
}
#[test]
fn unsupported_top_level_modifier_rejected() {
let err = parse(json!({ "$or": [{ "x": 1 }] })).unwrap_err();
assert!(err.to_string().contains("'$or'"));
assert!(err.to_string().contains("v1.2"));
}
/// Snapshot: depth-limit error string. Pinned per the SDK contract.
#[test]
fn depth_limit_message_pinned() {
let err = parse(json!({ "a.b.c.d.e.f": 1 })).unwrap_err();
assert_eq!(
err.to_string(),
"docs::find: field path 'a.b.c.d.e.f' exceeds max depth 5"
);
}
#[test]
fn mixed_shape_operator_object_rejected() {
// Object value where some keys are $-prefixed and some aren't
// — treated as user data + invalid (the user almost certainly
// meant an operator object).
let err = parse(json!({ "x": { "$gt": 1, "other": 2 } })).unwrap_err();
assert!(err
.to_string()
.contains("scalar (implicit equality) or an operator map"));
}
#[test]
fn sort_asc_and_desc_parse() {
let f = parse(json!({ "$sort": { "created_at": 1 } })).unwrap();
let sort = f.sort.unwrap();
assert_eq!(sort.direction, SortDir::Asc);
assert_eq!(sort.path.segments(), &["created_at".to_string()]);
let f = parse(json!({ "$sort": { "created_at": -1 } })).unwrap();
assert_eq!(f.sort.unwrap().direction, SortDir::Desc);
}
#[test]
fn sort_with_bad_direction_rejected() {
let err = parse(json!({ "$sort": { "x": 2 } })).unwrap_err();
assert!(err.to_string().contains("1 (ascending)"));
}
/// Snapshot: multi-field sort error string. Pinned.
#[test]
fn multi_field_sort_rejected_with_v1_2_pointer() {
let err = parse(json!({ "$sort": { "a": 1, "b": -1 } })).unwrap_err();
assert_eq!(
err.to_string(),
"docs::find: multi-field '$sort' is not supported in v1.1.2; planned for v1.2 advanced query"
);
}
#[test]
fn limit_accepts_non_negative_integer() {
let f = parse(json!({ "$limit": 50 })).unwrap();
assert_eq!(f.limit, Some(50));
}
#[test]
fn limit_clamps_to_max() {
let f = parse(json!({ "$limit": 10_000 })).unwrap();
assert_eq!(f.limit, Some(MAX_FIND_LIMIT));
}
#[test]
fn limit_rejects_negative() {
let err = parse(json!({ "$limit": -1 })).unwrap_err();
assert!(err.to_string().contains("non-negative"));
}
#[test]
fn limit_rejects_non_integer() {
let err = parse(json!({ "$limit": "twenty" })).unwrap_err();
assert!(err.to_string().contains("integer"));
}
#[test]
fn non_object_filter_rejected() {
let err = parse(json!("not a map")).unwrap_err();
assert!(err.to_string().contains("filter must be a map/object"));
}
#[test]
fn dollar_eq_value_can_be_null() {
// $ne against null is a valid query (returns docs where field
// exists and is not null OR is missing) — so null must be an
// accepted scalar.
let f = parse(json!({ "deleted_at": { "$ne": null } })).unwrap();
assert_eq!(f.conditions[0].op, ComparisonOp::Ne);
assert_eq!(f.conditions[0].value, Value::Null);
}
#[test]
fn implicit_equality_with_array_value_accepts() {
// `{ "tags": ["a", "b"] }` is implicit equality against the
// literal array shape. The Postgres query will compare the
// text encoding under JSONB; this is valid v1.1.2.
let f = parse(json!({ "tags": ["a", "b"] })).unwrap();
assert_eq!(f.conditions[0].op, ComparisonOp::Eq);
}
}

View File

@@ -0,0 +1,556 @@
//! Low-level Postgres CRUD + filter-query builder over the `docs`
//! table (migration 0013). Stays storage-only; authorization, event
//! emission, and empty-collection validation live one layer up in
//! `DocsServiceImpl`.
//!
//! The `find` SQL builder is the security-critical surface. **Every
//! field-path segment and every comparison value is bound as a
//! `$N` parameter — never interpolated into the SQL string.** The base
//! `WHERE app_id = $1 AND collection = $2` clause is fixed and
//! prepended to every query so cross-app isolation can't be widened by
//! any operator. See `sql_starts_with_app_collection_predicate`
//! assertion in tests for the load-bearing guarantee.
use async_trait::async_trait;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use base64::Engine as _;
use chrono::{DateTime, Utc};
use picloud_shared::{AppId, DocId, DocRow, DocsListPage};
use serde_json::Value;
use sqlx::postgres::PgRow;
use sqlx::{PgPool, Postgres, QueryBuilder, Row};
use uuid::Uuid;
use crate::docs_filter::{ComparisonOp, DocsFilter, SortDir};
#[derive(Debug, thiserror::Error)]
pub enum DocsRepoError {
#[error("database error: {0}")]
Db(#[from] sqlx::Error),
#[error("invalid pagination cursor")]
InvalidCursor,
}
/// Repo surface. The trait is exposed so the service unit tests can
/// substitute an in-memory backing without spinning up Postgres.
#[async_trait]
pub trait DocsRepo: Send + Sync {
/// Create a new doc with a server-generated UUID. Returns the
/// fully-materialised `DocRow` so the caller has timestamps too
/// (no separate select-back round-trip).
async fn create(
&self,
app_id: AppId,
collection: &str,
data: Value,
) -> Result<DocRow, DocsRepoError>;
async fn get(
&self,
app_id: AppId,
collection: &str,
id: DocId,
) -> Result<Option<DocRow>, DocsRepoError>;
/// Filter-based query. The parsed `DocsFilter` ensures every
/// field-path segment and operator value is bound as a parameter.
async fn find(
&self,
app_id: AppId,
collection: &str,
filter: &DocsFilter,
) -> Result<Vec<DocRow>, DocsRepoError>;
/// Full document replace. Returns `Some(previous_data)` on
/// success, `None` if no doc matched (the service maps that to
/// `DocsError::NotFound`). The prev value is the input to the
/// emitted update event's `old_payload`.
async fn update(
&self,
app_id: AppId,
collection: &str,
id: DocId,
data: Value,
) -> Result<Option<Value>, DocsRepoError>;
/// Returns the deleted doc's data if it existed, `None` if no
/// such doc. The caller converts `Some` → `Ok(true)` for the SDK's
/// was-present return; the `Value` feeds the delete event's
/// `old_payload`.
async fn delete(
&self,
app_id: AppId,
collection: &str,
id: DocId,
) -> Result<Option<Value>, DocsRepoError>;
async fn list(
&self,
app_id: AppId,
collection: &str,
cursor: Option<&str>,
limit: u32,
) -> Result<DocsListPage, DocsRepoError>;
}
pub struct PostgresDocsRepo {
pool: PgPool,
}
impl PostgresDocsRepo {
#[must_use]
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
/// Hard ceiling on `list` page size — mirrors KV's `KV_LIST_MAX_LIMIT`.
/// Scripts that pass anything larger get silently clamped.
const DOCS_LIST_MAX_LIMIT: u32 = 1_000;
const DOCS_LIST_DEFAULT_LIMIT: u32 = 100;
#[async_trait]
impl DocsRepo for PostgresDocsRepo {
async fn create(
&self,
app_id: AppId,
collection: &str,
data: Value,
) -> Result<DocRow, DocsRepoError> {
let id = Uuid::new_v4();
let row: (DateTime<Utc>, DateTime<Utc>) = sqlx::query_as(
"INSERT INTO docs (app_id, collection, id, data) \
VALUES ($1, $2, $3, $4) \
RETURNING created_at, updated_at",
)
.bind(app_id.into_inner())
.bind(collection)
.bind(id)
.bind(&data)
.fetch_one(&self.pool)
.await?;
Ok(DocRow {
id,
data,
created_at: row.0,
updated_at: row.1,
})
}
async fn get(
&self,
app_id: AppId,
collection: &str,
id: DocId,
) -> Result<Option<DocRow>, DocsRepoError> {
let row: Option<(Value, DateTime<Utc>, DateTime<Utc>)> = sqlx::query_as(
"SELECT data, created_at, updated_at FROM docs \
WHERE app_id = $1 AND collection = $2 AND id = $3",
)
.bind(app_id.into_inner())
.bind(collection)
.bind(id)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|(data, created_at, updated_at)| DocRow {
id,
data,
created_at,
updated_at,
}))
}
async fn find(
&self,
app_id: AppId,
collection: &str,
filter: &DocsFilter,
) -> Result<Vec<DocRow>, DocsRepoError> {
let mut qb = build_find_query(app_id, collection, filter);
let rows = qb.build().fetch_all(&self.pool).await?;
rows.into_iter().map(row_to_doc).collect()
}
async fn update(
&self,
app_id: AppId,
collection: &str,
id: DocId,
data: Value,
) -> Result<Option<Value>, DocsRepoError> {
// Same CTE shape as KV's set ([kv_repo.rs:101-132]): SELECT the
// previous data before the UPDATE so the service can emit
// `prev_data` in the update ServiceEvent. Single statement, no
// explicit transaction. Inherits KV's last-writer-wins race
// under concurrent writers; documented as a known limitation
// for v1.1.2.
let row: Option<(Option<Value>,)> = sqlx::query_as(
"WITH prev AS ( \
SELECT data FROM docs \
WHERE app_id = $1 AND collection = $2 AND id = $3 \
), \
updated AS ( \
UPDATE docs SET data = $4, updated_at = NOW() \
WHERE app_id = $1 AND collection = $2 AND id = $3 \
RETURNING 1 \
) \
SELECT (SELECT data FROM prev) FROM updated",
)
.bind(app_id.into_inner())
.bind(collection)
.bind(id)
.bind(&data)
.fetch_optional(&self.pool)
.await?;
// `row` is None when the UPDATE matched no rows (missing doc);
// Some((Some(prev),)) on success. `data` is JSONB NOT NULL so
// the inner Option is always Some when prev exists.
Ok(row.and_then(|(v,)| v))
}
async fn delete(
&self,
app_id: AppId,
collection: &str,
id: DocId,
) -> Result<Option<Value>, DocsRepoError> {
let row: Option<(Value,)> = sqlx::query_as(
"DELETE FROM docs \
WHERE app_id = $1 AND collection = $2 AND id = $3 \
RETURNING data",
)
.bind(app_id.into_inner())
.bind(collection)
.bind(id)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(|(v,)| v))
}
async fn list(
&self,
app_id: AppId,
collection: &str,
cursor: Option<&str>,
limit: u32,
) -> Result<DocsListPage, DocsRepoError> {
let limit = if limit == 0 {
DOCS_LIST_DEFAULT_LIMIT
} else {
limit.min(DOCS_LIST_MAX_LIMIT)
};
let last_id = match cursor {
Some(c) => Some(decode_cursor(c)?),
None => None,
};
let take = i64::from(limit) + 1;
let rows: Vec<(Uuid, Value, DateTime<Utc>, DateTime<Utc>)> = sqlx::query_as(
"SELECT id, data, created_at, updated_at FROM docs \
WHERE app_id = $1 AND collection = $2 \
AND ($3::uuid IS NULL OR id > $3) \
ORDER BY id ASC \
LIMIT $4",
)
.bind(app_id.into_inner())
.bind(collection)
.bind(last_id)
.bind(take)
.fetch_all(&self.pool)
.await?;
let mut docs: Vec<DocRow> = rows
.into_iter()
.map(|(id, data, created_at, updated_at)| DocRow {
id,
data,
created_at,
updated_at,
})
.collect();
let next_cursor = if docs.len() > limit as usize {
docs.truncate(limit as usize);
docs.last().map(|d| encode_cursor(&d.id))
} else {
None
};
Ok(DocsListPage { docs, next_cursor })
}
}
fn row_to_doc(row: PgRow) -> Result<DocRow, DocsRepoError> {
Ok(DocRow {
id: row.try_get("id")?,
data: row.try_get("data")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
})
}
fn encode_cursor(last_id: &Uuid) -> String {
URL_SAFE_NO_PAD.encode(last_id.as_bytes())
}
fn decode_cursor(cursor: &str) -> Result<Uuid, DocsRepoError> {
let bytes = URL_SAFE_NO_PAD
.decode(cursor)
.map_err(|_| DocsRepoError::InvalidCursor)?;
let arr: [u8; 16] = bytes
.as_slice()
.try_into()
.map_err(|_| DocsRepoError::InvalidCursor)?;
Ok(Uuid::from_bytes(arr))
}
// ----------------------------------------------------------------------------
// SQL builder — the load-bearing security surface.
//
// Every field-path segment + every comparison value goes through
// `QueryBuilder::push_bind`, which appends `$N` to the SQL string and
// binds the value as a parameter. The only literal strings appended to
// the SQL are: hardcoded SQL fragments (SELECT/WHERE/AND/etc.) and
// hardcoded operator strings ("=", "IS DISTINCT FROM", ">", "ASC", …).
// **No user input ever lands in the SQL text unparameterized.**
// ----------------------------------------------------------------------------
fn build_find_query<'a>(
app_id: AppId,
collection: &'a str,
filter: &'a DocsFilter,
) -> QueryBuilder<'a, Postgres> {
let mut qb =
QueryBuilder::new("SELECT id, data, created_at, updated_at FROM docs WHERE app_id = ");
qb.push_bind(app_id.into_inner());
qb.push(" AND collection = ");
qb.push_bind(collection);
for cond in &filter.conditions {
qb.push(" AND ");
emit_condition(&mut qb, cond);
}
qb.push(" ORDER BY ");
if let Some(sort) = &filter.sort {
push_jsonb_path(&mut qb, sort.path.segments());
qb.push(match sort.direction {
SortDir::Asc => " ASC",
SortDir::Desc => " DESC",
});
qb.push(", id ASC");
} else {
qb.push("id ASC");
}
let limit = filter
.limit
.map_or(DOCS_LIST_MAX_LIMIT, |l| l.min(DOCS_LIST_MAX_LIMIT));
qb.push(" LIMIT ");
qb.push_bind(i64::from(limit));
qb
}
fn emit_condition<'a>(
qb: &mut QueryBuilder<'a, Postgres>,
cond: &'a crate::docs_filter::FieldCondition,
) {
push_jsonb_path(qb, cond.path.segments());
match cond.op {
ComparisonOp::Eq => {
if cond.value.is_null() {
qb.push(" IS NULL");
} else {
qb.push(" = ");
qb.push_bind(value_to_text(&cond.value));
}
}
ComparisonOp::Ne => {
// IS DISTINCT FROM correctly handles NULL on either side
// (would otherwise silently exclude rows with missing
// paths). Holds for the literal-NULL case too.
if cond.value.is_null() {
qb.push(" IS NOT NULL");
} else {
qb.push(" IS DISTINCT FROM ");
qb.push_bind(value_to_text(&cond.value));
}
}
ComparisonOp::Gt => {
qb.push(" > ");
qb.push_bind(value_to_text(&cond.value));
}
ComparisonOp::Gte => {
qb.push(" >= ");
qb.push_bind(value_to_text(&cond.value));
}
ComparisonOp::Lt => {
qb.push(" < ");
qb.push_bind(value_to_text(&cond.value));
}
ComparisonOp::Lte => {
qb.push(" <= ");
qb.push_bind(value_to_text(&cond.value));
}
ComparisonOp::In => {
qb.push(" = ANY(");
let texts: Vec<Option<String>> = cond
.value
.as_array()
.map(|arr| arr.iter().map(value_to_text).collect())
.unwrap_or_default();
qb.push_bind(texts);
qb.push(")");
}
}
}
/// Append `jsonb_extract_path_text(data, $N1, $N2, …)` with each
/// segment bound as a separate text parameter. Variadic path lengths
/// (15) all flow through this single helper.
fn push_jsonb_path<'a>(qb: &mut QueryBuilder<'a, Postgres>, segments: &'a [String]) {
qb.push("jsonb_extract_path_text(data");
for seg in segments {
qb.push(", ");
qb.push_bind(seg.as_str());
}
qb.push(")");
}
/// JSON scalar → TEXT for binding. `Value::Null` is preserved as
/// `None` so the binding lands as SQL NULL (handled specially above for
/// `Eq` / `Ne`). Arrays + objects serialize to compact JSON; the user
/// is comparing against the JSONB text rendering, which is consistent
/// with `jsonb_extract_path_text`'s output for those types.
fn value_to_text(v: &Value) -> Option<String> {
match v {
Value::Null => None,
Value::String(s) => Some(s.clone()),
Value::Bool(b) => Some(b.to_string()),
Value::Number(n) => Some(n.to_string()),
Value::Array(_) | Value::Object(_) => Some(v.to_string()),
}
}
// ----------------------------------------------------------------------------
// SQL-shape guardrail tests — pure (no DB) so they run in the default
// test suite. These are the highest-stakes tests in the release: they
// pin the cross-app isolation invariant at the SQL level.
// ----------------------------------------------------------------------------
#[cfg(test)]
mod sql_shape_tests {
use super::*;
use crate::docs_filter::parse_filter;
use serde_json::json;
fn sql_for(filter_json: serde_json::Value) -> String {
let filter = parse_filter(&filter_json).unwrap();
let qb = build_find_query(AppId::new(), "users", &filter);
qb.sql().to_string()
}
/// **Load-bearing**: every generated SELECT begins
/// `WHERE app_id = $1 AND collection = $2`. The app_id parameter
/// is the cross-app isolation gate. No user-supplied filter
/// fragment can ever appear before these clauses.
#[test]
fn every_query_starts_with_app_id_and_collection_predicate() {
let cases = vec![
json!({}),
json!({ "tier": "gold" }),
json!({ "created_at": { "$gt": "2026-01-01" } }),
json!({ "tier": { "$in": ["gold", "platinum"] } }),
json!({ "tier": "gold", "status": "active" }),
json!({ "$sort": { "created_at": -1 }, "$limit": 5 }),
json!({ "tier": "gold", "$sort": { "created_at": 1 } }),
json!({ "deleted_at": { "$ne": null } }),
];
for case in cases {
let sql = sql_for(case.clone());
assert!(
sql.starts_with(
"SELECT id, data, created_at, updated_at FROM docs WHERE app_id = $1 AND collection = $2"
),
"filter {case} produced SQL: {sql}"
);
}
}
/// Every comparison value lands as a `$N` placeholder — there
/// should be NO double-quoted string literal in the SQL after the
/// fixed prefix. (This guards against an accidental `format!`
/// regression.)
#[test]
fn no_user_string_literal_in_sql() {
let sql = sql_for(json!({ "tier": "gold; DROP TABLE docs;--" }));
assert!(!sql.contains("gold"), "value leaked into SQL string: {sql}");
assert!(!sql.contains("DROP"), "value leaked into SQL string: {sql}");
}
/// Field-path segments also bind as parameters. A user passing a
/// path that looks like SQL keywords doesn't change the structure.
#[test]
fn no_user_path_literal_in_sql() {
let sql = sql_for(json!({ "drop_table_users": "v" }));
assert!(
!sql.contains("drop_table_users"),
"path leaked into SQL string: {sql}"
);
}
#[test]
fn empty_filter_sql_has_no_extra_conditions() {
let sql = sql_for(json!({}));
// After the fixed prefix, only ORDER BY + LIMIT — no `AND`s.
let suffix = sql
.trim_start_matches(
"SELECT id, data, created_at, updated_at FROM docs WHERE app_id = $1 AND collection = $2",
)
.trim();
assert!(
suffix.starts_with("ORDER BY"),
"expected ORDER BY immediately after base WHERE; got: {suffix}"
);
}
#[test]
fn eq_with_null_emits_is_null() {
let sql = sql_for(json!({ "x": null }));
assert!(sql.contains("IS NULL"), "sql: {sql}");
}
#[test]
fn ne_with_null_emits_is_not_null() {
let sql = sql_for(json!({ "x": { "$ne": null } }));
assert!(sql.contains("IS NOT NULL"), "sql: {sql}");
}
#[test]
fn ne_with_value_uses_is_distinct_from() {
// IS DISTINCT FROM, NOT <> — see ComparisonOp::Ne comment.
let sql = sql_for(json!({ "x": { "$ne": "v" } }));
assert!(sql.contains("IS DISTINCT FROM"), "sql: {sql}");
assert!(!sql.contains(" <> "), "sql: {sql}");
}
#[test]
fn in_emits_any_array() {
let sql = sql_for(json!({ "x": { "$in": ["a", "b"] } }));
assert!(sql.contains("= ANY"), "sql: {sql}");
}
#[test]
fn sort_appends_tiebreaker_id_asc() {
let sql = sql_for(json!({ "$sort": { "created_at": -1 } }));
assert!(sql.contains("DESC, id ASC"), "sql: {sql}");
}
#[test]
fn jsonb_extract_path_used_for_field_access() {
let sql = sql_for(json!({ "user.email": "a@b" }));
assert!(sql.contains("jsonb_extract_path_text(data"), "sql: {sql}");
}
}

View File

@@ -0,0 +1,890 @@
//! `DocsServiceImpl` — wires the `DocsRepo` underneath the
//! `picloud_shared::DocsService` trait that scripts see via the Rhai
//! bridge.
//!
//! Layers added here (vs the raw repo):
//!
//! 1. Empty-collection rejection at the SDK boundary
//! (`docs/sdk-shape.md`).
//! 2. `data` must be a JSON object for create + update. (The repo
//! accepts anything serde_json can serialise; the SDK contract
//! pins documents to map shape so dotted-path queries make sense.)
//! 3. **Script-as-gate authz**: when `cx.principal.is_some()` we run
//! `authz::require(...)`; when it's `None` (public unauthenticated
//! HTTP — the common case for public routes) we skip the check.
//! Cross-app isolation isn't affected — every query is keyed by
//! `cx.app_id`, never an argument.
//! 4. Query DSL parse — `find`/`find_one` parse the opaque filter
//! into `DocsFilter` before passing it down. Parse errors map to
//! `DocsError::InvalidFilter` / `UnsupportedOperator` with the
//! parser's message verbatim (script-visible).
//! 5. `ServiceEvent` emission after each mutation (`create` / `update`
//! / `delete`). The outbox emitter (when wired) turns these into
//! docs-trigger fan-out via `OutboxEventEmitter::emit_docs`.
use std::sync::Arc;
use async_trait::async_trait;
use picloud_shared::{
DocId, DocRow, DocsError, DocsListPage, DocsService, SdkCallCx, ServiceEvent,
ServiceEventEmitter,
};
use crate::authz::{self, AuthzRepo, Capability};
use crate::docs_filter::{parse_filter, FilterParseError};
use crate::docs_repo::{DocsRepo, DocsRepoError};
pub struct DocsServiceImpl {
repo: Arc<dyn DocsRepo>,
authz: Arc<dyn AuthzRepo>,
events: Arc<dyn ServiceEventEmitter>,
}
impl DocsServiceImpl {
#[must_use]
pub fn new(
repo: Arc<dyn DocsRepo>,
authz: Arc<dyn AuthzRepo>,
events: Arc<dyn ServiceEventEmitter>,
) -> Self {
Self {
repo,
authz,
events,
}
}
async fn check_read(&self, cx: &SdkCallCx) -> Result<(), DocsError> {
if let Some(ref principal) = cx.principal {
authz::require(&*self.authz, principal, Capability::AppDocsRead(cx.app_id))
.await
.map_err(|_| DocsError::Forbidden)?;
}
Ok(())
}
async fn check_write(&self, cx: &SdkCallCx) -> Result<(), DocsError> {
if let Some(ref principal) = cx.principal {
authz::require(&*self.authz, principal, Capability::AppDocsWrite(cx.app_id))
.await
.map_err(|_| DocsError::Forbidden)?;
}
Ok(())
}
}
fn validate_collection(collection: &str) -> Result<(), DocsError> {
if collection.is_empty() {
return Err(DocsError::InvalidCollection);
}
Ok(())
}
fn validate_data(data: &serde_json::Value) -> Result<(), DocsError> {
if !data.is_object() {
return Err(DocsError::InvalidData);
}
Ok(())
}
impl From<DocsRepoError> for DocsError {
fn from(e: DocsRepoError) -> Self {
Self::Backend(e.to_string())
}
}
impl From<FilterParseError> for DocsError {
fn from(e: FilterParseError) -> Self {
match e {
FilterParseError::InvalidFilter(s) => Self::InvalidFilter(s),
FilterParseError::UnsupportedOperator(s) => Self::UnsupportedOperator(s),
}
}
}
#[async_trait]
impl DocsService for DocsServiceImpl {
async fn create(
&self,
cx: &SdkCallCx,
collection: &str,
data: serde_json::Value,
) -> Result<DocId, DocsError> {
validate_collection(collection)?;
validate_data(&data)?;
self.check_write(cx).await?;
let row = self
.repo
.create(cx.app_id, collection, data.clone())
.await?;
// Best-effort emit — a failed emit logs but does not roll back
// the write (mirrors KV's pattern).
if let Err(e) = self
.events
.emit(
cx,
ServiceEvent {
source: "docs",
op: "create",
collection: Some(collection.to_string()),
key: Some(row.id.to_string()),
payload: Some(data),
old_payload: None,
},
)
.await
{
tracing::warn!(error = %e, source = "docs", op = "create", "event emit failed");
}
Ok(row.id)
}
async fn get(
&self,
cx: &SdkCallCx,
collection: &str,
id: DocId,
) -> Result<Option<DocRow>, DocsError> {
validate_collection(collection)?;
self.check_read(cx).await?;
Ok(self.repo.get(cx.app_id, collection, id).await?)
}
async fn find(
&self,
cx: &SdkCallCx,
collection: &str,
filter: serde_json::Value,
) -> Result<Vec<DocRow>, DocsError> {
validate_collection(collection)?;
self.check_read(cx).await?;
let parsed = parse_filter(&filter)?;
Ok(self.repo.find(cx.app_id, collection, &parsed).await?)
}
async fn find_one(
&self,
cx: &SdkCallCx,
collection: &str,
filter: serde_json::Value,
) -> Result<Option<DocRow>, DocsError> {
validate_collection(collection)?;
self.check_read(cx).await?;
let mut parsed = parse_filter(&filter)?;
// Inject the implicit `LIMIT 1` for find_one — explicit
// caller-supplied `$limit` wins.
if parsed.limit.is_none() {
parsed.limit = Some(1);
}
let rows = self.repo.find(cx.app_id, collection, &parsed).await?;
Ok(rows.into_iter().next())
}
async fn update(
&self,
cx: &SdkCallCx,
collection: &str,
id: DocId,
data: serde_json::Value,
) -> Result<(), DocsError> {
validate_collection(collection)?;
validate_data(&data)?;
self.check_write(cx).await?;
let previous = self
.repo
.update(cx.app_id, collection, id, data.clone())
.await?;
match previous {
Some(prev) => {
if let Err(e) = self
.events
.emit(
cx,
ServiceEvent {
source: "docs",
op: "update",
collection: Some(collection.to_string()),
key: Some(id.to_string()),
payload: Some(data),
old_payload: Some(prev),
},
)
.await
{
tracing::warn!(error = %e, source = "docs", op = "update", "event emit failed");
}
Ok(())
}
None => Err(DocsError::NotFound),
}
}
async fn delete(&self, cx: &SdkCallCx, collection: &str, id: DocId) -> Result<bool, DocsError> {
validate_collection(collection)?;
self.check_write(cx).await?;
let previous = self.repo.delete(cx.app_id, collection, id).await?;
let was_present = previous.is_some();
if let Some(prev) = previous {
if let Err(e) = self
.events
.emit(
cx,
ServiceEvent {
source: "docs",
op: "delete",
collection: Some(collection.to_string()),
key: Some(id.to_string()),
payload: None,
old_payload: Some(prev),
},
)
.await
{
tracing::warn!(error = %e, source = "docs", op = "delete", "event emit failed");
}
}
Ok(was_present)
}
async fn list(
&self,
cx: &SdkCallCx,
collection: &str,
cursor: Option<&str>,
limit: u32,
) -> Result<DocsListPage, DocsError> {
validate_collection(collection)?;
self.check_read(cx).await?;
Ok(self.repo.list(cx.app_id, collection, cursor, limit).await?)
}
}
// ----------------------------------------------------------------------------
// Tests — in-memory DocsRepo so unit tests don't need Postgres.
// ----------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use crate::authz::{AuthzError, AuthzRepo};
use crate::docs_filter::DocsFilter;
use async_trait::async_trait;
use chrono::Utc;
use picloud_shared::{
AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, NoopEventEmitter, Principal,
RequestId, UserId,
};
use serde_json::json;
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use uuid::Uuid;
/// In-memory backing: BTreeMap keyed by `(app_id, collection, id)`
/// so iteration is naturally ordered for stable cursor pagination
/// (matches the Postgres `ORDER BY id ASC`).
#[derive(Default)]
struct InMemoryDocsRepo {
data: Mutex<BTreeMap<(AppId, String, DocId), DocRow>>,
}
#[async_trait]
impl DocsRepo for InMemoryDocsRepo {
async fn create(
&self,
app_id: AppId,
collection: &str,
data: serde_json::Value,
) -> Result<DocRow, DocsRepoError> {
let id = Uuid::new_v4();
let now = Utc::now();
let row = DocRow {
id,
data,
created_at: now,
updated_at: now,
};
self.data
.lock()
.await
.insert((app_id, collection.to_string(), id), row.clone());
Ok(row)
}
async fn get(
&self,
app_id: AppId,
collection: &str,
id: DocId,
) -> Result<Option<DocRow>, DocsRepoError> {
Ok(self
.data
.lock()
.await
.get(&(app_id, collection.to_string(), id))
.cloned())
}
async fn find(
&self,
app_id: AppId,
collection: &str,
filter: &DocsFilter,
) -> Result<Vec<DocRow>, DocsRepoError> {
let map = self.data.lock().await;
let mut out: Vec<DocRow> = map
.iter()
.filter(|((a, c, _), _)| *a == app_id && c == collection)
.map(|(_, v)| v.clone())
.filter(|row| in_memory_matches(row, filter))
.collect();
if let Some(sort) = &filter.sort {
let path = sort.path.segments().to_vec();
let dir = sort.direction;
out.sort_by(|a, b| {
let av = extract_path_str(&a.data, &path);
let bv = extract_path_str(&b.data, &path);
let ord = av.cmp(&bv);
match dir {
crate::docs_filter::SortDir::Asc => ord,
crate::docs_filter::SortDir::Desc => ord.reverse(),
}
});
} else {
out.sort_by_key(|d| d.id);
}
if let Some(limit) = filter.limit {
out.truncate(limit as usize);
}
Ok(out)
}
async fn update(
&self,
app_id: AppId,
collection: &str,
id: DocId,
data: serde_json::Value,
) -> Result<Option<serde_json::Value>, DocsRepoError> {
let mut map = self.data.lock().await;
let key = (app_id, collection.to_string(), id);
let Some(existing) = map.get_mut(&key) else {
return Ok(None);
};
let prev = std::mem::replace(&mut existing.data, data);
existing.updated_at = Utc::now();
Ok(Some(prev))
}
async fn delete(
&self,
app_id: AppId,
collection: &str,
id: DocId,
) -> Result<Option<serde_json::Value>, DocsRepoError> {
Ok(self
.data
.lock()
.await
.remove(&(app_id, collection.to_string(), id))
.map(|row| row.data))
}
async fn list(
&self,
app_id: AppId,
collection: &str,
cursor: Option<&str>,
limit: u32,
) -> Result<DocsListPage, DocsRepoError> {
let map = self.data.lock().await;
let last_id = cursor
.map(|c| Uuid::parse_str(c).map_err(|_| DocsRepoError::InvalidCursor))
.transpose()?;
let mut docs: Vec<DocRow> = map
.iter()
.filter(|((a, c, _), _)| *a == app_id && c == collection)
.map(|(_, v)| v.clone())
.filter(|d| last_id.is_none_or(|lid| d.id > lid))
.collect();
docs.sort_by_key(|d| d.id);
let take = if limit == 0 {
usize::MAX
} else {
limit as usize
};
let next_cursor = if docs.len() > take {
docs.truncate(take);
docs.last().map(|d| d.id.to_string())
} else {
None
};
Ok(DocsListPage { docs, next_cursor })
}
}
/// Best-effort in-memory filter eval mirroring the Postgres
/// semantics: extract each field path as a text-form string, then
/// apply the operator. Good enough for the unit tests; production
/// always goes through the Postgres impl.
fn in_memory_matches(row: &DocRow, filter: &DocsFilter) -> bool {
for cond in &filter.conditions {
let actual = extract_path_str(&row.data, cond.path.segments());
if !cond_matches(actual.as_ref(), cond) {
return false;
}
}
true
}
fn cond_matches(actual: Option<&String>, cond: &crate::docs_filter::FieldCondition) -> bool {
use crate::docs_filter::ComparisonOp::*;
let actual: Option<&str> = actual.map(String::as_str);
let want = json_text(&cond.value);
let want_ref: Option<&str> = want.as_deref();
match cond.op {
Eq => actual == want_ref,
Ne => actual != want_ref,
Gt => actual.zip(want_ref).is_some_and(|(a, b)| a > b),
Gte => actual.zip(want_ref).is_some_and(|(a, b)| a >= b),
Lt => actual.zip(want_ref).is_some_and(|(a, b)| a < b),
Lte => actual.zip(want_ref).is_some_and(|(a, b)| a <= b),
In => {
let Some(arr) = cond.value.as_array() else {
return false;
};
arr.iter()
.any(|v| actual == json_text(v).as_deref())
}
}
}
fn extract_path_str(value: &serde_json::Value, segments: &[String]) -> Option<String> {
let mut cur = value;
for seg in segments {
cur = cur.as_object()?.get(seg)?;
}
json_text(cur)
}
fn json_text(v: &serde_json::Value) -> Option<String> {
match v {
serde_json::Value::Null => None,
serde_json::Value::String(s) => Some(s.clone()),
serde_json::Value::Bool(b) => Some(b.to_string()),
serde_json::Value::Number(n) => Some(n.to_string()),
serde_json::Value::Array(_) | serde_json::Value::Object(_) => Some(v.to_string()),
}
}
#[derive(Default)]
struct DenyingAuthzRepo;
#[async_trait]
impl AuthzRepo for DenyingAuthzRepo {
async fn membership(
&self,
_user_id: UserId,
_app_id: AppId,
) -> Result<Option<AppRole>, AuthzError> {
Ok(None)
}
}
#[derive(Default)]
struct AllowingAuthzRepo;
#[async_trait]
impl AuthzRepo for AllowingAuthzRepo {
async fn membership(
&self,
_user_id: UserId,
_app_id: AppId,
) -> Result<Option<AppRole>, AuthzError> {
Ok(Some(AppRole::Editor))
}
}
fn anon_cx(app_id: AppId) -> SdkCallCx {
SdkCallCx {
app_id,
principal: None,
execution_id: ExecutionId::new(),
request_id: RequestId::new(),
trigger_depth: 0,
root_execution_id: ExecutionId::new(),
is_dead_letter_handler: false,
event: None,
}
}
fn owner_cx(app_id: AppId) -> SdkCallCx {
SdkCallCx {
app_id,
principal: Some(Principal {
user_id: AdminUserId::new(),
instance_role: InstanceRole::Owner,
scopes: None,
app_binding: None,
}),
execution_id: ExecutionId::new(),
request_id: RequestId::new(),
trigger_depth: 0,
root_execution_id: ExecutionId::new(),
is_dead_letter_handler: false,
event: None,
}
}
fn member_no_role_cx(app_id: AppId) -> SdkCallCx {
SdkCallCx {
app_id,
principal: Some(Principal {
user_id: AdminUserId::new(),
instance_role: InstanceRole::Member,
scopes: None,
app_binding: None,
}),
execution_id: ExecutionId::new(),
request_id: RequestId::new(),
trigger_depth: 0,
root_execution_id: ExecutionId::new(),
is_dead_letter_handler: false,
event: None,
}
}
fn svc() -> DocsServiceImpl {
DocsServiceImpl::new(
Arc::new(InMemoryDocsRepo::default()),
Arc::new(DenyingAuthzRepo),
Arc::new(NoopEventEmitter),
)
}
fn svc_allowing() -> DocsServiceImpl {
DocsServiceImpl::new(
Arc::new(InMemoryDocsRepo::default()),
Arc::new(AllowingAuthzRepo),
Arc::new(NoopEventEmitter),
)
}
#[tokio::test]
async fn create_then_get_round_trips() {
let s = svc();
let cx = anon_cx(AppId::new());
let id = s
.create(&cx, "users", json!({ "name": "Alice" }))
.await
.unwrap();
let row = s.get(&cx, "users", id).await.unwrap().unwrap();
assert_eq!(row.id, id);
assert_eq!(row.data, json!({ "name": "Alice" }));
}
#[tokio::test]
async fn get_missing_returns_none() {
let s = svc();
let cx = anon_cx(AppId::new());
let v = s.get(&cx, "users", Uuid::new_v4()).await.unwrap();
assert!(v.is_none());
}
#[tokio::test]
async fn update_missing_returns_not_found() {
let s = svc();
let cx = anon_cx(AppId::new());
let err = s
.update(&cx, "users", Uuid::new_v4(), json!({ "x": 1 }))
.await
.unwrap_err();
assert!(matches!(err, DocsError::NotFound));
}
#[tokio::test]
async fn delete_missing_returns_false() {
let s = svc();
let cx = anon_cx(AppId::new());
let was_present = s.delete(&cx, "users", Uuid::new_v4()).await.unwrap();
assert!(!was_present);
}
#[tokio::test]
async fn delete_present_returns_true() {
let s = svc();
let cx = anon_cx(AppId::new());
let id = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap();
let was_present = s.delete(&cx, "users", id).await.unwrap();
assert!(was_present);
}
#[tokio::test]
async fn update_present_succeeds() {
let s = svc();
let cx = anon_cx(AppId::new());
let id = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap();
s.update(&cx, "users", id, json!({ "x": 2 })).await.unwrap();
let row = s.get(&cx, "users", id).await.unwrap().unwrap();
assert_eq!(row.data, json!({ "x": 2 }));
}
#[tokio::test]
async fn empty_collection_rejected() {
let s = svc();
let cx = anon_cx(AppId::new());
let err = s.create(&cx, "", json!({})).await.unwrap_err();
assert!(matches!(err, DocsError::InvalidCollection));
}
#[tokio::test]
async fn create_with_non_object_data_rejected() {
let s = svc();
let cx = anon_cx(AppId::new());
let err = s.create(&cx, "users", json!(42)).await.unwrap_err();
assert!(matches!(err, DocsError::InvalidData));
}
#[tokio::test]
async fn update_with_non_object_data_rejected() {
let s = svc();
let cx = anon_cx(AppId::new());
let id = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap();
let err = s
.update(&cx, "users", id, json!("not an object"))
.await
.unwrap_err();
assert!(matches!(err, DocsError::InvalidData));
}
/// Load-bearing: a script with `cx.app_id = A` must NOT see
/// documents created under `cx.app_id = B`. Cross-app isolation
/// boundary; tested through both `get` and `find` because each
/// path could conceivably leak independently.
#[tokio::test]
async fn cross_app_isolation_via_cx_app_id() {
let s = svc();
let app_a = AppId::new();
let app_b = AppId::new();
let cx_a = anon_cx(app_a);
let cx_b = anon_cx(app_b);
let id_a = s
.create(&cx_a, "shared", json!({ "from": "a" }))
.await
.unwrap();
let id_b = s
.create(&cx_b, "shared", json!({ "from": "b" }))
.await
.unwrap();
assert_ne!(id_a, id_b);
// Each app sees only its own doc via get.
assert!(s.get(&cx_a, "shared", id_b).await.unwrap().is_none());
assert!(s.get(&cx_b, "shared", id_a).await.unwrap().is_none());
// And via find.
let from_a = s.find(&cx_a, "shared", json!({})).await.unwrap();
assert_eq!(from_a.len(), 1);
assert_eq!(from_a[0].id, id_a);
let from_b = s.find(&cx_b, "shared", json!({})).await.unwrap();
assert_eq!(from_b.len(), 1);
assert_eq!(from_b[0].id, id_b);
}
#[tokio::test]
async fn anonymous_cx_skips_authz() {
// Denying authz repo + anon cx (no principal) ⇒ writes still
// succeed under script-as-gate.
let s = svc();
let cx = anon_cx(AppId::new());
let id = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap();
let _ = s.delete(&cx, "users", id).await.unwrap();
}
#[tokio::test]
async fn authed_cx_with_no_role_is_forbidden_on_write() {
let s = svc();
let cx = member_no_role_cx(AppId::new());
let err = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap_err();
assert!(matches!(err, DocsError::Forbidden));
}
#[tokio::test]
async fn authed_cx_with_no_role_is_forbidden_on_read() {
let s = svc();
let cx = member_no_role_cx(AppId::new());
let err = s.get(&cx, "users", Uuid::new_v4()).await.unwrap_err();
assert!(matches!(err, DocsError::Forbidden));
}
#[tokio::test]
async fn owner_principal_can_write() {
let s = svc();
let cx = owner_cx(AppId::new());
let _ = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap();
}
#[tokio::test]
async fn editor_member_can_write_via_role() {
// AllowingAuthzRepo grants Editor — should be able to write
// (AppDocsWrite is in_editor in role_satisfies).
let s = svc_allowing();
let cx = member_no_role_cx(AppId::new());
let _ = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap();
}
#[tokio::test]
async fn find_with_equality_returns_matches() {
let s = svc();
let cx = anon_cx(AppId::new());
s.create(&cx, "users", json!({ "tier": "gold" }))
.await
.unwrap();
s.create(&cx, "users", json!({ "tier": "silver" }))
.await
.unwrap();
s.create(&cx, "users", json!({ "tier": "gold" }))
.await
.unwrap();
let golds = s
.find(&cx, "users", json!({ "tier": "gold" }))
.await
.unwrap();
assert_eq!(golds.len(), 2);
}
#[tokio::test]
async fn find_one_returns_first_or_none() {
let s = svc();
let cx = anon_cx(AppId::new());
s.create(&cx, "users", json!({ "tier": "gold" }))
.await
.unwrap();
let hit = s
.find_one(&cx, "users", json!({ "tier": "gold" }))
.await
.unwrap();
assert!(hit.is_some());
let miss = s
.find_one(&cx, "users", json!({ "tier": "platinum" }))
.await
.unwrap();
assert!(miss.is_none());
}
#[tokio::test]
async fn find_with_unsupported_operator_throws() {
let s = svc();
let cx = anon_cx(AppId::new());
let err = s
.find(&cx, "users", json!({ "name": { "$regex": "^A" } }))
.await
.unwrap_err();
match err {
DocsError::UnsupportedOperator(m) => {
assert!(m.contains("$regex"));
assert!(m.contains("v1.2"));
}
other => panic!("expected UnsupportedOperator, got {other:?}"),
}
}
#[tokio::test]
async fn find_with_invalid_filter_throws() {
let s = svc();
let cx = anon_cx(AppId::new());
let err = s
.find(&cx, "users", json!({ "a.b.c.d.e.f": "x" }))
.await
.unwrap_err();
assert!(matches!(err, DocsError::InvalidFilter(_)));
}
#[tokio::test]
async fn find_with_dollar_in_returns_subset() {
let s = svc();
let cx = anon_cx(AppId::new());
s.create(&cx, "users", json!({ "tier": "gold" }))
.await
.unwrap();
s.create(&cx, "users", json!({ "tier": "silver" }))
.await
.unwrap();
s.create(&cx, "users", json!({ "tier": "platinum" }))
.await
.unwrap();
let hits = s
.find(
&cx,
"users",
json!({ "tier": { "$in": ["gold", "platinum"] } }),
)
.await
.unwrap();
assert_eq!(hits.len(), 2);
}
#[tokio::test]
async fn find_one_explicit_limit_is_honoured() {
// The service injects limit=1 ONLY when caller didn't set
// $limit. An explicit `$limit: 5` survives — and find_one
// still returns the first.
let s = svc();
let cx = anon_cx(AppId::new());
for _ in 0..3 {
s.create(&cx, "users", json!({ "tier": "gold" }))
.await
.unwrap();
}
let hit = s
.find_one(&cx, "users", json!({ "tier": "gold", "$limit": 5 }))
.await
.unwrap();
assert!(hit.is_some());
}
#[tokio::test]
async fn list_cursor_pagination() {
let s = svc();
let cx = anon_cx(AppId::new());
let mut ids = Vec::new();
for _ in 0..5 {
ids.push(s.create(&cx, "users", json!({})).await.unwrap());
}
ids.sort();
let p1 = s.list(&cx, "users", None, 2).await.unwrap();
assert_eq!(p1.docs.len(), 2);
assert!(p1.next_cursor.is_some());
let p2 = s
.list(&cx, "users", p1.next_cursor.as_deref(), 2)
.await
.unwrap();
assert_eq!(p2.docs.len(), 2);
let p3 = s
.list(&cx, "users", p2.next_cursor.as_deref(), 2)
.await
.unwrap();
assert_eq!(p3.docs.len(), 1);
assert!(p3.next_cursor.is_none());
}
#[tokio::test]
async fn noop_emitter_does_not_block_mutations() {
// Pins v1.1.0 contract: services hold an Arc<dyn ServiceEventEmitter>
// and call emit().await unconditionally. The noop drops it.
let s = svc();
let cx = anon_cx(AppId::new());
let id = s.create(&cx, "users", json!({ "x": 1 })).await.unwrap();
s.update(&cx, "users", id, json!({ "x": 2 })).await.unwrap();
let _ = s.delete(&cx, "users", id).await.unwrap();
}
}

View File

@@ -26,6 +26,9 @@ pub mod dead_letter_repo;
pub mod dead_letter_service; pub mod dead_letter_service;
pub mod dead_letters_api; pub mod dead_letters_api;
pub mod dispatcher; pub mod dispatcher;
pub mod docs_filter;
pub mod docs_repo;
pub mod docs_service;
pub mod gc; pub mod gc;
pub mod kv_repo; pub mod kv_repo;
pub mod kv_service; pub mod kv_service;
@@ -86,6 +89,8 @@ pub use dead_letter_repo::{
pub use dead_letter_service::PostgresDeadLetterService; pub use dead_letter_service::PostgresDeadLetterService;
pub use dead_letters_api::{dead_letters_router, DeadLettersApiError, DeadLettersState}; pub use dead_letters_api::{dead_letters_router, DeadLettersApiError, DeadLettersState};
pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError}; pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError};
pub use docs_repo::{DocsRepo, DocsRepoError, PostgresDocsRepo};
pub use docs_service::DocsServiceImpl;
pub use gc::{spawn_abandoned_gc, spawn_dead_letter_gc}; pub use gc::{spawn_abandoned_gc, spawn_dead_letter_gc};
pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo}; pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo};
pub use kv_service::KvServiceImpl; pub use kv_service::KvServiceImpl;
@@ -104,8 +109,8 @@ pub use route_repo::{NewRoute, PostgresRouteRepository, RouteRepository};
pub use sandbox::{CeilingError, SandboxCeiling}; pub use sandbox::{CeilingError, SandboxCeiling};
pub use trigger_config::{BackoffShape, TriggerConfig}; pub use trigger_config::{BackoffShape, TriggerConfig};
pub use trigger_repo::{ pub use trigger_repo::{
collection_matches, CreateDeadLetterTrigger, CreateKvTrigger, DeadLetterTriggerMatch, collection_matches, CreateDeadLetterTrigger, CreateDocsTrigger, CreateKvTrigger,
KvTriggerMatch, PostgresTriggerRepo, Trigger, TriggerDetails, TriggerDispatchMode, TriggerKind, DeadLetterTriggerMatch, DocsTriggerMatch, KvTriggerMatch, PostgresTriggerRepo, Trigger,
TriggerRepo, TriggerRepoError, TriggerDetails, TriggerDispatchMode, TriggerKind, TriggerRepo, TriggerRepoError,
}; };
pub use triggers_api::{triggers_router, TriggersApiError, TriggersState}; pub use triggers_api::{triggers_router, TriggersApiError, TriggersState};