Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/agent/src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import ScopeInvalidation from './security/scope-invalidation';
import ErrorHandling from './system/error-handling';
import HealthCheck from './system/healthcheck';
import Logger from './system/logger';
import WorkflowExecutorProxyRoute from './workflow/workflow-executor-proxy';

export const ROOT_ROUTES_CTOR = [
Authentication,
Expand Down Expand Up @@ -172,6 +173,12 @@ function getAiRoutes(options: Options, services: Services, aiRouter: AiRouter |
return [new AiProxyRoute(services, options, aiRouter)];
}

function getWorkflowExecutorRoutes(options: Options, services: Services): BaseRoute[] {
if (!options.workflowExecutorUrl) return [];

return [new WorkflowExecutorProxyRoute(services, options)];
}

export default function makeRoutes(
dataSource: DataSource,
options: Options,
Expand All @@ -187,6 +194,7 @@ export default function makeRoutes(
...getRelatedRoutes(dataSource, options, services),
...getActionRoutes(dataSource, options, services),
...getAiRoutes(options, services, aiRouter),
...getWorkflowExecutorRoutes(options, services),
];

// Ensure routes and middlewares are loaded in the right order.
Expand Down
87 changes: 87 additions & 0 deletions packages/agent/src/routes/workflow/workflow-executor-proxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import type { ForestAdminHttpDriverServices } from '../../services';
import type { AgentOptionsWithDefaults } from '../../types';
import type KoaRouter from '@koa/router';
import type { Context } from 'koa';

import { request as httpRequest } from 'http';
import { request as httpsRequest } from 'https';

import { HttpCode, RouteType } from '../../types';
import BaseRoute from '../base-route';

export default class WorkflowExecutorProxyRoute extends BaseRoute {
readonly type = RouteType.PrivateRoute;
private readonly executorUrl: URL;

constructor(services: ForestAdminHttpDriverServices, options: AgentOptionsWithDefaults) {
super(services, options);
// Remove trailing slash for clean URL joining
this.executorUrl = new URL(options.workflowExecutorUrl.replace(/\/+$/, ''));
}

private static readonly AGENT_PREFIX = '/_internal/workflow-executions';
private static readonly EXECUTOR_PREFIX = '/runs';

setupRoutes(router: KoaRouter): void {
router.get('/_internal/workflow-executions/:runId', this.handleProxy.bind(this));
router.post('/_internal/workflow-executions/:runId/trigger', this.handleProxy.bind(this));
router.patch(
'/_internal/workflow-executions/:runId/steps/:stepIndex/pending-data',
this.handleProxy.bind(this),
);
}

private async handleProxy(context: Context): Promise<void> {
// Rewrite /_internal/workflow-executions/... → /runs/...
const executorPath = context.path.replace(
WorkflowExecutorProxyRoute.AGENT_PREFIX,
WorkflowExecutorProxyRoute.EXECUTOR_PREFIX,
);
Comment on lines +36 to +39
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Medium workflow/workflow-executor-proxy.ts:36

handleProxy uses context.path to construct the target URL, but path contains only the pathname without the query string. Requests with query parameters (e.g., ?foo=bar) lose those parameters when forwarded to the executor. Consider using context.url instead, which includes both the pathname and query string.

-    const executorPath = context.path.replace(
+    const executorPath = context.url.replace(
       WorkflowExecutorProxyRoute.AGENT_PREFIX,
       WorkflowExecutorProxyRoute.EXECUTOR_PREFIX,
     );
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file packages/agent/src/routes/workflow/workflow-executor-proxy.ts around lines 36-39:

`handleProxy` uses `context.path` to construct the target URL, but `path` contains only the pathname without the query string. Requests with query parameters (e.g., `?foo=bar`) lose those parameters when forwarded to the executor. Consider using `context.url` instead, which includes both the pathname and query string.

Evidence trail:
1. packages/agent/src/routes/workflow/workflow-executor-proxy.ts lines 36-40 (REVIEWED_COMMIT): Code uses `context.path.replace()` to construct `executorPath`, then `new URL(executorPath, this.executorUrl)` to create the target URL.
2. Koa documentation at https://koajs.com confirms: `request.path` - "Get request pathname" and `request.querystring` - "Get raw query string void of ?" - these are separate properties, confirming `path` does not include query parameters.

const targetUrl = new URL(executorPath, this.executorUrl);

const response = await this.forwardRequest(context.method, targetUrl, context.request.body);

context.response.status = response.status;
context.response.body = response.body;
}

private forwardRequest(
method: string,
url: URL,
body?: unknown,
): Promise<{ status: number; body: unknown }> {
const requestFn = url.protocol === 'https:' ? httpsRequest : httpRequest;

return new Promise((resolve, reject) => {
const req = requestFn(
url,
{ method, headers: { 'Content-Type': 'application/json' } },
res => {
const chunks: Uint8Array[] = [];
res.on('data', chunk => chunks.push(chunk));
res.on('end', () => {
const raw = Buffer.concat(chunks).toString('utf-8');
let parsed: unknown;

try {
parsed = JSON.parse(raw);
} catch {
parsed = raw;
}

resolve({ status: res.statusCode ?? HttpCode.InternalServerError, body: parsed });
});
res.on('error', reject);
},
);

req.on('error', reject);

if (body && method !== 'GET') {
req.write(JSON.stringify(body));
}

req.end();
});
}
}
7 changes: 7 additions & 0 deletions packages/agent/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ export type AgentOptions = {
*/
ignoreMissingSchemaElementErrors?: boolean;
useUnsafeActionEndpoint?: boolean;
/**
* Base URL of the workflow executor to proxy requests to.
* When set, the agent exposes routes at `/_internal/workflow-executions/`
* that forward to the executor, benefiting from the agent's authentication layer.
* @example 'http://localhost:4001'
*/
workflowExecutorUrl?: string | null;
};
export type AgentOptionsWithDefaults = Readonly<Required<AgentOptions>>;

Expand Down
1 change: 1 addition & 0 deletions packages/agent/src/utils/options-validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export default class OptionsValidator {
copyOptions.loggerLevel = copyOptions.loggerLevel || 'Info';
copyOptions.skipSchemaUpdate = copyOptions.skipSchemaUpdate || false;
copyOptions.instantCacheRefresh = copyOptions.instantCacheRefresh ?? true;
copyOptions.workflowExecutorUrl = copyOptions.workflowExecutorUrl ?? null;
copyOptions.maxBodySize = copyOptions.maxBodySize || '50mb';
copyOptions.bodyParserOptions = copyOptions.bodyParserOptions || {
jsonLimit: '50mb',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ export default Factory.define<AgentOptionsWithDefaults>(() => ({
},
ignoreMissingSchemaElementErrors: false,
useUnsafeActionEndpoint: false,
workflowExecutorUrl: null,
}));
1 change: 1 addition & 0 deletions packages/agent/test/__factories__/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export class RouterFactory extends Factory<Router> {
router.get = jest.fn();
router.delete = jest.fn();
router.use = jest.fn();
router.patch = jest.fn();
router.post = jest.fn();
router.put = jest.fn();
router.all = jest.fn();
Expand Down
201 changes: 201 additions & 0 deletions packages/agent/test/routes/workflow/workflow-executor-proxy.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import http from 'http';

import { createMockContext } from '@shopify/jest-koa-mocks';

import WorkflowExecutorProxyRoute from '../../../src/routes/workflow/workflow-executor-proxy';
import { RouteType } from '../../../src/types';
import * as factories from '../../__factories__';

describe('WorkflowExecutorProxyRoute', () => {
const options = factories.forestAdminHttpDriverOptions.build();
const services = factories.forestAdminHttpDriverServices.build();
const router = factories.router.mockAllMethods().build();

let executorServer: http.Server;
let executorPort: number;

// Start a real HTTP server to act as the workflow executor
beforeAll(async () => {
executorServer = http.createServer((req, res) => {
const chunks: Uint8Array[] = [];
req.on('data', chunk => chunks.push(chunk));
req.on('end', () => {
const body = Buffer.concat(chunks).toString('utf-8');

res.setHeader('Content-Type', 'application/json');

if (req.url?.includes('not-found')) {
res.writeHead(404);
res.end(JSON.stringify({ error: 'Run not found or unavailable' }));
} else if (req.method === 'GET' && req.url?.match(/^\/runs\/[\w-]+$/)) {
res.writeHead(200);
res.end(JSON.stringify({ steps: [{ stepId: 's1', status: 'success' }] }));
} else if (req.method === 'POST' && req.url?.match(/^\/runs\/[\w-]+\/trigger$/)) {
res.writeHead(200);
res.end(JSON.stringify({ triggered: true }));
} else if (
req.method === 'PATCH' &&
req.url?.match(/^\/runs\/[\w-]+\/steps\/\d+\/pending-data$/)
) {
const parsed = body ? JSON.parse(body) : {};
res.writeHead(200);
res.end(JSON.stringify({ updated: true, received: parsed }));
} else {
res.writeHead(404);
res.end(JSON.stringify({ error: 'Not found' }));
}
});
});

await new Promise<void>((resolve, reject) => {
executorServer.listen(0, () => {
executorPort = (executorServer.address() as { port: number }).port;
resolve();
});
executorServer.on('error', reject);
});
});

afterAll(async () => {
await new Promise<void>((resolve, reject) => {
executorServer.close(err => (err ? reject(err) : resolve()));
});
});

beforeEach(() => {
jest.clearAllMocks();
});

const buildOptions = (url: string) =>
factories.forestAdminHttpDriverOptions.build({ workflowExecutorUrl: url });

describe('constructor', () => {
test('should have RouteType.PrivateRoute', () => {
const route = new WorkflowExecutorProxyRoute(services, buildOptions('http://localhost:4001'));

expect(route.type).toBe(RouteType.PrivateRoute);
});
});

describe('setupRoutes', () => {
test('should register GET, POST and PATCH routes', () => {
const route = new WorkflowExecutorProxyRoute(services, buildOptions('http://localhost:4001'));
route.setupRoutes(router);

expect(router.get).toHaveBeenCalledWith(
'/_internal/workflow-executions/:runId',
expect.any(Function),
);
expect(router.post).toHaveBeenCalledWith(
'/_internal/workflow-executions/:runId/trigger',
expect.any(Function),
);
expect(router.patch).toHaveBeenCalledWith(
'/_internal/workflow-executions/:runId/steps/:stepIndex/pending-data',
expect.any(Function),
);
});
});

describe('handleProxy', () => {
test('should forward GET /runs/:runId and return executor response', async () => {
const route = new WorkflowExecutorProxyRoute(
services,
buildOptions(`http://localhost:${executorPort}`),
);

const context = createMockContext({
customProperties: { params: { runId: 'run-123' } },
});
Object.defineProperty(context, 'path', {
value: '/_internal/workflow-executions/run-123',
});

await (route as any).handleProxy(context);

expect(context.response.status).toBe(200);
expect(context.response.body).toEqual({
steps: [{ stepId: 's1', status: 'success' }],
});
});

test('should forward POST /runs/:runId/trigger and return executor response', async () => {
const route = new WorkflowExecutorProxyRoute(
services,
buildOptions(`http://localhost:${executorPort}`),
);

const context = createMockContext({
method: 'POST',
customProperties: { params: { runId: 'run-456' } },
});
Object.defineProperty(context, 'path', {
value: '/_internal/workflow-executions/run-456/trigger',
});

await (route as any).handleProxy(context);

expect(context.response.status).toBe(200);
expect(context.response.body).toEqual({ triggered: true });
});

test('should forward error status from executor', async () => {
const route = new WorkflowExecutorProxyRoute(
services,
buildOptions(`http://localhost:${executorPort}`),
);

const context = createMockContext({
customProperties: { params: { runId: 'not-found' } },
});
Object.defineProperty(context, 'path', {
value: '/_internal/workflow-executions/not-found',
});

await (route as any).handleProxy(context);

expect(context.response.status).toBe(404);
expect(context.response.body).toEqual({ error: 'Run not found or unavailable' });
});

test('should forward PATCH pending-data and pass request body', async () => {
const route = new WorkflowExecutorProxyRoute(
services,
buildOptions(`http://localhost:${executorPort}`),
);

const context = createMockContext({
method: 'PATCH',
customProperties: { params: { runId: 'run-789', stepIndex: '2' } },
requestBody: { fieldValues: { name: 'updated' } },
});
Object.defineProperty(context, 'path', {
value: '/_internal/workflow-executions/run-789/steps/2/pending-data',
});

await (route as any).handleProxy(context);

expect(context.response.status).toBe(200);
expect(context.response.body).toEqual({
updated: true,
received: { fieldValues: { name: 'updated' } },
});
});

test('should reject when executor is unreachable', async () => {
const route = new WorkflowExecutorProxyRoute(
services,
buildOptions('http://localhost:1'), // port that should be unreachable
);

const context = createMockContext({
customProperties: { params: { runId: 'run-789' } },
});
Object.defineProperty(context, 'path', {
value: '/_internal/workflow-executions/run-789',
});

await expect((route as any).handleProxy(context)).rejects.toThrow();
});
});
});
6 changes: 6 additions & 0 deletions packages/workflow-executor/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
**/node_modules
**/dist
**/coverage
**/.git
**/*.test.ts
**/test
Loading
Loading