feat: implement upload pipeline with compression and SSE

Backend:
- POST /api/v1/upload: multipart file upload with caption + hashtags
  - Validates file size against DB config limits (image/video separate)
  - Checks user ban status and event upload lock
  - Saves original to disk under {media_path}/originals/{slug}/
  - Tracks user total_upload_bytes for quota enforcement
  - Extracts hashtags from caption text and explicit CSV field
  - Upserts hashtags and links them to uploads
- PATCH /api/v1/upload/{id}: edit caption and hashtags (owner only)
- DELETE /api/v1/upload/{id}: soft-delete (owner only)
- GET /api/v1/stream: SSE endpoint with 30s keepalive
  - Broadcasts new-upload events to all connected clients
  - Uses tokio broadcast channel for fan-out

Services:
- CompressionWorker: Tokio semaphore-bounded (concurrency=2) background processor
  - Images: resize to 800px wide JPEG preview via image crate
  - PNG originals: lossless compression via oxipng
  - Videos: ffmpeg thumbnail extraction (1 frame at 1s, scaled to 800px)
  - Updates upload record with preview_path/thumbnail_path on completion

Models:
- Upload with full CRUD (create, find, update caption, soft delete, set paths)
- Hashtag with upsert, link/unlink, extract_hashtags() text parser
- UploadDto for API serialization with like/comment counts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
fabi
2026-03-31 21:48:59 +02:00
parent 8b9d916265
commit 3f052a4f91
12 changed files with 635 additions and 3 deletions

3
backend/Cargo.lock generated
View File

@@ -894,6 +894,7 @@ dependencies = [
"bcrypt", "bcrypt",
"chrono", "chrono",
"dotenvy", "dotenvy",
"futures",
"image", "image",
"jsonwebtoken", "jsonwebtoken",
"minijinja", "minijinja",
@@ -905,6 +906,7 @@ dependencies = [
"sqlx", "sqlx",
"sysinfo", "sysinfo",
"tokio", "tokio",
"tokio-stream",
"tower", "tower",
"tower-http", "tower-http",
"tower_governor", "tower_governor",
@@ -3253,6 +3255,7 @@ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
"tokio-util",
] ]
[[package]] [[package]]

View File

@@ -16,6 +16,8 @@ jsonwebtoken = "9"
bcrypt = "0.15" bcrypt = "0.15"
uuid = { version = "1", features = ["v4", "serde"] } uuid = { version = "1", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
tokio-stream = { version = "0.1", features = ["sync"] }
futures = "0.3"
sha2 = "0.10" sha2 = "0.10"
rand = "0.9" rand = "0.9"
anyhow = "1" anyhow = "1"

View File

@@ -0,0 +1,2 @@
pub mod sse;
pub mod upload;

View File

@@ -0,0 +1,33 @@
use std::convert::Infallible;
use std::time::Duration;
use axum::extract::State;
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use crate::auth::middleware::AuthUser;
use crate::error::AppError;
use crate::state::AppState;
pub async fn stream(
State(state): State<AppState>,
_auth: AuthUser,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> {
let rx = state.sse_tx.subscribe();
let stream = BroadcastStream::new(rx).filter_map(|msg| {
match msg {
Ok(sse_event) => Some(Ok(Event::default()
.event(sse_event.event_type)
.data(sse_event.data))),
Err(_) => None, // Lagged — skip missed events
}
});
Ok(Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(30))
.text("ping"),
))
}

View File

@@ -0,0 +1,237 @@
use axum::extract::{Multipart, Path, State};
use axum::http::StatusCode;
use axum::Json;
use serde::Deserialize;
use uuid::Uuid;
use crate::auth::middleware::AuthUser;
use crate::error::AppError;
use crate::models::hashtag::{self, Hashtag};
use crate::models::upload::{Upload, UploadDto};
use crate::models::user::User;
use crate::state::AppState;
pub async fn upload(
State(state): State<AppState>,
auth: AuthUser,
mut multipart: Multipart,
) -> Result<(StatusCode, Json<UploadDto>), AppError> {
// Check if user is banned
let user = User::find_by_id(&state.pool, auth.user_id)
.await?
.ok_or_else(|| AppError::NotFound("Benutzer nicht gefunden.".into()))?;
if user.is_banned {
return Err(AppError::Forbidden("Du bist gesperrt.".into()));
}
// Check if uploads are locked
let event = crate::models::event::Event::find_by_slug(&state.pool, &state.config.event_slug)
.await?
.ok_or_else(|| AppError::NotFound("Event nicht gefunden.".into()))?;
if event.uploads_locked_at.is_some() {
return Err(AppError::Forbidden("Uploads sind gesperrt.".into()));
}
// Read config limits from DB
let max_image_mb: i64 = get_config_i64(&state.pool, "max_image_size_mb", 20).await;
let max_video_mb: i64 = get_config_i64(&state.pool, "max_video_size_mb", 500).await;
let mut file_data: Option<Vec<u8>> = None;
let mut file_name: Option<String> = None;
let mut content_type: Option<String> = None;
let mut caption: Option<String> = None;
let mut hashtags_csv: Option<String> = None;
while let Some(field) = multipart.next_field().await.map_err(|e| AppError::BadRequest(e.to_string()))? {
let name = field.name().unwrap_or_default().to_string();
match name.as_str() {
"file" => {
file_name = field.file_name().map(|s| s.to_string());
content_type = field.content_type().map(|s| s.to_string());
file_data = Some(
field.bytes().await
.map_err(|e| AppError::BadRequest(format!("Datei konnte nicht gelesen werden: {e}")))?
.to_vec(),
);
}
"caption" => {
caption = Some(
field.text().await
.map_err(|e| AppError::BadRequest(e.to_string()))?,
);
}
"hashtags" => {
hashtags_csv = Some(
field.text().await
.map_err(|e| AppError::BadRequest(e.to_string()))?,
);
}
_ => {}
}
}
let data = file_data.ok_or_else(|| AppError::BadRequest("Keine Datei hochgeladen.".into()))?;
let mime = content_type.unwrap_or_else(|| "application/octet-stream".to_string());
let size = data.len() as i64;
// Validate file size
let max_bytes = if mime.starts_with("video/") {
max_video_mb * 1024 * 1024
} else {
max_image_mb * 1024 * 1024
};
if size > max_bytes {
return Err(AppError::BadRequest(format!(
"Datei ist zu groß. Maximum: {} MB.",
max_bytes / (1024 * 1024)
)));
}
// Determine file extension
let ext = file_name
.as_deref()
.and_then(|n| n.rsplit('.').next())
.unwrap_or(if mime.starts_with("video/") { "mp4" } else { "jpg" });
let upload_id = Uuid::new_v4();
let event_slug = &state.config.event_slug;
let relative_path = format!("originals/{event_slug}/{upload_id}.{ext}");
let absolute_path = state.config.media_path.join(&relative_path);
// Ensure directory exists and write file
if let Some(parent) = absolute_path.parent() {
tokio::fs::create_dir_all(parent).await.map_err(|e| AppError::Internal(e.into()))?;
}
tokio::fs::write(&absolute_path, &data).await.map_err(|e| AppError::Internal(e.into()))?;
// Update user's total upload bytes
sqlx::query("UPDATE \"user\" SET total_upload_bytes = total_upload_bytes + $2 WHERE id = $1")
.bind(auth.user_id)
.bind(size)
.execute(&state.pool)
.await?;
// Insert upload record
let upload = Upload::create(
&state.pool,
auth.event_id,
auth.user_id,
&relative_path,
&mime,
size,
caption.as_deref(),
)
.await?;
// Process hashtags from caption and explicit CSV
let mut tags: Vec<String> = Vec::new();
if let Some(ref cap) = caption {
tags.extend(hashtag::extract_hashtags(cap));
}
if let Some(ref csv) = hashtags_csv {
for tag in csv.split(',') {
let t = tag.trim().trim_start_matches('#').to_lowercase();
if !t.is_empty() {
tags.push(t);
}
}
}
tags.sort();
tags.dedup();
for tag in &tags {
let h = Hashtag::upsert(&state.pool, auth.event_id, tag).await?;
Hashtag::link_to_upload(&state.pool, upload.id, h.id).await?;
}
// Spawn compression task
state
.compression
.process(upload.id, relative_path, mime.clone());
// Broadcast SSE event
let dto = UploadDto {
id: upload.id,
user_id: auth.user_id,
uploader_name: user.display_name,
preview_url: None,
thumbnail_url: None,
mime_type: mime,
caption,
hashtags: tags,
like_count: 0,
comment_count: 0,
liked_by_me: false,
created_at: upload.created_at,
};
let _ = state.sse_tx.send(crate::state::SseEvent {
event_type: "new-upload".to_string(),
data: serde_json::to_string(&dto).unwrap_or_default(),
});
Ok((StatusCode::CREATED, Json(dto)))
}
#[derive(Deserialize)]
pub struct EditUploadRequest {
pub caption: Option<String>,
pub hashtags: Option<Vec<String>>,
}
pub async fn edit_upload(
State(state): State<AppState>,
auth: AuthUser,
Path(upload_id): Path<Uuid>,
Json(body): Json<EditUploadRequest>,
) -> Result<StatusCode, AppError> {
let upload = Upload::find_by_id(&state.pool, upload_id)
.await?
.ok_or_else(|| AppError::NotFound("Upload nicht gefunden.".into()))?;
if upload.user_id != auth.user_id {
return Err(AppError::Forbidden("Nur eigene Uploads bearbeiten.".into()));
}
if let Some(ref caption) = body.caption {
Upload::update_caption(&state.pool, upload_id, Some(caption)).await?;
}
if let Some(ref hashtags) = body.hashtags {
Hashtag::unlink_all_from_upload(&state.pool, upload_id).await?;
for tag in hashtags {
let h = Hashtag::upsert(&state.pool, auth.event_id, tag).await?;
Hashtag::link_to_upload(&state.pool, upload_id, h.id).await?;
}
}
Ok(StatusCode::OK)
}
pub async fn delete_upload(
State(state): State<AppState>,
auth: AuthUser,
Path(upload_id): Path<Uuid>,
) -> Result<StatusCode, AppError> {
let upload = Upload::find_by_id(&state.pool, upload_id)
.await?
.ok_or_else(|| AppError::NotFound("Upload nicht gefunden.".into()))?;
if upload.user_id != auth.user_id {
return Err(AppError::Forbidden("Nur eigene Uploads löschen.".into()));
}
Upload::soft_delete(&state.pool, upload_id).await?;
Ok(StatusCode::NO_CONTENT)
}
async fn get_config_i64(pool: &sqlx::PgPool, key: &str, default: i64) -> i64 {
let row: Option<(String,)> =
sqlx::query_as("SELECT value FROM config WHERE key = $1")
.bind(key)
.fetch_optional(pool)
.await
.unwrap_or(None);
row.and_then(|r| r.0.parse().ok()).unwrap_or(default)
}

View File

@@ -1,5 +1,5 @@
use anyhow::Result; use anyhow::Result;
use axum::routing::{delete, post}; use axum::routing::{delete, get, patch, post};
use axum::Router; use axum::Router;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -7,7 +7,9 @@ mod auth;
mod config; mod config;
mod db; mod db;
mod error; mod error;
mod handlers;
mod models; mod models;
mod services;
mod state; mod state;
use config::AppConfig; use config::AppConfig;
@@ -28,14 +30,26 @@ async fn main() -> Result<()> {
let pool = db::create_pool(&config.database_url).await?; let pool = db::create_pool(&config.database_url).await?;
let state = AppState::new(pool, config.clone()); let state = AppState::new(pool, config.clone());
// Ensure media directories exist
tokio::fs::create_dir_all(&config.media_path).await.ok();
let api = Router::new() let api = Router::new()
// Auth
.route("/api/v1/join", post(auth::handlers::join)) .route("/api/v1/join", post(auth::handlers::join))
.route("/api/v1/recover", post(auth::handlers::recover)) .route("/api/v1/recover", post(auth::handlers::recover))
.route("/api/v1/admin/login", post(auth::handlers::admin_login)) .route("/api/v1/admin/login", post(auth::handlers::admin_login))
.route("/api/v1/session", delete(auth::handlers::logout)); .route("/api/v1/session", delete(auth::handlers::logout))
// Upload
.route("/api/v1/upload", post(handlers::upload::upload))
.route(
"/api/v1/upload/{id}",
patch(handlers::upload::edit_upload).delete(handlers::upload::delete_upload),
)
// SSE
.route("/api/v1/stream", get(handlers::sse::stream));
let router = Router::new() let router = Router::new()
.route("/health", axum::routing::get(|| async { "ok" })) .route("/health", get(|| async { "ok" }))
.merge(api) .merge(api)
.with_state(state); .with_state(state);

View File

@@ -0,0 +1,77 @@
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Debug, sqlx::FromRow)]
pub struct Hashtag {
pub id: Uuid,
pub event_id: Uuid,
pub tag: String,
}
impl Hashtag {
/// Upsert a hashtag (insert if not exists, return existing if it does).
pub async fn upsert(pool: &PgPool, event_id: Uuid, tag: &str) -> Result<Self, sqlx::Error> {
let normalized = tag.trim().trim_start_matches('#').to_lowercase();
sqlx::query_as::<_, Self>(
"INSERT INTO hashtag (event_id, tag) VALUES ($1, $2)
ON CONFLICT (event_id, tag) DO UPDATE SET tag = EXCLUDED.tag
RETURNING *",
)
.bind(event_id)
.bind(&normalized)
.fetch_one(pool)
.await
}
pub async fn link_to_upload(
pool: &PgPool,
upload_id: Uuid,
hashtag_id: Uuid,
) -> Result<(), sqlx::Error> {
sqlx::query(
"INSERT INTO upload_hashtag (upload_id, hashtag_id) VALUES ($1, $2)
ON CONFLICT DO NOTHING",
)
.bind(upload_id)
.bind(hashtag_id)
.execute(pool)
.await?;
Ok(())
}
pub async fn unlink_all_from_upload(
pool: &PgPool,
upload_id: Uuid,
) -> Result<(), sqlx::Error> {
sqlx::query("DELETE FROM upload_hashtag WHERE upload_id = $1")
.bind(upload_id)
.execute(pool)
.await?;
Ok(())
}
pub async fn tags_for_upload(
pool: &PgPool,
upload_id: Uuid,
) -> Result<Vec<String>, sqlx::Error> {
let rows: Vec<(String,)> = sqlx::query_as(
"SELECT h.tag FROM hashtag h
JOIN upload_hashtag uh ON uh.hashtag_id = h.id
WHERE uh.upload_id = $1
ORDER BY h.tag",
)
.bind(upload_id)
.fetch_all(pool)
.await?;
Ok(rows.into_iter().map(|r| r.0).collect())
}
}
/// Extract #hashtags from text (caption or body).
pub fn extract_hashtags(text: &str) -> Vec<String> {
text.split_whitespace()
.filter(|w| w.starts_with('#') && w.len() > 1)
.map(|w| w.trim_start_matches('#').to_lowercase())
.filter(|t| !t.is_empty())
.collect()
}

View File

@@ -1,3 +1,5 @@
pub mod event; pub mod event;
pub mod hashtag;
pub mod session; pub mod session;
pub mod upload;
pub mod user; pub mod user;

View File

@@ -0,0 +1,117 @@
use chrono::{DateTime, Utc};
use serde::Serialize;
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Debug, sqlx::FromRow)]
pub struct Upload {
pub id: Uuid,
pub event_id: Uuid,
pub user_id: Uuid,
pub original_path: String,
pub preview_path: Option<String>,
pub thumbnail_path: Option<String>,
pub mime_type: String,
pub original_size_bytes: i64,
pub caption: Option<String>,
pub created_at: DateTime<Utc>,
pub deleted_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Serialize)]
pub struct UploadDto {
pub id: Uuid,
pub user_id: Uuid,
pub uploader_name: String,
pub preview_url: Option<String>,
pub thumbnail_url: Option<String>,
pub mime_type: String,
pub caption: Option<String>,
pub hashtags: Vec<String>,
pub like_count: i64,
pub comment_count: i64,
pub liked_by_me: bool,
pub created_at: DateTime<Utc>,
}
impl Upload {
pub async fn create(
pool: &PgPool,
event_id: Uuid,
user_id: Uuid,
original_path: &str,
mime_type: &str,
original_size_bytes: i64,
caption: Option<&str>,
) -> Result<Self, sqlx::Error> {
sqlx::query_as::<_, Self>(
"INSERT INTO upload (event_id, user_id, original_path, mime_type, original_size_bytes, caption)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *",
)
.bind(event_id)
.bind(user_id)
.bind(original_path)
.bind(mime_type)
.bind(original_size_bytes)
.bind(caption)
.fetch_one(pool)
.await
}
pub async fn find_by_id(pool: &PgPool, id: Uuid) -> Result<Option<Self>, sqlx::Error> {
sqlx::query_as::<_, Self>(
"SELECT * FROM upload WHERE id = $1 AND deleted_at IS NULL",
)
.bind(id)
.fetch_optional(pool)
.await
}
pub async fn set_preview_path(
pool: &PgPool,
id: Uuid,
preview_path: &str,
) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE upload SET preview_path = $2 WHERE id = $1")
.bind(id)
.bind(preview_path)
.execute(pool)
.await?;
Ok(())
}
pub async fn set_thumbnail_path(
pool: &PgPool,
id: Uuid,
thumbnail_path: &str,
) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE upload SET thumbnail_path = $2 WHERE id = $1")
.bind(id)
.bind(thumbnail_path)
.execute(pool)
.await?;
Ok(())
}
pub async fn soft_delete(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE upload SET deleted_at = NOW() WHERE id = $1")
.bind(id)
.execute(pool)
.await?;
Ok(())
}
pub async fn update_caption(
pool: &PgPool,
id: Uuid,
caption: Option<&str>,
) -> Result<(), sqlx::Error> {
sqlx::query("UPDATE upload SET caption = $2 WHERE id = $1")
.bind(id)
.bind(caption)
.execute(pool)
.await?;
Ok(())
}
}

View File

@@ -0,0 +1,139 @@
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{Context, Result};
use sqlx::PgPool;
use tokio::sync::Semaphore;
use uuid::Uuid;
use crate::models::upload::Upload;
#[derive(Clone)]
pub struct CompressionWorker {
semaphore: Arc<Semaphore>,
pool: PgPool,
media_path: PathBuf,
}
impl CompressionWorker {
pub fn new(pool: PgPool, media_path: PathBuf, concurrency: usize) -> Self {
Self {
semaphore: Arc::new(Semaphore::new(concurrency)),
pool,
media_path,
}
}
/// Spawn a background task to process an uploaded file.
pub fn process(&self, upload_id: Uuid, original_path: String, mime_type: String) {
let worker = self.clone();
tokio::spawn(async move {
let _permit = worker.semaphore.acquire().await;
if let Err(e) = worker.do_process(upload_id, &original_path, &mime_type).await {
tracing::error!("compression failed for upload {upload_id}: {e:#}");
}
});
}
async fn do_process(
&self,
upload_id: Uuid,
original_path: &str,
mime_type: &str,
) -> Result<()> {
let original = self.media_path.join(original_path);
if mime_type.starts_with("image/") {
let preview_rel = self.generate_image_preview(upload_id, &original, mime_type).await?;
Upload::set_preview_path(&self.pool, upload_id, &preview_rel).await?;
tracing::info!("preview generated for upload {upload_id}");
} else if mime_type.starts_with("video/") {
let thumb_rel = self.generate_video_thumbnail(upload_id, &original).await?;
Upload::set_thumbnail_path(&self.pool, upload_id, &thumb_rel).await?;
tracing::info!("thumbnail generated for upload {upload_id}");
}
Ok(())
}
async fn generate_image_preview(
&self,
upload_id: Uuid,
original: &Path,
mime_type: &str,
) -> Result<String> {
let previews_dir = self.media_path.join("previews");
tokio::fs::create_dir_all(&previews_dir).await?;
let preview_filename = format!("{upload_id}.jpg");
let preview_path = previews_dir.join(&preview_filename);
let original = original.to_path_buf();
let preview_path_clone = preview_path.clone();
let mime_owned = mime_type.to_string();
// Run blocking image operations in a spawn_blocking task
tokio::task::spawn_blocking(move || -> Result<()> {
let img = image::open(&original)
.context("failed to open image")?;
// Resize to max 800px wide, preserving aspect ratio
let preview = img.resize(800, 800, image::imageops::FilterType::Lanczos3);
preview.save_with_format(&preview_path_clone, image::ImageFormat::Jpeg)
.context("failed to save preview")?;
// If the original is PNG, try lossless compression in-place
if mime_owned == "image/png" {
let opts = oxipng::Options::from_preset(2);
let _ = oxipng::optimize(
&oxipng::InFile::Path(original),
&oxipng::OutFile::Path {
path: None,
preserve_attrs: true,
},
&opts,
);
}
Ok(())
})
.await??;
Ok(format!("previews/{preview_filename}"))
}
async fn generate_video_thumbnail(
&self,
upload_id: Uuid,
original: &Path,
) -> Result<String> {
let thumbs_dir = self.media_path.join("thumbnails");
tokio::fs::create_dir_all(&thumbs_dir).await?;
let thumb_filename = format!("{upload_id}.jpg");
let thumb_path = thumbs_dir.join(&thumb_filename);
let output = tokio::process::Command::new("ffmpeg")
.args([
"-i",
original.to_str().unwrap_or_default(),
"-vframes",
"1",
"-ss",
"00:00:01",
"-vf",
"scale=800:-1",
"-y",
thumb_path.to_str().unwrap_or_default(),
])
.output()
.await
.context("failed to run ffmpeg")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("ffmpeg failed: {stderr}");
}
Ok(format!("thumbnails/{thumb_filename}"))
}
}

View File

@@ -0,0 +1 @@
pub mod compression;

View File

@@ -2,6 +2,7 @@ use sqlx::PgPool;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use crate::config::AppConfig; use crate::config::AppConfig;
use crate::services::compression::CompressionWorker;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SseEvent { pub struct SseEvent {
@@ -14,15 +15,19 @@ pub struct AppState {
pub pool: PgPool, pub pool: PgPool,
pub config: AppConfig, pub config: AppConfig,
pub sse_tx: broadcast::Sender<SseEvent>, pub sse_tx: broadcast::Sender<SseEvent>,
pub compression: CompressionWorker,
} }
impl AppState { impl AppState {
pub fn new(pool: PgPool, config: AppConfig) -> Self { pub fn new(pool: PgPool, config: AppConfig) -> Self {
let (sse_tx, _) = broadcast::channel(256); let (sse_tx, _) = broadcast::channel(256);
let compression =
CompressionWorker::new(pool.clone(), config.media_path.clone(), 2);
Self { Self {
pool, pool,
config, config,
sse_tx, sse_tx,
compression,
} }
} }
} }