Skip to content

Commit 4ce001e

Browse files
committed
Added preventMultipleWaits to the devRuntimeManager
1 parent e6d744c commit 4ce001e

File tree

1 file changed

+40
-33
lines changed

1 file changed

+40
-33
lines changed

packages/core/src/v3/runtime/devRuntimeManager.ts

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
} from "../schemas/index.js";
77
import { unboundedTimeout } from "../utils/timers.js";
88
import { RuntimeManager } from "./manager.js";
9+
import { preventMultipleWaits } from "./preventMultipleWaits.js";
910

1011
export class DevRuntimeManager implements RuntimeManager {
1112
_taskWaits: Map<string, { resolve: (value: TaskRunExecutionResult) => void }> = new Map();
@@ -17,71 +18,77 @@ export class DevRuntimeManager implements RuntimeManager {
1718

1819
_pendingCompletionNotifications: Map<string, TaskRunExecutionResult> = new Map();
1920

21+
_preventMultipleWaits = preventMultipleWaits();
22+
2023
disable(): void {
2124
// do nothing
2225
}
2326

2427
async waitForDuration(ms: number): Promise<void> {
25-
await unboundedTimeout(ms);
28+
await this._preventMultipleWaits(() => unboundedTimeout(ms));
2629
}
2730

2831
async waitUntil(date: Date): Promise<void> {
2932
return this.waitForDuration(date.getTime() - Date.now());
3033
}
3134

3235
async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise<TaskRunExecutionResult> {
33-
const pendingCompletion = this._pendingCompletionNotifications.get(params.id);
36+
return this._preventMultipleWaits(async () => {
37+
const pendingCompletion = this._pendingCompletionNotifications.get(params.id);
3438

35-
if (pendingCompletion) {
36-
this._pendingCompletionNotifications.delete(params.id);
39+
if (pendingCompletion) {
40+
this._pendingCompletionNotifications.delete(params.id);
3741

38-
return pendingCompletion;
39-
}
42+
return pendingCompletion;
43+
}
4044

41-
const promise = new Promise<TaskRunExecutionResult>((resolve) => {
42-
this._taskWaits.set(params.id, { resolve });
43-
});
45+
const promise = new Promise<TaskRunExecutionResult>((resolve) => {
46+
this._taskWaits.set(params.id, { resolve });
47+
});
4448

45-
await this.#tryFlushMetadata();
49+
await this.#tryFlushMetadata();
4650

47-
return await promise;
51+
return await promise;
52+
});
4853
}
4954

5055
async waitForBatch(params: {
5156
id: string;
5257
runs: string[];
5358
ctx: TaskRunContext;
5459
}): Promise<BatchTaskRunExecutionResult> {
55-
if (!params.runs.length) {
56-
return Promise.resolve({ id: params.id, items: [] });
57-
}
60+
return this._preventMultipleWaits(async () => {
61+
if (!params.runs.length) {
62+
return Promise.resolve({ id: params.id, items: [] });
63+
}
5864

59-
const promise = Promise.all(
60-
params.runs.map((runId) => {
61-
return new Promise<TaskRunExecutionResult>((resolve, reject) => {
62-
const pendingCompletion = this._pendingCompletionNotifications.get(runId);
65+
const promise = Promise.all(
66+
params.runs.map((runId) => {
67+
return new Promise<TaskRunExecutionResult>((resolve, reject) => {
68+
const pendingCompletion = this._pendingCompletionNotifications.get(runId);
6369

64-
if (pendingCompletion) {
65-
this._pendingCompletionNotifications.delete(runId);
70+
if (pendingCompletion) {
71+
this._pendingCompletionNotifications.delete(runId);
6672

67-
resolve(pendingCompletion);
73+
resolve(pendingCompletion);
6874

69-
return;
70-
}
75+
return;
76+
}
7177

72-
this._taskWaits.set(runId, { resolve });
73-
});
74-
})
75-
);
78+
this._taskWaits.set(runId, { resolve });
79+
});
80+
})
81+
);
7682

77-
await this.#tryFlushMetadata();
83+
await this.#tryFlushMetadata();
7884

79-
const results = await promise;
85+
const results = await promise;
8086

81-
return {
82-
id: params.id,
83-
items: results,
84-
};
87+
return {
88+
id: params.id,
89+
items: results,
90+
};
91+
});
8592
}
8693

8794
resumeTask(completion: TaskRunExecutionResult, runId: string): void {

0 commit comments

Comments
 (0)