Skip to content

feat(data-connect): add support for streaming transport#9809

Merged
hsubox76 merged 29 commits intomainfrom
pasta/main
Apr 8, 2026
Merged

feat(data-connect): add support for streaming transport#9809
hsubox76 merged 29 commits intomainfrom
pasta/main

Conversation

@stephenarosaj
Copy link
Copy Markdown
Contributor

@stephenarosaj stephenarosaj commented Apr 4, 2026

Streaming Transport (WebSockets)

Status

The feature branch is ready for review and merging.

Overview

✨ This PR introduces full-duplex streaming transport capability to Firebase Data Connect via WebSockets. It introduces a TransportManager to intelligently route requests between standard REST and a persistent WebSocket stream, implements a stateful Stream Transport with request tracking and idle timeouts, and fully integrates these changes into the query layer to support query subscriptions.

We've opened, reviewed, and merged a bunch of smaller PRs from sub-branches (pasta/spaghetti, pasta/lasagna, etc.) into this pasta/main branch. We've also been pulling from main into pasta/main as we go.

See go/fdc-sdk-streaming-design for more information.

Changes

1. Transport Manager & Routing

  • DataConnectTransportManager (manager.ts):
    • Created DataConnectTransportManager to act as the primary entry point for network operations, routing between RESTTransport and WebSocketTransport, using the stream when available and falling back to REST if the stream is closed or encounters a fatal error.
    • Implemented lazy initialization of the stream transport, opening the connection only when a subscription is requested.
    • Added routing logic to fall back to REST if the stream is unhealthy, pending closure, or unavailable.
  • Refactored existing network source code
    • Moved existing REST files to a flattened structure under network/rest/ to make room for the streaming implementation.
    • Created AbstractDataConnectTransport to centralize token management (Auth and App Check) and emulator configuration, reducing duplication across transports.
    • URL Utilities (url.ts):
    • Added websocketUrlBuilder for constructing the stream endpoint targeting the ConnectorStreamService.

2. Abstract Stream Transport

  • AbstractDataConnectStreamTransport (streamTransport.ts):
    • Handles request tracking (mapping RequestIds to execution promises or subscription hooks), de-duplication of queries, and routing incoming server messages.
    • Added tracking maps for active queries, mutations, and subscriptions to correlate outgoing requests with incoming responses via requestId.
    • Implemented prepareMessage logic to attach stateful headers (tokens) and resource names efficiently, ensuring tokens are only sent when changed and resource names are only sent on the first message.
    • Added a 60-second idle timeout to close the connection gracefully when no active subscriptions exist.
    • Handled auth state changes to disconnect the stream on user logout, login, or user change.
  • Wire Protocol (wire.ts):
    • Defined TypeScript interfaces for DataConnectStreamRequest and DataConnectStreamResponse.
    • Used mapped types to enforce mutual exclusivity of payload kinds (execute, subscribe, resume, cancel).

3. Concrete (WebSocket) Stream Transport

  • WebSocket Transport (websocket.ts):
    • Implemented WebSocketTransport extending the abstract stream transport.
    • Handled browser WebSocket API interactions and binary decoding (using TextDecoder) for production responses.
    • Implemented defensive truncation for close reason strings to comply with protocol limits.

4. Query Layer Integration

  • Query Manager (QueryManager.ts):
    • Updated QueryManager to use the new DataConnectTransportInterface.
    • Wired subscribe and unsubscribe to trigger transport-level stream operations when callbacks are added or removed.
    • Added SubscribeObserver interfaces and methods to handle incoming stream data and error notifications, updating the cache and notifying listeners accordingly.

Testing

Extensive unit tests were added to cover the new transport architecture:

  • test/unit/streamTransport.test.ts: Verified stateful header logic, tracking maps, and idle timeout behavior.
  • test/unit/streaming.test.ts: Verified streaming queries and cache integration.
  • test/unit/transportManager.test.ts: Verified routing logic and fallback behavior.
  • test/unit/websocketTransport.test.ts: Verified WebSocket lifecycle by mocking the global WebSocket object.
Description of tests added (based on the `describe()` and `it()` trees inside each test file)

streaming.test.ts

  • Streaming & Query Layer Integration
    • using stream via user-facing API
      • executeQuery / executeMutation should not initialize stream
      • subscribe should initialize stream
      • query layer should call invokeUnsubscribe on last unsubscribe
      • executeQuery should use stream when stream is active
      • executeMutation should use stream when stream is active
    • incoming notifications
      • should notify all relevant subscribers when data is received
      • should update cache when data is pushed
      • should notify all relevant subscribers of errors when they are pushed
      • should clean up subscriptions in query layer when observer receives a disconnect error
      • should NOT clean up subscriptions in query layer when observer receives a non-disconnect error

websocketTransport.test.ts

  • WebSocketTransport
    • openConnection
      • should resolve when onopen fires
      • should reject when onerror fires
      • should de-duplicate calls when already connected
      • should de-duplicate calls when open connection is pending
    • streamIsReady
      • should be false initially
      • should be false while connecting
      • should be true when connected
      • should be false after disconnected
    • closeConnection
      • should close the underlying websocket and reset connection states
      • should be idempotent if connection is already closed
      • should return rejected promise and clean up if ws.close() throws
    • onclose handler (handleWebsocketDisconnect)
      • should reset connection state when websocket is closed externally
      • should call rejectAllActiveRequests and clean up when closed externally
    • sendMessage
      • should not send until connection is open
      • should send stringified json payload
    • handleWebSocketMessage
      • should correctly parse incoming JSON and handle response
      • should map extensions to empty array if not strictly provided
      • should close connection with error if message is not an object
      • should close connection with error if result is missing
      • should close connection with error if result is not an object
      • should close connection with error if requestId is missing

transportManager.test.ts

  • DataConnectTransportManager
    • stream transport initialization
      • initStreamTransport should initialize stream transport only once
    • default transport routing
      • stream transport should not be initialized by default
      • invokeQuery should route to REST by default and not initialize stream transport
      • invokeMutation should route to REST by default and not initialize stream transport
      • invokeSubscribe should route to streaming by default and initialize stream transport
      • invokeUnsubscribe should route to streaming by default when stream transport is already initialized and not initialize stream transport
      • useEmulator should route to REST and streaming
      • onAuthTokenChanged should route to REST and streaming
    • dynamic transport routing
      • executeShouldUseStream should return false if streamTransport is not initialized
      • executeShouldUseStream should return true only when all stream conditions are met
      • invokeQuery dynamic routing
        • invokeQuery should route to stream if executeShouldUseStream returns true
        • invokeQuery should throw an error if stream transport throws an error and executeShouldUseStream remains true
        • invokeQuery should fallback to REST if stream transport throws an error and then executeShouldUseStream becomes false
      • invokeMutation dynamic routing
        • invokeMutation should route to stream if executeShouldUseStream() returns true
        • invokeMutation should throw an error if stream transport throws an error and executeShouldUseStream remains true
        • invokeMutation should fallback to REST if stream transport throws an error and then executeShouldUseStream becomes false
        • invokeMutation should fallback to REST for all in-flight mutations if stream fails
      • invokeSubscribe dynamic routing
        • invokeSubscribe should throw an error if isUnableToConnect is true
    • disconnects
      • subscriber onDisconnect should be called when stream fails during active subscription
      • idle timeout
        • should route to REST during idle timeout and disconnect after 60s
        • should route to REST after stream automatically closes
        • should route back to stream after reconnect

streamTransport.test.ts

  • AbstractDataConnectStreamTransport
    • prepareMessage
      • should not change data fields
      • should handle headers properly
        • auth token
          • should add auth token to the first message
          • should NOT add the same auth token to subsequent messages
          • should include auth token when it changes
        • app check token
          • should add app check token to the first message
          • should NOT add the same app check token to subsequent messages
          • should NOT include app check token when it changes
        • x-firebase-gmpid
          • should add x-firebase-gmpid to every message if appId is set
        • should add X-Goog-Api-Client to every message based on caller sdk type
      • should handle name properly
        • should add name to the first message
        • should NOT add name to subsequent messages
      • should reset connection state on onConnectionReady()
    • Request Tracking
      • getMapKey should sort keys consistently for map lookups
      • should generate unique request IDs
      • Incoming Requests from Transport Layer
        • invokeQuery
          • should populate tracking maps synchronously and then call sendMessage
          • should asynchronously clean up and reject if sendMessage fails
        • invokeMutation
          • should populate tracking maps synchronously and then call sendMessage
          • should asynchronously clean up and reject if sendMessage fails
        • invokeSubscribe
          • should populate tracking maps synchronously and then call sendMessage
          • should asynchronously call observer with error and clean up if sendMessage fails
        • invokeUnsubscribe
          • should de-populate tracking maps and call sendMessage
          • should asynchronously clean up and log error if sendMessage fails
      • Incoming Responses from Server
        • should throw an error if an unrecognized requestId is received
        • invokeQuery tracking
          • should route data to resolve the correct query promise when response is received
          • should clean map when response is received
          • should reject the correct query promise with DataConnectOperationError if response has errors
          • should clean map correctly when handleResponse rejects
        • invokeMutation tracking
          • should route data to resolve the correct mutation promise when response is received
          • should clean map of the correct tracked request when response is received
          • should reject the correct mutation promise with DataConnectOperationError if response has errors
          • should clean map correctly when handleResponse rejects
        • invokeSubscribe tracking
          • should route data to the correct subscribe observer whenever a response is received
          • should route error response to the correct subscribe observer whenever an error response is received
          • should NOT clean map when handleResponse rejects
    • Disconnects
      • should close connection after 60 seconds of idle (no active subscriptions)
      • should cancel close if a new subscription arrives during timeout
      • should restart close timeout if subscribe fails and leaves no active subscriptions
      • should not close connection if there are active execute requests
      • should close connection when last execute request finishes after idle timeout
      • Auth Disconnects
        • should close stream immediately on illegal auth change (login)
        • should close stream immediately on illegal auth change (logout)
        • should close stream immediately on illegal auth change (user change)
        • should NOT close stream on valid auth token refresh (same user)

stephenarosaj and others added 24 commits February 27, 2026 10:34
…structure (#9603)

* restructure existing transport code into new file structure

* remove DataConnectResponse from public API

* update copyright year
…on (#9609)

* introduce transport manager and transport abstraction

* update tests, run API docs and linter

* fix invokeQuery

* fix invokeMutation

* fix unit tests, update invokeUnsubscribe function signature

* run yarn format

* update names of transport classes/interfaces, update imports

* remove circular imports

* small edit to re-run jobs
…ort Implementation (#9618)

* introduce new abstract transport base class

* rename RestTransport to restTransport

* update rest to use DataConnectResponseWithMaxAge instead of DataConnectResponse
* first pass at request ID, headers/name field, and opening request logic

* updating unit tests

* finish unit tests

* fix _prepareMessage tests

* undo e2e changes, fix formatting

* remove unneeded gql files

* add onAuthTokenChanged back

* use interface exposing internal members for testing

* run yarn format
* first pass at request ID, headers/name field, and opening request logic

* updating unit tests

* finish unit tests

* fix _prepareMessage tests

* undo e2e changes, fix formatting

* remove unneeded gql files

* add onAuthTokenChanged back

* use interface exposing internal members for testing

* run yarn format

* first pass at adding basic stream request management logic

* first pass at adding basic stream request management logic

* undo test changes

* add unit tests for incoming requests from transport layer

* fix unit tests after adding _connectorResourcePath to all transports

* add unit tests for incoming responses from server

* refactor unit tests

* run yarn format

* remove initStreamTransport

* rename _handleMessage to _handleResponse

* run yarn format

* fix comment typo

* update visibility of subscribe notification hooks tracking map
…eQuery/invokeMutation (#9723)

* update validateArgs to handle new options arguments

* add validateArgs unit tests

* split functionality into separate functions, add changeset

* add hasVars argument to help differentiate first argument

* update changelog

* export validateArgs

* remove vestigial console log

* refactor tests

* run yarn format

* make sendMessage synchronous again

* fix error swallowing

* preserve synchronicity of invokeQuery and invokeMutation, update test promise rejection checking

* finish updating tests to verify non-settled promises

* run yarn format

* update test title to clearly define expected behavior

* update changeset

* add promise unsettled helper per reviewer suggestion, run yarn format
* add websocket implementation

* add open unit tests for websocket

* add more websocket unit tests

* add more websocket unit tests

* WIP adding proper error propagation

* WIP adding proper error propagation

* split error and websockets into two separate branches

* update error typing

* add more unit tests

* refactor tests

* run yarn format

* make sendMessage synchronous again

* fix error swallowing

* preserve synchronicity of invokeQuery and invokeMutation, update test promise rejection checking

* finish updating tests to verify non-settled promises

* run yarn format

* reintroduce tests lost to merge

* update test title to clearly define expected behavior

* remove old comment

* slowly merging pasta/main - 1

* slowly merging pasta/main - 2

* slowly merging pasta/main - 3

* add better tracking / cleaning to stream transport

* update unit tests to check for error behavior

* Merge pasta/main into pasta/orzo

* run yarn format

* remove redundant cleanup - add comment explaining no need for cleanup

* update tests

* update tests

* run yarn format
* initial commit for websockets

* update implementation a bit

* verifying tests

* update tests

* run yarn format

* add error handling to websocket message handling

* harden websocket message parsing
* initial commit for websockets

* update implementation a bit

* verifying tests

* update tests

* run yarn format

* add error handling to websocket message handling

* harden websocket message parsing

* first checkpoint for hooking streaming support into manager

* update tests

* WIP updating tests

* finish initialization and lazy loading tests

* fixed transport manager tests

* final unit tests fix - now integration testing

* update initialize* function documentation

* fix logError related unit tests

* add initializeWebSocket call to index.node.ts
* initial commit for websockets

* update implementation a bit

* verifying tests

* update tests

* run yarn format

* add error handling to websocket message handling

* harden websocket message parsing

* first checkpoint for hooking streaming support into manager

* update tests

* WIP updating tests

* finish initialization and lazy loading tests

* fixed transport manager tests

* final unit tests fix - now integration testing

* update initialize* function documentation

* fix logError related unit tests

* add initializeWebSocket call to index.node.ts

* first checkpoint for hooking up streaming to query layer

* add error checking to generated subscribe notification hook

* first checkpoint for streaming tests

* working through tests

* WIP updating tests

* existing tests done, adding more

* final tests

* added more tests for execute over stream

* ensure callbacks are updated before invoking subscribe

* add TODO

* update backend endpoint for websocket communication

* run yarn format

* remove debugging print statements

* move connector resource path initialization to be after connector name initialization

* remove debugging print statements

* fix some breaking tests, add some documentation, run yarn format

* update websocket url endpoint

* refactor sendMessage functions to more clearly orchestrate order of operations and eliminate race conditions- ensure connection
* add binary websocket message conversion to strings for messages from prod

* run yarn format

* apply reviewer suggestions
@stephenarosaj stephenarosaj requested review from a team and dconeybe as code owners April 4, 2026 00:28
@changeset-bot
Copy link
Copy Markdown

changeset-bot bot commented Apr 4, 2026

🦋 Changeset detected

Latest commit: 9042e51

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 2 packages
Name Type
firebase Minor
@firebase/data-connect Minor

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request adds a new transport layer to the Data Connect SDK, introducing a DataConnectTransportManager that manages routing between REST and a new WebSocket-based streaming transport. This architecture supports subscriptions and push notifications. Key implementation details include the AbstractDataConnectStreamTransport for request management and WebSocketTransport for connection handling. Review feedback highlights a hardcoded health check that disables the REST fallback, potential race conditions in query tracking maps, and several TypeScript type-safety issues where error arrays are incorrectly typed as empty tuples. Suggestions were also provided to improve variable initialization and error handling consistency.

Comment thread packages/data-connect/src/network/stream/streamTransport.ts Outdated
Comment thread packages/data-connect/src/core/query/QueryManager.ts Outdated
Comment thread packages/data-connect/src/network/rest/fetch.ts
Comment thread packages/data-connect/src/network/stream/streamTransport.ts
Comment thread packages/data-connect/src/network/stream/streamTransport.ts
Comment thread packages/data-connect/src/network/stream/websocket.ts Outdated
Copy link
Copy Markdown
Contributor

@hsubox76 hsubox76 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure to add a changeset for data-connect (minor) - and don't forget to manually add "firebase": minor as well.

Comment thread packages/data-connect/src/index.node.ts Outdated
export * from './api';
export * from './api.node';
initializeFetch(fetch);
initializeWebSocket(WebSocket);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Websockets are stable in Node only as of Node 22. Officially the JS SDK supports Node 20+ (we will likely bump to 22 at the next breaking change release, maybe July). Will there be docs telling users to use a polyfill or make sure they're using Node 22+? I think there should at least be an engines field in the package.json reflecting the minimum required Node version. See https://github.com/firebase/firebase-js-sdk/blob/pasta/main/packages/ai/package.json#L7

Copy link
Copy Markdown
Contributor Author

@stephenarosaj stephenarosaj Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see Node 21 is the first to add WebSockets support, but it's experimental, while Node 22 enables it by default, like you mentioned, and it looks like Node 20 is going to enter End of Life mode on 4/30/26

There's currently no mention in the documentation of WebSocket support in Node - but we can add it. Just to be clear, we should be asking users to either use Node 20/21 with a polyfill, Node 21 with the --experimental-websockets flag, or Node 22+, and allowing users to call initializeWebSocket() themselves until the next major release which updates the Node dependency to 22+, right?

Copy link
Copy Markdown
Contributor Author

@stephenarosaj stephenarosaj Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hsubox76 after discussing with our team, we don't anticipate usage of streaming / WebSockets in Node environments. While it's technically possible that users could decide to use our client SDK in a server environment, typically we see this happen in SSR use cases - and we don't expect initial loads of the page / data to come from subscribe / WebSocket calls - we'd like to omit this from the documentation for simplicity unless people start opening requests complaining about it.

If users really want to use streaming / WebSockets in a Node environment, they can add one to the globalThis object - and I did go ahead and add a warning to the initializeWebsocket() check in index.node.ts, which is part of this PR: 'WebSocket is not available in this environment. Use a polyfill or upgrade your Node version to one that supports WebSockets.'

WDYT?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That works, warning message makes it clear enough, thanks.

…eaming, harden error handling (#9808)

* one test left

* make cleanup call synchronous

* clean up streaming transport after disconnect

* add cleanup to query layer

* always cleanup on illegal auth changes

* apply reviewer suggestions

* make more robust error generation, printing, and handling

* add tests to ensure routing resumes normally after automatic disconnect

* one more test

* add comment

* add wait to close stream until execute requests resolve, address reviewer comments

* add auth refresh tests, harden state tracking for close attempts
stephenarosaj and others added 3 commits April 6, 2026 15:39
…rden errors / disconnects (#9815)

* add websocket implementation checks to node environments

* simplify websocket and streaming errors

* convert notification hooks to observers

* update observer interface, add websocket message size checks

* improve websocket reason guard, run yarn format

* change let to const

* WIP for fallback, fixing tests

* tests updated

* more robust testing to make sure fallback works as expected

* apply reviewer suggestions
@stephenarosaj
Copy link
Copy Markdown
Contributor Author

stephenarosaj commented Apr 7, 2026

The final PR associated with this work has been merged into the pasta/main branch.

@stephenarosaj stephenarosaj requested review from a team as code owners April 8, 2026 17:34
@hsubox76 hsubox76 merged commit 87d5cc1 into main Apr 8, 2026
36 of 41 checks passed
@hsubox76 hsubox76 deleted the pasta/main branch April 8, 2026 22:16
@google-oss-bot google-oss-bot mentioned this pull request Apr 8, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants