Files
PiCloud/crates/orchestrator-core/src/gate.rs
MechaCat02 098e18a989 chore(clippy): silence three v1.1.0-foundation lints
- sdk/bridge.rs: drop #[must_use] on the bridge fns — `Dynamic` and
    `serde_json::Value` are both #[must_use] already; the wrapper
    attribute is double-must-use noise.
  - api.rs IntoResponse: hoist `use ApiError as E;` above the early
    Overloaded branch so `E::Exec(...)` works in the if-let too
    (clippy::items_after_statements).
  - gate.rs test: bind the returned permit with `let _ =` so the
    OwnedSemaphorePermit doesn't trip unused-must-use.

No behaviour change. Caught by `cargo clippy --all-targets
--all-features -- -D warnings`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-30 19:00:35 +02:00

156 lines
5.4 KiB
Rust

//! Global concurrency gate for the data plane.
//!
//! Wraps a single `tokio::sync::Semaphore` so the executor can refuse
//! admission immediately when too many invocations are already in
//! flight. Designed for v1.1.0's single-node MVP — one cap across all
//! apps and scripts. Per-app or per-script caps come later when a real
//! workload surfaces the need.
//!
//! Policy: **non-blocking, no queue**. If a permit isn't free right
//! now, the call returns `AcquireError::Overloaded` and the data-plane
//! HTTP layer translates that to a 503 with `Retry-After: 1`. Pushing
//! back hard beats letting requests pile up against a finite pool of
//! blocking threads (executor work runs under `spawn_blocking`).
//!
//! Configured via the `PICLOUD_MAX_CONCURRENT_EXECUTIONS` env var.
//! Default is 32 — comfortable for a single-node Pi, low enough that
//! a script storm doesn't park every blocking thread.
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
/// Name of the environment variable `ExecutionGate::from_env` reads to
/// obtain the concurrency cap. The value is parsed as a `u32` and must
/// be greater than zero to be accepted.
pub const ENV_MAX_CONCURRENT: &str = "PICLOUD_MAX_CONCURRENT_EXECUTIONS";
/// Cap `ExecutionGate::from_env` falls back to when the env var is
/// unset, fails to parse as a `u32`, or is zero.
pub const DEFAULT_MAX_CONCURRENT: u32 = 32;
/// `Retry-After` value (seconds) carried in `AcquireError::Overloaded`
/// and returned alongside the 503 when the gate refuses. Fixed for
/// v1.1.0; later versions may compute a smarter value from in-flight
/// latency.
pub const DEFAULT_RETRY_AFTER_SECS: u32 = 1;
/// Refused admission. The HTTP layer translates this to 503 with a
/// `Retry-After` header.
#[derive(Debug, Error)]
pub enum AcquireError {
#[error("at capacity (retry after {retry_after_secs}s)")]
Overloaded { retry_after_secs: u32 },
}
/// Global execution gate. Constructed once at orchestrator startup and
/// shared via `Arc`. Holds an inner `Arc<Semaphore>` so permits are
/// owned (they release on drop independent of the gate's lifetime).
///
/// Implements `Debug` so the gate can live inside `#[derive(Debug)]`
/// state structs and show up in diagnostics.
#[derive(Debug)]
pub struct ExecutionGate {
    /// Semaphore backing the gate. Kept behind an `Arc` because owned
    /// permits are acquired from a clone of that `Arc`.
    permits: Arc<Semaphore>,
    /// Cap recorded at construction; reported by `max_permits()`.
    max_permits: u32,
}
impl ExecutionGate {
/// Construct with an explicit cap. Mostly for tests; production
/// uses `from_env`.
#[must_use]
pub fn new(max_permits: u32) -> Self {
Self {
permits: Arc::new(Semaphore::new(max_permits as usize)),
max_permits,
}
}
/// Read `PICLOUD_MAX_CONCURRENT_EXECUTIONS` from the environment.
/// Falls back to `DEFAULT_MAX_CONCURRENT` on absence; warns and
/// falls back on parse failure or non-positive value. Mirrors the
/// `SandboxCeiling::from_env` ergonomics so operators see a
/// consistent shape across the env-tunables.
#[must_use]
pub fn from_env() -> Self {
let max = match std::env::var(ENV_MAX_CONCURRENT) {
Err(_) => DEFAULT_MAX_CONCURRENT,
Ok(v) => match v.parse::<u32>() {
Ok(n) if n > 0 => n,
Ok(_) => {
tracing::warn!(
env = ENV_MAX_CONCURRENT,
value = %v,
"value must be > 0; using default {DEFAULT_MAX_CONCURRENT}"
);
DEFAULT_MAX_CONCURRENT
}
Err(e) => {
tracing::warn!(
env = ENV_MAX_CONCURRENT,
value = %v,
error = %e,
"invalid value; using default {DEFAULT_MAX_CONCURRENT}"
);
DEFAULT_MAX_CONCURRENT
}
},
};
Self::new(max)
}
/// Maximum concurrent permits this gate was configured for. Useful
/// for diagnostics / future metrics.
#[must_use]
pub fn max_permits(&self) -> u32 {
self.max_permits
}
/// Non-blocking permit acquisition. Returns the owned permit on
/// success (drop releases the slot) or `AcquireError::Overloaded`
/// when saturated. Sync because the semaphore's non-blocking try is
/// sync — no runtime hop needed.
pub fn try_acquire(&self) -> Result<OwnedSemaphorePermit, AcquireError> {
self.permits
.clone()
.try_acquire_owned()
.map_err(|err| match err {
TryAcquireError::NoPermits | TryAcquireError::Closed => AcquireError::Overloaded {
retry_after_secs: DEFAULT_RETRY_AFTER_SECS,
},
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A gate with two slots hands out two permits while both are held.
    #[test]
    fn acquire_succeeds_under_capacity() {
        let gate = ExecutionGate::new(2);
        let _first_held = gate.try_acquire().expect("first permit available");
        let _second_held = gate.try_acquire().expect("second permit available");
    }

    /// With the only slot taken, the next attempt is refused and the
    /// refusal carries a usable retry hint.
    #[test]
    fn acquire_overloaded_when_saturated() {
        let gate = ExecutionGate::new(1);
        let _held = gate.try_acquire().expect("first permit available");

        let refusal = gate
            .try_acquire()
            .expect_err("second permit must be refused");
        let AcquireError::Overloaded { retry_after_secs } = refusal;
        assert!(retry_after_secs > 0, "retry-after must be positive");
    }

    /// Dropping a permit gives its slot back to the gate.
    #[test]
    fn permit_drop_releases_slot() {
        let gate = ExecutionGate::new(1);

        let held = gate.try_acquire().expect("first permit available");
        drop(held);

        // `let _ =` keeps the must-use lint quiet for the returned permit.
        let _ = gate
            .try_acquire()
            .expect("slot must be returned after permit drops");
    }

    /// The configured cap is readable back through the accessor.
    #[test]
    fn max_permits_exposed() {
        let cap = ExecutionGate::new(7).max_permits();
        assert_eq!(cap, 7);
    }
}