use async_trait::async_trait; use picloud_shared::{ExecutionLog, ExecutionLogSink, LogSinkError}; use sqlx::PgPool; /// Persists `ExecutionLog` rows to the `execution_logs` table. /// /// In cluster mode this impl lives in the manager and is reachable /// from orchestrator nodes via an HTTP wrapper; in single-process MVP /// mode the orchestrator's `DataPlaneState` holds it directly. pub struct PostgresExecutionLogSink { pool: PgPool, } impl PostgresExecutionLogSink { #[must_use] pub fn new(pool: PgPool) -> Self { Self { pool } } } #[async_trait] impl ExecutionLogSink for PostgresExecutionLogSink { async fn record(&self, log: ExecutionLog) -> Result<(), LogSinkError> { let headers = serde_json::to_value(&log.request_headers) .map_err(|e| LogSinkError::Backend(format!("encode headers: {e}")))?; let response_code = log.response_code.map(i32::from); let duration_ms = i32::try_from(log.duration_ms).unwrap_or(i32::MAX); sqlx::query( "INSERT INTO execution_logs ( \ id, app_id, script_id, request_id, \ request_path, request_headers, request_body, \ response_code, response_body, \ logs, duration_ms, status, created_at \ ) VALUES ( \ $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 \ )", ) .bind(log.id) .bind(log.app_id.into_inner()) .bind(log.script_id.into_inner()) .bind(log.request_id.into_inner()) .bind(&log.request_path) .bind(headers) .bind(&log.request_body) .bind(response_code) .bind(&log.response_body) .bind(&log.script_logs) .bind(duration_ms) .bind(log.status.as_str()) .bind(log.created_at) .execute(&self.pool) .await .map_err(|e| LogSinkError::Backend(e.to_string()))?; Ok(()) } }