Skip to content

feat(query retries): handle transient or retryable DB errors#634

Open
victoria-yining-huang wants to merge 14 commits into
mainfrom
vic/add_query_retry
Open

feat(query retries): handle transient or retryable DB errors#634
victoria-yining-huang wants to merge 14 commits into
mainfrom
vic/add_query_retry

Conversation

@victoria-yining-huang
Copy link
Copy Markdown

@victoria-yining-huang victoria-yining-huang commented May 13, 2026

ticket https://linear.app/getsentry/issue/STREAM-940/handle-server-shutting-down-errors-and-retry

  • Add retry_query utility function that retries individual database queries on transient infrastructure errors (Io, PoolTimedOut)
  • Errors like "Unable to update status of activation: server conn crashed?" and "Unable to write to sqlite: server shutting down" are now handled gracefully — the query is retried instead of immediately surfacing the error
  • Permanent errors (constraint violations, schema errors) are not retried
  • Retry is applied at the query execution level inside both SQLite and Postgres adapters (~30 query sites each), not at the store method level, so transactions and side effects are handled correctly
  • Two new config options: db_query_max_retries (default None / disabled) and db_query_retry_delay_ms (default 100ms)
  • Maintenance operations (vacuum) and best-effort operations (WAL checkpoint, max lag metric) are intentionally not retried — they already have their own retry loops or error handling

Performance note

The store method in both adapters clones the Vec (containing serialized activation payloads) on every call, even when the first attempt succeeds. This is unavoidable because push_values consumes the vector, and retry requires the data again on failure. Other query sites only capture references and have zero overhead.

The methods NOT wrapped with retry_query are:

  • vacuum_db / full_vacuum_db — maintenance, retried by upkeep/maintenance loops
  • handle_processing_deadline — non-idempotent counter increment, retried by upkeep loop
  • pending_activation_max_lag — best-effort metric, returns f64 not Result
  • assign_partitions — not a DB query
  • remove_db — test-only

@victoria-yining-huang victoria-yining-huang requested a review from a team as a code owner May 13, 2026 02:54
Comment thread src/store/retry.rs Outdated
Comment thread src/store/retry.rs Outdated
Comment thread src/store/retry.rs Outdated
Comment thread src/store/retry.rs Outdated
Comment thread src/config.rs Outdated
Copy link
Copy Markdown
Member

@george-sentry george-sentry left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it would be preferable to avoid having two nearly identical macro definitions. Otherwise I like this solution!

@victoria-yining-huang
Copy link
Copy Markdown
Author

@george-sentry I removed the duplicated function, only cloning at arg level now

Comment thread src/store/retry.rs Outdated
Comment thread src/store/retry.rs Outdated
Comment thread src/store/retry.rs Outdated
}

#[async_trait]
impl InflightActivationStore for RetryStore {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the right abstraction level where to put the retry (wrapping the entire store with the retry logic).

The ActivationStore does more than running queries to the DB. So we cannot just decide to retry the entire method if the method raises a retriable DB error. What if you wanted to retry the DB query but not retry the entire method ?

  • Let's say your method had side effects beyond the DB query you do not want to retry ?
  • What if the method ran two queries and you wanted to retry only one ?

The design argument more in general:

  • the store has more responsibilities than just running DB queries.
  • We want to avoid mixing concerns between abstractions levels to keep the design logically organized and the cognitive overhead low.
  • When running the query, there are different categories of errors that can happen. We generally want to retry those that are infrastructure related (disconnection, network, etc.)
  • The infrastructure aspects are hidden as lower as possible (like in the connection pool https://github.com/getsentry/taskbroker/blob/main/src/store/adapters/postgres.rs#L106-L114) so that the layers above do not have to be polluted with infrastructure details.
  • As we are retrying only infrastructure related issues, the natural place where to put this logic should be as close to the infra as possible so the application only cares of application level issues.

I would recommend adding an abstraction around the connection pool to execute the queries and perform retry when needed rather than retrying the entire method.
I am not too familiar with sqlx, I know it does not manage retries for you:

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i followed symbolicator's retry wrapper that you linked and added that at each appropriate call site.

Comment thread src/store/adapters/sqlite.rs Outdated
Comment on lines +913 to +923
atomic.commit().await?;

if let Ok(query_res) = result {
processing_deadline_modified_rows += query_res.rows_affected();
return Ok(processing_deadline_modified_rows);
}
if let Ok(query_res) = result {
processing_deadline_modified_rows += query_res.rows_affected();
return Ok(processing_deadline_modified_rows);
}

Err(anyhow!("Could not update tasks past processing_deadline"))
Err(anyhow!("Could not update tasks past processing_deadline"))
},
)
.await
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Wrapping transactions with non-idempotent updates like incrementing processing_attempts in retry_query can cause double execution on certain commit errors, leading to incorrect attempt counts.
Severity: MEDIUM

Suggested Fix

Remove the retry_query wrapper from functions that perform non-idempotent operations within a transaction, such as handle_processing_deadline and handle_failed_tasks. Alternatively, refactor the update logic to be idempotent, for example by using a unique client-generated key for each attempt to prevent duplicate processing.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: src/store/adapters/sqlite.rs#L879-L923

Potential issue: The functions `handle_processing_deadline` and `handle_failed_tasks`
are wrapped in a `retry_query` utility. These functions execute a database transaction
that includes a non-idempotent `UPDATE` operation (`processing_attempts =
processing_attempts + 1`). If retries are enabled and a transient error, such as an
`sqlx::Error::Io`, occurs after the transaction successfully commits on the database but
before the client receives confirmation, the entire transaction will be retried. This
causes the `processing_attempts` to be incremented a second time for the same task,
potentially leading to tasks being marked as failed prematurely when they exceed their
`max_processing_attempts`.

Also affects:

  • src/store/adapters/postgres.rs:841~895

",
);
let query = query_builder
.push_values(rows.clone(), |mut b, row: TableRow| {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary clone of large batch data on every store call

Medium Severity

The rows.clone() inside the retry_query closure executes on every store() call — even with the default config of 0 retries — because the Fn closure cannot move captured data. The PR description says this is "no-op" when retries are disabled, but the clone of TableRow vectors (containing activation: Vec<u8> payloads up to 16MB total per batch) adds a non-trivial allocation on the hot write path that didn't exist before.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 73459f3. Configure here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cost is acceptable since retries will be enabled in production.

Copy link
Copy Markdown

@fpacifici fpacifici May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can say this. This will happen at every write on sqlte or postgres. So it may not be acceptable. Also we will not be able to turn this off by configuration by removing the retries.
Please reach out to @george-sentry and run a load test on this.

Comment thread src/store/retry.rs Outdated
Comment on lines +916 to 926
if let Ok(query_res) = processing_attempts_result {
return Ok(query_res.rows_affected());
}

Err(anyhow!("Could not update tasks past processing_deadline"))
Err(anyhow!("Could not update tasks past processing_deadline"))
},
)
.await
}

/// Perform upkeep work for tasks that are past expires_at deadlines
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: When retries are enabled, a network error after a successful DB commit can cause the handle_processing_deadline transaction to be retried, double-incrementing processing_attempts.
Severity: MEDIUM

Suggested Fix

To ensure idempotency, the operation within the retryable transaction should be made idempotent. Instead of incrementing processing_attempts, consider using a method that is safe to re-run, or restructure the logic to avoid retrying the entire transaction after a potentially successful commit. Alternatively, ensure that operations within retry_query are inherently idempotent.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: src/store/adapters/postgres.rs#L832-L926

Potential issue: The `handle_processing_deadline` function is wrapped in a retry
mechanism. If query retries are enabled via `db_query_max_retries`, a specific failure
scenario can cause data corruption. If a transaction commit succeeds on the database but
a network error prevents the client from receiving confirmation, the operation will be
retried. This re-executes the non-idempotent `UPDATE` statement, `processing_attempts =
processing_attempts + 1`, causing the counter to be incremented twice for the same
event. This can lead to tasks being marked as failed prematurely after reaching their
processing limit incorrectly.

// Sync the WAL into the main database so we don't lose data on host failure.
// Best-effort — errors are swallowed and only recorded as metrics.
let checkpoint_timer = Instant::now();
let mut conn = self.acquire_write_conn_metric("store_checkpoint").await?;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Best-effort checkpoint can fail the successful store operation

Medium Severity

The acquire_write_conn_metric("store_checkpoint").await? uses ? to propagate connection acquisition errors, but this runs after the INSERT has already succeeded inside retry_query. If the write pool is temporarily unavailable (e.g., PoolTimedOut), the store method returns an error even though data was persisted. The comment on line 518 explicitly says "Best-effort — errors are swallowed and only recorded as metrics," but ? contradicts that by propagating. The old code reused the same connection acquired before the INSERT, so this failure mode didn't exist.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 3bcda75. Configure here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this existed before my change, i just copy pasted the same code over, not within the scope of this PR to fix

Comment thread src/store/adapters/postgres.rs
Comment thread src/store/adapters/sqlite.rs
Comment thread src/store/adapters/postgres.rs
@victoria-yining-huang victoria-yining-huang changed the title feat(RetryStore): handle transient or retryable DB errors feat(query retries): handle transient or retryable DB errors May 17, 2026
Comment on lines +383 to +414
let query = query_builder
.push_values(rows.clone(), |mut b, row: TableRow| {
b.push_bind(row.id);
b.push_bind(row.activation);
b.push_bind(row.partition);
b.push_bind(row.offset);
b.push_bind(row.added_at);
b.push_bind(row.received_at);
b.push_bind(row.processing_attempts);
b.push_bind(row.expires_at);
b.push_bind(row.delay_until);
b.push_bind(row.processing_deadline_duration);
if let Some(deadline) = row.processing_deadline {
b.push_bind(deadline);
} else {
// Add a literal null
b.push("null");
}
if let Some(exp) = row.claim_expires_at {
b.push_bind(exp);
} else {
b.push("null");
}
b.push_bind(row.status);
b.push_bind(row.at_most_once);
b.push_bind(row.application);
b.push_bind(row.namespace);
b.push_bind(row.taskname);
b.push_bind(row.on_attempts_exceeded as i32);
b.push_bind(row.bucket);
})
.push(" ON CONFLICT(id) DO NOTHING")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me we are doing something very similar to what we did before.
The retry_query call is wrapping mostly the whole method rather than the query execution.
Is this a requirement ? (like you cannot reuse the QueryBuilder) If not, is it possible to run only the execute method in the retry_query` function ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct, query_builder and query both cannot be reused, that's why they have to be constructed inside retry_query.

Comment thread src/store/adapters/postgres.rs
Comment on lines +685 to +690
let Some(row) = result else {
return Ok(None);
};

Ok(Some(row.into()))
Ok(Some(row.into()))
})
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens here if we

  • attempt the query, the DB applies the change but a connection error prevents us from getting the result
  • retry
    Do we get the row back if no change is applied the second time? does the client depend on that ?

Comment thread src/store/adapters/postgres.rs Outdated
.await?;

Ok(())
retry_query(&self.config.retry_config, "clear", || async {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You do not need to retry the operation to wipe the DB

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

Comment thread src/store/retry.rs Outdated
err.downcast_ref::<sqlx::Error>(),
Some(sqlx::Error::Io(_))
| Some(sqlx::Error::PoolTimedOut)
| Some(sqlx::Error::PoolClosed)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this error recover by retrying ? Don't we have to re-establish the connection pool ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right, only Io and PoolTimedOut are retryable, removed the rest

Ok(rows.into_iter().map(|row| row.into()).collect())
Ok(rows.into_iter().map(|row| row.into()).collect())
})
.await
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claim retry hides stranded tasks

Medium Severity

With retry_query enabled, a retryable error after claim_activations commits can leave rows in Claimed/Processing while the retry selects only Pending rows and may return Ok with an empty vector. Callers that treat an empty success as “nothing to deliver” can strand work until claim expiry.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 7a434ee. Configure here.

Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 3 potential issues.

There are 6 total unresolved issues (including 3 from previous reviews).

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit b5bdb25. Configure here.


Ok(result.rows_affected())
})
.await
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry skews store counts

Medium Severity

When retry_query retries after a successful INSERT ... ON CONFLICT DO NOTHING, the second attempt reports rows_affected of zero even though rows were persisted on the first attempt, so callers can mis-count ingested activations.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit b5bdb25. Configure here.

let result = query.execute(&mut *conn).await?;
Ok(result.rows_affected())
})
.await?;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry skews store counts

Medium Severity

Same as Postgres store: after a successful insert, a retryable failure and retry make ON CONFLICT DO NOTHING return zero rows_affected while data from the first attempt remains in the database.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit b5bdb25. Configure here.

Ok(rows.into_iter().map(|row| row.into()).collect())
Ok(rows.into_iter().map(|row| row.into()).collect())
})
.await
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry loses claimed activations

High Severity

Same as Postgres claim_activations: a committed claim plus a retryable read error can leave tasks claimed without returning them, while a retry claims other work.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit b5bdb25. Configure here.

// Sync the WAL into the main database so we don't lose data on host failure.
// Best-effort — errors are swallowed and only recorded as metrics.
let checkpoint_timer = Instant::now();
let mut conn = self.acquire_write_conn_metric("store_checkpoint").await?;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The store function incorrectly returns an error if acquiring a connection for the best-effort WAL checkpoint fails, even after the data was successfully written.
Severity: HIGH

Suggested Fix

Handle the potential error from self.acquire_write_conn_metric instead of propagating it with ?. Match on the Result and if it's an Err, log the error and the associated metric, but allow the store function to proceed and return Ok. This will align the implementation with the documented "best-effort" behavior for the checkpoint.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: src/store/adapters/sqlite.rs#L520

Potential issue: In the `store` function, the WAL checkpoint operation is intended to be
"best-effort," meaning errors should be logged but not cause the function to fail.
However, the connection acquisition for this checkpoint at
`self.acquire_write_conn_metric("store_checkpoint").await?` uses the `?` operator. If
this connection acquisition fails, the error is propagated, causing `store` to return an
`Err` variant. This happens even after the data has been successfully committed, leading
callers to believe the write operation failed when it actually succeeded.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants