From 386a9fd55246b9d4915cbb96fc8ad4ed50200b2e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 14 May 2026 19:59:04 +0000 Subject: [PATCH 1/2] Add SSE keep-alive to invocations server (env-var driven) Agent-Logs-Url: https://github.com/Azure/azure-sdk-for-python/sessions/bbcab784-0e85-4a7e-a4ad-64e15c50f26e Co-authored-by: ankitbko <3169316+ankitbko@users.noreply.github.com> --- .../CHANGELOG.md | 2 + .../ai/agentserver/invocations/_invocation.py | 70 ++++++---- .../tests/test_sse_keepalive.py | 124 ++++++++++++++++++ 3 files changed, 169 insertions(+), 27 deletions(-) create mode 100644 sdk/agentserver/azure-ai-agentserver-invocations/tests/test_sse_keepalive.py diff --git a/sdk/agentserver/azure-ai-agentserver-invocations/CHANGELOG.md b/sdk/agentserver/azure-ai-agentserver-invocations/CHANGELOG.md index 6ead3c39d58d..579a3e6ecffb 100644 --- a/sdk/agentserver/azure-ai-agentserver-invocations/CHANGELOG.md +++ b/sdk/agentserver/azure-ai-agentserver-invocations/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features Added +- SSE keep-alive comment frames (`: keep-alive`) are now interleaved into `text/event-stream` responses returned by invoke handlers when the `SSE_KEEPALIVE_INTERVAL` environment variable is set to a positive integer (resolved via `AgentConfig.sse_keepalive_interval`). This prevents idle SSE connections from being closed by intermediate proxies and brings the invocations server to parity with the responses server. + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/agentserver/azure-ai-agentserver-invocations/azure/ai/agentserver/invocations/_invocation.py b/sdk/agentserver/azure-ai-agentserver-invocations/azure/ai/agentserver/invocations/_invocation.py index bf3120974fa0..492195e22f55 100644 --- a/sdk/agentserver/azure-ai-agentserver-invocations/azure/ai/agentserver/invocations/_invocation.py +++ b/sdk/agentserver/azure-ai-agentserver-invocations/azure/ai/agentserver/invocations/_invocation.py @@ -288,17 +288,25 @@ def _wrap_streaming_response( response: StreamingResponse, otel_span: Any, ) -> StreamingResponse: - """Wrap a streaming response's body iterator with span lifecycle and context. - - Two layers of wrapping are applied: - - 1. **Inner (tracing):** ``trace_stream`` wraps the body iterator so - the OTel span covers the full streaming duration and is ended - when iteration completes. - 2. **Outer (context):** A second async generator re-attaches the span - as the current context for the duration of streaming, so that - child spans created by user handler code (e.g. Agent Framework) - are correctly parented under this span. + """Wrap a streaming response's body iterator with span lifecycle, context, and SSE keep-alive. + + Up to three layers of wrapping are applied (from innermost to + outermost): + + 1. **Tracing:** ``trace_stream`` wraps the body iterator so the OTel + span covers the full streaming duration and is ended when + iteration completes. Skipped when *otel_span* is ``None``. + 2. **Context:** An async generator re-attaches the span as the + current context for the duration of streaming, so child spans + created by user handler code (e.g. Agent Framework) are + correctly parented under this span. Skipped when *otel_span* + is ``None``. + 3. **SSE keep-alive:** When the response media type is + ``text/event-stream`` and ``AgentConfig.sse_keepalive_interval`` + is positive (driven by the ``SSE_KEEPALIVE_INTERVAL`` env var + set on the container), :meth:`AgentServerHost.sse_keepalive_stream` + interleaves ``: keep-alive`` SSE comment frames into idle + streams so intermediaries do not close the connection. :param response: The ``StreamingResponse`` returned by the user handler. :type response: ~starlette.responses.StreamingResponse @@ -307,23 +315,31 @@ def _wrap_streaming_response( :return: The same response object, with its body_iterator replaced. :rtype: ~starlette.responses.StreamingResponse """ - if otel_span is None: - return response - - # Inner wrap: trace_stream ends the span when iteration completes. - traced = trace_stream(response.body_iterator, otel_span) - - # Outer wrap: re-attach span as current context during streaming - # so child spans are correctly parented. - async def _iter_with_context(): # type: ignore[return-value] - token = set_current_span(otel_span) - try: - async for chunk in traced: - yield chunk - finally: - detach_context(token) + if otel_span is not None: + # Inner wrap: trace_stream ends the span when iteration completes. + traced = trace_stream(response.body_iterator, otel_span) + + # Middle wrap: re-attach span as current context during streaming + # so child spans are correctly parented. + async def _iter_with_context(): # type: ignore[return-value] + token = set_current_span(otel_span) + try: + async for chunk in traced: + yield chunk + finally: + detach_context(token) + + response.body_iterator = _iter_with_context() + + # Outer wrap: interleave SSE keep-alive frames for text/event-stream + # responses when the platform has configured a positive interval via + # the SSE_KEEPALIVE_INTERVAL env var (resolved by AgentConfig). + keepalive_interval = self.config.sse_keepalive_interval + if keepalive_interval > 0 and response.media_type == "text/event-stream": + response.body_iterator = AgentServerHost.sse_keepalive_stream( + response.body_iterator, keepalive_interval + ) - response.body_iterator = _iter_with_context() return response # ------------------------------------------------------------------ diff --git a/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_sse_keepalive.py b/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_sse_keepalive.py new file mode 100644 index 000000000000..0386a0f64139 --- /dev/null +++ b/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_sse_keepalive.py @@ -0,0 +1,124 @@ +# --------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# --------------------------------------------------------- +"""Tests for SSE keep-alive interleaving on the invocations server. + +The invocations server uses the ``SSE_KEEPALIVE_INTERVAL`` environment +variable (resolved via :class:`AgentConfig`) to drive keep-alive frame +injection. These tests exercise both the env-var-driven path and the +default-disabled path, and also verify that keep-alive is only injected +for ``text/event-stream`` responses (not for arbitrary streaming +content types such as NDJSON). +""" +import asyncio + +import pytest +from httpx import ASGITransport, AsyncClient +from starlette.requests import Request +from starlette.responses import StreamingResponse + +from azure.ai.agentserver.invocations import InvocationAgentServerHost + + +def _make_slow_sse_agent(delay_seconds: float = 0.6, event_count: int = 2) -> InvocationAgentServerHost: + """Agent whose invoke handler yields SSE events spaced by *delay_seconds*.""" + app = InvocationAgentServerHost() + + @app.invoke_handler + async def handle(_request: Request) -> StreamingResponse: + async def _events(): + for i in range(event_count): + if i > 0: + await asyncio.sleep(delay_seconds) + yield f"event: msg\ndata: {{\"i\": {i}}}\n\n".encode("utf-8") + + return StreamingResponse(_events(), media_type="text/event-stream") + + return app + + +def _make_slow_ndjson_agent(delay_seconds: float = 0.6, event_count: int = 2) -> InvocationAgentServerHost: + """Agent whose invoke handler streams NDJSON (not SSE) with delays.""" + app = InvocationAgentServerHost() + + @app.invoke_handler + async def handle(_request: Request) -> StreamingResponse: + async def _events(): + for i in range(event_count): + if i > 0: + await asyncio.sleep(delay_seconds) + yield f'{{"i": {i}}}\n'.encode("utf-8") + + return StreamingResponse(_events(), media_type="application/x-ndjson") + + return app + + +def _parse_lines(text: str) -> list[str]: + return text.splitlines() + + +# --------------------------------------------------------------------------- +# Default (env var unset) — no keep-alive frames are emitted +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_sse_keepalive_disabled_by_default(monkeypatch): + """With SSE_KEEPALIVE_INTERVAL unset, no ``: keep-alive`` lines appear.""" + monkeypatch.delenv("SSE_KEEPALIVE_INTERVAL", raising=False) + app = _make_slow_sse_agent(delay_seconds=0.4, event_count=2) + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + resp = await client.post("/invocations", content=b"") + assert resp.status_code == 200 + lines = _parse_lines(resp.text) + + keep_alives = [line for line in lines if line.startswith(": keep-alive")] + assert keep_alives == [] + + +# --------------------------------------------------------------------------- +# Env-var driven — keep-alive frames are injected into idle SSE streams +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_sse_keepalive_interleaves_frames_when_env_var_set(monkeypatch): + """When SSE_KEEPALIVE_INTERVAL is set, ``: keep-alive`` frames appear + during gaps between handler events.""" + monkeypatch.setenv("SSE_KEEPALIVE_INTERVAL", "1") + # Construct the host AFTER setting the env var so AgentConfig.from_env() + # picks up the value. + app = _make_slow_sse_agent(delay_seconds=2.5, event_count=2) + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + resp = await client.post("/invocations", content=b"") + assert resp.status_code == 200 + lines = _parse_lines(resp.text) + + keep_alives = [line for line in lines if line.startswith(": keep-alive")] + assert len(keep_alives) >= 1, f"Expected at least one keep-alive comment, got lines={lines!r}" + # Original handler events are still present and intact. + assert any(line == "event: msg" for line in lines) + assert any(line.startswith("data:") for line in lines) + + +# --------------------------------------------------------------------------- +# Keep-alive must not be applied to non-SSE streaming responses +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_sse_keepalive_not_applied_to_non_sse_streams(monkeypatch): + """Keep-alive comment frames must not be injected into NDJSON streams + even when SSE_KEEPALIVE_INTERVAL is set.""" + monkeypatch.setenv("SSE_KEEPALIVE_INTERVAL", "1") + app = _make_slow_ndjson_agent(delay_seconds=2.5, event_count=2) + transport = ASGITransport(app=app) + async with AsyncClient(transport=transport, base_url="http://testserver") as client: + resp = await client.post("/invocations", content=b"") + assert resp.status_code == 200 + body = resp.text + + assert ": keep-alive" not in body From 5952f86452756fe1a751be84c1eb2ff859d43035 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 15 May 2026 06:16:47 +0000 Subject: [PATCH 2/2] Fix cspell failure on test_sse_keepalive.py Agent-Logs-Url: https://github.com/Azure/azure-sdk-for-python/sessions/97517d60-af55-4954-859e-150a27a2c48d Co-authored-by: ankitbko <3169316+ankitbko@users.noreply.github.com> --- .../tests/test_sse_keepalive.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_sse_keepalive.py b/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_sse_keepalive.py index 0386a0f64139..11d0c0dc1191 100644 --- a/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_sse_keepalive.py +++ b/sdk/agentserver/azure-ai-agentserver-invocations/tests/test_sse_keepalive.py @@ -74,8 +74,8 @@ async def test_sse_keepalive_disabled_by_default(monkeypatch): assert resp.status_code == 200 lines = _parse_lines(resp.text) - keep_alives = [line for line in lines if line.startswith(": keep-alive")] - assert keep_alives == [] + keepalive_lines = [line for line in lines if line.startswith(": keep-alive")] + assert keepalive_lines == [] # --------------------------------------------------------------------------- @@ -97,8 +97,8 @@ async def test_sse_keepalive_interleaves_frames_when_env_var_set(monkeypatch): assert resp.status_code == 200 lines = _parse_lines(resp.text) - keep_alives = [line for line in lines if line.startswith(": keep-alive")] - assert len(keep_alives) >= 1, f"Expected at least one keep-alive comment, got lines={lines!r}" + keepalive_lines = [line for line in lines if line.startswith(": keep-alive")] + assert len(keepalive_lines) >= 1, f"Expected at least one keep-alive comment, got lines={lines!r}" # Original handler events are still present and intact. assert any(line == "event: msg" for line in lines) assert any(line.startswith("data:") for line in lines)