Skip to content

Commit 45934cc

Browse files
committed
add fast idempotency key extraction for the run replication service
1 parent 0487cae commit 45934cc

File tree

5 files changed

+35
-12
lines changed

5 files changed

+35
-12
lines changed

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
} from "@trigger.dev/core/v3";
1010
import { AttemptId, getMaxDuration, parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
1111
import {
12-
getIdempotencyKeyScope,
12+
extractIdempotencyKeyScope,
1313
getUserProvidedIdempotencyKey,
1414
} from "@trigger.dev/core/v3/serverOnly";
1515
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
@@ -235,7 +235,7 @@ export class SpanPresenter extends BasePresenter {
235235
environmentId: run.runtimeEnvironment.id,
236236
idempotencyKey: getUserProvidedIdempotencyKey(run),
237237
idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt,
238-
idempotencyKeyScope: getIdempotencyKeyScope(run),
238+
idempotencyKeyScope: extractIdempotencyKeyScope(run),
239239
idempotencyKeyStatus: this.getIdempotencyKeyStatus(run),
240240
debounce: run.debounce as { key: string; delay: string; createdAt: Date } | null,
241241
schedule: await this.resolveSchedule(run.scheduleId ?? undefined),
@@ -288,7 +288,7 @@ export class SpanPresenter extends BasePresenter {
288288
idempotencyKeyOptions: unknown;
289289
}): "active" | "inactive" | "expired" | undefined {
290290
// No idempotency configured if no scope exists
291-
const scope = getIdempotencyKeyScope(run);
291+
const scope = extractIdempotencyKeyScope(run);
292292
if (!scope) {
293293
return undefined;
294294
}

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {
2121
import { Logger, type LogLevel } from "@trigger.dev/core/logger";
2222
import { tryCatch } from "@trigger.dev/core/utils";
2323
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
24-
import { extractIdempotencyKeyUser, getIdempotencyKeyScope } from "@trigger.dev/core/v3/serverOnly";
24+
import { unsafeExtractIdempotencyKeyScope, unsafeExtractIdempotencyKeyUser } from "@trigger.dev/core/v3/serverOnly";
2525
import { type TaskRun } from "@trigger.dev/database";
2626
import { nanoid } from "nanoid";
2727
import EventEmitter from "node:events";
@@ -892,8 +892,8 @@ export class RunsReplicationService {
892892
run.spanId, // span_id
893893
run.traceId, // trace_id
894894
run.idempotencyKey ?? "", // idempotency_key
895-
extractIdempotencyKeyUser(run) ?? "", // idempotency_key_user
896-
getIdempotencyKeyScope(run) ?? "", // idempotency_key_scope
895+
unsafeExtractIdempotencyKeyUser(run) ?? "", // idempotency_key_user
896+
unsafeExtractIdempotencyKeyScope(run) ?? "", // idempotency_key_scope
897897
run.ttl ?? "", // expiration_ttl
898898
run.isTest ?? false, // is_test
899899
_version.toString(), // _version

apps/webapp/test/runsReplicationService.part2.test.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1292,7 +1292,11 @@ describe("RunsReplicationService (part 2/2)", () => {
12921292

12931293
// Execution settings
12941294
machinePreset: "large-1x",
1295-
idempotencyKey: "exhaustive-idempotency-key",
1295+
idempotencyKey: "exhaustive-idempotency-key-hashed",
1296+
idempotencyKeyOptions: {
1297+
key: "exhaustive-idempotency-key",
1298+
scope: "run",
1299+
},
12961300
ttl: "1h",
12971301
isTest: true,
12981302
concurrencyKey: "exhaustive-concurrency-key",
@@ -1388,7 +1392,9 @@ describe("RunsReplicationService (part 2/2)", () => {
13881392

13891393
// Execution settings
13901394
expect(clickhouseRun.machine_preset).toBe("large-1x");
1391-
expect(clickhouseRun.idempotency_key).toBe("exhaustive-idempotency-key");
1395+
expect(clickhouseRun.idempotency_key).toBe("exhaustive-idempotency-key-hashed");
1396+
expect(clickhouseRun.idempotency_key_user).toBe("exhaustive-idempotency-key");
1397+
expect(clickhouseRun.idempotency_key_scope).toBe("run");
13921398
expect(clickhouseRun.expiration_ttl).toBe("1h");
13931399
expect(clickhouseRun.is_test).toBe(1); // ClickHouse returns booleans as integers
13941400
expect(clickhouseRun.concurrency_key).toBe("exhaustive-concurrency-key");

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import {
3131
TaskRunSuccessfulExecutionResult,
3232
} from "@trigger.dev/core/v3/schemas";
3333
import {
34-
getIdempotencyKeyScope,
34+
extractIdempotencyKeyScope,
3535
getUserProvidedIdempotencyKey,
3636
} from "@trigger.dev/core/v3/serverOnly";
3737
import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
@@ -267,7 +267,7 @@ export class RunAttemptSystem {
267267
createdAt: run.createdAt,
268268
startedAt: run.startedAt ?? run.createdAt,
269269
idempotencyKey: getUserProvidedIdempotencyKey(run) ?? undefined,
270-
idempotencyKeyScope: getIdempotencyKeyScope(run),
270+
idempotencyKeyScope: extractIdempotencyKeyScope(run),
271271
maxAttempts: run.maxAttempts ?? undefined,
272272
version: run.taskVersion ?? "unknown",
273273
maxDuration: run.maxDurationInSeconds ?? undefined,
@@ -578,7 +578,7 @@ export class RunAttemptSystem {
578578
tags: updatedRun.runTags,
579579
isTest: updatedRun.isTest,
580580
idempotencyKey: getUserProvidedIdempotencyKey(updatedRun) ?? undefined,
581-
idempotencyKeyScope: getIdempotencyKeyScope(updatedRun),
581+
idempotencyKeyScope: extractIdempotencyKeyScope(updatedRun),
582582
startedAt: updatedRun.startedAt ?? updatedRun.createdAt,
583583
maxAttempts: updatedRun.maxAttempts ?? undefined,
584584
version: updatedRun.taskVersion ?? "unknown",

packages/core/src/v3/serverOnly/idempotencyKeys.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export function getUserProvidedIdempotencyKey(run: {
2424
* @param run - Object containing idempotencyKeyOptions (JSON from DB)
2525
* @returns The scope if valid options exist, otherwise undefined
2626
*/
27-
export function getIdempotencyKeyScope(run: {
27+
export function extractIdempotencyKeyScope(run: {
2828
idempotencyKeyOptions: unknown;
2929
}): "run" | "attempt" | "global" | undefined {
3030
const parsed = IdempotencyKeyOptionsSchema.safeParse(run.idempotencyKeyOptions);
@@ -34,6 +34,13 @@ export function getIdempotencyKeyScope(run: {
3434
return undefined;
3535
}
3636

37+
export function unsafeExtractIdempotencyKeyScope(run: {
38+
idempotencyKeyOptions: unknown;
39+
}): "run" | "attempt" | "global" | undefined {
40+
const unsafe = run.idempotencyKeyOptions as { scope?: "run" | "attempt" | "global" } | undefined | null;
41+
return unsafe?.scope ?? undefined;
42+
}
43+
3744
/**
3845
* Extracts just the user-provided key from idempotencyKeyOptions, without falling back to the hash.
3946
* Useful for ClickHouse replication where we want to store only the explicit user key.
@@ -50,3 +57,13 @@ export function extractIdempotencyKeyUser(run: {
5057
}
5158
return undefined;
5259
}
60+
61+
export function unsafeExtractIdempotencyKeyUser(run: {
62+
idempotencyKeyOptions: unknown;
63+
}): string | undefined {
64+
const unsafe = run.idempotencyKeyOptions as { key?: string } | undefined | null;
65+
66+
return unsafe?.key ?? undefined;
67+
}
68+
69+

0 commit comments

Comments
 (0)