diff --git a/crates/executor-core/src/engine.rs b/crates/executor-core/src/engine.rs index 3104aa2..b56d725 100644 --- a/crates/executor-core/src/engine.rs +++ b/crates/executor-core/src/engine.rs @@ -134,11 +134,7 @@ impl Engine { /// v1.1.3: execute a pre-compiled AST. The orchestrator's script /// cache hands compiled ASTs in directly; this path skips the /// per-call compile. - pub fn execute_ast( - &self, - ast: &Arc, - req: ExecRequest, - ) -> Result { + pub fn execute_ast(&self, ast: &Arc, req: ExecRequest) -> Result { let effective_limits = self.limits.with_overrides(&req.sandbox_overrides); let logs: Arc>> = Arc::new(Mutex::new(Vec::new())); let mut engine = build_engine(effective_limits, Some(logs.clone())); diff --git a/crates/executor-core/src/module_resolver.rs b/crates/executor-core/src/module_resolver.rs index d926701..a4ffd1a 100644 --- a/crates/executor-core/src/module_resolver.rs +++ b/crates/executor-core/src/module_resolver.rs @@ -130,8 +130,7 @@ impl PicloudModuleResolver { for stmt in ast.statements() { match stmt { rhai::Stmt::Var(_, opts, _) if opts.intersects(ASTFlags::CONSTANT) => {} - rhai::Stmt::Import(..) => {} - rhai::Stmt::Noop(..) => {} + rhai::Stmt::Import(..) | rhai::Stmt::Noop(..) => {} other => { return Err(format!( "module {name:?}: top-level {} is not allowed; \ @@ -221,6 +220,7 @@ fn stmt_kind_label(stmt: &rhai::Stmt) -> &'static str { } impl ModuleResolver for PicloudModuleResolver { + #[allow(clippy::too_many_lines)] fn resolve( &self, engine: &RhaiEngine, @@ -237,7 +237,7 @@ impl ModuleResolver for PicloudModuleResolver { depth: &'r Mutex, armed: bool, } - impl<'r> Drop for StackGuard<'r> { + impl Drop for StackGuard<'_> { fn drop(&mut self) { if !self.armed { return; @@ -298,11 +298,14 @@ impl ModuleResolver for PicloudModuleResolver { armed: true, }; - // Bridge to async. The resolver always runs on a `spawn_blocking` - // thread (see LocalExecutorClient in orchestrator-core), which - // still carries a Tokio handle. `try_current` makes the failure - // mode explicit when callers wire up an `Engine` from a non- - // Tokio context (typically a test harness). + // Bridge to async. The resolver typically runs on a + // `spawn_blocking` thread (see LocalExecutorClient in + // orchestrator-core), but tests may invoke `Engine::execute` + // directly from a multi-threaded Tokio task. `try_current` + + // `block_in_place` covers both — on a blocking thread it's a + // no-op, on a worker thread it tells the runtime to relocate + // other tasks. `current_thread` runtimes still panic; non- + // Tokio contexts surface a clean Runtime error. let handle = tokio::runtime::Handle::try_current().map_err(|_| { Box::new(EvalAltResult::ErrorInModule( path.to_string(), @@ -317,7 +320,7 @@ impl ModuleResolver for PicloudModuleResolver { })?; let lookup_result: Result, ModuleSourceError> = - handle.block_on(self.source.lookup(&self.cx, path)); + tokio::task::block_in_place(|| handle.block_on(self.source.lookup(&self.cx, path))); let module_row = match lookup_result { Ok(Some(m)) => m, @@ -403,13 +406,8 @@ impl ModuleResolver for PicloudModuleResolver { // are resolved through the same `engine.set_module_resolver` // (which is THIS resolver), so cycle/depth tracking carries // through naturally. - let module = Module::eval_ast_as_new(rhai::Scope::new(), &ast, engine).map_err(|e| { - Box::new(EvalAltResult::ErrorInModule( - path.to_string(), - e, - pos, - )) - })?; + let module = Module::eval_ast_as_new(rhai::Scope::new(), &ast, engine) + .map_err(|e| Box::new(EvalAltResult::ErrorInModule(path.to_string(), e, pos)))?; let shared: SharedRhaiModule = module.into(); // Insert (possibly evicting via LRU). Subsequent imports of diff --git a/crates/executor-core/tests/modules.rs b/crates/executor-core/tests/modules.rs new file mode 100644 index 0000000..d974f81 --- /dev/null +++ b/crates/executor-core/tests/modules.rs @@ -0,0 +1,584 @@ +//! v1.1.3 — `PicloudModuleResolver` integration tests. +#![allow(clippy::needless_raw_string_hashes)] // r#""# is more uniform when many tests embed Rhai sources +//! +//! Each test wires an `Engine` with a `CountingModuleSource` (an +//! in-memory fake), a `Services` bundle, and an `ExecRequest` whose +//! `app_id` controls the cross-app boundary. The resolver is +//! exercised end-to-end through `Engine::execute`, so these tests +//! verify the same code path the `picloud` binary runs at request +//! time. + +use std::collections::{BTreeMap, HashMap}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits}; +use picloud_shared::{ + AppId, ExecutionId, ModuleScript, ModuleSource, ModuleSourceError, NoopDeadLetterService, + NoopDocsService, NoopEventEmitter, NoopKvService, RequestId, ScriptId, ScriptSandbox, + SdkCallCx, Services, +}; +use tokio::sync::Mutex; + +/// In-memory `ModuleSource` backed by a `HashMap<(AppId, name)>`. +/// Tracks total lookup count so tests can assert cache hit/miss. +#[derive(Default)] +struct CountingModuleSource { + table: Mutex>, + lookups: AtomicUsize, + /// When `Some`, every lookup returns this error instead of the + /// table — used by the backend-error test. + fail_with: Mutex>, +} + +impl CountingModuleSource { + fn new() -> Arc { + Arc::new(Self::default()) + } + + async fn put(self: &Arc, app_id: AppId, name: &str, source: &str) -> ScriptId { + self.put_with_updated_at(app_id, name, source, Utc::now()) + .await + } + + async fn put_with_updated_at( + self: &Arc, + app_id: AppId, + name: &str, + source: &str, + updated_at: DateTime, + ) -> ScriptId { + let script_id = ScriptId::new(); + self.table.lock().await.insert( + (app_id, name.to_string()), + ModuleScript { + script_id, + app_id, + name: name.to_string(), + source: source.to_string(), + updated_at, + }, + ); + script_id + } + + fn lookup_count(&self) -> usize { + self.lookups.load(Ordering::SeqCst) + } +} + +#[async_trait] +impl ModuleSource for CountingModuleSource { + async fn lookup( + &self, + cx: &SdkCallCx, + name: &str, + ) -> Result, ModuleSourceError> { + self.lookups.fetch_add(1, Ordering::SeqCst); + if let Some(err) = self.fail_with.lock().await.as_ref() { + return Err(ModuleSourceError::Backend(err.clone())); + } + Ok(self + .table + .lock() + .await + .get(&(cx.app_id, name.to_string())) + .cloned()) + } +} + +fn services_with(modules: Arc) -> Services { + Services::new( + Arc::new(NoopKvService), + Arc::new(NoopDocsService), + Arc::new(NoopDeadLetterService), + Arc::new(NoopEventEmitter), + modules, + ) +} + +fn engine_with(modules: Arc) -> Engine { + Engine::new(Limits::default(), services_with(modules)) +} + +fn req(app_id: AppId) -> ExecRequest { + let execution_id = ExecutionId::new(); + ExecRequest { + execution_id, + request_id: RequestId::new(), + script_id: ScriptId::new(), + script_name: "test".into(), + invocation_type: InvocationType::Http, + path: "/test".into(), + headers: BTreeMap::new(), + body: serde_json::Value::Null, + params: BTreeMap::new(), + query: BTreeMap::new(), + rest: String::new(), + sandbox_overrides: ScriptSandbox::default(), + app_id, + principal: None, + trigger_depth: 0, + root_execution_id: execution_id, + is_dead_letter_handler: false, + event: None, + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn resolver_loads_simple_module() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + source.put(app_id, "math", "fn add(a, b) { a + b }").await; + + let engine = engine_with(source.clone()); + let resp = engine + .execute(r#"import "math" as m; m::add(2, 3)"#, req(app_id)) + .expect("should execute"); + assert_eq!(resp.status_code, 200); + assert_eq!(resp.body, serde_json::json!(5)); +} + +#[tokio::test(flavor = "multi_thread")] +async fn resolver_cross_app_blocked() { + let source = CountingModuleSource::new(); + let app_a = AppId::new(); + let app_b = AppId::new(); + source + .put(app_a, "secrets", "fn token() { \"A-token\" }") + .await; + source + .put(app_b, "secrets", "fn token() { \"B-token\" }") + .await; + + let engine = engine_with(source.clone()); + + // App A sees A's module. + let resp = engine + .execute(r#"import "secrets" as s; s::token()"#, req(app_a)) + .unwrap(); + assert_eq!(resp.body, serde_json::json!("A-token")); + + // App B sees B's module — same name, completely separate value. + let resp = engine + .execute(r#"import "secrets" as s; s::token()"#, req(app_b)) + .unwrap(); + assert_eq!(resp.body, serde_json::json!("B-token")); +} + +#[tokio::test(flavor = "multi_thread")] +async fn resolver_cross_app_module_not_found() { + let source = CountingModuleSource::new(); + let app_a = AppId::new(); + let app_b = AppId::new(); + // Only app A has the module. + source.put(app_a, "lonely", "fn ping() { \"pong\" }").await; + + // App B's lookup should return None → resolver surfaces + // ErrorModuleNotFound. + let engine = engine_with(source.clone()); + let err = engine + .execute(r#"import "lonely" as l; l::ping()"#, req(app_b)) + .expect_err("cross-app import should fail"); + let msg = format!("{err:?}"); + assert!( + msg.to_lowercase().contains("module") + || msg.to_lowercase().contains("not found") + || msg.to_lowercase().contains("lonely"), + "expected module-not-found-flavoured error, got {msg}" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn resolver_module_not_found() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + let engine = engine_with(source); + + let err = engine + .execute(r#"import "doesnotexist" as x; 1"#, req(app_id)) + .expect_err("unknown module should fail"); + let msg = format!("{err:?}").to_lowercase(); + assert!( + msg.contains("doesnotexist") || msg.contains("not found"), + "expected ErrorModuleNotFound-flavoured error, got {msg}" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn resolver_self_import_detected() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + // a imports itself + source + .put(app_id, "a", r#"import "a" as a; fn nope() { 0 }"#) + .await; + let engine = engine_with(source); + + let err = engine + .execute(r#"import "a" as a; a::nope()"#, req(app_id)) + .expect_err("self-import should detect cycle"); + let msg = format!("{err:?}").to_lowercase(); + assert!( + msg.contains("circular") || msg.contains("cycle"), + "expected circular-import error, got {msg}" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn resolver_circular_detected() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + // a imports b; b imports a; both then declare a fn. + source + .put(app_id, "a", r#"import "b" as b; fn x() { 0 }"#) + .await; + source + .put(app_id, "b", r#"import "a" as a; fn y() { 0 }"#) + .await; + let engine = engine_with(source); + + let err = engine + .execute(r#"import "a" as a; a::x()"#, req(app_id)) + .expect_err("circular import should fail"); + let msg = format!("{err:?}").to_lowercase(); + assert!( + msg.contains("circular") || msg.contains("cycle"), + "expected circular-import error, got {msg}" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn resolver_depth_limit_enforced() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + // Chain `m0 -> m1 -> ... -> m9` (10 levels). Default depth limit is 8. + for i in 0..9 { + let next = format!("m{}", i + 1); + source + .put( + app_id, + &format!("m{i}"), + &format!(r#"import "{next}" as nxt; fn x() {{ 0 }}"#), + ) + .await; + } + source.put(app_id, "m9", "fn x() { 0 }").await; + + let engine = engine_with(source); + let err = engine + .execute(r#"import "m0" as m0; m0::x()"#, req(app_id)) + .expect_err("chain exceeding depth limit should fail"); + let msg = format!("{err:?}").to_lowercase(); + assert!( + msg.contains("depth"), + "expected depth-exceeded error, got {msg}" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn resolver_depth_limit_just_under_succeeds() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + // Chain depth 7 (under default 8). m0 -> m1 -> ... -> m6 (terminal). + for i in 0..6 { + let next = format!("m{}", i + 1); + source + .put( + app_id, + &format!("m{i}"), + &format!(r#"import "{next}" as nxt; fn x() {{ nxt::x() }}"#), + ) + .await; + } + source.put(app_id, "m6", "fn x() { 42 }").await; + + let engine = engine_with(source); + let resp = engine + .execute(r#"import "m0" as m0; m0::x()"#, req(app_id)) + .expect("chain under depth limit should succeed"); + assert_eq!(resp.body, serde_json::json!(42)); +} + +#[tokio::test(flavor = "multi_thread")] +async fn resolver_runtime_validation_rejects_top_level_expr() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + // Module has a top-level expression — bypassed the admin gate, + // but the resolver re-validates and rejects. + source.put(app_id, "bad", r#"42; fn x() { 1 }"#).await; + let engine = engine_with(source); + + let err = engine + .execute(r#"import "bad" as b; b::x()"#, req(app_id)) + .expect_err("top-level expr in module should be rejected at resolve"); + let msg = format!("{err:?}").to_lowercase(); + assert!( + msg.contains("top-level") || msg.contains("module"), + "expected module-shape error, got {msg}" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn resolver_backend_error_surfaces() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + *source.fail_with.lock().await = Some("simulated db outage".into()); + let engine = engine_with(source); + + let err = engine + .execute(r#"import "x" as x; 1"#, req(app_id)) + .expect_err("backend error should propagate"); + let msg = format!("{err:?}").to_lowercase(); + assert!( + msg.contains("simulated") || msg.contains("backend"), + "expected backend-error message, got {msg}" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn module_cache_hit_reuses_compiled_module() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + source.put(app_id, "u", "fn ping() { 1 }").await; + + let engine = engine_with(source.clone()); + + // First execution compiles and caches. + engine + .execute(r#"import "u" as u; u::ping()"#, req(app_id)) + .unwrap(); + let lookups_after_first = source.lookup_count(); + assert_eq!( + lookups_after_first, 1, + "first invocation should look up once" + ); + + // Second execution should re-lookup (to compare updated_at) but + // serve from cache without recompiling. We can't directly observe + // compile-vs-cache here, but we can assert lookup count grew by + // one (no spurious extra calls). + engine + .execute(r#"import "u" as u; u::ping()"#, req(app_id)) + .unwrap(); + assert_eq!(source.lookup_count(), 2); +} + +#[tokio::test(flavor = "multi_thread")] +async fn module_cache_stale_invalidated_on_updated_at_change() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + let t0 = Utc::now() - chrono::Duration::seconds(10); + source + .put_with_updated_at(app_id, "u", r#"fn v() { 1 }"#, t0) + .await; + + let engine = engine_with(source.clone()); + + let resp = engine + .execute(r#"import "u" as u; u::v()"#, req(app_id)) + .unwrap(); + assert_eq!(resp.body, serde_json::json!(1)); + + // Replace with newer updated_at — cache should refresh. + let t1 = Utc::now(); + source + .put_with_updated_at(app_id, "u", r#"fn v() { 99 }"#, t1) + .await; + + let resp = engine + .execute(r#"import "u" as u; u::v()"#, req(app_id)) + .unwrap(); + assert_eq!( + resp.body, + serde_json::json!(99), + "edited module should be visible on next invocation" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn module_cache_keyed_by_app() { + let source = CountingModuleSource::new(); + let app_a = AppId::new(); + let app_b = AppId::new(); + source.put(app_a, "u", "fn id() { 1 }").await; + source.put(app_b, "u", "fn id() { 2 }").await; + + let engine = engine_with(source.clone()); + + // Both apps should compile + cache independently; neither sees + // the other's compiled module. + let resp = engine + .execute(r#"import "u" as u; u::id()"#, req(app_a)) + .unwrap(); + assert_eq!(resp.body, serde_json::json!(1)); + let resp = engine + .execute(r#"import "u" as u; u::id()"#, req(app_b)) + .unwrap(); + assert_eq!(resp.body, serde_json::json!(2)); +} + +#[tokio::test(flavor = "multi_thread")] +async fn module_cache_lru_evicts_when_capacity_exceeded() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + source.put(app_id, "a", "fn v() { 1 }").await; + source.put(app_id, "b", "fn v() { 2 }").await; + source.put(app_id, "c", "fn v() { 3 }").await; + + // Capacity 1 — only the most recently used entry stays cached. + let engine = + Engine::with_module_cache_capacity(Limits::default(), services_with(source.clone()), 1); + + engine + .execute(r#"import "a" as m; m::v()"#, req(app_id)) + .unwrap(); + engine + .execute(r#"import "b" as m; m::v()"#, req(app_id)) + .unwrap(); + engine + .execute(r#"import "c" as m; m::v()"#, req(app_id)) + .unwrap(); + + // Cache should hold at most one entry. + let cache = engine.module_cache().lock().unwrap(); + assert!( + cache.len() <= 1, + "cache size {} exceeded capacity 1", + cache.len() + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn endpoint_can_import_module() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + source + .put(app_id, "helpers", r#"fn greet(name) { `hello, ${name}` }"#) + .await; + + let engine = engine_with(source); + let resp = engine + .execute( + r#"import "helpers" as h; #{ statusCode: 200, body: h::greet("world") }"#, + req(app_id), + ) + .unwrap(); + assert_eq!(resp.status_code, 200); + assert_eq!(resp.body, serde_json::json!("hello, world")); +} + +#[tokio::test(flavor = "multi_thread")] +async fn module_can_import_module() { + let source = CountingModuleSource::new(); + let app_id = AppId::new(); + source.put(app_id, "inner", "fn three() { 3 }").await; + source + .put( + app_id, + "outer", + r#"import "inner" as i; fn nine() { i::three() * 3 }"#, + ) + .await; + let engine = engine_with(source); + + let resp = engine + .execute(r#"import "outer" as o; o::nine()"#, req(app_id)) + .unwrap(); + assert_eq!(resp.body, serde_json::json!(9)); +} + +#[test] +fn validate_module_accepts_fn_const_import_only() { + let engine = Engine::new(Limits::default(), Services::default()); + let valid = r#" + const PI = 3.14; + import "other" as o; + fn area(r) { PI * r * r } + "#; + let v = engine.validate_module(valid).expect("valid module body"); + assert_eq!(v.imports, vec!["other".to_string()]); +} + +#[test] +fn validate_module_rejects_top_level_let() { + let engine = Engine::new(Limits::default(), Services::default()); + let bad = "let x = 1; fn f() { x }"; + let err = engine + .validate_module(bad) + .expect_err("top-level let should be rejected"); + let msg = format!("{err:?}").to_lowercase(); + assert!(msg.contains("top-level") || msg.contains("module")); +} + +#[test] +fn validate_module_rejects_top_level_expr() { + let engine = Engine::new(Limits::default(), Services::default()); + let bad = "42"; + let err = engine + .validate_module(bad) + .expect_err("top-level expr should be rejected"); + let msg = format!("{err:?}").to_lowercase(); + assert!(msg.contains("top-level") || msg.contains("module")); +} + +#[test] +fn validate_module_rejects_top_level_while() { + // Avoid `if true { ... }` — Rhai folds constant-condition `if`s + // at optimize time, leaving an empty statement list that passes + // module-shape validation vacuously. A `while` with a variable + // condition isn't folded. + let engine = Engine::new(Limits::default(), Services::default()); + let bad = r#"let i = 0; while i < 1 { i += 1; }"#; + let err = engine + .validate_module(bad) + .expect_err("top-level loop should be rejected"); + let msg = format!("{err:?}").to_lowercase(); + assert!(msg.contains("top-level") || msg.contains("module")); +} + +#[test] +fn validate_endpoint_extracts_literal_imports() { + let engine = Engine::new(Limits::default(), Services::default()); + let src = r#" + import "a" as a; + import "b" as b; + a::run() + b::run() + "#; + let v = engine + .validate(src) + .expect("endpoint with imports should parse"); + assert_eq!(v.imports, vec!["a".to_string(), "b".to_string()]); +} + +#[test] +fn validate_endpoint_top_level_expr_still_allowed() { + // Endpoints can have arbitrary top-level statements — only + // modules are restricted. Confirm v1.1.3 didn't tighten endpoints. + let engine = Engine::new(Limits::default(), Services::default()); + let src = r#"let x = 1; #{ statusCode: 200, body: x }"#; + engine + .validate(src) + .expect("endpoints may have top-level statements"); +} + +#[test] +fn validate_endpoint_skips_dynamic_imports_in_imports_list() { + // `import some_var as y;` parses but is not a literal-path + // import — the dep graph cannot track it. The imports list + // should be empty for such a script. + let engine = Engine::new(Limits::default(), Services::default()); + let src = r#" + let name = "x"; + import name as y; + y::run() + "#; + let v = engine.validate(src).expect("dynamic import should parse"); + assert!( + v.imports.is_empty(), + "dynamic imports should not appear in the dep-graph imports list, got {:?}", + v.imports + ); +} diff --git a/crates/manager-core/src/repo.rs b/crates/manager-core/src/repo.rs index 821c1b3..b609389 100644 --- a/crates/manager-core/src/repo.rs +++ b/crates/manager-core/src/repo.rs @@ -62,10 +62,8 @@ pub trait ScriptRepository: Send + Sync { /// v1.1.3: list module dependencies of this script — the rows in /// `script_imports` where `importer_script_id = script_id`. Used /// by tests and (eventually) a dashboard "Imports" panel. - async fn list_imports( - &self, - script_id: ScriptId, - ) -> Result, ScriptRepositoryError>; + async fn list_imports(&self, script_id: ScriptId) + -> Result, ScriptRepositoryError>; } /// Inbound shape for create. Defaults match the migration's CHECK @@ -267,7 +265,7 @@ impl ScriptRepository for PostgresScriptRepository { .bind(patch.timeout_seconds) .bind(patch.memory_limit_mb) .bind(sandbox_json) - .bind(patch.kind.map(|k| k.as_str())) + .bind(patch.kind.map(ScriptKind::as_str)) .fetch_optional(&mut *tx) .await; @@ -414,7 +412,7 @@ impl From for Script { // Defensive: if a row's `kind` somehow falls outside the CHECK // constraint, treat it as Endpoint (the safe default — won't // grant a row import-target status it doesn't have). - let kind = ScriptKind::from_str(&r.kind).unwrap_or(ScriptKind::Endpoint); + let kind = ScriptKind::parse_str(&r.kind).unwrap_or(ScriptKind::Endpoint); Self { id: r.id.into(), app_id: r.app_id.into(), diff --git a/crates/manager-core/src/triggers_api.rs b/crates/manager-core/src/triggers_api.rs index 20981d2..075bbcd 100644 --- a/crates/manager-core/src/triggers_api.rs +++ b/crates/manager-core/src/triggers_api.rs @@ -716,7 +716,7 @@ mod tests { source: String::new(), kind, timeout_seconds: 30, - sandbox: Default::default(), + sandbox: picloud_shared::ScriptSandbox::default(), memory_limit_mb: 256, created_at: now, updated_at: now, @@ -766,10 +766,7 @@ mod tests { ) -> Result { unimplemented!() } - async fn delete( - &self, - _id: ScriptId, - ) -> Result<(), crate::repo::ScriptRepositoryError> { + async fn delete(&self, _id: ScriptId) -> Result<(), crate::repo::ScriptRepositoryError> { unimplemented!() } async fn count_routes_for_script( @@ -1106,4 +1103,205 @@ mod tests { let err = res.expect_err("cross-app delete should 404"); assert!(matches!(err, TriggersApiError::NotFound(_))); } + + // ---------------------------------------------------------------- + // v1.1.3: kind + cross-app target validation on trigger create. + // ---------------------------------------------------------------- + + #[tokio::test] + async fn kv_trigger_rejects_module_target() { + let app_id = AppId::new(); + let script_id = ScriptId::new(); + let state = TriggersState { + triggers: Arc::new(InMemoryTriggerRepo::default()), + apps: InMemoryAppRepo::with(app_id), + authz: Arc::new(AlwaysAllowAuthzRepo), + scripts: InMemoryScriptRepo::with_module(app_id, script_id), + config: TriggerConfig::conservative(), + }; + let res = create_kv_trigger( + State(state), + Extension(member_principal()), + Path(app_id), + Json(CreateKvTriggerRequest { + script_id, + collection_glob: "widgets".into(), + ops: vec![KvEventOp::Insert], + dispatch_mode: TriggerDispatchMode::Async, + retry_max_attempts: None, + retry_backoff: None, + retry_base_ms: None, + }), + ) + .await; + let err = res.expect_err("module script should be rejected as trigger target"); + let msg = match err { + TriggersApiError::Invalid(m) => m, + other => panic!("expected Invalid, got {other:?}"), + }; + assert!( + msg.to_lowercase().contains("module"), + "expected error to mention 'module', got {msg}" + ); + } + + #[tokio::test] + async fn docs_trigger_rejects_module_target() { + let app_id = AppId::new(); + let script_id = ScriptId::new(); + let state = TriggersState { + triggers: Arc::new(InMemoryTriggerRepo::default()), + apps: InMemoryAppRepo::with(app_id), + authz: Arc::new(AlwaysAllowAuthzRepo), + scripts: InMemoryScriptRepo::with_module(app_id, script_id), + config: TriggerConfig::conservative(), + }; + let res = create_docs_trigger( + State(state), + Extension(member_principal()), + Path(app_id), + Json(CreateDocsTriggerRequest { + script_id, + collection_glob: "users".into(), + ops: vec![DocsEventOp::Create], + dispatch_mode: TriggerDispatchMode::Async, + retry_max_attempts: None, + retry_backoff: None, + retry_base_ms: None, + }), + ) + .await; + let err = res.expect_err("module script should be rejected as docs-trigger target"); + let msg = match err { + TriggersApiError::Invalid(m) => m, + other => panic!("expected Invalid, got {other:?}"), + }; + assert!(msg.to_lowercase().contains("module")); + } + + #[tokio::test] + async fn dl_trigger_rejects_module_target() { + let app_id = AppId::new(); + let script_id = ScriptId::new(); + let state = TriggersState { + triggers: Arc::new(InMemoryTriggerRepo::default()), + apps: InMemoryAppRepo::with(app_id), + authz: Arc::new(AlwaysAllowAuthzRepo), + scripts: InMemoryScriptRepo::with_module(app_id, script_id), + config: TriggerConfig::conservative(), + }; + let res = create_dl_trigger( + State(state), + Extension(member_principal()), + Path(app_id), + Json(CreateDeadLetterTriggerRequest { + script_id, + source_filter: None, + trigger_id_filter: None, + script_id_filter: None, + }), + ) + .await; + let err = res.expect_err("module script should be rejected as dead-letter target"); + let msg = match err { + TriggersApiError::Invalid(m) => m, + other => panic!("expected Invalid, got {other:?}"), + }; + assert!(msg.to_lowercase().contains("module")); + } + + #[tokio::test] + async fn kv_trigger_rejects_missing_script() { + let app_id = AppId::new(); + // Empty script repo — the requested script_id doesn't exist. + let state = state_with(Arc::new(AlwaysAllowAuthzRepo), app_id); + let res = create_kv_trigger( + State(state), + Extension(member_principal()), + Path(app_id), + Json(CreateKvTriggerRequest { + script_id: ScriptId::new(), + collection_glob: "widgets".into(), + ops: vec![], + dispatch_mode: TriggerDispatchMode::Async, + retry_max_attempts: None, + retry_backoff: None, + retry_base_ms: None, + }), + ) + .await; + let err = res.expect_err("missing script should reject"); + let msg = match err { + TriggersApiError::Invalid(m) => m, + other => panic!("expected Invalid, got {other:?}"), + }; + assert!(msg.to_lowercase().contains("not found")); + } + + #[tokio::test] + async fn kv_trigger_rejects_cross_app_script() { + // Latent v1.1.1/v1.1.2 isolation gap closed by v1.1.3: a + // member of app A could previously target a script in app B. + let app_a = AppId::new(); + let app_b = AppId::new(); + let script_id = ScriptId::new(); + // Pre-populate the script repo with the script living in app B, + // but the trigger request targets app A. + let scripts = InMemoryScriptRepo::with_endpoint(app_b, script_id); + let state = TriggersState { + triggers: Arc::new(InMemoryTriggerRepo::default()), + apps: InMemoryAppRepo::with(app_a), + authz: Arc::new(AlwaysAllowAuthzRepo), + scripts, + config: TriggerConfig::conservative(), + }; + let res = create_kv_trigger( + State(state), + Extension(member_principal()), + Path(app_a), + Json(CreateKvTriggerRequest { + script_id, + collection_glob: "widgets".into(), + ops: vec![], + dispatch_mode: TriggerDispatchMode::Async, + retry_max_attempts: None, + retry_backoff: None, + retry_base_ms: None, + }), + ) + .await; + let err = res.expect_err("cross-app trigger target should reject"); + let msg = match err { + TriggersApiError::Invalid(m) => m, + other => panic!("expected Invalid, got {other:?}"), + }; + assert!( + msg.to_lowercase().contains("does not belong"), + "expected cross-app rejection message, got {msg}" + ); + } + + #[tokio::test] + async fn kv_trigger_accepts_endpoint_target() { + let app_id = AppId::new(); + let script_id = ScriptId::new(); + let state = state_with_endpoint(Arc::new(AlwaysAllowAuthzRepo), app_id, script_id); + let (status, _) = create_kv_trigger( + State(state), + Extension(member_principal()), + Path(app_id), + Json(CreateKvTriggerRequest { + script_id, + collection_glob: "widgets".into(), + ops: vec![KvEventOp::Insert], + dispatch_mode: TriggerDispatchMode::Async, + retry_max_attempts: None, + retry_backoff: None, + retry_base_ms: None, + }), + ) + .await + .expect("endpoint target should succeed"); + assert_eq!(status, StatusCode::CREATED); + } } diff --git a/crates/orchestrator-core/src/client.rs b/crates/orchestrator-core/src/client.rs index ff46fa4..b6f530a 100644 --- a/crates/orchestrator-core/src/client.rs +++ b/crates/orchestrator-core/src/client.rs @@ -280,3 +280,131 @@ impl ExecutorClient for RemoteExecutorClient { )) } } + +#[cfg(test)] +mod cache_tests { + use super::*; + use picloud_executor_core::Limits; + use picloud_shared::Services; + + fn engine() -> Arc { + Arc::new(Engine::new(Limits::default(), Services::default())) + } + + fn client_with_cap(cap: usize) -> LocalExecutorClient { + LocalExecutorClient::with_script_cache_capacity( + engine(), + Arc::new(ExecutionGate::new(32)), + cap, + ) + } + + fn identity_at(t: DateTime) -> ScriptIdentity { + ScriptIdentity { + script_id: ScriptId::new(), + updated_at: t, + } + } + + #[test] + fn cache_hit_when_identity_matches() { + let client = client_with_cap(8); + let identity = identity_at(Utc::now()); + let src = "fn f() { 1 }"; + + let ast_a = client.get_or_compile(identity, src).unwrap(); + let ast_b = client.get_or_compile(identity, src).unwrap(); + + // Same Arc — cache served the second call without recompiling. + assert!( + Arc::ptr_eq(&ast_a, &ast_b), + "expected identical Arc from cache hit" + ); + } + + #[test] + fn cache_invalidated_when_updated_at_changes() { + let client = client_with_cap(8); + let script_id = ScriptId::new(); + let t0 = Utc::now() - chrono::Duration::seconds(10); + let t1 = Utc::now(); + + let ast_a = client + .get_or_compile( + ScriptIdentity { + script_id, + updated_at: t0, + }, + "fn f() { 1 }", + ) + .unwrap(); + let ast_b = client + .get_or_compile( + ScriptIdentity { + script_id, + updated_at: t1, + }, + "fn f() { 2 }", + ) + .unwrap(); + + // Different Arc — cache miss forced recompile. + assert!( + !Arc::ptr_eq(&ast_a, &ast_b), + "expected recompile on updated_at change" + ); + } + + #[test] + fn distinct_script_ids_cache_independently() { + let client = client_with_cap(8); + let now = Utc::now(); + let a = identity_at(now); + let b = identity_at(now); + client.get_or_compile(a, "fn x() { 1 }").unwrap(); + client.get_or_compile(b, "fn x() { 1 }").unwrap(); + + let cache = client.script_cache().lock().unwrap(); + assert_eq!( + cache.len(), + 2, + "distinct script_ids should yield two entries" + ); + } + + #[test] + fn lru_eviction_caps_cache_size() { + // Capacity 1 — every new script evicts the previous. + let client = client_with_cap(1); + client + .get_or_compile(identity_at(Utc::now()), "fn a() { 1 }") + .unwrap(); + client + .get_or_compile(identity_at(Utc::now()), "fn b() { 2 }") + .unwrap(); + client + .get_or_compile(identity_at(Utc::now()), "fn c() { 3 }") + .unwrap(); + assert_eq!(client.script_cache().lock().unwrap().len(), 1); + } + + #[test] + fn script_identity_is_copy() { + // Copy is load-bearing — many call sites pass it by value. + let id = identity_at(Utc::now()); + let _ = id; + let _ = id; // should still be usable + } + + #[test] + fn compile_error_does_not_poison_cache() { + let client = client_with_cap(8); + let identity = identity_at(Utc::now()); + // Bad source — should error and not insert anything. + let res = client.get_or_compile(identity, "@@@ not valid rhai @@@"); + assert!(res.is_err(), "garbage source should fail to compile"); + // A subsequent good compile under a fresh identity must still work. + let good = client.get_or_compile(identity_at(Utc::now()), "fn ok() { 1 }"); + assert!(good.is_ok()); + } +} diff --git a/crates/picloud/tests/api.rs b/crates/picloud/tests/api.rs index dfcdb2c..3e1fe80 100644 --- a/crates/picloud/tests/api.rs +++ b/crates/picloud/tests/api.rs @@ -1221,3 +1221,270 @@ async fn execution_errors_are_still_logged(pool: PgPool) { assert_eq!(logs[0]["status"], "error"); assert!(logs[0]["response_body"]["error"].is_string()); } + +// ============================================================================ +// v1.1.3 — Modules: scripts.kind, route + trigger rejection, end-to-end import +// ============================================================================ + +#[ignore = "needs DATABASE_URL pointing at a running Postgres"] +#[sqlx::test(migrations = "../manager-core/migrations")] +async fn create_script_default_kind_is_endpoint(pool: PgPool) { + let (s, app_id) = server_with_app(pool).await; + let r = s + .post("/api/v1/admin/scripts") + .json(&with_app( + &app_id, + json!({ "name": "default-kind", "source": "1" }), + )) + .await; + r.assert_status(axum::http::StatusCode::CREATED); + let body: Value = r.json(); + assert_eq!(body["kind"], "endpoint"); +} + +#[ignore = "needs DATABASE_URL pointing at a running Postgres"] +#[sqlx::test(migrations = "../manager-core/migrations")] +async fn create_module_kind_persists(pool: PgPool) { + let (s, app_id) = server_with_app(pool).await; + let r = s + .post("/api/v1/admin/scripts") + .json(&with_app( + &app_id, + json!({ + "name": "helpers", + "kind": "module", + "source": "fn add(a, b) { a + b }" + }), + )) + .await; + r.assert_status(axum::http::StatusCode::CREATED); + let body: Value = r.json(); + assert_eq!(body["kind"], "module"); +} + +#[ignore = "needs DATABASE_URL pointing at a running Postgres"] +#[sqlx::test(migrations = "../manager-core/migrations")] +async fn create_module_with_top_level_expr_rejected(pool: PgPool) { + let (s, app_id) = server_with_app(pool).await; + let r = s + .post("/api/v1/admin/scripts") + .json(&with_app( + &app_id, + json!({ + "name": "badmod", + "kind": "module", + "source": "42; fn ok() { 1 }" + }), + )) + .await; + r.assert_status(axum::http::StatusCode::UNPROCESSABLE_ENTITY); + let body: Value = r.json(); + assert!(body["error"].as_str().unwrap().contains("module")); +} + +#[ignore = "needs DATABASE_URL pointing at a running Postgres"] +#[sqlx::test(migrations = "../manager-core/migrations")] +async fn create_module_with_reserved_name_rejected(pool: PgPool) { + let (s, app_id) = server_with_app(pool).await; + let r = s + .post("/api/v1/admin/scripts") + .json(&with_app( + &app_id, + json!({ + "name": "kv", + "kind": "module", + "source": "fn ok() { 1 }" + }), + )) + .await; + r.assert_status(axum::http::StatusCode::UNPROCESSABLE_ENTITY); + let body: Value = r.json(); + assert!(body["error"].as_str().unwrap().contains("reserved")); +} + +#[ignore = "needs DATABASE_URL pointing at a running Postgres"] +#[sqlx::test(migrations = "../manager-core/migrations")] +async fn route_bind_rejects_module(pool: PgPool) { + let (s, app_id) = server_with_app(pool).await; + let r = s + .post("/api/v1/admin/scripts") + .json(&with_app( + &app_id, + json!({ + "name": "lib", + "kind": "module", + "source": "fn pong() { 42 }" + }), + )) + .await; + r.assert_status(axum::http::StatusCode::CREATED); + let body: Value = r.json(); + let id = body["id"].as_str().unwrap(); + + let r = s + .post(&format!("/api/v1/admin/scripts/{id}/routes")) + .json(&json!({ + "host_kind": "any", + "path_kind": "exact", + "path": "/lib" + })) + .await; + r.assert_status(axum::http::StatusCode::UNPROCESSABLE_ENTITY); +} + +#[ignore = "needs DATABASE_URL pointing at a running Postgres"] +#[sqlx::test(migrations = "../manager-core/migrations")] +async fn endpoint_imports_module_end_to_end(pool: PgPool) { + let (s, app_id) = server_with_app(pool).await; + + // Create a module script. + s.post("/api/v1/admin/scripts") + .json(&with_app( + &app_id, + json!({ + "name": "math", + "kind": "module", + "source": "fn add(a, b) { a + b }" + }), + )) + .await + .assert_status(axum::http::StatusCode::CREATED); + + // Create an endpoint that imports it. + let id = create_basic_script( + &s, + &app_id, + "calc", + r#"import "math" as m; #{ statusCode: 200, body: m::add(2, 3) }"#, + ) + .await; + + // Bind a route. + s.post(&format!("/api/v1/admin/scripts/{id}/routes")) + .json(&json!({ + "host_kind": "any", + "path_kind": "exact", + "path": "/calc" + })) + .await + .assert_status(axum::http::StatusCode::CREATED); + + // Hit it — the endpoint should consume the module and return 5. + let r = s.get("/calc").add_header("host", "localhost").await; + r.assert_status_ok(); + let body: Value = r.json(); + assert_eq!(body, json!(5)); +} + +#[ignore = "needs DATABASE_URL pointing at a running Postgres"] +#[sqlx::test(migrations = "../manager-core/migrations")] +async fn module_edit_visible_on_next_invocation(pool: PgPool) { + let (s, app_id) = server_with_app(pool).await; + + let lib: Value = s + .post("/api/v1/admin/scripts") + .json(&with_app( + &app_id, + json!({ + "name": "greet", + "kind": "module", + "source": r"fn say(n) { `hello, ${n}` }" + }), + )) + .await + .json(); + let lib_id = lib["id"].as_str().unwrap(); + + let id = create_basic_script( + &s, + &app_id, + "hello", + r#"import "greet" as g; #{ statusCode: 200, body: g::say("world") }"#, + ) + .await; + s.post(&format!("/api/v1/admin/scripts/{id}/routes")) + .json(&json!({ + "host_kind": "any", + "path_kind": "exact", + "path": "/hello" + })) + .await + .assert_status(axum::http::StatusCode::CREATED); + + let r1: Value = s.get("/hello").add_header("host", "localhost").await.json(); + assert_eq!(r1, json!("hello, world")); + + // Edit the module — bump updated_at. + s.put(&format!("/api/v1/admin/scripts/{lib_id}")) + .json(&json!({ "source": r"fn say(n) { `hi, ${n}` }" })) + .await + .assert_status_ok(); + + // Cache invalidation must surface the new behavior. + let r2: Value = s.get("/hello").add_header("host", "localhost").await.json(); + assert_eq!(r2, json!("hi, world")); +} + +#[ignore = "needs DATABASE_URL pointing at a running Postgres"] +#[sqlx::test(migrations = "../manager-core/migrations")] +async fn cross_app_import_blocked(pool: PgPool) { + // Two apps each have a module named "helpers" with different + // behavior. An endpoint in app A must import A's module, not B's. + + // App A is already created by `server_with_app`. Create app B. + let (s, app_a) = server_with_app(pool).await; + let app_b: Value = s + .post("/api/v1/admin/apps") + .json(&json!({ "slug": "appb", "name": "App B" })) + .await + .json(); + let app_b_id = app_b["id"].as_str().unwrap(); + + // App A's module returns "A". App B's returns "B". + s.post("/api/v1/admin/scripts") + .json(&with_app( + &app_a, + json!({ + "name": "helpers", + "kind": "module", + "source": r#"fn who() { "A" }"# + }), + )) + .await + .assert_status(axum::http::StatusCode::CREATED); + s.post("/api/v1/admin/scripts") + .json(&with_app( + app_b_id, + json!({ + "name": "helpers", + "kind": "module", + "source": r#"fn who() { "B" }"# + }), + )) + .await + .assert_status(axum::http::StatusCode::CREATED); + + // Endpoint in app A imports "helpers" and exposes the result. + let id = create_basic_script( + &s, + &app_a, + "who-am-i", + r#"import "helpers" as h; #{ statusCode: 200, body: h::who() }"#, + ) + .await; + s.post(&format!("/api/v1/admin/scripts/{id}/routes")) + .json(&json!({ + "host_kind": "any", + "path_kind": "exact", + "path": "/who-am-i" + })) + .await + .assert_status(axum::http::StatusCode::CREATED); + + let r: Value = s + .get("/who-am-i") + .add_header("host", "localhost") + .await + .json(); + assert_eq!(r, json!("A"), "must see app A's module, not app B's"); +} diff --git a/crates/shared/src/script.rs b/crates/shared/src/script.rs index 1ea53b0..5e7103a 100644 --- a/crates/shared/src/script.rs +++ b/crates/shared/src/script.rs @@ -14,21 +14,16 @@ use crate::{AppId, ScriptId, ScriptSandbox}; /// Serialized as `"endpoint"` / `"module"` so the wire shape is the /// same string the SQL `CHECK (kind IN ('endpoint','module'))` /// constraint enforces. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum ScriptKind { + #[default] Endpoint, Module, } -impl Default for ScriptKind { - fn default() -> Self { - Self::Endpoint - } -} - impl ScriptKind { - /// Wire / SQL representation. Inverse of `from_str`. + /// Wire / SQL representation. Inverse of `parse_str`. #[must_use] pub fn as_str(self) -> &'static str { match self { @@ -39,8 +34,11 @@ impl ScriptKind { /// Parse the canonical wire / SQL form. Returns `None` for any /// other input; callers map that to a 400 / `ValidationError`. + /// Named `parse_str` (not `from_str`) to dodge the + /// `std::str::FromStr` lint without taking on the trait's + /// `Result` shape that this caller doesn't need. #[must_use] - pub fn from_str(s: &str) -> Option { + pub fn parse_str(s: &str) -> Option { match s { "endpoint" => Some(Self::Endpoint), "module" => Some(Self::Module), @@ -49,6 +47,45 @@ impl ScriptKind { } } +#[cfg(test)] +mod kind_tests { + use super::*; + + #[test] + fn default_is_endpoint() { + assert_eq!(ScriptKind::default(), ScriptKind::Endpoint); + } + + #[test] + fn round_trips_through_serde_lowercase() { + assert_eq!( + serde_json::to_string(&ScriptKind::Endpoint).unwrap(), + "\"endpoint\"" + ); + assert_eq!( + serde_json::to_string(&ScriptKind::Module).unwrap(), + "\"module\"" + ); + assert_eq!( + serde_json::from_str::("\"endpoint\"").unwrap(), + ScriptKind::Endpoint + ); + assert_eq!( + serde_json::from_str::("\"module\"").unwrap(), + ScriptKind::Module + ); + } + + #[test] + fn parse_str_round_trip() { + for k in [ScriptKind::Endpoint, ScriptKind::Module] { + assert_eq!(ScriptKind::parse_str(k.as_str()), Some(k)); + } + assert_eq!(ScriptKind::parse_str("invalid"), None); + assert_eq!(ScriptKind::parse_str(""), None); + } +} + /// A user-uploaded Rhai script and its execution configuration. /// /// This is the canonical representation that flows between manager (storage), diff --git a/crates/shared/src/services.rs b/crates/shared/src/services.rs index 0834767..14de6fa 100644 --- a/crates/shared/src/services.rs +++ b/crates/shared/src/services.rs @@ -20,8 +20,8 @@ use std::sync::Arc; use crate::{ - DeadLetterService, DocsService, KvService, ModuleSource, NoopDeadLetterService, NoopDocsService, - NoopEventEmitter, NoopKvService, NoopModuleSource, ServiceEventEmitter, + DeadLetterService, DocsService, KvService, ModuleSource, NoopDeadLetterService, + NoopDocsService, NoopEventEmitter, NoopKvService, NoopModuleSource, ServiceEventEmitter, }; /// SDK service bundle. See module docs for the lifecycle and the v1.1.x