Compare commits

..

12 Commits

Author SHA1 Message Date
MechaCat02
e3cff9d874 fix(deploy): pivot tor service to password auth + wrapper entrypoint
Some checks failed
deploy / test-backend (push) Successful in 20m29s
deploy / test-frontend (push) Successful in 9m42s
deploy / deploy (push) Has been cancelled
deploy / build-and-push (push) Has been cancelled
Dockurr/tor's stock entrypoint binds the control port to localhost
(unreachable from a sibling container), refuses to run as a
non-default user (its setup chowns dirs and su-execs down to its
`tor` user, both requiring root), and skips its own
HashedControlPassword injection whenever the user's torrc declares
a ControlPort. The combination meant the original cookie-via-shared-
volume design couldn't work without fighting the image.

This commit:

- Adds tor/entrypoint.sh, a small wrapper that hashes $PASSWORD
  with `tor --hash-password`, appends the hash to a writable copy
  of /etc/tor/torrc, then execs tor. Container runs as root only
  for that bring-up; the torrc's `User tor` directive drops privs
  after port binding.
- Adds a healthcheck on the tor service that gates downstream
  containers on both 9050 + 9051 actually listening (was
  service_started, which fires before tor finishes bootstrap).
- Loosens MaxCircuitDirtiness 60 → 600. The 60s value would have
  rotated mid-chapter for any chapter with > ~50 images, which is
  exactly the kind of fingerprint we're trying to avoid.
- Wires TOR_CONTROL_PASSWORD as a REQUIRED .env var on both sides
  (PASSWORD on tor, CRAWLER_TOR_CONTROL_PASSWORD on backend).
  docker-compose.yml fails fast if unset.
- Removes the tor-data shared volume on backend (cookie auth is no
  longer the default; operators wanting cookie can mount it back).
- Documents the pivot + the cookie-vs-password tradeoff in
  .env.example.

End-to-end validated: `docker compose up -d tor`, then
`printf 'AUTHENTICATE "test"\r\nSIGNAL NEWNYM\r\nQUIT\r\n' | nc tor 9051`
returns three `250 OK` lines.

Audit ref: #2, #3, #6.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-31 20:25:54 +02:00
MechaCat02
d47e832613 fix(crawler): redact TorAuth::Password in Debug, drop NEWNYM info→debug
The startup log line in app.rs and bin/crawler.rs `?t`-debug-formats
the TorController, which through the derived Debug on TorAuth would
expand TorAuth::Password(p) and leak the plaintext password to logs.
Implement Debug manually on TorAuth — None / Password(<redacted>) /
Cookie(<path>) — and lock the redaction with a regression test.

Drop the per-NEWNYM success log from info to debug: a busy crawl
rotates circuits many times per minute. Failed NEWNYMs already log
at warn — those stay loud.

Tightens the closed_connection_mid_reply_is_an_error assertion which
was tautological (`closed connection` OR `AUTHENTICATE`) by driving
the mock to read the AUTH line then drop, exercising only the
EOF-mid-reply path.

Audit ref: #7, #9, nit on tautological test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-31 20:25:36 +02:00
MechaCat02
c30c7a546f fix(crawler): unify recircuit budget semantics — N = total attempts
The three retry-with-recircuit sites disagreed: detect.rs's
retry_on_transient_with_hook used "N = total attempts" (3 → 3
fetches), but session.rs's unauth branch and content.rs's chapter
loop used "N = recircuits" (3 → 4 fetches). At the same wall-clock
"max=3", different sites hit the upstream a different number of times.

Unify on N = total attempts (matching the existing
retry_on_transient convention). The CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS
env var now means exactly what its name suggests. Disabling the
recircuit feature collapses to max_attempts=1 (single attempt, no
retry) — bit-for-bit pre-TOR behavior preserved.

Adds a debug_assert!(max >= 1) on both helpers and a new
content.rs test exercising the mixed Transient → Unauth → Ok
sequence to lock in the shared-counter invariant.

Audit ref: #5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-31 20:25:25 +02:00
MechaCat02
a0db7beb81 chore: bump to 0.46.0 for TOR proxy + recircuit feature
CRAWLER_TOR_CONTROL_URL, _PASSWORD, _COOKIE_PATH,
_RECIRCUIT_MAX_ATTEMPTS are new feature env vars; treat per CLAUDE.md
as a minor bump (feat:).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-31 20:01:57 +02:00
MechaCat02
ecbbebafc4 feat(deploy): dockurr/tor service + torrc; wire crawler to use it by default
Adds a `tor` service to the compose stack (dockurr/tor) with a torrc
tuned for the crawler — SOCKS5 on 9050 with IsolateDestAddr +
IsolateDestPort so NEWNYM picks up promptly, control port on 9051
with cookie auth, MaxCircuitDirtiness 60.

Backend defaults CRAWLER_PROXY → socks5h://tor:9050 and
CRAWLER_TOR_CONTROL_URL → tcp://tor:9051 so TOR + recircuit are on
out-of-the-box. Operators can override both to empty in .env to opt
out without removing the service.

The tor-data named volume is mounted ro on the backend so it can read
/var/lib/tor/control_auth_cookie; CookieAuthFileGroupReadable handles
the permissions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-31 20:01:04 +02:00
MechaCat02
8c6378b877 feat(crawler): recircuit TOR on transient pages and unauthenticated probes
- target.rs swaps retry_on_transient → retry_on_transient_with_hook,
  signaling NEWNYM via ctx.tor between attempts when configured.
- session.rs gains verify_session_with_recircuit; the bare
  verify_session is now a one-line wrapper passing tor=None,
  unauth_max_recircuit=0. The inner run_session_probe_loop is
  pure-over-IO and unit-tested with closure-based fakes.
- content.rs extracts fetch_chapter_html_once + the closure-driven
  fetch_chapter_html_with_recircuit, used by sync_chapter_content to
  retry on Transient or Unauthenticated up to a recircuit_budget.
  Budget = 0 (no TOR) preserves original behavior bit-for-bit.
- app.rs and bin/crawler.rs construct the controller before on_launch
  and pass it into verify_session_with_recircuit, so a transient
  hiccup at startup no longer requires PHPSESSID rotation.

Recircuit budget defaults to CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS (3).
Errors from NEWNYM are logged and swallowed — failing to recircuit
should not take down the crawl.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-31 20:01:04 +02:00
MechaCat02
8557e432a2 feat(crawler): plumb TorController through FetchContext and pipelines
Adds CRAWLER_TOR_CONTROL_URL / _PASSWORD / _COOKIE_PATH /
_RECIRCUIT_MAX_ATTEMPTS to CrawlerConfig and to bin/crawler.rs's
env reads. Constructs an Option<Arc<TorController>> at daemon /
CLI startup and threads it through FetchContext,
pipeline::run_metadata_pass, and content::sync_chapter_content as
Option<&TorController>.

Pure scaffolding — the controller isn't used yet; behavior is
unchanged. Next commit wires the retry hooks and session-probe
recircuit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-31 19:59:47 +02:00
MechaCat02
d6d84dedcb feat(crawler): retry_on_transient_with_hook for between-retry side effects
Adds a sibling fn that fires a caller-supplied async hook between a
transient failure and the next attempt. The existing
retry_on_transient becomes a thin wrapper over it (no-op hook), so
no call sites churn yet.

Hook contract: fires only between attempts (N-1 times for N
attempts), never after a non-transient error or after the final
attempt. Designed for TOR NEWNYM, but the signature is generic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-31 19:59:47 +02:00
MechaCat02
d37b94871e feat(crawler): TorController for control-port NEWNYM signaling
Minimal client over tokio::net::TcpStream — AUTHENTICATE then
SIGNAL NEWNYM, one-shot connection. Supports cookie-file and
password auth (cookie preferred when both provided); covers the
multi-line `250-...\r\n250 OK` reply form so future torrc tweaks
won't confuse the parser.

Not yet wired into the crawler — that lands in the next commits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-31 19:59:47 +02:00
8e39fadd21 ci: build via host docker socket (plain build); fix missing daemon socket (#5)
All checks were successful
deploy / test-backend (push) Successful in 19m12s
deploy / test-frontend (push) Successful in 9m43s
deploy / build-and-push (push) Successful in 8m11s
deploy / deploy (push) Successful in 11s
2026-05-31 17:40:14 +00:00
3b3d13a0f6 fix(crawler): walk list pages incrementally; stop on empty page (0.45.1) (#4)
Some checks failed
deploy / test-backend (push) Successful in 18m58s
deploy / test-frontend (push) Successful in 9m43s
deploy / build-and-push (push) Failing after 2m26s
deploy / deploy (push) Has been skipped
2026-05-31 16:37:14 +00:00
0f90af80cb ci(test-backend): ubuntu-latest + rustup (fix node-not-found) (#3)
Some checks failed
deploy / test-backend (push) Has been cancelled
deploy / test-frontend (push) Has been cancelled
deploy / build-and-push (push) Has been cancelled
deploy / deploy (push) Has been cancelled
2026-05-31 16:18:21 +00:00
19 changed files with 1560 additions and 124 deletions

View File

@@ -83,6 +83,43 @@ CRAWLER_MAX_IMAGE_BYTES=33554432
# the image actually contains the binary.
CRAWLER_CHROMIUM_BINARY=
# ----- Crawler TOR proxy + recircuit -----
# The compose stack ships a `tor` service (dockurr/tor) and defaults
# CRAWLER_PROXY to it, so by default all crawler traffic exits via the
# TOR network. To opt out, set CRAWLER_PROXY= (empty) AND
# CRAWLER_TOR_CONTROL_URL= (empty) below — the tor service can stay
# running, it just won't be used.
#
# Going through TOR adds latency to every fetch; image downloads in
# particular slow noticeably. The win is on sites that rate-limit or
# fingerprint by exit IP — NEWNYM recirculation makes a fresh exit
# cheap to reach for.
#
# CRAWLER_PROXY: SOCKS5(h) URL. Use `socks5h://` (not `socks5://`) so
# DNS resolution also goes through TOR, avoiding leaks via the host's
# resolver. Leave unset to talk to the upstream directly.
CRAWLER_PROXY=socks5h://tor:9050
# Control-port URL for SIGNAL NEWNYM ("get a fresh circuit"). Triggered
# automatically on bad pages (broken-page body, missing #logo) and on
# the Unauthenticated session probe outcome. Leave unset to disable
# the recircuit feature (the SOCKS proxy still works).
CRAWLER_TOR_CONTROL_URL=tcp://tor:9051
# Max NEWNYM-and-retry cycles per recircuit-eligible failure. Default 3.
CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS=3
# ----- TOR control-port password -----
# Shared between the bundled dockurr/tor service (which hashes it into
# its HashedControlPassword) and the backend's
# CRAWLER_TOR_CONTROL_PASSWORD. REQUIRED — docker-compose.yml fails
# fast if absent. Generate a strong random string; rotate by setting
# a new value and restarting both `tor` and `backend`.
#
# Operators running their own non-dockurr tor daemon with cookie-file
# auth can ignore this var and instead set
# CRAWLER_TOR_CONTROL_COOKIE_PATH on the backend — the TorController
# prefers cookie when both are present.
TOR_CONTROL_PASSWORD=change-me-to-a-strong-random-string
# ----- Frontend -----
# The frontend container runs SvelteKit's Node adapter on :3000 and
# proxies /api/* to BACKEND_URL via src/hooks.server.ts. In compose the

View File

@@ -10,8 +10,6 @@ on:
jobs:
test-backend:
runs-on: ubuntu-latest
container:
image: rust:1-slim
services:
postgres:
image: postgres:16-alpine
@@ -28,10 +26,18 @@ jobs:
DATABASE_URL: postgres://mangalord:mangalord@postgres:5432/mangalord
steps:
- uses: actions/checkout@v4
- name: Install build deps
# ubuntu-latest has node (so JS actions like checkout/cache run) but no
# Rust. We intentionally avoid `container: rust:1-slim` because act_runner
# runs JS actions with node *inside* the job container, and the slim Rust
# image ships no node (checkout would fail with exit 127).
- name: Install Rust + build deps
run: |
apt-get update
apt-get install -y --no-install-recommends pkg-config libssl-dev ca-certificates
set -eu
SUDO=""; [ "$(id -u)" = "0" ] || SUDO="sudo"
$SUDO apt-get update
$SUDO apt-get install -y --no-install-recommends pkg-config libssl-dev ca-certificates curl
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --profile minimal --default-toolchain stable
echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
- name: Cache cargo registry and target
uses: actions/cache@v4
with:
@@ -66,9 +72,17 @@ jobs:
runs-on: ubuntu-latest
needs: [test-backend, test-frontend]
# PRs only run the test jobs; build + deploy are reserved for
# post-merge pushes to main. Without this gate every PR would push
# a tagged image to the registry and SSH-deploy to prod.
# post-merge pushes to main.
if: github.event_name != 'pull_request'
# Build on the host docker daemon directly (docker-outside-of-docker):
# the runner shares the deploy host's daemon, so a plain `docker build`
# reuses the host's layer cache and avoids buildx's docker-container
# driver + the gha cache exporter — neither works against this single-host
# act_runner, and there is no in-job daemon socket unless we mount it.
container:
image: docker.gitea.com/runner-images:ubuntu-latest
volumes:
- /var/run/docker.sock:/var/run/docker.sock
outputs:
image_tag: ${{ steps.meta.outputs.image_tag }}
version: ${{ steps.meta.outputs.version }}
@@ -87,48 +101,32 @@ jobs:
echo "image_tag=${GITHUB_SHA}" >> "$GITHUB_OUTPUT"
echo "version=${version}" >> "$GITHUB_OUTPUT"
- uses: docker/setup-buildx-action@v3
- name: docker login
uses: docker/login-action@v3
with:
registry: ${{ secrets.REGISTRY_URL }}
username: ${{ secrets.REGISTRY_USERNAME }}
password: ${{ secrets.REGISTRY_PASSWORD }}
- name: Build & push backend
uses: docker/build-push-action@v5
with:
context: ./backend
push: true
tags: |
${{ secrets.REGISTRY_URL }}/mangalord-backend:latest
${{ secrets.REGISTRY_URL }}/mangalord-backend:${{ steps.meta.outputs.image_tag }}
${{ secrets.REGISTRY_URL }}/mangalord-backend:${{ steps.meta.outputs.version }}
cache-from: type=gha,scope=backend
cache-to: type=gha,mode=max,scope=backend
- name: Build & push frontend
uses: docker/build-push-action@v5
with:
context: ./frontend
push: true
tags: |
${{ secrets.REGISTRY_URL }}/mangalord-frontend:latest
${{ secrets.REGISTRY_URL }}/mangalord-frontend:${{ steps.meta.outputs.image_tag }}
${{ secrets.REGISTRY_URL }}/mangalord-frontend:${{ steps.meta.outputs.version }}
cache-from: type=gha,scope=frontend
cache-to: type=gha,mode=max,scope=frontend
- name: Build & push backend + frontend
env:
REGISTRY_URL: ${{ secrets.REGISTRY_URL }}
REGISTRY_USERNAME: ${{ secrets.REGISTRY_USERNAME }}
REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }}
IMAGE_TAG: ${{ steps.meta.outputs.image_tag }}
VERSION: ${{ steps.meta.outputs.version }}
run: |
set -eu
echo "$REGISTRY_PASSWORD" | docker login "$REGISTRY_URL" -u "$REGISTRY_USERNAME" --password-stdin
for svc in backend frontend; do
img="$REGISTRY_URL/mangalord-$svc"
docker build -t "$img:$IMAGE_TAG" -t "$img:latest" -t "$img:$VERSION" "./$svc"
for tag in "$IMAGE_TAG" latest "$VERSION"; do docker push "$img:$tag"; done
done
docker logout "$REGISTRY_URL"
deploy:
runs-on: ubuntu-latest
needs: build-and-push
if: github.event_name != 'pull_request'
# Single-host deploy: the runner lives on the same box as the stack, so we
# drive the host docker daemon directly (act_runner shares its socket via
# `docker_host: "-"`) instead of SSHing out. The compose dir is bind-mounted
# at its REAL host path so compose's relative bind-mounts (./mangalord/...,
# ./Caddyfile) resolve; this requires `/mnt/ssd/docker-data` in the runner's
# drive the host docker daemon directly (the job mounts the host docker
# socket) instead of SSHing out. The compose dir is bind-mounted at its
# REAL host path so compose's relative bind-mounts (./mangalord/...,
# ./Caddyfile) resolve; both paths must be in the runner's
# container.valid_volumes. The central compose references the images as
# registry.mc02.dev/mangalord-*:${MANGALORD_TAG:-latest}, so we only pull
# and recreate the two mangalord services at the freshly built SHA.
@@ -136,6 +134,7 @@ jobs:
image: docker:cli
volumes:
- /mnt/ssd/docker-data:/mnt/ssd/docker-data
- /var/run/docker.sock:/var/run/docker.sock
steps:
- name: Deploy to the local stack
working-directory: /mnt/ssd/docker-data

2
backend/Cargo.lock generated
View File

@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
[[package]]
name = "mangalord"
version = "0.45.1"
version = "0.46.0"
dependencies = [
"anyhow",
"argon2",

View File

@@ -1,6 +1,6 @@
[package]
name = "mangalord"
version = "0.45.1"
version = "0.46.0"
edition = "2021"
default-run = "mangalord"

View File

@@ -123,6 +123,18 @@ async fn spawn_crawler_daemon(
}
let rate = Arc::new(rate);
let tor = crate::crawler::tor::TorController::from_parts(
cfg.tor_control_url.as_deref(),
cfg.tor_control_password.as_deref(),
cfg.tor_control_cookie_path.as_deref(),
)
.context("build TorController from CRAWLER_TOR_CONTROL_* env")?
.map(Arc::new);
if let Some(t) = &tor {
tracing::info!(?t, "TOR control configured; transient pages will trigger NEWNYM");
}
let tor_recircuit_max = cfg.tor_recircuit_max_attempts;
// Browser manager. on_launch re-injects PHPSESSID on every fresh
// chromium spawn so an idle teardown followed by re-launch stays
// authenticated without operator action.
@@ -135,17 +147,24 @@ async fn spawn_crawler_daemon(
let sid = sid.clone();
let domain = domain.clone();
let start_url = start_url.clone();
let tor_for_launch = tor.as_ref().map(Arc::clone);
let on_launch: browser_manager::OnLaunch = Arc::new(move |browser| {
let sid = sid.clone();
let domain = domain.clone();
let start_url = start_url.clone();
let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone);
Box::pin(async move {
session::inject_phpsessid(&browser, &sid, &domain)
.await
.context("on_launch: inject_phpsessid")?;
session::verify_session(&browser, &start_url)
.await
.context("on_launch: verify_session")?;
session::verify_session_with_recircuit(
&browser,
&start_url,
tor_for_launch.as_deref(),
tor_recircuit_max,
)
.await
.context("on_launch: verify_session")?;
Ok(())
})
});
@@ -167,6 +186,7 @@ async fn spawn_crawler_daemon(
start_url: url.clone(),
download_allowlist: cfg.download_allowlist.clone(),
max_image_bytes: cfg.max_image_bytes,
tor: tor.as_ref().map(Arc::clone),
});
m
});
@@ -179,6 +199,7 @@ async fn spawn_crawler_daemon(
rate: Arc::clone(&rate),
download_allowlist: cfg.download_allowlist.clone(),
max_image_bytes: cfg.max_image_bytes,
tor: tor.as_ref().map(Arc::clone),
});
// Shared cancellation: daemon shutdown cancels the BrowserManager's
@@ -232,6 +253,7 @@ struct RealMetadataPass {
start_url: String,
download_allowlist: DownloadAllowlist,
max_image_bytes: usize,
tor: Option<Arc<crate::crawler::tor::TorController>>,
}
#[async_trait]
@@ -248,6 +270,7 @@ impl MetadataPass for RealMetadataPass {
false,
&self.download_allowlist,
self.max_image_bytes,
self.tor.as_deref(),
)
.await;
if let Err(e) = &result {
@@ -267,6 +290,7 @@ struct RealChapterDispatcher {
rate: Arc<HostRateLimiters>,
download_allowlist: DownloadAllowlist,
max_image_bytes: usize,
tor: Option<Arc<crate::crawler::tor::TorController>>,
}
#[async_trait]
@@ -298,6 +322,7 @@ impl ChapterDispatcher for RealChapterDispatcher {
false,
&self.download_allowlist,
self.max_image_bytes,
self.tor.as_deref(),
)
.await;
drop(lease);

View File

@@ -78,6 +78,21 @@ async fn main() -> anyhow::Result<()> {
let proxy_url = std::env::var("CRAWLER_PROXY")
.ok()
.filter(|s| !s.trim().is_empty());
let tor_control_url = std::env::var("CRAWLER_TOR_CONTROL_URL")
.ok()
.filter(|s| !s.trim().is_empty());
let tor_control_password = std::env::var("CRAWLER_TOR_CONTROL_PASSWORD")
.ok()
.filter(|s| !s.trim().is_empty());
let tor_control_cookie_path = std::env::var("CRAWLER_TOR_CONTROL_COOKIE_PATH")
.ok()
.filter(|s| !s.trim().is_empty())
.map(std::path::PathBuf::from);
let tor_recircuit_max_attempts: u32 = std::env::var("CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(3)
.max(1);
let keep_browser_open = env_bool("CRAWLER_KEEP_BROWSER_OPEN", false);
let db = PgPoolOptions::new()
@@ -144,6 +159,17 @@ async fn main() -> anyhow::Result<()> {
"starting crawler"
);
let tor = mangalord::crawler::tor::TorController::from_parts(
tor_control_url.as_deref(),
tor_control_password.as_deref(),
tor_control_cookie_path.as_deref(),
)
.context("build TorController from CRAWLER_TOR_CONTROL_* env")?
.map(Arc::new);
if let Some(t) = &tor {
tracing::info!(?t, "TOR control configured");
}
// BrowserManager with idle_timeout = ZERO so the CLI keeps Chromium
// alive for the entire run — same lifecycle as the old direct
// `browser::launch()` flow. on_launch re-injects PHPSESSID + runs the
@@ -153,17 +179,24 @@ async fn main() -> anyhow::Result<()> {
let sid = sid.clone();
let domain = domain.clone();
let start_url_clone = start_url.clone();
let tor_for_launch = tor.as_ref().map(Arc::clone);
Arc::new(move |browser| {
let sid = sid.clone();
let domain = domain.clone();
let start_url = start_url_clone.clone();
let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone);
Box::pin(async move {
session::inject_phpsessid(&browser, &sid, &domain)
.await
.context("inject_phpsessid")?;
session::verify_session(&browser, &start_url)
.await
.context("verify_session")?;
session::verify_session_with_recircuit(
&browser,
&start_url,
tor_for_launch.as_deref(),
tor_recircuit_max_attempts,
)
.await
.context("verify_session")?;
Ok(())
})
})
@@ -187,6 +220,7 @@ async fn main() -> anyhow::Result<()> {
skip_chapter_content || !session_ready,
chapter_workers,
force_refetch_chapters,
tor.clone(),
)
.await;
@@ -216,6 +250,7 @@ async fn run(
skip_chapter_content: bool,
chapter_workers: usize,
force_refetch_chapters: bool,
tor: Option<Arc<mangalord::crawler::tor::TorController>>,
) -> anyhow::Result<()> {
let mut rate = HostRateLimiters::new(Duration::from_millis(rate_ms));
if let Some(host) = cdn_host {
@@ -267,6 +302,7 @@ async fn run(
skip_chapters,
allowlist.as_ref(),
max_image_bytes,
tor.as_deref(),
)
.await?;
tracing::info!(?stats, "metadata pass complete");
@@ -283,6 +319,7 @@ async fn run(
force_refetch_chapters,
Arc::clone(&allowlist),
max_image_bytes,
tor.clone(),
)
.await?;
}
@@ -308,6 +345,7 @@ async fn sync_bookmarked_chapter_content(
force_refetch: bool,
allowlist: Arc<mangalord::crawler::safety::DownloadAllowlist>,
max_image_bytes: usize,
tor: Option<Arc<mangalord::crawler::tor::TorController>>,
) -> anyhow::Result<()> {
let pending: Vec<(Uuid, Uuid, String)> = sqlx::query_as(
r#"
@@ -345,6 +383,7 @@ async fn sync_bookmarked_chapter_content(
let rate = Arc::clone(&rate);
let manager = Arc::clone(&manager);
let allowlist = Arc::clone(&allowlist);
let tor = tor.clone();
let stats = &stats;
async move {
if session_expired.load(std::sync::atomic::Ordering::Relaxed) {
@@ -371,6 +410,7 @@ async fn sync_bookmarked_chapter_content(
force_refetch,
allowlist.as_ref(),
max_image_bytes,
tor.as_deref(),
)
.await;
drop(lease);

View File

@@ -97,6 +97,20 @@ pub struct CrawlerConfig {
pub cookie_domain: Option<String>,
pub user_agent: Option<String>,
pub proxy: Option<String>,
/// `tcp://host:port`, `host:port`, or bare `host` (default port
/// 9051). When `None`, TOR-recircuit-on-transient is disabled and
/// the crawler behaves identically to pre-TOR releases.
pub tor_control_url: Option<String>,
/// HashedControlPassword auth. Used only when
/// `tor_control_cookie_path` is `None`.
pub tor_control_password: Option<String>,
/// Cookie-file auth path (e.g.
/// `/var/lib/tor/control_auth_cookie`). Takes precedence over
/// password when both are set.
pub tor_control_cookie_path: Option<PathBuf>,
/// Maximum NEWNYM-and-retry cycles per recircuit-eligible failure.
/// Defaults to 3.
pub tor_recircuit_max_attempts: u32,
pub browser: LaunchOptions,
/// Hosts the crawler is allowed to download images / covers from.
/// Always seeded with the host of `start_url` and (when set) the
@@ -124,6 +138,10 @@ impl Default for CrawlerConfig {
cookie_domain: None,
user_agent: None,
proxy: None,
tor_control_url: None,
tor_control_password: None,
tor_control_cookie_path: None,
tor_recircuit_max_attempts: 3,
browser: LaunchOptions::headless(),
download_allowlist: DownloadAllowlist::new(),
max_image_bytes: DEFAULT_MAX_IMAGE_BYTES,
@@ -234,6 +252,18 @@ impl CrawlerConfig {
proxy: std::env::var("CRAWLER_PROXY")
.ok()
.filter(|s| !s.trim().is_empty()),
tor_control_url: std::env::var("CRAWLER_TOR_CONTROL_URL")
.ok()
.filter(|s| !s.trim().is_empty()),
tor_control_password: std::env::var("CRAWLER_TOR_CONTROL_PASSWORD")
.ok()
.filter(|s| !s.trim().is_empty()),
tor_control_cookie_path: std::env::var("CRAWLER_TOR_CONTROL_COOKIE_PATH")
.ok()
.filter(|s| !s.trim().is_empty())
.map(PathBuf::from),
tor_recircuit_max_attempts: env_u64("CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS", 3)
.max(1) as u32,
browser: LaunchOptions::from_env(),
download_allowlist,
max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES),

View File

@@ -73,39 +73,36 @@ pub enum SyncOutcome {
SessionExpired,
}
/// Fetch all images for one chapter and persist them atomically. On
/// any error after the first storage put, the DB transaction rolls
/// back so the chapter stays at `page_count = 0` and is retried on the
/// next run. Bytes already written to storage become orphans; a future
/// reaper sweeps them.
#[allow(clippy::too_many_arguments)]
pub async fn sync_chapter_content(
browser: &chromiumoxide::Browser,
db: &PgPool,
storage: &dyn Storage,
http: &reqwest::Client,
rate: &HostRateLimiters,
chapter_id: Uuid,
manga_id: Uuid,
source_url: &str,
force_refetch: bool,
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
) -> anyhow::Result<SyncOutcome> {
// Skip if already fetched, unless caller explicitly forces.
if !force_refetch {
let (page_count,): (i32,) =
sqlx::query_as("SELECT page_count FROM chapters WHERE id = $1")
.bind(chapter_id)
.fetch_one(db)
.await
.context("read chapter page_count")?;
if page_count > 0 {
return Ok(SyncOutcome::Skipped);
}
}
/// Per-chapter max fetch attempts when TOR is configured. `N = 3` means
/// up to 3 total page fetches with 2 NEWNYM signals between them. When
/// TOR is not configured the effective budget collapses to 1 (single
/// attempt, no retry, no recircuit — bit-for-bit pre-TOR behavior).
const CHAPTER_RECIRCUIT_MAX_ATTEMPTS: u32 = 3;
// Nav to chapter page (rate-limited per host).
/// Outcome of [`fetch_chapter_html_with_recircuit`]. `Ok` carries the
/// final reader HTML; the other two map to `sync_chapter_content`'s
/// existing failure modes.
#[derive(Debug)]
enum ChapterFetchOutcome {
Ok(String),
/// `ChapterProbe::Unauthenticated` after exhausting recircuit
/// budget (or with budget=0). Caller returns
/// `SyncOutcome::SessionExpired`.
SessionExpired,
/// `ChapterProbe::Transient` after exhausting recircuit budget
/// (or with budget=0). Caller bails so the dispatcher does
/// exponential backoff.
PersistentTransient,
}
/// Single rate-limited Chromium navigation to the chapter URL,
/// returning the page HTML. Extracted from `sync_chapter_content` so
/// the recircuit loop can call it once per attempt.
async fn fetch_chapter_html_once(
browser: &chromiumoxide::Browser,
rate: &HostRateLimiters,
source_url: &str,
) -> anyhow::Result<String> {
rate.wait_for(source_url).await?;
let page = browser
.new_page(source_url)
@@ -124,28 +121,135 @@ pub async fn sync_chapter_content(
crate::crawler::nav::SELECTOR_TIMEOUT,
)
.await;
let html = page.content().await.context("read chapter html")?;
page.close().await.ok();
Ok(html)
}
// Three-way session classification: distinguishes a transient
// hiccup (broken-page body or logged-in-but-no-reader) from a
// genuine PHPSESSID expiry (no reader and no avatar widget). The
// earlier binary `#avatar_menu` check conflated both and froze
// every worker on a layout shift.
match session::classify_chapter_probe(&html) {
ChapterProbe::Unauthenticated => return Ok(SyncOutcome::SessionExpired),
ChapterProbe::Transient => {
/// Pure-over-IO loop: fetch + classify, up to `max_attempts` total
/// fetches. Between attempts, `recircuit` is invoked (a no-op when
/// TOR isn't configured). `max_attempts = 1` collapses to the
/// original single-shot behavior — `Unauthenticated` →
/// `SessionExpired`, `Transient` → `PersistentTransient` on the first
/// hit, no recircuit.
///
/// Semantics match [`crate::crawler::detect::retry_on_transient`] and
/// [`run_session_probe_loop`]: `N` is **total attempts including the
/// first**, so `N = 3` means 3 fetches and up to 2 NEWNYM calls.
/// `Unauthenticated` and `Transient` share the budget — the loop
/// doesn't distinguish, so a sequence like Transient → Unauth → Ok
/// counts as 3 attempts.
async fn fetch_chapter_html_with_recircuit<F, Fut, R, RFut>(
mut fetch: F,
mut recircuit: R,
max_attempts: u32,
source_url_for_msg: &str,
) -> anyhow::Result<ChapterFetchOutcome>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = anyhow::Result<String>>,
R: FnMut() -> RFut,
RFut: std::future::Future<Output = ()>,
{
debug_assert!(max_attempts >= 1, "max_attempts must be at least 1");
let mut attempt = 0u32;
loop {
attempt += 1;
let html = fetch().await?;
match session::classify_chapter_probe(&html) {
ChapterProbe::Ok => return Ok(ChapterFetchOutcome::Ok(html)),
ChapterProbe::Unauthenticated => {
if attempt >= max_attempts {
return Ok(ChapterFetchOutcome::SessionExpired);
}
tracing::warn!(
attempt,
max = max_attempts,
url = source_url_for_msg,
"chapter probe Unauthenticated; signaling TOR NEWNYM and retrying"
);
recircuit().await;
}
ChapterProbe::Transient => {
if attempt >= max_attempts {
return Ok(ChapterFetchOutcome::PersistentTransient);
}
tracing::warn!(
attempt,
max = max_attempts,
url = source_url_for_msg,
"chapter probe Transient; signaling TOR NEWNYM and retrying"
);
recircuit().await;
}
}
}
}
/// Fetch all images for one chapter and persist them atomically. On
/// any error after the first storage put, the DB transaction rolls
/// back so the chapter stays at `page_count = 0` and is retried on the
/// next run. Bytes already written to storage become orphans; a future
/// reaper sweeps them.
#[allow(clippy::too_many_arguments)]
pub async fn sync_chapter_content(
browser: &chromiumoxide::Browser,
db: &PgPool,
storage: &dyn Storage,
http: &reqwest::Client,
rate: &HostRateLimiters,
chapter_id: Uuid,
manga_id: Uuid,
source_url: &str,
force_refetch: bool,
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
tor: Option<&crate::crawler::tor::TorController>,
) -> anyhow::Result<SyncOutcome> {
// Skip if already fetched, unless caller explicitly forces.
if !force_refetch {
let (page_count,): (i32,) =
sqlx::query_as("SELECT page_count FROM chapters WHERE id = $1")
.bind(chapter_id)
.fetch_one(db)
.await
.context("read chapter page_count")?;
if page_count > 0 {
return Ok(SyncOutcome::Skipped);
}
}
// Fetch + classify. With TOR configured, allow up to
// CHAPTER_RECIRCUIT_MAX_ATTEMPTS total page fetches with NEWNYM
// between each. Without TOR, collapse to 1 attempt (no retry, no
// recircuit) — matches the pre-TOR single-shot behavior bit-for-bit.
let max_attempts = if tor.is_some() { CHAPTER_RECIRCUIT_MAX_ATTEMPTS } else { 1 };
let html = match fetch_chapter_html_with_recircuit(
|| fetch_chapter_html_once(browser, rate, source_url),
|| async {
if let Some(t) = tor {
if let Err(e) = t.new_identity().await {
tracing::warn!(error = %e, "TOR NEWNYM failed; continuing with same circuit");
}
}
},
max_attempts,
source_url,
)
.await?
{
ChapterFetchOutcome::Ok(html) => html,
ChapterFetchOutcome::SessionExpired => return Ok(SyncOutcome::SessionExpired),
ChapterFetchOutcome::PersistentTransient => {
// Surface as a typed Err so the dispatcher path runs
// ack_failed with exponential backoff (rather than the
// session-expired sticky flag).
anyhow::bail!(
"chapter page at {source_url} returned a transient response \
(broken-page body or reader didn't render); will retry"
"chapter page at {source_url} returned a transient response after \
{max_attempts} attempt(s); will retry"
);
}
ChapterProbe::Ok => {}
}
};
let images = parse_chapter_pages(&html)
.with_context(|| format!("parse chapter pages at {source_url}"))?;
@@ -304,4 +408,214 @@ mod tests {
let err = parse_chapter_pages(html).expect_err("expected Transient");
assert!(err.is_transient(), "got non-transient: {err}");
}
// --- fetch_chapter_html_with_recircuit -------------------------------
const OK_HTML: &str = r#"<html><body><a id="pic_container"><img id="page1" src="x"/></a></body></html>"#;
const UNAUTH_HTML: &str = r#"<html><body><header><div id="logo">x</div></header><main>please log in</main></body></html>"#;
const TRANSIENT_HTML: &str = "<html><body><p>we're sorry, the request file are not found.</p></body></html>";
#[tokio::test]
async fn recircuit_loop_ok_first_attempt() {
let mut recircuits = 0u32;
let mut fetches = 0u32;
let outcome = fetch_chapter_html_with_recircuit(
|| {
fetches += 1;
async { Ok(OK_HTML.to_string()) }
},
|| {
recircuits += 1;
async {}
},
3,
"https://example/c",
)
.await
.expect("ok");
assert!(matches!(outcome, ChapterFetchOutcome::Ok(_)));
assert_eq!(fetches, 1);
assert_eq!(recircuits, 0);
}
#[tokio::test]
async fn recircuit_loop_unauth_with_single_attempt_returns_session_expired() {
// max_attempts=1 = TOR disabled, fail-fast on first Unauthenticated.
let mut recircuits = 0u32;
let mut fetches = 0u32;
let outcome = fetch_chapter_html_with_recircuit(
|| {
fetches += 1;
async { Ok(UNAUTH_HTML.to_string()) }
},
|| {
recircuits += 1;
async {}
},
1,
"https://example/c",
)
.await
.expect("ok-result");
assert!(matches!(outcome, ChapterFetchOutcome::SessionExpired));
assert_eq!(fetches, 1);
assert_eq!(recircuits, 0, "no recircuit when budget is 1 (TOR disabled)");
}
#[tokio::test]
async fn recircuit_loop_unauth_then_ok_within_budget() {
// max_attempts=3 = up to 3 fetches with 2 recircuits between.
let mut recircuits = 0u32;
let mut fetch_n = 0u32;
let outcome = fetch_chapter_html_with_recircuit(
|| {
fetch_n += 1;
let n = fetch_n;
async move {
if n == 1 {
Ok(UNAUTH_HTML.to_string())
} else {
Ok(OK_HTML.to_string())
}
}
},
|| {
recircuits += 1;
async {}
},
3,
"https://example/c",
)
.await
.expect("ok");
assert!(matches!(outcome, ChapterFetchOutcome::Ok(_)));
assert_eq!(fetch_n, 2);
assert_eq!(recircuits, 1);
}
#[tokio::test]
async fn recircuit_loop_unauth_exhausts_budget_returns_session_expired() {
let mut recircuits = 0u32;
let mut fetch_n = 0u32;
let outcome = fetch_chapter_html_with_recircuit(
|| {
fetch_n += 1;
async { Ok(UNAUTH_HTML.to_string()) }
},
|| {
recircuits += 1;
async {}
},
3,
"https://example/c",
)
.await
.expect("ok-result");
assert!(matches!(outcome, ChapterFetchOutcome::SessionExpired));
assert_eq!(fetch_n, 3, "max_attempts=3 → 3 fetches total");
assert_eq!(recircuits, 2, "2 recircuits between 3 fetches");
}
#[tokio::test]
async fn recircuit_loop_transient_then_ok_within_budget() {
let mut recircuits = 0u32;
let mut fetch_n = 0u32;
let outcome = fetch_chapter_html_with_recircuit(
|| {
fetch_n += 1;
let n = fetch_n;
async move {
if n < 3 {
Ok(TRANSIENT_HTML.to_string())
} else {
Ok(OK_HTML.to_string())
}
}
},
|| {
recircuits += 1;
async {}
},
3,
"https://example/c",
)
.await
.expect("ok");
assert!(matches!(outcome, ChapterFetchOutcome::Ok(_)));
assert_eq!(fetch_n, 3);
assert_eq!(recircuits, 2);
}
#[tokio::test]
async fn recircuit_loop_transient_exhausts_budget_returns_persistent() {
let mut recircuits = 0u32;
let mut fetch_n = 0u32;
let outcome = fetch_chapter_html_with_recircuit(
|| {
fetch_n += 1;
async { Ok(TRANSIENT_HTML.to_string()) }
},
|| {
recircuits += 1;
async {}
},
3,
"https://example/c",
)
.await
.expect("ok-result");
assert!(matches!(outcome, ChapterFetchOutcome::PersistentTransient));
assert_eq!(fetch_n, 3, "max_attempts=3 → 3 fetches total");
assert_eq!(recircuits, 2, "2 recircuits between 3 fetches");
}
#[tokio::test]
async fn recircuit_loop_mixed_transient_then_unauth_then_ok_shares_budget() {
// Audit-prompted regression: outcomes share the attempt counter.
// Sequence: Transient (attempt 1) → Unauth (attempt 2) → Ok (3).
let mut recircuits = 0u32;
let mut fetch_n = 0u32;
let outcome = fetch_chapter_html_with_recircuit(
|| {
fetch_n += 1;
let n = fetch_n;
async move {
match n {
1 => Ok(TRANSIENT_HTML.to_string()),
2 => Ok(UNAUTH_HTML.to_string()),
_ => Ok(OK_HTML.to_string()),
}
}
},
|| {
recircuits += 1;
async {}
},
3,
"https://example/c",
)
.await
.expect("ok");
assert!(matches!(outcome, ChapterFetchOutcome::Ok(_)));
assert_eq!(fetch_n, 3);
assert_eq!(recircuits, 2);
}
#[tokio::test]
async fn recircuit_loop_propagates_fetch_errors() {
let mut fetch_n = 0u32;
let err = fetch_chapter_html_with_recircuit(
|| {
fetch_n += 1;
async { Err(anyhow::anyhow!("nav timeout")) }
},
|| async {},
3,
"https://example/c",
)
.await
.expect_err("fetch error bubbles");
assert_eq!(fetch_n, 1);
assert!(format!("{err:#}").contains("nav timeout"));
}
}

View File

@@ -80,13 +80,36 @@ pub fn has_logo_sentinel(doc: &scraper::Html) -> bool {
/// caller can fall back on the job system's retry/backoff once the
/// inline budget is exhausted.
pub async fn retry_on_transient<F, Fut, T>(
mut op: F,
op: F,
max_attempts: u32,
delay: Duration,
) -> Result<T, PageError>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, PageError>>,
{
retry_on_transient_with_hook(op, max_attempts, delay, || async {}).await
}
/// Like [`retry_on_transient`] but invokes `on_retry` between a
/// transient failure and the subsequent sleep+retry. The hook does
/// **not** fire on the first attempt, after a non-transient error, or
/// after the final attempt (no retry follows). Hook failures are not
/// propagated — return `()` from the future and log inside if needed.
///
/// Wire the TOR controller's `new_identity` here to rotate circuits
/// between page-fetch retries; see [`crate::crawler::tor`].
pub async fn retry_on_transient_with_hook<F, Fut, T, H, HFut>(
mut op: F,
max_attempts: u32,
delay: Duration,
mut on_retry: H,
) -> Result<T, PageError>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, PageError>>,
H: FnMut() -> HFut,
HFut: Future<Output = ()>,
{
debug_assert!(max_attempts >= 1, "max_attempts must be at least 1");
let mut attempt = 0u32;
@@ -101,8 +124,9 @@ where
attempt,
max_attempts,
error = %e,
"transient error; sleeping before retry"
"transient error; running on-retry hook and sleeping before retry"
);
on_retry().await;
tokio::time::sleep(delay).await;
}
}
@@ -247,4 +271,92 @@ mod tests {
assert_eq!(result.unwrap(), 7);
assert_eq!(attempt, 1);
}
#[tokio::test]
async fn hook_fires_once_between_transient_and_success() {
let mut attempt = 0u32;
let mut hook_calls = 0u32;
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|| {
attempt += 1;
let n = attempt;
async move {
if n < 2 {
Err(PageError::transient("once"))
} else {
Ok(99)
}
}
},
5,
Duration::from_millis(0),
|| {
hook_calls += 1;
async {}
},
)
.await;
assert_eq!(result.unwrap(), 99);
assert_eq!(attempt, 2);
assert_eq!(hook_calls, 1, "hook fires exactly once between attempts");
}
#[tokio::test]
async fn hook_does_not_fire_when_first_attempt_succeeds() {
let mut hook_calls = 0u32;
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|| async { Ok(1) },
5,
Duration::from_millis(0),
|| {
hook_calls += 1;
async {}
},
)
.await;
assert!(result.is_ok());
assert_eq!(hook_calls, 0);
}
#[tokio::test]
async fn hook_does_not_fire_after_non_transient_error() {
let mut hook_calls = 0u32;
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|| async { Err(PageError::Other(anyhow::anyhow!("permanent"))) },
5,
Duration::from_millis(0),
|| {
hook_calls += 1;
async {}
},
)
.await;
assert!(result.is_err());
assert_eq!(hook_calls, 0, "non-transient must short-circuit before hook");
}
#[tokio::test]
async fn hook_does_not_fire_after_final_failed_attempt() {
// With max_attempts=3 and three persistent transients, the hook
// should run twice (between 1→2 and 2→3) — never a third time,
// because no retry follows attempt 3.
let mut attempt = 0u32;
let mut hook_calls = 0u32;
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|| {
attempt += 1;
async { Err(PageError::transient("always")) }
},
3,
Duration::from_millis(0),
|| {
hook_calls += 1;
async {}
},
)
.await;
assert!(result.is_err());
assert_eq!(attempt, 3);
assert_eq!(hook_calls, 2, "hook fires N-1 times for N attempts that all fail transient");
}
}

View File

@@ -26,4 +26,5 @@ pub mod rate_limit;
pub mod safety;
pub mod session;
pub mod source;
pub mod tor;
pub mod url_utils;

View File

@@ -103,6 +103,7 @@ pub async fn run_metadata_pass(
skip_chapters: bool,
allowlist: &DownloadAllowlist,
max_image_bytes: usize,
tor: Option<&crate::crawler::tor::TorController>,
) -> anyhow::Result<MetadataStats> {
let lease = browser_manager
.acquire()
@@ -121,6 +122,7 @@ pub async fn run_metadata_pass(
let ctx = FetchContext {
browser: browser_ref,
rate,
tor,
};
let source_id = source.id();

View File

@@ -162,37 +162,123 @@ const PROBE_RETRY_DELAY: Duration = Duration::from_secs(2);
/// limiter. The trade is worth it — failing here costs ~1s; failing 30
/// minutes into a backfill costs 30 minutes.
pub async fn verify_session(browser: &Browser, probe_url: &str) -> anyhow::Result<()> {
let mut attempt = 0u32;
verify_session_with_recircuit(browser, probe_url, None, 0).await
}
/// Like [`verify_session`] but, when `tor` is `Some`, signals
/// `SIGNAL NEWNYM` between retries on transient pages AND treats
/// `Unauthenticated` as recoverable (up to `tor_max_attempts` total
/// probes, calling NEWNYM between each).
///
/// `verify_session` is `verify_session_with_recircuit(..., None, _)`,
/// which collapses the `Unauthenticated` budget to 1 attempt — i.e.
/// fail-fast, exactly the pre-TOR behavior.
pub async fn verify_session_with_recircuit(
browser: &Browser,
probe_url: &str,
tor: Option<&crate::crawler::tor::TorController>,
tor_max_attempts: u32,
) -> anyhow::Result<()> {
let unauth_max_attempts = if tor.is_some() { tor_max_attempts.max(1) } else { 1 };
run_session_probe_loop(
|| fetch_probe_html(browser, probe_url),
|| async {
if let Some(t) = tor {
if let Err(e) = t.new_identity().await {
tracing::warn!(error = %e, "TOR NEWNYM failed; continuing with same circuit");
}
}
},
PROBE_MAX_ATTEMPTS,
unauth_max_attempts,
PROBE_RETRY_DELAY,
probe_url,
)
.await
}
/// Pure-over-IO loop body for the session probe. Generic over the
/// fetch and recircuit closures so it can be unit-tested without a
/// real browser or TOR daemon.
///
/// Both budgets count **total attempts**, including the first — so
/// `transient_max_attempts = 3` allows 3 fetches and 2 recircuits
/// between them, and `unauth_max_attempts = 1` means "fail-fast, no
/// retry". This matches [`crate::crawler::detect::retry_on_transient`]
/// and the content-path recircuit loop.
///
/// Outcomes:
/// - `SessionProbe::Ok` → return `Ok(())`.
/// - `SessionProbe::Unauthenticated` → recircuit + retry while
/// under the unauth budget. After the cap, bail with the
/// "PHPSESSID expired" diagnostic, mentioning the attempt count so
/// a TOR-misconfig diagnosis is easier.
/// - `SessionProbe::Transient` → same shape against the transient
/// budget; bails with "site down or rate-limiting" after the cap.
async fn run_session_probe_loop<F, Fut, R, RFut>(
mut fetch_html: F,
mut recircuit: R,
transient_max_attempts: u32,
unauth_max_attempts: u32,
retry_delay: Duration,
probe_url_for_msg: &str,
) -> anyhow::Result<()>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = anyhow::Result<String>>,
R: FnMut() -> RFut,
RFut: std::future::Future<Output = ()>,
{
debug_assert!(transient_max_attempts >= 1);
debug_assert!(unauth_max_attempts >= 1);
let mut transient_attempts = 0u32;
let mut unauth_attempts = 0u32;
loop {
attempt += 1;
let html = fetch_probe_html(browser, probe_url).await?;
let html = fetch_html().await?;
match classify_probe(&html) {
SessionProbe::Ok => {
tracing::info!(attempt, "session probe ok — #logo + #avatar_menu present");
tracing::info!(
transient_attempts,
unauth_attempts,
"session probe ok — #logo + #avatar_menu present"
);
return Ok(());
}
SessionProbe::Unauthenticated => {
return Err(anyhow!(
"session probe failed — #avatar_menu not present at {probe_url} \
(page rendered the normal layout); PHPSESSID is missing, expired, \
or revoked. Refresh CRAWLER_PHPSESSID and re-run."
));
}
SessionProbe::Transient if attempt < PROBE_MAX_ATTEMPTS => {
unauth_attempts += 1;
if unauth_attempts >= unauth_max_attempts {
return Err(anyhow!(
"session probe failed — #avatar_menu not present at {probe_url_for_msg} \
after {unauth_attempts} attempt(s); PHPSESSID is missing, \
expired, or revoked. Refresh CRAWLER_PHPSESSID and re-run."
));
}
tracing::warn!(
attempt,
max_attempts = PROBE_MAX_ATTEMPTS,
"session probe got a transient page; retrying"
attempt = unauth_attempts,
max_attempts = unauth_max_attempts,
"session probe Unauthenticated despite PHPSESSID; signaling TOR \
NEWNYM and retrying"
);
tokio::time::sleep(PROBE_RETRY_DELAY).await;
recircuit().await;
tokio::time::sleep(retry_delay).await;
}
SessionProbe::Transient => {
return Err(anyhow!(
"session probe failed — probe page at {probe_url} returned a \
broken-page response after {PROBE_MAX_ATTEMPTS} attempts. \
The site appears to be down or rate-limiting us; try again \
later before refreshing CRAWLER_PHPSESSID."
));
transient_attempts += 1;
if transient_attempts >= transient_max_attempts {
return Err(anyhow!(
"session probe failed — probe page at {probe_url_for_msg} returned \
a broken-page response after {transient_max_attempts} attempts. \
The site appears to be down or rate-limiting us; try again \
later before refreshing CRAWLER_PHPSESSID."
));
}
tracing::warn!(
attempt = transient_attempts,
max_attempts = transient_max_attempts,
"session probe got a transient page; recircuit + retry"
);
recircuit().await;
tokio::time::sleep(retry_delay).await;
}
}
}
@@ -336,6 +422,204 @@ mod tests {
assert_eq!(classify_chapter_probe(html), ChapterProbe::Ok);
}
// --- run_session_probe_loop -----------------------------------------
//
// These tests exercise the recircuit-aware loop without a real
// browser. The fetch and recircuit closures are mocked over Vecs of
// canned outcomes / counters.
const OK_HTML: &str = r#"<html><body><div id="logo"></div><div id="avatar_menu"></div></body></html>"#;
const UNAUTH_HTML: &str = r#"<html><body><div id="logo"></div></body></html>"#;
const TRANSIENT_HTML: &str = "<html><body><p>we're sorry, the request file are not found.</p></body></html>";
#[tokio::test]
async fn probe_loop_ok_on_first_attempt_does_not_recircuit() {
let mut recircuits = 0u32;
let mut fetched = 0u32;
run_session_probe_loop(
|| {
fetched += 1;
async { Ok(OK_HTML.to_string()) }
},
|| {
recircuits += 1;
async {}
},
3,
3,
Duration::from_millis(0),
"https://example/probe",
)
.await
.expect("ok on first attempt");
assert_eq!(fetched, 1);
assert_eq!(recircuits, 0);
}
#[tokio::test]
async fn probe_loop_unauth_then_ok_when_attempt_budget_available() {
// Budget = 3 total attempts. Unauth on call 1, ok on call 2.
let mut recircuits = 0u32;
let mut call = 0u32;
run_session_probe_loop(
|| {
call += 1;
let n = call;
async move {
if n == 1 {
Ok(UNAUTH_HTML.to_string())
} else {
Ok(OK_HTML.to_string())
}
}
},
|| {
recircuits += 1;
async {}
},
3,
3,
Duration::from_millis(0),
"https://example/probe",
)
.await
.expect("recovers after one recircuit");
assert_eq!(call, 2);
assert_eq!(recircuits, 1);
}
#[tokio::test]
async fn probe_loop_unauth_with_single_attempt_budget_fails_fast() {
// Budget = 1 total attempt = no retry (matches no-TOR behavior).
let mut recircuits = 0u32;
let mut call = 0u32;
let err = run_session_probe_loop(
|| {
call += 1;
async { Ok(UNAUTH_HTML.to_string()) }
},
|| {
recircuits += 1;
async {}
},
3,
1,
Duration::from_millis(0),
"https://example/probe",
)
.await
.expect_err("budget=1 → fail-fast");
assert_eq!(call, 1, "no retry when budget is 1");
assert_eq!(recircuits, 0);
let msg = format!("{err:#}");
assert!(msg.contains("Refresh CRAWLER_PHPSESSID"), "msg: {msg}");
assert!(msg.contains("after 1 attempt"), "expected attempt count in msg: {msg}");
}
#[tokio::test]
async fn probe_loop_unauth_after_exhausting_budget_emits_attempt_count() {
let mut recircuits = 0u32;
let mut call = 0u32;
let err = run_session_probe_loop(
|| {
call += 1;
async { Ok(UNAUTH_HTML.to_string()) }
},
|| {
recircuits += 1;
async {}
},
10, // transient budget irrelevant here
3, // 3 attempts total, 2 recircuits between
Duration::from_millis(0),
"https://example/probe",
)
.await
.expect_err("exhausts unauth budget");
assert_eq!(call, 3);
assert_eq!(recircuits, 2);
let msg = format!("{err:#}");
assert!(msg.contains("after 3 attempt"), "expected attempt count in error, got: {msg}");
}
#[tokio::test]
async fn probe_loop_transient_repeats_until_max_then_errors() {
let mut recircuits = 0u32;
let mut call = 0u32;
let err = run_session_probe_loop(
|| {
call += 1;
async { Ok(TRANSIENT_HTML.to_string()) }
},
|| {
recircuits += 1;
async {}
},
3,
1,
Duration::from_millis(0),
"https://example/probe",
)
.await
.expect_err("transient until max → fail");
assert_eq!(call, 3);
// Recircuit fires between attempts: 3 attempts → 2 recircuits.
assert_eq!(recircuits, 2);
let msg = format!("{err:#}");
assert!(msg.contains("broken-page response after 3 attempts"), "msg: {msg}");
}
#[tokio::test]
async fn probe_loop_transient_then_ok_returns_ok_after_one_recircuit() {
let mut recircuits = 0u32;
let mut call = 0u32;
run_session_probe_loop(
|| {
call += 1;
let n = call;
async move {
if n == 1 {
Ok(TRANSIENT_HTML.to_string())
} else {
Ok(OK_HTML.to_string())
}
}
},
|| {
recircuits += 1;
async {}
},
3,
1,
Duration::from_millis(0),
"https://example/probe",
)
.await
.expect("ok on second try");
assert_eq!(call, 2);
assert_eq!(recircuits, 1);
}
#[tokio::test]
async fn probe_loop_propagates_fetch_errors_immediately() {
let mut call = 0u32;
let err = run_session_probe_loop(
|| {
call += 1;
async { Err(anyhow!("nav timeout")) }
},
|| async {},
5,
5,
Duration::from_millis(0),
"https://example/probe",
)
.await
.expect_err("fetch error bubbles");
assert_eq!(call, 1);
assert!(format!("{err:#}").contains("nav timeout"));
}
#[test]
fn classify_probe_trusts_broken_body_over_stray_avatar_match() {
// Defensive: if a broken-page body somehow contains an

View File

@@ -67,6 +67,10 @@ pub struct SourceChapter {
pub struct FetchContext<'a> {
pub browser: &'a Browser,
pub rate: &'a crate::crawler::rate_limit::HostRateLimiters,
/// Optional TOR control-port client. When `Some`, retry helpers
/// signal `NEWNYM` between transient-page attempts so the next try
/// draws a fresh exit. `None` keeps pre-TOR behavior.
pub tor: Option<&'a crate::crawler::tor::TorController>,
}
/// Lazy iterator over discovered manga refs. The caller drives the

View File

@@ -18,7 +18,7 @@ use super::{
SourceMangaRef,
};
use crate::crawler::detect::{
has_logo_sentinel, is_broken_page_body, retry_on_transient, PageError,
has_logo_sentinel, is_broken_page_body, retry_on_transient_with_hook, PageError,
};
use crate::crawler::nav::{wait_for_nav, wait_for_selector, NavError, SELECTOR_TIMEOUT};
@@ -79,12 +79,13 @@ impl Source for TargetSource {
// and the HTML is handed straight to the first `next_batch` call
// so the walker doesn't re-fetch it. Page count is discovered
// incrementally — see `TargetSourceWalker::next_batch`.
let first_html = retry_on_transient(
let first_html = retry_on_transient_with_hook(
|| async {
navigate(ctx, self.base_url.as_str(), LIST_PAGE_MARKER).await
},
PAGE_TRANSIENT_RETRY_ATTEMPTS,
PAGE_TRANSIENT_RETRY_DELAY,
|| async { recircuit_if_configured(ctx.tor).await },
)
.await?;
@@ -169,7 +170,7 @@ impl DiscoverWalk for TargetSourceWalker {
parse_manga_list_from(&doc)?
}
None => {
retry_on_transient(
retry_on_transient_with_hook(
|| async {
let html = navigate(
ctx,
@@ -182,12 +183,13 @@ impl DiscoverWalk for TargetSourceWalker {
},
PAGE_TRANSIENT_RETRY_ATTEMPTS,
PAGE_TRANSIENT_RETRY_DELAY,
|| async { recircuit_if_configured(ctx.tor).await },
)
.await?
}
}
} else {
retry_on_transient(
retry_on_transient_with_hook(
|| async {
let url = page_url(&self.base_url, page_num);
let html = navigate(ctx, &url, LIST_PAGE_MARKER).await?;
@@ -196,6 +198,7 @@ impl DiscoverWalk for TargetSourceWalker {
},
PAGE_TRANSIENT_RETRY_ATTEMPTS,
PAGE_TRANSIENT_RETRY_DELAY,
|| async { recircuit_if_configured(ctx.tor).await },
)
.await?
};
@@ -274,6 +277,20 @@ fn classify_navigate_html(html: String) -> Result<String, PageError> {
Ok(html)
}
/// Hook for [`retry_on_transient_with_hook`]: when TOR is configured,
/// signal `NEWNYM` so the next navigation draws a fresh exit. Errors
/// from the controller are logged and swallowed — failing to recircuit
/// shouldn't take down the crawl, the next attempt just runs on the
/// same circuit as before.
async fn recircuit_if_configured(tor: Option<&crate::crawler::tor::TorController>) {
if let Some(t) = tor {
if let Err(e) = t.new_identity().await {
tracing::warn!(error = %e, "TOR NEWNYM failed; retrying on same circuit");
}
}
}
/// Substitutes the first `/N/` path segment with the target page
/// number. Source impls that paginate via a different URL shape can
/// override this — for the modeled site the segment is always present.

446
backend/src/crawler/tor.rs Normal file
View File

@@ -0,0 +1,446 @@
//! TOR control-port client for `SIGNAL NEWNYM` ("recircuit").
//!
//! The crawler can be proxied through TOR (`CRAWLER_PROXY=socks5h://tor:9050`)
//! to randomize the exit IP seen by the target site. When the target
//! returns a "bad page" (its broken-template body, missing layout
//! sentinel, or unauthenticated probe despite a valid PHPSESSID), it
//! is often the current exit being rate-limited or fingerprinted rather
//! than a real failure. Asking the local TOR daemon for a new identity
//! over its control port (port 9051 by default) makes subsequent
//! connections draw a fresh circuit; combined with `IsolateDestAddr`
//! in torrc this is usually enough to clear the failure.
//!
//! Scope is deliberately tiny — `AUTHENTICATE` + `SIGNAL NEWNYM` over
//! a one-shot TCP connection. No `torut` dep, no hidden-service
//! plumbing, no event streaming.
//!
//! **Caveat for in-flight connections:** Chromium reuses sockets, so a
//! `NEWNYM` only affects *new* connections (in TOR terms, new circuits).
//! That's fine for our retry path — the next navigation opens a fresh
//! connection. We do not try to forcibly close existing streams.
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{anyhow, bail, Context};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio::time::timeout;
/// Default control-port (`tor --defaults-torrc` ships 9051).
const DEFAULT_CONTROL_PORT: u16 = 9051;
/// Connect timeout — generous enough for a slow compose start, short
/// enough that a misconfigured controller doesn't stall a crawl.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
/// Per-command read timeout. `SIGNAL NEWNYM` returns instantly on the
/// happy path; bound it so a half-broken control port can't hang us.
const READ_TIMEOUT: Duration = Duration::from_secs(5);
/// How the controller authenticates to the control port.
///
/// `Cookie` is preferred for compose deploys where the auth cookie file
/// is shared between the `tor` and `backend` containers via a named
/// volume. `Password` is the fallback when the cookie file isn't
/// reachable (different gid, no shared volume, etc.). `None` matches a
/// torrc with no `CookieAuthentication 1` and no `HashedControlPassword`
/// — useful for local experimentation, not for production.
///
/// `Debug` is implemented manually to redact the password (and the
/// cookie path, which is non-sensitive but uninteresting in logs).
/// Don't add `#[derive(Debug)]` — the controller is `?`-logged at
/// startup and a derive would expand the password into the trace.
#[derive(Clone)]
pub enum TorAuth {
None,
Password(String),
Cookie(PathBuf),
}
impl std::fmt::Debug for TorAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TorAuth::None => f.write_str("None"),
TorAuth::Password(_) => f.write_str("Password(<redacted>)"),
TorAuth::Cookie(_) => f.write_str("Cookie(<path>)"),
}
}
}
#[derive(Debug, Clone)]
pub struct TorController {
/// `host:port` string. Kept as a string (not a `SocketAddr`) so
/// docker-compose hostnames like `tor:9051` resolve at connect time.
addr: String,
auth: TorAuth,
}
impl TorController {
pub fn new(addr: impl Into<String>, auth: TorAuth) -> Self {
Self { addr: addr.into(), auth }
}
/// Build a controller from the env-config shape:
/// `url` (e.g. `tcp://tor:9051`, `127.0.0.1:9051`, or `tor`),
/// optional password, optional cookie path. Returns `Ok(None)` when
/// `url` is absent — that's the "TOR feature disabled" signal.
/// Cookie wins over password when both are set (rotates with TOR;
/// no secret to manage).
pub fn from_parts(
url: Option<&str>,
password: Option<&str>,
cookie_path: Option<&Path>,
) -> anyhow::Result<Option<Self>> {
let Some(url) = url else { return Ok(None) };
let addr = parse_control_url(url)?;
let auth = match (cookie_path, password) {
(Some(p), _) => TorAuth::Cookie(p.to_path_buf()),
(None, Some(p)) => TorAuth::Password(p.to_string()),
(None, None) => TorAuth::None,
};
Ok(Some(Self { addr, auth }))
}
/// Open the control port, `AUTHENTICATE`, `SIGNAL NEWNYM`, `QUIT`.
/// Each invocation is a fresh connection; the controller is cheap
/// to clone and stateless across calls.
pub async fn new_identity(&self) -> anyhow::Result<()> {
let stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&self.addr))
.await
.with_context(|| {
format!("timed out connecting to TOR control port {}", self.addr)
})?
.with_context(|| format!("connect to TOR control port {}", self.addr))?;
let (read, mut write) = stream.into_split();
let mut read = BufReader::new(read);
let auth_line = self.build_auth_line().await?;
write_line(&mut write, &auth_line).await?;
timeout(READ_TIMEOUT, expect_250(&mut read))
.await
.map_err(|_| anyhow!("TOR control AUTHENTICATE timed out"))?
.context("AUTHENTICATE")?;
write_line(&mut write, "SIGNAL NEWNYM").await?;
timeout(READ_TIMEOUT, expect_250(&mut read))
.await
.map_err(|_| anyhow!("TOR control SIGNAL NEWNYM timed out"))?
.context("SIGNAL NEWNYM")?;
// QUIT is courtesy; ignore errors — the daemon may close the
// socket before our QUIT lands and that's perfectly fine.
let _ = write_line(&mut write, "QUIT").await;
// Debug-level: a busy crawl can rotate circuits many times per
// minute, INFO is too chatty. Failures still log at WARN.
tracing::debug!(addr = %self.addr, "TOR NEWNYM signaled");
Ok(())
}
async fn build_auth_line(&self) -> anyhow::Result<String> {
match &self.auth {
TorAuth::None => Ok("AUTHENTICATE".to_string()),
TorAuth::Password(p) => Ok(format!("AUTHENTICATE \"{}\"", escape_quoted(p))),
TorAuth::Cookie(path) => {
let bytes = tokio::fs::read(path)
.await
.with_context(|| format!("read TOR cookie file {}", path.display()))?;
Ok(format!("AUTHENTICATE {}", hex_encode(&bytes)))
}
}
}
}
/// Parse `tcp://host:port`, `host:port`, or bare `host` into a
/// connect-time string. Default port is [`DEFAULT_CONTROL_PORT`].
fn parse_control_url(url: &str) -> anyhow::Result<String> {
let stripped = url.strip_prefix("tcp://").unwrap_or(url);
if stripped.is_empty() {
bail!("TOR control url is empty");
}
if stripped.contains(':') {
Ok(stripped.to_string())
} else {
Ok(format!("{stripped}:{DEFAULT_CONTROL_PORT}"))
}
}
fn escape_quoted(s: &str) -> String {
s.replace('\\', r"\\").replace('"', r#"\""#)
}
fn hex_encode(bytes: &[u8]) -> String {
let mut s = String::with_capacity(bytes.len() * 2);
for b in bytes {
s.push_str(&format!("{b:02x}"));
}
s
}
async fn write_line<W: tokio::io::AsyncWrite + Unpin>(
w: &mut W,
line: &str,
) -> anyhow::Result<()> {
w.write_all(line.as_bytes()).await?;
w.write_all(b"\r\n").await?;
w.flush().await?;
Ok(())
}
/// Drain a TOR control reply, accepting only status `250`. Handles
/// the protocol's three line forms: `XYZ ...` (single/end), `XYZ-...`
/// (continuation), `XYZ+...` (data block ended by a lone `.`). Our
/// commands only ever produce single-line `250 OK`, but we honor the
/// continuation forms so a future torrc that adds events / banners
/// doesn't confuse the parser.
async fn expect_250<R: AsyncBufReadExt + Unpin>(r: &mut R) -> anyhow::Result<()> {
loop {
let mut line = String::new();
let n = r.read_line(&mut line).await?;
if n == 0 {
bail!("TOR control port closed connection mid-reply");
}
let trimmed = line.trim_end_matches(['\r', '\n']);
if trimmed.len() < 4 {
bail!("malformed TOR control reply: {trimmed:?}");
}
let (code, rest) = trimmed.split_at(3);
if code != "250" {
bail!("TOR control replied {trimmed:?}");
}
let sep = rest.as_bytes()[0];
match sep {
b' ' => return Ok(()),
b'-' => continue,
b'+' => {
// Data block — read until a line consisting of only ".".
loop {
let mut data = String::new();
let n = r.read_line(&mut data).await?;
if n == 0 {
bail!("TOR control port closed mid-data-block");
}
if data.trim_end_matches(['\r', '\n']) == "." {
break;
}
}
}
_ => bail!("malformed TOR control reply separator: {trimmed:?}"),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Mutex};
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
/// Spawn a mock control port that responds to each \r\n-terminated
/// inbound line with the next entry from `replies`. Each reply has
/// its own `\r\n` appended. Records received lines into `recorder`.
/// After `replies.len()` exchanges the task drops the socket — this
/// matches the real TOR behavior for QUIT (close after acking).
async fn spawn_mock(
replies: Vec<&'static str>,
recorder: Arc<Mutex<Vec<String>>>,
) -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap().to_string();
tokio::spawn(async move {
let (sock, _) = listener.accept().await.unwrap();
let (r, mut w) = sock.into_split();
let mut r = BufReader::new(r);
for reply in replies {
let mut line = String::new();
let n = r.read_line(&mut line).await.unwrap_or(0);
if n == 0 {
return;
}
recorder
.lock()
.unwrap()
.push(line.trim_end_matches(['\r', '\n']).to_string());
w.write_all(reply.as_bytes()).await.unwrap();
w.write_all(b"\r\n").await.unwrap();
w.flush().await.unwrap();
}
});
addr
}
#[tokio::test]
async fn password_auth_then_newnym_writes_expected_sequence() {
let recorder = Arc::new(Mutex::new(Vec::new()));
// Two replies: AUTHENTICATE then SIGNAL NEWNYM. QUIT is
// fire-and-forget; the mock dropping the socket is the
// expected real-world behavior.
let addr =
spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await;
let controller = TorController::new(addr, TorAuth::Password("secret".into()));
controller.new_identity().await.expect("new_identity ok");
let recorded = recorder.lock().unwrap().clone();
assert_eq!(recorded.first().map(String::as_str), Some("AUTHENTICATE \"secret\""));
assert_eq!(recorded.get(1).map(String::as_str), Some("SIGNAL NEWNYM"));
}
#[tokio::test]
async fn cookie_auth_hex_encodes_file_bytes() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let cookie: Vec<u8> = (0u8..32).collect();
std::fs::write(tmp.path(), &cookie).unwrap();
let recorder = Arc::new(Mutex::new(Vec::new()));
let addr =
spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await;
let controller =
TorController::new(addr, TorAuth::Cookie(tmp.path().to_path_buf()));
controller.new_identity().await.expect("new_identity ok");
let recorded = recorder.lock().unwrap().clone();
let expected_hex: String = cookie.iter().map(|b| format!("{b:02x}")).collect();
assert_eq!(
recorded.first().map(String::as_str),
Some(format!("AUTHENTICATE {expected_hex}").as_str())
);
}
#[tokio::test]
async fn no_auth_sends_bare_authenticate() {
let recorder = Arc::new(Mutex::new(Vec::new()));
let addr =
spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await;
let controller = TorController::new(addr, TorAuth::None);
controller.new_identity().await.expect("new_identity ok");
let recorded = recorder.lock().unwrap().clone();
assert_eq!(recorded.first().map(String::as_str), Some("AUTHENTICATE"));
}
#[tokio::test]
async fn non_250_reply_returns_err_with_reply_text() {
let recorder = Arc::new(Mutex::new(Vec::new()));
let addr = spawn_mock(
vec!["515 Bad authentication"],
Arc::clone(&recorder),
)
.await;
let controller =
TorController::new(addr, TorAuth::Password("wrong".into()));
let err = controller.new_identity().await.expect_err("should fail");
let msg = format!("{err:#}");
assert!(msg.contains("515"), "expected 515 in error, got: {msg}");
}
#[tokio::test]
async fn closed_connection_mid_reply_is_an_error() {
// Listener accepts the AUTH line then drops without replying —
// this exercises the EOF-mid-reply path in expect_250 (rather
// than tor's own error replies which are covered elsewhere).
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap().to_string();
tokio::spawn(async move {
if let Ok((sock, _)) = listener.accept().await {
let (r, _w) = sock.into_split();
let mut r = BufReader::new(r);
let mut line = String::new();
let _ = r.read_line(&mut line).await; // read AUTH, ignore
// Drop _w (and the read half via scope exit) so the
// peer sees an immediate EOF on the next read.
}
});
let controller = TorController::new(addr, TorAuth::None);
let err = controller.new_identity().await.expect_err("should fail");
let msg = format!("{err:#}");
assert!(
msg.contains("closed connection"),
"expected EOF-mid-reply error, got: {msg}"
);
}
#[tokio::test]
async fn multi_line_250_continuation_is_accepted() {
let recorder = Arc::new(Mutex::new(Vec::new()));
// AUTHENTICATE reply uses the `250-...\r\n250 OK\r\n` form.
// Single reply string contains the whole multi-line response.
let addr = spawn_mock(
vec!["250-banner=foo\r\n250 OK", "250 OK"],
Arc::clone(&recorder),
)
.await;
let controller = TorController::new(addr, TorAuth::None);
controller.new_identity().await.expect("new_identity ok");
}
#[test]
fn from_parts_returns_none_when_url_unset() {
let c = TorController::from_parts(None, None, None).unwrap();
assert!(c.is_none());
}
#[test]
fn from_parts_prefers_cookie_over_password() {
let c = TorController::from_parts(
Some("tor:9051"),
Some("pw"),
Some(Path::new("/var/lib/tor/control_auth_cookie")),
)
.unwrap()
.expect("controller built");
assert!(matches!(c.auth, TorAuth::Cookie(_)));
}
#[test]
fn from_parts_falls_back_to_password_without_cookie() {
let c = TorController::from_parts(Some("tor:9051"), Some("pw"), None)
.unwrap()
.expect("controller built");
assert!(matches!(c.auth, TorAuth::Password(p) if p == "pw"));
}
#[test]
fn parse_control_url_accepts_tcp_scheme() {
assert_eq!(parse_control_url("tcp://127.0.0.1:9051").unwrap(), "127.0.0.1:9051");
}
#[test]
fn parse_control_url_defaults_port_when_omitted() {
assert_eq!(parse_control_url("tor").unwrap(), "tor:9051");
}
#[test]
fn parse_control_url_passes_through_host_port() {
assert_eq!(parse_control_url("tor:9999").unwrap(), "tor:9999");
}
#[test]
fn parse_control_url_rejects_empty() {
assert!(parse_control_url("").is_err());
assert!(parse_control_url("tcp://").is_err());
}
#[test]
fn escape_quoted_handles_quotes_and_backslashes() {
assert_eq!(escape_quoted(r#"a"b\c"#), r#"a\"b\\c"#);
}
#[test]
fn debug_format_redacts_password_and_cookie_path() {
// Regression: app.rs / bin/crawler.rs log the controller at
// startup via `tracing::info!(?t, ...)`. A derived Debug on
// TorAuth would expand TorAuth::Password(p) and leak the
// plaintext into logs.
let c = TorController::new("tor:9051", TorAuth::Password("super-secret".into()));
let dbg = format!("{c:?}");
assert!(!dbg.contains("super-secret"), "password leaked: {dbg}");
assert!(dbg.contains("<redacted>"), "expected <redacted>, got: {dbg}");
let c = TorController::new(
"tor:9051",
TorAuth::Cookie("/var/lib/tor/control_auth_cookie".into()),
);
let dbg = format!("{c:?}");
assert!(!dbg.contains("control_auth_cookie"), "cookie path leaked: {dbg}");
}
#[test]
fn hex_encode_zero_pads_low_bytes() {
assert_eq!(hex_encode(&[0x00, 0x0f, 0xff]), "000fff");
}
}

View File

@@ -19,11 +19,48 @@ services:
timeout: 5s
retries: 10
tor:
# SOCKS5 proxy for the crawler, plus a control port so the backend
# can signal NEWNYM on bad pages. See tor/torrc for the daemon
# config; both ports are only `expose`d (compose-internal), never
# bound on the host.
#
# We bypass dockurr/tor's stock entrypoint because it binds the
# control port to localhost (unreachable from the backend
# container) and skips its own HashedControlPassword injection
# when the user's torrc declares a ControlPort. Our wrapper
# (tor/entrypoint.sh) generates the hash from $PASSWORD and execs
# tor with our torrc. Backend authenticates with the same plain
# string via CRAWLER_TOR_CONTROL_PASSWORD.
image: dockurr/tor:latest
entrypoint: ["/bin/sh", "/usr/local/bin/mangalord-entrypoint.sh"]
environment:
PASSWORD: ${TOR_CONTROL_PASSWORD:?TOR_CONTROL_PASSWORD must be set in .env}
volumes:
- ./tor/torrc:/etc/tor/torrc:ro
- ./tor/entrypoint.sh:/usr/local/bin/mangalord-entrypoint.sh:ro
expose:
- "9050"
- "9051"
# Wait for both control + SOCKS ports to listen before downstream
# services start. dockurr/tor's main process spawns before tor
# itself is bound, so `service_started` alone races the first
# NEWNYM call.
healthcheck:
test: ["CMD-SHELL", "nc -z 127.0.0.1 9050 && nc -z 127.0.0.1 9051"]
interval: 5s
timeout: 5s
retries: 20
start_period: 30s
restart: unless-stopped
backend:
build: ./backend
depends_on:
postgres:
condition: service_healthy
tor:
condition: service_healthy
environment:
DATABASE_URL: postgres://${POSTGRES_USER:-mangalord}:${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set in .env}@postgres:5432/${POSTGRES_DB:-mangalord}
BIND_ADDRESS: 0.0.0.0:8080
@@ -44,6 +81,16 @@ services:
# arm64 deployments. Pair with `--build-arg INSTALL_CHROMIUM=true`
# so the image actually contains the binary.
CRAWLER_CHROMIUM_BINARY: ${CRAWLER_CHROMIUM_BINARY:-}
# TOR proxy + NEWNYM recircuit (see .env.example for details).
# Defaults assume the bundled `tor` service above; override
# CRAWLER_PROXY= and CRAWLER_TOR_CONTROL_URL= (both empty) in
# .env to disable. CRAWLER_TOR_CONTROL_PASSWORD MUST match the
# tor service's PASSWORD (both wired to the same TOR_CONTROL_PASSWORD
# .env var below).
CRAWLER_PROXY: ${CRAWLER_PROXY-socks5h://tor:9050}
CRAWLER_TOR_CONTROL_URL: ${CRAWLER_TOR_CONTROL_URL-tcp://tor:9051}
CRAWLER_TOR_CONTROL_PASSWORD: ${TOR_CONTROL_PASSWORD:?TOR_CONTROL_PASSWORD must be set in .env}
CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS: ${CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS:-3}
volumes:
- storage-data:/var/lib/mangalord/storage
# No host port mapping in the default setup — the frontend proxies

View File

@@ -1,6 +1,6 @@
{
"name": "mangalord-frontend",
"version": "0.45.1",
"version": "0.46.0",
"private": true,
"type": "module",
"scripts": {

40
tor/entrypoint.sh Executable file
View File

@@ -0,0 +1,40 @@
#!/bin/sh
# Mangalord wrapper around dockurr/tor's tor binary.
#
# We bypass the image's stock entrypoint for two reasons:
# 1. It generates a `ControlPort 9051` line that binds to localhost
# only (tor's default), but our backend lives in a separate
# container and needs to reach 0.0.0.0:9051.
# 2. It then *skips* writing HashedControlPassword whenever the
# user's torrc declares a ControlPort, so we can't both bind to
# 0.0.0.0 and benefit from its auto-hashing — it's one or the
# other. Doing the hashing ourselves is simpler than threading
# around its logic.
#
# This wrapper hashes $PASSWORD with `tor --hash-password`, appends a
# `HashedControlPassword` line to a writable copy of /etc/tor/torrc,
# then execs tor. Container runs as root (image default); tor binds
# 9050/9051 which don't require root and is fine inside a single-
# purpose container.
set -eu
if [ -z "${PASSWORD:-}" ]; then
echo "ERROR: PASSWORD env must be set (the plain string the backend will" >&2
echo " send as CRAWLER_TOR_CONTROL_PASSWORD)" >&2
exit 1
fi
# `tor --hash-password` prints the hash on the last line of stdout
# (preceded by initialization noise).
HASH=$(tor --hash-password "$PASSWORD" 2>/dev/null | tail -n1)
if [ -z "$HASH" ]; then
echo "ERROR: 'tor --hash-password' produced no output" >&2
exit 1
fi
# /etc/tor/torrc is bind-mounted read-only, so copy + append.
cp /etc/tor/torrc /tmp/torrc
printf '\n# Injected by mangalord-entrypoint.sh from $PASSWORD env.\nHashedControlPassword %s\n' "$HASH" >> /tmp/torrc
exec tor -f /tmp/torrc

38
tor/torrc Normal file
View File

@@ -0,0 +1,38 @@
# torrc for the Mangalord crawler.
#
# Mounted into the dockurr/tor container at /etc/tor/torrc. The
# crawler talks to this daemon over the internal compose network only:
# `expose:` on the tor service surfaces 9050/9051 to sibling
# containers, never to the host.
# SOCKS5 proxy that reqwest and Chromium use. IsolateDestAddr +
# IsolateDestPort means each new (destination IP, port) draws a fresh
# circuit — so a SIGNAL NEWNYM picks up promptly on the next
# navigation instead of having to wait for an existing dirty circuit
# to age out.
SOCKSPort 0.0.0.0:9050 IsolateDestAddr IsolateDestPort
# Control port for SIGNAL NEWNYM. We rely on the dockurr/tor
# entrypoint to inject `HashedControlPassword <hash>` from its
# PASSWORD env var (see docker-compose.yml `tor.environment.PASSWORD`)
# via a higher-priority --defaults-torrc. We just need to declare the
# port itself here.
ControlPort 0.0.0.0:9051
# Keep circuits dirty for a while so a single chapter (which serial-
# fetches all its images through the same SOCKS endpoint) finishes on
# one circuit rather than mid-circuit-rotating in a way that looks like
# anti-bot evasion to the target. NEWNYM still forces a fresh circuit
# immediately when we want one — this is just the idle-rotation knob.
MaxCircuitDirtiness 600
# Drop privileges to the image's `tor` user after binding ports.
# Required because /var/lib/tor (the image's DataDirectory volume)
# is owned by tor:tor and tor refuses to use a data dir it doesn't
# own. Our entrypoint runs as root only so it can call
# `tor --hash-password` and write /tmp/torrc.
User tor
# Data + logs.
DataDirectory /var/lib/tor
Log notice stdout