use std::time::Duration; use axum::extract::{Multipart, Path, State}; use axum::http::StatusCode; use axum::Json; use serde::Deserialize; use uuid::Uuid; use crate::auth::middleware::AuthUser; use crate::error::AppError; use crate::models::hashtag::{self, Hashtag}; use crate::models::upload::{Upload, UploadDto}; use crate::models::user::User; use crate::services::config; use crate::state::AppState; const MAX_CAPTION_LENGTH: usize = 2000; pub async fn upload( State(state): State, auth: AuthUser, mut multipart: Multipart, ) -> Result<(StatusCode, Json), AppError> { // Rate limit: N uploads per hour per user. Gated by master + per-endpoint toggles. let rate_limits_on = config::get_bool(&state.pool, "rate_limits_enabled", true).await; let upload_rate_on = config::get_bool(&state.pool, "upload_rate_enabled", true).await; if rate_limits_on && upload_rate_on { let upload_rate = config::get_i64(&state.pool, "upload_rate_per_hour", 10).await as usize; if let Err(retry_after_secs) = state.rate_limiter.check_with_retry( format!("upload:{}", auth.user_id), upload_rate, Duration::from_secs(3600), ) { drain_multipart(multipart).await; return Err(AppError::TooManyRequests( "Du hast dein Upload-Limit für diese Stunde erreicht.".into(), Some(retry_after_secs), )); } } // Check if user is banned let user = User::find_by_id(&state.pool, auth.user_id) .await? .ok_or_else(|| AppError::NotFound("Benutzer nicht gefunden.".into()))?; if user.is_banned { drain_multipart(multipart).await; return Err(AppError::Forbidden("Du bist gesperrt.".into())); } // Check if uploads are locked let event = crate::models::event::Event::find_by_slug(&state.pool, &state.config.event_slug) .await? .ok_or_else(|| AppError::NotFound("Event nicht gefunden.".into()))?; if event.uploads_locked_at.is_some() { drain_multipart(multipart).await; return Err(AppError::Forbidden("Uploads sind gesperrt.".into())); } // Read config limits from DB let max_image_mb: i64 = config::get_i64(&state.pool, "max_image_size_mb", 20).await; let max_video_mb: i64 = config::get_i64(&state.pool, "max_video_size_mb", 500).await; let mut file_data: Option> = None; let mut file_name: Option = None; let mut content_type: Option = None; let mut caption: Option = None; let mut hashtags_csv: Option = None; while let Some(field) = multipart.next_field().await.map_err(|e| AppError::BadRequest(e.to_string()))? { let name = field.name().unwrap_or_default().to_string(); match name.as_str() { "file" => { file_name = field.file_name().map(|s| s.to_string()); content_type = field.content_type().map(|s| s.to_string()); file_data = Some( field.bytes().await .map_err(|e| AppError::BadRequest(format!("Datei konnte nicht gelesen werden: {e}")))? .to_vec(), ); } "caption" => { caption = Some( field.text().await .map_err(|e| AppError::BadRequest(e.to_string()))?, ); } "hashtags" => { hashtags_csv = Some( field.text().await .map_err(|e| AppError::BadRequest(e.to_string()))?, ); } _ => {} } } let data = file_data.ok_or_else(|| AppError::BadRequest("Keine Datei hochgeladen.".into()))?; let mime = content_type.unwrap_or_else(|| "application/octet-stream".to_string()); let size = data.len() as i64; // Validate caption length if let Some(ref cap) = caption { if cap.len() > MAX_CAPTION_LENGTH { return Err(AppError::BadRequest(format!( "Beschreibung ist zu lang. Maximum: {} Zeichen.", MAX_CAPTION_LENGTH ))); } } // Validate file MIME type using magic bytes let detected_mime = infer::get(&data); if let Some(detected) = detected_mime { let detected_type = detected.mime_type(); // Ensure detected type is compatible with declared MIME type let declared_category = mime.split('/').next().unwrap_or(""); let detected_category = detected_type.split('/').next().unwrap_or(""); // Only reject if categories don't match (e.g., image vs video) if declared_category != "application" && declared_category != detected_category { return Err(AppError::BadRequest(format!( "Dateiinhalt entspricht nicht dem deklarierten Typ. Erwartet: {}, erkannt: {}", mime, detected_type ))); } } // Validate file size let max_bytes = if mime.starts_with("video/") { max_video_mb * 1024 * 1024 } else { max_image_mb * 1024 * 1024 }; if size > max_bytes { return Err(AppError::BadRequest(format!( "Datei ist zu groß. Maximum: {} MB.", max_bytes / (1024 * 1024) ))); } // Per-user storage quota — dynamic formula based on available disk space and the // number of active uploaders. Gated by master + per-area toggles so the admin can // disable it on trusted instances. let quota_on = config::get_bool(&state.pool, "quota_enabled", true).await; let storage_quota_on = config::get_bool(&state.pool, "storage_quota_enabled", true).await; if quota_on && storage_quota_on { let estimate = compute_storage_quota(&state).await; if let Some(limit) = estimate.limit_bytes { let prospective_total = user.total_upload_bytes.saturating_add(size); if prospective_total > limit { return Err(AppError::TooManyRequests( "Du hast dein Upload-Limit für dieses Event erreicht.".into(), None, )); } } } // Determine file extension let ext = file_name .as_deref() .and_then(|n| n.rsplit('.').next()) .unwrap_or(if mime.starts_with("video/") { "mp4" } else { "jpg" }); let upload_id = Uuid::new_v4(); let event_slug = &state.config.event_slug; let relative_path = format!("originals/{event_slug}/{upload_id}.{ext}"); let absolute_path = state.config.media_path.join(&relative_path); // Ensure directory exists and write file if let Some(parent) = absolute_path.parent() { tokio::fs::create_dir_all(parent).await.map_err(|e| AppError::Internal(e.into()))?; } tokio::fs::write(&absolute_path, &data).await.map_err(|e| AppError::Internal(e.into()))?; // Update user's total upload bytes sqlx::query("UPDATE \"user\" SET total_upload_bytes = total_upload_bytes + $2 WHERE id = $1") .bind(auth.user_id) .bind(size) .execute(&state.pool) .await?; // Insert upload record let upload = Upload::create( &state.pool, auth.event_id, auth.user_id, &relative_path, &mime, size, caption.as_deref(), ) .await?; // Process hashtags from caption and explicit CSV let mut tags: Vec = Vec::new(); if let Some(ref cap) = caption { tags.extend(hashtag::extract_hashtags(cap)); } if let Some(ref csv) = hashtags_csv { for tag in csv.split(',') { let t = tag.trim().trim_start_matches('#').to_lowercase(); if !t.is_empty() { tags.push(t); } } } tags.sort(); tags.dedup(); for tag in &tags { let h = Hashtag::upsert(&state.pool, auth.event_id, tag).await?; Hashtag::link_to_upload(&state.pool, upload.id, h.id).await?; } // Spawn compression task state .compression .process(upload.id, relative_path, mime.clone()); // Broadcast SSE event let dto = UploadDto { id: upload.id, user_id: auth.user_id, uploader_name: user.display_name, preview_url: None, thumbnail_url: None, mime_type: mime, caption, hashtags: tags, like_count: 0, comment_count: 0, liked_by_me: false, created_at: upload.created_at, }; let _ = state.sse_tx.send(crate::state::SseEvent::new( "new-upload", serde_json::to_string(&dto).unwrap_or_default(), )); Ok((StatusCode::CREATED, Json(dto))) } #[derive(Deserialize)] pub struct EditUploadRequest { pub caption: Option, pub hashtags: Option>, } pub async fn edit_upload( State(state): State, auth: AuthUser, Path(upload_id): Path, Json(body): Json, ) -> Result { let upload = Upload::find_by_id(&state.pool, upload_id) .await? .ok_or_else(|| AppError::NotFound("Upload nicht gefunden.".into()))?; if upload.user_id != auth.user_id { return Err(AppError::Forbidden("Nur eigene Uploads bearbeiten.".into())); } if let Some(ref caption) = body.caption { Upload::update_caption(&state.pool, upload_id, Some(caption)).await?; } if let Some(ref hashtags) = body.hashtags { Hashtag::unlink_all_from_upload(&state.pool, upload_id).await?; for tag in hashtags { let h = Hashtag::upsert(&state.pool, auth.event_id, tag).await?; Hashtag::link_to_upload(&state.pool, upload_id, h.id).await?; } } Ok(StatusCode::OK) } pub async fn delete_upload( State(state): State, auth: AuthUser, Path(upload_id): Path, ) -> Result { let upload = Upload::find_by_id(&state.pool, upload_id) .await? .ok_or_else(|| AppError::NotFound("Upload nicht gefunden.".into()))?; if upload.user_id != auth.user_id { return Err(AppError::Forbidden("Nur eigene Uploads löschen.".into())); } Upload::soft_delete(&state.pool, upload_id).await?; Ok(StatusCode::NO_CONTENT) } /// Drain a multipart body so the HTTP connection stays clean when returning an early error. /// Without draining, the client may still be sending the body after we've sent our response, /// which can corrupt the keep-alive connection for subsequent requests. async fn drain_multipart(mut mp: Multipart) { while let Ok(Some(mut field)) = mp.next_field().await { while field.chunk().await.ok().flatten().is_some() {} } } /// Snapshot of the dynamic per-user quota used both by the upload pre-check and the /// `GET /me/quota` endpoint. `limit_bytes = None` means quota enforcement is currently /// off (the frontend hides the widget in that case). pub struct QuotaEstimate { pub limit_bytes: Option, pub active_uploaders: i64, pub free_disk_bytes: i64, pub tolerance: f64, } /// Computes the per-user storage quota using /// `floor((free_disk * tolerance) / max(active_uploaders, 1))`. Returns `limit_bytes = /// None` whenever the storage quota is currently disabled — callers should skip the /// check (upload handler) or hide the UI (quota endpoint). pub async fn compute_storage_quota(state: &AppState) -> QuotaEstimate { let quota_on = config::get_bool(&state.pool, "quota_enabled", true).await; let storage_quota_on = config::get_bool(&state.pool, "storage_quota_enabled", true).await; let tolerance = config::get_f64(&state.pool, "quota_tolerance", 0.75).await; let (active_count,): (i64,) = sqlx::query_as( "SELECT COUNT(DISTINCT user_id) FROM upload WHERE deleted_at IS NULL", ) .fetch_one(&state.pool) .await .unwrap_or((0,)); let active = active_count.max(1); let media_path = state.config.media_path.to_string_lossy().to_string(); let free_disk = sysinfo::Disks::new_with_refreshed_list() .iter() .find(|d| media_path.starts_with(d.mount_point().to_string_lossy().as_ref())) .map(|d| d.available_space()) .unwrap_or_else(|| { sysinfo::Disks::new_with_refreshed_list() .iter() .find(|d| d.mount_point().to_string_lossy() == "/") .map(|d| d.available_space()) .unwrap_or(0) }) as i64; let limit_bytes = if quota_on && storage_quota_on { Some(((free_disk as f64 * tolerance) / active as f64).floor() as i64) } else { None }; QuotaEstimate { limit_bytes, active_uploaders: active, free_disk_bytes: free_disk, tolerance, } } /// Streaming download of the original file behind an upload. Used by: /// - the per-post "Original anzeigen" context action (`window.open`) /// - `` / `