feat: persist execution logs + dashboard detail view + integration tests

Three threads landing together because they share a public surface
(the new execution_log shape) and verifying any one in isolation
would mean re-doing the work later.

== (A) execution log persistence ==

  * shared::ExecutionLog + ExecutionStatus carry the audit-trail
    shape that flows from the orchestrator through the sink and
    back out via the manager's logs endpoint.

  * shared::ExecutionLogSink trait — abstraction the orchestrator
    writes through. In single-process MVP mode the manager's
    Postgres-backed impl is plugged in directly; in cluster mode
    (v1.3+) the orchestrator's impl will post over HTTP to the
    manager. Trait lives in `shared` so neither *-core crate has
    to know about the other.

  * manager-core::PostgresExecutionLogSink writes to the
    execution_logs table (already in the initial migration);
    PostgresExecutionLogRepository reads them back, paginated.
    AdminState now carries both a script repo and a log repo, so
    `admin_router` exposes `GET /scripts/{id}/logs?limit=&offset=`
    capped at 200 rows per page to keep the dashboard responsive.

  * orchestrator-core::DataPlaneState gains `log_sink`. The
    execute handler builds an ExecutionLog on every outcome —
    success, error, timeout, budget-exceeded — and awaits the
    sink. Sink failures are logged at warn and DO NOT mask the
    user-facing result, since "we couldn't write the audit row"
    is a separate concern from "the script ran".

  * picloud binary refactored into a lib (`build_app(pool)` is
    the seam) + thin bin shell. Same Postgres pool backs the
    script repo, the log repo, and the sink — no double pool.

== (B) dashboard ==

  * Typed API client extended with `scripts.logs(id, opts)`,
    `scripts.update/remove`, and `execute(id, body, headers)`.
    Plain `fetch` wrapper now surfaces server-side error
    messages via a typed ApiError so the UI can render them.

  * `/` — create-script form now actually creates; on success
    the list reloads. List entries link to detail.

  * `/scripts/[id]` — new detail route: source editor with save
    (calls update, version bumps); Test invoke panel that sends
    arbitrary JSON body + headers to /api/execute and shows the
    response; Recent executions panel reading from /logs with
    expandable per-row request/response/script-log views.
    Delete button with confirm. SPA-routed; Caddy serves
    `build/` with the same index.html fallback.

== (C) integration tests ==

  * crates/picloud/tests/api.rs — 14 sqlx::test cases driving
    `build_app` through an axum_test::TestServer against a fresh
    Postgres DB per test. Covers: health, full script CRUD,
    duplicate-name conflict, invalid-source rejection on both
    create and update, execute echoing the body, status+header
    passthrough, 404 on missing scripts, error-path executions
    landing in the audit log with the right status.

  * Tests are `#[ignore]` by default so plain `cargo test
    --workspace` stays green without infrastructure. Opt-in via:
    `docker compose up -d postgres && \
       DATABASE_URL=postgres://picloud:picloud@127.0.0.1:15432/picloud \
       cargo test -p picloud --test api -- --include-ignored`

Verified live through Caddy on :8000: three logged invocations
land in the logs endpoint with the right structured `data` on
each `log::info`/`log::warn`, error-path executions are still
captured with status=error, dashboard list + SPA detail route
both reachable.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-23 00:16:32 +02:00
parent 4f044e7b81
commit 777f4af628
18 changed files with 1750 additions and 178 deletions

View File

@@ -11,23 +11,27 @@ use axum::{
routing::get,
Json, Router,
};
use picloud_shared::{Script, ScriptId, ScriptValidator, ValidationError};
use picloud_shared::{ExecutionLog, Script, ScriptId, ScriptValidator, ValidationError};
use serde::Deserialize;
use crate::repo::{NewScript, ScriptPatch, ScriptRepository, ScriptRepositoryError};
use crate::repo::{
ExecutionLogRepository, NewScript, ScriptPatch, ScriptRepository, ScriptRepositoryError,
};
/// State shared by control-plane handlers. Separates concerns so the
/// manager can validate at upload time without depending on the
/// concrete executor-core types.
pub struct AdminState<R> {
pub struct AdminState<R, L> {
pub repo: Arc<R>,
pub logs: Arc<L>,
pub validator: Arc<dyn ScriptValidator>,
}
impl<R> Clone for AdminState<R> {
impl<R, L> Clone for AdminState<R, L> {
fn clone(&self) -> Self {
Self {
repo: self.repo.clone(),
logs: self.logs.clone(),
validator: self.validator.clone(),
}
}
@@ -35,15 +39,23 @@ impl<R> Clone for AdminState<R> {
/// Build the admin router. The caller (binary) chooses where to mount
/// it (typically `Router::new().nest("/api/admin", admin_router(state))`).
pub fn admin_router<R: ScriptRepository + 'static>(state: AdminState<R>) -> Router {
pub fn admin_router<R, L>(state: AdminState<R, L>) -> Router
where
R: ScriptRepository + 'static,
L: ExecutionLogRepository + 'static,
{
Router::new()
.route("/scripts", get(list_scripts::<R>).post(create_script::<R>))
.route(
"/scripts",
get(list_scripts::<R, L>).post(create_script::<R, L>),
)
.route(
"/scripts/{id}",
get(get_script::<R>)
.put(update_script::<R>)
.delete(delete_script::<R>),
get(get_script::<R, L>)
.put(update_script::<R, L>)
.delete(delete_script::<R, L>),
)
.route("/scripts/{id}/logs", get(list_logs::<R, L>))
.with_state(state)
}
@@ -85,14 +97,14 @@ where
// Handlers
// ----------------------------------------------------------------------------
async fn list_scripts<R: ScriptRepository>(
State(state): State<AdminState<R>>,
async fn list_scripts<R: ScriptRepository, L: ExecutionLogRepository>(
State(state): State<AdminState<R, L>>,
) -> Result<Json<Vec<Script>>, ApiError> {
Ok(Json(state.repo.list().await?))
}
async fn get_script<R: ScriptRepository>(
State(state): State<AdminState<R>>,
async fn get_script<R: ScriptRepository, L: ExecutionLogRepository>(
State(state): State<AdminState<R, L>>,
Path(id): Path<ScriptId>,
) -> Result<Json<Script>, ApiError> {
state
@@ -103,8 +115,8 @@ async fn get_script<R: ScriptRepository>(
.ok_or(ApiError::NotFound(id))
}
async fn create_script<R: ScriptRepository>(
State(state): State<AdminState<R>>,
async fn create_script<R: ScriptRepository, L: ExecutionLogRepository>(
State(state): State<AdminState<R, L>>,
Json(input): Json<CreateScriptRequest>,
) -> Result<(StatusCode, Json<Script>), ApiError> {
state.validator.validate(&input.source)?;
@@ -121,8 +133,8 @@ async fn create_script<R: ScriptRepository>(
Ok((StatusCode::CREATED, Json(created)))
}
async fn update_script<R: ScriptRepository>(
State(state): State<AdminState<R>>,
async fn update_script<R: ScriptRepository, L: ExecutionLogRepository>(
State(state): State<AdminState<R, L>>,
Path(id): Path<ScriptId>,
Json(input): Json<UpdateScriptRequest>,
) -> Result<Json<Script>, ApiError> {
@@ -145,14 +157,39 @@ async fn update_script<R: ScriptRepository>(
Ok(Json(updated))
}
async fn delete_script<R: ScriptRepository>(
State(state): State<AdminState<R>>,
async fn delete_script<R: ScriptRepository, L: ExecutionLogRepository>(
State(state): State<AdminState<R, L>>,
Path(id): Path<ScriptId>,
) -> Result<StatusCode, ApiError> {
state.repo.delete(id).await?;
Ok(StatusCode::NO_CONTENT)
}
#[derive(Debug, Deserialize)]
pub struct LogsQuery {
#[serde(default = "default_limit")]
pub limit: i64,
#[serde(default)]
pub offset: i64,
}
const fn default_limit() -> i64 {
50
}
async fn list_logs<R: ScriptRepository, L: ExecutionLogRepository>(
State(state): State<AdminState<R, L>>,
Path(id): Path<ScriptId>,
axum::extract::Query(q): axum::extract::Query<LogsQuery>,
) -> Result<Json<Vec<ExecutionLog>>, ApiError> {
// Cap to keep the dashboard responsive; the data plane writes are
// unbounded over time so a paged read is the only sane default.
let limit = q.limit.clamp(1, 200);
let offset = q.offset.max(0);
let logs = state.logs.list_for_script(id, limit, offset).await?;
Ok(Json(logs))
}
// ----------------------------------------------------------------------------
// Errors
// ----------------------------------------------------------------------------

View File

@@ -5,12 +5,14 @@
//! manager will publish change events.
pub mod api;
pub mod log_sink;
pub mod migrations;
pub mod repo;
pub mod scheduler;
pub use api::{admin_router, AdminState};
pub use log_sink::PostgresExecutionLogSink;
pub use repo::{
NewScript, PostgresScriptRepository, RepoResolver, ScriptPatch, ScriptRepository,
ScriptRepositoryError,
ExecutionLogRepository, NewScript, PostgresExecutionLogRepository, PostgresScriptRepository,
RepoResolver, ScriptPatch, ScriptRepository, ScriptRepositoryError,
};

View File

@@ -0,0 +1,57 @@
use async_trait::async_trait;
use picloud_shared::{ExecutionLog, ExecutionLogSink, LogSinkError};
use sqlx::PgPool;
/// Persists `ExecutionLog` rows to the `execution_logs` table.
///
/// In cluster mode this impl lives in the manager and is reachable
/// from orchestrator nodes via an HTTP wrapper; in single-process MVP
/// mode the orchestrator's `DataPlaneState` holds it directly.
pub struct PostgresExecutionLogSink {
pool: PgPool,
}
impl PostgresExecutionLogSink {
#[must_use]
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl ExecutionLogSink for PostgresExecutionLogSink {
async fn record(&self, log: ExecutionLog) -> Result<(), LogSinkError> {
let headers = serde_json::to_value(&log.request_headers)
.map_err(|e| LogSinkError::Backend(format!("encode headers: {e}")))?;
let response_code = log.response_code.map(i32::from);
let duration_ms = i32::try_from(log.duration_ms).unwrap_or(i32::MAX);
sqlx::query(
"INSERT INTO execution_logs ( \
id, script_id, request_id, \
request_path, request_headers, request_body, \
response_code, response_body, \
logs, duration_ms, status, created_at \
) VALUES ( \
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12 \
)",
)
.bind(log.id)
.bind(log.script_id.into_inner())
.bind(log.request_id.into_inner())
.bind(&log.request_path)
.bind(headers)
.bind(&log.request_body)
.bind(response_code)
.bind(&log.response_body)
.bind(&log.script_logs)
.bind(duration_ms)
.bind(log.status.as_str())
.bind(log.created_at)
.execute(&self.pool)
.await
.map_err(|e| LogSinkError::Backend(e.to_string()))?;
Ok(())
}
}

View File

@@ -1,6 +1,8 @@
use std::collections::BTreeMap;
use async_trait::async_trait;
use picloud_orchestrator_core::{ResolverError, ScriptResolver};
use picloud_shared::{Script, ScriptId};
use picloud_shared::{ExecutionLog, ExecutionStatus, RequestId, Script, ScriptId};
use sqlx::PgPool;
#[derive(Debug, thiserror::Error)]
@@ -217,3 +219,102 @@ impl<R: ScriptRepository> ScriptResolver for RepoResolver<R> {
.map_err(|e| ResolverError::Backend(e.to_string()))
}
}
// ----------------------------------------------------------------------------
// Execution log repository (read side)
// ----------------------------------------------------------------------------
/// Read-side access to the `execution_logs` table. Writes go through
/// `PostgresExecutionLogSink` so the read and write paths can diverge
/// in cluster mode without disturbing this trait.
#[async_trait]
pub trait ExecutionLogRepository: Send + Sync {
async fn list_for_script(
&self,
script_id: ScriptId,
limit: i64,
offset: i64,
) -> Result<Vec<ExecutionLog>, ScriptRepositoryError>;
}
pub struct PostgresExecutionLogRepository {
pool: PgPool,
}
impl PostgresExecutionLogRepository {
#[must_use]
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl ExecutionLogRepository for PostgresExecutionLogRepository {
async fn list_for_script(
&self,
script_id: ScriptId,
limit: i64,
offset: i64,
) -> Result<Vec<ExecutionLog>, ScriptRepositoryError> {
let rows = sqlx::query_as::<_, ExecutionLogRow>(
"SELECT id, script_id, request_id, \
request_path, request_headers, request_body, \
response_code, response_body, \
logs, duration_ms, status, created_at \
FROM execution_logs \
WHERE script_id = $1 \
ORDER BY created_at DESC \
LIMIT $2 OFFSET $3",
)
.bind(script_id.into_inner())
.bind(limit)
.bind(offset)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(Into::into).collect())
}
}
#[derive(sqlx::FromRow)]
struct ExecutionLogRow {
id: uuid::Uuid,
script_id: uuid::Uuid,
request_id: uuid::Uuid,
request_path: Option<String>,
request_headers: serde_json::Value,
request_body: Option<serde_json::Value>,
response_code: Option<i32>,
response_body: Option<serde_json::Value>,
logs: serde_json::Value,
duration_ms: i32,
status: String,
created_at: chrono::DateTime<chrono::Utc>,
}
impl From<ExecutionLogRow> for ExecutionLog {
fn from(r: ExecutionLogRow) -> Self {
let headers: BTreeMap<String, String> =
serde_json::from_value(r.request_headers).unwrap_or_default();
let status = match r.status.as_str() {
"success" => ExecutionStatus::Success,
"timeout" => ExecutionStatus::Timeout,
"budget_exceeded" => ExecutionStatus::BudgetExceeded,
_ => ExecutionStatus::Error,
};
Self {
id: r.id,
script_id: r.script_id.into(),
request_id: RequestId::from(r.request_id),
request_path: r.request_path.unwrap_or_default(),
request_headers: headers,
request_body: r.request_body.unwrap_or(serde_json::Value::Null),
response_code: r.response_code.and_then(|c| u16::try_from(c).ok()),
response_body: r.response_body,
script_logs: r.logs,
duration_ms: u64::try_from(r.duration_ms).unwrap_or(0),
status,
created_at: r.created_at,
}
}
}

View File

@@ -14,20 +14,22 @@ use axum::{
routing::post,
Json, Router,
};
use picloud_executor_core::{ExecError, ExecRequest, InvocationType};
use picloud_shared::{ExecutionId, RequestId, ScriptId};
use chrono::Utc;
use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType};
use picloud_shared::{
ExecutionId, ExecutionLog, ExecutionLogSink, ExecutionStatus, RequestId, ScriptId,
};
use serde_json::Value as Json_;
use uuid::Uuid;
use crate::client::ExecutorClient;
use crate::resolver::{ResolverError, ScriptResolver};
/// State shared by data-plane handlers.
///
/// Both fields are `Arc` because handlers run concurrently; the
/// underlying impls are `Send + Sync` (enforced by their traits).
pub struct DataPlaneState<E, R> {
pub executor: Arc<E>,
pub resolver: Arc<R>,
pub log_sink: Arc<dyn ExecutionLogSink>,
}
impl<E, R> Clone for DataPlaneState<E, R> {
@@ -35,6 +37,7 @@ impl<E, R> Clone for DataPlaneState<E, R> {
Self {
executor: self.executor.clone(),
resolver: self.resolver.clone(),
log_sink: self.log_sink.clone(),
}
}
}
@@ -71,11 +74,35 @@ where
.ok_or(ApiError::NotFound(id))?;
let req = build_exec_request(id, &script.name, &headers, &body)?;
let request_id = req.request_id;
let request_path = req.path.clone();
let request_headers = req.headers.clone();
let request_body = req.body.clone();
let timeout = Duration::from_secs(u64::from(script.timeout_seconds));
let resp = state.executor.execute(&script.source, req, timeout).await?;
let started = Utc::now();
let outcome = state.executor.execute(&script.source, req, timeout).await;
let finished = Utc::now();
Ok(exec_response_to_http(resp))
// Build and dispatch the audit log regardless of outcome. We await
// the sink — recording the trail is part of correctness for an
// audit-visible platform — but a sink failure must not mask the
// user-facing result, so we only log a warning if it fails.
let log = build_execution_log(
id,
request_id,
request_path,
request_headers,
request_body,
&outcome,
started,
finished,
);
if let Err(e) = state.log_sink.record(log).await {
tracing::warn!(error = %e, script_id = %id, "failed to persist execution log");
}
Ok(exec_response_to_http(outcome?))
}
// ----------------------------------------------------------------------------
@@ -114,7 +141,7 @@ fn build_exec_request(
})
}
fn exec_response_to_http(resp: picloud_executor_core::ExecResponse) -> Response {
fn exec_response_to_http(resp: ExecResponse) -> Response {
let status =
StatusCode::from_u16(resp.status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
@@ -124,7 +151,6 @@ fn exec_response_to_http(resp: picloud_executor_core::ExecResponse) -> Response
http_headers.insert(name, value);
}
}
// Default content type to JSON; the script can override via `headers`.
http_headers
.entry(axum::http::header::CONTENT_TYPE)
.or_insert_with(|| HeaderValue::from_static("application/json"));
@@ -132,6 +158,66 @@ fn exec_response_to_http(resp: picloud_executor_core::ExecResponse) -> Response
(status, http_headers, Json(resp.body)).into_response()
}
#[allow(clippy::too_many_arguments)]
fn build_execution_log(
script_id: ScriptId,
request_id: RequestId,
request_path: String,
request_headers: BTreeMap<String, String>,
request_body: Json_,
outcome: &Result<ExecResponse, ExecError>,
started: chrono::DateTime<Utc>,
finished: chrono::DateTime<Utc>,
) -> ExecutionLog {
let duration_ms = u64::try_from(
finished
.signed_duration_since(started)
.num_milliseconds()
.max(0),
)
.unwrap_or(0);
let (status, response_code, response_body, script_logs) = match outcome {
Ok(resp) => {
let logs = serde_json::to_value(&resp.logs).unwrap_or(Json_::Array(vec![]));
(
ExecutionStatus::Success,
Some(resp.status_code),
Some(resp.body.clone()),
logs,
)
}
Err(e) => {
let status = match e {
ExecError::Timeout(_) => ExecutionStatus::Timeout,
ExecError::OperationBudgetExceeded => ExecutionStatus::BudgetExceeded,
_ => ExecutionStatus::Error,
};
(
status,
None,
Some(serde_json::json!({ "error": e.to_string() })),
Json_::Array(vec![]),
)
}
};
ExecutionLog {
id: Uuid::new_v4(),
script_id,
request_id,
request_path,
request_headers,
request_body,
response_code,
response_body,
script_logs,
duration_ms,
status,
created_at: started,
}
}
// ----------------------------------------------------------------------------
// Errors
// ----------------------------------------------------------------------------

View File

@@ -8,6 +8,9 @@ license.workspace = true
[lints]
workspace = true
[lib]
path = "src/lib.rs"
[[bin]]
name = "picloud"
path = "src/main.rs"
@@ -31,3 +34,8 @@ thiserror.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
figment.workspace = true
[dev-dependencies]
axum-test = "17"
serde.workspace = true
serde_json.workspace = true

114
crates/picloud/src/lib.rs Normal file
View File

@@ -0,0 +1,114 @@
//! Library half of the picloud all-in-one. `main.rs` is a thin wrapper
//! that opens the pool, runs migrations, calls `build_app`, and binds
//! the listener. Tests use the same `build_app` against an
//! ephemeral test database.
use std::sync::Arc;
use std::time::Duration;
use axum::{routing::get, Router};
use picloud_executor_core::{Engine, Limits};
use picloud_manager_core::{
admin_router, AdminState, PostgresExecutionLogRepository, PostgresExecutionLogSink,
PostgresScriptRepository, RepoResolver,
};
use picloud_orchestrator_core::{data_plane_router, DataPlaneState, LocalExecutorClient};
use picloud_shared::{ExecutionLogSink, ScriptValidator};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use tower_http::trace::TraceLayer;
/// Compose the manager + orchestrator routes on top of a shared
/// Postgres pool, returning an Axum router ready to be served.
pub fn build_app(pool: PgPool) -> Router {
let engine = Arc::new(Engine::new(Limits::default()));
let script_repo = Arc::new(PostgresScriptRepository::new(pool.clone()));
let log_repo = Arc::new(PostgresExecutionLogRepository::new(pool.clone()));
let log_sink: Arc<dyn ExecutionLogSink> = Arc::new(PostgresExecutionLogSink::new(pool));
let resolver = Arc::new(RepoResolver::new(PostgresScriptRepoHandle(
script_repo.clone(),
)));
let executor = Arc::new(LocalExecutorClient::new(engine.clone()));
let admin = AdminState {
repo: Arc::new(PostgresScriptRepoHandle(script_repo)),
logs: log_repo,
validator: engine as Arc<dyn ScriptValidator>,
};
let data_plane = DataPlaneState {
executor,
resolver,
log_sink,
};
Router::new()
.route("/healthz", get(healthz))
.route("/", get(root))
.nest("/api/admin", admin_router(admin))
.nest("/api", data_plane_router(data_plane))
.layer(TraceLayer::new_for_http())
}
/// Open a Postgres pool with the binary's standard timeout settings.
/// Exposed so tests reach for the same configuration when needed.
pub async fn init_db(url: &str) -> anyhow::Result<PgPool> {
let pool = PgPoolOptions::new()
.max_connections(10)
.acquire_timeout(Duration::from_secs(5))
.connect(url)
.await?;
Ok(pool)
}
async fn healthz() -> &'static str {
"ok"
}
async fn root() -> &'static str {
"picloud — see /api/admin/* (manager) and /api/execute/* (orchestrator)"
}
// ----------------------------------------------------------------------------
// Bridge: a single `PostgresScriptRepository` Arc is shared between the
// admin router (writes) and the resolver (reads). The resolver wants
// owned `impl ScriptRepository`, so we wrap the Arc in a delegating
// handle here rather than instantiating two repos against the same pool.
// ----------------------------------------------------------------------------
struct PostgresScriptRepoHandle(Arc<PostgresScriptRepository>);
#[async_trait::async_trait]
impl picloud_manager_core::ScriptRepository for PostgresScriptRepoHandle {
async fn get(
&self,
id: picloud_shared::ScriptId,
) -> Result<Option<picloud_shared::Script>, picloud_manager_core::ScriptRepositoryError> {
self.0.get(id).await
}
async fn list(
&self,
) -> Result<Vec<picloud_shared::Script>, picloud_manager_core::ScriptRepositoryError> {
self.0.list().await
}
async fn create(
&self,
input: picloud_manager_core::NewScript,
) -> Result<picloud_shared::Script, picloud_manager_core::ScriptRepositoryError> {
self.0.create(input).await
}
async fn update(
&self,
id: picloud_shared::ScriptId,
patch: picloud_manager_core::ScriptPatch,
) -> Result<picloud_shared::Script, picloud_manager_core::ScriptRepositoryError> {
self.0.update(id, patch).await
}
async fn delete(
&self,
id: picloud_shared::ScriptId,
) -> Result<(), picloud_manager_core::ScriptRepositoryError> {
self.0.delete(id).await
}
}

View File

@@ -1,32 +1,11 @@
//! PiCloud all-in-one binary — manager + orchestrator + executor in
//! one process. The only binary built for MVP.
//!
//! On startup it opens the Postgres pool, runs migrations, builds the
//! Rhai engine, then nests both core routers behind a single Axum
//! listener:
//!
//! /api/admin/* → manager-core (script CRUD)
//! /api/execute/{id} → orchestrator-core (data plane)
//! /healthz → liveness probe
//!
//! Cluster-mode (v1.3+) keeps this layout — splits each nested router
//! into its own binary, swaps `LocalExecutorClient` for the remote one,
//! and points Caddy at the new upstreams.
//! PiCloud all-in-one binary — see `lib.rs` for the actual app
//! composition; this file is only the runtime shell (env config,
//! logger, migrations, listener).
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use axum::{routing::get, Router};
use picloud_executor_core::{Engine, Limits};
use picloud_manager_core::{
admin_router, migrations, AdminState, PostgresScriptRepository, RepoResolver,
};
use picloud_orchestrator_core::{data_plane_router, DataPlaneState, LocalExecutorClient};
use picloud_shared::ScriptValidator;
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use tower_http::trace::TraceLayer;
use picloud::{build_app, init_db};
use picloud_manager_core::migrations;
use tracing_subscriber::EnvFilter;
#[tokio::main]
@@ -61,45 +40,6 @@ fn init_tracing() {
.init();
}
async fn init_db(url: &str) -> anyhow::Result<PgPool> {
let pool = PgPoolOptions::new()
.max_connections(10)
.acquire_timeout(Duration::from_secs(5))
.connect(url)
.await?;
Ok(pool)
}
fn build_app(pool: PgPool) -> Router {
// Core services. The `Arc`s let the routers and any background
// tasks share the same instances cheaply.
let engine = Arc::new(Engine::new(Limits::default()));
let repo = Arc::new(PostgresScriptRepository::new(pool));
let resolver = Arc::new(RepoResolver::new(PostgresScriptRepoHandle(repo.clone())));
let executor = Arc::new(LocalExecutorClient::new(engine.clone()));
let admin = AdminState {
repo: Arc::new(PostgresScriptRepoHandle(repo)),
validator: engine as Arc<dyn ScriptValidator>,
};
let data_plane = DataPlaneState { executor, resolver };
Router::new()
.route("/healthz", get(healthz))
.route("/", get(root))
.nest("/api/admin", admin_router(admin))
.nest("/api", data_plane_router(data_plane))
.layer(TraceLayer::new_for_http())
}
async fn healthz() -> &'static str {
"ok"
}
async fn root() -> &'static str {
"picloud — see /api/admin/* (manager) and /api/execute/* (orchestrator)"
}
async fn shutdown_signal() {
let ctrl_c = async {
let _ = tokio::signal::ctrl_c().await;
@@ -119,46 +59,3 @@ async fn shutdown_signal() {
() = terminate => tracing::info!("SIGTERM received, draining"),
}
}
// ----------------------------------------------------------------------------
// Bridge: PostgresScriptRepository is constructed once and shared via
// Arc; `RepoResolver` wants ownership of an impl of `ScriptRepository`.
// We pass a thin wrapper that delegates to the Arc'd repo, so a single
// connection pool backs both the admin router and the resolver.
// ----------------------------------------------------------------------------
struct PostgresScriptRepoHandle(Arc<PostgresScriptRepository>);
#[async_trait::async_trait]
impl picloud_manager_core::ScriptRepository for PostgresScriptRepoHandle {
async fn get(
&self,
id: picloud_shared::ScriptId,
) -> Result<Option<picloud_shared::Script>, picloud_manager_core::ScriptRepositoryError> {
self.0.get(id).await
}
async fn list(
&self,
) -> Result<Vec<picloud_shared::Script>, picloud_manager_core::ScriptRepositoryError> {
self.0.list().await
}
async fn create(
&self,
input: picloud_manager_core::NewScript,
) -> Result<picloud_shared::Script, picloud_manager_core::ScriptRepositoryError> {
self.0.create(input).await
}
async fn update(
&self,
id: picloud_shared::ScriptId,
patch: picloud_manager_core::ScriptPatch,
) -> Result<picloud_shared::Script, picloud_manager_core::ScriptRepositoryError> {
self.0.update(id, patch).await
}
async fn delete(
&self,
id: picloud_shared::ScriptId,
) -> Result<(), picloud_manager_core::ScriptRepositoryError> {
self.0.delete(id).await
}
}

301
crates/picloud/tests/api.rs Normal file
View File

@@ -0,0 +1,301 @@
//! Integration tests over the full HTTP surface.
//!
//! These tests are `#[ignore]`d by default because they require a
//! running Postgres reachable via `DATABASE_URL`. To run them:
//!
//! docker compose up -d postgres
//! DATABASE_URL=postgres://picloud:picloud@127.0.0.1:15432/picloud \
//! cargo test -p picloud --test api -- --include-ignored
//!
//! Each `#[sqlx::test]` test runs against a freshly created database
//! with `manager-core`'s migrations applied; tests are isolated and
//! can run in parallel.
#![allow(clippy::needless_pass_by_value)]
use axum_test::TestServer;
use serde_json::{json, Value};
use sqlx::PgPool;
fn server(pool: PgPool) -> TestServer {
TestServer::new(picloud::build_app(pool)).expect("TestServer should build")
}
// ============================================================================
// Health
// ============================================================================
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn healthz_responds_ok(pool: PgPool) {
let r = server(pool).get("/healthz").await;
r.assert_status_ok();
assert_eq!(r.text(), "ok");
}
// ============================================================================
// Script CRUD
// ============================================================================
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn create_script_returns_201_with_full_record(pool: PgPool) {
let s = server(pool);
let r = s
.post("/api/admin/scripts")
.json(&json!({
"name": "echo",
"description": "test",
"source": "#{ statusCode: 200, body: 42 }",
}))
.await;
r.assert_status(axum::http::StatusCode::CREATED);
let body: Value = r.json();
assert_eq!(body["name"], "echo");
assert_eq!(body["version"], 1);
assert_eq!(body["timeout_seconds"], 30);
assert!(body["id"].as_str().is_some());
}
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn create_with_invalid_syntax_returns_422(pool: PgPool) {
let r = server(pool)
.post("/api/admin/scripts")
.json(&json!({ "name": "broken", "source": "@@@ not rhai @@@" }))
.await;
r.assert_status(axum::http::StatusCode::UNPROCESSABLE_ENTITY);
let body: Value = r.json();
assert!(body["error"].as_str().unwrap().contains("invalid script"));
}
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn duplicate_name_returns_409(pool: PgPool) {
let s = server(pool);
s.post("/api/admin/scripts")
.json(&json!({ "name": "dup", "source": "42" }))
.await
.assert_status(axum::http::StatusCode::CREATED);
let r = s
.post("/api/admin/scripts")
.json(&json!({ "name": "dup", "source": "43" }))
.await;
r.assert_status(axum::http::StatusCode::CONFLICT);
}
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn list_returns_all_scripts(pool: PgPool) {
let s = server(pool);
for name in ["alpha", "bravo", "charlie"] {
s.post("/api/admin/scripts")
.json(&json!({ "name": name, "source": "1" }))
.await
.assert_status(axum::http::StatusCode::CREATED);
}
let r = s.get("/api/admin/scripts").await;
r.assert_status_ok();
let body: Vec<Value> = r.json();
assert_eq!(body.len(), 3);
let names: Vec<&str> = body.iter().map(|s| s["name"].as_str().unwrap()).collect();
assert_eq!(names, vec!["alpha", "bravo", "charlie"]);
}
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn update_bumps_version_and_persists_changes(pool: PgPool) {
let s = server(pool);
let created: Value = s
.post("/api/admin/scripts")
.json(&json!({ "name": "u", "source": "1" }))
.await
.json();
let id = created["id"].as_str().unwrap();
let r = s
.put(&format!("/api/admin/scripts/{id}"))
.json(&json!({ "source": "#{ statusCode: 200, body: \"v2\" }", "timeout_seconds": 60 }))
.await;
r.assert_status_ok();
let updated: Value = r.json();
assert_eq!(updated["version"], 2);
assert_eq!(updated["timeout_seconds"], 60);
assert!(updated["source"].as_str().unwrap().contains("v2"));
}
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn update_with_invalid_source_returns_422(pool: PgPool) {
let s = server(pool);
let created: Value = s
.post("/api/admin/scripts")
.json(&json!({ "name": "u", "source": "1" }))
.await
.json();
let id = created["id"].as_str().unwrap();
let r = s
.put(&format!("/api/admin/scripts/{id}"))
.json(&json!({ "source": "@@@ broken @@@" }))
.await;
r.assert_status(axum::http::StatusCode::UNPROCESSABLE_ENTITY);
}
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn delete_then_get_returns_404(pool: PgPool) {
let s = server(pool);
let created: Value = s
.post("/api/admin/scripts")
.json(&json!({ "name": "d", "source": "1" }))
.await
.json();
let id = created["id"].as_str().unwrap();
s.delete(&format!("/api/admin/scripts/{id}"))
.await
.assert_status(axum::http::StatusCode::NO_CONTENT);
s.get(&format!("/api/admin/scripts/{id}"))
.await
.assert_status_not_found();
}
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn get_nonexistent_returns_404(pool: PgPool) {
let r = server(pool)
.get("/api/admin/scripts/00000000-0000-0000-0000-000000000000")
.await;
r.assert_status_not_found();
}
// ============================================================================
// Execution + audit logs
// ============================================================================
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn execute_echoes_body_back(pool: PgPool) {
let s = server(pool);
let created: Value = s
.post("/api/admin/scripts")
.json(&json!({
"name": "echo",
"source": "#{ statusCode: 200, body: ctx.request.body }",
}))
.await
.json();
let id = created["id"].as_str().unwrap();
let r = s
.post(&format!("/api/execute/{id}"))
.json(&json!({ "n": 42 }))
.await;
r.assert_status_ok();
let body: Value = r.json();
assert_eq!(body, json!({ "n": 42 }));
}
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn execute_passes_through_status_and_headers(pool: PgPool) {
let s = server(pool);
let created: Value = s
.post("/api/admin/scripts")
.json(&json!({
"name": "header-test",
"source": "#{ statusCode: 201, headers: #{ \"x-tag\": \"on\" }, body: 1 }",
}))
.await
.json();
let id = created["id"].as_str().unwrap();
let r = s.post(&format!("/api/execute/{id}")).json(&json!({})).await;
r.assert_status(axum::http::StatusCode::CREATED);
assert_eq!(r.header("x-tag"), "on");
}
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn execute_nonexistent_returns_404(pool: PgPool) {
let r = server(pool)
.post("/api/execute/00000000-0000-0000-0000-000000000000")
.json(&json!({}))
.await;
r.assert_status_not_found();
}
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn execution_logs_capture_invocations(pool: PgPool) {
let s = server(pool);
let created: Value = s
.post("/api/admin/scripts")
.json(&json!({
"name": "logger",
"source": "log::info(\"called\", #{ marker: 7 }); #{ statusCode: 200, body: \"done\" }",
}))
.await
.json();
let id = created["id"].as_str().unwrap();
// No logs yet.
let r = s.get(&format!("/api/admin/scripts/{id}/logs")).await;
r.assert_status_ok();
let logs: Vec<Value> = r.json();
assert!(logs.is_empty());
// Two invocations.
s.post(&format!("/api/execute/{id}"))
.json(&json!({ "first": true }))
.await
.assert_status_ok();
s.post(&format!("/api/execute/{id}"))
.json(&json!({ "second": true }))
.await
.assert_status_ok();
let logs: Vec<Value> = s.get(&format!("/api/admin/scripts/{id}/logs")).await.json();
assert_eq!(logs.len(), 2);
// Most-recent-first ordering.
assert_eq!(logs[0]["request_body"], json!({ "second": true }));
assert_eq!(logs[1]["request_body"], json!({ "first": true }));
// Status + response shape captured.
assert_eq!(logs[0]["status"], "success");
assert_eq!(logs[0]["response_code"], 200);
assert_eq!(logs[0]["response_body"], json!("done"));
// Script-side log entries captured.
let entries = logs[0]["script_logs"].as_array().unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0]["level"], "info");
assert_eq!(entries[0]["message"], "called");
assert_eq!(entries[0]["data"], json!({ "marker": 7 }));
}
#[ignore = "needs DATABASE_URL pointing at a running Postgres"]
#[sqlx::test(migrations = "../manager-core/migrations")]
async fn execution_errors_are_still_logged(pool: PgPool) {
let s = server(pool);
let created: Value = s
.post("/api/admin/scripts")
.json(&json!({
"name": "boom",
"source": "1 / 0",
}))
.await
.json();
let id = created["id"].as_str().unwrap();
let r = s.post(&format!("/api/execute/{id}")).json(&json!({})).await;
r.assert_status(axum::http::StatusCode::BAD_GATEWAY);
let logs: Vec<Value> = s.get(&format!("/api/admin/scripts/{id}/logs")).await.json();
assert_eq!(logs.len(), 1);
assert_eq!(logs[0]["status"], "error");
assert!(logs[0]["response_body"]["error"].is_string());
}

View File

@@ -9,6 +9,7 @@ license.workspace = true
workspace = true
[dependencies]
async-trait.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true

View File

@@ -0,0 +1,54 @@
use std::collections::BTreeMap;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{RequestId, ScriptId};
/// One row in the `execution_logs` table. Same shape flows through the
/// `ExecutionLogSink` trait and the `GET /scripts/{id}/logs` response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionLog {
pub id: Uuid,
pub script_id: ScriptId,
pub request_id: RequestId,
pub request_path: String,
pub request_headers: BTreeMap<String, String>,
pub request_body: serde_json::Value,
pub response_code: Option<u16>,
pub response_body: Option<serde_json::Value>,
/// `log::*` entries captured during the execution, serialized as a
/// JSON array of `{timestamp, level, message, data}` objects.
pub script_logs: serde_json::Value,
pub duration_ms: u64,
pub status: ExecutionStatus,
pub created_at: DateTime<Utc>,
}
/// Matches the CHECK constraint on `execution_logs.status`. Keep the
/// serde rename in sync with the migration.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExecutionStatus {
Success,
Error,
Timeout,
BudgetExceeded,
}
impl ExecutionStatus {
#[must_use]
pub fn as_str(self) -> &'static str {
match self {
Self::Success => "success",
Self::Error => "error",
Self::Timeout => "timeout",
Self::BudgetExceeded => "budget_exceeded",
}
}
}

View File

@@ -5,11 +5,15 @@
//! entity, error roots, transport DTOs).
pub mod error;
pub mod execution_log;
pub mod ids;
pub mod log_sink;
pub mod script;
pub mod validator;
pub use error::Error;
pub use execution_log::{ExecutionLog, ExecutionStatus};
pub use ids::{ExecutionId, RequestId, ScriptId};
pub use log_sink::{ExecutionLogSink, LogSinkError};
pub use script::Script;
pub use validator::{ScriptValidator, ValidationError};

View File

@@ -0,0 +1,22 @@
//! Abstraction over how execution logs are recorded.
//!
//! Lives in `shared` so the orchestrator can append logs without
//! depending on `manager-core`. In single-process MVP mode, the
//! manager's Postgres sink is plugged in directly. In cluster mode
//! (v1.3+) the orchestrator's impl will post over HTTP to the manager.
use async_trait::async_trait;
use thiserror::Error;
use crate::execution_log::ExecutionLog;
#[derive(Debug, Error)]
pub enum LogSinkError {
#[error("sink backend error: {0}")]
Backend(String),
}
#[async_trait]
pub trait ExecutionLogSink: Send + Sync {
async fn record(&self, log: ExecutionLog) -> Result<(), LogSinkError>;
}