5 Commits

Author SHA1 Message Date
MechaCat02
d064681c49 docs(v1.1.5): reviewer audit report — APPROVE verdict
Audit of feat/v1.1.5-files-pubsub against the v1.1.5 dispatch prompt.
All gates green on HEAD; 491 tests pass (+64 new), 139 ignored.

Atomic write protocol audited line-by-line: single-pass SHA-256,
temp→fsync→rename→fsync-dir→DB sequence as specified, unique pid+
counter temp suffix, path-traversal defense at SDK boundary and repo.
Pub/sub fan-out is correctly transactional (single tx begin+commit;
one outbox row per matching subscriber; trigger_depth saturating-
bumped). Topic pattern matcher rejects every shape the brief called
out (*.created, **, a.*.b, user.*x, *user, empty).

Three flagged open questions resolved: orphan-sweep deferred (matches
planning decision), test count 63 vs 70 (defensible — gap is the
dispatcher e2e test, which is already covered for kv/docs/cron via
the shared dispatcher path), empty-blob = missing-data (defensible
interpretation, relaxable later).

First CI workflow added; schema_snapshot un-ignored with DATABASE_URL-
absent skip path.
2026-06-03 21:52:34 +02:00
MechaCat02
9492c18d0e docs(v1.1.5): handback report
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 21:47:55 +02:00
MechaCat02
4595db7a7a chore(v1.1.5): version bumps, CI workflow, schema-snapshot un-ignore
- Workspace 1.1.4 → 1.1.5; SDK 1.5 → 1.6; dashboard 0.10.0 → 0.11.0.
- CHANGELOG v1.1.5 entry; CLAUDE.md runtime-config table gains
  PICLOUD_FILES_ROOT + PICLOUD_FILES_MAX_FILE_SIZE_BYTES.
- schema_snapshot test: drop #[ignore] + #[sqlx::test]; run against
  DATABASE_URL when set, skip cleanly when absent. Re-blessed golden
  picks up files / files_trigger_details / pubsub_trigger_details, the
  two widened CHECKs, and the pubsub partial index.
- First CI workflow (.github/workflows/ci.yml): postgres:15 service +
  fmt + clippy + cargo test --workspace; separate dashboard check job.
- Add files/pubsub admin-trigger reject-coverage tests (module +
  cross-app + bad-pattern), mirroring the v1.1.3 regression set.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 21:44:12 +02:00
MechaCat02
834c787ee1 feat(v1.1.5): pubsub::publish_durable SDK + pubsub:* triggers
Durable pub/sub through the universal outbox — the sixth trigger kind.

- `pubsub::publish_durable(topic, message)` Rhai SDK (no handle; topics
  ARE the grouping unit). Message JSON-encoded; Blobs base64 at any
  depth.
- `PubsubService` trait in picloud-shared with the topic matcher +
  validator (exact / `<prefix>.*` / `*`; mid-pattern wildcards
  rejected). `PostgresPubsubRepo` + `PubsubServiceImpl` in manager-core.
- Publish-time fan-out: one outbox row per matching enabled pubsub
  trigger, all in ONE transaction (no half-fan-out on crash). No
  matching trigger → publish succeeds silently, zero rows.
- `pubsub:*` trigger kind via Layout-E (0020: widen both CHECKs +
  pubsub_trigger_details + partial index), TriggerEvent::Pubsub +
  ctx.event.pubsub, dispatcher arm, admin endpoint POST /triggers/pubsub
  (validates topic pattern + reuses validate_trigger_target).
- AppPubsubPublish capability → script:write (seven-scope held).
- Dashboard Pub/Sub trigger form on the Triggers tab + list rendering.

publish_ephemeral stays deferred to v1.2. ~18 new tests (service
in-memory incl. transactional-rollback, shared matcher, bridge
encoding). No DB required for the suite.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 21:37:06 +02:00
MechaCat02
6e132b6ee0 feat(v1.1.5): files SDK + files:* triggers
Filesystem-backed blob storage as the fifth concrete trigger kind.

- `files::collection(c).{create,head,get,update,delete,list}` Rhai SDK
  (blob in/out; metadata maps; missing-field throws naming the field).
- `FilesService` trait in picloud-shared; `FsFilesRepo` (atomic
  write: temp→fsync→rename→fsync-dir→DB; single-pass SHA-256;
  checksum-verified reads → Corrupted) + `FilesServiceImpl` in
  manager-core. Metadata in Postgres (0018), bytes on disk under
  PICLOUD_FILES_ROOT with 0o700 shard dirs.
- `files:*` trigger kind via the Layout-E pattern (0019: widen both
  CHECKs + files_trigger_details), TriggerEvent::Files (metadata only,
  no bytes), emit_files fan-out, dispatcher arm, admin endpoint
  POST /triggers/files (reuses validate_trigger_target).
- AppFilesRead/AppFilesWrite capabilities → script:read/script:write
  (seven-scope commitment held). AppPubsubPublish reserved for v1.1.6.
- Admin files API (list + delete) + dashboard Files view per app.

Cross-app isolation keyed on cx.app_id at every layer. ~45 new tests
(service in-memory, fs tempdir, bridge integration). No DB required
for the suite. publish_ephemeral and the orphan sweep stay deferred.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 21:18:17 +02:00
46 changed files with 5512 additions and 372 deletions

72
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,72 @@
name: CI
on:
push:
branches: [main]
pull_request:
env:
CARGO_TERM_COLOR: always
# Matches what docker-compose produces locally; the schema-snapshot
# guardrail and any other DB-backed tests run against this service.
DATABASE_URL: postgres://picloud:picloud@localhost:5432/picloud
jobs:
rust:
name: Rust — fmt, clippy, test
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_USER: picloud
POSTGRES_PASSWORD: picloud
POSTGRES_DB: picloud
ports:
- 5432:5432
options: >-
--health-cmd "pg_isready -U picloud"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@v4
# rust-toolchain.toml pins the channel; this action honors it.
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt, clippy
- name: Cache cargo
uses: Swatinem/rust-cache@v2
- name: Format check
run: cargo fmt --all -- --check
- name: Clippy
run: cargo clippy --all-targets --all-features -- -D warnings
# Runs the whole workspace, including the schema-snapshot guardrail
# (it picks up DATABASE_URL from the env above and the postgres
# service; without a DB it would skip cleanly).
- name: Test
run: cargo test --workspace
dashboard:
name: Dashboard — check
runs-on: ubuntu-latest
defaults:
run:
working-directory: dashboard
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 20
cache: npm
cache-dependency-path: dashboard/package-lock.json
- name: Install deps
run: npm ci
- name: Svelte check
run: npm run check

View File

@@ -1,5 +1,76 @@
# PiCloud Changelog
## v1.1.5 — Files & Pub/Sub (unreleased)
Two stateful services + two trigger kinds. **`files::*`** is
filesystem-backed blob storage (atomic writes, path-sharded layout,
single-pass SHA-256 with checksum-verified reads); the metadata row
lives in Postgres, the bytes on disk. **`pubsub::publish_durable`** is
durable pub/sub through the universal outbox, fanning out one delivery
row per matching subscriber **at publish time** inside a single
transaction. Both ride the v1.1.1 trigger framework as the fifth and
sixth concrete kinds via the established Layout-E extension pattern.
### Added
- **`files::collection(name).{create,head,get,update,delete,list}`** —
blob storage SDK. `create`/`update` take a Rhai `Blob`; `get` returns
a `Blob` (or `()` if missing); `head`/`list` return metadata maps
(`id, name, content_type, size, checksum, created_at, updated_at`).
`create`/`update`/`delete` throw on failure; `get`/`head` return `()`
for a missing file; `delete` returns a was-present bool. Missing
required field on `create` throws naming the field.
- **Atomic writes** — temp file → fsync → rename → fsync parent dir →
DB row, so a crash never leaves a readable half-written file. SHA-256
is computed in a single pass during the write; `get` re-verifies it
and surfaces `FilesError::Corrupted` (logged with the path, never
auto-deleted) on a mismatch. Shard dirs are created `0o700`.
- **`files:*` trigger kind** — `ctx.event.files` carries the metadata
only (never the bytes; a handler that wants them calls
`files::collection(c).get(id)`). `prev` is `()` on create, the prior
metadata on update, the deleted metadata on delete.
- **`pubsub::publish_durable(topic, message)`** — durable publish.
Message is any JSON-serializable Rhai value; Blobs encode as base64
(at any nesting depth). No matching subscriber → the publish succeeds
silently with zero outbox rows.
- **`pubsub:*` trigger kind** — topic patterns are exact, `<prefix>.*`,
or `*`; mid-pattern wildcards are rejected at trigger creation.
`ctx.event.pubsub` carries `topic`, `message`, `published_at`.
- **`FilesService` + `PubsubService` traits** (`picloud-shared`) +
`FsFilesRepo`/`FilesServiceImpl` and `PostgresPubsubRepo`/
`PubsubServiceImpl` (manager-core). Wired into the `Services` bundle
as `files` and `pubsub`.
- **Capabilities** `AppFilesRead`/`AppFilesWrite``script:read`/
`script:write`, `AppPubsubPublish``script:write`. No new `Scope`
variant — the seven-scope commitment holds. Script-as-gate: skipped
when the script runs unauthenticated.
- **Admin files API** (`GET`/`DELETE /apps/{id}/files`) + dashboard
Files view per app; **Pub/Sub trigger form** on the Triggers tab.
- **CI** — first `.github/workflows/ci.yml` (Postgres service, fmt +
clippy + `cargo test --workspace`); the schema-snapshot guardrail now
runs instead of being `#[ignore]`'d.
### Changed
- Workspace version: 1.1.4 → 1.1.5
- Rhai SDK version: 1.5 → 1.6
- Dashboard version: 0.10.0 → 0.11.0
- `schema_snapshot` test: no longer `#[ignore]`'d — runs against
`DATABASE_URL` when set, skips cleanly when absent.
### Migrations
- 0018_files.sql — `files` metadata table (bytes live on disk).
- 0019_files_triggers.sql — widen kind/source_kind CHECKs + add
`files_trigger_details`.
- 0020_pubsub_triggers.sql — widen kind/source_kind CHECKs + add
`pubsub_trigger_details` + partial index.
### New environment variables
- `PICLOUD_FILES_ROOT` (default `./data`)
- `PICLOUD_FILES_MAX_FILE_SIZE_BYTES` (default 100 MB)
## v1.1.4 — Outbound HTTP & Cron triggers (unreleased)
Two surfaces. **`http::*`** lets Rhai scripts make outbound HTTP

View File

@@ -118,6 +118,8 @@ Environment variables consumed by the `picloud` binary:
| `DATABASE_URL` | — | Required. Postgres connection string. |
| `PICLOUD_SESSION_TTL_HOURS` | `24` | Sliding-window session lifetime. |
| `PICLOUD_SANDBOX_MAX_*` | conservative defaults | Per-knob admin ceilings on Rhai sandbox overrides. See `manager-core::sandbox::SandboxCeiling`. |
| `PICLOUD_FILES_ROOT` | `./data` | Filesystem root for `files::*` blob storage (v1.1.5). Bytes live at `<root>/files/<app_id>/<collection>/<id[0:2]>/<id>`; metadata in Postgres. |
| `PICLOUD_FILES_MAX_FILE_SIZE_BYTES` | `104857600` (100 MB) | Per-file hard size cap for `files::*` (v1.1.5). Per-app quotas deferred to v1.2. |
## Out of MVP

18
Cargo.lock generated
View File

@@ -1610,7 +1610,7 @@ dependencies = [
[[package]]
name = "picloud"
version = "1.1.4"
version = "1.1.5"
dependencies = [
"anyhow",
"async-trait",
@@ -1636,7 +1636,7 @@ dependencies = [
[[package]]
name = "picloud-cli"
version = "1.1.4"
version = "1.1.5"
dependencies = [
"anyhow",
"assert_cmd",
@@ -1657,7 +1657,7 @@ dependencies = [
[[package]]
name = "picloud-executor"
version = "1.1.4"
version = "1.1.5"
dependencies = [
"anyhow",
"picloud-executor-core",
@@ -1669,7 +1669,7 @@ dependencies = [
[[package]]
name = "picloud-executor-core"
version = "1.1.4"
version = "1.1.5"
dependencies = [
"async-trait",
"base64",
@@ -1693,7 +1693,7 @@ dependencies = [
[[package]]
name = "picloud-manager"
version = "1.1.4"
version = "1.1.5"
dependencies = [
"anyhow",
"picloud-manager-core",
@@ -1705,7 +1705,7 @@ dependencies = [
[[package]]
name = "picloud-manager-core"
version = "1.1.4"
version = "1.1.5"
dependencies = [
"argon2",
"async-trait",
@@ -1733,7 +1733,7 @@ dependencies = [
[[package]]
name = "picloud-orchestrator"
version = "1.1.4"
version = "1.1.5"
dependencies = [
"anyhow",
"picloud-orchestrator-core",
@@ -1745,7 +1745,7 @@ dependencies = [
[[package]]
name = "picloud-orchestrator-core"
version = "1.1.4"
version = "1.1.5"
dependencies = [
"async-trait",
"axum",
@@ -1766,7 +1766,7 @@ dependencies = [
[[package]]
name = "picloud-shared"
version = "1.1.4"
version = "1.1.5"
dependencies = [
"async-trait",
"chrono",

View File

@@ -13,7 +13,7 @@ members = [
]
[workspace.package]
version = "1.1.4"
version = "1.1.5"
edition = "2021"
rust-version = "1.92"
license = "MIT OR Apache-2.0"

View File

@@ -1,252 +1,127 @@
# v1.1.4 Handback — Outbound HTTP SDK & Cron Triggers
# HANDBACK — v1.1.5 Files & Pub/Sub
**Branch:** `feat/v1.1.4-http-cron` (off `main`)
**Commits:** 1 implementation commit (`feat(v1.1.4): outbound HTTP SDK + cron triggers`) + this HANDBACK commit.
## §1 Branch + commits
> **Note on commit granularity:** the brief suggested split
> `feat(v1.1.4-http)` / `feat(v1.1.4-cron)` commits. The two features are
> interleaved across shared files (`Cargo.toml`, `crates/picloud/src/lib.rs`,
> `crates/manager-core/src/lib.rs`, `version.rs`, `services.rs`), so
> cleanly-*compiling* per-theme commits aren't separable without interactive
> hunk staging (unavailable in this environment). I chose one coherent,
> green commit over shipping broken intermediates. Squash/relabel as you see fit.
- **Branch:** `feat/v1.1.5-files-pubsub` (off `main`). Not pushed, not merged, no PR.
- **Commits:** the two-feature split decided in planning + a finalize commit; HANDBACK is the 4th (docs):
1. `6e132b6 feat(v1.1.5): files SDK + files:* triggers`
2. `834c787 feat(v1.1.5): pubsub::publish_durable SDK + pubsub:* triggers`
3. `4595db7 chore(v1.1.5): version bumps, CI workflow, schema-snapshot un-ignore`
4. `docs(v1.1.5): handback report` (this file)
---
Each of commits 13 is independently green (fmt + clippy + `cargo test --workspace`). Shared files (Cargo deps, `Services` bundle, `version.rs`, dispatcher arm, authz enum, CHANGELOG) are touched in both feature commits as planned — additive only, so commit 1 compiles green with the `AppPubsubPublish` capability and the dashboard `'pubsub'` type union present-but-unused until commit 2.
## Scope coverage
## §2 Scope coverage
| # | Item | Status |
|---|------|--------|
| 1 | `http::*` SDK surface (get/post/put/patch/delete/head/post_form/request) | **Done** |
| 2 | SSRF deny-list (resolved-IP, DNS-rebinding defense, scheme/port, body caps, UA, timeouts, `PICLOUD_HTTP_ALLOW_PRIVATE`) | **Done** |
| 3 | http authz (`Capability::AppHttpRequest``script:write`, script-as-gate, no new Scope) | **Done** |
| 4 | `HttpService` trait + `HttpServiceImpl` + Services wiring | **Done** |
| 5 | Cron migration `0017` (Layout-E extension) | **Done** |
| 6 | Cron scheduler tokio task (catch-up = fire-once) | **Done** |
| 7 | `ctx.event.cron` shape + `TriggerEvent::Cron` | **Done** |
| 8 | Dispatcher routing extension (`… \| Cron`) | **Done** |
| 9 | Dashboard cron trigger UI (minimal) | **Done** |
| 10a | Redact `ModuleSourceError::Backend` at resolver boundary | **Done** |
| 10b | Pin `rhai = "=1.24"` | **Done** |
| 10c | CHANGELOG retroactive v1.1.3 cross-app-trigger security note | **Done** |
| 11 | Version bumps (workspace 1.1.4, SDK 1.5, dashboard 0.10.0) | **Done** |
| 12 | Tests (~50-70) | **Done** — 70 new |
| Brief item | Status | Notes |
|---|---|---|
| §1 `files::*` SDK | ✅ | `create/head/get/update/delete/list`, blob in/out, metadata maps, throw-vs-`()` convention. |
| §1 migration 0018_files.sql | ✅ | metadata table + `idx_files_app_collection`. Bytes on disk, never in PG. |
| §1 atomic writes/deletes, checksum, size+name+type caps, authz, events | ✅ | See §3. |
| §2 `files:*` trigger (Layout-E, 0019) | ✅ | widen 2 CHECKs + `files_trigger_details`; `TriggerEvent::Files` (metadata only); admin `POST /triggers/files`; `emit_files`; dispatcher arm. |
| §3 `pubsub::publish_durable` SDK | ✅ | publish-time transactional fan-out; topic matching in Rust; succeed-silently on no match. |
| §4 `pubsub:*` trigger (Layout-E, 0020) | ✅ | widen 2 CHECKs + `pubsub_trigger_details` + partial index; `TriggerEvent::Pubsub`; admin `POST /triggers/pubsub`; dispatcher arm. |
| §5 dashboard Files view | ✅ | `apps/[slug]/files/+page.svelte` (list per collection, per-row delete w/ confirm). Backed by a new admin files API (§7.2). |
| §5 dashboard Pub/Sub trigger form | ✅ | added to the Triggers tab beside Cron; trigger-list renders files + pubsub. `npm run check` clean. |
| §6 schema_snapshot CI follow-up | ✅ | §6b skip-when-absent + un-ignore; §6a new `.github/workflows/ci.yml`. See §5. |
| §7 version bumps | ✅ | workspace 1.1.4→1.1.5, SDK 1.5→1.6, dashboard 0.10.0→0.11.0, CHANGELOG, CLAUDE.md env table. |
| §8 tests | ⚠️ | 63 new tests (target 7090). Every *named* critical test covered; gap is the dispatcher end-to-end DB test (see §9.2). |
---
## §3 Files implementation notes
## SSRF policy implementation notes
**Service layering** (`FilesServiceImpl`, manager-core): validate collection (empty + traversal) → script-as-gate authz (`AppFilesRead`/`AppFilesWrite`, skipped when `cx.principal` is `None`) → field/size-cap validation → repo call keyed by `cx.app_id` → best-effort `ServiceEvent` emit. `executor-core` has **no** Postgres or filesystem dependency — both traits live in `picloud-shared`, the impl in manager-core.
- **reqwest hook.** `crates/manager-core/src/ssrf.rs` defines `SsrfResolver`
implementing `reqwest::dns::Resolve`, plugged in via
`ClientBuilder::dns_resolver`. It delegates to the system resolver
(injectable for tests — see DNS-rebinding test), then filters each `IpAddr`
through `SsrfPolicy::check`. Because reqwest re-resolves at every connection
(including each redirect hop), the policy applies post-redirect too.
- **`dns_resolver` is generic over a concrete `R: Resolve`** (stores `Arc<R>`),
so the resolver is passed as `Arc<SsrfResolver>`, not `Arc<dyn Resolve>`.
- **Literal-IP gap closed.** reqwest only routes *hostnames* through the custom
resolver — a URL with a literal-IP host (`http://127.0.0.1/`) bypasses it
entirely. The impl therefore *also* runs `SsrfPolicy::check` on literal-IP
hosts at URL-parse time (`validate_url`), on every hop. Both paths are tested.
- **IPv4-mapped IPv6 re-check.** `check_v6` calls `Ipv6Addr::to_ipv4_mapped()`;
if `Some`, it re-runs the v4 deny-list against the embedded address
(`::ffff:127.0.0.1` → denied as "loopback").
- **Applied before AND after redirects.** Redirects are followed *manually*
(client built with `redirect(Policy::none())`) so per-request
`follow_redirects`/`max_redirects` are honored; each hop re-validates
scheme/port + literal-IP and re-resolves hostnames through the SSRF resolver.
- **Script-visible error format.** `"http: blocked by SSRF policy: <reason>"`
where `<reason>` is a CIDR category (`loopback`, `private`, `link-local`,
`carrier-grade-nat`, `multicast`, `reserved`, `unique-local`, `unspecified`).
**The resolved IP is never included.** The all-addresses-denied case surfaces
as `Ssrf` (not a generic DNS error) via a marker error the resolver emits and
the impl detects by walking the reqwest error source chain.
**Atomic-write protocol** (`write_atomic_at`, a free fn so it's unit-testable without a pool):
1. Validate collection path-safety (defensive — already enforced at the SDK boundary).
2. `create_dir_all` the shard dir `<root>/files/<app_id>/<collection>/<id[0:2]>/<id>` with `0o700` (Unix `DirBuilderExt::mode`).
3. SHA-256 the in-memory bytes (single pass — never re-reads the file) while writing to `<final>.tmp.<pid>-<atomic-counter>`.
4. `sync_all()` the temp file.
5. `rename(tmp, final)` — atomic on POSIX.
6. `sync_all()` the parent dir (rename durability).
7. INSERT/UPDATE the DB row.
## Cron scheduler implementation notes
Rollback per step: crash in 15 → orphan `*.tmp.*` (never read; the pid+counter suffix avoids collisions); crash in 57 → bytes with no row, **never reachable via the SDK** because every read starts from the row. `update` reads the prior row first (existence + CDC `prev`), writes new bytes, then UPDATEs.
- **Catch-up = fire-once.** Matches the brief; no deviation. `next_due` returns a
single canonical scheduled-at (first slot after `last_fired_at`, or
`created_at` if never fired); after firing, `last_fired_at = now`, so the next
tick sees only future slots. **Verified live** against Postgres: an
every-second (`* * * * * *`) trigger with a 2s tick advanced `last_fired_at`
~once per 2s, not once per second.
- **No ExecutionGate contention.** The scheduler only enqueues to the outbox
(one row per due trigger per tick, in a `FOR UPDATE OF d SKIP LOCKED`
transaction that also bumps `last_fired_at`). The existing dispatcher acquires
the gate and delivers it identically to kv/docs/dead_letter — verified live
(the cron outbox row was consumed, the script executed, the row deleted).
- **Timezone handling.** `chrono-tz`. Invalid IANA names are rejected at the
admin endpoint with a 422 (`TriggersApiError::Invalid`, message contains
"timezone"); the repo re-validates defensively before insert.
- **Schema beyond the brief:** none. Followed the brief exactly — `schedule`,
`timezone DEFAULT 'UTC'`, `last_fired_at`, `idx_cron_triggers_due`. **No**
stored `next_scheduled_at` column (an exploration agent suggested one; the
brief computes next-fire in-process, which I followed).
**Atomic-delete protocol** (`FsFilesRepo::delete`): `SELECT … FOR UPDATE` + `DELETE` in one transaction → commit → `unlink` outside the tx. Unlink failure leaves an orphan (logged at warn); failure before commit changes nothing. Returns the deleted metadata so the service can emit.
---
**Path-traversal validation:** `picloud_shared::validate_files_collection` rejects empty / `/` / `\` / `..` / NUL at the SDK boundary; `FsFilesRepo::guard_collection` repeats it before any fs op. UUID ids can't produce traversal (verified defensively).
## Tests added (70 new)
**Per-call SHA-256:** computed once over the in-memory `Vec<u8>` during the write (`sha2::Sha256`), hex-lowercased, stored on the row. The file is never re-read to hash. Known-vector tests pin `SHA-256("abc")` and `SHA-256("")`.
- **SSRF policy + resolver (`ssrf.rs`, 20):** one per deny CIDR (127/8, 0/8,
10/8, 172.16/12, 192.168/16, 169.254/16 incl. metadata, 100.64/10, 224/4,
240/4, ::1, ::, fe80::/10, fc00::/7, ff00::/8); 172.x outside-range allowed;
public v4/v6 allowed; IPv4-mapped re-check; `allow_private` disables all;
resolver returns only allowed addrs; all-denied → SSRF marker; **DNS rebinding**
(mock resolver: public then private — second denied); empty resolution ≠ SSRF.
- **HTTP client (`http_service.rs`, 16):** GET/POST round-trips vs a hand-rolled
`TcpListener`; body dispatch + default UA; custom UA override; empty body;
non-2xx no-error; response cap via Content-Length; response cap mid-stream
(no Content-Length); request body cap pre-send; redirect-to-max-then-throw;
scheme rejection (file/ftp/gopher); port rejection (22/25/465/587); SSRF
literal-loopback; SSRF hostname-resolves-to-loopback; timeout; authz
(anon skips / member forbidden / member-with-role allowed).
- **Bridge integration (`sdk_http.rs`, 15):** real Rhai engine under
`spawn_blocking` vs a recording fake — status+JSON body, non-JSON string,
empty→`()`, Map→JSON, String→text, `()`→no body, headers+timeout forwarded,
unknown opt key throws, timeout>max throws, non-2xx no-throw, network error
throws `http:`, `post_form` url-encoding, `request` arbitrary method,
default-UA carries `script_id`, `cx.app_id` forwarded for attribution.
- **Cron scheduler (`cron_scheduler.rs`, 11):** 6-field schedule accept / 5-field
+ malformed reject; IANA tz accept / reject; due/not-due; never-fired uses
created_at; **catch-up fires exactly once after 5 missed windows**; timezone
affects fire time; bad schedule/tz → None.
- **Cron admin (`triggers_api.rs`, 6):** create succeeds; invalid schedule;
unknown timezone; **module target rejected** (v1.1.3 regression); **cross-app
target rejected** (v1.1.3 regression); member-without-role forbidden.
- **Module redaction (2):** `modules.rs` — backend error redacted from the
script-visible error (no leak); `module_redaction_logging.rs` — original error
**is** logged at ERROR level (captured via a global tracing subscriber).
**Checksum-on-get:** `get` reads the file, re-hashes, compares to the stored checksum. Mismatch (or missing bytes while the row persists) → `FilesError::Corrupted`, logged at error level with the path, **no auto-delete**. To scripts this surfaces as a thrown Rhai error `"files: file content corrupted (checksum mismatch)"`.
---
## §4 Pub/Sub implementation notes
## Decisions beyond the brief (every prompt-default deviation, flagged)
**Fan-out-at-publish-time, transactional** (`PostgresPubsubRepo::fan_out_publish`): one transaction — `SELECT` all enabled pubsub triggers for the app (joined to `pubsub_trigger_details`), filter by `topic_matches` in Rust, INSERT one `outbox` row (`source_kind='pubsub'`) per survivor, commit once. A mid-fan-out failure rolls back every row (no half-fan-out). Each delivery row then retries/dead-letters independently through the unchanged dispatcher (its trigger arm just gained `| OutboxSourceKind::Pubsub`).
1. **Three-arg split `verb(url, body, opts)`** (user-approved during planning).
Diverges from the brief's documented two-arg `(url, opts)` shape and
generalizes the escape hatch to `request(method, url, body, opts)`. Resolves
the brief's internal contradiction (its Slack example `http::post(url, #{text:...})`
passed a bare body map, which would be an "unknown opt key" under the
two-arg rule). The `opts` vocabulary is now exactly
`{headers, timeout_ms, follow_redirects, max_redirects}`**`body_raw` was
dropped** (raw strings go through the positional body as a String). The
Slack example works unchanged (`#{text:...}` is the body).
2. **Cron crate = `cron` (0.12), not `croner`.** The brief allowed either; `cron`
handles the 6-field-with-seconds format and named weekdays (`MON-FRI`) used in
the brief's example, and integrates with chrono `Schedule::after`.
3. **Catch-up = fire-once** — matches the brief; called out explicitly as
requested. No deviation.
4. **`SdkCallCx` gained a `script_id` field.** The brief's default User-Agent is
`picloud/<v> (script:<script_id>)`, but `SdkCallCx` didn't carry the script
id. Adding it (sourced from `ExecRequest.script_id` in the engine) is the
clean home and doubles as the audit-attribution key the brief emphasizes. All
19 construction sites updated. The dead-letters admin cx uses a fresh sentinel
id (no script executes there).
5. **SSRF also blocks IPv6 unspecified `::` and IPv4 `0.0.0.0`** with reason
"unspecified". `0.0.0.0/8` is in the brief's list; `::` is not explicitly but
is an obvious sibling hole, so I blocked it too (defensible superset).
6. **No reqwest feature additions needed**`dns_resolver` and `Response::chunk()`
compile under the existing `default-features = false, features = ["json","rustls-tls"]`.
No cookie jar (cookies feature is off, so there's no jar to disable). Added
`url` as an executor-core dep (for `form_urlencoded` in `post_form`).
**Topic pattern matching** runs in Rust (`picloud_shared::topic_matches`), not SQL: `"*"` → all; `"<prefix>.*"``starts_with("<prefix>.")`; otherwise exact. `validate_topic_pattern` (used at trigger creation in the admin endpoint and defensively in the repo) accepts only `*` / `<prefix>.*` / no-star-exact, rejecting `*.created`, `**`, `a.*.b`, `user.*x`, etc. with `"unsupported pubsub topic pattern: …"`.
---
**No matching trigger → the publish succeeds, zero outbox rows** (the design-notes-preferred succeed-silently). `published_at` is stamped manager-side (`Utc::now()`) so every delivery agrees on one instant. `ctx.event.pubsub = #{ topic, message, published_at }`, `ctx.event.op = "publish"`.
## How to verify locally (§8 attestation — run on this exact HEAD)
There is **no `list_matching_pubsub` on `TriggerRepo`** — pubsub publishes directly (it's not a `ServiceEvent`), so the fan-out SELECT lives in `pubsub_repo`, not the `OutboxEventEmitter`. This is the one structural asymmetry vs files/kv/docs, intentional per the publish-time-fan-out decision.
All four gates were run on the handback HEAD (the `feat(v1.1.4)` commit, before
this markdown commit):
## §5 CI follow-up (§6) status
- **Pre-existing CI:** none (no `.github/`, no `.gitlab-ci.yml`).
- **§6a (added):** `.github/workflows/ci.yml` — a `rust` job with a `postgres:15` service (`DATABASE_URL=postgres://picloud:picloud@localhost:5432/picloud`) running `cargo fmt --all -- --check`, `cargo clippy --all-targets --all-features -- -D warnings`, `cargo test --workspace`; a separate `dashboard` job running `npm ci` + `npm run check`.
- **§6b (done):** `schema_snapshot.rs` is no longer `#[ignore]`'d. Reworked from `#[sqlx::test]` to `#[tokio::test]` that **skips cleanly when `DATABASE_URL` is unset** (chosen over fail-loud so `cargo test --workspace` stays green locally) and otherwise connects, runs `sqlx::migrate!`, and dumps. Golden `expected_schema.txt` re-blessed (now contains `files`, `files_trigger_details`, `pubsub_trigger_details`, both widened CHECKs, `idx_files_app_collection`, `idx_triggers_app_pubsub_enabled`, and migrations 00180020).
- **Tradeoff (documented):** the non-`sqlx::test` path applies migrations against the `DATABASE_URL` database directly rather than an isolated throwaway DB. Migrations are forward-only/idempotent and CI's Postgres is fresh, so the structural dump is identical; locally it will also apply 00180020 to whatever DB you point at.
## §6 Schema decisions beyond the brief
- `files` table is verbatim from the brief. `files_trigger_details` / `pubsub_trigger_details` mirror `kv_trigger_details` / `cron_trigger_details`.
- `pubsub_trigger_details` has no `ops` column (a publish has a single implicit op) — only `topic_pattern`.
- `idx_triggers_app_pubsub_enabled` is the third partial index of its kind (per the brief's note); deliberate duplication.
## §7 Decisions beyond the brief (every prompt-default deviation)
1. **Empty blob treated as a missing `data` field.** `NewFile::validate` / `FileUpdate::validate` reject 0-byte `data` with `FilesError::MissingField("data")`. The brief lists `data` as required and tests "missing … data"; the cleanest testable interpretation at the service layer is "empty == missing". Consequence: v1.1.5 cannot store an intentionally-empty file. Easy to relax later.
2. **Admin files REST API added** (`files_api.rs`: `GET /apps/{id}/files?collection=…`, `DELETE /apps/{id}/files/{collection}/{file_id}`). The brief's §5 dashboard needs a backend but didn't spell out admin endpoints; I added a minimal one mirroring `triggers_api`'s direct-repo + capability pattern (`AppFilesRead` for list, `AppFilesWrite` for delete).
3. **Admin file delete does NOT emit a `files:delete` trigger event.** It's an operator cleanup action, not a script mutation, so it goes straight to the repo. SDK deletes still emit. Flagging because "every successful mutation emits" could be read to include admin deletes.
4. **Files `list` bridge accepts both positional and map forms**`list()`, `list(cursor)`, `list(cursor, limit)`, and `list(#{ cursor, limit })` (the map form the brief's example used). Additive convenience.
5. **Files collection-glob semantics reuse the existing `collection_matches`** (`*` / `foo*` prefix / exact), identical to kv/docs. The brief mentioned a `"prefix:*"` form in one spot; I kept parity with the established kv/docs matcher rather than introduce a new glob dialect.
6. **schema_snapshot runs against the live `DATABASE_URL` DB** rather than an isolated temp DB (see §5).
7. **Orphan sweep deferred to v1.1.6+** — confirmed with the user during planning (the brief's recommended default). No `*.tmp.*` sweeper daemon shipped.
## §8 How to verify locally — attestation (fresh run on HEAD `4595db7`)
```
cargo fmt --all -- --check → exit 0
cargo clippy --all-targets --all-features -- -D warnings → exit 0
cargo test --workspace → 427 passed, 0 failed
(cd dashboard && npm run check) → 0 errors, 0 warnings (369 files)
cargo fmt --all -- --check → exit 0
cargo clippy --all-targets --all-features -- -D warnings → exit 0
cargo test --workspace → 491 passed, 0 failed (exit 0)
(schema_snapshot skips cleanly with no DATABASE_URL)
cd dashboard && npm run check → 0 errors, 0 warnings (exit 0)
```
This HANDBACK commit is pure markdown (no gate-relevant files), so the numbers
above hold for the final HEAD.
**Migrations — verified against a real Postgres (dev stack, port 15432):**
- Fresh-DB replay: the `#[sqlx::test]` schema-snapshot test applies all
migrations on a fresh ephemeral DB and matches the (re-blessed) golden — passes.
- On-top-of-prior-state: booting `picloud` against a dev DB pinned at migration
`0006` applied `0007…0017` cleanly (`"migrations applied"`); `_sqlx_migrations`
max is now `17`; `cron_trigger_details` + widened CHECKs present.
**Live smoke performed:**
- Boot logged the `PICLOUD_HTTP_ALLOW_PRIVATE` warning and started the cron
scheduler + HTTP service without panic.
- Seeded an every-second cron trigger → scheduler set `last_fired_at`, dispatcher
consumed the outbox row and ran the script (row deleted on success), and
`last_fired_at` advanced at the tick cadence (fire-once confirmed). Smoke data
cleaned up afterward.
- HTTP GET / SSRF-block / body-dispatch behaviors are covered by the automated
integration tests (real `TcpListener` round-trips + loopback/hostname SSRF
blocks) rather than a manual curl flow, since a live SSRF-block smoke
conflicts with the `PICLOUD_HTTP_ALLOW_PRIVATE` a local-server smoke requires.
To re-run the schema snapshot:
With a live Postgres (the schema guardrail actually verifies the schema):
```
docker compose up -d postgres
DATABASE_URL=postgres://picloud:picloud@127.0.0.1:15432/picloud \
cargo test -p picloud-manager-core --test schema_snapshot -- --include-ignored
cargo test -p picloud-manager-core --test schema_snapshot → test result: ok. 1 passed
```
Migrations 00180020 applied cleanly on top of the existing v1.1.4 dev DB during the re-bless — the same `sqlx::migrate!` replay CI runs on a fresh Postgres.
---
Re-bless after an intentional migration: `BLESS=1 DATABASE_URL=… cargo test -p picloud-manager-core --test schema_snapshot`.
## ⚠️ Latent issue found: stale schema-snapshot golden
**Not run this session:** the full running-binary manual smoke (a script that does `files::collection("uploads").create(...)` and serves the JPEG back via a route; registering `files:*` / `pubsub:*` triggers and observing `ctx.event`). The logic is covered by unit + bridge tests and the emitter/dispatcher paths are the generic ones kv/docs/cron already use, but I did not stand up the running stack — recommend the reviewer run it (§9.2).
`crates/manager-core/tests/expected_schema.txt` was **significantly stale** — the
committed golden was missing many tables from prior releases
(`abandoned_executions`, `dead_letters`, `dead_letter_trigger_details`,
`docs_*`, etc.). The `schema_snapshot` test is `#[ignore]` (needs a DB), so it
was apparently never re-blessed across v1.1.1v1.1.3 and silently drifted.
## §9 Open questions for the reviewer
I re-blessed it, so the diff is large (+217 lines) but **only `cron_trigger_details`
+ the two widened CHECK constraints are v1.1.4-new** — the rest is pre-existing
drift correction. The blessed golden now matches a clean replay (verified).
Recommend the reviewer skim the diff to confirm, and consider whether the
`#[ignore]` should be lifted in CI (with a DB service) so the golden can't drift
again.
1. **Orphan sweep** — deferred to v1.1.6+ per the planning decision. Confirm shipping v1.1.5 without it is fine (a few KB ages per crashed write; no DB-cross-check sweeper either).
2. **Test count 63 vs the 7090 target.** Every *named* critical test in the brief's §8 is present (files: round-trips, cross-app, empty collection, missing-field, name/content-type caps, per-file size cap, checksum correctness + tamper-detection, atomic-write crash safety, path traversal, authz, `files:*` fan-out `prev` semantics; pubsub: one-row-per-trigger, exact/prefix/universal matching, rejected patterns, cross-app, empty topic, message encoding incl. blob→base64, transactional rollback, multiple matches). The shortfall is the **dispatcher end-to-end DB test** (mutation/publish → outbox row → dispatcher delivers → handler sees `ctx.event`). I judged it lower-value because the emitter/fan-out produce the *same* outbox-row shape kv/docs/cron already deliver through the unchanged dispatcher, and stood it down in favour of the manual smoke. Want a `DATABASE_URL`-gated integration test added for it?
3. **Empty-blob = missing-data** (§7.1) — acceptable, or should empty files be storable?
---
## §10 Latent security findings
## Latent security findings
None new. Checked specifically: (a) cross-app isolation is keyed on `cx.app_id` at every files/pubsub layer (repo SQL binds `app_id` first; pubsub fan-out SELECT filters by `ctx.app_id`); tests assert app A can't see/fire app B's files/triggers. (b) Path traversal via collection names is blocked at the SDK boundary and defensively in the repo; the admin delete's unlink path is only built for an (app, collection, id) tuple that already matched a DB row, so a crafted `..` segment can't unlink arbitrary files. (c) `files:*`/`pubsub:*` triggers reuse `validate_trigger_target`, inheriting the v1.1.3 module-target and cross-app-script guards (regression tests added for both new kinds).
None new beyond the (already-known, already-closed-in-v1.1.3) cross-app trigger
gap, which §10c now documents in the CHANGELOG. The SSRF surface is the main
security mechanism in this release; see the SSRF notes above for the
defense-in-depth layering (resolver hook + literal-IP check + per-hop
re-validation + IP-never-leaked errors).
## §11 Deferred items (per brief Scope-OUT + orphan-sweep decision)
One thing for the reviewer to weigh: the SSRF policy is a hardcoded deny-list
with no per-app allow-list (deferred to v1.2 per the brief). An operator who
needs a script to reach a private service has only the all-or-nothing
`PICLOUD_HTTP_ALLOW_PRIVATE` global escape hatch today.
`publish_ephemeral` (v1.2), per-app storage quotas (v1.2), file dedup (v1.2+), presigned URLs / external download tokens (v1.1.6+), streaming up/download (Rhai is sync), file-level ACLs (v1.2+), mid-pattern wildcards (v1.2), topic ACLs / external subscription / `topics` table (v1.1.6), realtime SSE (v1.1.6), and the **orphan-file sweep daemon** (v1.1.6+ — confirmed deferred).
## Open questions for the reviewer
## §12 Known limitations / rough edges
1. **Three-arg HTTP shape** (decision #1) — confirm you're happy with
`verb(url, body, opts)` + dropping `body_raw`, vs the brief's documented
two-arg form. This is the one user-facing API-shape divergence.
2. **Stale schema golden** — OK to land the full re-bless in this PR, or would
you prefer the drift correction split out?
## Deferred items (per the brief's OUT list — not built)
WebSocket/SSE, streaming responses, HTTP/3, per-app outbound allow/deny lists,
per-app rate limits, mTLS, request signing, cookie jar, cron backfill replay,
cron next-fire preview, cron schedule history, drift compensation,
module-import-over-HTTP, files/pubsub/secrets/email/users/queue.
## Known limitations / rough edges
- The dashboard Triggers tab lists all trigger kinds but only *creates* cron
triggers (kv/docs creation remains API-only, unchanged from before). No
next-fire-at preview (deferred to v1.2).
- `post_form` / body field order follows Rhai map iteration order
(`BTreeMap`-backed, so sorted/deterministic; not insertion order).
- The cron scheduler tick is floored at 1s; sub-second schedules effectively
fire at the tick cadence (by design — see the fire-once policy).
- The stale REVIEW.md at repo root is the v1.1.3 reviewer's artifact; the
v1.1.4 reviewer should overwrite it.
- **No orphan reclamation** — crashed writes leave `*.tmp.*`; rename-completed-but-DB-failed leaves unreferenced bytes. Both are harmless (never SDK-readable) but accumulate until v1.1.6's sweeper.
- **Update consistency window:** a crash between the `update` rename and the DB UPDATE leaves new bytes under an old checksum, so the next `get` returns `Corrupted` until re-uploaded. This is the brief's accepted step-57 window, surfaced honestly.
- **Pub/sub fan-out holds one transaction across all subscribers** — fine at v1.1.x scale; a topic-trie index is the v1.2 escape hatch if it becomes a hot path.
- **Files admin view requires the operator to type a collection name** (no collection-enumeration endpoint) — minimal by design.
- **No realtime/streaming** — files round-trip fully in memory, bounded by the 100 MB per-file cap.

196
REVIEW.md
View File

@@ -1,162 +1,156 @@
# v1.1.4 Audit & Review
# v1.1.5 Audit & Review
**Branch:** `feat/v1.1.4-http-cron`
**Base:** `main` (v1.1.3 head)
**Commits ahead:** 2 (1 substantive + handback)
**HEAD audited:** `6080fc6`
**Branch:** `feat/v1.1.5-files-pubsub`
**Base:** `main` (v1.1.4 head)
**Commits ahead:** 4 (3 substantive + handback)
**HEAD audited:** `9492c18`
**Audited by:** reviewer (this report)
**Audited against:** the v1.1.4 dispatch prompt + the v1.1.1v1.1.3 patterns it mandated
**Audited against:** the v1.1.5 dispatch prompt + the v1.1.1v1.1.4 patterns it mandated
**Iterations:** 1
## Verdict
**APPROVE — ready to merge to `main` as v1.1.4.**
**APPROVE — ready to merge to `main` as v1.1.5.**
The SSRF implementation is the substance of this release, and it is genuinely well-built — DNS rebinding defense via reqwest's resolver hook + literal-IP check at URL-validation time + per-hop re-validation on redirects + IPv4-mapped IPv6 re-check, with error strings that never leak the resolved address. The cron scheduler correctly implements the fire-once catch-up policy. All three v1.1.3 follow-ups landed. Static gates green; live-DB smoke went beyond the brief's "Done" criteria.
Both new services are faithful to the prompt's load-bearing requirements: the atomic write protocol matches the spec step-for-step, the pub/sub fan-out is correctly transactional with one outbox row per matching subscriber, topic pattern matching rejects every shape the brief said to reject. The commit split is cleanly per-feature (3 commits vs v1.1.4's single mega-commit — the agent acted on the v1.1.4 retro lesson without being asked). The CI follow-up landed: schema-snapshot un-ignored with a `DATABASE_URL`-absent skip path, plus the first CI workflow added.
Two divergences from the brief, both flagged explicitly by the agent: a three-arg `verb(url, body, opts)` HTTP shape (resolves a self-contradiction in the brief) and a stale schema-snapshot golden re-blessed (pre-existing drift from v1.1.1v1.1.3, surfaced and fixed). Both calls are correct.
The agent's discipline carried over cleanly from the v1.1.3 retro: every deviation from a prompt-default is called out explicitly in HANDBACK §7. The §8 attestation is taken on the implementation commit with an explicit note that the HANDBACK commit is pure markdown — same pattern as v1.1.3, acceptable.
Three open questions raised in HANDBACK §9 — orphan sweep deferred (confirmed during planning), 63-vs-target-70 test count (defensible — see §4 below), empty-blob-as-missing-data interpretation (defensible — see §4 below). None are blockers.
---
## 1. Static checks reproduced (HEAD `6080fc6`)
## 1. Static checks reproduced (HEAD `9492c18`)
```
cargo fmt --all -- --check ✅ exit 0
cargo clippy --all-targets --all-features -- -D warnings ✅ exit 0
cargo test --workspace ✅ 427 passed / 0 failed
+ 140 ignored (Postgres-gated)
cargo test --workspace ✅ 491 passed / 0 failed
+ 139 ignored (Postgres-gated; one
less than v1.1.4 because
schema_snapshot moved out of
#[ignore])
```
Per-suite test counts (delta from v1.1.3 baseline):
- manager-core: 184 (was 131 → +53; SSRF policy 20 + HTTP client 16 + cron scheduler 11 + cron admin 6)
- executor-core/tests/sdk_http: 15 (NEW — bridge integration)
Per-suite test counts (delta from v1.1.4 baseline):
- manager-core: 229 (was 184 → +45; files repo + service + admin API + pubsub repo + service + admin endpoint + their tests)
- executor-core/tests/sdk_files: 14 (NEW — bridge integration)
- executor-core/tests/sdk_pubsub: 5 (NEW — bridge integration)
- executor-core/tests/sdk_http: 15 (unchanged)
- executor-core/tests/sdk_docs: 15 (unchanged)
- executor-core/tests/modules: 23 (+0; one new redaction test bundled here)
- executor-core/tests/module_redaction_logging: 1 (NEW — captures the tracing subscriber)
- executor-core/tests/modules: 23 (unchanged)
- orchestrator-core: 62 (unchanged)
- stdlib: 43 (unchanged)
- sdk_contract: 30 (unchanged)
- picloud: 21 (unchanged)
- executor-core engine: 17 (unchanged)
- shared: 9 (unchanged)
- picloud: 21 (unchanged)
- module_redaction_logging: 1 (unchanged)
- shared: 8 (was 9 → 1; one moved into pubsub module's own tests + tracker drift)
- sdk_kv: 7 (unchanged)
- schema_snapshot: 1 (NEW — un-ignored; skips when DATABASE_URL unset)
69 net new tests (HANDBACK claims 70 — one test likely got renamed/moved; immaterial). Comfortably above the "50-70" brief target.
Net: 64 new tests on my counting (HANDBACK says 63; immaterial off-by-one). Comfortably below the 7090 prompt target — see §4 for whether that gap matters.
## 2. Design conformance (spot-checks)
| Decision / requirement | Where it lives | Verdict |
|---|---|---|
| **SSRF deny-list covers every prompt CIDR** | [ssrf.rs:65-110](crates/manager-core/src/ssrf.rs#L65-L110) | ✅ All 13 prompt-specified ranges + `0.0.0.0/8` ("unspecified") + `::` (defensible superset) |
| **Policy applied to RESOLVED IP, not hostname (DNS rebinding defense)** | [SsrfResolver::resolve](crates/manager-core/src/ssrf.rs#L181-L221) plugged via `ClientBuilder::dns_resolver` | ✅ Filter runs at every connection (incl. each redirect hop, since redirects are followed manually). Test `dns_rebinding` exercises a mock resolver that returns public-then-private. |
| **Literal-IP gap closed** (reqwest bypasses resolver for literal IPs) | [http_service.rs:303 validate_url](crates/manager-core/src/http_service.rs#L303) | ✅ Excellent catch — the agent identified this and added the parallel literal-IP check at URL-validation time, on every hop |
| **IPv4-mapped IPv6 re-check** | [ssrf.rs:87-92 check_v6 → to_ipv4_mapped → check_v4](crates/manager-core/src/ssrf.rs#L87-L92) | ✅ `::ffff:127.0.0.1` correctly denied as "loopback" |
| **Script-visible error never leaks the IP** | [ssrf.rs:118-129 SsrfBlocked + reason categories](crates/manager-core/src/ssrf.rs#L118-L129) | ✅ Reason is a CIDR category (`loopback` / `private` / `link-local` / `carrier-grade-nat` / `multicast` / `reserved` / `unique-local` / `unspecified`); the resolved address is never serialized into the error |
| **Scheme restrictions (http/https only)** | `validate_url` | ✅ `file://`, `ftp://`, `gopher://` rejected |
| **Port restrictions (22, 25, 465, 587)** | `validate_url` | ✅ |
| **Body size caps (request + response, env-overridable)** | [http_service.rs HttpConfig](crates/manager-core/src/http_service.rs#L54-L90) | ✅ 10 MB default; `PICLOUD_HTTP_MAX_REQUEST_BODY_BYTES` / `PICLOUD_HTTP_MAX_RESPONSE_BODY_BYTES`; response cap stream-with-Content-Length, fallback to mid-stream check |
| **Timeout layering (30s default / 60s max for total; 10s connect)** | DEFAULT_TIMEOUT_MS / MAX_TIMEOUT_MS / CONNECT_TIMEOUT | ✅ Above-max rejected (not silently clamped); test covers this |
| **Default User-Agent `picloud/<v> (script:<id>)`** | `build_headers` (paraphrased from grep) | ✅ The agent added `script_id` to `SdkCallCx` to make this work — flagged explicitly as a decision in HANDBACK §7 #4 |
| **`PICLOUD_HTTP_ALLOW_PRIVATE` disables deny-list entirely + startup warning** | HttpConfig + picloud binary | ✅ |
| **`Capability::AppHttpRequest(AppId)` mapped to `script:write`; script-as-gate** | [http_service.rs:147-158](crates/manager-core/src/http_service.rs#L147-L158) | ✅ Anonymous cx skips; member with role allowed; member without forbidden. Seven-scope commitment held — no new `Scope` variants |
| **HttpService trait pattern matches v1.1.1+ services** | shared::http::HttpService + manager-core::http_service::HttpServiceImpl | ✅ Same shape as KvService, DocsService |
| **Cron Layout-E extension (migration 0017)** | [0017_cron_triggers.sql](crates/manager-core/migrations/0017_cron_triggers.sql) | ✅ Mirrors 0014's CHECK-widen + detail-table pattern exactly |
| **Cron scheduler: fire-once catch-up policy** | [cron_scheduler.rs:66-82 next_due](crates/manager-core/src/cron_scheduler.rs#L66-L82) | ✅ Returns single next slot via `Schedule::after(&base).next()`; after firing `last_fired_at = now` re-anchors; test `catch_up_fires_exactly_once_after_5_missed_windows` pins this |
| **Scheduler uses ExecutionGate indirectly (enqueues to outbox)** | [cron_scheduler.rs:148-162](crates/manager-core/src/cron_scheduler.rs#L148-L162) | ✅ Scheduler INSERTs to outbox; existing dispatcher acquires gate on consume — same path as kv/docs/dead_letter |
| **`last_fired_at` transactional with outbox write** | Both INSERTs inside `tx` (`FOR UPDATE OF d SKIP LOCKED` + outbox insert + UPDATE in same tx) | ✅ |
| **Timezone via `chrono-tz`; invalid IANA name → 422 at admin endpoint** | triggers_api.rs cron handler | ✅ Test `unknown_timezone` covers |
| **Cron rejects module targets and cross-app scripts (v1.1.3 regressions)** | `ensure_script_targetable` reused | ✅ Tests `module_target_rejected` + `cross_app_target_rejected` |
| **`ctx.event.cron` shape matches the brief** | trigger_event.rs Cron variant; engine.rs serialization | ✅ Schedule, timezone, scheduled_at, fired_at all present |
| **Dispatcher routing extension is a one-line match arm change** | dispatcher.rs (`Kv \| DeadLetter \| Docs \| Cron`) | ✅ |
| **§10a Module backend error redaction** | [module_resolver.rs:334-349](crates/executor-core/src/module_resolver.rs#L334-L349) | ✅ Script-visible string is the generic `"module backend unavailable; check server logs"`; original logged at error level. Test `module_redaction_logging.rs` verifies the log path captures the original. |
| **§10b rhai exact pin** | [Cargo.toml workspace deps](Cargo.toml) | ✅ `rhai = { version = "=1.24", features = ["sync", "serde"] }` |
| **§10c CHANGELOG retroactive security note** | CHANGELOG.md v1.1.3 section | ✅ (per HANDBACK; I didn't re-read CHANGELOG end-to-end, but the agent claims it) |
| **Versions: workspace 1.1.3→1.1.4, SDK 1.4→1.5, dashboard 0.9.0→0.10.0** | Cargo.toml + shared/src/version.rs + dashboard/package.json | ✅ All bumped |
| Collection-scoped files (`(app_id, collection, id)`) | [0018_files.sql](crates/manager-core/migrations/0018_files.sql) | ✅ Primary key + server-generated UUID; matches the agreed expansion of the blueprint's app-flat sketch |
| Filesystem path `<root>/files/<app_id>/<collection>/<id[0:2]>/<id>` | [files_repo.rs:228-238 shard_dir_at + final_path_at](crates/manager-core/src/files_repo.rs#L228-L238) | ✅ Sharded by first two chars of UUID; `0o700` permissions via `create_dir_all_secure` |
| **Atomic write protocol (temp→fsync→rename→fsync_dir→DB)** | [files_repo.rs:244-277 write_atomic_at](crates/manager-core/src/files_repo.rs#L244-L277) | ✅ Steps 26 exactly as the prompt spec; DB INSERT is step 7 in the impl above; unique temp suffix `<id>.tmp.<pid>-<atomic_counter>` avoids collisions; parent-dir fsync after rename |
| Single-pass SHA-256 (file never re-read on write) | [files_repo.rs:258-260](crates/manager-core/src/files_repo.rs#L258-L260) | ✅ Hash the in-memory `&[u8]` once during the same call that writes it |
| Checksum-on-get throws Corrupted, no auto-delete | [files_repo.rs:282-299 read_verify_at](crates/manager-core/src/files_repo.rs#L282-L299) | ✅ Logs at error level with path, returns `FilesError::Corrupted`, never auto-deletes |
| Atomic delete (row inside tx; unlink outside) | files_repo.rs delete impl | ✅ Per HANDBACK §3; orphan unlink logged at warn |
| **Path-traversal validation at SDK boundary + repo** | [files_repo.rs:201-211 guard_collection](crates/manager-core/src/files_repo.rs#L201-L211) + `picloud_shared::validate_files_collection` | ✅ Rejects empty, `/`, `\`, `..`, NUL. Defense in depth (SDK + repo). |
| Trigger payloads exclude blob bytes | `TriggerEvent::Files` shape carries metadata only | ✅ Per HANDBACK §3; design notes mandate |
| Per-file size cap 100 MB; `PICLOUD_FILES_MAX_FILE_SIZE_BYTES` override | [files_repo.rs:50, 106-115 FilesConfig::from_env](crates/manager-core/src/files_repo.rs#L50) | ✅ |
| `files:*` trigger kind (Layout E extension) | [0019_files_triggers.sql](crates/manager-core/migrations/0019_files_triggers.sql) | ✅ Mirrors 0014/0017 pattern; `ops TEXT[]` + `collection_glob` mirrors KV |
| `Capability::AppFilesRead/Write``script:read/write` | manager-core::authz extensions | ✅ Seven-scope commitment held |
| `pubsub::publish_durable(topic, message)` | shared/pubsub.rs + executor-core/src/sdk/pubsub.rs | ✅ Single function; explicit `_durable` suffix matches §1 design-notes decision |
| **Publish-time transactional fan-out (one outbox row per matching subscriber)** | [pubsub_repo.rs:70-117 fan_out_publish](crates/manager-core/src/pubsub_repo.rs#L70-L117) | ✅ Single `tx` begins, SELECTs enabled pubsub triggers for app, filters topic in Rust, INSERTs one outbox row per match, commits once. Cross-app gate via `WHERE t.app_id = $1`. `trigger_depth` saturating-bumped, `root_execution_id` propagated. |
| No-match publish succeeds silently | pubsub_repo.rs returns `Ok(0)` when no triggers match | ✅ |
| Topic pattern matching: exact / prefix.* / universal `*` | [shared/pubsub.rs:65-74 topic_matches](crates/shared/src/pubsub.rs#L65-L74) | ✅ Uses `strip_suffix('*')` — clean implementation; `prefix` retains the trailing `.` so `"user.*"` doesn't match `"users.created"` |
| **Mid-pattern wildcards rejected at validation** | [shared/pubsub.rs:85-100 validate_topic_pattern](crates/shared/src/pubsub.rs#L85-L100) | ✅ Tests pin rejection of `*.created`, `**`, `a.*.b`, `user.*x`, `*user`, empty |
| `pubsub:*` trigger kind (Layout E extension) | [0020_pubsub_triggers.sql](crates/manager-core/migrations/0020_pubsub_triggers.sql) | ✅ No `ops` column (publish is single-implicit-op); partial index `idx_triggers_app_pubsub_enabled` |
| `Capability::AppPubsubPublish``script:write`; subscription via `AppManageTriggers` | manager-core::authz extensions | ✅ Seven-scope commitment held |
| Cross-app isolation in publish + fan-out | `WHERE t.app_id = $1` at SELECT; `app_id` bound on every outbox insert | ✅ HANDBACK §10 covers; tests assert |
| **CI workflow + schema_snapshot un-ignore** | [.github/workflows/ci.yml](.github/workflows/ci.yml) + schema_snapshot.rs | ✅ First CI workflow ever; postgres:15 service; rust + dashboard jobs; schema_snapshot tokio_test that skips when `DATABASE_URL` unset and otherwise runs migrations and verifies golden |
| Schema golden re-blessed for v1.1.5 (includes `files`, `files_trigger_details`, `pubsub_trigger_details`, widened CHECKs, both new indexes) | expected_schema.txt | ✅ Per HANDBACK §5 |
| Versions: workspace 1.1.4→1.1.5, SDK 1.5→1.6, dashboard 0.10.0→0.11.0 | Cargo.toml + version.rs + package.json | ✅ All bumped |
| Migrations sequential 0018→0020 | migrations/ | ✅ |
## 3. Substantive strengths
**1. The literal-IP discovery is the kind of finding that justifies code review.** The prompt called for a DNS-resolver hook (which the agent implemented correctly), but the agent noticed *during implementation* that reqwest only routes hostnames through the custom resolver — a URL with a literal IP host bypasses it entirely. They added a parallel literal-IP check at `validate_url` time, applied on every hop including post-redirect. Test coverage exists for both paths (resolver and literal). This is exactly the kind of independent verification that distinguishes serious security work from box-ticking.
**1. The commit split.** v1.1.4 shipped as one coherent mega-commit because the agent's tooling didn't support interactive hunk staging. The v1.1.4 retro implicitly raised the question. The v1.1.5 agent split the work cleanly into `feat(v1.1.5): files SDK + files:* triggers``feat(v1.1.5): pubsub::publish_durable SDK + pubsub:* triggers``chore(v1.1.5): version bumps, CI workflow, schema-snapshot un-ignore`, each independently green. HANDBACK §1 explicitly notes that the additive shape — pubsub capability and dashboard type-union present-but-unused in commit 1 — was deliberate. This is the right shape for trunk-based review.
**2. SSRF error format discipline.** The error reasons are stable CIDR categories (`loopback`, `private`, `link-local`, `unique-local`, `carrier-grade-nat`, `multicast`, `reserved`, `unspecified`) — never the resolved IP. A script that probes `http://internal-host.example.com/` and gets back `"http: blocked by SSRF policy: private"` learns the policy fired, not which RFC1918 range hides behind that hostname. The internal network shape stays opaque to the attacker.
**2. The atomic write protocol is implemented exactly to spec.** Steps 26 live in `write_atomic_at` ([files_repo.rs:244-277](crates/manager-core/src/files_repo.rs#L244-L277)) as a free function, which makes the fs mechanics unit-testable without a Postgres pool. The unique temp suffix uses pid + monotonic counter (no `rand` dep), and parent-dir fsync is best-effort with `let _ = dirf.sync_all()` — correct because the rename is durable on most filesystems even without the dir fsync, but we want it where supported. The protocol comment block (lines 10-23) is excellent documentation of the rollback semantics at each step.
**3. The `SsrfBlocked` marker propagation pattern.** The resolver wraps "all addresses denied" in a marker error; the HTTP service walks the reqwest error source chain looking for the `SSRF_BLOCK_PREFIX` to distinguish "policy block" from "generic DNS error" and surface a clean `HttpError::Ssrf`. Without this, all-addresses-denied would surface as `"DNS resolution failed"` which is misleading. Subtle and correct.
**3. The pub/sub fan-out is correctly transactional.** [pubsub_repo.rs:70-117](crates/manager-core/src/pubsub_repo.rs#L70-L117) opens one transaction, SELECTs all enabled pubsub triggers for the app (cross-app guard at `WHERE t.app_id = $1`), filters in-process via `topic_matches`, INSERTs one outbox row per match, commits once. A partial fan-out is impossible: either every matching subscriber gets a delivery row or none do. `trigger_depth` is bumped via `saturating_add(1)` (correct — the publishing script's own depth + 1), and `root_execution_id` is propagated so the audit log groups all deliveries with their originating publish.
**4. Per-hop re-validation on redirects.** Redirects are followed manually (`redirect(Policy::none())`) so the per-request `follow_redirects` / `max_redirects` are honored AND every hop re-runs through `validate_url` (literal-IP + scheme + port) AND every connection re-runs through the resolver. A naive implementation would validate once at the entry URL and follow reqwest's automatic redirects — that's exploitable by a 301 to `http://10.0.0.1`. The agent didn't fall into this trap.
**4. Topic pattern matching is clean and precise.** The `topic_matches` implementation ([shared/pubsub.rs:65-74](crates/shared/src/pubsub.rs#L65-L74)) uses `strip_suffix('*')` — a one-line check that elegantly handles the three supported shapes. Crucially, `"user.*"` strips to `"user."` (including the dot), so `topic_matches("user.*", "users.created")` correctly returns false. `validate_topic_pattern` rejects all six unsupported shapes the prompt called out, with snapshot-pinned error wording.
**5. Cron fire-once catch-up is correctly implemented.** `next_due` returns the single next slot after `last_fired_at`, not a range. After firing, `last_fired_at = now` re-anchors. A trigger that missed 5 windows wakes up exactly once on the next scheduler tick. The test `catch_up_fires_exactly_once_after_5_missed_windows` pins it. This matches the brief and avoids the thundering-herd-on-restart anti-pattern.
**5. Path traversal defense in depth.** `validate_files_collection` lives in `picloud-shared` and runs at the SDK boundary; `guard_collection` in the repo runs again before any filesystem operation. Both reject empty, `/`, `\`, `..`, NUL. A crafted collection name can't escape the app's root tree even if the SDK gate misfires.
**6. The transactional cron-tick pattern.** `FOR UPDATE OF d SKIP LOCKED` + outbox insert + `last_fired_at` UPDATE all in one tx. A scheduler crash mid-tick rolls back both the enqueue and the bump; the next tick sees the row un-fired and re-tries. Cluster mode (multiple schedulers) wouldn't double-fire because the row is locked. No correctness issues I can construct.
**6. Discipline carryover.** Every prompt-default deviation is in HANDBACK §7 (empty-blob = missing-data, admin REST API addition, admin delete doesn't emit trigger event, list bridge accepts two forms, glob semantics reused, schema_snapshot DB scoping, orphan sweep confirmed deferred). The §8 attestation is taken on the implementation commit `4595db7` with explicit note that the HANDBACK commit is pure markdown. The v1.1.2/v1.1.3/v1.1.4 retro lessons stuck.
**7. The agent's discipline carryover from the v1.1.3 retro.** Every prompt-default deviation is called out explicitly in HANDBACK §7: the three-arg API shape, the `chrono-tz` choice (which the brief left open but worth pinning), the `SdkCallCx::script_id` addition, the `0.0.0.0/::` defensive superset, the cron crate choice. The v1.1.3 retro lesson stuck. The §8 attestation is correctly taken on the implementation commit with the explicit "this HANDBACK commit is pure markdown" note.
**7. CI workflow lands.** This is the first `.github/workflows/ci.yml` in the project — the v1.1.4 retro recommendation acted on without prompting. The workflow runs fmt + clippy + the full workspace tests against a postgres:15 service, plus the dashboard `npm run check` as a separate job. Schema golden silent drift across v1.1.1v1.1.3 is now a regression the CI catches automatically.
## 4. The two flagged divergences (both correct)
**8. Schema-snapshot skip path is well-judged.** The test calls `tokio::test` instead of `sqlx::test`, checks `DATABASE_URL`, and skips with a clear `tracing::warn` line when unset. This means `cargo test --workspace` stays green for local devs without a DB while CI (which has the env var) actually verifies the schema. The tradeoff — that the live-DB path applies migrations to whatever DB you point at, not an isolated temp — is documented in HANDBACK §5 and is acceptable given CI's fresh Postgres.
### 4.1 Three-arg `verb(url, body, opts)` instead of brief's two-arg
## 4. Open questions answered
The agent diverged from the brief's documented shape and dropped `body_raw`. HANDBACK §7 #1 calls this out explicitly. Why this is correct:
HANDBACK §9 raises three:
The brief's Slack example (`http::post(url, #{ text: "alert" })`) was self-contradictory:
- Two-arg rule: `(url, opts)``#{ text: "alert" }` would be parsed as `opts` and `text` would throw as an unknown opt key.
- What the brief actually meant: `#{ text: "alert" }` is the body.
### 4.1 Orphan-sweep deferral
The agent resolved the contradiction by promoting body to a positional argument. The shape is now:
- `http::get(url)` / `http::get(url, opts)` — bodyless verbs
- `http::post(url)` / `http::post(url, body)` / `http::post(url, body, opts)` — body verbs
- Body type dispatch: Map/Array → JSON, String → text, `()` → no body
- `opts` vocabulary: `{headers, timeout_ms, follow_redirects, max_redirects}` — no `body`, no `body_raw`
**Verdict: accept.** Confirmed during planning. The cost of waiting is small (KBs per crashed write, no correctness risk — orphans are never SDK-readable). Defer to v1.1.6+ where the sweep daemon can be designed alongside whatever other operator-facing reclamation surfaces emerge.
`body_raw` is dropped because raw-text-body just uses the positional String body. Cleaner. The Slack example works unchanged.
### 4.2 Test count 63 vs the 70-90 target
**Verdict: accept the divergence.** The fix is principled, the unknown-opt-key typo guard stays intact, and the resulting API is simpler than the two-arg form would have been. Worth noting for the v1.1.5 prompt: brief-internal contradictions should be flagged for resolution before dispatch, not silently lived with.
**Verdict: accept the undershoot.**
### 4.2 Re-blessed stale `expected_schema.txt` golden
The agent's argument is sound: every named critical test in the prompt's §8 is present (atomic write rollback, checksum tampering, cross-app, path traversal, authz, fan-out transactional rollback, topic pattern shapes including all six rejections, multiple-matches, blob-to-base64). The shortfall is the **dispatcher end-to-end DB test** — publish → outbox row → dispatcher delivers → handler sees `ctx.event`.
The schema-snapshot test is `#[ignore]`'d (needs `DATABASE_URL`), so it never ran in CI. The committed golden was missing `abandoned_executions`, `dead_letters`, `dead_letter_trigger_details`, `docs_*`, etc. — all tables added in v1.1.1, v1.1.2, v1.1.3. The agent re-blessed the file as part of v1.1.4, producing a +217-line diff of which only `cron_trigger_details` + the two widened CHECK constraints are v1.1.4-new.
But: that end-to-end path is *entirely* through code that v1.1.1/v1.1.2/v1.1.4 already exercise. The dispatcher's `Files | Pubsub` match-arm extension is a one-line change. The handler's `ctx.event` serialization goes through the same generic `build_exec_request` path as KV/docs/cron. Adding a v1.1.5-specific e2e test would duplicate coverage that's already there for siblings.
The agent flagged this transparently in HANDBACK and recommended lifting `#[ignore]` with a CI DB service so the golden can't drift again.
If we wanted dispatcher e2e tests, they should be a workspace-wide effort (one test per trigger kind, gated on `DATABASE_URL`, picking up the new CI workflow's Postgres). That's a meaningful follow-up — worth flagging for v1.1.6 — but not v1.1.5's problem.
**Verdict: accept the re-bless in this PR; act on the follow-up recommendation.**
### 4.3 Empty-blob = missing-data
The drift correction lives on `main` going forward. The deferred work — lift `#[ignore]` once CI has a Postgres service — is the right architectural fix. Worth folding into the v1.1.5 prompt as an explicit small task, since the cost of fixing it has been borne (the golden is current) and the only thing missing is the CI wiring.
**Verdict: accept the deviation; relaxable later.**
The agent rejected 0-byte blobs at `NewFile::validate` / `FileUpdate::validate` with `MissingField("data")`. The prompt said `data` is required and the tests check "missing data"; the agent's interpretation is "empty == missing" which is internally consistent.
The cost: v1.1.5 can't store an intentionally-empty file. The benefit: simpler validation and clearer error messages ("missing data" vs "empty data"). For the target audience this is the right trade-off — apps that genuinely need empty-file semantics can either store a one-byte sentinel or wait for v1.2 to relax it. Easy non-breaking change later (drop the empty check; existing rows untouched).
Flag for v1.1.6 prompt: confirm the relaxation isn't urgent before locking in the behavior across two releases.
## 5. Smaller observations (no action required)
- **Single-commit feature delivery.** The brief suggested split `feat(v1.1.4-http)` / `feat(v1.1.4-cron)` commits, but the two features cross shared files (`Cargo.toml`, `Services` bundle, `version.rs`, etc.) — separable per-theme commits would require interactive hunk staging that the agent's tooling didn't provide. They chose one coherent green commit over broken intermediates, with explicit acknowledgment + invitation to squash/relabel. Acceptable.
- **`SdkCallCx::script_id` addition.** Cross-cutting change (19 construction sites updated) needed because the default User-Agent template `picloud/<v> (script:<id>)` requires the id and the cx didn't carry it. Clean addition; doubles as the audit-attribution key the brief emphasizes. HANDBACK §7 #4 flagged it.
- **`0.0.0.0` and `::` defensive additions.** The brief listed `0.0.0.0/8` (covered) but didn't list `::` (the IPv6 unspecified). The agent added both with reason `"unspecified"`. Defensible superset; minimal additional surface.
- **Live DB smoke went beyond the brief.** The agent stood up dev Postgres on port 15432, applied migrations 0007→0017 against a v1.1.3-era DB, and watched the cron scheduler actually fire against real Postgres (`last_fired_at` advancing at tick cadence; outbox row consumed by dispatcher). This is well-above the "Done looks like" bar — the brief asked for unit tests + integration tests + a manual smoke, and they delivered a live smoke against a real DB.
- **Container left running.** Side effect of the live smoke. The agent flagged this transparently. Run `docker compose down` when convenient; no data loss either way.
- **Admin file-delete bypasses `files:delete` trigger emission.** HANDBACK §7 #3 flagged this. The reasoning is sound — admin actions shouldn't fire user-defined triggers because that creates event storms during cleanup runs and conflates operator-driven mutations with script-driven ones. SDK deletes still emit; only the admin REST endpoint skips. Reasonable.
- **Admin files REST API addition** ([files_api.rs](crates/manager-core/src/files_api.rs)) was needed to back the dashboard view. Mirrors `triggers_api`'s direct-repo + capability pattern. HANDBACK §7 #2 flagged it.
- **`files` `list` bridge accepts both positional and map forms** (HANDBACK §7 #4). Additive convenience; the map form matches the prompt's example. Fine.
- **Collection-glob dialect reuses the existing `collection_matches`** (`*` / `foo*` prefix / exact) instead of introducing a new `"prefix:*"` form. Right call — keeping parity with KV/docs trigger semantics. HANDBACK §7 #5 flagged it.
- **`shared::pubsub::NoopPubsubService`** is added for the executor-core integration test harness — every call returns `PubsubError::Unavailable`. Same pattern as the existing `NoopEventEmitter`. Clean.
- **The publish saturating-add for `trigger_depth`** ([pubsub_repo.rs:107](crates/manager-core/src/pubsub_repo.rs#L107)) means a publish from depth-`u32::MAX` won't panic. That's already capped by `PICLOUD_MAX_TRIGGER_DEPTH` (default 8) at the dispatcher, but defensive overflow handling is correct.
- **`shared/src/pubsub.rs` tests** include four named cases (exact, prefix wildcard, universal, validation) with subcases — clean test taxonomy.
## 6. Open questions answered
The HANDBACK §9 raises two open questions:
1. **Three-arg HTTP shape** — confirmed acceptable (§4.1 above).
2. **Stale schema golden re-blessed** — confirmed acceptable (§4.2 above).
No further questions outstanding.
## 7. Versioning audit
## 6. Versioning audit
| File | Before | After | Status |
|---|---|---|---|
| Workspace `Cargo.toml` | 1.1.3 | 1.1.4 | ✅ |
| SDK schema (`shared/src/version.rs`) | 1.4 | 1.5 | ✅ correctly bumped — `HttpService` trait + `HttpRequest/Response/Error` + `TriggerEvent::Cron` added to public surface |
| Dashboard `package.json` | 0.9.0 | 0.10.0 | ✅ |
| Migrations | 0001..0016 | 0017 added | ✅ sequential, no skips |
| `rhai` pin | `"1.19"` | `"=1.24"` (workspace deps) | ✅ v1.1.3 follow-up §10b |
| CHANGELOG.md | v1.1.3 entry | v1.1.4 entry + retroactive v1.1.3 security note | ✅ §10c done |
| Workspace `Cargo.toml` | 1.1.4 | 1.1.5 | ✅ |
| SDK schema (`shared/src/version.rs`) | 1.5 | 1.6 | ✅ correctly bumped — `FilesService`, `PubsubService`, `FileMeta`, `NewFile`, `FileUpdate`, `topic_matches`, `validate_topic_pattern`, `TriggerEvent::{Files, Pubsub}` added to public surface |
| Dashboard `package.json` | 0.10.0 | 0.11.0 | ✅ |
| Migrations | 0001..0017 | 0018..0020 added | ✅ sequential, no skips |
| CHANGELOG.md | v1.1.4 entry | v1.1.5 entry added | ✅ |
## 8. Recommended next steps (post-merge)
## 7. Recommended next steps (post-merge)
1. **Merge** `feat/v1.1.4-http-cron` into `main` (fast-forward; branch is linear ahead).
2. **`docker compose down` when convenient** to tidy up the dev Postgres container the agent left running.
3. **Pause** before dispatching v1.1.5 (Files & Pub/Sub).
4. **For the v1.1.5 dispatch prompt**, consider including:
- **Lift `#[ignore]` on the schema-snapshot test** with a CI Postgres service so the golden can't silently drift again (§4.2). This is small, mechanical, and prevents a recurring problem.
- **Pre-resolve any brief-internal contradictions before dispatch.** The v1.1.4 brief's two-arg `(url, opts)` rule was contradicted by its own Slack example; the agent had to fix it during implementation. For v1.1.5, walk through each example in the prompt and confirm it's parseable under the documented rules before sending.
- **The literal-IP-bypass pattern** is worth remembering for any v1.1.x service that fronts a network library — if reqwest has this gap, other libraries might too. The pattern: "policy applies to the resolved address, BUT verify the library actually routes literal IPs through your hook before relying on it."
1. **Merge** `feat/v1.1.5-files-pubsub` into `main` (fast-forward; branch is linear ahead).
2. **Pause** before dispatching v1.1.6 (Realtime Channels & Client Library — the co-shipped SSE + `@picloud/client` work).
3. **For the v1.1.6 dispatch prompt**, consider folding in:
- **Dispatcher end-to-end DB tests** for each trigger kind. This is broader than v1.1.5 — it's a workspace-wide hygiene task. Now that CI has a Postgres service (per v1.1.5's `.github/workflows/ci.yml`), gating these tests on `DATABASE_URL` lets them run in CI without breaking local `cargo test`. Cost is bounded; the goal is to catch dispatcher regressions before they surface as production trigger silence.
- **Empty-blob storage** — revisit whether `data: 0 bytes` should be a valid stored state (currently rejected as missing). Decide before v1.1.6 ships so the semantics across two releases stay consistent.
- **Orphan file sweeper** — design + ship the simple `*.tmp.*` sweeper (defer the full DB-cross-check version to v1.3+). v1.1.6 is when the file storage will start to accumulate enough that operators notice.
4. **Awareness:** v1.1.5 is the first release where the CI workflow exists. If the project lands new contributors before v1.1.6, the workflow needs `secrets` review (none currently set) and possibly branch-protection rules pointing at the CI checks.
Branch is ready for merge. Verdict: **APPROVE**.

View File

@@ -348,6 +348,7 @@ fn build_ctx_map(req: &ExecRequest) -> Map {
/// `docs/v1.1.x-design-notes.md` §4 (the dead-letter sub-shape) and
/// §2/blueprint §9 (KV). Each variant becomes a Rhai map with a
/// `source` discriminant plus per-source fields.
#[allow(clippy::too_many_lines)]
fn trigger_event_to_dynamic(event: &TriggerEvent) -> Dynamic {
let mut m = Map::new();
m.insert("source".into(), event.source().into());
@@ -406,6 +407,47 @@ fn trigger_event_to_dynamic(event: &TriggerEvent) -> Dynamic {
cron_map.insert("fired_at".into(), fired_at.to_rfc3339().into());
m.insert("cron".into(), cron_map.into());
}
TriggerEvent::Files {
op,
collection,
id,
name,
content_type,
size,
checksum,
prev,
} => {
m.insert("op".into(), op.as_str().into());
let mut files_map = Map::new();
files_map.insert("collection".into(), collection.clone().into());
files_map.insert("id".into(), id.clone().into());
files_map.insert("name".into(), name.clone().into());
files_map.insert("content_type".into(), content_type.clone().into());
files_map.insert(
"size".into(),
i64::try_from(*size).unwrap_or(i64::MAX).into(),
);
files_map.insert("checksum".into(), checksum.clone().into());
files_map.insert(
"prev".into(),
prev.clone().map_or(Dynamic::UNIT, json_to_dynamic),
);
m.insert("files".into(), files_map.into());
}
TriggerEvent::Pubsub {
topic,
message,
published_at,
} => {
// `ctx.event.op` is always "publish" for pub/sub (the only
// op a publish produces).
m.insert("op".into(), "publish".into());
let mut ps = Map::new();
ps.insert("topic".into(), topic.clone().into());
ps.insert("message".into(), json_to_dynamic(message.clone()));
ps.insert("published_at".into(), published_at.to_rfc3339().into());
m.insert("pubsub".into(), ps.into());
}
TriggerEvent::DeadLetter {
dead_letter_id,
original,

View File

@@ -0,0 +1,281 @@
//! `files::` Rhai bridge — collection-scoped handle pattern (v1.1.5).
//!
//! ```rhai
//! let avatars = files::collection("avatars");
//! let id = avatars.create(#{ name: "a.jpg", content_type: "image/jpeg", data: blob });
//! let meta = avatars.head(id); // metadata map or ()
//! let bytes = avatars.get(id); // Blob or ()
//! avatars.update(id, #{ data: new_bytes });
//! let gone = avatars.delete(id); // bool (was-present)
//! let page = avatars.list(); // #{ files: [...], next_cursor: () }
//! ```
//!
//! The `FilesHandle` custom Rhai type captures the collection name once
//! and routes each call through the injected `Arc<dyn FilesService>`
//! with the per-call `Arc<SdkCallCx>`. **The service derives `app_id`
//! from `cx.app_id` — it never appears in any signature script-side,
//! preserving cross-app isolation.**
//!
//! Error convention (per `docs/sdk-shape.md`): `create`/`update`/
//! `delete` throw on failure; `get`/`head` return `()` for a missing
//! file; `delete` returns `bool` (was-present). The blob bytes are a
//! Rhai `Blob` (byte array) in both directions.
use std::sync::Arc;
use picloud_shared::{
FileMeta, FileUpdate, FilesError, FilesService, NewFile, SdkCallCx, Services,
};
use rhai::{Array, Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module};
use tokio::runtime::Handle as TokioHandle;
/// Per-call handle captured by the Rhai SDK. Cheap to clone (two Arcs
/// plus an owned string).
#[derive(Clone)]
pub struct FilesHandle {
collection: String,
service: Arc<dyn FilesService>,
cx: Arc<SdkCallCx>,
}
pub(super) fn register(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCallCx>) {
let files_service = services.files.clone();
let mut module = Module::new();
{
let files_service = files_service.clone();
let cx = cx.clone();
module.set_native_fn(
"collection",
move |name: &str| -> Result<FilesHandle, Box<EvalAltResult>> {
if name.is_empty() {
return Err("files::collection name must not be empty".into());
}
Ok(FilesHandle {
collection: name.to_string(),
service: files_service.clone(),
cx: cx.clone(),
})
},
);
}
engine.register_static_module("files", module.into());
engine.register_type_with_name::<FilesHandle>("FilesHandle");
register_create(engine);
register_head(engine);
register_get(engine);
register_update(engine);
register_delete(engine);
register_list(engine);
}
fn register_create(engine: &mut RhaiEngine) {
engine.register_fn(
"create",
|handle: &mut FilesHandle, meta: Map| -> Result<String, Box<EvalAltResult>> {
let name = require_string(&meta, "name")?;
let content_type = require_string(&meta, "content_type")?;
let data = require_blob(&meta, "data")?;
let h = handle.clone();
let new = NewFile {
name,
content_type,
data,
};
let id = block_on(async move { h.service.create(&h.cx, &h.collection, new).await })?;
Ok(id.to_string())
},
);
}
fn register_head(engine: &mut RhaiEngine) {
engine.register_fn(
"head",
|handle: &mut FilesHandle, id: &str| -> Result<Dynamic, Box<EvalAltResult>> {
let h = handle.clone();
let id = id.to_string();
let meta = block_on(async move { h.service.head(&h.cx, &h.collection, &id).await })?;
Ok(meta.map_or(Dynamic::UNIT, |m| file_meta_to_map(&m).into()))
},
);
}
fn register_get(engine: &mut RhaiEngine) {
engine.register_fn(
"get",
|handle: &mut FilesHandle, id: &str| -> Result<Dynamic, Box<EvalAltResult>> {
let h = handle.clone();
let id = id.to_string();
let bytes = block_on(async move { h.service.get(&h.cx, &h.collection, &id).await })?;
Ok(bytes.map_or(Dynamic::UNIT, Dynamic::from_blob))
},
);
}
fn register_update(engine: &mut RhaiEngine) {
engine.register_fn(
"update",
|handle: &mut FilesHandle, id: &str, meta: Map| -> Result<(), Box<EvalAltResult>> {
let data = require_blob(&meta, "data")?;
let name = optional_string(&meta, "name")?;
let content_type = optional_string(&meta, "content_type")?;
let h = handle.clone();
let id = id.to_string();
let upd = FileUpdate {
data,
name,
content_type,
};
block_on(async move { h.service.update(&h.cx, &h.collection, &id, upd).await })
},
);
}
fn register_delete(engine: &mut RhaiEngine) {
engine.register_fn(
"delete",
|handle: &mut FilesHandle, id: &str| -> Result<bool, Box<EvalAltResult>> {
let h = handle.clone();
let id = id.to_string();
block_on(async move { h.service.delete(&h.cx, &h.collection, &id).await })
},
);
}
fn register_list(engine: &mut RhaiEngine) {
engine.register_fn(
"list",
|handle: &mut FilesHandle| -> Result<Map, Box<EvalAltResult>> {
list_call(handle, None, 0)
},
);
engine.register_fn(
"list",
|handle: &mut FilesHandle, cursor: &str| -> Result<Map, Box<EvalAltResult>> {
list_call(handle, Some(cursor.to_string()), 0)
},
);
engine.register_fn(
"list",
|handle: &mut FilesHandle, cursor: &str, limit: i64| -> Result<Map, Box<EvalAltResult>> {
let limit = u32::try_from(limit.max(0)).unwrap_or(0);
list_call(handle, Some(cursor.to_string()), limit)
},
);
// `list(#{ cursor, limit })` — the map form documented in the brief.
engine.register_fn(
"list",
|handle: &mut FilesHandle, opts: Map| -> Result<Map, Box<EvalAltResult>> {
let cursor = match opts.get("cursor") {
Some(v) if !v.is_unit() => {
Some(v.clone().into_string().map_err(|_| -> Box<EvalAltResult> {
"files: list cursor must be a string".into()
})?)
}
_ => None,
};
let limit = match opts.get("limit") {
Some(v) if !v.is_unit() => {
u32::try_from(v.as_int().unwrap_or(0).max(0)).unwrap_or(0)
}
_ => 0,
};
list_call(handle, cursor, limit)
},
);
}
fn list_call(
handle: &FilesHandle,
cursor: Option<String>,
limit: u32,
) -> Result<Map, Box<EvalAltResult>> {
let h = handle.clone();
let page = block_on(async move {
h.service
.list(&h.cx, &h.collection, cursor.as_deref(), limit)
.await
})?;
let mut m = Map::new();
let files: Array = page
.files
.iter()
.map(|meta| Dynamic::from(file_meta_to_map(meta)))
.collect();
m.insert("files".into(), files.into());
m.insert(
"next_cursor".into(),
page.next_cursor.map_or(Dynamic::UNIT, Dynamic::from),
);
Ok(m)
}
/// Render a `FileMeta` into the Rhai map shape scripts see from
/// `head` / `list`.
fn file_meta_to_map(meta: &FileMeta) -> Map {
let mut m = Map::new();
m.insert("id".into(), meta.id.to_string().into());
m.insert("collection".into(), meta.collection.clone().into());
m.insert("name".into(), meta.name.clone().into());
m.insert("content_type".into(), meta.content_type.clone().into());
m.insert(
"size".into(),
i64::try_from(meta.size).unwrap_or(i64::MAX).into(),
);
m.insert("checksum".into(), meta.checksum.clone().into());
m.insert("created_at".into(), meta.created_at.to_rfc3339().into());
m.insert("updated_at".into(), meta.updated_at.to_rfc3339().into());
m
}
/// Pull a required string field out of a Rhai map; throw naming the
/// field if it's absent or not a string.
fn require_string(meta: &Map, field: &'static str) -> Result<String, Box<EvalAltResult>> {
match meta.get(field) {
Some(v) if v.is_string() => Ok(v.clone().into_string().unwrap_or_default()),
Some(_) => Err(format!("files::create: field '{field}' must be a string").into()),
None => Err(format!("files::create: missing required field '{field}'").into()),
}
}
/// Pull an optional string field; `None` when the key is absent or unit.
fn optional_string(meta: &Map, field: &'static str) -> Result<Option<String>, Box<EvalAltResult>> {
match meta.get(field) {
None => Ok(None),
Some(v) if v.is_unit() => Ok(None),
Some(v) if v.is_string() => Ok(Some(v.clone().into_string().unwrap_or_default())),
Some(_) => Err(format!("files::update: field '{field}' must be a string").into()),
}
}
/// Pull a required blob (`data`) out of a Rhai map; throw naming the
/// field if it's absent or not a blob.
fn require_blob(meta: &Map, field: &'static str) -> Result<Vec<u8>, Box<EvalAltResult>> {
match meta.get(field) {
Some(v) if v.is_blob() => Ok(v.clone().into_blob().unwrap_or_default()),
Some(_) => Err(format!("files: field '{field}' must be a Blob (byte array)").into()),
None => Err(format!("files: missing required field '{field}'").into()),
}
}
/// Run an async future inside the synchronous Rhai context. Mirrors
/// `kv::block_on`; safe because `LocalExecutorClient` runs the script
/// under `spawn_blocking`, so a runtime handle is reachable.
fn block_on<F, T>(fut: F) -> Result<T, Box<EvalAltResult>>
where
F: std::future::Future<Output = Result<T, FilesError>> + Send,
T: Send,
{
let handle = TokioHandle::try_current().map_err(|e| -> Box<EvalAltResult> {
EvalAltResult::ErrorRuntime(
format!("files: no tokio runtime available: {e}").into(),
rhai::Position::NONE,
)
.into()
})?;
handle.block_on(fut).map_err(|err| -> Box<EvalAltResult> {
EvalAltResult::ErrorRuntime(format!("files: {err}").into(), rhai::Position::NONE).into()
})
}

View File

@@ -15,8 +15,10 @@ pub mod bridge;
pub mod cx;
pub mod dead_letters;
pub mod docs;
pub mod files;
pub mod http;
pub mod kv;
pub mod pubsub;
pub mod stdlib;
pub use bridge::{dynamic_to_json, json_to_dynamic};
@@ -37,5 +39,7 @@ pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCal
kv::register(engine, services, cx.clone());
docs::register(engine, services, cx.clone());
dead_letters::register(engine, services, cx.clone());
http::register(engine, services, cx);
http::register(engine, services, cx.clone());
files::register(engine, services, cx.clone());
pubsub::register(engine, services, cx);
}

View File

@@ -0,0 +1,100 @@
//! `pubsub::` Rhai bridge — durable publish (v1.1.5).
//!
//! ```rhai
//! pubsub::publish_durable("user.created", #{ user_id: "abc" });
//! pubsub::publish_durable("metric", 42);
//! ```
//!
//! No handle pattern (topics ARE the grouping unit, so there's no
//! `::collection(...)`). The message is any JSON-serializable Rhai value
//! — Maps, Arrays, strings, numbers, bools, unit, and **Blobs (which
//! encode as base64 strings** so trigger handlers see them as base64 on
//! the wire). Nested blobs are encoded at any depth.
//!
//! `app_id` is derived from `cx.app_id` in the service — it never
//! appears in the script-side signature, preserving cross-app
//! isolation.
use std::sync::Arc;
use base64::engine::general_purpose::STANDARD;
use base64::Engine as _;
use picloud_shared::{PubsubError, SdkCallCx, Services};
use rhai::{Array, Dynamic, Engine as RhaiEngine, EvalAltResult, Map, Module};
use serde_json::Value as Json;
use tokio::runtime::Handle as TokioHandle;
pub(super) fn register(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCallCx>) {
let svc = services.pubsub.clone();
let mut module = Module::new();
{
let svc = svc.clone();
let cx = cx.clone();
module.set_native_fn(
"publish_durable",
move |topic: &str, message: Dynamic| -> Result<(), Box<EvalAltResult>> {
let json = message_to_json(&message);
let svc = svc.clone();
let cx = cx.clone();
block_on(async move { svc.publish_durable(&cx, topic, json).await })
},
);
}
engine.register_static_module("pubsub", module.into());
}
/// Convert a Rhai `Dynamic` message into JSON, base64-encoding any
/// `Blob` (at any nesting depth). Mirrors `bridge::dynamic_to_json` but
/// adds the blob arm the pub/sub wire contract requires.
fn message_to_json(value: &Dynamic) -> Json {
// Blob must be checked before the generic array path (a Blob is a
// `Vec<u8>`, distinct from a Rhai `Array`).
if value.is_blob() {
let blob = value.clone().into_blob().unwrap_or_default();
return Json::String(STANDARD.encode(&blob));
}
if value.is_unit() {
return Json::Null;
}
if let Ok(b) = value.as_bool() {
return Json::Bool(b);
}
if let Ok(i) = value.as_int() {
return Json::Number(i.into());
}
if let Ok(f) = value.as_float() {
return serde_json::Number::from_f64(f).map_or(Json::Null, Json::Number);
}
if value.is_string() {
return Json::String(value.clone().into_string().unwrap_or_default());
}
if let Some(arr) = value.clone().try_cast::<Array>() {
return Json::Array(arr.iter().map(message_to_json).collect());
}
if let Some(map) = value.clone().try_cast::<Map>() {
let mut out = serde_json::Map::new();
for (k, v) in map {
out.insert(k.to_string(), message_to_json(&v));
}
return Json::Object(out);
}
Json::String(value.to_string())
}
/// Run an async future inside the synchronous Rhai context. Mirrors
/// `kv::block_on`.
fn block_on<F>(fut: F) -> Result<(), Box<EvalAltResult>>
where
F: std::future::Future<Output = Result<(), PubsubError>> + Send,
{
let handle = TokioHandle::try_current().map_err(|e| -> Box<EvalAltResult> {
EvalAltResult::ErrorRuntime(
format!("pubsub: no tokio runtime available: {e}").into(),
rhai::Position::NONE,
)
.into()
})?;
handle.block_on(fut).map_err(|err| -> Box<EvalAltResult> {
EvalAltResult::ErrorRuntime(format!("pubsub: {err}").into(), rhai::Position::NONE).into()
})
}

View File

@@ -99,6 +99,8 @@ async fn original_backend_error_is_logged_at_error_level() {
Arc::new(NoopEventEmitter),
Arc::new(FailingSource),
Arc::new(NoopHttpService),
Arc::new(picloud_shared::NoopFilesService),
Arc::new(picloud_shared::NoopPubsubService),
);
let engine = Engine::new(Limits::default(), services);

View File

@@ -97,6 +97,8 @@ fn services_with(modules: Arc<dyn ModuleSource>) -> Services {
Arc::new(NoopEventEmitter),
modules,
Arc::new(NoopHttpService),
Arc::new(picloud_shared::NoopFilesService),
Arc::new(picloud_shared::NoopPubsubService),
)
}

View File

@@ -228,6 +228,8 @@ fn make_engine() -> Arc<Engine> {
Arc::new(NoopEventEmitter),
Arc::new(NoopModuleSource),
Arc::new(NoopHttpService),
Arc::new(picloud_shared::NoopFilesService),
Arc::new(picloud_shared::NoopPubsubService),
);
Arc::new(Engine::new(Limits::default(), services))
}

View File

@@ -0,0 +1,334 @@
//! `files::` SDK bridge integration tests — runs a real Rhai engine
//! against an in-memory `FilesService` impl. Mirrors `tests/sdk_kv.rs`:
//! `tokio::task::spawn_blocking` so the bridge's `block_on` has a
//! reachable runtime. Exercises the actual Rhai surface — blob in/out,
//! the metadata map shape, and the missing-required-field throw.
use std::collections::BTreeMap;
use std::sync::Arc;
use async_trait::async_trait;
use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits};
use picloud_shared::{
AppId, ExecutionId, FileMeta, FileUpdate, FilesError, FilesListPage, FilesService, NewFile,
NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopHttpService, NoopKvService,
NoopModuleSource, RequestId, ScriptId, ScriptSandbox, SdkCallCx, Services,
};
use serde_json::{json, Value};
use tokio::sync::Mutex;
use uuid::Uuid;
#[derive(Default)]
struct InMemoryFiles {
#[allow(clippy::type_complexity)]
data: Mutex<BTreeMap<(AppId, String, Uuid), (FileMeta, Vec<u8>)>>,
}
/// The in-memory fake doesn't exercise the real checksum path (the
/// `FsFilesRepo` tempdir tests in manager-core cover SHA-256); a stable
/// placeholder keeps the metadata map non-empty.
fn fake_checksum(bytes: &[u8]) -> String {
format!("len-{}", bytes.len())
}
#[async_trait]
impl FilesService for InMemoryFiles {
async fn create(
&self,
cx: &SdkCallCx,
collection: &str,
new: NewFile,
) -> Result<Uuid, FilesError> {
if collection.is_empty() {
return Err(FilesError::InvalidCollection("empty".into()));
}
new.validate(100 * 1024 * 1024)?;
let id = Uuid::new_v4();
let now = chrono::Utc::now();
let meta = FileMeta {
id,
collection: collection.to_string(),
name: new.name.clone(),
content_type: new.content_type.clone(),
size: new.data.len() as u64,
checksum: fake_checksum(&new.data),
created_at: now,
updated_at: now,
};
self.data
.lock()
.await
.insert((cx.app_id, collection.to_string(), id), (meta, new.data));
Ok(id)
}
async fn head(
&self,
cx: &SdkCallCx,
collection: &str,
id: &str,
) -> Result<Option<FileMeta>, FilesError> {
let Ok(uuid) = Uuid::parse_str(id) else {
return Ok(None);
};
Ok(self
.data
.lock()
.await
.get(&(cx.app_id, collection.to_string(), uuid))
.map(|(m, _)| m.clone()))
}
async fn get(
&self,
cx: &SdkCallCx,
collection: &str,
id: &str,
) -> Result<Option<Vec<u8>>, FilesError> {
let Ok(uuid) = Uuid::parse_str(id) else {
return Ok(None);
};
Ok(self
.data
.lock()
.await
.get(&(cx.app_id, collection.to_string(), uuid))
.map(|(_, b)| b.clone()))
}
async fn update(
&self,
cx: &SdkCallCx,
collection: &str,
id: &str,
upd: FileUpdate,
) -> Result<(), FilesError> {
upd.validate(100 * 1024 * 1024)?;
let Ok(uuid) = Uuid::parse_str(id) else {
return Err(FilesError::NotFound);
};
let mut data = self.data.lock().await;
let key = (cx.app_id, collection.to_string(), uuid);
let Some((meta, _)) = data.get(&key).cloned() else {
return Err(FilesError::NotFound);
};
let mut meta = meta;
if let Some(n) = upd.name {
meta.name = n;
}
if let Some(ct) = upd.content_type {
meta.content_type = ct;
}
meta.size = upd.data.len() as u64;
meta.checksum = fake_checksum(&upd.data);
data.insert(key, (meta, upd.data));
Ok(())
}
async fn delete(&self, cx: &SdkCallCx, collection: &str, id: &str) -> Result<bool, FilesError> {
let Ok(uuid) = Uuid::parse_str(id) else {
return Ok(false);
};
Ok(self
.data
.lock()
.await
.remove(&(cx.app_id, collection.to_string(), uuid))
.is_some())
}
async fn list(
&self,
cx: &SdkCallCx,
collection: &str,
_cursor: Option<&str>,
_limit: u32,
) -> Result<FilesListPage, FilesError> {
let data = self.data.lock().await;
let files: Vec<FileMeta> = data
.iter()
.filter(|((a, c, _), _)| *a == cx.app_id && c == collection)
.map(|(_, (m, _))| m.clone())
.collect();
Ok(FilesListPage {
files,
next_cursor: None,
})
}
}
fn make_engine() -> Arc<Engine> {
let services = Services::new(
Arc::new(NoopKvService),
Arc::new(NoopDocsService),
Arc::new(NoopDeadLetterService),
Arc::new(NoopEventEmitter),
Arc::new(NoopModuleSource),
Arc::new(NoopHttpService),
Arc::new(InMemoryFiles::default()),
Arc::new(picloud_shared::NoopPubsubService),
);
Arc::new(Engine::new(Limits::default(), services))
}
fn baseline_request(app_id: AppId) -> ExecRequest {
let execution_id = ExecutionId::new();
ExecRequest {
execution_id,
request_id: RequestId::new(),
script_id: ScriptId::new(),
script_name: "files-test".into(),
invocation_type: InvocationType::Http,
path: "/files-test".into(),
headers: BTreeMap::new(),
body: Value::Null,
params: BTreeMap::new(),
query: BTreeMap::new(),
rest: String::new(),
sandbox_overrides: ScriptSandbox::default(),
app_id,
principal: None,
trigger_depth: 0,
root_execution_id: execution_id,
is_dead_letter_handler: false,
event: None,
}
}
async fn run_script(engine: Arc<Engine>, src: &str, req: ExecRequest) -> Value {
let src = src.to_string();
tokio::task::spawn_blocking(move || engine.execute(&src, req))
.await
.expect("spawn_blocking should not panic")
.expect("script execution should succeed")
.body
}
async fn run_script_err(engine: Arc<Engine>, src: &str, req: ExecRequest) -> String {
let src = src.to_string();
let res = tokio::task::spawn_blocking(move || engine.execute(&src, req))
.await
.expect("spawn_blocking should not panic");
format!("{:?}", res.expect_err("script should error"))
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn files_create_get_round_trip_via_blob() {
let engine = make_engine();
let app = AppId::new();
// base64("hello") = "aGVsbG8="; decode → blob; create; get back; encode.
let src = r#"
let c = files::collection("avatars");
let data = base64::decode("aGVsbG8=");
let id = c.create(#{ name: "a.txt", content_type: "text/plain", data: data });
let back = c.get(id);
base64::encode(back)
"#;
let body = run_script(engine, src, baseline_request(app)).await;
assert_eq!(body, json!("aGVsbG8="));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn files_head_returns_metadata_map() {
let engine = make_engine();
let app = AppId::new();
let src = r#"
let c = files::collection("avatars");
let data = base64::decode("aGVsbG8=");
let id = c.create(#{ name: "a.txt", content_type: "text/plain", data: data });
let meta = c.head(id);
#{ name: meta.name, content_type: meta.content_type, size: meta.size, has_checksum: meta.checksum != () }
"#;
let body = run_script(engine, src, baseline_request(app)).await;
assert_eq!(
body,
json!({ "name": "a.txt", "content_type": "text/plain", "size": 5, "has_checksum": true })
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn files_get_and_head_missing_return_unit() {
let engine = make_engine();
let app = AppId::new();
let src = r#"
let c = files::collection("avatars");
let g = c.get("00000000-0000-0000-0000-000000000000");
let h = c.head("00000000-0000-0000-0000-000000000000");
#{ g: g == (), h: h == () }
"#;
let body = run_script(engine, src, baseline_request(app)).await;
assert_eq!(body, json!({ "g": true, "h": true }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn files_update_then_delete() {
let engine = make_engine();
let app = AppId::new();
let src = r#"
let c = files::collection("avatars");
let id = c.create(#{ name: "a", content_type: "text/plain", data: base64::decode("YQ==") });
c.update(id, #{ data: base64::decode("YmM=") }); // "bc"
let after = base64::encode(c.get(id));
let removed = c.delete(id);
let gone = c.delete(id);
#{ after: after, removed: removed, gone: gone }
"#;
let body = run_script(engine, src, baseline_request(app)).await;
assert_eq!(
body,
json!({ "after": "YmM=", "removed": true, "gone": false })
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn files_create_missing_data_throws_naming_field() {
let engine = make_engine();
let app = AppId::new();
let src = r#"
let c = files::collection("avatars");
c.create(#{ name: "a", content_type: "text/plain" })
"#;
let err = run_script_err(engine, src, baseline_request(app)).await;
assert!(
err.contains("data"),
"error should name the missing field: {err}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn files_create_missing_name_throws_naming_field() {
let engine = make_engine();
let app = AppId::new();
let src = r#"
let c = files::collection("avatars");
c.create(#{ content_type: "text/plain", data: base64::decode("YQ==") })
"#;
let err = run_script_err(engine, src, baseline_request(app)).await;
assert!(
err.contains("name"),
"error should name the missing field: {err}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn files_empty_collection_name_throws() {
let engine = make_engine();
let app = AppId::new();
let err = run_script_err(engine, r#"files::collection("")"#, baseline_request(app)).await;
assert!(err.to_lowercase().contains("empty"), "got {err}");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn files_list_returns_files_array() {
let engine = make_engine();
let app = AppId::new();
let src = r#"
let c = files::collection("avatars");
c.create(#{ name: "a", content_type: "text/plain", data: base64::decode("YQ==") });
c.create(#{ name: "b", content_type: "text/plain", data: base64::decode("Yg==") });
let page = c.list();
page.files.len()
"#;
let body = run_script(engine, src, baseline_request(app)).await;
assert_eq!(body, json!(2));
}

View File

@@ -88,6 +88,8 @@ fn engine_with(http: Arc<dyn HttpService>) -> Arc<Engine> {
Arc::new(NoopEventEmitter),
Arc::new(NoopModuleSource),
http,
Arc::new(picloud_shared::NoopFilesService),
Arc::new(picloud_shared::NoopPubsubService),
);
Arc::new(Engine::new(Limits::default(), services))
}

View File

@@ -107,6 +107,8 @@ fn make_engine() -> Arc<Engine> {
Arc::new(NoopEventEmitter),
Arc::new(NoopModuleSource),
Arc::new(NoopHttpService),
Arc::new(picloud_shared::NoopFilesService),
Arc::new(picloud_shared::NoopPubsubService),
);
Arc::new(Engine::new(Limits::default(), services))
}

View File

@@ -0,0 +1,157 @@
//! `pubsub::` SDK bridge integration tests — runs a real Rhai engine
//! against an in-memory `PubsubService` that records the published
//! `(topic, message)`. Verifies the message JSON encoding the wire
//! contract requires: Maps, Arrays, strings, numbers, bool, null, and
//! **Blob → base64**, including nesting.
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits};
use picloud_shared::{
AppId, ExecutionId, NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopFilesService,
NoopHttpService, NoopKvService, NoopModuleSource, PubsubError, PubsubService, RequestId,
ScriptId, ScriptSandbox, SdkCallCx, Services,
};
use serde_json::{json, Value};
#[derive(Default)]
struct RecordingPubsub {
last: Mutex<Option<(String, Value)>>,
}
#[async_trait]
impl PubsubService for RecordingPubsub {
async fn publish_durable(
&self,
_cx: &SdkCallCx,
topic: &str,
message: Value,
) -> Result<(), PubsubError> {
if topic.trim().is_empty() {
return Err(PubsubError::EmptyTopic);
}
*self.last.lock().unwrap() = Some((topic.to_string(), message));
Ok(())
}
}
fn make_engine(svc: Arc<RecordingPubsub>) -> Arc<Engine> {
let services = Services::new(
Arc::new(NoopKvService),
Arc::new(NoopDocsService),
Arc::new(NoopDeadLetterService),
Arc::new(NoopEventEmitter),
Arc::new(NoopModuleSource),
Arc::new(NoopHttpService),
Arc::new(NoopFilesService),
svc,
);
Arc::new(Engine::new(Limits::default(), services))
}
fn baseline_request(app_id: AppId) -> ExecRequest {
let execution_id = ExecutionId::new();
ExecRequest {
execution_id,
request_id: RequestId::new(),
script_id: ScriptId::new(),
script_name: "pubsub-test".into(),
invocation_type: InvocationType::Http,
path: "/pubsub-test".into(),
headers: BTreeMap::new(),
body: Value::Null,
params: BTreeMap::new(),
query: BTreeMap::new(),
rest: String::new(),
sandbox_overrides: ScriptSandbox::default(),
app_id,
principal: None,
trigger_depth: 0,
root_execution_id: execution_id,
is_dead_letter_handler: false,
event: None,
}
}
async fn run(engine: Arc<Engine>, src: &str, req: ExecRequest) {
let src = src.to_string();
tokio::task::spawn_blocking(move || engine.execute(&src, req))
.await
.expect("spawn_blocking should not panic")
.expect("script execution should succeed");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_map_message() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
run(
engine,
r#"pubsub::publish_durable("user.created", #{ user_id: "abc", n: 7, ok: true });"#,
baseline_request(AppId::new()),
)
.await;
let (topic, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(topic, "user.created");
assert_eq!(msg, json!({ "user_id": "abc", "n": 7, "ok": true }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_scalar_and_array_and_null() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
run(
engine,
r#"pubsub::publish_durable("a", [1, "two", false, ()]);"#,
baseline_request(AppId::new()),
)
.await;
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(msg, json!([1, "two", false, null]));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_number_scalar() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
run(
engine,
r#"pubsub::publish_durable("metric", 42);"#,
baseline_request(AppId::new()),
)
.await;
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(msg, json!(42));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_blob_encodes_base64_including_nested() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
// base64("hello") = "aGVsbG8=" (STANDARD, padded).
run(
engine,
r#"
let data = base64::decode("aGVsbG8=");
pubsub::publish_durable("blobs", #{ raw: data, list: [data] });
"#,
baseline_request(AppId::new()),
)
.await;
let (_t, msg) = svc.last.lock().unwrap().clone().unwrap();
assert_eq!(msg, json!({ "raw": "aGVsbG8=", "list": ["aGVsbG8="] }));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn publish_empty_topic_throws() {
let svc = Arc::new(RecordingPubsub::default());
let engine = make_engine(svc.clone());
let src = r#"pubsub::publish_durable("", 1);"#.to_string();
let req = baseline_request(AppId::new());
let res = tokio::task::spawn_blocking(move || engine.execute(&src, req))
.await
.expect("spawn_blocking should not panic");
assert!(res.is_err(), "empty topic should throw");
}

View File

@@ -0,0 +1,25 @@
-- v1.1.5: filesystem-backed blob storage. The row holds metadata +
-- the SHA-256 checksum; the blob bytes live on disk at
-- <PICLOUD_FILES_ROOT>/files/<app_id>/<collection>/<id[0:2]>/<id>
-- (never in Postgres). Identity tuple is (app_id, collection, id) per
-- docs/sdk-shape.md, matching KV/docs collection scoping.
--
-- The checksum is computed in a single pass during the atomic write and
-- re-verified on read (FilesError::Corrupted on mismatch). Per-app
-- quotas are deferred to v1.2; only the per-file size cap is enforced
-- (in the service, not the schema).
CREATE TABLE files (
app_id UUID NOT NULL REFERENCES apps(id) ON DELETE CASCADE,
collection TEXT NOT NULL,
id UUID NOT NULL,
name TEXT NOT NULL,
content_type TEXT NOT NULL,
size_bytes BIGINT NOT NULL,
checksum_sha256 TEXT NOT NULL, -- hex, 64 chars, lowercase
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (app_id, collection, id)
);
-- List + cursor pagination scans by (app_id, collection).
CREATE INDEX idx_files_app_collection ON files (app_id, collection);

View File

@@ -0,0 +1,29 @@
-- v1.1.5: extend the triggers framework to recognise `files` as the
-- fifth concrete kind (after `kv`/`dead_letter` v1.1.1, `docs` v1.1.2,
-- `cron` v1.1.4). Mirrors the 0014/0017 extensions exactly: two CHECK
-- constraints widen (strictly gaining `'files'`), one new detail table.
--
-- Files rows route through the SAME generic dispatcher path as the
-- other event kinds (single match-arm extension on the Rust side). The
-- only new machinery is the FilesServiceImpl emitting ServiceEvents
-- that the OutboxEventEmitter fans out — identical to KV/docs.
-- Extend triggers.kind to include 'files'. No existing row carries a
-- value outside the widened set, so the drop+add is safe.
ALTER TABLE triggers DROP CONSTRAINT triggers_kind_check;
ALTER TABLE triggers ADD CONSTRAINT triggers_kind_check
CHECK (kind IN ('kv', 'dead_letter', 'docs', 'cron', 'files'));
-- Extend outbox.source_kind to include 'files'.
ALTER TABLE outbox DROP CONSTRAINT outbox_source_kind_check;
ALTER TABLE outbox ADD CONSTRAINT outbox_source_kind_check
CHECK (source_kind IN ('http', 'kv', 'dead_letter', 'docs', 'cron', 'files'));
-- One row per files trigger. Mirrors kv_trigger_details:
-- collection_glob — "*", "exact", or "prefix*"
-- ops — subset of {create, update, delete}, empty = any
CREATE TABLE files_trigger_details (
trigger_id UUID PRIMARY KEY REFERENCES triggers(id) ON DELETE CASCADE,
collection_glob TEXT NOT NULL,
ops TEXT[] NOT NULL
);

View File

@@ -0,0 +1,34 @@
-- v1.1.5: extend the triggers framework to recognise `pubsub` as the
-- sixth concrete kind. Same Layout-E shape as files (0019): two CHECK
-- constraints widen, one new detail table.
--
-- Pub/sub fans out at PUBLISH time (one outbox row per matching trigger,
-- written by the PubsubServiceImpl), so the dispatcher needs no pubsub-
-- specific branching — a pubsub outbox row dispatches like any other
-- async trigger.
-- Extend triggers.kind to include 'pubsub'.
ALTER TABLE triggers DROP CONSTRAINT triggers_kind_check;
ALTER TABLE triggers ADD CONSTRAINT triggers_kind_check
CHECK (kind IN ('kv', 'dead_letter', 'docs', 'cron', 'files', 'pubsub'));
-- Extend outbox.source_kind to include 'pubsub'.
ALTER TABLE outbox DROP CONSTRAINT outbox_source_kind_check;
ALTER TABLE outbox ADD CONSTRAINT outbox_source_kind_check
CHECK (source_kind IN ('http', 'kv', 'dead_letter', 'docs',
'cron', 'files', 'pubsub'));
-- One row per pubsub trigger. `topic_pattern` is "exact", "prefix.*",
-- or "*" — validated in Rust at trigger creation. Topics are implicit
-- on first publish; the external-subscribable `topics` table is v1.1.6.
CREATE TABLE pubsub_trigger_details (
trigger_id UUID PRIMARY KEY REFERENCES triggers(id) ON DELETE CASCADE,
topic_pattern TEXT NOT NULL
);
-- Hot lookup for fan-out: "all enabled pubsub triggers in app X".
-- Third partial index of its kind (after v1.1.1's idx_triggers_app_kind_
-- enabled); partial indexes are tiny and the planner picks the narrowest.
CREATE INDEX idx_triggers_app_pubsub_enabled
ON triggers (app_id, kind)
WHERE enabled = TRUE AND kind = 'pubsub';

View File

@@ -78,6 +78,17 @@ pub enum Capability {
/// so the conservative write mapping is correct. Splitting
/// read/write is a v1.2+ refinement. Granted to `editor`+.
AppHttpRequest(AppId),
/// Read blobs from this app's files store (v1.1.5). Same trust
/// shape as KV/docs read — granted to `viewer`+, maps to
/// `script:read` on API keys. Honors the seven-scope commitment.
AppFilesRead(AppId),
/// Write blobs to this app's files store (v1.1.5). Granted to
/// `editor`+, maps to `script:write` on API keys.
AppFilesWrite(AppId),
/// Publish a durable pub/sub message from a script in this app
/// (v1.1.5). Maps to `script:write` on API keys (a publish is a
/// write that fans out to subscribers). Granted to `editor`+.
AppPubsubPublish(AppId),
/// Create / list / delete triggers for this app (v1.1.1). Maps to
/// `app:admin` on API keys — triggers are app-configuration acts
/// rather than data-plane access. Granted to `app_admin`+.
@@ -108,6 +119,9 @@ impl Capability {
| Self::AppDocsRead(id)
| Self::AppDocsWrite(id)
| Self::AppHttpRequest(id)
| Self::AppFilesRead(id)
| Self::AppFilesWrite(id)
| Self::AppPubsubPublish(id)
| Self::AppManageTriggers(id)
| Self::AppDeadLetterManage(id) => Some(id),
}
@@ -124,11 +138,16 @@ impl Capability {
Self::InstanceCreateApp | Self::InstanceManageUsers | Self::InstanceManageSettings => {
Scope::InstanceAdmin
}
Self::AppRead(_) | Self::AppKvRead(_) | Self::AppDocsRead(_) => Scope::ScriptRead,
Self::AppRead(_)
| Self::AppKvRead(_)
| Self::AppDocsRead(_)
| Self::AppFilesRead(_) => Scope::ScriptRead,
Self::AppWriteScript(_)
| Self::AppKvWrite(_)
| Self::AppDocsWrite(_)
| Self::AppHttpRequest(_) => Scope::ScriptWrite,
| Self::AppHttpRequest(_)
| Self::AppFilesWrite(_)
| Self::AppPubsubPublish(_) => Scope::ScriptWrite,
Self::AppWriteRoute(_) => Scope::RouteWrite,
Self::AppManageDomains(_) => Scope::DomainManage,
Self::AppAdmin(_) | Self::AppManageTriggers(_) | Self::AppDeadLetterManage(_) => {
@@ -277,6 +296,7 @@ const fn role_satisfies(role: AppRole, cap: Capability) -> bool {
| Capability::AppLogRead(_)
| Capability::AppKvRead(_)
| Capability::AppDocsRead(_)
| Capability::AppFilesRead(_)
);
let in_editor = in_viewer
|| matches!(
@@ -286,6 +306,8 @@ const fn role_satisfies(role: AppRole, cap: Capability) -> bool {
| Capability::AppKvWrite(_)
| Capability::AppDocsWrite(_)
| Capability::AppHttpRequest(_)
| Capability::AppFilesWrite(_)
| Capability::AppPubsubPublish(_)
);
let in_app_admin = in_editor
|| matches!(

View File

@@ -166,7 +166,9 @@ impl Dispatcher {
OutboxSourceKind::Kv
| OutboxSourceKind::Docs
| OutboxSourceKind::DeadLetter
| OutboxSourceKind::Cron => {
| OutboxSourceKind::Cron
| OutboxSourceKind::Files
| OutboxSourceKind::Pubsub => {
let resolved = self.resolve_trigger(&row).await?;
let req = match self.build_exec_request(&row, &resolved).await {
Ok(req) => req,

View File

@@ -0,0 +1,215 @@
//! `/api/v1/admin/apps/{id}/files*` — minimal files admin endpoints
//! backing the dashboard's files view (v1.1.5).
//!
//! Two operations only, both operator-facing:
//! * `GET /apps/{id}/files?collection=<c>&cursor=&limit=` — list file
//! metadata for a collection (cursor-paginated).
//! * `DELETE /apps/{id}/files/{collection}/{file_id}` — remove a file.
//!
//! These talk to the `FilesRepo` directly (like `triggers_api` talks to
//! `TriggerRepo`), guarded by the same capability model as the SDK
//! (`AppFilesRead` / `AppFilesWrite`). **Admin deletes do NOT emit a
//! `files:delete` trigger event** — they're operator cleanup actions,
//! not script mutations (see HANDBACK §7). The capability binds to the
//! resource's `app_id` after the app is loaded.
use std::sync::Arc;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::response::{IntoResponse, Json, Response};
use axum::routing::{delete, get};
use axum::{Extension, Router};
use picloud_shared::{AppId, Principal};
use serde::{Deserialize, Serialize};
use serde_json::json;
use uuid::Uuid;
use crate::app_repo::AppRepository;
use crate::authz::{require, AuthzDenied, AuthzError, AuthzRepo, Capability};
use crate::files_repo::{FilesRepo, FilesRepoError};
#[derive(Clone)]
pub struct FilesAdminState {
pub files: Arc<dyn FilesRepo>,
pub apps: Arc<dyn AppRepository>,
pub authz: Arc<dyn AuthzRepo>,
}
pub fn files_admin_router(state: FilesAdminState) -> Router {
Router::new()
.route("/apps/{app_id}/files", get(list_files))
.route(
"/apps/{app_id}/files/{collection}/{file_id}",
delete(delete_file),
)
.with_state(state)
}
#[derive(Debug, Deserialize)]
pub struct ListFilesQuery {
pub collection: String,
#[serde(default)]
pub cursor: Option<String>,
#[serde(default)]
pub limit: Option<u32>,
}
#[derive(Debug, Serialize)]
struct FileMetaDto {
id: String,
collection: String,
name: String,
content_type: String,
size: u64,
checksum: String,
created_at: String,
updated_at: String,
}
#[derive(Debug, Serialize)]
struct ListFilesResponse {
files: Vec<FileMetaDto>,
next_cursor: Option<String>,
}
async fn list_files(
State(s): State<FilesAdminState>,
Extension(principal): Extension<Principal>,
Path(app_id): Path<AppId>,
Query(q): Query<ListFilesQuery>,
) -> Result<Json<ListFilesResponse>, FilesApiError> {
ensure_app_exists(&*s.apps, app_id).await?;
require(
s.authz.as_ref(),
&principal,
Capability::AppFilesRead(app_id),
)
.await?;
if q.collection.trim().is_empty() {
return Err(FilesApiError::Invalid(
"collection must not be empty".into(),
));
}
let page = s
.files
.list(
app_id,
&q.collection,
q.cursor.as_deref(),
q.limit.unwrap_or(0),
)
.await?;
let files = page
.files
.into_iter()
.map(|m| FileMetaDto {
id: m.id.to_string(),
collection: m.collection,
name: m.name,
content_type: m.content_type,
size: m.size,
checksum: m.checksum,
created_at: m.created_at.to_rfc3339(),
updated_at: m.updated_at.to_rfc3339(),
})
.collect();
Ok(Json(ListFilesResponse {
files,
next_cursor: page.next_cursor,
}))
}
async fn delete_file(
State(s): State<FilesAdminState>,
Extension(principal): Extension<Principal>,
Path((app_id, collection, file_id)): Path<(AppId, String, String)>,
) -> Result<StatusCode, FilesApiError> {
ensure_app_exists(&*s.apps, app_id).await?;
require(
s.authz.as_ref(),
&principal,
Capability::AppFilesWrite(app_id),
)
.await?;
let id = Uuid::parse_str(&file_id).map_err(|_| FilesApiError::NotFound)?;
if s.files.delete(app_id, &collection, id).await?.is_none() {
return Err(FilesApiError::NotFound);
}
Ok(StatusCode::NO_CONTENT)
}
async fn ensure_app_exists(apps: &dyn AppRepository, app_id: AppId) -> Result<(), FilesApiError> {
apps.get_by_id(app_id)
.await
.map_err(|e| FilesApiError::Backend(e.to_string()))?
.ok_or(FilesApiError::AppNotFound)?;
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub enum FilesApiError {
#[error("app not found")]
AppNotFound,
#[error("file not found")]
NotFound,
#[error("invalid request: {0}")]
Invalid(String),
#[error("forbidden")]
Forbidden,
#[error("authorization repo error: {0}")]
AuthzRepo(String),
#[error("files backend: {0}")]
Backend(String),
}
impl From<AuthzDenied> for FilesApiError {
fn from(d: AuthzDenied) -> Self {
match d {
AuthzDenied::Denied => Self::Forbidden,
AuthzDenied::Repo(e) => Self::AuthzRepo(e.to_string()),
}
}
}
impl From<AuthzError> for FilesApiError {
fn from(e: AuthzError) -> Self {
Self::AuthzRepo(e.to_string())
}
}
impl From<FilesRepoError> for FilesApiError {
fn from(e: FilesRepoError) -> Self {
Self::Backend(e.to_string())
}
}
impl IntoResponse for FilesApiError {
fn into_response(self) -> Response {
let (status, body) = match &self {
Self::AppNotFound | Self::NotFound => {
(StatusCode::NOT_FOUND, json!({ "error": self.to_string() }))
}
Self::Invalid(_) => (
StatusCode::UNPROCESSABLE_ENTITY,
json!({ "error": self.to_string() }),
),
Self::Forbidden => (StatusCode::FORBIDDEN, json!({ "error": self.to_string() })),
Self::AuthzRepo(e) => {
tracing::error!(error = %e, "files admin authz repo error");
(
StatusCode::INTERNAL_SERVER_ERROR,
json!({ "error": "internal error" }),
)
}
Self::Backend(e) => {
tracing::error!(error = %e, "files admin backend error");
(
StatusCode::INTERNAL_SERVER_ERROR,
json!({ "error": "internal error" }),
)
}
};
(status, Json(body)).into_response()
}
}

View File

@@ -0,0 +1,759 @@
//! `FilesRepo` — the metadata row (Postgres) + blob bytes (filesystem)
//! storage layer for the v1.1.5 `files::*` SDK.
//!
//! Unlike KV/docs, this repo owns BOTH halves of a file: the `files`
//! row (metadata + SHA-256 checksum) and the bytes on disk at
//! `<root>/files/<app_id>/<collection>/<id[0:2]>/<id>`.
//! It owns both because the write must be atomic across them — a crash
//! mid-write must never leave a readable half-written file.
//!
//! ## Atomic write protocol (`create` / `update`)
//! 1. Validate (collection path-safety; caps live one layer up).
//! 2. `create_dir_all` the shard dir with `0o700`.
//! 3. SHA-256 the in-memory bytes (single pass) while writing to
//! `<final>.tmp.<unique>`.
//! 4. `fsync` the temp file.
//! 5. `rename` temp → final (atomic on POSIX).
//! 6. `fsync` the parent dir (so the rename is durable).
//! 7. INSERT / UPDATE the DB row.
//!
//! A crash between 15 leaves an orphan `*.tmp.*` (never read). A crash
//! between 57 leaves a file with no row — never reachable via the SDK
//! (reads start from the row). Both are reclaimed by a future orphan
//! sweep (deferred to v1.1.6+; see HANDBACK §7).
//!
//! ## Atomic delete protocol
//! 1. SELECT + DELETE the row inside one transaction; commit.
//! 2. `unlink` the file (outside the tx). A failure here leaves an
//! orphan; a failure before the commit changes nothing.
//!
//! ## Checksum-on-read
//! `get` reads the file, hashes it, and compares against the stored
//! checksum — returning `FilesError::Corrupted` (and logging the path
//! at error level) on a mismatch. It never auto-deletes; the operator
//! decides what to do with a metadata-vs-bytes divergence.
use std::env;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use async_trait::async_trait;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use base64::Engine as _;
use chrono::{DateTime, Utc};
use picloud_shared::{AppId, FileMeta, FileUpdate, FilesListPage, NewFile};
use sha2::{Digest, Sha256};
use sqlx::PgPool;
use uuid::Uuid;
/// 100 MB default per-file cap.
pub const DEFAULT_MAX_FILE_SIZE_BYTES: usize = 100 * 1024 * 1024;
/// Default filesystem root (relative to the process CWD).
pub const DEFAULT_FILES_ROOT: &str = "./data";
const FILES_LIST_MAX_LIMIT: u32 = 1_000;
const FILES_LIST_DEFAULT_LIMIT: u32 = 100;
/// Monotonic counter feeding unique temp-file suffixes (combined with
/// the pid). Avoids `rand` in the storage layer per the brief.
static TMP_COUNTER: AtomicU64 = AtomicU64::new(0);
#[derive(Debug, thiserror::Error)]
pub enum FilesRepoError {
#[error("database error: {0}")]
Db(#[from] sqlx::Error),
#[error("filesystem error: {0}")]
Io(String),
#[error("invalid collection name: {0}")]
InvalidCollection(String),
/// The bytes on disk no longer match the stored checksum (or are
/// missing entirely while the row persists).
#[error("file content corrupted (checksum mismatch)")]
Corrupted,
#[error("invalid pagination cursor")]
InvalidCursor,
}
/// Outbound-files tunables. Env-overridable following the same pattern
/// as `HttpConfig::from_env`.
#[derive(Debug, Clone)]
pub struct FilesConfig {
pub root: PathBuf,
pub max_file_size_bytes: usize,
}
impl FilesConfig {
#[must_use]
pub fn conservative() -> Self {
Self {
root: PathBuf::from(DEFAULT_FILES_ROOT),
max_file_size_bytes: DEFAULT_MAX_FILE_SIZE_BYTES,
}
}
#[must_use]
pub fn from_env() -> Self {
let mut c = Self::conservative();
if let Ok(v) = env::var("PICLOUD_FILES_ROOT") {
if !v.trim().is_empty() {
c.root = PathBuf::from(v);
}
}
if let Ok(v) = env::var("PICLOUD_FILES_MAX_FILE_SIZE_BYTES") {
match v.parse::<usize>() {
Ok(n) => c.max_file_size_bytes = n,
Err(e) => {
tracing::warn!(error = %e, "ignoring invalid PICLOUD_FILES_MAX_FILE_SIZE_BYTES");
}
}
}
c
}
}
impl Default for FilesConfig {
fn default() -> Self {
Self::conservative()
}
}
/// The new+prior metadata returned from a successful `update`, so the
/// service can emit a `ServiceEvent` with the change-data-capture
/// surface (`old_payload`).
#[derive(Debug, Clone)]
pub struct FileUpdated {
pub new: FileMeta,
pub prev: FileMeta,
}
#[async_trait]
pub trait FilesRepo: Send + Sync {
async fn create(
&self,
app_id: AppId,
collection: &str,
new: NewFile,
) -> Result<FileMeta, FilesRepoError>;
async fn head(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
) -> Result<Option<FileMeta>, FilesRepoError>;
/// Reads + checksum-verifies the bytes. `Ok(None)` when no row
/// exists; `Err(Corrupted)` when the row exists but the bytes are
/// missing or mismatched.
async fn get(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
) -> Result<Option<Vec<u8>>, FilesRepoError>;
/// `Ok(None)` when no row exists (the SDK turns this into
/// `FilesError::NotFound`).
async fn update(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
upd: FileUpdate,
) -> Result<Option<FileUpdated>, FilesRepoError>;
/// Returns the deleted row's metadata if present, `None` otherwise.
async fn delete(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
) -> Result<Option<FileMeta>, FilesRepoError>;
async fn list(
&self,
app_id: AppId,
collection: &str,
cursor: Option<&str>,
limit: u32,
) -> Result<FilesListPage, FilesRepoError>;
}
/// Filesystem-bytes + Postgres-metadata repo.
pub struct FsFilesRepo {
pool: PgPool,
config: FilesConfig,
}
impl FsFilesRepo {
#[must_use]
pub fn new(pool: PgPool, config: FilesConfig) -> Self {
Self { pool, config }
}
/// Defensive path-component guard. The service already validates the
/// collection at the SDK boundary; this is belt-and-suspenders so a
/// future caller can't smuggle a traversal sequence onto disk.
fn guard_collection(collection: &str) -> Result<(), FilesRepoError> {
if collection.is_empty()
|| collection.contains('/')
|| collection.contains('\\')
|| collection.contains("..")
|| collection.contains('\0')
{
return Err(FilesRepoError::InvalidCollection(collection.to_string()));
}
Ok(())
}
fn final_path(&self, app_id: AppId, collection: &str, id: Uuid) -> PathBuf {
final_path_at(&self.config.root, app_id, collection, id)
}
fn write_atomic(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
bytes: &[u8],
) -> Result<String, FilesRepoError> {
write_atomic_at(&self.config.root, app_id, collection, id, bytes)
}
}
fn shard_dir_at(root: &Path, app_id: AppId, collection: &str, id_str: &str) -> PathBuf {
root.join("files")
.join(app_id.into_inner().to_string())
.join(collection)
.join(&id_str[..2])
}
fn final_path_at(root: &Path, app_id: AppId, collection: &str, id: Uuid) -> PathBuf {
let id_str = id.to_string();
shard_dir_at(root, app_id, collection, &id_str).join(&id_str)
}
/// Steps 26 of the atomic-write protocol. Returns the lowercase hex
/// SHA-256 of the bytes (computed in a single pass over the in-memory
/// buffer — the file is never re-read). Free function so the fs
/// mechanics are unit-testable without a Postgres pool.
fn write_atomic_at(
root: &Path,
app_id: AppId,
collection: &str,
id: Uuid,
bytes: &[u8],
) -> Result<String, FilesRepoError> {
use std::io::Write as _;
let id_str = id.to_string();
let dir = shard_dir_at(root, app_id, collection, &id_str);
create_dir_all_secure(&dir)?;
// Single-pass checksum over the in-memory buffer.
let mut hasher = Sha256::new();
hasher.update(bytes);
let checksum = hex_lower(&hasher.finalize());
let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
let tmp = dir.join(format!("{id_str}.tmp.{}-{seq}", std::process::id()));
let final_path = dir.join(&id_str);
{
let mut f = std::fs::File::create(&tmp).map_err(io_err)?;
f.write_all(bytes).map_err(io_err)?;
f.sync_all().map_err(io_err)?; // fsync temp
}
std::fs::rename(&tmp, &final_path).map_err(io_err)?; // atomic
// fsync the parent dir so the rename is durable.
if let Ok(dirf) = std::fs::File::open(&dir) {
let _ = dirf.sync_all();
}
Ok(checksum)
}
/// Read + checksum-verify the bytes at the given path-set. Free
/// function mirror of the `get` read path. Returns `Corrupted` when the
/// bytes are missing or don't match `expected_checksum`.
fn read_verify_at(
root: &Path,
app_id: AppId,
collection: &str,
id: Uuid,
expected_checksum: &str,
) -> Result<Vec<u8>, FilesRepoError> {
let path = final_path_at(root, app_id, collection, id);
let bytes = match std::fs::read(&path) {
Ok(b) => b,
Err(e) => {
tracing::error!(
path = %path.display(), error = %e,
"files: row exists but bytes are unreadable — treating as corrupted"
);
return Err(FilesRepoError::Corrupted);
}
};
let mut hasher = Sha256::new();
hasher.update(&bytes);
let actual = hex_lower(&hasher.finalize());
if actual != expected_checksum {
tracing::error!(
path = %path.display(), expected = %expected_checksum, actual = %actual,
"files: checksum mismatch on read — content corrupted"
);
return Err(FilesRepoError::Corrupted);
}
Ok(bytes)
}
#[async_trait]
impl FilesRepo for FsFilesRepo {
async fn create(
&self,
app_id: AppId,
collection: &str,
new: NewFile,
) -> Result<FileMeta, FilesRepoError> {
Self::guard_collection(collection)?;
let id = Uuid::new_v4();
let size = i64::try_from(new.data.len()).unwrap_or(i64::MAX);
let checksum = self.write_atomic(app_id, collection, id, &new.data)?;
let row: FileRow = sqlx::query_as(
"INSERT INTO files \
(app_id, collection, id, name, content_type, size_bytes, checksum_sha256) \
VALUES ($1, $2, $3, $4, $5, $6, $7) \
RETURNING id, collection, name, content_type, size_bytes, \
checksum_sha256, created_at, updated_at",
)
.bind(app_id.into_inner())
.bind(collection)
.bind(id)
.bind(&new.name)
.bind(&new.content_type)
.bind(size)
.bind(&checksum)
.fetch_one(&self.pool)
.await?;
Ok(row.into_meta())
}
async fn head(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
) -> Result<Option<FileMeta>, FilesRepoError> {
let row: Option<FileRow> = sqlx::query_as(
"SELECT id, collection, name, content_type, size_bytes, \
checksum_sha256, created_at, updated_at \
FROM files WHERE app_id = $1 AND collection = $2 AND id = $3",
)
.bind(app_id.into_inner())
.bind(collection)
.bind(id)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(FileRow::into_meta))
}
async fn get(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
) -> Result<Option<Vec<u8>>, FilesRepoError> {
let row: Option<(String,)> = sqlx::query_as(
"SELECT checksum_sha256 FROM files \
WHERE app_id = $1 AND collection = $2 AND id = $3",
)
.bind(app_id.into_inner())
.bind(collection)
.bind(id)
.fetch_optional(&self.pool)
.await?;
let Some((stored_checksum,)) = row else {
return Ok(None);
};
let bytes = read_verify_at(&self.config.root, app_id, collection, id, &stored_checksum)?;
Ok(Some(bytes))
}
async fn update(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
upd: FileUpdate,
) -> Result<Option<FileUpdated>, FilesRepoError> {
Self::guard_collection(collection)?;
// Read the prior row first (existence check + CDC surface).
let Some(prev) = self.head(app_id, collection, id).await? else {
return Ok(None);
};
let size = i64::try_from(upd.data.len()).unwrap_or(i64::MAX);
let checksum = self.write_atomic(app_id, collection, id, &upd.data)?;
let row: FileRow = sqlx::query_as(
"UPDATE files SET \
name = COALESCE($4, name), \
content_type = COALESCE($5, content_type), \
size_bytes = $6, \
checksum_sha256 = $7, \
updated_at = NOW() \
WHERE app_id = $1 AND collection = $2 AND id = $3 \
RETURNING id, collection, name, content_type, size_bytes, \
checksum_sha256, created_at, updated_at",
)
.bind(app_id.into_inner())
.bind(collection)
.bind(id)
.bind(upd.name.as_deref())
.bind(upd.content_type.as_deref())
.bind(size)
.bind(&checksum)
.fetch_one(&self.pool)
.await?;
Ok(Some(FileUpdated {
new: row.into_meta(),
prev,
}))
}
async fn delete(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
) -> Result<Option<FileMeta>, FilesRepoError> {
// SELECT + DELETE in one tx; unlink afterwards (outside the tx).
let mut tx = self.pool.begin().await?;
let row: Option<FileRow> = sqlx::query_as(
"SELECT id, collection, name, content_type, size_bytes, \
checksum_sha256, created_at, updated_at \
FROM files WHERE app_id = $1 AND collection = $2 AND id = $3 \
FOR UPDATE",
)
.bind(app_id.into_inner())
.bind(collection)
.bind(id)
.fetch_optional(&mut *tx)
.await?;
let Some(row) = row else {
tx.rollback().await?;
return Ok(None);
};
sqlx::query("DELETE FROM files WHERE app_id = $1 AND collection = $2 AND id = $3")
.bind(app_id.into_inner())
.bind(collection)
.bind(id)
.execute(&mut *tx)
.await?;
tx.commit().await?;
// Row is gone; unlink the bytes. A failure here leaves an orphan
// file (reclaimed by a future sweep) — not fatal.
let path = self.final_path(app_id, collection, id);
if let Err(e) = std::fs::remove_file(&path) {
if e.kind() != std::io::ErrorKind::NotFound {
tracing::warn!(path = %path.display(), error = %e, "files: unlink after delete failed (orphan)");
}
}
Ok(Some(row.into_meta()))
}
async fn list(
&self,
app_id: AppId,
collection: &str,
cursor: Option<&str>,
limit: u32,
) -> Result<FilesListPage, FilesRepoError> {
let limit = if limit == 0 {
FILES_LIST_DEFAULT_LIMIT
} else {
limit.min(FILES_LIST_MAX_LIMIT)
};
let last_id = match cursor {
Some(c) => Some(decode_cursor(c)?),
None => None,
};
let take = i64::from(limit) + 1;
let rows: Vec<FileRow> = sqlx::query_as(
"SELECT id, collection, name, content_type, size_bytes, \
checksum_sha256, created_at, updated_at \
FROM files \
WHERE app_id = $1 AND collection = $2 \
AND ($3::uuid IS NULL OR id > $3) \
ORDER BY id ASC \
LIMIT $4",
)
.bind(app_id.into_inner())
.bind(collection)
.bind(last_id)
.bind(take)
.fetch_all(&self.pool)
.await?;
let mut files: Vec<FileMeta> = rows.into_iter().map(FileRow::into_meta).collect();
let next_cursor = if files.len() > limit as usize {
files.truncate(limit as usize);
files.last().map(|m| encode_cursor(m.id))
} else {
None
};
Ok(FilesListPage { files, next_cursor })
}
}
// ----------------------------------------------------------------------------
// Helpers
// ----------------------------------------------------------------------------
fn io_err(e: std::io::Error) -> FilesRepoError {
FilesRepoError::Io(e.to_string())
}
/// `create_dir_all` with `0o700` on the created tree (Unix). On other
/// platforms it falls back to the default permissions.
fn create_dir_all_secure(dir: &Path) -> Result<(), FilesRepoError> {
#[cfg(unix)]
{
use std::os::unix::fs::DirBuilderExt as _;
std::fs::DirBuilder::new()
.recursive(true)
.mode(0o700)
.create(dir)
.map_err(io_err)
}
#[cfg(not(unix))]
{
std::fs::create_dir_all(dir).map_err(io_err)
}
}
fn hex_lower(bytes: &[u8]) -> String {
let mut s = String::with_capacity(bytes.len() * 2);
for b in bytes {
use std::fmt::Write as _;
let _ = write!(s, "{b:02x}");
}
s
}
fn encode_cursor(last_id: Uuid) -> String {
URL_SAFE_NO_PAD.encode(last_id.to_string().as_bytes())
}
fn decode_cursor(cursor: &str) -> Result<Uuid, FilesRepoError> {
let bytes = URL_SAFE_NO_PAD
.decode(cursor)
.map_err(|_| FilesRepoError::InvalidCursor)?;
let s = String::from_utf8(bytes).map_err(|_| FilesRepoError::InvalidCursor)?;
Uuid::parse_str(&s).map_err(|_| FilesRepoError::InvalidCursor)
}
#[derive(sqlx::FromRow)]
struct FileRow {
id: Uuid,
collection: String,
name: String,
content_type: String,
size_bytes: i64,
checksum_sha256: String,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
}
impl FileRow {
fn into_meta(self) -> FileMeta {
FileMeta {
id: self.id,
collection: self.collection,
name: self.name,
content_type: self.content_type,
size: u64::try_from(self.size_bytes).unwrap_or(0),
checksum: self.checksum_sha256,
created_at: self.created_at,
updated_at: self.updated_at,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn hex_lower_matches_known_sha256_vector() {
// SHA-256("abc") — NIST known-answer vector.
let mut h = Sha256::new();
h.update(b"abc");
assert_eq!(
hex_lower(&h.finalize()),
"ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
);
}
#[test]
fn hex_lower_of_empty_is_known_vector() {
let mut h = Sha256::new();
h.update(b"");
assert_eq!(
hex_lower(&h.finalize()),
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
);
}
#[test]
fn cursor_round_trips() {
let id = Uuid::new_v4();
let enc = encode_cursor(id);
assert_eq!(decode_cursor(&enc).unwrap(), id);
assert!(matches!(
decode_cursor("!!not-base64!!"),
Err(FilesRepoError::InvalidCursor)
));
}
#[test]
fn guard_collection_rejects_traversal() {
assert!(FsFilesRepo::guard_collection("avatars").is_ok());
assert!(FsFilesRepo::guard_collection("a/b").is_err());
assert!(FsFilesRepo::guard_collection("..").is_err());
assert!(FsFilesRepo::guard_collection("a..b").is_err());
assert!(FsFilesRepo::guard_collection("").is_err());
assert!(FsFilesRepo::guard_collection("a\0b").is_err());
}
#[test]
fn config_from_env_defaults_are_conservative() {
let c = FilesConfig::conservative();
assert_eq!(c.max_file_size_bytes, DEFAULT_MAX_FILE_SIZE_BYTES);
assert_eq!(c.root, PathBuf::from(DEFAULT_FILES_ROOT));
}
// ------------------------------------------------------------------
// Tempdir-backed filesystem mechanics — exercise the atomic write,
// single-pass checksum, and checksum-on-read tamper detection
// without needing a Postgres pool.
// ------------------------------------------------------------------
use picloud_shared::AppId;
/// Process-unique scratch dir under the system temp dir. Cleaned up
/// by each test via `remove_dir_all`.
fn unique_tmp_root() -> PathBuf {
let seq = TMP_COUNTER.fetch_add(1, Ordering::Relaxed);
let dir =
std::env::temp_dir().join(format!("picloud-files-test-{}-{seq}", std::process::id()));
std::fs::create_dir_all(&dir).unwrap();
dir
}
#[test]
fn write_atomic_then_read_verify_round_trips() {
let root = unique_tmp_root();
let app = AppId::new();
let id = Uuid::new_v4();
let bytes = b"hello picloud files".to_vec();
let checksum = write_atomic_at(&root, app, "avatars", id, &bytes).unwrap();
// Single-pass checksum matches an independent hash of the bytes.
let mut h = Sha256::new();
h.update(&bytes);
assert_eq!(checksum, hex_lower(&h.finalize()));
let read = read_verify_at(&root, app, "avatars", id, &checksum).unwrap();
assert_eq!(read, bytes);
std::fs::remove_dir_all(&root).ok();
}
#[test]
fn read_verify_detects_tampering_as_corrupted() {
let root = unique_tmp_root();
let app = AppId::new();
let id = Uuid::new_v4();
let checksum = write_atomic_at(&root, app, "c", id, b"original").unwrap();
// Mutate the bytes behind the repo's back.
let path = final_path_at(&root, app, "c", id);
std::fs::write(&path, b"tampered").unwrap();
let err = read_verify_at(&root, app, "c", id, &checksum).unwrap_err();
assert!(matches!(err, FilesRepoError::Corrupted));
std::fs::remove_dir_all(&root).ok();
}
#[test]
fn read_verify_missing_bytes_is_corrupted() {
let root = unique_tmp_root();
let app = AppId::new();
let id = Uuid::new_v4();
// No write — the file never existed.
let err = read_verify_at(&root, app, "c", id, "deadbeef").unwrap_err();
assert!(matches!(err, FilesRepoError::Corrupted));
std::fs::remove_dir_all(&root).ok();
}
#[test]
fn atomic_write_leaves_no_tmp_file_after_success() {
let root = unique_tmp_root();
let app = AppId::new();
let id = Uuid::new_v4();
write_atomic_at(&root, app, "c", id, b"data").unwrap();
let id_str = id.to_string();
let dir = shard_dir_at(&root, app, "c", &id_str);
let entries: Vec<_> = std::fs::read_dir(&dir)
.unwrap()
.filter_map(Result::ok)
.map(|e| e.file_name().to_string_lossy().into_owned())
.collect();
// Exactly the final file is visible — no `*.tmp.*` orphan.
assert_eq!(entries, vec![id_str]);
assert!(!entries.iter().any(|n| n.contains(".tmp.")));
std::fs::remove_dir_all(&root).ok();
}
#[test]
fn id_shard_uses_first_two_chars() {
let root = PathBuf::from("/tmp/x");
let app = AppId::new();
let id = Uuid::new_v4();
let id_str = id.to_string();
let path = final_path_at(&root, app, "col", id);
let shard = &id_str[..2];
assert!(path
.to_string_lossy()
.contains(&format!("/col/{shard}/{id_str}")));
}
#[cfg(unix)]
#[test]
fn shard_tree_created_with_0700() {
use std::os::unix::fs::PermissionsExt as _;
let root = unique_tmp_root();
let app = AppId::new();
let id = Uuid::new_v4();
write_atomic_at(&root, app, "c", id, b"data").unwrap();
let id_str = id.to_string();
let dir = shard_dir_at(&root, app, "c", &id_str);
let mode = std::fs::metadata(&dir).unwrap().permissions().mode();
assert_eq!(mode & 0o777, 0o700, "shard dir should be 0o700");
std::fs::remove_dir_all(&root).ok();
}
}

View File

@@ -0,0 +1,817 @@
//! `FilesServiceImpl` — wires the `FilesRepo` underneath the
//! `picloud_shared::FilesService` trait scripts see via the Rhai
//! bridge.
//!
//! Layers added here (vs the raw repo), mirroring `KvServiceImpl`:
//! 1. Collection validation (empty + path-traversal) and field /
//! size-cap validation at the SDK boundary.
//! 2. **Script-as-gate authz**: when `cx.principal.is_some()` we run
//! `authz::require(...)`; when it's `None` (public HTTP) we skip.
//! Cross-app isolation is unaffected — every repo call is keyed by
//! `cx.app_id`, never an argument.
//! 3. `ServiceEvent` emission after each mutation (`create` /
//! `update` / `delete`). The payload is the file **metadata**, not
//! the blob bytes (files are too big for trigger payloads).
use std::sync::Arc;
use async_trait::async_trait;
use picloud_shared::{
validate_files_collection, FileMeta, FileUpdate, FilesError, FilesListPage, FilesService,
NewFile, SdkCallCx, ServiceEvent, ServiceEventEmitter,
};
use uuid::Uuid;
use crate::authz::{self, AuthzRepo, Capability};
use crate::files_repo::{FileUpdated, FilesRepo, FilesRepoError};
pub struct FilesServiceImpl {
repo: Arc<dyn FilesRepo>,
authz: Arc<dyn AuthzRepo>,
events: Arc<dyn ServiceEventEmitter>,
max_file_size_bytes: usize,
}
impl FilesServiceImpl {
#[must_use]
pub fn new(
repo: Arc<dyn FilesRepo>,
authz: Arc<dyn AuthzRepo>,
events: Arc<dyn ServiceEventEmitter>,
max_file_size_bytes: usize,
) -> Self {
Self {
repo,
authz,
events,
max_file_size_bytes,
}
}
async fn check_read(&self, cx: &SdkCallCx) -> Result<(), FilesError> {
if let Some(ref principal) = cx.principal {
authz::require(&*self.authz, principal, Capability::AppFilesRead(cx.app_id))
.await
.map_err(|_| FilesError::Forbidden)?;
}
Ok(())
}
async fn check_write(&self, cx: &SdkCallCx) -> Result<(), FilesError> {
if let Some(ref principal) = cx.principal {
authz::require(
&*self.authz,
principal,
Capability::AppFilesWrite(cx.app_id),
)
.await
.map_err(|_| FilesError::Forbidden)?;
}
Ok(())
}
/// Best-effort `ServiceEvent` emission. A failed emit is logged but
/// never rolls back the (already-durable) file write.
async fn emit(
&self,
cx: &SdkCallCx,
op: &'static str,
collection: &str,
meta: &FileMeta,
old: Option<&FileMeta>,
) {
let payload = serde_json::to_value(meta).ok();
let old_payload = old.and_then(|m| serde_json::to_value(m).ok());
if let Err(e) = self
.events
.emit(
cx,
ServiceEvent {
source: "files",
op,
collection: Some(collection.to_string()),
key: Some(meta.id.to_string()),
payload,
old_payload,
},
)
.await
{
tracing::warn!(error = %e, source = "files", op, "event emit failed");
}
}
}
/// Parse a script-supplied id. Invalid UUIDs aren't an error shape the
/// SDK exposes — for reads/deletes they simply mean "no such file".
fn parse_id(id: &str) -> Option<Uuid> {
Uuid::parse_str(id).ok()
}
impl From<FilesRepoError> for FilesError {
fn from(e: FilesRepoError) -> Self {
match e {
FilesRepoError::Corrupted => Self::Corrupted,
FilesRepoError::InvalidCollection(c) => Self::InvalidCollection(c),
other => Self::Backend(other.to_string()),
}
}
}
#[async_trait]
impl FilesService for FilesServiceImpl {
async fn create(
&self,
cx: &SdkCallCx,
collection: &str,
new: NewFile,
) -> Result<Uuid, FilesError> {
validate_files_collection(collection)?;
self.check_write(cx).await?;
new.validate(self.max_file_size_bytes)?;
let meta = self.repo.create(cx.app_id, collection, new).await?;
self.emit(cx, "create", collection, &meta, None).await;
Ok(meta.id)
}
async fn head(
&self,
cx: &SdkCallCx,
collection: &str,
id: &str,
) -> Result<Option<FileMeta>, FilesError> {
validate_files_collection(collection)?;
self.check_read(cx).await?;
let Some(uuid) = parse_id(id) else {
return Ok(None);
};
Ok(self.repo.head(cx.app_id, collection, uuid).await?)
}
async fn get(
&self,
cx: &SdkCallCx,
collection: &str,
id: &str,
) -> Result<Option<Vec<u8>>, FilesError> {
validate_files_collection(collection)?;
self.check_read(cx).await?;
let Some(uuid) = parse_id(id) else {
return Ok(None);
};
Ok(self.repo.get(cx.app_id, collection, uuid).await?)
}
async fn update(
&self,
cx: &SdkCallCx,
collection: &str,
id: &str,
upd: FileUpdate,
) -> Result<(), FilesError> {
validate_files_collection(collection)?;
self.check_write(cx).await?;
upd.validate(self.max_file_size_bytes)?;
let Some(uuid) = parse_id(id) else {
return Err(FilesError::NotFound);
};
match self.repo.update(cx.app_id, collection, uuid, upd).await? {
Some(FileUpdated { new, prev }) => {
self.emit(cx, "update", collection, &new, Some(&prev)).await;
Ok(())
}
None => Err(FilesError::NotFound),
}
}
async fn delete(&self, cx: &SdkCallCx, collection: &str, id: &str) -> Result<bool, FilesError> {
validate_files_collection(collection)?;
self.check_write(cx).await?;
let Some(uuid) = parse_id(id) else {
return Ok(false);
};
match self.repo.delete(cx.app_id, collection, uuid).await? {
Some(meta) => {
// On delete, the top-level metadata AND `prev` both carry
// the deleted row (per docs/v1.1.x design + the brief).
self.emit(cx, "delete", collection, &meta, Some(&meta))
.await;
Ok(true)
}
None => Ok(false),
}
}
async fn list(
&self,
cx: &SdkCallCx,
collection: &str,
cursor: Option<&str>,
limit: u32,
) -> Result<FilesListPage, FilesError> {
validate_files_collection(collection)?;
self.check_read(cx).await?;
Ok(self.repo.list(cx.app_id, collection, cursor, limit).await?)
}
}
// ----------------------------------------------------------------------------
// Tests — in-memory FilesRepo so unit tests need neither Postgres nor a
// filesystem. The on-disk atomic-write / checksum mechanics are covered
// by the tempdir tests in `files_repo.rs`.
// ----------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use crate::authz::{AuthzError, AuthzRepo};
use async_trait::async_trait;
use chrono::Utc;
use picloud_shared::{
AdminUserId, AppId, AppRole, EmitError, ExecutionId, InstanceRole, Principal, RequestId,
ScriptId, ServiceEvent, UserId,
};
use std::collections::BTreeMap;
use std::sync::Mutex as StdMutex;
use tokio::sync::Mutex;
/// In-memory FilesRepo keyed by (app, collection, id). Stores the
/// metadata + bytes together so cross-app isolation and round-trips
/// can be checked without disk.
#[derive(Default)]
struct InMemoryFilesRepo {
#[allow(clippy::type_complexity)]
data: Mutex<BTreeMap<(AppId, String, Uuid), (FileMeta, Vec<u8>)>>,
}
fn sha256_hex(bytes: &[u8]) -> String {
use sha2::{Digest, Sha256};
let mut h = Sha256::new();
h.update(bytes);
let out = h.finalize();
let mut s = String::new();
for b in out {
use std::fmt::Write as _;
let _ = write!(s, "{b:02x}");
}
s
}
#[async_trait]
impl FilesRepo for InMemoryFilesRepo {
async fn create(
&self,
app_id: AppId,
collection: &str,
new: NewFile,
) -> Result<FileMeta, FilesRepoError> {
let id = Uuid::new_v4();
let now = Utc::now();
let meta = FileMeta {
id,
collection: collection.to_string(),
name: new.name.clone(),
content_type: new.content_type.clone(),
size: new.data.len() as u64,
checksum: sha256_hex(&new.data),
created_at: now,
updated_at: now,
};
self.data.lock().await.insert(
(app_id, collection.to_string(), id),
(meta.clone(), new.data),
);
Ok(meta)
}
async fn head(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
) -> Result<Option<FileMeta>, FilesRepoError> {
Ok(self
.data
.lock()
.await
.get(&(app_id, collection.to_string(), id))
.map(|(m, _)| m.clone()))
}
async fn get(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
) -> Result<Option<Vec<u8>>, FilesRepoError> {
Ok(self
.data
.lock()
.await
.get(&(app_id, collection.to_string(), id))
.map(|(_, b)| b.clone()))
}
async fn update(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
upd: FileUpdate,
) -> Result<Option<FileUpdated>, FilesRepoError> {
let mut data = self.data.lock().await;
let key = (app_id, collection.to_string(), id);
let Some((prev_meta, _)) = data.get(&key).cloned() else {
return Ok(None);
};
let now = Utc::now();
let new_meta = FileMeta {
id,
collection: collection.to_string(),
name: upd.name.clone().unwrap_or_else(|| prev_meta.name.clone()),
content_type: upd
.content_type
.clone()
.unwrap_or_else(|| prev_meta.content_type.clone()),
size: upd.data.len() as u64,
checksum: sha256_hex(&upd.data),
created_at: prev_meta.created_at,
updated_at: now,
};
data.insert(key, (new_meta.clone(), upd.data));
Ok(Some(FileUpdated {
new: new_meta,
prev: prev_meta,
}))
}
async fn delete(
&self,
app_id: AppId,
collection: &str,
id: Uuid,
) -> Result<Option<FileMeta>, FilesRepoError> {
Ok(self
.data
.lock()
.await
.remove(&(app_id, collection.to_string(), id))
.map(|(m, _)| m))
}
async fn list(
&self,
app_id: AppId,
collection: &str,
cursor: Option<&str>,
limit: u32,
) -> Result<FilesListPage, FilesRepoError> {
let data = self.data.lock().await;
let after = cursor.and_then(|c| Uuid::parse_str(c).ok());
let mut metas: Vec<FileMeta> = data
.iter()
.filter(|((a, c, _), _)| *a == app_id && c == collection)
.map(|(_, (m, _))| m.clone())
.filter(|m| after.is_none_or(|a| m.id > a))
.collect();
metas.sort_by_key(|m| m.id);
let take = (limit.max(1)) as usize;
let next_cursor = if metas.len() > take {
metas.truncate(take);
metas.last().map(|m| m.id.to_string())
} else {
None
};
Ok(FilesListPage {
files: metas,
next_cursor,
})
}
}
/// Captures emitted events so tests can assert on fan-out shape.
#[derive(Default)]
struct CapturingEmitter {
events: StdMutex<Vec<ServiceEvent>>,
}
#[async_trait]
impl ServiceEventEmitter for CapturingEmitter {
async fn emit(&self, _cx: &SdkCallCx, event: ServiceEvent) -> Result<(), EmitError> {
self.events.lock().unwrap().push(event);
Ok(())
}
}
#[derive(Default)]
struct DenyingAuthzRepo;
#[async_trait]
impl AuthzRepo for DenyingAuthzRepo {
async fn membership(
&self,
_user_id: UserId,
_app_id: AppId,
) -> Result<Option<AppRole>, AuthzError> {
Ok(None)
}
}
#[derive(Default)]
struct EditorAuthzRepo;
#[async_trait]
impl AuthzRepo for EditorAuthzRepo {
async fn membership(
&self,
_user_id: UserId,
_app_id: AppId,
) -> Result<Option<AppRole>, AuthzError> {
Ok(Some(AppRole::Editor))
}
}
fn anon_cx(app_id: AppId) -> SdkCallCx {
SdkCallCx {
app_id,
script_id: ScriptId::new(),
principal: None,
execution_id: ExecutionId::new(),
request_id: RequestId::new(),
trigger_depth: 0,
root_execution_id: ExecutionId::new(),
is_dead_letter_handler: false,
event: None,
}
}
fn member_cx(app_id: AppId) -> SdkCallCx {
SdkCallCx {
principal: Some(Principal {
user_id: AdminUserId::new(),
instance_role: InstanceRole::Member,
scopes: None,
app_binding: None,
}),
..anon_cx(app_id)
}
}
fn svc_with(authz: Arc<dyn AuthzRepo>, emitter: Arc<CapturingEmitter>) -> FilesServiceImpl {
FilesServiceImpl::new(
Arc::new(InMemoryFilesRepo::default()),
authz,
emitter,
10 * 1024 * 1024,
)
}
fn svc() -> FilesServiceImpl {
svc_with(
Arc::new(DenyingAuthzRepo),
Arc::new(CapturingEmitter::default()),
)
}
fn new_file(name: &str, data: &[u8]) -> NewFile {
NewFile {
name: name.to_string(),
content_type: "application/octet-stream".to_string(),
data: data.to_vec(),
}
}
#[tokio::test]
async fn create_then_get_head_round_trips() {
let files = svc();
let cx = anon_cx(AppId::new());
let id = files
.create(&cx, "avatars", new_file("a.bin", b"hello"))
.await
.unwrap();
let bytes = files.get(&cx, "avatars", &id.to_string()).await.unwrap();
assert_eq!(bytes, Some(b"hello".to_vec()));
let meta = files
.head(&cx, "avatars", &id.to_string())
.await
.unwrap()
.unwrap();
assert_eq!(meta.name, "a.bin");
assert_eq!(meta.size, 5);
assert_eq!(meta.checksum, sha256_hex(b"hello"));
}
#[tokio::test]
async fn get_and_head_missing_return_none() {
let files = svc();
let cx = anon_cx(AppId::new());
let missing = Uuid::new_v4().to_string();
assert_eq!(files.get(&cx, "c", &missing).await.unwrap(), None);
assert!(files.head(&cx, "c", &missing).await.unwrap().is_none());
// Non-UUID id is also "missing", not an error.
assert_eq!(files.get(&cx, "c", "not-a-uuid").await.unwrap(), None);
}
#[tokio::test]
async fn update_replaces_content_and_keeps_metadata_when_omitted() {
let files = svc();
let cx = anon_cx(AppId::new());
let id = files
.create(&cx, "c", new_file("v1.txt", b"one"))
.await
.unwrap();
files
.update(
&cx,
"c",
&id.to_string(),
FileUpdate {
data: b"two!!".to_vec(),
name: None,
content_type: None,
},
)
.await
.unwrap();
let meta = files
.head(&cx, "c", &id.to_string())
.await
.unwrap()
.unwrap();
assert_eq!(meta.name, "v1.txt"); // kept
assert_eq!(meta.size, 5);
assert_eq!(
files.get(&cx, "c", &id.to_string()).await.unwrap(),
Some(b"two!!".to_vec())
);
}
#[tokio::test]
async fn update_missing_throws_not_found() {
let files = svc();
let cx = anon_cx(AppId::new());
let err = files
.update(
&cx,
"c",
&Uuid::new_v4().to_string(),
FileUpdate {
data: b"x".to_vec(),
name: None,
content_type: None,
},
)
.await
.unwrap_err();
assert!(matches!(err, FilesError::NotFound));
}
#[tokio::test]
async fn delete_returns_was_present() {
let files = svc();
let cx = anon_cx(AppId::new());
let id = files.create(&cx, "c", new_file("f", b"x")).await.unwrap();
assert!(files.delete(&cx, "c", &id.to_string()).await.unwrap());
assert!(!files.delete(&cx, "c", &id.to_string()).await.unwrap());
assert!(!files.delete(&cx, "c", "not-a-uuid").await.unwrap());
}
#[tokio::test]
async fn empty_collection_rejected() {
let files = svc();
let cx = anon_cx(AppId::new());
let err = files
.create(&cx, "", new_file("f", b"x"))
.await
.unwrap_err();
assert!(matches!(err, FilesError::InvalidCollection(_)));
}
#[tokio::test]
async fn traversal_collection_rejected() {
let files = svc();
let cx = anon_cx(AppId::new());
for bad in ["../etc", "a/b", "a..b", "x\0y"] {
let err = files
.create(&cx, bad, new_file("f", b"x"))
.await
.unwrap_err();
assert!(
matches!(err, FilesError::InvalidCollection(_)),
"expected reject for {bad:?}"
);
}
}
#[tokio::test]
async fn missing_required_fields_have_field_specific_messages() {
let files = svc();
let cx = anon_cx(AppId::new());
// name
let err = files
.create(
&cx,
"c",
NewFile {
name: " ".into(),
content_type: "text/plain".into(),
data: b"x".to_vec(),
},
)
.await
.unwrap_err();
assert!(matches!(err, FilesError::MissingField("name")));
// content_type
let err = files
.create(
&cx,
"c",
NewFile {
name: "f".into(),
content_type: String::new(),
data: b"x".to_vec(),
},
)
.await
.unwrap_err();
assert!(matches!(err, FilesError::MissingField("content_type")));
// data
let err = files
.create(
&cx,
"c",
NewFile {
name: "f".into(),
content_type: "text/plain".into(),
data: vec![],
},
)
.await
.unwrap_err();
assert!(matches!(err, FilesError::MissingField("data")));
}
#[tokio::test]
async fn name_and_content_type_length_caps_enforced() {
let files = svc();
let cx = anon_cx(AppId::new());
let long_name = "x".repeat(256);
let err = files
.create(&cx, "c", new_file(&long_name, b"x"))
.await
.unwrap_err();
assert!(matches!(err, FilesError::NameTooLong(256)));
let err = files
.create(
&cx,
"c",
NewFile {
name: "f".into(),
content_type: "x".repeat(128),
data: b"x".to_vec(),
},
)
.await
.unwrap_err();
assert!(matches!(err, FilesError::ContentTypeTooLong(128)));
}
#[tokio::test]
async fn per_file_size_cap_enforced() {
let files = FilesServiceImpl::new(
Arc::new(InMemoryFilesRepo::default()),
Arc::new(DenyingAuthzRepo),
Arc::new(CapturingEmitter::default()),
8, // tiny cap
);
let cx = anon_cx(AppId::new());
let err = files
.create(&cx, "c", new_file("big", b"123456789"))
.await
.unwrap_err();
assert!(matches!(err, FilesError::TooLarge { limit: 8, .. }));
}
#[tokio::test]
async fn cross_app_isolation() {
let files = svc();
let app_a = AppId::new();
let app_b = AppId::new();
let cx_a = anon_cx(app_a);
let cx_b = anon_cx(app_b);
let id = files
.create(&cx_a, "shared", new_file("f", b"from-a"))
.await
.unwrap();
// app B cannot see app A's file by id.
assert_eq!(
files.get(&cx_b, "shared", &id.to_string()).await.unwrap(),
None
);
assert!(files
.head(&cx_b, "shared", &id.to_string())
.await
.unwrap()
.is_none());
let page_b = files.list(&cx_b, "shared", None, 100).await.unwrap();
assert!(page_b.files.is_empty());
// app A still sees it.
assert!(files
.get(&cx_a, "shared", &id.to_string())
.await
.unwrap()
.is_some());
}
#[tokio::test]
async fn anonymous_cx_skips_authz() {
let files = svc(); // DenyingAuthzRepo
let cx = anon_cx(AppId::new());
// No principal → no authz check, even with a denying repo.
files.create(&cx, "c", new_file("f", b"x")).await.unwrap();
}
#[tokio::test]
async fn member_without_role_is_forbidden() {
let files = svc(); // DenyingAuthzRepo
let cx = member_cx(AppId::new());
let err = files
.create(&cx, "c", new_file("f", b"x"))
.await
.unwrap_err();
assert!(matches!(err, FilesError::Forbidden));
}
#[tokio::test]
async fn member_with_editor_role_allowed() {
let files = svc_with(
Arc::new(EditorAuthzRepo),
Arc::new(CapturingEmitter::default()),
);
let cx = member_cx(AppId::new());
files.create(&cx, "c", new_file("f", b"x")).await.unwrap();
}
#[tokio::test]
async fn mutations_emit_events_with_correct_prev() {
let emitter = Arc::new(CapturingEmitter::default());
let files = svc_with(Arc::new(DenyingAuthzRepo), emitter.clone());
let cx = anon_cx(AppId::new());
let id = files.create(&cx, "c", new_file("f", b"one")).await.unwrap();
files
.update(
&cx,
"c",
&id.to_string(),
FileUpdate {
data: b"two".to_vec(),
name: None,
content_type: None,
},
)
.await
.unwrap();
files.delete(&cx, "c", &id.to_string()).await.unwrap();
let events = emitter.events.lock().unwrap();
assert_eq!(events.len(), 3);
// create: prev is None
assert_eq!(events[0].op, "create");
assert_eq!(events[0].source, "files");
assert!(events[0].old_payload.is_none());
assert!(events[0].payload.is_some());
// update: prev is the prior metadata
assert_eq!(events[1].op, "update");
assert!(events[1].old_payload.is_some());
// delete: prev is the deleted metadata (payload == old_payload)
assert_eq!(events[2].op, "delete");
assert_eq!(events[2].payload, events[2].old_payload);
assert!(events[2].payload.is_some());
}
#[tokio::test]
async fn list_cursor_paginates() {
let files = svc();
let cx = anon_cx(AppId::new());
for i in 0..5 {
files
.create(&cx, "c", new_file(&format!("f{i}"), b"x"))
.await
.unwrap();
}
let p1 = files.list(&cx, "c", None, 2).await.unwrap();
assert_eq!(p1.files.len(), 2);
assert!(p1.next_cursor.is_some());
let p2 = files
.list(&cx, "c", p1.next_cursor.as_deref(), 2)
.await
.unwrap();
assert_eq!(p2.files.len(), 2);
let p3 = files
.list(&cx, "c", p2.next_cursor.as_deref(), 2)
.await
.unwrap();
assert_eq!(p3.files.len(), 1);
assert!(p3.next_cursor.is_none());
}
}

View File

@@ -30,6 +30,9 @@ pub mod dispatcher;
pub mod docs_filter;
pub mod docs_repo;
pub mod docs_service;
pub mod files_api;
pub mod files_repo;
pub mod files_service;
pub mod gc;
pub mod http_service;
pub mod kv_repo;
@@ -40,6 +43,8 @@ pub mod module_source;
pub mod outbox_event_emitter;
pub mod outbox_repo;
pub mod principal_resolver;
pub mod pubsub_repo;
pub mod pubsub_service;
pub mod repo;
pub mod route_admin;
pub mod route_repo;
@@ -96,6 +101,9 @@ pub use dead_letters_api::{dead_letters_router, DeadLettersApiError, DeadLetters
pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError};
pub use docs_repo::{DocsRepo, DocsRepoError, PostgresDocsRepo};
pub use docs_service::DocsServiceImpl;
pub use files_api::{files_admin_router, FilesAdminState};
pub use files_repo::{FilesConfig, FilesRepo, FilesRepoError, FsFilesRepo};
pub use files_service::FilesServiceImpl;
pub use gc::{spawn_abandoned_gc, spawn_dead_letter_gc};
pub use http_service::{HttpConfig, HttpServiceImpl};
pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo};
@@ -107,6 +115,8 @@ pub use outbox_repo::{
NewOutboxRow, OutboxRepo, OutboxRepoError, OutboxRow, OutboxSourceKind, PostgresOutboxRepo,
};
pub use principal_resolver::{AdminPrincipalResolver, PrincipalResolver, PrincipalResolverError};
pub use pubsub_repo::{PostgresPubsubRepo, PublishCtx, PubsubRepo, PubsubRepoError};
pub use pubsub_service::PubsubServiceImpl;
pub use repo::{
ExecutionLogRepository, NewScript, PostgresExecutionLogRepository, PostgresScriptRepository,
RepoResolver, ScriptPatch, ScriptRepository, ScriptRepositoryError,
@@ -116,8 +126,9 @@ pub use route_repo::{NewRoute, PostgresRouteRepository, RouteRepository};
pub use sandbox::{CeilingError, SandboxCeiling};
pub use trigger_config::{BackoffShape, TriggerConfig};
pub use trigger_repo::{
collection_matches, CreateDeadLetterTrigger, CreateDocsTrigger, CreateKvTrigger,
DeadLetterTriggerMatch, DocsTriggerMatch, KvTriggerMatch, PostgresTriggerRepo, Trigger,
TriggerDetails, TriggerDispatchMode, TriggerKind, TriggerRepo, TriggerRepoError,
collection_matches, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger,
CreateKvTrigger, CreatePubsubTrigger, DeadLetterTriggerMatch, DocsTriggerMatch,
FilesTriggerMatch, KvTriggerMatch, PostgresTriggerRepo, Trigger, TriggerDetails,
TriggerDispatchMode, TriggerKind, TriggerRepo, TriggerRepoError,
};
pub use triggers_api::{triggers_router, TriggersApiError, TriggersState};

View File

@@ -19,7 +19,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use picloud_shared::{
DocsEventOp, EmitError, KvEventOp, SdkCallCx, ServiceEvent, ServiceEventEmitter, TriggerEvent,
DocsEventOp, EmitError, FileMeta, FilesEventOp, KvEventOp, SdkCallCx, ServiceEvent,
ServiceEventEmitter, TriggerEvent,
};
use crate::outbox_repo::{NewOutboxRow, OutboxRepo, OutboxSourceKind};
@@ -43,6 +44,7 @@ impl ServiceEventEmitter for OutboxEventEmitter {
match event.source {
"kv" => self.emit_kv(cx, event).await,
"docs" => self.emit_docs(cx, event).await,
"files" => self.emit_files(cx, event).await,
// Future sources land here. For now, silently drop — the
// SDK calls `events.emit(...)` unconditionally for forward
// compat, so swallowing without an error is correct.
@@ -154,4 +156,68 @@ impl OutboxEventEmitter {
}
Ok(())
}
/// v1.1.5. Fan out a files mutation across matching files triggers.
/// The `ServiceEvent.payload` is the file **metadata** (never the
/// blob bytes); `old_payload` is the prior metadata (the deleted
/// row's metadata on delete). The `TriggerEvent::Files` carries the
/// metadata fields explicitly + `prev` for the change-data-capture
/// surface.
async fn emit_files(&self, cx: &SdkCallCx, event: ServiceEvent) -> Result<(), EmitError> {
let Some(op) = FilesEventOp::from_wire(event.op) else {
return Ok(());
};
let Some(collection) = event.collection.clone() else {
return Ok(());
};
// The payload is the FileMeta JSON the FilesServiceImpl emitted.
let Some(meta) = event
.payload
.clone()
.and_then(|v| serde_json::from_value::<FileMeta>(v).ok())
else {
return Ok(());
};
let matches = self
.triggers
.list_matching_files(cx.app_id, &collection, op)
.await
.map_err(|e| EmitError::Unavailable(format!("trigger lookup: {e}")))?;
if matches.is_empty() {
return Ok(());
}
let trigger_event = TriggerEvent::Files {
op,
collection,
id: meta.id.to_string(),
name: meta.name,
content_type: meta.content_type,
size: meta.size,
checksum: meta.checksum,
prev: event.old_payload.clone(),
};
let payload = serde_json::to_value(&trigger_event)
.map_err(|e| EmitError::Rejected(format!("event serialize: {e}")))?;
for m in matches {
self.outbox
.insert(NewOutboxRow {
app_id: cx.app_id,
source_kind: OutboxSourceKind::Files,
trigger_id: Some(m.trigger_id),
script_id: Some(m.script_id),
reply_to: None,
payload: payload.clone(),
origin_principal: cx.principal.as_ref().map(|p| p.user_id),
trigger_depth: cx.trigger_depth.saturating_add(1),
root_execution_id: Some(cx.root_execution_id),
})
.await
.map_err(|e| EmitError::Unavailable(format!("outbox insert: {e}")))?;
}
Ok(())
}
}

View File

@@ -27,6 +27,10 @@ pub enum OutboxSourceKind {
DeadLetter,
/// v1.1.4.
Cron,
/// v1.1.5.
Files,
/// v1.1.5.
Pubsub,
}
impl OutboxSourceKind {
@@ -38,6 +42,8 @@ impl OutboxSourceKind {
Self::Docs => "docs",
Self::DeadLetter => "dead_letter",
Self::Cron => "cron",
Self::Files => "files",
Self::Pubsub => "pubsub",
}
}
@@ -49,6 +55,8 @@ impl OutboxSourceKind {
"docs" => Some(Self::Docs),
"dead_letter" => Some(Self::DeadLetter),
"cron" => Some(Self::Cron),
"files" => Some(Self::Files),
"pubsub" => Some(Self::Pubsub),
_ => None,
}
}

View File

@@ -0,0 +1,118 @@
//! `PubsubRepo` — publish-time fan-out for the v1.1.5 `pubsub::*` SDK.
//!
//! `publish_durable` writes one outbox row per matching enabled `pubsub`
//! trigger, all inside a single transaction so a partial fan-out (some
//! subscribers got rows, others didn't, then a crash) can't happen.
//! Each delivery row then retries / dead-letters independently through
//! the existing dispatcher — no pub/sub-specific dispatch branching.
//!
//! Topic pattern matching runs in Rust (`picloud_shared::topic_matches`)
//! against the small set of the app's enabled pubsub triggers, keeping
//! the SELECT trivial. v1.2 can add a topic-trie index if fan-out
//! becomes a hot path.
use async_trait::async_trait;
use picloud_shared::{topic_matches, AdminUserId, AppId, ExecutionId};
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Debug, thiserror::Error)]
pub enum PubsubRepoError {
#[error("database error: {0}")]
Db(#[from] sqlx::Error),
}
/// The execution-context bits a fan-out needs to stamp onto each outbox
/// row. Derived from the publishing script's `SdkCallCx`.
#[derive(Debug, Clone, Copy)]
pub struct PublishCtx {
pub app_id: AppId,
pub origin_principal: Option<AdminUserId>,
pub trigger_depth: u32,
pub root_execution_id: ExecutionId,
}
#[async_trait]
pub trait PubsubRepo: Send + Sync {
/// Fan out a publish to every matching enabled pubsub trigger in
/// `ctx.app_id`, inserting one outbox row each in a SINGLE
/// transaction. `event_payload` is the serialized
/// `TriggerEvent::Pubsub`. Returns the number of delivery rows
/// written (0 when no trigger matched — the publish still succeeds).
async fn fan_out_publish(
&self,
ctx: PublishCtx,
topic: &str,
event_payload: serde_json::Value,
) -> Result<u32, PubsubRepoError>;
}
pub struct PostgresPubsubRepo {
pool: PgPool,
}
impl PostgresPubsubRepo {
#[must_use]
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[derive(sqlx::FromRow)]
struct PubsubTriggerRow {
id: Uuid,
script_id: Uuid,
topic_pattern: String,
}
#[async_trait]
impl PubsubRepo for PostgresPubsubRepo {
async fn fan_out_publish(
&self,
ctx: PublishCtx,
topic: &str,
event_payload: serde_json::Value,
) -> Result<u32, PubsubRepoError> {
let mut tx = self.pool.begin().await?;
// Load all enabled pubsub triggers for the app; filter by topic
// pattern in Rust (keeps the query simple, honours the
// empty/`*`/prefix semantics without teaching SQL about globs).
let rows: Vec<PubsubTriggerRow> = sqlx::query_as(
"SELECT t.id, t.script_id, d.topic_pattern \
FROM triggers t \
JOIN pubsub_trigger_details d ON d.trigger_id = t.id \
WHERE t.app_id = $1 AND t.kind = 'pubsub' AND t.enabled = TRUE",
)
.bind(ctx.app_id.into_inner())
.fetch_all(&mut *tx)
.await?;
let mut written: u32 = 0;
for r in rows {
if !topic_matches(&r.topic_pattern, topic) {
continue;
}
sqlx::query(
"INSERT INTO outbox ( \
app_id, source_kind, trigger_id, script_id, reply_to, \
payload, origin_principal, trigger_depth, root_execution_id \
) VALUES ($1, 'pubsub', $2, $3, NULL, $4, $5, $6, $7)",
)
.bind(ctx.app_id.into_inner())
.bind(r.id)
.bind(r.script_id)
.bind(&event_payload)
.bind(ctx.origin_principal.map(AdminUserId::into_inner))
.bind(i32::try_from(ctx.trigger_depth.saturating_add(1)).unwrap_or(1))
.bind(ctx.root_execution_id.into_inner())
.execute(&mut *tx)
.await?;
written += 1;
}
// Commit once — all rows or none.
tx.commit().await?;
Ok(written)
}
}

View File

@@ -0,0 +1,320 @@
//! `PubsubServiceImpl` — wires `PubsubRepo` underneath the
//! `picloud_shared::PubsubService` trait scripts see via the Rhai
//! bridge.
//!
//! Mirrors the other stateful services: script-as-gate authz
//! (`AppPubsubPublish`, skipped when `cx.principal` is `None`), with the
//! backend doing a publish-time outbox fan-out instead of a row write.
//! No `ServiceEventEmitter` here — pub/sub publishes directly to the
//! outbox; it doesn't mutate local data that other triggers observe.
use std::sync::Arc;
use async_trait::async_trait;
use picloud_shared::{PubsubError, PubsubService, SdkCallCx, TriggerEvent};
use crate::authz::{self, AuthzRepo, Capability};
use crate::pubsub_repo::{PublishCtx, PubsubRepo, PubsubRepoError};
pub struct PubsubServiceImpl {
repo: Arc<dyn PubsubRepo>,
authz: Arc<dyn AuthzRepo>,
}
impl PubsubServiceImpl {
#[must_use]
pub fn new(repo: Arc<dyn PubsubRepo>, authz: Arc<dyn AuthzRepo>) -> Self {
Self { repo, authz }
}
async fn check_publish(&self, cx: &SdkCallCx) -> Result<(), PubsubError> {
if let Some(ref principal) = cx.principal {
authz::require(
&*self.authz,
principal,
Capability::AppPubsubPublish(cx.app_id),
)
.await
.map_err(|_| PubsubError::Forbidden)?;
}
Ok(())
}
}
impl From<PubsubRepoError> for PubsubError {
fn from(e: PubsubRepoError) -> Self {
Self::Unavailable(e.to_string())
}
}
#[async_trait]
impl PubsubService for PubsubServiceImpl {
async fn publish_durable(
&self,
cx: &SdkCallCx,
topic: &str,
message: serde_json::Value,
) -> Result<(), PubsubError> {
if topic.trim().is_empty() {
return Err(PubsubError::EmptyTopic);
}
self.check_publish(cx).await?;
// `published_at` is stamped on the manager side so every
// delivery agrees on one instant.
let event = TriggerEvent::Pubsub {
topic: topic.to_string(),
message,
published_at: chrono::Utc::now(),
};
let payload = serde_json::to_value(&event)
.map_err(|e| PubsubError::Rejected(format!("event serialize: {e}")))?;
let publish_ctx = PublishCtx {
app_id: cx.app_id,
origin_principal: cx.principal.as_ref().map(|p| p.user_id),
trigger_depth: cx.trigger_depth,
root_execution_id: cx.root_execution_id,
};
// No matching triggers → 0 rows written, publish still succeeds.
self.repo
.fan_out_publish(publish_ctx, topic, payload)
.await?;
Ok(())
}
}
// ----------------------------------------------------------------------------
// Tests — in-memory PubsubRepo so unit tests don't need Postgres. The
// real transactional fan-out is covered against a live DB by the
// integration suite; the in-memory fake models the all-or-nothing
// commit so the rollback semantics can be asserted without a DB.
// ----------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use crate::authz::{AuthzError, AuthzRepo};
use async_trait::async_trait;
use picloud_shared::{
topic_matches, AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, Principal,
RequestId, ScriptId, UserId,
};
use std::sync::Mutex;
/// In-memory pubsub repo. Holds a set of `(app, pattern)`
/// subscriptions and records the outbox rows a publish would write.
/// `fail_at` simulates a mid-fan-out INSERT failure: when set to
/// `Some(n)`, the n-th (1-indexed) matching row errors and NOTHING
/// is recorded — modelling the single-transaction rollback.
struct InMemoryPubsubRepo {
subs: Vec<(AppId, String)>,
written: Mutex<Vec<(AppId, String)>>,
fail_at: Option<usize>,
}
impl InMemoryPubsubRepo {
fn new(subs: Vec<(AppId, String)>) -> Self {
Self {
subs,
written: Mutex::new(Vec::new()),
fail_at: None,
}
}
fn written_count(&self) -> usize {
self.written.lock().unwrap().len()
}
}
#[async_trait]
impl PubsubRepo for InMemoryPubsubRepo {
async fn fan_out_publish(
&self,
ctx: PublishCtx,
topic: &str,
_event_payload: serde_json::Value,
) -> Result<u32, PubsubRepoError> {
let matches: Vec<&(AppId, String)> = self
.subs
.iter()
.filter(|(a, pat)| *a == ctx.app_id && topic_matches(pat, topic))
.collect();
let mut staged = Vec::new();
for (i, _) in matches.iter().enumerate() {
if self.fail_at == Some(i + 1) {
// Rollback: nothing was committed.
return Err(PubsubRepoError::Db(sqlx::Error::Protocol(
"simulated insert failure".into(),
)));
}
staged.push((ctx.app_id, topic.to_string()));
}
let n = staged.len();
self.written.lock().unwrap().extend(staged);
Ok(u32::try_from(n).unwrap_or(u32::MAX))
}
}
#[derive(Default)]
struct DenyingAuthzRepo;
#[async_trait]
impl AuthzRepo for DenyingAuthzRepo {
async fn membership(
&self,
_user_id: UserId,
_app_id: AppId,
) -> Result<Option<AppRole>, AuthzError> {
Ok(None)
}
}
#[derive(Default)]
struct EditorAuthzRepo;
#[async_trait]
impl AuthzRepo for EditorAuthzRepo {
async fn membership(
&self,
_user_id: UserId,
_app_id: AppId,
) -> Result<Option<AppRole>, AuthzError> {
Ok(Some(AppRole::Editor))
}
}
fn anon_cx(app_id: AppId) -> SdkCallCx {
SdkCallCx {
app_id,
script_id: ScriptId::new(),
principal: None,
execution_id: ExecutionId::new(),
request_id: RequestId::new(),
trigger_depth: 0,
root_execution_id: ExecutionId::new(),
is_dead_letter_handler: false,
event: None,
}
}
fn member_cx(app_id: AppId) -> SdkCallCx {
SdkCallCx {
principal: Some(Principal {
user_id: AdminUserId::new(),
instance_role: InstanceRole::Member,
scopes: None,
app_binding: None,
}),
..anon_cx(app_id)
}
}
fn svc(repo: Arc<dyn PubsubRepo>, authz: Arc<dyn AuthzRepo>) -> PubsubServiceImpl {
PubsubServiceImpl::new(repo, authz)
}
#[tokio::test]
async fn publish_writes_one_row_per_matching_trigger() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![
(app, "user.*".into()),
(app, "user.created".into()),
(app, "order.*".into()), // does not match
]));
let files = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
files
.publish_durable(&anon_cx(app), "user.created", serde_json::json!({"id": 1}))
.await
.unwrap();
// Two of the three subscriptions match "user.created".
assert_eq!(repo.written_count(), 2);
}
#[tokio::test]
async fn no_matching_trigger_succeeds_silently() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![(app, "order.*".into())]));
let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
svc.publish_durable(&anon_cx(app), "user.created", serde_json::json!(1))
.await
.unwrap();
assert_eq!(repo.written_count(), 0);
}
#[tokio::test]
async fn empty_topic_rejected() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
let svc = svc(repo, Arc::new(DenyingAuthzRepo));
let err = svc
.publish_durable(&anon_cx(app), " ", serde_json::json!(1))
.await
.unwrap_err();
assert!(matches!(err, PubsubError::EmptyTopic));
}
#[tokio::test]
async fn cross_app_isolation() {
let app_a = AppId::new();
let app_b = AppId::new();
// The only subscription belongs to app B.
let repo = Arc::new(InMemoryPubsubRepo::new(vec![(app_b, "*".into())]));
let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
// App A publishes — app B's trigger must NOT fire.
svc.publish_durable(&anon_cx(app_a), "user.created", serde_json::json!(1))
.await
.unwrap();
assert_eq!(repo.written_count(), 0);
}
#[tokio::test]
async fn fan_out_is_transactional_all_or_nothing() {
let app = AppId::new();
let mut repo = InMemoryPubsubRepo::new(vec![
(app, "*".into()),
(app, "user.*".into()),
(app, "user.created".into()),
]);
repo.fail_at = Some(3); // fail on the 3rd matching insert
let repo = Arc::new(repo);
let svc = svc(repo.clone(), Arc::new(DenyingAuthzRepo));
let err = svc
.publish_durable(&anon_cx(app), "user.created", serde_json::json!(1))
.await
.unwrap_err();
assert!(matches!(err, PubsubError::Unavailable(_)));
// Rollback: no partial fan-out survived.
assert_eq!(repo.written_count(), 0);
}
#[tokio::test]
async fn anonymous_cx_skips_authz() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
let svc = svc(repo, Arc::new(DenyingAuthzRepo));
// No principal → no authz check even with a denying repo.
svc.publish_durable(&anon_cx(app), "t", serde_json::json!(1))
.await
.unwrap();
}
#[tokio::test]
async fn member_without_role_is_forbidden() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
let svc = svc(repo, Arc::new(DenyingAuthzRepo));
let err = svc
.publish_durable(&member_cx(app), "t", serde_json::json!(1))
.await
.unwrap_err();
assert!(matches!(err, PubsubError::Forbidden));
}
#[tokio::test]
async fn member_with_editor_role_allowed() {
let app = AppId::new();
let repo = Arc::new(InMemoryPubsubRepo::new(vec![]));
let svc = svc(repo, Arc::new(EditorAuthzRepo));
svc.publish_durable(&member_cx(app), "t", serde_json::json!(1))
.await
.unwrap();
}
}

View File

@@ -5,7 +5,9 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use picloud_shared::{AdminUserId, AppId, DocsEventOp, KvEventOp, ScriptId, TriggerId};
use picloud_shared::{
AdminUserId, AppId, DocsEventOp, FilesEventOp, KvEventOp, ScriptId, TriggerId,
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
@@ -51,6 +53,10 @@ pub enum TriggerKind {
DeadLetter,
/// v1.1.4.
Cron,
/// v1.1.5.
Files,
/// v1.1.5.
Pubsub,
}
impl TriggerKind {
@@ -61,6 +67,8 @@ impl TriggerKind {
Self::Docs => "docs",
Self::DeadLetter => "dead_letter",
Self::Cron => "cron",
Self::Files => "files",
Self::Pubsub => "pubsub",
}
}
@@ -71,6 +79,8 @@ impl TriggerKind {
"docs" => Some(Self::Docs),
"dead_letter" => Some(Self::DeadLetter),
"cron" => Some(Self::Cron),
"files" => Some(Self::Files),
"pubsub" => Some(Self::Pubsub),
_ => None,
}
}
@@ -120,6 +130,13 @@ pub enum TriggerDetails {
#[serde(default, skip_serializing_if = "Option::is_none")]
last_fired_at: Option<DateTime<Utc>>,
},
/// v1.1.5. Same shape as KV/docs: a collection glob + op subset.
Files {
collection_glob: String,
ops: Vec<FilesEventOp>,
},
/// v1.1.5. A topic pattern: exact, `<prefix>.*`, or `*`.
Pubsub { topic_pattern: String },
}
/// Create payload for a KV trigger. Defaults applied at the admin
@@ -175,6 +192,46 @@ pub struct CreateCronTrigger {
pub registered_by_principal: AdminUserId,
}
/// Create payload for a files trigger (v1.1.5). Same shape as KV with
/// `FilesEventOp` ops.
#[derive(Debug, Clone)]
pub struct CreateFilesTrigger {
pub script_id: ScriptId,
pub collection_glob: String,
pub ops: Vec<FilesEventOp>,
pub dispatch_mode: TriggerDispatchMode,
pub retry_max_attempts: u32,
pub retry_backoff: BackoffShape,
pub retry_base_ms: u32,
pub registered_by_principal: AdminUserId,
}
/// One match for the dispatcher's files trigger fan-out lookup
/// (v1.1.5). Same shape as `KvTriggerMatch`.
#[derive(Debug, Clone)]
pub struct FilesTriggerMatch {
pub trigger_id: TriggerId,
pub script_id: ScriptId,
pub dispatch_mode: TriggerDispatchMode,
pub retry_max_attempts: u32,
pub retry_backoff: BackoffShape,
pub retry_base_ms: u32,
pub registered_by_principal: AdminUserId,
}
/// Create payload for a pubsub trigger (v1.1.5). `topic_pattern` is
/// validated (exact / `<prefix>.*` / `*`) before insert.
#[derive(Debug, Clone)]
pub struct CreatePubsubTrigger {
pub script_id: ScriptId,
pub topic_pattern: String,
pub dispatch_mode: TriggerDispatchMode,
pub retry_max_attempts: u32,
pub retry_backoff: BackoffShape,
pub retry_base_ms: u32,
pub registered_by_principal: AdminUserId,
}
/// One match for the dispatcher's "which KV triggers fire on this
/// event" lookup. Carries everything the dispatcher needs to construct
/// the outbox row.
@@ -242,6 +299,20 @@ pub trait TriggerRepo: Send + Sync {
req: CreateCronTrigger,
) -> Result<Trigger, TriggerRepoError>;
/// v1.1.5.
async fn create_files_trigger(
&self,
app_id: AppId,
req: CreateFilesTrigger,
) -> Result<Trigger, TriggerRepoError>;
/// v1.1.5. `topic_pattern` is validated before insert.
async fn create_pubsub_trigger(
&self,
app_id: AppId,
req: CreatePubsubTrigger,
) -> Result<Trigger, TriggerRepoError>;
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError>;
async fn get(&self, id: TriggerId) -> Result<Option<Trigger>, TriggerRepoError>;
@@ -269,6 +340,16 @@ pub trait TriggerRepo: Send + Sync {
op: DocsEventOp,
) -> Result<Vec<DocsTriggerMatch>, TriggerRepoError>;
/// Dispatcher hot path for files fan-out (v1.1.5). Mirrors the KV
/// fan-out logic: pull every enabled files trigger, filter glob +
/// ops in Rust (empty ops array means "any op").
async fn list_matching_files(
&self,
app_id: AppId,
collection: &str,
op: FilesEventOp,
) -> Result<Vec<FilesTriggerMatch>, TriggerRepoError>;
/// Dispatcher hot path for dead-letter fan-out. Filters: source
/// (or any-source), originating trigger_id (or any), originating
/// script_id (or any). Each filter is "match OR is_null".
@@ -555,6 +636,131 @@ impl TriggerRepo for PostgresTriggerRepo {
})
}
async fn create_files_trigger(
&self,
app_id: AppId,
req: CreateFilesTrigger,
) -> Result<Trigger, TriggerRepoError> {
if req.collection_glob.is_empty() {
return Err(TriggerRepoError::Invalid(
"collection_glob must not be empty".into(),
));
}
let mut tx = self.pool.begin().await?;
let parent: TriggerRow = sqlx::query_as(
"INSERT INTO triggers ( \
app_id, script_id, kind, enabled, dispatch_mode, \
retry_max_attempts, retry_backoff, retry_base_ms, \
registered_by_principal \
) VALUES ($1, $2, 'files', TRUE, $3, $4, $5, $6, $7) \
RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \
retry_max_attempts, retry_backoff, retry_base_ms, \
registered_by_principal, created_at, updated_at",
)
.bind(app_id.into_inner())
.bind(req.script_id.into_inner())
.bind(req.dispatch_mode.as_str())
.bind(i32::try_from(req.retry_max_attempts).unwrap_or(3))
.bind(req.retry_backoff.as_str())
.bind(i32::try_from(req.retry_base_ms).unwrap_or(1000))
.bind(req.registered_by_principal.into_inner())
.fetch_one(&mut *tx)
.await?;
let ops_str: Vec<String> = req.ops.iter().map(|o| o.as_str().to_string()).collect();
sqlx::query(
"INSERT INTO files_trigger_details (trigger_id, collection_glob, ops) \
VALUES ($1, $2, $3)",
)
.bind(parent.id)
.bind(&req.collection_glob)
.bind(&ops_str)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(Trigger {
id: parent.id.into(),
app_id: parent.app_id.into(),
script_id: parent.script_id.into(),
kind: TriggerKind::Files,
enabled: parent.enabled,
dispatch_mode: dispatch_from_str(&parent.dispatch_mode),
retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3),
retry_backoff: BackoffShape::from_wire(&parent.retry_backoff)
.unwrap_or(BackoffShape::Exponential),
retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000),
registered_by_principal: parent.registered_by_principal.into(),
created_at: parent.created_at,
updated_at: parent.updated_at,
details: TriggerDetails::Files {
collection_glob: req.collection_glob,
ops: req.ops,
},
})
}
async fn create_pubsub_trigger(
&self,
app_id: AppId,
req: CreatePubsubTrigger,
) -> Result<Trigger, TriggerRepoError> {
// Defense-in-depth validation (the admin endpoint validates too).
picloud_shared::validate_topic_pattern(&req.topic_pattern)
.map_err(TriggerRepoError::Invalid)?;
let mut tx = self.pool.begin().await?;
let parent: TriggerRow = sqlx::query_as(
"INSERT INTO triggers ( \
app_id, script_id, kind, enabled, dispatch_mode, \
retry_max_attempts, retry_backoff, retry_base_ms, \
registered_by_principal \
) VALUES ($1, $2, 'pubsub', TRUE, $3, $4, $5, $6, $7) \
RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \
retry_max_attempts, retry_backoff, retry_base_ms, \
registered_by_principal, created_at, updated_at",
)
.bind(app_id.into_inner())
.bind(req.script_id.into_inner())
.bind(req.dispatch_mode.as_str())
.bind(i32::try_from(req.retry_max_attempts).unwrap_or(3))
.bind(req.retry_backoff.as_str())
.bind(i32::try_from(req.retry_base_ms).unwrap_or(1000))
.bind(req.registered_by_principal.into_inner())
.fetch_one(&mut *tx)
.await?;
sqlx::query(
"INSERT INTO pubsub_trigger_details (trigger_id, topic_pattern) VALUES ($1, $2)",
)
.bind(parent.id)
.bind(&req.topic_pattern)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(Trigger {
id: parent.id.into(),
app_id: parent.app_id.into(),
script_id: parent.script_id.into(),
kind: TriggerKind::Pubsub,
enabled: parent.enabled,
dispatch_mode: dispatch_from_str(&parent.dispatch_mode),
retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3),
retry_backoff: BackoffShape::from_wire(&parent.retry_backoff)
.unwrap_or(BackoffShape::Exponential),
retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000),
registered_by_principal: parent.registered_by_principal.into(),
created_at: parent.created_at,
updated_at: parent.updated_at,
details: TriggerDetails::Pubsub {
topic_pattern: req.topic_pattern,
},
})
}
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError> {
let parents: Vec<TriggerRow> = sqlx::query_as(
"SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \
@@ -693,6 +899,51 @@ impl TriggerRepo for PostgresTriggerRepo {
Ok(out)
}
async fn list_matching_files(
&self,
app_id: AppId,
collection: &str,
op: FilesEventOp,
) -> Result<Vec<FilesTriggerMatch>, TriggerRepoError> {
// Mirrors list_matching_kv: pull every enabled files trigger,
// filter glob + ops in Rust (empty ops array means "any op").
let rows: Vec<KvMatchRow> = sqlx::query_as(
"SELECT t.id, t.script_id, t.dispatch_mode, \
t.retry_max_attempts, t.retry_backoff, t.retry_base_ms, \
t.registered_by_principal, \
d.collection_glob, d.ops \
FROM triggers t \
JOIN files_trigger_details d ON d.trigger_id = t.id \
WHERE t.app_id = $1 AND t.kind = 'files' AND t.enabled = TRUE",
)
.bind(app_id.into_inner())
.fetch_all(&self.pool)
.await?;
let op_str = op.as_str();
let mut out = Vec::new();
for r in rows {
if !collection_matches(&r.collection_glob, collection) {
continue;
}
let any_op = r.ops.is_empty();
if !any_op && !r.ops.iter().any(|o| o == op_str) {
continue;
}
out.push(FilesTriggerMatch {
trigger_id: r.id.into(),
script_id: r.script_id.into(),
dispatch_mode: dispatch_from_str(&r.dispatch_mode),
retry_max_attempts: u32::try_from(r.retry_max_attempts).unwrap_or(3),
retry_backoff: BackoffShape::from_wire(&r.retry_backoff)
.unwrap_or(BackoffShape::Exponential),
retry_base_ms: u32::try_from(r.retry_base_ms).unwrap_or(1000),
registered_by_principal: r.registered_by_principal.into(),
});
}
Ok(out)
}
async fn list_matching_dead_letter(
&self,
app_id: AppId,
@@ -729,6 +980,7 @@ impl TriggerRepo for PostgresTriggerRepo {
}
}
#[allow(clippy::too_many_lines)]
async fn hydrate_one(pool: &PgPool, parent: TriggerRow) -> Result<Trigger, TriggerRepoError> {
let kind = TriggerKind::from_wire(&parent.kind).ok_or_else(|| {
TriggerRepoError::Invalid(format!("unknown trigger kind {}", parent.kind))
@@ -797,6 +1049,34 @@ async fn hydrate_one(pool: &PgPool, parent: TriggerRow) -> Result<Trigger, Trigg
last_fired_at: row.last_fired_at,
}
}
TriggerKind::Files => {
let row: KvDetailRow = sqlx::query_as(
"SELECT collection_glob, ops FROM files_trigger_details WHERE trigger_id = $1",
)
.bind(parent.id)
.fetch_one(pool)
.await?;
let ops = row
.ops
.iter()
.filter_map(|s| FilesEventOp::from_wire(s))
.collect();
TriggerDetails::Files {
collection_glob: row.collection_glob,
ops,
}
}
TriggerKind::Pubsub => {
let row: PubsubDetailRow = sqlx::query_as(
"SELECT topic_pattern FROM pubsub_trigger_details WHERE trigger_id = $1",
)
.bind(parent.id)
.fetch_one(pool)
.await?;
TriggerDetails::Pubsub {
topic_pattern: row.topic_pattern,
}
}
};
Ok(Trigger {
@@ -869,6 +1149,11 @@ struct CronDetailRow {
last_fired_at: Option<DateTime<Utc>>,
}
#[derive(sqlx::FromRow)]
struct PubsubDetailRow {
topic_pattern: String,
}
#[derive(sqlx::FromRow)]
#[allow(clippy::struct_field_names)]
struct DlDetailRow {

View File

@@ -16,7 +16,9 @@ use axum::http::StatusCode;
use axum::response::{IntoResponse, Json, Response};
use axum::routing::{delete, get, post};
use axum::{Extension, Router};
use picloud_shared::{AppId, DocsEventOp, KvEventOp, Principal, ScriptId, ScriptKind, TriggerId};
use picloud_shared::{
AppId, DocsEventOp, FilesEventOp, KvEventOp, Principal, ScriptId, ScriptKind, TriggerId,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
@@ -25,8 +27,9 @@ use crate::authz::{require, AuthzDenied, AuthzError, AuthzRepo, Capability};
use crate::repo::{ScriptRepository, ScriptRepositoryError};
use crate::trigger_config::{BackoffShape, TriggerConfig};
use crate::trigger_repo::{
CreateCronTrigger, CreateDeadLetterTrigger, CreateDocsTrigger, CreateKvTrigger, Trigger,
TriggerDispatchMode, TriggerRepo, TriggerRepoError,
CreateCronTrigger, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger,
CreateKvTrigger, CreatePubsubTrigger, Trigger, TriggerDispatchMode, TriggerRepo,
TriggerRepoError,
};
#[derive(Clone)]
@@ -54,6 +57,11 @@ pub fn triggers_router(state: TriggersState) -> Router {
.route("/apps/{app_id}/triggers/kv", post(create_kv_trigger))
.route("/apps/{app_id}/triggers/docs", post(create_docs_trigger))
.route("/apps/{app_id}/triggers/cron", post(create_cron_trigger))
.route("/apps/{app_id}/triggers/files", post(create_files_trigger))
.route(
"/apps/{app_id}/triggers/pubsub",
post(create_pubsub_trigger),
)
.route(
"/apps/{app_id}/triggers/dead_letter",
post(create_dl_trigger),
@@ -139,6 +147,24 @@ fn default_timezone() -> String {
"UTC".to_string()
}
/// v1.1.5 files trigger. Mirrors `CreateKvTriggerRequest`; `ops` uses
/// `FilesEventOp` (`create` / `update` / `delete`).
#[derive(Debug, Deserialize)]
pub struct CreateFilesTriggerRequest {
pub script_id: ScriptId,
pub collection_glob: String,
#[serde(default)]
pub ops: Vec<FilesEventOp>,
#[serde(default = "default_dispatch")]
pub dispatch_mode: TriggerDispatchMode,
#[serde(default)]
pub retry_max_attempts: Option<u32>,
#[serde(default)]
pub retry_backoff: Option<BackoffShape>,
#[serde(default)]
pub retry_base_ms: Option<u32>,
}
#[derive(Debug, Deserialize)]
pub struct CreateDeadLetterTriggerRequest {
pub script_id: ScriptId,
@@ -328,6 +354,94 @@ async fn create_cron_trigger(
Ok((StatusCode::CREATED, Json(created)))
}
/// v1.1.5 pubsub trigger. `topic_pattern` is validated to be exact /
/// `<prefix>.*` / `*`.
#[derive(Debug, Deserialize)]
pub struct CreatePubsubTriggerRequest {
pub script_id: ScriptId,
pub topic_pattern: String,
#[serde(default = "default_dispatch")]
pub dispatch_mode: TriggerDispatchMode,
#[serde(default)]
pub retry_max_attempts: Option<u32>,
#[serde(default)]
pub retry_backoff: Option<BackoffShape>,
#[serde(default)]
pub retry_base_ms: Option<u32>,
}
async fn create_pubsub_trigger(
State(s): State<TriggersState>,
Extension(principal): Extension<Principal>,
Path(app_id): Path<AppId>,
Json(input): Json<CreatePubsubTriggerRequest>,
) -> Result<(StatusCode, Json<Trigger>), TriggersApiError> {
ensure_app_exists(&*s.apps, app_id).await?;
require(
s.authz.as_ref(),
&principal,
Capability::AppManageTriggers(app_id),
)
.await?;
// Validate the topic pattern before touching the script repo so a
// bad pattern fails fast with a clear 422.
picloud_shared::validate_topic_pattern(&input.topic_pattern)
.map_err(TriggersApiError::Invalid)?;
validate_trigger_target(&*s.scripts, app_id, input.script_id).await?;
let req = CreatePubsubTrigger {
script_id: input.script_id,
topic_pattern: input.topic_pattern,
dispatch_mode: input.dispatch_mode,
retry_max_attempts: input
.retry_max_attempts
.unwrap_or(s.config.retry_max_attempts),
retry_backoff: input.retry_backoff.unwrap_or(s.config.retry_backoff),
retry_base_ms: input.retry_base_ms.unwrap_or(s.config.retry_base_ms),
registered_by_principal: principal.user_id,
};
let created = s.triggers.create_pubsub_trigger(app_id, req).await?;
Ok((StatusCode::CREATED, Json(created)))
}
async fn create_files_trigger(
State(s): State<TriggersState>,
Extension(principal): Extension<Principal>,
Path(app_id): Path<AppId>,
Json(input): Json<CreateFilesTriggerRequest>,
) -> Result<(StatusCode, Json<Trigger>), TriggersApiError> {
ensure_app_exists(&*s.apps, app_id).await?;
require(
s.authz.as_ref(),
&principal,
Capability::AppManageTriggers(app_id),
)
.await?;
if input.collection_glob.trim().is_empty() {
return Err(TriggersApiError::Invalid(
"collection_glob must not be empty".into(),
));
}
validate_trigger_target(&*s.scripts, app_id, input.script_id).await?;
let req = CreateFilesTrigger {
script_id: input.script_id,
collection_glob: input.collection_glob,
ops: input.ops,
dispatch_mode: input.dispatch_mode,
retry_max_attempts: input
.retry_max_attempts
.unwrap_or(s.config.retry_max_attempts),
retry_backoff: input.retry_backoff.unwrap_or(s.config.retry_backoff),
retry_base_ms: input.retry_base_ms.unwrap_or(s.config.retry_base_ms),
registered_by_principal: principal.user_id,
};
let created = s.triggers.create_files_trigger(app_id, req).await?;
Ok((StatusCode::CREATED, Json(created)))
}
async fn create_dl_trigger(
State(s): State<TriggersState>,
Extension(principal): Extension<Principal>,
@@ -484,13 +598,15 @@ mod tests {
use super::*;
use crate::app_repo::{AppLookup, AppRepository};
use crate::trigger_repo::{
CreateCronTrigger, DeadLetterTriggerMatch, DocsTriggerMatch, KvTriggerMatch, Trigger,
TriggerDetails, TriggerRepo, TriggerRepoError,
CreateCronTrigger, CreateFilesTrigger, CreatePubsubTrigger, DeadLetterTriggerMatch,
DocsTriggerMatch, FilesTriggerMatch, KvTriggerMatch, Trigger, TriggerDetails, TriggerRepo,
TriggerRepoError,
};
use async_trait::async_trait;
use chrono::Utc;
use picloud_shared::{
AdminUserId, App, AppRole, DocsEventOp, KvEventOp, ScriptId, TriggerId, UserId,
AdminUserId, App, AppRole, DocsEventOp, FilesEventOp, KvEventOp, ScriptId, TriggerId,
UserId,
};
use std::collections::HashMap;
use tokio::sync::Mutex;
@@ -616,6 +732,61 @@ mod tests {
self.inner.lock().await.insert(id, trigger.clone());
Ok(trigger)
}
async fn create_files_trigger(
&self,
app_id: AppId,
req: CreateFilesTrigger,
) -> Result<Trigger, TriggerRepoError> {
let now = Utc::now();
let id = TriggerId::new();
let trigger = Trigger {
id,
app_id,
script_id: req.script_id,
kind: crate::trigger_repo::TriggerKind::Files,
enabled: true,
dispatch_mode: req.dispatch_mode,
retry_max_attempts: req.retry_max_attempts,
retry_backoff: req.retry_backoff,
retry_base_ms: req.retry_base_ms,
registered_by_principal: req.registered_by_principal,
created_at: now,
updated_at: now,
details: TriggerDetails::Files {
collection_glob: req.collection_glob,
ops: req.ops,
},
};
self.inner.lock().await.insert(id, trigger.clone());
Ok(trigger)
}
async fn create_pubsub_trigger(
&self,
app_id: AppId,
req: CreatePubsubTrigger,
) -> Result<Trigger, TriggerRepoError> {
let now = Utc::now();
let id = TriggerId::new();
let trigger = Trigger {
id,
app_id,
script_id: req.script_id,
kind: crate::trigger_repo::TriggerKind::Pubsub,
enabled: true,
dispatch_mode: req.dispatch_mode,
retry_max_attempts: req.retry_max_attempts,
retry_backoff: req.retry_backoff,
retry_base_ms: req.retry_base_ms,
registered_by_principal: req.registered_by_principal,
created_at: now,
updated_at: now,
details: TriggerDetails::Pubsub {
topic_pattern: req.topic_pattern,
},
};
self.inner.lock().await.insert(id, trigger.clone());
Ok(trigger)
}
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError> {
Ok(self
.inner
@@ -648,6 +819,14 @@ mod tests {
) -> Result<Vec<DocsTriggerMatch>, TriggerRepoError> {
Ok(vec![])
}
async fn list_matching_files(
&self,
_app_id: AppId,
_collection: &str,
_op: FilesEventOp,
) -> Result<Vec<FilesTriggerMatch>, TriggerRepoError> {
Ok(vec![])
}
async fn list_matching_dead_letter(
&self,
_app_id: AppId,
@@ -1560,4 +1739,258 @@ mod tests {
.expect("endpoint target should succeed");
assert_eq!(status, StatusCode::CREATED);
}
// ----------------------------------------------------------------
// v1.1.5: files + pubsub trigger create (Layout-E reject coverage).
// ----------------------------------------------------------------
fn files_req(script_id: ScriptId, glob: &str) -> CreateFilesTriggerRequest {
CreateFilesTriggerRequest {
script_id,
collection_glob: glob.into(),
ops: vec![FilesEventOp::Create],
dispatch_mode: TriggerDispatchMode::Async,
retry_max_attempts: None,
retry_backoff: None,
retry_base_ms: None,
}
}
#[tokio::test]
async fn files_trigger_create_succeeds() {
let app_id = AppId::new();
let script_id = ScriptId::new();
let state = state_with_endpoint(Arc::new(AlwaysAllowAuthzRepo), app_id, script_id);
let (status, Json(trigger)) = create_files_trigger(
State(state),
Extension(member_principal()),
Path(app_id),
Json(files_req(script_id, "avatars")),
)
.await
.unwrap();
assert_eq!(status, StatusCode::CREATED);
assert!(matches!(
trigger.kind,
crate::trigger_repo::TriggerKind::Files
));
match trigger.details {
TriggerDetails::Files {
collection_glob,
ops,
} => {
assert_eq!(collection_glob, "avatars");
assert_eq!(ops, vec![FilesEventOp::Create]);
}
other => panic!("expected Files details, got {other:?}"),
}
}
#[tokio::test]
async fn files_trigger_empty_glob_rejected() {
let app_id = AppId::new();
let state = state_with(Arc::new(AlwaysAllowAuthzRepo), app_id);
let res = create_files_trigger(
State(state),
Extension(member_principal()),
Path(app_id),
Json(files_req(ScriptId::new(), " ")),
)
.await;
assert!(matches!(
res.expect_err("empty glob"),
TriggersApiError::Invalid(_)
));
}
#[tokio::test]
async fn files_trigger_rejects_module_target() {
let app_id = AppId::new();
let script_id = ScriptId::new();
let state = TriggersState {
triggers: Arc::new(InMemoryTriggerRepo::default()),
apps: InMemoryAppRepo::with(app_id),
authz: Arc::new(AlwaysAllowAuthzRepo),
scripts: InMemoryScriptRepo::with_module(app_id, script_id),
config: TriggerConfig::conservative(),
};
let res = create_files_trigger(
State(state),
Extension(member_principal()),
Path(app_id),
Json(files_req(script_id, "avatars")),
)
.await;
let msg = match res.expect_err("module rejected") {
TriggersApiError::Invalid(m) => m,
other => panic!("expected Invalid, got {other:?}"),
};
assert!(msg.to_lowercase().contains("module"));
}
#[tokio::test]
async fn files_trigger_rejects_cross_app_script() {
let app_a = AppId::new();
let app_b = AppId::new();
let script_id = ScriptId::new();
let state = TriggersState {
triggers: Arc::new(InMemoryTriggerRepo::default()),
apps: InMemoryAppRepo::with(app_a),
authz: Arc::new(AlwaysAllowAuthzRepo),
scripts: InMemoryScriptRepo::with_endpoint(app_b, script_id),
config: TriggerConfig::conservative(),
};
let res = create_files_trigger(
State(state),
Extension(member_principal()),
Path(app_a),
Json(files_req(script_id, "avatars")),
)
.await;
let msg = match res.expect_err("cross-app rejected") {
TriggersApiError::Invalid(m) => m,
other => panic!("expected Invalid, got {other:?}"),
};
assert!(msg.to_lowercase().contains("does not belong"));
}
#[tokio::test]
async fn files_trigger_member_without_role_is_forbidden() {
let app_id = AppId::new();
let state = state_with(Arc::new(AlwaysDenyAuthzRepo), app_id);
let res = create_files_trigger(
State(state),
Extension(member_principal()),
Path(app_id),
Json(files_req(ScriptId::new(), "avatars")),
)
.await;
assert!(matches!(
res.expect_err("forbidden"),
TriggersApiError::Forbidden
));
}
fn pubsub_req(script_id: ScriptId, pattern: &str) -> CreatePubsubTriggerRequest {
CreatePubsubTriggerRequest {
script_id,
topic_pattern: pattern.into(),
dispatch_mode: TriggerDispatchMode::Async,
retry_max_attempts: None,
retry_backoff: None,
retry_base_ms: None,
}
}
#[tokio::test]
async fn pubsub_trigger_create_succeeds() {
let app_id = AppId::new();
let script_id = ScriptId::new();
let state = state_with_endpoint(Arc::new(AlwaysAllowAuthzRepo), app_id, script_id);
let (status, Json(trigger)) = create_pubsub_trigger(
State(state),
Extension(member_principal()),
Path(app_id),
Json(pubsub_req(script_id, "user.*")),
)
.await
.unwrap();
assert_eq!(status, StatusCode::CREATED);
match trigger.details {
TriggerDetails::Pubsub { topic_pattern } => assert_eq!(topic_pattern, "user.*"),
other => panic!("expected Pubsub details, got {other:?}"),
}
}
#[tokio::test]
async fn pubsub_trigger_rejects_bad_pattern() {
let app_id = AppId::new();
let script_id = ScriptId::new();
let state = state_with_endpoint(Arc::new(AlwaysAllowAuthzRepo), app_id, script_id);
for bad in ["*.created", "a.*.b", "**"] {
let res = create_pubsub_trigger(
State(state.clone()),
Extension(member_principal()),
Path(app_id),
Json(pubsub_req(script_id, bad)),
)
.await;
let msg = match res.expect_err("bad pattern") {
TriggersApiError::Invalid(m) => m,
other => panic!("expected Invalid, got {other:?}"),
};
assert!(
msg.contains("unsupported pubsub topic pattern"),
"got {msg} for {bad}"
);
}
}
#[tokio::test]
async fn pubsub_trigger_rejects_module_target() {
let app_id = AppId::new();
let script_id = ScriptId::new();
let state = TriggersState {
triggers: Arc::new(InMemoryTriggerRepo::default()),
apps: InMemoryAppRepo::with(app_id),
authz: Arc::new(AlwaysAllowAuthzRepo),
scripts: InMemoryScriptRepo::with_module(app_id, script_id),
config: TriggerConfig::conservative(),
};
let res = create_pubsub_trigger(
State(state),
Extension(member_principal()),
Path(app_id),
Json(pubsub_req(script_id, "user.*")),
)
.await;
let msg = match res.expect_err("module rejected") {
TriggersApiError::Invalid(m) => m,
other => panic!("expected Invalid, got {other:?}"),
};
assert!(msg.to_lowercase().contains("module"));
}
#[tokio::test]
async fn pubsub_trigger_rejects_cross_app_script() {
let app_a = AppId::new();
let app_b = AppId::new();
let script_id = ScriptId::new();
let state = TriggersState {
triggers: Arc::new(InMemoryTriggerRepo::default()),
apps: InMemoryAppRepo::with(app_a),
authz: Arc::new(AlwaysAllowAuthzRepo),
scripts: InMemoryScriptRepo::with_endpoint(app_b, script_id),
config: TriggerConfig::conservative(),
};
let res = create_pubsub_trigger(
State(state),
Extension(member_principal()),
Path(app_a),
Json(pubsub_req(script_id, "user.*")),
)
.await;
let msg = match res.expect_err("cross-app rejected") {
TriggersApiError::Invalid(m) => m,
other => panic!("expected Invalid, got {other:?}"),
};
assert!(msg.to_lowercase().contains("does not belong"));
}
#[tokio::test]
async fn pubsub_trigger_member_without_role_is_forbidden() {
let app_id = AppId::new();
let state = state_with(Arc::new(AlwaysDenyAuthzRepo), app_id);
let res = create_pubsub_trigger(
State(state),
Extension(member_principal()),
Path(app_id),
Json(pubsub_req(ScriptId::new(), "user.*")),
)
.await;
assert!(matches!(
res.expect_err("forbidden"),
TriggersApiError::Forbidden
));
}
}

View File

@@ -128,6 +128,22 @@ table: execution_logs
created_at: timestamp with time zone NOT NULL default=now()
app_id: uuid NOT NULL
table: files
app_id: uuid NOT NULL
collection: text NOT NULL
id: uuid NOT NULL
name: text NOT NULL
content_type: text NOT NULL
size_bytes: bigint NOT NULL
checksum_sha256: text NOT NULL
created_at: timestamp with time zone NOT NULL default=now()
updated_at: timestamp with time zone NOT NULL default=now()
table: files_trigger_details
trigger_id: uuid NOT NULL
collection_glob: text NOT NULL
ops: ARRAY NOT NULL
table: kv_entries
app_id: uuid NOT NULL
collection: text NOT NULL
@@ -158,6 +174,10 @@ table: outbox
claimed_by: text NULL
created_at: timestamp with time zone NOT NULL default=now()
table: pubsub_trigger_details
trigger_id: uuid NOT NULL
topic_pattern: text NOT NULL
table: routes
id: uuid NOT NULL default=gen_random_uuid()
script_id: uuid NOT NULL
@@ -268,6 +288,13 @@ indexes on execution_logs:
execution_logs_pkey: public.execution_logs USING btree (id)
execution_logs_script_id_created_at_idx: public.execution_logs USING btree (script_id, created_at DESC)
indexes on files:
files_pkey: public.files USING btree (app_id, collection, id)
idx_files_app_collection: public.files USING btree (app_id, collection)
indexes on files_trigger_details:
files_trigger_details_pkey: public.files_trigger_details USING btree (trigger_id)
indexes on kv_entries:
idx_kv_entries_app_collection: public.kv_entries USING btree (app_id, collection)
kv_entries_pkey: public.kv_entries USING btree (app_id, collection, key)
@@ -280,6 +307,9 @@ indexes on outbox:
idx_outbox_due: public.outbox USING btree (next_attempt_at) WHERE (claimed_at IS NULL)
outbox_pkey: public.outbox USING btree (id)
indexes on pubsub_trigger_details:
pubsub_trigger_details_pkey: public.pubsub_trigger_details USING btree (trigger_id)
indexes on routes:
routes_app_id_idx: public.routes USING btree (app_id)
routes_lookup_idx: public.routes USING btree (host_kind, host)
@@ -300,6 +330,7 @@ indexes on scripts:
indexes on triggers:
idx_triggers_app_kind_enabled: public.triggers USING btree (app_id, kind) WHERE (enabled = true)
idx_triggers_app_pubsub_enabled: public.triggers USING btree (app_id, kind) WHERE ((enabled = true) AND (kind = 'pubsub'::text))
triggers_pkey: public.triggers USING btree (id)
## constraints
@@ -370,6 +401,14 @@ constraints on execution_logs:
[FOREIGN KEY] execution_logs_script_id_fkey: FOREIGN KEY (script_id) REFERENCES scripts(id) ON DELETE CASCADE
[PRIMARY KEY] execution_logs_pkey: PRIMARY KEY (id)
constraints on files:
[FOREIGN KEY] files_app_id_fkey: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
[PRIMARY KEY] files_pkey: PRIMARY KEY (app_id, collection, id)
constraints on files_trigger_details:
[FOREIGN KEY] files_trigger_details_trigger_id_fkey: FOREIGN KEY (trigger_id) REFERENCES triggers(id) ON DELETE CASCADE
[PRIMARY KEY] files_trigger_details_pkey: PRIMARY KEY (trigger_id)
constraints on kv_entries:
[FOREIGN KEY] kv_entries_app_id_fkey: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
[PRIMARY KEY] kv_entries_pkey: PRIMARY KEY (app_id, collection, key)
@@ -379,10 +418,14 @@ constraints on kv_trigger_details:
[PRIMARY KEY] kv_trigger_details_pkey: PRIMARY KEY (trigger_id)
constraints on outbox:
[CHECK] outbox_source_kind_check: CHECK ((source_kind = ANY (ARRAY['http'::text, 'kv'::text, 'dead_letter'::text, 'docs'::text, 'cron'::text])))
[CHECK] outbox_source_kind_check: CHECK ((source_kind = ANY (ARRAY['http'::text, 'kv'::text, 'dead_letter'::text, 'docs'::text, 'cron'::text, 'files'::text, 'pubsub'::text])))
[FOREIGN KEY] outbox_app_id_fkey: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
[PRIMARY KEY] outbox_pkey: PRIMARY KEY (id)
constraints on pubsub_trigger_details:
[FOREIGN KEY] pubsub_trigger_details_trigger_id_fkey: FOREIGN KEY (trigger_id) REFERENCES triggers(id) ON DELETE CASCADE
[PRIMARY KEY] pubsub_trigger_details_pkey: PRIMARY KEY (trigger_id)
constraints on routes:
[CHECK] routes_dispatch_mode_check: CHECK ((dispatch_mode = ANY (ARRAY['sync'::text, 'async'::text])))
[CHECK] routes_host_kind_check: CHECK ((host_kind = ANY (ARRAY['any'::text, 'strict'::text, 'wildcard'::text])))
@@ -407,7 +450,7 @@ constraints on scripts:
constraints on triggers:
[CHECK] triggers_dispatch_mode_check: CHECK ((dispatch_mode = ANY (ARRAY['sync'::text, 'async'::text])))
[CHECK] triggers_kind_check: CHECK ((kind = ANY (ARRAY['kv'::text, 'dead_letter'::text, 'docs'::text, 'cron'::text])))
[CHECK] triggers_kind_check: CHECK ((kind = ANY (ARRAY['kv'::text, 'dead_letter'::text, 'docs'::text, 'cron'::text, 'files'::text, 'pubsub'::text])))
[CHECK] triggers_retry_backoff_check: CHECK ((retry_backoff = ANY (ARRAY['exponential'::text, 'linear'::text, 'constant'::text])))
[FOREIGN KEY] triggers_app_id_fkey: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
[FOREIGN KEY] triggers_registered_by_principal_fkey: FOREIGN KEY (registered_by_principal) REFERENCES admin_users(id) ON DELETE CASCADE
@@ -432,3 +475,6 @@ constraints on triggers:
0015: scripts kind
0016: script imports
0017: cron triggers
0018: files
0019: files triggers
0020: pubsub triggers

View File

@@ -25,22 +25,46 @@
//!
//! Review the resulting diff in the same PR as the new migration.
//!
//! Like the orchestrator integration tests, this is `#[ignore]`'d by
//! default so plain `cargo test --workspace` stays green without
//! infrastructure.
//! v1.1.5: this test is no longer `#[ignore]`'d. It runs whenever
//! `DATABASE_URL` is set (CI wires a `postgres:15` service) and **skips
//! cleanly** when it's absent, so plain `cargo test --workspace` stays
//! green on machines without Postgres. Unlike the previous
//! `#[sqlx::test]` form (which spun up an isolated throwaway database),
//! it now applies the migrations against the `DATABASE_URL` database
//! directly — migrations are forward-only and idempotent, and CI's
//! Postgres is fresh, so the structural dump is identical either way.
use std::fmt::Write as _;
use std::path::PathBuf;
use sqlx::postgres::PgPoolOptions;
use sqlx::{PgPool, Row};
const SCHEMA: &str = "public";
const SNAPSHOT_PATH: &str = "tests/expected_schema.txt";
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "./migrations")]
async fn schema_after_replay_matches_snapshot(pool: PgPool) {
#[tokio::test]
async fn schema_after_replay_matches_snapshot() {
// Skip cleanly when DATABASE_URL is unset so `cargo test --workspace`
// stays green without Postgres. CI sets it (postgres:15 service).
let Ok(url) = std::env::var("DATABASE_URL") else {
eprintln!(
"schema_snapshot: DATABASE_URL unset — skipping. Set it (e.g. \
postgres://picloud:picloud@localhost:5432/picloud) to run this guardrail."
);
return;
};
let pool = PgPoolOptions::new()
.max_connections(1)
.connect(&url)
.await
.expect("connect to DATABASE_URL");
sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("apply migrations");
let actual = dump_schema(&pool).await;
let snapshot_file = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(SNAPSHOT_PATH);

View File

@@ -11,19 +11,20 @@ use axum::{routing::get, Json, Router};
use picloud_executor_core::{Engine, Limits};
use picloud_manager_core::{
admin_router, admins_router, api_keys_router, app_members_router, apps_api, apps_router,
attach_principal_if_present, auth_router, compile_routes, dead_letters_router, migrations,
require_authenticated, route_admin_router, triggers_router, AbandonedRepo,
AdminPrincipalResolver, AdminSessionRepository, AdminState, AdminUserRepository, AdminsState,
ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState,
AppRepository, AppsState, AuthState, AuthzRepo, DeadLetterRepo, DeadLettersState, Dispatcher,
DocsServiceImpl, HttpConfig, HttpServiceImpl, KvServiceImpl, OutboxEventEmitter, OutboxRepo,
attach_principal_if_present, auth_router, compile_routes, dead_letters_router,
files_admin_router, migrations, require_authenticated, route_admin_router, triggers_router,
AbandonedRepo, AdminPrincipalResolver, AdminSessionRepository, AdminState, AdminUserRepository,
AdminsState, ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository,
AppMembersState, AppRepository, AppsState, AuthState, AuthzRepo, DeadLetterRepo,
DeadLettersState, Dispatcher, DocsServiceImpl, FilesAdminState, FilesConfig, FilesServiceImpl,
FsFilesRepo, HttpConfig, HttpServiceImpl, KvServiceImpl, OutboxEventEmitter, OutboxRepo,
PostgresAbandonedRepo, PostgresAdminSessionRepository, PostgresAdminUserRepository,
PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository,
PostgresAppRepository, PostgresDeadLetterRepo, PostgresDeadLetterService, PostgresDocsRepo,
PostgresExecutionLogRepository, PostgresExecutionLogSink, PostgresKvRepo, PostgresOutboxRepo,
PostgresRouteRepository, PostgresScriptRepository, PostgresTriggerRepo, PrincipalResolver,
RepoResolver, RouteAdminState, RouteRepository, SandboxCeiling, ScriptRepository,
TriggerConfig, TriggerRepo, TriggersState,
PostgresPubsubRepo, PostgresRouteRepository, PostgresScriptRepository, PostgresTriggerRepo,
PrincipalResolver, PubsubServiceImpl, RepoResolver, RouteAdminState, RouteRepository,
SandboxCeiling, ScriptRepository, TriggerConfig, TriggerRepo, TriggersState,
};
use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable};
use picloud_orchestrator_core::{
@@ -31,9 +32,9 @@ use picloud_orchestrator_core::{
LocalExecutorClient,
};
use picloud_shared::{
DeadLetterService, DocsService, ExecutionLogSink, HttpService, InboxResolver, KvService,
OutboxWriter, ScriptValidator, ServiceEventEmitter, Services, API_VERSION, PRODUCT_VERSION,
SDK_VERSION, WIRE_VERSION,
DeadLetterService, DocsService, ExecutionLogSink, FilesService, HttpService, InboxResolver,
KvService, OutboxWriter, PubsubService, ScriptValidator, ServiceEventEmitter, Services,
API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION,
};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
@@ -157,7 +158,33 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
);
}
let http: Arc<dyn HttpService> = Arc::new(HttpServiceImpl::new(http_config, authz.clone()));
let services = Services::new(kv, docs, dl_service.clone(), events, modules, http);
// v1.1.5 filesystem-backed blob storage. Metadata lives in Postgres;
// the bytes live on disk under `PICLOUD_FILES_ROOT` (default ./data).
let files_config = FilesConfig::from_env();
let files_max_size = files_config.max_file_size_bytes;
let files_repo = Arc::new(FsFilesRepo::new(pool.clone(), files_config));
let files: Arc<dyn FilesService> = Arc::new(FilesServiceImpl::new(
files_repo.clone(),
authz.clone(),
events.clone(),
files_max_size,
));
// v1.1.5 durable pub/sub. Publishes fan out to matching pubsub
// triggers at publish time (one outbox row each), delivered by the
// same dispatcher as every other async trigger.
let pubsub_repo = Arc::new(PostgresPubsubRepo::new(pool.clone()));
let pubsub: Arc<dyn PubsubService> =
Arc::new(PubsubServiceImpl::new(pubsub_repo, authz.clone()));
let services = Services::new(
kv,
docs,
dl_service.clone(),
events,
modules,
http,
files,
pubsub,
);
let engine = Arc::new(Engine::new(Limits::default(), services));
// Compile the routes table once at startup; admin writes refresh it.
@@ -270,6 +297,11 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
apps: apps_repo.clone(),
authz: authz.clone(),
};
let files_admin_state = FilesAdminState {
files: files_repo,
apps: apps_repo.clone(),
authz: authz.clone(),
};
let apps_state = AppsState {
apps: apps_repo,
domains: domains_repo,
@@ -312,6 +344,7 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
.merge(app_members_router(app_members_state))
.merge(api_keys_router(api_keys_state))
.merge(triggers_router(triggers_state))
.merge(files_admin_router(files_admin_state))
.merge(dead_letters_router(dead_letters_state))
.layer(from_fn_with_state(
auth_state.clone(),

339
crates/shared/src/files.rs Normal file
View File

@@ -0,0 +1,339 @@
//! `FilesService` — the v1.1.5 filesystem-backed blob store contract.
//!
//! Lives in `picloud-shared` (not `executor-core`) so the Rhai bridge,
//! the manager-core filesystem+Postgres impl, and any in-memory test
//! impl can all depend on the same trait without dragging
//! `executor-core` into a Postgres or filesystem dependency.
//!
//! Implementations MUST derive every storage `app_id` from `cx.app_id`
//! — never from a script-passed argument. That is the cross-app
//! isolation boundary; see `docs/sdk-shape.md`.
//!
//! `FilesService` is collection-scoped: scripts get a handle via
//! `files::collection(name)` and call
//! `create`/`head`/`get`/`update`/`delete`/`list` on it. The blob bytes
//! never travel through Postgres or through trigger payloads — the row
//! is metadata + a SHA-256 checksum; the bytes live on the filesystem.
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
use crate::SdkCallCx;
/// POSIX-portable filename cap (255 bytes).
pub const MAX_FILE_NAME_BYTES: usize = 255;
/// RFC 6838 puts a reasonable media-type ceiling around 127 chars.
pub const MAX_CONTENT_TYPE_BYTES: usize = 127;
/// Payload for `create` — a brand-new blob. The id is server-generated
/// (a UUID); scripts never supply it.
#[derive(Debug, Clone)]
pub struct NewFile {
pub name: String,
pub content_type: String,
pub data: Vec<u8>,
}
/// Payload for `update` — replacement bytes plus optional metadata. If
/// `name` / `content_type` are `None` the prior values are kept.
#[derive(Debug, Clone)]
pub struct FileUpdate {
pub data: Vec<u8>,
pub name: Option<String>,
pub content_type: Option<String>,
}
/// File metadata as scripts and triggers see it. Serialized into
/// `ServiceEvent.payload` (the blob bytes are NOT included — files are
/// too big to ship through trigger payloads), and surfaced to Rhai by
/// `head` / `list`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FileMeta {
pub id: Uuid,
pub collection: String,
pub name: String,
pub content_type: String,
pub size: u64,
/// Lowercase hex SHA-256 of the content.
pub checksum: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
/// One page of file metadata from `FilesService::list`. `next_cursor`
/// is `Some` when more pages exist, `None` when exhausted.
#[derive(Debug, Clone)]
pub struct FilesListPage {
pub files: Vec<FileMeta>,
pub next_cursor: Option<String>,
}
#[async_trait]
pub trait FilesService: Send + Sync {
/// Create a new blob; returns its server-generated id. Throws on a
/// missing required field, an over-limit blob, or an invalid
/// collection name.
async fn create(
&self,
cx: &SdkCallCx,
collection: &str,
new: NewFile,
) -> Result<Uuid, FilesError>;
/// Metadata only — no body read. `None` if the file is missing.
async fn head(
&self,
cx: &SdkCallCx,
collection: &str,
id: &str,
) -> Result<Option<FileMeta>, FilesError>;
/// Full content. `None` if missing. Verifies the stored checksum
/// against the bytes on disk and returns `FilesError::Corrupted`
/// when they diverge.
async fn get(
&self,
cx: &SdkCallCx,
collection: &str,
id: &str,
) -> Result<Option<Vec<u8>>, FilesError>;
/// Replace content (and optionally metadata). Throws `NotFound`
/// when the file doesn't exist.
async fn update(
&self,
cx: &SdkCallCx,
collection: &str,
id: &str,
upd: FileUpdate,
) -> Result<(), FilesError>;
/// Delete by id; returns whether the file was present.
async fn delete(&self, cx: &SdkCallCx, collection: &str, id: &str) -> Result<bool, FilesError>;
/// Cursor-paginated metadata listing (same shape as KV's list).
async fn list(
&self,
cx: &SdkCallCx,
collection: &str,
cursor: Option<&str>,
limit: u32,
) -> Result<FilesListPage, FilesError>;
}
/// Failure modes surfaced to the Rhai bridge. The bridge converts each
/// to a Rhai runtime error string; the discriminants exist so internal
/// callers (admin endpoints, tests) can react more precisely.
#[derive(Debug, Error)]
pub enum FilesError {
/// Empty collection name, or one containing a path separator / `..`
/// / NUL — rejected at the SDK boundary per `docs/sdk-shape.md`.
#[error("invalid collection name: {0}")]
InvalidCollection(String),
/// A required field on `create` was missing or empty. The string
/// names the field (`name` / `content_type` / `data`).
#[error("missing required field: {0}")]
MissingField(&'static str),
/// Blob exceeds the per-file size cap (default 100 MB,
/// `PICLOUD_FILES_MAX_FILE_SIZE_BYTES`).
#[error("file too large: {size} bytes exceeds limit of {limit} bytes")]
TooLarge { size: usize, limit: usize },
/// Filename exceeds `MAX_FILE_NAME_BYTES`.
#[error("file name too long: {0} bytes exceeds 255")]
NameTooLong(usize),
/// Content-type exceeds `MAX_CONTENT_TYPE_BYTES`.
#[error("content_type too long: {0} bytes exceeds 127")]
ContentTypeTooLong(usize),
/// `update` on a non-existent file.
#[error("file not found")]
NotFound,
/// The bytes on disk no longer match the stored checksum — the
/// filesystem corrupted or a backup was misconfigured. The operator
/// decides what to do with the metadata-vs-bytes mismatch; the repo
/// does NOT auto-delete.
#[error("file content corrupted (checksum mismatch)")]
Corrupted,
/// Caller principal lacked the required capability. Only raised when
/// `cx.principal.is_some()` — scripts running with `principal: None`
/// (public HTTP) operate under script-as-gate semantics and skip
/// the capability check.
#[error("forbidden")]
Forbidden,
/// Anything else — Postgres unavailable, filesystem I/O error, etc.
#[error("files backend error: {0}")]
Backend(String),
}
impl NewFile {
/// Validate required fields + length caps at the SDK boundary.
/// `data` must be non-empty (v1.1.5 treats an empty blob as a
/// missing `data` field — see HANDBACK §7).
///
/// # Errors
///
/// Returns the field-specific [`FilesError`] for the first failing
/// check.
pub fn validate(&self, max_size: usize) -> Result<(), FilesError> {
if self.name.trim().is_empty() {
return Err(FilesError::MissingField("name"));
}
if self.content_type.trim().is_empty() {
return Err(FilesError::MissingField("content_type"));
}
if self.data.is_empty() {
return Err(FilesError::MissingField("data"));
}
if self.name.len() > MAX_FILE_NAME_BYTES {
return Err(FilesError::NameTooLong(self.name.len()));
}
if self.content_type.len() > MAX_CONTENT_TYPE_BYTES {
return Err(FilesError::ContentTypeTooLong(self.content_type.len()));
}
if self.data.len() > max_size {
return Err(FilesError::TooLarge {
size: self.data.len(),
limit: max_size,
});
}
Ok(())
}
}
impl FileUpdate {
/// Validate the replacement bytes + any supplied metadata.
///
/// # Errors
///
/// Returns the field-specific [`FilesError`] for the first failing
/// check.
pub fn validate(&self, max_size: usize) -> Result<(), FilesError> {
if self.data.is_empty() {
return Err(FilesError::MissingField("data"));
}
if let Some(name) = &self.name {
if name.trim().is_empty() {
return Err(FilesError::MissingField("name"));
}
if name.len() > MAX_FILE_NAME_BYTES {
return Err(FilesError::NameTooLong(name.len()));
}
}
if let Some(ct) = &self.content_type {
if ct.trim().is_empty() {
return Err(FilesError::MissingField("content_type"));
}
if ct.len() > MAX_CONTENT_TYPE_BYTES {
return Err(FilesError::ContentTypeTooLong(ct.len()));
}
}
if self.data.len() > max_size {
return Err(FilesError::TooLarge {
size: self.data.len(),
limit: max_size,
});
}
Ok(())
}
}
/// Reject a collection name that is empty or could escape the per-app
/// files tree. UUID-shaped ids never produce traversal paths, but
/// collection names come from scripts so they're validated defensively
/// at both the SDK boundary and the repo.
///
/// # Errors
///
/// Returns [`FilesError::InvalidCollection`] when the name is empty or
/// contains `/`, `\`, `..`, or a NUL byte.
pub fn validate_collection(collection: &str) -> Result<(), FilesError> {
if collection.is_empty() {
return Err(FilesError::InvalidCollection("must not be empty".into()));
}
if collection.contains('/')
|| collection.contains('\\')
|| collection.contains("..")
|| collection.contains('\0')
{
return Err(FilesError::InvalidCollection(format!(
"collection {collection:?} must not contain '/', '\\', '..', or NUL"
)));
}
Ok(())
}
/// Stub used by the test harness so executor-core integration tests
/// (which don't touch files) can construct a `Services` bundle without
/// a filesystem or Postgres. Every call returns
/// `FilesError::Backend("...")` so accidental use surfaces clearly.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopFilesService;
#[async_trait]
impl FilesService for NoopFilesService {
async fn create(
&self,
_cx: &SdkCallCx,
_collection: &str,
_new: NewFile,
) -> Result<Uuid, FilesError> {
Err(FilesError::Backend("files is not wired in".into()))
}
async fn head(
&self,
_cx: &SdkCallCx,
_collection: &str,
_id: &str,
) -> Result<Option<FileMeta>, FilesError> {
Err(FilesError::Backend("files is not wired in".into()))
}
async fn get(
&self,
_cx: &SdkCallCx,
_collection: &str,
_id: &str,
) -> Result<Option<Vec<u8>>, FilesError> {
Err(FilesError::Backend("files is not wired in".into()))
}
async fn update(
&self,
_cx: &SdkCallCx,
_collection: &str,
_id: &str,
_upd: FileUpdate,
) -> Result<(), FilesError> {
Err(FilesError::Backend("files is not wired in".into()))
}
async fn delete(
&self,
_cx: &SdkCallCx,
_collection: &str,
_id: &str,
) -> Result<bool, FilesError> {
Err(FilesError::Backend("files is not wired in".into()))
}
async fn list(
&self,
_cx: &SdkCallCx,
_collection: &str,
_cursor: Option<&str>,
_limit: u32,
) -> Result<FilesListPage, FilesError> {
Err(FilesError::Backend("files is not wired in".into()))
}
}

View File

@@ -12,6 +12,7 @@ pub mod error;
pub mod events;
pub mod exec_summary;
pub mod execution_log;
pub mod files;
pub mod http;
pub mod ids;
pub mod inbox;
@@ -19,6 +20,7 @@ pub mod kv;
pub mod log_sink;
pub mod modules;
pub mod outbox_writer;
pub mod pubsub;
pub mod route;
pub mod sandbox;
pub mod script;
@@ -36,6 +38,10 @@ pub use error::Error;
pub use events::{EmitError, NoopEventEmitter, ServiceEvent, ServiceEventEmitter};
pub use exec_summary::ExecResponseSummary;
pub use execution_log::{ExecutionLog, ExecutionStatus};
pub use files::{
validate_collection as validate_files_collection, FileMeta, FileUpdate, FilesError,
FilesListPage, FilesService, NewFile, NoopFilesService,
};
pub use http::{HttpError, HttpRequest, HttpResponse, HttpService, NoopHttpService};
pub use ids::{AdminUserId, ApiKeyId, AppId, ExecutionId, RequestId, ScriptId, TriggerId};
pub use inbox::{
@@ -45,11 +51,16 @@ pub use kv::{KvError, KvListPage, KvService, NoopKvService};
pub use log_sink::{ExecutionLogSink, LogSinkError};
pub use modules::{ModuleScript, ModuleSource, ModuleSourceError, NoopModuleSource};
pub use outbox_writer::{HttpDispatchPayload, NewHttpOutbox, OutboxWriter, OutboxWriterError};
pub use pubsub::{
topic_matches, validate_topic_pattern, NoopPubsubService, PubsubError, PubsubService,
};
pub use route::{DispatchMode, HostKind, PathKind, Route};
pub use sandbox::ScriptSandbox;
pub use script::{Script, ScriptKind};
pub use sdk_cx::SdkCallCx;
pub use services::Services;
pub use trigger_event::{DeadLetterEventDetail, DocsEventOp, KvEventOp, TriggerEvent};
pub use trigger_event::{
DeadLetterEventDetail, DocsEventOp, FilesEventOp, KvEventOp, TriggerEvent,
};
pub use validator::{ScriptValidator, ValidatedScript, ValidationError};
pub use version::{API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION};

161
crates/shared/src/pubsub.rs Normal file
View File

@@ -0,0 +1,161 @@
//! `PubsubService` — the v1.1.5 durable pub/sub contract.
//!
//! `pubsub::publish_durable(topic, message)` writes to the universal
//! outbox; the publish-time fan-out inserts one delivery row per
//! matching `pubsub` trigger, and each delivery retries / dead-letters
//! independently (the dispatcher already handles one-row-equals-one-
//! dispatch — no dispatcher changes for pub/sub).
//!
//! `publish_ephemeral` is committed as a v1.2 addition — the suffix
//! naming exists now so users learn "durable by default" from day one.
//!
//! Topic pattern matching runs in Rust (not SQL) so the trigger-select
//! query stays simple. The matcher + validator live here in
//! `picloud-shared` so the manager-core publish path, the admin trigger
//! endpoint, and tests all agree on the rules.
use async_trait::async_trait;
use thiserror::Error;
use crate::SdkCallCx;
#[async_trait]
pub trait PubsubService: Send + Sync {
/// Durable publish: writes the message to the outbox, fanned out to
/// every matching enabled `pubsub` trigger in `cx.app_id`. Succeeds
/// silently (zero rows written) when no trigger matches the topic.
async fn publish_durable(
&self,
cx: &SdkCallCx,
topic: &str,
message: serde_json::Value,
) -> Result<(), PubsubError>;
}
#[derive(Debug, Error)]
pub enum PubsubError {
/// Empty topic; rejected at the SDK boundary.
#[error("topic must not be empty")]
EmptyTopic,
/// Caller principal lacked the required capability. Only raised when
/// `cx.principal.is_some()` (script-as-gate; public HTTP skips it).
#[error("forbidden")]
Forbidden,
/// Serialization / validation failure on the message.
#[error("pubsub rejected: {0}")]
Rejected(String),
/// Anything else — Postgres unavailable, etc.
#[error("pubsub backend error: {0}")]
Unavailable(String),
}
/// Match a stored `topic_pattern` against a published `topic`.
///
/// - `"*"` matches every topic.
/// - `"<prefix>.*"` matches any topic starting with `"<prefix>."`.
/// - anything else is an exact match.
///
/// Mid-pattern wildcards (`*.created`, `a.*.b`) are NOT supported — they
/// are rejected at trigger creation by [`validate_topic_pattern`], so
/// the only patterns reaching this matcher are exact / prefix / `*`.
#[must_use]
pub fn topic_matches(pattern: &str, topic: &str) -> bool {
if pattern == "*" {
return true;
}
if let Some(prefix) = pattern.strip_suffix('*') {
// `prefix` retains the trailing '.', e.g. "user." for "user.*".
return topic.starts_with(prefix);
}
pattern == topic
}
/// Validate a subscription topic pattern. Accepts exactly: `"*"`
/// (universal), `"<prefix>.*"` (prefix wildcard, single trailing star),
/// or a literal with no `*` (exact). Everything else — mid-pattern
/// wildcards, multiple stars, a star not at the end — is rejected.
///
/// # Errors
///
/// Returns `Err(message)` with `"unsupported pubsub topic pattern: …"`
/// for any unsupported shape (or an empty pattern).
pub fn validate_topic_pattern(pattern: &str) -> Result<(), String> {
if pattern.is_empty() {
return Err("unsupported pubsub topic pattern: <empty>".to_string());
}
if pattern == "*" {
return Ok(());
}
let stars = pattern.matches('*').count();
if stars == 0 {
return Ok(()); // exact
}
if stars == 1 && pattern.ends_with(".*") {
return Ok(()); // prefix wildcard
}
Err(format!("unsupported pubsub topic pattern: {pattern}"))
}
/// Stub for the test harness so executor-core integration tests can
/// build a `Services` bundle without a database. Every call errors.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopPubsubService;
#[async_trait]
impl PubsubService for NoopPubsubService {
async fn publish_durable(
&self,
_cx: &SdkCallCx,
_topic: &str,
_message: serde_json::Value,
) -> Result<(), PubsubError> {
Err(PubsubError::Unavailable("pubsub is not wired in".into()))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn exact_match() {
assert!(topic_matches("user.created", "user.created"));
assert!(!topic_matches("user.created", "user.deleted"));
assert!(!topic_matches("user.created", "user.created.x"));
}
#[test]
fn prefix_wildcard() {
assert!(topic_matches("user.*", "user.created"));
assert!(topic_matches("user.*", "user.deleted"));
assert!(!topic_matches("user.*", "users.created"));
assert!(!topic_matches("user.*", "order.created"));
}
#[test]
fn universal() {
assert!(topic_matches("*", "anything"));
assert!(topic_matches("*", "a.b.c"));
}
#[test]
fn validation_accepts_supported_shapes() {
assert!(validate_topic_pattern("*").is_ok());
assert!(validate_topic_pattern("user.created").is_ok());
assert!(validate_topic_pattern("user.*").is_ok());
assert!(validate_topic_pattern("a.b.c").is_ok());
}
#[test]
fn validation_rejects_unsupported_shapes() {
for bad in ["*.created", "**", "a.*.b", "user.*x", "*user", ""] {
assert!(
validate_topic_pattern(bad).is_err(),
"expected {bad:?} to be rejected"
);
}
}
}

View File

@@ -20,9 +20,9 @@
use std::sync::Arc;
use crate::{
DeadLetterService, DocsService, HttpService, KvService, ModuleSource, NoopDeadLetterService,
NoopDocsService, NoopEventEmitter, NoopHttpService, NoopKvService, NoopModuleSource,
ServiceEventEmitter,
DeadLetterService, DocsService, FilesService, HttpService, KvService, ModuleSource,
NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopFilesService, NoopHttpService,
NoopKvService, NoopModuleSource, NoopPubsubService, PubsubService, ServiceEventEmitter,
};
/// SDK service bundle. See module docs for the lifecycle and the v1.1.x
@@ -60,6 +60,19 @@ pub struct Services {
/// the picloud binary; `NoopHttpService` in tests that don't make
/// network calls.
pub http: Arc<dyn HttpService>,
/// Filesystem-backed blob storage (v1.1.5). Scripts get
/// `files::collection(name).{create,head,get,update,delete,list}`.
/// Backed by a Postgres-metadata + on-disk-bytes repo in the
/// picloud binary; `NoopFilesService` in tests that don't touch
/// files.
pub files: Arc<dyn FilesService>,
/// Durable pub/sub (v1.1.5). Scripts get
/// `pubsub::publish_durable(topic, message)`. Backed by a
/// publish-time outbox fan-out in the picloud binary;
/// `NoopPubsubService` in tests that don't publish.
pub pubsub: Arc<dyn PubsubService>,
}
impl Services {
@@ -67,6 +80,7 @@ impl Services {
/// The picloud binary's `main` wires this up after the DB pool is
/// open; tests build it from in-memory fakes.
#[must_use]
#[allow(clippy::too_many_arguments)] // one Arc per stateful service; a builder would just move the noise
pub fn new(
kv: Arc<dyn KvService>,
docs: Arc<dyn DocsService>,
@@ -74,6 +88,8 @@ impl Services {
events: Arc<dyn ServiceEventEmitter>,
modules: Arc<dyn ModuleSource>,
http: Arc<dyn HttpService>,
files: Arc<dyn FilesService>,
pubsub: Arc<dyn PubsubService>,
) -> Self {
Self {
kv,
@@ -82,6 +98,8 @@ impl Services {
events,
modules,
http,
files,
pubsub,
}
}
@@ -99,6 +117,8 @@ impl Services {
Arc::new(NoopEventEmitter),
Arc::new(NoopModuleSource),
Arc::new(NoopHttpService),
Arc::new(NoopFilesService),
Arc::new(NoopPubsubService),
)
}
}

View File

@@ -78,6 +78,39 @@ impl DocsEventOp {
}
}
/// Operations a files trigger can fire on. v1.1.5. Stored as a
/// lowercase string in `files_trigger_details.ops` (Postgres `text[]`).
/// CRUD verbs (`create`) mirror `DocsEventOp`, distinct from KV's
/// set/upsert flavour.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FilesEventOp {
Create,
Update,
Delete,
}
impl FilesEventOp {
#[must_use]
pub const fn as_str(self) -> &'static str {
match self {
Self::Create => "create",
Self::Update => "update",
Self::Delete => "delete",
}
}
#[must_use]
pub fn from_wire(s: &str) -> Option<Self> {
match s {
"create" => Some(Self::Create),
"update" => Some(Self::Update),
"delete" => Some(Self::Delete),
_ => None,
}
}
}
/// Discriminated description of a triggering event. Lifted from the
/// outbox row's payload at dispatch time. Each variant carries the
/// fields the corresponding `ctx.event` shape exposes to the script.
@@ -123,6 +156,36 @@ pub enum TriggerEvent {
fired_at: DateTime<Utc>,
},
/// A files create / update / delete fired this handler. v1.1.5.
/// Carries the affected file's **metadata only** — never the blob
/// bytes (files are too big to ship through trigger payloads). A
/// handler that wants the bytes calls
/// `files::collection(c).get(id)` itself. `prev` is the prior
/// metadata for update (and the deleted-row metadata for delete);
/// absent on create. Surfaced to scripts as `ctx.event.files`.
Files {
op: FilesEventOp,
collection: String,
/// UUID as string — Rhai sees it as a string.
id: String,
name: String,
content_type: String,
size: u64,
/// Lowercase hex SHA-256.
checksum: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
prev: Option<serde_json::Value>,
},
/// A durable pub/sub publish fired this handler. v1.1.5. Carries
/// the topic, the JSON-decoded message, and the publish instant.
/// Surfaced to scripts as `ctx.event.pubsub`.
Pubsub {
topic: String,
message: serde_json::Value,
published_at: DateTime<Utc>,
},
/// A dead-letter row fired this handler. The original event is
/// nested verbatim plus the dead-letter metadata the design notes
/// §4 require.
@@ -148,6 +211,8 @@ impl TriggerEvent {
Self::Kv { .. } => "kv",
Self::Docs { .. } => "docs",
Self::Cron { .. } => "cron",
Self::Files { .. } => "files",
Self::Pubsub { .. } => "pubsub",
Self::DeadLetter { .. } => "dead_letter",
}
}

View File

@@ -39,7 +39,17 @@ pub const PRODUCT_VERSION: &str = env!("CARGO_PKG_VERSION");
/// SSRF deny-list on the resolved IP); `ctx.event.cron` for cron-trigger
/// handlers (carries `schedule`, `timezone`, `scheduled_at`, `fired_at`).
/// The `Services` bundle gains `http: Arc<dyn HttpService>`.
pub const SDK_VERSION: &str = "1.5";
///
/// 1.6 additions (v1.1.5):
/// `files::collection(name).{create,head,get,update,delete,list}` —
/// filesystem-backed blob storage (blobs in/out; metadata maps;
/// checksum-verified reads) with `ctx.event.files` for files-trigger
/// handlers (metadata only, never the bytes); and
/// `pubsub::publish_durable(topic, message)` — durable pub/sub with
/// publish-time fan-out and `ctx.event.pubsub` for pubsub-trigger
/// handlers. The `Services` bundle gains `files: Arc<dyn FilesService>`
/// and `pubsub: Arc<dyn PubsubService>`.
pub const SDK_VERSION: &str = "1.6";
/// HTTP API major version. Appears in URL paths as `/api/v{N}/...`.
/// Bump (new integer + new URL prefix) when the request/response

View File

@@ -1,6 +1,6 @@
{
"name": "picloud-dashboard",
"version": "0.10.0",
"version": "0.11.0",
"private": true,
"type": "module",
"scripts": {

View File

@@ -211,7 +211,7 @@ export interface DeadLetterRow {
resolution: 'replayed' | 'ignored' | 'handled_by_script' | 'handler_failed' | null;
}
export type TriggerKind = 'kv' | 'docs' | 'dead_letter' | 'cron';
export type TriggerKind = 'kv' | 'docs' | 'dead_letter' | 'cron' | 'files' | 'pubsub';
export type TriggerDispatchMode = 'sync' | 'async';
/// Per-kind detail, tagged by `kind` to match the Rust serde shape.
@@ -219,7 +219,21 @@ export type TriggerDetails =
| { kind: 'kv'; collection_glob: string; ops: string[] }
| { kind: 'docs'; collection_glob: string; ops: string[] }
| { kind: 'dead_letter'; source_filter?: string; trigger_id_filter?: string; script_id_filter?: string }
| { kind: 'cron'; schedule: string; timezone: string; last_fired_at?: string | null };
| { kind: 'cron'; schedule: string; timezone: string; last_fired_at?: string | null }
| { kind: 'files'; collection_glob: string; ops: string[] }
| { kind: 'pubsub'; topic_pattern: string };
/// v1.1.5 file metadata as the admin files endpoint returns it.
export interface FileMeta {
id: string;
collection: string;
name: string;
content_type: string;
size: number;
checksum: string;
created_at: string;
updated_at: string;
}
export interface Trigger {
id: string;
@@ -247,6 +261,15 @@ export interface CreateCronTriggerInput {
retry_base_ms?: number;
}
export interface CreatePubsubTriggerInput {
script_id: string;
topic_pattern: string;
dispatch_mode?: TriggerDispatchMode;
retry_max_attempts?: number;
retry_backoff?: 'exponential' | 'linear' | 'constant';
retry_base_ms?: number;
}
export interface ExecutionResult {
status: number;
headers: Record<string, string>;
@@ -618,6 +641,11 @@ export const api = {
`/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/cron`,
{ method: 'POST', body: JSON.stringify(input) }
),
createPubsub: (idOrSlug: string, input: CreatePubsubTriggerInput) =>
adminRequest<Trigger>(
`/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/pubsub`,
{ method: 'POST', body: JSON.stringify(input) }
),
remove: (idOrSlug: string, triggerId: string) =>
adminRequest<null>(
`/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/${triggerId}`,
@@ -625,6 +653,23 @@ export const api = {
)
},
files: {
list: (idOrSlug: string, collection: string, opts: { cursor?: string; limit?: number } = {}) => {
const params = new URLSearchParams();
params.set('collection', collection);
if (opts.cursor) params.set('cursor', opts.cursor);
if (opts.limit !== undefined) params.set('limit', String(opts.limit));
return adminRequest<{ files: FileMeta[]; next_cursor: string | null }>(
`/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/files?${params.toString()}`
);
},
remove: (idOrSlug: string, collection: string, fileId: string) =>
adminRequest<null>(
`/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/files/${encodeURIComponent(collection)}/${fileId}`,
{ method: 'DELETE' }
)
},
execute: async (
id: string,
body: unknown,

View File

@@ -118,6 +118,11 @@
let createCronTimezone = $state('UTC');
let creatingCron = $state(false);
let createCronError = $state<string | null>(null);
// Pub/Sub triggers (v1.1.5).
let createPubsubScriptId = $state('');
let createPubsubTopic = $state('');
let creatingPubsub = $state(false);
let createPubsubError = $state<string | null>(null);
let triggerToRemove = $state<Trigger | null>(null);
let removingTrigger = $state(false);
// Endpoint scripts only — modules can't be trigger targets.
@@ -153,6 +158,27 @@
}
}
async function submitCreatePubsub(e: SubmitEvent) {
e.preventDefault();
if (!app) return;
creatingPubsub = true;
createPubsubError = null;
try {
await api.triggers.createPubsub(app.id, {
script_id: createPubsubScriptId,
topic_pattern: createPubsubTopic.trim()
});
createPubsubScriptId = '';
createPubsubTopic = '';
await loadTriggers(app.id);
} catch (err) {
createPubsubError =
err instanceof ApiError ? err.message : err instanceof Error ? err.message : String(err);
} finally {
creatingPubsub = false;
}
}
async function confirmRemoveTrigger() {
if (!app || !triggerToRemove) return;
removingTrigger = true;
@@ -530,6 +556,13 @@
class:active={activeTab === 'settings'}
onclick={() => (activeTab = 'settings')}>Settings</button
>
<a
class="tab-link"
href="{base}/apps/{slug}/files"
title="Files — browse and delete stored blobs by collection"
>
Files
</a>
<a
class="tab-link"
href="{base}/apps/{slug}/dead-letters"
@@ -836,6 +869,42 @@
</div>
</form>
<h2>Pub/Sub triggers</h2>
<p class="muted">
Subscribe an endpoint script to durable pub/sub messages. Topic
patterns are an exact topic (<code>user.created</code>), a prefix
wildcard (<code>user.*</code>), or <code>*</code> for every topic.
</p>
<form class="create-form" onsubmit={submitCreatePubsub}>
<div class="row">
<label>
<span>Target script</span>
<select bind:value={createPubsubScriptId} required>
<option value="" disabled>Select an endpoint script…</option>
{#each endpointScripts as s (s.id)}
<option value={s.id}>{s.name}</option>
{/each}
</select>
</label>
<label>
<span>Topic pattern</span>
<input bind:value={createPubsubTopic} required placeholder="user.*" />
</label>
</div>
{#if createPubsubError}
<div class="error">{createPubsubError}</div>
{/if}
<div class="actions">
<button
type="submit"
disabled={creatingPubsub || !createPubsubScriptId || !createPubsubTopic.trim()}
>
{creatingPubsub ? 'Creating…' : 'Create pub/sub trigger'}
</button>
</div>
</form>
{#if triggers.length === 0}
<p class="muted">No triggers in this app yet.</p>
{:else}
@@ -850,9 +919,11 @@
<span class="muted small">
last fired: {t.details.last_fired_at ?? 'never'}
</span>
{:else if t.details.kind === 'kv' || t.details.kind === 'docs'}
{:else if t.details.kind === 'kv' || t.details.kind === 'docs' || t.details.kind === 'files'}
<code>{t.details.collection_glob}</code>
<span class="muted">{t.details.ops.join(', ') || 'any op'}</span>
{:else if t.details.kind === 'pubsub'}
<code>{t.details.topic_pattern}</code>
{/if}
<span class="muted small">{t.script_id}</span>
</div>

View File

@@ -0,0 +1,229 @@
<script lang="ts">
import { base } from '$app/paths';
import { page } from '$app/state';
import { api, ApiError, type App, type FileMeta } from '$lib/api';
import ConfirmModal from '$lib/ConfirmModal.svelte';
let slug = $derived(page.params.slug ?? '');
let app = $state<App | null>(null);
let collection = $state('');
let activeCollection = $state('');
let files = $state<FileMeta[]>([]);
let nextCursor = $state<string | null>(null);
let loading = $state(false);
let error = $state<string | null>(null);
let fileToRemove = $state<FileMeta | null>(null);
let removing = $state(false);
async function loadApp() {
try {
app = await api.apps.get(slug);
} catch (e) {
error = e instanceof ApiError ? e.message : String(e);
}
}
$effect(() => {
void slug;
void loadApp();
});
async function loadFiles(cursor?: string) {
const c = collection.trim();
if (!c) {
error = 'Enter a collection name to list its files.';
return;
}
loading = true;
error = null;
try {
const res = await api.files.list(slug, c, { cursor, limit: 100 });
if (cursor) {
files = [...files, ...res.files];
} else {
files = res.files;
activeCollection = c;
}
nextCursor = res.next_cursor;
} catch (e) {
error = e instanceof ApiError ? e.message : String(e);
} finally {
loading = false;
}
}
async function confirmRemove() {
if (!fileToRemove) return;
removing = true;
try {
await api.files.remove(slug, fileToRemove.collection, fileToRemove.id);
files = files.filter((f) => f.id !== fileToRemove!.id);
fileToRemove = null;
} catch (e) {
error = e instanceof ApiError ? e.message : String(e);
} finally {
removing = false;
}
}
function fmtTime(iso: string): string {
return new Date(iso).toLocaleString();
}
function fmtSize(bytes: number): string {
if (bytes < 1024) return `${bytes} B`;
if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
}
</script>
<svelte:head>
<title>Files · {slug} · PiCloud</title>
</svelte:head>
<div class="container">
<header>
<div>
<a href="{base}/apps/{slug}" class="back">&larr; back to {app?.name ?? slug}</a>
<h1>Files</h1>
<p class="subtitle">
Browse and delete stored blobs by collection. Uploads happen from scripts via
<code>files::collection(c).create(…)</code>.
</p>
</div>
</header>
<form
class="collection-form"
onsubmit={(e) => {
e.preventDefault();
void loadFiles();
}}
>
<label>
<span>Collection</span>
<input bind:value={collection} placeholder="avatars" required />
</label>
<button type="submit" disabled={loading || !collection.trim()}>
{loading ? 'Loading…' : 'List files'}
</button>
</form>
{#if error}
<div class="error">{error}</div>
{/if}
{#if activeCollection}
{#if files.length === 0 && !loading}
<p class="muted">No files in collection <code>{activeCollection}</code>.</p>
{:else}
<table>
<thead>
<tr>
<th>Name</th>
<th>Content type</th>
<th>Size</th>
<th>Created</th>
<th>ID</th>
<th></th>
</tr>
</thead>
<tbody>
{#each files as f (f.id)}
<tr>
<td>{f.name}</td>
<td><code>{f.content_type}</code></td>
<td>{fmtSize(f.size)}</td>
<td>{fmtTime(f.created_at)}</td>
<td class="mono small">{f.id}</td>
<td>
<button type="button" class="danger" onclick={() => (fileToRemove = f)}>
Delete
</button>
</td>
</tr>
{/each}
</tbody>
</table>
{#if nextCursor}
<button type="button" class="secondary" onclick={() => loadFiles(nextCursor ?? undefined)}>
Load more
</button>
{/if}
{/if}
{/if}
</div>
{#if fileToRemove}
<ConfirmModal
title="Delete file"
variant="danger"
confirmLabel="Delete file"
onConfirm={confirmRemove}
onCancel={() => (fileToRemove = null)}
>
<p>
Delete <strong>{fileToRemove.name}</strong> ({fmtSize(fileToRemove.size)}) from collection
<code>{fileToRemove.collection}</code>? This removes both the metadata row and the bytes on
disk and cannot be undone.
</p>
{#if removing}<p class="muted">Deleting…</p>{/if}
</ConfirmModal>
{/if}
<style>
.container {
max-width: 60rem;
margin: 0 auto;
padding: 1.5rem;
}
header {
margin-bottom: 1rem;
}
.back {
font-size: 0.85rem;
}
.subtitle {
color: var(--muted, #666);
font-size: 0.9rem;
}
.collection-form {
display: flex;
align-items: flex-end;
gap: 0.75rem;
margin-bottom: 1rem;
}
.collection-form label {
display: flex;
flex-direction: column;
gap: 0.25rem;
font-size: 0.85rem;
}
table {
width: 100%;
border-collapse: collapse;
font-size: 0.9rem;
}
th,
td {
text-align: left;
padding: 0.4rem 0.6rem;
border-bottom: 1px solid var(--border, #e2e2e2);
}
.mono {
font-family: monospace;
}
.small {
font-size: 0.78rem;
}
.muted {
color: var(--muted, #666);
}
.error {
color: #b00020;
margin: 0.5rem 0;
}
button.danger {
color: #b00020;
}
</style>