From 42afcfdf67960a9fb9dad3bcb618beaa61ba416d Mon Sep 17 00:00:00 2001 From: Curtis Galione Date: Mon, 23 Mar 2026 14:56:06 -0700 Subject: [PATCH 1/2] fix(devserver): add x-bt-use-gateway to CORS allowed headers The Braintrust Playground sends x-bt-use-gateway when gateway routing is enabled. The api-ts CORS config already allows it, but the Python devserver's ALLOWED_HEADERS list was missing it, causing browsers to block preflight requests to remote eval servers with a CORS error. --- py/src/braintrust/devserver/cors.py | 1 + .../devserver/test_server_integration.py | 22 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/py/src/braintrust/devserver/cors.py b/py/src/braintrust/devserver/cors.py index 0b60f4e1..9f920fbf 100644 --- a/py/src/braintrust/devserver/cors.py +++ b/py/src/braintrust/devserver/cors.py @@ -23,6 +23,7 @@ "x-bt-project-id", "x-bt-stream-fmt", "x-bt-use-cache", + "x-bt-use-gateway", "x-stainless-os", "x-stainless-lang", "x-stainless-package-version", diff --git a/py/src/braintrust/devserver/test_server_integration.py b/py/src/braintrust/devserver/test_server_integration.py index c5a803b7..e425001e 100644 --- a/py/src/braintrust/devserver/test_server_integration.py +++ b/py/src/braintrust/devserver/test_server_integration.py @@ -87,6 +87,28 @@ def test_devserver_health_check(client): assert response.text == "Hello, world!" +def test_cors_preflight_allows_gateway_header(client): + """Test that CORS preflight accepts x-bt-use-gateway header. + + The Braintrust Playground sends this header when gateway routing is + enabled. If it is missing from the devserver's allowed-headers list + the browser blocks the actual request with a CORS error. + """ + response = client.options( + "/eval", + headers={ + "origin": "https://www.braintrust.dev", + "access-control-request-method": "POST", + "access-control-request-headers": "x-bt-use-gateway", + }, + ) + assert response.status_code == 200 + allowed = response.headers.get("access-control-allow-headers", "") + assert "x-bt-use-gateway" in allowed, ( + f"x-bt-use-gateway not found in access-control-allow-headers: {allowed}" + ) + + @pytest.mark.vcr def test_devserver_list_evaluators(client, api_key, org_name): """Test listing evaluators endpoint.""" From a65aa05d388a9785d3d2069076546995e69001bf Mon Sep 17 00:00:00 2001 From: Curtis Galione Date: Thu, 7 May 2026 14:57:59 -0700 Subject: [PATCH 2/2] add eval completion webhook --- py/src/braintrust/devserver/schemas.py | 6 + py/src/braintrust/devserver/server.py | 93 +++++++++++ .../devserver/test_completion_webhook.py | 145 ++++++++++++++++++ py/src/braintrust/framework.py | 17 +- py/src/braintrust/test_framework.py | 60 ++++++++ 5 files changed, 320 insertions(+), 1 deletion(-) create mode 100644 py/src/braintrust/devserver/test_completion_webhook.py diff --git a/py/src/braintrust/devserver/schemas.py b/py/src/braintrust/devserver/schemas.py index cd8f49da..c0120ee8 100644 --- a/py/src/braintrust/devserver/schemas.py +++ b/py/src/braintrust/devserver/schemas.py @@ -42,6 +42,7 @@ class ParsedEvalBody(TypedDict, total=False): scores: list[ParsedFunctionId] experiment_name: str project_id: str + on_complete_webhook: str parent: str | ParsedParent stream: bool @@ -244,6 +245,11 @@ def parse_eval_body(request_data: str | bytes | dict) -> ParsedEvalBody: raise ValidationError("project_id must be a string") parsed["project_id"] = data["project_id"] + if "on_complete_webhook" in data: + if not isinstance(data["on_complete_webhook"], str): + raise 
ValidationError("on_complete_webhook must be a string") + parsed["on_complete_webhook"] = data["on_complete_webhook"] + if "parent" in data: parent = data["parent"] # InvokeParent can be a string or a complex object diff --git a/py/src/braintrust/devserver/server.py b/py/src/braintrust/devserver/server.py index 4b3ff79a..327c5e20 100644 --- a/py/src/braintrust/devserver/server.py +++ b/py/src/braintrust/devserver/server.py @@ -2,6 +2,9 @@ import json import sys import textwrap +import urllib.error +import urllib.request +from datetime import datetime, timezone from typing import Any @@ -52,6 +55,80 @@ _all_evaluators: dict[str, Evaluator[Any, Any]] = {} +WEBHOOK_ATTEMPTS = 3 +WEBHOOK_BACKOFF_SECONDS = (1.0, 2.0, 4.0) +WEBHOOK_TIMEOUT_SECONDS = 10.0 + + +def _pick_string(data: dict[str, Any], keys: list[str]) -> str | None: + for key in keys: + value = data.get(key) + if isinstance(value, str) and value: + return value + return None + + +def build_completion_webhook_payload(summary: dict[str, Any]) -> dict[str, Any]: + return { + "event": "experiment.completed", + "summary": summary, + "experiment": { + "projectId": _pick_string(summary, ["projectId", "project_id"]), + "projectName": _pick_string(summary, ["projectName", "project_name"]), + "projectUrl": _pick_string(summary, ["projectUrl", "project_url"]), + "experimentId": _pick_string(summary, ["experimentId", "experiment_id"]), + "experimentName": _pick_string(summary, ["experimentName", "experiment_name"]), + "experimentUrl": _pick_string(summary, ["experimentUrl", "experiment_url"]), + }, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + +def _post_completion_webhook_request(webhook_url: str, body: dict[str, Any], timeout: float) -> None: + payload = json.dumps(body).encode("utf-8") + request = urllib.request.Request( + webhook_url, + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(request, timeout=timeout) as response: + status = response.getcode() + if status < 200 or status >= 300: + raise RuntimeError(f"Webhook request failed with status {status}") + + +async def _send_completion_webhook_request(webhook_url: str, body: dict[str, Any], timeout: float) -> None: + await asyncio.to_thread(_post_completion_webhook_request, webhook_url, body, timeout) + + +async def dispatch_completion_webhook( + webhook_url: str, + summary: dict[str, Any], + *, + attempts: int = WEBHOOK_ATTEMPTS, + backoff_seconds: tuple[float, ...] 
= WEBHOOK_BACKOFF_SECONDS, + timeout_seconds: float = WEBHOOK_TIMEOUT_SECONDS, +) -> None: + payload = build_completion_webhook_payload(summary) + last_error: Exception | None = None + for attempt in range(1, attempts + 1): + try: + await _send_completion_webhook_request( + webhook_url, + payload, + timeout_seconds, + ) + return + except Exception as e: + last_error = e + if attempt < attempts: + backoff = backoff_seconds[min(attempt - 1, len(backoff_seconds) - 1)] + await asyncio.sleep(backoff) + + if last_error: + raise last_error + class _ParameterOverrideHooks: def __init__(self, hooks: EvalHooks[Any], parameters: ValidatedParameters): @@ -177,6 +254,7 @@ async def run_eval(request: Request) -> JSONResponse | StreamingResponse: # Check if streaming is requested stream = eval_data.get("stream", False) + on_complete_webhook = eval_data.get("on_complete_webhook") # Set up SSE headers for streaming sse_queue = SSEQueue() @@ -210,6 +288,20 @@ def stream_fn(event: SSEProgressEvent): # Use create_task to schedule the async write without blocking asyncio.create_task(sse_queue.put_event("progress", event)) + async def on_complete_fn(summary: ExperimentSummary): + if not on_complete_webhook: + return + try: + await dispatch_completion_webhook( + on_complete_webhook, + format_summary(summary), + ) + except Exception as e: + print( + f"Failed to deliver completion webhook to {on_complete_webhook}: {e}", + file=sys.stderr, + ) + parent = eval_data.get("parent") if parent: parent = parse_parent(parent) @@ -234,6 +326,7 @@ def stream_fn(event: SSEProgressEvent): ], "stream": stream_fn, "on_start": on_start_fn, + "on_complete": on_complete_fn, "data": dataset, "task": task, "experiment_name": eval_data.get("experiment_name"), diff --git a/py/src/braintrust/devserver/test_completion_webhook.py b/py/src/braintrust/devserver/test_completion_webhook.py new file mode 100644 index 00000000..c98ecbb2 --- /dev/null +++ b/py/src/braintrust/devserver/test_completion_webhook.py @@ -0,0 +1,145 @@ +import asyncio +import json + +import pytest +from braintrust.test_helpers import has_devserver_installed + + +def _parse_sse_events(response_text: str) -> list[dict[str, object]]: + events = [] + lines = response_text.strip().split("\n") + i = 0 + while i < len(lines): + if lines[i].startswith("event: "): + event_type = lines[i][7:].strip() + i += 1 + if i < len(lines) and lines[i].startswith("data: "): + raw_data = lines[i][6:].strip() + try: + data = json.loads(raw_data) if raw_data else None + except json.JSONDecodeError: + data = raw_data + events.append({"event": event_type, "data": data}) + i += 1 + else: + events.append({"event": event_type, "data": None}) + else: + i += 1 + return events + + +def test_dispatch_completion_webhook_retries(monkeypatch): + from braintrust.devserver import server as devserver_module + + attempts = [] + sleep_calls = [] + + async def fake_send(webhook_url, body, timeout): + attempts.append((webhook_url, body, timeout)) + if len(attempts) < 3: + raise RuntimeError("transient") + + async def fake_sleep(seconds): + sleep_calls.append(seconds) + + monkeypatch.setattr(devserver_module, "_send_completion_webhook_request", fake_send) + monkeypatch.setattr(devserver_module.asyncio, "sleep", fake_sleep) + + asyncio.run( + devserver_module.dispatch_completion_webhook( + "https://example.com/webhook", + {"projectName": "my-project", "experimentName": "my-exp"}, + attempts=3, + backoff_seconds=(1.0, 2.0, 4.0), + timeout_seconds=10.0, + ) + ) + + assert len(attempts) == 3 + assert sleep_calls == 
[1.0, 2.0] + + +def test_parse_eval_body_accepts_on_complete_webhook(): + from braintrust.devserver.schemas import parse_eval_body + + parsed = parse_eval_body( + { + "name": "my-eval", + "on_complete_webhook": "https://example.com/webhook", + } + ) + + assert parsed["on_complete_webhook"] == "https://example.com/webhook" + + +@pytest.mark.skipif(not has_devserver_installed(), reason="Devserver dependencies not installed (requires .[cli])") +def test_eval_webhook_failure_non_fatal_for_stream(monkeypatch): + from braintrust import Evaluator + from braintrust.devserver import server as devserver_module + from braintrust.devserver.server import create_app + from braintrust.logger import BraintrustState + from starlette.testclient import TestClient + + evaluator = Evaluator( + project_name="test-project", + eval_name="test-eval", + data=lambda: [{"input": "x", "expected": "x"}], + task=lambda input_value, _hooks: input_value, + scores=[], + experiment_name=None, + metadata=None, + ) + + async def fake_cached_login(**_kwargs): + return BraintrustState() + + class FakeSummary: + def as_dict(self): + return { + "project_name": "test-project", + "experiment_name": "test-eval", + "scores": {}, + } + + class FakeResult: + summary = FakeSummary() + + dispatch_calls = [] + + async def fake_dispatch(webhook_url, summary, **_kwargs): + dispatch_calls.append((webhook_url, summary)) + raise RuntimeError("webhook delivery failed") + + async def fake_eval_async(*, on_complete, **_kwargs): + await on_complete(FakeSummary()) + return FakeResult() + + monkeypatch.setattr(devserver_module, "cached_login", fake_cached_login) + monkeypatch.setattr(devserver_module, "dispatch_completion_webhook", fake_dispatch) + monkeypatch.setattr(devserver_module, "EvalAsync", fake_eval_async) + + response = TestClient(create_app([evaluator])).post( + "/eval", + headers={ + "x-bt-auth-token": "test-api-key", + "x-bt-org-name": "test-org", + "Content-Type": "application/json", + "Accept": "text/event-stream", + }, + json={ + "name": "test-eval", + "stream": True, + "on_complete_webhook": "https://example.com/webhook", + "data": [{"input": "x", "expected": "x"}], + }, + ) + + assert response.status_code == 200 + events = _parse_sse_events(response.text) + event_types = [e["event"] for e in events] + + assert "summary" in event_types + assert "done" in event_types + assert len(dispatch_calls) == 1 + assert dispatch_calls[0][0] == "https://example.com/webhook" + assert dispatch_calls[0][1]["experimentName"] == "test-eval" diff --git a/py/src/braintrust/framework.py b/py/src/braintrust/framework.py index 2eeb00de..a47b186f 100644 --- a/py/src/braintrust/framework.py +++ b/py/src/braintrust/framework.py @@ -685,6 +685,7 @@ def _EvalCommon( error_score_handler: ErrorScoreHandler | None = None, parameters: EvalParameters | RemoteEvalParameters | None = None, on_start: Callable[[ExperimentSummary], None] | None = None, + on_complete: Callable[[ExperimentSummary], Any] | None = None, stream: Callable[[SSEProgressEvent], None] | None = None, parent: str | None = None, state: BraintrustState | None = None, @@ -783,15 +784,21 @@ async def make_empty_summary(): async def run_to_completion(): with parent_context(parent, state): + ret = None try: ret = await run_evaluator(experiment, evaluator, 0, [], stream, state, enable_cache) reporter.report_eval(evaluator, ret, verbose=True, jsonl=False) - return ret finally: if experiment: experiment.flush() elif state is not None: state.flush() + if ret is None: + raise ValueError("Evaluation did not 
produce a result") + if on_complete: + event_loop = asyncio.get_event_loop() + await call_user_fn(event_loop, on_complete, summary=ret.summary) + return ret return run_to_completion @@ -821,6 +828,7 @@ async def EvalAsync( no_send_logs: bool = False, parameters: EvalParameters | RemoteEvalParameters | None = None, on_start: Callable[[ExperimentSummary], None] | None = None, + on_complete: Callable[[ExperimentSummary], Any] | None = None, stream: Callable[[SSEProgressEvent], None] | None = None, parent: str | None = None, state: BraintrustState | None = None, @@ -878,6 +886,8 @@ async def EvalAsync( :param parameters: A set of parameters that will be passed to the evaluator. :param on_start: An optional callback that will be called when the evaluation starts. It receives the `ExperimentSummary` object, which can be used to display metadata about the experiment. + :param on_complete: An optional callback that will be called after the evaluation finishes and flushes. + It receives the final `ExperimentSummary` object. :param stream: A function that will be called with progress events, which can be used to display intermediate progress. :param parent: If specified, instead of creating a new experiment object, the Eval() will populate @@ -911,6 +921,7 @@ async def EvalAsync( no_send_logs=no_send_logs, parameters=parameters, on_start=on_start, + on_complete=on_complete, stream=stream, parent=parent, state=state, @@ -948,6 +959,7 @@ def Eval( no_send_logs: bool = False, parameters: EvalParameters | RemoteEvalParameters | None = None, on_start: Callable[[ExperimentSummary], None] | None = None, + on_complete: Callable[[ExperimentSummary], Any] | None = None, stream: Callable[[SSEProgressEvent], None] | None = None, parent: str | None = None, state: BraintrustState | None = None, @@ -1005,6 +1017,8 @@ def Eval( :param parameters: A set of parameters that will be passed to the evaluator. :param on_start: An optional callback that will be called when the evaluation starts. It receives the `ExperimentSummary` object, which can be used to display metadata about the experiment. + :param on_complete: An optional callback that will be called after the evaluation finishes and flushes. + It receives the final `ExperimentSummary` object. :param stream: A function that will be called with progress events, which can be used to display intermediate progress. 
:param parent: If specified, instead of creating a new experiment object, the Eval() will populate @@ -1040,6 +1054,7 @@ def Eval( no_send_logs=no_send_logs, parameters=parameters, on_start=on_start, + on_complete=on_complete, stream=stream, parent=parent, state=state, diff --git a/py/src/braintrust/test_framework.py b/py/src/braintrust/test_framework.py index 7d33eda0..a7912afc 100644 --- a/py/src/braintrust/test_framework.py +++ b/py/src/braintrust/test_framework.py @@ -388,6 +388,66 @@ def exact_match(input, output, expected): assert len(logs) == 0 +def test_eval_on_complete_called_once(simple_scorer): + callback_summaries = [] + + def on_complete(summary): + callback_summaries.append(summary) + + result = Eval( + "test-on-complete", + data=[{"input": "hello", "expected": "hello world"}], + task=lambda input_val: input_val + " world", + scores=[simple_scorer], + no_send_logs=True, + on_complete=on_complete, + ) + + assert len(callback_summaries) == 1 + assert callback_summaries[0] == result.summary + + +def test_eval_on_complete_runs_after_flush(simple_scorer, monkeypatch): + state = BraintrustState() + order = [] + + def fake_flush(): + order.append("flush") + + monkeypatch.setattr(state, "flush", fake_flush) + + def on_complete(_summary): + order.append("on_complete") + + Eval( + "test-on-complete-order", + data=[{"input": "hello", "expected": "hello world"}], + task=lambda input_val: input_val + " world", + scores=[simple_scorer], + no_send_logs=True, + state=state, + on_complete=on_complete, + ) + + flush_positions = [i for i, value in enumerate(order) if value == "flush"] + assert flush_positions + assert "on_complete" in order + assert order.index("on_complete") > max(flush_positions) + + +def test_eval_unchanged_when_on_complete_omitted(simple_scorer): + result = Eval( + "test-without-on-complete", + data=[{"input": "hello", "expected": "hello world"}], + task=lambda input_val: input_val + " world", + scores=[simple_scorer], + no_send_logs=True, + ) + + assert len(result.results) == 1 + assert result.summary.project_name == "test-without-on-complete" + + @pytest.mark.asyncio async def test_eval_no_send_logs_with_none_score(with_memory_logger): """Test that scorers returning None don't crash local mode."""
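
---
Reviewer note, not part of either patch: a minimal usage sketch of the new
on_complete hook, mirroring the call shape used in the tests added above. The
project name, data, task, and scorer below are placeholders, not values taken
from the patches.

    from braintrust import Eval

    def exact_match(input, output, expected):
        # Placeholder scorer in the same shape as the existing test scorers.
        return 1.0 if output == expected else 0.0

    def notify(summary):
        # Called exactly once, after the experiment (or state) has been flushed.
        print(f"eval finished for project {summary.project_name}")

    Eval(
        "my-project",  # placeholder project name
        data=[{"input": "hello", "expected": "hello world"}],
        task=lambda input_val: input_val + " world",
        scores=[exact_match],
        no_send_logs=True,
        on_complete=notify,
    )

When going through the devserver instead, the equivalent behavior is driven by
the request body: a POST to /eval whose JSON includes
"on_complete_webhook": "https://example.com/webhook" (a placeholder URL) causes
the server to POST an "experiment.completed" payload, built by
build_completion_webhook_payload, to that URL once the run finishes, retrying
up to WEBHOOK_ATTEMPTS times on the WEBHOOK_BACKOFF_SECONDS schedule. Webhook
delivery failures are logged to stderr and do not fail the eval stream.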