diff --git a/lambdas/update-letter-queue/src/__tests__/update-letter-queue.test.ts b/lambdas/update-letter-queue/src/__tests__/update-letter-queue.test.ts index 801b7917..4775b00e 100644 --- a/lambdas/update-letter-queue/src/__tests__/update-letter-queue.test.ts +++ b/lambdas/update-letter-queue/src/__tests__/update-letter-queue.test.ts @@ -6,7 +6,6 @@ import { } from "@internal/datastore"; import { mockDeep } from "jest-mock-extended"; import pino from "pino"; -import { MetricStatus } from "@internal/helpers"; import { Context, DynamoDBRecord, @@ -32,9 +31,12 @@ const mockedDeps: jest.Mocked = { env: {} as unknown as EnvVars, } as Deps; -function generateLetter(status: LetterStatus, id?: string): Letter { +function generateLetter( + status: LetterStatus, + overrides?: Partial, +): Letter { return { - id: id || "1", + id: "1", status, specificationId: "spec1", supplierId: "supplier1", @@ -48,6 +50,7 @@ function generateLetter(status: LetterStatus, id?: string): Letter { source: "test-source", subject: "test-subject", billingRef: "billing-ref-1", + ...overrides, }; } @@ -144,8 +147,8 @@ describe("update-letter-queue Lambda", () => { it("returns on the first failure", async () => { const handler = createHandler(mockedDeps); - const newLetter1 = generateLetter("PENDING", "1"); - const newLetter2 = generateLetter("PENDING", "2"); + const newLetter1 = generateLetter("PENDING", { id: "1" }); + const newLetter2 = generateLetter("PENDING", { id: "2" }); (mockedDeps.letterQueueRepository.putLetter as jest.Mock) .mockRejectedValueOnce({}) .mockResolvedValueOnce({}); @@ -164,8 +167,8 @@ describe("update-letter-queue Lambda", () => { it("does not treat a replayed insert as a failure", async () => { const handler = createHandler(mockedDeps); - const newLetter1 = generateLetter("PENDING", "1"); - const newLetter2 = generateLetter("PENDING", "2"); + const newLetter1 = generateLetter("PENDING", { id: "1" }); + const newLetter2 = generateLetter("PENDING", { id: "2" }); (mockedDeps.letterQueueRepository.putLetter as jest.Mock) .mockRejectedValueOnce(new LetterAlreadyExistsError("supplier1", "1")) .mockResolvedValueOnce({}); @@ -181,10 +184,10 @@ describe("update-letter-queue Lambda", () => { it("does not treat a replayed delete as a failure", async () => { const handler = createHandler(mockedDeps); - const oldLetter1 = generateLetter("PENDING", "1"); - const oldLetter2 = generateLetter("PENDING", "2"); - const newLetter1 = generateLetter("ACCEPTED", "1"); - const newLetter2 = generateLetter("ACCEPTED", "2"); + const oldLetter1 = generateLetter("PENDING", { id: "1" }); + const oldLetter2 = generateLetter("PENDING", { id: "2" }); + const newLetter1 = generateLetter("ACCEPTED", { id: "1" }); + const newLetter2 = generateLetter("ACCEPTED", { id: "2" }); (mockedDeps.letterQueueRepository.deleteLetter as jest.Mock) .mockRejectedValueOnce(new LetterDoesNotExistError("supplier1", "1")) .mockResolvedValueOnce({}); @@ -227,26 +230,51 @@ describe("update-letter-queue Lambda", () => { }); describe("Metrics", () => { - it("emits success metrics when all letters are processed successfully", async () => { + // eslint-disable-next-line jest/expect-expect + it("logs a metric containing the delta of pending letters added/deleted", async () => { const handler = createHandler(mockedDeps); - const oldLetter1 = generateLetter("PENDING", "1"); - const newLetter1 = generateLetter("ACCEPTED", "1"); - const newLetter2 = generateLetter("PENDING", "2"); + const oldLetter1 = generateLetter("PENDING", { id: "1" }); + const newLetter1 = generateLetter("ACCEPTED", { id: "1" }); + const newLetter2 = generateLetter("PENDING", { id: "2" }); + const newLetter3 = generateLetter("PENDING", { id: "3" }); const testData = generateKinesisEvent([ generateModifyRecord(oldLetter1, newLetter1), generateInsertRecord(newLetter2), + generateInsertRecord(newLetter3), ]); await handler(testData, mockDeep(), jest.fn()); - assertSuccessMetricLogged(2); - assertFailureMetricLogged(0); + assertQueueDeltaMetricLogged("supplier1", 1); }); - it("emits failure metrics when a letter fails to be inserted", async () => { + // eslint-disable-next-line jest/expect-expect + it("breaks the metric down by supplier", async () => { const handler = createHandler(mockedDeps); - const newLetter1 = generateLetter("PENDING", "1"); - const newLetter2 = generateLetter("PENDING", "2"); + const oldLetter1 = generateLetter("PENDING", { id: "1" }); + const newLetter1 = generateLetter("ACCEPTED", { id: "1" }); + const newLetter2 = generateLetter("PENDING", { + supplierId: "supplier2", + id: "2", + }); + const newLetter3 = generateLetter("PENDING", { id: "3" }); + + const testData = generateKinesisEvent([ + generateModifyRecord(oldLetter1, newLetter1), + generateInsertRecord(newLetter2), + generateInsertRecord(newLetter3), + ]); + await handler(testData, mockDeep(), jest.fn()); + + assertQueueDeltaMetricLogged("supplier1", 0); + assertQueueDeltaMetricLogged("supplier2", 1); + }); + + // eslint-disable-next-line jest/expect-expect + it("counts a failed insert as zero", async () => { + const handler = createHandler(mockedDeps); + const newLetter1 = generateLetter("PENDING", { id: "1" }); + const newLetter2 = generateLetter("PENDING", { id: "2" }); (mockedDeps.letterQueueRepository.putLetter as jest.Mock) .mockResolvedValueOnce({}) .mockRejectedValueOnce(new Error("DynamoDB error")); @@ -257,16 +285,16 @@ describe("update-letter-queue Lambda", () => { ]); await handler(testData, mockDeep(), jest.fn()); - assertSuccessMetricLogged(1); - assertFailureMetricLogged(1); + assertQueueDeltaMetricLogged("supplier1", 1); }); - it("emits failure metrics when a letter fails to be deleted", async () => { + // eslint-disable-next-line jest/expect-expect + it("counts a failed delete as zero", async () => { const handler = createHandler(mockedDeps); - const oldLetter1 = generateLetter("PENDING", "1"); - const oldLetter2 = generateLetter("PENDING", "2"); - const newLetter1 = generateLetter("ACCEPTED", "1"); - const newLetter2 = generateLetter("ACCEPTED", "2"); + const oldLetter1 = generateLetter("PENDING", { id: "1" }); + const oldLetter2 = generateLetter("PENDING", { id: "2" }); + const newLetter1 = generateLetter("ACCEPTED", { id: "1" }); + const newLetter2 = generateLetter("ACCEPTED", { id: "2" }); (mockedDeps.letterQueueRepository.deleteLetter as jest.Mock) .mockResolvedValueOnce({}) .mockRejectedValueOnce(new Error("DynamoDB error")); @@ -277,14 +305,14 @@ describe("update-letter-queue Lambda", () => { ]); await handler(testData, mockDeep(), jest.fn()); - assertSuccessMetricLogged(1); - assertFailureMetricLogged(1); + assertQueueDeltaMetricLogged("supplier1", -1); }); - it("does not count a replayed insert as a success or failure", async () => { + // eslint-disable-next-line jest/expect-expect + it("counts a replayed insert as zero", async () => { const handler = createHandler(mockedDeps); - const newLetter1 = generateLetter("PENDING", "1"); - const newLetter2 = generateLetter("PENDING", "2"); + const newLetter1 = generateLetter("PENDING", { id: "1" }); + const newLetter2 = generateLetter("PENDING", { id: "2" }); (mockedDeps.letterQueueRepository.putLetter as jest.Mock) .mockRejectedValueOnce(new LetterAlreadyExistsError("supplier1", "1")) @@ -296,16 +324,16 @@ describe("update-letter-queue Lambda", () => { ]); await handler(testData, mockDeep(), jest.fn()); - assertSuccessMetricLogged(1); - assertFailureMetricLogged(0); + assertQueueDeltaMetricLogged("supplier1", 1); }); - it("does not count a replayed delete as a success or failure", async () => { + // eslint-disable-next-line jest/expect-expect + it("counts a replayed delete as zero", async () => { const handler = createHandler(mockedDeps); - const oldLetter1 = generateLetter("PENDING", "1"); - const oldLetter2 = generateLetter("PENDING", "2"); - const newLetter1 = generateLetter("ACCEPTED", "1"); - const newLetter2 = generateLetter("ACCEPTED", "2"); + const oldLetter1 = generateLetter("PENDING", { id: "1" }); + const oldLetter2 = generateLetter("PENDING", { id: "2" }); + const newLetter1 = generateLetter("ACCEPTED", { id: "1" }); + const newLetter2 = generateLetter("ACCEPTED", { id: "2" }); (mockedDeps.letterQueueRepository.deleteLetter as jest.Mock) .mockRejectedValueOnce(new LetterDoesNotExistError("supplier1", "1")) .mockResolvedValueOnce({}); @@ -316,19 +344,36 @@ describe("update-letter-queue Lambda", () => { ]); await handler(testData, mockDeep(), jest.fn()); - assertSuccessMetricLogged(1); - assertFailureMetricLogged(0); + assertQueueDeltaMetricLogged("supplier1", -1); }); - it("emits zero success metrics when no pending letters are in the batch", async () => { + // eslint-disable-next-line jest/expect-expect + it("logs zero counts when no pending letters are in the batch", async () => { const handler = createHandler(mockedDeps); const newLetter = generateLetter("PRINTED"); const testData = generateKinesisEvent([generateInsertRecord(newLetter)]); await handler(testData, mockDeep(), jest.fn()); - assertSuccessMetricLogged(0); - assertFailureMetricLogged(0); + assertQueueDeltaMetricNotLogged(); + }); + + it("skips records with no NewImage (e.g. DELETE events) without error", async () => { + const handler = createHandler(mockedDeps); + const deleteRecord: DynamoDBRecord = { + eventName: "REMOVE", + dynamodb: { OldImage: mapToImage(generateLetter("PENDING")) }, + }; + + const testData = generateKinesisEvent([deleteRecord]); + const result = await handler(testData, mockDeep(), jest.fn()); + + expect(mockedDeps.letterQueueRepository.putLetter).not.toHaveBeenCalled(); + expect( + mockedDeps.letterQueueRepository.deleteLetter, + ).not.toHaveBeenCalled(); + expect(result.batchItemFailures).toEqual([]); + assertQueueDeltaMetricNotLogged(); }); }); }); @@ -375,43 +420,42 @@ function mapToImage(oldLetter: Letter) { ); } -function assertSuccessMetricLogged(count: number) { +function assertQueueDeltaMetricLogged(supplierId: string, delta: number) { expect(mockedDeps.logger.info).toHaveBeenCalledWith( expect.objectContaining({ + supplier: supplierId, _aws: expect.objectContaining({ CloudWatchMetrics: expect.arrayContaining([ expect.objectContaining({ Metrics: [ expect.objectContaining({ - Name: MetricStatus.Success, - Value: count, + Name: "QueueDelta", + Value: delta, + Unit: Unit.Count, }), ], }), ]), }), - success: count, + QueueDelta: delta, }), ); } -function assertFailureMetricLogged(count: number) { - expect(mockedDeps.logger.info).toHaveBeenCalledWith( +function assertQueueDeltaMetricNotLogged() { + expect(mockedDeps.logger.info).not.toHaveBeenCalledWith( expect.objectContaining({ _aws: expect.objectContaining({ CloudWatchMetrics: expect.arrayContaining([ expect.objectContaining({ Metrics: [ expect.objectContaining({ - Name: MetricStatus.Failure, - Value: count, - Unit: Unit.Count, + Name: "QueueDelta", }), ], }), ]), }), - failure: count, }), ); } diff --git a/lambdas/update-letter-queue/src/update-letter-queue.ts b/lambdas/update-letter-queue/src/update-letter-queue.ts index 392336fb..a41dfc0b 100644 --- a/lambdas/update-letter-queue/src/update-letter-queue.ts +++ b/lambdas/update-letter-queue/src/update-letter-queue.ts @@ -6,7 +6,7 @@ import { } from "aws-lambda"; import { unmarshall } from "@aws-sdk/util-dynamodb"; import { Unit } from "aws-embedded-metrics"; -import { MetricStatus, buildEMFObject } from "@internal/helpers"; +import { buildEMFObject } from "@internal/helpers"; import { InsertPendingLetter, Letter, @@ -20,6 +20,9 @@ export default function createHandler(deps: Deps): Handler { return async (streamEvent: KinesisStreamEvent) => { let successCount = 0; + // The change in the size of the pending letters queue, keyed by supplier + const deltasBySupplierId = new Map(); + deps.logger.info({ description: "Received event", streamEvent }); deps.logger.info({ description: "Number of records", @@ -31,11 +34,15 @@ export default function createHandler(deps: Deps): Handler { try { if (isNewPendingLetter(ddbRecord)) { - const added = await addPendingLetterToQueue(ddbRecord, deps); - successCount += added ? 1 : 0; + const letter = extractNewOrUpdatedLetter(ddbRecord); + const added = await addPendingLetterToQueue(letter, deps); + updateDeltas(deltasBySupplierId, letter.supplierId, added); + successCount += added; } else if (isNoLongerPending(ddbRecord)) { - const deleted = await deletePendingLetterFromQueue(ddbRecord, deps); - successCount += deleted ? 1 : 0; + const letter = extractNewOrUpdatedLetter(ddbRecord); + const deleted = await deletePendingLetterFromQueue(letter, deps); + updateDeltas(deltasBySupplierId, letter.supplierId, -deleted); + successCount += deleted; } } catch (error) { deps.logger.error({ @@ -43,7 +50,7 @@ export default function createHandler(deps: Deps): Handler { error, ddbRecord, }); - recordProcessing(deps, successCount, 1); + recordProcessing(deps, successCount, 1, deltasBySupplierId); // If we get a failure, return immediately without processing the remaining records. Since we are // working with a Kinesis stream, AWS will retry from the point of failure and no records will be lost. // See https://docs.aws.amazon.com/lambda/latest/dg/example_serverless_Kinesis_Lambda_batch_item_failures_section.html @@ -54,16 +61,15 @@ export default function createHandler(deps: Deps): Handler { }; } } - recordProcessing(deps, successCount, 0); + recordProcessing(deps, successCount, 0, deltasBySupplierId); return { batchItemFailures: [] }; }; } async function addPendingLetterToQueue( - ddbRecord: DynamoDBRecord, + letter: Letter, deps: Deps, -): Promise { - const letter = extractNewLetter(ddbRecord); +): Promise { const pendingLetter = mapLetterToPendingLetter(letter); try { @@ -72,7 +78,7 @@ async function addPendingLetterToQueue( pendingLetter, }); await deps.letterQueueRepository.putLetter(pendingLetter); - return true; + return 1; } catch (error) { if (error instanceof LetterAlreadyExistsError) { deps.logger.warn({ @@ -80,17 +86,16 @@ async function addPendingLetterToQueue( supplierId: pendingLetter.supplierId, letterId: pendingLetter.letterId, }); - return false; + return 0; } throw error; } } async function deletePendingLetterFromQueue( - ddbRecord: DynamoDBRecord, + letter: Letter, deps: Deps, -): Promise { - const letter = extractNewLetter(ddbRecord); +): Promise { try { deps.logger.info({ description: "Deleting pending letter", @@ -98,7 +103,7 @@ async function deletePendingLetterFromQueue( letterId: letter.id, }); await deps.letterQueueRepository.deleteLetter(letter.supplierId, letter.id); - return true; + return 1; } catch (error) { if (error instanceof LetterDoesNotExistError) { deps.logger.warn({ @@ -106,7 +111,7 @@ async function deletePendingLetterFromQueue( supplierId: letter.supplierId, letterId: letter.id, }); - return false; + return 0; } throw error; } @@ -116,6 +121,7 @@ function recordProcessing( deps: Deps, successCount: number, failureCount: number, + deltasBySupplierId: Map, ) { deps.logger.info({ description: "Processing complete", @@ -124,8 +130,9 @@ function recordProcessing( totalProcessed: successCount + failureCount, }); - deps.logger.info(buildMetric(MetricStatus.Success, successCount)); - deps.logger.info(buildMetric(MetricStatus.Failure, failureCount)); + for (const [supplierId, delta] of deltasBySupplierId) { + deps.logger.info(buildMetric(supplierId, delta)); + } } function isNewPendingLetter(record: DynamoDBRecord): boolean { @@ -172,8 +179,8 @@ function extractPayload( } } -function extractNewLetter(record: DynamoDBRecord): Letter { - const newImage = record.dynamodb?.NewImage!; +function extractNewOrUpdatedLetter(record: DynamoDBRecord): Letter { + const newImage = record.dynamodb?.NewImage; return LetterSchema.parse(unmarshall(newImage as any)); } @@ -186,14 +193,23 @@ function mapLetterToPendingLetter(letter: Letter): InsertPendingLetter { }; } -function buildMetric(status: MetricStatus, count: number) { +function buildMetric(supplierId: string, delta: number) { return buildEMFObject( "update-letter-queue", - {}, + { supplier: supplierId }, { - key: status, - value: count, + key: "QueueDelta", + value: delta, unit: Unit.Count, }, ); } + +function updateDeltas( + deltasBySupplierId: Map, + supplierId: string, + delta: number, +): void { + const current = deltasBySupplierId.get(supplierId) ?? 0; + deltasBySupplierId.set(supplierId, current + delta); +} diff --git a/package-lock.json b/package-lock.json index dda49061..d288d6a0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13489,9 +13489,9 @@ } }, "node_modules/flatted": { - "version": "3.3.4", - "resolved": "https://registry.npmjs.org/flatted/-/flatted-3.3.4.tgz", - "integrity": "sha512-3+mMldrTAPdta5kjX2G2J7iX4zxtnwpdA8Tr2ZSjkyPSanvbZAcy6flmtnXbEybHrDcU9641lxrMfFuUxVz9vA==", + "version": "3.4.1", + "resolved": "https://registry.npmjs.org/flatted/-/flatted-3.4.1.tgz", + "integrity": "sha512-IxfVbRFVlV8V/yRaGzk0UVIcsKKHMSfYw66T/u4nTwlWteQePsxe//LjudR1AMX4tZW3WFCh3Zqa/sjlqpbURQ==", "dev": true, "license": "ISC" }, @@ -21829,9 +21829,9 @@ "license": "MIT" }, "node_modules/undici": { - "version": "7.22.0", - "resolved": "https://registry.npmjs.org/undici/-/undici-7.22.0.tgz", - "integrity": "sha512-RqslV2Us5BrllB+JeiZnK4peryVTndy9Dnqq62S3yYRRTj0tFQCwEniUy2167skdGOy3vqRzEvl1Dm4sV2ReDg==", + "version": "7.24.4", + "resolved": "https://registry.npmjs.org/undici/-/undici-7.24.4.tgz", + "integrity": "sha512-BM/JzwwaRXxrLdElV2Uo6cTLEjhSb3WXboncJamZ15NgUURmvlXvxa6xkwIOILIjPNo9i8ku136ZvWV0Uly8+w==", "dev": true, "license": "MIT", "engines": { diff --git a/package.json b/package.json index 7bf1c6b4..8ec3fd89 100644 --- a/package.json +++ b/package.json @@ -66,6 +66,8 @@ "axios": "^1.13.5", "fast-xml-parser": "^5.3.6", "@isaacs/brace-expansion": "^5.0.1", + "flatted": "^3.4.0", + "undici": "^7.24.0", "pretty-format": { "react-is": "19.0.0" }, diff --git a/tests/e2e-tests/poetry.lock b/tests/e2e-tests/poetry.lock index a1f09a32..498e37c3 100644 --- a/tests/e2e-tests/poetry.lock +++ b/tests/e2e-tests/poetry.lock @@ -863,21 +863,21 @@ windows-terminal = ["colorama (>=0.4.6)"] [[package]] name = "pyjwt" -version = "2.10.1" +version = "2.12.1" description = "JSON Web Token implementation in Python" optional = false python-versions = ">=3.9" groups = ["main"] files = [ - {file = "PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb"}, - {file = "pyjwt-2.10.1.tar.gz", hash = "sha256:3cc5772eb20009233caf06e9d8a0577824723b44e6648ee0a2aedb6cf9381953"}, + {file = "pyjwt-2.12.1-py3-none-any.whl", hash = "sha256:28ca37c070cad8ba8cd9790cd940535d40274d22f80ab87f3ac6a713e6e8454c"}, + {file = "pyjwt-2.12.1.tar.gz", hash = "sha256:c74a7a2adf861c04d002db713dd85f84beb242228e671280bf709d765b03672b"}, ] [package.extras] crypto = ["cryptography (>=3.4.0)"] -dev = ["coverage[toml] (==5.0.4)", "cryptography (>=3.4.0)", "pre-commit", "pytest (>=6.0.0,<7.0.0)", "sphinx", "sphinx-rtd-theme", "zope.interface"] +dev = ["coverage[toml] (==7.10.7)", "cryptography (>=3.4.0)", "pre-commit", "pytest (>=8.4.2,<9.0.0)", "sphinx", "sphinx-rtd-theme", "zope.interface"] docs = ["sphinx", "sphinx-rtd-theme", "zope.interface"] -tests = ["coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)"] +tests = ["coverage[toml] (==7.10.7)", "pytest (>=8.4.2,<9.0.0)"] [[package]] name = "pyotp"