Skip to content

Commit fa1fa02

Browse files
committed
Switch run repository using a feature flag
1 parent 8076a44 commit fa1fa02

File tree

3 files changed

+391
-76
lines changed

3 files changed

+391
-76
lines changed

apps/webapp/app/services/runsRepository/clickhouseRunsRepository.server.ts

Lines changed: 3 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
type IRunsRepository,
66
type ListRunsOptions,
77
type RunsRepositoryOptions,
8+
convertRunListInputOptionsToFilterRunsOptions,
89
} from "./runsRepository.server";
910
import parseDuration from "parse-duration";
1011
import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic";
@@ -17,7 +18,7 @@ export class ClickHouseRunsRepository implements IRunsRepository {
1718
const queryBuilder = this.options.clickhouse.taskRuns.queryBuilder();
1819
applyRunFiltersToQueryBuilder(
1920
queryBuilder,
20-
await this.#convertRunListInputOptionsToFilterRunsOptions(options)
21+
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
2122
);
2223

2324
if (options.page.cursor) {
@@ -144,7 +145,7 @@ export class ClickHouseRunsRepository implements IRunsRepository {
144145
const queryBuilder = this.options.clickhouse.taskRuns.countQueryBuilder();
145146
applyRunFiltersToQueryBuilder(
146147
queryBuilder,
147-
await this.#convertRunListInputOptionsToFilterRunsOptions(options)
148+
await convertRunListInputOptionsToFilterRunsOptions(options, this.options.prisma)
148149
);
149150

150151
const [queryError, result] = await queryBuilder.execute();
@@ -159,73 +160,6 @@ export class ClickHouseRunsRepository implements IRunsRepository {
159160

160161
return result[0].count;
161162
}
162-
163-
async #convertRunListInputOptionsToFilterRunsOptions(
164-
options: RunListInputOptions
165-
): Promise<FilterRunsOptions> {
166-
const convertedOptions: FilterRunsOptions = {
167-
...options,
168-
period: undefined,
169-
};
170-
171-
// Convert time period to ms
172-
const time = timeFilters({
173-
period: options.period,
174-
from: options.from,
175-
to: options.to,
176-
});
177-
convertedOptions.period = time.period ? parseDuration(time.period) ?? undefined : undefined;
178-
179-
// batch friendlyId to id
180-
if (options.batchId && options.batchId.startsWith("batch_")) {
181-
const batch = await this.options.prisma.batchTaskRun.findFirst({
182-
select: {
183-
id: true,
184-
},
185-
where: {
186-
friendlyId: options.batchId,
187-
runtimeEnvironmentId: options.environmentId,
188-
},
189-
});
190-
191-
if (batch) {
192-
convertedOptions.batchId = batch.id;
193-
}
194-
}
195-
196-
// scheduleId can be a friendlyId
197-
if (options.scheduleId && options.scheduleId.startsWith("sched_")) {
198-
const schedule = await this.options.prisma.taskSchedule.findFirst({
199-
select: {
200-
id: true,
201-
},
202-
where: {
203-
friendlyId: options.scheduleId,
204-
projectId: options.projectId,
205-
},
206-
});
207-
208-
if (schedule) {
209-
convertedOptions.scheduleId = schedule?.id;
210-
}
211-
}
212-
213-
if (options.bulkId && options.bulkId.startsWith("bulk_")) {
214-
convertedOptions.bulkId = BulkActionId.toId(options.bulkId);
215-
}
216-
217-
if (options.runId) {
218-
//convert to friendlyId
219-
convertedOptions.runId = options.runId.map((r) => RunId.toFriendlyId(r));
220-
}
221-
222-
// Show all runs if we are filtering by batchId or runId
223-
if (options.batchId || options.runId?.length || options.scheduleId || options.tasks?.length) {
224-
convertedOptions.rootOnly = false;
225-
}
226-
227-
return convertedOptions;
228-
}
229163
}
230164

231165
function applyRunFiltersToQueryBuilder<T>(
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
import { Prisma, type TaskRunStatus } from "@trigger.dev/database";
2+
import parseDuration from "parse-duration";
3+
import { MachinePresetName } from "@trigger.dev/core/v3";
4+
import { BulkActionId, RunId } from "@trigger.dev/core/v3/isomorphic";
5+
import { timeFilters } from "~/components/runs/v3/SharedFilters";
6+
import { sqlDatabaseSchema } from "~/db.server";
7+
import {
8+
type FilterRunsOptions,
9+
type RunListInputOptions,
10+
type IRunsRepository,
11+
type ListRunsOptions,
12+
type RunsRepositoryOptions,
13+
type ListedRun,
14+
convertRunListInputOptionsToFilterRunsOptions,
15+
} from "./runsRepository.server";
16+
17+
export class PostgresRunsRepository implements IRunsRepository {
18+
constructor(private readonly options: RunsRepositoryOptions) {}
19+
20+
async listRunIds(options: ListRunsOptions) {
21+
const filterOptions = await convertRunListInputOptionsToFilterRunsOptions(
22+
options,
23+
this.options.prisma
24+
);
25+
26+
const query = this.#buildRunIdsQuery(filterOptions, options.page);
27+
const runs = await this.options.prisma.$queryRaw<{ id: string }[]>(query);
28+
29+
return runs.map((run) => run.id);
30+
}
31+
32+
async listRuns(options: ListRunsOptions) {
33+
const filterOptions = await convertRunListInputOptionsToFilterRunsOptions(
34+
options,
35+
this.options.prisma
36+
);
37+
38+
const query = this.#buildRunsQuery(filterOptions, options.page);
39+
const runs = await this.options.prisma.$queryRaw<ListedRun[]>(query);
40+
41+
// If there are more runs than the page size, we need to fetch the next page
42+
const hasMore = runs.length > options.page.size;
43+
44+
let nextCursor: string | null = null;
45+
let previousCursor: string | null = null;
46+
47+
// Get cursors for next and previous pages
48+
const direction = options.page.direction ?? "forward";
49+
switch (direction) {
50+
case "forward": {
51+
previousCursor = options.page.cursor ? runs.at(0)?.id ?? null : null;
52+
if (hasMore) {
53+
// The next cursor should be the last run ID from this page
54+
nextCursor = runs[options.page.size - 1]?.id ?? null;
55+
}
56+
break;
57+
}
58+
case "backward": {
59+
const reversedRuns = [...runs].reverse();
60+
if (hasMore) {
61+
previousCursor = reversedRuns.at(1)?.id ?? null;
62+
nextCursor = reversedRuns.at(options.page.size)?.id ?? null;
63+
} else {
64+
nextCursor = reversedRuns.at(options.page.size - 1)?.id ?? null;
65+
}
66+
break;
67+
}
68+
}
69+
70+
const runsToReturn =
71+
options.page.direction === "backward" && hasMore
72+
? runs.slice(1, options.page.size + 1)
73+
: runs.slice(0, options.page.size);
74+
75+
// ClickHouse is slightly delayed, so we're going to do in-memory status filtering too
76+
let filteredRuns = runsToReturn;
77+
if (options.statuses && options.statuses.length > 0) {
78+
filteredRuns = runsToReturn.filter((run) => options.statuses!.includes(run.status));
79+
}
80+
81+
return {
82+
runs: filteredRuns,
83+
pagination: {
84+
nextCursor,
85+
previousCursor,
86+
},
87+
};
88+
}
89+
90+
async countRuns(options: RunListInputOptions) {
91+
const filterOptions = await convertRunListInputOptionsToFilterRunsOptions(
92+
options,
93+
this.options.prisma
94+
);
95+
96+
const query = this.#buildCountQuery(filterOptions);
97+
const result = await this.options.prisma.$queryRaw<{ count: bigint }[]>(query);
98+
99+
if (result.length === 0) {
100+
throw new Error("No count rows returned");
101+
}
102+
103+
return Number(result[0].count);
104+
}
105+
106+
#buildRunIdsQuery(
107+
filterOptions: FilterRunsOptions,
108+
page: { size: number; cursor?: string; direction?: "forward" | "backward" }
109+
) {
110+
const whereConditions = this.#buildWhereConditions(filterOptions, page.cursor, page.direction);
111+
112+
return Prisma.sql`
113+
SELECT tr.id
114+
FROM ${sqlDatabaseSchema}."TaskRun" tr
115+
WHERE ${whereConditions}
116+
ORDER BY ${page.direction === "backward" ? Prisma.sql`tr.id ASC` : Prisma.sql`tr.id DESC`}
117+
LIMIT ${page.size + 1}
118+
`;
119+
}
120+
121+
#buildRunsQuery(
122+
filterOptions: FilterRunsOptions,
123+
page: { size: number; cursor?: string; direction?: "forward" | "backward" }
124+
) {
125+
const whereConditions = this.#buildWhereConditions(filterOptions, page.cursor, page.direction);
126+
127+
return Prisma.sql`
128+
SELECT
129+
tr.id,
130+
tr."friendlyId",
131+
tr."taskIdentifier",
132+
tr."taskVersion",
133+
tr."runtimeEnvironmentId",
134+
tr.status,
135+
tr."createdAt",
136+
tr."startedAt",
137+
tr."lockedAt",
138+
tr."delayUntil",
139+
tr."updatedAt",
140+
tr."completedAt",
141+
tr."isTest",
142+
tr."spanId",
143+
tr."idempotencyKey",
144+
tr."ttl",
145+
tr."expiredAt",
146+
tr."costInCents",
147+
tr."baseCostInCents",
148+
tr."usageDurationMs",
149+
tr."runTags",
150+
tr."depth",
151+
tr."rootTaskRunId",
152+
tr."batchId",
153+
tr."metadata",
154+
tr."metadataType",
155+
tr."machinePreset",
156+
tr."queue"
157+
FROM ${sqlDatabaseSchema}."TaskRun" tr
158+
WHERE ${whereConditions}
159+
ORDER BY ${page.direction === "backward" ? Prisma.sql`tr.id ASC` : Prisma.sql`tr.id DESC`}
160+
LIMIT ${page.size + 1}
161+
`;
162+
}
163+
164+
#buildCountQuery(filterOptions: FilterRunsOptions) {
165+
const whereConditions = this.#buildWhereConditions(filterOptions);
166+
167+
return Prisma.sql`
168+
SELECT COUNT(*) as count
169+
FROM ${sqlDatabaseSchema}."TaskRun" tr
170+
WHERE ${whereConditions}
171+
`;
172+
}
173+
174+
#buildWhereConditions(
175+
filterOptions: FilterRunsOptions,
176+
cursor?: string,
177+
direction?: "forward" | "backward"
178+
) {
179+
const conditions: Prisma.Sql[] = [];
180+
181+
// Environment filter
182+
conditions.push(Prisma.sql`tr."runtimeEnvironmentId" = ${filterOptions.environmentId}`);
183+
184+
// Cursor pagination
185+
if (cursor) {
186+
if (direction === "forward" || !direction) {
187+
conditions.push(Prisma.sql`tr.id < ${cursor}`);
188+
} else {
189+
conditions.push(Prisma.sql`tr.id > ${cursor}`);
190+
}
191+
}
192+
193+
// Task filters
194+
if (filterOptions.tasks && filterOptions.tasks.length > 0) {
195+
conditions.push(Prisma.sql`tr."taskIdentifier" IN (${Prisma.join(filterOptions.tasks)})`);
196+
}
197+
198+
// Version filters
199+
if (filterOptions.versions && filterOptions.versions.length > 0) {
200+
conditions.push(Prisma.sql`tr."taskVersion" IN (${Prisma.join(filterOptions.versions)})`);
201+
}
202+
203+
// Status filters
204+
if (filterOptions.statuses && filterOptions.statuses.length > 0) {
205+
conditions.push(
206+
Prisma.sql`tr.status = ANY(ARRAY[${Prisma.join(
207+
filterOptions.statuses
208+
)}]::"TaskRunStatus"[])`
209+
);
210+
}
211+
212+
// Tag filters
213+
if (filterOptions.tags && filterOptions.tags.length > 0) {
214+
conditions.push(
215+
Prisma.sql`tr."runTags" && ARRAY[${Prisma.join(filterOptions.tags)}]::text[]`
216+
);
217+
}
218+
219+
// Schedule filter
220+
if (filterOptions.scheduleId) {
221+
conditions.push(Prisma.sql`tr."scheduleId" = ${filterOptions.scheduleId}`);
222+
}
223+
224+
// Time period filter
225+
if (filterOptions.period) {
226+
conditions.push(
227+
Prisma.sql`tr."createdAt" >= NOW() - INTERVAL '1 millisecond' * ${filterOptions.period}`
228+
);
229+
}
230+
231+
// From date filter
232+
if (filterOptions.from) {
233+
conditions.push(
234+
Prisma.sql`tr."createdAt" >= ${new Date(filterOptions.from).toISOString()}::timestamp`
235+
);
236+
}
237+
238+
// To date filter
239+
if (filterOptions.to) {
240+
const toDate = new Date(filterOptions.to);
241+
const now = new Date();
242+
const clampedDate = toDate > now ? now : toDate;
243+
conditions.push(Prisma.sql`tr."createdAt" <= ${clampedDate.toISOString()}::timestamp`);
244+
}
245+
246+
// Test filter
247+
if (typeof filterOptions.isTest === "boolean") {
248+
conditions.push(Prisma.sql`tr."isTest" = ${filterOptions.isTest}`);
249+
}
250+
251+
// Root only filter
252+
if (filterOptions.rootOnly) {
253+
conditions.push(Prisma.sql`tr."rootTaskRunId" IS NULL`);
254+
}
255+
256+
// Batch filter
257+
if (filterOptions.batchId) {
258+
conditions.push(Prisma.sql`tr."batchId" = ${filterOptions.batchId}`);
259+
}
260+
261+
// Bulk action filter
262+
if (filterOptions.bulkId) {
263+
conditions.push(
264+
Prisma.sql`tr."bulkActionGroupIds" && ARRAY[${filterOptions.bulkId}]::text[]`
265+
);
266+
}
267+
268+
// Run ID filter
269+
if (filterOptions.runId && filterOptions.runId.length > 0) {
270+
const friendlyIds = filterOptions.runId.map((runId) => RunId.toFriendlyId(runId));
271+
conditions.push(Prisma.sql`tr."friendlyId" IN (${Prisma.join(friendlyIds)})`);
272+
}
273+
274+
// Queue filter
275+
if (filterOptions.queues && filterOptions.queues.length > 0) {
276+
conditions.push(Prisma.sql`tr."queue" IN (${Prisma.join(filterOptions.queues)})`);
277+
}
278+
279+
// Machine preset filter
280+
if (filterOptions.machines && filterOptions.machines.length > 0) {
281+
conditions.push(Prisma.sql`tr."machinePreset" IN (${Prisma.join(filterOptions.machines)})`);
282+
}
283+
284+
// Combine all conditions with AND
285+
return conditions.reduce((acc, condition) =>
286+
acc === null ? condition : Prisma.sql`${acc} AND ${condition}`
287+
);
288+
}
289+
}

0 commit comments

Comments
 (0)