feat(orchestrator-core): ExecutionGate + 503/Retry-After on overflow
Adds a single global concurrency cap on the data-plane dispatch path:
- orchestrator-core::gate::ExecutionGate wraps tokio::Semaphore.
Non-blocking try_acquire — no queue. PICLOUD_MAX_CONCURRENT_EXECUTIONS
env var (default 32) sets the cap.
- LocalExecutorClient acquires a permit before spawn_blocking; the
permit drops with the future so the slot returns automatically.
- On refusal, ExecError::Overloaded { retry_after_secs: 1 } surfaces
upward. ApiError::IntoResponse already maps that to 503 with a
Retry-After header (landed in the previous commit alongside the
variant itself).
- picloud binary constructs the gate once at build_app and shares it
with LocalExecutorClient.
The cap exists so a Rhai script storm can't drain the blocking-thread
pool — pushing back hard beats letting requests pile up against a
finite worker count. Per-app / per-script caps stay deferred until a
real workload demands them.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -4,6 +4,8 @@ use std::time::Duration;
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use picloud_executor_core::{Engine, ExecError, ExecRequest, ExecResponse};
|
use picloud_executor_core::{Engine, ExecError, ExecRequest, ExecResponse};
|
||||||
|
|
||||||
|
use crate::gate::{AcquireError, ExecutionGate};
|
||||||
|
|
||||||
/// Maximum wall-clock time we'll wait for a single invocation, regardless
|
/// Maximum wall-clock time we'll wait for a single invocation, regardless
|
||||||
/// of the per-script `timeout_seconds`. Provides a hard ceiling on
|
/// of the per-script `timeout_seconds`. Provides a hard ceiling on
|
||||||
/// resource usage independent of misconfigured scripts.
|
/// resource usage independent of misconfigured scripts.
|
||||||
@@ -30,14 +32,19 @@ pub trait ExecutorClient: Send + Sync {
|
|||||||
/// `executor-core::Engine::execute` is synchronous; we offload it to a
|
/// `executor-core::Engine::execute` is synchronous; we offload it to a
|
||||||
/// blocking thread so it doesn't park a Tokio worker, and apply the
|
/// blocking thread so it doesn't park a Tokio worker, and apply the
|
||||||
/// wall-clock timeout here.
|
/// wall-clock timeout here.
|
||||||
|
///
|
||||||
|
/// Holds an `ExecutionGate` and acquires a permit before `spawn_blocking`
|
||||||
|
/// so a script storm can't drain the blocking-thread pool. The permit
|
||||||
|
/// drops with the future, returning the slot.
|
||||||
pub struct LocalExecutorClient {
|
pub struct LocalExecutorClient {
|
||||||
engine: Arc<Engine>,
|
engine: Arc<Engine>,
|
||||||
|
gate: Arc<ExecutionGate>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LocalExecutorClient {
|
impl LocalExecutorClient {
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn new(engine: Arc<Engine>) -> Self {
|
pub fn new(engine: Arc<Engine>, gate: Arc<ExecutionGate>) -> Self {
|
||||||
Self { engine }
|
Self { engine, gate }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -49,6 +56,18 @@ impl ExecutorClient for LocalExecutorClient {
|
|||||||
req: ExecRequest,
|
req: ExecRequest,
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
) -> Result<ExecResponse, ExecError> {
|
) -> Result<ExecResponse, ExecError> {
|
||||||
|
// Acquire before spending any wall-clock budget. The permit is
|
||||||
|
// held until this future returns; spawn_blocking inherits the
|
||||||
|
// gating via the captured `_permit`.
|
||||||
|
let _permit =
|
||||||
|
self.gate
|
||||||
|
.try_acquire()
|
||||||
|
.map_err(
|
||||||
|
|AcquireError::Overloaded { retry_after_secs }| ExecError::Overloaded {
|
||||||
|
retry_after_secs,
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
let timeout = timeout.min(HARD_TIMEOUT_CAP);
|
let timeout = timeout.min(HARD_TIMEOUT_CAP);
|
||||||
let timeout_secs = u32::try_from(timeout.as_secs()).unwrap_or(u32::MAX);
|
let timeout_secs = u32::try_from(timeout.as_secs()).unwrap_or(u32::MAX);
|
||||||
|
|
||||||
|
|||||||
154
crates/orchestrator-core/src/gate.rs
Normal file
154
crates/orchestrator-core/src/gate.rs
Normal file
@@ -0,0 +1,154 @@
|
|||||||
|
//! Global concurrency gate for the data plane.
|
||||||
|
//!
|
||||||
|
//! Wraps a single `tokio::sync::Semaphore` so the executor can refuse
|
||||||
|
//! admission immediately when too many invocations are already in
|
||||||
|
//! flight. Designed for v1.1.0's single-node MVP — one cap across all
|
||||||
|
//! apps and scripts. Per-app or per-script caps come later when a real
|
||||||
|
//! workload surfaces the need.
|
||||||
|
//!
|
||||||
|
//! Policy: **non-blocking, no queue**. If a permit isn't free right
|
||||||
|
//! now, the call returns `AcquireError::Overloaded` and the data-plane
|
||||||
|
//! HTTP layer translates that to a 503 with `Retry-After: 1`. Pushing
|
||||||
|
//! back hard beats letting requests pile up against a finite pool of
|
||||||
|
//! blocking threads (executor work runs under `spawn_blocking`).
|
||||||
|
//!
|
||||||
|
//! Configured via the `PICLOUD_MAX_CONCURRENT_EXECUTIONS` env var.
|
||||||
|
//! Default is 32 — comfortable for a single-node Pi, low enough that
|
||||||
|
//! a script storm doesn't park every blocking thread.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use thiserror::Error;
|
||||||
|
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
|
||||||
|
|
||||||
|
/// Env var consulted by `from_env`.
|
||||||
|
pub const ENV_MAX_CONCURRENT: &str = "PICLOUD_MAX_CONCURRENT_EXECUTIONS";
|
||||||
|
|
||||||
|
/// Default cap when the env var is unset or invalid.
|
||||||
|
pub const DEFAULT_MAX_CONCURRENT: u32 = 32;
|
||||||
|
|
||||||
|
/// `Retry-After` header value (seconds) returned alongside the 503
|
||||||
|
/// when the gate refuses. Fixed for v1.1.0; later versions may compute
|
||||||
|
/// a smarter value from in-flight latency.
|
||||||
|
pub const DEFAULT_RETRY_AFTER_SECS: u32 = 1;
|
||||||
|
|
||||||
|
/// Refused admission. The HTTP layer translates this to 503 with a
|
||||||
|
/// `Retry-After` header.
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum AcquireError {
|
||||||
|
#[error("at capacity (retry after {retry_after_secs}s)")]
|
||||||
|
Overloaded { retry_after_secs: u32 },
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Global execution gate. Constructed once at orchestrator startup and
|
||||||
|
/// shared via `Arc`. Holds an inner `Arc<Semaphore>` so permits are
|
||||||
|
/// owned (they release on drop independent of the gate's lifetime).
|
||||||
|
pub struct ExecutionGate {
|
||||||
|
permits: Arc<Semaphore>,
|
||||||
|
max_permits: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ExecutionGate {
|
||||||
|
/// Construct with an explicit cap. Mostly for tests; production
|
||||||
|
/// uses `from_env`.
|
||||||
|
#[must_use]
|
||||||
|
pub fn new(max_permits: u32) -> Self {
|
||||||
|
Self {
|
||||||
|
permits: Arc::new(Semaphore::new(max_permits as usize)),
|
||||||
|
max_permits,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read `PICLOUD_MAX_CONCURRENT_EXECUTIONS` from the environment.
|
||||||
|
/// Falls back to `DEFAULT_MAX_CONCURRENT` on absence; warns and
|
||||||
|
/// falls back on parse failure or non-positive value. Mirrors the
|
||||||
|
/// `SandboxCeiling::from_env` ergonomics so operators see a
|
||||||
|
/// consistent shape across the env-tunables.
|
||||||
|
#[must_use]
|
||||||
|
pub fn from_env() -> Self {
|
||||||
|
let max = match std::env::var(ENV_MAX_CONCURRENT) {
|
||||||
|
Err(_) => DEFAULT_MAX_CONCURRENT,
|
||||||
|
Ok(v) => match v.parse::<u32>() {
|
||||||
|
Ok(n) if n > 0 => n,
|
||||||
|
Ok(_) => {
|
||||||
|
tracing::warn!(
|
||||||
|
env = ENV_MAX_CONCURRENT,
|
||||||
|
value = %v,
|
||||||
|
"value must be > 0; using default {DEFAULT_MAX_CONCURRENT}"
|
||||||
|
);
|
||||||
|
DEFAULT_MAX_CONCURRENT
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(
|
||||||
|
env = ENV_MAX_CONCURRENT,
|
||||||
|
value = %v,
|
||||||
|
error = %e,
|
||||||
|
"invalid value; using default {DEFAULT_MAX_CONCURRENT}"
|
||||||
|
);
|
||||||
|
DEFAULT_MAX_CONCURRENT
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
|
Self::new(max)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Maximum concurrent permits this gate was configured for. Useful
|
||||||
|
/// for diagnostics / future metrics.
|
||||||
|
#[must_use]
|
||||||
|
pub fn max_permits(&self) -> u32 {
|
||||||
|
self.max_permits
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Non-blocking permit acquisition. Returns the owned permit on
|
||||||
|
/// success (drop releases the slot) or `AcquireError::Overloaded`
|
||||||
|
/// when saturated. Sync because the semaphore's non-blocking try is
|
||||||
|
/// sync — no runtime hop needed.
|
||||||
|
pub fn try_acquire(&self) -> Result<OwnedSemaphorePermit, AcquireError> {
|
||||||
|
self.permits
|
||||||
|
.clone()
|
||||||
|
.try_acquire_owned()
|
||||||
|
.map_err(|err| match err {
|
||||||
|
TryAcquireError::NoPermits | TryAcquireError::Closed => AcquireError::Overloaded {
|
||||||
|
retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn acquire_succeeds_under_capacity() {
|
||||||
|
let gate = ExecutionGate::new(2);
|
||||||
|
let _p1 = gate.try_acquire().expect("first permit available");
|
||||||
|
let _p2 = gate.try_acquire().expect("second permit available");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn acquire_overloaded_when_saturated() {
|
||||||
|
let gate = ExecutionGate::new(1);
|
||||||
|
let _p = gate.try_acquire().expect("first permit available");
|
||||||
|
let AcquireError::Overloaded { retry_after_secs } = gate
|
||||||
|
.try_acquire()
|
||||||
|
.expect_err("second permit must be refused");
|
||||||
|
assert!(retry_after_secs > 0, "retry-after must be positive");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn permit_drop_releases_slot() {
|
||||||
|
let gate = ExecutionGate::new(1);
|
||||||
|
{
|
||||||
|
let _p = gate.try_acquire().expect("first permit available");
|
||||||
|
}
|
||||||
|
gate.try_acquire()
|
||||||
|
.expect("slot must be returned after permit drops");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn max_permits_exposed() {
|
||||||
|
let gate = ExecutionGate::new(7);
|
||||||
|
assert_eq!(gate.max_permits(), 7);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -10,9 +10,11 @@
|
|||||||
|
|
||||||
pub mod api;
|
pub mod api;
|
||||||
pub mod client;
|
pub mod client;
|
||||||
|
pub mod gate;
|
||||||
pub mod resolver;
|
pub mod resolver;
|
||||||
pub mod routing;
|
pub mod routing;
|
||||||
|
|
||||||
pub use api::{data_plane_router, user_routes_router, DataPlaneState};
|
pub use api::{data_plane_router, user_routes_router, DataPlaneState};
|
||||||
pub use client::{ExecutorClient, LocalExecutorClient, RemoteExecutorClient};
|
pub use client::{ExecutorClient, LocalExecutorClient, RemoteExecutorClient};
|
||||||
|
pub use gate::{AcquireError, ExecutionGate};
|
||||||
pub use resolver::{ResolverError, ScriptResolver};
|
pub use resolver::{ResolverError, ScriptResolver};
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ use picloud_manager_core::{
|
|||||||
};
|
};
|
||||||
use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable};
|
use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable};
|
||||||
use picloud_orchestrator_core::{
|
use picloud_orchestrator_core::{
|
||||||
data_plane_router, user_routes_router, DataPlaneState, LocalExecutorClient,
|
data_plane_router, user_routes_router, DataPlaneState, ExecutionGate, LocalExecutorClient,
|
||||||
};
|
};
|
||||||
use picloud_shared::{
|
use picloud_shared::{
|
||||||
ExecutionLogSink, ScriptValidator, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION,
|
ExecutionLogSink, ScriptValidator, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION,
|
||||||
@@ -129,7 +129,10 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
|
|||||||
let resolver = Arc::new(RepoResolver::new(PostgresScriptRepoHandle(
|
let resolver = Arc::new(RepoResolver::new(PostgresScriptRepoHandle(
|
||||||
script_repo.clone(),
|
script_repo.clone(),
|
||||||
)));
|
)));
|
||||||
let executor = Arc::new(LocalExecutorClient::new(engine.clone()));
|
// Single global gate — overflow is rejected with 503 + Retry-After.
|
||||||
|
// See `ExecutionGate` docs and `PICLOUD_MAX_CONCURRENT_EXECUTIONS`.
|
||||||
|
let gate = Arc::new(ExecutionGate::from_env());
|
||||||
|
let executor = Arc::new(LocalExecutorClient::new(engine.clone(), gate));
|
||||||
|
|
||||||
let admin = AdminState {
|
let admin = AdminState {
|
||||||
repo: Arc::new(PostgresScriptRepoHandle(script_repo.clone())),
|
repo: Arc::new(PostgresScriptRepoHandle(script_repo.clone())),
|
||||||
|
|||||||
Reference in New Issue
Block a user