fix(v1.1.7-dead-letter): wire dispatcher → list_matching_dead_letter
dead_letter triggers have been registerable since v1.1.1 but their handlers never fired: dispatcher::handle_failure wrote the dead_letters row and stopped — list_matching_dead_letter had no production caller. Any deploy v1.1.1–v1.1.6 with dead_letter triggers had silently non-functional handlers. The fix: after the dead-letter row is inserted on retry exhaustion, fan out to matching dead_letter triggers (filtered by source / originating trigger_id / script_id) and enqueue one outbox row per match carrying a real-shape TriggerEvent::DeadLetter (the §6 brief field names were stale — used the actual variant: dead_letter_id, original: Box<TriggerEvent>, attempts, last_error, trigger_id, script_id, first/last_attempt_at). The recursion-stop (a handler's own failure isn't re-dead-lettered) is upheld by the existing is_dead_letter_handler short-circuit. Tests (DB-gated): handler actually fires with the nested original event; existing row-create test now also asserts handler-fire; source_filter excludes non-matching; failing handler does not recurse. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -23,19 +23,19 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use chrono::Utc;
|
use chrono::{DateTime, Utc};
|
||||||
use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType};
|
use picloud_executor_core::{ExecError, ExecRequest, ExecResponse, InvocationType};
|
||||||
use picloud_orchestrator_core::{ExecutionGate, ExecutorClient};
|
use picloud_orchestrator_core::{ExecutionGate, ExecutorClient};
|
||||||
use picloud_shared::{
|
use picloud_shared::{
|
||||||
ExecResponseSummary, ExecutionId, HttpDispatchPayload, InboxDeliveryOutcome, InboxFailureKind,
|
DeadLetterId, ExecResponseSummary, ExecutionId, HttpDispatchPayload, InboxDeliveryOutcome,
|
||||||
InboxResolver, InboxResult, RequestId, ScriptId, ScriptSandbox, TriggerEvent,
|
InboxFailureKind, InboxResolver, InboxResult, RequestId, ScriptId, ScriptSandbox, TriggerEvent,
|
||||||
};
|
};
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::abandoned_repo::{AbandonedRepo, NewAbandonedExecution};
|
use crate::abandoned_repo::{AbandonedRepo, NewAbandonedExecution};
|
||||||
use crate::dead_letter_repo::{DeadLetterRepo, NewDeadLetter};
|
use crate::dead_letter_repo::{DeadLetterRepo, NewDeadLetter};
|
||||||
use crate::outbox_repo::{OutboxRepo, OutboxRow, OutboxSourceKind};
|
use crate::outbox_repo::{NewOutboxRow, OutboxRepo, OutboxRow, OutboxSourceKind};
|
||||||
use crate::principal_resolver::PrincipalResolver;
|
use crate::principal_resolver::PrincipalResolver;
|
||||||
use crate::repo::ScriptRepository;
|
use crate::repo::ScriptRepository;
|
||||||
use crate::trigger_config::{BackoffShape, TriggerConfig};
|
use crate::trigger_config::{BackoffShape, TriggerConfig};
|
||||||
@@ -463,12 +463,12 @@ impl Dispatcher {
|
|||||||
// Exhausted retries → dead-letter.
|
// Exhausted retries → dead-letter.
|
||||||
let (op, source) = describe_event(&row.payload);
|
let (op, source) = describe_event(&row.payload);
|
||||||
let now = Utc::now();
|
let now = Utc::now();
|
||||||
if let Err(e) = self
|
let dl_id = match self
|
||||||
.dead_letters
|
.dead_letters
|
||||||
.insert(NewDeadLetter {
|
.insert(NewDeadLetter {
|
||||||
app_id: row.app_id,
|
app_id: row.app_id,
|
||||||
original_event_id: row.id,
|
original_event_id: row.id,
|
||||||
source,
|
source: source.clone(),
|
||||||
op,
|
op,
|
||||||
trigger_id: row.trigger_id,
|
trigger_id: row.trigger_id,
|
||||||
script_id: Some(resolved.script_id),
|
script_id: Some(resolved.script_id),
|
||||||
@@ -480,8 +480,26 @@ impl Dispatcher {
|
|||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
tracing::error!(?e, "failed to write dead-letter row");
|
Ok(id) => Some(id),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(?e, "failed to write dead-letter row");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// v1.1.7 fix: fan the dead-letter out to matching handler triggers.
|
||||||
|
// This was missing since v1.1.1 — the row was written but
|
||||||
|
// `list_matching_dead_letter` had no production caller, so
|
||||||
|
// registered dead_letter handlers never fired. The recursion-stop
|
||||||
|
// (a dead-letter handler's own failure is not re-dead-lettered)
|
||||||
|
// is upheld by the `is_dead_letter_handler` short-circuit at the
|
||||||
|
// top of this function, so this fan-out is only reached for
|
||||||
|
// non-handler executions.
|
||||||
|
if let Some(dl_id) = dl_id {
|
||||||
|
self.fan_out_dead_letter(&row, &resolved, dl_id, &source, attempt, &err, now)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
self.outbox
|
self.outbox
|
||||||
.delete(row.id)
|
.delete(row.id)
|
||||||
.await
|
.await
|
||||||
@@ -489,6 +507,82 @@ impl Dispatcher {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Enqueue one outbox row per matching `dead_letter` trigger so its
|
||||||
|
/// handler script runs with the dead-letter event as `ctx.event`.
|
||||||
|
/// Best-effort: a lookup/insert failure is logged, not propagated
|
||||||
|
/// (the dead-letter row itself is already durably written).
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
async fn fan_out_dead_letter(
|
||||||
|
&self,
|
||||||
|
row: &OutboxRow,
|
||||||
|
resolved: &ResolvedTrigger,
|
||||||
|
dead_letter_id: DeadLetterId,
|
||||||
|
source: &str,
|
||||||
|
attempt: u32,
|
||||||
|
err: &ExecError,
|
||||||
|
now: DateTime<Utc>,
|
||||||
|
) {
|
||||||
|
// The DL event nests the original verbatim; if the payload can't
|
||||||
|
// be decoded back into a TriggerEvent we can't build the nested
|
||||||
|
// `original`, so skip the fan-out (the DL row is still written).
|
||||||
|
let Ok(original) = serde_json::from_value::<TriggerEvent>(row.payload.clone()) else {
|
||||||
|
tracing::warn!(
|
||||||
|
outbox_id = %row.id,
|
||||||
|
"dead-letter payload is not a TriggerEvent; skipping handler fan-out"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let matches = match self
|
||||||
|
.triggers
|
||||||
|
.list_matching_dead_letter(row.app_id, source, row.trigger_id, Some(resolved.script_id))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(m) => m,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(?e, "dead-letter trigger lookup failed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
for m in matches {
|
||||||
|
let event = TriggerEvent::DeadLetter {
|
||||||
|
dead_letter_id,
|
||||||
|
original: Box::new(original.clone()),
|
||||||
|
attempts: attempt,
|
||||||
|
last_error: err.to_string(),
|
||||||
|
trigger_id: row.trigger_id,
|
||||||
|
script_id: Some(resolved.script_id),
|
||||||
|
first_attempt_at: row.created_at,
|
||||||
|
last_attempt_at: now,
|
||||||
|
};
|
||||||
|
let payload = match serde_json::to_value(&event) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(?e, "failed to serialize dead-letter event");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if let Err(e) = self
|
||||||
|
.outbox
|
||||||
|
.insert(NewOutboxRow {
|
||||||
|
app_id: row.app_id,
|
||||||
|
source_kind: OutboxSourceKind::DeadLetter,
|
||||||
|
trigger_id: Some(m.trigger_id),
|
||||||
|
script_id: Some(m.script_id),
|
||||||
|
reply_to: None,
|
||||||
|
payload,
|
||||||
|
origin_principal: Some(m.registered_by_principal),
|
||||||
|
trigger_depth: row.trigger_depth.saturating_add(1),
|
||||||
|
root_execution_id: row.root_execution_id,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!(?e, "failed to enqueue dead-letter handler delivery");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn deliver_inbox(&self, row: &OutboxRow, inbox_id: Uuid, result: InboxResult) {
|
async fn deliver_inbox(&self, row: &OutboxRow, inbox_id: Uuid, result: InboxResult) {
|
||||||
match self.inbox.deliver(inbox_id, result.clone()).await {
|
match self.inbox.deliver(inbox_id, result.clone()).await {
|
||||||
InboxDeliveryOutcome::Delivered => {}
|
InboxDeliveryOutcome::Delivered => {}
|
||||||
|
|||||||
@@ -303,23 +303,128 @@ async fn dispatcher_delivers_pubsub_to_handler() {
|
|||||||
assert_eq!(event["pubsub"]["message"]["hello"], 1);
|
assert_eq!(event["pubsub"]["message"]["hello"], 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Count dead_letters rows for an app.
|
||||||
|
async fn dead_letter_count(pool: &PgPool, app_id: &str) -> i64 {
|
||||||
|
let app_uuid = Uuid::parse_str(app_id).unwrap();
|
||||||
|
sqlx::query_scalar("SELECT COUNT(*) FROM dead_letters WHERE app_id = $1")
|
||||||
|
.bind(app_uuid)
|
||||||
|
.fetch_one(pool)
|
||||||
|
.await
|
||||||
|
.expect("count dead_letters")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn poll_dead_letter_count(pool: &PgPool, app_id: &str, want: i64) -> i64 {
|
||||||
|
let mut count = 0;
|
||||||
|
for _ in 0..100 {
|
||||||
|
count = dead_letter_count(pool, app_id).await;
|
||||||
|
if count >= want {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
count
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register a failing KV trigger on `dlsrc` (single attempt → immediate
|
||||||
|
/// dead-letter) and a `dead_letter` trigger pointing at the marker
|
||||||
|
/// handler, then cause the originating KV event. Returns when set up.
|
||||||
|
async fn setup_dead_letter(server: &TestServer, app_id: &str, dl_handler: &str) {
|
||||||
|
let failing = create_script(server, app_id, "dl-failing", r#"throw "boom";"#).await;
|
||||||
|
server
|
||||||
|
.post(&format!("/api/v1/admin/apps/{app_id}/triggers/kv"))
|
||||||
|
.json(&json!({
|
||||||
|
"script_id": failing,
|
||||||
|
"collection_glob": "dlsrc",
|
||||||
|
"retry_max_attempts": 1,
|
||||||
|
"retry_base_ms": 0
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.assert_status(axum::http::StatusCode::CREATED);
|
||||||
|
// The dead_letter trigger (no filters → matches any dead-letter).
|
||||||
|
server
|
||||||
|
.post(&format!("/api/v1/admin/apps/{app_id}/triggers/dead_letter"))
|
||||||
|
.json(&json!({ "script_id": dl_handler }))
|
||||||
|
.await
|
||||||
|
.assert_status(axum::http::StatusCode::CREATED);
|
||||||
|
|
||||||
|
let source = create_script(
|
||||||
|
server,
|
||||||
|
app_id,
|
||||||
|
"dl-source",
|
||||||
|
r#"kv::collection("dlsrc").set("k", 1); #{ ok: true }"#,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
execute(server, source.as_str()).await;
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn dispatcher_delivers_dead_letter_to_handler() {
|
async fn dispatcher_delivers_dead_letter_to_handler() {
|
||||||
// NOTE: the dead-letter creation path (`dispatcher::handle_failure` →
|
// v1.1.7: the dead-letter fan-out is now wired
|
||||||
// `DeadLetterRepo::insert`) writes the `dead_letters` row but does not
|
// (`dispatcher::handle_failure` → `list_matching_dead_letter` →
|
||||||
// appear to enqueue deliveries for `dead_letter`-kind triggers
|
// outbox). This asserts BOTH that the `dead_letters` row is written
|
||||||
// (`TriggerRepo::list_matching_dead_letter` has no production caller —
|
// AND that the registered `dead_letter`-kind handler actually fires
|
||||||
// see HANDBACK latent-findings). So this test asserts the wired
|
// (it was silently non-functional v1.1.1–v1.1.6).
|
||||||
// behavior: a failing handler that exhausts its (single) attempt
|
|
||||||
// produces a dead-letter row. If/when DL→handler fan-out lands, this
|
|
||||||
// can be upgraded to assert the handler marker like the others.
|
|
||||||
let Some(pool) = pool_or_skip().await else {
|
let Some(pool) = pool_or_skip().await else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
let (server, app_id) = server_for(pool.clone(), "dl").await;
|
let (server, app_id) = server_for(pool.clone(), "dl").await;
|
||||||
|
let handler = create_script(&server, &app_id, "dl-handler", MARKER_HANDLER).await;
|
||||||
|
setup_dead_letter(&server, &app_id, &handler).await;
|
||||||
|
|
||||||
|
// Row written.
|
||||||
|
assert!(
|
||||||
|
poll_dead_letter_count(&pool, &app_id, 1).await > 0,
|
||||||
|
"a dead-letter row should have been produced"
|
||||||
|
);
|
||||||
|
// Handler fired.
|
||||||
|
let event = poll_marker(&pool, &app_id)
|
||||||
|
.await
|
||||||
|
.expect("dead-letter handler fired");
|
||||||
|
assert_eq!(event["source"], "dead_letter");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn dispatcher_delivers_dead_letter_to_handler_actually_fires() {
|
||||||
|
// Focused on the handler-fire side: the marker handler receives a
|
||||||
|
// fully-shaped dead-letter event (the original KV event nested under
|
||||||
|
// `ctx.event.dead_letter.original`, plus the failure metadata).
|
||||||
|
let Some(pool) = pool_or_skip().await else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let (server, app_id) = server_for(pool.clone(), "dlfire").await;
|
||||||
|
let handler = create_script(&server, &app_id, "dl-handler", MARKER_HANDLER).await;
|
||||||
|
setup_dead_letter(&server, &app_id, &handler).await;
|
||||||
|
|
||||||
|
let event = poll_marker(&pool, &app_id)
|
||||||
|
.await
|
||||||
|
.expect("dead-letter handler fired");
|
||||||
|
assert_eq!(event["source"], "dead_letter");
|
||||||
|
// The original KV event is nested verbatim.
|
||||||
|
assert_eq!(event["dead_letter"]["original"]["source"], "kv");
|
||||||
|
assert_eq!(
|
||||||
|
event["dead_letter"]["original"]["kv"]["collection"],
|
||||||
|
"dlsrc"
|
||||||
|
);
|
||||||
|
// Failure metadata is present.
|
||||||
|
assert!(event["dead_letter"]["last_error"]
|
||||||
|
.as_str()
|
||||||
|
.unwrap()
|
||||||
|
.contains("boom"));
|
||||||
|
assert!(event["dead_letter"]["attempts"].as_i64().unwrap() >= 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn dead_letter_source_filter_excludes_nonmatching() {
|
||||||
|
// `list_matching_dead_letter` filters by source (among trigger_id /
|
||||||
|
// script_id). A dead_letter trigger whose `source_filter` is "docs"
|
||||||
|
// must NOT fire for a "kv"-sourced dead-letter — the row is still
|
||||||
|
// written, but no handler delivery is enqueued.
|
||||||
|
let Some(pool) = pool_or_skip().await else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let (server, app_id) = server_for(pool.clone(), "dlfilter").await;
|
||||||
|
let handler = create_script(&server, &app_id, "dl-handler", MARKER_HANDLER).await;
|
||||||
|
|
||||||
// A handler that always throws, with a single attempt so it
|
|
||||||
// dead-letters immediately (no retry backoff).
|
|
||||||
let failing = create_script(&server, &app_id, "dl-failing", r#"throw "boom";"#).await;
|
let failing = create_script(&server, &app_id, "dl-failing", r#"throw "boom";"#).await;
|
||||||
server
|
server
|
||||||
.post(&format!("/api/v1/admin/apps/{app_id}/triggers/kv"))
|
.post(&format!("/api/v1/admin/apps/{app_id}/triggers/kv"))
|
||||||
@@ -331,6 +436,12 @@ async fn dispatcher_delivers_dead_letter_to_handler() {
|
|||||||
}))
|
}))
|
||||||
.await
|
.await
|
||||||
.assert_status(axum::http::StatusCode::CREATED);
|
.assert_status(axum::http::StatusCode::CREATED);
|
||||||
|
// Filter to a different source so this handler must NOT match.
|
||||||
|
server
|
||||||
|
.post(&format!("/api/v1/admin/apps/{app_id}/triggers/dead_letter"))
|
||||||
|
.json(&json!({ "script_id": handler, "source_filter": "docs" }))
|
||||||
|
.await
|
||||||
|
.assert_status(axum::http::StatusCode::CREATED);
|
||||||
|
|
||||||
let source = create_script(
|
let source = create_script(
|
||||||
&server,
|
&server,
|
||||||
@@ -341,19 +452,38 @@ async fn dispatcher_delivers_dead_letter_to_handler() {
|
|||||||
.await;
|
.await;
|
||||||
execute(&server, &source).await;
|
execute(&server, &source).await;
|
||||||
|
|
||||||
// Poll the dead_letters table for this app.
|
// The dead-letter row is written…
|
||||||
let app_uuid = Uuid::parse_str(&app_id).unwrap();
|
assert!(poll_dead_letter_count(&pool, &app_id, 1).await >= 1);
|
||||||
let mut count: i64 = 0;
|
// …but the source-filtered handler never fires.
|
||||||
for _ in 0..100 {
|
let marker = poll_marker_n(&pool, &app_id, 8).await;
|
||||||
count = sqlx::query_scalar("SELECT COUNT(*) FROM dead_letters WHERE app_id = $1")
|
assert!(
|
||||||
.bind(app_uuid)
|
marker.is_none(),
|
||||||
.fetch_one(&pool)
|
"source_filter='docs' must not fire for a kv dead-letter"
|
||||||
.await
|
);
|
||||||
.expect("count dead_letters");
|
}
|
||||||
if count > 0 {
|
|
||||||
break;
|
#[tokio::test]
|
||||||
}
|
async fn dead_letter_handler_failure_does_not_recurse() {
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
// Recursion-stop (design notes §4): a dead_letter handler that itself
|
||||||
}
|
// throws must NOT produce a second dead-letter row. The
|
||||||
assert!(count > 0, "a dead-letter row should have been produced");
|
// `is_dead_letter_handler` short-circuit annotates the original row
|
||||||
|
// and drops the outbox row without re-dead-lettering.
|
||||||
|
let Some(pool) = pool_or_skip().await else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let (server, app_id) = server_for(pool.clone(), "dlrec").await;
|
||||||
|
// The DL handler itself throws.
|
||||||
|
let throwing = create_script(&server, &app_id, "dl-throws", r#"throw "handler boom";"#).await;
|
||||||
|
setup_dead_letter(&server, &app_id, &throwing).await;
|
||||||
|
|
||||||
|
// One dead-letter row appears (the original). Give the throwing
|
||||||
|
// handler time to run + (not) recurse, then confirm the count stayed
|
||||||
|
// at exactly 1.
|
||||||
|
assert!(poll_dead_letter_count(&pool, &app_id, 1).await >= 1);
|
||||||
|
tokio::time::sleep(Duration::from_millis(800)).await;
|
||||||
|
assert_eq!(
|
||||||
|
dead_letter_count(&pool, &app_id).await,
|
||||||
|
1,
|
||||||
|
"a failing dead-letter handler must not create a new dead-letter row"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user