diff --git a/crates/orchestrator-core/src/client.rs b/crates/orchestrator-core/src/client.rs
index e64c49a..feaca81 100644
--- a/crates/orchestrator-core/src/client.rs
+++ b/crates/orchestrator-core/src/client.rs
@@ -4,6 +4,8 @@ use std::time::Duration;
 use async_trait::async_trait;
 use picloud_executor_core::{Engine, ExecError, ExecRequest, ExecResponse};
 
+use crate::gate::{AcquireError, ExecutionGate};
+
 /// Maximum wall-clock time we'll wait for a single invocation, regardless
 /// of the per-script `timeout_seconds`. Provides a hard ceiling on
 /// resource usage independent of misconfigured scripts.
@@ -30,14 +32,19 @@ pub trait ExecutorClient: Send + Sync {
 /// `executor-core::Engine::execute` is synchronous; we offload it to a
 /// blocking thread so it doesn't park a Tokio worker, and apply the
 /// wall-clock timeout here.
+///
+/// Holds an `ExecutionGate` and acquires a permit before `spawn_blocking`
+/// so a script storm can't drain the blocking-thread pool. The permit
+/// drops with the future, returning the slot.
 pub struct LocalExecutorClient {
     engine: Arc,
+    gate: Arc,
 }
 
 impl LocalExecutorClient {
     #[must_use]
-    pub fn new(engine: Arc) -> Self {
-        Self { engine }
+    pub fn new(engine: Arc, gate: Arc) -> Self {
+        Self { engine, gate }
     }
 }
 
@@ -49,6 +56,18 @@ impl ExecutorClient for LocalExecutorClient {
         req: ExecRequest,
         timeout: Duration,
     ) -> Result {
+        // Acquire before spending any wall-clock budget. The permit is held
+        // until this future returns. NOTE(review): `_permit` is not moved into
+        // the blocking closure, so a timed-out task may outlive it — confirm.
+        let _permit =
+            self.gate
+                .try_acquire()
+                .map_err(
+                    |AcquireError::Overloaded { retry_after_secs }| ExecError::Overloaded {
+                        retry_after_secs,
+                    },
+                )?;
+
         let timeout = timeout.min(HARD_TIMEOUT_CAP);
         let timeout_secs = u32::try_from(timeout.as_secs()).unwrap_or(u32::MAX);
 
diff --git a/crates/orchestrator-core/src/gate.rs b/crates/orchestrator-core/src/gate.rs
new file mode 100644
index 0000000..5fd6ce3
--- /dev/null
+++ b/crates/orchestrator-core/src/gate.rs
@@ -0,0 +1,154 @@
+//! Global concurrency gate for the data plane.
+//!
+//! Wraps a single `tokio::sync::Semaphore` so the executor can refuse
+//! admission immediately when too many invocations are already in
+//! flight. Designed for v1.1.0's single-node MVP — one cap across all
+//! apps and scripts. Per-app or per-script caps come later when a real
+//! workload surfaces the need.
+//!
+//! Policy: **non-blocking, no queue**. If a permit isn't free right
+//! now, the call returns `AcquireError::Overloaded` and the data-plane
+//! HTTP layer translates that to a 503 with `Retry-After: 1`. Pushing
+//! back hard beats letting requests pile up against a finite pool of
+//! blocking threads (executor work runs under `spawn_blocking`).
+//!
+//! Configured via the `PICLOUD_MAX_CONCURRENT_EXECUTIONS` env var.
+//! Default is 32 — comfortable for a single-node Pi, low enough that
+//! a script storm doesn't park every blocking thread.
+
+use std::sync::Arc;
+
+use thiserror::Error;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
+
+/// Env var consulted by `from_env`.
+pub const ENV_MAX_CONCURRENT: &str = "PICLOUD_MAX_CONCURRENT_EXECUTIONS";
+
+/// Default cap when the env var is unset or invalid.
+pub const DEFAULT_MAX_CONCURRENT: u32 = 32;
+
+/// `Retry-After` header value (seconds) returned alongside the 503
+/// when the gate refuses. Fixed for v1.1.0; later versions may compute
+/// a smarter value from in-flight latency.
+pub const DEFAULT_RETRY_AFTER_SECS: u32 = 1;
+
+/// Refused admission. The HTTP layer translates this to 503 with a
+/// `Retry-After` header.
+#[derive(Debug, Error)]
+pub enum AcquireError {
+    #[error("at capacity (retry after {retry_after_secs}s)")]
+    Overloaded { retry_after_secs: u32 },
+}
+
+/// Global execution gate. Constructed once at orchestrator startup and
+/// shared via `Arc<ExecutionGate>`. Holds an inner `Arc<Semaphore>` so permits
+/// are owned (they release on drop independent of the gate's lifetime).
+pub struct ExecutionGate {
+    permits: Arc<Semaphore>,
+    max_permits: u32,
+}
+
+impl ExecutionGate {
+    /// Construct with an explicit cap (clamped to `Semaphore::MAX_PERMITS`, which
+    /// tokio panics above). Mostly for tests; production uses `from_env`.
+    #[must_use]
+    pub fn new(max_permits: u32) -> Self {
+        Self {
+            permits: Arc::new(Semaphore::new((max_permits as usize).min(Semaphore::MAX_PERMITS))),
+            max_permits,
+        }
+    }
+
+    /// Read `PICLOUD_MAX_CONCURRENT_EXECUTIONS` from the environment.
+    /// Unset (or non-Unicode) falls back to `DEFAULT_MAX_CONCURRENT`
+    /// silently; an unparsable value or `0` warns, then falls back.
+    /// Mirrors the `SandboxCeiling::from_env` ergonomics so operators
+    /// see a consistent shape across the env-tunables.
+    #[must_use]
+    pub fn from_env() -> Self {
+        let max = match std::env::var(ENV_MAX_CONCURRENT) {
+            Err(_) => DEFAULT_MAX_CONCURRENT,
+            Ok(v) => match v.parse::<u32>() {
+                Ok(n) if n > 0 => n,
+                Ok(_) => {
+                    tracing::warn!(
+                        env = ENV_MAX_CONCURRENT,
+                        value = %v,
+                        "value must be > 0; using default {DEFAULT_MAX_CONCURRENT}"
+                    );
+                    DEFAULT_MAX_CONCURRENT
+                }
+                Err(e) => {
+                    tracing::warn!(
+                        env = ENV_MAX_CONCURRENT,
+                        value = %v,
+                        error = %e,
+                        "invalid value; using default {DEFAULT_MAX_CONCURRENT}"
+                    );
+                    DEFAULT_MAX_CONCURRENT
+                }
+            },
+        };
+        Self::new(max)
+    }
+
+    /// Maximum concurrent permits this gate was configured for. Useful
+    /// for diagnostics / future metrics.
+    #[must_use]
+    pub fn max_permits(&self) -> u32 {
+        self.max_permits
+    }
+
+    /// Non-blocking permit acquisition. Returns the owned permit on
+    /// success (drop releases the slot) or `AcquireError::Overloaded`
+    /// when saturated (a closed semaphore is reported the same way).
+    /// Sync because the semaphore's non-blocking try is sync.
+    pub fn try_acquire(&self) -> Result<OwnedSemaphorePermit, AcquireError> {
+        self.permits
+            .clone()
+            .try_acquire_owned()
+            .map_err(|err| match err {
+                TryAcquireError::NoPermits | TryAcquireError::Closed => AcquireError::Overloaded {
+                    retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
+                },
+            })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn acquire_succeeds_under_capacity() {
+        let gate = ExecutionGate::new(2);
+        let _p1 = gate.try_acquire().expect("first permit available");
+        let _p2 = gate.try_acquire().expect("second permit available");
+    }
+
+    #[test]
+    fn acquire_overloaded_when_saturated() {
+        let gate = ExecutionGate::new(1);
+        let _p = gate.try_acquire().expect("first permit available");
+        let AcquireError::Overloaded { retry_after_secs } = gate
+            .try_acquire()
+            .expect_err("second permit must be refused");
+        assert!(retry_after_secs > 0, "retry-after must be positive");
+    }
+
+    #[test]
+    fn permit_drop_releases_slot() {
+        let gate = ExecutionGate::new(1);
+        {
+            let _p = gate.try_acquire().expect("first permit available");
+        }
+        gate.try_acquire()
+            .expect("slot must be returned after permit drops");
+    }
+
+    #[test]
+    fn max_permits_exposed() {
+        let gate = ExecutionGate::new(7);
+        assert_eq!(gate.max_permits(), 7);
+    }
+}
diff --git a/crates/orchestrator-core/src/lib.rs b/crates/orchestrator-core/src/lib.rs
index 3d07e4e..11c0a34 100644
--- a/crates/orchestrator-core/src/lib.rs
+++ b/crates/orchestrator-core/src/lib.rs
@@ -10,9 +10,11 @@
 
 pub mod api;
 pub mod client;
+pub mod gate;
 pub mod resolver;
 pub mod routing;
 
 pub use api::{data_plane_router, user_routes_router, DataPlaneState};
 pub use client::{ExecutorClient, LocalExecutorClient, RemoteExecutorClient};
+pub use gate::{AcquireError, ExecutionGate};
 pub use resolver::{ResolverError, ScriptResolver};
diff --git
a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs
index 9ae47c5..1203f8d 100644
--- a/crates/picloud/src/lib.rs
+++ b/crates/picloud/src/lib.rs
@@ -22,7 +22,7 @@ use picloud_manager_core::{
 };
 use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable};
 use picloud_orchestrator_core::{
-    data_plane_router, user_routes_router, DataPlaneState, LocalExecutorClient,
+    data_plane_router, user_routes_router, DataPlaneState, ExecutionGate, LocalExecutorClient,
 };
 use picloud_shared::{
     ExecutionLogSink, ScriptValidator, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION,
@@ -129,7 +129,10 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result {
     let resolver = Arc::new(RepoResolver::new(PostgresScriptRepoHandle(
         script_repo.clone(),
     )));
-    let executor = Arc::new(LocalExecutorClient::new(engine.clone()));
+    // Single global gate — overflow is rejected with 503 + Retry-After.
+    // Cap is read here from `PICLOUD_MAX_CONCURRENT_EXECUTIONS` (default 32).
+    let gate = Arc::new(ExecutionGate::from_env());
+    let executor = Arc::new(LocalExecutorClient::new(engine.clone(), gate));
 
     let admin = AdminState {
         repo: Arc::new(PostgresScriptRepoHandle(script_repo.clone())),