"""Backend for ConfigServer Security & Firewall (CSF)."""

import ipaddress
import subprocess
from pathlib import Path

from .base import FirewallBackend


class CSFBackend(FirewallBackend):
    """Firewall backend that drives the ``csf`` command-line tool.

    The path to the executable is read from the ``csf_path`` option of the
    ``backend_csf`` config section and falls back to ``/usr/sbin/csf``.
    Every public operation reports its outcome through the return value and
    the logger; exceptions raised while running the command are caught and
    logged rather than propagated.
    """

    # Seconds to wait for a single csf invocation before giving up.
    _COMMAND_TIMEOUT = 10
    # Comment attached to every temporary deny created by this backend.
    _BAN_COMMENT = "LogMon auto-ban"
    # Substrings of (lower-cased) ``csf -g`` output treated as "blocked".
    _BLOCK_MARKERS = ("deny", "drop")

    def __init__(self, config):
        """Read the csf path from *config* and warn if the binary is missing."""
        super().__init__(config)
        self.csf_path = config.get('backend_csf', 'csf_path', fallback='/usr/sbin/csf')

        if not self.test_availability():
            self.logger.warning(f"CSF not found at {self.csf_path}")

    def test_availability(self):
        """Return True if something exists at the configured csf path."""
        return Path(self.csf_path).exists()

    @staticmethod
    def _normalize_ip(ip):
        """Return *ip* as a stripped string, or None if it is not an IP/CIDR.

        Validation keeps malformed values (including anything that looks
        like a command-line option) from ever reaching the csf argv. The
        text is returned as given rather than canonicalised, so csf sees the
        same spelling the caller used.
        """
        candidate = str(ip).strip()
        try:
            # strict=False accepts plain addresses and CIDR ranges alike.
            ipaddress.ip_network(candidate, strict=False)
        except ValueError:
            return None
        return candidate

    def _run_csf(self, *args):
        """Run csf with *args* and return the ``CompletedProcess``.

        Output is captured as text. ``subprocess.TimeoutExpired`` is raised
        if the command does not finish within ``_COMMAND_TIMEOUT`` seconds;
        callers are responsible for handling it.
        """
        return subprocess.run(
            [self.csf_path, *args],
            capture_output=True,
            text=True,
            timeout=self._COMMAND_TIMEOUT,
        )

    def ban_ip(self, ip, duration):
        """Temporarily deny *ip* for *duration* using ``csf -td``.

        CSF offers two deny modes:
          - ``csf -d IP "comment"``           permanent deny
          - ``csf -td IP duration "comment"`` temporary deny

        This backend always uses the temporary form.

        Args:
            ip: Address (or CIDR range) to block.
            duration: Ban length in seconds; passed to csf via ``str()``.

        Returns:
            True if csf exited with status 0, False on an invalid address,
            a non-zero exit status, a timeout, or any other error.
        """
        target = self._normalize_ip(ip)
        if target is None:
            self.logger.error(f"Refusing to ban invalid IP address: {ip!r}")
            return False

        try:
            result = self._run_csf('-td', target, str(duration), self._BAN_COMMENT)
        except subprocess.TimeoutExpired:
            self.logger.error(f"CSF ban command timed out for {ip}")
            return False
        except Exception as e:
            # Backend boundary: report the failure instead of propagating it.
            self.logger.error(f"Error banning IP with CSF: {e}")
            return False

        if result.returncode == 0:
            self.logger.debug(f"CSF ban successful: {result.stdout.strip()}")
            return True

        self.logger.error(f"CSF ban failed: {result.stderr.strip()}")
        return False

    def unban_ip(self, ip):
        """Remove a temporary ban for *ip* using ``csf -tr``.

        Returns:
            True when the command ran to completion, regardless of its exit
            status (see the note in the body); False on an invalid address,
            a timeout, or any other error while running the command.
        """
        target = self._normalize_ip(ip)
        if target is None:
            self.logger.error(f"Refusing to unban invalid IP address: {ip!r}")
            return False

        try:
            result = self._run_csf('-tr', target)
        except subprocess.TimeoutExpired:
            self.logger.error(f"CSF unban command timed out for {ip}")
            return False
        except Exception as e:
            # Backend boundary: report the failure instead of propagating it.
            self.logger.error(f"Error unbanning IP with CSF: {e}")
            return False

        if result.returncode == 0:
            self.logger.debug(f"CSF unban successful for {ip}")
        else:
            # CSF sometimes reports an error even though the removal
            # succeeded, so a non-zero status is logged but still treated
            # as success.
            self.logger.warning(f"CSF unban may have failed: {result.stderr.strip()}")
        return True

    def is_banned(self, ip):
        """Check whether *ip* is currently blocked, using ``csf -g``.

        The command's standard output is lower-cased and searched for the
        substrings "deny" and "drop"; the exit status is not consulted.

        Returns:
            True if either marker appears in the output, False otherwise
            (including an invalid address, a timeout, or any other error).
        """
        target = self._normalize_ip(ip)
        if target is None:
            self.logger.error(f"Cannot check ban status of invalid IP address: {ip!r}")
            return False

        try:
            result = self._run_csf('-g', target)
            output = result.stdout.lower()
        except subprocess.TimeoutExpired:
            self.logger.error(f"CSF status check timed out for {ip}")
            return False
        except Exception as e:
            # Backend boundary: report the failure instead of propagating it.
            self.logger.error(f"Error checking ban status: {e}")
            return False

        return any(marker in output for marker in self._BLOCK_MARKERS)