From 67d358f5a588b66fd8c39910ef001a96618b722c Mon Sep 17 00:00:00 2001 From: Pravin Kamble Date: Mon, 28 Jul 2025 13:10:17 +0530 Subject: [PATCH] Replace print() with logger across codebase for better logging Signed-off-by: Pravin Kamble --- agents/cluster/fluidity/crds_api.py | 3 - agents/cluster/fluidity/deploy.py | 4 +- agents/cluster/fluidity/internal_payload.py | 9 +- .../mvp/policies/policy-changeCompSpec.py | 2 +- .../mvp/policies/policy-relocateComponents.py | 2 +- .../mvp/policies/policy-staticPlacement.py | 2 +- agents/cluster/fluidity/plan_payload.py | 5 +- agents/cluster/mechanisms/fluidity.py | 8 +- .../cluster/policies/policy-changeCompSpec.py | 2 +- .../policies/policy-relocateComponents.py | 2 +- agents/continuum/MLSContinuumAgent.py | 10 +- agents/continuum/main.py | 2 +- .../templates/YAMLToJSONConverter.py | 16 +-- agents/deployments/deploy.py | 59 +++++------ agents/mlsysops/agent.py | 12 +-- agents/mlsysops/controllers/application.py | 2 +- agents/mlsysops/controllers/libs/otel_pods.py | 6 +- .../spade/behaviors/APIPingBehaviour.py | 8 +- .../CheckInactiveClustersBehaviour.py | 2 +- .../Check_ml_deployment_Behaviour.py | 16 +-- .../spade/behaviors/ML_process_Behaviour.py | 10 +- .../behaviors/ManagementModeBehaviour.py | 8 +- .../behaviors/MessageReceivingBehavior.py | 2 +- .../spade/behaviors/SubscribeBehavior.py | 2 +- agents/mlsysops/spade/mls_spade.py | 12 +-- agents/mlsysops/spade/redis_mgt.py | 28 +++--- agents/node/MLSNodeAgent.py | 6 +- .../mechanisms/CPUFrequencyConfigurator.py | 19 ++-- agents/node/mechanisms/fluidity_proxy.py | 5 +- .../tests/cluster_agent/cluster_agent_test.py | 39 ++++---- agents/tests/create_descriptions.py | 17 ++-- mlconnector/drift_app/app.py | 7 +- mlconnector/drift_app/manage_s3.py | 28 +++--- mlconnector/src/db/db_setup.py | 4 +- mlconnector/src/db/redis_setup.py | 6 +- mlconnector/src/migrations/env.py | 2 +- .../src/utils/api/generate_dockerfile.py | 33 ++++--- mlconnector/src/utils/generate_train.py | 17 ++-- mlconnector/src/utils/get_model.py | 3 +- mlconnector/src/utils/manage_s3.py | 28 +++--- mlconnector/src/utils/mldeployments.py | 27 +++--- mlconnector/src/utils/mlmodels.py | 10 +- mlconnector/src/utils/mltrainings.py | 7 +- mlconnector/xai-server-app/ShapExplainer.py | 3 +- mlconnector/xai-server-app/database.py | 11 ++- mlconnector/xai-server-app/manage_s3.py | 25 ++--- mlconnector/xai-server-app/server.py | 5 +- mlsysops-cli/mlsysops_cli/cli.py | 7 +- .../deployment/create_descriptions.py | 17 ++-- .../mlsysops_cli/deployment/deploy.py | 97 ++++++++++--------- .../deployment/descriptions_util.py | 9 +- .../schema generator/generate_model.py | 21 ++-- northbound-api/endpoints/applications.py | 19 ++-- northbound-api/endpoints/infrastructure.py | 13 +-- northbound-api/endpoints/management.py | 23 ++--- northbound-api/endpoints/ml_models.py | 9 +- northbound-api/getallpods.py | 7 +- northbound-api/karmada_info.py | 9 +- northbound-api/redis_setup/redis_mgt.py | 34 +++---- orchestrators/launch.py | 13 +-- 60 files changed, 422 insertions(+), 392 deletions(-) diff --git a/agents/cluster/fluidity/crds_api.py b/agents/cluster/fluidity/crds_api.py index f5cfda6..3c325ad 100644 --- a/agents/cluster/fluidity/crds_api.py +++ b/agents/cluster/fluidity/crds_api.py @@ -20,7 +20,6 @@ """ from __future__ import print_function # import json -import logging import os # import sys @@ -118,7 +117,6 @@ def register_all_fluidity_crd(): for crd_info in CRDS_INFO_LIST: if crd_info['singular'] not in current_crds_names: logger.info('Creating 
Fluidity CRD: %s', crd_info['kind']) - print('Creating Fluidity CRD: %s' % crd_info['kind']) try: yaml = YAML(typ='safe') with open(crd_info['crd_file'], 'r') as data: @@ -131,5 +129,4 @@ def register_all_fluidity_crd(): ext_api.create_custom_resource_definition(body) except ApiException as exc: logger.exception('%s update failed: %s', crd_info['kind'], exc) - print('%s update failed: %s' % (crd_info['kind'], exc)) raise FluidityCrdsApiException from exc diff --git a/agents/cluster/fluidity/deploy.py b/agents/cluster/fluidity/deploy.py index 7db9d48..b0f385f 100644 --- a/agents/cluster/fluidity/deploy.py +++ b/agents/cluster/fluidity/deploy.py @@ -194,7 +194,7 @@ def create_svc(svc_manifest): resp = core_api.read_namespaced_service( name=svc_manifest['metadata']['name'], namespace=cluster_config.NAMESPACE) - #print(resp) + #logger(resp) except ApiException as exc: if exc.status != 404: logger.error('Unknown error reading service: %s', exc) @@ -212,7 +212,7 @@ def create_svc(svc_manifest): try: svc_obj = core_api.create_namespaced_service(body=svc_manifest, namespace=cluster_config.NAMESPACE) - #print(svc_obj) + #logger(svc_obj) return svc_obj except ApiException as exc: logger.error('Failed to create service: %s', exc) diff --git a/agents/cluster/fluidity/internal_payload.py b/agents/cluster/fluidity/internal_payload.py index f9d3e25..ab9cee5 100644 --- a/agents/cluster/fluidity/internal_payload.py +++ b/agents/cluster/fluidity/internal_payload.py @@ -26,6 +26,7 @@ from typing import Optional, Dict, List, Any from enum import Enum from mlsysops.events import MessageEvents +from agents.mlsysops.logger_util import logger # --------------- SHARED COMPONENTS --------------- # @@ -317,8 +318,8 @@ def get_payload_model(self): try: event = FluidityEvent(**example) parsed_payload = event.get_payload_model() - print(f"Event: {example['event']}") - print(parsed_payload.model_dump_json(indent=4)) + logger.info(f"Event: {example['event']}") + logger.info(parsed_payload.model_dump_json(indent=4)) except Exception as e: - print(f"Error processing event: {example['event']}") - print(f"Exception: {str(e)}") + logger.error(f"Error processing event: {example['event']}") + logger.error(f"Exception: {str(e)}") diff --git a/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-changeCompSpec.py b/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-changeCompSpec.py index 0bd8c5b..d347bbe 100644 --- a/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-changeCompSpec.py +++ b/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-changeCompSpec.py @@ -61,7 +61,7 @@ def parse_analyze_interval(interval: str) -> int: def initialize(): - print(f"Initializing policy {inspect.stack()[1].filename}") + logger.info(f"Initializing policy {inspect.stack()[1].filename}") initialContext = { "telemetry": { diff --git a/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-relocateComponents.py b/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-relocateComponents.py index d635a4b..ae8b77f 100644 --- a/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-relocateComponents.py +++ b/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-relocateComponents.py @@ -44,7 +44,7 @@ def parse_analyze_interval(interval: str) -> int: def initialize(): - print(f"Initializing policy {inspect.stack()[1].filename}") + logger.info(f"Initializing policy {inspect.stack()[1].filename}") initialContext = { "telemetry": { diff --git 
a/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-staticPlacement.py b/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-staticPlacement.py index db5a706..2064314 100644 --- a/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-staticPlacement.py +++ b/agents/cluster/fluidity/manifests/templates/mvp/policies/policy-staticPlacement.py @@ -7,7 +7,7 @@ from mlsysops.logger_util import logger def initialize(): - print(f"Initializing policy {inspect.stack()[1].filename}") + logger.info(f"Initializing policy {inspect.stack()[1].filename}") initialContext = { "telemetry": { diff --git a/agents/cluster/fluidity/plan_payload.py b/agents/cluster/fluidity/plan_payload.py index 7f7ff22..a048a08 100644 --- a/agents/cluster/fluidity/plan_payload.py +++ b/agents/cluster/fluidity/plan_payload.py @@ -19,6 +19,7 @@ from pydantic import BaseModel, Field, ValidationError, model_validator, RootModel from typing import Any, Dict, List, Optional, Union import re +from agents.mlsysops.logger_util import logger class PlatformRequirements(BaseModel): @@ -144,6 +145,6 @@ class FluidityPlanPayload(BaseModel): # Validate the payload try: validated_payload = FluidityPlanPayload(**payload) - print("Validation successful!") + logger.info("Validation successful!") except ValidationError as e: - print(f"Validation failed: {e}") \ No newline at end of file + logger.error(f"Validation failed: {e}") \ No newline at end of file diff --git a/agents/cluster/mechanisms/fluidity.py b/agents/cluster/mechanisms/fluidity.py index 66991ac..ae3e49d 100644 --- a/agents/cluster/mechanisms/fluidity.py +++ b/agents/cluster/mechanisms/fluidity.py @@ -363,8 +363,8 @@ async def apply(plan): logger.test(f"|1| Fluidity mechanism forwarded planuid:{plan['plan_uid']} to Fluidity status:True") except Exception as e: - logger.debug("Error in sending message to fluidity") - print(traceback.format_exc()) + logger.error("Error in sending message to fluidity") + logger.exception(traceback.format_exc()) return False @@ -376,8 +376,8 @@ async def send_message(msg): try: await fluidity_mechanism_instance.internal_queue_outbound.put(msg) except Exception as e: - logger.debug("Error in sending message to fluidity") - print(traceback.format_exc()) + logger.error("Error in sending message to fluidity") + logger.exception(traceback.format_exc()) def get_state(): diff --git a/agents/cluster/policies/policy-changeCompSpec.py b/agents/cluster/policies/policy-changeCompSpec.py index 6399256..b8ec70d 100644 --- a/agents/cluster/policies/policy-changeCompSpec.py +++ b/agents/cluster/policies/policy-changeCompSpec.py @@ -62,7 +62,7 @@ def parse_analyze_interval(interval: str) -> int: def initialize(): - print(f"Initializing policy {inspect.stack()[1].filename}") + logger.info(f"Initializing policy {inspect.stack()[1].filename}") initialContext = { "telemetry": { diff --git a/agents/cluster/policies/policy-relocateComponents.py b/agents/cluster/policies/policy-relocateComponents.py index 39f2f3e..de0f6fd 100644 --- a/agents/cluster/policies/policy-relocateComponents.py +++ b/agents/cluster/policies/policy-relocateComponents.py @@ -11,7 +11,7 @@ from mlsysops.utilities import evaluate_condition def initialize(): - print(f"Initializing policy {inspect.stack()[1].filename}") + logger.info(f"Initializing policy {inspect.stack()[1].filename}") initialContext = { "telemetry": { diff --git a/agents/continuum/MLSContinuumAgent.py b/agents/continuum/MLSContinuumAgent.py index 34949d6..0e892f1 100644 --- 
a/agents/continuum/MLSContinuumAgent.py +++ b/agents/continuum/MLSContinuumAgent.py @@ -85,7 +85,7 @@ async def run(self): except Exception as e: logger.error(f"Error in running tasks: {e}") - print("MLSAgent stopped.") + logger.critical("MLSAgent stopped.") async def message_queue_listener(self): """ @@ -118,11 +118,11 @@ async def message_queue_listener(self): "payload": {"name": data['name']}, }) else: - print(f"Unhandled event type: {event}") + logger.warning(f"Unhandled event type: {event}") except Exception as e: - print(f"Error processing message: {e}") - logger.error(traceback.format_exc()) + logger.error(f"Error processing message: {e}") + logger.exception(traceback.format_exc()) async def apply_propagation_policies(self): """ @@ -412,5 +412,5 @@ async def get_karmada_clusters(self): return return_object except Exception as e: - print(f"Error retrieving clusters: {e}") + logger.error(f"Error retrieving clusters: {e}") return [] \ No newline at end of file diff --git a/agents/continuum/main.py b/agents/continuum/main.py index d7bfd7d..2972ed8 100644 --- a/agents/continuum/main.py +++ b/agents/continuum/main.py @@ -49,7 +49,7 @@ async def main(): await asyncio.gather(agent_task) except Exception as e: - print(f"An error occurred: {e}") + logger.error(f"An error occurred: {e}") if __name__ == "__main__": diff --git a/agents/continuum/templates/YAMLToJSONConverter.py b/agents/continuum/templates/YAMLToJSONConverter.py index b3a18ba..9d1b66a 100644 --- a/agents/continuum/templates/YAMLToJSONConverter.py +++ b/agents/continuum/templates/YAMLToJSONConverter.py @@ -2,7 +2,7 @@ import json from jsonschema import validate, ValidationError import argparse - +from agents.mlsysops.logger_util import logger def convert_yaml_crd_to_json(yaml_file: str, json_file: str): """ @@ -53,10 +53,10 @@ def convert_yaml_crd_to_json(yaml_file: str, json_file: str): with open(json_file, 'w') as f: json.dump(full_schema, f, indent=4) - print(f"JSON schema successfully written to {json_file}") + logger.info(f"JSON schema successfully written to {json_file}") except Exception as e: - print(f"Error occurred: {e}") + logger.error(f"Error occurred: {e}") def validate_yaml_against_schema(yaml_file: str, json_schema_file: str): """ @@ -80,11 +80,11 @@ def validate_yaml_against_schema(yaml_file: str, json_schema_file: str): # Validate the YAML data against the JSON schema validate(instance=yaml_data, schema=schema) - print(f"The YAML file '{yaml_file}' is valid according to the schema '{json_schema_file}'.") + logger.info(f"The YAML file '{yaml_file}' is valid according to the schema '{json_schema_file}'.") except ValidationError as ve: - print(f"Validation Error: {ve.message}") + logger.error(f"Validation Error: {ve.message}") except Exception as e: - print(f"Error: {e}") + logger.error(f"Error: {e}") #example usage if __name__ == "__main__": @@ -101,12 +101,12 @@ def validate_yaml_against_schema(yaml_file: str, json_schema_file: str): if args.command == "convert": # Convert the YAML CRD schema to a JSON schema if not args.schema: - print("Error: You must specify an output file for the JSON schema using --schema.") + logger.warning("Error: You must specify an output file for the JSON schema using --schema.") else: convert_yaml_crd_to_json(args.input, args.schema) elif args.command == "validate": # Validate a YAML file against a JSON schema if not args.schema: - print("Error: You must specify the JSON schema file for validation using --schema.") + logger.warning("Error: You must specify the JSON schema file for validation
using --schema.") else: validate_yaml_against_schema(args.input, args.schema) diff --git a/agents/deployments/deploy.py b/agents/deployments/deploy.py index c35ede7..868731b 100644 --- a/agents/deployments/deploy.py +++ b/agents/deployments/deploy.py @@ -8,6 +8,7 @@ import os from jinja2 import Template import subprocess +from agents.mlsysops.logger_util import logger def get_method(kind, operation): @@ -130,16 +131,16 @@ def __init__(self, group=None, version=None, kubeconfig=None, context=None): # Load Kubernetes configuration for the specified environment and context if kubeconfig: - print(f"Loading kubeconfig from {kubeconfig}, context: {context}") + logger.info(f"Loading kubeconfig from {kubeconfig}, context: {context}") config.load_kube_config(config_file=kubeconfig, context=context) elif 'KUBERNETES_PORT' in os.environ: - print("Loading in-cluster Kubernetes configuration") + logger.info("Loading in-cluster Kubernetes configuration") config.load_incluster_config() else: - print(f"Loading default kubeconfig, context: {context}") + logger.info(f"Loading default kubeconfig, context: {context}") config.load_kube_config(context=context) - print(f"Kubernetes configuration loaded successfully") + logger.info(f"Kubernetes configuration loaded successfully") self.group = group self.version = version self.custom_objects_api = client.CustomObjectsApi() @@ -167,7 +168,7 @@ def parse_yaml(self,resource_file: str = None, template_variables: dict = {}) -> yaml = YAML(typ='safe') # Safe loading of YAML if resource_file is None: - print("No resource file specified.") + logger.warning("No resource file specified.") return None # Load the file and use Jinja2 for template rendering @@ -182,7 +183,7 @@ def parse_yaml(self,resource_file: str = None, template_variables: dict = {}) -> resources = list(yaml.load_all(rendered_template)) # Load all YAML resource definitions if not resources: - print(f"No resources found in file: {resource_file}") + logger.warning(f"No resources found in file: {resource_file}") return None # Define the order for sorting resources by 'kind' @@ -215,7 +216,7 @@ def create_custom_object(self, yaml_content): body=yaml_content, ) except ApiException as e: - print(f"Failed to apply kind '{yaml_content['kind']}' to Kubernetes API: {e}") + logger.critical(f"Failed to apply kind '{yaml_content['kind']}' to Kubernetes API: {e}") def update_custom_object(self, name, yaml_content): kind = yaml_content["kind"] @@ -230,7 +231,7 @@ def update_custom_object(self, name, yaml_content): body=yaml_content, ) except ApiException as e: - print(f"Failed to apply kind '{yaml_content['kind']}' to Kuberentes API: {e}") + logger.critical(f"Failed to apply kind '{yaml_content['kind']}' to Kuberentes API: {e}") def create_or_update(self,resource_yaml): @@ -240,7 +241,7 @@ def create_or_update(self,resource_yaml): kind = resource["kind"].lower() name = resource["metadata"].get("name","None") namespace = resource["metadata"].get("namespace") - print(f"Creating/Updating resource: {name} of kind {kind} in namespace {namespace} ") + logger.info(f"Creating/Updating resource: {name} of kind {kind} in namespace {namespace} ") if namespace is not None : existing_resource = get_method(kind, "read")(name,namespace=namespace) get_method(kind, "replace")(name=name, namespace=namespace, body=resource_yaml) @@ -249,13 +250,13 @@ def create_or_update(self,resource_yaml): existing_resource = get_method(kind, "read")(name) get_method(kind, "replace")(name=name,body=resource_yaml) - print(f"Updated resource: {name}") + 
logger.info(f"Updated resource: {name}") except KeyError as e: - print(f"Error parsing resource: {e}") + logger.error(f"Error parsing resource: {e}") return except client.exceptions.ApiException as e: if e.status == 404: - print(f"Resource '{name}' of kind '{kind}' not found. Creating it now. {namespace}") + logger.warning(f"Resource '{name}' of kind '{kind}' not found. Creating it now. {namespace}") if namespace is not None: if kind in ['serviceaccount','configmap','daemonset',"deployment", "service", "persistentvolumeclaim"]: get_method(kind, "create")(namespace=namespace, body=resource_yaml) @@ -265,13 +266,13 @@ def create_or_update(self,resource_yaml): else: get_method(kind, "create")(body=resource_yaml) else: - print(f"Error updating Service '{name}' in namespace '{namespace}': {e}") + logger.error(f"Error updating Service '{name}' in namespace '{namespace}': {e}") def create_configmap_from_file(self, descriptions_directory, namespace, name,suffixes=["*.yml", "*.yaml"]): directory = Path(descriptions_directory) if not directory.exists() or not directory.is_dir(): - print(f"Invalid directory: {descriptions_directory}") + logger.warning(f"Invalid directory: {descriptions_directory}") exit() # Iterate through files in the directory and filter YAML files @@ -281,7 +282,7 @@ def create_configmap_from_file(self, descriptions_directory, namespace, name,suf files_data_object = {} for single_file in file_paths: - print(f"Reading file: {single_file}") + logger.info(f"Reading file: {single_file}") with open(single_file, "r") as file: file_data = file.read() files_data_object[single_file.name] = file_data @@ -293,11 +294,11 @@ def create_configmap_from_file(self, descriptions_directory, namespace, name,suf ) try: self.core_v1_api.create_namespaced_config_map(namespace, config_map) - print(f"Created configmap {name}") + logger.info(f"Created configmap {name}") except ApiException as e: if e.status != 409: self.core_v1_api.replace_namespaced_config_map(name, namespace, config_map) - print(f"Updated configmap {name}") + logger.info(f"Updated configmap {name}") def delete(self, kind, namespace, name): @@ -305,7 +306,7 @@ def delete(self, kind, namespace, name): get_method(kind,"delete")(name=name, namespace=namespace) except client.exceptions.ApiException as e: if e.status != 404: - print(f"Error deleting Service: {e}") + logger.error(f"Error deleting Service: {e}") def get_karmada_clusters(self): """ @@ -339,7 +340,7 @@ def get_karmada_clusters(self): return return_object except Exception as e: - print(f"Error retrieving clusters: {e}") + logger.error(f"Error retrieving clusters: {e}") return [] def apply_karmada_policy(self, policy_name: str, policy_body: dict, plural: str, namespaced: bool = False, namespace: str = None): @@ -360,7 +361,7 @@ def apply_karmada_policy(self, policy_name: str, policy_body: dict, plural: str, group = "policy.karmada.io" version = "v1alpha1" - print( + logger.info( f"Applying resource '{policy_name}' with group: {group}, version: {version}, plural: {plural}, namespaced: {namespaced}" ) @@ -390,7 +391,7 @@ def apply_karmada_policy(self, policy_name: str, policy_body: dict, plural: str, resource_version = current_resource["metadata"]["resourceVersion"] policy_body["metadata"]["resourceVersion"] = resource_version - print(f"Resource '{policy_name}' exists. Updating it...") + logger.info(f"Resource '{policy_name}' exists. 
Updating it...") # Perform an update using replace if namespaced: @@ -410,12 +411,12 @@ def apply_karmada_policy(self, policy_name: str, policy_body: dict, plural: str, name=policy_name, body=policy_body ) - print(f"Resource '{policy_name}' updated successfully.") + logger.info(f"Resource '{policy_name}' updated successfully.") except ApiException as e: if e.status == 404: # If the resource doesn't exist, create a new one - print(f"Resource '{policy_name}' not found. Creating a new one...") + logger.warning(f"Resource '{policy_name}' not found. Creating a new one...") # Create the new resource if namespaced: @@ -433,12 +434,12 @@ def apply_karmada_policy(self, policy_name: str, policy_body: dict, plural: str, plural=plural, body=policy_body ) - print(f"New resource '{policy_name}' created successfully.") + logger.warning(f"New resource '{policy_name}' created successfully.") else: raise # Re-raise any non-404 exceptions except Exception as e: - print(f"Error applying resource '{policy_name}': {e}") + logger.error(f"Error applying resource '{policy_name}': {e}") def apply_mlsysops_propagation_policies(self): """ @@ -448,7 +449,7 @@ def apply_mlsysops_propagation_policies(self): # Extract cluster names where the cluster status is True (ready) cluster_names = [name for name, status in self.get_karmada_clusters().items() if status.lower() == 'true'] - print(f"Applying PropagationPolicy with cluster names: {cluster_names}") + logger.info(f"Applying PropagationPolicy with cluster names: {cluster_names}") env = Environment(loader=FileSystemLoader(searchpath="./")) # Load from "templates" dir @@ -471,7 +472,7 @@ def apply_mlsysops_propagation_policies(self): ) except Exception as e: - print(f"Error applying Cluster-Wide PropagationPolicy: {e}") + logger.error(f"Error applying Cluster-Wide PropagationPolicy: {e}") # Apply Simple PropagationPolicy try: @@ -493,10 +494,10 @@ def apply_mlsysops_propagation_policies(self): ) except Exception as e: - print(f"Error applying Simple PropagationPolicy: {e}") + logger.error(f"Error applying Simple PropagationPolicy: {e}") except Exception as e: - print(f"Error applying PropagationPolicies: {e}") + logger.error(f"Error applying PropagationPolicies: {e}") if __name__ == "__main__": diff --git a/agents/mlsysops/agent.py b/agents/mlsysops/agent.py index 0efe7c7..5527f3d 100644 --- a/agents/mlsysops/agent.py +++ b/agents/mlsysops/agent.py @@ -63,7 +63,7 @@ def __init__(self): except Exception as e: logger.error(f"Error initializing SPADE: {e}") - print("blahblahblah") + logger.info("blahblahblah") # Telemetry self.telemetry_controller = TelemetryController(self) @@ -133,15 +133,15 @@ async def message_queue_listener(self): """ Task to listen for messages from the message queue and act upon them. """ - print("Starting default Message Queue Listener...") + logger("Starting default Message Queue Listener...") while True: try: # Wait for a message from the queue (default behavior) message = await self.message_queue.get() - print(f"Received message: {message}") + logger(f"Received message: {message}") # Default handling logic (can be extended in subclasses) except Exception as e: - print(f"Error in message listener: {e}") + logger(f"Error in message listener: {e}") async def send_message_to_node(self, recipient, event, payload): """ @@ -186,7 +186,7 @@ async def run(self): Main process of the MLSAgent. 
""" # Apply MLS System description - print("In RUN of AGENT") + logger("In RUN of AGENT") try: if self.state.configuration.continuum_layer == 'cluster': logger.debug(f"Applying system description") @@ -210,7 +210,7 @@ async def run(self): await self.telemetry_controller.initialize() try: - print("In spade_instance_start") + logger("In spade_instance_start") await self.spade_instance.start(auto_register=True) except Exception as e: logger.error(f"Error starting SPADE: {traceback.format_exc()}") diff --git a/agents/mlsysops/controllers/application.py b/agents/mlsysops/controllers/application.py index 76532a4..34f39ce 100644 --- a/agents/mlsysops/controllers/application.py +++ b/agents/mlsysops/controllers/application.py @@ -122,7 +122,7 @@ async def run(self): """ while True: for app_id, app_object in MLSState.applications.items(): - print(f'Application {app_id}') + logger.debug(f'Application {app_id}') # Check periodically (adjust the sleep interval as needed) await asyncio.sleep(10) \ No newline at end of file diff --git a/agents/mlsysops/controllers/libs/otel_pods.py b/agents/mlsysops/controllers/libs/otel_pods.py index 8524761..bf78d68 100644 --- a/agents/mlsysops/controllers/libs/otel_pods.py +++ b/agents/mlsysops/controllers/libs/otel_pods.py @@ -461,7 +461,7 @@ async def create_svc(name_prefix=None,svc_manifest=None,selector=None): resp = core_api.read_namespaced_service( name=svc_manifest['metadata']['name'], namespace=namespace) - #print(resp) + #logger(resp) except ApiException as exc: if exc.status != 404: logger.error('Unknown error reading service: %s', exc) @@ -472,13 +472,13 @@ async def create_svc(name_prefix=None,svc_manifest=None,selector=None): resp = core_api.delete_namespaced_service( name=svc_manifest['metadata']['name'], namespace=namespace) - #print(resp) + #logger(resp) except ApiException as exc: logger.error('Failed to delete service: %s', exc) try: svc_obj = core_api.create_namespaced_service(body=svc_manifest, namespace=namespace) - #print(svc_obj) + #logger(svc_obj) return svc_obj except ApiException as exc: logger.error('Failed to create service: %s', exc) diff --git a/agents/mlsysops/spade/behaviors/APIPingBehaviour.py b/agents/mlsysops/spade/behaviors/APIPingBehaviour.py index 31e7b8c..a45ca13 100644 --- a/agents/mlsysops/spade/behaviors/APIPingBehaviour.py +++ b/agents/mlsysops/spade/behaviors/APIPingBehaviour.py @@ -33,17 +33,17 @@ async def run(self): # wait for a message for 10 seconds msg = await self.receive(timeout=1) if msg: - #print(str(msg._sender).split("/")[0]) - #print(msg.to) + #logger(str(msg._sender).split("/")[0]) + #logger(msg.to) logger.debug("Ping received with content: {}".format(msg.body)) # Create a response message resp = Message(to=str(msg._sender).split("/")[0]) # Replace with the actual recipient JID resp.set_metadata("performative", "ping") # Set the "inform" FIPA performative resp.body = "Response From " + str(msg.to) # Set the message content - #print(resp.body) + #logger(resp.body) # Send the response message await self.send(resp) - #print("Callback message sent!\n") + #logger("Callback message sent!\n") else: await asyncio.sleep(5) diff --git a/agents/mlsysops/spade/behaviors/CheckInactiveClustersBehaviour.py b/agents/mlsysops/spade/behaviors/CheckInactiveClustersBehaviour.py index 735f320..ed3d246 100644 --- a/agents/mlsysops/spade/behaviors/CheckInactiveClustersBehaviour.py +++ b/agents/mlsysops/spade/behaviors/CheckInactiveClustersBehaviour.py @@ -38,4 +38,4 @@ async def run(self): last_seen_time = 
datetime.fromisoformat(last_seen) if now - last_seen_time > self.timeout: self.r.remove_key(self.r.redis_dict, node_jid) - print(f"Node {node_jid} removed due to inactivity.") + logger.debug(f"Node {node_jid} removed due to inactivity.") diff --git a/agents/mlsysops/spade/behaviors/Check_ml_deployment_Behaviour.py b/agents/mlsysops/spade/behaviors/Check_ml_deployment_Behaviour.py index 9345c59..35d55a6 100644 --- a/agents/mlsysops/spade/behaviors/Check_ml_deployment_Behaviour.py +++ b/agents/mlsysops/spade/behaviors/Check_ml_deployment_Behaviour.py @@ -95,18 +95,18 @@ def get_node_ip(host, api_client): # internal_ip = None # external_ip = None # addresses = node.status.addresses -# print('Addresses ' + addresses) +# logger('Addresses ' + addresses) # for address in addresses: # if address.type == "ExternalIP": # external_ip = address.address -# print(f"Node: {node_name}, External IP: {external_ip}") +# logger(f"Node: {node_name}, External IP: {external_ip}") # elif address.type == "InternalIP": # internal_ip = address.address -# print(f"Node: {node_name}, Internal IP: {internal_ip}") +# logger(f"Node: {node_name}, Internal IP: {internal_ip}") # if external_ip == None: -# print('External IP not found for node that should be accessible externally.') +# logger('External IP not found for node that should be accessible externally.') # if internal_ip == None: -# print('Internal IP not found for node that should be accessible externally.') +# logger('Internal IP not found for node that should be accessible externally.') # else: # node_ip = internal_ip # else: @@ -204,12 +204,12 @@ async def run(self): # namespace=config.NAMESPACE) # except ApiException as exc: # if exc.status != 404: - # print('Unknown error reading service: ' + exc)n + # logger('Unknown error reading service: ' + exc)n # return None # # # Retrieve svc endpoint info # if svc_obj is None: - # print('Failed to read svc with name ' + self.comp_name) + # logger('Failed to read svc with name ' + self.comp_name) # # Add handling # # # Retrieve the assigned VIP:port @@ -231,7 +231,7 @@ async def run(self): # if global_endpoint_port and node_ip: # info['global_endpoint'] = node_ip + ':' + global_endpoint_port # - # print('Going to push to redis_conf endpoint_queue the value ' + str(info)) + # logger('Going to push to redis_conf endpoint_queue the value ' + str(info)) # # NOTE: PLACEHOLDER FOR REDIS - YOU CAN CHANGE THIS WITH ANOTHER TYPE OF COMMUNICATION # self.r.update_dict_value('endpoint_hash', self.model_id, str(info)) # diff --git a/agents/mlsysops/spade/behaviors/ML_process_Behaviour.py b/agents/mlsysops/spade/behaviors/ML_process_Behaviour.py index f33ab4a..95a534a 100644 --- a/agents/mlsysops/spade/behaviors/ML_process_Behaviour.py +++ b/agents/mlsysops/spade/behaviors/ML_process_Behaviour.py @@ -94,11 +94,11 @@ async def run(self): q_info = self.r.pop(self.r.ml_q) q_info = q_info.replace("'", '"') - print(q_info) + logger.info(q_info) data_queue = json.loads(q_info) if 'MLSysOpsApplication' not in data_queue: # probably it is removal - print(f"fffff {data_queue.keys()}") + logger.debug(f"fffff {data_queue.keys()}") for key in data_queue.keys(): model_id = key else: @@ -111,7 +111,7 @@ async def run(self): self.r.update_dict_value("ml_location", model_id, cluster_id) except KeyError: cluster_id = self.r.get_dict_value("ml_location", model_id) - print("CLUSTER ID " + str(cluster_id)) + logger.info("CLUSTER ID " + str(cluster_id)) group = "mlsysops.eu" version = "v1" @@ -140,9 +140,9 @@ async def run(self): 
self.r.remove_key("endpoint_hash", model_id) except ApiException as e: if e.status == 404: - print(f"Custom Resource '{name}' not found. Skipping deletion.") + logger.debug(f"Custom Resource '{name}' not found. Skipping deletion.") else: - print(f"Error deleting Custom Resource '{name}': {e}") + logger.debug(f"Error deleting Custom Resource '{name}': {e}") raise else: try: diff --git a/agents/mlsysops/spade/behaviors/ManagementModeBehaviour.py b/agents/mlsysops/spade/behaviors/ManagementModeBehaviour.py index 215d42c..2a0399a 100644 --- a/agents/mlsysops/spade/behaviors/ManagementModeBehaviour.py +++ b/agents/mlsysops/spade/behaviors/ManagementModeBehaviour.py @@ -33,10 +33,10 @@ async def run(self): # wait for a message for 10 seconds msg = await self.receive(timeout=10) if msg: - print(str(msg._sender).split("/")[0]) - print(msg.to) - print("Ping received with content: {}".format(msg.body)) + logger.debug(str(msg._sender).split("/")[0]) + logger.debug(msg.to) + logger.debug("Ping received with content: {}".format(msg.body)) else: - print("Did not received any message after 10 seconds") + logger.debug("Did not received any message after 10 seconds") await asyncio.sleep(10) \ No newline at end of file diff --git a/agents/mlsysops/spade/behaviors/MessageReceivingBehavior.py b/agents/mlsysops/spade/behaviors/MessageReceivingBehavior.py index 901810d..cb726c0 100644 --- a/agents/mlsysops/spade/behaviors/MessageReceivingBehavior.py +++ b/agents/mlsysops/spade/behaviors/MessageReceivingBehavior.py @@ -117,6 +117,6 @@ async def run(self): } await self.message_queue.put(payload) except Exception: - print("Exception ;-)") + logger.error("Exception ;-)") else: logger.debug("Did not received any message after 10 seconds") diff --git a/agents/mlsysops/spade/behaviors/SubscribeBehavior.py b/agents/mlsysops/spade/behaviors/SubscribeBehavior.py index c4638ea..4f5d6d5 100644 --- a/agents/mlsysops/spade/behaviors/SubscribeBehavior.py +++ b/agents/mlsysops/spade/behaviors/SubscribeBehavior.py @@ -43,7 +43,7 @@ async def run(self): logger.debug(self.agent_to_subscribe) logger.debug(msg.thread) await self.send(msg) - # print("Subscription sent!\n") + # logger("Subscription sent!\n") response = await self.receive(timeout=10) # Wait for a response diff --git a/agents/mlsysops/spade/mls_spade.py b/agents/mlsysops/spade/mls_spade.py index acff375..2485ce7 100644 --- a/agents/mlsysops/spade/mls_spade.py +++ b/agents/mlsysops/spade/mls_spade.py @@ -42,10 +42,10 @@ class MLSSpade(Agent): def __init__(self, state: MLSState, message_queue: asyncio.Queue): - print(state.configuration) - print(state) + logger.debug(state.configuration) + logger.debug(state) super().__init__(state.configuration.n_jid, state.configuration.n_pass) - print("AFTER INIT") + logger.debug("AFTER INIT") self.is_subscribed = None self.cluster = state.configuration.cluster @@ -53,9 +53,9 @@ def __init__(self, state: MLSState, message_queue: asyncio.Queue): self.snapshot_queue = Queue() self.message_queue = message_queue self.redis = RedisManager() - print("BEFORE redis connect") + logger.debug("BEFORE redis connect") self.redis.connect() - print("AFTER redis connect") + logger.debug("AFTER redis connect") self.state = state self.behaviours_config = state.configuration.behaviours self.behaviour_classes = { @@ -73,7 +73,7 @@ def __init__(self, state: MLSState, message_queue: asyncio.Queue): "FailoverBehaviour": FailoverBehavior, "Subscribe": Subscribe } - print("AFTER INIT END") + logger.debug("AFTER INIT END") async def send_message(self, 
recipient: str, event: str, payload: dict): behavior = MessageSendingBehavior(recipient, event, payload) diff --git a/agents/mlsysops/spade/redis_mgt.py b/agents/mlsysops/spade/redis_mgt.py index cb7c1b2..8eb4302 100644 --- a/agents/mlsysops/spade/redis_mgt.py +++ b/agents/mlsysops/spade/redis_mgt.py @@ -65,20 +65,20 @@ def connect(self): def push(self, q_name, value): if self.redis_conn: self.redis_conn.rpush(q_name, value) - print(f"'{value}' added to the queue '{q_name}'.") + logger.debug(f"'{value}' added to the queue '{q_name}'.") else: - print("Redis connection not established.") + logger.debug("Redis connection not established.") def pop(self, q_name): if self.redis_conn: value = self.redis_conn.lpop(q_name) if value: - #print(f"'{value.decode()}' removed from the queue '{q_name}'.") + #logger(f"'{value.decode()}' removed from the queue '{q_name}'.") logger.debug(f" Info removed from '{q_name}'.") return value.decode() - print(f"The queue '{q_name}' is empty.") + logger.debug(f"The queue '{q_name}' is empty.") else: - print("Redis connection not established.") + logger.debug("Redis connection not established.") def is_empty(self, q_name): return self.redis_conn.llen(q_name) == 0 if self.redis_conn else True @@ -91,39 +91,39 @@ def empty_queue(self, q_name): def pub_ping(self, message): if self.redis_conn: self.redis_conn.publish(self.channel_name, message) - print(f"'{message}' published to the channel '{self.channel_name}'.") + logger.debug(f"'{message}' published to the channel '{self.channel_name}'.") else: - print("Redis connection not established.") + logger.debug("Redis connection not established.") def subs_ping(self): if self.redis_conn: pubsub = self.redis_conn.pubsub() pubsub.subscribe(self.channel_name) - print(f"Subscribed to the channel '{self.channel_name}'.") + logger.debug(f"Subscribed to the channel '{self.channel_name}'.") for message in pubsub.listen(): if message and message['type'] == 'message': - print(f"Message received: {message['data'].decode()}") + logger.debug(f"Message received: {message['data'].decode()}") else: - print("Redis connection not established.") + logger.info("Redis connection not established.") # --- Dictionary (Hash Map) Methods --- def update_dict_value(self, dict_name, key, value): if self.redis_conn: self.redis_conn.hset(dict_name, key, value) - print(f"Value for key '{key}' updated to '{value}' in dictionary '{dict_name}'.") + logger.debug(f"Value for key '{key}' updated to '{value}' in dictionary '{dict_name}'.") else: - print("Redis connection not established.") + logger.debug("Redis connection not established.") def get_dict_value(self, dict_name, key): if self.redis_conn: value = self.redis_conn.hget(dict_name, key) return value.decode() if value else None - print("Redis connection not established.") + logger.debug("Redis connection not established.") def get_dict(self, dict_name): if self.redis_conn: return {k.decode(): v.decode() for k, v in self.redis_conn.hgetall(dict_name).items()} - print("Redis connection not established.") + logger.debug("Redis connection not established.") def remove_key(self, dict_name, key): return bool(self.redis_conn.hdel(dict_name, key)) if self.redis_conn else False diff --git a/agents/node/MLSNodeAgent.py b/agents/node/MLSNodeAgent.py index 90fc557..2408c4c 100644 --- a/agents/node/MLSNodeAgent.py +++ b/agents/node/MLSNodeAgent.py @@ -25,7 +25,7 @@ class MLSNodeAgent(MLSAgent): def __init__(self): # Initialize base MLS agent class - print("In INIT OF NODE AGENT") + logger.debug("In INIT OF NODE 
AGENT") super().__init__() # { 'app_name' : { "components" : [component_name] } } @@ -60,7 +60,7 @@ async def run(self): except Exception as e: logger.error(f"Error in running tasks: {e}") - print("MLSAgent stopped.") + logger("MLSAgent stopped.") async def message_queue_listener(self): """ @@ -193,4 +193,4 @@ async def fluidity_proxy_message_listener(self): except Exception as e: logger.error(f"fluidityproxy_message_listener: Error processing msg: {e}") await asyncio.sleep(1) - print(f"MLSAGENT:::: stopping fluidity message listener.... ") + logger(f"MLSAGENT:::: stopping fluidity message listener.... ") diff --git a/agents/node/mechanisms/CPUFrequencyConfigurator.py b/agents/node/mechanisms/CPUFrequencyConfigurator.py index ae1df35..fc36bb5 100644 --- a/agents/node/mechanisms/CPUFrequencyConfigurator.py +++ b/agents/node/mechanisms/CPUFrequencyConfigurator.py @@ -1,4 +1,5 @@ from cpufreq import cpuFreq +from agents.mlsysops.logger_util import logger def initialize(inbound_queue, outbound_queue,agent_state=None): @@ -48,9 +49,9 @@ async def apply(value: dict[str, any]) -> bool: # Set frequency for a specific CPU cpufreq.set_governor("userspace", cpu=value['cpu']) cpufreq.set_frequencies(int(value['frequency']), value['cpu']) - print(f"Frequency successfully set {value}") + logger.info(f"Frequency successfully set {value}") except Exception as e: - print(f"Error setting CPU frequency: {e}") + logger.error(f"Error setting CPU frequency: {e}") finally: reset_to_governor() @@ -101,9 +102,9 @@ def reset_to_governor(governor: str = "ondemand"): cpufreq = cpuFreq() try: cpufreq.set_governor(governor) - print(f"Successfully reset CPU governor to '{governor}'") + logger.info(f"Successfully reset CPU governor to '{governor}'") except Exception as e: - print(f"Error setting governor: {e}") + logger.error(f"Error setting governor: {e}") def set_governor(governor: str, cpu: str = "all"): """ @@ -118,9 +119,9 @@ def set_governor(governor: str, cpu: str = "all"): cpufreq.set_governor(governor) else: cpufreq.set_governor(cpu=cpu, governor=governor) - print(f"Successfully set governor to '{governor}' for {cpu}") + logger.info(f"Successfully set governor to '{governor}' for {cpu}") except Exception as e: - print(f"Error setting governor: {e}") + logger.error(f"Error setting governor: {e}") def set_to_min(cpu: str = "all"): """ @@ -128,7 +129,7 @@ def set_to_min(cpu: str = "all"): """ frequencies = get_cpu_available_frequencies() min_freq = min(frequencies) - print(f"Setting {cpu} to minimum frequency: {min_freq} kHz") + logger.info(f"Setting {cpu} to minimum frequency: {min_freq} kHz") if cpu == "all": set_governor("userspace", cpu="all") cpu.set_frequencies(min_freq) @@ -137,7 +138,7 @@ def set_to_min(cpu: str = "all"): set_governor("userspace", cpu=cpu) cpu.set_frequencies(min_freq, cpu) - print(f"Set {cpu} to minimum frequency: {min_freq} kHz") + logger.info(f"Set {cpu} to minimum frequency: {min_freq} kHz") def set_to_max(cpu: str = "all"): """ @@ -153,4 +154,4 @@ def set_to_max(cpu: str = "all"): set_governor("userspace", cpu=cpu) cpu.set_frequencies(max_freq, cpu) - print(f"Set {cpu} to maximum frequency: {max_freq} kHz") + logger.info(f"Set {cpu} to maximum frequency: {max_freq} kHz") diff --git a/agents/node/mechanisms/fluidity_proxy.py b/agents/node/mechanisms/fluidity_proxy.py index 9f60773..2d5345b 100644 --- a/agents/node/mechanisms/fluidity_proxy.py +++ b/agents/node/mechanisms/fluidity_proxy.py @@ -16,6 +16,7 @@ from mlsysops.events import MessageEvents +from mlsysops.logger_util import 
logger import asyncio queues = {"inbound": None, "outbound": None} @@ -42,7 +43,7 @@ async def fluidity_proxy_loop(): case "NETWORK_REDIRECT": continue # TODO case _: - print("Unknown event in fluidity proxy") + logger.warning("Unknown event in fluidity proxy") pass return False # async @@ -58,7 +59,7 @@ def initialize(inbound_queue=None, outbound_queue=None, agent_state=None): asyncio.create_task(fluidity_proxy_loop()) async def apply(plan): - print("--------------------------Applying fluidity plan", plan) + logger.info("--------------------------Applying fluidity plan", plan) global node_name # This mechanism uses the messaging interface to send to cluster fluidity await queues['outbound'].put({ diff --git a/agents/tests/cluster_agent/cluster_agent_test.py b/agents/tests/cluster_agent/cluster_agent_test.py index d4b3199..88434cb 100644 --- a/agents/tests/cluster_agent/cluster_agent_test.py +++ b/agents/tests/cluster_agent/cluster_agent_test.py @@ -3,6 +3,7 @@ from subprocess import Popen, PIPE from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler +from agents.mlsysops.logger_util import logger import re import json import pytest @@ -54,7 +55,7 @@ def run_agent(): Runs the cluster/main.py script as a subprocess and ensures the agent starts. Captures stdout/stderr and writes them to both test.log and the console for debugging. """ - print("Starting agent process...") + logger.info("Starting agent process...") env = os.environ.copy() env.update(ENV_VARS) @@ -92,7 +93,7 @@ def parse_test_logs(): parsed_logs = {} # Dictionary to store the parsed results consistent_planuuid = None # To ensure all stages use the same planuid - print("Parsing agent.log for TEST messages...") + logger.info("Parsing agent.log for TEST messages...") try: with open(log_file_path, "r") as logfile: logs = logfile.readlines() @@ -139,29 +140,29 @@ def parse_test_logs(): parsed_logs[path_filename][test_number] = message_data # Write the resulting parsed_logs dictionary to a file as JSON - print(f"Writing parsed log data to {output_file_path}...") + logger.info(f"Writing parsed log data to {output_file_path}...") with open(output_file_path, "w") as outfile: json.dump(parsed_logs, outfile, indent=4) - print("Log parsing completed. Parsed data written to JSON.") + logger.info("Log parsing completed. 
Parsed data written to JSON.") return parsed_logs except FileNotFoundError: - print(f"Error: Log file {log_file_path} not found.") + logger.error(f"Error: Log file {log_file_path} not found.") except Exception as e: - print(f"An error occurred while processing the logs: {e}") + logger.error(f"An error occurred while processing the logs: {e}") def rename_file(old_filename, new_filename): try: # Rename the file os.rename(old_filename, new_filename) - print(f"File renamed from {old_filename} to {new_filename}.") + logger.info(f"File renamed from {old_filename} to {new_filename}.") except FileNotFoundError: - print(f"Error: {old_filename} not found.") + logger.error(f"Error: {old_filename} not found.") except PermissionError: - print("Error: Permission denied.") + logger.error("Error: Permission denied.") except Exception as e: - print(f"An unexpected error occurred: {e}") + logger.error(f"An unexpected error occurred: {e}") def assert_payloads(subtests, expected_logs, parsed_logs): # Step 3: Validate each stage using subtests @@ -192,7 +193,7 @@ def assert_payloads(subtests, expected_logs, parsed_logs): f"Inconsistent `planuid` values found across stages: {all_planuids}" ) - print("All parsed logs match expected statuses and structure!") + logger.info("All parsed logs match expected statuses and structure!") @pytest.mark.timeout(40) # Fail the test if it exceeds 40 seconds def test_cluster_relocate_plan(subtests): @@ -205,7 +206,7 @@ def test_cluster_relocate_plan(subtests): subtests (SubTests): A SubTests instance for managing multiple subtest contexts within a single test function. """ - print("Setting up environment variables...") + logger.info("Setting up environment variables...") for key, value in ENV_VARS.items(): os.environ[key] = value @@ -224,11 +225,11 @@ def test_cluster_relocate_plan(subtests): try: observer.start() - print(f"Watching marker in {AGENT_LOG_PATH}: {EXPECTED_LOG_MARKER}") + logger.debug(f"Watching marker in {AGENT_LOG_PATH}: {EXPECTED_LOG_MARKER}") if not stop_event.wait(timeout=30): - print("Timeout: Marker not found in logs.") + logger.debug("Timeout: Marker not found in logs.") else: - print(f"Marker found: {EXPECTED_LOG_MARKER}") + logger.debug(f"Marker found: {EXPECTED_LOG_MARKER}") agent_process.terminate() finally: @@ -290,7 +291,7 @@ def test_cluster_relocate_plan(subtests): # subtests (SubTests): A SubTests instance for managing multiple subtest contexts within # a single test function. 
# """ -# print("Setting up environment variables...") +# logger("Setting up environment variables...") # for key, value in ENV_VARS.items(): # os.environ[key] = value # @@ -309,11 +310,11 @@ def test_cluster_relocate_plan(subtests): # # try: # observer.start() -# print(f"Watching marker in {AGENT_LOG_PATH}: {EXPECTED_LOG_MARKER}") +# logger(f"Watching marker in {AGENT_LOG_PATH}: {EXPECTED_LOG_MARKER}") # if not stop_event.wait(timeout=30): -# print("Timeout: Marker not found in logs.") +# logger("Timeout: Marker not found in logs.") # else: -# print(f"Marker found: {EXPECTED_LOG_MARKER}") +# logger(f"Marker found: {EXPECTED_LOG_MARKER}") # agent_process.terminate() # # finally: diff --git a/agents/tests/create_descriptions.py b/agents/tests/create_descriptions.py index 907b640..53da94c 100644 --- a/agents/tests/create_descriptions.py +++ b/agents/tests/create_descriptions.py @@ -1,5 +1,6 @@ import yaml from jinja2 import Template +from agents.mlsysops.logger_util import logger def render_template(template_file, context): """ @@ -39,7 +40,7 @@ def create_cluster_yaml(input_file, cluster_name): cluster_yaml = render_template("formal_descriptions/user_provided/cluster.yaml.j2", cluster_context) with open(cluster_yaml_filename, 'w') as output_file: output_file.write(cluster_yaml) - print(f"Cluster YAML written to {cluster_yaml_filename}") + logger.info(f"Cluster YAML written to {cluster_yaml_filename}") def create_worker_node_yaml(input_file, cluster_name): @@ -82,7 +83,7 @@ def create_worker_node_yaml(input_file, cluster_name): worker_yaml_filename = f"{worker_name}.yaml" with open(worker_yaml_filename, 'w') as output_file: output_file.write(worker_yaml) - print(f"Worker YAML written to {worker_yaml_filename}") + logger.info(f"Worker YAML written to {worker_yaml_filename}") def create_app_yaml(input_file): @@ -111,7 +112,7 @@ def create_app_yaml(input_file): app_yaml_filename = "app-description.yaml" with open(app_yaml_filename, 'w') as output_file: output_file.write(app_yaml) - print(f"Application YAML written to {app_yaml_filename}") + logger.info(f"Application YAML written to {app_yaml_filename}") def create_continuum_yaml(input_file): @@ -150,7 +151,7 @@ def create_continuum_yaml(input_file): continuum_yaml_content = render_template("formal_descriptions/user_provided/continuum.yaml.j2", continuum_context) with open(continuum_yaml_filename, 'w') as output_file: output_file.write(continuum_yaml_content) - print(f"Continuum YAML written to {continuum_yaml_filename}") + logger.info(f"Continuum YAML written to {continuum_yaml_filename}") def main(): @@ -161,23 +162,23 @@ def main(): for cluster_name in inventory['all']['children']: try: - print(f"Processing cluster: {cluster_name}") + logger.debug(f"Processing cluster: {cluster_name}") create_cluster_yaml(input_file, cluster_name) create_worker_node_yaml(input_file, cluster_name) except ValueError as e: - print(f"Skipping cluster '{cluster_name}': {e}") + logger.error(f"Skipping cluster '{cluster_name}': {e}") # Generate application-level YAML try: create_app_yaml(input_file) except ValueError as e: - print(f"Skipping application YAML generation: {e}") + logger.error(f"Skipping application YAML generation: {e}") # Generate continuum-level YAML try: create_continuum_yaml(input_file) except ValueError as e: - print(f"Skipping continuum YAML generation: {e}") + logger.error(f"Skipping continuum YAML generation: {e}") if __name__ == "__main__": diff --git a/mlconnector/drift_app/app.py b/mlconnector/drift_app/app.py index 46b036a..260c5bc 100644 
--- a/mlconnector/drift_app/app.py +++ b/mlconnector/drift_app/app.py @@ -10,6 +10,7 @@ from apscheduler.schedulers.background import BackgroundScheduler import utilities as utl import json +from agents.mlsysops.logger_util import logger load_dotenv(override=True) @@ -139,7 +140,7 @@ def update_features(selected_model): q = "SELECT DISTINCT feature FROM drift_metrics WHERE modelid = %s ORDER BY feature" features = pd.read_sql(q, engine, params=(selected_model,))["feature"].tolist() except Exception as e: - print("Error reading features:", e) + logger.exception(f"Error reading features: {e}") return [], None, "❌ Error loading features", True if not features: @@ -170,7 +171,7 @@ def update_graph(selected_model, selected_feature): """ df = pd.read_sql(q, engine, params=(selected_model, selected_feature)) except Exception as e: - print("Error loading drift data:", e) + logger.error(f"Error loading drift data: {e}") return {}, html.Span("❌ Error loading data.", className="text-danger fw-bold") if df.empty: @@ -216,7 +217,7 @@ def update_table(selected_model): df["timestamp"] = df["timestamp"].astype(str) return df.to_dict("records") except Exception as e: - print("Error loading table data:", e) + logger.error(f"Error loading table data: {e}") return [] def drif_job(): diff --git a/mlconnector/drift_app/manage_s3.py b/mlconnector/drift_app/manage_s3.py index 2479314..551cc04 100644 --- a/mlconnector/drift_app/manage_s3.py +++ b/mlconnector/drift_app/manage_s3.py @@ -4,7 +4,7 @@ from boto3.exceptions import S3UploadFailedError from dotenv import load_dotenv import os -import logging +from agents.mlsysops.logger_util import logger load_dotenv(verbose=True, override=True) @@ -29,18 +29,18 @@ def _ensure_bucket_exists(self): """ try: self.s3_client.head_bucket(Bucket=self.bucket_name) - print(f"Bucket '{self.bucket_name}' already exists.") + logger.info(f"Bucket '{self.bucket_name}' already exists.") except ClientError as e: # If a 404 error is thrown, then the bucket does not exist.
error_code = int(e.response['Error']['Code']) if error_code == 404: try: self.s3_client.create_bucket(Bucket=self.bucket_name) - print(f"Bucket '{self.bucket_name}' created successfully.") + logger.info(f"Bucket '{self.bucket_name}' created successfully.") except ClientError as ce: - print("Error creating bucket:", ce) + logger.error("Error creating bucket:", ce) else: - print("Error checking bucket:", e) + logger.error("Error checking bucket:", e) def upload_file(self, file_name, object_name=None): """Upload a file to an S3 bucket @@ -59,7 +59,7 @@ def upload_file(self, file_name, object_name=None): data = f.read() self.s3_client.put_object(Bucket=self.bucket_name, Key=object_name, Body=data, ContentLength=len(data)) except ClientError as e: - logging.error(e) + logger.error(e) return False return True @@ -76,9 +76,9 @@ def download_file(self, object_name, download_path): body = response['Body'].read() with open(download_path, 'wb') as f: f.write(body) - print(f"File '{object_name}' downloaded from bucket '{self.bucket_name}' to '{download_path}'.") + logger.info(f"File '{object_name}' downloaded from bucket '{self.bucket_name}' to '{download_path}'.") except ClientError as e: - print("Error downloading file:", e) + logger.error("Error downloading file:", e) def delete_file(self, object_name): """ @@ -88,9 +88,9 @@ def delete_file(self, object_name): """ try: self.s3_client.delete_object(Bucket=self.bucket_name, Key=object_name) - print(f"File '{object_name}' deleted from bucket '{self.bucket_name}'.") + logger.info(f"File '{object_name}' deleted from bucket '{self.bucket_name}'.") except ClientError as e: - print("Error deleting file:", e) + logger.error("Error deleting file:", e) def list_files(self): """ @@ -100,15 +100,15 @@ def list_files(self): response = self.s3_client.list_objects_v2(Bucket=self.bucket_name) if 'Contents' in response: files = [obj['Key'] for obj in response['Contents']] - print("Files in bucket:") + logger.debug("Files in bucket:") for f in files: - print(" -", f) + logger.debug(" -", f) return files else: - print("No files found in bucket.") + logger.debug("No files found in bucket.") return [] except ClientError as e: - print("Error listing files:", e) + logger.error("Error listing files:", e) return [] # Example usage: diff --git a/mlconnector/src/db/db_setup.py b/mlconnector/src/db/db_setup.py index 38f0465..e47b17c 100644 --- a/mlconnector/src/db/db_setup.py +++ b/mlconnector/src/db/db_setup.py @@ -6,7 +6,7 @@ from sqlalchemy.orm import sessionmaker from dotenv import load_dotenv import os - +from agents.mlsysops.logger_util import logger load_dotenv(verbose=True, override=True) @@ -43,5 +43,5 @@ async def get_db(): await db.commit() # Commit transaction except Exception as e: await db.rollback() # Rollback in case of error - print(f"Error in database transaction: {e}") + logger.error(f"Error in database transaction: {e}") raise diff --git a/mlconnector/src/db/redis_setup.py b/mlconnector/src/db/redis_setup.py index 756848e..fe2a073 100644 --- a/mlconnector/src/db/redis_setup.py +++ b/mlconnector/src/db/redis_setup.py @@ -6,7 +6,7 @@ import os import asyncio_redis #import redis - +from agents.mlsysops.logger_util import logger load_dotenv(verbose=True, override=True) @@ -23,10 +23,10 @@ async def create_redis_connection(): # Ping the Redis server #ping = await redis_client.ping() # Awaiting the ping if await redis_client.ping(): - print(f"Successfully connected to Redis at {os.getenv('REDIS_HOST')}.") + logger.info(f"Successfully connected to Redis at 
{os.getenv('REDIS_HOST')}.") return redis_client else: raise Exception("Could not connect to Redis.") except Exception as e: - print(f"Redis connection error: {e}") + logger.critical(f"Redis connection error: {e}") raise \ No newline at end of file diff --git a/mlconnector/src/migrations/env.py b/mlconnector/src/migrations/env.py index 2272489..8f883b9 100644 --- a/mlconnector/src/migrations/env.py +++ b/mlconnector/src/migrations/env.py @@ -43,7 +43,7 @@ def run_migrations_offline() -> None: """ url = config.get_main_option("sqlalchemy.url") - # print(url) + # logger(url) context.configure( url=url, diff --git a/mlconnector/src/utils/api/generate_dockerfile.py b/mlconnector/src/utils/api/generate_dockerfile.py index 2da4b3b..5e4babb 100644 --- a/mlconnector/src/utils/api/generate_dockerfile.py +++ b/mlconnector/src/utils/api/generate_dockerfile.py @@ -8,6 +8,7 @@ from io import StringIO from utils.manage_s3 import S3Manager +from agents.mlsysops.logger_util import logger load_dotenv(verbose=True, override=True) @@ -83,13 +84,13 @@ def get_single_explanation(model_id, data): json_response = response.json() return json_response except Exception as json_err: - print("Error parsing JSON response:", json_err) + logger("Error parsing JSON response:", json_err) return None else: - print(f"Error: Received status code {{response.status_code}}. Response content: {{response.text}}") + logger(f"Error: Received status code {{response.status_code}}. Response content: {{response.text}}") return None except Exception as err: - print("Request error occurred:", err) + logger("Request error occurred:", err) return None @@ -106,7 +107,7 @@ async def make_prediction(request: DynamicSchema): current_timestamp = datetime.now(timezone.utc).isoformat(timespec='milliseconds').replace('+00:00', 'Z') try: loaded_model = joblib.load("{model}") - print("Model loaded successfully!") + logger("Model loaded successfully!") if data_source == 0: data_dict = request.data.dict() df = pd.DataFrame([data_dict]) @@ -120,11 +121,11 @@ async def make_prediction(request: DynamicSchema): "timestamp": current_timestamp }} response = requests.post(url, headers=headers, json=data) - print(f"Status Code: {{response.status_code}}") + logger(f"Status Code: {{response.status_code}}") try: - print("Response JSON:", response.json()) + logger("Response JSON:", response.json()) except ValueError: - print("No JSON response returned.") + logger("No JSON response returned.") #if request.explanation: # explanation_res = get_single_explanation(model_id,data_dict) # if explanation_res: @@ -153,7 +154,7 @@ def prepare_model_artifact(s3_manager: S3Manager, model_name: str, download_dir: # download from S3 s3_manager.download_file(object_name=model_name, download_path=local_path) - print(f"Model downloaded to {local_path}") + logger.info(f"Model downloaded to {local_path}") return local_path def merge_requirements_from_dir(req_dir: str) -> list[str]: @@ -228,7 +229,7 @@ def generate_dockerfile(model_id): """ with open("/code/utils/api/Dockerfile", "w") as file: file.write(dockerfile_content) - print("Dockerfile generated successfully!") + logger.info("Dockerfile generated successfully!") @@ -249,30 +250,30 @@ def build_and_push_image(model, registry_url, image_name, registry_username, reg with open("/code/utils/api/main.py", "w") as f: f.write(generated_code) - print("Python file 'main.py' has been created with the provided parameters.") + logger.info("Python file 'main.py' has been created with the provided parameters.") generate_dockerfile(model_id) 
client = docker.from_env() try: - print("Building Docker image...") + logger.debug("Building Docker image...") image, build_logs = client.images.build(path="/code/utils/api/", tag=image_name) for log in build_logs: - print(log.get("stream", "").strip()) + logger.debug(log.get("stream", "").strip()) except docker.errors.BuildError as e: - print(f"Error building image: {e}") + logger.error(f"Error building image: {e}") return - print("Pushing Docker image...") + logger.info("Pushing Docker image...") #registry_url, image_tag = image_name.split("/", 1) client.login(username=registry_username, password=registry_password, registry=registry_url) try: push_logs = client.images.push(image_name, stream=True, decode=True) for log in push_logs: - print(log) + logger.debug(log) except docker.errors.APIError as e: - print(f"Error pushing image: {e}") + logger.error(f"Error pushing image: {e}") """def generate_json(deployment_id: str, image: str, placement, port: int = 8000): diff --git a/mlconnector/src/utils/generate_train.py b/mlconnector/src/utils/generate_train.py index 9e81843..3f91679 100644 --- a/mlconnector/src/utils/generate_train.py +++ b/mlconnector/src/utils/generate_train.py @@ -2,6 +2,7 @@ import docker import yaml import requests +from agents.mlsysops.logger_util import logger def generate_entry_file(file_list): @@ -88,7 +89,7 @@ def upload_model_file(file_path: str,file_kind: str,model_id: str) -> dict: file_kind="model", model_id="{model_id}" ) - print("Upload response:", result) + logger.info("Upload response: %s", result) """ def generate_dockerfile(): @@ -112,7 +113,7 @@ def generate_dockerfile(): """ with open("/code/utils/train/Dockerfile", "w") as file: file.write(dockerfile_content) - print("Dockerfile generated successfully!") + logger.info("Dockerfile generated successfully!") def build_and_push_image(modelid, registry_url, image_name, registry_username, registry_password, training_data, training_code): @@ -130,24 +131,24 @@ def build_and_push_image(modelid, registry_url, image_name, registry_username, r client = docker.from_env() try: - print("Building Docker image...") + logger.debug("Building Docker image...") image, build_logs = client.images.build(path="/code/utils/train/", tag=image_name) for log in build_logs: - print(log.get("stream", "").strip()) + logger.debug(log.get("stream", "").strip()) except docker.errors.BuildError as e: - print(f"Error building image: {e}") + logger.error(f"Error building image: {e}") return - print("Pushing Docker image...") + logger.info("Pushing Docker image...") #registry_url, image_tag = image_name.split("/", 1) client.login(username=registry_username, password=registry_password, registry=registry_url) try: push_logs = client.images.push(image_name, stream=True, decode=True) for log in push_logs: - print(log) + logger.debug(log) except docker.errors.APIError as e: - print(f"Error pushing image: {e}") + logger.error(f"Error pushing image: {e}") def generate_yaml( diff --git a/mlconnector/src/utils/get_model.py b/mlconnector/src/utils/get_model.py index b9e744c..89452c4 100644 --- a/mlconnector/src/utils/get_model.py +++ b/mlconnector/src/utils/get_model.py @@ -1,4 +1,5 @@ import requests +from agents.mlsysops.logger_util import logger url = "http:///model/get/" @@ -9,4 +10,4 @@ response = requests.request("GET", url, headers=headers, data=payload) -print(response.text) +logger.info(response.text)
a/mlconnector/src/utils/manage_s3.py +++ b/mlconnector/src/utils/manage_s3.py @@ -4,7 +4,7 @@ from boto3.exceptions import S3UploadFailedError from dotenv import load_dotenv import os -import logging +from agents.mlsysops.logger_util import logger load_dotenv(verbose=True, override=True) @@ -29,18 +29,18 @@ def _ensure_bucket_exists(self): """ try: self.s3_client.head_bucket(Bucket=self.bucket_name) - print(f"Bucket '{self.bucket_name}' already exists.") + logger.info(f"Bucket '{self.bucket_name}' already exists.") except ClientError as e: # If a 404 error is thrown, then the bucket does not exist. error_code = int(e.response['Error']['Code']) if error_code == 404: try: self.s3_client.create_bucket(Bucket=self.bucket_name) - print(f"Bucket '{self.bucket_name}' created successfully.") + logger.info(f"Bucket '{self.bucket_name}' created successfully.") except ClientError as ce: - print("Error creating bucket:", ce) + logger.error("Error creating bucket: %s", ce) else: - print("Error checking bucket:", e) + logger.error("Error checking bucket: %s", e) def upload_file(self, file_name, object_name=None): """Upload a file to an S3 bucket @@ -59,7 +59,7 @@ def upload_file(self, file_name, object_name=None): data = f.read() self.s3_client.put_object(Bucket=self.bucket_name, Key=object_name, Body=data, ContentLength=len(data)) except ClientError as e: - logging.error(e) + logger.error(str(e)) return False return True @@ -76,9 +76,9 @@ def download_file(self, object_name, download_path): body = response['Body'].read() with open(download_path, 'wb') as f: f.write(body) - print(f"File '{object_name}' downloaded from bucket '{self.bucket_name}' to '{download_path}'.") + logger.info(f"File '{object_name}' downloaded from bucket '{self.bucket_name}' to '{download_path}'.") except ClientError as e: - print("Error downloading file:", e) + logger.error("Error downloading file: %s", e) def delete_file(self, object_name): """ @@ -88,9 +88,9 @@ def delete_file(self, object_name): """ try: self.s3_client.delete_object(Bucket=self.bucket_name, Key=object_name) - print(f"File '{object_name}' deleted from bucket '{self.bucket_name}'.") + logger.info(f"File '{object_name}' deleted from bucket '{self.bucket_name}'.") except ClientError as e: - print("Error deleting file:", e) + logger.error("Error deleting file: %s", e) def list_files(self): """ @@ -100,15 +100,15 @@ def list_files(self): response = self.s3_client.list_objects_v2(Bucket=self.bucket_name) if 'Contents' in response: files = [obj['Key'] for obj in response['Contents']] - print("Files in bucket:") + logger.debug("Files in bucket:") for f in files: - print(" -", f) + logger.debug(" - %s", f) return files else: - print("No files found in bucket.") + logger.debug("No files found in bucket.") return [] except ClientError as e: - print("Error listing files:", e) + logger.error("Error listing files: %s", e) return [] # Example usage: diff --git a/mlconnector/src/utils/mldeployments.py b/mlconnector/src/utils/mldeployments.py index 138394b..548b0a4 100644 --- a/mlconnector/src/utils/mldeployments.py +++ b/mlconnector/src/utils/mldeployments.py @@ -23,6 +23,9 @@ from textwrap import dedent from utils.manage_s3 import S3Manager from sqlalchemy import update + +from agents.mlsysops.logger_util import logger + #myuuid = uuid.uuid4() load_dotenv(verbose=True, override=True) @@ -56,15 +59,15 @@ async def deploy_ml_application(endpoint, payload): response = requests.post(url, headers=headers, json=payload) response.raise_for_status() except requests.exceptions.RequestException as e:
- print(f"Error deploying ML application: {e}") + logger.info(f"Error deploying ML application: {e}") return # On success - print(f"Status Code: {response.status_code}") + logger.info(f"Status Code: {response.status_code}") try: - print("Response JSON:", response.json()) + logger.info("Response JSON:", response.json()) except ValueError: - print("Response Text:", response.text) + logger.error("Response Text:", response.text) def prepare_model_artifact(s3_manager: S3Manager, model_name: str, download_dir: str = "/code/utils/api"): """ @@ -77,7 +80,7 @@ def prepare_model_artifact(s3_manager: S3Manager, model_name: str, download_dir: # download from S3 s3_manager.download_file(object_name=model_name, download_path=local_path) - print(f"Model downloaded to {local_path}") + logger.info(f"Model downloaded to {local_path}") return local_path @@ -117,14 +120,14 @@ async def get_deployment_by_id(db: AsyncSession, deployment_id: str): async def get_deployment_status(db: AsyncSession, deployment_id: str): BASE_URL = os.getenv('NOTHBOUND_API_ENDPOINT') url = f"{BASE_URL}/ml/status/{deployment_id}" - #print(url) + #logger(url) headers = {"Accept": "application/json"} try: resp = requests.get(url, headers=headers) resp.raise_for_status() except requests.exceptions.RequestException as e: - print(f"[ERROR] Status fetch failed: {e}") + logger.error(f"[ERROR] Status fetch failed: {e}") return False try: @@ -145,14 +148,14 @@ async def get_deployment_status(db: AsyncSession, deployment_id: str): async def return_all_deployments(db: AsyncSession): BASE_URL = os.getenv('NOTHBOUND_API_ENDPOINT') url = f"{BASE_URL}/ml/list_all/" - #print(url) + #logger(url) headers = {"Accept": "application/json"} try: resp = requests.get(url, headers=headers) resp.raise_for_status() except requests.exceptions.RequestException as e: - print(f"[ERROR] Status fetch failed: {e}") + logger.error(f"Status fetch failed: {e}") return False try: @@ -184,8 +187,8 @@ async def create_deployment(db: AsyncSession, deployment: MLDeploymentCreate, cr deployment_id = str(uuid.uuid4()) else: deployment_id = deployment.deployment_id - #print(model.featurelist) - #print(file_model) + #logger(model.featurelist) + #logger(file_model) schema_code = "" if(deployment.inference_data==0): schema_code = generate_schema_code(flag=0, feature_list_str=json.dumps((extract_feature_names(model.featurelist)))) @@ -226,7 +229,7 @@ async def create_deployment(db: AsyncSession, deployment: MLDeploymentCreate, cr ) #deployment_json = json.dumps(new_deployment) - #print(str(new_deployment)) + #logger(str(new_deployment)) #con = await create_redis_connection() #await con.rpush(os.getenv("DEPLOYMENT_QUEUE"), [str(deployment_json)]) diff --git a/mlconnector/src/utils/mlmodels.py b/mlconnector/src/utils/mlmodels.py index 63b4c27..902a3f6 100644 --- a/mlconnector/src/utils/mlmodels.py +++ b/mlconnector/src/utils/mlmodels.py @@ -21,6 +21,8 @@ import utils.mldeployments #myuuid = uuid.uuid4() +from agents.mlsysops.logger_util import logger + s3_manager = S3Manager( os.getenv("AWS_S3_BUCKET_DATA"), @@ -40,8 +42,8 @@ def _serialize(obj): async def update_deployments(db: AsyncSession, deployments: List[dict]): count = 1 for row in deployments: - print("Processing deployment: ", count, " of ", len(deployments)) - print("*"*20) + logger.debug("Processing deployment: ", count, " of ", len(deployments)) + logger.debug("*"*20) # Convert the dictionary to a Pydantic model ml_deployment = MLDeploymentCreate( modelid=row['modelid'], @@ -51,7 +53,7 @@ async def update_deployments(db: 
AsyncSession, deployments: List[dict]): inference_data=0, ) results = await utils.mldeployments.create_deployment(db=db, deployment=ml_deployment, create_new=True) - print("Deployment created: ", results) + logger.info("Deployment created: %s", results) count += 1 """# Check if the deployment is already in the database @@ -178,7 +180,7 @@ async def upload_models( return True except Exception as e: - print(f"[ERROR] upload_models failed: {e}") + logger.error(f"upload_models failed: {e}") return False finally: diff --git a/mlconnector/src/utils/mltrainings.py b/mlconnector/src/utils/mltrainings.py index 570b39e..07fefdc 100644 --- a/mlconnector/src/utils/mltrainings.py +++ b/mlconnector/src/utils/mltrainings.py @@ -17,6 +17,7 @@ import json from utils.manage_s3 import S3Manager from dotenv import load_dotenv +from agents.mlsysops.logger_util import logger load_dotenv(verbose=True, override=True) @@ -39,7 +40,7 @@ def prepare_file_artifact(s3_manager: S3Manager, file_name: str, download_dir: s # download from S3 s3_manager.download_file(object_name=file_name, download_path=local_path) - print(f"File downloaded to {local_path}") + logger.info(f"File downloaded to {local_path}") return local_path async def get_train_deplyment_id(db: AsyncSession, modelid: str): @@ -62,7 +63,7 @@ async def create_training(db: AsyncSession, mltrain: MLTrainCreate): else: local_code_path = prepare_file_artifact(s3_manager, file_code[0].filename) local_data_path = prepare_file_artifact(s3_manager, file_data[0].filename) - #print(model[0][1].filename) + #logger.debug(model[0][1].filename) image_name = "registry.mlsysops.eu/usecases/augmenta-demo-testbed/"+deployment_id+":0.0.1" build_and_push_image( mltrain.modelid, @@ -86,7 +87,7 @@ async def create_training(db: AsyncSession, mltrain: MLTrainCreate): ) deployment_json = json.dumps(new_deployment) - print(str(deployment_json)) + logger.debug(str(deployment_json)) new_train = MLTraining( diff --git a/mlconnector/xai-server-app/ShapExplainer.py b/mlconnector/xai-server-app/ShapExplainer.py index df366dd..cf5a1b3 100644 --- a/mlconnector/xai-server-app/ShapExplainer.py +++ b/mlconnector/xai-server-app/ShapExplainer.py @@ -5,6 +5,7 @@ import matplotlib.pyplot as plt import io import base64 +from agents.mlsysops.logger_util import logger class ShapExplainer: @@ -15,7 +16,7 @@ def __init__(self, model_path=None, test_data=None): self.model_path = model_path self.model = self.load_model() self.test_data = test_data - print(self.model) + logger.info(self.model) def load_model(self): return joblib.load(self.model_path) diff --git a/mlconnector/xai-server-app/database.py b/mlconnector/xai-server-app/database.py index dfd3d32..2ff6035 100644 --- a/mlconnector/xai-server-app/database.py +++ b/mlconnector/xai-server-app/database.py @@ -9,6 +9,7 @@ from manage_s3 import S3Manager from dotenv import load_dotenv import os +from agents.mlsysops.logger_util import logger load_dotenv(verbose=True, override=True,dotenv_path='/.env') manager = S3Manager( @@ -30,7 +31,7 @@ def proccessURL(url:str): global parsed_url, gitlab_host, path_parts, repo_path, branch, file_path parsed_url = urlparse(url) - print(parsed_url) + logger.info(parsed_url) gitlab_host = f"{parsed_url.scheme}://{parsed_url.netloc}" path_parts = parsed_url.path.strip("/").split("/") repo_path = "/".join(path_parts[:2]) @@ -46,10 +47,10 @@ def getProjectID(): projects = response.json() project_id = next((p["id"] for p in projects if p["path_with_namespace"] == repo_path), None) if not project_id: - print(f"Project '{repo_path}'
not found. Check the repository name.") + logger.error(f"Project '{repo_path}' not found. Check the repository name.") exit() else: - print(f"Failed to fetch projects: {response.status_code} - {response.text}") + logger.critical(f"Failed to fetch projects: {response.status_code} - {response.text}") exit() return project_id @@ -106,7 +107,7 @@ def getModelDataById(modelId:str): model_file = downloadFile(responseData["trained_model"][0]["modelname"]) return model_file, csv_data, featurs_names else: - print(f"Error: {modelData.status_code}") + logger.error(f"Error: {modelData.status_code}") return None, None, None def getModelByManager(modelId:str): @@ -118,4 +119,4 @@ def getModelByManager(modelId:str): manager.download_file(model_pkl_name,output_pkl_file) return output_pkl_file, pd.read_csv(output_csv_path) -#print(getModelDataById("d11356fc-48c0-43d1-bc27-2723395f1dfe")) \ No newline at end of file +#logger.debug(getModelDataById("d11356fc-48c0-43d1-bc27-2723395f1dfe")) \ No newline at end of file diff --git a/mlconnector/xai-server-app/manage_s3.py b/mlconnector/xai-server-app/manage_s3.py index c6e2a33..407b560 100644 --- a/mlconnector/xai-server-app/manage_s3.py +++ b/mlconnector/xai-server-app/manage_s3.py @@ -5,6 +5,7 @@ from dotenv import load_dotenv import os import logging +from agents.mlsysops.logger_util import logger load_dotenv(verbose=True, override=True,dotenv_path='./param.env') @@ -29,18 +30,18 @@ def _ensure_bucket_exists(self): """ try: self.s3_client.head_bucket(Bucket=self.bucket_name) - print(f"Bucket '{self.bucket_name}' already exists.") + logger.info(f"Bucket '{self.bucket_name}' already exists.") except ClientError as e: # If a 404 error is thrown, then the bucket does not exist. error_code = int(e.response['Error']['Code']) if error_code == 404: try: self.s3_client.create_bucket(Bucket=self.bucket_name) - print(f"Bucket '{self.bucket_name}' created successfully.") + logger.info(f"Bucket '{self.bucket_name}' created successfully.") except ClientError as ce: - print("Error creating bucket:", ce) + logger.error("Error creating bucket: %s", ce) else: - print("Error checking bucket:", e) + logger.error("Error checking bucket: %s", e) def upload_file(self, file_name, object_name=None): """Upload a file to an S3 bucket @@ -76,9 +77,9 @@ def download_file(self, object_name, download_path): body = response['Body'].read() with open(download_path, 'wb') as f: f.write(body) - print(f"File '{object_name}' downloaded from bucket '{self.bucket_name}' to '{download_path}'.") + logger.info(f"File '{object_name}' downloaded from bucket '{self.bucket_name}' to '{download_path}'.") except ClientError as e: - print("Error downloading file:", e) + logger.error("Error downloading file: %s", e) def delete_file(self, object_name): """ @@ -88,9 +89,9 @@ def delete_file(self, object_name): """ try: self.s3_client.delete_object(Bucket=self.bucket_name, Key=object_name) - print(f"File '{object_name}' deleted from bucket '{self.bucket_name}'.") + logger.info(f"File '{object_name}' deleted from bucket '{self.bucket_name}'.") except ClientError as e: - print("Error deleting file:", e) + logger.error("Error deleting file: %s", e) def list_files(self): """ @@ -100,15 +101,15 @@ def list_files(self): response = self.s3_client.list_objects_v2(Bucket=self.bucket_name) if 'Contents' in response: files = [obj['Key'] for obj in response['Contents']] - print("Files in bucket:") + logger.info("Files in bucket:") for f in files: - print(" -", f) + logger.debug(" - %s", f) return files else: - print("No files found in bucket.") +
logger.warning("No files found in bucket.") return [] except ClientError as e: - print("Error listing files:", e) + logger.error("Error listing files:", e) return [] # Example usage: diff --git a/mlconnector/xai-server-app/server.py b/mlconnector/xai-server-app/server.py index 187a8ee..83f95c7 100644 --- a/mlconnector/xai-server-app/server.py +++ b/mlconnector/xai-server-app/server.py @@ -5,6 +5,7 @@ from database import getModelByManager, getModelDataById from ShapExplainer import ShapExplainer # Assuming your class is in shap_explainer.py from typing import Optional +from agents.mlsysops.logger_util import logger app = FastAPI() @@ -67,7 +68,7 @@ class InitFromRepoRequest(BaseModel): # for v in ["local_time", "download_time_ms"]: # if v in test_data.keys(): # test_data = test_data.head(1000).drop(v, axis=1) -# print("-I- Data Downloaded Successfully") +# logger("-I- Data Downloaded Successfully") # models[request.model_id] = {"shap_explainer_instance":None, "test_data":test_data, "status":"Processing"} # shap_explainer_instance = ShapExplainer(model_path=model_data, test_data=test_data) # if not request.wait_for_trining: @@ -124,7 +125,7 @@ def initFromManager(request: InitFromRepoRequest): for v in ["local_time", "download_time_ms"]: if v in test_data.keys(): test_data = test_data.head(1000).drop(v, axis=1) - print("-I- Data Downloaded Successfully") + logger.info("-I- Data Downloaded Successfully") models[request.model_id] = {"shap_explainer_instance":None, "test_data":test_data, "status":"Processing"} shap_explainer_instance = ShapExplainer(model_path=model_data, test_data=test_data) if not request.wait_for_trining: diff --git a/mlsysops-cli/mlsysops_cli/cli.py b/mlsysops-cli/mlsysops_cli/cli.py index ee70ac8..6853979 100644 --- a/mlsysops-cli/mlsysops_cli/cli.py +++ b/mlsysops-cli/mlsysops_cli/cli.py @@ -16,6 +16,7 @@ from mlsysops_cli.deployment.descriptions_util import create_app_yaml from mlsysops_cli.deployment.deploy import KubernetesLibrary +from agents.mlsysops.logger_util import logger # Configurable IP and PORT via environment variables IP = os.getenv("MLS_API_IP", "127.0.0.1") @@ -56,7 +57,7 @@ def deploy_app(path, uri): if response.status_code == 200: click.secho("DESCRIPTION UPLOADED SUCCESSFULLY!", fg='green') - print(response.text) + logger.info(response.text) else: click.secho(f"ERROR: {response.json().get('detail', 'Unknown error')}", fg='red') except Exception as e: @@ -216,7 +217,7 @@ def remove_app(app_id): data=json.dumps({'app_id': app_id}), headers={'Content-Type': 'application/json'} ) - print(response.json()) + logger.debug(response.json()) if response.status_code == 200: responses = response.json() click.echo(click.style(f"AppID:{responses['app_id']} status updated to 'To_be_removed'.", fg='bright_blue')) @@ -296,7 +297,7 @@ def deploy_ml(path, uri): if response.status_code == 200: click.secho("ML MODEL UPLOADED SUCCESSFULLY!", fg='green') - print(response.json()) + logger.debug(response.json()) else: click.secho(f"ERROR: {response.json().get('detail', 'Unknown error')}", fg='red') except Exception as e: diff --git a/mlsysops-cli/mlsysops_cli/deployment/create_descriptions.py b/mlsysops-cli/mlsysops_cli/deployment/create_descriptions.py index b135adf..fba31e5 100644 --- a/mlsysops-cli/mlsysops_cli/deployment/create_descriptions.py +++ b/mlsysops-cli/mlsysops_cli/deployment/create_descriptions.py @@ -1,5 +1,6 @@ import yaml from jinja2 import Template +from agents.mlsysops.logger_util import logger def render_template(template_file, context): """ @@ -39,7 
+40,7 @@ def create_cluster_yaml(input_file, cluster_name): cluster_yaml = render_template("mlsysops_cli/templates/cluster.yaml.j2", cluster_context) with open(cluster_yaml_filename, 'w') as output_file: output_file.write(cluster_yaml) - print(f"Cluster YAML written to {cluster_yaml_filename}") + logger.info(f"Cluster YAML written to {cluster_yaml_filename}") def create_worker_node_yaml(input_file, cluster_name): @@ -82,7 +83,7 @@ def create_worker_node_yaml(input_file, cluster_name): worker_yaml_filename = f"{worker_name}.yaml" with open(worker_yaml_filename, 'w') as output_file: output_file.write(worker_yaml) - print(f"Worker YAML written to {worker_yaml_filename}") + logger.info(f"Worker YAML written to {worker_yaml_filename}") def create_app_yaml(input_file, cluster_name): @@ -117,7 +118,7 @@ def create_app_yaml(input_file, cluster_name): app_yaml_filename = "app-description.yaml" with open(app_yaml_filename, 'w') as output_file: output_file.write(app_yaml) - print(f"Application YAML written to {app_yaml_filename}") + logger.info(f"Application YAML written to {app_yaml_filename}") def create_continuum_yaml(input_file): @@ -156,7 +157,7 @@ def create_continuum_yaml(input_file): continuum_yaml_content = render_template("mlsysops_cli/templates/continuum.yaml.j2", continuum_context) with open(continuum_yaml_filename, 'w') as output_file: output_file.write(continuum_yaml_content) - print(f"Continuum YAML written to {continuum_yaml_filename}") + logger.info(f"Continuum YAML written to {continuum_yaml_filename}") def main(): @@ -167,24 +168,24 @@ def main(): for cluster_name in inventory['all']['children']: try: - print(f"Processing cluster: {cluster_name}") + logger.info(f"Processing cluster: {cluster_name}") create_cluster_yaml(input_file, cluster_name) create_worker_node_yaml(input_file, cluster_name) except ValueError as e: - print(f"Skipping cluster '{cluster_name}': {e}") + logger.warning(f"Skipping cluster '{cluster_name}': {e}") for cluster_name in inventory['all']['children']: # Generate application-level YAML try: create_app_yaml(input_file,cluster_name) except ValueError as e: - print(f"Skipping application YAML generation: {e}") + logger.warning(f"Skipping application YAML generation: {e}") # Generate continuum-level YAML try: create_continuum_yaml(input_file) except ValueError as e: - print(f"Skipping continuum YAML generation: {e}") + logger.warning(f"Skipping continuum YAML generation: {e}") if __name__ == "__main__": diff --git a/mlsysops-cli/mlsysops_cli/deployment/deploy.py b/mlsysops-cli/mlsysops_cli/deployment/deploy.py index c605aa9..a31cb9c 100644 --- a/mlsysops-cli/mlsysops_cli/deployment/deploy.py +++ b/mlsysops-cli/mlsysops_cli/deployment/deploy.py @@ -13,6 +13,7 @@ import subprocess from mlsysops_cli import deployment from mlsysops_cli.deployment.descriptions_util import create_cluster_yaml, create_worker_node_yaml,create_continuum_yaml +from agents.mlsysops.logger_util import logger def parse_yaml_from_file(path_obj: Path, template_variables: dict = {}) -> list | None: @@ -33,7 +34,7 @@ def parse_yaml_from_file(path_obj: Path, template_variables: dict = {}) -> list yaml = YAML(typ='safe') if not path_obj.exists(): - print(f"❌ File does not exist: {path_obj}") + logger.error(f"❌ File does not exist: {path_obj}") return None raw_template = path_obj.read_text(encoding="utf-8") @@ -42,7 +43,7 @@ def parse_yaml_from_file(path_obj: Path, template_variables: dict = {}) -> list resources = list(yaml.load_all(rendered_template)) if not resources: - print(f"⚠️ No resources 
found in file: {path_obj}") + logger.error(f"⚠️ No resources found in file: {path_obj}") return None resource_order = [ @@ -61,7 +62,7 @@ def get_method(kind, operation): """ Retrieves the method corresponding to a Kubernetes resource kind and operation. This function maps a given resource kind (e.g., 'service', 'secret', 'deployment') and an operation (e.g., 'read', ' - print(description_directory', + logger(description_directory', 'delete', 'replace') to the appropriate method provided by the Kubernetes Python client library. It ensures that only supported kinds and operations are used. @@ -178,16 +179,16 @@ def __init__(self, group=None, version=None, kubeconfig=None, context=None): # Load Kubernetes configuration for the specified environment and context if kubeconfig: - print(f"Loading kubeconfig from {kubeconfig}, context: {context}") + logger.info(f"Loading kubeconfig from {kubeconfig}, context: {context}") config.load_kube_config(config_file=kubeconfig, context=context) elif 'KUBERNETES_PORT' in os.environ: - print("Loading in-cluster Kubernetes configuration") + logger.info("Loading in-cluster Kubernetes configuration") config.load_incluster_config() else: - print(f"Loading default kubeconfig, context: {context}") + logger.info(f"Loading default kubeconfig, context: {context}") config.load_kube_config(context=context) - print(f"Kubernetes configuration loaded successfully") + logger.info(f"Kubernetes configuration loaded successfully") self.config = config self.kubeconfig = kubeconfig self.group = group @@ -208,7 +209,7 @@ def create_custom_object(self, yaml_content): body=yaml_content, ) except ApiException as e: - print(f"Failed to apply kind '{yaml_content['kind']}' to Kubernetes API: {e}") + logger.error(f"Failed to apply kind '{yaml_content['kind']}' to Kubernetes API: {e}") def update_custom_object(self, name, yaml_content): kind = yaml_content["kind"] @@ -223,7 +224,7 @@ def update_custom_object(self, name, yaml_content): body=yaml_content, ) except ApiException as e: - print(f"Failed to apply kind '{yaml_content['kind']}' to Kuberentes API: {e}") + logger.error(f"Failed to apply kind '{yaml_content['kind']}' to Kuberentes API: {e}") def create_or_update(self, resource_yaml): @@ -232,7 +233,7 @@ def create_or_update(self, resource_yaml): kind = resource_yaml["kind"].lower() name = resource_yaml["metadata"].get("name", "None") namespace = resource_yaml["metadata"].get("namespace") - print(f"Creating/Updating resource: {name} of kind {kind} in namespace {namespace} ") + logger.info(f"Creating/Updating resource: {name} of kind {kind} in namespace {namespace} ") if namespace is not None: existing_resource = get_method(kind, "read")(name, namespace=namespace) get_method(kind, "replace")(name=name, namespace=namespace, body=resource_yaml) @@ -241,13 +242,13 @@ def create_or_update(self, resource_yaml): existing_resource = get_method(kind, "read")(name) get_method(kind, "replace")(name=name, body=resource_yaml) - print(f"Updated resource: {name}") + logger.info(f"Updated resource: {name}") except KeyError as e: - print(f"Error parsing resource: {e}") + logger.error(f"Error parsing resource: {e}") return except client.exceptions.ApiException as e: if e.status == 404: - print(f"Resource '{name}' of kind '{kind}' not found. Creating it now. {namespace}") + logger.error(f"Resource '{name}' of kind '{kind}' not found. Creating it now. 
{namespace}") if namespace is not None: if kind in ['serviceaccount', 'configmap', 'daemonset', "deployment", "service", "persistentvolumeclaim"]: @@ -258,7 +259,7 @@ def create_or_update(self, resource_yaml): else: get_method(kind, "create")(body=resource_yaml) else: - print(f"Error updating Service '{name}' in namespace '{namespace}': {e}") + logger.error(f"Error updating Service '{name}' in namespace '{namespace}': {e}") def dump_context_config(self,full_config, context_name): # Validate the context exists @@ -302,12 +303,12 @@ def create_karmada_api_configmap(self, namespace, name): try: self.core_v1_api.create_namespaced_config_map(namespace, config_map) - print(f"✅ Created Karmada API kubeconfig configmap {name}") + logger.info(f"✅ Created Karmada API kubeconfig configmap {name}") except ApiException as e: if e.status != 409: #self.core_v1_api.delete_namespaced_config_map(name,namespace) self.core_v1_api.replace_namespaced_config_map(name, namespace, config_map) - print(f"♻️ Updated configmap Karamda API kubeconfig {name}") + logger.info(f"♻️ Updated configmap Karamda API kubeconfig {name}") def create_configmap_from_file(self, descriptions_directory, namespace, name, suffixes=["*.yml", "*.yaml"], key_name = ""): """ @@ -319,12 +320,12 @@ def create_configmap_from_file(self, descriptions_directory, namespace, name, su try: for suffix in suffixes: for file in directory.glob(suffix): - print(f"Reading file: {file.name}") + logger.debug(f"Reading file: {file.name}") file_data = file.read_text() key_name = file.name files_data_object[key_name] = file_data except Exception as e: - print(f"Error reading from {descriptions_directory}: {e}") + logger.error(f"Error reading from {descriptions_directory}: {e}") return config_map = client.V1ConfigMap( @@ -334,11 +335,11 @@ def create_configmap_from_file(self, descriptions_directory, namespace, name, su try: self.core_v1_api.create_namespaced_config_map(namespace, config_map) - print(f"✅ Created configmap {name}") + logger.info(f"✅ Created configmap {name}") except ApiException as e: if e.status != 409: self.core_v1_api.replace_namespaced_config_map(name, namespace, config_map) - print(f"♻️ Updated configmap {name}") + logger.info(f"♻️ Updated configmap {name}") def get_karmada_clusters(self): """ @@ -372,7 +373,7 @@ def get_karmada_clusters(self): return return_object except Exception as e: - print(f"Error retrieving clusters: {e}") + logger.error(f"Error retrieving clusters: {e}") return [] def apply_karmada_policy(self, policy_name: str, policy_body: dict, plural: str, namespaced: bool = False, @@ -394,7 +395,7 @@ def apply_karmada_policy(self, policy_name: str, policy_body: dict, plural: str, group = "policy.karmada.io" version = "v1alpha1" - print( + logger.info( f"Applying resource '{policy_name}' with group: {group}, version: {version}, plural: {plural}, namespaced: {namespaced}" ) @@ -424,7 +425,7 @@ def apply_karmada_policy(self, policy_name: str, policy_body: dict, plural: str, resource_version = current_resource["metadata"]["resourceVersion"] policy_body["metadata"]["resourceVersion"] = resource_version - print(f"Resource '{policy_name}' exists. Updating it...") + logger.info(f"Resource '{policy_name}' exists. 
Updating it...") # Perform an update using replace if namespaced: @@ -444,12 +445,12 @@ def apply_karmada_policy(self, policy_name: str, policy_body: dict, plural: str, name=policy_name, body=policy_body ) - print(f"Resource '{policy_name}' updated successfully.") + logger.info(f"Resource '{policy_name}' updated successfully.") except ApiException as e: if e.status == 404: # If the resource doesn't exist, create a new one - print(f"Resource '{policy_name}' not found. Creating a new one...") + logger.warning(f"Resource '{policy_name}' not found. Creating a new one...") # Create the new resource if namespaced: @@ -467,12 +468,12 @@ def apply_karmada_policy(self, policy_name: str, policy_body: dict, plural: str, plural=plural, body=policy_body ) - print(f"New resource '{policy_name}' created successfully.") + logger.info(f"New resource '{policy_name}' created successfully.") else: raise # Re-raise any non-404 exceptions except Exception as e: - print(f"Error applying resource '{policy_name}': {e}") + logger.error(f"Error applying resource '{policy_name}': {e}") def apply_mlsysops_propagation_policies(self): """ @@ -482,7 +483,7 @@ def apply_mlsysops_propagation_policies(self): # Extract cluster names where the cluster status is True (ready) cluster_names = [name for name, status in self.get_karmada_clusters().items() if status.lower() == 'true'] - print(f"Applying PropagationPolicy with cluster names: {cluster_names}") + logger.info(f"Applying PropagationPolicy with cluster names: {cluster_names}") # Correctly load template path using importlib.resources templates_path = str(files(deployment)) @@ -503,10 +504,10 @@ def apply_mlsysops_propagation_policies(self): plural="clusterpropagationpolicies", namespaced=False, ) - print(f"✅ Cluster-Wide PropagationPolicy applied.") + logger.info(f"✅ Cluster-Wide PropagationPolicy applied.") except Exception as e: - print(f"❌ Error applying Cluster-Wide PropagationPolicy: {e}") + logger.error(f"❌ Error applying Cluster-Wide PropagationPolicy: {e}") # Apply Simple PropagationPolicy try: @@ -524,13 +525,13 @@ def apply_mlsysops_propagation_policies(self): namespaced=True, namespace="default" ) - print(f"✅ Simple PropagationPolicy applied.") + logger.info(f"✅ Simple PropagationPolicy applied.") except Exception as e: - print(f"❌ Error applying Simple PropagationPolicy: {e}") + logger.error(f"❌ Error applying Simple PropagationPolicy: {e}") except Exception as e: - print(f"❌ Error applying PropagationPolicies: {e}") + logger.error(f"❌ Error applying PropagationPolicies: {e}") def annotate_pod(self): path = "/apis/search.karmada.io/v1alpha1/proxying/karmada/proxy/api/v1/namespaces/mlsysops-framework/pods" @@ -546,7 +547,7 @@ def annotate_pod(self): pod_name = pod['metadata']['name'] pod_cluster = pod['metadata']['annotations']['resource.karmada.io/cached-from-cluster'] if pod_name.startswith("mlsysops-cluster-agent"): - print(f"Updating pod {pod_name} from cluster {pod_cluster}") + logger.info(f"Updating pod {pod_name} from cluster {pod_cluster}") pod_path = f"/apis/cluster.karmada.io/v1alpha1/clusters/{pod_cluster}/proxy/api/v1/namespaces/mlsysops-framework/pods/{pod_name}" annotation_patch = { @@ -584,9 +585,9 @@ def update_configmap_data(self, namespace: str, configmap_name: str, key: str, c self.annotate_pod() - print(f"ConfigMap '{configmap_name}' updated successfully.") + logger.info(f"ConfigMap '{configmap_name}' updated successfully.") except Exception as e: - print(f"Failed to update ConfigMap '{configmap_name}': {e}") + logger.error(f"Failed to 
update ConfigMap '{configmap_name}': {e}") raise @@ -597,18 +598,18 @@ def _check_required_env_vars(*required_vars): def run_deploy_all(path, inventory_path): try: - print("🚀 Deploying all MLSysOps components...") + logger.info("🚀 Deploying all MLSysOps components...") deploy_core_services() deploy_continuum_agents(path, inventory_path) deploy_cluster_agents(path, inventory_path) deploy_node_agents(path, inventory_path) - print("✅ All components deployed successfully.") + logger.info("✅ All components deployed successfully.") except Exception as e: - print(f"❌ Error in deploy_all: {e}") + logger.error(f"❌ Error in deploy_all: {e}") raise def deploy_core_services(): - print("🔧 Deploying core services (ejabberd, redis, API service)...") + logger.info("🔧 Deploying core services (ejabberd, redis, API service)...") _check_required_env_vars("KARMADA_HOST_IP", "KUBECONFIG") client_k8s = KubernetesLibrary("apps", "v1", os.getenv("KUBECONFIG", "/etc/rancher/k3s/k3s.yaml"), context="karmada-host") @@ -627,7 +628,7 @@ def deploy_core_services(): client_k8s.create_or_update(r) def deploy_continuum_agents(path, inventory_path): - print("🧠 Deploying Continuum Agent...") + logger.info("🧠 Deploying Continuum Agent...") _check_required_env_vars("KARMADA_HOST_IP", "KUBECONFIG") client_k8s = KubernetesLibrary("apps", "v1", os.getenv("KUBECONFIG", "/etc/rancher/k3s/k3s.yaml"), context="karmada-host") _apply_namespace_and_rbac(client_k8s) @@ -660,7 +661,7 @@ def deploy_continuum_agents(path, inventory_path): client_k8s.create_or_update(r) def deploy_cluster_agents(path, inventory_path): - print("🏢 Deploying Cluster Agents...") + logger.info("🏢 Deploying Cluster Agents...") _check_required_env_vars("KARMADA_HOST_IP", "KUBECONFIG") client_karmada = KubernetesLibrary("apps", "v1", os.getenv("KUBECONFIG", "/etc/rancher/k3s/k3s.yaml"), context="karmada-apiserver") @@ -685,10 +686,10 @@ def deploy_cluster_agents(path, inventory_path): for cluster_name in inventory['all']['children']: try: - print(f"Processing cluster: {cluster_name}") + logger.info(f"Processing cluster: {cluster_name}") create_cluster_yaml(inventory_path, cluster_name, descriptions_path) except ValueError as e: - print(f"Skipping cluster '{cluster_name}': {e}") + logger.warning(f"Skipping cluster '{cluster_name}': {e}") # ConfigMap client_karmada.create_configmap_from_file(descriptions_path, "mlsysops-framework", "cluster-system-description") @@ -699,7 +700,7 @@ def deploy_cluster_agents(path, inventory_path): client_k8s.create_or_update(r) def deploy_node_agents(path, inventory_path): - print("🧱 Deploying Node Agents...") + logger.info("🧱 Deploying Node Agents...") _check_required_env_vars("KARMADA_HOST_IP", "KUBECONFIG") client_karmada = KubernetesLibrary("apps", "v1", os.getenv("KUBECONFIG", "/etc/rancher/k3s/k3s.yaml") ,context="karmada-apiserver") @@ -722,13 +723,13 @@ def deploy_node_agents(path, inventory_path): for cluster_name in inventory['all']['children']: try: - print(f"Processing cluster: {cluster_name}") + logger.info(f"Processing cluster: {cluster_name}") create_worker_node_yaml(inventory_path, cluster_name,descriptions_path) except ValueError as e: - print(f"Skipping cluster '{cluster_name}': {e}") + logger.warning(f"Skipping cluster '{cluster_name}': {e}") # ConfigMap - print(f"Using node systems decriptions from {descriptions_path}") + logger.info(f"Using node system descriptions from {descriptions_path}") client_karmada.create_configmap_from_file(descriptions_path, "mlsysops-framework", "node-system-descriptions") # DaemonSet
YAML diff --git a/mlsysops-cli/mlsysops_cli/deployment/descriptions_util.py b/mlsysops-cli/mlsysops_cli/deployment/descriptions_util.py index a13a4af..fb9fe0a 100644 --- a/mlsysops-cli/mlsysops_cli/deployment/descriptions_util.py +++ b/mlsysops-cli/mlsysops_cli/deployment/descriptions_util.py @@ -2,6 +2,7 @@ import yaml from jinja2 import Template, Environment, PackageLoader, select_autoescape +from agents.mlsysops.logger_util import logger def render_template(template_file, context): @@ -52,7 +53,7 @@ def create_cluster_yaml(input_file, cluster_name, descriptions_path=""): cluster_yaml = template.render(cluster_context) with open(cluster_yaml_filename, 'w') as output_file: output_file.write(cluster_yaml) - print(f"Cluster YAML written to {cluster_yaml_filename}") + logger.info(f"Cluster YAML written to {cluster_yaml_filename}") def create_worker_node_yaml(input_file, cluster_name, descriptions_path=""): @@ -104,7 +105,7 @@ def create_worker_node_yaml(input_file, cluster_name, descriptions_path=""): worker_yaml_filename = os.path.join(descriptions_path, f"{worker_name}.yaml") with open(worker_yaml_filename, 'w') as output_file: output_file.write(worker_yaml) - print(f"Worker YAML written to {worker_yaml_filename}") + logger.info(f"Worker YAML written to {worker_yaml_filename}") def create_app_yaml(input_file,cluster_name=None): @@ -155,7 +156,7 @@ def create_app_yaml(input_file,cluster_name=None): app_yaml_filename = "mlsysops-test-app-description.yaml" with open(app_yaml_filename, 'w') as output_file: output_file.write(app_yaml) - print(f"Application YAML written to {app_yaml_filename}") + logger.info(f"Application YAML written to {app_yaml_filename}") def create_continuum_yaml(input_file, descriptions_path=""): @@ -203,4 +204,4 @@ def create_continuum_yaml(input_file, descriptions_path=""): continuum_yaml_content = template.render(continuum_context) with open(continuum_yaml_filename, 'w') as output_file: output_file.write(continuum_yaml_content) - print(f"Continuum YAML written to {continuum_yaml_filename}") \ No newline at end of file + logger.info(f"Continuum YAML written to {continuum_yaml_filename}") \ No newline at end of file diff --git a/northbound-api/MLSysOps_Schemas/schema generator/generate_model.py b/northbound-api/MLSysOps_Schemas/schema generator/generate_model.py index 39fdad7..39e23ec 100644 --- a/northbound-api/MLSysOps_Schemas/schema generator/generate_model.py +++ b/northbound-api/MLSysOps_Schemas/schema generator/generate_model.py @@ -7,6 +7,7 @@ import json import subprocess from jsonschema import validate, ValidationError +from agents.mlsysops.logger_util import logger def find_single_crd_yaml(crd_dir: str): """ @@ -20,10 +21,10 @@ def find_single_crd_yaml(crd_dir: str): matches.extend(glob.glob(p)) if not matches: - print(f"[✗] No .yaml/.yml file found under '{crd_dir}'.") + logger.info(f"[✗] No .yaml/.yml file found under '{crd_dir}'.") sys.exit(1) if len(matches) > 1: - print(f"[!] Multiple CRD files found under '{crd_dir}'. Using the first:\n {matches[0]}") + logger.info(f"[!] Multiple CRD files found under '{crd_dir}'. 
Using the first:\n {matches[0]}") return matches[0] def convert_yaml_crd_to_json(yaml_file: str, json_file: str): @@ -83,10 +84,10 @@ def convert_yaml_crd_to_json(yaml_file: str, json_file: str): with open(json_file, 'w') as f: json.dump(full_schema, f, indent=4) - print(f"[✓] JSON Schema written to: {json_file}") + logger.info(f"[✓] JSON Schema written to: {json_file}") except Exception as e: - print(f"[✗] Error converting YAML→JSON: {e}") + logger.error(f"[✗] Error converting YAML→JSON: {e}") sys.exit(1) def run_datamodel_codegen(json_schema_file: str, output_model_file: str): @@ -100,14 +101,14 @@ def run_datamodel_codegen(json_schema_file: str, output_model_file: str): "--output", output_model_file ] try: - print(f"[>] Running: {' '.join(cmd)}") + logger.info(f"[>] Running: {' '.join(cmd)}") subprocess.check_call(cmd) - print(f"[✓] Model written to: {output_model_file}") + logger.info(f"[✓] Model written to: {output_model_file}") except FileNotFoundError: - print("[✗] 'datamodel-codegen' not found. Please install it (pip install datamodel-code-generator).") + logger.error("[✗] 'datamodel-codegen' not found. Please install it (pip install datamodel-code-generator).") sys.exit(1) except subprocess.CalledProcessError as e: - print(f"[✗] datamodel-codegen failed: {e}") + logger.error(f"[✗] datamodel-codegen failed: {e}") sys.exit(1) if __name__ == "__main__": @@ -138,5 +139,5 @@ def run_datamodel_codegen(json_schema_file: str, output_model_file: str): except OSError: pass - print("\nDone. Your Pydantic model is here:") - print(f" {model_py_path}") + logger.info("Done. Your Pydantic model is here:") + logger.info(f" {model_py_path}") diff --git a/northbound-api/endpoints/applications.py b/northbound-api/endpoints/applications.py index 177c72b..9e0a091 100644 --- a/northbound-api/endpoints/applications.py +++ b/northbound-api/endpoints/applications.py @@ -14,9 +14,10 @@ from MLSysOps_Schemas.mlsysops_model import MlsysopsappSchema, Component from redis_setup import redis_mgt as rm # Your RedisManager class +from agents.mlsysops.logger_util import logger router = APIRouter() -logger = logging.getLogger(__name__) + os.environ["LOCAL_OTEL_ENDPOINT"] = "http://172.25.27.4:9464/metrics" os.environ["TELEMETRY_ENDPOINT"] = "172.25.27.4:4317" @@ -41,7 +42,7 @@ def get_pod_info(comp_name, model_id, api_client): response_type="json", _preload_content=False ) pods = json.loads(response[0].data.decode("utf-8")) - print(pods) + logger.debug(pods) except ApiException as exc: logger.error(f"Failed to fetch pods: {exc}") @@ -196,7 +197,7 @@ def store_qos_metrics(request:Request,app_id, app_description): redis_mgr.update_dict_value("component_metrics", redis_key, metric_id) except: - print(f"Error updating metrics in redis : ") + logger.error("Error updating metrics in Redis") async def check_deployment_status(comp_name,app_id): @@ -366,7 +367,7 @@ async def get_app_status(request: Request,app_id: str): try: # Fetch the value of the given app_id from Redis app_status = redis_mgr.get_dict_value('system_app_hash', app_id) - print(app_status) + logger.debug(app_status) if app_status is None: # If the app_id doesn't exist in Redis, return a 404 error raise HTTPException(status_code=404, detail=f"App ID '{app_id}' not found in the system.") @@ -400,11 +401,11 @@ async def get_app_details(request: Request,app_id: str): if not components: raise Exception(f"No components found for application '{app_id}'.") - print(components) - print("--------------------") + logger.debug(components) +
logger.debug("--------------------") pods_info = get_pods_from_kubeconfigs() - print(pods_info) + logger.debug(pods_info) component_details = [] for component in components: @@ -438,7 +439,7 @@ async def get_app_performance(request: Request,app_id: str): :return: A list of tuples [(metric_name, metric_value), ...] """ redis_mgr: rm.RedisManager = request.app.state.redis - print("Returning the app mQoS metric for", app_id) + logger.info("Returning the app mQoS metric for", app_id) if not redis_mgr.redis_conn: return {"error": "Redis connection not established"} @@ -462,7 +463,7 @@ async def get_app_performance(request: Request,app_id: str): if metric_value: metric_name = metric_value.decode("utf-8") # Decode Redis stored value metric_name = str(metric_name).lower() - print(metric_name) + logger.info(metric_name) return results # Returns a list of tuples [(metric_name, metric_value), ...] diff --git a/northbound-api/endpoints/infrastructure.py b/northbound-api/endpoints/infrastructure.py index dc9b57c..9c8bbff 100644 --- a/northbound-api/endpoints/infrastructure.py +++ b/northbound-api/endpoints/infrastructure.py @@ -6,6 +6,7 @@ from jsonschema import validate, ValidationError from MLSysOps_Schemas.mlsysops_schemas import node_schema, cluster_schema, datacenter_schema, continuum_schema from redis_setup import redis_mgt as rm +from agents.mlsysops.logger_util import logger # JSON schema with enum validation for the city # Update the required fields in the JSON schema @@ -56,15 +57,15 @@ def validate_infrastructure_file(json_data): async def deploy_infra(request: Request): try: data = await request.json() - #print(data) + #logger(data) except json.JSONDecodeError: - print("error") + logger.error("error") return HTTPException(status_code=400, detail="Invalid JSON payload") if 'uri' in data: # Retrieve and process the application configuration from the URI uri = data['uri'] - print(f"The path uri received is {uri}") + logger.error(f"The path uri received is {uri}") try: response = requests.get(uri) @@ -80,7 +81,7 @@ async def deploy_infra(request: Request): validation_error = validate_infrastructure_file(json_data) if validation_error is None: - print("Now execute the validation with the actual infrastructure and save in a datastructure") + logger.info("Now execute the validation with the actual infrastructure and save in a datastructure") else: raise HTTPException(status_code=400, detail=validation_error) @@ -93,7 +94,7 @@ async def deploy_infra(request: Request): @router.get("/list/", tags=["Infra"]) async def list_infra(id_type: str, id_value: str): - print(f"requested info : {id_type}, with id {id_value}") + logger.info(f"requested info : {id_type}, with id {id_value}") @@ -107,7 +108,7 @@ async def list_infra(id_type: str, id_value: str): # POST a new product @router.get("/node/{node_id}", tags=["Infra"]) async def get_node_state(app_id: str): - print("return the app mQoS metric of ", app_id) + logger.info("return the app mQoS metric of ", app_id) return (json.dumps(app_id)) diff --git a/northbound-api/endpoints/management.py b/northbound-api/endpoints/management.py index 50801f4..048ebfc 100644 --- a/northbound-api/endpoints/management.py +++ b/northbound-api/endpoints/management.py @@ -1,12 +1,13 @@ +import time import asyncio -from fastapi import FastAPI, APIRouter, HTTPException import spade +from fastapi import FastAPI, APIRouter, HTTPException from spade.agent import Agent from spade.behaviour import OneShotBehaviour from spade.message import Message from spade.template import 
Template from redis_setup import redis_mgt as rm -import time +from agents.mlsysops.logger_util import logger app = FastAPI() router = APIRouter() @@ -25,7 +26,7 @@ class PingBehaviour(OneShotBehaviour): """ A behavior that sends a ping message to a specific recipient and waits for a response. """ async def run(self): - print("PingBehaviour running...") + logger.info("PingBehaviour running...") recipient_jid = "continuum@karmada.mlsysops.eu" time.sleep(2) @@ -34,21 +35,21 @@ async def run(self): msg.set_metadata("performative", "ping") msg.body = "Ping from " + str(self.agent.jid) - print(f"Sending ping to {recipient_jid}") + logger.info(f"Sending ping to {recipient_jid}") await self.send(msg) # Wait for response for 5 seconds response = await self.receive(timeout=10) if response: - print(f"Received response: {response.body}") - print("Agent Alive") + logger.info(f"Received response: {response.body}") + logger.info("Agent Alive") return {"status": "alive", "response": response.body} else: - print("Did not receive a response. Agent Error.") + logger.info("Did not receive a response. Agent Error.") return {"status": "error", "message": "No response received"} async def setup(self): - print("Starting NBAPI agent...") + logger.info("Starting NBAPI agent...") redis_manager = rm.RedisManager() redis_manager.connect() @@ -73,7 +74,7 @@ def __init__(self, redis_manager, mode): self.mode = mode # Store the mode value (0 or 1) async def run(self): - print(f"ManageModeBehaviour running with mode {self.mode}...") + logger.info(f"ManageModeBehaviour running with mode {self.mode}...") recipient_jids = self.r.get_keys("system_agents") recipient_jids.append("continuum@karmada.mlsysops.eu") @@ -81,11 +82,11 @@ async def run(self): msg = Message(to=recipient_jid) msg.set_metadata("performative", "ch_mode") msg.body = f"Change management mode to: {self.mode}" # Send mode in the message - print(f"Sending mode {self.mode} to {recipient_jid}") + logger.info(f"Sending mode {self.mode} to {recipient_jid}") await self.send(msg) async def setup(self): - print("Starting ML_client agent...") + logger.info("Starting ML_client agent...") @router.get("/ping", tags=["Management"]) diff --git a/northbound-api/endpoints/ml_models.py b/northbound-api/endpoints/ml_models.py index 70fee22..d9aa310 100644 --- a/northbound-api/endpoints/ml_models.py +++ b/northbound-api/endpoints/ml_models.py @@ -15,6 +15,7 @@ from typing import Annotated, List, Optional, Dict, Any from pydantic import BaseModel, Field +from agents.mlsysops.logger_util import logger # JSON schema with enum validation for the city schema = app_schema @@ -154,7 +155,7 @@ def get_yaml_info(data): return app_name, components except Exception as e: - print(f"Error processing the data: {e}") + logger.error(f"Error processing the data: {e}") return None, [] @@ -208,7 +209,7 @@ async def deploy_ml(payload: RootModel): try: internal_uid = parsed_data["MLSysOpsApplication"]["mlsysops-id"] except KeyError: - print("The mlsysops-id is not specified in the model description") + logger.error("The mlsysops-id is not specified in the model description") if validation_error is None and internal_uid != "0": @@ -222,7 +223,7 @@ async def deploy_ml(payload: RootModel): r.update_dict_value('endpoint_hash', internal_uid, str(info)) return {"status": "success", "message": "Deployment request added to queue"} except Exception as e: - print(f"Error checking the app in Redis: {e}") + logger.error(f"Error checking the app in Redis: {e}") raise HTTPException(status_code=500, 
detail=str(e)) else: raise HTTPException(status_code=400, detail=validation_error) @@ -266,7 +267,7 @@ async def get_ml_status(model_uid: str): try: # Fetch the value of the given app_id from Redis app_status = r.get_dict_value('endpoint_hash', model_uid) - print(app_status) + logger.info(app_status) if app_status is None: # If the app_id doesn't exist in Redis, return a 404 error raise HTTPException(status_code=404, detail=f"Model ID '{model_uid}' not found in the system.") diff --git a/northbound-api/getallpods.py b/northbound-api/getallpods.py index d5fdcb9..d4827e9 100644 --- a/northbound-api/getallpods.py +++ b/northbound-api/getallpods.py @@ -1,4 +1,5 @@ import subprocess +from agents.mlsysops.logger_util import logger # Define kubeconfig files karmada_api_kubeconfig = "/home/runner/karmada_management/karmada-api.kubeconfig" @@ -89,12 +90,12 @@ def main(): if isinstance(pod_details, list): for pod in pod_details: if "error" in pod: - print(f"❌ {pod['error']}") + logger.error(f"❌ {pod['error']}") else: - print(f"✅ Pod: {pod['pod_name']}, Status: {pod['pod_status']}, Node: {pod['node_name']}, " + logger.info(f"✅ Pod: {pod['pod_name']}, Status: {pod['pod_status']}, Node: {pod['node_name']}, " f"Cluster: {pod['cluster_name']}, Kubeconfig: {pod['kubeconfig']}") else: - print("Unexpected error: pod_details is not a list") + logger.error("Unexpected error: pod_details is not a list") if __name__ == "__main__": main() diff --git a/northbound-api/karmada_info.py b/northbound-api/karmada_info.py index 4162607..869882a 100644 --- a/northbound-api/karmada_info.py +++ b/northbound-api/karmada_info.py @@ -1,6 +1,7 @@ import subprocess import json import os +from agents.mlsysops.logger_util import logger def get_karmada_pods(cluster_name): """ @@ -26,7 +27,7 @@ def get_karmada_pods(cluster_name): return pods except subprocess.CalledProcessError as e: - print(f"Error executing kubectl command: {e.stderr}") + logger.error(f"Error executing kubectl command: {e.stderr}") return None if __name__ == "__main__": @@ -38,10 +39,10 @@ def get_karmada_pods(cluster_name): # Print the result if pods: - print("Pods retrieved successfully:") + logger.info("Pods retrieved successfully:") for pod in pods.get("items", []): pod_name = pod["metadata"].get("name", "Unknown") pod_status = pod["status"].get("phase", "Unknown") - print(f"Pod name: {pod_name}, Status: {pod_status}") + logger.info(f"Pod name: {pod_name}, Status: {pod_status}") else: - print("Failed to retrieve pods. Make sure the cluster name is correct and kubectl is configured.") + logger.error("Failed to retrieve pods. 
Make sure the cluster name is correct and kubectl is configured.") diff --git a/northbound-api/redis_setup/redis_mgt.py b/northbound-api/redis_setup/redis_mgt.py index 60da465..9d42885 100644 --- a/northbound-api/redis_setup/redis_mgt.py +++ b/northbound-api/redis_setup/redis_mgt.py @@ -2,7 +2,7 @@ import json import time from redis_setup import redis_config as rc - +from agents.mlsysops.logger_util import logger class RedisManager: def __init__(self): @@ -27,30 +27,30 @@ def connect(self): try: self.redis_conn = redis.Redis(host=self.host, port=self.port, db=self.db, password=self.redis_password) if self.redis_conn.ping(): - print(f"Successfully connected to Redis at {self.host}.") + logger.info(f"Successfully connected to Redis at {self.host}.") else: raise Exception("Could not connect to Redis.") except redis.ConnectionError as e: - print(f"Connection error: {e}") + logger.error(f"Connection error: {e}") self.redis_conn = None # --- Queue Methods --- def push(self, q_name, value): if self.redis_conn: self.redis_conn.rpush(q_name, value) - print(f"'{value}' added to the queue '{q_name}'.") + logger.info(f"'{value}' added to the queue '{q_name}'.") else: - print("Redis connection not established.") + logger.info("Redis connection not established.") def pop(self, q_name): if self.redis_conn: value = self.redis_conn.lpop(q_name) if value: - print(f"'{value.decode()}' removed from the queue '{q_name}'.") + logger.info(f"'{value.decode()}' removed from the queue '{q_name}'.") return value.decode() - print(f"The queue '{q_name}' is empty.") + logger.info(f"The queue '{q_name}' is empty.") else: - print("Redis connection not established.") + logger.info("Redis connection not established.") def is_empty(self, q_name): return self.redis_conn.llen(q_name) == 0 if self.redis_conn else True @@ -63,39 +63,39 @@ def empty_queue(self, q_name): def pub_ping(self, message): if self.redis_conn: self.redis_conn.publish(self.channel_name, message) - print(f"'{message}' published to the channel '{self.channel_name}'.") + logger.info(f"'{message}' published to the channel '{self.channel_name}'.") else: - print("Redis connection not established.") + logger.info("Redis connection not established.") def subs_ping(self): if self.redis_conn: pubsub = self.redis_conn.pubsub() pubsub.subscribe(self.channel_name) - print(f"Subscribed to the channel '{self.channel_name}'.") + logger.info(f"Subscribed to the channel '{self.channel_name}'.") for message in pubsub.listen(): if message and message['type'] == 'message': - print(f"Message received: {message['data'].decode()}") + logger.info(f"Message received: {message['data'].decode()}") else: - print("Redis connection not established.") + logger.info("Redis connection not established.") # --- Dictionary (Hash Map) Methods --- def update_dict_value(self, dict_name, key, value): if self.redis_conn: self.redis_conn.hset(dict_name, key, value) - print(f"Value for key '{key}' updated to '{value}' in dictionary '{dict_name}'.") + logger.info(f"Value for key '{key}' updated to '{value}' in dictionary '{dict_name}'.") else: - print("Redis connection not established.") + logger.info("Redis connection not established.") def get_dict_value(self, dict_name, key): if self.redis_conn: value = self.redis_conn.hget(dict_name, key) return value.decode() if value else None - print("Redis connection not established.") + logger.info("Redis connection not established.") def get_dict(self, dict_name): if self.redis_conn: return {k.decode(): v.decode() for k, v in 
self.redis_conn.hgetall(dict_name).items()} - print("Redis connection not established.") + logger.info("Redis connection not established.") def remove_key(self, dict_name, key): return bool(self.redis_conn.hdel(dict_name, key)) if self.redis_conn else False diff --git a/orchestrators/launch.py b/orchestrators/launch.py index 8115385..a5452b9 100755 --- a/orchestrators/launch.py +++ b/orchestrators/launch.py @@ -5,6 +5,7 @@ import yaml import json import sys +from agents.mlsysops.logger_util import logger # VM definitions vms = [ @@ -32,7 +33,7 @@ # 1. Launch the VMs for vm in vms: name = vm["name"] - print(f"Launching VM: {name}") + logger.info(f"Launching VM: {name}") try: subprocess.run([ "incus", "launch", image, f"{remote}{name}", @@ -44,15 +45,15 @@ "--profile", profile ], check=True) except subprocess.CalledProcessError: - print(f"Failed to launch {name}", file=sys.stderr) + logger.error(f"Failed to launch {name}") continue # 2. Wait a bit for VMs to boot and acquire IPs - print("Waiting 20 seconds for VMs to initialize networking...") + logger.info("Waiting 20 seconds for VMs to initialize networking...") time.sleep(20) # 3. Fetch all instance info -print("📡 Fetching VM information from Incus...") +logger.info("📡 Fetching VM information from Incus...") output = subprocess.check_output([ "incus", "list", remote, "--format", "json", "--project", project ]).decode() @@ -88,7 +89,7 @@ break if not ip_address: - print(f"No usable IP found for {name}", file=sys.stderr) + logger.warning(f"No usable IP found for {name}") continue host_entry = { @@ -122,4 +123,4 @@ with open("inv.yml", "w") as f: yaml.dump(inventory, f, sort_keys=False) -print("Inventory written to inv.yml") +logger.info("Inventory written to inv.yml")
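
Note for reviewers (not part of the patch): every hunk above imports `logger` from `agents.mlsysops.logger_util`, a module that already exists in the repository and is not modified by this change. For trying individual hunks in isolation, the following is a minimal sketch of what such a shared logger module might look like; the "mlsysops" logger name and the MLS_LOG_LEVEL environment variable are assumptions for illustration, not the project's actual implementation.

    # logger_util.py -- hypothetical stand-in for agents.mlsysops.logger_util
    import logging
    import os

    # One shared, named logger reused by every module that imports it.
    logger = logging.getLogger("mlsysops")

    if not logger.handlers:
        # Log to the console so behaviour stays close to the print() calls being replaced.
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
        # MLS_LOG_LEVEL is an assumed knob; default to INFO.
        logger.setLevel(os.getenv("MLS_LOG_LEVEL", "INFO").upper())

With a module along these lines, the calls rewritten above, e.g. logger.error("Error listing files: %s", e), let the logging framework do the string interpolation lazily, whereas a direct port of the multi-argument print style, e.g. logger.error("Error listing files:", e), would fail at emit time because stdlib logging treats the extra argument as a %-format parameter.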