feat: implement gallery feed with social features and SSE

- Cursor-based feed endpoint using v_feed view with hashtag filtering
- Like toggle (INSERT ON CONFLICT), comments CRUD
- Feed delta endpoint for SSE-driven incremental updates
- SSE client with Page Visibility API (pause/reconnect)
- Responsive photo/video grid with infinite scroll
- Hashtag filter chips, lightbox modal with comments
- Media file serving via tower-http ServeDir

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
fabi
2026-04-01 19:17:06 +02:00
parent 4e1f1d6426
commit 964598e41d
13 changed files with 1134 additions and 26 deletions

View File

@@ -0,0 +1,258 @@
use axum::extract::{Query, State};
use axum::Json;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::auth::middleware::AuthUser;
use crate::error::AppError;
use crate::state::AppState;
#[derive(Deserialize)]
pub struct FeedQuery {
pub cursor: Option<Uuid>,
pub limit: Option<i64>,
pub hashtag: Option<String>,
}
#[derive(Serialize)]
pub struct FeedUpload {
pub id: Uuid,
pub user_id: Uuid,
pub uploader_name: String,
pub preview_url: Option<String>,
pub thumbnail_url: Option<String>,
pub mime_type: String,
pub caption: Option<String>,
pub like_count: i64,
pub comment_count: i64,
pub liked_by_me: bool,
pub created_at: DateTime<Utc>,
}
#[derive(Serialize)]
pub struct FeedResponse {
pub uploads: Vec<FeedUpload>,
pub next_cursor: Option<Uuid>,
}
#[derive(sqlx::FromRow)]
struct FeedRow {
id: Uuid,
user_id: Uuid,
uploader_name: String,
preview_path: Option<String>,
thumbnail_path: Option<String>,
mime_type: String,
caption: Option<String>,
like_count: i64,
comment_count: i64,
created_at: DateTime<Utc>,
}
pub async fn feed(
State(state): State<AppState>,
auth: AuthUser,
Query(q): Query<FeedQuery>,
) -> Result<Json<FeedResponse>, AppError> {
let limit = q.limit.unwrap_or(20).min(100);
let rows = if let Some(hashtag) = &q.hashtag {
let tag = hashtag.trim().trim_start_matches('#').to_lowercase();
sqlx::query_as::<_, FeedRow>(
"SELECT v.id, v.user_id, v.uploader_name, v.preview_path, v.thumbnail_path,
v.mime_type, v.caption, v.like_count, v.comment_count, v.created_at
FROM v_feed v
JOIN upload_hashtag uh ON uh.upload_id = v.id
JOIN hashtag h ON h.id = uh.hashtag_id AND h.tag = $1
WHERE v.event_id = $2
AND ($3::timestamptz IS NULL OR v.created_at < $3)
ORDER BY v.created_at DESC
LIMIT $4",
)
.bind(&tag)
.bind(auth.event_id)
.bind(
if let Some(cursor) = q.cursor {
get_cursor_time(&state.pool, cursor).await
} else {
None
},
)
.bind(limit + 1)
.fetch_all(&state.pool)
.await?
} else {
sqlx::query_as::<_, FeedRow>(
"SELECT id, user_id, uploader_name, preview_path, thumbnail_path,
mime_type, caption, like_count, comment_count, created_at
FROM v_feed
WHERE event_id = $1
AND ($2::timestamptz IS NULL OR created_at < $2)
ORDER BY created_at DESC
LIMIT $3",
)
.bind(auth.event_id)
.bind(
if let Some(cursor) = q.cursor {
get_cursor_time(&state.pool, cursor).await
} else {
None
},
)
.bind(limit + 1)
.fetch_all(&state.pool)
.await?
};
let has_more = rows.len() as i64 > limit;
let rows: Vec<FeedRow> = rows.into_iter().take(limit as usize).collect();
let next_cursor = if has_more { rows.last().map(|r| r.id) } else { None };
// Batch check which uploads the current user has liked
let upload_ids: Vec<Uuid> = rows.iter().map(|r| r.id).collect();
let liked_set = get_liked_set(&state.pool, auth.user_id, &upload_ids).await;
let uploads = rows
.into_iter()
.map(|r| {
let preview_url = r.preview_path.map(|p| format!("/media/{p}"));
let thumbnail_url = r.thumbnail_path.map(|p| format!("/media/{p}"));
FeedUpload {
liked_by_me: liked_set.contains(&r.id),
id: r.id,
user_id: r.user_id,
uploader_name: r.uploader_name,
preview_url,
thumbnail_url,
mime_type: r.mime_type,
caption: r.caption,
like_count: r.like_count,
comment_count: r.comment_count,
created_at: r.created_at,
}
})
.collect();
Ok(Json(FeedResponse {
uploads,
next_cursor,
}))
}
#[derive(Deserialize)]
pub struct DeltaQuery {
pub since: DateTime<Utc>,
}
#[derive(Serialize)]
pub struct DeltaResponse {
pub uploads: Vec<FeedUpload>,
pub deleted_ids: Vec<Uuid>,
}
pub async fn feed_delta(
State(state): State<AppState>,
auth: AuthUser,
Query(q): Query<DeltaQuery>,
) -> Result<Json<DeltaResponse>, AppError> {
let rows = sqlx::query_as::<_, FeedRow>(
"SELECT id, user_id, uploader_name, preview_path, thumbnail_path,
mime_type, caption, like_count, comment_count, created_at
FROM v_feed
WHERE event_id = $1 AND created_at > $2
ORDER BY created_at DESC",
)
.bind(auth.event_id)
.bind(q.since)
.fetch_all(&state.pool)
.await?;
let deleted_ids: Vec<(Uuid,)> = sqlx::query_as(
"SELECT id FROM upload
WHERE event_id = $1 AND deleted_at IS NOT NULL AND deleted_at > $2",
)
.bind(auth.event_id)
.bind(q.since)
.fetch_all(&state.pool)
.await?;
let upload_ids: Vec<Uuid> = rows.iter().map(|r| r.id).collect();
let liked_set = get_liked_set(&state.pool, auth.user_id, &upload_ids).await;
let uploads = rows
.into_iter()
.map(|r| FeedUpload {
liked_by_me: liked_set.contains(&r.id),
id: r.id,
user_id: r.user_id,
uploader_name: r.uploader_name,
preview_url: r.preview_path.map(|p| format!("/media/{p}")),
thumbnail_url: r.thumbnail_path.map(|p| format!("/media/{p}")),
mime_type: r.mime_type,
caption: r.caption,
like_count: r.like_count,
comment_count: r.comment_count,
created_at: r.created_at,
})
.collect();
Ok(Json(DeltaResponse {
uploads,
deleted_ids: deleted_ids.into_iter().map(|r| r.0).collect(),
}))
}
#[derive(Serialize)]
pub struct HashtagCount {
pub tag: String,
pub count: i64,
}
pub async fn hashtags(
State(state): State<AppState>,
auth: AuthUser,
) -> Result<Json<Vec<HashtagCount>>, AppError> {
let rows: Vec<(String, i64)> = sqlx::query_as(
"SELECT tag, upload_count FROM v_hashtag_counts WHERE event_id = $1",
)
.bind(auth.event_id)
.fetch_all(&state.pool)
.await?;
Ok(Json(
rows.into_iter()
.map(|(tag, count)| HashtagCount { tag, count })
.collect(),
))
}
async fn get_cursor_time(pool: &sqlx::PgPool, cursor_id: Uuid) -> Option<DateTime<Utc>> {
let row: Option<(DateTime<Utc>,)> =
sqlx::query_as("SELECT created_at FROM upload WHERE id = $1")
.bind(cursor_id)
.fetch_optional(pool)
.await
.ok()?;
row.map(|r| r.0)
}
async fn get_liked_set(
pool: &sqlx::PgPool,
user_id: Uuid,
upload_ids: &[Uuid],
) -> std::collections::HashSet<Uuid> {
if upload_ids.is_empty() {
return std::collections::HashSet::new();
}
let rows: Vec<(Uuid,)> = sqlx::query_as(
"SELECT upload_id FROM \"like\" WHERE user_id = $1 AND upload_id = ANY($2)",
)
.bind(user_id)
.bind(upload_ids)
.fetch_all(pool)
.await
.unwrap_or_default();
rows.into_iter().map(|r| r.0).collect()
}

View File

@@ -1,2 +1,4 @@
pub mod feed;
pub mod social;
pub mod sse;
pub mod upload;

View File

@@ -0,0 +1,136 @@
use axum::extract::{Path, State};
use axum::http::StatusCode;
use axum::Json;
use serde::Deserialize;
use uuid::Uuid;
use crate::auth::middleware::AuthUser;
use crate::error::AppError;
use crate::models::comment::{Comment, CommentDto};
use crate::models::hashtag::{self, Hashtag};
use crate::state::AppState;
pub async fn toggle_like(
State(state): State<AppState>,
auth: AuthUser,
Path(upload_id): Path<Uuid>,
) -> Result<StatusCode, AppError> {
// Check if user is banned
let user = crate::models::user::User::find_by_id(&state.pool, auth.user_id)
.await?
.ok_or_else(|| AppError::NotFound("Benutzer nicht gefunden.".into()))?;
if user.is_banned {
return Err(AppError::Forbidden("Du bist gesperrt.".into()));
}
// Try to insert; if conflict, delete (toggle)
let result = sqlx::query(
"INSERT INTO \"like\" (upload_id, user_id) VALUES ($1, $2)
ON CONFLICT (upload_id, user_id) DO NOTHING",
)
.bind(upload_id)
.bind(auth.user_id)
.execute(&state.pool)
.await?;
if result.rows_affected() == 0 {
// Already liked — remove
sqlx::query("DELETE FROM \"like\" WHERE upload_id = $1 AND user_id = $2")
.bind(upload_id)
.bind(auth.user_id)
.execute(&state.pool)
.await?;
}
// Broadcast SSE
let _ = state.sse_tx.send(crate::state::SseEvent {
event_type: "like-update".to_string(),
data: serde_json::json!({ "upload_id": upload_id }).to_string(),
});
Ok(StatusCode::NO_CONTENT)
}
pub async fn list_comments(
State(state): State<AppState>,
_auth: AuthUser,
Path(upload_id): Path<Uuid>,
) -> Result<Json<Vec<CommentDto>>, AppError> {
let comments = Comment::list_for_upload(&state.pool, upload_id).await?;
Ok(Json(comments))
}
#[derive(Deserialize)]
pub struct AddCommentRequest {
pub body: String,
}
pub async fn add_comment(
State(state): State<AppState>,
auth: AuthUser,
Path(upload_id): Path<Uuid>,
Json(body): Json<AddCommentRequest>,
) -> Result<(StatusCode, Json<CommentDto>), AppError> {
let user = crate::models::user::User::find_by_id(&state.pool, auth.user_id)
.await?
.ok_or_else(|| AppError::NotFound("Benutzer nicht gefunden.".into()))?;
if user.is_banned {
return Err(AppError::Forbidden("Du bist gesperrt.".into()));
}
let text = body.body.trim();
if text.is_empty() || text.len() > 500 {
return Err(AppError::BadRequest(
"Kommentar muss zwischen 1 und 500 Zeichen lang sein.".into(),
));
}
let comment = Comment::create(&state.pool, upload_id, auth.user_id, text).await?;
// Process hashtags in comment body
let tags = hashtag::extract_hashtags(text);
for tag in &tags {
let h = Hashtag::upsert(&state.pool, auth.event_id, tag).await?;
sqlx::query(
"INSERT INTO comment_hashtag (comment_id, hashtag_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
)
.bind(comment.id)
.bind(h.id)
.execute(&state.pool)
.await?;
}
// Broadcast SSE
let _ = state.sse_tx.send(crate::state::SseEvent {
event_type: "new-comment".to_string(),
data: serde_json::json!({ "upload_id": upload_id }).to_string(),
});
let dto = CommentDto {
id: comment.id,
upload_id,
user_id: auth.user_id,
uploader_name: user.display_name,
body: comment.body,
created_at: comment.created_at,
};
Ok((StatusCode::CREATED, Json(dto)))
}
pub async fn delete_comment(
State(state): State<AppState>,
auth: AuthUser,
Path(comment_id): Path<Uuid>,
) -> Result<StatusCode, AppError> {
let comment = Comment::find_by_id(&state.pool, comment_id)
.await?
.ok_or_else(|| AppError::NotFound("Kommentar nicht gefunden.".into()))?;
if comment.user_id != auth.user_id {
return Err(AppError::Forbidden("Nur eigene Kommentare löschen.".into()));
}
Comment::soft_delete(&state.pool, comment_id).await?;
Ok(StatusCode::NO_CONTENT)
}

View File

@@ -1,28 +1,45 @@
use std::convert::Infallible;
use std::time::Duration;
use axum::extract::State;
use axum::extract::{Query, State};
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use serde::Deserialize;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::StreamExt;
use crate::auth::middleware::AuthUser;
use crate::auth::jwt;
use crate::error::AppError;
use crate::models::session::Session;
use crate::state::AppState;
#[derive(Deserialize)]
pub struct SseQuery {
pub token: String,
}
/// SSE stream endpoint. Accepts JWT via query param since EventSource
/// doesn't support custom headers.
pub async fn stream(
State(state): State<AppState>,
_auth: AuthUser,
Query(q): Query<SseQuery>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, AppError> {
// Verify token
let _claims = jwt::verify_token(&q.token, &state.config.jwt_secret)
.map_err(|_| AppError::Unauthorized("Token ungültig.".into()))?;
let token_hash = jwt::hash_token(&q.token);
Session::find_by_token_hash(&state.pool, &token_hash)
.await
.map_err(|e| AppError::Internal(e.into()))?
.ok_or_else(|| AppError::Unauthorized("Sitzung nicht gefunden.".into()))?;
let rx = state.sse_tx.subscribe();
let stream = BroadcastStream::new(rx).filter_map(|msg| {
match msg {
Ok(sse_event) => Some(Ok(Event::default()
.event(sse_event.event_type)
.data(sse_event.data))),
Err(_) => None, // Lagged — skip missed events
}
let stream = BroadcastStream::new(rx).filter_map(|msg| match msg {
Ok(sse_event) => Some(Ok(Event::default()
.event(sse_event.event_type)
.data(sse_event.data))),
Err(_) => None,
});
Ok(Sse::new(stream).keep_alive(