diff --git a/.github/workflows/pytest_transitions_tests_coverage.yaml b/.github/workflows/pytest_transitions_tests_coverage.yaml new file mode 100644 index 0000000..21bf23d --- /dev/null +++ b/.github/workflows/pytest_transitions_tests_coverage.yaml @@ -0,0 +1,29 @@ +name: Pytest with Coverage + +on: + pull_request: + branches: [humble-devel] + push: + branches: [humble-devel, testing] +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10.12' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r test/requirements.txt + pip install pytest pytest-cov + + - name: Run tests with coverage + run: | + PYTHONPATH=. pytest --cov=manager/manager --cov-report=term \ No newline at end of file diff --git a/.gitignore b/.gitignore index dd22003..f95d89c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,8 @@ __pycache__/ /.idea # IDEs -.vscode \ No newline at end of file +.vscode + +# Log Files +*.log +*.coverage \ No newline at end of file diff --git a/manager/comms/new_consumer.py b/manager/comms/new_consumer.py index 2c7d215..7c04b84 100644 --- a/manager/comms/new_consumer.py +++ b/manager/comms/new_consumer.py @@ -1,3 +1,9 @@ +""" +WebSocket consumer module for the Robotics Application Manager (RAM). + +Handles client connections, message processing, and communication with manager queue. +""" + import json import logging from queue import Queue @@ -8,12 +14,15 @@ ManagerConsumerMessageException, ManagerConsumerMessage, ) -from manager.comms.websocker_server import WebsocketServer +from manager.comms.websocket_server import WebsocketServer from manager.ram_logging.log_manager import LogManager class Client: + """Represents a client connected to the WebSocket server.""" + def __init__(self, **kwargs): + """Initialize a Client instance with id, handler, and address.""" self.id = kwargs["id"] self.handler = kwargs["handler"] self.address = kwargs["address"] @@ -21,12 +30,21 @@ def __init__(self, **kwargs): class ManagerConsumer: """ - Websocket server consumer for new Robotics Application Manager aka. RAM + Websocket server consumer for new Robotics Application Manager aka: RAM. + Supports single client connection to RAM TODO: Better handling of single client connections, closing and redirecting """ def __init__(self, host, port, manager_queue: Queue): + """ + Initialize the ManagerConsumer with host, port, and manager_queue. + + Args: + host (str): The host address for the WebSocket server. + port (int): The port number for the WebSocket server. + manager_queue (Queue): The queue for communication with the manager. + """ self.host = host self.port = port self.server = WebsocketServer(host=host, port=port, loglevel=logging.INFO) @@ -37,7 +55,8 @@ def __init__(self, host, port, manager_queue: Queue): ws_logger.setLevel(logging.INFO) ws_logger.handlers.clear() ws_formatter = logging.Formatter( - "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] (%(name)s) %(message)s", + "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] " + "(%(name)s) %(message)s", "%H:%M:%S", ) ws_console_handler = logging.StreamHandler() @@ -51,11 +70,25 @@ def __init__(self, host, port, manager_queue: Queue): self.manager_queue = manager_queue def handle_client_new(self, client, server): + """ + Handle a new client connection event. + + Args: + client: The client object representing the connected client. + server: The WebSocket server instance. + """ LogManager.logger.info(f"client connected: {client}") self.client = client self.server.deny_new_connections() def handle_client_disconnect(self, client, server): + """ + Handle a client disconnection event. + + Args: + client: The client object representing the disconnected client. + server: The WebSocket server instance. + """ if client is None: return LogManager.logger.info(f"client disconnected: {client}") @@ -70,6 +103,14 @@ def handle_client_disconnect(self, client, server): self.server.allow_new_connections() def handle_message_received(self, client, server, websocket_message): + """ + Handle a message received from a client. + + Args: + client: The client object that sent the message. + server: The WebSocket server instance. + websocket_message (str): The message received from the client. + """ LogManager.logger.info( f"message received length: {len(websocket_message)} from client {client}" ) @@ -90,6 +131,15 @@ def handle_message_received(self, client, server, websocket_message): raise e def send_message(self, message_data, command=None): + """ + Send a message to the connected client. + + Args: + message_data: The message data to send, can be a ManagerConsumerMessage, + ManagerConsumerMessageException, or other data. + command (str, optional): The command associated with the message, + used if message_data is not a ManagerConsumerMessage. + """ if self.client is not None and self.server is not None: if isinstance(message_data, ManagerConsumerMessage): message = message_data @@ -103,7 +153,9 @@ def send_message(self, message_data, command=None): self.server.send_message(self.client, str(message)) def start(self): + """Start the WebSocket server in a separate thread.""" self.server.run_forever(threaded=True) def stop(self): + """Stop the WebSocket server gracefully.""" self.server.shutdown_gracefully() diff --git a/manager/comms/websocker_server.py b/manager/comms/websocket_server.py similarity index 100% rename from manager/comms/websocker_server.py rename to manager/comms/websocket_server.py diff --git a/manager/libs/launch_world_model.py b/manager/libs/launch_world_model.py index 8a017f3..33e6978 100644 --- a/manager/libs/launch_world_model.py +++ b/manager/libs/launch_world_model.py @@ -1,3 +1,5 @@ +"""Module for managing and validating robotics application launch world configs.""" + from dataclasses import dataclass from typing import Optional from pydantic import BaseModel, ValidationError @@ -6,6 +8,8 @@ class ConfigurationModel(BaseModel): + """Pydantic model for robotics application world type and launch file config.""" + type: str launch_file_path: str @@ -15,10 +19,23 @@ class ConfigurationModel(BaseModel): @dataclass class ConfigurationManager: + """Manager for robotics application configuration validation and storage.""" + configuration: ConfigurationModel @staticmethod def validate(configuration: dict): + """Validate the given configuration dictionary using the ConfigurationModel. + + Args: + configuration (dict): The configuration data to validate. + + Returns: + ConfigurationModel: The validated configuration model. + + Raises: + ValueError: If the configuration is invalid. + """ try: return ConfigurationModel(**configuration) except ValidationError as e: diff --git a/manager/manager/launcher/launcher_robot.py b/manager/manager/launcher/launcher_robot.py index 8519f67..a00c21a 100644 --- a/manager/manager/launcher/launcher_robot.py +++ b/manager/manager/launcher/launcher_robot.py @@ -1,3 +1,5 @@ +"""LauncherRobot module for managing robot launchers in different simulation worlds.""" + from typing import Optional from pydantic import BaseModel @@ -66,6 +68,8 @@ class LauncherRobot(BaseModel): + """Class for managing robot launchers in different simulation worlds.""" + type: str launch_file_path: str module: str = ".".join(__name__.split(".")[:-1]) @@ -74,7 +78,8 @@ class LauncherRobot(BaseModel): start_pose: Optional[list] = [] def run(self, start_pose=None): - if start_pose != None: + """Run the robot launcher with an optional start pose.""" + if start_pose is not None: self.start_pose = start_pose for module in worlds[self.type][str(self.ros_version)]: module["launch_file"] = self.launch_file_path @@ -83,6 +88,7 @@ def run(self, start_pose=None): LogManager.logger.info(self.launchers) def terminate(self): + """Terminate all robot launchers and clear the launchers list.""" LogManager.logger.info("Terminating robot launcher") if self.launchers: for launcher in self.launchers: @@ -90,6 +96,8 @@ def terminate(self): self.launchers = [] def launch_module(self, configuration): + """Launch a robot module based on the provided configuration.""" + def process_terminated(name, exit_code): LogManager.logger.info( f"LauncherEngine: {name} exited with code {exit_code}" @@ -98,7 +106,10 @@ def process_terminated(name, exit_code): self.terminated_callback(name, exit_code) launcher_module_name = configuration["module"] - launcher_module = f"{self.module}.launcher_{launcher_module_name}.Launcher{class_from_module(launcher_module_name)}" + launcher_module = ( + f"{self.module}.launcher_{launcher_module_name}." + f"Launcher{class_from_module(launcher_module_name)}" + ) launcher_class = get_class(launcher_module) launcher = launcher_class.from_config(launcher_class, configuration) @@ -106,9 +117,13 @@ def process_terminated(name, exit_code): return launcher def launch_command(self, configuration): + """Launch a robot command based on the provided configuration.""" pass class LauncherRobotException(Exception): + """Exception class for errors related to LauncherRobot.""" + def __init__(self, message): + """Initialize the LauncherRobotException with a message.""" super(LauncherRobotException, self).__init__(message) diff --git a/manager/manager/lint/linter.py b/manager/manager/lint/linter.py index 63c6966..da09a0b 100644 --- a/manager/manager/lint/linter.py +++ b/manager/manager/lint/linter.py @@ -1,12 +1,28 @@ +"""Linter module for evaluating and cleaning Python code using pylint.""" + import glob import re -import os +import os # noqa: F401 import subprocess import tempfile class Lint: + """Class for evaluating and cleaning Python code using pylint.""" + def clean_pylint_output(self, result, warnings=False): + """ + Clean the output from pylint. + + By removing unwanted messages and formatting errors. + + Args: + result (str): The output string from pylint. + warnings (bool): Whether to include warnings in the output. + + Returns: + str: The cleaned and formatted output. + """ # result = result.replace(os.path.basename(code_file_name), 'user_code') # Define the patterns to remove @@ -16,13 +32,15 @@ def clean_pylint_output(self, result, warnings=False): r":[0-9]+:[0-9]+: R[0-9]{4}:.*", # Refactor messages r":[0-9]+:[0-9]+: error.*EOF.*", # Unexpected EOF error r":[0-9]+:[0-9]+: E1101:.*Module 'ompl.*", # ompl E1101 error - r":[0-9]+:[0-9]+:.*value.*argument.*unbound.*method.*", # No value for argument 'self' error + ( + r":[0-9]+:[0-9]+:.*value.*argument.*unbound.*method.*" + ), # No value for argument 'self' error r":[0-9]+:[0-9]+: E1111:.*", # Assignment from no return error r":[0-9]+:[0-9]+: E1136:.*", # E1136 until issue is resolved ] if not warnings: - # Remove convention, refactor, and warning messages if warnings are not desired + # Remove convention, refactor, and warning msgs if warnings are not desired for pattern in patterns[:3]: result = re.sub(r"^[^:]*" + pattern, "", result, flags=re.MULTILINE) @@ -43,6 +61,15 @@ def clean_pylint_output(self, result, warnings=False): return result def append_rating_if_missing(self, result): + """ + Append a default rating message to the result if it is missing. + + Args: + result (str): The output string from pylint. + + Returns: + str: The result string with the rating message appended if necessary. + """ rating_message = ( "-----------------------------------\nYour code has been rated at 0.00/10" ) @@ -58,13 +85,28 @@ def append_rating_if_missing(self, result): def evaluate_code( self, code, ros_version, warnings=False, py_lint_source="pylint_checker.py" ): + """ + Evaluate the provided Python code using pylint and return the cleaned output. + + Args: + code (str): The Python code to evaluate. + ros_version (str): The ROS version to determine environment settings. + warnings (bool, optional): Whether to include warnings in the output. + Defaults to False. + py_lint_source (str, optional): The pylint checker source file. + Defaults to "pylint_checker.py". + + Returns: + str: The cleaned and formatted pylint output. + """ try: code = re.sub(r"from HAL import HAL", "from hal import HAL", code) code = re.sub(r"from GUI import GUI", "from gui import GUI", code) code = re.sub(r"from MAP import MAP", "from map import MAP", code) code = re.sub(r"\nimport cv2\n", "\nfrom cv2 import cv2\n", code) - # Avoids EOF error when iterative code is empty (which prevents other errors from showing) + # Avoids EOF error when iterative code is empty + # (which prevents other errors from showing) while_position = re.search( r"[^ ]while\s*\(\s*True\s*\)\s*:|[^ ]while\s*True\s*:|[^ ]while\s*1\s*:|[^ ]while\s*\(\s*1\s*\)\s*:", code, @@ -89,9 +131,17 @@ def evaluate_code( command = "" if "humble" in str(ros_version): - command = f"export PYTHONPATH=$PYTHONPATH:/workspace/code; python3 /RoboticsApplicationManager/manager/manager/lint/{py_lint_source}" + command = ( + f"export PYTHONPATH=$PYTHONPATH:/workspace/code; " + f"python3 /RoboticsApplicationManager/manager/manager/lint/" + f"{py_lint_source}" + ) else: - command = f"export PYTHONPATH=$PYTHONPATH:/workspace/code; python3 /RoboticsApplicationManager/manager/manager/lint/{py_lint_source}" + command = ( + f"export PYTHONPATH=$PYTHONPATH:/workspace/code; " + f"python3 /RoboticsApplicationManager/manager/manager/lint/" + f"{py_lint_source}" + ) ret = subprocess.run(command, capture_output=True, text=True, shell=True) diff --git a/manager/manager/manager.py b/manager/manager/manager.py index e9c1591..21cebc1 100644 --- a/manager/manager/manager.py +++ b/manager/manager/manager.py @@ -1,3 +1,10 @@ +"""Manager module for Robotics Application Manager. + +This module defines the Manager class and related logic for managing applications, +including launching worlds, robots, visualizations, +and handling code analysis and formatting. +""" + from __future__ import annotations import json import sys @@ -43,6 +50,14 @@ class Manager: + """ + Manager class for Robotics Application Manager. + + This class manages the lifecycle of robotics applications, + including launching worlds, robots, visualizations, + handling code analysis, formatting, and communication with clients. + """ + states = [ "idle", "connected", @@ -175,7 +190,15 @@ class Manager: ] def __init__(self, host: str, port: int): + """ + Initialize the Manager instance with the given host and port. + This method sets up the state machine, initializes the ROS version, + creates a message queue, and prepares the consumer for communication. + Parameters: + host (str): The host address to listen to. + port (int): The port number to listen to. + """ self.machine = Machine( model=self, states=Manager.states, @@ -207,27 +230,46 @@ def __init__(self, host: str, port: int): os.makedirs(binaries_dir) def state_change(self, event): + """ + Handle actions to be performed after a state change in the state machine. + + Parameters: + event: The event object associated with the state change. + """ LogManager.logger.info(f"State changed to {self.state}") if self.consumer is not None: self.consumer.send_message({"state": self.state}, command="state-changed") def update(self, data): - LogManager.logger.debug(f"Sending update to client") + """ + Send an update message to the client with the provided data. + + Parameters: + data: The data to be sent in the update message. + """ + LogManager.logger.debug("Sending update to client") if self.consumer is not None: self.consumer.send_message({"update": data}, command="update") def update_bt_studio(self, data): - LogManager.logger.debug(f"Sending update to client") + """ + Send an update message to the client for BT Studio with the provided data. + + Parameters: + data: The data to be sent in the update message. + """ + LogManager.logger.debug("Sending update to client") if self.consumer is not None: self.consumer.send_message({"update": data}, command="update") def on_connect(self, event): """ - This method is triggered when the application transitions to the 'connected' state. + Triggered when the application transitions to the 'connected' state. + It sends an introspection message to a consumer with key information. Parameters: - event (Event): The event object containing data related to the 'connect' event. + event (Event): Event object containing data related to the 'connect' event. The message sent to the consumer includes: - `robotics_backend_version`: The current Robotics Backend version. @@ -247,7 +289,8 @@ def on_connect(self, event): def on_launch_world(self, event): """ - Handles the 'launch' event, transitioning the application from 'connected' to 'ready' state. + Handle the 'launch' event, transitioning the application from 'connected' to 'ready' state. + This method initializes the launch process based on the provided configuration. During the launch process, it validates and processes the configuration data received from the event. @@ -312,7 +355,14 @@ def on_launch_world(self, event): LogManager.logger.info("Launch transition finished") def prepare_custom_universe(self, cfg_dict): + """ + Prepare and extract a custom universe from a base64-encoded zip file. + + Then build it in the workspace. + Parameters: + cfg_dict (dict): Config dictionary containing the universe name and zip data + """ # Unzip the app if cfg_dict["zip"].startswith("data:"): _, _, zip_file = cfg_dict["zip"].partition("base64,") @@ -358,7 +408,9 @@ def on_prepare_tools(self, event): def on_style_check_application(self, event): """ - Handles the 'style_check' event, does not change the state and returns the current state. + Handle the 'style_check' event. + + Does not change the state and returns the current state. It uses the linter to check if the style of the code is correct, if there are errors it writes them in all the consoles and raises the errors. @@ -371,7 +423,7 @@ def on_style_check_application(self, event): """ def find_docker_console(): - """Search console in docker different of /dev/pts/0""" + """Search console in docker different of /dev/pts/0 .""" pts_consoles = [ f"/dev/pts/{dev}" for dev in os.listdir("/dev/pts/") if dev.isdigit() ] @@ -425,8 +477,9 @@ def find_docker_console(): def on_code_analysis(self, event): """ - Handles the 'code_analysis' event, does not change the state and returns the current state. + Handle the 'code_analysis' event. + Does not change the state and returns the current state. It uses pylint to check for the errors and warnings in the code. Parameters: @@ -435,7 +488,6 @@ def on_code_analysis(self, event): Returns: Sends the output of the pylint command in the code-analysis event for the frontend. """ - # Extract app config app_cfg = event.kwargs.get("data", {}) code_string = app_cfg["code"] @@ -490,17 +542,18 @@ def on_code_analysis(self, event): def on_code_format(self, event): """ - Handles the 'code_format' event, does not change the state and returns the current state. + Handle the 'code_format' event. + Does not change the state and returns the current state. It uses the black formatter to format the user code. Parameters: event (Event): Has the fields code (user code). Returns: - Sends the output of the black format in the code-format event for the frontend. + Sends the output of the black format in + the code-format event for the frontend. """ - # Extract app config app_cfg = event.kwargs.get("data", {}) code = app_cfg["code"] @@ -524,17 +577,19 @@ def on_code_format(self, event): def on_code_autocomplete(self, event): """ - Handles the 'code_autocomplete' event, does not change the state and returns the current state. + Handle the 'code_autocomplete' event. - It uses jedi to find the possible autocompletions in the user code give the cursor position. + Does not change the state and returns the current state. + It uses jedi to find the possible autocompletions in the user code + given the cursor position. Parameters: event (Event): Has the fields code (user code), line and col . Returns: - Sends the possible completions in the code-autocomplete event for the frontend. + Sends the possible completions in + the code-autocomplete event for the frontend. """ - # Extract app config app_cfg = event.kwargs.get("data", {}) code = app_cfg["code"] @@ -568,8 +623,19 @@ def on_code_autocomplete(self, event): LogManager.logger.info("Error formating code" + str(e)) def on_run_application(self, event): + """ + Handle the 'run_application' event. + + This method manages the process of running the user application, + including preparing the code, handling console output, + and launching the application process. + + Parameters: + event: The event object containing application configuration and code data. + """ + def find_docker_console(): - """Search console in docker different of /dev/pts/0""" + """Search console in docker different of /dev/pts/0 .""" pts_consoles = [ f"/dev/pts/{dev}" for dev in os.listdir("/dev/pts/") if dev.isdigit() ] @@ -654,7 +720,10 @@ def find_docker_console(): def terminate_harmonic_processes(self): """ - Terminate all processes within the Docker container whose command line contains 'gz' or 'launch'. + Terminate all Harmonic processes in the container. + + Terminate all processes in the container + whose command line contains 'gz' or 'launch'. """ LogManager.logger.info("Terminate Harmonic process") keywords = ["gz", "launch"] @@ -699,7 +768,15 @@ def terminate_harmonic_processes(self): ) def on_terminate_application(self, event): + """ + Handle the 'terminate_application' event. + Terminates the currently running application process, + pauses and resets the simulation if applicable. + + Parameters: + event: The event object associated with the termination request. + """ if self.application_process: try: stop_process_and_children(self.application_process) @@ -716,14 +793,28 @@ def on_terminate_tools(self, event): self.terminate_harmonic_processes() def on_terminate_universe(self, event): + """ + Handle the 'terminate_universe' event. + + Terminates the world and robot launchers if they exist + and terminates related Harmonic processes. - if self.world_launcher != None: + Parameters: + event: The event object associated with the termination request. + """ + if self.world_launcher is not None: self.world_launcher.terminate() - if self.robot_launcher != None: + if self.robot_launcher is not None: self.robot_launcher.terminate() self.terminate_harmonic_processes() def on_disconnect(self, event): + """ + Handle the 'disconnect' event. + + This method stops all running processes, + terminates launchers, and restarts the script. + """ try: self.consumer.stop() @@ -786,6 +877,12 @@ def on_pause(self, msg): self.reset_sim() def on_resume(self, msg): + """ + Resume the application process if it exists, otherwise reset the simulation. + + Parameters: + msg: The event or message triggering the resume action. + """ if self.application_process is not None: try: proc = psutil.Process(self.application_process.pid) @@ -806,6 +903,13 @@ def unpause_sim(self): self.tools_launcher.unpause() def reset_sim(self): + """ + Reset the simulation environment and relaunch the robot if applicable. + + This method terminates the robot launcher, resets the simulation state using + the appropriate ROS or Gazebo services based on the visualization type, + and relaunches the robot if a launcher is available. + """ if self.robot_launcher: self.robot_launcher.terminate() @@ -819,8 +923,10 @@ def reset_sim(self): def start(self): """ - Starts the RAM - RAM must be run in main thread to be able to handle signaling other processes, for instance ROS launcher. + Start the RAM. + + RAM must be run in main thread to be able to handle signaling other processes, + for instance ROS launcher. """ LogManager.logger.info( f"Starting RAM consumer in {self.consumer.server}:{self.consumer.port}" diff --git a/manager/manager/vnc/vnc_server.py b/manager/manager/vnc/vnc_server.py index 307d6bf..6c25ae4 100755 --- a/manager/manager/vnc/vnc_server.py +++ b/manager/manager/vnc/vnc_server.py @@ -1,3 +1,9 @@ +"""VNC server management module for RoboticsApplicationManager. + +Provides classes and functions to start and manage VNC and noVNC servers, +including GPU-accelerated sessions and desktop icon creation. +""" + import time import socket from manager.manager.docker_thread.docker_thread import DockerThread @@ -8,28 +14,49 @@ class Vnc_server: + """Class to manage VNC and noVNC server sessions for RoboticsApplicationManager.""" + threads: List[Any] = [] running: bool = False def start_vnc(self, display, internal_port, external_port): + """Start a VNC and noVNC server session. + + Args: + display (str): X display identifier. + internal_port (int): Port for the VNC server. + external_port (int): Port for the noVNC server. + """ # Start X server in display - xserver_cmd = f"/usr/bin/Xorg -quiet -noreset +extension GLX +extension RANDR +extension RENDER -logfile ./xdummy.log -config ./xorg.conf {display}" + xserver_cmd = ( + f"/usr/bin/Xorg -quiet -noreset +extension GLX +extension RANDR " + f"+extension RENDER -logfile ./xdummy.log -config ./xorg.conf {display}" + ) xserver_thread = DockerThread(xserver_cmd) xserver_thread.start() self.threads.append(xserver_thread) wait_for_xserver(display) # Start VNC server without password, forever running in background - x11vnc_cmd = f"x11vnc -repeat -quiet -display {display} -nopw -forever -xkb -bg -rfbport {internal_port}" + x11vnc_cmd = ( + f"x11vnc -repeat -quiet -display {display} " + f"-nopw -forever -xkb -bg -rfbport {internal_port}" + ) x11vnc_thread = DockerThread(x11vnc_cmd) x11vnc_thread.start() self.threads.append(x11vnc_thread) # Start noVNC with default port 6080 listening to VNC server on 5900 if self.get_ros_version() == "2": - novnc_cmd = f"/noVNC/utils/novnc_proxy --listen {external_port} --vnc localhost:{internal_port}" + novnc_cmd = ( + f"/noVNC/utils/novnc_proxy --listen {external_port} " + f"--vnc localhost:{internal_port}" + ) else: - novnc_cmd = f"/noVNC/utils/launch.sh --listen {external_port} --vnc localhost:{internal_port}" + novnc_cmd = ( + f"/noVNC/utils/launch.sh --listen {external_port} " + f"--vnc localhost:{internal_port}" + ) novnc_thread = DockerThread(novnc_cmd) novnc_thread.start() @@ -40,8 +67,22 @@ def start_vnc(self, display, internal_port, external_port): self.wait_for_port("localhost", external_port) def start_vnc_gpu(self, display, internal_port, external_port, dri_path): + """Start a GPU-accelerated VNC and noVNC server session. + + Args: + display (str): X display identifier. + internal_port (int): Port for the VNC server. + external_port (int): Port for the noVNC server. + dri_path (str): Path to the GPU device for hardware acceleration. + """ # Start X and VNC servers - turbovnc_cmd = f"export VGL_DISPLAY={dri_path} && export TVNC_WM=startlxde && /opt/TurboVNC/bin/vncserver {display} -geometry '1920x1080' -vgl -noreset -SecurityTypes None -rfbport {internal_port}" + turbovnc_cmd = ( + f"export VGL_DISPLAY={dri_path} && " + f"export TVNC_WM=startlxde && " + f"/opt/TurboVNC/bin/vncserver {display} " + f"-geometry '1920x1080' -vgl -noreset " + f"-SecurityTypes None -rfbport {internal_port}" + ) turbovnc_thread = DockerThread(turbovnc_cmd) turbovnc_thread.start() self.threads.append(turbovnc_thread) @@ -49,9 +90,15 @@ def start_vnc_gpu(self, display, internal_port, external_port, dri_path): # Start noVNC with default port 6080 listening to VNC server on 5900 if self.get_ros_version() == "2": - novnc_cmd = f"/noVNC/utils/novnc_proxy --listen {external_port} --vnc localhost:{internal_port}" + novnc_cmd = ( + f"/noVNC/utils/novnc_proxy --listen {external_port} " + f"--vnc localhost:{internal_port}" + ) else: - novnc_cmd = f"/noVNC/utils/launch.sh --listen {external_port} --vnc localhost:{internal_port}" + novnc_cmd = ( + f"/noVNC/utils/launch.sh --listen {external_port} " + f"--vnc localhost:{internal_port}" + ) novnc_thread = DockerThread(novnc_cmd) novnc_thread.start() @@ -65,11 +112,24 @@ def start_vnc_gpu(self, display, internal_port, external_port, dri_path): self.create_gzclient_icon() def wait_for_port(self, host, port, timeout=20): + """Wait for a TCP port on a host to become available within a timeout period. + + Args: + host (str): Hostname or IP address to check. + port (int): Port number to check. + timeout (int, optional): Maximum time to wait in seconds. Defaults to 20. + + Raises: + TimeoutError: If the port does not become available within the timeout. + """ start_time = time.time() while True: if time.time() - start_time > timeout: raise TimeoutError( - f"Port {port} on {host} didn't become available within {timeout} seconds." + ( + f"Port {port} on {host} didn't become available " + f"within {timeout} seconds." + ) ) try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: @@ -80,9 +140,15 @@ def wait_for_port(self, host, port, timeout=20): time.sleep(1) def is_running(self): + """Check if the VNC server is currently running. + + Returns: + bool: True if running, False otherwise. + """ return self.running def terminate(self): + """Terminate all running threads and stop the VNC server.""" for thread in self.threads: if thread.is_alive(): thread.terminate() @@ -91,10 +157,16 @@ def terminate(self): self.running = False def get_ros_version(self): + """Get the current ROS version from the environment. + + Returns: + str: The ROS version as a string. + """ output = subprocess.check_output(["bash", "-c", "echo $ROS_VERSION"]) return output.decode("utf-8").strip() def create_desktop_icon(self): + """Create a desktop icon to launch a terminal application.""" try: desktop_dir = os.path.expanduser("~/Desktop") if not os.path.exists(desktop_dir): @@ -116,6 +188,7 @@ def create_desktop_icon(self): print(err) def create_gzclient_icon(self): + """Create a desktop icon to launch the Gazebo client application.""" desktop_dir = os.path.expanduser("~/Desktop") if not os.path.exists(desktop_dir): os.makedirs(desktop_dir) diff --git a/manager/ram_logging/log_manager.py b/manager/ram_logging/log_manager.py index ed97894..ad7d149 100644 --- a/manager/ram_logging/log_manager.py +++ b/manager/ram_logging/log_manager.py @@ -1,3 +1,9 @@ +""" +LogManager singleton for managing application logging. + +Has support for colored and length-limited log formatting. +""" + import logging import os @@ -6,6 +12,8 @@ # Clase para un Formatter personalizado que aƱade colores class ColorFormatter(logging.Formatter): + """Custom formatter that adds colors to log messages based on their log level.""" + # Diccionario de colores para diferentes niveles de log COLORS = { logging.ERROR: "\033[91m", # Rojo para errores @@ -16,6 +24,7 @@ class ColorFormatter(logging.Formatter): RESET = "\033[0m" # Resetear a color por defecto def format(self, record): + """Format the log record with color based on its log level.""" color = self.COLORS.get(record.levelno) message = super().format(record) if color: @@ -24,6 +33,8 @@ def format(self, record): class MaxLengthColorFormatter(logging.Formatter): + """Custom formatter that adds colors and limits log message length.""" + # Diccionario de colores para diferentes niveles de log COLORS = { logging.ERROR: "\033[91m", # Rojo para errores @@ -35,6 +46,7 @@ class MaxLengthColorFormatter(logging.Formatter): MAX_LENGTH = 1000 def format(self, record): + """Format the log record with color and limit its length.""" color = self.COLORS.get(record.levelno) msg = super().format(record) if color: @@ -50,13 +62,23 @@ def format(self, record): @singleton class LogManager: + """Singleton class for managing application logging.""" + def __init__(self): + """ + Initialize the LogManager. + + Sets up file and console logging handlers with appropriate formatters. + """ log_path = os.getcwd() log_level = logging.INFO log_to_console = True self.log_file = os.path.join(log_path, "ram.log") - log_format = "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] (%(name)s) %(message)s" + log_format = ( + "%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] " + "(%(name)s) %(message)s" + ) date_format = "%H:%M:%S" self.log_formatter = logging.Formatter(log_format, date_format) self.color_formatter = ColorFormatter( diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..037d43f --- /dev/null +++ b/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +addopts = -v +testpaths = test +python_files = test_*.py \ No newline at end of file diff --git a/test/README.md b/test/README.md new file mode 100644 index 0000000..c9a8126 --- /dev/null +++ b/test/README.md @@ -0,0 +1,15 @@ +# Testing RAM + +### Run All Tests + +- Navigate to root directory + +```bash +cd RoboticsApplicationManager +``` + +- Run PyTest + +```bash +PYTHONPATH=. pytest +``` \ No newline at end of file diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 0000000..0c6dbcd --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,156 @@ +"""Utility functions to transition the Manager to a specific state for testing.""" + +import pytest +from manager.manager.manager import Manager + + +class DummyServer: + """A dummy server class to simulate server behavior for testing purposes.""" + + def __init__(self, host, port, loglevel): + """Initialize the DummyServer with host, port, and loglevel.""" + self.host = host + self.port = port + self.loglevel = loglevel + + def set_fn_new_client(self, fn): + """Set the function to be called when a new client connects.""" + pass + + def set_fn_client_left(self, fn): + """Set the function to be called when a client leaves.""" + pass + + def set_fn_message_received(self, fn): + """Set the function to be called when a message is received.""" + pass + + def deny_new_connections(self): + """Simulate denying new connections (dummy implementation).""" + pass + + def allow_new_connections(self): + """Simulate allowing new connections (dummy implementation).""" + pass + + def send_message(self, client, message): + """Simulate sending a message to a client (dummy implementation).""" + pass + + def run_forever(self, threaded=True): + """Simulate running the server forever (dummy implementation).""" + pass + + def shutdown_gracefully(self): + """Simulate graceful shutdown of the server (dummy implementation).""" + pass + + +class DummyConsumer: + """A dummy consumer to capture messages sent by the Manager.""" + + def __init__(self, host=None, port=None, queue=None): + """ + Initialize the DummyConsumer with empty message storage. + + This constructor sets up the messages list and last_message attribute. + """ + self.messages = [] + self.last_message = None + + def send_message(self, *args, **kwargs): + """ + Capture and store a message sent by the Manager. + + Stores the message arguments and updates the last_message attribute. + """ + self.messages.append((args, kwargs)) + self.last_message = (args, kwargs) + + def stop(self): + """Simulate consumer stopping.""" + pass + + +@pytest.fixture +def manager(monkeypatch): + """Fixture to provide a Manager instance with patched dependencies for testing.""" + + monkeypatch.setattr("manager.comms.websocket_server.WebsocketServer", DummyServer) + monkeypatch.setattr("manager.manager.manager.ManagerConsumer", DummyConsumer) + + # Patch subprocess.check_output for ROS_DISTRO and IMAGE_TAG + def fake_check_output(cmd, *a, **k): + if "ROS_DISTRO" in cmd[-1]: + return b"humble" + if "IMAGE_TAG" in cmd[-1]: + return b"test_image_tag" + return b"" + + monkeypatch.setattr("subprocess.check_output", fake_check_output) + + # Patch check_gpu_acceleration where it is used + monkeypatch.setattr( + "manager.manager.manager.check_gpu_acceleration", lambda x=None: "OFF" + ) + + def dummy_run(self, start_pose=None): + print("run around") + + monkeypatch.setattr( + "manager.manager.launcher.launcher_robot.LauncherRobot.run", dummy_run + ) + + # Patch os.makedirs and os.path.isdir to avoid real FS operations + monkeypatch.setattr("os.makedirs", lambda path, exist_ok=False: None) + monkeypatch.setattr("os.path.isdir", lambda path: True) + + # Patch LauncherWorld to avoid launching real processes + class DummyLauncherWorld: + def __init__(self, *a, **k): + self.launched = False + + def launch(self): + self.launched = True + + def run(self): + self.launched = True + # Simulate running the world + return + + def terminate(self): + pass + + monkeypatch.setattr("manager.manager.manager.LauncherWorld", DummyLauncherWorld) + + class DummyFileWatchdog: + def __init__(self, path, update_callback): + self.path = path + self.update_callback = update_callback + self.started = False + + def start(self): + self.started = True + + def stop(self): + self.started = False + + class DummyToolsLauncher: + def __init__(self, *args, **kwargs): + self.launchers = [] + + def run(self, consumer): + # Simulate running the tools launcher + return + + def terminate(self): + pass + + monkeypatch.setattr("manager.manager.manager.LauncherTools", DummyToolsLauncher) + # Deprecated + # monkeypatch.setattr("manager.manager.manager.Server", DummyServer) + # monkeypatch.setattr("manager.manager.manager.FileWatchdog", DummyFileWatchdog) + + # Setup Manager with dummy consumer + m = Manager(host="localhost", port=12345) + return m diff --git a/test/requirements.txt b/test/requirements.txt new file mode 100644 index 0000000..34fc335 --- /dev/null +++ b/test/requirements.txt @@ -0,0 +1,39 @@ +annotated-types==0.7.0 +asgiref==3.9.1 +black==22.3.0 +click==8.2.1 +coverage==7.9.2 +Django==5.2.4 +djangorestframework==3.16.0 +exceptiongroup==1.3.0 +flake8==7.3.0 +flake8-docstrings==1.7.0 +iniconfig==2.1.0 +jedi==0.19.2 +lark==1.2.2 +mccabe==0.7.0 +mypy_extensions==1.1.0 +packaging==25.0 +parso==0.8.4 +pathspec==0.12.1 +platformdirs==4.3.8 +pluggy==1.6.0 +psutil==7.0.0 +pycodestyle==2.14.0 +pydantic==2.11.7 +pydantic_core==2.33.2 +pydocstyle==6.3.0 +pyflakes==3.4.0 +Pygments==2.19.2 +pytest==8.4.1 +pytest-cov==6.2.1 +PyYAML==6.0.2 +six==1.17.0 +snowballstemmer==3.0.1 +sqlparse==0.5.3 +tomli==2.2.1 +transitions==0.9.3 +typing-inspection==0.4.1 +typing_extensions==4.14.1 +watchdog==6.0.0 +websocket-server==0.6.4 diff --git a/test/test_connect_disconnect_transitions.py b/test/test_connect_disconnect_transitions.py new file mode 100644 index 0000000..f9d6729 --- /dev/null +++ b/test/test_connect_disconnect_transitions.py @@ -0,0 +1,99 @@ +"""Tests for transitioning Manager from 'idle' to 'connected' state.""" + +import pytest +from test_utils import setup_manager_to_world_ready +from test_utils import setup_manager_to_application_running + + +class DummyProc: + """Dummy process class for testing suspend and resume methods.""" + + def suspend(self): + """Simulate suspending the process.""" + pass + + def resume(self): + """Simulate resuming the process.""" + pass + + +def test_idle_to_connected(manager): + """Test transitioning Manager from 'idle' to 'connected' state.""" + # Initial state should be 'idle' + assert manager.state == "idle" + # Simulate the 'connect' event + manager.trigger("connect", event=None) + # State should now be 'connected' + assert manager.state == "connected" + # Check that the consumer received the expected message + msgs = manager.consumer.messages + on_connect_msg = msgs[0][0] + state_change_msg = msgs[1] + # print(msgs) + # Verify the first message (on connect) + assert on_connect_msg[0]["robotics_backend_version"] == b"test_image_tag" + assert on_connect_msg[0]["ros_version"] == b"humble" + assert on_connect_msg[0]["gpu_avaliable"] == "OFF" + + # Verify the state change message + assert state_change_msg[0][0]["state"] == "connected" + assert state_change_msg[1]["command"] == "state-changed" + + +def test_idle_to_connected_with_exception(manager): + """Test transitioning Manager from 'idle' to 'connected' state with an exception.""" + # Simulate an exception during the connection process + manager.consumer.send_message = lambda *args, **kwargs: ( + 1 / 0 # This will raise a ZeroDivisionError + ) + + with pytest.raises(ZeroDivisionError): + manager.trigger("connect", event=None) + + # State should still be 'idle' after the exception + assert manager.state == "idle" + + # Check that no messages were sent to the consumer + assert not manager.consumer.messages + + +def test_disconnect_valid(manager, monkeypatch): + """Test the valid disconnect transition in the Manager.""" + # Ensure the manager is in a state where it can disconnect + setup_manager_to_world_ready(manager, monkeypatch) + + # Mock needed methods and attributes + manager.on_disconnect = lambda event: None + + # Trigger the disconnect transition + manager.trigger("disconnect") + + # Check that the state has changed to 'idle' + assert manager.state == "idle" + + +def test_disconnect_invalid_process(manager, monkeypatch): + """ + Test the invalid disconnect transition in the Manager. + + Tests that the disconnect raises an Exception when the process + cannot be disconnected properly. + """ + # Ensure the manager is in a state where it can disconnect + setup_manager_to_application_running(manager, monkeypatch) + # Mock needed methods and attributes + monkeypatch.setattr("os.execl", lambda *args: None) + monkeypatch.setattr( + "manager.manager.manager.stop_process_and_children", + lambda proc: (_ for _ in ()).throw(Exception("Process cannot be stopped")), + ) + manager.application_process = DummyProc() + manager.terminate_harmonic_processes = lambda: None + + # Trigger the disconnect transition + manager.trigger("disconnect") + + # Optional: can test that exception has been logged + + # Check that the state changed + assert manager.state == "idle" diff --git a/test/test_connected_to_world_ready.py b/test/test_connected_to_world_ready.py new file mode 100644 index 0000000..1ee88e3 --- /dev/null +++ b/test/test_connected_to_world_ready.py @@ -0,0 +1,156 @@ +"""Tests for transitioning Manager from 'connected' to 'world_ready' state.""" + +import pytest +from manager.libs.launch_world_model import ConfigurationModel +from test_utils import setup_manager_to_connected +from manager.manager.launcher.launcher_robot import worlds + +valid_world_cfg = ConfigurationModel( + type=next(iter(worlds)), launch_file_path="/path/to/launch_file.launch" +).model_dump() + +valid_robot_cfg = { + "world": None, # No robot specified + "type": next(iter(worlds)), # Use the first world type + "start_pose": [0, 0, 0, 0, 0, 0], + "launch_file_path": "/path/to/robot_launch_file.launch", +} + +invalid_world_cfg = { + "world": "bad_world", + "type": next(iter(worlds)), + "launch_file_path": None, # No launch file specified +} # missing launch_file_path + + +def test_connected_to_world_ready(manager, monkeypatch): + """Test transitioning Manager from 'connected' to 'world_ready' state.""" + # Initial state should be 'connected' + setup_manager_to_connected(manager, monkeypatch) + + event_data = {"world": valid_world_cfg, "robot": valid_robot_cfg} + manager.trigger("launch_world", data=event_data) + + # State should now be 'world_ready' + assert manager.state == "world_ready" + + # Check that the consumer received the expected state change message + msgs = manager.consumer.messages + state_change_msgs = [ + msg for msg in msgs if msg[1].get("command") == "state-changed" + ] + assert state_change_msgs + assert state_change_msgs[-1][0][0]["state"] == "world_ready" + + +def test_launch_world_with_invalid_world_config(manager, monkeypatch): + """Test that launching world with invalid world config logs error.""" + # Initial state should be 'connected' + setup_manager_to_connected(manager, monkeypatch) + + # Patch ConfigurationManager.validate to simulate a failed validation + # but still return a dummy config + class DummyConfig: + def model_dump(self): + return {} + + def fake_validate(cfg): + # Simulate logging error, but return a dummy config to avoid UnboundLocalError + return DummyConfig() + + def fake_prepare_custom_universe(cfg): + raise ValueError("Invalid world configuration") + + monkeypatch.setattr( + "manager.libs.launch_world_model.ConfigurationManager.validate", fake_validate + ) + manager.prepare_custom_universe = fake_prepare_custom_universe + + event_data = {"world": invalid_world_cfg, "robot": valid_robot_cfg} + + with pytest.raises(ValueError): + manager.trigger("launch_world", data=event_data) + # Assert that world_launcher is created but has no useful config + assert manager.world_launcher is not None + assert ( + getattr(manager.world_launcher, "world", None) is None + or manager.world_launcher.world == "" + ) + + +def test_launch_world_with_invalid_robot_config(manager, monkeypatch): + """Test that launching world with invalid robot config logs error.""" + # Initial state should be 'connected' + setup_manager_to_connected(manager, monkeypatch) + + # Patch ConfigurationManager.validate to simulate a failed validation + # but still return a dummy config + class DummyConfig: + def model_dump(self): + return {} + + def fake_validate(cfg): + # Simulate logging error, but return a dummy config to avoid UnboundLocalError + return DummyConfig() + + monkeypatch.setattr( + "manager.libs.launch_world_model.ConfigurationManager.validate", fake_validate + ) + + invalid_robot_cfg = {"name": "", "type": ""} # Invalid robot config + event_data = { + "world": valid_world_cfg, + "robot": invalid_robot_cfg, + } + + with pytest.raises(ValueError): + # This should raise an error due to invalid robot config + manager.trigger("launch_world", data=event_data) + + # Assert that robot_launcher is not created + assert manager.robot_launcher is None + assert ( + getattr(manager.robot_launcher, "robot_config", None) is None + or manager.robot_launcher.robot_config == {} + ) + + +def test_launch_world_with_no_world_config(manager, monkeypatch): + """Test that launching world with no world config does not raise an error.""" + # Initial state should be 'connected' + setup_manager_to_connected(manager, monkeypatch) + + event_data = { + "world": { + "world": None, # No world specified + "launch_file_path": None, # No launch file specified + "type": None, + }, # No world specified + "robot": valid_robot_cfg, + } + + manager.trigger("launch_world", data=event_data) + + # State should now be 'world_ready' + assert manager.state == "world_ready" + assert manager.world_launcher is None + + +def test_launch_world_with_no_robot_config(manager, monkeypatch): + """Test that launching world with no robot config does not raise an error.""" + # Initial state should be 'connected' + setup_manager_to_connected(manager, monkeypatch) + + event_data = { + "world": valid_world_cfg, + "robot": { + "world": None, + "robot_config": None, + "type": None, + }, # No robot specified + } + manager.trigger("launch_world", data=event_data) + + # State should now be 'world_ready' + assert manager.state == "world_ready" + assert manager.robot_launcher is None diff --git a/test/test_resume_and_pause_transitions.py b/test/test_resume_and_pause_transitions.py new file mode 100644 index 0000000..83cfbbb --- /dev/null +++ b/test/test_resume_and_pause_transitions.py @@ -0,0 +1,86 @@ +"""Tests for resume and pause transitions in the Manager.""" + +import pytest +from transitions import MachineError +from test_utils import setup_manager_to_application_running +from test_utils import setup_manager_to_tools_ready + + +class DummyProc: + """Dummy process class for testing suspend and resume methods.""" + + def suspend(self): + """Simulate suspending the process.""" + pass + + def resume(self): + """Simulate resuming the process.""" + pass + + +def test_pause_transition_valid(manager, monkeypatch): + """Test the valid pause transition in the Manager.""" + # Ensure the manager is in a state where it can pause + setup_manager_to_application_running(manager, monkeypatch) + # Mock needed methods and attributes + monkeypatch.setattr("psutil.Process", lambda pid: DummyProc()) + + manager.pause_sim = lambda: None + + # Trigger the pause transition + manager.trigger("pause") + # Check that the state has changed to 'paused' + assert manager.state == "paused" + # Verify that the consumer received the correct message + assert manager.consumer.last_message[0][0]["state"] == "paused" + + +def test_pause_transition_invalid_machine_error(manager, monkeypatch): + """Test the invalid pause transition in the Manager.""" + # Ensure the manager is in a state where it can pause + setup_manager_to_tools_ready(manager, monkeypatch) + + # Mock needed methods and attributes + monkeypatch.setattr("psutil.Process", lambda pid: DummyProc()) + + # Trigger the pause transition + with pytest.raises(MachineError): + manager.trigger("pause") + # Check that the state has changed to 'paused' + assert manager.state == "tools_ready" + + +def test_resume_transition_valid(manager, monkeypatch): + """Test the valid resume transition in the Manager.""" + # Ensure the manager is in a paused state + setup_manager_to_application_running(manager, monkeypatch) + + # Mock needed methods and attributes + monkeypatch.setattr("psutil.Process", lambda pid: DummyProc()) + manager.pause_sim = lambda: None + + # Move to 'paused' state first + manager.trigger("pause") + assert manager.state == "paused" + + # Trigger the resume transition + manager.trigger("resume") + # Check that the state has changed to 'application_running' + assert manager.state == "application_running" + # Verify that the consumer received the correct message + assert manager.consumer.last_message[0][0]["state"] == "application_running" + + +def test_resume_transition_invalid(manager, monkeypatch): + """Test the invalid resume transition in the Manager.""" + # Ensure the manager is in a state where it can resume + setup_manager_to_application_running(manager, monkeypatch) + + # Mock needed methods and attributes + monkeypatch.setattr("psutil.Process", lambda pid: DummyProc()) + + # Trigger the resume transition + with pytest.raises(MachineError): + manager.trigger("resume") + # Check that the state has not changed + assert manager.state == "application_running" diff --git a/test/test_terminate_transitions.py b/test/test_terminate_transitions.py new file mode 100644 index 0000000..34dbf12 --- /dev/null +++ b/test/test_terminate_transitions.py @@ -0,0 +1,148 @@ +"""Tests for resume and pause transitions in the Manager.""" + +import pytest +from transitions import MachineError +from conftest import DummyServer +from test_utils import setup_manager_to_application_running +from test_utils import setup_manager_to_world_ready +from test_utils import setup_manager_to_tools_ready + + +class DummyProc: + """Dummy process class for testing suspend and resume methods.""" + + def suspend(self): + """Simulate suspending the process.""" + pass + + def resume(self): + """Simulate resuming the process.""" + pass + + +class DummyToolsLauncher: + """Dummy class for testing visualization launching.""" + + def launch(self): + """Simulate launching the visualization.""" + pass + + def stop(self): + """Simulate stopping the visualization.""" + pass + + def terminate(self): + """Simulate terminating the visualization.""" + pass + + +def test_terminate_application_valid(manager, monkeypatch): + """Test the valid terminate application transition in the Manager.""" + # Ensure the manager is in a state where it can terminate + setup_manager_to_application_running(manager, monkeypatch) + # Mock needed methods and attributes + + def dummy_stop_process_and_children(proc): + pass + + monkeypatch.setattr( + "manager.manager.manager.stop_process_and_children", + dummy_stop_process_and_children, + ) + manager.pause_sim = lambda: None + manager.reset_sim = lambda: None + + # Trigger the terminate transition + manager.trigger("terminate_application") + # Check that the state has changed to 'visualization_ready' + assert manager.state == "tools_ready" + + +def test_terminate_application_invalid_machine_error(manager, monkeypatch): + """ + Test the valid terminate application transition in the Manager. + + Ensure that the transition raises an error when executed from an invalid state. + """ + # Ensure the manager is in a state where it can terminate + setup_manager_to_world_ready(manager, monkeypatch) + # Mock needed methods and attributes + + def dummy_stop_process_and_children(proc): + pass + + monkeypatch.setattr( + "manager.manager.manager.stop_process_and_children", + dummy_stop_process_and_children, + ) + manager.pause_sim = lambda: None + manager.reset_sim = lambda: None + + # Trigger the terminate transition + with pytest.raises(MachineError): + manager.trigger("terminate_application") + # Check that the state has not changed + assert manager.state == "world_ready" + + +def test_terminate_tools_valid(manager, monkeypatch): + """Test the valid terminate visualization transition in the Manager.""" + # Ensure the manager is in a state where it can stop + setup_manager_to_tools_ready(manager, monkeypatch) + # Mock needed methods and attributes + manager.visualization_launcher = DummyToolsLauncher() + manager.terminate_harmonic_processes = lambda: None + + # Trigger the stop transition + manager.trigger("terminate_tools") + # Check that the state has changed to 'world_ready' + assert manager.state == "world_ready" + + +def test_terminate_tools_invalid_machine_error(manager, monkeypatch): + """ + Test the invalid terminate visualization transition in the Manager. + + Ensure that the transition raises an error when executed from an invalid state. + """ + monkeypatch.setattr( + "manager.libs.applications.compatibility.server.Server", DummyServer + ) + # Ensure the manager is in a state where it can stop + setup_manager_to_application_running(manager, monkeypatch) + + # Trigger the stop transition + with pytest.raises(MachineError): + manager.trigger("terminate_tools") + # Check that the state has not changed + assert manager.state == "application_running" + + +def test_terminate_universe_valid(manager, monkeypatch): + """Test the valid terminate_universe transition in the Manager.""" + # Ensure the manager is in a state where it can stop + setup_manager_to_world_ready(manager, monkeypatch) + # Mock needed methods and attributes + manager.visualization_launcher = DummyToolsLauncher() + manager.terminate_harmonic_processes = lambda: None + + # Trigger the stop transition + manager.trigger("terminate_universe") + # Check that the state has changed to 'connected' + assert manager.state == "connected" + + +def test_terminate_universe_invalid_machine_error(manager, monkeypatch): + """ + Test the invalid terminate_universe transition in the Manager. + + Ensure that the transition raises an error when executed from an invalid state. + """ + # Ensure the manager is in a state where it can stop + setup_manager_to_application_running(manager, monkeypatch) + + # Trigger the stop transition + with pytest.raises(MachineError): + manager.trigger("terminate_universe") + # Check that the state has not changed + assert manager.state == "application_running" diff --git a/test/test_tools_ready_to_application_running.py b/test/test_tools_ready_to_application_running.py new file mode 100644 index 0000000..29231e9 --- /dev/null +++ b/test/test_tools_ready_to_application_running.py @@ -0,0 +1,163 @@ +"""Tests for transitioning Manager from 'connected' to 'world_ready' state.""" + +import io +import pytest +import builtins +from test_utils import setup_manager_to_tools_ready + +valid_app_data = { + "entrypoint": "main.py", + "linter": "pylint", + "code": "data:base64,ZmFrZV9jb2Rl", +} + + +def test_tools_ready_to_application_running_valid(manager, monkeypatch): + """ + Test transitioning from 'tools_ready' to 'application_running' state. + + This test verifies the state transitions in case of valid values. + """ + setup_manager_to_tools_ready(manager, monkeypatch) + + class DummyProc: + def __init__(self): + self.pid = 123 + + def kill(self): + pass + + def suspend(self): + pass + + original_open = builtins.open + + def fake_open(file, mode="r", *args, **kwargs): + if file == "/workspace/code/app.zip": + if "w" in mode: + return io.BytesIO() + elif "r" in mode: + return io.BytesIO(b"fake zip content") + return original_open(file, mode, *args, **kwargs) + + # Mock file system and subprocess operations + monkeypatch.setattr("os.path.isfile", lambda path: True) + monkeypatch.setattr("os.listdir", lambda path: ["0", "1", "2"]) + monkeypatch.setattr("builtins.open", fake_open) + monkeypatch.setattr("subprocess.Popen", lambda *a, **k: DummyProc()) + monkeypatch.setattr("os.mkdir", lambda path: None) + monkeypatch.setattr("os.path.exists", lambda path: True) + monkeypatch.setattr("shutil.rmtree", lambda path: None) + monkeypatch.setattr( + "zipfile.ZipFile", + lambda *a, **k: type( + "Zip", + (), + {"extractall": lambda self, path: None, "close": lambda self: None}, + )(), + ) + monkeypatch.setattr("base64.b64decode", lambda s: b"print('hello')") + monkeypatch.setattr( + "manager.manager.manager.Manager.unpause_sim", lambda self: None + ) + # Mock linter to return no errors + manager.linter.evaluate_code = lambda code, ros_version: "" + # Trigger application running state + manager.trigger( + "run_application", + data=valid_app_data, + ) + # Assert state is now application_running + assert manager.state == "application_running" + + +def test_on_run_application_missing_code(manager, monkeypatch): + """Test running application with missing code file.""" + setup_manager_to_tools_ready(manager, monkeypatch) + + # Mock file system so code file is missing + monkeypatch.setattr("os.path.isfile", lambda path: False) + # Mock open for app.zip to avoid FileNotFoundError + original_open = builtins.open + + def fake_open(file, mode="r", *args, **kwargs): + if file == "/workspace/code/app.zip": + import io + + return io.BytesIO() + return original_open(file, mode, *args, **kwargs) + + monkeypatch.setattr("builtins.open", fake_open) + # Mock other unimportant operations + monkeypatch.setattr("os.listdir", lambda path: ["0", "1", "2"]) + monkeypatch.setattr("subprocess.Popen", lambda *a, **k: None) + monkeypatch.setattr("os.mkdir", lambda path: None) + monkeypatch.setattr("os.path.exists", lambda path: True) + monkeypatch.setattr("shutil.rmtree", lambda path: None) + monkeypatch.setattr( + "zipfile.ZipFile", + lambda *a, **k: type( + "Zip", + (), + {"extractall": lambda self, path: None, "close": lambda self: None}, + )(), + ) + monkeypatch.setattr("base64.b64decode", lambda s: b"print('hello')") + monkeypatch.setattr( + "manager.manager.manager.Manager.unpause_sim", lambda self: None + ) + # Mock linter to return no errors + manager.linter.evaluate_code = lambda code, ros_version: "" + # Prep data + data = valid_app_data + # Trigger run_application with missing code + with pytest.raises(Exception, match="User code not found"): + manager.trigger("run_application", data=data) + assert manager.application_process is None + # Ensure state is still tools_ready + assert manager.state == "tools_ready" + + +def test_on_run_application_corrupt_zip(manager, monkeypatch): + """Test running application with corrupt zip/base64.""" + setup_manager_to_tools_ready(manager, monkeypatch) + + # Mock file system so code dir exists + monkeypatch.setattr("os.path.isfile", lambda path: True) + monkeypatch.setattr("os.path.exists", lambda path: True) + monkeypatch.setattr("os.mkdir", lambda path: None) + monkeypatch.setattr("os.listdir", lambda path: ["0", "1", "2"]) + monkeypatch.setattr("shutil.rmtree", lambda path: None) + # Mock open for app.zip to avoid FileNotFoundError + original_open = builtins.open + + def fake_open(file, mode="r", *args, **kwargs): + if file == "/workspace/code/app.zip": + import io + + return io.BytesIO() + return original_open(file, mode, *args, **kwargs) + + monkeypatch.setattr("builtins.open", fake_open) + # Simulate corrupt base64 decoding + monkeypatch.setattr( + "base64.b64decode", lambda s: (_ for _ in ()).throw(Exception("Corrupt base64")) + ) + # Mock other unimportant operations + monkeypatch.setattr( + "zipfile.ZipFile", + lambda *a, **k: type( + "Zip", + (), + {"extractall": lambda self, path: None, "close": lambda self: None}, + )(), + ) + monkeypatch.setattr( + "manager.manager.manager.Manager.unpause_sim", lambda self: None + ) + manager.linter.evaluate_code = lambda code, ros_version: "" + data = valid_app_data + with pytest.raises(Exception): + manager.trigger("run_application", data=data) + assert manager.application_process is None + assert manager.state == "tools_ready" diff --git a/test/test_utils.py b/test/test_utils.py new file mode 100644 index 0000000..6d45b94 --- /dev/null +++ b/test/test_utils.py @@ -0,0 +1,148 @@ +"""Utilities for testing the manager state transitions.""" + +import builtins +import io + +from manager.manager.launcher.launcher_robot import worlds + + +def setup_manager_to_connected(manager, monkeypatch): + """Move manager to connected state.""" + manager.trigger("connect", event=None) + assert manager.state == "connected" + + +def setup_manager_to_world_ready(manager, monkeypatch): + """Move manager to world_ready state.""" + + setup_manager_to_connected(manager, monkeypatch) + + # Use ConfigurationModel for valid world config + from manager.libs.launch_world_model import ConfigurationModel + + valid_world_cfg = ConfigurationModel( + type=next(iter(worlds)), # Use the first world type + launch_file_path="/path/to/launch_file.launch", + ).model_dump() + + event_data = { + "world": valid_world_cfg, + "robot": { + "world": None, # No robot specified + "type": next(iter(worlds)), # Use the first world type + "start_pose": [0, 0, 0, 0, 0, 0], + "launch_file_path": "/path/to/robot_launch_file.launch", + # "robot_config": {"name": "test_robot", "type": worlds[0].robot_type}, + }, + } + manager.trigger("launch_world", data=event_data) + + # State should now be 'world_ready' + assert manager.state == "world_ready" + + # Patch LauncherWorld to avoid starting real worlds + class DummyConsumer: + def __init__(self): + self.launched = False + + def consume(self, *args, **kwargs): + pass + + class DummyLauncherWorld: + def __init__(self, *args, **kwargs): + self.launched = False + + +def setup_manager_to_tools_ready(manager, monkeypatch): + """Move manager to visualization_ready state.""" + + # Move to 'world_ready' state first + setup_manager_to_world_ready(manager, monkeypatch) + + class DummyToolsLauncher: + def __init__(self, *args, **kwargs): + self.launchers = [] + + def run(self, consumer=None): + # Simulate running the tools launcher + return + + def terminate(self): + pass + + monkeypatch.setattr("manager.manager.manager.LauncherTools", DummyToolsLauncher) + + # Trigger visualization ready state + manager.trigger( + "prepare_tools", + data={ + "tools": [], + "config": None, + }, + ) + + assert manager.state == "tools_ready" + + +def setup_manager_to_application_running(manager, monkeypatch): + """Move manager to application_running state.""" + + setup_manager_to_tools_ready(manager, monkeypatch) + + class DummyProc: + def __init__(self): + self.pid = 123 + + def kill(self): + pass + + def suspend(self): + pass + + original_open = builtins.open + + def fake_open(file, mode="r", *args, **kwargs): + if file == "/workspace/code/app.zip": + if "w" in mode: + return io.BytesIO() + elif "r" in mode: + return io.BytesIO(b"fake zip content") + return original_open(file, mode, *args, **kwargs) + + # Mock file system and subprocess operations + monkeypatch.setattr("os.path.isfile", lambda path: True) + monkeypatch.setattr("os.listdir", lambda path: ["0", "1", "2"]) + monkeypatch.setattr("builtins.open", fake_open) + monkeypatch.setattr("subprocess.Popen", lambda *a, **k: DummyProc()) + monkeypatch.setattr("os.mkdir", lambda path: None) + monkeypatch.setattr("os.path.exists", lambda path: True) + monkeypatch.setattr("shutil.rmtree", lambda path: None) + monkeypatch.setattr( + "zipfile.ZipFile", + lambda *a, **k: type( + "Zip", + (), + {"extractall": lambda self, path: None, "close": lambda self: None}, + )(), + ) + monkeypatch.setattr("base64.b64decode", lambda s: b"print('hello')") + monkeypatch.setattr( + "manager.manager.manager.Manager.unpause_sim", lambda self: None + ) + # Mock linter to return no errors + manager.linter.evaluate_code = lambda code, ros_version: "" + # Trigger application running state + manager.trigger( + "run_application", + data={ + "type": "robotics-academy", + "code": "data:base64,ZmFrZV9jb2Rl", + "entrypoint": "main.py", + "linter": "pylint", + }, + ) + # Assert state is now application_running + assert manager.state == "application_running" + + # Ensure application process is not None + manager.application_process = DummyProc() diff --git a/test/test_world_ready_to_tools_ready.py b/test/test_world_ready_to_tools_ready.py new file mode 100644 index 0000000..cced1e8 --- /dev/null +++ b/test/test_world_ready_to_tools_ready.py @@ -0,0 +1,54 @@ +"""Tests for transitioning Manager from 'connected' to 'world_ready' state.""" + +import pytest +from test_utils import setup_manager_to_world_ready + + +def test_on_prepare_tools_valid(manager, monkeypatch): + """ + Test the transition from 'world_ready' to 'visualization_ready' state. + + Tests the preparation of the visualization state by triggering the + prepare_tools event with valid values. + """ + # Ensure the manager is in 'world_ready' state + setup_manager_to_world_ready(manager, monkeypatch) + # Trigger the prepare_tools event + manager.trigger( + "prepare_tools", + data={ + "tools": ["tool1", "tool2"], + "config": {"param1": "value1", "param2": "value2"}, + }, + ) + + # Check if the state has transitioned to 'visualization_ready' + assert manager.state == "tools_ready" + + print(manager.consumer.last_message) + + # Verify that the correct message was sent + assert manager.consumer.last_message[0][0]["state"] == "tools_ready" + + +def test_on_prepare_tools_invalid(manager, monkeypatch): + """ + Test the transition from 'world_ready' to 'visualization_ready' state. + + Tests that the prepare_tools event does not change the state + when invalid values are provided. + """ + # Ensure the manager is in 'world_ready' state + setup_manager_to_world_ready(manager, monkeypatch) + # Trigger the prepare_tools event with invalid data + with pytest.raises(KeyError): + # This should raise an error due to missing 'type' in data + manager.trigger( + "prepare_tools", + data={ + "file": "test_file", + }, + ) + + # Check if the state remains 'world_ready' + assert manager.state == "world_ready"