""" Moduł monitorujący Dovecot IMAP/POP3 server """ import re import time from .base import LogModule class DovecotModule(LogModule): """Moduł monitorujący Dovecot""" def __init__(self, config, daemon): super().__init__(config, daemon) # Kompiluj wzorce regex dla wydajności self.patterns = self._load_patterns() # Regex do wyciągania IP z logów Dovecot # Obsługuje format: rip=IP, lip=IP self.ip_pattern = re.compile(r'rip=(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})') # Ścieżka do pliku logu self.log_file = config.get('module_dovecot', 'log_file', fallback='/var/log/dovecot-info.log') # Czy ignorować błędy SSL/TLS (często są to skanery, nie ataki brute-force) self.ignore_ssl_errors = config.getboolean('module_dovecot', 'ignore_ssl_errors', fallback=True) # Czy ignorować połączenia z localhost self.ignore_localhost = config.getboolean('module_dovecot', 'ignore_localhost', fallback=True) self.logger.info(f"Loaded {len(self.patterns)} patterns for Dovecot") if self.ignore_ssl_errors: self.logger.info("SSL/TLS errors will be ignored") def _load_patterns(self): """Ładuje wzorce z konfiguracji""" patterns = [] pattern_names = self.config.get('module_dovecot', 'patterns', fallback='').split(',') for name in pattern_names: name = name.strip() if not name: continue section = f'pattern_{name}' if section not in self.config: self.logger.warning(f"Pattern section '{section}' not found in config") continue try: regex = self.config.get(section, 'regex') score = self.config.getint(section, 'score', fallback=1) patterns.append({ 'name': name, 'regex': re.compile(regex, re.IGNORECASE), 'score': score }) self.logger.debug(f"Loaded pattern '{name}': {regex} (score: {score})") except Exception as e: self.logger.error(f"Error loading pattern '{name}': {e}") return patterns def _run(self): """Główna pętla - tail -f na pliku logu""" self.logger.info(f"Tailing log file: {self.log_file}") try: with open(self.log_file, 'r') as f: # Przejdź na koniec pliku f.seek(0, 2) while self.running: line = f.readline() if line: self.process_line(line.strip()) else: # Brak nowych linii, czekaj chwilę time.sleep(0.1) except FileNotFoundError: self.logger.error(f"Log file not found: {self.log_file}") except PermissionError: self.logger.error(f"Permission denied reading: {self.log_file}") except Exception as e: self.logger.error(f"Error tailing log: {e}") def _is_ssl_error(self, line): """Sprawdza czy linia zawiera błąd SSL/TLS""" ssl_keywords = [ 'SSL_accept() failed', 'TLS handshaking', 'unsupported protocol', 'version too low', 'no shared cipher', 'wrong version number', 'internal error', 'Connection reset by peer', 'bad key share', 'unknown protocol', 'http request' ] line_lower = line.lower() return any(keyword.lower() in line_lower for keyword in ssl_keywords) def process_line(self, line): """ Przetwarza linię z logu Dovecot Przykłady linii: - imap-login: Info: Disconnected: Connection closed (auth failed, 1 attempts in 2 secs): user=, method=PLAIN, rip=1.2.3.4, lip=5.6.7.8, TLS - imap-login: Info: Disconnected: Connection closed: SSL_accept() failed (no auth attempts): user=<>, rip=1.2.3.4 """ # Ignoruj błędy SSL/TLS jeśli włączone if self.ignore_ssl_errors and self._is_ssl_error(line): return # Wyciągnij IP ip_match = self.ip_pattern.search(line) if not ip_match: return ip = ip_match.group(1) # Pomiń localhost jeśli włączone if self.ignore_localhost and (ip.startswith('127.') or ip == '::1'): return # Pomiń lokalne IP if ip.startswith('192.168.') or ip.startswith('10.') or ip.startswith('172.'): # Sprawdź czy to nie jest 172.16-31.x.x (prywatne) if ip.startswith('172.'): second_octet = int(ip.split('.')[1]) if 16 <= second_octet <= 31: return else: return # Sprawdź wzorce for pattern in self.patterns: if pattern['regex'].search(line): self.logger.debug( f"Pattern '{pattern['name']}' matched for IP {ip}" ) self.logger.debug(f"Line: {line}") # Zgłoś niepowodzenie do demona self.daemon.track_failure(ip, pattern['score']) # Tylko pierwszy pasujący wzorzec break