Skip to content

Commit f6f2b77

Browse files
committed
Add result type and .unwrap() helper to inputStream.once().
1 parent 6b8180b commit f6f2b77

File tree

7 files changed

+146
-52
lines changed

7 files changed

+146
-52
lines changed

packages/core/src/v3/inputStreams/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { getGlobal, registerGlobal } from "../utils/globals.js";
22
import { NoopInputStreamManager } from "./noopManager.js";
3-
import { InputStreamManager } from "./types.js";
3+
import { InputStreamManager, InputStreamOncePromise } from "./types.js";
44
import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
55

66
const API_NAME = "input-streams";
@@ -39,7 +39,7 @@ export class InputStreamsAPI implements InputStreamManager {
3939
return this.#getManager().on(streamId, handler);
4040
}
4141

42-
public once(streamId: string, options?: InputStreamOnceOptions): Promise<unknown> {
42+
public once(streamId: string, options?: InputStreamOnceOptions): InputStreamOncePromise<unknown> {
4343
return this.#getManager().once(streamId, options);
4444
}
4545

packages/core/src/v3/inputStreams/manager.ts

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
import { ApiClient } from "../apiClient/index.js";
2-
import { InputStreamManager } from "./types.js";
2+
import {
3+
InputStreamManager,
4+
InputStreamOncePromise,
5+
InputStreamOnceResult,
6+
InputStreamTimeoutError,
7+
} from "./types.js";
38
import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
49

510
type InputStreamHandler = (data: unknown) => void | Promise<void>;
611

712
type OnceWaiter = {
8-
resolve: (data: unknown) => void;
13+
resolve: (result: InputStreamOnceResult<unknown>) => void;
914
reject: (error: Error) => void;
1015
timeoutHandle?: ReturnType<typeof setTimeout>;
1116
};
@@ -72,7 +77,7 @@ export class StandardInputStreamManager implements InputStreamManager {
7277
};
7378
}
7479

75-
once(streamId: string, options?: InputStreamOnceOptions): Promise<unknown> {
80+
once(streamId: string, options?: InputStreamOnceOptions): InputStreamOncePromise<unknown> {
7681
this.#requireV2Streams();
7782

7883
// Lazily connect a tail for this stream
@@ -85,10 +90,12 @@ export class StandardInputStreamManager implements InputStreamManager {
8590
if (buffered.length === 0) {
8691
this.buffer.delete(streamId);
8792
}
88-
return Promise.resolve(data);
93+
return new InputStreamOncePromise((resolve) => {
94+
resolve({ ok: true, output: data });
95+
});
8996
}
9097

91-
return new Promise<unknown>((resolve, reject) => {
98+
return new InputStreamOncePromise<unknown>((resolve, reject) => {
9299
const waiter: OnceWaiter = { resolve, reject };
93100

94101
// Handle abort signal
@@ -110,11 +117,14 @@ export class StandardInputStreamManager implements InputStreamManager {
110117
);
111118
}
112119

113-
// Handle timeout
120+
// Handle timeout — resolve with error result instead of rejecting
114121
if (options?.timeoutMs) {
115122
waiter.timeoutHandle = setTimeout(() => {
116123
this.#removeOnceWaiter(streamId, waiter);
117-
reject(new Error(`Timeout waiting for input stream "${streamId}" after ${options.timeoutMs}ms`));
124+
resolve({
125+
ok: false,
126+
error: new InputStreamTimeoutError(streamId, options.timeoutMs!),
127+
});
118128
}, options.timeoutMs);
119129
}
120130

@@ -256,7 +266,7 @@ export class StandardInputStreamManager implements InputStreamManager {
256266
if (waiter.timeoutHandle) {
257267
clearTimeout(waiter.timeoutHandle);
258268
}
259-
waiter.resolve(data);
269+
waiter.resolve({ ok: true, output: data });
260270
// Also invoke persistent handlers
261271
this.#invokeHandlers(streamId, data);
262272
return;

packages/core/src/v3/inputStreams/noopManager.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { InputStreamManager } from "./types.js";
1+
import { InputStreamManager, InputStreamOncePromise } from "./types.js";
22
import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
33

44
export class NoopInputStreamManager implements InputStreamManager {
@@ -8,8 +8,8 @@ export class NoopInputStreamManager implements InputStreamManager {
88
return { off: () => {} };
99
}
1010

11-
once(_streamId: string, _options?: InputStreamOnceOptions): Promise<unknown> {
12-
return new Promise(() => {
11+
once(_streamId: string, _options?: InputStreamOnceOptions): InputStreamOncePromise<unknown> {
12+
return new InputStreamOncePromise(() => {
1313
// Never resolves in noop mode
1414
});
1515
}

packages/core/src/v3/inputStreams/types.ts

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,42 @@
11
import { InputStreamOnceOptions } from "../realtimeStreams/types.js";
22

3+
export class InputStreamTimeoutError extends Error {
4+
constructor(
5+
public readonly streamId: string,
6+
public readonly timeoutMs: number
7+
) {
8+
super(`Timeout waiting for input stream "${streamId}" after ${timeoutMs}ms`);
9+
this.name = "InputStreamTimeoutError";
10+
}
11+
}
12+
13+
export type InputStreamOnceResult<TData> =
14+
| { ok: true; output: TData }
15+
| { ok: false; error: InputStreamTimeoutError };
16+
17+
export class InputStreamOncePromise<TData> extends Promise<InputStreamOnceResult<TData>> {
18+
constructor(
19+
executor: (
20+
resolve: (
21+
value: InputStreamOnceResult<TData> | PromiseLike<InputStreamOnceResult<TData>>
22+
) => void,
23+
reject: (reason?: any) => void
24+
) => void
25+
) {
26+
super(executor);
27+
}
28+
29+
unwrap(): Promise<TData> {
30+
return this.then((result) => {
31+
if (result.ok) {
32+
return result.output;
33+
} else {
34+
throw result.error;
35+
}
36+
});
37+
}
38+
}
39+
340
export interface InputStreamManager {
441
/**
542
* Set the current run ID and streams version. The tail connection will be
@@ -15,8 +52,10 @@ export interface InputStreamManager {
1552

1653
/**
1754
* Wait for the next piece of data on the given input stream.
55+
* Returns a result object `{ ok, output }` or `{ ok, error }`.
56+
* Chain `.unwrap()` to get the data directly or throw on timeout.
1857
*/
19-
once(streamId: string, options?: InputStreamOnceOptions): Promise<unknown>;
58+
once(streamId: string, options?: InputStreamOnceOptions): InputStreamOncePromise<unknown>;
2059

2160
/**
2261
* Non-blocking peek at the most recent data on the given input stream.

packages/core/src/v3/realtimeStreams/types.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
import { AnyZodFetchOptions, ApiRequestOptions } from "../apiClient/core.js";
2+
import type { InputStreamOncePromise } from "../inputStreams/types.js";
3+
export { InputStreamOncePromise, InputStreamTimeoutError } from "../inputStreams/types.js";
4+
export type { InputStreamOnceResult } from "../inputStreams/types.js";
25
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
36
import { Prettify } from "../types/utils.js";
47
import type { ManualWaitpointPromise } from "../waitpoints/index.js";
@@ -162,9 +165,10 @@ export type RealtimeDefinedInputStream<TData> = {
162165
on: (handler: (data: TData) => void | Promise<void>) => InputStreamSubscription;
163166
/**
164167
* Wait for the next piece of data on this input stream.
165-
* Resolves with the data when it arrives.
168+
* Returns a result object `{ ok, output }` or `{ ok, error }`.
169+
* Chain `.unwrap()` to get the data directly or throw on timeout.
166170
*/
167-
once: (options?: InputStreamOnceOptions) => Promise<TData>;
171+
once: (options?: InputStreamOnceOptions) => InputStreamOncePromise<TData>;
168172
/**
169173
* Non-blocking peek at the most recent data received on this input stream.
170174
* Returns `undefined` if no data has been received yet.

packages/trigger-sdk/src/v3/streams.ts

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import {
2222
type RealtimeDefinedInputStream,
2323
type InputStreamSubscription,
2424
type InputStreamOnceOptions,
25+
InputStreamOncePromise,
26+
type InputStreamOnceResult,
2527
type InputStreamWaitOptions,
2628
type SendInputStreamOptions,
2729
type InferInputStreamType,
@@ -706,26 +708,33 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
706708
const ctx = taskContext.ctx;
707709
const runId = ctx?.run.id;
708710

709-
return tracer.startActiveSpan(
710-
`inputStream.once()`,
711-
async () => {
712-
return inputStreams.once(opts.id, options) as Promise<TData>;
713-
},
714-
{
715-
attributes: {
716-
[SemanticInternalAttributes.STYLE_ICON]: "streams",
717-
[SemanticInternalAttributes.ENTITY_TYPE]: "input-stream",
718-
...(runId
719-
? { [SemanticInternalAttributes.ENTITY_ID]: `${runId}:${opts.id}` }
720-
: {}),
721-
streamId: opts.id,
722-
...accessoryAttributes({
723-
items: [{ text: opts.id, variant: "normal" }],
724-
style: "codepath",
725-
}),
726-
},
727-
}
728-
);
711+
const innerPromise = inputStreams.once(opts.id, options);
712+
713+
return new InputStreamOncePromise<TData>((resolve, reject) => {
714+
tracer
715+
.startActiveSpan(
716+
`inputStream.once()`,
717+
async () => {
718+
const result = await innerPromise;
719+
resolve(result as InputStreamOnceResult<TData>);
720+
},
721+
{
722+
attributes: {
723+
[SemanticInternalAttributes.STYLE_ICON]: "streams",
724+
[SemanticInternalAttributes.ENTITY_TYPE]: "input-stream",
725+
...(runId
726+
? { [SemanticInternalAttributes.ENTITY_ID]: `${runId}:${opts.id}` }
727+
: {}),
728+
streamId: opts.id,
729+
...accessoryAttributes({
730+
items: [{ text: opts.id, variant: "normal" }],
731+
style: "codepath",
732+
}),
733+
},
734+
}
735+
)
736+
.catch(reject);
737+
});
729738
},
730739
peek() {
731740
return inputStreams.peek(opts.id) as TData | undefined;

references/hello-world/src/trigger/inputStreams.ts

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,34 @@ const messageStream = streams.input<{ text: string }>({ id: "messages" });
1010
/**
1111
* Coordinator task that exercises all input stream patterns end-to-end.
1212
*
13-
* 1. .once() — trigger a child, send it data via SSE tail, poll until complete
14-
* 2. .on() — trigger a child, send it multiple messages, poll until complete
15-
* 3. .wait() — trigger a child, send it data (completes its waitpoint), poll until complete
16-
* 4. .wait() race — send data before child calls .wait(), verify race handling
13+
* 1a. .once().unwrap() — trigger a child, send data, verify unwrap returns TData
14+
* 1b. .once() result — trigger a child, send data, verify result object { ok, output }
15+
* 2. .on() — trigger a child, send it multiple messages, poll until complete
16+
* 3. .wait() — trigger a child, send it data (completes its waitpoint), poll until complete
17+
* 4. .wait() race — send data before child calls .wait(), verify race handling
1718
*/
1819
export const inputStreamCoordinator = task({
1920
id: "input-stream-coordinator",
2021
run: async () => {
2122
const results: Record<string, unknown> = {};
2223

23-
// --- Test 1: .once() ----
24-
logger.info("Test 1: .once()");
25-
const onceHandle = await inputStreamOnce.trigger({});
24+
// --- Test 1a: .once() with .unwrap() ---
25+
logger.info("Test 1a: .once().unwrap()");
26+
const onceUnwrapHandle = await inputStreamOnceUnwrap.trigger({});
2627
await wait.for({ seconds: 5 });
27-
await approvalStream.send(onceHandle.id, { approved: true, reviewer: "coordinator-once" });
28-
const onceRun = await runs.poll(onceHandle, { pollIntervalMs: 1000 });
29-
results.once = onceRun.output;
30-
logger.info("Test 1 passed", { output: onceRun.output });
28+
await approvalStream.send(onceUnwrapHandle.id, { approved: true, reviewer: "coordinator-unwrap" });
29+
const onceUnwrapRun = await runs.poll(onceUnwrapHandle, { pollIntervalMs: 1000 });
30+
results.onceUnwrap = onceUnwrapRun.output;
31+
logger.info("Test 1a passed", { output: onceUnwrapRun.output });
32+
33+
// --- Test 1b: .once() with result object ---
34+
logger.info("Test 1b: .once() result object");
35+
const onceResultHandle = await inputStreamOnceResult.trigger({});
36+
await wait.for({ seconds: 5 });
37+
await approvalStream.send(onceResultHandle.id, { approved: true, reviewer: "coordinator-result" });
38+
const onceResultRun = await runs.poll(onceResultHandle, { pollIntervalMs: 1000 });
39+
results.onceResult = onceResultRun.output;
40+
logger.info("Test 1b passed", { output: onceResultRun.output });
3141

3242
// --- Test 2: .on() with multiple messages ---
3343
logger.info("Test 2: .on()");
@@ -64,18 +74,40 @@ export const inputStreamCoordinator = task({
6474
});
6575

6676
/**
67-
* Uses .once() to wait for a single input stream message.
77+
* Uses .once().unwrap() — returns TData directly, throws InputStreamTimeoutError on timeout.
6878
*/
69-
export const inputStreamOnce = task({
70-
id: "input-stream-once",
79+
export const inputStreamOnceUnwrap = task({
80+
id: "input-stream-once-unwrap",
7181
run: async (_payload: Record<string, never>) => {
72-
logger.info("Waiting for approval via .once()");
73-
const approval = await approvalStream.once();
82+
logger.info("Waiting for approval via .once().unwrap()");
83+
84+
const approval = await approvalStream.once({ timeoutMs: 30_000 }).unwrap();
85+
7486
logger.info("Received approval", { approval });
7587
return { approval };
7688
},
7789
});
7890

91+
/**
92+
* Uses .once() with result object — check result.ok to handle timeout without try/catch.
93+
*/
94+
export const inputStreamOnceResult = task({
95+
id: "input-stream-once-result",
96+
run: async (_payload: Record<string, never>) => {
97+
logger.info("Waiting for approval via .once() result object");
98+
99+
const result = await approvalStream.once({ timeoutMs: 30_000 });
100+
101+
if (!result.ok) {
102+
logger.error("Timed out waiting for approval", { error: result.error.message });
103+
return { approval: null, timedOut: true };
104+
}
105+
106+
logger.info("Received approval", { approval: result.output });
107+
return { approval: result.output };
108+
},
109+
});
110+
79111
/**
80112
* Uses .on() to subscribe and collect multiple messages.
81113
*/

0 commit comments

Comments
 (0)