Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ src/
- **Auto-chain from `/update-step` response** — `WorkflowPort.updateStepExecution` returns `AvailableRunDispatch | null`: when non-null, the `Runner` executes the next step inline instead of waiting for the next poll. The chain exits on `null` (awaiting-input / finished / error), on a non-progressing `stepIndex` (server bug defense), at `maxChainDepth` (config, default 50), or when `stop()` is called. Each chained step uses the `forestServerToken` from its own dispatch — token freshness is preserved across the chain. The port retries `POST /update-step` on transient failures (network, 5xx) — this relies on server-side idempotency: the orchestrator MUST deduplicate identical outcomes for a given `(runId, stepIndex)` to prevent double side-effects on retry.
- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded values (display names) directly instead of invoking the AI. Each record step type has its own typed `preRecordedArgs` shape. Validation happens via schema resolution — invalid display names throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI.
- **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class.
- **Structured log context** — `BaseStepExecutor.execute()` stamps every log line with a shared `logCtx` (`runId`, `stepId`, `stepIndex`, `stepType`). Executors with type-specific identifiers add them via the `getExtraLogContext()` hook (default `{}`), keeping the base class free of step-specific knowledge — e.g. `McpStepExecutor` returns `{ mcpServerId, mcpServerName }` so MCP step logs unambiguously identify the targeted server (`mcpServerId` is canonical; `mcpServerName` is the human-readable Record key, not guaranteed unique at the DB level). `mcpServerName` is resolved by `RemoteToolFetcher.fetch()` from the scoped config Record key and forwarded to the executor constructor.
- **Boundary validation** — Types that cross a trust boundary (wire from the orchestrator, or mapper output) live under `src/types/validated/` as zod schemas with TS types inferred via `z.infer<>`. Strictness depends on origin: schemas the executor **produces** (mapper output) and **frontend** HTTP bodies use `.strict()` (catch our own bugs / input hygiene); the **orchestrator collection schema** instead **strips** unknown keys and requires only structural fields, with step-specific props optional and asserted at use-time by the consuming executor. This keeps the executor resilient to independent orchestrator drift — we fail at step execution, only when a step genuinely lacks what it needs, never in bulk up front for an unrelated add/remove. Validation runs where data enters (`forest-server-workflow-port.getCollectionSchema`, `run-to-available-step-mapper.toAvailableStepExecution`). On parse failure: throw `DomainValidationError` (extends `WorkflowExecutorError`) → bucketized as malformed (dispatch) or surfaced as a step error (execution). Types outside `validated/` are internal runtime state and not zod-validated. Note: `StepOutcome` is validated when it arrives as input via `previousSteps`; executor outputs are trusted by construction.

## Commands
Expand Down
13 changes: 12 additions & 1 deletion packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,20 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
return (await this.invokeWithTools<T>(messages, [tool])).args;
}

// Overridden by executors that carry type-specific log identifiers (e.g. McpStepExecutor).
protected getExtraLogContext(): Record<string, unknown> {
return {};
}

private get logCtx() {
const { runId, stepId, stepIndex, stepDefinition } = this.context;

return { runId, stepId, stepIndex, stepType: stepDefinition.type };
return {
runId,
stepId,
stepIndex,
stepType: stepDefinition.type,
...this.getExtraLogContext(),
};
}
}
16 changes: 15 additions & 1 deletion packages/workflow-executor/src/executors/mcp-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,23 @@ Important rules:
export default class McpStepExecutor extends BaseStepExecutor<McpStepDefinition> {
private readonly remoteTools: readonly RemoteTool[];

constructor(context: ExecutionContext<McpStepDefinition>, remoteTools: readonly RemoteTool[]) {
private readonly mcpServerName?: string;

constructor(
context: ExecutionContext<McpStepDefinition>,
remoteTools: readonly RemoteTool[],
mcpServerName?: string,
) {
super(context);
this.remoteTools = remoteTools;
this.mcpServerName = mcpServerName;
}

protected override getExtraLogContext(): Record<string, unknown> {
return {
mcpServerId: this.context.stepDefinition.mcpServerId,
mcpServerName: this.mcpServerName,
};
}

protected override buildActivityLogArgs(): CreateActivityLogArgs | null {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { AiModelPort } from '../ports/ai-model-port';
import type { Logger } from '../ports/logger-port';
import type { RunStore } from '../ports/run-store';
import type { WorkflowPort } from '../ports/workflow-port';
import type { FetchRemoteToolsResult } from '../remote-tool-fetcher';
import type SchemaCache from '../schema-cache';
import type {
AvailableStepExecution,
Expand All @@ -20,7 +21,6 @@ import type {
TriggerActionStepDefinition,
UpdateRecordStepDefinition,
} from '../types/validated/step-definition';
import type { RemoteTool } from '@forestadmin/ai-proxy';

import { StepStateError, causeMessage, extractErrorMessage } from '../errors';
import ConditionStepExecutor from './condition-step-executor';
Expand Down Expand Up @@ -48,7 +48,7 @@ export default class StepExecutorFactory {
step: AvailableStepExecution,
contextConfig: StepContextConfig,
activityLogPort: ActivityLogPort,
fetchRemoteTools: (mcpServerId: string) => Promise<RemoteTool[]>,
fetchRemoteTools: (mcpServerId: string) => Promise<FetchRemoteToolsResult>,
incomingPendingData?: unknown,
): Promise<IStepExecutor> {
try {
Expand Down Expand Up @@ -79,11 +79,11 @@ export default class StepExecutorFactory {

case StepType.Mcp: {
const mcpContext = context as ExecutionContext<McpStepDefinition>;

return new McpStepExecutor(
mcpContext,
await fetchRemoteTools(mcpContext.stepDefinition.mcpServerId),
const { tools, mcpServerName } = await fetchRemoteTools(
mcpContext.stepDefinition.mcpServerId,
);

return new McpStepExecutor(mcpContext, tools, mcpServerName);
}

case StepType.Guidance:
Expand Down
21 changes: 15 additions & 6 deletions packages/workflow-executor/src/remote-tool-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ export function scopeConfigsToServer(
return Object.fromEntries(Object.entries(configs).filter(([, cfg]) => cfg.id === mcpServerId));
}

export interface FetchRemoteToolsResult {
tools: RemoteTool[];
mcpServerName?: string;
}

export default class RemoteToolFetcher {
private readonly workflowPort: WorkflowPort;
private readonly aiModelPort: AiModelPort;
Expand All @@ -22,19 +27,20 @@ export default class RemoteToolFetcher {
this.logger = logger;
}

async fetch(mcpServerId: string): Promise<RemoteTool[]> {
async fetch(mcpServerId: string): Promise<FetchRemoteToolsResult> {
const configs = await this.workflowPort.getMcpServerConfigs();
const scoped = scopeConfigsToServer(configs, mcpServerId);
const [mcpServerName] = Object.keys(scoped);

this.warnMissingTargetServer(configs, scoped, mcpServerId);
this.warnMissingTargetServer(configs, scoped, mcpServerId, mcpServerName);

if (Object.keys(scoped).length === 0) return [];
if (Object.keys(scoped).length === 0) return { tools: [], mcpServerName };

const tools = await this.aiModelPort.loadRemoteTools(scoped);

this.errorOnPartialLoadFailure(scoped, tools, mcpServerId);
this.errorOnPartialLoadFailure(scoped, tools, mcpServerId, mcpServerName);

return tools;
return { tools, mcpServerName };
}

// Distinguish "no configs at all" (deployment misconfig) from "configs exist but none match"
Expand All @@ -44,6 +50,7 @@ export default class RemoteToolFetcher {
configs: Record<string, ToolConfig>,
scoped: Record<string, ToolConfig>,
mcpServerId: string,
mcpServerName: string | undefined,
): void {
if (Object.keys(scoped).length > 0) return;

Expand All @@ -55,7 +62,7 @@ export default class RemoteToolFetcher {
Object.keys(configs).length === 0
? 'MCP step targets a server but orchestrator returned no MCP configs'
: 'MCP step targets a server not advertised by the orchestrator',
{ requestedMcpServerId: mcpServerId, availableMcpServerIds },
{ requestedMcpServerId: mcpServerId, mcpServerName, availableMcpServerIds },
);
}

Expand All @@ -66,6 +73,7 @@ export default class RemoteToolFetcher {
scoped: Record<string, ToolConfig>,
tools: RemoteTool[],
mcpServerId: string,
mcpServerName: string | undefined,
): void {
const loadedMcpServerIds = new Set(tools.map(t => t.mcpServerId));
const failedConfigNames = Object.entries(scoped)
Expand All @@ -76,6 +84,7 @@ export default class RemoteToolFetcher {

this.logger.error('MCP servers failed to load tools', {
requestedMcpServerId: mcpServerId,
mcpServerName,
failedConfigNames,
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,4 +981,54 @@ describe('McpStepExecutor', () => {
});
});
});

describe('log context', () => {
it('includes mcpServerId and mcpServerName in the start and completion log lines', async () => {
const tool = new MockRemoteTool({ name: 'send_notification', sourceId: 'mcp-server-1' });
const { model } = makeMockModel('send_notification', { message: 'Hello' });
const logger = { info: jest.fn(), warn: jest.fn(), error: jest.fn() };
const context = makeContext({
model,
logger,
stepDefinition: makeStep({
executionType: StepExecutionMode.FullyAutomated,
mcpServerId: 'my-mcp-server',
}),
});
const executor = new McpStepExecutor(context, [tool], 'Production Slack');

await executor.execute();

expect(logger.info).toHaveBeenCalledWith(
'Step execution started',
expect.objectContaining({
mcpServerId: 'my-mcp-server',
mcpServerName: 'Production Slack',
}),
);
expect(logger.info).toHaveBeenCalledWith(
'Step execution completed',
expect.objectContaining({
mcpServerId: 'my-mcp-server',
mcpServerName: 'Production Slack',
}),
);
});

it('logs mcpServerName as undefined when no server name was resolved', async () => {
const logger = { info: jest.fn(), warn: jest.fn(), error: jest.fn() };
const context = makeContext({
logger,
stepDefinition: makeStep({ mcpServerId: 'id-missing' }),
});
const executor = new McpStepExecutor(context, []);

await executor.execute();

expect(logger.error).toHaveBeenCalledWith(
'No MCP tools available for mcpServerId="id-missing"',
expect.objectContaining({ mcpServerId: 'id-missing', mcpServerName: undefined }),
);
});
});
});
32 changes: 26 additions & 6 deletions packages/workflow-executor/test/remote-tool-fetcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,31 @@ describe('RemoteToolFetcher.fetch', () => {
expect(aiModelPort.loadRemoteTools).toHaveBeenCalledWith({ 'srv-a': cfg('id-A') });
});

it('returns an empty array and skips loadRemoteTools when the scoped Record is empty', async () => {
it('returns no tools and an undefined mcpServerName, skipping loadRemoteTools, when the scoped Record is empty', async () => {
const { fetcher, aiModelPort } = makeFetcher({
workflowPort: { getMcpServerConfigs: jest.fn().mockResolvedValue({}) },
});

const tools = await fetcher.fetch('id-A');
const result = await fetcher.fetch('id-A');

expect(tools).toEqual([]);
expect(result).toEqual({ tools: [], mcpServerName: undefined });
expect(aiModelPort.loadRemoteTools).not.toHaveBeenCalled();
});

it('resolves mcpServerName from the scoped Record key', async () => {
const remoteTools = [makeRemoteTool('srv-a', 'id-A')];
const { fetcher } = makeFetcher({
workflowPort: {
getMcpServerConfigs: jest.fn().mockResolvedValue({ 'srv-a': cfg('id-A') }),
},
aiModelPort: { loadRemoteTools: jest.fn().mockResolvedValue(remoteTools) },
});

const result = await fetcher.fetch('id-A');

expect(result).toEqual({ tools: remoteTools, mcpServerName: 'srv-a' });
});

it('warns about the missing target with the list of advertised ids when no config matches', async () => {
const { fetcher, logger } = makeFetcher({
workflowPort: {
Expand All @@ -111,7 +125,11 @@ describe('RemoteToolFetcher.fetch', () => {

expect(logger.warn).toHaveBeenCalledWith(
'MCP step targets a server not advertised by the orchestrator',
{ requestedMcpServerId: 'id-missing', availableMcpServerIds: ['id-A', 'id-B'] },
{
requestedMcpServerId: 'id-missing',
mcpServerName: undefined,
availableMcpServerIds: ['id-A', 'id-B'],
},
);
});

Expand All @@ -124,7 +142,7 @@ describe('RemoteToolFetcher.fetch', () => {

expect(logger.warn).toHaveBeenCalledWith(
'MCP step targets a server but orchestrator returned no MCP configs',
{ requestedMcpServerId: 'id-A', availableMcpServerIds: [] },
{ requestedMcpServerId: 'id-A', mcpServerName: undefined, availableMcpServerIds: [] },
);
expect(logger.warn).not.toHaveBeenCalledWith(
'MCP step targets a server not advertised by the orchestrator',
Expand Down Expand Up @@ -156,6 +174,7 @@ describe('RemoteToolFetcher.fetch', () => {

expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', {
requestedMcpServerId: 'id-A',
mcpServerName: 'srv-a',
failedConfigNames: ['srv-a'],
});
});
Expand Down Expand Up @@ -214,6 +233,7 @@ describe('RemoteToolFetcher.fetch', () => {

expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', {
requestedMcpServerId: 'id-zendesk',
mcpServerName: 'zendesk-prod',
failedConfigNames: ['zendesk-prod'],
});
});
Expand All @@ -229,7 +249,7 @@ describe('RemoteToolFetcher.fetch', () => {

const result = await fetcher.fetch('id-A');

expect(result).toBe(remoteTools);
expect(result.tools).toBe(remoteTools);
});

it('propagates a rejection from loadRemoteTools without logging partial-failure', async () => {
Expand Down
13 changes: 11 additions & 2 deletions packages/workflow-executor/test/runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,7 @@ describe('MCP fetch scoping', () => {
'MCP step targets a server not advertised by the orchestrator',
{
requestedMcpServerId: 'id-missing',
mcpServerName: undefined,
availableMcpServerIds: expect.arrayContaining(['id-A', 'id-B']),
},
);
Expand Down Expand Up @@ -1332,7 +1333,7 @@ describe('MCP fetch scoping', () => {
expect(aiClient.loadRemoteTools).not.toHaveBeenCalled();
expect(logger.warn).toHaveBeenCalledWith(
'MCP step targets a server but orchestrator returned no MCP configs',
{ requestedMcpServerId: 'id-A', availableMcpServerIds: [] },
{ requestedMcpServerId: 'id-A', mcpServerName: undefined, availableMcpServerIds: [] },
);
expect(logger.warn).not.toHaveBeenCalledWith(
'MCP step targets a server not advertised by the orchestrator',
Expand Down Expand Up @@ -1377,6 +1378,7 @@ describe('MCP fetch scoping', () => {

expect(logger.error).toHaveBeenCalledWith('MCP servers failed to load tools', {
requestedMcpServerId: 'id-A',
mcpServerName: 'server-A',
failedConfigNames: ['server-A'],
});
expect(executeSpy).toHaveBeenCalledTimes(1);
Expand Down Expand Up @@ -1523,7 +1525,9 @@ describe('StepExecutorFactory.create — factory', () => {
mcpServerId: 'srv-42',
},
});
const fetchRemoteTools = jest.fn().mockResolvedValue([]);
const fetchRemoteTools = jest
.fn()
.mockResolvedValue({ tools: [], mcpServerName: 'Production Slack' });
const executor = await StepExecutorFactory.create(
step,
makeContextConfig(),
Expand All @@ -1532,6 +1536,11 @@ describe('StepExecutorFactory.create — factory', () => {
);
expect(executor).toBeInstanceOf(McpStepExecutor);
expect(fetchRemoteTools).toHaveBeenCalledWith('srv-42');
expect(
(
executor as unknown as { getExtraLogContext(): Record<string, unknown> }
).getExtraLogContext(),
).toEqual({ mcpServerId: 'srv-42', mcpServerName: 'Production Slack' });
});

it('dispatches Guidance steps to GuidanceStepExecutor', async () => {
Expand Down
Loading