feat(taskworker): Add Push Mode to Taskworker#576
feat(taskworker): Add Push Mode to Taskworker#576james-mcnulty wants to merge 7 commits intomainfrom
Conversation
| channel = grpc.intercept_channel(channel, RequestSignatureInterceptor(secrets)) | ||
| return ConsumerServiceStub(channel) | ||
|
|
||
| def _get_stub(self, host: str) -> ConsumerServiceStub: |
There was a problem hiding this comment.
Once we switch the taskbroker to be a deployment instead of a statefulset all of this logic will go away, the workers don't need to keep track of individual hosts.
There was a problem hiding this comment.
Will the workers use the callback_url on the task requests instead?
There was a problem hiding this comment.
I actually don't think we will even need that, we can just have one service that gets configured that all the workers talk too.
There was a problem hiding this comment.
One good thing about this is that you can still use push mode with SQLite if you'd like. If we drop callback_url completely, you will have no choice but to use a shared store. It's more flexibility for minimal cost, no?
In our case, callback_url will point to the single broker service that all workers will talk to. But if someone else wants to continue using SQLite, it would point to the specific taskbroker it came from.
There was a problem hiding this comment.
But if someone else wants to continue using SQLite, it would point to the specific taskbroker it came from.
If we're not going to be running sqlite long term, then we should remove it (after providing a way to transition to postgres/in memory)
There was a problem hiding this comment.
Yes that's true, we want to leave it as we do the migration. Once everything is migrated we can point self-hosted to the existing postgres DB.
| if timeout is None: | ||
| self._child_tasks.put(inflight) | ||
| else: | ||
| self._child_tasks.put(inflight, timeout=timeout) |
There was a problem hiding this comment.
I think you can just do this line, there's no need for a conditional.
self.child_tasks.put(inflight, timeout=None) is equivalent to self._child_tasks.put(inflight).
| channel = grpc.intercept_channel(channel, RequestSignatureInterceptor(secrets)) | ||
| return ConsumerServiceStub(channel) | ||
|
|
||
| def _get_stub(self, host: str) -> ConsumerServiceStub: |
There was a problem hiding this comment.
Will the workers use the callback_url on the task requests instead?
| with mock.patch("taskbroker_client.worker.worker.grpc.server") as mock_grpc_server: | ||
| mock_grpc_server.return_value = mock_server | ||
| with mock.patch("taskbroker_client.worker.worker.ThreadPoolExecutor") as mock_tpe: | ||
| with mock.patch( | ||
| "taskbroker_client.worker.worker.taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server" | ||
| ) as mock_add_servicer: | ||
| exitcode = taskworker.start() |
There was a problem hiding this comment.
This is a lot of mocks. How much of the actual implementation is being exercised here now? Should we consider having an integration test that tests the worker processing messages end to end?
There was a problem hiding this comment.
Yes definitely. Once the taskbroker and taskworker push changes are both done, I want to add integration tests for push mode.
| server.stop(grace=5) | ||
|
|
||
| self.shutdown() | ||
| else: |
There was a problem hiding this comment.
Bug: In push mode, self.shutdown() is not called if the gRPC server terminates without a KeyboardInterrupt, causing child processes and threads to leak.
Severity: MEDIUM
Suggested Fix
Wrap the server.wait_for_termination() call in a try...finally block. The self.shutdown() method should be moved into the finally block to guarantee its execution, ensuring that resources are always cleaned up regardless of how the server terminates.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: clients/python/src/taskbroker_client/worker/worker.py#L193-L196
Potential issue: In push mode, the gRPC server's `start` method has a potential resource
leak. If `server.wait_for_termination()` returns for any reason other than a
`KeyboardInterrupt` (e.g., if `server.stop()` is called from another thread), the
`self.shutdown()` method is never executed. This prevents proper cleanup, leaving child
worker processes and background threads running after the server has stopped. The
current implementation only calls `self.shutdown()` within the `except
KeyboardInterrupt` block, failing to account for other graceful shutdown scenarios.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
| with self._host_to_stubs_lock: | ||
| if host not in self._host_to_stubs: | ||
| self._host_to_stubs[host] = self._connect_to_host(host) | ||
| return self._host_to_stubs[host] |
There was a problem hiding this comment.
Inconsistent lock usage on _host_to_stubs dict
Low Severity
The new _host_to_stubs_lock protects _host_to_stubs in _get_stub, but _get_cur_stub reads and writes the same dict (lines 260–264) without acquiring the lock. In pull mode, _get_cur_stub runs on the main thread while _get_stub runs on the result thread, creating a race on the shared dict with inconsistent locking.
Additional Locations (1)
evanh
left a comment
There was a problem hiding this comment.
Still a couple bugs from the AI, but otherwise looks good!
| with self._host_to_stubs_lock: | ||
| if host not in self._host_to_stubs: | ||
| self._host_to_stubs[host] = self._connect_to_host(host) | ||
| return self._host_to_stubs[host] |


Linear
Completes STREAM-822
Description
Currently, taskworkers pull tasks from taskbrokers via RPC. This approach works, but has some drawbacks. Therefore, we want taskbrokers to push tasks to taskworkers instead. Read this page on Notion for more information.
This PR allows users to run the taskworker in push mode via the
--push-modeand--grpc-portCLI options.--push-modeboolFalse--grpc-portint50052Details
Dependencies
sentry-protosfrom 0.4.11 to 0.8.5 (to use the new worker service schema)Additions
WorkerServicerclass inworker.pypush_modeandgrpc_portfields toTaskWorkerandTaskbrokerClientclassespush_taskmethod toTaskWorkerclassgaugemethod to metrics backend abstract classModifications
_get_stubmethod toTaskbrokerClientto connect to brokers on the fly by using the activation callback URL