Files
autoban/app.py
2026-01-01 02:13:34 +01:00

1824 lines
60 KiB
Python

import os
import re
import json
import uuid
import time
import socket
import ipaddress
import requests
import threading
import subprocess
import redis
import geoip2.database
import psutil
import platform
import json
import logging
import ipaddress
import re
#import sqlite3
from prometheus_client import start_http_server, Counter, Gauge, Histogram
from ua_parser import user_agent_parser
from markupsafe import escape
from functools import wraps
from base64 import b64encode
from datetime import datetime, timedelta
from collections import defaultdict
from flask import Flask, jsonify, request, render_template, Response, flash, redirect, url_for
from contextlib import closing
from config import CONFIG, SKIP_BACKUP_PREFIXES
# Create the log directory up front. exist_ok=True replaces the original
# exists()/makedirs() pair, which could fail when two processes started at the
# same time and both saw the directory as missing.
os.makedirs("logs", exist_ok=True)

DEFAULT_LOG_LEVEL = "WARNING"
logging.basicConfig(
    filename=os.path.join("logs", "app.log"),
    level=getattr(logging, DEFAULT_LOG_LEVEL),
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# Set the root level explicitly as well, in addition to basicConfig().
logging.getLogger().setLevel(getattr(logging, DEFAULT_LOG_LEVEL))

# Flask application object used by every route below.
app = Flask(__name__)

# Shared Redis client; connection settings come from CONFIG.
r = redis.Redis(
    host=CONFIG["redis_host"], port=CONFIG["redis_port"], db=CONFIG["redis_db"]
)

# GeoIP database reader used by get_geo_info().
geo_reader = geoip2.database.Reader(CONFIG["geoip_db"])

# Web server binary invoked for config tests and reloads.
bin_path = CONFIG.get("webserver_bin", "/usr/sbin/nginx")

# Process-wide lock serialising ban/unban operations.
lock = threading.Lock()

# Start-up timestamps (datetime and epoch flavours) used for uptime reporting.
start_time = datetime.now()
START_TIME = time.time()

# Number of times the index page has been served.
REQUEST_COUNT = 0
# Prometheus metrics exported by the service, keyed by a short internal name.
METRICS = {
    "total_bans": Counter("autoban_total_bans", "Total banned IPs"),
    "active_bans": Gauge("autoban_active_bans", "Currently banned IPs"),
    "requests_total": Counter(
        "autoban_requests_total", "Total processed requests"
    ),
    "bruteforce_attempts": Counter(
        "autoban_bruteforce_total", "Bruteforce attempts detected"
    ),
    "drupal_attacks": Counter(
        "autoban_drupal_attacks", "Drupal-specific attacks detected"
    ),
    "sqli_attacks": Counter("autoban_sqli_attacks", "SQL Injection attempts"),
    "xss_attacks": Counter("autoban_xss_attacks", "XSS attempts"),
    "rce_attacks": Counter("autoban_rce_attacks", "RCE attempts"),
    # analyze_log_entry() increments this key; it was missing from the
    # registry, so the increment always failed with a swallowed KeyError.
    "info_disclosure_attacks": Counter(
        "autoban_info_disclosure_attacks",
        "Information disclosure attempts",
    ),
    "whitelisted_requests": Counter(
        "autoban_whitelisted_total", "Whitelisted requests"
    ),
    "processing_time": Histogram(
        "autoban_processing_time", "Request processing time"
    ),
    "ban_duration": Histogram(
        "autoban_ban_duration", "Duration of bans in seconds"
    ),
    "request_size": Histogram(
        "autoban_request_size", "Size of requests in bytes"
    ),
    "response_time": Histogram(
        "autoban_response_time", "Response time in seconds"
    ),
}
def start_background_threads():
    """Sync existing bans, expose Prometheus metrics and start the workers.

    Every worker runs in its own daemon thread.
    """
    sync_existing_bans()
    start_http_server(CONFIG["prometheus_port"])

    # redis_sqlite_sync_daemon is deliberately not listed: it is disabled.
    worker_targets = (
        tail_log,
        ban_cleaner,
        export_prometheus_metrics,
        cleanup_old_stats,
    )
    workers = [
        threading.Thread(target=worker, daemon=True) for worker in worker_targets
    ]
    for worker_thread in workers:
        worker_thread.start()
class NotificationBatcher:
    """Deliver ban notifications to Pushover, one by one or grouped in batches.

    With batching disabled every ban is pushed immediately. With batching
    enabled, bans are grouped by fixed time windows of ``batch_window``
    seconds; a batch is sent when it reaches ``threshold`` entries or when a
    background timer for the window fires, whichever comes first.
    """

    # Defaults for every batching option. A partial "notification_batching"
    # section is merged over these, so a missing option no longer raises
    # KeyError the first time it is read.
    _DEFAULT_BATCHING = {
        "enabled": False,
        "batch_window": 300,
        "threshold": 10,
        "max_window": 900,
        "summary_limit": 5,
        "include_details": True,
    }

    def __init__(self, config):
        """Read the batching and Pushover sections from *config*.

        Raises:
            KeyError: if *config* has no ``"pushover"`` section.
        """
        overrides = config.get("notification_batching") or {}
        self.config = {**self._DEFAULT_BATCHING, **overrides}
        self.pushover_config = config["pushover"]
        self.batch_data = defaultdict(list)
        self.batch_timers = {}
        # Re-entrant so helpers that take the lock themselves stay safe even
        # when invoked by a caller that already holds it.
        self.lock = threading.RLock()

    def add_ban(self, ban_info):
        """Queue *ban_info* for the current window, or push it immediately."""
        if not self.config["enabled"]:
            self._send_immediate_notification(ban_info)
            return
        batch_key = self._get_batch_key(time.time())
        ready = []
        with self.lock:
            self.batch_data[batch_key].append(ban_info)
            if len(self.batch_data[batch_key]) >= self.config["threshold"]:
                ready = self._pop_batch(batch_key)
            elif batch_key not in self.batch_timers:
                self._set_batch_timer(batch_key)
        # The HTTP request happens outside the lock so that a slow Pushover
        # call cannot stall other threads reporting bans.
        if ready:
            self._deliver_batch(ready)

    def _get_batch_key(self, timestamp):
        """Return the key of the batch window that contains *timestamp*."""
        window = self.config["batch_window"]
        window_start = int(timestamp // window) * window
        return f"batch_{window_start}"

    def _set_batch_timer(self, batch_key):
        """Start a daemon thread that flushes *batch_key* after one window."""

        def send_after_timeout():
            time.sleep(self.config["batch_window"])
            self._send_batch_notification(batch_key)

        timer = threading.Thread(target=send_after_timeout, daemon=True)
        self.batch_timers[batch_key] = timer
        timer.start()

    def _pop_batch(self, batch_key):
        """Remove and return the bans queued under *batch_key*.

        The caller must hold ``self.lock``. Returns an empty list when the
        batch does not exist (for example, it was already flushed).
        """
        self.batch_timers.pop(batch_key, None)
        return self.batch_data.pop(batch_key, None) or []

    def _send_batch_notification(self, batch_key):
        """Flush the batch stored under *batch_key*, if it still has entries."""
        with self.lock:
            ban_list = self._pop_batch(batch_key)
        if ban_list:
            self._deliver_batch(ban_list)

    def _deliver_batch(self, ban_list):
        """Format *ban_list* as a summary and push it."""
        message = self._format_batch_message(len(ban_list), ban_list)
        self._send_pushover_notification(
            message, title="Autoban Security: Batch Ban Report"
        )

    def _format_batch_message(self, total_bans, ban_list):
        """Build the text summary for a batch of bans.

        The summary lists the most frequent IPs (up to ``summary_limit``) with
        their latest request, followed by the top three reasons, attack types
        and countries.
        """
        current_time = datetime.now().strftime("%H:%M:%S")
        message_parts = [
            f"🚨 Autoban: Batch Ban Report - {current_time}",
            f"📊 Łącznie zbanowanych: {total_bans} IP",
        ]
        ip_counts = defaultdict(int)
        attack_types = defaultdict(int)
        countries = defaultdict(int)
        reasons = defaultdict(int)
        latest_by_ip = {}
        for ban in ban_list:
            ip = ban["ip"]
            ip_counts[ip] += 1
            for attack_type in ban.get("attack_types") or []:
                attack_types[attack_type] += 1
            if ban.get("location") and ban["location"] != "Unknown":
                countries[ban["location"]] += 1
            reasons[ban["reason"]] += 1
            # Track the most recent ban per IP in the same pass instead of
            # rescanning the whole list for every reported IP.
            previous = latest_by_ip.get(ip)
            if previous is None or ban["timestamp"] > previous["timestamp"]:
                latest_by_ip[ip] = ban

        def top(counter, limit):
            # sorted() is stable, so ties keep first-seen order.
            ranked = sorted(counter.items(), key=lambda item: item[1], reverse=True)
            return ranked[:limit]

        top_ips = top(ip_counts, self.config["summary_limit"])
        if top_ips:
            message_parts.append("\n🎯 Najczęściej atakujące IP:")
            for ip, count in top_ips:
                latest_ban = latest_by_ip.get(ip)
                location_info = ""
                if latest_ban and latest_ban.get("location"):
                    if latest_ban.get("city"):
                        location_info = (
                            f" - {latest_ban['city']}, {latest_ban['location']}"
                        )
                    else:
                        location_info = f" - {latest_ban['location']}"
                if count > 1:
                    message_parts.append(f"{ip} ({count}x){location_info}")
                else:
                    message_parts.append(f"{ip}{location_info}")
                if latest_ban and latest_ban.get("request_uri"):
                    uri_preview = latest_ban["request_uri"][:50]
                    if len(latest_ban["request_uri"]) > 50:
                        uri_preview += "..."
                    method = latest_ban.get("request_method", "GET")
                    message_parts.append(f" Ostatni: {method} {uri_preview}")
        if reasons:
            message_parts.append("\n⚡ Przyczyny banów:")
            for reason, count in top(reasons, 3):
                message_parts.append(f"{reason}: {count}")
        if attack_types:
            message_parts.append("\n🛡️ Wykryte ataki:")
            for attack_type, count in top(attack_types, 3):
                message_parts.append(f"{attack_type}: {count}")
        if countries:
            message_parts.append("\n🌍 Kraje pochodzenia:")
            for country, count in top(countries, 3):
                message_parts.append(f"{country}: {count}")
        return "\n".join(message_parts)

    def _send_pushover_notification(self, message, title="autoban Security Alert"):
        """POST *message* to Pushover; failures are logged, never raised."""
        if not self.pushover_config["enabled"]:
            return
        try:
            data = {
                "token": self.pushover_config["token"],
                "user": self.pushover_config["user_key"],
                "message": message,
                "title": title,
                "priority": 1,
            }
            response = requests.post(
                "https://api.pushover.net/1/messages.json", data=data, timeout=10
            )
            # Outcomes go to the application log (like the rest of the module)
            # instead of stdout.
            if response.status_code == 200:
                logging.info(
                    f"Batch notification sent successfully at {datetime.now()}"
                )
            else:
                logging.error(
                    f"Failed to send batch notification: {response.status_code}"
                )
        except Exception as e:
            logging.error(f"Error sending batch notification: {e}")

    def _send_immediate_notification(self, ban_info):
        """Push a single-ban alert built from *ban_info*."""
        location_info = ""
        if ban_info.get("city") and ban_info.get("location"):
            location_info = f" ({ban_info['city']}, {ban_info['location']})"
        elif ban_info.get("location"):
            location_info = f" ({ban_info['location']})"
        message_parts = [
            "🚨 autoban: IP banned",
            f"IP: {ban_info['ip']}{location_info}",
            f"Hostname: {ban_info.get('hostname', 'Unknown')}",
            f"Reason: {ban_info['reason']}",
        ]
        if ban_info.get("request_method") and ban_info.get("request_uri"):
            uri_preview = ban_info["request_uri"][:100]
            if len(ban_info["request_uri"]) > 100:
                uri_preview += "..."
            message_parts.append(f"Request: {ban_info['request_method']} {uri_preview}")
        if ban_info.get("user_agent"):
            ua_preview = ban_info["user_agent"][:80]
            if len(ban_info["user_agent"]) > 80:
                ua_preview += "..."
            message_parts.append(f"User-Agent: {ua_preview}")
        message = "\n".join(message_parts)
        self._send_pushover_notification(message, title="autoban Security Alert")
# Module-wide batcher instance; ban_ip() hands every new ban to it.
notification_batcher = NotificationBatcher(CONFIG)
def sqlite_init():
    """Create the SQLite tables used for the Redis backup if they are missing.

    ``redis_dump`` holds one row per backed-up key; ``meta`` holds key/value
    bookkeeping about backup and restore runs.
    """
    # Imported locally because the module-level ``import sqlite3`` is
    # commented out; without it this function raised NameError when called.
    import sqlite3

    with closing(sqlite3.connect(CONFIG["sqlite_db"])) as conn, conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS redis_dump (
                key TEXT PRIMARY KEY,
                type TEXT NOT NULL,
                ttl INTEGER,
                value TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS meta (
                k TEXT PRIMARY KEY,
                v TEXT NOT NULL
            )
            """
        )
def _redis_key_to_record(r, key: bytes):
k = key.decode()
t = r.type(k).decode()
ttl = r.ttl(k)
if t == "string":
val = r.get(k)
payload = val.decode(errors="ignore") if val is not None else ""
elif t == "hash":
raw = r.hgetall(k)
payload = {kk.decode(): vv.decode(errors="ignore") for kk, vv in raw.items()}
payload = json.dumps(payload, ensure_ascii=False)
elif t == "list":
arr = [v.decode(errors="ignore") for v in r.lrange(k, 0, -1)]
payload = json.dumps(arr, ensure_ascii=False)
elif t == "set":
arr = [v.decode(errors="ignore") for v in r.smembers(k)]
payload = json.dumps(sorted(arr), ensure_ascii=False)
elif t == "zset":
arr = [
(m.decode(errors="ignore"), float(s))
for m, s in r.zrange(k, 0, -1, withscores=True)
]
payload = json.dumps(arr, ensure_ascii=False)
else:
return None
ttl = ttl if isinstance(ttl, int) and ttl >= 0 else None
value = payload
return (t, ttl, value)
def backup_redis_to_sqlite(r, app_logger=None):
    """Copy non-expiring Redis keys into the SQLite ``redis_dump`` table.

    Keys whose name starts with one of ``SKIP_BACKUP_PREFIXES`` and keys that
    currently have a TTL are skipped. Existing rows are updated in place.

    Args:
        r: Redis client to read from.
        app_logger: optional logger that receives a summary line.

    Returns:
        The number of keys written.
    """
    # Imported locally because the module-level ``import sqlite3`` is
    # commented out; without it this function raised NameError when called.
    import sqlite3

    sqlite_init()
    now = datetime.utcnow().isoformat()
    backed_up = 0
    with closing(sqlite3.connect(CONFIG["sqlite_db"])) as conn, conn:
        for key in r.scan_iter("*"):
            k = key.decode() if isinstance(key, (bytes, bytearray)) else str(key)
            if k.startswith(SKIP_BACKUP_PREFIXES):
                continue
            # Only keys without an expiry are persisted.
            ttl_now = r.ttl(key)
            if ttl_now is not None and ttl_now > -1:
                continue
            rec = _redis_key_to_record(r, key)
            if rec is None:
                continue
            t, ttl, value = rec
            conn.execute(
                """
                INSERT INTO redis_dump(key, type, ttl, value, updated_at)
                VALUES(?,?,?,?,?)
                ON CONFLICT(key) DO UPDATE SET
                    type=excluded.type,
                    ttl=excluded.ttl,
                    value=excluded.value,
                    updated_at=excluded.updated_at
                """,
                # A missing TTL is stored as -1.
                (k, t, ttl if ttl is not None else -1, value, now),
            )
            backed_up += 1
        conn.execute(
            "INSERT OR REPLACE INTO meta(k, v) VALUES('backup_last_run', ?)", (now,)
        )
        conn.execute(
            "INSERT OR REPLACE INTO meta(k, v) VALUES('backup_last_count', ?)",
            (str(backed_up),),
        )
    if app_logger:
        app_logger.info(f"Redis -> SQLite backup done: {backed_up} keys.")
    return backed_up
def _sanitize_ip_token(token: str) -> str:
token = re.sub(r";+\s*$", "", token.strip())
token = token.split("/")[0].strip()
return token
def _valid_ip(ip: str) -> bool:
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
def check_basic_auth(username, password):
    """Return True when *username*/*password* match ``CONFIG["basic_auth"]``.

    Authentication fails closed: if either configured or supplied value is
    not a string (for example the section is missing, or the request carried
    no username/password) the result is False. Previously two missing values
    compared equal (``None == None``) and were accepted.
    """
    import hmac

    expected = CONFIG.get("basic_auth") or {}
    wanted_user = expected.get("username")
    wanted_password = expected.get("password")
    values = (username, password, wanted_user, wanted_password)
    if not all(isinstance(value, str) for value in values):
        return False
    # compare_digest avoids leaking how many leading characters matched.
    # Both comparisons always run so timing does not reveal which one failed.
    user_ok = hmac.compare_digest(
        username.encode("utf-8"), wanted_user.encode("utf-8")
    )
    password_ok = hmac.compare_digest(
        password.encode("utf-8"), wanted_password.encode("utf-8")
    )
    return user_ok and password_ok
def require_basic_auth(f):
    """Decorator: serve the view only to requests with valid Basic credentials.

    Any other request receives a 401 response carrying a ``WWW-Authenticate``
    challenge.
    """

    @wraps(f)
    def decorated(*args, **kwargs):
        credentials = request.authorization
        if credentials and check_basic_auth(
            credentials.username, credentials.password
        ):
            return f(*args, **kwargs)
        challenge = {"WWW-Authenticate": 'Basic realm="Login Required"'}
        return Response("Unauthorized", 401, challenge)

    return decorated
@app.after_request
def add_header(response):
    """Mark responses for ``/static/`` paths as publicly cacheable for 31 days."""
    if not request.path.startswith("/static/"):
        return response
    one_month = timedelta(days=31)
    response.cache_control.public = True
    response.cache_control.max_age = int(one_month.total_seconds())
    response.cache_control.immutable = True
    return response
def system_tiles():
    """Collect the headline numbers shown as dashboard tiles.

    Returns a dict with the current metric values, this process's resident
    memory in MB and the uptime since module start.
    """

    def current(metric_name):
        # Reads the metric's current value through its internal holder.
        return METRICS[metric_name]._value.get()

    active = current("active_bans")
    total = current("total_bans")
    drupal = current("drupal_attacks")
    resident_bytes = psutil.Process(os.getpid()).memory_info().rss
    elapsed = datetime.now() - start_time
    return {
        "active_bans": active,
        "total_bans": total,
        "drupal_attacks": drupal,
        "memory_usage": f"{resident_bytes/1024/1024:.2f} MB",
        "uptime": str(elapsed).split(".")[0],
    }
def restore_missing_from_sqlite(r, app_logger=None):
    """Recreate in Redis every backed-up key that is currently absent.

    Keys that already exist in Redis are left untouched. Rows that cannot be
    restored are skipped (and reported to *app_logger* when one is given).

    Args:
        r: Redis client to write to.
        app_logger: optional logger for progress and skipped rows.

    Returns:
        The number of keys queued for restoration.
    """
    # Imported locally because the module-level ``import sqlite3`` is
    # commented out; without it this function raised NameError when called.
    import sqlite3

    sqlite_init()
    with closing(sqlite3.connect(CONFIG["sqlite_db"])) as conn:
        rows = list(conn.execute("SELECT key, type, ttl, value FROM redis_dump"))
    pipe = r.pipeline()
    restored = 0
    for k, t, ttl, value in rows:
        try:
            if r.exists(k):
                continue
            if t == "string":
                pipe.set(k, value)
            elif t == "hash":
                # NOTE(review): hmset() is deprecated in newer redis-py
                # releases; kept because the rest of the file relies on it.
                pipe.hmset(k, json.loads(value))
            elif t == "list":
                arr = json.loads(value)
                if arr:
                    pipe.rpush(k, *arr)
            elif t == "set":
                arr = json.loads(value)
                if arr:
                    pipe.sadd(k, *arr)
            elif t == "zset":
                arr = json.loads(value)
                if arr:
                    pipe.zadd(k, {m: float(s) for m, s in arr})
            else:
                continue
            if isinstance(ttl, int) and ttl > 0:
                pipe.expire(k, ttl)
            restored += 1
        except Exception as e:
            # Best effort: one bad row must not abort the whole restore, but
            # it should leave a trace instead of disappearing silently.
            if app_logger:
                app_logger.warning(f"SQLite restore skipped key {k}: {e}")
            continue
    pipe.execute()
    with closing(sqlite3.connect(CONFIG["sqlite_db"])) as conn, conn:
        conn.execute(
            "INSERT OR REPLACE INTO meta(k, v) VALUES('restore_last_count', ?)",
            (str(restored),),
        )
        conn.execute(
            "INSERT OR REPLACE INTO meta(k, v) VALUES('restore_last_run', ?)",
            (datetime.utcnow().isoformat(),),
        )
    if app_logger:
        app_logger.info(
            f"SQLite restore completed: restored {restored} keys not present in Redis."
        )
    return restored
def redis_sqlite_sync_daemon():
    """Restore missing keys from SQLite once, then back Redis up every 24 hours.

    Backup failures are logged and the loop keeps running. A failure during
    the initial restore ends the daemon after a best-effort log entry.
    """
    one_day = 24 * 3600
    try:
        logger = app.logger if "app" in globals() else None
        restore_missing_from_sqlite(r, logger)
        while True:
            try:
                backup_redis_to_sqlite(r, logger)
            except Exception as backup_error:
                if logger:
                    logger.error(f"Redis -> SQLite backup failed: {backup_error}")
            time.sleep(one_day)
    except Exception as init_error:
        # Logging itself must never raise out of the daemon thread.
        try:
            app.logger.error(f"sync daemon init error: {init_error}")
        except Exception:
            pass
def detect_attack_types(request_uri):
    """List every configured attack type with a pattern matching *request_uri*.

    Matching is case-insensitive. Patterns that are not valid regular
    expressions are skipped. Each attack type appears at most once.
    """
    matched_types = []
    for type_name, regexes in CONFIG["attack_patterns"].items():
        for regex in regexes:
            try:
                hit = re.search(regex, request_uri, re.IGNORECASE)
            except re.error:
                continue
            if hit:
                matched_types.append(type_name)
                break
    return matched_types
def is_endpoint_whitelisted(uri):
    """Tell whether *uri* begins with one of the whitelisted endpoint prefixes."""
    for prefix in CONFIG.get("whitelist_endpoints", []):
        if uri.startswith(prefix):
            return True
    return False
def get_geo_info(ip):
    """Look *ip* up in the GeoIP database.

    Returns a dict with ``country``, ``city``, ``latitude`` and ``longitude``,
    or an empty dict when the lookup fails (the failure is logged).
    """
    try:
        record = geo_reader.city(ip)
        country_name = record.country.name
        city_name = record.city.name
        position = record.location
        return {
            "country": country_name,
            "city": city_name,
            "latitude": position.latitude,
            "longitude": position.longitude,
        }
    except Exception as lookup_error:
        app.logger.error(f"GeoIP error for {ip}: {lookup_error}")
        return {}
def is_whitelisted(ip, user_agent):
    """Tell whether a request is exempt from analysis.

    A request is whitelisted when *ip* falls inside one of the configured
    ``whitelist.ip_ranges`` or *user_agent* matches one of the configured
    ``whitelist.user_agents`` patterns (case-insensitive). Each positive
    answer increments the ``whitelisted_requests`` metric.
    """
    whitelist = CONFIG["whitelist"]

    # Parse the address once instead of once per range.
    try:
        address = ipaddress.ip_address(ip)
    except ValueError as e:
        app.logger.error(f"IP validation error: {e}")
        address = None

    if address is not None:
        for ip_range in whitelist["ip_ranges"]:
            # A malformed range is reported and skipped; previously the first
            # bad entry stopped every later range from being checked.
            # strict=False matches _is_ip_in_cidrs(), so "10.0.0.5/24" is
            # read as the 10.0.0.0/24 network.
            try:
                network = ipaddress.ip_network(ip_range, strict=False)
            except ValueError as e:
                app.logger.error(f"IP validation error: {e}")
                continue
            if address in network:
                METRICS["whitelisted_requests"].inc()
                return True

    # A log entry may carry a null user agent; treat it as empty.
    agent = user_agent or ""
    if any(
        re.search(pattern, agent, re.I)
        for pattern in whitelist["user_agents"]
    ):
        METRICS["whitelisted_requests"].inc()
        return True
    return False
def send_pushover_notification(message):
    """Send *message* to Pushover when notifications are enabled.

    Errors are logged and never propagated to the caller.
    """
    if not CONFIG["pushover"]["enabled"]:
        return
    try:
        requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": CONFIG["pushover"]["token"],
                "user": CONFIG["pushover"]["user_key"],
                "message": message,
                "title": "autoban Alert",
                "priority": 1,
            },
            # Without a timeout a stalled connection would block the calling
            # thread indefinitely; 10 s matches NotificationBatcher.
            timeout=10,
        )
    except Exception as e:
        app.logger.error(f"Pushover error: {e}")
def detect_attack(log_entry):
    """Return the first configured attack type matching the request, or None.

    The method, URI and user agent of *log_entry* are joined into one
    lower-cased string and tested against every pattern in
    ``CONFIG["attack_patterns"]``. For the drupal, sqli, xss and rce types the
    corresponding Prometheus counter is incremented on a match.
    """
    uri = log_entry.get("request_uri", "")
    method = log_entry.get("request_method", "")
    full_request = f"{method} {uri} {log_entry.get('http_user_agent', '')}".lower()
    counted_types = ("drupal", "sqli", "xss", "rce")
    for attack_type, patterns in CONFIG["attack_patterns"].items():
        for pattern in patterns:
            # An invalid pattern is skipped, as in detect_attack_types();
            # previously it raised and aborted analysis of the whole entry.
            try:
                matched = re.search(pattern, full_request, re.IGNORECASE)
            except re.error as e:
                logging.error(f"Invalid attack pattern {pattern!r}: {e}")
                continue
            if matched:
                if attack_type in counted_types:
                    METRICS[f"{attack_type}_attacks"].inc()
                return attack_type
    return None
def analyze_sequence(ip):
    """Classify the recent request sequence recorded for *ip*.

    Returns ``"bruteforce"`` when enough recorded URLs contain a login URL,
    ``"suspicious_sequence"`` when the summed score of matching configured
    patterns reaches the threshold, and ``None`` otherwise.
    """
    # NOTE(review): analyze_log_entry() stores URIs with LPUSH, so this list
    # is newest-first; confirm the configured patterns expect that order.
    recent = [item.decode() for item in r.lrange(f"sequence:{ip}", 0, -1)]

    login_urls = CONFIG["bruteforce"]["login_urls"]
    login_hits = 0
    for visited in recent:
        if any(fragment in visited for fragment in login_urls):
            login_hits += 1
    if login_hits >= CONFIG["bruteforce"]["attempts_threshold"]:
        METRICS["bruteforce_attempts"].inc()
        return "bruteforce"

    score = 0
    for rule in CONFIG["sequence"]["suspicious_patterns"]:
        wanted = rule["pattern"]
        weight = rule["score"]
        span = len(wanted)
        # Slide a window of the pattern's length over the sequence; the range
        # is empty when the sequence is shorter than the pattern.
        for start in range(len(recent) - span + 1):
            if recent[start : start + span] == wanted:
                score += weight

    if score >= CONFIG["sequence"]["threshold"]:
        return "suspicious_sequence"
    return None
def _deny_file_ips():
    """Return the set of valid IP addresses currently listed in the deny file."""
    listed = set()
    with open(CONFIG["deny_file"], "r") as f:
        for line in f:
            if not line.startswith("deny"):
                continue
            fields = line.split()
            if len(fields) < 2:
                # A bare "deny" line carries no address.
                continue
            listed_ip = _sanitize_ip_token(fields[1])
            if _valid_ip(listed_ip):
                listed.add(listed_ip)
    return listed


def ban_ip(ip, reason, log_entry):
    """Ban *ip*: record it in Redis, add it to the deny file and reload.

    Nothing happens when the IP was recently unbanned or is already banned.
    If the web server rejects the configuration, the Redis record is removed
    again and no notification is sent.

    Args:
        ip: address to ban.
        reason: short reason code; ``"large_request"`` is expanded with the
            request size.
        log_entry: parsed access-log entry that triggered the ban.
    """
    if r.exists(f"unbanned:{ip}"):
        logging.info(f"Skipping ban (recently unbanned): IP={ip}")
        return

    # Read defensively, as analyze_log_entry() does; a missing URI used to
    # raise KeyError and silently cancel the ban.
    request_uri = log_entry.get("request_uri") or ""

    with lock:
        if r.exists(f"banned:{ip}"):
            logging.info(f"Already banned: IP={ip}")
            return
        try:
            hostname = socket.gethostbyaddr(ip)[0]
        except OSError:
            # socket.herror / socket.gaierror are OSError subclasses; other
            # resolver failures (e.g. timeouts) fall back to the IP as well.
            hostname = ip
        ban_id = str(uuid.uuid4())
        geo = get_geo_info(ip)
        ban_duration = CONFIG["thresholds"]["ban_duration"]
        expires = time.time() + ban_duration
        request_size = int(log_entry.get("bytes_sent", 0))
        response_time = float(log_entry.get("request_time", 0))
        ban_data = {
            "ban_id": ban_id,
            "ip": ip,
            "hostname": hostname,
            "timestamp": datetime.now().isoformat(),
            "reason": (
                f"large_request ({request_size} bytes)"
                if reason == "large_request"
                else reason
            ),
            "user_agent": log_entry.get("http_user_agent", ""),
            "last_request": request_uri,
            "geo": json.dumps(geo),
            "expires": expires,
            "attack_details": json.dumps(
                {
                    "method": log_entry.get("request_method"),
                    "payload": request_uri[:500],
                    "status": log_entry.get("status"),
                    "size": request_size,
                    "response_time": response_time,
                }
            ),
        }
        r.hmset(f"banned:{ip}", ban_data)
        r.expire(f"banned:{ip}", ban_duration)

        # The original loop reused the name ``ip`` for every address parsed
        # from the file, overwriting the IP being banned. The membership test,
        # the appended deny line, the rollback and the notification then all
        # referred to the last address in the file instead.
        if ip not in _deny_file_ips():
            with open(CONFIG["deny_file"], "a") as f:
                f.write(f"deny {ip};\n")
            try:
                subprocess.run([bin_path, "-t"], check=True)
                subprocess.run([bin_path, "-s", "reload"], check=True)
            except subprocess.CalledProcessError as e:
                logging.error(f"Angie config error: {e}")
                r.delete(f"banned:{ip}")
                return

    METRICS["total_bans"].inc()
    METRICS["ban_duration"].observe(ban_duration)
    METRICS["request_size"].observe(request_size)
    METRICS["response_time"].observe(response_time)
    notification_data = {
        "ip": ip,
        "hostname": hostname,
        "reason": ban_data["reason"],
        "location": geo.get("country", "Unknown"),
        "city": geo.get("city", ""),
        "request_method": log_entry.get("request_method", ""),
        "request_uri": request_uri,
        "user_agent": log_entry.get("http_user_agent", ""),
        "request_size": request_size,
        "status": log_entry.get("status", ""),
        "response_time": response_time,
        "timestamp": time.time(),
        "attack_types": detect_attack_types(request_uri),
    }
    logging.info(f"Banning IP: {ip}, reason: {reason}")
    notification_batcher.add_ban(notification_data)
    log_entry["ban_reason"] = reason
    update_stats(log_entry, is_blocked=True)
def parse_user_agent(ua_string):
    """Return ``"<family> <major>"`` for *ua_string*, or ``"Unknown"`` on failure."""
    try:
        parsed = user_agent_parser.Parse(ua_string)
        return f"{parsed['user_agent']['family']} {parsed['user_agent']['major']}"
    except Exception:
        # A bare ``except:`` also swallowed KeyboardInterrupt and SystemExit;
        # only ordinary errors should fall back to "Unknown".
        return "Unknown"
def update_stats(log_entry, is_blocked=False):
    """Record a blocked request in the weekly, monthly and yearly statistics.

    Does nothing unless *is_blocked* is true. For each period one total is
    incremented and the IP, URL, browser, country and ban reason are bumped
    in their respective sorted sets.
    """
    if not is_blocked:
        return

    today = datetime.now()
    period_keys = (
        f"{today.year}:W{today.isocalendar()[1]}",
        f"{today.year}:M{today.month}",
        f"{today.year}",
    )

    client_ip = log_entry.get("remote_addr")
    # Sorted-set suffix -> member to increment.
    ranked_members = {
        "ips": client_ip,
        "urls": log_entry.get("request_uri"),
        "browsers": parse_user_agent(log_entry.get("http_user_agent", "Unknown")),
        "countries": get_geo_info(client_ip).get("country", "Unknown"),
        "reasons": log_entry.get("ban_reason", "unknown"),
    }

    pipe = r.pipeline()
    for period_key in period_keys:
        pipe.hincrby(f"stats:blocked:{period_key}", "total", 1)
        for suffix, member in ranked_members.items():
            pipe.zincrby(f"stats:blocked_{suffix}:{period_key}", 1, member)
    pipe.execute()
def _incr_with_window(pipeline, key, window):
    """Queue an INCR for *key* and make sure the counter has an expiry."""
    pipeline.incr(key)
    # TTL is -2 for a key that does not exist yet and -1 for a key without an
    # expiry. The original check (== -1) skipped brand-new keys, so a counter
    # that was hit only once never received a TTL and stayed in Redis forever.
    if r.ttl(key) < 0:
        pipeline.expire(key, window)


def analyze_log_entry(log_entry):
    """Inspect one parsed access-log entry and ban the client if warranted.

    Order of checks: endpoint and IP/user-agent whitelists, attack patterns,
    oversized requests, then per-IP counters (requests, errors, successes,
    failed logins) and finally the request-sequence analysis. Any exception is
    logged and swallowed so a bad entry cannot stop the log tailer.
    """
    try:
        started = time.time()
        ip = log_entry["remote_addr"]
        status = int(log_entry.get("status", 0))
        # A null user agent would break the string join further down.
        user_agent = log_entry.get("http_user_agent", "") or ""
        uri = log_entry.get("request_uri", "") or ""
        method = log_entry.get("request_method", "GET")

        # The prefix test also covers exact matches.
        if is_endpoint_whitelisted(uri):
            logging.info(f"Endpoint whitelisted: {uri}, IP={ip}")
            return
        if is_whitelisted(ip, user_agent):
            logging.info(f"IP whitelisted: {ip}")
            return

        logging.info(f"Start analyze_log_entry: IP={ip}, URL={uri}")
        attack_type = detect_attack(log_entry)
        if attack_type:
            logging.warning(f"Detected attack type: {attack_type}, IP={ip}, URL={uri}")
            ban_ip(ip, f"{attack_type}_attack", log_entry)
            return

        request_size = int(log_entry.get("bytes_sent", 0))
        if request_size > CONFIG["thresholds"]["request_size"]:
            logging.warning(f"Large request detected: IP={ip}, size={request_size}")
            ban_ip(ip, "large_request", log_entry)
            return

        ttl_window = CONFIG["thresholds"]["requests_time_window"]
        referer = log_entry.get("http_referer", "") or ""
        haystack = " ".join([uri, referer, user_agent])
        info_patterns = CONFIG["attack_patterns"].get("information_disclosure", [])
        info_disclosure_detected = any(
            re.search(p, haystack, re.IGNORECASE) for p in info_patterns
        )

        pipeline = r.pipeline()
        _incr_with_window(pipeline, f"metrics:ip:{ip}:requests", ttl_window)
        if status in CONFIG["thresholds"]["error_codes"]:
            _incr_with_window(pipeline, f"metrics:ip:{ip}:errors", ttl_window)
        elif status == 200:
            _incr_with_window(pipeline, f"metrics:ip:{ip}:success", ttl_window)
        if info_disclosure_detected:
            _incr_with_window(
                pipeline, f"metrics:ip:{ip}:info_disclosure_attacks", ttl_window
            )
            # The metric may not be registered; never let that stop analysis.
            try:
                METRICS["info_disclosure_attacks"].inc()
            except Exception:
                pass
            logging.warning(
                f"Information disclosure indicator matched, IP={ip}, URI={uri}"
            )
        pipeline.lpush(f"sequence:{ip}", uri)
        pipeline.ltrim(f"sequence:{ip}", 0, CONFIG["sequence"]["window_size"] - 1)
        pipeline.lpush(f"requests:{ip}", f"{method} {uri} {status}")
        pipeline.ltrim(f"requests:{ip}", 0, 20)
        if any(url in uri for url in CONFIG["bruteforce"]["login_urls"]):
            if status != 200:
                pipeline.incr(f"metrics:ip:{ip}:failed_logins")
                pipeline.expire(
                    f"metrics:ip:{ip}:failed_logins",
                    CONFIG["bruteforce"]["time_window"],
                )
        pipeline.execute()

        metrics = {
            "requests": int(r.get(f"metrics:ip:{ip}:requests") or 0),
            "errors": int(r.get(f"metrics:ip:{ip}:errors") or 0),
            "success": int(r.get(f"metrics:ip:{ip}:success") or 0),
            "failed_logins": int(r.get(f"metrics:ip:{ip}:failed_logins") or 0),
            "info_disclosure_attacks": int(
                r.get(f"metrics:ip:{ip}:info_disclosure_attacks") or 0
            ),
        }
        if metrics["requests"] > CONFIG["thresholds"]["requests"]:
            logging.warning(f"High traffic: IP={ip}, requests={metrics['requests']}")
            ban_ip(ip, "high_traffic", log_entry)
        elif metrics["errors"] > CONFIG["thresholds"]["errors"]:
            logging.warning(f"Many errors: IP={ip}, errors={metrics['errors']}")
            ban_ip(ip, "many_errors", log_entry)
        elif metrics["success"] > CONFIG["thresholds"]["success_requests"]:
            logging.warning(f"High success: IP={ip}, success={metrics['success']}")
            ban_ip(ip, "high_success", log_entry)
        elif metrics["failed_logins"] >= CONFIG["bruteforce"]["attempts_threshold"]:
            logging.warning(
                f"Bruteforce: IP={ip}, failed_logins={metrics['failed_logins']}"
            )
            ban_ip(ip, "bruteforce", log_entry)
        else:
            # analyze_sequence() returns "bruteforce" or "suspicious_sequence".
            # Pass its verdict on instead of always recording the latter.
            sequence_reason = analyze_sequence(ip)
            if sequence_reason:
                logging.warning(f"Suspicious sequence: IP={ip}")
                ban_ip(ip, sequence_reason, log_entry)
        METRICS["processing_time"].observe(time.time() - started)
    except Exception as e:
        # logging.exception keeps the traceback, which the plain error lost.
        logging.exception(f"Error analyzing log entry: {e}")
def tail_log_file(log_file):
    """Follow *log_file* forever and analyse every JSON line appended to it.

    Reading starts at the current end of the file. Lines that are not valid
    JSON are ignored.

    Changes over the original:
      * after a rotation (new inode) the replacement file is read from its
        beginning; seeking to its end dropped lines written before the
        rotation was noticed;
      * an in-place truncation is detected and reading restarts at offset 0;
      * a line that is only partially written is buffered until its newline
        arrives instead of being discarded as invalid JSON;
      * the file is read as bytes and decoded as UTF-8 with replacement, so an
        undecodable byte in a request cannot kill the thread;
      * the file handle is managed explicitly, because rebinding the name
        bound by ``with`` left the replacement handle unmanaged.
    """
    current_inode = os.stat(log_file).st_ino
    f = open(log_file, "rb")
    f.seek(0, os.SEEK_END)
    pending = b""
    try:
        while True:
            chunk = f.readline()
            if chunk:
                pending += chunk
                if not pending.endswith(b"\n"):
                    # Incomplete line: the next read returns b"" and the idle
                    # branch below waits for the writer to finish it.
                    continue
                raw_line, pending = pending, b""
                try:
                    log_entry = json.loads(raw_line.decode("utf-8", errors="replace"))
                except json.JSONDecodeError:
                    continue
                analyze_log_entry(log_entry)
                continue

            # Nothing new: check whether the file was rotated or truncated.
            try:
                file_info = os.stat(log_file)
                if file_info.st_ino != current_inode:
                    replacement = open(log_file, "rb")
                    f.close()
                    f = replacement
                    current_inode = os.fstat(f.fileno()).st_ino
                    pending = b""
                elif file_info.st_size < f.tell():
                    f.seek(0)
                    pending = b""
            except OSError:
                # The file may be briefly missing in the middle of a rotation.
                time.sleep(1)
            time.sleep(0.5)
    finally:
        f.close()
def tail_log():
    """Start one daemon tailer thread per configured log file and wait for them."""
    followers = []
    for path in CONFIG["log_files"]:
        follower = threading.Thread(target=tail_log_file, args=(path,), daemon=True)
        follower.start()
        followers.append(follower)
    for follower in followers:
        follower.join()
def ban_cleaner():
    """Hourly: drop expired bans, rewrite the deny file and reload if it changed.

    Expired bans are deleted from Redis and the IP is marked ``unbanned:<ip>``
    for 24 hours. The deny file is then regenerated from the remaining bans.
    """
    while True:
        time.sleep(3600)
        with lock:
            current_time = time.time()
            current_bans = []
            for ip_key in r.keys("banned:*"):
                data = r.hgetall(ip_key)
                if not data:
                    # The key expired between KEYS and HGETALL. Previously
                    # this created a junk "unbanned:" key for an empty IP.
                    continue
                banned_ip = data.get(b"ip", b"").decode()
                if float(data.get(b"expires", 0)) < current_time:
                    r.delete(ip_key)
                    if banned_ip:
                        r.setex(f"unbanned:{banned_ip}", 86400, 1)
                else:
                    current_bans.append(banned_ip)

            # Remember what the web server is currently told to deny so a
            # reload can be triggered exactly when that changes.
            try:
                with open(CONFIG["deny_file"], "r") as f:
                    previously_denied = {
                        _sanitize_ip_token(fields[1])
                        for fields in (
                            line.split() for line in f if line.startswith("deny")
                        )
                        if len(fields) > 1
                    }
            except FileNotFoundError:
                previously_denied = set()

            still_denied = [
                candidate
                for candidate in map(_sanitize_ip_token, current_bans)
                if _valid_ip(candidate)
            ]
            with open(CONFIG["deny_file"], "w") as f:
                f.write("# Auto-generated deny list\n")
                for denied_ip in still_denied:
                    f.write(f"deny {denied_ip};\n")

            # The original reloaded only when this pass deleted a ban itself.
            # Bans that Redis had already expired through their TTL vanished
            # from the file without a reload, so the server kept blocking
            # them.
            if set(still_denied) != previously_denied:
                try:
                    subprocess.run([bin_path, "-s", "reload"], check=True)
                except subprocess.CalledProcessError as e:
                    app.logger.error(f"NGINX reload error: {e}")
def export_prometheus_metrics():
    """Refresh the ``active_bans`` gauge from Redis every 15 seconds, forever."""
    refresh_interval = 15
    while True:
        active_count = len(r.keys("banned:*"))
        METRICS["active_bans"].set(active_count)
        time.sleep(refresh_interval)
def get_system_info():
    """Gather host and application figures for the index page.

    Returns a dict with ``system`` and ``application`` sections, or
    ``{"error": <message>}`` when any probe fails.
    """
    gib = 1024**3
    try:
        memory = psutil.virtual_memory()
        root_disk = psutil.disk_usage("/")
        load_1, load_5, load_15 = psutil.getloadavg()
        system_section = {
            "os": platform.system(),
            "cpu_usage": psutil.cpu_percent(),
            "memory_used": f"{memory.used / gib:.1f} GB",
            "memory_total": f"{memory.total / gib:.1f} GB",
            "disk_used": f"{root_disk.used / gib:.1f} GB",
            "disk_free": f"{root_disk.free / gib:.1f} GB",
            "system_load": f"{load_1:.2f} (1min), {load_5:.2f} (5min), {load_15:.2f} (15min)",
        }
        application_section = {
            "uptime": str(timedelta(seconds=time.time() - START_TIME)),
            "python_version": platform.python_version(),
            "total_requests": REQUEST_COUNT,
            "current_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        return {"system": system_section, "application": application_section}
    except Exception as probe_error:
        return {"error": str(probe_error)}
def cleanup_old_stats():
    """Daily: delete statistics keys that are older than the retention settings.

    For weeks, months and years the periods between the configured retention
    and a fixed look-back (52 weeks, 12 months, 9 years) are purged.
    """
    # Besides the original names, purge the ``blocked_*`` sorted sets that
    # update_stats() actually writes; they were never cleaned up before.
    stat_types = (
        "requests",
        "blocked",
        "ips",
        "urls",
        "browsers",
        "countries",
        "blocked_ips",
        "blocked_urls",
        "blocked_browsers",
        "blocked_countries",
        "blocked_reasons",
    )
    while True:
        now = datetime.now()
        retention = CONFIG["stats_retention"]
        period_keys = []

        for i in range(retention["week"], 53):
            past = now - timedelta(weeks=i)
            # Same key scheme as update_stats(): calendar year + ISO week.
            period_keys.append(f"{past.year}:W{past.isocalendar()[1]}")

        # Exact calendar arithmetic. Subtracting 30*i days could land in the
        # current month (e.g. on the 31st) and delete live statistics, or skip
        # a month entirely.
        month_index = now.year * 12 + (now.month - 1)
        for i in range(retention["month"], 13):
            year, month0 = divmod(month_index - i, 12)
            period_keys.append(f"{year}:M{month0 + 1}")

        # Likewise 365*i days could stay inside the current year on
        # 31 December of a leap year.
        for i in range(retention["year"], 10):
            period_keys.append(f"{now.year - i}")

        for period_key in period_keys:
            r.delete(*[f"stats:{stat_type}:{period_key}" for stat_type in stat_types])
        time.sleep(86400)
def update_deny_file():
    """Regenerate the deny file from the bans in Redis and reload the server.

    Raises:
        RuntimeError: if the configuration test or the reload fails.
    """
    current_bans = set()
    for key in r.keys("banned:*"):
        ip_raw = r.hget(key, "ip")
        if ip_raw is None:
            # The key expired (or has no "ip" field) since KEYS ran; calling
            # .decode() on None used to raise AttributeError here.
            continue
        ip = _sanitize_ip_token(ip_raw.decode())
        if _valid_ip(ip):
            current_bans.add(ip)
    try:
        with open(CONFIG["deny_file"], "w") as f:
            f.write("# Auto-generated deny list\n")
            f.writelines(f"deny {ip};\n" for ip in sorted(current_bans))
        subprocess.run([bin_path, "-t"], check=True)
        subprocess.run([bin_path, "-s", "reload"], check=True)
    except subprocess.CalledProcessError as e:
        app.logger.error(f"angie/nginx configuration error: {e}")
        raise RuntimeError("Failed to update firewall rules") from e
def sync_existing_bans():
    """Import addresses from the deny file into Redis, then rewrite the file.

    Every valid address that has no ``banned:<ip>`` record gets a 30-day
    "Legacy ban" record. A missing deny file is treated as empty, so a fresh
    installation no longer crashes at start-up; ``update_deny_file()`` then
    creates the file.
    """
    legacy_ttl = 3600 * 24 * 30
    try:
        with open(CONFIG["deny_file"], "r") as f:
            deny_lines = f.readlines()
    except FileNotFoundError:
        deny_lines = []

    for line in deny_lines:
        if not line.startswith("deny"):
            continue
        fields = line.split()
        if len(fields) < 2:
            # A bare "deny" line used to raise IndexError.
            continue
        ip = _sanitize_ip_token(fields[1])
        if not _valid_ip(ip):
            continue
        if not r.exists(f"banned:{ip}"):
            ban_data = {
                "ip": ip,
                "reason": "Legacy ban",
                "timestamp": datetime.now().isoformat(),
                "expires": time.time() + legacy_ttl,
                "hostname": "Legacy ban",
            }
            r.hmset(f"banned:{ip}", ban_data)
            r.expire(f"banned:{ip}", legacy_ttl)
    update_deny_file()
def safe_thread(target, name, retry_delay=5):
    """Run *target* in a daemon thread that is restarted whenever it stops.

    Crashes are logged under *name*; *retry_delay* seconds pass between runs.
    Returns the started thread.
    """

    def run_forever():
        while True:
            try:
                app.logger.info(f"Thread {name} started.")
                target()
            except Exception as crash:
                app.logger.error(f"Thread {name} crashed: {crash}")
            time.sleep(retry_delay)

    supervisor = threading.Thread(target=run_forever, daemon=True)
    supervisor.start()
    return supervisor
def _is_ip_in_cidrs(ip, cidrs):
try:
ip_obj = ipaddress.ip_address(ip)
except ValueError:
return False
for c in cidrs:
try:
if ip_obj in ipaddress.ip_network(c, strict=False):
return True
except ValueError:
if ip == c:
return True
return False
def _client_ip():
    """Return the address of the requesting client.

    When the direct peer is one of the configured ``trusted_proxies`` the
    first entry of ``X-Forwarded-For`` is used (falling back to the peer when
    the header is empty); otherwise the peer address itself is returned.
    """
    peer = request.remote_addr or ""
    forwarded_header = request.headers.get("X-Forwarded-For") or ""
    first_hop = forwarded_header.split(",")[0].strip()
    trusted = CONFIG.get("trusted_proxies", [])
    if not _is_ip_in_cidrs(peer, trusted):
        return peer
    return first_hop or peer
def is_trusted_client():
    """Tell whether the client address belongs to an ``api_trusted_networks`` entry.

    Entries that are not valid networks are ignored; an unparsable client
    address is never trusted.
    """
    try:
        client = ipaddress.ip_address(_client_ip())
    except ValueError:
        return False
    for cidr in CONFIG.get("api_trusted_networks", []):
        try:
            network = ipaddress.ip_network(cidr)
        except ValueError:
            continue
        if client in network:
            return True
    return False
def is_local_request():
    """Tell whether the request originates from the IPv4 or IPv6 loopback address."""
    loopback_addresses = ("127.0.0.1", "::1")
    return _client_ip() in loopback_addresses
def require_api_auth(view_func):
    """Decorator: allow trusted networks, otherwise require a valid API key.

    The key is read from the ``X-API-Key`` header or the ``api_key`` query
    parameter and must equal one of the values in ``CONFIG["api_keys"]``.
    Requests without a valid key receive a 401 JSON response.
    """
    import hmac

    @wraps(view_func)
    def wrapper(*args, **kwargs):
        if is_trusted_client():
            return view_func(*args, **kwargs)
        supplied = request.headers.get("X-API-Key") or request.args.get("api_key")
        configured = (CONFIG.get("api_keys") or {}).values()
        authorized = False
        # A missing or empty key is rejected outright. Before, a request
        # without any key passed whenever the configuration held a None value.
        if supplied:
            supplied_bytes = supplied.encode("utf-8")
            for candidate in configured:
                if not isinstance(candidate, str) or not candidate:
                    continue
                # Constant-time comparison; every key is checked so timing
                # does not reveal which entry matched.
                if hmac.compare_digest(supplied_bytes, candidate.encode("utf-8")):
                    authorized = True
        if not authorized:
            return jsonify({"error": "Unauthorized"}), 401
        return view_func(*args, **kwargs)

    return wrapper
def _ban_record_minimal(h):
ip = h.get(b"ip", b"").decode()
reason = h.get(b"reason", b"").decode() or "unknown"
exp = float(h.get(b"expires", b"0") or 0.0)
return {
"ip": ip,
"reason": reason,
"banned_until": datetime.fromtimestamp(exp).isoformat(),
}
def _safe_json_load(s, fallback=None):
try:
return json.loads(s)
except Exception:
return fallback
def _ban_record_full(h):
    """Build the detailed ban record from a raw Redis hash (bytes keys/values).

    ``expires`` is returned as a float (0.0 when missing or malformed) and
    ``banned_until`` as an ISO timestamp, or ``None`` when ``expires`` is 0.
    The ``geo`` and ``attack_details`` fields are decoded from JSON and fall
    back to empty dicts.
    """
    fields = {name.decode(): content.decode() for name, content in h.items()}

    try:
        expires_at = float(fields.get("expires", "0") or 0.0)
    except ValueError:
        expires_at = 0.0

    if expires_at:
        banned_until = datetime.fromtimestamp(expires_at).isoformat()
    else:
        banned_until = None

    record = {
        name: fields.get(name)
        for name in ("ban_id", "ip", "hostname", "reason", "timestamp")
    }
    record["expires"] = expires_at
    record["banned_until"] = banned_until
    record["user_agent"] = fields.get("user_agent")
    record["last_request"] = fields.get("last_request")
    record["geo"] = _safe_json_load(fields.get("geo") or "{}", {})
    record["attack_details"] = _safe_json_load(
        fields.get("attack_details") or "{}", {}
    )
    return record
@app.route("/favicon.ico", methods=["GET"])
def favicon():
    """Answer favicon requests with an empty 204 response."""
    no_content = Response(status=204)
    return no_content
@app.route("/")
@require_basic_auth
def index():
    """Render the landing page listing API routes and system information.

    Only rules under ``/api/`` and ``/healthcheck`` are listed, sorted by URL.
    Each call increments the module-level ``REQUEST_COUNT``.
    """
    global REQUEST_COUNT
    REQUEST_COUNT += 1

    def is_listed(rule):
        if rule.endpoint == "static":
            return False
        path = str(rule.rule)
        return path.startswith("/api/") or path == "/healthcheck"

    listed_routes = [
        {"url": str(rule.rule), "methods": ",".join(sorted(rule.methods))}
        for rule in app.url_map.iter_rules()
        if is_listed(rule)
    ]
    listed_routes.sort(key=lambda entry: entry["url"])

    return render_template(
        "index.html",
        routes=listed_routes,
        sys_info=get_system_info(),
        stats={"system": system_tiles()},
    )
@app.route("/stats", methods=["GET"])
@require_basic_auth
def stats_page():
    """Render the statistics page: system tiles, active bans and breakdowns.

    Bans are read from the ``banned:*`` hashes in Redis and additionally
    counted per country and per reason.
    """
    stats = {
        # Same figures as the index page; reuse the helper instead of
        # duplicating the dictionary.
        "system": system_tiles(),
        "geo_distribution": {},
        "ban_reasons": {},
        "banned_ips": [],
    }
    for ip_key in r.keys("banned:*"):
        data = r.hgetall(ip_key)
        if not data:
            # The ban expired between KEYS and HGETALL; the missing "expires"
            # field used to raise KeyError and fail the whole page.
            continue
        try:
            expires = float(data.get(b"expires", b"0") or 0.0)
        except ValueError:
            expires = 0.0
        # Tolerate missing or corrupt geo JSON instead of failing the page.
        geo = _safe_json_load(data.get(b"geo") or b"{}", {})
        if not isinstance(geo, dict):
            geo = {}
        ban_entry = {
            "ban_id": data.get(b"ban_id", b"").decode(),
            "ip": data.get(b"ip", b"").decode(),
            "hostname": data.get(b"hostname", b"").decode(),
            "banned_until": datetime.fromtimestamp(expires).isoformat(),
            "geo": geo,
            "reason": data.get(b"reason", b"").decode(),
            "user_agent": data.get(b"user_agent", b"").decode(),
            "first_seen": data.get(b"timestamp", b"").decode(),
        }
        stats["banned_ips"].append(ban_entry)
        country = geo.get("country", "Unknown")
        stats["geo_distribution"][country] = (
            stats["geo_distribution"].get(country, 0) + 1
        )
        reason = ban_entry["reason"]
        stats["ban_reasons"][reason] = stats["ban_reasons"].get(reason, 0) + 1
    return render_template("stats.html", stats=stats)
@app.route("/list", methods=["GET", "POST"])
@require_basic_auth
def ban_management():
    """Render ``list.html`` and handle the manual ban / unban forms.

    POST with ``delete``: removes every ``banned:<ip>`` key listed in
    ``selected_ips`` and sets an ``unbanned:<ip>`` marker for 24 hours.

    POST with ``add_ban``: validates ``ip``, then stores a manual ban unless
    the IP has an ``unbanned:`` marker or is already banned. ``duration`` falls
    back to 3600 when it is not an integer and is clamped to 60 s .. 365 days
    (the same bounds the JSON API uses), so a zero or negative value can no
    longer create a ban that expires immediately.

    The listing skips hashes that expired between KEYS and HGETALL and shows
    an empty ``expires`` for records whose timestamp cannot be parsed.
    """
    message = None
    error = None

    if request.method == "POST" and "delete" in request.form:
        selected_ips = request.form.getlist("selected_ips")
        if not selected_ips:
            error = "Nie wybrano IP do usunięcia"
        else:
            with lock:
                for ip in selected_ips:
                    r.delete(f"banned:{ip}")
                    r.setex(f"unbanned:{ip}", 86400, 1)
            # Called after releasing the lock: update_deny_file is defined
            # elsewhere and may acquire the same non-reentrant lock itself.
            update_deny_file()
            message = f"Usunięto {len(selected_ips)} banów"

    if request.method == "POST" and "add_ban" in request.form:
        ip = request.form.get("ip", "").strip()
        reason = request.form.get("reason", "Manual ban").strip() or "Manual ban"
        try:
            duration = int(request.form.get("duration", 3600))
        except (TypeError, ValueError):
            duration = 3600
        duration = max(60, min(duration, 3600 * 24 * 365))

        try:
            ipaddress.ip_address(ip)
        except ValueError:
            error = "Niepoprawny format adresu IP"
        else:
            added = False
            with lock:
                if r.exists(f"unbanned:{ip}"):
                    error = f"IP {ip} jest tymczasowo chronione przed banem"
                elif r.exists(f"banned:{ip}"):
                    error = f"IP {ip} jest już zbanowane"
                else:
                    ban_data = {
                        "ip": ip,
                        "reason": reason,
                        "timestamp": datetime.now().isoformat(),
                        "expires": time.time() + duration,
                        "hostname": "Manual ban",
                    }
                    r.hmset(f"banned:{ip}", ban_data)
                    r.expire(f"banned:{ip}", duration)
                    added = True
            if added:
                update_deny_file()
                message = f"Dodano bana dla {ip}"

    banned_ips = []
    for key in r.keys("banned:*"):
        data = r.hgetall(key)
        if not data:
            # The key expired (or was deleted) after KEYS returned it.
            continue
        try:
            expires = datetime.fromtimestamp(float(data[b"expires"])).strftime(
                "%Y-%m-%d %H:%M:%S"
            )
        except (KeyError, ValueError, OverflowError, OSError):
            expires = ""
        banned_ips.append(
            {
                "ip": data.get(b"ip", b"").decode(errors="replace"),
                "reason": data.get(b"reason", b"").decode(errors="replace"),
                "expires": expires,
                "hostname": data.get(b"hostname", b"").decode(errors="replace"),
                "payload": data.get(b"last_request", b"").decode(errors="replace"),
            }
        )

    return render_template(
        "list.html", banned_ips=banned_ips, message=message, error=error
    )
@app.route("/reset", methods=["GET", "POST"])
@require_basic_auth
def reset_counters():
    """Render ``reset.html`` and handle resets of per-IP error counters.

    POST with ``reset_ip`` deletes ``metrics:ip:<ip>:errors`` for one validated
    address; POST with ``reset_all_errors`` deletes every key matching
    ``metrics:ip:*:errors``. GET just renders the page.
    """
    message = None
    error = None

    if request.method != "POST":
        return render_template("reset.html", message=message, error=error)

    form = request.form
    if "reset_ip" in form:
        ip = form.get("ip", "").strip()
        try:
            ipaddress.ip_address(ip)
            removed = r.delete(f"metrics:ip:{ip}:errors")
        except ValueError:
            error = "Niepoprawny format adresu IP"
        else:
            message = (
                f"Resetowano liczniki błędów dla IP {ip}"
                if removed
                else f"Nie znaleziono liczników błędów dla IP {ip}"
            )
    elif "reset_all_errors" in form:
        keys = r.keys("metrics:ip:*:errors")
        for counter_key in keys:
            r.delete(counter_key)
        message = f"Zresetowano wszystkie liczniki błędów ({len(keys)})"

    return render_template("reset.html", message=message, error=error)
@app.route("/logs", methods=["GET"])
@require_basic_auth
def view_logs():
    """Render ``logs.html`` with the requested filter level and the root log level.

    ``?level=`` defaults to ``INFO``; an unrecognised value is replaced with
    ``WARNING``. ``app_log_level`` is the name of the root logger's effective
    level.
    """
    requested = request.args.get("level", "INFO").upper()
    known = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
    selected_level = requested if requested in known else "WARNING"

    root_logger = logging.getLogger()
    level_name = logging.getLevelName(root_logger.getEffectiveLevel())

    return render_template(
        "logs.html", selected_level=selected_level, app_log_level=level_name
    )
@app.route("/logs-data", methods=["GET"])
@require_basic_auth
def logs_data():
    """Return the newest matching lines of ``logs/app.log`` as JSON.

    Query parameters:
        level: minimum severity (``DEBUG`` .. ``CRITICAL``); defaults to, and
            falls back to, ``INFO``.
        query: optional case-insensitive substring filter.

    Only the last 500 lines of the file are considered and they are returned
    newest first under the ``logs`` key. A missing log file yields an empty
    list.
    """
    # Function-scope import: the visible module header imports only
    # ``defaultdict`` from collections.
    from collections import deque

    level = request.args.get("level", "INFO").upper()
    query = request.args.get("query", "").lower()
    allowed_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    if level not in allowed_levels:
        level = "INFO"

    # A line qualifies when it carries the marker of the requested level or of
    # any more severe one.
    markers = tuple(
        f"[{lvl}]" for lvl in allowed_levels[allowed_levels.index(level):]
    )

    logs_path = os.path.join("logs", "app.log")
    max_lines = 500
    try:
        # deque(maxlen=...) keeps only the tail while streaming the file, so
        # the whole log is never held in memory; errors="replace" keeps one
        # undecodable byte from failing the request.
        with open(logs_path, "r", errors="replace") as f:
            tail = deque(f, maxlen=max_lines)
    except FileNotFoundError:
        tail = ()

    logs = [
        line.strip()
        for line in reversed(tail)
        if any(marker in line for marker in markers)
        and (not query or query in line.lower())
    ]
    return jsonify({"logs": logs})
@app.route("/set-log-level", methods=["POST"])
@require_basic_auth
def set_log_level():
    """Set the root logger level from the ``level`` form field.

    Returns a JSON status object; an unrecognised level name yields HTTP 400.
    """
    requested = request.form.get("level", "INFO").upper()
    if requested in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"):
        logging.getLogger().setLevel(getattr(logging, requested))
        return jsonify({"status": "ok", "message": f"Ustawiono poziom: {requested}"})
    return jsonify({"status": "error", "message": "Nieprawidłowy poziom"}), 400
@app.route("/check", methods=["GET", "POST"])
@require_basic_auth
def check_ip_info():
    """Render ``check.html`` with everything Redis stores about one IP.

    On POST the ``ip`` form field is validated and, when valid, the view
    collects the ``sequence:<ip>`` list, the per-IP counters, the ban hash (if
    any) and those of the first 21 ``requests:<ip>`` entries that contain one
    of the strings 403/404/500/502/503.

    Only a failed address validation produces the "invalid IP" error. A
    counter holding a non-numeric value is read as 0 and undecodable bytes are
    replaced, so neither is reported as a bad address any more.
    """
    message = None
    error = None
    endpoints = []
    metrics = {}
    recent_errors = []
    ip = ""
    is_banned = False
    ban_info = {}

    def _counter(name):
        """Read one per-IP counter; missing or non-numeric values count as 0."""
        try:
            return int(r.get(f"metrics:ip:{ip}:{name}") or 0)
        except ValueError:
            return 0

    if request.method == "POST":
        ip = request.form.get("ip", "").strip()
        try:
            ipaddress.ip_address(ip)
        except ValueError:
            error = "Niepoprawny format adresu IP"
        else:
            endpoints = [
                e.decode(errors="replace")
                for e in r.lrange(f"sequence:{ip}", 0, -1)
            ]
            metrics = {
                name: _counter(name)
                for name in (
                    "requests",
                    "errors",
                    "success",
                    "failed_logins",
                    "info_disclosure_attacks",
                )
            }

            # A single HGETALL replaces EXISTS + HGETALL: an empty result means
            # the ban does not exist (or has just expired).
            raw = r.hgetall(f"banned:{ip}")
            if raw:
                is_banned = True
                ban_info = {
                    k.decode(errors="replace"): v.decode(errors="replace")
                    for k, v in raw.items()
                }

            error_codes = ("403", "404", "500", "502", "503")
            for req in r.lrange(f"requests:{ip}", 0, 20):
                try:
                    decoded = req.decode()
                except (UnicodeDecodeError, AttributeError):
                    # Entries that cannot be decoded are skipped.
                    continue
                # Plain substring match, as stored entries are free-form text.
                if any(code in decoded for code in error_codes):
                    recent_errors.append(decoded)

            if not endpoints and all(v == 0 for v in metrics.values()):
                if is_banned:
                    reason = (
                        ban_info.get("reason")
                        or ban_info.get("source")
                        or "zbanowane (brak szczegółów)"
                    )
                    message = f"IP {ip} jest zbanowane — powód: {reason}"
                else:
                    message = f"Nie znaleziono żadnych zapisów dla IP {ip}"

    return render_template(
        "check.html",
        ip=ip,
        endpoints=endpoints,
        metrics=metrics,
        recent_errors=recent_errors,
        message=message,
        error=error,
        is_banned=is_banned,
        ban_info=ban_info,
    )
@app.route("/backup-now", methods=["POST", "GET"])
@require_basic_auth
def backup_now():
    """Run a Redis-to-SQLite backup and return a JSON dump of Redis plus stats.

    Response (HTTP 200): ``{"stats": ..., "data": ...}`` where ``data`` maps
    every Redis key to its type, TTL and decoded value, and ``stats`` holds the
    Redis key count and the ``redis_dump`` / ``meta`` figures read back from
    the SQLite file. If ``backup_redis_to_sqlite`` raises, the response is
    HTTP 500 with ``{"ok": False, "error": ...}``.

    SQLite figures are best effort: if they cannot be read, ``records_total``
    is ``None`` and the ``meta`` values are missing.
    """
    # Function-scope import: the module header has ``import sqlite3`` commented
    # out, which would turn the stats query below into a swallowed NameError.
    import sqlite3

    logger = getattr(app, "logger", None)

    try:
        backup_redis_to_sqlite(r, logger)
    except Exception as e:
        if logger:
            logger.error(f"Backup failed: {e}")
        return jsonify({"ok": False, "error": str(e)}), 500

    all_keys = [k.decode() for k in r.scan_iter("*")]
    keys_info = {}
    for k in all_keys:
        t = r.type(k).decode()
        ttl = r.ttl(k)
        try:
            if t == "string":
                v = r.get(k)
                v = v.decode() if v is not None else None
            elif t == "hash":
                raw = r.hgetall(k)
                v = {kk.decode(): vv.decode(errors="ignore") for kk, vv in raw.items()}
            elif t == "list":
                v = [vv.decode(errors="ignore") for vv in r.lrange(k, 0, -1)]
            elif t == "set":
                v = sorted(vv.decode(errors="ignore") for vv in r.smembers(k))
            elif t == "zset":
                v = [
                    (m.decode(errors="ignore"), float(s))
                    for m, s in r.zrange(k, 0, -1, withscores=True)
                ]
            else:
                v = None
        except Exception:
            # A value that cannot be read or decoded is reported as null.
            v = None
        keys_info[k] = {"type": t, "ttl": ttl, "value": v}

    try:
        with closing(sqlite3.connect(CONFIG["sqlite_db"])) as conn:
            records_total = conn.execute(
                "SELECT COUNT(*) FROM redis_dump"
            ).fetchone()[0]
            meta_rows = dict(conn.execute("SELECT k, v FROM meta").fetchall())
    except Exception as e:
        if logger:
            logger.warning(f"Could not read backup stats from SQLite: {e}")
        records_total = None
        meta_rows = {}

    def _meta_int(name):
        """Return a ``meta`` value as int, or None when it is absent/empty."""
        return int(meta_rows[name]) if meta_rows.get(name) else None

    stats = {
        "ok": True,
        "timestamp": datetime.utcnow().isoformat(),
        "redis": {
            "keys_total": len(all_keys),
        },
        "sqlite": {
            "path": CONFIG.get("sqlite_db"),
            "records_total": records_total,
            "backup_last_run": meta_rows.get("backup_last_run"),
            "backup_last_count": _meta_int("backup_last_count"),
            "restore_last_run": meta_rows.get("restore_last_run"),
            "restore_last_count": _meta_int("restore_last_count"),
        },
    }

    try:
        stats["requests"] = {
            "total_since_start": REQUEST_TOTAL,
            "by_endpoint": dict(REQUEST_COUNTERS),
            "app_started_at": APP_START_TS,
        }
    except Exception as e:
        # Request counters are optional; the section is simply omitted.
        if logger:
            logger.debug(f"Request counters unavailable: {e}")

    return jsonify({"stats": stats, "data": keys_info}), 200
@app.route("/charts", methods=["GET", "POST"])
@require_basic_auth
def charts_page():
    """Render ``charts.html`` with top-N block statistics and weekly ban totals.

    Request values:
        top_n: size of each ranking, clamped to 1..100 (default 50).
        period: ``week``, ``month`` or ``year`` (default ``month``).

    Rankings come from the ``stats:blocked_*:<period key>`` sorted sets. The
    weekly series covers the current week and the five before it, read from
    the ``total`` field of ``stats:blocked:<year>:W<week>``.
    """
    now = datetime.now()

    try:
        top_n = max(1, min(100, int(request.values.get("top_n"))))
    except (TypeError, ValueError):
        top_n = 50

    period = (request.values.get("period") or "month").lower()
    if period == "week":
        period_key = f"{now.year}:W{now.isocalendar()[1]}"
    elif period == "year":
        period_key = f"{now.year}"
    else:
        # "month" and every unrecognised value
        period = "month"
        period_key = f"{now.year}:M{now.month}"

    def _ranking(kind, label):
        """Read the top ``top_n`` members of one ``stats:blocked_<kind>`` set."""
        rows = r.zrevrange(
            f"stats:blocked_{kind}:{period_key}", 0, top_n - 1, withscores=True
        )
        return [{label: member.decode(), "count": int(score)} for member, score in rows]

    stats = {
        "top_reasons": _ranking("reasons", "reason"),
        "top_urls": _ranking("urls", "url"),
        "top_countries": _ranking("countries", "country"),
    }

    week_labels = []
    week_totals = []
    for weeks_back in reversed(range(6)):
        moment = now - timedelta(weeks=weeks_back)
        iso_year, iso_week = moment.isocalendar()[0], moment.isocalendar()[1]
        # NOTE(review): the Redis key uses the calendar year while the label
        # uses the ISO year; presumably the key must match whatever writes
        # these counters elsewhere in the file - confirm before changing.
        redis_key = f"stats:blocked:{moment.year}:W{iso_week}"
        week_totals.append(int(r.hget(redis_key, "total") or 0))
        week_labels.append(f"{iso_year}-W{iso_week:02d}")

    stats["weeks"] = week_labels
    stats["bans_per_week"] = week_totals
    return render_template("charts.html", stats=stats, top_n=top_n, period=period)
######## API #########
@app.route("/api/banned/list", methods=["GET"])
@require_api_auth
def api_banned_list():
    """GET: list current bans; ``?full_info=1`` returns fuller details for the UI.

    Returns ``{"count": ..., "items": [...]}`` with HTTP 200. Keys that expire
    between the KEYS call and the HGETALL read come back as empty hashes and
    are skipped instead of being reported as blank records. Records that
    cannot be converted are logged and left out.
    """
    want_full = str(request.args.get("full_info") or "0").lower() in (
        "1",
        "true",
        "yes",
        "on",
    )
    build_record = _ban_record_full if want_full else _ban_record_minimal

    items = []
    for key in r.keys("banned:*"):
        h = r.hgetall(key)
        if not h:
            continue
        try:
            items.append(build_record(h))
        except Exception as e:
            app.logger.warning(f"Skipping malformed ban record {key!r}: {e}")
    return jsonify({"count": len(items), "items": items}), 200
@app.route("/api/banned/<ip>", methods=["GET", "POST", "DELETE"])
@require_api_auth
def api_banned_ip(ip):
    """Inspect, create or remove a ban through the JSON API.

    * ``GET``: ban details (``?full_info=1`` for the full record); 404 when
      the IP is not banned.
    * ``POST``: add a ban. Optional JSON body ``{"reason": ..., "duration": ...}``;
      ``duration`` is clamped to 60 s .. 365 days and falls back to the
      configured ``ban_duration`` when it is not an integer. 409 when the IP
      is already banned or carries an ``unbanned:`` marker; 201 on success.
    * ``DELETE``: remove the ban and set a 24-hour ``unbanned:`` marker; 404
      when the IP is not banned.
    * ``DELETE /api/banned/all``: remove ALL bans, rewrite the deny file and
      reload the web server; 500 when that last step fails.

    Any other ``ip`` value that is not a valid address yields HTTP 400.
    """
    want_full = str(request.args.get("full_info") or "0").lower() in (
        "1",
        "true",
        "yes",
        "on",
    )

    if request.method == "DELETE" and ip.lower() == "all":
        with lock:
            removed_ips = []
            for ip_key in r.keys("banned:*"):
                try:
                    # Split on the first colon only: IPv6 addresses contain
                    # colons and would otherwise be truncated.
                    ip_decoded = ip_key.decode().split(":", 1)[1]
                except (UnicodeDecodeError, IndexError):
                    continue
                r.delete(ip_key)
                r.setex(f"unbanned:{ip_decoded}", 86400, 1)
                removed_ips.append(ip_decoded)
            try:
                with open(CONFIG["deny_file"], "w") as f:
                    f.write("# Auto-generated deny list\n")
                subprocess.run([bin_path, "-t"], check=True)
                subprocess.run([bin_path, "-s", "reload"], check=True)
            except (subprocess.CalledProcessError, OSError) as e:
                # OSError covers an unwritable deny file or a missing binary.
                app.logger.error(f"NGINX config error: {e}")
                return (
                    jsonify(
                        {"error": "NGINX reload failed", "removed": len(removed_ips)}
                    ),
                    500,
                )
        return (
            jsonify(
                {
                    "status": "all_unbanned",
                    "count": len(removed_ips),
                    "ips": removed_ips,
                }
            ),
            200,
        )

    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return jsonify({"error": "Invalid IP"}), 400

    if request.method == "GET":
        # One HGETALL instead of EXISTS + HGETALL: an empty hash means the ban
        # is absent, even if it expired a moment ago.
        h = r.hgetall(f"banned:{ip}")
        if not h:
            return jsonify({"error": "IP not found"}), 404
        return (
            jsonify(_ban_record_full(h) if want_full else _ban_record_minimal(h)),
            200,
        )

    if request.method == "DELETE":
        with lock:
            if not r.exists(f"banned:{ip}"):
                return jsonify({"error": "IP not found"}), 404
            r.delete(f"banned:{ip}")
            r.setex(f"unbanned:{ip}", 86400, 1)
        # Called after releasing the lock: update_deny_file is defined
        # elsewhere and may acquire the same non-reentrant lock itself.
        try:
            update_deny_file()
        except Exception as e:
            app.logger.error(f"Update deny failed: {e}")
        return jsonify({"status": "unbanned", "ip": ip}), 200

    # POST (add a ban)
    payload = request.get_json(force=True, silent=True)
    if not isinstance(payload, dict):
        # A JSON array/string/number body carries no usable options.
        payload = {}
    reason = str(payload.get("reason") or "Manual ban").strip() or "Manual ban"
    default_duration = int(CONFIG["thresholds"].get("ban_duration", 3600))
    try:
        duration = int(payload.get("duration", default_duration))
        duration = max(60, min(duration, 3600 * 24 * 365))
    except (TypeError, ValueError):
        duration = default_duration

    with lock:
        if r.exists(f"unbanned:{ip}"):
            return jsonify({"error": "IP is temporarily protected after unban"}), 409
        if r.exists(f"banned:{ip}"):
            return jsonify({"error": "IP already banned"}), 409
        ban_data = {
            "ip": ip,
            "reason": reason,
            "timestamp": datetime.now().isoformat(),
            "expires": time.time() + duration,
            "hostname": "Manual ban (API)",
        }
        r.hmset(f"banned:{ip}", ban_data)
        r.expire(f"banned:{ip}", duration)

    try:
        update_deny_file()
    except Exception as e:
        app.logger.error(f"angie/nginx update failed after API ban: {e}")

    h = r.hgetall(f"banned:{ip}")
    body = _ban_record_full(h) if want_full else _ban_record_minimal(h)
    body.update({"status": "banned"})
    return jsonify(body), 201
@app.route("/healthcheck")
def healthcheck():
    """Report whether the app can serve its own index page.

    Requests ``/`` through Flask's test client using the configured basic-auth
    credentials. Returns HTTP 200 with status ``OK`` when that request answers
    200, otherwise HTTP 503 with status ``CRITICAL``. Any exception is
    reported as 503 together with its message.
    """
    try:
        auth_cfg = CONFIG["basic_auth"]
        raw_credentials = f"{auth_cfg['username']}:{auth_cfg['password']}"
        token = b64encode(raw_credentials.encode()).decode()

        with app.test_client() as client:
            probe = client.get("/", headers={"Authorization": f"Basic {token}"})

        responsive = probe.status_code == 200
        checks = {
            "app_responsive": responsive,
            "status": "OK" if responsive else "CRITICAL",
        }
        return jsonify(checks), (200 if responsive else 503)
    except Exception as e:
        return jsonify({"status": "CRITICAL", "error": str(e)}), 503
######################################
if __name__ == "__main__":
    # Startup order: sync existing bans, expose the Prometheus endpoint, hand
    # each background worker to safe_thread, then run the Flask server.
    sync_existing_bans()
    start_http_server(CONFIG["prometheus_port"])

    # (target, label) pairs, started in this order.
    # NOTE(review): safe_thread is defined elsewhere; presumably it starts the
    # target in a supervised background thread - confirm.
    workers = (
        (tail_log, "tail_log"),
        (ban_cleaner, "ban_cleaner"),
        (export_prometheus_metrics, "export_prometheus"),
        (cleanup_old_stats, "cleanup_stats"),
        (redis_sqlite_sync_daemon, "redis_sqlite_sync"),
    )
    threads = [safe_thread(target, label) for target, label in workers]

    # The reloader is disabled so the process is not started a second time.
    app.run(host="0.0.0.0", port=CONFIG["api_port"], use_reloader=False)