diff --git a/python/pyproject.toml b/python/pyproject.toml index ded393a42f..4b0d7f1f0c 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -80,6 +80,7 @@ override-dependencies = [ [tool.uv.workspace] members = [ "packages/*" ] +exclude = [ "packages/bedrock", "packages/foundry_local" ] [tool.uv.sources] agent-framework = { workspace = true } diff --git a/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/README.md b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/README.md new file mode 100644 index 0000000000..661a0b6bbf --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/README.md @@ -0,0 +1,293 @@ +# Agent HTTP Streaming + +This sample demonstrates how to expose an Azure OpenAI-powered agent through an Azure Functions HTTP trigger with **real-time streaming responses**. + +## ๐Ÿ“– Overview + +This sample shows the simplest way to run an agent in Azure Functions with streaming output. Unlike the durable samples, this approach: + +- Executes the agent **directly within the HTTP request** +- Streams responses in **real-time** using Server-Sent Events (SSE) +- Is **stateless** - no storage or orchestration required +- Returns results **synchronously** during the HTTP connection + +## ๐ŸŽฏ What You'll Learn + +- How to create an agent with `AzureOpenAIChatClient` +- How to stream agent responses using Azure Functions HTTP streaming +- How to format streaming data as Server-Sent Events (SSE) +- How to handle tool calls in a streaming context +- Error handling for streaming responses + +## ๐Ÿ—๏ธ Architecture + +``` +HTTP POST Request + โ†“ +Azure Function HTTP Trigger + โ†“ +Create/Get Agent + โ†“ +Run Agent with Streaming (agent.run_stream) + โ†“ +Stream chunks via AsyncGenerator + โ†“ +Format as SSE (data: {...}\n\n) + โ†“ +HTTP Response (text/event-stream) +``` + +## ๐Ÿ“‹ Prerequisites + +Before running this sample: + +1. 
**Azure OpenAI Resource** + - Endpoint URL (e.g., `https://your-resource.openai.azure.com`) + - Chat deployment name (e.g., `gpt-4`, `gpt-35-turbo`) + - Authentication via Azure CLI (`az login`) or API key + +2. **Development Tools** + - Python 3.10 or higher + - [Azure Functions Core Tools 4.x](https://learn.microsoft.com/azure/azure-functions/functions-run-local) + - [REST Client](https://marketplace.visualstudio.com/items?itemName=humao.rest-client) (optional) + +## ๐Ÿš€ Setup + +### 1. Create Virtual Environment + +**Windows (PowerShell):** +```powershell +python -m venv .venv +.venv\Scripts\Activate.ps1 +``` + +**Linux/macOS:** +```bash +python -m venv .venv +source .venv/bin/activate +``` + +### 2. Install Dependencies + +```powershell +pip install -r requirements.txt +``` + +### 3. Configure Settings + +Copy the template and update with your Azure OpenAI details: + +```powershell +cp local.settings.json.template local.settings.json +``` + +Edit `local.settings.json`: +```json +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "AzureWebJobsFeatureFlags": "EnableWorkerIndexing", + "AZURE_OPENAI_ENDPOINT": "https://your-resource.openai.azure.com", + "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "gpt-4" + } +} +``` + +**Note:** This sample uses `AzureCliCredential` by default. Run `az login` before starting, or set `AZURE_OPENAI_API_KEY` and modify the code to use API key authentication. + +### 4. Start the Function + +```powershell +func start +``` + +You should see output like: +``` +Azure Functions Core Tools +... +Functions: + stream_agent: [POST] http://localhost:7071/api/agent/stream +``` + +## ๐Ÿงช Testing + +### Using REST Client (VS Code) + +Open `demo.http` and click "Send Request" above any request: + +```http +### Stream agent response with tool calling +POST http://localhost:7071/api/agent/stream +Content-Type: application/json + +{ + "message": "What's the weather like in Seattle and Portland?" 
+} +``` + +### Using cURL + +```bash +curl -X POST http://localhost:7071/api/agent/stream \ + -H "Content-Type: application/json" \ + -d '{"message": "What is the weather in Seattle?"}' +``` + +### Using Python + +```python +import requests +import json + +response = requests.post( + 'http://localhost:7071/api/agent/stream', + json={'message': 'Tell me about Seattle weather'}, + stream=True +) + +print("Agent: ", end='', flush=True) +for line in response.iter_lines(): + if line.startswith(b'data: '): + data = json.loads(line[6:]) + if data.get('text'): + print(data['text'], end='', flush=True) +print() +``` + +### Using JavaScript (Browser) + +```html + +``` + +## ๐Ÿ“ค Expected Output + +When you send a message, you'll receive a streaming response in SSE format: + +``` +data: {"text": "Let"} + +data: {"text": " me"} + +data: {"text": " check"} + +data: {"text": " the"} + +data: {"text": " weather"} + +data: {"text": " for"} + +data: {"text": " you"} + +data: {"text": "."} + +data: {"text": " The"} + +data: {"text": " weather"} + +data: {"text": " in"} + +data: {"text": " Seattle"} + +data: {"text": " is"} + +data: {"text": " cloudy"} + +data: {"text": " with"} + +data: {"text": " a"} + +data: {"text": " high"} + +data: {"text": " of"} + +data: {"text": " 15"} + +data: {"text": "ยฐC"} + +data: {"text": "."} +``` + +Note: Tool calls happen transparently; only text output is streamed to the client. + +## ๐Ÿ” Key Concepts + +### 1. HTTP Streaming with AsyncGenerator + +The function uses an async generator to yield streaming chunks: + +```python +async def generate(): + async for chunk in _agent.run_stream(message): + if chunk.text: + yield f"data: {json.dumps({'text': chunk.text})}\n\n" +``` + +### 2. Server-Sent Events (SSE) Format + +Each chunk is formatted as SSE with the `data:` prefix and double newline: +- `data: \n\n` +- Compatible with browsers' `EventSource` API +- Simple to parse on the client side + +### 3. 
Stateless Execution + +- No storage account or Azurite required +- Each request is independent +- No state persisted between requests +- Agent executes directly in the HTTP handler + +### 4. Tool Calling + +The sample includes a `get_weather` function that the agent can call: +- Tool calls happen transparently during streaming +- Only text responses are streamed to the client +- All happens within the same HTTP request + +## ๐Ÿ†š Comparison with Durable Samples + +| Feature | This Sample | Durable Samples (01-10) | +|---------|-------------|-------------------------| +| Response Mode | Real-time streaming | Fire-and-forget + polling | +| State Storage | None | Azure Storage/Azurite | +| Timeout | ~230s (HTTP timeout) | Hours/days | +| Status Queries | Not supported | Supported | +| Complexity | Low | Medium-High | +| Setup Required | Minimal | Storage + orchestration | + +## โš ๏ธ Limitations + +1. **Timeout Constraints** + - HTTP connections time out (~230 seconds) + - Not suitable for very long-running tasks + - Use durable samples for longer executions + +2. **No State Persistence** + - Can't query status after completion + - Can't resume interrupted executions + - Use durable samples if you need these features + +3. 
**No Orchestration Patterns** + - No built-in concurrency, conditionals, or HITL + - Use durable samples for complex workflows + +## ๐ŸŽ“ Next Steps + +- **[02_workflow_http_streaming](../02_workflow_http_streaming)** - Stream multi-agent workflows +- **[04_single_agent_orchestration_chaining](../../04_single_agent_orchestration_chaining)** - Learn durable orchestration +- **[07_single_agent_orchestration_hitl](../../07_single_agent_orchestration_hitl)** - Add human-in-the-loop + +## ๐Ÿ“š References + +- [Azure Functions Python Developer Guide](https://learn.microsoft.com/azure/azure-functions/functions-reference-python) +- [HTTP Streaming in Azure Functions](https://learn.microsoft.com/azure/azure-functions/functions-reference-python#http-streaming) +- [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) diff --git a/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/demo.http b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/demo.http new file mode 100644 index 0000000000..97921a3358 --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/demo.http @@ -0,0 +1,116 @@ +### Health Check +GET http://localhost:7071/api/health + +### Stream agent response - Simple weather query +POST http://localhost:7071/api/agent/stream +Content-Type: application/json + +{ + "message": "What's the weather like in Seattle?" +} + +### Stream agent response - Multiple locations +POST http://localhost:7071/api/agent/stream +Content-Type: application/json + +{ + "message": "Tell me the weather in Seattle, Portland, and San Francisco." +} + +### Stream agent response - Weather and time +POST http://localhost:7071/api/agent/stream +Content-Type: application/json + +{ + "message": "What's the weather in Tokyo and what time is it now?" 
+} + +### Stream agent response - Conversational +POST http://localhost:7071/api/agent/stream +Content-Type: application/json + +{ + "message": "I'm planning a trip to Paris next week. Can you tell me what the weather might be like?" +} + +### Stream agent response - Time only +POST http://localhost:7071/api/agent/stream +Content-Type: application/json + +{ + "message": "What time is it right now?" +} + +### Test error handling - Missing message +POST http://localhost:7071/api/agent/stream +Content-Type: application/json + +{ + "query": "This will fail because field is wrong" +} + +### Test error handling - Invalid JSON +POST http://localhost:7071/api/agent/stream +Content-Type: application/json + +This is not valid JSON + +### +# cURL Examples (for terminal/command line use) +### + +# Simple weather query +# curl -X POST http://localhost:7071/api/agent/stream \ +# -H "Content-Type: application/json" \ +# -d '{"message": "What is the weather in Seattle?"}' + +# With streaming output visible +# curl -N -X POST http://localhost:7071/api/agent/stream \ +# -H "Content-Type: application/json" \ +# -d '{"message": "Tell me about the weather in London"}' + +### +# Python Client Example +### + +# import requests +# import json +# +# response = requests.post( +# 'http://localhost:7071/api/agent/stream', +# json={'message': 'What is the weather in Seattle?'}, +# stream=True +# ) +# +# print("Agent: ", end='', flush=True) +# for line in response.iter_lines(): +# if line.startswith(b'data: '): +# try: +# data = json.loads(line[6:]) +# if data.get('text'): +# print(data['text'], end='', flush=True) +# elif data.get('type') == 'done': +# print('\n[Stream completed]') +# except json.JSONDecodeError: +# pass + +### +# JavaScript Browser Example +### + +# const eventSource = new EventSource('http://localhost:7071/api/agent/stream'); +# +# eventSource.onmessage = (event) => { +# const data = JSON.parse(event.data); +# if (data.text) { +# document.body.innerHTML += data.text; +# } 
else if (data.type === 'done') { +# console.log('Stream completed'); +# eventSource.close(); +# } +# }; +# +# eventSource.onerror = (error) => { +# console.error('EventSource error:', error); +# eventSource.close(); +# }; diff --git a/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/function_app.py b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/function_app.py new file mode 100644 index 0000000000..dcc1ba8801 --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/function_app.py @@ -0,0 +1,111 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Agent HTTP Streaming Sample + +Demonstrates exposing an agent through Azure Functions HTTP trigger with streaming responses. + +Components: +- AzureOpenAIChatClient for agent creation +- Azure Functions HTTP trigger with async generator streaming +- Azure Storage for session state persistence +""" + +import json +import sys +from pathlib import Path +from random import randint +from typing import Annotated, Any + +import azure.functions as func +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential +from azurefunctions.extensions.http.fastapi import Request, StreamingResponse +from pydantic import Field + +# Add parent directory to path for session_storage import +sys.path.insert(0, str(Path(__file__).parent.parent)) +from session_storage import SessionStorage + +app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +# Initialize session storage +_storage = SessionStorage() + + +def get_weather( + location: Annotated[str, Field(description="The location to get the weather for.")], +) -> str: + """Get the weather for a given location.""" + conditions = ["sunny", "cloudy", "rainy", "stormy"] + temperature = randint(10, 30) + return f"The weather in {location} is {conditions[randint(0, 3)]} with a high of {temperature}ยฐC." 
+ + +# Create the agent (reused across requests) +_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent( + name="WeatherAgent", + instructions="You are a helpful weather assistant. Be friendly and concise.", + tools=get_weather, +) + + +@app.route(route="agent/stream", methods=["POST"]) +async def stream_agent(req: Request) -> StreamingResponse: + """Stream agent responses in real-time with session support. + + Request body: {"message": "What's the weather in Seattle?", "session_id": "optional-session-id"} + Response: Server-Sent Events stream with text chunks + """ + # Parse request + req_body = await req.json() + message = req_body.get("message") + session_id = req_body.get("session_id") + + if not message: + return StreamingResponse( + iter([json.dumps({"error": "Missing 'message' field"})]), + media_type="application/json", + status_code=400 + ) + + # Get or create thread for this session using AgentThread serialization + if session_id: + # Load thread state from storage + thread_state = await _storage.load_thread(session_id) + if thread_state is None: + # New session + thread = _agent.get_new_thread() + _storage.create_session(session_id, {"agent": "WeatherAgent"}) + else: + # Existing session - deserialize thread with full conversation history + from agent_framework import AgentThread + thread = await AgentThread.deserialize(thread_state) + else: + # No session - stateless mode + thread = _agent.get_new_thread() + + # Stream agent responses as SSE + async def generate(): + async for chunk in _agent.run_stream(message, thread=thread): + if chunk.text: + yield f"data: {json.dumps({'text': chunk.text})}\n\n" + + # Save thread state with full conversation history + if session_id: + await _storage.save_thread(session_id, thread) + + return StreamingResponse(generate(), media_type="text/event-stream") + + +""" +Expected output when you POST {"message": "What's the weather in Seattle?"}: + +data: {"text": "Let"} +data: {"text": " me"} +data: 
{"text": " check"} +data: {"text": " the"} +data: {"text": " weather"} +... +data: {"text": "The weather in Seattle is cloudy with a high of 15ยฐC."} +""" diff --git a/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/host.json b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/host.json new file mode 100644 index 0000000000..b7e5ad1c0b --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/host.json @@ -0,0 +1,7 @@ +{ + "version": "2.0", + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} diff --git a/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/local.settings.json.template b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/local.settings.json.template new file mode 100644 index 0000000000..65550da768 --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/local.settings.json.template @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "PYTHON_ENABLE_INIT_INDEXING": "1", + "AZURE_OPENAI_ENDPOINT": "https://your-resource.openai.azure.com", + "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "gpt-4", + "AZURE_OPENAI_API_KEY": "" + } +} diff --git a/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/requirements.txt b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/requirements.txt new file mode 100644 index 0000000000..57d4f5623f --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/01_agent_http_streaming/requirements.txt @@ -0,0 +1,7 @@ +agent-framework +agent-framework-azure +azure-identity +azure-functions +azurefunctions-extensions-http-fastapi +azure-data-tables +azure-storage-blob diff --git 
a/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/README.md b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/README.md new file mode 100644 index 0000000000..1d3e4df2a2 --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/README.md @@ -0,0 +1,327 @@ +# Workflow HTTP Streaming + +This sample demonstrates how to run **multi-agent workflows** through an Azure Functions HTTP trigger with **real-time streaming** of workflow execution steps. + +## ๐Ÿ“– Overview + +This sample shows how to execute Agent Framework workflows (MAF) in Azure Functions with streaming output. Unlike the durable orchestration samples, this approach: + +- Executes the workflow **directly within the HTTP request** +- Streams workflow events **in real-time** (agent transitions, tool calls, responses) +- Is **stateless** - no storage or durable orchestration required +- Returns results **synchronously** during the HTTP connection + +## ๐ŸŽฏ What You'll Learn + +- How to create multi-agent workflows without durable orchestration +- How to stream workflow execution events in real-time +- How to track agent transitions and handoffs +- How to handle tool calls across multiple agents +- Formatting workflow events as Server-Sent Events (SSE) + +## ๐Ÿ—๏ธ Architecture + +``` +HTTP POST Request + โ†“ +Azure Function HTTP Trigger + โ†“ +Create Workflow (Sequential/GroupChat) + โ†“ +Run Workflow with Streaming + โ†“ +Stream workflow events via AsyncGenerator + โ”‚ + โ”œโ”€โ†’ Agent started + โ”œโ”€โ†’ Tool call + โ”œโ”€โ†’ Tool result + โ”œโ”€โ†’ Response chunk + โ”œโ”€โ†’ Agent transition + โ””โ”€โ†’ Workflow complete + โ†“ +Format as SSE (data: {...}\n\n) + โ†“ +HTTP Response (text/event-stream) +``` + +## ๐Ÿ“‹ Prerequisites + +Before running this sample: + +1. 
**Azure OpenAI Resource** + - Endpoint URL (e.g., `https://your-resource.openai.azure.com`) + - Chat deployment name (e.g., `gpt-4`, `gpt-35-turbo`) + - Authentication via Azure CLI (`az login`) or API key + +2. **Development Tools** + - Python 3.10 or higher + - [Azure Functions Core Tools 4.x](https://learn.microsoft.com/azure/azure-functions/functions-run-local) + - [REST Client](https://marketplace.visualstudio.com/items?itemName=humao.rest-client) (optional) + +## ๐Ÿš€ Setup + +### 1. Create Virtual Environment + +**Windows (PowerShell):** +```powershell +python -m venv .venv +.venv\Scripts\Activate.ps1 +``` + +**Linux/macOS:** +```bash +python -m venv .venv +source .venv/bin/activate +``` + +### 2. Install Dependencies + +```powershell +pip install -r requirements.txt +``` + +### 3. Configure Settings + +Copy the template and update with your Azure OpenAI details: + +```powershell +cp local.settings.json.template local.settings.json +``` + +Edit `local.settings.json`: +```json +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "AzureWebJobsFeatureFlags": "EnableWorkerIndexing", + "AZURE_OPENAI_ENDPOINT": "https://your-resource.openai.azure.com", + "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "gpt-4" + } +} +``` + +**Note:** This sample uses `AzureCliCredential` by default. Run `az login` before starting, or set `AZURE_OPENAI_API_KEY` and modify the code to use API key authentication. + +### 4. Start the Function + +```powershell +func start +``` + +You should see output like: +``` +Azure Functions Core Tools +... 
+Functions: + stream_workflow: [POST] http://localhost:7071/api/workflow/stream +``` + +## ๐Ÿงช Testing + +### Using REST Client (VS Code) + +Open `demo.http` and click "Send Request" above any request: + +```http +### Stream workflow with research assistant +POST http://localhost:7071/api/workflow/stream +Content-Type: application/json + +{ + "message": "Research the weather in Seattle and write a short poem about it" +} +``` + +### Using cURL + +```bash +curl -X POST http://localhost:7071/api/workflow/stream \ + -H "Content-Type: application/json" \ + -d '{"message": "Research Seattle weather and create a report"}' +``` + +### Using Python + +```python +import requests +import json + +response = requests.post( + 'http://localhost:7071/api/workflow/stream', + json={'message': 'Research Seattle weather and write a short article'}, + stream=True +) + +print("Workflow output: ", end='', flush=True) +for line in response.iter_lines(): + if line.startswith(b'data: '): + data = json.loads(line[6:]) + if data.get('text'): + print(data['text'], end='', flush=True) +print() +``` + +## ๐Ÿ“ค Expected Output + +When you send a message, you'll receive a streaming response showing text from both agents: + +``` +data: {"text": "Let"} + +data: {"text": " me"} + +data: {"text": " check"} + +data: {"text": " the"} + +data: {"text": " weather"} + +data: {"text": " for"} + +data: {"text": " Seattle"} + +data: {"text": "."} + +data: {"text": " The"} + +data: {"text": " weather"} + +data: {"text": " in"} + +data: {"text": " Seattle"} + +data: {"text": " is"} + +data: {"text": " sunny"} + +... + +(Researcher completes, Writer begins) + +data: {"text": "Based"} + +data: {"text": " on"} + +data: {"text": " the"} + +data: {"text": " research"} + +data: {"text": ","} + +data: {"text": " here"} + +data: {"text": "'s"} + +data: {"text": " a"} + +data: {"text": " short"} + +data: {"text": " article"} + +... +``` + +Note: The stream contains text from both agents sequentially. 
Agent transitions and tool calls happen transparently. + +## ๐Ÿ” Key Concepts + +### 1. Workflow Without Orchestration + +This sample uses Agent Framework workflows directly without durable orchestration: + +```python +workflow = ( + SequentialBuilder() + .participants([research_agent, writer_agent]) + .build() +) +``` + +### 2. Streaming Workflow Events + +The workflow streams text chunks as agents generate responses: + +```python +async def generate(): + async for event in _workflow.run_stream(message): + if isinstance(event, AgentRunUpdateEvent) and event.data: + text = event.data.text + if text: + yield f"data: {json.dumps({'text': text})}\n\n" +``` + +Only text output is streamed; internal events (agent transitions, tool calls) happen transparently. + +### 3. Multi-Agent Coordination + +The sample demonstrates sequential workflow with handoffs: +1. Research Agent gathers information using tools +2. Writer Agent creates content based on research +3. All happens within a single HTTP request + +### 4. Simple Client-Side Handling + +Clients receive only text chunks, making parsing straightforward: +```python +for line in response.iter_lines(): + if line.startswith(b'data: '): + data = json.loads(line[6:]) + if data.get('text'): + print(data['text'], end='', flush=True) +``` + +## ๐Ÿ†š Comparison with Durable Workflow Samples + +| Feature | This Sample | Durable Workflow Samples | +|---------|-------------|--------------------------| +| Orchestration | None (direct execution) | Durable Task Framework | +| Agent Transitions | Streamed in real-time | Via orchestration activities | +| State Management | In-memory only | Persisted in storage | +| Timeout | ~230s (HTTP timeout) | Hours/days | +| Complexity | Low | Medium-High | +| Use Case | Quick multi-step tasks | Long-running workflows | + +## โš ๏ธ Limitations + +1. 
**Timeout Constraints** + - Limited by HTTP timeout (~230 seconds) + - Not suitable for very long workflows + - Use durable samples for extended execution + +2. **No State Persistence** + - Workflow state lost after response completes + - Can't resume interrupted workflows + - Use durable samples for stateful workflows + +3. **No Advanced Orchestration** + - No built-in concurrency patterns + - No conditional branching with state + - No human-in-the-loop approval + - Use durable samples for complex patterns + +## ๐ŸŽฏ When to Use This Approach + +**โœ… Use Non-Durable Workflow Streaming When:** +- Workflow completes within a few minutes +- You need real-time progress updates +- State persistence isn't required +- Simple sequential or group chat patterns + +**โŒ Use Durable Workflows Instead When:** +- Workflow takes longer than HTTP timeout +- You need human approval/intervention +- State must persist across requests +- Complex orchestration patterns needed + +## ๐ŸŽ“ Next Steps + +- **[01_agent_http_streaming](../01_agent_http_streaming)** - Simpler single-agent streaming +- **[05_multi_agent_orchestration_concurrency](../../05_multi_agent_orchestration_concurrency)** - Concurrent agents with durable +- **[06_multi_agent_orchestration_conditionals](../../06_multi_agent_orchestration_conditionals)** - Conditional workflows + +## ๐Ÿ“š References + +- [Agent Framework Workflows](https://github.com/microsoft/agent-framework) +- [Azure Functions Python Developer Guide](https://learn.microsoft.com/azure/azure-functions/functions-reference-python) +- [Server-Sent Events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) diff --git a/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/demo.http b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/demo.http new file mode 100644 index 0000000000..1bd01579cc --- /dev/null +++ 
b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/demo.http @@ -0,0 +1,169 @@ +### Health Check +GET http://localhost:7071/api/health + +### Stream workflow - Research and poem +POST http://localhost:7071/api/workflow/stream +Content-Type: application/json + +{ + "message": "Research the weather in Seattle and write a short poem about it" +} + +### Stream workflow - Multiple locations +POST http://localhost:7071/api/workflow/stream +Content-Type: application/json + +{ + "message": "Research the weather in Tokyo, London, and New York, then write a comparative summary" +} + +### Stream workflow - Fact-based writing +POST http://localhost:7071/api/workflow/stream +Content-Type: application/json + +{ + "message": "Search for facts about Seattle and write an interesting paragraph about the city" +} + +### Stream workflow - Weather report +POST http://localhost:7071/api/workflow/stream +Content-Type: application/json + +{ + "message": "Check the weather in Paris and create a travel advisory" +} + +### Stream workflow - Creative task +POST http://localhost:7071/api/workflow/stream +Content-Type: application/json + +{ + "message": "Find information about weather patterns and write a haiku about rain" +} + +### Stream workflow - Complex query +POST http://localhost:7071/api/workflow/stream +Content-Type: application/json + +{ + "message": "Research weather conditions in Portland and Vancouver, then write a comparative analysis with recommendations for travelers" +} + +### Test error handling - Missing message +POST http://localhost:7071/api/workflow/stream +Content-Type: application/json + +{ + "query": "This will fail because field is wrong" +} + +### Test error handling - Invalid JSON +POST http://localhost:7071/api/workflow/stream +Content-Type: application/json + +This is not valid JSON + +### +# cURL Examples (for terminal/command line use) +### + +# Simple workflow request +# curl -X POST http://localhost:7071/api/workflow/stream \ 
+# -H "Content-Type: application/json" \ +# -d '{"message": "Research Seattle weather and write a poem"}' + +# With streaming output visible +# curl -N -X POST http://localhost:7071/api/workflow/stream \ +# -H "Content-Type: application/json" \ +# -d '{"message": "Find facts about London and write a story"}' + +### +# Python Client Example +### + +# import requests +# import json +# +# response = requests.post( +# 'http://localhost:7071/api/workflow/stream', +# json={'message': 'Research weather in Tokyo and write a haiku'}, +# stream=True +# ) +# +# current_agent = None +# print("Workflow execution:\n") +# +# for line in response.iter_lines(): +# if line.startswith(b'data: '): +# try: +# data = json.loads(line[6:]) +# +# event_type = data.get('type') +# +# if event_type == 'workflow_started': +# print("=== Workflow Started ===\n") +# elif event_type == 'agent_started': +# current_agent = data.get('agent') +# print(f"\n[{current_agent} started]") +# elif event_type == 'agent_transition': +# print(f"\n[Transition: {data.get('from')} โ†’ {data.get('to')}]") +# elif event_type == 'text': +# print(data.get('text', ''), end='', flush=True) +# elif event_type == 'tool_call': +# tool = data.get('tool') +# print(f"\n ๐Ÿ”ง Calling tool: {tool}") +# elif event_type == 'tool_result': +# result = data.get('result', '')[:100] +# print(f" โœ“ Result: {result}...") +# elif event_type == 'done': +# print(f"\n\n=== Workflow Completed ===") +# print(f"Final agent: {data.get('final_agent')}") +# elif event_type == 'error': +# print(f"\nโŒ Error: {data.get('error')}") +# +# except json.JSONDecodeError: +# pass + +### +# JavaScript Browser Example +### + +# const eventSource = new EventSource('http://localhost:7071/api/workflow/stream'); +# const output = document.getElementById('output'); +# let currentAgent = null; +# +# eventSource.onmessage = (event) => { +# const data = JSON.parse(event.data); +# +# switch(data.type) { +# case 'workflow_started': +# output.innerHTML += '
<div>Workflow Started</div>'; +# break; +# case 'agent_started': +# currentAgent = data.agent; +# output.innerHTML += `<div>[${currentAgent}]</div>`; +# break; +# case 'agent_transition': +# output.innerHTML += `<div>${data.from} → ${data.to}</div>`; +# break; +# case 'text': +# output.innerHTML += data.text; +# break; +# case 'tool_call': +# output.innerHTML += `<div>🔧 ${data.tool}</div>`; +# break; +# case 'done': +# output.innerHTML += '<div>✓ Completed</div>'; +# eventSource.close(); +# break; +# case 'error': +# output.innerHTML += `<div>Error: ${data.error}</div>
`; +# eventSource.close(); +# break; +# } +# }; +# +# eventSource.onerror = (error) => { +# console.error('EventSource error:', error); +# eventSource.close(); +# }; diff --git a/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/function_app.py b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/function_app.py new file mode 100644 index 0000000000..490071bf64 --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/function_app.py @@ -0,0 +1,199 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Workflow HTTP Streaming Sample with Conversation Persistence + +Demonstrates running a multi-agent workflow through Azure Functions with: +- Streaming responses +- Conversation history persistence using AgentThreads +- Session management with Azure Storage + +Components: +- Sequential workflow with Research and Writer agents +- AgentExecutor with persistent AgentThread for conversation history +- Azure Functions HTTP trigger with streaming +- Azure Storage for session state persistence +""" + +import json +import sys +from pathlib import Path +from random import randint +from typing import Annotated, Any + +import azure.functions as func +from agent_framework import AgentThread, SequentialBuilder +from agent_framework._workflows._agent_executor import AgentExecutor +from agent_framework._workflows._events import AgentRunUpdateEvent +from agent_framework.azure import AzureOpenAIChatClient +from azure.identity import AzureCliCredential +from azurefunctions.extensions.http.fastapi import Request, StreamingResponse +from pydantic import Field + +# Add parent directory to path for session_storage import +sys.path.insert(0, str(Path(__file__).parent.parent)) +from session_storage import SessionStorage + +app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +# Initialize session storage +_storage = SessionStorage() + + +def get_weather( + 
location: Annotated[str, Field(description="The location to get the weather for.")], +) -> str: + """Get the weather for a given location.""" + conditions = ["sunny", "cloudy", "rainy", "stormy"] + temperature = randint(10, 30) + return f"Weather in {location}: {conditions[randint(0, 3)]}, {temperature}ยฐC" + + +# Create chat client at module level +chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) + + +# Create workflow with persistent thread for conversation history +async def create_workflow_with_thread(session_id: str) -> tuple[Any, AgentThread]: + """Create a workflow instance with a persistent thread for conversation history. + + The AgentThread is passed to AgentExecutors, which maintains conversation + history across multiple workflow runs within the same session. + + Args: + session_id: Session identifier for loading/saving thread state + + Returns: + Tuple of (workflow, thread) where thread contains conversation history + """ + # Load or create thread for this session + thread_state = await _storage.load_thread(session_id) + + if thread_state: + # Deserialize existing thread from storage + thread = await AgentThread.deserialize(thread_state) + print(f"Loaded existing thread for session {session_id}") + else: + # Create new thread - use any agent to get a new thread + temp_agent = chat_client.create_agent( + name="TempAgent", + instructions="Temporary agent for thread creation.", + ) + thread = temp_agent.get_new_thread() + print(f"Created new thread for session {session_id}") + + # Create agents (these are stateless - thread holds the state) + research_agent = chat_client.create_agent( + name="Researcher", + instructions="Research information using tools. Be brief.", + tools=get_weather, + ) + + writer_agent = chat_client.create_agent( + name="Writer", + instructions="Write creative content based on the research. 
Keep it short.", + ) + + # Wrap agents in AgentExecutors with the shared thread + # This allows conversation history to persist across workflow runs + research_executor = AgentExecutor( + research_agent, + agent_thread=thread, + id="researcher" + ) + writer_executor = AgentExecutor( + writer_agent, + agent_thread=thread, + id="writer" + ) + + # Build workflow + workflow = ( + SequentialBuilder() + .participants([research_executor, writer_executor]) + .build() + ) + + return workflow, thread + + +@app.route(route="workflow/stream", methods=["POST"]) +async def stream_workflow(req: Request) -> StreamingResponse: + """Stream workflow execution with conversation history persistence. + + Uses AgentThreads passed to AgentExecutors for true multi-turn conversations. + The thread maintains conversation history across all HTTP requests for a session. + + Request body: {"message": "Research Seattle weather", "session_id": "abc123"} + Response: Server-Sent Events stream with workflow events + """ + # Parse request + req_body = await req.json() + message = req_body.get("message") + session_id = req_body.get("session_id") + + if not message: + return StreamingResponse( + iter([json.dumps({"error": "Missing 'message' field"})]), + media_type="application/json", + status_code=400 + ) + + if not session_id: + return StreamingResponse( + iter([json.dumps({"error": "Missing 'session_id' field for conversation history"})]), + media_type="application/json", + status_code=400 + ) + + # Create/load workflow with persistent thread + workflow, thread = await create_workflow_with_thread(session_id) + + # Create session metadata if new session + if not _storage.session_exists(session_id): + _storage.create_session(session_id, {"workflow": "ResearchWriter"}) + print(f"Created session {session_id}") + + # Stream workflow events as SSE + async def generate(): + # Run workflow - the AgentThread automatically maintains conversation history + async for event in 
workflow.run_stream(message=message): + # Only stream output from the writer agent (final response) + # Researcher agent works silently in the background + if isinstance(event, AgentRunUpdateEvent) and event.data: + if event.executor_id == "writer": + # AgentRunUpdateEvent.data is AgentRunResponseUpdate with contents + for content in event.data.contents: + if hasattr(content, 'text') and content.text: + yield f"data: {json.dumps({'text': content.text})}\n\n" + + # Save thread state after workflow completes + # This persists the conversation history for the next request + await _storage.save_thread(session_id, thread) + print(f"Saved thread state for session {session_id}") + + return StreamingResponse(generate(), media_type="text/event-stream") + + +""" +Expected output when you POST {"message": "Research Seattle weather", "session_id": "abc123"}: + +Note: Only the writer agent's output is streamed. The researcher agent works silently, +gathering data and calling tools, then the writer crafts the final response. + +data: {"text": "Based"} +data: {"text": " on"} +data: {"text": " the"} +data: {"text": " weather"} +data: {"text": " data,"} +data: {"text": " Seattle"} +data: {"text": " is"} +data: {"text": " experiencing"} +data: {"text": " sunny"} +data: {"text": " skies..."} + +On follow-up POST with same session_id, the workflow remembers previous conversation: +{"message": "Write a haiku about it", "session_id": "abc123"} + +The agents will reference the weather from the previous conversation. 
+""" diff --git a/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/host.json b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/host.json new file mode 100644 index 0000000000..b7e5ad1c0b --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/host.json @@ -0,0 +1,7 @@ +{ + "version": "2.0", + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + } +} diff --git a/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/local.settings.json.template b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/local.settings.json.template new file mode 100644 index 0000000000..65550da768 --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/local.settings.json.template @@ -0,0 +1,10 @@ +{ + "IsEncrypted": false, + "Values": { + "FUNCTIONS_WORKER_RUNTIME": "python", + "PYTHON_ENABLE_INIT_INDEXING": "1", + "AZURE_OPENAI_ENDPOINT": "https://your-resource.openai.azure.com", + "AZURE_OPENAI_CHAT_DEPLOYMENT_NAME": "gpt-4", + "AZURE_OPENAI_API_KEY": "" + } +} diff --git a/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/requirements.txt b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/requirements.txt new file mode 100644 index 0000000000..57d4f5623f --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/requirements.txt @@ -0,0 +1,7 @@ +agent-framework +agent-framework-azure +azure-identity +azure-functions +azurefunctions-extensions-http-fastapi +azure-data-tables +azure-storage-blob diff --git a/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/test_stream.py 
b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/test_stream.py
new file mode 100644
index 0000000000..62e73aa2b1
--- /dev/null
+++ b/python/samples/getting_started/azure_functions/non-durable/02_workflow_http_streaming/test_stream.py
@@ -0,0 +1,26 @@
+"""Simple test script for streaming endpoints."""
+import httpx
+import json
+
+async def test_workflow_stream():
+    url = "http://localhost:7071/api/workflow/stream"
+    data = {"message": "What's the weather in Seattle?", "session_id": "test-session"}  # endpoint returns 400 without session_id
+
+    async with httpx.AsyncClient() as client:
+        async with client.stream("POST", url, json=data, timeout=60.0) as response:
+            print(f"Status: {response.status_code}")
+            print(f"Headers: {response.headers}")
+            print("\nStreaming response:\n")
+
+            async for line in response.aiter_lines():
+                if line.startswith("data: "):
+                    data_str = line[6:]  # Remove "data: " prefix
+                    try:
+                        data_obj = json.loads(data_str)
+                        print(f"  {data_obj}")
+                    except json.JSONDecodeError:
+                        print(f"  {data_str}")
+
+if __name__ == "__main__":
+    import asyncio
+    asyncio.run(test_workflow_stream())
diff --git a/python/samples/getting_started/azure_functions/non-durable/README.md b/python/samples/getting_started/azure_functions/non-durable/README.md
new file mode 100644
index 0000000000..f75b39a4eb
--- /dev/null
+++ b/python/samples/getting_started/azure_functions/non-durable/README.md
@@ -0,0 +1,184 @@
+# Non-Durable Azure Functions Samples
+
+These samples demonstrate how to expose agents and workflows through Azure Functions HTTP triggers with **real-time streaming responses**, without using Durable Functions orchestration.
+
+## 📖 Overview
+
+This directory contains samples that show the **simple, stateless approach** to running agents and workflows in Azure Functions. 
Unlike the durable samples (01-10), these samples: + +- โœ… Use direct HTTP streaming for real-time responses +- โœ… Are stateless and don't require storage accounts +- โœ… Stream results using Server-Sent Events (SSE) +- โœ… Execute synchronously within the HTTP request lifecycle +- โŒ Don't use orchestration or durable state management +- โŒ Don't support fire-and-forget or status polling patterns + +## ๐Ÿ”„ When to Use Non-Durable vs Durable + +| Feature | Non-Durable (This Folder) | Durable (01-10 Samples) | +|---------|---------------------------|-------------------------| +| **Response Time** | Real-time streaming | Async with polling | +| **State Management** | Stateless | Persisted state | +| **Execution Model** | Synchronous | Orchestrated, async | +| **Complexity** | Simple, direct | More complex patterns | +| **Best For** | Quick responses, chat UIs | Long-running workflows, HITL | +| **Timeout Limits** | HTTP timeout (~230s) | Hours/days | +| **Storage Required** | No | Yes (Azurite/Azure Storage) | + +### Choose Non-Durable When: +- You need **real-time streaming** for chat interfaces +- Responses complete within a few minutes +- You want **simple, stateless** execution +- You don't need to track execution status over time +- You want minimal infrastructure dependencies + +### Choose Durable When: +- You need **human-in-the-loop** approval workflows +- Execution takes longer than HTTP timeout limits +- You need to query status or resume execution later +- You want **complex orchestration patterns** (concurrency, conditionals) +- You need reliable state persistence + +## ๐Ÿ“‚ Samples in This Directory + +### [01_agent_http_streaming](./01_agent_http_streaming) +Demonstrates exposing a single agent through an HTTP trigger with streaming responses using Server-Sent Events (SSE). 
+ +**Key Concepts:** +- Direct agent execution in HTTP trigger +- Streaming responses with `AsyncGenerator` +- SSE format for browser compatibility +- Tool calling with function results +- Error handling in streaming context + +### [02_workflow_http_streaming](./02_workflow_http_streaming) +Shows how to run multi-agent workflows with real-time streaming of intermediate steps and agent handoffs. + +**Key Concepts:** +- Workflow execution without orchestration +- Streaming workflow events (agent transitions, tool calls) +- Multi-agent coordination in real-time +- Step-by-step progress updates + +## ๐Ÿš€ Environment Setup + +### Prerequisites + +1. **Install Azure Functions Core Tools 4.x** + ```powershell + # Windows (using npm) + npm install -g azure-functions-core-tools@4 --unsafe-perm true + ``` + +2. **Create Azure OpenAI Resource** + - Create an [Azure OpenAI](https://azure.microsoft.com/products/ai-services/openai-service) resource + - Deploy a chat model (e.g., gpt-4, gpt-4o, gpt-35-turbo) + - Note the endpoint and deployment name + +3. **Install REST Client** (optional but recommended) + - [REST Client for VS Code](https://marketplace.visualstudio.com/items?itemName=humao.rest-client) + - Or use cURL from the command line + +### Running a Sample + +1. **Navigate to the sample directory:** + ```powershell + cd python\samples\getting_started\azure_functions\non-durable\01_agent_http_streaming + ``` + +2. **Create and activate a virtual environment:** + ```powershell + python -m venv .venv + .venv\Scripts\Activate.ps1 + ``` + +3. **Install dependencies:** + ```powershell + pip install -r requirements.txt + ``` + +4. **Configure settings:** + ```powershell + # Copy the template + cp local.settings.json.template local.settings.json + + # Edit local.settings.json with your values + # Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_CHAT_DEPLOYMENT_NAME + ``` + +5. **Authenticate with Azure CLI** (if using AzureCliCredential): + ```powershell + az login + ``` + +6. 
**Start the function:**
+   ```powershell
+   func start
+   ```
+
+7. **Test the endpoint:**
+   - Use the `demo.http` file with REST Client extension
+   - Or use cURL examples in the sample's README
+
+## 🌐 HTTP Streaming with Azure Functions
+
+These samples stream responses from async generators using the Azure Functions FastAPI HTTP extension (`azurefunctions-extensions-http-fastapi`):
+
+```python
+# Streaming requires the FastAPI HTTP extension; func.HttpResponse cannot stream a generator
+from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
+
+@app.route(route="agent/stream", methods=["POST"])
+async def stream_response(req: Request) -> StreamingResponse:
+    async def generate():
+        async for chunk in agent.run_stream(message):
+            if chunk.text:
+                # Format as Server-Sent Events
+                yield f"data: {json.dumps({'text': chunk.text})}\n\n"
+
+    return StreamingResponse(
+        generate(), media_type="text/event-stream")
+```
+
+### Client-Side Consumption
+
+**JavaScript (Browser):**
+```javascript
+// EventSource only issues GET requests; this endpoint is POST, so read the SSE stream via fetch
+const res = await fetch('http://localhost:7071/api/agent/stream',
+  {method: 'POST', body: JSON.stringify({message: 'Hello!'})});
+for await (const chunk of res.body)
+  console.log(new TextDecoder().decode(chunk));
+```
+
+**Python:**
+```python
+import requests
+
+response = requests.post(
+    'http://localhost:7071/api/agent/stream',
+    json={'message': 'Hello!'},
+    stream=True
+)
+
+for line in response.iter_lines():
+    if line.startswith(b'data: '):
+        data = json.loads(line[6:])
+        print(data['text'], end='', flush=True)
+```
+
+## 📚 Additional Resources
+
+- [Azure Functions Python Developer Guide](https://learn.microsoft.com/azure/azure-functions/functions-reference-python)
+- [HTTP Streaming in Azure Functions](https://learn.microsoft.com/azure/azure-functions/functions-reference-python#http-streaming)
+- [Server-Sent Events (SSE) Specification](https://html.spec.whatwg.org/multipage/server-sent-events.html)
+- [Durable Functions Samples](../) - For orchestration patterns
+
+## 🤝 Contributing
+
+When adding new non-durable samples:
+- Keep them simple and focused on HTTP streaming
+- Avoid orchestration patterns (use durable samples for that)
+- Document when to use 
this approach vs durable +- Include comprehensive `demo.http` examples +- Follow the [Sample Guidelines](../../../SAMPLE_GUIDELINES.md) diff --git a/python/samples/getting_started/azure_functions/non-durable/gradio_chat.py b/python/samples/getting_started/azure_functions/non-durable/gradio_chat.py new file mode 100644 index 0000000000..3b183cd149 --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/gradio_chat.py @@ -0,0 +1,153 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Gradio Chat UI for testing Azure Functions streaming endpoints. + +Usage: + pip install gradio requests + python gradio_chat.py +""" + +import gradio as gr +import requests +import json + + +def chat_with_agent(message, endpoint, history): + """Stream response from the agent or workflow endpoint.""" + if not message.strip(): + return history + + try: + response = requests.post( + endpoint, + json={"message": message}, + stream=True, + timeout=240 + ) + + if response.status_code != 200: + error_msg = f"Error {response.status_code}: {response.text}" + history.append({"role": "user", "content": message}) + history.append({"role": "assistant", "content": error_msg}) + return history + + # Add user message and empty agent response + history.append({"role": "user", "content": message}) + history.append({"role": "assistant", "content": ""}) + + agent_response = "" + for line in response.iter_lines(): + if line.startswith(b'data: '): + try: + data = json.loads(line[6:]) + if data.get('text'): + agent_response += data['text'] + # Update the last message with accumulated response + history[-1]["content"] = agent_response + yield history + except json.JSONDecodeError: + pass + + # Final update + history[-1]["content"] = agent_response if agent_response else "No response received" + yield history + + except requests.exceptions.Timeout: + history.append({"role": "user", "content": message}) + history.append({"role": "assistant", "content": "โš ๏ธ Request timed out 
(>240s)"}) + yield history + except Exception as e: + history.append({"role": "user", "content": message}) + history.append({"role": "assistant", "content": f"โŒ Error: {str(e)}"}) + yield history + + +# Create the Gradio interface +with gr.Blocks(title="Agent Chat UI") as demo: + gr.Markdown( + """ + # ๐Ÿค– Agent Framework Chat UI + + Test your Azure Functions streaming endpoints with a simple chat interface. + Make sure your function app is running on `http://localhost:7071` + """ + ) + + with gr.Row(): + endpoint = gr.Radio( + choices=[ + "http://localhost:7071/api/agent/stream", + "http://localhost:7071/api/workflow/stream" + ], + value="http://localhost:7071/api/agent/stream", + label="๐ŸŽฏ Select Endpoint", + info="Choose between single agent or multi-agent workflow" + ) + + chatbot = gr.Chatbot( + height=500, + label="Chat", + avatar_images=(None, "๐Ÿค–") + ) + + with gr.Row(): + msg = gr.Textbox( + placeholder="Type your message here... (e.g., 'What's the weather in Seattle?')", + label="Message", + scale=9 + ) + submit_btn = gr.Button("Send", scale=1, variant="primary") + + with gr.Row(): + clear_btn = gr.Button("๐Ÿ—‘๏ธ Clear Chat") + + gr.Markdown( + """ + ### ๐Ÿ’ก Try these examples: + - "What's the weather in Seattle?" 
+ - "Tell me about the weather in Tokyo and Paris" + - "Research the weather in London and write a short poem about it" (workflow) + """ + ) + + # Event handlers + def submit_message(message, chat_history, endpoint_url): + for updated_history in chat_with_agent(message, endpoint_url, chat_history): + yield updated_history + + msg.submit( + submit_message, + [msg, chatbot, endpoint], + chatbot + ).then( + lambda: "", + None, + msg + ) + + submit_btn.click( + submit_message, + [msg, chatbot, endpoint], + chatbot + ).then( + lambda: "", + None, + msg + ) + + clear_btn.click(lambda: [], None, chatbot) + + +if __name__ == "__main__": + print("\n๐Ÿš€ Starting Gradio Chat UI...") + print("๐Ÿ“ Make sure your Azure Function is running on http://localhost:7071") + print("๐ŸŒ Opening browser...\n") + + demo.launch( + server_name="127.0.0.1", + server_port=7860, + show_error=True, + share=False, # Set to True to create a public URL + theme=gr.themes.Soft() + ) diff --git a/python/samples/getting_started/azure_functions/non-durable/session_storage.py b/python/samples/getting_started/azure_functions/non-durable/session_storage.py new file mode 100644 index 0000000000..b8fa6cf434 --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/session_storage.py @@ -0,0 +1,170 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Session storage using Azure Storage and Agent Framework's built-in serialization. +Uses Azurite for local development. 
+""" + +import json +import logging +import os +from typing import Any + +from agent_framework import AgentThread +from azure.core.credentials import AzureNamedKeyCredential +from azure.data.tables import TableServiceClient +from azure.storage.blob import BlobServiceClient + +# Azurite configuration for local development +ACCOUNT_NAME = "devstoreaccount1" +ACCOUNT_KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" +BLOB_ENDPOINT = os.environ.get("BLOB_ENDPOINT", "http://127.0.0.1:10000") +TABLE_ENDPOINT = os.environ.get("TABLE_ENDPOINT", f"http://127.0.0.1:10002/{ACCOUNT_NAME}") + +TABLE_NAME = "sessions" +CONTAINER_NAME = "threads" + + +class SessionStorage: + """Manages session state using Azure Storage and AgentThread serialization.""" + + def __init__(self): + """Initialize Azure Storage clients.""" + credential = AzureNamedKeyCredential(ACCOUNT_NAME, ACCOUNT_KEY) + + # Initialize Table Storage client with explicit endpoint + self.table_service = TableServiceClient( + endpoint=TABLE_ENDPOINT, + credential=credential + ) + self.table_client = self.table_service.get_table_client(TABLE_NAME) + + # Initialize Blob Storage client with explicit endpoint + self.blob_service = BlobServiceClient( + account_url=f"{BLOB_ENDPOINT}/{ACCOUNT_NAME}", + credential=credential + ) + self.container_client = self.blob_service.get_container_client(CONTAINER_NAME) + + # Create table and container if they don't exist + try: + self.table_service.create_table_if_not_exists(TABLE_NAME) + logging.info(f"Table '{TABLE_NAME}' ready") + except Exception as e: + logging.warning(f"Could not create table: {e}") + + try: + self.container_client.create_container() + logging.info(f"Container '{CONTAINER_NAME}' created") + except Exception as e: + if "ContainerAlreadyExists" not in str(e): + logging.warning(f"Could not create container: {e}") + + async def save_thread(self, session_id: str, thread: AgentThread) -> None: + """Save AgentThread to blob 
storage using framework serialization.""" + try: + blob_name = f"{session_id}/thread.json" + blob_client = self.container_client.get_blob_client(blob_name) + + # Use AgentThread's built-in serialization + thread_state = await thread.serialize() + thread_json = json.dumps(thread_state, indent=2) + blob_client.upload_blob(thread_json, overwrite=True) + + logging.info(f"Saved thread for session {session_id}") + except Exception as e: + logging.error(f"Error saving thread: {e}") + raise + + async def load_thread(self, session_id: str) -> dict[str, Any] | None: + """Load AgentThread state from blob storage.""" + try: + blob_name = f"{session_id}/thread.json" + blob_client = self.container_client.get_blob_client(blob_name) + + # Download and deserialize + blob_data = blob_client.download_blob() + thread_json = blob_data.readall() + thread_state = json.loads(thread_json) + + logging.info(f"Loaded thread for session {session_id}") + return thread_state + except Exception as e: + if "BlobNotFound" not in str(e): + logging.error(f"Error loading thread: {e}") + return None + + def save_conversation(self, session_id: str, conversation: list[Any]) -> None: + """Save conversation list for workflows (simpler than thread serialization).""" + try: + blob_name = f"{session_id}/conversation.json" + blob_client = self.container_client.get_blob_client(blob_name) + + conversation_json = json.dumps(conversation, indent=2) + blob_client.upload_blob(conversation_json, overwrite=True) + + logging.info(f"Saved conversation for session {session_id}") + except Exception as e: + logging.error(f"Error saving conversation: {e}") + raise + + def load_conversation(self, session_id: str) -> list[Any] | None: + """Load conversation list for workflows.""" + try: + blob_name = f"{session_id}/conversation.json" + blob_client = self.container_client.get_blob_client(blob_name) + + blob_data = blob_client.download_blob() + conversation_json = blob_data.readall() + conversation = 
json.loads(conversation_json) + + logging.info(f"Loaded conversation for session {session_id}") + return conversation + except Exception as e: + if "BlobNotFound" not in str(e): + logging.error(f"Error loading conversation: {e}") + return None + + def session_exists(self, session_id: str) -> bool: + """Check if session exists in table storage.""" + try: + entity = self.table_client.get_entity( + partition_key="session", + row_key=session_id + ) + return entity is not None + except Exception: + return False + + def create_session(self, session_id: str, metadata: dict[str, Any] | None = None) -> None: + """Create a new session entry in table storage.""" + try: + entity = { + "PartitionKey": "session", + "RowKey": session_id, + **(metadata or {}) + } + self.table_client.create_entity(entity) + logging.info(f"Created session {session_id}") + except Exception as e: + logging.error(f"Error creating session: {e}") + raise + + def delete_session(self, session_id: str) -> None: + """Delete session and its thread data.""" + try: + # Delete table entity + self.table_client.delete_entity( + partition_key="session", + row_key=session_id + ) + + # Delete blob + blob_name = f"{session_id}/thread.json" + blob_client = self.container_client.get_blob_client(blob_name) + blob_client.delete_blob() + + logging.info(f"Deleted session {session_id}") + except Exception as e: + logging.error(f"Error deleting session: {e}") + raise diff --git a/python/samples/getting_started/azure_functions/non-durable/streamlit_chat.py b/python/samples/getting_started/azure_functions/non-durable/streamlit_chat.py new file mode 100644 index 0000000000..f778f1dd7f --- /dev/null +++ b/python/samples/getting_started/azure_functions/non-durable/streamlit_chat.py @@ -0,0 +1,212 @@ +# Copyright (c) Microsoft. All rights reserved. + +""" +Streamlit Chat UI for testing Azure Functions streaming endpoints. 
+ +Features: +- Persistent sessions across page refreshes using URL query parameters +- Support for both single agent and multi-agent workflows +- Session management with conversation history + +Usage: + pip install streamlit requests + streamlit run streamlit_chat.py +""" + +import json +import uuid + +import requests +import streamlit as st + +# Configure page +st.set_page_config(page_title="Agent Framework Chat", page_icon="๐Ÿค–", layout="centered") + +# Get session ID from URL query parameters +query_params = st.query_params +url_session_id = query_params.get("session_id") + +# Initialize session state with persistence +if "sessions" not in st.session_state: + st.session_state.sessions = {} + +if "current_session_id" not in st.session_state: + # If there's a session ID in URL, use it + if url_session_id: + st.session_state.current_session_id = url_session_id + if url_session_id not in st.session_state.sessions: + st.session_state.sessions[url_session_id] = [] + else: + st.session_state.current_session_id = None + +if "messages" not in st.session_state: + # Load messages for current session + if st.session_state.current_session_id: + st.session_state.messages = st.session_state.sessions.get( + st.session_state.current_session_id, [] + ) + else: + st.session_state.messages = [] + +# Sidebar configuration +with st.sidebar: + st.title("โš™๏ธ Configuration") + + # Endpoint selection + endpoint = st.radio( + "Select Endpoint", + [ + "http://localhost:7071/api/agent/stream", + "http://localhost:7071/api/workflow/stream", + ], + index=0, + help="Choose between single agent or multi-agent workflow", + ) + + st.markdown("---") + st.markdown("### ๐Ÿ’ฌ Session Management") + + # Session selection/creation + col1, col2 = st.columns(2) + with col1: + if st.button("โž• New Session", use_container_width=True): + new_session_id = str(uuid.uuid4())[:8] + st.session_state.sessions[new_session_id] = [] + st.session_state.current_session_id = new_session_id + 
st.session_state.messages = [] + # Update URL to persist session across refresh + st.query_params["session_id"] = new_session_id + st.rerun() + + with col2: + if st.button("๐Ÿšซ No Session", use_container_width=True): + st.session_state.current_session_id = None + st.session_state.messages = [] + # Remove session from URL + if "session_id" in st.query_params: + del st.query_params["session_id"] + st.rerun() + + # Display existing sessions + if st.session_state.sessions: + st.markdown("**Existing Sessions:**") + for session_id in list(st.session_state.sessions.keys()): + col_a, col_b = st.columns([3, 1]) + with col_a: + if st.button( + f"{'๐ŸŸข' if session_id == st.session_state.current_session_id else 'โšช'} {session_id}", + key=f"session_{session_id}", + use_container_width=True + ): + st.session_state.current_session_id = session_id + st.session_state.messages = st.session_state.sessions[session_id] + # Update URL to persist session across refresh + st.query_params["session_id"] = session_id + st.rerun() + with col_b: + if st.button("๐Ÿ—‘๏ธ", key=f"delete_{session_id}"): + del st.session_state.sessions[session_id] + if st.session_state.current_session_id == session_id: + st.session_state.current_session_id = None + st.session_state.messages = [] + # Remove session from URL + if "session_id" in st.query_params: + del st.query_params["session_id"] + st.rerun() + + st.markdown("---") + st.markdown("### ๐Ÿ’ก Try these examples:") + st.markdown("- What's the weather in Seattle?") + st.markdown("- Tell me about the weather in Tokyo and Paris") + st.markdown("- Research the weather in London (workflow)") + + st.markdown("---") + if st.button("๐Ÿ—‘๏ธ Clear Current Chat", use_container_width=True): + st.session_state.messages = [] + if st.session_state.current_session_id: + st.session_state.sessions[st.session_state.current_session_id] = [] + st.rerun() + +# Main chat interface +st.title("๐Ÿค– Agent Framework Chat UI") +if st.session_state.current_session_id: + 
st.caption(f"Session: {st.session_state.current_session_id}") + st.caption("๐Ÿ’ก This session persists across page refreshes (stored in Azure)") +else: + st.caption("No session (stateless mode)") + +# Display chat history +for message in st.session_state.messages: + with st.chat_message(message["role"]): + st.markdown(message["content"]) + +# Chat input +if prompt := st.chat_input("Type your message here..."): + # Add user message to history + st.session_state.messages.append({"role": "user", "content": prompt}) + + # Save to session storage + if st.session_state.current_session_id: + st.session_state.sessions[st.session_state.current_session_id] = st.session_state.messages.copy() + + # Display user message + with st.chat_message("user"): + st.markdown(prompt) + + # Display assistant response with streaming + with st.chat_message("assistant"): + message_placeholder = st.empty() + full_response = "" + + try: + # Prepare request payload + payload = {"message": prompt} + if st.session_state.current_session_id: + payload["session_id"] = st.session_state.current_session_id + + # Make streaming request to the endpoint + response = requests.post( + endpoint, + json=payload, + stream=True, + timeout=240 + ) + + if response.status_code == 200: + # Process SSE stream + for line in response.iter_lines(): + if line.startswith(b'data: '): + try: + data = json.loads(line[6:]) + if data.get('text'): + full_response += data['text'] + message_placeholder.markdown(full_response + "โ–Œ") + except json.JSONDecodeError: + pass + + # Final update without cursor + message_placeholder.markdown(full_response) + else: + error_msg = f"โŒ Error {response.status_code}: {response.text}" + full_response = error_msg + message_placeholder.markdown(error_msg) + + except requests.exceptions.Timeout: + error_msg = "โš ๏ธ Request timed out (>240s)" + full_response = error_msg + message_placeholder.markdown(error_msg) + except Exception as e: + error_msg = f"โŒ Error: {str(e)}" + full_response = 
error_msg + message_placeholder.markdown(error_msg) + + # Add assistant response to history + st.session_state.messages.append({"role": "assistant", "content": full_response}) + + # Save to session storage + if st.session_state.current_session_id: + st.session_state.sessions[st.session_state.current_session_id] = st.session_state.messages.copy() + +# Footer +st.markdown("---") +st.caption("๐Ÿ“ Make sure your Azure Function is running on http://localhost:7071")