Skip to content

Commit c21d103

Browse files
committed
extract out the getUserProvidedIdempotencyKey/Scope stuff to core and also added a test for the resetting changes
1 parent d6a1375 commit c21d103

File tree

7 files changed

+221
-74
lines changed

7 files changed

+221
-74
lines changed

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
logger,
1010
} from "@trigger.dev/core/v3";
1111
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
12+
import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly";
1213
import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
1314
import assertNever from "assert-never";
1415
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
@@ -18,26 +19,6 @@ import { generatePresignedUrl } from "~/v3/r2.server";
1819
import { tracer } from "~/v3/tracer.server";
1920
import { startSpanWithEnv } from "~/v3/tracing.server";
2021

21-
/**
22-
* Returns the user-provided idempotency key if available (from idempotencyKeyOptions),
23-
* otherwise falls back to the stored idempotency key (which is the hash).
24-
*/
25-
function getUserProvidedIdempotencyKey(run: {
26-
idempotencyKey: string | null;
27-
idempotencyKeyOptions: unknown;
28-
}): string | undefined {
29-
// If we have the user-provided key options, return the original key
30-
const options = run.idempotencyKeyOptions as {
31-
key?: string;
32-
scope?: string;
33-
} | null;
34-
if (options?.key) {
35-
return options.key;
36-
}
37-
// Fallback to the hash (for runs created before this feature)
38-
return run.idempotencyKey ?? undefined;
39-
}
40-
4122
// Build 'select' object
4223
const commonRunSelect = {
4324
id: true,

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
type V3TaskRunContext,
99
} from "@trigger.dev/core/v3";
1010
import { AttemptId, getMaxDuration, parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
11+
import { getUserProvidedIdempotencyKey } from "@trigger.dev/core/v3/serverOnly";
1112
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
1213
import { logger } from "~/services/logger.server";
1314
import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server";
@@ -229,7 +230,7 @@ export class SpanPresenter extends BasePresenter {
229230
isTest: run.isTest,
230231
replayedFromTaskRunFriendlyId: run.replayedFromTaskRunFriendlyId,
231232
environmentId: run.runtimeEnvironment.id,
232-
idempotencyKey: this.getUserProvidedIdempotencyKey(run),
233+
idempotencyKey: getUserProvidedIdempotencyKey(run),
233234
idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt,
234235
debounce: run.debounce as { key: string; delay: string; createdAt: Date } | null,
235236
schedule: await this.resolveSchedule(run.scheduleId ?? undefined),
@@ -645,7 +646,7 @@ export class SpanPresenter extends BasePresenter {
645646
createdAt: run.createdAt,
646647
tags: run.runTags,
647648
isTest: run.isTest,
648-
idempotencyKey: this.getUserProvidedIdempotencyKey(run) ?? undefined,
649+
idempotencyKey: getUserProvidedIdempotencyKey(run) ?? undefined,
649650
startedAt: run.startedAt ?? run.createdAt,
650651
durationMs: run.usageDurationMs,
651652
costInCents: run.costInCents,
@@ -706,22 +707,4 @@ export class SpanPresenter extends BasePresenter {
706707
return parsedTraceparent?.traceId;
707708
}
708709

709-
/**
710-
* Returns the user-provided idempotency key if available (from idempotencyKeyOptions),
711-
* otherwise falls back to the stored idempotency key (which is the hash).
712-
*/
713-
getUserProvidedIdempotencyKey(
714-
run: Pick<FindRunResult, "idempotencyKey" | "idempotencyKeyOptions">
715-
): string | null {
716-
// If we have the user-provided key options, return the original key
717-
const options = run.idempotencyKeyOptions as {
718-
key?: string;
719-
scope?: string;
720-
} | null;
721-
if (options?.key) {
722-
return options.key;
723-
}
724-
// Fallback to the hash (for runs created before this feature)
725-
return run.idempotencyKey;
726-
}
727710
}

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
import { Logger, type LogLevel } from "@trigger.dev/core/logger";
2222
import { tryCatch } from "@trigger.dev/core/utils";
2323
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
24+
import { extractIdempotencyKeyUser, getIdempotencyKeyScope } from "@trigger.dev/core/v3/serverOnly";
2425
import { type TaskRun } from "@trigger.dev/database";
2526
import { nanoid } from "nanoid";
2627
import EventEmitter from "node:events";
@@ -891,8 +892,8 @@ export class RunsReplicationService {
891892
run.spanId, // span_id
892893
run.traceId, // trace_id
893894
run.idempotencyKey ?? "", // idempotency_key
894-
this.#extractIdempotencyKeyUser(run), // idempotency_key_user
895-
this.#extractIdempotencyKeyScope(run), // idempotency_key_scope
895+
extractIdempotencyKeyUser(run) ?? "", // idempotency_key_user
896+
getIdempotencyKeyScope(run) ?? "", // idempotency_key_scope
896897
run.ttl ?? "", // expiration_ttl
897898
run.isTest ?? false, // is_test
898899
_version.toString(), // _version
@@ -954,15 +955,6 @@ export class RunsReplicationService {
954955
return { data: parsedData };
955956
}
956957

957-
#extractIdempotencyKeyUser(run: TaskRun): string {
958-
const options = run.idempotencyKeyOptions as { key?: string; scope?: string } | null;
959-
return options?.key ?? "";
960-
}
961-
962-
#extractIdempotencyKeyScope(run: TaskRun): string {
963-
const options = run.idempotencyKeyOptions as { key?: string; scope?: string } | null;
964-
return options?.scope ?? "";
965-
}
966958
}
967959

968960
export type ConcurrentFlushSchedulerConfig<T> = {

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ import {
3030
TaskRunInternalError,
3131
TaskRunSuccessfulExecutionResult,
3232
} from "@trigger.dev/core/v3/schemas";
33+
import {
34+
getIdempotencyKeyScope,
35+
getUserProvidedIdempotencyKey,
36+
} from "@trigger.dev/core/v3/serverOnly";
3337
import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
3438
import {
3539
$transaction,
@@ -262,8 +266,8 @@ export class RunAttemptSystem {
262266
isTest: run.isTest,
263267
createdAt: run.createdAt,
264268
startedAt: run.startedAt ?? run.createdAt,
265-
idempotencyKey: this.#getUserProvidedIdempotencyKey(run) ?? undefined,
266-
idempotencyKeyScope: this.#getIdempotencyKeyScope(run),
269+
idempotencyKey: getUserProvidedIdempotencyKey(run) ?? undefined,
270+
idempotencyKeyScope: getIdempotencyKeyScope(run),
267271
maxAttempts: run.maxAttempts ?? undefined,
268272
version: run.taskVersion ?? "unknown",
269273
maxDuration: run.maxDurationInSeconds ?? undefined,
@@ -573,8 +577,8 @@ export class RunAttemptSystem {
573577
createdAt: updatedRun.createdAt,
574578
tags: updatedRun.runTags,
575579
isTest: updatedRun.isTest,
576-
idempotencyKey: this.#getUserProvidedIdempotencyKey(updatedRun) ?? undefined,
577-
idempotencyKeyScope: this.#getIdempotencyKeyScope(updatedRun),
580+
idempotencyKey: getUserProvidedIdempotencyKey(updatedRun) ?? undefined,
581+
idempotencyKeyScope: getIdempotencyKeyScope(updatedRun),
578582
startedAt: updatedRun.startedAt ?? updatedRun.createdAt,
579583
maxAttempts: updatedRun.maxAttempts ?? undefined,
580584
version: updatedRun.taskVersion ?? "unknown",
@@ -1919,24 +1923,6 @@ export class RunAttemptSystem {
19191923
};
19201924
}
19211925

1922-
#getUserProvidedIdempotencyKey(
1923-
run: { idempotencyKey: string | null; idempotencyKeyOptions: unknown }
1924-
): string | null {
1925-
const options = run.idempotencyKeyOptions as { key?: string; scope?: string } | null;
1926-
// Return user-provided key if available, otherwise fall back to the hash
1927-
return options?.key ?? run.idempotencyKey;
1928-
}
1929-
1930-
#getIdempotencyKeyScope(
1931-
run: { idempotencyKeyOptions: unknown }
1932-
): "run" | "attempt" | "global" | undefined {
1933-
const options = run.idempotencyKeyOptions as { key?: string; scope?: string } | null;
1934-
const scope = options?.scope;
1935-
if (scope === "run" || scope === "attempt" || scope === "global") {
1936-
return scope;
1937-
}
1938-
return undefined;
1939-
}
19401926
}
19411927

19421928
export function safeParseGitMeta(git: unknown): GitMeta | undefined {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { IdempotencyKeyOptionsSchema } from "../schemas/api.js";
2+
3+
/**
4+
* Safely parses idempotencyKeyOptions from a database record and extracts the user-provided key.
5+
* Returns the user-provided key if valid options exist, otherwise falls back to the hash.
6+
*
7+
* @param run - Object containing idempotencyKey (the hash) and idempotencyKeyOptions (JSON from DB)
8+
* @returns The user-provided key, the hash as fallback, or null if neither exists
9+
*/
10+
export function getUserProvidedIdempotencyKey(run: {
11+
idempotencyKey: string | null;
12+
idempotencyKeyOptions: unknown;
13+
}): string | null {
14+
const parsed = IdempotencyKeyOptionsSchema.safeParse(run.idempotencyKeyOptions);
15+
if (parsed.success) {
16+
return parsed.data.key;
17+
}
18+
return run.idempotencyKey;
19+
}
20+
21+
/**
22+
* Safely parses idempotencyKeyOptions and extracts the scope.
23+
*
24+
* @param run - Object containing idempotencyKeyOptions (JSON from DB)
25+
* @returns The scope if valid options exist, otherwise undefined
26+
*/
27+
export function getIdempotencyKeyScope(run: {
28+
idempotencyKeyOptions: unknown;
29+
}): "run" | "attempt" | "global" | undefined {
30+
const parsed = IdempotencyKeyOptionsSchema.safeParse(run.idempotencyKeyOptions);
31+
if (parsed.success) {
32+
return parsed.data.scope;
33+
}
34+
return undefined;
35+
}
36+
37+
/**
38+
* Extracts just the user-provided key from idempotencyKeyOptions, without falling back to the hash.
39+
* Useful for ClickHouse replication where we want to store only the explicit user key.
40+
*
41+
* @param run - Object containing idempotencyKeyOptions (JSON from DB)
42+
* @returns The user-provided key if valid options exist, otherwise undefined
43+
*/
44+
export function extractIdempotencyKeyUser(run: {
45+
idempotencyKeyOptions: unknown;
46+
}): string | undefined {
47+
const parsed = IdempotencyKeyOptionsSchema.safeParse(run.idempotencyKeyOptions);
48+
if (parsed.success) {
49+
return parsed.data.key;
50+
}
51+
return undefined;
52+
}

packages/core/src/v3/serverOnly/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ export * from "./jumpHash.js";
88
export * from "../apiClient/version.js";
99
export * from "./placementTags.js";
1010
export * from "./resourceMonitor.js";
11+
export * from "./idempotencyKeys.js";

references/hello-world/src/trigger/idempotency.ts

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { batch, idempotencyKeys, logger, task, timeout, usage, wait } from "@trigger.dev/sdk/v3";
1+
import { batch, idempotencyKeys, logger, runs, task, timeout, usage, wait } from "@trigger.dev/sdk/v3";
22
import { setTimeout } from "timers/promises";
33
import { childTask } from "./example.js";
44

@@ -388,3 +388,155 @@ export const idempotencyKeyOptionsTest = task({
388388
};
389389
},
390390
});
391+
392+
// Test task for verifying idempotencyKeys.reset works with the new API (TRI-4352)
393+
export const idempotencyKeyResetTest = task({
394+
id: "idempotency-key-reset-test",
395+
maxDuration: 120,
396+
run: async (payload: any, { ctx }) => {
397+
logger.log("Testing idempotencyKeys.reset feature (TRI-4352)");
398+
399+
const testResults: Array<{
400+
test: string;
401+
success: boolean;
402+
details: Record<string, unknown>;
403+
}> = [];
404+
405+
// Test 1: Reset using IdempotencyKey object (options extracted automatically)
406+
{
407+
const key = await idempotencyKeys.create("reset-test-key-1", { scope: "global" });
408+
logger.log("Test 1: Created global-scoped key", { key: key.toString() });
409+
410+
// First trigger - should create a new run
411+
const result1 = await idempotencyKeyOptionsChild.triggerAndWait(
412+
{ message: "First trigger" },
413+
{ idempotencyKey: key, idempotencyKeyTTL: "300s" }
414+
);
415+
const firstRunId = result1.ok ? result1.id : null;
416+
logger.log("Test 1: First trigger", { runId: firstRunId });
417+
418+
// Second trigger - should be deduplicated (same run ID)
419+
const result2 = await idempotencyKeyOptionsChild.triggerAndWait(
420+
{ message: "Second trigger (should dedupe)" },
421+
{ idempotencyKey: key, idempotencyKeyTTL: "300s" }
422+
);
423+
const secondRunId = result2.ok ? result2.id : null;
424+
logger.log("Test 1: Second trigger (dedupe check)", { runId: secondRunId });
425+
426+
const wasDeduplicated = firstRunId === secondRunId;
427+
428+
// Reset the idempotency key using the IdempotencyKey object
429+
logger.log("Test 1: Resetting idempotency key using IdempotencyKey object");
430+
await idempotencyKeys.reset("idempotency-key-options-child", key);
431+
432+
// Third trigger - should create a NEW run after reset
433+
const result3 = await idempotencyKeyOptionsChild.triggerAndWait(
434+
{ message: "Third trigger (after reset)" },
435+
{ idempotencyKey: key, idempotencyKeyTTL: "300s" }
436+
);
437+
const thirdRunId = result3.ok ? result3.id : null;
438+
logger.log("Test 1: Third trigger (after reset)", { runId: thirdRunId });
439+
440+
const wasResetSuccessful = thirdRunId !== firstRunId && thirdRunId !== null;
441+
442+
testResults.push({
443+
test: "Reset with IdempotencyKey object (global scope)",
444+
success: wasDeduplicated && wasResetSuccessful,
445+
details: {
446+
firstRunId,
447+
secondRunId,
448+
thirdRunId,
449+
wasDeduplicated,
450+
wasResetSuccessful,
451+
},
452+
});
453+
}
454+
455+
// Test 2: Reset using raw string with scope option
456+
{
457+
const keyString = "reset-test-key-2";
458+
const key = await idempotencyKeys.create(keyString, { scope: "global" });
459+
logger.log("Test 2: Created global-scoped key from string", { key: key.toString() });
460+
461+
// First trigger
462+
const result1 = await idempotencyKeyOptionsChild.triggerAndWait(
463+
{ message: "First trigger (raw string test)" },
464+
{ idempotencyKey: key, idempotencyKeyTTL: "300s" }
465+
);
466+
const firstRunId = result1.ok ? result1.id : null;
467+
logger.log("Test 2: First trigger", { runId: firstRunId });
468+
469+
// Reset using raw string + scope option
470+
logger.log("Test 2: Resetting idempotency key using raw string + scope");
471+
await idempotencyKeys.reset("idempotency-key-options-child", keyString, { scope: "global" });
472+
473+
// Second trigger - should create a NEW run after reset
474+
const result2 = await idempotencyKeyOptionsChild.triggerAndWait(
475+
{ message: "Second trigger (after reset with raw string)" },
476+
{ idempotencyKey: key, idempotencyKeyTTL: "300s" }
477+
);
478+
const secondRunId = result2.ok ? result2.id : null;
479+
logger.log("Test 2: Second trigger (after reset)", { runId: secondRunId });
480+
481+
const wasResetSuccessful = secondRunId !== firstRunId && secondRunId !== null;
482+
483+
testResults.push({
484+
test: "Reset with raw string + scope option (global scope)",
485+
success: wasResetSuccessful,
486+
details: {
487+
firstRunId,
488+
secondRunId,
489+
wasResetSuccessful,
490+
},
491+
});
492+
}
493+
494+
// Test 3: Reset with run scope (uses current run context)
495+
{
496+
const key = await idempotencyKeys.create("reset-test-key-3", { scope: "run" });
497+
logger.log("Test 3: Created run-scoped key", { key: key.toString() });
498+
499+
// First trigger
500+
const result1 = await idempotencyKeyOptionsChild.triggerAndWait(
501+
{ message: "First trigger (run scope)" },
502+
{ idempotencyKey: key, idempotencyKeyTTL: "300s" }
503+
);
504+
const firstRunId = result1.ok ? result1.id : null;
505+
logger.log("Test 3: First trigger", { runId: firstRunId });
506+
507+
// Reset using IdempotencyKey (run scope - should use current run context)
508+
logger.log("Test 3: Resetting idempotency key with run scope");
509+
await idempotencyKeys.reset("idempotency-key-options-child", key);
510+
511+
// Second trigger - should create a NEW run after reset
512+
const result2 = await idempotencyKeyOptionsChild.triggerAndWait(
513+
{ message: "Second trigger (after reset, run scope)" },
514+
{ idempotencyKey: key, idempotencyKeyTTL: "300s" }
515+
);
516+
const secondRunId = result2.ok ? result2.id : null;
517+
logger.log("Test 3: Second trigger (after reset)", { runId: secondRunId });
518+
519+
const wasResetSuccessful = secondRunId !== firstRunId && secondRunId !== null;
520+
521+
testResults.push({
522+
test: "Reset with IdempotencyKey object (run scope)",
523+
success: wasResetSuccessful,
524+
details: {
525+
firstRunId,
526+
secondRunId,
527+
wasResetSuccessful,
528+
parentRunId: ctx.run.id,
529+
},
530+
});
531+
}
532+
533+
// Summary
534+
const allPassed = testResults.every((r) => r.success);
535+
logger.log("Test summary", { allPassed, testResults });
536+
537+
return {
538+
allPassed,
539+
testResults,
540+
};
541+
},
542+
});

0 commit comments

Comments
 (0)