feat(crawler): coordinated browser restart gate in BrowserManager
Adds a Healthy/Draining/Restarting lifecycle gate. `acquire()` parks while a restart is in progress; `coordinated_restart(deadline)` blocks new acquires, drains in-flight leases (bounded, then forces), closes + relaunches Chromium (re-running on_launch → re-inject session + probe), then resumes parked acquirers. Concurrent restart requests collapse into one relaunch; the phase always returns to Healthy so a failed relaunch never wedges acquisition. The metadata pass cooperates via is_restart_pending() at its per-manga checkpoint, yielding its long-lived lease (recovery sweep next tick) instead of stalling the drain. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -13,7 +13,7 @@
|
||||
//! until [`BrowserManager::shutdown`].
|
||||
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -71,12 +71,38 @@ impl ActiveTracker {
|
||||
}
|
||||
}
|
||||
|
||||
/// Phases of a coordinated browser restart.
///
/// `acquire()` only hands out leases while the phase is
/// [`RestartPhase::Healthy`]; in any other phase it parks, so no new
/// navigation begins while the browser is about to be replaced. Holders of
/// long-lived leases (the metadata pass) are expected to poll
/// [`BrowserManager::is_restart_pending`] at safe boundaries and give their
/// lease back.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestartPhase {
    /// Steady state: acquires go straight through.
    Healthy,
    /// A restart has been requested. New acquires park while the leases
    /// that are already out are given time to finish.
    Draining,
    /// The old Chromium is being closed and a new one launched.
    Restarting,
}

/// Byte stored in the phase atomic for [`RestartPhase::Healthy`].
const PHASE_HEALTHY: u8 = 0;
/// Byte stored in the phase atomic for [`RestartPhase::Draining`].
const PHASE_DRAINING: u8 = 1;
/// Byte stored in the phase atomic for [`RestartPhase::Restarting`].
const PHASE_RESTARTING: u8 = 2;
|
||||
|
||||
pub struct BrowserManager {
|
||||
inner: Mutex<Inner>,
|
||||
active: Arc<ActiveTracker>,
|
||||
launch_opts: LaunchOptions,
|
||||
idle_timeout: Duration,
|
||||
on_launch: OnLaunch,
|
||||
/// Coarse lifecycle phase (one of the `PHASE_*` constants).
|
||||
phase: AtomicU8,
|
||||
/// Woken when the phase returns to `Healthy` so parked acquires resume.
|
||||
resume: Notify,
|
||||
/// Serialises coordinated restarts so concurrent requests collapse into
|
||||
/// a single relaunch.
|
||||
restart_lock: Mutex<()>,
|
||||
}
|
||||
|
||||
struct Inner {
|
||||
@@ -99,28 +125,71 @@ impl BrowserManager {
|
||||
launch_opts,
|
||||
idle_timeout,
|
||||
on_launch,
|
||||
phase: AtomicU8::new(PHASE_HEALTHY),
|
||||
resume: Notify::new(),
|
||||
restart_lock: Mutex::new(()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Current restart phase.
|
||||
pub fn phase(&self) -> RestartPhase {
|
||||
match self.phase.load(Ordering::Acquire) {
|
||||
PHASE_DRAINING => RestartPhase::Draining,
|
||||
PHASE_RESTARTING => RestartPhase::Restarting,
|
||||
_ => RestartPhase::Healthy,
|
||||
}
|
||||
}
|
||||
|
||||
fn set_phase(&self, phase: RestartPhase) {
|
||||
let v = match phase {
|
||||
RestartPhase::Healthy => PHASE_HEALTHY,
|
||||
RestartPhase::Draining => PHASE_DRAINING,
|
||||
RestartPhase::Restarting => PHASE_RESTARTING,
|
||||
};
|
||||
self.phase.store(v, Ordering::Release);
|
||||
}
|
||||
|
||||
/// Whether a coordinated restart is in progress. Long-lived lease
|
||||
/// holders poll this at safe boundaries and yield their lease so the
|
||||
/// drain can complete promptly.
|
||||
pub fn is_restart_pending(&self) -> bool {
|
||||
self.phase() != RestartPhase::Healthy
|
||||
}
|
||||
|
||||
/// Launch Chromium into `guard`, running the `on_launch` hook before
|
||||
/// publishing the handle so a probe failure doesn't leave a half-
|
||||
/// initialised browser behind.
|
||||
async fn launch_into(&self, guard: &mut Inner) -> anyhow::Result<()> {
|
||||
let handle = browser::launch(self.launch_opts.clone())
|
||||
.await
|
||||
.context("BrowserManager: launch chromium")?;
|
||||
let shared = handle.shared();
|
||||
if let Err(e) = (self.on_launch)(Arc::clone(&shared)).await {
|
||||
let _ = handle.close().await;
|
||||
return Err(e.context("BrowserManager: on_launch hook failed"));
|
||||
}
|
||||
guard.handle = Some(handle);
|
||||
guard.shared = Some(shared);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Acquire a shared browser lease. The first acquire after a teardown
|
||||
/// launches a fresh Chromium (and runs `on_launch`); subsequent acquires
|
||||
/// while a process is alive just bump the counter and clone the `Arc`.
|
||||
pub async fn acquire(&self) -> anyhow::Result<BrowserLease> {
|
||||
// Park while a coordinated restart is draining/relaunching so no new
|
||||
// navigation starts against a browser that's about to be torn down.
|
||||
// The short sleep fallback guarantees liveness even if a `resume`
|
||||
// notification is missed (classic Notify lost-wakeup).
|
||||
while self.phase() != RestartPhase::Healthy {
|
||||
tokio::select! {
|
||||
_ = self.resume.notified() => {}
|
||||
_ = tokio::time::sleep(Duration::from_millis(100)) => {}
|
||||
}
|
||||
}
|
||||
let mut guard = self.inner.lock().await;
|
||||
if guard.handle.is_none() {
|
||||
let handle = browser::launch(self.launch_opts.clone())
|
||||
.await
|
||||
.context("BrowserManager: launch chromium")?;
|
||||
let shared = handle.shared();
|
||||
// Run the on-launch hook before publishing the handle so a session
|
||||
// probe failure doesn't leave a half-initialized browser behind.
|
||||
if let Err(e) = (self.on_launch)(Arc::clone(&shared)).await {
|
||||
// Close the just-launched browser since we won't be using it.
|
||||
let _ = handle.close().await;
|
||||
return Err(e.context("BrowserManager: on_launch hook failed"));
|
||||
}
|
||||
guard.handle = Some(handle);
|
||||
guard.shared = Some(shared);
|
||||
self.launch_into(&mut guard).await?;
|
||||
}
|
||||
let browser = guard
|
||||
.shared
|
||||
@@ -134,6 +203,45 @@ impl BrowserManager {
|
||||
})
|
||||
}
|
||||
|
||||
/// Coordinated restart: block new acquires, wait for in-flight leases
|
||||
/// to drain (up to `drain_deadline`, then force), close + relaunch
|
||||
/// Chromium (re-running `on_launch` → re-inject session + probe), then
|
||||
/// resume parked acquirers. Concurrent calls collapse into one
|
||||
/// relaunch. The phase is always returned to `Healthy` — even if the
|
||||
/// relaunch errors — so a failed restart never permanently wedges
|
||||
/// acquisition (the next acquire retries the launch lazily).
|
||||
pub async fn coordinated_restart(&self, drain_deadline: Duration) -> anyhow::Result<()> {
|
||||
// Dedup: if a restart is already running, wait for it and return.
|
||||
let _restart_guard = match self.restart_lock.try_lock() {
|
||||
Ok(g) => g,
|
||||
Err(_) => {
|
||||
let _ = self.restart_lock.lock().await;
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
self.set_phase(RestartPhase::Draining);
|
||||
await_drain(&self.active, drain_deadline).await;
|
||||
|
||||
self.set_phase(RestartPhase::Restarting);
|
||||
let relaunch = {
|
||||
let mut guard = self.inner.lock().await;
|
||||
guard.shared = None;
|
||||
if let Some(handle) = guard.handle.take() {
|
||||
let _ = handle.close().await;
|
||||
}
|
||||
self.launch_into(&mut guard).await
|
||||
};
|
||||
|
||||
self.set_phase(RestartPhase::Healthy);
|
||||
self.resume.notify_waiters();
|
||||
match &relaunch {
|
||||
Ok(()) => tracing::info!("BrowserManager: coordinated restart complete"),
|
||||
Err(e) => tracing::error!(error = ?e, "BrowserManager: coordinated restart relaunch failed"),
|
||||
}
|
||||
relaunch.context("coordinated_restart: relaunch")
|
||||
}
|
||||
|
||||
/// Forcefully close the cached browser regardless of active count.
|
||||
/// Used on daemon shutdown. After this returns the next acquire will
|
||||
/// re-launch from scratch.
|
||||
@@ -176,6 +284,29 @@ impl BrowserManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the active-lease count to reach zero, up to `deadline`. Wakes
|
||||
/// on the tracker's idle signal and re-checks on a short poll so a missed
|
||||
/// signal can't strand the drain. Returns when drained or when the
|
||||
/// deadline elapses (the caller then force-restarts). Extracted as a free
|
||||
/// fn so the timing logic is unit-testable without launching Chromium.
|
||||
async fn await_drain(active: &Arc<ActiveTracker>, deadline: Duration) {
|
||||
let start = tokio::time::Instant::now();
|
||||
while active.current() > 0 {
|
||||
let Some(remaining) = deadline.checked_sub(start.elapsed()) else {
|
||||
tracing::warn!(
|
||||
active = active.current(),
|
||||
"coordinated_restart: drain deadline exceeded — forcing relaunch"
|
||||
);
|
||||
return;
|
||||
};
|
||||
let nap = remaining.min(Duration::from_millis(250));
|
||||
tokio::select! {
|
||||
_ = active.idle_signal().notified() => {}
|
||||
_ = tokio::time::sleep(nap) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Background reaper. Returns immediately when `idle_timeout == 0`.
|
||||
/// Otherwise spawns a task that:
|
||||
/// 1. Waits on `idle_signal` (woken when active hits zero).
|
||||
@@ -270,6 +401,63 @@ mod tests {
|
||||
mgr.invalidate().await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn await_drain_returns_immediately_when_already_idle() {
|
||||
let active = ActiveTracker::new();
|
||||
let start = tokio::time::Instant::now();
|
||||
await_drain(&active, Duration::from_secs(5)).await;
|
||||
assert!(start.elapsed() < Duration::from_millis(200), "no wait when idle");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn await_drain_completes_when_lease_released() {
|
||||
let active = ActiveTracker::new();
|
||||
active.acquire();
|
||||
let bg = {
|
||||
let a = Arc::clone(&active);
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
a.release();
|
||||
})
|
||||
};
|
||||
// Generous deadline; should return shortly after the release, not
|
||||
// at the deadline.
|
||||
let start = tokio::time::Instant::now();
|
||||
await_drain(&active, Duration::from_secs(5)).await;
|
||||
assert!(start.elapsed() < Duration::from_secs(2), "drained on release");
|
||||
assert_eq!(active.current(), 0);
|
||||
bg.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn await_drain_force_returns_after_deadline_when_stuck() {
|
||||
let active = ActiveTracker::new();
|
||||
active.acquire(); // never released
|
||||
let start = tokio::time::Instant::now();
|
||||
await_drain(&active, Duration::from_millis(300)).await;
|
||||
let elapsed = start.elapsed();
|
||||
assert!(elapsed >= Duration::from_millis(250), "waited ~deadline: {elapsed:?}");
|
||||
assert!(elapsed < Duration::from_secs(2), "but not forever: {elapsed:?}");
|
||||
assert_eq!(active.current(), 1, "still held — caller force-restarts");
|
||||
}
|
||||
|
||||
/// `is_restart_pending()` is true exactly while the phase is not `Healthy`.
#[test]
fn phase_transitions_reflect_is_restart_pending() {
    let mgr = BrowserManager::new(
        crate::crawler::browser::LaunchOptions::default(),
        Duration::ZERO,
        noop_on_launch(),
    );

    // A freshly built manager starts out healthy.
    assert_eq!(mgr.phase(), RestartPhase::Healthy);
    assert!(!mgr.is_restart_pending());

    // Walk the lifecycle in order and check the flag after every step.
    let steps = [
        (RestartPhase::Draining, true),
        (RestartPhase::Restarting, true),
        (RestartPhase::Healthy, false),
    ];
    for (next, expect_pending) in steps {
        mgr.set_phase(next);
        assert_eq!(mgr.is_restart_pending(), expect_pending);
    }
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn active_tracker_signals_idle_only_on_zero_transition() {
|
||||
let tracker = ActiveTracker::new();
|
||||
|
||||
@@ -192,6 +192,17 @@ pub async fn run_metadata_pass(
|
||||
}
|
||||
};
|
||||
for r in batch {
|
||||
// Cooperative checkpoint: if a coordinated browser restart is
|
||||
// pending, yield our (long-lived) lease so the drain can
|
||||
// proceed instead of stalling for the rest of the walk. The
|
||||
// pass exits un-clean, so the next tick recovery-sweeps the
|
||||
// tail we didn't reach.
|
||||
if browser_manager.is_restart_pending() {
|
||||
tracing::info!(
|
||||
"metadata pass: browser restart pending — yielding (recovery sweep next tick)"
|
||||
);
|
||||
break 'outer;
|
||||
}
|
||||
if max_refs.map(|m| stats.discovered >= m).unwrap_or(false) {
|
||||
hit_limit = true;
|
||||
tracing::info!(cap = ?max_refs, "max_results reached; halting walk");
|
||||
|
||||
Reference in New Issue
Block a user