diff --git a/crates/executor-core/src/engine.rs b/crates/executor-core/src/engine.rs index 445a0a5..b45abbe 100644 --- a/crates/executor-core/src/engine.rs +++ b/crates/executor-core/src/engine.rs @@ -348,6 +348,7 @@ fn build_ctx_map(req: &ExecRequest) -> Map { /// `docs/v1.1.x-design-notes.md` §4 (the dead-letter sub-shape) and /// §2/blueprint §9 (KV). Each variant becomes a Rhai map with a /// `source` discriminant plus per-source fields. +#[allow(clippy::too_many_lines)] fn trigger_event_to_dynamic(event: &TriggerEvent) -> Dynamic { let mut m = Map::new(); m.insert("source".into(), event.source().into()); @@ -406,6 +407,33 @@ fn trigger_event_to_dynamic(event: &TriggerEvent) -> Dynamic { cron_map.insert("fired_at".into(), fired_at.to_rfc3339().into()); m.insert("cron".into(), cron_map.into()); } + TriggerEvent::Files { + op, + collection, + id, + name, + content_type, + size, + checksum, + prev, + } => { + m.insert("op".into(), op.as_str().into()); + let mut files_map = Map::new(); + files_map.insert("collection".into(), collection.clone().into()); + files_map.insert("id".into(), id.clone().into()); + files_map.insert("name".into(), name.clone().into()); + files_map.insert("content_type".into(), content_type.clone().into()); + files_map.insert( + "size".into(), + i64::try_from(*size).unwrap_or(i64::MAX).into(), + ); + files_map.insert("checksum".into(), checksum.clone().into()); + files_map.insert( + "prev".into(), + prev.clone().map_or(Dynamic::UNIT, json_to_dynamic), + ); + m.insert("files".into(), files_map.into()); + } TriggerEvent::DeadLetter { dead_letter_id, original, diff --git a/crates/executor-core/src/sdk/files.rs b/crates/executor-core/src/sdk/files.rs new file mode 100644 index 0000000..fa146ec --- /dev/null +++ b/crates/executor-core/src/sdk/files.rs @@ -0,0 +1,281 @@ +//! `files::` Rhai bridge — collection-scoped handle pattern (v1.1.5). +//! +//! ```rhai +//! let avatars = files::collection("avatars"); +//! let id = avatars.create(#{ name: "a.jpg", content_type: "image/jpeg", data: blob }); +//! let meta = avatars.head(id); // metadata map or () +//! let bytes = avatars.get(id); // Blob or () +//! avatars.update(id, #{ data: new_bytes }); +//! let gone = avatars.delete(id); // bool (was-present) +//! let page = avatars.list(); // #{ files: [...], next_cursor: () } +//! ``` +//! +//! The `FilesHandle` custom Rhai type captures the collection name once +//! and routes each call through the injected `Arc` +//! with the per-call `Arc`. **The service derives `app_id` +//! from `cx.app_id` — it never appears in any signature script-side, +//! preserving cross-app isolation.** +//! +//! Error convention (per `docs/sdk-shape.md`): `create`/`update`/ +//! `delete` throw on failure; `get`/`head` return `()` for a missing +//! file; `delete` returns `bool` (was-present). The blob bytes are a +//! Rhai `Blob` (byte array) in both directions. + +use std::sync::Arc; + +use picloud_shared::{ + FileMeta, FileUpdate, FilesError, FilesService, NewFile, SdkCallCx, Services, +}; +use rhai::{Array, Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module}; +use tokio::runtime::Handle as TokioHandle; + +/// Per-call handle captured by the Rhai SDK. Cheap to clone (two Arcs +/// plus an owned string). +#[derive(Clone)] +pub struct FilesHandle { + collection: String, + service: Arc, + cx: Arc, +} + +pub(super) fn register(engine: &mut RhaiEngine, services: &Services, cx: Arc) { + let files_service = services.files.clone(); + + let mut module = Module::new(); + { + let files_service = files_service.clone(); + let cx = cx.clone(); + module.set_native_fn( + "collection", + move |name: &str| -> Result> { + if name.is_empty() { + return Err("files::collection name must not be empty".into()); + } + Ok(FilesHandle { + collection: name.to_string(), + service: files_service.clone(), + cx: cx.clone(), + }) + }, + ); + } + engine.register_static_module("files", module.into()); + + engine.register_type_with_name::("FilesHandle"); + + register_create(engine); + register_head(engine); + register_get(engine); + register_update(engine); + register_delete(engine); + register_list(engine); +} + +fn register_create(engine: &mut RhaiEngine) { + engine.register_fn( + "create", + |handle: &mut FilesHandle, meta: Map| -> Result> { + let name = require_string(&meta, "name")?; + let content_type = require_string(&meta, "content_type")?; + let data = require_blob(&meta, "data")?; + let h = handle.clone(); + let new = NewFile { + name, + content_type, + data, + }; + let id = block_on(async move { h.service.create(&h.cx, &h.collection, new).await })?; + Ok(id.to_string()) + }, + ); +} + +fn register_head(engine: &mut RhaiEngine) { + engine.register_fn( + "head", + |handle: &mut FilesHandle, id: &str| -> Result> { + let h = handle.clone(); + let id = id.to_string(); + let meta = block_on(async move { h.service.head(&h.cx, &h.collection, &id).await })?; + Ok(meta.map_or(Dynamic::UNIT, |m| file_meta_to_map(&m).into())) + }, + ); +} + +fn register_get(engine: &mut RhaiEngine) { + engine.register_fn( + "get", + |handle: &mut FilesHandle, id: &str| -> Result> { + let h = handle.clone(); + let id = id.to_string(); + let bytes = block_on(async move { h.service.get(&h.cx, &h.collection, &id).await })?; + Ok(bytes.map_or(Dynamic::UNIT, Dynamic::from_blob)) + }, + ); +} + +fn register_update(engine: &mut RhaiEngine) { + engine.register_fn( + "update", + |handle: &mut FilesHandle, id: &str, meta: Map| -> Result<(), Box> { + let data = require_blob(&meta, "data")?; + let name = optional_string(&meta, "name")?; + let content_type = optional_string(&meta, "content_type")?; + let h = handle.clone(); + let id = id.to_string(); + let upd = FileUpdate { + data, + name, + content_type, + }; + block_on(async move { h.service.update(&h.cx, &h.collection, &id, upd).await }) + }, + ); +} + +fn register_delete(engine: &mut RhaiEngine) { + engine.register_fn( + "delete", + |handle: &mut FilesHandle, id: &str| -> Result> { + let h = handle.clone(); + let id = id.to_string(); + block_on(async move { h.service.delete(&h.cx, &h.collection, &id).await }) + }, + ); +} + +fn register_list(engine: &mut RhaiEngine) { + engine.register_fn( + "list", + |handle: &mut FilesHandle| -> Result> { + list_call(handle, None, 0) + }, + ); + engine.register_fn( + "list", + |handle: &mut FilesHandle, cursor: &str| -> Result> { + list_call(handle, Some(cursor.to_string()), 0) + }, + ); + engine.register_fn( + "list", + |handle: &mut FilesHandle, cursor: &str, limit: i64| -> Result> { + let limit = u32::try_from(limit.max(0)).unwrap_or(0); + list_call(handle, Some(cursor.to_string()), limit) + }, + ); + // `list(#{ cursor, limit })` — the map form documented in the brief. + engine.register_fn( + "list", + |handle: &mut FilesHandle, opts: Map| -> Result> { + let cursor = match opts.get("cursor") { + Some(v) if !v.is_unit() => { + Some(v.clone().into_string().map_err(|_| -> Box { + "files: list cursor must be a string".into() + })?) + } + _ => None, + }; + let limit = match opts.get("limit") { + Some(v) if !v.is_unit() => { + u32::try_from(v.as_int().unwrap_or(0).max(0)).unwrap_or(0) + } + _ => 0, + }; + list_call(handle, cursor, limit) + }, + ); +} + +fn list_call( + handle: &FilesHandle, + cursor: Option, + limit: u32, +) -> Result> { + let h = handle.clone(); + let page = block_on(async move { + h.service + .list(&h.cx, &h.collection, cursor.as_deref(), limit) + .await + })?; + let mut m = Map::new(); + let files: Array = page + .files + .iter() + .map(|meta| Dynamic::from(file_meta_to_map(meta))) + .collect(); + m.insert("files".into(), files.into()); + m.insert( + "next_cursor".into(), + page.next_cursor.map_or(Dynamic::UNIT, Dynamic::from), + ); + Ok(m) +} + +/// Render a `FileMeta` into the Rhai map shape scripts see from +/// `head` / `list`. +fn file_meta_to_map(meta: &FileMeta) -> Map { + let mut m = Map::new(); + m.insert("id".into(), meta.id.to_string().into()); + m.insert("collection".into(), meta.collection.clone().into()); + m.insert("name".into(), meta.name.clone().into()); + m.insert("content_type".into(), meta.content_type.clone().into()); + m.insert( + "size".into(), + i64::try_from(meta.size).unwrap_or(i64::MAX).into(), + ); + m.insert("checksum".into(), meta.checksum.clone().into()); + m.insert("created_at".into(), meta.created_at.to_rfc3339().into()); + m.insert("updated_at".into(), meta.updated_at.to_rfc3339().into()); + m +} + +/// Pull a required string field out of a Rhai map; throw naming the +/// field if it's absent or not a string. +fn require_string(meta: &Map, field: &'static str) -> Result> { + match meta.get(field) { + Some(v) if v.is_string() => Ok(v.clone().into_string().unwrap_or_default()), + Some(_) => Err(format!("files::create: field '{field}' must be a string").into()), + None => Err(format!("files::create: missing required field '{field}'").into()), + } +} + +/// Pull an optional string field; `None` when the key is absent or unit. +fn optional_string(meta: &Map, field: &'static str) -> Result, Box> { + match meta.get(field) { + None => Ok(None), + Some(v) if v.is_unit() => Ok(None), + Some(v) if v.is_string() => Ok(Some(v.clone().into_string().unwrap_or_default())), + Some(_) => Err(format!("files::update: field '{field}' must be a string").into()), + } +} + +/// Pull a required blob (`data`) out of a Rhai map; throw naming the +/// field if it's absent or not a blob. +fn require_blob(meta: &Map, field: &'static str) -> Result, Box> { + match meta.get(field) { + Some(v) if v.is_blob() => Ok(v.clone().into_blob().unwrap_or_default()), + Some(_) => Err(format!("files: field '{field}' must be a Blob (byte array)").into()), + None => Err(format!("files: missing required field '{field}'").into()), + } +} + +/// Run an async future inside the synchronous Rhai context. Mirrors +/// `kv::block_on`; safe because `LocalExecutorClient` runs the script +/// under `spawn_blocking`, so a runtime handle is reachable. +fn block_on(fut: F) -> Result> +where + F: std::future::Future> + Send, + T: Send, +{ + let handle = TokioHandle::try_current().map_err(|e| -> Box { + EvalAltResult::ErrorRuntime( + format!("files: no tokio runtime available: {e}").into(), + rhai::Position::NONE, + ) + .into() + })?; + handle.block_on(fut).map_err(|err| -> Box { + EvalAltResult::ErrorRuntime(format!("files: {err}").into(), rhai::Position::NONE).into() + }) +} diff --git a/crates/executor-core/src/sdk/mod.rs b/crates/executor-core/src/sdk/mod.rs index 138fa73..b1387f7 100644 --- a/crates/executor-core/src/sdk/mod.rs +++ b/crates/executor-core/src/sdk/mod.rs @@ -15,6 +15,7 @@ pub mod bridge; pub mod cx; pub mod dead_letters; pub mod docs; +pub mod files; pub mod http; pub mod kv; pub mod stdlib; @@ -37,5 +38,6 @@ pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc) -> Services { Arc::new(NoopEventEmitter), modules, Arc::new(NoopHttpService), + Arc::new(picloud_shared::NoopFilesService), ) } diff --git a/crates/executor-core/tests/sdk_docs.rs b/crates/executor-core/tests/sdk_docs.rs index 62c6e74..44512e2 100644 --- a/crates/executor-core/tests/sdk_docs.rs +++ b/crates/executor-core/tests/sdk_docs.rs @@ -228,6 +228,7 @@ fn make_engine() -> Arc { Arc::new(NoopEventEmitter), Arc::new(NoopModuleSource), Arc::new(NoopHttpService), + Arc::new(picloud_shared::NoopFilesService), ); Arc::new(Engine::new(Limits::default(), services)) } diff --git a/crates/executor-core/tests/sdk_files.rs b/crates/executor-core/tests/sdk_files.rs new file mode 100644 index 0000000..6bf15ab --- /dev/null +++ b/crates/executor-core/tests/sdk_files.rs @@ -0,0 +1,333 @@ +//! `files::` SDK bridge integration tests — runs a real Rhai engine +//! against an in-memory `FilesService` impl. Mirrors `tests/sdk_kv.rs`: +//! `tokio::task::spawn_blocking` so the bridge's `block_on` has a +//! reachable runtime. Exercises the actual Rhai surface — blob in/out, +//! the metadata map shape, and the missing-required-field throw. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use async_trait::async_trait; +use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits}; +use picloud_shared::{ + AppId, ExecutionId, FileMeta, FileUpdate, FilesError, FilesListPage, FilesService, NewFile, + NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopHttpService, NoopKvService, + NoopModuleSource, RequestId, ScriptId, ScriptSandbox, SdkCallCx, Services, +}; +use serde_json::{json, Value}; +use tokio::sync::Mutex; +use uuid::Uuid; + +#[derive(Default)] +struct InMemoryFiles { + #[allow(clippy::type_complexity)] + data: Mutex)>>, +} + +/// The in-memory fake doesn't exercise the real checksum path (the +/// `FsFilesRepo` tempdir tests in manager-core cover SHA-256); a stable +/// placeholder keeps the metadata map non-empty. +fn fake_checksum(bytes: &[u8]) -> String { + format!("len-{}", bytes.len()) +} + +#[async_trait] +impl FilesService for InMemoryFiles { + async fn create( + &self, + cx: &SdkCallCx, + collection: &str, + new: NewFile, + ) -> Result { + if collection.is_empty() { + return Err(FilesError::InvalidCollection("empty".into())); + } + new.validate(100 * 1024 * 1024)?; + let id = Uuid::new_v4(); + let now = chrono::Utc::now(); + let meta = FileMeta { + id, + collection: collection.to_string(), + name: new.name.clone(), + content_type: new.content_type.clone(), + size: new.data.len() as u64, + checksum: fake_checksum(&new.data), + created_at: now, + updated_at: now, + }; + self.data + .lock() + .await + .insert((cx.app_id, collection.to_string(), id), (meta, new.data)); + Ok(id) + } + + async fn head( + &self, + cx: &SdkCallCx, + collection: &str, + id: &str, + ) -> Result, FilesError> { + let Ok(uuid) = Uuid::parse_str(id) else { + return Ok(None); + }; + Ok(self + .data + .lock() + .await + .get(&(cx.app_id, collection.to_string(), uuid)) + .map(|(m, _)| m.clone())) + } + + async fn get( + &self, + cx: &SdkCallCx, + collection: &str, + id: &str, + ) -> Result>, FilesError> { + let Ok(uuid) = Uuid::parse_str(id) else { + return Ok(None); + }; + Ok(self + .data + .lock() + .await + .get(&(cx.app_id, collection.to_string(), uuid)) + .map(|(_, b)| b.clone())) + } + + async fn update( + &self, + cx: &SdkCallCx, + collection: &str, + id: &str, + upd: FileUpdate, + ) -> Result<(), FilesError> { + upd.validate(100 * 1024 * 1024)?; + let Ok(uuid) = Uuid::parse_str(id) else { + return Err(FilesError::NotFound); + }; + let mut data = self.data.lock().await; + let key = (cx.app_id, collection.to_string(), uuid); + let Some((meta, _)) = data.get(&key).cloned() else { + return Err(FilesError::NotFound); + }; + let mut meta = meta; + if let Some(n) = upd.name { + meta.name = n; + } + if let Some(ct) = upd.content_type { + meta.content_type = ct; + } + meta.size = upd.data.len() as u64; + meta.checksum = fake_checksum(&upd.data); + data.insert(key, (meta, upd.data)); + Ok(()) + } + + async fn delete(&self, cx: &SdkCallCx, collection: &str, id: &str) -> Result { + let Ok(uuid) = Uuid::parse_str(id) else { + return Ok(false); + }; + Ok(self + .data + .lock() + .await + .remove(&(cx.app_id, collection.to_string(), uuid)) + .is_some()) + } + + async fn list( + &self, + cx: &SdkCallCx, + collection: &str, + _cursor: Option<&str>, + _limit: u32, + ) -> Result { + let data = self.data.lock().await; + let files: Vec = data + .iter() + .filter(|((a, c, _), _)| *a == cx.app_id && c == collection) + .map(|(_, (m, _))| m.clone()) + .collect(); + Ok(FilesListPage { + files, + next_cursor: None, + }) + } +} + +fn make_engine() -> Arc { + let services = Services::new( + Arc::new(NoopKvService), + Arc::new(NoopDocsService), + Arc::new(NoopDeadLetterService), + Arc::new(NoopEventEmitter), + Arc::new(NoopModuleSource), + Arc::new(NoopHttpService), + Arc::new(InMemoryFiles::default()), + ); + Arc::new(Engine::new(Limits::default(), services)) +} + +fn baseline_request(app_id: AppId) -> ExecRequest { + let execution_id = ExecutionId::new(); + ExecRequest { + execution_id, + request_id: RequestId::new(), + script_id: ScriptId::new(), + script_name: "files-test".into(), + invocation_type: InvocationType::Http, + path: "/files-test".into(), + headers: BTreeMap::new(), + body: Value::Null, + params: BTreeMap::new(), + query: BTreeMap::new(), + rest: String::new(), + sandbox_overrides: ScriptSandbox::default(), + app_id, + principal: None, + trigger_depth: 0, + root_execution_id: execution_id, + is_dead_letter_handler: false, + event: None, + } +} + +async fn run_script(engine: Arc, src: &str, req: ExecRequest) -> Value { + let src = src.to_string(); + tokio::task::spawn_blocking(move || engine.execute(&src, req)) + .await + .expect("spawn_blocking should not panic") + .expect("script execution should succeed") + .body +} + +async fn run_script_err(engine: Arc, src: &str, req: ExecRequest) -> String { + let src = src.to_string(); + let res = tokio::task::spawn_blocking(move || engine.execute(&src, req)) + .await + .expect("spawn_blocking should not panic"); + format!("{:?}", res.expect_err("script should error")) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn files_create_get_round_trip_via_blob() { + let engine = make_engine(); + let app = AppId::new(); + // base64("hello") = "aGVsbG8="; decode → blob; create; get back; encode. + let src = r#" + let c = files::collection("avatars"); + let data = base64::decode("aGVsbG8="); + let id = c.create(#{ name: "a.txt", content_type: "text/plain", data: data }); + let back = c.get(id); + base64::encode(back) + "#; + let body = run_script(engine, src, baseline_request(app)).await; + assert_eq!(body, json!("aGVsbG8=")); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn files_head_returns_metadata_map() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#" + let c = files::collection("avatars"); + let data = base64::decode("aGVsbG8="); + let id = c.create(#{ name: "a.txt", content_type: "text/plain", data: data }); + let meta = c.head(id); + #{ name: meta.name, content_type: meta.content_type, size: meta.size, has_checksum: meta.checksum != () } + "#; + let body = run_script(engine, src, baseline_request(app)).await; + assert_eq!( + body, + json!({ "name": "a.txt", "content_type": "text/plain", "size": 5, "has_checksum": true }) + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn files_get_and_head_missing_return_unit() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#" + let c = files::collection("avatars"); + let g = c.get("00000000-0000-0000-0000-000000000000"); + let h = c.head("00000000-0000-0000-0000-000000000000"); + #{ g: g == (), h: h == () } + "#; + let body = run_script(engine, src, baseline_request(app)).await; + assert_eq!(body, json!({ "g": true, "h": true })); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn files_update_then_delete() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#" + let c = files::collection("avatars"); + let id = c.create(#{ name: "a", content_type: "text/plain", data: base64::decode("YQ==") }); + c.update(id, #{ data: base64::decode("YmM=") }); // "bc" + let after = base64::encode(c.get(id)); + let removed = c.delete(id); + let gone = c.delete(id); + #{ after: after, removed: removed, gone: gone } + "#; + let body = run_script(engine, src, baseline_request(app)).await; + assert_eq!( + body, + json!({ "after": "YmM=", "removed": true, "gone": false }) + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn files_create_missing_data_throws_naming_field() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#" + let c = files::collection("avatars"); + c.create(#{ name: "a", content_type: "text/plain" }) + "#; + let err = run_script_err(engine, src, baseline_request(app)).await; + assert!( + err.contains("data"), + "error should name the missing field: {err}" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn files_create_missing_name_throws_naming_field() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#" + let c = files::collection("avatars"); + c.create(#{ content_type: "text/plain", data: base64::decode("YQ==") }) + "#; + let err = run_script_err(engine, src, baseline_request(app)).await; + assert!( + err.contains("name"), + "error should name the missing field: {err}" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn files_empty_collection_name_throws() { + let engine = make_engine(); + let app = AppId::new(); + let err = run_script_err(engine, r#"files::collection("")"#, baseline_request(app)).await; + assert!(err.to_lowercase().contains("empty"), "got {err}"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn files_list_returns_files_array() { + let engine = make_engine(); + let app = AppId::new(); + let src = r#" + let c = files::collection("avatars"); + c.create(#{ name: "a", content_type: "text/plain", data: base64::decode("YQ==") }); + c.create(#{ name: "b", content_type: "text/plain", data: base64::decode("Yg==") }); + let page = c.list(); + page.files.len() + "#; + let body = run_script(engine, src, baseline_request(app)).await; + assert_eq!(body, json!(2)); +} diff --git a/crates/executor-core/tests/sdk_http.rs b/crates/executor-core/tests/sdk_http.rs index 11a0f2c..447246a 100644 --- a/crates/executor-core/tests/sdk_http.rs +++ b/crates/executor-core/tests/sdk_http.rs @@ -88,6 +88,7 @@ fn engine_with(http: Arc) -> Arc { Arc::new(NoopEventEmitter), Arc::new(NoopModuleSource), http, + Arc::new(picloud_shared::NoopFilesService), ); Arc::new(Engine::new(Limits::default(), services)) } diff --git a/crates/executor-core/tests/sdk_kv.rs b/crates/executor-core/tests/sdk_kv.rs index 343e0ad..a8706cd 100644 --- a/crates/executor-core/tests/sdk_kv.rs +++ b/crates/executor-core/tests/sdk_kv.rs @@ -107,6 +107,7 @@ fn make_engine() -> Arc { Arc::new(NoopEventEmitter), Arc::new(NoopModuleSource), Arc::new(NoopHttpService), + Arc::new(picloud_shared::NoopFilesService), ); Arc::new(Engine::new(Limits::default(), services)) } diff --git a/crates/manager-core/migrations/0018_files.sql b/crates/manager-core/migrations/0018_files.sql new file mode 100644 index 0000000..e94ab1e --- /dev/null +++ b/crates/manager-core/migrations/0018_files.sql @@ -0,0 +1,25 @@ +-- v1.1.5: filesystem-backed blob storage. The row holds metadata + +-- the SHA-256 checksum; the blob bytes live on disk at +-- /files//// +-- (never in Postgres). Identity tuple is (app_id, collection, id) per +-- docs/sdk-shape.md, matching KV/docs collection scoping. +-- +-- The checksum is computed in a single pass during the atomic write and +-- re-verified on read (FilesError::Corrupted on mismatch). Per-app +-- quotas are deferred to v1.2; only the per-file size cap is enforced +-- (in the service, not the schema). +CREATE TABLE files ( + app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE, + collection TEXT NOT NULL, + id UUID NOT NULL, + name TEXT NOT NULL, + content_type TEXT NOT NULL, + size_bytes BIGINT NOT NULL, + checksum_sha256 TEXT NOT NULL, -- hex, 64 chars, lowercase + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (app_id, collection, id) +); + +-- List + cursor pagination scans by (app_id, collection). +CREATE INDEX idx_files_app_collection ON files (app_id, collection); diff --git a/crates/manager-core/migrations/0019_files_triggers.sql b/crates/manager-core/migrations/0019_files_triggers.sql new file mode 100644 index 0000000..71d1039 --- /dev/null +++ b/crates/manager-core/migrations/0019_files_triggers.sql @@ -0,0 +1,29 @@ +-- v1.1.5: extend the triggers framework to recognise `files` as the +-- fifth concrete kind (after `kv`/`dead_letter` v1.1.1, `docs` v1.1.2, +-- `cron` v1.1.4). Mirrors the 0014/0017 extensions exactly: two CHECK +-- constraints widen (strictly gaining `'files'`), one new detail table. +-- +-- Files rows route through the SAME generic dispatcher path as the +-- other event kinds (single match-arm extension on the Rust side). The +-- only new machinery is the FilesServiceImpl emitting ServiceEvents +-- that the OutboxEventEmitter fans out — identical to KV/docs. + +-- Extend triggers.kind to include 'files'. No existing row carries a +-- value outside the widened set, so the drop+add is safe. +ALTER TABLE triggers DROP CONSTRAINT triggers_kind_check; +ALTER TABLE triggers ADD CONSTRAINT triggers_kind_check + CHECK (kind IN ('kv', 'dead_letter', 'docs', 'cron', 'files')); + +-- Extend outbox.source_kind to include 'files'. +ALTER TABLE outbox DROP CONSTRAINT outbox_source_kind_check; +ALTER TABLE outbox ADD CONSTRAINT outbox_source_kind_check + CHECK (source_kind IN ('http', 'kv', 'dead_letter', 'docs', 'cron', 'files')); + +-- One row per files trigger. Mirrors kv_trigger_details: +-- collection_glob — "*", "exact", or "prefix*" +-- ops — subset of {create, update, delete}, empty = any +CREATE TABLE files_trigger_details ( + trigger_id UUID PRIMARY KEY REFERENCES triggers(id) ON DELETE CASCADE, + collection_glob TEXT NOT NULL, + ops TEXT[] NOT NULL +); diff --git a/crates/manager-core/src/authz.rs b/crates/manager-core/src/authz.rs index e7457b6..64615c4 100644 --- a/crates/manager-core/src/authz.rs +++ b/crates/manager-core/src/authz.rs @@ -78,6 +78,17 @@ pub enum Capability { /// so the conservative write mapping is correct. Splitting /// read/write is a v1.2+ refinement. Granted to `editor`+. AppHttpRequest(AppId), + /// Read blobs from this app's files store (v1.1.5). Same trust + /// shape as KV/docs read — granted to `viewer`+, maps to + /// `script:read` on API keys. Honors the seven-scope commitment. + AppFilesRead(AppId), + /// Write blobs to this app's files store (v1.1.5). Granted to + /// `editor`+, maps to `script:write` on API keys. + AppFilesWrite(AppId), + /// Publish a durable pub/sub message from a script in this app + /// (v1.1.5). Maps to `script:write` on API keys (a publish is a + /// write that fans out to subscribers). Granted to `editor`+. + AppPubsubPublish(AppId), /// Create / list / delete triggers for this app (v1.1.1). Maps to /// `app:admin` on API keys — triggers are app-configuration acts /// rather than data-plane access. Granted to `app_admin`+. @@ -108,6 +119,9 @@ impl Capability { | Self::AppDocsRead(id) | Self::AppDocsWrite(id) | Self::AppHttpRequest(id) + | Self::AppFilesRead(id) + | Self::AppFilesWrite(id) + | Self::AppPubsubPublish(id) | Self::AppManageTriggers(id) | Self::AppDeadLetterManage(id) => Some(id), } @@ -124,11 +138,16 @@ impl Capability { Self::InstanceCreateApp | Self::InstanceManageUsers | Self::InstanceManageSettings => { Scope::InstanceAdmin } - Self::AppRead(_) | Self::AppKvRead(_) | Self::AppDocsRead(_) => Scope::ScriptRead, + Self::AppRead(_) + | Self::AppKvRead(_) + | Self::AppDocsRead(_) + | Self::AppFilesRead(_) => Scope::ScriptRead, Self::AppWriteScript(_) | Self::AppKvWrite(_) | Self::AppDocsWrite(_) - | Self::AppHttpRequest(_) => Scope::ScriptWrite, + | Self::AppHttpRequest(_) + | Self::AppFilesWrite(_) + | Self::AppPubsubPublish(_) => Scope::ScriptWrite, Self::AppWriteRoute(_) => Scope::RouteWrite, Self::AppManageDomains(_) => Scope::DomainManage, Self::AppAdmin(_) | Self::AppManageTriggers(_) | Self::AppDeadLetterManage(_) => { @@ -277,6 +296,7 @@ const fn role_satisfies(role: AppRole, cap: Capability) -> bool { | Capability::AppLogRead(_) | Capability::AppKvRead(_) | Capability::AppDocsRead(_) + | Capability::AppFilesRead(_) ); let in_editor = in_viewer || matches!( @@ -286,6 +306,8 @@ const fn role_satisfies(role: AppRole, cap: Capability) -> bool { | Capability::AppKvWrite(_) | Capability::AppDocsWrite(_) | Capability::AppHttpRequest(_) + | Capability::AppFilesWrite(_) + | Capability::AppPubsubPublish(_) ); let in_app_admin = in_editor || matches!( diff --git a/crates/manager-core/src/dispatcher.rs b/crates/manager-core/src/dispatcher.rs index 1f4e7b2..af105ca 100644 --- a/crates/manager-core/src/dispatcher.rs +++ b/crates/manager-core/src/dispatcher.rs @@ -166,7 +166,8 @@ impl Dispatcher { OutboxSourceKind::Kv | OutboxSourceKind::Docs | OutboxSourceKind::DeadLetter - | OutboxSourceKind::Cron => { + | OutboxSourceKind::Cron + | OutboxSourceKind::Files => { let resolved = self.resolve_trigger(&row).await?; let req = match self.build_exec_request(&row, &resolved).await { Ok(req) => req, diff --git a/crates/manager-core/src/files_api.rs b/crates/manager-core/src/files_api.rs new file mode 100644 index 0000000..af8120c --- /dev/null +++ b/crates/manager-core/src/files_api.rs @@ -0,0 +1,215 @@ +//! `/api/v1/admin/apps/{id}/files*` — minimal files admin endpoints +//! backing the dashboard's files view (v1.1.5). +//! +//! Two operations only, both operator-facing: +//! * `GET /apps/{id}/files?collection=&cursor=&limit=` — list file +//! metadata for a collection (cursor-paginated). +//! * `DELETE /apps/{id}/files/{collection}/{file_id}` — remove a file. +//! +//! These talk to the `FilesRepo` directly (like `triggers_api` talks to +//! `TriggerRepo`), guarded by the same capability model as the SDK +//! (`AppFilesRead` / `AppFilesWrite`). **Admin deletes do NOT emit a +//! `files:delete` trigger event** — they're operator cleanup actions, +//! not script mutations (see HANDBACK §7). The capability binds to the +//! resource's `app_id` after the app is loaded. + +use std::sync::Arc; + +use axum::extract::{Path, Query, State}; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Json, Response}; +use axum::routing::{delete, get}; +use axum::{Extension, Router}; +use picloud_shared::{AppId, Principal}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use uuid::Uuid; + +use crate::app_repo::AppRepository; +use crate::authz::{require, AuthzDenied, AuthzError, AuthzRepo, Capability}; +use crate::files_repo::{FilesRepo, FilesRepoError}; + +#[derive(Clone)] +pub struct FilesAdminState { + pub files: Arc, + pub apps: Arc, + pub authz: Arc, +} + +pub fn files_admin_router(state: FilesAdminState) -> Router { + Router::new() + .route("/apps/{app_id}/files", get(list_files)) + .route( + "/apps/{app_id}/files/{collection}/{file_id}", + delete(delete_file), + ) + .with_state(state) +} + +#[derive(Debug, Deserialize)] +pub struct ListFilesQuery { + pub collection: String, + #[serde(default)] + pub cursor: Option, + #[serde(default)] + pub limit: Option, +} + +#[derive(Debug, Serialize)] +struct FileMetaDto { + id: String, + collection: String, + name: String, + content_type: String, + size: u64, + checksum: String, + created_at: String, + updated_at: String, +} + +#[derive(Debug, Serialize)] +struct ListFilesResponse { + files: Vec, + next_cursor: Option, +} + +async fn list_files( + State(s): State, + Extension(principal): Extension, + Path(app_id): Path, + Query(q): Query, +) -> Result, FilesApiError> { + ensure_app_exists(&*s.apps, app_id).await?; + require( + s.authz.as_ref(), + &principal, + Capability::AppFilesRead(app_id), + ) + .await?; + if q.collection.trim().is_empty() { + return Err(FilesApiError::Invalid( + "collection must not be empty".into(), + )); + } + let page = s + .files + .list( + app_id, + &q.collection, + q.cursor.as_deref(), + q.limit.unwrap_or(0), + ) + .await?; + let files = page + .files + .into_iter() + .map(|m| FileMetaDto { + id: m.id.to_string(), + collection: m.collection, + name: m.name, + content_type: m.content_type, + size: m.size, + checksum: m.checksum, + created_at: m.created_at.to_rfc3339(), + updated_at: m.updated_at.to_rfc3339(), + }) + .collect(); + Ok(Json(ListFilesResponse { + files, + next_cursor: page.next_cursor, + })) +} + +async fn delete_file( + State(s): State, + Extension(principal): Extension, + Path((app_id, collection, file_id)): Path<(AppId, String, String)>, +) -> Result { + ensure_app_exists(&*s.apps, app_id).await?; + require( + s.authz.as_ref(), + &principal, + Capability::AppFilesWrite(app_id), + ) + .await?; + let id = Uuid::parse_str(&file_id).map_err(|_| FilesApiError::NotFound)?; + if s.files.delete(app_id, &collection, id).await?.is_none() { + return Err(FilesApiError::NotFound); + } + Ok(StatusCode::NO_CONTENT) +} + +async fn ensure_app_exists(apps: &dyn AppRepository, app_id: AppId) -> Result<(), FilesApiError> { + apps.get_by_id(app_id) + .await + .map_err(|e| FilesApiError::Backend(e.to_string()))? + .ok_or(FilesApiError::AppNotFound)?; + Ok(()) +} + +#[derive(Debug, thiserror::Error)] +pub enum FilesApiError { + #[error("app not found")] + AppNotFound, + #[error("file not found")] + NotFound, + #[error("invalid request: {0}")] + Invalid(String), + #[error("forbidden")] + Forbidden, + #[error("authorization repo error: {0}")] + AuthzRepo(String), + #[error("files backend: {0}")] + Backend(String), +} + +impl From for FilesApiError { + fn from(d: AuthzDenied) -> Self { + match d { + AuthzDenied::Denied => Self::Forbidden, + AuthzDenied::Repo(e) => Self::AuthzRepo(e.to_string()), + } + } +} + +impl From for FilesApiError { + fn from(e: AuthzError) -> Self { + Self::AuthzRepo(e.to_string()) + } +} + +impl From for FilesApiError { + fn from(e: FilesRepoError) -> Self { + Self::Backend(e.to_string()) + } +} + +impl IntoResponse for FilesApiError { + fn into_response(self) -> Response { + let (status, body) = match &self { + Self::AppNotFound | Self::NotFound => { + (StatusCode::NOT_FOUND, json!({ "error": self.to_string() })) + } + Self::Invalid(_) => ( + StatusCode::UNPROCESSABLE_ENTITY, + json!({ "error": self.to_string() }), + ), + Self::Forbidden => (StatusCode::FORBIDDEN, json!({ "error": self.to_string() })), + Self::AuthzRepo(e) => { + tracing::error!(error = %e, "files admin authz repo error"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + json!({ "error": "internal error" }), + ) + } + Self::Backend(e) => { + tracing::error!(error = %e, "files admin backend error"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + json!({ "error": "internal error" }), + ) + } + }; + (status, Json(body)).into_response() + } +} diff --git a/crates/manager-core/src/files_repo.rs b/crates/manager-core/src/files_repo.rs new file mode 100644 index 0000000..d20225c --- /dev/null +++ b/crates/manager-core/src/files_repo.rs @@ -0,0 +1,759 @@ +//! `FilesRepo` — the metadata row (Postgres) + blob bytes (filesystem) +//! storage layer for the v1.1.5 `files::*` SDK. +//! +//! Unlike KV/docs, this repo owns BOTH halves of a file: the `files` +//! row (metadata + SHA-256 checksum) and the bytes on disk at +//! `/files////`. +//! It owns both because the write must be atomic across them — a crash +//! mid-write must never leave a readable half-written file. +//! +//! ## Atomic write protocol (`create` / `update`) +//! 1. Validate (collection path-safety; caps live one layer up). +//! 2. `create_dir_all` the shard dir with `0o700`. +//! 3. SHA-256 the in-memory bytes (single pass) while writing to +//! `.tmp.`. +//! 4. `fsync` the temp file. +//! 5. `rename` temp → final (atomic on POSIX). +//! 6. `fsync` the parent dir (so the rename is durable). +//! 7. INSERT / UPDATE the DB row. +//! +//! A crash between 1–5 leaves an orphan `*.tmp.*` (never read). A crash +//! between 5–7 leaves a file with no row — never reachable via the SDK +//! (reads start from the row). Both are reclaimed by a future orphan +//! sweep (deferred to v1.1.6+; see HANDBACK §7). +//! +//! ## Atomic delete protocol +//! 1. SELECT + DELETE the row inside one transaction; commit. +//! 2. `unlink` the file (outside the tx). A failure here leaves an +//! orphan; a failure before the commit changes nothing. +//! +//! ## Checksum-on-read +//! `get` reads the file, hashes it, and compares against the stored +//! checksum — returning `FilesError::Corrupted` (and logging the path +//! at error level) on a mismatch. It never auto-deletes; the operator +//! decides what to do with a metadata-vs-bytes divergence. + +use std::env; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; + +use async_trait::async_trait; +use base64::engine::general_purpose::URL_SAFE_NO_PAD; +use base64::Engine as _; +use chrono::{DateTime, Utc}; +use picloud_shared::{AppId, FileMeta, FileUpdate, FilesListPage, NewFile}; +use sha2::{Digest, Sha256}; +use sqlx::PgPool; +use uuid::Uuid; + +/// 100 MB default per-file cap. +pub const DEFAULT_MAX_FILE_SIZE_BYTES: usize = 100 * 1024 * 1024; +/// Default filesystem root (relative to the process CWD). +pub const DEFAULT_FILES_ROOT: &str = "./data"; + +const FILES_LIST_MAX_LIMIT: u32 = 1_000; +const FILES_LIST_DEFAULT_LIMIT: u32 = 100; + +/// Monotonic counter feeding unique temp-file suffixes (combined with +/// the pid). Avoids `rand` in the storage layer per the brief. +static TMP_COUNTER: AtomicU64 = AtomicU64::new(0); + +#[derive(Debug, thiserror::Error)] +pub enum FilesRepoError { + #[error("database error: {0}")] + Db(#[from] sqlx::Error), + + #[error("filesystem error: {0}")] + Io(String), + + #[error("invalid collection name: {0}")] + InvalidCollection(String), + + /// The bytes on disk no longer match the stored checksum (or are + /// missing entirely while the row persists). + #[error("file content corrupted (checksum mismatch)")] + Corrupted, + + #[error("invalid pagination cursor")] + InvalidCursor, +} + +/// Outbound-files tunables. Env-overridable following the same pattern +/// as `HttpConfig::from_env`. +#[derive(Debug, Clone)] +pub struct FilesConfig { + pub root: PathBuf, + pub max_file_size_bytes: usize, +} + +impl FilesConfig { + #[must_use] + pub fn conservative() -> Self { + Self { + root: PathBuf::from(DEFAULT_FILES_ROOT), + max_file_size_bytes: DEFAULT_MAX_FILE_SIZE_BYTES, + } + } + + #[must_use] + pub fn from_env() -> Self { + let mut c = Self::conservative(); + if let Ok(v) = env::var("PICLOUD_FILES_ROOT") { + if !v.trim().is_empty() { + c.root = PathBuf::from(v); + } + } + if let Ok(v) = env::var("PICLOUD_FILES_MAX_FILE_SIZE_BYTES") { + match v.parse::() { + Ok(n) => c.max_file_size_bytes = n, + Err(e) => { + tracing::warn!(error = %e, "ignoring invalid PICLOUD_FILES_MAX_FILE_SIZE_BYTES"); + } + } + } + c + } +} + +impl Default for FilesConfig { + fn default() -> Self { + Self::conservative() + } +} + +/// The new+prior metadata returned from a successful `update`, so the +/// service can emit a `ServiceEvent` with the change-data-capture +/// surface (`old_payload`). +#[derive(Debug, Clone)] +pub struct FileUpdated { + pub new: FileMeta, + pub prev: FileMeta, +} + +#[async_trait] +pub trait FilesRepo: Send + Sync { + async fn create( + &self, + app_id: AppId, + collection: &str, + new: NewFile, + ) -> Result; + + async fn head( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + ) -> Result, FilesRepoError>; + + /// Reads + checksum-verifies the bytes. `Ok(None)` when no row + /// exists; `Err(Corrupted)` when the row exists but the bytes are + /// missing or mismatched. + async fn get( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + ) -> Result>, FilesRepoError>; + + /// `Ok(None)` when no row exists (the SDK turns this into + /// `FilesError::NotFound`). + async fn update( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + upd: FileUpdate, + ) -> Result, FilesRepoError>; + + /// Returns the deleted row's metadata if present, `None` otherwise. + async fn delete( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + ) -> Result, FilesRepoError>; + + async fn list( + &self, + app_id: AppId, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result; +} + +/// Filesystem-bytes + Postgres-metadata repo. +pub struct FsFilesRepo { + pool: PgPool, + config: FilesConfig, +} + +impl FsFilesRepo { + #[must_use] + pub fn new(pool: PgPool, config: FilesConfig) -> Self { + Self { pool, config } + } + + /// Defensive path-component guard. The service already validates the + /// collection at the SDK boundary; this is belt-and-suspenders so a + /// future caller can't smuggle a traversal sequence onto disk. + fn guard_collection(collection: &str) -> Result<(), FilesRepoError> { + if collection.is_empty() + || collection.contains('/') + || collection.contains('\\') + || collection.contains("..") + || collection.contains('\0') + { + return Err(FilesRepoError::InvalidCollection(collection.to_string())); + } + Ok(()) + } + + fn final_path(&self, app_id: AppId, collection: &str, id: Uuid) -> PathBuf { + final_path_at(&self.config.root, app_id, collection, id) + } + + fn write_atomic( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + bytes: &[u8], + ) -> Result { + write_atomic_at(&self.config.root, app_id, collection, id, bytes) + } +} + +fn shard_dir_at(root: &Path, app_id: AppId, collection: &str, id_str: &str) -> PathBuf { + root.join("files") + .join(app_id.into_inner().to_string()) + .join(collection) + .join(&id_str[..2]) +} + +fn final_path_at(root: &Path, app_id: AppId, collection: &str, id: Uuid) -> PathBuf { + let id_str = id.to_string(); + shard_dir_at(root, app_id, collection, &id_str).join(&id_str) +} + +/// Steps 2–6 of the atomic-write protocol. Returns the lowercase hex +/// SHA-256 of the bytes (computed in a single pass over the in-memory +/// buffer — the file is never re-read). Free function so the fs +/// mechanics are unit-testable without a Postgres pool. +fn write_atomic_at( + root: &Path, + app_id: AppId, + collection: &str, + id: Uuid, + bytes: &[u8], +) -> Result { + use std::io::Write as _; + + let id_str = id.to_string(); + let dir = shard_dir_at(root, app_id, collection, &id_str); + create_dir_all_secure(&dir)?; + + // Single-pass checksum over the in-memory buffer. + let mut hasher = Sha256::new(); + hasher.update(bytes); + let checksum = hex_lower(&hasher.finalize()); + + let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed); + let tmp = dir.join(format!("{id_str}.tmp.{}-{seq}", std::process::id())); + let final_path = dir.join(&id_str); + + { + let mut f = std::fs::File::create(&tmp).map_err(io_err)?; + f.write_all(bytes).map_err(io_err)?; + f.sync_all().map_err(io_err)?; // fsync temp + } + std::fs::rename(&tmp, &final_path).map_err(io_err)?; // atomic + // fsync the parent dir so the rename is durable. + if let Ok(dirf) = std::fs::File::open(&dir) { + let _ = dirf.sync_all(); + } + Ok(checksum) +} + +/// Read + checksum-verify the bytes at the given path-set. Free +/// function mirror of the `get` read path. Returns `Corrupted` when the +/// bytes are missing or don't match `expected_checksum`. +fn read_verify_at( + root: &Path, + app_id: AppId, + collection: &str, + id: Uuid, + expected_checksum: &str, +) -> Result, FilesRepoError> { + let path = final_path_at(root, app_id, collection, id); + let bytes = match std::fs::read(&path) { + Ok(b) => b, + Err(e) => { + tracing::error!( + path = %path.display(), error = %e, + "files: row exists but bytes are unreadable — treating as corrupted" + ); + return Err(FilesRepoError::Corrupted); + } + }; + let mut hasher = Sha256::new(); + hasher.update(&bytes); + let actual = hex_lower(&hasher.finalize()); + if actual != expected_checksum { + tracing::error!( + path = %path.display(), expected = %expected_checksum, actual = %actual, + "files: checksum mismatch on read — content corrupted" + ); + return Err(FilesRepoError::Corrupted); + } + Ok(bytes) +} + +#[async_trait] +impl FilesRepo for FsFilesRepo { + async fn create( + &self, + app_id: AppId, + collection: &str, + new: NewFile, + ) -> Result { + Self::guard_collection(collection)?; + let id = Uuid::new_v4(); + let size = i64::try_from(new.data.len()).unwrap_or(i64::MAX); + + let checksum = self.write_atomic(app_id, collection, id, &new.data)?; + + let row: FileRow = sqlx::query_as( + "INSERT INTO files \ + (app_id, collection, id, name, content_type, size_bytes, checksum_sha256) \ + VALUES ($1, $2, $3, $4, $5, $6, $7) \ + RETURNING id, collection, name, content_type, size_bytes, \ + checksum_sha256, created_at, updated_at", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(id) + .bind(&new.name) + .bind(&new.content_type) + .bind(size) + .bind(&checksum) + .fetch_one(&self.pool) + .await?; + + Ok(row.into_meta()) + } + + async fn head( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + ) -> Result, FilesRepoError> { + let row: Option = sqlx::query_as( + "SELECT id, collection, name, content_type, size_bytes, \ + checksum_sha256, created_at, updated_at \ + FROM files WHERE app_id = $1 AND collection = $2 AND id = $3", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(id) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(FileRow::into_meta)) + } + + async fn get( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + ) -> Result>, FilesRepoError> { + let row: Option<(String,)> = sqlx::query_as( + "SELECT checksum_sha256 FROM files \ + WHERE app_id = $1 AND collection = $2 AND id = $3", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(id) + .fetch_optional(&self.pool) + .await?; + let Some((stored_checksum,)) = row else { + return Ok(None); + }; + let bytes = read_verify_at(&self.config.root, app_id, collection, id, &stored_checksum)?; + Ok(Some(bytes)) + } + + async fn update( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + upd: FileUpdate, + ) -> Result, FilesRepoError> { + Self::guard_collection(collection)?; + // Read the prior row first (existence check + CDC surface). + let Some(prev) = self.head(app_id, collection, id).await? else { + return Ok(None); + }; + + let size = i64::try_from(upd.data.len()).unwrap_or(i64::MAX); + let checksum = self.write_atomic(app_id, collection, id, &upd.data)?; + + let row: FileRow = sqlx::query_as( + "UPDATE files SET \ + name = COALESCE($4, name), \ + content_type = COALESCE($5, content_type), \ + size_bytes = $6, \ + checksum_sha256 = $7, \ + updated_at = NOW() \ + WHERE app_id = $1 AND collection = $2 AND id = $3 \ + RETURNING id, collection, name, content_type, size_bytes, \ + checksum_sha256, created_at, updated_at", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(id) + .bind(upd.name.as_deref()) + .bind(upd.content_type.as_deref()) + .bind(size) + .bind(&checksum) + .fetch_one(&self.pool) + .await?; + + Ok(Some(FileUpdated { + new: row.into_meta(), + prev, + })) + } + + async fn delete( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + ) -> Result, FilesRepoError> { + // SELECT + DELETE in one tx; unlink afterwards (outside the tx). + let mut tx = self.pool.begin().await?; + let row: Option = sqlx::query_as( + "SELECT id, collection, name, content_type, size_bytes, \ + checksum_sha256, created_at, updated_at \ + FROM files WHERE app_id = $1 AND collection = $2 AND id = $3 \ + FOR UPDATE", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(id) + .fetch_optional(&mut *tx) + .await?; + + let Some(row) = row else { + tx.rollback().await?; + return Ok(None); + }; + + sqlx::query("DELETE FROM files WHERE app_id = $1 AND collection = $2 AND id = $3") + .bind(app_id.into_inner()) + .bind(collection) + .bind(id) + .execute(&mut *tx) + .await?; + tx.commit().await?; + + // Row is gone; unlink the bytes. A failure here leaves an orphan + // file (reclaimed by a future sweep) — not fatal. + let path = self.final_path(app_id, collection, id); + if let Err(e) = std::fs::remove_file(&path) { + if e.kind() != std::io::ErrorKind::NotFound { + tracing::warn!(path = %path.display(), error = %e, "files: unlink after delete failed (orphan)"); + } + } + Ok(Some(row.into_meta())) + } + + async fn list( + &self, + app_id: AppId, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result { + let limit = if limit == 0 { + FILES_LIST_DEFAULT_LIMIT + } else { + limit.min(FILES_LIST_MAX_LIMIT) + }; + let last_id = match cursor { + Some(c) => Some(decode_cursor(c)?), + None => None, + }; + let take = i64::from(limit) + 1; + let rows: Vec = sqlx::query_as( + "SELECT id, collection, name, content_type, size_bytes, \ + checksum_sha256, created_at, updated_at \ + FROM files \ + WHERE app_id = $1 AND collection = $2 \ + AND ($3::uuid IS NULL OR id > $3) \ + ORDER BY id ASC \ + LIMIT $4", + ) + .bind(app_id.into_inner()) + .bind(collection) + .bind(last_id) + .bind(take) + .fetch_all(&self.pool) + .await?; + + let mut files: Vec = rows.into_iter().map(FileRow::into_meta).collect(); + let next_cursor = if files.len() > limit as usize { + files.truncate(limit as usize); + files.last().map(|m| encode_cursor(m.id)) + } else { + None + }; + Ok(FilesListPage { files, next_cursor }) + } +} + +// ---------------------------------------------------------------------------- +// Helpers +// ---------------------------------------------------------------------------- + +fn io_err(e: std::io::Error) -> FilesRepoError { + FilesRepoError::Io(e.to_string()) +} + +/// `create_dir_all` with `0o700` on the created tree (Unix). On other +/// platforms it falls back to the default permissions. +fn create_dir_all_secure(dir: &Path) -> Result<(), FilesRepoError> { + #[cfg(unix)] + { + use std::os::unix::fs::DirBuilderExt as _; + std::fs::DirBuilder::new() + .recursive(true) + .mode(0o700) + .create(dir) + .map_err(io_err) + } + #[cfg(not(unix))] + { + std::fs::create_dir_all(dir).map_err(io_err) + } +} + +fn hex_lower(bytes: &[u8]) -> String { + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + use std::fmt::Write as _; + let _ = write!(s, "{b:02x}"); + } + s +} + +fn encode_cursor(last_id: Uuid) -> String { + URL_SAFE_NO_PAD.encode(last_id.to_string().as_bytes()) +} + +fn decode_cursor(cursor: &str) -> Result { + let bytes = URL_SAFE_NO_PAD + .decode(cursor) + .map_err(|_| FilesRepoError::InvalidCursor)?; + let s = String::from_utf8(bytes).map_err(|_| FilesRepoError::InvalidCursor)?; + Uuid::parse_str(&s).map_err(|_| FilesRepoError::InvalidCursor) +} + +#[derive(sqlx::FromRow)] +struct FileRow { + id: Uuid, + collection: String, + name: String, + content_type: String, + size_bytes: i64, + checksum_sha256: String, + created_at: DateTime, + updated_at: DateTime, +} + +impl FileRow { + fn into_meta(self) -> FileMeta { + FileMeta { + id: self.id, + collection: self.collection, + name: self.name, + content_type: self.content_type, + size: u64::try_from(self.size_bytes).unwrap_or(0), + checksum: self.checksum_sha256, + created_at: self.created_at, + updated_at: self.updated_at, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn hex_lower_matches_known_sha256_vector() { + // SHA-256("abc") — NIST known-answer vector. + let mut h = Sha256::new(); + h.update(b"abc"); + assert_eq!( + hex_lower(&h.finalize()), + "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad" + ); + } + + #[test] + fn hex_lower_of_empty_is_known_vector() { + let mut h = Sha256::new(); + h.update(b""); + assert_eq!( + hex_lower(&h.finalize()), + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ); + } + + #[test] + fn cursor_round_trips() { + let id = Uuid::new_v4(); + let enc = encode_cursor(id); + assert_eq!(decode_cursor(&enc).unwrap(), id); + assert!(matches!( + decode_cursor("!!not-base64!!"), + Err(FilesRepoError::InvalidCursor) + )); + } + + #[test] + fn guard_collection_rejects_traversal() { + assert!(FsFilesRepo::guard_collection("avatars").is_ok()); + assert!(FsFilesRepo::guard_collection("a/b").is_err()); + assert!(FsFilesRepo::guard_collection("..").is_err()); + assert!(FsFilesRepo::guard_collection("a..b").is_err()); + assert!(FsFilesRepo::guard_collection("").is_err()); + assert!(FsFilesRepo::guard_collection("a\0b").is_err()); + } + + #[test] + fn config_from_env_defaults_are_conservative() { + let c = FilesConfig::conservative(); + assert_eq!(c.max_file_size_bytes, DEFAULT_MAX_FILE_SIZE_BYTES); + assert_eq!(c.root, PathBuf::from(DEFAULT_FILES_ROOT)); + } + + // ------------------------------------------------------------------ + // Tempdir-backed filesystem mechanics — exercise the atomic write, + // single-pass checksum, and checksum-on-read tamper detection + // without needing a Postgres pool. + // ------------------------------------------------------------------ + + use picloud_shared::AppId; + + /// Process-unique scratch dir under the system temp dir. Cleaned up + /// by each test via `remove_dir_all`. + fn unique_tmp_root() -> PathBuf { + let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed); + let dir = + std::env::temp_dir().join(format!("picloud-files-test-{}-{seq}", std::process::id())); + std::fs::create_dir_all(&dir).unwrap(); + dir + } + + #[test] + fn write_atomic_then_read_verify_round_trips() { + let root = unique_tmp_root(); + let app = AppId::new(); + let id = Uuid::new_v4(); + let bytes = b"hello picloud files".to_vec(); + + let checksum = write_atomic_at(&root, app, "avatars", id, &bytes).unwrap(); + // Single-pass checksum matches an independent hash of the bytes. + let mut h = Sha256::new(); + h.update(&bytes); + assert_eq!(checksum, hex_lower(&h.finalize())); + + let read = read_verify_at(&root, app, "avatars", id, &checksum).unwrap(); + assert_eq!(read, bytes); + + std::fs::remove_dir_all(&root).ok(); + } + + #[test] + fn read_verify_detects_tampering_as_corrupted() { + let root = unique_tmp_root(); + let app = AppId::new(); + let id = Uuid::new_v4(); + let checksum = write_atomic_at(&root, app, "c", id, b"original").unwrap(); + + // Mutate the bytes behind the repo's back. + let path = final_path_at(&root, app, "c", id); + std::fs::write(&path, b"tampered").unwrap(); + + let err = read_verify_at(&root, app, "c", id, &checksum).unwrap_err(); + assert!(matches!(err, FilesRepoError::Corrupted)); + + std::fs::remove_dir_all(&root).ok(); + } + + #[test] + fn read_verify_missing_bytes_is_corrupted() { + let root = unique_tmp_root(); + let app = AppId::new(); + let id = Uuid::new_v4(); + // No write — the file never existed. + let err = read_verify_at(&root, app, "c", id, "deadbeef").unwrap_err(); + assert!(matches!(err, FilesRepoError::Corrupted)); + std::fs::remove_dir_all(&root).ok(); + } + + #[test] + fn atomic_write_leaves_no_tmp_file_after_success() { + let root = unique_tmp_root(); + let app = AppId::new(); + let id = Uuid::new_v4(); + write_atomic_at(&root, app, "c", id, b"data").unwrap(); + + let id_str = id.to_string(); + let dir = shard_dir_at(&root, app, "c", &id_str); + let entries: Vec<_> = std::fs::read_dir(&dir) + .unwrap() + .filter_map(Result::ok) + .map(|e| e.file_name().to_string_lossy().into_owned()) + .collect(); + // Exactly the final file is visible — no `*.tmp.*` orphan. + assert_eq!(entries, vec![id_str]); + assert!(!entries.iter().any(|n| n.contains(".tmp."))); + + std::fs::remove_dir_all(&root).ok(); + } + + #[test] + fn id_shard_uses_first_two_chars() { + let root = PathBuf::from("/tmp/x"); + let app = AppId::new(); + let id = Uuid::new_v4(); + let id_str = id.to_string(); + let path = final_path_at(&root, app, "col", id); + let shard = &id_str[..2]; + assert!(path + .to_string_lossy() + .contains(&format!("/col/{shard}/{id_str}"))); + } + + #[cfg(unix)] + #[test] + fn shard_tree_created_with_0700() { + use std::os::unix::fs::PermissionsExt as _; + let root = unique_tmp_root(); + let app = AppId::new(); + let id = Uuid::new_v4(); + write_atomic_at(&root, app, "c", id, b"data").unwrap(); + let id_str = id.to_string(); + let dir = shard_dir_at(&root, app, "c", &id_str); + let mode = std::fs::metadata(&dir).unwrap().permissions().mode(); + assert_eq!(mode & 0o777, 0o700, "shard dir should be 0o700"); + std::fs::remove_dir_all(&root).ok(); + } +} diff --git a/crates/manager-core/src/files_service.rs b/crates/manager-core/src/files_service.rs new file mode 100644 index 0000000..2d60da6 --- /dev/null +++ b/crates/manager-core/src/files_service.rs @@ -0,0 +1,817 @@ +//! `FilesServiceImpl` — wires the `FilesRepo` underneath the +//! `picloud_shared::FilesService` trait scripts see via the Rhai +//! bridge. +//! +//! Layers added here (vs the raw repo), mirroring `KvServiceImpl`: +//! 1. Collection validation (empty + path-traversal) and field / +//! size-cap validation at the SDK boundary. +//! 2. **Script-as-gate authz**: when `cx.principal.is_some()` we run +//! `authz::require(...)`; when it's `None` (public HTTP) we skip. +//! Cross-app isolation is unaffected — every repo call is keyed by +//! `cx.app_id`, never an argument. +//! 3. `ServiceEvent` emission after each mutation (`create` / +//! `update` / `delete`). The payload is the file **metadata**, not +//! the blob bytes (files are too big for trigger payloads). + +use std::sync::Arc; + +use async_trait::async_trait; +use picloud_shared::{ + validate_files_collection, FileMeta, FileUpdate, FilesError, FilesListPage, FilesService, + NewFile, SdkCallCx, ServiceEvent, ServiceEventEmitter, +}; +use uuid::Uuid; + +use crate::authz::{self, AuthzRepo, Capability}; +use crate::files_repo::{FileUpdated, FilesRepo, FilesRepoError}; + +pub struct FilesServiceImpl { + repo: Arc, + authz: Arc, + events: Arc, + max_file_size_bytes: usize, +} + +impl FilesServiceImpl { + #[must_use] + pub fn new( + repo: Arc, + authz: Arc, + events: Arc, + max_file_size_bytes: usize, + ) -> Self { + Self { + repo, + authz, + events, + max_file_size_bytes, + } + } + + async fn check_read(&self, cx: &SdkCallCx) -> Result<(), FilesError> { + if let Some(ref principal) = cx.principal { + authz::require(&*self.authz, principal, Capability::AppFilesRead(cx.app_id)) + .await + .map_err(|_| FilesError::Forbidden)?; + } + Ok(()) + } + + async fn check_write(&self, cx: &SdkCallCx) -> Result<(), FilesError> { + if let Some(ref principal) = cx.principal { + authz::require( + &*self.authz, + principal, + Capability::AppFilesWrite(cx.app_id), + ) + .await + .map_err(|_| FilesError::Forbidden)?; + } + Ok(()) + } + + /// Best-effort `ServiceEvent` emission. A failed emit is logged but + /// never rolls back the (already-durable) file write. + async fn emit( + &self, + cx: &SdkCallCx, + op: &'static str, + collection: &str, + meta: &FileMeta, + old: Option<&FileMeta>, + ) { + let payload = serde_json::to_value(meta).ok(); + let old_payload = old.and_then(|m| serde_json::to_value(m).ok()); + if let Err(e) = self + .events + .emit( + cx, + ServiceEvent { + source: "files", + op, + collection: Some(collection.to_string()), + key: Some(meta.id.to_string()), + payload, + old_payload, + }, + ) + .await + { + tracing::warn!(error = %e, source = "files", op, "event emit failed"); + } + } +} + +/// Parse a script-supplied id. Invalid UUIDs aren't an error shape the +/// SDK exposes — for reads/deletes they simply mean "no such file". +fn parse_id(id: &str) -> Option { + Uuid::parse_str(id).ok() +} + +impl From for FilesError { + fn from(e: FilesRepoError) -> Self { + match e { + FilesRepoError::Corrupted => Self::Corrupted, + FilesRepoError::InvalidCollection(c) => Self::InvalidCollection(c), + other => Self::Backend(other.to_string()), + } + } +} + +#[async_trait] +impl FilesService for FilesServiceImpl { + async fn create( + &self, + cx: &SdkCallCx, + collection: &str, + new: NewFile, + ) -> Result { + validate_files_collection(collection)?; + self.check_write(cx).await?; + new.validate(self.max_file_size_bytes)?; + let meta = self.repo.create(cx.app_id, collection, new).await?; + self.emit(cx, "create", collection, &meta, None).await; + Ok(meta.id) + } + + async fn head( + &self, + cx: &SdkCallCx, + collection: &str, + id: &str, + ) -> Result, FilesError> { + validate_files_collection(collection)?; + self.check_read(cx).await?; + let Some(uuid) = parse_id(id) else { + return Ok(None); + }; + Ok(self.repo.head(cx.app_id, collection, uuid).await?) + } + + async fn get( + &self, + cx: &SdkCallCx, + collection: &str, + id: &str, + ) -> Result>, FilesError> { + validate_files_collection(collection)?; + self.check_read(cx).await?; + let Some(uuid) = parse_id(id) else { + return Ok(None); + }; + Ok(self.repo.get(cx.app_id, collection, uuid).await?) + } + + async fn update( + &self, + cx: &SdkCallCx, + collection: &str, + id: &str, + upd: FileUpdate, + ) -> Result<(), FilesError> { + validate_files_collection(collection)?; + self.check_write(cx).await?; + upd.validate(self.max_file_size_bytes)?; + let Some(uuid) = parse_id(id) else { + return Err(FilesError::NotFound); + }; + match self.repo.update(cx.app_id, collection, uuid, upd).await? { + Some(FileUpdated { new, prev }) => { + self.emit(cx, "update", collection, &new, Some(&prev)).await; + Ok(()) + } + None => Err(FilesError::NotFound), + } + } + + async fn delete(&self, cx: &SdkCallCx, collection: &str, id: &str) -> Result { + validate_files_collection(collection)?; + self.check_write(cx).await?; + let Some(uuid) = parse_id(id) else { + return Ok(false); + }; + match self.repo.delete(cx.app_id, collection, uuid).await? { + Some(meta) => { + // On delete, the top-level metadata AND `prev` both carry + // the deleted row (per docs/v1.1.x design + the brief). + self.emit(cx, "delete", collection, &meta, Some(&meta)) + .await; + Ok(true) + } + None => Ok(false), + } + } + + async fn list( + &self, + cx: &SdkCallCx, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result { + validate_files_collection(collection)?; + self.check_read(cx).await?; + Ok(self.repo.list(cx.app_id, collection, cursor, limit).await?) + } +} + +// ---------------------------------------------------------------------------- +// Tests — in-memory FilesRepo so unit tests need neither Postgres nor a +// filesystem. The on-disk atomic-write / checksum mechanics are covered +// by the tempdir tests in `files_repo.rs`. +// ---------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::authz::{AuthzError, AuthzRepo}; + use async_trait::async_trait; + use chrono::Utc; + use picloud_shared::{ + AdminUserId, AppId, AppRole, EmitError, ExecutionId, InstanceRole, Principal, RequestId, + ScriptId, ServiceEvent, UserId, + }; + use std::collections::BTreeMap; + use std::sync::Mutex as StdMutex; + use tokio::sync::Mutex; + + /// In-memory FilesRepo keyed by (app, collection, id). Stores the + /// metadata + bytes together so cross-app isolation and round-trips + /// can be checked without disk. + #[derive(Default)] + struct InMemoryFilesRepo { + #[allow(clippy::type_complexity)] + data: Mutex)>>, + } + + fn sha256_hex(bytes: &[u8]) -> String { + use sha2::{Digest, Sha256}; + let mut h = Sha256::new(); + h.update(bytes); + let out = h.finalize(); + let mut s = String::new(); + for b in out { + use std::fmt::Write as _; + let _ = write!(s, "{b:02x}"); + } + s + } + + #[async_trait] + impl FilesRepo for InMemoryFilesRepo { + async fn create( + &self, + app_id: AppId, + collection: &str, + new: NewFile, + ) -> Result { + let id = Uuid::new_v4(); + let now = Utc::now(); + let meta = FileMeta { + id, + collection: collection.to_string(), + name: new.name.clone(), + content_type: new.content_type.clone(), + size: new.data.len() as u64, + checksum: sha256_hex(&new.data), + created_at: now, + updated_at: now, + }; + self.data.lock().await.insert( + (app_id, collection.to_string(), id), + (meta.clone(), new.data), + ); + Ok(meta) + } + + async fn head( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + ) -> Result, FilesRepoError> { + Ok(self + .data + .lock() + .await + .get(&(app_id, collection.to_string(), id)) + .map(|(m, _)| m.clone())) + } + + async fn get( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + ) -> Result>, FilesRepoError> { + Ok(self + .data + .lock() + .await + .get(&(app_id, collection.to_string(), id)) + .map(|(_, b)| b.clone())) + } + + async fn update( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + upd: FileUpdate, + ) -> Result, FilesRepoError> { + let mut data = self.data.lock().await; + let key = (app_id, collection.to_string(), id); + let Some((prev_meta, _)) = data.get(&key).cloned() else { + return Ok(None); + }; + let now = Utc::now(); + let new_meta = FileMeta { + id, + collection: collection.to_string(), + name: upd.name.clone().unwrap_or_else(|| prev_meta.name.clone()), + content_type: upd + .content_type + .clone() + .unwrap_or_else(|| prev_meta.content_type.clone()), + size: upd.data.len() as u64, + checksum: sha256_hex(&upd.data), + created_at: prev_meta.created_at, + updated_at: now, + }; + data.insert(key, (new_meta.clone(), upd.data)); + Ok(Some(FileUpdated { + new: new_meta, + prev: prev_meta, + })) + } + + async fn delete( + &self, + app_id: AppId, + collection: &str, + id: Uuid, + ) -> Result, FilesRepoError> { + Ok(self + .data + .lock() + .await + .remove(&(app_id, collection.to_string(), id)) + .map(|(m, _)| m)) + } + + async fn list( + &self, + app_id: AppId, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result { + let data = self.data.lock().await; + let after = cursor.and_then(|c| Uuid::parse_str(c).ok()); + let mut metas: Vec = data + .iter() + .filter(|((a, c, _), _)| *a == app_id && c == collection) + .map(|(_, (m, _))| m.clone()) + .filter(|m| after.is_none_or(|a| m.id > a)) + .collect(); + metas.sort_by_key(|m| m.id); + let take = (limit.max(1)) as usize; + let next_cursor = if metas.len() > take { + metas.truncate(take); + metas.last().map(|m| m.id.to_string()) + } else { + None + }; + Ok(FilesListPage { + files: metas, + next_cursor, + }) + } + } + + /// Captures emitted events so tests can assert on fan-out shape. + #[derive(Default)] + struct CapturingEmitter { + events: StdMutex>, + } + + #[async_trait] + impl ServiceEventEmitter for CapturingEmitter { + async fn emit(&self, _cx: &SdkCallCx, event: ServiceEvent) -> Result<(), EmitError> { + self.events.lock().unwrap().push(event); + Ok(()) + } + } + + #[derive(Default)] + struct DenyingAuthzRepo; + #[async_trait] + impl AuthzRepo for DenyingAuthzRepo { + async fn membership( + &self, + _user_id: UserId, + _app_id: AppId, + ) -> Result, AuthzError> { + Ok(None) + } + } + + #[derive(Default)] + struct EditorAuthzRepo; + #[async_trait] + impl AuthzRepo for EditorAuthzRepo { + async fn membership( + &self, + _user_id: UserId, + _app_id: AppId, + ) -> Result, AuthzError> { + Ok(Some(AppRole::Editor)) + } + } + + fn anon_cx(app_id: AppId) -> SdkCallCx { + SdkCallCx { + app_id, + script_id: ScriptId::new(), + principal: None, + execution_id: ExecutionId::new(), + request_id: RequestId::new(), + trigger_depth: 0, + root_execution_id: ExecutionId::new(), + is_dead_letter_handler: false, + event: None, + } + } + + fn member_cx(app_id: AppId) -> SdkCallCx { + SdkCallCx { + principal: Some(Principal { + user_id: AdminUserId::new(), + instance_role: InstanceRole::Member, + scopes: None, + app_binding: None, + }), + ..anon_cx(app_id) + } + } + + fn svc_with(authz: Arc, emitter: Arc) -> FilesServiceImpl { + FilesServiceImpl::new( + Arc::new(InMemoryFilesRepo::default()), + authz, + emitter, + 10 * 1024 * 1024, + ) + } + + fn svc() -> FilesServiceImpl { + svc_with( + Arc::new(DenyingAuthzRepo), + Arc::new(CapturingEmitter::default()), + ) + } + + fn new_file(name: &str, data: &[u8]) -> NewFile { + NewFile { + name: name.to_string(), + content_type: "application/octet-stream".to_string(), + data: data.to_vec(), + } + } + + #[tokio::test] + async fn create_then_get_head_round_trips() { + let files = svc(); + let cx = anon_cx(AppId::new()); + let id = files + .create(&cx, "avatars", new_file("a.bin", b"hello")) + .await + .unwrap(); + let bytes = files.get(&cx, "avatars", &id.to_string()).await.unwrap(); + assert_eq!(bytes, Some(b"hello".to_vec())); + let meta = files + .head(&cx, "avatars", &id.to_string()) + .await + .unwrap() + .unwrap(); + assert_eq!(meta.name, "a.bin"); + assert_eq!(meta.size, 5); + assert_eq!(meta.checksum, sha256_hex(b"hello")); + } + + #[tokio::test] + async fn get_and_head_missing_return_none() { + let files = svc(); + let cx = anon_cx(AppId::new()); + let missing = Uuid::new_v4().to_string(); + assert_eq!(files.get(&cx, "c", &missing).await.unwrap(), None); + assert!(files.head(&cx, "c", &missing).await.unwrap().is_none()); + // Non-UUID id is also "missing", not an error. + assert_eq!(files.get(&cx, "c", "not-a-uuid").await.unwrap(), None); + } + + #[tokio::test] + async fn update_replaces_content_and_keeps_metadata_when_omitted() { + let files = svc(); + let cx = anon_cx(AppId::new()); + let id = files + .create(&cx, "c", new_file("v1.txt", b"one")) + .await + .unwrap(); + files + .update( + &cx, + "c", + &id.to_string(), + FileUpdate { + data: b"two!!".to_vec(), + name: None, + content_type: None, + }, + ) + .await + .unwrap(); + let meta = files + .head(&cx, "c", &id.to_string()) + .await + .unwrap() + .unwrap(); + assert_eq!(meta.name, "v1.txt"); // kept + assert_eq!(meta.size, 5); + assert_eq!( + files.get(&cx, "c", &id.to_string()).await.unwrap(), + Some(b"two!!".to_vec()) + ); + } + + #[tokio::test] + async fn update_missing_throws_not_found() { + let files = svc(); + let cx = anon_cx(AppId::new()); + let err = files + .update( + &cx, + "c", + &Uuid::new_v4().to_string(), + FileUpdate { + data: b"x".to_vec(), + name: None, + content_type: None, + }, + ) + .await + .unwrap_err(); + assert!(matches!(err, FilesError::NotFound)); + } + + #[tokio::test] + async fn delete_returns_was_present() { + let files = svc(); + let cx = anon_cx(AppId::new()); + let id = files.create(&cx, "c", new_file("f", b"x")).await.unwrap(); + assert!(files.delete(&cx, "c", &id.to_string()).await.unwrap()); + assert!(!files.delete(&cx, "c", &id.to_string()).await.unwrap()); + assert!(!files.delete(&cx, "c", "not-a-uuid").await.unwrap()); + } + + #[tokio::test] + async fn empty_collection_rejected() { + let files = svc(); + let cx = anon_cx(AppId::new()); + let err = files + .create(&cx, "", new_file("f", b"x")) + .await + .unwrap_err(); + assert!(matches!(err, FilesError::InvalidCollection(_))); + } + + #[tokio::test] + async fn traversal_collection_rejected() { + let files = svc(); + let cx = anon_cx(AppId::new()); + for bad in ["../etc", "a/b", "a..b", "x\0y"] { + let err = files + .create(&cx, bad, new_file("f", b"x")) + .await + .unwrap_err(); + assert!( + matches!(err, FilesError::InvalidCollection(_)), + "expected reject for {bad:?}" + ); + } + } + + #[tokio::test] + async fn missing_required_fields_have_field_specific_messages() { + let files = svc(); + let cx = anon_cx(AppId::new()); + // name + let err = files + .create( + &cx, + "c", + NewFile { + name: " ".into(), + content_type: "text/plain".into(), + data: b"x".to_vec(), + }, + ) + .await + .unwrap_err(); + assert!(matches!(err, FilesError::MissingField("name"))); + // content_type + let err = files + .create( + &cx, + "c", + NewFile { + name: "f".into(), + content_type: String::new(), + data: b"x".to_vec(), + }, + ) + .await + .unwrap_err(); + assert!(matches!(err, FilesError::MissingField("content_type"))); + // data + let err = files + .create( + &cx, + "c", + NewFile { + name: "f".into(), + content_type: "text/plain".into(), + data: vec![], + }, + ) + .await + .unwrap_err(); + assert!(matches!(err, FilesError::MissingField("data"))); + } + + #[tokio::test] + async fn name_and_content_type_length_caps_enforced() { + let files = svc(); + let cx = anon_cx(AppId::new()); + let long_name = "x".repeat(256); + let err = files + .create(&cx, "c", new_file(&long_name, b"x")) + .await + .unwrap_err(); + assert!(matches!(err, FilesError::NameTooLong(256))); + + let err = files + .create( + &cx, + "c", + NewFile { + name: "f".into(), + content_type: "x".repeat(128), + data: b"x".to_vec(), + }, + ) + .await + .unwrap_err(); + assert!(matches!(err, FilesError::ContentTypeTooLong(128))); + } + + #[tokio::test] + async fn per_file_size_cap_enforced() { + let files = FilesServiceImpl::new( + Arc::new(InMemoryFilesRepo::default()), + Arc::new(DenyingAuthzRepo), + Arc::new(CapturingEmitter::default()), + 8, // tiny cap + ); + let cx = anon_cx(AppId::new()); + let err = files + .create(&cx, "c", new_file("big", b"123456789")) + .await + .unwrap_err(); + assert!(matches!(err, FilesError::TooLarge { limit: 8, .. })); + } + + #[tokio::test] + async fn cross_app_isolation() { + let files = svc(); + let app_a = AppId::new(); + let app_b = AppId::new(); + let cx_a = anon_cx(app_a); + let cx_b = anon_cx(app_b); + let id = files + .create(&cx_a, "shared", new_file("f", b"from-a")) + .await + .unwrap(); + // app B cannot see app A's file by id. + assert_eq!( + files.get(&cx_b, "shared", &id.to_string()).await.unwrap(), + None + ); + assert!(files + .head(&cx_b, "shared", &id.to_string()) + .await + .unwrap() + .is_none()); + let page_b = files.list(&cx_b, "shared", None, 100).await.unwrap(); + assert!(page_b.files.is_empty()); + // app A still sees it. + assert!(files + .get(&cx_a, "shared", &id.to_string()) + .await + .unwrap() + .is_some()); + } + + #[tokio::test] + async fn anonymous_cx_skips_authz() { + let files = svc(); // DenyingAuthzRepo + let cx = anon_cx(AppId::new()); + // No principal → no authz check, even with a denying repo. + files.create(&cx, "c", new_file("f", b"x")).await.unwrap(); + } + + #[tokio::test] + async fn member_without_role_is_forbidden() { + let files = svc(); // DenyingAuthzRepo + let cx = member_cx(AppId::new()); + let err = files + .create(&cx, "c", new_file("f", b"x")) + .await + .unwrap_err(); + assert!(matches!(err, FilesError::Forbidden)); + } + + #[tokio::test] + async fn member_with_editor_role_allowed() { + let files = svc_with( + Arc::new(EditorAuthzRepo), + Arc::new(CapturingEmitter::default()), + ); + let cx = member_cx(AppId::new()); + files.create(&cx, "c", new_file("f", b"x")).await.unwrap(); + } + + #[tokio::test] + async fn mutations_emit_events_with_correct_prev() { + let emitter = Arc::new(CapturingEmitter::default()); + let files = svc_with(Arc::new(DenyingAuthzRepo), emitter.clone()); + let cx = anon_cx(AppId::new()); + + let id = files.create(&cx, "c", new_file("f", b"one")).await.unwrap(); + files + .update( + &cx, + "c", + &id.to_string(), + FileUpdate { + data: b"two".to_vec(), + name: None, + content_type: None, + }, + ) + .await + .unwrap(); + files.delete(&cx, "c", &id.to_string()).await.unwrap(); + + let events = emitter.events.lock().unwrap(); + assert_eq!(events.len(), 3); + // create: prev is None + assert_eq!(events[0].op, "create"); + assert_eq!(events[0].source, "files"); + assert!(events[0].old_payload.is_none()); + assert!(events[0].payload.is_some()); + // update: prev is the prior metadata + assert_eq!(events[1].op, "update"); + assert!(events[1].old_payload.is_some()); + // delete: prev is the deleted metadata (payload == old_payload) + assert_eq!(events[2].op, "delete"); + assert_eq!(events[2].payload, events[2].old_payload); + assert!(events[2].payload.is_some()); + } + + #[tokio::test] + async fn list_cursor_paginates() { + let files = svc(); + let cx = anon_cx(AppId::new()); + for i in 0..5 { + files + .create(&cx, "c", new_file(&format!("f{i}"), b"x")) + .await + .unwrap(); + } + let p1 = files.list(&cx, "c", None, 2).await.unwrap(); + assert_eq!(p1.files.len(), 2); + assert!(p1.next_cursor.is_some()); + let p2 = files + .list(&cx, "c", p1.next_cursor.as_deref(), 2) + .await + .unwrap(); + assert_eq!(p2.files.len(), 2); + let p3 = files + .list(&cx, "c", p2.next_cursor.as_deref(), 2) + .await + .unwrap(); + assert_eq!(p3.files.len(), 1); + assert!(p3.next_cursor.is_none()); + } +} diff --git a/crates/manager-core/src/lib.rs b/crates/manager-core/src/lib.rs index ba059a1..94e5399 100644 --- a/crates/manager-core/src/lib.rs +++ b/crates/manager-core/src/lib.rs @@ -30,6 +30,9 @@ pub mod dispatcher; pub mod docs_filter; pub mod docs_repo; pub mod docs_service; +pub mod files_api; +pub mod files_repo; +pub mod files_service; pub mod gc; pub mod http_service; pub mod kv_repo; @@ -96,6 +99,9 @@ pub use dead_letters_api::{dead_letters_router, DeadLettersApiError, DeadLetters pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError}; pub use docs_repo::{DocsRepo, DocsRepoError, PostgresDocsRepo}; pub use docs_service::DocsServiceImpl; +pub use files_api::{files_admin_router, FilesAdminState}; +pub use files_repo::{FilesConfig, FilesRepo, FilesRepoError, FsFilesRepo}; +pub use files_service::FilesServiceImpl; pub use gc::{spawn_abandoned_gc, spawn_dead_letter_gc}; pub use http_service::{HttpConfig, HttpServiceImpl}; pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo}; @@ -116,8 +122,9 @@ pub use route_repo::{NewRoute, PostgresRouteRepository, RouteRepository}; pub use sandbox::{CeilingError, SandboxCeiling}; pub use trigger_config::{BackoffShape, TriggerConfig}; pub use trigger_repo::{ - collection_matches, CreateDeadLetterTrigger, CreateDocsTrigger, CreateKvTrigger, - DeadLetterTriggerMatch, DocsTriggerMatch, KvTriggerMatch, PostgresTriggerRepo, Trigger, - TriggerDetails, TriggerDispatchMode, TriggerKind, TriggerRepo, TriggerRepoError, + collection_matches, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger, + CreateKvTrigger, DeadLetterTriggerMatch, DocsTriggerMatch, FilesTriggerMatch, KvTriggerMatch, + PostgresTriggerRepo, Trigger, TriggerDetails, TriggerDispatchMode, TriggerKind, TriggerRepo, + TriggerRepoError, }; pub use triggers_api::{triggers_router, TriggersApiError, TriggersState}; diff --git a/crates/manager-core/src/outbox_event_emitter.rs b/crates/manager-core/src/outbox_event_emitter.rs index 406e1c3..a5c705c 100644 --- a/crates/manager-core/src/outbox_event_emitter.rs +++ b/crates/manager-core/src/outbox_event_emitter.rs @@ -19,7 +19,8 @@ use std::sync::Arc; use async_trait::async_trait; use picloud_shared::{ - DocsEventOp, EmitError, KvEventOp, SdkCallCx, ServiceEvent, ServiceEventEmitter, TriggerEvent, + DocsEventOp, EmitError, FileMeta, FilesEventOp, KvEventOp, SdkCallCx, ServiceEvent, + ServiceEventEmitter, TriggerEvent, }; use crate::outbox_repo::{NewOutboxRow, OutboxRepo, OutboxSourceKind}; @@ -43,6 +44,7 @@ impl ServiceEventEmitter for OutboxEventEmitter { match event.source { "kv" => self.emit_kv(cx, event).await, "docs" => self.emit_docs(cx, event).await, + "files" => self.emit_files(cx, event).await, // Future sources land here. For now, silently drop — the // SDK calls `events.emit(...)` unconditionally for forward // compat, so swallowing without an error is correct. @@ -154,4 +156,68 @@ impl OutboxEventEmitter { } Ok(()) } + + /// v1.1.5. Fan out a files mutation across matching files triggers. + /// The `ServiceEvent.payload` is the file **metadata** (never the + /// blob bytes); `old_payload` is the prior metadata (the deleted + /// row's metadata on delete). The `TriggerEvent::Files` carries the + /// metadata fields explicitly + `prev` for the change-data-capture + /// surface. + async fn emit_files(&self, cx: &SdkCallCx, event: ServiceEvent) -> Result<(), EmitError> { + let Some(op) = FilesEventOp::from_wire(event.op) else { + return Ok(()); + }; + let Some(collection) = event.collection.clone() else { + return Ok(()); + }; + // The payload is the FileMeta JSON the FilesServiceImpl emitted. + let Some(meta) = event + .payload + .clone() + .and_then(|v| serde_json::from_value::(v).ok()) + else { + return Ok(()); + }; + + let matches = self + .triggers + .list_matching_files(cx.app_id, &collection, op) + .await + .map_err(|e| EmitError::Unavailable(format!("trigger lookup: {e}")))?; + + if matches.is_empty() { + return Ok(()); + } + + let trigger_event = TriggerEvent::Files { + op, + collection, + id: meta.id.to_string(), + name: meta.name, + content_type: meta.content_type, + size: meta.size, + checksum: meta.checksum, + prev: event.old_payload.clone(), + }; + let payload = serde_json::to_value(&trigger_event) + .map_err(|e| EmitError::Rejected(format!("event serialize: {e}")))?; + + for m in matches { + self.outbox + .insert(NewOutboxRow { + app_id: cx.app_id, + source_kind: OutboxSourceKind::Files, + trigger_id: Some(m.trigger_id), + script_id: Some(m.script_id), + reply_to: None, + payload: payload.clone(), + origin_principal: cx.principal.as_ref().map(|p| p.user_id), + trigger_depth: cx.trigger_depth.saturating_add(1), + root_execution_id: Some(cx.root_execution_id), + }) + .await + .map_err(|e| EmitError::Unavailable(format!("outbox insert: {e}")))?; + } + Ok(()) + } } diff --git a/crates/manager-core/src/outbox_repo.rs b/crates/manager-core/src/outbox_repo.rs index 4bb41fa..10a9c9d 100644 --- a/crates/manager-core/src/outbox_repo.rs +++ b/crates/manager-core/src/outbox_repo.rs @@ -27,6 +27,8 @@ pub enum OutboxSourceKind { DeadLetter, /// v1.1.4. Cron, + /// v1.1.5. + Files, } impl OutboxSourceKind { @@ -38,6 +40,7 @@ impl OutboxSourceKind { Self::Docs => "docs", Self::DeadLetter => "dead_letter", Self::Cron => "cron", + Self::Files => "files", } } @@ -49,6 +52,7 @@ impl OutboxSourceKind { "docs" => Some(Self::Docs), "dead_letter" => Some(Self::DeadLetter), "cron" => Some(Self::Cron), + "files" => Some(Self::Files), _ => None, } } diff --git a/crates/manager-core/src/trigger_repo.rs b/crates/manager-core/src/trigger_repo.rs index 1f5111b..9476e55 100644 --- a/crates/manager-core/src/trigger_repo.rs +++ b/crates/manager-core/src/trigger_repo.rs @@ -5,7 +5,9 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use picloud_shared::{AdminUserId, AppId, DocsEventOp, KvEventOp, ScriptId, TriggerId}; +use picloud_shared::{ + AdminUserId, AppId, DocsEventOp, FilesEventOp, KvEventOp, ScriptId, TriggerId, +}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; use uuid::Uuid; @@ -51,6 +53,8 @@ pub enum TriggerKind { DeadLetter, /// v1.1.4. Cron, + /// v1.1.5. + Files, } impl TriggerKind { @@ -61,6 +65,7 @@ impl TriggerKind { Self::Docs => "docs", Self::DeadLetter => "dead_letter", Self::Cron => "cron", + Self::Files => "files", } } @@ -71,6 +76,7 @@ impl TriggerKind { "docs" => Some(Self::Docs), "dead_letter" => Some(Self::DeadLetter), "cron" => Some(Self::Cron), + "files" => Some(Self::Files), _ => None, } } @@ -120,6 +126,11 @@ pub enum TriggerDetails { #[serde(default, skip_serializing_if = "Option::is_none")] last_fired_at: Option>, }, + /// v1.1.5. Same shape as KV/docs: a collection glob + op subset. + Files { + collection_glob: String, + ops: Vec, + }, } /// Create payload for a KV trigger. Defaults applied at the admin @@ -175,6 +186,33 @@ pub struct CreateCronTrigger { pub registered_by_principal: AdminUserId, } +/// Create payload for a files trigger (v1.1.5). Same shape as KV with +/// `FilesEventOp` ops. +#[derive(Debug, Clone)] +pub struct CreateFilesTrigger { + pub script_id: ScriptId, + pub collection_glob: String, + pub ops: Vec, + pub dispatch_mode: TriggerDispatchMode, + pub retry_max_attempts: u32, + pub retry_backoff: BackoffShape, + pub retry_base_ms: u32, + pub registered_by_principal: AdminUserId, +} + +/// One match for the dispatcher's files trigger fan-out lookup +/// (v1.1.5). Same shape as `KvTriggerMatch`. +#[derive(Debug, Clone)] +pub struct FilesTriggerMatch { + pub trigger_id: TriggerId, + pub script_id: ScriptId, + pub dispatch_mode: TriggerDispatchMode, + pub retry_max_attempts: u32, + pub retry_backoff: BackoffShape, + pub retry_base_ms: u32, + pub registered_by_principal: AdminUserId, +} + /// One match for the dispatcher's "which KV triggers fire on this /// event" lookup. Carries everything the dispatcher needs to construct /// the outbox row. @@ -242,6 +280,13 @@ pub trait TriggerRepo: Send + Sync { req: CreateCronTrigger, ) -> Result; + /// v1.1.5. + async fn create_files_trigger( + &self, + app_id: AppId, + req: CreateFilesTrigger, + ) -> Result; + async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError>; async fn get(&self, id: TriggerId) -> Result, TriggerRepoError>; @@ -269,6 +314,16 @@ pub trait TriggerRepo: Send + Sync { op: DocsEventOp, ) -> Result, TriggerRepoError>; + /// Dispatcher hot path for files fan-out (v1.1.5). Mirrors the KV + /// fan-out logic: pull every enabled files trigger, filter glob + + /// ops in Rust (empty ops array means "any op"). + async fn list_matching_files( + &self, + app_id: AppId, + collection: &str, + op: FilesEventOp, + ) -> Result, TriggerRepoError>; + /// Dispatcher hot path for dead-letter fan-out. Filters: source /// (or any-source), originating trigger_id (or any), originating /// script_id (or any). Each filter is "match OR is_null". @@ -555,6 +610,71 @@ impl TriggerRepo for PostgresTriggerRepo { }) } + async fn create_files_trigger( + &self, + app_id: AppId, + req: CreateFilesTrigger, + ) -> Result { + if req.collection_glob.is_empty() { + return Err(TriggerRepoError::Invalid( + "collection_glob must not be empty".into(), + )); + } + let mut tx = self.pool.begin().await?; + let parent: TriggerRow = sqlx::query_as( + "INSERT INTO triggers ( \ + app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, retry_base_ms, \ + registered_by_principal \ + ) VALUES ($1, $2, 'files', TRUE, $3, $4, $5, $6, $7) \ + RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, retry_base_ms, \ + registered_by_principal, created_at, updated_at", + ) + .bind(app_id.into_inner()) + .bind(req.script_id.into_inner()) + .bind(req.dispatch_mode.as_str()) + .bind(i32::try_from(req.retry_max_attempts).unwrap_or(3)) + .bind(req.retry_backoff.as_str()) + .bind(i32::try_from(req.retry_base_ms).unwrap_or(1000)) + .bind(req.registered_by_principal.into_inner()) + .fetch_one(&mut *tx) + .await?; + + let ops_str: Vec = req.ops.iter().map(|o| o.as_str().to_string()).collect(); + sqlx::query( + "INSERT INTO files_trigger_details (trigger_id, collection_glob, ops) \ + VALUES ($1, $2, $3)", + ) + .bind(parent.id) + .bind(&req.collection_glob) + .bind(&ops_str) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + Ok(Trigger { + id: parent.id.into(), + app_id: parent.app_id.into(), + script_id: parent.script_id.into(), + kind: TriggerKind::Files, + enabled: parent.enabled, + dispatch_mode: dispatch_from_str(&parent.dispatch_mode), + retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3), + retry_backoff: BackoffShape::from_wire(&parent.retry_backoff) + .unwrap_or(BackoffShape::Exponential), + retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000), + registered_by_principal: parent.registered_by_principal.into(), + created_at: parent.created_at, + updated_at: parent.updated_at, + details: TriggerDetails::Files { + collection_glob: req.collection_glob, + ops: req.ops, + }, + }) + } + async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError> { let parents: Vec = sqlx::query_as( "SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \ @@ -693,6 +813,51 @@ impl TriggerRepo for PostgresTriggerRepo { Ok(out) } + async fn list_matching_files( + &self, + app_id: AppId, + collection: &str, + op: FilesEventOp, + ) -> Result, TriggerRepoError> { + // Mirrors list_matching_kv: pull every enabled files trigger, + // filter glob + ops in Rust (empty ops array means "any op"). + let rows: Vec = sqlx::query_as( + "SELECT t.id, t.script_id, t.dispatch_mode, \ + t.retry_max_attempts, t.retry_backoff, t.retry_base_ms, \ + t.registered_by_principal, \ + d.collection_glob, d.ops \ + FROM triggers t \ + JOIN files_trigger_details d ON d.trigger_id = t.id \ + WHERE t.app_id = $1 AND t.kind = 'files' AND t.enabled = TRUE", + ) + .bind(app_id.into_inner()) + .fetch_all(&self.pool) + .await?; + + let op_str = op.as_str(); + let mut out = Vec::new(); + for r in rows { + if !collection_matches(&r.collection_glob, collection) { + continue; + } + let any_op = r.ops.is_empty(); + if !any_op && !r.ops.iter().any(|o| o == op_str) { + continue; + } + out.push(FilesTriggerMatch { + trigger_id: r.id.into(), + script_id: r.script_id.into(), + dispatch_mode: dispatch_from_str(&r.dispatch_mode), + retry_max_attempts: u32::try_from(r.retry_max_attempts).unwrap_or(3), + retry_backoff: BackoffShape::from_wire(&r.retry_backoff) + .unwrap_or(BackoffShape::Exponential), + retry_base_ms: u32::try_from(r.retry_base_ms).unwrap_or(1000), + registered_by_principal: r.registered_by_principal.into(), + }); + } + Ok(out) + } + async fn list_matching_dead_letter( &self, app_id: AppId, @@ -729,6 +894,7 @@ impl TriggerRepo for PostgresTriggerRepo { } } +#[allow(clippy::too_many_lines)] async fn hydrate_one(pool: &PgPool, parent: TriggerRow) -> Result { let kind = TriggerKind::from_wire(&parent.kind).ok_or_else(|| { TriggerRepoError::Invalid(format!("unknown trigger kind {}", parent.kind)) @@ -797,6 +963,23 @@ async fn hydrate_one(pool: &PgPool, parent: TriggerRow) -> Result { + let row: KvDetailRow = sqlx::query_as( + "SELECT collection_glob, ops FROM files_trigger_details WHERE trigger_id = $1", + ) + .bind(parent.id) + .fetch_one(pool) + .await?; + let ops = row + .ops + .iter() + .filter_map(|s| FilesEventOp::from_wire(s)) + .collect(); + TriggerDetails::Files { + collection_glob: row.collection_glob, + ops, + } + } }; Ok(Trigger { diff --git a/crates/manager-core/src/triggers_api.rs b/crates/manager-core/src/triggers_api.rs index a3ff8d1..fffe695 100644 --- a/crates/manager-core/src/triggers_api.rs +++ b/crates/manager-core/src/triggers_api.rs @@ -16,7 +16,9 @@ use axum::http::StatusCode; use axum::response::{IntoResponse, Json, Response}; use axum::routing::{delete, get, post}; use axum::{Extension, Router}; -use picloud_shared::{AppId, DocsEventOp, KvEventOp, Principal, ScriptId, ScriptKind, TriggerId}; +use picloud_shared::{ + AppId, DocsEventOp, FilesEventOp, KvEventOp, Principal, ScriptId, ScriptKind, TriggerId, +}; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -25,8 +27,8 @@ use crate::authz::{require, AuthzDenied, AuthzError, AuthzRepo, Capability}; use crate::repo::{ScriptRepository, ScriptRepositoryError}; use crate::trigger_config::{BackoffShape, TriggerConfig}; use crate::trigger_repo::{ - CreateCronTrigger, CreateDeadLetterTrigger, CreateDocsTrigger, CreateKvTrigger, Trigger, - TriggerDispatchMode, TriggerRepo, TriggerRepoError, + CreateCronTrigger, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger, + CreateKvTrigger, Trigger, TriggerDispatchMode, TriggerRepo, TriggerRepoError, }; #[derive(Clone)] @@ -54,6 +56,7 @@ pub fn triggers_router(state: TriggersState) -> Router { .route("/apps/{app_id}/triggers/kv", post(create_kv_trigger)) .route("/apps/{app_id}/triggers/docs", post(create_docs_trigger)) .route("/apps/{app_id}/triggers/cron", post(create_cron_trigger)) + .route("/apps/{app_id}/triggers/files", post(create_files_trigger)) .route( "/apps/{app_id}/triggers/dead_letter", post(create_dl_trigger), @@ -139,6 +142,24 @@ fn default_timezone() -> String { "UTC".to_string() } +/// v1.1.5 files trigger. Mirrors `CreateKvTriggerRequest`; `ops` uses +/// `FilesEventOp` (`create` / `update` / `delete`). +#[derive(Debug, Deserialize)] +pub struct CreateFilesTriggerRequest { + pub script_id: ScriptId, + pub collection_glob: String, + #[serde(default)] + pub ops: Vec, + #[serde(default = "default_dispatch")] + pub dispatch_mode: TriggerDispatchMode, + #[serde(default)] + pub retry_max_attempts: Option, + #[serde(default)] + pub retry_backoff: Option, + #[serde(default)] + pub retry_base_ms: Option, +} + #[derive(Debug, Deserialize)] pub struct CreateDeadLetterTriggerRequest { pub script_id: ScriptId, @@ -328,6 +349,43 @@ async fn create_cron_trigger( Ok((StatusCode::CREATED, Json(created))) } +async fn create_files_trigger( + State(s): State, + Extension(principal): Extension, + Path(app_id): Path, + Json(input): Json, +) -> Result<(StatusCode, Json), TriggersApiError> { + ensure_app_exists(&*s.apps, app_id).await?; + require( + s.authz.as_ref(), + &principal, + Capability::AppManageTriggers(app_id), + ) + .await?; + + if input.collection_glob.trim().is_empty() { + return Err(TriggersApiError::Invalid( + "collection_glob must not be empty".into(), + )); + } + validate_trigger_target(&*s.scripts, app_id, input.script_id).await?; + + let req = CreateFilesTrigger { + script_id: input.script_id, + collection_glob: input.collection_glob, + ops: input.ops, + dispatch_mode: input.dispatch_mode, + retry_max_attempts: input + .retry_max_attempts + .unwrap_or(s.config.retry_max_attempts), + retry_backoff: input.retry_backoff.unwrap_or(s.config.retry_backoff), + retry_base_ms: input.retry_base_ms.unwrap_or(s.config.retry_base_ms), + registered_by_principal: principal.user_id, + }; + let created = s.triggers.create_files_trigger(app_id, req).await?; + Ok((StatusCode::CREATED, Json(created))) +} + async fn create_dl_trigger( State(s): State, Extension(principal): Extension, @@ -484,13 +542,14 @@ mod tests { use super::*; use crate::app_repo::{AppLookup, AppRepository}; use crate::trigger_repo::{ - CreateCronTrigger, DeadLetterTriggerMatch, DocsTriggerMatch, KvTriggerMatch, Trigger, - TriggerDetails, TriggerRepo, TriggerRepoError, + CreateCronTrigger, CreateFilesTrigger, DeadLetterTriggerMatch, DocsTriggerMatch, + FilesTriggerMatch, KvTriggerMatch, Trigger, TriggerDetails, TriggerRepo, TriggerRepoError, }; use async_trait::async_trait; use chrono::Utc; use picloud_shared::{ - AdminUserId, App, AppRole, DocsEventOp, KvEventOp, ScriptId, TriggerId, UserId, + AdminUserId, App, AppRole, DocsEventOp, FilesEventOp, KvEventOp, ScriptId, TriggerId, + UserId, }; use std::collections::HashMap; use tokio::sync::Mutex; @@ -616,6 +675,34 @@ mod tests { self.inner.lock().await.insert(id, trigger.clone()); Ok(trigger) } + async fn create_files_trigger( + &self, + app_id: AppId, + req: CreateFilesTrigger, + ) -> Result { + let now = Utc::now(); + let id = TriggerId::new(); + let trigger = Trigger { + id, + app_id, + script_id: req.script_id, + kind: crate::trigger_repo::TriggerKind::Files, + enabled: true, + dispatch_mode: req.dispatch_mode, + retry_max_attempts: req.retry_max_attempts, + retry_backoff: req.retry_backoff, + retry_base_ms: req.retry_base_ms, + registered_by_principal: req.registered_by_principal, + created_at: now, + updated_at: now, + details: TriggerDetails::Files { + collection_glob: req.collection_glob, + ops: req.ops, + }, + }; + self.inner.lock().await.insert(id, trigger.clone()); + Ok(trigger) + } async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError> { Ok(self .inner @@ -648,6 +735,14 @@ mod tests { ) -> Result, TriggerRepoError> { Ok(vec![]) } + async fn list_matching_files( + &self, + _app_id: AppId, + _collection: &str, + _op: FilesEventOp, + ) -> Result, TriggerRepoError> { + Ok(vec![]) + } async fn list_matching_dead_letter( &self, _app_id: AppId, diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs index f60e4de..55197e6 100644 --- a/crates/picloud/src/lib.rs +++ b/crates/picloud/src/lib.rs @@ -11,12 +11,13 @@ use axum::{routing::get, Json, Router}; use picloud_executor_core::{Engine, Limits}; use picloud_manager_core::{ admin_router, admins_router, api_keys_router, app_members_router, apps_api, apps_router, - attach_principal_if_present, auth_router, compile_routes, dead_letters_router, migrations, - require_authenticated, route_admin_router, triggers_router, AbandonedRepo, - AdminPrincipalResolver, AdminSessionRepository, AdminState, AdminUserRepository, AdminsState, - ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState, - AppRepository, AppsState, AuthState, AuthzRepo, DeadLetterRepo, DeadLettersState, Dispatcher, - DocsServiceImpl, HttpConfig, HttpServiceImpl, KvServiceImpl, OutboxEventEmitter, OutboxRepo, + attach_principal_if_present, auth_router, compile_routes, dead_letters_router, + files_admin_router, migrations, require_authenticated, route_admin_router, triggers_router, + AbandonedRepo, AdminPrincipalResolver, AdminSessionRepository, AdminState, AdminUserRepository, + AdminsState, ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository, + AppMembersState, AppRepository, AppsState, AuthState, AuthzRepo, DeadLetterRepo, + DeadLettersState, Dispatcher, DocsServiceImpl, FilesAdminState, FilesConfig, FilesServiceImpl, + FsFilesRepo, HttpConfig, HttpServiceImpl, KvServiceImpl, OutboxEventEmitter, OutboxRepo, PostgresAbandonedRepo, PostgresAdminSessionRepository, PostgresAdminUserRepository, PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository, PostgresAppRepository, PostgresDeadLetterRepo, PostgresDeadLetterService, PostgresDocsRepo, @@ -31,9 +32,9 @@ use picloud_orchestrator_core::{ LocalExecutorClient, }; use picloud_shared::{ - DeadLetterService, DocsService, ExecutionLogSink, HttpService, InboxResolver, KvService, - OutboxWriter, ScriptValidator, ServiceEventEmitter, Services, API_VERSION, PRODUCT_VERSION, - SDK_VERSION, WIRE_VERSION, + DeadLetterService, DocsService, ExecutionLogSink, FilesService, HttpService, InboxResolver, + KvService, OutboxWriter, ScriptValidator, ServiceEventEmitter, Services, API_VERSION, + PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION, }; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; @@ -157,7 +158,18 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { ); } let http: Arc = Arc::new(HttpServiceImpl::new(http_config, authz.clone())); - let services = Services::new(kv, docs, dl_service.clone(), events, modules, http); + // v1.1.5 filesystem-backed blob storage. Metadata lives in Postgres; + // the bytes live on disk under `PICLOUD_FILES_ROOT` (default ./data). + let files_config = FilesConfig::from_env(); + let files_max_size = files_config.max_file_size_bytes; + let files_repo = Arc::new(FsFilesRepo::new(pool.clone(), files_config)); + let files: Arc = Arc::new(FilesServiceImpl::new( + files_repo.clone(), + authz.clone(), + events.clone(), + files_max_size, + )); + let services = Services::new(kv, docs, dl_service.clone(), events, modules, http, files); let engine = Arc::new(Engine::new(Limits::default(), services)); // Compile the routes table once at startup; admin writes refresh it. @@ -270,6 +282,11 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { apps: apps_repo.clone(), authz: authz.clone(), }; + let files_admin_state = FilesAdminState { + files: files_repo, + apps: apps_repo.clone(), + authz: authz.clone(), + }; let apps_state = AppsState { apps: apps_repo, domains: domains_repo, @@ -312,6 +329,7 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { .merge(app_members_router(app_members_state)) .merge(api_keys_router(api_keys_state)) .merge(triggers_router(triggers_state)) + .merge(files_admin_router(files_admin_state)) .merge(dead_letters_router(dead_letters_state)) .layer(from_fn_with_state( auth_state.clone(), diff --git a/crates/shared/src/files.rs b/crates/shared/src/files.rs new file mode 100644 index 0000000..b3842d3 --- /dev/null +++ b/crates/shared/src/files.rs @@ -0,0 +1,339 @@ +//! `FilesService` — the v1.1.5 filesystem-backed blob store contract. +//! +//! Lives in `picloud-shared` (not `executor-core`) so the Rhai bridge, +//! the manager-core filesystem+Postgres impl, and any in-memory test +//! impl can all depend on the same trait without dragging +//! `executor-core` into a Postgres or filesystem dependency. +//! +//! Implementations MUST derive every storage `app_id` from `cx.app_id` +//! — never from a script-passed argument. That is the cross-app +//! isolation boundary; see `docs/sdk-shape.md`. +//! +//! `FilesService` is collection-scoped: scripts get a handle via +//! `files::collection(name)` and call +//! `create`/`head`/`get`/`update`/`delete`/`list` on it. The blob bytes +//! never travel through Postgres or through trigger payloads — the row +//! is metadata + a SHA-256 checksum; the bytes live on the filesystem. + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use uuid::Uuid; + +use crate::SdkCallCx; + +/// POSIX-portable filename cap (255 bytes). +pub const MAX_FILE_NAME_BYTES: usize = 255; +/// RFC 6838 puts a reasonable media-type ceiling around 127 chars. +pub const MAX_CONTENT_TYPE_BYTES: usize = 127; + +/// Payload for `create` — a brand-new blob. The id is server-generated +/// (a UUID); scripts never supply it. +#[derive(Debug, Clone)] +pub struct NewFile { + pub name: String, + pub content_type: String, + pub data: Vec, +} + +/// Payload for `update` — replacement bytes plus optional metadata. If +/// `name` / `content_type` are `None` the prior values are kept. +#[derive(Debug, Clone)] +pub struct FileUpdate { + pub data: Vec, + pub name: Option, + pub content_type: Option, +} + +/// File metadata as scripts and triggers see it. Serialized into +/// `ServiceEvent.payload` (the blob bytes are NOT included — files are +/// too big to ship through trigger payloads), and surfaced to Rhai by +/// `head` / `list`. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct FileMeta { + pub id: Uuid, + pub collection: String, + pub name: String, + pub content_type: String, + pub size: u64, + /// Lowercase hex SHA-256 of the content. + pub checksum: String, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +/// One page of file metadata from `FilesService::list`. `next_cursor` +/// is `Some` when more pages exist, `None` when exhausted. +#[derive(Debug, Clone)] +pub struct FilesListPage { + pub files: Vec, + pub next_cursor: Option, +} + +#[async_trait] +pub trait FilesService: Send + Sync { + /// Create a new blob; returns its server-generated id. Throws on a + /// missing required field, an over-limit blob, or an invalid + /// collection name. + async fn create( + &self, + cx: &SdkCallCx, + collection: &str, + new: NewFile, + ) -> Result; + + /// Metadata only — no body read. `None` if the file is missing. + async fn head( + &self, + cx: &SdkCallCx, + collection: &str, + id: &str, + ) -> Result, FilesError>; + + /// Full content. `None` if missing. Verifies the stored checksum + /// against the bytes on disk and returns `FilesError::Corrupted` + /// when they diverge. + async fn get( + &self, + cx: &SdkCallCx, + collection: &str, + id: &str, + ) -> Result>, FilesError>; + + /// Replace content (and optionally metadata). Throws `NotFound` + /// when the file doesn't exist. + async fn update( + &self, + cx: &SdkCallCx, + collection: &str, + id: &str, + upd: FileUpdate, + ) -> Result<(), FilesError>; + + /// Delete by id; returns whether the file was present. + async fn delete(&self, cx: &SdkCallCx, collection: &str, id: &str) -> Result; + + /// Cursor-paginated metadata listing (same shape as KV's list). + async fn list( + &self, + cx: &SdkCallCx, + collection: &str, + cursor: Option<&str>, + limit: u32, + ) -> Result; +} + +/// Failure modes surfaced to the Rhai bridge. The bridge converts each +/// to a Rhai runtime error string; the discriminants exist so internal +/// callers (admin endpoints, tests) can react more precisely. +#[derive(Debug, Error)] +pub enum FilesError { + /// Empty collection name, or one containing a path separator / `..` + /// / NUL — rejected at the SDK boundary per `docs/sdk-shape.md`. + #[error("invalid collection name: {0}")] + InvalidCollection(String), + + /// A required field on `create` was missing or empty. The string + /// names the field (`name` / `content_type` / `data`). + #[error("missing required field: {0}")] + MissingField(&'static str), + + /// Blob exceeds the per-file size cap (default 100 MB, + /// `PICLOUD_FILES_MAX_FILE_SIZE_BYTES`). + #[error("file too large: {size} bytes exceeds limit of {limit} bytes")] + TooLarge { size: usize, limit: usize }, + + /// Filename exceeds `MAX_FILE_NAME_BYTES`. + #[error("file name too long: {0} bytes exceeds 255")] + NameTooLong(usize), + + /// Content-type exceeds `MAX_CONTENT_TYPE_BYTES`. + #[error("content_type too long: {0} bytes exceeds 127")] + ContentTypeTooLong(usize), + + /// `update` on a non-existent file. + #[error("file not found")] + NotFound, + + /// The bytes on disk no longer match the stored checksum — the + /// filesystem corrupted or a backup was misconfigured. The operator + /// decides what to do with the metadata-vs-bytes mismatch; the repo + /// does NOT auto-delete. + #[error("file content corrupted (checksum mismatch)")] + Corrupted, + + /// Caller principal lacked the required capability. Only raised when + /// `cx.principal.is_some()` — scripts running with `principal: None` + /// (public HTTP) operate under script-as-gate semantics and skip + /// the capability check. + #[error("forbidden")] + Forbidden, + + /// Anything else — Postgres unavailable, filesystem I/O error, etc. + #[error("files backend error: {0}")] + Backend(String), +} + +impl NewFile { + /// Validate required fields + length caps at the SDK boundary. + /// `data` must be non-empty (v1.1.5 treats an empty blob as a + /// missing `data` field — see HANDBACK §7). + /// + /// # Errors + /// + /// Returns the field-specific [`FilesError`] for the first failing + /// check. + pub fn validate(&self, max_size: usize) -> Result<(), FilesError> { + if self.name.trim().is_empty() { + return Err(FilesError::MissingField("name")); + } + if self.content_type.trim().is_empty() { + return Err(FilesError::MissingField("content_type")); + } + if self.data.is_empty() { + return Err(FilesError::MissingField("data")); + } + if self.name.len() > MAX_FILE_NAME_BYTES { + return Err(FilesError::NameTooLong(self.name.len())); + } + if self.content_type.len() > MAX_CONTENT_TYPE_BYTES { + return Err(FilesError::ContentTypeTooLong(self.content_type.len())); + } + if self.data.len() > max_size { + return Err(FilesError::TooLarge { + size: self.data.len(), + limit: max_size, + }); + } + Ok(()) + } +} + +impl FileUpdate { + /// Validate the replacement bytes + any supplied metadata. + /// + /// # Errors + /// + /// Returns the field-specific [`FilesError`] for the first failing + /// check. + pub fn validate(&self, max_size: usize) -> Result<(), FilesError> { + if self.data.is_empty() { + return Err(FilesError::MissingField("data")); + } + if let Some(name) = &self.name { + if name.trim().is_empty() { + return Err(FilesError::MissingField("name")); + } + if name.len() > MAX_FILE_NAME_BYTES { + return Err(FilesError::NameTooLong(name.len())); + } + } + if let Some(ct) = &self.content_type { + if ct.trim().is_empty() { + return Err(FilesError::MissingField("content_type")); + } + if ct.len() > MAX_CONTENT_TYPE_BYTES { + return Err(FilesError::ContentTypeTooLong(ct.len())); + } + } + if self.data.len() > max_size { + return Err(FilesError::TooLarge { + size: self.data.len(), + limit: max_size, + }); + } + Ok(()) + } +} + +/// Reject a collection name that is empty or could escape the per-app +/// files tree. UUID-shaped ids never produce traversal paths, but +/// collection names come from scripts so they're validated defensively +/// at both the SDK boundary and the repo. +/// +/// # Errors +/// +/// Returns [`FilesError::InvalidCollection`] when the name is empty or +/// contains `/`, `\`, `..`, or a NUL byte. +pub fn validate_collection(collection: &str) -> Result<(), FilesError> { + if collection.is_empty() { + return Err(FilesError::InvalidCollection("must not be empty".into())); + } + if collection.contains('/') + || collection.contains('\\') + || collection.contains("..") + || collection.contains('\0') + { + return Err(FilesError::InvalidCollection(format!( + "collection {collection:?} must not contain '/', '\\', '..', or NUL" + ))); + } + Ok(()) +} + +/// Stub used by the test harness so executor-core integration tests +/// (which don't touch files) can construct a `Services` bundle without +/// a filesystem or Postgres. Every call returns +/// `FilesError::Backend("...")` so accidental use surfaces clearly. +#[derive(Debug, Default, Clone, Copy)] +pub struct NoopFilesService; + +#[async_trait] +impl FilesService for NoopFilesService { + async fn create( + &self, + _cx: &SdkCallCx, + _collection: &str, + _new: NewFile, + ) -> Result { + Err(FilesError::Backend("files is not wired in".into())) + } + + async fn head( + &self, + _cx: &SdkCallCx, + _collection: &str, + _id: &str, + ) -> Result, FilesError> { + Err(FilesError::Backend("files is not wired in".into())) + } + + async fn get( + &self, + _cx: &SdkCallCx, + _collection: &str, + _id: &str, + ) -> Result>, FilesError> { + Err(FilesError::Backend("files is not wired in".into())) + } + + async fn update( + &self, + _cx: &SdkCallCx, + _collection: &str, + _id: &str, + _upd: FileUpdate, + ) -> Result<(), FilesError> { + Err(FilesError::Backend("files is not wired in".into())) + } + + async fn delete( + &self, + _cx: &SdkCallCx, + _collection: &str, + _id: &str, + ) -> Result { + Err(FilesError::Backend("files is not wired in".into())) + } + + async fn list( + &self, + _cx: &SdkCallCx, + _collection: &str, + _cursor: Option<&str>, + _limit: u32, + ) -> Result { + Err(FilesError::Backend("files is not wired in".into())) + } +} diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs index 5e9bfbb..aee6c12 100644 --- a/crates/shared/src/lib.rs +++ b/crates/shared/src/lib.rs @@ -12,6 +12,7 @@ pub mod error; pub mod events; pub mod exec_summary; pub mod execution_log; +pub mod files; pub mod http; pub mod ids; pub mod inbox; @@ -36,6 +37,10 @@ pub use error::Error; pub use events::{EmitError, NoopEventEmitter, ServiceEvent, ServiceEventEmitter}; pub use exec_summary::ExecResponseSummary; pub use execution_log::{ExecutionLog, ExecutionStatus}; +pub use files::{ + validate_collection as validate_files_collection, FileMeta, FileUpdate, FilesError, + FilesListPage, FilesService, NewFile, NoopFilesService, +}; pub use http::{HttpError, HttpRequest, HttpResponse, HttpService, NoopHttpService}; pub use ids::{AdminUserId, ApiKeyId, AppId, ExecutionId, RequestId, ScriptId, TriggerId}; pub use inbox::{ @@ -50,6 +55,8 @@ pub use sandbox::ScriptSandbox; pub use script::{Script, ScriptKind}; pub use sdk_cx::SdkCallCx; pub use services::Services; -pub use trigger_event::{DeadLetterEventDetail, DocsEventOp, KvEventOp, TriggerEvent}; +pub use trigger_event::{ + DeadLetterEventDetail, DocsEventOp, FilesEventOp, KvEventOp, TriggerEvent, +}; pub use validator::{ScriptValidator, ValidatedScript, ValidationError}; pub use version::{API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION}; diff --git a/crates/shared/src/services.rs b/crates/shared/src/services.rs index 1ec4a93..97328ab 100644 --- a/crates/shared/src/services.rs +++ b/crates/shared/src/services.rs @@ -20,9 +20,9 @@ use std::sync::Arc; use crate::{ - DeadLetterService, DocsService, HttpService, KvService, ModuleSource, NoopDeadLetterService, - NoopDocsService, NoopEventEmitter, NoopHttpService, NoopKvService, NoopModuleSource, - ServiceEventEmitter, + DeadLetterService, DocsService, FilesService, HttpService, KvService, ModuleSource, + NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopFilesService, NoopHttpService, + NoopKvService, NoopModuleSource, ServiceEventEmitter, }; /// SDK service bundle. See module docs for the lifecycle and the v1.1.x @@ -60,6 +60,13 @@ pub struct Services { /// the picloud binary; `NoopHttpService` in tests that don't make /// network calls. pub http: Arc, + + /// Filesystem-backed blob storage (v1.1.5). Scripts get + /// `files::collection(name).{create,head,get,update,delete,list}`. + /// Backed by a Postgres-metadata + on-disk-bytes repo in the + /// picloud binary; `NoopFilesService` in tests that don't touch + /// files. + pub files: Arc, } impl Services { @@ -74,6 +81,7 @@ impl Services { events: Arc, modules: Arc, http: Arc, + files: Arc, ) -> Self { Self { kv, @@ -82,6 +90,7 @@ impl Services { events, modules, http, + files, } } @@ -99,6 +108,7 @@ impl Services { Arc::new(NoopEventEmitter), Arc::new(NoopModuleSource), Arc::new(NoopHttpService), + Arc::new(NoopFilesService), ) } } diff --git a/crates/shared/src/trigger_event.rs b/crates/shared/src/trigger_event.rs index 69a679f..0b399b9 100644 --- a/crates/shared/src/trigger_event.rs +++ b/crates/shared/src/trigger_event.rs @@ -78,6 +78,39 @@ impl DocsEventOp { } } +/// Operations a files trigger can fire on. v1.1.5. Stored as a +/// lowercase string in `files_trigger_details.ops` (Postgres `text[]`). +/// CRUD verbs (`create`) mirror `DocsEventOp`, distinct from KV's +/// set/upsert flavour. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum FilesEventOp { + Create, + Update, + Delete, +} + +impl FilesEventOp { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Create => "create", + Self::Update => "update", + Self::Delete => "delete", + } + } + + #[must_use] + pub fn from_wire(s: &str) -> Option { + match s { + "create" => Some(Self::Create), + "update" => Some(Self::Update), + "delete" => Some(Self::Delete), + _ => None, + } + } +} + /// Discriminated description of a triggering event. Lifted from the /// outbox row's payload at dispatch time. Each variant carries the /// fields the corresponding `ctx.event` shape exposes to the script. @@ -123,6 +156,27 @@ pub enum TriggerEvent { fired_at: DateTime, }, + /// A files create / update / delete fired this handler. v1.1.5. + /// Carries the affected file's **metadata only** — never the blob + /// bytes (files are too big to ship through trigger payloads). A + /// handler that wants the bytes calls + /// `files::collection(c).get(id)` itself. `prev` is the prior + /// metadata for update (and the deleted-row metadata for delete); + /// absent on create. Surfaced to scripts as `ctx.event.files`. + Files { + op: FilesEventOp, + collection: String, + /// UUID as string — Rhai sees it as a string. + id: String, + name: String, + content_type: String, + size: u64, + /// Lowercase hex SHA-256. + checksum: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + prev: Option, + }, + /// A dead-letter row fired this handler. The original event is /// nested verbatim plus the dead-letter metadata the design notes /// §4 require. @@ -148,6 +202,7 @@ impl TriggerEvent { Self::Kv { .. } => "kv", Self::Docs { .. } => "docs", Self::Cron { .. } => "cron", + Self::Files { .. } => "files", Self::DeadLetter { .. } => "dead_letter", } } diff --git a/dashboard/src/lib/api.ts b/dashboard/src/lib/api.ts index a58cd20..d9e1108 100644 --- a/dashboard/src/lib/api.ts +++ b/dashboard/src/lib/api.ts @@ -211,7 +211,7 @@ export interface DeadLetterRow { resolution: 'replayed' | 'ignored' | 'handled_by_script' | 'handler_failed' | null; } -export type TriggerKind = 'kv' | 'docs' | 'dead_letter' | 'cron'; +export type TriggerKind = 'kv' | 'docs' | 'dead_letter' | 'cron' | 'files' | 'pubsub'; export type TriggerDispatchMode = 'sync' | 'async'; /// Per-kind detail, tagged by `kind` to match the Rust serde shape. @@ -219,7 +219,21 @@ export type TriggerDetails = | { kind: 'kv'; collection_glob: string; ops: string[] } | { kind: 'docs'; collection_glob: string; ops: string[] } | { kind: 'dead_letter'; source_filter?: string; trigger_id_filter?: string; script_id_filter?: string } - | { kind: 'cron'; schedule: string; timezone: string; last_fired_at?: string | null }; + | { kind: 'cron'; schedule: string; timezone: string; last_fired_at?: string | null } + | { kind: 'files'; collection_glob: string; ops: string[] } + | { kind: 'pubsub'; topic_pattern: string }; + +/// v1.1.5 file metadata as the admin files endpoint returns it. +export interface FileMeta { + id: string; + collection: string; + name: string; + content_type: string; + size: number; + checksum: string; + created_at: string; + updated_at: string; +} export interface Trigger { id: string; @@ -625,6 +639,23 @@ export const api = { ) }, + files: { + list: (idOrSlug: string, collection: string, opts: { cursor?: string; limit?: number } = {}) => { + const params = new URLSearchParams(); + params.set('collection', collection); + if (opts.cursor) params.set('cursor', opts.cursor); + if (opts.limit !== undefined) params.set('limit', String(opts.limit)); + return adminRequest<{ files: FileMeta[]; next_cursor: string | null }>( + `/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/files?${params.toString()}` + ); + }, + remove: (idOrSlug: string, collection: string, fileId: string) => + adminRequest( + `/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/files/${encodeURIComponent(collection)}/${fileId}`, + { method: 'DELETE' } + ) + }, + execute: async ( id: string, body: unknown, diff --git a/dashboard/src/routes/apps/[slug]/+page.svelte b/dashboard/src/routes/apps/[slug]/+page.svelte index 2e34ead..77203a9 100644 --- a/dashboard/src/routes/apps/[slug]/+page.svelte +++ b/dashboard/src/routes/apps/[slug]/+page.svelte @@ -530,6 +530,13 @@ class:active={activeTab === 'settings'} onclick={() => (activeTab = 'settings')}>Settings + + Files + + import { base } from '$app/paths'; + import { page } from '$app/state'; + import { api, ApiError, type App, type FileMeta } from '$lib/api'; + import ConfirmModal from '$lib/ConfirmModal.svelte'; + + let slug = $derived(page.params.slug ?? ''); + let app = $state(null); + let collection = $state(''); + let activeCollection = $state(''); + let files = $state([]); + let nextCursor = $state(null); + let loading = $state(false); + let error = $state(null); + let fileToRemove = $state(null); + let removing = $state(false); + + async function loadApp() { + try { + app = await api.apps.get(slug); + } catch (e) { + error = e instanceof ApiError ? e.message : String(e); + } + } + + $effect(() => { + void slug; + void loadApp(); + }); + + async function loadFiles(cursor?: string) { + const c = collection.trim(); + if (!c) { + error = 'Enter a collection name to list its files.'; + return; + } + loading = true; + error = null; + try { + const res = await api.files.list(slug, c, { cursor, limit: 100 }); + if (cursor) { + files = [...files, ...res.files]; + } else { + files = res.files; + activeCollection = c; + } + nextCursor = res.next_cursor; + } catch (e) { + error = e instanceof ApiError ? e.message : String(e); + } finally { + loading = false; + } + } + + async function confirmRemove() { + if (!fileToRemove) return; + removing = true; + try { + await api.files.remove(slug, fileToRemove.collection, fileToRemove.id); + files = files.filter((f) => f.id !== fileToRemove!.id); + fileToRemove = null; + } catch (e) { + error = e instanceof ApiError ? e.message : String(e); + } finally { + removing = false; + } + } + + function fmtTime(iso: string): string { + return new Date(iso).toLocaleString(); + } + + function fmtSize(bytes: number): string { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`; + return `${(bytes / (1024 * 1024)).toFixed(1)} MB`; + } + + + + Files · {slug} · PiCloud + + +
+
+
+ ← back to {app?.name ?? slug} +

Files

+

+ Browse and delete stored blobs by collection. Uploads happen from scripts via + files::collection(c).create(…). +

+
+
+ +
{ + e.preventDefault(); + void loadFiles(); + }} + > + + +
+ + {#if error} +
{error}
+ {/if} + + {#if activeCollection} + {#if files.length === 0 && !loading} +

No files in collection {activeCollection}.

+ {:else} + + + + + + + + + + + + + {#each files as f (f.id)} + + + + + + + + + {/each} + +
NameContent typeSizeCreatedID
{f.name}{f.content_type}{fmtSize(f.size)}{fmtTime(f.created_at)}{f.id} + +
+ {#if nextCursor} + + {/if} + {/if} + {/if} +
+ +{#if fileToRemove} + (fileToRemove = null)} + > +

+ Delete {fileToRemove.name} ({fmtSize(fileToRemove.size)}) from collection + {fileToRemove.collection}? This removes both the metadata row and the bytes on + disk and cannot be undone. +

+ {#if removing}

Deleting…

{/if} +
+{/if} + +