feat(v1.1.3-modules): shared types, migrations, engine + resolver scaffold
Lays down the v1.1.3 plumbing:
- `ScriptKind` enum in `picloud-shared` ('endpoint' | 'module').
- `ModuleSource` trait + `ModuleScript` DTO + `NoopModuleSource` in
`picloud-shared`. Resolver lives in `executor-core`; Postgres impl
in `manager-core` (`PostgresModuleSource`).
- `Services::new` grows a fifth `modules: Arc<dyn ModuleSource>` arg.
- `ScriptValidator` returns `ValidatedScript { imports }` so the
manager can populate the dep-graph table on save. New
`validate_module` method on the trait gates module-shape rules.
- `Engine::execute_ast(&Arc<rhai::AST>, req)` lets the orchestrator's
script cache reuse compiled ASTs. `Engine::execute(&str, req)` is
preserved as a convenience that compiles inline. `Engine::compile`
exposes the AST for callers that want to cache.
- `PicloudModuleResolver` replaces `DummyModuleResolver` per-call.
Bridges Rhai's sync `ModuleResolver::resolve` to async
`ModuleSource::lookup` via `Handle::block_on`. Enforces:
- cross-app isolation (resolver captures `Arc<SdkCallCx>`),
- circular import detection (in-progress stack on the resolver),
- import depth limit (default 8 via
`Limits::module_import_depth_max`).
- Module-shape validation walks `ast.statements()` via `rhai/internals`
and accepts only `Var { CONSTANT }`, `Import`, and `Noop`. The
manager admin endpoint runs `validate_module` at save (primary
gate); resolver re-runs it at load (defense in depth).
- LRU cache `(AppId, name) -> (updated_at, Arc<Module>)` owned by
`Engine`. Size from `PICLOUD_MODULE_CACHE_SIZE` (default 512).
- Migration `0015_scripts_kind.sql` adds `scripts.kind` + composite
index + module-name shape CHECK.
- Migration `0016_script_imports.sql` adds the dep-graph table with
FK CASCADE on both columns.
- Repo: `kind` threaded through SELECT/INSERT/UPDATE. New
`count_routes_for_script` / `count_triggers_for_script` /
`list_imports` methods. `create`/`update` open a transaction and
call `replace_imports_tx` to populate the dep-graph.
- Admin endpoint: accepts `kind`; rejects reserved module names;
rejects `endpoint → module` transitions when routes / triggers
exist.
- SDK_VERSION 1.3 → 1.4.
Workspace builds; full test suite (~440 tests) green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,7 +3,8 @@ use std::collections::BTreeMap;
|
||||
use async_trait::async_trait;
|
||||
use picloud_orchestrator_core::{ResolverError, ScriptResolver};
|
||||
use picloud_shared::{
|
||||
AdminUserId, AppId, ExecutionLog, ExecutionStatus, RequestId, Script, ScriptId, ScriptSandbox,
|
||||
AdminUserId, AppId, ExecutionLog, ExecutionStatus, RequestId, Script, ScriptId, ScriptKind,
|
||||
ScriptSandbox,
|
||||
};
|
||||
use sqlx::PgPool;
|
||||
|
||||
@@ -42,6 +43,29 @@ pub trait ScriptRepository: Send + Sync {
|
||||
patch: ScriptPatch,
|
||||
) -> Result<Script, ScriptRepositoryError>;
|
||||
async fn delete(&self, id: ScriptId) -> Result<(), ScriptRepositoryError>;
|
||||
|
||||
/// v1.1.3: how many routes reference this script. Used by the
|
||||
/// API layer to refuse `endpoint → module` kind changes when the
|
||||
/// script is still bound to user-facing entry points.
|
||||
async fn count_routes_for_script(
|
||||
&self,
|
||||
script_id: ScriptId,
|
||||
) -> Result<i64, ScriptRepositoryError>;
|
||||
|
||||
/// v1.1.3: how many triggers (kv / docs / dead-letter) target
|
||||
/// this script. Same purpose as `count_routes_for_script`.
|
||||
async fn count_triggers_for_script(
|
||||
&self,
|
||||
script_id: ScriptId,
|
||||
) -> Result<i64, ScriptRepositoryError>;
|
||||
|
||||
/// v1.1.3: list module dependencies of this script — the rows in
|
||||
/// `script_imports` where `importer_script_id = script_id`. Used
|
||||
/// by tests and (eventually) a dashboard "Imports" panel.
|
||||
async fn list_imports(
|
||||
&self,
|
||||
script_id: ScriptId,
|
||||
) -> Result<Vec<Script>, ScriptRepositoryError>;
|
||||
}
|
||||
|
||||
/// Inbound shape for create. Defaults match the migration's CHECK
|
||||
@@ -52,11 +76,19 @@ pub struct NewScript {
|
||||
pub name: String,
|
||||
pub description: Option<String>,
|
||||
pub source: String,
|
||||
/// Defaults to `Endpoint` if absent. `Module` scripts cannot be
|
||||
/// bound to routes or used as trigger targets.
|
||||
pub kind: ScriptKind,
|
||||
pub timeout_seconds: Option<i32>,
|
||||
pub memory_limit_mb: Option<i32>,
|
||||
/// Sandbox overrides; `None` means store an empty object (use
|
||||
/// platform defaults at exec time).
|
||||
pub sandbox: Option<ScriptSandbox>,
|
||||
/// v1.1.3: literal-path `import "<name>"` declarations extracted
|
||||
/// from the source. The repo writes these into `script_imports`
|
||||
/// transactionally with the script row. Empty when validation
|
||||
/// found no imports (the common case for endpoints today).
|
||||
pub imports: Vec<String>,
|
||||
}
|
||||
|
||||
/// Inbound shape for update. `None` fields are left untouched.
|
||||
@@ -70,6 +102,15 @@ pub struct ScriptPatch {
|
||||
/// `Some(sandbox)` replaces the stored overrides wholesale (including
|
||||
/// `Some(empty)` to clear them); `None` leaves them untouched.
|
||||
pub sandbox: Option<ScriptSandbox>,
|
||||
/// `Some(new_kind)` changes the script's role; the API layer
|
||||
/// rejects unsafe transitions (e.g. endpoint→module when routes
|
||||
/// or triggers reference the script).
|
||||
pub kind: Option<ScriptKind>,
|
||||
/// v1.1.3: when `source` is also `Some`, the repo replaces the
|
||||
/// `script_imports` edges for this script with these names.
|
||||
/// `None` keeps the existing edges untouched (a name/description
|
||||
/// edit alone shouldn't touch the dep graph).
|
||||
pub imports: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
pub struct PostgresScriptRepository {
|
||||
@@ -88,14 +129,18 @@ impl PostgresScriptRepository {
|
||||
}
|
||||
}
|
||||
|
||||
/// Columns selected from `scripts` everywhere — kept in one constant so
|
||||
/// adding `kind` (v1.1.3) and future columns can't accidentally skip
|
||||
/// one query.
|
||||
const SCRIPT_SELECT_COLS: &str = "id, app_id, name, description, version, source, kind, \
|
||||
timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at";
|
||||
|
||||
#[async_trait]
|
||||
impl ScriptRepository for PostgresScriptRepository {
|
||||
async fn get(&self, id: ScriptId) -> Result<Option<Script>, ScriptRepositoryError> {
|
||||
let row = sqlx::query_as::<_, ScriptRow>(
|
||||
"SELECT id, app_id, name, description, version, source, \
|
||||
timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at \
|
||||
FROM scripts WHERE id = $1",
|
||||
)
|
||||
let row = sqlx::query_as::<_, ScriptRow>(&format!(
|
||||
"SELECT {SCRIPT_SELECT_COLS} FROM scripts WHERE id = $1"
|
||||
))
|
||||
.bind(id.into_inner())
|
||||
.fetch_optional(&self.pool)
|
||||
.await?;
|
||||
@@ -103,22 +148,18 @@ impl ScriptRepository for PostgresScriptRepository {
|
||||
}
|
||||
|
||||
async fn list(&self) -> Result<Vec<Script>, ScriptRepositoryError> {
|
||||
let rows = sqlx::query_as::<_, ScriptRow>(
|
||||
"SELECT id, app_id, name, description, version, source, \
|
||||
timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at \
|
||||
FROM scripts ORDER BY name",
|
||||
)
|
||||
let rows = sqlx::query_as::<_, ScriptRow>(&format!(
|
||||
"SELECT {SCRIPT_SELECT_COLS} FROM scripts ORDER BY name"
|
||||
))
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
Ok(rows.into_iter().map(Into::into).collect())
|
||||
}
|
||||
|
||||
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Script>, ScriptRepositoryError> {
|
||||
let rows = sqlx::query_as::<_, ScriptRow>(
|
||||
"SELECT id, app_id, name, description, version, source, \
|
||||
timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at \
|
||||
FROM scripts WHERE app_id = $1 ORDER BY name",
|
||||
)
|
||||
let rows = sqlx::query_as::<_, ScriptRow>(&format!(
|
||||
"SELECT {SCRIPT_SELECT_COLS} FROM scripts WHERE app_id = $1 ORDER BY name"
|
||||
))
|
||||
.bind(app_id.into_inner())
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
@@ -129,14 +170,17 @@ impl ScriptRepository for PostgresScriptRepository {
|
||||
&self,
|
||||
user_id: AdminUserId,
|
||||
) -> Result<Vec<Script>, ScriptRepositoryError> {
|
||||
let rows = sqlx::query_as::<_, ScriptRow>(
|
||||
"SELECT s.id, s.app_id, s.name, s.description, s.version, s.source, \
|
||||
s.timeout_seconds, s.memory_limit_mb, s.sandbox, s.created_at, s.updated_at \
|
||||
FROM scripts s \
|
||||
let cols = SCRIPT_SELECT_COLS
|
||||
.split(", ")
|
||||
.map(|c| format!("s.{c}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
let rows = sqlx::query_as::<_, ScriptRow>(&format!(
|
||||
"SELECT {cols} FROM scripts s \
|
||||
JOIN app_members m ON m.app_id = s.app_id \
|
||||
WHERE m.user_id = $1 \
|
||||
ORDER BY s.name",
|
||||
)
|
||||
ORDER BY s.name"
|
||||
))
|
||||
.bind(user_id.into_inner())
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
@@ -146,34 +190,42 @@ impl ScriptRepository for PostgresScriptRepository {
|
||||
async fn create(&self, input: NewScript) -> Result<Script, ScriptRepositoryError> {
|
||||
let sandbox_json = serde_json::to_value(input.sandbox.unwrap_or_default())
|
||||
.unwrap_or_else(|_| serde_json::json!({}));
|
||||
let res = sqlx::query_as::<_, ScriptRow>(
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let res = sqlx::query_as::<_, ScriptRow>(&format!(
|
||||
"INSERT INTO scripts ( \
|
||||
app_id, name, description, source, \
|
||||
app_id, name, description, source, kind, \
|
||||
timeout_seconds, memory_limit_mb, sandbox \
|
||||
) VALUES ($1, $2, $3, $4, COALESCE($5, 30), COALESCE($6, 256), $7) \
|
||||
RETURNING id, app_id, name, description, version, source, \
|
||||
timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at",
|
||||
)
|
||||
) VALUES ($1, $2, $3, $4, $5, COALESCE($6, 30), COALESCE($7, 256), $8) \
|
||||
RETURNING {SCRIPT_SELECT_COLS}"
|
||||
))
|
||||
.bind(input.app_id.into_inner())
|
||||
.bind(&input.name)
|
||||
.bind(input.description.as_deref())
|
||||
.bind(&input.source)
|
||||
.bind(input.kind.as_str())
|
||||
.bind(input.timeout_seconds)
|
||||
.bind(input.memory_limit_mb)
|
||||
.bind(sandbox_json)
|
||||
.fetch_one(&self.pool)
|
||||
.fetch_one(&mut *tx)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(row) => Ok(row.into()),
|
||||
let script: Script = match res {
|
||||
Ok(row) => row.into(),
|
||||
Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
|
||||
Err(ScriptRepositoryError::Conflict(format!(
|
||||
return Err(ScriptRepositoryError::Conflict(format!(
|
||||
"a script named {:?} already exists in this app",
|
||||
input.name
|
||||
)))
|
||||
)));
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
// Dep-graph: write any literal-path imports declared in the
|
||||
// source. Unresolved names (the referenced module doesn't
|
||||
// exist yet) are silently skipped — best-effort.
|
||||
replace_imports_tx(&mut tx, script.id, script.app_id, &input.imports).await?;
|
||||
tx.commit().await?;
|
||||
Ok(script)
|
||||
}
|
||||
|
||||
async fn update(
|
||||
@@ -192,7 +244,8 @@ impl ScriptRepository for PostgresScriptRepository {
|
||||
.sandbox
|
||||
.as_ref()
|
||||
.map(|s| serde_json::to_value(s).unwrap_or_else(|_| serde_json::json!({})));
|
||||
let res = sqlx::query_as::<_, ScriptRow>(
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let res = sqlx::query_as::<_, ScriptRow>(&format!(
|
||||
"UPDATE scripts SET \
|
||||
name = COALESCE($2, name), \
|
||||
description = CASE WHEN $3::bool THEN $4 ELSE description END, \
|
||||
@@ -200,12 +253,12 @@ impl ScriptRepository for PostgresScriptRepository {
|
||||
timeout_seconds = COALESCE($6, timeout_seconds), \
|
||||
memory_limit_mb = COALESCE($7, memory_limit_mb), \
|
||||
sandbox = COALESCE($8, sandbox), \
|
||||
kind = COALESCE($9, kind), \
|
||||
version = version + 1, \
|
||||
updated_at = NOW() \
|
||||
WHERE id = $1 \
|
||||
RETURNING id, app_id, name, description, version, source, \
|
||||
timeout_seconds, memory_limit_mb, sandbox, created_at, updated_at",
|
||||
)
|
||||
RETURNING {SCRIPT_SELECT_COLS}"
|
||||
))
|
||||
.bind(id.into_inner())
|
||||
.bind(patch.name.as_deref())
|
||||
.bind(patch.description.is_some())
|
||||
@@ -214,19 +267,30 @@ impl ScriptRepository for PostgresScriptRepository {
|
||||
.bind(patch.timeout_seconds)
|
||||
.bind(patch.memory_limit_mb)
|
||||
.bind(sandbox_json)
|
||||
.fetch_optional(&self.pool)
|
||||
.bind(patch.kind.map(|k| k.as_str()))
|
||||
.fetch_optional(&mut *tx)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(Some(row)) => Ok(row.into()),
|
||||
Ok(None) => Err(ScriptRepositoryError::NotFound(id)),
|
||||
let script: Script = match res {
|
||||
Ok(Some(row)) => row.into(),
|
||||
Ok(None) => return Err(ScriptRepositoryError::NotFound(id)),
|
||||
Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
|
||||
Err(ScriptRepositoryError::Conflict(
|
||||
return Err(ScriptRepositoryError::Conflict(
|
||||
"a script with that name already exists in this app".into(),
|
||||
))
|
||||
));
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
// Replace imports only when the caller has a fresh list (i.e.
|
||||
// the source actually changed and the validator re-extracted
|
||||
// imports). A name-only or description-only edit leaves the
|
||||
// dep graph alone.
|
||||
if let Some(imports) = patch.imports.as_deref() {
|
||||
replace_imports_tx(&mut tx, script.id, script.app_id, imports).await?;
|
||||
}
|
||||
tx.commit().await?;
|
||||
Ok(script)
|
||||
}
|
||||
|
||||
async fn delete(&self, id: ScriptId) -> Result<(), ScriptRepositoryError> {
|
||||
@@ -239,6 +303,85 @@ impl ScriptRepository for PostgresScriptRepository {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn count_routes_for_script(
|
||||
&self,
|
||||
script_id: ScriptId,
|
||||
) -> Result<i64, ScriptRepositoryError> {
|
||||
let n: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM routes WHERE script_id = $1")
|
||||
.bind(script_id.into_inner())
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
Ok(n.0)
|
||||
}
|
||||
|
||||
async fn count_triggers_for_script(
|
||||
&self,
|
||||
script_id: ScriptId,
|
||||
) -> Result<i64, ScriptRepositoryError> {
|
||||
let n: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM triggers WHERE script_id = $1")
|
||||
.bind(script_id.into_inner())
|
||||
.fetch_one(&self.pool)
|
||||
.await?;
|
||||
Ok(n.0)
|
||||
}
|
||||
|
||||
async fn list_imports(
|
||||
&self,
|
||||
script_id: ScriptId,
|
||||
) -> Result<Vec<Script>, ScriptRepositoryError> {
|
||||
let cols = SCRIPT_SELECT_COLS
|
||||
.split(", ")
|
||||
.map(|c| format!("s.{c}"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
let rows = sqlx::query_as::<_, ScriptRow>(&format!(
|
||||
"SELECT {cols} FROM scripts s \
|
||||
JOIN script_imports i ON i.imported_script_id = s.id \
|
||||
WHERE i.importer_script_id = $1 \
|
||||
ORDER BY s.name"
|
||||
))
|
||||
.bind(script_id.into_inner())
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
Ok(rows.into_iter().map(Into::into).collect())
|
||||
}
|
||||
}
|
||||
|
||||
/// Replace the `script_imports` edges for `importer` with rows derived
|
||||
/// from `import_names`. Names that don't resolve to a `kind = 'module'`
|
||||
/// script in the same app are silently skipped (best-effort dep graph).
|
||||
async fn replace_imports_tx(
|
||||
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
|
||||
importer: ScriptId,
|
||||
app_id: AppId,
|
||||
import_names: &[String],
|
||||
) -> Result<(), ScriptRepositoryError> {
|
||||
sqlx::query("DELETE FROM script_imports WHERE importer_script_id = $1")
|
||||
.bind(importer.into_inner())
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
if import_names.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
// Insert with ON CONFLICT DO NOTHING in case the source declares
|
||||
// `import "x"` twice — the dep graph stores each pair at most once.
|
||||
sqlx::query(
|
||||
"INSERT INTO script_imports (app_id, importer_script_id, imported_script_id) \
|
||||
SELECT $1, $2, s.id \
|
||||
FROM scripts s \
|
||||
WHERE s.app_id = $1 \
|
||||
AND s.kind = 'module' \
|
||||
AND s.id <> $2 \
|
||||
AND s.name = ANY($3) \
|
||||
ON CONFLICT DO NOTHING",
|
||||
)
|
||||
.bind(app_id.into_inner())
|
||||
.bind(importer.into_inner())
|
||||
.bind(import_names)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Row shape mirroring the `scripts` table for sqlx FromRow.
|
||||
@@ -250,6 +393,10 @@ struct ScriptRow {
|
||||
description: Option<String>,
|
||||
version: i32,
|
||||
source: String,
|
||||
/// v1.1.3: 'endpoint' | 'module'. Stored as TEXT with a CHECK
|
||||
/// constraint so we don't need a Postgres enum (avoiding the
|
||||
/// migration churn of adding values later).
|
||||
kind: String,
|
||||
timeout_seconds: i32,
|
||||
memory_limit_mb: i32,
|
||||
sandbox: serde_json::Value,
|
||||
@@ -264,6 +411,10 @@ impl From<ScriptRow> for Script {
|
||||
// fall back to an empty ScriptSandbox rather than poisoning a
|
||||
// list response.
|
||||
let sandbox = serde_json::from_value(r.sandbox).unwrap_or_default();
|
||||
// Defensive: if a row's `kind` somehow falls outside the CHECK
|
||||
// constraint, treat it as Endpoint (the safe default — won't
|
||||
// grant a row import-target status it doesn't have).
|
||||
let kind = ScriptKind::from_str(&r.kind).unwrap_or(ScriptKind::Endpoint);
|
||||
Self {
|
||||
id: r.id.into(),
|
||||
app_id: r.app_id.into(),
|
||||
@@ -271,6 +422,7 @@ impl From<ScriptRow> for Script {
|
||||
description: r.description,
|
||||
version: r.version,
|
||||
source: r.source,
|
||||
kind,
|
||||
timeout_seconds: u32::try_from(r.timeout_seconds).unwrap_or(30),
|
||||
memory_limit_mb: u32::try_from(r.memory_limit_mb).unwrap_or(256),
|
||||
sandbox,
|
||||
|
||||
Reference in New Issue
Block a user