Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
{ logger: this.logger },
);

// Observed: server replies 200 with a null id when it declines to persist (PRD-428).
// Surface it once here; later mark-as-* calls then no-op instead of 404-looping.
if (!response.id) {
this.logger.warn('activity log not persisted by server — status updates will be skipped', {
action: args.action,
collectionId: args.collectionId,
});

return { id: null };
}

return { id: response.id, index: response.attributes.index };
} catch (cause) {
this.logger.error('activity log create failed', {
Expand All @@ -53,43 +64,53 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
}

async markSucceeded(handle: ActivityLogHandle): Promise<void> {
// Not persisted (see createPending) → no log to update; skip the 404-retrying status call.
if (!('index' in handle)) return undefined;

const { id, index } = handle;

return this.drainer.track(async () => {
try {
await withRetry(
'activity log mark-as-completed',
() =>
this.service.updateActivityLogStatus({
forestServerToken: this.forestServerToken,
activityLog: { id: handle.id, attributes: { index: handle.index } },
activityLog: { id, attributes: { index } },
status: 'completed',
}),
{ logger: this.logger, extraRetryStatuses: [404] },
);
} catch (err) {
this.logger.error('activity log mark-as-completed failed', {
handleId: handle.id,
handleId: id,
error: extractErrorMessage(err),
});
}
});
}

async markFailed(handle: ActivityLogHandle, errorMessage: string): Promise<void> {
// Not persisted (see markSucceeded) → skip.
if (!('index' in handle)) return undefined;

const { id, index } = handle;

return this.drainer.track(async () => {
try {
await withRetry(
'activity log mark-as-failed',
() =>
this.service.updateActivityLogStatus({
forestServerToken: this.forestServerToken,
activityLog: { id: handle.id, attributes: { index: handle.index } },
activityLog: { id, attributes: { index } },
status: 'failed',
}),
{ logger: this.logger, extraRetryStatuses: [404] },
);
} catch (err) {
this.logger.error('activity log mark-as-failed failed', {
handleId: handle.id,
handleId: id,
stepErrorMessage: errorMessage,
error: extractErrorMessage(err),
});
Expand Down
7 changes: 3 additions & 4 deletions packages/workflow-executor/src/ports/activity-log-port.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ export interface CreateActivityLogArgs {
label?: string;
}

export interface ActivityLogHandle {
id: string;
index: string;
}
// `{ id: null }` means the server declined to persist the log (returns 200 with a null id).
// markSucceeded/markFailed short-circuit on it — there is nothing to update.
export type ActivityLogHandle = { id: string; index: string } | { id: null };

// Per-run scoped port: token baked into the adapter's constructor. markSucceeded/markFailed
// retry transient failures internally and are invoked with `void` from base-step-executor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,45 @@ describe('ForestadminClientActivityLogPort', () => {
expect.objectContaining({ collectionName: '11' }),
);
});

it('warns (with collectionId) and returns a not-persisted handle when the server responds 200 with id:null', async () => {
const service = makeService();
service.createActivityLog.mockResolvedValue({
id: null,
attributes: {},
} as unknown as { id: string; attributes: { index: string } });
const logger = makeLogger();
const port = makePort(service, { logger });

const handle = await port.createPending({
renderingId: 5,
action: 'action',
type: 'write',
collectionId: '11',
});

expect(handle).toEqual({ id: null });
expect(service.createActivityLog).toHaveBeenCalledTimes(1);
expect(logger.warn).toHaveBeenCalledWith(
'activity log not persisted by server — status updates will be skipped',
{ action: 'action', collectionId: '11' },
);
});
});

describe('markSucceeded', () => {
it('skips the status update and drainer tracking when the handle was not persisted', async () => {
const service = makeService();
const drainer = new ActivityLogDrainer();
const trackSpy = jest.spyOn(drainer, 'track');
const port = makePort(service, { drainer });

await port.markSucceeded({ id: null });

expect(service.updateActivityLogStatus).not.toHaveBeenCalled();
expect(trackSpy).not.toHaveBeenCalled();
});

it('retries on 503 and eventually resolves without rethrowing', async () => {
const service = makeService();
service.updateActivityLogStatus
Expand Down Expand Up @@ -214,6 +250,18 @@ describe('ForestadminClientActivityLogPort', () => {
});

describe('markFailed', () => {
it('skips the status update and drainer tracking when the handle was not persisted', async () => {
const service = makeService();
const drainer = new ActivityLogDrainer();
const trackSpy = jest.spyOn(drainer, 'track');
const port = makePort(service, { drainer });

await port.markFailed({ id: null }, 'boom');

expect(service.updateActivityLogStatus).not.toHaveBeenCalled();
expect(trackSpy).not.toHaveBeenCalled();
});

it('sends status: failed (no errorMessage — server schema rejects unknown fields) and retries on 503', async () => {
const service = makeService();
service.updateActivityLogStatus
Expand Down
Loading