Skip to content

Commit 1f28b96

Browse files
committed
feat(agent): proxify ws executor call
1 parent 252b69d commit 1f28b96

10 files changed

Lines changed: 449 additions & 0 deletions

File tree

packages/agent/src/routes/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import ScopeInvalidation from './security/scope-invalidation';
3131
import ErrorHandling from './system/error-handling';
3232
import HealthCheck from './system/healthcheck';
3333
import Logger from './system/logger';
34+
import WorkflowExecutorProxyRoute from './workflow/workflow-executor-proxy';
3435

3536
export const ROOT_ROUTES_CTOR = [
3637
Authentication,
@@ -172,6 +173,12 @@ function getAiRoutes(options: Options, services: Services, aiRouter: AiRouter |
172173
return [new AiProxyRoute(services, options, aiRouter)];
173174
}
174175

176+
function getWorkflowExecutorRoutes(options: Options, services: Services): BaseRoute[] {
177+
if (!options.workflowExecutorUrl) return [];
178+
179+
return [new WorkflowExecutorProxyRoute(services, options)];
180+
}
181+
175182
export default function makeRoutes(
176183
dataSource: DataSource,
177184
options: Options,
@@ -187,6 +194,7 @@ export default function makeRoutes(
187194
...getRelatedRoutes(dataSource, options, services),
188195
...getActionRoutes(dataSource, options, services),
189196
...getAiRoutes(options, services, aiRouter),
197+
...getWorkflowExecutorRoutes(options, services),
190198
];
191199

192200
// Ensure routes and middlewares are loaded in the right order.
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import type { ForestAdminHttpDriverServices } from '../../services';
2+
import type { AgentOptionsWithDefaults } from '../../types';
3+
import type KoaRouter from '@koa/router';
4+
import type { Context } from 'koa';
5+
6+
import { request as httpRequest } from 'http';
7+
import { request as httpsRequest } from 'https';
8+
9+
import { HttpCode, RouteType } from '../../types';
10+
import BaseRoute from '../base-route';
11+
12+
export default class WorkflowExecutorProxyRoute extends BaseRoute {
13+
readonly type = RouteType.PrivateRoute;
14+
private readonly executorUrl: URL;
15+
16+
constructor(services: ForestAdminHttpDriverServices, options: AgentOptionsWithDefaults) {
17+
super(services, options);
18+
// Remove trailing slash for clean URL joining
19+
this.executorUrl = new URL(options.workflowExecutorUrl.replace(/\/+$/, ''));
20+
}
21+
22+
private static readonly AGENT_PREFIX = '/_internal/workflow-executions';
23+
private static readonly EXECUTOR_PREFIX = '/runs';
24+
25+
setupRoutes(router: KoaRouter): void {
26+
router.get('/_internal/workflow-executions/:runId', this.handleProxy.bind(this));
27+
router.post('/_internal/workflow-executions/:runId/trigger', this.handleProxy.bind(this));
28+
router.patch(
29+
'/_internal/workflow-executions/:runId/steps/:stepIndex/pending-data',
30+
this.handleProxy.bind(this),
31+
);
32+
}
33+
34+
private async handleProxy(context: Context): Promise<void> {
35+
// Rewrite /_internal/workflow-executions/... → /runs/...
36+
const executorPath = context.path.replace(
37+
WorkflowExecutorProxyRoute.AGENT_PREFIX,
38+
WorkflowExecutorProxyRoute.EXECUTOR_PREFIX,
39+
);
40+
const targetUrl = new URL(executorPath, this.executorUrl);
41+
42+
const response = await this.forwardRequest(context.method, targetUrl, context.request.body);
43+
44+
context.response.status = response.status;
45+
context.response.body = response.body;
46+
}
47+
48+
private forwardRequest(
49+
method: string,
50+
url: URL,
51+
body?: unknown,
52+
): Promise<{ status: number; body: unknown }> {
53+
const requestFn = url.protocol === 'https:' ? httpsRequest : httpRequest;
54+
55+
return new Promise((resolve, reject) => {
56+
const req = requestFn(
57+
url,
58+
{ method, headers: { 'Content-Type': 'application/json' } },
59+
res => {
60+
const chunks: Uint8Array[] = [];
61+
res.on('data', chunk => chunks.push(chunk));
62+
res.on('end', () => {
63+
const raw = Buffer.concat(chunks).toString('utf-8');
64+
let parsed: unknown;
65+
66+
try {
67+
parsed = JSON.parse(raw);
68+
} catch {
69+
parsed = raw;
70+
}
71+
72+
resolve({ status: res.statusCode ?? HttpCode.InternalServerError, body: parsed });
73+
});
74+
res.on('error', reject);
75+
},
76+
);
77+
78+
req.on('error', reject);
79+
80+
if (body && method !== 'GET') {
81+
req.write(JSON.stringify(body));
82+
}
83+
84+
req.end();
85+
});
86+
}
87+
}

packages/agent/src/types.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ export type AgentOptions = {
4545
*/
4646
ignoreMissingSchemaElementErrors?: boolean;
4747
useUnsafeActionEndpoint?: boolean;
48+
/**
49+
* Base URL of the workflow executor to proxy requests to.
50+
* When set, the agent exposes routes at `/_internal/workflow-executions/`
51+
* that forward to the executor, benefiting from the agent's authentication layer.
52+
* @example 'http://localhost:4001'
53+
*/
54+
workflowExecutorUrl?: string | null;
4855
};
4956
export type AgentOptionsWithDefaults = Readonly<Required<AgentOptions>>;
5057

packages/agent/src/utils/options-validator.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ export default class OptionsValidator {
3838
copyOptions.loggerLevel = copyOptions.loggerLevel || 'Info';
3939
copyOptions.skipSchemaUpdate = copyOptions.skipSchemaUpdate || false;
4040
copyOptions.instantCacheRefresh = copyOptions.instantCacheRefresh ?? true;
41+
copyOptions.workflowExecutorUrl = copyOptions.workflowExecutorUrl ?? null;
4142
copyOptions.maxBodySize = copyOptions.maxBodySize || '50mb';
4243
copyOptions.bodyParserOptions = copyOptions.bodyParserOptions || {
4344
jsonLimit: '50mb',

packages/agent/test/__factories__/forest-admin-http-driver-options.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ export default Factory.define<AgentOptionsWithDefaults>(() => ({
2929
},
3030
ignoreMissingSchemaElementErrors: false,
3131
useUnsafeActionEndpoint: false,
32+
workflowExecutorUrl: null,
3233
}));

packages/agent/test/__factories__/router.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export class RouterFactory extends Factory<Router> {
77
router.get = jest.fn();
88
router.delete = jest.fn();
99
router.use = jest.fn();
10+
router.patch = jest.fn();
1011
router.post = jest.fn();
1112
router.put = jest.fn();
1213
router.all = jest.fn();
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
import http from 'http';
2+
3+
import { createMockContext } from '@shopify/jest-koa-mocks';
4+
5+
import WorkflowExecutorProxyRoute from '../../../src/routes/workflow/workflow-executor-proxy';
6+
import { RouteType } from '../../../src/types';
7+
import * as factories from '../../__factories__';
8+
9+
describe('WorkflowExecutorProxyRoute', () => {
10+
const options = factories.forestAdminHttpDriverOptions.build();
11+
const services = factories.forestAdminHttpDriverServices.build();
12+
const router = factories.router.mockAllMethods().build();
13+
14+
let executorServer: http.Server;
15+
let executorPort: number;
16+
17+
// Start a real HTTP server to act as the workflow executor
18+
beforeAll(async () => {
19+
executorServer = http.createServer((req, res) => {
20+
const chunks: Uint8Array[] = [];
21+
req.on('data', chunk => chunks.push(chunk));
22+
req.on('end', () => {
23+
const body = Buffer.concat(chunks).toString('utf-8');
24+
25+
res.setHeader('Content-Type', 'application/json');
26+
27+
if (req.url?.includes('not-found')) {
28+
res.writeHead(404);
29+
res.end(JSON.stringify({ error: 'Run not found or unavailable' }));
30+
} else if (req.method === 'GET' && req.url?.match(/^\/runs\/[\w-]+$/)) {
31+
res.writeHead(200);
32+
res.end(JSON.stringify({ steps: [{ stepId: 's1', status: 'success' }] }));
33+
} else if (req.method === 'POST' && req.url?.match(/^\/runs\/[\w-]+\/trigger$/)) {
34+
res.writeHead(200);
35+
res.end(JSON.stringify({ triggered: true }));
36+
} else if (
37+
req.method === 'PATCH' &&
38+
req.url?.match(/^\/runs\/[\w-]+\/steps\/\d+\/pending-data$/)
39+
) {
40+
const parsed = body ? JSON.parse(body) : {};
41+
res.writeHead(200);
42+
res.end(JSON.stringify({ updated: true, received: parsed }));
43+
} else {
44+
res.writeHead(404);
45+
res.end(JSON.stringify({ error: 'Not found' }));
46+
}
47+
});
48+
});
49+
50+
await new Promise<void>((resolve, reject) => {
51+
executorServer.listen(0, () => {
52+
executorPort = (executorServer.address() as { port: number }).port;
53+
resolve();
54+
});
55+
executorServer.on('error', reject);
56+
});
57+
});
58+
59+
afterAll(async () => {
60+
await new Promise<void>((resolve, reject) => {
61+
executorServer.close(err => (err ? reject(err) : resolve()));
62+
});
63+
});
64+
65+
beforeEach(() => {
66+
jest.clearAllMocks();
67+
});
68+
69+
const buildOptions = (url: string) =>
70+
factories.forestAdminHttpDriverOptions.build({ workflowExecutorUrl: url });
71+
72+
describe('constructor', () => {
73+
test('should have RouteType.PrivateRoute', () => {
74+
const route = new WorkflowExecutorProxyRoute(services, buildOptions('http://localhost:4001'));
75+
76+
expect(route.type).toBe(RouteType.PrivateRoute);
77+
});
78+
});
79+
80+
describe('setupRoutes', () => {
81+
test('should register GET, POST and PATCH routes', () => {
82+
const route = new WorkflowExecutorProxyRoute(services, buildOptions('http://localhost:4001'));
83+
route.setupRoutes(router);
84+
85+
expect(router.get).toHaveBeenCalledWith(
86+
'/_internal/workflow-executions/:runId',
87+
expect.any(Function),
88+
);
89+
expect(router.post).toHaveBeenCalledWith(
90+
'/_internal/workflow-executions/:runId/trigger',
91+
expect.any(Function),
92+
);
93+
expect(router.patch).toHaveBeenCalledWith(
94+
'/_internal/workflow-executions/:runId/steps/:stepIndex/pending-data',
95+
expect.any(Function),
96+
);
97+
});
98+
});
99+
100+
describe('handleProxy', () => {
101+
test('should forward GET /runs/:runId and return executor response', async () => {
102+
const route = new WorkflowExecutorProxyRoute(
103+
services,
104+
buildOptions(`http://localhost:${executorPort}`),
105+
);
106+
107+
const context = createMockContext({
108+
customProperties: { params: { runId: 'run-123' } },
109+
});
110+
Object.defineProperty(context, 'path', {
111+
value: '/_internal/workflow-executions/run-123',
112+
});
113+
114+
await (route as any).handleProxy(context);
115+
116+
expect(context.response.status).toBe(200);
117+
expect(context.response.body).toEqual({
118+
steps: [{ stepId: 's1', status: 'success' }],
119+
});
120+
});
121+
122+
test('should forward POST /runs/:runId/trigger and return executor response', async () => {
123+
const route = new WorkflowExecutorProxyRoute(
124+
services,
125+
buildOptions(`http://localhost:${executorPort}`),
126+
);
127+
128+
const context = createMockContext({
129+
method: 'POST',
130+
customProperties: { params: { runId: 'run-456' } },
131+
});
132+
Object.defineProperty(context, 'path', {
133+
value: '/_internal/workflow-executions/run-456/trigger',
134+
});
135+
136+
await (route as any).handleProxy(context);
137+
138+
expect(context.response.status).toBe(200);
139+
expect(context.response.body).toEqual({ triggered: true });
140+
});
141+
142+
test('should forward error status from executor', async () => {
143+
const route = new WorkflowExecutorProxyRoute(
144+
services,
145+
buildOptions(`http://localhost:${executorPort}`),
146+
);
147+
148+
const context = createMockContext({
149+
customProperties: { params: { runId: 'not-found' } },
150+
});
151+
Object.defineProperty(context, 'path', {
152+
value: '/_internal/workflow-executions/not-found',
153+
});
154+
155+
await (route as any).handleProxy(context);
156+
157+
expect(context.response.status).toBe(404);
158+
expect(context.response.body).toEqual({ error: 'Run not found or unavailable' });
159+
});
160+
161+
test('should forward PATCH pending-data and pass request body', async () => {
162+
const route = new WorkflowExecutorProxyRoute(
163+
services,
164+
buildOptions(`http://localhost:${executorPort}`),
165+
);
166+
167+
const context = createMockContext({
168+
method: 'PATCH',
169+
customProperties: { params: { runId: 'run-789', stepIndex: '2' } },
170+
requestBody: { fieldValues: { name: 'updated' } },
171+
});
172+
Object.defineProperty(context, 'path', {
173+
value: '/_internal/workflow-executions/run-789/steps/2/pending-data',
174+
});
175+
176+
await (route as any).handleProxy(context);
177+
178+
expect(context.response.status).toBe(200);
179+
expect(context.response.body).toEqual({
180+
updated: true,
181+
received: { fieldValues: { name: 'updated' } },
182+
});
183+
});
184+
185+
test('should reject when executor is unreachable', async () => {
186+
const route = new WorkflowExecutorProxyRoute(
187+
services,
188+
buildOptions('http://localhost:1'), // port that should be unreachable
189+
);
190+
191+
const context = createMockContext({
192+
customProperties: { params: { runId: 'run-789' } },
193+
});
194+
Object.defineProperty(context, 'path', {
195+
value: '/_internal/workflow-executions/run-789',
196+
});
197+
198+
await expect((route as any).handleProxy(context)).rejects.toThrow();
199+
});
200+
});
201+
});
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
**/node_modules
2+
**/dist
3+
**/coverage
4+
**/.git
5+
**/*.test.ts
6+
**/test

0 commit comments

Comments
 (0)