diff --git a/backend/src/crawler/browser_manager.rs b/backend/src/crawler/browser_manager.rs index c1725e4..8bc7cd4 100644 --- a/backend/src/crawler/browser_manager.rs +++ b/backend/src/crawler/browser_manager.rs @@ -13,7 +13,7 @@ //! until [`BrowserManager::shutdown`]. use std::ops::Deref; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -71,12 +71,38 @@ impl ActiveTracker { } } +/// Lifecycle gate for a coordinated browser restart. `acquire()` parks +/// while not [`RestartPhase::Healthy`] so no new navigation starts mid- +/// restart; long-lived lease holders (the metadata pass) cooperate by +/// checking [`BrowserManager::is_restart_pending`] at safe boundaries. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub enum RestartPhase { + /// Normal operation — acquires proceed. + Healthy, + /// Restart requested; new acquires park, waiting for in-flight leases + /// to drain. + Draining, + /// Chromium is being closed + relaunched. + Restarting, +} + +const PHASE_HEALTHY: u8 = 0; +const PHASE_DRAINING: u8 = 1; +const PHASE_RESTARTING: u8 = 2; + pub struct BrowserManager { inner: Mutex, active: Arc, launch_opts: LaunchOptions, idle_timeout: Duration, on_launch: OnLaunch, + /// Coarse lifecycle phase (one of the `PHASE_*` constants). + phase: AtomicU8, + /// Woken when the phase returns to `Healthy` so parked acquires resume. + resume: Notify, + /// Serialises coordinated restarts so concurrent requests collapse into + /// a single relaunch. + restart_lock: Mutex<()>, } struct Inner { @@ -99,28 +125,71 @@ impl BrowserManager { launch_opts, idle_timeout, on_launch, + phase: AtomicU8::new(PHASE_HEALTHY), + resume: Notify::new(), + restart_lock: Mutex::new(()), }) } + /// Current restart phase. + pub fn phase(&self) -> RestartPhase { + match self.phase.load(Ordering::Acquire) { + PHASE_DRAINING => RestartPhase::Draining, + PHASE_RESTARTING => RestartPhase::Restarting, + _ => RestartPhase::Healthy, + } + } + + fn set_phase(&self, phase: RestartPhase) { + let v = match phase { + RestartPhase::Healthy => PHASE_HEALTHY, + RestartPhase::Draining => PHASE_DRAINING, + RestartPhase::Restarting => PHASE_RESTARTING, + }; + self.phase.store(v, Ordering::Release); + } + + /// Whether a coordinated restart is in progress. Long-lived lease + /// holders poll this at safe boundaries and yield their lease so the + /// drain can complete promptly. + pub fn is_restart_pending(&self) -> bool { + self.phase() != RestartPhase::Healthy + } + + /// Launch Chromium into `guard`, running the `on_launch` hook before + /// publishing the handle so a probe failure doesn't leave a half- + /// initialised browser behind. + async fn launch_into(&self, guard: &mut Inner) -> anyhow::Result<()> { + let handle = browser::launch(self.launch_opts.clone()) + .await + .context("BrowserManager: launch chromium")?; + let shared = handle.shared(); + if let Err(e) = (self.on_launch)(Arc::clone(&shared)).await { + let _ = handle.close().await; + return Err(e.context("BrowserManager: on_launch hook failed")); + } + guard.handle = Some(handle); + guard.shared = Some(shared); + Ok(()) + } + /// Acquire a shared browser lease. The first acquire after a teardown /// launches a fresh Chromium (and runs `on_launch`); subsequent acquires /// while a process is alive just bump the counter and clone the `Arc`. pub async fn acquire(&self) -> anyhow::Result { + // Park while a coordinated restart is draining/relaunching so no new + // navigation starts against a browser that's about to be torn down. + // The short sleep fallback guarantees liveness even if a `resume` + // notification is missed (classic Notify lost-wakeup). + while self.phase() != RestartPhase::Healthy { + tokio::select! { + _ = self.resume.notified() => {} + _ = tokio::time::sleep(Duration::from_millis(100)) => {} + } + } let mut guard = self.inner.lock().await; if guard.handle.is_none() { - let handle = browser::launch(self.launch_opts.clone()) - .await - .context("BrowserManager: launch chromium")?; - let shared = handle.shared(); - // Run the on-launch hook before publishing the handle so a session - // probe failure doesn't leave a half-initialized browser behind. - if let Err(e) = (self.on_launch)(Arc::clone(&shared)).await { - // Close the just-launched browser since we won't be using it. - let _ = handle.close().await; - return Err(e.context("BrowserManager: on_launch hook failed")); - } - guard.handle = Some(handle); - guard.shared = Some(shared); + self.launch_into(&mut guard).await?; } let browser = guard .shared @@ -134,6 +203,45 @@ impl BrowserManager { }) } + /// Coordinated restart: block new acquires, wait for in-flight leases + /// to drain (up to `drain_deadline`, then force), close + relaunch + /// Chromium (re-running `on_launch` → re-inject session + probe), then + /// resume parked acquirers. Concurrent calls collapse into one + /// relaunch. The phase is always returned to `Healthy` — even if the + /// relaunch errors — so a failed restart never permanently wedges + /// acquisition (the next acquire retries the launch lazily). + pub async fn coordinated_restart(&self, drain_deadline: Duration) -> anyhow::Result<()> { + // Dedup: if a restart is already running, wait for it and return. + let _restart_guard = match self.restart_lock.try_lock() { + Ok(g) => g, + Err(_) => { + let _ = self.restart_lock.lock().await; + return Ok(()); + } + }; + + self.set_phase(RestartPhase::Draining); + await_drain(&self.active, drain_deadline).await; + + self.set_phase(RestartPhase::Restarting); + let relaunch = { + let mut guard = self.inner.lock().await; + guard.shared = None; + if let Some(handle) = guard.handle.take() { + let _ = handle.close().await; + } + self.launch_into(&mut guard).await + }; + + self.set_phase(RestartPhase::Healthy); + self.resume.notify_waiters(); + match &relaunch { + Ok(()) => tracing::info!("BrowserManager: coordinated restart complete"), + Err(e) => tracing::error!(error = ?e, "BrowserManager: coordinated restart relaunch failed"), + } + relaunch.context("coordinated_restart: relaunch") + } + /// Forcefully close the cached browser regardless of active count. /// Used on daemon shutdown. After this returns the next acquire will /// re-launch from scratch. @@ -176,6 +284,29 @@ impl BrowserManager { } } +/// Wait for the active-lease count to reach zero, up to `deadline`. Wakes +/// on the tracker's idle signal and re-checks on a short poll so a missed +/// signal can't strand the drain. Returns when drained or when the +/// deadline elapses (the caller then force-restarts). Extracted as a free +/// fn so the timing logic is unit-testable without launching Chromium. +async fn await_drain(active: &Arc, deadline: Duration) { + let start = tokio::time::Instant::now(); + while active.current() > 0 { + let Some(remaining) = deadline.checked_sub(start.elapsed()) else { + tracing::warn!( + active = active.current(), + "coordinated_restart: drain deadline exceeded — forcing relaunch" + ); + return; + }; + let nap = remaining.min(Duration::from_millis(250)); + tokio::select! { + _ = active.idle_signal().notified() => {} + _ = tokio::time::sleep(nap) => {} + } + } +} + /// Background reaper. Returns immediately when `idle_timeout == 0`. /// Otherwise spawns a task that: /// 1. Waits on `idle_signal` (woken when active hits zero). @@ -270,6 +401,63 @@ mod tests { mgr.invalidate().await; } + #[tokio::test] + async fn await_drain_returns_immediately_when_already_idle() { + let active = ActiveTracker::new(); + let start = tokio::time::Instant::now(); + await_drain(&active, Duration::from_secs(5)).await; + assert!(start.elapsed() < Duration::from_millis(200), "no wait when idle"); + } + + #[tokio::test] + async fn await_drain_completes_when_lease_released() { + let active = ActiveTracker::new(); + active.acquire(); + let bg = { + let a = Arc::clone(&active); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(100)).await; + a.release(); + }) + }; + // Generous deadline; should return shortly after the release, not + // at the deadline. + let start = tokio::time::Instant::now(); + await_drain(&active, Duration::from_secs(5)).await; + assert!(start.elapsed() < Duration::from_secs(2), "drained on release"); + assert_eq!(active.current(), 0); + bg.await.unwrap(); + } + + #[tokio::test] + async fn await_drain_force_returns_after_deadline_when_stuck() { + let active = ActiveTracker::new(); + active.acquire(); // never released + let start = tokio::time::Instant::now(); + await_drain(&active, Duration::from_millis(300)).await; + let elapsed = start.elapsed(); + assert!(elapsed >= Duration::from_millis(250), "waited ~deadline: {elapsed:?}"); + assert!(elapsed < Duration::from_secs(2), "but not forever: {elapsed:?}"); + assert_eq!(active.current(), 1, "still held — caller force-restarts"); + } + + #[test] + fn phase_transitions_reflect_is_restart_pending() { + let mgr = BrowserManager::new( + crate::crawler::browser::LaunchOptions::default(), + Duration::ZERO, + noop_on_launch(), + ); + assert_eq!(mgr.phase(), RestartPhase::Healthy); + assert!(!mgr.is_restart_pending()); + mgr.set_phase(RestartPhase::Draining); + assert!(mgr.is_restart_pending()); + mgr.set_phase(RestartPhase::Restarting); + assert!(mgr.is_restart_pending()); + mgr.set_phase(RestartPhase::Healthy); + assert!(!mgr.is_restart_pending()); + } + #[tokio::test] async fn active_tracker_signals_idle_only_on_zero_transition() { let tracker = ActiveTracker::new(); diff --git a/backend/src/crawler/pipeline.rs b/backend/src/crawler/pipeline.rs index 3e17c3c..ffa37e7 100644 --- a/backend/src/crawler/pipeline.rs +++ b/backend/src/crawler/pipeline.rs @@ -192,6 +192,17 @@ pub async fn run_metadata_pass( } }; for r in batch { + // Cooperative checkpoint: if a coordinated browser restart is + // pending, yield our (long-lived) lease so the drain can + // proceed instead of stalling for the rest of the walk. The + // pass exits un-clean, so the next tick recovery-sweeps the + // tail we didn't reach. + if browser_manager.is_restart_pending() { + tracing::info!( + "metadata pass: browser restart pending — yielding (recovery sweep next tick)" + ); + break 'outer; + } if max_refs.map(|m| stats.discovered >= m).unwrap_or(false) { hit_limit = true; tracing::info!(cap = ?max_refs, "max_results reached; halting walk");