feat(v1.1.1-dead-letters): service + Rhai SDK + admin endpoints
`PostgresDeadLetterService` lands as the real `DeadLetterService`
impl, replacing `NoopDeadLetterService` in the picloud binary's
`Services` bundle. Both methods are gated by
`Capability::AppDeadLetterManage(AppId)` — public-HTTP scripts with
`principal: None` fail the check, per design notes §4.
- `dead_letters::replay(id)` (Rhai SDK + admin endpoint): re-inserts
the original event payload into the outbox with attempt_count=0,
reply_to=None. The DL row is marked `resolution='replayed'`.
- `dead_letters::resolve(id, reason)` (Rhai SDK + admin endpoint):
closes the row with `resolved_at = NOW()` and the given reason.
CHECK constraint on the column enforces the 4-value vocabulary.
- `dead_letters::list(filter)` is intentionally NOT shipped —
design notes §4 defers it to v1.2 to align with the eventual
`docs::find()` query DSL.
Admin endpoints under `/api/v1/admin/apps/{id}/dead_letters/*`:
- `GET /` (with `?unresolved=true`) → list view
- `GET /count` → unresolved-count badge
- `GET /{dl_id}` → row detail (full payload + error)
- `POST /{dl_id}/replay` → re-enqueue
- `POST /{dl_id}/resolve` body `{reason}` → close out
All cross-app-aware: the row's `app_id` is compared against the path
param so a caller with rights on app A cannot manipulate app B's
dead letters by id alone.
The Rhai bridge for `dead_letters::*` follows the same sync↔async
pattern as the `kv::` bridge (`Handle::current().block_on(...)`
inside the spawn_blocking-wrapped Rhai engine).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
84
crates/executor-core/src/sdk/dead_letters.rs
Normal file
84
crates/executor-core/src/sdk/dead_letters.rs
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
//! `dead_letters::` Rhai bridge.
|
||||||
|
//!
|
||||||
|
//! ```rhai
|
||||||
|
//! dead_letters::replay("01234567-..."); // re-enqueue + mark replayed
|
||||||
|
//! dead_letters::resolve("01234567-...", "ignored"); // close out the row
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! Sync↔async via `Handle::current().block_on(...)` — same pattern as
|
||||||
|
//! the `kv::` bridge (works because `LocalExecutorClient` runs the
|
||||||
|
//! script under `spawn_blocking`).
|
||||||
|
//!
|
||||||
|
//! `dead_letters::list(filter)` is intentionally NOT shipped — design
|
||||||
|
//! notes §4 defers it to v1.2 to align with the `docs::find()` query
|
||||||
|
//! DSL.
|
||||||
|
|
||||||
|
use std::str::FromStr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use picloud_shared::{DeadLetterError, DeadLetterId, SdkCallCx, Services};
|
||||||
|
use rhai::{Engine as RhaiEngine, EvalAltResult, Module};
|
||||||
|
use tokio::runtime::Handle as TokioHandle;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
pub(super) fn register(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCallCx>) {
|
||||||
|
let svc = services.dead_letters.clone();
|
||||||
|
let mut module = Module::new();
|
||||||
|
{
|
||||||
|
let svc = svc.clone();
|
||||||
|
let cx = cx.clone();
|
||||||
|
module.set_native_fn(
|
||||||
|
"replay",
|
||||||
|
move |id: &str| -> Result<(), Box<EvalAltResult>> {
|
||||||
|
let dl_id = parse_dl_id(id)?;
|
||||||
|
let svc = svc.clone();
|
||||||
|
let cx = cx.clone();
|
||||||
|
block_on(async move { svc.replay(&cx, dl_id).await })
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let svc = svc.clone();
|
||||||
|
let cx = cx.clone();
|
||||||
|
module.set_native_fn(
|
||||||
|
"resolve",
|
||||||
|
move |id: &str, reason: &str| -> Result<(), Box<EvalAltResult>> {
|
||||||
|
let dl_id = parse_dl_id(id)?;
|
||||||
|
let reason = reason.to_string();
|
||||||
|
let svc = svc.clone();
|
||||||
|
let cx = cx.clone();
|
||||||
|
block_on(async move { svc.resolve(&cx, dl_id, &reason).await })
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
engine.register_static_module("dead_letters", module.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_dl_id(s: &str) -> Result<DeadLetterId, Box<EvalAltResult>> {
|
||||||
|
Uuid::from_str(s)
|
||||||
|
.map(DeadLetterId::from)
|
||||||
|
.map_err(|e| -> Box<EvalAltResult> {
|
||||||
|
EvalAltResult::ErrorRuntime(
|
||||||
|
format!("dead_letters: invalid id {s:?}: {e}").into(),
|
||||||
|
rhai::Position::NONE,
|
||||||
|
)
|
||||||
|
.into()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn block_on<F>(fut: F) -> Result<(), Box<EvalAltResult>>
|
||||||
|
where
|
||||||
|
F: std::future::Future<Output = Result<(), DeadLetterError>> + Send,
|
||||||
|
{
|
||||||
|
let handle = TokioHandle::try_current().map_err(|e| -> Box<EvalAltResult> {
|
||||||
|
EvalAltResult::ErrorRuntime(
|
||||||
|
format!("dead_letters: no tokio runtime available: {e}").into(),
|
||||||
|
rhai::Position::NONE,
|
||||||
|
)
|
||||||
|
.into()
|
||||||
|
})?;
|
||||||
|
handle.block_on(fut).map_err(|err| -> Box<EvalAltResult> {
|
||||||
|
EvalAltResult::ErrorRuntime(format!("dead_letters: {err}").into(), rhai::Position::NONE)
|
||||||
|
.into()
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -13,6 +13,7 @@
|
|||||||
|
|
||||||
pub mod bridge;
|
pub mod bridge;
|
||||||
pub mod cx;
|
pub mod cx;
|
||||||
|
pub mod dead_letters;
|
||||||
pub mod kv;
|
pub mod kv;
|
||||||
pub mod stdlib;
|
pub mod stdlib;
|
||||||
|
|
||||||
@@ -32,6 +33,5 @@ use rhai::Engine as RhaiEngine;
|
|||||||
/// single `<service>::register(...)` line per service.
|
/// single `<service>::register(...)` line per service.
|
||||||
pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCallCx>) {
|
pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc<SdkCallCx>) {
|
||||||
kv::register(engine, services, cx.clone());
|
kv::register(engine, services, cx.clone());
|
||||||
// v1.1.1 commit 8: dead_letters::register(engine, services, cx.clone());
|
dead_letters::register(engine, services, cx);
|
||||||
let _ = cx;
|
|
||||||
}
|
}
|
||||||
|
|||||||
118
crates/manager-core/src/dead_letter_service.rs
Normal file
118
crates/manager-core/src/dead_letter_service.rs
Normal file
@@ -0,0 +1,118 @@
|
|||||||
|
//! `PostgresDeadLetterService` — replaces `NoopDeadLetterService` in
|
||||||
|
//! v1.1.1's `Services` bundle. Implements `replay` (re-enqueue the
|
||||||
|
//! original event into the outbox + mark the DL row replayed) and
|
||||||
|
//! `resolve` (close the row out with a reason).
|
||||||
|
//!
|
||||||
|
//! Both methods are gated by `Capability::AppDeadLetterManage(AppId)`
|
||||||
|
//! evaluated against `cx.principal`. Public-HTTP scripts with
|
||||||
|
//! `principal: None` fail the check — design notes §4: managing
|
||||||
|
//! dead letters is an admin act.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use picloud_shared::{DeadLetterError, DeadLetterId, DeadLetterService, SdkCallCx};
|
||||||
|
|
||||||
|
use crate::authz::{self, AuthzRepo, Capability};
|
||||||
|
use crate::dead_letter_repo::{DeadLetterRepo, DeadLetterRepoError, DeadLetterRow};
|
||||||
|
use crate::outbox_repo::{NewOutboxRow, OutboxRepo, OutboxSourceKind};
|
||||||
|
|
||||||
|
pub struct PostgresDeadLetterService {
|
||||||
|
repo: Arc<dyn DeadLetterRepo>,
|
||||||
|
outbox: Arc<dyn OutboxRepo>,
|
||||||
|
authz: Arc<dyn AuthzRepo>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PostgresDeadLetterService {
|
||||||
|
#[must_use]
|
||||||
|
pub fn new(
|
||||||
|
repo: Arc<dyn DeadLetterRepo>,
|
||||||
|
outbox: Arc<dyn OutboxRepo>,
|
||||||
|
authz: Arc<dyn AuthzRepo>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
repo,
|
||||||
|
outbox,
|
||||||
|
authz,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn require_dl_capability(&self, cx: &SdkCallCx) -> Result<(), DeadLetterError> {
|
||||||
|
let Some(ref principal) = cx.principal else {
|
||||||
|
return Err(DeadLetterError::Forbidden);
|
||||||
|
};
|
||||||
|
authz::require(
|
||||||
|
&*self.authz,
|
||||||
|
principal,
|
||||||
|
Capability::AppDeadLetterManage(cx.app_id),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|_| DeadLetterError::Forbidden)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_row(&self, id: DeadLetterId) -> Result<DeadLetterRow, DeadLetterError> {
|
||||||
|
self.repo
|
||||||
|
.get(id)
|
||||||
|
.await
|
||||||
|
.map_err(map_repo_err)?
|
||||||
|
.ok_or(DeadLetterError::NotFound)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl DeadLetterService for PostgresDeadLetterService {
|
||||||
|
async fn replay(&self, cx: &SdkCallCx, id: DeadLetterId) -> Result<(), DeadLetterError> {
|
||||||
|
self.require_dl_capability(cx).await?;
|
||||||
|
let row = self.load_row(id).await?;
|
||||||
|
if row.app_id != cx.app_id {
|
||||||
|
// Cross-app — treat as not-found to avoid leaking
|
||||||
|
// information about other apps' dead letters.
|
||||||
|
return Err(DeadLetterError::NotFound);
|
||||||
|
}
|
||||||
|
|
||||||
|
let source_kind = OutboxSourceKind::from_wire(&row.source).unwrap_or(OutboxSourceKind::Kv);
|
||||||
|
self.outbox
|
||||||
|
.insert(NewOutboxRow {
|
||||||
|
app_id: row.app_id,
|
||||||
|
source_kind,
|
||||||
|
trigger_id: row.trigger_id,
|
||||||
|
script_id: row.script_id,
|
||||||
|
reply_to: None,
|
||||||
|
payload: row.payload.clone(),
|
||||||
|
origin_principal: None,
|
||||||
|
trigger_depth: 0,
|
||||||
|
root_execution_id: None,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| DeadLetterError::Backend(e.to_string()))?;
|
||||||
|
|
||||||
|
self.repo
|
||||||
|
.resolve(id, "replayed")
|
||||||
|
.await
|
||||||
|
.map_err(map_repo_err)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn resolve(
|
||||||
|
&self,
|
||||||
|
cx: &SdkCallCx,
|
||||||
|
id: DeadLetterId,
|
||||||
|
reason: &str,
|
||||||
|
) -> Result<(), DeadLetterError> {
|
||||||
|
self.require_dl_capability(cx).await?;
|
||||||
|
let row = self.load_row(id).await?;
|
||||||
|
if row.app_id != cx.app_id {
|
||||||
|
return Err(DeadLetterError::NotFound);
|
||||||
|
}
|
||||||
|
self.repo.resolve(id, reason).await.map_err(map_repo_err)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn map_repo_err(e: DeadLetterRepoError) -> DeadLetterError {
|
||||||
|
match e {
|
||||||
|
DeadLetterRepoError::NotFound(_) => DeadLetterError::NotFound,
|
||||||
|
DeadLetterRepoError::InvalidResolution(s) => DeadLetterError::InvalidResolution(s),
|
||||||
|
DeadLetterRepoError::Db(e) => DeadLetterError::Backend(e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
316
crates/manager-core/src/dead_letters_api.rs
Normal file
316
crates/manager-core/src/dead_letters_api.rs
Normal file
@@ -0,0 +1,316 @@
|
|||||||
|
//! `/api/v1/admin/apps/{id}/dead_letters/*` — dashboard surface for
|
||||||
|
//! the no-default-handler model (design notes §4).
|
||||||
|
//!
|
||||||
|
//! Endpoints:
|
||||||
|
//! - `GET /apps/{id}/dead_letters?unresolved=true` — list view
|
||||||
|
//! - `GET /apps/{id}/dead_letters/count` — badge count
|
||||||
|
//! - `GET /apps/{id}/dead_letters/{dl_id}` — row detail
|
||||||
|
//! - `POST /apps/{id}/dead_letters/{dl_id}/replay` — re-enqueue
|
||||||
|
//! - `POST /apps/{id}/dead_letters/{dl_id}/resolve` — mark resolved
|
||||||
|
//!
|
||||||
|
//! All gated on `Capability::AppDeadLetterManage(app_id)`.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::extract::{Path, Query, State};
|
||||||
|
use axum::http::StatusCode;
|
||||||
|
use axum::response::{IntoResponse, Json, Response};
|
||||||
|
use axum::routing::{get, post};
|
||||||
|
use axum::{Extension, Router};
|
||||||
|
use picloud_shared::{AppId, DeadLetterId, DeadLetterService, Principal, SdkCallCx};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
use crate::app_repo::AppRepository;
|
||||||
|
use crate::authz::{require, AuthzDenied, AuthzError, AuthzRepo, Capability};
|
||||||
|
use crate::dead_letter_repo::{DeadLetterRepo, DeadLetterRepoError, DeadLetterRow};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct DeadLettersState {
|
||||||
|
pub repo: Arc<dyn DeadLetterRepo>,
|
||||||
|
pub service: Arc<dyn DeadLetterService>,
|
||||||
|
pub apps: Arc<dyn AppRepository>,
|
||||||
|
pub authz: Arc<dyn AuthzRepo>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn dead_letters_router(state: DeadLettersState) -> Router {
|
||||||
|
Router::new()
|
||||||
|
.route("/apps/{app_id}/dead_letters", get(list))
|
||||||
|
.route("/apps/{app_id}/dead_letters/count", get(count))
|
||||||
|
.route("/apps/{app_id}/dead_letters/{dl_id}", get(detail))
|
||||||
|
.route("/apps/{app_id}/dead_letters/{dl_id}/replay", post(replay))
|
||||||
|
.route("/apps/{app_id}/dead_letters/{dl_id}/resolve", post(resolve))
|
||||||
|
.with_state(state)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct ListQuery {
|
||||||
|
#[serde(default)]
|
||||||
|
pub unresolved: bool,
|
||||||
|
#[serde(default = "default_limit")]
|
||||||
|
pub limit: i64,
|
||||||
|
#[serde(default)]
|
||||||
|
pub offset: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
const fn default_limit() -> i64 {
|
||||||
|
50
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct ListResponse {
|
||||||
|
pub dead_letters: Vec<DeadLetterDto>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct CountResponse {
|
||||||
|
pub unresolved: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct ResolveBody {
|
||||||
|
pub reason: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct DeadLetterDto {
|
||||||
|
pub id: DeadLetterId,
|
||||||
|
pub app_id: AppId,
|
||||||
|
pub source: String,
|
||||||
|
pub op: String,
|
||||||
|
pub trigger_id: Option<picloud_shared::TriggerId>,
|
||||||
|
pub script_id: Option<picloud_shared::ScriptId>,
|
||||||
|
pub payload: serde_json::Value,
|
||||||
|
pub attempt_count: u32,
|
||||||
|
pub first_attempt_at: chrono::DateTime<chrono::Utc>,
|
||||||
|
pub last_attempt_at: chrono::DateTime<chrono::Utc>,
|
||||||
|
pub last_error: String,
|
||||||
|
pub created_at: chrono::DateTime<chrono::Utc>,
|
||||||
|
pub resolved_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||||
|
pub resolution: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<DeadLetterRow> for DeadLetterDto {
|
||||||
|
fn from(r: DeadLetterRow) -> Self {
|
||||||
|
Self {
|
||||||
|
id: r.id,
|
||||||
|
app_id: r.app_id,
|
||||||
|
source: r.source,
|
||||||
|
op: r.op,
|
||||||
|
trigger_id: r.trigger_id,
|
||||||
|
script_id: r.script_id,
|
||||||
|
payload: r.payload,
|
||||||
|
attempt_count: r.attempt_count,
|
||||||
|
first_attempt_at: r.first_attempt_at,
|
||||||
|
last_attempt_at: r.last_attempt_at,
|
||||||
|
last_error: r.last_error,
|
||||||
|
created_at: r.created_at,
|
||||||
|
resolved_at: r.resolved_at,
|
||||||
|
resolution: r.resolution,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn list(
|
||||||
|
State(s): State<DeadLettersState>,
|
||||||
|
Extension(principal): Extension<Principal>,
|
||||||
|
Path(app_id): Path<AppId>,
|
||||||
|
Query(q): Query<ListQuery>,
|
||||||
|
) -> Result<Json<ListResponse>, DeadLettersApiError> {
|
||||||
|
ensure_app(&*s.apps, app_id).await?;
|
||||||
|
require(
|
||||||
|
s.authz.as_ref(),
|
||||||
|
&principal,
|
||||||
|
Capability::AppDeadLetterManage(app_id),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let rows = s
|
||||||
|
.repo
|
||||||
|
.list_for_app(app_id, q.unresolved, q.limit.clamp(1, 200), q.offset.max(0))
|
||||||
|
.await?;
|
||||||
|
Ok(Json(ListResponse {
|
||||||
|
dead_letters: rows.into_iter().map(Into::into).collect(),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn count(
|
||||||
|
State(s): State<DeadLettersState>,
|
||||||
|
Extension(principal): Extension<Principal>,
|
||||||
|
Path(app_id): Path<AppId>,
|
||||||
|
) -> Result<Json<CountResponse>, DeadLettersApiError> {
|
||||||
|
ensure_app(&*s.apps, app_id).await?;
|
||||||
|
require(
|
||||||
|
s.authz.as_ref(),
|
||||||
|
&principal,
|
||||||
|
Capability::AppDeadLetterManage(app_id),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let n = s.repo.unresolved_count(app_id).await?;
|
||||||
|
Ok(Json(CountResponse { unresolved: n }))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn detail(
|
||||||
|
State(s): State<DeadLettersState>,
|
||||||
|
Extension(principal): Extension<Principal>,
|
||||||
|
Path((app_id, dl_id)): Path<(AppId, DeadLetterId)>,
|
||||||
|
) -> Result<Json<DeadLetterDto>, DeadLettersApiError> {
|
||||||
|
ensure_app(&*s.apps, app_id).await?;
|
||||||
|
require(
|
||||||
|
s.authz.as_ref(),
|
||||||
|
&principal,
|
||||||
|
Capability::AppDeadLetterManage(app_id),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let row = s
|
||||||
|
.repo
|
||||||
|
.get(dl_id)
|
||||||
|
.await?
|
||||||
|
.ok_or(DeadLettersApiError::NotFound(dl_id))?;
|
||||||
|
if row.app_id != app_id {
|
||||||
|
return Err(DeadLettersApiError::NotFound(dl_id));
|
||||||
|
}
|
||||||
|
Ok(Json(row.into()))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn replay(
|
||||||
|
State(s): State<DeadLettersState>,
|
||||||
|
Extension(principal): Extension<Principal>,
|
||||||
|
Path((app_id, dl_id)): Path<(AppId, DeadLetterId)>,
|
||||||
|
) -> Result<StatusCode, DeadLettersApiError> {
|
||||||
|
ensure_app(&*s.apps, app_id).await?;
|
||||||
|
// Authz handled inside the service via SdkCallCx.
|
||||||
|
let cx = admin_cx(app_id, &principal);
|
||||||
|
s.service
|
||||||
|
.replay(&cx, dl_id)
|
||||||
|
.await
|
||||||
|
.map_err(map_service_err)?;
|
||||||
|
Ok(StatusCode::NO_CONTENT)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn resolve(
|
||||||
|
State(s): State<DeadLettersState>,
|
||||||
|
Extension(principal): Extension<Principal>,
|
||||||
|
Path((app_id, dl_id)): Path<(AppId, DeadLetterId)>,
|
||||||
|
Json(body): Json<ResolveBody>,
|
||||||
|
) -> Result<StatusCode, DeadLettersApiError> {
|
||||||
|
ensure_app(&*s.apps, app_id).await?;
|
||||||
|
let cx = admin_cx(app_id, &principal);
|
||||||
|
s.service
|
||||||
|
.resolve(&cx, dl_id, &body.reason)
|
||||||
|
.await
|
||||||
|
.map_err(map_service_err)?;
|
||||||
|
Ok(StatusCode::NO_CONTENT)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Synthesize an `SdkCallCx` for the admin path. The service layer
|
||||||
|
/// reads `cx.app_id` + `cx.principal` and ignores the trigger /
|
||||||
|
/// execution fields, so the per-call ids are arbitrary.
|
||||||
|
fn admin_cx(app_id: AppId, principal: &Principal) -> SdkCallCx {
|
||||||
|
SdkCallCx {
|
||||||
|
app_id,
|
||||||
|
principal: Some(principal.clone()),
|
||||||
|
execution_id: picloud_shared::ExecutionId::new(),
|
||||||
|
request_id: picloud_shared::RequestId::new(),
|
||||||
|
trigger_depth: 0,
|
||||||
|
root_execution_id: picloud_shared::ExecutionId::new(),
|
||||||
|
is_dead_letter_handler: false,
|
||||||
|
event: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ensure_app(apps: &dyn AppRepository, app_id: AppId) -> Result<(), DeadLettersApiError> {
|
||||||
|
apps.get_by_id(app_id)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DeadLettersApiError::Backend(e.to_string()))?
|
||||||
|
.ok_or_else(|| DeadLettersApiError::AppNotFound(app_id.to_string()))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn map_service_err(e: picloud_shared::DeadLetterError) -> DeadLettersApiError {
|
||||||
|
match e {
|
||||||
|
picloud_shared::DeadLetterError::NotFound => {
|
||||||
|
DeadLettersApiError::NotFound(DeadLetterId::new())
|
||||||
|
}
|
||||||
|
picloud_shared::DeadLetterError::Forbidden => DeadLettersApiError::Forbidden,
|
||||||
|
picloud_shared::DeadLetterError::InvalidResolution(s) => {
|
||||||
|
DeadLettersApiError::Invalid(format!("invalid resolution: {s}"))
|
||||||
|
}
|
||||||
|
picloud_shared::DeadLetterError::Backend(s) => DeadLettersApiError::Backend(s),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum DeadLettersApiError {
|
||||||
|
#[error("app not found: {0}")]
|
||||||
|
AppNotFound(String),
|
||||||
|
|
||||||
|
#[error("dead-letter not found: {0}")]
|
||||||
|
NotFound(DeadLetterId),
|
||||||
|
|
||||||
|
#[error("invalid: {0}")]
|
||||||
|
Invalid(String),
|
||||||
|
|
||||||
|
#[error("forbidden")]
|
||||||
|
Forbidden,
|
||||||
|
|
||||||
|
#[error("authorization repo error: {0}")]
|
||||||
|
AuthzRepo(String),
|
||||||
|
|
||||||
|
#[error("dead-letter backend: {0}")]
|
||||||
|
Backend(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<AuthzDenied> for DeadLettersApiError {
|
||||||
|
fn from(d: AuthzDenied) -> Self {
|
||||||
|
match d {
|
||||||
|
AuthzDenied::Denied => Self::Forbidden,
|
||||||
|
AuthzDenied::Repo(e) => Self::AuthzRepo(e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<AuthzError> for DeadLettersApiError {
|
||||||
|
fn from(e: AuthzError) -> Self {
|
||||||
|
Self::AuthzRepo(e.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<DeadLetterRepoError> for DeadLettersApiError {
|
||||||
|
fn from(e: DeadLetterRepoError) -> Self {
|
||||||
|
match e {
|
||||||
|
DeadLetterRepoError::NotFound(id) => Self::NotFound(id),
|
||||||
|
DeadLetterRepoError::InvalidResolution(s) => Self::Invalid(s),
|
||||||
|
DeadLetterRepoError::Db(e) => Self::Backend(e.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoResponse for DeadLettersApiError {
|
||||||
|
fn into_response(self) -> Response {
|
||||||
|
let (status, body) = match &self {
|
||||||
|
Self::AppNotFound(_) | Self::NotFound(_) => {
|
||||||
|
(StatusCode::NOT_FOUND, json!({ "error": self.to_string() }))
|
||||||
|
}
|
||||||
|
Self::Invalid(_) => (
|
||||||
|
StatusCode::UNPROCESSABLE_ENTITY,
|
||||||
|
json!({ "error": self.to_string() }),
|
||||||
|
),
|
||||||
|
Self::Forbidden => (StatusCode::FORBIDDEN, json!({ "error": self.to_string() })),
|
||||||
|
Self::AuthzRepo(e) => {
|
||||||
|
tracing::error!(error = %e, "dead_letters authz repo error");
|
||||||
|
(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
json!({ "error": "internal error" }),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Self::Backend(e) => {
|
||||||
|
tracing::error!(error = %e, "dead_letters api backend error");
|
||||||
|
(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
json!({ "error": "internal error" }),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
(status, Json(body)).into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -23,6 +23,8 @@ pub mod auth_bootstrap;
|
|||||||
pub mod auth_middleware;
|
pub mod auth_middleware;
|
||||||
pub mod authz;
|
pub mod authz;
|
||||||
pub mod dead_letter_repo;
|
pub mod dead_letter_repo;
|
||||||
|
pub mod dead_letter_service;
|
||||||
|
pub mod dead_letters_api;
|
||||||
pub mod dispatcher;
|
pub mod dispatcher;
|
||||||
pub mod kv_repo;
|
pub mod kv_repo;
|
||||||
pub mod kv_service;
|
pub mod kv_service;
|
||||||
@@ -80,6 +82,8 @@ pub use authz::{can, require, AuthzDenied, AuthzError, AuthzRepo, Capability, De
|
|||||||
pub use dead_letter_repo::{
|
pub use dead_letter_repo::{
|
||||||
DeadLetterRepo, DeadLetterRepoError, DeadLetterRow, NewDeadLetter, PostgresDeadLetterRepo,
|
DeadLetterRepo, DeadLetterRepoError, DeadLetterRow, NewDeadLetter, PostgresDeadLetterRepo,
|
||||||
};
|
};
|
||||||
|
pub use dead_letter_service::PostgresDeadLetterService;
|
||||||
|
pub use dead_letters_api::{dead_letters_router, DeadLettersApiError, DeadLettersState};
|
||||||
pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError};
|
pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError};
|
||||||
pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo};
|
pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo};
|
||||||
pub use kv_service::KvServiceImpl;
|
pub use kv_service::KvServiceImpl;
|
||||||
|
|||||||
@@ -11,14 +11,15 @@ use axum::{routing::get, Json, Router};
|
|||||||
use picloud_executor_core::{Engine, Limits};
|
use picloud_executor_core::{Engine, Limits};
|
||||||
use picloud_manager_core::{
|
use picloud_manager_core::{
|
||||||
admin_router, admins_router, api_keys_router, app_members_router, apps_api, apps_router,
|
admin_router, admins_router, api_keys_router, app_members_router, apps_api, apps_router,
|
||||||
attach_principal_if_present, auth_router, compile_routes, migrations, require_authenticated,
|
attach_principal_if_present, auth_router, compile_routes, dead_letters_router, migrations,
|
||||||
route_admin_router, triggers_router, AbandonedRepo, AdminPrincipalResolver,
|
require_authenticated, route_admin_router, triggers_router, AbandonedRepo,
|
||||||
AdminSessionRepository, AdminState, AdminUserRepository, AdminsState, ApiKeyRepository,
|
AdminPrincipalResolver, AdminSessionRepository, AdminState, AdminUserRepository, AdminsState,
|
||||||
ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState, AppRepository,
|
ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState,
|
||||||
AppsState, AuthState, AuthzRepo, DeadLetterRepo, Dispatcher, KvServiceImpl, OutboxEventEmitter,
|
AppRepository, AppsState, AuthState, AuthzRepo, DeadLetterRepo, DeadLettersState, Dispatcher,
|
||||||
OutboxRepo, PostgresAbandonedRepo, PostgresAdminSessionRepository, PostgresAdminUserRepository,
|
KvServiceImpl, OutboxEventEmitter, OutboxRepo, PostgresAbandonedRepo,
|
||||||
PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository,
|
PostgresAdminSessionRepository, PostgresAdminUserRepository, PostgresApiKeyRepository,
|
||||||
PostgresAppRepository, PostgresDeadLetterRepo, PostgresExecutionLogRepository,
|
PostgresAppDomainRepository, PostgresAppMembersRepository, PostgresAppRepository,
|
||||||
|
PostgresDeadLetterRepo, PostgresDeadLetterService, PostgresExecutionLogRepository,
|
||||||
PostgresExecutionLogSink, PostgresKvRepo, PostgresOutboxRepo, PostgresRouteRepository,
|
PostgresExecutionLogSink, PostgresKvRepo, PostgresOutboxRepo, PostgresRouteRepository,
|
||||||
PostgresScriptRepository, PostgresTriggerRepo, PrincipalResolver, RepoResolver,
|
PostgresScriptRepository, PostgresTriggerRepo, PrincipalResolver, RepoResolver,
|
||||||
RouteAdminState, RouteRepository, SandboxCeiling, ScriptRepository, TriggerConfig, TriggerRepo,
|
RouteAdminState, RouteRepository, SandboxCeiling, ScriptRepository, TriggerConfig, TriggerRepo,
|
||||||
@@ -30,9 +31,8 @@ use picloud_orchestrator_core::{
|
|||||||
LocalExecutorClient,
|
LocalExecutorClient,
|
||||||
};
|
};
|
||||||
use picloud_shared::{
|
use picloud_shared::{
|
||||||
ExecutionLogSink, InboxResolver, KvService, NoopDeadLetterService, OutboxWriter,
|
DeadLetterService, ExecutionLogSink, InboxResolver, KvService, OutboxWriter, ScriptValidator,
|
||||||
ScriptValidator, ServiceEventEmitter, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION,
|
ServiceEventEmitter, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION,
|
||||||
WIRE_VERSION,
|
|
||||||
};
|
};
|
||||||
use sqlx::postgres::PgPoolOptions;
|
use sqlx::postgres::PgPoolOptions;
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
@@ -119,10 +119,9 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
|
|||||||
let abandoned_repo: Arc<dyn AbandonedRepo> = Arc::new(PostgresAbandonedRepo::new(pool.clone()));
|
let abandoned_repo: Arc<dyn AbandonedRepo> = Arc::new(PostgresAbandonedRepo::new(pool.clone()));
|
||||||
let trigger_config = TriggerConfig::from_env();
|
let trigger_config = TriggerConfig::from_env();
|
||||||
|
|
||||||
// SDK services bundle. v1.1.1 ships the KV store and the
|
// SDK services bundle. v1.1.1 ships the KV store + the
|
||||||
// outbox-backed event emitter; `NoopDeadLetterService` is a v1.1.1
|
// outbox-backed event emitter + the dead-letter service (replay /
|
||||||
// stub that errors loudly until the real `PostgresDeadLetterService`
|
// resolve).
|
||||||
// ships (commit 8).
|
|
||||||
let kv_repo = Arc::new(PostgresKvRepo::new(pool));
|
let kv_repo = Arc::new(PostgresKvRepo::new(pool));
|
||||||
let events: Arc<dyn ServiceEventEmitter> = Arc::new(OutboxEventEmitter::new(
|
let events: Arc<dyn ServiceEventEmitter> = Arc::new(OutboxEventEmitter::new(
|
||||||
trigger_repo.clone(),
|
trigger_repo.clone(),
|
||||||
@@ -130,7 +129,12 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
|
|||||||
));
|
));
|
||||||
let kv: Arc<dyn KvService> =
|
let kv: Arc<dyn KvService> =
|
||||||
Arc::new(KvServiceImpl::new(kv_repo, authz.clone(), events.clone()));
|
Arc::new(KvServiceImpl::new(kv_repo, authz.clone(), events.clone()));
|
||||||
let services = Services::new(kv, Arc::new(NoopDeadLetterService), events);
|
let dl_service: Arc<dyn DeadLetterService> = Arc::new(PostgresDeadLetterService::new(
|
||||||
|
dl_repo.clone(),
|
||||||
|
outbox_repo.clone(),
|
||||||
|
authz.clone(),
|
||||||
|
));
|
||||||
|
let services = Services::new(kv, dl_service.clone(), events);
|
||||||
let engine = Arc::new(Engine::new(Limits::default(), services));
|
let engine = Arc::new(Engine::new(Limits::default(), services));
|
||||||
|
|
||||||
// Compile the routes table once at startup; admin writes refresh it.
|
// Compile the routes table once at startup; admin writes refresh it.
|
||||||
@@ -216,16 +220,20 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
|
|||||||
inbox: inbox_registry,
|
inbox: inbox_registry,
|
||||||
outbox: outbox_writer,
|
outbox: outbox_writer,
|
||||||
};
|
};
|
||||||
// Silence unused-import warnings for repos handed to the
|
// Commit 10 wires abandoned_repo into the GC sweeper.
|
||||||
// dispatcher in this commit; commit 8 wires them into the
|
let _ = &abandoned_repo;
|
||||||
// dead-letters admin endpoints and commit 10 into the GC sweeper.
|
|
||||||
let _ = (&dl_repo, &abandoned_repo);
|
|
||||||
let triggers_state = TriggersState {
|
let triggers_state = TriggersState {
|
||||||
triggers: trigger_repo,
|
triggers: trigger_repo,
|
||||||
apps: apps_repo.clone(),
|
apps: apps_repo.clone(),
|
||||||
authz: authz.clone(),
|
authz: authz.clone(),
|
||||||
config: trigger_config,
|
config: trigger_config,
|
||||||
};
|
};
|
||||||
|
let dead_letters_state = DeadLettersState {
|
||||||
|
repo: dl_repo,
|
||||||
|
service: dl_service,
|
||||||
|
apps: apps_repo.clone(),
|
||||||
|
authz: authz.clone(),
|
||||||
|
};
|
||||||
let apps_state = AppsState {
|
let apps_state = AppsState {
|
||||||
apps: apps_repo,
|
apps: apps_repo,
|
||||||
domains: domains_repo,
|
domains: domains_repo,
|
||||||
@@ -268,6 +276,7 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result<Router> {
|
|||||||
.merge(app_members_router(app_members_state))
|
.merge(app_members_router(app_members_state))
|
||||||
.merge(api_keys_router(api_keys_state))
|
.merge(api_keys_router(api_keys_state))
|
||||||
.merge(triggers_router(triggers_state))
|
.merge(triggers_router(triggers_state))
|
||||||
|
.merge(dead_letters_router(dead_letters_state))
|
||||||
.layer(from_fn_with_state(
|
.layer(from_fn_with_state(
|
||||||
auth_state.clone(),
|
auth_state.clone(),
|
||||||
require_authenticated,
|
require_authenticated,
|
||||||
|
|||||||
Reference in New Issue
Block a user