diff --git a/backend/Cargo.lock b/backend/Cargo.lock index cfc01d9..ffb1622 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -894,6 +894,7 @@ dependencies = [ "bcrypt", "chrono", "dotenvy", + "futures", "image", "jsonwebtoken", "minijinja", @@ -905,6 +906,7 @@ dependencies = [ "sqlx", "sysinfo", "tokio", + "tokio-stream", "tower", "tower-http", "tower_governor", @@ -3253,6 +3255,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 800a93b..d864893 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -16,6 +16,8 @@ jsonwebtoken = "9" bcrypt = "0.15" uuid = { version = "1", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } +tokio-stream = { version = "0.1", features = ["sync"] } +futures = "0.3" sha2 = "0.10" rand = "0.9" anyhow = "1" diff --git a/backend/src/handlers/mod.rs b/backend/src/handlers/mod.rs new file mode 100644 index 0000000..3a4f17c --- /dev/null +++ b/backend/src/handlers/mod.rs @@ -0,0 +1,2 @@ +pub mod sse; +pub mod upload; diff --git a/backend/src/handlers/sse.rs b/backend/src/handlers/sse.rs new file mode 100644 index 0000000..783e5cb --- /dev/null +++ b/backend/src/handlers/sse.rs @@ -0,0 +1,33 @@ +use std::convert::Infallible; +use std::time::Duration; + +use axum::extract::State; +use axum::response::sse::{Event, KeepAlive, Sse}; +use futures::stream::Stream; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::StreamExt; + +use crate::auth::middleware::AuthUser; +use crate::error::AppError; +use crate::state::AppState; + +pub async fn stream( + State(state): State, + _auth: AuthUser, +) -> Result>>, AppError> { + let rx = state.sse_tx.subscribe(); + let stream = BroadcastStream::new(rx).filter_map(|msg| { + match msg { + Ok(sse_event) => Some(Ok(Event::default() + .event(sse_event.event_type) + .data(sse_event.data))), + Err(_) => None, // Lagged — skip missed events + } + }); + + Ok(Sse::new(stream).keep_alive( + KeepAlive::new() + .interval(Duration::from_secs(30)) + .text("ping"), + )) +} diff --git a/backend/src/handlers/upload.rs b/backend/src/handlers/upload.rs new file mode 100644 index 0000000..0933e5a --- /dev/null +++ b/backend/src/handlers/upload.rs @@ -0,0 +1,237 @@ +use axum::extract::{Multipart, Path, State}; +use axum::http::StatusCode; +use axum::Json; +use serde::Deserialize; +use uuid::Uuid; + +use crate::auth::middleware::AuthUser; +use crate::error::AppError; +use crate::models::hashtag::{self, Hashtag}; +use crate::models::upload::{Upload, UploadDto}; +use crate::models::user::User; +use crate::state::AppState; + +pub async fn upload( + State(state): State, + auth: AuthUser, + mut multipart: Multipart, +) -> Result<(StatusCode, Json), AppError> { + // Check if user is banned + let user = User::find_by_id(&state.pool, auth.user_id) + .await? + .ok_or_else(|| AppError::NotFound("Benutzer nicht gefunden.".into()))?; + if user.is_banned { + return Err(AppError::Forbidden("Du bist gesperrt.".into())); + } + + // Check if uploads are locked + let event = crate::models::event::Event::find_by_slug(&state.pool, &state.config.event_slug) + .await? + .ok_or_else(|| AppError::NotFound("Event nicht gefunden.".into()))?; + if event.uploads_locked_at.is_some() { + return Err(AppError::Forbidden("Uploads sind gesperrt.".into())); + } + + // Read config limits from DB + let max_image_mb: i64 = get_config_i64(&state.pool, "max_image_size_mb", 20).await; + let max_video_mb: i64 = get_config_i64(&state.pool, "max_video_size_mb", 500).await; + + let mut file_data: Option> = None; + let mut file_name: Option = None; + let mut content_type: Option = None; + let mut caption: Option = None; + let mut hashtags_csv: Option = None; + + while let Some(field) = multipart.next_field().await.map_err(|e| AppError::BadRequest(e.to_string()))? { + let name = field.name().unwrap_or_default().to_string(); + match name.as_str() { + "file" => { + file_name = field.file_name().map(|s| s.to_string()); + content_type = field.content_type().map(|s| s.to_string()); + file_data = Some( + field.bytes().await + .map_err(|e| AppError::BadRequest(format!("Datei konnte nicht gelesen werden: {e}")))? + .to_vec(), + ); + } + "caption" => { + caption = Some( + field.text().await + .map_err(|e| AppError::BadRequest(e.to_string()))?, + ); + } + "hashtags" => { + hashtags_csv = Some( + field.text().await + .map_err(|e| AppError::BadRequest(e.to_string()))?, + ); + } + _ => {} + } + } + + let data = file_data.ok_or_else(|| AppError::BadRequest("Keine Datei hochgeladen.".into()))?; + let mime = content_type.unwrap_or_else(|| "application/octet-stream".to_string()); + let size = data.len() as i64; + + // Validate file size + let max_bytes = if mime.starts_with("video/") { + max_video_mb * 1024 * 1024 + } else { + max_image_mb * 1024 * 1024 + }; + if size > max_bytes { + return Err(AppError::BadRequest(format!( + "Datei ist zu groß. Maximum: {} MB.", + max_bytes / (1024 * 1024) + ))); + } + + // Determine file extension + let ext = file_name + .as_deref() + .and_then(|n| n.rsplit('.').next()) + .unwrap_or(if mime.starts_with("video/") { "mp4" } else { "jpg" }); + + let upload_id = Uuid::new_v4(); + let event_slug = &state.config.event_slug; + let relative_path = format!("originals/{event_slug}/{upload_id}.{ext}"); + let absolute_path = state.config.media_path.join(&relative_path); + + // Ensure directory exists and write file + if let Some(parent) = absolute_path.parent() { + tokio::fs::create_dir_all(parent).await.map_err(|e| AppError::Internal(e.into()))?; + } + tokio::fs::write(&absolute_path, &data).await.map_err(|e| AppError::Internal(e.into()))?; + + // Update user's total upload bytes + sqlx::query("UPDATE \"user\" SET total_upload_bytes = total_upload_bytes + $2 WHERE id = $1") + .bind(auth.user_id) + .bind(size) + .execute(&state.pool) + .await?; + + // Insert upload record + let upload = Upload::create( + &state.pool, + auth.event_id, + auth.user_id, + &relative_path, + &mime, + size, + caption.as_deref(), + ) + .await?; + + // Process hashtags from caption and explicit CSV + let mut tags: Vec = Vec::new(); + if let Some(ref cap) = caption { + tags.extend(hashtag::extract_hashtags(cap)); + } + if let Some(ref csv) = hashtags_csv { + for tag in csv.split(',') { + let t = tag.trim().trim_start_matches('#').to_lowercase(); + if !t.is_empty() { + tags.push(t); + } + } + } + tags.sort(); + tags.dedup(); + + for tag in &tags { + let h = Hashtag::upsert(&state.pool, auth.event_id, tag).await?; + Hashtag::link_to_upload(&state.pool, upload.id, h.id).await?; + } + + // Spawn compression task + state + .compression + .process(upload.id, relative_path, mime.clone()); + + // Broadcast SSE event + let dto = UploadDto { + id: upload.id, + user_id: auth.user_id, + uploader_name: user.display_name, + preview_url: None, + thumbnail_url: None, + mime_type: mime, + caption, + hashtags: tags, + like_count: 0, + comment_count: 0, + liked_by_me: false, + created_at: upload.created_at, + }; + + let _ = state.sse_tx.send(crate::state::SseEvent { + event_type: "new-upload".to_string(), + data: serde_json::to_string(&dto).unwrap_or_default(), + }); + + Ok((StatusCode::CREATED, Json(dto))) +} + +#[derive(Deserialize)] +pub struct EditUploadRequest { + pub caption: Option, + pub hashtags: Option>, +} + +pub async fn edit_upload( + State(state): State, + auth: AuthUser, + Path(upload_id): Path, + Json(body): Json, +) -> Result { + let upload = Upload::find_by_id(&state.pool, upload_id) + .await? + .ok_or_else(|| AppError::NotFound("Upload nicht gefunden.".into()))?; + + if upload.user_id != auth.user_id { + return Err(AppError::Forbidden("Nur eigene Uploads bearbeiten.".into())); + } + + if let Some(ref caption) = body.caption { + Upload::update_caption(&state.pool, upload_id, Some(caption)).await?; + } + + if let Some(ref hashtags) = body.hashtags { + Hashtag::unlink_all_from_upload(&state.pool, upload_id).await?; + for tag in hashtags { + let h = Hashtag::upsert(&state.pool, auth.event_id, tag).await?; + Hashtag::link_to_upload(&state.pool, upload_id, h.id).await?; + } + } + + Ok(StatusCode::OK) +} + +pub async fn delete_upload( + State(state): State, + auth: AuthUser, + Path(upload_id): Path, +) -> Result { + let upload = Upload::find_by_id(&state.pool, upload_id) + .await? + .ok_or_else(|| AppError::NotFound("Upload nicht gefunden.".into()))?; + + if upload.user_id != auth.user_id { + return Err(AppError::Forbidden("Nur eigene Uploads löschen.".into())); + } + + Upload::soft_delete(&state.pool, upload_id).await?; + + Ok(StatusCode::NO_CONTENT) +} + +async fn get_config_i64(pool: &sqlx::PgPool, key: &str, default: i64) -> i64 { + let row: Option<(String,)> = + sqlx::query_as("SELECT value FROM config WHERE key = $1") + .bind(key) + .fetch_optional(pool) + .await + .unwrap_or(None); + row.and_then(|r| r.0.parse().ok()).unwrap_or(default) +} diff --git a/backend/src/main.rs b/backend/src/main.rs index 14f9f58..e2ffce2 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use axum::routing::{delete, post}; +use axum::routing::{delete, get, patch, post}; use axum::Router; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -7,7 +7,9 @@ mod auth; mod config; mod db; mod error; +mod handlers; mod models; +mod services; mod state; use config::AppConfig; @@ -28,14 +30,26 @@ async fn main() -> Result<()> { let pool = db::create_pool(&config.database_url).await?; let state = AppState::new(pool, config.clone()); + // Ensure media directories exist + tokio::fs::create_dir_all(&config.media_path).await.ok(); + let api = Router::new() + // Auth .route("/api/v1/join", post(auth::handlers::join)) .route("/api/v1/recover", post(auth::handlers::recover)) .route("/api/v1/admin/login", post(auth::handlers::admin_login)) - .route("/api/v1/session", delete(auth::handlers::logout)); + .route("/api/v1/session", delete(auth::handlers::logout)) + // Upload + .route("/api/v1/upload", post(handlers::upload::upload)) + .route( + "/api/v1/upload/{id}", + patch(handlers::upload::edit_upload).delete(handlers::upload::delete_upload), + ) + // SSE + .route("/api/v1/stream", get(handlers::sse::stream)); let router = Router::new() - .route("/health", axum::routing::get(|| async { "ok" })) + .route("/health", get(|| async { "ok" })) .merge(api) .with_state(state); diff --git a/backend/src/models/hashtag.rs b/backend/src/models/hashtag.rs new file mode 100644 index 0000000..8eaecdf --- /dev/null +++ b/backend/src/models/hashtag.rs @@ -0,0 +1,77 @@ +use sqlx::PgPool; +use uuid::Uuid; + +#[derive(Debug, sqlx::FromRow)] +pub struct Hashtag { + pub id: Uuid, + pub event_id: Uuid, + pub tag: String, +} + +impl Hashtag { + /// Upsert a hashtag (insert if not exists, return existing if it does). + pub async fn upsert(pool: &PgPool, event_id: Uuid, tag: &str) -> Result { + let normalized = tag.trim().trim_start_matches('#').to_lowercase(); + sqlx::query_as::<_, Self>( + "INSERT INTO hashtag (event_id, tag) VALUES ($1, $2) + ON CONFLICT (event_id, tag) DO UPDATE SET tag = EXCLUDED.tag + RETURNING *", + ) + .bind(event_id) + .bind(&normalized) + .fetch_one(pool) + .await + } + + pub async fn link_to_upload( + pool: &PgPool, + upload_id: Uuid, + hashtag_id: Uuid, + ) -> Result<(), sqlx::Error> { + sqlx::query( + "INSERT INTO upload_hashtag (upload_id, hashtag_id) VALUES ($1, $2) + ON CONFLICT DO NOTHING", + ) + .bind(upload_id) + .bind(hashtag_id) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn unlink_all_from_upload( + pool: &PgPool, + upload_id: Uuid, + ) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM upload_hashtag WHERE upload_id = $1") + .bind(upload_id) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn tags_for_upload( + pool: &PgPool, + upload_id: Uuid, + ) -> Result, sqlx::Error> { + let rows: Vec<(String,)> = sqlx::query_as( + "SELECT h.tag FROM hashtag h + JOIN upload_hashtag uh ON uh.hashtag_id = h.id + WHERE uh.upload_id = $1 + ORDER BY h.tag", + ) + .bind(upload_id) + .fetch_all(pool) + .await?; + Ok(rows.into_iter().map(|r| r.0).collect()) + } +} + +/// Extract #hashtags from text (caption or body). +pub fn extract_hashtags(text: &str) -> Vec { + text.split_whitespace() + .filter(|w| w.starts_with('#') && w.len() > 1) + .map(|w| w.trim_start_matches('#').to_lowercase()) + .filter(|t| !t.is_empty()) + .collect() +} diff --git a/backend/src/models/mod.rs b/backend/src/models/mod.rs index 881e438..f51d106 100644 --- a/backend/src/models/mod.rs +++ b/backend/src/models/mod.rs @@ -1,3 +1,5 @@ pub mod event; +pub mod hashtag; pub mod session; +pub mod upload; pub mod user; diff --git a/backend/src/models/upload.rs b/backend/src/models/upload.rs new file mode 100644 index 0000000..0dca7a5 --- /dev/null +++ b/backend/src/models/upload.rs @@ -0,0 +1,117 @@ +use chrono::{DateTime, Utc}; +use serde::Serialize; +use sqlx::PgPool; +use uuid::Uuid; + +#[derive(Debug, sqlx::FromRow)] +pub struct Upload { + pub id: Uuid, + pub event_id: Uuid, + pub user_id: Uuid, + pub original_path: String, + pub preview_path: Option, + pub thumbnail_path: Option, + pub mime_type: String, + pub original_size_bytes: i64, + pub caption: Option, + pub created_at: DateTime, + pub deleted_at: Option>, +} + +#[derive(Debug, Serialize)] +pub struct UploadDto { + pub id: Uuid, + pub user_id: Uuid, + pub uploader_name: String, + pub preview_url: Option, + pub thumbnail_url: Option, + pub mime_type: String, + pub caption: Option, + pub hashtags: Vec, + pub like_count: i64, + pub comment_count: i64, + pub liked_by_me: bool, + pub created_at: DateTime, +} + +impl Upload { + pub async fn create( + pool: &PgPool, + event_id: Uuid, + user_id: Uuid, + original_path: &str, + mime_type: &str, + original_size_bytes: i64, + caption: Option<&str>, + ) -> Result { + sqlx::query_as::<_, Self>( + "INSERT INTO upload (event_id, user_id, original_path, mime_type, original_size_bytes, caption) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING *", + ) + .bind(event_id) + .bind(user_id) + .bind(original_path) + .bind(mime_type) + .bind(original_size_bytes) + .bind(caption) + .fetch_one(pool) + .await + } + + pub async fn find_by_id(pool: &PgPool, id: Uuid) -> Result, sqlx::Error> { + sqlx::query_as::<_, Self>( + "SELECT * FROM upload WHERE id = $1 AND deleted_at IS NULL", + ) + .bind(id) + .fetch_optional(pool) + .await + } + + pub async fn set_preview_path( + pool: &PgPool, + id: Uuid, + preview_path: &str, + ) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE upload SET preview_path = $2 WHERE id = $1") + .bind(id) + .bind(preview_path) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn set_thumbnail_path( + pool: &PgPool, + id: Uuid, + thumbnail_path: &str, + ) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE upload SET thumbnail_path = $2 WHERE id = $1") + .bind(id) + .bind(thumbnail_path) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn soft_delete(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE upload SET deleted_at = NOW() WHERE id = $1") + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn update_caption( + pool: &PgPool, + id: Uuid, + caption: Option<&str>, + ) -> Result<(), sqlx::Error> { + sqlx::query("UPDATE upload SET caption = $2 WHERE id = $1") + .bind(id) + .bind(caption) + .execute(pool) + .await?; + Ok(()) + } +} diff --git a/backend/src/services/compression.rs b/backend/src/services/compression.rs new file mode 100644 index 0000000..f00d395 --- /dev/null +++ b/backend/src/services/compression.rs @@ -0,0 +1,139 @@ +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use anyhow::{Context, Result}; +use sqlx::PgPool; +use tokio::sync::Semaphore; +use uuid::Uuid; + +use crate::models::upload::Upload; + +#[derive(Clone)] +pub struct CompressionWorker { + semaphore: Arc, + pool: PgPool, + media_path: PathBuf, +} + +impl CompressionWorker { + pub fn new(pool: PgPool, media_path: PathBuf, concurrency: usize) -> Self { + Self { + semaphore: Arc::new(Semaphore::new(concurrency)), + pool, + media_path, + } + } + + /// Spawn a background task to process an uploaded file. + pub fn process(&self, upload_id: Uuid, original_path: String, mime_type: String) { + let worker = self.clone(); + tokio::spawn(async move { + let _permit = worker.semaphore.acquire().await; + if let Err(e) = worker.do_process(upload_id, &original_path, &mime_type).await { + tracing::error!("compression failed for upload {upload_id}: {e:#}"); + } + }); + } + + async fn do_process( + &self, + upload_id: Uuid, + original_path: &str, + mime_type: &str, + ) -> Result<()> { + let original = self.media_path.join(original_path); + + if mime_type.starts_with("image/") { + let preview_rel = self.generate_image_preview(upload_id, &original, mime_type).await?; + Upload::set_preview_path(&self.pool, upload_id, &preview_rel).await?; + tracing::info!("preview generated for upload {upload_id}"); + } else if mime_type.starts_with("video/") { + let thumb_rel = self.generate_video_thumbnail(upload_id, &original).await?; + Upload::set_thumbnail_path(&self.pool, upload_id, &thumb_rel).await?; + tracing::info!("thumbnail generated for upload {upload_id}"); + } + + Ok(()) + } + + async fn generate_image_preview( + &self, + upload_id: Uuid, + original: &Path, + mime_type: &str, + ) -> Result { + let previews_dir = self.media_path.join("previews"); + tokio::fs::create_dir_all(&previews_dir).await?; + + let preview_filename = format!("{upload_id}.jpg"); + let preview_path = previews_dir.join(&preview_filename); + let original = original.to_path_buf(); + let preview_path_clone = preview_path.clone(); + let mime_owned = mime_type.to_string(); + + // Run blocking image operations in a spawn_blocking task + tokio::task::spawn_blocking(move || -> Result<()> { + let img = image::open(&original) + .context("failed to open image")?; + + // Resize to max 800px wide, preserving aspect ratio + let preview = img.resize(800, 800, image::imageops::FilterType::Lanczos3); + preview.save_with_format(&preview_path_clone, image::ImageFormat::Jpeg) + .context("failed to save preview")?; + + // If the original is PNG, try lossless compression in-place + if mime_owned == "image/png" { + let opts = oxipng::Options::from_preset(2); + let _ = oxipng::optimize( + &oxipng::InFile::Path(original), + &oxipng::OutFile::Path { + path: None, + preserve_attrs: true, + }, + &opts, + ); + } + + Ok(()) + }) + .await??; + + Ok(format!("previews/{preview_filename}")) + } + + async fn generate_video_thumbnail( + &self, + upload_id: Uuid, + original: &Path, + ) -> Result { + let thumbs_dir = self.media_path.join("thumbnails"); + tokio::fs::create_dir_all(&thumbs_dir).await?; + + let thumb_filename = format!("{upload_id}.jpg"); + let thumb_path = thumbs_dir.join(&thumb_filename); + + let output = tokio::process::Command::new("ffmpeg") + .args([ + "-i", + original.to_str().unwrap_or_default(), + "-vframes", + "1", + "-ss", + "00:00:01", + "-vf", + "scale=800:-1", + "-y", + thumb_path.to_str().unwrap_or_default(), + ]) + .output() + .await + .context("failed to run ffmpeg")?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + anyhow::bail!("ffmpeg failed: {stderr}"); + } + + Ok(format!("thumbnails/{thumb_filename}")) + } +} diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs new file mode 100644 index 0000000..376ef62 --- /dev/null +++ b/backend/src/services/mod.rs @@ -0,0 +1 @@ +pub mod compression; diff --git a/backend/src/state.rs b/backend/src/state.rs index 5e0556c..9d9b939 100644 --- a/backend/src/state.rs +++ b/backend/src/state.rs @@ -2,6 +2,7 @@ use sqlx::PgPool; use tokio::sync::broadcast; use crate::config::AppConfig; +use crate::services::compression::CompressionWorker; #[derive(Clone, Debug)] pub struct SseEvent { @@ -14,15 +15,19 @@ pub struct AppState { pub pool: PgPool, pub config: AppConfig, pub sse_tx: broadcast::Sender, + pub compression: CompressionWorker, } impl AppState { pub fn new(pool: PgPool, config: AppConfig) -> Self { let (sse_tx, _) = broadcast::channel(256); + let compression = + CompressionWorker::new(pool.clone(), config.media_path.clone(), 2); Self { pool, config, sse_tx, + compression, } } }