From 02335a8132d3ae8998d7af96d478cb9af4d3eff5 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Thu, 4 Jun 2026 22:30:25 +0200 Subject: [PATCH] =?UTF-8?q?fix(v1.1.7-dead-letter):=20wire=20dispatcher=20?= =?UTF-8?q?=E2=86=92=20list=5Fmatching=5Fdead=5Fletter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit dead_letter triggers have been registerable since v1.1.1 but their handlers never fired: dispatcher::handle_failure wrote the dead_letters row and stopped — list_matching_dead_letter had no production caller. Any deploy v1.1.1–v1.1.6 with dead_letter triggers had silently non-functional handlers. The fix: after the dead-letter row is inserted on retry exhaustion, fan out to matching dead_letter triggers (filtered by source / originating trigger_id / script_id) and enqueue one outbox row per match carrying a real-shape TriggerEvent::DeadLetter (the §6 brief field names were stale — used the actual variant: dead_letter_id, original: Box<TriggerEvent>, attempts, last_error, trigger_id, script_id, first/last_attempt_at). The recursion-stop (a handler's own failure isn't re-dead-lettered) is upheld by the existing is_dead_letter_handler short-circuit. Tests (DB-gated): handler actually fires with the nested original event; existing row-create test now also asserts handler-fire; source_filter excludes non-matching; failing handler does not recurse. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/manager-core/src/dispatcher.rs | 108 ++++++++++++++- crates/picloud/tests/dispatcher_e2e.rs | 180 +++++++++++++++++++++---- 2 files changed, 256 insertions(+), 32 deletions(-) diff --git a/crates/manager-core/src/dispatcher.rs b/crates/manager-core/src/dispatcher.rs index 78bdfe1..4c9de30 100644 --- a/crates/manager-core/src/dispatcher.rs +++ b/crates/manager-core/src/dispatcher.rs @@ -23,19 +23,19 @@ use std::sync::Arc; use std::time::Duration; -use chrono::Utc; +use chrono::{DateTime, Utc}; use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType}; use picloud_orchestrator_core::{ExecutionGate, ExecutorClient}; use picloud_shared::{ - ExecResponseSummary, ExecutionId, HttpDispatchPayload, InboxDeliveryOutcome, InboxFailureKind, - InboxResolver, InboxResult, RequestId, ScriptId, ScriptSandbox, TriggerEvent, + DeadLetterId, ExecResponseSummary, ExecutionId, HttpDispatchPayload, InboxDeliveryOutcome, + InboxFailureKind, InboxResolver, InboxResult, RequestId, ScriptId, ScriptSandbox, TriggerEvent, }; use rand::Rng; use uuid::Uuid; use crate::abandoned_repo::{AbandonedRepo, NewAbandonedExecution}; use crate::dead_letter_repo::{DeadLetterRepo, NewDeadLetter}; -use crate::outbox_repo::{OutboxRepo, OutboxRow, OutboxSourceKind}; +use crate::outbox_repo::{NewOutboxRow, OutboxRepo, OutboxRow, OutboxSourceKind}; use crate::principal_resolver::PrincipalResolver; use crate::repo::ScriptRepository; use crate::trigger_config::{BackoffShape, TriggerConfig}; @@ -463,12 +463,12 @@ impl Dispatcher { // Exhausted retries → dead-letter. 
let (op, source) = describe_event(&row.payload); let now = Utc::now(); - if let Err(e) = self + let dl_id = match self .dead_letters .insert(NewDeadLetter { app_id: row.app_id, original_event_id: row.id, - source, + source: source.clone(), op, trigger_id: row.trigger_id, script_id: Some(resolved.script_id), @@ -480,8 +480,26 @@ impl Dispatcher { }) .await { - tracing::error!(?e, "failed to write dead-letter row"); + Ok(id) => Some(id), + Err(e) => { + tracing::error!(?e, "failed to write dead-letter row"); + None + } + }; + + // v1.1.7 fix: fan the dead-letter out to matching handler triggers. + // This was missing since v1.1.1 — the row was written but + // `list_matching_dead_letter` had no production caller, so + // registered dead_letter handlers never fired. The recursion-stop + // (a dead-letter handler's own failure is not re-dead-lettered) + // is upheld by the `is_dead_letter_handler` short-circuit at the + // top of this function, so this fan-out is only reached for + // non-handler executions. + if let Some(dl_id) = dl_id { + self.fan_out_dead_letter(&row, &resolved, dl_id, &source, attempt, &err, now) + .await; } + self.outbox .delete(row.id) .await @@ -489,6 +507,82 @@ impl Dispatcher { Ok(()) } + /// Enqueue one outbox row per matching `dead_letter` trigger so its + /// handler script runs with the dead-letter event as `ctx.event`. + /// Best-effort: a lookup/insert failure is logged, not propagated + /// (the dead-letter row itself is already durably written). + #[allow(clippy::too_many_arguments)] + async fn fan_out_dead_letter( + &self, + row: &OutboxRow, + resolved: &ResolvedTrigger, + dead_letter_id: DeadLetterId, + source: &str, + attempt: u32, + err: &ExecError, + now: DateTime<Utc>, + ) { + // The DL event nests the original verbatim; if the payload can't + // be decoded back into a TriggerEvent we can't build the nested + // `original`, so skip the fan-out (the DL row is still written). 
+ let Ok(original) = serde_json::from_value::<TriggerEvent>(row.payload.clone()) else { + tracing::warn!( + outbox_id = %row.id, + "dead-letter payload is not a TriggerEvent; skipping handler fan-out" + ); + return; + }; + + let matches = match self + .triggers + .list_matching_dead_letter(row.app_id, source, row.trigger_id, Some(resolved.script_id)) + .await + { + Ok(m) => m, + Err(e) => { + tracing::error!(?e, "dead-letter trigger lookup failed"); + return; + } + }; + + for m in matches { + let event = TriggerEvent::DeadLetter { + dead_letter_id, + original: Box::new(original.clone()), + attempts: attempt, + last_error: err.to_string(), + trigger_id: row.trigger_id, + script_id: Some(resolved.script_id), + first_attempt_at: row.created_at, + last_attempt_at: now, + }; + let payload = match serde_json::to_value(&event) { + Ok(p) => p, + Err(e) => { + tracing::error!(?e, "failed to serialize dead-letter event"); + continue; + } + }; + if let Err(e) = self + .outbox + .insert(NewOutboxRow { + app_id: row.app_id, + source_kind: OutboxSourceKind::DeadLetter, + trigger_id: Some(m.trigger_id), + script_id: Some(m.script_id), + reply_to: None, + payload, + origin_principal: Some(m.registered_by_principal), + trigger_depth: row.trigger_depth.saturating_add(1), + root_execution_id: row.root_execution_id, + }) + .await + { + tracing::error!(?e, "failed to enqueue dead-letter handler delivery"); + } + } + } + async fn deliver_inbox(&self, row: &OutboxRow, inbox_id: Uuid, result: InboxResult) { match self.inbox.deliver(inbox_id, result.clone()).await { InboxDeliveryOutcome::Delivered => {} diff --git a/crates/picloud/tests/dispatcher_e2e.rs b/crates/picloud/tests/dispatcher_e2e.rs index d48f060..5966ec8 100644 --- a/crates/picloud/tests/dispatcher_e2e.rs +++ b/crates/picloud/tests/dispatcher_e2e.rs @@ -303,23 +303,128 @@ async fn dispatcher_delivers_pubsub_to_handler() { assert_eq!(event["pubsub"]["message"]["hello"], 1); } +/// Count dead_letters rows for an app. 
+async fn dead_letter_count(pool: &PgPool, app_id: &str) -> i64 { + let app_uuid = Uuid::parse_str(app_id).unwrap(); + sqlx::query_scalar("SELECT COUNT(*) FROM dead_letters WHERE app_id = $1") + .bind(app_uuid) + .fetch_one(pool) + .await + .expect("count dead_letters") +} + +async fn poll_dead_letter_count(pool: &PgPool, app_id: &str, want: i64) -> i64 { + let mut count = 0; + for _ in 0..100 { + count = dead_letter_count(pool, app_id).await; + if count >= want { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + count +} + +/// Register a failing KV trigger on `dlsrc` (single attempt → immediate +/// dead-letter) and a `dead_letter` trigger pointing at the marker +/// handler, then cause the originating KV event. Returns when set up. +async fn setup_dead_letter(server: &TestServer, app_id: &str, dl_handler: &str) { + let failing = create_script(server, app_id, "dl-failing", r#"throw "boom";"#).await; + server + .post(&format!("/api/v1/admin/apps/{app_id}/triggers/kv")) + .json(&json!({ + "script_id": failing, + "collection_glob": "dlsrc", + "retry_max_attempts": 1, + "retry_base_ms": 0 + })) + .await + .assert_status(axum::http::StatusCode::CREATED); + // The dead_letter trigger (no filters → matches any dead-letter). 
+ server + .post(&format!("/api/v1/admin/apps/{app_id}/triggers/dead_letter")) + .json(&json!({ "script_id": dl_handler })) + .await + .assert_status(axum::http::StatusCode::CREATED); + + let source = create_script( + server, + app_id, + "dl-source", + r#"kv::collection("dlsrc").set("k", 1); #{ ok: true }"#, + ) + .await; + execute(server, source.as_str()).await; +} + #[tokio::test] async fn dispatcher_delivers_dead_letter_to_handler() { - // NOTE: the dead-letter creation path (`dispatcher::handle_failure` → - // `DeadLetterRepo::insert`) writes the `dead_letters` row but does not - // appear to enqueue deliveries for `dead_letter`-kind triggers - // (`TriggerRepo::list_matching_dead_letter` has no production caller — - // see HANDBACK latent-findings). So this test asserts the wired - // behavior: a failing handler that exhausts its (single) attempt - // produces a dead-letter row. If/when DL→handler fan-out lands, this - // can be upgraded to assert the handler marker like the others. + // v1.1.7: the dead-letter fan-out is now wired + // (`dispatcher::handle_failure` → `list_matching_dead_letter` → + // outbox). This asserts BOTH that the `dead_letters` row is written + // AND that the registered `dead_letter`-kind handler actually fires + // (it was silently non-functional v1.1.1–v1.1.6). let Some(pool) = pool_or_skip().await else { return; }; let (server, app_id) = server_for(pool.clone(), "dl").await; + let handler = create_script(&server, &app_id, "dl-handler", MARKER_HANDLER).await; + setup_dead_letter(&server, &app_id, &handler).await; + + // Row written. + assert!( + poll_dead_letter_count(&pool, &app_id, 1).await > 0, + "a dead-letter row should have been produced" + ); + // Handler fired. 
+ let event = poll_marker(&pool, &app_id) + .await + .expect("dead-letter handler fired"); + assert_eq!(event["source"], "dead_letter"); +} + +#[tokio::test] +async fn dispatcher_delivers_dead_letter_to_handler_actually_fires() { + // Focused on the handler-fire side: the marker handler receives a + // fully-shaped dead-letter event (the original KV event nested under + // `ctx.event.dead_letter.original`, plus the failure metadata). + let Some(pool) = pool_or_skip().await else { + return; + }; + let (server, app_id) = server_for(pool.clone(), "dlfire").await; + let handler = create_script(&server, &app_id, "dl-handler", MARKER_HANDLER).await; + setup_dead_letter(&server, &app_id, &handler).await; + + let event = poll_marker(&pool, &app_id) + .await + .expect("dead-letter handler fired"); + assert_eq!(event["source"], "dead_letter"); + // The original KV event is nested verbatim. + assert_eq!(event["dead_letter"]["original"]["source"], "kv"); + assert_eq!( + event["dead_letter"]["original"]["kv"]["collection"], + "dlsrc" + ); + // Failure metadata is present. + assert!(event["dead_letter"]["last_error"] + .as_str() + .unwrap() + .contains("boom")); + assert!(event["dead_letter"]["attempts"].as_i64().unwrap() >= 1); +} + +#[tokio::test] +async fn dead_letter_source_filter_excludes_nonmatching() { + // `list_matching_dead_letter` filters by source (among trigger_id / + // script_id). A dead_letter trigger whose `source_filter` is "docs" + // must NOT fire for a "kv"-sourced dead-letter — the row is still + // written, but no handler delivery is enqueued. + let Some(pool) = pool_or_skip().await else { + return; + }; + let (server, app_id) = server_for(pool.clone(), "dlfilter").await; + let handler = create_script(&server, &app_id, "dl-handler", MARKER_HANDLER).await; - // A handler that always throws, with a single attempt so it - // dead-letters immediately (no retry backoff). 
let failing = create_script(&server, &app_id, "dl-failing", r#"throw "boom";"#).await; server .post(&format!("/api/v1/admin/apps/{app_id}/triggers/kv")) @@ -331,6 +436,12 @@ async fn dispatcher_delivers_dead_letter_to_handler() { })) .await .assert_status(axum::http::StatusCode::CREATED); + // Filter to a different source so this handler must NOT match. + server + .post(&format!("/api/v1/admin/apps/{app_id}/triggers/dead_letter")) + .json(&json!({ "script_id": handler, "source_filter": "docs" })) + .await + .assert_status(axum::http::StatusCode::CREATED); let source = create_script( &server, @@ -341,19 +452,38 @@ async fn dispatcher_delivers_dead_letter_to_handler() { .await; execute(&server, &source).await; - // Poll the dead_letters table for this app. - let app_uuid = Uuid::parse_str(&app_id).unwrap(); - let mut count: i64 = 0; - for _ in 0..100 { - count = sqlx::query_scalar("SELECT COUNT(*) FROM dead_letters WHERE app_id = $1") - .bind(app_uuid) - .fetch_one(&pool) - .await - .expect("count dead_letters"); - if count > 0 { - break; - } - tokio::time::sleep(Duration::from_millis(100)).await; - } - assert!(count > 0, "a dead-letter row should have been produced"); + // The dead-letter row is written… + assert!(poll_dead_letter_count(&pool, &app_id, 1).await >= 1); + // …but the source-filtered handler never fires. + let marker = poll_marker_n(&pool, &app_id, 8).await; + assert!( + marker.is_none(), + "source_filter='docs' must not fire for a kv dead-letter" + ); +} + +#[tokio::test] +async fn dead_letter_handler_failure_does_not_recurse() { + // Recursion-stop (design notes §4): a dead_letter handler that itself + // throws must NOT produce a second dead-letter row. The + // `is_dead_letter_handler` short-circuit annotates the original row + // and drops the outbox row without re-dead-lettering. + let Some(pool) = pool_or_skip().await else { + return; + }; + let (server, app_id) = server_for(pool.clone(), "dlrec").await; + // The DL handler itself throws. 
+ let throwing = create_script(&server, &app_id, "dl-throws", r#"throw "handler boom";"#).await; + setup_dead_letter(&server, &app_id, &throwing).await; + + // One dead-letter row appears (the original). Give the throwing + // handler time to run + (not) recurse, then confirm the count stayed + // at exactly 1. + assert!(poll_dead_letter_count(&pool, &app_id, 1).await >= 1); + tokio::time::sleep(Duration::from_millis(800)).await; + assert_eq!( + dead_letter_count(&pool, &app_id).await, + 1, + "a failing dead-letter handler must not create a new dead-letter row" + ); }