From d0432c21048f7abd7ff605b8b155ee94288e17ec Mon Sep 17 00:00:00 2001 From: Callum Macpherson Date: Wed, 2 Aug 2023 11:40:01 +0100 Subject: [PATCH 1/7] Add sns as AWS resource --- aws/src/authentication/generate_aws_resource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/src/authentication/generate_aws_resource.py b/aws/src/authentication/generate_aws_resource.py index 94778ea..efb17ae 100755 --- a/aws/src/authentication/generate_aws_resource.py +++ b/aws/src/authentication/generate_aws_resource.py @@ -65,7 +65,7 @@ def generate_aws_service(service_name: str, stage: Stage, service_type="resource service: ServiceResource = session.resource(service_name) case "client": # Add to these as needed - valid_services = ["ssm", "rds", "es", "ec2"] + valid_services = ["ssm", "rds", "es", "ec2", "sns"] if service_name not in valid_services: raise ValueError(f"Service name must be one of {valid_services}") # noinspection PyTypeChecker From dc8bfd0464f407092dc3002da868498e49fd8181 Mon Sep 17 00:00:00 2001 From: Callum Macpherson Date: Wed, 2 Aug 2023 11:40:23 +0100 Subject: [PATCH 2/7] Add update_properties method --- .../housing_search/update_properties.py | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 aws/src/elastic_search/housing_search/update_properties.py diff --git a/aws/src/elastic_search/housing_search/update_properties.py b/aws/src/elastic_search/housing_search/update_properties.py new file mode 100644 index 0000000..0f8c12f --- /dev/null +++ b/aws/src/elastic_search/housing_search/update_properties.py @@ -0,0 +1,95 @@ +from dataclasses import dataclass + +from aws.src.utils.logger import Logger +from aws.src.utils.progress_bar import ProgressBar +from aws.src.authentication.generate_aws_resource import generate_aws_service +from enums.enums import Stage +from aws.src.utils.csv_to_dict_list import csv_to_dict_list +from aws.src.database.dynamodb.utils.get_by_secondary_index import get_by_secondary_index +from aws.src.database.dynamodb.utils.get_dynamodb_table import get_dynamodb_table +from mypy_boto3_dynamodb.service_resource import Table + +import json +import datetime +import uuid + +@dataclass +class Config: + LOGGER = Logger() + STAGE = Stage.HOUSING_DEVELOPMENT + TABLE_NAME = "Assets" + FILE_PATH = "aws\src\elastic_search\data\input\property_data.csv" + SNS_TOPIC_ARN = input("Enter SNS topic ARN: ") + +@dataclass +class User: + NAME = "Callum Macpherson" + EMAIL = "callum.macpherson@hackney.gov.uk" + +def setupClient(): + return generate_aws_service("sns", Config.STAGE, "client") + +def generateMessage(assetId): + snsMessage = { + "id": str(uuid.uuid4()), + "eventType": "AssetUpdatedEvent", + "sourceDomain": "Asset", + "sourceSystem": "AssetAPI", + "version": "v1", + "correlationId": str(uuid.uuid4()), + "dateTime": str(datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")), + "user": { + "name": User.NAME, + "email": User.EMAIL, + }, + "entityId": assetId, + "eventData": {}, + } + + return json.dumps(snsMessage) + +def update_elasticsearch(asset_table: Table, properties_from_csv: list[dict]) -> int: + logger = Config.LOGGER + + update_count = 0 + progress_bar = ProgressBar(len(properties_from_csv), bar_length=len(properties_from_csv) // 10) + + snsClient = setupClient() + + for i, csv_property_item in enumerate(properties_from_csv): + if i % 100 == 0: + progress_bar.display(i) + + prop_ref = str(csv_property_item["property_number"]) + + # 1. Get asset from dynamoDb + asset = get_by_secondary_index(asset_table, "AssetId", "assetId", prop_ref)[0] + + # 2. Generate Message + snsMessage = generateMessage(asset['id']) + + # 3. Publish SNS message + response = snsClient.publish( + TopicArn=Config.SNS_TOPIC_ARN, + Message=snsMessage, + MessageGroupId="fake", + ) + + # Log ids for failed requests + if response["ResponseMetadata"]["HTTPStatusCode"] != 200: + logger.log(f"Request failed for asset {asset.id} with response {response['ResponseMetadata']['HTTPStatusCode']}") + + update_count += 1 + + return update_count + + +def main(): + table = get_dynamodb_table(Config.TABLE_NAME, Config.STAGE) + property_csv_data = csv_to_dict_list(Config.FILE_PATH) + + update_count = update_elasticsearch(table, property_csv_data) + + logger = Config.LOGGER + + logger.log(f"Updated {update_count} records") From 5e7e0f8ac80972c719531f192f94c54c90502791 Mon Sep 17 00:00:00 2001 From: Callum Macpherson Date: Wed, 2 Aug 2023 11:40:38 +0100 Subject: [PATCH 3/7] Add to usecases --- use_cases.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/use_cases.py b/use_cases.py index edb6a5d..31bedcf 100755 --- a/use_cases.py +++ b/use_cases.py @@ -8,13 +8,13 @@ # from aws.src.database.multi_table.amend_tenure import main as amend_tenure_main # from aws.src.database.multi_table.find_persons_for_properties import main as find_persons_for_properties_main -from aws.src.database.dynamodb.scripts.asset_table.patch_asset_with_additional_boiler_house_id import main as update_boilerhouse +from aws.src.elastic_search.housing_search.update_properties import main as update_properties if __name__ == "__main__": # 1) IMPORTANT: Uncomment and click into functions in the imports to view definitions and edit config # - e.g. set STAGE to "development" or "production" # 2) Call functions here - update_boilerhouse() + update_properties() pass From c410c05ddac33e1e563a7a250b0274d960ac565d Mon Sep 17 00:00:00 2001 From: Callum Macpherson Date: Wed, 2 Aug 2023 13:15:46 +0100 Subject: [PATCH 4/7] update all to snake_case --- .../housing_search/update_properties.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/aws/src/elastic_search/housing_search/update_properties.py b/aws/src/elastic_search/housing_search/update_properties.py index 0f8c12f..d5bc7bf 100644 --- a/aws/src/elastic_search/housing_search/update_properties.py +++ b/aws/src/elastic_search/housing_search/update_properties.py @@ -26,11 +26,11 @@ class User: NAME = "Callum Macpherson" EMAIL = "callum.macpherson@hackney.gov.uk" -def setupClient(): - return generate_aws_service("sns", Config.STAGE, "client") +def setup_client(): + return -def generateMessage(assetId): - snsMessage = { +def generate_message(assetId): + sns_message = { "id": str(uuid.uuid4()), "eventType": "AssetUpdatedEvent", "sourceDomain": "Asset", @@ -46,7 +46,7 @@ def generateMessage(assetId): "eventData": {}, } - return json.dumps(snsMessage) + return json.dumps(sns_message) def update_elasticsearch(asset_table: Table, properties_from_csv: list[dict]) -> int: logger = Config.LOGGER @@ -54,7 +54,7 @@ def update_elasticsearch(asset_table: Table, properties_from_csv: list[dict]) -> update_count = 0 progress_bar = ProgressBar(len(properties_from_csv), bar_length=len(properties_from_csv) // 10) - snsClient = setupClient() + sns_client = setup_client() for i, csv_property_item in enumerate(properties_from_csv): if i % 100 == 0: @@ -66,12 +66,12 @@ def update_elasticsearch(asset_table: Table, properties_from_csv: list[dict]) -> asset = get_by_secondary_index(asset_table, "AssetId", "assetId", prop_ref)[0] # 2. Generate Message - snsMessage = generateMessage(asset['id']) + sns_message = generate_message(asset['id']) # 3. Publish SNS message - response = snsClient.publish( + response = sns_client.publish( TopicArn=Config.SNS_TOPIC_ARN, - Message=snsMessage, + Message=sns_message, MessageGroupId="fake", ) From c55183196720757b84475ff440ae6277e8003759 Mon Sep 17 00:00:00 2001 From: Callum Macpherson Date: Wed, 2 Aug 2023 13:17:36 +0100 Subject: [PATCH 5/7] Add return type --- aws/src/elastic_search/housing_search/update_properties.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aws/src/elastic_search/housing_search/update_properties.py b/aws/src/elastic_search/housing_search/update_properties.py index d5bc7bf..8ac53fd 100644 --- a/aws/src/elastic_search/housing_search/update_properties.py +++ b/aws/src/elastic_search/housing_search/update_properties.py @@ -9,6 +9,7 @@ from aws.src.database.dynamodb.utils.get_dynamodb_table import get_dynamodb_table from mypy_boto3_dynamodb.service_resource import Table +import boto3 import json import datetime import uuid @@ -26,8 +27,8 @@ class User: NAME = "Callum Macpherson" EMAIL = "callum.macpherson@hackney.gov.uk" -def setup_client(): - return +def setup_client() -> boto3.client: + return generate_aws_service("sns", Config.STAGE, "client") def generate_message(assetId): sns_message = { From 3ef26f9dddc95ccaf8c79d590072bb1e8c37fc46 Mon Sep 17 00:00:00 2001 From: Callum Macpherson Date: Wed, 2 Aug 2023 15:23:58 +0100 Subject: [PATCH 6/7] Add error handling --- .../housing_search/update_properties.py | 57 ++++++++++++------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/aws/src/elastic_search/housing_search/update_properties.py b/aws/src/elastic_search/housing_search/update_properties.py index 8ac53fd..ddf6ba9 100644 --- a/aws/src/elastic_search/housing_search/update_properties.py +++ b/aws/src/elastic_search/housing_search/update_properties.py @@ -49,7 +49,7 @@ def generate_message(assetId): return json.dumps(sns_message) -def update_elasticsearch(asset_table: Table, properties_from_csv: list[dict]) -> int: +def update_assets(asset_table: Table, properties_from_csv: list[dict]) -> int: logger = Config.LOGGER update_count = 0 @@ -63,33 +63,50 @@ def update_elasticsearch(asset_table: Table, properties_from_csv: list[dict]) -> prop_ref = str(csv_property_item["property_number"]) - # 1. Get asset from dynamoDb - asset = get_by_secondary_index(asset_table, "AssetId", "assetId", prop_ref)[0] - - # 2. Generate Message - sns_message = generate_message(asset['id']) - - # 3. Publish SNS message - response = sns_client.publish( - TopicArn=Config.SNS_TOPIC_ARN, - Message=sns_message, - MessageGroupId="fake", - ) - - # Log ids for failed requests - if response["ResponseMetadata"]["HTTPStatusCode"] != 200: - logger.log(f"Request failed for asset {asset.id} with response {response['ResponseMetadata']['HTTPStatusCode']}") + try: + update_asset(prop_ref, asset_table, sns_client) + + except Exception as e: + logger.error(f"Failed to update asset with propRef {prop_ref} with exception {str(e)}") + + else: + # Success + update_count += 1 + + return update_count - update_count += 1 +def update_asset(prop_ref, asset_table, sns_client): + logger = Config.LOGGER + + # 1. Get asset from dynamoDb + assets = get_by_secondary_index(asset_table, "AssetId", "assetId", prop_ref) + + # no assets found with matching property reference + if not assets: + logger.error(f"Asset with propRef {prop_ref} not found in asset table") + return + + asset = assets[0] - return update_count + # 2. Generate Message + sns_message = generate_message(asset['id']) + # 3. Publish SNS message + response = sns_client.publish( + TopicArn=Config.SNS_TOPIC_ARN, + Message=sns_message, + MessageGroupId="fake", + ) + + # Log ids for failed requests + if response["ResponseMetadata"]["HTTPStatusCode"] != 200: + logger.error(f"Request failed for asset {asset.id} with response {response['ResponseMetadata']['HTTPStatusCode']}") def main(): table = get_dynamodb_table(Config.TABLE_NAME, Config.STAGE) property_csv_data = csv_to_dict_list(Config.FILE_PATH) - update_count = update_elasticsearch(table, property_csv_data) + update_count = update_assets(table, property_csv_data) logger = Config.LOGGER From 85fff10193476fd2ebedd2acd8e181712a1f01be Mon Sep 17 00:00:00 2001 From: Callum Macpherson Date: Wed, 2 Aug 2023 16:00:06 +0100 Subject: [PATCH 7/7] apparently logger.error dont work --- aws/src/elastic_search/housing_search/update_properties.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/aws/src/elastic_search/housing_search/update_properties.py b/aws/src/elastic_search/housing_search/update_properties.py index ddf6ba9..eab28bd 100644 --- a/aws/src/elastic_search/housing_search/update_properties.py +++ b/aws/src/elastic_search/housing_search/update_properties.py @@ -67,7 +67,7 @@ def update_assets(asset_table: Table, properties_from_csv: list[dict]) -> int: update_asset(prop_ref, asset_table, sns_client) except Exception as e: - logger.error(f"Failed to update asset with propRef {prop_ref} with exception {str(e)}") + logger.log(f"Failed to update asset with propRef {prop_ref} with exception {str(e)}") else: # Success @@ -83,7 +83,7 @@ def update_asset(prop_ref, asset_table, sns_client): # no assets found with matching property reference if not assets: - logger.error(f"Asset with propRef {prop_ref} not found in asset table") + logger.log(f"Asset with propRef {prop_ref} not found in asset table") return asset = assets[0] @@ -100,7 +100,7 @@ def update_asset(prop_ref, asset_table, sns_client): # Log ids for failed requests if response["ResponseMetadata"]["HTTPStatusCode"] != 200: - logger.error(f"Request failed for asset {asset.id} with response {response['ResponseMetadata']['HTTPStatusCode']}") + logger.log(f"Request failed for asset {asset.id} with response {response['ResponseMetadata']['HTTPStatusCode']}") def main(): table = get_dynamodb_table(Config.TABLE_NAME, Config.STAGE)