feat(crawler): TorController for control-port NEWNYM signaling
Minimal client over tokio::net::TcpStream — AUTHENTICATE then SIGNAL NEWNYM, one-shot connection. Supports cookie-file and password auth (cookie preferred when both provided); covers the multi-line `250-...\r\n250 OK` reply form so future torrc tweaks won't confuse the parser. Not yet wired into the crawler — that lands in the next commits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
402
backend/src/crawler/tor.rs
Normal file
402
backend/src/crawler/tor.rs
Normal file
@@ -0,0 +1,402 @@
|
||||
//! TOR control-port client for `SIGNAL NEWNYM` ("recircuit").
|
||||
//!
|
||||
//! The crawler can be proxied through TOR (`CRAWLER_PROXY=socks5h://tor:9050`)
|
||||
//! to randomize the exit IP seen by the target site. When the target
|
||||
//! returns a "bad page" (its broken-template body, missing layout
|
||||
//! sentinel, or unauthenticated probe despite a valid PHPSESSID), it
|
||||
//! is often the current exit being rate-limited or fingerprinted rather
|
||||
//! than a real failure. Asking the local TOR daemon for a new identity
|
||||
//! over its control port (port 9051 by default) makes subsequent
|
||||
//! connections draw a fresh circuit; combined with `IsolateDestAddr`
|
||||
//! in torrc this is usually enough to clear the failure.
|
||||
//!
|
||||
//! Scope is deliberately tiny — `AUTHENTICATE` + `SIGNAL NEWNYM` over
|
||||
//! a one-shot TCP connection. No `torut` dep, no hidden-service
|
||||
//! plumbing, no event streaming.
|
||||
//!
|
||||
//! **Caveat for in-flight connections:** Chromium reuses sockets, so a
|
||||
//! `NEWNYM` only affects *new* connections (in TOR terms, new circuits).
|
||||
//! That's fine for our retry path — the next navigation opens a fresh
|
||||
//! connection. We do not try to forcibly close existing streams.
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::timeout;
|
||||
|
||||
/// Default control-port (`tor --defaults-torrc` ships 9051).
|
||||
const DEFAULT_CONTROL_PORT: u16 = 9051;
|
||||
/// Connect timeout — generous enough for a slow compose start, short
|
||||
/// enough that a misconfigured controller doesn't stall a crawl.
|
||||
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
/// Per-command read timeout. `SIGNAL NEWNYM` returns instantly on the
|
||||
/// happy path; bound it so a half-broken control port can't hang us.
|
||||
const READ_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
/// How the controller authenticates to the control port.
|
||||
///
|
||||
/// `Cookie` is preferred for compose deploys where the auth cookie file
|
||||
/// is shared between the `tor` and `backend` containers via a named
|
||||
/// volume. `Password` is the fallback when the cookie file isn't
|
||||
/// reachable (different gid, no shared volume, etc.). `None` matches a
|
||||
/// torrc with no `CookieAuthentication 1` and no `HashedControlPassword`
|
||||
/// — useful for local experimentation, not for production.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum TorAuth {
|
||||
None,
|
||||
Password(String),
|
||||
Cookie(PathBuf),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TorController {
|
||||
/// `host:port` string. Kept as a string (not a `SocketAddr`) so
|
||||
/// docker-compose hostnames like `tor:9051` resolve at connect time.
|
||||
addr: String,
|
||||
auth: TorAuth,
|
||||
}
|
||||
|
||||
impl TorController {
|
||||
pub fn new(addr: impl Into<String>, auth: TorAuth) -> Self {
|
||||
Self { addr: addr.into(), auth }
|
||||
}
|
||||
|
||||
/// Build a controller from the env-config shape:
|
||||
/// `url` (e.g. `tcp://tor:9051`, `127.0.0.1:9051`, or `tor`),
|
||||
/// optional password, optional cookie path. Returns `Ok(None)` when
|
||||
/// `url` is absent — that's the "TOR feature disabled" signal.
|
||||
/// Cookie wins over password when both are set (rotates with TOR;
|
||||
/// no secret to manage).
|
||||
pub fn from_parts(
|
||||
url: Option<&str>,
|
||||
password: Option<&str>,
|
||||
cookie_path: Option<&Path>,
|
||||
) -> anyhow::Result<Option<Self>> {
|
||||
let Some(url) = url else { return Ok(None) };
|
||||
let addr = parse_control_url(url)?;
|
||||
let auth = match (cookie_path, password) {
|
||||
(Some(p), _) => TorAuth::Cookie(p.to_path_buf()),
|
||||
(None, Some(p)) => TorAuth::Password(p.to_string()),
|
||||
(None, None) => TorAuth::None,
|
||||
};
|
||||
Ok(Some(Self { addr, auth }))
|
||||
}
|
||||
|
||||
/// Open the control port, `AUTHENTICATE`, `SIGNAL NEWNYM`, `QUIT`.
|
||||
/// Each invocation is a fresh connection; the controller is cheap
|
||||
/// to clone and stateless across calls.
|
||||
pub async fn new_identity(&self) -> anyhow::Result<()> {
|
||||
let stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&self.addr))
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("timed out connecting to TOR control port {}", self.addr)
|
||||
})?
|
||||
.with_context(|| format!("connect to TOR control port {}", self.addr))?;
|
||||
let (read, mut write) = stream.into_split();
|
||||
let mut read = BufReader::new(read);
|
||||
|
||||
let auth_line = self.build_auth_line().await?;
|
||||
write_line(&mut write, &auth_line).await?;
|
||||
timeout(READ_TIMEOUT, expect_250(&mut read))
|
||||
.await
|
||||
.map_err(|_| anyhow!("TOR control AUTHENTICATE timed out"))?
|
||||
.context("AUTHENTICATE")?;
|
||||
|
||||
write_line(&mut write, "SIGNAL NEWNYM").await?;
|
||||
timeout(READ_TIMEOUT, expect_250(&mut read))
|
||||
.await
|
||||
.map_err(|_| anyhow!("TOR control SIGNAL NEWNYM timed out"))?
|
||||
.context("SIGNAL NEWNYM")?;
|
||||
|
||||
// QUIT is courtesy; ignore errors — the daemon may close the
|
||||
// socket before our QUIT lands and that's perfectly fine.
|
||||
let _ = write_line(&mut write, "QUIT").await;
|
||||
tracing::info!(addr = %self.addr, "TOR NEWNYM signaled");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn build_auth_line(&self) -> anyhow::Result<String> {
|
||||
match &self.auth {
|
||||
TorAuth::None => Ok("AUTHENTICATE".to_string()),
|
||||
TorAuth::Password(p) => Ok(format!("AUTHENTICATE \"{}\"", escape_quoted(p))),
|
||||
TorAuth::Cookie(path) => {
|
||||
let bytes = tokio::fs::read(path)
|
||||
.await
|
||||
.with_context(|| format!("read TOR cookie file {}", path.display()))?;
|
||||
Ok(format!("AUTHENTICATE {}", hex_encode(&bytes)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse `tcp://host:port`, `host:port`, or bare `host` into a
|
||||
/// connect-time string. Default port is [`DEFAULT_CONTROL_PORT`].
|
||||
fn parse_control_url(url: &str) -> anyhow::Result<String> {
|
||||
let stripped = url.strip_prefix("tcp://").unwrap_or(url);
|
||||
if stripped.is_empty() {
|
||||
bail!("TOR control url is empty");
|
||||
}
|
||||
if stripped.contains(':') {
|
||||
Ok(stripped.to_string())
|
||||
} else {
|
||||
Ok(format!("{stripped}:{DEFAULT_CONTROL_PORT}"))
|
||||
}
|
||||
}
|
||||
|
||||
fn escape_quoted(s: &str) -> String {
|
||||
s.replace('\\', r"\\").replace('"', r#"\""#)
|
||||
}
|
||||
|
||||
fn hex_encode(bytes: &[u8]) -> String {
|
||||
let mut s = String::with_capacity(bytes.len() * 2);
|
||||
for b in bytes {
|
||||
s.push_str(&format!("{b:02x}"));
|
||||
}
|
||||
s
|
||||
}
|
||||
|
||||
async fn write_line<W: tokio::io::AsyncWrite + Unpin>(
|
||||
w: &mut W,
|
||||
line: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
w.write_all(line.as_bytes()).await?;
|
||||
w.write_all(b"\r\n").await?;
|
||||
w.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Drain a TOR control reply, accepting only status `250`. Handles
|
||||
/// the protocol's three line forms: `XYZ ...` (single/end), `XYZ-...`
|
||||
/// (continuation), `XYZ+...` (data block ended by a lone `.`). Our
|
||||
/// commands only ever produce single-line `250 OK`, but we honor the
|
||||
/// continuation forms so a future torrc that adds events / banners
|
||||
/// doesn't confuse the parser.
|
||||
async fn expect_250<R: AsyncBufReadExt + Unpin>(r: &mut R) -> anyhow::Result<()> {
|
||||
loop {
|
||||
let mut line = String::new();
|
||||
let n = r.read_line(&mut line).await?;
|
||||
if n == 0 {
|
||||
bail!("TOR control port closed connection mid-reply");
|
||||
}
|
||||
let trimmed = line.trim_end_matches(['\r', '\n']);
|
||||
if trimmed.len() < 4 {
|
||||
bail!("malformed TOR control reply: {trimmed:?}");
|
||||
}
|
||||
let (code, rest) = trimmed.split_at(3);
|
||||
if code != "250" {
|
||||
bail!("TOR control replied {trimmed:?}");
|
||||
}
|
||||
let sep = rest.as_bytes()[0];
|
||||
match sep {
|
||||
b' ' => return Ok(()),
|
||||
b'-' => continue,
|
||||
b'+' => {
|
||||
// Data block — read until a line consisting of only ".".
|
||||
loop {
|
||||
let mut data = String::new();
|
||||
let n = r.read_line(&mut data).await?;
|
||||
if n == 0 {
|
||||
bail!("TOR control port closed mid-data-block");
|
||||
}
|
||||
if data.trim_end_matches(['\r', '\n']) == "." {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => bail!("malformed TOR control reply separator: {trimmed:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
/// Spawn a mock control port that responds to each \r\n-terminated
|
||||
/// inbound line with the next entry from `replies`. Each reply has
|
||||
/// its own `\r\n` appended. Records received lines into `recorder`.
|
||||
/// After `replies.len()` exchanges the task drops the socket — this
|
||||
/// matches the real TOR behavior for QUIT (close after acking).
|
||||
async fn spawn_mock(
|
||||
replies: Vec<&'static str>,
|
||||
recorder: Arc<Mutex<Vec<String>>>,
|
||||
) -> String {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap().to_string();
|
||||
tokio::spawn(async move {
|
||||
let (sock, _) = listener.accept().await.unwrap();
|
||||
let (r, mut w) = sock.into_split();
|
||||
let mut r = BufReader::new(r);
|
||||
for reply in replies {
|
||||
let mut line = String::new();
|
||||
let n = r.read_line(&mut line).await.unwrap_or(0);
|
||||
if n == 0 {
|
||||
return;
|
||||
}
|
||||
recorder
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push(line.trim_end_matches(['\r', '\n']).to_string());
|
||||
w.write_all(reply.as_bytes()).await.unwrap();
|
||||
w.write_all(b"\r\n").await.unwrap();
|
||||
w.flush().await.unwrap();
|
||||
}
|
||||
});
|
||||
addr
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn password_auth_then_newnym_writes_expected_sequence() {
|
||||
let recorder = Arc::new(Mutex::new(Vec::new()));
|
||||
// Two replies: AUTHENTICATE then SIGNAL NEWNYM. QUIT is
|
||||
// fire-and-forget; the mock dropping the socket is the
|
||||
// expected real-world behavior.
|
||||
let addr =
|
||||
spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await;
|
||||
let controller = TorController::new(addr, TorAuth::Password("secret".into()));
|
||||
controller.new_identity().await.expect("new_identity ok");
|
||||
let recorded = recorder.lock().unwrap().clone();
|
||||
assert_eq!(recorded.first().map(String::as_str), Some("AUTHENTICATE \"secret\""));
|
||||
assert_eq!(recorded.get(1).map(String::as_str), Some("SIGNAL NEWNYM"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cookie_auth_hex_encodes_file_bytes() {
|
||||
let tmp = tempfile::NamedTempFile::new().unwrap();
|
||||
let cookie: Vec<u8> = (0u8..32).collect();
|
||||
std::fs::write(tmp.path(), &cookie).unwrap();
|
||||
let recorder = Arc::new(Mutex::new(Vec::new()));
|
||||
let addr =
|
||||
spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await;
|
||||
let controller =
|
||||
TorController::new(addr, TorAuth::Cookie(tmp.path().to_path_buf()));
|
||||
controller.new_identity().await.expect("new_identity ok");
|
||||
let recorded = recorder.lock().unwrap().clone();
|
||||
let expected_hex: String = cookie.iter().map(|b| format!("{b:02x}")).collect();
|
||||
assert_eq!(
|
||||
recorded.first().map(String::as_str),
|
||||
Some(format!("AUTHENTICATE {expected_hex}").as_str())
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn no_auth_sends_bare_authenticate() {
|
||||
let recorder = Arc::new(Mutex::new(Vec::new()));
|
||||
let addr =
|
||||
spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await;
|
||||
let controller = TorController::new(addr, TorAuth::None);
|
||||
controller.new_identity().await.expect("new_identity ok");
|
||||
let recorded = recorder.lock().unwrap().clone();
|
||||
assert_eq!(recorded.first().map(String::as_str), Some("AUTHENTICATE"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn non_250_reply_returns_err_with_reply_text() {
|
||||
let recorder = Arc::new(Mutex::new(Vec::new()));
|
||||
let addr = spawn_mock(
|
||||
vec!["515 Bad authentication"],
|
||||
Arc::clone(&recorder),
|
||||
)
|
||||
.await;
|
||||
let controller =
|
||||
TorController::new(addr, TorAuth::Password("wrong".into()));
|
||||
let err = controller.new_identity().await.expect_err("should fail");
|
||||
let msg = format!("{err:#}");
|
||||
assert!(msg.contains("515"), "expected 515 in error, got: {msg}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn closed_connection_mid_reply_is_an_error() {
|
||||
// Listener accepts and immediately drops without replying.
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap().to_string();
|
||||
tokio::spawn(async move {
|
||||
let _ = listener.accept().await;
|
||||
// Drop the accepted socket immediately.
|
||||
});
|
||||
let controller = TorController::new(addr, TorAuth::None);
|
||||
let err = controller.new_identity().await.expect_err("should fail");
|
||||
let msg = format!("{err:#}");
|
||||
assert!(
|
||||
msg.contains("closed connection") || msg.contains("AUTHENTICATE"),
|
||||
"expected close-or-AUTH error, got: {msg}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multi_line_250_continuation_is_accepted() {
|
||||
let recorder = Arc::new(Mutex::new(Vec::new()));
|
||||
// AUTHENTICATE reply uses the `250-...\r\n250 OK\r\n` form.
|
||||
// Single reply string contains the whole multi-line response.
|
||||
let addr = spawn_mock(
|
||||
vec!["250-banner=foo\r\n250 OK", "250 OK"],
|
||||
Arc::clone(&recorder),
|
||||
)
|
||||
.await;
|
||||
let controller = TorController::new(addr, TorAuth::None);
|
||||
controller.new_identity().await.expect("new_identity ok");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_parts_returns_none_when_url_unset() {
|
||||
let c = TorController::from_parts(None, None, None).unwrap();
|
||||
assert!(c.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_parts_prefers_cookie_over_password() {
|
||||
let c = TorController::from_parts(
|
||||
Some("tor:9051"),
|
||||
Some("pw"),
|
||||
Some(Path::new("/var/lib/tor/control_auth_cookie")),
|
||||
)
|
||||
.unwrap()
|
||||
.expect("controller built");
|
||||
assert!(matches!(c.auth, TorAuth::Cookie(_)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_parts_falls_back_to_password_without_cookie() {
|
||||
let c = TorController::from_parts(Some("tor:9051"), Some("pw"), None)
|
||||
.unwrap()
|
||||
.expect("controller built");
|
||||
assert!(matches!(c.auth, TorAuth::Password(p) if p == "pw"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_control_url_accepts_tcp_scheme() {
|
||||
assert_eq!(parse_control_url("tcp://127.0.0.1:9051").unwrap(), "127.0.0.1:9051");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_control_url_defaults_port_when_omitted() {
|
||||
assert_eq!(parse_control_url("tor").unwrap(), "tor:9051");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_control_url_passes_through_host_port() {
|
||||
assert_eq!(parse_control_url("tor:9999").unwrap(), "tor:9999");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_control_url_rejects_empty() {
|
||||
assert!(parse_control_url("").is_err());
|
||||
assert!(parse_control_url("tcp://").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn escape_quoted_handles_quotes_and_backslashes() {
|
||||
assert_eq!(escape_quoted(r#"a"b\c"#), r#"a\"b\\c"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hex_encode_zero_pads_low_bytes() {
|
||||
assert_eq!(hex_encode(&[0x00, 0x0f, 0xff]), "000fff");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user