DocsServiceImpl mirrors KvServiceImpl's script-as-gate authz pattern,
the empty-collection rejection, and the best-effort emitter call. It
adds "data must be a JSON object" validation, NotFound on update of
a missing doc, and prev_data plumbing via repo.update, which returns
the prior data.
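
The update flow described above can be sketched without Postgres. This is a minimal illustration under stated assumptions, not the service's actual code: `InMemoryRepo`, `service_update`, the `InvalidData` variant, and the crude object check are all hypothetical stand-ins. Only the "previous data on success, None on a missing doc" contract comes from the commit.

```rust
use std::collections::HashMap;

/// Hypothetical error type; the real `DocsError` variants may differ.
#[derive(Debug, PartialEq)]
enum DocsError {
    InvalidData(&'static str),
    NotFound,
}

/// Stand-in for the repo: maps a doc id to its JSON text.
struct InMemoryRepo {
    docs: HashMap<u32, String>,
}

impl InMemoryRepo {
    /// Same contract as `repo.update`: the previous data on success,
    /// `None` if no doc matched. A missing id never creates a row.
    fn update(&mut self, id: u32, data: String) -> Option<String> {
        self.docs
            .get_mut(&id)
            .map(|slot| std::mem::replace(slot, data))
    }
}

/// Crude object check for this sketch only; a real service would
/// inspect a parsed JSON value rather than the raw text.
fn looks_like_json_object(data: &str) -> bool {
    let t = data.trim();
    t.starts_with('{') && t.ends_with('}')
}

/// Validate, write, and hand back prev_data for the update event.
fn service_update(repo: &mut InMemoryRepo, id: u32, data: &str) -> Result<String, DocsError> {
    if !looks_like_json_object(data) {
        return Err(DocsError::InvalidData("data must be a JSON object"));
    }
    repo.update(id, data.to_string()).ok_or(DocsError::NotFound)
}

fn main() {
    let mut repo = InMemoryRepo {
        docs: HashMap::from([(1, r#"{"tier":"gold"}"#.to_string())]),
    };
    let prev = service_update(&mut repo, 1, r#"{"tier":"platinum"}"#);
    println!("{prev:?}");
}
```

Returning the previous value from the write itself keeps the event payload and the stored row in step without a second read.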
PostgresDocsRepo handles CRUD against the docs table. The find path
runs through the v1.1.2 query DSL parser (docs_filter::parse_filter)
before building parameterised SQL via sqlx::QueryBuilder:
* Every field-path segment + comparison value is bound as $N.
* jsonb_extract_path_text(data, $N1, $N2, ...) handles variable
depth without segment interpolation.
* Base WHERE is fixed: WHERE app_id = $1 AND collection = $2.
Filter conditions can only narrow, never widen. Load-bearing
test in sql_shape_tests pins this prefix on every emitted query
+ asserts no user string ever lands in the SQL text.
* $ne uses IS DISTINCT FROM (not <>) so missing paths + JSON nulls
are correctly included.
* $in binds the value list as TEXT[] via = ANY($N::text[]).
* $sort always appends a ", id ASC" tiebreaker for stable cursor
pagination semantics; $limit is clamped to MAX_FIND_LIMIT.
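
To make the binding rules above concrete, here is a stdlib-only sketch of the same shape. `SketchBuilder` is a hypothetical stand-in that only imitates the push / push_bind split of sqlx::QueryBuilder; `Cond`, `Op`, and `build_find` are likewise illustrative names, and the column list is shortened.

```rust
/// Hypothetical stand-in for a parameterised query builder: literal SQL
/// goes through `push`, anything user-supplied goes through `push_bind`.
struct SketchBuilder {
    sql: String,
    binds: Vec<String>,
}

impl SketchBuilder {
    fn new(prefix: &str) -> Self {
        Self { sql: prefix.to_string(), binds: Vec::new() }
    }

    /// Append a hardcoded SQL fragment.
    fn push(&mut self, fragment: &str) -> &mut Self {
        self.sql.push_str(fragment);
        self
    }

    /// Store the value out of band and append only its `$N` placeholder.
    fn push_bind(&mut self, value: &str) -> &mut Self {
        self.binds.push(value.to_string());
        self.sql.push_str(&format!("${}", self.binds.len()));
        self
    }
}

enum Op {
    Eq,
    Ne,
}

struct Cond<'a> {
    path: &'a [&'a str],
    op: Op,
    value: &'a str,
}

fn build_find(app_id: &str, collection: &str, conds: &[Cond]) -> SketchBuilder {
    // Fixed prefix: filter conditions are only ever ANDed onto it.
    let mut qb = SketchBuilder::new("SELECT id, data FROM docs WHERE app_id = ");
    qb.push_bind(app_id).push(" AND collection = ").push_bind(collection);
    for cond in conds {
        qb.push(" AND jsonb_extract_path_text(data");
        for seg in cond.path {
            qb.push(", ").push_bind(seg);
        }
        // Operators are hardcoded strings chosen by the enum, never user text.
        qb.push(match cond.op {
            Op::Eq => ") = ",
            Op::Ne => ") IS DISTINCT FROM ",
        });
        qb.push_bind(cond.value);
    }
    qb.push(" ORDER BY id ASC");
    qb
}

fn main() {
    let conds = [
        Cond { path: &["user", "email"], op: Op::Eq, value: "a@b" },
        Cond { path: &["tier"], op: Op::Ne, value: "gold; DROP TABLE docs;--" },
    ];
    let qb = build_find("app-1", "users", &conds);
    println!("{}", qb.sql);
    println!("{} bound values", qb.binds.len());
}
```

Because the SQL text is assembled only from literals and placeholders, a hostile value changes the bound parameters but not the statement structure.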
docs_filter is the AST + parser for the DSL. The operator allowlist is
explicit; any operator outside the v1.1.2 set is rejected with
UnsupportedOperator and a pointer to v1.2. Snapshot tests pin the
SDK-contract error strings so changing them is a deliberate act.
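
An explicit allowlist of this kind might look like the sketch below. Everything here is an assumption made for illustration: `parse_operator` and `FilterError` are hypothetical names, the `$gte` / `$lt` / `$lte` spellings are inferred from the `ComparisonOp` variants, and the error wording is invented rather than the pinned SDK-contract string.

```rust
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ComparisonOp {
    Ne,
    Gt,
    Gte,
    Lt,
    Lte,
    In,
}

#[derive(Debug, PartialEq, Eq)]
enum FilterError {
    UnsupportedOperator(String),
}

impl fmt::Display for FilterError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Hypothetical wording; the real string is pinned by snapshot tests.
            FilterError::UnsupportedOperator(op) => {
                write!(f, "operator {op} is not supported in v1.1.2 (see v1.2)")
            }
        }
    }
}

/// Map an operator key to its AST variant. Anything not listed is
/// rejected, so new operators have to be added here on purpose.
fn parse_operator(raw: &str) -> Result<ComparisonOp, FilterError> {
    match raw {
        "$ne" => Ok(ComparisonOp::Ne),
        "$gt" => Ok(ComparisonOp::Gt),
        "$gte" => Ok(ComparisonOp::Gte),
        "$lt" => Ok(ComparisonOp::Lt),
        "$lte" => Ok(ComparisonOp::Lte),
        "$in" => Ok(ComparisonOp::In),
        other => Err(FilterError::UnsupportedOperator(other.to_string())),
    }
}

fn main() {
    println!("{:?}", parse_operator("$in"));
    if let Err(e) = parse_operator("$regex") {
        println!("{e}");
    }
}
```

A catch-all error arm means an unknown operator can never fall through to SQL generation.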
Two new Capability variants, AppDocsRead and AppDocsWrite, map to
the existing Scope::ScriptRead and Scope::ScriptWrite per the
seven-scope commitment from v1.1.0. role_satisfies grants read at
Viewer and write at Editor (the same trust shape as KV).
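
The capability mapping can be sketched as follows. The enum and function names echo the commit message, but the signatures, the two-role ladder, and the helper `minimum_role` are assumptions for illustration; the real types likely carry more variants.

```rust
/// Assumed role ladder; declaration order gives Viewer < Editor.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Role {
    Viewer,
    Editor,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Scope {
    ScriptRead,
    ScriptWrite,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Capability {
    AppDocsRead,
    AppDocsWrite,
}

/// New capabilities reuse existing scopes instead of adding scopes.
fn scope_for(cap: Capability) -> Scope {
    match cap {
        Capability::AppDocsRead => Scope::ScriptRead,
        Capability::AppDocsWrite => Scope::ScriptWrite,
    }
}

/// Hypothetical helper: the lowest role allowed to use a capability.
fn minimum_role(cap: Capability) -> Role {
    match cap {
        Capability::AppDocsRead => Role::Viewer,
        Capability::AppDocsWrite => Role::Editor,
    }
}

/// Read is granted at Viewer and above, write at Editor and above.
fn role_satisfies(role: Role, cap: Capability) -> bool {
    role >= minimum_role(cap)
}

fn main() {
    println!("{:?}", scope_for(Capability::AppDocsWrite));
    println!("{}", role_satisfies(Role::Viewer, Capability::AppDocsWrite));
}
```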
59 unit tests added across the three new files. All pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
557 lines
18 KiB
Rust
//! Low-level Postgres CRUD + filter-query builder over the `docs`
//! table (migration 0013). Stays storage-only; authorization, event
//! emission, and empty-collection validation live one layer up in
//! `DocsServiceImpl`.
//!
//! The `find` SQL builder is the security-critical surface. **Every
//! field-path segment and every comparison value is bound as a
//! `$N` parameter, never interpolated into the SQL string.** The base
//! `WHERE app_id = $1 AND collection = $2` clause is fixed and
//! prepended to every query so cross-app isolation can't be widened by
//! any operator. See the
//! `every_query_starts_with_app_id_and_collection_predicate` test in
//! `sql_shape_tests` for the load-bearing guarantee.

use async_trait::async_trait;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use base64::Engine as _;
use chrono::{DateTime, Utc};
use picloud_shared::{AppId, DocId, DocRow, DocsListPage};
use serde_json::Value;
use sqlx::postgres::PgRow;
use sqlx::{PgPool, Postgres, QueryBuilder, Row};
use uuid::Uuid;

use crate::docs_filter::{ComparisonOp, DocsFilter, SortDir};

#[derive(Debug, thiserror::Error)]
pub enum DocsRepoError {
    #[error("database error: {0}")]
    Db(#[from] sqlx::Error),

    #[error("invalid pagination cursor")]
    InvalidCursor,
}

/// Repo surface. The trait is exposed so the service unit tests can
/// substitute an in-memory backing without spinning up Postgres.
#[async_trait]
pub trait DocsRepo: Send + Sync {
    /// Create a new doc with a server-generated UUID. Returns the
    /// fully-materialised `DocRow` so the caller has timestamps too
    /// (no separate select-back round-trip).
    async fn create(
        &self,
        app_id: AppId,
        collection: &str,
        data: Value,
    ) -> Result<DocRow, DocsRepoError>;

    async fn get(
        &self,
        app_id: AppId,
        collection: &str,
        id: DocId,
    ) -> Result<Option<DocRow>, DocsRepoError>;

    /// Filter-based query. The parsed `DocsFilter` ensures every
    /// field-path segment and operator value is bound as a parameter.
    async fn find(
        &self,
        app_id: AppId,
        collection: &str,
        filter: &DocsFilter,
    ) -> Result<Vec<DocRow>, DocsRepoError>;

    /// Full document replace. Returns `Some(previous_data)` on
    /// success, `None` if no doc matched (the service maps that to
    /// `DocsError::NotFound`). The prev value is the input to the
    /// emitted update event's `old_payload`.
    async fn update(
        &self,
        app_id: AppId,
        collection: &str,
        id: DocId,
        data: Value,
    ) -> Result<Option<Value>, DocsRepoError>;

    /// Returns the deleted doc's data if it existed, `None` if no
    /// such doc. The caller converts `Some` → `Ok(true)` for the SDK's
    /// was-present return; the `Value` feeds the delete event's
    /// `old_payload`.
    async fn delete(
        &self,
        app_id: AppId,
        collection: &str,
        id: DocId,
    ) -> Result<Option<Value>, DocsRepoError>;

    async fn list(
        &self,
        app_id: AppId,
        collection: &str,
        cursor: Option<&str>,
        limit: u32,
    ) -> Result<DocsListPage, DocsRepoError>;
}

pub struct PostgresDocsRepo {
    pool: PgPool,
}

impl PostgresDocsRepo {
    #[must_use]
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
}

/// Hard ceiling on `list` page size; mirrors KV's `KV_LIST_MAX_LIMIT`.
/// Scripts that pass anything larger get silently clamped.
const DOCS_LIST_MAX_LIMIT: u32 = 1_000;
const DOCS_LIST_DEFAULT_LIMIT: u32 = 100;

#[async_trait]
impl DocsRepo for PostgresDocsRepo {
    async fn create(
        &self,
        app_id: AppId,
        collection: &str,
        data: Value,
    ) -> Result<DocRow, DocsRepoError> {
        let id = Uuid::new_v4();
        let row: (DateTime<Utc>, DateTime<Utc>) = sqlx::query_as(
            "INSERT INTO docs (app_id, collection, id, data) \
             VALUES ($1, $2, $3, $4) \
             RETURNING created_at, updated_at",
        )
        .bind(app_id.into_inner())
        .bind(collection)
        .bind(id)
        .bind(&data)
        .fetch_one(&self.pool)
        .await?;
        Ok(DocRow {
            id,
            data,
            created_at: row.0,
            updated_at: row.1,
        })
    }

    async fn get(
        &self,
        app_id: AppId,
        collection: &str,
        id: DocId,
    ) -> Result<Option<DocRow>, DocsRepoError> {
        let row: Option<(Value, DateTime<Utc>, DateTime<Utc>)> = sqlx::query_as(
            "SELECT data, created_at, updated_at FROM docs \
             WHERE app_id = $1 AND collection = $2 AND id = $3",
        )
        .bind(app_id.into_inner())
        .bind(collection)
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(|(data, created_at, updated_at)| DocRow {
            id,
            data,
            created_at,
            updated_at,
        }))
    }

    async fn find(
        &self,
        app_id: AppId,
        collection: &str,
        filter: &DocsFilter,
    ) -> Result<Vec<DocRow>, DocsRepoError> {
        let mut qb = build_find_query(app_id, collection, filter);
        let rows = qb.build().fetch_all(&self.pool).await?;
        rows.into_iter().map(row_to_doc).collect()
    }

    async fn update(
        &self,
        app_id: AppId,
        collection: &str,
        id: DocId,
        data: Value,
    ) -> Result<Option<Value>, DocsRepoError> {
        // Same CTE shape as KV's set ([kv_repo.rs:101-132]): SELECT the
        // previous data before the UPDATE so the service can emit
        // `prev_data` in the update ServiceEvent. Single statement, no
        // explicit transaction. Inherits KV's last-writer-wins race
        // under concurrent writers; documented as a known limitation
        // for v1.1.2.
        let row: Option<(Option<Value>,)> = sqlx::query_as(
            "WITH prev AS ( \
                SELECT data FROM docs \
                WHERE app_id = $1 AND collection = $2 AND id = $3 \
             ), \
             updated AS ( \
                UPDATE docs SET data = $4, updated_at = NOW() \
                WHERE app_id = $1 AND collection = $2 AND id = $3 \
                RETURNING 1 \
             ) \
             SELECT (SELECT data FROM prev) FROM updated",
        )
        .bind(app_id.into_inner())
        .bind(collection)
        .bind(id)
        .bind(&data)
        .fetch_optional(&self.pool)
        .await?;
        // `row` is None when the UPDATE matched no rows (missing doc);
        // Some((Some(prev),)) on success. `data` is JSONB NOT NULL so
        // the inner Option is always Some when prev exists.
        Ok(row.and_then(|(v,)| v))
    }

    async fn delete(
        &self,
        app_id: AppId,
        collection: &str,
        id: DocId,
    ) -> Result<Option<Value>, DocsRepoError> {
        let row: Option<(Value,)> = sqlx::query_as(
            "DELETE FROM docs \
             WHERE app_id = $1 AND collection = $2 AND id = $3 \
             RETURNING data",
        )
        .bind(app_id.into_inner())
        .bind(collection)
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(|(v,)| v))
    }

    async fn list(
        &self,
        app_id: AppId,
        collection: &str,
        cursor: Option<&str>,
        limit: u32,
    ) -> Result<DocsListPage, DocsRepoError> {
        let limit = if limit == 0 {
            DOCS_LIST_DEFAULT_LIMIT
        } else {
            limit.min(DOCS_LIST_MAX_LIMIT)
        };

        let last_id = match cursor {
            Some(c) => Some(decode_cursor(c)?),
            None => None,
        };

        let take = i64::from(limit) + 1;
        let rows: Vec<(Uuid, Value, DateTime<Utc>, DateTime<Utc>)> = sqlx::query_as(
            "SELECT id, data, created_at, updated_at FROM docs \
             WHERE app_id = $1 AND collection = $2 \
               AND ($3::uuid IS NULL OR id > $3) \
             ORDER BY id ASC \
             LIMIT $4",
        )
        .bind(app_id.into_inner())
        .bind(collection)
        .bind(last_id)
        .bind(take)
        .fetch_all(&self.pool)
        .await?;

        let mut docs: Vec<DocRow> = rows
            .into_iter()
            .map(|(id, data, created_at, updated_at)| DocRow {
                id,
                data,
                created_at,
                updated_at,
            })
            .collect();
        let next_cursor = if docs.len() > limit as usize {
            docs.truncate(limit as usize);
            docs.last().map(|d| encode_cursor(&d.id))
        } else {
            None
        };

        Ok(DocsListPage { docs, next_cursor })
    }
}

fn row_to_doc(row: PgRow) -> Result<DocRow, DocsRepoError> {
    Ok(DocRow {
        id: row.try_get("id")?,
        data: row.try_get("data")?,
        created_at: row.try_get("created_at")?,
        updated_at: row.try_get("updated_at")?,
    })
}

fn encode_cursor(last_id: &Uuid) -> String {
    URL_SAFE_NO_PAD.encode(last_id.as_bytes())
}

fn decode_cursor(cursor: &str) -> Result<Uuid, DocsRepoError> {
    let bytes = URL_SAFE_NO_PAD
        .decode(cursor)
        .map_err(|_| DocsRepoError::InvalidCursor)?;
    let arr: [u8; 16] = bytes
        .as_slice()
        .try_into()
        .map_err(|_| DocsRepoError::InvalidCursor)?;
    Ok(Uuid::from_bytes(arr))
}

// ----------------------------------------------------------------------------
// SQL builder: the load-bearing security surface.
//
// Every field-path segment + every comparison value goes through
// `QueryBuilder::push_bind`, which appends `$N` to the SQL string and
// binds the value as a parameter. The only literal strings appended to
// the SQL are: hardcoded SQL fragments (SELECT/WHERE/AND/etc.) and
// hardcoded operator strings ("=", "IS DISTINCT FROM", ">", "ASC", …).
// **No user input ever lands in the SQL text unparameterized.**
// ----------------------------------------------------------------------------

fn build_find_query<'a>(
    app_id: AppId,
    collection: &'a str,
    filter: &'a DocsFilter,
) -> QueryBuilder<'a, Postgres> {
    let mut qb =
        QueryBuilder::new("SELECT id, data, created_at, updated_at FROM docs WHERE app_id = ");
    qb.push_bind(app_id.into_inner());
    qb.push(" AND collection = ");
    qb.push_bind(collection);

    for cond in &filter.conditions {
        qb.push(" AND ");
        emit_condition(&mut qb, cond);
    }

    qb.push(" ORDER BY ");
    if let Some(sort) = &filter.sort {
        push_jsonb_path(&mut qb, sort.path.segments());
        qb.push(match sort.direction {
            SortDir::Asc => " ASC",
            SortDir::Desc => " DESC",
        });
        qb.push(", id ASC");
    } else {
        qb.push("id ASC");
    }

    let limit = filter
        .limit
        .map_or(DOCS_LIST_MAX_LIMIT, |l| l.min(DOCS_LIST_MAX_LIMIT));
    qb.push(" LIMIT ");
    qb.push_bind(i64::from(limit));

    qb
}

fn emit_condition<'a>(
    qb: &mut QueryBuilder<'a, Postgres>,
    cond: &'a crate::docs_filter::FieldCondition,
) {
    push_jsonb_path(qb, cond.path.segments());
    match cond.op {
        ComparisonOp::Eq => {
            if cond.value.is_null() {
                qb.push(" IS NULL");
            } else {
                qb.push(" = ");
                qb.push_bind(value_to_text(&cond.value));
            }
        }
        ComparisonOp::Ne => {
            // IS DISTINCT FROM correctly handles NULL on either side
            // (would otherwise silently exclude rows with missing
            // paths). Holds for the literal-NULL case too.
            if cond.value.is_null() {
                qb.push(" IS NOT NULL");
            } else {
                qb.push(" IS DISTINCT FROM ");
                qb.push_bind(value_to_text(&cond.value));
            }
        }
        ComparisonOp::Gt => {
            qb.push(" > ");
            qb.push_bind(value_to_text(&cond.value));
        }
        ComparisonOp::Gte => {
            qb.push(" >= ");
            qb.push_bind(value_to_text(&cond.value));
        }
        ComparisonOp::Lt => {
            qb.push(" < ");
            qb.push_bind(value_to_text(&cond.value));
        }
        ComparisonOp::Lte => {
            qb.push(" <= ");
            qb.push_bind(value_to_text(&cond.value));
        }
        ComparisonOp::In => {
            qb.push(" = ANY(");
            let texts: Vec<Option<String>> = cond
                .value
                .as_array()
                .map(|arr| arr.iter().map(value_to_text).collect())
                .unwrap_or_default();
            qb.push_bind(texts);
            qb.push(")");
        }
    }
}

/// Append `jsonb_extract_path_text(data, $N1, $N2, …)` with each
/// segment bound as a separate text parameter. Variadic path lengths
/// (1–5) all flow through this single helper.
fn push_jsonb_path<'a>(qb: &mut QueryBuilder<'a, Postgres>, segments: &'a [String]) {
    qb.push("jsonb_extract_path_text(data");
    for seg in segments {
        qb.push(", ");
        qb.push_bind(seg.as_str());
    }
    qb.push(")");
}

/// JSON scalar → TEXT for binding. `Value::Null` is preserved as
/// `None` so the binding lands as SQL NULL (handled specially above for
/// `Eq` / `Ne`). Arrays + objects serialize to compact JSON and are
/// compared against the JSONB text rendering that
/// `jsonb_extract_path_text` returns for those types. Caveat: Postgres
/// renders jsonb text with a space after `:` and `,`, so compact JSON
/// for a non-empty array or object may not compare equal.
fn value_to_text(v: &Value) -> Option<String> {
    match v {
        Value::Null => None,
        Value::String(s) => Some(s.clone()),
        Value::Bool(b) => Some(b.to_string()),
        Value::Number(n) => Some(n.to_string()),
        Value::Array(_) | Value::Object(_) => Some(v.to_string()),
    }
}

// ----------------------------------------------------------------------------
// SQL-shape guardrail tests: pure (no DB) so they run in the default
// test suite. These are the highest-stakes tests in the release: they
// pin the cross-app isolation invariant at the SQL level.
// ----------------------------------------------------------------------------

#[cfg(test)]
mod sql_shape_tests {
    use super::*;
    use crate::docs_filter::parse_filter;
    use serde_json::json;

    fn sql_for(filter_json: serde_json::Value) -> String {
        let filter = parse_filter(&filter_json).unwrap();
        let qb = build_find_query(AppId::new(), "users", &filter);
        qb.sql().to_string()
    }

    /// **Load-bearing**: every generated query begins with the fixed
    /// `SELECT … WHERE app_id = $1 AND collection = $2` prefix. The
    /// app_id parameter is the cross-app isolation gate. No
    /// user-supplied filter fragment can ever appear before these
    /// clauses.
    #[test]
    fn every_query_starts_with_app_id_and_collection_predicate() {
        let cases = vec![
            json!({}),
            json!({ "tier": "gold" }),
            json!({ "created_at": { "$gt": "2026-01-01" } }),
            json!({ "tier": { "$in": ["gold", "platinum"] } }),
            json!({ "tier": "gold", "status": "active" }),
            json!({ "$sort": { "created_at": -1 }, "$limit": 5 }),
            json!({ "tier": "gold", "$sort": { "created_at": 1 } }),
            json!({ "deleted_at": { "$ne": null } }),
        ];
        for case in cases {
            let sql = sql_for(case.clone());
            assert!(
                sql.starts_with(
                    "SELECT id, data, created_at, updated_at FROM docs WHERE app_id = $1 AND collection = $2"
                ),
                "filter {case} produced SQL: {sql}"
            );
        }
    }

    /// Every comparison value lands as a `$N` placeholder; the value
    /// text must not appear anywhere in the SQL string. (This guards
    /// against an accidental `format!` regression.)
    #[test]
    fn no_user_string_literal_in_sql() {
        let sql = sql_for(json!({ "tier": "gold; DROP TABLE docs;--" }));
        assert!(!sql.contains("gold"), "value leaked into SQL string: {sql}");
        assert!(!sql.contains("DROP"), "value leaked into SQL string: {sql}");
    }

    /// Field-path segments also bind as parameters. A user passing a
    /// path that looks like SQL keywords doesn't change the structure.
    #[test]
    fn no_user_path_literal_in_sql() {
        let sql = sql_for(json!({ "drop_table_users": "v" }));
        assert!(
            !sql.contains("drop_table_users"),
            "path leaked into SQL string: {sql}"
        );
    }

    #[test]
    fn empty_filter_sql_has_no_extra_conditions() {
        let sql = sql_for(json!({}));
        // After the fixed prefix, only ORDER BY + LIMIT, no `AND`s.
        let suffix = sql
            .trim_start_matches(
                "SELECT id, data, created_at, updated_at FROM docs WHERE app_id = $1 AND collection = $2",
            )
            .trim();
        assert!(
            suffix.starts_with("ORDER BY"),
            "expected ORDER BY immediately after base WHERE; got: {suffix}"
        );
    }

    #[test]
    fn eq_with_null_emits_is_null() {
        let sql = sql_for(json!({ "x": null }));
        assert!(sql.contains("IS NULL"), "sql: {sql}");
    }

    #[test]
    fn ne_with_null_emits_is_not_null() {
        let sql = sql_for(json!({ "x": { "$ne": null } }));
        assert!(sql.contains("IS NOT NULL"), "sql: {sql}");
    }

    #[test]
    fn ne_with_value_uses_is_distinct_from() {
        // IS DISTINCT FROM, not <>; see the ComparisonOp::Ne comment.
        let sql = sql_for(json!({ "x": { "$ne": "v" } }));
        assert!(sql.contains("IS DISTINCT FROM"), "sql: {sql}");
        assert!(!sql.contains(" <> "), "sql: {sql}");
    }

    #[test]
    fn in_emits_any_array() {
        let sql = sql_for(json!({ "x": { "$in": ["a", "b"] } }));
        assert!(sql.contains("= ANY"), "sql: {sql}");
    }

    #[test]
    fn sort_appends_tiebreaker_id_asc() {
        let sql = sql_for(json!({ "$sort": { "created_at": -1 } }));
        assert!(sql.contains("DESC, id ASC"), "sql: {sql}");
    }

    #[test]
    fn jsonb_extract_path_used_for_field_access() {
        let sql = sql_for(json!({ "user.email": "a@b" }));
        assert!(sql.contains("jsonb_extract_path_text(data"), "sql: {sql}");
    }
}