From 5b68cdcfc10f67c5af6707c0456bbfb9a2911618 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Wed, 28 Jan 2026 13:01:45 +0000 Subject: [PATCH 01/23] init From d983dd3ee21cb32537ee1670a17e1baee455f025 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Thu, 29 Jan 2026 11:51:15 +0000 Subject: [PATCH 02/23] log bugfix --- lambdas/ack_backend/src/update_ack_file.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index a4969c765..b9d564e6d 100644 --- a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -155,7 +155,6 @@ def update_ack_file( """Updates the ack file with the new data row based on the given arguments""" ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}" temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}" - completed_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}" accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key) for row in ack_data_rows: @@ -166,4 +165,4 @@ def update_ack_file( csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8")) get_s3_client().upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key) - logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, completed_ack_file_key) + logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, temp_ack_file_key) From c8641d0bd017616213a657a06718adb8d9ca1975 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Thu, 29 Jan 2026 14:25:03 +0000 Subject: [PATCH 03/23] interim: create_json_ack_file. pending new unit test and fixing old ones --- lambdas/ack_backend/src/constants.py | 1 - lambdas/ack_backend/src/update_ack_file.py | 9 +++-- .../src/file_level_validation.py | 5 ++- lambdas/shared/src/common/ack_file_utils.py | 36 +++++++++++++++++++ .../src/common/models/batch_constants.py | 1 + 5 files changed, 48 insertions(+), 4 deletions(-) diff --git a/lambdas/ack_backend/src/constants.py b/lambdas/ack_backend/src/constants.py index d157d02ba..b469037ce 100644 --- a/lambdas/ack_backend/src/constants.py +++ b/lambdas/ack_backend/src/constants.py @@ -1,7 +1,6 @@ """Constants for ack lambda""" COMPLETED_ACK_DIR = "forwardedFile" -TEMP_ACK_DIR = "TempAck" BATCH_FILE_PROCESSING_DIR = "processing" BATCH_FILE_ARCHIVE_DIR = "archive" LAMBDA_FUNCTION_NAME_PREFIX = "ack_processor" diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index b9d564e6d..1b7a64bdd 100644 --- a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -11,7 +11,13 @@ from common.batch.audit_table import get_record_count_and_failures_by_message_id, update_audit_table_item from common.clients import get_s3_client, logger from common.log_decorator import generate_and_send_logs -from common.models.batch_constants import ACK_BUCKET_NAME, SOURCE_BUCKET_NAME, AuditTableKeys, FileStatus +from common.models.batch_constants import ( + ACK_BUCKET_NAME, + SOURCE_BUCKET_NAME, + TEMP_ACK_DIR, + AuditTableKeys, + FileStatus, +) from constants import ( ACK_HEADERS, BATCH_FILE_ARCHIVE_DIR, @@ -19,7 +25,6 @@ COMPLETED_ACK_DIR, DEFAULT_STREAM_NAME, LAMBDA_FUNCTION_NAME_PREFIX, - TEMP_ACK_DIR, ) STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME", DEFAULT_STREAM_NAME) diff --git a/lambdas/recordprocessor/src/file_level_validation.py b/lambdas/recordprocessor/src/file_level_validation.py index 964eef58d..89faa74ce 100644 --- 
a/lambdas/recordprocessor/src/file_level_validation.py +++ b/lambdas/recordprocessor/src/file_level_validation.py @@ -6,7 +6,7 @@ import time from csv import DictReader -from common.ack_file_utils import make_and_upload_ack_file +from common.ack_file_utils import create_json_ack_file, make_and_upload_ack_file from common.aws_s3_utils import move_file from common.batch.audit_table import update_audit_table_item from common.clients import logger @@ -91,6 +91,9 @@ def file_level_validation(incoming_message_body: dict) -> dict: make_and_upload_ack_file(message_id, file_key, True, True, created_at_formatted_string) + # TODO create JSON ack file if flag is set + create_json_ack_file(message_id, file_key, created_at_formatted_string) + move_file(SOURCE_BUCKET_NAME, file_key, f"{PROCESSING_DIR_NAME}/{file_key}") ingestion_start_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime(time.time())) diff --git a/lambdas/shared/src/common/ack_file_utils.py b/lambdas/shared/src/common/ack_file_utils.py index bfa811874..8fcaa26bb 100644 --- a/lambdas/shared/src/common/ack_file_utils.py +++ b/lambdas/shared/src/common/ack_file_utils.py @@ -1,5 +1,6 @@ """Create ack file and upload to S3 bucket""" +import json from csv import writer from io import BytesIO, StringIO @@ -49,6 +50,41 @@ def upload_ack_file(file_key: str, ack_data: dict, created_at_formatted_string: get_s3_client().upload_fileobj(csv_bytes, ACK_BUCKET_NAME, ack_filename) +def create_json_ack_file( + message_id: str, + file_key: str, + created_at_formatted_string: str, +) -> None: + if file_key is None: + return + """Creates the initial JSON BusAck file and uploads it to the temp bucket""" + ack_filename = "TempAck/" + file_key.replace(".csv", f"_BusAck_{created_at_formatted_string}.json") + raw_ack_filename = ack_filename.split(".")[0] + try: + provider = ack_filename.split("_")[3] + except IndexError: + provider = "unknown" + + # Generate the initial fields + ack_data_dict = {} + ack_data_dict["system"] = "Immunisation FHIR API Batch Report" + ack_data_dict["version"] = 1 # TO FIX + + ack_data_dict["generatedDate"] = "" # will be filled on completion + ack_data_dict["filename"] = raw_ack_filename + ack_data_dict["provider"] = provider + ack_data_dict["messageHeaderId"] = message_id + + ack_data_dict["summary"] = {} + ack_data_dict["failures"] = [] + + print(json.dumps(ack_data_dict, indent=2)) + + # Upload ack_data_dict to S3 + json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8")) + get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, ack_filename) + + def make_and_upload_ack_file( message_id: str, file_key: str, diff --git a/lambdas/shared/src/common/models/batch_constants.py b/lambdas/shared/src/common/models/batch_constants.py index 6ec997f53..535f12e25 100644 --- a/lambdas/shared/src/common/models/batch_constants.py +++ b/lambdas/shared/src/common/models/batch_constants.py @@ -4,6 +4,7 @@ ACK_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME") AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME") SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME") +TEMP_ACK_DIR = "TempAck" class FileStatus(StrEnum): From 6a796859e4ea20508962c6699bf972b31ff58001 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Thu, 29 Jan 2026 17:11:30 +0000 Subject: [PATCH 04/23] TEMP_ACK_FILE --- lambdas/shared/src/common/ack_file_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lambdas/shared/src/common/ack_file_utils.py b/lambdas/shared/src/common/ack_file_utils.py index 8fcaa26bb..32ebed7c2 100644 --- 
a/lambdas/shared/src/common/ack_file_utils.py +++ b/lambdas/shared/src/common/ack_file_utils.py @@ -5,7 +5,7 @@ from io import BytesIO, StringIO from common.clients import get_s3_client -from common.models.batch_constants import ACK_BUCKET_NAME +from common.models.batch_constants import ACK_BUCKET_NAME, TEMP_ACK_DIR def make_ack_data( @@ -58,7 +58,7 @@ def create_json_ack_file( if file_key is None: return """Creates the initial JSON BusAck file and uploads it to the temp bucket""" - ack_filename = "TempAck/" + file_key.replace(".csv", f"_BusAck_{created_at_formatted_string}.json") + ack_filename = TEMP_ACK_DIR + "/" + file_key.replace(".csv", f"_BusAck_{created_at_formatted_string}.json") raw_ack_filename = ack_filename.split(".")[0] try: provider = ack_filename.split("_")[3] From 38ce399ec88c699c90acdc2243bb690f187b8ec8 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Thu, 29 Jan 2026 17:12:33 +0000 Subject: [PATCH 05/23] update_json_ack_file --- lambdas/ack_backend/src/ack_processor.py | 9 +++++- lambdas/ack_backend/src/update_ack_file.py | 34 ++++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/lambdas/ack_backend/src/ack_processor.py b/lambdas/ack_backend/src/ack_processor.py index 6fdaae20b..a4bb434e2 100644 --- a/lambdas/ack_backend/src/ack_processor.py +++ b/lambdas/ack_backend/src/ack_processor.py @@ -6,7 +6,11 @@ from common.batch.eof_utils import is_eof_message from convert_message_to_ack_row import convert_message_to_ack_row from logging_decorators import ack_lambda_handler_logging_decorator -from update_ack_file import complete_batch_file_process, update_ack_file +from update_ack_file import ( + complete_batch_file_process, + update_ack_file, + update_json_ack_file, +) @ack_lambda_handler_logging_decorator @@ -54,6 +58,9 @@ def lambda_handler(event, _): update_ack_file(file_key, created_at_formatted_string, ack_data_rows) + # TODO: update json ack file on switch + update_json_ack_file(file_key, created_at_formatted_string, ack_data_rows) + if file_processing_complete: complete_batch_file_process(message_id, supplier, vaccine_type, created_at_formatted_string, file_key) diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index 1b7a64bdd..0240af37a 100644 --- a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -1,5 +1,6 @@ """Functions for uploading the data to the ack file""" +import json import os import time from datetime import datetime @@ -171,3 +172,36 @@ def update_ack_file( get_s3_client().upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key) logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, temp_ack_file_key) + + +def update_json_ack_file( + file_key: str, + created_at_formatted_string: str, + ack_data_rows: list, +) -> None: + """Updates the ack file with the new data row based on the given arguments""" + ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}" + temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}" + + # read the json file + existing_ack_file = get_s3_client().get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key) + existing_content = existing_ack_file["Body"].read().decode("utf-8") + ack_data_dict = json.loads(existing_content) + + for row in ack_data_rows: + json_data_row = {} + json_data_row["rowId"] = row["MESSAGE_HEADER_ID"].split("^")[-1] + json_data_row["responseCode"] = row["RESPONSE_CODE"] + json_data_row["responseDisplay"] = row["RESPONSE_DISPLAY"] + 
json_data_row["severity"] = row["ISSUE_SEVERITY"] + json_data_row["localId"] = row["LOCAL_ID"] + json_data_row["operationOutcome"] = row["OPERATION_OUTCOME"] + + ack_data_dict["failures"].append(json_data_row) + + # print(json.dumps(json_data_row, indent=2)) + + # Upload ack_data_dict to S3 + json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8")) + get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, temp_ack_file_key) + logger.info("JSON ack file updated to %s: %s", ACK_BUCKET_NAME, temp_ack_file_key) From 4d0546902569fc30fb2d111d774fccddedf448c9 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Thu, 29 Jan 2026 17:26:53 +0000 Subject: [PATCH 06/23] ingestion_start_time --- lambdas/recordprocessor/src/file_level_validation.py | 6 ++++-- lambdas/shared/src/common/ack_file_utils.py | 6 +++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/lambdas/recordprocessor/src/file_level_validation.py b/lambdas/recordprocessor/src/file_level_validation.py index 89faa74ce..d209c0444 100644 --- a/lambdas/recordprocessor/src/file_level_validation.py +++ b/lambdas/recordprocessor/src/file_level_validation.py @@ -92,11 +92,13 @@ def file_level_validation(incoming_message_body: dict) -> dict: make_and_upload_ack_file(message_id, file_key, True, True, created_at_formatted_string) # TODO create JSON ack file if flag is set - create_json_ack_file(message_id, file_key, created_at_formatted_string) + time_now = time.gmtime(time.time()) + ingestion_start_time = time.strftime("%Y%m%dT%H%M%S00", time_now) + ingestion_start_time_seconds = int(time.strftime("%s", time_now)) + create_json_ack_file(message_id, file_key, created_at_formatted_string, ingestion_start_time_seconds) move_file(SOURCE_BUCKET_NAME, file_key, f"{PROCESSING_DIR_NAME}/{file_key}") - ingestion_start_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime(time.time())) update_audit_table_item( file_key=file_key, message_id=message_id, diff --git a/lambdas/shared/src/common/ack_file_utils.py b/lambdas/shared/src/common/ack_file_utils.py index 32ebed7c2..14ae214f4 100644 --- a/lambdas/shared/src/common/ack_file_utils.py +++ b/lambdas/shared/src/common/ack_file_utils.py @@ -51,9 +51,7 @@ def upload_ack_file(file_key: str, ack_data: dict, created_at_formatted_string: def create_json_ack_file( - message_id: str, - file_key: str, - created_at_formatted_string: str, + message_id: str, file_key: str, created_at_formatted_string: str, ingestion_start_time_seconds: int ) -> None: if file_key is None: return @@ -76,6 +74,8 @@ def create_json_ack_file( ack_data_dict["messageHeaderId"] = message_id ack_data_dict["summary"] = {} + ack_data_dict["summary"]["ingestionTime"] = {} + ack_data_dict["summary"]["ingestionTime"]["start"] = ingestion_start_time_seconds ack_data_dict["failures"] = [] print(json.dumps(ack_data_dict, indent=2)) From ef5e005b1455cc158c795013754740b09a967a8d Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Thu, 29 Jan 2026 17:49:48 +0000 Subject: [PATCH 07/23] complete_batch_file_process --- lambdas/ack_backend/src/update_ack_file.py | 35 ++++++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index 0240af37a..e3b75f061 100644 --- a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -77,18 +77,15 @@ def complete_batch_file_process( the audit table status""" start_time = time.time() - ack_filename = f"{file_key.replace('.csv', 
f'_BusAck_{created_at_formatted_string}.csv')}" - - move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}") - move_file(SOURCE_BUCKET_NAME, f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}") - total_ack_rows_processed, total_failures = get_record_count_and_failures_by_message_id(message_id) update_audit_table_item( file_key=file_key, message_id=message_id, attrs_to_update={AuditTableKeys.STATUS: FileStatus.PROCESSED} ) # Consider creating time utils and using datetime instead of time - ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime()) + time_now = time.gmtime(time.time()) + ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time_now) + ingestion_end_time_seconds = int(time.strftime("%s", time_now)) successful_record_count = total_ack_rows_processed - total_failures update_audit_table_item( file_key=file_key, @@ -99,6 +96,32 @@ def complete_batch_file_process( }, ) + # finish CSV file + ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}" + + move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}") + move_file(SOURCE_BUCKET_NAME, f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}") + + # finish JSON file + # TODO: need to abstract this out + json_ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}" + temp_ack_file_key = f"{TEMP_ACK_DIR}/{json_ack_filename}" + + # read the json file + existing_ack_file = get_s3_client().get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key) + existing_content = existing_ack_file["Body"].read().decode("utf-8") + ack_data_dict = json.loads(existing_content) + + ack_data_dict["summary"]["totalRecords"] = total_ack_rows_processed + ack_data_dict["summary"]["success"] = successful_record_count + ack_data_dict["summary"]["failed"] = total_failures + ack_data_dict["summary"]["ingestionTime"]["end"] = ingestion_end_time_seconds + + # Upload ack_data_dict to S3 + json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8")) + get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, temp_ack_file_key) + move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{json_ack_filename}", f"{COMPLETED_ACK_DIR}/{json_ack_filename}") + result = { "message_id": message_id, "file_key": file_key, From b2029ae727ecb9a9d57bd3d9c365be03898f783b Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Fri, 30 Jan 2026 13:10:58 +0000 Subject: [PATCH 08/23] obtain_current_json_ack_content --- lambdas/ack_backend/src/constants.py | 1 + lambdas/ack_backend/src/update_ack_file.py | 67 +++++++++++++++---- .../src/file_level_validation.py | 9 +-- lambdas/shared/src/common/ack_file_utils.py | 38 +---------- .../shared/src/common/batch/audit_table.py | 11 +++ .../src/common/models/batch_constants.py | 1 - 6 files changed, 70 insertions(+), 57 deletions(-) diff --git a/lambdas/ack_backend/src/constants.py b/lambdas/ack_backend/src/constants.py index b469037ce..d157d02ba 100644 --- a/lambdas/ack_backend/src/constants.py +++ b/lambdas/ack_backend/src/constants.py @@ -1,6 +1,7 @@ """Constants for ack lambda""" COMPLETED_ACK_DIR = "forwardedFile" +TEMP_ACK_DIR = "TempAck" BATCH_FILE_PROCESSING_DIR = "processing" BATCH_FILE_ARCHIVE_DIR = "archive" LAMBDA_FUNCTION_NAME_PREFIX = "ack_processor" diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index e3b75f061..12f774a8d 100644 --- 
a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -9,13 +9,16 @@ from botocore.exceptions import ClientError from common.aws_s3_utils import move_file -from common.batch.audit_table import get_record_count_and_failures_by_message_id, update_audit_table_item +from common.batch.audit_table import ( + get_ingestion_start_time_by_message_id, + get_record_count_and_failures_by_message_id, + update_audit_table_item, +) from common.clients import get_s3_client, logger from common.log_decorator import generate_and_send_logs from common.models.batch_constants import ( ACK_BUCKET_NAME, SOURCE_BUCKET_NAME, - TEMP_ACK_DIR, AuditTableKeys, FileStatus, ) @@ -26,6 +29,7 @@ COMPLETED_ACK_DIR, DEFAULT_STREAM_NAME, LAMBDA_FUNCTION_NAME_PREFIX, + TEMP_ACK_DIR, ) STREAM_NAME = os.getenv("SPLUNK_FIREHOSE_NAME", DEFAULT_STREAM_NAME) @@ -106,12 +110,10 @@ def complete_batch_file_process( # TODO: need to abstract this out json_ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}" temp_ack_file_key = f"{TEMP_ACK_DIR}/{json_ack_filename}" + ack_data_dict = obtain_current_json_ack_content(message_id, temp_ack_file_key) - # read the json file - existing_ack_file = get_s3_client().get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key) - existing_content = existing_ack_file["Body"].read().decode("utf-8") - ack_data_dict = json.loads(existing_content) - + generated_date = time.strftime("%Y-%m-%dT%H:%M:%S.000Z") + ack_data_dict["generatedDate"] = generated_date ack_data_dict["summary"]["totalRecords"] = total_ack_rows_processed ack_data_dict["summary"]["success"] = successful_record_count ack_data_dict["summary"]["failed"] = total_failures @@ -176,6 +178,50 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO: return accumulated_csv_content +def obtain_current_json_ack_content(message_id: str, temp_ack_file_key: str) -> dict: + """Returns the current JSON ack file content if the file exists, or else initialises the content with the report's initial fields.""" + try: + # If ack file exists in S3 download the contents + existing_ack_file = get_s3_client().get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key) + existing_content = existing_ack_file["Body"].read().decode("utf-8") + ack_data_dict = json.loads(existing_content) + except ClientError as error: + # If ack file does not exist in S3 create a new report containing the initial fields only + if error.response["Error"]["Code"] in ("404", "NoSuchKey"): + logger.info("No existing JSON ack file found in S3 - creating new file") + + ingestion_start_time = get_ingestion_start_time_by_message_id(message_id) + raw_ack_filename = temp_ack_file_key.split(".")[0] + try: + provider = temp_ack_file_key.split("_")[3] + except IndexError: + provider = "unknown" + + # Generate the initial fields + ack_data_dict = {} + ack_data_dict["system"] = "Immunisation FHIR API Batch Report" + ack_data_dict["version"] = 1 # TO FIX + + ack_data_dict["generatedDate"] = "" # will be filled on completion + ack_data_dict["filename"] = raw_ack_filename + ack_data_dict["provider"] = provider + ack_data_dict["messageHeaderId"] = message_id + + ack_data_dict["summary"] = {} + ack_data_dict["summary"]["ingestionTime"] = {} + ack_data_dict["summary"]["ingestionTime"]["start"] = ingestion_start_time + ack_data_dict["failures"] = [] + + print(json.dumps(ack_data_dict, indent=2)) + else: + logger.error("error whilst obtaining current JSON ack content: %s", error) + raise + + accumulated_csv_content = StringIO() +
accumulated_csv_content.write(existing_content) + return accumulated_csv_content + + def update_ack_file( file_key: str, created_at_formatted_string: str, @@ -205,11 +251,8 @@ def update_json_ack_file( """Updates the ack file with the new data row based on the given arguments""" ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}" temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}" - - # read the json file - existing_ack_file = get_s3_client().get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key) - existing_content = existing_ack_file["Body"].read().decode("utf-8") - ack_data_dict = json.loads(existing_content) + message_id = ack_data_rows[0]["MESSAGE_HEADER_ID"].split("^")[-1] + ack_data_dict = obtain_current_json_ack_content(message_id, temp_ack_file_key) for row in ack_data_rows: json_data_row = {} diff --git a/lambdas/recordprocessor/src/file_level_validation.py b/lambdas/recordprocessor/src/file_level_validation.py index d209c0444..964eef58d 100644 --- a/lambdas/recordprocessor/src/file_level_validation.py +++ b/lambdas/recordprocessor/src/file_level_validation.py @@ -6,7 +6,7 @@ import time from csv import DictReader -from common.ack_file_utils import create_json_ack_file, make_and_upload_ack_file +from common.ack_file_utils import make_and_upload_ack_file from common.aws_s3_utils import move_file from common.batch.audit_table import update_audit_table_item from common.clients import logger @@ -91,14 +91,9 @@ def file_level_validation(incoming_message_body: dict) -> dict: make_and_upload_ack_file(message_id, file_key, True, True, created_at_formatted_string) - # TODO create JSON ack file if flag is set - time_now = time.gmtime(time.time()) - ingestion_start_time = time.strftime("%Y%m%dT%H%M%S00", time_now) - ingestion_start_time_seconds = int(time.strftime("%s", time_now)) - create_json_ack_file(message_id, file_key, created_at_formatted_string, ingestion_start_time_seconds) - move_file(SOURCE_BUCKET_NAME, file_key, f"{PROCESSING_DIR_NAME}/{file_key}") + ingestion_start_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime(time.time())) update_audit_table_item( file_key=file_key, message_id=message_id, diff --git a/lambdas/shared/src/common/ack_file_utils.py b/lambdas/shared/src/common/ack_file_utils.py index 14ae214f4..bfa811874 100644 --- a/lambdas/shared/src/common/ack_file_utils.py +++ b/lambdas/shared/src/common/ack_file_utils.py @@ -1,11 +1,10 @@ """Create ack file and upload to S3 bucket""" -import json from csv import writer from io import BytesIO, StringIO from common.clients import get_s3_client -from common.models.batch_constants import ACK_BUCKET_NAME, TEMP_ACK_DIR +from common.models.batch_constants import ACK_BUCKET_NAME def make_ack_data( @@ -50,41 +49,6 @@ def upload_ack_file(file_key: str, ack_data: dict, created_at_formatted_string: get_s3_client().upload_fileobj(csv_bytes, ACK_BUCKET_NAME, ack_filename) -def create_json_ack_file( - message_id: str, file_key: str, created_at_formatted_string: str, ingestion_start_time_seconds: int -) -> None: - if file_key is None: - return - """Creates the initial JSON BusAck file and uploads it to the temp bucket""" - ack_filename = TEMP_ACK_DIR + "/" + file_key.replace(".csv", f"_BusAck_{created_at_formatted_string}.json") - raw_ack_filename = ack_filename.split(".")[0] - try: - provider = ack_filename.split("_")[3] - except IndexError: - provider = "unknown" - - # Generate the initial fields - ack_data_dict = {} - ack_data_dict["system"] = "Immunisation FHIR API Batch Report" - 
ack_data_dict["version"] = 1 # TO FIX - - ack_data_dict["generatedDate"] = "" # will be filled on completion - ack_data_dict["filename"] = raw_ack_filename - ack_data_dict["provider"] = provider - ack_data_dict["messageHeaderId"] = message_id - - ack_data_dict["summary"] = {} - ack_data_dict["summary"]["ingestionTime"] = {} - ack_data_dict["summary"]["ingestionTime"]["start"] = ingestion_start_time_seconds - ack_data_dict["failures"] = [] - - print(json.dumps(ack_data_dict, indent=2)) - - # Upload ack_data_dict to S3 - json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8")) - get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, ack_filename) - - def make_and_upload_ack_file( message_id: str, file_key: str, diff --git a/lambdas/shared/src/common/batch/audit_table.py b/lambdas/shared/src/common/batch/audit_table.py index 58598757c..7f1e2a488 100644 --- a/lambdas/shared/src/common/batch/audit_table.py +++ b/lambdas/shared/src/common/batch/audit_table.py @@ -111,6 +111,17 @@ def _build_audit_table_update_log_message(file_key: str, message_id: str, attrs_ ) +def get_ingestion_start_time_by_message_id(event_message_id: str) -> int: + """Retrieves ingestion start time by unique event message ID""" + # Required by JSON ack file + audit_record = dynamodb_client.get_item( + TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": event_message_id}} + ) + + ingestion_start_time = audit_record.get("Item", {}).get(AuditTableKeys.INGESTION_START_TIME, {}).get("N") + return int(ingestion_start_time) if ingestion_start_time else 0 + + def get_record_count_and_failures_by_message_id(event_message_id: str) -> tuple[int, int]: """Retrieves total record count and total failures by unique event message ID""" audit_record = dynamodb_client.get_item( diff --git a/lambdas/shared/src/common/models/batch_constants.py b/lambdas/shared/src/common/models/batch_constants.py index 535f12e25..6ec997f53 100644 --- a/lambdas/shared/src/common/models/batch_constants.py +++ b/lambdas/shared/src/common/models/batch_constants.py @@ -4,7 +4,6 @@ ACK_BUCKET_NAME = os.getenv("ACK_BUCKET_NAME") AUDIT_TABLE_NAME = os.getenv("AUDIT_TABLE_NAME") SOURCE_BUCKET_NAME = os.getenv("SOURCE_BUCKET_NAME") -TEMP_ACK_DIR = "TempAck" class FileStatus(StrEnum): From 0c773d60475f007f062c2983105823efc6866f32 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Fri, 30 Jan 2026 14:13:25 +0000 Subject: [PATCH 09/23] cut/paste fix --- lambdas/ack_backend/src/update_ack_file.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index 12f774a8d..e7ae645f9 100644 --- a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -217,9 +217,7 @@ def obtain_current_json_ack_content(message_id: str, temp_ack_file_key: str) -> logger.error("error whilst obtaining current JSON ack content: %s", error) raise - accumulated_csv_content = StringIO() - accumulated_csv_content.write(existing_content) - return accumulated_csv_content + return ack_data_dict def update_ack_file( From fb16398b71e3877070bc8eae71103f28624a241b Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Fri, 30 Jan 2026 14:13:54 +0000 Subject: [PATCH 10/23] mock out get_ingestion_start_time_by_message_id --- lambdas/ack_backend/tests/test_update_ack_file_flow.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lambdas/ack_backend/tests/test_update_ack_file_flow.py b/lambdas/ack_backend/tests/test_update_ack_file_flow.py 
index a286cb950..aa020d763 100644 --- a/lambdas/ack_backend/tests/test_update_ack_file_flow.py +++ b/lambdas/ack_backend/tests/test_update_ack_file_flow.py @@ -37,11 +37,14 @@ def setUp(self): self.mock_update_audit_table_item = self.update_audit_table_item_patcher.start() self.get_record_and_failure_count_patcher = patch("update_ack_file.get_record_count_and_failures_by_message_id") self.mock_get_record_and_failure_count = self.get_record_and_failure_count_patcher.start() + self.get_ingestion_start_time_patcher = patch("update_ack_file.get_ingestion_start_time_by_message_id") + self.mock_get_ingestion_start_time = self.get_ingestion_start_time_patcher.start() def tearDown(self): self.logger_patcher.stop() self.update_audit_table_item_patcher.stop() self.get_record_and_failure_count_patcher.stop() + self.get_ingestion_start_time_patcher.stop() def test_audit_table_updated_correctly_when_ack_process_complete(self): """VED-167 - Test that the audit table has been updated correctly""" @@ -58,6 +61,7 @@ def test_audit_table_updated_correctly_when_ack_process_complete(self): Bucket=self.ack_bucket_name, Key=f"TempAck/audit_table_test_BusAck_{mock_created_at_string}.csv" ) self.mock_get_record_and_failure_count.return_value = 10, 2 + self.mock_get_ingestion_start_time.return_value = 1769781283 # Act update_ack_file.complete_batch_file_process( From b4221efd334b49467e1cfb7de9daa708d97e21e1 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Fri, 30 Jan 2026 16:17:26 +0000 Subject: [PATCH 11/23] fix & test get_ingestion_start_time_by_message_id --- .../test_common/batch/test_audit_table.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/lambdas/shared/tests/test_common/batch/test_audit_table.py b/lambdas/shared/tests/test_common/batch/test_audit_table.py index 75b9763ca..6c4fc9e2d 100644 --- a/lambdas/shared/tests/test_common/batch/test_audit_table.py +++ b/lambdas/shared/tests/test_common/batch/test_audit_table.py @@ -27,6 +27,7 @@ from common.batch.audit_table import ( NOTHING_TO_UPDATE_ERROR_MESSAGE, create_audit_table_item, + get_ingestion_start_time_by_message_id, get_record_count_and_failures_by_message_id, increment_records_failed_count, update_audit_table_item, @@ -301,6 +302,35 @@ def test_get_record_count_and_failures_by_message_id_returns_zero_if_values_not_ self.assertEqual(record_count, 0) self.assertEqual(failed_count, 0) + def test_get_ingestion_start_time_by_message_id_returns_the_ingestion_start_time(self): + """Test that get_ingestion_start_time_by_message_id retrieves the integer value of the ingestion start time""" + ravs_rsv_test_file = FileDetails("RSV", "RAVS", "X26") + expected_table_entry = { + **MockFileDetails.rsv_ravs.audit_table_entry, + "status": {"S": FileStatus.PREPROCESSED}, + "ingestion_start_time": {"S": "1769789375"}, + } + + dynamodb_client.put_item(TableName=AUDIT_TABLE_NAME, Item=expected_table_entry) + + ingestion_start_time = get_ingestion_start_time_by_message_id(ravs_rsv_test_file.message_id) + + self.assertEqual(ingestion_start_time, 1769789375) + + def test_get_ingestion_start_time_by_message_id_returns_zero_if_values_not_set(self): + """Test that if the ingestion start time has not yet been set on the audit item then zero is returned""" + ravs_rsv_test_file = FileDetails("RSV", "RAVS", "X26") + expected_table_entry = { + **MockFileDetails.rsv_ravs.audit_table_entry, + "status": {"S": FileStatus.PREPROCESSED}, + } + + dynamodb_client.put_item(TableName=AUDIT_TABLE_NAME, Item=expected_table_entry) + + ingestion_start_time = 
get_ingestion_start_time_by_message_id(ravs_rsv_test_file.message_id) + + self.assertEqual(ingestion_start_time, 0) + def test_increment_records_failed_count(self): """Checks audit table correctly increments the records_failed count""" ravs_rsv_test_file = FileDetails("RSV", "RAVS", "X26") From cac9e325d594f2976ecc533638e79a371bc560d9 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Mon, 2 Feb 2026 10:59:30 +0000 Subject: [PATCH 12/23] bugfix --- lambdas/shared/src/common/batch/audit_table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambdas/shared/src/common/batch/audit_table.py b/lambdas/shared/src/common/batch/audit_table.py index 7f1e2a488..8e003cc10 100644 --- a/lambdas/shared/src/common/batch/audit_table.py +++ b/lambdas/shared/src/common/batch/audit_table.py @@ -118,7 +118,7 @@ def get_ingestion_start_time_by_message_id(event_message_id: str) -> int: TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": event_message_id}} ) - ingestion_start_time = audit_record.get("Item", {}).get(AuditTableKeys.INGESTION_START_TIME, {}).get("N") + ingestion_start_time = audit_record.get("Item", {}).get(AuditTableKeys.INGESTION_START_TIME, {}).get("S") return int(ingestion_start_time) if ingestion_start_time else 0 From 319a98e8f3cd9bffabd929016eb3b0778ef07146 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Mon, 2 Feb 2026 13:42:02 +0000 Subject: [PATCH 13/23] bugfix split()[] --- lambdas/ack_backend/src/update_ack_file.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index e7ae645f9..8d8d219db 100644 --- a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -211,8 +211,6 @@ def obtain_current_json_ack_content(message_id: str, temp_ack_file_key: str) -> ack_data_dict["summary"]["ingestionTime"] = {} ack_data_dict["summary"]["ingestionTime"]["start"] = ingestion_start_time ack_data_dict["failures"] = [] - - print(json.dumps(ack_data_dict, indent=2)) else: logger.error("error whilst obtaining current JSON ack content: %s", error) raise @@ -249,7 +247,7 @@ def update_json_ack_file( """Updates the ack file with the new data row based on the given arguments""" ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}" temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}" - message_id = ack_data_rows[0]["MESSAGE_HEADER_ID"].split("^")[-1] + message_id = ack_data_rows[0]["MESSAGE_HEADER_ID"].split("^")[0] ack_data_dict = obtain_current_json_ack_content(message_id, temp_ack_file_key) for row in ack_data_rows: @@ -263,8 +261,6 @@ def update_json_ack_file( ack_data_dict["failures"].append(json_data_row) - # print(json.dumps(json_data_row, indent=2)) - # Upload ack_data_dict to S3 json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8")) get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, temp_ack_file_key) From fc500dbb9c8d4319b2429fa79ec8f399eed700ef Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Mon, 2 Feb 2026 13:48:11 +0000 Subject: [PATCH 14/23] test_update_ack_file --- .../ack_backend/tests/test_update_ack_file.py | 160 +++++++++++++++++- .../tests/test_update_ack_file_flow.py | 4 + .../utils/utils_for_ack_backend_tests.py | 95 +++++++++++ .../utils/values_for_ack_backend_tests.py | 28 +++ 4 files changed, 285 insertions(+), 2 deletions(-) diff --git a/lambdas/ack_backend/tests/test_update_ack_file.py 
b/lambdas/ack_backend/tests/test_update_ack_file.py index 6b9cd163f..fe4300448 100644 --- a/lambdas/ack_backend/tests/test_update_ack_file.py +++ b/lambdas/ack_backend/tests/test_update_ack_file.py @@ -1,5 +1,7 @@ """Tests for the functions in the update_ack_file module.""" +import copy +import json import os import unittest from io import StringIO @@ -21,8 +23,12 @@ MOCK_MESSAGE_DETAILS, generate_expected_ack_content, generate_expected_ack_file_row, + generate_expected_json_ack_content, + generate_expected_json_ack_file_element, generate_sample_existing_ack_content, + generate_sample_existing_json_ack_content, obtain_current_ack_file_content, + obtain_current_json_ack_file_content, setup_existing_ack_file, ) from utils.values_for_ack_backend_tests import DefaultValues, ValidValues @@ -31,7 +37,9 @@ from update_ack_file import ( create_ack_data, obtain_current_ack_content, + obtain_current_json_ack_content, update_ack_file, + update_json_ack_file, ) firehose_client = boto3_client("firehose", region_name=REGION_NAME) @@ -44,7 +52,7 @@ class TestUpdateAckFile(unittest.TestCase): def setUp(self) -> None: self.s3_client = boto3_client("s3", region_name=REGION_NAME) - GenericSetUp(self.s3_client) + GenericSetUp(s3_client=self.s3_client) # MOCK SOURCE FILE WITH 100 ROWS TO SIMULATE THE SCENARIO WHERE THE ACK FILE IS NOT FULL. # TODO: Test all other scenarios. @@ -60,8 +68,14 @@ def setUp(self) -> None: self.ack_bucket_patcher = patch("update_ack_file.ACK_BUCKET_NAME", BucketNames.DESTINATION) self.ack_bucket_patcher.start() + self.get_ingestion_start_time_by_message_id_patcher = patch( + "update_ack_file.get_ingestion_start_time_by_message_id" + ) + self.mock_get_ingestion_start_time_by_message_id = self.get_ingestion_start_time_by_message_id_patcher.start() + self.mock_get_ingestion_start_time_by_message_id.return_value = 3456 + def tearDown(self) -> None: - GenericTearDown(self.s3_client) + GenericTearDown(s3_client=self.s3_client) def validate_ack_file_content( self, @@ -76,6 +90,21 @@ def validate_ack_file_content( expected_ack_file_content = generate_expected_ack_content(incoming_messages, existing_file_content) self.assertEqual(expected_ack_file_content, actual_ack_file_content) + def validate_json_ack_file_content( + self, + incoming_messages: list[dict], + existing_file_content: str = ValidValues.json_ack_initial_content, + ) -> None: + """ + Obtains the json ack file content and ensures that it matches the expected content (expected content is based + on the incoming messages). 
+ """ + actual_ack_file_content = obtain_current_json_ack_file_content( + self.s3_client, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key + ) + expected_ack_file_content = generate_expected_json_ack_content(incoming_messages, existing_file_content) + self.assertEqual(expected_ack_file_content, actual_ack_file_content) + def test_update_ack_file(self): """Test that update_ack_file correctly creates the ack file when there was no existing ack file""" @@ -147,6 +176,80 @@ def test_update_ack_file(self): Key=MOCK_MESSAGE_DETAILS.temp_ack_file_key, ) + def test_update_json_ack_file(self): + """Test that update_json_ack_file correctly creates the ack file when there was no existing ack file""" + + test_cases = [ + { + "description": "Single failure row", + "input_rows": [ValidValues.ack_data_failure_dict], + "expected_elements": [ + generate_expected_json_ack_file_element( + success=False, imms_id=DefaultValues.imms_id, diagnostics="DIAGNOSTICS" + ) + ], + }, + { + "description": "With multiple rows", + "input_rows": [ + {**ValidValues.ack_data_failure_dict, "IMMS_ID": "TEST_IMMS_ID_1"}, + ValidValues.ack_data_failure_dict, + ValidValues.ack_data_failure_dict, + ], + "expected_elements": [ + generate_expected_json_ack_file_element( + success=False, + imms_id="TEST_IMMS_ID_1", + diagnostics="DIAGNOSTICS", + ), + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="DIAGNOSTICS"), + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="DIAGNOSTICS"), + ], + }, + { + "description": "Multiple rows With different diagnostics", + "input_rows": [ + { + **ValidValues.ack_data_failure_dict, + "OPERATION_OUTCOME": "Error 1", + }, + { + **ValidValues.ack_data_failure_dict, + "OPERATION_OUTCOME": "Error 2", + }, + { + **ValidValues.ack_data_failure_dict, + "OPERATION_OUTCOME": "Error 3", + }, + ], + "expected_elements": [ + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="Error 1"), + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="Error 2"), + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="Error 3"), + ], + }, + ] + + for test_case in test_cases: + with self.subTest(test_case["description"]): + update_json_ack_file( + file_key=MOCK_MESSAGE_DETAILS.file_key, + created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ack_data_rows=test_case["input_rows"], + ) + + actual_ack_file_content = obtain_current_json_ack_file_content( + self.s3_client, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key + ) + expected_ack_file_content = copy.deepcopy(ValidValues.json_ack_initial_content) + for element in test_case["expected_elements"]: + expected_ack_file_content["failures"].append(element) + self.assertEqual(expected_ack_file_content, actual_ack_file_content) + self.s3_client.delete_object( + Bucket=BucketNames.DESTINATION, + Key=MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, + ) + def test_update_ack_file_existing(self): """Test that update_ack_file correctly updates the ack file when there was an existing ack file""" # Mock existing content in the ack file @@ -171,6 +274,34 @@ def test_update_ack_file_existing(self): expected_ack_file_content = existing_content + "\n".join(expected_rows) + "\n" self.assertEqual(expected_ack_file_content, actual_ack_file_content) + def test_update_json_ack_file_existing(self): + """Test that update_json_ack_file correctly updates the ack file when there was an existing ack file""" + # Mock existing content in the ack file + 
existing_content = generate_sample_existing_json_ack_content() + setup_existing_ack_file( + MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, json.dumps(existing_content), self.s3_client + ) + + ack_data_rows = [ + ValidValues.ack_data_failure_dict, + ] + update_json_ack_file( + file_key=MOCK_MESSAGE_DETAILS.file_key, + created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ack_data_rows=ack_data_rows, + ) + + actual_ack_file_content = obtain_current_json_ack_file_content( + self.s3_client, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key + ) + + expected_rows = [ + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="DIAGNOSTICS"), + ] + expected_ack_file_content = existing_content + expected_ack_file_content["failures"].append(expected_rows[0]) + self.assertEqual(expected_ack_file_content, actual_ack_file_content) + def test_create_ack_data(self): """Test create_ack_data with success and failure cases.""" @@ -245,6 +376,31 @@ def test_obtain_current_ack_content_file_exists(self): result = obtain_current_ack_content(MOCK_MESSAGE_DETAILS.temp_ack_file_key) self.assertEqual(result.getvalue(), existing_content) + def test_obtain_current_json_ack_content_file_no_existing(self): + """Test that when the json ack file does not yet exist, obtain_current_json_ack_content returns the ack headers only.""" + result = obtain_current_json_ack_content( + MOCK_MESSAGE_DETAILS.message_id, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key + ) + self.assertEqual(result, ValidValues.json_ack_initial_content) + + def test_obtain_current_json_ack_content_file_exists(self): + """Test that the existing json ack file content is retrieved and new elements are added.""" + existing_content = generate_sample_existing_json_ack_content() + setup_existing_ack_file( + MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, json.dumps(existing_content), self.s3_client + ) + result = obtain_current_json_ack_content( + MOCK_MESSAGE_DETAILS.message_id, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key + ) + self.assertEqual(result, existing_content) + + """ + then: fix the flow test (complete) to check that the JSON file has been read, summary updated, written and moved; + this might actually be worth a test in its own right, and put it in here. + NB we need something that checks, in complete() when the JSON file doesn't exist at all, that it's + created from scratch. I think that is again a test in its own right + """ + if __name__ == "__main__": unittest.main() diff --git a/lambdas/ack_backend/tests/test_update_ack_file_flow.py b/lambdas/ack_backend/tests/test_update_ack_file_flow.py index aa020d763..be36e0be8 100644 --- a/lambdas/ack_backend/tests/test_update_ack_file_flow.py +++ b/lambdas/ack_backend/tests/test_update_ack_file_flow.py @@ -75,3 +75,7 @@ def test_audit_table_updated_correctly_when_ack_process_complete(self): # Assert: Only check audit table interactions self.mock_get_record_and_failure_count.assert_called_once_with(message_id) self.assertEqual(self.mock_update_audit_table_item.call_count, 2) + + # TODO: add to this test: check the JSON ack file has been written out and that + # the fields have been correctly added to it. + # it'll involve putting a JSON object, calling complete_batch_file_process, reading it back. 
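Note on the id convention these tests rely on: MESSAGE_HEADER_ID is assumed to be a composite "<message_id>^<row_number>" string. That is why patch 13 above switched update_json_ack_file to split("^")[0] when recovering the file-level message id (used for the audit-table lookup), while the "rowId" written into failures[] keeps the last segment, as generate_expected_json_ack_file_element in the next diff also assumes. A minimal runnable sketch of that convention (the literal id is made up for illustration):

    # Assumed composite id: "<message_id>^<row_number>"
    message_header_id = "test_file_id^5"
    message_id = message_header_id.split("^")[0]   # file-level id -> audit-table lookup
    row_id = message_header_id.split("^")[-1]      # per-row id -> "rowId" in failures[]
    assert (message_id, row_id) == ("test_file_id", "5")
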
diff --git a/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py b/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py index 9781b051a..16e8eb147 100644 --- a/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py +++ b/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py @@ -52,6 +52,14 @@ def obtain_current_ack_file_content(s3_client, temp_ack_file_key: str = MOCK_MES return retrieved_object["Body"].read().decode("utf-8") +def obtain_current_json_ack_file_content( + s3_client, temp_ack_file_key: str = MOCK_MESSAGE_DETAILS.temp_ack_file_key +) -> dict: + """Obtains the ack file content from the destination bucket.""" + retrieved_object = s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=temp_ack_file_key) + return json.loads(retrieved_object["Body"].read().decode("utf-8")) + + def obtain_completed_ack_file_content( s3_client, complete_ack_file_key: str = MOCK_MESSAGE_DETAILS.archive_ack_file_key ) -> str: @@ -60,6 +68,14 @@ def obtain_completed_ack_file_content( return retrieved_object["Body"].read().decode("utf-8") +def obtain_completed_json_ack_file_content( + s3_client, complete_ack_file_key: str = MOCK_MESSAGE_DETAILS.archive_ack_file_key +) -> dict: + """Obtains the ack file content from the forwardedFile directory""" + retrieved_object = s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=complete_ack_file_key) + return json.loads(retrieved_object["Body"].read().decode("utf-8")) + + def generate_expected_ack_file_row( success: bool, imms_id: str = MOCK_MESSAGE_DETAILS.imms_id, @@ -81,11 +97,40 @@ def generate_expected_ack_file_row( ) +def generate_expected_json_ack_file_element( + success: bool, + imms_id: str = MOCK_MESSAGE_DETAILS.imms_id, + diagnostics: str = None, + row_id: str = MOCK_MESSAGE_DETAILS.row_id, + local_id: str = MOCK_MESSAGE_DETAILS.local_id, + created_at_formatted_string: str = MOCK_MESSAGE_DETAILS.created_at_formatted_string, +) -> dict: + """Create an ack element, containing the given message details.""" + if success: + return None # we no longer process success elements + else: + return { + "rowId": row_id.split("^")[-1], + "responseCode": "30002", + "responseDisplay": "Business Level Response Value - Processing Error", + "severity": "Fatal", + "localId": local_id, + "operationOutcome": "" if not diagnostics else diagnostics, + } + + def generate_sample_existing_ack_content() -> str: """Returns sample ack file content with a single success row.""" return ValidValues.ack_headers + generate_expected_ack_file_row(success=True) +def generate_sample_existing_json_ack_content() -> dict: + """Returns sample ack file content with a single failure row.""" + sample_content = ValidValues.json_ack_initial_content + sample_content["failures"].append(generate_expected_json_ack_file_element(success=False)) + return sample_content + + def generate_expected_ack_content(incoming_messages: list[dict], existing_content: str = ValidValues.ack_headers) -> str: """Returns the expected_ack_file_content based on the incoming messages""" for message in incoming_messages: @@ -115,6 +160,37 @@ def generate_expected_ack_content(incoming_messages: list[dict], existing_conten return existing_content +def generate_expected_json_ack_content( + incoming_messages: list[dict], existing_content: str = ValidValues.json_ack_initial_content +) -> dict: + """Returns the expected_json_ack_file_content based on the incoming messages""" + for message in incoming_messages: + # Determine diagnostics based on the diagnostics value in the incoming message + 
diagnostics_dictionary = message.get("diagnostics", {}) + diagnostics = ( + diagnostics_dictionary.get("error_message", "") + if isinstance(diagnostics_dictionary, dict) + else "Unable to determine diagnostics issue" + ) + + # Create the ack row based on the incoming message details + ack_element = generate_expected_json_ack_file_element( + success=diagnostics == "", + row_id=message.get("row_id", MOCK_MESSAGE_DETAILS.row_id), + created_at_formatted_string=message.get( + "created_at_formatted_string", + MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ), + local_id=message.get("local_id", MOCK_MESSAGE_DETAILS.local_id), + imms_id=("" if diagnostics else message.get("imms_id", MOCK_MESSAGE_DETAILS.imms_id)), + diagnostics=diagnostics, + ) + + existing_content["failures"].append(ack_element) + + return existing_content + + def validate_ack_file_content( s3_client, incoming_messages: list[dict], @@ -130,3 +206,22 @@ def validate_ack_file_content( ) expected_ack_file_content = generate_expected_ack_content(incoming_messages, existing_file_content) assert expected_ack_file_content == actual_ack_file_content + + +def validate_json_ack_file_content( + s3_client, + incoming_messages: list[dict], + existing_file_content: str = ValidValues.json_ack_initial_content, + is_complete: bool = False, +) -> None: + """ + Obtains the ack file content and ensures that it matches the expected content (expected content is based + on the incoming messages). + """ + actual_ack_file_content = ( + obtain_current_json_ack_file_content(s3_client, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key) + if not is_complete + else obtain_completed_json_ack_file_content(s3_client, MOCK_MESSAGE_DETAILS.archive_json_ack_file_key) + ) + expected_ack_file_content = generate_expected_json_ack_content(incoming_messages, existing_file_content) + assert expected_ack_file_content == actual_ack_file_content diff --git a/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py b/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py index 706debe26..4183c4e19 100644 --- a/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py +++ b/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py @@ -103,9 +103,15 @@ def __init__( self.temp_ack_file_key = ( f"TempAck/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.csv" ) + self.temp_json_ack_file_key = ( + f"TempAck/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.json" + ) self.archive_ack_file_key = ( f"forwardedFile/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.csv" ) + self.archive_json_ack_file_key = ( + f"forwardedFile/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.json" + ) self.vaccine_type = vaccine_type self.ods_code = ods_code self.supplier = supplier @@ -259,6 +265,28 @@ class ValidValues: "message": "Record processing complete", } + json_ack_initial_content = { + "system": "Immunisation FHIR API Batch Report", + "version": 1, + "generatedDate": "", + "filename": "TempAck/RSV_Vaccinations_v5_X26_20210730T12000000_BusAck_20211120T12000000", + "provider": "X26", + "messageHeaderId": "test_file_id", + "summary": {"ingestionTime": {"start": 3456}}, + "failures": [], + } + + json_ack_data_failure_dict = ( + { + "rowId": DefaultValues.row_id, + "responseCode": "30002", + "responseDisplay": "Business Level Response Value - Processing Error", + "severity": "Fatal", + "localId": DefaultValues.local_id, + 
"operationOutcome": "DIAGNOSTICS", + }, + ) + class InvalidValues: """Invalid values for use in tests""" From 26a3c6af134d4e970d194af5ae8723085fc6a6a1 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Mon, 2 Feb 2026 16:43:57 +0000 Subject: [PATCH 15/23] test_complete_batch_file_process_json_ack_file II --- lambdas/ack_backend/src/ack_processor.py | 1 - lambdas/ack_backend/src/update_ack_file.py | 1 - .../ack_backend/tests/test_update_ack_file.py | 54 ++++++++++++++++--- .../tests/test_update_ack_file_flow.py | 4 -- .../utils/values_for_ack_backend_tests.py | 20 +++++++ 5 files changed, 68 insertions(+), 12 deletions(-) diff --git a/lambdas/ack_backend/src/ack_processor.py b/lambdas/ack_backend/src/ack_processor.py index a4bb434e2..b03f94f6e 100644 --- a/lambdas/ack_backend/src/ack_processor.py +++ b/lambdas/ack_backend/src/ack_processor.py @@ -58,7 +58,6 @@ def lambda_handler(event, _): update_ack_file(file_key, created_at_formatted_string, ack_data_rows) - # TODO: update json ack file on switch update_json_ack_file(file_key, created_at_formatted_string, ack_data_rows) if file_processing_complete: diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index 8d8d219db..be9568211 100644 --- a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -107,7 +107,6 @@ def complete_batch_file_process( move_file(SOURCE_BUCKET_NAME, f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}") # finish JSON file - # TODO: need to abstract this out json_ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}" temp_ack_file_key = f"{TEMP_ACK_DIR}/{json_ack_filename}" ack_data_dict = obtain_current_json_ack_content(message_id, temp_ack_file_key) diff --git a/lambdas/ack_backend/tests/test_update_ack_file.py b/lambdas/ack_backend/tests/test_update_ack_file.py index fe4300448..c10e06ed2 100644 --- a/lambdas/ack_backend/tests/test_update_ack_file.py +++ b/lambdas/ack_backend/tests/test_update_ack_file.py @@ -35,6 +35,7 @@ with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT): from update_ack_file import ( + complete_batch_file_process, create_ack_data, obtain_current_ack_content, obtain_current_json_ack_content, @@ -67,6 +68,8 @@ def setUp(self) -> None: self.ack_bucket_patcher = patch("update_ack_file.ACK_BUCKET_NAME", BucketNames.DESTINATION) self.ack_bucket_patcher.start() + self.source_bucket_patcher = patch("update_ack_file.SOURCE_BUCKET_NAME", BucketNames.SOURCE) + self.source_bucket_patcher.start() self.get_ingestion_start_time_by_message_id_patcher = patch( "update_ack_file.get_ingestion_start_time_by_message_id" @@ -74,6 +77,19 @@ def setUp(self) -> None: self.mock_get_ingestion_start_time_by_message_id = self.get_ingestion_start_time_by_message_id_patcher.start() self.mock_get_ingestion_start_time_by_message_id.return_value = 3456 + self.get_record_and_failure_count_patcher = patch("update_ack_file.get_record_count_and_failures_by_message_id") + self.mock_get_record_and_failure_count = self.get_record_and_failure_count_patcher.start() + + self.update_audit_table_item_patcher = patch("update_ack_file.update_audit_table_item") + self.mock_update_audit_table_item = self.update_audit_table_item_patcher.start() + + self.datetime_patcher = patch("update_ack_file.time") + self.mock_datetime = self.datetime_patcher.start() + self.mock_datetime.strftime.return_value = "7890" + + self.generate_send_patcher = patch("update_ack_file.generate_and_send_logs") + 
self.mock_generate_send = self.generate_send_patcher.start() + def tearDown(self) -> None: GenericTearDown(s3_client=self.s3_client) @@ -394,12 +410,38 @@ def test_obtain_current_json_ack_content_file_exists(self): ) self.assertEqual(result, existing_content) - """ - then: fix the flow test (complete) to check that the JSON file has been read, summary updated, written and moved; - this might actually be worth a test in its own right, and put it in here. - NB we need something that checks, in complete() when the JSON file doesn't exist at all, that it's - created from scratch. I think that is again a test in its own right - """ + def test_complete_batch_file_process_json_ack_file(self): + """Test that complete_batch_file_process completes and moves the JSON ack file.""" + generate_sample_existing_json_ack_content() + self.s3_client.put_object( + Bucket=BucketNames.SOURCE, + Key=f"processing/{MOCK_MESSAGE_DETAILS.file_key}", + Body="dummy content", + ) + update_ack_file( + file_key=MOCK_MESSAGE_DETAILS.file_key, + created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ack_data_rows=[ValidValues.ack_data_failure_dict], + ) + update_json_ack_file( + file_key=MOCK_MESSAGE_DETAILS.file_key, + created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ack_data_rows=[ValidValues.ack_data_failure_dict], + ) + + self.mock_get_record_and_failure_count.return_value = 10, 1 + + complete_batch_file_process( + message_id=MOCK_MESSAGE_DETAILS.message_id, + supplier=MOCK_MESSAGE_DETAILS.supplier, + vaccine_type=MOCK_MESSAGE_DETAILS.vaccine_type, + created_at_formatted_string="20211120T12000000", + file_key=MOCK_MESSAGE_DETAILS.file_key, + ) + result = obtain_current_json_ack_content( + MOCK_MESSAGE_DETAILS.message_id, MOCK_MESSAGE_DETAILS.archive_json_ack_file_key + ) + self.assertEqual(result, ValidValues.json_ack_complete_content) if __name__ == "__main__": diff --git a/lambdas/ack_backend/tests/test_update_ack_file_flow.py b/lambdas/ack_backend/tests/test_update_ack_file_flow.py index be36e0be8..aa020d763 100644 --- a/lambdas/ack_backend/tests/test_update_ack_file_flow.py +++ b/lambdas/ack_backend/tests/test_update_ack_file_flow.py @@ -75,7 +75,3 @@ def test_audit_table_updated_correctly_when_ack_process_complete(self): # Assert: Only check audit table interactions self.mock_get_record_and_failure_count.assert_called_once_with(message_id) self.assertEqual(self.mock_update_audit_table_item.call_count, 2) - - # TODO: add to this test: check the JSON ack file has been written out and that - # the fields have been correctly added to it. - # it'll involve putting a JSON object, calling complete_batch_file_process, reading it back. 
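A caveat on the timestamp handling exercised here: complete_batch_file_process derives ingestionTime.end from int(time.strftime("%s", time_now)). The "%s" directive is a platform-specific strftime extension (glibc) rather than documented Python behaviour, and it interprets the struct_time as local time, so feeding it the UTC struct from time.gmtime() can skew the epoch by the local UTC offset. Because these tests patch update_ack_file.time so that strftime returns "7890", the json_ack_complete_content fixture in the next diff carries 7890 in both generatedDate and summary.ingestionTime.end. A portable sketch of the same completion bookkeeping (the counts and start value mirror the mocks in the test above):

    import calendar
    import json
    import time

    time_now = time.gmtime()
    summary = {
        "totalRecords": 10,
        "success": 9,
        "failed": 1,
        # calendar.timegm is the portable inverse of time.gmtime(),
        # unlike time.strftime("%s", ...) which assumes local time
        "ingestionTime": {"start": 3456, "end": calendar.timegm(time_now)},
    }
    generated_date = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time_now)
    print(json.dumps({"generatedDate": generated_date, "summary": summary}, indent=2))
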
diff --git a/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py b/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py index 4183c4e19..8c1cf0092 100644 --- a/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py +++ b/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py @@ -276,6 +276,26 @@ class ValidValues: "failures": [], } + json_ack_complete_content = { + "system": "Immunisation FHIR API Batch Report", + "version": 1, + "generatedDate": "7890", + "filename": "TempAck/RSV_Vaccinations_v5_X26_20210730T12000000_BusAck_20211120T12000000", + "provider": "X26", + "messageHeaderId": "test_file_id", + "summary": {"totalRecords": 10, "success": 9, "failed": 1, "ingestionTime": {"start": 3456, "end": 7890}}, + "failures": [ + { + "rowId": "1", + "responseCode": "30002", + "responseDisplay": "Business Level Response Value - Processing Error", + "severity": "Fatal", + "localId": "test_system_uri^testabc", + "operationOutcome": "DIAGNOSTICS", + } + ], + } + json_ack_data_failure_dict = ( { "rowId": DefaultValues.row_id, From 706ee56dceab65af7b8ac13a7cf12da3821493ee Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Mon, 2 Feb 2026 17:00:01 +0000 Subject: [PATCH 16/23] code cleanup --- .../ack_backend/tests/test_update_ack_file.py | 226 +++++++++--------- .../utils/utils_for_ack_backend_tests.py | 124 +++++----- .../utils/values_for_ack_backend_tests.py | 6 +- 3 files changed, 178 insertions(+), 178 deletions(-) diff --git a/lambdas/ack_backend/tests/test_update_ack_file.py b/lambdas/ack_backend/tests/test_update_ack_file.py index c10e06ed2..fdc9f6873 100644 --- a/lambdas/ack_backend/tests/test_update_ack_file.py +++ b/lambdas/ack_backend/tests/test_update_ack_file.py @@ -106,21 +106,6 @@ def validate_ack_file_content( expected_ack_file_content = generate_expected_ack_content(incoming_messages, existing_file_content) self.assertEqual(expected_ack_file_content, actual_ack_file_content) - def validate_json_ack_file_content( - self, - incoming_messages: list[dict], - existing_file_content: str = ValidValues.json_ack_initial_content, - ) -> None: - """ - Obtains the json ack file content and ensures that it matches the expected content (expected content is based - on the incoming messages). 
- """ - actual_ack_file_content = obtain_current_json_ack_file_content( - self.s3_client, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key - ) - expected_ack_file_content = generate_expected_json_ack_content(incoming_messages, existing_file_content) - self.assertEqual(expected_ack_file_content, actual_ack_file_content) - def test_update_ack_file(self): """Test that update_ack_file correctly creates the ack file when there was no existing ack file""" @@ -192,6 +177,119 @@ def test_update_ack_file(self): Key=MOCK_MESSAGE_DETAILS.temp_ack_file_key, ) + def test_update_ack_file_existing(self): + """Test that update_ack_file correctly updates the ack file when there was an existing ack file""" + # Mock existing content in the ack file + existing_content = generate_sample_existing_ack_content() + setup_existing_ack_file(MOCK_MESSAGE_DETAILS.temp_ack_file_key, existing_content, self.s3_client) + + ack_data_rows = [ + ValidValues.ack_data_success_dict, + ValidValues.ack_data_failure_dict, + ] + update_ack_file( + file_key=MOCK_MESSAGE_DETAILS.file_key, + created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ack_data_rows=ack_data_rows, + ) + + actual_ack_file_content = obtain_current_ack_file_content(self.s3_client) + expected_rows = [ + generate_expected_ack_file_row(success=True, imms_id=DefaultValues.imms_id), + generate_expected_ack_file_row(success=False, imms_id="", diagnostics="DIAGNOSTICS"), + ] + expected_ack_file_content = existing_content + "\n".join(expected_rows) + "\n" + self.assertEqual(expected_ack_file_content, actual_ack_file_content) + + def test_create_ack_data(self): + """Test create_ack_data with success and failure cases.""" + + success_expected_result = { + "MESSAGE_HEADER_ID": MOCK_MESSAGE_DETAILS.row_id, + "HEADER_RESPONSE_CODE": "OK", + "ISSUE_SEVERITY": "Information", + "ISSUE_CODE": "OK", + "ISSUE_DETAILS_CODE": "30001", + "RESPONSE_TYPE": "Business", + "RESPONSE_CODE": "30001", + "RESPONSE_DISPLAY": "Success", + "RECEIVED_TIME": MOCK_MESSAGE_DETAILS.created_at_formatted_string, + "MAILBOX_FROM": "", + "LOCAL_ID": MOCK_MESSAGE_DETAILS.local_id, + "IMMS_ID": MOCK_MESSAGE_DETAILS.imms_id, + "OPERATION_OUTCOME": "", + "MESSAGE_DELIVERY": True, + } + + failure_expected_result = { + "MESSAGE_HEADER_ID": MOCK_MESSAGE_DETAILS.row_id, + "HEADER_RESPONSE_CODE": "Fatal Error", + "ISSUE_SEVERITY": "Fatal", + "ISSUE_CODE": "Fatal Error", + "ISSUE_DETAILS_CODE": "30002", + "RESPONSE_TYPE": "Business", + "RESPONSE_CODE": "30002", + "RESPONSE_DISPLAY": "Business Level Response Value - Processing Error", + "RECEIVED_TIME": MOCK_MESSAGE_DETAILS.created_at_formatted_string, + "MAILBOX_FROM": "", + "LOCAL_ID": MOCK_MESSAGE_DETAILS.local_id, + "IMMS_ID": "", + "OPERATION_OUTCOME": "test diagnostics message", + "MESSAGE_DELIVERY": False, + } + + test_cases = [ + { + "success": True, + "imms_id": MOCK_MESSAGE_DETAILS.imms_id, + "expected_result": success_expected_result, + }, + { + "success": False, + "diagnostics": "test diagnostics message", + "expected_result": failure_expected_result, + }, + ] + + for test_case in test_cases: + with self.subTest(f"success is {test_case['success']}"): + result = create_ack_data( + created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, + local_id=MOCK_MESSAGE_DETAILS.local_id, + row_id=MOCK_MESSAGE_DETAILS.row_id, + successful_api_response=test_case["success"], + diagnostics=test_case.get("diagnostics"), + imms_id=test_case.get("imms_id"), + ) + self.assertEqual(result, test_case["expected_result"]) + + def 
test_obtain_current_ack_content_file_no_existing(self): + """Test that when the ack file does not yet exist, obtain_current_ack_content returns the ack headers only.""" + result = obtain_current_ack_content(MOCK_MESSAGE_DETAILS.temp_ack_file_key) + self.assertEqual(result.getvalue(), ValidValues.ack_headers) + + def test_obtain_current_ack_content_file_exists(self): + """Test that the existing ack file content is retrieved and new rows are added.""" + existing_content = generate_sample_existing_ack_content() + setup_existing_ack_file(MOCK_MESSAGE_DETAILS.temp_ack_file_key, existing_content, self.s3_client) + result = obtain_current_ack_content(MOCK_MESSAGE_DETAILS.temp_ack_file_key) + self.assertEqual(result.getvalue(), existing_content) + + def validate_json_ack_file_content( + self, + incoming_messages: list[dict], + existing_file_content: str = ValidValues.json_ack_initial_content, + ) -> None: + """ + Obtains the json ack file content and ensures that it matches the expected content (expected content is based + on the incoming messages). + """ + actual_ack_file_content = obtain_current_json_ack_file_content( + self.s3_client, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key + ) + expected_ack_file_content = generate_expected_json_ack_content(incoming_messages, existing_file_content) + self.assertEqual(expected_ack_file_content, actual_ack_file_content) + def test_update_json_ack_file(self): """Test that update_json_ack_file correctly creates the ack file when there was no existing ack file""" @@ -266,30 +364,6 @@ def test_update_json_ack_file(self): Key=MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, ) - def test_update_ack_file_existing(self): - """Test that update_ack_file correctly updates the ack file when there was an existing ack file""" - # Mock existing content in the ack file - existing_content = generate_sample_existing_ack_content() - setup_existing_ack_file(MOCK_MESSAGE_DETAILS.temp_ack_file_key, existing_content, self.s3_client) - - ack_data_rows = [ - ValidValues.ack_data_success_dict, - ValidValues.ack_data_failure_dict, - ] - update_ack_file( - file_key=MOCK_MESSAGE_DETAILS.file_key, - created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, - ack_data_rows=ack_data_rows, - ) - - actual_ack_file_content = obtain_current_ack_file_content(self.s3_client) - expected_rows = [ - generate_expected_ack_file_row(success=True, imms_id=DefaultValues.imms_id), - generate_expected_ack_file_row(success=False, imms_id="", diagnostics="DIAGNOSTICS"), - ] - expected_ack_file_content = existing_content + "\n".join(expected_rows) + "\n" - self.assertEqual(expected_ack_file_content, actual_ack_file_content) - def test_update_json_ack_file_existing(self): """Test that update_json_ack_file correctly updates the ack file when there was an existing ack file""" # Mock existing content in the ack file @@ -318,80 +392,6 @@ def test_update_json_ack_file_existing(self): expected_ack_file_content["failures"].append(expected_rows[0]) self.assertEqual(expected_ack_file_content, actual_ack_file_content) - def test_create_ack_data(self): - """Test create_ack_data with success and failure cases.""" - - success_expected_result = { - "MESSAGE_HEADER_ID": MOCK_MESSAGE_DETAILS.row_id, - "HEADER_RESPONSE_CODE": "OK", - "ISSUE_SEVERITY": "Information", - "ISSUE_CODE": "OK", - "ISSUE_DETAILS_CODE": "30001", - "RESPONSE_TYPE": "Business", - "RESPONSE_CODE": "30001", - "RESPONSE_DISPLAY": "Success", - "RECEIVED_TIME": MOCK_MESSAGE_DETAILS.created_at_formatted_string, - "MAILBOX_FROM": "", - 
"LOCAL_ID": MOCK_MESSAGE_DETAILS.local_id, - "IMMS_ID": MOCK_MESSAGE_DETAILS.imms_id, - "OPERATION_OUTCOME": "", - "MESSAGE_DELIVERY": True, - } - - failure_expected_result = { - "MESSAGE_HEADER_ID": MOCK_MESSAGE_DETAILS.row_id, - "HEADER_RESPONSE_CODE": "Fatal Error", - "ISSUE_SEVERITY": "Fatal", - "ISSUE_CODE": "Fatal Error", - "ISSUE_DETAILS_CODE": "30002", - "RESPONSE_TYPE": "Business", - "RESPONSE_CODE": "30002", - "RESPONSE_DISPLAY": "Business Level Response Value - Processing Error", - "RECEIVED_TIME": MOCK_MESSAGE_DETAILS.created_at_formatted_string, - "MAILBOX_FROM": "", - "LOCAL_ID": MOCK_MESSAGE_DETAILS.local_id, - "IMMS_ID": "", - "OPERATION_OUTCOME": "test diagnostics message", - "MESSAGE_DELIVERY": False, - } - - test_cases = [ - { - "success": True, - "imms_id": MOCK_MESSAGE_DETAILS.imms_id, - "expected_result": success_expected_result, - }, - { - "success": False, - "diagnostics": "test diagnostics message", - "expected_result": failure_expected_result, - }, - ] - - for test_case in test_cases: - with self.subTest(f"success is {test_case['success']}"): - result = create_ack_data( - created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, - local_id=MOCK_MESSAGE_DETAILS.local_id, - row_id=MOCK_MESSAGE_DETAILS.row_id, - successful_api_response=test_case["success"], - diagnostics=test_case.get("diagnostics"), - imms_id=test_case.get("imms_id"), - ) - self.assertEqual(result, test_case["expected_result"]) - - def test_obtain_current_ack_content_file_no_existing(self): - """Test that when the ack file does not yet exist, obtain_current_ack_content returns the ack headers only.""" - result = obtain_current_ack_content(MOCK_MESSAGE_DETAILS.temp_ack_file_key) - self.assertEqual(result.getvalue(), ValidValues.ack_headers) - - def test_obtain_current_ack_content_file_exists(self): - """Test that the existing ack file content is retrieved and new rows are added.""" - existing_content = generate_sample_existing_ack_content() - setup_existing_ack_file(MOCK_MESSAGE_DETAILS.temp_ack_file_key, existing_content, self.s3_client) - result = obtain_current_ack_content(MOCK_MESSAGE_DETAILS.temp_ack_file_key) - self.assertEqual(result.getvalue(), existing_content) - def test_obtain_current_json_ack_content_file_no_existing(self): """Test that when the json ack file does not yet exist, obtain_current_json_ack_content returns the ack headers only.""" result = obtain_current_json_ack_content( diff --git a/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py b/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py index 16e8eb147..1f0ffcfda 100644 --- a/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py +++ b/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py @@ -52,14 +52,6 @@ def obtain_current_ack_file_content(s3_client, temp_ack_file_key: str = MOCK_MES return retrieved_object["Body"].read().decode("utf-8") -def obtain_current_json_ack_file_content( - s3_client, temp_ack_file_key: str = MOCK_MESSAGE_DETAILS.temp_ack_file_key -) -> dict: - """Obtains the ack file content from the destination bucket.""" - retrieved_object = s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=temp_ack_file_key) - return json.loads(retrieved_object["Body"].read().decode("utf-8")) - - def obtain_completed_ack_file_content( s3_client, complete_ack_file_key: str = MOCK_MESSAGE_DETAILS.archive_ack_file_key ) -> str: @@ -68,14 +60,6 @@ def obtain_completed_ack_file_content( return retrieved_object["Body"].read().decode("utf-8") -def 
obtain_completed_json_ack_file_content( - s3_client, complete_ack_file_key: str = MOCK_MESSAGE_DETAILS.archive_ack_file_key -) -> dict: - """Obtains the ack file content from the forwardedFile directory""" - retrieved_object = s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=complete_ack_file_key) - return json.loads(retrieved_object["Body"].read().decode("utf-8")) - - def generate_expected_ack_file_row( success: bool, imms_id: str = MOCK_MESSAGE_DETAILS.imms_id, @@ -97,40 +81,11 @@ def generate_expected_ack_file_row( ) -def generate_expected_json_ack_file_element( - success: bool, - imms_id: str = MOCK_MESSAGE_DETAILS.imms_id, - diagnostics: str = None, - row_id: str = MOCK_MESSAGE_DETAILS.row_id, - local_id: str = MOCK_MESSAGE_DETAILS.local_id, - created_at_formatted_string: str = MOCK_MESSAGE_DETAILS.created_at_formatted_string, -) -> dict: - """Create an ack element, containing the given message details.""" - if success: - return None # we no longer process success elements - else: - return { - "rowId": row_id.split("^")[-1], - "responseCode": "30002", - "responseDisplay": "Business Level Response Value - Processing Error", - "severity": "Fatal", - "localId": local_id, - "operationOutcome": "" if not diagnostics else diagnostics, - } - - def generate_sample_existing_ack_content() -> str: """Returns sample ack file content with a single success row.""" return ValidValues.ack_headers + generate_expected_ack_file_row(success=True) -def generate_sample_existing_json_ack_content() -> dict: - """Returns sample ack file content with a single failure row.""" - sample_content = ValidValues.json_ack_initial_content - sample_content["failures"].append(generate_expected_json_ack_file_element(success=False)) - return sample_content - - def generate_expected_ack_content(incoming_messages: list[dict], existing_content: str = ValidValues.ack_headers) -> str: """Returns the expected_ack_file_content based on the incoming messages""" for message in incoming_messages: @@ -160,6 +115,68 @@ def generate_expected_ack_content(incoming_messages: list[dict], existing_conten return existing_content +def validate_ack_file_content( + s3_client, + incoming_messages: list[dict], + existing_file_content: str = ValidValues.ack_headers, + is_complete: bool = False, +) -> None: + """ + Obtains the ack file content and ensures that it matches the expected content (expected content is based + on the incoming messages). 
+ """ + actual_ack_file_content = ( + obtain_current_ack_file_content(s3_client) if not is_complete else (obtain_completed_ack_file_content(s3_client)) + ) + expected_ack_file_content = generate_expected_ack_content(incoming_messages, existing_file_content) + assert expected_ack_file_content == actual_ack_file_content + + +def obtain_current_json_ack_file_content( + s3_client, temp_ack_file_key: str = MOCK_MESSAGE_DETAILS.temp_ack_file_key +) -> dict: + """Obtains the ack file content from the destination bucket.""" + retrieved_object = s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=temp_ack_file_key) + return json.loads(retrieved_object["Body"].read().decode("utf-8")) + + +def obtain_completed_json_ack_file_content( + s3_client, complete_ack_file_key: str = MOCK_MESSAGE_DETAILS.archive_ack_file_key +) -> dict: + """Obtains the ack file content from the forwardedFile directory""" + retrieved_object = s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=complete_ack_file_key) + return json.loads(retrieved_object["Body"].read().decode("utf-8")) + + +def generate_expected_json_ack_file_element( + success: bool, + imms_id: str = MOCK_MESSAGE_DETAILS.imms_id, + diagnostics: str = None, + row_id: str = MOCK_MESSAGE_DETAILS.row_id, + local_id: str = MOCK_MESSAGE_DETAILS.local_id, + created_at_formatted_string: str = MOCK_MESSAGE_DETAILS.created_at_formatted_string, +) -> dict: + """Create an ack element, containing the given message details.""" + if success: + return None # we no longer process success elements + else: + return { + "rowId": row_id.split("^")[-1], + "responseCode": "30002", + "responseDisplay": "Business Level Response Value - Processing Error", + "severity": "Fatal", + "localId": local_id, + "operationOutcome": "" if not diagnostics else diagnostics, + } + + +def generate_sample_existing_json_ack_content() -> dict: + """Returns sample ack file content with a single failure row.""" + sample_content = ValidValues.json_ack_initial_content + sample_content["failures"].append(generate_expected_json_ack_file_element(success=False)) + return sample_content + + def generate_expected_json_ack_content( incoming_messages: list[dict], existing_content: str = ValidValues.json_ack_initial_content ) -> dict: @@ -191,23 +208,6 @@ def generate_expected_json_ack_content( return existing_content -def validate_ack_file_content( - s3_client, - incoming_messages: list[dict], - existing_file_content: str = ValidValues.ack_headers, - is_complete: bool = False, -) -> None: - """ - Obtains the ack file content and ensures that it matches the expected content (expected content is based - on the incoming messages). 
- """ - actual_ack_file_content = ( - obtain_current_ack_file_content(s3_client) if not is_complete else (obtain_completed_ack_file_content(s3_client)) - ) - expected_ack_file_content = generate_expected_ack_content(incoming_messages, existing_file_content) - assert expected_ack_file_content == actual_ack_file_content - - def validate_json_ack_file_content( s3_client, incoming_messages: list[dict], diff --git a/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py b/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py index 8c1cf0092..801436a6a 100644 --- a/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py +++ b/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py @@ -103,12 +103,12 @@ def __init__( self.temp_ack_file_key = ( f"TempAck/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.csv" ) - self.temp_json_ack_file_key = ( - f"TempAck/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.json" - ) self.archive_ack_file_key = ( f"forwardedFile/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.csv" ) + self.temp_json_ack_file_key = ( + f"TempAck/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.json" + ) self.archive_json_ack_file_key = ( f"forwardedFile/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.json" ) From 3a7586101b5b764a2f662c73f4859ef1af1c1b9a Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Mon, 2 Feb 2026 17:15:14 +0000 Subject: [PATCH 17/23] extra mocks --- lambdas/ack_backend/tests/test_splunk_logging.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/lambdas/ack_backend/tests/test_splunk_logging.py b/lambdas/ack_backend/tests/test_splunk_logging.py index a8e0e69c0..5c0e9fb34 100644 --- a/lambdas/ack_backend/tests/test_splunk_logging.py +++ b/lambdas/ack_backend/tests/test_splunk_logging.py @@ -48,6 +48,12 @@ def setUp(self): self.ack_bucket_patcher = patch("update_ack_file.ACK_BUCKET_NAME", BucketNames.DESTINATION) self.ack_bucket_patcher.start() + self.get_ingestion_start_time_by_message_id_patcher = patch( + "update_ack_file.get_ingestion_start_time_by_message_id" + ) + self.mock_get_ingestion_start_time_by_message_id = self.get_ingestion_start_time_by_message_id_patcher.start() + self.mock_get_ingestion_start_time_by_message_id.return_value = 3456 + def tearDown(self): GenericTearDown(self.s3_client) @@ -100,9 +106,9 @@ def expected_lambda_handler_logs(self, success: bool, number_of_rows, ingestion_ """Returns the expected logs for the lambda handler function.""" # Mocking of timings is such that the time taken is 2 seconds for each row, # plus 2 seconds for the handler if it succeeds (i.e. 
it calls update_ack_file) or 1 second if it doesn't; - # plus an extra second if ingestion is complete + # plus an extra 2 seconds if ingestion is complete if success: - time_taken = f"{number_of_rows * 2 + 3}.0s" if ingestion_complete else f"{number_of_rows * 2 + 1}.0s" + time_taken = f"{number_of_rows * 2 + 4}.0s" if ingestion_complete else f"{number_of_rows * 2 + 1}.0s" else: time_taken = f"{number_of_rows * 2 + 1}.0s" @@ -429,6 +435,7 @@ def test_splunk_update_ack_file_logged(self): patch("common.log_decorator.logger") as mock_logger, # noqa: E999 patch("update_ack_file.get_record_count_and_failures_by_message_id", return_value=(99, 2)), patch("update_ack_file.update_audit_table_item") as mock_update_audit_table_item, # noqa: E999 + patch("update_ack_file.move_file"), # noqa: E999 patch("ack_processor.increment_records_failed_count"), # noqa: E999 ): # noqa: E999 mock_datetime.now.return_value = ValidValues.fixed_datetime @@ -443,7 +450,7 @@ def test_splunk_update_ack_file_logged(self): expected_secondlast_logger_info_data = { **ValidValues.upload_ack_file_expected_log, "message_id": "test", - "time_taken": "1.0s", + "time_taken": "2.0s", } expected_last_logger_info_data = self.expected_lambda_handler_logs( success=True, number_of_rows=99, ingestion_complete=True From f2151134009b200a8cdd8540f48dd8fea99a15b4 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Tue, 3 Feb 2026 10:13:00 +0000 Subject: [PATCH 18/23] supplier --- lambdas/ack_backend/src/update_ack_file.py | 7 ++----- .../tests/utils/values_for_ack_backend_tests.py | 4 ++-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index be9568211..23df83816 100644 --- a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -113,6 +113,7 @@ def complete_batch_file_process( generated_date = time.strftime("%Y-%m-%dT%H:%M:%S.000Z") ack_data_dict["generatedDate"] = generated_date + ack_data_dict["provider"] = supplier ack_data_dict["summary"]["totalRecords"] = total_ack_rows_processed ack_data_dict["summary"]["success"] = successful_record_count ack_data_dict["summary"]["failed"] = total_failures @@ -191,10 +192,6 @@ def obtain_current_json_ack_content(message_id: str, temp_ack_file_key: str) -> ingestion_start_time = get_ingestion_start_time_by_message_id(message_id) raw_ack_filename = temp_ack_file_key.split(".")[0] - try: - provider = temp_ack_file_key.split("_")[3] - except IndexError: - provider = "unknown" # Generate the initial fields ack_data_dict = {} @@ -202,8 +199,8 @@ def obtain_current_json_ack_content(message_id: str, temp_ack_file_key: str) -> ack_data_dict["version"] = 1 # TO FIX ack_data_dict["generatedDate"] = "" # will be filled on completion + ack_data_dict["provider"] = "" # will be filled on completion ack_data_dict["filename"] = raw_ack_filename - ack_data_dict["provider"] = provider ack_data_dict["messageHeaderId"] = message_id ack_data_dict["summary"] = {} diff --git a/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py b/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py index 801436a6a..4b99d3196 100644 --- a/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py +++ b/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py @@ -270,7 +270,7 @@ class ValidValues: "version": 1, "generatedDate": "", "filename": "TempAck/RSV_Vaccinations_v5_X26_20210730T12000000_BusAck_20211120T12000000", - "provider": "X26", + "provider": "", 
"messageHeaderId": "test_file_id", "summary": {"ingestionTime": {"start": 3456}}, "failures": [], @@ -281,7 +281,7 @@ class ValidValues: "version": 1, "generatedDate": "7890", "filename": "TempAck/RSV_Vaccinations_v5_X26_20210730T12000000_BusAck_20211120T12000000", - "provider": "X26", + "provider": "RAVS", "messageHeaderId": "test_file_id", "summary": {"totalRecords": 10, "success": 9, "failed": 1, "ingestionTime": {"start": 3456, "end": 7890}}, "failures": [ From 10a7e91d67bdf2d376b9cfbaffb529706a7b8526 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Tue, 3 Feb 2026 11:57:37 +0000 Subject: [PATCH 19/23] restored move_file --- lambdas/ack_backend/src/update_ack_file.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index 23df83816..3fef0faa6 100644 --- a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -81,6 +81,12 @@ def complete_batch_file_process( the audit table status""" start_time = time.time() + # finish CSV file + ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}" + + move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}") + move_file(SOURCE_BUCKET_NAME, f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}") + total_ack_rows_processed, total_failures = get_record_count_and_failures_by_message_id(message_id) update_audit_table_item( file_key=file_key, message_id=message_id, attrs_to_update={AuditTableKeys.STATUS: FileStatus.PROCESSED} @@ -100,12 +106,6 @@ def complete_batch_file_process( }, ) - # finish CSV file - ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}" - - move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}") - move_file(SOURCE_BUCKET_NAME, f"{BATCH_FILE_PROCESSING_DIR}/{file_key}", f"{BATCH_FILE_ARCHIVE_DIR}/{file_key}") - # finish JSON file json_ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}" temp_ack_file_key = f"{TEMP_ACK_DIR}/{json_ack_filename}" From 6374daf7e539cc57c71536d66a2b11cdde5a232c Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Wed, 4 Feb 2026 09:05:59 +0000 Subject: [PATCH 20/23] json ack file unit tests for lambda_handler (part 1) --- .../ack_backend/tests/test_ack_processor.py | 37 +++++++++++++++++++ .../utils/utils_for_ack_backend_tests.py | 17 ++++++++- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/lambdas/ack_backend/tests/test_ack_processor.py b/lambdas/ack_backend/tests/test_ack_processor.py index cc3ca7a0d..720c8da74 100644 --- a/lambdas/ack_backend/tests/test_ack_processor.py +++ b/lambdas/ack_backend/tests/test_ack_processor.py @@ -23,7 +23,9 @@ from utils.utils_for_ack_backend_tests import ( add_audit_entry_to_table, generate_sample_existing_ack_content, + generate_sample_existing_json_ack_content, validate_ack_file_content, + validate_json_ack_file_content, ) from utils.values_for_ack_backend_tests import ( EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS, @@ -94,15 +96,20 @@ def assert_ack_and_source_file_locations_correct( source_file_key: str, tmp_ack_file_key: str, complete_ack_file_key: str, + tmp_json_ack_file_key: str, + complete_json_ack_file_key: str, is_complete: bool, ) -> None: """Helper function to check the ack and source files have not been moved as the processing is not yet complete""" if is_complete: ack_file 
= self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=complete_ack_file_key) + json_ack_file = self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=complete_json_ack_file_key) else: ack_file = self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=tmp_ack_file_key) + json_ack_file = self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=tmp_json_ack_file_key) self.assertIsNotNone(ack_file["Body"].read()) + self.assertIsNotNone(json_ack_file["Body"].read()) full_src_file_key = f"archive/{source_file_key}" if is_complete else f"processing/{source_file_key}" src_file = self.s3_client.get_object(Bucket=BucketNames.SOURCE, Key=full_src_file_key) @@ -131,6 +138,7 @@ def assert_audit_entry_counts_equal(self, message_id: str, expected_counts: dict self.assertDictEqual(actual_counts, expected_counts) + # TODO: modify these for json_ack_file_content def test_lambda_handler_main_multiple_records(self): """Test lambda handler with multiple records.""" # Set up an audit entry which does not yet have record_count recorded @@ -264,6 +272,9 @@ def test_lambda_handler_updates_ack_file_but_does_not_mark_complete_when_records # Original source file had 100 records add_audit_entry_to_table(self.dynamodb_client, mock_batch_message_id, record_count=100) + existing_json_file_content = deepcopy(ValidValues.json_ack_initial_content) + existing_json_file_content["messageHeaderId"] = mock_batch_message_id + array_of_failure_messages = [ { **BASE_FAILURE_MESSAGE, @@ -287,10 +298,17 @@ def test_lambda_handler_updates_ack_file_but_does_not_mark_complete_when_records [*array_of_failure_messages], existing_file_content=ValidValues.ack_headers, ) + validate_json_ack_file_content( + self.s3_client, + [*array_of_failure_messages], + existing_file_content=existing_json_file_content, + ) self.assert_ack_and_source_file_locations_correct( MOCK_MESSAGE_DETAILS.file_key, MOCK_MESSAGE_DETAILS.temp_ack_file_key, MOCK_MESSAGE_DETAILS.archive_ack_file_key, + MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, + MOCK_MESSAGE_DETAILS.archive_json_ack_file_key, is_complete=False, ) self.assert_audit_entry_status_equals(mock_batch_message_id, "Preprocessed") @@ -317,6 +335,12 @@ def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_pro Key=MOCK_MESSAGE_DETAILS.temp_ack_file_key, Body=StringIO(existing_ack_content).getvalue(), ) + existing_json_ack_content = generate_sample_existing_json_ack_content(mock_batch_message_id) + self.s3_client.put_object( + Bucket=BucketNames.DESTINATION, + Key=MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, + Body=json.dumps(existing_json_ack_content), + ) array_of_failure_messages = [ { @@ -331,11 +355,16 @@ def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_pro all_messages_plus_eof = deepcopy(array_of_failure_messages) all_messages_plus_eof.append(MOCK_MESSAGE_DETAILS.eof_message) test_event = {"Records": [{"body": json.dumps(all_messages_plus_eof)}]} + expected_entry_counts = { "record_count": "100", "records_succeeded": "49", "records_failed": "51", } + # Include summary counts in expected JSON content + existing_json_ack_content["summary"]["totalRecords"] = int(expected_entry_counts["record_count"]) + existing_json_ack_content["summary"]["success"] = int(expected_entry_counts["records_succeeded"]) + existing_json_ack_content["summary"]["failed"] = int(expected_entry_counts["records_failed"]) response = lambda_handler(event=test_event, context={}) @@ -343,10 +372,18 @@ def 
test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_pro validate_ack_file_content( self.s3_client, [*array_of_failure_messages], existing_file_content=existing_ack_content, is_complete=True ) + validate_json_ack_file_content( + self.s3_client, + [*array_of_failure_messages], + existing_file_content=existing_json_ack_content, + is_complete=True, + ) self.assert_ack_and_source_file_locations_correct( MOCK_MESSAGE_DETAILS.file_key, MOCK_MESSAGE_DETAILS.temp_ack_file_key, MOCK_MESSAGE_DETAILS.archive_ack_file_key, + MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, + MOCK_MESSAGE_DETAILS.archive_json_ack_file_key, is_complete=True, ) self.assert_audit_entry_status_equals(mock_batch_message_id, "Processed") diff --git a/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py b/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py index 1f0ffcfda..7f5ae6451 100644 --- a/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py +++ b/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py @@ -1,6 +1,7 @@ """Utils functions for the ack backend tests""" import json +from copy import deepcopy from typing import Optional from boto3 import client as boto3_client @@ -170,13 +171,15 @@ def generate_expected_json_ack_file_element( } -def generate_sample_existing_json_ack_content() -> dict: +def generate_sample_existing_json_ack_content(message_id: str = "test_file_id") -> dict: """Returns sample ack file content with a single failure row.""" - sample_content = ValidValues.json_ack_initial_content + sample_content = deepcopy(ValidValues.json_ack_initial_content) + sample_content["messageHeaderId"] = message_id sample_content["failures"].append(generate_expected_json_ack_file_element(success=False)) return sample_content +# TODO: take supplier and summary counts as arguments def generate_expected_json_ack_content( incoming_messages: list[dict], existing_content: str = ValidValues.json_ack_initial_content ) -> dict: @@ -224,4 +227,14 @@ def validate_json_ack_file_content( else obtain_completed_json_ack_file_content(s3_client, MOCK_MESSAGE_DETAILS.archive_json_ack_file_key) ) expected_ack_file_content = generate_expected_json_ack_content(incoming_messages, existing_file_content) + + # in order for this to work, we need to disregard generated_date and ingestion time. + # TODO: we should fill 'provider' in generate_expected_json_ack_content() if we're complete. 
+ actual_ack_file_content["generatedDate"] = expected_ack_file_content["generatedDate"] + actual_ack_file_content["summary"]["ingestionTime"] = expected_ack_file_content["summary"]["ingestionTime"] + actual_ack_file_content["provider"] = expected_ack_file_content["provider"] + + # print(expected_ack_file_content) + # print(actual_ack_file_content) + assert expected_ack_file_content == actual_ack_file_content From c5fddb11642d0d4808274cf6937959e5829723f7 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Wed, 4 Feb 2026 09:08:19 +0000 Subject: [PATCH 21/23] archive source file test - to support e2e testing --- .../tests/test_update_ack_file_flow.py | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/lambdas/ack_backend/tests/test_update_ack_file_flow.py b/lambdas/ack_backend/tests/test_update_ack_file_flow.py index aa020d763..f1f0fa9be 100644 --- a/lambdas/ack_backend/tests/test_update_ack_file_flow.py +++ b/lambdas/ack_backend/tests/test_update_ack_file_flow.py @@ -33,6 +33,9 @@ def setUp(self): self.logger_patcher = patch("update_ack_file.logger") self.mock_logger = self.logger_patcher.start() + self.firehose_patcher = patch("common.clients.global_firehose_client") + self.mock_firehose = self.firehose_patcher.start() + self.update_audit_table_item_patcher = patch("update_ack_file.update_audit_table_item") self.mock_update_audit_table_item = self.update_audit_table_item_patcher.start() self.get_record_and_failure_count_patcher = patch("update_ack_file.get_record_count_and_failures_by_message_id") @@ -42,6 +45,7 @@ def setUp(self): def tearDown(self): self.logger_patcher.stop() + self.firehose_patcher.stop() self.update_audit_table_item_patcher.stop() self.get_record_and_failure_count_patcher.stop() self.get_ingestion_start_time_patcher.stop() @@ -75,3 +79,43 @@ def test_audit_table_updated_correctly_when_ack_process_complete(self): # Assert: Only check audit table interactions self.mock_get_record_and_failure_count.assert_called_once_with(message_id) self.assertEqual(self.mock_update_audit_table_item.call_count, 2) + + def test_source_file_moved_when_ack_process_complete(self): + """VED-167 - Test that the source file has been moved correctly""" + # Setup + message_id = "msg-audit-table" + mock_created_at_string = "created_at_formatted_string" + file_key = "audit_table_test.csv" + self.s3_client.put_object( + Bucket=self.source_bucket_name, + Key=f"processing/{file_key}", + Body="dummy content", + ) + self.s3_client.put_object( + Bucket=self.ack_bucket_name, Key=f"TempAck/audit_table_test_BusAck_{mock_created_at_string}.csv" + ) + self.mock_get_record_and_failure_count.return_value = 10, 2 + self.mock_get_ingestion_start_time.return_value = 1769781283 + + # Assert that the source file is not yet in the archive folder + with self.assertRaises(self.s3_client.exceptions.NoSuchKey): + archived_obj = self.s3_client.get_object( + Bucket=self.source_bucket_name, + Key=f"archive/{file_key}", + ) + + # Act + update_ack_file.complete_batch_file_process( + message_id=message_id, + supplier="queue-audit-table-supplier", + vaccine_type="vaccine-type", + created_at_formatted_string=mock_created_at_string, + file_key=file_key, + ) + + # Assert that the source file has been moved into the archive folder + archived_obj = self.s3_client.get_object( + Bucket=self.source_bucket_name, + Key=f"archive/{file_key}", + ) + self.assertIsNotNone(archived_obj) From 59b1b38ace2bb3fa8e880ea9448118f92bfd080a Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Wed, 4 Feb 2026 09:09:38 +0000 Subject: 
[PATCH 22/23] remove redundant functions --- .../ack_backend/tests/test_update_ack_file.py | 30 ------------------- 1 file changed, 30 deletions(-) diff --git a/lambdas/ack_backend/tests/test_update_ack_file.py b/lambdas/ack_backend/tests/test_update_ack_file.py index fdc9f6873..8b306a9a0 100644 --- a/lambdas/ack_backend/tests/test_update_ack_file.py +++ b/lambdas/ack_backend/tests/test_update_ack_file.py @@ -21,9 +21,7 @@ ) from utils.utils_for_ack_backend_tests import ( MOCK_MESSAGE_DETAILS, - generate_expected_ack_content, generate_expected_ack_file_row, - generate_expected_json_ack_content, generate_expected_json_ack_file_element, generate_sample_existing_ack_content, generate_sample_existing_json_ack_content, @@ -93,19 +91,6 @@ def setUp(self) -> None: def tearDown(self) -> None: GenericTearDown(s3_client=self.s3_client) - def validate_ack_file_content( - self, - incoming_messages: list[dict], - existing_file_content: str = ValidValues.ack_headers, - ) -> None: - """ - Obtains the ack file content and ensures that it matches the expected content (expected content is based - on the incoming messages). - """ - actual_ack_file_content = obtain_current_ack_file_content(self.s3_client) - expected_ack_file_content = generate_expected_ack_content(incoming_messages, existing_file_content) - self.assertEqual(expected_ack_file_content, actual_ack_file_content) - def test_update_ack_file(self): """Test that update_ack_file correctly creates the ack file when there was no existing ack file""" @@ -275,21 +260,6 @@ def test_obtain_current_ack_content_file_exists(self): result = obtain_current_ack_content(MOCK_MESSAGE_DETAILS.temp_ack_file_key) self.assertEqual(result.getvalue(), existing_content) - def validate_json_ack_file_content( - self, - incoming_messages: list[dict], - existing_file_content: str = ValidValues.json_ack_initial_content, - ) -> None: - """ - Obtains the json ack file content and ensures that it matches the expected content (expected content is based - on the incoming messages). 
- """ - actual_ack_file_content = obtain_current_json_ack_file_content( - self.s3_client, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key - ) - expected_ack_file_content = generate_expected_json_ack_content(incoming_messages, existing_file_content) - self.assertEqual(expected_ack_file_content, actual_ack_file_content) - def test_update_json_ack_file(self): """Test that update_json_ack_file correctly creates the ack file when there was no existing ack file""" From 22521d29a0247aa7176f296f5376483db3e8ba36 Mon Sep 17 00:00:00 2001 From: James Wharmby Date: Wed, 4 Feb 2026 16:26:12 +0000 Subject: [PATCH 23/23] bugfix: get message_id from correct place for JSON ack file --- lambdas/ack_backend/src/ack_processor.py | 2 +- lambdas/ack_backend/src/update_ack_file.py | 2 +- .../ack_backend/tests/test_update_ack_file.py | 19 +++++++++++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/lambdas/ack_backend/src/ack_processor.py b/lambdas/ack_backend/src/ack_processor.py index b03f94f6e..dfc5d9df1 100644 --- a/lambdas/ack_backend/src/ack_processor.py +++ b/lambdas/ack_backend/src/ack_processor.py @@ -58,7 +58,7 @@ def lambda_handler(event, _): update_ack_file(file_key, created_at_formatted_string, ack_data_rows) - update_json_ack_file(file_key, created_at_formatted_string, ack_data_rows) + update_json_ack_file(message_id, file_key, created_at_formatted_string, ack_data_rows) if file_processing_complete: complete_batch_file_process(message_id, supplier, vaccine_type, created_at_formatted_string, file_key) diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index 3fef0faa6..1f59ad9d0 100644 --- a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -236,6 +236,7 @@ def update_ack_file( def update_json_ack_file( + message_id: str, file_key: str, created_at_formatted_string: str, ack_data_rows: list, @@ -243,7 +244,6 @@ def update_json_ack_file( """Updates the ack file with the new data row based on the given arguments""" ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}" temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}" - message_id = ack_data_rows[0]["MESSAGE_HEADER_ID"].split("^")[0] ack_data_dict = obtain_current_json_ack_content(message_id, temp_ack_file_key) for row in ack_data_rows: diff --git a/lambdas/ack_backend/tests/test_update_ack_file.py b/lambdas/ack_backend/tests/test_update_ack_file.py index 8b306a9a0..9213bb3aa 100644 --- a/lambdas/ack_backend/tests/test_update_ack_file.py +++ b/lambdas/ack_backend/tests/test_update_ack_file.py @@ -317,6 +317,7 @@ def test_update_json_ack_file(self): for test_case in test_cases: with self.subTest(test_case["description"]): update_json_ack_file( + message_id=MOCK_MESSAGE_DETAILS.message_id, file_key=MOCK_MESSAGE_DETAILS.file_key, created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, ack_data_rows=test_case["input_rows"], @@ -346,6 +347,7 @@ def test_update_json_ack_file_existing(self): ValidValues.ack_data_failure_dict, ] update_json_ack_file( + message_id=MOCK_MESSAGE_DETAILS.message_id, file_key=MOCK_MESSAGE_DETAILS.file_key, created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, ack_data_rows=ack_data_rows, @@ -394,6 +396,7 @@ def test_complete_batch_file_process_json_ack_file(self): ack_data_rows=[ValidValues.ack_data_failure_dict], ) update_json_ack_file( + message_id=MOCK_MESSAGE_DETAILS.message_id, file_key=MOCK_MESSAGE_DETAILS.file_key, 
created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string,
             ack_data_rows=[ValidValues.ack_data_failure_dict],
@@ -413,6 +416,22 @@
         )
         self.assertEqual(result, ValidValues.json_ack_complete_content)
 
+    def test_update_json_ack_file_with_empty_ack_data_rows(self):
+        """Test that update_json_ack_file does not raise when given an empty ack_data_rows list"""
+        # Mock existing content in the ack file
+        existing_content = generate_sample_existing_json_ack_content()
+        setup_existing_ack_file(
+            MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, json.dumps(existing_content), self.s3_client
+        )
+
+        # Should not raise an exception
+        update_json_ack_file(
+            message_id=MOCK_MESSAGE_DETAILS.message_id,
+            file_key=MOCK_MESSAGE_DETAILS.file_key,
+            created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string,
+            ack_data_rows=[],
+        )
+
 
 if __name__ == "__main__":
     unittest.main()
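[Note for reviewers on the final bugfix: before patch 23, update_json_ack_file recovered the message id by splitting the MESSAGE_HEADER_ID of the first ack row, which both duplicated information the caller already had and raised IndexError whenever a message produced no ack rows. Passing message_id in from the caller removes that edge case, which the new empty-list test pins down. A minimal, self-contained reproduction of the old failure mode follows; the sample header value is made up, while the '^' split matches the removed line.]

# Minimal reproduction of the failure mode fixed in patch 23: deriving the
# message id from ack_data_rows breaks when the row list is empty.

def message_id_from_rows(ack_data_rows: list) -> str:
    # Pre-fix approach, removed from update_json_ack_file in patch 23.
    return ack_data_rows[0]["MESSAGE_HEADER_ID"].split("^")[0]


print(message_id_from_rows([{"MESSAGE_HEADER_ID": "file-123^1"}]))  # prints: file-123

try:
    message_id_from_rows([])  # an empty batch reaches this path
except IndexError:
    print("IndexError: hence message_id is now passed in explicitly by the caller")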