-
Notifications
You must be signed in to change notification settings - Fork 0
Add overnight HFS process script with Lambda invocation and log streaming #48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,131 @@ | ||
| """ | ||
| Trigger the overnight HFS process Lambda functions in series with real-time log streaming. | ||
| """ | ||
|
|
||
| import time | ||
| import logging | ||
| from aws.authentication.generate_aws_resource import generate_aws_service | ||
| from enums.enums import Stage | ||
|
|
||
# Configure logging to write to file and flush immediately
logging.basicConfig(
    filename="overnight_hfs_processes.txt",
    level=logging.INFO,
    filemode="w",  # "w": each run overwrites the previous night's log file
    format="%(asctime)s - %(message)s",
)

# Target environment for this run.
# NOTE(review): DISASTER_RECOVERY appears to be a left-over from DR testing —
# confirm the intended stage before running this against a real environment.
stage = Stage.DISASTER_RECOVERY
lambda_client = generate_aws_service("lambda", stage)  # Lambda client for invocations
logs_client = generate_aws_service("logs", stage)  # CloudWatch Logs client for streaming
|
|
||
# All overnight function names share the stage-specific service prefix.
BASE_NAME = f"housing-finance-interim-api-{stage.to_env_name()}"

# Lambdas are invoked strictly in this order by main().
# NOTE(review): a nightly charges process (the most resource-intensive step,
# run as an ECS task rather than a Lambda) normally sits between load-tenagree
# and cash-file and depends on load-tenagree for new accounts — it is NOT
# triggered by this script; see the TODO in main(). Confirm ordering before
# relying on this list end-to-end.
lambda_functions = [
    f"{BASE_NAME}-load-tenagree",
    f"{BASE_NAME}-cash-file",
    f"{BASE_NAME}-cash-file-trans",
    f"{BASE_NAME}-direct-debit",
    f"{BASE_NAME}-direct-debit-trans",
    f"{BASE_NAME}-adjustments-trans",
    f"{BASE_NAME}-action-diary",
    f"{BASE_NAME}-susp-cash",
    f"{BASE_NAME}-susp-hb",
    f"{BASE_NAME}-refresh-cur-bal",
    f"{BASE_NAME}-refresh-ma-cur-bal",
    f"{BASE_NAME}-refresh-op-bal",
    f"{BASE_NAME}-rent-position",
]
|
|
||
|
|
||
def stream_logs(function_name: str, start_time: int, timeout_seconds: int = 1800) -> None:
    """Poll CloudWatch Logs for the given function and stream them to the log file.

    Polls until a 'REPORT RequestId' line is seen — the Lambda runtime emits
    that line at the end of every invocation — or until ``timeout_seconds``
    elapses. The timeout prevents an invocation that dies before writing its
    REPORT line (or a missing log group) from hanging the whole overnight run.

    Args:
        function_name: Name of the Lambda function whose logs to stream.
        start_time: Epoch milliseconds; only events at or after this time are read.
        timeout_seconds: Maximum time to keep polling before giving up
            (default 30 minutes; backward-compatible addition).
    """
    log_group = f"/aws/lambda/{function_name}"
    last_timestamp = start_time
    deadline = time.time() + timeout_seconds

    while True:
        # Bail out rather than loop forever if the REPORT line never arrives.
        if time.time() > deadline:
            logging.error(
                f"Timed out after {timeout_seconds}s waiting for logs from {function_name}"
            )
            break

        try:
            # Get the most recent log stream — the fresh invocation writes to
            # the stream holding the latest event.
            streams = logs_client.describe_log_streams(
                logGroupName=log_group,
                orderBy="LastEventTime",
                descending=True,
                limit=1,
            ).get("logStreams", [])

            if not streams:
                # Log group/stream may not exist yet immediately after invoke.
                time.sleep(2)
                continue

            log_stream_name = streams[0]["logStreamName"]

            # Fetch events since the last timestamp.
            events_resp = logs_client.get_log_events(
                logGroupName=log_group,
                logStreamName=log_stream_name,
                startTime=last_timestamp,
                startFromHead=True,
            )

            finished = False
            for event in events_resp.get("events", []):
                msg = event["message"].strip()
                # Advance past this event so the next poll skips duplicates.
                last_timestamp = event["timestamp"] + 1

                # Write to file in real-time.
                logging.info(f"[{function_name}] {msg}")

                # The runtime's REPORT line marks the end of the invocation.
                if "REPORT RequestId" in msg:
                    finished = True

            if finished:
                # Stop immediately — no need for a final throttling sleep.
                break

            # Small delay to prevent API throttling.
            time.sleep(1)

        except Exception as e:
            # Best-effort streaming: a logging failure must not kill the run.
            logging.error(f"Error streaming logs for {function_name}: {str(e)}")
            break
|
|
||
|
|
||
def trigger_lambda(function_name: str) -> None:
    """Trigger the Lambda asynchronously and start the log streaming loop."""
    print(f"🚀 Triggering Lambda: {function_name}")
    banner = "=" * 60
    logging.info(f"\n{banner}\nSTARTING EXECUTION: {function_name}\n{banner}")

    # CloudWatch Logs timestamps are epoch milliseconds; capture before invoke.
    invocation_start_ms = int(time.time() * 1000)

    # 'Event' invocation type is fire-and-forget: the call returns immediately
    # with a 202 on success, and the work continues in AWS.
    invoke_response = lambda_client.invoke(
        FunctionName=function_name, InvocationType="Event"
    )

    status = invoke_response["StatusCode"]
    if status != 202:
        error_msg = f"Failed to trigger {function_name}. Status: {status}"
        print(f"❌ {error_msg}")
        logging.error(error_msg)
        return

    # Follow the CloudWatch logs until the invocation reports completion.
    stream_logs(function_name, invocation_start_ms)
    print(f"✅ {function_name} completed.")
|
|
||
|
|
||
def main() -> None:
    """Run each overnight Lambda in sequence and report the total wall time."""
    wall_start = time.time()

    # TODO: This could be improved by including running the ECS task for charges processing
    for fn_name in lambda_functions:
        trigger_lambda(fn_name)

    elapsed_minutes = (time.time() - wall_start) / 60
    summary = f"\nOvernight process finished in {elapsed_minutes:.2f} minutes."
    print(summary)
    logging.info(summary)


if __name__ == "__main__":
    main()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How come?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a left-over from DR testing