diff --git a/crates/executor-core/src/sdk/dead_letters.rs b/crates/executor-core/src/sdk/dead_letters.rs new file mode 100644 index 0000000..a9e3505 --- /dev/null +++ b/crates/executor-core/src/sdk/dead_letters.rs @@ -0,0 +1,84 @@ +//! `dead_letters::` Rhai bridge. +//! +//! ```rhai +//! dead_letters::replay("01234567-..."); // re-enqueue + mark replayed +//! dead_letters::resolve("01234567-...", "ignored"); // close out the row +//! ``` +//! +//! Sync↔async via `Handle::current().block_on(...)` — same pattern as +//! the `kv::` bridge (works because `LocalExecutorClient` runs the +//! script under `spawn_blocking`). +//! +//! `dead_letters::list(filter)` is intentionally NOT shipped — design +//! notes §4 defers it to v1.2 to align with the `docs::find()` query +//! DSL. + +use std::str::FromStr; +use std::sync::Arc; + +use picloud_shared::{DeadLetterError, DeadLetterId, SdkCallCx, Services}; +use rhai::{Engine as RhaiEngine, EvalAltResult, Module}; +use tokio::runtime::Handle as TokioHandle; +use uuid::Uuid; + +pub(super) fn register(engine: &mut RhaiEngine, services: &Services, cx: Arc) { + let svc = services.dead_letters.clone(); + let mut module = Module::new(); + { + let svc = svc.clone(); + let cx = cx.clone(); + module.set_native_fn( + "replay", + move |id: &str| -> Result<(), Box> { + let dl_id = parse_dl_id(id)?; + let svc = svc.clone(); + let cx = cx.clone(); + block_on(async move { svc.replay(&cx, dl_id).await }) + }, + ); + } + { + let svc = svc.clone(); + let cx = cx.clone(); + module.set_native_fn( + "resolve", + move |id: &str, reason: &str| -> Result<(), Box> { + let dl_id = parse_dl_id(id)?; + let reason = reason.to_string(); + let svc = svc.clone(); + let cx = cx.clone(); + block_on(async move { svc.resolve(&cx, dl_id, &reason).await }) + }, + ); + } + engine.register_static_module("dead_letters", module.into()); +} + +fn parse_dl_id(s: &str) -> Result> { + Uuid::from_str(s) + .map(DeadLetterId::from) + .map_err(|e| -> Box { + EvalAltResult::ErrorRuntime( + format!("dead_letters: invalid id {s:?}: {e}").into(), + rhai::Position::NONE, + ) + .into() + }) +} + +fn block_on(fut: F) -> Result<(), Box> +where + F: std::future::Future> + Send, +{ + let handle = TokioHandle::try_current().map_err(|e| -> Box { + EvalAltResult::ErrorRuntime( + format!("dead_letters: no tokio runtime available: {e}").into(), + rhai::Position::NONE, + ) + .into() + })?; + handle.block_on(fut).map_err(|err| -> Box { + EvalAltResult::ErrorRuntime(format!("dead_letters: {err}").into(), rhai::Position::NONE) + .into() + }) +} diff --git a/crates/executor-core/src/sdk/mod.rs b/crates/executor-core/src/sdk/mod.rs index a445414..56f6d47 100644 --- a/crates/executor-core/src/sdk/mod.rs +++ b/crates/executor-core/src/sdk/mod.rs @@ -13,6 +13,7 @@ pub mod bridge; pub mod cx; +pub mod dead_letters; pub mod kv; pub mod stdlib; @@ -32,6 +33,5 @@ use rhai::Engine as RhaiEngine; /// single `::register(...)` line per service. pub fn register_all(engine: &mut RhaiEngine, services: &Services, cx: Arc) { kv::register(engine, services, cx.clone()); - // v1.1.1 commit 8: dead_letters::register(engine, services, cx.clone()); - let _ = cx; + dead_letters::register(engine, services, cx); } diff --git a/crates/manager-core/src/dead_letter_service.rs b/crates/manager-core/src/dead_letter_service.rs new file mode 100644 index 0000000..2bd46bf --- /dev/null +++ b/crates/manager-core/src/dead_letter_service.rs @@ -0,0 +1,118 @@ +//! `PostgresDeadLetterService` — replaces `NoopDeadLetterService` in +//! v1.1.1's `Services` bundle. Implements `replay` (re-enqueue the +//! original event into the outbox + mark the DL row replayed) and +//! `resolve` (close the row out with a reason). +//! +//! Both methods are gated by `Capability::AppDeadLetterManage(AppId)` +//! evaluated against `cx.principal`. Public-HTTP scripts with +//! `principal: None` fail the check — design notes §4: managing +//! dead letters is an admin act. + +use std::sync::Arc; + +use async_trait::async_trait; +use picloud_shared::{DeadLetterError, DeadLetterId, DeadLetterService, SdkCallCx}; + +use crate::authz::{self, AuthzRepo, Capability}; +use crate::dead_letter_repo::{DeadLetterRepo, DeadLetterRepoError, DeadLetterRow}; +use crate::outbox_repo::{NewOutboxRow, OutboxRepo, OutboxSourceKind}; + +pub struct PostgresDeadLetterService { + repo: Arc, + outbox: Arc, + authz: Arc, +} + +impl PostgresDeadLetterService { + #[must_use] + pub fn new( + repo: Arc, + outbox: Arc, + authz: Arc, + ) -> Self { + Self { + repo, + outbox, + authz, + } + } + + async fn require_dl_capability(&self, cx: &SdkCallCx) -> Result<(), DeadLetterError> { + let Some(ref principal) = cx.principal else { + return Err(DeadLetterError::Forbidden); + }; + authz::require( + &*self.authz, + principal, + Capability::AppDeadLetterManage(cx.app_id), + ) + .await + .map_err(|_| DeadLetterError::Forbidden) + } + + async fn load_row(&self, id: DeadLetterId) -> Result { + self.repo + .get(id) + .await + .map_err(map_repo_err)? + .ok_or(DeadLetterError::NotFound) + } +} + +#[async_trait] +impl DeadLetterService for PostgresDeadLetterService { + async fn replay(&self, cx: &SdkCallCx, id: DeadLetterId) -> Result<(), DeadLetterError> { + self.require_dl_capability(cx).await?; + let row = self.load_row(id).await?; + if row.app_id != cx.app_id { + // Cross-app — treat as not-found to avoid leaking + // information about other apps' dead letters. + return Err(DeadLetterError::NotFound); + } + + let source_kind = OutboxSourceKind::from_wire(&row.source).unwrap_or(OutboxSourceKind::Kv); + self.outbox + .insert(NewOutboxRow { + app_id: row.app_id, + source_kind, + trigger_id: row.trigger_id, + script_id: row.script_id, + reply_to: None, + payload: row.payload.clone(), + origin_principal: None, + trigger_depth: 0, + root_execution_id: None, + }) + .await + .map_err(|e| DeadLetterError::Backend(e.to_string()))?; + + self.repo + .resolve(id, "replayed") + .await + .map_err(map_repo_err)?; + Ok(()) + } + + async fn resolve( + &self, + cx: &SdkCallCx, + id: DeadLetterId, + reason: &str, + ) -> Result<(), DeadLetterError> { + self.require_dl_capability(cx).await?; + let row = self.load_row(id).await?; + if row.app_id != cx.app_id { + return Err(DeadLetterError::NotFound); + } + self.repo.resolve(id, reason).await.map_err(map_repo_err)?; + Ok(()) + } +} + +fn map_repo_err(e: DeadLetterRepoError) -> DeadLetterError { + match e { + DeadLetterRepoError::NotFound(_) => DeadLetterError::NotFound, + DeadLetterRepoError::InvalidResolution(s) => DeadLetterError::InvalidResolution(s), + DeadLetterRepoError::Db(e) => DeadLetterError::Backend(e.to_string()), + } +} diff --git a/crates/manager-core/src/dead_letters_api.rs b/crates/manager-core/src/dead_letters_api.rs new file mode 100644 index 0000000..30219f4 --- /dev/null +++ b/crates/manager-core/src/dead_letters_api.rs @@ -0,0 +1,316 @@ +//! `/api/v1/admin/apps/{id}/dead_letters/*` — dashboard surface for +//! the no-default-handler model (design notes §4). +//! +//! Endpoints: +//! - `GET /apps/{id}/dead_letters?unresolved=true` — list view +//! - `GET /apps/{id}/dead_letters/count` — badge count +//! - `GET /apps/{id}/dead_letters/{dl_id}` — row detail +//! - `POST /apps/{id}/dead_letters/{dl_id}/replay` — re-enqueue +//! - `POST /apps/{id}/dead_letters/{dl_id}/resolve` — mark resolved +//! +//! All gated on `Capability::AppDeadLetterManage(app_id)`. + +use std::sync::Arc; + +use axum::extract::{Path, Query, State}; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Json, Response}; +use axum::routing::{get, post}; +use axum::{Extension, Router}; +use picloud_shared::{AppId, DeadLetterId, DeadLetterService, Principal, SdkCallCx}; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +use crate::app_repo::AppRepository; +use crate::authz::{require, AuthzDenied, AuthzError, AuthzRepo, Capability}; +use crate::dead_letter_repo::{DeadLetterRepo, DeadLetterRepoError, DeadLetterRow}; + +#[derive(Clone)] +pub struct DeadLettersState { + pub repo: Arc, + pub service: Arc, + pub apps: Arc, + pub authz: Arc, +} + +pub fn dead_letters_router(state: DeadLettersState) -> Router { + Router::new() + .route("/apps/{app_id}/dead_letters", get(list)) + .route("/apps/{app_id}/dead_letters/count", get(count)) + .route("/apps/{app_id}/dead_letters/{dl_id}", get(detail)) + .route("/apps/{app_id}/dead_letters/{dl_id}/replay", post(replay)) + .route("/apps/{app_id}/dead_letters/{dl_id}/resolve", post(resolve)) + .with_state(state) +} + +#[derive(Debug, Deserialize)] +pub struct ListQuery { + #[serde(default)] + pub unresolved: bool, + #[serde(default = "default_limit")] + pub limit: i64, + #[serde(default)] + pub offset: i64, +} + +const fn default_limit() -> i64 { + 50 +} + +#[derive(Debug, Serialize)] +pub struct ListResponse { + pub dead_letters: Vec, +} + +#[derive(Debug, Serialize)] +pub struct CountResponse { + pub unresolved: i64, +} + +#[derive(Debug, Deserialize)] +pub struct ResolveBody { + pub reason: String, +} + +#[derive(Debug, Serialize)] +pub struct DeadLetterDto { + pub id: DeadLetterId, + pub app_id: AppId, + pub source: String, + pub op: String, + pub trigger_id: Option, + pub script_id: Option, + pub payload: serde_json::Value, + pub attempt_count: u32, + pub first_attempt_at: chrono::DateTime, + pub last_attempt_at: chrono::DateTime, + pub last_error: String, + pub created_at: chrono::DateTime, + pub resolved_at: Option>, + pub resolution: Option, +} + +impl From for DeadLetterDto { + fn from(r: DeadLetterRow) -> Self { + Self { + id: r.id, + app_id: r.app_id, + source: r.source, + op: r.op, + trigger_id: r.trigger_id, + script_id: r.script_id, + payload: r.payload, + attempt_count: r.attempt_count, + first_attempt_at: r.first_attempt_at, + last_attempt_at: r.last_attempt_at, + last_error: r.last_error, + created_at: r.created_at, + resolved_at: r.resolved_at, + resolution: r.resolution, + } + } +} + +async fn list( + State(s): State, + Extension(principal): Extension, + Path(app_id): Path, + Query(q): Query, +) -> Result, DeadLettersApiError> { + ensure_app(&*s.apps, app_id).await?; + require( + s.authz.as_ref(), + &principal, + Capability::AppDeadLetterManage(app_id), + ) + .await?; + let rows = s + .repo + .list_for_app(app_id, q.unresolved, q.limit.clamp(1, 200), q.offset.max(0)) + .await?; + Ok(Json(ListResponse { + dead_letters: rows.into_iter().map(Into::into).collect(), + })) +} + +async fn count( + State(s): State, + Extension(principal): Extension, + Path(app_id): Path, +) -> Result, DeadLettersApiError> { + ensure_app(&*s.apps, app_id).await?; + require( + s.authz.as_ref(), + &principal, + Capability::AppDeadLetterManage(app_id), + ) + .await?; + let n = s.repo.unresolved_count(app_id).await?; + Ok(Json(CountResponse { unresolved: n })) +} + +async fn detail( + State(s): State, + Extension(principal): Extension, + Path((app_id, dl_id)): Path<(AppId, DeadLetterId)>, +) -> Result, DeadLettersApiError> { + ensure_app(&*s.apps, app_id).await?; + require( + s.authz.as_ref(), + &principal, + Capability::AppDeadLetterManage(app_id), + ) + .await?; + let row = s + .repo + .get(dl_id) + .await? + .ok_or(DeadLettersApiError::NotFound(dl_id))?; + if row.app_id != app_id { + return Err(DeadLettersApiError::NotFound(dl_id)); + } + Ok(Json(row.into())) +} + +async fn replay( + State(s): State, + Extension(principal): Extension, + Path((app_id, dl_id)): Path<(AppId, DeadLetterId)>, +) -> Result { + ensure_app(&*s.apps, app_id).await?; + // Authz handled inside the service via SdkCallCx. + let cx = admin_cx(app_id, &principal); + s.service + .replay(&cx, dl_id) + .await + .map_err(map_service_err)?; + Ok(StatusCode::NO_CONTENT) +} + +async fn resolve( + State(s): State, + Extension(principal): Extension, + Path((app_id, dl_id)): Path<(AppId, DeadLetterId)>, + Json(body): Json, +) -> Result { + ensure_app(&*s.apps, app_id).await?; + let cx = admin_cx(app_id, &principal); + s.service + .resolve(&cx, dl_id, &body.reason) + .await + .map_err(map_service_err)?; + Ok(StatusCode::NO_CONTENT) +} + +/// Synthesize an `SdkCallCx` for the admin path. The service layer +/// reads `cx.app_id` + `cx.principal` and ignores the trigger / +/// execution fields, so the per-call ids are arbitrary. +fn admin_cx(app_id: AppId, principal: &Principal) -> SdkCallCx { + SdkCallCx { + app_id, + principal: Some(principal.clone()), + execution_id: picloud_shared::ExecutionId::new(), + request_id: picloud_shared::RequestId::new(), + trigger_depth: 0, + root_execution_id: picloud_shared::ExecutionId::new(), + is_dead_letter_handler: false, + event: None, + } +} + +async fn ensure_app(apps: &dyn AppRepository, app_id: AppId) -> Result<(), DeadLettersApiError> { + apps.get_by_id(app_id) + .await + .map_err(|e| DeadLettersApiError::Backend(e.to_string()))? + .ok_or_else(|| DeadLettersApiError::AppNotFound(app_id.to_string()))?; + Ok(()) +} + +fn map_service_err(e: picloud_shared::DeadLetterError) -> DeadLettersApiError { + match e { + picloud_shared::DeadLetterError::NotFound => { + DeadLettersApiError::NotFound(DeadLetterId::new()) + } + picloud_shared::DeadLetterError::Forbidden => DeadLettersApiError::Forbidden, + picloud_shared::DeadLetterError::InvalidResolution(s) => { + DeadLettersApiError::Invalid(format!("invalid resolution: {s}")) + } + picloud_shared::DeadLetterError::Backend(s) => DeadLettersApiError::Backend(s), + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DeadLettersApiError { + #[error("app not found: {0}")] + AppNotFound(String), + + #[error("dead-letter not found: {0}")] + NotFound(DeadLetterId), + + #[error("invalid: {0}")] + Invalid(String), + + #[error("forbidden")] + Forbidden, + + #[error("authorization repo error: {0}")] + AuthzRepo(String), + + #[error("dead-letter backend: {0}")] + Backend(String), +} + +impl From for DeadLettersApiError { + fn from(d: AuthzDenied) -> Self { + match d { + AuthzDenied::Denied => Self::Forbidden, + AuthzDenied::Repo(e) => Self::AuthzRepo(e.to_string()), + } + } +} + +impl From for DeadLettersApiError { + fn from(e: AuthzError) -> Self { + Self::AuthzRepo(e.to_string()) + } +} + +impl From for DeadLettersApiError { + fn from(e: DeadLetterRepoError) -> Self { + match e { + DeadLetterRepoError::NotFound(id) => Self::NotFound(id), + DeadLetterRepoError::InvalidResolution(s) => Self::Invalid(s), + DeadLetterRepoError::Db(e) => Self::Backend(e.to_string()), + } + } +} + +impl IntoResponse for DeadLettersApiError { + fn into_response(self) -> Response { + let (status, body) = match &self { + Self::AppNotFound(_) | Self::NotFound(_) => { + (StatusCode::NOT_FOUND, json!({ "error": self.to_string() })) + } + Self::Invalid(_) => ( + StatusCode::UNPROCESSABLE_ENTITY, + json!({ "error": self.to_string() }), + ), + Self::Forbidden => (StatusCode::FORBIDDEN, json!({ "error": self.to_string() })), + Self::AuthzRepo(e) => { + tracing::error!(error = %e, "dead_letters authz repo error"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + json!({ "error": "internal error" }), + ) + } + Self::Backend(e) => { + tracing::error!(error = %e, "dead_letters api backend error"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + json!({ "error": "internal error" }), + ) + } + }; + (status, Json(body)).into_response() + } +} diff --git a/crates/manager-core/src/lib.rs b/crates/manager-core/src/lib.rs index 869d3de..f796d16 100644 --- a/crates/manager-core/src/lib.rs +++ b/crates/manager-core/src/lib.rs @@ -23,6 +23,8 @@ pub mod auth_bootstrap; pub mod auth_middleware; pub mod authz; pub mod dead_letter_repo; +pub mod dead_letter_service; +pub mod dead_letters_api; pub mod dispatcher; pub mod kv_repo; pub mod kv_service; @@ -80,6 +82,8 @@ pub use authz::{can, require, AuthzDenied, AuthzError, AuthzRepo, Capability, De pub use dead_letter_repo::{ DeadLetterRepo, DeadLetterRepoError, DeadLetterRow, NewDeadLetter, PostgresDeadLetterRepo, }; +pub use dead_letter_service::PostgresDeadLetterService; +pub use dead_letters_api::{dead_letters_router, DeadLettersApiError, DeadLettersState}; pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError}; pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo}; pub use kv_service::KvServiceImpl; diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs index e162ce5..7ccb36b 100644 --- a/crates/picloud/src/lib.rs +++ b/crates/picloud/src/lib.rs @@ -11,14 +11,15 @@ use axum::{routing::get, Json, Router}; use picloud_executor_core::{Engine, Limits}; use picloud_manager_core::{ admin_router, admins_router, api_keys_router, app_members_router, apps_api, apps_router, - attach_principal_if_present, auth_router, compile_routes, migrations, require_authenticated, - route_admin_router, triggers_router, AbandonedRepo, AdminPrincipalResolver, - AdminSessionRepository, AdminState, AdminUserRepository, AdminsState, ApiKeyRepository, - ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState, AppRepository, - AppsState, AuthState, AuthzRepo, DeadLetterRepo, Dispatcher, KvServiceImpl, OutboxEventEmitter, - OutboxRepo, PostgresAbandonedRepo, PostgresAdminSessionRepository, PostgresAdminUserRepository, - PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository, - PostgresAppRepository, PostgresDeadLetterRepo, PostgresExecutionLogRepository, + attach_principal_if_present, auth_router, compile_routes, dead_letters_router, migrations, + require_authenticated, route_admin_router, triggers_router, AbandonedRepo, + AdminPrincipalResolver, AdminSessionRepository, AdminState, AdminUserRepository, AdminsState, + ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState, + AppRepository, AppsState, AuthState, AuthzRepo, DeadLetterRepo, DeadLettersState, Dispatcher, + KvServiceImpl, OutboxEventEmitter, OutboxRepo, PostgresAbandonedRepo, + PostgresAdminSessionRepository, PostgresAdminUserRepository, PostgresApiKeyRepository, + PostgresAppDomainRepository, PostgresAppMembersRepository, PostgresAppRepository, + PostgresDeadLetterRepo, PostgresDeadLetterService, PostgresExecutionLogRepository, PostgresExecutionLogSink, PostgresKvRepo, PostgresOutboxRepo, PostgresRouteRepository, PostgresScriptRepository, PostgresTriggerRepo, PrincipalResolver, RepoResolver, RouteAdminState, RouteRepository, SandboxCeiling, ScriptRepository, TriggerConfig, TriggerRepo, @@ -30,9 +31,8 @@ use picloud_orchestrator_core::{ LocalExecutorClient, }; use picloud_shared::{ - ExecutionLogSink, InboxResolver, KvService, NoopDeadLetterService, OutboxWriter, - ScriptValidator, ServiceEventEmitter, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION, - WIRE_VERSION, + DeadLetterService, ExecutionLogSink, InboxResolver, KvService, OutboxWriter, ScriptValidator, + ServiceEventEmitter, Services, API_VERSION, PRODUCT_VERSION, SDK_VERSION, WIRE_VERSION, }; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; @@ -119,10 +119,9 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { let abandoned_repo: Arc = Arc::new(PostgresAbandonedRepo::new(pool.clone())); let trigger_config = TriggerConfig::from_env(); - // SDK services bundle. v1.1.1 ships the KV store and the - // outbox-backed event emitter; `NoopDeadLetterService` is a v1.1.1 - // stub that errors loudly until the real `PostgresDeadLetterService` - // ships (commit 8). + // SDK services bundle. v1.1.1 ships the KV store + the + // outbox-backed event emitter + the dead-letter service (replay / + // resolve). let kv_repo = Arc::new(PostgresKvRepo::new(pool)); let events: Arc = Arc::new(OutboxEventEmitter::new( trigger_repo.clone(), @@ -130,7 +129,12 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { )); let kv: Arc = Arc::new(KvServiceImpl::new(kv_repo, authz.clone(), events.clone())); - let services = Services::new(kv, Arc::new(NoopDeadLetterService), events); + let dl_service: Arc = Arc::new(PostgresDeadLetterService::new( + dl_repo.clone(), + outbox_repo.clone(), + authz.clone(), + )); + let services = Services::new(kv, dl_service.clone(), events); let engine = Arc::new(Engine::new(Limits::default(), services)); // Compile the routes table once at startup; admin writes refresh it. @@ -216,16 +220,20 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { inbox: inbox_registry, outbox: outbox_writer, }; - // Silence unused-import warnings for repos handed to the - // dispatcher in this commit; commit 8 wires them into the - // dead-letters admin endpoints and commit 10 into the GC sweeper. - let _ = (&dl_repo, &abandoned_repo); + // Commit 10 wires abandoned_repo into the GC sweeper. + let _ = &abandoned_repo; let triggers_state = TriggersState { triggers: trigger_repo, apps: apps_repo.clone(), authz: authz.clone(), config: trigger_config, }; + let dead_letters_state = DeadLettersState { + repo: dl_repo, + service: dl_service, + apps: apps_repo.clone(), + authz: authz.clone(), + }; let apps_state = AppsState { apps: apps_repo, domains: domains_repo, @@ -268,6 +276,7 @@ pub async fn build_app(pool: PgPool, auth: AuthDeps) -> anyhow::Result { .merge(app_members_router(app_members_state)) .merge(api_keys_router(api_keys_state)) .merge(triggers_router(triggers_state)) + .merge(dead_letters_router(dead_letters_state)) .layer(from_fn_with_state( auth_state.clone(), require_authenticated,