- `POST /api/v1/admin/scripts/{id}/routes` returns 400 when the
target script is `kind=module`. Modules have no entry point — they
are imported, not invoked.
- `POST /api/v1/admin/apps/{id}/triggers/{kv,docs,dead_letter}` gain
a shared `validate_trigger_target` that loads the target script
and rejects when:
- the script doesn't exist
- the script belongs to a different app (latent v1.1.1/v1.1.2 gap
where triggers could target a script in any app — closed here)
- the script is `kind=module`
- `TriggersState` grows a `scripts: Arc<dyn ScriptRepository>` field
so handlers can load the target script.
- Trigger-create test helpers split into `state_with` (empty script
repo — for tests asserting upstream errors) and
`state_with_endpoint` (pre-populated — for tests asserting
successful creation). `InMemoryScriptRepo` added to the test
module.
Workspace builds; full test suite (~440 tests) green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
444 lines
18 KiB
Rust
444 lines
18 KiB
Rust
//! Library half of the picloud all-in-one. `main.rs` is a thin wrapper
|
|
//! that opens the pool, runs migrations, calls `build_app`, and binds
|
|
//! the listener. Tests use the same `build_app` against an
|
|
//! ephemeral test database.
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use axum::middleware::from_fn_with_state;
|
|
use axum::{routing::get, Json, Router};
|
|
use picloud_executor_core::{Engine, Limits};
|
|
use picloud_manager_core::{
|
|
admin_router, admins_router, api_keys_router, app_members_router, apps_api, apps_router,
|
|
attach_principal_if_present, auth_router, compile_routes, dead_letters_router, migrations,
|
|
require_authenticated, route_admin_router, triggers_router, AbandonedRepo,
|
|
AdminPrincipalResolver, AdminSessionRepository, AdminState, AdminUserRepository, AdminsState,
|
|
ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState,
|
|
AppRepository, AppsState, AuthState, AuthzRepo, DeadLetterRepo, DeadLettersState, Dispatcher,
|
|
DocsServiceImpl, KvServiceImpl, OutboxEventEmitter, OutboxRepo, PostgresAbandonedRepo,
|
|
PostgresAdminSessionRepository, PostgresAdminUserRepository, PostgresApiKeyRepository,
|
|
PostgresAppDomainRepository, PostgresAppMembersRepository, PostgresAppRepository,
|
|
PostgresDeadLetterRepo, PostgresDeadLetterService, PostgresDocsRepo,
|
|
PostgresExecutionLogRepository, PostgresExecutionLogSink, PostgresKvRepo, PostgresOutboxRepo,
|
|
PostgresRouteRepository, PostgresScriptRepository, PostgresTriggerRepo, PrincipalResolver,
|
|
RepoResolver, RouteAdminState, RouteRepository, SandboxCeiling, ScriptRepository,
|
|
TriggerConfig, TriggerRepo, TriggersState,
|
|
};
|
|
use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable};
|
|
use picloud_orchestrator_core::{
|
|
data_plane_router, user_routes_router, DataPlaneState, ExecutionGate, InboxRegistry,
|
|
LocalExecutorClient,
|
|
};
|
|
use picloud_shared::{
|
|
DeadLetterService, DocsService, ExecutionLogSink, InboxResolver, KvService, OutboxWriter,
|
|
ScriptValidator, ServiceEventEmitter, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION,
|
|
WIRE_VERSION,
|
|
};
|
|
use sqlx::postgres::PgPoolOptions;
|
|
use sqlx::PgPool;
|
|
use tower_http::trace::TraceLayer;
|
|
|
|
/// Default session TTL when `PICLOUD_SESSION_TTL_HOURS` isn't set.
|
|
const DEFAULT_SESSION_TTL_HOURS: u64 = 24;
|
|
|
|
/// Bundles the auth-related dependencies that both `build_app` and the
|
|
/// startup bootstrap need. Built once in `main.rs` from the shared pool.
|
|
pub struct AuthDeps {
|
|
pub users: Arc<dyn AdminUserRepository>,
|
|
pub sessions: Arc<dyn AdminSessionRepository>,
|
|
pub keys: Arc<dyn ApiKeyRepository>,
|
|
pub ttl: Duration,
|
|
}
|
|
|
|
impl AuthDeps {
|
|
/// Construct from a pool with the binary's standard defaults.
|
|
#[must_use]
|
|
pub fn from_pool(pool: PgPool) -> Self {
|
|
Self {
|
|
users: Arc::new(PostgresAdminUserRepository::new(pool.clone())),
|
|
sessions: Arc::new(PostgresAdminSessionRepository::new(pool.clone())),
|
|
keys: Arc::new(PostgresApiKeyRepository::new(pool)),
|
|
ttl: read_session_ttl(),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn read_session_ttl() -> Duration {
|
|
let hours = std::env::var("PICLOUD_SESSION_TTL_HOURS")
|
|
.ok()
|
|
.and_then(|s| s.parse::<u64>().ok())
|
|
.filter(|h| *h > 0)
|
|
.unwrap_or(DEFAULT_SESSION_TTL_HOURS);
|
|
Duration::from_secs(hours * 3600)
|
|
}
|
|
|
|
/// Compose the manager + orchestrator routes on top of a shared
|
|
/// Postgres pool, returning an Axum router ready to be served.
|
|
///
|
|
/// All API routes live under `/api/v{API_VERSION}/...`. The dashboard
|
|
/// is mounted by Caddy at `/admin/*` (its base path). Anything else
|
|
/// falls through to the user-route table — user scripts can bind to
|
|
/// arbitrary paths (subject to the reserved-prefix list).
|
|
///
|
|
/// `auth` carries the admin user/session repositories and the
|
|
/// configured session TTL. The manager-side admin endpoints
|
|
/// (`/api/v1/admin/scripts/*`, `/api/v1/admin/routes/*`,
|
|
/// `/api/v1/admin/admins/*`, `/api/v1/admin/auth/me`) are guarded by
|
|
/// the `require_admin` middleware. The data plane
|
|
/// (`/api/v1/execute/{id}`, the user-route fallthrough, `/healthz`,
|
|
/// `/version`) stays open — it's the public ingress for user scripts.
|
|
#[allow(clippy::too_many_lines)]
|
|
pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
|
|
let script_repo = Arc::new(PostgresScriptRepository::new(pool.clone()));
|
|
let log_repo = Arc::new(PostgresExecutionLogRepository::new(pool.clone()));
|
|
let log_sink: Arc<dyn ExecutionLogSink> = Arc::new(PostgresExecutionLogSink::new(pool.clone()));
|
|
let route_repo = Arc::new(PostgresRouteRepository::new(pool.clone()));
|
|
let apps_repo: Arc<dyn AppRepository> = Arc::new(PostgresAppRepository::new(pool.clone()));
|
|
let domains_repo: Arc<dyn AppDomainRepository> =
|
|
Arc::new(PostgresAppDomainRepository::new(pool.clone()));
|
|
// The Postgres app_members repo implements both `AppMembersRepository`
|
|
// (CRUD over the table) and `AuthzRepo` (single-row membership lookup
|
|
// for capability checks). Construct it once and clone the Arc into
|
|
// both trait views — same allocation, two vtables.
|
|
let members_concrete = Arc::new(PostgresAppMembersRepository::new(pool.clone()));
|
|
let members: Arc<dyn AppMembersRepository> = members_concrete.clone();
|
|
let authz: Arc<dyn AuthzRepo> = members_concrete;
|
|
|
|
// Triggers framework storage. The outbox event emitter routes
|
|
// KV mutations into the outbox; the dispatcher fans them out.
|
|
let trigger_repo: Arc<dyn TriggerRepo> = Arc::new(PostgresTriggerRepo::new(pool.clone()));
|
|
// PostgresOutboxRepo implements both `OutboxRepo` (the dispatcher
|
|
// surface) and `OutboxWriter` (the orchestrator surface). Construct
|
|
// the concrete Arc once, clone it into each trait view — same
|
|
// allocation, two vtables (mirrors how `members_concrete` above is
|
|
// used as both `AppMembersRepository` and `AuthzRepo`).
|
|
let outbox_concrete = Arc::new(PostgresOutboxRepo::new(pool.clone()));
|
|
let outbox_repo: Arc<dyn OutboxRepo> = outbox_concrete.clone();
|
|
let outbox_writer: Arc<dyn OutboxWriter> = outbox_concrete;
|
|
let dl_repo: Arc<dyn DeadLetterRepo> = Arc::new(PostgresDeadLetterRepo::new(pool.clone()));
|
|
let abandoned_repo: Arc<dyn AbandonedRepo> = Arc::new(PostgresAbandonedRepo::new(pool.clone()));
|
|
let trigger_config = TriggerConfig::from_env();
|
|
|
|
// SDK services bundle. v1.1.1 added KV + dead-letter; v1.1.2 added
|
|
// the docs store; v1.1.3 adds the module source backing the Rhai
|
|
// resolver. All bound services share the outbox-backed event
|
|
// emitter so KV and docs mutations both fan out through the same
|
|
// dispatcher.
|
|
let kv_repo = Arc::new(PostgresKvRepo::new(pool.clone()));
|
|
let docs_repo = Arc::new(PostgresDocsRepo::new(pool.clone()));
|
|
let events: Arc<dyn ServiceEventEmitter> = Arc::new(OutboxEventEmitter::new(
|
|
trigger_repo.clone(),
|
|
outbox_repo.clone(),
|
|
));
|
|
let kv: Arc<dyn KvService> =
|
|
Arc::new(KvServiceImpl::new(kv_repo, authz.clone(), events.clone()));
|
|
let docs: Arc<dyn DocsService> = Arc::new(DocsServiceImpl::new(
|
|
docs_repo,
|
|
authz.clone(),
|
|
events.clone(),
|
|
));
|
|
let dl_service: Arc<dyn DeadLetterService> = Arc::new(PostgresDeadLetterService::new(
|
|
dl_repo.clone(),
|
|
outbox_repo.clone(),
|
|
authz.clone(),
|
|
));
|
|
let modules: Arc<dyn picloud_shared::ModuleSource> =
|
|
Arc::new(picloud_manager_core::PostgresModuleSource::new(pool));
|
|
let services = Services::new(kv, docs, dl_service.clone(), events, modules);
|
|
let engine = Arc::new(Engine::new(Limits::default(), services));
|
|
|
|
// Compile the routes table once at startup; admin writes refresh it.
|
|
let route_table = Arc::new(RouteTable::new());
|
|
let initial = route_repo.list_all().await?;
|
|
let compiled = compile_routes(&initial)
|
|
.map_err(|e| anyhow::anyhow!("failed to compile stored routes: {e}"))?;
|
|
route_table.replace_all(compiled);
|
|
|
|
// Same shape for app domains (Host → app_id cache).
|
|
let app_domain_table = Arc::new(AppDomainTable::new());
|
|
let initial_domains = domains_repo.list_all().await?;
|
|
let compiled_domains: Vec<_> = initial_domains
|
|
.iter()
|
|
.filter_map(|d| {
|
|
picloud_orchestrator_core::routing::parse_app_domain(&d.pattern)
|
|
.ok()
|
|
.map(|p| picloud_orchestrator_core::routing::CompiledAppDomain {
|
|
app_id: d.app_id,
|
|
pattern: p.pattern,
|
|
shape_key: p.shape_key,
|
|
})
|
|
})
|
|
.collect();
|
|
app_domain_table.replace(compiled_domains);
|
|
|
|
let resolver = Arc::new(RepoResolver::new(PostgresScriptRepoHandle(
|
|
script_repo.clone(),
|
|
)));
|
|
// Single global gate — overflow is rejected with 503 + Retry-After.
|
|
// See `ExecutionGate` docs and `PICLOUD_MAX_CONCURRENT_EXECUTIONS`.
|
|
let gate = Arc::new(ExecutionGate::from_env());
|
|
let executor = Arc::new(LocalExecutorClient::new(engine.clone(), gate.clone()));
|
|
|
|
// Dispatcher — single tokio task that polls the outbox and routes
|
|
// due rows to the executor. Shares the `ExecutionGate` with sync
|
|
// HTTP per design notes §2 (one cap for everything).
|
|
let dispatcher_script_repo: Arc<dyn ScriptRepository> =
|
|
Arc::new(PostgresScriptRepoHandle(script_repo.clone()));
|
|
let principals: Arc<dyn PrincipalResolver> =
|
|
Arc::new(AdminPrincipalResolver::new(auth.users.clone()));
|
|
// The InboxRegistry is constructed once and shared between the
|
|
// orchestrator (registers receivers, awaits) and the dispatcher
|
|
// (delivers results). Two Arc views on the same allocation.
|
|
let inbox_registry = Arc::new(InboxRegistry::new());
|
|
let inbox_resolver: Arc<dyn InboxResolver> = inbox_registry.clone();
|
|
Dispatcher {
|
|
outbox: outbox_repo.clone(),
|
|
triggers: trigger_repo.clone(),
|
|
scripts: dispatcher_script_repo,
|
|
dead_letters: dl_repo.clone(),
|
|
abandoned: abandoned_repo.clone(),
|
|
principals,
|
|
executor: executor.clone(),
|
|
gate,
|
|
inbox: inbox_resolver,
|
|
config: trigger_config,
|
|
instance_id: format!("picloud-{}", std::process::id()),
|
|
}
|
|
.spawn();
|
|
|
|
let admin = AdminState {
|
|
repo: Arc::new(PostgresScriptRepoHandle(script_repo.clone())),
|
|
logs: log_repo,
|
|
apps: apps_repo.clone(),
|
|
authz: authz.clone(),
|
|
validator: engine as Arc<dyn ScriptValidator>,
|
|
sandbox_ceiling: SandboxCeiling::from_env(),
|
|
};
|
|
let route_admin = RouteAdminState {
|
|
routes: route_repo.clone(),
|
|
scripts: Arc::new(PostgresScriptRepoHandle(script_repo.clone())),
|
|
domains: domains_repo.clone(),
|
|
table: route_table.clone(),
|
|
authz: authz.clone(),
|
|
};
|
|
let data_plane = DataPlaneState {
|
|
executor,
|
|
resolver,
|
|
log_sink,
|
|
app_domains: app_domain_table.clone(),
|
|
routes: route_table,
|
|
inbox: inbox_registry,
|
|
outbox: outbox_writer,
|
|
};
|
|
// Weekly retention sweepers for dead_letters + abandoned_executions.
|
|
// Defaults: 30 days / 7 days (design notes §3 #9 + §4 retention).
|
|
picloud_manager_core::spawn_dead_letter_gc(
|
|
dl_repo.clone(),
|
|
trigger_config.dead_letter_retention_days,
|
|
);
|
|
picloud_manager_core::spawn_abandoned_gc(
|
|
abandoned_repo.clone(),
|
|
trigger_config.abandoned_retention_days,
|
|
);
|
|
let triggers_state = TriggersState {
|
|
triggers: trigger_repo,
|
|
apps: apps_repo.clone(),
|
|
authz: authz.clone(),
|
|
scripts: Arc::new(PostgresScriptRepoHandle(script_repo.clone())),
|
|
config: trigger_config,
|
|
};
|
|
let dead_letters_state = DeadLettersState {
|
|
repo: dl_repo,
|
|
service: dl_service,
|
|
apps: apps_repo.clone(),
|
|
authz: authz.clone(),
|
|
};
|
|
let apps_state = AppsState {
|
|
apps: apps_repo,
|
|
domains: domains_repo,
|
|
routes: route_repo,
|
|
domain_table: app_domain_table,
|
|
authz: authz.clone(),
|
|
};
|
|
|
|
let auth_state = AuthState {
|
|
users: auth.users.clone(),
|
|
sessions: auth.sessions.clone(),
|
|
keys: auth.keys.clone(),
|
|
ttl: auth.ttl,
|
|
};
|
|
let admins_state = AdminsState {
|
|
users: auth.users.clone(),
|
|
sessions: auth.sessions,
|
|
keys: auth.keys.clone(),
|
|
authz: authz.clone(),
|
|
};
|
|
let app_members_state = AppMembersState {
|
|
apps: apps_state.apps.clone(),
|
|
users: auth.users,
|
|
members,
|
|
authz,
|
|
};
|
|
let api_keys_state = ApiKeysState { keys: auth.keys };
|
|
|
|
// /admin/auth/login + /logout are unguarded by design (login is how
|
|
// you get in). /admin/auth/me applies the middleware internally so
|
|
// the same Router::with_state machinery composes cleanly. Everything
|
|
// else under /admin gets the require_authenticated layer; capability
|
|
// checks live in each handler (after the resource is loaded so the
|
|
// capability binds to the resource's actual app_id).
|
|
let guarded_admin = Router::new()
|
|
.merge(admin_router(admin))
|
|
.merge(route_admin_router(route_admin))
|
|
.merge(admins_router(admins_state))
|
|
.merge(apps_router(apps_state))
|
|
.merge(app_members_router(app_members_state))
|
|
.merge(api_keys_router(api_keys_state))
|
|
.merge(triggers_router(triggers_state))
|
|
.merge(dead_letters_router(dead_letters_state))
|
|
.layer(from_fn_with_state(
|
|
auth_state.clone(),
|
|
require_authenticated,
|
|
));
|
|
|
|
// Silence "unused import" lint on `apps_api` — we re-export via the
|
|
// facade above; the bare module path is retained so it's discoverable.
|
|
let _ = apps_api::AppsState::clone;
|
|
|
|
// Opportunistic principal extraction on every data-plane request.
|
|
// Always inserts `Extension<Option<Principal>>`: Some for authed
|
|
// ingress (bearer / cookie), None otherwise. Handlers depend on
|
|
// this layer being applied — scoped to the data-plane routers so
|
|
// the admin path (which uses `require_authenticated`) doesn't
|
|
// double-resolve the same token.
|
|
let data_plane_routed = data_plane_router(data_plane.clone()).layer(from_fn_with_state(
|
|
auth_state.clone(),
|
|
attach_principal_if_present,
|
|
));
|
|
let user_routes = user_routes_router(data_plane).layer(from_fn_with_state(
|
|
auth_state.clone(),
|
|
attach_principal_if_present,
|
|
));
|
|
|
|
let api_v1 = Router::new()
|
|
.nest("/admin", auth_router(auth_state))
|
|
.nest("/admin", guarded_admin)
|
|
.merge(data_plane_routed);
|
|
|
|
Ok(Router::new()
|
|
.route("/healthz", get(healthz))
|
|
.route("/version", get(version))
|
|
.nest(&format!("/api/v{API_VERSION}"), api_v1)
|
|
.merge(user_routes)
|
|
.layer(TraceLayer::new_for_http()))
|
|
}
|
|
|
|
/// Open a Postgres pool with the binary's standard timeout settings.
|
|
/// Exposed so tests reach for the same configuration when needed.
|
|
pub async fn init_db(url: &str) -> anyhow::Result<PgPool> {
|
|
let pool = PgPoolOptions::new()
|
|
.max_connections(10)
|
|
.acquire_timeout(Duration::from_secs(5))
|
|
.connect(url)
|
|
.await?;
|
|
Ok(pool)
|
|
}
|
|
|
|
async fn healthz() -> &'static str {
|
|
"ok"
|
|
}
|
|
|
|
/// Snapshot of every compatibility-surface version this process speaks
|
|
/// plus the operator-configured public base URL (so the dashboard can
|
|
/// render full URLs for user routes).
|
|
///
|
|
/// Source of truth: `shared::version`, the embedded migrations, and
|
|
/// the `PICLOUD_PUBLIC_BASE_URL` env var (default
|
|
/// `http://localhost:8000`).
|
|
async fn version() -> Json<serde_json::Value> {
|
|
let public_base_url = std::env::var("PICLOUD_PUBLIC_BASE_URL")
|
|
.unwrap_or_else(|_| "http://localhost:8000".to_string());
|
|
Json(serde_json::json!({
|
|
"product": PRODUCT_VERSION,
|
|
"sdk": SDK_VERSION,
|
|
"api": API_VERSION,
|
|
"schema": migrations::latest_version(),
|
|
"wire": WIRE_VERSION,
|
|
"public_base_url": public_base_url,
|
|
}))
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
// Bridge: a single `PostgresScriptRepository` Arc is shared between the
|
|
// admin router (writes) and the resolver (reads). The resolver wants
|
|
// owned `impl ScriptRepository`, so we wrap the Arc in a delegating
|
|
// handle here rather than instantiating two repos against the same pool.
|
|
// ----------------------------------------------------------------------------
|
|
|
|
struct PostgresScriptRepoHandle(Arc<PostgresScriptRepository>);
|
|
|
|
#[async_trait::async_trait]
|
|
impl picloud_manager_core::ScriptRepository for PostgresScriptRepoHandle {
|
|
async fn get(
|
|
&self,
|
|
id: picloud_shared::ScriptId,
|
|
) -> Result<Option<picloud_shared::Script>, picloud_manager_core::ScriptRepositoryError> {
|
|
self.0.get(id).await
|
|
}
|
|
async fn list(
|
|
&self,
|
|
) -> Result<Vec<picloud_shared::Script>, picloud_manager_core::ScriptRepositoryError> {
|
|
self.0.list().await
|
|
}
|
|
async fn list_for_app(
|
|
&self,
|
|
app_id: picloud_shared::AppId,
|
|
) -> Result<Vec<picloud_shared::Script>, picloud_manager_core::ScriptRepositoryError> {
|
|
self.0.list_for_app(app_id).await
|
|
}
|
|
async fn list_for_user(
|
|
&self,
|
|
user_id: picloud_shared::AdminUserId,
|
|
) -> Result<Vec<picloud_shared::Script>, picloud_manager_core::ScriptRepositoryError> {
|
|
self.0.list_for_user(user_id).await
|
|
}
|
|
async fn create(
|
|
&self,
|
|
input: picloud_manager_core::NewScript,
|
|
) -> Result<picloud_shared::Script, picloud_manager_core::ScriptRepositoryError> {
|
|
self.0.create(input).await
|
|
}
|
|
async fn update(
|
|
&self,
|
|
id: picloud_shared::ScriptId,
|
|
patch: picloud_manager_core::ScriptPatch,
|
|
) -> Result<picloud_shared::Script, picloud_manager_core::ScriptRepositoryError> {
|
|
self.0.update(id, patch).await
|
|
}
|
|
async fn delete(
|
|
&self,
|
|
id: picloud_shared::ScriptId,
|
|
) -> Result<(), picloud_manager_core::ScriptRepositoryError> {
|
|
self.0.delete(id).await
|
|
}
|
|
async fn count_routes_for_script(
|
|
&self,
|
|
script_id: picloud_shared::ScriptId,
|
|
) -> Result<i64, picloud_manager_core::ScriptRepositoryError> {
|
|
self.0.count_routes_for_script(script_id).await
|
|
}
|
|
async fn count_triggers_for_script(
|
|
&self,
|
|
script_id: picloud_shared::ScriptId,
|
|
) -> Result<i64, picloud_manager_core::ScriptRepositoryError> {
|
|
self.0.count_triggers_for_script(script_id).await
|
|
}
|
|
async fn list_imports(
|
|
&self,
|
|
script_id: picloud_shared::ScriptId,
|
|
) -> Result<Vec<picloud_shared::Script>, picloud_manager_core::ScriptRepositoryError> {
|
|
self.0.list_imports(script_id).await
|
|
}
|
|
}
|