fix(crawler): guard ack_done/ack_failed/release on state='running' (0.35.3)

The three lease-ack functions matched their UPDATE on the job id
alone. If a lease expired and another worker re-leased the row, a
late ack from the original worker would clobber the new lease's
state, leased_until, and (for release) decrement its attempts.

Add `AND state = 'running'` to each UPDATE and log a warn when
rows_affected is zero, so a stolen lease shows up in telemetry without
blocking the new lease holder's progress.

Three new integration tests pin the contract:
- ack_done_no_ops_when_lease_was_stolen
- ack_failed_no_ops_when_state_is_not_running
- release_no_ops_when_state_is_not_running

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
MechaCat02
2026-05-29 20:42:18 +02:00
parent c686d6eb51
commit 766c6eebac
5 changed files with 149 additions and 15 deletions

View File

@@ -160,23 +160,36 @@ pub async fn lease(
Ok(leases)
}
/// Mark a leased job as successfully completed.
/// Mark a leased job as successfully completed. The `state = 'running'`
/// predicate guards against a late ack from a worker whose lease expired
/// and was already re-leased by another worker: without it, the late ack
/// would clobber the new lease's `state` and `leased_until`. `rows_affected
/// == 0` means we lost the lease — surfaced as a warn rather than an
/// error because the new lease holder is doing real work; the late ack
/// just has to step aside.
pub async fn ack_done(pool: &PgPool, lease_id: Uuid) -> sqlx::Result<()> {
sqlx::query(
let res = sqlx::query(
"UPDATE crawler_jobs \
SET state = 'done', leased_until = NULL, updated_at = now() \
WHERE id = $1",
WHERE id = $1 AND state = 'running'",
)
.bind(lease_id)
.execute(pool)
.await?;
if res.rows_affected() == 0 {
tracing::warn!(
%lease_id,
"ack_done: lease no longer running — likely re-leased by another worker; skipping update"
);
}
Ok(())
}
/// Mark a leased job as failed. If the current attempt count has reached
/// `max_attempts` the job is terminally dead and stops retrying;
/// otherwise it goes back to `pending` with `scheduled_at` pushed into
/// the future by the exponential backoff.
/// the future by the exponential backoff. See [`ack_done`] for the
/// `state = 'running'` guard rationale.
pub async fn ack_failed(
pool: &PgPool,
lease_id: Uuid,
@@ -184,16 +197,16 @@ pub async fn ack_failed(
attempts: i32,
max_attempts: i32,
) -> sqlx::Result<()> {
if attempts >= max_attempts {
let res = if attempts >= max_attempts {
sqlx::query(
"UPDATE crawler_jobs \
SET state = 'dead', last_error = $2, leased_until = NULL, updated_at = now() \
WHERE id = $1",
WHERE id = $1 AND state = 'running'",
)
.bind(lease_id)
.bind(error)
.execute(pool)
.await?;
.await?
} else {
let backoff_ms: i64 = backoff_for(attempts).as_millis().min(i64::MAX as u128) as i64;
sqlx::query(
@@ -201,30 +214,45 @@ pub async fn ack_failed(
SET state = 'pending', last_error = $2, leased_until = NULL, \
scheduled_at = now() + ($3::bigint || ' milliseconds')::interval, \
updated_at = now() \
WHERE id = $1",
WHERE id = $1 AND state = 'running'",
)
.bind(lease_id)
.bind(error)
.bind(backoff_ms)
.execute(pool)
.await?;
.await?
};
if res.rows_affected() == 0 {
tracing::warn!(
%lease_id,
"ack_failed: lease no longer running — likely re-leased by another worker; skipping update"
);
}
Ok(())
}
/// Return a leased job to `pending` without burning a retry attempt.
/// Used on graceful shutdown and on session-expired aborts where the
/// failure isn't the job's fault.
/// failure isn't the job's fault. See [`ack_done`] for the
/// `state = 'running'` guard rationale — important here because
/// `attempts - 1` would otherwise spuriously decrement the new lease's
/// attempt count.
pub async fn release(pool: &PgPool, lease_id: Uuid) -> sqlx::Result<()> {
sqlx::query(
let res = sqlx::query(
"UPDATE crawler_jobs \
SET state = 'pending', leased_until = NULL, \
attempts = GREATEST(0, attempts - 1), updated_at = now() \
WHERE id = $1",
WHERE id = $1 AND state = 'running'",
)
.bind(lease_id)
.execute(pool)
.await?;
if res.rows_affected() == 0 {
tracing::warn!(
%lease_id,
"release: lease no longer running — likely re-leased by another worker; skipping update"
);
}
Ok(())
}