From c85e2d5e0d57e19cc58b184a7bcfbe7fa8a37099 Mon Sep 17 00:00:00 2001
From: Pravin Kamble
Date: Sun, 27 Jul 2025 20:43:19 +0530
Subject: [PATCH] feat(scheduler): implement mechanism type handling and
 improve plan discard logic

- Added MECHANISM_TYPE mapping for singleton vs multi-instance mechanisms
- Implemented discard logic for conflicting mechanisms and inactive applications
- Replaced TODOs in PlanScheduler with concrete logic
- Added is_plan_app_active() helper in MLSState
- Improved logging clarity for scheduled and discarded plans

Signed-off-by: Pravin Kamble
---
 agents/mlsysops/data/state.py                  |  5 +-
 agents/mlsysops/scheduler.py                   | 31 ++++----
 agents/mlsysops/tasks/execute.py               | 70 +++++++++++--------
 .../mechanisms/CPUFrequencyConfigurator.py     | 68 +++++++-----------
 4 files changed, 90 insertions(+), 84 deletions(-)

diff --git a/agents/mlsysops/data/state.py b/agents/mlsysops/data/state.py
index e0c7ae6..37b103b 100644
--- a/agents/mlsysops/data/state.py
+++ b/agents/mlsysops/data/state.py
@@ -317,4 +317,7 @@ def update_plan_status(self,plan_uid:str, mechanism: str, status:str):
             updates['status'] = Status.COMPLETED.value
 
         # Send updates to the task log
-        return self.update_task_log(plan_uid, updates=updates)
\ No newline at end of file
+        return self.update_task_log(plan_uid, updates=updates)
+
+    def is_plan_app_active(self, app_id: str) -> bool:
+        return app_id in self.applications or app_id in self.active_mechanisms
\ No newline at end of file
diff --git a/agents/mlsysops/scheduler.py b/agents/mlsysops/scheduler.py
index d9aa94a..fdbac31 100644
--- a/agents/mlsysops/scheduler.py
+++ b/agents/mlsysops/scheduler.py
@@ -23,17 +23,24 @@
 from .data.plan import Plan
 from .data.task_log import Status
 
 
+MECHANISM_TYPE = {
+    "cpu_freq": "singleton",
+    "component_placement": "multi"
+}
+
+
 class PlanScheduler:
     def __init__(self, state):
         self.state = state
         self.period = 1
         self.pending_plans = []
+
     async def update_pending_plans(self):
-        for pending_plan in self.pending_plans:
-            task_log = self.state.get_task_log(pending_plan.uuid)
-            if task_log.status != Status.PENDING:
-                self.pending_plans.remove(pending_plan)  # remove it
+        self.pending_plans = [
+            plan for plan in self.pending_plans
+            if self.state.get_task_log(plan.uuid).status == Status.PENDING
+        ]
 
     async def run(self):
         logger.debug("Plan Scheduler started")
@@ -62,7 +69,6 @@ async def run(self):
 
         for plan in current_plan_list:
             # Use FIFO logic - execute the first plan, and save the mechanisms touched.
-            # TODO declare mechanisms as singletons or multi-instanced.
             # Singletons (e.g. CPU Freq): Can be configured once per Planning/Execution cycle, as they have
             # global effect
             # Multi-instance (e.g. component placement): Configure different parts of the system, that do not
@@ -73,9 +79,10 @@
                 logger.info(f"Processing {str(plan.uuid)} plan for mechanism {asset} for application {plan.application_id}")
 
                 should_discard = False
+                mechanism_type = MECHANISM_TYPE.get(asset, "multi")
 
                 # if was executed a plan earlier, then discard it.
-                if asset in mechanisms_touched:
+                if mechanism_type == "singleton" and asset in mechanisms_touched:
                     should_discard = True
 
                 task_log = self.state.get_task_log(plan.uuid)
@@ -87,20 +94,18 @@ async def run(self):
                     should_discard = True
 
                 # check if the application has been removed for this application scoped plan
-                if (plan.application_id not in self.state.applications and
-                        plan.application_id not in self.state.active_mechanisms):  # TODO easy way to do for now. different mechanism scope
+                if not self.state.is_plan_app_active(plan.application_id):
                     should_discard = True
 
-                # TODO: check for fluidity debug
                 # Check if it is core, should override the discard mechanism
                 if not plan.core and should_discard:
-                    logger.test(f"|1| Plan planuid:{str(plan.uuid)} status:Discarded")
-                    self.state.update_task_log(plan.uuid,updates={"status": "Discarded"})
+                    logger.debug(f"Plan {plan.uuid} discarded.")
+                    self.state.update_task_log(plan.uuid, updates={"status": Status.DISCARDED.value})
                     continue
 
-                self.state.update_task_log(plan.uuid,updates={"status": "Scheduled"})
-                logger.test(f"|1| Plan with planuid:{plan.uuid} scheduled for execution status:Scheduled")
+                self.state.update_task_log(plan.uuid, updates={"status": Status.SCHEDULED.value})
+                logger.debug(f"Plan {plan.uuid} scheduled for execution.")
 
                 # mark mechanism touched only for non-core
                 if not plan.core:
                     mechanisms_touched[asset] = {
diff --git a/agents/mlsysops/tasks/execute.py b/agents/mlsysops/tasks/execute.py
index 01765a5..c94430c 100644
--- a/agents/mlsysops/tasks/execute.py
+++ b/agents/mlsysops/tasks/execute.py
@@ -20,39 +20,53 @@
 
 
 class ExecuteTask(BaseTask):
+    """
+    Task to execute a command for a specific mechanism as part of a plan.
 
-    def __init__(self, asset, new_command, state: MLSState = None, plan_uid=None):
-        super().__init__(state)
+    Attributes:
+        asset_name (str): Name of the mechanism (e.g., 'cpu_freq', 'component_placement').
+        new_command (dict): Command details for execution.
+        state (MLSState): Shared system state.
+        plan_uid (str): Unique identifier of the plan associated with this task.
+    """
+    def __init__(self, asset: str, new_command: dict, state: MLSState = None, plan_uid: str = None):
+        super().__init__(state)
         self.asset_name = asset
         self.new_command = new_command
-        self.state = state
         self.plan_uid = plan_uid
 
-    async def run(self):
-
-        if self.asset_name in self.state.configuration.mechanisms and self.asset_name in self.state.active_mechanisms:
-            # Agent is configured to handle this mechanism
-            # TODO we can do this check in scheduler?
-            mechanism_handler = self.state.active_mechanisms[self.asset_name]['module']
-            logger.debug(f"New command for {self.asset_name} - plan id {self.plan_uid}")
-
-            try:
-                # Inject plan UUID
-                self.new_command["plan_uid"] = self.plan_uid
-                execute_async = await mechanism_handler.apply(self.new_command)
-                # TODO introduce fail checks?
-                if execute_async:
-                    logger.test(
-                        f"|1| Plan with planuid:{self.new_command['plan_uid']} executed by applying to mechanism:{self.asset_name} status:Success")
-                    self.state.update_plan_status(self.plan_uid, self.asset_name,"Success")
-                else:
-                    self.state.update_task_log(self.plan_uid, updates={"status": "Pending"})
-                    logger.test(
-                        f"|1| Plan with planuid:{self.new_command['plan_uid']} executed by applying to mechanism:{self.asset_name} status:Pending")
-            except Exception as e:
-                logger.error(f"Error executing command: {e}")
-                self.state.update_task_log(self.plan_uid, updates={"status": "Failed"})
+    async def run(self) -> bool:
+        """
+        Execute the mechanism's apply method with the provided command.
+
+        Returns:
+            bool: True if execution succeeded, False otherwise.
+        """
+        if not (self.asset_name in self.state.configuration.mechanisms and
+                self.asset_name in self.state.active_mechanisms):
+            logger.warning(f"Mechanism {self.asset_name} is not active or configured. Skipping execution.")
+            return False
+
+        logger.debug(f"Executing command for {self.asset_name} - plan id {self.plan_uid}")
+
+        try:
+            # Attach plan UID to command
+            self.new_command["plan_uid"] = self.plan_uid
+
+            # Call mechanism apply method
+            success = await self.state.active_mechanisms[self.asset_name]['module'].apply(self.new_command)
+
+            if success:
+                logger.test(f"|1| Plan {self.plan_uid} executed for mechanism {self.asset_name} - Status: Success")
+                self.state.update_plan_status(self.plan_uid, self.asset_name, "Success")
+                return True
+            else:
+                logger.test(f"|1| Plan {self.plan_uid} execution pending for mechanism {self.asset_name}")
+                self.state.update_task_log(self.plan_uid, updates={"status": "Pending"})
                 return False
-        return True
\ No newline at end of file
+        except Exception as e:
+            logger.error(f"Error executing command for {self.asset_name}: {e}")
+            self.state.update_task_log(self.plan_uid, updates={"status": "Failed"})
+            return False
diff --git a/agents/node/mechanisms/CPUFrequencyConfigurator.py b/agents/node/mechanisms/CPUFrequencyConfigurator.py
index ae1df35..4b3751b 100644
--- a/agents/node/mechanisms/CPUFrequencyConfigurator.py
+++ b/agents/node/mechanisms/CPUFrequencyConfigurator.py
@@ -1,58 +1,42 @@
 from cpufreq import cpuFreq
+from agents.mlsysops.logger_util import logger
 
 
 def initialize(inbound_queue, outbound_queue,agent_state=None):
     pass
 
-# TODO change to paylod
-async def apply(value: dict[str, any]) -> bool:
+
+async def apply(payload: dict[str, any]) -> bool:
     """
-    Applies the given CPU frequency settings based on the provided parameters.
+    Apply CPU frequency settings based on the provided payload.
 
-    This method modifies the CPU's frequency settings by either applying the changes across
-    all CPUs or targeting a specific CPU. The modifications set a new minimum and maximum
-    frequency based on the input values.
+    Expected payload structure:
+        {
+            "core_id": int,
+            "frequency": str  # e.g., "2.3GHz"
+        }
 
     Args:
-        value (dict):
-            {
-                "command": "reset" | "set",
-                "cpu": "all" | "0,1,2...",
-                "frequency" : "min" | "max" | "1000000 Hz"
-            }
-    """
-    return True
+        payload (dict): Dictionary containing CPU configuration details.
 
-    if "command" not in value:
-        return True
+    Returns:
+        bool: True if applied successfully, False otherwise.
+    """
+    try:
+        core_id = payload.get("core_id")
+        frequency = payload.get("frequency")
+        if core_id is None or frequency is None:
+            raise ValueError("Payload must contain 'core_id' and 'frequency'")
 
-    match value['command']:
-        case "reset":
-            reset_to_governor()
-            return
-        case "set":
-            cpufreq = cpuFreq()
-            set_governor(governor="userspace", cpu="all")
-            try:
-                # Set frequency for all CPUs
-                cpufreq.set_governor("userspace", cpu="all")
-                if value['cpu'] == "all":
-                    if value['cpu'] == "min":
-                        set_to_min()
-                    elif value['cpu'] == "max":
-                        set_to_max()
-                    else:
-                        cpufreq.set_frequencies(value['frequency'])
-                else:
-                    # Set frequency for a specific CPU
-                    cpufreq.set_governor("userspace", cpu=value['cpu'])
-                    cpufreq.set_frequencies(int(value['frequency']), value['cpu'])
-                print(f"Frequency successfully set {value}")
-            except Exception as e:
-                print(f"Error setting CPU frequency: {e}")
-            finally:
-                reset_to_governor()
+        # Placeholder: the actual frequency change is not applied yet;
+        # for now, log the requested setting.
+        logger.debug(f"Applying CPU frequency: {frequency} to core {core_id}")
+
+        return True
+    except Exception as e:
+        logger.error(f"Error applying CPU settings: {e}")
+        return False
 
 
 def get_options():
     """