@@ -92,6 +92,11 @@ async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
9292 Wrap stream_response to create an AI client span for streaming.
9393 stream_response is an async generator, so we yield events within the span.
9494
95+ Note: We use an explicit try/finally instead of a ``with`` block so that
96+ the span is closed via ``__exit__(*sys.exc_info())`` on every exit path —
97+ normal completion, errors raised by the model, and GeneratorExit when the
98+ consumer abandons the stream (breaks early, network error, etc.).
99+
95100 Note: stream_response is called with positional args unlike get_response
96101 which uses keyword args. The signature is:
97102 stream_response(
@@ -108,17 +113,21 @@ async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
108113 prompt,
109114 )
110115 """
116+ import sys
117+
111118 # Build kwargs dict from positional args for span data capture
112119 span_kwargs = dict (kwargs )
113120 if len (args ) > 0 :
114121 span_kwargs ["system_instructions" ] = args [0 ]
115122 if len (args ) > 1 :
116123 span_kwargs ["input" ] = args [1 ]
117124
118- with ai_client_span (agent , span_kwargs ) as span :
119- span .set_data (SPANDATA .GEN_AI_RESPONSE_STREAMING , True )
125+ span = ai_client_span (agent , span_kwargs )
126+ span .__enter__ ()
127+ span .set_data (SPANDATA .GEN_AI_RESPONSE_STREAMING , True )
120128
121- streaming_response = None
129+ streaming_response = None
130+ try :
122131 async for event in original_stream_response (* args , ** kwargs ):
123132 # Capture the full response from ResponseCompletedEvent
124133 if hasattr (event , "response" ):
@@ -135,6 +144,8 @@ async def wrapped_stream_response(*args: "Any", **kwargs: "Any") -> "Any":
135144 )
136145 _set_response_model_on_agent_span (agent , response_model )
137146 update_ai_client_span (span , streaming_response )
147+ finally :
148+ span .__exit__ (* sys .exc_info ())
138149
139150 model .stream_response = wrapped_stream_response
140151