Files
haproxy-dashboard/log_parser.py
Mateusz Gruszczyński d01ca3512e new options
2025-11-03 10:27:52 +01:00

136 lines
4.7 KiB
Python

import re
from urllib.parse import unquote_plus
# Attack signatures, grouped by threat category. Each group is OR-ed into a
# single case-insensitive regex below.
_XSS_PATTERNS = (
    r'<\s*script\s*',
    r'javascript:',
    r'<\s*img\s*src\s*=?',
    r'<\s*a\s*href\s*=?',
    r'<\s*iframe\s*src\s*=?',
    # Inline event handler (onclick=, onmouseover=, ...). The "=" and the
    # leading word boundary are mandatory: a bare ``on\w+`` also matches
    # ordinary words such as "frontend", "content" or "connection", which
    # flagged nearly every log line as XSS.
    r'\bon\w+\s*=',
    r'<\s*input\s*[^>]*\s*value\s*=?',
    r'<\s*form\s*action\s*=?',
    r'<\s*svg\s*on\w+\s*=?',
    r'alert\s*\(',
    r'onerror',
    r'onload',
)
_SQL_PATTERNS = (
    r'(union|select|insert|update|delete|drop)\s+(from|into|table)',
    r';\s*(union|select|insert|update|delete|drop)',
    r'substring\s*\(',
    r'extract\s*\(',
    r'order\s+by\s+\d+',
    r'--\+',
    r'1\s*=\s*1',
    r'@@\w+',
    r'`1',
)
_WEBSHELL_PATTERNS = (
    r'eval\s*\(',
    r'system\s*\(',
    r'passthru\s*\(',
    r'shell_exec\s*\(',
    r'exec\s*\(',
    r'popen\s*\(',
    r'proc_open\s*\(',
    r'backdoor|webshell|phpspy|c99|kacak|b374k|wsos',
)

# Compiled once at import time instead of on every call / every line.
_XSS_RE = re.compile('|'.join(_XSS_PATTERNS), re.IGNORECASE)
_SQL_RE = re.compile('|'.join(_SQL_PATTERNS), re.IGNORECASE)
_WEBSHELL_RE = re.compile('|'.join(_WEBSHELL_PATTERNS), re.IGNORECASE)

# "<134>Nov 3 09:18:35 ... haproxy[18]: " -- group 1 is the syslog timestamp.
_SYSLOG_RE = re.compile(r'<\d+>(\w+\s+\d+\s+\d+:\d+:\d+).*haproxy\[\d+\]:\s+')
# First "a.b.c.d:port" pair on the line -- group 1 is the IPv4 address.
_CLIENT_RE = re.compile(r'(\d+\.\d+\.\d+\.\d+):(\d+)')
# "[03/Nov/2025:09:18:35" -- group 1 is the date/time down to the second.
_ACCEPT_DATE_RE = re.compile(r'\[(\d{2}/\w+/\d{4}:\d{2}:\d{2}:\d{2})')
# "] FRONTEND BACKEND t/t/t/t/t STATUS". Every timer may carry a sign:
# HAProxy logs -1 for a phase that was never reached (denied or aborted
# requests) and prefixes "+" when "option logasap" is active. Requiring
# plain digits silently dropped exactly those lines.
_ROUTE_RE = re.compile(
    r'\]\s+(\S+)\s+(\S+)\s+((?:[+-]?\d+/){4}[+-]?\d+)\s+(\d{3})'
)
# '"METHOD URL HTTP' -- groups are the method and the requested URL.
_REQUEST_RE = re.compile(r'"(\w+)\s+([^\s]+)\s+HTTP')


def _parse_line(line):
    """Convert one log line into an entry dict, or None if it does not fit."""
    syslog_match = _SYSLOG_RE.search(line)
    if not syslog_match:
        return None

    client_match = _CLIENT_RE.search(line)
    if not client_match:
        return None

    route_match = _ROUTE_RE.search(line)
    if not route_match:
        return None

    request_match = _REQUEST_RE.search(line)
    if not request_match:
        return None

    # The bracketed date is preferred; the syslog timestamp is the fallback.
    accept_match = _ACCEPT_DATE_RE.search(line)
    timestamp = (accept_match or syslog_match).group(1)

    http_method = request_match.group(1)
    status_code = route_match.group(4)

    # Requests are logged as received, i.e. still percent-encoded, so a
    # payload such as "%3Cscript%3E" is invisible to the signatures. Scan
    # the raw line and, when it differs, its URL-decoded form as well.
    decoded = unquote_plus(line)
    haystacks = (line,) if decoded == line else (line, decoded)

    def flagged(pattern):
        return any(pattern.search(text) for text in haystacks)

    return {
        'timestamp': timestamp,
        'ip_address': client_match.group(1),
        'http_method': http_method,
        'requested_url': request_match.group(2),
        'status_code': status_code,
        'frontend': route_match.group(1),
        'backend': route_match.group(2),
        'xss_alert': flagged(_XSS_RE),
        'sql_alert': flagged(_SQL_RE),
        'put_method': http_method == 'PUT',
        'illegal_resource': status_code == '403',
        'webshell_alert': flagged(_WEBSHELL_RE),
    }


def parse_log_file(log_file_path):
    """
    Parse an HAProxy syslog file and flag requests that look like attacks.

    Expected line format:
    <134>Nov 3 09:18:35 haproxy[18]: IP:PORT [DATE:TIME] FRONTEND BACKEND
    TIMERS STATUS ... "METHOD URL HTTP/x.y"

    Blank lines and lines that do not match this format are skipped.

    Args:
        log_file_path: Path of the log file to read (decoded as UTF-8,
            undecodable bytes are ignored).

    Returns:
        A list with one dict per recognised line, holding the string fields
        'timestamp', 'ip_address', 'http_method', 'requested_url',
        'status_code', 'frontend', 'backend' and the boolean flags
        'xss_alert', 'sql_alert', 'put_method', 'illegal_resource',
        'webshell_alert'. An empty list is returned (and a message printed)
        when the file is missing or cannot be read.
    """
    parsed_entries = []

    try:
        with open(log_file_path, 'r', encoding='utf-8', errors='ignore') as log_file:
            # Stream the file: logs can be large and each line is independent.
            for line in log_file:
                if not line.strip():
                    continue
                try:
                    entry = _parse_line(line)
                except Exception as e:
                    # Best effort: one bad line must not abort the whole file.
                    print(f"Error parsing line: {e}")
                    continue
                if entry is not None:
                    parsed_entries.append(entry)
    except FileNotFoundError:
        print(f"Log file not found: {log_file_path}")
        return []
    except Exception as e:
        print(f"Error reading log file: {e}")
        return []

    return parsed_entries