105 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			105 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | |
| Backend dla iptables
 | |
| """
 | |
| 
 | |
| import subprocess
 | |
| from .base import FirewallBackend
 | |
| 
 | |
| 
 | |
class IPTablesBackend(FirewallBackend):
    """Firewall backend that bans addresses through a dedicated iptables chain.

    The chain name is read from the ``chain_name`` option of the
    ``backend_iptables`` config section (default ``LOGMON_BLOCK``).  Bans are
    ``DROP`` rules inserted into that chain, and the chain itself is hooked
    in as the first rule of ``INPUT``.
    """

    # Seconds to wait for any single iptables invocation before giving up.
    _IPTABLES_TIMEOUT = 5

    def __init__(self, config):
        """Read the chain name from *config* and prepare the chain if possible.

        Args:
            config: configuration object exposing
                ``get(section, option, fallback=...)``.
        """
        super().__init__(config)
        self.chain = config.get('backend_iptables', 'chain_name',
                                fallback='LOGMON_BLOCK')

        if self.test_availability():
            self.setup_chain()
        else:
            self.logger.warning("iptables not available")

    def _iptables(self, args, check=False):
        """Run ``iptables`` with *args* and return the CompletedProcess.

        Output is captured as text; undecodable bytes are replaced so that a
        badly encoded error message can never raise while being reported.

        Raises:
            subprocess.CalledProcessError: if *check* is true and iptables
                exits with a non-zero status.
            subprocess.TimeoutExpired: if the command exceeds the timeout.
            OSError: if the iptables binary cannot be executed.
        """
        return subprocess.run(
            ['iptables', *args],
            capture_output=True,
            text=True,
            errors='replace',
            timeout=self._IPTABLES_TIMEOUT,
            check=check,
        )

    @staticmethod
    def _parse_network(value):
        """Return *value* as an ``ipaddress`` network, or None if it is invalid.

        Both single addresses (``1.2.3.4``) and CIDR networks
        (``10.0.0.0/8``) are accepted; host bits are masked off.
        """
        import ipaddress

        try:
            return ipaddress.ip_network(str(value).strip(), strict=False)
        except ValueError:
            return None

    @classmethod
    def _rule_drops(cls, spec, network):
        """Tell whether rule *spec* (a token list) DROPs exactly *network*."""
        source = None
        target = None
        for index, flag in enumerate(spec[:-1]):
            value = spec[index + 1]
            if flag in ('-s', '--source'):
                # "! -s addr" matches everything *except* addr: not a ban.
                if index > 0 and spec[index - 1] == '!':
                    return False
                source = cls._parse_network(value)
            elif flag in ('-j', '--jump'):
                target = value
        return target == 'DROP' and source is not None and source == network

    def _matching_rules(self, network):
        """Return the rule specs in our chain that DROP *network*.

        Each spec is the token list that follows ``-A <chain>`` in the
        output of ``iptables -S <chain>``, so it can be replayed verbatim
        after ``-D <chain>`` to delete that rule.

        Raises:
            RuntimeError: if the chain cannot be listed.
        """
        import shlex

        listing = self._iptables(['-S', self.chain])
        if listing.returncode != 0:
            raise RuntimeError(
                listing.stderr.strip() or f"cannot list chain {self.chain}"
            )

        specs = []
        for line in listing.stdout.splitlines():
            try:
                tokens = shlex.split(line)
            except ValueError:
                # Unparseable quoting in a rule we did not create; skip it.
                continue
            # Skip the "-N <chain>" header and anything not appended to us.
            if tokens[:2] != ['-A', self.chain]:
                continue
            spec = tokens[2:]
            if self._rule_drops(spec, network):
                specs.append(spec)
        return specs

    def test_availability(self):
        """Check whether the iptables binary can be executed.

        Returns:
            True if ``iptables --version`` exits with status 0, False if it
            fails, times out or cannot be started.
        """
        try:
            return self._iptables(['--version']).returncode == 0
        except (OSError, subprocess.SubprocessError):
            return False

    def setup_chain(self):
        """Create the ban chain and hook it into INPUT when either is missing.

        The chain and the INPUT jump are checked independently, so a chain
        that exists without its jump is still wired in.  Errors are logged
        and never raised.
        """
        try:
            # Listing fails with a non-zero status when the chain is absent.
            if self._iptables(['-L', self.chain, '-n']).returncode != 0:
                self._iptables(['-N', self.chain], check=True)
                self.logger.info(f"Created iptables chain: {self.chain}")

            # -C exits non-zero when no such rule exists in INPUT.
            if self._iptables(['-C', 'INPUT', '-j', self.chain]).returncode != 0:
                # Insert at position 1 so bans are evaluated before other rules.
                self._iptables(['-I', 'INPUT', '1', '-j', self.chain],
                               check=True)
                self.logger.debug(f"Hooked {self.chain} into INPUT")

        except subprocess.CalledProcessError as e:
            detail = (e.stderr or '').strip()
            self.logger.error(
                f"Error setting up iptables chain: {e} {detail}".rstrip()
            )
        except Exception as e:
            self.logger.error(f"Error setting up iptables chain: {e}")

    def ban_ip(self, ip, duration):
        """Insert a DROP rule for *ip* at the top of the ban chain.

        Args:
            ip: IPv4 address or CIDR network to block.
            duration: ban length in seconds; only recorded in the rule
                comment, the rule itself does not expire.

        Returns:
            True if the rule was inserted, False otherwise (including when
            *ip* is not a valid address).
        """
        network = self._parse_network(ip)
        if network is None:
            self.logger.error(f"iptables ban failed: invalid IP address {ip!r}")
            return False

        try:
            result = self._iptables([
                '-I', self.chain, '1',
                '-s', str(network), '-j', 'DROP',
                '-m', 'comment', '--comment', f'LogMon ban {duration}s',
            ])

            if result.returncode == 0:
                self.logger.debug(f"iptables ban successful for {ip}")
                return True

            self.logger.error(f"iptables ban failed: {result.stderr}")
            return False

        except Exception as e:
            self.logger.error(f"Error banning IP with iptables: {e}")
            return False

    def unban_ip(self, ip):
        """Remove every DROP rule for *ip* from the ban chain.

        Rules are located by listing the chain and deleted using their full
        specification, because ``iptables -D`` only matches a rule when all
        of its matches (including the ban comment) are given.

        Returns:
            True if at least one rule existed and all of them were removed,
            False otherwise.
        """
        network = self._parse_network(ip)
        if network is None:
            self.logger.error(
                f"iptables unban failed: invalid IP address {ip!r}"
            )
            return False

        try:
            specs = self._matching_rules(network)
            if not specs:
                self.logger.warning(
                    f"iptables unban may have failed: no rule found for {ip}"
                )
                return False

            all_removed = True
            for spec in specs:
                result = self._iptables(['-D', self.chain, *spec])
                if result.returncode != 0:
                    all_removed = False
                    self.logger.warning(
                        f"iptables unban may have failed for {ip}: "
                        f"{result.stderr.strip()}"
                    )

            if all_removed:
                self.logger.debug(f"iptables unban successful for {ip}")
            return all_removed

        except Exception as e:
            self.logger.error(f"Error unbanning IP with iptables: {e}")
            return False

    def is_banned(self, ip):
        """Check whether the ban chain holds a DROP rule for exactly *ip*.

        Returns:
            True if such a rule exists; False if not, if *ip* is invalid, or
            if the chain could not be inspected.
        """
        network = self._parse_network(ip)
        if network is None:
            return False

        try:
            return bool(self._matching_rules(network))
        except Exception as e:
            self.logger.error(f"Error checking ban status: {e}")
            return False
 | 
