feat(taskbroker): Batch Status Updates#618

Open
george-sentry wants to merge 12 commits into main from george/push-taskbroker/batch-updates

Conversation


george-sentry (Member) commented Apr 30, 2026

Linear

Completes STREAM-918

Description

On the usual workload of 100 millisecond tasks, with the new "claimed" status, we can do around 5K tasks per second in the sandbox. Batching status updates reduces DB load, making all queries faster. This can increase throughput by 1K to 2K tasks per second.

@george-sentry george-sentry requested a review from a team as a code owner April 30, 2026 21:02
linear-code Bot commented Apr 30, 2026

Comment thread src/main.rs Outdated
Comment thread src/main.rs Outdated
Comment thread src/main.rs Outdated
Comment thread src/main.rs Outdated
Comment thread src/main.rs Outdated
@george-sentry george-sentry marked this pull request as draft April 30, 2026 23:24
george-sentry (Member Author)

Since we may want to treat claimed → processing updates the same way, I'm actually going to create a more general Flusher struct that can be used by both push threads and the gRPC server.

@george-sentry george-sentry marked this pull request as ready for review May 1, 2026 08:02
Comment thread src/grpc/server.rs
Comment thread src/store/adapters/sqlite.rs Outdated
Comment thread src/flusher.rs
/// Run flusher that receives values of type T from a channel and flushes
/// them using the provided async `flush` function either when the batch is
/// full or when the max flush interval has elapsed.
pub async fn run_flusher<T, F>(
Member Author

I created this function because I'm also planning to batch claimed → processing updates in the push pool, which will use basically identical machinery.

Comment thread src/grpc/server.rs
Comment thread src/grpc/status_flusher.rs Outdated
Comment thread src/store/adapters/sqlite.rs Outdated
Comment thread src/store/adapters/sqlite.rs Outdated
Comment thread src/store/adapters/postgres.rs Outdated
Comment thread src/store/adapters/postgres.rs Outdated
@george-sentry george-sentry changed the title feat(taskbroker): Batch Status Updates and Delete Completed Tasks Immediately feat(taskbroker): Batch Status Updates May 1, 2026
Comment thread src/grpc/status_flusher.rs Outdated
}
}

_ = interval.tick() => {
When does this trigger?

Member Author

This code now lives in flusher.rs, but it contains a similar loop. This condition triggers every interval_ms after the previous tick.

The flusher only handles the tick when select! actually chooses this branch. If messages keep arriving and the rx.recv() arm keeps winning before the tick is ready, the tick still advances in the background. When the tick is ready and this arm is selected, the buffer is flushed.

Comment thread src/grpc/status_flusher.rs Outdated
Comment on lines +124 to +126
for id in ids {
buffer.push((id, status));
}

Let's say there is a DB issue, would we keep appending to the buffer indefinitely? I think we should add a limit after which we stop and retry on the DB.

Member Author

This was actually dead code, but similar logic now lives elsewhere.

No, we only append to the buffer while it hasn't reached the desired batch size. So if there's a DB issue, here's what should happen.

  1. Timer runs out or buffer fills up → call flush (this function)
  2. As long as flush is running, the (now empty) buffer does not receive any more IDs
  3. Flush fails because store is unresponsive or some other problem
  4. IDs are pushed back onto the buffer (which was emptied right before attempting the flush)
  5. Flush function exits

So if the DB has a problem, we will keep retrying the same batch of IDs over and over again until it succeeds.
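The steps above can be sketched in isolation. The function name flush_batch and the string error type are hypothetical stand-ins for the PR's actual flush function and store error; only the drain-then-restore shape is what the comment describes.

```rust
/// Sketch of the flush step described above: the buffer is drained before
/// the store call (so nothing new lands in the batch while it runs), and on
/// failure the items are pushed back so the next flush retries the same batch.
fn flush_batch<T>(
    buffer: &mut Vec<T>,
    store_flush: impl Fn(&[T]) -> Result<(), String>,
) -> Result<(), String> {
    // 1-2. Empty the buffer; the flush operates on this private batch.
    let batch: Vec<T> = std::mem::take(buffer);
    match store_flush(&batch) {
        Ok(()) => Ok(()),
        Err(e) => {
            // 3-4. Store was unresponsive: restore the batch for a retry.
            buffer.extend(batch);
            Err(e)
        }
    }
}
```

Because the buffer is only refilled on failure, a healthy store sees each ID exactly once, while an unhealthy one causes the same batch to be retried until it succeeds.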

Comment thread src/grpc/server.rs Outdated

pub type StatusUpdate = (String, InflightActivationStatus);

pub async fn flush_status_updates(
Member

I don't think this function should be in the server file. This feels like maybe it should be part of the store itself.

Member Author

Sure. In its own file within the store module, or within an existing file?

Comment thread src/grpc/server_tests.rs
Comment thread src/flusher.rs Outdated
Some(v) => {
buffer.push(v);

while let Ok(update) = rx.try_recv() {
Member

Why do this again here? Won't this update get processed on the next loop?

Member Author

Yes, but it would be slower to wait for the next loop iteration, since the task would need to be reawakened. The idea is that while we're already awake, we might as well empty the whole channel rather than going back to sleep just to be woken a moment later for the next item.

Member Author

However, I think there was a bug here that could result in this loop spinning for a very long time. I fixed it by adding a while buffer.len() < batch_size condition.

Comment thread src/main.rs Outdated
let handle = tokio::spawn(async move {
flusher::run_flusher(
rx,
flusher_config.status_flush_batch_size,
Member

I'm not sure this is right. Won't this create another listener on the channel? Does that mean we will get duplicates?

When we are shutting down, presumably we won't get a full batch. That means we wait at least status_flush_interval_ms before the application can shut down safely. I would be inclined to make this value much lower during shutdown, to clear the batch as quickly as possible.

Member Author

No, rx here is the only listener on this channel as it's an MPSC channel. And in this case, there is also only one writer - the gRPC server.

Comment thread src/main.rs
Comment thread src/main.rs Outdated
Comment thread src/grpc/server.rs
Comment thread src/fetch/tests.rs Outdated
Comment thread src/grpc/server.rs Outdated
george-sentry and others added 2 commits May 11, 2026 09:15
Co-authored-by: Markus Unterwaditzer <markus-github@unterwaditzer.net>
Comment thread src/grpc/server.rs
cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 25dce9e.

Comment thread src/main.rs Outdated

5 participants