From 4b63756db7d1f356189d81fe5bbd6f86d00fd501 Mon Sep 17 00:00:00 2001 From: Foivos-Pournaropoulos Date: Fri, 8 Aug 2025 08:24:35 +0000 Subject: [PATCH] Added logic for relocation delay measurement --- agents/cluster/mechanisms/fluidity.py | 82 ++++++++++++++++++++++----- 1 file changed, 69 insertions(+), 13 deletions(-) diff --git a/agents/cluster/mechanisms/fluidity.py b/agents/cluster/mechanisms/fluidity.py index 66991ac..52e87a6 100644 --- a/agents/cluster/mechanisms/fluidity.py +++ b/agents/cluster/mechanisms/fluidity.py @@ -14,6 +14,7 @@ # # # # +import time import asyncio import json import traceback @@ -21,13 +22,12 @@ from dataclasses import dataclass, field from typing import Dict from pydantic import ValidationError - +import mlstelemetry import fluidity.controller as fluidity_controller from fluidity.plan_payload import FluidityPlanPayload from fluidity.internal_payload import FluidityEvent -# from mlsysops.logger_util import logger from mlsysops import MessageEvents import mlsysops from mlsysops.logger_util import logger @@ -43,8 +43,10 @@ class FluidityMechanism: internal_queue_inbound = None internal_queue_outbound = None state = None + relocation_state = {} fluidity_proxy_plans = {} - + mls_client = mlstelemetry.MLSTelemetry("cluster_agent", "fluidity_mechanism") + def __init__(self, mlsysops_inbound_queue=None, mlsysops_outbound_queue=None, agent_state=None): self.inbound_queue = mlsysops_inbound_queue self.outbound_queue = mlsysops_outbound_queue @@ -108,12 +110,6 @@ async def internal_message_listener(self): # Listen to fluidity messages message = await self.internal_queue_inbound.get() - # Log or save message for debugging - with open("fluidity_dump.json", "w") as file: - file.write(json.dumps(message, skipkeys=True, indent=4, default=str, ensure_ascii=False, sort_keys=True, - separators=(',', ': '))) - file.write(",\n") # Ensure a newline is added after the content - event = message.get("event") if not event: continue @@ -258,7 +254,11 @@ async def internal_message_listener(self): # Handle application created event payload = message.get("payload", {}) app_name = payload.get("name") - del self.state["applications"][app_name] + + if app_name in self.state["applications"]: + del self.state["applications"][app_name] + else: + logger.warning(f"App {app_name} not in state dictionary. Ignoring.") elif event == MessageEvents.POD_MODIFIED.value: # Handle pod modified event @@ -273,8 +273,10 @@ async def internal_message_listener(self): app_name = pod_labels.get("mlsysops.eu/app") component_name = pod_labels.get("mlsysops.eu/component") component_uid = pod_labels.get("mlsysops.eu/componentUID") + plan_uid = pod_labels.get("mlsysops.eu/planUID", None) if app_name and app_name in self.state["applications"] and component_name: + logger.debug(f"Pod modified event caught for comp {component_name} of app {app_name}") # Update the application component state app = self.state["applications"][app_name] components = app.get("components", {}) @@ -285,9 +287,35 @@ async def internal_message_listener(self): "node_placed": node_name }) - # Test log - logger.test( - f"|3| Fluidity mechanism planuid:{pod_labels.get('mlsysops.eu/planUID','-')} pod modification status:Success") + if plan_uid and plan_uid in self.relocation_state and component_name in self.relocation_state[plan_uid]: + logger.debug(f"Pod name {pod_metadata['name']}") + logger.debug(f"Pod state {pod_state}") + logger.debug(f"Host name {node_name}") + + start_timestamp = self.relocation_state[plan_uid][component_name]['start'] + # Get timestamp of modification event of the new pod + curr_timestamp = time.perf_counter() + diff = curr_timestamp - start_timestamp + + # If the new Pod is deployed successfully on the new host, record the delay + if pod_state == 'Running' and node_name == self.relocation_state[plan_uid][component_name]['dst']: + logger.info(f"Relocation delay is {diff}") + self.mls_client.pushMetric("relocation_delay", "gauge", diff, {"comp_name":component_name}) + logger.debug(f"Removing {component_name} from relocation state of plan {plan_uid}") + + # Remove entry from dictionary + del self.relocation_state[plan_uid][component_name] + if self.relocation_state[plan_uid] == {}: + logger.debug(f"relocation_state for plan {plan_uid} is empty.") + del self.relocation_state[plan_uid] + # If the Pod is in Pending state, we measure the delay until the call to kubernetes is done + elif pod_state == 'Pending' and node_name == self.relocation_state[plan_uid][component_name]['dst']: + logger.debug(f"New Pod start delay is {diff}") + self.mls_client.pushMetric("deployment_delay", "gauge", diff, {"comp_name":component_name}) + + # Test log + logger.test( + f"|3| Fluidity mechanism planuid:{pod_labels.get('mlsysops.eu/planUID','-')} pod modification status:Success") # Update the node's state if node_name: @@ -361,6 +389,34 @@ async def apply(plan): try: await fluidity_mechanism_instance.internal_queue_outbound.put(msg) logger.test(f"|1| Fluidity mechanism forwarded planuid:{plan['plan_uid']} to Fluidity status:True") + + logger.debug(f"In apply, plan_uid {plan['plan_uid']}") + logger.debug(f"Plan {plan}") + + for comp in plan['deployment_plan']: + if comp == 'initial_plan': + continue + # If 'move' action in a plan for a given component, + # Store the new plan info (plan_uid, plan creation timestamp, component names, src/dst nodes) + for action_entry in plan['deployment_plan'][comp]: + if action_entry['action'] == 'move': + if plan['plan_uid'] not in fluidity_mechanism_instance.relocation_state: + logger.debug(f"Creating entry for plan_uid {plan['plan_uid']}") + fluidity_mechanism_instance.relocation_state[plan['plan_uid']] = {} + + curr_timestamp = time.perf_counter() + + src = action_entry['src_host'] + dst = action_entry['target_host'] + + logger.debug(f"Found move action for component {comp} from {src} to {dst}") + fluidity_mechanism_instance.relocation_state[plan['plan_uid']][comp] = { + 'start': curr_timestamp, + 'src': src, + 'dst': dst + } + + logger.debug(f"Current timestamp {curr_timestamp}") except Exception as e: logger.debug("Error in sending message to fluidity")