From 66b41bb97849e6d51122dd5e88f669823ec9c51f Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Tue, 2 Jun 2026 22:23:11 +0200 Subject: [PATCH] feat(v1.1.3-modules): top-level script AST cache in LocalExecutorClient - New `ScriptIdentity { script_id, updated_at }` DTO. - `ExecutorClient` trait gains an `execute_with_identity` method; default impl forwards to `execute` so `RemoteExecutorClient` (and cluster-mode transports later) keep working without bespoke caching. - `LocalExecutorClient` overrides `execute_with_identity` to consult an `LruCache`. Cache hit only when the cached entry's `updated_at` matches the caller's identity; mismatch triggers a fresh `Engine::compile`. `Engine::execute_ast(&Arc, req)` is called inside `spawn_blocking` exactly as `execute` does today. - Cache size from `PICLOUD_SCRIPT_CACHE_SIZE` (default 256). - Orchestrator's HTTP data-plane path and the dispatcher both switch to `execute_with_identity`. `ResolvedTrigger` carries `script_updated_at` for the dispatcher's identity construction. Workspace builds; full test suite (~440 tests) green. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 2 + crates/manager-core/src/dispatcher.rs | 13 +- crates/orchestrator-core/Cargo.toml | 5 + crates/orchestrator-core/src/api.rs | 9 +- crates/orchestrator-core/src/client.rs | 162 ++++++++++++++++++++++++- crates/orchestrator-core/src/lib.rs | 2 +- 6 files changed, 188 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8adbc03..7bc6e3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1649,9 +1649,11 @@ dependencies = [ "async-trait", "axum", "chrono", + "lru", "picloud-executor-core", "picloud-shared", "reqwest", + "rhai", "serde", "serde_json", "thiserror 1.0.69", diff --git a/crates/manager-core/src/dispatcher.rs b/crates/manager-core/src/dispatcher.rs index d6136c1..26d7f07 100644 --- a/crates/manager-core/src/dispatcher.rs +++ b/crates/manager-core/src/dispatcher.rs @@ -186,9 +186,13 @@ impl Dispatcher { // wait synchronously here — sync HTTP and dispatcher share the // semaphore so this is intentional. let source = resolved.script_source.clone(); + let identity = picloud_orchestrator_core::ScriptIdentity { + script_id: resolved.script_id, + updated_at: resolved.script_updated_at, + }; let outcome = self .executor - .execute(&source, exec_req, ASYNC_EXEC_TIMEOUT) + .execute_with_identity(identity, &source, exec_req, ASYNC_EXEC_TIMEOUT) .await; drop(permit); @@ -230,6 +234,7 @@ impl Dispatcher { script_id: script.id, script_source: script.source, script_name: script.name, + script_updated_at: script.updated_at, sandbox_overrides: script.sandbox, registered_by_principal: trigger.registered_by_principal, retry_max_attempts: trigger.retry_max_attempts, @@ -335,6 +340,7 @@ impl Dispatcher { script_id, script_source: script.source, script_name: payload.script_name, + script_updated_at: script.updated_at, sandbox_overrides: script.sandbox, // HTTP outbox rows don't carry a registered_by_principal // — use a sentinel zero UUID since this field isn't used @@ -516,6 +522,11 @@ pub struct ResolvedTrigger { pub script_id: ScriptId, pub script_source: String, pub script_name: String, + /// v1.1.3: freshness comparator for the orchestrator's top-level + /// script cache. The dispatcher hands `(script_id, updated_at)` + /// in alongside the source so cached ASTs can be reused across + /// triggered invocations. + pub script_updated_at: chrono::DateTime, pub sandbox_overrides: ScriptSandbox, pub registered_by_principal: picloud_shared::AdminUserId, pub retry_max_attempts: u32, diff --git a/crates/orchestrator-core/Cargo.toml b/crates/orchestrator-core/Cargo.toml index ce21dff..d76e821 100644 --- a/crates/orchestrator-core/Cargo.toml +++ b/crates/orchestrator-core/Cargo.toml @@ -21,5 +21,10 @@ tracing.workspace = true uuid.workspace = true chrono.workspace = true reqwest.workspace = true +rhai.workspace = true tokio.workspace = true urlencoding.workspace = true + +# v1.1.3 — top-level script AST cache lives in orchestrator-core's +# LocalExecutorClient; key is ScriptId, value is `(updated_at, Arc)`. +lru.workspace = true diff --git a/crates/orchestrator-core/src/api.rs b/crates/orchestrator-core/src/api.rs index c78f49d..f52cdd3 100644 --- a/crates/orchestrator-core/src/api.rs +++ b/crates/orchestrator-core/src/api.rs @@ -129,7 +129,14 @@ where let timeout = Duration::from_secs(u64::from(script.timeout_seconds)); let started = Utc::now(); - let outcome = state.executor.execute(&script.source, req, timeout).await; + let identity = crate::client::ScriptIdentity { + script_id: script.id, + updated_at: script.updated_at, + }; + let outcome = state + .executor + .execute_with_identity(identity, &script.source, req, timeout) + .await; let finished = Utc::now(); // Build and dispatch the audit log regardless of outcome. We await diff --git a/crates/orchestrator-core/src/client.rs b/crates/orchestrator-core/src/client.rs index 3b2b218..ff46fa4 100644 --- a/crates/orchestrator-core/src/client.rs +++ b/crates/orchestrator-core/src/client.rs @@ -1,8 +1,12 @@ -use std::sync::Arc; +use std::num::NonZeroUsize; +use std::sync::{Arc, Mutex}; use std::time::Duration; use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use lru::LruCache; use picloud_executor_core::{Engine, ExecError, ExecRequest, ExecResponse}; +use picloud_shared::ScriptId; use crate::gate::{AcquireError, ExecutionGate}; @@ -11,6 +15,21 @@ use crate::gate::{AcquireError, ExecutionGate}; /// resource usage independent of misconfigured scripts. const HARD_TIMEOUT_CAP: Duration = Duration::from_secs(300); +/// Default capacity for the top-level script AST cache. Override via +/// `PICLOUD_SCRIPT_CACHE_SIZE`. Sized assuming a few hundred distinct +/// endpoint scripts per process. +const DEFAULT_SCRIPT_CACHE_SIZE: usize = 256; + +/// Identity used by [`ExecutorClient::execute_with_identity`] to key +/// the AST cache. `updated_at` is the freshness comparator — an edit +/// that bumps `scripts.updated_at` invalidates the cached AST on the +/// next lookup, no explicit pub/sub. +#[derive(Debug, Clone, Copy)] +pub struct ScriptIdentity { + pub script_id: ScriptId, + pub updated_at: DateTime, +} + /// The seam between the orchestrator and the executor. /// /// Single-node mode plugs in `LocalExecutorClient`, which calls @@ -25,6 +44,21 @@ pub trait ExecutorClient: Send + Sync { req: ExecRequest, timeout: Duration, ) -> Result; + + /// v1.1.3: identity-aware variant for caching. Callers that already + /// know the script's `(id, updated_at)` should use this so the local + /// executor can reuse a compiled `rhai::AST` across invocations. + /// Default impl forwards to `execute` so `RemoteExecutorClient` (and + /// any future transport) keeps working without bespoke caching. + async fn execute_with_identity( + &self, + _identity: ScriptIdentity, + source: &str, + req: ExecRequest, + timeout: Duration, + ) -> Result { + self.execute(source, req, timeout).await + } } /// In-process executor — wraps `executor-core::Engine` directly. @@ -36,15 +70,106 @@ pub trait ExecutorClient: Send + Sync { /// Holds an `ExecutionGate` and acquires a permit before `spawn_blocking` /// so a script storm can't drain the blocking-thread pool. The permit /// drops with the future, returning the slot. +/// +/// v1.1.3 adds a top-level AST cache keyed by `ScriptId`. On +/// `execute_with_identity`, the client compares the caller's +/// `updated_at` against the cached entry's; a match reuses the +/// `Arc` and skips Rhai's parser. A mismatch (or absence) +/// triggers a fresh `Engine::compile` + replace. pub struct LocalExecutorClient { engine: Arc, gate: Arc, + /// `(updated_at, Arc)` keyed by `ScriptId`. `Mutex` + /// because the cache is shared across invocations of this client; + /// LRU eviction caps memory growth. + script_cache: Arc>>, +} + +pub struct CachedScript { + pub updated_at: DateTime, + pub ast: Arc, } impl LocalExecutorClient { #[must_use] pub fn new(engine: Arc, gate: Arc) -> Self { - Self { engine, gate } + let cap = std::env::var("PICLOUD_SCRIPT_CACHE_SIZE") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(DEFAULT_SCRIPT_CACHE_SIZE); + Self::with_script_cache_capacity(engine, gate, cap) + } + + /// Explicit capacity for tests that exercise LRU eviction. + #[must_use] + pub fn with_script_cache_capacity( + engine: Arc, + gate: Arc, + cap: usize, + ) -> Self { + let cap = NonZeroUsize::new(cap.max(1)).expect("max(1) is non-zero"); + Self { + engine, + gate, + script_cache: Arc::new(Mutex::new(LruCache::new(cap))), + } + } + + /// Cache lookup with `updated_at` freshness check. Returns the + /// cached AST on hit; compiles, inserts, returns the fresh AST on + /// miss or stale. Public so tests can introspect the cache. + pub fn get_or_compile( + &self, + identity: ScriptIdentity, + source: &str, + ) -> Result, ExecError> { + { + let mut cache = self + .script_cache + .lock() + .expect("script cache lock poisoned"); + if let Some(cached) = cache.get(&identity.script_id) { + if cached.updated_at == identity.updated_at { + tracing::debug!( + target = "picloud::scripts::cache", + script_id = %identity.script_id, + "cache hit" + ); + return Ok(cached.ast.clone()); + } + tracing::debug!( + target = "picloud::scripts::cache", + script_id = %identity.script_id, + "cache stale; recompiling" + ); + } else { + tracing::debug!( + target = "picloud::scripts::cache", + script_id = %identity.script_id, + "cache miss" + ); + } + } + let ast = self.engine.compile(source)?; + let mut cache = self + .script_cache + .lock() + .expect("script cache lock poisoned"); + cache.put( + identity.script_id, + CachedScript { + updated_at: identity.updated_at, + ast: ast.clone(), + }, + ); + Ok(ast) + } + + /// Shared script-AST cache. Exposed so tests can introspect cache + /// state (length / contents) under a Mutex lock. + #[must_use] + pub fn script_cache(&self) -> &Arc>> { + &self.script_cache } } @@ -89,6 +214,39 @@ impl ExecutorClient for LocalExecutorClient { Ok(Ok(res)) => res, } } + + async fn execute_with_identity( + &self, + identity: ScriptIdentity, + source: &str, + req: ExecRequest, + timeout: Duration, + ) -> Result { + let _permit = + self.gate + .try_acquire() + .map_err( + |AcquireError::Overloaded { retry_after_secs }| ExecError::Overloaded { + retry_after_secs, + }, + )?; + + let ast = self.get_or_compile(identity, source)?; + + let timeout = timeout.min(HARD_TIMEOUT_CAP); + let timeout_secs = u32::try_from(timeout.as_secs()).unwrap_or(u32::MAX); + + let engine = self.engine.clone(); + let join = tokio::task::spawn_blocking(move || engine.execute_ast(&ast, req)); + + match tokio::time::timeout(timeout, join).await { + Err(_) => Err(ExecError::Timeout(timeout_secs)), + Ok(Err(join_err)) => Err(ExecError::Runtime(format!( + "execution task panicked: {join_err}" + ))), + Ok(Ok(res)) => res, + } + } } /// Remote executor — forwards to a peer executor node over HTTP. diff --git a/crates/orchestrator-core/src/lib.rs b/crates/orchestrator-core/src/lib.rs index 1576e19..3cefa3a 100644 --- a/crates/orchestrator-core/src/lib.rs +++ b/crates/orchestrator-core/src/lib.rs @@ -16,7 +16,7 @@ pub mod resolver; pub mod routing; pub use api::{data_plane_router, user_routes_router, DataPlaneState}; -pub use client::{ExecutorClient, LocalExecutorClient, RemoteExecutorClient}; +pub use client::{ExecutorClient, LocalExecutorClient, RemoteExecutorClient, ScriptIdentity}; pub use gate::{AcquireError, ExecutionGate}; pub use inbox::InboxRegistry; pub use resolver::{ResolverError, ScriptResolver};