Files
PiCloud/crates/orchestrator-core/src/inbox.rs
MechaCat02 77b2cb58bb feat(v1.1.1-routes): outbox-routed sync HTTP + dispatch_mode=async
Routes gain `dispatch_mode TEXT NOT NULL DEFAULT 'sync'` (migration
0012). Existing routes default to sync so the migration is
non-breaking. `DispatchMode` enum lands in `picloud-shared`.

The user-routes orchestrator handler now branches:
- `dispatch_mode = async` → write outbox row with `reply_to = None`,
  return `202 Accepted` + `{accepted_at, execution_id}`. Dispatcher
  fires the script in the background; retries / dead-letters via
  the framework from commit 5.
- `dispatch_mode = sync` → register an inbox channel
  (`tokio::sync::oneshot`), write outbox row with `reply_to =
  inbox_id`, `.await` on the receiver with a timeout =
  script.timeout_seconds + 2s buffer. Dispatcher hands the result
  back; orchestrator maps `InboxResult` into the HTTP response per
  the design-notes §3 status-code table (422/502/503/504/507/500).

`InboxRegistry` (orchestrator-core/src/inbox.rs) is the in-process
implementation of `InboxResolver`. Lock-free HashMap of pending
oneshot senders keyed by `inbox_id`. Tests cover register/deliver
round-trip, unknown-id is abandoned, dropped-receiver is abandoned,
explicit cancel. Cluster mode (v1.3+) swaps this for
LISTEN/NOTIFY-keyed lookup behind the same trait.

`OutboxWriter` trait lives in `picloud-shared` so orchestrator-core
can write to the outbox without depending on manager-core (which
would invert the dependency arrow). `PostgresOutboxRepo` implements
both `OutboxRepo` (dispatcher surface) and `OutboxWriter`
(orchestrator surface); the picloud binary clones the same concrete
Arc into both trait views.

The dispatcher's HTTP arm (commit 5 had a stub) now decodes the
`HttpDispatchPayload` off the outbox row, looks up the script,
synthesizes an `ExecRequest`, and runs it through the executor.
Outcome routing reuses the same path as KV triggers — sync HTTP
flows through the inbox, async dispatch gets dropped after
success (or DL'd on exhaustion).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-01 22:12:55 +02:00

140 lines
4.5 KiB
Rust

//! In-process `InboxRegistry` — the NATS-style request/reply
//! implementation for sync HTTP via the trigger outbox (design notes
//! §3).
//!
//! Workflow:
//! 1. Orchestrator allocates an `inbox_id`, calls
//! `registry.register()` to get a oneshot receiver.
//! 2. Orchestrator writes an outbox row with `reply_to = inbox_id`.
//! 3. Dispatcher picks the row, runs the script, calls
//! `registry.deliver(inbox_id, result)`.
//! 4. Orchestrator's `.await` on the receiver fires; it maps the
//! `InboxResult` back into an HTTP response.
//!
//! `Delivered` means the receiver was alive when delivery hit. If the
//! orchestrator timed out and dropped the receiver before delivery,
//! `Abandoned` comes back — the dispatcher writes an
//! `abandoned_executions` row (design notes §3 #9).
//!
//! Cluster mode (v1.3+) swaps this for a Postgres `LISTEN/NOTIFY`-
//! based resolver; the `InboxResolver` trait stays the same.
use std::collections::HashMap;
use std::sync::Mutex;
use async_trait::async_trait;
use picloud_shared::{InboxDeliveryOutcome, InboxResolver, InboxResult};
use tokio::sync::oneshot;
use uuid::Uuid;
pub struct InboxRegistry {
inner: Mutex<HashMap<Uuid, oneshot::Sender<InboxResult>>>,
}
impl InboxRegistry {
#[must_use]
pub fn new() -> Self {
Self {
inner: Mutex::new(HashMap::new()),
}
}
/// Allocate a new inbox id and register the sender side. The
/// caller awaits the returned `Receiver`; the dispatcher delivers
/// the outcome via `deliver(id, …)`.
#[must_use]
pub fn register(&self) -> (Uuid, oneshot::Receiver<InboxResult>) {
let id = Uuid::new_v4();
let (tx, rx) = oneshot::channel();
if let Ok(mut g) = self.inner.lock() {
g.insert(id, tx);
}
(id, rx)
}
/// Cancel a pending inbox (orchestrator timed out and gave up).
/// Drops the sender so any future `deliver` returns `Abandoned`.
/// Returns `true` if the receiver was still registered.
pub fn cancel(&self, id: Uuid) -> bool {
self.inner
.lock()
.map(|mut g| g.remove(&id).is_some())
.unwrap_or(false)
}
}
impl Default for InboxRegistry {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl InboxResolver for InboxRegistry {
async fn deliver(&self, inbox_id: Uuid, result: InboxResult) -> InboxDeliveryOutcome {
let Ok(mut g) = self.inner.lock() else {
return InboxDeliveryOutcome::Abandoned;
};
let Some(tx) = g.remove(&inbox_id) else {
return InboxDeliveryOutcome::Abandoned;
};
// `send` returns Err iff the receiver was dropped — exactly
// the abandoned-execution case.
if tx.send(result).is_err() {
InboxDeliveryOutcome::Abandoned
} else {
InboxDeliveryOutcome::Delivered
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use picloud_shared::ExecResponseSummary;
use std::collections::BTreeMap;
fn ok_result() -> InboxResult {
InboxResult::Success(ExecResponseSummary {
status_code: 200,
headers: BTreeMap::new(),
body: serde_json::json!({ "ok": true }),
})
}
#[tokio::test]
async fn register_then_deliver_resolves_receiver() {
let reg = InboxRegistry::new();
let (id, rx) = reg.register();
let outcome = reg.deliver(id, ok_result()).await;
assert_eq!(outcome, InboxDeliveryOutcome::Delivered);
let received = rx.await.expect("receiver should fire");
assert!(matches!(received, InboxResult::Success(_)));
}
#[tokio::test]
async fn deliver_to_unknown_id_is_abandoned() {
let reg = InboxRegistry::new();
let outcome = reg.deliver(Uuid::new_v4(), ok_result()).await;
assert_eq!(outcome, InboxDeliveryOutcome::Abandoned);
}
#[tokio::test]
async fn dropping_receiver_then_delivering_is_abandoned() {
let reg = InboxRegistry::new();
let (id, rx) = reg.register();
drop(rx);
let outcome = reg.deliver(id, ok_result()).await;
assert_eq!(outcome, InboxDeliveryOutcome::Abandoned);
}
#[tokio::test]
async fn cancel_removes_sender() {
let reg = InboxRegistry::new();
let (id, _rx) = reg.register();
assert!(reg.cancel(id));
let outcome = reg.deliver(id, ok_result()).await;
assert_eq!(outcome, InboxDeliveryOutcome::Abandoned);
}
}