Files
logmon/backends/nftables.py
Mateusz Gruszczyński 7b41672d05 upload
2025-10-28 21:27:10 +01:00

121 lines
4.4 KiB
Python

"""
Backend dla nftables
"""
import subprocess
from .base import FirewallBackend
class NFTablesBackend(FirewallBackend):
"""Backend dla nftables"""
def __init__(self, config):
super().__init__(config)
self.table = config.get('backend_nftables', 'table_name',
fallback='filter')
self.chain = config.get('backend_nftables', 'chain_name',
fallback='logmon_block')
if self.test_availability():
self.setup_chain()
else:
self.logger.warning("nftables not available")
def test_availability(self):
"""Sprawdza czy nftables jest dostępny"""
try:
result = subprocess.run(
['nft', '--version'],
capture_output=True,
timeout=5
)
return result.returncode == 0
except:
return False
def setup_chain(self):
"""Tworzy chain jeśli nie istnieje"""
try:
# Sprawdź czy chain istnieje
cmd = ['nft', 'list', 'chain', 'inet', self.table, self.chain]
result = subprocess.run(cmd, capture_output=True, timeout=5)
if result.returncode != 0:
# Utwórz chain z priorytetem input
cmd = [
'nft', 'add', 'chain', 'inet', self.table, self.chain,
'{', 'type', 'filter', 'hook', 'input', 'priority', '0', ';', '}'
]
subprocess.run(cmd, check=True, timeout=5)
self.logger.info(f"Created nftables chain: {self.chain}")
except Exception as e:
self.logger.error(f"Error setting up nftables chain: {e}")
def ban_ip(self, ip, duration):
"""Banuje IP używając nftables"""
try:
# Dodaj regułę DROP
cmd = [
'nft', 'add', 'rule', 'inet', self.table, self.chain,
'ip', 'saddr', ip, 'counter', 'drop',
'comment', f'"LogMon ban {duration}s"'
]
result = subprocess.run(cmd, capture_output=True, timeout=5)
if result.returncode == 0:
self.logger.debug(f"nftables ban successful for {ip}")
return True
else:
self.logger.error(f"nftables ban failed: {result.stderr.decode()}")
return False
except Exception as e:
self.logger.error(f"Error banning IP with nftables: {e}")
return False
def unban_ip(self, ip):
"""Usuwa ban używając nftables"""
try:
# Znajdź handle reguły
cmd = ['nft', '-a', 'list', 'chain', 'inet', self.table, self.chain]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if result.returncode != 0:
return False
# Parsuj output i znajdź handle
for line in result.stdout.split('\n'):
if ip in line and '# handle' in line:
try:
handle = line.split('# handle')[1].strip()
# Usuń regułę
cmd = [
'nft', 'delete', 'rule', 'inet',
self.table, self.chain, 'handle', handle
]
subprocess.run(cmd, check=True, timeout=5)
self.logger.debug(f"nftables unban successful for {ip}")
return True
except Exception as e:
self.logger.error(f"Error parsing handle: {e}")
return False
except Exception as e:
self.logger.error(f"Error unbanning IP with nftables: {e}")
return False
def is_banned(self, ip):
"""Sprawdza czy IP jest zbanowany"""
try:
cmd = ['nft', 'list', 'chain', 'inet', self.table, self.chain]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
return ip in result.stdout
except Exception as e:
self.logger.error(f"Error checking ban status: {e}")
return False