From d37b94871eb78783ab872abed26fc79a12276f25 Mon Sep 17 00:00:00 2001 From: MechaCat02 Date: Sun, 31 May 2026 18:36:24 +0200 Subject: [PATCH] feat(crawler): TorController for control-port NEWNYM signaling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Minimal client over tokio::net::TcpStream — AUTHENTICATE then SIGNAL NEWNYM, one-shot connection. Supports cookie-file and password auth (cookie preferred when both provided); covers the multi-line `250-...\r\n250 OK` reply form so future torrc tweaks won't confuse the parser. Not yet wired into the crawler — that lands in the next commits. Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/src/crawler/mod.rs | 1 + backend/src/crawler/tor.rs | 402 +++++++++++++++++++++++++++++++++++++ 2 files changed, 403 insertions(+) create mode 100644 backend/src/crawler/tor.rs diff --git a/backend/src/crawler/mod.rs b/backend/src/crawler/mod.rs index e82e737..662b8a3 100644 --- a/backend/src/crawler/mod.rs +++ b/backend/src/crawler/mod.rs @@ -26,4 +26,5 @@ pub mod rate_limit; pub mod safety; pub mod session; pub mod source; +pub mod tor; pub mod url_utils; diff --git a/backend/src/crawler/tor.rs b/backend/src/crawler/tor.rs new file mode 100644 index 0000000..777e676 --- /dev/null +++ b/backend/src/crawler/tor.rs @@ -0,0 +1,402 @@ +//! TOR control-port client for `SIGNAL NEWNYM` ("recircuit"). +//! +//! The crawler can be proxied through TOR (`CRAWLER_PROXY=socks5h://tor:9050`) +//! to randomize the exit IP seen by the target site. When the target +//! returns a "bad page" (its broken-template body, missing layout +//! sentinel, or unauthenticated probe despite a valid PHPSESSID), it +//! is often the current exit being rate-limited or fingerprinted rather +//! than a real failure. Asking the local TOR daemon for a new identity +//! over its control port (port 9051 by default) makes subsequent +//! connections draw a fresh circuit; combined with `IsolateDestAddr` +//! in torrc this is usually enough to clear the failure. +//! +//! Scope is deliberately tiny — `AUTHENTICATE` + `SIGNAL NEWNYM` over +//! a one-shot TCP connection. No `torut` dep, no hidden-service +//! plumbing, no event streaming. +//! +//! **Caveat for in-flight connections:** Chromium reuses sockets, so a +//! `NEWNYM` only affects *new* connections (in TOR terms, new circuits). +//! That's fine for our retry path — the next navigation opens a fresh +//! connection. We do not try to forcibly close existing streams. + +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use anyhow::{anyhow, bail, Context}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tokio::time::timeout; + +/// Default control-port (`tor --defaults-torrc` ships 9051). +const DEFAULT_CONTROL_PORT: u16 = 9051; +/// Connect timeout — generous enough for a slow compose start, short +/// enough that a misconfigured controller doesn't stall a crawl. +const CONNECT_TIMEOUT: Duration = Duration::from_secs(5); +/// Per-command read timeout. `SIGNAL NEWNYM` returns instantly on the +/// happy path; bound it so a half-broken control port can't hang us. +const READ_TIMEOUT: Duration = Duration::from_secs(5); + +/// How the controller authenticates to the control port. +/// +/// `Cookie` is preferred for compose deploys where the auth cookie file +/// is shared between the `tor` and `backend` containers via a named +/// volume. `Password` is the fallback when the cookie file isn't +/// reachable (different gid, no shared volume, etc.). `None` matches a +/// torrc with no `CookieAuthentication 1` and no `HashedControlPassword` +/// — useful for local experimentation, not for production. +#[derive(Debug, Clone)] +pub enum TorAuth { + None, + Password(String), + Cookie(PathBuf), +} + +#[derive(Debug, Clone)] +pub struct TorController { + /// `host:port` string. Kept as a string (not a `SocketAddr`) so + /// docker-compose hostnames like `tor:9051` resolve at connect time. + addr: String, + auth: TorAuth, +} + +impl TorController { + pub fn new(addr: impl Into, auth: TorAuth) -> Self { + Self { addr: addr.into(), auth } + } + + /// Build a controller from the env-config shape: + /// `url` (e.g. `tcp://tor:9051`, `127.0.0.1:9051`, or `tor`), + /// optional password, optional cookie path. Returns `Ok(None)` when + /// `url` is absent — that's the "TOR feature disabled" signal. + /// Cookie wins over password when both are set (rotates with TOR; + /// no secret to manage). + pub fn from_parts( + url: Option<&str>, + password: Option<&str>, + cookie_path: Option<&Path>, + ) -> anyhow::Result> { + let Some(url) = url else { return Ok(None) }; + let addr = parse_control_url(url)?; + let auth = match (cookie_path, password) { + (Some(p), _) => TorAuth::Cookie(p.to_path_buf()), + (None, Some(p)) => TorAuth::Password(p.to_string()), + (None, None) => TorAuth::None, + }; + Ok(Some(Self { addr, auth })) + } + + /// Open the control port, `AUTHENTICATE`, `SIGNAL NEWNYM`, `QUIT`. + /// Each invocation is a fresh connection; the controller is cheap + /// to clone and stateless across calls. + pub async fn new_identity(&self) -> anyhow::Result<()> { + let stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&self.addr)) + .await + .with_context(|| { + format!("timed out connecting to TOR control port {}", self.addr) + })? + .with_context(|| format!("connect to TOR control port {}", self.addr))?; + let (read, mut write) = stream.into_split(); + let mut read = BufReader::new(read); + + let auth_line = self.build_auth_line().await?; + write_line(&mut write, &auth_line).await?; + timeout(READ_TIMEOUT, expect_250(&mut read)) + .await + .map_err(|_| anyhow!("TOR control AUTHENTICATE timed out"))? + .context("AUTHENTICATE")?; + + write_line(&mut write, "SIGNAL NEWNYM").await?; + timeout(READ_TIMEOUT, expect_250(&mut read)) + .await + .map_err(|_| anyhow!("TOR control SIGNAL NEWNYM timed out"))? + .context("SIGNAL NEWNYM")?; + + // QUIT is courtesy; ignore errors — the daemon may close the + // socket before our QUIT lands and that's perfectly fine. + let _ = write_line(&mut write, "QUIT").await; + tracing::info!(addr = %self.addr, "TOR NEWNYM signaled"); + Ok(()) + } + + async fn build_auth_line(&self) -> anyhow::Result { + match &self.auth { + TorAuth::None => Ok("AUTHENTICATE".to_string()), + TorAuth::Password(p) => Ok(format!("AUTHENTICATE \"{}\"", escape_quoted(p))), + TorAuth::Cookie(path) => { + let bytes = tokio::fs::read(path) + .await + .with_context(|| format!("read TOR cookie file {}", path.display()))?; + Ok(format!("AUTHENTICATE {}", hex_encode(&bytes))) + } + } + } +} + +/// Parse `tcp://host:port`, `host:port`, or bare `host` into a +/// connect-time string. Default port is [`DEFAULT_CONTROL_PORT`]. +fn parse_control_url(url: &str) -> anyhow::Result { + let stripped = url.strip_prefix("tcp://").unwrap_or(url); + if stripped.is_empty() { + bail!("TOR control url is empty"); + } + if stripped.contains(':') { + Ok(stripped.to_string()) + } else { + Ok(format!("{stripped}:{DEFAULT_CONTROL_PORT}")) + } +} + +fn escape_quoted(s: &str) -> String { + s.replace('\\', r"\\").replace('"', r#"\""#) +} + +fn hex_encode(bytes: &[u8]) -> String { + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + s.push_str(&format!("{b:02x}")); + } + s +} + +async fn write_line( + w: &mut W, + line: &str, +) -> anyhow::Result<()> { + w.write_all(line.as_bytes()).await?; + w.write_all(b"\r\n").await?; + w.flush().await?; + Ok(()) +} + +/// Drain a TOR control reply, accepting only status `250`. Handles +/// the protocol's three line forms: `XYZ ...` (single/end), `XYZ-...` +/// (continuation), `XYZ+...` (data block ended by a lone `.`). Our +/// commands only ever produce single-line `250 OK`, but we honor the +/// continuation forms so a future torrc that adds events / banners +/// doesn't confuse the parser. +async fn expect_250(r: &mut R) -> anyhow::Result<()> { + loop { + let mut line = String::new(); + let n = r.read_line(&mut line).await?; + if n == 0 { + bail!("TOR control port closed connection mid-reply"); + } + let trimmed = line.trim_end_matches(['\r', '\n']); + if trimmed.len() < 4 { + bail!("malformed TOR control reply: {trimmed:?}"); + } + let (code, rest) = trimmed.split_at(3); + if code != "250" { + bail!("TOR control replied {trimmed:?}"); + } + let sep = rest.as_bytes()[0]; + match sep { + b' ' => return Ok(()), + b'-' => continue, + b'+' => { + // Data block — read until a line consisting of only ".". + loop { + let mut data = String::new(); + let n = r.read_line(&mut data).await?; + if n == 0 { + bail!("TOR control port closed mid-data-block"); + } + if data.trim_end_matches(['\r', '\n']) == "." { + break; + } + } + } + _ => bail!("malformed TOR control reply separator: {trimmed:?}"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + use tokio::io::AsyncWriteExt; + use tokio::net::TcpListener; + + /// Spawn a mock control port that responds to each \r\n-terminated + /// inbound line with the next entry from `replies`. Each reply has + /// its own `\r\n` appended. Records received lines into `recorder`. + /// After `replies.len()` exchanges the task drops the socket — this + /// matches the real TOR behavior for QUIT (close after acking). + async fn spawn_mock( + replies: Vec<&'static str>, + recorder: Arc>>, + ) -> String { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap().to_string(); + tokio::spawn(async move { + let (sock, _) = listener.accept().await.unwrap(); + let (r, mut w) = sock.into_split(); + let mut r = BufReader::new(r); + for reply in replies { + let mut line = String::new(); + let n = r.read_line(&mut line).await.unwrap_or(0); + if n == 0 { + return; + } + recorder + .lock() + .unwrap() + .push(line.trim_end_matches(['\r', '\n']).to_string()); + w.write_all(reply.as_bytes()).await.unwrap(); + w.write_all(b"\r\n").await.unwrap(); + w.flush().await.unwrap(); + } + }); + addr + } + + #[tokio::test] + async fn password_auth_then_newnym_writes_expected_sequence() { + let recorder = Arc::new(Mutex::new(Vec::new())); + // Two replies: AUTHENTICATE then SIGNAL NEWNYM. QUIT is + // fire-and-forget; the mock dropping the socket is the + // expected real-world behavior. + let addr = + spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await; + let controller = TorController::new(addr, TorAuth::Password("secret".into())); + controller.new_identity().await.expect("new_identity ok"); + let recorded = recorder.lock().unwrap().clone(); + assert_eq!(recorded.first().map(String::as_str), Some("AUTHENTICATE \"secret\"")); + assert_eq!(recorded.get(1).map(String::as_str), Some("SIGNAL NEWNYM")); + } + + #[tokio::test] + async fn cookie_auth_hex_encodes_file_bytes() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let cookie: Vec = (0u8..32).collect(); + std::fs::write(tmp.path(), &cookie).unwrap(); + let recorder = Arc::new(Mutex::new(Vec::new())); + let addr = + spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await; + let controller = + TorController::new(addr, TorAuth::Cookie(tmp.path().to_path_buf())); + controller.new_identity().await.expect("new_identity ok"); + let recorded = recorder.lock().unwrap().clone(); + let expected_hex: String = cookie.iter().map(|b| format!("{b:02x}")).collect(); + assert_eq!( + recorded.first().map(String::as_str), + Some(format!("AUTHENTICATE {expected_hex}").as_str()) + ); + } + + #[tokio::test] + async fn no_auth_sends_bare_authenticate() { + let recorder = Arc::new(Mutex::new(Vec::new())); + let addr = + spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await; + let controller = TorController::new(addr, TorAuth::None); + controller.new_identity().await.expect("new_identity ok"); + let recorded = recorder.lock().unwrap().clone(); + assert_eq!(recorded.first().map(String::as_str), Some("AUTHENTICATE")); + } + + #[tokio::test] + async fn non_250_reply_returns_err_with_reply_text() { + let recorder = Arc::new(Mutex::new(Vec::new())); + let addr = spawn_mock( + vec!["515 Bad authentication"], + Arc::clone(&recorder), + ) + .await; + let controller = + TorController::new(addr, TorAuth::Password("wrong".into())); + let err = controller.new_identity().await.expect_err("should fail"); + let msg = format!("{err:#}"); + assert!(msg.contains("515"), "expected 515 in error, got: {msg}"); + } + + #[tokio::test] + async fn closed_connection_mid_reply_is_an_error() { + // Listener accepts and immediately drops without replying. + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap().to_string(); + tokio::spawn(async move { + let _ = listener.accept().await; + // Drop the accepted socket immediately. + }); + let controller = TorController::new(addr, TorAuth::None); + let err = controller.new_identity().await.expect_err("should fail"); + let msg = format!("{err:#}"); + assert!( + msg.contains("closed connection") || msg.contains("AUTHENTICATE"), + "expected close-or-AUTH error, got: {msg}" + ); + } + + #[tokio::test] + async fn multi_line_250_continuation_is_accepted() { + let recorder = Arc::new(Mutex::new(Vec::new())); + // AUTHENTICATE reply uses the `250-...\r\n250 OK\r\n` form. + // Single reply string contains the whole multi-line response. + let addr = spawn_mock( + vec!["250-banner=foo\r\n250 OK", "250 OK"], + Arc::clone(&recorder), + ) + .await; + let controller = TorController::new(addr, TorAuth::None); + controller.new_identity().await.expect("new_identity ok"); + } + + #[test] + fn from_parts_returns_none_when_url_unset() { + let c = TorController::from_parts(None, None, None).unwrap(); + assert!(c.is_none()); + } + + #[test] + fn from_parts_prefers_cookie_over_password() { + let c = TorController::from_parts( + Some("tor:9051"), + Some("pw"), + Some(Path::new("/var/lib/tor/control_auth_cookie")), + ) + .unwrap() + .expect("controller built"); + assert!(matches!(c.auth, TorAuth::Cookie(_))); + } + + #[test] + fn from_parts_falls_back_to_password_without_cookie() { + let c = TorController::from_parts(Some("tor:9051"), Some("pw"), None) + .unwrap() + .expect("controller built"); + assert!(matches!(c.auth, TorAuth::Password(p) if p == "pw")); + } + + #[test] + fn parse_control_url_accepts_tcp_scheme() { + assert_eq!(parse_control_url("tcp://127.0.0.1:9051").unwrap(), "127.0.0.1:9051"); + } + + #[test] + fn parse_control_url_defaults_port_when_omitted() { + assert_eq!(parse_control_url("tor").unwrap(), "tor:9051"); + } + + #[test] + fn parse_control_url_passes_through_host_port() { + assert_eq!(parse_control_url("tor:9999").unwrap(), "tor:9999"); + } + + #[test] + fn parse_control_url_rejects_empty() { + assert!(parse_control_url("").is_err()); + assert!(parse_control_url("tcp://").is_err()); + } + + #[test] + fn escape_quoted_handles_quotes_and_backslashes() { + assert_eq!(escape_quoted(r#"a"b\c"#), r#"a\"b\\c"#); + } + + #[test] + fn hex_encode_zero_pads_low_bytes() { + assert_eq!(hex_encode(&[0x00, 0x0f, 0xff]), "000fff"); + } +}