import re

# --- Security threat signatures -------------------------------------------
# Each group of patterns is OR-joined and compiled once at import time;
# matching is case-insensitive and runs against the entire raw log line.

# Cross-site-scripting indicators (script/img/iframe/svg tags, event
# handlers, javascript: URLs, alert() calls).
_XSS_RE = re.compile('|'.join([
    r'<\s*script\s*',
    r'javascript:',
    r'<\s*img\s*src\s*=?',
    r'<\s*a\s*href\s*=?',
    r'<\s*iframe\s*src\s*=?',
    r'on\w+\s*=?',
    r'<\s*input\s*[^>]*\s*value\s*=?',
    r'<\s*form\s*action\s*=?',
    r'<\s*svg\s*on\w+\s*=?',
    r'alert\s*\(',
    r'onerror',
    r'onload',
]), re.IGNORECASE)

# SQL-injection indicators (stacked/union queries, tautologies, comment
# sequences, server variables).
_SQL_RE = re.compile('|'.join([
    r'(union|select|insert|update|delete|drop)\s+(from|into|table)',
    r';\s*(union|select|insert|update|delete|drop)',
    r'substring\s*\(',
    r'extract\s*\(',
    r'order\s+by\s+\d+',
    r'--\+',
    r'1\s*=\s*1',
    r'@@\w+',
    r'`1',
]), re.IGNORECASE)

# Webshell / remote-code-execution indicators (PHP execution primitives and
# well-known shell names).
_WEBSHELL_RE = re.compile('|'.join([
    r'eval\s*\(',
    r'system\s*\(',
    r'passthru\s*\(',
    r'shell_exec\s*\(',
    r'exec\s*\(',
    r'popen\s*\(',
    r'proc_open\s*\(',
    r'backdoor|webshell|phpspy|c99|kacak|b374k|wsos',
]), re.IGNORECASE)

# --- Structural patterns for the HAProxy syslog line ----------------------
# Compiled once here instead of being re-looked-up via re.search(...) on
# every line of the log file.
_SYSLOG_RE = re.compile(r'<\d+>(\w+\s+\d+\s+\d+:\d+:\d+).*haproxy\[\d+\]:\s+')
_IP_RE = re.compile(r'(\d+\.\d+\.\d+\.\d+):(\d+)')
_DATETIME_RE = re.compile(r'\[(\d{2}/\w+/\d{4}:\d{2}:\d{2}:\d{2})')
_FE_BE_RE = re.compile(r'\]\s+(\S+)\s+(\S+)\s+(\d+/\d+/\d+/\d+/\d+)\s+(\d{3})')
_HTTP_RE = re.compile(r'"(\w+)\s+([^\s]+)\s+HTTP')
_REQUEST_RE = re.compile(r'"([^"]*)"')


def _extract_request(line):
    """Extract (http_method, requested_url) from the quoted request string.

    Tries the strict '"METHOD URL HTTP..."' form first; falls back to
    splitting the first quoted string in the line. Returns None when the
    line contains no quoted request at all.
    """
    http_match = _HTTP_RE.search(line)
    if http_match:
        return http_match.group(1), http_match.group(2)
    request_match = _REQUEST_RE.search(line)
    if not request_match:
        return None
    request_line = request_match.group(1).split()
    http_method = request_line[0] if len(request_line) > 0 else 'UNKNOWN'
    requested_url = request_line[1] if len(request_line) > 1 else '/'
    return http_method, requested_url


def _status_class_for(status_code):
    """Map an HTTP status code string to a Bootstrap-style UI color class."""
    if status_code.startswith('2'):
        return 'success'
    if status_code.startswith('3'):
        return 'info'
    if status_code.startswith('4'):
        return 'warning'
    if status_code.startswith('5'):
        return 'danger'
    return 'secondary'


def _parse_line(line):
    """Parse one HAProxy syslog line into an entry dict.

    Returns None when the line lacks the expected syslog header, client IP,
    frontend/backend fields, or a quoted request line. Raises nothing it
    handles itself; malformed-line exceptions propagate to the caller.
    """
    # Syslog header: "<PRI>Mon  d HH:MM:SS ... haproxy[pid]: "
    syslog_match = _SYSLOG_RE.search(line)
    if not syslog_match:
        return None
    timestamp = syslog_match.group(1)

    # Client address; only the IP is kept, the port is discarded.
    ip_match = _IP_RE.search(line)
    if not ip_match:
        return None
    ip_address = ip_match.group(1)

    # Prefer the bracketed accept date ("[dd/Mon/yyyy:HH:MM:SS") over the
    # coarser syslog header timestamp when it is present.
    datetime_match = _DATETIME_RE.search(line)
    if datetime_match:
        timestamp = datetime_match.group(1)

    # Frontend, backend, timer breakdown (unused group 3) and status code.
    fe_be_match = _FE_BE_RE.search(line)
    if not fe_be_match:
        return None
    frontend = fe_be_match.group(1)
    backend = fe_be_match.group(2)
    status_code = fe_be_match.group(4)

    request = _extract_request(line)
    if request is None:
        return None
    http_method, requested_url = request

    # Threat detection runs against the whole raw line, not just the URL,
    # so payloads smuggled into headers captured in the log are also caught.
    xss_alert = bool(_XSS_RE.search(line))
    sql_alert = bool(_SQL_RE.search(line))
    webshell_alert = bool(_WEBSHELL_RE.search(line))
    put_method = http_method == 'PUT'
    illegal_resource = status_code == '403'

    # Any security finding forces the 'danger' class, overriding the
    # status-code-derived class.
    has_threat = xss_alert or sql_alert or webshell_alert or put_method or illegal_resource
    status_class = 'danger' if has_threat else _status_class_for(status_code)

    return {
        'timestamp': timestamp,
        'ip_address': ip_address,
        'http_method': http_method,
        'requested_url': requested_url,
        'status_code': status_code,
        'frontend': frontend,
        'backend': backend,
        'xss_alert': xss_alert,
        'sql_alert': sql_alert,
        'put_method': put_method,
        'illegal_resource': illegal_resource,
        'webshell_alert': webshell_alert,
        'status_class': status_class,
        'has_threat': has_threat,
        'message': f"{frontend}~ {backend} [{status_code}] {http_method} {requested_url}"
    }


def parse_log_file(log_file_path):
    """
    Parse HAProxy syslog format and identify security threats.

    Format: <134>Nov 3 09:18:35 haproxy[18]: IP:PORT [DATE:TIME] FRONTEND BACKEND STATUS BYTES ...

    Returns a list of entry dicts (one per parseable line). Blank and
    unparseable lines are skipped; a missing or unreadable file yields [].
    """
    parsed_entries = []
    try:
        # Stream the file line by line (instead of readlines()) so large
        # log files do not have to fit in memory at once.
        with open(log_file_path, 'r', encoding='utf-8', errors='ignore') as log_file:
            for line in log_file:
                if not line.strip():
                    continue
                try:
                    entry = _parse_line(line)
                except Exception as e:
                    # Best-effort: one malformed line must not abort the parse.
                    print(f"[LOG_PARSER] Error parsing line: {e}", flush=True)
                    continue
                if entry is not None:
                    parsed_entries.append(entry)
    except FileNotFoundError:
        print(f"[LOG_PARSER] Log file not found: {log_file_path}", flush=True)
        return []
    except Exception as e:
        print(f"[LOG_PARSER] Error reading log file: {e}", flush=True)
        return []

    print(f"[LOG_PARSER] Parsed {len(parsed_entries)} log entries", flush=True)
    return parsed_entries