diff --git a/aws/src/authentication/generate_aws_resource.py b/aws/src/authentication/generate_aws_resource.py index 94778ea..efb17ae 100755 --- a/aws/src/authentication/generate_aws_resource.py +++ b/aws/src/authentication/generate_aws_resource.py @@ -65,7 +65,7 @@ def generate_aws_service(service_name: str, stage: Stage, service_type="resource service: ServiceResource = session.resource(service_name) case "client": # Add to these as needed - valid_services = ["ssm", "rds", "es", "ec2"] + valid_services = ["ssm", "rds", "es", "ec2", "sns"] if service_name not in valid_services: raise ValueError(f"Service name must be one of {valid_services}") # noinspection PyTypeChecker diff --git a/aws/src/elastic_search/housing_search/update_properties.py b/aws/src/elastic_search/housing_search/update_properties.py new file mode 100644 index 0000000..eab28bd --- /dev/null +++ b/aws/src/elastic_search/housing_search/update_properties.py @@ -0,0 +1,113 @@ +from dataclasses import dataclass + +from aws.src.utils.logger import Logger +from aws.src.utils.progress_bar import ProgressBar +from aws.src.authentication.generate_aws_resource import generate_aws_service +from enums.enums import Stage +from aws.src.utils.csv_to_dict_list import csv_to_dict_list +from aws.src.database.dynamodb.utils.get_by_secondary_index import get_by_secondary_index +from aws.src.database.dynamodb.utils.get_dynamodb_table import get_dynamodb_table +from mypy_boto3_dynamodb.service_resource import Table + +import boto3 +import json +import datetime +import uuid + +@dataclass +class Config: + LOGGER = Logger() + STAGE = Stage.HOUSING_DEVELOPMENT + TABLE_NAME = "Assets" + FILE_PATH = "aws\src\elastic_search\data\input\property_data.csv" + SNS_TOPIC_ARN = input("Enter SNS topic ARN: ") + +@dataclass +class User: + NAME = "Callum Macpherson" + EMAIL = "callum.macpherson@hackney.gov.uk" + +def setup_client() -> boto3.client: + return generate_aws_service("sns", Config.STAGE, "client") + +def generate_message(assetId): + sns_message = { + "id": str(uuid.uuid4()), + "eventType": "AssetUpdatedEvent", + "sourceDomain": "Asset", + "sourceSystem": "AssetAPI", + "version": "v1", + "correlationId": str(uuid.uuid4()), + "dateTime": str(datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")), + "user": { + "name": User.NAME, + "email": User.EMAIL, + }, + "entityId": assetId, + "eventData": {}, + } + + return json.dumps(sns_message) + +def update_assets(asset_table: Table, properties_from_csv: list[dict]) -> int: + logger = Config.LOGGER + + update_count = 0 + progress_bar = ProgressBar(len(properties_from_csv), bar_length=len(properties_from_csv) // 10) + + sns_client = setup_client() + + for i, csv_property_item in enumerate(properties_from_csv): + if i % 100 == 0: + progress_bar.display(i) + + prop_ref = str(csv_property_item["property_number"]) + + try: + update_asset(prop_ref, asset_table, sns_client) + + except Exception as e: + logger.log(f"Failed to update asset with propRef {prop_ref} with exception {str(e)}") + + else: + # Success + update_count += 1 + + return update_count + +def update_asset(prop_ref, asset_table, sns_client): + logger = Config.LOGGER + + # 1. Get asset from dynamoDb + assets = get_by_secondary_index(asset_table, "AssetId", "assetId", prop_ref) + + # no assets found with matching property reference + if not assets: + logger.log(f"Asset with propRef {prop_ref} not found in asset table") + return + + asset = assets[0] + + # 2. Generate Message + sns_message = generate_message(asset['id']) + + # 3. Publish SNS message + response = sns_client.publish( + TopicArn=Config.SNS_TOPIC_ARN, + Message=sns_message, + MessageGroupId="fake", + ) + + # Log ids for failed requests + if response["ResponseMetadata"]["HTTPStatusCode"] != 200: + logger.log(f"Request failed for asset {asset.id} with response {response['ResponseMetadata']['HTTPStatusCode']}") + +def main(): + table = get_dynamodb_table(Config.TABLE_NAME, Config.STAGE) + property_csv_data = csv_to_dict_list(Config.FILE_PATH) + + update_count = update_assets(table, property_csv_data) + + logger = Config.LOGGER + + logger.log(f"Updated {update_count} records") diff --git a/use_cases.py b/use_cases.py index edb6a5d..31bedcf 100755 --- a/use_cases.py +++ b/use_cases.py @@ -8,13 +8,13 @@ # from aws.src.database.multi_table.amend_tenure import main as amend_tenure_main # from aws.src.database.multi_table.find_persons_for_properties import main as find_persons_for_properties_main -from aws.src.database.dynamodb.scripts.asset_table.patch_asset_with_additional_boiler_house_id import main as update_boilerhouse +from aws.src.elastic_search.housing_search.update_properties import main as update_properties if __name__ == "__main__": # 1) IMPORTANT: Uncomment and click into functions in the imports to view definitions and edit config # - e.g. set STAGE to "development" or "production" # 2) Call functions here - update_boilerhouse() + update_properties() pass