feat: crawler manga-list & metadata sync with cover download (0.23.0)
- TargetSource: first concrete impl of the Source trait, modeled on
the old Puppeteer crawler's selectors (+ status normalization,
tag-count stripping, chapter list)
- DiscoverMode::Backfill walks pagination last->1, reverse within each
page (oldest-first); Incremental walks forward
- RateLimiter (tokio-time aware) plumbed through FetchContext so the
pagination walk honors the same per-host budget as the outer loop
- repo::crawler: ensure_source, upsert_manga_from_source (returns
New/Updated/Unchanged + current cover_image_path for backfill
decisions), sync_manga_chapters, mark_dropped_mangas — all
transactional, with case-insensitive lookups and source-insertable
genres
- Cover image download via reqwest+infer; stored under
mangas/{id}/cover.{ext} via the Storage trait
- Single CRAWLER_PROXY env wires both Chromium (--proxy-server) and
reqwest::Proxy::all (HTTP/HTTPS/SOCKS5)
- Crawler binary: positional start URL or $CRAWLER_START_URL,
$CRAWLER_LIMIT (cap fetches + skip drop pass on partial runs),
$CRAWLER_SKIP_CHAPTERS (disable selector AND sync), $CRAWLER_RATE_MS
- Silences chromiumoxide 0.7's known CDP deserialize log spam via
default tracing filter + CdpError::Serde downgrade
- 9 sqlx integration tests + 11 selector/rate-limit unit tests
This commit is contained in:
434
backend/src/repo/crawler.rs
Normal file
434
backend/src/repo/crawler.rs
Normal file
@@ -0,0 +1,434 @@
|
||||
//! Persistence for crawled mangas.
|
||||
//!
|
||||
//! High-level operations:
|
||||
//! - [`ensure_source`]: idempotent registration of a source row.
|
||||
//! - [`upsert_manga_from_source`]: end-to-end "I saw this manga" —
|
||||
//! creates or updates the `mangas` row, threads `manga_sources`, and
|
||||
//! refreshes authors/genres/tags. Returns whether the manga is new,
|
||||
//! updated (metadata_hash changed), or unchanged.
|
||||
//! - [`sync_manga_chapters`]: per-manga chapter reconciliation. Adds
|
||||
//! new ones, refreshes URLs on existing ones, soft-drops vanished.
|
||||
//! - [`mark_dropped_mangas`]: end-of-run pass. Any manga from this
|
||||
//! source whose `last_seen_at` is older than the run start is
|
||||
//! soft-dropped.
|
||||
//!
|
||||
//! Each public function is a transaction boundary so a partial failure
|
||||
//! mid-call leaves the DB in its pre-call state.
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use sqlx::{PgPool, Postgres, Transaction};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::crawler::source::{SourceChapterRef, SourceManga};
|
||||
|
||||
/// Outcome of [`upsert_manga_from_source`] for a single manga.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UpsertStatus {
    /// No `manga_sources` row existed for the source key, so a fresh
    /// `mangas` row was inserted.
    New,
    /// A `manga_sources` row existed but its stored `metadata_hash`
    /// differed from the incoming one; the `mangas` row was rewritten.
    Updated,
    /// A `manga_sources` row existed and its stored `metadata_hash`
    /// matched the incoming one; the `mangas` row was left alone.
    Unchanged,
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UpsertedManga {
|
||||
pub manga_id: Uuid,
|
||||
pub status: UpsertStatus,
|
||||
/// Current value of `mangas.cover_image_path` after the upsert.
|
||||
/// `None` means the cover hasn't been downloaded yet — the caller
|
||||
/// uses this to backfill covers for mangas that were synced before
|
||||
/// cover-download support existed.
|
||||
pub cover_image_path: Option<String>,
|
||||
}
|
||||
|
||||
/// Counters reported by [`sync_manga_chapters`] for one reconciliation pass.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ChapterDiff {
    /// Input chapters that had no `chapter_sources` row yet.
    pub new: usize,
    /// Input chapters whose existing `chapter_sources` row was refreshed.
    pub refreshed: usize,
    /// `chapter_sources` rows soft-dropped because their key was missing
    /// from the input list.
    pub dropped: usize,
}
|
||||
|
||||
pub async fn ensure_source(
|
||||
pool: &PgPool,
|
||||
id: &str,
|
||||
name: &str,
|
||||
base_url: &str,
|
||||
) -> sqlx::Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO sources (id, name, base_url, enabled)
|
||||
VALUES ($1, $2, $3, true)
|
||||
ON CONFLICT (id) DO UPDATE
|
||||
SET name = EXCLUDED.name,
|
||||
base_url = EXCLUDED.base_url
|
||||
"#,
|
||||
)
|
||||
.bind(id)
|
||||
.bind(name)
|
||||
.bind(base_url)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn upsert_manga_from_source(
|
||||
pool: &PgPool,
|
||||
source_id: &str,
|
||||
source_url: &str,
|
||||
sm: &SourceManga,
|
||||
) -> sqlx::Result<UpsertedManga> {
|
||||
let mut tx = pool.begin().await?;
|
||||
|
||||
let existing: Option<(Uuid, Option<String>)> = sqlx::query_as(
|
||||
r#"
|
||||
SELECT manga_id, metadata_hash
|
||||
FROM manga_sources
|
||||
WHERE source_id = $1 AND source_manga_key = $2
|
||||
"#,
|
||||
)
|
||||
.bind(source_id)
|
||||
.bind(&sm.source_manga_key)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let status_db = sm.status.as_deref().unwrap_or("ongoing");
|
||||
|
||||
// Note: `cover_image_path` is intentionally not written here.
|
||||
// The repo layer doesn't know about the storage backend, so the
|
||||
// caller (crawler binary) downloads the cover via the `Storage`
|
||||
// trait and sets the path with `repo::manga::set_cover_image_path`
|
||||
// once the bytes have landed.
|
||||
let (manga_id, status) = match existing {
|
||||
None => {
|
||||
let (id,): (Uuid,) = sqlx::query_as(
|
||||
r#"
|
||||
INSERT INTO mangas (title, description, status, alt_titles)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING id
|
||||
"#,
|
||||
)
|
||||
.bind(&sm.title)
|
||||
.bind(sm.summary.as_deref())
|
||||
.bind(status_db)
|
||||
.bind(&sm.alternative_titles)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
(id, UpsertStatus::New)
|
||||
}
|
||||
Some((id, prev_hash)) if prev_hash.as_deref() == Some(&sm.metadata_hash) => {
|
||||
(id, UpsertStatus::Unchanged)
|
||||
}
|
||||
Some((id, _)) => {
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE mangas
|
||||
SET title = $1,
|
||||
description = $2,
|
||||
status = $3,
|
||||
alt_titles = $4,
|
||||
updated_at = NOW()
|
||||
WHERE id = $5
|
||||
"#,
|
||||
)
|
||||
.bind(&sm.title)
|
||||
.bind(sm.summary.as_deref())
|
||||
.bind(status_db)
|
||||
.bind(&sm.alternative_titles)
|
||||
.bind(id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
(id, UpsertStatus::Updated)
|
||||
}
|
||||
};
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO manga_sources
|
||||
(source_id, source_manga_key, manga_id, source_url, metadata_hash, last_seen_at, dropped_at)
|
||||
VALUES ($1, $2, $3, $4, $5, NOW(), NULL)
|
||||
ON CONFLICT (source_id, source_manga_key) DO UPDATE
|
||||
SET source_url = EXCLUDED.source_url,
|
||||
metadata_hash = EXCLUDED.metadata_hash,
|
||||
last_seen_at = NOW(),
|
||||
dropped_at = NULL
|
||||
"#,
|
||||
)
|
||||
.bind(source_id)
|
||||
.bind(&sm.source_manga_key)
|
||||
.bind(manga_id)
|
||||
.bind(source_url)
|
||||
.bind(&sm.metadata_hash)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sync_authors(&mut tx, manga_id, &sm.authors).await?;
|
||||
sync_genres(&mut tx, manga_id, &sm.genres).await?;
|
||||
sync_tags(&mut tx, manga_id, &sm.tags).await?;
|
||||
|
||||
let cover_image_path: Option<String> =
|
||||
sqlx::query_scalar("SELECT cover_image_path FROM mangas WHERE id = $1")
|
||||
.bind(manga_id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(UpsertedManga {
|
||||
manga_id,
|
||||
status,
|
||||
cover_image_path,
|
||||
})
|
||||
}
|
||||
|
||||
async fn sync_authors(
|
||||
tx: &mut Transaction<'_, Postgres>,
|
||||
manga_id: Uuid,
|
||||
authors: &[String],
|
||||
) -> sqlx::Result<()> {
|
||||
sqlx::query("DELETE FROM manga_authors WHERE manga_id = $1")
|
||||
.bind(manga_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
for (i, name) in authors.iter().enumerate() {
|
||||
let trimmed = name.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
// Self-update on conflict so the row id is always returned —
|
||||
// can't use DO NOTHING because that suppresses RETURNING.
|
||||
let (author_id,): (Uuid,) = sqlx::query_as(
|
||||
r#"
|
||||
INSERT INTO authors (name) VALUES ($1)
|
||||
ON CONFLICT (lower(name)) DO UPDATE SET name = authors.name
|
||||
RETURNING id
|
||||
"#,
|
||||
)
|
||||
.bind(trimmed)
|
||||
.fetch_one(&mut **tx)
|
||||
.await?;
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO manga_authors (manga_id, author_id, position)
|
||||
VALUES ($1, $2, $3)
|
||||
ON CONFLICT DO NOTHING
|
||||
"#,
|
||||
)
|
||||
.bind(manga_id)
|
||||
.bind(author_id)
|
||||
.bind(i as i32)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn sync_genres(
|
||||
tx: &mut Transaction<'_, Postgres>,
|
||||
manga_id: Uuid,
|
||||
genres: &[String],
|
||||
) -> sqlx::Result<()> {
|
||||
sqlx::query("DELETE FROM manga_genres WHERE manga_id = $1")
|
||||
.bind(manga_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
for name in genres {
|
||||
let trimmed = name.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
// Case-insensitive lookup so a source-supplied "action"
|
||||
// attaches to the seeded "Action" rather than creating a
|
||||
// second row.
|
||||
let existing: Option<(Uuid,)> =
|
||||
sqlx::query_as("SELECT id FROM genres WHERE lower(name) = lower($1)")
|
||||
.bind(trimmed)
|
||||
.fetch_optional(&mut **tx)
|
||||
.await?;
|
||||
let genre_id = match existing {
|
||||
Some((id,)) => id,
|
||||
None => {
|
||||
let (id,): (Uuid,) = sqlx::query_as(
|
||||
r#"
|
||||
INSERT INTO genres (name) VALUES ($1)
|
||||
ON CONFLICT (name) DO UPDATE SET name = genres.name
|
||||
RETURNING id
|
||||
"#,
|
||||
)
|
||||
.bind(trimmed)
|
||||
.fetch_one(&mut **tx)
|
||||
.await?;
|
||||
tracing::info!(genre = trimmed, "added new genre from source");
|
||||
id
|
||||
}
|
||||
};
|
||||
sqlx::query(
|
||||
"INSERT INTO manga_genres (manga_id, genre_id) VALUES ($1, $2) ON CONFLICT DO NOTHING",
|
||||
)
|
||||
.bind(manga_id)
|
||||
.bind(genre_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn sync_tags(
|
||||
tx: &mut Transaction<'_, Postgres>,
|
||||
manga_id: Uuid,
|
||||
tags: &[String],
|
||||
) -> sqlx::Result<()> {
|
||||
sqlx::query("DELETE FROM manga_tags WHERE manga_id = $1")
|
||||
.bind(manga_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
for name in tags {
|
||||
let trimmed = name.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let (tag_id,): (Uuid,) = sqlx::query_as(
|
||||
r#"
|
||||
INSERT INTO tags (name) VALUES ($1)
|
||||
ON CONFLICT (lower(name)) DO UPDATE SET name = tags.name
|
||||
RETURNING id
|
||||
"#,
|
||||
)
|
||||
.bind(trimmed)
|
||||
.fetch_one(&mut **tx)
|
||||
.await?;
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO manga_tags (manga_id, tag_id, added_by)
|
||||
VALUES ($1, $2, NULL)
|
||||
ON CONFLICT DO NOTHING
|
||||
"#,
|
||||
)
|
||||
.bind(manga_id)
|
||||
.bind(tag_id)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn sync_manga_chapters(
|
||||
pool: &PgPool,
|
||||
source_id: &str,
|
||||
manga_id: Uuid,
|
||||
chapters: &[SourceChapterRef],
|
||||
) -> sqlx::Result<ChapterDiff> {
|
||||
let mut tx = pool.begin().await?;
|
||||
let mut diff = ChapterDiff::default();
|
||||
let seen_keys: Vec<String> = chapters
|
||||
.iter()
|
||||
.map(|c| c.source_chapter_key.clone())
|
||||
.collect();
|
||||
|
||||
for c in chapters {
|
||||
let existing: Option<(Uuid,)> = sqlx::query_as(
|
||||
"SELECT chapter_id FROM chapter_sources WHERE source_id = $1 AND source_chapter_key = $2",
|
||||
)
|
||||
.bind(source_id)
|
||||
.bind(&c.source_chapter_key)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
|
||||
match existing {
|
||||
None => {
|
||||
// New chapter row. The (manga_id, number) unique
|
||||
// constraint protects against re-inserts if the same
|
||||
// number arrives via a different source_chapter_key.
|
||||
let (chapter_id,): (Uuid,) = sqlx::query_as(
|
||||
r#"
|
||||
INSERT INTO chapters (manga_id, number, title, page_count)
|
||||
VALUES ($1, $2, $3, 0)
|
||||
ON CONFLICT (manga_id, number) DO UPDATE
|
||||
SET title = EXCLUDED.title
|
||||
RETURNING id
|
||||
"#,
|
||||
)
|
||||
.bind(manga_id)
|
||||
.bind(c.number)
|
||||
.bind(c.title.as_deref())
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO chapter_sources
|
||||
(source_id, source_chapter_key, chapter_id, source_url, last_seen_at, dropped_at)
|
||||
VALUES ($1, $2, $3, $4, NOW(), NULL)
|
||||
"#,
|
||||
)
|
||||
.bind(source_id)
|
||||
.bind(&c.source_chapter_key)
|
||||
.bind(chapter_id)
|
||||
.bind(&c.url)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
diff.new += 1;
|
||||
}
|
||||
Some((chapter_id,)) => {
|
||||
sqlx::query("UPDATE chapters SET title = $1 WHERE id = $2")
|
||||
.bind(c.title.as_deref())
|
||||
.bind(chapter_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
sqlx::query(
|
||||
r#"
|
||||
UPDATE chapter_sources
|
||||
SET source_url = $1, last_seen_at = NOW(), dropped_at = NULL
|
||||
WHERE source_id = $2 AND source_chapter_key = $3
|
||||
"#,
|
||||
)
|
||||
.bind(&c.url)
|
||||
.bind(source_id)
|
||||
.bind(&c.source_chapter_key)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
diff.refreshed += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Soft-drop any chapter previously seen from this source for this
|
||||
// manga that's not in the current list.
|
||||
let result = sqlx::query(
|
||||
r#"
|
||||
UPDATE chapter_sources cs
|
||||
SET dropped_at = NOW()
|
||||
FROM chapters ch
|
||||
WHERE cs.chapter_id = ch.id
|
||||
AND ch.manga_id = $1
|
||||
AND cs.source_id = $2
|
||||
AND cs.dropped_at IS NULL
|
||||
AND NOT (cs.source_chapter_key = ANY($3))
|
||||
"#,
|
||||
)
|
||||
.bind(manga_id)
|
||||
.bind(source_id)
|
||||
.bind(&seen_keys)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
diff.dropped = result.rows_affected() as usize;
|
||||
|
||||
tx.commit().await?;
|
||||
Ok(diff)
|
||||
}
|
||||
|
||||
pub async fn mark_dropped_mangas(
|
||||
pool: &PgPool,
|
||||
source_id: &str,
|
||||
run_started_at: DateTime<Utc>,
|
||||
) -> sqlx::Result<u64> {
|
||||
let res = sqlx::query(
|
||||
r#"
|
||||
UPDATE manga_sources
|
||||
SET dropped_at = NOW()
|
||||
WHERE source_id = $1
|
||||
AND last_seen_at < $2
|
||||
AND dropped_at IS NULL
|
||||
"#,
|
||||
)
|
||||
.bind(source_id)
|
||||
.bind(run_started_at)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(res.rows_affected())
|
||||
}
|
||||
@@ -3,6 +3,7 @@ pub mod author;
|
||||
pub mod bookmark;
|
||||
pub mod chapter;
|
||||
pub mod collection;
|
||||
pub mod crawler;
|
||||
pub mod genre;
|
||||
pub mod manga;
|
||||
pub mod page;
|
||||
|
||||
Reference in New Issue
Block a user