feat(crawler): plumb TorController through FetchContext and pipelines
Adds CRAWLER_TOR_CONTROL_URL / _PASSWORD / _COOKIE_PATH / _RECIRCUIT_MAX_ATTEMPTS to CrawlerConfig and to bin/crawler.rs's env reads. Constructs an Option<Arc<TorController>> at daemon / CLI startup and threads it through FetchContext, pipeline::run_metadata_pass, and content::sync_chapter_content as Option<&TorController>. Pure scaffolding — the controller isn't used yet; behavior is unchanged. Next commit wires the retry hooks and session-probe recircuit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -157,6 +157,17 @@ async fn spawn_crawler_daemon(
|
|||||||
|
|
||||||
let session_expired = Arc::new(AtomicBool::new(false));
|
let session_expired = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
|
let tor = crate::crawler::tor::TorController::from_parts(
|
||||||
|
cfg.tor_control_url.as_deref(),
|
||||||
|
cfg.tor_control_password.as_deref(),
|
||||||
|
cfg.tor_control_cookie_path.as_deref(),
|
||||||
|
)
|
||||||
|
.context("build TorController from CRAWLER_TOR_CONTROL_* env")?
|
||||||
|
.map(Arc::new);
|
||||||
|
if let Some(t) = &tor {
|
||||||
|
tracing::info!(?t, "TOR control configured; transient pages will trigger NEWNYM");
|
||||||
|
}
|
||||||
|
|
||||||
let metadata_pass: Option<Arc<dyn MetadataPass>> = cfg.start_url.as_ref().map(|url| {
|
let metadata_pass: Option<Arc<dyn MetadataPass>> = cfg.start_url.as_ref().map(|url| {
|
||||||
let m: Arc<dyn MetadataPass> = Arc::new(RealMetadataPass {
|
let m: Arc<dyn MetadataPass> = Arc::new(RealMetadataPass {
|
||||||
browser_manager: Arc::clone(&browser_manager),
|
browser_manager: Arc::clone(&browser_manager),
|
||||||
@@ -167,6 +178,7 @@ async fn spawn_crawler_daemon(
|
|||||||
start_url: url.clone(),
|
start_url: url.clone(),
|
||||||
download_allowlist: cfg.download_allowlist.clone(),
|
download_allowlist: cfg.download_allowlist.clone(),
|
||||||
max_image_bytes: cfg.max_image_bytes,
|
max_image_bytes: cfg.max_image_bytes,
|
||||||
|
tor: tor.as_ref().map(Arc::clone),
|
||||||
});
|
});
|
||||||
m
|
m
|
||||||
});
|
});
|
||||||
@@ -179,6 +191,7 @@ async fn spawn_crawler_daemon(
|
|||||||
rate: Arc::clone(&rate),
|
rate: Arc::clone(&rate),
|
||||||
download_allowlist: cfg.download_allowlist.clone(),
|
download_allowlist: cfg.download_allowlist.clone(),
|
||||||
max_image_bytes: cfg.max_image_bytes,
|
max_image_bytes: cfg.max_image_bytes,
|
||||||
|
tor: tor.as_ref().map(Arc::clone),
|
||||||
});
|
});
|
||||||
|
|
||||||
// Shared cancellation: daemon shutdown cancels the BrowserManager's
|
// Shared cancellation: daemon shutdown cancels the BrowserManager's
|
||||||
@@ -232,6 +245,7 @@ struct RealMetadataPass {
|
|||||||
start_url: String,
|
start_url: String,
|
||||||
download_allowlist: DownloadAllowlist,
|
download_allowlist: DownloadAllowlist,
|
||||||
max_image_bytes: usize,
|
max_image_bytes: usize,
|
||||||
|
tor: Option<Arc<crate::crawler::tor::TorController>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -248,6 +262,7 @@ impl MetadataPass for RealMetadataPass {
|
|||||||
false,
|
false,
|
||||||
&self.download_allowlist,
|
&self.download_allowlist,
|
||||||
self.max_image_bytes,
|
self.max_image_bytes,
|
||||||
|
self.tor.as_deref(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
if let Err(e) = &result {
|
if let Err(e) = &result {
|
||||||
@@ -267,6 +282,7 @@ struct RealChapterDispatcher {
|
|||||||
rate: Arc<HostRateLimiters>,
|
rate: Arc<HostRateLimiters>,
|
||||||
download_allowlist: DownloadAllowlist,
|
download_allowlist: DownloadAllowlist,
|
||||||
max_image_bytes: usize,
|
max_image_bytes: usize,
|
||||||
|
tor: Option<Arc<crate::crawler::tor::TorController>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -298,6 +314,7 @@ impl ChapterDispatcher for RealChapterDispatcher {
|
|||||||
false,
|
false,
|
||||||
&self.download_allowlist,
|
&self.download_allowlist,
|
||||||
self.max_image_bytes,
|
self.max_image_bytes,
|
||||||
|
self.tor.as_deref(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
drop(lease);
|
drop(lease);
|
||||||
|
|||||||
@@ -78,6 +78,16 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let proxy_url = std::env::var("CRAWLER_PROXY")
|
let proxy_url = std::env::var("CRAWLER_PROXY")
|
||||||
.ok()
|
.ok()
|
||||||
.filter(|s| !s.trim().is_empty());
|
.filter(|s| !s.trim().is_empty());
|
||||||
|
let tor_control_url = std::env::var("CRAWLER_TOR_CONTROL_URL")
|
||||||
|
.ok()
|
||||||
|
.filter(|s| !s.trim().is_empty());
|
||||||
|
let tor_control_password = std::env::var("CRAWLER_TOR_CONTROL_PASSWORD")
|
||||||
|
.ok()
|
||||||
|
.filter(|s| !s.trim().is_empty());
|
||||||
|
let tor_control_cookie_path = std::env::var("CRAWLER_TOR_CONTROL_COOKIE_PATH")
|
||||||
|
.ok()
|
||||||
|
.filter(|s| !s.trim().is_empty())
|
||||||
|
.map(std::path::PathBuf::from);
|
||||||
let keep_browser_open = env_bool("CRAWLER_KEEP_BROWSER_OPEN", false);
|
let keep_browser_open = env_bool("CRAWLER_KEEP_BROWSER_OPEN", false);
|
||||||
|
|
||||||
let db = PgPoolOptions::new()
|
let db = PgPoolOptions::new()
|
||||||
@@ -173,6 +183,17 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let session_ready = phpsessid.is_some() && cookie_domain.is_some();
|
let session_ready = phpsessid.is_some() && cookie_domain.is_some();
|
||||||
let manager = BrowserManager::new(options, Duration::ZERO, on_launch);
|
let manager = BrowserManager::new(options, Duration::ZERO, on_launch);
|
||||||
|
|
||||||
|
let tor = mangalord::crawler::tor::TorController::from_parts(
|
||||||
|
tor_control_url.as_deref(),
|
||||||
|
tor_control_password.as_deref(),
|
||||||
|
tor_control_cookie_path.as_deref(),
|
||||||
|
)
|
||||||
|
.context("build TorController from CRAWLER_TOR_CONTROL_* env")?
|
||||||
|
.map(Arc::new);
|
||||||
|
if let Some(t) = &tor {
|
||||||
|
tracing::info!(?t, "TOR control configured");
|
||||||
|
}
|
||||||
|
|
||||||
let result = run(
|
let result = run(
|
||||||
Arc::clone(&manager),
|
Arc::clone(&manager),
|
||||||
&db,
|
&db,
|
||||||
@@ -187,6 +208,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
skip_chapter_content || !session_ready,
|
skip_chapter_content || !session_ready,
|
||||||
chapter_workers,
|
chapter_workers,
|
||||||
force_refetch_chapters,
|
force_refetch_chapters,
|
||||||
|
tor.clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -216,6 +238,7 @@ async fn run(
|
|||||||
skip_chapter_content: bool,
|
skip_chapter_content: bool,
|
||||||
chapter_workers: usize,
|
chapter_workers: usize,
|
||||||
force_refetch_chapters: bool,
|
force_refetch_chapters: bool,
|
||||||
|
tor: Option<Arc<mangalord::crawler::tor::TorController>>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut rate = HostRateLimiters::new(Duration::from_millis(rate_ms));
|
let mut rate = HostRateLimiters::new(Duration::from_millis(rate_ms));
|
||||||
if let Some(host) = cdn_host {
|
if let Some(host) = cdn_host {
|
||||||
@@ -267,6 +290,7 @@ async fn run(
|
|||||||
skip_chapters,
|
skip_chapters,
|
||||||
allowlist.as_ref(),
|
allowlist.as_ref(),
|
||||||
max_image_bytes,
|
max_image_bytes,
|
||||||
|
tor.as_deref(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
tracing::info!(?stats, "metadata pass complete");
|
tracing::info!(?stats, "metadata pass complete");
|
||||||
@@ -283,6 +307,7 @@ async fn run(
|
|||||||
force_refetch_chapters,
|
force_refetch_chapters,
|
||||||
Arc::clone(&allowlist),
|
Arc::clone(&allowlist),
|
||||||
max_image_bytes,
|
max_image_bytes,
|
||||||
|
tor.clone(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
@@ -308,6 +333,7 @@ async fn sync_bookmarked_chapter_content(
|
|||||||
force_refetch: bool,
|
force_refetch: bool,
|
||||||
allowlist: Arc<mangalord::crawler::safety::DownloadAllowlist>,
|
allowlist: Arc<mangalord::crawler::safety::DownloadAllowlist>,
|
||||||
max_image_bytes: usize,
|
max_image_bytes: usize,
|
||||||
|
tor: Option<Arc<mangalord::crawler::tor::TorController>>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let pending: Vec<(Uuid, Uuid, String)> = sqlx::query_as(
|
let pending: Vec<(Uuid, Uuid, String)> = sqlx::query_as(
|
||||||
r#"
|
r#"
|
||||||
@@ -345,6 +371,7 @@ async fn sync_bookmarked_chapter_content(
|
|||||||
let rate = Arc::clone(&rate);
|
let rate = Arc::clone(&rate);
|
||||||
let manager = Arc::clone(&manager);
|
let manager = Arc::clone(&manager);
|
||||||
let allowlist = Arc::clone(&allowlist);
|
let allowlist = Arc::clone(&allowlist);
|
||||||
|
let tor = tor.clone();
|
||||||
let stats = &stats;
|
let stats = &stats;
|
||||||
async move {
|
async move {
|
||||||
if session_expired.load(std::sync::atomic::Ordering::Relaxed) {
|
if session_expired.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
@@ -371,6 +398,7 @@ async fn sync_bookmarked_chapter_content(
|
|||||||
force_refetch,
|
force_refetch,
|
||||||
allowlist.as_ref(),
|
allowlist.as_ref(),
|
||||||
max_image_bytes,
|
max_image_bytes,
|
||||||
|
tor.as_deref(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
drop(lease);
|
drop(lease);
|
||||||
|
|||||||
@@ -97,6 +97,20 @@ pub struct CrawlerConfig {
|
|||||||
pub cookie_domain: Option<String>,
|
pub cookie_domain: Option<String>,
|
||||||
pub user_agent: Option<String>,
|
pub user_agent: Option<String>,
|
||||||
pub proxy: Option<String>,
|
pub proxy: Option<String>,
|
||||||
|
/// `tcp://host:port`, `host:port`, or bare `host` (default port
|
||||||
|
/// 9051). When `None`, TOR-recircuit-on-transient is disabled and
|
||||||
|
/// the crawler behaves identically to pre-TOR releases.
|
||||||
|
pub tor_control_url: Option<String>,
|
||||||
|
/// HashedControlPassword auth. Used only when
|
||||||
|
/// `tor_control_cookie_path` is `None`.
|
||||||
|
pub tor_control_password: Option<String>,
|
||||||
|
/// Cookie-file auth path (e.g.
|
||||||
|
/// `/var/lib/tor/control_auth_cookie`). Takes precedence over
|
||||||
|
/// password when both are set.
|
||||||
|
pub tor_control_cookie_path: Option<PathBuf>,
|
||||||
|
/// Maximum NEWNYM-and-retry cycles per recircuit-eligible failure.
|
||||||
|
/// Defaults to 3.
|
||||||
|
pub tor_recircuit_max_attempts: u32,
|
||||||
pub browser: LaunchOptions,
|
pub browser: LaunchOptions,
|
||||||
/// Hosts the crawler is allowed to download images / covers from.
|
/// Hosts the crawler is allowed to download images / covers from.
|
||||||
/// Always seeded with the host of `start_url` and (when set) the
|
/// Always seeded with the host of `start_url` and (when set) the
|
||||||
@@ -124,6 +138,10 @@ impl Default for CrawlerConfig {
|
|||||||
cookie_domain: None,
|
cookie_domain: None,
|
||||||
user_agent: None,
|
user_agent: None,
|
||||||
proxy: None,
|
proxy: None,
|
||||||
|
tor_control_url: None,
|
||||||
|
tor_control_password: None,
|
||||||
|
tor_control_cookie_path: None,
|
||||||
|
tor_recircuit_max_attempts: 3,
|
||||||
browser: LaunchOptions::headless(),
|
browser: LaunchOptions::headless(),
|
||||||
download_allowlist: DownloadAllowlist::new(),
|
download_allowlist: DownloadAllowlist::new(),
|
||||||
max_image_bytes: DEFAULT_MAX_IMAGE_BYTES,
|
max_image_bytes: DEFAULT_MAX_IMAGE_BYTES,
|
||||||
@@ -234,6 +252,18 @@ impl CrawlerConfig {
|
|||||||
proxy: std::env::var("CRAWLER_PROXY")
|
proxy: std::env::var("CRAWLER_PROXY")
|
||||||
.ok()
|
.ok()
|
||||||
.filter(|s| !s.trim().is_empty()),
|
.filter(|s| !s.trim().is_empty()),
|
||||||
|
tor_control_url: std::env::var("CRAWLER_TOR_CONTROL_URL")
|
||||||
|
.ok()
|
||||||
|
.filter(|s| !s.trim().is_empty()),
|
||||||
|
tor_control_password: std::env::var("CRAWLER_TOR_CONTROL_PASSWORD")
|
||||||
|
.ok()
|
||||||
|
.filter(|s| !s.trim().is_empty()),
|
||||||
|
tor_control_cookie_path: std::env::var("CRAWLER_TOR_CONTROL_COOKIE_PATH")
|
||||||
|
.ok()
|
||||||
|
.filter(|s| !s.trim().is_empty())
|
||||||
|
.map(PathBuf::from),
|
||||||
|
tor_recircuit_max_attempts: env_u64("CRAWLER_TOR_RECIRCUIT_MAX_ATTEMPTS", 3)
|
||||||
|
.max(1) as u32,
|
||||||
browser: LaunchOptions::from_env(),
|
browser: LaunchOptions::from_env(),
|
||||||
download_allowlist,
|
download_allowlist,
|
||||||
max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES),
|
max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES),
|
||||||
|
|||||||
@@ -91,6 +91,7 @@ pub async fn sync_chapter_content(
|
|||||||
force_refetch: bool,
|
force_refetch: bool,
|
||||||
allowlist: &DownloadAllowlist,
|
allowlist: &DownloadAllowlist,
|
||||||
max_image_bytes: usize,
|
max_image_bytes: usize,
|
||||||
|
_tor: Option<&crate::crawler::tor::TorController>,
|
||||||
) -> anyhow::Result<SyncOutcome> {
|
) -> anyhow::Result<SyncOutcome> {
|
||||||
// Skip if already fetched, unless caller explicitly forces.
|
// Skip if already fetched, unless caller explicitly forces.
|
||||||
if !force_refetch {
|
if !force_refetch {
|
||||||
|
|||||||
@@ -103,6 +103,7 @@ pub async fn run_metadata_pass(
|
|||||||
skip_chapters: bool,
|
skip_chapters: bool,
|
||||||
allowlist: &DownloadAllowlist,
|
allowlist: &DownloadAllowlist,
|
||||||
max_image_bytes: usize,
|
max_image_bytes: usize,
|
||||||
|
tor: Option<&crate::crawler::tor::TorController>,
|
||||||
) -> anyhow::Result<MetadataStats> {
|
) -> anyhow::Result<MetadataStats> {
|
||||||
let lease = browser_manager
|
let lease = browser_manager
|
||||||
.acquire()
|
.acquire()
|
||||||
@@ -121,6 +122,7 @@ pub async fn run_metadata_pass(
|
|||||||
let ctx = FetchContext {
|
let ctx = FetchContext {
|
||||||
browser: browser_ref,
|
browser: browser_ref,
|
||||||
rate,
|
rate,
|
||||||
|
tor,
|
||||||
};
|
};
|
||||||
|
|
||||||
let source_id = source.id();
|
let source_id = source.id();
|
||||||
|
|||||||
@@ -67,6 +67,10 @@ pub struct SourceChapter {
|
|||||||
pub struct FetchContext<'a> {
|
pub struct FetchContext<'a> {
|
||||||
pub browser: &'a Browser,
|
pub browser: &'a Browser,
|
||||||
pub rate: &'a crate::crawler::rate_limit::HostRateLimiters,
|
pub rate: &'a crate::crawler::rate_limit::HostRateLimiters,
|
||||||
|
/// Optional TOR control-port client. When `Some`, retry helpers
|
||||||
|
/// signal `NEWNYM` between transient-page attempts so the next try
|
||||||
|
/// draws a fresh exit. `None` keeps pre-TOR behavior.
|
||||||
|
pub tor: Option<&'a crate::crawler::tor::TorController>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Lazy iterator over discovered manga refs. The caller drives the
|
/// Lazy iterator over discovered manga refs. The caller drives the
|
||||||
|
|||||||
Reference in New Issue
Block a user