Compare commits
2 Commits
feat/tor-p
...
feat/crawl
| Author | SHA1 | Date | |
|---|---|---|---|
| 2f9037e210 | |||
|
|
0b5f5d1692 |
37
.env.example
37
.env.example
@@ -83,43 +83,6 @@ CRAWLER_MAX_IMAGE_BYTES=33554432
|
|||||||
# the image actually contains the binary.
|
# the image actually contains the binary.
|
||||||
CRAWLER_CHROMIUM_BINARY=
|
CRAWLER_CHROMIUM_BINARY=
|
||||||
|
|
||||||
# ----- Crawler TOR proxy + recircuit -----
|
|
||||||
# The compose stack ships a `tor` service (dockurr/tor) and defaults
|
|
||||||
# CRAWLER_PROXY to it, so by default all crawler traffic exits via the
|
|
||||||
# TOR network. To opt out, set CRAWLER_PROXY= (empty) AND
|
|
||||||
# CRAWLER_TOR_CONTROL_URL= (empty) below — the tor service can stay
|
|
||||||
# running, it just won't be used.
|
|
||||||
#
|
|
||||||
# Going through TOR adds latency to every fetch; image downloads in
|
|
||||||
# particular slow noticeably. The win is on sites that rate-limit or
|
|
||||||
# fingerprint by exit IP — NEWNYM recirculation makes a fresh exit
|
|
||||||
# cheap to reach for.
|
|
||||||
#
|
|
||||||
# CRAWLER_PROXY: SOCKS5(h) URL. Use `socks5h://` (not `socks5://`) so
|
|
||||||
# DNS resolution also goes through TOR, avoiding leaks via the host's
|
|
||||||
# resolver. Leave unset to talk to the upstream directly.
|
|
||||||
CRAWLER_PROXY=socks5h://tor:9050
|
|
||||||
# Control-port URL for SIGNAL NEWNYM ("get a fresh circuit"). Triggered
|
|
||||||
# automatically on bad pages (broken-page body, missing #logo) and on
|
|
||||||
# the Unauthenticated session probe outcome. Leave unset to disable
|
|
||||||
# the recircuit feature (the SOCKS proxy still works).
|
|
||||||
CRAWLER_TOR_CONTROL_URL=tcp://tor:9051
|
|
||||||
# Max NEWNYM-and-retry cycles per recircuit-eligible failure. Default 3.
|
|
||||||
CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS=3
|
|
||||||
|
|
||||||
# ----- TOR control-port password -----
|
|
||||||
# Shared between the bundled dockurr/tor service (which hashes it into
|
|
||||||
# its HashedControlPassword) and the backend's
|
|
||||||
# CRAWLER_TOR_CONTROL_PASSWORD. REQUIRED — docker-compose.yml fails
|
|
||||||
# fast if absent. Generate a strong random string; rotate by setting
|
|
||||||
# a new value and restarting both `tor` and `backend`.
|
|
||||||
#
|
|
||||||
# Operators running their own non-dockurr tor daemon with cookie-file
|
|
||||||
# auth can ignore this var and instead set
|
|
||||||
# CRAWLER_TOR_CONTROL_COOKIE_PATH on the backend — the TorController
|
|
||||||
# prefers cookie when both are present.
|
|
||||||
TOR_CONTROL_PASSWORD=change-me-to-a-strong-random-string
|
|
||||||
|
|
||||||
# ----- Frontend -----
|
# ----- Frontend -----
|
||||||
# The frontend container runs SvelteKit's Node adapter on :3000 and
|
# The frontend container runs SvelteKit's Node adapter on :3000 and
|
||||||
# proxies /api/* to BACKEND_URL via src/hooks.server.ts. In compose the
|
# proxies /api/* to BACKEND_URL via src/hooks.server.ts. In compose the
|
||||||
|
|||||||
@@ -10,6 +10,8 @@ on:
|
|||||||
jobs:
|
jobs:
|
||||||
test-backend:
|
test-backend:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
container:
|
||||||
|
image: rust:1-slim
|
||||||
services:
|
services:
|
||||||
postgres:
|
postgres:
|
||||||
image: postgres:16-alpine
|
image: postgres:16-alpine
|
||||||
@@ -26,18 +28,10 @@ jobs:
|
|||||||
DATABASE_URL: postgres://mangalord:mangalord@postgres:5432/mangalord
|
DATABASE_URL: postgres://mangalord:mangalord@postgres:5432/mangalord
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
# ubuntu-latest has node (so JS actions like checkout/cache run) but no
|
- name: Install build deps
|
||||||
# Rust. We intentionally avoid `container: rust:1-slim` because act_runner
|
|
||||||
# runs JS actions with node *inside* the job container, and the slim Rust
|
|
||||||
# image ships no node (checkout would fail with exit 127).
|
|
||||||
- name: Install Rust + build deps
|
|
||||||
run: |
|
run: |
|
||||||
set -eu
|
apt-get update
|
||||||
SUDO=""; [ "$(id -u)" = "0" ] || SUDO="sudo"
|
apt-get install -y --no-install-recommends pkg-config libssl-dev ca-certificates
|
||||||
$SUDO apt-get update
|
|
||||||
$SUDO apt-get install -y --no-install-recommends pkg-config libssl-dev ca-certificates curl
|
|
||||||
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --profile minimal --default-toolchain stable
|
|
||||||
echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
|
|
||||||
- name: Cache cargo registry and target
|
- name: Cache cargo registry and target
|
||||||
uses: actions/cache@v4
|
uses: actions/cache@v4
|
||||||
with:
|
with:
|
||||||
@@ -72,17 +66,9 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
needs: [test-backend, test-frontend]
|
needs: [test-backend, test-frontend]
|
||||||
# PRs only run the test jobs; build + deploy are reserved for
|
# PRs only run the test jobs; build + deploy are reserved for
|
||||||
# post-merge pushes to main.
|
# post-merge pushes to main. Without this gate every PR would push
|
||||||
|
# a tagged image to the registry and SSH-deploy to prod.
|
||||||
if: github.event_name != 'pull_request'
|
if: github.event_name != 'pull_request'
|
||||||
# Build on the host docker daemon directly (docker-outside-of-docker):
|
|
||||||
# the runner shares the deploy host's daemon, so a plain `docker build`
|
|
||||||
# reuses the host's layer cache and avoids buildx's docker-container
|
|
||||||
# driver + the gha cache exporter — neither works against this single-host
|
|
||||||
# act_runner, and there is no in-job daemon socket unless we mount it.
|
|
||||||
container:
|
|
||||||
image: docker.gitea.com/runner-images:ubuntu-latest
|
|
||||||
volumes:
|
|
||||||
- /var/run/docker.sock:/var/run/docker.sock
|
|
||||||
outputs:
|
outputs:
|
||||||
image_tag: ${{ steps.meta.outputs.image_tag }}
|
image_tag: ${{ steps.meta.outputs.image_tag }}
|
||||||
version: ${{ steps.meta.outputs.version }}
|
version: ${{ steps.meta.outputs.version }}
|
||||||
@@ -101,32 +87,48 @@ jobs:
|
|||||||
echo "image_tag=${GITHUB_SHA}" >> "$GITHUB_OUTPUT"
|
echo "image_tag=${GITHUB_SHA}" >> "$GITHUB_OUTPUT"
|
||||||
echo "version=${version}" >> "$GITHUB_OUTPUT"
|
echo "version=${version}" >> "$GITHUB_OUTPUT"
|
||||||
|
|
||||||
- name: Build & push backend + frontend
|
- uses: docker/setup-buildx-action@v3
|
||||||
env:
|
|
||||||
REGISTRY_URL: ${{ secrets.REGISTRY_URL }}
|
- name: docker login
|
||||||
REGISTRY_USERNAME: ${{ secrets.REGISTRY_USERNAME }}
|
uses: docker/login-action@v3
|
||||||
REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }}
|
with:
|
||||||
IMAGE_TAG: ${{ steps.meta.outputs.image_tag }}
|
registry: ${{ secrets.REGISTRY_URL }}
|
||||||
VERSION: ${{ steps.meta.outputs.version }}
|
username: ${{ secrets.REGISTRY_USERNAME }}
|
||||||
run: |
|
password: ${{ secrets.REGISTRY_PASSWORD }}
|
||||||
set -eu
|
|
||||||
echo "$REGISTRY_PASSWORD" | docker login "$REGISTRY_URL" -u "$REGISTRY_USERNAME" --password-stdin
|
- name: Build & push backend
|
||||||
for svc in backend frontend; do
|
uses: docker/build-push-action@v5
|
||||||
img="$REGISTRY_URL/mangalord-$svc"
|
with:
|
||||||
docker build -t "$img:$IMAGE_TAG" -t "$img:latest" -t "$img:$VERSION" "./$svc"
|
context: ./backend
|
||||||
for tag in "$IMAGE_TAG" latest "$VERSION"; do docker push "$img:$tag"; done
|
push: true
|
||||||
done
|
tags: |
|
||||||
docker logout "$REGISTRY_URL"
|
${{ secrets.REGISTRY_URL }}/mangalord-backend:latest
|
||||||
|
${{ secrets.REGISTRY_URL }}/mangalord-backend:${{ steps.meta.outputs.image_tag }}
|
||||||
|
${{ secrets.REGISTRY_URL }}/mangalord-backend:${{ steps.meta.outputs.version }}
|
||||||
|
cache-from: type=gha,scope=backend
|
||||||
|
cache-to: type=gha,mode=max,scope=backend
|
||||||
|
|
||||||
|
- name: Build & push frontend
|
||||||
|
uses: docker/build-push-action@v5
|
||||||
|
with:
|
||||||
|
context: ./frontend
|
||||||
|
push: true
|
||||||
|
tags: |
|
||||||
|
${{ secrets.REGISTRY_URL }}/mangalord-frontend:latest
|
||||||
|
${{ secrets.REGISTRY_URL }}/mangalord-frontend:${{ steps.meta.outputs.image_tag }}
|
||||||
|
${{ secrets.REGISTRY_URL }}/mangalord-frontend:${{ steps.meta.outputs.version }}
|
||||||
|
cache-from: type=gha,scope=frontend
|
||||||
|
cache-to: type=gha,mode=max,scope=frontend
|
||||||
|
|
||||||
deploy:
|
deploy:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
needs: build-and-push
|
needs: build-and-push
|
||||||
if: github.event_name != 'pull_request'
|
if: github.event_name != 'pull_request'
|
||||||
# Single-host deploy: the runner lives on the same box as the stack, so we
|
# Single-host deploy: the runner lives on the same box as the stack, so we
|
||||||
# drive the host docker daemon directly (the job mounts the host docker
|
# drive the host docker daemon directly (act_runner shares its socket via
|
||||||
# socket) instead of SSHing out. The compose dir is bind-mounted at its
|
# `docker_host: "-"`) instead of SSHing out. The compose dir is bind-mounted
|
||||||
# REAL host path so compose's relative bind-mounts (./mangalord/...,
|
# at its REAL host path so compose's relative bind-mounts (./mangalord/...,
|
||||||
# ./Caddyfile) resolve; both paths must be in the runner's
|
# ./Caddyfile) resolve; this requires `/mnt/ssd/docker-data` in the runner's
|
||||||
# container.valid_volumes. The central compose references the images as
|
# container.valid_volumes. The central compose references the images as
|
||||||
# registry.mc02.dev/mangalord-*:${MANGALORD_TAG:-latest}, so we only pull
|
# registry.mc02.dev/mangalord-*:${MANGALORD_TAG:-latest}, so we only pull
|
||||||
# and recreate the two mangalord services at the freshly built SHA.
|
# and recreate the two mangalord services at the freshly built SHA.
|
||||||
@@ -134,7 +136,6 @@ jobs:
|
|||||||
image: docker:cli
|
image: docker:cli
|
||||||
volumes:
|
volumes:
|
||||||
- /mnt/ssd/docker-data:/mnt/ssd/docker-data
|
- /mnt/ssd/docker-data:/mnt/ssd/docker-data
|
||||||
- /var/run/docker.sock:/var/run/docker.sock
|
|
||||||
steps:
|
steps:
|
||||||
- name: Deploy to the local stack
|
- name: Deploy to the local stack
|
||||||
working-directory: /mnt/ssd/docker-data
|
working-directory: /mnt/ssd/docker-data
|
||||||
|
|||||||
2
backend/Cargo.lock
generated
2
backend/Cargo.lock
generated
@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mangalord"
|
name = "mangalord"
|
||||||
version = "0.46.0"
|
version = "0.45.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"argon2",
|
"argon2",
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "mangalord"
|
name = "mangalord"
|
||||||
version = "0.46.0"
|
version = "0.45.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
default-run = "mangalord"
|
default-run = "mangalord"
|
||||||
|
|
||||||
|
|||||||
@@ -123,18 +123,6 @@ async fn spawn_crawler_daemon(
|
|||||||
}
|
}
|
||||||
let rate = Arc::new(rate);
|
let rate = Arc::new(rate);
|
||||||
|
|
||||||
let tor = crate::crawler::tor::TorController::from_parts(
|
|
||||||
cfg.tor_control_url.as_deref(),
|
|
||||||
cfg.tor_control_password.as_deref(),
|
|
||||||
cfg.tor_control_cookie_path.as_deref(),
|
|
||||||
)
|
|
||||||
.context("build TorController from CRAWLER_TOR_CONTROL_* env")?
|
|
||||||
.map(Arc::new);
|
|
||||||
if let Some(t) = &tor {
|
|
||||||
tracing::info!(?t, "TOR control configured; transient pages will trigger NEWNYM");
|
|
||||||
}
|
|
||||||
let tor_recircuit_max = cfg.tor_recircuit_max_attempts;
|
|
||||||
|
|
||||||
// Browser manager. on_launch re-injects PHPSESSID on every fresh
|
// Browser manager. on_launch re-injects PHPSESSID on every fresh
|
||||||
// chromium spawn so an idle teardown followed by re-launch stays
|
// chromium spawn so an idle teardown followed by re-launch stays
|
||||||
// authenticated without operator action.
|
// authenticated without operator action.
|
||||||
@@ -147,24 +135,17 @@ async fn spawn_crawler_daemon(
|
|||||||
let sid = sid.clone();
|
let sid = sid.clone();
|
||||||
let domain = domain.clone();
|
let domain = domain.clone();
|
||||||
let start_url = start_url.clone();
|
let start_url = start_url.clone();
|
||||||
let tor_for_launch = tor.as_ref().map(Arc::clone);
|
|
||||||
let on_launch: browser_manager::OnLaunch = Arc::new(move |browser| {
|
let on_launch: browser_manager::OnLaunch = Arc::new(move |browser| {
|
||||||
let sid = sid.clone();
|
let sid = sid.clone();
|
||||||
let domain = domain.clone();
|
let domain = domain.clone();
|
||||||
let start_url = start_url.clone();
|
let start_url = start_url.clone();
|
||||||
let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone);
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
session::inject_phpsessid(&browser, &sid, &domain)
|
session::inject_phpsessid(&browser, &sid, &domain)
|
||||||
.await
|
.await
|
||||||
.context("on_launch: inject_phpsessid")?;
|
.context("on_launch: inject_phpsessid")?;
|
||||||
session::verify_session_with_recircuit(
|
session::verify_session(&browser, &start_url)
|
||||||
&browser,
|
.await
|
||||||
&start_url,
|
.context("on_launch: verify_session")?;
|
||||||
tor_for_launch.as_deref(),
|
|
||||||
tor_recircuit_max,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.context("on_launch: verify_session")?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
@@ -186,7 +167,6 @@ async fn spawn_crawler_daemon(
|
|||||||
start_url: url.clone(),
|
start_url: url.clone(),
|
||||||
download_allowlist: cfg.download_allowlist.clone(),
|
download_allowlist: cfg.download_allowlist.clone(),
|
||||||
max_image_bytes: cfg.max_image_bytes,
|
max_image_bytes: cfg.max_image_bytes,
|
||||||
tor: tor.as_ref().map(Arc::clone),
|
|
||||||
});
|
});
|
||||||
m
|
m
|
||||||
});
|
});
|
||||||
@@ -199,7 +179,6 @@ async fn spawn_crawler_daemon(
|
|||||||
rate: Arc::clone(&rate),
|
rate: Arc::clone(&rate),
|
||||||
download_allowlist: cfg.download_allowlist.clone(),
|
download_allowlist: cfg.download_allowlist.clone(),
|
||||||
max_image_bytes: cfg.max_image_bytes,
|
max_image_bytes: cfg.max_image_bytes,
|
||||||
tor: tor.as_ref().map(Arc::clone),
|
|
||||||
});
|
});
|
||||||
|
|
||||||
// Shared cancellation: daemon shutdown cancels the BrowserManager's
|
// Shared cancellation: daemon shutdown cancels the BrowserManager's
|
||||||
@@ -253,7 +232,6 @@ struct RealMetadataPass {
|
|||||||
start_url: String,
|
start_url: String,
|
||||||
download_allowlist: DownloadAllowlist,
|
download_allowlist: DownloadAllowlist,
|
||||||
max_image_bytes: usize,
|
max_image_bytes: usize,
|
||||||
tor: Option<Arc<crate::crawler::tor::TorController>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -270,7 +248,6 @@ impl MetadataPass for RealMetadataPass {
|
|||||||
false,
|
false,
|
||||||
&self.download_allowlist,
|
&self.download_allowlist,
|
||||||
self.max_image_bytes,
|
self.max_image_bytes,
|
||||||
self.tor.as_deref(),
|
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
if let Err(e) = &result {
|
if let Err(e) = &result {
|
||||||
@@ -290,7 +267,6 @@ struct RealChapterDispatcher {
|
|||||||
rate: Arc<HostRateLimiters>,
|
rate: Arc<HostRateLimiters>,
|
||||||
download_allowlist: DownloadAllowlist,
|
download_allowlist: DownloadAllowlist,
|
||||||
max_image_bytes: usize,
|
max_image_bytes: usize,
|
||||||
tor: Option<Arc<crate::crawler::tor::TorController>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -322,7 +298,6 @@ impl ChapterDispatcher for RealChapterDispatcher {
|
|||||||
false,
|
false,
|
||||||
&self.download_allowlist,
|
&self.download_allowlist,
|
||||||
self.max_image_bytes,
|
self.max_image_bytes,
|
||||||
self.tor.as_deref(),
|
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
drop(lease);
|
drop(lease);
|
||||||
|
|||||||
@@ -78,21 +78,6 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let proxy_url = std::env::var("CRAWLER_PROXY")
|
let proxy_url = std::env::var("CRAWLER_PROXY")
|
||||||
.ok()
|
.ok()
|
||||||
.filter(|s| !s.trim().is_empty());
|
.filter(|s| !s.trim().is_empty());
|
||||||
let tor_control_url = std::env::var("CRAWLER_TOR_CONTROL_URL")
|
|
||||||
.ok()
|
|
||||||
.filter(|s| !s.trim().is_empty());
|
|
||||||
let tor_control_password = std::env::var("CRAWLER_TOR_CONTROL_PASSWORD")
|
|
||||||
.ok()
|
|
||||||
.filter(|s| !s.trim().is_empty());
|
|
||||||
let tor_control_cookie_path = std::env::var("CRAWLER_TOR_CONTROL_COOKIE_PATH")
|
|
||||||
.ok()
|
|
||||||
.filter(|s| !s.trim().is_empty())
|
|
||||||
.map(std::path::PathBuf::from);
|
|
||||||
let tor_recircuit_max_attempts: u32 = std::env::var("CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS")
|
|
||||||
.ok()
|
|
||||||
.and_then(|s| s.parse().ok())
|
|
||||||
.unwrap_or(3)
|
|
||||||
.max(1);
|
|
||||||
let keep_browser_open = env_bool("CRAWLER_KEEP_BROWSER_OPEN", false);
|
let keep_browser_open = env_bool("CRAWLER_KEEP_BROWSER_OPEN", false);
|
||||||
|
|
||||||
let db = PgPoolOptions::new()
|
let db = PgPoolOptions::new()
|
||||||
@@ -159,17 +144,6 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
"starting crawler"
|
"starting crawler"
|
||||||
);
|
);
|
||||||
|
|
||||||
let tor = mangalord::crawler::tor::TorController::from_parts(
|
|
||||||
tor_control_url.as_deref(),
|
|
||||||
tor_control_password.as_deref(),
|
|
||||||
tor_control_cookie_path.as_deref(),
|
|
||||||
)
|
|
||||||
.context("build TorController from CRAWLER_TOR_CONTROL_* env")?
|
|
||||||
.map(Arc::new);
|
|
||||||
if let Some(t) = &tor {
|
|
||||||
tracing::info!(?t, "TOR control configured");
|
|
||||||
}
|
|
||||||
|
|
||||||
// BrowserManager with idle_timeout = ZERO so the CLI keeps Chromium
|
// BrowserManager with idle_timeout = ZERO so the CLI keeps Chromium
|
||||||
// alive for the entire run — same lifecycle as the old direct
|
// alive for the entire run — same lifecycle as the old direct
|
||||||
// `browser::launch()` flow. on_launch re-injects PHPSESSID + runs the
|
// `browser::launch()` flow. on_launch re-injects PHPSESSID + runs the
|
||||||
@@ -179,24 +153,17 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let sid = sid.clone();
|
let sid = sid.clone();
|
||||||
let domain = domain.clone();
|
let domain = domain.clone();
|
||||||
let start_url_clone = start_url.clone();
|
let start_url_clone = start_url.clone();
|
||||||
let tor_for_launch = tor.as_ref().map(Arc::clone);
|
|
||||||
Arc::new(move |browser| {
|
Arc::new(move |browser| {
|
||||||
let sid = sid.clone();
|
let sid = sid.clone();
|
||||||
let domain = domain.clone();
|
let domain = domain.clone();
|
||||||
let start_url = start_url_clone.clone();
|
let start_url = start_url_clone.clone();
|
||||||
let tor_for_launch = tor_for_launch.as_ref().map(Arc::clone);
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
session::inject_phpsessid(&browser, &sid, &domain)
|
session::inject_phpsessid(&browser, &sid, &domain)
|
||||||
.await
|
.await
|
||||||
.context("inject_phpsessid")?;
|
.context("inject_phpsessid")?;
|
||||||
session::verify_session_with_recircuit(
|
session::verify_session(&browser, &start_url)
|
||||||
&browser,
|
.await
|
||||||
&start_url,
|
.context("verify_session")?;
|
||||||
tor_for_launch.as_deref(),
|
|
||||||
tor_recircuit_max_attempts,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.context("verify_session")?;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@@ -220,7 +187,6 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
skip_chapter_content || !session_ready,
|
skip_chapter_content || !session_ready,
|
||||||
chapter_workers,
|
chapter_workers,
|
||||||
force_refetch_chapters,
|
force_refetch_chapters,
|
||||||
tor.clone(),
|
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -250,7 +216,6 @@ async fn run(
|
|||||||
skip_chapter_content: bool,
|
skip_chapter_content: bool,
|
||||||
chapter_workers: usize,
|
chapter_workers: usize,
|
||||||
force_refetch_chapters: bool,
|
force_refetch_chapters: bool,
|
||||||
tor: Option<Arc<mangalord::crawler::tor::TorController>>,
|
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut rate = HostRateLimiters::new(Duration::from_millis(rate_ms));
|
let mut rate = HostRateLimiters::new(Duration::from_millis(rate_ms));
|
||||||
if let Some(host) = cdn_host {
|
if let Some(host) = cdn_host {
|
||||||
@@ -302,7 +267,6 @@ async fn run(
|
|||||||
skip_chapters,
|
skip_chapters,
|
||||||
allowlist.as_ref(),
|
allowlist.as_ref(),
|
||||||
max_image_bytes,
|
max_image_bytes,
|
||||||
tor.as_deref(),
|
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
tracing::info!(?stats, "metadata pass complete");
|
tracing::info!(?stats, "metadata pass complete");
|
||||||
@@ -319,7 +283,6 @@ async fn run(
|
|||||||
force_refetch_chapters,
|
force_refetch_chapters,
|
||||||
Arc::clone(&allowlist),
|
Arc::clone(&allowlist),
|
||||||
max_image_bytes,
|
max_image_bytes,
|
||||||
tor.clone(),
|
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
@@ -345,7 +308,6 @@ async fn sync_bookmarked_chapter_content(
|
|||||||
force_refetch: bool,
|
force_refetch: bool,
|
||||||
allowlist: Arc<mangalord::crawler::safety::DownloadAllowlist>,
|
allowlist: Arc<mangalord::crawler::safety::DownloadAllowlist>,
|
||||||
max_image_bytes: usize,
|
max_image_bytes: usize,
|
||||||
tor: Option<Arc<mangalord::crawler::tor::TorController>>,
|
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let pending: Vec<(Uuid, Uuid, String)> = sqlx::query_as(
|
let pending: Vec<(Uuid, Uuid, String)> = sqlx::query_as(
|
||||||
r#"
|
r#"
|
||||||
@@ -383,7 +345,6 @@ async fn sync_bookmarked_chapter_content(
|
|||||||
let rate = Arc::clone(&rate);
|
let rate = Arc::clone(&rate);
|
||||||
let manager = Arc::clone(&manager);
|
let manager = Arc::clone(&manager);
|
||||||
let allowlist = Arc::clone(&allowlist);
|
let allowlist = Arc::clone(&allowlist);
|
||||||
let tor = tor.clone();
|
|
||||||
let stats = &stats;
|
let stats = &stats;
|
||||||
async move {
|
async move {
|
||||||
if session_expired.load(std::sync::atomic::Ordering::Relaxed) {
|
if session_expired.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
@@ -410,7 +371,6 @@ async fn sync_bookmarked_chapter_content(
|
|||||||
force_refetch,
|
force_refetch,
|
||||||
allowlist.as_ref(),
|
allowlist.as_ref(),
|
||||||
max_image_bytes,
|
max_image_bytes,
|
||||||
tor.as_deref(),
|
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
drop(lease);
|
drop(lease);
|
||||||
|
|||||||
@@ -97,20 +97,6 @@ pub struct CrawlerConfig {
|
|||||||
pub cookie_domain: Option<String>,
|
pub cookie_domain: Option<String>,
|
||||||
pub user_agent: Option<String>,
|
pub user_agent: Option<String>,
|
||||||
pub proxy: Option<String>,
|
pub proxy: Option<String>,
|
||||||
/// `tcp://host:port`, `host:port`, or bare `host` (default port
|
|
||||||
/// 9051). When `None`, TOR-recircuit-on-transient is disabled and
|
|
||||||
/// the crawler behaves identically to pre-TOR releases.
|
|
||||||
pub tor_control_url: Option<String>,
|
|
||||||
/// HashedControlPassword auth. Used only when
|
|
||||||
/// `tor_control_cookie_path` is `None`.
|
|
||||||
pub tor_control_password: Option<String>,
|
|
||||||
/// Cookie-file auth path (e.g.
|
|
||||||
/// `/var/lib/tor/control_auth_cookie`). Takes precedence over
|
|
||||||
/// password when both are set.
|
|
||||||
pub tor_control_cookie_path: Option<PathBuf>,
|
|
||||||
/// Maximum NEWNYM-and-retry cycles per recircuit-eligible failure.
|
|
||||||
/// Defaults to 3.
|
|
||||||
pub tor_recircuit_max_attempts: u32,
|
|
||||||
pub browser: LaunchOptions,
|
pub browser: LaunchOptions,
|
||||||
/// Hosts the crawler is allowed to download images / covers from.
|
/// Hosts the crawler is allowed to download images / covers from.
|
||||||
/// Always seeded with the host of `start_url` and (when set) the
|
/// Always seeded with the host of `start_url` and (when set) the
|
||||||
@@ -138,10 +124,6 @@ impl Default for CrawlerConfig {
|
|||||||
cookie_domain: None,
|
cookie_domain: None,
|
||||||
user_agent: None,
|
user_agent: None,
|
||||||
proxy: None,
|
proxy: None,
|
||||||
tor_control_url: None,
|
|
||||||
tor_control_password: None,
|
|
||||||
tor_control_cookie_path: None,
|
|
||||||
tor_recircuit_max_attempts: 3,
|
|
||||||
browser: LaunchOptions::headless(),
|
browser: LaunchOptions::headless(),
|
||||||
download_allowlist: DownloadAllowlist::new(),
|
download_allowlist: DownloadAllowlist::new(),
|
||||||
max_image_bytes: DEFAULT_MAX_IMAGE_BYTES,
|
max_image_bytes: DEFAULT_MAX_IMAGE_BYTES,
|
||||||
@@ -252,18 +234,6 @@ impl CrawlerConfig {
|
|||||||
proxy: std::env::var("CRAWLER_PROXY")
|
proxy: std::env::var("CRAWLER_PROXY")
|
||||||
.ok()
|
.ok()
|
||||||
.filter(|s| !s.trim().is_empty()),
|
.filter(|s| !s.trim().is_empty()),
|
||||||
tor_control_url: std::env::var("CRAWLER_TOR_CONTROL_URL")
|
|
||||||
.ok()
|
|
||||||
.filter(|s| !s.trim().is_empty()),
|
|
||||||
tor_control_password: std::env::var("CRAWLER_TOR_CONTROL_PASSWORD")
|
|
||||||
.ok()
|
|
||||||
.filter(|s| !s.trim().is_empty()),
|
|
||||||
tor_control_cookie_path: std::env::var("CRAWLER_TOR_CONTROL_COOKIE_PATH")
|
|
||||||
.ok()
|
|
||||||
.filter(|s| !s.trim().is_empty())
|
|
||||||
.map(PathBuf::from),
|
|
||||||
tor_recircuit_max_attempts: env_u64("CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS", 3)
|
|
||||||
.max(1) as u32,
|
|
||||||
browser: LaunchOptions::from_env(),
|
browser: LaunchOptions::from_env(),
|
||||||
download_allowlist,
|
download_allowlist,
|
||||||
max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES),
|
max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES),
|
||||||
|
|||||||
@@ -73,119 +73,6 @@ pub enum SyncOutcome {
|
|||||||
SessionExpired,
|
SessionExpired,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Per-chapter max fetch attempts when TOR is configured. `N = 3` means
|
|
||||||
/// up to 3 total page fetches with 2 NEWNYM signals between them. When
|
|
||||||
/// TOR is not configured the effective budget collapses to 1 (single
|
|
||||||
/// attempt, no retry, no recircuit — bit-for-bit pre-TOR behavior).
|
|
||||||
const CHAPTER_RECIRCUIT_MAX_ATTEMPTS: u32 = 3;
|
|
||||||
|
|
||||||
/// Outcome of [`fetch_chapter_html_with_recircuit`]. `Ok` carries the
|
|
||||||
/// final reader HTML; the other two map to `sync_chapter_content`'s
|
|
||||||
/// existing failure modes.
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum ChapterFetchOutcome {
|
|
||||||
Ok(String),
|
|
||||||
/// `ChapterProbe::Unauthenticated` after exhausting recircuit
|
|
||||||
/// budget (or with budget=0). Caller returns
|
|
||||||
/// `SyncOutcome::SessionExpired`.
|
|
||||||
SessionExpired,
|
|
||||||
/// `ChapterProbe::Transient` after exhausting recircuit budget
|
|
||||||
/// (or with budget=0). Caller bails so the dispatcher does
|
|
||||||
/// exponential backoff.
|
|
||||||
PersistentTransient,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Single rate-limited Chromium navigation to the chapter URL,
|
|
||||||
/// returning the page HTML. Extracted from `sync_chapter_content` so
|
|
||||||
/// the recircuit loop can call it once per attempt.
|
|
||||||
async fn fetch_chapter_html_once(
|
|
||||||
browser: &chromiumoxide::Browser,
|
|
||||||
rate: &HostRateLimiters,
|
|
||||||
source_url: &str,
|
|
||||||
) -> anyhow::Result<String> {
|
|
||||||
rate.wait_for(source_url).await?;
|
|
||||||
let page = browser
|
|
||||||
.new_page(source_url)
|
|
||||||
.await
|
|
||||||
.with_context(|| format!("open chapter page {source_url}"))?;
|
|
||||||
crate::crawler::nav::wait_for_nav(&page)
|
|
||||||
.await
|
|
||||||
.context("wait for chapter nav")?;
|
|
||||||
// Best-effort wait for the reader marker — same partial-render
|
|
||||||
// race that bit the chapter-list parser can hit here. Timeout is
|
|
||||||
// not an error; the chapter probe + parser sentinels still catch
|
|
||||||
// real failures.
|
|
||||||
let _ = crate::crawler::nav::wait_for_selector(
|
|
||||||
&page,
|
|
||||||
"a#pic_container",
|
|
||||||
crate::crawler::nav::SELECTOR_TIMEOUT,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
let html = page.content().await.context("read chapter html")?;
|
|
||||||
page.close().await.ok();
|
|
||||||
Ok(html)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Pure-over-IO loop: fetch + classify, up to `max_attempts` total
|
|
||||||
/// fetches. Between attempts, `recircuit` is invoked (a no-op when
|
|
||||||
/// TOR isn't configured). `max_attempts = 1` collapses to the
|
|
||||||
/// original single-shot behavior — `Unauthenticated` →
|
|
||||||
/// `SessionExpired`, `Transient` → `PersistentTransient` on the first
|
|
||||||
/// hit, no recircuit.
|
|
||||||
///
|
|
||||||
/// Semantics match [`crate::crawler::detect::retry_on_transient`] and
|
|
||||||
/// [`run_session_probe_loop`]: `N` is **total attempts including the
|
|
||||||
/// first**, so `N = 3` means 3 fetches and up to 2 NEWNYM calls.
|
|
||||||
/// `Unauthenticated` and `Transient` share the budget — the loop
|
|
||||||
/// doesn't distinguish, so a sequence like Transient → Unauth → Ok
|
|
||||||
/// counts as 3 attempts.
|
|
||||||
async fn fetch_chapter_html_with_recircuit<F, Fut, R, RFut>(
|
|
||||||
mut fetch: F,
|
|
||||||
mut recircuit: R,
|
|
||||||
max_attempts: u32,
|
|
||||||
source_url_for_msg: &str,
|
|
||||||
) -> anyhow::Result<ChapterFetchOutcome>
|
|
||||||
where
|
|
||||||
F: FnMut() -> Fut,
|
|
||||||
Fut: std::future::Future<Output = anyhow::Result<String>>,
|
|
||||||
R: FnMut() -> RFut,
|
|
||||||
RFut: std::future::Future<Output = ()>,
|
|
||||||
{
|
|
||||||
debug_assert!(max_attempts >= 1, "max_attempts must be at least 1");
|
|
||||||
let mut attempt = 0u32;
|
|
||||||
loop {
|
|
||||||
attempt += 1;
|
|
||||||
let html = fetch().await?;
|
|
||||||
match session::classify_chapter_probe(&html) {
|
|
||||||
ChapterProbe::Ok => return Ok(ChapterFetchOutcome::Ok(html)),
|
|
||||||
ChapterProbe::Unauthenticated => {
|
|
||||||
if attempt >= max_attempts {
|
|
||||||
return Ok(ChapterFetchOutcome::SessionExpired);
|
|
||||||
}
|
|
||||||
tracing::warn!(
|
|
||||||
attempt,
|
|
||||||
max = max_attempts,
|
|
||||||
url = source_url_for_msg,
|
|
||||||
"chapter probe Unauthenticated; signaling TOR NEWNYM and retrying"
|
|
||||||
);
|
|
||||||
recircuit().await;
|
|
||||||
}
|
|
||||||
ChapterProbe::Transient => {
|
|
||||||
if attempt >= max_attempts {
|
|
||||||
return Ok(ChapterFetchOutcome::PersistentTransient);
|
|
||||||
}
|
|
||||||
tracing::warn!(
|
|
||||||
attempt,
|
|
||||||
max = max_attempts,
|
|
||||||
url = source_url_for_msg,
|
|
||||||
"chapter probe Transient; signaling TOR NEWNYM and retrying"
|
|
||||||
);
|
|
||||||
recircuit().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Fetch all images for one chapter and persist them atomically. On
|
/// Fetch all images for one chapter and persist them atomically. On
|
||||||
/// any error after the first storage put, the DB transaction rolls
|
/// any error after the first storage put, the DB transaction rolls
|
||||||
/// back so the chapter stays at `page_count = 0` and is retried on the
|
/// back so the chapter stays at `page_count = 0` and is retried on the
|
||||||
@@ -204,7 +91,6 @@ pub async fn sync_chapter_content(
|
|||||||
force_refetch: bool,
|
force_refetch: bool,
|
||||||
allowlist: &DownloadAllowlist,
|
allowlist: &DownloadAllowlist,
|
||||||
max_image_bytes: usize,
|
max_image_bytes: usize,
|
||||||
tor: Option<&crate::crawler::tor::TorController>,
|
|
||||||
) -> anyhow::Result<SyncOutcome> {
|
) -> anyhow::Result<SyncOutcome> {
|
||||||
// Skip if already fetched, unless caller explicitly forces.
|
// Skip if already fetched, unless caller explicitly forces.
|
||||||
if !force_refetch {
|
if !force_refetch {
|
||||||
@@ -219,37 +105,47 @@ pub async fn sync_chapter_content(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch + classify. With TOR configured, allow up to
|
// Nav to chapter page (rate-limited per host).
|
||||||
// CHAPTER_RECIRCUIT_MAX_ATTEMPTS total page fetches with NEWNYM
|
rate.wait_for(source_url).await?;
|
||||||
// between each. Without TOR, collapse to 1 attempt (no retry, no
|
let page = browser
|
||||||
// recircuit) — matches the pre-TOR single-shot behavior bit-for-bit.
|
.new_page(source_url)
|
||||||
let max_attempts = if tor.is_some() { CHAPTER_RECIRCUIT_MAX_ATTEMPTS } else { 1 };
|
.await
|
||||||
let html = match fetch_chapter_html_with_recircuit(
|
.with_context(|| format!("open chapter page {source_url}"))?;
|
||||||
|| fetch_chapter_html_once(browser, rate, source_url),
|
crate::crawler::nav::wait_for_nav(&page)
|
||||||
|| async {
|
.await
|
||||||
if let Some(t) = tor {
|
.context("wait for chapter nav")?;
|
||||||
if let Err(e) = t.new_identity().await {
|
// Best-effort wait for the reader marker — same partial-render
|
||||||
tracing::warn!(error = %e, "TOR NEWNYM failed; continuing with same circuit");
|
// race that bit the chapter-list parser can hit here. Timeout is
|
||||||
}
|
// not an error; the chapter probe + parser sentinels still catch
|
||||||
}
|
// real failures.
|
||||||
},
|
let _ = crate::crawler::nav::wait_for_selector(
|
||||||
max_attempts,
|
&page,
|
||||||
source_url,
|
"a#pic_container",
|
||||||
|
crate::crawler::nav::SELECTOR_TIMEOUT,
|
||||||
)
|
)
|
||||||
.await?
|
.await;
|
||||||
{
|
|
||||||
ChapterFetchOutcome::Ok(html) => html,
|
let html = page.content().await.context("read chapter html")?;
|
||||||
ChapterFetchOutcome::SessionExpired => return Ok(SyncOutcome::SessionExpired),
|
page.close().await.ok();
|
||||||
ChapterFetchOutcome::PersistentTransient => {
|
|
||||||
|
// Three-way session classification: distinguishes a transient
|
||||||
|
// hiccup (broken-page body or logged-in-but-no-reader) from a
|
||||||
|
// genuine PHPSESSID expiry (no reader and no avatar widget). The
|
||||||
|
// earlier binary `#avatar_menu` check conflated both and froze
|
||||||
|
// every worker on a layout shift.
|
||||||
|
match session::classify_chapter_probe(&html) {
|
||||||
|
ChapterProbe::Unauthenticated => return Ok(SyncOutcome::SessionExpired),
|
||||||
|
ChapterProbe::Transient => {
|
||||||
// Surface as a typed Err so the dispatcher path runs
|
// Surface as a typed Err so the dispatcher path runs
|
||||||
// ack_failed with exponential backoff (rather than the
|
// ack_failed with exponential backoff (rather than the
|
||||||
// session-expired sticky flag).
|
// session-expired sticky flag).
|
||||||
anyhow::bail!(
|
anyhow::bail!(
|
||||||
"chapter page at {source_url} returned a transient response after \
|
"chapter page at {source_url} returned a transient response \
|
||||||
{max_attempts} attempt(s); will retry"
|
(broken-page body or reader didn't render); will retry"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
};
|
ChapterProbe::Ok => {}
|
||||||
|
}
|
||||||
|
|
||||||
let images = parse_chapter_pages(&html)
|
let images = parse_chapter_pages(&html)
|
||||||
.with_context(|| format!("parse chapter pages at {source_url}"))?;
|
.with_context(|| format!("parse chapter pages at {source_url}"))?;
|
||||||
@@ -408,214 +304,4 @@ mod tests {
|
|||||||
let err = parse_chapter_pages(html).expect_err("expected Transient");
|
let err = parse_chapter_pages(html).expect_err("expected Transient");
|
||||||
assert!(err.is_transient(), "got non-transient: {err}");
|
assert!(err.is_transient(), "got non-transient: {err}");
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- fetch_chapter_html_with_recircuit -------------------------------
|
|
||||||
|
|
||||||
const OK_HTML: &str = r#"<html><body><a id="pic_container"><img id="page1" src="x"/></a></body></html>"#;
|
|
||||||
const UNAUTH_HTML: &str = r#"<html><body><header><div id="logo">x</div></header><main>please log in</main></body></html>"#;
|
|
||||||
const TRANSIENT_HTML: &str = "<html><body><p>we're sorry, the request file are not found.</p></body></html>";
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn recircuit_loop_ok_first_attempt() {
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut fetches = 0u32;
|
|
||||||
let outcome = fetch_chapter_html_with_recircuit(
|
|
||||||
|| {
|
|
||||||
fetches += 1;
|
|
||||||
async { Ok(OK_HTML.to_string()) }
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
"https://example/c",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("ok");
|
|
||||||
assert!(matches!(outcome, ChapterFetchOutcome::Ok(_)));
|
|
||||||
assert_eq!(fetches, 1);
|
|
||||||
assert_eq!(recircuits, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn recircuit_loop_unauth_with_single_attempt_returns_session_expired() {
|
|
||||||
// max_attempts=1 = TOR disabled, fail-fast on first Unauthenticated.
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut fetches = 0u32;
|
|
||||||
let outcome = fetch_chapter_html_with_recircuit(
|
|
||||||
|| {
|
|
||||||
fetches += 1;
|
|
||||||
async { Ok(UNAUTH_HTML.to_string()) }
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
1,
|
|
||||||
"https://example/c",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("ok-result");
|
|
||||||
assert!(matches!(outcome, ChapterFetchOutcome::SessionExpired));
|
|
||||||
assert_eq!(fetches, 1);
|
|
||||||
assert_eq!(recircuits, 0, "no recircuit when budget is 1 (TOR disabled)");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn recircuit_loop_unauth_then_ok_within_budget() {
|
|
||||||
// max_attempts=3 = up to 3 fetches with 2 recircuits between.
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut fetch_n = 0u32;
|
|
||||||
let outcome = fetch_chapter_html_with_recircuit(
|
|
||||||
|| {
|
|
||||||
fetch_n += 1;
|
|
||||||
let n = fetch_n;
|
|
||||||
async move {
|
|
||||||
if n == 1 {
|
|
||||||
Ok(UNAUTH_HTML.to_string())
|
|
||||||
} else {
|
|
||||||
Ok(OK_HTML.to_string())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
"https://example/c",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("ok");
|
|
||||||
assert!(matches!(outcome, ChapterFetchOutcome::Ok(_)));
|
|
||||||
assert_eq!(fetch_n, 2);
|
|
||||||
assert_eq!(recircuits, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn recircuit_loop_unauth_exhausts_budget_returns_session_expired() {
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut fetch_n = 0u32;
|
|
||||||
let outcome = fetch_chapter_html_with_recircuit(
|
|
||||||
|| {
|
|
||||||
fetch_n += 1;
|
|
||||||
async { Ok(UNAUTH_HTML.to_string()) }
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
"https://example/c",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("ok-result");
|
|
||||||
assert!(matches!(outcome, ChapterFetchOutcome::SessionExpired));
|
|
||||||
assert_eq!(fetch_n, 3, "max_attempts=3 → 3 fetches total");
|
|
||||||
assert_eq!(recircuits, 2, "2 recircuits between 3 fetches");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn recircuit_loop_transient_then_ok_within_budget() {
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut fetch_n = 0u32;
|
|
||||||
let outcome = fetch_chapter_html_with_recircuit(
|
|
||||||
|| {
|
|
||||||
fetch_n += 1;
|
|
||||||
let n = fetch_n;
|
|
||||||
async move {
|
|
||||||
if n < 3 {
|
|
||||||
Ok(TRANSIENT_HTML.to_string())
|
|
||||||
} else {
|
|
||||||
Ok(OK_HTML.to_string())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
"https://example/c",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("ok");
|
|
||||||
assert!(matches!(outcome, ChapterFetchOutcome::Ok(_)));
|
|
||||||
assert_eq!(fetch_n, 3);
|
|
||||||
assert_eq!(recircuits, 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn recircuit_loop_transient_exhausts_budget_returns_persistent() {
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut fetch_n = 0u32;
|
|
||||||
let outcome = fetch_chapter_html_with_recircuit(
|
|
||||||
|| {
|
|
||||||
fetch_n += 1;
|
|
||||||
async { Ok(TRANSIENT_HTML.to_string()) }
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
"https://example/c",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("ok-result");
|
|
||||||
assert!(matches!(outcome, ChapterFetchOutcome::PersistentTransient));
|
|
||||||
assert_eq!(fetch_n, 3, "max_attempts=3 → 3 fetches total");
|
|
||||||
assert_eq!(recircuits, 2, "2 recircuits between 3 fetches");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn recircuit_loop_mixed_transient_then_unauth_then_ok_shares_budget() {
|
|
||||||
// Audit-prompted regression: outcomes share the attempt counter.
|
|
||||||
// Sequence: Transient (attempt 1) → Unauth (attempt 2) → Ok (3).
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut fetch_n = 0u32;
|
|
||||||
let outcome = fetch_chapter_html_with_recircuit(
|
|
||||||
|| {
|
|
||||||
fetch_n += 1;
|
|
||||||
let n = fetch_n;
|
|
||||||
async move {
|
|
||||||
match n {
|
|
||||||
1 => Ok(TRANSIENT_HTML.to_string()),
|
|
||||||
2 => Ok(UNAUTH_HTML.to_string()),
|
|
||||||
_ => Ok(OK_HTML.to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
"https://example/c",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("ok");
|
|
||||||
assert!(matches!(outcome, ChapterFetchOutcome::Ok(_)));
|
|
||||||
assert_eq!(fetch_n, 3);
|
|
||||||
assert_eq!(recircuits, 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn recircuit_loop_propagates_fetch_errors() {
|
|
||||||
let mut fetch_n = 0u32;
|
|
||||||
let err = fetch_chapter_html_with_recircuit(
|
|
||||||
|| {
|
|
||||||
fetch_n += 1;
|
|
||||||
async { Err(anyhow::anyhow!("nav timeout")) }
|
|
||||||
},
|
|
||||||
|| async {},
|
|
||||||
3,
|
|
||||||
"https://example/c",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect_err("fetch error bubbles");
|
|
||||||
assert_eq!(fetch_n, 1);
|
|
||||||
assert!(format!("{err:#}").contains("nav timeout"));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -80,36 +80,13 @@ pub fn has_logo_sentinel(doc: &scraper::Html) -> bool {
|
|||||||
/// caller can fall back on the job system's retry/backoff once the
|
/// caller can fall back on the job system's retry/backoff once the
|
||||||
/// inline budget is exhausted.
|
/// inline budget is exhausted.
|
||||||
pub async fn retry_on_transient<F, Fut, T>(
|
pub async fn retry_on_transient<F, Fut, T>(
|
||||||
op: F,
|
|
||||||
max_attempts: u32,
|
|
||||||
delay: Duration,
|
|
||||||
) -> Result<T, PageError>
|
|
||||||
where
|
|
||||||
F: FnMut() -> Fut,
|
|
||||||
Fut: Future<Output = Result<T, PageError>>,
|
|
||||||
{
|
|
||||||
retry_on_transient_with_hook(op, max_attempts, delay, || async {}).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Like [`retry_on_transient`] but invokes `on_retry` between a
|
|
||||||
/// transient failure and the subsequent sleep+retry. The hook does
|
|
||||||
/// **not** fire on the first attempt, after a non-transient error, or
|
|
||||||
/// after the final attempt (no retry follows). Hook failures are not
|
|
||||||
/// propagated — return `()` from the future and log inside if needed.
|
|
||||||
///
|
|
||||||
/// Wire the TOR controller's `new_identity` here to rotate circuits
|
|
||||||
/// between page-fetch retries; see [`crate::crawler::tor`].
|
|
||||||
pub async fn retry_on_transient_with_hook<F, Fut, T, H, HFut>(
|
|
||||||
mut op: F,
|
mut op: F,
|
||||||
max_attempts: u32,
|
max_attempts: u32,
|
||||||
delay: Duration,
|
delay: Duration,
|
||||||
mut on_retry: H,
|
|
||||||
) -> Result<T, PageError>
|
) -> Result<T, PageError>
|
||||||
where
|
where
|
||||||
F: FnMut() -> Fut,
|
F: FnMut() -> Fut,
|
||||||
Fut: Future<Output = Result<T, PageError>>,
|
Fut: Future<Output = Result<T, PageError>>,
|
||||||
H: FnMut() -> HFut,
|
|
||||||
HFut: Future<Output = ()>,
|
|
||||||
{
|
{
|
||||||
debug_assert!(max_attempts >= 1, "max_attempts must be at least 1");
|
debug_assert!(max_attempts >= 1, "max_attempts must be at least 1");
|
||||||
let mut attempt = 0u32;
|
let mut attempt = 0u32;
|
||||||
@@ -124,9 +101,8 @@ where
|
|||||||
attempt,
|
attempt,
|
||||||
max_attempts,
|
max_attempts,
|
||||||
error = %e,
|
error = %e,
|
||||||
"transient error; running on-retry hook and sleeping before retry"
|
"transient error; sleeping before retry"
|
||||||
);
|
);
|
||||||
on_retry().await;
|
|
||||||
tokio::time::sleep(delay).await;
|
tokio::time::sleep(delay).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -271,92 +247,4 @@ mod tests {
|
|||||||
assert_eq!(result.unwrap(), 7);
|
assert_eq!(result.unwrap(), 7);
|
||||||
assert_eq!(attempt, 1);
|
assert_eq!(attempt, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn hook_fires_once_between_transient_and_success() {
|
|
||||||
let mut attempt = 0u32;
|
|
||||||
let mut hook_calls = 0u32;
|
|
||||||
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|
|
||||||
|| {
|
|
||||||
attempt += 1;
|
|
||||||
let n = attempt;
|
|
||||||
async move {
|
|
||||||
if n < 2 {
|
|
||||||
Err(PageError::transient("once"))
|
|
||||||
} else {
|
|
||||||
Ok(99)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
5,
|
|
||||||
Duration::from_millis(0),
|
|
||||||
|| {
|
|
||||||
hook_calls += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert_eq!(result.unwrap(), 99);
|
|
||||||
assert_eq!(attempt, 2);
|
|
||||||
assert_eq!(hook_calls, 1, "hook fires exactly once between attempts");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn hook_does_not_fire_when_first_attempt_succeeds() {
|
|
||||||
let mut hook_calls = 0u32;
|
|
||||||
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|
|
||||||
|| async { Ok(1) },
|
|
||||||
5,
|
|
||||||
Duration::from_millis(0),
|
|
||||||
|| {
|
|
||||||
hook_calls += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert!(result.is_ok());
|
|
||||||
assert_eq!(hook_calls, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn hook_does_not_fire_after_non_transient_error() {
|
|
||||||
let mut hook_calls = 0u32;
|
|
||||||
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|
|
||||||
|| async { Err(PageError::Other(anyhow::anyhow!("permanent"))) },
|
|
||||||
5,
|
|
||||||
Duration::from_millis(0),
|
|
||||||
|| {
|
|
||||||
hook_calls += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert!(result.is_err());
|
|
||||||
assert_eq!(hook_calls, 0, "non-transient must short-circuit before hook");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn hook_does_not_fire_after_final_failed_attempt() {
|
|
||||||
// With max_attempts=3 and three persistent transients, the hook
|
|
||||||
// should run twice (between 1→2 and 2→3) — never a third time,
|
|
||||||
// because no retry follows attempt 3.
|
|
||||||
let mut attempt = 0u32;
|
|
||||||
let mut hook_calls = 0u32;
|
|
||||||
let result: Result<i32, PageError> = retry_on_transient_with_hook(
|
|
||||||
|| {
|
|
||||||
attempt += 1;
|
|
||||||
async { Err(PageError::transient("always")) }
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
Duration::from_millis(0),
|
|
||||||
|| {
|
|
||||||
hook_calls += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert!(result.is_err());
|
|
||||||
assert_eq!(attempt, 3);
|
|
||||||
assert_eq!(hook_calls, 2, "hook fires N-1 times for N attempts that all fail transient");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,5 +26,4 @@ pub mod rate_limit;
|
|||||||
pub mod safety;
|
pub mod safety;
|
||||||
pub mod session;
|
pub mod session;
|
||||||
pub mod source;
|
pub mod source;
|
||||||
pub mod tor;
|
|
||||||
pub mod url_utils;
|
pub mod url_utils;
|
||||||
|
|||||||
@@ -103,7 +103,6 @@ pub async fn run_metadata_pass(
|
|||||||
skip_chapters: bool,
|
skip_chapters: bool,
|
||||||
allowlist: &DownloadAllowlist,
|
allowlist: &DownloadAllowlist,
|
||||||
max_image_bytes: usize,
|
max_image_bytes: usize,
|
||||||
tor: Option<&crate::crawler::tor::TorController>,
|
|
||||||
) -> anyhow::Result<MetadataStats> {
|
) -> anyhow::Result<MetadataStats> {
|
||||||
let lease = browser_manager
|
let lease = browser_manager
|
||||||
.acquire()
|
.acquire()
|
||||||
@@ -122,7 +121,6 @@ pub async fn run_metadata_pass(
|
|||||||
let ctx = FetchContext {
|
let ctx = FetchContext {
|
||||||
browser: browser_ref,
|
browser: browser_ref,
|
||||||
rate,
|
rate,
|
||||||
tor,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let source_id = source.id();
|
let source_id = source.id();
|
||||||
|
|||||||
@@ -162,123 +162,37 @@ const PROBE_RETRY_DELAY: Duration = Duration::from_secs(2);
|
|||||||
/// limiter. The trade is worth it — failing here costs ~1s; failing 30
|
/// limiter. The trade is worth it — failing here costs ~1s; failing 30
|
||||||
/// minutes into a backfill costs 30 minutes.
|
/// minutes into a backfill costs 30 minutes.
|
||||||
pub async fn verify_session(browser: &Browser, probe_url: &str) -> anyhow::Result<()> {
|
pub async fn verify_session(browser: &Browser, probe_url: &str) -> anyhow::Result<()> {
|
||||||
verify_session_with_recircuit(browser, probe_url, None, 0).await
|
let mut attempt = 0u32;
|
||||||
}
|
|
||||||
|
|
||||||
/// Like [`verify_session`] but, when `tor` is `Some`, signals
|
|
||||||
/// `SIGNAL NEWNYM` between retries on transient pages AND treats
|
|
||||||
/// `Unauthenticated` as recoverable (up to `tor_max_attempts` total
|
|
||||||
/// probes, calling NEWNYM between each).
|
|
||||||
///
|
|
||||||
/// `verify_session` is `verify_session_with_recircuit(..., None, _)`,
|
|
||||||
/// which collapses the `Unauthenticated` budget to 1 attempt — i.e.
|
|
||||||
/// fail-fast, exactly the pre-TOR behavior.
|
|
||||||
pub async fn verify_session_with_recircuit(
|
|
||||||
browser: &Browser,
|
|
||||||
probe_url: &str,
|
|
||||||
tor: Option<&crate::crawler::tor::TorController>,
|
|
||||||
tor_max_attempts: u32,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let unauth_max_attempts = if tor.is_some() { tor_max_attempts.max(1) } else { 1 };
|
|
||||||
run_session_probe_loop(
|
|
||||||
|| fetch_probe_html(browser, probe_url),
|
|
||||||
|| async {
|
|
||||||
if let Some(t) = tor {
|
|
||||||
if let Err(e) = t.new_identity().await {
|
|
||||||
tracing::warn!(error = %e, "TOR NEWNYM failed; continuing with same circuit");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
PROBE_MAX_ATTEMPTS,
|
|
||||||
unauth_max_attempts,
|
|
||||||
PROBE_RETRY_DELAY,
|
|
||||||
probe_url,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Pure-over-IO loop body for the session probe. Generic over the
|
|
||||||
/// fetch and recircuit closures so it can be unit-tested without a
|
|
||||||
/// real browser or TOR daemon.
|
|
||||||
///
|
|
||||||
/// Both budgets count **total attempts**, including the first — so
|
|
||||||
/// `transient_max_attempts = 3` allows 3 fetches and 2 recircuits
|
|
||||||
/// between them, and `unauth_max_attempts = 1` means "fail-fast, no
|
|
||||||
/// retry". This matches [`crate::crawler::detect::retry_on_transient`]
|
|
||||||
/// and the content-path recircuit loop.
|
|
||||||
///
|
|
||||||
/// Outcomes:
|
|
||||||
/// - `SessionProbe::Ok` → return `Ok(())`.
|
|
||||||
/// - `SessionProbe::Unauthenticated` → recircuit + retry while
|
|
||||||
/// under the unauth budget. After the cap, bail with the
|
|
||||||
/// "PHPSESSID expired" diagnostic, mentioning the attempt count so
|
|
||||||
/// a TOR-misconfig diagnosis is easier.
|
|
||||||
/// - `SessionProbe::Transient` → same shape against the transient
|
|
||||||
/// budget; bails with "site down or rate-limiting" after the cap.
|
|
||||||
async fn run_session_probe_loop<F, Fut, R, RFut>(
|
|
||||||
mut fetch_html: F,
|
|
||||||
mut recircuit: R,
|
|
||||||
transient_max_attempts: u32,
|
|
||||||
unauth_max_attempts: u32,
|
|
||||||
retry_delay: Duration,
|
|
||||||
probe_url_for_msg: &str,
|
|
||||||
) -> anyhow::Result<()>
|
|
||||||
where
|
|
||||||
F: FnMut() -> Fut,
|
|
||||||
Fut: std::future::Future<Output = anyhow::Result<String>>,
|
|
||||||
R: FnMut() -> RFut,
|
|
||||||
RFut: std::future::Future<Output = ()>,
|
|
||||||
{
|
|
||||||
debug_assert!(transient_max_attempts >= 1);
|
|
||||||
debug_assert!(unauth_max_attempts >= 1);
|
|
||||||
let mut transient_attempts = 0u32;
|
|
||||||
let mut unauth_attempts = 0u32;
|
|
||||||
loop {
|
loop {
|
||||||
let html = fetch_html().await?;
|
attempt += 1;
|
||||||
|
let html = fetch_probe_html(browser, probe_url).await?;
|
||||||
match classify_probe(&html) {
|
match classify_probe(&html) {
|
||||||
SessionProbe::Ok => {
|
SessionProbe::Ok => {
|
||||||
tracing::info!(
|
tracing::info!(attempt, "session probe ok — #logo + #avatar_menu present");
|
||||||
transient_attempts,
|
|
||||||
unauth_attempts,
|
|
||||||
"session probe ok — #logo + #avatar_menu present"
|
|
||||||
);
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
SessionProbe::Unauthenticated => {
|
SessionProbe::Unauthenticated => {
|
||||||
unauth_attempts += 1;
|
return Err(anyhow!(
|
||||||
if unauth_attempts >= unauth_max_attempts {
|
"session probe failed — #avatar_menu not present at {probe_url} \
|
||||||
return Err(anyhow!(
|
(page rendered the normal layout); PHPSESSID is missing, expired, \
|
||||||
"session probe failed — #avatar_menu not present at {probe_url_for_msg} \
|
or revoked. Refresh CRAWLER_PHPSESSID and re-run."
|
||||||
after {unauth_attempts} attempt(s); PHPSESSID is missing, \
|
));
|
||||||
expired, or revoked. Refresh CRAWLER_PHPSESSID and re-run."
|
}
|
||||||
));
|
SessionProbe::Transient if attempt < PROBE_MAX_ATTEMPTS => {
|
||||||
}
|
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
attempt = unauth_attempts,
|
attempt,
|
||||||
max_attempts = unauth_max_attempts,
|
max_attempts = PROBE_MAX_ATTEMPTS,
|
||||||
"session probe Unauthenticated despite PHPSESSID; signaling TOR \
|
"session probe got a transient page; retrying"
|
||||||
NEWNYM and retrying"
|
|
||||||
);
|
);
|
||||||
recircuit().await;
|
tokio::time::sleep(PROBE_RETRY_DELAY).await;
|
||||||
tokio::time::sleep(retry_delay).await;
|
|
||||||
}
|
}
|
||||||
SessionProbe::Transient => {
|
SessionProbe::Transient => {
|
||||||
transient_attempts += 1;
|
return Err(anyhow!(
|
||||||
if transient_attempts >= transient_max_attempts {
|
"session probe failed — probe page at {probe_url} returned a \
|
||||||
return Err(anyhow!(
|
broken-page response after {PROBE_MAX_ATTEMPTS} attempts. \
|
||||||
"session probe failed — probe page at {probe_url_for_msg} returned \
|
The site appears to be down or rate-limiting us; try again \
|
||||||
a broken-page response after {transient_max_attempts} attempts. \
|
later before refreshing CRAWLER_PHPSESSID."
|
||||||
The site appears to be down or rate-limiting us; try again \
|
));
|
||||||
later before refreshing CRAWLER_PHPSESSID."
|
|
||||||
));
|
|
||||||
}
|
|
||||||
tracing::warn!(
|
|
||||||
attempt = transient_attempts,
|
|
||||||
max_attempts = transient_max_attempts,
|
|
||||||
"session probe got a transient page; recircuit + retry"
|
|
||||||
);
|
|
||||||
recircuit().await;
|
|
||||||
tokio::time::sleep(retry_delay).await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -422,204 +336,6 @@ mod tests {
|
|||||||
assert_eq!(classify_chapter_probe(html), ChapterProbe::Ok);
|
assert_eq!(classify_chapter_probe(html), ChapterProbe::Ok);
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- run_session_probe_loop -----------------------------------------
|
|
||||||
//
|
|
||||||
// These tests exercise the recircuit-aware loop without a real
|
|
||||||
// browser. The fetch and recircuit closures are mocked over Vecs of
|
|
||||||
// canned outcomes / counters.
|
|
||||||
|
|
||||||
const OK_HTML: &str = r#"<html><body><div id="logo"></div><div id="avatar_menu"></div></body></html>"#;
|
|
||||||
const UNAUTH_HTML: &str = r#"<html><body><div id="logo"></div></body></html>"#;
|
|
||||||
const TRANSIENT_HTML: &str = "<html><body><p>we're sorry, the request file are not found.</p></body></html>";
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn probe_loop_ok_on_first_attempt_does_not_recircuit() {
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut fetched = 0u32;
|
|
||||||
run_session_probe_loop(
|
|
||||||
|| {
|
|
||||||
fetched += 1;
|
|
||||||
async { Ok(OK_HTML.to_string()) }
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
3,
|
|
||||||
Duration::from_millis(0),
|
|
||||||
"https://example/probe",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("ok on first attempt");
|
|
||||||
assert_eq!(fetched, 1);
|
|
||||||
assert_eq!(recircuits, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn probe_loop_unauth_then_ok_when_attempt_budget_available() {
|
|
||||||
// Budget = 3 total attempts. Unauth on call 1, ok on call 2.
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut call = 0u32;
|
|
||||||
run_session_probe_loop(
|
|
||||||
|| {
|
|
||||||
call += 1;
|
|
||||||
let n = call;
|
|
||||||
async move {
|
|
||||||
if n == 1 {
|
|
||||||
Ok(UNAUTH_HTML.to_string())
|
|
||||||
} else {
|
|
||||||
Ok(OK_HTML.to_string())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
3,
|
|
||||||
Duration::from_millis(0),
|
|
||||||
"https://example/probe",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("recovers after one recircuit");
|
|
||||||
assert_eq!(call, 2);
|
|
||||||
assert_eq!(recircuits, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn probe_loop_unauth_with_single_attempt_budget_fails_fast() {
|
|
||||||
// Budget = 1 total attempt = no retry (matches no-TOR behavior).
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut call = 0u32;
|
|
||||||
let err = run_session_probe_loop(
|
|
||||||
|| {
|
|
||||||
call += 1;
|
|
||||||
async { Ok(UNAUTH_HTML.to_string()) }
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
1,
|
|
||||||
Duration::from_millis(0),
|
|
||||||
"https://example/probe",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect_err("budget=1 → fail-fast");
|
|
||||||
assert_eq!(call, 1, "no retry when budget is 1");
|
|
||||||
assert_eq!(recircuits, 0);
|
|
||||||
let msg = format!("{err:#}");
|
|
||||||
assert!(msg.contains("Refresh CRAWLER_PHPSESSID"), "msg: {msg}");
|
|
||||||
assert!(msg.contains("after 1 attempt"), "expected attempt count in msg: {msg}");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn probe_loop_unauth_after_exhausting_budget_emits_attempt_count() {
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut call = 0u32;
|
|
||||||
let err = run_session_probe_loop(
|
|
||||||
|| {
|
|
||||||
call += 1;
|
|
||||||
async { Ok(UNAUTH_HTML.to_string()) }
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
10, // transient budget irrelevant here
|
|
||||||
3, // 3 attempts total, 2 recircuits between
|
|
||||||
Duration::from_millis(0),
|
|
||||||
"https://example/probe",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect_err("exhausts unauth budget");
|
|
||||||
assert_eq!(call, 3);
|
|
||||||
assert_eq!(recircuits, 2);
|
|
||||||
let msg = format!("{err:#}");
|
|
||||||
assert!(msg.contains("after 3 attempt"), "expected attempt count in error, got: {msg}");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn probe_loop_transient_repeats_until_max_then_errors() {
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut call = 0u32;
|
|
||||||
let err = run_session_probe_loop(
|
|
||||||
|| {
|
|
||||||
call += 1;
|
|
||||||
async { Ok(TRANSIENT_HTML.to_string()) }
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
1,
|
|
||||||
Duration::from_millis(0),
|
|
||||||
"https://example/probe",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect_err("transient until max → fail");
|
|
||||||
assert_eq!(call, 3);
|
|
||||||
// Recircuit fires between attempts: 3 attempts → 2 recircuits.
|
|
||||||
assert_eq!(recircuits, 2);
|
|
||||||
let msg = format!("{err:#}");
|
|
||||||
assert!(msg.contains("broken-page response after 3 attempts"), "msg: {msg}");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn probe_loop_transient_then_ok_returns_ok_after_one_recircuit() {
|
|
||||||
let mut recircuits = 0u32;
|
|
||||||
let mut call = 0u32;
|
|
||||||
run_session_probe_loop(
|
|
||||||
|| {
|
|
||||||
call += 1;
|
|
||||||
let n = call;
|
|
||||||
async move {
|
|
||||||
if n == 1 {
|
|
||||||
Ok(TRANSIENT_HTML.to_string())
|
|
||||||
} else {
|
|
||||||
Ok(OK_HTML.to_string())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|| {
|
|
||||||
recircuits += 1;
|
|
||||||
async {}
|
|
||||||
},
|
|
||||||
3,
|
|
||||||
1,
|
|
||||||
Duration::from_millis(0),
|
|
||||||
"https://example/probe",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect("ok on second try");
|
|
||||||
assert_eq!(call, 2);
|
|
||||||
assert_eq!(recircuits, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn probe_loop_propagates_fetch_errors_immediately() {
|
|
||||||
let mut call = 0u32;
|
|
||||||
let err = run_session_probe_loop(
|
|
||||||
|| {
|
|
||||||
call += 1;
|
|
||||||
async { Err(anyhow!("nav timeout")) }
|
|
||||||
},
|
|
||||||
|| async {},
|
|
||||||
5,
|
|
||||||
5,
|
|
||||||
Duration::from_millis(0),
|
|
||||||
"https://example/probe",
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.expect_err("fetch error bubbles");
|
|
||||||
assert_eq!(call, 1);
|
|
||||||
assert!(format!("{err:#}").contains("nav timeout"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn classify_probe_trusts_broken_body_over_stray_avatar_match() {
|
fn classify_probe_trusts_broken_body_over_stray_avatar_match() {
|
||||||
// Defensive: if a broken-page body somehow contains an
|
// Defensive: if a broken-page body somehow contains an
|
||||||
|
|||||||
@@ -67,10 +67,6 @@ pub struct SourceChapter {
|
|||||||
pub struct FetchContext<'a> {
|
pub struct FetchContext<'a> {
|
||||||
pub browser: &'a Browser,
|
pub browser: &'a Browser,
|
||||||
pub rate: &'a crate::crawler::rate_limit::HostRateLimiters,
|
pub rate: &'a crate::crawler::rate_limit::HostRateLimiters,
|
||||||
/// Optional TOR control-port client. When `Some`, retry helpers
|
|
||||||
/// signal `NEWNYM` between transient-page attempts so the next try
|
|
||||||
/// draws a fresh exit. `None` keeps pre-TOR behavior.
|
|
||||||
pub tor: Option<&'a crate::crawler::tor::TorController>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Lazy iterator over discovered manga refs. The caller drives the
|
/// Lazy iterator over discovered manga refs. The caller drives the
|
||||||
|
|||||||
@@ -7,6 +7,7 @@
|
|||||||
//! (`td:has(label:contains("Author:"))`) are implemented by walking
|
//! (`td:has(label:contains("Author:"))`) are implemented by walking
|
||||||
//! the parsed tree.
|
//! the parsed tree.
|
||||||
|
|
||||||
|
use std::collections::VecDeque;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
@@ -18,7 +19,7 @@ use super::{
|
|||||||
SourceMangaRef,
|
SourceMangaRef,
|
||||||
};
|
};
|
||||||
use crate::crawler::detect::{
|
use crate::crawler::detect::{
|
||||||
has_logo_sentinel, is_broken_page_body, retry_on_transient_with_hook, PageError,
|
has_logo_sentinel, is_broken_page_body, retry_on_transient, PageError,
|
||||||
};
|
};
|
||||||
use crate::crawler::nav::{wait_for_nav, wait_for_selector, NavError, SELECTOR_TIMEOUT};
|
use crate::crawler::nav::{wait_for_nav, wait_for_selector, NavError, SELECTOR_TIMEOUT};
|
||||||
|
|
||||||
@@ -74,24 +75,33 @@ impl Source for TargetSource {
|
|||||||
&self,
|
&self,
|
||||||
ctx: &FetchContext<'_>,
|
ctx: &FetchContext<'_>,
|
||||||
) -> anyhow::Result<Box<dyn DiscoverWalk + Send>> {
|
) -> anyhow::Result<Box<dyn DiscoverWalk + Send>> {
|
||||||
// Probe page 1 up front (with transient retry) for two reasons:
|
// Always visit page 1 first because that's the only way to
|
||||||
// a broken first page should abort cleanly rather than mid-walk,
|
// discover `last_page`. Retry it on transient — a broken first
|
||||||
// and the HTML is handed straight to the first `next_batch` call
|
// page would otherwise abort the whole walk before we've even
|
||||||
// so the walker doesn't re-fetch it. Page count is discovered
|
// started.
|
||||||
// incrementally — see `TargetSourceWalker::next_batch`.
|
let first_html = retry_on_transient(
|
||||||
let first_html = retry_on_transient_with_hook(
|
|
||||||
|| async {
|
|| async {
|
||||||
navigate(ctx, self.base_url.as_str(), LIST_PAGE_MARKER).await
|
navigate(ctx, self.base_url.as_str(), LIST_PAGE_MARKER).await
|
||||||
},
|
},
|
||||||
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
||||||
PAGE_TRANSIENT_RETRY_DELAY,
|
PAGE_TRANSIENT_RETRY_DELAY,
|
||||||
|| async { recircuit_if_configured(ctx.tor).await },
|
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
let last_page = {
|
||||||
|
let doc = scraper::Html::parse_document(&first_html);
|
||||||
|
parse_last_page(&doc)
|
||||||
|
};
|
||||||
|
|
||||||
|
let order = build_page_order(last_page);
|
||||||
|
tracing::info!(
|
||||||
|
last_page = ?last_page,
|
||||||
|
page_count = order.len(),
|
||||||
|
"walking pagination"
|
||||||
|
);
|
||||||
|
|
||||||
Ok(Box::new(TargetSourceWalker {
|
Ok(Box::new(TargetSourceWalker {
|
||||||
base_url: self.base_url.clone(),
|
base_url: self.base_url.clone(),
|
||||||
next_page: 1,
|
pages_remaining: order,
|
||||||
first_page_html: Some(first_html),
|
first_page_html: Some(first_html),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
@@ -137,19 +147,24 @@ impl Source for TargetSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Walker returned by [`TargetSource::discover`]. Walks pages `1..` in
|
/// Build the queue of page numbers `TargetSource::discover` will walk.
|
||||||
/// order, terminating as soon as a page renders cleanly with zero entries
|
/// The site orders by `update_date DESC`, so newest-first is just the
|
||||||
/// — that's the "we ran off the end of the index" signal. Page 1's HTML
|
/// natural page order: `1..=last`. If `last_page` is unknown (source
|
||||||
/// is cached at construction time (discover already had to fetch it for
|
/// surfaces no pagination) only page 1 is visited.
|
||||||
/// the transient probe) so the first batch doesn't re-fetch.
|
fn build_page_order(last_page: Option<i32>) -> VecDeque<i32> {
|
||||||
///
|
match last_page {
|
||||||
/// A genuinely empty `Ok(vec![])` from `parse_manga_list_from` is what
|
None => VecDeque::from([1]),
|
||||||
/// stops us: the parser's `#logo` sentinel converts unrendered pages
|
Some(last) => (1..=last).collect(),
|
||||||
/// into transient errors before they reach this loop, so an empty
|
}
|
||||||
/// parse result reliably means "no more entries."
|
}
|
||||||
|
|
||||||
|
/// Walker returned by [`TargetSource::discover`]. Pops one source-index
|
||||||
|
/// page per `next_batch` call. Page 1's HTML is cached at construction
|
||||||
|
/// time (the discover call needed it to read `last_page` anyway) so the
|
||||||
|
/// batch covering page 1 doesn't re-fetch.
|
||||||
struct TargetSourceWalker {
|
struct TargetSourceWalker {
|
||||||
base_url: String,
|
base_url: String,
|
||||||
next_page: i32,
|
pages_remaining: VecDeque<i32>,
|
||||||
first_page_html: Option<String>,
|
first_page_html: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -159,18 +174,20 @@ impl DiscoverWalk for TargetSourceWalker {
|
|||||||
&mut self,
|
&mut self,
|
||||||
ctx: &FetchContext<'_>,
|
ctx: &FetchContext<'_>,
|
||||||
) -> anyhow::Result<Option<Vec<SourceMangaRef>>> {
|
) -> anyhow::Result<Option<Vec<SourceMangaRef>>> {
|
||||||
let page_num = self.next_page;
|
let Some(page_num) = self.pages_remaining.pop_front() else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
let page_refs = if page_num == 1 {
|
let page_refs = if page_num == 1 {
|
||||||
// Reuse the cached page-1 HTML from the initial probe. Take
|
// Reuse the cached page-1 HTML from the initial probe. Take
|
||||||
// it (rather than clone) so a future re-entry that somehow
|
// it (rather than clone) so a malformed page-order queue
|
||||||
// revisits page 1 still falls back to a real fetch.
|
// that re-visits page 1 still falls back to a real fetch.
|
||||||
match self.first_page_html.take() {
|
match self.first_page_html.take() {
|
||||||
Some(html) => {
|
Some(html) => {
|
||||||
let doc = scraper::Html::parse_document(&html);
|
let doc = scraper::Html::parse_document(&html);
|
||||||
parse_manga_list_from(&doc)?
|
parse_manga_list_from(&doc)?
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
retry_on_transient_with_hook(
|
retry_on_transient(
|
||||||
|| async {
|
|| async {
|
||||||
let html = navigate(
|
let html = navigate(
|
||||||
ctx,
|
ctx,
|
||||||
@@ -183,13 +200,12 @@ impl DiscoverWalk for TargetSourceWalker {
|
|||||||
},
|
},
|
||||||
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
||||||
PAGE_TRANSIENT_RETRY_DELAY,
|
PAGE_TRANSIENT_RETRY_DELAY,
|
||||||
|| async { recircuit_if_configured(ctx.tor).await },
|
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
retry_on_transient_with_hook(
|
retry_on_transient(
|
||||||
|| async {
|
|| async {
|
||||||
let url = page_url(&self.base_url, page_num);
|
let url = page_url(&self.base_url, page_num);
|
||||||
let html = navigate(ctx, &url, LIST_PAGE_MARKER).await?;
|
let html = navigate(ctx, &url, LIST_PAGE_MARKER).await?;
|
||||||
@@ -198,15 +214,10 @@ impl DiscoverWalk for TargetSourceWalker {
|
|||||||
},
|
},
|
||||||
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
||||||
PAGE_TRANSIENT_RETRY_DELAY,
|
PAGE_TRANSIENT_RETRY_DELAY,
|
||||||
|| async { recircuit_if_configured(ctx.tor).await },
|
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
};
|
};
|
||||||
tracing::info!(page_num, count = page_refs.len(), "page walked");
|
tracing::info!(page_num, count = page_refs.len(), "page walked");
|
||||||
if page_refs.is_empty() {
|
|
||||||
return Ok(None);
|
|
||||||
}
|
|
||||||
self.next_page += 1;
|
|
||||||
Ok(Some(page_refs))
|
Ok(Some(page_refs))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -277,20 +288,20 @@ fn classify_navigate_html(html: String) -> Result<String, PageError> {
|
|||||||
Ok(html)
|
Ok(html)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Hook for [`retry_on_transient_with_hook`]: when TOR is configured,
|
fn parse_last_page(doc: &scraper::Html) -> Option<i32> {
|
||||||
/// signal `NEWNYM` so the next navigation draws a fresh exit. Errors
|
// Pagination links carry their page number as text. Take the
|
||||||
/// from the controller are logged and swallowed — failing to recircuit
|
// numeric maximum so we don't depend on a specific layout (Prev,
|
||||||
/// shouldn't take down the crawl, the next attempt just runs on the
|
// Next, ellipses, etc. all get filtered out by .parse).
|
||||||
/// same circuit as before.
|
let sel = scraper::Selector::parse("#left_side .pagination a").unwrap();
|
||||||
async fn recircuit_if_configured(tor: Option<&crate::crawler::tor::TorController>) {
|
doc.select(&sel)
|
||||||
if let Some(t) = tor {
|
.filter_map(|a| {
|
||||||
if let Err(e) = t.new_identity().await {
|
collapse_whitespace(&a.text().collect::<String>())
|
||||||
tracing::warn!(error = %e, "TOR NEWNYM failed; retrying on same circuit");
|
.parse::<i32>()
|
||||||
}
|
.ok()
|
||||||
}
|
})
|
||||||
|
.max()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// Substitutes the first `/N/` path segment with the target page
|
/// Substitutes the first `/N/` path segment with the target page
|
||||||
/// number. Source impls that paginate via a different URL shape can
|
/// number. Source impls that paginate via a different URL shape can
|
||||||
/// override this — for the modeled site the segment is always present.
|
/// override this — for the modeled site the segment is always present.
|
||||||
@@ -842,6 +853,29 @@ mod tests {
|
|||||||
assert_eq!(parse_chapter_number("Special"), None);
|
assert_eq!(parse_chapter_number("Special"), None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_last_page_picks_highest_pagination_link() {
|
||||||
|
let html = r#"
|
||||||
|
<div id="left_side"><div class="pagination">
|
||||||
|
<a href="/list/1/">Prev</a>
|
||||||
|
<ol>
|
||||||
|
<li><a href="/list/1/">1</a></li>
|
||||||
|
<li><a href="/list/2/">2</a></li>
|
||||||
|
<li><a href="/list/47/">47</a></li>
|
||||||
|
<li><a href="/list/2/">Next</a></li>
|
||||||
|
</ol>
|
||||||
|
</div></div>
|
||||||
|
"#;
|
||||||
|
let doc = scraper::Html::parse_document(html);
|
||||||
|
assert_eq!(parse_last_page(&doc), Some(47));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_last_page_none_when_no_pagination() {
|
||||||
|
let doc = scraper::Html::parse_document("<html></html>");
|
||||||
|
assert!(parse_last_page(&doc).is_none());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn page_url_substitutes_numeric_path_segment() {
|
fn page_url_substitutes_numeric_path_segment() {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
@@ -990,6 +1024,28 @@ mod tests {
|
|||||||
assert!(err.is_transient(), "got non-transient: {err}");
|
assert!(err.is_transient(), "got non-transient: {err}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_page_order_is_natural_one_to_last() {
|
||||||
|
// Newest-first is just the source's natural pagination order:
|
||||||
|
// (update_date DESC) lives at page 1, oldest at the last page.
|
||||||
|
let order = build_page_order(Some(3));
|
||||||
|
assert_eq!(Vec::from(order), vec![1, 2, 3]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_page_order_falls_back_to_page_one_only_without_pagination() {
|
||||||
|
// Source surfaced no pagination control — visit page 1 alone
|
||||||
|
// and let the walk end after one batch.
|
||||||
|
let order = build_page_order(None);
|
||||||
|
assert_eq!(Vec::from(order), vec![1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_page_order_single_page_index_yields_one_entry() {
|
||||||
|
let order = build_page_order(Some(1));
|
||||||
|
assert_eq!(Vec::from(order), vec![1]);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn parse_chapter_list_returns_transient_when_table_missing() {
|
fn parse_chapter_list_returns_transient_when_table_missing() {
|
||||||
// Partial render (post-load JS hadn't injected the table, layout
|
// Partial render (post-load JS hadn't injected the table, layout
|
||||||
|
|||||||
@@ -1,446 +0,0 @@
|
|||||||
//! TOR control-port client for `SIGNAL NEWNYM` ("recircuit").
|
|
||||||
//!
|
|
||||||
//! The crawler can be proxied through TOR (`CRAWLER_PROXY=socks5h://tor:9050`)
|
|
||||||
//! to randomize the exit IP seen by the target site. When the target
|
|
||||||
//! returns a "bad page" (its broken-template body, missing layout
|
|
||||||
//! sentinel, or unauthenticated probe despite a valid PHPSESSID), it
|
|
||||||
//! is often the current exit being rate-limited or fingerprinted rather
|
|
||||||
//! than a real failure. Asking the local TOR daemon for a new identity
|
|
||||||
//! over its control port (port 9051 by default) makes subsequent
|
|
||||||
//! connections draw a fresh circuit; combined with `IsolateDestAddr`
|
|
||||||
//! in torrc this is usually enough to clear the failure.
|
|
||||||
//!
|
|
||||||
//! Scope is deliberately tiny — `AUTHENTICATE` + `SIGNAL NEWNYM` over
|
|
||||||
//! a one-shot TCP connection. No `torut` dep, no hidden-service
|
|
||||||
//! plumbing, no event streaming.
|
|
||||||
//!
|
|
||||||
//! **Caveat for in-flight connections:** Chromium reuses sockets, so a
|
|
||||||
//! `NEWNYM` only affects *new* connections (in TOR terms, new circuits).
|
|
||||||
//! That's fine for our retry path — the next navigation opens a fresh
|
|
||||||
//! connection. We do not try to forcibly close existing streams.
|
|
||||||
|
|
||||||
use std::path::{Path, PathBuf};
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use anyhow::{anyhow, bail, Context};
|
|
||||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
|
||||||
use tokio::net::TcpStream;
|
|
||||||
use tokio::time::timeout;
|
|
||||||
|
|
||||||
/// Default control-port (`tor --defaults-torrc` ships 9051).
|
|
||||||
const DEFAULT_CONTROL_PORT: u16 = 9051;
|
|
||||||
/// Connect timeout — generous enough for a slow compose start, short
|
|
||||||
/// enough that a misconfigured controller doesn't stall a crawl.
|
|
||||||
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
|
|
||||||
/// Per-command read timeout. `SIGNAL NEWNYM` returns instantly on the
|
|
||||||
/// happy path; bound it so a half-broken control port can't hang us.
|
|
||||||
const READ_TIMEOUT: Duration = Duration::from_secs(5);
|
|
||||||
|
|
||||||
/// How the controller authenticates to the control port.
|
|
||||||
///
|
|
||||||
/// `Cookie` is preferred for compose deploys where the auth cookie file
|
|
||||||
/// is shared between the `tor` and `backend` containers via a named
|
|
||||||
/// volume. `Password` is the fallback when the cookie file isn't
|
|
||||||
/// reachable (different gid, no shared volume, etc.). `None` matches a
|
|
||||||
/// torrc with no `CookieAuthentication 1` and no `HashedControlPassword`
|
|
||||||
/// — useful for local experimentation, not for production.
|
|
||||||
///
|
|
||||||
/// `Debug` is implemented manually to redact the password (and the
|
|
||||||
/// cookie path, which is non-sensitive but uninteresting in logs).
|
|
||||||
/// Don't add `#[derive(Debug)]` — the controller is `?`-logged at
|
|
||||||
/// startup and a derive would expand the password into the trace.
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub enum TorAuth {
|
|
||||||
None,
|
|
||||||
Password(String),
|
|
||||||
Cookie(PathBuf),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Debug for TorAuth {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
match self {
|
|
||||||
TorAuth::None => f.write_str("None"),
|
|
||||||
TorAuth::Password(_) => f.write_str("Password(<redacted>)"),
|
|
||||||
TorAuth::Cookie(_) => f.write_str("Cookie(<path>)"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct TorController {
|
|
||||||
/// `host:port` string. Kept as a string (not a `SocketAddr`) so
|
|
||||||
/// docker-compose hostnames like `tor:9051` resolve at connect time.
|
|
||||||
addr: String,
|
|
||||||
auth: TorAuth,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TorController {
|
|
||||||
pub fn new(addr: impl Into<String>, auth: TorAuth) -> Self {
|
|
||||||
Self { addr: addr.into(), auth }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Build a controller from the env-config shape:
|
|
||||||
/// `url` (e.g. `tcp://tor:9051`, `127.0.0.1:9051`, or `tor`),
|
|
||||||
/// optional password, optional cookie path. Returns `Ok(None)` when
|
|
||||||
/// `url` is absent — that's the "TOR feature disabled" signal.
|
|
||||||
/// Cookie wins over password when both are set (rotates with TOR;
|
|
||||||
/// no secret to manage).
|
|
||||||
pub fn from_parts(
|
|
||||||
url: Option<&str>,
|
|
||||||
password: Option<&str>,
|
|
||||||
cookie_path: Option<&Path>,
|
|
||||||
) -> anyhow::Result<Option<Self>> {
|
|
||||||
let Some(url) = url else { return Ok(None) };
|
|
||||||
let addr = parse_control_url(url)?;
|
|
||||||
let auth = match (cookie_path, password) {
|
|
||||||
(Some(p), _) => TorAuth::Cookie(p.to_path_buf()),
|
|
||||||
(None, Some(p)) => TorAuth::Password(p.to_string()),
|
|
||||||
(None, None) => TorAuth::None,
|
|
||||||
};
|
|
||||||
Ok(Some(Self { addr, auth }))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Open the control port, `AUTHENTICATE`, `SIGNAL NEWNYM`, `QUIT`.
|
|
||||||
/// Each invocation is a fresh connection; the controller is cheap
|
|
||||||
/// to clone and stateless across calls.
|
|
||||||
pub async fn new_identity(&self) -> anyhow::Result<()> {
|
|
||||||
let stream = timeout(CONNECT_TIMEOUT, TcpStream::connect(&self.addr))
|
|
||||||
.await
|
|
||||||
.with_context(|| {
|
|
||||||
format!("timed out connecting to TOR control port {}", self.addr)
|
|
||||||
})?
|
|
||||||
.with_context(|| format!("connect to TOR control port {}", self.addr))?;
|
|
||||||
let (read, mut write) = stream.into_split();
|
|
||||||
let mut read = BufReader::new(read);
|
|
||||||
|
|
||||||
let auth_line = self.build_auth_line().await?;
|
|
||||||
write_line(&mut write, &auth_line).await?;
|
|
||||||
timeout(READ_TIMEOUT, expect_250(&mut read))
|
|
||||||
.await
|
|
||||||
.map_err(|_| anyhow!("TOR control AUTHENTICATE timed out"))?
|
|
||||||
.context("AUTHENTICATE")?;
|
|
||||||
|
|
||||||
write_line(&mut write, "SIGNAL NEWNYM").await?;
|
|
||||||
timeout(READ_TIMEOUT, expect_250(&mut read))
|
|
||||||
.await
|
|
||||||
.map_err(|_| anyhow!("TOR control SIGNAL NEWNYM timed out"))?
|
|
||||||
.context("SIGNAL NEWNYM")?;
|
|
||||||
|
|
||||||
// QUIT is courtesy; ignore errors — the daemon may close the
|
|
||||||
// socket before our QUIT lands and that's perfectly fine.
|
|
||||||
let _ = write_line(&mut write, "QUIT").await;
|
|
||||||
// Debug-level: a busy crawl can rotate circuits many times per
|
|
||||||
// minute, INFO is too chatty. Failures still log at WARN.
|
|
||||||
tracing::debug!(addr = %self.addr, "TOR NEWNYM signaled");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn build_auth_line(&self) -> anyhow::Result<String> {
|
|
||||||
match &self.auth {
|
|
||||||
TorAuth::None => Ok("AUTHENTICATE".to_string()),
|
|
||||||
TorAuth::Password(p) => Ok(format!("AUTHENTICATE \"{}\"", escape_quoted(p))),
|
|
||||||
TorAuth::Cookie(path) => {
|
|
||||||
let bytes = tokio::fs::read(path)
|
|
||||||
.await
|
|
||||||
.with_context(|| format!("read TOR cookie file {}", path.display()))?;
|
|
||||||
Ok(format!("AUTHENTICATE {}", hex_encode(&bytes)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Parse `tcp://host:port`, `host:port`, or bare `host` into a
|
|
||||||
/// connect-time string. Default port is [`DEFAULT_CONTROL_PORT`].
|
|
||||||
fn parse_control_url(url: &str) -> anyhow::Result<String> {
|
|
||||||
let stripped = url.strip_prefix("tcp://").unwrap_or(url);
|
|
||||||
if stripped.is_empty() {
|
|
||||||
bail!("TOR control url is empty");
|
|
||||||
}
|
|
||||||
if stripped.contains(':') {
|
|
||||||
Ok(stripped.to_string())
|
|
||||||
} else {
|
|
||||||
Ok(format!("{stripped}:{DEFAULT_CONTROL_PORT}"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn escape_quoted(s: &str) -> String {
|
|
||||||
s.replace('\\', r"\\").replace('"', r#"\""#)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn hex_encode(bytes: &[u8]) -> String {
|
|
||||||
let mut s = String::with_capacity(bytes.len() * 2);
|
|
||||||
for b in bytes {
|
|
||||||
s.push_str(&format!("{b:02x}"));
|
|
||||||
}
|
|
||||||
s
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write_line<W: tokio::io::AsyncWrite + Unpin>(
|
|
||||||
w: &mut W,
|
|
||||||
line: &str,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
w.write_all(line.as_bytes()).await?;
|
|
||||||
w.write_all(b"\r\n").await?;
|
|
||||||
w.flush().await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Drain a TOR control reply, accepting only status `250`. Handles
|
|
||||||
/// the protocol's three line forms: `XYZ ...` (single/end), `XYZ-...`
|
|
||||||
/// (continuation), `XYZ+...` (data block ended by a lone `.`). Our
|
|
||||||
/// commands only ever produce single-line `250 OK`, but we honor the
|
|
||||||
/// continuation forms so a future torrc that adds events / banners
|
|
||||||
/// doesn't confuse the parser.
|
|
||||||
async fn expect_250<R: AsyncBufReadExt + Unpin>(r: &mut R) -> anyhow::Result<()> {
|
|
||||||
loop {
|
|
||||||
let mut line = String::new();
|
|
||||||
let n = r.read_line(&mut line).await?;
|
|
||||||
if n == 0 {
|
|
||||||
bail!("TOR control port closed connection mid-reply");
|
|
||||||
}
|
|
||||||
let trimmed = line.trim_end_matches(['\r', '\n']);
|
|
||||||
if trimmed.len() < 4 {
|
|
||||||
bail!("malformed TOR control reply: {trimmed:?}");
|
|
||||||
}
|
|
||||||
let (code, rest) = trimmed.split_at(3);
|
|
||||||
if code != "250" {
|
|
||||||
bail!("TOR control replied {trimmed:?}");
|
|
||||||
}
|
|
||||||
let sep = rest.as_bytes()[0];
|
|
||||||
match sep {
|
|
||||||
b' ' => return Ok(()),
|
|
||||||
b'-' => continue,
|
|
||||||
b'+' => {
|
|
||||||
// Data block — read until a line consisting of only ".".
|
|
||||||
loop {
|
|
||||||
let mut data = String::new();
|
|
||||||
let n = r.read_line(&mut data).await?;
|
|
||||||
if n == 0 {
|
|
||||||
bail!("TOR control port closed mid-data-block");
|
|
||||||
}
|
|
||||||
if data.trim_end_matches(['\r', '\n']) == "." {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => bail!("malformed TOR control reply separator: {trimmed:?}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use tokio::io::AsyncWriteExt;
|
|
||||||
use tokio::net::TcpListener;
|
|
||||||
|
|
||||||
/// Spawn a mock control port that responds to each \r\n-terminated
|
|
||||||
/// inbound line with the next entry from `replies`. Each reply has
|
|
||||||
/// its own `\r\n` appended. Records received lines into `recorder`.
|
|
||||||
/// After `replies.len()` exchanges the task drops the socket — this
|
|
||||||
/// matches the real TOR behavior for QUIT (close after acking).
|
|
||||||
async fn spawn_mock(
|
|
||||||
replies: Vec<&'static str>,
|
|
||||||
recorder: Arc<Mutex<Vec<String>>>,
|
|
||||||
) -> String {
|
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
||||||
let addr = listener.local_addr().unwrap().to_string();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let (sock, _) = listener.accept().await.unwrap();
|
|
||||||
let (r, mut w) = sock.into_split();
|
|
||||||
let mut r = BufReader::new(r);
|
|
||||||
for reply in replies {
|
|
||||||
let mut line = String::new();
|
|
||||||
let n = r.read_line(&mut line).await.unwrap_or(0);
|
|
||||||
if n == 0 {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
recorder
|
|
||||||
.lock()
|
|
||||||
.unwrap()
|
|
||||||
.push(line.trim_end_matches(['\r', '\n']).to_string());
|
|
||||||
w.write_all(reply.as_bytes()).await.unwrap();
|
|
||||||
w.write_all(b"\r\n").await.unwrap();
|
|
||||||
w.flush().await.unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
addr
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn password_auth_then_newnym_writes_expected_sequence() {
|
|
||||||
let recorder = Arc::new(Mutex::new(Vec::new()));
|
|
||||||
// Two replies: AUTHENTICATE then SIGNAL NEWNYM. QUIT is
|
|
||||||
// fire-and-forget; the mock dropping the socket is the
|
|
||||||
// expected real-world behavior.
|
|
||||||
let addr =
|
|
||||||
spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await;
|
|
||||||
let controller = TorController::new(addr, TorAuth::Password("secret".into()));
|
|
||||||
controller.new_identity().await.expect("new_identity ok");
|
|
||||||
let recorded = recorder.lock().unwrap().clone();
|
|
||||||
assert_eq!(recorded.first().map(String::as_str), Some("AUTHENTICATE \"secret\""));
|
|
||||||
assert_eq!(recorded.get(1).map(String::as_str), Some("SIGNAL NEWNYM"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn cookie_auth_hex_encodes_file_bytes() {
|
|
||||||
let tmp = tempfile::NamedTempFile::new().unwrap();
|
|
||||||
let cookie: Vec<u8> = (0u8..32).collect();
|
|
||||||
std::fs::write(tmp.path(), &cookie).unwrap();
|
|
||||||
let recorder = Arc::new(Mutex::new(Vec::new()));
|
|
||||||
let addr =
|
|
||||||
spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await;
|
|
||||||
let controller =
|
|
||||||
TorController::new(addr, TorAuth::Cookie(tmp.path().to_path_buf()));
|
|
||||||
controller.new_identity().await.expect("new_identity ok");
|
|
||||||
let recorded = recorder.lock().unwrap().clone();
|
|
||||||
let expected_hex: String = cookie.iter().map(|b| format!("{b:02x}")).collect();
|
|
||||||
assert_eq!(
|
|
||||||
recorded.first().map(String::as_str),
|
|
||||||
Some(format!("AUTHENTICATE {expected_hex}").as_str())
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn no_auth_sends_bare_authenticate() {
|
|
||||||
let recorder = Arc::new(Mutex::new(Vec::new()));
|
|
||||||
let addr =
|
|
||||||
spawn_mock(vec!["250 OK", "250 OK"], Arc::clone(&recorder)).await;
|
|
||||||
let controller = TorController::new(addr, TorAuth::None);
|
|
||||||
controller.new_identity().await.expect("new_identity ok");
|
|
||||||
let recorded = recorder.lock().unwrap().clone();
|
|
||||||
assert_eq!(recorded.first().map(String::as_str), Some("AUTHENTICATE"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn non_250_reply_returns_err_with_reply_text() {
|
|
||||||
let recorder = Arc::new(Mutex::new(Vec::new()));
|
|
||||||
let addr = spawn_mock(
|
|
||||||
vec!["515 Bad authentication"],
|
|
||||||
Arc::clone(&recorder),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
let controller =
|
|
||||||
TorController::new(addr, TorAuth::Password("wrong".into()));
|
|
||||||
let err = controller.new_identity().await.expect_err("should fail");
|
|
||||||
let msg = format!("{err:#}");
|
|
||||||
assert!(msg.contains("515"), "expected 515 in error, got: {msg}");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn closed_connection_mid_reply_is_an_error() {
|
|
||||||
// Listener accepts the AUTH line then drops without replying —
|
|
||||||
// this exercises the EOF-mid-reply path in expect_250 (rather
|
|
||||||
// than tor's own error replies which are covered elsewhere).
|
|
||||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
||||||
let addr = listener.local_addr().unwrap().to_string();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Ok((sock, _)) = listener.accept().await {
|
|
||||||
let (r, _w) = sock.into_split();
|
|
||||||
let mut r = BufReader::new(r);
|
|
||||||
let mut line = String::new();
|
|
||||||
let _ = r.read_line(&mut line).await; // read AUTH, ignore
|
|
||||||
// Drop _w (and the read half via scope exit) so the
|
|
||||||
// peer sees an immediate EOF on the next read.
|
|
||||||
}
|
|
||||||
});
|
|
||||||
let controller = TorController::new(addr, TorAuth::None);
|
|
||||||
let err = controller.new_identity().await.expect_err("should fail");
|
|
||||||
let msg = format!("{err:#}");
|
|
||||||
assert!(
|
|
||||||
msg.contains("closed connection"),
|
|
||||||
"expected EOF-mid-reply error, got: {msg}"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn multi_line_250_continuation_is_accepted() {
|
|
||||||
let recorder = Arc::new(Mutex::new(Vec::new()));
|
|
||||||
// AUTHENTICATE reply uses the `250-...\r\n250 OK\r\n` form.
|
|
||||||
// Single reply string contains the whole multi-line response.
|
|
||||||
let addr = spawn_mock(
|
|
||||||
vec!["250-banner=foo\r\n250 OK", "250 OK"],
|
|
||||||
Arc::clone(&recorder),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
let controller = TorController::new(addr, TorAuth::None);
|
|
||||||
controller.new_identity().await.expect("new_identity ok");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn from_parts_returns_none_when_url_unset() {
|
|
||||||
let c = TorController::from_parts(None, None, None).unwrap();
|
|
||||||
assert!(c.is_none());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn from_parts_prefers_cookie_over_password() {
|
|
||||||
let c = TorController::from_parts(
|
|
||||||
Some("tor:9051"),
|
|
||||||
Some("pw"),
|
|
||||||
Some(Path::new("/var/lib/tor/control_auth_cookie")),
|
|
||||||
)
|
|
||||||
.unwrap()
|
|
||||||
.expect("controller built");
|
|
||||||
assert!(matches!(c.auth, TorAuth::Cookie(_)));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn from_parts_falls_back_to_password_without_cookie() {
|
|
||||||
let c = TorController::from_parts(Some("tor:9051"), Some("pw"), None)
|
|
||||||
.unwrap()
|
|
||||||
.expect("controller built");
|
|
||||||
assert!(matches!(c.auth, TorAuth::Password(p) if p == "pw"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn parse_control_url_accepts_tcp_scheme() {
|
|
||||||
assert_eq!(parse_control_url("tcp://127.0.0.1:9051").unwrap(), "127.0.0.1:9051");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn parse_control_url_defaults_port_when_omitted() {
|
|
||||||
assert_eq!(parse_control_url("tor").unwrap(), "tor:9051");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn parse_control_url_passes_through_host_port() {
|
|
||||||
assert_eq!(parse_control_url("tor:9999").unwrap(), "tor:9999");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn parse_control_url_rejects_empty() {
|
|
||||||
assert!(parse_control_url("").is_err());
|
|
||||||
assert!(parse_control_url("tcp://").is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn escape_quoted_handles_quotes_and_backslashes() {
|
|
||||||
assert_eq!(escape_quoted(r#"a"b\c"#), r#"a\"b\\c"#);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn debug_format_redacts_password_and_cookie_path() {
|
|
||||||
// Regression: app.rs / bin/crawler.rs log the controller at
|
|
||||||
// startup via `tracing::info!(?t, ...)`. A derived Debug on
|
|
||||||
// TorAuth would expand TorAuth::Password(p) and leak the
|
|
||||||
// plaintext into logs.
|
|
||||||
let c = TorController::new("tor:9051", TorAuth::Password("super-secret".into()));
|
|
||||||
let dbg = format!("{c:?}");
|
|
||||||
assert!(!dbg.contains("super-secret"), "password leaked: {dbg}");
|
|
||||||
assert!(dbg.contains("<redacted>"), "expected <redacted>, got: {dbg}");
|
|
||||||
|
|
||||||
let c = TorController::new(
|
|
||||||
"tor:9051",
|
|
||||||
TorAuth::Cookie("/var/lib/tor/control_auth_cookie".into()),
|
|
||||||
);
|
|
||||||
let dbg = format!("{c:?}");
|
|
||||||
assert!(!dbg.contains("control_auth_cookie"), "cookie path leaked: {dbg}");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn hex_encode_zero_pads_low_bytes() {
|
|
||||||
assert_eq!(hex_encode(&[0x00, 0x0f, 0xff]), "000fff");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -19,48 +19,11 @@ services:
|
|||||||
timeout: 5s
|
timeout: 5s
|
||||||
retries: 10
|
retries: 10
|
||||||
|
|
||||||
tor:
|
|
||||||
# SOCKS5 proxy for the crawler, plus a control port so the backend
|
|
||||||
# can signal NEWNYM on bad pages. See tor/torrc for the daemon
|
|
||||||
# config; both ports are only `expose`d (compose-internal), never
|
|
||||||
# bound on the host.
|
|
||||||
#
|
|
||||||
# We bypass dockurr/tor's stock entrypoint because it binds the
|
|
||||||
# control port to localhost (unreachable from the backend
|
|
||||||
# container) and skips its own HashedControlPassword injection
|
|
||||||
# when the user's torrc declares a ControlPort. Our wrapper
|
|
||||||
# (tor/entrypoint.sh) generates the hash from $PASSWORD and execs
|
|
||||||
# tor with our torrc. Backend authenticates with the same plain
|
|
||||||
# string via CRAWLER_TOR_CONTROL_PASSWORD.
|
|
||||||
image: dockurr/tor:latest
|
|
||||||
entrypoint: ["/bin/sh", "/usr/local/bin/mangalord-entrypoint.sh"]
|
|
||||||
environment:
|
|
||||||
PASSWORD: ${TOR_CONTROL_PASSWORD:?TOR_CONTROL_PASSWORD must be set in .env}
|
|
||||||
volumes:
|
|
||||||
- ./tor/torrc:/etc/tor/torrc:ro
|
|
||||||
- ./tor/entrypoint.sh:/usr/local/bin/mangalord-entrypoint.sh:ro
|
|
||||||
expose:
|
|
||||||
- "9050"
|
|
||||||
- "9051"
|
|
||||||
# Wait for both control + SOCKS ports to listen before downstream
|
|
||||||
# services start. dockurr/tor's main process spawns before tor
|
|
||||||
# itself is bound, so `service_started` alone races the first
|
|
||||||
# NEWNYM call.
|
|
||||||
healthcheck:
|
|
||||||
test: ["CMD-SHELL", "nc -z 127.0.0.1 9050 && nc -z 127.0.0.1 9051"]
|
|
||||||
interval: 5s
|
|
||||||
timeout: 5s
|
|
||||||
retries: 20
|
|
||||||
start_period: 30s
|
|
||||||
restart: unless-stopped
|
|
||||||
|
|
||||||
backend:
|
backend:
|
||||||
build: ./backend
|
build: ./backend
|
||||||
depends_on:
|
depends_on:
|
||||||
postgres:
|
postgres:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
tor:
|
|
||||||
condition: service_healthy
|
|
||||||
environment:
|
environment:
|
||||||
DATABASE_URL: postgres://${POSTGRES_USER:-mangalord}:${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set in .env}@postgres:5432/${POSTGRES_DB:-mangalord}
|
DATABASE_URL: postgres://${POSTGRES_USER:-mangalord}:${POSTGRES_PASSWORD:?POSTGRES_PASSWORD must be set in .env}@postgres:5432/${POSTGRES_DB:-mangalord}
|
||||||
BIND_ADDRESS: 0.0.0.0:8080
|
BIND_ADDRESS: 0.0.0.0:8080
|
||||||
@@ -81,16 +44,6 @@ services:
|
|||||||
# arm64 deployments. Pair with `--build-arg INSTALL_CHROMIUM=true`
|
# arm64 deployments. Pair with `--build-arg INSTALL_CHROMIUM=true`
|
||||||
# so the image actually contains the binary.
|
# so the image actually contains the binary.
|
||||||
CRAWLER_CHROMIUM_BINARY: ${CRAWLER_CHROMIUM_BINARY:-}
|
CRAWLER_CHROMIUM_BINARY: ${CRAWLER_CHROMIUM_BINARY:-}
|
||||||
# TOR proxy + NEWNYM recircuit (see .env.example for details).
|
|
||||||
# Defaults assume the bundled `tor` service above; override
|
|
||||||
# CRAWLER_PROXY= and CRAWLER_TOR_CONTROL_URL= (both empty) in
|
|
||||||
# .env to disable. CRAWLER_TOR_CONTROL_PASSWORD MUST match the
|
|
||||||
# tor service's PASSWORD (both wired to the same TOR_CONTROL_PASSWORD
|
|
||||||
# .env var below).
|
|
||||||
CRAWLER_PROXY: ${CRAWLER_PROXY-socks5h://tor:9050}
|
|
||||||
CRAWLER_TOR_CONTROL_URL: ${CRAWLER_TOR_CONTROL_URL-tcp://tor:9051}
|
|
||||||
CRAWLER_TOR_CONTROL_PASSWORD: ${TOR_CONTROL_PASSWORD:?TOR_CONTROL_PASSWORD must be set in .env}
|
|
||||||
CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS: ${CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS:-3}
|
|
||||||
volumes:
|
volumes:
|
||||||
- storage-data:/var/lib/mangalord/storage
|
- storage-data:/var/lib/mangalord/storage
|
||||||
# No host port mapping in the default setup — the frontend proxies
|
# No host port mapping in the default setup — the frontend proxies
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "mangalord-frontend",
|
"name": "mangalord-frontend",
|
||||||
"version": "0.46.0",
|
"version": "0.45.0",
|
||||||
"private": true,
|
"private": true,
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|||||||
@@ -1,40 +0,0 @@
|
|||||||
#!/bin/sh
|
|
||||||
# Mangalord wrapper around dockurr/tor's tor binary.
|
|
||||||
#
|
|
||||||
# We bypass the image's stock entrypoint for two reasons:
|
|
||||||
# 1. It generates a `ControlPort 9051` line that binds to localhost
|
|
||||||
# only (tor's default), but our backend lives in a separate
|
|
||||||
# container and needs to reach 0.0.0.0:9051.
|
|
||||||
# 2. It then *skips* writing HashedControlPassword whenever the
|
|
||||||
# user's torrc declares a ControlPort, so we can't both bind to
|
|
||||||
# 0.0.0.0 and benefit from its auto-hashing — it's one or the
|
|
||||||
# other. Doing the hashing ourselves is simpler than threading
|
|
||||||
# around its logic.
|
|
||||||
#
|
|
||||||
# This wrapper hashes $PASSWORD with `tor --hash-password`, appends a
|
|
||||||
# `HashedControlPassword` line to a writable copy of /etc/tor/torrc,
|
|
||||||
# then execs tor. Container runs as root (image default); tor binds
|
|
||||||
# 9050/9051 which don't require root and is fine inside a single-
|
|
||||||
# purpose container.
|
|
||||||
|
|
||||||
set -eu
|
|
||||||
|
|
||||||
if [ -z "${PASSWORD:-}" ]; then
|
|
||||||
echo "ERROR: PASSWORD env must be set (the plain string the backend will" >&2
|
|
||||||
echo " send as CRAWLER_TOR_CONTROL_PASSWORD)" >&2
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
# `tor --hash-password` prints the hash on the last line of stdout
|
|
||||||
# (preceded by initialization noise).
|
|
||||||
HASH=$(tor --hash-password "$PASSWORD" 2>/dev/null | tail -n1)
|
|
||||||
if [ -z "$HASH" ]; then
|
|
||||||
echo "ERROR: 'tor --hash-password' produced no output" >&2
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
# /etc/tor/torrc is bind-mounted read-only, so copy + append.
|
|
||||||
cp /etc/tor/torrc /tmp/torrc
|
|
||||||
printf '\n# Injected by mangalord-entrypoint.sh from $PASSWORD env.\nHashedControlPassword %s\n' "$HASH" >> /tmp/torrc
|
|
||||||
|
|
||||||
exec tor -f /tmp/torrc
|
|
||||||
38
tor/torrc
38
tor/torrc
@@ -1,38 +0,0 @@
|
|||||||
# torrc for the Mangalord crawler.
|
|
||||||
#
|
|
||||||
# Mounted into the dockurr/tor container at /etc/tor/torrc. The
|
|
||||||
# crawler talks to this daemon over the internal compose network only:
|
|
||||||
# `expose:` on the tor service surfaces 9050/9051 to sibling
|
|
||||||
# containers, never to the host.
|
|
||||||
|
|
||||||
# SOCKS5 proxy that reqwest and Chromium use. IsolateDestAddr +
|
|
||||||
# IsolateDestPort means each new (destination IP, port) draws a fresh
|
|
||||||
# circuit — so a SIGNAL NEWNYM picks up promptly on the next
|
|
||||||
# navigation instead of having to wait for an existing dirty circuit
|
|
||||||
# to age out.
|
|
||||||
SOCKSPort 0.0.0.0:9050 IsolateDestAddr IsolateDestPort
|
|
||||||
|
|
||||||
# Control port for SIGNAL NEWNYM. We rely on the dockurr/tor
|
|
||||||
# entrypoint to inject `HashedControlPassword <hash>` from its
|
|
||||||
# PASSWORD env var (see docker-compose.yml `tor.environment.PASSWORD`)
|
|
||||||
# via a higher-priority --defaults-torrc. We just need to declare the
|
|
||||||
# port itself here.
|
|
||||||
ControlPort 0.0.0.0:9051
|
|
||||||
|
|
||||||
# Keep circuits dirty for a while so a single chapter (which serial-
|
|
||||||
# fetches all its images through the same SOCKS endpoint) finishes on
|
|
||||||
# one circuit rather than mid-circuit-rotating in a way that looks like
|
|
||||||
# anti-bot evasion to the target. NEWNYM still forces a fresh circuit
|
|
||||||
# immediately when we want one — this is just the idle-rotation knob.
|
|
||||||
MaxCircuitDirtiness 600
|
|
||||||
|
|
||||||
# Drop privileges to the image's `tor` user after binding ports.
|
|
||||||
# Required because /var/lib/tor (the image's DataDirectory volume)
|
|
||||||
# is owned by tor:tor and tor refuses to use a data dir it doesn't
|
|
||||||
# own. Our entrypoint runs as root only so it can call
|
|
||||||
# `tor --hash-password` and write /tmp/torrc.
|
|
||||||
User tor
|
|
||||||
|
|
||||||
# Data + logs.
|
|
||||||
DataDirectory /var/lib/tor
|
|
||||||
Log notice stdout
|
|
||||||
Reference in New Issue
Block a user