Skip to content

Commit b6b0b67

Browse files
committed
feat(sdk): expose user-provided idempotency key and scope in task context
1 parent 5504e7f commit b6b0b67

File tree

16 files changed

+441
-31
lines changed

16 files changed

+441
-31
lines changed

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,26 @@ import { generatePresignedUrl } from "~/v3/r2.server";
1818
import { tracer } from "~/v3/tracer.server";
1919
import { startSpanWithEnv } from "~/v3/tracing.server";
2020

21+
/**
22+
* Returns the user-provided idempotency key if available (from idempotencyKeyOptions),
23+
* otherwise falls back to the stored idempotency key (which is the hash).
24+
*/
25+
function getUserProvidedIdempotencyKey(run: {
26+
idempotencyKey: string | null;
27+
idempotencyKeyOptions: unknown;
28+
}): string | undefined {
29+
// If we have the user-provided key options, return the original key
30+
const options = run.idempotencyKeyOptions as {
31+
key?: string;
32+
scope?: string;
33+
} | null;
34+
if (options?.key) {
35+
return options.key;
36+
}
37+
// Fallback to the hash (for runs created before this feature)
38+
return run.idempotencyKey ?? undefined;
39+
}
40+
2141
// Build 'select' object
2242
const commonRunSelect = {
2343
id: true,
@@ -38,6 +58,7 @@ const commonRunSelect = {
3858
baseCostInCents: true,
3959
usageDurationMs: true,
4060
idempotencyKey: true,
61+
idempotencyKeyOptions: true,
4162
isTest: true,
4263
depth: true,
4364
scheduleId: true,
@@ -442,7 +463,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
442463
return {
443464
id: run.friendlyId,
444465
taskIdentifier: run.taskIdentifier,
445-
idempotencyKey: run.idempotencyKey ?? undefined,
466+
idempotencyKey: getUserProvidedIdempotencyKey(run),
446467
version: run.lockedToVersion?.version,
447468
status: ApiRetrieveRunPresenter.apiStatusFromRunStatus(run.status, apiVersion),
448469
createdAt: run.createdAt,

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ export class SpanPresenter extends BasePresenter {
229229
isTest: run.isTest,
230230
replayedFromTaskRunFriendlyId: run.replayedFromTaskRunFriendlyId,
231231
environmentId: run.runtimeEnvironment.id,
232-
idempotencyKey: run.idempotencyKey,
232+
idempotencyKey: this.getUserProvidedIdempotencyKey(run),
233233
idempotencyKeyExpiresAt: run.idempotencyKeyExpiresAt,
234234
debounce: run.debounce as { key: string; delay: string; createdAt: Date } | null,
235235
schedule: await this.resolveSchedule(run.scheduleId ?? undefined),
@@ -355,6 +355,7 @@ export class SpanPresenter extends BasePresenter {
355355
//idempotency
356356
idempotencyKey: true,
357357
idempotencyKeyExpiresAt: true,
358+
idempotencyKeyOptions: true,
358359
//debounce
359360
debounce: true,
360361
//delayed
@@ -644,7 +645,7 @@ export class SpanPresenter extends BasePresenter {
644645
createdAt: run.createdAt,
645646
tags: run.runTags,
646647
isTest: run.isTest,
647-
idempotencyKey: run.idempotencyKey ?? undefined,
648+
idempotencyKey: this.getUserProvidedIdempotencyKey(run) ?? undefined,
648649
startedAt: run.startedAt ?? run.createdAt,
649650
durationMs: run.usageDurationMs,
650651
costInCents: run.costInCents,
@@ -704,4 +705,23 @@ export class SpanPresenter extends BasePresenter {
704705

705706
return parsedTraceparent?.traceId;
706707
}
708+
709+
/**
710+
* Returns the user-provided idempotency key if available (from idempotencyKeyOptions),
711+
* otherwise falls back to the stored idempotency key (which is the hash).
712+
*/
713+
getUserProvidedIdempotencyKey(
714+
run: Pick<FindRunResult, "idempotencyKey" | "idempotencyKeyOptions">
715+
): string | null {
716+
// If we have the user-provided key options, return the original key
717+
const options = run.idempotencyKeyOptions as {
718+
key?: string;
719+
scope?: string;
720+
} | null;
721+
if (options?.key) {
722+
return options.key;
723+
}
724+
// Fallback to the hash (for runs created before this feature)
725+
return run.idempotencyKey;
726+
}
707727
}

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ export class RunEngineTriggerTaskService {
304304
environment: environment,
305305
idempotencyKey,
306306
idempotencyKeyExpiresAt: idempotencyKey ? idempotencyKeyExpiresAt : undefined,
307+
idempotencyKeyOptions: body.options?.idempotencyKeyOptions,
307308
taskIdentifier: taskId,
308309
payload: payloadPacket.data ?? "",
309310
payloadType: payloadPacket.dataType,

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,8 @@ export class RunsReplicationService {
891891
run.spanId, // span_id
892892
run.traceId, // trace_id
893893
run.idempotencyKey ?? "", // idempotency_key
894+
this.#extractIdempotencyKeyUser(run), // idempotency_key_user
895+
this.#extractIdempotencyKeyScope(run), // idempotency_key_scope
894896
run.ttl ?? "", // expiration_ttl
895897
run.isTest ?? false, // is_test
896898
_version.toString(), // _version
@@ -951,6 +953,16 @@ export class RunsReplicationService {
951953

952954
return { data: parsedData };
953955
}
956+
957+
#extractIdempotencyKeyUser(run: TaskRun): string {
958+
const options = run.idempotencyKeyOptions as { key?: string; scope?: string } | null;
959+
return options?.key ?? "";
960+
}
961+
962+
#extractIdempotencyKeyScope(run: TaskRun): string {
963+
const options = run.idempotencyKeyOptions as { key?: string; scope?: string } | null;
964+
return options?.scope ?? "";
965+
}
954966
}
955967

956968
export type ConcurrentFlushSchedulerConfig<T> = {
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
-- +goose Up
2+
3+
-- Add columns for storing user-provided idempotency key and scope for searching
4+
ALTER TABLE trigger_dev.task_runs_v2
5+
ADD COLUMN idempotency_key_user String DEFAULT '';
6+
7+
ALTER TABLE trigger_dev.task_runs_v2
8+
ADD COLUMN idempotency_key_scope String DEFAULT '';
9+
10+
-- +goose Down
11+
12+
ALTER TABLE trigger_dev.task_runs_v2
13+
DROP COLUMN idempotency_key_user;
14+
15+
ALTER TABLE trigger_dev.task_runs_v2
16+
DROP COLUMN idempotency_key_scope;

internal-packages/clickhouse/src/taskRuns.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ export const TaskRunV2 = z.object({
4040
span_id: z.string(),
4141
trace_id: z.string(),
4242
idempotency_key: z.string(),
43+
idempotency_key_user: z.string().default(""),
44+
idempotency_key_scope: z.string().default(""),
4345
expiration_ttl: z.string(),
4446
is_test: z.boolean().default(false),
4547
concurrency_key: z.string().default(""),
@@ -91,6 +93,8 @@ export const TASK_RUN_COLUMNS = [
9193
"span_id",
9294
"trace_id",
9395
"idempotency_key",
96+
"idempotency_key_user",
97+
"idempotency_key_scope",
9498
"expiration_ttl",
9599
"is_test",
96100
"_version",
@@ -151,6 +155,8 @@ export type TaskRunFieldTypes = {
151155
span_id: string;
152156
trace_id: string;
153157
idempotency_key: string;
158+
idempotency_key_user: string;
159+
idempotency_key_scope: string;
154160
expiration_ttl: string;
155161
is_test: boolean;
156162
_version: string;
@@ -282,6 +288,8 @@ export type TaskRunInsertArray = [
282288
span_id: string,
283289
trace_id: string,
284290
idempotency_key: string,
291+
idempotency_key_user: string,
292+
idempotency_key_scope: string,
285293
expiration_ttl: string,
286294
is_test: boolean,
287295
_version: string,
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "TaskRun" ADD COLUMN "idempotencyKeyOptions" JSONB;

internal-packages/database/prisma/schema.prisma

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,8 @@ model TaskRun {
589589
590590
idempotencyKey String?
591591
idempotencyKeyExpiresAt DateTime?
592+
/// Stores the user-provided key and scope: { key: string, scope: "run" | "attempt" | "global" }
593+
idempotencyKeyOptions Json?
592594
593595
/// Debounce options: { key: string, delay: string, createdAt: Date }
594596
debounce Json?

internal-packages/run-engine/src/engine/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,7 @@ export class RunEngine {
395395
environment,
396396
idempotencyKey,
397397
idempotencyKeyExpiresAt,
398+
idempotencyKeyOptions,
398399
taskIdentifier,
399400
payload,
400401
payloadType,
@@ -544,6 +545,7 @@ export class RunEngine {
544545
projectId: environment.project.id,
545546
idempotencyKey,
546547
idempotencyKeyExpiresAt,
548+
idempotencyKeyOptions,
547549
taskIdentifier,
548550
payload,
549551
payloadType,

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ export class RunAttemptSystem {
194194
runTags: true,
195195
isTest: true,
196196
idempotencyKey: true,
197+
idempotencyKeyOptions: true,
197198
startedAt: true,
198199
maxAttempts: true,
199200
taskVersion: true,
@@ -261,7 +262,8 @@ export class RunAttemptSystem {
261262
isTest: run.isTest,
262263
createdAt: run.createdAt,
263264
startedAt: run.startedAt ?? run.createdAt,
264-
idempotencyKey: run.idempotencyKey ?? undefined,
265+
idempotencyKey: this.#getUserProvidedIdempotencyKey(run) ?? undefined,
266+
idempotencyKeyScope: this.#getIdempotencyKeyScope(run),
265267
maxAttempts: run.maxAttempts ?? undefined,
266268
version: run.taskVersion ?? "unknown",
267269
maxDuration: run.maxDurationInSeconds ?? undefined,
@@ -422,6 +424,7 @@ export class RunAttemptSystem {
422424
runTags: true,
423425
isTest: true,
424426
idempotencyKey: true,
427+
idempotencyKeyOptions: true,
425428
startedAt: true,
426429
maxAttempts: true,
427430
taskVersion: true,
@@ -570,7 +573,8 @@ export class RunAttemptSystem {
570573
createdAt: updatedRun.createdAt,
571574
tags: updatedRun.runTags,
572575
isTest: updatedRun.isTest,
573-
idempotencyKey: updatedRun.idempotencyKey ?? undefined,
576+
idempotencyKey: this.#getUserProvidedIdempotencyKey(updatedRun) ?? undefined,
577+
idempotencyKeyScope: this.#getIdempotencyKeyScope(updatedRun),
574578
startedAt: updatedRun.startedAt ?? updatedRun.createdAt,
575579
maxAttempts: updatedRun.maxAttempts ?? undefined,
576580
version: updatedRun.taskVersion ?? "unknown",
@@ -1914,6 +1918,25 @@ export class RunAttemptSystem {
19141918
stackTrace: truncateString(error.stackTrace, 1024 * 16), // 16kb
19151919
};
19161920
}
1921+
1922+
#getUserProvidedIdempotencyKey(
1923+
run: { idempotencyKey: string | null; idempotencyKeyOptions: unknown }
1924+
): string | null {
1925+
const options = run.idempotencyKeyOptions as { key?: string; scope?: string } | null;
1926+
// Return user-provided key if available, otherwise fall back to the hash
1927+
return options?.key ?? run.idempotencyKey;
1928+
}
1929+
1930+
#getIdempotencyKeyScope(
1931+
run: { idempotencyKeyOptions: unknown }
1932+
): "run" | "attempt" | "global" | undefined {
1933+
const options = run.idempotencyKeyOptions as { key?: string; scope?: string } | null;
1934+
const scope = options?.scope;
1935+
if (scope === "run" || scope === "attempt" || scope === "global") {
1936+
return scope;
1937+
}
1938+
return undefined;
1939+
}
19171940
}
19181941

19191942
export function safeParseGitMeta(git: unknown): GitMeta | undefined {

0 commit comments

Comments
 (0)