feat(taskbroker): Add Push Mode to Taskbroker#573

Open
james-mcnulty wants to merge 17 commits into main from george/push-taskbroker/add-push-mode

Conversation

@james-mcnulty
Member

@james-mcnulty james-mcnulty commented Mar 17, 2026

Linear

Completes STREAM-820

Description

Currently, taskworkers pull tasks from taskbrokers via RPC. This approach works, but has some drawbacks. Therefore, we want taskbrokers to push tasks to taskworkers instead. Read this page on Notion for more information.

This PR allows users to run the taskbroker in push mode, which can be tuned using several new configuration parameters.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `TASKBROKER_PUSH_MODE` | `bool` | `false` | Enables push mode. |
| `TASKBROKER_FETCH_THREADS` | `usize` | `1` | Sets the number of fetch threads to run. |
| `TASKBROKER_PUSH_THREADS` | `usize` | `1` | Sets the number of push threads to run. |
| `TASKBROKER_PUSH_QUEUE_SIZE` | `usize` | `1` | Sets the capacity of the channel sitting in front of the push thread pool. |
| `TASKBROKER_WORKER_ENDPOINT` | `String` | `http://127.0.0.1:50052` | Sets the worker service endpoint. |
| `TASKBROKER_CALLBACK_ADDR` | `String` | `0.0.0.0` | Sets the host used in the callback URL. |
| `TASKBROKER_CALLBACK_PORT` | `usize` | `50051` | Sets the port used in the callback URL. |
| `TASKBROKER_FETCH_WAIT_MS` | `u64` | `100` | Milliseconds to wait between fetch attempts when no pending activation is found. |
| `TASKBROKER_PUSH_TIMEOUT_MS` | `u64` | `5000` | Maximum number of milliseconds to wait when submitting an activation to the push pool. |

Push Threads

On startup, the taskbroker now creates a "push pool," which is a pool of push threads. All of them wait to receive activations from the same MPMC channel provided by the flume crate. When a push thread receives an activation, it sends it to the worker service. Note that each push thread has its own connection to the worker service.

Push threads are grouped together by the PushPool data structure, which exposes a start method to actually spawn the threads and a submit method to receive activations.
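A minimal, std-only sketch of the `PushPool` shape described above. This is illustrative, not the real implementation: the actual pool uses flume's async MPMC channel and a gRPC client per thread, while here std's single-consumer `Receiver` is shared behind a `Mutex` to emulate multi-consumer receive, the activation is a bare `u32`, and the "push" just doubles it.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{self, JoinHandle};

pub struct PushPool {
    tx: mpsc::SyncSender<u32>,
    handles: Vec<JoinHandle<Vec<u32>>>,
}

impl PushPool {
    /// Spawn `threads` consumers draining a bounded channel of `capacity`.
    pub fn start(threads: usize, capacity: usize) -> Self {
        let (tx, rx) = mpsc::sync_channel(capacity);
        let rx = Arc::new(Mutex::new(rx));
        let handles: Vec<_> = (0..threads.max(1))
            .map(|_| {
                let rx = Arc::clone(&rx);
                thread::spawn(move || {
                    let mut pushed = Vec::new();
                    loop {
                        // Take one activation; the Mutex guard is dropped at
                        // the end of this statement, so other consumers can
                        // receive concurrently.
                        let next = rx.lock().unwrap().recv();
                        match next {
                            Ok(activation) => pushed.push(activation * 2), // stand-in for the gRPC push
                            Err(_) => break, // channel closed: pool is shutting down
                        }
                    }
                    pushed
                })
            })
            .collect();
        PushPool { tx, handles }
    }

    /// Blocks when the bounded channel is full, giving backpressure to the
    /// fetch side (the real submit has a configurable timeout instead).
    pub fn submit(&self, activation: u32) {
        self.tx.send(activation).expect("push pool has shut down");
    }

    /// Close the channel so consumers exit, then collect what they pushed.
    pub fn shutdown(self) -> Vec<u32> {
        drop(self.tx);
        let mut out: Vec<u32> = self
            .handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect();
        out.sort();
        out
    }
}
```

The bounded channel is what makes `TASKBROKER_PUSH_QUEUE_SIZE` act as backpressure: once it fills, `submit` blocks until some push thread takes an item.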

Fetch Threads

On startup, the taskbroker also creates a "fetch pool," which is a pool of fetch threads. Each one retrieves a pending activation from the store, passes it to the push pool (waiting until it accepts), and repeats.
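The fetch loop can be sketched like this, again with illustrative stand-ins: the store is mocked as a queue of `u32`s and the push pool as a closure, whereas the real loop is an async Tokio task that sleeps `TASKBROKER_FETCH_WAIT_MS` and retries when nothing is pending instead of returning.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// One fetch thread's loop: pop a pending activation, hand it to the push
/// pool (blocking until accepted), repeat. Returns how many were fetched.
fn fetch_loop(store: &Mutex<VecDeque<u32>>, mut submit: impl FnMut(u32)) -> usize {
    let mut fetched = 0;
    loop {
        // Pop one pending activation, releasing the store lock immediately.
        let next = store.lock().unwrap().pop_front();
        match next {
            Some(activation) => {
                submit(activation); // blocks until the push pool accepts it
                fetched += 1;
            }
            // No pending activation: the real loop waits and retries; this
            // sketch stops so it can run to completion.
            None => return fetched,
        }
    }
}
```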

Notes on Naming

Fetch threads and push threads are actually asynchronous tasks provided by the Tokio crate. They are not OS threads.

Details

Dependencies

  • Add flume 0.12.0 as a dependency (I didn't want to add any dependencies, but Tokio does not provide an asynchronous MPMC queue - only MPSC)
  • Upgrade sentry-protos from 0.4.11 to 0.8.5 (to use the new worker service schema)
  • Upgrade tonic, tonic-health, prost, and prost-types to 0.14 (to match the version used by sentry-protos)
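Taken together, the dependency changes above would look roughly like this in `Cargo.toml` (versions copied from the bullets; all other entries omitted):

```toml
[dependencies]
# New: async MPMC channel backing the push pool (Tokio only ships MPSC).
flume = "0.12.0"
# Upgraded for the new worker service schema.
sentry-protos = "0.8.5"
# Upgraded together to match the versions used by sentry-protos.
tonic = "0.14"
tonic-health = "0.14"
prost = "0.14"
prost-types = "0.14"
```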

Additions

  • Add FetchPool abstraction in src/fetch.rs
  • Add PushPool abstraction in src/push.rs
  • Use push pool and fetch pool abstractions in src/main.rs
  • Add configuration parameters for push mode

Modifications

  • Return "permission denied" error with explanatory message for get_task when operating in push mode

Future Changes

  • Add useful metrics
  • Fetch and send tasks in batches
  • Update tasks in batches
  • Combine upkeep row count queries into a single query
  • Delete completed tasks immediately

@james-mcnulty james-mcnulty marked this pull request as ready for review March 18, 2026 08:17
@james-mcnulty james-mcnulty requested a review from a team as a code owner March 18, 2026 08:17

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 3 potential issues.

Autofix Details

Bugbot Autofix prepared fixes for all 3 issues found in the latest run.

  • ✅ Fixed: GRPC server address ignores configurable grpc_addr field
    • Updated gRPC bind address construction to use config.grpc_addr together with config.grpc_port instead of hardcoding 0.0.0.0.
  • ✅ Fixed: Callback URL missing protocol scheme for worker callbacks
    • Changed push callback URL formatting to include the http:// scheme so workers receive a valid URI.
  • ✅ Fixed: push_threads=0 causes deadlock unlike guarded fetch_threads
    • Aligned push worker spawning with fetch behavior by using self.config.push_threads.max(1) to guarantee at least one consumer.

Preview (effe733488)
```diff
diff --git a/src/main.rs b/src/main.rs
--- a/src/main.rs
+++ b/src/main.rs
@@ -196,7 +196,7 @@
         let config = config.clone();

         async move {
-            let addr = format!("0.0.0.0:{}", config.grpc_port)
+            let addr = format!("{}:{}", config.grpc_addr, config.grpc_port)
                 .parse()
                 .expect("Failed to parse address");

diff --git a/src/push.rs b/src/push.rs
--- a/src/push.rs
+++ b/src/push.rs
@@ -57,11 +57,11 @@
     pub async fn start(&self) -> Result<()> {
         let mut handles = vec![];

-        for _ in 0..self.config.push_threads {
+        for _ in 0..self.config.push_threads.max(1) {
             let endpoint = self.config.worker_endpoint.clone();

             let callback_url = format!(
-                "{}:{}",
+                "http://{}:{}",
                 self.config.callback_addr, self.config.callback_port
             );
```


Member

@evanh evanh left a comment


There is now a potential deadlock scenario where the push channels get full and the fetch activations block trying to send. Is that tracked somewhere with a metric?

src/fetch.rs Outdated

debug!("Fetching next pending activation...");

match store.get_pending_activation(None, None).await {
Member


This will need to use the namespace and application parameters passed into it in order to work correctly.

Member Author


This is actually something I wanted to bring up. Right now, namespace and application come from the get_task request body. In push mode, there is no easy way to know what values to use here. Should they be provided in the configuration?

Member


If a broker is handling multiple applications (like in local development, and in smaller environments) we'll need different worker pools to push to. Perhaps we need a mapping between application -> worker pools?

Member


Should they be provided in the configuration?

Yes this is how it will have to work.

Member Author


Done! Config now takes an optional application and an optional list of namespaces.

src/fetch.rs Outdated

Err(e) => {
error!("Failed to fetch pending activation - {:?}", e);
sleep(Duration::from_millis(100)).await;
Member


There is no need for a sleep here.

Member Author


I had a sleep there because if this fails, it's either because the store is having issues (e.g. due to scaling AlloyDB up) or because the push queue is full. In both cases it makes sense to wait a little, no?

Member


Can we tell the difference between the two? I would handle those two scenarios differently. For the queue being full I would wait, but for an actual error we might want to take other actions (e.g. crash the entire producer).

Member Author


Yes, definitely. Any idea what should happen if it's the queue being full versus a store error? For now, I can just distinguish between the two and simply log which one it was without doing anything else until we decide for sure.

```bash
# Run unit/integration tests
make test
make unit-test
```
Member


Why did this change?

Member Author


It's `make unit-test`, not `make test` - the latter doesn't do anything right now.

Member Author


Or rather, there is no test target in the Makefile for doing both unit and integration tests, which seemed to be the intention here.

fetch_threads: 1,
push_threads: 1,
push_queue_size: 1,
worker_endpoint: "http://127.0.0.1:50052".into(),
Member


Is this the port that we'll be using for the worker in self-hosted and local dev? Ideally local development 'just works' and doesn't require additional configuration.

Member Author


Yepp!


@james-mcnulty james-mcnulty changed the title feat: Add Push Mode feat: Add Push Mode to Taskbroker Mar 18, 2026
@linear-code

linear-code bot commented Mar 18, 2026

@james-mcnulty james-mcnulty changed the title feat: Add Push Mode to Taskbroker feat(taskbroker): Add Push Mode to Taskbroker Mar 18, 2026

@fpacifici fpacifici left a comment


Will do a more in-depth review later.
Though please consider refactoring the config beforehand to separate the push attributes from the pull ones.
That would warrant its own PR.

src/config.rs Outdated
Comment on lines +243 to +244
/// Run the taskbroker in push mode (as opposed to pull mode).
pub push_mode: bool,


Please change the push_mode boolean into a delivery_mode.

Now I think this configuration will be very hard to set. We have more than ten push-specific config elements and more than ten pull-specific ones. We need some way to make it more intuitive.
One option would be to give it a structure, though I do not know whether we rely on these fields being simple fields. @evanh may know better.
Otherwise, a common pattern for these scenarios is to prefix the poll-specific parameters with poll_ and the push-specific ones with push_.


Please avoid a helper module. It risks becoming a god object without a clear responsibility. If we need a function to spawn pools, have a tokio module for this.

Member Author


Created a tokio module as suggested.


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.



Comment on lines +118 to +123
.get_pending_activations_from_namespaces(
config.application.as_deref(),
config.namespaces.as_deref(),
Some(1),
)
.await;


Bug: In push mode, fetch_activation bypasses validation, causing it to fetch tasks across all applications if only namespaces are configured without an application, breaking tenant isolation.
Severity: HIGH

Suggested Fix

Ensure that fetch_activation in src/fetch/mod.rs validates its configuration. Before calling store.get_pending_activations_from_namespaces, add a check to confirm that if config.namespaces is Some, then config.application must also be Some. If the validation fails, log a warning and return early, mirroring the logic in get_pending_activation.

Prompt for AI Agent

Location: src/fetch/mod.rs#L118-L123

Potential issue: In push mode, if `TASKBROKER_NAMESPACES` is configured without
`TASKBROKER_APPLICATION`, the `fetch_activation` function bypasses a critical validation
check. It calls `get_pending_activations_from_namespaces` directly, which proceeds to
fetch tasks from the specified namespaces across all applications. This breaks the
intended multi-tenant isolation model by allowing unintended cross-application task
leakage. The pull mode is not affected as it correctly uses a wrapper function
containing the necessary validation logic.

/// - `Ok(true)` if an activation was found
/// - `Ok(false)` if none pending
/// - `Err` if fetching failed.
pub async fn fetch_activation<T: TaskPusher>(
Member


Why is this a standalone function instead of part of TaskPusher? That would cut down on the number of clone calls (which are memcpy commands).


// Instead of returning when `fetch_activation` fails, we just try again
match fetch_activation(store.clone(), pusher.clone(), config.clone()).await {
Ok(false) | Err(_) => {
Member


We should separate these cases. I would move any error handling (logging etc.) up to this function. That way we also handle any unexpected errors.



5 participants