18 commits
3eee9b2  fix: improve readability of retry message assertions in tests (stuartp44, Dec 18, 2025)
3e24ec6  Initial plan (Copilot, Dec 18, 2025)
90874d7  fix: move publishRetryMessage to end of loop and skip invalid messages (Copilot, Dec 18, 2025)
11c1578  refactor: improve performance and reduce code duplication (Copilot, Dec 18, 2025)
b5399f6  refactor: simplify naming and reduce complexity (Copilot, Dec 18, 2025)
cba7bbf  ci: run lambda workflow on all PRs, not just PRs to main (Copilot, Dec 18, 2025)
019ef82  Update lambdas/functions/control-plane/src/scale-runners/scale-up.ts (stuartp44, Dec 21, 2025)
20ec578  Merge branch 'main' into stu/fix_job_retry (stuartp44, Dec 21, 2025)
a218024  Add listEC2Runners verification to maximum runners test (#4980) (Copilot, Jan 5, 2026)
dd365f1  Merge branch 'stu/fix_job_retry' into copilot/sub-pr-4961 (stuartp44, Jan 5, 2026)
ef5c981  fix: apply prettier formatting fixes (Copilot, Jan 5, 2026)
0243458  fix: resolve merge conflicts from stu/fix_job_retry branch (Copilot, Jan 5, 2026)
c2712a6  revert: remove unnecessary formatting and workflow changes (Copilot, Jan 8, 2026)
311001e  fix: improve readability of retry message assertions in tests (stuartp44, Dec 18, 2025)
a476eb0  Update lambdas/functions/control-plane/src/scale-runners/scale-up.ts (stuartp44, Dec 21, 2025)
63d1914  Add listEC2Runners verification to maximum runners test (#4980) (Copilot, Jan 5, 2026)
8c0d34e  Merge branch 'main' into stu/fix_job_retry (npalm, Jan 13, 2026)
68b1794  Merge branch 'copilot/sub-pr-4961' into stu/fix_job_retry (Brend-Smits, Jan 16, 2026)
172 changes: 172 additions & 0 deletions lambdas/functions/control-plane/src/scale-runners/scale-up.test.ts
@@ -10,6 +10,7 @@ import { createRunner, listEC2Runners } from './../aws/runners';
import { RunnerInputParameters } from './../aws/runners.d';
import * as scaleUpModule from './scale-up';
import { getParameter } from '@aws-github-runner/aws-ssm-util';
import { publishRetryMessage } from './job-retry';
import { describe, it, expect, beforeEach, vi } from 'vitest';
import type { Octokit } from '@octokit/rest';

@@ -33,6 +34,7 @@ const mockCreateRunner = vi.mocked(createRunner);
const mockListRunners = vi.mocked(listEC2Runners);
const mockSSMClient = mockClient(SSMClient);
const mockSSMgetParameter = vi.mocked(getParameter);
const mockPublishRetryMessage = vi.mocked(publishRetryMessage);

vi.mock('@octokit/rest', () => ({
Octokit: vi.fn().mockImplementation(function () {
@@ -63,6 +65,11 @@ vi.mock('@aws-github-runner/aws-ssm-util', async () => {
};
});

vi.mock('./job-retry', () => ({
publishRetryMessage: vi.fn(),
checkAndRetryJob: vi.fn(),
}));

export type RunnerType = 'ephemeral' | 'non-ephemeral';

// for ephemeral and non-ephemeral runners
@@ -1680,6 +1687,171 @@ describe('scaleUp with Github Data Residency', () => {
});
});

describe('Retry mechanism tests', () => {
beforeEach(() => {
process.env.ENABLE_ORGANIZATION_RUNNERS = 'true';
process.env.ENABLE_EPHEMERAL_RUNNERS = 'true';
process.env.ENABLE_JOB_QUEUED_CHECK = 'true';
process.env.RUNNERS_MAXIMUM_COUNT = '10';
expectedRunnerParams = { ...EXPECTED_RUNNER_PARAMS };
mockSSMClient.reset();
});

const createTestMessages = (
count: number,
overrides: Partial<scaleUpModule.ActionRequestMessageSQS>[] = [],
): scaleUpModule.ActionRequestMessageSQS[] => {
return Array.from({ length: count }, (_, i) => ({
...TEST_DATA_SINGLE,
id: i + 1,
messageId: `message-${i + 1}`,
...overrides[i],
}));
};

it('calls publishRetryMessage for each valid message when job is queued', async () => {
const messages = createTestMessages(3);
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890', 'i-abcdef']); // Create all requested runners

await scaleUpModule.scaleUp(messages);

expect(mockPublishRetryMessage).toHaveBeenCalledTimes(3);
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
id: 1,
messageId: 'message-1',
}),
);
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
id: 2,
messageId: 'message-2',
}),
);
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
3,
expect.objectContaining({
id: 3,
messageId: 'message-3',
}),
);
});

it('does not call publishRetryMessage when job is not queued', async () => {
mockOctokit.actions.getJobForWorkflowRun.mockImplementation((params) => {
const isQueued = params.job_id === 1; // Only job 1 is queued
return {
data: {
status: isQueued ? 'queued' : 'completed',
},
};
});

const messages = createTestMessages(3);

await scaleUpModule.scaleUp(messages);

// Only message with id 1 should trigger retry
expect(mockPublishRetryMessage).toHaveBeenCalledTimes(1);
expect(mockPublishRetryMessage).toHaveBeenCalledWith(
expect.objectContaining({
id: 1,
messageId: 'message-1',
}),
);
});

it('does not call publishRetryMessage when maximum runners is reached and messages are marked invalid', async () => {
process.env.RUNNERS_MAXIMUM_COUNT = '0'; // No runners can be created

const messages = createTestMessages(2);

await scaleUpModule.scaleUp(messages);

// Verify listEC2Runners is called to check current runner count
expect(listEC2Runners).toHaveBeenCalledWith({
environment: 'unit-test-environment',
runnerType: 'Org',
runnerOwner: TEST_DATA_SINGLE.repositoryOwner,
});

// publishRetryMessage should NOT be called because messages are marked as invalid
// Invalid messages go back to the SQS queue and will be retried there
expect(mockPublishRetryMessage).not.toHaveBeenCalled();
expect(createRunner).not.toHaveBeenCalled();
});

it('calls publishRetryMessage with correct message structure including retry counter', async () => {
const message = {
...TEST_DATA_SINGLE,
messageId: 'test-message-id',
retryCounter: 2,
};

await scaleUpModule.scaleUp([message]);

expect(mockPublishRetryMessage).toHaveBeenCalledWith(
expect.objectContaining({
id: message.id,
messageId: 'test-message-id',
retryCounter: 2,
}),
);
});

it('calls publishRetryMessage when ENABLE_JOB_QUEUED_CHECK is false', async () => {
process.env.ENABLE_JOB_QUEUED_CHECK = 'false';
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890']); // Create all requested runners

const messages = createTestMessages(2);

await scaleUpModule.scaleUp(messages);

// Should always call publishRetryMessage when queue check is disabled
expect(mockPublishRetryMessage).toHaveBeenCalledTimes(2);
expect(mockOctokit.actions.getJobForWorkflowRun).not.toHaveBeenCalled();
});

it('calls publishRetryMessage for each message in a multi-runner scenario', async () => {
mockCreateRunner.mockResolvedValue(['i-12345', 'i-67890', 'i-abcdef', 'i-11111', 'i-22222']); // Create all requested runners
const messages = createTestMessages(5);

await scaleUpModule.scaleUp(messages);

expect(mockPublishRetryMessage).toHaveBeenCalledTimes(5);
messages.forEach((msg, index) => {
expect(mockPublishRetryMessage).toHaveBeenNthCalledWith(
index + 1,
expect.objectContaining({
id: msg.id,
messageId: msg.messageId,
}),
);
});
});

it('calls publishRetryMessage after runner creation', async () => {
const messages = createTestMessages(1);
mockCreateRunner.mockResolvedValue(['i-12345']); // Create the requested runner

const callOrder: string[] = [];
mockPublishRetryMessage.mockImplementation(() => {
callOrder.push('publishRetryMessage');
return Promise.resolve();
});
mockCreateRunner.mockImplementation(async () => {
callOrder.push('createRunner');
return ['i-12345'];
});

await scaleUpModule.scaleUp(messages);

expect(callOrder).toEqual(['createRunner', 'publishRetryMessage']);
});
});

function defaultOctokitMockImpl() {
mockOctokit.actions.getJobForWorkflowRun.mockImplementation(() => ({
data: {
30 changes: 24 additions & 6 deletions lambdas/functions/control-plane/src/scale-runners/scale-up.ts
@@ -7,6 +7,7 @@ import { createGithubAppAuth, createGithubInstallationAuth, createOctokitClient
import { createRunner, listEC2Runners, tag } from './../aws/runners';
import { RunnerInputParameters } from './../aws/runners.d';
import { metricGitHubAppRateLimit } from '../github/rate-limit';
import { publishRetryMessage } from './job-retry';

const logger = createChildLogger('scale-up');

@@ -276,7 +277,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
};

const validMessages = new Map<string, MessagesWithClient>();
const invalidMessages: string[] = [];
const rejectedMessageIds = new Set<string>();
for (const payload of payloads) {
const { eventType, messageId, repositoryName, repositoryOwner } = payload;
if (ephemeralEnabled && eventType !== 'workflow_job') {
@@ -285,7 +286,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
{ eventType, messageId },
);

invalidMessages.push(messageId);
rejectedMessageIds.add(messageId);

continue;
}
@@ -340,6 +341,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
for (const [group, { githubInstallationClient, messages }] of validMessages.entries()) {
// Work out how much we want to scale up by.
let scaleUp = 0;
const queuedMessages: ActionRequestMessageSQS[] = [];

for (const message of messages) {
const messageLogger = logger.createChild({
@@ -358,6 +360,7 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
}

scaleUp++;
queuedMessages.push(message);
}

if (scaleUp === 0) {
@@ -393,11 +396,18 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
if (ephemeralEnabled) {
// This removes `missingInstanceCount` items from the start of the array
// so that, if we retry more messages later, we pick fresh ones.
invalidMessages.push(...messages.splice(0, missingInstanceCount).map(({ messageId }) => messageId));
const removedMessages = messages.splice(0, missingInstanceCount);
removedMessages.forEach(({ messageId }) => rejectedMessageIds.add(messageId));
}

// No runners will be created, so skip calling the EC2 API.
if (missingInstanceCount === scaleUp) {
if (newRunners <= 0) {
// Publish retry messages for messages that are not rejected
for (const message of queuedMessages) {
if (!rejectedMessageIds.has(message.messageId)) {
await publishRetryMessage(message as ActionRequestMessageRetry);
}
}
continue;
}
}
@@ -450,11 +460,19 @@ export async function scaleUp(payloads: ActionRequestMessageSQS[]): Promise<stri
failedInstanceCount,
});

invalidMessages.push(...messages.slice(0, failedInstanceCount).map(({ messageId }) => messageId));
const failedMessages = messages.slice(0, failedInstanceCount);
failedMessages.forEach(({ messageId }) => rejectedMessageIds.add(messageId));
}

// Publish retry messages for messages that are not rejected
for (const message of queuedMessages) {
if (!rejectedMessageIds.has(message.messageId)) {
await publishRetryMessage(message as ActionRequestMessageRetry);
}
}
}

return invalidMessages;
return Array.from(rejectedMessageIds);
}

export function getGitHubEnterpriseApiUrl() {
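
For readers skimming the diff, a minimal self-contained sketch of the retry-publication pattern the scale-up.ts changes introduce: retry messages are published only after runner creation, and only for queued messages that were not marked rejected (over capacity, or failed instance creation). The `QueuedMessage` type and the `publishRetry` callback below are simplified stand-ins for illustration; they are not the module's actual `ActionRequestMessageSQS` or `publishRetryMessage` signatures.

```ts
// Illustration only: simplified stand-in types, not the control-plane module's real ones.
type QueuedMessage = { messageId: string };

async function publishRetriesForAccepted(
  queuedMessages: QueuedMessage[],
  rejectedMessageIds: Set<string>,
  publishRetry: (message: QueuedMessage) => Promise<void>,
): Promise<void> {
  // Runs after runner creation: only messages that were not marked rejected
  // (capacity exceeded or instance creation failed) get a retry message published.
  for (const message of queuedMessages) {
    if (!rejectedMessageIds.has(message.messageId)) {
      await publishRetry(message);
    }
  }
}

// Example: message-2 was rejected (e.g. maximum runners reached), so only
// message-1 and message-3 get retries; message-2 goes back to the SQS queue instead.
const rejected = new Set(['message-2']);
const queued = [{ messageId: 'message-1' }, { messageId: 'message-2' }, { messageId: 'message-3' }];
void publishRetriesForAccepted(queued, rejected, async (m) => {
  console.log(`publishRetryMessage(${m.messageId})`);
});
```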