//! `DocsServiceImpl` — wires the `DocsRepo` underneath the
//! `picloud_shared::DocsService` trait that scripts see via the Rhai
//! bridge.
//!
//! Layers added here (vs the raw repo):
//!
//! 1. Empty-collection rejection at the SDK boundary
//!    (`docs/sdk-shape.md`).
//! 2. `data` must be a JSON object for create + update. (The repo
//!    accepts anything serde_json can serialise; the SDK contract
//!    pins documents to map shape so dotted-path queries make sense.)
//! 3. **Script-as-gate authz**: when `cx.principal.is_some()` we run
//!    `authz::require(...)`; when it's `None` (public unauthenticated
//!    HTTP — the common case for public routes) we skip the check.
//!    Cross-app isolation isn't affected — every query is keyed by
//!    `cx.app_id`, never an argument.
//! 4. Query DSL parse — `find`/`find_one` parse the opaque filter
//!    into `DocsFilter` before passing it down. Parse errors map to
//!    `DocsError::InvalidFilter` / `UnsupportedOperator` with the
//!    parser's message verbatim (script-visible).
//! 5. `ServiceEvent` emission after each mutation (`create` / `update`
//!    / `delete`). The outbox emitter (when wired) turns these into
//!    docs-trigger fan-out via `OutboxEventEmitter::emit_docs`.

use std::sync::Arc; use async_trait::async_trait; use picloud_shared::{ DocId, DocRow, DocsError, DocsListPage, DocsService, SdkCallCx, ServiceEvent, ServiceEventEmitter, }; use crate::authz::{self, AuthzRepo, Capability}; use crate::docs_filter::{parse_filter, FilterParseError}; use crate::docs_repo::{DocsRepo, DocsRepoError}; pub struct DocsServiceImpl { repo: Arc, authz: Arc, events: Arc, } impl DocsServiceImpl { #[must_use] pub fn new( repo: Arc, authz: Arc, events: Arc, ) -> Self { Self { repo, authz, events, } } async fn check_read(&self, cx: &SdkCallCx) -> Result<(), DocsError> { if let Some(ref principal) = cx.principal { authz::require(&*self.authz, principal, Capability::AppDocsRead(cx.app_id)) .await .map_err(|_| DocsError::Forbidden)?; } Ok(()) } async fn check_write(&self, cx: &SdkCallCx) -> Result<(), DocsError> { if let Some(ref principal) = cx.principal { authz::require(&*self.authz, principal, Capability::AppDocsWrite(cx.app_id)) .await .map_err(|_| DocsError::Forbidden)?; } Ok(()) } } fn validate_collection(collection: &str) -> Result<(), DocsError> { if collection.is_empty() { return Err(DocsError::InvalidCollection); } Ok(()) } fn validate_data(data: &serde_json::Value) -> Result<(), DocsError> { if !data.is_object() { return Err(DocsError::InvalidData); } Ok(()) } impl From for DocsError { fn from(e: DocsRepoError) -> Self { Self::Backend(e.to_string()) } } impl From for DocsError { fn from(e: FilterParseError) -> Self { match e { FilterParseError::InvalidFilter(s) => Self::InvalidFilter(s), FilterParseError::UnsupportedOperator(s) => Self::UnsupportedOperator(s), } } } #[async_trait] impl DocsService for DocsServiceImpl { async fn create( &self, cx: &SdkCallCx, collection: &str, data: serde_json::Value, ) -> Result { validate_collection(collection)?; validate_data(&data)?; self.check_write(cx).await?; let row = self .repo .create(cx.app_id, collection, data.clone()) .await?; // Best-effort emit — a failed emit logs but does not roll back // 
the write (mirrors KV's pattern). if let Err(e) = self .events .emit( cx, ServiceEvent { source: "docs", op: "create", collection: Some(collection.to_string()), key: Some(row.id.to_string()), payload: Some(data), old_payload: None, }, ) .await { tracing::warn!(error = %e, source = "docs", op = "create", "event emit failed"); } Ok(row.id) } async fn get( &self, cx: &SdkCallCx, collection: &str, id: DocId, ) -> Result, DocsError> { validate_collection(collection)?; self.check_read(cx).await?; Ok(self.repo.get(cx.app_id, collection, id).await?) } async fn find( &self, cx: &SdkCallCx, collection: &str, filter: serde_json::Value, ) -> Result, DocsError> { validate_collection(collection)?; self.check_read(cx).await?; let parsed = parse_filter(&filter)?; Ok(self.repo.find(cx.app_id, collection, &parsed).await?) } async fn find_one( &self, cx: &SdkCallCx, collection: &str, filter: serde_json::Value, ) -> Result, DocsError> { validate_collection(collection)?; self.check_read(cx).await?; let mut parsed = parse_filter(&filter)?; // Inject the implicit `LIMIT 1` for find_one — explicit // caller-supplied `$limit` wins. 
if parsed.limit.is_none() { parsed.limit = Some(1); } let rows = self.repo.find(cx.app_id, collection, &parsed).await?; Ok(rows.into_iter().next()) } async fn update( &self, cx: &SdkCallCx, collection: &str, id: DocId, data: serde_json::Value, ) -> Result<(), DocsError> { validate_collection(collection)?; validate_data(&data)?; self.check_write(cx).await?; let previous = self .repo .update(cx.app_id, collection, id, data.clone()) .await?; match previous { Some(prev) => { if let Err(e) = self .events .emit( cx, ServiceEvent { source: "docs", op: "update", collection: Some(collection.to_string()), key: Some(id.to_string()), payload: Some(data), old_payload: Some(prev), }, ) .await { tracing::warn!(error = %e, source = "docs", op = "update", "event emit failed"); } Ok(()) } None => Err(DocsError::NotFound), } } async fn delete(&self, cx: &SdkCallCx, collection: &str, id: DocId) -> Result { validate_collection(collection)?; self.check_write(cx).await?; let previous = self.repo.delete(cx.app_id, collection, id).await?; let was_present = previous.is_some(); if let Some(prev) = previous { if let Err(e) = self .events .emit( cx, ServiceEvent { source: "docs", op: "delete", collection: Some(collection.to_string()), key: Some(id.to_string()), payload: None, old_payload: Some(prev), }, ) .await { tracing::warn!(error = %e, source = "docs", op = "delete", "event emit failed"); } } Ok(was_present) } async fn list( &self, cx: &SdkCallCx, collection: &str, cursor: Option<&str>, limit: u32, ) -> Result { validate_collection(collection)?; self.check_read(cx).await?; Ok(self.repo.list(cx.app_id, collection, cursor, limit).await?) } } // ---------------------------------------------------------------------------- // Tests — in-memory DocsRepo so unit tests don't need Postgres. 
// ----------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::authz::{AuthzError, AuthzRepo};
    use crate::docs_filter::DocsFilter;
    use async_trait::async_trait;
    use chrono::Utc;
    use picloud_shared::{
        AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, NoopEventEmitter, Principal,
        RequestId, UserId,
    };
    use serde_json::json;
    use std::collections::BTreeMap;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use uuid::Uuid;

    /// Composite storage key: `(app_id, collection, id)`.
    type DocKey = (AppId, String, DocId);

    /// Builds the owned map key for a document.
    fn doc_key(app_id: AppId, collection: &str, id: DocId) -> DocKey {
        (app_id, collection.to_string(), id)
    }

    /// Test double for `DocsRepo` backed by an ordered map, so the unit tests
    /// run without a database.
    #[derive(Default)]
    struct InMemoryDocsRepo {
        data: Mutex<BTreeMap<DocKey, DocRow>>,
    }

    #[async_trait]
    impl DocsRepo for InMemoryDocsRepo {
        /// Stores `data` under a freshly generated id and returns the row.
        async fn create(
            &self,
            app_id: AppId,
            collection: &str,
            data: serde_json::Value,
        ) -> Result<DocRow, DocsRepoError> {
            let id = Uuid::new_v4();
            let stamp = Utc::now();
            let row = DocRow {
                id,
                data,
                created_at: stamp,
                updated_at: stamp,
            };
            let mut store = self.data.lock().await;
            store.insert(doc_key(app_id, collection, id), row.clone());
            Ok(row)
        }

        /// Looks a row up by its full key.
        async fn get(
            &self,
            app_id: AppId,
            collection: &str,
            id: DocId,
        ) -> Result<Option<DocRow>, DocsRepoError> {
            let store = self.data.lock().await;
            Ok(store.get(&doc_key(app_id, collection, id)).cloned())
        }

        /// Filters the app's collection, then applies the filter's sort
        /// (falling back to id order) and limit.
        async fn find(
            &self,
            app_id: AppId,
            collection: &str,
            filter: &DocsFilter,
        ) -> Result<Vec<DocRow>, DocsRepoError> {
            let store = self.data.lock().await;
            let mut hits: Vec<DocRow> = store
                .iter()
                .filter(|((a, c, _), row)| {
                    *a == app_id && c == collection && in_memory_matches(row, filter)
                })
                .map(|(_, row)| row.clone())
                .collect();

            match &filter.sort {
                Some(sort) => {
                    let path = sort.path.segments();
                    hits.sort_by(|lhs, rhs| {
                        let ascending = extract_path_str(&lhs.data, path)
                            .cmp(&extract_path_str(&rhs.data, path));
                        match sort.direction {
                            crate::docs_filter::SortDir::Asc => ascending,
                            crate::docs_filter::SortDir::Desc => ascending.reverse(),
                        }
                    });
                }
                None => hits.sort_by_key(|row| row.id),
            }

            if let Some(limit) = filter.limit {
                hits.truncate(limit as usize);
            }
            Ok(hits)
        }

        /// Swaps in the new data and returns the old data; `None` when the
        /// key is absent.
        async fn update(
            &self,
            app_id: AppId,
            collection: &str,
            id: DocId,
            data: serde_json::Value,
        ) -> Result<Option<serde_json::Value>, DocsRepoError> {
            let mut store = self.data.lock().await;
            match store.get_mut(&doc_key(app_id, collection, id)) {
                None => Ok(None),
                Some(row) => {
                    let before = std::mem::replace(&mut row.data, data);
                    row.updated_at = Utc::now();
                    Ok(Some(before))
                }
            }
        }

        /// Removes the row and returns its data; `None` when the key is absent.
        async fn delete(
            &self,
            app_id: AppId,
            collection: &str,
            id: DocId,
        ) -> Result<Option<serde_json::Value>, DocsRepoError> {
            let removed = self
                .data
                .lock()
                .await
                .remove(&doc_key(app_id, collection, id));
            Ok(removed.map(|row| row.data))
        }

        /// Id-ordered page of rows whose id is greater than the cursor id.
        ///
        /// A `limit` of 0 means "no limit". `next_cursor` is the last
        /// returned id, set only when rows were cut off by the limit.
        async fn list(
            &self,
            app_id: AppId,
            collection: &str,
            cursor: Option<&str>,
            limit: u32,
        ) -> Result<DocsListPage, DocsRepoError> {
            let store = self.data.lock().await;
            let after = match cursor {
                Some(raw) => {
                    Some(Uuid::parse_str(raw).map_err(|_| DocsRepoError::InvalidCursor)?)
                }
                None => None,
            };

            let mut docs: Vec<DocRow> = store
                .iter()
                .filter(|((a, c, _), row)| {
                    *a == app_id && c == collection && after.is_none_or(|last| row.id > last)
                })
                .map(|(_, row)| row.clone())
                .collect();
            docs.sort_by_key(|row| row.id);

            let page_size = match limit {
                0 => usize::MAX,
                n => n as usize,
            };
            let has_more = docs.len() > page_size;
            if has_more {
                docs.truncate(page_size);
            }
            let next_cursor = has_more
                .then(|| docs.last().map(|row| row.id.to_string()))
                .flatten();

            Ok(DocsListPage { docs, next_cursor })
        }
    }

    /// A row matches when every condition of the filter holds. Each field is
    /// compared in its text form (see `json_text`).
    fn in_memory_matches(row: &DocRow, filter: &DocsFilter) -> bool {
        filter.conditions.iter().all(|cond| {
            let actual = extract_path_str(&row.data, cond.path.segments());
            cond_matches(actual.as_ref(), cond)
        })
    }

    /// Evaluates one condition against the text form of the field value.
    /// Ordering operators compare the two strings and fail when either side
    /// has no text form.
    fn cond_matches(actual: Option<&String>, cond: &crate::docs_filter::FieldCondition) -> bool {
        use crate::docs_filter::ComparisonOp;

        let have: Option<&str> = actual.map(String::as_str);
        let want_owned = json_text(&cond.value);
        let want: Option<&str> = want_owned.as_deref();
        let both = have.zip(want);

        match cond.op {
            ComparisonOp::Eq => have == want,
            ComparisonOp::Ne => have != want,
            ComparisonOp::Gt => both.is_some_and(|(a, b)| a > b),
            ComparisonOp::Gte => both.is_some_and(|(a, b)| a >= b),
            ComparisonOp::Lt => both.is_some_and(|(a, b)| a < b),
            ComparisonOp::Lte => both.is_some_and(|(a, b)| a <= b),
            // A non-array operand never matches.
            ComparisonOp::In => cond.value.as_array().is_some_and(|candidates| {
                candidates
                    .iter()
                    .any(|candidate| have == json_text(candidate).as_deref())
            }),
        }
    }

    /// Walks `segments` through nested objects and returns the text form of
    /// the value found; `None` if any step is missing or not an object.
    fn extract_path_str(value: &serde_json::Value, segments: &[String]) -> Option<String> {
        segments
            .iter()
            .try_fold(value, |node, seg| node.as_object()?.get(seg))
            .and_then(json_text)
    }

    /// Text form of a JSON value: `None` for null, the raw contents for a
    /// string, and the `to_string()` rendering for everything else.
    fn json_text(v: &serde_json::Value) -> Option<String> {
        use serde_json::Value;
        match v {
            Value::Null => None,
            Value::String(text) => Some(text.clone()),
            Value::Bool(flag) => Some(flag.to_string()),
            Value::Number(num) => Some(num.to_string()),
            Value::Array(_) | Value::Object(_) => Some(v.to_string()),
        }
    }

    /// Authz stub that reports no app membership for anyone.
    #[derive(Default)]
    struct DenyingAuthzRepo;

    #[async_trait]
    impl AuthzRepo for DenyingAuthzRepo {
        async fn membership(
            &self,
            _user_id: UserId,
            _app_id: AppId,
        ) -> Result<Option<AppRole>, AuthzError> {
            Ok(None)
        }
    }

    /// Authz stub that reports `AppRole::Editor` membership for everyone.
    #[derive(Default)]
    struct AllowingAuthzRepo;

    #[async_trait]
    impl AuthzRepo for AllowingAuthzRepo {
        async fn membership(
            &self,
            _user_id: UserId,
            _app_id: AppId,
        ) -> Result<Option<AppRole>, AuthzError> {
            Ok(Some(AppRole::Editor))
        }
    }

    /// Call context for `app_id` with the given principal and fresh ids.
    fn cx_with(app_id: AppId, principal: Option<Principal>) -> SdkCallCx {
        SdkCallCx {
            app_id,
            principal,
            execution_id: ExecutionId::new(),
            request_id: RequestId::new(),
            trigger_depth: 0,
            root_execution_id: ExecutionId::new(),
            is_dead_letter_handler: false,
            event: None,
        }
    }

    /// Principal with a fresh user id, the given instance role, and no
    /// scopes or app binding.
    fn principal_with(instance_role: InstanceRole) -> Principal {
        Principal {
            user_id: AdminUserId::new(),
            instance_role,
            scopes: None,
            app_binding: None,
        }
    }

    /// Context with no principal.
    fn anon_cx(app_id: AppId) -> SdkCallCx {
        cx_with(app_id, None)
    }

    /// Context whose principal has the instance `Owner` role.
    fn owner_cx(app_id: AppId) -> SdkCallCx {
        cx_with(app_id, Some(principal_with(InstanceRole::Owner)))
    }

    /// Context whose principal has the instance `Member` role; any app role
    /// comes from the authz repo in use.
    fn member_no_role_cx(app_id: AppId) -> SdkCallCx {
        cx_with(app_id, Some(principal_with(InstanceRole::Member)))
    }

    /// Service over an empty in-memory repo with the denying authz stub.
    fn svc() -> DocsServiceImpl {
        DocsServiceImpl::new(
            Arc::new(InMemoryDocsRepo::default()),
            Arc::new(DenyingAuthzRepo),
            Arc::new(NoopEventEmitter),
        )
    }

    /// Service over an empty in-memory repo with the allowing authz stub.
    fn svc_allowing() -> DocsServiceImpl {
        DocsServiceImpl::new(
            Arc::new(InMemoryDocsRepo::default()),
            Arc::new(AllowingAuthzRepo),
            Arc::new(NoopEventEmitter),
        )
    }

    #[tokio::test]
    async fn create_then_get_round_trips() {
        let service = svc();
        let cx = anon_cx(AppId::new());
        let payload = json!({ "name": "Alice" });

        let id = service
            .create(&cx, "users", payload.clone())
            .await
            .unwrap();
        let fetched = service.get(&cx, "users", id).await.unwrap().unwrap();

        assert_eq!(fetched.id, id);
        assert_eq!(fetched.data, payload);
    }

    #[tokio::test]
    async fn get_missing_returns_none() {
        let service = svc();
        let cx = anon_cx(AppId::new());

        let fetched = service.get(&cx, "users", Uuid::new_v4()).await.unwrap();

        assert!(fetched.is_none());
    }

    #[tokio::test]
    async fn update_missing_returns_not_found() {
        let service = svc();
        let cx = anon_cx(AppId::new());

        let outcome = service
            .update(&cx, "users", Uuid::new_v4(), json!({ "x": 1 }))
            .await;

        assert!(matches!(outcome.unwrap_err(), DocsError::NotFound));
    }

    #[tokio::test]
    async fn delete_missing_returns_false() {
        let service = svc();
        let cx = anon_cx(AppId::new());

        let removed = service
            .delete(&cx, "users", Uuid::new_v4())
            .await
            .unwrap();

        assert!(!removed);
    }

    #[tokio::test]
    async fn delete_present_returns_true() {
        let service = svc();
        let cx = anon_cx(AppId::new());
        let id = service
            .create(&cx, "users", json!({ "x": 1 }))
            .await
            .unwrap();

        let removed = service.delete(&cx, "users", id).await.unwrap();

        assert!(removed);
    }

    #[tokio::test]
    async fn update_present_succeeds() {
        let service = svc();
        let cx = anon_cx(AppId::new());
        let id = service
            .create(&cx, "users", json!({ "x": 1 }))
            .await
            .unwrap();

        service
            .update(&cx, "users", id, json!({ "x": 2 }))
            .await
            .unwrap();

        let fetched = service.get(&cx, "users", id).await.unwrap().unwrap();
        assert_eq!(fetched.data, json!({ "x": 2 }));
    }

    #[tokio::test]
    async fn empty_collection_rejected() {
        let service = svc();
        let cx = anon_cx(AppId::new());

        let outcome = service.create(&cx, "", json!({})).await;

        assert!(matches!(outcome.unwrap_err(), DocsError::InvalidCollection));
    }

    #[tokio::test]
    async fn create_with_non_object_data_rejected() {
        let service = svc();
        let cx = anon_cx(AppId::new());

        let outcome = service.create(&cx, "users", json!(42)).await;

        assert!(matches!(outcome.unwrap_err(), DocsError::InvalidData));
    }

    #[tokio::test]
    async fn update_with_non_object_data_rejected() {
        let service = svc();
        let cx = anon_cx(AppId::new());
        let id = service
            .create(&cx, "users", json!({ "x": 1 }))
            .await
            .unwrap();

        let outcome = service
            .update(&cx, "users", id, json!("not an object"))
            .await;

        assert!(matches!(outcome.unwrap_err(), DocsError::InvalidData));
    }

    /// Isolation boundary: documents created under one `cx.app_id` must be
    /// invisible under another, even with the same collection name. Checked
    /// through both `get` and `find`.
    #[tokio::test]
    async fn cross_app_isolation_via_cx_app_id() {
        let service = svc();
        let cx_a = anon_cx(AppId::new());
        let cx_b = anon_cx(AppId::new());

        let id_a = service
            .create(&cx_a, "shared", json!({ "from": "a" }))
            .await
            .unwrap();
        let id_b = service
            .create(&cx_b, "shared", json!({ "from": "b" }))
            .await
            .unwrap();
        assert_ne!(id_a, id_b);

        // Direct lookups across the boundary come back empty.
        assert!(service.get(&cx_a, "shared", id_b).await.unwrap().is_none());
        assert!(service.get(&cx_b, "shared", id_a).await.unwrap().is_none());

        // An unfiltered find returns exactly the caller's own document.
        for (cx, own_id) in [(&cx_a, id_a), (&cx_b, id_b)] {
            let visible = service.find(cx, "shared", json!({})).await.unwrap();
            assert_eq!(visible.len(), 1);
            assert_eq!(visible[0].id, own_id);
        }
    }

    #[tokio::test]
    async fn anonymous_cx_skips_authz() {
        // The authz stub denies everyone, but the context has no principal,
        // so the gate is never consulted and writes go through.
        let service = svc();
        let cx = anon_cx(AppId::new());

        let id = service
            .create(&cx, "users", json!({ "x": 1 }))
            .await
            .unwrap();
        let _ = service.delete(&cx, "users", id).await.unwrap();
    }

    #[tokio::test]
    async fn authed_cx_with_no_role_is_forbidden_on_write() {
        let service = svc();
        let cx = member_no_role_cx(AppId::new());

        let outcome = service.create(&cx, "users", json!({ "x": 1 })).await;

        assert!(matches!(outcome.unwrap_err(), DocsError::Forbidden));
    }

    #[tokio::test]
    async fn authed_cx_with_no_role_is_forbidden_on_read() {
        let service = svc();
        let cx = member_no_role_cx(AppId::new());

        let outcome = service.get(&cx, "users", Uuid::new_v4()).await;

        assert!(matches!(outcome.unwrap_err(), DocsError::Forbidden));
    }

    #[tokio::test]
    async fn owner_principal_can_write() {
        let service = svc();
        let cx = owner_cx(AppId::new());

        let _ = service
            .create(&cx, "users", json!({ "x": 1 }))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn editor_member_can_write_via_role() {
        // The allowing stub reports Editor membership, which is expected to
        // satisfy the docs-write capability.
        let service = svc_allowing();
        let cx = member_no_role_cx(AppId::new());

        let _ = service
            .create(&cx, "users", json!({ "x": 1 }))
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn find_with_equality_returns_matches() {
        let service = svc();
        let cx = anon_cx(AppId::new());
        for tier in ["gold", "silver", "gold"] {
            service
                .create(&cx, "users", json!({ "tier": tier }))
                .await
                .unwrap();
        }

        let golds = service
            .find(&cx, "users", json!({ "tier": "gold" }))
            .await
            .unwrap();

        assert_eq!(golds.len(), 2);
    }

    #[tokio::test]
    async fn find_one_returns_first_or_none() {
        let service = svc();
        let cx = anon_cx(AppId::new());
        service
            .create(&cx, "users", json!({ "tier": "gold" }))
            .await
            .unwrap();

        let hit = service
            .find_one(&cx, "users", json!({ "tier": "gold" }))
            .await
            .unwrap();
        assert!(hit.is_some());

        let miss = service
            .find_one(&cx, "users", json!({ "tier": "platinum" }))
            .await
            .unwrap();
        assert!(miss.is_none());
    }

    #[tokio::test]
    async fn find_with_unsupported_operator_throws() {
        let service = svc();
        let cx = anon_cx(AppId::new());

        let err = service
            .find(&cx, "users", json!({ "name": { "$regex": "^A" } }))
            .await
            .unwrap_err();

        match err {
            DocsError::UnsupportedOperator(message) => {
                assert!(message.contains("$regex"));
                assert!(message.contains("v1.2"));
            }
            other => panic!("expected UnsupportedOperator, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn find_with_invalid_filter_throws() {
        let service = svc();
        let cx = anon_cx(AppId::new());

        let outcome = service
            .find(&cx, "users", json!({ "a.b.c.d.e.f": "x" }))
            .await;

        assert!(matches!(outcome.unwrap_err(), DocsError::InvalidFilter(_)));
    }

    #[tokio::test]
    async fn find_with_dollar_in_returns_subset() {
        let service = svc();
        let cx = anon_cx(AppId::new());
        for tier in ["gold", "silver", "platinum"] {
            service
                .create(&cx, "users", json!({ "tier": tier }))
                .await
                .unwrap();
        }

        let hits = service
            .find(
                &cx,
                "users",
                json!({ "tier": { "$in": ["gold", "platinum"] } }),
            )
            .await
            .unwrap();

        assert_eq!(hits.len(), 2);
    }

    #[tokio::test]
    async fn find_one_explicit_limit_is_honoured() {
        // The service only fills in limit=1 when the filter carries no
        // `$limit`; with an explicit `$limit: 5`, find_one still yields the
        // first row.
        let service = svc();
        let cx = anon_cx(AppId::new());
        for _ in 0..3 {
            service
                .create(&cx, "users", json!({ "tier": "gold" }))
                .await
                .unwrap();
        }

        let hit = service
            .find_one(&cx, "users", json!({ "tier": "gold", "$limit": 5 }))
            .await
            .unwrap();

        assert!(hit.is_some());
    }

    #[tokio::test]
    async fn list_cursor_pagination() {
        let service = svc();
        let cx = anon_cx(AppId::new());
        let mut ids = Vec::new();
        for _ in 0..5 {
            ids.push(service.create(&cx, "users", json!({})).await.unwrap());
        }
        ids.sort();

        // Five rows at two per page: 2 + 2 + 1, cursor absent after the last.
        let first = service.list(&cx, "users", None, 2).await.unwrap();
        assert_eq!(first.docs.len(), 2);
        assert!(first.next_cursor.is_some());

        let second = service
            .list(&cx, "users", first.next_cursor.as_deref(), 2)
            .await
            .unwrap();
        assert_eq!(second.docs.len(), 2);

        let third = service
            .list(&cx, "users", second.next_cursor.as_deref(), 2)
            .await
            .unwrap();
        assert_eq!(third.docs.len(), 1);
        assert!(third.next_cursor.is_none());
    }

    #[tokio::test]
    async fn noop_emitter_does_not_block_mutations() {
        // Pins the v1.1.0 contract: the service calls `emit().await` on every
        // mutation, and all three mutations must still succeed when the
        // emitter is `NoopEventEmitter`.
        let service = svc();
        let cx = anon_cx(AppId::new());

        let id = service
            .create(&cx, "users", json!({ "x": 1 }))
            .await
            .unwrap();
        service
            .update(&cx, "users", id, json!({ "x": 2 }))
            .await
            .unwrap();
        let _ = service.delete(&cx, "users", id).await.unwrap();
    }
}