""" Backend dla nftables """ import subprocess from .base import FirewallBackend class NFTablesBackend(FirewallBackend): """Backend dla nftables""" def __init__(self, config): super().__init__(config) self.table = config.get('backend_nftables', 'table_name', fallback='filter') self.chain = config.get('backend_nftables', 'chain_name', fallback='logmon_block') if self.test_availability(): self.setup_chain() else: self.logger.warning("nftables not available") def test_availability(self): """Sprawdza czy nftables jest dostępny""" try: result = subprocess.run( ['nft', '--version'], capture_output=True, timeout=5 ) return result.returncode == 0 except: return False def setup_chain(self): """Tworzy chain jeśli nie istnieje""" try: # Sprawdź czy chain istnieje cmd = ['nft', 'list', 'chain', 'inet', self.table, self.chain] result = subprocess.run(cmd, capture_output=True, timeout=5) if result.returncode != 0: # Utwórz chain z priorytetem input cmd = [ 'nft', 'add', 'chain', 'inet', self.table, self.chain, '{', 'type', 'filter', 'hook', 'input', 'priority', '0', ';', '}' ] subprocess.run(cmd, check=True, timeout=5) self.logger.info(f"Created nftables chain: {self.chain}") except Exception as e: self.logger.error(f"Error setting up nftables chain: {e}") def ban_ip(self, ip, duration): """Banuje IP używając nftables""" try: # Dodaj regułę DROP cmd = [ 'nft', 'add', 'rule', 'inet', self.table, self.chain, 'ip', 'saddr', ip, 'counter', 'drop', 'comment', f'"LogMon ban {duration}s"' ] result = subprocess.run(cmd, capture_output=True, timeout=5) if result.returncode == 0: self.logger.debug(f"nftables ban successful for {ip}") return True else: self.logger.error(f"nftables ban failed: {result.stderr.decode()}") return False except Exception as e: self.logger.error(f"Error banning IP with nftables: {e}") return False def unban_ip(self, ip): """Usuwa ban używając nftables""" try: # Znajdź handle reguły cmd = ['nft', '-a', 'list', 'chain', 'inet', self.table, self.chain] result = subprocess.run(cmd, capture_output=True, text=True, timeout=5) if result.returncode != 0: return False # Parsuj output i znajdź handle for line in result.stdout.split('\n'): if ip in line and '# handle' in line: try: handle = line.split('# handle')[1].strip() # Usuń regułę cmd = [ 'nft', 'delete', 'rule', 'inet', self.table, self.chain, 'handle', handle ] subprocess.run(cmd, check=True, timeout=5) self.logger.debug(f"nftables unban successful for {ip}") return True except Exception as e: self.logger.error(f"Error parsing handle: {e}") return False except Exception as e: self.logger.error(f"Error unbanning IP with nftables: {e}") return False def is_banned(self, ip): """Sprawdza czy IP jest zbanowany""" try: cmd = ['nft', 'list', 'chain', 'inet', self.table, self.chain] result = subprocess.run(cmd, capture_output=True, text=True, timeout=5) return ip in result.stdout except Exception as e: self.logger.error(f"Error checking ban status: {e}") return False