Brings the MVP feature set online: upload a Rhai script, get an HTTP
endpoint that runs it sandboxed in-process, list/update/delete it, and
have invalid sources rejected at upload time. Verified live through
Caddy with a full lifecycle (`create → list → get → execute → update
→ delete`) plus error paths (syntax error, duplicate name, deleted).
Layout — every concern lands behind the trait seam its layer owns, so
cluster-mode in v1.3+ is a swap of two impls, not a rewrite:
* shared::ScriptValidator — manager calls into validation without
a hard dep on executor-core; executor-core impls the trait on
`Engine`. Pinned in shared so neither crate has to know about
the other.
* executor-core::Engine — real Rhai engine: sandbox limits (max
operations / string size / map size / call depth), disabled
`print`, blocked `import` (DummyModuleResolver), `log::trace
/info/warn/error` registered as a static module with shared
log-capture buffer (no `log::debug` because `debug` is a Rhai
reserved keyword — `log::trace` covers the same need).
- `ctx` is pushed as a Scope constant exposing
execution_id, script_id, script_name, request_id,
invocation_type, request.{path,headers,body}.
- Response convention: a Map with `statusCode` is the
structured shape (`{statusCode, headers?, body}`); any
other return value is a 200 with the value as the body.
- Engine::execute is now synchronous (pure compute); the
async wrapper + wall-clock timeout live in
LocalExecutorClient, which spawns_blocking and applies a
300s hard ceiling regardless of per-script config.
- 10 unit tests cover validate, exec, structured response,
ctx exposure, log capture, op-budget enforcement, runtime
errors, blocked imports, JSON round-tripping.
* manager-core::repo — full sqlx CRUD over the `scripts` table,
with proper unique-violation handling for duplicate names.
Embedded migrations via `sqlx::migrate!` (one initial
`0001_init.sql` for pgcrypto + scripts + execution_logs).
* manager-core::api — `admin_router` mounts `/scripts` and
`/scripts/{id}`. Create + Update validate source through the
injected `ScriptValidator` before persistence. Returns proper
422/409/404 status codes via `ApiError::IntoResponse`.
* orchestrator-core::api — `data_plane_router` mounts
`/execute/{id}`: resolves the script through `ScriptResolver`,
constructs the `ExecRequest` from headers+body, awaits
`ExecutorClient::execute(..., timeout)`, translates the
`ExecResponse` to an axum `Response` with header passthrough.
Maps `ExecError` variants to 422/504/502/507.
* picloud all-in-one — opens the pool, runs migrations, builds
one engine, nests both routers under `/api/admin` and `/api`,
enables structured JSON tracing and graceful shutdown on
SIGTERM. Single `PostgresScriptRepository` Arc is shared by
the admin router (writes) and the resolver (reads).
Other changes:
* Workspace axum bump 0.7 → 0.8 for the `{id}` path syntax
matching the route definitions.
* Workspace clippy: allow `needless_pass_by_value` and
`boxed_local` to keep API ergonomics over pedantic noise.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
100 lines
2.9 KiB
Rust
100 lines
2.9 KiB
Rust
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use async_trait::async_trait;
|
|
use picloud_executor_core::{Engine, ExecError, ExecRequest, ExecResponse};
|
|
|
|
/// Maximum wall-clock time we'll wait for a single invocation, regardless
|
|
/// of the per-script `timeout_seconds`. Provides a hard ceiling on
|
|
/// resource usage independent of misconfigured scripts.
|
|
const HARD_TIMEOUT_CAP: Duration = Duration::from_secs(300);
|
|
|
|
/// The seam between the orchestrator and the executor.
|
|
///
|
|
/// Single-node mode plugs in `LocalExecutorClient`, which calls
|
|
/// `executor-core` in-process via `spawn_blocking`. Cluster mode plugs
|
|
/// in `RemoteExecutorClient`, which forwards over HTTP to an executor
|
|
/// node. Everything else in orchestrator-core depends only on this trait.
|
|
#[async_trait]
|
|
pub trait ExecutorClient: Send + Sync {
|
|
async fn execute(
|
|
&self,
|
|
source: &str,
|
|
req: ExecRequest,
|
|
timeout: Duration,
|
|
) -> Result<ExecResponse, ExecError>;
|
|
}
|
|
|
|
/// In-process executor — wraps `executor-core::Engine` directly.
|
|
///
|
|
/// `executor-core::Engine::execute` is synchronous; we offload it to a
|
|
/// blocking thread so it doesn't park a Tokio worker, and apply the
|
|
/// wall-clock timeout here.
|
|
pub struct LocalExecutorClient {
|
|
engine: Arc<Engine>,
|
|
}
|
|
|
|
impl LocalExecutorClient {
|
|
#[must_use]
|
|
pub fn new(engine: Arc<Engine>) -> Self {
|
|
Self { engine }
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl ExecutorClient for LocalExecutorClient {
|
|
async fn execute(
|
|
&self,
|
|
source: &str,
|
|
req: ExecRequest,
|
|
timeout: Duration,
|
|
) -> Result<ExecResponse, ExecError> {
|
|
let timeout = timeout.min(HARD_TIMEOUT_CAP);
|
|
let timeout_secs = u32::try_from(timeout.as_secs()).unwrap_or(u32::MAX);
|
|
|
|
let engine = self.engine.clone();
|
|
let source = source.to_string();
|
|
let join = tokio::task::spawn_blocking(move || engine.execute(&source, req));
|
|
|
|
match tokio::time::timeout(timeout, join).await {
|
|
Err(_) => Err(ExecError::Timeout(timeout_secs)),
|
|
Ok(Err(join_err)) => Err(ExecError::Runtime(format!(
|
|
"execution task panicked: {join_err}"
|
|
))),
|
|
Ok(Ok(res)) => res,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Remote executor — forwards to a peer executor node over HTTP.
|
|
///
|
|
/// Skeleton only; fleshed out when cluster mode lands.
|
|
pub struct RemoteExecutorClient {
|
|
_client: reqwest::Client,
|
|
_base_url: String,
|
|
}
|
|
|
|
impl RemoteExecutorClient {
|
|
#[must_use]
|
|
pub fn new(base_url: impl Into<String>) -> Self {
|
|
Self {
|
|
_client: reqwest::Client::new(),
|
|
_base_url: base_url.into(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl ExecutorClient for RemoteExecutorClient {
|
|
async fn execute(
|
|
&self,
|
|
_source: &str,
|
|
_req: ExecRequest,
|
|
_timeout: Duration,
|
|
) -> Result<ExecResponse, ExecError> {
|
|
Err(ExecError::Runtime(
|
|
"RemoteExecutorClient not implemented (cluster mode is v1.3+)".into(),
|
|
))
|
|
}
|
|
}
|