# Source: concurrent_basic.py (forked from microsoft/agent-framework)
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "semantic-kernel",
# ]
# ///
# Run with any PEP 723 compatible runner, e.g.:
# uv run samples/semantic-kernel-migration/orchestrations/concurrent_basic.py
# Copyright (c) Microsoft. All rights reserved.
"""Side-by-side concurrent orchestrations for Agent Framework and Semantic Kernel."""
import asyncio
from collections.abc import Sequence
from typing import cast
from agent_framework import Message
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.orchestrations import ConcurrentBuilder
from azure.identity import AzureCliCredential
from semantic_kernel.agents import ChatCompletionAgent, ConcurrentOrchestration
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents import ChatMessageContent
# Single task prompt passed to both orchestration paths in main(), so the two
# frameworks are run against the same input.
PROMPT = "Explain the concept of temperature from multiple scientific perspectives."
######################################################################
# Semantic Kernel orchestration path
######################################################################
def build_semantic_kernel_agents() -> list[ChatCompletionAgent]:
    """Create the Semantic Kernel participants for the concurrent orchestration.

    Returns:
        A two-element list: the ``PhysicsExpert`` agent followed by the
        ``ChemistryExpert`` agent. Each agent receives its own
        ``AzureChatCompletion`` service instance; both services are built
        from one shared ``AzureCliCredential``.
    """
    shared_credential = AzureCliCredential()

    # (agent name, instructions) pairs, listed in the order they are returned.
    expert_specs = (
        ("PhysicsExpert", "You are an expert in physics. Answer questions from a physics perspective."),
        ("ChemistryExpert", "You are an expert in chemistry. Answer questions from a chemistry perspective."),
    )

    return [
        ChatCompletionAgent(
            name=agent_name,
            instructions=agent_instructions,
            service=AzureChatCompletion(credential=shared_credential),
        )
        for agent_name, agent_instructions in expert_specs
    ]
async def run_semantic_kernel_example(prompt: str) -> Sequence[ChatMessageContent]:
    """Run the prompt through a Semantic Kernel ``ConcurrentOrchestration``.

    An ``InProcessRuntime`` is started before the orchestration is invoked and
    is always awaited via ``stop_when_idle()`` afterwards, including when the
    invocation or the result retrieval raises.

    Args:
        prompt: Task text passed to the orchestration as ``task``.

    Returns:
        A list of messages: a one-element list when the orchestration yields a
        single ``ChatMessageContent``, a list copy when it yields a sequence,
        and an empty list for any other result value.
    """
    orchestration = ConcurrentOrchestration(members=build_semantic_kernel_agents())
    sk_runtime = InProcessRuntime()
    sk_runtime.start()

    try:
        pending_result = await orchestration.invoke(task=prompt, runtime=sk_runtime)
        value = await pending_result.get(timeout=60)

        # Normalize the result so callers always receive a list.
        if isinstance(value, ChatMessageContent):
            messages = [value]
        elif isinstance(value, Sequence):
            messages = list(value)
        else:
            messages = []
        return messages
    finally:
        await sk_runtime.stop_when_idle()
def _print_semantic_kernel_outputs(outputs: Sequence[ChatMessageContent]) -> None:
    """Print the Semantic Kernel results to stdout.

    Args:
        outputs: Messages to print. When empty, a single "no output" notice is
            printed instead of the banner and message list.
    """
    if outputs:
        print("===== Semantic Kernel Concurrent =====")
        for message in outputs:
            # A falsy content value (e.g. None) is printed as an empty body.
            body = message.content or ""
            print(f"# {message.name}\n{body}\n")
    else:
        print("No Semantic Kernel output.")
######################################################################
# Agent Framework orchestration path
######################################################################
async def run_agent_framework_example(prompt: str) -> Sequence[list[Message]]:
    """Run the prompt through an Agent Framework concurrent workflow.

    Two agents ("physics" and "chemistry") are created from one
    ``AzureOpenAIChatClient`` and combined with ``ConcurrentBuilder``. The
    workflow is run in streaming mode and only events whose ``type`` equals
    ``"output"`` are kept.

    Args:
        prompt: Task text passed to ``workflow.run``.

    Returns:
        One entry per "output" event, in the order the events were streamed.
        Each entry is the event's ``data``; the ``cast`` is a type-checker
        hint only and performs no runtime validation.
    """
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    participants = [
        chat_client.as_agent(
            instructions=("You are an expert in physics. Answer questions from a physics perspective."),
            name="physics",
        ),
        chat_client.as_agent(
            instructions=("You are an expert in chemistry. Answer questions from a chemistry perspective."),
            name="chemistry",
        ),
    ]
    concurrent_workflow = ConcurrentBuilder(participants=participants).build()

    return [
        cast(list[Message], event.data)
        async for event in concurrent_workflow.run(prompt, stream=True)
        if event.type == "output"
    ]
def _print_agent_framework_outputs(conversations: Sequence[Sequence[Message]]) -> None:
    """Print the Agent Framework results to stdout.

    Args:
        conversations: One message sequence per workflow output. When empty, a
            single "no output" notice is printed instead of the banner.
    """
    if conversations:
        print("===== Agent Framework Concurrent =====")
        for number, messages in enumerate(conversations, start=1):
            print(f"--- Conversation {number} ---")
            for entry in messages:
                # Messages without an author name are labelled "assistant".
                speaker = entry.author_name or "assistant"
                print(f"[{speaker}] {entry.text}")
            # Blank line separating one conversation from the next.
            print()
    else:
        print("No Agent Framework output.")
async def main() -> None:
    """Run both orchestration samples on ``PROMPT`` and print their results.

    The Agent Framework example runs and is printed first; the Semantic Kernel
    example runs only after that has finished.
    """
    _print_agent_framework_outputs(await run_agent_framework_example(PROMPT))
    _print_semantic_kernel_outputs(await run_semantic_kernel_example(PROMPT))
if __name__ == "__main__":
    # Run the side-by-side comparison only when executed as a script.
    comparison = main()
    asyncio.run(comparison)