Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/workflow-executor/src/build-workflow-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import ForestServerWorkflowPort from './adapters/forest-server-workflow-port';
import ForestadminClientActivityLogPortFactory from './adapters/forestadmin-client-activity-log-port-factory';
import ServerAiAdapter from './adapters/server-ai-adapter';
import {
DEFAULT_AI_INVOKE_TIMEOUT_MS,
DEFAULT_FOREST_SERVER_URL,
DEFAULT_POLLING_INTERVAL_MS,
DEFAULT_STEP_TIMEOUT_MS,
Expand Down Expand Up @@ -43,6 +44,7 @@ export interface ExecutorOptions {
logger?: Logger;
stopTimeoutMs?: number;
stepTimeoutMs?: number;
aiInvokeTimeoutMs?: number;
// Max auto-chained steps per entry (see RunnerConfig.maxChainDepth). 0 disables chaining.
maxChainDepth?: number;
// Dev only: makes every AI call fail immediately so error paths can be exercised locally.
Expand Down Expand Up @@ -112,6 +114,7 @@ function buildCommonDependencies(options: ExecutorOptions) {
authSecret: options.authSecret,
stopTimeoutMs: options.stopTimeoutMs,
stepTimeoutMs: options.stepTimeoutMs ?? DEFAULT_STEP_TIMEOUT_MS,
aiInvokeTimeoutMs: options.aiInvokeTimeoutMs ?? DEFAULT_AI_INVOKE_TIMEOUT_MS,
maxChainDepth: options.maxChainDepth,
};
}
Expand Down
3 changes: 3 additions & 0 deletions packages/workflow-executor/src/cli-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
type WorkflowExecutor,
} from './build-workflow-executor';
import {
DEFAULT_AI_INVOKE_TIMEOUT_MS,
DEFAULT_FOREST_SERVER_URL,
DEFAULT_HTTP_PORT,
DEFAULT_MAX_CHAIN_DEPTH,
Expand Down Expand Up @@ -158,6 +159,7 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig
pollingIntervalMs: parsePositiveIntEnv('POLLING_INTERVAL_MS', env.POLLING_INTERVAL_MS),
stopTimeoutMs: parsePositiveIntEnv('STOP_TIMEOUT_MS', env.STOP_TIMEOUT_MS),
stepTimeoutMs: parsePositiveIntEnv('STEP_TIMEOUT_MS', env.STEP_TIMEOUT_MS),
aiInvokeTimeoutMs: parsePositiveIntEnv('AI_INVOKE_TIMEOUT_MS', env.AI_INVOKE_TIMEOUT_MS),
maxChainDepth: parsePositiveIntEnv('MAX_CHAIN_DEPTH', env.MAX_CHAIN_DEPTH),
...(aiConfigurations && { aiConfigurations }),
...(env.FORCE_AI_ERROR === 'true' && { forceAiError: true }),
Expand Down Expand Up @@ -194,6 +196,7 @@ Optional environment variables:
POLLING_INTERVAL_MS Default: ${DEFAULT_POLLING_INTERVAL_MS}
STOP_TIMEOUT_MS Default: ${DEFAULT_STOP_TIMEOUT_MS}
STEP_TIMEOUT_MS Max duration of a step in ms (default: ${DEFAULT_STEP_TIMEOUT_MS})
AI_INVOKE_TIMEOUT_MS Max duration of a single AI provider invocation in ms (default: ${DEFAULT_AI_INVOKE_TIMEOUT_MS})
MAX_CHAIN_DEPTH Max steps auto-executed per run before yielding (default: ${DEFAULT_MAX_CHAIN_DEPTH})
NO_COLOR Set to any value to disable ANSI colors in pretty logs
FORCE_AI_ERROR Set to "true" to make every AI call fail (dev only, to test error paths)
Expand Down
1 change: 1 addition & 0 deletions packages/workflow-executor/src/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export const DEFAULT_HTTP_PORT = 3400;
export const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com';
export const DEFAULT_POLLING_INTERVAL_MS = 30_000;
export const DEFAULT_STEP_TIMEOUT_MS = 5 * 60_000;
export const DEFAULT_AI_INVOKE_TIMEOUT_MS = 30_000;
export const DEFAULT_STOP_TIMEOUT_MS = 30_000;
export const DEFAULT_MAX_CHAIN_DEPTH = 50;
12 changes: 12 additions & 0 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,18 @@ export class StepTimeoutError extends WorkflowExecutorError {
}
}

// Thrown when the AI provider does not respond within the configured timeout — distinct from
// StepTimeoutError so we can surface a provider-specific message and tune the AI timeout
// independently of the step timeout (AI hangs are common; record fetches are not).
export class AiInvokeTimeoutError extends WorkflowExecutorError {
constructor(timeoutMs: number) {
super(
`AI provider did not respond within ${timeoutMs}ms`,
'The AI provider did not respond in time. Please try again, or contact your administrator if the problem persists.',
);
}
}

export class NoMcpToolsError extends WorkflowExecutorError {
constructor(requestedMcpServerId?: string, loadedMcpServerIds?: readonly string[]) {
const technical = requestedMcpServerId
Expand Down
29 changes: 26 additions & 3 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type {
import { SystemMessage } from '@forestadmin/ai-proxy';

import {
AiInvokeTimeoutError,
InvalidAiRequestError,
MalformedToolCallError,
MissingToolCallError,
Expand Down Expand Up @@ -328,9 +329,31 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
): Promise<{ toolName: string; args: T }> {
BaseStepExecutor.assertNoMidArraySystemMessages(messages);
const modelWithTools = this.context.model.bindTools(tools, { tool_choice: 'any' });
const response = await modelWithTools.invoke(
BaseStepExecutor.mergeLeadingSystemMessages(messages),
);
const preparedMessages = BaseStepExecutor.mergeLeadingSystemMessages(messages);
const aiTimeoutMs = this.context.aiInvokeTimeoutMs;
const timeoutEnabled = Boolean(aiTimeoutMs && aiTimeoutMs > 0);

let response;

try {
// LangChain turns the `timeout` call option into an AbortSignal (AbortSignal.timeout) and
// forwards it down to the underlying HTTP request, so a hanging provider is actually
// cancelled — not merely raced. 0/undefined leaves the call un-timed.
response = timeoutEnabled
? await modelWithTools.invoke(preparedMessages, { timeout: aiTimeoutMs })
: await modelWithTools.invoke(preparedMessages);
} catch (err) {
// On timeout the abort surfaces as TimeoutError (from AbortSignal.timeout) or AbortError.
// No other abort source exists on this path, so map either to our user-facing error.
const name = (err as { name?: string } | undefined)?.name;

if (timeoutEnabled && (name === 'TimeoutError' || name === 'AbortError')) {
throw new AiInvokeTimeoutError(aiTimeoutMs as number);
}

throw err;
}

const toolCall = response.tool_calls?.[0];

if (toolCall !== undefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export interface StepContextConfig {
schemaCache: SchemaCache;
logger: Logger;
stepTimeoutMs?: number;
aiInvokeTimeoutMs?: number;
}

export default class StepExecutorFactory {
Expand Down Expand Up @@ -135,6 +136,7 @@ export default class StepExecutorFactory {
logger: cfg.logger,
incomingPendingData,
stepTimeoutMs: cfg.stepTimeoutMs,
aiInvokeTimeoutMs: cfg.aiInvokeTimeoutMs,
activityLogPort,
};
}
Expand Down
5 changes: 5 additions & 0 deletions packages/workflow-executor/src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export interface RunnerConfig {
// On timeout the step reports status:error; the underlying work is not aborted (Promise.race
// limitation). Late rejections are caught and logged; late resolutions are silently discarded.
stepTimeoutMs?: number;
// Per-AI-invocation timeout (used by BaseStepExecutor.invokeWithTools). Aborts the underlying
// HTTP request via AbortSignal so a hanging provider is killed quickly, before stepTimeoutMs
// would fire. 0/undefined disables.
aiInvokeTimeoutMs?: number;
// Max number of ADDITIONAL steps auto-chained via /update-step response before yielding to the
// next poll cycle (counted after the initial step). 0 disables chaining entirely. Default 50.
maxChainDepth?: number;
Expand Down Expand Up @@ -416,6 +420,7 @@ export default class Runner {
schemaCache: this.config.schemaCache,
logger: this.logger,
stepTimeoutMs: this.config.stepTimeoutMs,
aiInvokeTimeoutMs: this.config.aiInvokeTimeoutMs,
};
}
}
1 change: 1 addition & 0 deletions packages/workflow-executor/src/types/execution-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ export interface ExecutionContext<TStep extends StepDefinition = StepDefinition>
readonly logger: Logger;
readonly incomingPendingData?: unknown;
readonly stepTimeoutMs?: number;
readonly aiInvokeTimeoutMs?: number;
readonly activityLogPort: ActivityLogPort;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { BaseMessage, DynamicStructuredTool } from '@forestadmin/ai-proxy';
import { HumanMessage, SystemMessage } from '@forestadmin/ai-proxy';

import {
AiInvokeTimeoutError,
InvalidAiRequestError,
MalformedToolCallError,
MissingToolCallError,
Expand Down Expand Up @@ -854,6 +855,92 @@ describe('BaseStepExecutor', () => {
);
});
});

describe('AI invoke timeout', () => {
// Builds a model whose invoke rejects with the given error — mimics LangChain surfacing the
// abort it raises when the `timeout` call option fires (AbortSignal.timeout).
function makeRejectingModel(error: unknown) {
const invoke = jest.fn().mockRejectedValue(error);

return {
model: {
bindTools: jest.fn().mockReturnValue({ invoke }),
} as unknown as ExecutionContext['model'],
invoke,
};
}

it('maps a TimeoutError from invoke to AiInvokeTimeoutError', async () => {
const timeoutErr = Object.assign(new Error('Aborted'), { name: 'TimeoutError' });
const { model } = makeRejectingModel(timeoutErr);
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 100 }));

const err = await executor.invokeWithTool(dummyMessages, dummyTool).catch(e => e);

expect(err).toBeInstanceOf(AiInvokeTimeoutError);
expect((err as Error).message).toContain('100ms');
});

it('maps an AbortError from invoke to AiInvokeTimeoutError', async () => {
const abortErr = Object.assign(new Error('Aborted'), { name: 'AbortError' });
const { model } = makeRejectingModel(abortErr);
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 100 }));

const err = await executor.invokeWithTool(dummyMessages, dummyTool).catch(e => e);

expect(err).toBeInstanceOf(AiInvokeTimeoutError);
expect((err as Error).message).toContain('100ms');
});

it('passes { timeout: aiInvokeTimeoutMs } as the second arg to model.invoke', async () => {
const { model, invoke } = makeMockModel({
tool_calls: [{ name: 'tool', args: {}, id: 'c1' }],
});
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 }));

await executor.invokeWithTool(dummyMessages, dummyTool);

expect(invoke).toHaveBeenCalledWith(expect.any(Array), { timeout: 5_000 });
});

it('does not pass any options to model.invoke when aiInvokeTimeoutMs is unset', async () => {
const { model, invoke } = makeMockModel({
tool_calls: [{ name: 'tool', args: {}, id: 'c1' }],
});
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: undefined }));

await executor.invokeWithTool(dummyMessages, dummyTool);

expect(invoke).toHaveBeenCalledTimes(1);
expect(invoke.mock.calls[0]).toHaveLength(1);
});

it('treats aiInvokeTimeoutMs <= 0 as disabled (no options, abort not mapped)', async () => {
const abortErr = Object.assign(new Error('Aborted'), { name: 'AbortError' });
const invoke = jest
.fn()
.mockResolvedValueOnce({ tool_calls: [{ name: 'tool', args: {}, id: 'c1' }] })
.mockRejectedValueOnce(abortErr);
const model = {
bindTools: jest.fn().mockReturnValue({ invoke }),
} as unknown as ExecutionContext['model'];
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 0 }));

await executor.invokeWithTool(dummyMessages, dummyTool);
expect(invoke.mock.calls[0]).toHaveLength(1);

// With the timeout disabled, an abort is not ours to translate — it bubbles up untouched.
await expect(executor.invokeWithTool(dummyMessages, dummyTool)).rejects.toBe(abortErr);
});

it('rethrows non-abort errors without wrapping them', async () => {
const apiError = Object.assign(new Error('OpenAI 503'), { status: 503, name: 'APIError' });
const { model } = makeRejectingModel(apiError);
const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 }));

await expect(executor.invokeWithTool(dummyMessages, dummyTool)).rejects.toBe(apiError);
});
});
});

describe('patchAndReloadPendingData', () => {
Expand Down
Loading