Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 56 additions & 14 deletions packages/workflow-executor/src/adapters/agent-client-agent-port.ts
Comment thread
EnkiP marked this conversation as resolved.
Comment thread
macroscopeapp[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
GetActionFormInfoQuery,
GetRecordQuery,
GetRelatedDataQuery,
GetSingleRelatedDataQuery,
UpdateRecordQuery,
} from '../ports/agent-port';
import type SchemaCache from '../schema-cache';
Expand All @@ -22,6 +23,10 @@ import {
extractErrorMessage,
} from '../errors';

function toCamelCase(name: string): string {
return name.replace(/_([a-zA-Z0-9])/g, (_, c: string) => c.toUpperCase());
}

// The agent-client HTTP layer deserializes JSON:API responses with camelCase keys.
// Field names in the schema and in GetRecordQuery.fields use the original format (e.g. snake_case).
// This function restores the original field names so callers can look up values by schema fieldName.
Expand All @@ -34,8 +39,7 @@ function restoreFieldNames(
const camelToOriginal: Record<string, string> = {};

for (const name of originalFieldNames) {
const camelName = name.replace(/_([a-zA-Z0-9])/g, (_, c: string) => c.toUpperCase());
camelToOriginal[camelName] = name;
camelToOriginal[toCamelCase(name)] = name;
}

return Object.fromEntries(Object.entries(values).map(([k, v]) => [camelToOriginal[k] ?? k, v]));
Expand Down Expand Up @@ -111,29 +115,24 @@ export default class AgentClientAgentPort implements AgentPort {
}

async getRelatedData(
{ collection, id, relation, limit, fields }: GetRelatedDataQuery,
{ collection, id, relation, relatedSchema, limit, fields }: GetRelatedDataQuery,
user: StepUser,
): Promise<RecordData[]> {
return this.callAgent('getRelatedData', async () => {
const client = this.createClient(user);
const parentSchema = this.resolveSchema(collection);
const relationField = parentSchema.fields.find(f => f.fieldName === relation);
const relatedCollectionName = relationField?.relatedCollectionName ?? relation;
const relatedSchema = this.resolveSchema(relatedCollectionName);

const records = await client
const rows = await client
.collection(collection)
.relation(relation, id)
.list<Record<string, unknown>>({
...(limit !== null && { pagination: { size: limit, number: 1 } }),
...(fields?.length && { fields }),
});

return records.map(record => {
const restored = restoreFieldNames(record, [
...relatedSchema.primaryKeyFields,
...(fields ?? []),
]);
return rows.map(row => {
const restored = restoreFieldNames(
row,
relatedSchema.fields.map(f => f.fieldName),
);

return {
collectionName: relatedSchema.collectionName,
Expand All @@ -144,6 +143,49 @@ export default class AgentClientAgentPort implements AgentPort {
});
}

// xToOne relations have no /relationships/<relation> route on the agent. We read the
// parent record with a `<relation>@@@<field>` projection and unpack the relation linkage
// jsonapi-serializer emits as a nested object on the parent (with the related PK packed
// under "id" when composite).
async getSingleRelatedData(
{ collection, id, relation, relatedSchema, fields }: GetSingleRelatedDataQuery,
user: StepUser,
): Promise<RecordData | null> {
return this.callAgent('getSingleRelatedData', async () => {
// The agent can't parse multiple sub-fields on one relation in a single projection
// (`fields[store]=id,name` is read as a single field name → ValidationError). The linkage
// `id` carries the (packed) related PK regardless of projection, so project at most ONE
// field: the requested reference field for display, else a single PK field just to pull the
// relation into the response.
const projectedField = fields?.[0] ?? relatedSchema.primaryKeyFields[0];
const parent = await this.getRecord(
{
collection,
id,
fields: [`${relation}@@@${projectedField}`],
},
user,
);

// agent-client camelCases relation keys; look the linkage up under the camelCased name.
const linkage = parent.values[toCamelCase(relation)] as
| Record<string, unknown>
| null
| undefined;
const packedId = linkage?.id as string | undefined;

if (!linkage || !packedId) return null;

const restored = restoreFieldNames(linkage, [projectedField]);

return {
collectionName: relatedSchema.collectionName,
recordId: packedId.split('|'),
values: restored,
};
});
}

async executeAction(
{ collection, action, id }: ExecuteActionQuery,
user: StepUser,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ const ROUTES = {
mcpServerConfigs: '/liana/mcp-server-configs-with-details',
};

// Forest sends relatedCollectionName as a `collection.targetKey` reference (e.g. "store.id");
// normalize it to a plain collection name (the target key lives in relatedPrimaryKey).
function stripReferenceKey(name: string | undefined): string | undefined {
return name?.includes('.') ? name.slice(0, name.lastIndexOf('.')) : name;
}

export default class ForestServerWorkflowPort implements WorkflowPort {
private readonly options: HttpOptions;
private readonly logger: Logger;
Expand Down Expand Up @@ -188,7 +194,15 @@ export default class ForestServerWorkflowPort implements WorkflowPort {
);

try {
return CollectionSchemaSchema.parse(response);
const schema = CollectionSchemaSchema.parse(response);

return {
...schema,
fields: schema.fields.map(field => ({
...field,
relatedCollectionName: stripReferenceKey(field.relatedCollectionName),
})),
};
} catch (err) {
if (err instanceof z.ZodError) {
// runId is passed for observability — the schema call is scoped to a run.
Expand Down
2 changes: 1 addition & 1 deletion packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ export class WorkflowPortError extends WorkflowExecutorError {
`Workflow port "${operation}" failed: ${
cause instanceof Error ? cause.message : String(cause)
}`,
'Failed to communicate with the workflow orchestrator. Please try again.',
"This step couldn't be completed. Please try again, and contact your administrator if the problem continues.",
);
this.cause = cause;
}
Expand Down
Loading
Loading