feat(worker): Add worker-initiated requests support #138
Conversation
Force-pushed 5157da1 to 2ae43a1
Force-pushed 3ba18eb to f7e2e67
```ts
  });
  next = yield* requestSubscription.next();
}
return yield* outcome.operation;
```
I think forEach should not resolve with the worker return value.
For example, a user might assume that the return type of `worker.forEach(function*() { return 'some' })` is a string, while the worker actually returns a value of a different type.
I think this might be a misunderstanding of the type parameters - let me clarify:

- `WResponse` is the per-request response that gets sent back to the worker. When the worker calls `send('hello')`, your handler returns `WResponse`, and the worker receives it.
- `TReturn` (from `useWorker<TSend, TRecv, TReturn, TData>`) is the worker's final return value - what the worker's main function returns after all its work is done.
These are intentionally different:
```ts
// Worker-side
await workerMain(function* ({ send }) {
  const r1 = yield* send('req1'); // Gets back WResponse
  const r2 = yield* send('req2'); // Gets back WResponse
  return 'all done'; // This is TReturn
});

// Host-side
const worker = yield* useWorker<TSend, TRecv, TReturn>(...);
const result = yield* worker.forEach<WRequest, WResponse>(function* (req) {
  return 'response'; // This is WResponse, sent to worker
});
// result is TReturn ('all done'), not WResponse
```

`forEach` serves two purposes:

- Handle each request from the worker (returning `WResponse`)
- Block until the worker completes (returning `TReturn`)
The alternative would be separate methods like worker.handleRequests() + worker.result(), but combining them in forEach felt natural since you typically want to wait for the worker to finish anyway.
Does this clarify the design? Happy to discuss if you think separate methods would be clearer!
But yield* worker gives the return value. Why not keep just that?
The two ways of getting return values are meant for different use cases.
`forEach` is meant to be used when you expect the worker to send you messages. In that case, it seems logical that `forEach` gives you each message and its return value gives you the close value (this is how the `forEach` helper works in stream-helpers).

`yield* worker` is used when you just want the return value.

They both effectively give you the same thing, but they suit different workers.
joshamaju
left a comment
I've dropped some of my observations.
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough

Adds bidirectional, MessagePort channel-based worker-host communication: worker-initiated requests with optional progress streaming and backpressure, ACKs and timeouts, serialized errors/results crossing the boundary, new types and APIs (send/send.stream, forEach), many test assets and comprehensive tests, package/tsconfig updates, and removal of a legacy helper.
Sequence Diagrams

```mermaid
sequenceDiagram
    participant Host
    participant WorkerThread
    participant MessagePort as MessagePort
    Host->>WorkerThread: spawn via useWorker()
    WorkerThread->>Host: post { type: "open" }
    rect rgba(100, 150, 200, 0.5)
    Note over Host,WorkerThread: Worker-initiated request (with response port)
    WorkerThread->>MessagePort: send WorkerToHost { type: "request", value, response: port }
    Host->>MessagePort: receive on port1
    Host->>Host: forEach handler invoked
    Host->>MessagePort: send progress / final response (ChannelMessage)
    WorkerThread->>MessagePort: receive progress/response
    WorkerThread->>MessagePort: send ack / progress_ack
    end
```

```mermaid
sequenceDiagram
    participant Worker as WorkerThread
    participant Port as MessagePort
    participant Host
    rect rgba(150, 100, 200, 0.5)
    Note over Worker,Host: Progress streaming (Worker → Host) with backpressure
    Worker->>Port: send progress message
    Host->>Port: receive progress
    Host->>Host: ctx.progress(...) processes progress
    Host->>Port: send progress_ack
    Worker->>Port: await progress_ack
    end
    rect rgba(200, 150, 100, 0.5)
    Note over Worker,Host: Final response and ACK handshake
    Worker->>Port: send final SerializedResult
    Host->>Port: receive final result
    Host->>Port: send ack
    Worker->>Port: receive ack and complete
    end
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ 5 passed
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@worker/channel.ts`:
- Around line 164-211: The current Symbol.iterator implementation (inside
waitForResponse) relies on once(channel.port1, "close") which doesn't exist in
browsers; update the code so that if the MessagePort does not support a "close"
event you either (A) require/validate options.timeout is provided and throw a
clear error when omitted, or (B) implement an explicit liveness protocol: send
periodic "ping"/"keepalive" messages from the caller and expect
"pong"/"progress" or dedicated keepalive ACKs from the responder and treat
missed pings as a crash—modify waitForResponse and the surrounding iterator to
feature-detect support for port close (e.g., check for onclose/event support or
that once(channel.port1,"close") would never resolve) and branch to the
timeout-or-heartbeat strategy accordingly (update related logic in timebox,
options.timeout handling, and the channel.port1 ACK/send paths).
In `@worker/worker.test.ts`:
- Around line 249-251: The test uses time-based sleeps inside the worker.forEach
handler (the yield* sleep(...) call) which makes ordering flaky; replace those
sleeps with a deterministic synchronization primitive (e.g., withResolvers, an
explicit latch/promise pair, or a handshake from the worker asset) so you can
block/unblock the handler deterministically during the test; update the affected
test functions that call worker.forEach<number, number> (and any other handlers
around the mentioned locations) to await or resolve the latch at the precise
points where you currently sleep, ensuring the test controls request timing
without timeouts.
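The latch this prompt describes can be sketched as follows (a minimal stand-in for `withResolvers`; all names here are illustrative, not the test suite's actual helpers):

```typescript
// A minimal latch, equivalent in spirit to Promise.withResolvers(), for
// deterministic test synchronization in place of sleep().
function latch<T = void>() {
  let resolve!: (value: T) => void;
  const promise = new Promise<T>((r) => {
    resolve = r;
  });
  return { promise, resolve };
}

// The handler blocks on the latch instead of sleeping, and the test releases
// it at the exact point where ordering must be controlled.
async function demo(): Promise<string[]> {
  const events: string[] = [];
  const gate = latch();
  const handler = (async () => {
    events.push("handler:start");
    await gate.promise; // deterministic block (was: yield* sleep(100))
    events.push("handler:end");
  })();
  events.push("test:release");
  gate.resolve();
  await handler;
  return events;
}

demo().then((events) => console.log(events.join(" → ")));
// handler:start → test:release → handler:end
```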
🧹 Nitpick comments (2)
worker/types.ts (1)

84-90: Consider preserving `error.cause` in serialization.

The `serializeError` function serializes `name`, `message`, and `stack`, but does not preserve `error.cause`. If the original error has a cause chain, that context is lost. This may be intentional (since `cause` could contain non-serializable data), but worth noting.

Optional: recursively serialize `cause` if present:

```diff
 export function serializeError(error: Error): SerializedError {
+  const cause = error.cause instanceof Error
+    ? serializeError(error.cause)
+    : undefined;
   return {
     name: error.name,
     message: error.message,
     stack: error.stack,
+    cause,
   };
 }
```

This would require adding `cause?: SerializedError` to the `SerializedError` interface.

worker/worker.ts (1)

16-21: Separate type imports from value imports.

Per coding guidelines, prefer `type` imports for type-only imports.

♻️ Suggested refactor:

```diff
 import { useChannelRequest, useChannelResponse } from "./channel.ts";
-import {
-  type ForEachContext,
-  type SerializedError,
-  errorFromSerialized,
-} from "./types.ts";
+import type { ForEachContext, SerializedError } from "./types.ts";
+import { errorFromSerialized } from "./types.ts";
```
…rEach test Replace sleep(100) with withResolvers latch for deterministic test synchronization per No-Sleep Test Sync policy. Addresses CodeRabbit feedback on PR #138.
Addressed CodeRabbit Feedback:
1. Browser MessagePort
🧹 Nitpick comments (1)
worker/worker.test.ts (1)
249-252: Consider deterministic ordering instead of timing-based delays.

The `sleep(request * 10)` simulates different processing times to test concurrent request handling. While this isn't strictly test synchronization, timing-based ordering can be flaky under heavy system load. Consider using latches or signals to control completion order deterministically if flakiness is observed.
Force-pushed c66624d to 8383206
Actionable comments posted: 4
🤖 Fix all issues with AI agents
Before applying any fix, first verify the finding against the current code and
decide whether a code change is actually needed. If the finding is not valid or
no change is required, do not modify code for that item and briefly explain why
it was skipped.
In `@worker/channel.test.ts`:
- Around line 8-11: The test helper sleepThenTimeout and the race-based,
time-sleep waits should be replaced with deterministic synchronization using the
existing helpers: remove sleepThenTimeout and stop relying on long sleep(ms) in
race patterns; instead use withResolvers to create controllable resolvers, wait
for the channel's close event (e.g., subscribe to the channel close or use the
channel.close promise) and use when/is with a short timeout wrapper to assert
ordering, using sleep(0) only if you need to yield the event loop; update
occurrences that call sleepThenTimeout or perform time-based races (including
the race patterns around close detection) to resolve via withResolvers +
close-event + when(timeout) so tests are deterministic.
In `@worker/channel.ts`:
- Around line 326-378: Extract the duplicated ACK-waiting logic from the
generator methods *resolve and *reject into a small shared helper (e.g.,
waitForAck or awaitAck) that accepts the MessagePort (port) and returns after
validating an incoming ChannelAck or returns early on port "close"; use the same
race([ once(port,"message"), once(port,"close") ]) logic, cast the event to
Event/MessageEvent, check for close and for ack?.type === "ack" and throw the
same error if validation fails, and keep port cleanup in the existing finally
blocks; then replace the duplicated blocks in *resolve and *reject with a single
call to this helper (referencing ChannelMessage<TResponse,TProgress>, ChannelAck
and serializeError from the current scope).
In `@worker/types.ts`:
- Around line 84-90: serializeError currently omits nested causes; update the
SerializedError type to include an optional "cause?: SerializedError | string |
undefined" and modify the serializeError function to capture error.cause — if
cause is an Error, recursively call serializeError(cause); if cause is a string
or other primitive, store it directly (or convert to string); if absent, leave
undefined. Ensure you reference the serializeError function and the
SerializedError type so the change is propagated where serialized errors are
created and consumed.
In `@worker/worker-main.ts`:
- Around line 106-116: The package version for the worker package was not bumped
for this new feature; update the package manifest by changing the "version"
field in package.json for the worker package to 0.5.0 (semantic minor bump for a
new feature), ensure any lockfile or workspace manifest (e.g.,
package-lock.json, pnpm-lock.yaml, or root package.json workspaces) is
updated/committed to reflect the new version, and add a short changelog entry
noting the feature if one exists; refer to the workerMain symbol in
worker-main.ts to locate the package that needs the version bump.
Extract duplicate ACK-waiting code from resolve(), reject(), and progress() methods into a shared waitForAck() helper function. This reduces code duplication and makes the ACK handling more maintainable. Addresses CodeRabbit nitpick feedback on PR #138.
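The shared helper this commit describes might look roughly like the following plain-Promise sketch (the actual implementation uses Effection's `race`/`once` operations; the names and shapes here are illustrative):

```typescript
// Wait for a ChannelAck on a MessagePort, returning early if the port closes.
type ChannelAck = { type: "ack" };

function waitForAck(port: MessagePort): Promise<void> {
  return new Promise((resolve, reject) => {
    port.onmessage = (event: MessageEvent) => {
      const ack = event.data as ChannelAck | undefined;
      if (ack?.type === "ack") resolve();
      else reject(new Error(`expected ack, received: ${JSON.stringify(event.data)}`));
    };
    // Some runtimes signal peer shutdown with a "close" notification; treat it
    // as "no ACK is coming" and return early rather than hanging.
    (port as MessagePort & { onclose?: (() => void) | null }).onclose = () => resolve();
  });
}

// Both resolve() and reject() paths can now share the same wait:
async function respondAndAwaitAck(port: MessagePort, result: unknown): Promise<void> {
  port.postMessage(result);
  await waitForAck(port); // guarantee delivery before cleanup
  port.close();
}
```

The design point is that the validation (`ack?.type === "ack"`) and the close race live in one place, so `*resolve` and `*reject` cannot drift apart.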
Add recursive serialization of error.cause in serializeError() so nested error chains are preserved when errors cross the worker boundary. This provides better debugging context when errors have underlying causes. Addresses CodeRabbit nitpick feedback on PR #138.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
Verify each finding against the current code and only fix it if needed.
In `@worker/channel.test.ts`:
- Around line 723-731: The timing assertions for progressDurations are flaky on
slow CI; update the test that checks progressDurations[0] and
progressDurations[1] (which compares against processingTime) to avoid tight
millisecond bounds—either increase the tolerances (e.g., use larger thresholds
than 20ms and processingTime - 10) or replace the timing checks with
ordering/signal-based checks (resolve a Promise or set flags when the first
progress completes and assert the second progress happens after that signal) so
the test verifies backpressure/order without relying on brittle exact timing.
In `@worker/channel.ts`:
- Around line 260-269: The timeout applied in the progress subscription is
per-call (each invocation of waitForNext via next()), not cumulative for the
whole exchange; update the code around the timebox(timeout, waitForNext) block
(symbols: timebox, waitForNext, timeout, next()) to add a concise comment
stating this per-call semantics and, if you want cumulative behavior, implement
tracking of elapsed time (record start time, subtract elapsed from timeout for
subsequent timebox calls) or mention that alternative in the comment so callers
understand how to change it.
Add 14 new test cases and 9 test asset workers for the upcoming worker-initiated requests feature. This allows workers to send requests to the host, enabling bidirectional communication. Tests are written first (TDD) and will fail until implementation is complete. The PLAN-worker-requests.md contains the detailed implementation plan with all design decisions documented.
Implements the worker→host request/response pattern allowing workers to send requests to the host and receive responses. This enables use cases like tool sessions where workers need to call back to the host. Changes: - Add send() function to workerMain options for worker→host requests - Add forEach() method to WorkerResource for host to handle requests - Implement error serialization with cause chain for cross-boundary errors - Use race() pattern so forEach exits cleanly when worker closes - Add concurrent forEach guard with try/finally cleanup - Fix bidirectional worker test to spawn messages.forEach in background All 14 new tests pass along with existing tests.
Verifies that workers can call send() to the host while handling a message from the host inside messages.forEach. This tests the nested bidirectional communication pattern.
Clarifies that these type parameters are for worker-initiated requests, distinguishing them from the host-initiated TSend/TRecv types. Also replaces race() with spawn() for the request processor loop to improve compatibility with effection 3.0.0.
Ensures compatibility with effection 3.0.0 on macOS by explicitly halting the request processor task when the outcome resolves, rather than relying on implicit scope cleanup.
Replace signal/each pattern with a callback-based approach that avoids the race condition in effection v3. Instead of spawning a processor task that blocks on a signal iterator, we now: 1. Queue requests that arrive before forEach is called 2. Set a handler callback when forEach is active 3. Dispatch requests via scope.run() from the message handler 4. Clear the handler when forEach exits This eliminates the need to halt a blocking iterator, which had inconsistent behavior between effection v3 and v4.
Add two complementary channel primitives for request-response communication: - useChannelResponse: Requester side - creates MessageChannel, returns port to transfer and operation to await response (with automatic ACK) - useChannelRequest: Responder side - wraps received port, provides resolve/reject operations that wait for ACK before closing Features: - ACK mechanism guarantees response delivery before port cleanup - Simple validation throws on unexpected messages - Proper resource cleanup with finally blocks - Full test coverage (9 tests)
Plan for integrating useChannelResponse/useChannelRequest into worker implementation. Includes prerequisite fix for cancellation handling: - Race ACK wait against port close event - Prevents responder from hanging when requester is cancelled
- Add SerializedResult<T> type to fix error typing mismatch - Clarify that ACK is sent for both Ok and Err responses - Update all code examples to use SerializedOk/SerializedErr - Note that resolve() is used for both success/error (reject is for Operation errors)
Explicitly document that useChannelResponse and useChannelRequest handle port.start() and port.close() internally, so removing these calls from worker code is intentional, not a regression.
Add tests to plan for: - ACK sent/received on error path (not just success) - Port closes if responder exits without calling resolve/reject - Port closes if responder throws before responding
…internally
- resolve(value) wraps in { ok: true, value } internally
- reject(error) serializes and wraps in { ok: false, error } internally
- Callers use natural resolve/reject semantics
- Channel primitive handles SerializedResult wrapping
- Remove SerializedOk/SerializedErr helper functions from exports
- ChannelResponse.operation returns SerializedResult<T>
- Use withResolvers for synchronization instead of sleep - Add tests for requester cancellation while waiting - Add tests for requester scope exit without calling operation - All tests use explicit signals for coordination
- Add SerializedResult<T> type for type-safe cross-boundary error handling - Update useChannelResponse with close detection and optional timeout - Update useChannelRequest to wrap responses in SerializedResult internally - Replace manual MessageChannel usage in worker.ts and worker-main.ts - Add @effectionx/timebox dependency for timeout support - Add comprehensive tests for close detection, timeout, and cancellation - Add .opencode/plans/ to .gitignore
- Remove redundant port.close() from resolve/reject (finally handles it) - Document reject() as application-level error, not transport error - Document close event runtime behavior across Node.js/browser/Deno - Refactor fragile cancellation test to use raw postMessage for precise signaling
Make ChannelResponse<T> extend Operation<SerializedResult<T>> so callers can yield the response object directly instead of using a separate .operation property.

Before:

```ts
const { port, operation } = yield* useChannelResponse<T>();
const result = yield* operation;
```

After:

```ts
const response = yield* useChannelResponse<T>();
const result = yield* response;
```
Extend channel primitives to support bidirectional progress streaming: - useChannelResponse: add 'progress' property returning Subscription - useChannelRequest: add 'progress()' method with backpressure (ACK) - forEach: pass ForEachContext with progress() to handler - send.stream<TProgress>(): receive progress updates from host Wire protocol extended with progress/progress_ack message types. All tests comply with no-sleep-test-sync policy using withResolvers() for deterministic synchronization. Tests: 53 pass (14 new progress streaming tests)
- Add progress streaming section to README with examples - Document true backpressure: host blocks until worker calls next() - Add test proving backpressure behavior (host waits for worker readiness)
Address reviewer feedback: errors in forEach handler should crash the host, not be silently swallowed. The error is still forwarded to the worker so it knows the request failed, but then re-thrown so the host doesn't continue running after a handler failure. This change also serializes request handling (one at a time) which makes the error semantics cleaner - if a handler fails, we stop immediately.
This release adds worker-initiated requests with progress streaming, bidirectional communication, and channel-based messaging primitives.
Use @effectionx/timebox for timeout handling instead of custom sleepThenTimeout helper. This provides cleaner semantics with the Timeboxed<T> discriminated union type. Addresses CodeRabbit feedback on sleep-based test synchronization.
Increase tolerance bounds in backpressure timing test to avoid flakiness on slow CI systems: - First progress: 20ms -> 50ms - Second progress: processingTime - 10 -> processingTime - 20 Addresses CodeRabbit nitpick on timing-based assertions.
…tion Add comment explaining that timeout is applied per next() call, not cumulatively for the entire exchange. This helps callers understand the behavior when dealing with many progress updates. Addresses CodeRabbit nitpick on timeout semantics documentation.
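The per-call vs. cumulative distinction can be made concrete with a small deadline helper (illustrative only, not the package's API):

```typescript
// Per-call: every next() waits up to `timeout` again, so N progress updates
// can take up to N * timeout in total.
// Cumulative: record the start time once and give each wait only the time
// remaining, bounding the whole exchange by `totalMs`.
function makeDeadline(totalMs: number): () => number {
  const start = Date.now();
  return () => Math.max(0, totalMs - (Date.now() - start));
}

const remaining = makeDeadline(5_000);
// Pass remaining() instead of the fixed timeout to each timebox/wait call:
console.log(remaining() <= 5_000); // true
```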
Force-pushed add0c15 to 6bc2592
Motivation
The current `@effectionx/worker` package only supports host-to-worker communication: the host can send messages to workers and receive responses. However, some use cases require the reverse direction - workers initiating requests to the host.

A concrete example is the sweatpants project's web worker transport for MCP tool sessions. In this architecture:
This is the opposite of the current pattern where the host is always the initiator.
What's New
Progress Streaming Support
This PR now includes bidirectional progress streaming - the host can send progress updates back to the worker during request processing:
Worker side (receive progress):
Host side (send progress):
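As an illustration of this exchange, here is a plain-Promise analogue of the progress loop (all names are illustrative; the package itself uses Effection operations and its own wire types):

```typescript
// Resolve with the next message that arrives on a port.
function nextMessage(port: MessagePort): Promise<MessageEvent> {
  return new Promise((resolve) => {
    port.onmessage = (event: MessageEvent) => resolve(event);
  });
}

// Host side: emit a progress value, then block until the worker ACKs it.
async function sendProgress(port: MessagePort, values: number[]): Promise<void> {
  for (const value of values) {
    port.postMessage({ type: "progress", value });
    const ack = await nextMessage(port); // blocked until progress_ack arrives
    if (ack.data?.type !== "progress_ack") throw new Error("expected progress_ack");
  }
  port.postMessage({ type: "done" });
}

// Worker side: process each value, ACKing only once ready for the next one.
async function receiveProgress(port: MessagePort): Promise<number[]> {
  const received: number[] = [];
  for (;;) {
    const event = await nextMessage(port);
    if (event.data.type === "done") return received;
    received.push(event.data.value);
    port.postMessage({ type: "progress_ack" }); // backpressure release
  }
}
```

Because the host awaits the ACK before posting the next value, the receiver's pace controls the stream - the handshake described in the next section.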
Backpressure Semantics
The `progress()` method implements true backpressure:

- `ctx.progress()` blocks until the worker calls `subscription.next()`
- Without further `next()` calls, the host remains blocked

This ensures the worker is never overwhelmed with progress updates. The ACK is sent inside `subscription.next()`, so the host waits for the worker to be ready for the next value.

Key Features
Approach
This PR adds symmetric bidirectional communication by introducing:
New Worker-Side API: `send()` function

Workers receive a `send` function in their `workerMain` options:

New Host-Side API: `worker.forEach()` method

Hosts can handle worker requests using `forEach`:
forEach:Channel Primitives
Introduces reusable channel primitives for request-response over MessageChannel:
- `useChannelResponse<TResponse, TProgress>(options?)` - Requester side
  - `yield* response` - waits for response, ignores progress
  - `yield* response.progress` - returns Subscription that yields progress then response
  - Optional timeout via `@effectionx/timebox`
  - `SerializedResult<T>` for type-safe error handling
- `useChannelRequest<TResponse, TProgress>(port)` - Responder side
  - `resolve(value)`, `reject(error)`, and `progress(data)` operations
  - `progress()` sends update and waits for ACK (true backpressure)
  - Wraps responses internally (in `SerializedResult`)

Wire Protocol
Messages sent over channel (Host → Worker):
Acknowledgements (Worker → Host):
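Based on the description in this PR, the wire-protocol shapes can be sketched as follows (field names inferred from the text above, not copied from the source):

```typescript
type SerializedError = {
  name: string;
  message: string;
  stack?: string;
  cause?: SerializedError;
};

type SerializedResult<T> =
  | { ok: true; value: T }
  | { ok: false; error: SerializedError };

// Host → Worker over the transferred MessagePort
type ChannelMessage<TResponse, TProgress> =
  | { type: "progress"; value: TProgress }
  | { type: "response"; result: SerializedResult<TResponse> };

// Worker → Host acknowledgements
type ChannelAck = { type: "ack" } | { type: "progress_ack" };

// The `type` discriminant lets receivers narrow messages without casts:
function isFinal<T, P>(
  message: ChannelMessage<T, P>,
): message is { type: "response"; result: SerializedResult<T> } {
  return message.type === "response";
}
```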
Key Design Decisions
- Responses cross the boundary as `{ ok: true, value }` or `{ ok: false, error: SerializedError }`
- Errors are serialized as `{ name, message, stack }` and wrapped with `cause` on the receiving side
- Optional timeout in `useChannelResponse` using `@effectionx/timebox`
- Requests that arrive before `forEach()` is called are queued (unbounded)
- One `forEach()` call allowed at a time
- Host waits for `"open"` via `withResolvers`, and ensures outcome settles during teardown if the close message arrives late
- Progress ACK is sent inside `next()`, so host waits for worker readiness

Type Parameter Naming
- `TSend`/`TRecv` - Host-initiated communication (host sends, worker receives/responds)
- `WRequest`/`WResponse` - Worker-initiated communication (worker sends, host receives/responds)
- `WProgress` - Progress updates from host to worker

Tests
All 54 worker tests pass (4 skipped):
Channel primitive tests (32 tests):
- `yield* response` ignores progress

Worker-initiated request tests (15 tests):
Policy Compliance
Tests comply with the No-Sleep Test Synchronization Policy:
- `withResolvers()` for callback synchronization
- `sleep(0)` only for yielding control
- `sleep(ms)` only inside spawned tasks to trigger conditions

Related Issues
Summary by CodeRabbit
New Features
Documentation
Tests
Chores