Skip to content

Commit badaff0

Browse files
committed
introducing the schedule engine
1 parent a0262e3 commit badaff0

24 files changed

+1289
-419
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,7 @@ const EnvironmentSchema = z.object({
683683
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
684684
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
685685

686+
SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
686687
SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
687688
SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(1),
688689
SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),

apps/webapp/app/services/worker.server.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.
2222
import { ResumeTaskRunDependenciesService } from "~/v3/services/resumeTaskRunDependencies.server";
2323
import { RetryAttemptService } from "~/v3/services/retryAttempt.server";
2424
import { TimeoutDeploymentService } from "~/v3/services/timeoutDeployment.server";
25-
import { TriggerScheduledTaskService } from "~/v3/services/triggerScheduledTask.server";
2625
import { GraphileMigrationHelperService } from "./db/graphileMigrationHelper.server";
2726
import { sendEmail } from "./email.server";
2827
import { reportInvocationUsage } from "./platform.v3.server";
@@ -32,6 +31,7 @@ import {
3231
BatchProcessingOptions as RunEngineBatchProcessingOptions,
3332
RunEngineBatchTriggerService,
3433
} from "~/runEngine/services/batchTrigger.server";
34+
import { scheduleEngine } from "~/v3/scheduleEngine.server";
3535

3636
const workerCatalog = {
3737
scheduleEmail: DeliverEmailSchema,
@@ -222,13 +222,10 @@ function getWorkerQueue() {
222222
priority: 0,
223223
maxAttempts: 3, // total delay of 30 seconds
224224
handler: async (payload, job) => {
225-
const service = new TriggerScheduledTaskService();
226-
227-
// NOTE: This graphile worker job is now part of a graceful migration to Redis workers.
228-
// When this job runs, it will execute the current schedule and automatically
229-
// enqueue the next execution in the new Redis-based schedule worker with
230-
// distributed timing. Eventually this graphile job will be removed.
231-
return await service.call(payload.instanceId, job.attempts === job.max_attempts);
225+
await scheduleEngine.triggerScheduledTask({
226+
instanceId: payload.instanceId,
227+
finalAttempt: job.attempts === job.max_attempts,
228+
});
232229
},
233230
},
234231
"v3.performTaskRunAlerts": {
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { ScheduleEngine, TriggerScheduledTaskCallback } from "@internal/schedule-engine";
2+
import { stringifyIO } from "@trigger.dev/core/v3";
3+
import { prisma } from "~/db.server";
4+
import { env } from "~/env.server";
5+
import { devPresence } from "~/presenters/v3/DevPresence.server";
6+
import { logger } from "~/services/logger.server";
7+
import { singleton } from "~/utils/singleton";
8+
import { TriggerTaskService } from "./services/triggerTask.server";
9+
import { meter, tracer } from "./tracer.server";
10+
11+
export const scheduleEngine = singleton("ScheduleEngine", createScheduleEngine);
12+
13+
export type { ScheduleEngine };
14+
15+
const triggerScheduledTaskCallback: TriggerScheduledTaskCallback = async ({
16+
taskIdentifier,
17+
environment,
18+
payload,
19+
scheduleInstanceId,
20+
scheduleId,
21+
exactScheduleTime,
22+
}) => {
23+
try {
24+
// Use TriggerTaskServiceV1 for now (can be updated to use TriggerTaskService when ready)
25+
const triggerService = new TriggerTaskService();
26+
27+
const payloadPacket = await stringifyIO(payload);
28+
29+
logger.debug("Triggering scheduled task", {
30+
taskIdentifier,
31+
environment,
32+
payload,
33+
scheduleInstanceId,
34+
scheduleId,
35+
exactScheduleTime,
36+
});
37+
38+
const result = await triggerService.call(
39+
taskIdentifier,
40+
environment,
41+
{ payload: payloadPacket.data, options: { payloadType: payloadPacket.dataType } },
42+
{
43+
customIcon: "scheduled",
44+
scheduleId,
45+
scheduleInstanceId,
46+
queueTimestamp: exactScheduleTime,
47+
}
48+
);
49+
50+
return { success: !!result };
51+
} catch (error) {
52+
return {
53+
success: false,
54+
error: error instanceof Error ? error.message : String(error),
55+
};
56+
}
57+
};
58+
59+
async function isDevEnvironmentConnectedHandler(environmentId: string) {
60+
const environment = await prisma.runtimeEnvironment.findFirst({
61+
where: {
62+
id: environmentId,
63+
},
64+
select: {
65+
currentSession: {
66+
select: {
67+
disconnectedAt: true,
68+
},
69+
},
70+
project: {
71+
select: {
72+
engine: true,
73+
},
74+
},
75+
},
76+
});
77+
78+
if (!environment) {
79+
return false;
80+
}
81+
82+
if (environment.project.engine === "V1") {
83+
const v3Disconnected = !environment.currentSession || environment.currentSession.disconnectedAt;
84+
85+
return !v3Disconnected;
86+
}
87+
88+
const v4Connected = await devPresence.isConnected(environmentId);
89+
90+
return v4Connected;
91+
}
92+
93+
function createScheduleEngine() {
94+
const engine = new ScheduleEngine({
95+
prisma,
96+
logLevel: env.SCHEDULE_ENGINE_LOG_LEVEL,
97+
redis: {
98+
host: env.SCHEDULE_WORKER_REDIS_HOST ?? "localhost",
99+
port: env.SCHEDULE_WORKER_REDIS_PORT ?? 6379,
100+
username: env.SCHEDULE_WORKER_REDIS_USERNAME,
101+
password: env.SCHEDULE_WORKER_REDIS_PASSWORD,
102+
keyPrefix: "schedule:",
103+
enableAutoPipelining: true,
104+
...(env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
105+
},
106+
worker: {
107+
concurrency: env.SCHEDULE_WORKER_CONCURRENCY_LIMIT,
108+
pollIntervalMs: env.SCHEDULE_WORKER_POLL_INTERVAL,
109+
shutdownTimeoutMs: env.SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS,
110+
disabled: env.SCHEDULE_WORKER_ENABLED === "0",
111+
},
112+
distributionWindow: {
113+
seconds: env.SCHEDULE_WORKER_DISTRIBUTION_WINDOW_SECONDS,
114+
},
115+
tracer,
116+
meter,
117+
onTriggerScheduledTask: triggerScheduledTaskCallback,
118+
isDevEnvironmentConnectedHandler: isDevEnvironmentConnectedHandler,
119+
});
120+
121+
return engine;
122+
}

apps/webapp/app/v3/services/registerNextTaskScheduleInstance.server.ts

Lines changed: 0 additions & 55 deletions
This file was deleted.

0 commit comments

Comments
 (0)