use std::collections::BTreeMap; use async_trait::async_trait; use picloud_orchestrator_core::{ResolverError, ScriptResolver}; use picloud_shared::{ AdminUserId, AppId, ExecutionLog, ExecutionStatus, RequestId, Script, ScriptId, ScriptKind, ScriptSandbox, }; use sqlx::PgPool; #[derive(Debug, thiserror::Error)] pub enum ScriptRepositoryError { #[error("database error: {0}")] Db(#[from] sqlx::Error), #[error("not found: {0}")] NotFound(ScriptId), #[error("conflict: {0}")] Conflict(String), } /// CRUD over the `scripts` table. #[async_trait] pub trait ScriptRepository: Send + Sync { async fn get(&self, id: ScriptId) -> Result, ScriptRepositoryError>; /// Every script across all apps. Mostly for tests and admin /// "global" views; the dashboard reaches scripts via `list_for_app`. async fn list(&self) -> Result, ScriptRepositoryError>; async fn list_for_app(&self, app_id: AppId) -> Result, ScriptRepositoryError>; /// Every script in any app the user is a member of. Drives /// `GET /admin/scripts` for `member` instance-role callers so the /// API never returns scripts they shouldn't see — even before the /// per-handler capability check fires. async fn list_for_user( &self, user_id: AdminUserId, ) -> Result, ScriptRepositoryError>; async fn create(&self, input: NewScript) -> Result; async fn update( &self, id: ScriptId, patch: ScriptPatch, ) -> Result; async fn delete(&self, id: ScriptId) -> Result<(), ScriptRepositoryError>; /// v1.1.3: how many routes reference this script. Used by the /// API layer to refuse `endpoint → module` kind changes when the /// script is still bound to user-facing entry points. async fn count_routes_for_script( &self, script_id: ScriptId, ) -> Result; /// v1.1.3: how many triggers (kv / docs / dead-letter) target /// this script. Same purpose as `count_routes_for_script`. async fn count_triggers_for_script( &self, script_id: ScriptId, ) -> Result; /// v1.1.3: list module dependencies of this script — the rows in /// `script_imports` where `importer_script_id = script_id`. Used /// by tests and (eventually) a dashboard "Imports" panel. async fn list_imports( &self, script_id: ScriptId, ) -> Result, ScriptRepositoryError>; } /// Inbound shape for create. Defaults match the migration's CHECK /// constraints; the repo enforces them in the DB regardless. #[derive(Debug, Clone)] pub struct NewScript { pub app_id: AppId, pub name: String, pub description: Option, pub source: String, /// Defaults to `Endpoint` if absent. `Module` scripts cannot be /// bound to routes or used as trigger targets. pub kind: ScriptKind, pub timeout_seconds: Option, pub memory_limit_mb: Option, /// Sandbox overrides; `None` means store an empty object (use /// platform defaults at exec time). pub sandbox: Option, /// v1.1.3: literal-path `import ""` declarations extracted /// from the source. The repo writes these into `script_imports` /// transactionally with the script row. Empty when validation /// found no imports (the common case for endpoints today). pub imports: Vec, } /// Inbound shape for update. `None` fields are left untouched. #[derive(Debug, Clone, Default)] pub struct ScriptPatch { pub name: Option, pub description: Option>, pub source: Option, pub timeout_seconds: Option, pub memory_limit_mb: Option, /// `Some(sandbox)` replaces the stored overrides wholesale (including /// `Some(empty)` to clear them); `None` leaves them untouched. pub sandbox: Option, /// `Some(new_kind)` changes the script's role; the API layer /// rejects unsafe transitions (e.g. endpoint→module when routes /// or triggers reference the script). pub kind: Option, /// v1.1.3: when `source` is also `Some`, the repo replaces the /// `script_imports` edges for this script with these names. /// `None` keeps the existing edges untouched (a name/description /// edit alone shouldn't touch the dep graph). pub imports: Option>, } pub struct PostgresScriptRepository { pool: PgPool, } impl PostgresScriptRepository { #[must_use] pub fn new(pool: PgPool) -> Self { Self { pool } } #[must_use] pub fn pool(&self) -> &PgPool { &self.pool } } /// Columns selected from `scripts` everywhere — kept in one constant so /// adding `kind` (v1.1.3) and future columns can't accidentally skip /// one query. const SCRIPT_SELECT_COLS: &str = "id, app_id, name, description, version, source, kind, \ timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at"; #[async_trait] impl ScriptRepository for PostgresScriptRepository { async fn get(&self, id: ScriptId) -> Result, ScriptRepositoryError> { let row = sqlx::query_as::<_, ScriptRow>(&format!( "SELECT {SCRIPT_SELECT_COLS} FROM scripts WHERE id = $1" )) .bind(id.into_inner()) .fetch_optional(&self.pool) .await?; Ok(row.map(Into::into)) } async fn list(&self) -> Result, ScriptRepositoryError> { let rows = sqlx::query_as::<_, ScriptRow>(&format!( "SELECT {SCRIPT_SELECT_COLS} FROM scripts ORDER BY name" )) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(Into::into).collect()) } async fn list_for_app(&self, app_id: AppId) -> Result, ScriptRepositoryError> { let rows = sqlx::query_as::<_, ScriptRow>(&format!( "SELECT {SCRIPT_SELECT_COLS} FROM scripts WHERE app_id = $1 ORDER BY name" )) .bind(app_id.into_inner()) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(Into::into).collect()) } async fn list_for_user( &self, user_id: AdminUserId, ) -> Result, ScriptRepositoryError> { let cols = SCRIPT_SELECT_COLS .split(", ") .map(|c| format!("s.{c}")) .collect::>() .join(", "); let rows = sqlx::query_as::<_, ScriptRow>(&format!( "SELECT {cols} FROM scripts s \ JOIN app_members m ON m.app_id = s.app_id \ WHERE m.user_id = $1 \ ORDER BY s.name" )) .bind(user_id.into_inner()) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(Into::into).collect()) } async fn create(&self, input: NewScript) -> Result { let sandbox_json = serde_json::to_value(input.sandbox.unwrap_or_default()) .unwrap_or_else(|_| serde_json::json!({})); let mut tx = self.pool.begin().await?; let res = sqlx::query_as::<_, ScriptRow>(&format!( "INSERT INTO scripts ( \ app_id, name, description, source, kind, \ timeout_seconds, memory_limit_mb, sandbox \ ) VALUES ($1, $2, $3, $4, $5, COALESCE($6, 30), COALESCE($7, 256), $8) \ RETURNING {SCRIPT_SELECT_COLS}" )) .bind(input.app_id.into_inner()) .bind(&input.name) .bind(input.description.as_deref()) .bind(&input.source) .bind(input.kind.as_str()) .bind(input.timeout_seconds) .bind(input.memory_limit_mb) .bind(sandbox_json) .fetch_one(&mut *tx) .await; let script: Script = match res { Ok(row) => row.into(), Err(sqlx::Error::Database(e)) if e.is_unique_violation() => { return Err(ScriptRepositoryError::Conflict(format!( "a script named {:?} already exists in this app", input.name ))); } Err(e) => return Err(e.into()), }; // Dep-graph: write any literal-path imports declared in the // source. Unresolved names (the referenced module doesn't // exist yet) are silently skipped — best-effort. replace_imports_tx(&mut tx, script.id, script.app_id, &input.imports).await?; tx.commit().await?; Ok(script) } async fn update( &self, id: ScriptId, patch: ScriptPatch, ) -> Result { // COALESCE-based partial update: `NULL` parameters leave columns // untouched. Description is double-Optioned so callers can // explicitly set it to NULL (Some(None)) vs leave it alone (None). // Sandbox is replaced wholesale when present; per-field merging // happens in the API layer (clearer semantics for a "PUT a new // sandbox config" call). app_id is immutable — moving a script // to another app is a copy-and-delete, not an in-place edit. let sandbox_json = patch .sandbox .as_ref() .map(|s| serde_json::to_value(s).unwrap_or_else(|_| serde_json::json!({}))); let mut tx = self.pool.begin().await?; let res = sqlx::query_as::<_, ScriptRow>(&format!( "UPDATE scripts SET \ name = COALESCE($2, name), \ description = CASE WHEN $3::bool THEN $4 ELSE description END, \ source = COALESCE($5, source), \ timeout_seconds = COALESCE($6, timeout_seconds), \ memory_limit_mb = COALESCE($7, memory_limit_mb), \ sandbox = COALESCE($8, sandbox), \ kind = COALESCE($9, kind), \ version = version + 1, \ updated_at = NOW() \ WHERE id = $1 \ RETURNING {SCRIPT_SELECT_COLS}" )) .bind(id.into_inner()) .bind(patch.name.as_deref()) .bind(patch.description.is_some()) .bind(patch.description.as_ref().and_then(|d| d.as_deref())) .bind(patch.source.as_deref()) .bind(patch.timeout_seconds) .bind(patch.memory_limit_mb) .bind(sandbox_json) .bind(patch.kind.map(|k| k.as_str())) .fetch_optional(&mut *tx) .await; let script: Script = match res { Ok(Some(row)) => row.into(), Ok(None) => return Err(ScriptRepositoryError::NotFound(id)), Err(sqlx::Error::Database(e)) if e.is_unique_violation() => { return Err(ScriptRepositoryError::Conflict( "a script with that name already exists in this app".into(), )); } Err(e) => return Err(e.into()), }; // Replace imports only when the caller has a fresh list (i.e. // the source actually changed and the validator re-extracted // imports). A name-only or description-only edit leaves the // dep graph alone. if let Some(imports) = patch.imports.as_deref() { replace_imports_tx(&mut tx, script.id, script.app_id, imports).await?; } tx.commit().await?; Ok(script) } async fn delete(&self, id: ScriptId) -> Result<(), ScriptRepositoryError> { let res = sqlx::query("DELETE FROM scripts WHERE id = $1") .bind(id.into_inner()) .execute(&self.pool) .await?; if res.rows_affected() == 0 { return Err(ScriptRepositoryError::NotFound(id)); } Ok(()) } async fn count_routes_for_script( &self, script_id: ScriptId, ) -> Result { let n: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM routes WHERE script_id = $1") .bind(script_id.into_inner()) .fetch_one(&self.pool) .await?; Ok(n.0) } async fn count_triggers_for_script( &self, script_id: ScriptId, ) -> Result { let n: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM triggers WHERE script_id = $1") .bind(script_id.into_inner()) .fetch_one(&self.pool) .await?; Ok(n.0) } async fn list_imports( &self, script_id: ScriptId, ) -> Result, ScriptRepositoryError> { let cols = SCRIPT_SELECT_COLS .split(", ") .map(|c| format!("s.{c}")) .collect::>() .join(", "); let rows = sqlx::query_as::<_, ScriptRow>(&format!( "SELECT {cols} FROM scripts s \ JOIN script_imports i ON i.imported_script_id = s.id \ WHERE i.importer_script_id = $1 \ ORDER BY s.name" )) .bind(script_id.into_inner()) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(Into::into).collect()) } } /// Replace the `script_imports` edges for `importer` with rows derived /// from `import_names`. Names that don't resolve to a `kind = 'module'` /// script in the same app are silently skipped (best-effort dep graph). async fn replace_imports_tx( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, importer: ScriptId, app_id: AppId, import_names: &[String], ) -> Result<(), ScriptRepositoryError> { sqlx::query("DELETE FROM script_imports WHERE importer_script_id = $1") .bind(importer.into_inner()) .execute(&mut **tx) .await?; if import_names.is_empty() { return Ok(()); } // Insert with ON CONFLICT DO NOTHING in case the source declares // `import "x"` twice — the dep graph stores each pair at most once. sqlx::query( "INSERT INTO script_imports (app_id, importer_script_id, imported_script_id) \ SELECT $1, $2, s.id \ FROM scripts s \ WHERE s.app_id = $1 \ AND s.kind = 'module' \ AND s.id <> $2 \ AND s.name = ANY($3) \ ON CONFLICT DO NOTHING", ) .bind(app_id.into_inner()) .bind(importer.into_inner()) .bind(import_names) .execute(&mut **tx) .await?; Ok(()) } /// Row shape mirroring the `scripts` table for sqlx FromRow. #[derive(sqlx::FromRow)] struct ScriptRow { id: uuid::Uuid, app_id: uuid::Uuid, name: String, description: Option, version: i32, source: String, /// v1.1.3: 'endpoint' | 'module'. Stored as TEXT with a CHECK /// constraint so we don't need a Postgres enum (avoiding the /// migration churn of adding values later). kind: String, timeout_seconds: i32, memory_limit_mb: i32, sandbox: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, } impl From for Script { fn from(r: ScriptRow) -> Self { // Tolerate stale rows whose sandbox column predates a future // schema migration: unknown fields are rejected by serde, so // fall back to an empty ScriptSandbox rather than poisoning a // list response. let sandbox = serde_json::from_value(r.sandbox).unwrap_or_default(); // Defensive: if a row's `kind` somehow falls outside the CHECK // constraint, treat it as Endpoint (the safe default — won't // grant a row import-target status it doesn't have). let kind = ScriptKind::from_str(&r.kind).unwrap_or(ScriptKind::Endpoint); Self { id: r.id.into(), app_id: r.app_id.into(), name: r.name, description: r.description, version: r.version, source: r.source, kind, timeout_seconds: u32::try_from(r.timeout_seconds).unwrap_or(30), memory_limit_mb: u32::try_from(r.memory_limit_mb).unwrap_or(256), sandbox, created_at: r.created_at, updated_at: r.updated_at, } } } /// Adapts a `ScriptRepository` into the `ScriptResolver` trait the /// orchestrator depends on. Keeps orchestrator-core unaware of how /// scripts are stored. pub struct RepoResolver { repo: R, } impl RepoResolver { pub fn new(repo: R) -> Self { Self { repo } } } #[async_trait] impl ScriptResolver for RepoResolver { async fn resolve(&self, id: ScriptId) -> Result, ResolverError> { self.repo .get(id) .await .map_err(|e| ResolverError::Backend(e.to_string())) } } // ---------------------------------------------------------------------------- // Execution log repository (read side) // ---------------------------------------------------------------------------- /// Read-side access to the `execution_logs` table. Writes go through /// `PostgresExecutionLogSink` so the read and write paths can diverge /// in cluster mode without disturbing this trait. #[async_trait] pub trait ExecutionLogRepository: Send + Sync { async fn list_for_script( &self, script_id: ScriptId, limit: i64, offset: i64, ) -> Result, ScriptRepositoryError>; } pub struct PostgresExecutionLogRepository { pool: PgPool, } impl PostgresExecutionLogRepository { #[must_use] pub fn new(pool: PgPool) -> Self { Self { pool } } } #[async_trait] impl ExecutionLogRepository for PostgresExecutionLogRepository { async fn list_for_script( &self, script_id: ScriptId, limit: i64, offset: i64, ) -> Result, ScriptRepositoryError> { let rows = sqlx::query_as::<_, ExecutionLogRow>( "SELECT id, app_id, script_id, request_id, \ request_path, request_headers, request_body, \ response_code, response_body, \ logs, duration_ms, status, created_at \ FROM execution_logs \ WHERE script_id = $1 \ ORDER BY created_at DESC \ LIMIT $2 OFFSET $3", ) .bind(script_id.into_inner()) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(Into::into).collect()) } } #[derive(sqlx::FromRow)] struct ExecutionLogRow { id: uuid::Uuid, app_id: uuid::Uuid, script_id: uuid::Uuid, request_id: uuid::Uuid, request_path: Option, request_headers: serde_json::Value, request_body: Option, response_code: Option, response_body: Option, logs: serde_json::Value, duration_ms: i32, status: String, created_at: chrono::DateTime, } impl From for ExecutionLog { fn from(r: ExecutionLogRow) -> Self { let headers: BTreeMap = serde_json::from_value(r.request_headers).unwrap_or_default(); let status = match r.status.as_str() { "success" => ExecutionStatus::Success, "timeout" => ExecutionStatus::Timeout, "budget_exceeded" => ExecutionStatus::BudgetExceeded, _ => ExecutionStatus::Error, }; Self { id: r.id, app_id: r.app_id.into(), script_id: r.script_id.into(), request_id: RequestId::from(r.request_id), request_path: r.request_path.unwrap_or_default(), request_headers: headers, request_body: r.request_body.unwrap_or(serde_json::Value::Null), response_code: r.response_code.and_then(|c| u16::try_from(c).ok()), response_body: r.response_body, script_logs: r.logs, duration_ms: u64::try_from(r.duration_ms).unwrap_or(0), status, created_at: r.created_at, } } }