From 1f78937dd2ad79bf124d9b69e3d031598f12393e Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Thu, 4 Jun 2026 22:24:35 +0200 Subject: [PATCH] feat(v1.1.7-email-inbound): webhook receiver + email:receive trigger MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Inbound email: a provider POSTs a normalized JSON message to POST /api/v1/email-inbound/{app_id}/{trigger_id}; the public receiver verifies the optional HMAC signature, builds a TriggerEvent::Email, and enqueues an outbox row the dispatcher delivers like any async trigger. Handlers see ctx.event.email = #{from,to,cc,subject,text,html, received_at,message_id}. - migration 0024: widen triggers.kind + outbox.source_kind CHECKs to 'email'; new email_trigger_details table. - TriggerKind::Email, TriggerDetails::Email{has_inbound_secret}, OutboxSourceKind::Email, TriggerEvent::Email; dispatcher routes the email row via the generic resolve_trigger path. - Admin POST /apps/{id}/triggers/email (validate_trigger_target; module + cross-app rejection). inbound_secret is stored ENCRYPTED via the master key (deviation from the brief's plaintext default; decrypted per inbound request — see HANDBACK §7). - Dashboard: email trigger form on the Triggers tab + webhook URL + expected-payload help. - 8 DB-gated e2e tests (202/401/404/422/cross-app/handler-fire) + receiver unit tests (HMAC verify, secret round-trip, payload parse). Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 5 + crates/executor-core/src/engine.rs | 34 ++ crates/executor-core/tests/sdk_email.rs | 54 ++- crates/manager-core/Cargo.toml | 3 + .../migrations/0024_email_triggers.sql | 32 ++ crates/manager-core/src/dispatcher.rs | 3 +- crates/manager-core/src/email_inbound_api.rs | 307 ++++++++++++++++++ crates/manager-core/src/lib.rs | 10 +- crates/manager-core/src/outbox_repo.rs | 4 + crates/manager-core/src/trigger_repo.rs | 162 +++++++++ crates/manager-core/src/triggers_api.rs | 130 +++++++- crates/picloud/Cargo.toml | 4 + crates/picloud/src/lib.rs | 44 ++- crates/picloud/tests/email_inbound.rs | 298 +++++++++++++++++ crates/shared/src/trigger_event.rs | 22 ++ dashboard/src/lib/api.ts | 24 +- dashboard/src/routes/apps/[slug]/+page.svelte | 91 ++++++ 17 files changed, 1194 insertions(+), 33 deletions(-) create mode 100644 crates/manager-core/migrations/0024_email_triggers.sql create mode 100644 crates/manager-core/src/email_inbound_api.rs create mode 100644 crates/picloud/tests/email_inbound.rs diff --git a/Cargo.lock b/Cargo.lock index 88da4fe..37d571e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1762,12 +1762,15 @@ dependencies = [ "axum-test", "chrono", "figment", + "hex", + "hmac", "picloud-executor-core", "picloud-manager-core", "picloud-orchestrator-core", "picloud-shared", "serde", "serde_json", + "sha2", "sqlx", "thiserror 1.0.69", "tokio", @@ -1859,6 +1862,8 @@ dependencies = [ "chrono-tz", "cron", "data-encoding", + "hex", + "hmac", "lettre", "picloud-executor-core", "picloud-orchestrator-core", diff --git a/crates/executor-core/src/engine.rs b/crates/executor-core/src/engine.rs index c9cb7e4..6a3b448 100644 --- a/crates/executor-core/src/engine.rs +++ b/crates/executor-core/src/engine.rs @@ -448,6 +448,40 @@ fn trigger_event_to_dynamic(event: &TriggerEvent) -> Dynamic { ps.insert("published_at".into(), published_at.to_rfc3339().into()); m.insert("pubsub".into(), ps.into()); } + TriggerEvent::Email { + from, + to, + cc, + subject, + text, + html, + received_at, + message_id, + } => { + // `ctx.event.op` is always "receive" for inbound email. + m.insert("op".into(), "receive".into()); + let mut em = Map::new(); + em.insert("from".into(), from.clone().into()); + let to_arr: rhai::Array = to.iter().map(|a| Dynamic::from(a.clone())).collect(); + em.insert("to".into(), to_arr.into()); + let cc_arr: rhai::Array = cc.iter().map(|a| Dynamic::from(a.clone())).collect(); + em.insert("cc".into(), cc_arr.into()); + em.insert("subject".into(), subject.clone().into()); + em.insert( + "text".into(), + text.clone().map_or(Dynamic::UNIT, Dynamic::from), + ); + em.insert( + "html".into(), + html.clone().map_or(Dynamic::UNIT, Dynamic::from), + ); + em.insert("received_at".into(), received_at.to_rfc3339().into()); + em.insert( + "message_id".into(), + message_id.clone().map_or(Dynamic::UNIT, Dynamic::from), + ); + m.insert("email".into(), em.into()); + } TriggerEvent::DeadLetter { dead_letter_id, original, diff --git a/crates/executor-core/tests/sdk_email.rs b/crates/executor-core/tests/sdk_email.rs index f80e4e3..8f2b64c 100644 --- a/crates/executor-core/tests/sdk_email.rs +++ b/crates/executor-core/tests/sdk_email.rs @@ -12,9 +12,9 @@ use picloud_executor_core::{Engine, ExecRequest, InvocationType, Limits}; use picloud_shared::{ AppId, EmailError, EmailService, ExecutionId, NoopDeadLetterService, NoopDocsService, NoopEventEmitter, NoopHttpService, NoopKvService, NoopModuleSource, OutboundEmail, RequestId, - ScriptId, ScriptSandbox, SdkCallCx, Services, + ScriptId, ScriptSandbox, SdkCallCx, Services, TriggerEvent, }; -use serde_json::Value; +use serde_json::{json, Value}; #[derive(Default)] struct RecordingEmail { @@ -142,6 +142,56 @@ async fn send_html_carries_both_parts_and_lists() { assert_eq!(e.html.as_deref(), Some("

rich

")); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn inbound_email_event_visible_to_handler() { + // A handler invoked by an email:receive trigger sees the normalized + // message at ctx.event.email (built by the engine's ctx renderer). + let rec = Arc::new(RecordingEmail::default()); + let engine = engine_with(rec); + let mut req = baseline_request(AppId::new()); + req.event = Some(TriggerEvent::Email { + from: "sender@external.com".into(), + to: vec!["alice@myapp.com".into()], + cc: vec!["bob@myapp.com".into()], + subject: "Re: question".into(), + text: Some("hello".into()), + html: None, + received_at: chrono::DateTime::parse_from_rfc3339("2026-08-15T12:00:00Z") + .unwrap() + .with_timezone(&chrono::Utc), + message_id: Some("".into()), + }); + let src = r#" + let e = ctx.event; + #{ + source: e.source, + op: e.op, + from: e.email.from, + to0: e.email.to[0], + cc0: e.email.cc[0], + subject: e.email.subject, + text: e.email.text, + html_is_unit: type_of(e.email.html) == "()", + message_id: e.email.message_id + } + "#; + let src = src.to_string(); + let body = tokio::task::spawn_blocking(move || engine.execute(&src, req)) + .await + .unwrap() + .unwrap() + .body; + assert_eq!(body["source"], json!("email")); + assert_eq!(body["op"], json!("receive")); + assert_eq!(body["from"], json!("sender@external.com")); + assert_eq!(body["to0"], json!("alice@myapp.com")); + assert_eq!(body["cc0"], json!("bob@myapp.com")); + assert_eq!(body["subject"], json!("Re: question")); + assert_eq!(body["text"], json!("hello")); + assert_eq!(body["html_is_unit"], json!(true)); + assert_eq!(body["message_id"], json!("")); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn send_html_without_html_throws() { let rec = Arc::new(RecordingEmail::default()); diff --git a/crates/manager-core/Cargo.toml b/crates/manager-core/Cargo.toml index 73f7b21..8de9825 100644 --- a/crates/manager-core/Cargo.toml +++ b/crates/manager-core/Cargo.toml @@ -31,6 +31,9 @@ reqwest.workspace = true argon2.workspace = true sha2.workspace = true +# HMAC-SHA256 verification of inbound-email provider signatures (v1.1.7). +hmac.workspace = true +hex.workspace = true base64.workspace = true data-encoding.workspace = true # Outbound SMTP email (v1.1.7 email::send / send_html). diff --git a/crates/manager-core/migrations/0024_email_triggers.sql b/crates/manager-core/migrations/0024_email_triggers.sql new file mode 100644 index 0000000..b73f6e5 --- /dev/null +++ b/crates/manager-core/migrations/0024_email_triggers.sql @@ -0,0 +1,32 @@ +-- v1.1.7: inbound email triggers (email:receive). +-- +-- A configured provider (Mailgun / Postmark / SendGrid / SES) POSTs +-- inbound email to POST /api/v1/email-inbound/{app_id}/{trigger_id}; +-- the receiver normalizes it into a TriggerEvent::Email and enqueues an +-- outbox row for the trigger's handler. v1.1.7 ships the webhook path; +-- a native SMTP listener is v1.3+. + +-- Widen the trigger-kind + outbox-source CHECK constraints to admit +-- 'email'. +ALTER TABLE triggers DROP CONSTRAINT triggers_kind_check; +ALTER TABLE triggers ADD CONSTRAINT triggers_kind_check + CHECK (kind IN ('kv', 'dead_letter', 'docs', 'cron', + 'files', 'pubsub', 'email')); + +ALTER TABLE outbox DROP CONSTRAINT outbox_source_kind_check; +ALTER TABLE outbox ADD CONSTRAINT outbox_source_kind_check + CHECK (source_kind IN ('http', 'kv', 'dead_letter', 'docs', + 'cron', 'files', 'pubsub', 'email')); + +-- Per-trigger inbound config. The HMAC secret used to verify provider +-- signatures is stored ENCRYPTED at rest (AES-256-GCM under the process +-- master key) — a deviation from the original brief's plaintext column, +-- chosen to keep all operationally-secret material encrypted. The +-- receiver decrypts it per inbound request. NULL columns mean the +-- trigger has no signature verification (accepts any POST to its URL — +-- relies on URL secrecy). +CREATE TABLE email_trigger_details ( + trigger_id UUID PRIMARY KEY REFERENCES triggers(id) ON DELETE CASCADE, + inbound_secret_encrypted BYTEA, -- ciphertext incl. GCM auth tag (NULL = unsigned) + inbound_secret_nonce BYTEA -- 12 bytes (NULL = unsigned) +); diff --git a/crates/manager-core/src/dispatcher.rs b/crates/manager-core/src/dispatcher.rs index a377160..78bdfe1 100644 --- a/crates/manager-core/src/dispatcher.rs +++ b/crates/manager-core/src/dispatcher.rs @@ -168,7 +168,8 @@ impl Dispatcher { | OutboxSourceKind::DeadLetter | OutboxSourceKind::Cron | OutboxSourceKind::Files - | OutboxSourceKind::Pubsub => { + | OutboxSourceKind::Pubsub + | OutboxSourceKind::Email => { let resolved = self.resolve_trigger(&row).await?; let req = match self.build_exec_request(&row, &resolved).await { Ok(req) => req, diff --git a/crates/manager-core/src/email_inbound_api.rs b/crates/manager-core/src/email_inbound_api.rs new file mode 100644 index 0000000..7ad1ac7 --- /dev/null +++ b/crates/manager-core/src/email_inbound_api.rs @@ -0,0 +1,307 @@ +//! `POST /api/v1/email-inbound/{app_id}/{trigger_id}` — the inbound-email +//! webhook receiver (v1.1.7). +//! +//! A configured provider (Mailgun / Postmark / SendGrid / SES) POSTs a +//! normalized JSON message here; the receiver verifies the optional HMAC +//! signature, builds a `TriggerEvent::Email`, and enqueues an outbox row +//! the dispatcher picks up like any other async trigger. +//! +//! This is a PUBLIC endpoint (no admin auth) — the trigger URL itself, +//! plus the per-trigger HMAC secret, are the security boundary. It is +//! mounted OUTSIDE the `require_authenticated` layer. +//! +//! Status codes: +//! * 202 — accepted + enqueued +//! * 401 — HMAC required but missing/invalid +//! * 404 — trigger missing, disabled, not `kind=email`, or app mismatch +//! * 422 — body is not the expected JSON shape +//! +//! Only the generic provider-agnostic JSON shape is accepted in v1.1.7 +//! (see [`InboundPayload`]); provider-specific unmarshallers are v1.2. + +use std::sync::Arc; + +use axum::body::Bytes; +use axum::extract::{Path, State}; +use axum::http::{HeaderMap, StatusCode}; +use axum::response::{IntoResponse, Json, Response}; +use axum::routing::post; +use axum::Router; +use hmac::{Hmac, Mac}; +use picloud_shared::{AppId, MasterKey, TriggerEvent, TriggerId}; +use serde::Deserialize; +use serde_json::json; +use sha2::Sha256; + +use crate::outbox_repo::{NewOutboxRow, OutboxRepo, OutboxSourceKind}; +use crate::secrets_repo::StoredSecret; +use crate::secrets_service::open; +use crate::trigger_repo::TriggerRepo; + +type HmacSha256 = Hmac; + +/// Header the provider's HMAC signature is read from. The signature is +/// the lowercase hex of `HMAC-SHA256(inbound_secret, raw_body)`. +const SIGNATURE_HEADER: &str = "x-picloud-signature"; + +#[derive(Clone)] +pub struct EmailInboundState { + pub triggers: Arc, + pub outbox: Arc, + pub master_key: MasterKey, +} + +pub fn email_inbound_router(state: EmailInboundState) -> Router { + Router::new() + .route( + "/email-inbound/{app_id}/{trigger_id}", + post(receive_inbound_email), + ) + .with_state(state) +} + +/// The generic provider-agnostic inbound shape. Users configure their +/// provider's webhook templating to POST this. `from` is required; +/// everything else defaults. +#[derive(Debug, Deserialize)] +struct InboundPayload { + from: String, + #[serde(default)] + to: Vec, + #[serde(default)] + cc: Vec, + #[serde(default)] + subject: String, + #[serde(default)] + text: Option, + #[serde(default)] + html: Option, + #[serde(default)] + message_id: Option, +} + +async fn receive_inbound_email( + State(s): State, + Path((app_id, trigger_id)): Path<(AppId, TriggerId)>, + headers: HeaderMap, + body: Bytes, +) -> Result { + // Resolve the trigger. 404 covers missing / wrong-kind / cross-app / + // disabled — all "this URL doesn't address a live email trigger". + let target = s + .triggers + .email_inbound_target(trigger_id) + .await + .map_err(|e| EmailInboundError::Backend(e.to_string()))? + .ok_or(EmailInboundError::NotFound)?; + if target.app_id != app_id || !target.enabled { + return Err(EmailInboundError::NotFound); + } + + // HMAC verification (only when the trigger has a secret configured). + if let (Some(ct), Some(nonce)) = ( + target.inbound_secret_encrypted.as_ref(), + target.inbound_secret_nonce.as_ref(), + ) { + let secret = decrypt_secret(&s.master_key, ct, nonce)?; + verify_signature(&headers, &body, secret.as_bytes())?; + } + + // Parse the generic JSON shape. Malformed → 422. + let payload: InboundPayload = + serde_json::from_slice(&body).map_err(|e| EmailInboundError::Malformed(e.to_string()))?; + + let event = TriggerEvent::Email { + from: payload.from, + to: payload.to, + cc: payload.cc, + subject: payload.subject, + text: payload.text, + html: payload.html, + received_at: chrono::Utc::now(), + message_id: payload.message_id, + }; + let payload_json = serde_json::to_value(&event) + .map_err(|e| EmailInboundError::Backend(format!("serialize event: {e}")))?; + + s.outbox + .insert(NewOutboxRow { + app_id, + source_kind: OutboxSourceKind::Email, + trigger_id: Some(trigger_id), + script_id: Some(target.script_id), + reply_to: None, + payload: payload_json, + origin_principal: Some(target.registered_by_principal), + // Inbound email is the root of a trigger chain (depth 1). + trigger_depth: 1, + root_execution_id: None, + }) + .await + .map_err(|e| EmailInboundError::Backend(e.to_string()))?; + + Ok(StatusCode::ACCEPTED) +} + +/// Decrypt the stored inbound secret back to its raw string. It was +/// sealed as a JSON string by the admin layer, so `open` yields a +/// `Value::String`. +fn decrypt_secret( + master_key: &MasterKey, + ciphertext: &[u8], + nonce: &[u8], +) -> Result { + let stored = StoredSecret { + encrypted_value: ciphertext.to_vec(), + nonce: nonce.to_vec(), + }; + let value = open(master_key, &stored).map_err(|_| { + // Corrupted secret means we can't verify — fail closed (401). + EmailInboundError::Unauthorized + })?; + value + .as_str() + .map(str::to_string) + .ok_or(EmailInboundError::Unauthorized) +} + +/// Constant-time HMAC-SHA256 verification of the body against the +/// `X-Picloud-Signature` header (lowercase hex). +fn verify_signature( + headers: &HeaderMap, + body: &[u8], + secret: &[u8], +) -> Result<(), EmailInboundError> { + let provided_hex = headers + .get(SIGNATURE_HEADER) + .and_then(|h| h.to_str().ok()) + .ok_or(EmailInboundError::Unauthorized)?; + let provided = hex::decode(provided_hex.trim()).map_err(|_| EmailInboundError::Unauthorized)?; + let mut mac = + HmacSha256::new_from_slice(secret).map_err(|_| EmailInboundError::Unauthorized)?; + mac.update(body); + mac.verify_slice(&provided) + .map_err(|_| EmailInboundError::Unauthorized) +} + +#[derive(Debug, thiserror::Error)] +pub enum EmailInboundError { + #[error("trigger not found")] + NotFound, + #[error("invalid signature")] + Unauthorized, + #[error("malformed body: {0}")] + Malformed(String), + #[error("backend: {0}")] + Backend(String), +} + +impl IntoResponse for EmailInboundError { + fn into_response(self) -> Response { + let (status, body) = match &self { + Self::NotFound => ( + StatusCode::NOT_FOUND, + json!({ "error": "trigger not found" }), + ), + Self::Unauthorized => ( + StatusCode::UNAUTHORIZED, + json!({ "error": "invalid or missing signature" }), + ), + Self::Malformed(m) => ( + StatusCode::UNPROCESSABLE_ENTITY, + json!({ "error": format!("malformed inbound email body: {m}") }), + ), + Self::Backend(e) => { + tracing::error!(error = %e, "inbound email receiver backend error"); + ( + StatusCode::INTERNAL_SERVER_ERROR, + json!({ "error": "internal error" }), + ) + } + }; + (status, Json(body)).into_response() + } +} + +#[cfg(test)] +mod tests { + //! Unit tests for the security-critical helpers (HMAC verify, secret + //! round-trip, payload parsing). The full request flow — 202 / 401 / + //! 404 / 422 / cross-app — is exercised end-to-end against a real + //! Postgres in `crates/picloud/tests/email_inbound.rs`. + + use super::*; + use crate::secrets_service::seal; + use crate::secrets_service::DEFAULT_SECRET_MAX_VALUE_BYTES; + + fn sign(secret: &[u8], body: &[u8]) -> String { + let mut mac = HmacSha256::new_from_slice(secret).unwrap(); + mac.update(body); + hex::encode(mac.finalize().into_bytes()) + } + + fn headers_with_sig(sig: &str) -> HeaderMap { + let mut h = HeaderMap::new(); + h.insert(SIGNATURE_HEADER, sig.parse().unwrap()); + h + } + + #[test] + fn valid_signature_verifies() { + let secret = b"shhh"; + let body = br#"{"from":"a@b.com"}"#; + let sig = sign(secret, body); + assert!(verify_signature(&headers_with_sig(&sig), body, secret).is_ok()); + } + + #[test] + fn wrong_signature_rejected() { + let body = br#"{"from":"a@b.com"}"#; + let sig = sign(b"shhh", body); + let err = verify_signature(&headers_with_sig(&sig), body, b"different").unwrap_err(); + assert!(matches!(err, EmailInboundError::Unauthorized)); + } + + #[test] + fn missing_signature_header_rejected() { + let err = verify_signature(&HeaderMap::new(), b"body", b"secret").unwrap_err(); + assert!(matches!(err, EmailInboundError::Unauthorized)); + } + + #[test] + fn tampered_body_fails_verification() { + let secret = b"shhh"; + let sig = sign(secret, b"original"); + let err = verify_signature(&headers_with_sig(&sig), b"tampered", secret).unwrap_err(); + assert!(matches!(err, EmailInboundError::Unauthorized)); + } + + #[test] + fn secret_round_trips_through_seal_open() { + let key = MasterKey::from_bytes([3u8; 32]); + let (ct, nonce) = seal( + &key, + &serde_json::Value::String("provider-secret".into()), + DEFAULT_SECRET_MAX_VALUE_BYTES, + ) + .unwrap(); + let recovered = decrypt_secret(&key, &ct, &nonce).unwrap(); + assert_eq!(recovered, "provider-secret"); + // And a signature made with the recovered secret verifies. + let body = br#"{"from":"x@y.com"}"#; + let sig = sign(recovered.as_bytes(), body); + assert!(verify_signature(&headers_with_sig(&sig), body, recovered.as_bytes()).is_ok()); + } + + #[test] + fn payload_requires_from_but_defaults_rest() { + let ok: Result = serde_json::from_slice(br#"{"from":"a@b.com"}"#); + let p = ok.expect("from-only payload parses"); + assert_eq!(p.from, "a@b.com"); + assert!(p.to.is_empty() && p.cc.is_empty() && p.text.is_none()); + + // Missing `from` → malformed. + let bad: Result = serde_json::from_slice(br#"{"subject":"hi"}"#); + assert!(bad.is_err()); + } +} diff --git a/crates/manager-core/src/lib.rs b/crates/manager-core/src/lib.rs index f246177..7e83ece 100644 --- a/crates/manager-core/src/lib.rs +++ b/crates/manager-core/src/lib.rs @@ -31,6 +31,7 @@ pub mod dispatcher; pub mod docs_filter; pub mod docs_repo; pub mod docs_service; +pub mod email_inbound_api; pub mod email_service; pub mod files_api; pub mod files_repo; @@ -113,6 +114,7 @@ pub use dead_letters_api::{dead_letters_router, DeadLettersApiError, DeadLetters pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError}; pub use docs_repo::{DocsRepo, DocsRepoError, PostgresDocsRepo}; pub use docs_service::DocsServiceImpl; +pub use email_inbound_api::{email_inbound_router, EmailInboundError, EmailInboundState}; pub use email_service::{ EmailConfig, EmailServiceImpl, EmailTransport, LettreEmailTransport, SmtpConfig, SmtpTls, DEFAULT_EMAIL_MAX_MESSAGE_BYTES, @@ -155,9 +157,9 @@ pub use topic_repo::{PostgresTopicRepo, Topic, TopicAuthMode, TopicRepo, TopicRe pub use topics_api::{topics_router, TopicsApiError, TopicsState}; pub use trigger_config::{BackoffShape, TriggerConfig}; pub use trigger_repo::{ - collection_matches, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger, - CreateKvTrigger, CreatePubsubTrigger, DeadLetterTriggerMatch, DocsTriggerMatch, - FilesTriggerMatch, KvTriggerMatch, PostgresTriggerRepo, Trigger, TriggerDetails, - TriggerDispatchMode, TriggerKind, TriggerRepo, TriggerRepoError, + collection_matches, CreateDeadLetterTrigger, CreateDocsTrigger, CreateEmailTrigger, + CreateFilesTrigger, CreateKvTrigger, CreatePubsubTrigger, DeadLetterTriggerMatch, + DocsTriggerMatch, EmailInboundTarget, FilesTriggerMatch, KvTriggerMatch, PostgresTriggerRepo, + Trigger, TriggerDetails, TriggerDispatchMode, TriggerKind, TriggerRepo, TriggerRepoError, }; pub use triggers_api::{triggers_router, TriggersApiError, TriggersState}; diff --git a/crates/manager-core/src/outbox_repo.rs b/crates/manager-core/src/outbox_repo.rs index 0ad88be..30e06a2 100644 --- a/crates/manager-core/src/outbox_repo.rs +++ b/crates/manager-core/src/outbox_repo.rs @@ -31,6 +31,8 @@ pub enum OutboxSourceKind { Files, /// v1.1.5. Pubsub, + /// v1.1.7. Inbound email POSTed to the webhook receiver. + Email, } impl OutboxSourceKind { @@ -44,6 +46,7 @@ impl OutboxSourceKind { Self::Cron => "cron", Self::Files => "files", Self::Pubsub => "pubsub", + Self::Email => "email", } } @@ -57,6 +60,7 @@ impl OutboxSourceKind { "cron" => Some(Self::Cron), "files" => Some(Self::Files), "pubsub" => Some(Self::Pubsub), + "email" => Some(Self::Email), _ => None, } } diff --git a/crates/manager-core/src/trigger_repo.rs b/crates/manager-core/src/trigger_repo.rs index 0b601ae..8c75304 100644 --- a/crates/manager-core/src/trigger_repo.rs +++ b/crates/manager-core/src/trigger_repo.rs @@ -57,6 +57,8 @@ pub enum TriggerKind { Files, /// v1.1.5. Pubsub, + /// v1.1.7. Inbound email via the webhook receiver. + Email, } impl TriggerKind { @@ -69,6 +71,7 @@ impl TriggerKind { Self::Cron => "cron", Self::Files => "files", Self::Pubsub => "pubsub", + Self::Email => "email", } } @@ -81,6 +84,7 @@ impl TriggerKind { "cron" => Some(Self::Cron), "files" => Some(Self::Files), "pubsub" => Some(Self::Pubsub), + "email" => Some(Self::Email), _ => None, } } @@ -137,6 +141,10 @@ pub enum TriggerDetails { }, /// v1.1.5. A topic pattern: exact, `.*`, or `*`. Pubsub { topic_pattern: String }, + /// v1.1.7. Inbound email. The HMAC `inbound_secret` is never + /// surfaced (it's encrypted at rest); we expose only whether one is + /// configured so the admin UI can show "signed" vs "unsigned". + Email { has_inbound_secret: bool }, } /// Create payload for a KV trigger. Defaults applied at the admin @@ -232,6 +240,33 @@ pub struct CreatePubsubTrigger { pub registered_by_principal: AdminUserId, } +/// Create payload for an email trigger (v1.1.7). `inbound_secret_*` is +/// the already-encrypted HMAC secret (sealed by the admin layer with the +/// process master key) or `None` for an unsigned trigger. +#[derive(Debug, Clone)] +pub struct CreateEmailTrigger { + pub script_id: ScriptId, + pub inbound_secret_encrypted: Option>, + pub inbound_secret_nonce: Option>, + pub registered_by_principal: AdminUserId, +} + +/// What the inbound-email webhook receiver needs to verify + dispatch a +/// POST. Returned by `email_inbound_target`; `None` when the trigger +/// doesn't exist or isn't `kind = 'email'`. +#[derive(Debug, Clone)] +pub struct EmailInboundTarget { + pub app_id: AppId, + pub script_id: ScriptId, + pub enabled: bool, + pub dispatch_mode: TriggerDispatchMode, + pub registered_by_principal: AdminUserId, + /// Encrypted HMAC secret + nonce; both `None` for an unsigned + /// trigger (accepts any POST). + pub inbound_secret_encrypted: Option>, + pub inbound_secret_nonce: Option>, +} + /// One match for the dispatcher's "which KV triggers fire on this /// event" lookup. Carries everything the dispatcher needs to construct /// the outbox row. @@ -313,6 +348,23 @@ pub trait TriggerRepo: Send + Sync { req: CreatePubsubTrigger, ) -> Result; + /// v1.1.7. Inbound email trigger. The `inbound_secret` is stored + /// already-encrypted (the admin layer seals it). + async fn create_email_trigger( + &self, + app_id: AppId, + req: CreateEmailTrigger, + ) -> Result; + + /// v1.1.7. The webhook receiver's hot-path lookup: resolve a + /// `kind = 'email'` trigger to its app, handler script, dispatch + /// mode, and (encrypted) HMAC secret. Returns `None` when the + /// trigger doesn't exist or isn't an email trigger. + async fn email_inbound_target( + &self, + trigger_id: TriggerId, + ) -> Result, TriggerRepoError>; + async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError>; async fn get(&self, id: TriggerId) -> Result, TriggerRepoError>; @@ -761,6 +813,89 @@ impl TriggerRepo for PostgresTriggerRepo { }) } + async fn create_email_trigger( + &self, + app_id: AppId, + req: CreateEmailTrigger, + ) -> Result { + let has_inbound_secret = req.inbound_secret_encrypted.is_some(); + let mut tx = self.pool.begin().await?; + // Inbound email is delivered async like every other fan-out + // event; the receiver enqueues an outbox row the dispatcher + // picks up. Retry settings use the standard defaults. + let parent: TriggerRow = sqlx::query_as( + "INSERT INTO triggers ( \ + app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, retry_base_ms, \ + registered_by_principal \ + ) VALUES ($1, $2, 'email', TRUE, 'async', 3, 'exponential', 1000, $3) \ + RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \ + retry_max_attempts, retry_backoff, retry_base_ms, \ + registered_by_principal, created_at, updated_at", + ) + .bind(app_id.into_inner()) + .bind(req.script_id.into_inner()) + .bind(req.registered_by_principal.into_inner()) + .fetch_one(&mut *tx) + .await?; + + sqlx::query( + "INSERT INTO email_trigger_details \ + (trigger_id, inbound_secret_encrypted, inbound_secret_nonce) \ + VALUES ($1, $2, $3)", + ) + .bind(parent.id) + .bind(req.inbound_secret_encrypted.as_deref()) + .bind(req.inbound_secret_nonce.as_deref()) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + + Ok(Trigger { + id: parent.id.into(), + app_id: parent.app_id.into(), + script_id: parent.script_id.into(), + kind: TriggerKind::Email, + enabled: parent.enabled, + dispatch_mode: dispatch_from_str(&parent.dispatch_mode), + retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3), + retry_backoff: BackoffShape::from_wire(&parent.retry_backoff) + .unwrap_or(BackoffShape::Exponential), + retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000), + registered_by_principal: parent.registered_by_principal.into(), + created_at: parent.created_at, + updated_at: parent.updated_at, + details: TriggerDetails::Email { has_inbound_secret }, + }) + } + + async fn email_inbound_target( + &self, + trigger_id: TriggerId, + ) -> Result, TriggerRepoError> { + let row: Option = sqlx::query_as( + "SELECT t.app_id, t.script_id, t.enabled, t.dispatch_mode, \ + t.registered_by_principal, \ + d.inbound_secret_encrypted, d.inbound_secret_nonce \ + FROM triggers t \ + JOIN email_trigger_details d ON d.trigger_id = t.id \ + WHERE t.id = $1 AND t.kind = 'email'", + ) + .bind(trigger_id.into_inner()) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(|r| EmailInboundTarget { + app_id: r.app_id.into(), + script_id: r.script_id.into(), + enabled: r.enabled, + dispatch_mode: dispatch_from_str(&r.dispatch_mode), + registered_by_principal: r.registered_by_principal.into(), + inbound_secret_encrypted: r.inbound_secret_encrypted, + inbound_secret_nonce: r.inbound_secret_nonce, + })) + } + async fn list_for_app(&self, app_id: AppId) -> Result, TriggerRepoError> { let parents: Vec = sqlx::query_as( "SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \ @@ -1077,6 +1212,17 @@ async fn hydrate_one(pool: &PgPool, parent: TriggerRow) -> Result { + let row: EmailDetailRow = sqlx::query_as( + "SELECT inbound_secret_encrypted FROM email_trigger_details WHERE trigger_id = $1", + ) + .bind(parent.id) + .fetch_one(pool) + .await?; + TriggerDetails::Email { + has_inbound_secret: row.inbound_secret_encrypted.is_some(), + } + } }; Ok(Trigger { @@ -1154,6 +1300,22 @@ struct PubsubDetailRow { topic_pattern: String, } +#[derive(sqlx::FromRow)] +struct EmailDetailRow { + inbound_secret_encrypted: Option>, +} + +#[derive(sqlx::FromRow)] +struct EmailInboundRow { + app_id: Uuid, + script_id: Uuid, + enabled: bool, + dispatch_mode: String, + registered_by_principal: Uuid, + inbound_secret_encrypted: Option>, + inbound_secret_nonce: Option>, +} + #[derive(sqlx::FromRow)] #[allow(clippy::struct_field_names)] struct DlDetailRow { diff --git a/crates/manager-core/src/triggers_api.rs b/crates/manager-core/src/triggers_api.rs index aac77e2..f580316 100644 --- a/crates/manager-core/src/triggers_api.rs +++ b/crates/manager-core/src/triggers_api.rs @@ -17,7 +17,8 @@ use axum::response::{IntoResponse, Json, Response}; use axum::routing::{delete, get, post}; use axum::{Extension, Router}; use picloud_shared::{ - AppId, DocsEventOp, FilesEventOp, KvEventOp, Principal, ScriptId, ScriptKind, TriggerId, + AppId, DocsEventOp, FilesEventOp, KvEventOp, MasterKey, Principal, ScriptId, ScriptKind, + TriggerId, }; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -25,11 +26,12 @@ use serde_json::json; use crate::app_repo::AppRepository; use crate::authz::{require, AuthzDenied, AuthzError, AuthzRepo, Capability}; use crate::repo::{ScriptRepository, ScriptRepositoryError}; +use crate::secrets_service::seal; use crate::trigger_config::{BackoffShape, TriggerConfig}; use crate::trigger_repo::{ - CreateCronTrigger, CreateDeadLetterTrigger, CreateDocsTrigger, CreateFilesTrigger, - CreateKvTrigger, CreatePubsubTrigger, Trigger, TriggerDispatchMode, TriggerRepo, - TriggerRepoError, + CreateCronTrigger, CreateDeadLetterTrigger, CreateDocsTrigger, CreateEmailTrigger, + CreateFilesTrigger, CreateKvTrigger, CreatePubsubTrigger, Trigger, TriggerDispatchMode, + TriggerRepo, TriggerRepoError, }; #[derive(Clone)] @@ -46,6 +48,9 @@ pub struct TriggersState { /// retry settings. Kept on the state struct so tests can swap /// in a stricter / looser config without env tinkering. pub config: TriggerConfig, + /// v1.1.7: master key used to encrypt an email trigger's inbound HMAC + /// secret before it's stored. + pub master_key: MasterKey, } pub fn triggers_router(state: TriggersState) -> Router { @@ -66,6 +71,7 @@ pub fn triggers_router(state: TriggersState) -> Router { "/apps/{app_id}/triggers/dead_letter", post(create_dl_trigger), ) + .route("/apps/{app_id}/triggers/email", post(create_email_trigger)) .route( "/apps/{app_id}/triggers/{trigger_id}", delete(delete_trigger), @@ -467,6 +473,60 @@ async fn create_dl_trigger( Ok((StatusCode::CREATED, Json(created))) } +#[derive(Debug, Deserialize)] +struct CreateEmailTriggerRequest { + script_id: ScriptId, + /// Shared HMAC secret the provider signs inbound POSTs with. `null` + /// (or absent) means the trigger accepts unsigned POSTs. + #[serde(default)] + inbound_secret: Option, +} + +async fn create_email_trigger( + State(s): State, + Extension(principal): Extension, + Path(app_id): Path, + Json(input): Json, +) -> Result<(StatusCode, Json), TriggersApiError> { + ensure_app_exists(&*s.apps, app_id).await?; + require( + s.authz.as_ref(), + &principal, + Capability::AppManageTriggers(app_id), + ) + .await?; + validate_trigger_target(&*s.scripts, app_id, input.script_id).await?; + + // Encrypt the inbound HMAC secret at rest (user-approved deviation + // from the brief's plaintext column). An empty/whitespace secret is + // treated as "no secret" (unsigned trigger). + let (inbound_secret_encrypted, inbound_secret_nonce) = match input.inbound_secret { + Some(secret) if !secret.trim().is_empty() => { + // 64 KB cap is irrelevant for a signing secret, but `seal` + // takes one; reuse the secrets default. + let (ct, nonce) = seal( + &s.master_key, + &serde_json::Value::String(secret), + crate::secrets_service::DEFAULT_SECRET_MAX_VALUE_BYTES, + ) + .map_err(|e| { + TriggersApiError::Invalid(format!("could not seal inbound_secret: {e}")) + })?; + (Some(ct), Some(nonce.to_vec())) + } + _ => (None, None), + }; + + let req = CreateEmailTrigger { + script_id: input.script_id, + inbound_secret_encrypted, + inbound_secret_nonce, + registered_by_principal: principal.user_id, + }; + let created = s.triggers.create_email_trigger(app_id, req).await?; + Ok((StatusCode::CREATED, Json(created))) +} + async fn delete_trigger( State(s): State, Extension(principal): Extension, @@ -598,9 +658,9 @@ mod tests { use super::*; use crate::app_repo::{AppLookup, AppRepository}; use crate::trigger_repo::{ - CreateCronTrigger, CreateFilesTrigger, CreatePubsubTrigger, DeadLetterTriggerMatch, - DocsTriggerMatch, FilesTriggerMatch, KvTriggerMatch, Trigger, TriggerDetails, TriggerRepo, - TriggerRepoError, + CreateCronTrigger, CreateEmailTrigger, CreateFilesTrigger, CreatePubsubTrigger, + DeadLetterTriggerMatch, DocsTriggerMatch, EmailInboundTarget, FilesTriggerMatch, + KvTriggerMatch, Trigger, TriggerDetails, TriggerKind, TriggerRepo, TriggerRepoError, }; use async_trait::async_trait; use chrono::Utc; @@ -703,6 +763,50 @@ mod tests { self.inner.lock().await.insert(id, trigger.clone()); Ok(trigger) } + async fn create_email_trigger( + &self, + app_id: AppId, + req: CreateEmailTrigger, + ) -> Result { + let now = Utc::now(); + let id = TriggerId::new(); + let trigger = Trigger { + id, + app_id, + script_id: req.script_id, + kind: TriggerKind::Email, + enabled: true, + dispatch_mode: TriggerDispatchMode::Async, + retry_max_attempts: 3, + retry_backoff: BackoffShape::Exponential, + retry_base_ms: 1000, + registered_by_principal: req.registered_by_principal, + created_at: now, + updated_at: now, + details: TriggerDetails::Email { + has_inbound_secret: req.inbound_secret_encrypted.is_some(), + }, + }; + self.inner.lock().await.insert(id, trigger.clone()); + Ok(trigger) + } + async fn email_inbound_target( + &self, + trigger_id: TriggerId, + ) -> Result, TriggerRepoError> { + let g = self.inner.lock().await; + Ok(g.get(&trigger_id) + .filter(|t| t.kind == TriggerKind::Email) + .map(|t| EmailInboundTarget { + app_id: t.app_id, + script_id: t.script_id, + enabled: t.enabled, + dispatch_mode: t.dispatch_mode, + registered_by_principal: t.registered_by_principal, + inbound_secret_encrypted: None, + inbound_secret_nonce: None, + })) + } async fn create_cron_trigger( &self, app_id: AppId, @@ -1101,6 +1205,7 @@ mod tests { authz, scripts: InMemoryScriptRepo::empty(), config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), } } @@ -1118,6 +1223,7 @@ mod tests { authz, scripts: InMemoryScriptRepo::with_endpoint(app_id, script_id), config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), } } @@ -1390,6 +1496,7 @@ mod tests { authz: Arc::new(AlwaysAllowAuthzRepo), scripts: InMemoryScriptRepo::with_module(app_id, script_id), config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), }; let res = create_kv_trigger( State(state), @@ -1427,6 +1534,7 @@ mod tests { authz: Arc::new(AlwaysAllowAuthzRepo), scripts: InMemoryScriptRepo::with_module(app_id, script_id), config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), }; let res = create_docs_trigger( State(state), @@ -1461,6 +1569,7 @@ mod tests { authz: Arc::new(AlwaysAllowAuthzRepo), scripts: InMemoryScriptRepo::with_module(app_id, script_id), config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), }; let res = create_dl_trigger( State(state), @@ -1526,6 +1635,7 @@ mod tests { authz: Arc::new(AlwaysAllowAuthzRepo), scripts, config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), }; let res = create_kv_trigger( State(state), @@ -1656,6 +1766,7 @@ mod tests { authz: Arc::new(AlwaysAllowAuthzRepo), scripts: InMemoryScriptRepo::with_module(app_id, script_id), config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), }; let res = create_cron_trigger( State(state), @@ -1685,6 +1796,7 @@ mod tests { authz: Arc::new(AlwaysAllowAuthzRepo), scripts: InMemoryScriptRepo::with_endpoint(app_b, script_id), config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), }; let res = create_cron_trigger( State(state), @@ -1813,6 +1925,7 @@ mod tests { authz: Arc::new(AlwaysAllowAuthzRepo), scripts: InMemoryScriptRepo::with_module(app_id, script_id), config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), }; let res = create_files_trigger( State(state), @@ -1839,6 +1952,7 @@ mod tests { authz: Arc::new(AlwaysAllowAuthzRepo), scripts: InMemoryScriptRepo::with_endpoint(app_b, script_id), config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), }; let res = create_files_trigger( State(state), @@ -1936,6 +2050,7 @@ mod tests { authz: Arc::new(AlwaysAllowAuthzRepo), scripts: InMemoryScriptRepo::with_module(app_id, script_id), config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), }; let res = create_pubsub_trigger( State(state), @@ -1962,6 +2077,7 @@ mod tests { authz: Arc::new(AlwaysAllowAuthzRepo), scripts: InMemoryScriptRepo::with_endpoint(app_b, script_id), config: TriggerConfig::conservative(), + master_key: picloud_shared::MasterKey::from_bytes([0u8; 32]), }; let res = create_pubsub_trigger( State(state), diff --git a/crates/picloud/Cargo.toml b/crates/picloud/Cargo.toml index c29598c..a9297ca 100644 --- a/crates/picloud/Cargo.toml +++ b/crates/picloud/Cargo.toml @@ -41,3 +41,7 @@ serde.workspace = true serde_json.workspace = true uuid.workspace = true chrono.workspace = true +# Compute inbound-email HMAC signatures in the e2e receiver tests. +hmac.workspace = true +sha2.workspace = true +hex.workspace = true diff --git a/crates/picloud/src/lib.rs b/crates/picloud/src/lib.rs index 541439f..bb717db 100644 --- a/crates/picloud/src/lib.rs +++ b/crates/picloud/src/lib.rs @@ -12,22 +12,23 @@ use picloud_executor_core::{Engine, Limits}; use picloud_manager_core::{ admin_router, admins_router, api_keys_router, app_members_router, apps_api, apps_router, attach_principal_if_present, auth_router, compile_routes, dead_letters_router, - files_admin_router, migrations, require_authenticated, route_admin_router, secrets_router, - topics_router, triggers_router, AbandonedRepo, AdminPrincipalResolver, AdminSessionRepository, - AdminState, AdminUserRepository, AdminsState, ApiKeyRepository, ApiKeysState, - AppDomainRepository, AppMembersRepository, AppMembersState, AppRepository, AppsState, - AuthState, AuthzRepo, DeadLetterRepo, DeadLettersState, Dispatcher, DocsServiceImpl, - EmailServiceImpl, FilesAdminState, FilesConfig, FilesServiceImpl, FsFilesRepo, HttpConfig, - HttpServiceImpl, KvServiceImpl, OutboxEventEmitter, OutboxRepo, PostgresAbandonedRepo, - PostgresAdminSessionRepository, PostgresAdminUserRepository, PostgresApiKeyRepository, - PostgresAppDomainRepository, PostgresAppMembersRepository, PostgresAppRepository, - PostgresAppSecretsRepo, PostgresDeadLetterRepo, PostgresDeadLetterService, PostgresDocsRepo, - PostgresExecutionLogRepository, PostgresExecutionLogSink, PostgresKvRepo, PostgresOutboxRepo, - PostgresPubsubRepo, PostgresRouteRepository, PostgresScriptRepository, PostgresSecretsRepo, - PostgresTopicRepo, PostgresTriggerRepo, PrincipalResolver, PubsubServiceImpl, - RealtimeAuthorityImpl, RepoResolver, RouteAdminState, RouteRepository, SandboxCeiling, - ScriptRepository, SecretsConfig, SecretsServiceImpl, SecretsState, SubscriberTokenConfig, - TopicRepo, TopicsState, TriggerConfig, TriggerRepo, TriggersState, + email_inbound_router, files_admin_router, migrations, require_authenticated, + route_admin_router, secrets_router, topics_router, triggers_router, AbandonedRepo, + AdminPrincipalResolver, AdminSessionRepository, AdminState, AdminUserRepository, AdminsState, + ApiKeyRepository, ApiKeysState, AppDomainRepository, AppMembersRepository, AppMembersState, + AppRepository, AppsState, AuthState, AuthzRepo, DeadLetterRepo, DeadLettersState, Dispatcher, + DocsServiceImpl, EmailInboundState, EmailServiceImpl, FilesAdminState, FilesConfig, + FilesServiceImpl, FsFilesRepo, HttpConfig, HttpServiceImpl, KvServiceImpl, OutboxEventEmitter, + OutboxRepo, PostgresAbandonedRepo, PostgresAdminSessionRepository, PostgresAdminUserRepository, + PostgresApiKeyRepository, PostgresAppDomainRepository, PostgresAppMembersRepository, + PostgresAppRepository, PostgresAppSecretsRepo, PostgresDeadLetterRepo, + PostgresDeadLetterService, PostgresDocsRepo, PostgresExecutionLogRepository, + PostgresExecutionLogSink, PostgresKvRepo, PostgresOutboxRepo, PostgresPubsubRepo, + PostgresRouteRepository, PostgresScriptRepository, PostgresSecretsRepo, PostgresTopicRepo, + PostgresTriggerRepo, PrincipalResolver, PubsubServiceImpl, RealtimeAuthorityImpl, RepoResolver, + RouteAdminState, RouteRepository, SandboxCeiling, ScriptRepository, SecretsConfig, + SecretsServiceImpl, SecretsState, SubscriberTokenConfig, TopicRepo, TopicsState, TriggerConfig, + TriggerRepo, TriggersState, }; use picloud_orchestrator_core::realtime::DEFAULT_GC_INTERVAL_SECS; use picloud_orchestrator_core::routing::{AppDomainTable, RouteTable}; @@ -341,11 +342,19 @@ pub async fn build_app( spawn_realtime_gc(broadcaster_concrete, DEFAULT_GC_INTERVAL_SECS); picloud_manager_core::spawn_files_orphan_sweep(files_root); let triggers_state = TriggersState { - triggers: trigger_repo, + triggers: trigger_repo.clone(), apps: apps_repo.clone(), authz: authz.clone(), scripts: Arc::new(PostgresScriptRepoHandle(script_repo.clone())), config: trigger_config, + master_key: master_key.clone(), + }; + // v1.1.7 public inbound-email receiver. Outside the admin auth layer + // (the URL + per-trigger HMAC secret are the security boundary). + let email_inbound_state = EmailInboundState { + triggers: trigger_repo, + outbox: outbox_repo.clone(), + master_key: master_key.clone(), }; let dead_letters_state = DeadLettersState { repo: dl_repo, @@ -444,6 +453,7 @@ pub async fn build_app( let api_v1 = Router::new() .nest("/admin", auth_router(auth_state)) .nest("/admin", guarded_admin) + .merge(email_inbound_router(email_inbound_state)) .merge(data_plane_routed); // v1.1.6 SSE realtime surface, merged at the root (deliberately NOT diff --git a/crates/picloud/tests/email_inbound.rs b/crates/picloud/tests/email_inbound.rs new file mode 100644 index 0000000..5135c6e --- /dev/null +++ b/crates/picloud/tests/email_inbound.rs @@ -0,0 +1,298 @@ +//! End-to-end tests for the inbound-email webhook receiver (v1.1.7). +//! +//! Gated on `DATABASE_URL` like `dispatcher_e2e.rs`: when unset the test +//! prints a notice and returns early so plain `cargo test` stays green. +//! +//! Covers the receiver's status-code matrix (202 / 401 / 404 / 422), +//! cross-app path isolation, HMAC verification (signed + unsigned +//! triggers), the dispatcher routing the `email` outbox row, and the +//! handler actually firing with `ctx.event.email` populated. The +//! "handler fired" observation uses the same KV-marker pattern as +//! `dispatcher_e2e.rs`. + +#![allow(clippy::needless_pass_by_value)] + +use std::time::Duration; + +use axum_test::TestServer; +use hmac::{Hmac, Mac}; +use serde_json::{json, Value}; +use sha2::Sha256; +use sqlx::postgres::PgPoolOptions; +use sqlx::PgPool; +use uuid::Uuid; + +/// Fixed master key so the receiver decrypts the inbound_secret the +/// admin endpoint encrypted (same key feeds build_app + the admin path). +fn master_key() -> picloud_shared::MasterKey { + picloud_shared::MasterKey::from_bytes([0x42u8; 32]) +} + +async fn pool_or_skip() -> Option { + let Ok(url) = std::env::var("DATABASE_URL") else { + eprintln!("email_inbound: DATABASE_URL unset — skipping"); + return None; + }; + let pool = PgPoolOptions::new() + .max_connections(5) + .connect(&url) + .await + .expect("connect to DATABASE_URL"); + sqlx::migrate!("../manager-core/migrations") + .run(&pool) + .await + .expect("apply migrations"); + Some(pool) +} + +async fn server_for(pool: PgPool, suffix: &str) -> (TestServer, String) { + use picloud_manager_core::auth::hash_password; + use picloud_shared::InstanceRole; + + let unique = format!("{suffix}-{}", Uuid::new_v4().simple()); + let auth = picloud::AuthDeps::from_pool(pool.clone()); + let username = format!("eml-{unique}"); + let hash = hash_password("pw").expect("hash"); + auth.users + .create(&username, &hash, InstanceRole::Owner, None) + .await + .expect("seed admin"); + + let app = picloud::build_app(pool, auth, master_key()) + .await + .expect("build_app"); + let mut server = TestServer::new(app).expect("TestServer"); + let resp = server + .post("/api/v1/admin/auth/login") + .json(&json!({ "username": username, "password": "pw" })) + .await; + resp.assert_status_ok(); + let token = resp.json::()["token"] + .as_str() + .expect("login token") + .to_string(); + server.add_header("authorization", format!("Bearer {token}")); + + let slug = format!("eml-{unique}"); + let created: Value = server + .post("/api/v1/admin/apps") + .json(&json!({ "slug": slug, "name": slug })) + .await + .json(); + let app_id = created["id"].as_str().expect("app id").to_string(); + (server, app_id) +} + +async fn create_script(server: &TestServer, app_id: &str, name: &str, source: &str) -> String { + let created: Value = server + .post("/api/v1/admin/scripts") + .json(&json!({ "app_id": app_id, "name": name, "source": source })) + .await + .json(); + created["id"].as_str().expect("script id").to_string() +} + +const MARKER_HANDLER: &str = r#" + let e = ctx.event; + kv::collection("e2e_markers").set("marker", e); + #{ ok: true } +"#; + +async fn create_email_trigger( + server: &TestServer, + app_id: &str, + script_id: &str, + secret: Option<&str>, +) -> String { + let created: Value = server + .post(&format!("/api/v1/admin/apps/{app_id}/triggers/email")) + .json(&json!({ "script_id": script_id, "inbound_secret": secret })) + .await + .json(); + created["id"].as_str().expect("trigger id").to_string() +} + +fn sign(secret: &str, body: &str) -> String { + let mut mac = Hmac::::new_from_slice(secret.as_bytes()).expect("hmac key"); + mac.update(body.as_bytes()); + hex::encode(mac.finalize().into_bytes()) +} + +async fn poll_marker(pool: &PgPool, app_id: &str) -> Option { + let app_uuid = Uuid::parse_str(app_id).expect("app uuid"); + for _ in 0..100 { + let row: Option<(Value,)> = sqlx::query_as( + "SELECT value FROM kv_entries \ + WHERE app_id = $1 AND collection = 'e2e_markers' AND key = 'marker'", + ) + .bind(app_uuid) + .fetch_optional(pool) + .await + .expect("query marker"); + if let Some((value,)) = row { + return Some(value); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + None +} + +const BODY: &str = r#"{"from":"sender@external.com","to":["alice@myapp.com"],"cc":["bob@myapp.com"],"subject":"Re: question","text":"hello there","message_id":""}"#; + +#[tokio::test] +async fn signed_post_accepts_and_fires_handler() { + let Some(pool) = pool_or_skip().await else { + return; + }; + let (server, app_id) = server_for(pool.clone(), "signed").await; + let handler = create_script(&server, &app_id, "eml-handler", MARKER_HANDLER).await; + let trigger = create_email_trigger(&server, &app_id, &handler, Some("topsecret")).await; + + let sig = sign("topsecret", BODY); + server + .post(&format!("/api/v1/email-inbound/{app_id}/{trigger}")) + .add_header("x-picloud-signature", sig) + .text(BODY) + .await + .assert_status(axum::http::StatusCode::ACCEPTED); + + // Outbox row landed with source_kind = 'email'. + let app_uuid = Uuid::parse_str(&app_id).unwrap(); + // The dispatcher deletes the row after delivery; instead assert the + // handler fired (which proves the email row was dispatched). + let event = poll_marker(&pool, &app_id).await.expect("handler fired"); + assert_eq!(event["source"], "email"); + assert_eq!(event["op"], "receive"); + assert_eq!(event["email"]["from"], "sender@external.com"); + assert_eq!(event["email"]["to"][0], "alice@myapp.com"); + assert_eq!(event["email"]["cc"][0], "bob@myapp.com"); + assert_eq!(event["email"]["subject"], "Re: question"); + assert_eq!(event["email"]["text"], "hello there"); + assert_eq!(event["email"]["message_id"], ""); + let _ = app_uuid; +} + +#[tokio::test] +async fn missing_signature_is_401_when_secret_configured() { + let Some(pool) = pool_or_skip().await else { + return; + }; + let (server, app_id) = server_for(pool, "nosig").await; + let handler = create_script(&server, &app_id, "h", MARKER_HANDLER).await; + let trigger = create_email_trigger(&server, &app_id, &handler, Some("topsecret")).await; + + server + .post(&format!("/api/v1/email-inbound/{app_id}/{trigger}")) + .text(BODY) + .await + .assert_status(axum::http::StatusCode::UNAUTHORIZED); +} + +#[tokio::test] +async fn wrong_signature_is_401() { + let Some(pool) = pool_or_skip().await else { + return; + }; + let (server, app_id) = server_for(pool, "wrongsig").await; + let handler = create_script(&server, &app_id, "h", MARKER_HANDLER).await; + let trigger = create_email_trigger(&server, &app_id, &handler, Some("topsecret")).await; + + server + .post(&format!("/api/v1/email-inbound/{app_id}/{trigger}")) + .add_header("x-picloud-signature", sign("WRONG", BODY)) + .text(BODY) + .await + .assert_status(axum::http::StatusCode::UNAUTHORIZED); +} + +#[tokio::test] +async fn unsigned_trigger_accepts_without_signature() { + let Some(pool) = pool_or_skip().await else { + return; + }; + let (server, app_id) = server_for(pool, "unsigned").await; + let handler = create_script(&server, &app_id, "h", MARKER_HANDLER).await; + let trigger = create_email_trigger(&server, &app_id, &handler, None).await; + + server + .post(&format!("/api/v1/email-inbound/{app_id}/{trigger}")) + .text(BODY) + .await + .assert_status(axum::http::StatusCode::ACCEPTED); +} + +#[tokio::test] +async fn unknown_trigger_is_404() { + let Some(pool) = pool_or_skip().await else { + return; + }; + let (server, app_id) = server_for(pool, "missing").await; + let missing = Uuid::new_v4(); + server + .post(&format!("/api/v1/email-inbound/{app_id}/{missing}")) + .text(BODY) + .await + .assert_status(axum::http::StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn wrong_kind_trigger_is_404() { + let Some(pool) = pool_or_skip().await else { + return; + }; + let (server, app_id) = server_for(pool, "wrongkind").await; + let handler = create_script(&server, &app_id, "h", MARKER_HANDLER).await; + // A KV trigger — not an email trigger. + let kv_trigger: Value = server + .post(&format!("/api/v1/admin/apps/{app_id}/triggers/kv")) + .json(&json!({ "script_id": handler, "collection_glob": "*" })) + .await + .json(); + let kv_id = kv_trigger["id"].as_str().unwrap(); + server + .post(&format!("/api/v1/email-inbound/{app_id}/{kv_id}")) + .text(BODY) + .await + .assert_status(axum::http::StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn malformed_body_is_422() { + let Some(pool) = pool_or_skip().await else { + return; + }; + let (server, app_id) = server_for(pool, "malformed").await; + let handler = create_script(&server, &app_id, "h", MARKER_HANDLER).await; + // Unsigned so we reach the parse step. + let trigger = create_email_trigger(&server, &app_id, &handler, None).await; + server + .post(&format!("/api/v1/email-inbound/{app_id}/{trigger}")) + .text("not json at all") + .await + .assert_status(axum::http::StatusCode::UNPROCESSABLE_ENTITY); +} + +#[tokio::test] +async fn cross_app_path_is_404() { + let Some(pool) = pool_or_skip().await else { + return; + }; + // Two apps under the same server. A trigger created in app B must + // not be reachable via app A's path segment. + let (server, app_a) = server_for(pool.clone(), "xa").await; + let app_b: Value = server + .post("/api/v1/admin/apps") + .json(&json!({ "slug": format!("xb-{}", Uuid::new_v4().simple()), "name": "xb" })) + .await + .json(); + let app_b_id = app_b["id"].as_str().unwrap().to_string(); + let handler_b = create_script(&server, &app_b_id, "hb", MARKER_HANDLER).await; + let trigger_b = create_email_trigger(&server, &app_b_id, &handler_b, None).await; + + // POST to app A's path with app B's trigger id → 404 (path-bound). + server + .post(&format!("/api/v1/email-inbound/{app_a}/{trigger_b}")) + .text(BODY) + .await + .assert_status(axum::http::StatusCode::NOT_FOUND); +} diff --git a/crates/shared/src/trigger_event.rs b/crates/shared/src/trigger_event.rs index ac79813..4f0d24e 100644 --- a/crates/shared/src/trigger_event.rs +++ b/crates/shared/src/trigger_event.rs @@ -186,6 +186,27 @@ pub enum TriggerEvent { published_at: DateTime, }, + /// An inbound email (POSTed to the webhook receiver by a configured + /// provider) fired this handler. v1.1.7. Carries the normalized + /// message; `text`/`html` are absent when the provider sent only the + /// other. Surfaced to scripts as `ctx.event.email`. Attachments are + /// deferred to v1.2. + Email { + from: String, + to: Vec, + #[serde(default)] + cc: Vec, + subject: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + text: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + html: Option, + received_at: DateTime, + /// RFC 5322 Message-ID, when the provider supplied one. + #[serde(default, skip_serializing_if = "Option::is_none")] + message_id: Option, + }, + /// A dead-letter row fired this handler. The original event is /// nested verbatim plus the dead-letter metadata the design notes /// §4 require. @@ -213,6 +234,7 @@ impl TriggerEvent { Self::Cron { .. } => "cron", Self::Files { .. } => "files", Self::Pubsub { .. } => "pubsub", + Self::Email { .. } => "email", Self::DeadLetter { .. } => "dead_letter", } } diff --git a/dashboard/src/lib/api.ts b/dashboard/src/lib/api.ts index 2897a93..7e0a8cd 100644 --- a/dashboard/src/lib/api.ts +++ b/dashboard/src/lib/api.ts @@ -211,7 +211,14 @@ export interface DeadLetterRow { resolution: 'replayed' | 'ignored' | 'handled_by_script' | 'handler_failed' | null; } -export type TriggerKind = 'kv' | 'docs' | 'dead_letter' | 'cron' | 'files' | 'pubsub'; +export type TriggerKind = + | 'kv' + | 'docs' + | 'dead_letter' + | 'cron' + | 'files' + | 'pubsub' + | 'email'; export type TriggerDispatchMode = 'sync' | 'async'; /// Per-kind detail, tagged by `kind` to match the Rust serde shape. @@ -221,7 +228,15 @@ export type TriggerDetails = | { kind: 'dead_letter'; source_filter?: string; trigger_id_filter?: string; script_id_filter?: string } | { kind: 'cron'; schedule: string; timezone: string; last_fired_at?: string | null } | { kind: 'files'; collection_glob: string; ops: string[] } - | { kind: 'pubsub'; topic_pattern: string }; + | { kind: 'pubsub'; topic_pattern: string } + | { kind: 'email'; has_inbound_secret: boolean }; + +export interface CreateEmailTriggerInput { + script_id: string; + /// Shared HMAC secret; null/omitted means the receiver accepts + /// unsigned POSTs (URL secrecy is then the only guard). + inbound_secret?: string | null; +} /// v1.1.5 file metadata as the admin files endpoint returns it. export interface FileMeta { @@ -673,6 +688,11 @@ export const api = { `/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/pubsub`, { method: 'POST', body: JSON.stringify(input) } ), + createEmail: (idOrSlug: string, input: CreateEmailTriggerInput) => + adminRequest( + `/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/email`, + { method: 'POST', body: JSON.stringify(input) } + ), remove: (idOrSlug: string, triggerId: string) => adminRequest( `/api/v1/admin/apps/${encodeURIComponent(idOrSlug)}/triggers/${triggerId}`, diff --git a/dashboard/src/routes/apps/[slug]/+page.svelte b/dashboard/src/routes/apps/[slug]/+page.svelte index b237473..8b49491 100644 --- a/dashboard/src/routes/apps/[slug]/+page.svelte +++ b/dashboard/src/routes/apps/[slug]/+page.svelte @@ -126,6 +126,11 @@ let createPubsubTopic = $state(''); let creatingPubsub = $state(false); let createPubsubError = $state(null); + // Email triggers (v1.1.7). + let createEmailScriptId = $state(''); + let createEmailSecret = $state(''); + let creatingEmail = $state(false); + let createEmailError = $state(null); let triggerToRemove = $state(null); let removingTrigger = $state(false); // Endpoint scripts only — modules can't be trigger targets. @@ -182,6 +187,34 @@ } } + async function submitCreateEmail(e: SubmitEvent) { + e.preventDefault(); + if (!app) return; + creatingEmail = true; + createEmailError = null; + try { + await api.triggers.createEmail(app.id, { + script_id: createEmailScriptId, + inbound_secret: createEmailSecret.trim() === '' ? null : createEmailSecret + }); + createEmailScriptId = ''; + createEmailSecret = ''; + await loadTriggers(app.id); + } catch (err) { + createEmailError = + err instanceof ApiError ? err.message : err instanceof Error ? err.message : String(err); + } finally { + creatingEmail = false; + } + } + + // The inbound-email webhook URL for a given email trigger (shown so + // the operator can configure their provider). + function emailInboundUrl(triggerId: string): string { + if (!app) return ''; + return `${window.location.origin}/api/v1/email-inbound/${app.id}/${triggerId}`; + } + async function confirmRemoveTrigger() { if (!app || !triggerToRemove) return; removingTrigger = true; @@ -1099,6 +1132,59 @@ + +

Email triggers

+

+ Fire an endpoint script when your email provider POSTs an inbound + message to PiCloud. Configure your provider (Mailgun / Postmark / + SendGrid / SES) to POST the generic JSON shape below to the trigger's + webhook URL. Set a shared secret to require an + X-Picloud-Signature HMAC-SHA256 (hex of the request body); + leave it blank to accept unsigned POSTs (URL secrecy only). +

+
+ Expected inbound JSON shape +
{`{
+  "from": "sender@external.com",
+  "to": ["alice@myapp.com"],
+  "cc": [],
+  "subject": "...",
+  "text": "...",
+  "html": "...",
+  "message_id": ""
+}`}
+
+ +
+
+ + +
+ {#if createEmailError} +
{createEmailError}
+ {/if} +
+ +
+
+ {#if triggers.length === 0}

No triggers in this app yet.

{:else} @@ -1118,6 +1204,11 @@ — {t.details.ops.join(', ') || 'any op'} {:else if t.details.kind === 'pubsub'} {t.details.topic_pattern} + {:else if t.details.kind === 'email'} + + {t.details.has_inbound_secret ? 'signed (HMAC)' : 'unsigned'} + + {emailInboundUrl(t.id)} {/if} → {t.script_id}