Files
PiCloud/crates/manager-core/src/route_repo.rs
MechaCat02 77b2cb58bb feat(v1.1.1-routes): outbox-routed sync HTTP + dispatch_mode=async
Routes gain `dispatch_mode TEXT NOT NULL DEFAULT 'sync'` (migration
0012). Existing routes default to sync so the migration is
non-breaking. `DispatchMode` enum lands in `picloud-shared`.

The user-routes orchestrator handler now branches:
- `dispatch_mode = async` → write outbox row with `reply_to = None`,
  return `202 Accepted` + `{accepted_at, execution_id}`. Dispatcher
  fires the script in the background; retries / dead-letters via
  the framework from commit 5.
- `dispatch_mode = sync` → register an inbox channel
  (`tokio::sync::oneshot`), write outbox row with `reply_to =
  inbox_id`, `.await` on the receiver with a timeout =
  script.timeout_seconds + 2s buffer. Dispatcher hands the result
  back; orchestrator maps `InboxResult` into the HTTP response per
  the design-notes §3 status-code table (422/502/503/504/507/500).

`InboxRegistry` (orchestrator-core/src/inbox.rs) is the in-process
implementation of `InboxResolver`. Lock-free HashMap of pending
oneshot senders keyed by `inbox_id`. Tests cover register/deliver
round-trip, unknown-id is abandoned, dropped-receiver is abandoned,
explicit cancel. Cluster mode (v1.3+) swaps this for
LISTEN/NOTIFY-keyed lookup behind the same trait.

`OutboxWriter` trait lives in `picloud-shared` so orchestrator-core
can write to the outbox without depending on manager-core (which
would invert the dependency arrow). `PostgresOutboxRepo` implements
both `OutboxRepo` (dispatcher surface) and `OutboxWriter`
(orchestrator surface); the picloud binary clones the same concrete
Arc into both trait views.

The dispatcher's HTTP arm (commit 5 had a stub) now decodes the
`HttpDispatchPayload` off the outbox row, looks up the script,
synthesizes an `ExecRequest`, and runs it through the executor.
Outcome routing reuses the same path as KV triggers — sync HTTP
flows through the inbox, async dispatch gets dropped after
success (or DL'd on exhaustion).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-01 22:12:55 +02:00

232 lines
7.6 KiB
Rust

//! CRUD over the `routes` table.
//!
//! The orchestrator's `AppRouteTables` is repopulated from this repo
//! after every write — see the route_admin module for the binding.
use async_trait::async_trait;
use picloud_shared::{AppId, DispatchMode, HostKind, PathKind, Route, ScriptId};
use sqlx::PgPool;
use uuid::Uuid;
use crate::repo::ScriptRepositoryError;
#[derive(Debug, Clone)]
pub struct NewRoute {
pub app_id: AppId,
pub script_id: ScriptId,
pub host_kind: HostKind,
pub host: String,
pub host_param_name: Option<String>,
pub path_kind: PathKind,
pub path: String,
pub method: Option<String>,
pub dispatch_mode: DispatchMode,
}
#[async_trait]
pub trait RouteRepository: Send + Sync {
async fn list_all(&self) -> Result<Vec<Route>, ScriptRepositoryError>;
/// Single-row lookup. Used by `DELETE /api/v1/admin/routes/{id}` so
/// the capability check binds to the route's actual `app_id`
/// (not a path param).
async fn get(&self, route_id: Uuid) -> Result<Option<Route>, ScriptRepositoryError>;
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Route>, ScriptRepositoryError>;
async fn list_for_script(
&self,
script_id: ScriptId,
) -> Result<Vec<Route>, ScriptRepositoryError>;
async fn create(&self, input: NewRoute) -> Result<Route, ScriptRepositoryError>;
async fn delete(&self, route_id: Uuid) -> Result<(), ScriptRepositoryError>;
/// Count routes whose host_kind/host pair matches a pattern in
/// `app_id`. Used by the domain-claim delete guard.
async fn count_for_app_host(
&self,
app_id: AppId,
host_kind: HostKind,
host: &str,
) -> Result<i64, ScriptRepositoryError>;
}
pub struct PostgresRouteRepository {
pool: PgPool,
}
impl PostgresRouteRepository {
#[must_use]
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl RouteRepository for PostgresRouteRepository {
async fn list_all(&self) -> Result<Vec<Route>, ScriptRepositoryError> {
let rows = sqlx::query_as::<_, RouteRow>(
"SELECT id, app_id, script_id, host_kind, host, host_param_name, \
path_kind, path, method, dispatch_mode, created_at \
FROM routes ORDER BY created_at",
)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(Into::into).collect())
}
async fn get(&self, route_id: Uuid) -> Result<Option<Route>, ScriptRepositoryError> {
let row = sqlx::query_as::<_, RouteRow>(
"SELECT id, app_id, script_id, host_kind, host, host_param_name, \
path_kind, path, method, dispatch_mode, created_at \
FROM routes WHERE id = $1",
)
.bind(route_id)
.fetch_optional(&self.pool)
.await?;
Ok(row.map(Into::into))
}
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Route>, ScriptRepositoryError> {
let rows = sqlx::query_as::<_, RouteRow>(
"SELECT id, app_id, script_id, host_kind, host, host_param_name, \
path_kind, path, method, dispatch_mode, created_at \
FROM routes WHERE app_id = $1 ORDER BY created_at",
)
.bind(app_id.into_inner())
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(Into::into).collect())
}
async fn list_for_script(
&self,
script_id: ScriptId,
) -> Result<Vec<Route>, ScriptRepositoryError> {
let rows = sqlx::query_as::<_, RouteRow>(
"SELECT id, app_id, script_id, host_kind, host, host_param_name, \
path_kind, path, method, dispatch_mode, created_at \
FROM routes WHERE script_id = $1 ORDER BY created_at",
)
.bind(script_id.into_inner())
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(Into::into).collect())
}
async fn create(&self, input: NewRoute) -> Result<Route, ScriptRepositoryError> {
let res = sqlx::query_as::<_, RouteRow>(
"INSERT INTO routes ( \
app_id, script_id, host_kind, host, host_param_name, \
path_kind, path, method, dispatch_mode \
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \
RETURNING id, app_id, script_id, host_kind, host, host_param_name, \
path_kind, path, method, dispatch_mode, created_at",
)
.bind(input.app_id.into_inner())
.bind(input.script_id.into_inner())
.bind(host_kind_str(input.host_kind))
.bind(&input.host)
.bind(input.host_param_name.as_deref())
.bind(path_kind_str(input.path_kind))
.bind(&input.path)
.bind(input.method.as_deref())
.bind(input.dispatch_mode.as_str())
.fetch_one(&self.pool)
.await;
match res {
Ok(row) => Ok(row.into()),
Err(sqlx::Error::Database(e)) if e.is_unique_violation() => Err(
ScriptRepositoryError::Conflict("a route with this binding already exists".into()),
),
Err(sqlx::Error::Database(e)) if e.is_foreign_key_violation() => {
Err(ScriptRepositoryError::NotFound(input.script_id))
}
Err(e) => Err(e.into()),
}
}
async fn delete(&self, route_id: Uuid) -> Result<(), ScriptRepositoryError> {
let res = sqlx::query("DELETE FROM routes WHERE id = $1")
.bind(route_id)
.execute(&self.pool)
.await?;
if res.rows_affected() == 0 {
return Err(ScriptRepositoryError::NotFound(ScriptId::from(route_id)));
}
Ok(())
}
async fn count_for_app_host(
&self,
app_id: AppId,
host_kind: HostKind,
host: &str,
) -> Result<i64, ScriptRepositoryError> {
let count: (i64,) = sqlx::query_as(
"SELECT COUNT(*) FROM routes \
WHERE app_id = $1 AND host_kind = $2 AND host = $3",
)
.bind(app_id.into_inner())
.bind(host_kind_str(host_kind))
.bind(host)
.fetch_one(&self.pool)
.await?;
Ok(count.0)
}
}
const fn host_kind_str(k: HostKind) -> &'static str {
match k {
HostKind::Any => "any",
HostKind::Strict => "strict",
HostKind::Wildcard => "wildcard",
}
}
const fn path_kind_str(k: PathKind) -> &'static str {
match k {
PathKind::Exact => "exact",
PathKind::Prefix => "prefix",
PathKind::Param => "param",
}
}
#[derive(sqlx::FromRow)]
struct RouteRow {
id: Uuid,
app_id: Uuid,
script_id: Uuid,
host_kind: String,
host: String,
host_param_name: Option<String>,
path_kind: String,
path: String,
method: Option<String>,
dispatch_mode: String,
created_at: chrono::DateTime<chrono::Utc>,
}
impl From<RouteRow> for Route {
fn from(r: RouteRow) -> Self {
Self {
id: r.id,
app_id: r.app_id.into(),
script_id: r.script_id.into(),
host_kind: match r.host_kind.as_str() {
"strict" => HostKind::Strict,
"wildcard" => HostKind::Wildcard,
_ => HostKind::Any,
},
host: r.host,
host_param_name: r.host_param_name,
path_kind: match r.path_kind.as_str() {
"prefix" => PathKind::Prefix,
"param" => PathKind::Param,
_ => PathKind::Exact,
},
path: r.path,
method: r.method,
dispatch_mode: DispatchMode::from_wire(&r.dispatch_mode).unwrap_or(DispatchMode::Sync),
created_at: r.created_at,
}
}
}