Files
Mangalord/backend/src/crawler/browser.rs
MechaCat02 1eebb90e25
Some checks failed
deploy / test-backend (push) Failing after 6s
deploy / test-frontend (push) Failing after 40s
deploy / build-and-push (push) Has been skipped
deploy / deploy (push) Has been skipped
fix(crawler): unhang shutdown on lingering Arc<Browser>, silence WS noise (0.43.1)
- Handle::close aborts its chromiumoxide driver task when another
  Arc<Browser> outlives the call, so shutdown returns instead of
  hanging on a stream that never terminates. Generic close_or_abort
  helper with regression tests covering both Arc paths.
- daemon.shutdown() is wrapped in a 5s timeout in main as defense
  in depth.
- Default RUST_LOG silences chromiumoxide::conn / chromiumoxide::handler
  WS-deserialize ERROR spam.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-31 14:47:36 +02:00

336 lines
12 KiB
Rust

//! Chromium launcher and lifecycle.
//!
//! Uses `chromiumoxide`'s `fetcher` feature so we don't depend on a
//! system Chrome install — first call downloads a known-good revision
//! into a cache dir and reuses it forever after. `BrowserMode` toggles
//! headed vs headless; the headed path needs a display (real `$DISPLAY`
//! or `xvfb-run`).
//!
//! Extra Chromium command-line flags can be supplied through
//! [`LaunchOptions::extra_args`] in code, or via the
//! `CRAWLER_BROWSER_ARGS` env var (whitespace-separated) when going
//! through [`LaunchOptions::from_env`]. On top of those, the launcher
//! always injects `--no-sandbox` and `--disable-dev-shm-usage`, because
//! they're near-mandatory for containerized Chromium; everything else
//! is caller-provided.
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Context;
use chromiumoxide::browser::{Browser, BrowserConfig};
use chromiumoxide::error::CdpError;
use chromiumoxide::fetcher::{BrowserFetcher, BrowserFetcherOptions};
use futures_util::StreamExt;
use tokio::task::JoinHandle;
/// How the Chromium window is presented when the crawler launches it.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum BrowserMode {
    /// Launch with a visible window. This requires a display: either a
    /// real `$DISPLAY` or the binary wrapped in `xvfb-run`. Selected
    /// with `CRAWLER_BROWSER_MODE=headed`; handy for watching a flow
    /// while debugging, or for sites that fingerprint headless Chrome.
    /// Not used in production.
    Headed,
    /// Launch without a window: faster, lighter on resources, and no
    /// display needed. Both `from_env()` and `Default` choose this
    /// mode unless told otherwise.
    Headless,
}
/// Settings that describe one Chromium launch.
///
/// The two fields are public on purpose: with so few knobs a builder
/// would add ceremony without adding clarity, and struct-literal syntax
/// reads well at call sites.
#[derive(Debug, Clone)]
pub struct LaunchOptions {
    /// Whether Chromium runs with a visible window or headless.
    pub mode: BrowserMode,
    /// Additional Chromium command-line flags. They are passed after
    /// the flags the launcher adds itself, e.g.
    /// `vec!["--lang=de-DE".into(), "--window-size=1280,800".into()]`.
    pub extra_args: Vec<String>,
}
impl LaunchOptions {
pub fn headed() -> Self {
Self {
mode: BrowserMode::Headed,
extra_args: Vec::new(),
}
}
pub fn headless() -> Self {
Self {
mode: BrowserMode::Headless,
extra_args: Vec::new(),
}
}
/// Reads `CRAWLER_BROWSER_MODE` (`headless`|`headed`, default
/// `headless`) and `CRAWLER_BROWSER_ARGS` (whitespace-separated
/// Chromium flags). Flags containing whitespace aren't supported
/// through the env var — use the programmatic API for those.
pub fn from_env() -> Self {
let mode = match std::env::var("CRAWLER_BROWSER_MODE").as_deref() {
Ok("headed") => BrowserMode::Headed,
_ => BrowserMode::Headless,
};
let extra_args = std::env::var("CRAWLER_BROWSER_ARGS")
.map(|s| parse_args(&s))
.unwrap_or_default();
Self { mode, extra_args }
}
}
impl Default for LaunchOptions {
fn default() -> Self {
Self::headless()
}
}
/// Breaks a `CRAWLER_BROWSER_ARGS`-style string into individual flags,
/// splitting on any run of whitespace. Leading, trailing, and repeated
/// whitespace never produces empty entries. Kept as its own function
/// (rather than inlined into `from_env`) so it can be unit-tested
/// without touching the process environment.
pub(crate) fn parse_args(s: &str) -> Vec<String> {
    let mut flags = Vec::new();
    for token in s.split_whitespace() {
        flags.push(String::from(token));
    }
    flags
}
/// A launched browser together with the background task that drives
/// its CDP event loop.
///
/// Production paths should always finish with `close().await`; merely
/// dropping the `Handle` skips the orderly shutdown that `close`
/// performs and, per the original note here, leaks the Chromium
/// process.
/// NOTE(review): the docs on `Handle::close` say Chromium is reaped by
/// kill-on-drop once the last `Arc<Browser>` goes away — confirm
/// whether a bare drop can really leak the process.
///
/// The browser sits behind an `Arc` so worker tasks can share it (see
/// [`Handle::shared`]) without copying. `Browser::new_page` only needs
/// `&self`, so several workers can drive the same browser concurrently
/// for as long as the manager keeps the `Arc` alive.
pub struct Handle {
    /// Shared ownership of the launched browser.
    browser: Arc<Browser>,
    /// Task spawned by `launch` that polls the browser's event handler.
    driver: JoinHandle<()>,
}
impl Handle {
/// Borrow the browser. Equivalent to `&*handle.shared()`.
pub fn browser(&self) -> &Browser {
&self.browser
}
/// Clone the shared handle. Workers hold these to call `new_page`
/// concurrently. The browser only exits when the last `Arc<Browser>`
/// is dropped (kill-on-drop), or when `close()` is called on the
/// originating `Handle` while it is the sole holder.
pub fn shared(&self) -> Arc<Browser> {
Arc::clone(&self.browser)
}
/// Closes the browser and awaits the driver task. If other Arcs to
/// the browser are still alive we can't issue a clean CDP `close`,
/// so we abort the driver task instead — otherwise `handler.next()`
/// keeps polling forever and `Handle::close` hangs (chromiumoxide's
/// handler stream doesn't end on its own when the underlying WS
/// dies). Chromium itself is reaped by kill-on-drop once the last
/// `Arc<Browser>` is dropped.
pub async fn close(self) -> anyhow::Result<()> {
close_or_abort(self.browser, self.driver, |mut owned| async move {
let _ = owned.close().await;
let _ = owned.wait().await;
})
.await;
Ok(())
}
}
/// The shutdown logic behind [`Handle::close`], kept generic so it can
/// be unit-tested without launching real Chromium.
///
/// * If `arc` is the only strong reference, the inner value is handed
///   to `on_owned`, and once that future finishes the driver task is
///   awaited normally.
/// * If other holders of the `Arc` exist, `on_owned` is never called:
///   a warning is logged, this function's reference is released, and
///   the driver is aborted before being awaited so shutdown returns
///   promptly.
///
/// The driver's join result is ignored in both cases.
async fn close_or_abort<T, F, Fut>(arc: Arc<T>, driver: JoinHandle<()>, on_owned: F)
where
    T: Send + 'static,
    F: FnOnce(T) -> Fut + Send,
    Fut: std::future::Future<Output = ()> + Send,
{
    let owned = match Arc::try_unwrap(arc) {
        Ok(value) => value,
        Err(still_shared) => {
            tracing::warn!(
                strong_count = Arc::strong_count(&still_shared),
                "Handle::close while Arc still shared — aborting driver, relying on kill-on-drop"
            );
            // Release our reference first, then stop the driver; an
            // aborted task resolves promptly when awaited.
            drop(still_shared);
            driver.abort();
            let _ = driver.await;
            return;
        }
    };

    // Sole owner: run the caller's shutdown, then let the driver drain.
    on_owned(owned).await;
    let _ = driver.await;
}
/// Launches Chromium. Downloads it on first run via the `fetcher`
/// feature; subsequent runs hit the cache. The cache dir is
/// `$CRAWLER_CHROMIUM_DIR` if set, else `$HOME/.cache/mangalord/chromium`,
/// else `./.chromium-cache` as a last-resort repo-local fallback.
pub async fn launch(options: LaunchOptions) -> anyhow::Result<Handle> {
let cache = cache_dir()?;
tokio::fs::create_dir_all(&cache)
.await
.with_context(|| format!("create cache dir {}", cache.display()))?;
let fetcher = BrowserFetcher::new(
BrowserFetcherOptions::builder()
.with_path(&cache)
.build()
.map_err(|e| anyhow::anyhow!("fetcher options: {e}"))?,
);
tracing::info!(path = %cache.display(), "ensuring chromium revision is present");
let info = fetcher
.fetch()
.await
.context("download chromium via fetcher")?;
tracing::info!(executable = %info.executable_path.display(), "chromium ready");
let mut builder = BrowserConfig::builder()
.chrome_executable(info.executable_path)
// Linux containers / CI commonly lack the user namespaces
// Chromium's sandbox wants. Disable it; the crawler runs in its
// own container anyway.
.arg("--no-sandbox")
.arg("--disable-dev-shm-usage");
for arg in &options.extra_args {
builder = builder.arg(arg);
}
if matches!(options.mode, BrowserMode::Headed) {
builder = builder.with_head();
}
tracing::info!(
mode = ?options.mode,
extra_args = ?options.extra_args,
"building browser config"
);
let config = builder
.build()
.map_err(|e| anyhow::anyhow!("browser config: {e}"))?;
let (browser, mut handler) = Browser::launch(config)
.await
.context("launch chromium")?;
let driver = tokio::spawn(async move {
while let Some(event) = handler.next().await {
match event {
Ok(_) => {}
// chromiumoxide 0.7 ships fixed CDP type bindings, so any
// CDP event Chrome added later fails to deserialize. The
// connection is unaffected — these are noise. Suppress
// them so real failures stay visible.
Err(CdpError::Serde(_)) => {
tracing::trace!("chromium emitted an unrecognized CDP event");
}
Err(err) => tracing::warn!(?err, "chromium handler event error"),
}
}
});
Ok(Handle {
browser: Arc::new(browser),
driver,
})
}
/// Picks the directory the Chromium download is cached in.
///
/// Precedence:
/// 1. `$CRAWLER_CHROMIUM_DIR`, used verbatim;
/// 2. `$HOME/.cache/mangalord/chromium`;
/// 3. `./.chromium-cache`, relative to the working directory.
///
/// A variable that is set but empty is treated as unset, so an empty
/// `CRAWLER_CHROMIUM_DIR` or `HOME` no longer yields an empty or
/// accidentally relative cache path. Values are read with `var_os`, so
/// paths that are not valid Unicode are honored instead of being
/// skipped.
///
/// Currently infallible; the `Result` return type is kept for callers.
fn cache_dir() -> anyhow::Result<PathBuf> {
    // Reads an env var as an OS string, ignoring set-but-empty values.
    let non_empty = |key: &str| std::env::var_os(key).filter(|value| !value.is_empty());

    if let Some(dir) = non_empty("CRAWLER_CHROMIUM_DIR") {
        return Ok(PathBuf::from(dir));
    }
    if let Some(home) = non_empty("HOME") {
        return Ok(PathBuf::from(home).join(".cache/mangalord/chromium"));
    }
    Ok(PathBuf::from("./.chromium-cache"))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Two flags separated by a single space come back as two entries.
    #[test]
    fn parse_args_splits_on_whitespace() {
        let parsed = parse_args("--lang=de-DE --window-size=1280,800");
        assert_eq!(parsed, vec!["--lang=de-DE", "--window-size=1280,800"]);
    }

    /// Tabs, runs of spaces, and leading/trailing whitespace all
    /// collapse into plain separators.
    #[test]
    fn parse_args_tolerates_irregular_whitespace() {
        let parsed = parse_args("  --a\t--b   --c=1\n");
        assert_eq!(parsed, vec!["--a", "--b", "--c=1"]);
    }

    /// Empty and whitespace-only input yield no flags at all.
    #[test]
    fn parse_args_empty_string_yields_empty_vec() {
        for input in ["", "  \t\n"] {
            assert!(parse_args(input).is_empty());
        }
    }

    /// Headless is the production-safe default — no display required,
    /// smaller resource footprint. `Headed` stays available as an
    /// opt-in for debugging via CRAWLER_BROWSER_MODE=headed.
    #[test]
    fn default_launch_options_are_headless() {
        assert_eq!(LaunchOptions::default().mode, BrowserMode::Headless);
        assert_eq!(LaunchOptions::headless().mode, BrowserMode::Headless);
        assert_eq!(LaunchOptions::headed().mode, BrowserMode::Headed);
    }

    /// Regression: if another Arc<Browser> outlives `Handle::close`,
    /// the old code awaited the driver task forever because the
    /// chromiumoxide handler stream doesn't return None on its own.
    /// Aborting the driver unblocks shutdown even when kill-on-drop
    /// can't fire yet.
    #[tokio::test]
    async fn close_or_abort_returns_when_arc_is_shared() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::time::Duration;

        let arc = Arc::new(());
        // A second strong reference makes `try_unwrap` fail.
        let _second_owner = Arc::clone(&arc);
        // A driver that never finishes on its own.
        let driver = tokio::spawn(std::future::pending::<()>());

        let on_owned_ran = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&on_owned_ran);
        let shutdown = close_or_abort(arc, driver, move |_| async move {
            flag.store(true, Ordering::Release)
        });

        tokio::time::timeout(Duration::from_secs(2), shutdown)
            .await
            .expect("close_or_abort must not hang when driver is pending and Arc is shared");

        assert!(
            !on_owned_ran.load(Ordering::Acquire),
            "on_owned must not run when the Arc is still shared"
        );
    }

    /// With a single owner, the `on_owned` callback must be invoked.
    #[tokio::test]
    async fn close_or_abort_runs_on_owned_when_arc_is_unique() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let arc = Arc::new(());
        // A driver that completes immediately.
        let driver = tokio::spawn(async {});

        let on_owned_ran = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&on_owned_ran);
        close_or_abort(arc, driver, move |_| async move {
            flag.store(true, Ordering::Release)
        })
        .await;

        assert!(
            on_owned_ran.load(Ordering::Acquire),
            "on_owned must run when the Arc is unique"
        );
    }
}