Merge fix/security-review-followups: security review batch fixes
Some checks failed
E2E / Playwright E2E (chromium-desktop) (push) Has been cancelled
E2E / Cross-UA smoke matrix (push) Has been cancelled

This commit is contained in:
MechaCat02
2026-05-17 21:00:55 +02:00
17 changed files with 507 additions and 79 deletions

View File

@@ -49,7 +49,8 @@ pub async fn join(
} }
let display_name = body.display_name.trim(); let display_name = body.display_name.trim();
if display_name.is_empty() || display_name.len() > 50 { let name_chars = display_name.chars().count();
if name_chars == 0 || name_chars > 50 {
return Err(AppError::BadRequest( return Err(AppError::BadRequest(
"Name muss zwischen 1 und 50 Zeichen lang sein.".into(), "Name muss zwischen 1 und 50 Zeichen lang sein.".into(),
)); ));
@@ -122,10 +123,33 @@ pub struct RecoverResponse {
pub async fn recover( pub async fn recover(
State(state): State<AppState>, State(state): State<AppState>,
headers: HeaderMap,
Json(body): Json<RecoverRequest>, Json(body): Json<RecoverRequest>,
) -> Result<Json<RecoverResponse>, AppError> { ) -> Result<Json<RecoverResponse>, AppError> {
let display_name = body.display_name.trim(); let display_name = body.display_name.trim();
// Per-IP+name throttle BEFORE the per-user 3-strike counter. Without this
// an attacker who knows a display name (they're visible on the feed) can
// burn through 3 wrong PINs and lock the victim for 15 minutes — repeated
// every 15 minutes, indefinitely. 5 attempts per 15 minutes per (IP, name)
// softens that into a real cost.
let ip = client_ip(&headers, "unknown");
let rate_limits_on = config::get_bool(&state.pool, "rate_limits_enabled", true).await;
let recover_rate_on = config::get_bool(&state.pool, "recover_rate_enabled", true).await;
if rate_limits_on && recover_rate_on {
let name_key = display_name.to_lowercase();
if !state.rate_limiter.check(
format!("recover:{ip}:{name_key}"),
5,
Duration::from_secs(15 * 60),
) {
return Err(AppError::TooManyRequests(
"Zu viele Versuche. Bitte warte kurz und versuche es erneut.".into(),
None,
));
}
}
let event = Event::find_by_slug(&state.pool, &state.config.event_slug) let event = Event::find_by_slug(&state.pool, &state.config.event_slug)
.await? .await?
.ok_or_else(|| AppError::NotFound("Event nicht gefunden.".into()))?; .ok_or_else(|| AppError::NotFound("Event nicht gefunden.".into()))?;
@@ -184,9 +208,22 @@ pub async fn recover(
// Wrong PIN — increment failure count // Wrong PIN — increment failure count
let attempts = User::increment_failed_pin(&state.pool, user.id).await?; let attempts = User::increment_failed_pin(&state.pool, user.id).await?;
tracing::warn!(
user_id = %user.id,
event_id = %event.id,
ip = %ip,
attempts,
"recover: wrong PIN"
);
if attempts >= 3 { if attempts >= 3 {
let lockout = Utc::now() + chrono::Duration::minutes(15); let lockout = Utc::now() + chrono::Duration::minutes(15);
User::lock_pin(&state.pool, user.id, lockout).await?; User::lock_pin(&state.pool, user.id, lockout).await?;
tracing::warn!(
user_id = %user.id,
event_id = %event.id,
ip = %ip,
"recover: account locked for 15 minutes"
);
} }
} }
@@ -205,6 +242,7 @@ pub struct AdminLoginResponse {
pub async fn admin_login( pub async fn admin_login(
State(state): State<AppState>, State(state): State<AppState>,
headers: HeaderMap,
Json(body): Json<AdminLoginRequest>, Json(body): Json<AdminLoginRequest>,
) -> Result<Json<AdminLoginResponse>, AppError> { ) -> Result<Json<AdminLoginResponse>, AppError> {
if state.config.admin_password_hash.is_empty() { if state.config.admin_password_hash.is_empty() {
@@ -213,10 +251,31 @@ pub async fn admin_login(
)); ));
} }
// Throttle password attempts. The admin password is bcrypt-hashed (slow to
// verify) but with no IP-level limit a determined attacker can still mount
// a long-running guess campaign. 5 attempts / minute / IP is plenty for
// honest typos.
let ip = client_ip(&headers, "unknown");
let rate_limits_on = config::get_bool(&state.pool, "rate_limits_enabled", true).await;
let admin_rate_on = config::get_bool(&state.pool, "admin_login_rate_enabled", true).await;
if rate_limits_on && admin_rate_on
&& !state.rate_limiter.check(
format!("admin_login:{ip}"),
5,
Duration::from_secs(60),
)
{
return Err(AppError::TooManyRequests(
"Zu viele Anmeldeversuche. Bitte warte kurz und versuche es erneut.".into(),
None,
));
}
let valid = bcrypt::verify(&body.password, &state.config.admin_password_hash) let valid = bcrypt::verify(&body.password, &state.config.admin_password_hash)
.unwrap_or(false); .unwrap_or(false);
if !valid { if !valid {
tracing::warn!(ip = %ip, "admin_login: wrong password");
return Err(AppError::Unauthorized("Falsches Passwort.".into())); return Err(AppError::Unauthorized("Falsches Passwort.".into()));
} }
@@ -233,8 +292,13 @@ pub async fn admin_login(
let admin_user = if let Some(u) = users.into_iter().find(|u| u.role == UserRole::Admin) { let admin_user = if let Some(u) = users.into_iter().find(|u| u.role == UserRole::Admin) {
u u
} else { } else {
// Create admin user with a dummy PIN (admin authenticates via password) // Admin authenticates via password, but the schema still requires a PIN
let dummy_hash = bcrypt::hash("0000", 4) // hash. Generate a random unguessable PIN so the recovery path remains
// unusable as an escalation route even if the role flag ever got cleared.
let dummy_pin: String = (0..32)
.map(|_| rand::rng().random_range(b'a'..=b'z') as char)
.collect();
let dummy_hash = bcrypt::hash(&dummy_pin, 4)
.map_err(|e| AppError::Internal(anyhow::anyhow!(e)))?; .map_err(|e| AppError::Internal(anyhow::anyhow!(e)))?;
let user = User::create(&state.pool, event.id, admin_name, &dummy_hash).await?; let user = User::create(&state.pool, event.id, admin_name, &dummy_hash).await?;
sqlx::query("UPDATE \"user\" SET role = 'admin' WHERE id = $1") sqlx::query("UPDATE \"user\" SET role = 'admin' WHERE id = $1")
@@ -246,6 +310,8 @@ pub async fn admin_login(
.ok_or_else(|| AppError::Internal(anyhow::anyhow!("admin user creation failed")))? .ok_or_else(|| AppError::Internal(anyhow::anyhow!("admin user creation failed")))?
}; };
tracing::info!(user_id = %admin_user.id, event_id = %event.id, ip = %ip, "admin_login: success");
let token = jwt::create_token( let token = jwt::create_token(
admin_user.id, admin_user.id,
event.id, event.id,

View File

@@ -43,11 +43,15 @@ impl FromRequestParts<AppState> for AuthUser {
.map_err(|e| AppError::Internal(e.into()))? .map_err(|e| AppError::Internal(e.into()))?
.ok_or_else(|| AppError::Unauthorized("Sitzung nicht gefunden oder abgelaufen.".into()))?; .ok_or_else(|| AppError::Unauthorized("Sitzung nicht gefunden oder abgelaufen.".into()))?;
// Update last_seen_at in the background (fire-and-forget) // Update last_seen_at in the background (fire-and-forget). Failures are
// non-fatal but worth surfacing — silent swallowing hides DB connection
// pressure that would otherwise be the first symptom of a real problem.
let pool = state.pool.clone(); let pool = state.pool.clone();
let session_id = session.id; let session_id = session.id;
tokio::spawn(async move { tokio::spawn(async move {
let _ = Session::touch(&pool, session_id).await; if let Err(e) = Session::touch(&pool, session_id).await {
tracing::warn!(error = ?e, session_id = %session_id, "session touch failed");
}
}); });
Ok(Self { Ok(Self {

View File

@@ -1,6 +1,10 @@
use std::path::PathBuf; use std::path::PathBuf;
use anyhow::{Context, Result}; use anyhow::{anyhow, Context, Result};
/// Well-known dev JWT secret shipped in `.env.example`. If APP_ENV=production
/// we refuse to start with this value; otherwise we warn loudly.
const DEV_JWT_SECRET_SENTINEL: &str = "dev_secret_do_not_use_in_production_32byteslong_aaaa";
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct AppConfig { pub struct AppConfig {
@@ -16,11 +20,29 @@ pub struct AppConfig {
impl AppConfig { impl AppConfig {
pub fn from_env() -> Result<Self> { pub fn from_env() -> Result<Self> {
let app_env =
std::env::var("APP_ENV").unwrap_or_else(|_| "development".to_string());
let is_prod = app_env.eq_ignore_ascii_case("production");
let jwt_secret = std::env::var("JWT_SECRET").context("JWT_SECRET must be set")?;
if jwt_secret == DEV_JWT_SECRET_SENTINEL {
if is_prod {
return Err(anyhow!(
"Refusing to start in production with the well-known dev JWT_SECRET — \
rotate it (openssl rand -hex 64)."
));
}
tracing::warn!(
"JWT_SECRET is the dev sentinel — fine for local development, NEVER ship this."
);
} else if jwt_secret.len() < 32 {
return Err(anyhow!("JWT_SECRET must be at least 32 characters."));
}
Ok(Self { Ok(Self {
database_url: std::env::var("DATABASE_URL") database_url: std::env::var("DATABASE_URL")
.context("DATABASE_URL must be set")?, .context("DATABASE_URL must be set")?,
jwt_secret: std::env::var("JWT_SECRET") jwt_secret,
.context("JWT_SECRET must be set")?,
session_expiry_days: std::env::var("SESSION_EXPIRY_DAYS") session_expiry_days: std::env::var("SESSION_EXPIRY_DAYS")
.unwrap_or_else(|_| "30".to_string()) .unwrap_or_else(|_| "30".to_string())
.parse() .parse()

View File

@@ -2,9 +2,16 @@ use anyhow::{Context, Result};
use sqlx::postgres::PgPoolOptions; use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool; use sqlx::PgPool;
const DEFAULT_MAX_CONNECTIONS: u32 = 10;
pub async fn create_pool(database_url: &str) -> Result<PgPool> { pub async fn create_pool(database_url: &str) -> Result<PgPool> {
let max_connections = std::env::var("DATABASE_MAX_CONNECTIONS")
.ok()
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(DEFAULT_MAX_CONNECTIONS);
let pool = PgPoolOptions::new() let pool = PgPoolOptions::new()
.max_connections(10) .max_connections(max_connections)
.connect(database_url) .connect(database_url)
.await .await
.context("failed to connect to database")?; .context("failed to connect to database")?;
@@ -14,6 +21,6 @@ pub async fn create_pool(database_url: &str) -> Result<PgPool> {
.await .await
.context("failed to run database migrations")?; .context("failed to run database migrations")?;
tracing::info!("database connected and migrations applied"); tracing::info!(max_connections, "database connected and migrations applied");
Ok(pool) Ok(pool)
} }

View File

@@ -120,18 +120,38 @@ pub async fn ban_user(
.execute(&state.pool) .execute(&state.pool)
.await?; .await?;
tracing::info!(
actor_user_id = %auth.user_id,
target_user_id = %user_id,
event_id = %auth.event_id,
hide_uploads = body.hide_uploads,
"host: ban_user"
);
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }
pub async fn unban_user( pub async fn unban_user(
State(state): State<AppState>, State(state): State<AppState>,
RequireHost(_auth): RequireHost, RequireHost(auth): RequireHost,
Path(user_id): Path<Uuid>, Path(user_id): Path<Uuid>,
) -> Result<StatusCode, AppError> { ) -> Result<StatusCode, AppError> {
sqlx::query("UPDATE \"user\" SET is_banned = FALSE WHERE id = $1") let result = sqlx::query(
"UPDATE \"user\" SET is_banned = FALSE WHERE id = $1 AND event_id = $2",
)
.bind(user_id) .bind(user_id)
.bind(auth.event_id)
.execute(&state.pool) .execute(&state.pool)
.await?; .await?;
if result.rows_affected() == 0 {
return Err(AppError::NotFound("Benutzer nicht gefunden.".into()));
}
tracing::info!(
actor_user_id = %auth.user_id,
target_user_id = %user_id,
event_id = %auth.event_id,
"host: unban_user"
);
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }
@@ -181,6 +201,14 @@ pub async fn set_role(
.bind(auth.event_id) .bind(auth.event_id)
.execute(&state.pool) .execute(&state.pool)
.await?; .await?;
tracing::info!(
actor_user_id = %auth.user_id,
target_user_id = %user_id,
event_id = %auth.event_id,
old_role = %target.0,
new_role,
"host: set_role"
);
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }
@@ -251,38 +279,61 @@ pub async fn reset_user_pin(
serde_json::json!({ "user_id": user_id }).to_string(), serde_json::json!({ "user_id": user_id }).to_string(),
)); ));
tracing::info!(
actor_user_id = %auth.user_id,
target_user_id = %user_id,
event_id = %auth.event_id,
"host: reset_user_pin"
);
Ok(Json(PinResetResponse { pin })) Ok(Json(PinResetResponse { pin }))
} }
pub async fn host_delete_upload( pub async fn host_delete_upload(
State(state): State<AppState>, State(state): State<AppState>,
RequireHost(_auth): RequireHost, RequireHost(auth): RequireHost,
Path(upload_id): Path<Uuid>, Path(upload_id): Path<Uuid>,
) -> Result<StatusCode, AppError> { ) -> Result<StatusCode, AppError> {
let upload = Upload::find_by_id(&state.pool, upload_id) let upload = Upload::find_by_id_and_event(&state.pool, upload_id, auth.event_id)
.await? .await?
.ok_or_else(|| AppError::NotFound("Upload nicht gefunden.".into()))?; .ok_or_else(|| AppError::NotFound("Upload nicht gefunden.".into()))?;
Upload::soft_delete(&state.pool, upload_id).await?; let deleted = Upload::soft_delete_in_event(&state.pool, upload_id, auth.event_id).await?;
if !deleted {
return Err(AppError::NotFound("Upload nicht gefunden.".into()));
}
let _ = state.sse_tx.send(SseEvent::new( let _ = state.sse_tx.send(SseEvent::new(
"upload-deleted", "upload-deleted",
serde_json::json!({ "upload_id": upload.id }).to_string(), serde_json::json!({ "upload_id": upload.id }).to_string(),
)); ));
tracing::info!(
actor_user_id = %auth.user_id,
event_id = %auth.event_id,
upload_id = %upload.id,
"host: host_delete_upload"
);
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }
pub async fn host_delete_comment( pub async fn host_delete_comment(
State(state): State<AppState>, State(state): State<AppState>,
RequireHost(_auth): RequireHost, RequireHost(auth): RequireHost,
Path(comment_id): Path<Uuid>, Path(comment_id): Path<Uuid>,
) -> Result<StatusCode, AppError> { ) -> Result<StatusCode, AppError> {
Comment::find_by_id(&state.pool, comment_id) let deleted =
.await? Comment::soft_delete_in_event(&state.pool, comment_id, auth.event_id).await?;
.ok_or_else(|| AppError::NotFound("Kommentar nicht gefunden.".into()))?; if !deleted {
return Err(AppError::NotFound("Kommentar nicht gefunden.".into()));
Comment::soft_delete(&state.pool, comment_id).await?; }
tracing::info!(
actor_user_id = %auth.user_id,
event_id = %auth.event_id,
comment_id = %comment_id,
"host: host_delete_comment"
);
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }

View File

@@ -1,6 +1,7 @@
use axum::extract::{Path, State}; use axum::extract::{Path, Query, State};
use axum::http::StatusCode; use axum::http::StatusCode;
use axum::Json; use axum::Json;
use chrono::{DateTime, Utc};
use serde::Deserialize; use serde::Deserialize;
use uuid::Uuid; use uuid::Uuid;
@@ -51,12 +52,24 @@ pub async fn toggle_like(
Ok(StatusCode::NO_CONTENT) Ok(StatusCode::NO_CONTENT)
} }
#[derive(Deserialize, Default)]
pub struct ListCommentsQuery {
/// RFC3339 timestamp — return only comments older than this. Pass the
/// `created_at` of the oldest currently-loaded comment to fetch the next
/// older page.
pub before: Option<DateTime<Utc>>,
}
const COMMENT_PAGE_SIZE: i64 = 50;
pub async fn list_comments( pub async fn list_comments(
State(state): State<AppState>, State(state): State<AppState>,
_auth: AuthUser, _auth: AuthUser,
Path(upload_id): Path<Uuid>, Path(upload_id): Path<Uuid>,
Query(q): Query<ListCommentsQuery>,
) -> Result<Json<Vec<CommentDto>>, AppError> { ) -> Result<Json<Vec<CommentDto>>, AppError> {
let comments = Comment::list_for_upload(&state.pool, upload_id).await?; let comments =
Comment::list_for_upload(&state.pool, upload_id, q.before, COMMENT_PAGE_SIZE).await?;
Ok(Json(comments)) Ok(Json(comments))
} }
@@ -79,7 +92,8 @@ pub async fn add_comment(
} }
let text = body.body.trim(); let text = body.body.trim();
if text.is_empty() || text.len() > 500 { let text_chars = text.chars().count();
if text_chars == 0 || text_chars > 500 {
return Err(AppError::BadRequest( return Err(AppError::BadRequest(
"Kommentar muss zwischen 1 und 500 Zeichen lang sein.".into(), "Kommentar muss zwischen 1 und 500 Zeichen lang sein.".into(),
)); ));

View File

@@ -3,32 +3,51 @@ use std::time::Duration;
use axum::extract::{Query, State}; use axum::extract::{Query, State};
use axum::response::sse::{Event, KeepAlive, Sse}; use axum::response::sse::{Event, KeepAlive, Sse};
use axum::Json;
use futures::stream::Stream; use futures::stream::Stream;
use serde::Deserialize; use serde::{Deserialize, Serialize};
use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use crate::auth::jwt; use crate::auth::middleware::AuthUser;
use crate::error::AppError; use crate::error::AppError;
use crate::models::session::Session; use crate::models::session::Session;
use crate::state::AppState; use crate::state::AppState;
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct SseQuery { pub struct SseQuery {
pub token: String, pub ticket: String,
} }
/// SSE stream endpoint. Accepts JWT via query param since EventSource #[derive(Serialize)]
/// doesn't support custom headers. pub struct StreamTicketResponse {
pub ticket: String,
}
/// Mint a short-lived single-use SSE ticket. The browser's `EventSource` cannot
/// send an `Authorization` header, so the alternative used to be passing the JWT
/// as `?token=...` — which leaks the bearer token into access logs, referer
/// headers, and browser history. The client now exchanges its Bearer token for
/// an opaque ticket via this endpoint and passes that on the stream open.
pub async fn issue_ticket(
State(state): State<AppState>,
auth: AuthUser,
) -> Json<StreamTicketResponse> {
let ticket = state.sse_tickets.issue(auth.token_hash);
Json(StreamTicketResponse { ticket })
}
/// SSE stream endpoint. Authenticates via a single-use ticket (see
/// [`issue_ticket`]) — never the raw JWT.
pub async fn stream( pub async fn stream(
State(state): State<AppState>, State(state): State<AppState>,
Query(q): Query<SseQuery>, Query(q): Query<SseQuery>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> { ) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> {
// Verify token let token_hash = state
let _claims = jwt::verify_token(&q.token, &state.config.jwt_secret) .sse_tickets
.map_err(|_| AppError::Unauthorized("Token ungültig.".into()))?; .consume(&q.ticket)
.ok_or_else(|| AppError::Unauthorized("Ticket ungültig oder abgelaufen.".into()))?;
let token_hash = jwt::hash_token(&q.token);
Session::find_by_token_hash(&state.pool, &token_hash) Session::find_by_token_hash(&state.pool, &token_hash)
.await .await
.map_err(|e| AppError::Internal(e.into()))? .map_err(|e| AppError::Internal(e.into()))?

View File

@@ -99,9 +99,11 @@ pub async fn upload(
let mime = content_type.unwrap_or_else(|| "application/octet-stream".to_string()); let mime = content_type.unwrap_or_else(|| "application/octet-stream".to_string());
let size = data.len() as i64; let size = data.len() as i64;
// Validate caption length // Validate caption length. Counted in chars (code points) to match the
// "Zeichen" wording in the error message — `.len()` would be bytes and
// reject perfectly valid German/emoji captions early.
if let Some(ref cap) = caption { if let Some(ref cap) = caption {
if cap.len() > MAX_CAPTION_LENGTH { if cap.chars().count() > MAX_CAPTION_LENGTH {
return Err(AppError::BadRequest(format!( return Err(AppError::BadRequest(format!(
"Beschreibung ist zu lang. Maximum: {} Zeichen.", "Beschreibung ist zu lang. Maximum: {} Zeichen.",
MAX_CAPTION_LENGTH MAX_CAPTION_LENGTH

View File

@@ -42,7 +42,11 @@ async fn main() -> Result<()> {
// Hourly background hygiene: prune expired sessions, evict cold rate-limiter // Hourly background hygiene: prune expired sessions, evict cold rate-limiter
// keys. Keeps the DB and process from growing unboundedly over multi-day events. // keys. Keeps the DB and process from growing unboundedly over multi-day events.
services::maintenance::spawn_periodic_tasks(pool, state.rate_limiter.clone()); services::maintenance::spawn_periodic_tasks(
pool,
state.rate_limiter.clone(),
state.sse_tickets.clone(),
);
// Ensure media directories exist // Ensure media directories exist
tokio::fs::create_dir_all(&config.media_path).await.ok(); tokio::fs::create_dir_all(&config.media_path).await.ok();
@@ -80,6 +84,7 @@ async fn main() -> Result<()> {
.route("/api/v1/comment/{id}", delete(handlers::social::delete_comment)) .route("/api/v1/comment/{id}", delete(handlers::social::delete_comment))
// SSE // SSE
.route("/api/v1/stream", get(handlers::sse::stream)) .route("/api/v1/stream", get(handlers::sse::stream))
.route("/api/v1/stream/ticket", post(handlers::sse::issue_ticket))
// Host Dashboard // Host Dashboard
.route("/api/v1/host/event", get(handlers::host::get_event_status)) .route("/api/v1/host/event", get(handlers::host::get_event_status))
.route("/api/v1/host/event/close", post(handlers::host::close_event)) .route("/api/v1/host/event/close", post(handlers::host::close_event))

View File

@@ -40,15 +40,35 @@ impl Comment {
.await .await
} }
pub async fn list_for_upload(pool: &PgPool, upload_id: Uuid) -> Result<Vec<CommentDto>, sqlx::Error> { /// Paginated comment listing — returns up to `limit` rows in chronological
/// order (oldest first). If `before` is set, only comments older than that
/// timestamp are returned, enabling backward cursor pagination ("load
/// earlier"). Without the LIMIT a hot post with thousands of comments could
/// OOM the server on a single GET.
pub async fn list_for_upload(
pool: &PgPool,
upload_id: Uuid,
before: Option<DateTime<Utc>>,
limit: i64,
) -> Result<Vec<CommentDto>, sqlx::Error> {
// Two-step: pick the newest `limit` rows older than `before`, then flip
// them back into ascending order so the caller can render top-to-bottom.
sqlx::query_as::<_, CommentDto>( sqlx::query_as::<_, CommentDto>(
"SELECT c.id, c.upload_id, c.user_id, u.display_name AS uploader_name, c.body, c.created_at "SELECT * FROM (
SELECT c.id, c.upload_id, c.user_id, u.display_name AS uploader_name,
c.body, c.created_at
FROM comment c FROM comment c
JOIN \"user\" u ON u.id = c.user_id JOIN \"user\" u ON u.id = c.user_id
WHERE c.upload_id = $1 AND c.deleted_at IS NULL WHERE c.upload_id = $1 AND c.deleted_at IS NULL
ORDER BY c.created_at ASC", AND ($2::timestamptz IS NULL OR c.created_at < $2)
ORDER BY c.created_at DESC
LIMIT $3
) page
ORDER BY created_at ASC",
) )
.bind(upload_id) .bind(upload_id)
.bind(before)
.bind(limit)
.fetch_all(pool) .fetch_all(pool)
.await .await
} }
@@ -69,4 +89,25 @@ impl Comment {
.await?; .await?;
Ok(()) Ok(())
} }
/// Event-scoped variant of [`Self::soft_delete`]. Returns `false` if the
/// comment doesn't exist or belongs to a different event.
pub async fn soft_delete_in_event(
pool: &PgPool,
id: Uuid,
event_id: Uuid,
) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
"UPDATE comment
SET deleted_at = NOW()
WHERE id = $1
AND deleted_at IS NULL
AND upload_id IN (SELECT id FROM upload WHERE event_id = $2)",
)
.bind(id)
.bind(event_id)
.execute(pool)
.await?;
Ok(result.rows_affected() > 0)
}
} }

View File

@@ -67,11 +67,46 @@ impl Hashtag {
} }
} }
/// Extract #hashtags from text (caption or body). /// Extract `#hashtags` from text (caption or body). Tags are restricted to
/// ASCII letters, digits, and underscore — emoji / punctuation / accented
/// characters are rejected. This is deliberately strict: hashtags are an index
/// (used for filtering and SQL JOINs), and tags like `#🎉` or `#!?` accumulate
/// noise without helping anyone find content.
pub fn extract_hashtags(text: &str) -> Vec<String> { pub fn extract_hashtags(text: &str) -> Vec<String> {
const MAX_TAG_LEN: usize = 40;
text.split_whitespace() text.split_whitespace()
.filter(|w| w.starts_with('#') && w.len() > 1) .filter_map(|w| w.strip_prefix('#'))
.map(|w| w.trim_start_matches('#').to_lowercase()) .map(|t| {
.filter(|t| !t.is_empty()) t.chars()
.take_while(|c| c.is_ascii_alphanumeric() || *c == '_')
.collect::<String>()
.to_lowercase()
})
.filter(|t| !t.is_empty() && t.chars().count() <= MAX_TAG_LEN)
.collect() .collect()
} }
#[cfg(test)]
mod tests {
use super::extract_hashtags;
#[test]
fn ascii_word_chars_extracted() {
assert_eq!(
extract_hashtags("hello #wedding #Day_2 #cake!"),
vec!["wedding", "day_2", "cake"]
);
}
#[test]
fn emojis_and_punctuation_excluded() {
// The 🎉 tag drops out entirely (no ASCII chars after #), the next tag
// stops at the !? and yields only "fun".
assert_eq!(extract_hashtags("#🎉 #!? #fun!"), vec!["fun"]);
}
#[test]
fn empty_or_bare_hash_skipped() {
assert_eq!(extract_hashtags("# #"), Vec::<String>::new());
}
}

View File

@@ -69,6 +69,23 @@ impl Upload {
.await .await
} }
/// Event-scoped lookup used by host endpoints so a host of event A cannot
/// reach uploads belonging to event B.
pub async fn find_by_id_and_event(
pool: &PgPool,
id: Uuid,
event_id: Uuid,
) -> Result<Option<Self>, sqlx::Error> {
sqlx::query_as::<_, Self>(
"SELECT * FROM upload
WHERE id = $1 AND event_id = $2 AND deleted_at IS NULL",
)
.bind(id)
.bind(event_id)
.fetch_optional(pool)
.await
}
pub async fn set_preview_path( pub async fn set_preview_path(
pool: &PgPool, pool: &PgPool,
id: Uuid, id: Uuid,
@@ -128,6 +145,43 @@ impl Upload {
Ok(()) Ok(())
} }
/// Event-scoped variant of [`Self::soft_delete`]. Returns `false` if no row
/// matched (already deleted, wrong event, or unknown id) so host handlers
/// can return a clean 404 instead of silently no-op'ing.
pub async fn soft_delete_in_event(
pool: &PgPool,
id: Uuid,
event_id: Uuid,
) -> Result<bool, sqlx::Error> {
let mut tx = pool.begin().await?;
let row: Option<(Uuid, i64)> = sqlx::query_as(
"UPDATE upload
SET deleted_at = NOW()
WHERE id = $1 AND event_id = $2 AND deleted_at IS NULL
RETURNING user_id, original_size_bytes",
)
.bind(id)
.bind(event_id)
.fetch_optional(&mut *tx)
.await?;
let deleted = if let Some((user_id, bytes)) = row {
sqlx::query(
"UPDATE \"user\"
SET total_upload_bytes = GREATEST(0, total_upload_bytes - $2)
WHERE id = $1",
)
.bind(user_id)
.bind(bytes)
.execute(&mut *tx)
.await?;
true
} else {
false
};
tx.commit().await?;
Ok(deleted)
}
pub async fn update_caption( pub async fn update_caption(
pool: &PgPool, pool: &PgPool,
id: Uuid, id: Uuid,

View File

@@ -18,6 +18,7 @@ use std::time::Duration;
use sqlx::PgPool; use sqlx::PgPool;
use crate::services::rate_limiter::RateLimiter; use crate::services::rate_limiter::RateLimiter;
use crate::services::sse_tickets::SseTicketStore;
/// Reset rows left in flight by a previous crashed instance. Run once on startup, /// Reset rows left in flight by a previous crashed instance. Run once on startup,
/// before the HTTP server starts taking requests, so users never observe the /// before the HTTP server starts taking requests, so users never observe the
@@ -68,9 +69,14 @@ pub async fn startup_recovery(pool: &PgPool) {
/// Spawns a background task that periodically: /// Spawns a background task that periodically:
/// - deletes session rows whose `expires_at` is more than a day in the past /// - deletes session rows whose `expires_at` is more than a day in the past
/// - prunes the in-memory rate-limiter HashMap of empty windows /// - prunes the in-memory rate-limiter HashMap of empty windows
/// - drops expired SSE tickets (30s TTL but the map keeps the slot until pruned)
/// ///
/// Cadence is 1h — fine for both jobs at our scale. /// Cadence is 1h — fine for both jobs at our scale.
pub fn spawn_periodic_tasks(pool: PgPool, rate_limiter: RateLimiter) { pub fn spawn_periodic_tasks(
pool: PgPool,
rate_limiter: RateLimiter,
sse_tickets: SseTicketStore,
) {
tokio::spawn(async move { tokio::spawn(async move {
let mut tick = tokio::time::interval(Duration::from_secs(3600)); let mut tick = tokio::time::interval(Duration::from_secs(3600));
// Fire the first tick immediately, then hourly. // Fire the first tick immediately, then hourly.
@@ -79,6 +85,7 @@ pub fn spawn_periodic_tasks(pool: PgPool, rate_limiter: RateLimiter) {
tick.tick().await; tick.tick().await;
cleanup_sessions(&pool).await; cleanup_sessions(&pool).await;
rate_limiter.prune(); rate_limiter.prune();
sse_tickets.prune();
} }
}); });
} }

View File

@@ -4,3 +4,4 @@ pub mod export;
pub mod jobs; pub mod jobs;
pub mod maintenance; pub mod maintenance;
pub mod rate_limiter; pub mod rate_limiter;
pub mod sse_tickets;

View File

@@ -0,0 +1,74 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use rand::Rng;
/// Short-lived single-use tickets that let `EventSource` clients open the SSE
/// stream without putting the JWT in the URL (where it would leak via access
/// logs / referer / browser history).
///
/// Flow: client `POST /api/v1/stream/ticket` with `Authorization: Bearer <jwt>`,
/// server returns an opaque ticket, client passes it as `?ticket=...` on the
/// stream open. Tickets are consumed on use and expire after `TTL`.
const TTL: Duration = Duration::from_secs(30);
#[derive(Clone)]
pub struct SseTicketStore {
inner: Arc<Mutex<HashMap<String, Entry>>>,
}
#[derive(Clone)]
struct Entry {
token_hash: String,
issued_at: Instant,
}
impl SseTicketStore {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Mint a new ticket bound to the caller's session (identified by token hash).
pub fn issue(&self, token_hash: String) -> String {
let ticket = random_ticket();
let mut map = self.inner.lock().unwrap();
map.insert(
ticket.clone(),
Entry {
token_hash,
issued_at: Instant::now(),
},
);
ticket
}
/// Consume a ticket. Returns `Some(token_hash)` if the ticket exists and is
/// not expired. Single-use: the ticket is removed regardless of whether it
/// was still fresh, so a replay can't slip through after expiry.
pub fn consume(&self, ticket: &str) -> Option<String> {
let mut map = self.inner.lock().unwrap();
let entry = map.remove(ticket)?;
if entry.issued_at.elapsed() > TTL {
return None;
}
Some(entry.token_hash)
}
/// Drop expired entries — called from the background maintenance task so a
/// long-running process doesn't accumulate stale tickets.
pub fn prune(&self) {
let mut map = self.inner.lock().unwrap();
map.retain(|_, e| e.issued_at.elapsed() <= TTL);
}
}
fn random_ticket() -> String {
// 192 bits of randomness, base32-ish hex. Plenty of entropy and URL-safe.
let mut rng = rand::rng();
let mut bytes = [0u8; 24];
rng.fill(&mut bytes);
bytes.iter().map(|b| format!("{b:02x}")).collect()
}

View File

@@ -4,6 +4,7 @@ use tokio::sync::broadcast;
use crate::config::AppConfig; use crate::config::AppConfig;
use crate::services::compression::CompressionWorker; use crate::services::compression::CompressionWorker;
use crate::services::rate_limiter::RateLimiter; use crate::services::rate_limiter::RateLimiter;
use crate::services::sse_tickets::SseTicketStore;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SseEvent { pub struct SseEvent {
@@ -29,6 +30,7 @@ pub struct AppState {
pub sse_tx: broadcast::Sender<SseEvent>, pub sse_tx: broadcast::Sender<SseEvent>,
pub compression: CompressionWorker, pub compression: CompressionWorker,
pub rate_limiter: RateLimiter, pub rate_limiter: RateLimiter,
pub sse_tickets: SseTicketStore,
} }
impl AppState { impl AppState {
@@ -42,6 +44,7 @@ impl AppState {
sse_tx, sse_tx,
compression, compression,
rate_limiter: RateLimiter::new(), rate_limiter: RateLimiter::new(),
sse_tickets: SseTicketStore::new(),
} }
} }
} }

View File

@@ -15,6 +15,8 @@ import { getToken } from './auth';
import { api } from './api'; import { api } from './api';
import type { DeltaResponse } from './types'; import type { DeltaResponse } from './types';
type StreamTicketResponse = { ticket: string };
type EventHandler = (data: string) => void; type EventHandler = (data: string) => void;
let eventSource: EventSource | null = null; let eventSource: EventSource | null = null;
@@ -69,8 +71,24 @@ export function connectSse(): void {
const token = getToken(); const token = getToken();
if (!token || eventSource) return; if (!token || eventSource) return;
// EventSource doesn't support custom headers, so pass token as query param. // EventSource can't send an Authorization header, so we exchange the JWT for
eventSource = new EventSource(`/api/v1/stream?token=${encodeURIComponent(token)}`); // a short-lived single-use ticket via POST /stream/ticket (Bearer auth) and
// pass that on the URL. The JWT itself never appears in URLs / access logs.
void (async () => {
let ticket: string;
try {
const res = await api.post<StreamTicketResponse>('/stream/ticket', {});
ticket = res.ticket;
} catch {
// Failed to mint a ticket (auth lapse, network blip). Back off and retry
// via the existing error path.
scheduleReconnect();
return;
}
// Auth flow may have torn things down while we were awaiting the ticket.
if (!getToken() || eventSource) return;
eventSource = new EventSource(`/api/v1/stream?ticket=${encodeURIComponent(ticket)}`);
eventSource.onopen = () => { eventSource.onopen = () => {
// Successful connection — reset the backoff counter. // Successful connection — reset the backoff counter.
@@ -95,12 +113,17 @@ export function connectSse(): void {
// retry storms (and lets the backend recover quietly) when the server is down // retry storms (and lets the backend recover quietly) when the server is down
// for a while or when 100+ guests reconnect simultaneously after an outage. // for a while or when 100+ guests reconnect simultaneously after an outage.
disconnectSse(); disconnectSse();
scheduleReconnect();
};
})();
}
function scheduleReconnect(): void {
reconnectAttempt++; reconnectAttempt++;
const delay = Math.min(60_000, 1_000 * 2 ** (reconnectAttempt - 1)); const delay = Math.min(60_000, 1_000 * 2 ** (reconnectAttempt - 1));
const jitter = Math.random() * 500; const jitter = Math.random() * 500;
if (reconnectTimer) clearTimeout(reconnectTimer); if (reconnectTimer) clearTimeout(reconnectTimer);
reconnectTimer = setTimeout(connectSse, delay + jitter); reconnectTimer = setTimeout(connectSse, delay + jitter);
};
} }
export function disconnectSse(): void { export function disconnectSse(): void {