Draft
25 commits
5b68cdc  init (JamesW1-NHS, Jan 28, 2026)
d983dd3  log bugfix (JamesW1-NHS, Jan 29, 2026)
c8641d0  interim: create_json_ack_file. pending new unit test and fixing old ones (JamesW1-NHS, Jan 29, 2026)
6a79685  TEMP_ACK_FILE (JamesW1-NHS, Jan 29, 2026)
38ce399  update_json_ack_file (JamesW1-NHS, Jan 29, 2026)
4d05469  ingestion_start_time (JamesW1-NHS, Jan 29, 2026)
ef5e005  complete_batch_file_process (JamesW1-NHS, Jan 29, 2026)
b2029ae  obtain_current_json_ack_content (JamesW1-NHS, Jan 30, 2026)
0c773d6  cut/paste fix (JamesW1-NHS, Jan 30, 2026)
fb16398  mock out get_ingestion_start_time_by_message_id (JamesW1-NHS, Jan 30, 2026)
b4221ef  fix & test get_ingestion_start_time_by_message_id (JamesW1-NHS, Jan 30, 2026)
cac9e32  bugfix (JamesW1-NHS, Feb 2, 2026)
319a98e  bugfix split()[] (JamesW1-NHS, Feb 2, 2026)
fc500db  test_update_ack_file (JamesW1-NHS, Feb 2, 2026)
26a3c6a  test_complete_batch_file_process_json_ack_file II (JamesW1-NHS, Feb 2, 2026)
706ee56  code cleanup (JamesW1-NHS, Feb 2, 2026)
3a75861  extra mocks (JamesW1-NHS, Feb 2, 2026)
f7e8e46  Merge branch 'master' into VED-1017-json-ack-file (JamesW1-NHS, Feb 3, 2026)
f215113  supplier (JamesW1-NHS, Feb 3, 2026)
b9d8fe8  Merge branch 'VED-1017-json-ack-file' of github.com:NHSDigital/immuni… (JamesW1-NHS, Feb 3, 2026)
10a7e91  restored move_file (JamesW1-NHS, Feb 3, 2026)
6374daf  json ack file unit tests for lambda_handler (part 1) (JamesW1-NHS, Feb 4, 2026)
c5fddb1  archive source file test - to support e2e testing (JamesW1-NHS, Feb 4, 2026)
59b1b38  remove redundant functions (JamesW1-NHS, Feb 4, 2026)
22521d2  bugfix: get message_id from correct place for JSON ack file (JamesW1-NHS, Feb 4, 2026)
8 changes: 7 additions & 1 deletion lambdas/ack_backend/src/ack_processor.py
@@ -6,7 +6,11 @@
from common.batch.eof_utils import is_eof_message
from convert_message_to_ack_row import convert_message_to_ack_row
from logging_decorators import ack_lambda_handler_logging_decorator
from update_ack_file import complete_batch_file_process, update_ack_file
from update_ack_file import (
complete_batch_file_process,
update_ack_file,
update_json_ack_file,
)


@ack_lambda_handler_logging_decorator
@@ -54,6 +58,8 @@ def lambda_handler(event, _):

update_ack_file(file_key, created_at_formatted_string, ack_data_rows)

update_json_ack_file(message_id, file_key, created_at_formatted_string, ack_data_rows)

if file_processing_complete:
complete_batch_file_process(message_id, supplier, vaccine_type, created_at_formatted_string, file_key)

104 changes: 99 additions & 5 deletions lambdas/ack_backend/src/update_ack_file.py
@@ -1,5 +1,6 @@
"""Functions for uploading the data to the ack file"""

import json
import os
import time
from datetime import datetime
@@ -8,10 +9,19 @@
from botocore.exceptions import ClientError

from common.aws_s3_utils import move_file
from common.batch.audit_table import get_record_count_and_failures_by_message_id, update_audit_table_item
from common.batch.audit_table import (
get_ingestion_start_time_by_message_id,
get_record_count_and_failures_by_message_id,
update_audit_table_item,
)
from common.clients import get_s3_client, logger
from common.log_decorator import generate_and_send_logs
from common.models.batch_constants import ACK_BUCKET_NAME, SOURCE_BUCKET_NAME, AuditTableKeys, FileStatus
from common.models.batch_constants import (
ACK_BUCKET_NAME,
SOURCE_BUCKET_NAME,
AuditTableKeys,
FileStatus,
)
from constants import (
ACK_HEADERS,
BATCH_FILE_ARCHIVE_DIR,
@@ -71,6 +81,7 @@ def complete_batch_file_process(
the audit table status"""
start_time = time.time()

# finish CSV file
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"

move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}")
@@ -82,7 +93,9 @@
)

# Consider creating time utils and using datetime instead of time
ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime())
# Capture "now" once so the formatted end time and the epoch seconds agree
time_now_seconds = time.time()
time_now = time.gmtime(time_now_seconds)
ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time_now)
ingestion_end_time_seconds = int(time_now_seconds)
successful_record_count = total_ack_rows_processed - total_failures
update_audit_table_item(
file_key=file_key,
@@ -93,6 +106,24 @@
},
)

# finish JSON file
json_ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}"
temp_ack_file_key = f"{TEMP_ACK_DIR}/{json_ack_filename}"
ack_data_dict = obtain_current_json_ack_content(message_id, temp_ack_file_key)

generated_date = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time_now)
ack_data_dict["generatedDate"] = generated_date
ack_data_dict["provider"] = supplier
ack_data_dict["summary"]["totalRecords"] = total_ack_rows_processed
ack_data_dict["summary"]["success"] = successful_record_count
ack_data_dict["summary"]["failed"] = total_failures
ack_data_dict["summary"]["ingestionTime"]["end"] = ingestion_end_time_seconds

# Upload ack_data_dict to S3
json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8"))
get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, temp_ack_file_key)
move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{json_ack_filename}", f"{COMPLETED_ACK_DIR}/{json_ack_filename}")

result = {
"message_id": message_id,
"file_key": file_key,
@@ -147,6 +178,42 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO:
return accumulated_csv_content


def obtain_current_json_ack_content(message_id: str, temp_ack_file_key: str) -> dict:
"""Returns the current ack file content if the file exists, or else initialises the content with the ack headers."""
try:
# If ack file exists in S3 download the contents
existing_ack_file = get_s3_client().get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key)
existing_content = existing_ack_file["Body"].read().decode("utf-8")
ack_data_dict = json.loads(existing_content)
except ClientError as error:
# If the JSON ack file does not exist in S3, initialise new content containing the header fields only
if error.response["Error"]["Code"] in ("404", "NoSuchKey"):
logger.info("No existing JSON ack file found in S3 - creating new file")

ingestion_start_time = get_ingestion_start_time_by_message_id(message_id)
raw_ack_filename = temp_ack_file_key.split(".")[0]

# Generate the initial fields
ack_data_dict = {}
ack_data_dict["system"] = "Immunisation FHIR API Batch Report"
ack_data_dict["version"] = 1 # TO FIX

ack_data_dict["generatedDate"] = "" # will be filled on completion
ack_data_dict["provider"] = "" # will be filled on completion
ack_data_dict["filename"] = raw_ack_filename
ack_data_dict["messageHeaderId"] = message_id

ack_data_dict["summary"] = {}
ack_data_dict["summary"]["ingestionTime"] = {}
ack_data_dict["summary"]["ingestionTime"]["start"] = ingestion_start_time
ack_data_dict["failures"] = []
else:
logger.error("error whilst obtaining current JSON ack content: %s", error)
raise

return ack_data_dict


def update_ack_file(
file_key: str,
created_at_formatted_string: str,
@@ -155,7 +222,6 @@ def update_ack_file(
"""Updates the ack file with the new data row based on the given arguments"""
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}"
temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
completed_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}"
accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key)

for row in ack_data_rows:
@@ -166,4 +232,32 @@
csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8"))

get_s3_client().upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key)
logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, completed_ack_file_key)
logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, temp_ack_file_key)


def update_json_ack_file(
message_id: str,
file_key: str,
created_at_formatted_string: str,
ack_data_rows: list,
) -> None:
"""Updates the ack file with the new data row based on the given arguments"""
ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}"
temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}"
ack_data_dict = obtain_current_json_ack_content(message_id, temp_ack_file_key)

for row in ack_data_rows:
json_data_row = {}
json_data_row["rowId"] = row["MESSAGE_HEADER_ID"].split("^")[-1]
json_data_row["responseCode"] = row["RESPONSE_CODE"]
json_data_row["responseDisplay"] = row["RESPONSE_DISPLAY"]
json_data_row["severity"] = row["ISSUE_SEVERITY"]
json_data_row["localId"] = row["LOCAL_ID"]
json_data_row["operationOutcome"] = row["OPERATION_OUTCOME"]

ack_data_dict["failures"].append(json_data_row)

# Upload ack_data_dict to S3
json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8"))
get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, temp_ack_file_key)
logger.info("JSON ack file updated to %s: %s", ACK_BUCKET_NAME, temp_ack_file_key)
37 changes: 37 additions & 0 deletions lambdas/ack_backend/tests/test_ack_processor.py
@@ -23,7 +23,9 @@
from utils.utils_for_ack_backend_tests import (
add_audit_entry_to_table,
generate_sample_existing_ack_content,
generate_sample_existing_json_ack_content,
validate_ack_file_content,
validate_json_ack_file_content,
)
from utils.values_for_ack_backend_tests import (
EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS,
@@ -94,15 +96,20 @@ def assert_ack_and_source_file_locations_correct(
source_file_key: str,
tmp_ack_file_key: str,
complete_ack_file_key: str,
tmp_json_ack_file_key: str,
complete_json_ack_file_key: str,
is_complete: bool,
) -> None:
"""Helper function to check the ack and source files have not been moved as the processing is not yet
complete"""
if is_complete:
ack_file = self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=complete_ack_file_key)
json_ack_file = self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=complete_json_ack_file_key)
else:
ack_file = self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=tmp_ack_file_key)
json_ack_file = self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=tmp_json_ack_file_key)
self.assertIsNotNone(ack_file["Body"].read())
self.assertIsNotNone(json_ack_file["Body"].read())

full_src_file_key = f"archive/{source_file_key}" if is_complete else f"processing/{source_file_key}"
src_file = self.s3_client.get_object(Bucket=BucketNames.SOURCE, Key=full_src_file_key)
@@ -131,6 +138,7 @@ def assert_audit_entry_counts_equal(self, message_id: str, expected_counts: dict

self.assertDictEqual(actual_counts, expected_counts)

# TODO: modify these for json_ack_file_content
def test_lambda_handler_main_multiple_records(self):
"""Test lambda handler with multiple records."""
# Set up an audit entry which does not yet have record_count recorded
@@ -264,6 +272,9 @@ def test_lambda_handler_updates_ack_file_but_does_not_mark_complete_when_records

# Original source file had 100 records
add_audit_entry_to_table(self.dynamodb_client, mock_batch_message_id, record_count=100)
existing_json_file_content = deepcopy(ValidValues.json_ack_initial_content)
existing_json_file_content["messageHeaderId"] = mock_batch_message_id

array_of_failure_messages = [
{
**BASE_FAILURE_MESSAGE,
@@ -287,10 +298,17 @@ def test_lambda_handler_updates_ack_file_but_does_not_mark_complete_when_records
[*array_of_failure_messages],
existing_file_content=ValidValues.ack_headers,
)
validate_json_ack_file_content(
self.s3_client,
[*array_of_failure_messages],
existing_file_content=existing_json_file_content,
)
self.assert_ack_and_source_file_locations_correct(
MOCK_MESSAGE_DETAILS.file_key,
MOCK_MESSAGE_DETAILS.temp_ack_file_key,
MOCK_MESSAGE_DETAILS.archive_ack_file_key,
MOCK_MESSAGE_DETAILS.temp_json_ack_file_key,
MOCK_MESSAGE_DETAILS.archive_json_ack_file_key,
is_complete=False,
)
self.assert_audit_entry_status_equals(mock_batch_message_id, "Preprocessed")
@@ -317,6 +335,12 @@ def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_pro
Key=MOCK_MESSAGE_DETAILS.temp_ack_file_key,
Body=StringIO(existing_ack_content).getvalue(),
)
existing_json_ack_content = generate_sample_existing_json_ack_content(mock_batch_message_id)
self.s3_client.put_object(
Bucket=BucketNames.DESTINATION,
Key=MOCK_MESSAGE_DETAILS.temp_json_ack_file_key,
Body=json.dumps(existing_json_ack_content),
)

array_of_failure_messages = [
{
@@ -331,22 +355,35 @@ def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_pro
all_messages_plus_eof = deepcopy(array_of_failure_messages)
all_messages_plus_eof.append(MOCK_MESSAGE_DETAILS.eof_message)
test_event = {"Records": [{"body": json.dumps(all_messages_plus_eof)}]}

expected_entry_counts = {
"record_count": "100",
"records_succeeded": "49",
"records_failed": "51",
}
# Include summary counts in expected JSON content
existing_json_ack_content["summary"]["totalRecords"] = int(expected_entry_counts["record_count"])
existing_json_ack_content["summary"]["success"] = int(expected_entry_counts["records_succeeded"])
existing_json_ack_content["summary"]["failed"] = int(expected_entry_counts["records_failed"])

response = lambda_handler(event=test_event, context={})

self.assertEqual(response, EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS)
validate_ack_file_content(
self.s3_client, [*array_of_failure_messages], existing_file_content=existing_ack_content, is_complete=True
)
validate_json_ack_file_content(
self.s3_client,
[*array_of_failure_messages],
existing_file_content=existing_json_ack_content,
is_complete=True,
)
self.assert_ack_and_source_file_locations_correct(
MOCK_MESSAGE_DETAILS.file_key,
MOCK_MESSAGE_DETAILS.temp_ack_file_key,
MOCK_MESSAGE_DETAILS.archive_ack_file_key,
MOCK_MESSAGE_DETAILS.temp_json_ack_file_key,
MOCK_MESSAGE_DETAILS.archive_json_ack_file_key,
is_complete=True,
)
self.assert_audit_entry_status_equals(mock_batch_message_id, "Processed")
13 changes: 10 additions & 3 deletions lambdas/ack_backend/tests/test_splunk_logging.py
@@ -48,6 +48,12 @@ def setUp(self):
self.ack_bucket_patcher = patch("update_ack_file.ACK_BUCKET_NAME", BucketNames.DESTINATION)
self.ack_bucket_patcher.start()

self.get_ingestion_start_time_by_message_id_patcher = patch(
"update_ack_file.get_ingestion_start_time_by_message_id"
)
self.mock_get_ingestion_start_time_by_message_id = self.get_ingestion_start_time_by_message_id_patcher.start()
self.mock_get_ingestion_start_time_by_message_id.return_value = 3456

def tearDown(self):
GenericTearDown(self.s3_client)

@@ -100,9 +106,9 @@ def expected_lambda_handler_logs(self, success: bool, number_of_rows, ingestion_
"""Returns the expected logs for the lambda handler function."""
# Mocking of timings is such that the time taken is 2 seconds for each row,
# plus 2 seconds for the handler if it succeeds (i.e. it calls update_ack_file) or 1 second if it doesn't;
# plus an extra second if ingestion is complete
# plus an extra 2 seconds if ingestion is complete
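# e.g. with 99 rows and ingestion complete: 99 * 2 + 4 = 202, i.e. "202.0s"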
if success:
time_taken = f"{number_of_rows * 2 + 3}.0s" if ingestion_complete else f"{number_of_rows * 2 + 1}.0s"
time_taken = f"{number_of_rows * 2 + 4}.0s" if ingestion_complete else f"{number_of_rows * 2 + 1}.0s"
else:
time_taken = f"{number_of_rows * 2 + 1}.0s"

@@ -429,6 +435,7 @@ def test_splunk_update_ack_file_logged(self):
patch("common.log_decorator.logger") as mock_logger, # noqa: E999
patch("update_ack_file.get_record_count_and_failures_by_message_id", return_value=(99, 2)),
patch("update_ack_file.update_audit_table_item") as mock_update_audit_table_item, # noqa: E999
patch("update_ack_file.move_file"), # noqa: E999
patch("ack_processor.increment_records_failed_count"), # noqa: E999
): # noqa: E999
mock_datetime.now.return_value = ValidValues.fixed_datetime
@@ -443,7 +450,7 @@
expected_secondlast_logger_info_data = {
**ValidValues.upload_ack_file_expected_log,
"message_id": "test",
"time_taken": "1.0s",
"time_taken": "2.0s",
}
expected_last_logger_info_data = self.expected_lambda_handler_logs(
success=True, number_of_rows=99, ingestion_complete=True