Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion agents/mlsysops/data/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,7 @@ def update_plan_status(self,plan_uid:str, mechanism: str, status:str):
updates['status'] = Status.COMPLETED.value

# Send updates to the task log
return self.update_task_log(plan_uid, updates=updates)
return self.update_task_log(plan_uid, updates=updates)

def is_plan_app_active(self, app_id: str) -> bool:
    """Return True if *app_id* is still known to this agent.

    A plan's application scope is considered active while the id appears
    either in the registered applications or among the active mechanisms.
    """
    known_scopes = (self.applications, self.active_mechanisms)
    return any(app_id in scope for scope in known_scopes)
31 changes: 18 additions & 13 deletions agents/mlsysops/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,24 @@
from .data.plan import Plan
from .data.task_log import Status

# Classifies each mechanism by how often it may be configured within a single
# planning/execution cycle:
#   "singleton" - global effect; applied at most once per cycle (e.g. CPU freq)
#   "multi"     - configures independent parts of the system; several plans may
#                 touch it in the same cycle (e.g. component placement)
# Mechanisms not listed here are treated as "multi" (the scheduler falls back
# to MECHANISM_TYPE.get(asset, "multi")).
MECHANISM_TYPE = {
    "cpu_freq": "singleton",
    "component_placement": "multi"
}


class PlanScheduler:
def __init__(self, state):
    """Initialize the plan scheduler.

    Args:
        state: Shared agent state; provides task logs and the
            application/mechanism registries the scheduler consults.
    """
    self.state = state
    self.period = 1  # scheduler loop period (presumably seconds -- TODO confirm)
    self.pending_plans = []  # plans awaiting execution; pruned by update_pending_plans()


async def update_pending_plans(self):
    """Drop every tracked plan whose task-log status is no longer PENDING.

    Rebuilds the list in one pass instead of calling ``list.remove`` while
    iterating, which silently skips the element following each removal.
    """
    # NOTE(review): other call sites write enum *values* into the task log
    # (e.g. Status.SCHEDULED.value); confirm that the stored 'status' compares
    # equal to the Status.PENDING member here (true only for a str-valued enum).
    self.pending_plans = [
        plan for plan in self.pending_plans
        if self.state.get_task_log(plan.uuid)['status'] == Status.PENDING
    ]

async def run(self):
logger.debug("Plan Scheduler started")
Expand Down Expand Up @@ -62,7 +69,6 @@ async def run(self):
for plan in current_plan_list:

# Use FIFO logic - execute the first plan, and save the mechanisms touched.
# TODO declare mechanisms as singletons or multi-instanced.
# Singletons (e.g. CPU Freq): Can be configured once per Planning/Execution cycle, as they have
# global effect
# Multi-instance (e.g. component placement): Configure different parts of the system, that do not
Expand All @@ -73,9 +79,10 @@ async def run(self):
logger.info(f"Processing {str(plan.uuid)} plan for mechanism {asset} for application {plan.application_id}")

should_discard = False
mechanism_type = MECHANISM_TYPE.get(asset, "multi")

# if was executed a plan earlier, then discard it.
if asset in mechanisms_touched:
if mechanism_type == "singleton" and asset in mechanisms_touched:
should_discard = True

task_log = self.state.get_task_log(plan.uuid)
Expand All @@ -87,20 +94,18 @@ async def run(self):
should_discard = True

# check if the application has been removed for this application scoped plan
if (plan.application_id not in self.state.applications and
plan.application_id not in self.state.active_mechanisms): # TODO easy way to do for now. different mechanism scope
if not self.state.is_plan_app_active(plan.application_id):
should_discard = True

# TODO: check for fluidity debug
# Check if it is core, should override the discard mechanism
if not plan.core and should_discard:
logger.test(f"|1| Plan planuid:{str(plan.uuid)} status:Discarded")
self.state.update_task_log(plan.uuid,updates={"status": "Discarded"})
logger.debug(f"Plan {plan.uuid} discarded.")
self.state.update_task_log(plan.uuid, updates={"status": Status.DISCARDED.value})
continue


self.state.update_task_log(plan.uuid,updates={"status": "Scheduled"})
logger.test(f"|1| Plan with planuid:{plan.uuid} scheduled for execution status:Scheduled")
self.state.update_task_log(plan.uuid, updates={"status": Status.SCHEDULED.value})
logger.debug(f"Plan {plan.uuid} scheduled for execution.")
# mark mechanism touched only for non-core
if not plan.core:
mechanisms_touched[asset] = {
Expand Down
70 changes: 42 additions & 28 deletions agents/mlsysops/tasks/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,53 @@


class ExecuteTask(BaseTask):
    """
    Task to execute a command for a specific mechanism as part of a plan.

    Attributes:
        asset_name (str): Name of the mechanism (e.g., 'cpu_freq').
        new_command (dict): Command details for execution.
        state (MLSState): Shared system state.
        plan_uid (str): Unique identifier of the plan associated with this task.
    """

    def __init__(self, asset: str, new_command: dict, state: MLSState = None, plan_uid: str = None):
        super().__init__(state)
        self.asset_name = asset
        self.new_command = new_command
        self.state = state
        self.plan_uid = plan_uid

    async def run(self) -> bool:
        """
        Execute the mechanism's apply method with the provided command.

        Returns:
            bool: True if execution succeeded, False otherwise.
        """
        # Only proceed when the agent is both configured for, and actively
        # running, this mechanism.
        if not (self.asset_name in self.state.configuration.mechanisms and
                self.asset_name in self.state.active_mechanisms):
            logger.warning(f"Mechanism {self.asset_name} is not active or configured. Skipping execution.")
            return False

        logger.debug(f"Executing command for {self.asset_name} - plan id {self.plan_uid}")

        try:
            # Attach plan UID so the mechanism can report status against the plan.
            self.new_command["plan_uid"] = self.plan_uid

            # Delegate to the mechanism module's apply() coroutine.
            success = await self.state.active_mechanisms[self.asset_name]['module'].apply(self.new_command)

            if success:
                logger.test(f"|1| Plan {self.plan_uid} executed for mechanism {self.asset_name} - Status: Success")
                self.state.update_plan_status(self.plan_uid, self.asset_name, "Success")
                return True

            # Mechanism did not complete synchronously; keep the plan pending.
            # NOTE(review): raw status strings ("Pending"/"Failed") are used
            # here while the scheduler writes Status.<X>.value -- confirm they
            # agree with the task-log schema.
            logger.test(f"|1| Plan {self.plan_uid} execution pending for mechanism {self.asset_name}")
            self.state.update_task_log(self.plan_uid, updates={"status": "Pending"})
            return False
        except Exception as e:
            logger.error(f"Error executing command for {self.asset_name}: {e}")
            self.state.update_task_log(self.plan_uid, updates={"status": "Failed"})
            return False
68 changes: 26 additions & 42 deletions agents/node/mechanisms/CPUFrequencyConfigurator.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,42 @@
from typing import Any

from cpufreq import cpuFreq

from agents.mlsysops.logger_util import logger


def initialize(inbound_queue, outbound_queue, agent_state=None):
    # Mechanism module entry point (presumably invoked by the agent's
    # mechanism loader -- TODO confirm). This configurator needs no setup,
    # so it is intentionally a no-op.
    pass

async def apply(payload: dict[str, Any]) -> bool:
    """
    Apply CPU frequency settings based on the provided payload.

    Note: mechanism modules are called as ``module.apply(command)`` with a
    single argument, so this is a module-level function and must NOT take
    a ``self`` parameter.

    Expected payload structure::

        {
            "core_id": int,
            "frequency": str  # e.g., "2.3GHz"
        }

    Args:
        payload (dict): Dictionary containing CPU configuration details.

    Returns:
        bool: True if applied successfully, False otherwise.
    """
    try:
        core_id = payload.get("core_id")
        frequency = payload.get("frequency")

        if core_id is None or frequency is None:
            raise ValueError("Payload must contain 'core_id' and 'frequency'")

        # Example: Apply the frequency (actual implementation depends on your mechanism)
        # For now, just log or simulate
        logger.debug(f"Applying CPU frequency: {frequency} to core {core_id}")

        return True
    except Exception as e:
        logger.error(f"Error applying CPU settings: {e}")
        return False

def get_options():
"""
Expand Down