Skip to content
Draft
2 changes: 2 additions & 0 deletions python/packages/core/agent_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
Evaluator,
ExpectedToolCall,
LocalEvaluator,
RubricScore,
evaluate_agent,
evaluate_workflow,
evaluator,
Expand Down Expand Up @@ -425,6 +426,7 @@
"ResponseStream",
"Role",
"RoleLiteral",
"RubricScore",
"RunContext",
"Runner",
"RunnerContext",
Expand Down
43 changes: 43 additions & 0 deletions python/packages/core/agent_framework/_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,49 @@ def get_session(self, service_session_id: str, *, session_id: str | None = None)
"""
return AgentSession(session_id=session_id, service_session_id=service_session_id)

def as_eval_source(
    self,
    *,
    include_instructions: bool = True,
    include_tools: bool = True,
    include_context_providers: bool = False,
    include_examples: bool = False,
    examples: Sequence[str] | None = None,
) -> str:
    """Describe this agent as plain text for rubric-evaluator generation.

    The dossier always opens with the agent's name, followed by its
    description when one is set. The keyword flags select which further
    sections are appended: instructions, tool definitions,
    context-provider class names and caller-supplied examples. The
    result is meant to be handed to a rubric generation pipeline
    (e.g. ``FoundryEvals.generate_rubric``).

    With the default flags only the instructions and tools sections are
    enabled; context-provider names and examples are opt-in.

    Keyword Args:
        include_instructions: Emit the agent's instructions text.
        include_tools: Emit the agent's tool definitions.
        include_context_providers: Emit the class name of each attached
            context provider.
        include_examples: Emit the entries of ``examples``.
        examples: Sample queries / interactions. Only rendered when
            ``include_examples`` is true and the sequence is non-empty.

    Returns:
        The dossier as a single plain-text string.
    """
    # The renderer is private to ``_evaluation`` and is resolved at call time
    # rather than when this module is imported.
    from ._evaluation import _render_agent_dossier as render_dossier  # pyright: ignore[reportPrivateUsage]

    dossier = render_dossier(
        self,
        include_instructions=include_instructions,
        include_tools=include_tools,
        include_context_providers=include_context_providers,
        include_examples=include_examples,
        examples=examples,
    )
    return dossier

async def _run_after_providers(
self,
*,
Expand Down
310 changes: 310 additions & 0 deletions python/packages/core/agent_framework/_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,15 @@ class EvalScoreResult:
score: Numeric score from the evaluator.
passed: Whether the item passed this evaluator's threshold.
sample: Optional raw evaluator output (rationale, metadata).
dimensions: Per-dimension scores when this evaluator is a rubric
evaluator. ``None`` for non-rubric (e.g. built-in) evaluators.
"""

name: str
score: float
passed: bool | None = None
sample: dict[str, Any] | None = None
dimensions: list[RubricScore] | None = None


@experimental(feature_id=ExperimentalFeature.EVALS)
Expand Down Expand Up @@ -496,6 +499,313 @@ def raise_for_status(self, msg: str | None = None) -> None:
detail += f" Errored items: {', '.join(summaries)}."
raise EvalNotPassedError(detail)

def assert_score_at_least(
    self,
    min_score: float,
    *,
    evaluator: str | None = None,
    msg: str | None = None,
) -> None:
    """Assert every item's score (optionally filtered by evaluator) is ``>= min_score``.

    Designed for CI gates on generated rubric evaluators (e.g.
    ``results.assert_score_at_least(0.80)``). Sub-results are walked
    recursively, so scores nested under ``sub_results`` are checked too.

    A ``NaN`` score counts as a failure: it cannot satisfy
    ``score >= min_score`` and must not slip through a gate unnoticed.

    Note that the assertion passes when no score is examined at all,
    e.g. when there are no items or ``evaluator`` matches no score name.

    Args:
        min_score: Minimum acceptable score (inclusive).
        evaluator: When set, only check scores from the evaluator
            whose ``EvalScoreResult.name`` matches.
        msg: Optional custom failure message. When omitted (or empty) a
            message listing up to five offending scores is generated.

    Raises:
        EvalNotPassedError: When any matching score is below the
            threshold or is ``NaN``.
    """
    offenders: list[str] = []

    def _check(results: EvalResults) -> None:
        for item in results.items:
            for score in item.scores:
                if evaluator is not None and score.name != evaluator:
                    continue
                # Test "meets the threshold" instead of "is below it": NaN compares
                # false against everything, so ``score < min_score`` would let it pass.
                meets_threshold = score.score >= min_score
                if not meets_threshold:
                    offenders.append(f"{item.item_id}/{score.name}={score.score:.3f}")
        for sub in results.sub_results.values():
            _check(sub)

    _check(self)
    if offenders:
        # Only the first five offenders are spelled out to keep CI output readable.
        detail = msg or (
            f"{len(offenders)} score(s) below threshold {min_score}"
            f"{' for ' + evaluator if evaluator else ''}: {', '.join(offenders[:5])}"
            + (f" (+{len(offenders) - 5} more)" if len(offenders) > 5 else "")
        )
        raise EvalNotPassedError(detail)

def assert_dimension_score_at_least(
    self,
    dimension_id: str,
    min_score: float,
    *,
    evaluator: str | None = None,
    require_applicable: bool = False,
    msg: str | None = None,
) -> None:
    """Assert every item's score for a rubric *dimension* is ``>= min_score``.

    Walks ``EvalScoreResult.dimensions`` looking for the named
    dimension across all items (and, recursively, all sub-results).
    Entries marked non-applicable are skipped. An applicable entry
    whose score is ``None`` counts as a failure.

    By default an item with no applicable entry for the dimension is
    ignored; pass ``require_applicable=True`` to report such items as
    failures as well.

    Args:
        dimension_id: Dimension id (matches the rubric definition).
        min_score: Minimum acceptable dimension score (inclusive).
        evaluator: When set, only consider scores from the evaluator
            whose ``EvalScoreResult.name`` matches.
        require_applicable: When ``True``, items with a missing or
            non-applicable dimension score raise. Defaults to
            ``False`` (skip).
        msg: Optional custom failure message. When omitted (or empty) a
            message listing up to five entries per problem is generated.

    Raises:
        EvalNotPassedError: When the dimension fails the threshold, or
            when ``require_applicable`` is set and an item has no
            applicable score for it.
    """
    offenders: list[str] = []
    missing_items: list[str] = []

    def _preview(entries: list[str]) -> str:
        """Join the first five entries and note how many more were left out."""
        shown = ", ".join(entries[:5])
        hidden = len(entries) - 5
        return f"{shown} (+{hidden} more)" if hidden > 0 else shown

    def _check(results: EvalResults) -> None:
        for item in results.items:
            found_applicable = False
            for score in item.scores:
                if evaluator is not None and score.name != evaluator:
                    continue
                # Non-rubric evaluators carry no dimensions (``None`` or empty).
                if not score.dimensions:
                    continue
                for rs in score.dimensions:
                    if rs.id != dimension_id or not rs.applicable:
                        continue
                    found_applicable = True
                    # An applicable dimension without a numeric score cannot meet the threshold.
                    if rs.score is None or rs.score < min_score:
                        offenders.append(f"{item.item_id}/{score.name}/{dimension_id}={rs.score}")
            if require_applicable and not found_applicable:
                missing_items.append(item.item_id)
        for sub in results.sub_results.values():
            _check(sub)

    _check(self)
    problems: list[str] = []
    if offenders:
        problems.append(
            f"{len(offenders)} dimension score(s) for '{dimension_id}' below {min_score}: {_preview(offenders)}"
        )
    if missing_items:
        # Uses the same truncated preview as the offender list so that a long
        # list of item ids is visibly abbreviated instead of silently cut off.
        problems.append(
            f"Dimension '{dimension_id}' not applicable on {len(missing_items)} item(s): {_preview(missing_items)}"
        )
    if problems:
        raise EvalNotPassedError(msg or "; ".join(problems))

def assert_no_failed_items(self, msg: str | None = None) -> None:
    """Assert that no item is flagged as failed or errored.

    Every item of this result set and of all nested ``sub_results`` is
    inspected; an item is a problem when ``is_failed`` or ``is_error``
    is true for it.

    Args:
        msg: Optional custom failure message. When omitted (or empty) a
            message listing up to five ``item_id:status`` pairs is
            generated.

    Raises:
        EvalNotPassedError: When any item failed or errored.
    """
    failures: list[str] = []

    # Explicit-stack pre-order walk: a node's own items are recorded first,
    # then its sub-results are visited in their original order.
    pending = [self]
    while pending:
        current = pending.pop()
        failures.extend(
            f"{entry.item_id}:{entry.status}" for entry in current.items if entry.is_failed or entry.is_error
        )
        # Pushed in reverse so the first sub-result is popped next.
        pending.extend(reversed(list(current.sub_results.values())))

    if not failures:
        return
    if msg:
        raise EvalNotPassedError(msg)

    summary = f"{len(failures)} item(s) failed or errored: {', '.join(failures[:5])}"
    if len(failures) > 5:
        summary += f" (+{len(failures) - 5} more)"
    raise EvalNotPassedError(summary)


# endregion

# region Generated rubric evaluators


@experimental(feature_id=ExperimentalFeature.EVALS)
@dataclass(frozen=True)
class RubricScore:
    """A single dimension's score from a rubric-based evaluator run.

    Rubric evaluators emit one ``RubricScore`` per dimension per item.
    Instances are attached to :class:`EvalScoreResult` (its
    ``dimensions`` list) as a typed view of the raw
    ``properties.rubric_scores`` payload returned by providers such as
    Foundry's generated rubric evaluators.

    The dataclass is frozen, so instances are immutable once created.

    Attributes:
        id: Dimension id (matches the rubric definition).
        score: Numeric score, or ``None`` when the dimension was marked
            non-applicable for this item.
        applicable: Whether the dimension applied to this item.
        weight: Dimension weight (mirrors the rubric definition).
        reason: Short rationale produced by the evaluator.
    """

    # Identifier used to look the dimension up (e.g. by dimension-level assertions).
    id: str
    # ``None`` means no numeric score was produced for this item.
    score: int | None
    # ``False`` marks the dimension as not relevant to this item.
    applicable: bool
    # Relative weight of the dimension, copied from the rubric definition.
    weight: int
    # Evaluator-provided explanation for the score.
    reason: str


# endregion

# region Eval source rendering


def _render_agent_dossier(
agent: Any,
*,
include_instructions: bool,
include_tools: bool,
include_context_providers: bool,
include_examples: bool,
examples: Sequence[str] | None,
) -> str:
"""Render a structured, plain-text dossier of an agent for rubric generation."""
lines: list[str] = []
name = getattr(agent, "name", None) or "<unnamed agent>"
description = getattr(agent, "description", None)
lines.append(f"Agent name: {name}")
if description:
lines.append(f"Description: {description}")

if include_instructions:
instructions: str | None = None
default_options: Any = getattr(agent, "default_options", None)
if isinstance(default_options, dict):
raw_instr: Any = cast("dict[str, Any]", default_options).get("instructions")
if isinstance(raw_instr, str) and raw_instr.strip():
instructions = raw_instr
if instructions is None:
raw_instr = getattr(agent, "instructions", None)
if isinstance(raw_instr, str) and raw_instr.strip():
instructions = raw_instr
if instructions:
lines.append("")
lines.append("Instructions:")
lines.append(instructions.strip())

if include_tools:
tool_defs = AgentEvalConverter.extract_tools(agent)
if tool_defs:
lines.append("")
lines.append("Tools:")
for tool in tool_defs:
tool_line = f"- {tool['name']}"
tool_desc = tool.get("description")
if tool_desc:
tool_line += f": {tool_desc}"
lines.append(tool_line)
params = tool.get("parameters")
if params:
try:
params_json = json.dumps(params, sort_keys=True)
except (TypeError, ValueError):
params_json = str(params)
lines.append(f" parameters: {params_json}")

if include_context_providers:
providers = getattr(agent, "context_providers", None)
if providers:
lines.append("")
lines.append("Context providers:")
for provider in providers:
lines.append(f"- {type(provider).__name__}")

if include_examples and examples:
lines.append("")
lines.append("Examples:")
for idx, example in enumerate(examples, start=1):
lines.append(f"{idx}. {example}")

return "\n".join(lines).strip()


def _render_workflow_dossier(  # pyright: ignore[reportUnusedFunction]
    workflow: Workflow,
    *,
    include_instructions: bool,
    include_tools: bool,
    include_context_providers: bool,
    include_examples: bool,
    examples: Sequence[str] | None,
    include_topology: bool,
) -> str:
    """Render a structured, plain-text dossier of a workflow for rubric generation.

    The dossier starts with ``Workflow name:`` (falling back to
    ``<unnamed workflow>``) and an optional ``Description:`` line,
    followed by the enabled sections: the topology as JSON, one agent
    dossier per agent-backed executor, and a numbered example list.

    Args:
        workflow: Workflow to describe.
        include_instructions: Forwarded to each agent dossier.
        include_tools: Forwarded to each agent dossier.
        include_context_providers: Forwarded to each agent dossier.
        include_examples: Append a numbered ``Examples:`` section at
            workflow level. Per-agent dossiers never include examples.
        examples: Examples to list when ``include_examples`` is true. A
            single bare string is treated as one example.
        include_topology: Append ``workflow.to_dict()`` serialised as
            JSON with sorted keys.

    Returns:
        The dossier text with leading and trailing whitespace stripped.
    """
    from ._workflows._agent_executor import AgentExecutor as _AE

    lines: list[str] = []
    name = workflow.name or "<unnamed workflow>"
    lines.append(f"Workflow name: {name}")
    if workflow.description:
        lines.append(f"Description: {workflow.description}")

    if include_topology:
        # Best effort: a serialisation error is logged at debug level and the
        # topology section is simply left out.
        try:
            topology = json.dumps(workflow.to_dict(), sort_keys=True, default=str)
        except (TypeError, ValueError) as exc:
            logger.debug("Workflow.to_dict() failed during eval source export: %s", exc)
            topology = None
        if topology:
            lines.append("")
            lines.append("Topology (JSON):")
            lines.append(topology)

    # Only executors that wrap an agent contribute an agent dossier.
    agent_executors: list[tuple[str, Any]] = [
        (executor_id, executor.agent)
        for executor_id, executor in workflow.executors.items()
        if isinstance(executor, _AE)
    ]

    if agent_executors:
        lines.append("")
        lines.append("Agents:")
        for executor_id, agent in agent_executors:
            lines.append("")
            lines.append(f"Executor: {executor_id}")
            # Examples belong to the workflow as a whole, so they are withheld
            # here and rendered once below.
            dossier = _render_agent_dossier(
                agent,
                include_instructions=include_instructions,
                include_tools=include_tools,
                include_context_providers=include_context_providers,
                include_examples=False,
                examples=None,
            )
            lines.append(dossier)

    if include_examples and examples:
        # A bare string is itself a ``Sequence[str]``; without this guard it
        # would be enumerated one character per "example".
        example_list = [examples] if isinstance(examples, str) else examples
        lines.append("")
        lines.append("Examples:")
        lines.extend(f"{idx}. {example}" for idx, example in enumerate(example_list, start=1))

    return "\n".join(lines).strip()


# endregion

Expand Down
Loading