backend(infra): shared config helper, startup recovery, periodic maintenance

Foundations for the v0.16 features. No new endpoints here — those land in
the next commit on top of these.

- migrations 008 + 009: commit the load-bearing compression_status column
  that was uncommitted on disk; add 009_feature_toggles seeding the master
  + per-endpoint rate-limit switches, the master + per-area quota switches,
  and the admin-editable privacy_note.
- services/config.rs (new): get_str / get_i64 / get_usize / get_f64 / get_bool
  consolidating the scattered helpers that lived in three handlers.
- services/maintenance.rs (new):
  - startup_recovery() — resets compression_status='processing' and
    export_job.status='running' rows orphaned by a previous crashed
    instance, so users never see permanent "Wird vorbereitet…" spinners.
  - spawn_periodic_tasks() — hourly cleanup of expired sessions (rows
    were never pruned) + rate-limiter HashMap pruning (windows kept one
    entry per IP forever).
- services/jobs.rs (new sketch): BackgroundJob trait + JobContext for
  future jobs to plug into the same progress + SSE pipeline as
  compression/export. Not wired yet — codifies the convention.
- services/compression.rs: 120s hard timeout + kill_on_drop on ffmpeg
  so a malformed video can't hang and leak a worker semaphore permit.
- services/rate_limiter.rs: new prune() called from the periodic task.
- state.rs: SseEvent::new() constructor so event-type strings stay
  consistent instead of being typed inline at every emit site.
- models/user.rs: UserRole::as_str() for /me/context serialization.
- models/upload.rs: soft_delete() now runs in a transaction and
  decrements the uploader's total_upload_bytes (GREATEST(0, …) guard) —
  fixes a quota drift where deleting reclaimed no quota.
- Cargo.toml + Cargo.lock: add `infer = "0.15"` (multipart MIME sniffing
  used by the upload handler).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-16 14:31:41 +02:00
parent 9a0ceeced7
commit 141c918dd5
15 changed files with 429 additions and 15 deletions

View File

@@ -14,6 +14,7 @@ pub struct Upload {
pub mime_type: String,
pub original_size_bytes: i64,
pub caption: Option<String>,
pub compression_status: String,
pub created_at: DateTime<Utc>,
pub deleted_at: Option<DateTime<Utc>>,
}
@@ -94,11 +95,36 @@ impl Upload {
Ok(())
}
/// Soft-deletes the upload and decrements the uploader's `total_upload_bytes`.
/// Done in a single transaction so a crash between the two writes can't leave
/// the quota counter pointing at bytes the user has already deleted (which would
/// silently lock them out of future uploads).
///
/// No-op if the row is already deleted — protects against a double-tap on the
/// delete action double-decrementing the counter.
pub async fn soft_delete(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE upload SET deleted_at = NOW() WHERE id = $1")
.bind(id)
.execute(pool)
let mut tx = pool.begin().await?;
let row: Option<(Uuid, i64)> = sqlx::query_as(
"UPDATE upload
SET deleted_at = NOW()
WHERE id = $1 AND deleted_at IS NULL
RETURNING user_id, original_size_bytes",
)
.bind(id)
.fetch_optional(&mut *tx)
.await?;
if let Some((user_id, bytes)) = row {
sqlx::query(
"UPDATE \"user\"
SET total_upload_bytes = GREATEST(0, total_upload_bytes - $2)
WHERE id = $1",
)
.bind(user_id)
.bind(bytes)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
Ok(())
}
@@ -114,4 +140,17 @@ impl Upload {
.await?;
Ok(())
}
pub async fn set_compression_status(
pool: &PgPool,
id: Uuid,
status: &str,
) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE upload SET compression_status = $2 WHERE id = $1")
.bind(id)
.bind(status)
.execute(pool)
.await?;
Ok(())
}
}

View File

@@ -12,6 +12,16 @@ pub enum UserRole {
Admin,
}
impl UserRole {
pub fn as_str(&self) -> &'static str {
match self {
UserRole::Guest => "guest",
UserRole::Host => "host",
UserRole::Admin => "admin",
}
}
}
#[derive(Debug, sqlx::FromRow)]
pub struct User {
pub id: Uuid,

View File

@@ -3,24 +3,27 @@ use std::sync::Arc;
use anyhow::{Context, Result};
use sqlx::PgPool;
use tokio::sync::Semaphore;
use tokio::sync::{broadcast, Semaphore};
use uuid::Uuid;
use crate::models::upload::Upload;
use crate::state::SseEvent;
#[derive(Clone)]
pub struct CompressionWorker {
semaphore: Arc<Semaphore>,
pool: PgPool,
media_path: PathBuf,
sse_tx: broadcast::Sender<SseEvent>,
}
impl CompressionWorker {
pub fn new(pool: PgPool, media_path: PathBuf, concurrency: usize) -> Self {
pub fn new(pool: PgPool, media_path: PathBuf, concurrency: usize, sse_tx: broadcast::Sender<SseEvent>) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(concurrency)),
pool,
media_path,
sse_tx,
}
}
@@ -29,8 +32,22 @@ impl CompressionWorker {
let worker = self.clone();
tokio::spawn(async move {
let _permit = worker.semaphore.acquire().await;
if let Err(e) = worker.do_process(upload_id, &original_path, &mime_type).await {
tracing::error!("compression failed for upload {upload_id}: {e:#}");
match worker.do_process(upload_id, &original_path, &mime_type).await {
Ok(_) => {
tracing::info!("compression completed for upload {upload_id}");
let _ = worker.sse_tx.send(SseEvent {
event_type: "upload-processed".to_string(),
data: serde_json::json!({ "upload_id": upload_id }).to_string(),
});
}
Err(e) => {
tracing::error!("compression failed for upload {upload_id}: {e:#}");
let _ = worker.sse_tx.send(SseEvent {
event_type: "upload-error".to_string(),
data: serde_json::json!({ "upload_id": upload_id, "error": e.to_string() }).to_string(),
});
let _ = Upload::set_compression_status(&worker.pool, upload_id, "failed").await;
}
}
});
}
@@ -41,6 +58,8 @@ impl CompressionWorker {
original_path: &str,
mime_type: &str,
) -> Result<()> {
Upload::set_compression_status(&self.pool, upload_id, "processing").await?;
let original = self.media_path.join(original_path);
if mime_type.starts_with("image/") {
@@ -53,6 +72,7 @@ impl CompressionWorker {
tracing::info!("thumbnail generated for upload {upload_id}");
}
Upload::set_compression_status(&self.pool, upload_id, "done").await?;
Ok(())
}
@@ -112,7 +132,11 @@ impl CompressionWorker {
let thumb_filename = format!("{upload_id}.jpg");
let thumb_path = thumbs_dir.join(&thumb_filename);
let output = tokio::process::Command::new("ffmpeg")
// Hard timeout — a malformed video can hang `ffmpeg` indefinitely. Without a
// cap, the held compression-worker semaphore permit is never released and the
// pool eventually deadlocks (no further uploads ever processed). 120s is well
// above the time to extract one frame from any sane input.
let mut child = tokio::process::Command::new("ffmpeg")
.args([
"-i",
original.to_str().unwrap_or_default(),
@@ -125,13 +149,36 @@ impl CompressionWorker {
"-y",
thumb_path.to_str().unwrap_or_default(),
])
.output()
.await
.context("failed to run ffmpeg")?;
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true)
.spawn()
.context("failed to spawn ffmpeg")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("ffmpeg failed: {stderr}");
let status = match tokio::time::timeout(
std::time::Duration::from_secs(120),
child.wait(),
)
.await
{
Ok(res) => res.context("ffmpeg wait failed")?,
Err(_) => {
let _ = child.kill().await;
anyhow::bail!("ffmpeg timeout after 120s");
}
};
if !status.success() {
// Best-effort: drain stderr for the log.
let mut stderr = Vec::new();
if let Some(mut handle) = child.stderr.take() {
use tokio::io::AsyncReadExt;
let _ = handle.read_to_end(&mut stderr).await;
}
anyhow::bail!(
"ffmpeg failed: {}",
String::from_utf8_lossy(&stderr)
);
}
Ok(format!("thumbnails/{thumb_filename}"))

View File

@@ -0,0 +1,49 @@
//! Reads of the runtime-tunable `config` table.
//!
//! Each handler used to keep a small local copy of these helpers; consolidating them
//! here means one place to add a parser, one place to mock for tests, and one place to
//! find when a key changes. New keys do not require code changes — they're picked up
//! the next time someone calls `get_*`.
//!
//! Values are read with a default fallback so the app still starts if a key is missing
//! (e.g. during a migration window). Production seeds keys via migrations 005 and 009.
use sqlx::PgPool;
async fn fetch_raw(pool: &PgPool, key: &str) -> Option<String> {
sqlx::query_as::<_, (String,)>("SELECT value FROM config WHERE key = $1")
.bind(key)
.fetch_optional(pool)
.await
.ok()
.flatten()
.map(|(v,)| v)
}
pub async fn get_str(pool: &PgPool, key: &str, default: &str) -> String {
fetch_raw(pool, key).await.unwrap_or_else(|| default.to_string())
}
pub async fn get_i64(pool: &PgPool, key: &str, default: i64) -> i64 {
fetch_raw(pool, key).await.and_then(|v| v.parse().ok()).unwrap_or(default)
}
pub async fn get_usize(pool: &PgPool, key: &str, default: usize) -> usize {
fetch_raw(pool, key).await.and_then(|v| v.parse().ok()).unwrap_or(default)
}
pub async fn get_f64(pool: &PgPool, key: &str, default: f64) -> f64 {
fetch_raw(pool, key).await.and_then(|v| v.parse().ok()).unwrap_or(default)
}
/// Parses common truthy spellings used by both the migration seeds and the admin form.
/// Accepts `true/false`, `1/0`, `yes/no`, `on/off` — case-insensitive. Anything else
/// returns `default`.
pub async fn get_bool(pool: &PgPool, key: &str, default: bool) -> bool {
let Some(raw) = fetch_raw(pool, key).await else { return default };
match raw.trim().to_ascii_lowercase().as_str() {
"true" | "1" | "yes" | "on" => true,
"false" | "0" | "no" | "off" => false,
_ => default,
}
}

View File

@@ -0,0 +1,73 @@
//! Shared shape for long-running background work.
//!
//! Today's [`compression`](crate::services::compression) and [`export`](crate::services::export)
//! pipelines each implement their own progress + SSE plumbing. They could converge on the
//! trait sketched here so future jobs (analytics, archival, ...) plug into one progress
//! pipeline.
//!
//! This module is intentionally a *sketch*: the existing services are not yet wired to
//! it. The aim is to (a) document the convention so new jobs follow it, (b) make the
//! refactor mechanical when someone is ready to do it. See `docs/IDEAS.md` —
//! "Maintainability principles" — for the rationale.
//!
//! Example of an eventual implementor:
//!
//! ```ignore
//! struct ZipExport { event_id: Uuid, /* … */ }
//!
//! impl BackgroundJob for ZipExport {
//! fn name(&self) -> &'static str { "zip-export" }
//! async fn run(self, ctx: JobContext) -> Result<()> {
//! for (i, item) in items.iter().enumerate() {
//! ctx.report(percent(i, items.len())).await?;
//! // … write to zip …
//! }
//! Ok(())
//! }
//! }
//! ```
use anyhow::Result;
/// Handle handed to a running job: reports progress and emits SSE events.
///
/// Wraps the existing SSE broadcaster and an optional `export_job` row. Implementors
/// don't need to know about `state.sse_tx` directly — they call [`JobContext::report`]
/// and get the same effect.
pub struct JobContext {
pub job_id: Option<uuid::Uuid>,
pub event_kind: &'static str,
pub sse_tx: tokio::sync::broadcast::Sender<crate::state::SseEvent>,
pub pool: sqlx::PgPool,
}
impl JobContext {
/// Update progress (0..=100) and broadcast an SSE tick. Cheap to call often —
/// rate-limit at the call site if a job emits at > 10 Hz.
pub async fn report(&self, percent: u8) -> Result<()> {
if let Some(job_id) = self.job_id {
sqlx::query("UPDATE export_job SET progress_pct = $1 WHERE id = $2")
.bind(percent as i16)
.bind(job_id)
.execute(&self.pool)
.await?;
}
let _ = self.sse_tx.send(crate::state::SseEvent::new(
self.event_kind,
serde_json::json!({ "progress_pct": percent }).to_string(),
));
Ok(())
}
}
/// One unit of work that publishes progress through a [`JobContext`].
///
/// `run` consumes `self`; spawn with `tokio::spawn` at the caller. Errors propagate;
/// the caller is responsible for mapping them to `export_job.error_message` or
/// equivalent. Implementors stay small — the trait deliberately has no `cancel`
/// or `pause`; we have not needed those yet.
#[allow(async_fn_in_trait)]
pub trait BackgroundJob: Send + 'static {
fn name(&self) -> &'static str;
async fn run(self, ctx: JobContext) -> Result<()>;
}

View File

@@ -0,0 +1,97 @@
//! Startup recovery + periodic background hygiene.
//!
//! Two responsibilities:
//!
//! 1. **Startup sweep** — when the server boots, fix rows left in an "in-progress"
//! state by the previous (possibly crashed) instance. Compression and export jobs
//! each leave a status row when they begin; if the process is killed mid-run, that
//! row stays `'processing'` / `'running'` forever, blocking re-tries and leaving
//! users staring at a spinner. Resetting them on startup recovers gracefully.
//!
//! 2. **Periodic tasks** — pruning that should happen "every hour" rather than per
//! request: expired sessions (otherwise the table grows unboundedly), and the
//! rate-limiter's in-memory windows (so keys for IPs that left long ago don't
//! accumulate).
use std::time::Duration;
use sqlx::PgPool;
use crate::services::rate_limiter::RateLimiter;
/// Reset rows left in flight by a previous crashed instance. Run once on startup,
/// before the HTTP server starts taking requests, so users never observe the
/// half-state.
pub async fn startup_recovery(pool: &PgPool) {
// Uploads whose preview generation was interrupted. Marking them 'failed' is
// safer than re-queueing — the original file is still on disk, the user can
// delete + re-upload if they care, and we avoid double-processing risk.
match sqlx::query(
"UPDATE upload SET compression_status = 'failed'
WHERE compression_status = 'processing'",
)
.execute(pool)
.await
{
Ok(r) if r.rows_affected() > 0 => {
tracing::warn!(
"startup recovery: reset {} stuck upload(s) from 'processing' to 'failed'",
r.rows_affected()
);
}
Ok(_) => {}
Err(e) => tracing::error!("startup recovery: failed to sweep uploads: {e:#}"),
}
// Export jobs interrupted mid-run. Mark 'failed' so the host can re-trigger.
// The `UNIQUE(event_id, type)` constraint would otherwise block re-release.
match sqlx::query(
"UPDATE export_job
SET status = 'failed',
error_message = COALESCE(error_message, 'Server-Neustart während des Exports')
WHERE status = 'running'",
)
.execute(pool)
.await
{
Ok(r) if r.rows_affected() > 0 => {
tracing::warn!(
"startup recovery: reset {} stuck export job(s) from 'running' to 'failed'",
r.rows_affected()
);
}
Ok(_) => {}
Err(e) => tracing::error!("startup recovery: failed to sweep export_job: {e:#}"),
}
}
/// Spawns a background task that periodically:
/// - deletes session rows whose `expires_at` is more than a day in the past
/// - prunes the in-memory rate-limiter HashMap of empty windows
///
/// Cadence is 1h — fine for both jobs at our scale.
pub fn spawn_periodic_tasks(pool: PgPool, rate_limiter: RateLimiter) {
tokio::spawn(async move {
let mut tick = tokio::time::interval(Duration::from_secs(3600));
// Fire the first tick immediately, then hourly.
tick.tick().await;
loop {
tick.tick().await;
cleanup_sessions(&pool).await;
rate_limiter.prune();
}
});
}
async fn cleanup_sessions(pool: &PgPool) {
match sqlx::query("DELETE FROM session WHERE expires_at < NOW() - INTERVAL '1 day'")
.execute(pool)
.await
{
Ok(r) if r.rows_affected() > 0 => {
tracing::info!("cleaned up {} expired session(s)", r.rows_affected());
}
Ok(_) => {}
Err(e) => tracing::warn!("session cleanup failed: {e:#}"),
}
}

View File

@@ -1,3 +1,6 @@
pub mod compression;
pub mod config;
pub mod export;
pub mod jobs;
pub mod maintenance;
pub mod rate_limiter;

View File

@@ -41,6 +41,28 @@ impl RateLimiter {
Err(remaining.as_secs().max(1))
}
}
/// Drop keys whose windows are empty after expiring old timestamps. Called from a
/// background task (see [`crate::services::maintenance`]) so that long-lived
/// processes don't accumulate one HashMap entry per IP that ever connected.
///
/// Uses a conservative 24h ceiling — anything older than that is gone regardless
/// of which endpoint's window it was tracked under (the longest window today is
/// 24h for export downloads). If we ever add longer windows, raise this constant.
pub fn prune(&self) {
let now = Instant::now();
let ceiling = Duration::from_secs(24 * 60 * 60);
let mut map = self.windows.lock().unwrap();
let before = map.len();
map.retain(|_, ts| {
ts.retain(|&t| now.duration_since(t) < ceiling);
!ts.is_empty()
});
let dropped = before.saturating_sub(map.len());
if dropped > 0 {
tracing::debug!("rate limiter pruned {dropped} idle keys");
}
}
}
/// Extract the client IP from X-Forwarded-For (Caddy sets this) or fall back

View File

@@ -11,6 +11,17 @@ pub struct SseEvent {
pub data: String,
}
impl SseEvent {
/// Standardised constructor. Prefer this over building the struct inline so the
/// event-type strings stay consistent across handlers.
pub fn new(event_type: impl Into<String>, data: impl Into<String>) -> Self {
Self {
event_type: event_type.into(),
data: data.into(),
}
}
}
#[derive(Clone)]
pub struct AppState {
pub pool: PgPool,
@@ -24,7 +35,7 @@ impl AppState {
pub fn new(pool: PgPool, config: AppConfig) -> Self {
let (sse_tx, _) = broadcast::channel(256);
let compression =
CompressionWorker::new(pool.clone(), config.media_path.clone(), 2);
CompressionWorker::new(pool.clone(), config.media_path.clone(), 2, sse_tx.clone());
Self {
pool,
config,