Skip to content

Commit 22262ed

Browse files
committed
WIP preventing concurrent waits, throw an error
1 parent 719ae83 commit 22262ed

File tree

2 files changed

+57
-4
lines changed

2 files changed

+57
-4
lines changed

packages/core/src/v3/runtime/index.ts

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,16 @@ import { usage } from "../usage-api.js";
1212

1313
const NOOP_RUNTIME_MANAGER = new NoopRuntimeManager();
1414

15+
const concurrentWaitErrorMessage =
16+
"Parallel waits are not supported, e.g. using Promise.all() around our wait functions.";
17+
18+
/**
19+
* All state must be inside the RuntimeManager, do NOT store it on this class.
20+
* This is because of the "dual package hazard", this can be bundled multiple times.
21+
*/
1522
export class RuntimeAPI {
1623
private static _instance?: RuntimeAPI;
24+
private isExecutingWait = false;
1725

1826
private constructor() {}
1927

@@ -26,23 +34,31 @@ export class RuntimeAPI {
2634
}
2735

2836
public waitForDuration(ms: number): Promise<void> {
29-
return usage.pauseAsync(() => this.#getRuntimeManager().waitForDuration(ms));
37+
return this.#preventConcurrentWaits(() =>
38+
usage.pauseAsync(() => this.#getRuntimeManager().waitForDuration(ms))
39+
);
3040
}
3141

3242
public waitUntil(date: Date): Promise<void> {
33-
return usage.pauseAsync(() => this.#getRuntimeManager().waitUntil(date));
43+
return this.#preventConcurrentWaits(() =>
44+
usage.pauseAsync(() => this.#getRuntimeManager().waitUntil(date))
45+
);
3446
}
3547

3648
public waitForTask(params: { id: string; ctx: TaskRunContext }): Promise<TaskRunExecutionResult> {
37-
return usage.pauseAsync(() => this.#getRuntimeManager().waitForTask(params));
49+
return this.#preventConcurrentWaits(() =>
50+
usage.pauseAsync(() => this.#getRuntimeManager().waitForTask(params))
51+
);
3852
}
3953

4054
public waitForBatch(params: {
4155
id: string;
4256
runs: string[];
4357
ctx: TaskRunContext;
4458
}): Promise<BatchTaskRunExecutionResult> {
45-
return usage.pauseAsync(() => this.#getRuntimeManager().waitForBatch(params));
59+
return this.#preventConcurrentWaits(() =>
60+
usage.pauseAsync(() => this.#getRuntimeManager().waitForBatch(params))
61+
);
4662
}
4763

4864
public setGlobalRuntimeManager(runtimeManager: RuntimeManager): boolean {
@@ -57,4 +73,19 @@ export class RuntimeAPI {
5773
#getRuntimeManager(): RuntimeManager {
5874
return getGlobal(API_NAME) ?? NOOP_RUNTIME_MANAGER;
5975
}
76+
77+
async #preventConcurrentWaits<T>(cb: () => Promise<T>): Promise<T> {
78+
if (this.isExecutingWait) {
79+
console.error(concurrentWaitErrorMessage);
80+
throw new Error(concurrentWaitErrorMessage);
81+
}
82+
83+
this.isExecutingWait = true;
84+
85+
try {
86+
return await cb();
87+
} finally {
88+
this.isExecutingWait = false;
89+
}
90+
}
6091
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { logger, task, wait } from "@trigger.dev/sdk/v3";
2+
import { childTask } from "./example.js";
3+
4+
/*
5+
* These aren't currently supported, and so should throw clear errors
6+
*/
7+
export const parallelWaits = task({
8+
id: "parallel-waits",
9+
run: async (payload: any, { ctx }) => {
10+
//parallel wait for 5/10 seconds
11+
await Promise.all([
12+
wait.for({ seconds: 5 }),
13+
wait.until({ date: new Date(Date.now() + 10_000) }),
14+
]);
15+
16+
//parallel task call
17+
await Promise.all([
18+
childTask.triggerAndWait({ message: "Hello, world!" }),
19+
childTask.batchTriggerAndWait([{ payload: { message: "Hello, world!" } }]),
20+
]);
21+
},
22+
});

0 commit comments

Comments
 (0)