use std::collections::BTreeMap; use async_trait::async_trait; use picloud_orchestrator_core::{ResolverError, ScriptResolver}; use picloud_shared::{ AppId, ExecutionLog, ExecutionStatus, RequestId, Script, ScriptId, ScriptSandbox, }; use sqlx::PgPool; #[derive(Debug, thiserror::Error)] pub enum ScriptRepositoryError { #[error("database error: {0}")] Db(#[from] sqlx::Error), #[error("not found: {0}")] NotFound(ScriptId), #[error("conflict: {0}")] Conflict(String), } /// CRUD over the `scripts` table. #[async_trait] pub trait ScriptRepository: Send + Sync { async fn get(&self, id: ScriptId) -> Result, ScriptRepositoryError>; /// Every script across all apps. Mostly for tests and admin /// "global" views; the dashboard reaches scripts via `list_for_app`. async fn list(&self) -> Result, ScriptRepositoryError>; async fn list_for_app(&self, app_id: AppId) -> Result, ScriptRepositoryError>; async fn create(&self, input: NewScript) -> Result; async fn update( &self, id: ScriptId, patch: ScriptPatch, ) -> Result; async fn delete(&self, id: ScriptId) -> Result<(), ScriptRepositoryError>; } /// Inbound shape for create. Defaults match the migration's CHECK /// constraints; the repo enforces them in the DB regardless. #[derive(Debug, Clone)] pub struct NewScript { pub app_id: AppId, pub name: String, pub description: Option, pub source: String, pub timeout_seconds: Option, pub memory_limit_mb: Option, /// Sandbox overrides; `None` means store an empty object (use /// platform defaults at exec time). pub sandbox: Option, } /// Inbound shape for update. `None` fields are left untouched. #[derive(Debug, Clone, Default)] pub struct ScriptPatch { pub name: Option, pub description: Option>, pub source: Option, pub timeout_seconds: Option, pub memory_limit_mb: Option, /// `Some(sandbox)` replaces the stored overrides wholesale (including /// `Some(empty)` to clear them); `None` leaves them untouched. pub sandbox: Option, } pub struct PostgresScriptRepository { pool: PgPool, } impl PostgresScriptRepository { #[must_use] pub fn new(pool: PgPool) -> Self { Self { pool } } #[must_use] pub fn pool(&self) -> &PgPool { &self.pool } } #[async_trait] impl ScriptRepository for PostgresScriptRepository { async fn get(&self, id: ScriptId) -> Result, ScriptRepositoryError> { let row = sqlx::query_as::<_, ScriptRow>( "SELECT id, app_id, name, description, version, source, \ timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at \ FROM scripts WHERE id = $1", ) .bind(id.into_inner()) .fetch_optional(&self.pool) .await?; Ok(row.map(Into::into)) } async fn list(&self) -> Result, ScriptRepositoryError> { let rows = sqlx::query_as::<_, ScriptRow>( "SELECT id, app_id, name, description, version, source, \ timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at \ FROM scripts ORDER BY name", ) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(Into::into).collect()) } async fn list_for_app(&self, app_id: AppId) -> Result, ScriptRepositoryError> { let rows = sqlx::query_as::<_, ScriptRow>( "SELECT id, app_id, name, description, version, source, \ timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at \ FROM scripts WHERE app_id = $1 ORDER BY name", ) .bind(app_id.into_inner()) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(Into::into).collect()) } async fn create(&self, input: NewScript) -> Result { let sandbox_json = serde_json::to_value(input.sandbox.unwrap_or_default()) .unwrap_or_else(|_| serde_json::json!({})); let res = sqlx::query_as::<_, ScriptRow>( "INSERT INTO scripts ( \ app_id, name, description, source, \ timeout_seconds, memory_limit_mb, sandbox \ ) VALUES ($1, $2, $3, $4, COALESCE($5, 30), COALESCE($6, 256), $7) \ RETURNING id, app_id, name, description, version, source, \ timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at", ) .bind(input.app_id.into_inner()) .bind(&input.name) .bind(input.description.as_deref()) .bind(&input.source) .bind(input.timeout_seconds) .bind(input.memory_limit_mb) .bind(sandbox_json) .fetch_one(&self.pool) .await; match res { Ok(row) => Ok(row.into()), Err(sqlx::Error::Database(e)) if e.is_unique_violation() => { Err(ScriptRepositoryError::Conflict(format!( "a script named {:?} already exists in this app", input.name ))) } Err(e) => Err(e.into()), } } async fn update( &self, id: ScriptId, patch: ScriptPatch, ) -> Result { // COALESCE-based partial update: `NULL` parameters leave columns // untouched. Description is double-Optioned so callers can // explicitly set it to NULL (Some(None)) vs leave it alone (None). // Sandbox is replaced wholesale when present; per-field merging // happens in the API layer (clearer semantics for a "PUT a new // sandbox config" call). app_id is immutable — moving a script // to another app is a copy-and-delete, not an in-place edit. let sandbox_json = patch .sandbox .as_ref() .map(|s| serde_json::to_value(s).unwrap_or_else(|_| serde_json::json!({}))); let res = sqlx::query_as::<_, ScriptRow>( "UPDATE scripts SET \ name = COALESCE($2, name), \ description = CASE WHEN $3::bool THEN $4 ELSE description END, \ source = COALESCE($5, source), \ timeout_seconds = COALESCE($6, timeout_seconds), \ memory_limit_mb = COALESCE($7, memory_limit_mb), \ sandbox = COALESCE($8, sandbox), \ version = version + 1, \ updated_at = NOW() \ WHERE id = $1 \ RETURNING id, app_id, name, description, version, source, \ timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at", ) .bind(id.into_inner()) .bind(patch.name.as_deref()) .bind(patch.description.is_some()) .bind(patch.description.as_ref().and_then(|d| d.as_deref())) .bind(patch.source.as_deref()) .bind(patch.timeout_seconds) .bind(patch.memory_limit_mb) .bind(sandbox_json) .fetch_optional(&self.pool) .await; match res { Ok(Some(row)) => Ok(row.into()), Ok(None) => Err(ScriptRepositoryError::NotFound(id)), Err(sqlx::Error::Database(e)) if e.is_unique_violation() => { Err(ScriptRepositoryError::Conflict( "a script with that name already exists in this app".into(), )) } Err(e) => Err(e.into()), } } async fn delete(&self, id: ScriptId) -> Result<(), ScriptRepositoryError> { let res = sqlx::query("DELETE FROM scripts WHERE id = $1") .bind(id.into_inner()) .execute(&self.pool) .await?; if res.rows_affected() == 0 { return Err(ScriptRepositoryError::NotFound(id)); } Ok(()) } } /// Row shape mirroring the `scripts` table for sqlx FromRow. #[derive(sqlx::FromRow)] struct ScriptRow { id: uuid::Uuid, app_id: uuid::Uuid, name: String, description: Option, version: i32, source: String, timeout_seconds: i32, memory_limit_mb: i32, sandbox: serde_json::Value, created_at: chrono::DateTime, updated_at: chrono::DateTime, } impl From for Script { fn from(r: ScriptRow) -> Self { // Tolerate stale rows whose sandbox column predates a future // schema migration: unknown fields are rejected by serde, so // fall back to an empty ScriptSandbox rather than poisoning a // list response. let sandbox = serde_json::from_value(r.sandbox).unwrap_or_default(); Self { id: r.id.into(), app_id: r.app_id.into(), name: r.name, description: r.description, version: r.version, source: r.source, timeout_seconds: u32::try_from(r.timeout_seconds).unwrap_or(30), memory_limit_mb: u32::try_from(r.memory_limit_mb).unwrap_or(256), sandbox, created_at: r.created_at, updated_at: r.updated_at, } } } /// Adapts a `ScriptRepository` into the `ScriptResolver` trait the /// orchestrator depends on. Keeps orchestrator-core unaware of how /// scripts are stored. pub struct RepoResolver { repo: R, } impl RepoResolver { pub fn new(repo: R) -> Self { Self { repo } } } #[async_trait] impl ScriptResolver for RepoResolver { async fn resolve(&self, id: ScriptId) -> Result, ResolverError> { self.repo .get(id) .await .map_err(|e| ResolverError::Backend(e.to_string())) } } // ---------------------------------------------------------------------------- // Execution log repository (read side) // ---------------------------------------------------------------------------- /// Read-side access to the `execution_logs` table. Writes go through /// `PostgresExecutionLogSink` so the read and write paths can diverge /// in cluster mode without disturbing this trait. #[async_trait] pub trait ExecutionLogRepository: Send + Sync { async fn list_for_script( &self, script_id: ScriptId, limit: i64, offset: i64, ) -> Result, ScriptRepositoryError>; } pub struct PostgresExecutionLogRepository { pool: PgPool, } impl PostgresExecutionLogRepository { #[must_use] pub fn new(pool: PgPool) -> Self { Self { pool } } } #[async_trait] impl ExecutionLogRepository for PostgresExecutionLogRepository { async fn list_for_script( &self, script_id: ScriptId, limit: i64, offset: i64, ) -> Result, ScriptRepositoryError> { let rows = sqlx::query_as::<_, ExecutionLogRow>( "SELECT id, app_id, script_id, request_id, \ request_path, request_headers, request_body, \ response_code, response_body, \ logs, duration_ms, status, created_at \ FROM execution_logs \ WHERE script_id = $1 \ ORDER BY created_at DESC \ LIMIT $2 OFFSET $3", ) .bind(script_id.into_inner()) .bind(limit) .bind(offset) .fetch_all(&self.pool) .await?; Ok(rows.into_iter().map(Into::into).collect()) } } #[derive(sqlx::FromRow)] struct ExecutionLogRow { id: uuid::Uuid, app_id: uuid::Uuid, script_id: uuid::Uuid, request_id: uuid::Uuid, request_path: Option, request_headers: serde_json::Value, request_body: Option, response_code: Option, response_body: Option, logs: serde_json::Value, duration_ms: i32, status: String, created_at: chrono::DateTime, } impl From for ExecutionLog { fn from(r: ExecutionLogRow) -> Self { let headers: BTreeMap = serde_json::from_value(r.request_headers).unwrap_or_default(); let status = match r.status.as_str() { "success" => ExecutionStatus::Success, "timeout" => ExecutionStatus::Timeout, "budget_exceeded" => ExecutionStatus::BudgetExceeded, _ => ExecutionStatus::Error, }; Self { id: r.id, app_id: r.app_id.into(), script_id: r.script_id.into(), request_id: RequestId::from(r.request_id), request_path: r.request_path.unwrap_or_default(), request_headers: headers, request_body: r.request_body.unwrap_or(serde_json::Value::Null), response_code: r.response_code.and_then(|c| u16::try_from(c).ok()), response_body: r.response_body, script_logs: r.logs, duration_ms: u64::try_from(r.duration_ms).unwrap_or(0), status, created_at: r.created_at, } } }