diff --git a/backend/src/api/admin/crawler.rs b/backend/src/api/admin/crawler.rs index d9d445d..804ba4f 100644 --- a/backend/src/api/admin/crawler.rs +++ b/backend/src/api/admin/crawler.rs @@ -6,9 +6,14 @@ //! ([`crate::crawler::status`]) with DB-derived queue counts and the //! session/browser flags. +use std::convert::Infallible; +use std::time::Duration; + use axum::extract::{Query, State}; +use axum::response::sse::{Event, KeepAlive, Sse}; use axum::routing::{get, post}; use axum::{Json, Router}; +use futures_util::stream::Stream; use serde::{Deserialize, Serialize}; use serde_json::json; use uuid::Uuid; @@ -21,9 +26,16 @@ use crate::error::{AppError, AppResult}; use crate::repo; use crate::repo::crawler::{DeadJob, RequeueScope}; +/// Backstop recompose interval for the SSE stream. Phase/worker/session +/// changes push instantly via the status `watch`; this only bounds the +/// staleness of DB-derived queue counts and the browser phase when those +/// change without an accompanying status poke. +const SSE_BACKSTOP: Duration = Duration::from_secs(5); + pub fn routes() -> Router { Router::new() .route("/admin/crawler", get(get_status)) + .route("/admin/crawler/stream", get(stream_status)) .route("/admin/crawler/run", post(run_now)) .route("/admin/crawler/browser/restart", post(restart_browser)) .route("/admin/crawler/session", post(update_session)) @@ -75,10 +87,10 @@ fn browser_phase_str(p: RestartPhase) -> &'static str { } } -async fn get_status( - State(state): State, - _admin: RequireAdmin, -) -> AppResult> { +/// Compose a full status snapshot from the in-memory status, the +/// browser/session flags, and a fresh DB queue-count query. Shared by the +/// one-shot `get_status` and the SSE `stream_status`. +async fn compose_status(state: &AppState) -> AppResult { let (pending, running, dead) = repo::crawler::job_state_counts(&state.db).await?; let queue = QueueCounts { pending, @@ -86,7 +98,7 @@ async fn get_status( dead, }; - let resp = match state.crawler.as_ref() { + Ok(match state.crawler.as_ref() { None => CrawlerStatusResponse { daemon: "disabled", phase: None, @@ -114,8 +126,64 @@ async fn get_status( queue, } } - }; - Ok(Json(resp)) + }) +} + +async fn get_status( + State(state): State, + _admin: RequireAdmin, +) -> AppResult> { + Ok(Json(compose_status(&state).await?)) +} + +// --------------------------------------------------------------------------- +// GET /admin/crawler/stream — Server-Sent Events live status +// --------------------------------------------------------------------------- + +/// Push live status to the dashboard instead of polling. Emits a snapshot +/// immediately on connect, then on every status change (instant, via the +/// `watch` notifier) and on a [`SSE_BACKSTOP`] tick (to refresh DB queue +/// counts / browser phase that change without a status poke). The browser +/// opens this only while the crawler page is mounted and closes it on +/// navigate-away, so the subscription is scoped to the active page. +async fn stream_status( + State(state): State, + _admin: RequireAdmin, +) -> Sse>> { + // Subscribe before the first emit so no change between the initial + // snapshot and the first await is lost. + let rx = state.crawler.as_ref().map(|c| c.status.subscribe()); + + let stream = futures_util::stream::unfold( + (state, rx, true), + |(state, mut rx, first)| async move { + // After the first immediate emit, wait for a change or the + // backstop tick before recomposing. + if !first { + match rx.as_mut() { + Some(rx) => { + tokio::select! { + _ = rx.changed() => {} + _ = tokio::time::sleep(SSE_BACKSTOP) => {} + } + } + None => tokio::time::sleep(SSE_BACKSTOP).await, + } + } + // Compose; on a transient DB error, emit a keep-alive comment + // rather than tearing down the stream. + let event = match compose_status(&state).await { + Ok(resp) => Event::default() + .event("status") + .json_data(&resp) + .unwrap_or_else(|_| Event::default().comment("serialize error")), + Err(_) => Event::default().comment("status unavailable"), + }; + Some((Ok(event), (state, rx, false))) + }, + ); + + Sse::new(stream).keep_alive(KeepAlive::default()) } // --------------------------------------------------------------------------- @@ -165,6 +233,8 @@ async fn restart_browser( ) -> AppResult> { let c = require_crawler(&state)?; let result = c.browser_manager.coordinated_restart(c.drain_deadline).await; + // Push the post-restart browser phase to live subscribers immediately. + c.status.poke(); repo::admin_audit::insert( &state.db, admin.0.id, @@ -215,6 +285,8 @@ async fn update_session( // Relaunch the browser so on_launch re-injects the new cookie and // re-probes — the restart's success IS the session-validity signal. let probe = c.browser_manager.coordinated_restart(c.drain_deadline).await; + // Session + browser state changed — push to live subscribers. + c.status.poke(); repo::admin_audit::insert( &state.db, admin.0.id, @@ -247,6 +319,8 @@ async fn clear_session_expired( ) -> AppResult> { let c = require_crawler(&state)?; c.session.clear_expired(); + // session.expired flipped — push to live subscribers. + c.status.poke(); repo::admin_audit::insert( &state.db, admin.0.id, diff --git a/backend/src/crawler/daemon.rs b/backend/src/crawler/daemon.rs index 602790a..2120a3f 100644 --- a/backend/src/crawler/daemon.rs +++ b/backend/src/crawler/daemon.rs @@ -447,6 +447,8 @@ impl WorkerContext { "session expired — workers will idle until restart" ); self.session_expired.store(true, Ordering::Release); + // Push the session-expired flip to live status subscribers. + self.status.poke(); let _ = jobs::release(&self.pool, lease.id).await; } Ok(Err(e)) => { diff --git a/backend/src/crawler/status.rs b/backend/src/crawler/status.rs index aaffca3..077676e 100644 --- a/backend/src/crawler/status.rs +++ b/backend/src/crawler/status.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use serde::Serialize; -use tokio::sync::RwLock; +use tokio::sync::{watch, RwLock}; use uuid::Uuid; use crate::crawler::pipeline::MetadataStats; @@ -84,46 +84,82 @@ impl CrawlerStatus { /// Cloneable handle the daemon tasks use to publish status. Cheap to clone /// (`Arc`). All writers funnel through the helper methods so locking stays -/// localised. +/// localised. Every mutation bumps a `watch` version so SSE subscribers +/// get pushed an update instead of polling. #[derive(Clone)] -pub struct StatusHandle(Arc>); +pub struct StatusHandle { + inner: Arc>, + /// Monotonic version bumped on every change. SSE handlers + /// `subscribe()` and `await .changed()` for instant pushes; `watch` + /// has no lost-wakeup so a change between snapshots is never missed. + version: Arc>, +} impl StatusHandle { pub fn new(num_workers: usize) -> Self { - Self(Arc::new(RwLock::new(CrawlerStatus::new(num_workers)))) + let (version, _rx) = watch::channel(0u64); + Self { + inner: Arc::new(RwLock::new(CrawlerStatus::new(num_workers))), + version: Arc::new(version), + } + } + + fn bump(&self) { + self.version.send_modify(|v| *v = v.wrapping_add(1)); + } + + /// A receiver whose `.changed()` resolves on the next status change. + pub fn subscribe(&self) -> watch::Receiver { + self.version.subscribe() + } + + /// Signal a change without mutating in-memory state — used when an + /// *external* signal the live snapshot reflects (browser phase, + /// session-expired flag, queue counts) has changed, so subscribers + /// recompose promptly. + pub fn poke(&self) { + self.bump(); } pub async fn set_phase(&self, phase: Phase) { - self.0.write().await.phase = phase; + self.inner.write().await.phase = phase; + self.bump(); } pub async fn set_worker(&self, id: usize, state: WorkerState) { - let mut s = self.0.write().await; - if let Some(slot) = s.workers.get_mut(id) { - *slot = state; + { + let mut s = self.inner.write().await; + if let Some(slot) = s.workers.get_mut(id) { + *slot = state; + } } + self.bump(); } /// Record a finished metadata pass. Stamps `at` with `now`. pub async fn record_pass(&self, stats: &MetadataStats, at: DateTime) { - let mut s = self.0.write().await; - s.last_pass = LastPass { - at: Some(at), - discovered: stats.discovered, - upserted: stats.upserted, - covers_fetched: stats.covers_fetched, - mangas_failed: stats.mangas_failed, - }; + { + let mut s = self.inner.write().await; + s.last_pass = LastPass { + at: Some(at), + discovered: stats.discovered, + upserted: stats.upserted, + covers_fetched: stats.covers_fetched, + mangas_failed: stats.mangas_failed, + }; + } + self.bump(); } /// Seed the last-pass summary from a persisted `crawler_state` value on /// startup so the dashboard isn't blank until the first tick. pub async fn set_last_pass(&self, last: LastPass) { - self.0.write().await.last_pass = last; + self.inner.write().await.last_pass = last; + self.bump(); } pub async fn snapshot(&self) -> CrawlerStatus { - self.0.read().await.clone() + self.inner.read().await.clone() } } @@ -159,4 +195,16 @@ mod tests { assert_eq!(snap.last_pass.upserted, 3); assert_eq!(snap.last_pass.at, Some(at)); } + + #[tokio::test] + async fn subscribe_resolves_on_mutation_and_poke() { + let h = StatusHandle::new(1); + let mut rx = h.subscribe(); + // A mutation wakes the subscriber. + h.set_phase(Phase::WalkingList).await; + rx.changed().await.unwrap(); + // A bare poke (external signal) also wakes it. + h.poke(); + rx.changed().await.unwrap(); + } } diff --git a/backend/tests/api_admin_crawler.rs b/backend/tests/api_admin_crawler.rs index bbedf77..d1b8b9f 100644 --- a/backend/tests/api_admin_crawler.rs +++ b/backend/tests/api_admin_crawler.rs @@ -7,8 +7,11 @@ mod common; +use std::time::Duration; + use axum::http::StatusCode; use axum::Router; +use http_body_util::BodyExt; use serde_json::json; use sqlx::PgPool; use tower::ServiceExt; @@ -121,6 +124,70 @@ async fn control_endpoints_return_503_when_daemon_disabled(pool: PgPool) { } } +#[sqlx::test(migrations = "./migrations")] +async fn status_stream_requires_admin(pool: PgPool) { + let h = harness(pool); + // Unauthenticated → 401. + let resp = h + .app + .clone() + .oneshot(get("/api/v1/admin/crawler/stream")) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::UNAUTHORIZED); + // Non-admin → 403. + let (_u, cookie) = register_user(&h.app).await; + let resp = h + .app + .clone() + .oneshot(get_with_cookie("/api/v1/admin/crawler/stream", &cookie)) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::FORBIDDEN); +} + +#[sqlx::test(migrations = "./migrations")] +async fn status_stream_emits_initial_event(pool: PgPool) { + let h = harness(pool.clone()); + let cookie = seed_admin(&pool, &h.app).await; + + let resp = h + .app + .clone() + .oneshot(get_with_cookie("/api/v1/admin/crawler/stream", &cookie)) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let ct = resp + .headers() + .get(axum::http::header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .unwrap_or_default() + .to_string(); + assert!(ct.starts_with("text/event-stream"), "content-type was {ct:?}"); + + // Accumulate frames (the immediate snapshot may arrive split across + // frames) until the status payload appears, with an overall timeout so + // the never-ending stream can't hang the test. + let mut body = resp.into_body(); + let mut acc = String::new(); + let deadline = tokio::time::timeout(Duration::from_secs(5), async { + loop { + let Some(frame) = body.frame().await else { break }; + if let Ok(data) = frame.expect("frame ok").into_data() { + acc.push_str(&String::from_utf8_lossy(&data)); + if acc.contains("\"daemon\"") { + break; + } + } + } + }) + .await; + assert!(deadline.is_ok(), "did not receive status within 5s; got: {acc:?}"); + assert!(acc.contains("\"daemon\""), "missing status payload: {acc}"); + assert!(acc.contains("status"), "missing SSE event name: {acc}"); +} + #[sqlx::test(migrations = "./migrations")] async fn mutating_endpoints_reject_non_admin(pool: PgPool) { let h = harness(pool); diff --git a/frontend/src/lib/api/admin.test.ts b/frontend/src/lib/api/admin.test.ts index a2fc06c..99381da 100644 --- a/frontend/src/lib/api/admin.test.ts +++ b/frontend/src/lib/api/admin.test.ts @@ -18,6 +18,7 @@ import { resyncManga, resyncChapter, getCrawlerStatus, + crawlerStatusStreamUrl, runCrawlerPass, restartCrawlerBrowser, updateCrawlerSession, @@ -356,6 +357,10 @@ describe('admin crawler api client', () => { queue: { pending: 2, running: 1, dead: 4 } }; + it('crawlerStatusStreamUrl points at the SSE endpoint under the API base', () => { + expect(crawlerStatusStreamUrl()).toMatch(/\/v1\/admin\/crawler\/stream$/); + }); + it('getCrawlerStatus GETs /v1/admin/crawler', async () => { fetchSpy.mockResolvedValueOnce(ok(statusFixture)); const s = await getCrawlerStatus(); diff --git a/frontend/src/lib/api/admin.ts b/frontend/src/lib/api/admin.ts index 09a021a..238b28e 100644 --- a/frontend/src/lib/api/admin.ts +++ b/frontend/src/lib/api/admin.ts @@ -3,7 +3,7 @@ // won't reach these routes). 403s thrown here propagate up to the // /admin layout, which renders the framework error page. -import { request, type Page } from './client'; +import { request, apiUrl, type Page } from './client'; import type { User } from './auth'; import type { MangaDetail } from './mangas'; import type { Chapter } from './chapters'; @@ -248,6 +248,14 @@ export async function getCrawlerStatus(): Promise { return request('/v1/admin/crawler'); } +/** URL of the Server-Sent Events live-status stream. Open with + * `new EventSource(...)` while the crawler page is mounted and close it on + * navigate-away so the subscription is scoped to the active page. Each + * message is a named `status` event whose `data` is a {@link CrawlerStatus}. */ +export function crawlerStatusStreamUrl(): string { + return apiUrl('/v1/admin/crawler/stream'); +} + /** POST /v1/admin/crawler/run — trigger an out-of-cycle metadata pass. */ export async function runCrawlerPass(): Promise<{ started: boolean }> { return request('/v1/admin/crawler/run', { method: 'POST' }); diff --git a/frontend/src/lib/api/client.ts b/frontend/src/lib/api/client.ts index 6b690be..c96414f 100644 --- a/frontend/src/lib/api/client.ts +++ b/frontend/src/lib/api/client.ts @@ -12,6 +12,15 @@ export function fileUrl(key: string): string { return `${BASE}/v1/files/${key}`; } +/** + * Builds an API URL for non-`fetch` consumers (e.g. `EventSource` for SSE), + * applying the same `VITE_API_BASE` prefix as `request()`. `path` is the + * route after the base, e.g. `/v1/admin/crawler/stream`. + */ +export function apiUrl(path: string): string { + return `${BASE}${path}`; +} + export class ApiError extends Error { constructor( public readonly status: number, diff --git a/frontend/src/routes/admin/crawler/+page.svelte b/frontend/src/routes/admin/crawler/+page.svelte index 5fb7c7c..ee53f48 100644 --- a/frontend/src/routes/admin/crawler/+page.svelte +++ b/frontend/src/routes/admin/crawler/+page.svelte @@ -4,6 +4,7 @@ import Pager from '$lib/components/Pager.svelte'; import { getCrawlerStatus, + crawlerStatusStreamUrl, runCrawlerPass, restartCrawlerBrowser, updateCrawlerSession, @@ -19,7 +20,8 @@ let status: CrawlerStatus | null = $state(null); let error: string | null = $state(null); let notice: string | null = $state(null); - let timer: ReturnType | null = null; + let live = $state(false); + let source: EventSource | null = null; let busy = $state(false); // Dead jobs @@ -58,13 +60,40 @@ } } + // Live updates via Server-Sent Events instead of polling. The + // EventSource is opened on mount and closed on destroy, so the + // subscription exists only while this page is showing live data. + function openStream() { + const es = new EventSource(crawlerStatusStreamUrl(), { withCredentials: true }); + es.addEventListener('status', (e) => { + try { + status = JSON.parse((e as MessageEvent).data) as CrawlerStatus; + error = null; + live = true; + } catch { + // ignore a malformed frame; the next one will replace it + } + }); + es.onopen = () => { + live = true; + }; + es.onerror = () => { + // The browser auto-reconnects; reflect the gap in the UI. + live = false; + }; + source = es; + } + onMount(() => { + // One-shot fetch for instant initial paint + resilience if SSE is + // blocked; the stream then drives subsequent updates. refresh(); loadDeadJobs(); - timer = setInterval(refresh, 5000); + openStream(); }); onDestroy(() => { - if (timer) clearInterval(timer); + source?.close(); + source = null; }); async function withBusy(label: string, fn: () => Promise) { @@ -188,7 +217,12 @@ const deadTotalPages = $derived(Math.max(1, Math.ceil(deadTotal / DEAD_LIMIT))); -

Crawler

+
+

Crawler

+ + {live ? '● live' : '○ reconnecting…'} + +
{#if error} @@ -403,7 +437,20 @@