Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4e1f1d6426 | ||
|
|
3f052a4f91 |
3
backend/Cargo.lock
generated
3
backend/Cargo.lock
generated
@@ -894,6 +894,7 @@ dependencies = [
|
||||
"bcrypt",
|
||||
"chrono",
|
||||
"dotenvy",
|
||||
"futures",
|
||||
"image",
|
||||
"jsonwebtoken",
|
||||
"minijinja",
|
||||
@@ -905,6 +906,7 @@ dependencies = [
|
||||
"sqlx",
|
||||
"sysinfo",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tower",
|
||||
"tower-http",
|
||||
"tower_governor",
|
||||
@@ -3253,6 +3255,7 @@ dependencies = [
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -16,6 +16,8 @@ jsonwebtoken = "9"
|
||||
bcrypt = "0.15"
|
||||
uuid = { version = "1", features = ["v4", "serde"] }
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
tokio-stream = { version = "0.1", features = ["sync"] }
|
||||
futures = "0.3"
|
||||
sha2 = "0.10"
|
||||
rand = "0.9"
|
||||
anyhow = "1"
|
||||
|
||||
2
backend/src/handlers/mod.rs
Normal file
2
backend/src/handlers/mod.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
pub mod sse;
|
||||
pub mod upload;
|
||||
33
backend/src/handlers/sse.rs
Normal file
33
backend/src/handlers/sse.rs
Normal file
@@ -0,0 +1,33 @@
|
||||
use std::convert::Infallible;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::extract::State;
|
||||
use axum::response::sse::{Event, KeepAlive, Sse};
|
||||
use futures::stream::Stream;
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
use crate::auth::middleware::AuthUser;
|
||||
use crate::error::AppError;
|
||||
use crate::state::AppState;
|
||||
|
||||
pub async fn stream(
|
||||
State(state): State<AppState>,
|
||||
_auth: AuthUser,
|
||||
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> {
|
||||
let rx = state.sse_tx.subscribe();
|
||||
let stream = BroadcastStream::new(rx).filter_map(|msg| {
|
||||
match msg {
|
||||
Ok(sse_event) => Some(Ok(Event::default()
|
||||
.event(sse_event.event_type)
|
||||
.data(sse_event.data))),
|
||||
Err(_) => None, // Lagged — skip missed events
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Sse::new(stream).keep_alive(
|
||||
KeepAlive::new()
|
||||
.interval(Duration::from_secs(30))
|
||||
.text("ping"),
|
||||
))
|
||||
}
|
||||
237
backend/src/handlers/upload.rs
Normal file
237
backend/src/handlers/upload.rs
Normal file
@@ -0,0 +1,237 @@
|
||||
use axum::extract::{Multipart, Path, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::Json;
|
||||
use serde::Deserialize;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::auth::middleware::AuthUser;
|
||||
use crate::error::AppError;
|
||||
use crate::models::hashtag::{self, Hashtag};
|
||||
use crate::models::upload::{Upload, UploadDto};
|
||||
use crate::models::user::User;
|
||||
use crate::state::AppState;
|
||||
|
||||
pub async fn upload(
|
||||
State(state): State<AppState>,
|
||||
auth: AuthUser,
|
||||
mut multipart: Multipart,
|
||||
) -> Result<(StatusCode, Json<UploadDto>), AppError> {
|
||||
// Check if user is banned
|
||||
let user = User::find_by_id(&state.pool, auth.user_id)
|
||||
.await?
|
||||
.ok_or_else(|| AppError::NotFound("Benutzer nicht gefunden.".into()))?;
|
||||
if user.is_banned {
|
||||
return Err(AppError::Forbidden("Du bist gesperrt.".into()));
|
||||
}
|
||||
|
||||
// Check if uploads are locked
|
||||
let event = crate::models::event::Event::find_by_slug(&state.pool, &state.config.event_slug)
|
||||
.await?
|
||||
.ok_or_else(|| AppError::NotFound("Event nicht gefunden.".into()))?;
|
||||
if event.uploads_locked_at.is_some() {
|
||||
return Err(AppError::Forbidden("Uploads sind gesperrt.".into()));
|
||||
}
|
||||
|
||||
// Read config limits from DB
|
||||
let max_image_mb: i64 = get_config_i64(&state.pool, "max_image_size_mb", 20).await;
|
||||
let max_video_mb: i64 = get_config_i64(&state.pool, "max_video_size_mb", 500).await;
|
||||
|
||||
let mut file_data: Option<Vec<u8>> = None;
|
||||
let mut file_name: Option<String> = None;
|
||||
let mut content_type: Option<String> = None;
|
||||
let mut caption: Option<String> = None;
|
||||
let mut hashtags_csv: Option<String> = None;
|
||||
|
||||
while let Some(field) = multipart.next_field().await.map_err(|e| AppError::BadRequest(e.to_string()))? {
|
||||
let name = field.name().unwrap_or_default().to_string();
|
||||
match name.as_str() {
|
||||
"file" => {
|
||||
file_name = field.file_name().map(|s| s.to_string());
|
||||
content_type = field.content_type().map(|s| s.to_string());
|
||||
file_data = Some(
|
||||
field.bytes().await
|
||||
.map_err(|e| AppError::BadRequest(format!("Datei konnte nicht gelesen werden: {e}")))?
|
||||
.to_vec(),
|
||||
);
|
||||
}
|
||||
"caption" => {
|
||||
caption = Some(
|
||||
field.text().await
|
||||
.map_err(|e| AppError::BadRequest(e.to_string()))?,
|
||||
);
|
||||
}
|
||||
"hashtags" => {
|
||||
hashtags_csv = Some(
|
||||
field.text().await
|
||||
.map_err(|e| AppError::BadRequest(e.to_string()))?,
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let data = file_data.ok_or_else(|| AppError::BadRequest("Keine Datei hochgeladen.".into()))?;
|
||||
let mime = content_type.unwrap_or_else(|| "application/octet-stream".to_string());
|
||||
let size = data.len() as i64;
|
||||
|
||||
// Validate file size
|
||||
let max_bytes = if mime.starts_with("video/") {
|
||||
max_video_mb * 1024 * 1024
|
||||
} else {
|
||||
max_image_mb * 1024 * 1024
|
||||
};
|
||||
if size > max_bytes {
|
||||
return Err(AppError::BadRequest(format!(
|
||||
"Datei ist zu groß. Maximum: {} MB.",
|
||||
max_bytes / (1024 * 1024)
|
||||
)));
|
||||
}
|
||||
|
||||
// Determine file extension
|
||||
let ext = file_name
|
||||
.as_deref()
|
||||
.and_then(|n| n.rsplit('.').next())
|
||||
.unwrap_or(if mime.starts_with("video/") { "mp4" } else { "jpg" });
|
||||
|
||||
let upload_id = Uuid::new_v4();
|
||||
let event_slug = &state.config.event_slug;
|
||||
let relative_path = format!("originals/{event_slug}/{upload_id}.{ext}");
|
||||
let absolute_path = state.config.media_path.join(&relative_path);
|
||||
|
||||
// Ensure directory exists and write file
|
||||
if let Some(parent) = absolute_path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await.map_err(|e| AppError::Internal(e.into()))?;
|
||||
}
|
||||
tokio::fs::write(&absolute_path, &data).await.map_err(|e| AppError::Internal(e.into()))?;
|
||||
|
||||
// Update user's total upload bytes
|
||||
sqlx::query("UPDATE \"user\" SET total_upload_bytes = total_upload_bytes + $2 WHERE id = $1")
|
||||
.bind(auth.user_id)
|
||||
.bind(size)
|
||||
.execute(&state.pool)
|
||||
.await?;
|
||||
|
||||
// Insert upload record
|
||||
let upload = Upload::create(
|
||||
&state.pool,
|
||||
auth.event_id,
|
||||
auth.user_id,
|
||||
&relative_path,
|
||||
&mime,
|
||||
size,
|
||||
caption.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Process hashtags from caption and explicit CSV
|
||||
let mut tags: Vec<String> = Vec::new();
|
||||
if let Some(ref cap) = caption {
|
||||
tags.extend(hashtag::extract_hashtags(cap));
|
||||
}
|
||||
if let Some(ref csv) = hashtags_csv {
|
||||
for tag in csv.split(',') {
|
||||
let t = tag.trim().trim_start_matches('#').to_lowercase();
|
||||
if !t.is_empty() {
|
||||
tags.push(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
tags.sort();
|
||||
tags.dedup();
|
||||
|
||||
for tag in &tags {
|
||||
let h = Hashtag::upsert(&state.pool, auth.event_id, tag).await?;
|
||||
Hashtag::link_to_upload(&state.pool, upload.id, h.id).await?;
|
||||
}
|
||||
|
||||
// Spawn compression task
|
||||
state
|
||||
.compression
|
||||
.process(upload.id, relative_path, mime.clone());
|
||||
|
||||
// Broadcast SSE event
|
||||
let dto = UploadDto {
|
||||
id: upload.id,
|
||||
user_id: auth.user_id,
|
||||
uploader_name: user.display_name,
|
||||
preview_url: None,
|
||||
thumbnail_url: None,
|
||||
mime_type: mime,
|
||||
caption,
|
||||
hashtags: tags,
|
||||
like_count: 0,
|
||||
comment_count: 0,
|
||||
liked_by_me: false,
|
||||
created_at: upload.created_at,
|
||||
};
|
||||
|
||||
let _ = state.sse_tx.send(crate::state::SseEvent {
|
||||
event_type: "new-upload".to_string(),
|
||||
data: serde_json::to_string(&dto).unwrap_or_default(),
|
||||
});
|
||||
|
||||
Ok((StatusCode::CREATED, Json(dto)))
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct EditUploadRequest {
|
||||
pub caption: Option<String>,
|
||||
pub hashtags: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
pub async fn edit_upload(
|
||||
State(state): State<AppState>,
|
||||
auth: AuthUser,
|
||||
Path(upload_id): Path<Uuid>,
|
||||
Json(body): Json<EditUploadRequest>,
|
||||
) -> Result<StatusCode, AppError> {
|
||||
let upload = Upload::find_by_id(&state.pool, upload_id)
|
||||
.await?
|
||||
.ok_or_else(|| AppError::NotFound("Upload nicht gefunden.".into()))?;
|
||||
|
||||
if upload.user_id != auth.user_id {
|
||||
return Err(AppError::Forbidden("Nur eigene Uploads bearbeiten.".into()));
|
||||
}
|
||||
|
||||
if let Some(ref caption) = body.caption {
|
||||
Upload::update_caption(&state.pool, upload_id, Some(caption)).await?;
|
||||
}
|
||||
|
||||
if let Some(ref hashtags) = body.hashtags {
|
||||
Hashtag::unlink_all_from_upload(&state.pool, upload_id).await?;
|
||||
for tag in hashtags {
|
||||
let h = Hashtag::upsert(&state.pool, auth.event_id, tag).await?;
|
||||
Hashtag::link_to_upload(&state.pool, upload_id, h.id).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(StatusCode::OK)
|
||||
}
|
||||
|
||||
pub async fn delete_upload(
|
||||
State(state): State<AppState>,
|
||||
auth: AuthUser,
|
||||
Path(upload_id): Path<Uuid>,
|
||||
) -> Result<StatusCode, AppError> {
|
||||
let upload = Upload::find_by_id(&state.pool, upload_id)
|
||||
.await?
|
||||
.ok_or_else(|| AppError::NotFound("Upload nicht gefunden.".into()))?;
|
||||
|
||||
if upload.user_id != auth.user_id {
|
||||
return Err(AppError::Forbidden("Nur eigene Uploads löschen.".into()));
|
||||
}
|
||||
|
||||
Upload::soft_delete(&state.pool, upload_id).await?;
|
||||
|
||||
Ok(StatusCode::NO_CONTENT)
|
||||
}
|
||||
|
||||
async fn get_config_i64(pool: &sqlx::PgPool, key: &str, default: i64) -> i64 {
|
||||
let row: Option<(String,)> =
|
||||
sqlx::query_as("SELECT value FROM config WHERE key = $1")
|
||||
.bind(key)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
.unwrap_or(None);
|
||||
row.and_then(|r| r.0.parse().ok()).unwrap_or(default)
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use axum::routing::{delete, post};
|
||||
use axum::routing::{delete, get, patch, post};
|
||||
use axum::Router;
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
@@ -7,7 +7,9 @@ mod auth;
|
||||
mod config;
|
||||
mod db;
|
||||
mod error;
|
||||
mod handlers;
|
||||
mod models;
|
||||
mod services;
|
||||
mod state;
|
||||
|
||||
use config::AppConfig;
|
||||
@@ -28,14 +30,26 @@ async fn main() -> Result<()> {
|
||||
let pool = db::create_pool(&config.database_url).await?;
|
||||
let state = AppState::new(pool, config.clone());
|
||||
|
||||
// Ensure media directories exist
|
||||
tokio::fs::create_dir_all(&config.media_path).await.ok();
|
||||
|
||||
let api = Router::new()
|
||||
// Auth
|
||||
.route("/api/v1/join", post(auth::handlers::join))
|
||||
.route("/api/v1/recover", post(auth::handlers::recover))
|
||||
.route("/api/v1/admin/login", post(auth::handlers::admin_login))
|
||||
.route("/api/v1/session", delete(auth::handlers::logout));
|
||||
.route("/api/v1/session", delete(auth::handlers::logout))
|
||||
// Upload
|
||||
.route("/api/v1/upload", post(handlers::upload::upload))
|
||||
.route(
|
||||
"/api/v1/upload/{id}",
|
||||
patch(handlers::upload::edit_upload).delete(handlers::upload::delete_upload),
|
||||
)
|
||||
// SSE
|
||||
.route("/api/v1/stream", get(handlers::sse::stream));
|
||||
|
||||
let router = Router::new()
|
||||
.route("/health", axum::routing::get(|| async { "ok" }))
|
||||
.route("/health", get(|| async { "ok" }))
|
||||
.merge(api)
|
||||
.with_state(state);
|
||||
|
||||
|
||||
77
backend/src/models/hashtag.rs
Normal file
77
backend/src/models/hashtag.rs
Normal file
@@ -0,0 +1,77 @@
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
pub struct Hashtag {
|
||||
pub id: Uuid,
|
||||
pub event_id: Uuid,
|
||||
pub tag: String,
|
||||
}
|
||||
|
||||
impl Hashtag {
|
||||
/// Upsert a hashtag (insert if not exists, return existing if it does).
|
||||
pub async fn upsert(pool: &PgPool, event_id: Uuid, tag: &str) -> Result<Self, sqlx::Error> {
|
||||
let normalized = tag.trim().trim_start_matches('#').to_lowercase();
|
||||
sqlx::query_as::<_, Self>(
|
||||
"INSERT INTO hashtag (event_id, tag) VALUES ($1, $2)
|
||||
ON CONFLICT (event_id, tag) DO UPDATE SET tag = EXCLUDED.tag
|
||||
RETURNING *",
|
||||
)
|
||||
.bind(event_id)
|
||||
.bind(&normalized)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn link_to_upload(
|
||||
pool: &PgPool,
|
||||
upload_id: Uuid,
|
||||
hashtag_id: Uuid,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
sqlx::query(
|
||||
"INSERT INTO upload_hashtag (upload_id, hashtag_id) VALUES ($1, $2)
|
||||
ON CONFLICT DO NOTHING",
|
||||
)
|
||||
.bind(upload_id)
|
||||
.bind(hashtag_id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn unlink_all_from_upload(
|
||||
pool: &PgPool,
|
||||
upload_id: Uuid,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
sqlx::query("DELETE FROM upload_hashtag WHERE upload_id = $1")
|
||||
.bind(upload_id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn tags_for_upload(
|
||||
pool: &PgPool,
|
||||
upload_id: Uuid,
|
||||
) -> Result<Vec<String>, sqlx::Error> {
|
||||
let rows: Vec<(String,)> = sqlx::query_as(
|
||||
"SELECT h.tag FROM hashtag h
|
||||
JOIN upload_hashtag uh ON uh.hashtag_id = h.id
|
||||
WHERE uh.upload_id = $1
|
||||
ORDER BY h.tag",
|
||||
)
|
||||
.bind(upload_id)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
Ok(rows.into_iter().map(|r| r.0).collect())
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract #hashtags from text (caption or body).
|
||||
pub fn extract_hashtags(text: &str) -> Vec<String> {
|
||||
text.split_whitespace()
|
||||
.filter(|w| w.starts_with('#') && w.len() > 1)
|
||||
.map(|w| w.trim_start_matches('#').to_lowercase())
|
||||
.filter(|t| !t.is_empty())
|
||||
.collect()
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
pub mod event;
|
||||
pub mod hashtag;
|
||||
pub mod session;
|
||||
pub mod upload;
|
||||
pub mod user;
|
||||
|
||||
117
backend/src/models/upload.rs
Normal file
117
backend/src/models/upload.rs
Normal file
@@ -0,0 +1,117 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::Serialize;
|
||||
use sqlx::PgPool;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
pub struct Upload {
|
||||
pub id: Uuid,
|
||||
pub event_id: Uuid,
|
||||
pub user_id: Uuid,
|
||||
pub original_path: String,
|
||||
pub preview_path: Option<String>,
|
||||
pub thumbnail_path: Option<String>,
|
||||
pub mime_type: String,
|
||||
pub original_size_bytes: i64,
|
||||
pub caption: Option<String>,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub deleted_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct UploadDto {
|
||||
pub id: Uuid,
|
||||
pub user_id: Uuid,
|
||||
pub uploader_name: String,
|
||||
pub preview_url: Option<String>,
|
||||
pub thumbnail_url: Option<String>,
|
||||
pub mime_type: String,
|
||||
pub caption: Option<String>,
|
||||
pub hashtags: Vec<String>,
|
||||
pub like_count: i64,
|
||||
pub comment_count: i64,
|
||||
pub liked_by_me: bool,
|
||||
pub created_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl Upload {
|
||||
pub async fn create(
|
||||
pool: &PgPool,
|
||||
event_id: Uuid,
|
||||
user_id: Uuid,
|
||||
original_path: &str,
|
||||
mime_type: &str,
|
||||
original_size_bytes: i64,
|
||||
caption: Option<&str>,
|
||||
) -> Result<Self, sqlx::Error> {
|
||||
sqlx::query_as::<_, Self>(
|
||||
"INSERT INTO upload (event_id, user_id, original_path, mime_type, original_size_bytes, caption)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
RETURNING *",
|
||||
)
|
||||
.bind(event_id)
|
||||
.bind(user_id)
|
||||
.bind(original_path)
|
||||
.bind(mime_type)
|
||||
.bind(original_size_bytes)
|
||||
.bind(caption)
|
||||
.fetch_one(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn find_by_id(pool: &PgPool, id: Uuid) -> Result<Option<Self>, sqlx::Error> {
|
||||
sqlx::query_as::<_, Self>(
|
||||
"SELECT * FROM upload WHERE id = $1 AND deleted_at IS NULL",
|
||||
)
|
||||
.bind(id)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn set_preview_path(
|
||||
pool: &PgPool,
|
||||
id: Uuid,
|
||||
preview_path: &str,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
sqlx::query("UPDATE upload SET preview_path = $2 WHERE id = $1")
|
||||
.bind(id)
|
||||
.bind(preview_path)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn set_thumbnail_path(
|
||||
pool: &PgPool,
|
||||
id: Uuid,
|
||||
thumbnail_path: &str,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
sqlx::query("UPDATE upload SET thumbnail_path = $2 WHERE id = $1")
|
||||
.bind(id)
|
||||
.bind(thumbnail_path)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn soft_delete(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> {
|
||||
sqlx::query("UPDATE upload SET deleted_at = NOW() WHERE id = $1")
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_caption(
|
||||
pool: &PgPool,
|
||||
id: Uuid,
|
||||
caption: Option<&str>,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
sqlx::query("UPDATE upload SET caption = $2 WHERE id = $1")
|
||||
.bind(id)
|
||||
.bind(caption)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
139
backend/src/services/compression.rs
Normal file
139
backend/src/services/compression.rs
Normal file
@@ -0,0 +1,139 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use sqlx::PgPool;
|
||||
use tokio::sync::Semaphore;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::models::upload::Upload;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CompressionWorker {
|
||||
semaphore: Arc<Semaphore>,
|
||||
pool: PgPool,
|
||||
media_path: PathBuf,
|
||||
}
|
||||
|
||||
impl CompressionWorker {
|
||||
pub fn new(pool: PgPool, media_path: PathBuf, concurrency: usize) -> Self {
|
||||
Self {
|
||||
semaphore: Arc::new(Semaphore::new(concurrency)),
|
||||
pool,
|
||||
media_path,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a background task to process an uploaded file.
|
||||
pub fn process(&self, upload_id: Uuid, original_path: String, mime_type: String) {
|
||||
let worker = self.clone();
|
||||
tokio::spawn(async move {
|
||||
let _permit = worker.semaphore.acquire().await;
|
||||
if let Err(e) = worker.do_process(upload_id, &original_path, &mime_type).await {
|
||||
tracing::error!("compression failed for upload {upload_id}: {e:#}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn do_process(
|
||||
&self,
|
||||
upload_id: Uuid,
|
||||
original_path: &str,
|
||||
mime_type: &str,
|
||||
) -> Result<()> {
|
||||
let original = self.media_path.join(original_path);
|
||||
|
||||
if mime_type.starts_with("image/") {
|
||||
let preview_rel = self.generate_image_preview(upload_id, &original, mime_type).await?;
|
||||
Upload::set_preview_path(&self.pool, upload_id, &preview_rel).await?;
|
||||
tracing::info!("preview generated for upload {upload_id}");
|
||||
} else if mime_type.starts_with("video/") {
|
||||
let thumb_rel = self.generate_video_thumbnail(upload_id, &original).await?;
|
||||
Upload::set_thumbnail_path(&self.pool, upload_id, &thumb_rel).await?;
|
||||
tracing::info!("thumbnail generated for upload {upload_id}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn generate_image_preview(
|
||||
&self,
|
||||
upload_id: Uuid,
|
||||
original: &Path,
|
||||
mime_type: &str,
|
||||
) -> Result<String> {
|
||||
let previews_dir = self.media_path.join("previews");
|
||||
tokio::fs::create_dir_all(&previews_dir).await?;
|
||||
|
||||
let preview_filename = format!("{upload_id}.jpg");
|
||||
let preview_path = previews_dir.join(&preview_filename);
|
||||
let original = original.to_path_buf();
|
||||
let preview_path_clone = preview_path.clone();
|
||||
let mime_owned = mime_type.to_string();
|
||||
|
||||
// Run blocking image operations in a spawn_blocking task
|
||||
tokio::task::spawn_blocking(move || -> Result<()> {
|
||||
let img = image::open(&original)
|
||||
.context("failed to open image")?;
|
||||
|
||||
// Resize to max 800px wide, preserving aspect ratio
|
||||
let preview = img.resize(800, 800, image::imageops::FilterType::Lanczos3);
|
||||
preview.save_with_format(&preview_path_clone, image::ImageFormat::Jpeg)
|
||||
.context("failed to save preview")?;
|
||||
|
||||
// If the original is PNG, try lossless compression in-place
|
||||
if mime_owned == "image/png" {
|
||||
let opts = oxipng::Options::from_preset(2);
|
||||
let _ = oxipng::optimize(
|
||||
&oxipng::InFile::Path(original),
|
||||
&oxipng::OutFile::Path {
|
||||
path: None,
|
||||
preserve_attrs: true,
|
||||
},
|
||||
&opts,
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(format!("previews/{preview_filename}"))
|
||||
}
|
||||
|
||||
async fn generate_video_thumbnail(
|
||||
&self,
|
||||
upload_id: Uuid,
|
||||
original: &Path,
|
||||
) -> Result<String> {
|
||||
let thumbs_dir = self.media_path.join("thumbnails");
|
||||
tokio::fs::create_dir_all(&thumbs_dir).await?;
|
||||
|
||||
let thumb_filename = format!("{upload_id}.jpg");
|
||||
let thumb_path = thumbs_dir.join(&thumb_filename);
|
||||
|
||||
let output = tokio::process::Command::new("ffmpeg")
|
||||
.args([
|
||||
"-i",
|
||||
original.to_str().unwrap_or_default(),
|
||||
"-vframes",
|
||||
"1",
|
||||
"-ss",
|
||||
"00:00:01",
|
||||
"-vf",
|
||||
"scale=800:-1",
|
||||
"-y",
|
||||
thumb_path.to_str().unwrap_or_default(),
|
||||
])
|
||||
.output()
|
||||
.await
|
||||
.context("failed to run ffmpeg")?;
|
||||
|
||||
if !output.status.success() {
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
anyhow::bail!("ffmpeg failed: {stderr}");
|
||||
}
|
||||
|
||||
Ok(format!("thumbnails/{thumb_filename}"))
|
||||
}
|
||||
}
|
||||
1
backend/src/services/mod.rs
Normal file
1
backend/src/services/mod.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub mod compression;
|
||||
@@ -2,6 +2,7 @@ use sqlx::PgPool;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::config::AppConfig;
|
||||
use crate::services::compression::CompressionWorker;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct SseEvent {
|
||||
@@ -14,15 +15,19 @@ pub struct AppState {
|
||||
pub pool: PgPool,
|
||||
pub config: AppConfig,
|
||||
pub sse_tx: broadcast::Sender<SseEvent>,
|
||||
pub compression: CompressionWorker,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub fn new(pool: PgPool, config: AppConfig) -> Self {
|
||||
let (sse_tx, _) = broadcast::channel(256);
|
||||
let compression =
|
||||
CompressionWorker::new(pool.clone(), config.media_path.clone(), 2);
|
||||
Self {
|
||||
pool,
|
||||
config,
|
||||
sse_tx,
|
||||
compression,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
103
frontend/src/lib/components/UploadQueue.svelte
Normal file
103
frontend/src/lib/components/UploadQueue.svelte
Normal file
@@ -0,0 +1,103 @@
|
||||
<script lang="ts">
|
||||
import { queueItems, isProcessing, retryItem, removeItem, clearCompleted } from '$lib/upload-queue';
|
||||
import type { QueueItem } from '$lib/upload-queue';
|
||||
|
||||
function formatSize(bytes: number): string {
|
||||
if (bytes < 1024) return `${bytes} B`;
|
||||
if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`;
|
||||
return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
|
||||
}
|
||||
|
||||
function statusLabel(status: QueueItem['status']): string {
|
||||
switch (status) {
|
||||
case 'pending': return 'Wartend';
|
||||
case 'uploading': return 'Wird hochgeladen';
|
||||
case 'done': return 'Fertig';
|
||||
case 'error': return 'Fehler';
|
||||
}
|
||||
}
|
||||
|
||||
function statusColor(status: QueueItem['status']): string {
|
||||
switch (status) {
|
||||
case 'pending': return 'text-gray-500';
|
||||
case 'uploading': return 'text-blue-600';
|
||||
case 'done': return 'text-green-600';
|
||||
case 'error': return 'text-red-600';
|
||||
}
|
||||
}
|
||||
|
||||
let items = $derived($queueItems);
|
||||
let hasCompleted = $derived(items.some((i) => i.status === 'done'));
|
||||
</script>
|
||||
|
||||
{#if items.length > 0}
|
||||
<div class="mt-4 rounded-lg border border-gray-200 bg-white">
|
||||
<div class="flex items-center justify-between border-b border-gray-100 px-4 py-3">
|
||||
<h3 class="text-sm font-semibold text-gray-900">
|
||||
Upload-Warteschlange
|
||||
{#if $isProcessing}
|
||||
<span class="ml-2 inline-block h-2 w-2 animate-pulse rounded-full bg-blue-500"></span>
|
||||
{/if}
|
||||
</h3>
|
||||
{#if hasCompleted}
|
||||
<button
|
||||
onclick={() => clearCompleted()}
|
||||
class="text-xs text-gray-500 hover:text-gray-700"
|
||||
>
|
||||
Fertige entfernen
|
||||
</button>
|
||||
{/if}
|
||||
</div>
|
||||
|
||||
<ul class="divide-y divide-gray-100">
|
||||
{#each items as item (item.id)}
|
||||
<li class="px-4 py-3">
|
||||
<div class="flex items-center justify-between">
|
||||
<div class="min-w-0 flex-1">
|
||||
<p class="truncate text-sm font-medium text-gray-900">{item.fileName}</p>
|
||||
<p class="text-xs text-gray-500">{formatSize(item.fileSize)}</p>
|
||||
</div>
|
||||
<div class="ml-3 flex items-center gap-2">
|
||||
<span class="text-xs font-medium {statusColor(item.status)}">
|
||||
{statusLabel(item.status)}
|
||||
</span>
|
||||
{#if item.status === 'error'}
|
||||
<button
|
||||
onclick={() => retryItem(item.id)}
|
||||
class="rounded bg-red-100 px-2 py-0.5 text-xs text-red-700 hover:bg-red-200"
|
||||
>
|
||||
Erneut
|
||||
</button>
|
||||
{/if}
|
||||
{#if item.status === 'done' || item.status === 'error'}
|
||||
<button
|
||||
onclick={() => removeItem(item.id)}
|
||||
class="text-gray-400 hover:text-gray-600"
|
||||
aria-label="Entfernen"
|
||||
>
|
||||
<svg class="h-4 w-4" fill="none" viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M6 18L18 6M6 6l12 12" />
|
||||
</svg>
|
||||
</button>
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{#if item.status === 'uploading'}
|
||||
<div class="mt-2 h-1.5 w-full overflow-hidden rounded-full bg-gray-200">
|
||||
<div
|
||||
class="h-full rounded-full bg-blue-500 transition-all duration-300"
|
||||
style="width: {item.progress}%"
|
||||
></div>
|
||||
</div>
|
||||
<p class="mt-1 text-right text-xs text-gray-400">{item.progress}%</p>
|
||||
{/if}
|
||||
|
||||
{#if item.error}
|
||||
<p class="mt-1 text-xs text-red-500">{item.error}</p>
|
||||
{/if}
|
||||
</li>
|
||||
{/each}
|
||||
</ul>
|
||||
</div>
|
||||
{/if}
|
||||
233
frontend/src/lib/upload-queue.ts
Normal file
233
frontend/src/lib/upload-queue.ts
Normal file
@@ -0,0 +1,233 @@
|
||||
import { openDB, type IDBPDatabase } from 'idb';
|
||||
import { writable, get } from 'svelte/store';
|
||||
import { getToken } from './auth';
|
||||
|
||||
export interface QueueItem {
|
||||
id: string;
|
||||
fileName: string;
|
||||
fileSize: number;
|
||||
mimeType: string;
|
||||
caption: string;
|
||||
hashtags: string;
|
||||
status: 'pending' | 'uploading' | 'done' | 'error';
|
||||
progress: number;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
// Store does NOT hold file blobs — those stay in IndexedDB only
|
||||
export const queueItems = writable<QueueItem[]>([]);
|
||||
export const isProcessing = writable(false);
|
||||
|
||||
const DB_NAME = 'eventsnap-uploads';
|
||||
const STORE_NAME = 'queue';
|
||||
|
||||
let db: IDBPDatabase | null = null;
|
||||
|
||||
async function getDb(): Promise<IDBPDatabase> {
|
||||
if (db) return db;
|
||||
db = await openDB(DB_NAME, 1, {
|
||||
upgrade(database) {
|
||||
if (!database.objectStoreNames.contains(STORE_NAME)) {
|
||||
database.createObjectStore(STORE_NAME, { keyPath: 'id' });
|
||||
}
|
||||
}
|
||||
});
|
||||
return db;
|
||||
}
|
||||
|
||||
export async function loadQueue(): Promise<void> {
|
||||
const database = await getDb();
|
||||
const all = await database.getAll(STORE_NAME);
|
||||
const items: QueueItem[] = all.map((entry) => ({
|
||||
id: entry.id,
|
||||
fileName: entry.fileName,
|
||||
fileSize: entry.fileSize,
|
||||
mimeType: entry.mimeType,
|
||||
caption: entry.caption ?? '',
|
||||
hashtags: entry.hashtags ?? '',
|
||||
status: entry.status === 'uploading' ? 'pending' : entry.status,
|
||||
progress: entry.status === 'done' ? 100 : 0,
|
||||
error: entry.error
|
||||
}));
|
||||
queueItems.set(items);
|
||||
}
|
||||
|
||||
export async function addToQueue(
|
||||
file: File,
|
||||
caption: string,
|
||||
hashtags: string
|
||||
): Promise<void> {
|
||||
const database = await getDb();
|
||||
const id = crypto.randomUUID();
|
||||
const entry = {
|
||||
id,
|
||||
fileName: file.name,
|
||||
fileSize: file.size,
|
||||
mimeType: file.type,
|
||||
caption,
|
||||
hashtags,
|
||||
status: 'pending',
|
||||
blob: file
|
||||
};
|
||||
await database.put(STORE_NAME, entry);
|
||||
|
||||
queueItems.update((items) => [
|
||||
...items,
|
||||
{
|
||||
id,
|
||||
fileName: file.name,
|
||||
fileSize: file.size,
|
||||
mimeType: file.type,
|
||||
caption,
|
||||
hashtags,
|
||||
status: 'pending',
|
||||
progress: 0
|
||||
}
|
||||
]);
|
||||
|
||||
processQueue();
|
||||
}
|
||||
|
||||
export async function retryItem(id: string): Promise<void> {
|
||||
const database = await getDb();
|
||||
const entry = await database.get(STORE_NAME, id);
|
||||
if (!entry) return;
|
||||
|
||||
entry.status = 'pending';
|
||||
entry.error = undefined;
|
||||
await database.put(STORE_NAME, entry);
|
||||
|
||||
queueItems.update((items) =>
|
||||
items.map((item) =>
|
||||
item.id === id ? { ...item, status: 'pending' as const, progress: 0, error: undefined } : item
|
||||
)
|
||||
);
|
||||
|
||||
processQueue();
|
||||
}
|
||||
|
||||
export async function removeItem(id: string): Promise<void> {
|
||||
const database = await getDb();
|
||||
await database.delete(STORE_NAME, id);
|
||||
queueItems.update((items) => items.filter((item) => item.id !== id));
|
||||
}
|
||||
|
||||
export async function clearCompleted(): Promise<void> {
|
||||
const database = await getDb();
|
||||
const items = get(queueItems);
|
||||
for (const item of items) {
|
||||
if (item.status === 'done') {
|
||||
await database.delete(STORE_NAME, item.id);
|
||||
}
|
||||
}
|
||||
queueItems.update((items) => items.filter((item) => item.status !== 'done'));
|
||||
}
|
||||
|
||||
let processing = false;
|
||||
|
||||
async function processQueue(): Promise<void> {
|
||||
if (processing) return;
|
||||
processing = true;
|
||||
isProcessing.set(true);
|
||||
|
||||
try {
|
||||
while (true) {
|
||||
const items = get(queueItems);
|
||||
const next = items.find((item) => item.status === 'pending');
|
||||
if (!next) break;
|
||||
|
||||
await uploadItem(next.id);
|
||||
}
|
||||
} finally {
|
||||
processing = false;
|
||||
isProcessing.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
async function uploadItem(id: string): Promise<void> {
|
||||
const database = await getDb();
|
||||
const entry = await database.get(STORE_NAME, id);
|
||||
if (!entry || !entry.blob) {
|
||||
// No blob — mark as error
|
||||
updateItemStatus(id, 'error', 'Datei nicht gefunden.');
|
||||
return;
|
||||
}
|
||||
|
||||
updateItemStatus(id, 'uploading');
|
||||
|
||||
const token = getToken();
|
||||
if (!token) {
|
||||
updateItemStatus(id, 'error', 'Nicht angemeldet.');
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const formData = new FormData();
|
||||
formData.append('file', entry.blob, entry.fileName);
|
||||
if (entry.caption) formData.append('caption', entry.caption);
|
||||
if (entry.hashtags) formData.append('hashtags', entry.hashtags);
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const xhr = new XMLHttpRequest();
|
||||
xhr.open('POST', '/api/v1/upload');
|
||||
xhr.setRequestHeader('Authorization', `Bearer ${token}`);
|
||||
|
||||
xhr.upload.addEventListener('progress', (e) => {
|
||||
if (e.lengthComputable) {
|
||||
const pct = Math.round((e.loaded / e.total) * 100);
|
||||
queueItems.update((items) =>
|
||||
items.map((item) => (item.id === id ? { ...item, progress: pct } : item))
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
xhr.addEventListener('load', () => {
|
||||
if (xhr.status >= 200 && xhr.status < 300) {
|
||||
resolve();
|
||||
} else {
|
||||
try {
|
||||
const body = JSON.parse(xhr.responseText);
|
||||
reject(new Error(body.message || `HTTP ${xhr.status}`));
|
||||
} catch {
|
||||
reject(new Error(`HTTP ${xhr.status}`));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
xhr.addEventListener('error', () => reject(new Error('Netzwerkfehler')));
|
||||
xhr.addEventListener('abort', () => reject(new Error('Abgebrochen')));
|
||||
xhr.send(formData);
|
||||
});
|
||||
|
||||
// Success — remove blob from IndexedDB, mark done
|
||||
entry.status = 'done';
|
||||
delete entry.blob;
|
||||
await database.put(STORE_NAME, entry);
|
||||
updateItemStatus(id, 'done');
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : 'Upload fehlgeschlagen.';
|
||||
entry.status = 'error';
|
||||
entry.error = msg;
|
||||
await database.put(STORE_NAME, entry);
|
||||
updateItemStatus(id, 'error', msg);
|
||||
}
|
||||
}
|
||||
|
||||
function updateItemStatus(
|
||||
id: string,
|
||||
status: QueueItem['status'],
|
||||
error?: string
|
||||
): void {
|
||||
queueItems.update((items) =>
|
||||
items.map((item) =>
|
||||
item.id === id
|
||||
? {
|
||||
...item,
|
||||
status,
|
||||
progress: status === 'done' ? 100 : status === 'error' ? item.progress : item.progress,
|
||||
error
|
||||
}
|
||||
: item
|
||||
)
|
||||
);
|
||||
}
|
||||
79
frontend/src/routes/upload/+page.svelte
Normal file
79
frontend/src/routes/upload/+page.svelte
Normal file
@@ -0,0 +1,79 @@
|
||||
<script lang="ts">
|
||||
import { goto } from '$app/navigation';
|
||||
import { getToken } from '$lib/auth';
|
||||
import { addToQueue, loadQueue } from '$lib/upload-queue';
|
||||
import UploadQueue from '$lib/components/UploadQueue.svelte';
|
||||
import { onMount } from 'svelte';
|
||||
|
||||
let caption = $state('');
|
||||
let hashtags = $state('');
|
||||
let fileInput: HTMLInputElement;
|
||||
|
||||
onMount(() => {
|
||||
if (!getToken()) {
|
||||
goto('/join');
|
||||
return;
|
||||
}
|
||||
loadQueue();
|
||||
});
|
||||
|
||||
async function handleFiles() {
|
||||
const files = fileInput?.files;
|
||||
if (!files || files.length === 0) return;
|
||||
|
||||
for (const file of files) {
|
||||
await addToQueue(file, caption, hashtags);
|
||||
}
|
||||
|
||||
// Reset form
|
||||
caption = '';
|
||||
hashtags = '';
|
||||
if (fileInput) fileInput.value = '';
|
||||
}
|
||||
</script>
|
||||
|
||||
<div class="min-h-screen bg-gray-50 p-4">
|
||||
<div class="mx-auto max-w-lg">
|
||||
<div class="mb-6 flex items-center justify-between">
|
||||
<h1 class="text-xl font-bold text-gray-900">Hochladen</h1>
|
||||
<a href="/feed" class="text-sm text-blue-600 hover:underline">Zur Galerie</a>
|
||||
</div>
|
||||
|
||||
<div class="rounded-lg border border-gray-200 bg-white p-4">
|
||||
<label
|
||||
class="flex cursor-pointer flex-col items-center justify-center rounded-lg border-2 border-dashed border-gray-300 bg-gray-50 p-8 transition hover:border-blue-400 hover:bg-blue-50"
|
||||
>
|
||||
<svg class="mb-2 h-10 w-10 text-gray-400" fill="none" viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="1.5" d="M12 16V4m0 0l-4 4m4-4l4 4M4 20h16" />
|
||||
</svg>
|
||||
<span class="text-sm font-medium text-gray-600">Fotos oder Videos auswählen</span>
|
||||
<span class="mt-1 text-xs text-gray-400">Mehrere Dateien möglich</span>
|
||||
<input
|
||||
bind:this={fileInput}
|
||||
type="file"
|
||||
accept="image/*,video/*"
|
||||
multiple
|
||||
class="hidden"
|
||||
onchange={handleFiles}
|
||||
/>
|
||||
</label>
|
||||
|
||||
<div class="mt-4 space-y-3">
|
||||
<input
|
||||
type="text"
|
||||
bind:value={caption}
|
||||
placeholder="Beschreibung (optional, #hashtags möglich)"
|
||||
class="w-full rounded-lg border border-gray-300 px-3 py-2 text-sm focus:border-blue-500 focus:outline-none focus:ring-1 focus:ring-blue-200"
|
||||
/>
|
||||
<input
|
||||
type="text"
|
||||
bind:value={hashtags}
|
||||
placeholder="Hashtags (kommagetrennt, z.B. hochzeit, party)"
|
||||
class="w-full rounded-lg border border-gray-300 px-3 py-2 text-sm focus:border-blue-500 focus:outline-none focus:ring-1 focus:ring-blue-200"
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<UploadQueue />
|
||||
</div>
|
||||
</div>
|
||||
Reference in New Issue
Block a user