feat: auto-retry uploads when rate limited (v0.13.0)
Backend: rate limiter gains check_with_retry() returning seconds until the next slot opens. Upload 429 responses include retry_after_secs in JSON and a Retry-After header. Frontend: upload queue catches 429 as RateLimitError, resets affected item to pending, schedules processQueue() for the server-reported delay, and shows a live countdown banner in the queue UI. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -39,6 +39,7 @@ pub async fn join(
|
|||||||
if !state.rate_limiter.check(format!("join:{ip}"), 5, Duration::from_secs(60)) {
|
if !state.rate_limiter.check(format!("join:{ip}"), 5, Duration::from_secs(60)) {
|
||||||
return Err(AppError::TooManyRequests(
|
return Err(AppError::TooManyRequests(
|
||||||
"Zu viele Anfragen. Bitte warte kurz und versuche es erneut.".into(),
|
"Zu viele Anfragen. Bitte warte kurz und versuche es erneut.".into(),
|
||||||
|
None,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -124,6 +125,7 @@ pub async fn recover(
|
|||||||
if Utc::now() < locked_until {
|
if Utc::now() < locked_until {
|
||||||
return Err(AppError::TooManyRequests(
|
return Err(AppError::TooManyRequests(
|
||||||
"Zu viele Versuche. Bitte warte 15 Minuten.".into(),
|
"Zu viele Versuche. Bitte warte 15 Minuten.".into(),
|
||||||
|
None,
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
use axum::http::StatusCode;
|
use axum::http::StatusCode;
|
||||||
use axum::response::{IntoResponse, Response};
|
use axum::response::{IntoResponse, Response};
|
||||||
use serde_json::json;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum AppError {
|
pub enum AppError {
|
||||||
@@ -8,7 +7,9 @@ pub enum AppError {
|
|||||||
Unauthorized(String),
|
Unauthorized(String),
|
||||||
Forbidden(String),
|
Forbidden(String),
|
||||||
NotFound(String),
|
NotFound(String),
|
||||||
TooManyRequests(String),
|
Conflict(String),
|
||||||
|
/// Second field: optional retry-after seconds to include in the response.
|
||||||
|
TooManyRequests(String, Option<u64>),
|
||||||
Internal(anyhow::Error),
|
Internal(anyhow::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -19,7 +20,8 @@ impl AppError {
|
|||||||
Self::Unauthorized(_) => (StatusCode::UNAUTHORIZED, "unauthorized"),
|
Self::Unauthorized(_) => (StatusCode::UNAUTHORIZED, "unauthorized"),
|
||||||
Self::Forbidden(_) => (StatusCode::FORBIDDEN, "forbidden"),
|
Self::Forbidden(_) => (StatusCode::FORBIDDEN, "forbidden"),
|
||||||
Self::NotFound(_) => (StatusCode::NOT_FOUND, "not_found"),
|
Self::NotFound(_) => (StatusCode::NOT_FOUND, "not_found"),
|
||||||
Self::TooManyRequests(_) => (StatusCode::TOO_MANY_REQUESTS, "too_many_requests"),
|
Self::Conflict(_) => (StatusCode::CONFLICT, "conflict"),
|
||||||
|
Self::TooManyRequests(..) => (StatusCode::TOO_MANY_REQUESTS, "too_many_requests"),
|
||||||
Self::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "internal_error"),
|
Self::Internal(_) => (StatusCode::INTERNAL_SERVER_ERROR, "internal_error"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -30,7 +32,8 @@ impl AppError {
|
|||||||
| Self::Unauthorized(msg)
|
| Self::Unauthorized(msg)
|
||||||
| Self::Forbidden(msg)
|
| Self::Forbidden(msg)
|
||||||
| Self::NotFound(msg)
|
| Self::NotFound(msg)
|
||||||
| Self::TooManyRequests(msg) => msg.clone(),
|
| Self::Conflict(msg) => msg.clone(),
|
||||||
|
Self::TooManyRequests(msg, _) => msg.clone(),
|
||||||
Self::Internal(err) => {
|
Self::Internal(err) => {
|
||||||
tracing::error!("internal error: {err:#}");
|
tracing::error!("internal error: {err:#}");
|
||||||
"Ein interner Fehler ist aufgetreten.".to_string()
|
"Ein interner Fehler ist aufgetreten.".to_string()
|
||||||
@@ -42,13 +45,29 @@ impl AppError {
|
|||||||
impl IntoResponse for AppError {
|
impl IntoResponse for AppError {
|
||||||
fn into_response(self) -> Response {
|
fn into_response(self) -> Response {
|
||||||
let (status, code) = self.status_and_code();
|
let (status, code) = self.status_and_code();
|
||||||
|
let retry_after_secs = if let Self::TooManyRequests(_, Some(secs)) = &self {
|
||||||
|
Some(*secs)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
let message = self.message();
|
let message = self.message();
|
||||||
let body = json!({
|
|
||||||
|
let mut body = serde_json::json!({
|
||||||
"error": code,
|
"error": code,
|
||||||
"message": message,
|
"message": message,
|
||||||
"status": status.as_u16(),
|
"status": status.as_u16(),
|
||||||
});
|
});
|
||||||
(status, axum::Json(body)).into_response()
|
if let Some(secs) = retry_after_secs {
|
||||||
|
body["retry_after_secs"] = secs.into();
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut resp = (status, axum::Json(body)).into_response();
|
||||||
|
if let Some(secs) = retry_after_secs {
|
||||||
|
if let Ok(val) = axum::http::HeaderValue::from_str(&secs.to_string()) {
|
||||||
|
resp.headers_mut().insert(axum::http::header::RETRY_AFTER, val);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -180,9 +180,7 @@ pub async fn download_zip(
|
|||||||
let ip = client_ip(&headers, "unknown");
|
let ip = client_ip(&headers, "unknown");
|
||||||
let limit = get_config_usize(&state.pool, "export_rate_per_day", 3).await;
|
let limit = get_config_usize(&state.pool, "export_rate_per_day", 3).await;
|
||||||
if !state.rate_limiter.check(format!("export:{ip}"), limit, Duration::from_secs(86400)) {
|
if !state.rate_limiter.check(format!("export:{ip}"), limit, Duration::from_secs(86400)) {
|
||||||
return Err(AppError::TooManyRequests(
|
return Err(AppError::TooManyRequests("Zu viele Anfragen. Bitte warte kurz und versuche es erneut.".into(), None));
|
||||||
"Zu viele Anfragen. Bitte warte kurz und versuche es erneut.".into(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let event = crate::models::event::Event::find_by_slug(&state.pool, &state.config.event_slug)
|
let event = crate::models::event::Event::find_by_slug(&state.pool, &state.config.event_slug)
|
||||||
@@ -211,9 +209,7 @@ pub async fn download_html(
|
|||||||
let ip = client_ip(&headers, "unknown");
|
let ip = client_ip(&headers, "unknown");
|
||||||
let limit = get_config_usize(&state.pool, "export_rate_per_day", 3).await;
|
let limit = get_config_usize(&state.pool, "export_rate_per_day", 3).await;
|
||||||
if !state.rate_limiter.check(format!("export:{ip}"), limit, Duration::from_secs(86400)) {
|
if !state.rate_limiter.check(format!("export:{ip}"), limit, Duration::from_secs(86400)) {
|
||||||
return Err(AppError::TooManyRequests(
|
return Err(AppError::TooManyRequests("Zu viele Anfragen. Bitte warte kurz und versuche es erneut.".into(), None));
|
||||||
"Zu viele Anfragen. Bitte warte kurz und versuche es erneut.".into(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let event = crate::models::event::Event::find_by_slug(&state.pool, &state.config.event_slug)
|
let event = crate::models::event::Event::find_by_slug(&state.pool, &state.config.event_slug)
|
||||||
|
|||||||
@@ -63,9 +63,7 @@ pub async fn feed(
|
|||||||
let ip = client_ip(&headers, "unknown");
|
let ip = client_ip(&headers, "unknown");
|
||||||
let rate_limit = get_config_usize(&state.pool, "feed_rate_per_min", 60).await;
|
let rate_limit = get_config_usize(&state.pool, "feed_rate_per_min", 60).await;
|
||||||
if !state.rate_limiter.check(format!("feed:{ip}"), rate_limit, Duration::from_secs(60)) {
|
if !state.rate_limiter.check(format!("feed:{ip}"), rate_limit, Duration::from_secs(60)) {
|
||||||
return Err(AppError::TooManyRequests(
|
return Err(AppError::TooManyRequests("Zu viele Anfragen. Bitte warte kurz und versuche es erneut.".into(), None));
|
||||||
"Zu viele Anfragen. Bitte warte kurz und versuche es erneut.".into(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let limit = q.limit.unwrap_or(20).min(100);
|
let limit = q.limit.unwrap_or(20).min(100);
|
||||||
|
|||||||
@@ -20,13 +20,14 @@ pub async fn upload(
|
|||||||
) -> Result<(StatusCode, Json<UploadDto>), AppError> {
|
) -> Result<(StatusCode, Json<UploadDto>), AppError> {
|
||||||
// Rate limit: N uploads per hour per user
|
// Rate limit: N uploads per hour per user
|
||||||
let upload_rate = get_config_i64(&state.pool, "upload_rate_per_hour", 10).await as usize;
|
let upload_rate = get_config_i64(&state.pool, "upload_rate_per_hour", 10).await as usize;
|
||||||
if !state
|
if let Err(retry_after_secs) = state
|
||||||
.rate_limiter
|
.rate_limiter
|
||||||
.check(format!("upload:{}", auth.user_id), upload_rate, Duration::from_secs(3600))
|
.check_with_retry(format!("upload:{}", auth.user_id), upload_rate, Duration::from_secs(3600))
|
||||||
{
|
{
|
||||||
drain_multipart(multipart).await;
|
drain_multipart(multipart).await;
|
||||||
return Err(AppError::TooManyRequests(
|
return Err(AppError::TooManyRequests(
|
||||||
"Du hast dein Upload-Limit für diese Stunde erreicht.".into(),
|
"Du hast dein Upload-Limit für diese Stunde erreicht.".into(),
|
||||||
|
Some(retry_after_secs),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -19,17 +19,26 @@ impl RateLimiter {
|
|||||||
|
|
||||||
/// Returns `true` if the request is allowed, `false` if rate-limited.
|
/// Returns `true` if the request is allowed, `false` if rate-limited.
|
||||||
pub fn check(&self, key: impl Into<String>, max: usize, window: Duration) -> bool {
|
pub fn check(&self, key: impl Into<String>, max: usize, window: Duration) -> bool {
|
||||||
|
self.check_with_retry(key, max, window).is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns `Ok(())` if allowed, `Err(retry_after_secs)` if rate-limited.
|
||||||
|
/// `retry_after_secs` is how long until the oldest slot in the window expires.
|
||||||
|
pub fn check_with_retry(&self, key: impl Into<String>, max: usize, window: Duration) -> Result<(), u64> {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let key = key.into();
|
let key = key.into();
|
||||||
let mut map = self.windows.lock().unwrap();
|
let mut map = self.windows.lock().unwrap();
|
||||||
let timestamps = map.entry(key).or_default();
|
let timestamps = map.entry(key).or_default();
|
||||||
// Drop entries outside the window
|
|
||||||
timestamps.retain(|&t| now.duration_since(t) < window);
|
timestamps.retain(|&t| now.duration_since(t) < window);
|
||||||
if timestamps.len() < max {
|
if timestamps.len() < max {
|
||||||
timestamps.push(now);
|
timestamps.push(now);
|
||||||
true
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
false
|
// The oldest timestamp expires at oldest + window; compute remaining seconds
|
||||||
|
let oldest = timestamps[0];
|
||||||
|
let elapsed = now.duration_since(oldest);
|
||||||
|
let remaining = window.saturating_sub(elapsed);
|
||||||
|
Err(remaining.as_secs().max(1))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
<script lang="ts">
|
<script lang="ts">
|
||||||
import { queueItems, isProcessing, retryItem, removeItem, clearCompleted } from '$lib/upload-queue';
|
import { queueItems, isProcessing, retryItem, removeItem, clearCompleted, rateLimitRetryAt } from '$lib/upload-queue';
|
||||||
import type { QueueItem } from '$lib/upload-queue';
|
import type { QueueItem } from '$lib/upload-queue';
|
||||||
|
|
||||||
function formatSize(bytes: number): string {
|
function formatSize(bytes: number): string {
|
||||||
@@ -28,6 +28,25 @@
|
|||||||
|
|
||||||
let items = $derived($queueItems);
|
let items = $derived($queueItems);
|
||||||
let hasCompleted = $derived(items.some((i) => i.status === 'done'));
|
let hasCompleted = $derived(items.some((i) => i.status === 'done'));
|
||||||
|
|
||||||
|
// Countdown for rate-limit banner
|
||||||
|
let countdown = $state(0);
|
||||||
|
|
||||||
|
$effect(() => {
|
||||||
|
const retryAt = $rateLimitRetryAt;
|
||||||
|
if (!retryAt) {
|
||||||
|
countdown = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
countdown = Math.ceil((retryAt - Date.now()) / 1000);
|
||||||
|
const interval = setInterval(() => {
|
||||||
|
countdown = Math.ceil((retryAt - Date.now()) / 1000);
|
||||||
|
if (countdown <= 0) clearInterval(interval);
|
||||||
|
}, 1000);
|
||||||
|
|
||||||
|
return () => clearInterval(interval);
|
||||||
|
});
|
||||||
</script>
|
</script>
|
||||||
|
|
||||||
{#if items.length > 0}
|
{#if items.length > 0}
|
||||||
@@ -49,6 +68,12 @@
|
|||||||
{/if}
|
{/if}
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
{#if $rateLimitRetryAt && countdown > 0}
|
||||||
|
<div class="border-b border-amber-100 bg-amber-50 px-4 py-2 text-sm text-amber-800">
|
||||||
|
Upload-Limit erreicht. Wird in {countdown} Sek. automatisch fortgesetzt.
|
||||||
|
</div>
|
||||||
|
{/if}
|
||||||
|
|
||||||
<ul class="divide-y divide-gray-100">
|
<ul class="divide-y divide-gray-100">
|
||||||
{#each items as item (item.id)}
|
{#each items as item (item.id)}
|
||||||
<li class="px-4 py-3">
|
<li class="px-4 py-3">
|
||||||
|
|||||||
@@ -18,6 +18,9 @@ export interface QueueItem {
|
|||||||
export const queueItems = writable<QueueItem[]>([]);
|
export const queueItems = writable<QueueItem[]>([]);
|
||||||
export const isProcessing = writable(false);
|
export const isProcessing = writable(false);
|
||||||
|
|
||||||
|
/** Set to the timestamp (ms) at which the rate-limit lifts, or null when clear. */
|
||||||
|
export const rateLimitRetryAt = writable<number | null>(null);
|
||||||
|
|
||||||
const DB_NAME = 'eventsnap-uploads';
|
const DB_NAME = 'eventsnap-uploads';
|
||||||
const STORE_NAME = 'queue';
|
const STORE_NAME = 'queue';
|
||||||
|
|
||||||
@@ -35,6 +38,14 @@ async function getDb(): Promise<IDBPDatabase> {
|
|||||||
return db;
|
return db;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class RateLimitError extends Error {
|
||||||
|
retryAfterSecs: number;
|
||||||
|
constructor(secs: number) {
|
||||||
|
super('rate_limited');
|
||||||
|
this.retryAfterSecs = secs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function loadQueue(): Promise<void> {
|
export async function loadQueue(): Promise<void> {
|
||||||
const database = await getDb();
|
const database = await getDb();
|
||||||
const all = await database.getAll(STORE_NAME);
|
const all = await database.getAll(STORE_NAME);
|
||||||
@@ -136,7 +147,21 @@ async function processQueue(): Promise<void> {
|
|||||||
const next = items.find((item) => item.status === 'pending');
|
const next = items.find((item) => item.status === 'pending');
|
||||||
if (!next) break;
|
if (!next) break;
|
||||||
|
|
||||||
await uploadItem(next.id);
|
try {
|
||||||
|
await uploadItem(next.id);
|
||||||
|
} catch (e) {
|
||||||
|
if (e instanceof RateLimitError) {
|
||||||
|
// Keep all pending items as-is; schedule queue resume when limit lifts
|
||||||
|
const retryAt = Date.now() + e.retryAfterSecs * 1000;
|
||||||
|
rateLimitRetryAt.set(retryAt);
|
||||||
|
setTimeout(() => {
|
||||||
|
rateLimitRetryAt.set(null);
|
||||||
|
processQueue();
|
||||||
|
}, e.retryAfterSecs * 1000);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// Other errors are already handled inside uploadItem (marked as 'error')
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
processing = false;
|
processing = false;
|
||||||
@@ -148,7 +173,6 @@ async function uploadItem(id: string): Promise<void> {
|
|||||||
const database = await getDb();
|
const database = await getDb();
|
||||||
const entry = await database.get(STORE_NAME, id);
|
const entry = await database.get(STORE_NAME, id);
|
||||||
if (!entry || !entry.blob) {
|
if (!entry || !entry.blob) {
|
||||||
// No blob — mark as error
|
|
||||||
updateItemStatus(id, 'error', 'Datei nicht gefunden.');
|
updateItemStatus(id, 'error', 'Datei nicht gefunden.');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -184,6 +208,14 @@ async function uploadItem(id: string): Promise<void> {
|
|||||||
xhr.addEventListener('load', () => {
|
xhr.addEventListener('load', () => {
|
||||||
if (xhr.status >= 200 && xhr.status < 300) {
|
if (xhr.status >= 200 && xhr.status < 300) {
|
||||||
resolve();
|
resolve();
|
||||||
|
} else if (xhr.status === 429) {
|
||||||
|
try {
|
||||||
|
const body = JSON.parse(xhr.responseText);
|
||||||
|
const secs = typeof body.retry_after_secs === 'number' ? body.retry_after_secs : 60;
|
||||||
|
reject(new RateLimitError(secs));
|
||||||
|
} catch {
|
||||||
|
reject(new RateLimitError(60));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
const body = JSON.parse(xhr.responseText);
|
const body = JSON.parse(xhr.responseText);
|
||||||
@@ -205,6 +237,13 @@ async function uploadItem(id: string): Promise<void> {
|
|||||||
await database.put(STORE_NAME, entry);
|
await database.put(STORE_NAME, entry);
|
||||||
updateItemStatus(id, 'done');
|
updateItemStatus(id, 'done');
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
|
if (e instanceof RateLimitError) {
|
||||||
|
// Reset to pending so it will be retried when the queue resumes
|
||||||
|
entry.status = 'pending';
|
||||||
|
await database.put(STORE_NAME, entry);
|
||||||
|
updateItemStatus(id, 'pending');
|
||||||
|
throw e; // Propagate to processQueue for scheduling
|
||||||
|
}
|
||||||
const msg = e instanceof Error ? e.message : 'Upload fehlgeschlagen.';
|
const msg = e instanceof Error ? e.message : 'Upload fehlgeschlagen.';
|
||||||
entry.status = 'error';
|
entry.status = 'error';
|
||||||
entry.error = msg;
|
entry.error = msg;
|
||||||
@@ -224,7 +263,7 @@ function updateItemStatus(
|
|||||||
? {
|
? {
|
||||||
...item,
|
...item,
|
||||||
status,
|
status,
|
||||||
progress: status === 'done' ? 100 : status === 'error' ? item.progress : item.progress,
|
progress: status === 'done' ? 100 : status === 'pending' ? 0 : item.progress,
|
||||||
error
|
error
|
||||||
}
|
}
|
||||||
: item
|
: item
|
||||||
|
|||||||
Reference in New Issue
Block a user