import os import re import json import uuid import time import socket import ipaddress import requests import threading import subprocess import redis import geoip2.database import psutil import platform import json import logging import ipaddress import re #import sqlite3 from prometheus_client import start_http_server, Counter, Gauge, Histogram from ua_parser import user_agent_parser from markupsafe import escape from functools import wraps from base64 import b64encode from datetime import datetime, timedelta from collections import defaultdict from flask import Flask, jsonify, request, render_template, Response, flash, redirect, url_for from contextlib import closing from config import CONFIG, SKIP_BACKUP_PREFIXES if not os.path.exists("logs"): os.makedirs("logs") DEFAULT_LOG_LEVEL = "WARNING" logging.basicConfig( filename=os.path.join("logs", "app.log"), level=getattr(logging, DEFAULT_LOG_LEVEL), format="%(asctime)s [%(levelname)s] %(message)s", ) logging.getLogger().setLevel(getattr(logging, DEFAULT_LOG_LEVEL)) app = Flask(__name__) r = redis.Redis( host=CONFIG["redis_host"], port=CONFIG["redis_port"], db=CONFIG["redis_db"] ) geo_reader = geoip2.database.Reader(CONFIG["geoip_db"]) bin_path = CONFIG.get("webserver_bin", "/usr/sbin/nginx") lock = threading.Lock() start_time = datetime.now() START_TIME = time.time() REQUEST_COUNT = 0 METRICS = { "total_bans": Counter("autoban_total_bans", "Total banned IPs"), "active_bans": Gauge("autoban_active_bans", "Currently banned IPs"), "requests_total": Counter( "autoban_requests_total", "Total processed requests" ), "bruteforce_attempts": Counter( "autoban_bruteforce_total", "Bruteforce attempts detected" ), "drupal_attacks": Counter( "autoban_drupal_attacks", "Drupal-specific attacks detected" ), "sqli_attacks": Counter("autoban_sqli_attacks", "SQL Injection attempts"), "xss_attacks": Counter("autoban_xss_attacks", "XSS attempts"), "rce_attacks": Counter("autoban_rce_attacks", "RCE attempts"), "whitelisted_requests": Counter( "autoban_whitelisted_total", "Whitelisted requests" ), "processing_time": Histogram( "autoban_processing_time", "Request processing time" ), "ban_duration": Histogram( "autoban_ban_duration", "Duration of bans in seconds" ), "request_size": Histogram( "autoban_request_size", "Size of requests in bytes" ), "response_time": Histogram( "autoban_response_time", "Response time in seconds" ), } def start_background_threads(): sync_existing_bans() start_http_server(CONFIG["prometheus_port"]) threads = [ threading.Thread(target=tail_log, daemon=True), threading.Thread(target=ban_cleaner, daemon=True), threading.Thread(target=export_prometheus_metrics, daemon=True), threading.Thread(target=cleanup_old_stats, daemon=True), # threading.Thread(target=redis_sqlite_sync_daemon, daemon=True), ] for t in threads: t.start() class NotificationBatcher: def __init__(self, config): self.config = config.get( "notification_batching", { "enabled": False, "batch_window": 300, "threshold": 10, "max_window": 900, "summary_limit": 5, "include_details": True, }, ) self.pushover_config = config["pushover"] self.batch_data = defaultdict(list) self.batch_timers = {} self.lock = threading.Lock() def add_ban(self, ban_info): if not self.config["enabled"]: self._send_immediate_notification(ban_info) return current_time = time.time() batch_key = self._get_batch_key(current_time) with self.lock: self.batch_data[batch_key].append(ban_info) if len(self.batch_data[batch_key]) >= self.config["threshold"]: self._send_batch_notification(batch_key) elif batch_key not in self.batch_timers: self._set_batch_timer(batch_key) def _get_batch_key(self, timestamp): window_start = ( int(timestamp // self.config["batch_window"]) * self.config["batch_window"] ) return f"batch_{window_start}" def _set_batch_timer(self, batch_key): def send_after_timeout(): time.sleep(self.config["batch_window"]) with self.lock: if batch_key in self.batch_data and self.batch_data[batch_key]: self._send_batch_notification(batch_key) timer = threading.Thread(target=send_after_timeout, daemon=True) timer.start() self.batch_timers[batch_key] = timer def _send_batch_notification(self, batch_key): if batch_key not in self.batch_data or not self.batch_data[batch_key]: return ban_list = self.batch_data[batch_key] total_bans = len(ban_list) message = self._format_batch_message(total_bans, ban_list) self._send_pushover_notification( message, title="Autoban Security: Batch Ban Report" ) del self.batch_data[batch_key] if batch_key in self.batch_timers: del self.batch_timers[batch_key] def _format_batch_message(self, total_bans, ban_list): current_time = datetime.now().strftime("%H:%M:%S") message_parts = [ f"🚨 Autoban: Batch Ban Report - {current_time}", f"📊 Łącznie zbanowanych: {total_bans} IP", ] ip_counts = defaultdict(int) attack_types = defaultdict(int) countries = defaultdict(int) reasons = defaultdict(int) for ban in ban_list: ip_counts[ban["ip"]] += 1 if "attack_types" in ban and ban["attack_types"]: for attack_type in ban["attack_types"]: attack_types[attack_type] += 1 if ban.get("location") and ban["location"] != "Unknown": countries[ban["location"]] += 1 reasons[ban["reason"]] += 1 top_ips = sorted(ip_counts.items(), key=lambda x: x[1], reverse=True)[ : self.config["summary_limit"] ] if top_ips: message_parts.append("\n🎯 Najczęściej atakujące IP:") for ip, count in top_ips: latest_ban = None for ban in ban_list: if ban["ip"] == ip: if ( latest_ban is None or ban["timestamp"] > latest_ban["timestamp"] ): latest_ban = ban location_info = "" if latest_ban and latest_ban.get("location"): if latest_ban.get("city"): location_info = ( f" - {latest_ban['city']}, {latest_ban['location']}" ) else: location_info = f" - {latest_ban['location']}" if count > 1: message_parts.append(f" • {ip} ({count}x){location_info}") else: message_parts.append(f" • {ip}{location_info}") if latest_ban and latest_ban.get("request_uri"): uri_preview = latest_ban["request_uri"][:50] if len(latest_ban["request_uri"]) > 50: uri_preview += "..." method = latest_ban.get("request_method", "GET") message_parts.append(f" Ostatni: {method} {uri_preview}") if reasons: message_parts.append("\n⚡ Przyczyny banów:") top_reasons = sorted(reasons.items(), key=lambda x: x[1], reverse=True)[:3] for reason, count in top_reasons: message_parts.append(f" • {reason}: {count}") if attack_types: message_parts.append("\n🛡️ Wykryte ataki:") top_attacks = sorted( attack_types.items(), key=lambda x: x[1], reverse=True )[:3] for attack_type, count in top_attacks: message_parts.append(f" • {attack_type}: {count}") if countries: message_parts.append("\n🌍 Kraje pochodzenia:") top_countries = sorted(countries.items(), key=lambda x: x[1], reverse=True)[ :3 ] for country, count in top_countries: message_parts.append(f" • {country}: {count}") return "\n".join(message_parts) def _send_pushover_notification(self, message, title="autoban Security Alert"): if not self.pushover_config["enabled"]: return try: data = { "token": self.pushover_config["token"], "user": self.pushover_config["user_key"], "message": message, "title": title, "priority": 1, } response = requests.post( "https://api.pushover.net/1/messages.json", data=data, timeout=10 ) if response.status_code == 200: print(f"Batch notification sent successfully at {datetime.now()}") else: print(f"Failed to send batch notification: {response.status_code}") except Exception as e: print(f"Error sending batch notification: {e}") def _send_immediate_notification(self, ban_info): location_info = "" if ban_info.get("city") and ban_info.get("location"): location_info = f" ({ban_info['city']}, {ban_info['location']})" elif ban_info.get("location"): location_info = f" ({ban_info['location']})" message_parts = [ f"🚨 autoban: IP banned", f"IP: {ban_info['ip']}{location_info}", f"Hostname: {ban_info.get('hostname', 'Unknown')}", f"Reason: {ban_info['reason']}", ] if ban_info.get("request_method") and ban_info.get("request_uri"): uri_preview = ban_info["request_uri"][:100] if len(ban_info["request_uri"]) > 100: uri_preview += "..." message_parts.append(f"Request: {ban_info['request_method']} {uri_preview}") if ban_info.get("user_agent"): ua_preview = ban_info["user_agent"][:80] if len(ban_info["user_agent"]) > 80: ua_preview += "..." message_parts.append(f"User-Agent: {ua_preview}") message = "\n".join(message_parts) self._send_pushover_notification(message, title="autoban Security Alert") notification_batcher = NotificationBatcher(CONFIG) def sqlite_init(): with closing(sqlite3.connect(CONFIG["sqlite_db"])) as conn, conn: conn.execute( """ CREATE TABLE IF NOT EXISTS redis_dump ( key TEXT PRIMARY KEY, type TEXT NOT NULL, ttl INTEGER, value TEXT NOT NULL, updated_at TEXT NOT NULL ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS meta ( k TEXT PRIMARY KEY, v TEXT NOT NULL ) """ ) def _redis_key_to_record(r, key: bytes): k = key.decode() t = r.type(k).decode() ttl = r.ttl(k) if t == "string": val = r.get(k) payload = val.decode(errors="ignore") if val is not None else "" elif t == "hash": raw = r.hgetall(k) payload = {kk.decode(): vv.decode(errors="ignore") for kk, vv in raw.items()} payload = json.dumps(payload, ensure_ascii=False) elif t == "list": arr = [v.decode(errors="ignore") for v in r.lrange(k, 0, -1)] payload = json.dumps(arr, ensure_ascii=False) elif t == "set": arr = [v.decode(errors="ignore") for v in r.smembers(k)] payload = json.dumps(sorted(arr), ensure_ascii=False) elif t == "zset": arr = [ (m.decode(errors="ignore"), float(s)) for m, s in r.zrange(k, 0, -1, withscores=True) ] payload = json.dumps(arr, ensure_ascii=False) else: return None ttl = ttl if isinstance(ttl, int) and ttl >= 0 else None value = payload return (t, ttl, value) def backup_redis_to_sqlite(r, app_logger=None): sqlite_init() now = datetime.utcnow().isoformat() backed_up = 0 with closing(sqlite3.connect(CONFIG["sqlite_db"])) as conn, conn: for key in r.scan_iter("*"): k = key.decode() if isinstance(key, (bytes, bytearray)) else str(key) if k.startswith(SKIP_BACKUP_PREFIXES): continue ttl_now = r.ttl(key) if ttl_now is not None and ttl_now > -1: continue rec = _redis_key_to_record(r, key) if rec is None: continue t, ttl, value = rec conn.execute( """ INSERT INTO redis_dump(key, type, ttl, value, updated_at) VALUES(?,?,?,?,?) ON CONFLICT(key) DO UPDATE SET type=excluded.type, ttl=excluded.ttl, value=excluded.value, updated_at=excluded.updated_at """, (k, t, ttl if ttl is not None else -1, value, now), ) backed_up += 1 conn.execute( "INSERT OR REPLACE INTO meta(k, v) VALUES('backup_last_run', ?)", (now,) ) conn.execute( "INSERT OR REPLACE INTO meta(k, v) VALUES('backup_last_count', ?)", (str(backed_up),), ) if app_logger: app_logger.info(f"Redis -> SQLite backup done: {backed_up} keys.") return backed_up def _sanitize_ip_token(token: str) -> str: token = re.sub(r";+\s*$", "", token.strip()) token = token.split("/")[0].strip() return token def _valid_ip(ip: str) -> bool: try: ipaddress.ip_address(ip) return True except ValueError: return False def check_basic_auth(username, password): expected = CONFIG.get("basic_auth", {}) return ( username == expected.get("username") and password == expected.get("password") ) def require_basic_auth(f): @wraps(f) def decorated(*args, **kwargs): auth = request.authorization if not auth or not check_basic_auth(auth.username, auth.password): return Response( "Unauthorized", 401, {"WWW-Authenticate": 'Basic realm="Login Required"'} ) return f(*args, **kwargs) return decorated @app.after_request def add_header(response): if request.path.startswith("/static/"): response.cache_control.public = True response.cache_control.max_age = int(timedelta(days=31).total_seconds()) response.cache_control.immutable = True return response def system_tiles(): return { "active_bans": METRICS["active_bans"]._value.get(), "total_bans": METRICS["total_bans"]._value.get(), "drupal_attacks": METRICS["drupal_attacks"]._value.get(), "memory_usage": f"{psutil.Process(os.getpid()).memory_info().rss/1024/1024:.2f} MB", "uptime": str(datetime.now() - start_time).split(".")[0], } def restore_missing_from_sqlite(r, app_logger=None): sqlite_init() with closing(sqlite3.connect(CONFIG["sqlite_db"])) as conn: rows = list(conn.execute("SELECT key, type, ttl, value FROM redis_dump")) pipe = r.pipeline() restored = 0 for k, t, ttl, value in rows: try: if r.exists(k): continue if t == "string": pipe.set(k, value) elif t == "hash": pipe.hmset(k, json.loads(value)) elif t == "list": arr = json.loads(value) if arr: pipe.rpush(k, *arr) elif t == "set": arr = json.loads(value) if arr: pipe.sadd(k, *arr) elif t == "zset": arr = json.loads(value) if arr: pipe.zadd(k, {m: float(s) for m, s in arr}) else: continue if isinstance(ttl, int) and ttl > 0: pipe.expire(k, ttl) restored += 1 except Exception: continue pipe.execute() with closing(sqlite3.connect(CONFIG["sqlite_db"])) as conn, conn: conn.execute( "INSERT OR REPLACE INTO meta(k, v) VALUES('restore_last_count', ?)", (str(restored),), ) conn.execute( "INSERT OR REPLACE INTO meta(k, v) VALUES('restore_last_run', ?)", (datetime.utcnow().isoformat(),), ) if app_logger: app_logger.info( f"SQLite restore completed: restored {restored} keys not present in Redis." ) return restored def redis_sqlite_sync_daemon(): try: logger = app.logger if "app" in globals() else None _ = restore_missing_from_sqlite(r, logger) while True: try: backup_redis_to_sqlite(r, logger) except Exception as e: if logger: logger.error(f"Redis -> SQLite backup failed: {e}") time.sleep(24 * 3600) except Exception as e: try: app.logger.error(f"sync daemon init error: {e}") except Exception: pass def detect_attack_types(request_uri): detected_types = [] for attack_type, patterns in CONFIG["attack_patterns"].items(): for pattern in patterns: try: if re.search(pattern, request_uri, re.IGNORECASE): detected_types.append(attack_type) break except re.error: continue return detected_types def is_endpoint_whitelisted(uri): return any(uri.startswith(e) for e in CONFIG.get("whitelist_endpoints", [])) def get_geo_info(ip): try: response = geo_reader.city(ip) return { "country": response.country.name, "city": response.city.name, "latitude": response.location.latitude, "longitude": response.location.longitude, } except Exception as e: app.logger.error(f"GeoIP error for {ip}: {e}") return {} def is_whitelisted(ip, user_agent): try: for ip_range in CONFIG["whitelist"]["ip_ranges"]: if ipaddress.ip_address(ip) in ipaddress.ip_network(ip_range): METRICS["whitelisted_requests"].inc() return True except ValueError as e: app.logger.error(f"IP validation error: {e}") if any( re.search(pattern, user_agent, re.I) for pattern in CONFIG["whitelist"]["user_agents"] ): METRICS["whitelisted_requests"].inc() return True return False def send_pushover_notification(message): if not CONFIG["pushover"]["enabled"]: return try: requests.post( "https://api.pushover.net/1/messages.json", data={ "token": CONFIG["pushover"]["token"], "user": CONFIG["pushover"]["user_key"], "message": message, "title": "autoban Alert", "priority": 1, }, ) except Exception as e: app.logger.error(f"Pushover error: {e}") def detect_attack(log_entry): uri = log_entry.get("request_uri", "") method = log_entry.get("request_method", "") full_request = f"{method} {uri} {log_entry.get('http_user_agent', '')}".lower() for attack_type, patterns in CONFIG["attack_patterns"].items(): for pattern in patterns: if re.search(pattern, full_request, re.IGNORECASE): if attack_type == "drupal": METRICS["drupal_attacks"].inc() elif attack_type == "sqli": METRICS["sqli_attacks"].inc() elif attack_type == "xss": METRICS["xss_attacks"].inc() elif attack_type == "rce": METRICS["rce_attacks"].inc() return attack_type return None def analyze_sequence(ip): sequence = r.lrange(f"sequence:{ip}", 0, -1) sequence = [s.decode() for s in sequence] login_attempts = sum( 1 for url in sequence if any(u in url for u in CONFIG["bruteforce"]["login_urls"]) ) if login_attempts >= CONFIG["bruteforce"]["attempts_threshold"]: METRICS["bruteforce_attempts"].inc() return "bruteforce" total_score = 0 for pattern in CONFIG["sequence"]["suspicious_patterns"]: patt_list = pattern["pattern"] patt_score = pattern["score"] if len(sequence) >= len(patt_list): for i in range(len(sequence) - len(patt_list) + 1): if sequence[i : i + len(patt_list)] == patt_list: total_score += patt_score if total_score >= CONFIG["sequence"]["threshold"]: return "suspicious_sequence" return None def ban_ip(ip, reason, log_entry): if r.exists(f"unbanned:{ip}"): logging.info(f"Skipping ban (recently unbanned): IP={ip}") return with lock: if r.exists(f"banned:{ip}"): logging.info(f"Already banned: IP={ip}") return try: hostname = socket.gethostbyaddr(ip)[0] except (socket.herror, socket.gaierror): hostname = ip ban_id = str(uuid.uuid4()) geo = get_geo_info(ip) expires = time.time() + CONFIG["thresholds"]["ban_duration"] request_size = int(log_entry.get("bytes_sent", 0)) ban_data = { "ban_id": ban_id, "ip": ip, "hostname": hostname, "timestamp": datetime.now().isoformat(), "reason": ( f"large_request ({request_size} bytes)" if reason == "large_request" else reason ), "user_agent": log_entry.get("http_user_agent", ""), "last_request": log_entry["request_uri"], "geo": json.dumps(geo), "expires": expires, "attack_details": json.dumps( { "method": log_entry.get("request_method"), "payload": log_entry["request_uri"][:500], "status": log_entry.get("status"), "size": request_size, "response_time": float(log_entry.get("request_time", 0)), } ), } r.hmset(f"banned:{ip}", ban_data) r.expire(f"banned:{ip}", CONFIG["thresholds"]["ban_duration"]) with open(CONFIG["deny_file"], "r") as f: existing_ips = [] for line in f: if not line.startswith("deny"): continue raw = line.split()[1] ip = _sanitize_ip_token(raw) if _valid_ip(ip): existing_ips.append(ip) if ip not in existing_ips: with open(CONFIG["deny_file"], "a") as f: f.write(f"deny {ip};\n") try: subprocess.run([bin_path, "-t"], check=True) subprocess.run([bin_path, "-s", "reload"], check=True) except subprocess.CalledProcessError as e: logging.error(f"Angie config error: {e}") r.delete(f"banned:{ip}") return METRICS["total_bans"].inc() METRICS["ban_duration"].observe(CONFIG["thresholds"]["ban_duration"]) METRICS["request_size"].observe(request_size) METRICS["response_time"].observe(float(log_entry.get("request_time", 0))) notification_data = { "ip": ip, "hostname": hostname, "reason": ban_data["reason"], "location": geo.get("country", "Unknown"), "city": geo.get("city", ""), "request_method": log_entry.get("request_method", ""), "request_uri": log_entry["request_uri"], "user_agent": log_entry.get("http_user_agent", ""), "request_size": request_size, "status": log_entry.get("status", ""), "response_time": float(log_entry.get("request_time", 0)), "timestamp": time.time(), "attack_types": detect_attack_types(log_entry["request_uri"]), } logging.info(f"Banning IP: {ip}, reason: {reason}") notification_batcher.add_ban(notification_data) log_entry["ban_reason"] = reason update_stats(log_entry, is_blocked=True) def parse_user_agent(ua_string): try: parsed = user_agent_parser.Parse(ua_string) return f"{parsed['user_agent']['family']} {parsed['user_agent']['major']}" except: return "Unknown" def update_stats(log_entry, is_blocked=False): if not is_blocked: return now = datetime.now() time_keys = { "week": f"{now.year}:W{now.isocalendar()[1]}", "month": f"{now.year}:M{now.month}", "year": f"{now.year}", } pipeline = r.pipeline() ip = log_entry.get("remote_addr") url = log_entry.get("request_uri") ua = log_entry.get("http_user_agent", "Unknown") browser = parse_user_agent(ua) country = get_geo_info(ip).get("country", "Unknown") reason = log_entry.get("ban_reason", "unknown") for period, key in time_keys.items(): pipeline.hincrby(f"stats:blocked:{key}", "total", 1) pipeline.zincrby(f"stats:blocked_ips:{key}", 1, ip) pipeline.zincrby(f"stats:blocked_urls:{key}", 1, url) pipeline.zincrby(f"stats:blocked_browsers:{key}", 1, browser) pipeline.zincrby(f"stats:blocked_countries:{key}", 1, country) pipeline.zincrby(f"stats:blocked_reasons:{key}", 1, reason) pipeline.execute() def analyze_log_entry(log_entry): try: start_time = time.time() ip = log_entry["remote_addr"] status = int(log_entry.get("status", 0)) user_agent = log_entry.get("http_user_agent", "") uri = log_entry.get("request_uri", "") or "" method = log_entry.get("request_method", "GET") if is_endpoint_whitelisted(uri) or uri in CONFIG.get("whitelist_endpoints", []): logging.info(f"Endpoint whitelisted: {uri}, IP={ip}") return if is_whitelisted(ip, user_agent): logging.info(f"IP whitelisted: {ip}") return logging.info(f"Start analyze_log_entry: IP={ip}, URL={uri}") attack_type = detect_attack(log_entry) if attack_type: logging.warning(f"Detected attack type: {attack_type}, IP={ip}, URL={uri}") ban_ip(ip, f"{attack_type}_attack", log_entry) return request_size = int(log_entry.get("bytes_sent", 0)) if request_size > CONFIG["thresholds"]["request_size"]: logging.warning(f"Large request detected: IP={ip}, size={request_size}") ban_ip(ip, "large_request", log_entry) return ttl_window = CONFIG["thresholds"]["requests_time_window"] referer = log_entry.get("http_referer", "") or "" haystack = " ".join([uri, referer, user_agent]) info_patterns = CONFIG["attack_patterns"].get("information_disclosure", []) info_disclosure_detected = any( re.search(p, haystack, re.IGNORECASE) for p in info_patterns ) pipeline = r.pipeline() pipeline.incr(f"metrics:ip:{ip}:requests") if r.ttl(f"metrics:ip:{ip}:requests") == -1: pipeline.expire(f"metrics:ip:{ip}:requests", ttl_window) if status in CONFIG["thresholds"]["error_codes"]: pipeline.incr(f"metrics:ip:{ip}:errors") if r.ttl(f"metrics:ip:{ip}:errors") == -1: pipeline.expire(f"metrics:ip:{ip}:errors", ttl_window) elif status == 200: pipeline.incr(f"metrics:ip:{ip}:success") if r.ttl(f"metrics:ip:{ip}:success") == -1: pipeline.expire(f"metrics:ip:{ip}:success", ttl_window) if info_disclosure_detected: pipeline.incr(f"metrics:ip:{ip}:info_disclosure_attacks") if r.ttl(f"metrics:ip:{ip}:info_disclosure_attacks") == -1: pipeline.expire(f"metrics:ip:{ip}:info_disclosure_attacks", ttl_window) try: METRICS["info_disclosure_attacks"].inc() except Exception: pass logging.warning( f"Information disclosure indicator matched, IP={ip}, URI={uri}" ) pipeline.lpush(f"sequence:{ip}", uri) pipeline.ltrim(f"sequence:{ip}", 0, CONFIG["sequence"]["window_size"] - 1) pipeline.lpush(f"requests:{ip}", f"{method} {uri} {status}") pipeline.ltrim(f"requests:{ip}", 0, 20) if any(url in uri for url in CONFIG["bruteforce"]["login_urls"]): if status != 200: pipeline.incr(f"metrics:ip:{ip}:failed_logins") pipeline.expire( f"metrics:ip:{ip}:failed_logins", CONFIG["bruteforce"]["time_window"], ) pipeline.execute() metrics = { "requests": int(r.get(f"metrics:ip:{ip}:requests") or 0), "errors": int(r.get(f"metrics:ip:{ip}:errors") or 0), "success": int(r.get(f"metrics:ip:{ip}:success") or 0), "failed_logins": int(r.get(f"metrics:ip:{ip}:failed_logins") or 0), "info_disclosure_attacks": int( r.get(f"metrics:ip:{ip}:info_disclosure_attacks") or 0 ), } if metrics["requests"] > CONFIG["thresholds"]["requests"]: logging.warning(f"High traffic: IP={ip}, requests={metrics['requests']}") ban_ip(ip, "high_traffic", log_entry) elif metrics["errors"] > CONFIG["thresholds"]["errors"]: logging.warning(f"Many errors: IP={ip}, errors={metrics['errors']}") ban_ip(ip, "many_errors", log_entry) elif metrics["success"] > CONFIG["thresholds"]["success_requests"]: logging.warning(f"High success: IP={ip}, success={metrics['success']}") ban_ip(ip, "high_success", log_entry) elif metrics["failed_logins"] >= CONFIG["bruteforce"]["attempts_threshold"]: logging.warning( f"Bruteforce: IP={ip}, failed_logins={metrics['failed_logins']}" ) ban_ip(ip, "bruteforce", log_entry) elif analyze_sequence(ip): logging.warning(f"Suspicious sequence: IP={ip}") ban_ip(ip, "suspicious_sequence", log_entry) METRICS["processing_time"].observe(time.time() - start_time) except Exception as e: logging.error(f"Error analyzing log entry: {e}") def tail_log_file(log_file): current_inode = os.stat(log_file).st_ino with open(log_file, "r") as f: f.seek(0, 2) while True: line = f.readline() if not line: try: if os.stat(log_file).st_ino != current_inode: f.close() current_inode = os.stat(log_file).st_ino f = open(log_file, "r") f.seek(0, 2) except Exception as e: time.sleep(1) time.sleep(0.5) continue try: log_entry = json.loads(line) analyze_log_entry(log_entry) except json.JSONDecodeError: continue def tail_log(): threads = [] for log_file in CONFIG["log_files"]: t = threading.Thread(target=tail_log_file, args=(log_file,), daemon=True) t.start() threads.append(t) for t in threads: t.join() def ban_cleaner(): while True: time.sleep(3600) with lock: current_time = time.time() banned_ips = r.keys("banned:*") current_bans = [] for ip_key in banned_ips: data = r.hgetall(ip_key) if float(data.get(b"expires", 0)) < current_time: ip = data.get(b"ip", b"").decode() r.delete(ip_key) r.setex(f"unbanned:{ip}", 86400, 1) else: current_bans.append(data[b"ip"].decode()) with open(CONFIG["deny_file"], "w") as f: f.write("# Auto-generated deny list\n") for ip_raw in current_bans: ip = _sanitize_ip_token(ip_raw) if _valid_ip(ip): f.write(f"deny {ip};\n") if len(banned_ips) != len(current_bans): try: subprocess.run([bin_path, "-s", "reload"], check=True) except subprocess.CalledProcessError as e: app.logger.error(f"NGINX reload error: {e}") def export_prometheus_metrics(): while True: METRICS["active_bans"].set(len(r.keys("banned:*"))) time.sleep(15) def get_system_info(): try: mem = psutil.virtual_memory() disk = psutil.disk_usage("/") load = psutil.getloadavg() return { "system": { "os": platform.system(), "cpu_usage": psutil.cpu_percent(), "memory_used": f"{mem.used / (1024**3):.1f} GB", "memory_total": f"{mem.total / (1024**3):.1f} GB", "disk_used": f"{disk.used / (1024**3):.1f} GB", "disk_free": f"{disk.free / (1024**3):.1f} GB", "system_load": f"{load[0]:.2f} (1min), {load[1]:.2f} (5min), {load[2]:.2f} (15min)", }, "application": { "uptime": str(timedelta(seconds=time.time() - START_TIME)), "python_version": platform.python_version(), "total_requests": REQUEST_COUNT, "current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), }, } except Exception as e: return {"error": str(e)} def cleanup_old_stats(): while True: now = datetime.now() weeks_to_keep = CONFIG["stats_retention"]["week"] for i in range(weeks_to_keep, 53): week_key = f"{(now - timedelta(weeks=i)).year}:W{(now - timedelta(weeks=i)).isocalendar()[1]}" for stat_type in [ "requests", "blocked", "ips", "urls", "browsers", "countries", ]: r.delete(f"stats:{stat_type}:{week_key}") months_to_keep = CONFIG["stats_retention"]["month"] for i in range(months_to_keep, 13): month_key = f"{(now - timedelta(days=30*i)).year}:M{(now - timedelta(days=30*i)).month}" for stat_type in [ "requests", "blocked", "ips", "urls", "browsers", "countries", ]: r.delete(f"stats:{stat_type}:{month_key}") years_to_keep = CONFIG["stats_retention"]["year"] for i in range(years_to_keep, 10): year_key = f"{(now - timedelta(days=365*i)).year}" for stat_type in [ "requests", "blocked", "ips", "urls", "browsers", "countries", ]: r.delete(f"stats:{stat_type}:{year_key}") time.sleep(86400) def update_deny_file(): current_bans = [] for key in r.keys("banned:*"): ip_raw = r.hget(key, "ip").decode() ip = _sanitize_ip_token(ip_raw) if _valid_ip(ip): current_bans.append(ip) try: with open(CONFIG["deny_file"], "w") as f: f.write("# Auto-generated deny list\n") for ip in sorted(set(current_bans)): f.write(f"deny {ip};\n") subprocess.run([bin_path, "-t"], check=True) subprocess.run([bin_path, "-s", "reload"], check=True) except subprocess.CalledProcessError as e: app.logger.error(f"angie/nginx configuration error: {e}") raise RuntimeError("Failed to update firewall rules") from e def sync_existing_bans(): with open(CONFIG["deny_file"], "r") as f: for line in f: if line.startswith("deny"): raw = line.split()[1] ip = _sanitize_ip_token(raw) if not _valid_ip(ip): continue if not r.exists(f"banned:{ip}"): ban_data = { "ip": ip, "reason": "Legacy ban", "timestamp": datetime.now().isoformat(), "expires": time.time() + 3600 * 24 * 30, "hostname": "Legacy ban", } r.hmset(f"banned:{ip}", ban_data) r.expire(f"banned:{ip}", 3600 * 24 * 30) update_deny_file() def safe_thread(target, name, retry_delay=5): def wrapper(): while True: try: app.logger.info(f"Thread {name} started.") target() except Exception as e: app.logger.error(f"Thread {name} crashed: {e}") time.sleep(retry_delay) t = threading.Thread(target=wrapper, daemon=True) t.start() return t def _is_ip_in_cidrs(ip, cidrs): try: ip_obj = ipaddress.ip_address(ip) except ValueError: return False for c in cidrs: try: if ip_obj in ipaddress.ip_network(c, strict=False): return True except ValueError: if ip == c: return True return False def _client_ip(): remote = request.remote_addr or "" xff = (request.headers.get("X-Forwarded-For") or "").split(",")[0].strip() proxies = CONFIG.get("trusted_proxies", []) if _is_ip_in_cidrs(remote, proxies): return xff or remote return remote def is_trusted_client(): ip = _client_ip() for cidr in CONFIG.get("api_trusted_networks", []): try: if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr): return True except ValueError: continue return False def is_local_request(): ip = _client_ip() return ip in ("127.0.0.1", "::1") def require_api_auth(view_func): @wraps(view_func) def wrapper(*args, **kwargs): if is_trusted_client(): return view_func(*args, **kwargs) key = request.headers.get("X-API-Key") or request.args.get("api_key") if key not in set((CONFIG.get("api_keys") or {}).values()): return jsonify({"error": "Unauthorized"}), 401 return view_func(*args, **kwargs) return wrapper def _ban_record_minimal(h): ip = h.get(b"ip", b"").decode() reason = h.get(b"reason", b"").decode() or "unknown" exp = float(h.get(b"expires", b"0") or 0.0) return { "ip": ip, "reason": reason, "banned_until": datetime.fromtimestamp(exp).isoformat(), } def _safe_json_load(s, fallback=None): try: return json.loads(s) except Exception: return fallback def _ban_record_full(h): d = {k.decode(): v.decode() for k, v in h.items()} try: exp = float(d.get("expires", "0") or 0.0) except ValueError: exp = 0.0 return { "ban_id": d.get("ban_id"), "ip": d.get("ip"), "hostname": d.get("hostname"), "reason": d.get("reason"), "timestamp": d.get("timestamp"), "expires": exp, "banned_until": (datetime.fromtimestamp(exp).isoformat() if exp else None), "user_agent": d.get("user_agent"), "last_request": d.get("last_request"), "geo": _safe_json_load(d.get("geo") or "{}", {}), "attack_details": _safe_json_load(d.get("attack_details") or "{}", {}), # "geo_raw": d.get("geo"), # "attack_details_raw": d.get("attack_details"), } @app.route("/favicon.ico", methods=["GET"]) def favicon(): return Response(status=204) @app.route("/") @require_basic_auth def index(): global REQUEST_COUNT REQUEST_COUNT += 1 routes = [] for rule in app.url_map.iter_rules(): if rule.endpoint == "static": continue url = str(rule.rule) if not (url.startswith("/api/") or url == "/healthcheck"): continue methods = ",".join(sorted(rule.methods)) routes.append({"url": url, "methods": methods}) routes = sorted(routes, key=lambda r: r["url"]) sys_info = get_system_info() return render_template( "index.html", routes=routes, sys_info=sys_info, stats={"system": system_tiles()}, ) @app.route("/stats", methods=["GET"]) @require_basic_auth def stats_page(): stats = { "system": { "active_bans": METRICS["active_bans"]._value.get(), "total_bans": METRICS["total_bans"]._value.get(), "drupal_attacks": METRICS["drupal_attacks"]._value.get(), "memory_usage": f"{psutil.Process(os.getpid()).memory_info().rss/1024/1024:.2f} MB", "uptime": str(datetime.now() - start_time).split(".")[0], }, "geo_distribution": {}, "ban_reasons": {}, "banned_ips": [], } banned_ips = r.keys("banned:*") for ip_key in banned_ips: data = r.hgetall(ip_key) ip = data.get(b"ip", b"").decode() ban_entry = { "ban_id": data.get(b"ban_id", b"").decode(), "ip": ip, "hostname": data.get(b"hostname", b"").decode(), "banned_until": datetime.fromtimestamp(float(data[b"expires"])).isoformat(), "geo": json.loads(data.get(b"geo", b"{}")), "reason": data.get(b"reason", b"").decode(), "user_agent": data.get(b"user_agent", b"").decode(), "first_seen": data.get(b"timestamp", b"").decode(), } stats["banned_ips"].append(ban_entry) country = ban_entry["geo"].get("country", "Unknown") stats["geo_distribution"][country] = ( stats["geo_distribution"].get(country, 0) + 1 ) reason = ban_entry["reason"] stats["ban_reasons"][reason] = stats["ban_reasons"].get(reason, 0) + 1 return render_template("stats.html", stats=stats) @app.route("/list", methods=["GET", "POST"]) @require_basic_auth def ban_management(): message = None error = None if request.method == "POST" and "delete" in request.form: selected_ips = request.form.getlist("selected_ips") if not selected_ips: error = "Nie wybrano IP do usunięcia" else: with lock: for ip in selected_ips: r.delete(f"banned:{ip}") r.setex(f"unbanned:{ip}", 86400, 1) update_deny_file() message = f"Usunięto {len(selected_ips)} banów" if request.method == "POST" and "add_ban" in request.form: ip = request.form.get("ip", "").strip() reason = request.form.get("reason", "Manual ban").strip() duration = int(request.form.get("duration", 3600)) try: if r.exists(f"unbanned:{ip}"): error = f"IP {ip} jest tymczasowo chronione przed banem" else: ipaddress.ip_address(ip) if r.exists(f"banned:{ip}"): error = f"IP {ip} jest już zbanowane" else: ban_data = { "ip": ip, "reason": reason, "timestamp": datetime.now().isoformat(), "expires": time.time() + duration, "hostname": "Manual ban", } r.hmset(f"banned:{ip}", ban_data) r.expire(f"banned:{ip}", duration) update_deny_file() message = f"Dodano bana dla {ip}" except ValueError: error = "Niepoprawny format adresu IP" banned_ips = [] for key in r.keys("banned:*"): data = r.hgetall(key) banned_ips.append( { "ip": data.get(b"ip", b"").decode(), "reason": data.get(b"reason", b"").decode(), "expires": datetime.fromtimestamp(float(data[b"expires"])).strftime( "%Y-%m-%d %H:%M:%S" ), "hostname": data.get(b"hostname", b"").decode(), "payload": data.get(b"last_request", b"").decode(), } ) return render_template( "list.html", banned_ips=banned_ips, message=message, error=error ) @app.route("/reset", methods=["GET", "POST"]) @require_basic_auth def reset_counters(): message = None error = None if request.method == "POST": if "reset_ip" in request.form: ip = request.form.get("ip", "").strip() try: ipaddress.ip_address(ip) deleted = r.delete(f"metrics:ip:{ip}:errors") if deleted: message = f"Resetowano liczniki błędów dla IP {ip}" else: message = f"Nie znaleziono liczników błędów dla IP {ip}" except ValueError: error = "Niepoprawny format adresu IP" elif "reset_all_errors" in request.form: keys = r.keys("metrics:ip:*:errors") for key in keys: r.delete(key) message = f"Zresetowano wszystkie liczniki błędów ({len(keys)})" return render_template("reset.html", message=message, error=error) @app.route("/logs", methods=["GET"]) @require_basic_auth def view_logs(): selected_level = request.args.get("level", "INFO").upper() allowed_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] if selected_level not in allowed_levels: selected_level = "WARNING" current_level = logging.getLogger().getEffectiveLevel() level_name = logging.getLevelName(current_level) return render_template( "logs.html", selected_level=selected_level, app_log_level=level_name ) @app.route("/logs-data", methods=["GET"]) @require_basic_auth def logs_data(): level = request.args.get("level", "INFO").upper() query = request.args.get("query", "").lower() allowed_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] if level not in allowed_levels: level = "INFO" logs_path = os.path.join("logs", "app.log") logs = [] max_lines = 500 if os.path.exists(logs_path): with open(logs_path, "r") as f: lines = f.readlines()[-max_lines:] lines.reverse() for line in lines: if f"[{level}]" in line or any( allowed_levels.index(lvl) >= allowed_levels.index(level) and f"[{lvl}]" in line for lvl in allowed_levels ): if query and query not in line.lower(): continue logs.append(line.strip()) return jsonify({"logs": logs}) @app.route("/set-log-level", methods=["POST"]) @require_basic_auth def set_log_level(): level = request.form.get("level", "INFO").upper() allowed_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] if level not in allowed_levels: return jsonify({"status": "error", "message": "Nieprawidłowy poziom"}), 400 logging.getLogger().setLevel(getattr(logging, level)) return jsonify({"status": "ok", "message": f"Ustawiono poziom: {level}"}) @app.route("/check", methods=["GET", "POST"]) @require_basic_auth def check_ip_info(): message = None error = None endpoints = [] metrics = {} recent_errors = [] ip = "" is_banned = False ban_info = {} if request.method == "POST": ip = request.form.get("ip", "").strip() try: ipaddress.ip_address(ip) endpoints = [e.decode() for e in r.lrange(f"sequence:{ip}", 0, -1)] metrics = { "requests": int(r.get(f"metrics:ip:{ip}:requests") or 0), "errors": int(r.get(f"metrics:ip:{ip}:errors") or 0), "success": int(r.get(f"metrics:ip:{ip}:success") or 0), "failed_logins": int(r.get(f"metrics:ip:{ip}:failed_logins") or 0), "info_disclosure_attacks": int( r.get(f"metrics:ip:{ip}:info_disclosure_attacks") or 0 ), } if r.exists(f"banned:{ip}"): is_banned = True raw = r.hgetall(f"banned:{ip}") for k, v in raw.items(): ban_info[k.decode()] = v.decode() raw_requests = r.lrange(f"requests:{ip}", 0, 20) for req in raw_requests: try: decoded = req.decode() if any( code in decoded for code in ["403", "404", "500", "502", "503"] ): recent_errors.append(decoded) except Exception: continue if not endpoints and all(v == 0 for v in metrics.values()): if is_banned: reason = ( ban_info.get("reason") or ban_info.get("source") or "zbanowane (brak szczegółów)" ) message = f"IP {ip} jest zbanowane — powód: {reason}" else: message = f"Nie znaleziono żadnych zapisów dla IP {ip}" except ValueError: error = "Niepoprawny format adresu IP" return render_template( "check.html", ip=ip, endpoints=endpoints, metrics=metrics, recent_errors=recent_errors, message=message, error=error, is_banned=is_banned, ban_info=ban_info, ) @app.route("/backup-now", methods=["POST", "GET"]) @require_basic_auth def backup_now(): try: logger = app.logger except Exception: logger = None try: count = backup_redis_to_sqlite(r, logger) except Exception as e: if logger: logger.error(f"Backup failed: {e}") return jsonify({"ok": False, "error": str(e)}), 500 all_keys = [k.decode() for k in r.scan_iter("*")] keys_info = {} for k in all_keys: t = r.type(k).decode() ttl = r.ttl(k) try: if t == "string": v = r.get(k) v = v.decode() if v is not None else None elif t == "hash": raw = r.hgetall(k) v = {kk.decode(): vv.decode(errors="ignore") for kk, vv in raw.items()} elif t == "list": v = [vv.decode(errors="ignore") for vv in r.lrange(k, 0, -1)] elif t == "set": v = sorted([vv.decode(errors="ignore") for vv in r.smembers(k)]) elif t == "zset": v = [ (m.decode(errors="ignore"), float(s)) for m, s in r.zrange(k, 0, -1, withscores=True) ] else: v = None except Exception: v = None keys_info[k] = {"type": t, "ttl": ttl, "value": v} try: with closing(sqlite3.connect(CONFIG["sqlite_db"])) as conn: records_total = conn.execute("SELECT COUNT(*) FROM redis_dump").fetchone()[ 0 ] meta_rows = dict(conn.execute("SELECT k, v FROM meta").fetchall()) except Exception: records_total = None meta_rows = {} stats = { "ok": True, "timestamp": datetime.utcnow().isoformat(), "redis": { "keys_total": len(all_keys), }, "sqlite": { "path": CONFIG.get("sqlite_db"), "records_total": records_total, "backup_last_run": meta_rows.get("backup_last_run"), "backup_last_count": ( int(meta_rows["backup_last_count"]) if meta_rows.get("backup_last_count") else None ), "restore_last_run": meta_rows.get("restore_last_run"), "restore_last_count": ( int(meta_rows["restore_last_count"]) if meta_rows.get("restore_last_count") else None ), }, } try: stats["requests"] = { "total_since_start": REQUEST_TOTAL, "by_endpoint": dict(REQUEST_COUNTERS), "app_started_at": APP_START_TS, } except Exception: pass return jsonify({"stats": stats, "data": keys_info}), 200 @app.route("/charts", methods=["GET", "POST"]) @require_basic_auth def charts_page(): now = datetime.now() def _parse_top_n(val, default=50): try: n = int(val) return max(1, min(100, n)) except (TypeError, ValueError): return default top_n = _parse_top_n(request.values.get("top_n"), default=50) period = (request.values.get("period") or "month").lower() if period not in ("week", "month", "year"): period = "month" period_key_map = { "week": f"{now.year}:W{now.isocalendar()[1]}", "month": f"{now.year}:M{now.month}", "year": f"{now.year}", } period_key = period_key_map[period] stats = { "top_reasons": [ {"reason": reason.decode(), "count": int(score)} for reason, score in r.zrevrange( f"stats:blocked_reasons:{period_key}", 0, top_n - 1, withscores=True ) ], "top_urls": [ {"url": url.decode(), "count": int(score)} for url, score in r.zrevrange( f"stats:blocked_urls:{period_key}", 0, top_n - 1, withscores=True ) ], "top_countries": [ {"country": country.decode(), "count": int(score)} for country, score in r.zrevrange( f"stats:blocked_countries:{period_key}", 0, top_n - 1, withscores=True ) ], } weeks = [] bans_per_week = [] for i in range(5, -1, -1): dt = now - timedelta(weeks=i) wk_key = f"{dt.year}:W{dt.isocalendar()[1]}" total = int(r.hget(f"stats:blocked:{wk_key}", "total") or 0) weeks.append(f"{dt.isocalendar()[0]}-W{dt.isocalendar()[1]:02d}") bans_per_week.append(total) stats["weeks"] = weeks stats["bans_per_week"] = bans_per_week return render_template("charts.html", stats=stats, top_n=top_n, period=period) ######## API ######### @app.route("/api/banned/list", methods=["GET"]) @require_api_auth def api_banned_list(): """GET: lista banów; ?full_info=1 zwraca pełniejsze detale dla UI.""" want_full = str(request.args.get("full_info") or "0").lower() in ( "1", "true", "yes", "on", ) items = [] for key in r.keys("banned:*"): h = r.hgetall(key) try: items.append(_ban_record_full(h) if want_full else _ban_record_minimal(h)) except Exception: continue return jsonify({"count": len(items), "items": items}), 200 @app.route("/api/banned/", methods=["GET", "POST", "DELETE"]) @require_api_auth def api_banned_ip(ip): """GET: szczegóły (z ?full_info=1); POST: dodaj bana; DELETE: usuń bana. DELETE /api/banned/all — usuń WSZYSTKIE bany.""" if request.method == "DELETE" and ip.lower() == "all": with lock: banned_keys = r.keys("banned:*") removed_ips = [] for ip_key in banned_keys: try: ip_decoded = ip_key.decode().split(":")[1] except Exception: continue r.delete(ip_key) r.setex(f"unbanned:{ip_decoded}", 86400, 1) removed_ips.append(ip_decoded) try: with open(CONFIG["deny_file"], "w") as f: f.write("# Auto-generated deny list\n") subprocess.run([bin_path, "-t"], check=True) subprocess.run([bin_path, "-s", "reload"], check=True) except subprocess.CalledProcessError as e: app.logger.error(f"NGINX config error: {e}") return ( jsonify( {"error": "NGINX reload failed", "removed": len(removed_ips)} ), 500, ) return ( jsonify( { "status": "all_unbanned", "count": len(removed_ips), "ips": removed_ips, } ), 200, ) try: ipaddress.ip_address(ip) except ValueError: return jsonify({"error": "Invalid IP"}), 400 if request.method == "GET": if not r.exists(f"banned:{ip}"): return jsonify({"error": "IP not found"}), 404 h = r.hgetall(f"banned:{ip}") want_full = str(request.args.get("full_info") or "0").lower() in ( "1", "true", "yes", "on", ) return ( jsonify(_ban_record_full(h) if want_full else _ban_record_minimal(h)), 200, ) if request.method == "DELETE": with lock: if not r.exists(f"banned:{ip}"): return jsonify({"error": "IP not found"}), 404 r.delete(f"banned:{ip}") r.setex(f"unbanned:{ip}", 86400, 1) try: update_deny_file() except Exception as e: app.logger.error(f"Update deny failed: {e}") return jsonify({"status": "unbanned", "ip": ip}), 200 # POST (dodanie bana) payload = request.get_json(force=True, silent=True) or {} reason = (payload.get("reason") or "Manual ban").strip() default_duration = int(CONFIG["thresholds"].get("ban_duration", 3600)) try: duration = int(payload.get("duration", default_duration)) duration = max(60, min(duration, 3600 * 24 * 365)) except (TypeError, ValueError): duration = default_duration with lock: if r.exists(f"unbanned:{ip}"): return jsonify({"error": "IP is temporarily protected after unban"}), 409 if r.exists(f"banned:{ip}"): return jsonify({"error": "IP already banned"}), 409 ban_data = { "ip": ip, "reason": reason, "timestamp": datetime.now().isoformat(), "expires": time.time() + duration, "hostname": "Manual ban (API)", } r.hmset(f"banned:{ip}", ban_data) r.expire(f"banned:{ip}", duration) try: update_deny_file() except Exception as e: app.logger.error(f"angie/nginx update failed after API ban: {e}") want_full = str(request.args.get("full_info") or "0").lower() in ( "1", "true", "yes", "on", ) h = r.hgetall(f"banned:{ip}") body = _ban_record_full(h) if want_full else _ban_record_minimal(h) body.update({"status": "banned"}) return jsonify(body), 201 @app.route("/healthcheck") def healthcheck(): try: checks = {"app_responsive": True, "status": "OK"} with app.test_client() as client: user = CONFIG["basic_auth"]["username"] password = CONFIG["basic_auth"]["password"] credentials = b64encode(f"{user}:{password}".encode()).decode() headers = {"Authorization": f"Basic {credentials}"} response = client.get("/", headers=headers) checks["app_responsive"] = (response.status_code == 200) if not all(checks.values()): checks["status"] = "CRITICAL" return jsonify(checks), 503 return jsonify(checks), 200 except Exception as e: return jsonify({"status": "CRITICAL", "error": str(e)}), 503 ###################################### if __name__ == "__main__": sync_existing_bans() start_http_server(CONFIG["prometheus_port"]) threads = [ safe_thread(tail_log, "tail_log"), safe_thread(ban_cleaner, "ban_cleaner"), safe_thread(export_prometheus_metrics, "export_prometheus"), safe_thread(cleanup_old_stats, "cleanup_stats"), safe_thread(redis_sqlite_sync_daemon, "redis_sqlite_sync"), ] app.run(host="0.0.0.0", port=CONFIG["api_port"], use_reloader=False)