bugfix: wrap manga + chapter uploads in a DB transaction
Previously a storage failure mid-chapter-upload left a partial chapter row pointing at a `page_count` that didn't match what was on disk, plus any successfully-inserted page rows. Same shape for a manga create where the cover put or cover_image_path UPDATE failed after the manga row was already inserted. Fix at the DB layer: open `pool.begin()` at the start of the create, do all DB writes against `&mut *tx`, commit only after the full sequence succeeds. If anything before commit fails, the transaction is rolled back on drop and the DB stays consistent. Bytes already written to storage on a rolled-back transaction become orphans on disk; a future reaper can sweep them, and we prioritise DB consistency over storage tidiness in this branch. - repo::manga::create / set_cover_image_path: signature changed to `impl PgExecutor<'_>` so handlers can pass either `&PgPool` or `&mut *tx`. set_cover_image_path is new — replaces the inline `UPDATE` in the manga upload handler so the call site stays consistent. - repo::chapter::create / set_page_count: same shape. - repo::page::create: same. - api::mangas::create and api::chapters::create both open a transaction around their DB writes; storage puts happen inside the transaction window (since they must precede the page-row insert), so a failed put aborts before commit. New integration test (api_uploads::chapter_upload_rolls_back_when_ storage_fails_mid_loop) uses a `FailingStorage` helper that errors on the N-th `put`. With N=1 (page 2 fails), the handler returns 500 and the chapter + page tables stay empty. `harness_with_failing_storage` is exposed alongside the existing `harness` so future tests can reuse it for other fault-injection cases. Lockstep version bump to 0.9.3. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2
backend/Cargo.lock
generated
2
backend/Cargo.lock
generated
@@ -1033,7 +1033,7 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
|
||||
|
||||
[[package]]
|
||||
name = "mangalord"
|
||||
version = "0.9.1"
|
||||
version = "0.9.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"argon2",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "mangalord"
|
||||
version = "0.9.2"
|
||||
version = "0.9.3"
|
||||
edition = "2021"
|
||||
|
||||
[lib]
|
||||
|
||||
@@ -112,8 +112,15 @@ async fn create(
|
||||
});
|
||||
}
|
||||
|
||||
// Transactional create. If any storage put or page-row insert
|
||||
// fails mid-loop, the chapter row + any earlier page rows are
|
||||
// rolled back so we don't leave a chapter with stale page_count=0
|
||||
// and orphaned page rows. Bytes already written to storage on a
|
||||
// rolled-back transaction become orphans on disk; a future reaper
|
||||
// can sweep them. DB consistency wins over storage tidiness here.
|
||||
let mut tx = state.db.begin().await?;
|
||||
let mut chapter = repo::chapter::create(
|
||||
&state.db,
|
||||
&mut *tx,
|
||||
manga_id,
|
||||
metadata.number,
|
||||
metadata.title.as_deref(),
|
||||
@@ -128,17 +135,15 @@ async fn create(
|
||||
manga_id, chapter.id, nnnn, page.ext
|
||||
);
|
||||
state.storage.put(&key, &page.bytes).await?;
|
||||
repo::page::create(&state.db, chapter.id, page_number, &key, page.mime).await?;
|
||||
repo::page::create(&mut *tx, chapter.id, page_number, &key, page.mime).await?;
|
||||
}
|
||||
|
||||
let page_count = pages.len() as i32;
|
||||
sqlx::query("UPDATE chapters SET page_count = $1 WHERE id = $2")
|
||||
.bind(page_count)
|
||||
.bind(chapter.id)
|
||||
.execute(&state.db)
|
||||
.await?;
|
||||
repo::chapter::set_page_count(&mut *tx, chapter.id, page_count).await?;
|
||||
chapter.page_count = page_count;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok((StatusCode::CREATED, Json(chapter)))
|
||||
}
|
||||
|
||||
|
||||
@@ -94,19 +94,24 @@ async fn create(
|
||||
})?;
|
||||
validate_new_manga(&metadata)?;
|
||||
|
||||
let mut manga = repo::manga::create(&state.db, metadata).await?;
|
||||
// Transactional create. If the cover put or the cover_image_path
|
||||
// UPDATE fails, the manga row is rolled back so a half-uploaded
|
||||
// cover doesn't leave a manga without one stuck pointing at it.
|
||||
// Bytes already written to storage on a rolled-back transaction
|
||||
// become orphans on disk — a future reaper can sweep them; we
|
||||
// prioritise DB consistency over storage tidiness here.
|
||||
let mut tx = state.db.begin().await?;
|
||||
let mut manga = repo::manga::create(&mut *tx, metadata).await?;
|
||||
|
||||
if let Some(img) = cover {
|
||||
let key = format!("mangas/{}/cover.{}", manga.id, img.ext);
|
||||
state.storage.put(&key, &img.bytes).await?;
|
||||
sqlx::query("UPDATE mangas SET cover_image_path = $1, updated_at = now() WHERE id = $2")
|
||||
.bind(&key)
|
||||
.bind(manga.id)
|
||||
.execute(&state.db)
|
||||
.await?;
|
||||
repo::manga::set_cover_image_path(&mut *tx, manga.id, &key).await?;
|
||||
manga.cover_image_path = Some(key);
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok((StatusCode::CREATED, Json(manga)))
|
||||
}
|
||||
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
//! Chapter persistence.
|
||||
|
||||
use sqlx::PgPool;
|
||||
use sqlx::{PgExecutor, PgPool};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::domain::Chapter;
|
||||
use crate::error::AppResult;
|
||||
use crate::error::{AppError, AppResult};
|
||||
|
||||
pub async fn list_for_manga(
|
||||
pool: &PgPool,
|
||||
@@ -48,11 +48,12 @@ pub async fn find_by_manga_and_number(
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
/// Inserts a chapter. Used by tests today and by the upload handler in
|
||||
/// feat/uploads. Returns `AppError::Conflict` on the (manga_id, number)
|
||||
/// unique violation so handlers can surface a clean 409.
|
||||
pub async fn create(
|
||||
pool: &PgPool,
|
||||
/// Accepts any `PgExecutor` so the upload handler can run this inside a
|
||||
/// transaction with the per-page inserts. Returns `AppError::Conflict`
|
||||
/// on the (manga_id, number) unique violation so handlers can surface a
|
||||
/// clean 409.
|
||||
pub async fn create<'e, E: PgExecutor<'e>>(
|
||||
executor: E,
|
||||
manga_id: Uuid,
|
||||
number: i32,
|
||||
title: Option<&str>,
|
||||
@@ -67,18 +68,31 @@ pub async fn create(
|
||||
.bind(manga_id)
|
||||
.bind(number)
|
||||
.bind(title)
|
||||
.fetch_one(pool)
|
||||
.fetch_one(executor)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(c) => Ok(c),
|
||||
Err(e) if is_unique_violation(&e) => Err(crate::error::AppError::Conflict(format!(
|
||||
Err(e) if is_unique_violation(&e) => Err(AppError::Conflict(format!(
|
||||
"chapter {number} already exists for this manga"
|
||||
))),
|
||||
Err(e) => Err(crate::error::AppError::Database(e)),
|
||||
Err(e) => Err(AppError::Database(e)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn set_page_count<'e, E: PgExecutor<'e>>(
|
||||
executor: E,
|
||||
id: Uuid,
|
||||
page_count: i32,
|
||||
) -> AppResult<()> {
|
||||
sqlx::query("UPDATE chapters SET page_count = $1 WHERE id = $2")
|
||||
.bind(page_count)
|
||||
.bind(id)
|
||||
.execute(executor)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn is_unique_violation(err: &sqlx::Error) -> bool {
|
||||
if let sqlx::Error::Database(db_err) = err {
|
||||
db_err.code().as_deref() == Some("23505")
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
//! a trait + impl if a second backend ever becomes necessary.
|
||||
|
||||
use serde::Deserialize;
|
||||
use sqlx::PgPool;
|
||||
use sqlx::{PgExecutor, PgPool};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::domain::manga::{Manga, NewManga};
|
||||
@@ -105,7 +105,13 @@ pub async fn get(pool: &PgPool, id: Uuid) -> AppResult<Manga> {
|
||||
.ok_or(AppError::NotFound)
|
||||
}
|
||||
|
||||
pub async fn create(pool: &PgPool, input: NewManga) -> AppResult<Manga> {
|
||||
/// Accepts any `PgExecutor` so callers can pass `&PgPool` for simple
|
||||
/// inserts or `&mut *tx` to run inside a transaction. Same applies to
|
||||
/// `set_cover_image_path` below.
|
||||
pub async fn create<'e, E: PgExecutor<'e>>(
|
||||
executor: E,
|
||||
input: NewManga,
|
||||
) -> AppResult<Manga> {
|
||||
let row = sqlx::query_as::<_, Manga>(
|
||||
r#"
|
||||
INSERT INTO mangas (title, author, description)
|
||||
@@ -116,7 +122,20 @@ pub async fn create(pool: &PgPool, input: NewManga) -> AppResult<Manga> {
|
||||
.bind(&input.title)
|
||||
.bind(&input.author)
|
||||
.bind(&input.description)
|
||||
.fetch_one(pool)
|
||||
.fetch_one(executor)
|
||||
.await?;
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
pub async fn set_cover_image_path<'e, E: PgExecutor<'e>>(
|
||||
executor: E,
|
||||
id: Uuid,
|
||||
key: &str,
|
||||
) -> AppResult<()> {
|
||||
sqlx::query("UPDATE mangas SET cover_image_path = $1, updated_at = now() WHERE id = $2")
|
||||
.bind(key)
|
||||
.bind(id)
|
||||
.execute(executor)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
//! Per-page persistence. Mirrors the rows that `pages` holds.
|
||||
|
||||
use sqlx::PgPool;
|
||||
use sqlx::{PgExecutor, PgPool};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::domain::Page;
|
||||
use crate::error::AppResult;
|
||||
|
||||
pub async fn create(
|
||||
pool: &PgPool,
|
||||
pub async fn create<'e, E: PgExecutor<'e>>(
|
||||
executor: E,
|
||||
chapter_id: Uuid,
|
||||
page_number: i32,
|
||||
storage_key: &str,
|
||||
@@ -24,7 +24,7 @@ pub async fn create(
|
||||
.bind(page_number)
|
||||
.bind(storage_key)
|
||||
.bind(content_type)
|
||||
.fetch_one(pool)
|
||||
.fetch_one(executor)
|
||||
.await?;
|
||||
Ok(row)
|
||||
}
|
||||
|
||||
@@ -323,6 +323,48 @@ async fn create_chapter_requires_authentication(pool: PgPool) {
|
||||
assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn chapter_upload_rolls_back_when_storage_fails_mid_loop(pool: PgPool) {
|
||||
// Configure storage so the second `put` call (0-indexed: index 1)
|
||||
// errors. seed_manga_via_api uploads no cover, so the very first
|
||||
// `put` happens inside the chapter handler — page 1 succeeds, page
|
||||
// 2 fails, the transaction rolls back.
|
||||
let h = common::harness_with_failing_storage(pool.clone(), 1);
|
||||
let (_, cookie) = common::register_user(&h.app).await;
|
||||
let manga_id = common::seed_manga_via_api(&h.app, &cookie, "Berserk").await;
|
||||
|
||||
let resp = h
|
||||
.app
|
||||
.oneshot(common::post_multipart_with_cookie(
|
||||
&format!("/api/v1/mangas/{manga_id}/chapters"),
|
||||
MultipartBuilder::new()
|
||||
.add_json("metadata", json!({ "number": 1 }))
|
||||
.add_file("page", "1.png", "image/png", &common::fake_png_bytes())
|
||||
.add_file("page", "2.png", "image/png", &common::fake_png_bytes())
|
||||
.add_file("page", "3.png", "image/png", &common::fake_png_bytes()),
|
||||
&cookie,
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
|
||||
let body = common::body_json(resp).await;
|
||||
assert_eq!(body["error"]["code"], "internal_error");
|
||||
|
||||
// No chapter rows for this manga.
|
||||
let (chapter_count,): (i64,) =
|
||||
sqlx::query_as("SELECT count(*) FROM chapters WHERE manga_id = $1")
|
||||
.bind(manga_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(chapter_count, 0, "rolled-back chapter must not persist");
|
||||
|
||||
// No page rows at all (we never seeded any other chapter).
|
||||
let (page_count,): (i64,) =
|
||||
sqlx::query_as("SELECT count(*) FROM pages").fetch_one(&pool).await.unwrap();
|
||||
assert_eq!(page_count, 0, "rolled-back pages must not persist");
|
||||
}
|
||||
|
||||
#[sqlx::test(migrations = "./migrations")]
|
||||
async fn create_chapter_under_unknown_manga_is_404(pool: PgPool) {
|
||||
let h = common::harness(pool);
|
||||
|
||||
@@ -16,7 +16,10 @@ use tower::ServiceExt;
|
||||
|
||||
use mangalord::app::{router, AppState};
|
||||
use mangalord::config::{AuthConfig, UploadConfig};
|
||||
use mangalord::storage::LocalStorage;
|
||||
use mangalord::storage::{LocalStorage, Storage, StorageError, StreamingFile};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
pub struct Harness {
|
||||
pub app: Router,
|
||||
@@ -26,9 +29,29 @@ pub struct Harness {
|
||||
|
||||
pub fn harness(pool: PgPool) -> Harness {
|
||||
let storage_dir = tempfile::tempdir().expect("tempdir");
|
||||
let storage = Arc::new(LocalStorage::new(storage_dir.path()));
|
||||
harness_inner(pool, storage, storage_dir)
|
||||
}
|
||||
|
||||
/// Variant of `harness` that swaps in a `Storage` that errors on the
|
||||
/// `fail_on_put_index`-th `put` call (0-indexed). Used to exercise the
|
||||
/// upload handlers' transactional rollback path without resorting to
|
||||
/// fault injection at lower layers.
|
||||
pub fn harness_with_failing_storage(pool: PgPool, fail_on_put_index: usize) -> Harness {
|
||||
let storage_dir = tempfile::tempdir().expect("tempdir");
|
||||
let inner = LocalStorage::new(storage_dir.path());
|
||||
let storage = Arc::new(FailingStorage::new(inner, fail_on_put_index));
|
||||
harness_inner(pool, storage, storage_dir)
|
||||
}
|
||||
|
||||
fn harness_inner(
|
||||
pool: PgPool,
|
||||
storage: Arc<dyn Storage>,
|
||||
storage_dir: TempDir,
|
||||
) -> Harness {
|
||||
let state = AppState {
|
||||
db: pool,
|
||||
storage: Arc::new(LocalStorage::new(storage_dir.path())),
|
||||
storage,
|
||||
auth: AuthConfig { cookie_secure: false, ..AuthConfig::default() },
|
||||
upload: UploadConfig {
|
||||
// Keep file caps small in tests so the size-cap path is cheap to
|
||||
@@ -40,6 +63,50 @@ pub fn harness(pool: PgPool) -> Harness {
|
||||
Harness { app: router(state), _storage_dir: storage_dir }
|
||||
}
|
||||
|
||||
/// Wraps a real `Storage` and fails on the N-th `put` call so tests can
|
||||
/// assert that handlers roll their DB writes back when storage errors
|
||||
/// mid-upload. Reads and other operations delegate to `inner`.
|
||||
pub struct FailingStorage {
|
||||
inner: LocalStorage,
|
||||
counter: AtomicUsize,
|
||||
fail_on_put_index: usize,
|
||||
}
|
||||
|
||||
impl FailingStorage {
|
||||
pub fn new(inner: LocalStorage, fail_on_put_index: usize) -> Self {
|
||||
Self {
|
||||
inner,
|
||||
counter: AtomicUsize::new(0),
|
||||
fail_on_put_index,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Storage for FailingStorage {
|
||||
async fn put(&self, key: &str, bytes: &[u8]) -> Result<(), StorageError> {
|
||||
let n = self.counter.fetch_add(1, Ordering::SeqCst);
|
||||
if n == self.fail_on_put_index {
|
||||
return Err(StorageError::Io(std::io::Error::other(
|
||||
"FailingStorage: injected put failure",
|
||||
)));
|
||||
}
|
||||
self.inner.put(key, bytes).await
|
||||
}
|
||||
async fn get(&self, key: &str) -> Result<Vec<u8>, StorageError> {
|
||||
self.inner.get(key).await
|
||||
}
|
||||
async fn get_stream(&self, key: &str) -> Result<StreamingFile, StorageError> {
|
||||
self.inner.get_stream(key).await
|
||||
}
|
||||
async fn delete(&self, key: &str) -> Result<(), StorageError> {
|
||||
self.inner.delete(key).await
|
||||
}
|
||||
async fn exists(&self, key: &str) -> Result<bool, StorageError> {
|
||||
self.inner.exists(key).await
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn body_json(response: axum::response::Response) -> serde_json::Value {
|
||||
let bytes = response.into_body().collect().await.unwrap().to_bytes();
|
||||
serde_json::from_slice(&bytes).expect("body is JSON")
|
||||
|
||||
Reference in New Issue
Block a user