"""
Backend for UFW (Uncomplicated Firewall).
"""

import subprocess

from .base import FirewallBackend


class UFWBackend(FirewallBackend):
    """Firewall backend that bans and unbans addresses through the ``ufw`` CLI.

    All operations shell out to the ``ufw`` executable. Log output goes to
    ``self.logger``, which is not defined in this class (presumably supplied
    by ``FirewallBackend`` -- confirm against the base class).
    """

    # Upper bound, in seconds, for any single ``ufw`` invocation.
    _CMD_TIMEOUT = 5

    def _run_ufw(self, *args, text=False):
        """Run ``ufw`` with *args*, capturing output, and return the result.

        Args:
            *args: Command-line arguments placed after ``ufw``.
            text: When true, stdout/stderr are decoded to ``str``.

        Returns:
            The ``subprocess.CompletedProcess`` of the invocation.

        Raises:
            OSError: If the ``ufw`` executable cannot be started.
            subprocess.TimeoutExpired: If the command exceeds the timeout.
        """
        return subprocess.run(
            ['ufw', *args],
            capture_output=True,
            text=text,
            timeout=self._CMD_TIMEOUT,
        )

    def test_availability(self):
        """Check whether UFW is available.

        Returns:
            True if ``ufw version`` exits with status 0, False if it exits
            non-zero, cannot be started, or times out.
        """
        try:
            result = self._run_ufw('version')
        except (OSError, subprocess.SubprocessError):
            # Missing binary, permission problem or timeout: not available.
            # Deliberately not a bare ``except`` so that KeyboardInterrupt
            # and SystemExit still propagate.
            return False
        return result.returncode == 0

    def ban_ip(self, ip, duration):
        """Ban *ip* by adding a UFW ``deny from`` rule.

        Args:
            ip: Address to ban, passed verbatim to ``ufw``.
            duration: Unused here; UFW has no native rule timeout, so a
                plain deny rule is installed.

        Returns:
            True if the rule was added, False on any failure.
        """
        try:
            result = self._run_ufw('deny', 'from', ip)
            if result.returncode != 0:
                # errors="replace" so undecodable output cannot mask the
                # actual ufw failure with a UnicodeDecodeError.
                stderr = result.stderr.decode(errors="replace")
                self.logger.error(f"UFW ban failed: {stderr}")
                return False

            self.logger.debug(f"UFW ban successful for {ip}")
            # Best-effort reload after the rule change; its result is
            # intentionally not checked, as in the original behaviour.
            self._run_ufw('reload')
            return True
        except Exception as e:
            self.logger.error(f"Error banning IP with UFW: {e}")
            return False

    def unban_ip(self, ip):
        """Remove the UFW ``deny from`` rule for *ip*.

        Args:
            ip: Address to unban, passed verbatim to ``ufw``.

        Returns:
            True if the rule was deleted, False on any failure.
        """
        try:
            result = self._run_ufw('delete', 'deny', 'from', ip)
            if result.returncode != 0:
                stderr = result.stderr.decode(errors="replace")
                self.logger.warning(f"UFW unban may have failed: {stderr}")
                return False

            self.logger.debug(f"UFW unban successful for {ip}")
            # Best-effort reload, mirroring ban_ip.
            self._run_ufw('reload')
            return True
        except Exception as e:
            self.logger.error(f"Error unbanning IP with UFW: {e}")
            return False

    def is_banned(self, ip):
        """Check whether a DENY rule for *ip* appears in ``ufw status``.

        The address must match a whole whitespace-separated field of a
        status line, so ``1.2.3.4`` is not reported as banned merely because
        ``1.2.3.40`` is.

        Args:
            ip: Address to look for.

        Returns:
            True if a line containing ``DENY`` lists *ip*, False otherwise
            or when the status query fails.
        """
        try:
            result = self._run_ufw('status', 'numbered', text=True)
            if result.returncode != 0:
                self.logger.error(
                    f"Error checking ban status: {result.stderr.strip()}"
                )
                return False

            # Look for a DENY line that has the address as an exact field.
            return any(
                'DENY' in line and ip in line.split()
                for line in result.stdout.splitlines()
            )
        except Exception as e:
            self.logger.error(f"Error checking ban status: {e}")
            return False