Skip to content

Commit 440e66e

Browse files
committed
Implementing stopping hte stream
1 parent e51c02b commit 440e66e

1 file changed

Lines changed: 25 additions & 10 deletions

File tree

  • libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming

libraries/Hosting/microsoft-agents-hosting-aiohttp/microsoft/agents/hosting/aiohttp/app/streaming/streaming_response.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,12 @@ def __init__(self, context: "TurnContext"):
4646
context: Context for the current turn of conversation with the user.
4747
"""
4848
self._context = context
49-
self._next_sequence = 1
49+
self._sequence_number = 1
5050
self._stream_id: Optional[str] = None
5151
self._message = ""
5252
self._attachments: Optional[List[Attachment]] = None
5353
self._ended = False
54+
self._cancelled = False
5455

5556
# Queue for outgoing activities
5657
self._queue: List[Callable[[], Activity]] = []
@@ -66,6 +67,7 @@ def __init__(self, context: "TurnContext"):
6667

6768
# Channel information
6869
self._is_streaming_channel: bool = False
70+
self._channel_id: Channels = None
6971
self._interval: float = 0.1 # Default interval for sending updates
7072
self._set_defaults(context)
7173

@@ -85,7 +87,7 @@ def citations(self) -> Optional[List[ClientCitation]]:
8587
@property
8688
def updates_sent(self) -> int:
8789
"""Gets the number of updates sent for the stream."""
88-
return self._next_sequence - 1
90+
return self._sequence_number - 1
8991

9092
def queue_informative_update(self, text: str) -> None:
9193
"""
@@ -109,11 +111,11 @@ def create_activity():
109111
Entity(
110112
type="streaminfo",
111113
stream_type="informative",
112-
stream_sequence=self._next_sequence,
114+
stream_sequence=self._sequence_number,
113115
)
114116
],
115117
)
116-
self._next_sequence += 1
118+
self._sequence_number += 1
117119
return activity
118120

119121
self._queue_activity(create_activity)
@@ -131,6 +133,8 @@ def queue_text_chunk(
131133
text: Partial text of the message to send.
132134
citations: Citations to be included in the message.
133135
"""
136+
if self._cancelled:
137+
return
134138
if self._ended:
135139
raise RuntimeError("The stream has already ended.")
136140

@@ -257,6 +261,8 @@ def _set_defaults(self, context: "TurnContext"):
257261
self._is_streaming_channel = True
258262
self._interval = 0.1
259263

264+
self._channel_id = context.activity.channel_id
265+
260266
def _queue_next_chunk(self) -> None:
261267
"""
262268
Queues the next chunk of text to be sent to the client.
@@ -280,7 +286,7 @@ def create_activity():
280286
Entity(
281287
type="streaminfo",
282288
stream_type="final",
283-
stream_sequence=self._next_sequence,
289+
stream_sequence=self._sequence_number,
284290
)
285291
],
286292
)
@@ -293,13 +299,13 @@ def create_activity():
293299
Entity(
294300
type="streaminfo",
295301
stream_type="streaming",
296-
stream_sequence=self._next_sequence,
302+
stream_sequence=self._sequence_number,
297303
)
298304
],
299305
)
300306
else:
301307
return
302-
self._next_sequence += 1
308+
self._sequence_number += 1
303309
return activity
304310

305311
self._queue_activity(create_activity)
@@ -326,8 +332,17 @@ async def _drain_queue(self) -> None:
326332
if activity:
327333
await self._send_activity(activity)
328334
except Exception as err:
329-
logger.error(f"Error occurred when sending activity while streaming: {err}")
330-
raise
335+
if (
336+
"403" in str(err)
337+
and self._context.activity.channel_id == Channels.ms_teams
338+
):
339+
logger.warning("Teams channel stopped the stream.")
340+
self._cancelled = True
341+
else:
342+
logger.error(
343+
f"Error occurred when sending activity while streaming: {err}"
344+
)
345+
raise
331346
finally:
332347
self._queue_sync = None
333348

@@ -390,7 +405,7 @@ async def _send_activity(self, activity: Activity) -> None:
390405

391406
# Send activity
392407
response = await self._context.send_activity(activity)
393-
await asyncio.sleep(1)
408+
await asyncio.sleep(self._interval)
394409

395410
# Save assigned stream ID
396411
if not self._stream_id and response:

0 commit comments

Comments
 (0)