Skip to content

Commit 2d4fe85

Browse files
committed
Use the new schedule engine, create runs with future created ats, filter that out in the lists
1 parent 814319e commit 2d4fe85

File tree

14 files changed

+108
-75
lines changed

14 files changed

+108
-75
lines changed

apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,11 @@ export class NextRunListPresenter {
190190
prisma: this.replica as PrismaClient,
191191
});
192192

193+
function clampToNow(date: Date): Date {
194+
const now = new Date();
195+
return date > now ? now : date;
196+
}
197+
193198
const { runs, pagination } = await runsRepository.listRuns({
194199
organizationId,
195200
environmentId,
@@ -200,8 +205,8 @@ export class NextRunListPresenter {
200205
tags,
201206
scheduleId,
202207
period: periodMs ?? undefined,
203-
from,
204-
to,
208+
from: time.from ? time.from.getTime() : undefined,
209+
to: time.to ? clampToNow(time.to).getTime() : undefined,
205210
isTest,
206211
rootOnly,
207212
batchId,

apps/webapp/app/presenters/v3/RunListPresenter.server.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,12 @@ export class RunListPresenter extends BasePresenter {
179179

180180
const periodMs = time.period ? parse(time.period) : undefined;
181181

182+
function clampToNow(date: Date): Date {
183+
const now = new Date();
184+
185+
return date > now ? now : date;
186+
}
187+
182188
//get the runs
183189
const runs = await this._replica.$queryRaw<
184190
{
@@ -282,7 +288,9 @@ WHERE
282288
: Prisma.empty
283289
}
284290
${
285-
time.to ? Prisma.sql`AND tr."createdAt" <= ${time.to.toISOString()}::timestamp` : Prisma.empty
291+
time.to
292+
? Prisma.sql`AND tr."createdAt" <= ${clampToNow(time.to).toISOString()}::timestamp`
293+
: Prisma.sql`AND tr."createdAt" <= CURRENT_TIMESTAMP`
286294
}
287295
${
288296
tags && tags.length > 0

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ export class RunEngineTriggerTaskService {
308308
runChainState,
309309
scheduleId: options.scheduleId,
310310
scheduleInstanceId: options.scheduleInstanceId,
311+
createdAt: options.overrideCreatedAt,
311312
},
312313
this.prisma
313314
);

apps/webapp/app/services/runsRepository.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ export class RunsRepository {
9191

9292
if (options.to) {
9393
queryBuilder.where("created_at <= fromUnixTimestamp64Milli({to: Int64})", { to: options.to });
94+
} else {
95+
queryBuilder.where("created_at <= fromUnixTimestamp64Milli({to: Int64})", {
96+
to: Date.now(),
97+
});
9498
}
9599

96100
if (typeof options.isTest === "boolean") {

apps/webapp/app/v3/scheduleEngine.server.ts

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -12,50 +12,6 @@ export const scheduleEngine = singleton("ScheduleEngine", createScheduleEngine);
1212

1313
export type { ScheduleEngine };
1414

15-
const triggerScheduledTaskCallback: TriggerScheduledTaskCallback = async ({
16-
taskIdentifier,
17-
environment,
18-
payload,
19-
scheduleInstanceId,
20-
scheduleId,
21-
exactScheduleTime,
22-
}) => {
23-
try {
24-
// Use TriggerTaskServiceV1 for now (can be updated to use TriggerTaskService when ready)
25-
const triggerService = new TriggerTaskService();
26-
27-
const payloadPacket = await stringifyIO(payload);
28-
29-
logger.debug("Triggering scheduled task", {
30-
taskIdentifier,
31-
environment,
32-
payload,
33-
scheduleInstanceId,
34-
scheduleId,
35-
exactScheduleTime,
36-
});
37-
38-
const result = await triggerService.call(
39-
taskIdentifier,
40-
environment,
41-
{ payload: payloadPacket.data, options: { payloadType: payloadPacket.dataType } },
42-
{
43-
customIcon: "scheduled",
44-
scheduleId,
45-
scheduleInstanceId,
46-
queueTimestamp: exactScheduleTime,
47-
}
48-
);
49-
50-
return { success: !!result };
51-
} catch (error) {
52-
return {
53-
success: false,
54-
error: error instanceof Error ? error.message : String(error),
55-
};
56-
}
57-
};
58-
5915
async function isDevEnvironmentConnectedHandler(environmentId: string) {
6016
const environment = await prisma.runtimeEnvironment.findFirst({
6117
where: {
@@ -114,7 +70,50 @@ function createScheduleEngine() {
11470
},
11571
tracer,
11672
meter,
117-
onTriggerScheduledTask: triggerScheduledTaskCallback,
73+
onTriggerScheduledTask: async ({
74+
taskIdentifier,
75+
environment,
76+
payload,
77+
scheduleInstanceId,
78+
scheduleId,
79+
exactScheduleTime,
80+
}) => {
81+
try {
82+
// Use TriggerTaskServiceV1 for now (can be updated to use TriggerTaskService when ready)
83+
const triggerService = new TriggerTaskService();
84+
85+
const payloadPacket = await stringifyIO(payload);
86+
87+
logger.debug("Triggering scheduled task", {
88+
taskIdentifier,
89+
environment,
90+
payload,
91+
scheduleInstanceId,
92+
scheduleId,
93+
exactScheduleTime,
94+
});
95+
96+
const result = await triggerService.call(
97+
taskIdentifier,
98+
environment,
99+
{ payload: payloadPacket.data, options: { payloadType: payloadPacket.dataType } },
100+
{
101+
customIcon: "scheduled",
102+
scheduleId,
103+
scheduleInstanceId,
104+
queueTimestamp: exactScheduleTime,
105+
overrideCreatedAt: exactScheduleTime,
106+
}
107+
);
108+
109+
return { success: !!result };
110+
} catch (error) {
111+
return {
112+
success: false,
113+
error: error instanceof Error ? error.message : String(error),
114+
};
115+
}
116+
},
118117
isDevEnvironmentConnectedHandler: isDevEnvironmentConnectedHandler,
119118
});
120119

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ import { clampMaxDuration } from "../utils/maxDuration";
2323
import { BaseService, ServiceValidationError } from "./baseService.server";
2424
import { CheckScheduleService } from "./checkSchedule.server";
2525
import { projectPubSub } from "./projectPubSub.server";
26-
import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server";
2726
import { tryCatch } from "@trigger.dev/core/v3";
2827
import { engine } from "../runEngine.server";
28+
import { scheduleEngine } from "../scheduleEngine.server";
2929

3030
export class CreateBackgroundWorkerService extends BaseService {
3131
public async call(
@@ -510,7 +510,6 @@ export async function syncDeclarativeSchedules(
510510
});
511511

512512
const checkSchedule = new CheckScheduleService(prisma);
513-
const registerNextService = new RegisterNextTaskScheduleInstanceService(prisma);
514513

515514
//start out by assuming they're all missing
516515
const missingSchedules = new Set<string>(
@@ -569,7 +568,7 @@ export async function syncDeclarativeSchedules(
569568
missingSchedules.delete(existingSchedule.id);
570569
const instance = schedule.instances.at(0);
571570
if (instance) {
572-
await registerNextService.call(instance.id);
571+
await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
573572
} else {
574573
throw new CreateDeclarativeScheduleError(
575574
`Missing instance for declarative schedule ${schedule.id}`
@@ -601,7 +600,7 @@ export async function syncDeclarativeSchedules(
601600
const instance = newSchedule.instances.at(0);
602601

603602
if (instance) {
604-
await registerNextService.call(instance.id);
603+
await scheduleEngine.registerNextTaskScheduleInstance({ instanceId: instance.id });
605604
} else {
606605
throw new CreateDeclarativeScheduleError(
607606
`Missing instance for declarative schedule ${newSchedule.id}`

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ export type TriggerTaskServiceOptions = {
3232
scheduleId?: string;
3333
scheduleInstanceId?: string;
3434
queueTimestamp?: Date;
35+
overrideCreatedAt?: Date;
3536
};
3637

3738
export class OutOfEntitlementError extends Error {

apps/webapp/app/v3/services/triggerTaskV1.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,7 @@ export class TriggerTaskServiceV1 extends BaseService {
439439
machinePreset: body.options?.machine,
440440
scheduleId: options.scheduleId,
441441
scheduleInstanceId: options.scheduleInstanceId,
442+
createdAt: options.overrideCreatedAt,
442443
},
443444
});
444445

internal-packages/run-engine/src/engine/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ export class RunEngine {
366366
runChainState,
367367
scheduleId,
368368
scheduleInstanceId,
369+
createdAt,
369370
}: TriggerParams,
370371
tx?: PrismaClientOrTransaction
371372
): Promise<TaskRun> {
@@ -439,6 +440,7 @@ export class RunEngine {
439440
runChainState,
440441
scheduleId,
441442
scheduleInstanceId,
443+
createdAt,
442444
executionSnapshots: {
443445
create: {
444446
engine: "V2",

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ export type TriggerParams = {
133133
runChainState?: RunChainState;
134134
scheduleId?: string;
135135
scheduleInstanceId?: string;
136+
createdAt?: Date;
136137
};
137138

138139
export type EngineWorker = Worker<typeof workerCatalog>;

0 commit comments

Comments
 (0)