@jayeshp19 jayeshp19 commented Dec 29, 2025

fixes: #4409

Summary by CodeRabbit

  • Bug Fixes

    • Smarter TTS retry handling with clearer recoverable vs non-recoverable outcomes and detection of missing audio to avoid silent failures.
    • Improved finalization and cleanup across providers to prevent hangs, resource leaks, and missed audio delivery.
  • Improvements

    • Input buffering and replay so streamed text/audio survive retries.
    • Per-run scoped streaming resources for better concurrency isolation and safer connection handling.


ctx.waiter.set_exception(
    APIError("11labs stream ended without audio", retryable=True)
)
self.mark_non_current()
Contributor

Why is mark_non_current needed when one of the generations failed?

Contributor Author

It just ensures the next attempt gets a fresh connection


if data.get("isFinal"):
    if not ctx.received_audio and not ctx.waiter.done():
        # ElevenLabs sometimes returns `isFinal` with an empty `audio` payload.
Contributor

Is there a valid case where ElevenLabs returns final without audio, e.g. when the pushed text is empty?

Contributor Author

Yes, empty/whitespace input returns isFinal with audio: null. I added a sent_text guard so we only error when real text was sent
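
A minimal sketch of the guard described in this reply, not the plugin's actual code. `StreamState`, `RetryableStreamError`, and `on_message` are hypothetical names, and the audio payload is treated as opaque bytes for simplicity:

```python
from dataclasses import dataclass
from typing import Optional


class RetryableStreamError(Exception):
    """Hypothetical stand-in for APIError(..., retryable=True)."""


@dataclass
class StreamState:
    # Hypothetical mirror of the per-context flags discussed in this thread.
    sent_text: bool = False
    received_audio: bool = False


def on_message(state: StreamState, data: dict) -> Optional[bytes]:
    """Handle one decoded server message; return the audio payload if present."""
    audio = data.get("audio")
    if audio:
        state.received_audio = True
        return audio
    if data.get("isFinal"):
        # Empty or whitespace-only input legitimately ends with no audio, so a
        # silent final message is only a failure when real text was sent.
        if state.sent_text and not state.received_audio:
            raise RetryableStreamError("stream ended without audio")
    return None
```

With this shape, a final message for an empty input returns quietly, while the same message after real text raises an error the retry loop can act on.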

coderabbitai bot commented Jan 25, 2026

📝 Walkthrough


Implements replayable streaming input and per-attempt reset for TTS streaming retries; adds AudioEmitter.has_pushed_audio() and SynthesizeStream._reset_for_retry(); and converts several plugins to use per-run local channels/state instead of instance-level channel/state.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Core TTS retry & buffering: livekit-agents/livekit/agents/tts/tts.py | Added threading import; introduced _replay_events and _input_ended; changed retry decision to should_retry (checks APIError, no audio pushed, positive max_retry, i < max_retry); added per-attempt _reset_for_retry; recorded pushes/flush/end_input for replay; added AudioEmitter.has_pushed_audio() to inform retry logic; guarded channel closes with locks. |
| ElevenLabs plugin, per-stream state & connection checks: livekit-plugins/livekit-plugins-elevenlabs/livekit/plugins/elevenlabs/tts.py | Replaced _closed checks with _Connection.is_open; added _StreamData.sent_text and received_audio; moved tokenizer stream to local run scope; mark sent_text/received_audio on send/recv; if no audio but text sent, raise retryable APIError; broadened streaming exception handling; ensure local stream cleanup. |
| NVIDIA plugin, local run-scoped state: livekit-plugins/livekit-plugins-nvidia/livekit/plugins/nvidia/tts.py | Converted per-instance fields (_context_id, _sent_tokenizer_stream, _token_q, _event_loop) to local run-scoped variables; introduced local token queue + sentinel; updated producers/consumers, thread-safe emissions, done future, and cleanup to use local loop/streams; close local streams on finish. |
| Plugins, segments channel moved to run scope: livekit-plugins/*/*/tts.py (deepgram, google, gradium, neuphonic, resemble, sarvam, upliftai, and others) | Removed instance-level self._segments_ch and replaced with local segments_ch inside _run; updated _tokenize_input, _run_segments/_process_segments, closes/iteration, and cleanup to use the per-run channel; adjusted small segment/segment-end flows where applicable. |
| Minor / metrics & per-attempt flow: livekit-agents/... and plugin adjustments | Adjusted per-attempt metrics creation/deferment and logging; ensured per-attempt metrics/state takes streaming replay into account; expanded exception classes treated as retryable in streaming flows; minor lock/timing resets and aclose guarding. |

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant TTS_Agent as TTS Agent\n(tts.py)
    participant InputBuffer as Input Buffer\n(_replay_events)
    participant Plugin as TTS Plugin\n(_run)
    participant Output as AudioEmitter

    Client->>TTS_Agent: start synthesize_stream(request)
    TTS_Agent->>InputBuffer: push_text / flush / end_input (record events)
    TTS_Agent->>Plugin: start _run with input channel
    Plugin->>Output: produce audio (attempt `#1`)
    alt attempt fails and should_retry
        Plugin-->>TTS_Agent: emit recoverable error
        TTS_Agent->>TTS_Agent: wait, increment attempt
        TTS_Agent->>Plugin: call SynthesizeStream._reset_for_retry()
        TTS_Agent->>InputBuffer: replay events into new input channel
        TTS_Agent->>Plugin: restart _run with reconstructed channel (attempt `#2`)
        Plugin->>Output: produce audio (attempt `#2`)
    else non-retryable or retries exhausted
        Plugin-->>TTS_Agent: emit non-recoverable error / raise
    end
    alt Output.has_pushed_audio() is true
        Output-->>Client: deliver audio stream
    end
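
The retry flow in the diagram can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in tts.py: `RetryableError`, `Emitter`, and `run_with_retries` are hypothetical names, the recorded input is a plain list, and the wait between attempts is omitted.

```python
import asyncio
from typing import Awaitable, Callable, List


class RetryableError(Exception):
    """Hypothetical stand-in for a retryable APIError."""


class Emitter:
    """Hypothetical output sink that remembers whether audio was pushed."""

    def __init__(self) -> None:
        self.frames: List[bytes] = []

    def push(self, frame: bytes) -> None:
        self.frames.append(frame)

    def has_pushed_audio(self) -> bool:
        return bool(self.frames)


async def run_with_retries(
    run_once: Callable[[List[str], Emitter], Awaitable[None]],
    replay_events: List[str],
    max_retry: int,
) -> Emitter:
    for i in range(max_retry + 1):
        emitter = Emitter()  # fresh output state per attempt
        try:
            # Each attempt gets its own copy of the recorded input.
            await run_once(list(replay_events), emitter)
            return emitter
        except RetryableError:
            # Never retry once audio has gone out: replaying would duplicate it.
            should_retry = (
                not emitter.has_pushed_audio() and max_retry > 0 and i < max_retry
            )
            if not should_retry:
                raise
    raise AssertionError("unreachable")
```

The key design point is that the decision depends on output already emitted, not only on the error type.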

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • tinalenguyen
  • davidzhao

Poem

🐰 I buffered your words in a cozy heap,
Replayed them again when the first try fell asleep.
Local streams hop in, fresh for each run,
Retries find their voice until the job is done. 🎶

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 27.12% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'fix streaming tts retries' directly addresses the main objective of the PR, which is to fix broken streaming TTS retries as described in issue #4409. |
| Linked Issues check | ✅ Passed | The code changes comprehensively address the core issue #4409 by implementing event replay during retries through new buffering/replay mechanisms, per-attempt state reset logic, and enhanced retry decision-making based on audio production. |
| Out of Scope Changes check | ✅ Passed | All changes across multiple plugin files focus on refactoring streaming state management (moving _segments_ch to local scope) and implementing retry capability, which are directly related to fixing the broken streaming TTS retry issue. |


📜 Recent review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3463c65 and 56daf5f.

📒 Files selected for processing (1)
  • livekit-agents/livekit/agents/tts/tts.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings

Files:

  • livekit-agents/livekit/agents/tts/tts.py
🧠 Learnings (1)
📚 Learning: 2026-01-22T03:28:16.289Z
Learnt from: longcw
Repo: livekit/agents PR: 4563
File: livekit-agents/livekit/agents/beta/tools/end_call.py:65-65
Timestamp: 2026-01-22T03:28:16.289Z
Learning: In code paths that check capabilities or behavior of the LLM processing the current interaction, prefer using the activity's LLM obtained via ctx.session.current_agent._get_activity_or_raise().llm instead of ctx.session.llm. The session-level LLM may be a fallback and not reflect the actual agent handling the interaction. Use the activity LLM to determine capabilities and to make capability checks or feature toggles relevant to the current processing agent.

Applied to files:

  • livekit-agents/livekit/agents/tts/tts.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: unit-tests
  • GitHub Check: type-check (3.9)
  • GitHub Check: type-check (3.13)
🔇 Additional comments (10)
livekit-agents/livekit/agents/tts/tts.py (10)

6-6: LGTM!

The threading import is correctly added to support the threading.Lock() usage for synchronous context managers in push_text, flush, end_input, and _reset_for_retry.


282-297: LGTM!

The enhanced retry logic correctly prevents retries after audio has been pushed, avoiding potential data duplication. The composite should_retry condition is well-structured and the logging provides good observability.


362-364: LGTM!

The new instance variables correctly support the retry mechanism:

  • threading.Lock() for thread-safe access to shared state
  • _replay_events buffer to store input for potential retry replay
  • _input_ended flag to track input closure state

400-403: LGTM!

The retry reset is correctly positioned before creating a new output_emitter, ensuring the input channel is reconstructed from buffered events before each retry attempt. This directly addresses the core issue where streaming TTS implementations consume the input channel on the first attempt.


432-447: LGTM!

The retry logic is consistent with ChunkedStream and correctly prevents retries after audio has been pushed. The streamed=True flag in logging helps distinguish streaming vs non-streaming retry events.


528-556: LGTM!

The push_text method correctly:

  • Guards shared state with _input_lock
  • Buffers tokens to _replay_events for retry support
  • Lazily initializes the metrics task on first push
  • Includes a deprecation warning for multi-segment usage patterns

558-573: LGTM!

Good refactoring pattern: _flush_locked() allows reuse from end_input() without lock re-acquisition (avoiding deadlock). The flush sentinel is correctly stored in _replay_events to preserve segment boundaries during retry replay.
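
The lock-held helper pattern praised here can be sketched as follows. `InputRecorder` is a hypothetical, trimmed-down class for illustration only; it records events but does not forward them to a channel as the real stream would.

```python
import threading
from typing import List, Union


class FlushSentinel:
    """Marks a segment boundary in the recorded input."""


class InputRecorder:
    """Hypothetical recorder; not the actual SynthesizeStream."""

    def __init__(self) -> None:
        self._input_lock = threading.Lock()  # non-reentrant
        self._replay_events: List[Union[str, FlushSentinel]] = []
        self._input_ended = False

    def push_text(self, token: str) -> None:
        with self._input_lock:
            if not self._input_ended:
                self._replay_events.append(token)

    def flush(self) -> None:
        with self._input_lock:
            self._flush_locked()

    def _flush_locked(self) -> None:
        # Caller must already hold _input_lock.
        if not self._input_ended:
            self._replay_events.append(FlushSentinel())

    def end_input(self) -> None:
        with self._input_lock:
            # Calling self.flush() here would block forever on the
            # non-reentrant lock, so the lock-free helper is used instead.
            self._flush_locked()
            self._input_ended = True
```

A `threading.RLock` would also avoid the deadlock, but the explicit `_locked` suffix keeps the locking contract visible at each call site.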


575-597: LGTM!

This is the core fix for the streaming TTS retry issue:

  • end_input() properly tracks the ended state via _input_ended
  • _reset_for_retry() reconstructs the input channel by replaying all buffered events
  • The old channel is closed after replacement (addressing the past review comment)
  • Per-attempt timing is reset to ensure correct metrics on retry

599-604: LGTM!

Correctly acquires _input_lock before closing the input channel, ensuring thread safety with concurrent push_text/flush/end_input calls.


677-678: LGTM!

The has_pushed_audio() method provides a clean API for the retry logic to determine if any audio was produced. This is essential for preventing retries after partial output, which could cause data duplication or corruption.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
livekit-plugins/livekit-plugins-upliftai/livekit/plugins/upliftai/tts.py (2)

483-484: Missing end_segment() call on early return.

When text_parts is empty, the function returns early at line 484, but start_segment() was already called at line 471. This leaves the segment in an inconsistent state without a corresponding end_segment() call.

Proposed fix
             if not text_parts:
+                output_emitter.end_segment()
                 return

511-511: Incorrect method call: end_input() should be end_segment().

Other TTS plugins (Deepgram, Neuphonic, Gradium) call output_emitter.end_segment() to finalize a segment. Using end_input() here appears to be a bug that could cause segment tracking issues.

Proposed fix
-            output_emitter.end_input()
+            output_emitter.end_segment()
♻️ Duplicate comments (1)
livekit-agents/livekit/agents/tts/tts.py (1)

572-585: Close the previous input channel when swapping.
Leaving the old channel open can leak resources or leave readers hanging after a failed attempt.

🔧 Suggested fix
-        ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
+        old_ch = self._input_ch
+        ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
         for ev in self._replay_events:
             ch.send_nowait(ev)

         if self._input_ended:
             ch.close()

         self._input_ch = ch
+        old_ch.close()
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 620ed34 and ce3e57e.

📒 Files selected for processing (10)
  • livekit-agents/livekit/agents/tts/tts.py
  • livekit-plugins/livekit-plugins-deepgram/livekit/plugins/deepgram/tts.py
  • livekit-plugins/livekit-plugins-elevenlabs/livekit/plugins/elevenlabs/tts.py
  • livekit-plugins/livekit-plugins-google/livekit/plugins/google/tts.py
  • livekit-plugins/livekit-plugins-gradium/livekit/plugins/gradium/tts.py
  • livekit-plugins/livekit-plugins-neuphonic/livekit/plugins/neuphonic/tts.py
  • livekit-plugins/livekit-plugins-nvidia/livekit/plugins/nvidia/tts.py
  • livekit-plugins/livekit-plugins-resemble/livekit/plugins/resemble/tts.py
  • livekit-plugins/livekit-plugins-sarvam/livekit/plugins/sarvam/tts.py
  • livekit-plugins/livekit-plugins-upliftai/livekit/plugins/upliftai/tts.py
🧰 Additional context used
🧬 Code graph analysis (6)
livekit-plugins/livekit-plugins-resemble/livekit/plugins/resemble/tts.py (2)
livekit-agents/livekit/agents/utils/aio/channel.py (1)
  • Chan (49-178)
livekit-agents/livekit/agents/tokenize/tokenizer.py (1)
  • SentenceStream (32-64)
livekit-plugins/livekit-plugins-sarvam/livekit/plugins/sarvam/tts.py (5)
livekit-agents/livekit/agents/utils/aio/channel.py (1)
  • Chan (49-178)
livekit-agents/livekit/agents/tokenize/tokenizer.py (1)
  • SentenceStream (32-64)
livekit-plugins/livekit-plugins-nvidia/livekit/plugins/nvidia/tts.py (1)
  • _process_segments (149-152)
livekit-plugins/livekit-plugins-resemble/livekit/plugins/resemble/tts.py (1)
  • _process_segments (277-279)
livekit-plugins/livekit-plugins-upliftai/livekit/plugins/upliftai/tts.py (1)
  • _process_segments (445-448)
livekit-agents/livekit/agents/tts/tts.py (2)
livekit-agents/livekit/agents/utils/aio/channel.py (6)
  • send_nowait (32-32)
  • send_nowait (90-98)
  • close (34-34)
  • close (42-42)
  • close (133-146)
  • Chan (49-178)
livekit-agents/livekit/agents/tokenize/token_stream.py (1)
  • flush (70-91)
livekit-plugins/livekit-plugins-neuphonic/livekit/plugins/neuphonic/tts.py (3)
livekit-agents/livekit/agents/utils/aio/channel.py (1)
  • Chan (49-178)
livekit-agents/livekit/agents/tokenize/tokenizer.py (1)
  • SentenceStream (32-64)
livekit-agents/livekit/agents/tokenize/token_stream.py (2)
  • push_text (35-67)
  • end_input (93-95)
livekit-plugins/livekit-plugins-upliftai/livekit/plugins/upliftai/tts.py (2)
livekit-agents/livekit/agents/utils/aio/channel.py (1)
  • Chan (49-178)
livekit-agents/livekit/agents/tokenize/tokenizer.py (2)
  • WordStream (80-108)
  • SentenceStream (32-64)
livekit-plugins/livekit-plugins-gradium/livekit/plugins/gradium/tts.py (2)
livekit-agents/livekit/agents/utils/aio/channel.py (1)
  • Chan (49-178)
livekit-agents/livekit/agents/tokenize/tokenizer.py (1)
  • WordStream (80-108)
🔇 Additional comments (29)
livekit-plugins/livekit-plugins-gradium/livekit/plugins/gradium/tts.py (1)

269-296: LGTM - Local channel enables proper retry behavior.

Moving segments_ch from an instance attribute to a local variable ensures a fresh channel is created for each _run invocation, correctly fixing the retry issue where the old channel would be exhausted.
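
To illustrate why run-scoped state makes retries safe, here is a rough sketch that uses `asyncio.Queue` as a stand-in for `aio.Chan`. `run_attempt` is a hypothetical function and the upper-casing is a placeholder for synthesis:

```python
import asyncio
from typing import List, Optional


async def run_attempt(words: List[str]) -> List[str]:
    """One attempt; the segments queue is local, so every call starts fresh."""
    segments: "asyncio.Queue[Optional[str]]" = asyncio.Queue()

    async def tokenize() -> None:
        for w in words:
            await segments.put(w)
        await segments.put(None)  # sentinel: end of segments

    async def synthesize() -> List[str]:
        out: List[str] = []
        while True:
            seg = await segments.get()
            if seg is None:
                return out
            out.append(seg.upper())  # placeholder for real synthesis

    _, audio = await asyncio.gather(tokenize(), synthesize())
    return audio
```

Because nothing outlives the call, a second attempt cannot observe a drained or closed channel left behind by the first.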

Minor nit: the comment on line 279 still references _segments_ch but the code now uses segments_ch.

livekit-plugins/livekit-plugins-google/livekit/plugins/google/tts.py (1)

326-380: LGTM - Consistent with the retry fix pattern.

The local segments_ch scoped to _run ensures a fresh channel for each retry attempt. The implementation correctly handles the tokenization → segmentation → synthesis pipeline.

livekit-plugins/livekit-plugins-deepgram/livekit/plugins/deepgram/tts.py (1)

257-284: LGTM - Local channel enables proper retry behavior.

The refactor correctly scopes segments_ch to the _run method, ensuring retries get a fresh channel.

Minor nit: the comment on line 267 still references _segments_ch but the code now uses segments_ch.

livekit-plugins/livekit-plugins-upliftai/livekit/plugins/upliftai/tts.py (1)

413-448: LGTM - Local channel enables proper retry behavior.

The refactor correctly scopes segments_ch to the _run method. The Union type tokenize.WordStream | tokenize.SentenceStream appropriately handles both tokenizer types.

livekit-plugins/livekit-plugins-neuphonic/livekit/plugins/neuphonic/tts.py (1)

307-353: LGTM - Consistent with the retry fix pattern.

The local segments_ch scoped to _run ensures a fresh channel for each retry attempt. The implementation is clean and follows the established pattern across all TTS plugins.

livekit-plugins/livekit-plugins-nvidia/livekit/plugins/nvidia/tts.py (4)

126-139: LGTM - Proper per-run state initialization.

All state (context_id, sent_tokenizer_stream, token_q, event_loop, done_fut) is now scoped locally to _run, enabling correct retry behavior. The captured event_loop is correctly used for call_soon_threadsafe calls from the worker thread.


141-152: LGTM - Input processing pipeline.

The tokenization flow correctly uses the local sent_tokenizer_stream and queues tokens for the worker thread. The sentinel None properly signals end-of-input.


154-178: LGTM - Worker thread implementation.

The worker correctly pulls from the local token_q, calls the Riva service synchronously, and uses call_soon_threadsafe to safely push audio to the output emitter. The done_fut.set_result in the finally block ensures proper completion signaling.


192-198: LGTM - Cleanup logic.

The defensive token_q.put(None) in the finally block ensures the worker thread exits even if _process_segments is cancelled. Awaiting done_fut before end_segment() ensures all audio is emitted. Closing sent_tokenizer_stream via aclose() properly releases resources.
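
The worker-thread handoff described in these comments can be sketched as below. It is a simplified illustration, not the NVIDIA plugin's code: `synthesize` is a hypothetical function, and upper-casing stands in for the blocking synthesis call.

```python
import asyncio
import queue
import threading
from typing import List, Optional


async def synthesize(tokens: List[str]) -> List[str]:
    loop = asyncio.get_running_loop()
    token_q: "queue.Queue[Optional[str]]" = queue.Queue()
    done_fut: "asyncio.Future[None]" = loop.create_future()
    audio: List[str] = []

    def worker() -> None:
        try:
            while True:
                tok = token_q.get()
                if tok is None:  # sentinel: no more input
                    break
                # Placeholder for a blocking synthesis call; the result is
                # handed back to the event loop thread, never touched here.
                loop.call_soon_threadsafe(audio.append, tok.upper())
        finally:
            loop.call_soon_threadsafe(done_fut.set_result, None)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    try:
        for tok in tokens:
            token_q.put(tok)
    finally:
        token_q.put(None)  # always unblock the worker, even on cancellation
    await done_fut  # callbacks run in FIFO order, so all audio is in by now
    thread.join()
    return audio
```

Every object the worker touches is local to the call, so a retry starts its own queue, future, and thread.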

livekit-plugins/livekit-plugins-sarvam/livekit/plugins/sarvam/tts.py (3)

458-458: Per-run _segments_ch addresses the retry issue.

The change from a persistent instance-level _segments_ch to an optional field initialized per-run is the correct fix. This ensures that on retries, a fresh channel is created instead of reusing a consumed/closed one.

Regarding the past review comment asking why self._segments_ch is reserved: it's kept as an instance field (rather than purely local) so that aclose() can properly close it during cleanup if the stream is terminated mid-run.


473-474: LGTM!

Creating the segments channel at the start of _run and storing a reference in self._segments_ch correctly implements the per-run lifecycle pattern, enabling retries to work with fresh channels.


827-840: LGTM!

The dynamic channel list construction correctly handles the optional _segments_ch lifecycle, preventing cleanup errors when aclose() is called before or after _run() completes.

livekit-plugins/livekit-plugins-elevenlabs/livekit/plugins/elevenlabs/tts.py (7)

374-378: LGTM!

Resetting _context_id, _text_buffer, and alignment lists at the start of _run ensures clean state for retries.


389-389: LGTM!

Creating sent_tokenizer_stream locally per-run (instead of per-instance) is the core fix that enables retries to work correctly.


542-544: LGTM!

The is_open property provides a proper tri-condition liveness check that prevents using stale WebSocket connections.


575-578: LGTM!

Only marking sent_text = True when actual non-empty text is sent correctly distinguishes between empty flushes and real content, which is essential for the retry detection logic.


732-745: Addresses the retry detection correctly.

The logic properly handles the edge case raised in the past review comment: empty input legitimately returns isFinal without audio, so the ctx.sent_text check ensures we only treat it as a retryable failure when text was actually sent but no audio was received.

Regarding the past comment about mark_non_current: this is appropriate because a connection that fails to produce audio for sent text may be in a degraded state, so marking it non-current ensures subsequent streams use a fresh connection.


463-464: LGTM!

Expanding the exception handling to include APIError, APIConnectionError, and APITimeoutError ensures the new retryable error from isFinal handling is properly propagated.


469-469: LGTM!

Closing sent_tokenizer_stream in the finally block ensures proper cleanup and prevents resource leaks on both success and failure paths.

livekit-plugins/livekit-plugins-resemble/livekit/plugins/resemble/tts.py (2)

248-248: LGTM!

Creating segments_ch locally in _run correctly implements the per-run channel lifecycle for retry support. Unlike the sarvam plugin (which keeps an optional instance field for cleanup in aclose), the resemble implementation uses a purely local channel which is appropriate since the channel's lifetime is fully contained within _run.


257-279: LGTM!

The tokenization and segment processing logic correctly uses the local segments_ch throughout, consistent with the pattern applied across other plugins (upliftai, nvidia, etc.).

livekit-agents/livekit/agents/tts/tts.py (8)

280-299: Retry gating looks correct and safer.
The added has_pushed_audio() check prevents retrying after partial output.


360-362: Replay state initialization looks good.


397-402: Retry reset hook is appropriately placed.


430-445: Streaming retry gating is consistent with non-streaming.


542-549: Buffering text for replay and lazy metrics start are solid.


562-565: Flush sentinel replay is correct.


569-570: End-of-input state tracking looks right.


663-665: Helper method is clear and useful for retry logic.


@jayeshp19 jayeshp19 requested a review from longcw January 25, 2026 09:58

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
livekit-agents/livekit/agents/tts/tts.py (1)

526-570: Potential token loss during retry reset.
push_text() returns when the input channel is closed, and _reset_for_retry() closes the channel before swapping it, so tokens pushed during the retry window can be dropped and never replayed. Consider serializing push_text/flush/end_input with _reset_for_retry, or buffering events during reset to preserve order.

Also applies to: 572-587
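
One way to serialize the operations as suggested, sketched under assumptions: `Channel` and `ReplayableInput` are hypothetical minimal classes, not the library's `aio.Chan` or `SynthesizeStream`. The token is recorded before the closed check, and the swap happens under the same lock.

```python
import threading
from typing import List


class Channel:
    """Hypothetical minimal channel: a list plus a closed flag."""

    def __init__(self) -> None:
        self.items: List[str] = []
        self.closed = False

    def send_nowait(self, item: str) -> None:
        if self.closed:
            raise RuntimeError("channel closed")
        self.items.append(item)

    def close(self) -> None:
        self.closed = True


class ReplayableInput:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: List[str] = []
        self._ch = Channel()

    def push_text(self, token: str) -> None:
        with self._lock:
            # Record first, so the token survives even if this attempt's
            # channel has already been closed by a failure.
            self._events.append(token)
            if not self._ch.closed:
                self._ch.send_nowait(token)

    def reset_for_retry(self) -> None:
        with self._lock:
            old, new = self._ch, Channel()
            for ev in self._events:
                new.send_nowait(ev)
            self._ch = new  # swap before closing the old channel
            old.close()
```

With both paths under one lock, a push either lands before the reset (and is replayed) or after it (and goes to the new channel).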

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between ce3e57e and 2eba83a.

📒 Files selected for processing (3)
  • livekit-agents/livekit/agents/tts/tts.py
  • livekit-plugins/livekit-plugins-elevenlabs/livekit/plugins/elevenlabs/tts.py
  • livekit-plugins/livekit-plugins-sarvam/livekit/plugins/sarvam/tts.py
🧰 Additional context used
🧬 Code graph analysis (2)
livekit-plugins/livekit-plugins-elevenlabs/livekit/plugins/elevenlabs/tts.py (3)
livekit-agents/livekit/agents/tokenize/token_stream.py (3)
  • flush (70-91)
  • push_text (35-67)
  • end_input (93-95)
livekit-agents/livekit/agents/_exceptions.py (2)
  • APIStatusError (45-81)
  • APIError (14-42)
livekit-agents/livekit/agents/utils/aio/channel.py (1)
  • closed (152-153)
livekit-plugins/livekit-plugins-sarvam/livekit/plugins/sarvam/tts.py (3)
livekit-agents/livekit/agents/utils/aio/channel.py (1)
  • Chan (49-178)
livekit-agents/livekit/agents/tokenize/tokenizer.py (1)
  • SentenceStream (32-64)
livekit-agents/livekit/agents/vad.py (2)
  • _FlushSentinel (99-100)
  • end_input (160-163)
🔇 Additional comments (9)
livekit-agents/livekit/agents/tts/tts.py (4)

281-296: Retry gating avoids duplicating partial audio.
The has_pushed_audio() guard makes retry decisions safer once any audio has been emitted.


360-363: Replay buffer + input-end tracking is a solid setup.
This lays the groundwork for deterministic input replay across attempts.


397-446: Retry reset flow aligns streaming with chunked behavior.
Resetting per-attempt state and gating retries on “no audio” keeps retries consistent.


665-667: Helper for retry gating is clear and pragmatic.
Duration-based checks are easy to reason about and fit the retry logic.

livekit-plugins/livekit-plugins-sarvam/livekit/plugins/sarvam/tts.py (2)

472-513: Per-run segments channel lifecycle is cleaner.
Localizing the channel and closing it in finally avoids stale state across retries.


826-833: Cleanup matches the new per-run channel ownership.
Limiting close operations to the input channel avoids touching per-run locals.

livekit-plugins/livekit-plugins-elevenlabs/livekit/plugins/elevenlabs/tts.py (3)

260-264: Connection reuse now guarded by open-state.
This prevents handing out a closed websocket as the “current” connection.

Also applies to: 542-545


375-410: Per-run tokenizer stream reset is solid.
Resetting buffers and scoping the tokenizer stream per run prevents cross-attempt leakage, and cleanup in finally is tidy.

Also applies to: 419-469


508-515: Stream context state tracking looks robust.
Tracking per-context “sent/received” state keeps finalization and retry paths well-defined.

Also applies to: 575-579, 725-746


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@livekit-agents/livekit/agents/tts/tts.py`:
- Around line 361-363: Replace the asyncio.Lock instance used for synchronous
context manager usage with a threading.Lock: change the attribute
self._input_lock to use threading.Lock() instead of asyncio.Lock() so that the
sync methods push_text(), flush(), end_input(), and _reset_for_retry() can use
"with self._input_lock:" without raising AttributeError; ensure you import
threading at top of the module and leave async usages (if any) unaffected by
this change.
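
A small, self-contained check of the failure mode named in this comment. `sync_with` and `make_async_lock` are hypothetical helpers. On Python 3.9 and 3.10 a plain `with` on an `asyncio.Lock` raises AttributeError; on 3.11 and later it raises TypeError.

```python
import asyncio
import threading


def sync_with(lock: object) -> str:
    """Try a plain `with` on the lock and report what happened."""
    try:
        with lock:  # type: ignore[attr-defined]
            return "ok"
    except (AttributeError, TypeError) as exc:
        # asyncio.Lock only supports `async with`, so it fails here.
        return type(exc).__name__


async def make_async_lock() -> asyncio.Lock:
    # Created inside a running loop to stay safe on older Python versions.
    return asyncio.Lock()


threading_result = sync_with(threading.Lock())
asyncio_result = sync_with(asyncio.run(make_async_lock()))
```

Since push_text(), flush(), and end_input() are synchronous, a `threading.Lock` is the type that fits their `with` blocks.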
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 2eba83a and a3dcbb5.

📒 Files selected for processing (3)
  • livekit-agents/livekit/agents/tts/tts.py
  • livekit-plugins/livekit-plugins-sarvam/livekit/plugins/sarvam/tts.py
  • livekit-plugins/livekit-plugins-upliftai/livekit/plugins/upliftai/tts.py
🧰 Additional context used
🧬 Code graph analysis (3)
livekit-agents/livekit/agents/tts/tts.py (2)
livekit-agents/livekit/agents/stt/stt.py (2)
  • _FlushSentinel (251-254)
  • flush (407-416)
livekit-agents/livekit/agents/utils/aio/channel.py (7)
  • closed (152-153)
  • send_nowait (32-32)
  • send_nowait (90-98)
  • close (34-34)
  • close (42-42)
  • close (133-146)
  • Chan (49-178)
livekit-plugins/livekit-plugins-upliftai/livekit/plugins/upliftai/tts.py (2)
livekit-agents/livekit/agents/utils/aio/channel.py (1)
  • Chan (49-178)
livekit-agents/livekit/agents/tokenize/tokenizer.py (2)
  • WordStream (80-108)
  • SentenceStream (32-64)
livekit-plugins/livekit-plugins-sarvam/livekit/plugins/sarvam/tts.py (3)
livekit-agents/livekit/agents/utils/aio/channel.py (1)
  • Chan (49-178)
livekit-agents/livekit/agents/tokenize/tokenizer.py (1)
  • SentenceStream (32-64)
livekit-plugins/livekit-plugins-resemble/livekit/plugins/resemble/tts.py (1)
  • _process_segments (277-279)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: type-check (3.9)
  • GitHub Check: type-check (3.13)
  • GitHub Check: unit-tests
🔇 Additional comments (10)
livekit-plugins/livekit-plugins-sarvam/livekit/plugins/sarvam/tts.py (2)

470-510: Good per-run channel scoping for retry safety.

Localizing segments_ch to _run and routing tokenization/processing through it keeps channel lifecycle per-attempt and avoids stale state reuse.


825-827: Cleanup aligns with new channel lifecycle.

Closing only _input_ch here is consistent now that segments_ch is local to _run.
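
As a rough illustration of per-run channel scoping, the sketch below creates the segment queue inside the attempt function, so each attempt starts from a fresh channel. It uses `asyncio.Queue` with a sentinel as a stand-in for the project's `Chan`; `run_attempt` and the naive sentence split are assumptions made for this example.

```python
import asyncio

_CLOSED = object()  # sentinel marking the end of the segment stream


async def run_attempt(inputs: list) -> list:
    """One attempt: the segment queue lives only for this call."""
    segments: asyncio.Queue = asyncio.Queue()  # local, never stored on self
    out: list = []

    async def tokenize() -> None:
        for text in inputs:
            for sentence in text.split(". "):
                if sentence:
                    await segments.put(sentence)
        await segments.put(_CLOSED)  # close the per-run channel

    async def process() -> None:
        while True:
            item = await segments.get()
            if item is _CLOSED:
                break
            out.append(item.upper())  # placeholder for real synthesis

    await asyncio.gather(tokenize(), process())
    return out
```

Because nothing about the queue outlives the call, a second attempt with the same inputs cannot observe a closed or half-drained channel from the first.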

livekit-plugins/livekit-plugins-upliftai/livekit/plugins/upliftai/tts.py (5)

410-414: Per-run channel isolation is a solid fix.

Localizing segments_ch ensures retries don’t reuse stale channels and aligns with replayable input.


425-433: Tokenization now correctly routes through the per-run channel.

Enqueuing the initial word_stream into the local channel and closing it at the end makes segment processing deterministic per attempt.

Also applies to: 440-443


445-448: Segment processing correctly consumes the local channel.

Iterating over segments_ch ensures only this run’s streams are processed.


483-485: Empty-text segments are cleanly finalized.

Ending the segment before returning prevents dangling segment state when no tokens are produced.


499-512: Segment lifecycle now ends with end_segment().

This aligns with streaming semantics and keeps segment boundaries consistent.
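
One way to guarantee that both the empty-text path and the normal path close the segment is a `try`/`finally` around the body. This is a sketch of the idea rather than the plugin's code, which may call `end_segment()` explicitly on each path; `RecordingEmitter` and `synthesize_segment` are hypothetical.

```python
class RecordingEmitter:
    """Hypothetical emitter that records segment boundaries for inspection."""

    def __init__(self) -> None:
        self.events: list = []

    def start_segment(self, segment_id: str) -> None:
        self.events.append(("start", segment_id))

    def push(self, data: bytes) -> None:
        self.events.append(("audio", data))

    def end_segment(self, segment_id: str) -> None:
        self.events.append(("end", segment_id))


def synthesize_segment(emitter: RecordingEmitter, segment_id: str, tokens: list) -> None:
    emitter.start_segment(segment_id)
    try:
        text = " ".join(tokens).strip()
        if not text:
            return  # nothing to synthesize; `finally` still ends the segment
        emitter.push(text.encode())  # placeholder for real audio
    finally:
        emitter.end_segment(segment_id)
```

With no tokens the emitter sees only a start and an end event, so no segment is left dangling.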

livekit-agents/livekit/agents/tts/tts.py (3)

281-296: Retry gating on emitted audio looks solid.

This should prevent retries after partial audio emission, which aligns with the retry safety goals.


399-403: Replay + retry decision flow looks consistent.

Resetting per-attempt state and gating retries on has_pushed_audio() make the retry behavior predictable.

Also applies to: 431-447


675-676: Nice helper for retry decisions.

has_pushed_audio() makes the retry policy explicit and easy to reason about.
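
The retry policy described in these comments can be sketched as a loop that replays buffered input and only retries while no audio has reached the consumer. Apart from the `has_pushed_audio()` name, everything here (`RetryableError`, `ReplayableInput`, `Output`, `run_with_retry`) is a hypothetical stand-in, not the framework's API.

```python
import asyncio


class RetryableError(Exception):
    """Hypothetical stand-in for a retryable provider error."""


class ReplayableInput:
    """Buffers pushed text so each attempt can start from the beginning."""

    def __init__(self) -> None:
        self._buffer: list = []

    def push_text(self, text: str) -> None:
        self._buffer.append(text)

    def replay(self) -> list:
        return list(self._buffer)


class Output:
    def __init__(self) -> None:
        self.chunks: list = []

    def push(self, chunk: bytes) -> None:
        self.chunks.append(chunk)

    def has_pushed_audio(self) -> bool:
        return bool(self.chunks)


async def run_with_retry(attempt, inputs: ReplayableInput, output: Output,
                         max_retry: int = 2) -> None:
    for attempt_no in range(max_retry + 1):
        try:
            await attempt(inputs.replay(), output)
            return
        except RetryableError:
            # Retrying after audio was emitted would duplicate speech,
            # so the error is re-raised in that case.
            if output.has_pushed_audio() or attempt_no == max_retry:
                raise
```

A failure before any audio is pushed leads to a replayed second attempt; a failure after partial audio propagates to the caller immediately.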



Development

Successfully merging this pull request may close these issues.

Broken streaming tts retries

2 participants