From 1f8f016a54c137a92a58d537fa2ae8e948a359e1 Mon Sep 17 00:00:00 2001 From: Gavin Aguiar Date: Wed, 31 Dec 2025 11:26:22 -0600 Subject: [PATCH 1/6] Streaming sample for azurefunctions --- .../azure_functions/03_callbacks/README.md | 83 ---- .../azure_functions/03_callbacks/demo.http | 30 -- .../03_callbacks/function_app.py | 185 -------- .../03_reliable_streaming/README.md | 438 ++++++++++++++++++ .../03_reliable_streaming/demo.http | 167 +++++++ .../03_reliable_streaming/function_app.py | 343 ++++++++++++++ .../host.json | 0 .../local.settings.json.template | 4 +- .../redis_stream_response_handler.py | 201 ++++++++ .../requirements.txt | 3 +- .../03_reliable_streaming/tools.py | 165 +++++++ 11 files changed, 1319 insertions(+), 300 deletions(-) delete mode 100644 python/samples/getting_started/azure_functions/03_callbacks/README.md delete mode 100644 python/samples/getting_started/azure_functions/03_callbacks/demo.http delete mode 100644 python/samples/getting_started/azure_functions/03_callbacks/function_app.py create mode 100644 python/samples/getting_started/azure_functions/03_reliable_streaming/README.md create mode 100644 python/samples/getting_started/azure_functions/03_reliable_streaming/demo.http create mode 100644 python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py rename python/samples/getting_started/azure_functions/{03_callbacks => 03_reliable_streaming}/host.json (100%) rename python/samples/getting_started/azure_functions/{03_callbacks => 03_reliable_streaming}/local.settings.json.template (74%) create mode 100644 python/samples/getting_started/azure_functions/03_reliable_streaming/redis_stream_response_handler.py rename python/samples/getting_started/azure_functions/{03_callbacks => 03_reliable_streaming}/requirements.txt (59%) create mode 100644 python/samples/getting_started/azure_functions/03_reliable_streaming/tools.py diff --git a/python/samples/getting_started/azure_functions/03_callbacks/README.md b/python/samples/getting_started/azure_functions/03_callbacks/README.md deleted file mode 100644 index 09e50bcfd1..0000000000 --- a/python/samples/getting_started/azure_functions/03_callbacks/README.md +++ /dev/null @@ -1,83 +0,0 @@ -# Callback Telemetry Sample - -This sample demonstrates how to use the Durable Extension for Agent Framework's response callbacks to observe -streaming updates and final agent responses in real time. The `ConversationAuditTrail` callback -records each chunk received from the Azure OpenAI agent and exposes the collected events through -an HTTP API that can be polled by a web client or dashboard. - -## Highlights - -- Registers a default `AgentResponseCallbackProtocol` implementation that logs streaming and final - responses. -- Persists callback events in an in-memory store and exposes them via - `GET /api/agents/{agentName}/callbacks/{thread_id}`. -- Shows how to reset stored callback events with `DELETE /api/agents/{agentName}/callbacks/{thread_id}`. -- Works alongside the standard `/api/agents/{agentName}/run` endpoint so you can correlate callback - telemetry with agent responses. - -## Prerequisites - -Complete the shared environment setup steps in `../README.md`, including creating a virtual environment, installing dependencies, and configuring Azure OpenAI credentials and storage settings. - -> **Note:** This is a streaming example that currently uses a local in-memory store for simplicity. -> For distributed environments, consider using Redis, Service Bus, or another pub/sub mechanism for -> callback coordination. - -## Running the Sample - -Send a prompt to the agent: - -```bash -curl -X POST http://localhost:7071/api/agents/CallbackAgent/run \ - -H "Content-Type: application/json" \ - -d '{"message": "Tell me a short joke"}' -``` - -> **Note:** The run endpoint waits for the agent response by default. To return immediately, set the `x-ms-wait-for-response` header or include `"wait_for_response": false` in the request body. - -Poll callback telemetry (replace `` with the value from the POST response): - -```bash -curl http://localhost:7071/api/agents/CallbackAgent/callbacks/ -``` - -Reset stored events: - -```bash -curl -X DELETE http://localhost:7071/api/agents/CallbackAgent/callbacks/ -``` - -## Expected Output - -When you call `GET /api/agents/CallbackAgent/callbacks/{thread_id}` after sending a request to the agent, -the API returns a list of streaming and final callback events similar to the following: - -```json -[ - { - "timestamp": "2024-01-01T00:00:00Z", - "agent_name": "CallbackAgent", - "thread_id": "", - "correlation_id": "", - "request_message": "Tell me a short joke", - "event_type": "stream", - "update_kind": "text", - "text": "Sure, here's a joke..." - }, - { - "timestamp": "2024-01-01T00:00:01Z", - "agent_name": "CallbackAgent", - "thread_id": "", - "correlation_id": "", - "request_message": "Tell me a short joke", - "event_type": "final", - "response_text": "Why did the cloud...", - "usage": { - "type": "usage_details", - "input_token_count": 159, - "output_token_count": 29, - "total_token_count": 188 - } - } -] -``` diff --git a/python/samples/getting_started/azure_functions/03_callbacks/demo.http b/python/samples/getting_started/azure_functions/03_callbacks/demo.http deleted file mode 100644 index 771ed38027..0000000000 --- a/python/samples/getting_started/azure_functions/03_callbacks/demo.http +++ /dev/null @@ -1,30 +0,0 @@ -### Callback Sample - API Tests -### Use with VS Code REST Client or another HTTP testing tool. -### -### Endpoints introduced in this sample: -### - POST /api/agents/{agentName}/run : send a message to the agent -### - GET /api/agents/{agentName}/callbacks/{thread_id} : retrieve callback telemetry -### - DELETE /api/agents/{agentName}/callbacks/{thread_id} : clear stored callback events - -@baseUrl = http://localhost:7071 -@agentName = CallbackAgent -@agentRoute = {{baseUrl}}/api/agents/{{agentName}} -@thread_id = test-thread-00 - -### Health Check -GET {{baseUrl}}/api/health - -### Send message (callbacks will capture streaming + final response) -POST {{agentRoute}}/run -Content-Type: application/json - -{ - "message": "Generate a short weather update for Paris and mention streaming callbacks.", - "thread_id": "{{thread_id}}" -} - -### Inspect callback telemetry -GET {{agentRoute}}/callbacks/{{thread_id}} - -### Clear stored callback telemetry for the thread -DELETE {{agentRoute}}/callbacks/{{thread_id}} diff --git a/python/samples/getting_started/azure_functions/03_callbacks/function_app.py b/python/samples/getting_started/azure_functions/03_callbacks/function_app.py deleted file mode 100644 index e6702f6586..0000000000 --- a/python/samples/getting_started/azure_functions/03_callbacks/function_app.py +++ /dev/null @@ -1,185 +0,0 @@ -"""Capture agent response callbacks inside Azure Functions. - -Components used in this sample: -- AzureOpenAIChatClient to build an agent that streams interim updates. -- AgentFunctionApp with a default AgentResponseCallbackProtocol implementation. -- Azure Functions HTTP triggers that expose callback telemetry via REST. - -Prerequisites: set `AZURE_OPENAI_ENDPOINT`, `AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`, and either -`AZURE_OPENAI_API_KEY` or authenticate with Azure CLI before starting the Functions host.""" - -import json -import logging -from collections import defaultdict -from datetime import datetime, timezone -from typing import Any, DefaultDict - -import azure.functions as func -from agent_framework import AgentRunResponseUpdate -from agent_framework.azure import ( - AgentCallbackContext, - AgentFunctionApp, - AgentResponseCallbackProtocol, - AzureOpenAIChatClient, -) -from azure.identity import AzureCliCredential - -logger = logging.getLogger(__name__) - - -# 1. Maintain an in-memory store for callback events keyed by thread ID. -# NOTE: This is a streaming example using a local console logger. For distributed environments, -# consider using Redis or Service Bus for callback coordination across multiple instances. -CallbackStore = DefaultDict[str, list[dict[str, Any]]] -callback_events: CallbackStore = defaultdict(list) - - -def _serialize_usage(usage: Any) -> Any: - """Best-effort serialization for agent usage metadata.""" - - if usage is None: - return None - - model_dump = getattr(usage, "model_dump", None) - if callable(model_dump): - return model_dump() - - to_dict = getattr(usage, "to_dict", None) - if callable(to_dict): - return to_dict() - - return str(usage) - - -class ConversationAuditTrail(AgentResponseCallbackProtocol): - """Callback that records streaming chunks and final responses for later inspection.""" - - def __init__(self) -> None: - self._logger = logging.getLogger("durableagent.samples.callbacks.audit") - - async def on_streaming_response_update( - self, - update: AgentRunResponseUpdate, - context: AgentCallbackContext, - ) -> None: - event = self._build_base_event(context) - event.update( - { - "event_type": "stream", - "update_kind": getattr(update, "kind", "text"), - "text": getattr(update, "text", None), - } - ) - thread_id = context.thread_id or "" - callback_events[thread_id].append(event) - - preview = event.get("text") or event.get("update_kind") - self._logger.info( - "[%s][%s] streaming chunk: %s", - context.agent_name, - context.correlation_id, - preview, - ) - - async def on_agent_response(self, response, context: AgentCallbackContext) -> None: - event = self._build_base_event(context) - event.update( - { - "event_type": "final", - "response_text": getattr(response, "text", None), - "usage": _serialize_usage(getattr(response, "usage_details", None)), - } - ) - thread_id = context.thread_id or "" - callback_events[thread_id].append(event) - - self._logger.info( - "[%s][%s] final response recorded", - context.agent_name, - context.correlation_id, - ) - - @staticmethod - def _build_base_event(context: AgentCallbackContext) -> dict[str, Any]: - thread_id = context.thread_id - return { - "timestamp": datetime.now(timezone.utc).isoformat(), - "agent_name": context.agent_name, - "thread_id": thread_id, - "correlation_id": context.correlation_id, - "request_message": context.request_message, - } - - -# 2. Create the agent that will emit streaming updates and final responses. -callback_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( - name="CallbackAgent", - instructions=( - "You are a friendly assistant that narrates actions while responding. " - "Keep answers concise and acknowledge when callbacks capture streaming updates." - ), -) - - -# 3. Register the agent inside AgentFunctionApp with a default callback instance. -audit_callback = ConversationAuditTrail() -app = AgentFunctionApp(enable_health_check=True, default_callback=audit_callback) -app.add_agent(callback_agent) - - -@app.function_name("get_callback_events") -@app.route(route="agents/{agent_name}/callbacks/{thread_id}", methods=["GET"]) -async def get_callback_events(req: func.HttpRequest) -> func.HttpResponse: - """Return all callback events collected for a thread.""" - - thread_id = req.route_params.get("thread_id", "") - events = callback_events.get(thread_id, []) - return func.HttpResponse( - json.dumps(events, indent=2), - status_code=200, - mimetype="application/json", - ) - - -@app.function_name("reset_callback_events") -@app.route(route="agents/{agent_name}/callbacks/{thread_id}", methods=["DELETE"]) -async def reset_callback_events(req: func.HttpRequest) -> func.HttpResponse: - """Clear the stored callback events for a thread.""" - - thread_id = req.route_params.get("thread_id", "") - callback_events.pop(thread_id, None) - return func.HttpResponse(status_code=204) - - -""" -Expected output when querying `GET /api/agents/CallbackAgent/callbacks/{thread_id}`: - -HTTP/1.1 200 OK -[ - { - "timestamp": "2024-01-01T00:00:00Z", - "agent_name": "CallbackAgent", - "thread_id": "", - "correlation_id": "", - "request_message": "Tell me a short joke", - "event_type": "stream", - "update_kind": "text", - "text": "Sure, here's a joke..." - }, - { - "timestamp": "2024-01-01T00:00:01Z", - "agent_name": "CallbackAgent", - "thread_id": "", - "correlation_id": "", - "request_message": "Tell me a short joke", - "event_type": "final", - "response_text": "Why did the cloud...", - "usage": { - "type": "usage_details", - "input_token_count": 159, - "output_token_count": 29, - "total_token_count": 188 - } - } -] -""" diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md b/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md new file mode 100644 index 0000000000..d182997fcd --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md @@ -0,0 +1,438 @@ +# Agent Response Callbacks with Redis Streaming + +This sample demonstrates how to implement **agent response callbacks** for durable agents using Redis Streams for persistent, reliable message delivery. It shows how to capture streaming agent responses via callbacks and persist them to Redis, enabling clients to disconnect and reconnect without losing messages. + +## Key Concepts Demonstrated + +- **Durable Agents**: Uses `AgentFunctionApp` for orchestrated background agent execution +- **Persistent Message Delivery**: Agent responses are written to Redis Streams via a callback +- **Redis Streams**: Messages persist with configurable TTL (default 10 minutes) for reliable delivery +- **Cursor-Based Resumption**: Clients can resume from any point using Redis stream entry IDs +- **Fire-and-Forget Invocation**: Agents run asynchronously in the background +- **Agent Response Callbacks**: Demonstrates how to capture and persist streaming agent responses + +## Architecture + +The sample creates a travel planning agent using the durable agents pattern with Redis streaming: + +### Endpoints + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/api/agents/TravelPlanner/run` | POST | **Standard:** Starts a durable agent run (returns 202 Accepted) | +| `/api/agent/stream/{conversation_id}` | GET | **Custom:** Streams chunks from Redis with cursor-based resumption | +| `/api/agents/TravelPlanner/run/{conversation_id}` | GET | **Standard:** Check status of agent run | +| `/api/health` | GET | Health check endpoint | + +### Flow + +1. **Client sends prompt** to `/api/agents/TravelPlanner/run` (standard endpoint) +2. **Function app returns 202 Accepted** with `conversation_id` and `correlation_id` +3. **Agent runs in background** via durable orchestration +4. **Redis callback writes chunks** to Redis Stream as agent generates responses +5. **Client calls** `/api/agent/stream/{conversation_id}` (custom endpoint) to read chunks from Redis +6. **Messages persist with TTL** allowing clients to resume from any cursor position using optional `cursor` parameter + +### Components + +```python +# Redis callback writes streaming updates to Redis +class RedisStreamCallback(AgentResponseCallbackProtocol): + async def on_streaming_response_update(self, update, context): + # Write chunk to Redis Stream with sequence number and timestamp + + async def on_agent_response(self, response, context): + # Write end-of-stream marker when agent completes + +# AgentFunctionApp with durable agents and Redis callback +app = AgentFunctionApp( + agents=[create_travel_agent()], + default_callback=redis_callback, +) +``` + +## Prerequisites + +Before running this sample, ensure you have: + +1. **Azure OpenAI**: Set up an Azure OpenAI resource with a chat deployment +2. **Redis**: Running locally or in Docker for persistent message storage +3. **Durable Task Scheduler (DTS)**: Running locally for orchestration (see parent README) +4. **Azurite**: Local storage emulator for Azure Functions (see parent README) + +### Starting Redis + +Start Redis using Docker: + +```bash +docker run -d --name redis -p 6379:6379 redis:latest +``` + +To verify Redis is running: + +```bash +docker ps | grep redis +``` + +**Note:** This sample uses Redis Streams to demonstrate persistent callback storage, which is more robust than in-memory storage for production scenarios. + +## Configuration + +Update your `local.settings.json` with your Azure OpenAI credentials: + +```json +{ + "Values": { + "AZURE_OPENAI_ENDPOINT": "https://your-resource.openai.azure.com/", + "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "your-deployment-name", + "AZURE_OPENAI_API_KEY": "your-api-key-if-not-using-rbac", + "REDIS_CONNECTION_STRING": "redis://localhost:6379", + "REDIS_STREAM_TTL_MINUTES": "10", + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None", + "AzureWebJobsStorage": "UseDevelopmentStorage=true" + } +} +``` + +Configuration options: +- `REDIS_CONNECTION_STRING`: Connection string for Redis (default: `redis://localhost:6379`) +- `REDIS_STREAM_TTL_MINUTES`: Time-to-live for stream entries in minutes (default: `10`) + +## Running the Sample + +1. **Start required services** (Redis, DTS, Azurite): + ```bash + # Redis (if not already running) + docker run -d --name redis -p 6379:6379 redis:latest + + # DTS and Azurite (see parent README for instructions) + ``` + +2. **Install dependencies**: + ```bash + cd python/samples/getting_started/azure_functions/03_reliable_streaming + pip install -r requirements.txt + ``` + +3. **Start the Function App**: + ```bash + func start + ``` + +## Testing the Sample + +### Test 1: Basic Workflow (Standard + Custom Endpoints) + +**Step 1:** Start the agent run: + +```bash +curl -X POST http://localhost:7071/api/agents/TravelPlanner/run \ + -H "Content-Type: text/plain" \ + -d "Plan a 3-day trip to Tokyo" +``` + +**Response (202 Accepted):** +```json +{ + "status": "accepted", + "response": "Agent request accepted", + "message": "Plan a 3-day trip to Tokyo", + "conversation_id": "abc-123-def-456", + "correlation_id": "xyz-789-ghi-012" +} +``` + +**Step 2:** Stream chunks from Redis using the custom endpoint: + +```bash +curl http://localhost:7071/api/agent/stream/abc-123-def-456 \ + -H "Accept: text/event-stream" +``` + +**Expected Response (SSE format with chunks):** +``` +id: 1734649123456-0 +event: message +data: Here's a wonderful 3-day Tokyo itinerary... + +id: 1734649123789-0 +event: message +data: Day 1: Arrival and Shibuya... + +id: 1734649124012-0 +event: done +data: [DONE] +``` + +**Step 3 (Optional):** Resume from a specific cursor: + +```bash +# Use a cursor from an earlier SSE event +curl "http://localhost:7071/api/agent/stream/abc-123-def-456?cursor=1734649123456-0" \ + -H "Accept: text/event-stream" +``` + +### Test 2: Plain Text Format (for terminals) + +```bash +# Start the run +RESPONSE=$(curl -s -X POST http://localhost:7071/api/agents/TravelPlanner/run \ + -H "Content-Type: text/plain" \ + -d "Plan a weekend in Paris") + +CONV_ID=$(echo $RESPONSE | jq -r .conversation_id) + +# Stream in plain text format +curl http://localhost:7071/api/agent/stream/$CONV_ID \ + -H "Accept: text/plain" +``` + +### Test 3: Reading from Redis using redis-cli + +```bash +# Connect to Redis +docker exec -it redis redis-cli + +# View all messages for the conversation +XRANGE agent-stream:abc-123-def-456 - + + +# Example output: +# 1) 1) "1734649123456-0" +# 2) 1) "text" +# 2) "Here's a wonderful 3-day Tokyo itinerary..." +# 3) "sequence" +# 4) "0" +# 5) "timestamp" +# 6) "1734649123456" +# 2) 1) "1734649123789-0" +# 2) 1) "text" +# 2) "Day 1: Arrival and Shibuya..." +# 3) "sequence" +# 4) "1" +``` + +### Test 2: Check Agent Status + +```bash +curl http://localhost:7071/api/agents/TravelPlanner/run/abc-123-def-456 +``` + +Returns the orchestration status (Running, Completed, Failed, etc.). + +### Test 3: Using Python to Read from Redis + +```python +import asyncio +from datetime import timedelta +import redis.asyncio as aioredis +from redis_stream_response_handler import RedisStreamResponseHandler + +async def read_agent_response(conversation_id: str): + redis_client = aioredis.from_url("redis://localhost:6379") + handler = RedisStreamResponseHandler( + redis_client=redis_client, + stream_ttl=timedelta(minutes=10) + ) + + async for chunk in handler.read_stream(conversation_id): + if chunk.is_done: + print("\n[Agent completed]") + break + if chunk.text: + print(chunk.text, end="", flush=True) + +# Usage +asyncio.run(read_agent_response("abc-123-def-456")) +``` + +### Test 7: Using the demo.http File + +If you have VS Code with the REST Client extension: + +1. Open `demo.http` in VS Code +2. Click "Send Request" above any test +3. The `conversation_id` is automatically captured for subsequent requests +4. Try both the custom `/agent/create` and standard `/agents/TravelPlanner/run` endpoints + +## How It Works + +### Durable Agents Pattern + +This sample uses the **durable agents pattern** from `AgentFunctionApp`: + +1. **Client calls `/run`** → Returns 202 Accepted immediately with `conversation_id` +2. **Durable orchestration starts** → Agent executes in background +3. **Callback writes to Redis** → Each streaming chunk is persisted with sequence number and timestamp +4. **Client reads from Redis** → Using RedisStreamResponseHandler or redis-cli +5. **Cursor-based resumption** → Client can resume from any cursor position + +### RedisStreamCallback + +The `RedisStreamCallback` class implements `AgentResponseCallbackProtocol`: + +**Writing to Redis** (`on_streaming_response_update`): +- Receives streaming updates from the agent +- Writes each chunk to a Redis Stream with metadata (sequence number, timestamp) +- Sets a TTL on the stream (default 10 minutes) + +**End-of-stream marker** (`on_agent_response`): +- Called when agent completes +- Writes a marker with `done: true` to signal completion + +### Reading from Redis + +Clients can read agent responses directly from Redis using the `RedisStreamResponseHandler`: + +```python +from redis_stream_response_handler import RedisStreamResponseHandler +import redis.asyncio as aioredis + +# Connect to Redis +redis_client = aioredis.from_url("redis://localhost:6379") +handler = RedisStreamResponseHandler(redis_client, stream_ttl=timedelta(minutes=10)) + +# Read all messages for a conversation +async for chunk in handler.read_stream(conversation_id, cursor=None): + if chunk.is_done: + print("Agent completed") + break + if chunk.text: + print(chunk.text, end="") +``` + +The `read_stream` method supports: +- Reading from beginning (cursor=None) or from a specific cursor +- Automatic handling of stream completion markers +- Error handling for missing or expired streams + +## Delivery Guarantees + +This pattern provides: + +- **At-least-once delivery**: Messages are persisted and can be read multiple times +- **Ordering**: Messages are delivered in the order they were written +- **Durability**: Messages persist until TTL expires (default 10 minutes) +- **Background execution**: Agent runs independently of client connection + +However, it does NOT guarantee: + +- **Exactly-once delivery**: Clients may receive duplicate messages if they resume from an earlier cursor +- **Infinite retention**: Messages expire after the configured TTL + +Clients should handle potential duplicates if necessary (e.g., using sequence numbers). + +## Advanced Usage + +### Custom TTL + +Set a custom TTL for stream entries (in minutes): + +```json +{ + "REDIS_STREAM_TTL_MINUTES": "30" +} +``` + +### Remote Redis + +Use a remote Redis instance: + +```json +{ + "REDIS_CONNECTION_STRING": "rediss://username:password@your-redis.com:6380" +} +``` + +### Checking Agent Status + +Use the standard durable agents status endpoint: + +```bash +curl http://localhost:7071/api/agents/TravelPlanner/run/{conversation_id} +``` + +This returns the orchestration status (Running, Completed, Failed, etc.). + +## Debugging + +### Redis CLI Commands + +Connect to Redis: +```bash +docker exec -it redis redis-cli +``` + +Check if stream exists: +```bash +XLEN agent-stream:{conversation_id} +``` + +View stream contents: +```bash +XRANGE agent-stream:{conversation_id} - + +``` + +View with entry IDs: +```bash +XREAD STREAMS agent-stream:{conversation_id} 0-0 +``` + +Check TTL: +```bash +TTL agent-stream:{conversation_id} +``` + +### Logs + +Watch the function app logs for: +- Agent execution progress +- Redis write operations +- Client connection/disconnection events + +```bash +# Logs show: +[durableagent.samples.redis_streaming] Wrote chunk to Redis: seq=0, len=52 +[durableagent.samples.redis_streaming] Agent completed, wrote end-of-stream marker +``` + +## Cleanup + +Stop and remove the Redis container: + +```bash +docker stop redis +docker rm redis +``` + +## Code Structure + +``` +03_reliable_streaming/ +├── function_app.py # Main app with AgentFunctionApp and Redis callback +├── redis_stream_response_handler.py # Redis streaming utilities +├── tools.py # Mock travel tools (weather, events) +├── requirements.txt # Python dependencies +├── host.json # Azure Functions configuration +├── local.settings.json # Local environment variables +├── demo.http # REST Client test file +└── README.md # This file +``` + +## Comparison with Standard Durable Agents + +| Feature | Standard Durable Agents | This Sample | +|---------|------------------------|-------------| +| Agent execution | Background orchestration | ✅ Same | +| Client response | 202 Accepted only | ✅ Same | +| Response callback | None | ✅ Writes to Redis Streams | +| Message persistence | None (status only) | ✅ Redis Streams with TTL | +| Resumption | Not supported | ✅ Resume from any cursor | +| Client access | Status endpoint only | ✅ Direct Redis access or RedisStreamResponseHandler | + +## Learn More + +- [Redis Streams Documentation](https://redis.io/docs/data-types/streams/) +- [Server-Sent Events Specification](https://html.spec.whatwg.org/multipage/server-sent-events.html) +- [Microsoft Agent Framework Documentation](https://github.com/microsoft/agent-framework) +- [Azure Functions Python Developer Guide](https://learn.microsoft.com/azure/azure-functions/functions-reference-python) +- [Durable Task Framework](https://github.com/microsoft/durabletask) diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/demo.http b/python/samples/getting_started/azure_functions/03_reliable_streaming/demo.http new file mode 100644 index 0000000000..8892585eb2 --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/demo.http @@ -0,0 +1,167 @@ +### +# Reliable Streaming with Redis - Demo HTTP Requests +# +# Workflow: +# 1. POST /api/agents/TravelPlanner/run - Start durable agent run (returns conversation_id) +# 2. GET /api/agent/stream/{conversation_id} - Read chunks from Redis (SSE or plain text) +# 3. Resume with ?cursor={id} parameter to skip already-processed messages +# +# Prerequisites: +# 1. Start Redis: docker run -d --name redis -p 6379:6379 redis:latest +# 2. Start DTS and Azurite (see README) +# 3. Start function app: func start +### + +@baseUrl = http://localhost:7071 +@agentName = TravelPlanner + +### +# ===== RECOMMENDED WORKFLOW ===== +### + +### +# Test 1: Start agent run (standard endpoint - triggers Redis callback) +# @name tokyo-trip +POST {{baseUrl}}/api/agents/{{agentName}}/run +Content-Type: text/plain + +Plan a 3-day trip to Tokyo + +### +# Test 2: Stream from Redis (custom endpoint with SSE format) +# Automatically captures conversation_id from Test 1 +@tokyoConvId = {{tokyo-trip.response.body.$.conversation_id}} +GET {{baseUrl}}/api/agent/stream/{{tokyoConvId}} +Accept: text/event-stream + +### +# Test 3: Stream in plain text format (good for terminals) +GET {{baseUrl}}/api/agent/stream/{{tokyoConvId}} +Accept: text/plain + +### +# Test 4: Resume from a specific cursor +# Replace {cursor_id} with actual id from SSE events +GET {{baseUrl}}/api/agent/stream/{{tokyoConvId}}?cursor={cursor_id} +Accept: text/event-stream + +### +# ===== MORE EXAMPLES ===== +### + +### +# Test 5: Weekend trip to Paris +# @name paris-trip +POST {{baseUrl}}/api/agents/{{agentName}}/run +Content-Type: text/plain + +Plan a weekend trip to Paris with romantic restaurants and museums + +### +# Test 6: Stream Paris trip +@parisConvId = {{paris-trip.response.body.$.conversation_id}} +GET {{baseUrl}}/api/agent/stream/{{parisConvId}} +Accept: text/event-stream + +### +# Test 7: Check agent status +GET {{baseUrl}}/api/agents/{{agentName}}/run/{{parisConvId}} + +### +# ===== LONG-RUNNING EXAMPLES ===== +### + +### +# Test 8: Detailed Rome itinerary (long-running, good for testing resumption) +# @name rome-trip +POST {{baseUrl}}/api/agents/{{agentName}}/run +Content-Type: text/plain + +Create a detailed 5-day itinerary for Rome including historical sites, +local restaurants, and day trips. Check weather and local events! + +### +# Test 9: Stream Rome trip +@romeConvId = {{rome-trip.response.body.$.conversation_id}} +GET {{baseUrl}}/api/agent/stream/{{romeConvId}} +Accept: text/event-stream + +### +# Test 10: Business trip comparison +# @name business-trip +POST {{baseUrl}}/api/agents/{{agentName}}/run +Content-Type: text/plain + +Compare Tokyo and New York for a 4-day business trip. +Which has better weather next week? + +### +# Test 11: Stream business trip +@businessConvId = {{business-trip.response.body.$.conversation_id}} +GET {{baseUrl}}/api/agent/stream/{{businessConvId}} +Accept: text/plain + +### +# ===== SYSTEM ENDPOINTS ===== +### + +### +# Test 12: Check health endpoint +GET {{baseUrl}}/api/health + +### +# ===== USAGE PATTERNS ===== +# +# Standard Workflow: +# 1. POST /api/agents/TravelPlanner/run → get conversation_id +# 2. GET /api/agent/stream/{conversation_id} → read chunks from Redis +# 3. Check status: GET /api/agents/TravelPlanner/run/{conversation_id} +# 4. Resume: GET /api/agent/stream/{conversation_id}?cursor={cursor_id} +# +# Demonstrating Stream Resumption: +# 1. Run Test 8 (detailed Rome itinerary) +# 2. Run Test 9 to start streaming (note some SSE event IDs) +# 3. While streaming, cancel the request (Stop button) +# 4. Run Test 9 again - messages replay from start +# 5. Or use Test 4 with a cursor to skip to a specific point +# +# Key Benefits: +# - Agent runs in background (durable orchestration) +# - Redis callback persists all chunks with TTL (default 10 minutes) +# - Clients can disconnect/reconnect without losing messages +# - Cursor-based resumption allows skipping already-processed messages +# - SSE format provides structured events with cursors for resumption +### + +### +# ===== EDGE CASE TESTS ===== +### + +### +# Test 13: Empty prompt (should return 400) +POST {{baseUrl}}/api/agents/{{agentName}}/run +Content-Type: text/plain + +### +# Test 14: Stream non-existent conversation (should timeout or return error) +GET {{baseUrl}}/api/agent/stream/non-existent-conversation-id-12345 +Accept: text/event-stream + +### +# Debugging with Redis CLI +# +# Connect to Redis: +# docker exec -it redis redis-cli +# +# Check if stream exists: +# XLEN agent-stream:{conversation_id} +# +# View stream contents: +# XRANGE agent-stream:{conversation_id} - + +# +# View stream with entry IDs: +# XREAD STREAMS agent-stream:{conversation_id} 0-0 +# +# Check TTL: +# TTL agent-stream:{conversation_id} +### diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py b/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py new file mode 100644 index 0000000000..1c6f8236f1 --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py @@ -0,0 +1,343 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Reliable streaming for durable agents using Redis Streams. + +This sample demonstrates how to implement reliable streaming for durable agents using Redis Streams. + +Components used in this sample: +- AzureOpenAIChatClient to create the travel planner agent with tools. +- AgentFunctionApp with a Redis-based callback for persistent streaming. +- Custom HTTP endpoint to resume streaming from any point using cursor-based pagination. + +Prerequisites: +- Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME +- Redis running (docker run -d --name redis -p 6379:6379 redis:latest) +- DTS and Azurite running (see parent README) +""" + +import logging +import os +import time +from datetime import timedelta + +import redis.asyncio as aioredis +from agent_framework import AgentRunResponseUpdate +import azure.functions as func +from agent_framework.azure import ( + AgentCallbackContext, + AgentFunctionApp, + AgentResponseCallbackProtocol, + AzureOpenAIChatClient, +) +from azure.identity import AzureCliCredential + +from redis_stream_response_handler import RedisStreamResponseHandler, StreamChunk +from tools import get_local_events, get_weather_forecast +from typing import Optional + +logger = logging.getLogger(__name__) + +# Configuration +REDIS_CONNECTION_STRING = os.environ.get("REDIS_CONNECTION_STRING", "redis://localhost:6379") +REDIS_STREAM_TTL_MINUTES = int(os.environ.get("REDIS_STREAM_TTL_MINUTES", "10")) + +async def get_stream_handler() -> RedisStreamResponseHandler: + """Create a new Redis stream handler for each request. + + This avoids event loop conflicts in Azure Functions by creating + a fresh Redis client in the current event loop context. + """ + # Create a new Redis client in the current event loop + redis_client = aioredis.from_url( + REDIS_CONNECTION_STRING, + encoding="utf-8", + decode_responses=False, + ) + + return RedisStreamResponseHandler( + redis_client=redis_client, + stream_ttl=timedelta(minutes=REDIS_STREAM_TTL_MINUTES), + ) + + +class RedisStreamCallback(AgentResponseCallbackProtocol): + """Callback that writes streaming updates to Redis Streams for reliable delivery. + + This enables clients to disconnect and reconnect without losing messages. + """ + + def __init__(self) -> None: + self._logger = logging.getLogger("durableagent.samples.redis_streaming") + self._sequence_numbers = {} # Track sequence per thread + + async def on_streaming_response_update( + self, + update: AgentRunResponseUpdate, + context: AgentCallbackContext, + ) -> None: + """Write streaming update to Redis Stream. + + Args: + update: The streaming response update chunk. + context: The callback context with thread_id, agent_name, etc. + """ + thread_id = context.thread_id or context.correlation_id + if not thread_id: + self._logger.warning("No thread_id or correlation_id available for streaming update") + return + + text = getattr(update, "text", None) + if not text: + return + + # Get or initialize sequence number for this thread + if thread_id not in self._sequence_numbers: + self._sequence_numbers[thread_id] = 0 + + sequence = self._sequence_numbers[thread_id] + + try: + # Get stream handler + stream_handler = await get_stream_handler() + + # Write to Redis Stream + stream_key = stream_handler._get_stream_key(thread_id) + await stream_handler._redis.xadd( + stream_key, + { + "text": text, + "sequence": str(sequence), + "timestamp": str(int(time.time() * 1000)), + } + ) + await stream_handler._redis.expire(stream_key, stream_handler._stream_ttl) + + self._sequence_numbers[thread_id] += 1 + + self._logger.info( + "[%s][%s] Wrote chunk to Redis: seq=%d, test=%s", + context.agent_name, + thread_id[:8], + sequence, + text, + ) + except Exception as ex: + self._logger.error(f"Error writing to Redis stream: {ex}", exc_info=True) + + async def on_agent_response(self, response, context: AgentCallbackContext) -> None: + """Write end-of-stream marker when agent completes. + + Args: + response: The final agent response. + context: The callback context. + """ + thread_id = context.thread_id or context.correlation_id + if not thread_id: + return + + sequence = self._sequence_numbers.get(thread_id, 0) + + try: + # Get stream handler (creates fresh Redis client in current event loop) + stream_handler = await get_stream_handler() + + # Write end-of-stream marker + stream_key = stream_handler._get_stream_key(thread_id) + await stream_handler._redis.xadd( + stream_key, + { + "text": "", + "sequence": str(sequence), + "timestamp": str(int(time.time() * 1000)), + "done": "true", + } + ) + await stream_handler._redis.expire(stream_key, stream_handler._stream_ttl) + + self._logger.info( + "[%s][%s] Agent completed, wrote end-of-stream marker", + context.agent_name, + thread_id[:8], + ) + + # Clean up sequence tracker + self._sequence_numbers.pop(thread_id, None) + except Exception as ex: + self._logger.error(f"Error writing end-of-stream marker: {ex}", exc_info=True) + + +# Create the Redis streaming callback +redis_callback = RedisStreamCallback() + + +# Create the travel planner agent +def create_travel_agent(): + """Create the TravelPlanner agent with tools.""" + return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + name="TravelPlanner", + instructions="""You are an expert travel planner who creates detailed, personalized travel itineraries. +When asked to plan a trip, you should: +1. Create a comprehensive day-by-day itinerary +2. Include specific recommendations for activities, restaurants, and attractions +3. Provide practical tips for each destination +4. Consider weather and local events when making recommendations +5. Include estimated times and logistics between activities + +Always use the available tools to get current weather forecasts and local events +for the destination to make your recommendations more relevant and timely. + +Format your response with clear headings for each day and include emoji icons +to make the itinerary easy to scan and visually appealing.""", + tools=[get_weather_forecast, get_local_events], + ) + + +# Create AgentFunctionApp with the Redis callback +app = AgentFunctionApp( + agents=[create_travel_agent()], + enable_health_check=True, + default_callback=redis_callback, + max_poll_retries=100, # Increase for longer-running agents +) + + +# Custom streaming endpoint for reading from Redis +# Use the standard /api/agents/TravelPlanner/run endpoint to start agent runs + + +@app.function_name("stream") +@app.route(route="agent/stream/{conversation_id}", methods=["GET"]) +async def stream(req: func.HttpRequest) -> func.HttpResponse: + """Resume streaming from a specific cursor position for an existing session. + + This endpoint reads all currently available chunks from Redis for the given + conversation ID, starting from the specified cursor (or beginning if no cursor). + + Use this endpoint to resume a stream after disconnection. Pass the conversation ID + and optionally a cursor (Redis entry ID) to continue from where you left off. + + Query Parameters: + cursor (optional): Redis stream entry ID to resume from. If not provided, starts from beginning. + + Response Headers: + Content-Type: text/event-stream or text/plain based on Accept header + x-conversation-id: The conversation/thread ID + + SSE Event Fields (when Accept: text/event-stream): + id: Redis stream entry ID (use as cursor for resumption) + event: "message" for content, "done" for completion, "error" for errors + data: The text content or status message + """ + try: + conversation_id = req.route_params.get("conversation_id") + if not conversation_id: + return func.HttpResponse( + "Conversation ID is required.", + status_code=400, + ) + + # Get optional cursor from query string + cursor = req.params.get("cursor") + + logger.info( + f"Resuming stream for conversation {conversation_id} from cursor: {cursor or '(beginning)'}" + ) + + # Check Accept header to determine response format + accept_header = req.headers.get("Accept", "") + use_sse_format = "text/plain" not in accept_header.lower() + + # Stream chunks from Redis + return await _stream_to_client(conversation_id, cursor, use_sse_format) + + except Exception as ex: + logger.error(f"Error in stream endpoint: {ex}", exc_info=True) + return func.HttpResponse( + f"Internal server error: {str(ex)}", + status_code=500, + ) + + +async def _stream_to_client( + conversation_id: str, + cursor: str | None, + use_sse_format: bool, +) -> func.HttpResponse: + """Stream chunks from Redis to the HTTP response. + + Args: + conversation_id: The conversation ID to stream from. + cursor: Optional cursor to resume from. If None, streams from the beginning. + use_sse_format: True to use SSE format, false for plain text. + + Returns: + HTTP response with all currently available chunks. + """ + chunks = [] + stream_handler = await get_stream_handler() + + try: + async for chunk in stream_handler.read_stream(conversation_id, cursor): + if chunk.error: + logger.warning(f"Stream error for {conversation_id}: {chunk.error}") + chunks.append(_format_error(chunk.error, use_sse_format)) + break + + if chunk.is_done: + chunks.append(_format_end_of_stream(chunk.entry_id, use_sse_format)) + break + + if chunk.text: + chunks.append(_format_chunk(chunk, use_sse_format)) + + except Exception as ex: + logger.error(f"Error reading from Redis: {ex}", exc_info=True) + chunks.append(_format_error(str(ex), use_sse_format)) + + # Return all chunks + response_body = "".join(chunks) + + return func.HttpResponse( + body=response_body, + mimetype="text/event-stream" if use_sse_format else "text/plain; charset=utf-8", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "x-conversation-id": conversation_id, + }, + ) + + +def _format_chunk(chunk: StreamChunk, use_sse_format: bool) -> str: + """Format a text chunk.""" + if use_sse_format: + return _format_sse_event("message", chunk.text, chunk.entry_id) + else: + return chunk.text + + +def _format_end_of_stream(entry_id: str, use_sse_format: bool) -> str: + """Format end-of-stream marker.""" + if use_sse_format: + return _format_sse_event("done", "[DONE]", entry_id) + else: + return "\n" + + +def _format_error(error: str, use_sse_format: bool) -> str: + """Format error message.""" + if use_sse_format: + return _format_sse_event("error", error, None) + else: + return f"\n[Error: {error}]\n" + + +def _format_sse_event(event_type: str, data: str, event_id: str | None = None) -> str: + """Format a Server-Sent Event.""" + lines = [] + if event_id: + lines.append(f"id: {event_id}") + lines.append(f"event: {event_type}") + lines.append(f"data: {data}") + lines.append("") + return "\n".join(lines) + "\n" diff --git a/python/samples/getting_started/azure_functions/03_callbacks/host.json b/python/samples/getting_started/azure_functions/03_reliable_streaming/host.json similarity index 100% rename from python/samples/getting_started/azure_functions/03_callbacks/host.json rename to python/samples/getting_started/azure_functions/03_reliable_streaming/host.json diff --git a/python/samples/getting_started/azure_functions/03_callbacks/local.settings.json.template b/python/samples/getting_started/azure_functions/03_reliable_streaming/local.settings.json.template similarity index 74% rename from python/samples/getting_started/azure_functions/03_callbacks/local.settings.json.template rename to python/samples/getting_started/azure_functions/03_reliable_streaming/local.settings.json.template index 7d6ef15f82..b87786468e 100644 --- a/python/samples/getting_started/azure_functions/03_callbacks/local.settings.json.template +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/local.settings.json.template @@ -7,6 +7,8 @@ "TASKHUB_NAME": "default", "AZURE_OPENAI_ENDPOINT": "", "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "", - "AZURE_OPENAI_API_KEY": "" + "AZURE_OPENAI_API_KEY": "", + "REDIS_CONNECTION_STRING": "redis://localhost:6379", + "REDIS_STREAM_TTL_MINUTES": "10" } } diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/redis_stream_response_handler.py b/python/samples/getting_started/azure_functions/03_reliable_streaming/redis_stream_response_handler.py new file mode 100644 index 0000000000..67110e5e30 --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/redis_stream_response_handler.py @@ -0,0 +1,201 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Redis-based streaming response handler for durable agents. + +This module provides reliable, resumable streaming of agent responses using Redis Streams +as a message broker. It enables clients to disconnect and reconnect without losing messages. +""" + +import asyncio +from dataclasses import dataclass +from datetime import timedelta +from typing import Optional +from collections.abc import AsyncIterator + +import redis.asyncio as aioredis +from agent_framework import AgentRunResponseUpdate + + +@dataclass +class StreamChunk: + """Represents a chunk of streamed data from Redis. + + Attributes: + entry_id: The Redis stream entry ID (used as cursor for resumption). + text: The text content of the chunk, if any. + is_done: Whether this is the final chunk in the stream. + error: Error message if an error occurred, otherwise None. + """ + entry_id: str + text: str | None = None + is_done: bool = False + error: str | None = None + + +class RedisStreamResponseHandler: + """Handles agent responses by persisting them to Redis Streams. + + This handler writes agent response updates to Redis Streams, enabling reliable, + resumable streaming delivery to clients. Clients can disconnect and reconnect + at any point using cursor-based pagination. + + Attributes: + MAX_EMPTY_READS: Maximum number of empty reads before timing out. + POLL_INTERVAL_MS: Interval in milliseconds between polling attempts. + """ + + MAX_EMPTY_READS = 300 + POLL_INTERVAL_MS = 1000 + + def __init__(self, redis_client: aioredis.Redis, stream_ttl: timedelta): + """Initialize the Redis stream response handler. + + Args: + redis_client: The async Redis client instance. + stream_ttl: Time-to-live for stream entries in Redis. + """ + self._redis = redis_client + self._stream_ttl = stream_ttl + + async def write_streaming_response( + self, + response_stream: AsyncIterator[AgentRunResponseUpdate], + conversation_id: str, + ) -> None: + """Write streaming response updates to Redis Stream. + + Args: + response_stream: An async iterator of response update chunks. + conversation_id: The conversation ID for this agent run. + """ + stream_key = self._get_stream_key(conversation_id) + sequence_number = 0 + + try: + async for update in response_stream: + text = update.text + + if text: + # Add text chunk to the stream + await self._redis.xadd( + stream_key, + { + "text": text, + "sequence": str(sequence_number), + "timestamp": str(int(asyncio.get_event_loop().time() * 1000)), + } + ) + await self._redis.expire(stream_key, self._stream_ttl) + sequence_number += 1 + + # Add end-of-stream marker + await self._redis.xadd( + stream_key, + { + "text": "", + "sequence": str(sequence_number), + "timestamp": str(int(asyncio.get_event_loop().time() * 1000)), + "done": "true", + } + ) + await self._redis.expire(stream_key, self._stream_ttl) + + except Exception as ex: + # Write error to stream + await self._redis.xadd( + stream_key, + { + "error": str(ex), + "sequence": str(sequence_number), + "timestamp": str(int(asyncio.get_event_loop().time() * 1000)), + } + ) + await self._redis.expire(stream_key, self._stream_ttl) + + async def read_stream( + self, + conversation_id: str, + cursor: str | None = None, + ) -> AsyncIterator[StreamChunk]: + """Read entries from a Redis Stream with cursor-based pagination. + + This method polls the Redis Stream for new entries, yielding chunks as they + become available. Clients can resume from any point using the entry_id from + a previous chunk. + + Args: + conversation_id: The conversation ID to read from. + cursor: Optional cursor to resume from. If None, starts from beginning. + + Yields: + StreamChunk instances containing text content or status markers. + """ + stream_key = self._get_stream_key(conversation_id) + start_id = cursor if cursor else "0-0" + + empty_read_count = 0 + has_seen_data = False + + while True: + try: + # Read up to 100 entries from the stream + entries = await self._redis.xread( + {stream_key: start_id}, + count=100, + block=None, + ) + + if not entries: + # No entries found + if not has_seen_data: + empty_read_count += 1 + if empty_read_count >= self.MAX_EMPTY_READS: + timeout_seconds = self.MAX_EMPTY_READS * self.POLL_INTERVAL_MS / 1000 + yield StreamChunk( + entry_id=start_id, + error=f"Stream not found or timed out after {timeout_seconds} seconds" + ) + return + + # Wait before polling again + await asyncio.sleep(self.POLL_INTERVAL_MS / 1000) + continue + + has_seen_data = True + + # Process entries from the stream + for stream_name, stream_entries in entries: + for entry_id, entry_data in stream_entries: + start_id = entry_id.decode() if isinstance(entry_id, bytes) else entry_id + + # Decode entry data + text = entry_data.get(b"text", b"").decode() if b"text" in entry_data else None + done = entry_data.get(b"done", b"").decode() if b"done" in entry_data else None + error = entry_data.get(b"error", b"").decode() if b"error" in entry_data else None + + if error: + yield StreamChunk(entry_id=start_id, error=error) + return + + if done == "true": + yield StreamChunk(entry_id=start_id, is_done=True) + return + + if text: + yield StreamChunk(entry_id=start_id, text=text) + + except Exception as ex: + yield StreamChunk(entry_id=start_id, error=str(ex)) + return + + @staticmethod + def _get_stream_key(conversation_id: str) -> str: + """Generate the Redis key for a conversation's stream. + + Args: + conversation_id: The conversation ID. + + Returns: + The Redis stream key. + """ + return f"agent-stream:{conversation_id}" diff --git a/python/samples/getting_started/azure_functions/03_callbacks/requirements.txt b/python/samples/getting_started/azure_functions/03_reliable_streaming/requirements.txt similarity index 59% rename from python/samples/getting_started/azure_functions/03_callbacks/requirements.txt rename to python/samples/getting_started/azure_functions/03_reliable_streaming/requirements.txt index 8aa2c75d80..8b3943b92f 100644 --- a/python/samples/getting_started/azure_functions/03_callbacks/requirements.txt +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/requirements.txt @@ -1,2 +1,3 @@ agent-framework-azurefunctions -azure-identity \ No newline at end of file +azure-identity +redis diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/tools.py b/python/samples/getting_started/azure_functions/03_reliable_streaming/tools.py new file mode 100644 index 0000000000..6a71fdfa03 --- /dev/null +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/tools.py @@ -0,0 +1,165 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Mock travel tools for demonstration purposes. + +In a real application, these would call actual weather and events APIs. +""" + +from typing import Annotated + + +def get_weather_forecast( + destination: Annotated[str, "The destination city or location"], + date: Annotated[str, 'The date for the forecast (e.g., "2025-01-15" or "next Monday")'], +) -> str: + """Get the weather forecast for a destination on a specific date. + + Use this to provide weather-aware recommendations in the itinerary. + + Args: + destination: The destination city or location. + date: The date for the forecast. + + Returns: + A weather forecast summary. + """ + # Mock weather data based on destination for realistic responses + weather_by_region = { + "Tokyo": ("Partly cloudy with a chance of light rain", 58, 45), + "Paris": ("Overcast with occasional drizzle", 52, 41), + "New York": ("Clear and cold", 42, 28), + "London": ("Foggy morning, clearing in afternoon", 48, 38), + "Sydney": ("Sunny and warm", 82, 68), + "Rome": ("Sunny with light breeze", 62, 48), + "Barcelona": ("Partly sunny", 59, 47), + "Amsterdam": ("Cloudy with light rain", 46, 38), + "Dubai": ("Sunny and hot", 85, 72), + "Singapore": ("Tropical thunderstorms in afternoon", 88, 77), + "Bangkok": ("Hot and humid, afternoon showers", 91, 78), + "Los Angeles": ("Sunny and pleasant", 72, 55), + "San Francisco": ("Morning fog, afternoon sun", 62, 52), + "Seattle": ("Rainy with breaks", 48, 40), + "Miami": ("Warm and sunny", 78, 65), + "Honolulu": ("Tropical paradise weather", 82, 72), + } + + # Find a matching destination or use a default + forecast = ("Partly cloudy", 65, 50) + for city, weather in weather_by_region.items(): + if city.lower() in destination.lower(): + forecast = weather + break + + condition, high_f, low_f = forecast + high_c = (high_f - 32) * 5 // 9 + low_c = (low_f - 32) * 5 // 9 + + recommendation = _get_weather_recommendation(condition) + + return f"""Weather forecast for {destination} on {date}: +Conditions: {condition} +High: {high_f}°F ({high_c}°C) +Low: {low_f}°F ({low_c}°C) + +Recommendation: {recommendation}""" + + +def get_local_events( + destination: Annotated[str, "The destination city or location"], + date: Annotated[str, 'The date to search for events (e.g., "2025-01-15" or "next week")'], +) -> str: + """Get local events and activities happening at a destination around a specific date. + + Use this to suggest timely activities and experiences. + + Args: + destination: The destination city or location. + date: The date to search for events. + + Returns: + A list of local events and activities. + """ + # Mock events data based on destination + events_by_city = { + "Tokyo": [ + "🎭 Kabuki Theater Performance at Kabukiza Theatre - Traditional Japanese drama", + "🌸 Winter Illuminations at Yoyogi Park - Spectacular light displays", + "🍜 Ramen Festival at Tokyo Station - Sample ramen from across Japan", + "🎮 Gaming Expo at Tokyo Big Sight - Latest video games and technology", + ], + "Paris": [ + "🎨 Impressionist Exhibition at Musée d'Orsay - Extended evening hours", + "🍷 Wine Tasting Tour in Le Marais - Local sommelier guided", + "🎵 Jazz Night at Le Caveau de la Huchette - Historic jazz club", + "🥐 French Pastry Workshop - Learn from master pâtissiers", + ], + "New York": [ + "🎭 Broadway Show: Hamilton - Limited engagement performances", + "🏀 Knicks vs Lakers at Madison Square Garden", + "🎨 Modern Art Exhibit at MoMA - New installations", + "🍕 Pizza Walking Tour of Brooklyn - Artisan pizzerias", + ], + "London": [ + "👑 Royal Collection Exhibition at Buckingham Palace", + "🎭 West End Musical: The Phantom of the Opera", + "🍺 Craft Beer Festival at Brick Lane", + "🎪 Winter Wonderland at Hyde Park - Rides and markets", + ], + "Sydney": [ + "🏄 Pro Surfing Competition at Bondi Beach", + "🎵 Opera at Sydney Opera House - La Bohème", + "🦘 Wildlife Night Safari at Taronga Zoo", + "🍽️ Harbor Dinner Cruise with fireworks", + ], + "Rome": [ + "🏛️ After-Hours Vatican Tour - Skip the crowds", + "🍝 Pasta Making Class in Trastevere", + "🎵 Classical Concert at Borghese Gallery", + "🍷 Wine Tasting in Roman Cellars", + ], + } + + # Find events for the destination or use generic events + events = [ + "🎭 Local theater performance", + "🍽️ Food and wine festival", + "🎨 Art gallery opening", + "🎵 Live music at local venues", + ] + + for city, city_events in events_by_city.items(): + if city.lower() in destination.lower(): + events = city_events + break + + event_list = "\n• ".join(events) + return f"""Local events in {destination} around {date}: + +• {event_list} + +💡 Tip: Book popular events in advance as they may sell out quickly!""" + + +def _get_weather_recommendation(condition: str) -> str: + """Get a recommendation based on weather conditions. + + Args: + condition: The weather condition description. + + Returns: + A recommendation string. + """ + condition_lower = condition.lower() + + if "rain" in condition_lower or "drizzle" in condition_lower: + return "Bring an umbrella and waterproof jacket. Consider indoor activities for backup." + elif "fog" in condition_lower: + return "Morning visibility may be limited. Plan outdoor sightseeing for afternoon." + elif "cold" in condition_lower: + return "Layer up with warm clothing. Hot drinks and cozy cafés recommended." + elif "hot" in condition_lower or "warm" in condition_lower: + return "Stay hydrated and use sunscreen. Plan strenuous activities for cooler morning hours." + elif "thunder" in condition_lower or "storm" in condition_lower: + return "Keep an eye on weather updates. Have indoor alternatives ready." + else: + return "Pleasant conditions expected. Great day for outdoor exploration!" From 5f67638bbea277976c8881990f67a235afc348ef Mon Sep 17 00:00:00 2001 From: Gavin Aguiar Date: Wed, 31 Dec 2025 11:51:54 -0600 Subject: [PATCH 2/6] Fixed links and sample name --- python/samples/README.md | 2 +- .../azure_functions/03_reliable_streaming/README.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/samples/README.md b/python/samples/README.md index 7d291f119e..20798e4ad0 100644 --- a/python/samples/README.md +++ b/python/samples/README.md @@ -236,7 +236,7 @@ The recommended way to use Ollama is via the native `OllamaChatClient` from the |--------|-------------| | [`getting_started/azure_functions/01_single_agent/`](./getting_started/azure_functions/01_single_agent/) | Host a single agent in Azure Functions with Durable Extension HTTP endpoints and per-session state. | | [`getting_started/azure_functions/02_multi_agent/`](./getting_started/azure_functions/02_multi_agent/) | Register multiple agents in one function app with dedicated run routes and a health check endpoint. | -| [`getting_started/azure_functions/03_callbacks/`](./getting_started/azure_functions/03_callbacks/) | Capture streaming response telemetry via Durable Extension callbacks exposed through HTTP APIs. | +| [`getting_started/azure_functions/03_reliable_streaming/`](./getting_started/azure_functions/03_reliable_streaming/) | Implement reliable streaming for durable agents using Redis Streams with cursor-based resumption. | | [`getting_started/azure_functions/04_single_agent_orchestration_chaining/`](./getting_started/azure_functions/04_single_agent_orchestration_chaining/) | Chain sequential agent executions inside a durable orchestration while preserving the shared thread context. | | [`getting_started/azure_functions/05_multi_agent_orchestration_concurrency/`](./getting_started/azure_functions/05_multi_agent_orchestration_concurrency/) | Run two agents concurrently within a durable orchestration and combine their domain-specific outputs. | | [`getting_started/azure_functions/06_multi_agent_orchestration_conditionals/`](./getting_started/azure_functions/06_multi_agent_orchestration_conditionals/) | Route orchestration logic based on structured agent responses for spam detection and reply drafting. | diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md b/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md index d182997fcd..2ecd14cb80 100644 --- a/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md @@ -431,8 +431,8 @@ docker rm redis ## Learn More -- [Redis Streams Documentation](https://redis.io/docs/data-types/streams/) +- [Redis Streams Documentation](https://redis.io/docs/latest/develop/data-types/streams/) - [Server-Sent Events Specification](https://html.spec.whatwg.org/multipage/server-sent-events.html) - [Microsoft Agent Framework Documentation](https://github.com/microsoft/agent-framework) - [Azure Functions Python Developer Guide](https://learn.microsoft.com/azure/azure-functions/functions-reference-python) -- [Durable Task Framework](https://github.com/microsoft/durabletask) +- [Durable Task Framework](https://github.com/Azure/durabletask) From 2844a382b825cbb36e87da2c08e18f886fbb00d2 Mon Sep 17 00:00:00 2001 From: Gavin Aguiar Date: Wed, 31 Dec 2025 13:43:19 -0600 Subject: [PATCH 3/6] Addressed feedback --- .../03_reliable_streaming/README.md | 8 +- .../03_reliable_streaming/function_app.py | 108 +++++++----------- .../redis_stream_response_handler.py | 95 ++++++++------- 3 files changed, 94 insertions(+), 117 deletions(-) diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md b/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md index 2ecd14cb80..fd55de8ee3 100644 --- a/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md @@ -211,7 +211,7 @@ XRANGE agent-stream:abc-123-def-456 - + # 4) "1" ``` -### Test 2: Check Agent Status +### Test 4: Check Agent Status ```bash curl http://localhost:7071/api/agents/TravelPlanner/run/abc-123-def-456 @@ -219,7 +219,7 @@ curl http://localhost:7071/api/agents/TravelPlanner/run/abc-123-def-456 Returns the orchestration status (Running, Completed, Failed, etc.). -### Test 3: Using Python to Read from Redis +### Test 5: Using Python to Read from Redis ```python import asyncio @@ -245,14 +245,14 @@ async def read_agent_response(conversation_id: str): asyncio.run(read_agent_response("abc-123-def-456")) ``` -### Test 7: Using the demo.http File +### Test 6: Using the demo.http File If you have VS Code with the REST Client extension: 1. Open `demo.http` in VS Code 2. Click "Send Request" above any test 3. The `conversation_id` is automatically captured for subsequent requests -4. Try both the custom `/agent/create` and standard `/agents/TravelPlanner/run` endpoints +4. Try the standard `/api/agents/TravelPlanner/run` endpoint to start the agent, then use `/api/agent/stream/{conversation_id}` to read the response ## How It Works diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py b/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py index 1c6f8236f1..9f11c7cb57 100644 --- a/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py @@ -17,7 +17,6 @@ import logging import os -import time from datetime import timedelta import redis.asyncio as aioredis @@ -33,7 +32,6 @@ from redis_stream_response_handler import RedisStreamResponseHandler, StreamChunk from tools import get_local_events, get_weather_forecast -from typing import Optional logger = logging.getLogger(__name__) @@ -97,30 +95,20 @@ async def on_streaming_response_update( sequence = self._sequence_numbers[thread_id] try: - # Get stream handler - stream_handler = await get_stream_handler() - - # Write to Redis Stream - stream_key = stream_handler._get_stream_key(thread_id) - await stream_handler._redis.xadd( - stream_key, - { - "text": text, - "sequence": str(sequence), - "timestamp": str(int(time.time() * 1000)), - } - ) - await stream_handler._redis.expire(stream_key, stream_handler._stream_ttl) - - self._sequence_numbers[thread_id] += 1 - - self._logger.info( - "[%s][%s] Wrote chunk to Redis: seq=%d, test=%s", - context.agent_name, - thread_id[:8], - sequence, - text, - ) + # Use context manager to ensure Redis client is properly closed + async with await get_stream_handler() as stream_handler: + # Write chunk to Redis Stream using public API + await stream_handler.write_chunk(thread_id, text, sequence) + + self._sequence_numbers[thread_id] += 1 + + self._logger.info( + "[%s][%s] Wrote chunk to Redis: seq=%d, text=%s", + context.agent_name, + thread_id[:8], + sequence, + text, + ) except Exception as ex: self._logger.error(f"Error writing to Redis stream: {ex}", exc_info=True) @@ -138,30 +126,19 @@ async def on_agent_response(self, response, context: AgentCallbackContext) -> No sequence = self._sequence_numbers.get(thread_id, 0) try: - # Get stream handler (creates fresh Redis client in current event loop) - stream_handler = await get_stream_handler() - - # Write end-of-stream marker - stream_key = stream_handler._get_stream_key(thread_id) - await stream_handler._redis.xadd( - stream_key, - { - "text": "", - "sequence": str(sequence), - "timestamp": str(int(time.time() * 1000)), - "done": "true", - } - ) - await stream_handler._redis.expire(stream_key, stream_handler._stream_ttl) - - self._logger.info( - "[%s][%s] Agent completed, wrote end-of-stream marker", - context.agent_name, - thread_id[:8], - ) - - # Clean up sequence tracker - self._sequence_numbers.pop(thread_id, None) + # Use context manager to ensure Redis client is properly closed + async with await get_stream_handler() as stream_handler: + # Write end-of-stream marker using public API + await stream_handler.write_completion(thread_id, sequence) + + self._logger.info( + "[%s][%s] Agent completed, wrote end-of-stream marker", + context.agent_name, + thread_id[:8], + ) + + # Clean up sequence tracker + self._sequence_numbers.pop(thread_id, None) except Exception as ex: self._logger.error(f"Error writing end-of-stream marker: {ex}", exc_info=True) @@ -274,25 +251,26 @@ async def _stream_to_client( HTTP response with all currently available chunks. """ chunks = [] - stream_handler = await get_stream_handler() - try: - async for chunk in stream_handler.read_stream(conversation_id, cursor): - if chunk.error: - logger.warning(f"Stream error for {conversation_id}: {chunk.error}") - chunks.append(_format_error(chunk.error, use_sse_format)) - break + # Use context manager to ensure Redis client is properly closed + async with await get_stream_handler() as stream_handler: + try: + async for chunk in stream_handler.read_stream(conversation_id, cursor): + if chunk.error: + logger.warning(f"Stream error for {conversation_id}: {chunk.error}") + chunks.append(_format_error(chunk.error, use_sse_format)) + break - if chunk.is_done: - chunks.append(_format_end_of_stream(chunk.entry_id, use_sse_format)) - break + if chunk.is_done: + chunks.append(_format_end_of_stream(chunk.entry_id, use_sse_format)) + break - if chunk.text: - chunks.append(_format_chunk(chunk, use_sse_format)) + if chunk.text: + chunks.append(_format_chunk(chunk, use_sse_format)) - except Exception as ex: - logger.error(f"Error reading from Redis: {ex}", exc_info=True) - chunks.append(_format_error(str(ex), use_sse_format)) + except Exception as ex: + logger.error(f"Error reading from Redis: {ex}", exc_info=True) + chunks.append(_format_error(str(ex), use_sse_format)) # Return all chunks response_body = "".join(chunks) diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/redis_stream_response_handler.py b/python/samples/getting_started/azure_functions/03_reliable_streaming/redis_stream_response_handler.py index 67110e5e30..e6d60735bf 100644 --- a/python/samples/getting_started/azure_functions/03_reliable_streaming/redis_stream_response_handler.py +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/redis_stream_response_handler.py @@ -7,13 +7,12 @@ """ import asyncio +import time from dataclasses import dataclass from datetime import timedelta -from typing import Optional from collections.abc import AsyncIterator import redis.asyncio as aioredis -from agent_framework import AgentRunResponseUpdate @dataclass @@ -57,60 +56,60 @@ def __init__(self, redis_client: aioredis.Redis, stream_ttl: timedelta): self._redis = redis_client self._stream_ttl = stream_ttl - async def write_streaming_response( + async def __aenter__(self): + """Enter async context manager.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Exit async context manager and close Redis connection.""" + await self._redis.aclose() + + async def write_chunk( + self, + conversation_id: str, + text: str, + sequence: int, + ) -> None: + """Write a single text chunk to the Redis Stream. + + Args: + conversation_id: The conversation ID for this agent run. + text: The text content to write. + sequence: The sequence number for ordering. + """ + stream_key = self._get_stream_key(conversation_id) + await self._redis.xadd( + stream_key, + { + "text": text, + "sequence": str(sequence), + "timestamp": str(int(time.time() * 1000)), + } + ) + await self._redis.expire(stream_key, self._stream_ttl) + + async def write_completion( self, - response_stream: AsyncIterator[AgentRunResponseUpdate], conversation_id: str, + sequence: int, ) -> None: - """Write streaming response updates to Redis Stream. + """Write an end-of-stream marker to the Redis Stream. Args: - response_stream: An async iterator of response update chunks. conversation_id: The conversation ID for this agent run. + sequence: The final sequence number. """ stream_key = self._get_stream_key(conversation_id) - sequence_number = 0 - - try: - async for update in response_stream: - text = update.text - - if text: - # Add text chunk to the stream - await self._redis.xadd( - stream_key, - { - "text": text, - "sequence": str(sequence_number), - "timestamp": str(int(asyncio.get_event_loop().time() * 1000)), - } - ) - await self._redis.expire(stream_key, self._stream_ttl) - sequence_number += 1 - - # Add end-of-stream marker - await self._redis.xadd( - stream_key, - { - "text": "", - "sequence": str(sequence_number), - "timestamp": str(int(asyncio.get_event_loop().time() * 1000)), - "done": "true", - } - ) - await self._redis.expire(stream_key, self._stream_ttl) - - except Exception as ex: - # Write error to stream - await self._redis.xadd( - stream_key, - { - "error": str(ex), - "sequence": str(sequence_number), - "timestamp": str(int(asyncio.get_event_loop().time() * 1000)), - } - ) - await self._redis.expire(stream_key, self._stream_ttl) + await self._redis.xadd( + stream_key, + { + "text": "", + "sequence": str(sequence), + "timestamp": str(int(time.time() * 1000)), + "done": "true", + } + ) + await self._redis.expire(stream_key, self._stream_ttl) async def read_stream( self, From 91b229d720e372659fa98cfbf3e52515855c9a4e Mon Sep 17 00:00:00 2001 From: Gavin Aguiar Date: Fri, 2 Jan 2026 11:46:34 -0600 Subject: [PATCH 4/6] Addressed feedback --- .../integration_tests/test_03_callbacks.py | 102 ----- .../test_03_reliable_streaming.py | 117 +++++ .../03_reliable_streaming/README.md | 408 +++--------------- .../03_reliable_streaming/demo.http | 170 ++------ .../03_reliable_streaming/function_app.py | 11 +- 5 files changed, 203 insertions(+), 605 deletions(-) delete mode 100644 python/packages/azurefunctions/tests/integration_tests/test_03_callbacks.py create mode 100644 python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py diff --git a/python/packages/azurefunctions/tests/integration_tests/test_03_callbacks.py b/python/packages/azurefunctions/tests/integration_tests/test_03_callbacks.py deleted file mode 100644 index 06414f993a..0000000000 --- a/python/packages/azurefunctions/tests/integration_tests/test_03_callbacks.py +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. -""" -Integration Tests for Callbacks Sample - -Tests the callbacks sample for event tracking and management. - -The function app is automatically started by the test fixture. - -Prerequisites: -- Azure OpenAI credentials configured (see packages/azurefunctions/tests/integration_tests/.env.example) -- Azurite or Azure Storage account configured - -Usage: - uv run pytest packages/azurefunctions/tests/integration_tests/test_03_callbacks.py -v -""" - -from typing import Any - -import pytest -import requests - -from .testutils import ( - TIMEOUT, - SampleTestHelper, - skip_if_azure_functions_integration_tests_disabled, -) - -# Module-level markers - applied to all tests in this file -pytestmark = [ - pytest.mark.sample("03_callbacks"), - pytest.mark.usefixtures("function_app_for_test"), - skip_if_azure_functions_integration_tests_disabled, -] - - -class TestSampleCallbacks: - """Tests for 03_callbacks sample.""" - - @pytest.fixture(autouse=True) - def _set_base_url(self, base_url: str) -> None: - """Provide the callback agent base URL for each test.""" - self.base_url = f"{base_url}/api/agents/CallbackAgent" - - @staticmethod - def _wait_for_callback_events(base_url: str, thread_id: str) -> list[dict[str, Any]]: - events: list[dict[str, Any]] = [] - response = SampleTestHelper.get(f"{base_url}/callbacks/{thread_id}") - if response.status_code == 200: - events = response.json() - return events - - def test_agent_with_callbacks(self) -> None: - """Test agent execution with callback tracking.""" - thread_id = "test-callback" - - response = SampleTestHelper.post_json( - f"{self.base_url}/run", - {"message": "Tell me about Python", "thread_id": thread_id}, - ) - assert response.status_code == 200 - data = response.json() - - assert data["status"] == "success" - - events = self._wait_for_callback_events(self.base_url, thread_id) - - assert events - assert any(event.get("event_type") == "final" for event in events) - - def test_get_callbacks(self) -> None: - """Test retrieving callback events.""" - thread_id = "test-callback-retrieve" - - # Send a message first - SampleTestHelper.post_json( - f"{self.base_url}/run", - {"message": "Hello", "thread_id": thread_id, "wait_for_response": False}, - ) - - # Get callbacks - response = SampleTestHelper.get(f"{self.base_url}/callbacks/{thread_id}") - assert response.status_code == 200 - data = response.json() - assert isinstance(data, list) - - def test_delete_callbacks(self) -> None: - """Test clearing callback events.""" - thread_id = "test-callback-delete" - - # Send a message first - SampleTestHelper.post_json( - f"{self.base_url}/run", - {"message": "Test", "thread_id": thread_id, "wait_for_response": False}, - ) - - # Delete callbacks - response = requests.delete(f"{self.base_url}/callbacks/{thread_id}", timeout=TIMEOUT) - assert response.status_code == 204 - - -if __name__ == "__main__": - pytest.main([__file__, "-v"]) diff --git a/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py b/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py new file mode 100644 index 0000000000..03e22eecab --- /dev/null +++ b/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py @@ -0,0 +1,117 @@ +# Copyright (c) Microsoft. All rights reserved. +""" +Integration Tests for Reliable Streaming Sample + +Tests the reliable streaming sample using Redis Streams for persistent message delivery. + +The function app is automatically started by the test fixture. + +Prerequisites: +- Azure OpenAI credentials configured (see packages/azurefunctions/tests/integration_tests/.env.example) +- Azurite or Azure Storage account configured +- Redis running (docker run -d --name redis -p 6379:6379 redis:latest) + +Usage: + uv run pytest packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py -v +""" + +import pytest +import requests + +from .testutils import ( + TIMEOUT, + SampleTestHelper, + skip_if_azure_functions_integration_tests_disabled, +) + +# Module-level markers - applied to all tests in this file +pytestmark = [ + pytest.mark.sample("03_reliable_streaming"), + pytest.mark.usefixtures("function_app_for_test"), + skip_if_azure_functions_integration_tests_disabled, +] + + +class TestSampleReliableStreaming: + """Tests for 03_reliable_streaming sample.""" + + @pytest.fixture(autouse=True) + def _set_base_url(self, base_url: str) -> None: + """Provide the base URL for each test.""" + self.base_url = base_url + self.agent_url = f"{base_url}/api/agents/TravelPlanner" + self.stream_url = f"{base_url}/api/agent/stream" + + def test_agent_run_and_stream(self) -> None: + """Test agent execution with Redis streaming.""" + # Start agent run + response = SampleTestHelper.post_text( + f"{self.agent_url}/run", + "Plan a 1-day trip to Seattle", + ) + assert response.status_code == 202 + data = response.json() + + assert data["status"] == "accepted" + assert "conversation_id" in data + conversation_id = data["conversation_id"] + + # Stream response from Redis + # Note: We use text/plain to avoid SSE parsing complexity + stream_response = requests.get( + f"{self.stream_url}/{conversation_id}", + headers={"Accept": "text/plain"}, + timeout=TIMEOUT, + ) + assert stream_response.status_code == 200 + assert len(stream_response.text) > 0 + + def test_stream_with_sse_format(self) -> None: + """Test streaming with Server-Sent Events format.""" + # Start agent run + response = SampleTestHelper.post_text( + f"{self.agent_url}/run", + "What's the weather like?", + ) + assert response.status_code == 202 + data = response.json() + conversation_id = data["conversation_id"] + + # Stream with SSE format + stream_response = requests.get( + f"{self.stream_url}/{conversation_id}", + headers={"Accept": "text/event-stream"}, + timeout=TIMEOUT, + ) + assert stream_response.status_code == 200 + assert stream_response.headers.get("content-type") == "text/event-stream" + + # Check for SSE event markers + content = stream_response.text + assert "event:" in content or "data:" in content + + def test_stream_nonexistent_conversation(self) -> None: + """Test streaming from a non-existent conversation.""" + fake_id = "nonexistent-conversation-12345" + + # Should timeout or return error after waiting + stream_response = requests.get( + f"{self.stream_url}/{fake_id}", + headers={"Accept": "text/plain"}, + timeout=TIMEOUT, + ) + assert stream_response.status_code == 200 + # Should contain error or timeout message + assert len(stream_response.text) > 0 + + def test_health_endpoint(self) -> None: + """Test health check endpoint.""" + response = SampleTestHelper.get(f"{self.base_url}/api/health") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "healthy" + assert "agents" in data + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md b/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md index fd55de8ee3..181a338962 100644 --- a/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/README.md @@ -1,129 +1,39 @@ # Agent Response Callbacks with Redis Streaming -This sample demonstrates how to implement **agent response callbacks** for durable agents using Redis Streams for persistent, reliable message delivery. It shows how to capture streaming agent responses via callbacks and persist them to Redis, enabling clients to disconnect and reconnect without losing messages. +This sample demonstrates how to use Redis Streams with agent response callbacks to enable reliable, resumable streaming for durable agents. Clients can disconnect and reconnect without losing messages by using cursor-based pagination. ## Key Concepts Demonstrated -- **Durable Agents**: Uses `AgentFunctionApp` for orchestrated background agent execution -- **Persistent Message Delivery**: Agent responses are written to Redis Streams via a callback -- **Redis Streams**: Messages persist with configurable TTL (default 10 minutes) for reliable delivery -- **Cursor-Based Resumption**: Clients can resume from any point using Redis stream entry IDs -- **Fire-and-Forget Invocation**: Agents run asynchronously in the background -- **Agent Response Callbacks**: Demonstrates how to capture and persist streaming agent responses - -## Architecture - -The sample creates a travel planning agent using the durable agents pattern with Redis streaming: - -### Endpoints - -| Endpoint | Method | Description | -|----------|--------|-------------| -| `/api/agents/TravelPlanner/run` | POST | **Standard:** Starts a durable agent run (returns 202 Accepted) | -| `/api/agent/stream/{conversation_id}` | GET | **Custom:** Streams chunks from Redis with cursor-based resumption | -| `/api/agents/TravelPlanner/run/{conversation_id}` | GET | **Standard:** Check status of agent run | -| `/api/health` | GET | Health check endpoint | - -### Flow - -1. **Client sends prompt** to `/api/agents/TravelPlanner/run` (standard endpoint) -2. **Function app returns 202 Accepted** with `conversation_id` and `correlation_id` -3. **Agent runs in background** via durable orchestration -4. **Redis callback writes chunks** to Redis Stream as agent generates responses -5. **Client calls** `/api/agent/stream/{conversation_id}` (custom endpoint) to read chunks from Redis -6. **Messages persist with TTL** allowing clients to resume from any cursor position using optional `cursor` parameter - -### Components - -```python -# Redis callback writes streaming updates to Redis -class RedisStreamCallback(AgentResponseCallbackProtocol): - async def on_streaming_response_update(self, update, context): - # Write chunk to Redis Stream with sequence number and timestamp - - async def on_agent_response(self, response, context): - # Write end-of-stream marker when agent completes - -# AgentFunctionApp with durable agents and Redis callback -app = AgentFunctionApp( - agents=[create_travel_agent()], - default_callback=redis_callback, -) -``` +- Using `AgentResponseCallbackProtocol` to capture streaming agent responses +- Persisting streaming chunks to Redis Streams for reliable delivery +- Building a custom HTTP endpoint to read from Redis with Server-Sent Events (SSE) format +- Supporting cursor-based resumption for disconnected clients +- Managing Redis client lifecycle with async context managers ## Prerequisites -Before running this sample, ensure you have: - -1. **Azure OpenAI**: Set up an Azure OpenAI resource with a chat deployment -2. **Redis**: Running locally or in Docker for persistent message storage -3. **Durable Task Scheduler (DTS)**: Running locally for orchestration (see parent README) -4. **Azurite**: Local storage emulator for Azure Functions (see parent README) - -### Starting Redis - -Start Redis using Docker: +In addition to the common setup steps in `../README.md`, this sample requires Redis: ```bash +# Start Redis docker run -d --name redis -p 6379:6379 redis:latest ``` -To verify Redis is running: - -```bash -docker ps | grep redis -``` - -**Note:** This sample uses Redis Streams to demonstrate persistent callback storage, which is more robust than in-memory storage for production scenarios. - -## Configuration - -Update your `local.settings.json` with your Azure OpenAI credentials: +Update `local.settings.json` with your Redis connection string: ```json { "Values": { - "AZURE_OPENAI_ENDPOINT": "https://your-resource.openai.azure.com/", - "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "your-deployment-name", - "AZURE_OPENAI_API_KEY": "your-api-key-if-not-using-rbac", - "REDIS_CONNECTION_STRING": "redis://localhost:6379", - "REDIS_STREAM_TTL_MINUTES": "10", - "DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None", - "AzureWebJobsStorage": "UseDevelopmentStorage=true" + "REDIS_CONNECTION_STRING": "redis://localhost:6379" } } ``` -Configuration options: -- `REDIS_CONNECTION_STRING`: Connection string for Redis (default: `redis://localhost:6379`) -- `REDIS_STREAM_TTL_MINUTES`: Time-to-live for stream entries in minutes (default: `10`) - ## Running the Sample -1. **Start required services** (Redis, DTS, Azurite): - ```bash - # Redis (if not already running) - docker run -d --name redis -p 6379:6379 redis:latest - - # DTS and Azurite (see parent README for instructions) - ``` - -2. **Install dependencies**: - ```bash - cd python/samples/getting_started/azure_functions/03_reliable_streaming - pip install -r requirements.txt - ``` - -3. **Start the Function App**: - ```bash - func start - ``` - -## Testing the Sample - -### Test 1: Basic Workflow (Standard + Custom Endpoints) +### Start the agent run -**Step 1:** Start the agent run: +The agent executes in the background via durable orchestration. The `RedisStreamCallback` persists streaming chunks to Redis: ```bash curl -X POST http://localhost:7071/api/agents/TravelPlanner/run \ @@ -131,25 +41,26 @@ curl -X POST http://localhost:7071/api/agents/TravelPlanner/run \ -d "Plan a 3-day trip to Tokyo" ``` -**Response (202 Accepted):** +Response (202 Accepted): ```json { "status": "accepted", "response": "Agent request accepted", - "message": "Plan a 3-day trip to Tokyo", "conversation_id": "abc-123-def-456", - "correlation_id": "xyz-789-ghi-012" + "correlation_id": "xyz-789" } ``` -**Step 2:** Stream chunks from Redis using the custom endpoint: +### Stream the response from Redis + +Use the custom `/api/agent/stream/{conversation_id}` endpoint to read persisted chunks: ```bash curl http://localhost:7071/api/agent/stream/abc-123-def-456 \ -H "Accept: text/event-stream" ``` -**Expected Response (SSE format with chunks):** +Response (SSE format): ``` id: 1734649123456-0 event: message @@ -164,275 +75,58 @@ event: done data: [DONE] ``` -**Step 3 (Optional):** Resume from a specific cursor: +### Resume from a cursor + +Use a cursor ID from an SSE event to skip already-processed messages: ```bash -# Use a cursor from an earlier SSE event curl "http://localhost:7071/api/agent/stream/abc-123-def-456?cursor=1734649123456-0" \ -H "Accept: text/event-stream" ``` -### Test 2: Plain Text Format (for terminals) - -```bash -# Start the run -RESPONSE=$(curl -s -X POST http://localhost:7071/api/agents/TravelPlanner/run \ - -H "Content-Type: text/plain" \ - -d "Plan a weekend in Paris") - -CONV_ID=$(echo $RESPONSE | jq -r .conversation_id) - -# Stream in plain text format -curl http://localhost:7071/api/agent/stream/$CONV_ID \ - -H "Accept: text/plain" -``` - -### Test 3: Reading from Redis using redis-cli - -```bash -# Connect to Redis -docker exec -it redis redis-cli - -# View all messages for the conversation -XRANGE agent-stream:abc-123-def-456 - + - -# Example output: -# 1) 1) "1734649123456-0" -# 2) 1) "text" -# 2) "Here's a wonderful 3-day Tokyo itinerary..." -# 3) "sequence" -# 4) "0" -# 5) "timestamp" -# 6) "1734649123456" -# 2) 1) "1734649123789-0" -# 2) 1) "text" -# 2) "Day 1: Arrival and Shibuya..." -# 3) "sequence" -# 4) "1" -``` - -### Test 4: Check Agent Status - -```bash -curl http://localhost:7071/api/agents/TravelPlanner/run/abc-123-def-456 -``` - -Returns the orchestration status (Running, Completed, Failed, etc.). - -### Test 5: Using Python to Read from Redis - -```python -import asyncio -from datetime import timedelta -import redis.asyncio as aioredis -from redis_stream_response_handler import RedisStreamResponseHandler - -async def read_agent_response(conversation_id: str): - redis_client = aioredis.from_url("redis://localhost:6379") - handler = RedisStreamResponseHandler( - redis_client=redis_client, - stream_ttl=timedelta(minutes=10) - ) - - async for chunk in handler.read_stream(conversation_id): - if chunk.is_done: - print("\n[Agent completed]") - break - if chunk.text: - print(chunk.text, end="", flush=True) - -# Usage -asyncio.run(read_agent_response("abc-123-def-456")) -``` - -### Test 6: Using the demo.http File - -If you have VS Code with the REST Client extension: - -1. Open `demo.http` in VS Code -2. Click "Send Request" above any test -3. The `conversation_id` is automatically captured for subsequent requests -4. Try the standard `/api/agents/TravelPlanner/run` endpoint to start the agent, then use `/api/agent/stream/{conversation_id}` to read the response - ## How It Works -### Durable Agents Pattern - -This sample uses the **durable agents pattern** from `AgentFunctionApp`: - -1. **Client calls `/run`** → Returns 202 Accepted immediately with `conversation_id` -2. **Durable orchestration starts** → Agent executes in background -3. **Callback writes to Redis** → Each streaming chunk is persisted with sequence number and timestamp -4. **Client reads from Redis** → Using RedisStreamResponseHandler or redis-cli -5. **Cursor-based resumption** → Client can resume from any cursor position - -### RedisStreamCallback - -The `RedisStreamCallback` class implements `AgentResponseCallbackProtocol`: - -**Writing to Redis** (`on_streaming_response_update`): -- Receives streaming updates from the agent -- Writes each chunk to a Redis Stream with metadata (sequence number, timestamp) -- Sets a TTL on the stream (default 10 minutes) +### 1. Redis Callback -**End-of-stream marker** (`on_agent_response`): -- Called when agent completes -- Writes a marker with `done: true` to signal completion - -### Reading from Redis - -Clients can read agent responses directly from Redis using the `RedisStreamResponseHandler`: +The `RedisStreamCallback` class implements `AgentResponseCallbackProtocol` to capture streaming updates: ```python -from redis_stream_response_handler import RedisStreamResponseHandler -import redis.asyncio as aioredis - -# Connect to Redis -redis_client = aioredis.from_url("redis://localhost:6379") -handler = RedisStreamResponseHandler(redis_client, stream_ttl=timedelta(minutes=10)) - -# Read all messages for a conversation -async for chunk in handler.read_stream(conversation_id, cursor=None): - if chunk.is_done: - print("Agent completed") - break - if chunk.text: - print(chunk.text, end="") -``` - -The `read_stream` method supports: -- Reading from beginning (cursor=None) or from a specific cursor -- Automatic handling of stream completion markers -- Error handling for missing or expired streams - -## Delivery Guarantees - -This pattern provides: - -- **At-least-once delivery**: Messages are persisted and can be read multiple times -- **Ordering**: Messages are delivered in the order they were written -- **Durability**: Messages persist until TTL expires (default 10 minutes) -- **Background execution**: Agent runs independently of client connection - -However, it does NOT guarantee: - -- **Exactly-once delivery**: Clients may receive duplicate messages if they resume from an earlier cursor -- **Infinite retention**: Messages expire after the configured TTL - -Clients should handle potential duplicates if necessary (e.g., using sequence numbers). - -## Advanced Usage - -### Custom TTL - -Set a custom TTL for stream entries (in minutes): - -```json -{ - "REDIS_STREAM_TTL_MINUTES": "30" -} -``` - -### Remote Redis - -Use a remote Redis instance: - -```json -{ - "REDIS_CONNECTION_STRING": "rediss://username:password@your-redis.com:6380" -} -``` - -### Checking Agent Status - -Use the standard durable agents status endpoint: - -```bash -curl http://localhost:7071/api/agents/TravelPlanner/run/{conversation_id} -``` - -This returns the orchestration status (Running, Completed, Failed, etc.). - -## Debugging - -### Redis CLI Commands - -Connect to Redis: -```bash -docker exec -it redis redis-cli -``` - -Check if stream exists: -```bash -XLEN agent-stream:{conversation_id} -``` - -View stream contents: -```bash -XRANGE agent-stream:{conversation_id} - + -``` +class RedisStreamCallback(AgentResponseCallbackProtocol): + async def on_streaming_response_update(self, update, context): + # Write chunk to Redis Stream + async with await get_stream_handler() as handler: + await handler.write_chunk(thread_id, update.text, sequence) -View with entry IDs: -```bash -XREAD STREAMS agent-stream:{conversation_id} 0-0 + async def on_agent_response(self, response, context): + # Write end-of-stream marker + async with await get_stream_handler() as handler: + await handler.write_completion(thread_id, sequence) ``` -Check TTL: -```bash -TTL agent-stream:{conversation_id} -``` +### 2. Custom Streaming Endpoint -### Logs +The `/api/agent/stream/{conversation_id}` endpoint reads from Redis: -Watch the function app logs for: -- Agent execution progress -- Redis write operations -- Client connection/disconnection events +```python +@app.route(route="agent/stream/{conversation_id}", methods=["GET"]) +async def stream(req): + conversation_id = req.route_params.get("conversation_id") + cursor = req.params.get("cursor") # Optional -```bash -# Logs show: -[durableagent.samples.redis_streaming] Wrote chunk to Redis: seq=0, len=52 -[durableagent.samples.redis_streaming] Agent completed, wrote end-of-stream marker + async with await get_stream_handler() as handler: + async for chunk in handler.read_stream(conversation_id, cursor): + # Format and return chunks ``` -## Cleanup +### 3. Redis Streams -Stop and remove the Redis container: +Messages are stored in Redis Streams with automatic TTL (default: 10 minutes): -```bash -docker stop redis -docker rm redis ``` - -## Code Structure - -``` -03_reliable_streaming/ -├── function_app.py # Main app with AgentFunctionApp and Redis callback -├── redis_stream_response_handler.py # Redis streaming utilities -├── tools.py # Mock travel tools (weather, events) -├── requirements.txt # Python dependencies -├── host.json # Azure Functions configuration -├── local.settings.json # Local environment variables -├── demo.http # REST Client test file -└── README.md # This file -``` - -## Comparison with Standard Durable Agents - -| Feature | Standard Durable Agents | This Sample | -|---------|------------------------|-------------| -| Agent execution | Background orchestration | ✅ Same | -| Client response | 202 Accepted only | ✅ Same | -| Response callback | None | ✅ Writes to Redis Streams | -| Message persistence | None (status only) | ✅ Redis Streams with TTL | -| Resumption | Not supported | ✅ Resume from any cursor | -| Client access | Status endpoint only | ✅ Direct Redis access or RedisStreamResponseHandler | - -## Learn More - -- [Redis Streams Documentation](https://redis.io/docs/latest/develop/data-types/streams/) -- [Server-Sent Events Specification](https://html.spec.whatwg.org/multipage/server-sent-events.html) -- [Microsoft Agent Framework Documentation](https://github.com/microsoft/agent-framework) -- [Azure Functions Python Developer Guide](https://learn.microsoft.com/azure/azure-functions/functions-reference-python) -- [Durable Task Framework](https://github.com/Azure/durabletask) +Stream Key: agent-stream:{conversation_id} +Entry: { + "text": "chunk content", + "sequence": "0", + "timestamp": "1734649123456" +} +``` \ No newline at end of file diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/demo.http b/python/samples/getting_started/azure_functions/03_reliable_streaming/demo.http index 8892585eb2..6cdc1d10c3 100644 --- a/python/samples/getting_started/azure_functions/03_reliable_streaming/demo.http +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/demo.http @@ -1,167 +1,55 @@ +### Reliable Streaming with Redis - Demo HTTP Requests +### Use with the VS Code REST Client extension or any HTTP client ### -# Reliable Streaming with Redis - Demo HTTP Requests -# -# Workflow: -# 1. POST /api/agents/TravelPlanner/run - Start durable agent run (returns conversation_id) -# 2. GET /api/agent/stream/{conversation_id} - Read chunks from Redis (SSE or plain text) -# 3. Resume with ?cursor={id} parameter to skip already-processed messages -# -# Prerequisites: -# 1. Start Redis: docker run -d --name redis -p 6379:6379 redis:latest -# 2. Start DTS and Azurite (see README) -# 3. Start function app: func start +### Workflow: +### 1. POST /api/agents/{agentName}/run -> Start durable agent (returns conversation_id) +### 2. GET /api/agent/stream/{id} -> Read chunks from Redis (SSE or plain text) +### 3. Add ?cursor={id} to resume from a specific point ### +### Prerequisites: +### - Redis: docker run -d --name redis -p 6379:6379 redis:latest +### - Start function app: func start +### Variables @baseUrl = http://localhost:7071 @agentName = TravelPlanner -### -# ===== RECOMMENDED WORKFLOW ===== -### - -### -# Test 1: Start agent run (standard endpoint - triggers Redis callback) -# @name tokyo-trip -POST {{baseUrl}}/api/agents/{{agentName}}/run -Content-Type: text/plain - -Plan a 3-day trip to Tokyo - -### -# Test 2: Stream from Redis (custom endpoint with SSE format) -# Automatically captures conversation_id from Test 1 -@tokyoConvId = {{tokyo-trip.response.body.$.conversation_id}} -GET {{baseUrl}}/api/agent/stream/{{tokyoConvId}} -Accept: text/event-stream - -### -# Test 3: Stream in plain text format (good for terminals) -GET {{baseUrl}}/api/agent/stream/{{tokyoConvId}} -Accept: text/plain +### Health Check +GET {{baseUrl}}/api/health ### -# Test 4: Resume from a specific cursor -# Replace {cursor_id} with actual id from SSE events -GET {{baseUrl}}/api/agent/stream/{{tokyoConvId}}?cursor={cursor_id} -Accept: text/event-stream -### -# ===== MORE EXAMPLES ===== -### - -### -# Test 5: Weekend trip to Paris -# @name paris-trip +### Start Agent Run +# Starts the agent in the background via durable orchestration. +# The RedisStreamCallback persists streaming chunks to Redis. +# @name trip POST {{baseUrl}}/api/agents/{{agentName}}/run Content-Type: text/plain -Plan a weekend trip to Paris with romantic restaurants and museums - -### -# Test 6: Stream Paris trip -@parisConvId = {{paris-trip.response.body.$.conversation_id}} -GET {{baseUrl}}/api/agent/stream/{{parisConvId}} -Accept: text/event-stream - -### -# Test 7: Check agent status -GET {{baseUrl}}/api/agents/{{agentName}}/run/{{parisConvId}} - -### -# ===== LONG-RUNNING EXAMPLES ===== -### +Plan a 3-day trip to Tokyo ### -# Test 8: Detailed Rome itinerary (long-running, good for testing resumption) -# @name rome-trip -POST {{baseUrl}}/api/agents/{{agentName}}/run -Content-Type: text/plain -Create a detailed 5-day itinerary for Rome including historical sites, -local restaurants, and day trips. Check weather and local events! - -### -# Test 9: Stream Rome trip -@romeConvId = {{rome-trip.response.body.$.conversation_id}} -GET {{baseUrl}}/api/agent/stream/{{romeConvId}} +### Stream from Redis (SSE format) +# Reads persisted chunks from Redis using cursor-based pagination. +# The conversation_id is automatically captured from the previous request. +@conversationId = {{trip.response.body.$.conversation_id}} +GET {{baseUrl}}/api/agent/stream/{{conversationId}} Accept: text/event-stream ### -# Test 10: Business trip comparison -# @name business-trip -POST {{baseUrl}}/api/agents/{{agentName}}/run -Content-Type: text/plain - -Compare Tokyo and New York for a 4-day business trip. -Which has better weather next week? -### -# Test 11: Stream business trip -@businessConvId = {{business-trip.response.body.$.conversation_id}} -GET {{baseUrl}}/api/agent/stream/{{businessConvId}} +### Stream from Redis (plain text) +# Same as above, but returns plain text instead of SSE format +GET {{baseUrl}}/api/agent/stream/{{conversationId}} Accept: text/plain -### -# ===== SYSTEM ENDPOINTS ===== -### - -### -# Test 12: Check health endpoint -GET {{baseUrl}}/api/health - -### -# ===== USAGE PATTERNS ===== -# -# Standard Workflow: -# 1. POST /api/agents/TravelPlanner/run → get conversation_id -# 2. GET /api/agent/stream/{conversation_id} → read chunks from Redis -# 3. Check status: GET /api/agents/TravelPlanner/run/{conversation_id} -# 4. Resume: GET /api/agent/stream/{conversation_id}?cursor={cursor_id} -# -# Demonstrating Stream Resumption: -# 1. Run Test 8 (detailed Rome itinerary) -# 2. Run Test 9 to start streaming (note some SSE event IDs) -# 3. While streaming, cancel the request (Stop button) -# 4. Run Test 9 again - messages replay from start -# 5. Or use Test 4 with a cursor to skip to a specific point -# -# Key Benefits: -# - Agent runs in background (durable orchestration) -# - Redis callback persists all chunks with TTL (default 10 minutes) -# - Clients can disconnect/reconnect without losing messages -# - Cursor-based resumption allows skipping already-processed messages -# - SSE format provides structured events with cursors for resumption -### - -### -# ===== EDGE CASE TESTS ===== ### -### -# Test 13: Empty prompt (should return 400) -POST {{baseUrl}}/api/agents/{{agentName}}/run -Content-Type: text/plain - -### -# Test 14: Stream non-existent conversation (should timeout or return error) -GET {{baseUrl}}/api/agent/stream/non-existent-conversation-id-12345 +### Resume from cursor +# Use a cursor ID from an SSE event to skip already-processed messages +# Replace {cursor_id} with an actual entry ID from the SSE stream +GET {{baseUrl}}/api/agent/stream/{{conversationId}}?cursor={cursor_id} Accept: text/event-stream ### -# Debugging with Redis CLI -# -# Connect to Redis: -# docker exec -it redis redis-cli -# -# Check if stream exists: -# XLEN agent-stream:{conversation_id} -# -# View stream contents: -# XRANGE agent-stream:{conversation_id} - + -# -# View stream with entry IDs: -# XREAD STREAMS agent-stream:{conversation_id} 0-0 -# -# Check TTL: -# TTL agent-stream:{conversation_id} -### diff --git a/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py b/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py index 9f11c7cb57..31db10a9df 100644 --- a/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py +++ b/python/samples/getting_started/azure_functions/03_reliable_streaming/function_app.py @@ -79,15 +79,16 @@ async def on_streaming_response_update( update: The streaming response update chunk. context: The callback context with thread_id, agent_name, etc. """ - thread_id = context.thread_id or context.correlation_id + thread_id = context.thread_id if not thread_id: - self._logger.warning("No thread_id or correlation_id available for streaming update") + self._logger.warning("No thread_id available for streaming update") return - text = getattr(update, "text", None) - if not text: + if not update.text: return + text = update.text + # Get or initialize sequence number for this thread if thread_id not in self._sequence_numbers: self._sequence_numbers[thread_id] = 0 @@ -119,7 +120,7 @@ async def on_agent_response(self, response, context: AgentCallbackContext) -> No response: The final agent response. context: The callback context. """ - thread_id = context.thread_id or context.correlation_id + thread_id = context.thread_id if not thread_id: return From b0abe59c62153eeb325e3b5351adbb06f30fae40 Mon Sep 17 00:00:00 2001 From: Gavin Aguiar Date: Fri, 2 Jan 2026 16:59:30 -0600 Subject: [PATCH 5/6] Fixed integration tests --- .../test_03_reliable_streaming.py | 63 +++++++++++-------- 1 file changed, 37 insertions(+), 26 deletions(-) diff --git a/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py b/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py index 03e22eecab..62a47bf1c2 100644 --- a/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py +++ b/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py @@ -15,11 +15,12 @@ uv run pytest packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py -v """ +import time + import pytest import requests from .testutils import ( - TIMEOUT, SampleTestHelper, skip_if_azure_functions_integration_tests_disabled, ) @@ -45,64 +46,74 @@ def _set_base_url(self, base_url: str) -> None: def test_agent_run_and_stream(self) -> None: """Test agent execution with Redis streaming.""" # Start agent run - response = SampleTestHelper.post_text( + response = SampleTestHelper.post_json( f"{self.agent_url}/run", - "Plan a 1-day trip to Seattle", + {"message": "Plan a 1-day trip to Seattle in 1 sentence", "wait_for_response": False}, ) assert response.status_code == 202 data = response.json() - assert data["status"] == "accepted" - assert "conversation_id" in data - conversation_id = data["conversation_id"] + thread_id = data.get("thread_id") + + # Wait a moment for the agent to start writing to Redis + time.sleep(2) - # Stream response from Redis + # Stream response from Redis with shorter timeout # Note: We use text/plain to avoid SSE parsing complexity stream_response = requests.get( - f"{self.stream_url}/{conversation_id}", + f"{self.stream_url}/{thread_id}", headers={"Accept": "text/plain"}, - timeout=TIMEOUT, + timeout=30, # Shorter timeout for test ) assert stream_response.status_code == 200 - assert len(stream_response.text) > 0 def test_stream_with_sse_format(self) -> None: """Test streaming with Server-Sent Events format.""" # Start agent run - response = SampleTestHelper.post_text( + response = SampleTestHelper.post_json( f"{self.agent_url}/run", - "What's the weather like?", + {"message": "What's the weather like?", "wait_for_response": False}, ) assert response.status_code == 202 data = response.json() - conversation_id = data["conversation_id"] + thread_id = data.get("thread_id") + + # Wait for agent to start writing + time.sleep(2) # Stream with SSE format stream_response = requests.get( - f"{self.stream_url}/{conversation_id}", + f"{self.stream_url}/{thread_id}", headers={"Accept": "text/event-stream"}, - timeout=TIMEOUT, + timeout=30, # Shorter timeout ) assert stream_response.status_code == 200 - assert stream_response.headers.get("content-type") == "text/event-stream" + content_type = stream_response.headers.get("content-type", "") + assert "text/event-stream" in content_type - # Check for SSE event markers + # Check for SSE event markers if we got content content = stream_response.text - assert "event:" in content or "data:" in content + if content: + assert "event:" in content or "data:" in content def test_stream_nonexistent_conversation(self) -> None: """Test streaming from a non-existent conversation.""" fake_id = "nonexistent-conversation-12345" # Should timeout or return error after waiting - stream_response = requests.get( - f"{self.stream_url}/{fake_id}", - headers={"Accept": "text/plain"}, - timeout=TIMEOUT, - ) - assert stream_response.status_code == 200 - # Should contain error or timeout message - assert len(stream_response.text) > 0 + # Use shorter timeout since we know this will fail + try: + stream_response = requests.get( + f"{self.stream_url}/{fake_id}", + headers={"Accept": "text/plain"}, + timeout=10, # Short timeout for non-existent ID + ) + assert stream_response.status_code == 200 + # Should contain error or timeout message + assert len(stream_response.text) > 0 + except requests.exceptions.ReadTimeout: + # Timeout is expected for non-existent conversation + pass def test_health_endpoint(self) -> None: """Test health check endpoint.""" From c6ee47e1db46e8da4030bef1e66af0bba88e7ade Mon Sep 17 00:00:00 2001 From: Gavin Aguiar Date: Wed, 7 Jan 2026 14:07:28 -0600 Subject: [PATCH 6/6] Updated test --- .../test_03_reliable_streaming.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py b/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py index 62a47bf1c2..44fb8efb2f 100644 --- a/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py +++ b/python/packages/azurefunctions/tests/integration_tests/test_03_reliable_streaming.py @@ -97,23 +97,20 @@ def test_stream_with_sse_format(self) -> None: assert "event:" in content or "data:" in content def test_stream_nonexistent_conversation(self) -> None: - """Test streaming from a non-existent conversation.""" + """Test streaming from a non-existent conversation. + + The endpoint will wait for data in Redis, but since the conversation + doesn't exist, it will timeout. This is expected behavior. + """ fake_id = "nonexistent-conversation-12345" - # Should timeout or return error after waiting - # Use shorter timeout since we know this will fail - try: - stream_response = requests.get( + # Should timeout since the conversation doesn't exist + with pytest.raises(requests.exceptions.ReadTimeout): + requests.get( f"{self.stream_url}/{fake_id}", headers={"Accept": "text/plain"}, timeout=10, # Short timeout for non-existent ID ) - assert stream_response.status_code == 200 - # Should contain error or timeout message - assert len(stream_response.text) > 0 - except requests.exceptions.ReadTimeout: - # Timeout is expected for non-existent conversation - pass def test_health_endpoint(self) -> None: """Test health check endpoint."""