6 changes: 6 additions & 0 deletions .changeset/uncaught-exception-fail-attempt.md
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Fail attempts on uncaught exceptions instead of hanging to `MAX_DURATION_EXCEEDED`. A Node `EventEmitter` (e.g. `node-redis`) emitting `"error"` with no `.on("error", ...)` listener escalates to `uncaughtException`, which the worker previously reported but did not act on — runs drifted to maxDuration with empty attempts. They now fail fast with the original error and status `FAILED`, and respect the task's normal retry policy. You should still attach `.on("error", ...)` listeners to long-lived clients to handle errors gracefully.
12 changes: 12 additions & 0 deletions .server-changes/uncaught-exception-status-mapping.md
@@ -0,0 +1,12 @@
---
area: run-engine
type: fix
---

Map the new `TASK_RUN_UNCAUGHT_EXCEPTION` internal-error code to
`COMPLETED_WITH_ERRORS` (Failed) status in `runStatusFromError`. cli-v3
now emits this code when the worker process surfaces an uncaught
exception (e.g. a Node EventEmitter emitting `"error"` with no listener),
so the run renders as a regular task failure in the dashboard rather
than a system failure, while still routing through the engine's
`lockedRetryConfig` lookup so the user's retry policy is honoured.
49 changes: 49 additions & 0 deletions docs/troubleshooting.mdx
@@ -278,6 +278,55 @@ You could also offload the CPU-heavy work to a Node.js worker thread, but this i

If the above doesn't work, then we recommend you try increasing the machine size of your task. See our [machines guide](/machines) for more information.

### Uncaught exceptions

If you see a `TASK_RUN_UNCAUGHT_EXCEPTION` error, an exception was thrown outside the promise chain that your task's `run()` function awaits, so it never rejected `run()` itself. The runtime caught it through Node's `process.on("uncaughtException")` handler instead. The dashboard surfaces this as a regular task failure (status `Failed`) and the run retries according to your task's retry policy, but the exception still indicates a bug worth fixing.

The most common cause is a Node `EventEmitter` emitting an `"error"` event with no listener attached. When this happens, Node escalates the event into an `uncaughtException`. Long-lived clients like `node-redis`, `pg`, `kafkajs`, and `mongodb` all surface socket-level errors this way.
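The escalation described above can be reproduced with a bare `EventEmitter`. The following is a minimal sketch, not code from this PR; `emitError`, `bare`, and `listened` are illustrative names. It relies only on Node's documented behaviour that `emit("error", err)` throws `err` when no `"error"` listener is registered.

```typescript
import { EventEmitter } from "node:events";

// Returns whatever emit("error") throws, or undefined if a listener handled it.
function emitError(emitter: EventEmitter, err: Error): Error | undefined {
  try {
    emitter.emit("error", err);
    return undefined;
  } catch (thrown) {
    return thrown as Error;
  }
}

const bare = new EventEmitter();
const listened = new EventEmitter();
const seen: Error[] = [];
listened.on("error", (err: Error) => {
  seen.push(err);
});

// No listener: emit() throws the error itself.
const thrownWithoutListener = emitError(bare, new Error("read ECONNRESET"));

// Listener attached: the error is delivered to it and nothing is thrown.
const thrownWithListener = emitError(listened, new Error("read ECONNRESET"));
```

In a real client the `emit` call happens inside a socket callback rather than inside a `try` block you control, so the thrown error has no caller to catch it and reaches `uncaughtException`.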

For example, a `node-redis` client with no error listener will fail your run with an `Error: read ECONNRESET` (or similar TCP error) the next time the socket is reset:

```ts
import { task } from "@trigger.dev/sdk";
import { createClient } from "redis";

export const myTask = task({
  id: "my-task",
  run: async () => {
    const client = createClient({ url: process.env.REDIS_URL });

    // BAD: no .on("error", ...) listener, so a socket reset will crash the run
    // with an uncaught exception, even if the next .get() would have worked.
    await client.connect();
    return await client.get("foo");
  },
});
```

Fix it by attaching an `error` listener so the event has somewhere to go:

```ts
// Inside run(). `logger` is imported from "@trigger.dev/sdk".
const client = createClient({ url: process.env.REDIS_URL });

// GOOD: the listener catches socket-level errors. The awaited command
// (e.g. .get) will still reject if the connection is broken, and that
// rejection propagates through your run() and fails the attempt cleanly.
client.on("error", (err) => {
  logger.warn("Redis client error", { err });
});

await client.connect();
return await client.get("foo");
```

The same fix applies to any library that emits `"error"` events. As a rule, attach an `.on("error", ...)` listener to every long-lived client you create inside a task.
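One way to make that rule hard to forget is to wrap client construction in a small helper. This is a sketch under stated assumptions: `withErrorListener` and `FakeClient` are hypothetical names, not part of the Trigger.dev SDK or any client library, and the helper assumes the client extends Node's `EventEmitter`.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical helper (not part of any SDK): attach an "error" listener
// and hand the same emitter back so it can wrap a constructor call.
function withErrorListener<T extends EventEmitter>(
  emitter: T,
  onError: (err: unknown) => void
): T {
  emitter.on("error", onError);
  return emitter;
}

// Stand-in for a long-lived client, used here so the sketch runs
// without a real Redis or Postgres connection.
class FakeClient extends EventEmitter {
  simulateSocketReset(): void {
    this.emit("error", new Error("read ECONNRESET"));
  }
}

const clientErrors: unknown[] = [];
const client = withErrorListener(new FakeClient(), (err) => {
  clientErrors.push(err);
});

// With the listener attached, the emit no longer throws.
client.simulateSocketReset();
```

In a task you would pass a callback that logs the error, as in the `logger.warn` call above, rather than collecting errors in an array.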

<Note>

Unhandled promise rejections (e.g. `Promise.reject(...)` with no `.catch`) take the same path: by default Node (v15 and later) raises them as uncaught exceptions, and the runtime reports them as `TASK_RUN_UNCAUGHT_EXCEPTION` for the same reasons. Make sure every promise is either `await`ed or has a `.catch(...)` handler.

</Note>
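
For work that is intentionally not awaited, the rejection still needs somewhere to go. The sketch below shows one possible shape; `runInBackground` is a hypothetical helper, not an SDK function.

```typescript
// Hypothetical helper: run background work without leaving a floating promise.
function runInBackground(
  work: () => Promise<unknown>,
  onError: (err: unknown) => void
): Promise<void> {
  // Starting from Promise.resolve() also captures a synchronous throw from work().
  return Promise.resolve()
    .then(work)
    .then(() => undefined, onError);
}

const backgroundErrors: unknown[] = [];

// BAD (left commented out): nothing awaits or catches this rejection.
// void Promise.reject(new Error("boom"));

// GOOD: the rejection is routed to a handler instead of the process.
const settled = runInBackground(
  async () => {
    throw new Error("boom");
  },
  (err) => {
    backgroundErrors.push(err);
  }
);
```

The returned promise always resolves, so awaiting `settled` at the end of `run()` is safe and keeps the background work inside the attempt.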

### Realtime stream error (`sendBatchNonBlocking` / `S2AppendSession`)

Errors mentioning `sendBatchNonBlocking`, `@s2-dev/streamstore`, or `S2AppendSession` (often with `code: undefined`) can occur when you close a stream and then await `waitUntilComplete()`, or when a stream runs for a long time (e.g. 20+ minutes). Wrap `waitUntilComplete()` in try/catch so Transport/closed-stream errors don't fail your task:
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/errors.ts
@@ -19,6 +19,7 @@ export function runStatusFromError(
case "TASK_INPUT_ERROR":
case "TASK_OUTPUT_ERROR":
case "TASK_MIDDLEWARE_ERROR":
case "TASK_RUN_UNCAUGHT_EXCEPTION":
return "COMPLETED_WITH_ERRORS";
case "TASK_RUN_CANCELLED":
return "CANCELED";
36 changes: 35 additions & 1 deletion packages/cli-v3/src/executions/taskRunProcess.test.ts
@@ -1,6 +1,6 @@
import { TaskRunProcess, type TaskRunProcessOptions } from "./taskRunProcess.js";
import { describe, it, expect, vi } from "vitest";
-import { UnexpectedExitError } from "@trigger.dev/core/v3/errors";
+import { UncaughtExceptionError, UnexpectedExitError } from "@trigger.dev/core/v3/errors";
import type {
TaskRunExecution,
TaskRunExecutionPayload,
@@ -118,4 +118,38 @@ describe("TaskRunProcess", () => {
}
});
});

  describe("parseExecuteError(UncaughtExceptionError)", () => {
    it("returns INTERNAL_ERROR with TASK_RUN_UNCAUGHT_EXCEPTION + original message and stack", () => {
      const error = new UncaughtExceptionError(
        {
          name: "Error",
          message: "read ECONNRESET",
          stack:
            "Error: read ECONNRESET\n at TCP.onStreamRead (node:internal/stream_base_commons:216:20)",
        },
        "uncaughtException"
      );

      const result = TaskRunProcess.parseExecuteError(error);

      expect(result.type).toBe("INTERNAL_ERROR");
      expect(result.code).toBe("TASK_RUN_UNCAUGHT_EXCEPTION");
      expect(result.message).toBe("read ECONNRESET");
      expect(result.stackTrace).toContain("TCP.onStreamRead");
    });

    it("uses the same code for unhandledRejection origin", () => {
      const error = new UncaughtExceptionError(
        { name: "TypeError", message: "boom" },
        "unhandledRejection"
      );

      const result = TaskRunProcess.parseExecuteError(error);

      expect(result.type).toBe("INTERNAL_ERROR");
      expect(result.code).toBe("TASK_RUN_UNCAUGHT_EXCEPTION");
      expect(result.message).toBe("boom");
    });
  });
});
45 changes: 45 additions & 0 deletions packages/cli-v3/src/executions/taskRunProcess.ts
@@ -33,6 +33,7 @@ import {
MaxDurationExceededError,
UnexpectedExitError,
SuspendedProcessError,
UncaughtExceptionError,
} from "@trigger.dev/core/v3/errors";

export type OnSendDebugLogMessage = InferSocketMessageSchema<
@@ -205,6 +206,18 @@ export class TaskRunProcess {
  },
  UNCAUGHT_EXCEPTION: async (message) => {
    logger.debug("uncaught exception in task run process", { ...message });

    // The worker process reports uncaught exceptions and unhandled rejections via this
    // event, but does not exit on its own. If we don't terminate the attempt here, run()
    // hangs (the awaited promise that triggered the throw is orphaned) until maxDuration
    // expires, surfacing as TIMED_OUT/MAX_DURATION_EXCEEDED with empty attempts. Reject
    // any pending attempts now and gracefully terminate the worker so OTEL gets a flush
    // window before SIGKILL.
    this.#rejectPendingAttempts(
      new UncaughtExceptionError(message.error, message.origin)
    );

    await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs);
  },
  SEND_DEBUG_LOG: async (message) => {
    this.onSendDebugLog.post(message);
@@ -339,6 +352,23 @@ export class TaskRunProcess {
    logger.debug("child process error", { error, pid: this.pid });
  }

  #rejectPendingAttempts(error: Error) {
    for (const [id, status] of this._attemptStatuses.entries()) {
      if (status !== "PENDING") {
        continue;
      }

      this._attemptStatuses.set(id, "REJECTED");

      const attemptPromise = this._attemptPromises.get(id);
      if (!attemptPromise) {
        continue;
      }

      attemptPromise.rejecter(error);
    }
  }
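
The reject-pending pattern in this method can be illustrated in isolation. The sketch below is a simplified stand-in, not the `TaskRunProcess` implementation: `AttemptTracker`, `createDeferred`, and the `"RESOLVED"` status are assumptions made for the example, and only the `"PENDING"` to `"REJECTED"` transition mirrors the diff.

```typescript
type AttemptStatus = "PENDING" | "REJECTED" | "RESOLVED";

interface Deferred<T> {
  promise: Promise<T>;
  resolver: (value: T) => void;
  rejecter: (error: Error) => void;
}

// Builds a promise whose resolve/reject functions are exposed to the caller.
function createDeferred<T>(): Deferred<T> {
  let resolver!: (value: T) => void;
  let rejecter!: (error: Error) => void;
  const promise = new Promise<T>((resolve, reject) => {
    resolver = resolve;
    rejecter = reject;
  });
  return { promise, resolver, rejecter };
}

class AttemptTracker<T> {
  private statuses = new Map<string, AttemptStatus>();
  private deferreds = new Map<string, Deferred<T>>();

  start(id: string): Promise<T> {
    const deferred = createDeferred<T>();
    this.statuses.set(id, "PENDING");
    this.deferreds.set(id, deferred);
    return deferred.promise;
  }

  resolve(id: string, value: T): void {
    if (this.statuses.get(id) !== "PENDING") return;
    this.statuses.set(id, "RESOLVED");
    this.deferreds.get(id)?.resolver(value);
  }

  // Rejects every attempt that is still pending; settled attempts are untouched.
  rejectPending(error: Error): void {
    this.statuses.forEach((status, id) => {
      if (status !== "PENDING") return;
      this.statuses.set(id, "REJECTED");
      this.deferreds.get(id)?.rejecter(error);
    });
  }

  statusOf(id: string): AttemptStatus | undefined {
    return this.statuses.get(id);
  }
}
```

Marking the status before calling `rejecter` means a later exit handler that walks the same map would skip attempts that were already rejected, which appears to be the reason the diff sets `"REJECTED"` first.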

  async #handleExit(code: number | null, signal: NodeJS.Signals | null) {
    logger.debug("handling child exit", { code, signal, pid: this.pid });

@@ -559,6 +589,21 @@ export class TaskRunProcess {
    };
  }

  if (error instanceof UncaughtExceptionError) {
    // Dedicated INTERNAL_ERROR code so the engine handles retry via the
    // existing crash-style lookup of run.lockedRetryConfig (same pathway as
    // TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE etc.) and so the dashboard
    // renders this as "Failed" rather than "System failure". The exception
    // was raised by user code (or a dependency the user controls, e.g. an
    // EventEmitter "error" event with no listener), not a platform fault.
    return {
      type: "INTERNAL_ERROR",
      code: TaskRunErrorCodes.TASK_RUN_UNCAUGHT_EXCEPTION,
      message: error.originalError.message,
      stackTrace: error.originalError.stack,
    };
  }

  return {
    type: "INTERNAL_ERROR",
    code: TaskRunErrorCodes.TASK_EXECUTION_FAILED,
14 changes: 14 additions & 0 deletions packages/core/src/v3/errors.ts
@@ -395,6 +395,7 @@ export function shouldRetryError(error: TaskRunError): boolean {
case "TASK_EXECUTION_ABORTED":
case "TASK_EXECUTION_FAILED":
case "TASK_RUN_CRASHED":
case "TASK_RUN_UNCAUGHT_EXCEPTION":
case "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE":
case "TASK_PROCESS_SIGTERM":
return true;
@@ -425,6 +426,7 @@ export function shouldLookupRetrySettings(error: TaskRunError): boolean {
case "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE":
case "TASK_PROCESS_SIGTERM":
case "TASK_PROCESS_SIGSEGV":
case "TASK_RUN_UNCAUGHT_EXCEPTION":
return true;

default:
@@ -722,6 +724,18 @@ const prettyInternalErrors: Partial<
href: links.docs.troubleshooting.stalledExecution,
},
},
// Link only — we deliberately do NOT set `message`, so the original
// error message (e.g. "read ECONNRESET") is preserved in the dashboard.
// Common cause: an EventEmitter (node-redis, pg, etc.) emitted "error"
// with no listener attached, which Node escalates to uncaughtException.
// The docs page explains how to attach .on("error") listeners and how
// unhandled rejections route through the same path.
TASK_RUN_UNCAUGHT_EXCEPTION: {
link: {
name: "Read our troubleshooting guide",
href: links.docs.troubleshooting.uncaughtException,
},
},
};
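
The "link only" choice in the comment above can be sketched as a merge in which an override without a `message` leaves the original message intact. This is an illustration under assumptions, not the body of `getPrettyTaskRunError`: `InternalErrorLike`, `PrettyOverride`, and `applyPrettyOverride` are hypothetical names.

```typescript
interface ErrorLink {
  name: string;
  href: string;
}

// Simplified stand-ins for the real error and override types.
interface InternalErrorLike {
  code: string;
  message?: string;
  link?: ErrorLink;
}

interface PrettyOverride {
  message?: string;
  link?: ErrorLink;
}

// Applies an override field by field; fields the override omits keep their original value.
function applyPrettyOverride(
  error: InternalErrorLike,
  override?: PrettyOverride
): InternalErrorLike {
  if (!override) return error;
  return {
    ...error,
    message: override.message ?? error.message,
    link: override.link ?? error.link,
  };
}

const decorated = applyPrettyOverride(
  { code: "TASK_RUN_UNCAUGHT_EXCEPTION", message: "read ECONNRESET" },
  {
    link: {
      name: "Read our troubleshooting guide",
      href: "https://trigger.dev/docs/troubleshooting#uncaught-exceptions",
    },
  }
);
```

Here `decorated.message` is still `"read ECONNRESET"`, while the troubleshooting link is attached alongside it.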

const getPrettyTaskRunError = (code: TaskRunInternalError["code"]): TaskRunInternalError => {
1 change: 1 addition & 0 deletions packages/core/src/v3/links.ts
@@ -15,6 +15,7 @@ export const links = {
troubleshooting: {
concurrentWaits: "https://trigger.dev/docs/troubleshooting#parallel-waits-are-not-supported",
stalledExecution: "https://trigger.dev/docs/troubleshooting#task-run-stalled-executing",
uncaughtException: "https://trigger.dev/docs/troubleshooting#uncaught-exceptions",
},
concurrency: {
recursiveDeadlock:
1 change: 1 addition & 0 deletions packages/core/src/v3/schemas/common.ts
@@ -174,6 +174,7 @@ export const TaskRunInternalError = z.object({
"GRACEFUL_EXIT_TIMEOUT",
"TASK_RUN_HEARTBEAT_TIMEOUT",
"TASK_RUN_CRASHED",
"TASK_RUN_UNCAUGHT_EXCEPTION",
"MAX_DURATION_EXCEEDED",
"DISK_SPACE_EXCEEDED",
"POD_EVICTED",