feat(v1.1.2-docs): triggers framework + dispatcher + emitter extended for docs
The docs trigger kind hangs off the same Layout-E shape that v1.1.1
established for KV: a parent triggers row + a docs_trigger_details
row (collection_glob TEXT + ops TEXT[]) with the empty-array =
any-op semantic preserved.
- trigger_repo.rs adds TriggerKind::Docs + TriggerDetails::Docs +
CreateDocsTrigger + DocsTriggerMatch + PostgresTriggerRepo
implementations of create_docs_trigger and list_matching_docs.
list_matching_docs mirrors KV's Rust-side filter (does NOT push
ops membership into SQL — that would exclude empty-ops rows).
- outbox_repo.rs adds OutboxSourceKind::Docs to the enum + wire form.
- dispatcher.rs's generic Kv | DeadLetter routing arm extends to
Kv | DeadLetter | Docs. No kind-specific logic needed — the
resolve_trigger + build_exec_request path is already abstract.
- outbox_event_emitter.rs gains a "docs" arm in the emit match plus
emit_docs which builds TriggerEvent::Docs (carrying data +
prev_data) and fans out across matching triggers.
- triggers_api.rs adds CreateDocsTriggerRequest + create_docs_trigger
+ the POST /api/v1/admin/apps/{id}/triggers/docs route, all
guarded by Capability::AppManageTriggers (same as KV).
3 new triggers_api unit tests covering happy path, empty-glob
rejection, and capability denial. All existing trigger-related
tests still pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -19,7 +19,7 @@ use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use picloud_shared::{
|
||||
EmitError, KvEventOp, SdkCallCx, ServiceEvent, ServiceEventEmitter, TriggerEvent,
|
||||
DocsEventOp, EmitError, KvEventOp, SdkCallCx, ServiceEvent, ServiceEventEmitter, TriggerEvent,
|
||||
};
|
||||
|
||||
use crate::outbox_repo::{NewOutboxRow, OutboxRepo, OutboxSourceKind};
|
||||
@@ -42,6 +42,7 @@ impl ServiceEventEmitter for OutboxEventEmitter {
|
||||
async fn emit(&self, cx: &SdkCallCx, event: ServiceEvent) -> Result<(), EmitError> {
|
||||
match event.source {
|
||||
"kv" => self.emit_kv(cx, event).await,
|
||||
"docs" => self.emit_docs(cx, event).await,
|
||||
// Future sources land here. For now, silently drop — the
|
||||
// SDK calls `events.emit(...)` unconditionally for forward
|
||||
// compat, so swallowing without an error is correct.
|
||||
@@ -100,4 +101,57 @@ impl OutboxEventEmitter {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// v1.1.2. Mirrors `emit_kv` — fan out a docs mutation across
|
||||
/// matching docs triggers + write one outbox row each. The
|
||||
/// `prev_data` change-data-capture surface is preserved from the
|
||||
/// `ServiceEvent.old_payload` field (set by `DocsServiceImpl` on
|
||||
/// update and delete; `None` for create).
|
||||
async fn emit_docs(&self, cx: &SdkCallCx, event: ServiceEvent) -> Result<(), EmitError> {
|
||||
let Some(op) = DocsEventOp::from_wire(event.op) else {
|
||||
return Ok(());
|
||||
};
|
||||
let Some(collection) = event.collection.clone() else {
|
||||
return Ok(());
|
||||
};
|
||||
let id = event.key.clone().unwrap_or_default();
|
||||
|
||||
let matches = self
|
||||
.triggers
|
||||
.list_matching_docs(cx.app_id, &collection, op)
|
||||
.await
|
||||
.map_err(|e| EmitError::Unavailable(format!("trigger lookup: {e}")))?;
|
||||
|
||||
if matches.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let trigger_event = TriggerEvent::Docs {
|
||||
op,
|
||||
collection,
|
||||
id,
|
||||
data: event.payload.clone(),
|
||||
prev_data: event.old_payload.clone(),
|
||||
};
|
||||
let payload = serde_json::to_value(&trigger_event)
|
||||
.map_err(|e| EmitError::Rejected(format!("event serialize: {e}")))?;
|
||||
|
||||
for m in matches {
|
||||
self.outbox
|
||||
.insert(NewOutboxRow {
|
||||
app_id: cx.app_id,
|
||||
source_kind: OutboxSourceKind::Docs,
|
||||
trigger_id: Some(m.trigger_id),
|
||||
script_id: Some(m.script_id),
|
||||
reply_to: None,
|
||||
payload: payload.clone(),
|
||||
origin_principal: cx.principal.as_ref().map(|p| p.user_id),
|
||||
trigger_depth: cx.trigger_depth.saturating_add(1),
|
||||
root_execution_id: Some(cx.root_execution_id),
|
||||
})
|
||||
.await
|
||||
.map_err(|e| EmitError::Unavailable(format!("outbox insert: {e}")))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user