From 7f435231c6bac191da0092d78f2149706c8f815a Mon Sep 17 00:00:00 2001 From: Chris Ray Date: Thu, 7 May 2026 22:33:51 -0400 Subject: [PATCH] [BUG] Strip ANSI escape codes from Terraform/Ansible output in API and logs (fixes #1366) - Add strip_ansi() utility to attack_range/utils.py using regex - Terraform: pass --no-color via no_color=IsFlagged (also fixes #1367 where colored output was not captured as error text) - Terraform: strip ANSI from init/apply/destroy output before logging or raising RuntimeError - Ansible: set ANSIBLE_NO_COLOR=1 via envvars in ansible_runner.run() - Ansible: strip ANSI from stdout artifact files and event messages before logging - API: strip ANSI from exception messages and tracebacks at all error capture points (build_vpn, build_lab, destroy) Defense in depth: ANSI codes are suppressed at the source (flags) AND stripped at every boundary where they enter the system. --- api/app.py | 23 +++++++++--------- attack_range/managers/ansible_manager.py | 27 ++++++++++++++-------- attack_range/managers/terraform_manager.py | 26 +++++++++++++-------- attack_range/utils.py | 19 +++++++++++++++ 4 files changed, 64 insertions(+), 31 deletions(-) diff --git a/api/app.py b/api/app.py index a6dac0be..8f2696f5 100644 --- a/api/app.py +++ b/api/app.py @@ -13,6 +13,7 @@ from typing import Dict, Any, Optional, Tuple, List from flask import jsonify, request +from attack_range.utils import strip_ansi from flask_openapi3 import OpenAPI, Info, Tag from flask_cors import CORS from pydantic import ValidationError @@ -490,16 +491,16 @@ def run_build_vpn_phase(config: Dict[str, Any], config_path: str, attack_range_i with operations_lock: running_operations[attack_range_id]["status"] = "error" running_operations[attack_range_id]["end_time"] = datetime.now().isoformat() - running_operations[attack_range_id]["error"] = str(e) + running_operations[attack_range_id]["error"] = strip_ansi(str(e)) running_operations[attack_range_id]["error_phase"] = "build_vpn" - running_operations[attack_range_id]["traceback"] = traceback.format_exc() + running_operations[attack_range_id]["traceback"] = strip_ansi(traceback.format_exc()) # Update error status in config file (via controller if available, else direct write) try: controller = AttackRangeController(config, config_path=config_path) - controller.config_manager.update_status("error", error=str(e), error_phase="build_vpn") + controller.config_manager.update_status("error", error=strip_ansi(str(e)), error_phase="build_vpn") except Exception: - _write_config_error_status(config_path, str(e), "build_vpn") + _write_config_error_status(config_path, strip_ansi(str(e)), "build_vpn") def run_build_lab_phase(attack_range_id: str): @@ -549,18 +550,18 @@ def run_build_lab_phase(attack_range_id: str): config_path = get_config_path_from_attack_range_id(attack_range_id) running_operations[attack_range_id]["status"] = "error" running_operations[attack_range_id]["end_time"] = datetime.now().isoformat() - running_operations[attack_range_id]["error"] = str(e) + running_operations[attack_range_id]["error"] = strip_ansi(str(e)) running_operations[attack_range_id]["error_phase"] = "build_lab" - running_operations[attack_range_id]["traceback"] = traceback.format_exc() + running_operations[attack_range_id]["traceback"] = strip_ansi(traceback.format_exc()) # Update error status in config file (via controller if available, else direct write) if config_path: try: config = load_yaml_file(config_path) controller = AttackRangeController(config, config_path=config_path) - controller.config_manager.update_status("error", error=str(e), error_phase="build_lab") + controller.config_manager.update_status("error", error=strip_ansi(str(e)), error_phase="build_lab") except Exception: - _write_config_error_status(config_path, str(e), "build_lab") + _write_config_error_status(config_path, strip_ansi(str(e)), "build_lab") def run_destroy_operation( @@ -600,13 +601,13 @@ def run_destroy_operation( with operations_lock: running_operations[attack_range_id]["status"] = "failed" running_operations[attack_range_id]["end_time"] = datetime.now().isoformat() - running_operations[attack_range_id]["error"] = str(e) - running_operations[attack_range_id]["traceback"] = traceback.format_exc() + running_operations[attack_range_id]["error"] = strip_ansi(str(e)) + running_operations[attack_range_id]["traceback"] = strip_ansi(traceback.format_exc()) if config_path: try: c = AttackRangeController(config, config_path=config_path) - c.config_manager.update_status("failed", error=str(e)) + c.config_manager.update_status("failed", error=strip_ansi(str(e))) except Exception: pass diff --git a/attack_range/managers/ansible_manager.py b/attack_range/managers/ansible_manager.py index 6651ba61..be62656c 100644 --- a/attack_range/managers/ansible_manager.py +++ b/attack_range/managers/ansible_manager.py @@ -18,6 +18,7 @@ import logging import ansible_runner from typing import Optional, Dict, Any +from attack_range.utils import strip_ansi # Galaxy role that must always be updated to latest before VPN playbooks (vpn.yaml, vpn_config.yaml) WIREGUARD_GALAXY_ROLE = "p4t12ick.ar_wireguard_vpn" @@ -598,11 +599,15 @@ def run_ansible_playbook(self, playbook_name: str, extra_vars: dict = None) -> N extra_vars_parts = [f"-e {k}={shlex.quote(str(v))}" for k, v in extra_vars.items()] cmdline = f"{cmdline} {' '.join(extra_vars_parts)}" + ansible_env = os.environ.copy() + ansible_env["ANSIBLE_NO_COLOR"] = "1" + runner = ansible_runner.run( private_data_dir=self.ansible_dir, cmdline=cmdline, playbook=playbook_name, - verbosity=2, # Increased verbosity for better error diagnostics + verbosity=2, + envvars=ansible_env, ) if extra_vars_path and os.path.exists(extra_vars_path): @@ -633,12 +638,12 @@ def run_ansible_playbook(self, playbook_name: str, extra_vars: dict = None) -> N if event.get('event') in ['runner_on_failed', 'runner_on_unreachable', 'runner_on_error']: host = event_data.get('host', 'unknown') task = event_data.get('task', 'unknown') - msg = event_data.get('msg', 'No error message') + msg = strip_ansi(str(event_data.get('msg', 'No error message'))) error_events.append(f"Host: {host}, Task: {task}, Error: {msg}") if error_events: self.logger.error("Ansible playbook errors:") - for error in error_events[-10:]: # Show last 10 errors + for error in error_events[-10:]: self.logger.error(f" - {error}") # Try to read stdout/stderr from the runner's artifact directory @@ -646,9 +651,8 @@ def run_ansible_playbook(self, playbook_name: str, extra_vars: dict = None) -> N stdout_path = os.path.join(self.ansible_dir, 'artifacts', str(runner.config.ident), 'stdout') if os.path.exists(stdout_path): with open(stdout_path, 'r') as f: - stdout_content = f.read() + stdout_content = strip_ansi(f.read()) if stdout_content: - # Log last 50 lines of stdout lines = stdout_content.strip().split('\n') self.logger.error("Last 50 lines of Ansible output:") for line in lines[-50:]: @@ -721,11 +725,15 @@ def run_ansible_playbook_safe(self, playbook_name: str, extra_vars: dict = None) extra_vars_parts = [f"-e {k}={shlex.quote(str(v))}" for k, v in extra_vars.items()] cmdline = f"{cmdline} {' '.join(extra_vars_parts)}" + ansible_env = os.environ.copy() + ansible_env["ANSIBLE_NO_COLOR"] = "1" + runner = ansible_runner.run( private_data_dir=self.ansible_dir, cmdline=cmdline, playbook=playbook_name, - verbosity=2, # Increased verbosity for better error diagnostics + verbosity=2, + envvars=ansible_env, ) if extra_vars_path and os.path.exists(extra_vars_path): @@ -970,13 +978,13 @@ def run_ansible_playbook_safe(self, playbook_name: str, extra_vars: dict = None) if event.get('event') in ['runner_on_failed', 'runner_on_unreachable', 'runner_on_error']: host = event_data.get('host', 'unknown') task = event_data.get('task', 'unknown') - msg = event_data.get('msg', 'No error message') + msg = strip_ansi(str(event_data.get('msg', 'No error message'))) error_events.append(f"Host: {host}, Task: {task}, Error: {msg}") if error_events: error_details.extend(error_events) self.logger.error("Ansible playbook errors:") - for error in error_events[-10:]: # Show last 10 errors + for error in error_events[-10:]: self.logger.error(f" - {error}") # Try to read stdout/stderr from the runner's artifact directory @@ -984,9 +992,8 @@ def run_ansible_playbook_safe(self, playbook_name: str, extra_vars: dict = None) stdout_path = os.path.join(self.ansible_dir, 'artifacts', str(runner.config.ident), 'stdout') if os.path.exists(stdout_path): with open(stdout_path, 'r') as f: - stdout_content = f.read() + stdout_content = strip_ansi(f.read()) if stdout_content: - # Get last 50 lines of stdout lines = stdout_content.strip().split('\n') last_lines = lines[-50:] if len(lines) > 50 else lines error_details.append("Last Ansible output:") diff --git a/attack_range/managers/terraform_manager.py b/attack_range/managers/terraform_manager.py index 2ed4d722..0514db98 100644 --- a/attack_range/managers/terraform_manager.py +++ b/attack_range/managers/terraform_manager.py @@ -11,7 +11,8 @@ import subprocess import shutil import logging -from python_terraform import Terraform, IsNotFlagged +from python_terraform import Terraform, IsNotFlagged, IsFlagged +from attack_range.utils import strip_ansi class TerraformManager: @@ -110,7 +111,7 @@ def init(self, backend_was_created: bool = False) -> None: text=True ) if init_result.returncode != 0: - msg = init_result.stderr or init_result.stdout or "Terraform init failed" + msg = strip_ansi(init_result.stderr or init_result.stdout or "Terraform init failed") self.logger.error(f"Terraform init failed: {msg}") raise RuntimeError(msg) self.logger.info("Terraform initialized successfully") @@ -131,7 +132,7 @@ def init_for_destroy(self) -> None: text=True ) if init_result.returncode != 0: - msg = init_result.stderr or init_result.stdout or "Terraform init failed" + msg = strip_ansi(init_result.stderr or init_result.stdout or "Terraform init failed") self.logger.error(f"Terraform init failed: {msg}") raise RuntimeError(msg) self.logger.info("Terraform initialized successfully") @@ -144,9 +145,12 @@ def apply(self) -> None: """ self.logger.info("Applying terraform configuration...") return_code, stdout, stderr = self.terraform.apply( - capture_output="yes", skip_plan=True, no_color=IsNotFlagged + capture_output="yes", skip_plan=True, no_color=IsFlagged ) + stdout = strip_ansi(stdout) if stdout else stdout + stderr = strip_ansi(stderr) if stderr else stderr + if return_code != 0: self.logger.error("Terraform apply failed!") if stderr: @@ -189,31 +193,33 @@ def destroy(self) -> None: self.logger.info("Destroying terraform infrastructure...") return_code, stdout, stderr = self.terraform.destroy( capture_output="yes", - no_color=IsNotFlagged, + no_color=IsFlagged, force=IsNotFlagged, auto_approve=True, ) + stdout = strip_ansi(stdout) if stdout else stdout + stderr = strip_ansi(stderr) if stderr else stderr + if return_code != 0: - # Check if error is due to locked state error_output = stderr or stdout or "" if "Error acquiring the state lock" in error_output or "lock is currently held" in error_output.lower(): - # Try to extract lock ID from error message import re lock_id_match = re.search(r'Lock ID:\s*([a-f0-9-]+)', error_output, re.IGNORECASE) if lock_id_match: lock_id = lock_id_match.group(1) self.logger.warning(f"Terraform state is locked. Attempting to unlock with ID: {lock_id}") self.force_unlock(lock_id) - # Retry destroy after unlock self.logger.info("Retrying terraform destroy after unlock...") return_code, stdout, stderr = self.terraform.destroy( capture_output="yes", - no_color=IsNotFlagged, + no_color=IsFlagged, force=IsNotFlagged, auto_approve=True, ) - + stdout = strip_ansi(stdout) if stdout else stdout + stderr = strip_ansi(stderr) if stderr else stderr + if return_code != 0: self.logger.error(f"Terraform destroy failed: {stderr}") raise RuntimeError(stderr or "Terraform destroy failed") diff --git a/attack_range/utils.py b/attack_range/utils.py index a16ef7a0..386e19e5 100644 --- a/attack_range/utils.py +++ b/attack_range/utils.py @@ -6,11 +6,30 @@ """ import os +import re import uuid import yaml from typing import Dict, Any, Tuple, Optional from datetime import datetime +_ANSI_ESCAPE_RE = re.compile(r'\x1b\[[0-9;]*[a-zA-Z]') + + +def strip_ansi(text: str) -> str: + """ + Remove ANSI escape sequences from text. + + Subprocess output from tools like Terraform and Ansible may contain + terminal color codes (e.g., '\\x1b[31m') that are not readable in + web app logs, structured error messages, or API responses. + + :param text: String that may contain ANSI escape sequences + :return: String with all ANSI sequences removed + """ + if not text: + return text + return _ANSI_ESCAPE_RE.sub('', text) + def resolve_template_path(template: str, templates_dir: str) -> str: """