From d228676a567da9655823f9c938c11300698ab56a Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sun, 17 May 2026 21:00:51 +0200 Subject: [PATCH] fix(security): cross-event authz, SSE ticket flow, account hardening, audit logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the comprehensive code review. Five batches: 1. Cross-event authorization: host_delete_upload, unban_user, and host_delete_comment now scope by auth.event_id. Adds Upload::find_by_id_and_event / soft_delete_in_event and a Comment::soft_delete_in_event variant that joins through upload. 2. Token exposure: SSE auth no longer puts the JWT in the URL. New /api/v1/stream/ticket endpoint mints a short-lived single-use ticket bound to the session; the EventSource passes ?ticket=... instead. Refuse to start in APP_ENV=production with the dev JWT sentinel; warn loudly otherwise. 3. Account hardening: per-IP+name rate limit on /recover (mitigates targeted lockout DoS), per-IP rate limit on /admin/login, random 32-char admin recovery PIN (replaces "0000"), structured tracing events for wrong PIN, lockout, failed admin login, ban/unban/role change/pin-reset/host-delete. 4. DoS / correctness: comment listing paginated (LIMIT 50 + ?before= cursor), hashtag extraction whitelisted to ASCII alnum+underscore (≤40 chars) with unit tests, display_name / caption / comment body length validated in chars rather than bytes. 5. Cleanup: session-touch failures now logged, DATABASE_MAX_CONNECTIONS env var (default 10). Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/src/auth/handlers.rs | 72 +++++++++++++++++++++++-- backend/src/auth/middleware.rs | 8 ++- backend/src/config.rs | 28 ++++++++-- backend/src/db.rs | 11 +++- backend/src/handlers/host.rs | 79 +++++++++++++++++++++++----- backend/src/handlers/social.rs | 20 +++++-- backend/src/handlers/sse.rs | 37 +++++++++---- backend/src/handlers/upload.rs | 6 ++- backend/src/main.rs | 7 ++- backend/src/models/comment.rs | 53 ++++++++++++++++--- backend/src/models/hashtag.rs | 43 +++++++++++++-- backend/src/models/upload.rs | 54 +++++++++++++++++++ backend/src/services/maintenance.rs | 9 +++- backend/src/services/mod.rs | 1 + backend/src/services/sse_tickets.rs | 74 ++++++++++++++++++++++++++ backend/src/state.rs | 3 ++ frontend/src/lib/sse.ts | 81 ++++++++++++++++++----------- 17 files changed, 507 insertions(+), 79 deletions(-) create mode 100644 backend/src/services/sse_tickets.rs diff --git a/backend/src/auth/handlers.rs b/backend/src/auth/handlers.rs index b8c6749..ca45d4c 100644 --- a/backend/src/auth/handlers.rs +++ b/backend/src/auth/handlers.rs @@ -49,7 +49,8 @@ pub async fn join( } let display_name = body.display_name.trim(); - if display_name.is_empty() || display_name.len() > 50 { + let name_chars = display_name.chars().count(); + if name_chars == 0 || name_chars > 50 { return Err(AppError::BadRequest( "Name muss zwischen 1 und 50 Zeichen lang sein.".into(), )); @@ -122,10 +123,33 @@ pub struct RecoverResponse { pub async fn recover( State(state): State, + headers: HeaderMap, Json(body): Json, ) -> Result, AppError> { let display_name = body.display_name.trim(); + // Per-IP+name throttle BEFORE the per-user 3-strike counter. Without this + // an attacker who knows a display name (they're visible on the feed) can + // burn through 3 wrong PINs and lock the victim for 15 minutes — repeated + // every 15 minutes, indefinitely. 5 attempts per 15 minutes per (IP, name) + // softens that into a real cost. + let ip = client_ip(&headers, "unknown"); + let rate_limits_on = config::get_bool(&state.pool, "rate_limits_enabled", true).await; + let recover_rate_on = config::get_bool(&state.pool, "recover_rate_enabled", true).await; + if rate_limits_on && recover_rate_on { + let name_key = display_name.to_lowercase(); + if !state.rate_limiter.check( + format!("recover:{ip}:{name_key}"), + 5, + Duration::from_secs(15 * 60), + ) { + return Err(AppError::TooManyRequests( + "Zu viele Versuche. Bitte warte kurz und versuche es erneut.".into(), + None, + )); + } + } + let event = Event::find_by_slug(&state.pool, &state.config.event_slug) .await? .ok_or_else(|| AppError::NotFound("Event nicht gefunden.".into()))?; @@ -184,9 +208,22 @@ pub async fn recover( // Wrong PIN — increment failure count let attempts = User::increment_failed_pin(&state.pool, user.id).await?; + tracing::warn!( + user_id = %user.id, + event_id = %event.id, + ip = %ip, + attempts, + "recover: wrong PIN" + ); if attempts >= 3 { let lockout = Utc::now() + chrono::Duration::minutes(15); User::lock_pin(&state.pool, user.id, lockout).await?; + tracing::warn!( + user_id = %user.id, + event_id = %event.id, + ip = %ip, + "recover: account locked for 15 minutes" + ); } } @@ -205,6 +242,7 @@ pub struct AdminLoginResponse { pub async fn admin_login( State(state): State, + headers: HeaderMap, Json(body): Json, ) -> Result, AppError> { if state.config.admin_password_hash.is_empty() { @@ -213,10 +251,31 @@ pub async fn admin_login( )); } + // Throttle password attempts. The admin password is bcrypt-hashed (slow to + // verify) but with no IP-level limit a determined attacker can still mount + // a long-running guess campaign. 5 attempts / minute / IP is plenty for + // honest typos. + let ip = client_ip(&headers, "unknown"); + let rate_limits_on = config::get_bool(&state.pool, "rate_limits_enabled", true).await; + let admin_rate_on = config::get_bool(&state.pool, "admin_login_rate_enabled", true).await; + if rate_limits_on && admin_rate_on + && !state.rate_limiter.check( + format!("admin_login:{ip}"), + 5, + Duration::from_secs(60), + ) + { + return Err(AppError::TooManyRequests( + "Zu viele Anmeldeversuche. Bitte warte kurz und versuche es erneut.".into(), + None, + )); + } + let valid = bcrypt::verify(&body.password, &state.config.admin_password_hash) .unwrap_or(false); if !valid { + tracing::warn!(ip = %ip, "admin_login: wrong password"); return Err(AppError::Unauthorized("Falsches Passwort.".into())); } @@ -233,8 +292,13 @@ pub async fn admin_login( let admin_user = if let Some(u) = users.into_iter().find(|u| u.role == UserRole::Admin) { u } else { - // Create admin user with a dummy PIN (admin authenticates via password) - let dummy_hash = bcrypt::hash("0000", 4) + // Admin authenticates via password, but the schema still requires a PIN + // hash. Generate a random unguessable PIN so the recovery path remains + // unusable as an escalation route even if the role flag ever got cleared. + let dummy_pin: String = (0..32) + .map(|_| rand::rng().random_range(b'a'..=b'z') as char) + .collect(); + let dummy_hash = bcrypt::hash(&dummy_pin, 4) .map_err(|e| AppError::Internal(anyhow::anyhow!(e)))?; let user = User::create(&state.pool, event.id, admin_name, &dummy_hash).await?; sqlx::query("UPDATE \"user\" SET role = 'admin' WHERE id = $1") @@ -246,6 +310,8 @@ pub async fn admin_login( .ok_or_else(|| AppError::Internal(anyhow::anyhow!("admin user creation failed")))? }; + tracing::info!(user_id = %admin_user.id, event_id = %event.id, ip = %ip, "admin_login: success"); + let token = jwt::create_token( admin_user.id, event.id, diff --git a/backend/src/auth/middleware.rs b/backend/src/auth/middleware.rs index e85d2a0..1511141 100644 --- a/backend/src/auth/middleware.rs +++ b/backend/src/auth/middleware.rs @@ -43,11 +43,15 @@ impl FromRequestParts for AuthUser { .map_err(|e| AppError::Internal(e.into()))? .ok_or_else(|| AppError::Unauthorized("Sitzung nicht gefunden oder abgelaufen.".into()))?; - // Update last_seen_at in the background (fire-and-forget) + // Update last_seen_at in the background (fire-and-forget). Failures are + // non-fatal but worth surfacing — silent swallowing hides DB connection + // pressure that would otherwise be the first symptom of a real problem. let pool = state.pool.clone(); let session_id = session.id; tokio::spawn(async move { - let _ = Session::touch(&pool, session_id).await; + if let Err(e) = Session::touch(&pool, session_id).await { + tracing::warn!(error = ?e, session_id = %session_id, "session touch failed"); + } }); Ok(Self { diff --git a/backend/src/config.rs b/backend/src/config.rs index 6078caf..88363fc 100644 --- a/backend/src/config.rs +++ b/backend/src/config.rs @@ -1,6 +1,10 @@ use std::path::PathBuf; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; + +/// Well-known dev JWT secret shipped in `.env.example`. If APP_ENV=production +/// we refuse to start with this value; otherwise we warn loudly. +const DEV_JWT_SECRET_SENTINEL: &str = "dev_secret_do_not_use_in_production_32byteslong_aaaa"; #[derive(Clone, Debug)] pub struct AppConfig { @@ -16,11 +20,29 @@ pub struct AppConfig { impl AppConfig { pub fn from_env() -> Result { + let app_env = + std::env::var("APP_ENV").unwrap_or_else(|_| "development".to_string()); + let is_prod = app_env.eq_ignore_ascii_case("production"); + + let jwt_secret = std::env::var("JWT_SECRET").context("JWT_SECRET must be set")?; + if jwt_secret == DEV_JWT_SECRET_SENTINEL { + if is_prod { + return Err(anyhow!( + "Refusing to start in production with the well-known dev JWT_SECRET — \ + rotate it (openssl rand -hex 64)." + )); + } + tracing::warn!( + "JWT_SECRET is the dev sentinel — fine for local development, NEVER ship this." + ); + } else if jwt_secret.len() < 32 { + return Err(anyhow!("JWT_SECRET must be at least 32 characters.")); + } + Ok(Self { database_url: std::env::var("DATABASE_URL") .context("DATABASE_URL must be set")?, - jwt_secret: std::env::var("JWT_SECRET") - .context("JWT_SECRET must be set")?, + jwt_secret, session_expiry_days: std::env::var("SESSION_EXPIRY_DAYS") .unwrap_or_else(|_| "30".to_string()) .parse() diff --git a/backend/src/db.rs b/backend/src/db.rs index 19f28e1..736b663 100644 --- a/backend/src/db.rs +++ b/backend/src/db.rs @@ -2,9 +2,16 @@ use anyhow::{Context, Result}; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; +const DEFAULT_MAX_CONNECTIONS: u32 = 10; + pub async fn create_pool(database_url: &str) -> Result { + let max_connections = std::env::var("DATABASE_MAX_CONNECTIONS") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(DEFAULT_MAX_CONNECTIONS); + let pool = PgPoolOptions::new() - .max_connections(10) + .max_connections(max_connections) .connect(database_url) .await .context("failed to connect to database")?; @@ -14,6 +21,6 @@ pub async fn create_pool(database_url: &str) -> Result { .await .context("failed to run database migrations")?; - tracing::info!("database connected and migrations applied"); + tracing::info!(max_connections, "database connected and migrations applied"); Ok(pool) } diff --git a/backend/src/handlers/host.rs b/backend/src/handlers/host.rs index ae52f86..f9ea788 100644 --- a/backend/src/handlers/host.rs +++ b/backend/src/handlers/host.rs @@ -120,18 +120,38 @@ pub async fn ban_user( .execute(&state.pool) .await?; + tracing::info!( + actor_user_id = %auth.user_id, + target_user_id = %user_id, + event_id = %auth.event_id, + hide_uploads = body.hide_uploads, + "host: ban_user" + ); + Ok(StatusCode::NO_CONTENT) } pub async fn unban_user( State(state): State, - RequireHost(_auth): RequireHost, + RequireHost(auth): RequireHost, Path(user_id): Path, ) -> Result { - sqlx::query("UPDATE \"user\" SET is_banned = FALSE WHERE id = $1") - .bind(user_id) - .execute(&state.pool) - .await?; + let result = sqlx::query( + "UPDATE \"user\" SET is_banned = FALSE WHERE id = $1 AND event_id = $2", + ) + .bind(user_id) + .bind(auth.event_id) + .execute(&state.pool) + .await?; + if result.rows_affected() == 0 { + return Err(AppError::NotFound("Benutzer nicht gefunden.".into())); + } + tracing::info!( + actor_user_id = %auth.user_id, + target_user_id = %user_id, + event_id = %auth.event_id, + "host: unban_user" + ); Ok(StatusCode::NO_CONTENT) } @@ -181,6 +201,14 @@ pub async fn set_role( .bind(auth.event_id) .execute(&state.pool) .await?; + tracing::info!( + actor_user_id = %auth.user_id, + target_user_id = %user_id, + event_id = %auth.event_id, + old_role = %target.0, + new_role, + "host: set_role" + ); Ok(StatusCode::NO_CONTENT) } @@ -251,38 +279,61 @@ pub async fn reset_user_pin( serde_json::json!({ "user_id": user_id }).to_string(), )); + tracing::info!( + actor_user_id = %auth.user_id, + target_user_id = %user_id, + event_id = %auth.event_id, + "host: reset_user_pin" + ); + Ok(Json(PinResetResponse { pin })) } pub async fn host_delete_upload( State(state): State, - RequireHost(_auth): RequireHost, + RequireHost(auth): RequireHost, Path(upload_id): Path, ) -> Result { - let upload = Upload::find_by_id(&state.pool, upload_id) + let upload = Upload::find_by_id_and_event(&state.pool, upload_id, auth.event_id) .await? .ok_or_else(|| AppError::NotFound("Upload nicht gefunden.".into()))?; - Upload::soft_delete(&state.pool, upload_id).await?; + let deleted = Upload::soft_delete_in_event(&state.pool, upload_id, auth.event_id).await?; + if !deleted { + return Err(AppError::NotFound("Upload nicht gefunden.".into())); + } let _ = state.sse_tx.send(SseEvent::new( "upload-deleted", serde_json::json!({ "upload_id": upload.id }).to_string(), )); + tracing::info!( + actor_user_id = %auth.user_id, + event_id = %auth.event_id, + upload_id = %upload.id, + "host: host_delete_upload" + ); + Ok(StatusCode::NO_CONTENT) } pub async fn host_delete_comment( State(state): State, - RequireHost(_auth): RequireHost, + RequireHost(auth): RequireHost, Path(comment_id): Path, ) -> Result { - Comment::find_by_id(&state.pool, comment_id) - .await? - .ok_or_else(|| AppError::NotFound("Kommentar nicht gefunden.".into()))?; - - Comment::soft_delete(&state.pool, comment_id).await?; + let deleted = + Comment::soft_delete_in_event(&state.pool, comment_id, auth.event_id).await?; + if !deleted { + return Err(AppError::NotFound("Kommentar nicht gefunden.".into())); + } + tracing::info!( + actor_user_id = %auth.user_id, + event_id = %auth.event_id, + comment_id = %comment_id, + "host: host_delete_comment" + ); Ok(StatusCode::NO_CONTENT) } diff --git a/backend/src/handlers/social.rs b/backend/src/handlers/social.rs index 520af4b..4c96e7b 100644 --- a/backend/src/handlers/social.rs +++ b/backend/src/handlers/social.rs @@ -1,6 +1,7 @@ -use axum::extract::{Path, State}; +use axum::extract::{Path, Query, State}; use axum::http::StatusCode; use axum::Json; +use chrono::{DateTime, Utc}; use serde::Deserialize; use uuid::Uuid; @@ -51,12 +52,24 @@ pub async fn toggle_like( Ok(StatusCode::NO_CONTENT) } +#[derive(Deserialize, Default)] +pub struct ListCommentsQuery { + /// RFC3339 timestamp — return only comments older than this. Pass the + /// `created_at` of the oldest currently-loaded comment to fetch the next + /// older page. + pub before: Option>, +} + +const COMMENT_PAGE_SIZE: i64 = 50; + pub async fn list_comments( State(state): State, _auth: AuthUser, Path(upload_id): Path, + Query(q): Query, ) -> Result>, AppError> { - let comments = Comment::list_for_upload(&state.pool, upload_id).await?; + let comments = + Comment::list_for_upload(&state.pool, upload_id, q.before, COMMENT_PAGE_SIZE).await?; Ok(Json(comments)) } @@ -79,7 +92,8 @@ pub async fn add_comment( } let text = body.body.trim(); - if text.is_empty() || text.len() > 500 { + let text_chars = text.chars().count(); + if text_chars == 0 || text_chars > 500 { return Err(AppError::BadRequest( "Kommentar muss zwischen 1 und 500 Zeichen lang sein.".into(), )); diff --git a/backend/src/handlers/sse.rs b/backend/src/handlers/sse.rs index f5626ca..d4a0971 100644 --- a/backend/src/handlers/sse.rs +++ b/backend/src/handlers/sse.rs @@ -3,32 +3,51 @@ use std::time::Duration; use axum::extract::{Query, State}; use axum::response::sse::{Event, KeepAlive, Sse}; +use axum::Json; use futures::stream::Stream; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; -use crate::auth::jwt; +use crate::auth::middleware::AuthUser; use crate::error::AppError; use crate::models::session::Session; use crate::state::AppState; #[derive(Deserialize)] pub struct SseQuery { - pub token: String, + pub ticket: String, } -/// SSE stream endpoint. Accepts JWT via query param since EventSource -/// doesn't support custom headers. +#[derive(Serialize)] +pub struct StreamTicketResponse { + pub ticket: String, +} + +/// Mint a short-lived single-use SSE ticket. The browser's `EventSource` cannot +/// send an `Authorization` header, so the alternative used to be passing the JWT +/// as `?token=...` — which leaks the bearer token into access logs, referer +/// headers, and browser history. The client now exchanges its Bearer token for +/// an opaque ticket via this endpoint and passes that on the stream open. +pub async fn issue_ticket( + State(state): State, + auth: AuthUser, +) -> Json { + let ticket = state.sse_tickets.issue(auth.token_hash); + Json(StreamTicketResponse { ticket }) +} + +/// SSE stream endpoint. Authenticates via a single-use ticket (see +/// [`issue_ticket`]) — never the raw JWT. pub async fn stream( State(state): State, Query(q): Query, ) -> Result>>, AppError> { - // Verify token - let _claims = jwt::verify_token(&q.token, &state.config.jwt_secret) - .map_err(|_| AppError::Unauthorized("Token ungültig.".into()))?; + let token_hash = state + .sse_tickets + .consume(&q.ticket) + .ok_or_else(|| AppError::Unauthorized("Ticket ungültig oder abgelaufen.".into()))?; - let token_hash = jwt::hash_token(&q.token); Session::find_by_token_hash(&state.pool, &token_hash) .await .map_err(|e| AppError::Internal(e.into()))? diff --git a/backend/src/handlers/upload.rs b/backend/src/handlers/upload.rs index 0191354..a0fed5d 100644 --- a/backend/src/handlers/upload.rs +++ b/backend/src/handlers/upload.rs @@ -99,9 +99,11 @@ pub async fn upload( let mime = content_type.unwrap_or_else(|| "application/octet-stream".to_string()); let size = data.len() as i64; - // Validate caption length + // Validate caption length. Counted in chars (code points) to match the + // "Zeichen" wording in the error message — `.len()` would be bytes and + // reject perfectly valid German/emoji captions early. if let Some(ref cap) = caption { - if cap.len() > MAX_CAPTION_LENGTH { + if cap.chars().count() > MAX_CAPTION_LENGTH { return Err(AppError::BadRequest(format!( "Beschreibung ist zu lang. Maximum: {} Zeichen.", MAX_CAPTION_LENGTH diff --git a/backend/src/main.rs b/backend/src/main.rs index 6053027..5808c28 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -42,7 +42,11 @@ async fn main() -> Result<()> { // Hourly background hygiene: prune expired sessions, evict cold rate-limiter // keys. Keeps the DB and process from growing unboundedly over multi-day events. - services::maintenance::spawn_periodic_tasks(pool, state.rate_limiter.clone()); + services::maintenance::spawn_periodic_tasks( + pool, + state.rate_limiter.clone(), + state.sse_tickets.clone(), + ); // Ensure media directories exist tokio::fs::create_dir_all(&config.media_path).await.ok(); @@ -80,6 +84,7 @@ async fn main() -> Result<()> { .route("/api/v1/comment/{id}", delete(handlers::social::delete_comment)) // SSE .route("/api/v1/stream", get(handlers::sse::stream)) + .route("/api/v1/stream/ticket", post(handlers::sse::issue_ticket)) // Host Dashboard .route("/api/v1/host/event", get(handlers::host::get_event_status)) .route("/api/v1/host/event/close", post(handlers::host::close_event)) diff --git a/backend/src/models/comment.rs b/backend/src/models/comment.rs index 1ef06ec..ec08e26 100644 --- a/backend/src/models/comment.rs +++ b/backend/src/models/comment.rs @@ -40,15 +40,35 @@ impl Comment { .await } - pub async fn list_for_upload(pool: &PgPool, upload_id: Uuid) -> Result, sqlx::Error> { + /// Paginated comment listing — returns up to `limit` rows in chronological + /// order (oldest first). If `before` is set, only comments older than that + /// timestamp are returned, enabling backward cursor pagination ("load + /// earlier"). Without the LIMIT a hot post with thousands of comments could + /// OOM the server on a single GET. + pub async fn list_for_upload( + pool: &PgPool, + upload_id: Uuid, + before: Option>, + limit: i64, + ) -> Result, sqlx::Error> { + // Two-step: pick the newest `limit` rows older than `before`, then flip + // them back into ascending order so the caller can render top-to-bottom. sqlx::query_as::<_, CommentDto>( - "SELECT c.id, c.upload_id, c.user_id, u.display_name AS uploader_name, c.body, c.created_at - FROM comment c - JOIN \"user\" u ON u.id = c.user_id - WHERE c.upload_id = $1 AND c.deleted_at IS NULL - ORDER BY c.created_at ASC", + "SELECT * FROM ( + SELECT c.id, c.upload_id, c.user_id, u.display_name AS uploader_name, + c.body, c.created_at + FROM comment c + JOIN \"user\" u ON u.id = c.user_id + WHERE c.upload_id = $1 AND c.deleted_at IS NULL + AND ($2::timestamptz IS NULL OR c.created_at < $2) + ORDER BY c.created_at DESC + LIMIT $3 + ) page + ORDER BY created_at ASC", ) .bind(upload_id) + .bind(before) + .bind(limit) .fetch_all(pool) .await } @@ -69,4 +89,25 @@ impl Comment { .await?; Ok(()) } + + /// Event-scoped variant of [`Self::soft_delete`]. Returns `false` if the + /// comment doesn't exist or belongs to a different event. + pub async fn soft_delete_in_event( + pool: &PgPool, + id: Uuid, + event_id: Uuid, + ) -> Result { + let result = sqlx::query( + "UPDATE comment + SET deleted_at = NOW() + WHERE id = $1 + AND deleted_at IS NULL + AND upload_id IN (SELECT id FROM upload WHERE event_id = $2)", + ) + .bind(id) + .bind(event_id) + .execute(pool) + .await?; + Ok(result.rows_affected() > 0) + } } diff --git a/backend/src/models/hashtag.rs b/backend/src/models/hashtag.rs index 8eaecdf..921c555 100644 --- a/backend/src/models/hashtag.rs +++ b/backend/src/models/hashtag.rs @@ -67,11 +67,46 @@ impl Hashtag { } } -/// Extract #hashtags from text (caption or body). +/// Extract `#hashtags` from text (caption or body). Tags are restricted to +/// ASCII letters, digits, and underscore — emoji / punctuation / accented +/// characters are rejected. This is deliberately strict: hashtags are an index +/// (used for filtering and SQL JOINs), and tags like `#🎉` or `#!?` accumulate +/// noise without helping anyone find content. pub fn extract_hashtags(text: &str) -> Vec { + const MAX_TAG_LEN: usize = 40; text.split_whitespace() - .filter(|w| w.starts_with('#') && w.len() > 1) - .map(|w| w.trim_start_matches('#').to_lowercase()) - .filter(|t| !t.is_empty()) + .filter_map(|w| w.strip_prefix('#')) + .map(|t| { + t.chars() + .take_while(|c| c.is_ascii_alphanumeric() || *c == '_') + .collect::() + .to_lowercase() + }) + .filter(|t| !t.is_empty() && t.chars().count() <= MAX_TAG_LEN) .collect() } + +#[cfg(test)] +mod tests { + use super::extract_hashtags; + + #[test] + fn ascii_word_chars_extracted() { + assert_eq!( + extract_hashtags("hello #wedding #Day_2 #cake!"), + vec!["wedding", "day_2", "cake"] + ); + } + + #[test] + fn emojis_and_punctuation_excluded() { + // The 🎉 tag drops out entirely (no ASCII chars after #), the next tag + // stops at the !? and yields only "fun". + assert_eq!(extract_hashtags("#🎉 #!? #fun!"), vec!["fun"]); + } + + #[test] + fn empty_or_bare_hash_skipped() { + assert_eq!(extract_hashtags("# #"), Vec::::new()); + } +} diff --git a/backend/src/models/upload.rs b/backend/src/models/upload.rs index 54a83ae..1b085e1 100644 --- a/backend/src/models/upload.rs +++ b/backend/src/models/upload.rs @@ -69,6 +69,23 @@ impl Upload { .await } + /// Event-scoped lookup used by host endpoints so a host of event A cannot + /// reach uploads belonging to event B. + pub async fn find_by_id_and_event( + pool: &PgPool, + id: Uuid, + event_id: Uuid, + ) -> Result, sqlx::Error> { + sqlx::query_as::<_, Self>( + "SELECT * FROM upload + WHERE id = $1 AND event_id = $2 AND deleted_at IS NULL", + ) + .bind(id) + .bind(event_id) + .fetch_optional(pool) + .await + } + pub async fn set_preview_path( pool: &PgPool, id: Uuid, @@ -128,6 +145,43 @@ impl Upload { Ok(()) } + /// Event-scoped variant of [`Self::soft_delete`]. Returns `false` if no row + /// matched (already deleted, wrong event, or unknown id) so host handlers + /// can return a clean 404 instead of silently no-op'ing. + pub async fn soft_delete_in_event( + pool: &PgPool, + id: Uuid, + event_id: Uuid, + ) -> Result { + let mut tx = pool.begin().await?; + let row: Option<(Uuid, i64)> = sqlx::query_as( + "UPDATE upload + SET deleted_at = NOW() + WHERE id = $1 AND event_id = $2 AND deleted_at IS NULL + RETURNING user_id, original_size_bytes", + ) + .bind(id) + .bind(event_id) + .fetch_optional(&mut *tx) + .await?; + let deleted = if let Some((user_id, bytes)) = row { + sqlx::query( + "UPDATE \"user\" + SET total_upload_bytes = GREATEST(0, total_upload_bytes - $2) + WHERE id = $1", + ) + .bind(user_id) + .bind(bytes) + .execute(&mut *tx) + .await?; + true + } else { + false + }; + tx.commit().await?; + Ok(deleted) + } + pub async fn update_caption( pool: &PgPool, id: Uuid, diff --git a/backend/src/services/maintenance.rs b/backend/src/services/maintenance.rs index 75e9d41..9264de9 100644 --- a/backend/src/services/maintenance.rs +++ b/backend/src/services/maintenance.rs @@ -18,6 +18,7 @@ use std::time::Duration; use sqlx::PgPool; use crate::services::rate_limiter::RateLimiter; +use crate::services::sse_tickets::SseTicketStore; /// Reset rows left in flight by a previous crashed instance. Run once on startup, /// before the HTTP server starts taking requests, so users never observe the @@ -68,9 +69,14 @@ pub async fn startup_recovery(pool: &PgPool) { /// Spawns a background task that periodically: /// - deletes session rows whose `expires_at` is more than a day in the past /// - prunes the in-memory rate-limiter HashMap of empty windows +/// - drops expired SSE tickets (30s TTL but the map keeps the slot until pruned) /// /// Cadence is 1h — fine for both jobs at our scale. -pub fn spawn_periodic_tasks(pool: PgPool, rate_limiter: RateLimiter) { +pub fn spawn_periodic_tasks( + pool: PgPool, + rate_limiter: RateLimiter, + sse_tickets: SseTicketStore, +) { tokio::spawn(async move { let mut tick = tokio::time::interval(Duration::from_secs(3600)); // Fire the first tick immediately, then hourly. @@ -79,6 +85,7 @@ pub fn spawn_periodic_tasks(pool: PgPool, rate_limiter: RateLimiter) { tick.tick().await; cleanup_sessions(&pool).await; rate_limiter.prune(); + sse_tickets.prune(); } }); } diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs index fee99a6..66f838a 100644 --- a/backend/src/services/mod.rs +++ b/backend/src/services/mod.rs @@ -4,3 +4,4 @@ pub mod export; pub mod jobs; pub mod maintenance; pub mod rate_limiter; +pub mod sse_tickets; diff --git a/backend/src/services/sse_tickets.rs b/backend/src/services/sse_tickets.rs new file mode 100644 index 0000000..4de7b3e --- /dev/null +++ b/backend/src/services/sse_tickets.rs @@ -0,0 +1,74 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use rand::Rng; + +/// Short-lived single-use tickets that let `EventSource` clients open the SSE +/// stream without putting the JWT in the URL (where it would leak via access +/// logs / referer / browser history). +/// +/// Flow: client `POST /api/v1/stream/ticket` with `Authorization: Bearer `, +/// server returns an opaque ticket, client passes it as `?ticket=...` on the +/// stream open. Tickets are consumed on use and expire after `TTL`. +const TTL: Duration = Duration::from_secs(30); + +#[derive(Clone)] +pub struct SseTicketStore { + inner: Arc>>, +} + +#[derive(Clone)] +struct Entry { + token_hash: String, + issued_at: Instant, +} + +impl SseTicketStore { + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Mint a new ticket bound to the caller's session (identified by token hash). + pub fn issue(&self, token_hash: String) -> String { + let ticket = random_ticket(); + let mut map = self.inner.lock().unwrap(); + map.insert( + ticket.clone(), + Entry { + token_hash, + issued_at: Instant::now(), + }, + ); + ticket + } + + /// Consume a ticket. Returns `Some(token_hash)` if the ticket exists and is + /// not expired. Single-use: the ticket is removed regardless of whether it + /// was still fresh, so a replay can't slip through after expiry. + pub fn consume(&self, ticket: &str) -> Option { + let mut map = self.inner.lock().unwrap(); + let entry = map.remove(ticket)?; + if entry.issued_at.elapsed() > TTL { + return None; + } + Some(entry.token_hash) + } + + /// Drop expired entries — called from the background maintenance task so a + /// long-running process doesn't accumulate stale tickets. + pub fn prune(&self) { + let mut map = self.inner.lock().unwrap(); + map.retain(|_, e| e.issued_at.elapsed() <= TTL); + } +} + +fn random_ticket() -> String { + // 192 bits of randomness, base32-ish hex. Plenty of entropy and URL-safe. + let mut rng = rand::rng(); + let mut bytes = [0u8; 24]; + rng.fill(&mut bytes); + bytes.iter().map(|b| format!("{b:02x}")).collect() +} diff --git a/backend/src/state.rs b/backend/src/state.rs index d9e52fe..d4e3e05 100644 --- a/backend/src/state.rs +++ b/backend/src/state.rs @@ -4,6 +4,7 @@ use tokio::sync::broadcast; use crate::config::AppConfig; use crate::services::compression::CompressionWorker; use crate::services::rate_limiter::RateLimiter; +use crate::services::sse_tickets::SseTicketStore; #[derive(Clone, Debug)] pub struct SseEvent { @@ -29,6 +30,7 @@ pub struct AppState { pub sse_tx: broadcast::Sender, pub compression: CompressionWorker, pub rate_limiter: RateLimiter, + pub sse_tickets: SseTicketStore, } impl AppState { @@ -42,6 +44,7 @@ impl AppState { sse_tx, compression, rate_limiter: RateLimiter::new(), + sse_tickets: SseTicketStore::new(), } } } diff --git a/frontend/src/lib/sse.ts b/frontend/src/lib/sse.ts index 713954c..cf95cc9 100644 --- a/frontend/src/lib/sse.ts +++ b/frontend/src/lib/sse.ts @@ -15,6 +15,8 @@ import { getToken } from './auth'; import { api } from './api'; import type { DeltaResponse } from './types'; +type StreamTicketResponse = { ticket: string }; + type EventHandler = (data: string) => void; let eventSource: EventSource | null = null; @@ -69,38 +71,59 @@ export function connectSse(): void { const token = getToken(); if (!token || eventSource) return; - // EventSource doesn't support custom headers, so pass token as query param. - eventSource = new EventSource(`/api/v1/stream?token=${encodeURIComponent(token)}`); - - eventSource.onopen = () => { - // Successful connection — reset the backoff counter. - reconnectAttempt = 0; - // If we have a previous timestamp this is a reconnect — fetch the gap. - const since = lastEventTime; - if (since) { - void deltaFetchAndFan(since); + // EventSource can't send an Authorization header, so we exchange the JWT for + // a short-lived single-use ticket via POST /stream/ticket (Bearer auth) and + // pass that on the URL. The JWT itself never appears in URLs / access logs. + void (async () => { + let ticket: string; + try { + const res = await api.post('/stream/ticket', {}); + ticket = res.ticket; + } catch { + // Failed to mint a ticket (auth lapse, network blip). Back off and retry + // via the existing error path. + scheduleReconnect(); + return; } - lastEventTime = new Date().toISOString(); - }; + // Auth flow may have torn things down while we were awaiting the ticket. + if (!getToken() || eventSource) return; - for (const eventName of KNOWN_EVENTS) { - eventSource.addEventListener(eventName, (e) => - dispatch(eventName, (e as MessageEvent).data) - ); - } + eventSource = new EventSource(`/api/v1/stream?ticket=${encodeURIComponent(ticket)}`); - eventSource.onerror = () => { - // EventSource auto-reconnects but the connection state can stay broken; close - // and try again ourselves with exponential backoff capped at 60s. Prevents - // retry storms (and lets the backend recover quietly) when the server is down - // for a while or when 100+ guests reconnect simultaneously after an outage. - disconnectSse(); - reconnectAttempt++; - const delay = Math.min(60_000, 1_000 * 2 ** (reconnectAttempt - 1)); - const jitter = Math.random() * 500; - if (reconnectTimer) clearTimeout(reconnectTimer); - reconnectTimer = setTimeout(connectSse, delay + jitter); - }; + eventSource.onopen = () => { + // Successful connection — reset the backoff counter. + reconnectAttempt = 0; + // If we have a previous timestamp this is a reconnect — fetch the gap. + const since = lastEventTime; + if (since) { + void deltaFetchAndFan(since); + } + lastEventTime = new Date().toISOString(); + }; + + for (const eventName of KNOWN_EVENTS) { + eventSource.addEventListener(eventName, (e) => + dispatch(eventName, (e as MessageEvent).data) + ); + } + + eventSource.onerror = () => { + // EventSource auto-reconnects but the connection state can stay broken; close + // and try again ourselves with exponential backoff capped at 60s. Prevents + // retry storms (and lets the backend recover quietly) when the server is down + // for a while or when 100+ guests reconnect simultaneously after an outage. + disconnectSse(); + scheduleReconnect(); + }; + })(); +} + +function scheduleReconnect(): void { + reconnectAttempt++; + const delay = Math.min(60_000, 1_000 * 2 ** (reconnectAttempt - 1)); + const jitter = Math.random() * 500; + if (reconnectTimer) clearTimeout(reconnectTimer); + reconnectTimer = setTimeout(connectSse, delay + jitter); } export function disconnectSse(): void {