feat(taskbroker): Batch Status Updates #618
Conversation
Since we may want to treat claimed → processing updates the same way, I'm actually going to create a more general …
```rust
/// Run flusher that receives values of type T from a channel and flushes
/// them using the provided async `flush` function either when the batch is
/// full or when the max flush interval has elapsed.
pub async fn run_flusher<T, F>(
```
I created this function because I'm also planning to batch claimed → processing updates in the push pool, which will use basically identical machinery.
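For illustration, here is a minimal, self-contained sketch of what such a generic flusher could look like. The PR's actual signature is `run_flusher<T, F>` and its error handling is not shown in this excerpt, so the extra `Fut` parameter, the callback shape, and every name other than `run_flusher` are assumptions:

```rust
use std::future::Future;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::time::interval;

/// Sketch of a generic flusher: buffer items of any type `T` and hand them to
/// the async `flush` callback when the batch is full or the interval elapses.
pub async fn run_flusher<T, F, Fut>(
    mut rx: Receiver<T>,
    batch_size: usize,
    flush_interval: Duration,
    flush: F,
) where
    F: Fn(Vec<T>) -> Fut,
    Fut: Future<Output = ()>,
{
    let mut buffer: Vec<T> = Vec::with_capacity(batch_size);
    let mut ticker = interval(flush_interval);

    loop {
        tokio::select! {
            maybe_item = rx.recv() => match maybe_item {
                Some(item) => {
                    buffer.push(item);
                    if buffer.len() >= batch_size {
                        // Full batch: drain the buffer and flush it.
                        flush(std::mem::take(&mut buffer)).await;
                    }
                }
                None => {
                    // All senders dropped: flush the remainder and stop.
                    if !buffer.is_empty() {
                        flush(std::mem::take(&mut buffer)).await;
                    }
                    return;
                }
            },
            // Max flush interval elapsed: flush whatever has accumulated.
            _ = ticker.tick() => {
                if !buffer.is_empty() {
                    flush(std::mem::take(&mut buffer)).await;
                }
            }
        }
    }
}
```

With this shape, batching the claimed → processing updates in the push pool would only need a different item type `T` and a different `flush` callback.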
```rust
_ = interval.tick() => {
```
This code now lives in `flusher.rs`, but it contains a similar loop. This condition triggers every `interval_ms` after the previous tick.
The flusher only handles the tick when `select!` actually chooses this branch. If messages keep arriving and the `rx.recv()` arm keeps winning before the tick is ready, the tick still advances in the background. When the tick is ready and this arm is selected, the buffer is flushed.
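As a standalone illustration of that point (a toy program, not code from the PR), the other branch below wins most `select!` iterations, yet the interval keeps advancing and the tick branch still runs about once per period when it is selected:

```rust
use std::time::Duration;
use tokio::time::{interval, sleep};

#[tokio::main]
async fn main() {
    let mut ticker = interval(Duration::from_millis(100));
    ticker.tick().await; // the first tick completes immediately
    let mut other_wins = 0u32;
    loop {
        tokio::select! {
            // Stands in for the rx.recv() arm winning over and over.
            _ = sleep(Duration::from_millis(10)) => {
                other_wins += 1;
            }
            // Still fires about every 100 ms, even though the other arm
            // was selected many times in between.
            _ = ticker.tick() => {
                println!("tick; other branch has won {other_wins} times so far");
                if other_wins >= 30 {
                    break;
                }
            }
        }
    }
}
```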
```rust
for id in ids {
    buffer.push((id, status));
}
```
Let's say there is a DB issue, would we keep appending to the buffer indefinitely? I think we should add a limit after which we stop and retry on the DB.
This was actually dead code, but similar logic now lives elsewhere.
No, we only append to the buffer while it hasn't reached the desired batch size. So if there's a DB issue, here's what should happen.
- Timer runs out or buffer fills up → call `flush` (this function)
- As long as `flush` is running, the (now empty) buffer does not receive any more IDs
- Flush fails because the store is unresponsive or some other problem
- IDs are pushed back onto the buffer (which was emptied right before attempting the flush)
- Flush function exits

So if the DB has a problem, we will keep retrying the same batch of IDs over and over again until it succeeds (see the sketch below).
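A small sketch of that failure path (the `StatusUpdate` alias is simplified and `write_batch_to_store` is a hypothetical stand-in for the real store call):

```rust
// Simplified stand-ins for the real types; illustrative only.
type Status = &'static str;
type StatusUpdate = (String, Status);

// The buffer is emptied right before the store call; on failure the same
// batch is pushed back, so the next tick or full-buffer trigger retries it.
async fn flush(buffer: &mut Vec<StatusUpdate>) {
    let batch = std::mem::take(buffer); // buffer is empty while flush runs
    match write_batch_to_store(&batch).await {
        Ok(()) => {} // persisted; nothing to put back
        Err(err) => {
            // Store unresponsive or some other problem: push the IDs back so
            // the same batch keeps being retried until it succeeds.
            eprintln!("flush failed, will retry: {err}");
            buffer.extend(batch);
        }
    }
}

// Hypothetical placeholder for the real store write.
async fn write_batch_to_store(_batch: &[StatusUpdate]) -> Result<(), String> {
    Err("store unavailable".to_string())
}
```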
```rust
pub type StatusUpdate = (String, InflightActivationStatus);

pub async fn flush_status_updates(
```
I don't think this function should be in the server file. This feels like maybe it should be part of the store itself.
Sure. In its own file within the store module, or within an existing file?
```rust
Some(v) => {
    buffer.push(v);

    while let Ok(update) = rx.try_recv() {
```
Why do this again here? Won't this update get processed on the next loop?
Yes, but in theory, it'll be slower if we wait for the next loop iteration, since it'll need to be reawakened. The idea is, while we're already awake, we might as well empty the whole channel rather than going back to sleep just to be awakened a moment later for the next item in the channel.
However, I think there was a bug here that could result in this loop spinning for a very long time. I fixed it by adding a `while buffer.len() < batch_size` condition.
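Roughly, the fixed drain step could look like this (the helper name and exact structure are illustrative):

```rust
use tokio::sync::mpsc::Receiver;

// After recv() wakes the task, keep pulling already-queued items with
// try_recv(), but stop once the buffer reaches batch_size so a fast producer
// cannot keep this loop spinning indefinitely.
fn drain_pending<T>(rx: &mut Receiver<T>, buffer: &mut Vec<T>, batch_size: usize) {
    while buffer.len() < batch_size {
        match rx.try_recv() {
            Ok(update) => buffer.push(update),
            Err(_) => break, // channel empty (or closed): go back to select!
        }
    }
}
```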
```rust
let handle = tokio::spawn(async move {
    flusher::run_flusher(
        rx,
        flusher_config.status_flush_batch_size,
```
I'm not sure this is right. Won't this create another listener on the channel? Does that mean we will get duplicates?
When we are shutting down, presumably we won't get a full batch. That means we are waiting at least `status_flush_interval_ms` before the application can shut down safely. I would be inclined to have this value be much lower when we are shutting down, to try and clear the batch as quickly as possible.
No, `rx` here is the only listener on this channel, as it's an MPSC (multi-producer, single-consumer) channel. And in this case there is also only one writer: the gRPC server.
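For illustration, a self-contained version of that wiring (all names are hypothetical): the spawned flusher task takes ownership of the single `rx`, and the `tx` side is what the gRPC handler would hold, so no update can be delivered twice.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<(String, &'static str)>(1024);

    // The flusher task owns the sole receiver; run_flusher would buffer and
    // batch these updates, printing stands in for that here.
    let flusher = tokio::spawn(async move {
        while let Some((id, status)) = rx.recv().await {
            println!("received update: {id} -> {status}");
        }
    });

    // The gRPC server would hold (clones of) `tx`; it is the only writer.
    tx.send(("task-1".to_string(), "complete")).await.unwrap();
    drop(tx); // dropping all senders lets the receiver loop finish

    flusher.await.unwrap();
}
```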

Linear
Completes STREAM-918
Description
On the usual workload of 100-millisecond tasks, with the new "claimed" status, we can do around 5K tasks per second in the sandbox. Batching status updates reduces DB load, so all queries take less time; this can increase throughput by 1K to 2K tasks per second.