diff --git a/src/firewall/blocking.py b/src/firewall/blocking.py index ea9b115..fe4230f 100644 --- a/src/firewall/blocking.py +++ b/src/firewall/blocking.py @@ -5,6 +5,7 @@ import platform import tempfile import shutil +import ipaddress from datetime import datetime, timedelta from pathlib import Path from typing import Dict, Set, List @@ -28,9 +29,26 @@ def __init__(self, block_duration: int, whitelist: Set[str]): # macOS-specific: pfctl configuration if self.platform == 'darwin': self._init_macos_firewall() - + + def _is_valid_ip(self, ip: str) -> bool: + """Validate IPv4, IPv6, or CIDR ranges""" + try: + # Supports IPv4, IPv6 and CIDR (ip_network supports ranges) + ipaddress.ip_network(ip, strict=False) + return True + except ValueError: + return False + + def block_ip(self, ip: str, reason: str) -> bool: """Block an IP address using the appropriate firewall system""" + + # Validate IP before doing anything + if not self._is_valid_ip(ip): + self.logger.error(f"Invalid IP address format: {ip}") + print(f"{Fore.YELLOW}⚠ INVALID IP: {ip}{Style.RESET_ALL}") + return False + if self._is_whitelisted(ip): self.logger.info(f"IP {ip} is whitelisted, not blocking") return False @@ -55,9 +73,15 @@ def block_ip(self, ip: str, reason: str) -> bool: else: self.logger.debug(f"IP {ip} already blocked") return True - + def unblock_ip(self, ip: str) -> bool: """Manually unblock a specific IP address""" + + # Validate IP + if not self._is_valid_ip(ip): + self.logger.error(f"Invalid IP address format (unblock): {ip}") + return False + with self.lock: if ip in self.blocked_ips: try: @@ -78,9 +102,9 @@ def unblock_ip(self, ip: str) -> bool: else: self.logger.warning(f"IP {ip} was not blocked") return False - + + def unblock_expired_ips(self) -> List[str]: - """Unblock IPs that have exceeded the block duration""" current_time = datetime.now() block_duration = timedelta(seconds=self.block_duration) unblocked_ips = [] @@ -96,17 +120,15 @@ def unblock_expired_ips(self) -> List[str]: unblocked_ips.append(ip) return unblocked_ips - + def _is_whitelisted(self, ip: str) -> bool: - """Check if IP is in whitelist""" return ip in self.whitelist - + def _execute_block_command(self, ip: str) -> bool: - """Execute platform-specific block command""" try: if self.platform == 'linux': return self._block_ip_linux(ip) - elif self.platform == 'darwin': # macOS + elif self.platform == 'darwin': return self._block_ip_macos(ip) elif self.platform == 'windows': return self._block_ip_windows(ip) @@ -116,13 +138,12 @@ def _execute_block_command(self, ip: str) -> bool: except Exception as e: self.logger.error(f"Platform-specific blocking failed: {e}") return False - + def _execute_unblock_command(self, ip: str) -> bool: - """Execute platform-specific unblock command""" try: if self.platform == 'linux': return self._unblock_ip_linux(ip) - elif self.platform == 'darwin': # macOS + elif self.platform == 'darwin': return self._unblock_ip_macos(ip) elif self.platform == 'windows': return self._unblock_ip_windows(ip) @@ -132,23 +153,52 @@ def _execute_unblock_command(self, ip: str) -> bool: except Exception as e: self.logger.error(f"Platform-specific unblocking failed: {e}") return False - + + def _block_ip_linux(self, ip: str) -> bool: - """Block IP using iptables on Linux""" - cmd = ['sudo', 'iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'] - result = subprocess.run(cmd, check=True, capture_output=True, text=True) - return result.returncode == 0 - + """Block IPv4 or IPv6 using iptables / ip6tables""" + + try: + parsed = ipaddress.ip_network(ip, strict=False) + + # IPv6 + if isinstance(parsed, ipaddress.IPv6Network): + cmd = ['sudo', 'ip6tables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'] + + # IPv4 + else: + cmd = ['sudo', 'iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'] + + result = subprocess.run(cmd, check=True, capture_output=True, text=True) + return result.returncode == 0 + + except Exception as e: + self.logger.error(f"Linux block error for {ip}: {e}") + return False + def _unblock_ip_linux(self, ip: str) -> bool: - """Unblock IP using iptables on Linux""" - cmd = ['sudo', 'iptables', '-D', 'INPUT', '-s', ip, '-j', 'DROP'] - result = subprocess.run(cmd, capture_output=True, text=True) - return result.returncode == 0 - + """Unblock IPv4 or IPv6""" + + try: + parsed = ipaddress.ip_network(ip, strict=False) + + # IPv6 + if isinstance(parsed, ipaddress.IPv6Network): + cmd = ['sudo', 'ip6tables', '-D', 'INPUT', '-s', ip, '-j', 'DROP'] + # IPv4 + else: + cmd = ['sudo', 'iptables', '-D', 'INPUT', '-s', ip, '-j', 'DROP'] + + result = subprocess.run(cmd, capture_output=True, text=True) + return result.returncode == 0 + + except Exception as e: + self.logger.error(f"Linux unblock error for {ip}: {e}") + return False + + def _init_macos_firewall(self): - """Initialize macOS firewall (pfctl) - create table and rules""" try: - # Check if pfctl is available pfctl_path = shutil.which('pfctl') if not pfctl_path: self.logger.warning("pfctl not found.") @@ -157,7 +207,6 @@ def _init_macos_firewall(self): cmd = ['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'show'] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: - # if doesn't exist, create dummy subprocess.run(['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'add', '127.0.0.1'], capture_output=True, text=True) subprocess.run(['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'delete', '127.0.0.1'], @@ -166,7 +215,6 @@ def _init_macos_firewall(self): cmd = ['sudo', 'pfctl', '-s', 'info'] result = subprocess.run(cmd, capture_output=True, text=True) if 'Status: Enabled' not in result.stdout: - # Try to enable pfctl (may require user interaction) self.logger.info("Attempting to enable pfctl...") subprocess.run(['sudo', 'pfctl', '-e'], capture_output=True, text=True) @@ -177,21 +225,12 @@ def _init_macos_firewall(self): self.logger.error(f"Failed to initialize macOS firewall: {e}") def _reload_macos_rules(self): - """Reload pfctl rules to ensure blocking rule is active""" try: - pf_conf_content = """# Simple Firewall - Dynamic IP Blocking Rules # This file is managed by Simple Firewall -# DO NOT EDIT MANUALLY - -# Table for blocked IPs table persist - -# Block all traffic from IPs in the blocked_ips table block drop in quick from to any block drop out quick from any to - -# Allow all other traffic (pass through) pass in all pass out all """ @@ -202,60 +241,36 @@ def _reload_macos_rules(self): f.write(pf_conf_content) cmd = ['sudo', 'pfctl', '-f', str(pf_conf_path)] - result = subprocess.run(cmd, capture_output=True, text=True) - - if result.returncode == 0: - self.logger.debug("pfctl rules loaded successfully") - else: - self.logger.warning(f"pfctl rule loading returned: {result.returncode}") - self.logger.debug(f"pfctl stderr: {result.stderr}") + subprocess.run(cmd, capture_output=True, text=True) except Exception as e: self.logger.error(f"Failed to reload macOS firewall rules: {e}") def _block_ip_macos(self, ip: str) -> bool: - """Block IP using pfctl on macOS""" try: - # First, add IP to a table cmd = ['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'add', ip] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: - self.logger.debug(f"Successfully added {ip} to blocked_ips table") return True else: - - if 'already' in result.stderr.lower() or 'duplicate' in result.stderr.lower(): - self.logger.debug(f"IP {ip} already in blocked_ips table") + if 'already' in result.stderr.lower(): return True - else: - self.logger.error(f"Failed to add {ip} to blocked_ips table: {result.stderr}") - return False + return False except Exception as e: - self.logger.error(f"Exception while blocking IP {ip} on macOS: {e}") + self.logger.error(f"Exception while blocking IP macOS: {e}") return False def _unblock_ip_macos(self, ip: str) -> bool: - """Unblock IP using pfctl on macOS""" try: cmd = ['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'delete', ip] result = subprocess.run(cmd, capture_output=True, text=True) - if result.returncode == 0: - self.logger.debug(f"Successfully removed {ip} from blocked_ips table") - return True - else: - if 'not found' in result.stderr.lower() or 'does not exist' in result.stderr.lower(): - self.logger.debug(f"IP {ip} not found in blocked_ips table (may have been already removed)") - return True - else: - self.logger.warning(f"Failed to remove {ip} from blocked_ips table: {result.stderr}") - return False + return result.returncode == 0 or "not found" in result.stderr.lower() except Exception as e: - self.logger.error(f"Exception while unblocking IP {ip} on macOS: {e}") + self.logger.error(f"Exception while unblocking IP macOS: {e}") return False - + def _block_ip_windows(self, ip: str) -> bool: - """Block IP using Windows Firewall (netsh)""" rule_name = f"SimpleFirewall_Block_{ip.replace('.', '_')}" cmd = [ 'netsh', 'advfirewall', 'firewall', 'add', 'rule', @@ -266,18 +281,12 @@ def _block_ip_windows(self, ip: str) -> bool: ] try: result = subprocess.run(cmd, capture_output=True, text=True) - if result.returncode == 0: - self.logger.debug(f"netsh add rule stdout: {result.stdout.strip()}") - return True - else: - self.logger.error(f"netsh add rule failed: rc={result.returncode} stdout={result.stdout.strip()} stderr={result.stderr.strip()}") - return False + return result.returncode == 0 except Exception as e: - self.logger.error(f"Exception when running netsh add rule: {e}") + self.logger.error(f"Exception running netsh add rule: {e}") return False def _unblock_ip_windows(self, ip: str) -> bool: - """Unblock IP using Windows Firewall (netsh)""" rule_name = f"SimpleFirewall_Block_{ip.replace('.', '_')}" cmd = [ 'netsh', 'advfirewall', 'firewall', 'delete', 'rule', @@ -285,28 +294,17 @@ def _unblock_ip_windows(self, ip: str) -> bool: ] try: result = subprocess.run(cmd, capture_output=True, text=True) - if result.returncode == 0: - self.logger.debug(f"netsh delete rule stdout: {result.stdout.strip()}") - return True - else: - # Sometimes netsh returns 1 when rule not found; log and return False - self.logger.error(f"netsh delete rule failed: rc={result.returncode} stdout={result.stdout.strip()} stderr={result.stderr.strip()}") - return False + return result.returncode == 0 except Exception as e: - self.logger.error(f"Exception when running netsh delete rule: {e}") + self.logger.error(f"Exception running netsh delete rule: {e}") return False - + def get_blocked_ips(self) -> Dict[str, str]: - """Get currently blocked IPs with their block times""" with self.lock: - return { - ip: block_time.isoformat() - for ip, block_time in self.blocked_ips.items() - } + return {ip: block_time.isoformat() for ip, block_time in self.blocked_ips.items()} def cleanup_all_blocks(self) -> List[str]: - """Remove all blocks (useful for shutdown)""" cleaned_ips = [] with self.lock: @@ -314,20 +312,17 @@ def cleanup_all_blocks(self) -> List[str]: if self.unblock_ip(ip): cleaned_ips.append(ip) - # macOS-specific: Clean up the entire table on shutdown + if self.platform == 'darwin' and cleaned_ips: try: - cmd = ['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'flush'] - result = subprocess.run(cmd, capture_output=True, text=True) - if result.returncode == 0: - self.logger.info("Cleaned up all blocked IPs from pfctl table") + subprocess.run(['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'flush'], + capture_output=True, text=True) except Exception as e: self.logger.warning(f"Failed to flush pfctl table: {e}") return cleaned_ips def get_stats(self) -> Dict[str, int]: - """Get blocking statistics""" with self.lock: return { 'currently_blocked': len(self.blocked_ips),