import re
from collections import defaultdict
from datetime import datetime

# Attack signatures. Each list is OR-joined into one case-insensitive regex
# and searched against the *entire* log line, not only the requested URL.
# NOTE(review): several signatures are very broad -- e.g. r'on\w+\s*=?'
# matches the "onfig" in "config", and r'script' / r'alert' match anywhere in
# the line -- so benign requests can be flagged. The signatures are kept
# unchanged here; tightening them is a detection-policy decision.
_XSS_PATTERNS = [
    r'<\s*script\s*',
    r'javascript:',
    r'<\s*img\s*src\s*=?',
    r'<\s*a\s*href\s*=?',
    r'<\s*iframe\s*src\s*=?',
    r'on\w+\s*=?',
    r'<\s*input\s*[^>]*\s*value\s*=?',
    r'<\s*form\s*action\s*=?',
    r'<\s*svg\s*on\w+\s*=?',
    r'script',
    r'alert',
    r'onerror',
    r'onload',
]
_SQL_PATTERNS = [
    r'(union|select|insert|update|delete|drop)\s+(from|into|table)',
    r';\s*(union|select|insert|update|delete|drop)',
    r'substring\s*\(',
    r'extract\s*\(',
    r'order\s+by\s+\d+',
    r'--\+',
    r'1\s*=\s*1',
    r'@@\w+',
    r'`1',
    r'\|\|\s*chr\(',
]
_WEBSHELL_PATTERNS = [
    r'eval\s*\(',
    r'system\s*\(',
    r'passthru\s*\(',
    r'shell_exec\s*\(',
    r'exec\s*\(',
    r'popen\s*\(',
    r'proc_open\s*\(',
    r'pcntl_exec\s*\(',
    r'\.php\?cmd=',
    r'\.php\?id=',
    r'backdoor|webshell|phpspy|c99|kacak|b374k|wsos|madspot|r57|c100|r57shell',
]

# (threat label, compiled signature) in the order labels are reported.
# Compiled once at import time instead of on every parse_log_file() call.
_SIGNATURE_THREATS = (
    ('XSS Attack', re.compile('|'.join(_XSS_PATTERNS), re.IGNORECASE)),
    ('SQL Injection', re.compile('|'.join(_SQL_PATTERNS), re.IGNORECASE)),
    ('Webshell', re.compile('|'.join(_WEBSHELL_PATTERNS), re.IGNORECASE)),
)

# Captures: timestamp, client IP, HTTP method, requested URL, status code.
_LOG_LINE_PATTERN = re.compile(
    r'(\w+\s+\d+\s\d+:\d+:\d+).*\s(\d+\.\d+\.\d+\.\d+).*"?\s*'
    r'(GET|POST|PUT|DELETE|PATCH|HEAD|OPTIONS)\s+([^"\s]+)"?\s+(\d{3})'
)


def _detect_threats(line, http_method, requested_url, status_code):
    """Return ``(threats, threat_level)`` for one parsed request.

    ``threats`` lists the matched threat labels in a fixed order (signature
    matches first, then 'Remote Upload', then 'Unauthorized Access').
    ``threat_level`` is the highest severity among them: 'danger' for any
    signature match, otherwise 'warning' for the heuristic findings,
    otherwise 'info'.
    """
    threats = [
        label for label, pattern in _SIGNATURE_THREATS if pattern.search(line)
    ]
    # Remember the signature verdict before appending heuristic findings so a
    # later 'warning' can never downgrade an earlier 'danger'.
    has_signature_hit = bool(threats)

    if http_method == 'PUT':
        threats.append('Remote Upload')

    url_lower = requested_url.lower()
    if ('admin' in url_lower or 'config' in url_lower) and status_code == 403:
        threats.append('Unauthorized Access')

    if has_signature_hit:
        threat_level = 'danger'
    elif threats:
        threat_level = 'warning'
    else:
        threat_level = 'info'
    return threats, threat_level


def _status_category(status_code):
    """Map an HTTP status code to the category label stored on each entry."""
    if 200 <= status_code < 300:
        return 'success'
    if 300 <= status_code < 400:
        return 'secondary'
    if 400 <= status_code < 500:
        return 'warning'
    if status_code >= 500:
        return 'danger'
    # Anything below 200.
    return 'info'


def parse_log_file(log_file_path):
    """Parse an access log and tag each request with detected threats.

    Blank lines and lines that do not match the expected layout (timestamp,
    IPv4 address, HTTP method, URL, three-digit status) are skipped.

    Args:
        log_file_path: Path of the log file to read.

    Returns:
        A list with one dict per recognised line, with keys ``timestamp``,
        ``ip_address``, ``http_method``, ``requested_url``, ``status_code``
        (int), ``status_category``, ``threats`` (list of labels, or
        ``['None']`` when nothing matched), ``threat_level`` ('info',
        'warning' or 'danger') and ``is_threat`` (bool).

        On failure no exception is raised; the result is a single-element
        list ``[{'error': <message>}]`` instead.
    """
    parsed_entries = []

    try:
        # errors='replace': request data is attacker-controlled and may hold
        # bytes that are invalid in the file's encoding; one such byte must
        # not abort the whole parse.
        with open(log_file_path, 'r', errors='replace') as log_file:
            # Stream the file instead of loading every line into memory.
            for line in log_file:
                if not line.strip():
                    continue

                match = _LOG_LINE_PATTERN.search(line)
                if not match:
                    continue

                timestamp, ip_address, http_method, requested_url = match.group(
                    1, 2, 3, 4
                )
                status_code = int(match.group(5))

                threats, threat_level = _detect_threats(
                    line, http_method, requested_url, status_code
                )

                parsed_entries.append({
                    'timestamp': timestamp,
                    'ip_address': ip_address,
                    'http_method': http_method,
                    'requested_url': requested_url,
                    'status_code': status_code,
                    'status_category': _status_category(status_code),
                    'threats': threats if threats else ['None'],
                    'threat_level': threat_level,
                    'is_threat': bool(threats),
                })
    except FileNotFoundError:
        return [{'error': f'Log file not found: {log_file_path}'}]
    except Exception as e:
        # Deliberate catch-all: callers receive an error record, not a raise.
        return [{'error': f'Error parsing log: {str(e)}'}]

    return parsed_entries


def get_log_statistics(parsed_entries):
    """Aggregate counts over entries produced by ``parse_log_file``.

    Error records (dicts containing an ``'error'`` key) are ignored
    everywhere, including in ``total_requests``.

    Args:
        parsed_entries: List of entry dicts as returned by ``parse_log_file``.

    Returns:
        A dict with ``total_requests``, ``threat_count``, ``status_codes``,
        ``http_methods`` and ``threat_types`` (plain dicts of counts), and
        ``top_ips`` (up to five ``(ip, count)`` tuples, most frequent first).
    """
    # An error placeholder is not a request, so keep it out of every count.
    requests = [entry for entry in parsed_entries if 'error' not in entry]

    status_codes = defaultdict(int)
    http_methods = defaultdict(int)
    ip_counts = defaultdict(int)
    threat_types = defaultdict(int)

    for entry in requests:
        status_codes[entry['status_code']] += 1
        http_methods[entry['http_method']] += 1
        ip_counts[entry['ip_address']] += 1
        for threat in entry.get('threats', []):
            # 'None' is the placeholder label used for clean requests.
            if threat != 'None':
                threat_types[threat] += 1

    top_ips = sorted(
        ip_counts.items(), key=lambda item: item[1], reverse=True
    )[:5]

    return {
        'total_requests': len(requests),
        'threat_count': sum(1 for e in requests if e.get('is_threat')),
        'status_codes': dict(status_codes),
        'http_methods': dict(http_methods),
        'top_ips': top_ips,
        'threat_types': dict(threat_types),
    }


def filter_logs(parsed_entries, filters=None):
    """Return the entries that satisfy every active filter.

    A filter is active when its key is present in ``filters`` with a truthy
    value. Supported keys: ``status_code`` (compared after ``int()``
    conversion), ``threat_level``, ``http_method``, ``ip_address`` (exact
    matches) and ``has_threat`` (keep only entries whose ``is_threat`` is
    truthy).

    Args:
        parsed_entries: List of entry dicts as returned by ``parse_log_file``.
        filters: Optional mapping of filter name to wanted value.

    Returns:
        ``parsed_entries`` itself when no filter is active, otherwise a new
        list of the matching entries in their original order.

    Raises:
        ValueError: If ``filters['status_code']`` is truthy but cannot be
            converted to an int.
    """
    if not filters:
        return parsed_entries

    def active(key):
        return key in filters and filters[key]

    expected = {}
    if active('status_code'):
        # Convert once up front rather than once per entry.
        expected['status_code'] = int(filters['status_code'])
    for key in ('threat_level', 'http_method', 'ip_address'):
        if active(key):
            expected[key] = filters[key]
    require_threat = bool(active('has_threat'))

    if not expected and not require_threat:
        return parsed_entries

    return [
        entry for entry in parsed_entries
        if all(entry.get(key) == value for key, value in expected.items())
        and (not require_threat or entry.get('is_threat'))
    ]