feat(taskworker): Add Push Mode to Taskworker #576

Open
james-mcnulty wants to merge 7 commits into main from george/push-taskbroker/add-push-mode-to-taskworker
@james-mcnulty
Member

Linear

Completes STREAM-822

Description

Currently, taskworkers pull tasks from taskbrokers via RPC. This approach works, but has some drawbacks. Therefore, we want taskbrokers to push tasks to taskworkers instead. Read this page on Notion for more information.

This PR allows users to run the taskworker in push mode via the --push-mode and --grpc-port CLI options.

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| --push-mode | bool | False | Whether to run in push or pull mode. |
| --grpc-port | int | 50052 | The port to use for the taskworker gRPC server. |
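
As a rough illustration of the two options in the table, here is a minimal sketch using the standard library's argparse. The `build_parser` helper and the `taskworker` program name are assumptions made for this example; the PR's actual CLI wiring is not shown in this conversation and may use a different library.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical parser exposing only the two options from the table."""
    parser = argparse.ArgumentParser(prog="taskworker")  # assumed program name
    # store_true yields a bool flag that defaults to False, i.e. pull mode.
    parser.add_argument(
        "--push-mode",
        action="store_true",
        default=False,
        help="Run in push mode instead of pull mode.",
    )
    parser.add_argument(
        "--grpc-port",
        type=int,
        default=50052,
        help="Port for the taskworker gRPC server.",
    )
    return parser


# Parse an explicit argument list instead of sys.argv for demonstration.
args = build_parser().parse_args(["--push-mode", "--grpc-port", "6000"])
```

With no arguments the sketch falls back to pull mode on port 50052, matching the defaults listed above.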

Details

Dependencies

  • Upgrade sentry-protos from 0.4.11 to 0.8.5 (to use the new worker service schema)

Additions

  • Define WorkerServicer class in worker.py
  • Add push_mode and grpc_port fields to TaskWorker and TaskbrokerClient classes
  • Add push_task method to TaskWorker class
  • Add gauge method to metrics backend abstract class
  • Add unit tests for push mode

Modifications

  • Start gRPC server if push mode, enter pull loop if pull mode
  • Add _get_stub method to TaskbrokerClient to connect to brokers on the fly by using the activation callback URL

@james-mcnulty james-mcnulty requested a review from a team as a code owner March 19, 2026 00:20
linear-code bot commented Mar 19, 2026

        channel = grpc.intercept_channel(channel, RequestSignatureInterceptor(secrets))
        return ConsumerServiceStub(channel)

    def _get_stub(self, host: str) -> ConsumerServiceStub:

Member

Once we switch the taskbroker to be a deployment instead of a statefulset, all of this logic will go away. The workers won't need to keep track of individual hosts.

Member

Will the workers use the callback_url on the task requests instead?

Member

I actually don't think we will even need that; we can just configure one service that all the workers talk to.

Member Author

One good thing about this is that you can still use push mode with SQLite if you'd like. If we drop callback_url completely, you will have no choice but to use a shared store. It's more flexibility for minimal cost, no?

In our case, callback_url will point to the single broker service that all workers will talk to. But if someone else wants to continue using SQLite, it would point to the specific taskbroker it came from.

Member

> But if someone else wants to continue using SQLite, it would point to the specific taskbroker it came from.

If we're not going to be running sqlite long term, then we should remove it (after providing a way to transition to postgres/in memory)

Member

Yes that's true, we want to leave it as we do the migration. Once everything is migrated we can point self-hosted to the existing postgres DB.

        if timeout is None:
            self._child_tasks.put(inflight)
        else:
            self._child_tasks.put(inflight, timeout=timeout)

Member

I think you can just do this line, there's no need for a conditional.

self._child_tasks.put(inflight, timeout=None) is equivalent to self._child_tasks.put(inflight).
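
A minimal sketch of the point above, using the standard library's `queue.Queue` as a stand-in. This is an assumption: the type of `_child_tasks` is not shown in this thread, although `multiprocessing.Queue.put` documents the same `timeout=None` default. The `enqueue` helper is hypothetical.

```python
import queue
from typing import Optional


def enqueue(tasks: queue.Queue, item: str, timeout: Optional[float] = None) -> None:
    # timeout=None is already the default for Queue.put, so passing it
    # through blocks until a slot is free, just like put(item) alone.
    tasks.put(item, timeout=timeout)


tasks = queue.Queue(maxsize=1)
enqueue(tasks, "first")  # no timeout: would block only if the queue were full

try:
    # The queue is now full, so a finite timeout raises queue.Full.
    enqueue(tasks, "second", timeout=0.01)
except queue.Full:
    timed_out = True
else:
    timed_out = False
```

Because both branches collapse into one call, the conditional on `timeout` adds nothing.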

Comment on lines +474 to +480
        with mock.patch("taskbroker_client.worker.worker.grpc.server") as mock_grpc_server:
            mock_grpc_server.return_value = mock_server
            with mock.patch("taskbroker_client.worker.worker.ThreadPoolExecutor") as mock_tpe:
                with mock.patch(
                    "taskbroker_client.worker.worker.taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server"
                ) as mock_add_servicer:
                    exitcode = taskworker.start()

Member

This is a lot of mocks. How much of the actual implementation is being exercised here now? Should we consider having an integration test that tests the worker processing messages end to end?

Member Author

Yes definitely. Once the taskbroker and taskworker push changes are both done, I want to add integration tests for push mode.

Comment on lines +193 to +196
server.stop(grace=5)

self.shutdown()
else:

Bug: In push mode, self.shutdown() is not called if the gRPC server terminates without a KeyboardInterrupt, causing child processes and threads to leak.
Severity: MEDIUM

Suggested Fix

Wrap the server.wait_for_termination() call in a try...finally block. The self.shutdown() method should be moved into the finally block to guarantee its execution, ensuring that resources are always cleaned up regardless of how the server terminates.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: clients/python/src/taskbroker_client/worker/worker.py#L193-L196

Potential issue: In push mode, the gRPC server's `start` method has a potential resource
leak. If `server.wait_for_termination()` returns for any reason other than a
`KeyboardInterrupt` (e.g., if `server.stop()` is called from another thread), the
`self.shutdown()` method is never executed. This prevents proper cleanup, leaving child
worker processes and background threads running after the server has stopped. The
current implementation only calls `self.shutdown()` within the `except
KeyboardInterrupt` block, failing to account for other graceful shutdown scenarios.
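
The suggested try...finally shape can be sketched as follows. `FakeServer` and `Worker` are hypothetical stand-ins written for this example, not the PR's classes; `FakeServer` only mimics the `wait_for_termination()` and `stop(grace)` calls named in the comment.

```python
import threading


class FakeServer:
    """Stand-in for a gRPC server; mimics only the two calls used below."""

    def __init__(self) -> None:
        self._stopped = threading.Event()

    def wait_for_termination(self) -> None:
        self._stopped.wait()

    def stop(self, grace: float) -> None:
        self._stopped.set()


class Worker:
    """Hypothetical worker; shutdown() represents child-process cleanup."""

    def __init__(self, server: FakeServer) -> None:
        self._server = server
        self.shutdown_calls = 0

    def shutdown(self) -> None:
        self.shutdown_calls += 1

    def run_push_mode(self) -> None:
        try:
            self._server.wait_for_termination()
        except KeyboardInterrupt:
            self._server.stop(grace=5)
        finally:
            # Runs on every exit path, including stop() from another thread.
            self.shutdown()


server = FakeServer()
worker = Worker(server)
# Simulate another thread stopping the server without a KeyboardInterrupt.
threading.Timer(0.05, server.stop, kwargs={"grace": 0}).start()
worker.run_push_mode()
```

In this sketch `shutdown()` runs exactly once even though no `KeyboardInterrupt` was raised, which is the case the report says is currently missed.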

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

        with self._host_to_stubs_lock:
            if host not in self._host_to_stubs:
                self._host_to_stubs[host] = self._connect_to_host(host)
            return self._host_to_stubs[host]

Inconsistent lock usage on _host_to_stubs dict

Low Severity

The new _host_to_stubs_lock protects _host_to_stubs in _get_stub, but _get_cur_stub reads and writes the same dict (lines 260–264) without acquiring the lock. In pull mode, _get_cur_stub runs on the main thread while _get_stub runs on the result thread, creating a race on the shared dict with inconsistent locking.
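
One way to make the locking consistent is to route every access to the dict through a single locked accessor, which both `_get_stub` and `_get_cur_stub` could then call. The sketch below is an illustration under assumptions: `StubCache` and `fake_connect` are hypothetical names, and `connect` stands in for `_connect_to_host`.

```python
import threading
from typing import Callable, Dict, List


class StubCache:
    """Hypothetical cache; `connect` stands in for `_connect_to_host`."""

    def __init__(self, connect: Callable[[str], object]) -> None:
        self._connect = connect
        self._lock = threading.Lock()
        self._host_to_stubs: Dict[str, object] = {}

    def get(self, host: str) -> object:
        # Every read and write of the dict happens under the same lock.
        with self._lock:
            if host not in self._host_to_stubs:
                self._host_to_stubs[host] = self._connect(host)
            return self._host_to_stubs[host]


calls: List[str] = []


def fake_connect(host: str) -> object:
    # Only ever invoked while StubCache holds its lock, so append is safe.
    calls.append(host)
    return object()


cache = StubCache(fake_connect)
threads = [
    threading.Thread(target=cache.get, args=("broker-0:50051",)) for _ in range(8)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Holding the lock while connecting serializes connection setup across hosts. That is a simple trade-off that guarantees one stub per host; a per-host lock would be an alternative if connection latency matters.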


Member

This is a valid bug.

Member

@evanh evanh left a comment

Still a couple bugs from the AI, but otherwise looks good!
