Skip to content
1 change: 1 addition & 0 deletions infrastructure/terraform/modules/eventpub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
| Name | Source | Version |
|------|--------|---------|
| <a name="module_s3bucket_event_cache"></a> [s3bucket\_event\_cache](#module\_s3bucket\_event\_cache) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/3.0.3/terraform-s3bucket.zip | n/a |
| <a name="module_sqs_queue"></a> [sqs\_queue](#module\_sqs\_queue) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/3.1.4/terraform-sqs.zip | n/a |
## Outputs

| Name | Description |
Expand Down
16 changes: 16 additions & 0 deletions infrastructure/terraform/modules/eventpub/iam_role_lambda.tf
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ data "aws_iam_policy_document" "lambda" {
]
}

statement {
sid = "AllowSQSInput"
effect = "Allow"

actions = [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ChangeMessageVisibility",
]

resources = [
module.sqs_queue.sqs_queue_arn,
]
}

statement {
sid = "KMSCloudwatchKeyAccess"
effect = "Allow"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,19 @@ async function sendToDLQ(events) {
}
}

exports.handler = async (snsEvent) => {
console.debug(`Received SNS event with ${snsEvent.Records.length} records.`);
exports.handler = async (sqsEvent) => {
console.debug(`Received SQS event with ${sqsEvent.Records.length} records.`);

if (THROTTLE_DELAY_MS > 0) {
console.info(`Throttling enabled. Delaying processing by ${THROTTLE_DELAY_MS}ms`);
await new Promise(res => setTimeout(res, THROTTLE_DELAY_MS));
}

const records = snsEvent.Records.map(record => JSON.parse(record.Sns.Message));
const records = sqsEvent.Records
.map(record => record.body)
.map(JSON.parse)
.map(record => record.Message)
.map(JSON.parse);
const validEvents = records.filter(validateEvent);
const invalidEvents = records.filter(event => !validateEvent(event));

Expand Down
17 changes: 16 additions & 1 deletion infrastructure/terraform/modules/eventpub/lambda_function.tf
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ resource "aws_lambda_function" "main" {
handler = "index.handler"
runtime = "nodejs22.x"
publish = true
memory_size = 128
memory_size = 512
timeout = 20

filename = data.archive_file.lambda.output_path
Expand All @@ -28,3 +28,18 @@ resource "aws_lambda_function" "main" {
}
}
}

resource "aws_lambda_event_source_mapping" "sqs_to_lambda" {
event_source_arn = module.sqs_queue.sqs_queue_arn
function_name = aws_lambda_function.main.function_name
batch_size = 5000
maximum_batching_window_in_seconds = 1
function_response_types = [
"ReportBatchItemFailures"
]

scaling_config {
maximum_concurrency = 24
}
}

This file was deleted.

34 changes: 34 additions & 0 deletions infrastructure/terraform/modules/eventpub/module_sqs_queue.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
module "sqs_queue" {
source = "https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/3.1.4/terraform-sqs.zip"

aws_account_id = var.aws_account_id
component = var.component
environment = var.environment
project = var.project
region = var.region
name = local.csi
create_dlq = true
sqs_kms_key_arn = var.kms_key_arn
sqs_policy_overload = data.aws_iam_policy_document.allow_sns_send.json
message_retention_seconds = 1209600 # 14 days
}

data "aws_iam_policy_document" "allow_sns_send" {
statement {
sid = "AllowSNSSendMessage"
effect = "Allow"

principals {
type = "Service"
identifiers = ["sns.amazonaws.com"]
}

actions = [
"sqs:SendMessage",
]

resources = [
module.sqs_queue.sqs_queue_arn,
]
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
resource "aws_sns_topic_subscription" "sqs" {
topic_arn = aws_sns_topic.main.arn
protocol = "sqs"
endpoint = module.sqs_queue.sqs_queue_arn
}