From 1f28b96551fac2f3adc6e71b88907ecad615e25b Mon Sep 17 00:00:00 2001 From: Arnaud Moncel Date: Tue, 31 Mar 2026 11:20:14 +0200 Subject: [PATCH] feat(agent): proxify ws executor call --- packages/agent/src/routes/index.ts | 8 + .../workflow/workflow-executor-proxy.ts | 87 ++++++++ packages/agent/src/types.ts | 7 + packages/agent/src/utils/options-validator.ts | 1 + .../forest-admin-http-driver-options.ts | 1 + packages/agent/test/__factories__/router.ts | 1 + .../workflow/workflow-executor-proxy.test.ts | 201 ++++++++++++++++++ packages/workflow-executor/.dockerignore | 6 + packages/workflow-executor/Dockerfile | 48 +++++ packages/workflow-executor/src/main.ts | 89 ++++++++ 10 files changed, 449 insertions(+) create mode 100644 packages/agent/src/routes/workflow/workflow-executor-proxy.ts create mode 100644 packages/agent/test/routes/workflow/workflow-executor-proxy.test.ts create mode 100644 packages/workflow-executor/.dockerignore create mode 100644 packages/workflow-executor/Dockerfile create mode 100644 packages/workflow-executor/src/main.ts diff --git a/packages/agent/src/routes/index.ts b/packages/agent/src/routes/index.ts index 24476bee4..6d626180e 100644 --- a/packages/agent/src/routes/index.ts +++ b/packages/agent/src/routes/index.ts @@ -31,6 +31,7 @@ import ScopeInvalidation from './security/scope-invalidation'; import ErrorHandling from './system/error-handling'; import HealthCheck from './system/healthcheck'; import Logger from './system/logger'; +import WorkflowExecutorProxyRoute from './workflow/workflow-executor-proxy'; export const ROOT_ROUTES_CTOR = [ Authentication, @@ -172,6 +173,12 @@ function getAiRoutes(options: Options, services: Services, aiRouter: AiRouter | return [new AiProxyRoute(services, options, aiRouter)]; } +function getWorkflowExecutorRoutes(options: Options, services: Services): BaseRoute[] { + if (!options.workflowExecutorUrl) return []; + + return [new WorkflowExecutorProxyRoute(services, options)]; +} + export default function makeRoutes( dataSource: DataSource, options: Options, @@ -187,6 +194,7 @@ export default function makeRoutes( ...getRelatedRoutes(dataSource, options, services), ...getActionRoutes(dataSource, options, services), ...getAiRoutes(options, services, aiRouter), + ...getWorkflowExecutorRoutes(options, services), ]; // Ensure routes and middlewares are loaded in the right order. diff --git a/packages/agent/src/routes/workflow/workflow-executor-proxy.ts b/packages/agent/src/routes/workflow/workflow-executor-proxy.ts new file mode 100644 index 000000000..33e629404 --- /dev/null +++ b/packages/agent/src/routes/workflow/workflow-executor-proxy.ts @@ -0,0 +1,87 @@ +import type { ForestAdminHttpDriverServices } from '../../services'; +import type { AgentOptionsWithDefaults } from '../../types'; +import type KoaRouter from '@koa/router'; +import type { Context } from 'koa'; + +import { request as httpRequest } from 'http'; +import { request as httpsRequest } from 'https'; + +import { HttpCode, RouteType } from '../../types'; +import BaseRoute from '../base-route'; + +export default class WorkflowExecutorProxyRoute extends BaseRoute { + readonly type = RouteType.PrivateRoute; + private readonly executorUrl: URL; + + constructor(services: ForestAdminHttpDriverServices, options: AgentOptionsWithDefaults) { + super(services, options); + // Remove trailing slash for clean URL joining + this.executorUrl = new URL(options.workflowExecutorUrl.replace(/\/+$/, '')); + } + + private static readonly AGENT_PREFIX = '/_internal/workflow-executions'; + private static readonly EXECUTOR_PREFIX = '/runs'; + + setupRoutes(router: KoaRouter): void { + router.get('/_internal/workflow-executions/:runId', this.handleProxy.bind(this)); + router.post('/_internal/workflow-executions/:runId/trigger', this.handleProxy.bind(this)); + router.patch( + '/_internal/workflow-executions/:runId/steps/:stepIndex/pending-data', + this.handleProxy.bind(this), + ); + } + + private async handleProxy(context: Context): Promise { + // Rewrite /_internal/workflow-executions/... → /runs/... + const executorPath = context.path.replace( + WorkflowExecutorProxyRoute.AGENT_PREFIX, + WorkflowExecutorProxyRoute.EXECUTOR_PREFIX, + ); + const targetUrl = new URL(executorPath, this.executorUrl); + + const response = await this.forwardRequest(context.method, targetUrl, context.request.body); + + context.response.status = response.status; + context.response.body = response.body; + } + + private forwardRequest( + method: string, + url: URL, + body?: unknown, + ): Promise<{ status: number; body: unknown }> { + const requestFn = url.protocol === 'https:' ? httpsRequest : httpRequest; + + return new Promise((resolve, reject) => { + const req = requestFn( + url, + { method, headers: { 'Content-Type': 'application/json' } }, + res => { + const chunks: Uint8Array[] = []; + res.on('data', chunk => chunks.push(chunk)); + res.on('end', () => { + const raw = Buffer.concat(chunks).toString('utf-8'); + let parsed: unknown; + + try { + parsed = JSON.parse(raw); + } catch { + parsed = raw; + } + + resolve({ status: res.statusCode ?? HttpCode.InternalServerError, body: parsed }); + }); + res.on('error', reject); + }, + ); + + req.on('error', reject); + + if (body && method !== 'GET') { + req.write(JSON.stringify(body)); + } + + req.end(); + }); + } +} diff --git a/packages/agent/src/types.ts b/packages/agent/src/types.ts index d90d83b08..53785b8c2 100644 --- a/packages/agent/src/types.ts +++ b/packages/agent/src/types.ts @@ -45,6 +45,13 @@ export type AgentOptions = { */ ignoreMissingSchemaElementErrors?: boolean; useUnsafeActionEndpoint?: boolean; + /** + * Base URL of the workflow executor to proxy requests to. + * When set, the agent exposes routes at `/_internal/workflow-executions/` + * that forward to the executor, benefiting from the agent's authentication layer. + * @example 'http://localhost:4001' + */ + workflowExecutorUrl?: string | null; }; export type AgentOptionsWithDefaults = Readonly>; diff --git a/packages/agent/src/utils/options-validator.ts b/packages/agent/src/utils/options-validator.ts index 317819279..a6fa72548 100644 --- a/packages/agent/src/utils/options-validator.ts +++ b/packages/agent/src/utils/options-validator.ts @@ -38,6 +38,7 @@ export default class OptionsValidator { copyOptions.loggerLevel = copyOptions.loggerLevel || 'Info'; copyOptions.skipSchemaUpdate = copyOptions.skipSchemaUpdate || false; copyOptions.instantCacheRefresh = copyOptions.instantCacheRefresh ?? true; + copyOptions.workflowExecutorUrl = copyOptions.workflowExecutorUrl ?? null; copyOptions.maxBodySize = copyOptions.maxBodySize || '50mb'; copyOptions.bodyParserOptions = copyOptions.bodyParserOptions || { jsonLimit: '50mb', diff --git a/packages/agent/test/__factories__/forest-admin-http-driver-options.ts b/packages/agent/test/__factories__/forest-admin-http-driver-options.ts index bf64613f2..5189d2b9a 100644 --- a/packages/agent/test/__factories__/forest-admin-http-driver-options.ts +++ b/packages/agent/test/__factories__/forest-admin-http-driver-options.ts @@ -29,4 +29,5 @@ export default Factory.define(() => ({ }, ignoreMissingSchemaElementErrors: false, useUnsafeActionEndpoint: false, + workflowExecutorUrl: null, })); diff --git a/packages/agent/test/__factories__/router.ts b/packages/agent/test/__factories__/router.ts index 226a1bf2a..78cb0e55b 100644 --- a/packages/agent/test/__factories__/router.ts +++ b/packages/agent/test/__factories__/router.ts @@ -7,6 +7,7 @@ export class RouterFactory extends Factory { router.get = jest.fn(); router.delete = jest.fn(); router.use = jest.fn(); + router.patch = jest.fn(); router.post = jest.fn(); router.put = jest.fn(); router.all = jest.fn(); diff --git a/packages/agent/test/routes/workflow/workflow-executor-proxy.test.ts b/packages/agent/test/routes/workflow/workflow-executor-proxy.test.ts new file mode 100644 index 000000000..17e337aff --- /dev/null +++ b/packages/agent/test/routes/workflow/workflow-executor-proxy.test.ts @@ -0,0 +1,201 @@ +import http from 'http'; + +import { createMockContext } from '@shopify/jest-koa-mocks'; + +import WorkflowExecutorProxyRoute from '../../../src/routes/workflow/workflow-executor-proxy'; +import { RouteType } from '../../../src/types'; +import * as factories from '../../__factories__'; + +describe('WorkflowExecutorProxyRoute', () => { + const options = factories.forestAdminHttpDriverOptions.build(); + const services = factories.forestAdminHttpDriverServices.build(); + const router = factories.router.mockAllMethods().build(); + + let executorServer: http.Server; + let executorPort: number; + + // Start a real HTTP server to act as the workflow executor + beforeAll(async () => { + executorServer = http.createServer((req, res) => { + const chunks: Uint8Array[] = []; + req.on('data', chunk => chunks.push(chunk)); + req.on('end', () => { + const body = Buffer.concat(chunks).toString('utf-8'); + + res.setHeader('Content-Type', 'application/json'); + + if (req.url?.includes('not-found')) { + res.writeHead(404); + res.end(JSON.stringify({ error: 'Run not found or unavailable' })); + } else if (req.method === 'GET' && req.url?.match(/^\/runs\/[\w-]+$/)) { + res.writeHead(200); + res.end(JSON.stringify({ steps: [{ stepId: 's1', status: 'success' }] })); + } else if (req.method === 'POST' && req.url?.match(/^\/runs\/[\w-]+\/trigger$/)) { + res.writeHead(200); + res.end(JSON.stringify({ triggered: true })); + } else if ( + req.method === 'PATCH' && + req.url?.match(/^\/runs\/[\w-]+\/steps\/\d+\/pending-data$/) + ) { + const parsed = body ? JSON.parse(body) : {}; + res.writeHead(200); + res.end(JSON.stringify({ updated: true, received: parsed })); + } else { + res.writeHead(404); + res.end(JSON.stringify({ error: 'Not found' })); + } + }); + }); + + await new Promise((resolve, reject) => { + executorServer.listen(0, () => { + executorPort = (executorServer.address() as { port: number }).port; + resolve(); + }); + executorServer.on('error', reject); + }); + }); + + afterAll(async () => { + await new Promise((resolve, reject) => { + executorServer.close(err => (err ? reject(err) : resolve())); + }); + }); + + beforeEach(() => { + jest.clearAllMocks(); + }); + + const buildOptions = (url: string) => + factories.forestAdminHttpDriverOptions.build({ workflowExecutorUrl: url }); + + describe('constructor', () => { + test('should have RouteType.PrivateRoute', () => { + const route = new WorkflowExecutorProxyRoute(services, buildOptions('http://localhost:4001')); + + expect(route.type).toBe(RouteType.PrivateRoute); + }); + }); + + describe('setupRoutes', () => { + test('should register GET, POST and PATCH routes', () => { + const route = new WorkflowExecutorProxyRoute(services, buildOptions('http://localhost:4001')); + route.setupRoutes(router); + + expect(router.get).toHaveBeenCalledWith( + '/_internal/workflow-executions/:runId', + expect.any(Function), + ); + expect(router.post).toHaveBeenCalledWith( + '/_internal/workflow-executions/:runId/trigger', + expect.any(Function), + ); + expect(router.patch).toHaveBeenCalledWith( + '/_internal/workflow-executions/:runId/steps/:stepIndex/pending-data', + expect.any(Function), + ); + }); + }); + + describe('handleProxy', () => { + test('should forward GET /runs/:runId and return executor response', async () => { + const route = new WorkflowExecutorProxyRoute( + services, + buildOptions(`http://localhost:${executorPort}`), + ); + + const context = createMockContext({ + customProperties: { params: { runId: 'run-123' } }, + }); + Object.defineProperty(context, 'path', { + value: '/_internal/workflow-executions/run-123', + }); + + await (route as any).handleProxy(context); + + expect(context.response.status).toBe(200); + expect(context.response.body).toEqual({ + steps: [{ stepId: 's1', status: 'success' }], + }); + }); + + test('should forward POST /runs/:runId/trigger and return executor response', async () => { + const route = new WorkflowExecutorProxyRoute( + services, + buildOptions(`http://localhost:${executorPort}`), + ); + + const context = createMockContext({ + method: 'POST', + customProperties: { params: { runId: 'run-456' } }, + }); + Object.defineProperty(context, 'path', { + value: '/_internal/workflow-executions/run-456/trigger', + }); + + await (route as any).handleProxy(context); + + expect(context.response.status).toBe(200); + expect(context.response.body).toEqual({ triggered: true }); + }); + + test('should forward error status from executor', async () => { + const route = new WorkflowExecutorProxyRoute( + services, + buildOptions(`http://localhost:${executorPort}`), + ); + + const context = createMockContext({ + customProperties: { params: { runId: 'not-found' } }, + }); + Object.defineProperty(context, 'path', { + value: '/_internal/workflow-executions/not-found', + }); + + await (route as any).handleProxy(context); + + expect(context.response.status).toBe(404); + expect(context.response.body).toEqual({ error: 'Run not found or unavailable' }); + }); + + test('should forward PATCH pending-data and pass request body', async () => { + const route = new WorkflowExecutorProxyRoute( + services, + buildOptions(`http://localhost:${executorPort}`), + ); + + const context = createMockContext({ + method: 'PATCH', + customProperties: { params: { runId: 'run-789', stepIndex: '2' } }, + requestBody: { fieldValues: { name: 'updated' } }, + }); + Object.defineProperty(context, 'path', { + value: '/_internal/workflow-executions/run-789/steps/2/pending-data', + }); + + await (route as any).handleProxy(context); + + expect(context.response.status).toBe(200); + expect(context.response.body).toEqual({ + updated: true, + received: { fieldValues: { name: 'updated' } }, + }); + }); + + test('should reject when executor is unreachable', async () => { + const route = new WorkflowExecutorProxyRoute( + services, + buildOptions('http://localhost:1'), // port that should be unreachable + ); + + const context = createMockContext({ + customProperties: { params: { runId: 'run-789' } }, + }); + Object.defineProperty(context, 'path', { + value: '/_internal/workflow-executions/run-789', + }); + + await expect((route as any).handleProxy(context)).rejects.toThrow(); + }); + }); +}); diff --git a/packages/workflow-executor/.dockerignore b/packages/workflow-executor/.dockerignore new file mode 100644 index 000000000..bbdc96a6d --- /dev/null +++ b/packages/workflow-executor/.dockerignore @@ -0,0 +1,6 @@ +**/node_modules +**/dist +**/coverage +**/.git +**/*.test.ts +**/test diff --git a/packages/workflow-executor/Dockerfile b/packages/workflow-executor/Dockerfile new file mode 100644 index 000000000..e012038a3 --- /dev/null +++ b/packages/workflow-executor/Dockerfile @@ -0,0 +1,48 @@ +# ---- Build stage ---- +FROM node:22-alpine AS builder + +WORKDIR /app + +# Copy root config files needed for workspace resolution and build +COPY package.json yarn.lock global.d.ts tsconfig.json ./ + +# Copy all packages (yarn workspaces need the full structure to resolve deps) +COPY packages/ packages/ + +# Install all dependencies (workspaces) +RUN yarn install --frozen-lockfile + +# Build the workflow-executor and its internal dependencies +RUN yarn workspace @forestadmin/forestadmin-client build \ + && yarn workspace @forestadmin/datasource-toolkit build \ + && yarn workspace @forestadmin/agent-client build \ + && yarn workspace @forestadmin/ai-proxy build \ + && yarn workspace @forestadmin/workflow-executor build + +# ---- Production stage ---- +FROM node:22-alpine + +WORKDIR /app + +COPY package.json yarn.lock ./ +COPY packages/workflow-executor/package.json packages/workflow-executor/ +COPY packages/forestadmin-client/package.json packages/forestadmin-client/ +COPY packages/datasource-toolkit/package.json packages/datasource-toolkit/ +COPY packages/agent-client/package.json packages/agent-client/ +COPY packages/ai-proxy/package.json packages/ai-proxy/ + +RUN yarn install --frozen-lockfile --production + +# Copy built artifacts from builder +COPY --from=builder /app/packages/workflow-executor/dist packages/workflow-executor/dist +COPY --from=builder /app/packages/forestadmin-client/dist packages/forestadmin-client/dist +COPY --from=builder /app/packages/datasource-toolkit/dist packages/datasource-toolkit/dist +COPY --from=builder /app/packages/agent-client/dist packages/agent-client/dist +COPY --from=builder /app/packages/ai-proxy/dist packages/ai-proxy/dist + +ENV NODE_ENV=production +ENV EXECUTOR_HTTP_PORT=4001 + +EXPOSE 4001 + +CMD ["node", "packages/workflow-executor/dist/main.js"] diff --git a/packages/workflow-executor/src/main.ts b/packages/workflow-executor/src/main.ts new file mode 100644 index 000000000..ccfa7e201 --- /dev/null +++ b/packages/workflow-executor/src/main.ts @@ -0,0 +1,89 @@ +import type { RunStore } from './ports/run-store'; +import type { StepExecutionData } from './types/step-execution-data'; + +import { AiClient } from '@forestadmin/ai-proxy'; + +import AgentClientAgentPort from './adapters/agent-client-agent-port'; +import ForestServerWorkflowPort from './adapters/forest-server-workflow-port'; +import Runner from './runner'; + +/** Simple in-memory RunStore — suitable for single-instance deployments. */ +class InMemoryRunStore implements RunStore { + private readonly store = new Map(); + + async getStepExecutions(runId: string): Promise { + return this.store.get(runId) ?? []; + } + + async saveStepExecution(runId: string, stepExecution: StepExecutionData): Promise { + const executions = this.store.get(runId) ?? []; + executions.push(stepExecution); + this.store.set(runId, executions); + } +} + +function requireEnv(name: string): string { + const value = process.env[name]; + + if (!value) { + throw new Error(`Missing required environment variable: ${name}`); + } + + return value; +} + +async function main() { + const envSecret = requireEnv('FOREST_ENV_SECRET'); + const authSecret = requireEnv('FOREST_AUTH_SECRET'); + const forestServerUrl = requireEnv('FOREST_SERVER_URL'); + const agentUrl = requireEnv('FOREST_AGENT_URL'); + const httpPort = Number(process.env.EXECUTOR_HTTP_PORT ?? '4001'); + const pollingIntervalMs = Number(process.env.EXECUTOR_POLLING_INTERVAL_MS ?? '5000'); + + const aiConfigurations = []; + + if (process.env.AI_PROVIDER && process.env.AI_API_KEY && process.env.AI_MODEL) { + aiConfigurations.push({ + name: process.env.AI_CONFIG_NAME ?? 'default', + provider: process.env.AI_PROVIDER, + apiKey: process.env.AI_API_KEY, + model: process.env.AI_MODEL, + }); + } + + const { createRemoteAgentClient } = await import('@forestadmin/agent-client'); + + const agentClient = createRemoteAgentClient({ url: agentUrl }); + const workflowPort = new ForestServerWorkflowPort({ envSecret, forestServerUrl }); + const agentPort = new AgentClientAgentPort({ client: agentClient, collectionSchemas: {} }); + const runStore = new InMemoryRunStore(); + const aiClient = new AiClient({ aiConfigurations }); + + const runner = new Runner({ + agentPort, + workflowPort, + runStore, + pollingIntervalMs, + aiClient, + envSecret, + authSecret, + httpPort, + }); + + await runner.start(); + console.log(`Workflow executor started on port ${httpPort}`); + + const shutdown = async () => { + console.log('Shutting down...'); + await runner.stop(); + process.exit(0); + }; + + process.on('SIGTERM', shutdown); + process.on('SIGINT', shutdown); +} + +main().catch(err => { + console.error('Failed to start workflow executor:', err); + process.exit(1); +});