99from typing import Any , Callable
1010
1111import websockets
12+ from openai .types .beta .realtime .conversation_item import ConversationItem
1213from openai .types .beta .realtime .realtime_server_event import (
1314 RealtimeServerEvent as OpenAIRealtimeServerEvent ,
1415)
16+ from openai .types .beta .realtime .response_audio_delta_event import ResponseAudioDeltaEvent
1517from pydantic import TypeAdapter
1618from websockets .asyncio .client import ClientConnection
1719
@@ -233,6 +235,77 @@ async def interrupt(self) -> None:
233235 self ._audio_length_ms = 0.0
234236 self ._current_audio_content_index = None
235237
async def _handle_audio_delta(self, parsed: ResponseAudioDeltaEvent) -> None:
    """Record playback progress for an audio delta and forward the decoded audio.

    Stores the event's item id and content index as the current ones, starts
    the audio clock when no start time is set, adds this chunk's duration to
    the running total, and emits a ``RealtimeModelAudioEvent`` carrying the
    decoded bytes together with the event's response id.
    """
    self._current_item_id = parsed.item_id
    self._current_audio_content_index = parsed.content_index

    # No start time recorded yet: begin a fresh timeline with a zeroed length.
    clock_not_started = self._audio_start_time is None
    if clock_not_started:
        self._audio_length_ms = 0.0
        self._audio_start_time = datetime.now()

    # The delta payload is base64 text; the duration math assumes 24KHz pcm16le.
    pcm_chunk = base64.b64decode(parsed.delta)
    chunk_ms = self._calculate_audio_length_ms(pcm_chunk)
    self._audio_length_ms += chunk_ms

    audio_event = RealtimeModelAudioEvent(data=pcm_chunk, response_id=parsed.response_id)
    await self._emit_event(audio_event)
252+
253+ def _calculate_audio_length_ms (self , audio_bytes : bytes ) -> float :
254+ """Calculate audio length in milliseconds for 24KHz PCM16LE format."""
255+ return len (audio_bytes ) / 24 / 2
256+
async def _handle_output_item(self, item: ConversationItem) -> None:
    """Translate a response output item into realtime model events.

    A ``function_call`` item whose status is ``completed`` produces an
    item-updated event followed by a tool-call event. A ``message`` item
    produces a single item-updated event (without a previous_item_id).
    Any other item is ignored.
    """
    if item.type == "function_call" and item.status == "completed":
        item_id = item.id or ""
        fn_name = item.name or ""
        fn_args = item.arguments or ""

        # The same item is reused for the tool call and its output, so it is
        # reported as in progress here and completed when the output is added.
        pending_call = RealtimeToolCallItem(
            item_id=item_id,
            previous_item_id=None,
            type="function_call",
            status="in_progress",
            arguments=fn_args,
            name=fn_name,
            output=None,
        )
        await self._emit_event(RealtimeModelItemUpdatedEvent(item=pending_call))

        # NOTE(review): call_id and id are both filled from item.id. If the
        # item also carries a separate call_id, confirm which one the consumer
        # of this event expects.
        tool_call_event = RealtimeModelToolCallEvent(
            call_id=item_id,
            name=fn_name,
            arguments=fn_args,
            id=item_id,
        )
        await self._emit_event(tool_call_event)
        return

    if item.type != "message":
        return

    # Message items from output_item events carry no previous_item_id.
    payload = {
        "item_id": item.id or "",
        "type": item.type,
        "role": item.role,
        "content": item.content,
        "status": "in_progress",
    }
    validated_message = TypeAdapter(RealtimeMessageItem).validate_python(payload)
    await self._emit_event(RealtimeModelItemUpdatedEvent(item=validated_message))
292+
async def _handle_conversation_item(
    self, item: ConversationItem, previous_item_id: str | None
) -> None:
    """Emit an item-updated event for a created or retrieved conversation item.

    The item's id, type, role and content are validated into a
    ``RealtimeMessageItem`` together with ``previous_item_id`` and a fixed
    ``in_progress`` status; the validated item is then emitted.
    """
    payload = {
        "item_id": item.id or "",  # a missing id becomes an empty string
        "previous_item_id": previous_item_id,
        "type": item.type,
        "role": item.role,
        "content": item.content,
        "status": "in_progress",
    }
    validated_message = TypeAdapter(RealtimeMessageItem).validate_python(payload)
    update_event = RealtimeModelItemUpdatedEvent(item=validated_message)
    await self._emit_event(update_event)
308+
236309 async def close (self ) -> None :
237310 """Close the session."""
238311 if self ._websocket :
@@ -258,18 +331,7 @@ async def _handle_ws_event(self, event: dict[str, Any]):
258331 return
259332
260333 if parsed .type == "response.audio.delta" :
261- self ._current_audio_content_index = parsed .content_index
262- self ._current_item_id = parsed .item_id
263- if self ._audio_start_time is None :
264- self ._audio_start_time = datetime .now ()
265- self ._audio_length_ms = 0.0
266-
267- audio_bytes = base64 .b64decode (parsed .delta )
268- # Calculate audio length in ms using 24KHz pcm16le
269- self ._audio_length_ms += len (audio_bytes ) / 24 / 2
270- await self ._emit_event (
271- RealtimeModelAudioEvent (data = audio_bytes , response_id = parsed .response_id )
272- )
334+ await self ._handle_audio_delta (parsed )
273335 elif parsed .type == "response.audio.done" :
274336 await self ._emit_event (RealtimeModelAudioDoneEvent ())
275337 elif parsed .type == "input_audio_buffer.speech_started" :
@@ -291,21 +353,10 @@ async def _handle_ws_event(self, event: dict[str, Any]):
291353 parsed .type == "conversation.item.created"
292354 or parsed .type == "conversation.item.retrieved"
293355 ):
294- item = parsed .item
295356 previous_item_id = (
296357 parsed .previous_item_id if parsed .type == "conversation.item.created" else None
297358 )
298- message_item : RealtimeMessageItem = TypeAdapter (RealtimeMessageItem ).validate_python (
299- {
300- "item_id" : item .id or "" ,
301- "previous_item_id" : previous_item_id ,
302- "type" : item .type ,
303- "role" : item .role ,
304- "content" : item .content ,
305- "status" : "in_progress" ,
306- }
307- )
308- await self ._emit_event (RealtimeModelItemUpdatedEvent (item = message_item ))
359+ await self ._handle_conversation_item (parsed .item , previous_item_id )
309360 elif (
310361 parsed .type == "conversation.item.input_audio_transcription.completed"
311362 or parsed .type == "conversation.item.truncated"
@@ -341,36 +392,4 @@ async def _handle_ws_event(self, event: dict[str, Any]):
341392 parsed .type == "response.output_item.added"
342393 or parsed .type == "response.output_item.done"
343394 ):
344- item = parsed .item
345- if item .type == "function_call" and item .status == "completed" :
346- tool_call = RealtimeToolCallItem (
347- item_id = item .id or "" ,
348- previous_item_id = None ,
349- type = "function_call" ,
350- # We use the same item for tool call and output, so it will be completed by the
351- # output being added
352- status = "in_progress" ,
353- arguments = item .arguments or "" ,
354- name = item .name or "" ,
355- output = None ,
356- )
357- await self ._emit_event (RealtimeModelItemUpdatedEvent (item = tool_call ))
358- await self ._emit_event (
359- RealtimeModelToolCallEvent (
360- call_id = item .id or "" ,
361- name = item .name or "" ,
362- arguments = item .arguments or "" ,
363- id = item .id or "" ,
364- )
365- )
366- elif item .type == "message" :
367- message_item = TypeAdapter (RealtimeMessageItem ).validate_python (
368- {
369- "item_id" : item .id or "" ,
370- "type" : item .type ,
371- "role" : item .role ,
372- "content" : item .content ,
373- "status" : "in_progress" ,
374- }
375- )
376- await self ._emit_event (RealtimeModelItemUpdatedEvent (item = message_item ))
395+ await self ._handle_output_item (parsed .item )
0 commit comments