This commit is contained in:
Mateusz Gruszczyński
2025-10-28 21:27:10 +01:00
commit 7b41672d05
14 changed files with 1253 additions and 0 deletions

63
backends/base.py Normal file
View File

@@ -0,0 +1,63 @@
"""
Bazowa klasa dla backendów firewall
"""
import logging
class FirewallBackend:
"""Bazowa klasa dla backendów firewall"""
def __init__(self, config):
"""
Args:
config: ConfigParser object z konfiguracją
"""
self.config = config
self.logger = logging.getLogger(self.__class__.__name__)
def ban_ip(self, ip, duration):
"""
Banuje IP na określony czas
Args:
ip: Adres IP do zbanowania
duration: Czas bana w sekundach
Returns:
bool: True jeśli ban się powiódł
"""
raise NotImplementedError("Subclasses must implement ban_ip()")
def unban_ip(self, ip):
"""
Usuwa ban dla IP
Args:
ip: Adres IP do odbanowania
Returns:
bool: True jeśli odbanowanie się powiodło
"""
raise NotImplementedError("Subclasses must implement unban_ip()")
def is_banned(self, ip):
"""
Sprawdza czy IP jest zbanowany
Args:
ip: Adres IP do sprawdzenia
Returns:
bool: True jeśli IP jest zbanowany
"""
raise NotImplementedError("Subclasses must implement is_banned()")
def test_availability(self):
"""
Sprawdza czy backend jest dostępny w systemie
Returns:
bool: True jeśli backend jest dostępny
"""
return True

102
backends/csf.py Normal file
View File

@@ -0,0 +1,102 @@
"""
Backend dla ConfigServer Security & Firewall (CSF)
"""
import subprocess
from pathlib import Path
from .base import FirewallBackend
class CSFBackend(FirewallBackend):
"""Backend dla ConfigServer Firewall"""
def __init__(self, config):
super().__init__(config)
self.csf_path = config.get('backend_csf', 'csf_path',
fallback='/usr/sbin/csf')
if not self.test_availability():
self.logger.warning(f"CSF not found at {self.csf_path}")
def test_availability(self):
"""Sprawdza czy CSF jest zainstalowany"""
return Path(self.csf_path).exists()
def ban_ip(self, ip, duration):
"""
Banuje IP używając CSF
CSF używa:
- csf -d IP "comment" - permanent deny
- csf -td IP duration "comment" - temporary deny
"""
try:
# Temporary deny na określony czas (w sekundach)
cmd = [
self.csf_path, '-td', ip,
str(duration),
f"LogMon auto-ban"
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
self.logger.debug(f"CSF ban successful: {result.stdout.strip()}")
return True
else:
self.logger.error(f"CSF ban failed: {result.stderr.strip()}")
return False
except subprocess.TimeoutExpired:
self.logger.error(f"CSF ban command timed out for {ip}")
return False
except Exception as e:
self.logger.error(f"Error banning IP with CSF: {e}")
return False
def unban_ip(self, ip):
"""Usuwa ban używając CSF"""
try:
# Remove temporary ban
cmd = [self.csf_path, '-tr', ip]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
self.logger.debug(f"CSF unban successful for {ip}")
return True
else:
self.logger.warning(f"CSF unban may have failed: {result.stderr.strip()}")
# CSF czasem zwraca error nawet gdy się udało
return True
except Exception as e:
self.logger.error(f"Error unbanning IP with CSF: {e}")
return False
def is_banned(self, ip):
"""Sprawdza czy IP jest zbanowany w CSF"""
try:
cmd = [self.csf_path, '-g', ip]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=10
)
output = result.stdout.lower()
return "deny" in output or "drop" in output
except Exception as e:
self.logger.error(f"Error checking ban status: {e}")
return False

17
backends/init.py Normal file
View File

@@ -0,0 +1,17 @@
"""
LogMon Backends - Integracje z różnymi firewallami
"""
from .base import FirewallBackend
from .csf import CSFBackend
from .nftables import NFTablesBackend
from .iptables import IPTablesBackend
from .ufw import UFWBackend
__all__ = [
'FirewallBackend',
'CSFBackend',
'NFTablesBackend',
'IPTablesBackend',
'UFWBackend'
]

104
backends/iptables.py Normal file
View File

@@ -0,0 +1,104 @@
"""
Backend dla iptables
"""
import subprocess
from .base import FirewallBackend
class IPTablesBackend(FirewallBackend):
"""Backend dla iptables"""
def __init__(self, config):
super().__init__(config)
self.chain = config.get('backend_iptables', 'chain_name',
fallback='LOGMON_BLOCK')
if self.test_availability():
self.setup_chain()
else:
self.logger.warning("iptables not available")
def test_availability(self):
"""Sprawdza czy iptables jest dostępny"""
try:
result = subprocess.run(
['iptables', '--version'],
capture_output=True,
timeout=5
)
return result.returncode == 0
except:
return False
def setup_chain(self):
"""Tworzy chain jeśli nie istnieje"""
try:
# Sprawdź czy chain istnieje
cmd = ['iptables', '-L', self.chain, '-n']
result = subprocess.run(cmd, capture_output=True, timeout=5)
if result.returncode != 0:
# Utwórz chain
subprocess.run(['iptables', '-N', self.chain], check=True, timeout=5)
# Dodaj do INPUT na początku
subprocess.run(
['iptables', '-I', 'INPUT', '1', '-j', self.chain],
check=True,
timeout=5
)
self.logger.info(f"Created iptables chain: {self.chain}")
except Exception as e:
self.logger.error(f"Error setting up iptables chain: {e}")
def ban_ip(self, ip, duration):
"""Banuje IP używając iptables"""
try:
cmd = [
'iptables', '-I', self.chain, '1',
'-s', ip, '-j', 'DROP',
'-m', 'comment', '--comment', f'LogMon ban {duration}s'
]
result = subprocess.run(cmd, capture_output=True, timeout=5)
if result.returncode == 0:
self.logger.debug(f"iptables ban successful for {ip}")
return True
else:
self.logger.error(f"iptables ban failed: {result.stderr.decode()}")
return False
except Exception as e:
self.logger.error(f"Error banning IP with iptables: {e}")
return False
def unban_ip(self, ip):
"""Usuwa ban używając iptables"""
try:
cmd = ['iptables', '-D', self.chain, '-s', ip, '-j', 'DROP']
result = subprocess.run(cmd, capture_output=True, timeout=5)
if result.returncode == 0:
self.logger.debug(f"iptables unban successful for {ip}")
return True
else:
self.logger.warning(f"iptables unban may have failed")
return False
except Exception as e:
self.logger.error(f"Error unbanning IP with iptables: {e}")
return False
def is_banned(self, ip):
"""Sprawdza czy IP jest zbanowany"""
try:
cmd = ['iptables', '-L', self.chain, '-n', '--line-numbers']
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
return ip in result.stdout
except Exception as e:
self.logger.error(f"Error checking ban status: {e}")
return False

120
backends/nftables.py Normal file
View File

@@ -0,0 +1,120 @@
"""
Backend dla nftables
"""
import subprocess
from .base import FirewallBackend
class NFTablesBackend(FirewallBackend):
"""Backend dla nftables"""
def __init__(self, config):
super().__init__(config)
self.table = config.get('backend_nftables', 'table_name',
fallback='filter')
self.chain = config.get('backend_nftables', 'chain_name',
fallback='logmon_block')
if self.test_availability():
self.setup_chain()
else:
self.logger.warning("nftables not available")
def test_availability(self):
"""Sprawdza czy nftables jest dostępny"""
try:
result = subprocess.run(
['nft', '--version'],
capture_output=True,
timeout=5
)
return result.returncode == 0
except:
return False
def setup_chain(self):
"""Tworzy chain jeśli nie istnieje"""
try:
# Sprawdź czy chain istnieje
cmd = ['nft', 'list', 'chain', 'inet', self.table, self.chain]
result = subprocess.run(cmd, capture_output=True, timeout=5)
if result.returncode != 0:
# Utwórz chain z priorytetem input
cmd = [
'nft', 'add', 'chain', 'inet', self.table, self.chain,
'{', 'type', 'filter', 'hook', 'input', 'priority', '0', ';', '}'
]
subprocess.run(cmd, check=True, timeout=5)
self.logger.info(f"Created nftables chain: {self.chain}")
except Exception as e:
self.logger.error(f"Error setting up nftables chain: {e}")
def ban_ip(self, ip, duration):
"""Banuje IP używając nftables"""
try:
# Dodaj regułę DROP
cmd = [
'nft', 'add', 'rule', 'inet', self.table, self.chain,
'ip', 'saddr', ip, 'counter', 'drop',
'comment', f'"LogMon ban {duration}s"'
]
result = subprocess.run(cmd, capture_output=True, timeout=5)
if result.returncode == 0:
self.logger.debug(f"nftables ban successful for {ip}")
return True
else:
self.logger.error(f"nftables ban failed: {result.stderr.decode()}")
return False
except Exception as e:
self.logger.error(f"Error banning IP with nftables: {e}")
return False
def unban_ip(self, ip):
"""Usuwa ban używając nftables"""
try:
# Znajdź handle reguły
cmd = ['nft', '-a', 'list', 'chain', 'inet', self.table, self.chain]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
if result.returncode != 0:
return False
# Parsuj output i znajdź handle
for line in result.stdout.split('\n'):
if ip in line and '# handle' in line:
try:
handle = line.split('# handle')[1].strip()
# Usuń regułę
cmd = [
'nft', 'delete', 'rule', 'inet',
self.table, self.chain, 'handle', handle
]
subprocess.run(cmd, check=True, timeout=5)
self.logger.debug(f"nftables unban successful for {ip}")
return True
except Exception as e:
self.logger.error(f"Error parsing handle: {e}")
return False
except Exception as e:
self.logger.error(f"Error unbanning IP with nftables: {e}")
return False
def is_banned(self, ip):
"""Sprawdza czy IP jest zbanowany"""
try:
cmd = ['nft', 'list', 'chain', 'inet', self.table, self.chain]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
return ip in result.stdout
except Exception as e:
self.logger.error(f"Error checking ban status: {e}")
return False

78
backends/ufw.py Normal file
View File

@@ -0,0 +1,78 @@
"""
Backend dla UFW (Uncomplicated Firewall)
"""
import subprocess
from .base import FirewallBackend
class UFWBackend(FirewallBackend):
"""Backend dla UFW"""
def test_availability(self):
"""Sprawdza czy UFW jest dostępny"""
try:
result = subprocess.run(
['ufw', 'version'],
capture_output=True,
timeout=5
)
return result.returncode == 0
except:
return False
def ban_ip(self, ip, duration):
"""Banuje IP używając UFW"""
try:
# UFW nie wspiera natywnie timeout, więc używamy prostego deny
cmd = ['ufw', 'deny', 'from', ip]
result = subprocess.run(cmd, capture_output=True, timeout=5)
if result.returncode == 0:
self.logger.debug(f"UFW ban successful for {ip}")
# UFW wymaga reload
subprocess.run(['ufw', 'reload'], capture_output=True, timeout=5)
return True
else:
self.logger.error(f"UFW ban failed: {result.stderr.decode()}")
return False
except Exception as e:
self.logger.error(f"Error banning IP with UFW: {e}")
return False
def unban_ip(self, ip):
"""Usuwa ban używając UFW"""
try:
cmd = ['ufw', 'delete', 'deny', 'from', ip]
result = subprocess.run(cmd, capture_output=True, timeout=5)
if result.returncode == 0:
self.logger.debug(f"UFW unban successful for {ip}")
subprocess.run(['ufw', 'reload'], capture_output=True, timeout=5)
return True
else:
self.logger.warning(f"UFW unban may have failed")
return False
except Exception as e:
self.logger.error(f"Error unbanning IP with UFW: {e}")
return False
def is_banned(self, ip):
"""Sprawdza czy IP jest zbanowany"""
try:
cmd = ['ufw', 'status', 'numbered']
result = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
# Szukaj IP z DENY
for line in result.stdout.split('\n'):
if ip in line and 'DENY' in line:
return True
return False
except Exception as e:
self.logger.error(f"Error checking ban status: {e}")
return False