use std::convert::Infallible; use std::time::Duration; use axum::extract::{Query, State}; use axum::response::sse::{Event, KeepAlive, Sse}; use futures::stream::Stream; use serde::Deserialize; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::StreamExt; use crate::auth::jwt; use crate::error::AppError; use crate::models::session::Session; use crate::state::AppState; #[derive(Deserialize)] pub struct SseQuery { pub token: String, } /// SSE stream endpoint. Accepts JWT via query param since EventSource /// doesn't support custom headers. pub async fn stream( State(state): State, Query(q): Query, ) -> Result>>, AppError> { // Verify token let _claims = jwt::verify_token(&q.token, &state.config.jwt_secret) .map_err(|_| AppError::Unauthorized("Token ungültig.".into()))?; let token_hash = jwt::hash_token(&q.token); Session::find_by_token_hash(&state.pool, &token_hash) .await .map_err(|e| AppError::Internal(e.into()))? .ok_or_else(|| AppError::Unauthorized("Sitzung nicht gefunden.".into()))?; let rx = state.sse_tx.subscribe(); let stream = BroadcastStream::new(rx).filter_map(|msg| match msg { Ok(sse_event) => Some(Ok(Event::default() .event(sse_event.event_type) .data(sse_event.data))), Err(_) => None, }); Ok(Sse::new(stream).keep_alive( KeepAlive::new() .interval(Duration::from_secs(30)) .text("ping"), )) }