Compare commits
3 Commits
feat/incre
...
bugfix/cra
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c33f30972e | ||
|
|
e7662d18d6 | ||
|
|
45ce0d8f12 |
@@ -44,6 +44,14 @@ MAX_REQUEST_BYTES=209715200
|
|||||||
# Default 20 MiB.
|
# Default 20 MiB.
|
||||||
MAX_FILE_BYTES=20971520
|
MAX_FILE_BYTES=20971520
|
||||||
|
|
||||||
|
# ----- Crawler download safety -----
|
||||||
|
# Hosts the crawler is allowed to fetch images/covers from, in addition
|
||||||
|
# to CRAWLER_START_URL's host and CRAWLER_CDN_HOST. Comma-separated.
|
||||||
|
# Defends against SSRF via scraped <img src="http://10.0.0.1/...">.
|
||||||
|
CRAWLER_DOWNLOAD_ALLOWLIST=
|
||||||
|
# Hard cap on a single image body. Default 32 MiB.
|
||||||
|
CRAWLER_MAX_IMAGE_BYTES=33554432
|
||||||
|
|
||||||
# ----- Frontend -----
|
# ----- Frontend -----
|
||||||
# The frontend container runs SvelteKit's Node adapter on :3000 and
|
# The frontend container runs SvelteKit's Node adapter on :3000 and
|
||||||
# proxies /api/* to BACKEND_URL via src/hooks.server.ts. In compose the
|
# proxies /api/* to BACKEND_URL via src/hooks.server.ts. In compose the
|
||||||
|
|||||||
71
.gitea/README.md
Normal file
71
.gitea/README.md
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
# Gitea Actions
|
||||||
|
|
||||||
|
The [`deploy`](workflows/deploy.yml) workflow runs on every push to `main`
|
||||||
|
(and via manual `workflow_dispatch`). It tests, builds, pushes the images
|
||||||
|
to a private registry, and rolls the stack over by SSH on the target host.
|
||||||
|
|
||||||
|
## Required secrets
|
||||||
|
|
||||||
|
Set under *Repo Settings → Actions → Secrets*:
|
||||||
|
|
||||||
|
| Name | Example | Purpose |
|
||||||
|
| -------------------- | ------------------------ | ---------------------------------------------------------------- |
|
||||||
|
| `REGISTRY_URL` | `registry.example.com` | Registry host. No scheme, no trailing slash. |
|
||||||
|
| `REGISTRY_USERNAME` | `mangalord-ci` | `docker login` user. |
|
||||||
|
| `REGISTRY_PASSWORD` | `<token>` | `docker login` token/password. |
|
||||||
|
| `SSH_HOST` | `mangalord.example.com` | Deploy target hostname/IP. |
|
||||||
|
| `SSH_USER` | `deploy` | SSH user on the target (must be in the `docker` group). |
|
||||||
|
| `SSH_PRIVATE_KEY` | `-----BEGIN OPENSSH...` | Private key authorised in the target user's `authorized_keys`. |
|
||||||
|
| `SSH_PORT` | `22` | Optional. Defaults to `22` if unset. |
|
||||||
|
|
||||||
|
## Required variables
|
||||||
|
|
||||||
|
Set under *Repo Settings → Actions → Variables* (not secrets — they appear
|
||||||
|
in logs):
|
||||||
|
|
||||||
|
| Name | Example | Purpose |
|
||||||
|
| ------------- | ------------------------ | ---------------------------------------------------------------------- |
|
||||||
|
| `DEPLOY_PATH` | `/srv/mangalord` | Directory on target holding `docker-compose.yml`, `.env`, and the prod overlay. |
|
||||||
|
|
||||||
|
## One-time host setup
|
||||||
|
|
||||||
|
The workflow assumes the deploy target already has:
|
||||||
|
|
||||||
|
1. Docker + Docker Compose v2 installed and the `SSH_USER` in the `docker` group.
|
||||||
|
2. `$DEPLOY_PATH/docker-compose.yml` (copy of the repo's [docker-compose.yml](../docker-compose.yml)).
|
||||||
|
3. `$DEPLOY_PATH/docker-compose.prod.yml` (copy of the repo's [docker-compose.prod.yml](../docker-compose.prod.yml)).
|
||||||
|
4. `$DEPLOY_PATH/.env` populated from [.env.example](../.env.example) with production values (real `POSTGRES_PASSWORD`, `COOKIE_SECURE=true`, etc.).
|
||||||
|
|
||||||
|
Bootstrap once:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ssh deploy@mangalord.example.com
|
||||||
|
sudo mkdir -p /srv/mangalord && sudo chown deploy:deploy /srv/mangalord
|
||||||
|
cd /srv/mangalord
|
||||||
|
# place docker-compose.yml, docker-compose.prod.yml, and .env here
|
||||||
|
```
|
||||||
|
|
||||||
|
The first workflow run will pull the images, bring the stack up, and run
|
||||||
|
the embedded migrations on startup.
|
||||||
|
|
||||||
|
## Image tags
|
||||||
|
|
||||||
|
Every push produces three tags per image:
|
||||||
|
|
||||||
|
- `mangalord-{backend,frontend}:latest`
|
||||||
|
- `mangalord-{backend,frontend}:<git-sha>` — used by the deploy job; lets
|
||||||
|
you pin a deploy to a specific commit
|
||||||
|
- `mangalord-{backend,frontend}:<version>` — the version from
|
||||||
|
[backend/Cargo.toml](../backend/Cargo.toml) (verified in lockstep with
|
||||||
|
[frontend/package.json](../frontend/package.json))
|
||||||
|
|
||||||
|
## Rollback
|
||||||
|
|
||||||
|
SSH to the target, set `IMAGE_TAG` to a previous commit SHA, and re-up:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd /srv/mangalord
|
||||||
|
export REGISTRY_URL=registry.example.com
|
||||||
|
export IMAGE_TAG=<previous-sha>
|
||||||
|
docker compose -f docker-compose.yml -f docker-compose.prod.yml up -d
|
||||||
|
```
|
||||||
144
.gitea/workflows/deploy.yml
Normal file
144
.gitea/workflows/deploy.yml
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
name: deploy
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [main]
|
||||||
|
workflow_dispatch:
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
test-backend:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
container:
|
||||||
|
image: rust:1-slim
|
||||||
|
services:
|
||||||
|
postgres:
|
||||||
|
image: postgres:16-alpine
|
||||||
|
env:
|
||||||
|
POSTGRES_USER: mangalord
|
||||||
|
POSTGRES_PASSWORD: mangalord
|
||||||
|
POSTGRES_DB: mangalord
|
||||||
|
options: >-
|
||||||
|
--health-cmd "pg_isready -U mangalord"
|
||||||
|
--health-interval 5s
|
||||||
|
--health-timeout 5s
|
||||||
|
--health-retries 10
|
||||||
|
env:
|
||||||
|
DATABASE_URL: postgres://mangalord:mangalord@postgres:5432/mangalord
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- name: Install build deps
|
||||||
|
run: |
|
||||||
|
apt-get update
|
||||||
|
apt-get install -y --no-install-recommends pkg-config libssl-dev ca-certificates
|
||||||
|
- name: Cache cargo registry and target
|
||||||
|
uses: actions/cache@v4
|
||||||
|
with:
|
||||||
|
path: |
|
||||||
|
~/.cargo/registry
|
||||||
|
~/.cargo/git
|
||||||
|
backend/target
|
||||||
|
key: cargo-${{ runner.os }}-${{ hashFiles('backend/Cargo.lock') }}
|
||||||
|
restore-keys: |
|
||||||
|
cargo-${{ runner.os }}-
|
||||||
|
- name: cargo test
|
||||||
|
working-directory: backend
|
||||||
|
run: cargo test --locked
|
||||||
|
|
||||||
|
test-frontend:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- uses: actions/setup-node@v4
|
||||||
|
with:
|
||||||
|
node-version: '22'
|
||||||
|
cache: npm
|
||||||
|
cache-dependency-path: frontend/package-lock.json
|
||||||
|
- name: npm ci
|
||||||
|
working-directory: frontend
|
||||||
|
run: npm ci
|
||||||
|
- name: vitest
|
||||||
|
working-directory: frontend
|
||||||
|
run: npm test
|
||||||
|
|
||||||
|
build-and-push:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
needs: [test-backend, test-frontend]
|
||||||
|
outputs:
|
||||||
|
image_tag: ${{ steps.meta.outputs.image_tag }}
|
||||||
|
version: ${{ steps.meta.outputs.version }}
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Resolve image tags
|
||||||
|
id: meta
|
||||||
|
run: |
|
||||||
|
version="$(grep -m1 '^version' backend/Cargo.toml | cut -d'"' -f2)"
|
||||||
|
frontend_version="$(grep -m1 '"version"' frontend/package.json | cut -d'"' -f4)"
|
||||||
|
if [ "$version" != "$frontend_version" ]; then
|
||||||
|
echo "Version mismatch: backend=$version frontend=$frontend_version" >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
echo "image_tag=${GITHUB_SHA}" >> "$GITHUB_OUTPUT"
|
||||||
|
echo "version=${version}" >> "$GITHUB_OUTPUT"
|
||||||
|
|
||||||
|
- uses: docker/setup-buildx-action@v3
|
||||||
|
|
||||||
|
- name: docker login
|
||||||
|
uses: docker/login-action@v3
|
||||||
|
with:
|
||||||
|
registry: ${{ secrets.REGISTRY_URL }}
|
||||||
|
username: ${{ secrets.REGISTRY_USERNAME }}
|
||||||
|
password: ${{ secrets.REGISTRY_PASSWORD }}
|
||||||
|
|
||||||
|
- name: Build & push backend
|
||||||
|
uses: docker/build-push-action@v5
|
||||||
|
with:
|
||||||
|
context: ./backend
|
||||||
|
push: true
|
||||||
|
tags: |
|
||||||
|
${{ secrets.REGISTRY_URL }}/mangalord-backend:latest
|
||||||
|
${{ secrets.REGISTRY_URL }}/mangalord-backend:${{ steps.meta.outputs.image_tag }}
|
||||||
|
${{ secrets.REGISTRY_URL }}/mangalord-backend:${{ steps.meta.outputs.version }}
|
||||||
|
cache-from: type=gha,scope=backend
|
||||||
|
cache-to: type=gha,mode=max,scope=backend
|
||||||
|
|
||||||
|
- name: Build & push frontend
|
||||||
|
uses: docker/build-push-action@v5
|
||||||
|
with:
|
||||||
|
context: ./frontend
|
||||||
|
push: true
|
||||||
|
tags: |
|
||||||
|
${{ secrets.REGISTRY_URL }}/mangalord-frontend:latest
|
||||||
|
${{ secrets.REGISTRY_URL }}/mangalord-frontend:${{ steps.meta.outputs.image_tag }}
|
||||||
|
${{ secrets.REGISTRY_URL }}/mangalord-frontend:${{ steps.meta.outputs.version }}
|
||||||
|
cache-from: type=gha,scope=frontend
|
||||||
|
cache-to: type=gha,mode=max,scope=frontend
|
||||||
|
|
||||||
|
deploy:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
needs: build-and-push
|
||||||
|
steps:
|
||||||
|
- name: SSH deploy
|
||||||
|
uses: appleboy/ssh-action@v1.0.3
|
||||||
|
with:
|
||||||
|
host: ${{ secrets.SSH_HOST }}
|
||||||
|
username: ${{ secrets.SSH_USER }}
|
||||||
|
key: ${{ secrets.SSH_PRIVATE_KEY }}
|
||||||
|
port: ${{ secrets.SSH_PORT || 22 }}
|
||||||
|
envs: REGISTRY_URL,REGISTRY_USERNAME,REGISTRY_PASSWORD,IMAGE_TAG,DEPLOY_PATH
|
||||||
|
script_stop: true
|
||||||
|
script: |
|
||||||
|
set -euo pipefail
|
||||||
|
cd "$DEPLOY_PATH"
|
||||||
|
echo "$REGISTRY_PASSWORD" | docker login "$REGISTRY_URL" -u "$REGISTRY_USERNAME" --password-stdin
|
||||||
|
export REGISTRY_URL IMAGE_TAG
|
||||||
|
docker compose -f docker-compose.yml -f docker-compose.prod.yml pull
|
||||||
|
docker compose -f docker-compose.yml -f docker-compose.prod.yml up -d
|
||||||
|
docker image prune -f
|
||||||
|
docker logout "$REGISTRY_URL"
|
||||||
|
env:
|
||||||
|
REGISTRY_URL: ${{ secrets.REGISTRY_URL }}
|
||||||
|
REGISTRY_USERNAME: ${{ secrets.REGISTRY_USERNAME }}
|
||||||
|
REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }}
|
||||||
|
IMAGE_TAG: ${{ needs.build-and-push.outputs.image_tag }}
|
||||||
|
DEPLOY_PATH: ${{ vars.DEPLOY_PATH }}
|
||||||
18
backend/Cargo.lock
generated
18
backend/Cargo.lock
generated
@@ -1470,7 +1470,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mangalord"
|
name = "mangalord"
|
||||||
version = "0.32.0"
|
version = "0.34.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"argon2",
|
"argon2",
|
||||||
@@ -2324,6 +2324,7 @@ dependencies = [
|
|||||||
"cookie",
|
"cookie",
|
||||||
"cookie_store",
|
"cookie_store",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
"futures-util",
|
||||||
"http",
|
"http",
|
||||||
"http-body",
|
"http-body",
|
||||||
"http-body-util",
|
"http-body-util",
|
||||||
@@ -2343,12 +2344,14 @@ dependencies = [
|
|||||||
"sync_wrapper",
|
"sync_wrapper",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls",
|
"tokio-rustls",
|
||||||
|
"tokio-util",
|
||||||
"tower",
|
"tower",
|
||||||
"tower-http",
|
"tower-http",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
"url",
|
"url",
|
||||||
"wasm-bindgen",
|
"wasm-bindgen",
|
||||||
"wasm-bindgen-futures",
|
"wasm-bindgen-futures",
|
||||||
|
"wasm-streams",
|
||||||
"web-sys",
|
"web-sys",
|
||||||
"webpki-roots",
|
"webpki-roots",
|
||||||
]
|
]
|
||||||
@@ -3527,6 +3530,19 @@ dependencies = [
|
|||||||
"wasmparser",
|
"wasmparser",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wasm-streams"
|
||||||
|
version = "0.4.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"js-sys",
|
||||||
|
"wasm-bindgen",
|
||||||
|
"wasm-bindgen-futures",
|
||||||
|
"web-sys",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "wasmparser"
|
name = "wasmparser"
|
||||||
version = "0.244.0"
|
version = "0.244.0"
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "mangalord"
|
name = "mangalord"
|
||||||
version = "0.32.0"
|
version = "0.34.1"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
default-run = "mangalord"
|
default-run = "mangalord"
|
||||||
|
|
||||||
@@ -46,7 +46,7 @@ futures-util = "0.3"
|
|||||||
bytes = "1"
|
bytes = "1"
|
||||||
chromiumoxide = { version = "0.7", features = ["tokio-runtime", "_fetcher-rusttls-tokio"], default-features = false }
|
chromiumoxide = { version = "0.7", features = ["tokio-runtime", "_fetcher-rusttls-tokio"], default-features = false }
|
||||||
scraper = "0.20"
|
scraper = "0.20"
|
||||||
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "socks", "cookies"] }
|
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "socks", "cookies", "stream"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tempfile = "3"
|
tempfile = "3"
|
||||||
|
|||||||
@@ -12,14 +12,17 @@ use tokio_util::sync::CancellationToken;
|
|||||||
use tower_http::cors::{AllowOrigin, CorsLayer};
|
use tower_http::cors::{AllowOrigin, CorsLayer};
|
||||||
use tower_http::trace::TraceLayer;
|
use tower_http::trace::TraceLayer;
|
||||||
|
|
||||||
use crate::config::{AuthConfig, Config, CrawlerConfig, UploadConfig};
|
use crate::config::{AuthConfig, Config, CrawlerConfig, CrawlerModePref, UploadConfig};
|
||||||
use crate::crawler::browser_manager::{self, BrowserManager};
|
use crate::crawler::browser_manager::{self, BrowserManager};
|
||||||
use crate::crawler::content::{self, SyncOutcome};
|
use crate::crawler::content::{self, SyncOutcome};
|
||||||
use crate::crawler::daemon::{self, ChapterDispatcher, DaemonConfig, MetadataPass};
|
use crate::crawler::daemon::{self, ChapterDispatcher, DaemonConfig, MetadataPass};
|
||||||
use crate::crawler::jobs::JobPayload;
|
use crate::crawler::jobs::JobPayload;
|
||||||
use crate::crawler::pipeline::{self, MetadataStats};
|
use crate::crawler::pipeline::{self, MetadataStats};
|
||||||
use crate::crawler::rate_limit::HostRateLimiters;
|
use crate::crawler::rate_limit::HostRateLimiters;
|
||||||
|
use crate::crawler::safety::DownloadAllowlist;
|
||||||
use crate::crawler::session;
|
use crate::crawler::session;
|
||||||
|
use crate::crawler::source::{target as target_source, DiscoverMode};
|
||||||
|
use crate::repo;
|
||||||
use crate::storage::{LocalStorage, Storage};
|
use crate::storage::{LocalStorage, Storage};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
@@ -149,6 +152,10 @@ async fn spawn_crawler_daemon(
|
|||||||
http: http.clone(),
|
http: http.clone(),
|
||||||
rate: Arc::clone(&rate),
|
rate: Arc::clone(&rate),
|
||||||
start_url: url.clone(),
|
start_url: url.clone(),
|
||||||
|
mode_pref: cfg.mode,
|
||||||
|
incremental_stop_after: cfg.incremental_stop_after,
|
||||||
|
download_allowlist: cfg.download_allowlist.clone(),
|
||||||
|
max_image_bytes: cfg.max_image_bytes,
|
||||||
});
|
});
|
||||||
m
|
m
|
||||||
});
|
});
|
||||||
@@ -159,6 +166,8 @@ async fn spawn_crawler_daemon(
|
|||||||
storage: Arc::clone(&storage),
|
storage: Arc::clone(&storage),
|
||||||
http,
|
http,
|
||||||
rate: Arc::clone(&rate),
|
rate: Arc::clone(&rate),
|
||||||
|
download_allowlist: cfg.download_allowlist.clone(),
|
||||||
|
max_image_bytes: cfg.max_image_bytes,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Shared cancellation: daemon shutdown cancels the BrowserManager's
|
// Shared cancellation: daemon shutdown cancels the BrowserManager's
|
||||||
@@ -210,11 +219,22 @@ struct RealMetadataPass {
|
|||||||
http: reqwest::Client,
|
http: reqwest::Client,
|
||||||
rate: Arc<HostRateLimiters>,
|
rate: Arc<HostRateLimiters>,
|
||||||
start_url: String,
|
start_url: String,
|
||||||
|
mode_pref: CrawlerModePref,
|
||||||
|
incremental_stop_after: usize,
|
||||||
|
download_allowlist: DownloadAllowlist,
|
||||||
|
max_image_bytes: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl MetadataPass for RealMetadataPass {
|
impl MetadataPass for RealMetadataPass {
|
||||||
async fn run(&self) -> anyhow::Result<MetadataStats> {
|
async fn run(&self) -> anyhow::Result<MetadataStats> {
|
||||||
|
let mode = resolve_mode(
|
||||||
|
&self.db,
|
||||||
|
target_source::SOURCE_ID,
|
||||||
|
self.mode_pref,
|
||||||
|
self.incremental_stop_after,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
pipeline::run_metadata_pass(
|
pipeline::run_metadata_pass(
|
||||||
&self.browser_manager,
|
&self.browser_manager,
|
||||||
&self.db,
|
&self.db,
|
||||||
@@ -224,17 +244,66 @@ impl MetadataPass for RealMetadataPass {
|
|||||||
&self.start_url,
|
&self.start_url,
|
||||||
0,
|
0,
|
||||||
false,
|
false,
|
||||||
|
mode,
|
||||||
|
&self.download_allowlist,
|
||||||
|
self.max_image_bytes,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Pick the active mode for this tick. `Explicit` short-circuits the
|
||||||
|
/// DB lookup. `Auto` reads `seed_completed_at`: missing → Backfill
|
||||||
|
/// (initial seed for this source), present → Incremental with the
|
||||||
|
/// configured threshold.
|
||||||
|
///
|
||||||
|
/// A DB error during the Auto lookup propagates as `Err` rather than
|
||||||
|
/// silently degrading to Backfill — the daemon's `run_tick` catches
|
||||||
|
/// the error, logs, and skips the tick. That's safer than running a
|
||||||
|
/// full re-backfill (including a drop pass against stale-looking rows)
|
||||||
|
/// when the DB is flaky.
|
||||||
|
async fn resolve_mode(
|
||||||
|
db: &PgPool,
|
||||||
|
source_id: &str,
|
||||||
|
pref: CrawlerModePref,
|
||||||
|
incremental_stop_after: usize,
|
||||||
|
) -> anyhow::Result<DiscoverMode> {
|
||||||
|
match pref {
|
||||||
|
CrawlerModePref::Explicit(m) => {
|
||||||
|
tracing::info!(?m, "crawler mode: explicit (CRAWLER_MODE override)");
|
||||||
|
Ok(m)
|
||||||
|
}
|
||||||
|
CrawlerModePref::Auto => {
|
||||||
|
let seeded = repo::crawler::seed_completed_at(db, source_id)
|
||||||
|
.await
|
||||||
|
.context("seed_completed_at lookup for mode auto-detection")?;
|
||||||
|
match seeded {
|
||||||
|
Some(at) => {
|
||||||
|
tracing::info!(
|
||||||
|
seed_completed_at = %at.to_rfc3339(),
|
||||||
|
"crawler mode: auto → incremental (seed previously completed)"
|
||||||
|
);
|
||||||
|
Ok(DiscoverMode::Incremental {
|
||||||
|
stop_after_unchanged: incremental_stop_after,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
tracing::info!("crawler mode: auto → backfill (no seed marker for source)");
|
||||||
|
Ok(DiscoverMode::Backfill)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct RealChapterDispatcher {
|
struct RealChapterDispatcher {
|
||||||
browser_manager: Arc<BrowserManager>,
|
browser_manager: Arc<BrowserManager>,
|
||||||
db: PgPool,
|
db: PgPool,
|
||||||
storage: Arc<dyn Storage>,
|
storage: Arc<dyn Storage>,
|
||||||
http: reqwest::Client,
|
http: reqwest::Client,
|
||||||
rate: Arc<HostRateLimiters>,
|
rate: Arc<HostRateLimiters>,
|
||||||
|
download_allowlist: DownloadAllowlist,
|
||||||
|
max_image_bytes: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -273,6 +342,8 @@ impl ChapterDispatcher for RealChapterDispatcher {
|
|||||||
manga_id,
|
manga_id,
|
||||||
&source_url,
|
&source_url,
|
||||||
false,
|
false,
|
||||||
|
&self.download_allowlist,
|
||||||
|
self.max_image_bytes,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
drop(lease);
|
drop(lease);
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ use mangalord::crawler::content::{self, SyncOutcome};
|
|||||||
use mangalord::crawler::pipeline;
|
use mangalord::crawler::pipeline;
|
||||||
use mangalord::crawler::rate_limit::HostRateLimiters;
|
use mangalord::crawler::rate_limit::HostRateLimiters;
|
||||||
use mangalord::crawler::session;
|
use mangalord::crawler::session;
|
||||||
|
use mangalord::crawler::source::DiscoverMode;
|
||||||
use mangalord::storage::{LocalStorage, Storage};
|
use mangalord::storage::{LocalStorage, Storage};
|
||||||
use sqlx::postgres::PgPoolOptions;
|
use sqlx::postgres::PgPoolOptions;
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
@@ -62,6 +63,8 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let cdn_rate_ms = env_u64("CRAWLER_CDN_RATE_MS", rate_ms);
|
let cdn_rate_ms = env_u64("CRAWLER_CDN_RATE_MS", rate_ms);
|
||||||
let limit = env_u64("CRAWLER_LIMIT", 0) as usize;
|
let limit = env_u64("CRAWLER_LIMIT", 0) as usize;
|
||||||
let skip_chapters = env_bool("CRAWLER_SKIP_CHAPTERS", false);
|
let skip_chapters = env_bool("CRAWLER_SKIP_CHAPTERS", false);
|
||||||
|
let incremental_stop_after = env_u64("CRAWLER_INCREMENTAL_STOP_AFTER", 20).max(1) as usize;
|
||||||
|
let mode = parse_crawler_mode(incremental_stop_after)?;
|
||||||
let skip_chapter_content = env_bool("CRAWLER_SKIP_CHAPTER_CONTENT", false);
|
let skip_chapter_content = env_bool("CRAWLER_SKIP_CHAPTER_CONTENT", false);
|
||||||
let chapter_workers = env_u64("CRAWLER_CHAPTER_WORKERS", 1).max(1) as usize;
|
let chapter_workers = env_u64("CRAWLER_CHAPTER_WORKERS", 1).max(1) as usize;
|
||||||
let force_refetch_chapters = env_bool("CRAWLER_FORCE_REFETCH_CHAPTERS", false);
|
let force_refetch_chapters = env_bool("CRAWLER_FORCE_REFETCH_CHAPTERS", false);
|
||||||
@@ -140,6 +143,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
user_agent = ?user_agent,
|
user_agent = ?user_agent,
|
||||||
proxy = ?proxy_url,
|
proxy = ?proxy_url,
|
||||||
keep_open,
|
keep_open,
|
||||||
|
?mode,
|
||||||
storage_dir = %storage_dir.display(),
|
storage_dir = %storage_dir.display(),
|
||||||
"starting crawler"
|
"starting crawler"
|
||||||
);
|
);
|
||||||
@@ -187,6 +191,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
skip_chapter_content || !session_ready,
|
skip_chapter_content || !session_ready,
|
||||||
chapter_workers,
|
chapter_workers,
|
||||||
force_refetch_chapters,
|
force_refetch_chapters,
|
||||||
|
mode,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -216,6 +221,7 @@ async fn run(
|
|||||||
skip_chapter_content: bool,
|
skip_chapter_content: bool,
|
||||||
chapter_workers: usize,
|
chapter_workers: usize,
|
||||||
force_refetch_chapters: bool,
|
force_refetch_chapters: bool,
|
||||||
|
mode: DiscoverMode,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let mut rate = HostRateLimiters::new(Duration::from_millis(rate_ms));
|
let mut rate = HostRateLimiters::new(Duration::from_millis(rate_ms));
|
||||||
if let Some(host) = cdn_host {
|
if let Some(host) = cdn_host {
|
||||||
@@ -223,6 +229,33 @@ async fn run(
|
|||||||
}
|
}
|
||||||
let rate = Arc::new(rate);
|
let rate = Arc::new(rate);
|
||||||
|
|
||||||
|
// SSRF defence: only download from the catalog host + CDN host
|
||||||
|
// (plus optional CRAWLER_DOWNLOAD_ALLOWLIST extras), and cap
|
||||||
|
// single-image downloads at CRAWLER_MAX_IMAGE_BYTES bytes.
|
||||||
|
let mut allowlist =
|
||||||
|
mangalord::crawler::safety::DownloadAllowlist::new();
|
||||||
|
if let Ok(parsed) = reqwest::Url::parse(start_url) {
|
||||||
|
if let Some(h) = parsed.host_str() {
|
||||||
|
allowlist = allowlist.allow(h);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(host) = cdn_host {
|
||||||
|
allowlist = allowlist.allow(host);
|
||||||
|
}
|
||||||
|
if let Ok(extras) = std::env::var("CRAWLER_DOWNLOAD_ALLOWLIST") {
|
||||||
|
for piece in extras.split(',') {
|
||||||
|
let trimmed = piece.trim();
|
||||||
|
if !trimmed.is_empty() {
|
||||||
|
allowlist = allowlist.allow(trimmed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let max_image_bytes: usize = std::env::var("CRAWLER_MAX_IMAGE_BYTES")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.unwrap_or(mangalord::crawler::safety::DEFAULT_MAX_IMAGE_BYTES);
|
||||||
|
let allowlist = Arc::new(allowlist);
|
||||||
|
|
||||||
let stats = pipeline::run_metadata_pass(
|
let stats = pipeline::run_metadata_pass(
|
||||||
manager.as_ref(),
|
manager.as_ref(),
|
||||||
db,
|
db,
|
||||||
@@ -232,6 +265,9 @@ async fn run(
|
|||||||
start_url,
|
start_url,
|
||||||
limit,
|
limit,
|
||||||
skip_chapters,
|
skip_chapters,
|
||||||
|
mode,
|
||||||
|
allowlist.as_ref(),
|
||||||
|
max_image_bytes,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
tracing::info!(?stats, "metadata pass complete");
|
tracing::info!(?stats, "metadata pass complete");
|
||||||
@@ -246,6 +282,8 @@ async fn run(
|
|||||||
"target",
|
"target",
|
||||||
chapter_workers,
|
chapter_workers,
|
||||||
force_refetch_chapters,
|
force_refetch_chapters,
|
||||||
|
Arc::clone(&allowlist),
|
||||||
|
max_image_bytes,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
@@ -269,6 +307,8 @@ async fn sync_bookmarked_chapter_content(
|
|||||||
source_id: &str,
|
source_id: &str,
|
||||||
workers: usize,
|
workers: usize,
|
||||||
force_refetch: bool,
|
force_refetch: bool,
|
||||||
|
allowlist: Arc<mangalord::crawler::safety::DownloadAllowlist>,
|
||||||
|
max_image_bytes: usize,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let pending: Vec<(Uuid, Uuid, String)> = sqlx::query_as(
|
let pending: Vec<(Uuid, Uuid, String)> = sqlx::query_as(
|
||||||
r#"
|
r#"
|
||||||
@@ -305,6 +345,7 @@ async fn sync_bookmarked_chapter_content(
|
|||||||
let storage = Arc::clone(&storage);
|
let storage = Arc::clone(&storage);
|
||||||
let rate = Arc::clone(&rate);
|
let rate = Arc::clone(&rate);
|
||||||
let manager = Arc::clone(&manager);
|
let manager = Arc::clone(&manager);
|
||||||
|
let allowlist = Arc::clone(&allowlist);
|
||||||
let stats = &stats;
|
let stats = &stats;
|
||||||
async move {
|
async move {
|
||||||
if session_expired.load(std::sync::atomic::Ordering::Relaxed) {
|
if session_expired.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
@@ -329,6 +370,8 @@ async fn sync_bookmarked_chapter_content(
|
|||||||
manga_id,
|
manga_id,
|
||||||
&source_url,
|
&source_url,
|
||||||
force_refetch,
|
force_refetch,
|
||||||
|
allowlist.as_ref(),
|
||||||
|
max_image_bytes,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
drop(lease);
|
drop(lease);
|
||||||
@@ -390,6 +433,38 @@ fn resolve_start_url() -> anyhow::Result<String> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Parse the CLI's `CRAWLER_MODE`. Defaults to `backfill` because the
|
||||||
|
/// binary is operator-driven (manual reseeds, force-refetches) — the
|
||||||
|
/// auto-detect logic lives in the daemon. `auto` is rejected because
|
||||||
|
/// the CLI has no DB state to consult before the run.
|
||||||
|
fn parse_crawler_mode(incremental_stop_after: usize) -> anyhow::Result<DiscoverMode> {
|
||||||
|
parse_crawler_mode_str(
|
||||||
|
std::env::var("CRAWLER_MODE").ok().as_deref(),
|
||||||
|
incremental_stop_after,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Pure variant of [`parse_crawler_mode`] — testable without env-var
|
||||||
|
/// mutation.
|
||||||
|
fn parse_crawler_mode_str(
|
||||||
|
raw: Option<&str>,
|
||||||
|
incremental_stop_after: usize,
|
||||||
|
) -> anyhow::Result<DiscoverMode> {
|
||||||
|
match raw.map(|s| s.trim().to_ascii_lowercase()).as_deref() {
|
||||||
|
None | Some("") | Some("backfill") => Ok(DiscoverMode::Backfill),
|
||||||
|
Some("incremental") => Ok(DiscoverMode::Incremental {
|
||||||
|
stop_after_unchanged: incremental_stop_after,
|
||||||
|
}),
|
||||||
|
Some("auto") => Err(anyhow!(
|
||||||
|
"CRAWLER_MODE=auto isn't supported by the CLI (use backfill or incremental); \
|
||||||
|
the daemon does auto-detection"
|
||||||
|
)),
|
||||||
|
Some(other) => Err(anyhow!(
|
||||||
|
"CRAWLER_MODE must be one of: backfill, incremental (got {other:?})"
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn env_u64(name: &str, default: u64) -> u64 {
|
fn env_u64(name: &str, default: u64) -> u64 {
|
||||||
std::env::var(name)
|
std::env::var(name)
|
||||||
.ok()
|
.ok()
|
||||||
@@ -405,3 +480,55 @@ fn env_bool(name: &str, default: bool) -> bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cli_mode_defaults_to_backfill_when_unset_or_blank() {
|
||||||
|
let none = parse_crawler_mode_str(None, 20).unwrap();
|
||||||
|
assert!(matches!(none, DiscoverMode::Backfill));
|
||||||
|
let blank = parse_crawler_mode_str(Some(""), 20).unwrap();
|
||||||
|
assert!(matches!(blank, DiscoverMode::Backfill));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cli_mode_recognizes_backfill_and_incremental() {
|
||||||
|
let backfill = parse_crawler_mode_str(Some("backfill"), 20).unwrap();
|
||||||
|
assert!(matches!(backfill, DiscoverMode::Backfill));
|
||||||
|
|
||||||
|
let incremental = parse_crawler_mode_str(Some("incremental"), 9).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
incremental,
|
||||||
|
DiscoverMode::Incremental { stop_after_unchanged: 9 }
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cli_mode_rejects_auto_explicitly() {
|
||||||
|
let err = parse_crawler_mode_str(Some("auto"), 20).unwrap_err();
|
||||||
|
let msg = format!("{err}");
|
||||||
|
assert!(
|
||||||
|
msg.contains("daemon"),
|
||||||
|
"rejection should point operator at the daemon: {msg}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cli_mode_rejects_unknown_value() {
|
||||||
|
let err = parse_crawler_mode_str(Some("garbage"), 20).unwrap_err();
|
||||||
|
let msg = format!("{err}");
|
||||||
|
assert!(msg.contains("backfill"));
|
||||||
|
assert!(msg.contains("incremental"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cli_mode_is_case_insensitive_and_trims() {
|
||||||
|
let mixed = parse_crawler_mode_str(Some(" Incremental "), 4).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
mixed,
|
||||||
|
DiscoverMode::Incremental { stop_after_unchanged: 4 }
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,17 @@ use chrono::NaiveTime;
|
|||||||
use chrono_tz::Tz;
|
use chrono_tz::Tz;
|
||||||
|
|
||||||
use crate::crawler::browser::LaunchOptions;
|
use crate::crawler::browser::LaunchOptions;
|
||||||
|
use crate::crawler::safety::{DownloadAllowlist, DEFAULT_MAX_IMAGE_BYTES};
|
||||||
|
use crate::crawler::source::DiscoverMode;
|
||||||
|
|
||||||
|
/// What `CRAWLER_MODE` was set to. `Auto` is the daemon's default —
|
||||||
|
/// pick Backfill until `seed_completed_at` is written, then flip to
|
||||||
|
/// Incremental. `Explicit` forces a single mode regardless.
|
||||||
|
#[derive(Clone, Copy, Debug)]
|
||||||
|
pub enum CrawlerModePref {
|
||||||
|
Auto,
|
||||||
|
Explicit(DiscoverMode),
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct AuthConfig {
|
pub struct AuthConfig {
|
||||||
@@ -77,6 +88,19 @@ pub struct CrawlerConfig {
|
|||||||
pub user_agent: Option<String>,
|
pub user_agent: Option<String>,
|
||||||
pub proxy: Option<String>,
|
pub proxy: Option<String>,
|
||||||
pub browser: LaunchOptions,
|
pub browser: LaunchOptions,
|
||||||
|
/// Mode preference for the metadata pass. Daemon default is `Auto`
|
||||||
|
/// (Backfill until `seed_completed_at` is written, then Incremental).
|
||||||
|
pub mode: CrawlerModePref,
|
||||||
|
/// `stop_after_unchanged` threshold supplied to Incremental in both
|
||||||
|
/// `Auto` (post-seed) and `Explicit(Incremental)` modes.
|
||||||
|
pub incremental_stop_after: usize,
|
||||||
|
/// Hosts the crawler is allowed to download images / covers from.
|
||||||
|
/// Always seeded with the host of `start_url` and (when set) the
|
||||||
|
/// configured `cdn_host`. Additional hosts can be added via
|
||||||
|
/// `CRAWLER_DOWNLOAD_ALLOWLIST` (comma-separated).
|
||||||
|
pub download_allowlist: DownloadAllowlist,
|
||||||
|
/// Hard upper bound on a single image download. Defaults to 32 MiB.
|
||||||
|
pub max_image_bytes: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for CrawlerConfig {
|
impl Default for CrawlerConfig {
|
||||||
@@ -97,6 +121,10 @@ impl Default for CrawlerConfig {
|
|||||||
user_agent: None,
|
user_agent: None,
|
||||||
proxy: None,
|
proxy: None,
|
||||||
browser: LaunchOptions::headless(),
|
browser: LaunchOptions::headless(),
|
||||||
|
mode: CrawlerModePref::Auto,
|
||||||
|
incremental_stop_after: 20,
|
||||||
|
download_allowlist: DownloadAllowlist::new(),
|
||||||
|
max_image_bytes: DEFAULT_MAX_IMAGE_BYTES,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -151,6 +179,17 @@ impl CrawlerConfig {
|
|||||||
.parse()
|
.parse()
|
||||||
.map_err(|e| anyhow::anyhow!("CRAWLER_TZ must be a valid IANA TZ (got {raw:?}): {e}"))?,
|
.map_err(|e| anyhow::anyhow!("CRAWLER_TZ must be a valid IANA TZ (got {raw:?}): {e}"))?,
|
||||||
};
|
};
|
||||||
|
let incremental_stop_after =
|
||||||
|
env_u64("CRAWLER_INCREMENTAL_STOP_AFTER", 20).max(1) as usize;
|
||||||
|
let mode = parse_mode_env(incremental_stop_after)?;
|
||||||
|
let start_url = std::env::var("CRAWLER_START_URL")
|
||||||
|
.ok()
|
||||||
|
.filter(|s| !s.trim().is_empty());
|
||||||
|
let cdn_host = std::env::var("CRAWLER_CDN_HOST")
|
||||||
|
.ok()
|
||||||
|
.filter(|s| !s.trim().is_empty());
|
||||||
|
let download_allowlist =
|
||||||
|
build_download_allowlist(start_url.as_deref(), cdn_host.as_deref());
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
daemon_enabled: env_bool("CRAWLER_DAEMON", true),
|
daemon_enabled: env_bool("CRAWLER_DAEMON", true),
|
||||||
daily_at,
|
daily_at,
|
||||||
@@ -158,13 +197,9 @@ impl CrawlerConfig {
|
|||||||
idle_timeout: Duration::from_secs(env_u64("CRAWLER_IDLE_TIMEOUT_S", 600)),
|
idle_timeout: Duration::from_secs(env_u64("CRAWLER_IDLE_TIMEOUT_S", 600)),
|
||||||
chapter_workers: env_u64("CRAWLER_CHAPTER_WORKERS", 1).max(1) as usize,
|
chapter_workers: env_u64("CRAWLER_CHAPTER_WORKERS", 1).max(1) as usize,
|
||||||
retention_days: env_u64("CRAWLER_JOB_RETENTION_DAYS", 7) as u32,
|
retention_days: env_u64("CRAWLER_JOB_RETENTION_DAYS", 7) as u32,
|
||||||
start_url: std::env::var("CRAWLER_START_URL")
|
start_url,
|
||||||
.ok()
|
|
||||||
.filter(|s| !s.trim().is_empty()),
|
|
||||||
rate_ms: env_u64("CRAWLER_RATE_MS", 1000),
|
rate_ms: env_u64("CRAWLER_RATE_MS", 1000),
|
||||||
cdn_host: std::env::var("CRAWLER_CDN_HOST")
|
cdn_host,
|
||||||
.ok()
|
|
||||||
.filter(|s| !s.trim().is_empty()),
|
|
||||||
cdn_rate_ms: env_u64("CRAWLER_CDN_RATE_MS", env_u64("CRAWLER_RATE_MS", 1000)),
|
cdn_rate_ms: env_u64("CRAWLER_CDN_RATE_MS", env_u64("CRAWLER_RATE_MS", 1000)),
|
||||||
phpsessid: std::env::var("CRAWLER_PHPSESSID")
|
phpsessid: std::env::var("CRAWLER_PHPSESSID")
|
||||||
.ok()
|
.ok()
|
||||||
@@ -179,10 +214,73 @@ impl CrawlerConfig {
|
|||||||
.ok()
|
.ok()
|
||||||
.filter(|s| !s.trim().is_empty()),
|
.filter(|s| !s.trim().is_empty()),
|
||||||
browser: LaunchOptions::from_env(),
|
browser: LaunchOptions::from_env(),
|
||||||
|
mode,
|
||||||
|
incremental_stop_after,
|
||||||
|
download_allowlist,
|
||||||
|
max_image_bytes: env_usize("CRAWLER_MAX_IMAGE_BYTES", DEFAULT_MAX_IMAGE_BYTES),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Build the download allowlist from env. Always includes
|
||||||
|
/// `CRAWLER_START_URL`'s host (so the crawler can fetch covers from
|
||||||
|
/// the catalog itself) and `CRAWLER_CDN_HOST` when set. Additional
|
||||||
|
/// hosts can be supplied via `CRAWLER_DOWNLOAD_ALLOWLIST` (comma-
|
||||||
|
/// separated). Empty by default — meaning the crawler refuses to
|
||||||
|
/// download anything when no source is configured, which is the safe
|
||||||
|
/// fail-closed posture.
|
||||||
|
fn build_download_allowlist(
|
||||||
|
start_url: Option<&str>,
|
||||||
|
cdn_host: Option<&str>,
|
||||||
|
) -> DownloadAllowlist {
|
||||||
|
let mut allow = DownloadAllowlist::new();
|
||||||
|
if let Some(url) = start_url {
|
||||||
|
if let Ok(parsed) = reqwest::Url::parse(url) {
|
||||||
|
if let Some(h) = parsed.host_str() {
|
||||||
|
allow = allow.allow(h);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(host) = cdn_host {
|
||||||
|
allow = allow.allow(host);
|
||||||
|
}
|
||||||
|
if let Ok(extras) = std::env::var("CRAWLER_DOWNLOAD_ALLOWLIST") {
|
||||||
|
for piece in extras.split(',') {
|
||||||
|
let trimmed = piece.trim();
|
||||||
|
if !trimmed.is_empty() {
|
||||||
|
allow = allow.allow(trimmed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
allow
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse `CRAWLER_MODE`. Empty/unset → `Auto`. Recognized values are
|
||||||
|
/// `auto`, `backfill`, and `incremental` (case-insensitive). Anything
|
||||||
|
/// else is a hard error so a typo can't silently fall through to the
|
||||||
|
/// default and mask itself.
|
||||||
|
fn parse_mode_env(incremental_stop_after: usize) -> anyhow::Result<CrawlerModePref> {
|
||||||
|
parse_mode_str(std::env::var("CRAWLER_MODE").ok().as_deref(), incremental_stop_after)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Pure variant of [`parse_mode_env`] — testable without env-var
|
||||||
|
/// mutation. Takes the raw value (or `None` if unset).
|
||||||
|
pub(crate) fn parse_mode_str(
|
||||||
|
raw: Option<&str>,
|
||||||
|
incremental_stop_after: usize,
|
||||||
|
) -> anyhow::Result<CrawlerModePref> {
|
||||||
|
match raw.map(|s| s.trim().to_ascii_lowercase()).as_deref() {
|
||||||
|
None | Some("") | Some("auto") => Ok(CrawlerModePref::Auto),
|
||||||
|
Some("backfill") => Ok(CrawlerModePref::Explicit(DiscoverMode::Backfill)),
|
||||||
|
Some("incremental") => Ok(CrawlerModePref::Explicit(DiscoverMode::Incremental {
|
||||||
|
stop_after_unchanged: incremental_stop_after,
|
||||||
|
})),
|
||||||
|
Some(other) => Err(anyhow::anyhow!(
|
||||||
|
"CRAWLER_MODE must be one of: auto, backfill, incremental (got {other:?})"
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn env_u64(name: &str, default: u64) -> u64 {
|
fn env_u64(name: &str, default: u64) -> u64 {
|
||||||
std::env::var(name)
|
std::env::var(name)
|
||||||
.ok()
|
.ok()
|
||||||
@@ -211,3 +309,63 @@ fn env_usize(name: &str, default: usize) -> usize {
|
|||||||
.and_then(|s| s.parse().ok())
|
.and_then(|s| s.parse().ok())
|
||||||
.unwrap_or(default)
|
.unwrap_or(default)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_mode_str_defaults_to_auto_when_unset_or_blank() {
|
||||||
|
let none = parse_mode_str(None, 20).unwrap();
|
||||||
|
assert!(matches!(none, CrawlerModePref::Auto));
|
||||||
|
let blank = parse_mode_str(Some(""), 20).unwrap();
|
||||||
|
assert!(matches!(blank, CrawlerModePref::Auto));
|
||||||
|
let whitespace = parse_mode_str(Some(" "), 20).unwrap();
|
||||||
|
assert!(matches!(whitespace, CrawlerModePref::Auto));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_mode_str_recognizes_each_keyword() {
|
||||||
|
let auto = parse_mode_str(Some("auto"), 20).unwrap();
|
||||||
|
assert!(matches!(auto, CrawlerModePref::Auto));
|
||||||
|
|
||||||
|
let backfill = parse_mode_str(Some("backfill"), 20).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
backfill,
|
||||||
|
CrawlerModePref::Explicit(DiscoverMode::Backfill)
|
||||||
|
));
|
||||||
|
|
||||||
|
let incremental = parse_mode_str(Some("incremental"), 7).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
incremental,
|
||||||
|
CrawlerModePref::Explicit(DiscoverMode::Incremental {
|
||||||
|
stop_after_unchanged: 7
|
||||||
|
})
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_mode_str_is_case_insensitive_and_trims_whitespace() {
|
||||||
|
let mixed = parse_mode_str(Some(" Incremental "), 5).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
mixed,
|
||||||
|
CrawlerModePref::Explicit(DiscoverMode::Incremental {
|
||||||
|
stop_after_unchanged: 5
|
||||||
|
})
|
||||||
|
));
|
||||||
|
let upper = parse_mode_str(Some("BACKFILL"), 5).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
upper,
|
||||||
|
CrawlerModePref::Explicit(DiscoverMode::Backfill)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_mode_str_hard_errors_on_unknown_value() {
|
||||||
|
let err = parse_mode_str(Some("backfil"), 20).unwrap_err();
|
||||||
|
let msg = format!("{err}");
|
||||||
|
assert!(msg.contains("backfill"), "error should list valid values: {msg}");
|
||||||
|
assert!(msg.contains("auto"));
|
||||||
|
assert!(msg.contains("incremental"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -18,7 +18,8 @@ use uuid::Uuid;
|
|||||||
|
|
||||||
use crate::crawler::detect::PageError;
|
use crate::crawler::detect::PageError;
|
||||||
use crate::crawler::rate_limit::HostRateLimiters;
|
use crate::crawler::rate_limit::HostRateLimiters;
|
||||||
use crate::crawler::session;
|
use crate::crawler::safety::{fetch_bytes_capped, looks_like_image, DownloadAllowlist};
|
||||||
|
use crate::crawler::session::{self, ChapterProbe};
|
||||||
use crate::storage::Storage;
|
use crate::storage::Storage;
|
||||||
|
|
||||||
/// Parse the chapter page DOM and return the page images in `pageN`
|
/// Parse the chapter page DOM and return the page images in `pageN`
|
||||||
@@ -88,6 +89,8 @@ pub async fn sync_chapter_content(
|
|||||||
manga_id: Uuid,
|
manga_id: Uuid,
|
||||||
source_url: &str,
|
source_url: &str,
|
||||||
force_refetch: bool,
|
force_refetch: bool,
|
||||||
|
allowlist: &DownloadAllowlist,
|
||||||
|
max_image_bytes: usize,
|
||||||
) -> anyhow::Result<SyncOutcome> {
|
) -> anyhow::Result<SyncOutcome> {
|
||||||
// Skip if already fetched, unless caller explicitly forces.
|
// Skip if already fetched, unless caller explicitly forces.
|
||||||
if !force_refetch {
|
if !force_refetch {
|
||||||
@@ -110,16 +113,28 @@ pub async fn sync_chapter_content(
|
|||||||
.with_context(|| format!("open chapter page {source_url}"))?;
|
.with_context(|| format!("open chapter page {source_url}"))?;
|
||||||
page.wait_for_navigation().await.context("wait for chapter nav")?;
|
page.wait_for_navigation().await.context("wait for chapter nav")?;
|
||||||
|
|
||||||
// Session probe: avatar present == still logged in. Missing means
|
|
||||||
// PHPSESSID expired; bail the entire crawler run.
|
|
||||||
if page.find_element("#avatar_menu").await.is_err() {
|
|
||||||
page.close().await.ok();
|
|
||||||
return Ok(SyncOutcome::SessionExpired);
|
|
||||||
}
|
|
||||||
|
|
||||||
let html = page.content().await.context("read chapter html")?;
|
let html = page.content().await.context("read chapter html")?;
|
||||||
page.close().await.ok();
|
page.close().await.ok();
|
||||||
|
|
||||||
|
// Three-way session classification: distinguishes a transient
|
||||||
|
// hiccup (broken-page body or logged-in-but-no-reader) from a
|
||||||
|
// genuine PHPSESSID expiry (no reader and no avatar widget). The
|
||||||
|
// earlier binary `#avatar_menu` check conflated both and froze
|
||||||
|
// every worker on a layout shift.
|
||||||
|
match session::classify_chapter_probe(&html) {
|
||||||
|
ChapterProbe::Unauthenticated => return Ok(SyncOutcome::SessionExpired),
|
||||||
|
ChapterProbe::Transient => {
|
||||||
|
// Surface as a typed Err so the dispatcher path runs
|
||||||
|
// ack_failed with exponential backoff (rather than the
|
||||||
|
// session-expired sticky flag).
|
||||||
|
anyhow::bail!(
|
||||||
|
"chapter page at {source_url} returned a transient response \
|
||||||
|
(broken-page body or reader didn't render); will retry"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
ChapterProbe::Ok => {}
|
||||||
|
}
|
||||||
|
|
||||||
let images = parse_chapter_pages(&html)
|
let images = parse_chapter_pages(&html)
|
||||||
.with_context(|| format!("parse chapter pages at {source_url}"))?;
|
.with_context(|| format!("parse chapter pages at {source_url}"))?;
|
||||||
if images.is_empty() {
|
if images.is_empty() {
|
||||||
@@ -138,18 +153,29 @@ pub async fn sync_chapter_content(
|
|||||||
format!("join image URL {} onto {source_url}", img.url)
|
format!("join image URL {} onto {source_url}", img.url)
|
||||||
})?;
|
})?;
|
||||||
rate.wait_for(url.as_str()).await?;
|
rate.wait_for(url.as_str()).await?;
|
||||||
let resp = http
|
let bytes = fetch_bytes_capped(
|
||||||
.get(url.clone())
|
http,
|
||||||
// Source CDNs commonly check Referer. Set it to the
|
url.as_str(),
|
||||||
// chapter page — matches what the browser would send.
|
Some(source_url),
|
||||||
.header(reqwest::header::REFERER, source_url)
|
allowlist,
|
||||||
.send()
|
max_image_bytes,
|
||||||
.await
|
)
|
||||||
.with_context(|| format!("GET {url}"))?
|
.await?
|
||||||
.error_for_status()
|
.to_vec();
|
||||||
.with_context(|| format!("non-2xx for {url}"))?;
|
// Reject any non-image response: the only valid output of an
|
||||||
let bytes = resp.bytes().await.context("read image body")?.to_vec();
|
// image URL is an image. `infer` returns None on truncated
|
||||||
let ext = infer::get(&bytes).map(|k| k.extension()).unwrap_or("bin");
|
// bytes too, which also wants to be a failure not a silent
|
||||||
|
// `.bin` extension.
|
||||||
|
if !looks_like_image(&bytes) {
|
||||||
|
anyhow::bail!(
|
||||||
|
"image URL {url} returned non-image bytes \
|
||||||
|
(first 16: {:?}); refusing to store as binary blob",
|
||||||
|
&bytes.get(..16.min(bytes.len()))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let ext = infer::get(&bytes)
|
||||||
|
.map(|k| k.extension())
|
||||||
|
.expect("looks_like_image asserted infer succeeded");
|
||||||
fetched.push((img.page_number, bytes, ext));
|
fetched.push((img.page_number, bytes, ext));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -194,8 +220,9 @@ pub async fn sync_chapter_content(
|
|||||||
Ok(SyncOutcome::Fetched { pages: fetched.len() })
|
Ok(SyncOutcome::Fetched { pages: fetched.len() })
|
||||||
}
|
}
|
||||||
|
|
||||||
// Suppress unused-import warning for `session` until the bin/crawler
|
// Suppress unused-import warning for `session::registrable_domain`
|
||||||
// wiring lands in this branch and uses it through this module.
|
// until the bin/crawler wiring lands in this branch and uses it
|
||||||
|
// through this module.
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
fn _keep_session_in_scope() {
|
fn _keep_session_in_scope() {
|
||||||
let _ = session::registrable_domain;
|
let _ = session::registrable_domain;
|
||||||
|
|||||||
@@ -22,5 +22,6 @@ pub mod diff;
|
|||||||
pub mod jobs;
|
pub mod jobs;
|
||||||
pub mod pipeline;
|
pub mod pipeline;
|
||||||
pub mod rate_limit;
|
pub mod rate_limit;
|
||||||
|
pub mod safety;
|
||||||
pub mod session;
|
pub mod session;
|
||||||
pub mod source;
|
pub mod source;
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ use uuid::Uuid;
|
|||||||
use crate::crawler::browser_manager::BrowserManager;
|
use crate::crawler::browser_manager::BrowserManager;
|
||||||
use crate::crawler::jobs::{self, EnqueueResult, JobPayload};
|
use crate::crawler::jobs::{self, EnqueueResult, JobPayload};
|
||||||
use crate::crawler::rate_limit::HostRateLimiters;
|
use crate::crawler::rate_limit::HostRateLimiters;
|
||||||
|
use crate::crawler::safety::{fetch_bytes_capped, looks_like_image, DownloadAllowlist};
|
||||||
use crate::crawler::source::target::TargetSource;
|
use crate::crawler::source::target::TargetSource;
|
||||||
use crate::crawler::source::{DiscoverMode, FetchContext, Source};
|
use crate::crawler::source::{DiscoverMode, FetchContext, Source};
|
||||||
use crate::repo;
|
use crate::repo;
|
||||||
@@ -23,14 +24,34 @@ pub struct MetadataStats {
|
|||||||
pub mangas_failed: usize,
|
pub mangas_failed: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Decide whether the per-ref loop should stop based on the Incremental
|
||||||
|
/// streak counter. Pulled out as a pure function so the rule is unit-
|
||||||
|
/// testable without standing up the walker or DB.
|
||||||
|
pub(crate) fn should_stop(mode: DiscoverMode, consecutive_unchanged: usize) -> bool {
|
||||||
|
match mode {
|
||||||
|
DiscoverMode::Backfill => false,
|
||||||
|
DiscoverMode::Incremental { stop_after_unchanged } => {
|
||||||
|
consecutive_unchanged >= stop_after_unchanged
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline
|
/// Runs the discover → fetch → upsert → cover → chapter-list-diff pipeline
|
||||||
/// for the target source. Pure metadata; chapter content is enqueued as
|
/// for the target source. Pure metadata; chapter content is enqueued as
|
||||||
/// separate `SyncChapterContent` jobs by the caller after this returns.
|
/// separate `SyncChapterContent` jobs by the caller after this returns.
|
||||||
///
|
///
|
||||||
/// `limit == 0` means no cap (full backfill). `skip_chapters == true` is
|
/// `limit == 0` means no cap (full sweep up to the source's own bound).
|
||||||
/// the "metadata-only" mode (parser doesn't extract chapters, and
|
/// `skip_chapters == true` is the "metadata-only" mode (parser doesn't
|
||||||
/// `sync_manga_chapters` is skipped — otherwise an empty chapter list
|
/// extract chapters, and `sync_manga_chapters` is skipped — otherwise an
|
||||||
/// would soft-drop existing rows).
|
/// empty chapter list would soft-drop existing rows).
|
||||||
|
///
|
||||||
|
/// `mode` controls the walk:
|
||||||
|
/// - `Backfill` — oldest-first, no early exit. The only mode that runs
|
||||||
|
/// the end-of-walk drop pass + writes `seed_completed_at`.
|
||||||
|
/// - `Incremental { stop_after_unchanged }` — newest-first, breaks out
|
||||||
|
/// after N consecutive Unchanged upserts. Drop pass is skipped (the
|
||||||
|
/// tail of the index is never visited, so its `last_seen_at` is
|
||||||
|
/// stale and using it to soft-drop would be unsafe).
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub async fn run_metadata_pass(
|
pub async fn run_metadata_pass(
|
||||||
browser_manager: &BrowserManager,
|
browser_manager: &BrowserManager,
|
||||||
@@ -41,6 +62,9 @@ pub async fn run_metadata_pass(
|
|||||||
start_url: &str,
|
start_url: &str,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
skip_chapters: bool,
|
skip_chapters: bool,
|
||||||
|
mode: DiscoverMode,
|
||||||
|
allowlist: &DownloadAllowlist,
|
||||||
|
max_image_bytes: usize,
|
||||||
) -> anyhow::Result<MetadataStats> {
|
) -> anyhow::Result<MetadataStats> {
|
||||||
let lease = browser_manager
|
let lease = browser_manager
|
||||||
.acquire()
|
.acquire()
|
||||||
@@ -74,123 +98,191 @@ pub async fn run_metadata_pass(
|
|||||||
let run_started_at = chrono::Utc::now();
|
let run_started_at = chrono::Utc::now();
|
||||||
let max_refs = (limit > 0).then_some(limit);
|
let max_refs = (limit > 0).then_some(limit);
|
||||||
|
|
||||||
tracing::info!(?max_refs, "discovering manga list");
|
tracing::info!(?mode, ?max_refs, "starting metadata pass");
|
||||||
let refs = source
|
let mut walker = source
|
||||||
.discover(&ctx, DiscoverMode::Backfill, max_refs)
|
.discover(&ctx, mode)
|
||||||
.await
|
.await
|
||||||
.context("discover failed")?;
|
.context("discover failed")?;
|
||||||
tracing::info!(count = refs.len(), "discovered manga list");
|
|
||||||
|
|
||||||
let mut stats = MetadataStats {
|
let mut stats = MetadataStats::default();
|
||||||
discovered: refs.len(),
|
let mut consecutive_unchanged: usize = 0;
|
||||||
..MetadataStats::default()
|
let mut walked_to_completion = false;
|
||||||
};
|
let mut hit_limit = false;
|
||||||
|
let mut hit_incremental_stop = false;
|
||||||
|
|
||||||
for (i, r) in refs.iter().enumerate() {
|
'outer: loop {
|
||||||
tracing::info!(
|
let batch = match walker.next_batch(&ctx).await? {
|
||||||
idx = i + 1,
|
Some(b) => b,
|
||||||
total = stats.discovered,
|
None => {
|
||||||
key = %r.source_manga_key,
|
walked_to_completion = true;
|
||||||
"fetching metadata"
|
break;
|
||||||
);
|
|
||||||
let manga = match source.fetch_manga(&ctx, r).await {
|
|
||||||
Ok(m) => m,
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(
|
|
||||||
key = %r.source_manga_key,
|
|
||||||
url = %r.url,
|
|
||||||
error = ?e,
|
|
||||||
"fetch_manga failed"
|
|
||||||
);
|
|
||||||
stats.mangas_failed += 1;
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
for r in batch {
|
||||||
let upsert = match repo::crawler::upsert_manga_from_source(db, source_id, &r.url, &manga)
|
if max_refs.map(|m| stats.discovered >= m).unwrap_or(false) {
|
||||||
.await
|
hit_limit = true;
|
||||||
{
|
tracing::info!(cap = ?max_refs, "max_results reached; halting walk");
|
||||||
Ok(u) => u,
|
break 'outer;
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(
|
|
||||||
key = %r.source_manga_key,
|
|
||||||
error = ?e,
|
|
||||||
"upsert_manga_from_source failed"
|
|
||||||
);
|
|
||||||
stats.mangas_failed += 1;
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
};
|
stats.discovered += 1;
|
||||||
stats.upserted += 1;
|
tracing::info!(
|
||||||
tracing::info!(
|
idx = stats.discovered,
|
||||||
key = %manga.source_manga_key,
|
key = %r.source_manga_key,
|
||||||
manga_id = %upsert.manga_id,
|
"fetching metadata"
|
||||||
status = ?upsert.status,
|
);
|
||||||
title = %manga.title,
|
let manga = match source.fetch_manga(&ctx, &r).await {
|
||||||
"manga upserted"
|
Ok(m) => m,
|
||||||
);
|
Err(e) => {
|
||||||
|
tracing::warn!(
|
||||||
// Cover image: download when missing in storage or when metadata
|
key = %r.source_manga_key,
|
||||||
// signaled an update (cover URL is part of metadata_hash, so
|
url = %r.url,
|
||||||
// Updated implies the URL may have moved). Failures are non-fatal.
|
|
||||||
let needs_cover = upsert.cover_image_path.is_none()
|
|
||||||
|| matches!(upsert.status, repo::crawler::UpsertStatus::Updated);
|
|
||||||
if needs_cover {
|
|
||||||
if let Some(cover_url) = manga.cover_url.as_deref() {
|
|
||||||
match download_and_store_cover(
|
|
||||||
db,
|
|
||||||
storage,
|
|
||||||
http,
|
|
||||||
rate,
|
|
||||||
&r.url,
|
|
||||||
upsert.manga_id,
|
|
||||||
cover_url,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(()) => stats.covers_fetched += 1,
|
|
||||||
Err(e) => tracing::warn!(
|
|
||||||
manga_id = %upsert.manga_id,
|
|
||||||
error = ?e,
|
error = ?e,
|
||||||
"cover download failed"
|
"fetch_manga failed"
|
||||||
),
|
);
|
||||||
|
stats.mangas_failed += 1;
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
}
|
|
||||||
|
|
||||||
if !skip_chapters {
|
let upsert = match repo::crawler::upsert_manga_from_source(
|
||||||
match repo::crawler::sync_manga_chapters(
|
db, source_id, &r.url, &manga,
|
||||||
db,
|
|
||||||
source_id,
|
|
||||||
upsert.manga_id,
|
|
||||||
&manga.chapters,
|
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(diff) => tracing::info!(
|
Ok(u) => u,
|
||||||
manga_id = %upsert.manga_id,
|
Err(e) => {
|
||||||
new = diff.new,
|
tracing::error!(
|
||||||
refreshed = diff.refreshed,
|
key = %r.source_manga_key,
|
||||||
dropped = diff.dropped,
|
error = ?e,
|
||||||
"chapters synced"
|
"upsert_manga_from_source failed"
|
||||||
),
|
);
|
||||||
Err(e) => tracing::warn!(
|
stats.mangas_failed += 1;
|
||||||
manga_id = %upsert.manga_id,
|
continue;
|
||||||
error = ?e,
|
}
|
||||||
"chapter sync failed"
|
};
|
||||||
),
|
stats.upserted += 1;
|
||||||
|
tracing::info!(
|
||||||
|
key = %manga.source_manga_key,
|
||||||
|
manga_id = %upsert.manga_id,
|
||||||
|
status = ?upsert.status,
|
||||||
|
title = %manga.title,
|
||||||
|
"manga upserted"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Cover image: download when missing in storage or when metadata
|
||||||
|
// signaled an update (cover URL is part of metadata_hash, so
|
||||||
|
// Updated implies the URL may have moved). Failures are non-fatal.
|
||||||
|
let needs_cover = upsert.cover_image_path.is_none()
|
||||||
|
|| matches!(upsert.status, repo::crawler::UpsertStatus::Updated);
|
||||||
|
if needs_cover {
|
||||||
|
if let Some(cover_url) = manga.cover_url.as_deref() {
|
||||||
|
match download_and_store_cover(
|
||||||
|
db,
|
||||||
|
storage,
|
||||||
|
http,
|
||||||
|
rate,
|
||||||
|
&r.url,
|
||||||
|
upsert.manga_id,
|
||||||
|
cover_url,
|
||||||
|
allowlist,
|
||||||
|
max_image_bytes,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(()) => stats.covers_fetched += 1,
|
||||||
|
Err(e) => tracing::warn!(
|
||||||
|
manga_id = %upsert.manga_id,
|
||||||
|
error = ?e,
|
||||||
|
"cover download failed"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !skip_chapters {
|
||||||
|
match repo::crawler::sync_manga_chapters(
|
||||||
|
db,
|
||||||
|
source_id,
|
||||||
|
upsert.manga_id,
|
||||||
|
&manga.chapters,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(diff) => tracing::info!(
|
||||||
|
manga_id = %upsert.manga_id,
|
||||||
|
new = diff.new,
|
||||||
|
refreshed = diff.refreshed,
|
||||||
|
dropped = diff.dropped,
|
||||||
|
"chapters synced"
|
||||||
|
),
|
||||||
|
Err(e) => tracing::warn!(
|
||||||
|
manga_id = %upsert.manga_id,
|
||||||
|
error = ?e,
|
||||||
|
"chapter sync failed"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Incremental stop: count consecutive Unchanged upserts and
|
||||||
|
// bail once the threshold is reached. New/Updated resets the
|
||||||
|
// streak so a fresh entry mid-batch doesn't accidentally trip
|
||||||
|
// the cutoff.
|
||||||
|
match upsert.status {
|
||||||
|
repo::crawler::UpsertStatus::Unchanged => {
|
||||||
|
consecutive_unchanged += 1;
|
||||||
|
}
|
||||||
|
repo::crawler::UpsertStatus::New | repo::crawler::UpsertStatus::Updated => {
|
||||||
|
consecutive_unchanged = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if should_stop(mode, consecutive_unchanged) {
|
||||||
|
hit_incremental_stop = true;
|
||||||
|
tracing::info!(
|
||||||
|
consecutive_unchanged,
|
||||||
|
"incremental stop threshold reached; halting walk"
|
||||||
|
);
|
||||||
|
break 'outer;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if limit == 0 {
|
// Drop pass: only when the walk truly covered everything the source
|
||||||
|
// surfaces. `last_seen_at` on un-visited rows is stale, so running
|
||||||
|
// the drop on a partial walk would soft-drop the tail of the index.
|
||||||
|
let full_walk = walked_to_completion && !hit_limit && !hit_incremental_stop;
|
||||||
|
let backfill_complete = full_walk && matches!(mode, DiscoverMode::Backfill);
|
||||||
|
if full_walk {
|
||||||
match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await {
|
match repo::crawler::mark_dropped_mangas(db, source_id, run_started_at).await {
|
||||||
Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"),
|
Ok(n) => tracing::info!(dropped = n, "marked unseen manga as dropped"),
|
||||||
Err(e) => tracing::warn!(error = ?e, "drop-pass failed"),
|
Err(e) => tracing::warn!(error = ?e, "drop-pass failed"),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tracing::info!(limit, "partial sync — skipping drop pass");
|
tracing::info!(
|
||||||
|
?mode,
|
||||||
|
hit_limit,
|
||||||
|
hit_incremental_stop,
|
||||||
|
"partial sync — skipping drop pass"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
if backfill_complete {
|
||||||
|
if let Err(e) = repo::crawler::mark_seed_completed(db, source_id, run_started_at).await {
|
||||||
|
tracing::warn!(error = ?e, "mark_seed_completed failed");
|
||||||
|
} else {
|
||||||
|
tracing::info!(source_id, "seed marked complete");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
?mode,
|
||||||
|
discovered = stats.discovered,
|
||||||
|
upserted = stats.upserted,
|
||||||
|
covers_fetched = stats.covers_fetched,
|
||||||
|
mangas_failed = stats.mangas_failed,
|
||||||
|
walked_to_completion,
|
||||||
|
hit_limit,
|
||||||
|
hit_incremental_stop,
|
||||||
|
"metadata pass complete"
|
||||||
|
);
|
||||||
|
|
||||||
drop(lease);
|
drop(lease);
|
||||||
Ok(stats)
|
Ok(stats)
|
||||||
@@ -295,6 +387,7 @@ pub struct EnqueueSummary {
|
|||||||
/// pipeline because the CLI still calls it from its inline chapter-content
|
/// pipeline because the CLI still calls it from its inline chapter-content
|
||||||
/// loop; once the worker pool fully replaces that path we can fold this
|
/// loop; once the worker pool fully replaces that path we can fold this
|
||||||
/// into `pipeline` proper.
|
/// into `pipeline` proper.
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
async fn download_and_store_cover(
|
async fn download_and_store_cover(
|
||||||
db: &PgPool,
|
db: &PgPool,
|
||||||
storage: &dyn Storage,
|
storage: &dyn Storage,
|
||||||
@@ -303,6 +396,8 @@ async fn download_and_store_cover(
|
|||||||
manga_url: &str,
|
manga_url: &str,
|
||||||
manga_id: Uuid,
|
manga_id: Uuid,
|
||||||
cover_url: &str,
|
cover_url: &str,
|
||||||
|
allowlist: &DownloadAllowlist,
|
||||||
|
max_image_bytes: usize,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let absolute = reqwest::Url::parse(manga_url)
|
let absolute = reqwest::Url::parse(manga_url)
|
||||||
.context("parse manga URL")?
|
.context("parse manga URL")?
|
||||||
@@ -310,17 +405,22 @@ async fn download_and_store_cover(
|
|||||||
.context("join cover URL onto manga URL")?;
|
.context("join cover URL onto manga URL")?;
|
||||||
|
|
||||||
rate.wait_for(absolute.as_str()).await?;
|
rate.wait_for(absolute.as_str()).await?;
|
||||||
let resp = http
|
let bytes = fetch_bytes_capped(
|
||||||
.get(absolute.clone())
|
http,
|
||||||
.header(reqwest::header::REFERER, manga_url)
|
absolute.as_str(),
|
||||||
.send()
|
Some(manga_url),
|
||||||
.await
|
allowlist,
|
||||||
.with_context(|| format!("GET {absolute}"))?
|
max_image_bytes,
|
||||||
.error_for_status()
|
)
|
||||||
.with_context(|| format!("non-2xx for {absolute}"))?;
|
.await?;
|
||||||
let bytes = resp.bytes().await.context("read cover body")?;
|
if !looks_like_image(&bytes) {
|
||||||
let kind = infer::get(&bytes);
|
anyhow::bail!(
|
||||||
let ext = kind.map(|k| k.extension()).unwrap_or("bin");
|
"cover URL {absolute} returned non-image bytes; refusing to store as binary blob"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
let ext = infer::get(&bytes)
|
||||||
|
.map(|k| k.extension())
|
||||||
|
.expect("looks_like_image asserted infer succeeded");
|
||||||
let key = format!("mangas/{manga_id}/cover.{ext}");
|
let key = format!("mangas/{manga_id}/cover.{ext}");
|
||||||
|
|
||||||
storage
|
storage
|
||||||
@@ -345,3 +445,36 @@ fn origin_of(url: &str) -> Option<String> {
|
|||||||
let host = rest.split('/').next()?;
|
let host = rest.split('/').next()?;
|
||||||
Some(format!("{scheme}://{host}"))
|
Some(format!("{scheme}://{host}"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn backfill_never_stops_regardless_of_streak() {
|
||||||
|
assert!(!should_stop(DiscoverMode::Backfill, 0));
|
||||||
|
assert!(!should_stop(DiscoverMode::Backfill, 100));
|
||||||
|
assert!(!should_stop(DiscoverMode::Backfill, usize::MAX));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn incremental_stops_when_streak_meets_threshold() {
|
||||||
|
let mode = DiscoverMode::Incremental {
|
||||||
|
stop_after_unchanged: 3,
|
||||||
|
};
|
||||||
|
assert!(!should_stop(mode, 0));
|
||||||
|
assert!(!should_stop(mode, 2));
|
||||||
|
assert!(should_stop(mode, 3), "stops at exactly the threshold");
|
||||||
|
assert!(should_stop(mode, 100), "stops at anything past threshold");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn incremental_with_zero_threshold_stops_immediately() {
|
||||||
|
// A nonsensical config (no Unchanged needed to stop) shouldn't
|
||||||
|
// panic — it just means the very first ref triggers the bail.
|
||||||
|
let mode = DiscoverMode::Incremental {
|
||||||
|
stop_after_unchanged: 0,
|
||||||
|
};
|
||||||
|
assert!(should_stop(mode, 0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
486
backend/src/crawler/safety.rs
Normal file
486
backend/src/crawler/safety.rs
Normal file
@@ -0,0 +1,486 @@
|
|||||||
|
//! Defensive helpers for the image-download paths.
|
||||||
|
//!
|
||||||
|
//! Two threats this module addresses:
|
||||||
|
//!
|
||||||
|
//! - **SSRF**: a scraped chapter or manga page can embed an absolute
|
||||||
|
//! `<img src="http://10.0.0.1/...">`. The crawler runs inside the
|
||||||
|
//! backend container with intra-compose access to `postgres:5432`
|
||||||
|
//! and possibly other internal services; without a host check the
|
||||||
|
//! crawler would happily probe them. [`is_safe_url`] rejects
|
||||||
|
//! anything whose host isn't on the operator-configured allowlist,
|
||||||
|
//! plus any IP literal in RFC1918 / loopback / link-local / unique-
|
||||||
|
//! local space (including IPv4-mapped IPv6 like `::ffff:127.0.0.1`)
|
||||||
|
//! as a second defence for the case where an allowlisted hostname's
|
||||||
|
//! DNS happens to resolve to a literal private address.
|
||||||
|
//!
|
||||||
|
//! **DNS rebinding is not covered.** A hostname like `cdn.allowed.com`
|
||||||
|
//! that *resolves* to `127.0.0.1` via hostile DNS bypasses the IP
|
||||||
|
//! check entirely — `is_safe_url` only inspects URL strings, not
|
||||||
|
//! resolved IPs. Mitigating that requires a custom reqwest resolver
|
||||||
|
//! that filters IPs after DNS, which would mean rebuilding reqwest's
|
||||||
|
//! connector. The allowlist + good operator DNS hygiene is the
|
||||||
|
//! realistic mitigation today.
|
||||||
|
//!
|
||||||
|
//! - **Unbounded download**: `Response::bytes().await` reads the full
|
||||||
|
//! body before returning. A malicious source serving a 10 GiB image
|
||||||
|
//! would fill memory and then disk. [`accumulate_capped`] streams
|
||||||
|
//! the body chunk-by-chunk into a [`bytes::BytesMut`] and bails as
|
||||||
|
//! soon as the running total exceeds the cap.
|
||||||
|
//!
|
||||||
|
//! Both helpers are pure-data: the SSRF check is keyed off a parsed
|
||||||
|
//! URL string, and the byte accumulator is keyed off a generic stream.
|
||||||
|
//! Easy to unit-test without a live network or browser.
|
||||||
|
|
||||||
|
use std::net::IpAddr;
|
||||||
|
|
||||||
|
use anyhow::{bail, Context};
|
||||||
|
use bytes::BytesMut;
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use reqwest::Url;
|
||||||
|
|
||||||
|
/// Default per-image download cap. A page image is generally <2 MiB;
|
||||||
|
/// 32 MiB leaves headroom for high-resolution covers while still
|
||||||
|
/// stopping a misbehaving CDN dead. Override via `CRAWLER_MAX_IMAGE_BYTES`.
|
||||||
|
pub const DEFAULT_MAX_IMAGE_BYTES: usize = 32 * 1024 * 1024;
|
||||||
|
|
||||||
|
/// Hosts that are always allowed in addition to the operator's
|
||||||
|
/// configured allowlist. None by default — keeping the surface area
|
||||||
|
/// minimal so the only way a URL gets through is if it matches an
|
||||||
|
/// explicit catalog/CDN entry.
|
||||||
|
#[derive(Clone, Debug, Default)]
|
||||||
|
pub struct DownloadAllowlist {
|
||||||
|
hosts: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DownloadAllowlist {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self { hosts: Vec::new() }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a host (case-insensitive match). Sub-domains are *not*
|
||||||
|
/// implied: pass `cdn.example.com` and `example.com` separately
|
||||||
|
/// if both should be reachable.
|
||||||
|
pub fn allow(mut self, host: impl Into<String>) -> Self {
|
||||||
|
let h = host.into().to_ascii_lowercase();
|
||||||
|
if !h.is_empty() && !self.hosts.iter().any(|existing| existing == &h) {
|
||||||
|
self.hosts.push(h);
|
||||||
|
}
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.hosts.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn contains(&self, host: &str) -> bool {
|
||||||
|
let lower = host.to_ascii_lowercase();
|
||||||
|
self.hosts.iter().any(|h| h == &lower)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Verify a URL is safe for the crawler to fetch.
|
||||||
|
///
|
||||||
|
/// Rejects:
|
||||||
|
/// - non-http(s) schemes (file://, gopher://, …),
|
||||||
|
/// - any IP literal in private / loopback / link-local / unique-local
|
||||||
|
/// space (defense in depth — a DNS allowlist alone wouldn't cover an
|
||||||
|
/// attacker that places an entry like `cdn.evil` pointing at
|
||||||
|
/// `192.168.1.1`),
|
||||||
|
/// - the literal hostname `localhost`,
|
||||||
|
/// - hosts that aren't on the supplied allowlist.
|
||||||
|
///
|
||||||
|
/// An empty allowlist rejects everything (the conservative default —
|
||||||
|
/// callers must explicitly allow the catalog and CDN hosts).
|
||||||
|
pub fn is_safe_url(raw_url: &str, allow: &DownloadAllowlist) -> Result<(), UrlSafetyError> {
|
||||||
|
let url = Url::parse(raw_url).map_err(|_| UrlSafetyError::Unparseable)?;
|
||||||
|
let scheme = url.scheme();
|
||||||
|
if scheme != "http" && scheme != "https" {
|
||||||
|
return Err(UrlSafetyError::BadScheme(scheme.to_string()));
|
||||||
|
}
|
||||||
|
let host = url.host_str().ok_or(UrlSafetyError::NoHost)?;
|
||||||
|
let lower_host = host.to_ascii_lowercase();
|
||||||
|
if lower_host == "localhost" {
|
||||||
|
return Err(UrlSafetyError::Loopback);
|
||||||
|
}
|
||||||
|
// Reject IP literals in private/loopback ranges regardless of the
|
||||||
|
// allowlist — if someone puts an IP literal on the allowlist they
|
||||||
|
// almost certainly didn't mean a private range.
|
||||||
|
// reqwest::Url normalises IPv6 literals as `[::1]` (brackets
|
||||||
|
// included) in `host_str()`. Strip the brackets before parsing.
|
||||||
|
let ip_candidate = lower_host
|
||||||
|
.strip_prefix('[')
|
||||||
|
.and_then(|s| s.strip_suffix(']'))
|
||||||
|
.unwrap_or(&lower_host);
|
||||||
|
if let Ok(ip) = ip_candidate.parse::<IpAddr>() {
|
||||||
|
if is_private_ip(&ip) {
|
||||||
|
return Err(UrlSafetyError::PrivateIp(ip));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !allow.contains(&lower_host) {
|
||||||
|
return Err(UrlSafetyError::HostNotAllowed(lower_host));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_private_ip(ip: &IpAddr) -> bool {
|
||||||
|
match ip {
|
||||||
|
IpAddr::V4(v4) => {
|
||||||
|
v4.is_loopback()
|
||||||
|
|| v4.is_private()
|
||||||
|
|| v4.is_link_local()
|
||||||
|
|| v4.is_unspecified()
|
||||||
|
|| v4.is_broadcast()
|
||||||
|
// CGNAT 100.64.0.0/10
|
||||||
|
|| (v4.octets()[0] == 100 && (v4.octets()[1] & 0xC0) == 64)
|
||||||
|
// 169.254/16 link-local already covered, but 0.0.0.0/8 is special-use
|
||||||
|
|| v4.octets()[0] == 0
|
||||||
|
}
|
||||||
|
IpAddr::V6(v6) => {
|
||||||
|
// IPv4-mapped IPv6 (::ffff:0:0/96): unwrap to the embedded
|
||||||
|
// IPv4 and recurse so `::ffff:127.0.0.1` is caught by the
|
||||||
|
// IPv4 loopback check rather than passing through.
|
||||||
|
// `Ipv6Addr::is_loopback()` only matches `::1` exactly.
|
||||||
|
if let Some(v4) = v6.to_ipv4_mapped() {
|
||||||
|
return is_private_ip(&IpAddr::V4(v4));
|
||||||
|
}
|
||||||
|
v6.is_loopback()
|
||||||
|
|| v6.is_unspecified()
|
||||||
|
// fc00::/7 unique-local
|
||||||
|
|| (v6.segments()[0] & 0xfe00) == 0xfc00
|
||||||
|
// fe80::/10 link-local
|
||||||
|
|| (v6.segments()[0] & 0xffc0) == 0xfe80
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
|
||||||
|
pub enum UrlSafetyError {
|
||||||
|
#[error("URL is not parseable")]
|
||||||
|
Unparseable,
|
||||||
|
#[error("scheme {0:?} is not http or https")]
|
||||||
|
BadScheme(String),
|
||||||
|
#[error("URL is missing a host")]
|
||||||
|
NoHost,
|
||||||
|
#[error("host points at the loopback interface")]
|
||||||
|
Loopback,
|
||||||
|
#[error("host is a private/internal IP: {0}")]
|
||||||
|
PrivateIp(IpAddr),
|
||||||
|
#[error("host {0:?} is not on the crawler download allowlist")]
|
||||||
|
HostNotAllowed(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drain a byte stream into a single buffer, bailing out as soon as
|
||||||
|
/// the running total exceeds `max_bytes`. Generic over the stream so
|
||||||
|
/// it's testable without a live HTTP response.
|
||||||
|
pub async fn accumulate_capped<S, E>(stream: S, max_bytes: usize) -> anyhow::Result<bytes::Bytes>
|
||||||
|
where
|
||||||
|
S: futures_core::Stream<Item = Result<bytes::Bytes, E>>,
|
||||||
|
E: std::error::Error + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
let mut buf = BytesMut::new();
|
||||||
|
let mut stream = std::pin::pin!(stream);
|
||||||
|
while let Some(chunk) = stream.next().await {
|
||||||
|
let chunk = chunk.map_err(|e| anyhow::anyhow!("stream chunk: {e}"))?;
|
||||||
|
if buf.len().saturating_add(chunk.len()) > max_bytes {
|
||||||
|
bail!(
|
||||||
|
"response exceeds {max_bytes}-byte cap (received >{}+{})",
|
||||||
|
buf.len(),
|
||||||
|
chunk.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
buf.extend_from_slice(&chunk);
|
||||||
|
}
|
||||||
|
Ok(buf.freeze())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send `req` and stream the response into a length-limited buffer.
|
||||||
|
/// Combines [`is_safe_url`] check + [`accumulate_capped`] so each
|
||||||
|
/// call-site is one line.
|
||||||
|
pub async fn fetch_bytes_capped(
|
||||||
|
http: &reqwest::Client,
|
||||||
|
url: &str,
|
||||||
|
referer: Option<&str>,
|
||||||
|
allow: &DownloadAllowlist,
|
||||||
|
max_bytes: usize,
|
||||||
|
) -> anyhow::Result<bytes::Bytes> {
|
||||||
|
is_safe_url(url, allow).with_context(|| format!("reject unsafe URL {url}"))?;
|
||||||
|
let mut req = http.get(url);
|
||||||
|
if let Some(r) = referer {
|
||||||
|
req = req.header(reqwest::header::REFERER, r);
|
||||||
|
}
|
||||||
|
let resp = req
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("GET {url}"))?
|
||||||
|
.error_for_status()
|
||||||
|
.with_context(|| format!("non-2xx for {url}"))?;
|
||||||
|
accumulate_capped(resp.bytes_stream(), max_bytes)
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("download body for {url}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// True when `bytes` sniffs as one of the *renderable* image formats
|
||||||
|
/// the `/files/*key` endpoint can serve with a correct Content-Type:
|
||||||
|
/// JPEG, PNG, WebP, GIF, AVIF. Matches the upload pipeline's
|
||||||
|
/// whitelist in `upload::parse_image`.
|
||||||
|
///
|
||||||
|
/// `infer::MatcherType::Image` is intentionally NOT used — it also
|
||||||
|
/// matches BMP, TIFF, HEIF, ICO, PSD, and JP2. Those would sniff as
|
||||||
|
/// "image" here but [`api::files::content_type_for`] would fall back
|
||||||
|
/// to `application/octet-stream`, prompting browsers to download
|
||||||
|
/// instead of render. Keep the two layers aligned.
|
||||||
|
pub fn looks_like_image(bytes: &[u8]) -> bool {
|
||||||
|
matches!(
|
||||||
|
infer::get(bytes).map(|k| k.mime_type()),
|
||||||
|
Some("image/jpeg" | "image/png" | "image/webp" | "image/gif" | "image/avif")
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use futures_util::stream;
|
||||||
|
|
||||||
|
fn allow_just(host: &str) -> DownloadAllowlist {
|
||||||
|
DownloadAllowlist::new().allow(host)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_allows_listed_host() {
|
||||||
|
let allow = allow_just("cdn.example.com");
|
||||||
|
assert!(is_safe_url("https://cdn.example.com/img.jpg", &allow).is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_blocks_unlisted_host() {
|
||||||
|
let allow = allow_just("cdn.example.com");
|
||||||
|
let err = is_safe_url("https://evil.example.org/img.jpg", &allow).unwrap_err();
|
||||||
|
assert!(matches!(err, UrlSafetyError::HostNotAllowed(h) if h == "evil.example.org"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_blocks_localhost_even_if_allowlisted() {
|
||||||
|
let allow = allow_just("localhost");
|
||||||
|
assert!(matches!(
|
||||||
|
is_safe_url("http://localhost:8080/", &allow).unwrap_err(),
|
||||||
|
UrlSafetyError::Loopback
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_blocks_loopback_ipv4() {
|
||||||
|
let allow = allow_just("127.0.0.1");
|
||||||
|
assert!(matches!(
|
||||||
|
is_safe_url("http://127.0.0.1/", &allow).unwrap_err(),
|
||||||
|
UrlSafetyError::PrivateIp(_)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_blocks_rfc1918() {
|
||||||
|
let allow = allow_just("10.0.0.1");
|
||||||
|
for url in [
|
||||||
|
"http://10.0.0.1/",
|
||||||
|
"http://192.168.1.1/",
|
||||||
|
"http://172.16.0.5/",
|
||||||
|
"http://172.31.255.255/",
|
||||||
|
] {
|
||||||
|
assert!(
|
||||||
|
matches!(
|
||||||
|
is_safe_url(url, &allow).unwrap_err(),
|
||||||
|
UrlSafetyError::PrivateIp(_)
|
||||||
|
),
|
||||||
|
"should reject {url}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_blocks_link_local() {
|
||||||
|
let allow = allow_just("169.254.169.254");
|
||||||
|
// 169.254.169.254 is the AWS/GCP metadata service — the most
|
||||||
|
// dangerous SSRF target on a default cloud VM.
|
||||||
|
assert!(matches!(
|
||||||
|
is_safe_url("http://169.254.169.254/", &allow).unwrap_err(),
|
||||||
|
UrlSafetyError::PrivateIp(_)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_blocks_ipv6_loopback_and_ula() {
|
||||||
|
// Debug what host_str returns first — reqwest::Url normalises
|
||||||
|
// IPv6 literals as `[::1]` with brackets, which doesn't parse
|
||||||
|
// as `IpAddr` directly. The implementation strips them.
|
||||||
|
let allow = allow_just("[::1]");
|
||||||
|
let err = is_safe_url("http://[::1]/", &allow).unwrap_err();
|
||||||
|
assert!(
|
||||||
|
matches!(err, UrlSafetyError::PrivateIp(_)),
|
||||||
|
"expected PrivateIp, got {err:?}"
|
||||||
|
);
|
||||||
|
let allow = allow_just("[fd00::1]");
|
||||||
|
let err = is_safe_url("http://[fd00::1]/", &allow).unwrap_err();
|
||||||
|
assert!(
|
||||||
|
matches!(err, UrlSafetyError::PrivateIp(_)),
|
||||||
|
"expected PrivateIp, got {err:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_blocks_ipv4_mapped_ipv6_loopback() {
|
||||||
|
// `Ipv6Addr::is_loopback()` only matches `::1` exactly, so
|
||||||
|
// `::ffff:127.0.0.1` would slip through without the
|
||||||
|
// to_ipv4_mapped() unwrap in is_private_ip.
|
||||||
|
let allow = allow_just("[::ffff:127.0.0.1]");
|
||||||
|
let err = is_safe_url("http://[::ffff:127.0.0.1]/", &allow).unwrap_err();
|
||||||
|
assert!(
|
||||||
|
matches!(err, UrlSafetyError::PrivateIp(_)),
|
||||||
|
"expected PrivateIp, got {err:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_blocks_ipv4_mapped_ipv6_rfc1918() {
|
||||||
|
let allow = allow_just("[::ffff:10.0.0.1]");
|
||||||
|
let err = is_safe_url("http://[::ffff:10.0.0.1]/", &allow).unwrap_err();
|
||||||
|
assert!(matches!(err, UrlSafetyError::PrivateIp(_)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_blocks_non_http_schemes() {
|
||||||
|
let allow = allow_just("anywhere");
|
||||||
|
assert!(matches!(
|
||||||
|
is_safe_url("file:///etc/passwd", &allow).unwrap_err(),
|
||||||
|
UrlSafetyError::BadScheme(_)
|
||||||
|
));
|
||||||
|
assert!(matches!(
|
||||||
|
is_safe_url("gopher://anywhere:70/", &allow).unwrap_err(),
|
||||||
|
UrlSafetyError::BadScheme(_)
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_rejects_unparseable() {
|
||||||
|
let allow = allow_just("anywhere");
|
||||||
|
assert!(matches!(
|
||||||
|
is_safe_url("not a url", &allow).unwrap_err(),
|
||||||
|
UrlSafetyError::Unparseable
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn safe_url_empty_allowlist_rejects_everything() {
|
||||||
|
let allow = DownloadAllowlist::new();
|
||||||
|
let err = is_safe_url("https://cdn.example.com/img.jpg", &allow).unwrap_err();
|
||||||
|
assert!(matches!(err, UrlSafetyError::HostNotAllowed(_)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn allowlist_matches_case_insensitively() {
|
||||||
|
let allow = DownloadAllowlist::new().allow("CDN.Example.COM");
|
||||||
|
assert!(is_safe_url("https://cdn.example.com/x.jpg", &allow).is_ok());
|
||||||
|
assert!(is_safe_url("https://CDN.EXAMPLE.com/x.jpg", &allow).is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn accumulate_capped_returns_full_body_under_cap() {
|
||||||
|
let chunks: Vec<Result<bytes::Bytes, std::io::Error>> = vec![
|
||||||
|
Ok(bytes::Bytes::from_static(b"hello ")),
|
||||||
|
Ok(bytes::Bytes::from_static(b"world")),
|
||||||
|
];
|
||||||
|
let s = stream::iter(chunks);
|
||||||
|
let out = accumulate_capped(s, 100).await.unwrap();
|
||||||
|
assert_eq!(out.as_ref(), b"hello world");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn accumulate_capped_bails_past_cap() {
|
||||||
|
let chunks: Vec<Result<bytes::Bytes, std::io::Error>> = vec![
|
||||||
|
Ok(bytes::Bytes::from(vec![0u8; 50])),
|
||||||
|
Ok(bytes::Bytes::from(vec![0u8; 60])),
|
||||||
|
];
|
||||||
|
let s = stream::iter(chunks);
|
||||||
|
let err = accumulate_capped(s, 100).await.unwrap_err();
|
||||||
|
assert!(err.to_string().contains("100-byte cap"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn accumulate_capped_surfaces_stream_errors() {
|
||||||
|
let chunks: Vec<Result<bytes::Bytes, std::io::Error>> = vec![
|
||||||
|
Ok(bytes::Bytes::from_static(b"ok")),
|
||||||
|
Err(std::io::Error::other("network blip")),
|
||||||
|
];
|
||||||
|
let s = stream::iter(chunks);
|
||||||
|
let err = accumulate_capped(s, 100).await.unwrap_err();
|
||||||
|
assert!(err.to_string().contains("network blip"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn looks_like_image_accepts_jpeg() {
|
||||||
|
// JPEG SOI + APP0 segment.
|
||||||
|
let jpeg = [0xff, 0xd8, 0xff, 0xe0, 0, 0x10, b'J', b'F', b'I', b'F'];
|
||||||
|
assert!(looks_like_image(&jpeg));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn looks_like_image_accepts_png() {
|
||||||
|
let png = [0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0, 0, 0, 0];
|
||||||
|
assert!(looks_like_image(&png));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn looks_like_image_rejects_html_disguised_as_image() {
|
||||||
|
let html = b"<html><body>not an image</body></html>";
|
||||||
|
assert!(!looks_like_image(html));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn looks_like_image_rejects_empty() {
|
||||||
|
assert!(!looks_like_image(&[]));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn looks_like_image_rejects_renderable_but_unsupported_formats() {
|
||||||
|
// BMP, TIFF, ICO, PSD are `infer::MatcherType::Image` but the
|
||||||
|
// /files/*key handler doesn't have Content-Type mappings for
|
||||||
|
// them, so they'd be served as application/octet-stream and
|
||||||
|
// download instead of render. Reject at the crawler so we
|
||||||
|
// never land them in storage.
|
||||||
|
// BMP magic: "BM" + 4-byte size.
|
||||||
|
let bmp = [b'B', b'M', 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
|
||||||
|
assert!(!looks_like_image(&bmp), "BMP must be rejected (not renderable by /files)");
|
||||||
|
|
||||||
|
// TIFF little-endian magic: "II" + 42.
|
||||||
|
let tiff = [0x49, 0x49, 0x2a, 0x00, 0, 0, 0, 0];
|
||||||
|
assert!(!looks_like_image(&tiff), "TIFF must be rejected");
|
||||||
|
|
||||||
|
// ICO magic: 0x00,0x00,0x01,0x00.
|
||||||
|
let ico = [0x00, 0x00, 0x01, 0x00, 1, 0, 16, 16, 0, 0, 1, 0, 0x18, 0, 0x40, 0, 0, 0, 0x16, 0, 0, 0];
|
||||||
|
assert!(!looks_like_image(&ico), "ICO must be rejected");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn looks_like_image_accepts_webp_gif_avif() {
|
||||||
|
// Cover the three remaining whitelisted formats so a future
|
||||||
|
// tightening that drops one would fail noisily.
|
||||||
|
let webp = [
|
||||||
|
b'R', b'I', b'F', b'F',
|
||||||
|
0, 0, 0, 0,
|
||||||
|
b'W', b'E', b'B', b'P',
|
||||||
|
b'V', b'P', b'8', b' ',
|
||||||
|
];
|
||||||
|
assert!(looks_like_image(&webp));
|
||||||
|
|
||||||
|
let gif = [b'G', b'I', b'F', b'8', b'7', b'a', 0, 0, 0, 0];
|
||||||
|
assert!(looks_like_image(&gif));
|
||||||
|
|
||||||
|
let avif = [
|
||||||
|
0x00, 0x00, 0x00, 0x18,
|
||||||
|
b'f', b't', b'y', b'p',
|
||||||
|
b'a', b'v', b'i', b'f',
|
||||||
|
0x00, 0x00, 0x00, 0x00,
|
||||||
|
b'm', b'i', b'f', b'1',
|
||||||
|
b'a', b'v', b'i', b'f',
|
||||||
|
];
|
||||||
|
assert!(looks_like_image(&avif));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -127,6 +127,54 @@ pub fn classify_probe(html: &str) -> SessionProbe {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Three-way classification of a chapter page response.
|
||||||
|
///
|
||||||
|
/// Reader pages don't render `#logo`, so [`classify_probe`] can't be
|
||||||
|
/// reused as-is. The chapter-specific marker is `a#pic_container`
|
||||||
|
/// (asserted by the reader-page parser at `parse_chapter_pages`).
|
||||||
|
///
|
||||||
|
/// Order matters: broken-page body wins over selector matches, so a
|
||||||
|
/// transient site-wide 5xx that happens to render the avatar widget
|
||||||
|
/// elsewhere doesn't falsely reach `Ok`.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
pub enum ChapterProbe {
|
||||||
|
/// `a#pic_container` present — reader rendered. Whether
|
||||||
|
/// `#avatar_menu` is also there is informational; if the reader
|
||||||
|
/// loaded the session is by definition still good.
|
||||||
|
Ok,
|
||||||
|
/// Site rendered a "logged out" or "please log in" page (no
|
||||||
|
/// reader, no broken-page body, and no avatar widget either).
|
||||||
|
/// Distinguishes the genuine expired-session case from a
|
||||||
|
/// transient site hiccup.
|
||||||
|
Unauthenticated,
|
||||||
|
/// Broken-page body, or reader didn't render but the user is
|
||||||
|
/// still logged in (avatar widget present). Caller should retry
|
||||||
|
/// rather than blame the session.
|
||||||
|
Transient,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn classify_chapter_probe(html: &str) -> ChapterProbe {
|
||||||
|
if is_broken_page_body(html) {
|
||||||
|
return ChapterProbe::Transient;
|
||||||
|
}
|
||||||
|
let doc = scraper::Html::parse_document(html);
|
||||||
|
let container = scraper::Selector::parse("a#pic_container").unwrap();
|
||||||
|
if doc.select(&container).next().is_some() {
|
||||||
|
return ChapterProbe::Ok;
|
||||||
|
}
|
||||||
|
let avatar = scraper::Selector::parse("#avatar_menu").unwrap();
|
||||||
|
if doc.select(&avatar).next().is_some() {
|
||||||
|
// Logged-in user, but the reader didn't render — most likely
|
||||||
|
// the layout shifted or the site is serving an interstitial.
|
||||||
|
ChapterProbe::Transient
|
||||||
|
} else {
|
||||||
|
// No reader, no avatar, no broken-body marker — site rendered
|
||||||
|
// the "please log in" page, which is the genuine session-
|
||||||
|
// expired signal on this route.
|
||||||
|
ChapterProbe::Unauthenticated
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// In-startup retry budget for the session probe. Small but non-zero —
|
/// In-startup retry budget for the session probe. Small but non-zero —
|
||||||
/// startup hitting a 5-second site hiccup shouldn't fail the operator
|
/// startup hitting a 5-second site hiccup shouldn't fail the operator
|
||||||
/// with "PHPSESSID expired" when the session is actually fine.
|
/// with "PHPSESSID expired" when the session is actually fine.
|
||||||
@@ -273,6 +321,73 @@ mod tests {
|
|||||||
assert_eq!(classify_probe(""), SessionProbe::Transient);
|
assert_eq!(classify_probe(""), SessionProbe::Transient);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn classify_chapter_probe_ok_when_reader_rendered() {
|
||||||
|
let html = r#"
|
||||||
|
<html><body>
|
||||||
|
<a id="pic_container">
|
||||||
|
<img id="page1" src="https://cdn/1.jpg">
|
||||||
|
</a>
|
||||||
|
</body></html>
|
||||||
|
"#;
|
||||||
|
assert_eq!(classify_chapter_probe(html), ChapterProbe::Ok);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn classify_chapter_probe_unauthenticated_when_no_reader_and_no_avatar() {
|
||||||
|
// What a logged-out hit on a chapter URL renders: a normal
|
||||||
|
// site layout (header etc.) with a "please log in" body, but
|
||||||
|
// no reader and no avatar widget.
|
||||||
|
let html = r#"
|
||||||
|
<html><body>
|
||||||
|
<header><div id="logo">Catalog</div></header>
|
||||||
|
<main>Please log in to read this chapter.</main>
|
||||||
|
</body></html>
|
||||||
|
"#;
|
||||||
|
assert_eq!(
|
||||||
|
classify_chapter_probe(html),
|
||||||
|
ChapterProbe::Unauthenticated
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn classify_chapter_probe_transient_when_logged_in_but_reader_missing() {
|
||||||
|
// Avatar shows the session is still valid; reader didn't
|
||||||
|
// render — site is serving an interstitial or the layout
|
||||||
|
// momentarily shifted. Retry, don't blame the session.
|
||||||
|
let html = r#"
|
||||||
|
<html><body>
|
||||||
|
<header><div id="logo">Catalog</div><div id="avatar_menu"></div></header>
|
||||||
|
<main>Site maintenance — back in 5 minutes.</main>
|
||||||
|
</body></html>
|
||||||
|
"#;
|
||||||
|
assert_eq!(classify_chapter_probe(html), ChapterProbe::Transient);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn classify_chapter_probe_transient_on_broken_page_body() {
|
||||||
|
let html =
|
||||||
|
"<html><body><p>we're sorry, the request file are not found.</p></body></html>";
|
||||||
|
assert_eq!(classify_chapter_probe(html), ChapterProbe::Transient);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn classify_chapter_probe_does_not_misfire_on_avatar_alone_without_reader() {
|
||||||
|
// Regression for the original bug: the binary
|
||||||
|
// find_element("#avatar_menu") check treated "no avatar" as
|
||||||
|
// session-expired even when a transient hiccup was the real
|
||||||
|
// cause. classify_chapter_probe must NOT trip on that pattern
|
||||||
|
// when pic_container *is* present.
|
||||||
|
let html = r#"
|
||||||
|
<html><body>
|
||||||
|
<a id="pic_container">
|
||||||
|
<img id="page1" src="https://cdn/1.jpg">
|
||||||
|
</a>
|
||||||
|
</body></html>
|
||||||
|
"#;
|
||||||
|
assert_eq!(classify_chapter_probe(html), ChapterProbe::Ok);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn classify_probe_trusts_broken_body_over_stray_avatar_match() {
|
fn classify_probe_trusts_broken_body_over_stray_avatar_match() {
|
||||||
// Defensive: if a broken-page body somehow contains an
|
// Defensive: if a broken-page body somehow contains an
|
||||||
|
|||||||
@@ -82,21 +82,42 @@ pub struct FetchContext<'a> {
|
|||||||
pub rate: &'a crate::crawler::rate_limit::HostRateLimiters,
|
pub rate: &'a crate::crawler::rate_limit::HostRateLimiters,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Lazy iterator over discovered manga refs. The caller drives the
|
||||||
|
/// walk one batch at a time, so it can break out as soon as a
|
||||||
|
/// downstream stop condition is met (e.g. N consecutive Unchanged
|
||||||
|
/// upserts in Incremental mode) without paying for pages it won't use.
|
||||||
|
///
|
||||||
|
/// Batches are typically one source-index page each. Within a batch
|
||||||
|
/// refs are already in the right per-page order for the active mode
|
||||||
|
/// (Backfill reverses each page to oldest-first; Incremental leaves
|
||||||
|
/// the source's natural newest-first ordering).
|
||||||
|
#[async_trait]
|
||||||
|
pub trait DiscoverWalk: Send {
|
||||||
|
/// Return the next batch of refs, or `Ok(None)` when the source has
|
||||||
|
/// no more pages. The walker is single-use; calling `next_batch`
|
||||||
|
/// after `None` is allowed and continues to return `None`.
|
||||||
|
async fn next_batch(
|
||||||
|
&mut self,
|
||||||
|
ctx: &FetchContext<'_>,
|
||||||
|
) -> anyhow::Result<Option<Vec<SourceMangaRef>>>;
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait Source: Send + Sync {
|
pub trait Source: Send + Sync {
|
||||||
/// Stable identifier — also the row key in the `sources` table.
|
/// Stable identifier — also the row key in the `sources` table.
|
||||||
fn id(&self) -> &'static str;
|
fn id(&self) -> &'static str;
|
||||||
|
|
||||||
/// Returns up to `max_results` manga refs in source order. Pass
|
/// Begin discovery in `mode`. Returns a walker the caller drives
|
||||||
/// `None` for an uncapped walk (full backfill / incremental sweep).
|
/// page-by-page via `next_batch`. The initial page-1 probe (used
|
||||||
/// Implementations should stop paginating as soon as the cap is
|
/// to determine `last_page` and warm the cache for sites that
|
||||||
/// reached so partial runs don't pay for pages they won't use.
|
/// can't be paged without knowing the bound) happens inside this
|
||||||
|
/// call, so a fresh walker is ready to yield its first batch
|
||||||
|
/// without further setup.
|
||||||
async fn discover(
|
async fn discover(
|
||||||
&self,
|
&self,
|
||||||
ctx: &FetchContext<'_>,
|
ctx: &FetchContext<'_>,
|
||||||
mode: DiscoverMode,
|
mode: DiscoverMode,
|
||||||
max_results: Option<usize>,
|
) -> anyhow::Result<Box<dyn DiscoverWalk + Send>>;
|
||||||
) -> anyhow::Result<Vec<SourceMangaRef>>;
|
|
||||||
|
|
||||||
async fn fetch_manga(
|
async fn fetch_manga(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
@@ -7,6 +7,7 @@
|
|||||||
//! (`td:has(label:contains("Author:"))`) are implemented by walking
|
//! (`td:has(label:contains("Author:"))`) are implemented by walking
|
||||||
//! the parsed tree.
|
//! the parsed tree.
|
||||||
|
|
||||||
|
use std::collections::VecDeque;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
@@ -14,13 +15,18 @@ use async_trait::async_trait;
|
|||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
DiscoverMode, FetchContext, Source, SourceChapter, SourceChapterRef, SourceManga,
|
DiscoverMode, DiscoverWalk, FetchContext, Source, SourceChapter, SourceChapterRef,
|
||||||
SourceMangaRef,
|
SourceManga, SourceMangaRef,
|
||||||
};
|
};
|
||||||
use crate::crawler::detect::{
|
use crate::crawler::detect::{
|
||||||
has_logo_sentinel, is_broken_page_body, retry_on_transient, PageError,
|
has_logo_sentinel, is_broken_page_body, retry_on_transient, PageError,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// `sources.id` value for this Source impl. Exposed as a const so the
|
||||||
|
/// daemon can look up per-source state (e.g. `seed_completed_at`)
|
||||||
|
/// before constructing the Source itself.
|
||||||
|
pub const SOURCE_ID: &str = "target";
|
||||||
|
|
||||||
/// In-loop retry budget for transient pages encountered during a single
|
/// In-loop retry budget for transient pages encountered during a single
|
||||||
/// `discover` walk. Bounded small because the job system itself retries
|
/// `discover` walk. Bounded small because the job system itself retries
|
||||||
/// the whole `Discover` job on failure — these inline retries only need
|
/// the whole `Discover` job on failure — these inline retries only need
|
||||||
@@ -60,15 +66,14 @@ impl TargetSource {
|
|||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl Source for TargetSource {
|
impl Source for TargetSource {
|
||||||
fn id(&self) -> &'static str {
|
fn id(&self) -> &'static str {
|
||||||
"target"
|
SOURCE_ID
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn discover(
|
async fn discover(
|
||||||
&self,
|
&self,
|
||||||
ctx: &FetchContext<'_>,
|
ctx: &FetchContext<'_>,
|
||||||
mode: DiscoverMode,
|
mode: DiscoverMode,
|
||||||
max_results: Option<usize>,
|
) -> anyhow::Result<Box<dyn DiscoverWalk + Send>> {
|
||||||
) -> anyhow::Result<Vec<SourceMangaRef>> {
|
|
||||||
// Always visit page 1 first because that's the only way to
|
// Always visit page 1 first because that's the only way to
|
||||||
// discover `last_page`. Retry it on transient — a broken first
|
// discover `last_page`. Retry it on transient — a broken first
|
||||||
// page would otherwise abort the whole walk before we've even
|
// page would otherwise abort the whole walk before we've even
|
||||||
@@ -85,15 +90,7 @@ impl Source for TargetSource {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let backfill = matches!(mode, DiscoverMode::Backfill);
|
let backfill = matches!(mode, DiscoverMode::Backfill);
|
||||||
let order: Vec<i32> = match (last_page, backfill) {
|
let order = build_page_order(last_page, backfill);
|
||||||
(None, _) => vec![1],
|
|
||||||
// Backfill = oldest-first: walk pages last → 1, then
|
|
||||||
// reverse within each page (the listing is update_date
|
|
||||||
// DESC, so the bottom of the last page is the oldest
|
|
||||||
// entry the source still surfaces).
|
|
||||||
(Some(last), true) => (1..=last).rev().collect(),
|
|
||||||
(Some(last), false) => (1..=last).collect(),
|
|
||||||
};
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
?mode,
|
?mode,
|
||||||
last_page = ?last_page,
|
last_page = ?last_page,
|
||||||
@@ -101,40 +98,12 @@ impl Source for TargetSource {
|
|||||||
"walking pagination"
|
"walking pagination"
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut all = Vec::new();
|
Ok(Box::new(TargetSourceWalker {
|
||||||
for page_num in order {
|
base_url: self.base_url.clone(),
|
||||||
// Page 1 is already cached from the last_page probe — reuse
|
backfill,
|
||||||
// it rather than navigating twice. Every other page goes
|
pages_remaining: order,
|
||||||
// through the retry helper so a single broken page mid-walk
|
first_page_html: Some(first_html),
|
||||||
// doesn't silently drop its mangas from the result.
|
}))
|
||||||
let mut page_refs = if page_num == 1 {
|
|
||||||
let doc = scraper::Html::parse_document(&first_html);
|
|
||||||
parse_manga_list_from(&doc)?
|
|
||||||
} else {
|
|
||||||
retry_on_transient(
|
|
||||||
|| async {
|
|
||||||
let url = page_url(&self.base_url, page_num);
|
|
||||||
let html = navigate(ctx, &url).await?;
|
|
||||||
let doc = scraper::Html::parse_document(&html);
|
|
||||||
parse_manga_list_from(&doc)
|
|
||||||
},
|
|
||||||
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
|
||||||
PAGE_TRANSIENT_RETRY_DELAY,
|
|
||||||
)
|
|
||||||
.await?
|
|
||||||
};
|
|
||||||
if backfill {
|
|
||||||
page_refs.reverse();
|
|
||||||
}
|
|
||||||
tracing::info!(page_num, count = page_refs.len(), "page walked");
|
|
||||||
all.extend(page_refs);
|
|
||||||
if cap_reached(&all, max_results) {
|
|
||||||
tracing::info!(cap = ?max_results, "max_results reached; halting pagination");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(truncate_to_cap(all, max_results))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn fetch_manga(
|
async fn fetch_manga(
|
||||||
@@ -168,15 +137,81 @@ impl Source for TargetSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cap_reached<T>(buf: &[T], max: Option<usize>) -> bool {
|
/// Build the queue of page numbers `TargetSource::discover` will walk.
|
||||||
matches!(max, Some(m) if buf.len() >= m)
|
/// Backfill is oldest-first: pages `last..=1` (within each page the
|
||||||
|
/// walker reverses entries, since the source orders by update_date
|
||||||
|
/// DESC). Incremental is newest-first: pages `1..=last` in natural
|
||||||
|
/// order. If `last_page` is unknown (source surfaces no pagination)
|
||||||
|
/// only page 1 is visited.
|
||||||
|
fn build_page_order(last_page: Option<i32>, backfill: bool) -> VecDeque<i32> {
|
||||||
|
match (last_page, backfill) {
|
||||||
|
(None, _) => VecDeque::from([1]),
|
||||||
|
(Some(last), true) => (1..=last).rev().collect(),
|
||||||
|
(Some(last), false) => (1..=last).collect(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn truncate_to_cap<T>(mut buf: Vec<T>, max: Option<usize>) -> Vec<T> {
|
/// Walker returned by [`TargetSource::discover`]. Pops one source-index
|
||||||
if let Some(m) = max {
|
/// page per `next_batch` call. Page 1's HTML is cached at construction
|
||||||
buf.truncate(m);
|
/// time (the discover call needed it to read `last_page` anyway) so the
|
||||||
|
/// batch covering page 1 doesn't re-fetch.
|
||||||
|
struct TargetSourceWalker {
|
||||||
|
base_url: String,
|
||||||
|
backfill: bool,
|
||||||
|
pages_remaining: VecDeque<i32>,
|
||||||
|
first_page_html: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl DiscoverWalk for TargetSourceWalker {
|
||||||
|
async fn next_batch(
|
||||||
|
&mut self,
|
||||||
|
ctx: &FetchContext<'_>,
|
||||||
|
) -> anyhow::Result<Option<Vec<SourceMangaRef>>> {
|
||||||
|
let Some(page_num) = self.pages_remaining.pop_front() else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
let mut page_refs = if page_num == 1 {
|
||||||
|
// Reuse the cached page-1 HTML from the initial probe. Take
|
||||||
|
// it (rather than clone) so a malformed page-order queue
|
||||||
|
// that re-visits page 1 still falls back to a real fetch.
|
||||||
|
match self.first_page_html.take() {
|
||||||
|
Some(html) => {
|
||||||
|
let doc = scraper::Html::parse_document(&html);
|
||||||
|
parse_manga_list_from(&doc)?
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
retry_on_transient(
|
||||||
|
|| async {
|
||||||
|
let html = navigate(ctx, self.base_url.as_str()).await?;
|
||||||
|
let doc = scraper::Html::parse_document(&html);
|
||||||
|
parse_manga_list_from(&doc)
|
||||||
|
},
|
||||||
|
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
||||||
|
PAGE_TRANSIENT_RETRY_DELAY,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
retry_on_transient(
|
||||||
|
|| async {
|
||||||
|
let url = page_url(&self.base_url, page_num);
|
||||||
|
let html = navigate(ctx, &url).await?;
|
||||||
|
let doc = scraper::Html::parse_document(&html);
|
||||||
|
parse_manga_list_from(&doc)
|
||||||
|
},
|
||||||
|
PAGE_TRANSIENT_RETRY_ATTEMPTS,
|
||||||
|
PAGE_TRANSIENT_RETRY_DELAY,
|
||||||
|
)
|
||||||
|
.await?
|
||||||
|
};
|
||||||
|
if self.backfill {
|
||||||
|
page_refs.reverse();
|
||||||
|
}
|
||||||
|
tracing::info!(page_num, count = page_refs.len(), "page walked");
|
||||||
|
Ok(Some(page_refs))
|
||||||
}
|
}
|
||||||
buf
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Single point of rate-limited navigation. Every Source request goes
|
/// Single point of rate-limited navigation. Every Source request goes
|
||||||
@@ -922,4 +957,37 @@ mod tests {
|
|||||||
let err = parse_manga_detail(html, "x", true).expect_err("expected Transient");
|
let err = parse_manga_detail(html, "x", true).expect_err("expected Transient");
|
||||||
assert!(err.is_transient(), "got non-transient: {err}");
|
assert!(err.is_transient(), "got non-transient: {err}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_page_order_backfill_is_last_to_one() {
|
||||||
|
// Backfill walks pages oldest-first: queue is [last, last-1, ..., 1]
|
||||||
|
// so popping from the front yields the last page first.
|
||||||
|
let order = build_page_order(Some(3), true);
|
||||||
|
assert_eq!(Vec::from(order), vec![3, 2, 1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_page_order_incremental_is_one_to_last() {
|
||||||
|
// Incremental walks newest-first in natural source order.
|
||||||
|
let order = build_page_order(Some(3), false);
|
||||||
|
assert_eq!(Vec::from(order), vec![1, 2, 3]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_page_order_falls_back_to_page_one_only_without_pagination() {
|
||||||
|
let backfill = build_page_order(None, true);
|
||||||
|
assert_eq!(Vec::from(backfill), vec![1]);
|
||||||
|
let incremental = build_page_order(None, false);
|
||||||
|
assert_eq!(Vec::from(incremental), vec![1]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_page_order_single_page_index_yields_one_entry() {
|
||||||
|
// Sources with exactly one page should not yield duplicates
|
||||||
|
// regardless of mode.
|
||||||
|
let backfill = build_page_order(Some(1), true);
|
||||||
|
assert_eq!(Vec::from(backfill), vec![1]);
|
||||||
|
let incremental = build_page_order(Some(1), false);
|
||||||
|
assert_eq!(Vec::from(incremental), vec![1]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -412,6 +412,53 @@ pub async fn sync_manga_chapters(
|
|||||||
Ok(diff)
|
Ok(diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Record that a complete Backfill walk has finished for `source_id`.
|
||||||
|
/// The presence of this row is what the daemon's mode auto-detection
|
||||||
|
/// uses to flip from Backfill to Incremental on subsequent ticks.
|
||||||
|
///
|
||||||
|
/// Keyed `seed_completed:<source_id>` in `crawler_state`. JSON payload
|
||||||
|
/// stores the timestamp so we can surface "last fully reseeded at" in
|
||||||
|
/// future ops tooling without another migration.
|
||||||
|
pub async fn mark_seed_completed(
|
||||||
|
pool: &PgPool,
|
||||||
|
source_id: &str,
|
||||||
|
at: DateTime<Utc>,
|
||||||
|
) -> sqlx::Result<()> {
|
||||||
|
let key = format!("seed_completed:{source_id}");
|
||||||
|
sqlx::query(
|
||||||
|
"INSERT INTO crawler_state (key, value, updated_at) \
|
||||||
|
VALUES ($1, $2, now()) \
|
||||||
|
ON CONFLICT (key) DO UPDATE \
|
||||||
|
SET value = EXCLUDED.value, updated_at = now()",
|
||||||
|
)
|
||||||
|
.bind(&key)
|
||||||
|
.bind(serde_json::json!({ "at": at.to_rfc3339() }))
|
||||||
|
.execute(pool)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read the timestamp written by [`mark_seed_completed`], if any.
|
||||||
|
/// `None` means no complete Backfill has ever finished for this
|
||||||
|
/// source — the daemon should run Backfill on the next tick.
|
||||||
|
pub async fn seed_completed_at(
|
||||||
|
pool: &PgPool,
|
||||||
|
source_id: &str,
|
||||||
|
) -> sqlx::Result<Option<DateTime<Utc>>> {
|
||||||
|
let key = format!("seed_completed:{source_id}");
|
||||||
|
let row: Option<serde_json::Value> =
|
||||||
|
sqlx::query_scalar("SELECT value FROM crawler_state WHERE key = $1")
|
||||||
|
.bind(&key)
|
||||||
|
.fetch_optional(pool)
|
||||||
|
.await?;
|
||||||
|
Ok(row.and_then(|v| {
|
||||||
|
v.get("at")
|
||||||
|
.and_then(|s| s.as_str())
|
||||||
|
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
|
||||||
|
.map(|dt| dt.with_timezone(&Utc))
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn mark_dropped_mangas(
|
pub async fn mark_dropped_mangas(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
source_id: &str,
|
source_id: &str,
|
||||||
|
|||||||
85
backend/tests/crawler_incremental.rs
Normal file
85
backend/tests/crawler_incremental.rs
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
//! Integration tests for the incremental-mode coordination state:
|
||||||
|
//! `mark_seed_completed` / `seed_completed_at` round-trip via the
|
||||||
|
//! `crawler_state` table.
|
||||||
|
//!
|
||||||
|
//! End-to-end pipeline behavior (walker + stop-on-Unchanged) requires
|
||||||
|
//! a real `chromiumoxide::Browser` to construct a `FetchContext`, so
|
||||||
|
//! the live integration of that path is covered by
|
||||||
|
//! `crawler_browser_smoke.rs` instead. The pure stop logic itself is
|
||||||
|
//! unit-tested in `crawler::pipeline::tests`.
|
||||||
|
|
||||||
|
use chrono::Utc;
|
||||||
|
use mangalord::repo::crawler;
|
||||||
|
use sqlx::PgPool;
|
||||||
|
|
||||||
|
#[sqlx::test(migrations = "./migrations")]
|
||||||
|
async fn seed_completed_at_none_before_any_run(pool: PgPool) {
|
||||||
|
crawler::ensure_source(&pool, "target", "T", "https://x.example")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let res = crawler::seed_completed_at(&pool, "target").await.unwrap();
|
||||||
|
assert!(res.is_none(), "fresh source has no seed marker");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[sqlx::test(migrations = "./migrations")]
|
||||||
|
async fn mark_seed_completed_then_read_round_trips_timestamp(pool: PgPool) {
|
||||||
|
crawler::ensure_source(&pool, "target", "T", "https://x.example")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let at = Utc::now();
|
||||||
|
crawler::mark_seed_completed(&pool, "target", at)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let read = crawler::seed_completed_at(&pool, "target")
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.expect("marker present after mark");
|
||||||
|
// RFC3339 round-trip is millisecond-precise on chrono::Utc; allow a
|
||||||
|
// 1ms tolerance to absorb postgres jsonb whitespace canonicalization.
|
||||||
|
let drift = (read - at).num_milliseconds().abs();
|
||||||
|
assert!(drift <= 1, "round-trip drift: {drift}ms (at={at}, read={read})");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[sqlx::test(migrations = "./migrations")]
|
||||||
|
async fn mark_seed_completed_overwrites_previous_value(pool: PgPool) {
|
||||||
|
crawler::ensure_source(&pool, "target", "T", "https://x.example")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let first = Utc::now() - chrono::Duration::hours(1);
|
||||||
|
let second = Utc::now();
|
||||||
|
crawler::mark_seed_completed(&pool, "target", first)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
crawler::mark_seed_completed(&pool, "target", second)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let read = crawler::seed_completed_at(&pool, "target")
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.expect("marker present");
|
||||||
|
let drift = (read - second).num_milliseconds().abs();
|
||||||
|
assert!(drift <= 1, "should reflect the latest mark, not the first");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[sqlx::test(migrations = "./migrations")]
|
||||||
|
async fn seed_completed_is_per_source(pool: PgPool) {
|
||||||
|
// Two sources, only one is marked complete. The other must still
|
||||||
|
// report None — the key is namespaced by source_id.
|
||||||
|
crawler::ensure_source(&pool, "target", "T", "https://x.example")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
crawler::ensure_source(&pool, "other", "O", "https://y.example")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
crawler::mark_seed_completed(&pool, "target", Utc::now())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(crawler::seed_completed_at(&pool, "target")
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_some());
|
||||||
|
assert!(crawler::seed_completed_at(&pool, "other")
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
|
}
|
||||||
22
docker-compose.prod.yml
Normal file
22
docker-compose.prod.yml
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
# Production overlay: layer on top of docker-compose.yml on the deploy
|
||||||
|
# host so the backend and frontend run from pre-built registry images
|
||||||
|
# instead of building locally.
|
||||||
|
#
|
||||||
|
# docker compose -f docker-compose.yml -f docker-compose.prod.yml up -d
|
||||||
|
#
|
||||||
|
# REGISTRY_URL and IMAGE_TAG are injected by .gitea/workflows/deploy.yml
|
||||||
|
# at deploy time. IMAGE_TAG defaults to `latest` so a manual
|
||||||
|
# `docker compose ... up -d` on the host still works.
|
||||||
|
|
||||||
|
services:
|
||||||
|
backend:
|
||||||
|
build: !reset null
|
||||||
|
image: ${REGISTRY_URL}/mangalord-backend:${IMAGE_TAG:-latest}
|
||||||
|
pull_policy: always
|
||||||
|
restart: unless-stopped
|
||||||
|
|
||||||
|
frontend:
|
||||||
|
build: !reset null
|
||||||
|
image: ${REGISTRY_URL}/mangalord-frontend:${IMAGE_TAG:-latest}
|
||||||
|
pull_policy: always
|
||||||
|
restart: unless-stopped
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "mangalord-frontend",
|
"name": "mangalord-frontend",
|
||||||
"version": "0.32.0",
|
"version": "0.34.1",
|
||||||
"private": true,
|
"private": true,
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|||||||
Reference in New Issue
Block a user