121 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			121 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""nftables firewall backend."""
 | |
| 
 | |
| import subprocess
 | |
| from .base import FirewallBackend
 | |
| 
 | |
| 
 | |
class NFTablesBackend(FirewallBackend):
    """Firewall backend that blocks source addresses with nftables rules.

    Bans are implemented as ``<family> saddr <address> counter drop`` rules
    appended to a dedicated chain in an ``inet`` table. The table and chain
    names are read from the ``backend_nftables`` configuration section
    (``table_name``, default ``filter``; ``chain_name``, default
    ``logmon_block``).

    The ``duration`` passed to :meth:`ban_ip` is only recorded in the rule
    comment; this class does not expire rules by itself.
    """

    # Seconds to wait for any single ``nft`` invocation.
    _NFT_TIMEOUT = 5

    def __init__(self, config):
        """Read the table/chain names and prepare the chain if nft works.

        Args:
            config: Configuration object exposing
                ``get(section, option, fallback=...)``.
        """
        super().__init__(config)
        self.table = config.get('backend_nftables', 'table_name',
                                fallback='filter')
        self.chain = config.get('backend_nftables', 'chain_name',
                                fallback='logmon_block')

        if self.test_availability():
            self.setup_chain()
        else:
            self.logger.warning("nftables not available")

    def _run_nft(self, *args, check=False):
        """Run ``nft`` with ``args`` and return the ``CompletedProcess``.

        Output is captured as text; undecodable bytes are replaced so that
        odd locale output cannot raise while decoding.

        Raises:
            OSError: ``nft`` could not be executed.
            subprocess.TimeoutExpired: the command exceeded the timeout.
            subprocess.CalledProcessError: ``check`` is true and ``nft``
                exited with a non-zero status.
        """
        return subprocess.run(
            ['nft', *args],
            capture_output=True,
            text=True,
            errors='replace',
            check=check,
            timeout=self._NFT_TIMEOUT,
        )

    @staticmethod
    def _normalize_ip(ip):
        """Validate ``ip`` and return ``(family, canonical_text)`` or ``None``.

        ``family`` is the nftables protocol keyword (``'ip'`` or ``'ip6'``).
        Single addresses and CIDR networks are accepted. Only the canonical
        text produced by :mod:`ipaddress` is ever handed to ``nft``: nft
        joins its arguments into one string and parses that, so raw input
        containing extra tokens could otherwise inject rule syntax.
        """
        import ipaddress

        # Zone identifiers ("fe80::1%eth0") may carry arbitrary text and are
        # not valid in an nftables address expression.
        if not isinstance(ip, str) or '%' in ip:
            return None
        try:
            parsed = ipaddress.ip_address(ip)
        except ValueError:
            try:
                parsed = ipaddress.ip_network(ip, strict=False)
            except ValueError:
                return None
            # A full-length prefix is a single host; use the bare address so
            # "1.2.3.4/32" and "1.2.3.4" compare equal.
            if parsed.prefixlen == parsed.max_prefixlen:
                parsed = parsed.network_address

        family = 'ip6' if parsed.version == 6 else 'ip'
        return family, str(parsed)

    @classmethod
    def _rule_matches(cls, rule_text, target):
        """Return True if ``rule_text`` has a ``saddr`` match equal to ``target``.

        The value following each ``saddr`` keyword is parsed and compared as
        an address, not as a substring, so ``10.0.0.1`` never matches a rule
        for ``10.0.0.10``.
        """
        tokens = rule_text.split()
        return any(
            keyword == 'saddr' and cls._normalize_ip(value) == target
            for keyword, value in zip(tokens, tokens[1:])
        )

    @classmethod
    def _matching_handle(cls, line, target):
        """Return the rule handle on ``line`` if the rule blocks ``target``.

        ``line`` is one line of ``nft -a list chain`` output, where rules end
        in ``# handle <number>``. Returns ``None`` when the line carries no
        numeric handle or does not match ``target``.
        """
        rule_text, marker, handle = line.rpartition('# handle')
        handle = handle.strip()
        if not marker or not handle.isdigit():
            return None
        if not cls._rule_matches(rule_text, target):
            return None
        return handle

    def test_availability(self):
        """Return True if the ``nft`` binary can be executed successfully."""
        try:
            return self._run_nft('--version').returncode == 0
        except Exception:
            # Missing binary, timeout, permission problems, ...: all mean
            # "not available". KeyboardInterrupt/SystemExit still propagate.
            return False

    def setup_chain(self):
        """Create the table and chain if they do not exist yet.

        Errors are logged, never raised.
        """
        try:
            # "add table" succeeds when the table already exists, so it is
            # safe to run unconditionally; without the table the chain
            # cannot be created.
            self._run_nft('add', 'table', 'inet', self.table, check=True)

            existing = self._run_nft(
                'list', 'chain', 'inet', self.table, self.chain
            )
            if existing.returncode != 0:
                # Base chain hooked into input so its drop rules take effect.
                self._run_nft(
                    'add', 'chain', 'inet', self.table, self.chain,
                    '{', 'type', 'filter', 'hook', 'input',
                    'priority', '0', ';', '}',
                    check=True,
                )
                self.logger.info(f"Created nftables chain: {self.chain}")

        except subprocess.CalledProcessError as e:
            detail = (e.stderr or '').strip() or e
            self.logger.error(f"Error setting up nftables chain: {detail}")
        except Exception as e:
            self.logger.error(f"Error setting up nftables chain: {e}")

    def ban_ip(self, ip, duration):
        """Add a drop rule for ``ip``.

        Args:
            ip: IPv4/IPv6 address (or CIDR network) as a string.
            duration: Ban length in seconds; stored in the rule comment only.

        Returns:
            True if the rule was added, False on invalid input or nft error.
        """
        target = self._normalize_ip(ip)
        if target is None:
            self.logger.error(f"Refusing to ban invalid IP address: {ip!r}")
            return False
        family, address = target

        try:
            result = self._run_nft(
                'add', 'rule', 'inet', self.table, self.chain,
                family, 'saddr', address, 'counter', 'drop',
                'comment', f'"LogMon ban {duration}s"',
            )
        except Exception as e:
            self.logger.error(f"Error banning IP with nftables: {e}")
            return False

        if result.returncode != 0:
            self.logger.error(f"nftables ban failed: {result.stderr}")
            return False

        self.logger.debug(f"nftables ban successful for {ip}")
        return True

    def unban_ip(self, ip):
        """Remove every drop rule for ``ip`` from the chain.

        Args:
            ip: IPv4/IPv6 address (or CIDR network) as a string.

        Returns:
            True if at least one rule was removed, False otherwise.
        """
        target = self._normalize_ip(ip)
        if target is None:
            self.logger.error(f"Refusing to unban invalid IP address: {ip!r}")
            return False

        try:
            # "-a" makes nft print the handle needed to delete a rule.
            listing = self._run_nft(
                '-a', 'list', 'chain', 'inet', self.table, self.chain
            )
            if listing.returncode != 0:
                return False

            removed = False
            for line in listing.stdout.splitlines():
                handle = self._matching_handle(line, target)
                if handle is None:
                    continue
                try:
                    self._run_nft(
                        'delete', 'rule', 'inet',
                        self.table, self.chain, 'handle', handle,
                        check=True,
                    )
                except Exception as e:
                    # Keep going: a duplicate rule may still be removable.
                    self.logger.error(
                        f"Error deleting nftables rule handle {handle}: {e}"
                    )
                    continue
                removed = True

            if removed:
                self.logger.debug(f"nftables unban successful for {ip}")
            return removed

        except Exception as e:
            self.logger.error(f"Error unbanning IP with nftables: {e}")
            return False

    def is_banned(self, ip):
        """Return True if the chain contains a rule matching ``ip`` as source.

        Invalid addresses and nft failures yield False.
        """
        target = self._normalize_ip(ip)
        if target is None:
            return False

        try:
            listing = self._run_nft(
                'list', 'chain', 'inet', self.table, self.chain
            )
            if listing.returncode != 0:
                return False
            return any(
                self._rule_matches(line, target)
                for line in listing.stdout.splitlines()
            )
        except Exception as e:
            self.logger.error(f"Error checking ban status: {e}")
            return False
 | 
