
Commit a178c8d

also use the readOnlyPrisma client when reading in the retrying logic
1 parent 0cb23c8 commit a178c8d

2 files changed: +301 -1 lines

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 1 addition & 1 deletion
@@ -899,7 +899,7 @@ export class RunAttemptSystem {
 
       const failedAt = new Date();
 
-      const retryResult = await retryOutcomeFromCompletion(prisma, {
+      const retryResult = await retryOutcomeFromCompletion(this.$.readOnlyPrisma, {
         runId,
         error: completion.error,
         retryUsingQueue: forceRequeue ?? false,
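
For context, routing this read to `this.$.readOnlyPrisma` follows the usual primary/replica split: a second Prisma client pointed at a read replica serves reads that can tolerate replication lag, while the primary client keeps handling writes. The sketch below only illustrates that pattern and is not the actual trigger.dev wiring; the env var names (DATABASE_URL, REPLICA_DATABASE_URL) and the "db" datasource name are assumptions.

// Illustrative sketch of a primary/replica Prisma split -- not the actual
// trigger.dev wiring. DATABASE_URL, REPLICA_DATABASE_URL, and the "db"
// datasource name are assumptions for this example.
import { PrismaClient } from "@prisma/client";

// Writes and strongly consistent reads go to the primary.
export const prisma = new PrismaClient({
  datasources: { db: { url: process.env.DATABASE_URL! } },
});

// Reads that can tolerate replica lag go to the replica when one is
// configured; otherwise fall back to the primary client.
export const readOnlyPrisma = process.env.REPLICA_DATABASE_URL
  ? new PrismaClient({
      datasources: { db: { url: process.env.REPLICA_DATABASE_URL } },
    })
  : prisma;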
scripts/clean-orphaned-concurrency.ts

Lines changed: 300 additions & 0 deletions
@@ -0,0 +1,300 @@
#!/usr/bin/env npx tsx

/**
 * Script to clean orphaned concurrency entries from Redis.
 *
 * These are run IDs that exist in currentConcurrency/currentDequeued sets
 * but the corresponding TaskRun is not in DEQUEUED or EXECUTING status.
 *
 * Usage:
 *   npx tsx scripts/clean-orphaned-concurrency.ts \
 *     --org <orgId> \
 *     --project <projectId> \
 *     --env <envId> \
 *     --read-redis <redisUrl> \
 *     --write-redis <redisUrl> \
 *     --pg <postgresUrl> \
 *     [--dry-run] \
 *     [--include-env] \
 *     [--queues queue1,queue2]
 *
 * Options:
 *   --include-env: Also clean the environment-level concurrency sets
 *                  (not just queue-level sets)
 */

import { parseArgs } from "node:util";
import Redis from "ioredis";
import pg from "pg";

// Statuses where a run SHOULD be in the concurrency set
// WAITING_TO_RESUME is included because suspended runs may legitimately be in the set
const VALID_CONCURRENCY_STATUSES = ["DEQUEUED", "EXECUTING", "WAITING_TO_RESUME"];

interface OrphanedEntry {
  key: string;
  runId: string;
  dbStatus: string | null;
  keyType: "currentConcurrency" | "currentDequeued";
  queueName: string;
}

async function main() {
  const { values } = parseArgs({
    options: {
      org: { type: "string" },
      project: { type: "string" },
      env: { type: "string" },
      "read-redis": { type: "string" },
      "write-redis": { type: "string" },
      pg: { type: "string" },
      "dry-run": { type: "boolean", default: false },
      "include-env": { type: "boolean", default: false }, // Also clean env-level concurrency sets
      queues: { type: "string" }, // Comma-separated queue names to skip scan
    },
    allowPositionals: true, // Ignore extra positional args from shell escaping
  });

  const orgId = values.org;
  const projectId = values.project;
  const envId = values.env;
  const readRedisUrl = values["read-redis"];
  const writeRedisUrl = values["write-redis"];
  const pgUrl = values.pg;
  const dryRun = values["dry-run"] ?? false;
  const includeEnv = values["include-env"] ?? false;
  const queueNames = values.queues?.split(",").map((q) => q.trim()) ?? [];

  if (!orgId || !projectId || !envId || !readRedisUrl || !writeRedisUrl || !pgUrl) {
    console.error("Missing required arguments");
    console.error(
      "Usage: npx tsx scripts/clean-orphaned-concurrency.ts --org <orgId> --project <projectId> --env <envId> --read-redis <url> --write-redis <url> --pg <url> [--dry-run] [--queues queue1,queue2]"
    );
    process.exit(1);
  }

  console.log(`Mode: ${dryRun ? "DRY RUN" : "LIVE"}`);
  console.log(`Org: ${orgId}`);
  console.log(`Project: ${projectId}`);
  console.log(`Environment: ${envId}`);
  console.log("");

  // Connect to Redis (read)
  const readRedis = new Redis(readRedisUrl, {
    lazyConnect: true,
    tls: { rejectUnauthorized: false },
  });
  await readRedis.connect();
  console.log("Connected to read Redis");

  // Connect to Redis (write) - only if not dry run
  let writeRedis: Redis | null = null;
  if (!dryRun) {
    writeRedis = new Redis(writeRedisUrl, {
      lazyConnect: true,
      tls: { rejectUnauthorized: false },
    });
    await writeRedis.connect();
    console.log("Connected to write Redis");
  }

  // Connect to PostgreSQL
  const pgClient = new pg.Client({ connectionString: pgUrl });
  await pgClient.connect();
  console.log("Connected to PostgreSQL");
  console.log("");

  try {
    let keys: string[] = [];
    const envKeyPrefix = `engine:runqueue:{org:${orgId}}:proj:${projectId}:env:${envId}`;
    const queueKeyPrefix = `${envKeyPrefix}:queue:`;

    if (queueNames.length > 0) {
      // Construct keys directly from provided queue names
      console.log(`Using provided queue names: ${queueNames.join(", ")}`);
      for (const queueName of queueNames) {
        keys.push(`${queueKeyPrefix}${queueName}:currentConcurrency`);
        keys.push(`${queueKeyPrefix}${queueName}:currentDequeued`);
      }
      console.log(`Constructed ${keys.length} queue-level keys to check`);
    }

    if (includeEnv) {
      // Add environment-level concurrency keys
      keys.push(`${envKeyPrefix}:currentConcurrency`);
      keys.push(`${envKeyPrefix}:currentDequeued`);
      console.log(`Also checking environment-level concurrency keys`);
    }

    if (keys.length === 0) {
      console.error("ERROR: --queues flag or --include-env flag is required.");
      console.error("");
      console.error("To find queue names, run:");
      console.error(
        `  redis-cli --tls -u "<read-redis-url>" KEYS 'engine:runqueue:{org:${orgId}}:proj:${projectId}:env:${envId}:queue:*:currentConcurrency'`
      );
      console.error("");
      console.error("Then extract the queue names and pass them with --queues 'queue1,queue2,...'");
      console.error("");
      console.error("Or use --include-env to just clean the environment-level concurrency sets.");
      process.exit(1);
    }
    console.log("");

    // Filter to only currentConcurrency and currentDequeued keys
    const concurrencyKeys = keys.filter(
      (k) => k.endsWith(":currentConcurrency") || k.endsWith(":currentDequeued")
    );

    console.log(`Processing ${concurrencyKeys.length} concurrency/dequeued keys`);
    console.log("");

    // Collect all run IDs from all keys
    const runIdToKeys = new Map<string, string[]>();

    for (const key of concurrencyKeys) {
      const members = await readRedis.smembers(key);
      for (const runId of members) {
        const existing = runIdToKeys.get(runId) || [];
        existing.push(key);
        runIdToKeys.set(runId, existing);
      }
    }

    const allRunIds = Array.from(runIdToKeys.keys());
    console.log(`Found ${allRunIds.length} unique run IDs across all concurrency sets`);
    console.log("");

    if (allRunIds.length === 0) {
      console.log("No run IDs found in concurrency sets. Nothing to clean.");
      return;
    }

    // Query database for run statuses in batches
    const batchSize = 500;
    const runStatuses = new Map<string, string | null>();

    for (let i = 0; i < allRunIds.length; i += batchSize) {
      const batch = allRunIds.slice(i, i + batchSize);
      const placeholders = batch.map((_, idx) => `$${idx + 1}`).join(", ");
      const query = `SELECT id, status FROM "TaskRun" WHERE id IN (${placeholders})`;

      const result = await pgClient.query(query, batch);

      for (const row of result.rows) {
        runStatuses.set(row.id, row.status);
      }

      // Mark missing runs as null
      for (const runId of batch) {
        if (!runStatuses.has(runId)) {
          runStatuses.set(runId, null);
        }
      }
    }

    console.log(`Retrieved statuses for ${runStatuses.size} runs`);
    console.log("");

    // Find orphaned entries
    const orphanedEntries: OrphanedEntry[] = [];

    for (const [runId, keys] of runIdToKeys) {
      const status = runStatuses.get(runId);

      if (status === null || !VALID_CONCURRENCY_STATUSES.includes(status)) {
        for (const key of keys) {
          const keyType = key.endsWith(":currentConcurrency")
            ? "currentConcurrency"
            : "currentDequeued";
          const queueMatch = key.match(/:queue:([^:]+):/);
          // Use "[ENV]" for environment-level keys (no :queue: segment)
          const queueName = queueMatch ? queueMatch[1] : "[ENV]";

          orphanedEntries.push({
            key,
            runId,
            dbStatus: status,
            keyType,
            queueName,
          });
        }
      }
    }

    if (orphanedEntries.length === 0) {
      console.log("No orphaned entries found. All concurrency sets are clean.");
      return;
    }

    // Group by queue for reporting
    const byQueue = new Map<string, OrphanedEntry[]>();
    for (const entry of orphanedEntries) {
      const existing = byQueue.get(entry.queueName) || [];
      existing.push(entry);
      byQueue.set(entry.queueName, existing);
    }

    console.log(`Found ${orphanedEntries.length} orphaned entries across ${byQueue.size} queues:`);
    console.log("");

    for (const [queueName, entries] of byQueue) {
      console.log(`Queue: ${queueName}`);

      const concurrencyEntries = entries.filter((e) => e.keyType === "currentConcurrency");
      const dequeuedEntries = entries.filter((e) => e.keyType === "currentDequeued");

      if (concurrencyEntries.length > 0) {
        console.log(`  currentConcurrency (${concurrencyEntries.length}):`);
        for (const entry of concurrencyEntries) {
          console.log(`    - ${entry.runId} (DB status: ${entry.dbStatus ?? "NOT FOUND"})`);
        }
      }

      if (dequeuedEntries.length > 0) {
        console.log(`  currentDequeued (${dequeuedEntries.length}):`);
        for (const entry of dequeuedEntries) {
          console.log(`    - ${entry.runId} (DB status: ${entry.dbStatus ?? "NOT FOUND"})`);
        }
      }

      console.log("");
    }

    if (dryRun) {
      console.log("DRY RUN: No changes made. Run without --dry-run to clean these entries.");
      return;
    }

    // Clean orphaned entries
    console.log("Cleaning orphaned entries...");
    console.log("");

    // Group by key for efficient SREM
    const removalsByKey = new Map<string, string[]>();
    for (const entry of orphanedEntries) {
      const existing = removalsByKey.get(entry.key) || [];
      existing.push(entry.runId);
      removalsByKey.set(entry.key, existing);
    }

    let totalRemoved = 0;
    for (const [key, runIds] of removalsByKey) {
      const removed = await writeRedis!.srem(key, runIds);
      console.log(`Removed ${removed} entries from ${key}`);
      totalRemoved += removed;
    }

    console.log("");
    console.log(`Total removed: ${totalRemoved} entries`);
  } finally {
    await readRedis.quit();
    if (writeRedis) {
      await writeRedis.quit();
    }
    await pgClient.end();
  }
}

main().catch((err) => {
  console.error("Fatal error:", err);
  process.exit(1);
});
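
A reasonable workflow with this script is to run it with --dry-run first, review the orphaned entries it reports per queue, and only then rerun the same command without the flag so it actually issues the SREM calls. All IDs and URLs below are placeholders:

npx tsx scripts/clean-orphaned-concurrency.ts \
  --org <orgId> --project <projectId> --env <envId> \
  --read-redis <readRedisUrl> --write-redis <writeRedisUrl> \
  --pg <postgresUrl> \
  --queues "queue-a,queue-b" \
  --dry-run
# After reviewing the dry-run output, rerun without --dry-run to remove the reported entries.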
