Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 91 additions & 96 deletions src/firewall/blocking.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import platform
import tempfile
import shutil
import ipaddress
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Set, List
Expand All @@ -28,9 +29,26 @@ def __init__(self, block_duration: int, whitelist: Set[str]):
# macOS-specific: pfctl configuration
if self.platform == 'darwin':
self._init_macos_firewall()


def _is_valid_ip(self, ip: str) -> bool:
"""Validate IPv4, IPv6, or CIDR ranges"""
try:
# Supports IPv4, IPv6 and CIDR (ip_network supports ranges)
ipaddress.ip_network(ip, strict=False)
return True
except ValueError:
return False


def block_ip(self, ip: str, reason: str) -> bool:
"""Block an IP address using the appropriate firewall system"""

# Validate IP before doing anything
if not self._is_valid_ip(ip):
self.logger.error(f"Invalid IP address format: {ip}")
print(f"{Fore.YELLOW}⚠ INVALID IP: {ip}{Style.RESET_ALL}")
return False

if self._is_whitelisted(ip):
self.logger.info(f"IP {ip} is whitelisted, not blocking")
return False
Expand All @@ -55,9 +73,15 @@ def block_ip(self, ip: str, reason: str) -> bool:
else:
self.logger.debug(f"IP {ip} already blocked")
return True

def unblock_ip(self, ip: str) -> bool:
"""Manually unblock a specific IP address"""

# Validate IP
if not self._is_valid_ip(ip):
self.logger.error(f"Invalid IP address format (unblock): {ip}")
return False

with self.lock:
if ip in self.blocked_ips:
try:
Expand All @@ -78,9 +102,9 @@ def unblock_ip(self, ip: str) -> bool:
else:
self.logger.warning(f"IP {ip} was not blocked")
return False



def unblock_expired_ips(self) -> List[str]:
"""Unblock IPs that have exceeded the block duration"""
current_time = datetime.now()
block_duration = timedelta(seconds=self.block_duration)
unblocked_ips = []
Expand All @@ -96,17 +120,15 @@ def unblock_expired_ips(self) -> List[str]:
unblocked_ips.append(ip)

return unblocked_ips

def _is_whitelisted(self, ip: str) -> bool:
"""Check if IP is in whitelist"""
return ip in self.whitelist

def _execute_block_command(self, ip: str) -> bool:
"""Execute platform-specific block command"""
try:
if self.platform == 'linux':
return self._block_ip_linux(ip)
elif self.platform == 'darwin': # macOS
elif self.platform == 'darwin':
return self._block_ip_macos(ip)
elif self.platform == 'windows':
return self._block_ip_windows(ip)
Expand All @@ -116,13 +138,12 @@ def _execute_block_command(self, ip: str) -> bool:
except Exception as e:
self.logger.error(f"Platform-specific blocking failed: {e}")
return False

def _execute_unblock_command(self, ip: str) -> bool:
"""Execute platform-specific unblock command"""
try:
if self.platform == 'linux':
return self._unblock_ip_linux(ip)
elif self.platform == 'darwin': # macOS
elif self.platform == 'darwin':
return self._unblock_ip_macos(ip)
elif self.platform == 'windows':
return self._unblock_ip_windows(ip)
Expand All @@ -132,23 +153,52 @@ def _execute_unblock_command(self, ip: str) -> bool:
except Exception as e:
self.logger.error(f"Platform-specific unblocking failed: {e}")
return False



def _block_ip_linux(self, ip: str) -> bool:
"""Block IP using iptables on Linux"""
cmd = ['sudo', 'iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP']
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
return result.returncode == 0

"""Block IPv4 or IPv6 using iptables / ip6tables"""

try:
parsed = ipaddress.ip_network(ip, strict=False)

# IPv6
if isinstance(parsed, ipaddress.IPv6Network):
cmd = ['sudo', 'ip6tables', '-A', 'INPUT', '-s', ip, '-j', 'DROP']

# IPv4
else:
cmd = ['sudo', 'iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP']

result = subprocess.run(cmd, check=True, capture_output=True, text=True)
return result.returncode == 0

except Exception as e:
self.logger.error(f"Linux block error for {ip}: {e}")
return False

def _unblock_ip_linux(self, ip: str) -> bool:
"""Unblock IP using iptables on Linux"""
cmd = ['sudo', 'iptables', '-D', 'INPUT', '-s', ip, '-j', 'DROP']
result = subprocess.run(cmd, capture_output=True, text=True)
return result.returncode == 0

"""Unblock IPv4 or IPv6"""

try:
parsed = ipaddress.ip_network(ip, strict=False)

# IPv6
if isinstance(parsed, ipaddress.IPv6Network):
cmd = ['sudo', 'ip6tables', '-D', 'INPUT', '-s', ip, '-j', 'DROP']
# IPv4
else:
cmd = ['sudo', 'iptables', '-D', 'INPUT', '-s', ip, '-j', 'DROP']

result = subprocess.run(cmd, capture_output=True, text=True)
return result.returncode == 0

except Exception as e:
self.logger.error(f"Linux unblock error for {ip}: {e}")
return False


def _init_macos_firewall(self):
"""Initialize macOS firewall (pfctl) - create table and rules"""
try:
# Check if pfctl is available
pfctl_path = shutil.which('pfctl')
if not pfctl_path:
self.logger.warning("pfctl not found.")
Expand All @@ -157,7 +207,6 @@ def _init_macos_firewall(self):
cmd = ['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'show']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
# if doesn't exist, create dummy
subprocess.run(['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'add', '127.0.0.1'],
capture_output=True, text=True)
subprocess.run(['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'delete', '127.0.0.1'],
Expand All @@ -166,7 +215,6 @@ def _init_macos_firewall(self):
cmd = ['sudo', 'pfctl', '-s', 'info']
result = subprocess.run(cmd, capture_output=True, text=True)
if 'Status: Enabled' not in result.stdout:
# Try to enable pfctl (may require user interaction)
self.logger.info("Attempting to enable pfctl...")
subprocess.run(['sudo', 'pfctl', '-e'], capture_output=True, text=True)

Expand All @@ -177,21 +225,12 @@ def _init_macos_firewall(self):
self.logger.error(f"Failed to initialize macOS firewall: {e}")

def _reload_macos_rules(self):
"""Reload pfctl rules to ensure blocking rule is active"""
try:

pf_conf_content = """# Simple Firewall - Dynamic IP Blocking Rules
# This file is managed by Simple Firewall
# DO NOT EDIT MANUALLY

# Table for blocked IPs
table <blocked_ips> persist

# Block all traffic from IPs in the blocked_ips table
block drop in quick from <blocked_ips> to any
block drop out quick from any to <blocked_ips>

# Allow all other traffic (pass through)
pass in all
pass out all
"""
Expand All @@ -202,60 +241,36 @@ def _reload_macos_rules(self):
f.write(pf_conf_content)

cmd = ['sudo', 'pfctl', '-f', str(pf_conf_path)]
result = subprocess.run(cmd, capture_output=True, text=True)

if result.returncode == 0:
self.logger.debug("pfctl rules loaded successfully")
else:
self.logger.warning(f"pfctl rule loading returned: {result.returncode}")
self.logger.debug(f"pfctl stderr: {result.stderr}")
subprocess.run(cmd, capture_output=True, text=True)

except Exception as e:
self.logger.error(f"Failed to reload macOS firewall rules: {e}")

def _block_ip_macos(self, ip: str) -> bool:
"""Block IP using pfctl on macOS"""
try:
# First, add IP to a table
cmd = ['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'add', ip]
result = subprocess.run(cmd, capture_output=True, text=True)

if result.returncode == 0:
self.logger.debug(f"Successfully added {ip} to blocked_ips table")
return True
else:

if 'already' in result.stderr.lower() or 'duplicate' in result.stderr.lower():
self.logger.debug(f"IP {ip} already in blocked_ips table")
if 'already' in result.stderr.lower():
return True
else:
self.logger.error(f"Failed to add {ip} to blocked_ips table: {result.stderr}")
return False
return False
except Exception as e:
self.logger.error(f"Exception while blocking IP {ip} on macOS: {e}")
self.logger.error(f"Exception while blocking IP macOS: {e}")
return False

def _unblock_ip_macos(self, ip: str) -> bool:
"""Unblock IP using pfctl on macOS"""
try:
cmd = ['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'delete', ip]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
self.logger.debug(f"Successfully removed {ip} from blocked_ips table")
return True
else:
if 'not found' in result.stderr.lower() or 'does not exist' in result.stderr.lower():
self.logger.debug(f"IP {ip} not found in blocked_ips table (may have been already removed)")
return True
else:
self.logger.warning(f"Failed to remove {ip} from blocked_ips table: {result.stderr}")
return False
return result.returncode == 0 or "not found" in result.stderr.lower()
except Exception as e:
self.logger.error(f"Exception while unblocking IP {ip} on macOS: {e}")
self.logger.error(f"Exception while unblocking IP macOS: {e}")
return False

def _block_ip_windows(self, ip: str) -> bool:
"""Block IP using Windows Firewall (netsh)"""
rule_name = f"SimpleFirewall_Block_{ip.replace('.', '_')}"
cmd = [
'netsh', 'advfirewall', 'firewall', 'add', 'rule',
Expand All @@ -266,68 +281,48 @@ def _block_ip_windows(self, ip: str) -> bool:
]
try:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
self.logger.debug(f"netsh add rule stdout: {result.stdout.strip()}")
return True
else:
self.logger.error(f"netsh add rule failed: rc={result.returncode} stdout={result.stdout.strip()} stderr={result.stderr.strip()}")
return False
return result.returncode == 0
except Exception as e:
self.logger.error(f"Exception when running netsh add rule: {e}")
self.logger.error(f"Exception running netsh add rule: {e}")
return False

def _unblock_ip_windows(self, ip: str) -> bool:
"""Unblock IP using Windows Firewall (netsh)"""
rule_name = f"SimpleFirewall_Block_{ip.replace('.', '_')}"
cmd = [
'netsh', 'advfirewall', 'firewall', 'delete', 'rule',
f'name={rule_name}'
]
try:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
self.logger.debug(f"netsh delete rule stdout: {result.stdout.strip()}")
return True
else:
# Sometimes netsh returns 1 when rule not found; log and return False
self.logger.error(f"netsh delete rule failed: rc={result.returncode} stdout={result.stdout.strip()} stderr={result.stderr.strip()}")
return False
return result.returncode == 0
except Exception as e:
self.logger.error(f"Exception when running netsh delete rule: {e}")
self.logger.error(f"Exception running netsh delete rule: {e}")
return False


def get_blocked_ips(self) -> Dict[str, str]:
"""Get currently blocked IPs with their block times"""
with self.lock:
return {
ip: block_time.isoformat()
for ip, block_time in self.blocked_ips.items()
}
return {ip: block_time.isoformat() for ip, block_time in self.blocked_ips.items()}

def cleanup_all_blocks(self) -> List[str]:
"""Remove all blocks (useful for shutdown)"""
cleaned_ips = []

with self.lock:
for ip in list(self.blocked_ips.keys()):
if self.unblock_ip(ip):
cleaned_ips.append(ip)

# macOS-specific: Clean up the entire table on shutdown

if self.platform == 'darwin' and cleaned_ips:
try:
cmd = ['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'flush']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
self.logger.info("Cleaned up all blocked IPs from pfctl table")
subprocess.run(['sudo', 'pfctl', '-t', 'blocked_ips', '-T', 'flush'],
capture_output=True, text=True)
except Exception as e:
self.logger.warning(f"Failed to flush pfctl table: {e}")

return cleaned_ips

def get_stats(self) -> Dict[str, int]:
"""Get blocking statistics"""
with self.lock:
return {
'currently_blocked': len(self.blocked_ips),
Expand Down
Loading