Files
PiCloud/crates/picloud/tests/dispatcher_e2e.rs
MechaCat02 02335a8132 fix(v1.1.7-dead-letter): wire dispatcher → list_matching_dead_letter
dead_letter triggers have been registerable since v1.1.1 but their
handlers never fired: dispatcher::handle_failure wrote the dead_letters
row and stopped — list_matching_dead_letter had no production caller.
Any deploy v1.1.1–v1.1.6 with dead_letter triggers had silently
non-functional handlers.

The fix: after the dead-letter row is inserted on retry exhaustion, fan
out to matching dead_letter triggers (filtered by source / originating
trigger_id / script_id) and enqueue one outbox row per match carrying a
real-shape TriggerEvent::DeadLetter (the §6 brief field names were stale
— used the actual variant: dead_letter_id, original: Box<TriggerEvent>,
attempts, last_error, trigger_id, script_id, first/last_attempt_at).
The recursion-stop (a handler's own failure isn't re-dead-lettered)
is upheld by the existing is_dead_letter_handler short-circuit.

Tests (DB-gated): handler actually fires with the nested original event;
existing row-create test now also asserts handler-fire; source_filter
excludes non-matching; failing handler does not recurse.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-04 22:30:25 +02:00

490 lines
17 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! End-to-end dispatcher tests — one per trigger kind (v1.1.5 follow-up,
//! landed in v1.1.6). Each test wires the full all-in-one app via
//! `build_app` (which spawns the real dispatcher + cron scheduler +
//! executor), creates an app + a logging handler script + a trigger,
//! causes the originating event, and polls for the handler's side effect.
//!
//! ## Gating
//!
//! These need a Postgres reachable via `DATABASE_URL`. They follow the
//! `schema_snapshot` pattern (NOT `#[ignore]`): when `DATABASE_URL` is
//! unset the test prints a notice and returns early, so plain
//! `cargo test` stays green locally while CI (which sets `DATABASE_URL`)
//! runs them.
//!
//! ## How "the handler fired" is observed
//!
//! The dispatcher does not write `execution_log` rows for trigger
//! handlers, so each handler instead records its `ctx.event` into a KV
//! marker (`collection = "e2e_markers"`, which no trigger watches — no
//! recursion). The test polls `kv_entries` for that marker and asserts
//! the event shape. See HANDBACK §deviations for why this lives in
//! `picloud/tests/` rather than `manager-core/tests/` (build_app lives in
//! the `picloud` crate) and for the `dead_letter` reinterpretation.
#![allow(clippy::needless_pass_by_value)]
use std::time::Duration;
use axum_test::TestServer;
use serde_json::{json, Value};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use uuid::Uuid;
/// Open a migrated Postgres pool for these tests, or opt the test out.
///
/// Reads `DATABASE_URL`. When the variable cannot be read, a skip notice
/// is printed to stderr and `None` is returned so the calling test can
/// return early (the same gating approach as `schema_snapshot.rs`).
/// Otherwise a pool capped at five connections is opened and the
/// `manager-core` migrations are applied to it.
///
/// # Panics
///
/// Panics when connecting or applying the migrations fails.
async fn pool_or_skip() -> Option<PgPool> {
    let Ok(database_url) = std::env::var("DATABASE_URL") else {
        eprintln!("dispatcher_e2e: DATABASE_URL unset — skipping");
        return None;
    };

    let connected = PgPoolOptions::new()
        .max_connections(5)
        .connect(&database_url)
        .await;
    let pool = connected.expect("connect to DATABASE_URL");

    let migrated = sqlx::migrate!("../manager-core/migrations")
        .run(&pool)
        .await;
    migrated.expect("apply migrations");

    Some(pool)
}
/// Stand up the all-in-one app on the shared pool and create an isolated
/// app inside it.
///
/// Steps, in order:
/// 1. seed a uniquely named `Owner` user (password `pw`),
/// 2. build the router via `picloud::build_app` with a fixed test master key,
/// 3. log in and attach the bearer token to every subsequent request,
/// 4. create a fresh app so this test's KV entries / events stay isolated
///    from sibling tests.
///
/// `suffix` labels the calling test and should be unique per test because
/// the pool is shared; a random UUID is appended as well so user names and
/// app slugs do not collide.
///
/// Returns the authenticated [`TestServer`] and the id of the new app.
///
/// # Panics
///
/// Panics when any setup step fails. The app-creation response is checked
/// explicitly so a rejected request reports its status and body instead of
/// surfacing later as a bare `app id` panic.
async fn server_for(pool: PgPool, suffix: &str) -> (TestServer, String) {
    use picloud_manager_core::auth::hash_password;
    use picloud_shared::InstanceRole;

    let unique = format!("{suffix}-{}", Uuid::new_v4().simple());
    let auth = picloud::AuthDeps::from_pool(pool.clone());
    let username = format!("e2e-{unique}");
    let hash = hash_password("pw").expect("hash");
    auth.users
        .create(&username, &hash, InstanceRole::Owner, None)
        .await
        .expect("seed admin");

    let app = picloud::build_app(
        pool,
        auth,
        picloud_shared::MasterKey::from_bytes([0x42u8; 32]),
    )
    .await
    .expect("build_app");
    let mut server = TestServer::new(app).expect("TestServer");

    let resp = server
        .post("/api/v1/admin/auth/login")
        .json(&json!({ "username": username, "password": "pw" }))
        .await;
    resp.assert_status_ok();
    let token = resp.json::<Value>()["token"]
        .as_str()
        .expect("login token")
        .to_string();
    server.add_header("authorization", format!("Bearer {token}"));

    // A fresh app keeps each test's KV / events isolated from siblings.
    let slug = format!("e2e-{unique}");
    let created = server
        .post("/api/v1/admin/apps")
        .json(&json!({ "slug": slug, "name": slug }))
        .await;
    // Fail loudly with the server's reply rather than on the id lookup below.
    assert!(
        created.status_code().is_success(),
        "creating app {slug:?} failed with {}: {}",
        created.status_code(),
        created.text()
    );
    let created: Value = created.json();
    let app_id = created["id"].as_str().expect("app id").to_string();
    (server, app_id)
}
/// Create a script in `app_id` through the admin API and return its id.
///
/// `name` is the script's name and `source` its source text; both are sent
/// verbatim in the request body.
///
/// # Panics
///
/// Panics when the API does not answer with a success status (the message
/// includes the status and response body, so a rejected script is
/// diagnosable) or when the response carries no string `id`.
async fn create_script(server: &TestServer, app_id: &str, name: &str, source: &str) -> String {
    let response = server
        .post("/api/v1/admin/scripts")
        .json(&json!({ "app_id": app_id, "name": name, "source": source }))
        .await;
    // Without this check a failed create only shows up as a `script id` panic.
    assert!(
        response.status_code().is_success(),
        "creating script {name:?} failed with {}: {}",
        response.status_code(),
        response.text()
    );
    let created: Value = response.json();
    created["id"].as_str().expect("script id").to_string()
}
/// Script source for the observing handler used throughout these tests.
///
/// It copies `ctx.event` into the KV collection `e2e_markers` under the key
/// `marker`, then evaluates to `#{ ok: true }`. The tests poll that key to
/// learn that the handler ran and to inspect the event it received. No
/// trigger registered in this file watches `e2e_markers`, so writing the
/// marker does not cause further handler runs.
const MARKER_HANDLER: &str = r#"
let e = ctx.event;
kv::collection("e2e_markers").set("marker", e);
#{ ok: true }
"#;
/// Wait for the marker KV entry of `app_id` using the default budget.
///
/// Delegates to [`poll_marker_n`] with 100 iterations of 100ms each, i.e.
/// roughly ten seconds. Returns the stored value, or `None` on timeout.
async fn poll_marker(pool: &PgPool, app_id: &str) -> Option<Value> {
    const DEFAULT_ITERS: u32 = 100;
    poll_marker_n(pool, app_id, DEFAULT_ITERS).await
}
/// Look for the marker KV entry of `app_id` up to `iters` times, sleeping
/// 100ms after every unsuccessful look.
///
/// Returns the stored JSON value as soon as the row exists, or `None` once
/// all attempts are used up.
///
/// # Panics
///
/// Panics when `app_id` is not a valid UUID or the query fails.
async fn poll_marker_n(pool: &PgPool, app_id: &str, iters: u32) -> Option<Value> {
    const MARKER_QUERY: &str = "SELECT value FROM kv_entries \
         WHERE app_id = $1 AND collection = 'e2e_markers' AND key = 'marker'";

    let app_uuid = Uuid::parse_str(app_id).expect("app uuid");
    let pause = Duration::from_millis(100);

    let mut attempts_left = iters;
    while attempts_left > 0 {
        let found: Option<(Value,)> = sqlx::query_as(MARKER_QUERY)
            .bind(app_uuid)
            .fetch_optional(pool)
            .await
            .expect("query marker");
        if let Some((marker,)) = found {
            return Some(marker);
        }
        tokio::time::sleep(pause).await;
        attempts_left -= 1;
    }
    None
}
/// Run the script `script_id` once through the execute endpoint with an
/// empty JSON object as the request body.
///
/// # Panics
///
/// Panics when the response status is not 200 OK.
async fn execute(server: &TestServer, script_id: &str) {
    let path = format!("/api/v1/execute/{script_id}");
    let response = server.post(&path).json(&json!({})).await;
    response.assert_status_ok();
}
/// A KV write to a watched collection reaches the registered handler with
/// a `kv` / `insert` event naming the collection and key.
#[tokio::test]
async fn dispatcher_delivers_kv_to_handler() {
    use axum::http::StatusCode;

    let Some(pool) = pool_or_skip().await else {
        return;
    };
    let (server, app_id) = server_for(pool.clone(), "kv").await;

    // Marker handler, wired to KV changes in the `src` collection.
    let handler_id = create_script(&server, &app_id, "kv-handler", MARKER_HANDLER).await;
    let trigger_path = format!("/api/v1/admin/apps/{app_id}/triggers/kv");
    let trigger_body = json!({ "script_id": handler_id, "collection_glob": "src" });
    server
        .post(&trigger_path)
        .json(&trigger_body)
        .await
        .assert_status(StatusCode::CREATED);

    // The originating event: a script that writes into `src`.
    let writer_source = r#"kv::collection("src").set("k", 42); #{ ok: true }"#;
    let writer_id = create_script(&server, &app_id, "kv-source", writer_source).await;
    execute(&server, &writer_id).await;

    let observed = poll_marker(&pool, &app_id).await;
    let event = observed.expect("kv handler fired");
    assert_eq!(event["source"], "kv");
    assert_eq!(event["op"], "insert");
    assert_eq!(event["kv"]["collection"], "src");
    assert_eq!(event["kv"]["key"], "k");
}
/// Creating a document in a watched collection reaches the registered
/// handler with a `docs` / `create` event naming the collection.
#[tokio::test]
async fn dispatcher_delivers_docs_to_handler() {
    use axum::http::StatusCode;

    let Some(pool) = pool_or_skip().await else {
        return;
    };
    let (server, app_id) = server_for(pool.clone(), "docs").await;

    // Marker handler, wired to document changes in the `src` collection.
    let handler_id = create_script(&server, &app_id, "docs-handler", MARKER_HANDLER).await;
    let trigger_path = format!("/api/v1/admin/apps/{app_id}/triggers/docs");
    let trigger_body = json!({ "script_id": handler_id, "collection_glob": "src" });
    server
        .post(&trigger_path)
        .json(&trigger_body)
        .await
        .assert_status(StatusCode::CREATED);

    // The originating event: a script that creates a document in `src`.
    let writer_source = r#"docs::collection("src").create(#{ x: 1 }); #{ ok: true }"#;
    let writer_id = create_script(&server, &app_id, "docs-source", writer_source).await;
    execute(&server, &writer_id).await;

    let observed = poll_marker(&pool, &app_id).await;
    let event = observed.expect("docs handler fired");
    assert_eq!(event["source"], "docs");
    assert_eq!(event["op"], "create");
    assert_eq!(event["docs"]["collection"], "src");
}
/// A cron trigger fires its handler without any originating script and
/// delivers a `cron` / `tick` event carrying the configured timezone.
#[tokio::test]
async fn dispatcher_delivers_cron_to_handler() {
    use axum::http::StatusCode;

    // 450 polls x 100ms = 45s: long enough to outlast the scheduler's
    // default 30s tick interval described below.
    const CRON_POLL_ITERS: u32 = 450;

    let Some(pool) = pool_or_skip().await else {
        return;
    };
    let (server, app_id) = server_for(pool.clone(), "cron").await;
    let handler_id = create_script(&server, &app_id, "cron-handler", MARKER_HANDLER).await;

    // Six-field schedule (seconds resolution): due every second.
    let trigger_path = format!("/api/v1/admin/apps/{app_id}/triggers/cron");
    let trigger_body =
        json!({ "script_id": handler_id, "schedule": "* * * * * *", "timezone": "UTC" });
    server
        .post(&trigger_path)
        .json(&trigger_body)
        .await
        .assert_status(StatusCode::CREATED);

    // Nothing to execute here: the scheduler enqueues the due tick itself.
    // It skips its first tick and afterwards ticks every
    // PICLOUD_CRON_TICK_INTERVAL_MS (default 30s), hence the longer poll;
    // lowering that env var speeds the test up in CI.
    let observed = poll_marker_n(&pool, &app_id, CRON_POLL_ITERS).await;
    let event = observed.expect("cron handler fired");
    assert_eq!(event["source"], "cron");
    assert_eq!(event["op"], "tick");
    assert_eq!(event["cron"]["timezone"], "UTC");
}
/// Creating a file in a watched collection reaches the registered handler
/// with a `files` / `create` event naming the collection and file.
#[tokio::test]
async fn dispatcher_delivers_files_to_handler() {
    use axum::http::StatusCode;

    let Some(pool) = pool_or_skip().await else {
        return;
    };
    let (server, app_id) = server_for(pool.clone(), "files").await;

    // Marker handler, wired to file changes in the `src` collection.
    let handler_id = create_script(&server, &app_id, "files-handler", MARKER_HANDLER).await;
    let trigger_path = format!("/api/v1/admin/apps/{app_id}/triggers/files");
    let trigger_body = json!({ "script_id": handler_id, "collection_glob": "src" });
    server
        .post(&trigger_path)
        .json(&trigger_body)
        .await
        .assert_status(StatusCode::CREATED);

    // The originating event: a script that stores a small text file
    // (base64 "aGk=") named `f.txt` in `src`.
    let uploader_source = r#"
let data = base64::decode("aGk=");
files::collection("src").create(#{ name: "f.txt", content_type: "text/plain", data: data });
#{ ok: true }
"#;
    let uploader_id = create_script(&server, &app_id, "files-source", uploader_source).await;
    execute(&server, &uploader_id).await;

    let observed = poll_marker(&pool, &app_id).await;
    let event = observed.expect("files handler fired");
    assert_eq!(event["source"], "files");
    assert_eq!(event["op"], "create");
    assert_eq!(event["files"]["collection"], "src");
    assert_eq!(event["files"]["name"], "f.txt");
}
/// A durable publish on a subscribed topic reaches the registered handler
/// with a `pubsub` / `publish` event carrying the topic and message.
#[tokio::test]
async fn dispatcher_delivers_pubsub_to_handler() {
    use axum::http::StatusCode;

    let Some(pool) = pool_or_skip().await else {
        return;
    };
    let (server, app_id) = server_for(pool.clone(), "pubsub").await;

    // Marker handler, subscribed to the `e2e.topic` topic.
    let handler_id = create_script(&server, &app_id, "pubsub-handler", MARKER_HANDLER).await;
    let trigger_path = format!("/api/v1/admin/apps/{app_id}/triggers/pubsub");
    let trigger_body = json!({ "script_id": handler_id, "topic_pattern": "e2e.topic" });
    server
        .post(&trigger_path)
        .json(&trigger_body)
        .await
        .assert_status(StatusCode::CREATED);

    // The originating event: a script that publishes durably on the topic.
    let publisher_source = r#"pubsub::publish_durable("e2e.topic", #{ hello: 1 }); #{ ok: true }"#;
    let publisher_id = create_script(&server, &app_id, "pubsub-source", publisher_source).await;
    execute(&server, &publisher_id).await;

    let observed = poll_marker(&pool, &app_id).await;
    let event = observed.expect("pubsub handler fired");
    assert_eq!(event["source"], "pubsub");
    assert_eq!(event["op"], "publish");
    assert_eq!(event["pubsub"]["topic"], "e2e.topic");
    assert_eq!(event["pubsub"]["message"]["hello"], 1);
}
/// Count the `dead_letters` rows that belong to `app_id`.
///
/// # Panics
///
/// Panics when `app_id` is not a valid UUID or the query fails. The UUID
/// parse uses the same `app uuid` message as `poll_marker_n`, so a bad id
/// is reported the same way by both helpers.
async fn dead_letter_count(pool: &PgPool, app_id: &str) -> i64 {
    let app_uuid = Uuid::parse_str(app_id).expect("app uuid");
    sqlx::query_scalar("SELECT COUNT(*) FROM dead_letters WHERE app_id = $1")
        .bind(app_uuid)
        .fetch_one(pool)
        .await
        .expect("count dead_letters")
}
/// Poll the dead-letter row count of `app_id` until it reaches `want`.
///
/// Checks up to 100 times with a 100ms pause after each check that falls
/// short. Returns the last count observed, which may still be below
/// `want` if the budget ran out — callers assert on the returned value.
async fn poll_dead_letter_count(pool: &PgPool, app_id: &str, want: i64) -> i64 {
    let pause = Duration::from_millis(100);
    let mut seen = 0;
    let mut attempts_left = 100_u32;
    while attempts_left > 0 {
        seen = dead_letter_count(pool, app_id).await;
        if seen >= want {
            return seen;
        }
        tokio::time::sleep(pause).await;
        attempts_left -= 1;
    }
    seen
}
/// Arrange a dead-letter scenario inside `app_id` and set it in motion.
///
/// 1. Registers a KV trigger on collection `dlsrc` whose handler always
///    throws, limited to a single attempt with zero retry delay so the
///    event is dead-lettered immediately.
/// 2. Registers an unfiltered `dead_letter` trigger that points at
///    `dl_handler`, so it matches any dead-letter.
/// 3. Executes a script that writes to `dlsrc`, producing the originating
///    KV event.
///
/// Returns once the originating script has been executed; it does not wait
/// for the dispatcher to process anything.
async fn setup_dead_letter(server: &TestServer, app_id: &str, dl_handler: &str) {
    use axum::http::StatusCode;

    // Step 1: the always-failing handler and its single-attempt KV trigger.
    let failing_id = create_script(server, app_id, "dl-failing", r#"throw "boom";"#).await;
    let kv_trigger_path = format!("/api/v1/admin/apps/{app_id}/triggers/kv");
    let kv_trigger_body = json!({
        "script_id": failing_id,
        "collection_glob": "dlsrc",
        "retry_max_attempts": 1,
        "retry_base_ms": 0
    });
    server
        .post(&kv_trigger_path)
        .json(&kv_trigger_body)
        .await
        .assert_status(StatusCode::CREATED);

    // Step 2: the dead_letter trigger (no filters, so it matches everything).
    let dl_trigger_path = format!("/api/v1/admin/apps/{app_id}/triggers/dead_letter");
    let dl_trigger_body = json!({ "script_id": dl_handler });
    server
        .post(&dl_trigger_path)
        .json(&dl_trigger_body)
        .await
        .assert_status(StatusCode::CREATED);

    // Step 3: cause the originating KV event.
    let writer_source = r#"kv::collection("dlsrc").set("k", 1); #{ ok: true }"#;
    let writer_id = create_script(server, app_id, "dl-source", writer_source).await;
    execute(server, writer_id.as_str()).await;
}
/// Retry exhaustion both writes a `dead_letters` row and fires the
/// registered `dead_letter` handler.
#[tokio::test]
async fn dispatcher_delivers_dead_letter_to_handler() {
    // Since v1.1.7 the dead-letter fan-out is wired end to end
    // (`dispatcher::handle_failure` -> `list_matching_dead_letter` ->
    // outbox). Two things are checked: the `dead_letters` row exists, and
    // the `dead_letter`-kind handler really runs (it was silently
    // non-functional from v1.1.1 through v1.1.6).
    let Some(pool) = pool_or_skip().await else {
        return;
    };
    let (server, app_id) = server_for(pool.clone(), "dl").await;
    let handler_id = create_script(&server, &app_id, "dl-handler", MARKER_HANDLER).await;
    setup_dead_letter(&server, &app_id, &handler_id).await;

    // First: the row.
    let rows = poll_dead_letter_count(&pool, &app_id, 1).await;
    assert!(rows > 0, "a dead-letter row should have been produced");

    // Second: the handler.
    let observed = poll_marker(&pool, &app_id).await;
    let event = observed.expect("dead-letter handler fired");
    assert_eq!(event["source"], "dead_letter");
}
/// The `dead_letter` handler receives a fully shaped event: the original
/// KV event nested under `dead_letter.original` plus failure metadata.
#[tokio::test]
async fn dispatcher_delivers_dead_letter_to_handler_actually_fires() {
    // This test looks only at the handler side: what exactly arrives in
    // `ctx.event` when the dead-letter handler runs.
    let Some(pool) = pool_or_skip().await else {
        return;
    };
    let (server, app_id) = server_for(pool.clone(), "dlfire").await;
    let handler_id = create_script(&server, &app_id, "dl-handler", MARKER_HANDLER).await;
    setup_dead_letter(&server, &app_id, &handler_id).await;

    let observed = poll_marker(&pool, &app_id).await;
    let event = observed.expect("dead-letter handler fired");
    assert_eq!(event["source"], "dead_letter");

    let dead_letter = &event["dead_letter"];

    // The KV event that failed is carried along under `original`.
    let original = &dead_letter["original"];
    assert_eq!(original["source"], "kv");
    assert_eq!(original["kv"]["collection"], "dlsrc");

    // Failure metadata: the thrown message and at least one attempt.
    let last_error = dead_letter["last_error"].as_str().unwrap();
    assert!(last_error.contains("boom"));
    let attempts = dead_letter["attempts"].as_i64().unwrap();
    assert!(attempts >= 1);
}
/// A `dead_letter` trigger filtered to another source stays silent: the
/// row is written but the handler is not invoked.
#[tokio::test]
async fn dead_letter_source_filter_excludes_nonmatching() {
    use axum::http::StatusCode;

    // `list_matching_dead_letter` filters on source (as well as
    // trigger_id / script_id). With `source_filter` set to "docs", a
    // dead-letter that originated from "kv" must not be delivered, even
    // though its `dead_letters` row is still created.
    let Some(pool) = pool_or_skip().await else {
        return;
    };
    let (server, app_id) = server_for(pool.clone(), "dlfilter").await;
    let handler_id = create_script(&server, &app_id, "dl-handler", MARKER_HANDLER).await;
    let failing_id = create_script(&server, &app_id, "dl-failing", r#"throw "boom";"#).await;

    // Always-failing KV trigger: one attempt, no retry delay.
    let kv_trigger_path = format!("/api/v1/admin/apps/{app_id}/triggers/kv");
    let kv_trigger_body = json!({
        "script_id": failing_id,
        "collection_glob": "dlsrc",
        "retry_max_attempts": 1,
        "retry_base_ms": 0
    });
    server
        .post(&kv_trigger_path)
        .json(&kv_trigger_body)
        .await
        .assert_status(StatusCode::CREATED);

    // Dead-letter trigger restricted to a source that will not occur here.
    let dl_trigger_path = format!("/api/v1/admin/apps/{app_id}/triggers/dead_letter");
    let dl_trigger_body = json!({ "script_id": handler_id, "source_filter": "docs" });
    server
        .post(&dl_trigger_path)
        .json(&dl_trigger_body)
        .await
        .assert_status(StatusCode::CREATED);

    // Cause the kv-sourced failure.
    let writer_source = r#"kv::collection("dlsrc").set("k", 1); #{ ok: true }"#;
    let writer_id = create_script(&server, &app_id, "dl-source", writer_source).await;
    execute(&server, &writer_id).await;

    // The row shows up regardless of the filter.
    let rows = poll_dead_letter_count(&pool, &app_id, 1).await;
    assert!(rows >= 1);

    // The marker must stay absent for the whole (short, 8 x 100ms) window.
    let marker = poll_marker_n(&pool, &app_id, 8).await;
    assert!(
        marker.is_none(),
        "source_filter='docs' must not fire for a kv dead-letter"
    );
}
/// A `dead_letter` handler that throws does not get dead-lettered itself.
#[tokio::test]
async fn dead_letter_handler_failure_does_not_recurse() {
    // Recursion stop (design notes section 4): when the dead_letter handler
    // fails, the `is_dead_letter_handler` short-circuit annotates the
    // original row and drops the outbox row, so no second `dead_letters`
    // row may appear.
    let Some(pool) = pool_or_skip().await else {
        return;
    };
    let (server, app_id) = server_for(pool.clone(), "dlrec").await;

    // This time the dead-letter handler is the one that throws.
    let throwing_id =
        create_script(&server, &app_id, "dl-throws", r#"throw "handler boom";"#).await;
    setup_dead_letter(&server, &app_id, &throwing_id).await;

    // Wait for the original dead-letter row to appear.
    let initial_rows = poll_dead_letter_count(&pool, &app_id, 1).await;
    assert!(initial_rows >= 1);

    // Leave the throwing handler time to run (and to recurse, if the stop
    // were broken) before taking the final count.
    let settle = Duration::from_millis(800);
    tokio::time::sleep(settle).await;

    let final_rows = dead_letter_count(&pool, &app_id).await;
    assert_eq!(
        final_rows, 1,
        "a failing dead-letter handler must not create a new dead-letter row"
    );
}