feat(v1.1.4): outbound HTTP SDK + cron triggers
HTTP (`http::*`):
- `HttpService` trait (picloud-shared) + reqwest-backed `HttpServiceImpl`
(manager-core), wired into the `Services` bundle.
- SSRF deny-list applied to the resolved IP via a custom reqwest
`dns_resolver` (covers every redirect hop + defeats DNS rebinding) plus
a literal-IP check at URL-parse time. Scheme/port restrictions, request
+ response body caps (stream-with-cap), layered timeout. Error reason is
a CIDR category, never the IP. `PICLOUD_HTTP_ALLOW_PRIVATE` dev override
(logs a startup warning).
- Rhai bridge with three-arg split `verb(url, body, opts)` (resolves the
brief's body-vs-opts contradiction; unknown opt keys throw). Body
dispatch by type; response `#{status,headers,body,body_raw}` with JSON
auto-parse; non-2xx does not throw.
- `Capability::AppHttpRequest` → existing `script:write` scope (no new
Scope variant). `SdkCallCx` gains `script_id` (attribution + User-Agent).
Cron triggers (4th trigger kind):
- Migration 0017 widens the kind/source_kind CHECKs and adds
`cron_trigger_details`. `cron`/`chrono-tz` parse + validate 6-field
schedules and IANA timezones.
- `spawn_cron_scheduler` polls due triggers and enqueues to the universal
outbox; the dispatcher delivers them (one-line match-arm extension).
Catch-up fires exactly once per trigger per tick, not once per missed
window. `ctx.event.cron` for handlers.
- `POST /api/v1/admin/apps/{id}/triggers/cron` reuses the v1.1.3
cross-app + kind!=module target check.
- Dashboard: admin-gated Triggers tab (cron create form + list).
Follow-ups: redact module backend errors at the resolver boundary (log
original at error level); pin `rhai = "=1.24"`; CHANGELOG incl. retroactive
v1.1.3 cross-app-trigger security note. Version bumps: workspace 1.1.4,
SDK 1.5, dashboard 0.10.0.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -23,8 +23,11 @@ tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
uuid.workspace = true
|
||||
chrono.workspace = true
|
||||
chrono-tz.workspace = true
|
||||
cron.workspace = true
|
||||
sqlx.workspace = true
|
||||
url.workspace = true
|
||||
reqwest.workspace = true
|
||||
|
||||
argon2.workspace = true
|
||||
sha2.workspace = true
|
||||
|
||||
43
crates/manager-core/migrations/0017_cron_triggers.sql
Normal file
43
crates/manager-core/migrations/0017_cron_triggers.sql
Normal file
@@ -0,0 +1,43 @@
|
||||
-- v1.1.4: Extend the triggers framework to recognise `cron` as the
|
||||
-- fourth concrete kind (after `kv` v1.1.1, `dead_letter` v1.1.1, `docs`
|
||||
-- v1.1.2). Mirrors the 0014 docs extension: two CHECK constraints widen
|
||||
-- (strictly gaining `'cron'`), one new detail table.
|
||||
--
|
||||
-- Cron rows route through the SAME generic dispatcher path as kv/docs/
|
||||
-- dead_letter (single match-arm extension on the Rust side). The only
|
||||
-- new machinery is a scheduler task that enqueues due cron triggers
|
||||
-- into the outbox; dispatch itself is unchanged.
|
||||
|
||||
-- Extend triggers.kind to include 'cron'. No existing row carries a
|
||||
-- value outside the widened set, so the drop+add is safe.
|
||||
ALTER TABLE triggers DROP CONSTRAINT triggers_kind_check;
|
||||
ALTER TABLE triggers ADD CONSTRAINT triggers_kind_check
|
||||
CHECK (kind IN ('kv', 'dead_letter', 'docs', 'cron'));
|
||||
|
||||
-- Extend outbox.source_kind to include 'cron'. v1.1.x's existing
|
||||
-- source_kinds ('http', 'kv', 'dead_letter', 'docs') stay.
|
||||
ALTER TABLE outbox DROP CONSTRAINT outbox_source_kind_check;
|
||||
ALTER TABLE outbox ADD CONSTRAINT outbox_source_kind_check
|
||||
CHECK (source_kind IN ('http', 'kv', 'dead_letter', 'docs', 'cron'));
|
||||
|
||||
-- One row per cron trigger.
|
||||
-- schedule — 6-field cron expression (with seconds), validated
|
||||
-- at insert time by the `cron` crate.
|
||||
-- timezone — IANA tz name (e.g. "America/Los_Angeles"), validated
|
||||
-- via chrono-tz. Required so schedules like "every
|
||||
-- weekday at 9am" are unambiguous. Defaults to UTC.
|
||||
-- last_fired_at — set transactionally with each enqueue. NULL until
|
||||
-- the trigger first fires. The scheduler computes the
|
||||
-- next fire time in-process from
|
||||
-- (schedule, timezone, last_fired_at); there is no
|
||||
-- stored next_fire column (kept stateless on purpose).
|
||||
CREATE TABLE cron_trigger_details (
|
||||
trigger_id UUID PRIMARY KEY REFERENCES triggers(id) ON DELETE CASCADE,
|
||||
schedule TEXT NOT NULL,
|
||||
timezone TEXT NOT NULL DEFAULT 'UTC',
|
||||
last_fired_at TIMESTAMPTZ
|
||||
);
|
||||
|
||||
-- Hot lookup for the scheduler: "all enabled cron triggers due now"
|
||||
-- scans by last_fired_at.
|
||||
CREATE INDEX idx_cron_triggers_due ON cron_trigger_details (last_fired_at);
|
||||
@@ -72,6 +72,12 @@ pub enum Capability {
|
||||
/// shape as KV write — granted to `editor`+, maps to
|
||||
/// `script:write` on API keys.
|
||||
AppDocsWrite(AppId),
|
||||
/// Make an outbound HTTP request from a script in this app
|
||||
/// (v1.1.4). Maps to `script:write` on API keys: any outbound
|
||||
/// request can exfiltrate data — including read methods like GET —
|
||||
/// so the conservative write mapping is correct. Splitting
|
||||
/// read/write is a v1.2+ refinement. Granted to `editor`+.
|
||||
AppHttpRequest(AppId),
|
||||
/// Create / list / delete triggers for this app (v1.1.1). Maps to
|
||||
/// `app:admin` on API keys — triggers are app-configuration acts
|
||||
/// rather than data-plane access. Granted to `app_admin`+.
|
||||
@@ -101,6 +107,7 @@ impl Capability {
|
||||
| Self::AppKvWrite(id)
|
||||
| Self::AppDocsRead(id)
|
||||
| Self::AppDocsWrite(id)
|
||||
| Self::AppHttpRequest(id)
|
||||
| Self::AppManageTriggers(id)
|
||||
| Self::AppDeadLetterManage(id) => Some(id),
|
||||
}
|
||||
@@ -118,9 +125,10 @@ impl Capability {
|
||||
Scope::InstanceAdmin
|
||||
}
|
||||
Self::AppRead(_) | Self::AppKvRead(_) | Self::AppDocsRead(_) => Scope::ScriptRead,
|
||||
Self::AppWriteScript(_) | Self::AppKvWrite(_) | Self::AppDocsWrite(_) => {
|
||||
Scope::ScriptWrite
|
||||
}
|
||||
Self::AppWriteScript(_)
|
||||
| Self::AppKvWrite(_)
|
||||
| Self::AppDocsWrite(_)
|
||||
| Self::AppHttpRequest(_) => Scope::ScriptWrite,
|
||||
Self::AppWriteRoute(_) => Scope::RouteWrite,
|
||||
Self::AppManageDomains(_) => Scope::DomainManage,
|
||||
Self::AppAdmin(_) | Self::AppManageTriggers(_) | Self::AppDeadLetterManage(_) => {
|
||||
@@ -277,6 +285,7 @@ const fn role_satisfies(role: AppRole, cap: Capability) -> bool {
|
||||
| Capability::AppWriteRoute(_)
|
||||
| Capability::AppKvWrite(_)
|
||||
| Capability::AppDocsWrite(_)
|
||||
| Capability::AppHttpRequest(_)
|
||||
);
|
||||
let in_app_admin = in_editor
|
||||
|| matches!(
|
||||
|
||||
297
crates/manager-core/src/cron_scheduler.rs
Normal file
297
crates/manager-core/src/cron_scheduler.rs
Normal file
@@ -0,0 +1,297 @@
|
||||
//! Cron scheduler — the v1.1.4 time-based trigger source.
|
||||
//!
|
||||
//! A single tokio task polls `cron_trigger_details` on a tick (default
|
||||
//! 30s; `PICLOUD_CRON_TICK_INTERVAL_MS`). For each enabled cron trigger
|
||||
//! whose next scheduled fire is due, it enqueues ONE outbox row
|
||||
//! (`source_kind = 'cron'`) and updates `last_fired_at` — both in the
|
||||
//! same transaction, claimed via `FOR UPDATE SKIP LOCKED` so a future
|
||||
//! multi-node deploy can't double-fire.
|
||||
//!
|
||||
//! The scheduler does NOT dispatch or touch the `ExecutionGate`: it only
|
||||
//! enqueues. The existing dispatcher picks the row up and acquires the
|
||||
//! gate exactly as it does for kv/docs/dead_letter rows.
|
||||
//!
|
||||
//! **Catch-up policy (matches the brief):** a trigger that missed N fire
|
||||
//! windows since `last_fired_at` fires exactly ONCE on the next tick,
|
||||
//! not N times. This falls out of the design: [`next_due`] returns a
|
||||
//! single canonical scheduled time (the first slot after the reference
|
||||
//! point), and after firing we set `last_fired_at = now`, so the next
|
||||
//! tick computes from `now` and sees only future slots. Backfilling
|
||||
//! missed windows is intentionally out of scope (an explicit replay
|
||||
//! action is the v1.2+ escape hatch).
|
||||
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use chrono_tz::Tz;
|
||||
use cron::Schedule;
|
||||
use picloud_shared::TriggerEvent;
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Validate a 6-field cron expression. Returns the parse error message
|
||||
/// on failure.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns the underlying parse error string when `schedule` is not a
|
||||
/// valid cron expression.
|
||||
pub fn validate_schedule(schedule: &str) -> Result<(), String> {
|
||||
Schedule::from_str(schedule)
|
||||
.map(|_| ())
|
||||
.map_err(|e| e.to_string())
|
||||
}
|
||||
|
||||
/// Validate an IANA timezone name (e.g. `America/Los_Angeles`).
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error string when `timezone` is not a known IANA name.
|
||||
pub fn validate_timezone(timezone: &str) -> Result<(), String> {
|
||||
Tz::from_str(timezone)
|
||||
.map(|_| ())
|
||||
.map_err(|_| format!("unknown IANA timezone: {timezone}"))
|
||||
}
|
||||
|
||||
/// Compute whether a cron trigger is due, and if so its canonical
|
||||
/// scheduled-at moment (UTC).
|
||||
///
|
||||
/// Returns `Some(scheduled_at)` when the first scheduled slot after the
|
||||
/// reference point (`last_fired_at`, or `created_at` if never fired) is
|
||||
/// at/before `now`; `None` otherwise. Returns `None` if the schedule or
|
||||
/// timezone fails to parse (the row is skipped — it should never have
|
||||
/// been inserted, since the admin endpoint validates).
|
||||
#[must_use]
|
||||
pub fn next_due(
|
||||
schedule: &str,
|
||||
timezone: &str,
|
||||
last_fired_at: Option<DateTime<Utc>>,
|
||||
created_at: DateTime<Utc>,
|
||||
now: DateTime<Utc>,
|
||||
) -> Option<DateTime<Utc>> {
|
||||
let sched = Schedule::from_str(schedule).ok()?;
|
||||
let tz = Tz::from_str(timezone).ok()?;
|
||||
// Reference: the last actual fire, or creation if never fired. A
|
||||
// never-fired trigger fires at its first slot at/after creation.
|
||||
let base = last_fired_at.unwrap_or(created_at);
|
||||
let base_tz = base.with_timezone(&tz);
|
||||
let next = sched.after(&base_tz).next()?;
|
||||
let next_utc = next.with_timezone(&Utc);
|
||||
(next_utc <= now).then_some(next_utc)
|
||||
}
|
||||
|
||||
/// Spawn the scheduler loop. Runs for the process lifetime.
|
||||
pub fn spawn_cron_scheduler(pool: PgPool, tick_interval_ms: u32) {
|
||||
// Floor the tick at 1s so a misconfigured 0 can't spin.
|
||||
let interval = Duration::from_millis(u64::from(tick_interval_ms).max(1_000));
|
||||
tokio::spawn(async move {
|
||||
let mut ticker = tokio::time::interval(interval);
|
||||
// Skip the immediate first fire so we don't race startup.
|
||||
ticker.tick().await;
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
if let Err(e) = tick(&pool, Utc::now()).await {
|
||||
tracing::warn!(?e, "cron scheduler tick errored");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct DueRow {
|
||||
id: Uuid,
|
||||
app_id: Uuid,
|
||||
script_id: Uuid,
|
||||
registered_by_principal: Uuid,
|
||||
created_at: DateTime<Utc>,
|
||||
schedule: String,
|
||||
timezone: String,
|
||||
last_fired_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
/// One scheduler tick: claim enabled cron rows, enqueue the due ones,
|
||||
/// bump `last_fired_at`. Returns the number of triggers fired.
|
||||
async fn tick(pool: &PgPool, now: DateTime<Utc>) -> Result<usize, sqlx::Error> {
|
||||
let mut tx = pool.begin().await?;
|
||||
let rows: Vec<DueRow> = sqlx::query_as(
|
||||
"SELECT t.id, t.app_id, t.script_id, t.registered_by_principal, t.created_at, \
|
||||
d.schedule, d.timezone, d.last_fired_at \
|
||||
FROM cron_trigger_details d \
|
||||
JOIN triggers t ON t.id = d.trigger_id \
|
||||
WHERE t.enabled = TRUE \
|
||||
FOR UPDATE OF d SKIP LOCKED",
|
||||
)
|
||||
.fetch_all(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let mut fired = 0usize;
|
||||
for r in rows {
|
||||
let Some(scheduled_at) =
|
||||
next_due(&r.schedule, &r.timezone, r.last_fired_at, r.created_at, now)
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let event = TriggerEvent::Cron {
|
||||
schedule: r.schedule.clone(),
|
||||
timezone: r.timezone.clone(),
|
||||
scheduled_at,
|
||||
fired_at: now,
|
||||
};
|
||||
let payload = serde_json::to_value(&event)
|
||||
.map_err(|e| sqlx::Error::Decode(Box::new(std::io::Error::other(e))))?;
|
||||
|
||||
// Enqueue exactly one outbox row. Relies on the same column
|
||||
// defaults the OutboxEventEmitter uses (next_attempt_at = NOW(),
|
||||
// attempt_count = 0, claimed_at NULL → immediately due).
|
||||
sqlx::query(
|
||||
"INSERT INTO outbox \
|
||||
(app_id, source_kind, trigger_id, script_id, payload, \
|
||||
origin_principal, trigger_depth) \
|
||||
VALUES ($1, 'cron', $2, $3, $4, $5, 0)",
|
||||
)
|
||||
.bind(r.app_id)
|
||||
.bind(r.id)
|
||||
.bind(r.script_id)
|
||||
.bind(payload)
|
||||
.bind(r.registered_by_principal)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query("UPDATE cron_trigger_details SET last_fired_at = $2 WHERE trigger_id = $1")
|
||||
.bind(r.id)
|
||||
.bind(now)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
fired += 1;
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(fired)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use chrono::TimeZone;
|
||||
|
||||
#[test]
|
||||
fn valid_six_field_schedule_accepted() {
|
||||
// sec min hour dom mon dow — "every weekday at 9am".
|
||||
validate_schedule("0 0 9 * * MON-FRI").unwrap();
|
||||
validate_schedule("*/5 * * * * *").unwrap();
|
||||
validate_schedule("0 0 0 1 1 *").unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_schedules_rejected() {
|
||||
// 5-field (no seconds) is not the format we accept.
|
||||
assert!(validate_schedule("* * * * *").is_err());
|
||||
// Gibberish.
|
||||
assert!(validate_schedule("not a cron").is_err());
|
||||
assert!(validate_schedule("").is_err());
|
||||
// Out-of-range hour.
|
||||
assert!(validate_schedule("0 0 99 * * *").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn known_timezones_accepted() {
|
||||
validate_timezone("UTC").unwrap();
|
||||
validate_timezone("America/Los_Angeles").unwrap();
|
||||
validate_timezone("Europe/Berlin").unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unknown_timezones_rejected() {
|
||||
assert!(validate_timezone("Mars/Phobos").is_err());
|
||||
assert!(validate_timezone("PST").is_err()); // abbreviations aren't IANA names
|
||||
assert!(validate_timezone("").is_err());
|
||||
}
|
||||
|
||||
fn ts(s: &str) -> DateTime<Utc> {
|
||||
DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn due_when_next_slot_is_at_or_before_now() {
|
||||
// Every minute at second 0. Last fired 90s ago → the next slot
|
||||
// after that is due now.
|
||||
let created = ts("2026-06-01T00:00:00Z");
|
||||
let last = Some(ts("2026-06-15T11:58:10Z"));
|
||||
let now = ts("2026-06-15T12:00:05Z");
|
||||
let due = next_due("0 * * * * *", "UTC", last, created, now);
|
||||
assert_eq!(due, Some(ts("2026-06-15T11:59:00Z")));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn not_due_when_next_slot_is_in_the_future() {
|
||||
let created = ts("2026-06-01T00:00:00Z");
|
||||
let last = Some(ts("2026-06-15T12:00:00Z"));
|
||||
let now = ts("2026-06-15T12:00:30Z");
|
||||
// Next minute slot is 12:01:00 — still in the future.
|
||||
assert_eq!(next_due("0 * * * * *", "UTC", last, created, now), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn never_fired_uses_created_at_as_reference() {
|
||||
let created = ts("2026-06-15T12:00:10Z");
|
||||
let now = ts("2026-06-15T12:01:30Z");
|
||||
// First slot after creation is 12:01:00, which is <= now → due.
|
||||
let due = next_due("0 * * * * *", "UTC", None, created, now);
|
||||
assert_eq!(due, Some(ts("2026-06-15T12:01:00Z")));
|
||||
}
|
||||
|
||||
/// Catch-up policy: a trigger that missed many windows fires exactly
|
||||
/// ONCE. We simulate two consecutive scheduler ticks the way the DB
|
||||
/// loop does — fire once, set last_fired = now, then re-evaluate.
|
||||
#[test]
|
||||
fn catch_up_fires_exactly_once_after_missed_windows() {
|
||||
let created = ts("2026-06-15T09:00:00Z");
|
||||
// Last fired over 5 minutes (5 windows) ago.
|
||||
let mut last_fired = Some(ts("2026-06-15T11:54:30Z"));
|
||||
let now = ts("2026-06-15T12:00:05Z");
|
||||
|
||||
// Tick 1: due → fire once, advance last_fired to `now`.
|
||||
let first = next_due("0 * * * * *", "UTC", last_fired, created, now);
|
||||
assert!(first.is_some(), "should be due after missing windows");
|
||||
last_fired = Some(now);
|
||||
|
||||
// Tick 2 (same wall-clock): NOT due again — only one fire total,
|
||||
// not one-per-missed-window.
|
||||
let second = next_due("0 * * * * *", "UTC", last_fired, created, now);
|
||||
assert_eq!(second, None, "catch-up must fire exactly once");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn timezone_affects_fire_time() {
|
||||
// "9am every day" in Los Angeles. On 2026-06-15, PDT = UTC-7, so
|
||||
// 09:00 local = 16:00 UTC.
|
||||
let created = ts("2026-06-15T00:00:00Z");
|
||||
let last = Some(ts("2026-06-15T15:59:00Z"));
|
||||
let now = ts("2026-06-15T16:00:30Z");
|
||||
let due = next_due("0 0 9 * * *", "America/Los_Angeles", last, created, now);
|
||||
assert_eq!(due, Some(ts("2026-06-15T16:00:00Z")));
|
||||
// Sanity: the same expression in UTC would NOT be due at 16:00.
|
||||
assert_eq!(next_due("0 0 9 * * *", "UTC", last, created, now), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bad_schedule_or_tz_yields_none() {
|
||||
let created = ts("2026-06-15T00:00:00Z");
|
||||
let now = ts("2026-06-15T12:00:00Z");
|
||||
assert_eq!(next_due("garbage", "UTC", None, created, now), None);
|
||||
assert_eq!(
|
||||
next_due("0 * * * * *", "Mars/Phobos", None, created, now),
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn utc_offset_constructor_smoke() {
|
||||
// Guard the chrono TimeZone import is actually exercised.
|
||||
let dt = Utc.with_ymd_and_hms(2026, 6, 15, 12, 0, 0).unwrap();
|
||||
assert_eq!(dt, ts("2026-06-15T12:00:00Z"));
|
||||
}
|
||||
}
|
||||
@@ -208,6 +208,9 @@ async fn resolve(
|
||||
fn admin_cx(app_id: AppId, principal: &Principal) -> SdkCallCx {
|
||||
SdkCallCx {
|
||||
app_id,
|
||||
// Admin-plane cx (dead-letter replay/resolve) — no script is
|
||||
// executing, so this attribution id is a fresh sentinel.
|
||||
script_id: picloud_shared::ScriptId::new(),
|
||||
principal: Some(principal.clone()),
|
||||
execution_id: picloud_shared::ExecutionId::new(),
|
||||
request_id: picloud_shared::RequestId::new(),
|
||||
|
||||
@@ -163,7 +163,10 @@ impl Dispatcher {
|
||||
return Ok(());
|
||||
}
|
||||
},
|
||||
OutboxSourceKind::Kv | OutboxSourceKind::Docs | OutboxSourceKind::DeadLetter => {
|
||||
OutboxSourceKind::Kv
|
||||
| OutboxSourceKind::Docs
|
||||
| OutboxSourceKind::DeadLetter
|
||||
| OutboxSourceKind::Cron => {
|
||||
let resolved = self.resolve_trigger(&row).await?;
|
||||
let req = match self.build_exec_request(&row, &resolved).await {
|
||||
Ok(req) => req,
|
||||
|
||||
@@ -272,7 +272,7 @@ mod tests {
|
||||
use chrono::Utc;
|
||||
use picloud_shared::{
|
||||
AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, NoopEventEmitter, Principal,
|
||||
RequestId, UserId,
|
||||
RequestId, ScriptId, UserId,
|
||||
};
|
||||
use serde_json::json;
|
||||
use std::collections::BTreeMap;
|
||||
@@ -507,6 +507,7 @@ mod tests {
|
||||
fn anon_cx(app_id: AppId) -> SdkCallCx {
|
||||
SdkCallCx {
|
||||
app_id,
|
||||
script_id: ScriptId::new(),
|
||||
principal: None,
|
||||
execution_id: ExecutionId::new(),
|
||||
request_id: RequestId::new(),
|
||||
@@ -520,6 +521,7 @@ mod tests {
|
||||
fn owner_cx(app_id: AppId) -> SdkCallCx {
|
||||
SdkCallCx {
|
||||
app_id,
|
||||
script_id: ScriptId::new(),
|
||||
principal: Some(Principal {
|
||||
user_id: AdminUserId::new(),
|
||||
instance_role: InstanceRole::Owner,
|
||||
@@ -538,6 +540,7 @@ mod tests {
|
||||
fn member_no_role_cx(app_id: AppId) -> SdkCallCx {
|
||||
SdkCallCx {
|
||||
app_id,
|
||||
script_id: ScriptId::new(),
|
||||
principal: Some(Principal {
|
||||
user_id: AdminUserId::new(),
|
||||
instance_role: InstanceRole::Member,
|
||||
|
||||
793
crates/manager-core/src/http_service.rs
Normal file
793
crates/manager-core/src/http_service.rs
Normal file
@@ -0,0 +1,793 @@
|
||||
//! `HttpServiceImpl` — reqwest-backed outbound HTTP for the v1.1.4
|
||||
//! `http::*` SDK.
|
||||
//!
|
||||
//! Mirrors the v1.1.1+ stateful-service shape (`KvServiceImpl`):
|
||||
//! script-as-gate authz (`AppHttpRequest`, skipped when
|
||||
//! `cx.principal` is `None`), with the backend talking to the network
|
||||
//! instead of Postgres. The reqwest client is built once at startup
|
||||
//! with the [`crate::ssrf::SsrfResolver`] wired in via
|
||||
//! `dns_resolver`, so the SSRF deny-list applies at every connection —
|
||||
//! including each redirect hop, since redirects are followed manually
|
||||
//! through the same client.
|
||||
//!
|
||||
//! Layering vs the raw client:
|
||||
//! 1. URL validation: scheme must be http/https; ports 22/25/465/587
|
||||
//! are blocked. (IP-level filtering is the resolver's job.)
|
||||
//! 2. Body-size caps on both request and response (stream-with-cap on
|
||||
//! the response, checking `Content-Length` first).
|
||||
//! 3. Total-request timeout (default 30s, max 60s) on top of the
|
||||
//! client's 10s connect timeout.
|
||||
//! 4. Default `User-Agent` unless the caller set one.
|
||||
//!
|
||||
//! Bodies/headers are never logged (PII): only url + status + duration
|
||||
//! at debug level.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use picloud_shared::{HttpError, HttpRequest, HttpResponse, HttpService, SdkCallCx};
|
||||
use reqwest::header::{HeaderMap, HeaderName, HeaderValue, CONTENT_TYPE, LOCATION, USER_AGENT};
|
||||
use reqwest::{Client, Method, StatusCode};
|
||||
|
||||
use crate::authz::{self, AuthzRepo, Capability};
|
||||
use crate::ssrf::{self, SsrfPolicy, SSRF_BLOCK_PREFIX};
|
||||
|
||||
/// Default per-request timeout (ms) when the script omits `timeout_ms`.
|
||||
pub const DEFAULT_TIMEOUT_MS: u32 = 30_000;
|
||||
/// Hard ceiling on the per-request timeout. Values above this are
|
||||
/// rejected by the bridge (not silently clamped).
|
||||
pub const MAX_TIMEOUT_MS: u32 = 60_000;
|
||||
/// Default redirect cap.
|
||||
pub const DEFAULT_MAX_REDIRECTS: u32 = 5;
|
||||
/// Hard ceiling on redirects.
|
||||
pub const MAX_REDIRECTS_CEILING: u32 = 10;
|
||||
/// 10 MB default body cap on both directions.
|
||||
const DEFAULT_BODY_LIMIT_BYTES: usize = 10 * 1024 * 1024;
|
||||
/// DNS + connect + TLS hard cap.
|
||||
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Outbound-HTTP tunables. Env-overridable following the same pattern
|
||||
/// as `TriggerConfig::from_env`.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct HttpConfig {
|
||||
/// Disables the SSRF deny-list entirely. Dev/test only — the binary
|
||||
/// logs a startup warning when this is set.
|
||||
pub allow_private: bool,
|
||||
pub max_request_body_bytes: usize,
|
||||
pub max_response_body_bytes: usize,
|
||||
}
|
||||
|
||||
impl HttpConfig {
|
||||
#[must_use]
|
||||
pub const fn conservative() -> Self {
|
||||
Self {
|
||||
allow_private: false,
|
||||
max_request_body_bytes: DEFAULT_BODY_LIMIT_BYTES,
|
||||
max_response_body_bytes: DEFAULT_BODY_LIMIT_BYTES,
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn from_env() -> Self {
|
||||
let mut c = Self::conservative();
|
||||
if let Ok(v) = env::var("PICLOUD_HTTP_ALLOW_PRIVATE") {
|
||||
c.allow_private =
|
||||
matches!(v.trim().to_ascii_lowercase().as_str(), "1" | "true" | "yes");
|
||||
}
|
||||
load_usize(
|
||||
&mut c.max_request_body_bytes,
|
||||
"PICLOUD_HTTP_MAX_REQUEST_BODY_BYTES",
|
||||
);
|
||||
load_usize(
|
||||
&mut c.max_response_body_bytes,
|
||||
"PICLOUD_HTTP_MAX_RESPONSE_BODY_BYTES",
|
||||
);
|
||||
c
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for HttpConfig {
|
||||
fn default() -> Self {
|
||||
Self::conservative()
|
||||
}
|
||||
}
|
||||
|
||||
fn load_usize(dst: &mut usize, key: &str) {
|
||||
if let Ok(v) = env::var(key) {
|
||||
match v.parse::<usize>() {
|
||||
Ok(n) => *dst = n,
|
||||
Err(e) => {
|
||||
tracing::warn!(env = key, error = %e, "ignoring invalid http-config value");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct HttpServiceImpl {
|
||||
client: Client,
|
||||
authz: Arc<dyn AuthzRepo>,
|
||||
config: HttpConfig,
|
||||
/// Same policy wired into the DNS resolver. Held here too because
|
||||
/// reqwest only routes *hostnames* through the custom resolver — a
|
||||
/// URL with a **literal IP** host bypasses it, so literal IPs are
|
||||
/// checked directly at URL-validation time.
|
||||
policy: SsrfPolicy,
|
||||
}
|
||||
|
||||
impl HttpServiceImpl {
|
||||
/// Build the service, constructing the reqwest client with the SSRF
|
||||
/// resolver. Redirects are followed manually (so per-request limits
|
||||
/// are honored and every hop re-resolves through the SSRF
|
||||
/// resolver), hence `redirect(Policy::none())`.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if the reqwest client fails to build — this is a
|
||||
/// startup-time invariant, not a runtime path.
|
||||
#[must_use]
|
||||
pub fn new(config: HttpConfig, authz: Arc<dyn AuthzRepo>) -> Self {
|
||||
let policy = SsrfPolicy::new(config.allow_private);
|
||||
let client = Client::builder()
|
||||
.dns_resolver(ssrf::resolver(policy))
|
||||
.connect_timeout(CONNECT_TIMEOUT)
|
||||
.redirect(reqwest::redirect::Policy::none())
|
||||
.build()
|
||||
.expect("build outbound http client");
|
||||
Self {
|
||||
client,
|
||||
authz,
|
||||
config,
|
||||
policy,
|
||||
}
|
||||
}
|
||||
|
||||
async fn check_request(&self, cx: &SdkCallCx) -> Result<(), HttpError> {
|
||||
if let Some(ref principal) = cx.principal {
|
||||
authz::require(
|
||||
&*self.authz,
|
||||
principal,
|
||||
Capability::AppHttpRequest(cx.app_id),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| HttpError::Forbidden)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl HttpService for HttpServiceImpl {
|
||||
async fn request(&self, cx: &SdkCallCx, req: HttpRequest) -> Result<HttpResponse, HttpError> {
|
||||
self.check_request(cx).await?;
|
||||
|
||||
// Request body cap.
|
||||
if let Some(ref body) = req.body {
|
||||
if body.len() > self.config.max_request_body_bytes {
|
||||
return Err(HttpError::BodyTooLarge("request"));
|
||||
}
|
||||
}
|
||||
|
||||
let timeout = Duration::from_millis(u64::from(req.timeout_ms.min(MAX_TIMEOUT_MS)));
|
||||
let started = std::time::Instant::now();
|
||||
let url_for_log = req.url.clone();
|
||||
|
||||
// Whole-request budget (DNS + connect + TLS + all redirect hops
|
||||
// + body read). Connect alone is further bounded by the
|
||||
// client's CONNECT_TIMEOUT.
|
||||
let outcome = match tokio::time::timeout(timeout, self.run(req)).await {
|
||||
Ok(r) => r,
|
||||
Err(_) => Err(HttpError::Timeout),
|
||||
};
|
||||
|
||||
let duration_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
|
||||
match &outcome {
|
||||
Ok(resp) => tracing::debug!(
|
||||
url = %url_for_log,
|
||||
status = resp.status,
|
||||
duration_ms,
|
||||
"outbound http"
|
||||
),
|
||||
Err(err) => tracing::debug!(
|
||||
url = %url_for_log,
|
||||
error = %err,
|
||||
duration_ms,
|
||||
"outbound http failed"
|
||||
),
|
||||
}
|
||||
outcome
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpServiceImpl {
|
||||
/// Core request path: validate, build headers, follow redirects
|
||||
/// manually, read the response body with a cap.
|
||||
async fn run(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
|
||||
let method = Method::from_bytes(req.method.as_bytes())
|
||||
.map_err(|_| HttpError::Backend(format!("invalid method: {}", req.method)))?;
|
||||
|
||||
let mut current = url::Url::parse(&req.url)
|
||||
.map_err(|e| HttpError::InvalidUrl(format!("{}: {e}", req.url)))?;
|
||||
validate_url(¤t, self.policy)?;
|
||||
|
||||
let mut header_map = build_headers(&req, ¤t)?;
|
||||
let mut method = method;
|
||||
let mut body = req.body.clone();
|
||||
let mut redirects: u32 = 0;
|
||||
let max_redirects = req.max_redirects.min(MAX_REDIRECTS_CEILING);
|
||||
|
||||
loop {
|
||||
// Re-validate scheme/port (and literal-IP SSRF) on each hop.
|
||||
// Hostname IP filtering is the resolver's job and runs
|
||||
// automatically at connect time.
|
||||
validate_url(¤t, self.policy)?;
|
||||
|
||||
let mut rb = self.client.request(method.clone(), current.clone());
|
||||
rb = rb.headers(header_map.clone());
|
||||
if let Some(ref b) = body {
|
||||
rb = rb.body(b.clone());
|
||||
}
|
||||
let resp = rb.send().await.map_err(map_reqwest_err)?;
|
||||
let status = resp.status();
|
||||
|
||||
if req.follow_redirects && is_redirect(status) {
|
||||
if let Some(loc) = resp.headers().get(LOCATION) {
|
||||
if redirects >= max_redirects {
|
||||
return Err(HttpError::Backend(format!(
|
||||
"too many redirects (max {max_redirects})"
|
||||
)));
|
||||
}
|
||||
redirects += 1;
|
||||
let loc_str = loc.to_str().map_err(|_| {
|
||||
HttpError::Backend("redirect Location not valid UTF-8".into())
|
||||
})?;
|
||||
current = current
|
||||
.join(loc_str)
|
||||
.map_err(|e| HttpError::InvalidUrl(format!("redirect target: {e}")))?;
|
||||
|
||||
// 303 always → GET; 301/302 historically downgrade
|
||||
// POST→GET (matches browsers). 307/308 preserve.
|
||||
if matches!(status.as_u16(), 301..=303) {
|
||||
method = Method::GET;
|
||||
body = None;
|
||||
header_map.remove(CONTENT_TYPE);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
return self.read_capped(resp).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_capped(&self, resp: reqwest::Response) -> Result<HttpResponse, HttpError> {
|
||||
let status = resp.status().as_u16();
|
||||
let mut headers = BTreeMap::new();
|
||||
for (name, value) in resp.headers() {
|
||||
// Header names lowercased per the documented response shape.
|
||||
headers.insert(
|
||||
name.as_str().to_ascii_lowercase(),
|
||||
value.to_str().unwrap_or("").to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
let cap = self.config.max_response_body_bytes;
|
||||
if let Some(len) = resp.content_length() {
|
||||
if len > cap as u64 {
|
||||
return Err(HttpError::BodyTooLarge("response"));
|
||||
}
|
||||
}
|
||||
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
let mut resp = resp;
|
||||
while let Some(chunk) = resp.chunk().await.map_err(map_reqwest_err)? {
|
||||
if buf.len() + chunk.len() > cap {
|
||||
return Err(HttpError::BodyTooLarge("response"));
|
||||
}
|
||||
buf.extend_from_slice(&chunk);
|
||||
}
|
||||
let body_raw = String::from_utf8_lossy(&buf).into_owned();
|
||||
Ok(HttpResponse {
|
||||
status,
|
||||
headers,
|
||||
body_raw,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// http/https only; block the SSH + SMTP ports; apply the SSRF policy
|
||||
/// to **literal-IP** hosts (hostnames are filtered by the DNS resolver
|
||||
/// at connect time, but literal IPs never reach the resolver).
|
||||
fn validate_url(url: &url::Url, policy: SsrfPolicy) -> Result<(), HttpError> {
|
||||
match url.scheme() {
|
||||
"http" | "https" => {}
|
||||
other => return Err(HttpError::BlockedScheme(other.to_string())),
|
||||
}
|
||||
match url.host() {
|
||||
None => return Err(HttpError::InvalidUrl("missing host".into())),
|
||||
Some(url::Host::Ipv4(ip)) => {
|
||||
policy
|
||||
.check(std::net::IpAddr::V4(ip))
|
||||
.map_err(|reason| HttpError::Ssrf(reason.to_string()))?;
|
||||
}
|
||||
Some(url::Host::Ipv6(ip)) => {
|
||||
policy
|
||||
.check(std::net::IpAddr::V6(ip))
|
||||
.map_err(|reason| HttpError::Ssrf(reason.to_string()))?;
|
||||
}
|
||||
Some(url::Host::Domain(_)) => {}
|
||||
}
|
||||
let port = url
|
||||
.port_or_known_default()
|
||||
.unwrap_or(if url.scheme() == "https" { 443 } else { 80 });
|
||||
if matches!(port, 22 | 25 | 465 | 587) {
|
||||
return Err(HttpError::BlockedPort(port));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Build the request header map: merge caller headers, then apply the
|
||||
/// default `User-Agent` (unless overridden) and the bridge-chosen
|
||||
/// `Content-Type` (unless overridden).
|
||||
fn build_headers(req: &HttpRequest, _url: &url::Url) -> Result<HeaderMap, HttpError> {
|
||||
let mut map = HeaderMap::new();
|
||||
let mut has_user_agent = false;
|
||||
let mut has_content_type = false;
|
||||
for (k, v) in &req.headers {
|
||||
let name = HeaderName::from_bytes(k.as_bytes())
|
||||
.map_err(|_| HttpError::Backend(format!("invalid header name: {k}")))?;
|
||||
let value = HeaderValue::from_str(v)
|
||||
.map_err(|_| HttpError::Backend(format!("invalid header value for {k}")))?;
|
||||
if name == USER_AGENT {
|
||||
has_user_agent = true;
|
||||
}
|
||||
if name == CONTENT_TYPE {
|
||||
has_content_type = true;
|
||||
}
|
||||
map.append(name, value);
|
||||
}
|
||||
|
||||
if !has_user_agent {
|
||||
let script = req.script_id.as_deref().unwrap_or("unknown");
|
||||
let ua = format!(
|
||||
"picloud/{} (script:{})",
|
||||
picloud_shared::PRODUCT_VERSION,
|
||||
script
|
||||
);
|
||||
if let Ok(value) = HeaderValue::from_str(&ua) {
|
||||
map.insert(USER_AGENT, value);
|
||||
}
|
||||
}
|
||||
|
||||
if !has_content_type {
|
||||
if let Some(ref ct) = req.content_type {
|
||||
if let Ok(value) = HeaderValue::from_str(ct) {
|
||||
map.insert(CONTENT_TYPE, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(map)
|
||||
}
|
||||
|
||||
const fn is_redirect(status: StatusCode) -> bool {
|
||||
matches!(status.as_u16(), 301..=303 | 307 | 308)
|
||||
}
|
||||
|
||||
/// Map a reqwest error to an `HttpError`, never leaking the resolved
|
||||
/// IP. SSRF blocks are detected by scanning the error source chain for
|
||||
/// the resolver's marker prefix.
|
||||
fn map_reqwest_err(err: reqwest::Error) -> HttpError {
|
||||
if let Some(reason) = ssrf_reason(&err) {
|
||||
return HttpError::Ssrf(reason);
|
||||
}
|
||||
if err.is_timeout() {
|
||||
return HttpError::Timeout;
|
||||
}
|
||||
if err.is_connect() {
|
||||
return HttpError::Network("connection failed".into());
|
||||
}
|
||||
if err.is_request() {
|
||||
return HttpError::Network("request failed".into());
|
||||
}
|
||||
HttpError::Network("network error".into())
|
||||
}
|
||||
|
||||
/// Walk the error source chain looking for the SSRF marker the resolver
|
||||
/// embeds. Returns the category reason (no IP) when found.
|
||||
fn ssrf_reason(err: &reqwest::Error) -> Option<String> {
|
||||
let mut src: Option<&(dyn std::error::Error + 'static)> = Some(err);
|
||||
while let Some(e) = src {
|
||||
let s = e.to_string();
|
||||
if let Some(idx) = s.find(SSRF_BLOCK_PREFIX) {
|
||||
return Some(s[idx + SSRF_BLOCK_PREFIX.len()..].to_string());
|
||||
}
|
||||
src = e.source();
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::authz::AuthzError;
|
||||
use async_trait::async_trait;
|
||||
use picloud_shared::{
|
||||
AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, Principal, RequestId, ScriptId,
|
||||
UserId,
|
||||
};
|
||||
use std::collections::BTreeMap;
|
||||
use std::io::Write as _;
|
||||
use std::net::SocketAddr;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
struct AllowAuthz;
|
||||
#[async_trait]
|
||||
impl AuthzRepo for AllowAuthz {
|
||||
async fn membership(&self, _u: UserId, _a: AppId) -> Result<Option<AppRole>, AuthzError> {
|
||||
Ok(Some(AppRole::Editor))
|
||||
}
|
||||
}
|
||||
struct DenyAuthz;
|
||||
#[async_trait]
|
||||
impl AuthzRepo for DenyAuthz {
|
||||
async fn membership(&self, _u: UserId, _a: AppId) -> Result<Option<AppRole>, AuthzError> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn dev_service(authz: Arc<dyn AuthzRepo>) -> HttpServiceImpl {
|
||||
// allow_private so the test TcpListener on 127.0.0.1 is reachable.
|
||||
let mut config = HttpConfig::conservative();
|
||||
config.allow_private = true;
|
||||
HttpServiceImpl::new(config, authz)
|
||||
}
|
||||
|
||||
fn anon_cx() -> SdkCallCx {
|
||||
SdkCallCx {
|
||||
app_id: AppId::new(),
|
||||
script_id: ScriptId::new(),
|
||||
principal: None,
|
||||
execution_id: ExecutionId::new(),
|
||||
request_id: RequestId::new(),
|
||||
trigger_depth: 0,
|
||||
root_execution_id: ExecutionId::new(),
|
||||
is_dead_letter_handler: false,
|
||||
event: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn member_cx() -> SdkCallCx {
|
||||
let mut cx = anon_cx();
|
||||
cx.principal = Some(Principal {
|
||||
user_id: AdminUserId::new(),
|
||||
instance_role: InstanceRole::Member,
|
||||
scopes: None,
|
||||
app_binding: None,
|
||||
});
|
||||
cx
|
||||
}
|
||||
|
||||
fn req(method: &str, url: String) -> HttpRequest {
|
||||
HttpRequest {
|
||||
method: method.into(),
|
||||
url,
|
||||
headers: BTreeMap::new(),
|
||||
body: None,
|
||||
content_type: None,
|
||||
timeout_ms: 5000,
|
||||
follow_redirects: true,
|
||||
max_redirects: 5,
|
||||
script_id: Some("test-script".into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Minimal single-shot HTTP/1.1 server. Reads the request, runs
|
||||
/// `handler` to produce the raw response bytes, writes them, closes.
|
||||
/// Returns the bound address.
|
||||
async fn spawn_server<F>(handler: F) -> SocketAddr
|
||||
where
|
||||
F: Fn(String) -> Vec<u8> + Send + Sync + 'static,
|
||||
{
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let Ok((mut sock, _)) = listener.accept().await else {
|
||||
break;
|
||||
};
|
||||
let mut buf = vec![0u8; 65536];
|
||||
let n = sock.read(&mut buf).await.unwrap_or(0);
|
||||
let request = String::from_utf8_lossy(&buf[..n]).to_string();
|
||||
let response = handler(request);
|
||||
let _ = sock.write_all(&response).await;
|
||||
let _ = sock.flush().await;
|
||||
}
|
||||
});
|
||||
addr
|
||||
}
|
||||
|
||||
fn ok_response(body: &str, content_type: &str) -> Vec<u8> {
|
||||
let mut v = Vec::new();
|
||||
write!(
|
||||
v,
|
||||
"HTTP/1.1 200 OK\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
|
||||
body.len()
|
||||
)
|
||||
.unwrap();
|
||||
v
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn get_round_trip() {
|
||||
let addr = spawn_server(|_req| ok_response("hello", "text/plain")).await;
|
||||
let svc = dev_service(Arc::new(AllowAuthz));
|
||||
let resp = svc
|
||||
.request(&anon_cx(), req("GET", format!("http://{addr}/")))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status, 200);
|
||||
assert_eq!(resp.body_raw, "hello");
|
||||
assert_eq!(
|
||||
resp.headers.get("content-type").map(String::as_str),
|
||||
Some("text/plain")
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn post_sends_body_and_default_user_agent() {
|
||||
let addr = spawn_server(|request| {
|
||||
// Echo back whether the body + default UA were present.
|
||||
let has_ua = request.to_lowercase().contains("user-agent: picloud/");
|
||||
let has_body = request.contains("xyzzy");
|
||||
ok_response(&format!("ua={has_ua},body={has_body}"), "text/plain")
|
||||
})
|
||||
.await;
|
||||
let svc = dev_service(Arc::new(AllowAuthz));
|
||||
let mut r = req("POST", format!("http://{addr}/"));
|
||||
r.body = Some(b"xyzzy".to_vec());
|
||||
r.content_type = Some("text/plain".into());
|
||||
let resp = svc.request(&anon_cx(), r).await.unwrap();
|
||||
assert_eq!(resp.body_raw, "ua=true,body=true");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn custom_user_agent_overrides_default() {
|
||||
let addr = spawn_server(|request| {
|
||||
let has_custom = request.to_lowercase().contains("user-agent: my-agent");
|
||||
let has_default = request.to_lowercase().contains("picloud/");
|
||||
ok_response(
|
||||
&format!("custom={has_custom},default={has_default}"),
|
||||
"text/plain",
|
||||
)
|
||||
})
|
||||
.await;
|
||||
let svc = dev_service(Arc::new(AllowAuthz));
|
||||
let mut r = req("GET", format!("http://{addr}/"));
|
||||
r.headers.insert("User-Agent".into(), "my-agent".into());
|
||||
let resp = svc.request(&anon_cx(), r).await.unwrap();
|
||||
assert_eq!(resp.body_raw, "custom=true,default=false");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn empty_body_response() {
|
||||
let addr = spawn_server(|_r| {
|
||||
b"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\nConnection: close\r\n\r\n".to_vec()
|
||||
})
|
||||
.await;
|
||||
let svc = dev_service(Arc::new(AllowAuthz));
|
||||
let resp = svc
|
||||
.request(&anon_cx(), req("GET", format!("http://{addr}/")))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status, 204);
|
||||
assert_eq!(resp.body_raw, "");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn non_2xx_does_not_error() {
|
||||
let addr = spawn_server(|_r| {
|
||||
b"HTTP/1.1 500 Internal Server Error\r\nContent-Length: 3\r\nConnection: close\r\n\r\nerr".to_vec()
|
||||
})
|
||||
.await;
|
||||
let svc = dev_service(Arc::new(AllowAuthz));
|
||||
let resp = svc
|
||||
.request(&anon_cx(), req("GET", format!("http://{addr}/")))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status, 500);
|
||||
assert_eq!(resp.body_raw, "err");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn response_over_content_length_cap_rejected() {
|
||||
let addr = spawn_server(|_r| ok_response("0123456789", "text/plain")).await;
|
||||
let mut config = HttpConfig::conservative();
|
||||
config.allow_private = true;
|
||||
config.max_response_body_bytes = 5; // body is 10 bytes
|
||||
let svc = HttpServiceImpl::new(config, Arc::new(AllowAuthz));
|
||||
let err = svc
|
||||
.request(&anon_cx(), req("GET", format!("http://{addr}/")))
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, HttpError::BodyTooLarge("response")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn response_over_cap_without_content_length_rejected_mid_stream() {
|
||||
// No Content-Length header → must be caught while streaming.
|
||||
let addr = spawn_server(|_r| {
|
||||
b"HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n0123456789ABCDEF".to_vec()
|
||||
})
|
||||
.await;
|
||||
let mut config = HttpConfig::conservative();
|
||||
config.allow_private = true;
|
||||
config.max_response_body_bytes = 4;
|
||||
let svc = HttpServiceImpl::new(config, Arc::new(AllowAuthz));
|
||||
let err = svc
|
||||
.request(&anon_cx(), req("GET", format!("http://{addr}/")))
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, HttpError::BodyTooLarge("response")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn request_body_over_cap_rejected_before_send() {
|
||||
let mut config = HttpConfig::conservative();
|
||||
config.allow_private = true;
|
||||
config.max_request_body_bytes = 3;
|
||||
let svc = HttpServiceImpl::new(config, Arc::new(AllowAuthz));
|
||||
let mut r = req("POST", "http://127.0.0.1:1/".into());
|
||||
r.body = Some(b"too long".to_vec());
|
||||
let err = svc.request(&anon_cx(), r).await.unwrap_err();
|
||||
assert!(matches!(err, HttpError::BodyTooLarge("request")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn redirect_followed_up_to_then_throws_beyond_max() {
|
||||
// Server always 302s to itself → unbounded redirect loop,
|
||||
// bounded by max_redirects.
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let Ok((mut sock, _)) = listener.accept().await else {
|
||||
break;
|
||||
};
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let _ = sock.read(&mut buf).await;
|
||||
let body = format!(
|
||||
"HTTP/1.1 302 Found\r\nLocation: http://{addr}/next\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"
|
||||
);
|
||||
let _ = sock.write_all(body.as_bytes()).await;
|
||||
}
|
||||
});
|
||||
let svc = dev_service(Arc::new(AllowAuthz));
|
||||
let mut r = req("GET", format!("http://{addr}/"));
|
||||
r.max_redirects = 2;
|
||||
let err = svc.request(&anon_cx(), r).await.unwrap_err();
|
||||
assert!(
|
||||
matches!(err, HttpError::Backend(ref m) if m.contains("too many redirects")),
|
||||
"expected too-many-redirects, got {err:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn scheme_rejected() {
|
||||
let svc = dev_service(Arc::new(AllowAuthz));
|
||||
for url in ["file:///etc/passwd", "ftp://host/x", "gopher://host/"] {
|
||||
let err = svc
|
||||
.request(&anon_cx(), req("GET", url.into()))
|
||||
.await
|
||||
.unwrap_err();
|
||||
match err {
|
||||
HttpError::BlockedScheme(s) => {
|
||||
assert!(url.starts_with(&s), "scheme {s} not in url {url}");
|
||||
}
|
||||
other => panic!("expected BlockedScheme for {url}, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ports_rejected() {
|
||||
let svc = dev_service(Arc::new(AllowAuthz));
|
||||
for port in [22u16, 25, 465, 587] {
|
||||
let err = svc
|
||||
.request(
|
||||
&anon_cx(),
|
||||
req("GET", format!("http://example.com:{port}/")),
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
matches!(err, HttpError::BlockedPort(p) if p == port),
|
||||
"port {port} should be blocked, got {err:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ssrf_blocks_loopback_without_allow_private() {
|
||||
// Default config (deny-list ON). A request to a loopback host
|
||||
// must surface as Ssrf, not a generic network error.
|
||||
let svc = HttpServiceImpl::new(HttpConfig::conservative(), Arc::new(AllowAuthz));
|
||||
let err = svc
|
||||
.request(&anon_cx(), req("GET", "http://127.0.0.1:9/".into()))
|
||||
.await
|
||||
.unwrap_err();
|
||||
match err {
|
||||
HttpError::Ssrf(reason) => {
|
||||
assert_eq!(reason, "loopback");
|
||||
assert!(!reason.contains("127.0.0.1"), "reason must not leak the IP");
|
||||
}
|
||||
other => panic!("expected Ssrf, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ssrf_blocks_hostname_resolving_to_loopback() {
|
||||
// `localhost` resolves to 127.0.0.1 / ::1 — all denied. This
|
||||
// exercises the DNS-resolver path (vs the literal-IP path) and
|
||||
// must surface as Ssrf, not a generic DNS error.
|
||||
let svc = HttpServiceImpl::new(HttpConfig::conservative(), Arc::new(AllowAuthz));
|
||||
let err = svc
|
||||
.request(&anon_cx(), req("GET", "http://localhost:9/".into()))
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(
|
||||
matches!(err, HttpError::Ssrf(_)),
|
||||
"expected Ssrf for localhost, got {err:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn timeout_throws() {
|
||||
// Server that accepts then never responds.
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
tokio::spawn(async move {
|
||||
if let Ok((sock, _)) = listener.accept().await {
|
||||
// Hold the socket open without replying.
|
||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||
drop(sock);
|
||||
}
|
||||
});
|
||||
let svc = dev_service(Arc::new(AllowAuthz));
|
||||
let mut r = req("GET", format!("http://{addr}/"));
|
||||
r.timeout_ms = 300;
|
||||
let err = svc.request(&anon_cx(), r).await.unwrap_err();
|
||||
assert!(matches!(err, HttpError::Timeout), "got {err:?}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn anon_skips_authz_member_without_scope_forbidden() {
|
||||
let addr = spawn_server(|_r| ok_response("ok", "text/plain")).await;
|
||||
// Anonymous principal → authz skipped even with DenyAuthz.
|
||||
let svc = dev_service(Arc::new(DenyAuthz));
|
||||
let ok = svc
|
||||
.request(&anon_cx(), req("GET", format!("http://{addr}/")))
|
||||
.await;
|
||||
assert!(ok.is_ok());
|
||||
// Authenticated member with no role → Forbidden.
|
||||
let err = svc
|
||||
.request(&member_cx(), req("GET", format!("http://{addr}/")))
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(matches!(err, HttpError::Forbidden));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn member_with_role_allowed() {
|
||||
let addr = spawn_server(|_r| ok_response("ok", "text/plain")).await;
|
||||
let svc = dev_service(Arc::new(AllowAuthz));
|
||||
let resp = svc
|
||||
.request(&member_cx(), req("GET", format!("http://{addr}/")))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status, 200);
|
||||
}
|
||||
}
|
||||
@@ -188,7 +188,7 @@ mod tests {
|
||||
use async_trait::async_trait;
|
||||
use picloud_shared::{
|
||||
AdminUserId, AppId, AppRole, ExecutionId, InstanceRole, NoopEventEmitter, Principal,
|
||||
RequestId, UserId,
|
||||
RequestId, ScriptId, UserId,
|
||||
};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use tokio::sync::Mutex;
|
||||
@@ -301,6 +301,7 @@ mod tests {
|
||||
fn anon_cx(app_id: AppId) -> SdkCallCx {
|
||||
SdkCallCx {
|
||||
app_id,
|
||||
script_id: ScriptId::new(),
|
||||
principal: None,
|
||||
execution_id: ExecutionId::new(),
|
||||
request_id: RequestId::new(),
|
||||
@@ -314,6 +315,7 @@ mod tests {
|
||||
fn owner_cx(app_id: AppId) -> SdkCallCx {
|
||||
SdkCallCx {
|
||||
app_id,
|
||||
script_id: ScriptId::new(),
|
||||
principal: Some(Principal {
|
||||
user_id: AdminUserId::new(),
|
||||
instance_role: InstanceRole::Owner,
|
||||
@@ -332,6 +334,7 @@ mod tests {
|
||||
fn member_no_role_cx(app_id: AppId) -> SdkCallCx {
|
||||
SdkCallCx {
|
||||
app_id,
|
||||
script_id: ScriptId::new(),
|
||||
principal: Some(Principal {
|
||||
user_id: AdminUserId::new(),
|
||||
instance_role: InstanceRole::Member,
|
||||
|
||||
@@ -22,6 +22,7 @@ pub mod auth_api;
|
||||
pub mod auth_bootstrap;
|
||||
pub mod auth_middleware;
|
||||
pub mod authz;
|
||||
pub mod cron_scheduler;
|
||||
pub mod dead_letter_repo;
|
||||
pub mod dead_letter_service;
|
||||
pub mod dead_letters_api;
|
||||
@@ -30,6 +31,7 @@ pub mod docs_filter;
|
||||
pub mod docs_repo;
|
||||
pub mod docs_service;
|
||||
pub mod gc;
|
||||
pub mod http_service;
|
||||
pub mod kv_repo;
|
||||
pub mod kv_service;
|
||||
pub mod log_sink;
|
||||
@@ -43,6 +45,7 @@ pub mod route_admin;
|
||||
pub mod route_repo;
|
||||
pub mod sandbox;
|
||||
pub mod scheduler;
|
||||
pub mod ssrf;
|
||||
pub mod trigger_config;
|
||||
pub mod trigger_repo;
|
||||
pub mod triggers_api;
|
||||
@@ -84,6 +87,7 @@ pub use auth_middleware::{
|
||||
API_KEY_PREFIX, API_KEY_PREFIX_LEN, SESSION_COOKIE,
|
||||
};
|
||||
pub use authz::{can, require, AuthzDenied, AuthzError, AuthzRepo, Capability, Decision};
|
||||
pub use cron_scheduler::spawn_cron_scheduler;
|
||||
pub use dead_letter_repo::{
|
||||
DeadLetterRepo, DeadLetterRepoError, DeadLetterRow, NewDeadLetter, PostgresDeadLetterRepo,
|
||||
};
|
||||
@@ -93,6 +97,7 @@ pub use dispatcher::{compute_backoff, Dispatcher, DispatcherError};
|
||||
pub use docs_repo::{DocsRepo, DocsRepoError, PostgresDocsRepo};
|
||||
pub use docs_service::DocsServiceImpl;
|
||||
pub use gc::{spawn_abandoned_gc, spawn_dead_letter_gc};
|
||||
pub use http_service::{HttpConfig, HttpServiceImpl};
|
||||
pub use kv_repo::{KvRepo, KvRepoError, PostgresKvRepo};
|
||||
pub use kv_service::KvServiceImpl;
|
||||
pub use log_sink::PostgresExecutionLogSink;
|
||||
|
||||
@@ -25,6 +25,8 @@ pub enum OutboxSourceKind {
|
||||
/// v1.1.2.
|
||||
Docs,
|
||||
DeadLetter,
|
||||
/// v1.1.4.
|
||||
Cron,
|
||||
}
|
||||
|
||||
impl OutboxSourceKind {
|
||||
@@ -35,6 +37,7 @@ impl OutboxSourceKind {
|
||||
Self::Kv => "kv",
|
||||
Self::Docs => "docs",
|
||||
Self::DeadLetter => "dead_letter",
|
||||
Self::Cron => "cron",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +48,7 @@ impl OutboxSourceKind {
|
||||
"kv" => Some(Self::Kv),
|
||||
"docs" => Some(Self::Docs),
|
||||
"dead_letter" => Some(Self::DeadLetter),
|
||||
"cron" => Some(Self::Cron),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
457
crates/manager-core/src/ssrf.rs
Normal file
457
crates/manager-core/src/ssrf.rs
Normal file
@@ -0,0 +1,457 @@
|
||||
//! SSRF deny-list — the load-bearing security mechanism behind the
|
||||
//! v1.1.4 `http::*` SDK.
|
||||
//!
|
||||
//! The policy is applied to the **resolved IP address**, not the
|
||||
//! hostname. That is the DNS-rebinding defense: a hostname that
|
||||
//! resolves to a public IP at lookup time and a private IP at connect
|
||||
//! time is not exploitable, because reqwest re-runs every connection
|
||||
//! (including post-redirect hops) through [`SsrfResolver`], which
|
||||
//! filters the address list before the socket is opened.
|
||||
//!
|
||||
//! [`SsrfPolicy::check`] returns a CIDR-*category* reason on denial
|
||||
//! (e.g. `"loopback"`, `"private"`) — never the IP itself, so the
|
||||
//! script-visible error can't be used to map the internal network.
|
||||
//!
|
||||
//! `PICLOUD_HTTP_ALLOW_PRIVATE=true` flips `allow_private`, which
|
||||
//! short-circuits every check to allow. That is dev/test-only and the
|
||||
//! binary logs a startup warning when it's set.
|
||||
|
||||
use std::future::Future;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
|
||||
|
||||
/// Decision policy for a single resolved IP. Cheap to clone (one bool).
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct SsrfPolicy {
|
||||
/// When true, every address is allowed — the entire deny-list is
|
||||
/// disabled. Set from `PICLOUD_HTTP_ALLOW_PRIVATE`. Dev/test only.
|
||||
pub allow_private: bool,
|
||||
}
|
||||
|
||||
impl SsrfPolicy {
|
||||
#[must_use]
|
||||
pub const fn new(allow_private: bool) -> Self {
|
||||
Self { allow_private }
|
||||
}
|
||||
|
||||
/// `Ok(())` if the IP may be connected to; `Err(reason)` with a
|
||||
/// CIDR-category label otherwise. The reason is safe to surface to
|
||||
/// a script — it never contains the address.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns the deny reason when `ip` falls in a blocked range and
|
||||
/// `allow_private` is false.
|
||||
pub fn check(&self, ip: IpAddr) -> Result<(), &'static str> {
|
||||
if self.allow_private {
|
||||
return Ok(());
|
||||
}
|
||||
match ip {
|
||||
IpAddr::V4(v4) => check_v4(v4),
|
||||
IpAddr::V6(v6) => check_v6(v6),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn is_allowed(&self, ip: IpAddr) -> bool {
|
||||
self.check(ip).is_ok()
|
||||
}
|
||||
}
|
||||
|
||||
/// IPv4 deny-list. Order doesn't matter (ranges are disjoint by
|
||||
/// construction); first match wins for the reason label.
|
||||
// Several arms share a reason ("private") for distinct CIDRs — keeping
|
||||
// them separate documents each blocked range explicitly.
|
||||
#[allow(clippy::match_same_arms)]
|
||||
fn check_v4(ip: Ipv4Addr) -> Result<(), &'static str> {
|
||||
let o = ip.octets();
|
||||
match o {
|
||||
[127, ..] => Err("loopback"),
|
||||
[0, ..] => Err("unspecified"), // 0.0.0.0/8 "this network"
|
||||
[10, ..] => Err("private"),
|
||||
[172, b, ..] if (16..=31).contains(&b) => Err("private"),
|
||||
[192, 168, ..] => Err("private"),
|
||||
[169, 254, ..] => Err("link-local"), // includes cloud metadata 169.254.169.254
|
||||
[100, b, ..] if (64..=127).contains(&b) => Err("carrier-grade-nat"),
|
||||
[224..=239, ..] => Err("multicast"),
|
||||
[240..=255, ..] => Err("reserved"),
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
/// IPv6 deny-list. IPv4-mapped addresses (`::ffff:0:0/96`) re-run the
|
||||
/// v4 deny-list against the embedded address.
|
||||
fn check_v6(ip: Ipv6Addr) -> Result<(), &'static str> {
|
||||
// IPv4-mapped (::ffff:a.b.c.d) — re-check the embedded v4 address
|
||||
// so a mapped private/loopback address can't sneak through.
|
||||
if let Some(v4) = ip.to_ipv4_mapped() {
|
||||
return check_v4(v4);
|
||||
}
|
||||
if ip == Ipv6Addr::LOCALHOST {
|
||||
return Err("loopback");
|
||||
}
|
||||
if ip == Ipv6Addr::UNSPECIFIED {
|
||||
return Err("unspecified");
|
||||
}
|
||||
let seg0 = ip.segments()[0];
|
||||
if seg0 & 0xffc0 == 0xfe80 {
|
||||
return Err("link-local"); // fe80::/10
|
||||
}
|
||||
if seg0 & 0xfe00 == 0xfc00 {
|
||||
return Err("unique-local"); // fc00::/7
|
||||
}
|
||||
if seg0 & 0xff00 == 0xff00 {
|
||||
return Err("multicast"); // ff00::/8
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Marker error returned by the resolver when *every* resolved address
|
||||
/// for a host was denied. reqwest wraps this into a connect error; the
|
||||
/// `http_service` impl walks the source chain for the
|
||||
/// `"blocked by SSRF policy:"` prefix to surface a clean
|
||||
/// [`crate::http_service::HttpError::Ssrf`] instead of a generic DNS
|
||||
/// failure. Keeping the reason a category label means no IP leaks.
|
||||
#[derive(Debug)]
|
||||
struct SsrfBlocked {
|
||||
reason: &'static str,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for SsrfBlocked {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "blocked by SSRF policy: {}", self.reason)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for SsrfBlocked {}
|
||||
|
||||
/// Prefix the resolver embeds in its error and the impl scans for.
|
||||
pub const SSRF_BLOCK_PREFIX: &str = "blocked by SSRF policy: ";
|
||||
|
||||
/// Pluggable host→addresses lookup. Production uses the system
|
||||
/// resolver; tests inject a closure (e.g. to simulate DNS rebinding —
|
||||
/// a different address on a later call).
|
||||
pub type LookupFn = Arc<
|
||||
dyn Fn(String) -> Pin<Box<dyn Future<Output = std::io::Result<Vec<SocketAddr>>> + Send>>
|
||||
+ Send
|
||||
+ Sync,
|
||||
>;
|
||||
|
||||
fn system_lookup(
|
||||
host: String,
|
||||
) -> Pin<Box<dyn Future<Output = std::io::Result<Vec<SocketAddr>>> + Send>> {
|
||||
Box::pin(async move {
|
||||
// Port 0 — reqwest overrides it with the real target port.
|
||||
Ok(tokio::net::lookup_host((host.as_str(), 0u16))
|
||||
.await?
|
||||
.collect())
|
||||
})
|
||||
}
|
||||
|
||||
/// reqwest DNS resolver that delegates to the system resolver, then
|
||||
/// filters the address list through [`SsrfPolicy`]. Plugged in via
|
||||
/// `ClientBuilder::dns_resolver`, so it runs at the actual connection
|
||||
/// point — including for every redirect hop. This is the DNS-rebinding
|
||||
/// defense: filtering happens at connect time, not at URL-parse time.
|
||||
#[derive(Clone)]
|
||||
pub struct SsrfResolver {
|
||||
policy: SsrfPolicy,
|
||||
lookup: LookupFn,
|
||||
}
|
||||
|
||||
impl SsrfResolver {
|
||||
#[must_use]
|
||||
pub fn new(policy: SsrfPolicy) -> Self {
|
||||
Self {
|
||||
policy,
|
||||
lookup: Arc::new(system_lookup),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct with an injected lookup (tests only).
|
||||
#[must_use]
|
||||
pub fn with_lookup(policy: SsrfPolicy, lookup: LookupFn) -> Self {
|
||||
Self { policy, lookup }
|
||||
}
|
||||
}
|
||||
|
||||
impl Resolve for SsrfResolver {
|
||||
fn resolve(&self, name: Name) -> Resolving {
|
||||
let policy = self.policy;
|
||||
let lookup = self.lookup.clone();
|
||||
let host = name.as_str().to_string();
|
||||
Box::pin(async move {
|
||||
let resolved: Vec<SocketAddr> = lookup(host)
|
||||
.await
|
||||
.map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
|
||||
|
||||
// Empty resolution → genuine DNS miss; let reqwest surface
|
||||
// it as a normal "no addresses" error.
|
||||
if resolved.is_empty() {
|
||||
let addrs: Addrs = Box::new(std::iter::empty());
|
||||
return Ok(addrs);
|
||||
}
|
||||
|
||||
let mut allowed: Vec<SocketAddr> = Vec::with_capacity(resolved.len());
|
||||
let mut last_reason: &'static str = "denied";
|
||||
for sa in resolved {
|
||||
match policy.check(sa.ip()) {
|
||||
Ok(()) => allowed.push(sa),
|
||||
Err(reason) => last_reason = reason,
|
||||
}
|
||||
}
|
||||
|
||||
// Resolution returned addresses but the policy denied them
|
||||
// all → fail with the SSRF marker so the impl can report a
|
||||
// policy block (not a generic DNS error).
|
||||
if allowed.is_empty() {
|
||||
let err: Box<dyn std::error::Error + Send + Sync> = Box::new(SsrfBlocked {
|
||||
reason: last_reason,
|
||||
});
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
let addrs: Addrs = Box::new(allowed.into_iter());
|
||||
Ok(addrs)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the resolver. reqwest's `dns_resolver` is generic over a
|
||||
/// concrete `R: Resolve` (it stores `Arc<R>`), so this returns the
|
||||
/// concrete `Arc<SsrfResolver>` rather than a trait object.
|
||||
#[must_use]
|
||||
pub fn resolver(policy: SsrfPolicy) -> Arc<SsrfResolver> {
|
||||
Arc::new(SsrfResolver::new(policy))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::str::FromStr;
|
||||
|
||||
fn denied(ip: &str) -> &'static str {
|
||||
let policy = SsrfPolicy::new(false);
|
||||
policy
|
||||
.check(IpAddr::from_str(ip).unwrap())
|
||||
.expect_err(&format!("{ip} should be denied"))
|
||||
}
|
||||
|
||||
fn allowed(ip: &str) {
|
||||
let policy = SsrfPolicy::new(false);
|
||||
policy
|
||||
.check(IpAddr::from_str(ip).unwrap())
|
||||
.unwrap_or_else(|r| panic!("{ip} should be allowed, denied as {r}"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn denies_ipv4_loopback() {
|
||||
assert_eq!(denied("127.0.0.1"), "loopback");
|
||||
assert_eq!(denied("127.1.2.3"), "loopback");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn denies_ipv4_unspecified() {
|
||||
assert_eq!(denied("0.0.0.0"), "unspecified");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn denies_rfc1918_private() {
|
||||
assert_eq!(denied("10.0.0.1"), "private");
|
||||
assert_eq!(denied("10.255.255.255"), "private");
|
||||
assert_eq!(denied("172.16.0.1"), "private");
|
||||
assert_eq!(denied("172.31.255.255"), "private");
|
||||
assert_eq!(denied("192.168.0.1"), "private");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allows_172_outside_private_range() {
|
||||
// 172.15.x and 172.32.x are public — only 172.16.0.0/12 is private.
|
||||
allowed("172.15.0.1");
|
||||
allowed("172.32.0.1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn denies_link_local_and_cloud_metadata() {
|
||||
assert_eq!(denied("169.254.0.1"), "link-local");
|
||||
// The cloud metadata endpoint is the canonical SSRF target.
|
||||
assert_eq!(denied("169.254.169.254"), "link-local");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn denies_carrier_grade_nat() {
|
||||
assert_eq!(denied("100.64.0.1"), "carrier-grade-nat");
|
||||
assert_eq!(denied("100.127.255.255"), "carrier-grade-nat");
|
||||
// 100.63.x and 100.128.x are outside 100.64.0.0/10.
|
||||
allowed("100.63.0.1");
|
||||
allowed("100.128.0.1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn denies_multicast_and_reserved() {
|
||||
assert_eq!(denied("224.0.0.1"), "multicast");
|
||||
assert_eq!(denied("239.255.255.255"), "multicast");
|
||||
assert_eq!(denied("240.0.0.1"), "reserved");
|
||||
assert_eq!(denied("255.255.255.255"), "reserved");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allows_public_ipv4() {
|
||||
allowed("1.1.1.1");
|
||||
allowed("8.8.8.8");
|
||||
allowed("93.184.216.34"); // example.com
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn denies_ipv6_loopback() {
|
||||
assert_eq!(denied("::1"), "loopback");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn denies_ipv6_unspecified() {
|
||||
assert_eq!(denied("::"), "unspecified");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn denies_ipv6_link_local() {
|
||||
assert_eq!(denied("fe80::1"), "link-local");
|
||||
assert_eq!(denied("febf:ffff::1"), "link-local");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn denies_ipv6_unique_local() {
|
||||
assert_eq!(denied("fc00::1"), "unique-local");
|
||||
assert_eq!(denied("fd12:3456::1"), "unique-local");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn denies_ipv6_multicast() {
|
||||
assert_eq!(denied("ff00::1"), "multicast");
|
||||
assert_eq!(denied("ff02::1"), "multicast");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allows_public_ipv6() {
|
||||
allowed("2606:4700:4700::1111"); // cloudflare
|
||||
allowed("2001:4860:4860::8888"); // google
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ipv4_mapped_ipv6_rechecks_embedded_address() {
|
||||
// ::ffff:127.0.0.1 must be denied via the embedded-v4 re-check.
|
||||
assert_eq!(denied("::ffff:127.0.0.1"), "loopback");
|
||||
assert_eq!(denied("::ffff:10.0.0.1"), "private");
|
||||
assert_eq!(denied("::ffff:169.254.169.254"), "link-local");
|
||||
// A mapped *public* address stays allowed.
|
||||
allowed("::ffff:1.1.1.1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn allow_private_disables_all_denials() {
|
||||
let policy = SsrfPolicy::new(true);
|
||||
for ip in ["127.0.0.1", "10.0.0.1", "169.254.169.254", "::1", "fe80::1"] {
|
||||
assert!(policy.is_allowed(IpAddr::from_str(ip).unwrap()));
|
||||
}
|
||||
}
|
||||
|
||||
// --- resolver-path tests (the connect-time filter) ---
|
||||
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
fn name(s: &str) -> Name {
|
||||
Name::from_str(s).unwrap()
|
||||
}
|
||||
|
||||
fn fixed_lookup(addrs: Vec<SocketAddr>) -> LookupFn {
|
||||
Arc::new(move |_host| {
|
||||
let addrs = addrs.clone();
|
||||
Box::pin(async move { Ok(addrs) })
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn resolver_returns_only_allowed_addresses() {
|
||||
// A host resolving to one public + one private IP yields only
|
||||
// the public one to reqwest.
|
||||
let public: SocketAddr = "1.1.1.1:0".parse().unwrap();
|
||||
let private: SocketAddr = "10.0.0.1:0".parse().unwrap();
|
||||
let resolver =
|
||||
SsrfResolver::with_lookup(SsrfPolicy::new(false), fixed_lookup(vec![public, private]));
|
||||
let got: Vec<SocketAddr> = resolver
|
||||
.resolve(name("mixed.example"))
|
||||
.await
|
||||
.unwrap()
|
||||
.collect();
|
||||
assert_eq!(got, vec![public]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn resolver_all_denied_fails_with_ssrf_marker() {
|
||||
// A host resolving to ONLY private IPs fails with the SSRF
|
||||
// marker (not a generic empty/DNS result).
|
||||
let resolver = SsrfResolver::with_lookup(
|
||||
SsrfPolicy::new(false),
|
||||
fixed_lookup(vec![
|
||||
"10.0.0.1:0".parse().unwrap(),
|
||||
"127.0.0.1:0".parse().unwrap(),
|
||||
]),
|
||||
);
|
||||
let Err(err) = resolver.resolve(name("internal.example")).await else {
|
||||
panic!("all-denied resolution should error");
|
||||
};
|
||||
assert!(
|
||||
err.to_string().starts_with(SSRF_BLOCK_PREFIX),
|
||||
"expected SSRF marker, got: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn resolver_dns_rebinding_second_resolution_denied() {
|
||||
// Simulate rebinding: public IP on the first lookup, private on
|
||||
// the second. The connect-time filter denies the second.
|
||||
let calls = Arc::new(AtomicUsize::new(0));
|
||||
let calls2 = calls.clone();
|
||||
let lookup: LookupFn = Arc::new(move |_host| {
|
||||
let n = calls2.fetch_add(1, Ordering::SeqCst);
|
||||
Box::pin(async move {
|
||||
let addr: SocketAddr = if n == 0 {
|
||||
"1.1.1.1:0".parse().unwrap()
|
||||
} else {
|
||||
"127.0.0.1:0".parse().unwrap()
|
||||
};
|
||||
Ok(vec![addr])
|
||||
})
|
||||
});
|
||||
let resolver = SsrfResolver::with_lookup(SsrfPolicy::new(false), lookup);
|
||||
|
||||
// First resolution: public → allowed.
|
||||
let first: Vec<SocketAddr> = resolver
|
||||
.resolve(name("rebind.example"))
|
||||
.await
|
||||
.unwrap()
|
||||
.collect();
|
||||
assert_eq!(first, vec!["1.1.1.1:0".parse::<SocketAddr>().unwrap()]);
|
||||
|
||||
// Second resolution: rebinding returns loopback → denied.
|
||||
let Err(err) = resolver.resolve(name("rebind.example")).await else {
|
||||
panic!("rebound private address must be denied");
|
||||
};
|
||||
assert!(err.to_string().contains("loopback"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn resolver_empty_resolution_is_not_ssrf() {
|
||||
// Genuine DNS miss (no addresses) returns an empty iterator,
|
||||
// NOT the SSRF marker — reqwest surfaces a normal DNS error.
|
||||
let resolver = SsrfResolver::with_lookup(SsrfPolicy::new(false), fixed_lookup(vec![]));
|
||||
let got: Vec<SocketAddr> = resolver
|
||||
.resolve(name("nxdomain.example"))
|
||||
.await
|
||||
.unwrap()
|
||||
.collect();
|
||||
assert!(got.is_empty());
|
||||
}
|
||||
}
|
||||
@@ -56,6 +56,11 @@ pub struct TriggerConfig {
|
||||
pub dead_letter_retention_days: u32,
|
||||
/// abandoned-execution retention before GC, in days. Default 7.
|
||||
pub abandoned_retention_days: u32,
|
||||
|
||||
/// Cron scheduler poll cadence, in ms (v1.1.4). Default 30 000 —
|
||||
/// real-world cron precision is per-minute, so a 30s tick is fine.
|
||||
/// Floored at 1s by the scheduler.
|
||||
pub cron_tick_interval_ms: u32,
|
||||
}
|
||||
|
||||
impl TriggerConfig {
|
||||
@@ -69,6 +74,7 @@ impl TriggerConfig {
|
||||
retry_jitter_pct: 20,
|
||||
dead_letter_retention_days: 30,
|
||||
abandoned_retention_days: 7,
|
||||
cron_tick_interval_ms: 30_000,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,6 +97,10 @@ impl TriggerConfig {
|
||||
&mut c.abandoned_retention_days,
|
||||
"PICLOUD_ABANDONED_EXECUTIONS_RETENTION_DAYS",
|
||||
);
|
||||
load_u32(
|
||||
&mut c.cron_tick_interval_ms,
|
||||
"PICLOUD_CRON_TICK_INTERVAL_MS",
|
||||
);
|
||||
c
|
||||
}
|
||||
}
|
||||
@@ -141,6 +151,7 @@ mod tests {
|
||||
assert_eq!(c.retry_jitter_pct, 20);
|
||||
assert_eq!(c.dead_letter_retention_days, 30);
|
||||
assert_eq!(c.abandoned_retention_days, 7);
|
||||
assert_eq!(c.cron_tick_interval_ms, 30_000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -49,6 +49,8 @@ pub enum TriggerKind {
|
||||
Kv,
|
||||
Docs,
|
||||
DeadLetter,
|
||||
/// v1.1.4.
|
||||
Cron,
|
||||
}
|
||||
|
||||
impl TriggerKind {
|
||||
@@ -58,6 +60,7 @@ impl TriggerKind {
|
||||
Self::Kv => "kv",
|
||||
Self::Docs => "docs",
|
||||
Self::DeadLetter => "dead_letter",
|
||||
Self::Cron => "cron",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,6 +70,7 @@ impl TriggerKind {
|
||||
"kv" => Some(Self::Kv),
|
||||
"docs" => Some(Self::Docs),
|
||||
"dead_letter" => Some(Self::DeadLetter),
|
||||
"cron" => Some(Self::Cron),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
@@ -108,6 +112,14 @@ pub enum TriggerDetails {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
script_id_filter: Option<ScriptId>,
|
||||
},
|
||||
/// v1.1.4. The 6-field cron schedule + IANA timezone the trigger
|
||||
/// fires on, plus the last enqueue time (for dashboard display).
|
||||
Cron {
|
||||
schedule: String,
|
||||
timezone: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
last_fired_at: Option<DateTime<Utc>>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Create payload for a KV trigger. Defaults applied at the admin
|
||||
@@ -148,6 +160,21 @@ pub struct CreateDeadLetterTrigger {
|
||||
pub registered_by_principal: AdminUserId,
|
||||
}
|
||||
|
||||
/// Create payload for a cron trigger (v1.1.4). `schedule` is a 6-field
|
||||
/// cron expression and `timezone` an IANA name; both are validated
|
||||
/// (by the admin endpoint and defensively by the repo) before insert.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CreateCronTrigger {
|
||||
pub script_id: ScriptId,
|
||||
pub schedule: String,
|
||||
pub timezone: String,
|
||||
pub dispatch_mode: TriggerDispatchMode,
|
||||
pub retry_max_attempts: u32,
|
||||
pub retry_backoff: BackoffShape,
|
||||
pub retry_base_ms: u32,
|
||||
pub registered_by_principal: AdminUserId,
|
||||
}
|
||||
|
||||
/// One match for the dispatcher's "which KV triggers fire on this
|
||||
/// event" lookup. Carries everything the dispatcher needs to construct
|
||||
/// the outbox row.
|
||||
@@ -206,6 +233,15 @@ pub trait TriggerRepo: Send + Sync {
|
||||
req: CreateDeadLetterTrigger,
|
||||
) -> Result<Trigger, TriggerRepoError>;
|
||||
|
||||
/// v1.1.4. `schedule` + `timezone` are validated before insert; an
|
||||
/// invalid expression or unknown IANA name returns
|
||||
/// `TriggerRepoError::Invalid`.
|
||||
async fn create_cron_trigger(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
req: CreateCronTrigger,
|
||||
) -> Result<Trigger, TriggerRepoError>;
|
||||
|
||||
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError>;
|
||||
|
||||
async fn get(&self, id: TriggerId) -> Result<Option<Trigger>, TriggerRepoError>;
|
||||
@@ -453,6 +489,72 @@ impl TriggerRepo for PostgresTriggerRepo {
|
||||
})
|
||||
}
|
||||
|
||||
async fn create_cron_trigger(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
req: CreateCronTrigger,
|
||||
) -> Result<Trigger, TriggerRepoError> {
|
||||
// Defense-in-depth validation (the admin endpoint validates too).
|
||||
crate::cron_scheduler::validate_schedule(&req.schedule)
|
||||
.map_err(|e| TriggerRepoError::Invalid(format!("invalid cron schedule: {e}")))?;
|
||||
crate::cron_scheduler::validate_timezone(&req.timezone)
|
||||
.map_err(|e| TriggerRepoError::Invalid(format!("invalid timezone: {e}")))?;
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let parent: TriggerRow = sqlx::query_as(
|
||||
"INSERT INTO triggers ( \
|
||||
app_id, script_id, kind, enabled, dispatch_mode, \
|
||||
retry_max_attempts, retry_backoff, retry_base_ms, \
|
||||
registered_by_principal \
|
||||
) VALUES ($1, $2, 'cron', TRUE, $3, $4, $5, $6, $7) \
|
||||
RETURNING id, app_id, script_id, kind, enabled, dispatch_mode, \
|
||||
retry_max_attempts, retry_backoff, retry_base_ms, \
|
||||
registered_by_principal, created_at, updated_at",
|
||||
)
|
||||
.bind(app_id.into_inner())
|
||||
.bind(req.script_id.into_inner())
|
||||
.bind(req.dispatch_mode.as_str())
|
||||
.bind(i32::try_from(req.retry_max_attempts).unwrap_or(3))
|
||||
.bind(req.retry_backoff.as_str())
|
||||
.bind(i32::try_from(req.retry_base_ms).unwrap_or(1000))
|
||||
.bind(req.registered_by_principal.into_inner())
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO cron_trigger_details (trigger_id, schedule, timezone) \
|
||||
VALUES ($1, $2, $3)",
|
||||
)
|
||||
.bind(parent.id)
|
||||
.bind(&req.schedule)
|
||||
.bind(&req.timezone)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(Trigger {
|
||||
id: parent.id.into(),
|
||||
app_id: parent.app_id.into(),
|
||||
script_id: parent.script_id.into(),
|
||||
kind: TriggerKind::Cron,
|
||||
enabled: parent.enabled,
|
||||
dispatch_mode: dispatch_from_str(&parent.dispatch_mode),
|
||||
retry_max_attempts: u32::try_from(parent.retry_max_attempts).unwrap_or(3),
|
||||
retry_backoff: BackoffShape::from_wire(&parent.retry_backoff)
|
||||
.unwrap_or(BackoffShape::Exponential),
|
||||
retry_base_ms: u32::try_from(parent.retry_base_ms).unwrap_or(1000),
|
||||
registered_by_principal: parent.registered_by_principal.into(),
|
||||
created_at: parent.created_at,
|
||||
updated_at: parent.updated_at,
|
||||
details: TriggerDetails::Cron {
|
||||
schedule: req.schedule,
|
||||
timezone: req.timezone,
|
||||
last_fired_at: None,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError> {
|
||||
let parents: Vec<TriggerRow> = sqlx::query_as(
|
||||
"SELECT id, app_id, script_id, kind, enabled, dispatch_mode, \
|
||||
@@ -681,6 +783,20 @@ async fn hydrate_one(pool: &PgPool, parent: TriggerRow) -> Result<Trigger, Trigg
|
||||
script_id_filter: row.script_id_filter.map(Into::into),
|
||||
}
|
||||
}
|
||||
TriggerKind::Cron => {
|
||||
let row: CronDetailRow = sqlx::query_as(
|
||||
"SELECT schedule, timezone, last_fired_at \
|
||||
FROM cron_trigger_details WHERE trigger_id = $1",
|
||||
)
|
||||
.bind(parent.id)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
TriggerDetails::Cron {
|
||||
schedule: row.schedule,
|
||||
timezone: row.timezone,
|
||||
last_fired_at: row.last_fired_at,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Trigger {
|
||||
@@ -746,6 +862,13 @@ struct KvDetailRow {
|
||||
ops: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct CronDetailRow {
|
||||
schedule: String,
|
||||
timezone: String,
|
||||
last_fired_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
#[allow(clippy::struct_field_names)]
|
||||
struct DlDetailRow {
|
||||
|
||||
@@ -25,8 +25,8 @@ use crate::authz::{require, AuthzDenied, AuthzError, AuthzRepo, Capability};
|
||||
use crate::repo::{ScriptRepository, ScriptRepositoryError};
|
||||
use crate::trigger_config::{BackoffShape, TriggerConfig};
|
||||
use crate::trigger_repo::{
|
||||
CreateDeadLetterTrigger, CreateDocsTrigger, CreateKvTrigger, Trigger, TriggerDispatchMode,
|
||||
TriggerRepo, TriggerRepoError,
|
||||
CreateCronTrigger, CreateDeadLetterTrigger, CreateDocsTrigger, CreateKvTrigger, Trigger,
|
||||
TriggerDispatchMode, TriggerRepo, TriggerRepoError,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -53,6 +53,7 @@ pub fn triggers_router(state: TriggersState) -> Router {
|
||||
)
|
||||
.route("/apps/{app_id}/triggers/kv", post(create_kv_trigger))
|
||||
.route("/apps/{app_id}/triggers/docs", post(create_docs_trigger))
|
||||
.route("/apps/{app_id}/triggers/cron", post(create_cron_trigger))
|
||||
.route(
|
||||
"/apps/{app_id}/triggers/dead_letter",
|
||||
post(create_dl_trigger),
|
||||
@@ -116,6 +117,28 @@ pub struct CreateDocsTriggerRequest {
|
||||
pub retry_base_ms: Option<u32>,
|
||||
}
|
||||
|
||||
/// v1.1.4 cron trigger. `schedule` is a 6-field cron expression (with
|
||||
/// seconds); `timezone` is an IANA name (defaults to UTC if omitted).
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct CreateCronTriggerRequest {
|
||||
pub script_id: ScriptId,
|
||||
pub schedule: String,
|
||||
#[serde(default = "default_timezone")]
|
||||
pub timezone: String,
|
||||
#[serde(default = "default_dispatch")]
|
||||
pub dispatch_mode: TriggerDispatchMode,
|
||||
#[serde(default)]
|
||||
pub retry_max_attempts: Option<u32>,
|
||||
#[serde(default)]
|
||||
pub retry_backoff: Option<BackoffShape>,
|
||||
#[serde(default)]
|
||||
pub retry_base_ms: Option<u32>,
|
||||
}
|
||||
|
||||
fn default_timezone() -> String {
|
||||
"UTC".to_string()
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct CreateDeadLetterTriggerRequest {
|
||||
pub script_id: ScriptId,
|
||||
@@ -264,6 +287,47 @@ async fn create_docs_trigger(
|
||||
Ok((StatusCode::CREATED, Json(created)))
|
||||
}
|
||||
|
||||
async fn create_cron_trigger(
|
||||
State(s): State<TriggersState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
Path(app_id): Path<AppId>,
|
||||
Json(input): Json<CreateCronTriggerRequest>,
|
||||
) -> Result<(StatusCode, Json<Trigger>), TriggersApiError> {
|
||||
ensure_app_exists(&*s.apps, app_id).await?;
|
||||
require(
|
||||
s.authz.as_ref(),
|
||||
&principal,
|
||||
Capability::AppManageTriggers(app_id),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Validate the schedule + timezone before touching the script repo
|
||||
// so a bad expression fails fast with a clear 422.
|
||||
crate::cron_scheduler::validate_schedule(&input.schedule)
|
||||
.map_err(|e| TriggersApiError::Invalid(format!("invalid cron schedule: {e}")))?;
|
||||
crate::cron_scheduler::validate_timezone(&input.timezone)
|
||||
.map_err(|e| TriggersApiError::Invalid(format!("invalid timezone: {e}")))?;
|
||||
|
||||
// v1.1.3 check: target script exists, lives in this app, is an
|
||||
// endpoint (not a module).
|
||||
validate_trigger_target(&*s.scripts, app_id, input.script_id).await?;
|
||||
|
||||
let req = CreateCronTrigger {
|
||||
script_id: input.script_id,
|
||||
schedule: input.schedule,
|
||||
timezone: input.timezone,
|
||||
dispatch_mode: input.dispatch_mode,
|
||||
retry_max_attempts: input
|
||||
.retry_max_attempts
|
||||
.unwrap_or(s.config.retry_max_attempts),
|
||||
retry_backoff: input.retry_backoff.unwrap_or(s.config.retry_backoff),
|
||||
retry_base_ms: input.retry_base_ms.unwrap_or(s.config.retry_base_ms),
|
||||
registered_by_principal: principal.user_id,
|
||||
};
|
||||
let created = s.triggers.create_cron_trigger(app_id, req).await?;
|
||||
Ok((StatusCode::CREATED, Json(created)))
|
||||
}
|
||||
|
||||
async fn create_dl_trigger(
|
||||
State(s): State<TriggersState>,
|
||||
Extension(principal): Extension<Principal>,
|
||||
@@ -420,8 +484,8 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::app_repo::{AppLookup, AppRepository};
|
||||
use crate::trigger_repo::{
|
||||
DeadLetterTriggerMatch, DocsTriggerMatch, KvTriggerMatch, Trigger, TriggerDetails,
|
||||
TriggerRepo, TriggerRepoError,
|
||||
CreateCronTrigger, DeadLetterTriggerMatch, DocsTriggerMatch, KvTriggerMatch, Trigger,
|
||||
TriggerDetails, TriggerRepo, TriggerRepoError,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use chrono::Utc;
|
||||
@@ -523,6 +587,35 @@ mod tests {
|
||||
self.inner.lock().await.insert(id, trigger.clone());
|
||||
Ok(trigger)
|
||||
}
|
||||
async fn create_cron_trigger(
|
||||
&self,
|
||||
app_id: AppId,
|
||||
req: CreateCronTrigger,
|
||||
) -> Result<Trigger, TriggerRepoError> {
|
||||
let now = Utc::now();
|
||||
let id = TriggerId::new();
|
||||
let trigger = Trigger {
|
||||
id,
|
||||
app_id,
|
||||
script_id: req.script_id,
|
||||
kind: crate::trigger_repo::TriggerKind::Cron,
|
||||
enabled: true,
|
||||
dispatch_mode: req.dispatch_mode,
|
||||
retry_max_attempts: req.retry_max_attempts,
|
||||
retry_backoff: req.retry_backoff,
|
||||
retry_base_ms: req.retry_base_ms,
|
||||
registered_by_principal: req.registered_by_principal,
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
details: TriggerDetails::Cron {
|
||||
schedule: req.schedule,
|
||||
timezone: req.timezone,
|
||||
last_fired_at: None,
|
||||
},
|
||||
};
|
||||
self.inner.lock().await.insert(id, trigger.clone());
|
||||
Ok(trigger)
|
||||
}
|
||||
async fn list_for_app(&self, app_id: AppId) -> Result<Vec<Trigger>, TriggerRepoError> {
|
||||
Ok(self
|
||||
.inner
|
||||
@@ -1281,6 +1374,169 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------
|
||||
// v1.1.4: cron trigger create.
|
||||
// ----------------------------------------------------------------
|
||||
|
||||
fn cron_req(script_id: ScriptId, schedule: &str, timezone: &str) -> CreateCronTriggerRequest {
|
||||
CreateCronTriggerRequest {
|
||||
script_id,
|
||||
schedule: schedule.into(),
|
||||
timezone: timezone.into(),
|
||||
dispatch_mode: TriggerDispatchMode::Async,
|
||||
retry_max_attempts: None,
|
||||
retry_backoff: None,
|
||||
retry_base_ms: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cron_trigger_create_succeeds() {
|
||||
let app_id = AppId::new();
|
||||
let script_id = ScriptId::new();
|
||||
let state = state_with_endpoint(Arc::new(AlwaysAllowAuthzRepo), app_id, script_id);
|
||||
let (status, Json(trigger)) = create_cron_trigger(
|
||||
State(state),
|
||||
Extension(member_principal()),
|
||||
Path(app_id),
|
||||
Json(cron_req(
|
||||
script_id,
|
||||
"0 0 9 * * MON-FRI",
|
||||
"America/Los_Angeles",
|
||||
)),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(status, StatusCode::CREATED);
|
||||
assert!(matches!(
|
||||
trigger.kind,
|
||||
crate::trigger_repo::TriggerKind::Cron
|
||||
));
|
||||
match trigger.details {
|
||||
TriggerDetails::Cron {
|
||||
schedule,
|
||||
timezone,
|
||||
last_fired_at,
|
||||
} => {
|
||||
assert_eq!(schedule, "0 0 9 * * MON-FRI");
|
||||
assert_eq!(timezone, "America/Los_Angeles");
|
||||
assert!(last_fired_at.is_none());
|
||||
}
|
||||
other => panic!("expected Cron details, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cron_trigger_rejects_invalid_schedule() {
|
||||
let app_id = AppId::new();
|
||||
let script_id = ScriptId::new();
|
||||
let state = state_with_endpoint(Arc::new(AlwaysAllowAuthzRepo), app_id, script_id);
|
||||
let res = create_cron_trigger(
|
||||
State(state),
|
||||
Extension(member_principal()),
|
||||
Path(app_id),
|
||||
// 5-field expression — not the 6-field format we accept.
|
||||
Json(cron_req(script_id, "* * * * *", "UTC")),
|
||||
)
|
||||
.await;
|
||||
let err = res.expect_err("invalid schedule should reject");
|
||||
let msg = match err {
|
||||
TriggersApiError::Invalid(m) => m,
|
||||
other => panic!("expected Invalid, got {other:?}"),
|
||||
};
|
||||
assert!(msg.to_lowercase().contains("schedule"), "got {msg}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cron_trigger_rejects_unknown_timezone() {
|
||||
let app_id = AppId::new();
|
||||
let script_id = ScriptId::new();
|
||||
let state = state_with_endpoint(Arc::new(AlwaysAllowAuthzRepo), app_id, script_id);
|
||||
let res = create_cron_trigger(
|
||||
State(state),
|
||||
Extension(member_principal()),
|
||||
Path(app_id),
|
||||
Json(cron_req(script_id, "0 * * * * *", "Mars/Phobos")),
|
||||
)
|
||||
.await;
|
||||
let err = res.expect_err("unknown timezone should reject");
|
||||
let msg = match err {
|
||||
TriggersApiError::Invalid(m) => m,
|
||||
other => panic!("expected Invalid, got {other:?}"),
|
||||
};
|
||||
assert!(msg.to_lowercase().contains("timezone"), "got {msg}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cron_trigger_rejects_module_target() {
|
||||
let app_id = AppId::new();
|
||||
let script_id = ScriptId::new();
|
||||
let state = TriggersState {
|
||||
triggers: Arc::new(InMemoryTriggerRepo::default()),
|
||||
apps: InMemoryAppRepo::with(app_id),
|
||||
authz: Arc::new(AlwaysAllowAuthzRepo),
|
||||
scripts: InMemoryScriptRepo::with_module(app_id, script_id),
|
||||
config: TriggerConfig::conservative(),
|
||||
};
|
||||
let res = create_cron_trigger(
|
||||
State(state),
|
||||
Extension(member_principal()),
|
||||
Path(app_id),
|
||||
Json(cron_req(script_id, "0 * * * * *", "UTC")),
|
||||
)
|
||||
.await;
|
||||
let err = res.expect_err("module script should be rejected as cron target");
|
||||
let msg = match err {
|
||||
TriggersApiError::Invalid(m) => m,
|
||||
other => panic!("expected Invalid, got {other:?}"),
|
||||
};
|
||||
assert!(msg.to_lowercase().contains("module"), "got {msg}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cron_trigger_rejects_cross_app_script() {
|
||||
// v1.1.3 isolation gap regression: app A cannot target app B's
|
||||
// script via a cron trigger.
|
||||
let app_a = AppId::new();
|
||||
let app_b = AppId::new();
|
||||
let script_id = ScriptId::new();
|
||||
let state = TriggersState {
|
||||
triggers: Arc::new(InMemoryTriggerRepo::default()),
|
||||
apps: InMemoryAppRepo::with(app_a),
|
||||
authz: Arc::new(AlwaysAllowAuthzRepo),
|
||||
scripts: InMemoryScriptRepo::with_endpoint(app_b, script_id),
|
||||
config: TriggerConfig::conservative(),
|
||||
};
|
||||
let res = create_cron_trigger(
|
||||
State(state),
|
||||
Extension(member_principal()),
|
||||
Path(app_a),
|
||||
Json(cron_req(script_id, "0 * * * * *", "UTC")),
|
||||
)
|
||||
.await;
|
||||
let err = res.expect_err("cross-app cron target should reject");
|
||||
let msg = match err {
|
||||
TriggersApiError::Invalid(m) => m,
|
||||
other => panic!("expected Invalid, got {other:?}"),
|
||||
};
|
||||
assert!(msg.to_lowercase().contains("does not belong"), "got {msg}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cron_trigger_member_without_role_is_forbidden() {
|
||||
let app_id = AppId::new();
|
||||
let state = state_with(Arc::new(AlwaysDenyAuthzRepo), app_id);
|
||||
let res = create_cron_trigger(
|
||||
State(state),
|
||||
Extension(member_principal()),
|
||||
Path(app_id),
|
||||
Json(cron_req(ScriptId::new(), "0 * * * * *", "UTC")),
|
||||
)
|
||||
.await;
|
||||
let err = res.expect_err("member without role should be forbidden");
|
||||
assert!(matches!(err, TriggersApiError::Forbidden));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn kv_trigger_accepts_endpoint_target() {
|
||||
let app_id = AppId::new();
|
||||
|
||||
@@ -3,6 +3,16 @@
|
||||
|
||||
## tables
|
||||
|
||||
table: abandoned_executions
|
||||
id: uuid NOT NULL default=gen_random_uuid()
|
||||
app_id: uuid NOT NULL
|
||||
outbox_id: uuid NOT NULL
|
||||
script_id: uuid NULL
|
||||
inbox_id: uuid NOT NULL
|
||||
status_code: integer NOT NULL
|
||||
result_summary: text NULL
|
||||
created_at: timestamp with time zone NOT NULL default=now()
|
||||
|
||||
table: admin_sessions
|
||||
token_hash: text NOT NULL
|
||||
user_id: uuid NOT NULL
|
||||
@@ -61,6 +71,48 @@ table: apps
|
||||
created_at: timestamp with time zone NOT NULL default=now()
|
||||
updated_at: timestamp with time zone NOT NULL default=now()
|
||||
|
||||
table: cron_trigger_details
|
||||
trigger_id: uuid NOT NULL
|
||||
schedule: text NOT NULL
|
||||
timezone: text NOT NULL default='UTC'::text
|
||||
last_fired_at: timestamp with time zone NULL
|
||||
|
||||
table: dead_letter_trigger_details
|
||||
trigger_id: uuid NOT NULL
|
||||
source_filter: text NULL
|
||||
trigger_id_filter: uuid NULL
|
||||
script_id_filter: uuid NULL
|
||||
|
||||
table: dead_letters
|
||||
id: uuid NOT NULL default=gen_random_uuid()
|
||||
app_id: uuid NOT NULL
|
||||
original_event_id: uuid NOT NULL
|
||||
source: text NOT NULL
|
||||
op: text NOT NULL
|
||||
trigger_id: uuid NULL
|
||||
script_id: uuid NULL
|
||||
payload: jsonb NOT NULL
|
||||
attempt_count: integer NOT NULL
|
||||
first_attempt_at: timestamp with time zone NOT NULL
|
||||
last_attempt_at: timestamp with time zone NOT NULL
|
||||
last_error: text NOT NULL
|
||||
created_at: timestamp with time zone NOT NULL default=now()
|
||||
resolved_at: timestamp with time zone NULL
|
||||
resolution: text NULL
|
||||
|
||||
table: docs
|
||||
app_id: uuid NOT NULL
|
||||
collection: text NOT NULL
|
||||
id: uuid NOT NULL
|
||||
data: jsonb NOT NULL
|
||||
created_at: timestamp with time zone NOT NULL default=now()
|
||||
updated_at: timestamp with time zone NOT NULL default=now()
|
||||
|
||||
table: docs_trigger_details
|
||||
trigger_id: uuid NOT NULL
|
||||
collection_glob: text NOT NULL
|
||||
ops: ARRAY NOT NULL
|
||||
|
||||
table: execution_logs
|
||||
id: uuid NOT NULL default=gen_random_uuid()
|
||||
script_id: uuid NOT NULL
|
||||
@@ -76,6 +128,36 @@ table: execution_logs
|
||||
created_at: timestamp with time zone NOT NULL default=now()
|
||||
app_id: uuid NOT NULL
|
||||
|
||||
table: kv_entries
|
||||
app_id: uuid NOT NULL
|
||||
collection: text NOT NULL
|
||||
key: text NOT NULL
|
||||
value: jsonb NOT NULL
|
||||
created_at: timestamp with time zone NOT NULL default=now()
|
||||
updated_at: timestamp with time zone NOT NULL default=now()
|
||||
|
||||
table: kv_trigger_details
|
||||
trigger_id: uuid NOT NULL
|
||||
collection_glob: text NOT NULL
|
||||
ops: ARRAY NOT NULL
|
||||
|
||||
table: outbox
|
||||
id: uuid NOT NULL default=gen_random_uuid()
|
||||
app_id: uuid NOT NULL
|
||||
source_kind: text NOT NULL
|
||||
trigger_id: uuid NULL
|
||||
script_id: uuid NULL
|
||||
reply_to: uuid NULL
|
||||
payload: jsonb NOT NULL
|
||||
origin_principal: uuid NULL
|
||||
trigger_depth: integer NOT NULL default=0
|
||||
root_execution_id: uuid NULL
|
||||
attempt_count: integer NOT NULL default=0
|
||||
next_attempt_at: timestamp with time zone NOT NULL default=now()
|
||||
claimed_at: timestamp with time zone NULL
|
||||
claimed_by: text NULL
|
||||
created_at: timestamp with time zone NOT NULL default=now()
|
||||
|
||||
table: routes
|
||||
id: uuid NOT NULL default=gen_random_uuid()
|
||||
script_id: uuid NOT NULL
|
||||
@@ -87,6 +169,13 @@ table: routes
|
||||
method: text NULL
|
||||
created_at: timestamp with time zone NOT NULL default=now()
|
||||
app_id: uuid NOT NULL
|
||||
dispatch_mode: text NOT NULL default='sync'::text
|
||||
|
||||
table: script_imports
|
||||
app_id: uuid NOT NULL
|
||||
importer_script_id: uuid NOT NULL
|
||||
imported_script_id: uuid NOT NULL
|
||||
created_at: timestamp with time zone NOT NULL default=now()
|
||||
|
||||
table: scripts
|
||||
id: uuid NOT NULL default=gen_random_uuid()
|
||||
@@ -100,9 +189,28 @@ table: scripts
|
||||
updated_at: timestamp with time zone NOT NULL default=now()
|
||||
sandbox: jsonb NOT NULL default='{}'::jsonb
|
||||
app_id: uuid NOT NULL
|
||||
kind: text NOT NULL default='endpoint'::text
|
||||
|
||||
table: triggers
|
||||
id: uuid NOT NULL default=gen_random_uuid()
|
||||
app_id: uuid NOT NULL
|
||||
script_id: uuid NOT NULL
|
||||
kind: text NOT NULL
|
||||
enabled: boolean NOT NULL default=true
|
||||
dispatch_mode: text NOT NULL default='async'::text
|
||||
retry_max_attempts: integer NOT NULL
|
||||
retry_backoff: text NOT NULL
|
||||
retry_base_ms: integer NOT NULL
|
||||
registered_by_principal: uuid NOT NULL
|
||||
created_at: timestamp with time zone NOT NULL default=now()
|
||||
updated_at: timestamp with time zone NOT NULL default=now()
|
||||
|
||||
## indexes
|
||||
|
||||
indexes on abandoned_executions:
|
||||
abandoned_executions_pkey: public.abandoned_executions USING btree (id)
|
||||
idx_abandoned_executions_gc: public.abandoned_executions USING btree (created_at)
|
||||
|
||||
indexes on admin_sessions:
|
||||
admin_sessions_expiry_idx: public.admin_sessions USING btree (expires_at)
|
||||
admin_sessions_pkey: public.admin_sessions USING btree (token_hash)
|
||||
@@ -135,11 +243,43 @@ indexes on apps:
|
||||
apps_pkey: public.apps USING btree (id)
|
||||
apps_slug_key: public.apps USING btree (slug)
|
||||
|
||||
indexes on cron_trigger_details:
|
||||
cron_trigger_details_pkey: public.cron_trigger_details USING btree (trigger_id)
|
||||
idx_cron_triggers_due: public.cron_trigger_details USING btree (last_fired_at)
|
||||
|
||||
indexes on dead_letter_trigger_details:
|
||||
dead_letter_trigger_details_pkey: public.dead_letter_trigger_details USING btree (trigger_id)
|
||||
|
||||
indexes on dead_letters:
|
||||
dead_letters_pkey: public.dead_letters USING btree (id)
|
||||
idx_dead_letters_app_unresolved: public.dead_letters USING btree (app_id) WHERE (resolved_at IS NULL)
|
||||
idx_dead_letters_gc: public.dead_letters USING btree (created_at)
|
||||
|
||||
indexes on docs:
|
||||
docs_pkey: public.docs USING btree (app_id, collection, id)
|
||||
idx_docs_app_collection: public.docs USING btree (app_id, collection)
|
||||
idx_docs_data_gin: public.docs USING gin (data jsonb_path_ops)
|
||||
|
||||
indexes on docs_trigger_details:
|
||||
docs_trigger_details_pkey: public.docs_trigger_details USING btree (trigger_id)
|
||||
|
||||
indexes on execution_logs:
|
||||
execution_logs_app_id_created_at_idx: public.execution_logs USING btree (app_id, created_at DESC)
|
||||
execution_logs_pkey: public.execution_logs USING btree (id)
|
||||
execution_logs_script_id_created_at_idx: public.execution_logs USING btree (script_id, created_at DESC)
|
||||
|
||||
indexes on kv_entries:
|
||||
idx_kv_entries_app_collection: public.kv_entries USING btree (app_id, collection)
|
||||
kv_entries_pkey: public.kv_entries USING btree (app_id, collection, key)
|
||||
|
||||
indexes on kv_trigger_details:
|
||||
kv_trigger_details_pkey: public.kv_trigger_details USING btree (trigger_id)
|
||||
|
||||
indexes on outbox:
|
||||
idx_outbox_app: public.outbox USING btree (app_id)
|
||||
idx_outbox_due: public.outbox USING btree (next_attempt_at) WHERE (claimed_at IS NULL)
|
||||
outbox_pkey: public.outbox USING btree (id)
|
||||
|
||||
indexes on routes:
|
||||
routes_app_id_idx: public.routes USING btree (app_id)
|
||||
routes_lookup_idx: public.routes USING btree (host_kind, host)
|
||||
@@ -147,13 +287,27 @@ indexes on routes:
|
||||
routes_script_id_idx: public.routes USING btree (script_id)
|
||||
routes_unique_binding_idx: public.routes USING btree (app_id, host_kind, host, path_kind, path, COALESCE(method, ''::text))
|
||||
|
||||
indexes on script_imports:
|
||||
idx_script_imports_app: public.script_imports USING btree (app_id)
|
||||
idx_script_imports_imported: public.script_imports USING btree (imported_script_id)
|
||||
script_imports_pkey: public.script_imports USING btree (importer_script_id, imported_script_id)
|
||||
|
||||
indexes on scripts:
|
||||
idx_scripts_app_kind: public.scripts USING btree (app_id, kind)
|
||||
scripts_app_id_idx: public.scripts USING btree (app_id)
|
||||
scripts_name_uidx: public.scripts USING btree (app_id, lower(name))
|
||||
scripts_pkey: public.scripts USING btree (id)
|
||||
|
||||
indexes on triggers:
|
||||
idx_triggers_app_kind_enabled: public.triggers USING btree (app_id, kind) WHERE (enabled = true)
|
||||
triggers_pkey: public.triggers USING btree (id)
|
||||
|
||||
## constraints
|
||||
|
||||
constraints on abandoned_executions:
|
||||
[FOREIGN KEY] abandoned_executions_app_id_fkey: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] abandoned_executions_pkey: PRIMARY KEY (id)
|
||||
|
||||
constraints on admin_sessions:
|
||||
[FOREIGN KEY] admin_sessions_user_id_fkey: FOREIGN KEY (user_id) REFERENCES admin_users(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] admin_sessions_pkey: PRIMARY KEY (token_hash)
|
||||
@@ -189,25 +343,77 @@ constraints on apps:
|
||||
[PRIMARY KEY] apps_pkey: PRIMARY KEY (id)
|
||||
[UNIQUE] apps_slug_key: UNIQUE (slug)
|
||||
|
||||
constraints on cron_trigger_details:
|
||||
[FOREIGN KEY] cron_trigger_details_trigger_id_fkey: FOREIGN KEY (trigger_id) REFERENCES triggers(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] cron_trigger_details_pkey: PRIMARY KEY (trigger_id)
|
||||
|
||||
constraints on dead_letter_trigger_details:
|
||||
[FOREIGN KEY] dead_letter_trigger_details_trigger_id_fkey: FOREIGN KEY (trigger_id) REFERENCES triggers(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] dead_letter_trigger_details_pkey: PRIMARY KEY (trigger_id)
|
||||
|
||||
constraints on dead_letters:
|
||||
[CHECK] dead_letters_resolution_check: CHECK ((resolution = ANY (ARRAY['replayed'::text, 'ignored'::text, 'handled_by_script'::text, 'handler_failed'::text])))
|
||||
[FOREIGN KEY] dead_letters_app_id_fkey: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] dead_letters_pkey: PRIMARY KEY (id)
|
||||
|
||||
constraints on docs:
|
||||
[FOREIGN KEY] docs_app_id_fkey: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] docs_pkey: PRIMARY KEY (app_id, collection, id)
|
||||
|
||||
constraints on docs_trigger_details:
|
||||
[FOREIGN KEY] docs_trigger_details_trigger_id_fkey: FOREIGN KEY (trigger_id) REFERENCES triggers(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] docs_trigger_details_pkey: PRIMARY KEY (trigger_id)
|
||||
|
||||
constraints on execution_logs:
|
||||
[CHECK] execution_logs_status_check: CHECK ((status = ANY (ARRAY['success'::text, 'error'::text, 'timeout'::text, 'budget_exceeded'::text])))
|
||||
[FOREIGN KEY] execution_logs_app_id_fk: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
|
||||
[FOREIGN KEY] execution_logs_script_id_fkey: FOREIGN KEY (script_id) REFERENCES scripts(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] execution_logs_pkey: PRIMARY KEY (id)
|
||||
|
||||
constraints on kv_entries:
|
||||
[FOREIGN KEY] kv_entries_app_id_fkey: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] kv_entries_pkey: PRIMARY KEY (app_id, collection, key)
|
||||
|
||||
constraints on kv_trigger_details:
|
||||
[FOREIGN KEY] kv_trigger_details_trigger_id_fkey: FOREIGN KEY (trigger_id) REFERENCES triggers(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] kv_trigger_details_pkey: PRIMARY KEY (trigger_id)
|
||||
|
||||
constraints on outbox:
|
||||
[CHECK] outbox_source_kind_check: CHECK ((source_kind = ANY (ARRAY['http'::text, 'kv'::text, 'dead_letter'::text, 'docs'::text, 'cron'::text])))
|
||||
[FOREIGN KEY] outbox_app_id_fkey: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] outbox_pkey: PRIMARY KEY (id)
|
||||
|
||||
constraints on routes:
|
||||
[CHECK] routes_dispatch_mode_check: CHECK ((dispatch_mode = ANY (ARRAY['sync'::text, 'async'::text])))
|
||||
[CHECK] routes_host_kind_check: CHECK ((host_kind = ANY (ARRAY['any'::text, 'strict'::text, 'wildcard'::text])))
|
||||
[CHECK] routes_path_kind_check: CHECK ((path_kind = ANY (ARRAY['exact'::text, 'prefix'::text, 'param'::text])))
|
||||
[FOREIGN KEY] routes_app_id_fk: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
|
||||
[FOREIGN KEY] routes_script_id_fkey: FOREIGN KEY (script_id) REFERENCES scripts(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] routes_pkey: PRIMARY KEY (id)
|
||||
|
||||
constraints on script_imports:
|
||||
[FOREIGN KEY] script_imports_app_id_fkey: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
|
||||
[FOREIGN KEY] script_imports_imported_script_id_fkey: FOREIGN KEY (imported_script_id) REFERENCES scripts(id) ON DELETE CASCADE
|
||||
[FOREIGN KEY] script_imports_importer_script_id_fkey: FOREIGN KEY (importer_script_id) REFERENCES scripts(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] script_imports_pkey: PRIMARY KEY (importer_script_id, imported_script_id)
|
||||
|
||||
constraints on scripts:
|
||||
[CHECK] scripts_kind_check: CHECK ((kind = ANY (ARRAY['endpoint'::text, 'module'::text])))
|
||||
[CHECK] scripts_memory_limit_mb_check: CHECK (((memory_limit_mb > 0) AND (memory_limit_mb <= 2048)))
|
||||
[CHECK] scripts_module_name_shape: CHECK (((kind <> 'module'::text) OR (name ~ '^[a-zA-Z_][a-zA-Z0-9_]{0,63}$'::text)))
|
||||
[CHECK] scripts_timeout_seconds_check: CHECK (((timeout_seconds > 0) AND (timeout_seconds <= 300)))
|
||||
[FOREIGN KEY] scripts_app_id_fk: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE RESTRICT
|
||||
[PRIMARY KEY] scripts_pkey: PRIMARY KEY (id)
|
||||
|
||||
constraints on triggers:
|
||||
[CHECK] triggers_dispatch_mode_check: CHECK ((dispatch_mode = ANY (ARRAY['sync'::text, 'async'::text])))
|
||||
[CHECK] triggers_kind_check: CHECK ((kind = ANY (ARRAY['kv'::text, 'dead_letter'::text, 'docs'::text, 'cron'::text])))
|
||||
[CHECK] triggers_retry_backoff_check: CHECK ((retry_backoff = ANY (ARRAY['exponential'::text, 'linear'::text, 'constant'::text])))
|
||||
[FOREIGN KEY] triggers_app_id_fkey: FOREIGN KEY (app_id) REFERENCES apps(id) ON DELETE CASCADE
|
||||
[FOREIGN KEY] triggers_registered_by_principal_fkey: FOREIGN KEY (registered_by_principal) REFERENCES admin_users(id) ON DELETE CASCADE
|
||||
[FOREIGN KEY] triggers_script_id_fkey: FOREIGN KEY (script_id) REFERENCES scripts(id) ON DELETE CASCADE
|
||||
[PRIMARY KEY] triggers_pkey: PRIMARY KEY (id)
|
||||
|
||||
## applied migrations
|
||||
0001: init
|
||||
0002: sandbox
|
||||
@@ -215,3 +421,14 @@ constraints on scripts:
|
||||
0004: admin auth
|
||||
0005: apps
|
||||
0006: users authz
|
||||
0007: kv
|
||||
0008: triggers
|
||||
0009: outbox
|
||||
0010: dead letters
|
||||
0011: abandoned executions
|
||||
0012: routes dispatch mode
|
||||
0013: docs
|
||||
0014: docs triggers
|
||||
0015: scripts kind
|
||||
0016: script imports
|
||||
0017: cron triggers
|
||||
|
||||
Reference in New Issue
Block a user