feat(data-connect): add support for streaming transport#9809
Conversation
…structure (#9603) * restructure existing transport code into new file structure * remove DataConnectResponse from public API * update copyright year
…on (#9609) * introduce transport manager and transport abstraction * update tests, run API docs and linter * fix invokeQuery * fix invokeMutation * fix unit tests, update invokeUnsubscribe function signature * run yarn format * update names of transport classes/interfaces, update imports * remove circular imports * small edit to re-run jobs
…ort Implementation (#9618) * introduce new abstract transport base class * rename RestTransport to restTransport * update rest to use DataConnectResponseWithMaxAge instead of DataConnectResponse
* first pass at request ID, headers/name field, and opening request logic * updating unit tests * finish unit tests * fix _prepareMessage tests * undo e2e changes, fix formatting * remove unneeded gql files * add onAuthTokenChanged back * use interface exposing internal members for testing * run yarn format
* first pass at request ID, headers/name field, and opening request logic * updating unit tests * finish unit tests * fix _prepareMessage tests * undo e2e changes, fix formatting * remove unneeded gql files * add onAuthTokenChanged back * use interface exposing internal members for testing * run yarn format * first pass at adding basic stream request management logic * first pass at adding basic stream request management logic * undo test changes * add unit tests for incoming requests from transport layer * fix unit tests after adding _connectorResourcePath to all transports * add unit tests for incoming responses from server * refactor unit tests * run yarn format * remove initStreamTransport * rename _handleMessage to _handleResponse * run yarn format * fix comment typo * update visibility of subscribe notification hooks tracking map
…eQuery/invokeMutation (#9723) * update validateArgs to handle new options arguments * add validateArgs unit tests * split functionality into separate functions, add changeset * add hasVars argument to help differentiate first argument * update changelog * export validateArgs * remove vestigial console log * refactor tests * run yarn format * make sendMessage synchronous again * fix error swallowing * preserve synchronicity of invokeQuery and invokeMutation, update test promise rejection checking * finish updating tests to verify non-settled promises * run yarn format * update test title to clearly define expected behavior * update changeset * add promise unsettled helper per reviewer suggestion, run yarn format
* add websocket implementation * add open unit tests for websocket * add more websocket unit tests * add more websocket unit tests * WIP adding proper error propagation * WIP adding proper error propagation * split error and websockets into two separate branches * update error typing * add more unit tests * refactor tests * run yarn format * make sendMessage synchronous again * fix error swallowing * preserve synchronicity of invokeQuery and invokeMutation, update test promise rejection checking * finish updating tests to verify non-settled promises * run yarn format * reintroduce tests lost to merge * update test title to clearly define expected behavior * remove old comment * slowly merging pasta/main - 1 * slowly merging pasta/main - 2 * slowly merging pasta/main - 3 * add better tracking / cleaning to stream transport * update unit tests to check for error behavior * Merge pasta/main into pasta/orzo * run yarn format * remove redundant cleanup - add comment explaining no need for cleanup * update tests * update tests * run yarn format
* initial commit for websockets * update implementation a bit * verifying tests * update tests * run yarn format * add error handling to websocket message handling * harden websocket message parsing
* initial commit for websockets * update implementation a bit * verifying tests * update tests * run yarn format * add error handling to websocket message handling * harden websocket message parsing * first checkpoint for hooking streaming support into manager * update tests * WIP updating tests * finish initialization and lazy loading tests * fixed transport manager tests * final unit tests fix - now integration testing * update initialize* function documentation * fix logError related unit tests * add initializeWebSocket call to index.node.ts
* initial commit for websockets * update implementation a bit * verifying tests * update tests * run yarn format * add error handling to websocket message handling * harden websocket message parsing * first checkpoint for hooking streaming support into manager * update tests * WIP updating tests * finish initialization and lazy loading tests * fixed transport manager tests * final unit tests fix - now integration testing * update initialize* function documentation * fix logError related unit tests * add initializeWebSocket call to index.node.ts * first checkpoint for hooking up streaming to query layer * add error checking to generated subscribe notification hook * first checkpoint for streaming tests * working through tests * WIP updating tests * existing tests done, adding more * final tests * added more tests for execute over stream * ensure callbacks are updated before invoking subscribe * add TODO * update backend endpoint for websocket communication * run yarn format * remove debugging print statements * move connector resource path initialization to be after connector name initialization * remove debugging print statements * fix some breaking tests, add some documentation, run yarn format * update websocket url endpoint * refactor sendMessage functions to more clearly orchestrate order of operations and eliminate race conditions- ensure connection
* add binary websocket message conversion to strings for messages from prod * run yarn format * apply reviewer suggestions
🦋 Changeset detectedLatest commit: 9042e51 The changes in this PR will be included in the next version bump. This PR includes changesets to release 2 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
There was a problem hiding this comment.
Code Review
This pull request adds a new transport layer to the Data Connect SDK, introducing a DataConnectTransportManager that manages routing between REST and a new WebSocket-based streaming transport. This architecture supports subscriptions and push notifications. Key implementation details include the AbstractDataConnectStreamTransport for request management and WebSocketTransport for connection handling. Review feedback highlights a hardcoded health check that disables the REST fallback, potential race conditions in query tracking maps, and several TypeScript type-safety issues where error arrays are incorrectly typed as empty tuples. Suggestions were also provided to improve variable initialization and error handling consistency.
hsubox76
left a comment
There was a problem hiding this comment.
Make sure to add a changeset for data-connect (minor) - and don't forget to manually add "firebase": minor as well.
| export * from './api'; | ||
| export * from './api.node'; | ||
| initializeFetch(fetch); | ||
| initializeWebSocket(WebSocket); |
There was a problem hiding this comment.
Websockets are stable in Node only as of Node 22. Officially the JS SDK supports Node 20+ (we will likely bump to 22 at the next breaking change release, maybe July). Will there be docs telling users to use a polyfill or make sure they're using Node 22+? I think there should at least be an engines field in the package.json reflecting the minimum required Node version. See https://github.com/firebase/firebase-js-sdk/blob/pasta/main/packages/ai/package.json#L7
There was a problem hiding this comment.
I see Node 21 is the first to add WebSockets support, but it's experimental, while Node 22 enables it by default, like you mentioned, and it looks like Node 20 is going to enter End of Life mode on 4/30/26
There's currently no mention in the documentation of WebSocket support in Node - but we can add it. Just to be clear, we should be asking users to either use Node 20/21 with a polyfill, Node 21 with the --experimental-websockets flag, or Node 22+, and allowing users to call initializeWebSocket() themselves until the next major release which updates the Node dependency to 22+, right?
There was a problem hiding this comment.
@hsubox76 after discussing with our team, we don't anticipate usage of streaming / WebSockets in Node environments. While it's technically possible that users could decide to use our client SDK in a server environment, typically we see this happen in SSR use cases - and we don't expect initial loads of the page / data to come from subscribe / WebSocket calls - we'd like to omit this from the documentation for simplicity unless people start opening requests complaining about it.
If users really want to use streaming / WebSockets in a Node environment, they can add one to the globalThis object - and I did go ahead and add a warning to the initializeWebsocket() check in index.node.ts, which is part of this PR: 'WebSocket is not available in this environment. Use a polyfill or upgrade your Node version to one that supports WebSockets.'
WDYT?
There was a problem hiding this comment.
That works, warning message makes it clear enough, thanks.
…eaming, harden error handling (#9808) * one test left * make cleanup call synchronous * clean up streaming transport after disconnect * add cleanup to query layer * always cleanup on illegal auth changes * apply reviewer suggestions * make more robust error generation, printing, and handling * add tests to ensure routing resumes normally after automatic disconnect * one more test * add comment * add wait to close stream until execute requests resolve, address reviewer comments * add auth refresh tests, harden state tracking for close attempts
…rden errors / disconnects (#9815) * add websocket implementation checks to node environments * simplify websocket and streaming errors * convert notification hooks to observers * update observer interface, add websocket message size checks * improve websocket reason guard, run yarn format * change let to const * WIP for fallback, fixing tests * tests updated * more robust testing to make sure fallback works as expected * apply reviewer suggestions
|
The final PR associated with this work has been merged into the |
Streaming Transport (WebSockets)
Status
The feature branch is ready for review and merging.
Overview
✨ This PR introduces full-duplex streaming transport capability to Firebase Data Connect via WebSockets. It introduces a
TransportManagerto intelligently route requests between standard REST and a persistent WebSocket stream, implements a stateful Stream Transport with request tracking and idle timeouts, and fully integrates these changes into the query layer to support query subscriptions.We've opened, reviewed, and merged a bunch of smaller PRs from sub-branches (
pasta/spaghetti,pasta/lasagna, etc.) into thispasta/mainbranch. We've also been pulling frommainintopasta/mainas we go.See go/fdc-sdk-streaming-design for more information.
Changes
1. Transport Manager & Routing
DataConnectTransportManager(manager.ts):DataConnectTransportManagerto act as the primary entry point for network operations, routing betweenRESTTransportandWebSocketTransport, using the stream when available and falling back to REST if the stream is closed or encounters a fatal error.network/rest/to make room for the streaming implementation.AbstractDataConnectTransportto centralize token management (Auth and App Check) and emulator configuration, reducing duplication across transports.url.ts):websocketUrlBuilderfor constructing the stream endpoint targeting theConnectorStreamService.2. Abstract Stream Transport
streamTransport.ts):RequestIds to execution promises or subscription hooks), de-duplication of queries, and routing incoming server messages.requestId.prepareMessagelogic to attach stateful headers (tokens) and resource names efficiently, ensuring tokens are only sent when changed and resource names are only sent on the first message.wire.ts):DataConnectStreamRequestandDataConnectStreamResponse.execute,subscribe,resume,cancel).3. Concrete (WebSocket) Stream Transport
websocket.ts):WebSocketTransportextending the abstract stream transport.WebSocketAPI interactions and binary decoding (usingTextDecoder) for production responses.4. Query Layer Integration
QueryManager.ts):QueryManagerto use the newDataConnectTransportInterface.subscribeandunsubscribeto trigger transport-level stream operations when callbacks are added or removed.SubscribeObserverinterfaces and methods to handle incoming stream data and error notifications, updating the cache and notifying listeners accordingly.Testing
Extensive unit tests were added to cover the new transport architecture:
test/unit/streamTransport.test.ts: Verified stateful header logic, tracking maps, and idle timeout behavior.test/unit/streaming.test.ts: Verified streaming queries and cache integration.test/unit/transportManager.test.ts: Verified routing logic and fallback behavior.test/unit/websocketTransport.test.ts: Verified WebSocket lifecycle by mocking the global WebSocket object.Description of tests added (based on the `describe()` and `it()` trees inside each test file)
streaming.test.ts
websocketTransport.test.ts
transportManager.test.ts
streamTransport.test.ts