-
Notifications
You must be signed in to change notification settings - Fork 214
Inworld websocket improvements #979
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
🦋 Changeset detectedLatest commit: f8fda4a The changes in this PR will be included in the next version bump. This PR includes changesets to release 18 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. 📝 WalkthroughWalkthroughThis PR adds a changeset and refactors the TTS implementation to use a shared WebSocket connection pool (InworldConnection/ConnectionPool), narrows the Encoding type to a fixed union, and exposes new TTS accessors and pooling exports while migrating per-context lifecycle into the pool. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant TTS
participant SharedPool as "Shared Pool"
participant InworldConn as "Inworld Connection"
participant WebSocket
Client->>TTS: synthesize(text)
TTS->>SharedPool: acquireContext(wsURL, auth)
alt context available
SharedPool-->>TTS: contextId
else create connection
SharedPool->>InworldConn: create connection
InworldConn->>WebSocket: connect
WebSocket-->>InworldConn: connected
InworldConn-->>SharedPool: ready
SharedPool-->>TTS: contextId
end
TTS->>InworldConn: send_text(contextId, chunks)
loop stream audio
InworldConn->>WebSocket: send message
WebSocket-->>InworldConn: audio frames
InworldConn-->>TTS: emit frames
TTS-->>Client: yield frame
end
TTS->>InworldConn: flush_context(contextId)
TTS->>InworldConn: close_context(contextId)
InworldConn->>SharedPool: release context
SharedPool->>SharedPool: update refs
alt no refs
SharedPool->>InworldConn: close connection
InworldConn->>WebSocket: disconnect
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ❌ 3❌ Failed checks (2 warnings, 1 inconclusive)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. 📜 Recent review detailsConfiguration used: Organization UI Review profile: CHILL Plan: Pro 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
✏️ Tip: You can disable this entire section by setting Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/inworld/src/tts.ts (1)
908-961: Ensure context cleanup on error paths in SynthesizeStream.If
sendLoop,flushContext, orwaiterrejects, the context can remain open and capacity never returns to the pool. Close the context on failure.🩹 Proposed fix
try { // Acquire a context from the shared pool const acquired = await pool.acquireContext(handleMessage, config); contextId = acquired.contextId; connection = acquired.connection; waiter = acquired.waiter; @@ await waiter; @@ for (const frame of bstream.flush()) { this.queue.put({ requestId: contextId, segmentId: contextId, frame, final: false, }); } } catch (e) { this.#logger.error({ error: e, contextId }, 'Error in SynthesizeStream run'); + if (connection && contextId) { + try { + await connection.closeContext(contextId); + } catch (closeErr) { + this.#logger.warn({ error: closeErr, contextId }, 'Failed to close context after error'); + } + } throw e; }
🤖 Fix all issues with AI agents
In `@plugins/inworld/src/tts.ts`:
- Around line 220-255: In acquireContext, if this.#sendCreateContext(contextId,
config) throws the created ContextInfo remains in this.#contexts and capacity is
leaked; wrap the send in try/catch, and on error remove the contextId from
this.#contexts, clear or call rejectWaiter(err) if set, then rethrow the error;
ensure this.#lastActivityAt is only left as-is or updated appropriately. Apply
the same cleanup pattern to the other send-create-context call noted (lines
446-448) so any failed send removes the context entry and signals the waiter.
- Around line 471-479: The recurring setInterval in ConnectionPool's constructor
(the `#idleCleanupInterval` created for `#cleanupIdleConnections`) prevents process
exit in short-lived contexts; update the interval creation to call .unref() if
available (e.g., setInterval(...).unref?.()) so the timer is non-blocking, and
also ensure resource cleanup by wiring TTS.close() to call
ConnectionPool.close() and remove entries from the module-level sharedPools Map
so intervals are cleared when a TTS instance is closed; apply the same .unref()
fix to the other setInterval usages mentioned (around the other constructors at
the referenced locations).
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
.changeset/chatty-rockets-start.mdREADME.mdplugins/inworld/src/tts.ts
🧰 Additional context used
🧬 Code graph analysis (1)
plugins/inworld/src/tts.ts (1)
agents/src/utils.ts (1)
shortuuid(554-556)
🔇 Additional comments (2)
README.md (1)
74-74: Inworld plugin entry looks good.Clear addition to the supported plugins table.
.changeset/chatty-rockets-start.md (1)
1-5: Changeset entry is clear.Version bump and summary look fine.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
|
@codex review this PR |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ef62402c77
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/inworld/src/tts.ts (1)
862-1009: Ensure contexts are closed on error to avoid capacity leaks.
If any exception occurs after acquiring a context, the context can remain open and occupy pool capacity. Add a best‑effort cleanup path.🩹 Suggested fix
let contextId: string | undefined; let connection: InworldConnection | undefined; let waiter: Promise<void> | undefined; + let completed = false; try { // Acquire a context from the shared pool const acquired = await pool.acquireContext(handleMessage, config); contextId = acquired.contextId; connection = acquired.connection; waiter = acquired.waiter; @@ for (const frame of bstream.flush()) { this.queue.put({ requestId: contextId, segmentId: contextId, frame, final: false, }); } + completed = true; } catch (e) { this.#logger.error({ error: e, contextId }, 'Error in SynthesizeStream run'); throw e; + } finally { + if (!completed && contextId && connection) { + try { + await connection.closeContext(contextId); + } catch (err) { + this.#logger.debug({ error: err, contextId }, 'Failed to close context after error'); + } + } }
🤖 Fix all issues with AI agents
In `@plugins/inworld/src/tts.ts`:
- Around line 603-636: The shared-pool release bug occurs because the instance
property `#pool` (and its key) can drift when options (apiKey/wsUrl) change;
updateOptions must either prevent changing those identity fields or rebind the
pool: detect when wsUrl or authorization/apiKey changed, call releaseSharedPool
with the old wsUrl/authorization key, then set `#pool` =
acquireSharedPool(newWsUrl, newAuthorization) and update any stored key; use the
helper functions getSharedPoolKey, acquireSharedPool and releaseSharedPool and
update the class field that tracks the current key (alongside `#pool`) so close()
always releases the correct pool.
- Around line 369-379: When the WebSocket close handler clears contexts it
currently never notifies capacity waiters, so acquireContext can hang; update
the ws.on('close') handler (the close callback that sets this.#ws and
this.#connecting and iterates this.#contexts) to also signal the pool capacity
for each cleared context by resolving or rejecting any pending capacity waiters
(e.g., resolve pending promises or call the pool/semaphore release method used
by acquireContext). Ensure you notify those capacity waiters before or while
removing entries from this.#contexts so waiting acquireContext calls are
unblocked.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
plugins/inworld/src/tts.ts
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
Add SPDX-FileCopyrightText and SPDX-License-Identifier headers to all newly added files with '// SPDX-FileCopyrightText: 2025 LiveKit, Inc.' and '// SPDX-License-Identifier: Apache-2.0'
Files:
plugins/inworld/src/tts.ts
**/*.{ts,tsx}?(test|example|spec)
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
When testing inference LLM, always use full model names from
agents/src/inference/models.ts(e.g., 'openai/gpt-4o-mini' instead of 'gpt-4o-mini')
Files:
plugins/inworld/src/tts.ts
**/*.{ts,tsx}?(test|example)
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
Initialize logger before using any LLM functionality with
initializeLogger({ pretty: true })from '@livekit/agents'
Files:
plugins/inworld/src/tts.ts
🧬 Code graph analysis (1)
plugins/inworld/src/tts.ts (1)
agents/src/utils.ts (1)
shortuuid(554-556)
🔇 Additional comments (7)
plugins/inworld/src/tts.ts (7)
27-27: Encoding union and default value look consistent.
The narrowed union and default assignment align cleanly.Also applies to: 150-155
128-173: Pooling types and limits are well-scoped.
Clear type boundaries and centralized pool constants make lifecycle management easy to follow.
174-293: Context acquisition + lifecycle bookkeeping looks solid.
Nice to see creation failure cleanup and activity tracking consolidated here.
295-358: Connection retry/session refresh flow is clear.
Backoff and reconnection paths read well.
381-471: Message routing and context cleanup paths look good.
Status errors and context closures now release slots and notify capacity appropriately.
474-601: Pool orchestration and idle cleanup look good.
Acquire/wait/cleanup flow is straightforward and easy to reason about.
638-679: TTS pool wiring and accessors are clean.
Exposingpool,opts, andwsURLimproves testability and inspection.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
…tead of per api key
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/inworld/src/tts.ts (1)
926-979: Ensure contexts are released on early failures.If any error occurs after
acquireContext, the context can remain active and consume capacity, even though the stream fails. This can eventually exhaust the pool.🩹 Suggested fix
- try { + let completed = false; + try { // Acquire a context from the shared pool const acquired = await pool.acquireContext(handleMessage, config); contextId = acquired.contextId; connection = acquired.connection; waiter = acquired.waiter; @@ for (const frame of bstream.flush()) { this.queue.put({ requestId: contextId, segmentId: contextId, frame, final: false, }); } + completed = true; } catch (e) { this.#logger.error({ error: e, contextId }, 'Error in SynthesizeStream run'); throw e; + } finally { + if (!completed && contextId && connection) { + try { + await connection.closeContext(contextId); + } catch (closeErr) { + this.#logger.warn( + { error: closeErr, contextId }, + 'Failed to close context after error', + ); + } + } }
♻️ Duplicate comments (1)
plugins/inworld/src/tts.ts (1)
691-696: Rebind the pool whenapiKeyorwsURLchanges.
updateOptionsupdates auth/URL but keeps the existingConnectionPool, so new syntheses still use the old credentials/endpoint. This can silently route requests to the wrong backend and is hard to debug.🩹 Suggested fix
updateOptions(opts: Partial<TTSOptions>) { - this.#opts = { ...this.#opts, ...opts }; - if (opts.apiKey) { - this.#authorization = `Basic ${opts.apiKey}`; - } + const prevWsURL = this.#opts.wsURL; + const prevAuth = this.#authorization; + this.#opts = { ...this.#opts, ...opts }; + if (opts.apiKey) { + this.#authorization = `Basic ${opts.apiKey}`; + } + if (prevWsURL !== this.#opts.wsURL || prevAuth !== this.#authorization) { + this.#pool.close(); + this.#pool = new ConnectionPool(this.#opts.wsURL, this.#authorization); + } }
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
plugins/inworld/src/tts.ts
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
Add SPDX-FileCopyrightText and SPDX-License-Identifier headers to all newly added files with '// SPDX-FileCopyrightText: 2025 LiveKit, Inc.' and '// SPDX-License-Identifier: Apache-2.0'
Files:
plugins/inworld/src/tts.ts
**/*.{ts,tsx}?(test|example|spec)
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
When testing inference LLM, always use full model names from
agents/src/inference/models.ts(e.g., 'openai/gpt-4o-mini' instead of 'gpt-4o-mini')
Files:
plugins/inworld/src/tts.ts
**/*.{ts,tsx}?(test|example)
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
Initialize logger before using any LLM functionality with
initializeLogger({ pretty: true })from '@livekit/agents'
Files:
plugins/inworld/src/tts.ts
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
davidzhao
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lg
|
@cshape I think this comment is still valid. |
Description
Update Websockets implementation as per Inworld's connection/context setup
https://docs.inworld.ai/api-reference/ttsAPI/texttospeech/synthesize-speech-websocket
Pre-Review Checklist
Testing
Tested in examples/src/inworld_tts.ts
Summary by CodeRabbit
Bug Fixes
Performance Improvements
API Changes
✏️ Tip: You can customize this high-level summary in your review settings.