feat(query retries): handle transient or retryable DB errors #634
victoria-yining-huang wants to merge 14 commits into
Conversation
george-sentry
left a comment
I agree that it would be preferable to avoid having two nearly identical macro definitions. Otherwise I like this solution!
@george-sentry I removed the duplicated function, only cloning at arg level now
| } | ||
| #[async_trait] | ||
| impl InflightActivationStore for RetryStore { |
I don't think wrapping the entire store with the retry logic puts the retry at the right abstraction level.
The ActivationStore does more than run queries against the DB, so we cannot simply retry an entire method whenever it raises a retryable DB error. What if you want to retry the DB query but not the whole method?
- What if the method has side effects beyond the DB query that you do not want to repeat?
- What if the method runs two queries and you want to retry only one?
The more general design argument:
- The store has more responsibilities than just running DB queries.
- We want to avoid mixing concerns between abstraction levels, to keep the design logically organized and the cognitive overhead low.
- When running a query, different categories of errors can occur. We generally want to retry only those that are infrastructure-related (disconnection, network, etc.).
- Infrastructure concerns are hidden as low as possible (as in the connection pool https://github.com/getsentry/taskbroker/blob/main/src/store/adapters/postgres.rs#L106-L114) so that the layers above are not polluted with infrastructure details.
- Since we are retrying only infrastructure-related issues, the natural place for this logic is as close to the infrastructure as possible, so the application only has to care about application-level issues.
I would recommend adding an abstraction around the connection pool that executes queries and retries them when needed, rather than retrying the entire method.
I am not too familiar with sqlx, I know it does not manage retries for you:
- You can write a wrapper around the connection pool that performs retry
- sqlx produces futures, I think you can build a simple wrapper to retry them https://github.com/getsentry/symbolicator/blob/f8883320dcc5e8eb6db852620c1d143cacfdf9b9/crates/symbolicator-service/src/download/mod.rs#L387-L409
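A minimal sketch of the wrapper idea suggested above, assuming a synchronous closure so the example stays dependency-free; real call sites would await sqlx futures instead. `RetryConfig`, `retry_op`, and the linear backoff are hypothetical names and choices, not taskbroker's or symbolicator's actual code.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical retry settings; field names are illustrative only.
pub struct RetryConfig {
    pub max_retries: u32,
    pub base_delay: Duration,
}

/// Runs `op` and re-runs it while `is_retryable` accepts the error,
/// allowing at most `max_retries` extra attempts with linear backoff.
pub fn retry_op<T, E>(
    config: &RetryConfig,
    is_retryable: impl Fn(&E) -> bool,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Retry only infrastructure-style errors, and only while budget remains.
            Err(err) if attempt < config.max_retries && is_retryable(&err) => {
                attempt += 1;
                sleep(config.base_delay * attempt);
            }
            // Non-retryable error, or retries exhausted: surface it unchanged.
            Err(err) => return Err(err),
        }
    }
}
```

Because the closure wraps only what is passed to it, a method with side effects can hand over just the query execution and keep everything else outside the retry.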
I followed the symbolicator retry wrapper that you linked and added it at each appropriate call site.
| atomic.commit().await?; | ||
| if let Ok(query_res) = result { | ||
| processing_deadline_modified_rows += query_res.rows_affected(); | ||
| return Ok(processing_deadline_modified_rows); | ||
| } | ||
| Err(anyhow!("Could not update tasks past processing_deadline")) | ||
| }, | ||
| ) | ||
| .await |
Bug: Wrapping transactions with non-idempotent updates like incrementing processing_attempts in retry_query can cause double execution on certain commit errors, leading to incorrect attempt counts.
Severity: MEDIUM
Suggested Fix
Remove the retry_query wrapper from functions that perform non-idempotent operations within a transaction, such as handle_processing_deadline and handle_failed_tasks. Alternatively, refactor the update logic to be idempotent, for example by using a unique client-generated key for each attempt to prevent duplicate processing.
Location: src/store/adapters/sqlite.rs#L879-L923
Potential issue: The functions `handle_processing_deadline` and `handle_failed_tasks`
are wrapped in a `retry_query` utility. These functions execute a database transaction
that includes a non-idempotent `UPDATE` operation (`processing_attempts =
processing_attempts + 1`). If retries are enabled and a transient error, such as an
`sqlx::Error::Io`, occurs after the transaction successfully commits on the database but
before the client receives confirmation, the entire transaction will be retried. This
causes the `processing_attempts` to be incremented a second time for the same task,
potentially leading to tasks being marked as failed prematurely when they exceed their
`max_processing_attempts`.
Also affects:
src/store/adapters/postgres.rs:841~895
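To make the failure mode concrete, here is a toy model of the "committed, but the acknowledgement was lost" case. `Task`, `bump_attempts`, `bump_attempts_if`, and `run_with_lost_ack` are hypothetical stand-ins, and the compare-and-set variant is only one possible idempotent formulation, not something proposed in the PR.

```rust
/// Toy stand-in for one task row; not the real schema.
#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub processing_attempts: u32,
}

/// Non-idempotent: every execution adds one, like
/// `processing_attempts = processing_attempts + 1`.
pub fn bump_attempts(task: &mut Task) {
    task.processing_attempts += 1;
}

/// Idempotent variant: the caller states which value it observed, so a
/// replay after a committed first attempt matches nothing and changes nothing.
/// Returns whether the row was modified.
pub fn bump_attempts_if(task: &mut Task, observed: u32) -> bool {
    if task.processing_attempts == observed {
        task.processing_attempts = observed + 1;
        true
    } else {
        false
    }
}

/// Simulates "commit succeeded, acknowledgement lost": the update runs,
/// the client sees an error, and the retry runs the update again.
pub fn run_with_lost_ack(task: &mut Task, mut update: impl FnMut(&mut Task)) {
    update(task); // first attempt: committed, but the client gets an I/O error
    update(task); // retry: executes the same statement a second time
}
```

Starting from one attempt, the plain increment ends at three after a single logical event, while the guarded variant ends at two. In SQL terms the guard would roughly correspond to adding the previously observed attempt count to the `WHERE` clause; whether that fits the batch updates in these methods is left open here.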
| ", | ||
| ); | ||
| let query = query_builder | ||
| .push_values(rows.clone(), |mut b, row: TableRow| { |
Unnecessary clone of large batch data on every store call
Medium Severity
The rows.clone() inside the retry_query closure executes on every store() call — even with the default config of 0 retries — because the Fn closure cannot move captured data. The PR description says this is "no-op" when retries are disabled, but the clone of TableRow vectors (containing activation: Vec<u8> payloads up to 16MB total per batch) adds a non-trivial allocation on the hot write path that didn't exist before.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 73459f3. Configure here.
This cost is acceptable since retries will be enabled in production.
I don't think we can say this. The clone will happen on every write, on SQLite or Postgres, so it may not be acceptable. We also will not be able to turn it off through configuration by disabling retries.
Please reach out to @george-sentry and run a load test on this.
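One way to bound the cost under discussion, sketched with hypothetical names (`run_with_batch` is not part of the PR): clone the batch only when a later attempt might still need it, and let the final attempt take ownership. For brevity this sketch retries on every error and returns the clone count so the cost is observable.

```rust
/// Runs `op` up to `max_retries + 1` times. The batch is cloned only when a
/// later attempt might still need it; the final attempt takes ownership.
/// Returns the result plus the number of clones performed.
pub fn run_with_batch<T: Clone, R, E>(
    rows: Vec<T>,
    max_retries: u32,
    mut op: impl FnMut(Vec<T>) -> Result<R, E>,
) -> (Result<R, E>, u32) {
    let mut clones = 0;
    let mut rows = Some(rows);
    for attempt in 0..=max_retries {
        let batch = if attempt == max_retries {
            // Last chance: nothing will need the data afterwards, so move it.
            rows.take().expect("batch is present until the final attempt")
        } else {
            clones += 1;
            rows.as_ref()
                .expect("batch is present until the final attempt")
                .clone()
        };
        match op(batch) {
            Ok(value) => return (Ok(value), clones),
            Err(err) if attempt == max_retries => return (Err(err), clones),
            Err(_) => {} // try again with a fresh copy
        }
    }
    unreachable!("the final attempt always returns")
}
```

With `max_retries` set to 0 no clone happens at all, which would make the "no-op when retries are disabled" claim hold. With retries enabled, each attempt other than the last still pays for one clone, so the requested load test remains relevant.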
| if let Ok(query_res) = processing_attempts_result { | ||
| return Ok(query_res.rows_affected()); | ||
| } | ||
| Err(anyhow!("Could not update tasks past processing_deadline")) | ||
| }, | ||
| ) | ||
| .await | ||
| } | ||
| /// Perform upkeep work for tasks that are past expires_at deadlines |
Bug: When retries are enabled, a network error after a successful DB commit can cause the handle_processing_deadline transaction to be retried, double-incrementing processing_attempts.
Severity: MEDIUM
Suggested Fix
To ensure idempotency, the operation within the retryable transaction should be made idempotent. Instead of incrementing processing_attempts, consider using a method that is safe to re-run, or restructure the logic to avoid retrying the entire transaction after a potentially successful commit. Alternatively, ensure that operations within retry_query are inherently idempotent.
Location: src/store/adapters/postgres.rs#L832-L926
Potential issue: The `handle_processing_deadline` function is wrapped in a retry
mechanism. If query retries are enabled via `db_query_max_retries`, a specific failure
scenario can cause data corruption. If a transaction commit succeeds on the database but
a network error prevents the client from receiving confirmation, the operation will be
retried. This re-executes the non-idempotent `UPDATE` statement, `processing_attempts =
processing_attempts + 1`, causing the counter to be incremented twice for the same
event. This can lead to tasks being marked as failed prematurely after reaching their
processing limit incorrectly.
| // Sync the WAL into the main database so we don't lose data on host failure. | ||
| // Best-effort — errors are swallowed and only recorded as metrics. | ||
| let checkpoint_timer = Instant::now(); | ||
| let mut conn = self.acquire_write_conn_metric("store_checkpoint").await?; |
Best-effort checkpoint can fail the successful store operation
Medium Severity
The acquire_write_conn_metric("store_checkpoint").await? uses ? to propagate connection acquisition errors, but this runs after the INSERT has already succeeded inside retry_query. If the write pool is temporarily unavailable (e.g., PoolTimedOut), the store method returns an error even though data was persisted. The comment on line 518 explicitly says "Best-effort — errors are swallowed and only recorded as metrics," but ? contradicts that by propagating. The old code reused the same connection acquired before the INSERT, so this failure mode didn't exist.
Reviewed by Cursor Bugbot for commit 3bcda75. Configure here.
This existed before my change; I just copied the same code over. Fixing it is outside the scope of this PR.
| let query = query_builder | ||
| .push_values(rows.clone(), |mut b, row: TableRow| { | ||
| b.push_bind(row.id); | ||
| b.push_bind(row.activation); | ||
| b.push_bind(row.partition); | ||
| b.push_bind(row.offset); | ||
| b.push_bind(row.added_at); | ||
| b.push_bind(row.received_at); | ||
| b.push_bind(row.processing_attempts); | ||
| b.push_bind(row.expires_at); | ||
| b.push_bind(row.delay_until); | ||
| b.push_bind(row.processing_deadline_duration); | ||
| if let Some(deadline) = row.processing_deadline { | ||
| b.push_bind(deadline); | ||
| } else { | ||
| // Add a literal null | ||
| b.push("null"); | ||
| } | ||
| if let Some(exp) = row.claim_expires_at { | ||
| b.push_bind(exp); | ||
| } else { | ||
| b.push("null"); | ||
| } | ||
| b.push_bind(row.status); | ||
| b.push_bind(row.at_most_once); | ||
| b.push_bind(row.application); | ||
| b.push_bind(row.namespace); | ||
| b.push_bind(row.taskname); | ||
| b.push_bind(row.on_attempts_exceeded as i32); | ||
| b.push_bind(row.bucket); | ||
| }) | ||
| .push(" ON CONFLICT(id) DO NOTHING") |
It seems to me we are doing something very similar to what we did before.
The retry_query call wraps almost the whole method rather than just the query execution.
Is this a requirement (for example, because the QueryBuilder cannot be reused)? If not, is it possible to run only the execute call inside the retry_query function?
Correct, neither query_builder nor query can be reused, which is why they have to be constructed inside retry_query.
| let Some(row) = result else { | ||
| return Ok(None); | ||
| }; | ||
| Ok(Some(row.into())) | ||
| }) |
What happens here if we
- attempt the query, the DB applies the change but a connection error prevents us from getting the result
- retry
Do we get the row back if no change is applied the second time? Does the client depend on that?
| .await?; | ||
| Ok(()) | ||
| retry_query(&self.config.retry_config, "clear", || async { |
You do not need to retry the operation to wipe the DB
| err.downcast_ref::<sqlx::Error>(), | ||
| Some(sqlx::Error::Io(_)) | ||
| | Some(sqlx::Error::PoolTimedOut) | ||
| | Some(sqlx::Error::PoolClosed) |
Can this error be recovered from by retrying? Don't we have to re-establish the connection pool?
you're right, only Io and PoolTimedOut are retryable, removed the rest
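A sketch of the classification this thread settles on, using a local stand-in enum so the example needs no dependencies. `DbError` is hypothetical: it mirrors a few `sqlx::Error` variant names, while the real enum has many more variants and the PR matches on it through `downcast_ref`.

```rust
/// Stand-in for the handful of `sqlx::Error` variants discussed here.
#[derive(Debug)]
pub enum DbError {
    Io(std::io::Error),
    PoolTimedOut,
    PoolClosed,
    RowNotFound,
}

/// Only errors that a later attempt can plausibly get past are retried.
/// A closed pool stays closed, so retrying on `PoolClosed` would not help.
pub fn is_retryable(err: &DbError) -> bool {
    matches!(err, DbError::Io(_) | DbError::PoolTimedOut)
}
```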
| Ok(rows.into_iter().map(|row| row.into()).collect()) | ||
| }) | ||
| .await |
Claim retry hides stranded tasks
Medium Severity
With retry_query enabled, a retryable error after claim_activations commits can leave rows in Claimed/Processing while the retry selects only Pending rows and may return Ok with an empty vector. Callers that treat an empty success as “nothing to deliver” can strand work until claim expiry.
Reviewed by Cursor Bugbot for commit 7a434ee. Configure here.
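The stranding scenario can be reproduced with a toy in-memory model. Everything here is hypothetical (`Status`, `claim`, `claim_with_lost_ack`); it assumes only that a claim flips pending rows to claimed and returns their ids.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Status {
    Pending,
    Claimed,
}

/// Claims up to `limit` pending tasks and returns their ids.
pub fn claim(tasks: &mut [(u32, Status)], limit: usize) -> Vec<u32> {
    let mut claimed = Vec::new();
    for (id, status) in tasks.iter_mut() {
        if claimed.len() == limit {
            break;
        }
        if *status == Status::Pending {
            *status = Status::Claimed;
            claimed.push(*id);
        }
    }
    claimed
}

/// Simulates a retry after the first claim committed but its result was lost.
/// Returns what the caller finally sees.
pub fn claim_with_lost_ack(tasks: &mut [(u32, Status)], limit: usize) -> Vec<u32> {
    let _lost = claim(tasks, limit); // committed; the response never arrives
    claim(tasks, limit) // retry only sees rows that are still pending
}
```

With two pending tasks and a limit of two, the caller receives an empty vector while both rows sit in the claimed state. With a third pending task, the caller receives only that one, and the first two are stranded until their claim expires.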
Cursor Bugbot has reviewed your changes and found 3 potential issues.
There are 6 total unresolved issues (including 3 from previous reviews).
Reviewed by Cursor Bugbot for commit b5bdb25. Configure here.
| Ok(result.rows_affected()) | ||
| }) | ||
| .await |
Retry skews store counts
Medium Severity
When retry_query retries after a successful INSERT ... ON CONFLICT DO NOTHING, the second attempt reports rows_affected of zero even though rows were persisted on the first attempt, so callers can mis-count ingested activations.
Reviewed by Cursor Bugbot for commit b5bdb25. Configure here.
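A toy model of the count skew, assuming nothing beyond the `ON CONFLICT(id) DO NOTHING` semantics quoted in the diff. The `HashMap` table and both function names are hypothetical.

```rust
use std::collections::HashMap;

/// Inserts rows whose id is not present yet and skips the rest, like
/// `INSERT ... ON CONFLICT(id) DO NOTHING`. The return value plays the
/// role of `rows_affected()`.
pub fn insert_do_nothing(
    table: &mut HashMap<String, Vec<u8>>,
    rows: &[(String, Vec<u8>)],
) -> u64 {
    let mut affected = 0;
    for (id, payload) in rows {
        if !table.contains_key(id) {
            table.insert(id.clone(), payload.clone());
            affected += 1;
        }
    }
    affected
}

/// Simulates a lost acknowledgement: the first insert commits, the client
/// sees an error and retries. Returns the count the caller would observe.
pub fn store_with_lost_ack(
    table: &mut HashMap<String, Vec<u8>>,
    rows: &[(String, Vec<u8>)],
) -> u64 {
    let _committed_but_unseen = insert_do_nothing(table, rows);
    insert_do_nothing(table, rows)
}
```

The data is safe in this case, since the conflict clause makes the insert itself idempotent; only the reported count is wrong, so the impact depends on how callers use it.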
| let result = query.execute(&mut *conn).await?; | ||
| Ok(result.rows_affected()) | ||
| }) | ||
| .await?; |
Retry skews store counts
Medium Severity
Same as Postgres store: after a successful insert, a retryable failure and retry make ON CONFLICT DO NOTHING return zero rows_affected while data from the first attempt remains in the database.
Reviewed by Cursor Bugbot for commit b5bdb25. Configure here.
| Ok(rows.into_iter().map(|row| row.into()).collect()) | ||
| }) | ||
| .await |
Retry loses claimed activations
High Severity
Same as Postgres claim_activations: a committed claim plus a retryable read error can leave tasks claimed without returning them, while a retry claims other work.
Reviewed by Cursor Bugbot for commit b5bdb25. Configure here.
| // Sync the WAL into the main database so we don't lose data on host failure. | ||
| // Best-effort — errors are swallowed and only recorded as metrics. | ||
| let checkpoint_timer = Instant::now(); | ||
| let mut conn = self.acquire_write_conn_metric("store_checkpoint").await?; |
Bug: The store function incorrectly returns an error if acquiring a connection for the best-effort WAL checkpoint fails, even after the data was successfully written.
Severity: HIGH
Suggested Fix
Handle the potential error from self.acquire_write_conn_metric instead of propagating it with ?. Match on the Result and if it's an Err, log the error and the associated metric, but allow the store function to proceed and return Ok. This will align the implementation with the documented "best-effort" behavior for the checkpoint.
Location: src/store/adapters/sqlite.rs#L520
Potential issue: In the `store` function, the WAL checkpoint operation is intended to be
"best-effort," meaning errors should be logged but not cause the function to fail.
However, the connection acquisition for this checkpoint at
`self.acquire_write_conn_metric("store_checkpoint").await?` uses the `?` operator. If
this connection acquisition fails, the error is propagated, causing `store` to return an
`Err` variant. This happens even after the data has been successfully committed, leading
callers to believe the write operation failed when it actually succeeded.
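The suggested fix amounts to handling the acquisition result instead of using `?`. A dependency-free sketch, with closures standing in for the INSERT and for `acquire_write_conn_metric`; the function shape and the `eprintln!` reporting are assumptions, since the real code records metrics:

```rust
/// `insert` stands in for the retried INSERT, `acquire_for_checkpoint`
/// for the connection acquisition that precedes the WAL checkpoint.
pub fn store<E: std::fmt::Debug>(
    insert: impl FnOnce() -> Result<u64, E>,
    acquire_for_checkpoint: impl FnOnce() -> Result<(), E>,
) -> Result<u64, E> {
    // A failed insert is a real failure and is propagated.
    let rows_affected = insert()?;

    // Best-effort: a failed acquisition is reported, never propagated,
    // because the data has already been persisted at this point.
    if let Err(err) = acquire_for_checkpoint() {
        eprintln!("store_checkpoint skipped: {err:?}");
    }

    Ok(rows_affected)
}
```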


ticket https://linear.app/getsentry/issue/STREAM-940/handle-server-shutting-down-errors-and-retry
Performance note
The store method in both adapters clones the Vec of TableRow values (containing serialized activation payloads) on every call, even when the first attempt succeeds. This is unavoidable because push_values consumes the vector, and a retry needs the data again on failure. Other query sites only capture references and have zero overhead.
The methods NOT wrapped with retry_query are: