diff --git a/lambdas/ack_backend/src/ack_processor.py b/lambdas/ack_backend/src/ack_processor.py index 6fdaae20b..dfc5d9df1 100644 --- a/lambdas/ack_backend/src/ack_processor.py +++ b/lambdas/ack_backend/src/ack_processor.py @@ -6,7 +6,11 @@ from common.batch.eof_utils import is_eof_message from convert_message_to_ack_row import convert_message_to_ack_row from logging_decorators import ack_lambda_handler_logging_decorator -from update_ack_file import complete_batch_file_process, update_ack_file +from update_ack_file import ( + complete_batch_file_process, + update_ack_file, + update_json_ack_file, +) @ack_lambda_handler_logging_decorator @@ -54,6 +58,8 @@ def lambda_handler(event, _): update_ack_file(file_key, created_at_formatted_string, ack_data_rows) + update_json_ack_file(message_id, file_key, created_at_formatted_string, ack_data_rows) + if file_processing_complete: complete_batch_file_process(message_id, supplier, vaccine_type, created_at_formatted_string, file_key) diff --git a/lambdas/ack_backend/src/update_ack_file.py b/lambdas/ack_backend/src/update_ack_file.py index a4969c765..1f59ad9d0 100644 --- a/lambdas/ack_backend/src/update_ack_file.py +++ b/lambdas/ack_backend/src/update_ack_file.py @@ -1,5 +1,6 @@ """Functions for uploading the data to the ack file""" +import json import os import time from datetime import datetime @@ -8,10 +9,19 @@ from botocore.exceptions import ClientError from common.aws_s3_utils import move_file -from common.batch.audit_table import get_record_count_and_failures_by_message_id, update_audit_table_item +from common.batch.audit_table import ( + get_ingestion_start_time_by_message_id, + get_record_count_and_failures_by_message_id, + update_audit_table_item, +) from common.clients import get_s3_client, logger from common.log_decorator import generate_and_send_logs -from common.models.batch_constants import ACK_BUCKET_NAME, SOURCE_BUCKET_NAME, AuditTableKeys, FileStatus +from common.models.batch_constants import ( + ACK_BUCKET_NAME, + SOURCE_BUCKET_NAME, + AuditTableKeys, + FileStatus, +) from constants import ( ACK_HEADERS, BATCH_FILE_ARCHIVE_DIR, @@ -71,6 +81,7 @@ def complete_batch_file_process( the audit table status""" start_time = time.time() + # finish CSV file ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}" move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{ack_filename}", f"{COMPLETED_ACK_DIR}/{ack_filename}") @@ -82,7 +93,9 @@ def complete_batch_file_process( ) # Consider creating time utils and using datetime instead of time - ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time.gmtime()) + time_now = time.gmtime(time.time()) + ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time_now) + ingestion_end_time_seconds = int(time.strftime("%s", time_now)) successful_record_count = total_ack_rows_processed - total_failures update_audit_table_item( file_key=file_key, @@ -93,6 +106,24 @@ def complete_batch_file_process( }, ) + # finish JSON file + json_ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}" + temp_ack_file_key = f"{TEMP_ACK_DIR}/{json_ack_filename}" + ack_data_dict = obtain_current_json_ack_content(message_id, temp_ack_file_key) + + generated_date = time.strftime("%Y-%m-%dT%H:%M:%S.000Z") + ack_data_dict["generatedDate"] = generated_date + ack_data_dict["provider"] = supplier + ack_data_dict["summary"]["totalRecords"] = total_ack_rows_processed + ack_data_dict["summary"]["success"] = successful_record_count + ack_data_dict["summary"]["failed"] = 
total_failures + ack_data_dict["summary"]["ingestionTime"]["end"] = ingestion_end_time_seconds + + # Upload ack_data_dict to S3 + json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8")) + get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, temp_ack_file_key) + move_file(ACK_BUCKET_NAME, f"{TEMP_ACK_DIR}/{json_ack_filename}", f"{COMPLETED_ACK_DIR}/{json_ack_filename}") + result = { "message_id": message_id, "file_key": file_key, @@ -147,6 +178,42 @@ def obtain_current_ack_content(temp_ack_file_key: str) -> StringIO: return accumulated_csv_content +def obtain_current_json_ack_content(message_id: str, temp_ack_file_key: str) -> dict: + """Returns the current ack file content if the file exists, or else initialises the content with the ack headers.""" + try: + # If ack file exists in S3 download the contents + existing_ack_file = get_s3_client().get_object(Bucket=ACK_BUCKET_NAME, Key=temp_ack_file_key) + existing_content = existing_ack_file["Body"].read().decode("utf-8") + ack_data_dict = json.loads(existing_content) + except ClientError as error: + # If ack file does not exist in S3 create a new file containing the headers only + if error.response["Error"]["Code"] in ("404", "NoSuchKey"): + logger.info("No existing JSON ack file found in S3 - creating new file") + + ingestion_start_time = get_ingestion_start_time_by_message_id(message_id) + raw_ack_filename = temp_ack_file_key.split(".")[0] + + # Generate the initial fields + ack_data_dict = {} + ack_data_dict["system"] = "Immunisation FHIR API Batch Report" + ack_data_dict["version"] = 1 # TO FIX + + ack_data_dict["generatedDate"] = "" # will be filled on completion + ack_data_dict["provider"] = "" # will be filled on completion + ack_data_dict["filename"] = raw_ack_filename + ack_data_dict["messageHeaderId"] = message_id + + ack_data_dict["summary"] = {} + ack_data_dict["summary"]["ingestionTime"] = {} + ack_data_dict["summary"]["ingestionTime"]["start"] = ingestion_start_time + ack_data_dict["failures"] = [] + else: + logger.error("error whilst obtaining current JSON ack content: %s", error) + raise + + return ack_data_dict + + def update_ack_file( file_key: str, created_at_formatted_string: str, @@ -155,7 +222,6 @@ def update_ack_file( """Updates the ack file with the new data row based on the given arguments""" ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.csv')}" temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}" - completed_ack_file_key = f"{COMPLETED_ACK_DIR}/{ack_filename}" accumulated_csv_content = obtain_current_ack_content(temp_ack_file_key) for row in ack_data_rows: @@ -166,4 +232,32 @@ def update_ack_file( csv_file_like_object = BytesIO(accumulated_csv_content.getvalue().encode("utf-8")) get_s3_client().upload_fileobj(csv_file_like_object, ACK_BUCKET_NAME, temp_ack_file_key) - logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, completed_ack_file_key) + logger.info("Ack file updated to %s: %s", ACK_BUCKET_NAME, temp_ack_file_key) + + +def update_json_ack_file( + message_id: str, + file_key: str, + created_at_formatted_string: str, + ack_data_rows: list, +) -> None: + """Updates the ack file with the new data row based on the given arguments""" + ack_filename = f"{file_key.replace('.csv', f'_BusAck_{created_at_formatted_string}.json')}" + temp_ack_file_key = f"{TEMP_ACK_DIR}/{ack_filename}" + ack_data_dict = obtain_current_json_ack_content(message_id, temp_ack_file_key) + + for row in ack_data_rows: + json_data_row = {} + json_data_row["rowId"] = 
row["MESSAGE_HEADER_ID"].split("^")[-1] + json_data_row["responseCode"] = row["RESPONSE_CODE"] + json_data_row["responseDisplay"] = row["RESPONSE_DISPLAY"] + json_data_row["severity"] = row["ISSUE_SEVERITY"] + json_data_row["localId"] = row["LOCAL_ID"] + json_data_row["operationOutcome"] = row["OPERATION_OUTCOME"] + + ack_data_dict["failures"].append(json_data_row) + + # Upload ack_data_dict to S3 + json_bytes = BytesIO(json.dumps(ack_data_dict, indent=2).encode("utf-8")) + get_s3_client().upload_fileobj(json_bytes, ACK_BUCKET_NAME, temp_ack_file_key) + logger.info("JSON ack file updated to %s: %s", ACK_BUCKET_NAME, temp_ack_file_key) diff --git a/lambdas/ack_backend/tests/test_ack_processor.py b/lambdas/ack_backend/tests/test_ack_processor.py index cc3ca7a0d..720c8da74 100644 --- a/lambdas/ack_backend/tests/test_ack_processor.py +++ b/lambdas/ack_backend/tests/test_ack_processor.py @@ -23,7 +23,9 @@ from utils.utils_for_ack_backend_tests import ( add_audit_entry_to_table, generate_sample_existing_ack_content, + generate_sample_existing_json_ack_content, validate_ack_file_content, + validate_json_ack_file_content, ) from utils.values_for_ack_backend_tests import ( EXPECTED_ACK_LAMBDA_RESPONSE_FOR_SUCCESS, @@ -94,15 +96,20 @@ def assert_ack_and_source_file_locations_correct( source_file_key: str, tmp_ack_file_key: str, complete_ack_file_key: str, + tmp_json_ack_file_key: str, + complete_json_ack_file_key: str, is_complete: bool, ) -> None: """Helper function to check the ack and source files have not been moved as the processing is not yet complete""" if is_complete: ack_file = self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=complete_ack_file_key) + json_ack_file = self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=complete_json_ack_file_key) else: ack_file = self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=tmp_ack_file_key) + json_ack_file = self.s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=tmp_json_ack_file_key) self.assertIsNotNone(ack_file["Body"].read()) + self.assertIsNotNone(json_ack_file["Body"].read()) full_src_file_key = f"archive/{source_file_key}" if is_complete else f"processing/{source_file_key}" src_file = self.s3_client.get_object(Bucket=BucketNames.SOURCE, Key=full_src_file_key) @@ -131,6 +138,7 @@ def assert_audit_entry_counts_equal(self, message_id: str, expected_counts: dict self.assertDictEqual(actual_counts, expected_counts) + # TODO: modify these for json_ack_file_content def test_lambda_handler_main_multiple_records(self): """Test lambda handler with multiple records.""" # Set up an audit entry which does not yet have record_count recorded @@ -264,6 +272,9 @@ def test_lambda_handler_updates_ack_file_but_does_not_mark_complete_when_records # Original source file had 100 records add_audit_entry_to_table(self.dynamodb_client, mock_batch_message_id, record_count=100) + existing_json_file_content = deepcopy(ValidValues.json_ack_initial_content) + existing_json_file_content["messageHeaderId"] = mock_batch_message_id + array_of_failure_messages = [ { **BASE_FAILURE_MESSAGE, @@ -287,10 +298,17 @@ def test_lambda_handler_updates_ack_file_but_does_not_mark_complete_when_records [*array_of_failure_messages], existing_file_content=ValidValues.ack_headers, ) + validate_json_ack_file_content( + self.s3_client, + [*array_of_failure_messages], + existing_file_content=existing_json_file_content, + ) self.assert_ack_and_source_file_locations_correct( MOCK_MESSAGE_DETAILS.file_key, MOCK_MESSAGE_DETAILS.temp_ack_file_key, 
MOCK_MESSAGE_DETAILS.archive_ack_file_key, + MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, + MOCK_MESSAGE_DETAILS.archive_json_ack_file_key, is_complete=False, ) self.assert_audit_entry_status_equals(mock_batch_message_id, "Preprocessed") @@ -317,6 +335,12 @@ def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_pro Key=MOCK_MESSAGE_DETAILS.temp_ack_file_key, Body=StringIO(existing_ack_content).getvalue(), ) + existing_json_ack_content = generate_sample_existing_json_ack_content(mock_batch_message_id) + self.s3_client.put_object( + Bucket=BucketNames.DESTINATION, + Key=MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, + Body=json.dumps(existing_json_ack_content), + ) array_of_failure_messages = [ { @@ -331,11 +355,16 @@ def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_pro all_messages_plus_eof = deepcopy(array_of_failure_messages) all_messages_plus_eof.append(MOCK_MESSAGE_DETAILS.eof_message) test_event = {"Records": [{"body": json.dumps(all_messages_plus_eof)}]} + expected_entry_counts = { "record_count": "100", "records_succeeded": "49", "records_failed": "51", } + # Include summary counts in expected JSON content + existing_json_ack_content["summary"]["totalRecords"] = int(expected_entry_counts["record_count"]) + existing_json_ack_content["summary"]["success"] = int(expected_entry_counts["records_succeeded"]) + existing_json_ack_content["summary"]["failed"] = int(expected_entry_counts["records_failed"]) response = lambda_handler(event=test_event, context={}) @@ -343,10 +372,18 @@ def test_lambda_handler_updates_ack_file_and_marks_complete_when_all_records_pro validate_ack_file_content( self.s3_client, [*array_of_failure_messages], existing_file_content=existing_ack_content, is_complete=True ) + validate_json_ack_file_content( + self.s3_client, + [*array_of_failure_messages], + existing_file_content=existing_json_ack_content, + is_complete=True, + ) self.assert_ack_and_source_file_locations_correct( MOCK_MESSAGE_DETAILS.file_key, MOCK_MESSAGE_DETAILS.temp_ack_file_key, MOCK_MESSAGE_DETAILS.archive_ack_file_key, + MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, + MOCK_MESSAGE_DETAILS.archive_json_ack_file_key, is_complete=True, ) self.assert_audit_entry_status_equals(mock_batch_message_id, "Processed") diff --git a/lambdas/ack_backend/tests/test_splunk_logging.py b/lambdas/ack_backend/tests/test_splunk_logging.py index a8e0e69c0..5c0e9fb34 100644 --- a/lambdas/ack_backend/tests/test_splunk_logging.py +++ b/lambdas/ack_backend/tests/test_splunk_logging.py @@ -48,6 +48,12 @@ def setUp(self): self.ack_bucket_patcher = patch("update_ack_file.ACK_BUCKET_NAME", BucketNames.DESTINATION) self.ack_bucket_patcher.start() + self.get_ingestion_start_time_by_message_id_patcher = patch( + "update_ack_file.get_ingestion_start_time_by_message_id" + ) + self.mock_get_ingestion_start_time_by_message_id = self.get_ingestion_start_time_by_message_id_patcher.start() + self.mock_get_ingestion_start_time_by_message_id.return_value = 3456 + def tearDown(self): GenericTearDown(self.s3_client) @@ -100,9 +106,9 @@ def expected_lambda_handler_logs(self, success: bool, number_of_rows, ingestion_ """Returns the expected logs for the lambda handler function.""" # Mocking of timings is such that the time taken is 2 seconds for each row, # plus 2 seconds for the handler if it succeeds (i.e. 
it calls update_ack_file) or 1 second if it doesn't; - # plus an extra second if ingestion is complete + # plus an extra 2 seconds if ingestion is complete if success: - time_taken = f"{number_of_rows * 2 + 3}.0s" if ingestion_complete else f"{number_of_rows * 2 + 1}.0s" + time_taken = f"{number_of_rows * 2 + 4}.0s" if ingestion_complete else f"{number_of_rows * 2 + 1}.0s" else: time_taken = f"{number_of_rows * 2 + 1}.0s" @@ -429,6 +435,7 @@ def test_splunk_update_ack_file_logged(self): patch("common.log_decorator.logger") as mock_logger, # noqa: E999 patch("update_ack_file.get_record_count_and_failures_by_message_id", return_value=(99, 2)), patch("update_ack_file.update_audit_table_item") as mock_update_audit_table_item, # noqa: E999 + patch("update_ack_file.move_file"), # noqa: E999 patch("ack_processor.increment_records_failed_count"), # noqa: E999 ): # noqa: E999 mock_datetime.now.return_value = ValidValues.fixed_datetime @@ -443,7 +450,7 @@ def test_splunk_update_ack_file_logged(self): expected_secondlast_logger_info_data = { **ValidValues.upload_ack_file_expected_log, "message_id": "test", - "time_taken": "1.0s", + "time_taken": "2.0s", } expected_last_logger_info_data = self.expected_lambda_handler_logs( success=True, number_of_rows=99, ingestion_complete=True diff --git a/lambdas/ack_backend/tests/test_update_ack_file.py b/lambdas/ack_backend/tests/test_update_ack_file.py index 6b9cd163f..9213bb3aa 100644 --- a/lambdas/ack_backend/tests/test_update_ack_file.py +++ b/lambdas/ack_backend/tests/test_update_ack_file.py @@ -1,5 +1,7 @@ """Tests for the functions in the update_ack_file module.""" +import copy +import json import os import unittest from io import StringIO @@ -19,19 +21,24 @@ ) from utils.utils_for_ack_backend_tests import ( MOCK_MESSAGE_DETAILS, - generate_expected_ack_content, generate_expected_ack_file_row, + generate_expected_json_ack_file_element, generate_sample_existing_ack_content, + generate_sample_existing_json_ack_content, obtain_current_ack_file_content, + obtain_current_json_ack_file_content, setup_existing_ack_file, ) from utils.values_for_ack_backend_tests import DefaultValues, ValidValues with patch.dict("os.environ", MOCK_ENVIRONMENT_DICT): from update_ack_file import ( + complete_batch_file_process, create_ack_data, obtain_current_ack_content, + obtain_current_json_ack_content, update_ack_file, + update_json_ack_file, ) firehose_client = boto3_client("firehose", region_name=REGION_NAME) @@ -44,7 +51,7 @@ class TestUpdateAckFile(unittest.TestCase): def setUp(self) -> None: self.s3_client = boto3_client("s3", region_name=REGION_NAME) - GenericSetUp(self.s3_client) + GenericSetUp(s3_client=self.s3_client) # MOCK SOURCE FILE WITH 100 ROWS TO SIMULATE THE SCENARIO WHERE THE ACK FILE IS NOT FULL. # TODO: Test all other scenarios. 
@@ -59,22 +66,30 @@ def setUp(self) -> None: self.ack_bucket_patcher = patch("update_ack_file.ACK_BUCKET_NAME", BucketNames.DESTINATION) self.ack_bucket_patcher.start() + self.source_bucket_patcher = patch("update_ack_file.SOURCE_BUCKET_NAME", BucketNames.SOURCE) + self.source_bucket_patcher.start() + + self.get_ingestion_start_time_by_message_id_patcher = patch( + "update_ack_file.get_ingestion_start_time_by_message_id" + ) + self.mock_get_ingestion_start_time_by_message_id = self.get_ingestion_start_time_by_message_id_patcher.start() + self.mock_get_ingestion_start_time_by_message_id.return_value = 3456 + + self.get_record_and_failure_count_patcher = patch("update_ack_file.get_record_count_and_failures_by_message_id") + self.mock_get_record_and_failure_count = self.get_record_and_failure_count_patcher.start() + + self.update_audit_table_item_patcher = patch("update_ack_file.update_audit_table_item") + self.mock_update_audit_table_item = self.update_audit_table_item_patcher.start() + + self.datetime_patcher = patch("update_ack_file.time") + self.mock_datetime = self.datetime_patcher.start() + self.mock_datetime.strftime.return_value = "7890" + + self.generate_send_patcher = patch("update_ack_file.generate_and_send_logs") + self.mock_generate_send = self.generate_send_patcher.start() def tearDown(self) -> None: - GenericTearDown(self.s3_client) - - def validate_ack_file_content( - self, - incoming_messages: list[dict], - existing_file_content: str = ValidValues.ack_headers, - ) -> None: - """ - Obtains the ack file content and ensures that it matches the expected content (expected content is based - on the incoming messages). - """ - actual_ack_file_content = obtain_current_ack_file_content(self.s3_client) - expected_ack_file_content = generate_expected_ack_content(incoming_messages, existing_file_content) - self.assertEqual(expected_ack_file_content, actual_ack_file_content) + GenericTearDown(s3_client=self.s3_client) def test_update_ack_file(self): """Test that update_ack_file correctly creates the ack file when there was no existing ack file""" @@ -245,6 +260,178 @@ def test_obtain_current_ack_content_file_exists(self): result = obtain_current_ack_content(MOCK_MESSAGE_DETAILS.temp_ack_file_key) self.assertEqual(result.getvalue(), existing_content) + def test_update_json_ack_file(self): + """Test that update_json_ack_file correctly creates the ack file when there was no existing ack file""" + + test_cases = [ + { + "description": "Single failure row", + "input_rows": [ValidValues.ack_data_failure_dict], + "expected_elements": [ + generate_expected_json_ack_file_element( + success=False, imms_id=DefaultValues.imms_id, diagnostics="DIAGNOSTICS" + ) + ], + }, + { + "description": "With multiple rows", + "input_rows": [ + {**ValidValues.ack_data_failure_dict, "IMMS_ID": "TEST_IMMS_ID_1"}, + ValidValues.ack_data_failure_dict, + ValidValues.ack_data_failure_dict, + ], + "expected_elements": [ + generate_expected_json_ack_file_element( + success=False, + imms_id="TEST_IMMS_ID_1", + diagnostics="DIAGNOSTICS", + ), + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="DIAGNOSTICS"), + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="DIAGNOSTICS"), + ], + }, + { + "description": "Multiple rows With different diagnostics", + "input_rows": [ + { + **ValidValues.ack_data_failure_dict, + "OPERATION_OUTCOME": "Error 1", + }, + { + **ValidValues.ack_data_failure_dict, + "OPERATION_OUTCOME": "Error 2", + }, + { + 
**ValidValues.ack_data_failure_dict, + "OPERATION_OUTCOME": "Error 3", + }, + ], + "expected_elements": [ + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="Error 1"), + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="Error 2"), + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="Error 3"), + ], + }, + ] + + for test_case in test_cases: + with self.subTest(test_case["description"]): + update_json_ack_file( + message_id=MOCK_MESSAGE_DETAILS.message_id, + file_key=MOCK_MESSAGE_DETAILS.file_key, + created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ack_data_rows=test_case["input_rows"], + ) + + actual_ack_file_content = obtain_current_json_ack_file_content( + self.s3_client, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key + ) + expected_ack_file_content = copy.deepcopy(ValidValues.json_ack_initial_content) + for element in test_case["expected_elements"]: + expected_ack_file_content["failures"].append(element) + self.assertEqual(expected_ack_file_content, actual_ack_file_content) + self.s3_client.delete_object( + Bucket=BucketNames.DESTINATION, + Key=MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, + ) + + def test_update_json_ack_file_existing(self): + """Test that update_json_ack_file correctly updates the ack file when there was an existing ack file""" + # Mock existing content in the ack file + existing_content = generate_sample_existing_json_ack_content() + setup_existing_ack_file( + MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, json.dumps(existing_content), self.s3_client + ) + + ack_data_rows = [ + ValidValues.ack_data_failure_dict, + ] + update_json_ack_file( + message_id=MOCK_MESSAGE_DETAILS.message_id, + file_key=MOCK_MESSAGE_DETAILS.file_key, + created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ack_data_rows=ack_data_rows, + ) + + actual_ack_file_content = obtain_current_json_ack_file_content( + self.s3_client, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key + ) + + expected_rows = [ + generate_expected_json_ack_file_element(success=False, imms_id="", diagnostics="DIAGNOSTICS"), + ] + expected_ack_file_content = existing_content + expected_ack_file_content["failures"].append(expected_rows[0]) + self.assertEqual(expected_ack_file_content, actual_ack_file_content) + + def test_obtain_current_json_ack_content_file_no_existing(self): + """Test that when the json ack file does not yet exist, obtain_current_json_ack_content returns the ack headers only.""" + result = obtain_current_json_ack_content( + MOCK_MESSAGE_DETAILS.message_id, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key + ) + self.assertEqual(result, ValidValues.json_ack_initial_content) + + def test_obtain_current_json_ack_content_file_exists(self): + """Test that the existing json ack file content is retrieved and new elements are added.""" + existing_content = generate_sample_existing_json_ack_content() + setup_existing_ack_file( + MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, json.dumps(existing_content), self.s3_client + ) + result = obtain_current_json_ack_content( + MOCK_MESSAGE_DETAILS.message_id, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key + ) + self.assertEqual(result, existing_content) + + def test_complete_batch_file_process_json_ack_file(self): + """Test that complete_batch_file_process completes and moves the JSON ack file.""" + generate_sample_existing_json_ack_content() + self.s3_client.put_object( + Bucket=BucketNames.SOURCE, + Key=f"processing/{MOCK_MESSAGE_DETAILS.file_key}", + 
Body="dummy content", + ) + update_ack_file( + file_key=MOCK_MESSAGE_DETAILS.file_key, + created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ack_data_rows=[ValidValues.ack_data_failure_dict], + ) + update_json_ack_file( + message_id=MOCK_MESSAGE_DETAILS.message_id, + file_key=MOCK_MESSAGE_DETAILS.file_key, + created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ack_data_rows=[ValidValues.ack_data_failure_dict], + ) + + self.mock_get_record_and_failure_count.return_value = 10, 1 + + complete_batch_file_process( + message_id=MOCK_MESSAGE_DETAILS.message_id, + supplier=MOCK_MESSAGE_DETAILS.supplier, + vaccine_type=MOCK_MESSAGE_DETAILS.vaccine_type, + created_at_formatted_string="20211120T12000000", + file_key=MOCK_MESSAGE_DETAILS.file_key, + ) + result = obtain_current_json_ack_content( + MOCK_MESSAGE_DETAILS.message_id, MOCK_MESSAGE_DETAILS.archive_json_ack_file_key + ) + self.assertEqual(result, ValidValues.json_ack_complete_content) + + def test_update_json_ack_file_with_empty_ack_data_rows(self): + """Test that update_json_ack_file correctly updates the ack file when given an empty list""" + # Mock existing content in the ack file + existing_content = generate_sample_existing_json_ack_content() + setup_existing_ack_file( + MOCK_MESSAGE_DETAILS.temp_json_ack_file_key, json.dumps(existing_content), self.s3_client + ) + + # Should not raise an exception + update_json_ack_file( + message_id=MOCK_MESSAGE_DETAILS.message_id, + file_key=MOCK_MESSAGE_DETAILS.file_key, + created_at_formatted_string=MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ack_data_rows=[], + ) + if __name__ == "__main__": unittest.main() diff --git a/lambdas/ack_backend/tests/test_update_ack_file_flow.py b/lambdas/ack_backend/tests/test_update_ack_file_flow.py index a286cb950..f1f0fa9be 100644 --- a/lambdas/ack_backend/tests/test_update_ack_file_flow.py +++ b/lambdas/ack_backend/tests/test_update_ack_file_flow.py @@ -33,15 +33,22 @@ def setUp(self): self.logger_patcher = patch("update_ack_file.logger") self.mock_logger = self.logger_patcher.start() + self.firehose_patcher = patch("common.clients.global_firehose_client") + self.mock_firehose = self.firehose_patcher.start() + self.update_audit_table_item_patcher = patch("update_ack_file.update_audit_table_item") self.mock_update_audit_table_item = self.update_audit_table_item_patcher.start() self.get_record_and_failure_count_patcher = patch("update_ack_file.get_record_count_and_failures_by_message_id") self.mock_get_record_and_failure_count = self.get_record_and_failure_count_patcher.start() + self.get_ingestion_start_time_patcher = patch("update_ack_file.get_ingestion_start_time_by_message_id") + self.mock_get_ingestion_start_time = self.get_ingestion_start_time_patcher.start() def tearDown(self): self.logger_patcher.stop() + self.firehose_patcher.stop() self.update_audit_table_item_patcher.stop() self.get_record_and_failure_count_patcher.stop() + self.get_ingestion_start_time_patcher.stop() def test_audit_table_updated_correctly_when_ack_process_complete(self): """VED-167 - Test that the audit table has been updated correctly""" @@ -58,6 +65,7 @@ def test_audit_table_updated_correctly_when_ack_process_complete(self): Bucket=self.ack_bucket_name, Key=f"TempAck/audit_table_test_BusAck_{mock_created_at_string}.csv" ) self.mock_get_record_and_failure_count.return_value = 10, 2 + self.mock_get_ingestion_start_time.return_value = 1769781283 # Act update_ack_file.complete_batch_file_process( @@ -71,3 +79,43 @@ def 
test_audit_table_updated_correctly_when_ack_process_complete(self): # Assert: Only check audit table interactions self.mock_get_record_and_failure_count.assert_called_once_with(message_id) self.assertEqual(self.mock_update_audit_table_item.call_count, 2) + + def test_source_file_moved_when_ack_process_complete(self): + """VED-167 - Test that the source file has been moved correctly""" + # Setup + message_id = "msg-audit-table" + mock_created_at_string = "created_at_formatted_string" + file_key = "audit_table_test.csv" + self.s3_client.put_object( + Bucket=self.source_bucket_name, + Key=f"processing/{file_key}", + Body="dummy content", + ) + self.s3_client.put_object( + Bucket=self.ack_bucket_name, Key=f"TempAck/audit_table_test_BusAck_{mock_created_at_string}.csv" + ) + self.mock_get_record_and_failure_count.return_value = 10, 2 + self.mock_get_ingestion_start_time.return_value = 1769781283 + + # Assert that the source file is not yet in the archive folder + with self.assertRaises(self.s3_client.exceptions.NoSuchKey): + archived_obj = self.s3_client.get_object( + Bucket=self.source_bucket_name, + Key=f"archive/{file_key}", + ) + + # Act + update_ack_file.complete_batch_file_process( + message_id=message_id, + supplier="queue-audit-table-supplier", + vaccine_type="vaccine-type", + created_at_formatted_string=mock_created_at_string, + file_key=file_key, + ) + + # Assert that the source file has been moved into the archive folder + archived_obj = self.s3_client.get_object( + Bucket=self.source_bucket_name, + Key=f"archive/{file_key}", + ) + self.assertIsNotNone(archived_obj) diff --git a/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py b/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py index 9781b051a..7f5ae6451 100644 --- a/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py +++ b/lambdas/ack_backend/tests/utils/utils_for_ack_backend_tests.py @@ -1,6 +1,7 @@ """Utils functions for the ack backend tests""" import json +from copy import deepcopy from typing import Optional from boto3 import client as boto3_client @@ -130,3 +131,110 @@ def validate_ack_file_content( ) expected_ack_file_content = generate_expected_ack_content(incoming_messages, existing_file_content) assert expected_ack_file_content == actual_ack_file_content + + +def obtain_current_json_ack_file_content( + s3_client, temp_ack_file_key: str = MOCK_MESSAGE_DETAILS.temp_ack_file_key +) -> dict: + """Obtains the ack file content from the destination bucket.""" + retrieved_object = s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=temp_ack_file_key) + return json.loads(retrieved_object["Body"].read().decode("utf-8")) + + +def obtain_completed_json_ack_file_content( + s3_client, complete_ack_file_key: str = MOCK_MESSAGE_DETAILS.archive_ack_file_key +) -> dict: + """Obtains the ack file content from the forwardedFile directory""" + retrieved_object = s3_client.get_object(Bucket=BucketNames.DESTINATION, Key=complete_ack_file_key) + return json.loads(retrieved_object["Body"].read().decode("utf-8")) + + +def generate_expected_json_ack_file_element( + success: bool, + imms_id: str = MOCK_MESSAGE_DETAILS.imms_id, + diagnostics: str = None, + row_id: str = MOCK_MESSAGE_DETAILS.row_id, + local_id: str = MOCK_MESSAGE_DETAILS.local_id, + created_at_formatted_string: str = MOCK_MESSAGE_DETAILS.created_at_formatted_string, +) -> dict: + """Create an ack element, containing the given message details.""" + if success: + return None # we no longer process success elements + else: + return { + "rowId": 
row_id.split("^")[-1], + "responseCode": "30002", + "responseDisplay": "Business Level Response Value - Processing Error", + "severity": "Fatal", + "localId": local_id, + "operationOutcome": "" if not diagnostics else diagnostics, + } + + +def generate_sample_existing_json_ack_content(message_id: str = "test_file_id") -> dict: + """Returns sample ack file content with a single failure row.""" + sample_content = deepcopy(ValidValues.json_ack_initial_content) + sample_content["messageHeaderId"] = message_id + sample_content["failures"].append(generate_expected_json_ack_file_element(success=False)) + return sample_content + + +# TODO: take supplier and summary counts as arguments +def generate_expected_json_ack_content( + incoming_messages: list[dict], existing_content: str = ValidValues.json_ack_initial_content +) -> dict: + """Returns the expected_json_ack_file_content based on the incoming messages""" + for message in incoming_messages: + # Determine diagnostics based on the diagnostics value in the incoming message + diagnostics_dictionary = message.get("diagnostics", {}) + diagnostics = ( + diagnostics_dictionary.get("error_message", "") + if isinstance(diagnostics_dictionary, dict) + else "Unable to determine diagnostics issue" + ) + + # Create the ack row based on the incoming message details + ack_element = generate_expected_json_ack_file_element( + success=diagnostics == "", + row_id=message.get("row_id", MOCK_MESSAGE_DETAILS.row_id), + created_at_formatted_string=message.get( + "created_at_formatted_string", + MOCK_MESSAGE_DETAILS.created_at_formatted_string, + ), + local_id=message.get("local_id", MOCK_MESSAGE_DETAILS.local_id), + imms_id=("" if diagnostics else message.get("imms_id", MOCK_MESSAGE_DETAILS.imms_id)), + diagnostics=diagnostics, + ) + + existing_content["failures"].append(ack_element) + + return existing_content + + +def validate_json_ack_file_content( + s3_client, + incoming_messages: list[dict], + existing_file_content: str = ValidValues.json_ack_initial_content, + is_complete: bool = False, +) -> None: + """ + Obtains the ack file content and ensures that it matches the expected content (expected content is based + on the incoming messages). + """ + actual_ack_file_content = ( + obtain_current_json_ack_file_content(s3_client, MOCK_MESSAGE_DETAILS.temp_json_ack_file_key) + if not is_complete + else obtain_completed_json_ack_file_content(s3_client, MOCK_MESSAGE_DETAILS.archive_json_ack_file_key) + ) + expected_ack_file_content = generate_expected_json_ack_content(incoming_messages, existing_file_content) + + # in order for this to work, we need to disregard generated_date and ingestion time. + # TODO: we should fill 'provider' in generate_expected_json_ack_content() if we're complete. 
+ actual_ack_file_content["generatedDate"] = expected_ack_file_content["generatedDate"] + actual_ack_file_content["summary"]["ingestionTime"] = expected_ack_file_content["summary"]["ingestionTime"] + actual_ack_file_content["provider"] = expected_ack_file_content["provider"] + + # print(expected_ack_file_content) + # print(actual_ack_file_content) + + assert expected_ack_file_content == actual_ack_file_content diff --git a/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py b/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py index 706debe26..4b99d3196 100644 --- a/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py +++ b/lambdas/ack_backend/tests/utils/values_for_ack_backend_tests.py @@ -106,6 +106,12 @@ def __init__( self.archive_ack_file_key = ( f"forwardedFile/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.csv" ) + self.temp_json_ack_file_key = ( + f"TempAck/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.json" + ) + self.archive_json_ack_file_key = ( + f"forwardedFile/{vaccine_type}_Vaccinations_v5_{ods_code}_20210730T12000000_BusAck_20211120T12000000.json" + ) self.vaccine_type = vaccine_type self.ods_code = ods_code self.supplier = supplier @@ -259,6 +265,48 @@ class ValidValues: "message": "Record processing complete", } + json_ack_initial_content = { + "system": "Immunisation FHIR API Batch Report", + "version": 1, + "generatedDate": "", + "filename": "TempAck/RSV_Vaccinations_v5_X26_20210730T12000000_BusAck_20211120T12000000", + "provider": "", + "messageHeaderId": "test_file_id", + "summary": {"ingestionTime": {"start": 3456}}, + "failures": [], + } + + json_ack_complete_content = { + "system": "Immunisation FHIR API Batch Report", + "version": 1, + "generatedDate": "7890", + "filename": "TempAck/RSV_Vaccinations_v5_X26_20210730T12000000_BusAck_20211120T12000000", + "provider": "RAVS", + "messageHeaderId": "test_file_id", + "summary": {"totalRecords": 10, "success": 9, "failed": 1, "ingestionTime": {"start": 3456, "end": 7890}}, + "failures": [ + { + "rowId": "1", + "responseCode": "30002", + "responseDisplay": "Business Level Response Value - Processing Error", + "severity": "Fatal", + "localId": "test_system_uri^testabc", + "operationOutcome": "DIAGNOSTICS", + } + ], + } + + json_ack_data_failure_dict = ( + { + "rowId": DefaultValues.row_id, + "responseCode": "30002", + "responseDisplay": "Business Level Response Value - Processing Error", + "severity": "Fatal", + "localId": DefaultValues.local_id, + "operationOutcome": "DIAGNOSTICS", + }, + ) + class InvalidValues: """Invalid values for use in tests""" diff --git a/lambdas/shared/src/common/batch/audit_table.py b/lambdas/shared/src/common/batch/audit_table.py index 58598757c..8e003cc10 100644 --- a/lambdas/shared/src/common/batch/audit_table.py +++ b/lambdas/shared/src/common/batch/audit_table.py @@ -111,6 +111,17 @@ def _build_audit_table_update_log_message(file_key: str, message_id: str, attrs_ ) +def get_ingestion_start_time_by_message_id(event_message_id: str) -> int: + """Retrieves ingestion start time by unique event message ID""" + # Required by JSON ack file + audit_record = dynamodb_client.get_item( + TableName=AUDIT_TABLE_NAME, Key={AuditTableKeys.MESSAGE_ID: {"S": event_message_id}} + ) + + ingestion_start_time = audit_record.get("Item", {}).get(AuditTableKeys.INGESTION_START_TIME, {}).get("S") + return int(ingestion_start_time) if ingestion_start_time else 0 + + def 
get_record_count_and_failures_by_message_id(event_message_id: str) -> tuple[int, int]: """Retrieves total record count and total failures by unique event message ID""" audit_record = dynamodb_client.get_item( diff --git a/lambdas/shared/tests/test_common/batch/test_audit_table.py b/lambdas/shared/tests/test_common/batch/test_audit_table.py index 75b9763ca..6c4fc9e2d 100644 --- a/lambdas/shared/tests/test_common/batch/test_audit_table.py +++ b/lambdas/shared/tests/test_common/batch/test_audit_table.py @@ -27,6 +27,7 @@ from common.batch.audit_table import ( NOTHING_TO_UPDATE_ERROR_MESSAGE, create_audit_table_item, + get_ingestion_start_time_by_message_id, get_record_count_and_failures_by_message_id, increment_records_failed_count, update_audit_table_item, @@ -301,6 +302,35 @@ def test_get_record_count_and_failures_by_message_id_returns_zero_if_values_not_ self.assertEqual(record_count, 0) self.assertEqual(failed_count, 0) + def test_get_ingestion_start_time_by_message_id_returns_the_ingestion_start_time(self): + """Test that get_ingestion_start_time_by_message_id retrieves the integer value of the ingestion start time""" + ravs_rsv_test_file = FileDetails("RSV", "RAVS", "X26") + expected_table_entry = { + **MockFileDetails.rsv_ravs.audit_table_entry, + "status": {"S": FileStatus.PREPROCESSED}, + "ingestion_start_time": {"S": "1769789375"}, + } + + dynamodb_client.put_item(TableName=AUDIT_TABLE_NAME, Item=expected_table_entry) + + ingestion_start_time = get_ingestion_start_time_by_message_id(ravs_rsv_test_file.message_id) + + self.assertEqual(ingestion_start_time, 1769789375) + + def test_get_ingestion_start_time_by_message_id_returns_zero_if_values_not_set(self): + """Test that if the ingestion start time has not yet been set on the audit item then zero is returned""" + ravs_rsv_test_file = FileDetails("RSV", "RAVS", "X26") + expected_table_entry = { + **MockFileDetails.rsv_ravs.audit_table_entry, + "status": {"S": FileStatus.PREPROCESSED}, + } + + dynamodb_client.put_item(TableName=AUDIT_TABLE_NAME, Item=expected_table_entry) + + ingestion_start_time = get_ingestion_start_time_by_message_id(ravs_rsv_test_file.message_id) + + self.assertEqual(ingestion_start_time, 0) + def test_increment_records_failed_count(self): """Checks audit table correctly increments the records_failed count""" ravs_rsv_test_file = FileDetails("RSV", "RAVS", "X26")
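Note on the timestamp handling added to complete_batch_file_process above: time.strftime("%s", time_now) relies on a glibc extension that is not among Python's documented strftime directives and interprets the struct_time as local time, and time.strftime("%Y-%m-%dT%H:%M:%S.000Z") formats the current local time while labelling it as UTC. A minimal sketch of a portable, UTC-consistent alternative follows; this is a suggested assumption about the intended semantics, not what the patch implements, and the unit tests that mock update_ack_file.time would need a matching adjustment.

    import calendar
    import time

    time_now = time.gmtime()  # current time as a UTC struct_time
    ingestion_end_time = time.strftime("%Y%m%dT%H%M%S00", time_now)
    # calendar.timegm treats the struct_time as UTC, matching time.gmtime(),
    # so the epoch seconds are the same on any platform and in any timezone.
    ingestion_end_time_seconds = calendar.timegm(time_now)
    # Passing the same UTC struct_time keeps the "Z" suffix truthful.
    generated_date = time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time_now)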