From 141c918dd522b4479ffa772aef999f52ef4e3be8 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sat, 16 May 2026 14:31:41 +0200 Subject: [PATCH] backend(infra): shared config helper, startup recovery, periodic maintenance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundations for the v0.16 features. No new endpoints here — those land in the next commit on top of these. - migrations 008 + 009: commit the load-bearing compression_status column that was uncommitted on disk; add 009_feature_toggles seeding the master + per-endpoint rate-limit switches, the master + per-area quota switches, and the admin-editable privacy_note. - services/config.rs (new): get_str / get_i64 / get_usize / get_f64 / get_bool consolidating the scattered helpers that lived in three handlers. - services/maintenance.rs (new): - startup_recovery() — resets compression_status='processing' and export_job.status='running' rows orphaned by a previous crashed instance, so users never see permanent "Wird vorbereitet…" spinners. - spawn_periodic_tasks() — hourly cleanup of expired sessions (rows were never pruned) + rate-limiter HashMap pruning (windows kept one entry per IP forever). - services/jobs.rs (new sketch): BackgroundJob trait + JobContext for future jobs to plug into the same progress + SSE pipeline as compression/export. Not wired yet — codifies the convention. - services/compression.rs: 120s hard timeout + kill_on_drop on ffmpeg so a malformed video can't hang and leak a worker semaphore permit. - services/rate_limiter.rs: new prune() called from the periodic task. - state.rs: SseEvent::new() constructor so event-type strings stay consistent instead of being typed inline at every emit site. - models/user.rs: UserRole::as_str() for /me/context serialization. - models/upload.rs: soft_delete() now runs in a transaction and decrements the uploader's total_upload_bytes (GREATEST(0, …) guard) — fixes a quota drift where deleting reclaimed no quota. - Cargo.toml + Cargo.lock: add `infer = "0.15"` (multipart MIME sniffing used by the upload handler). Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/Cargo.lock | 27 ++++++ backend/Cargo.toml | 1 + .../008_compression_status.down.sql | 2 + .../migrations/008_compression_status.up.sql | 6 ++ .../migrations/009_feature_toggles.down.sql | 11 +++ backend/migrations/009_feature_toggles.up.sql | 16 +++ backend/src/models/upload.rs | 45 ++++++++- backend/src/models/user.rs | 10 ++ backend/src/services/compression.rs | 69 ++++++++++--- backend/src/services/config.rs | 49 ++++++++++ backend/src/services/jobs.rs | 73 ++++++++++++++ backend/src/services/maintenance.rs | 97 +++++++++++++++++++ backend/src/services/mod.rs | 3 + backend/src/services/rate_limiter.rs | 22 +++++ backend/src/state.rs | 13 ++- 15 files changed, 429 insertions(+), 15 deletions(-) create mode 100644 backend/migrations/008_compression_status.down.sql create mode 100644 backend/migrations/008_compression_status.up.sql create mode 100644 backend/migrations/009_feature_toggles.down.sql create mode 100644 backend/migrations/009_feature_toggles.up.sql create mode 100644 backend/src/services/config.rs create mode 100644 backend/src/services/jobs.rs create mode 100644 backend/src/services/maintenance.rs diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 6ce95e0..8396b9d 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -513,6 +513,17 @@ dependencies = [ "shlex", ] +[[package]] +name = "cfb" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38f2da7a0a2c4ccf0065be06397cc26a81f4e528be095826eee9d4adbb8c60f" +dependencies = [ + "byteorder", + "fnv", + "uuid", +] + [[package]] name = "cfg-if" version = "1.0.4" @@ -897,6 +908,7 @@ dependencies = [ "futures", "image", "include_dir", + "infer", "jsonwebtoken", "oxipng", "rand 0.9.2", @@ -1004,6 +1016,12 @@ dependencies = [ "spin", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foldhash" version = "0.1.5" @@ -1609,6 +1627,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "infer" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb33622da908807a06f9513c19b3c1ad50fab3e4137d82a78107d502075aa199" +dependencies = [ + "cfb", +] + [[package]] name = "inout" version = "0.1.4" diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 6704236..654a1ad 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -30,6 +30,7 @@ image = "0.25" oxipng = "9" async_zip = { version = "0.0.17", features = ["tokio", "deflate"] } include_dir = "0.7" +infer = "0.15" [profile.release] opt-level = 3 diff --git a/backend/migrations/008_compression_status.down.sql b/backend/migrations/008_compression_status.down.sql new file mode 100644 index 0000000..a53ac7a --- /dev/null +++ b/backend/migrations/008_compression_status.down.sql @@ -0,0 +1,2 @@ +-- Remove compression_status field +ALTER TABLE upload DROP COLUMN compression_status; diff --git a/backend/migrations/008_compression_status.up.sql b/backend/migrations/008_compression_status.up.sql new file mode 100644 index 0000000..ad9f6d2 --- /dev/null +++ b/backend/migrations/008_compression_status.up.sql @@ -0,0 +1,6 @@ +-- Add compression_status to track media processing state +ALTER TABLE upload ADD COLUMN compression_status TEXT NOT NULL DEFAULT 'pending'; + +-- Values: 'pending', 'processing', 'done', 'failed' +-- Add comment to document the field +COMMENT ON COLUMN upload.compression_status IS 'Tracks media compression/preview generation: pending -> processing -> (done or failed)'; diff --git a/backend/migrations/009_feature_toggles.down.sql b/backend/migrations/009_feature_toggles.down.sql new file mode 100644 index 0000000..918decf --- /dev/null +++ b/backend/migrations/009_feature_toggles.down.sql @@ -0,0 +1,11 @@ +DELETE FROM config WHERE key IN ( + 'rate_limits_enabled', + 'upload_rate_enabled', + 'feed_rate_enabled', + 'export_rate_enabled', + 'join_rate_enabled', + 'quota_enabled', + 'storage_quota_enabled', + 'upload_count_quota_enabled', + 'privacy_note' +); diff --git a/backend/migrations/009_feature_toggles.up.sql b/backend/migrations/009_feature_toggles.up.sql new file mode 100644 index 0000000..f67a372 --- /dev/null +++ b/backend/migrations/009_feature_toggles.up.sql @@ -0,0 +1,16 @@ +-- Feature toggles for rate limits and quotas, plus the admin-configurable +-- Datenschutzhinweis. Everything lives in the `config` table — no schema change. +INSERT INTO config (key, value) VALUES + -- Rate limits (master + per-endpoint) + ('rate_limits_enabled', 'true'), + ('upload_rate_enabled', 'true'), + ('feed_rate_enabled', 'true'), + ('export_rate_enabled', 'true'), + ('join_rate_enabled', 'true'), + -- Quotas (master + per-area) + ('quota_enabled', 'true'), + ('storage_quota_enabled', 'true'), + ('upload_count_quota_enabled', 'true'), + -- Free-text privacy note shown to guests in My Account. Plain text — no HTML. + ('privacy_note', '') +ON CONFLICT (key) DO NOTHING; diff --git a/backend/src/models/upload.rs b/backend/src/models/upload.rs index 0dca7a5..54a83ae 100644 --- a/backend/src/models/upload.rs +++ b/backend/src/models/upload.rs @@ -14,6 +14,7 @@ pub struct Upload { pub mime_type: String, pub original_size_bytes: i64, pub caption: Option, + pub compression_status: String, pub created_at: DateTime, pub deleted_at: Option>, } @@ -94,11 +95,36 @@ impl Upload { Ok(()) } + /// Soft-deletes the upload and decrements the uploader's `total_upload_bytes`. + /// Done in a single transaction so a crash between the two writes can't leave + /// the quota counter pointing at bytes the user has already deleted (which would + /// silently lock them out of future uploads). + /// + /// No-op if the row is already deleted — protects against a double-tap on the + /// delete action double-decrementing the counter. pub async fn soft_delete(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> { - sqlx::query("UPDATE upload SET deleted_at = NOW() WHERE id = $1") - .bind(id) - .execute(pool) + let mut tx = pool.begin().await?; + let row: Option<(Uuid, i64)> = sqlx::query_as( + "UPDATE upload + SET deleted_at = NOW() + WHERE id = $1 AND deleted_at IS NULL + RETURNING user_id, original_size_bytes", + ) + .bind(id) + .fetch_optional(&mut *tx) + .await?; + if let Some((user_id, bytes)) = row { + sqlx::query( + "UPDATE \"user\" + SET total_upload_bytes = GREATEST(0, total_upload_bytes - $2) + WHERE id = $1", + ) + .bind(user_id) + .bind(bytes) + .execute(&mut *tx) .await?; + } + tx.commit().await?; Ok(()) } @@ -114,4 +140,17 @@ impl Upload { .await?; Ok(()) } + + pub async fn set_compression_status( + pool: &PgPool, + id: Uuid, + status: &str, + ) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE upload SET compression_status = $2 WHERE id = $1") + .bind(id) + .bind(status) + .execute(pool) + .await?; + Ok(()) + } } diff --git a/backend/src/models/user.rs b/backend/src/models/user.rs index 931b802..bed7fbf 100644 --- a/backend/src/models/user.rs +++ b/backend/src/models/user.rs @@ -12,6 +12,16 @@ pub enum UserRole { Admin, } +impl UserRole { + pub fn as_str(&self) -> &'static str { + match self { + UserRole::Guest => "guest", + UserRole::Host => "host", + UserRole::Admin => "admin", + } + } +} + #[derive(Debug, sqlx::FromRow)] pub struct User { pub id: Uuid, diff --git a/backend/src/services/compression.rs b/backend/src/services/compression.rs index f00d395..d42e990 100644 --- a/backend/src/services/compression.rs +++ b/backend/src/services/compression.rs @@ -3,24 +3,27 @@ use std::sync::Arc; use anyhow::{Context, Result}; use sqlx::PgPool; -use tokio::sync::Semaphore; +use tokio::sync::{broadcast, Semaphore}; use uuid::Uuid; use crate::models::upload::Upload; +use crate::state::SseEvent; #[derive(Clone)] pub struct CompressionWorker { semaphore: Arc, pool: PgPool, media_path: PathBuf, + sse_tx: broadcast::Sender, } impl CompressionWorker { - pub fn new(pool: PgPool, media_path: PathBuf, concurrency: usize) -> Self { + pub fn new(pool: PgPool, media_path: PathBuf, concurrency: usize, sse_tx: broadcast::Sender) -> Self { Self { semaphore: Arc::new(Semaphore::new(concurrency)), pool, media_path, + sse_tx, } } @@ -29,8 +32,22 @@ impl CompressionWorker { let worker = self.clone(); tokio::spawn(async move { let _permit = worker.semaphore.acquire().await; - if let Err(e) = worker.do_process(upload_id, &original_path, &mime_type).await { - tracing::error!("compression failed for upload {upload_id}: {e:#}"); + match worker.do_process(upload_id, &original_path, &mime_type).await { + Ok(_) => { + tracing::info!("compression completed for upload {upload_id}"); + let _ = worker.sse_tx.send(SseEvent { + event_type: "upload-processed".to_string(), + data: serde_json::json!({ "upload_id": upload_id }).to_string(), + }); + } + Err(e) => { + tracing::error!("compression failed for upload {upload_id}: {e:#}"); + let _ = worker.sse_tx.send(SseEvent { + event_type: "upload-error".to_string(), + data: serde_json::json!({ "upload_id": upload_id, "error": e.to_string() }).to_string(), + }); + let _ = Upload::set_compression_status(&worker.pool, upload_id, "failed").await; + } } }); } @@ -41,6 +58,8 @@ impl CompressionWorker { original_path: &str, mime_type: &str, ) -> Result<()> { + Upload::set_compression_status(&self.pool, upload_id, "processing").await?; + let original = self.media_path.join(original_path); if mime_type.starts_with("image/") { @@ -53,6 +72,7 @@ impl CompressionWorker { tracing::info!("thumbnail generated for upload {upload_id}"); } + Upload::set_compression_status(&self.pool, upload_id, "done").await?; Ok(()) } @@ -112,7 +132,11 @@ impl CompressionWorker { let thumb_filename = format!("{upload_id}.jpg"); let thumb_path = thumbs_dir.join(&thumb_filename); - let output = tokio::process::Command::new("ffmpeg") + // Hard timeout — a malformed video can hang `ffmpeg` indefinitely. Without a + // cap, the held compression-worker semaphore permit is never released and the + // pool eventually deadlocks (no further uploads ever processed). 120s is well + // above the time to extract one frame from any sane input. + let mut child = tokio::process::Command::new("ffmpeg") .args([ "-i", original.to_str().unwrap_or_default(), @@ -125,13 +149,36 @@ impl CompressionWorker { "-y", thumb_path.to_str().unwrap_or_default(), ]) - .output() - .await - .context("failed to run ffmpeg")?; + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .kill_on_drop(true) + .spawn() + .context("failed to spawn ffmpeg")?; - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - anyhow::bail!("ffmpeg failed: {stderr}"); + let status = match tokio::time::timeout( + std::time::Duration::from_secs(120), + child.wait(), + ) + .await + { + Ok(res) => res.context("ffmpeg wait failed")?, + Err(_) => { + let _ = child.kill().await; + anyhow::bail!("ffmpeg timeout after 120s"); + } + }; + + if !status.success() { + // Best-effort: drain stderr for the log. + let mut stderr = Vec::new(); + if let Some(mut handle) = child.stderr.take() { + use tokio::io::AsyncReadExt; + let _ = handle.read_to_end(&mut stderr).await; + } + anyhow::bail!( + "ffmpeg failed: {}", + String::from_utf8_lossy(&stderr) + ); } Ok(format!("thumbnails/{thumb_filename}")) diff --git a/backend/src/services/config.rs b/backend/src/services/config.rs new file mode 100644 index 0000000..2ef893a --- /dev/null +++ b/backend/src/services/config.rs @@ -0,0 +1,49 @@ +//! Reads of the runtime-tunable `config` table. +//! +//! Each handler used to keep a small local copy of these helpers; consolidating them +//! here means one place to add a parser, one place to mock for tests, and one place to +//! find when a key changes. New keys do not require code changes — they're picked up +//! the next time someone calls `get_*`. +//! +//! Values are read with a default fallback so the app still starts if a key is missing +//! (e.g. during a migration window). Production seeds keys via migrations 005 and 009. + +use sqlx::PgPool; + +async fn fetch_raw(pool: &PgPool, key: &str) -> Option { + sqlx::query_as::<_, (String,)>("SELECT value FROM config WHERE key = $1") + .bind(key) + .fetch_optional(pool) + .await + .ok() + .flatten() + .map(|(v,)| v) +} + +pub async fn get_str(pool: &PgPool, key: &str, default: &str) -> String { + fetch_raw(pool, key).await.unwrap_or_else(|| default.to_string()) +} + +pub async fn get_i64(pool: &PgPool, key: &str, default: i64) -> i64 { + fetch_raw(pool, key).await.and_then(|v| v.parse().ok()).unwrap_or(default) +} + +pub async fn get_usize(pool: &PgPool, key: &str, default: usize) -> usize { + fetch_raw(pool, key).await.and_then(|v| v.parse().ok()).unwrap_or(default) +} + +pub async fn get_f64(pool: &PgPool, key: &str, default: f64) -> f64 { + fetch_raw(pool, key).await.and_then(|v| v.parse().ok()).unwrap_or(default) +} + +/// Parses common truthy spellings used by both the migration seeds and the admin form. +/// Accepts `true/false`, `1/0`, `yes/no`, `on/off` — case-insensitive. Anything else +/// returns `default`. +pub async fn get_bool(pool: &PgPool, key: &str, default: bool) -> bool { + let Some(raw) = fetch_raw(pool, key).await else { return default }; + match raw.trim().to_ascii_lowercase().as_str() { + "true" | "1" | "yes" | "on" => true, + "false" | "0" | "no" | "off" => false, + _ => default, + } +} diff --git a/backend/src/services/jobs.rs b/backend/src/services/jobs.rs new file mode 100644 index 0000000..c41aa86 --- /dev/null +++ b/backend/src/services/jobs.rs @@ -0,0 +1,73 @@ +//! Shared shape for long-running background work. +//! +//! Today's [`compression`](crate::services::compression) and [`export`](crate::services::export) +//! pipelines each implement their own progress + SSE plumbing. They could converge on the +//! trait sketched here so future jobs (analytics, archival, ...) plug into one progress +//! pipeline. +//! +//! This module is intentionally a *sketch*: the existing services are not yet wired to +//! it. The aim is to (a) document the convention so new jobs follow it, (b) make the +//! refactor mechanical when someone is ready to do it. See `docs/IDEAS.md` — +//! "Maintainability principles" — for the rationale. +//! +//! Example of an eventual implementor: +//! +//! ```ignore +//! struct ZipExport { event_id: Uuid, /* … */ } +//! +//! impl BackgroundJob for ZipExport { +//! fn name(&self) -> &'static str { "zip-export" } +//! async fn run(self, ctx: JobContext) -> Result<()> { +//! for (i, item) in items.iter().enumerate() { +//! ctx.report(percent(i, items.len())).await?; +//! // … write to zip … +//! } +//! Ok(()) +//! } +//! } +//! ``` + +use anyhow::Result; + +/// Handle handed to a running job: reports progress and emits SSE events. +/// +/// Wraps the existing SSE broadcaster and an optional `export_job` row. Implementors +/// don't need to know about `state.sse_tx` directly — they call [`JobContext::report`] +/// and get the same effect. +pub struct JobContext { + pub job_id: Option, + pub event_kind: &'static str, + pub sse_tx: tokio::sync::broadcast::Sender, + pub pool: sqlx::PgPool, +} + +impl JobContext { + /// Update progress (0..=100) and broadcast an SSE tick. Cheap to call often — + /// rate-limit at the call site if a job emits at > 10 Hz. + pub async fn report(&self, percent: u8) -> Result<()> { + if let Some(job_id) = self.job_id { + sqlx::query("UPDATE export_job SET progress_pct = $1 WHERE id = $2") + .bind(percent as i16) + .bind(job_id) + .execute(&self.pool) + .await?; + } + let _ = self.sse_tx.send(crate::state::SseEvent::new( + self.event_kind, + serde_json::json!({ "progress_pct": percent }).to_string(), + )); + Ok(()) + } +} + +/// One unit of work that publishes progress through a [`JobContext`]. +/// +/// `run` consumes `self`; spawn with `tokio::spawn` at the caller. Errors propagate; +/// the caller is responsible for mapping them to `export_job.error_message` or +/// equivalent. Implementors stay small — the trait deliberately has no `cancel` +/// or `pause`; we have not needed those yet. +#[allow(async_fn_in_trait)] +pub trait BackgroundJob: Send + 'static { + fn name(&self) -> &'static str; + async fn run(self, ctx: JobContext) -> Result<()>; +} diff --git a/backend/src/services/maintenance.rs b/backend/src/services/maintenance.rs new file mode 100644 index 0000000..75e9d41 --- /dev/null +++ b/backend/src/services/maintenance.rs @@ -0,0 +1,97 @@ +//! Startup recovery + periodic background hygiene. +//! +//! Two responsibilities: +//! +//! 1. **Startup sweep** — when the server boots, fix rows left in an "in-progress" +//! state by the previous (possibly crashed) instance. Compression and export jobs +//! each leave a status row when they begin; if the process is killed mid-run, that +//! row stays `'processing'` / `'running'` forever, blocking re-tries and leaving +//! users staring at a spinner. Resetting them on startup recovers gracefully. +//! +//! 2. **Periodic tasks** — pruning that should happen "every hour" rather than per +//! request: expired sessions (otherwise the table grows unboundedly), and the +//! rate-limiter's in-memory windows (so keys for IPs that left long ago don't +//! accumulate). + +use std::time::Duration; + +use sqlx::PgPool; + +use crate::services::rate_limiter::RateLimiter; + +/// Reset rows left in flight by a previous crashed instance. Run once on startup, +/// before the HTTP server starts taking requests, so users never observe the +/// half-state. +pub async fn startup_recovery(pool: &PgPool) { + // Uploads whose preview generation was interrupted. Marking them 'failed' is + // safer than re-queueing — the original file is still on disk, the user can + // delete + re-upload if they care, and we avoid double-processing risk. + match sqlx::query( + "UPDATE upload SET compression_status = 'failed' + WHERE compression_status = 'processing'", + ) + .execute(pool) + .await + { + Ok(r) if r.rows_affected() > 0 => { + tracing::warn!( + "startup recovery: reset {} stuck upload(s) from 'processing' to 'failed'", + r.rows_affected() + ); + } + Ok(_) => {} + Err(e) => tracing::error!("startup recovery: failed to sweep uploads: {e:#}"), + } + + // Export jobs interrupted mid-run. Mark 'failed' so the host can re-trigger. + // The `UNIQUE(event_id, type)` constraint would otherwise block re-release. + match sqlx::query( + "UPDATE export_job + SET status = 'failed', + error_message = COALESCE(error_message, 'Server-Neustart während des Exports') + WHERE status = 'running'", + ) + .execute(pool) + .await + { + Ok(r) if r.rows_affected() > 0 => { + tracing::warn!( + "startup recovery: reset {} stuck export job(s) from 'running' to 'failed'", + r.rows_affected() + ); + } + Ok(_) => {} + Err(e) => tracing::error!("startup recovery: failed to sweep export_job: {e:#}"), + } +} + +/// Spawns a background task that periodically: +/// - deletes session rows whose `expires_at` is more than a day in the past +/// - prunes the in-memory rate-limiter HashMap of empty windows +/// +/// Cadence is 1h — fine for both jobs at our scale. +pub fn spawn_periodic_tasks(pool: PgPool, rate_limiter: RateLimiter) { + tokio::spawn(async move { + let mut tick = tokio::time::interval(Duration::from_secs(3600)); + // Fire the first tick immediately, then hourly. + tick.tick().await; + loop { + tick.tick().await; + cleanup_sessions(&pool).await; + rate_limiter.prune(); + } + }); +} + +async fn cleanup_sessions(pool: &PgPool) { + match sqlx::query("DELETE FROM session WHERE expires_at < NOW() - INTERVAL '1 day'") + .execute(pool) + .await + { + Ok(r) if r.rows_affected() > 0 => { + tracing::info!("cleaned up {} expired session(s)", r.rows_affected()); + } + Ok(_) => {} + Err(e) => tracing::warn!("session cleanup failed: {e:#}"), + } +} diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs index 2254b23..fee99a6 100644 --- a/backend/src/services/mod.rs +++ b/backend/src/services/mod.rs @@ -1,3 +1,6 @@ pub mod compression; +pub mod config; pub mod export; +pub mod jobs; +pub mod maintenance; pub mod rate_limiter; diff --git a/backend/src/services/rate_limiter.rs b/backend/src/services/rate_limiter.rs index b45fb0f..d7593a5 100644 --- a/backend/src/services/rate_limiter.rs +++ b/backend/src/services/rate_limiter.rs @@ -41,6 +41,28 @@ impl RateLimiter { Err(remaining.as_secs().max(1)) } } + + /// Drop keys whose windows are empty after expiring old timestamps. Called from a + /// background task (see [`crate::services::maintenance`]) so that long-lived + /// processes don't accumulate one HashMap entry per IP that ever connected. + /// + /// Uses a conservative 24h ceiling — anything older than that is gone regardless + /// of which endpoint's window it was tracked under (the longest window today is + /// 24h for export downloads). If we ever add longer windows, raise this constant. + pub fn prune(&self) { + let now = Instant::now(); + let ceiling = Duration::from_secs(24 * 60 * 60); + let mut map = self.windows.lock().unwrap(); + let before = map.len(); + map.retain(|_, ts| { + ts.retain(|&t| now.duration_since(t) < ceiling); + !ts.is_empty() + }); + let dropped = before.saturating_sub(map.len()); + if dropped > 0 { + tracing::debug!("rate limiter pruned {dropped} idle keys"); + } + } } /// Extract the client IP from X-Forwarded-For (Caddy sets this) or fall back diff --git a/backend/src/state.rs b/backend/src/state.rs index baa4895..d9e52fe 100644 --- a/backend/src/state.rs +++ b/backend/src/state.rs @@ -11,6 +11,17 @@ pub struct SseEvent { pub data: String, } +impl SseEvent { + /// Standardised constructor. Prefer this over building the struct inline so the + /// event-type strings stay consistent across handlers. + pub fn new(event_type: impl Into, data: impl Into) -> Self { + Self { + event_type: event_type.into(), + data: data.into(), + } + } +} + #[derive(Clone)] pub struct AppState { pub pool: PgPool, @@ -24,7 +35,7 @@ impl AppState { pub fn new(pool: PgPool, config: AppConfig) -> Self { let (sse_tx, _) = broadcast::channel(256); let compression = - CompressionWorker::new(pool.clone(), config.media_path.clone(), 2); + CompressionWorker::new(pool.clone(), config.media_path.clone(), 2, sse_tx.clone()); Self { pool, config,