diff --git a/js/dev/server.test.ts b/js/dev/server.test.ts new file mode 100644 index 000000000..89ead9ef0 --- /dev/null +++ b/js/dev/server.test.ts @@ -0,0 +1,67 @@ +import { describe, expect, test, vi } from "vitest"; +import { + buildCompletionWebhookPayload, + dispatchCompletionWebhook, +} from "./server"; + +const summary = { + projectName: "completion-webhook-test", + experimentName: "completion-webhook-test-exp", + scores: { + exact_match: { + name: "exact_match", + score: 1, + }, + }, +}; + +describe("completion webhook delivery", () => { + test("builds expected payload shape", async () => { + const payload = buildCompletionWebhookPayload(summary); + + expect(payload.event).toBe("experiment.completed"); + expect(payload.summary.projectName).toBe("completion-webhook-test"); + expect(payload.experiment.projectName).toBe("completion-webhook-test"); + expect(payload.timestamp).toMatch(/T/); + }); + + test("retries transient failures and succeeds", async () => { + const sleep = vi.fn(async (_ms: number) => {}); + const fetchImpl = vi + .fn() + .mockRejectedValueOnce(new Error("network")) + .mockResolvedValueOnce(new Response("bad", { status: 500 })) + .mockResolvedValueOnce(new Response("ok", { status: 200 })); + + await dispatchCompletionWebhook("https://example.com/webhook", summary, { + fetchImpl, + sleep, + timeoutMs: 5, + attempts: 3, + backoffMs: [1, 2, 4], + }); + + expect(fetchImpl).toHaveBeenCalledTimes(3); + expect(sleep).toHaveBeenCalledTimes(2); + }); + + test("throws after final failure", async () => { + const sleep = vi.fn(async (_ms: number) => {}); + const fetchImpl = vi + .fn() + .mockResolvedValue(new Response("bad", { status: 500 })); + + await expect( + dispatchCompletionWebhook("https://example.com/webhook", summary, { + fetchImpl, + sleep, + timeoutMs: 5, + attempts: 3, + backoffMs: [1, 2, 4], + }), + ).rejects.toThrow("status 500"); + + expect(fetchImpl).toHaveBeenCalledTimes(3); + expect(sleep).toHaveBeenCalledTimes(2); + }); +}); diff --git a/js/dev/server.ts b/js/dev/server.ts index aee357ad5..700cbd233 100644 --- a/js/dev/server.ts +++ b/js/dev/server.ts @@ -53,6 +53,141 @@ export interface DevServerOpts { orgName?: string; } +const WEBHOOK_ATTEMPTS = 3; +const WEBHOOK_BACKOFF_MS = [1000, 2000, 4000]; +const WEBHOOK_TIMEOUT_MS = 10000; + +type CompletionWebhookPayload = { + event: "experiment.completed"; + summary: Record; + experiment: { + projectId?: string; + projectName?: string; + projectUrl?: string; + experimentId?: string; + experimentName?: string; + experimentUrl?: string; + }; + timestamp: string; +}; + +type SerializableSummary = ExperimentSummary | Record; + +type DispatchCompletionWebhookOptions = { + attempts?: number; + backoffMs?: number[]; + timeoutMs?: number; + fetchImpl?: typeof fetch; + sleep?: (ms: number) => Promise; +}; + +function pickString(record: Record, keys: string[]) { + for (const key of keys) { + const value = record[key]; + if (typeof value === "string" && value.length > 0) { + return value; + } + } + return undefined; +} + +function sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function serializeSummary(summary: SerializableSummary): Record { + const serializedSummary = JSON.parse(JSON.stringify(summary)); + if (isRecord(serializedSummary)) { + return serializedSummary; + } + return {}; +} + +export function buildCompletionWebhookPayload(summary: SerializableSummary) { + const summaryObject = serializeSummary(summary); + return { + event: "experiment.completed", + summary: summaryObject, + experiment: { + projectId: pickString(summaryObject, ["projectId", "project_id"]), + projectName: pickString(summaryObject, ["projectName", "project_name"]), + projectUrl: pickString(summaryObject, ["projectUrl", "project_url"]), + experimentId: pickString(summaryObject, [ + "experimentId", + "experiment_id", + ]), + experimentName: pickString(summaryObject, [ + "experimentName", + "experiment_name", + ]), + experimentUrl: pickString(summaryObject, [ + "experimentUrl", + "experiment_url", + ]), + }, + timestamp: new Date().toISOString(), + } satisfies CompletionWebhookPayload; +} + +export async function dispatchCompletionWebhook( + webhookUrl: string, + summary: SerializableSummary, + options: DispatchCompletionWebhookOptions = {}, +) { + const attempts = options.attempts ?? WEBHOOK_ATTEMPTS; + const backoffMs = options.backoffMs ?? WEBHOOK_BACKOFF_MS; + const timeoutMs = options.timeoutMs ?? WEBHOOK_TIMEOUT_MS; + const fetchImpl = options.fetchImpl ?? fetch; + const sleepFn = options.sleep ?? sleep; + const payload = buildCompletionWebhookPayload(summary); + let lastError: Error | null = null; + + for (let attempt = 1; attempt <= attempts; attempt++) { + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), timeoutMs); + try { + const response = await fetchImpl(webhookUrl, { + method: "POST", + headers: { + "content-type": "application/json", + }, + body: JSON.stringify(payload), + signal: controller.signal, + }); + + if (response.ok) { + return; + } + + lastError = new Error( + `Webhook request failed with status ${response.status}`, + ); + } catch (error) { + lastError = + error instanceof Error ? error : new Error(`Webhook failed: ${error}`); + } finally { + clearTimeout(timeoutId); + } + + if (attempt < attempts) { + const delayMs = backoffMs[Math.min(attempt - 1, backoffMs.length - 1)]; + if (delayMs > 0) { + await sleepFn(delayMs); + } + } + } + + if (lastError) { + throw lastError; + } +} + export function runDevServer( // eslint-disable-next-line @typescript-eslint/no-explicit-any evaluators: EvaluatorDef[], @@ -137,6 +272,7 @@ export function runDevServer( parent, experiment_name, project_id, + on_complete_webhook, data, scores, stream, @@ -253,6 +389,22 @@ export function runDevServer( ); } }, + onComplete: async (completionSummary) => { + if (!on_complete_webhook) { + return; + } + try { + await dispatchCompletionWebhook( + on_complete_webhook, + completionSummary, + ); + } catch (error) { + console.error( + `Failed to deliver completion webhook to ${on_complete_webhook}`, + error, + ); + } + }, parent: parseParent(parent), parameters: parameters ?? {}, }, diff --git a/js/dev/types.ts b/js/dev/types.ts index 7c2524352..cf6c8ccd0 100644 --- a/js/dev/types.ts +++ b/js/dev/types.ts @@ -22,6 +22,7 @@ export const evalBodySchema = z.object({ .nullish(), experiment_name: z.string().nullish(), project_id: z.string().nullish(), + on_complete_webhook: z.string().nullish(), parent: invokeParentSchema.optional(), stream: z.boolean().optional(), }); diff --git a/js/src/framework.test.ts b/js/src/framework.test.ts index 6379905f1..facf4b6c1 100644 --- a/js/src/framework.test.ts +++ b/js/src/framework.test.ts @@ -676,6 +676,87 @@ test("Eval with returnResults: true collects all results", async () => { expect(result.summary.scores.exact_match.score).toBe(1); }); +test("Eval onComplete is called exactly once on success", async () => { + const onComplete = vi.fn(); + const result = await Eval( + "test-on-complete", + { + data: [{ input: "hello", expected: "hello world" }], + task: (input) => input + " world", + scores: [ + (args) => ({ + name: "exact_match", + score: args.output === args.expected ? 1 : 0, + }), + ], + }, + { noSendLogs: true, onComplete }, + ); + + expect(onComplete).toHaveBeenCalledTimes(1); + expect(onComplete).toHaveBeenCalledWith(result.summary); +}); + +test("Eval onComplete runs after flush", async () => { + await _exportsForTestingOnly.simulateLoginForTests(); + + _exportsForTestingOnly.useTestBackgroundLogger(); + + const evaluatorState = new BraintrustState({ + apiKey: "test-api-key", + appUrl: "https://example.com", + }); + const evaluatorMemoryLogger = new TestBackgroundLogger(); + evaluatorState.setOverrideBgLogger(evaluatorMemoryLogger); + + const logger = initLogger({ projectName: "test", projectId: "pid" }); + const span = logger.startSpan({ name: "parent-span" }); + const parentStr = await span.export(); + span.end(); + + const evaluatorFlushSpy = vi.spyOn(evaluatorMemoryLogger, "flush"); + const onComplete = vi.fn(() => { + expect(evaluatorFlushSpy).toHaveBeenCalled(); + }); + + await Eval( + "test-parent-flush-on-complete", + { + data: [{ input: 1, expected: 2 }], + task: (input) => input * 2, + scores: [], + state: evaluatorState, + }, + { parent: parentStr, onComplete }, + ); + + expect(onComplete).toHaveBeenCalledTimes(1); + + _exportsForTestingOnly.clearTestBackgroundLogger(); + _exportsForTestingOnly.simulateLogoutForTests(); +}); + +test("Eval behavior is unchanged when onComplete is omitted", async () => { + const result = await Eval( + "test-no-on-complete", + { + data: [{ input: "foo", expected: "foo bar" }], + task: (input) => input + " bar", + scores: [ + (args) => ({ + name: "exact_match", + score: args.output === args.expected ? 1 : 0, + }), + ], + }, + { noSendLogs: true }, + ); + + expect(result.results).toHaveLength(1); + expect(result.summary.projectName).toBe("test-no-on-complete"); + expect(result.summary.scores.exact_match.score).toBe(1); +}); + test("tags can be appended and logged to root span", async () => { await _exportsForTestingOnly.simulateLoginForTests(); const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger(); diff --git a/js/src/framework.ts b/js/src/framework.ts index cfdd62bde..676d7869d 100644 --- a/js/src/framework.ts +++ b/js/src/framework.ts @@ -546,6 +546,11 @@ export interface EvalOptions { * @param metadata */ onStart?: (metadata: Omit) => void; + /** + * A callback function that is called once after the eval has completed and spans are flushed. + * @param summary + */ + onComplete?: (summary: ExperimentSummary) => void | Promise; /** * A function that will be called with progress events, which can be used to * display intermediate progress. @@ -683,6 +688,8 @@ export async function Eval< } const resolvedReporter = options.reporter || defaultReporter; + let result: EvalResultWithSummary | null = + null; try { const { data, baseExperiment: defaultBaseExperiment } = callEvaluatorData( evaluator.data, @@ -770,7 +777,7 @@ export async function Eval< verbose: true, jsonl: false, }); - return ret; + result = ret; } finally { if (experiment) { await experiment.flush().catch(console.error); @@ -778,6 +785,14 @@ export async function Eval< await flush({ state: evaluator.state }).catch(console.error); } } + + if (!result) { + throw new Error("Eval completed without a result"); + } + if (options.onComplete) { + await options.onComplete(result.summary); + } + return result; } finally { progressReporter.stop(); }