Skip to content
Merged
37 changes: 12 additions & 25 deletions packages/workflow-executor/src/adapters/step-definition-mapper.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type {
ServerTaskTypeEnum,
ServerWorkflowCondition,
ServerWorkflowStep,
ServerWorkflowTask,
} from './server-types';
import type { ConditionStepDefinition, StepDefinition } from '../types/validated/step-definition';

import { ServerTaskTypeEnum } from './server-types';
import { InvalidStepDefinitionError, UnsupportedStepTypeError } from '../errors';
import {
ConditionStepDefinitionSchema,
Expand All @@ -18,48 +18,35 @@ import {
UpdateRecordStepDefinitionSchema,
} from '../types/validated/step-definition';

const TASK_TYPE_TO_STEP_TYPE: Record<ServerTaskTypeEnum, StepType> = {
'get-data': StepType.ReadRecord,
'update-data': StepType.UpdateRecord,
'trigger-action': StepType.TriggerAction,
'load-related-record': StepType.LoadRelatedRecord,
'mcp-server': StepType.Mcp,
guideline: StepType.Guidance,
};

function mapTask(task: ServerWorkflowTask): StepDefinition {
const stepType = TASK_TYPE_TO_STEP_TYPE[task.taskType];

if (!stepType) {
throw new InvalidStepDefinitionError(`Unknown taskType: "${task.taskType}"`);
}

// executionType is passed through as-is; each schema's .default().catch() handles
// missing or unsupported values without requiring an explicit mapping here.
const base = { prompt: task.prompt, executionType: task.executionType };

switch (stepType) {
case StepType.Mcp:
switch (task.taskType) {
case ServerTaskTypeEnum.McpServer:
return McpStepDefinitionSchema.parse({
...base,
type: StepType.Mcp,
...('mcpServerId' in task && { mcpServerId: task.mcpServerId }),
mcpServerId: task.mcpServerId,
});
case StepType.Guidance:
case ServerTaskTypeEnum.Guideline:
return GuidanceStepDefinitionSchema.parse({ ...base, type: StepType.Guidance });
case StepType.ReadRecord:
case ServerTaskTypeEnum.GetData:
return ReadRecordStepDefinitionSchema.parse({ ...base, type: StepType.ReadRecord });
case StepType.UpdateRecord:
case ServerTaskTypeEnum.UpdateData:
return UpdateRecordStepDefinitionSchema.parse({ ...base, type: StepType.UpdateRecord });
case StepType.TriggerAction:
case ServerTaskTypeEnum.TriggerAction:
return TriggerActionStepDefinitionSchema.parse({ ...base, type: StepType.TriggerAction });
case StepType.LoadRelatedRecord:
case ServerTaskTypeEnum.LoadRelatedRecord:
return LoadRelatedRecordStepDefinitionSchema.parse({
...base,
type: StepType.LoadRelatedRecord,
});
default:
throw new InvalidStepDefinitionError(`Unmapped step type: "${stepType}"`);
throw new InvalidStepDefinitionError(
`Unknown taskType: "${(task as { taskType: string }).taskType}"`,
);
}
}

Expand Down
12 changes: 5 additions & 7 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,11 @@ export class StepTimeoutError extends WorkflowExecutorError {
}

export class NoMcpToolsError extends WorkflowExecutorError {
constructor(requestedMcpServerId?: string, loadedMcpServerIds?: readonly string[]) {
const technical = requestedMcpServerId
? `No MCP tools available for mcpServerId="${requestedMcpServerId}". Loaded MCP server ids: [${(
loadedMcpServerIds ?? []
).join(', ')}]`
: 'No MCP tools available';
super(technical, 'No tools are available to execute this step.');
constructor(requestedMcpServerId: string) {
super(
`No MCP tools available for mcpServerId="${requestedMcpServerId}"`,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the name and id maybe

'Tools could not be loaded for the targeted server. Please try again, or contact your administrator if the problem persists.',
);
}
}

Expand Down
32 changes: 10 additions & 22 deletions packages/workflow-executor/src/executors/mcp-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export default class McpStepExecutor extends BaseStepExecutor<McpStepDefinition>
}

// Branches B & C -- First call
const tools = this.getFilteredTools();
const tools = this.requireTools();
const { toolName, args } = await this.selectTool(tools);
const selectedTool = tools.find(t => t.base.name === toolName);
if (!selectedTool) throw new McpToolNotFoundError(toolName);
Expand All @@ -107,7 +107,7 @@ export default class McpStepExecutor extends BaseStepExecutor<McpStepDefinition>
target: McpToolCall,
existingExecution?: McpStepExecutionData,
): Promise<StepExecutionResult> {
const tools = this.getFilteredTools();
const tools = this.requireTools();
const tool = tools.find(t => t.base.name === target.name && t.sourceId === target.sourceId);
if (!tool) throw new McpToolNotFoundError(target.name);

Expand Down Expand Up @@ -225,27 +225,15 @@ export default class McpStepExecutor extends BaseStepExecutor<McpStepDefinition>
);
}

private getFilteredTools(): RemoteTool[] {
const { mcpServerId } = this.context.stepDefinition;
const tools = mcpServerId
? this.remoteTools.filter(t => t.mcpServerId === mcpServerId)
: [...this.remoteTools];

if (tools.length === 0) {
const loadedMcpServerIds = this.remoteTools
.map(t => t.mcpServerId)
.filter((value): value is string => !!value);
const error = new NoMcpToolsError(mcpServerId, loadedMcpServerIds);
this.context.logger.error(error.message, {
runId: this.context.runId,
stepId: this.context.stepId,
stepIndex: this.context.stepIndex,
requestedMcpServerId: mcpServerId,
loadedMcpServerIds,
});
throw error;
// Tools are pre-scoped to step.mcpServerId upstream. An empty list means either no config
// matched, or the per-server connection failed at load time (McpClient swallows per-server
// errors). RemoteToolFetcher emits the diagnostic upstream; here we just surface the empty
// case as a domain error so BaseStepExecutor turns it into a step outcome.
private requireTools(): RemoteTool[] {
if (this.remoteTools.length === 0) {
throw new NoMcpToolsError(this.context.stepDefinition.mcpServerId);
}

return tools;
return [...this.remoteTools];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export default class StepExecutorFactory {
step: AvailableStepExecution,
contextConfig: StepContextConfig,
activityLogPort: ActivityLogPort,
loadTools: () => Promise<RemoteTool[]>,
fetchRemoteTools: (mcpServerId: string) => Promise<RemoteTool[]>,
incomingPendingData?: unknown,
): Promise<IStepExecutor> {
try {
Expand Down Expand Up @@ -76,11 +76,16 @@ export default class StepExecutorFactory {
return new LoadRelatedRecordStepExecutor(
context as ExecutionContext<LoadRelatedRecordStepDefinition>,
);
case StepType.Mcp:

case StepType.Mcp: {
const mcpContext = context as ExecutionContext<McpStepDefinition>;

return new McpStepExecutor(
context as ExecutionContext<McpStepDefinition>,
await loadTools(),
mcpContext,
await fetchRemoteTools(mcpContext.stepDefinition.mcpServerId),
);
}

case StepType.Guidance:
return new GuidanceStepExecutor(context as ExecutionContext<GuidanceStepDefinition>);
default:
Expand Down
82 changes: 82 additions & 0 deletions packages/workflow-executor/src/remote-tool-fetcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type { AiModelPort } from './ports/ai-model-port';
import type { Logger } from './ports/logger-port';
import type { WorkflowPort } from './ports/workflow-port';
import type { RemoteTool, ToolConfig } from '@forestadmin/ai-proxy';

// Match by config.id, not by Record key: server names can collide across configs.
export function scopeConfigsToServer(
configs: Record<string, ToolConfig>,
mcpServerId: string,
): Record<string, ToolConfig> {
return Object.fromEntries(Object.entries(configs).filter(([, cfg]) => cfg.id === mcpServerId));
}

export default class RemoteToolFetcher {
private readonly workflowPort: WorkflowPort;
private readonly aiModelPort: AiModelPort;
private readonly logger: Logger;

constructor(workflowPort: WorkflowPort, aiModelPort: AiModelPort, logger: Logger) {
this.workflowPort = workflowPort;
this.aiModelPort = aiModelPort;
this.logger = logger;
}

async fetch(mcpServerId: string): Promise<RemoteTool[]> {
const configs = await this.workflowPort.getMcpServerConfigs();
const scoped = scopeConfigsToServer(configs, mcpServerId);

this.warnMissingTargetServer(configs, scoped, mcpServerId);

if (Object.keys(scoped).length === 0) return [];

const tools = await this.aiModelPort.loadRemoteTools(scoped);

this.errorOnPartialLoadFailure(scoped, tools, mcpServerId);

return tools;
}

// Distinguish "no configs at all" (deployment misconfig) from "configs exist but none match"
// (orchestrator/executor drift on server id) — both yield zero tools, but ops need to know
// which one to fix.
private warnMissingTargetServer(
configs: Record<string, ToolConfig>,
scoped: Record<string, ToolConfig>,
mcpServerId: string,
): void {
if (Object.keys(scoped).length > 0) return;

const availableMcpServerIds = Object.values(configs)
.map(cfg => cfg.id)
.filter((id): id is string => Boolean(id));

this.logger.warn(
Object.keys(configs).length === 0
? 'MCP step targets a server but orchestrator returned no MCP configs'
: 'MCP step targets a server not advertised by the orchestrator',
{ requestedMcpServerId: mcpServerId, availableMcpServerIds },
);
}

// Partial-failure detection: McpClient swallows per-server load errors and returns whatever
// succeeded. Match config.id against tool.mcpServerId — both providers populate it from the
// orchestrator's persisted id, so the check is uniform across MCP and Forest connectors.
private errorOnPartialLoadFailure(
scoped: Record<string, ToolConfig>,
tools: RemoteTool[],
mcpServerId: string,
): void {
const loadedMcpServerIds = new Set(tools.map(t => t.mcpServerId));
const failedConfigNames = Object.entries(scoped)
.filter(([, cfg]) => !loadedMcpServerIds.has(cfg.id))
.map(([name]) => name);

if (failedConfigNames.length === 0) return;

this.logger.error('MCP servers failed to load tools', {
requestedMcpServerId: mcpServerId,
failedConfigNames,
});
}
}
17 changes: 8 additions & 9 deletions packages/workflow-executor/src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import type SchemaCache from './schema-cache';
import type { AvailableStepExecution, StepExecutionResult } from './types/execution-context';
import type { StepExecutionData } from './types/step-execution-data';
import type { StepOutcome } from './types/validated/step-outcome';
import type { RemoteTool } from '@forestadmin/ai-proxy';

import ConsoleLogger from './adapters/console-logger';
import { DEFAULT_MAX_CHAIN_DEPTH, DEFAULT_STOP_TIMEOUT_MS } from './defaults';
Expand All @@ -22,6 +21,7 @@ import {
} from './errors';
import StepExecutorFactory from './executors/step-executor-factory';
import InFlightRunRegistry from './in-flight-run-registry';
import RemoteToolFetcher from './remote-tool-fetcher';
import { stepTypeToOutcomeType } from './types/validated/step-outcome';
import validateSecrets from './validate-secrets';

Expand Down Expand Up @@ -52,11 +52,17 @@ export default class Runner {
private pollingTimer: NodeJS.Timeout | null = null;
private readonly inFlightRuns = new InFlightRunRegistry();
private readonly logger: Logger;
private readonly remoteToolFetcher: RemoteToolFetcher;
private _state: RunnerState = 'idle';

constructor(config: RunnerConfig) {
this.config = config;
this.logger = config.logger ?? new ConsoleLogger();
this.remoteToolFetcher = new RemoteToolFetcher(
config.workflowPort,
config.aiModelPort,
this.logger,
);
}

get state(): RunnerState {
Expand Down Expand Up @@ -251,13 +257,6 @@ export default class Runner {
}
}

private async fetchRemoteTools(): Promise<RemoteTool[]> {
const configs = await this.config.workflowPort.getMcpServerConfigs();
if (Object.keys(configs).length === 0) return [];

return this.config.aiModelPort.loadRemoteTools(configs);
}

private executeStep(
step: AvailableStepExecution,
forestServerToken: string,
Expand Down Expand Up @@ -295,7 +294,7 @@ export default class Runner {
currentStep,
this.contextConfig,
this.config.activityLogPortFactory.forRun(currentToken),
() => this.fetchRemoteTools(),
mcpServerId => this.remoteToolFetcher.fetch(mcpServerId),
currentIncomingData,
);
result = await executor.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export const McpStepDefinitionSchema = z.object({
.enum([AutomatedWithConfirmation, FullyAutomated])
.default(AutomatedWithConfirmation)
.catch(AutomatedWithConfirmation),
mcpServerId: z.string().optional(),
mcpServerId: z.string().min(1),
});
export type McpStepDefinition = z.infer<typeof McpStepDefinitionSchema>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,10 @@ describe('toStepDefinition', () => {
});
});

it('should map task with mcp-server taskType without mcpServerId', () => {
it('rejects an mcp-server task missing mcpServerId at the zod boundary', () => {
const task = makeTask({ taskType: ServerTaskTypeEnum.McpServer, prompt: 'run mcp' });

expect(toStepDefinition(task)).toEqual({
type: StepType.Mcp,
prompt: 'run mcp',
executionType: ServerStepExecutionTypeEnum.AutomatedWithConfirmation,
});
expect(() => toStepDefinition(task)).toThrow();
});

it('should map task with guideline taskType to guidance', () => {
Expand Down
35 changes: 7 additions & 28 deletions packages/workflow-executor/test/errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,38 +70,17 @@ describe('extractErrorMessage', () => {
});

describe('NoMcpToolsError', () => {
it('produces a fully generic technical message when no mcpServerId was requested (no filter case)', () => {
const err = new NoMcpToolsError();
it('includes the requested mcpServerId in the technical message', () => {
const err = new NoMcpToolsError('id-missing');

expect(err.message).toBe('No MCP tools available');
expect(err.userMessage).toBe('No tools are available to execute this step.');
expect(err.message).toBe('No MCP tools available for mcpServerId="id-missing"');
});

it('includes the requested mcpServerId in the technical message when a filter was active', () => {
const err = new NoMcpToolsError('id-missing', ['id-A', 'id-B']);
it('keeps the user-facing message free of internal ids', () => {
const err = new NoMcpToolsError('id-missing');

expect(err.message).toMatch(/id-missing/);
});

it('lists the loaded mcpServerIds in the technical message so misconfigurations are diagnosable', () => {
const err = new NoMcpToolsError('id-missing', ['id-A', 'id-B']);

expect(err.message).toMatch(/id-A/);
expect(err.message).toMatch(/id-B/);
});

it('handles an empty loaded-id list without producing a malformed message', () => {
const err = new NoMcpToolsError('id-missing', []);

expect(err.message).toMatch(/id-missing/);
expect(err.message).not.toMatch(/undefined|null|\[object/i);
});

it('keeps the user-facing message generic — no internal ids must leak', () => {
const err = new NoMcpToolsError('id-missing', ['id-A', 'id-B']);

expect(err.userMessage).toBe('No tools are available to execute this step.');
expect(err.userMessage).not.toMatch(/id-missing|id-A|id-B/);
expect(err.userMessage).toMatch(/^Tools could not be loaded for the targeted server\./);
expect(err.userMessage).not.toMatch(/id-missing/);
});
});

Expand Down
Loading
Loading