16 changes: 11 additions & 5 deletions src/2_frameworks/1_react_rag/langfuse_gradio.py
@@ -10,6 +10,7 @@
 import gradio as gr
 from dotenv import load_dotenv
 from gradio.components.chatbot import ChatMessage
+from langfuse import propagate_attributes
 
 from src.prompts import REACT_INSTRUCTIONS
 from src.utils import (
@@ -51,9 +52,14 @@ async def _main(
         ),
     )
 
-    with langfuse_client.start_as_current_span(name="Agents-SDK-Trace") as span:
-        span.update(input=query)
-
+    with (
+        langfuse_client.start_as_current_observation(
+            name="Agents-SDK-Trace", as_type="agent", input=query
+        ) as obs,
+        propagate_attributes(
+            session_id=session.session_id  # Propagate session_id to all child observations
+        ),
+    ):
         # Run the agent in streaming mode to get and display intermediate outputs
         result_stream = agents.Runner.run_streamed(
             main_agent, input=query, session=session
@@ -64,7 +70,7 @@
             if len(turn_messages) > 0:
                 yield turn_messages
 
-        span.update(output=result_stream.final_output)
+        obs.update(output=result_stream.final_output)
 
     pretty_print(turn_messages)
     yield turn_messages
Expand Down Expand Up @@ -92,7 +98,7 @@ async def _main(
[
"At which university did the SVP Software Engineering"
" at Apple (as of June 2025) earn their engineering degree?",
]
],
],
title="2.1: ReAct for Retrieval-Augmented Generation with OpenAI Agent SDK + LangFuse",
)
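
Across all four files, this PR replaces the Langfuse v2-style `start_as_current_span` calls with v3's `start_as_current_observation`, which types each trace node (`as_type="agent"`) and accepts `input=` at creation time, and wraps the traced block in `propagate_attributes` so the Gradio session ID reaches every child observation. A minimal sketch of the combined pattern, assuming the Langfuse Python SDK v3; `get_client` is the SDK's accessor, while the `answer` function and its body are illustrative placeholders, not part of this PR:

from langfuse import get_client, propagate_attributes

langfuse_client = get_client()  # reads LANGFUSE_* environment variables

def answer(query: str, session_id: str) -> str:
    # Typed root observation; input is recorded when the observation opens.
    with (
        langfuse_client.start_as_current_observation(
            name="Agents-SDK-Trace", as_type="agent", input=query
        ) as obs,
        # Every observation opened inside this block inherits session_id.
        propagate_attributes(session_id=session_id),
    ):
        output = f"(placeholder) answer to: {query}"  # hypothetical agent call
        obs.update(output=output)
        return output
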
34 changes: 24 additions & 10 deletions src/2_frameworks/2_multi_agent/efficient.py
@@ -13,6 +13,7 @@
 import gradio as gr
 from dotenv import load_dotenv
 from gradio.components.chatbot import ChatMessage
+from langfuse import propagate_attributes
 
 from src.prompts import REACT_INSTRUCTIONS
 from src.utils import (
@@ -39,20 +40,28 @@ async def _main(
     session = get_or_create_session(history, session_state)
 
     # Use the main agent as the entry point- not the worker agent.
-    with langfuse_client.start_as_current_span(name="Agents-SDK-Trace") as span:
-        span.update(input=query)
-
+    with (
+        langfuse_client.start_as_current_observation(
+            name="Orchestrator-Worker", as_type="agent", input=query
+        ) as obs,
+        propagate_attributes(
+            session_id=session.session_id  # Propagate session_id to all child observations
+        ),
+    ):
         # Run the agent in streaming mode to get and display intermediate outputs
         result_stream = agents.Runner.run_streamed(
-            main_agent, input=query, session=session
+            main_agent,
+            input=query,
+            session=session,
+            max_turns=30,  # Increase max turns to support more complex queries
         )
 
         async for _item in result_stream.stream_events():
             turn_messages += oai_agent_stream_to_gradio_messages(_item)
             if len(turn_messages) > 0:
                 yield turn_messages
 
-        span.update(output=result_stream.final_output)
+        obs.update(output=result_stream.final_output)
 
 
 if __name__ == "__main__":
@@ -81,7 +90,11 @@ async def _main(
         instructions=(
             "You are a search agent. You receive a single search query as input. "
             "Use the search tool to perform a search, then produce a concise "
-            "'search summary' of the key findings. Do NOT return raw search results."
+            "'search summary' of the key findings. "
+            "For every fact you include in the summary, ALWAYS include a citation "
+            "both in-line and at the end of the summary as a numbered list. The "
+            "citation at the end should include relevant metadata from the search "
+            "results. Do NOT return raw search results. "
         ),
         tools=[
             agents.function_tool(client_manager.knowledgebase.search_knowledgebase),
@@ -118,12 +131,13 @@ async def _main(
         **COMMON_GRADIO_CONFIG,
         examples=[
             [
-                "At which university did the SVP Software Engineering"
-                " at Apple (as of June 2025) earn their engineering degree?"
+                "Write a structured report on the history of AI, covering: "
+                "1) the start in the 50s, 2) the first AI winter, 3) the second AI winter, "
+                "4) the modern AI boom, 5) the evolution of AI hardware, and "
+                "6) the societal impacts of modern AI"
             ],
             [
-                "How does the annual growth in the 50th-percentile income "
-                "in the US compare with that in Canada?",
+                "Compare the box office performance of 'Oppenheimer' with the third Avatar movie"
             ],
         ],
         title="2.2.2: Multi-Agent Orchestrator-worker for Retrieval-Augmented Generation",
33 changes: 18 additions & 15 deletions src/2_frameworks/2_multi_agent/efficient_multiple_kbs.py
@@ -7,6 +7,7 @@
 import gradio as gr
 from dotenv import load_dotenv
 from gradio.components.chatbot import ChatMessage
+from langfuse import propagate_attributes
 
 from src.utils import (
     oai_agent_stream_to_gradio_messages,
@@ -36,9 +37,14 @@ async def _main(
     session = get_or_create_session(history, session_state)
 
     # Use the main agent as the entry point- not the worker agent.
-    with langfuse_client.start_as_current_span(name="Agents-SDK-Trace") as span:
-        span.update(input=query)
-
+    with (
+        langfuse_client.start_as_current_observation(
+            name="Orchestrator-Worker", as_type="agent", input=query
+        ) as obs,
+        propagate_attributes(
+            session_id=session.session_id  # Propagate session_id to all child observations
+        ),
+    ):
         # Run the agent in streaming mode to get and display intermediate outputs
         result_stream = agents.Runner.run_streamed(
             main_agent,
@@ -52,7 +58,7 @@
             if len(turn_messages) > 0:
                 yield turn_messages
 
-        span.update(output=result_stream.final_output)
+        obs.update(output=result_stream.final_output)
 
 
 if __name__ == "__main__":
@@ -173,26 +179,23 @@ async def _main(
             model=agents.OpenAIChatCompletionsModel(
                 model=planner_model, openai_client=client_manager.openai_client
             ),
+            # NOTE: enabling parallel tool calls here can sometimes lead to issues
+            # with invalid arguments being passed to the search agent.
+            model_settings=agents.ModelSettings(parallel_tool_calls=False),
         )
 
     demo = gr.ChatInterface(
         _main,
         **COMMON_GRADIO_CONFIG,
         examples=[
             [
-                "At which university did the SVP Software Engineering"
-                " at Apple (as of June 2025) earn their engineering degree?"
-            ],
-            [
-                "How does the annual growth in the 50th-percentile income "
-                "in the US compare with that in Canada?",
+                "Write a structured report on the history of AI, covering: "
+                "1) the start in the 50s, 2) the first AI winter, 3) the second AI winter, "
+                "4) the modern AI boom, 5) the evolution of AI hardware, and "
+                "6) the societal impacts of modern AI"
             ],
             [
-                "Provide a complete list of all countries that have a population "
-                "over 100 million in 2026, that contain over 500 billion cubic meters "
-                "of internal fresh water for the year 2021, and have a mortality rate "
-                "less than the birth rate for the year 2021. The order of the list "
-                "should be based on the largest population size in 2026."
+                "Compare the box office performance of 'Oppenheimer' with the third Avatar movie"
             ],
         ],
         title="2.2.3: Multi-Agent Orchestrator-worker for Retrieval-Augmented Generation with Multiple Tools",
76 changes: 45 additions & 31 deletions src/2_frameworks/2_multi_agent/fan_out.py
@@ -241,7 +241,9 @@ async def process_document_pair(document_pair: DocumentPair) -> ConflictSummary
 
     Returns None if exception is encountered.
     """
-    with langfuse_client.start_as_current_observation(name="Conflict- suggest") as span:
+    with langfuse_client.start_as_current_observation(
+        name="Conflict - suggest", as_type="agent"
+    ) as obs:
         try:
             result = await agents.Runner.run(
                 worker_agent, input=document_pair.get_prompt()
@@ -251,7 +253,7 @@
             print(e)
             return None
 
-        span.update(input=document_pair, output=output)
+        obs.update(input=document_pair, output=output)
 
     return output
 
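`process_document_pair` is written to be fanned out concurrently; the `process_fan_out` helper it feeds is not shown in this diff, so the following is only a sketch of what such a helper typically looks like, assuming workers return `None` on failure as the docstring above states:

import asyncio

async def process_fan_out(document_pairs):
    # One observation-wrapped worker per document pair, all running concurrently.
    results = await asyncio.gather(
        *(process_document_pair(pair) for pair in document_pairs)
    )
    # Drop pairs whose worker hit an exception and returned None.
    return [summary for summary in results if summary is not None]
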
@@ -283,7 +285,9 @@ async def process_one_review(
 
     Return None upon error.
     """
-    with langfuse_client.start_as_current_observation(name="Review proposal") as span:
+    with langfuse_client.start_as_current_observation(
+        name="Review proposal", as_type="agent"
+    ) as obs:
         try:
             result = await agents.Runner.run(
                 conflict_review_agent, input=conflicted_document.model_dump_json()
@@ -293,7 +297,7 @@
             print(e)
             return None
 
-        span.update(input=conflicted_document, output=output)
+        obs.update(input=conflicted_document, output=output)
 
     return output
 
@@ -380,33 +384,43 @@ async def process_conflict_reviews(
     assert isinstance(dataset_dict, datasets.DatasetDict)
     documents = list(dataset_dict["train"])[: args.num_rows]
 
-    # Run O(N^2) agents on N documents to identify pairwise e.g., conflicts.
-    document_pairs = build_document_pairs(documents)  # type: ignore[arg-type]
-    print(f"Built {len(document_pairs)} pair(s) from {len(documents)} document(s).")
-
-    with langfuse_client.start_as_current_span(name="Conflicts- Pairwise") as span:
-        flagged_pairs = asyncio.get_event_loop().run_until_complete(
-            process_fan_out(document_pairs)
-        )
-        span.update(
-            input=args.source_dataset, output=f"{len(flagged_pairs)} pairs identified."
-        )
-
-    # Collect conflicts related to each document.
-    # from O(N^2) pairs to O(N) summarized per-document conflicts.
-    conflicted_documents = group_conflicts(flagged_pairs)
-
-    # Review these O(N) per-document conflicts.
-    with langfuse_client.start_as_current_span(name="Conflicts- Review") as span:
-        conflict_reviews: list[ReviewedDocument] = (
-            asyncio.get_event_loop().run_until_complete(
-                process_conflict_reviews(conflicted_documents)
-            )
-        )
-        span.update(input=conflicted_documents, output=conflict_reviews)
-
-    # Generate markdown output
-    with open(args.output_report, "w") as output_file:
-        reports = [_review.get_report() for _review in conflict_reviews]
-        output_file.write("\n".join(reports))
-    print(f"Writing report to {args.output_report}.")
+    with langfuse_client.start_as_current_observation(
+        name="Fan-Out", as_type="chain", input=args.source_dataset
+    ) as span:
+        # Run O(N^2) agents on N documents to identify pairwise e.g., conflicts.
+        document_pairs = build_document_pairs(documents)  # type: ignore[arg-type]
+        print(f"Built {len(document_pairs)} pair(s) from {len(documents)} document(s).")
+
+        with langfuse_client.start_as_current_observation(
+            name="Conflicts - Pairwise", as_type="chain"
+        ) as obs:
+            flagged_pairs = asyncio.get_event_loop().run_until_complete(
+                process_fan_out(document_pairs)
+            )
+            obs.update(
+                input=args.source_dataset,
+                output=f"{len(flagged_pairs)} pairs identified.",
+            )
+
+        # Collect conflicts related to each document.
+        # from O(N^2) pairs to O(N) summarized per-document conflicts.
+        conflicted_documents = group_conflicts(flagged_pairs)
+
+        # Review these O(N) per-document conflicts.
+        with langfuse_client.start_as_current_observation(
+            name="Conflicts - Review", as_type="chain"
+        ) as obs:
+            conflict_reviews: list[ReviewedDocument] = (
+                asyncio.get_event_loop().run_until_complete(
+                    process_conflict_reviews(conflicted_documents)
+                )
+            )
+            obs.update(input=conflicted_documents, output=conflict_reviews)
+
+        # Generate markdown output
+        with open(args.output_report, "w") as output_file:
+            reports = [_review.get_report() for _review in conflict_reviews]
+            output_file.write("\n".join(reports))
+        print(f"Writing report to {args.output_report}.")
+        span.update(output="Wrote report to " + args.output_report)
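
One caveat for batch scripts like this one: the Langfuse client buffers observations in a background queue, and a process that exits right after the last span closes can lose them. The diff as shown does not include a flush, so the following is a hedged addition illustrating the SDK's `flush()` call rather than part of the PR:

from langfuse import get_client

langfuse_client = get_client()
try:
    ...  # run the fan-out pipeline shown above
finally:
    # Block until all queued observations are delivered to Langfuse.
    langfuse_client.flush()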