Skip to content

Commit a0262e3

Browse files
committed
new redis-worker backed schedules with distributed triggering
1 parent 33e1008 commit a0262e3

File tree

10 files changed

+358
-14
lines changed

10 files changed

+358
-14
lines changed

apps/webapp/app/env.server.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,45 @@ const EnvironmentSchema = z.object({
683683
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
684684
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
685685

686+
SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
687+
SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(1),
688+
SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
689+
SCHEDULE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
690+
SCHEDULE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
691+
SCHEDULE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
692+
SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
693+
SCHEDULE_WORKER_DISTRIBUTION_WINDOW_SECONDS: z.coerce.number().int().default(30),
694+
695+
SCHEDULE_WORKER_REDIS_HOST: z
696+
.string()
697+
.optional()
698+
.transform((v) => v ?? process.env.REDIS_HOST),
699+
SCHEDULE_WORKER_REDIS_READER_HOST: z
700+
.string()
701+
.optional()
702+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
703+
SCHEDULE_WORKER_REDIS_READER_PORT: z.coerce
704+
.number()
705+
.optional()
706+
.transform(
707+
(v) =>
708+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
709+
),
710+
SCHEDULE_WORKER_REDIS_PORT: z.coerce
711+
.number()
712+
.optional()
713+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
714+
SCHEDULE_WORKER_REDIS_USERNAME: z
715+
.string()
716+
.optional()
717+
.transform((v) => v ?? process.env.REDIS_USERNAME),
718+
SCHEDULE_WORKER_REDIS_PASSWORD: z
719+
.string()
720+
.optional()
721+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
722+
SCHEDULE_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
723+
SCHEDULE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
724+
686725
TASK_EVENT_PARTITIONING_ENABLED: z.string().default("0"),
687726
TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS: z.coerce.number().int().default(60), // 1 minute
688727

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,9 +301,10 @@ export class RunEngineTriggerTaskService {
301301
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
302302
releaseConcurrency: body.options?.releaseConcurrency,
303303
queueTimestamp:
304-
parentRun && body.options?.resumeParentOnCompletion
304+
options.queueTimestamp ??
305+
(parentRun && body.options?.resumeParentOnCompletion
305306
? parentRun.queueTimestamp ?? undefined
306-
: undefined,
307+
: undefined),
307308
runChainState,
308309
scheduleId: options.scheduleId,
309310
scheduleInstanceId: options.scheduleInstanceId,

apps/webapp/app/services/worker.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,10 @@ function getWorkerQueue() {
224224
handler: async (payload, job) => {
225225
const service = new TriggerScheduledTaskService();
226226

227+
// NOTE: This graphile worker job is now part of a graceful migration to Redis workers.
228+
// When this job runs, it will execute the current schedule and automatically
229+
// enqueue the next execution in the new Redis-based schedule worker with
230+
// distributed timing. Eventually this graphile job will be removed.
227231
return await service.call(payload.instanceId, job.attempts === job.max_attempts);
228232
},
229233
},
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
2+
import { Logger } from "@trigger.dev/core/logger";
3+
import { z } from "zod";
4+
import { env } from "~/env.server";
5+
import { logger } from "~/services/logger.server";
6+
import { singleton } from "~/utils/singleton";
7+
import { TriggerScheduledTaskService } from "./services/triggerScheduledTask.server";
8+
import { calculateDistributedExecutionTime as calculateDistributedExecutionTimeUtil } from "./utils/distributedScheduling.server";
9+
10+
/**
 * Builds the Redis-backed worker that processes "schedule.triggerScheduledTask" jobs.
 *
 * Connection, concurrency and polling settings are all read from the
 * SCHEDULE_WORKER_* environment values. The worker instance is always
 * constructed and returned; it is only started when SCHEDULE_WORKER_ENABLED
 * is exactly "true".
 */
function initializeWorker() {
  const host = env.SCHEDULE_WORKER_REDIS_HOST;

  // TLS options are included unless TLS has been explicitly disabled.
  const tlsOptions = env.SCHEDULE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} };

  const redisOptions = {
    keyPrefix: "schedule:worker:",
    host,
    port: env.SCHEDULE_WORKER_REDIS_PORT,
    username: env.SCHEDULE_WORKER_REDIS_USERNAME,
    password: env.SCHEDULE_WORKER_REDIS_PASSWORD,
    enableAutoPipelining: true,
    ...tlsOptions,
  };

  logger.debug(`📅 Initializing schedule worker at host ${host}`);

  const worker = new RedisWorker({
    name: "schedule-worker",
    redisOptions,
    catalog: {
      "schedule.triggerScheduledTask": {
        // exactScheduleTime is coerced back into a Date when the payload is parsed.
        schema: z.object({
          instanceId: z.string(),
          exactScheduleTime: z.coerce.date(),
        }),
        visibilityTimeoutMs: 60_000,
        retry: {
          maxAttempts: 5,
        },
      },
    },
    concurrency: {
      workers: env.SCHEDULE_WORKER_CONCURRENCY_WORKERS,
      tasksPerWorker: env.SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER,
      limit: env.SCHEDULE_WORKER_CONCURRENCY_LIMIT,
    },
    pollIntervalMs: env.SCHEDULE_WORKER_POLL_INTERVAL,
    immediatePollIntervalMs: env.SCHEDULE_WORKER_IMMEDIATE_POLL_INTERVAL,
    shutdownTimeoutMs: env.SCHEDULE_WORKER_SHUTDOWN_TIMEOUT_MS,
    logger: new Logger("ScheduleWorker", "debug"),
    jobs: {
      "schedule.triggerScheduledTask": async ({ payload }) => {
        const { instanceId, exactScheduleTime } = payload;

        // finalAttempt is hard-coded to false; exactScheduleTime is forwarded so the
        // service can use it as the schedule timestamp for the triggered run.
        // NOTE(review): with finalAttempt always false, TriggerScheduledTaskService.call
        // rethrows on failure and never takes its "reschedule the next run" branch —
        // confirm what happens to the schedule once this worker's retries are exhausted.
        await new TriggerScheduledTaskService().call(instanceId, false, exactScheduleTime);
      },
    },
  });

  // Guard: leave the worker constructed but idle unless explicitly enabled.
  if (env.SCHEDULE_WORKER_ENABLED !== "true") {
    return worker;
  }

  const startupSummary = [
    `📅 Starting schedule worker at host ${host}`,
    `pollInterval = ${env.SCHEDULE_WORKER_POLL_INTERVAL}`,
    `immediatePollInterval = ${env.SCHEDULE_WORKER_IMMEDIATE_POLL_INTERVAL}`,
    `workers = ${env.SCHEDULE_WORKER_CONCURRENCY_WORKERS}`,
    `tasksPerWorker = ${env.SCHEDULE_WORKER_CONCURRENCY_TASKS_PER_WORKER}`,
    `concurrencyLimit = ${env.SCHEDULE_WORKER_CONCURRENCY_LIMIT}`,
    `distributionWindow = ${env.SCHEDULE_WORKER_DISTRIBUTION_WINDOW_SECONDS}s`,
  ].join(", ");
  logger.debug(startupSummary);

  worker.start();

  return worker;
}
68+
69+
// Shared schedule worker instance, obtained by passing `initializeWorker` to the
// `singleton` helper under the "scheduleWorker" key.
// NOTE(review): presumably this guarantees a single worker per process — confirm in ~/utils/singleton.
export const scheduleWorker = singleton("scheduleWorker", initializeWorker);
70+
71+
/**
 * Resolves when a scheduled task should actually be picked up by the worker.
 *
 * Delegates to the shared distributed-scheduling utility, supplying the
 * distribution window configured via SCHEDULE_WORKER_DISTRIBUTION_WINDOW_SECONDS.
 * The result lies at or before `exactScheduleTime`, so work is spread across
 * the window instead of all firing at the exact schedule moment.
 *
 * @param exactScheduleTime - The moment the schedule is nominally due.
 * @returns The time at which the job should become available.
 */
export function calculateDistributedExecutionTime(exactScheduleTime: Date): Date {
  const windowSeconds = env.SCHEDULE_WORKER_DISTRIBUTION_WINDOW_SECONDS;

  return calculateDistributedExecutionTimeUtil(exactScheduleTime, windowSeconds);
}
82+
83+
/**
 * Queues a "schedule.triggerScheduledTask" job for the given schedule instance.
 *
 * The job becomes available at a distributed time at or before
 * `exactScheduleTime`, while the payload still carries the exact schedule time
 * so the task is triggered with the correct timestamp.
 *
 * NOTE(review): the distributed time is derived from `exactScheduleTime` alone,
 * so every instance due at the same timestamp receives the same `availableAt`.
 *
 * @param instanceId - Schedule instance to trigger; also used to build the job id.
 * @param exactScheduleTime - The moment the schedule is nominally due.
 */
export async function enqueueScheduledTask(instanceId: string, exactScheduleTime: Date) {
  const availableAt = calculateDistributedExecutionTime(exactScheduleTime);
  const distributionOffsetMs = exactScheduleTime.getTime() - availableAt.getTime();

  logger.debug("Enqueuing scheduled task with distributed execution", {
    instanceId,
    exactScheduleTime: exactScheduleTime.toISOString(),
    distributedExecutionTime: availableAt.toISOString(),
    distributionOffsetMs,
  });

  // The job id is keyed on the instance, one id per schedule instance.
  const jobId = `scheduled-task-instance:${instanceId}`;

  await scheduleWorker.enqueue({
    id: jobId,
    job: "schedule.triggerScheduledTask",
    payload: { instanceId, exactScheduleTime },
    availableAt,
  });
}

apps/webapp/app/v3/services/registerNextTaskScheduleInstance.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ export class RegisterNextTaskScheduleInstanceService extends BaseService {
4949
},
5050
});
5151

52-
// Enqueue triggering the task at the next scheduled timestamp
52+
// Enqueue triggering the task at the next scheduled timestamp in the new Redis worker
5353
await TriggerScheduledTaskService.enqueue(instanceId, nextScheduledTimestamp);
5454
}
5555
}

apps/webapp/app/v3/services/triggerScheduledTask.server.ts

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ import { nextScheduledTimestamps } from "../utils/calculateNextSchedule.server";
88
import { BaseService } from "./baseService.server";
99
import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server";
1010
import { TriggerTaskService } from "./triggerTask.server";
11+
import { enqueueScheduledTask } from "../scheduleWorker.server";
1112

1213
export class TriggerScheduledTaskService extends BaseService {
13-
public async call(instanceId: string, finalAttempt: boolean) {
14+
public async call(instanceId: string, finalAttempt: boolean, exactScheduleTime?: Date) {
1415
const registerNextService = new RegisterNextTaskScheduleInstanceService();
1516

1617
const instance = await this._prisma.taskScheduleInstance.findFirst({
@@ -131,17 +132,19 @@ export class TriggerScheduledTaskService extends BaseService {
131132
// Enqueue triggering the task
132133
const triggerTask = new TriggerTaskService();
133134

135+
const scheduleTimestamp = exactScheduleTime ?? instance.nextScheduledTimestamp;
136+
134137
const payload = {
135138
scheduleId: instance.taskSchedule.friendlyId,
136139
type: instance.taskSchedule.type,
137-
timestamp: instance.nextScheduledTimestamp,
140+
timestamp: scheduleTimestamp,
138141
lastTimestamp: instance.lastScheduledTimestamp ?? undefined,
139142
externalId: instance.taskSchedule.externalId ?? undefined,
140143
timezone: instance.taskSchedule.timezone,
141144
upcoming: nextScheduledTimestamps(
142145
instance.taskSchedule.generatorExpression,
143146
instance.taskSchedule.timezone,
144-
instance.nextScheduledTimestamp!,
147+
scheduleTimestamp!,
145148
10
146149
),
147150
};
@@ -161,6 +164,7 @@ export class TriggerScheduledTaskService extends BaseService {
161164
customIcon: "scheduled",
162165
scheduleId: instance.taskSchedule.id,
163166
scheduleInstanceId: instance.id,
167+
queueTimestamp: exactScheduleTime ?? instance.nextScheduledTimestamp ?? undefined,
164168
}
165169
);
166170

@@ -191,26 +195,39 @@ export class TriggerScheduledTaskService extends BaseService {
191195
},
192196
});
193197

194-
await registerNextService.call(instanceId);
198+
// For migration: after handling current execution, schedule the next run in Redis worker
199+
try {
200+
await registerNextService.call(instanceId);
201+
} catch (nextRunError) {
202+
logger.error("Failed to schedule next run in Redis worker after successful execution", {
203+
instanceId,
204+
error: nextRunError,
205+
});
206+
// Don't fail the current execution due to next run scheduling issues
207+
}
195208
} catch (e) {
196209
if (finalAttempt) {
197210
logger.error("Failed to trigger scheduled task, rescheduling the next run", {
198211
instanceId,
199212
error: e,
200213
});
201214

202-
await registerNextService.call(instanceId);
215+
try {
216+
await registerNextService.call(instanceId);
217+
} catch (nextRunError) {
218+
logger.error("Failed to schedule next run in Redis worker after failed execution", {
219+
instanceId,
220+
originalError: e,
221+
nextRunError,
222+
});
223+
}
203224
} else {
204225
throw e;
205226
}
206227
}
207228
}
208229

209-
public static async enqueue(instanceId: string, runAt: Date, tx?: PrismaClientOrTransaction) {
210-
return await workerQueue.enqueue(
211-
"v3.triggerScheduledTask",
212-
{ instanceId },
213-
{ tx, jobKey: `scheduled-task-instance:${instanceId}`, runAt }
214-
);
230+
public static async enqueue(instanceId: string, runAt: Date) {
231+
return await enqueueScheduledTask(instanceId, runAt);
215232
}
216233
}

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export type TriggerTaskServiceOptions = {
3131
oneTimeUseToken?: string;
3232
scheduleId?: string;
3333
scheduleInstanceId?: string;
34+
queueTimestamp?: Date;
3435
};
3536

3637
export class OutOfEntitlementError extends Error {

apps/webapp/app/v3/services/triggerTaskV1.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ export class TriggerTaskServiceV1 extends BaseService {
364364
: 0;
365365

366366
const queueTimestamp =
367+
options.queueTimestamp ??
367368
dependentAttempt?.taskRun.queueTimestamp ??
368369
dependentBatchRun?.dependentTaskAttempt?.taskRun.queueTimestamp ??
369370
delayUntil ??
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/**
 * Calculates a distributed execution time within the specified distribution window
 * before the exact schedule time. This helps spread the load across time instead of
 * having all scheduled tasks execute at exactly the same moment.
 *
 * The offset is deterministic (a hash, not a random number): the same inputs
 * always produce the same result, so repeated calls for one job agree.
 *
 * @param exactScheduleTime - The moment the schedule is nominally due.
 * @param distributionWindowSeconds - Size of the window, in seconds, that ends at
 *   `exactScheduleTime`. Non-finite, zero or negative values disable distribution
 *   and the exact schedule time is returned.
 * @param instanceId - Optional per-instance seed. Without it the offset depends
 *   only on the timestamp, so every schedule due at the same moment lands on the
 *   same offset; passing a distinct id per schedule instance spreads them out.
 * @returns A new Date between `exactScheduleTime - window` and `exactScheduleTime`
 *   (both inclusive).
 * @throws RangeError if `exactScheduleTime` is an invalid Date (from `toISOString`).
 */
export function calculateDistributedExecutionTime(
  exactScheduleTime: Date,
  distributionWindowSeconds: number,
  instanceId?: string
): Date {
  const exactMs = exactScheduleTime.getTime();

  // Evaluated before the window guard so an invalid date always fails loudly.
  const timeString = exactScheduleTime.toISOString();

  const distributionWindowMs = distributionWindowSeconds * 1000;
  if (!Number.isFinite(distributionWindowMs) || distributionWindowMs <= 0) {
    // A misconfigured window must never push execution past the schedule time
    // or yield an Invalid Date; fall back to "no distribution".
    return new Date(exactMs);
  }

  // The id goes in front of the timestamp: in this polynomial hash, leading
  // characters carry the largest multipliers, so ids that differ only in their
  // last characters still end up with clearly different offsets.
  const seed =
    instanceId === undefined || instanceId === "" ? timeString : `${instanceId}:${timeString}`;

  // 31-based polynomial string hash, wrapped to a signed 32-bit integer each step.
  let hash = 0;
  for (let i = 0; i < seed.length; i++) {
    hash = (Math.imul(31, hash) + seed.charCodeAt(i)) | 0;
  }

  // Map the hash onto 0..1. The upper bound is inclusive because
  // Math.abs(-2^31) is exactly 2^31.
  const normalized = Math.abs(hash) / 2147483648; // 2^31

  // Whole-millisecond offset in the range 0..distributionWindowMs.
  const offsetMs = Math.floor(normalized * distributionWindowMs);

  // Return the distributed execution time (at or before the exact schedule time).
  return new Date(exactMs - offsetMs);
}

0 commit comments

Comments
 (0)