Files
adlist_mikrotik/app.py
2025-08-29 23:57:39 +02:00

796 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import redis
import requests
import socket
import time
import json
import hashlib
import ipaddress
import hmac, ipaddress
from datetime import datetime
from urllib.parse import urlparse, quote, unquote, urljoin
from functools import wraps
from typing import Optional
from datetime import timezone
import json as _json
from flask import (
Flask,
request,
render_template,
abort,
jsonify,
stream_with_context,
g,
Response,
)
from flask_compress import Compress
from flask_limiter import Limiter
import config
# Flask application; upload limit, secret key and debug flag come from config.py.
app = Flask(__name__)
app.config["MAX_CONTENT_LENGTH"] = config.MAX_CONTENT_LENGTH
app.config["SECRET_KEY"] = config.SECRET_KEY
app.debug = config.FLASK_DEBUG
def build_redis():
    """Create the Redis client: prefer a full REDIS_URL, else host/port/db."""
    if config.REDIS_URL:
        return redis.Redis.from_url(config.REDIS_URL)
    return redis.Redis(host=config.REDIS_HOST, port=config.REDIS_PORT, db=config.REDIS_DB)
# Shared module-level Redis connection used by stats, caching and history helpers.
redis_client = build_redis()
def get_client_ip():
    """Best-effort client IP: first X-Forwarded-For entry, else remote_addr.

    NOTE(review): X-Forwarded-For is client-controlled unless a trusted proxy
    strips it - confirm the deployment sits behind such a proxy.
    """
    forwarded = request.headers.get("X-Forwarded-For", "")
    first = forwarded.split(",")[0].strip()
    if first:
        return first
    return request.remote_addr
# Rate limiting keyed on the (best-effort) client IP.
# NOTE(review): storage_uri is taken straight from config.REDIS_URL, while
# build_redis() also supports host/port configuration - if REDIS_URL is unset
# the limiter storage may be misconfigured; confirm against config.py.
limiter = Limiter(
    key_func=get_client_ip,
    app=app,
    default_limits=[config.RATE_LIMIT_DEFAULT],
    storage_uri=config.REDIS_URL,
)
# Enable response compression for all routes.
Compress(app)
@app.before_request
def track_request_data():
    """Stamp request start time and bump per-UA / per-IP / per-method counters."""
    g.start_time = time.perf_counter()
    ua = request.headers.get("User-Agent", "Unknown")
    redis_client.incr(f"stats:user_agents:{quote(ua, safe='')}")
    redis_client.incr(f"stats:client_ips:{get_client_ip()}")
    redis_client.incr(f"stats:methods:{request.method}")
@app.after_request
def finalize_response(response):
    """Record per-request timing stats in Redis and set cache headers by path.

    NOTE(review): the min/max updates are read-modify-write without a lock,
    so concurrent requests can race; values are approximate.
    """
    elapsed = time.perf_counter() - g.start_time
    # Running totals feeding the average shown on /stats.
    redis_client.incrbyfloat("stats:processing_time_total", elapsed)
    redis_client.incr("stats:processing_time_count")
    try:
        current_min = float(redis_client.get("stats:processing_time_min") or elapsed)
        if elapsed < current_min:
            redis_client.set("stats:processing_time_min", elapsed)
    except Exception:
        # Absent or unparseable value: reseed with the current sample.
        redis_client.set("stats:processing_time_min", elapsed)
    try:
        current_max = float(redis_client.get("stats:processing_time_max") or elapsed)
        if elapsed > current_max:
            redis_client.set("stats:processing_time_max", elapsed)
    except Exception:
        redis_client.set("stats:processing_time_max", elapsed)
    path = request.path or "/"
    # Error responses must never be cached by intermediaries.
    if response.status_code >= 400:
        response.headers["Cache-Control"] = "no-store"
        return response
    if path.startswith("/static/"):
        response.headers.pop("Content-Disposition", None)
        if path.endswith((".css", ".js")):
            # Long-lived immutable caching for static assets.
            response.headers["Cache-Control"] = "public, max-age=31536000, immutable"
        else:
            response.headers["Cache-Control"] = "public, max-age=86400"
        return response
    if path == "/":
        # The form page embeds per-client data (IP, UA) - keep it private.
        response.headers["Cache-Control"] = "private, no-store"
        return response
    return response
@app.template_filter("datetimeformat")
def datetimeformat_filter(value, format="%Y-%m-%d %H:%M"):
    """Jinja filter: render an ISO-8601 timestamp string using *format*.

    Returns *value* unchanged when it cannot be parsed, so templates never
    crash on missing or malformed data.
    """
    try:
        dt = datetime.fromisoformat(value)
        return dt.strftime(format)
    except (ValueError, TypeError, AttributeError):
        # fromisoformat raises TypeError for non-str input (e.g. None);
        # the original caught only ValueError/AttributeError and would 500.
        return value
def basic_auth_required(realm: str, user: str, password: str):
    """Decorator enforcing HTTP Basic auth when STATS_BASIC_AUTH_ENABLED.

    Credentials are compared with hmac.compare_digest (hmac is already
    imported at module level) so the check does not leak information
    through comparison timing, unlike plain ``==``.
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if not config.STATS_BASIC_AUTH_ENABLED:
                return f(*args, **kwargs)
            auth = request.authorization
            if (
                auth
                and auth.type == "basic"
                and hmac.compare_digest(auth.username or "", user)
                and hmac.compare_digest(auth.password or "", password)
            ):
                return f(*args, **kwargs)
            # Challenge the client for credentials.
            resp = Response(status=401)
            resp.headers["WWW-Authenticate"] = f'Basic realm="{realm}"'
            return resp
        return wrapper
    return decorator
def cache_key(source_url, ip):
    """Redis cache key for a (source URL, target IP) pair."""
    return ":".join(("cache", source_url, ip))
def should_ignore_domain(domain):
    """True for tokens that cannot be plain hostnames (filter-rule residue)."""
    if domain.startswith("."):
        return True
    forbidden = set("~=$'^_><:")
    return any(ch in forbidden for ch in domain)
def should_ignore_line(line):
    """True when the line looks like HTML or cosmetic-filter markup."""
    for marker in ("<", ">", "##", "###", "div", "span"):
        if marker in line:
            return True
    return False
def is_valid_domain(domain):
    """Loose hostname check: dotted labels plus an alphabetic TLD of 2+ chars."""
    return re.match(r"^(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$", domain) is not None
def is_private_client_ip() -> bool:
    """Whether the requesting client's IP parses as a private address."""
    try:
        addr = ipaddress.ip_address(get_client_ip())
    except Exception:
        # Unparseable address: treat as not private.
        return False
    return addr.is_private
def convert_host_line(line: str, target_ip: str):
    """Convert one blocklist line to "<target_ip> <domain>" hosts format.

    Understands Adblock "||domain^" rules, hosts-file entries
    ("<ip> <domain>"), dnsmasq "address=/d/" / "server=/d/" lines, and bare
    domain tokens. Returns None for comments, junk and invalid domains.
    """
    # Quick rejections: empty lines and common comment prefixes.
    if not line:
        return None
    line = line.strip()
    if not line or line.startswith(("!", "#", "/", ";")):
        return None
    # Strip trailing inline comments (" #..." / " ;...").
    for sep in (" #", " ;"):
        idx = line.find(sep)
        if idx != -1:
            line = line[:idx].rstrip()
    if not line:
        return None
    # Adblock syntax: ||example.com^
    m = re.match(r"^\|\|([a-z0-9.-]+)\^", line, re.IGNORECASE)
    if m:
        domain = m.group(1).strip(".")
        if not should_ignore_domain(domain) and is_valid_domain(domain):
            return f"{target_ip} {domain}"
        return None
    parts = line.split()
    # Hosts syntax: "<IPv4 or IPv6> <domain> ..." (":" marks IPv6).
    if len(parts) >= 2 and (
        re.match(r"^\d{1,3}(?:\.\d{1,3}){3}$", parts[0]) or ":" in parts[0]
    ):
        domain = parts[1].strip().split("#", 1)[0].strip().strip(".")
        if not should_ignore_domain(domain) and is_valid_domain(domain):
            return f"{target_ip} {domain}"
        return None
    # dnsmasq syntax: address=/example.com/ or server=/example.com/
    m = re.match(r"^(?:address|server)=/([a-z0-9.-]+)/", line, re.IGNORECASE)
    if m:
        domain = m.group(1).strip(".")
        if not should_ignore_domain(domain) and is_valid_domain(domain):
            return f"{target_ip} {domain}"
        return None
    # Fallback: treat the first token as a bare domain.
    token = parts[0].split("#", 1)[0].strip().strip(".")
    if token and not should_ignore_domain(token) and is_valid_domain(token):
        return f"{target_ip} {token}"
    return None
def build_etag(up_etag: Optional[str], up_lastmod: Optional[str], target_ip: str) -> str:
    """Weak ETag derived from upstream validators plus the target IP."""
    seed = up_etag or up_lastmod or "no-upstream"
    digest = hashlib.sha1(f"{seed}::{target_ip}::v1".encode("utf-8")).hexdigest()
    return f'W/"{digest}"'
def cache_headers(etag: str, up_lm: Optional[str]):
    """Common response headers for converted-list responses.

    Cache-Control depends on config.CACHE_ENABLED; Last-Modified is only
    set when the upstream supplied one.
    """
    if config.CACHE_ENABLED:
        cc = f"public, s-maxage={config.CACHE_S_MAXAGE}, max-age={config.CACHE_MAX_AGE}"
    else:
        cc = "no-store"
    headers = {
        "ETag": etag,
        "Vary": "Accept-Encoding",
        "Content-Type": "text/plain; charset=utf-8",
        "X-Content-Type-Options": "nosniff",
        "Cache-Control": cc,
    }
    if up_lm:
        headers["Last-Modified"] = up_lm
    return headers
def validate_and_normalize_url(url):
    """Normalize *url* to an absolute http(s) URL; raise ValueError if unusable."""
    url = (url or "").strip()
    # Reject CR/LF early (header-injection style input).
    if "\r" in url or "\n" in url:
        raise ValueError("Invalid characters in URL")
    parsed = urlparse(url)
    if not parsed.scheme:
        # Bare host given - default to HTTPS.
        url = f"https://{url}"
        parsed = urlparse(url)
    # Only plain web URLs are accepted.
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Unsupported scheme: {parsed.scheme}")
    if not parsed.netloc:
        raise ValueError("Missing host in URL")
    return parsed.geturl()
def track_url_request(url):
    """Count one conversion request for *url* (stored percent-encoded)."""
    key = "stats:url_requests:" + quote(url, safe="")
    redis_client.incr(key)
def add_recent_link(url, target_ip):
    """Prepend (timestamp, url, target_ip) to the "recent_links" Redis list.

    Deduplicates on (url, target_ip): an older entry for the same pair is
    dropped before the new one is pushed. The list is trimmed to the 10
    newest entries.
    """
    ts = datetime.now().isoformat()
    new_item = f"{ts}|{url}|{target_ip}"
    key = "recent_links"
    current = redis_client.lrange(key, 0, -1)
    filtered = []
    for raw in current:
        try:
            s = raw.decode()
            parts = s.split("|")
            # Skip the stale entry for the same (url, ip) pair.
            if len(parts) >= 3 and parts[1] == url and parts[2] == target_ip:
                continue
        except Exception:
            # Undecodable entries are kept as-is (best effort).
            pass
        filtered.append(raw)
    # Rebuild the list in one pipeline: clear, push new head, re-append rest.
    with redis_client.pipeline() as pipe:
        pipe.delete(key)
        pipe.lpush(key, new_item)
        if filtered:
            pipe.rpush(key, *filtered[:99])
        # Only the 10 newest entries survive.
        pipe.ltrim(key, 0, 9)
        pipe.execute()
    redis_client.incr("stats:recent_links_added")
def get_recent_links():
    """Return up to 10 recent (timestamp, url, target_ip) tuples."""
    out = []
    for raw in redis_client.lrange("recent_links", 0, 9):
        fields = raw.decode().split("|")
        if len(fields) >= 3:
            out.append((fields[0], fields[1], fields[2]))
        elif len(fields) == 2:
            # Legacy entries stored without an IP default to localhost.
            out.append((fields[0], fields[1], "127.0.0.1"))
    return out
def get_hostname(ip):
    """Reverse-DNS name for *ip*, cached in Redis for one hour."""
    redis_key = f"reverse_dns:{ip}"
    cached = redis_client.get(redis_key)
    if cached:
        return cached.decode()
    try:
        name = socket.gethostbyaddr(ip)[0]
    except Exception:
        # Lookup failed - fall back to the bare IP.
        name = ip
    redis_client.setex(redis_key, 3600, name)
    return name
def add_recent_convert():
    """Log the current /convert request (who, when, what) to Redis (last 100)."""
    ip = get_client_ip()
    entry = {
        "url": request.full_path,
        "ip": ip,
        "hostname": get_hostname(ip),
        "time": datetime.now().astimezone().isoformat(),
        "user_agent": request.headers.get("User-Agent", "Unknown"),
    }
    redis_client.lpush("recent_converts", json.dumps(entry))
    redis_client.ltrim("recent_converts", 0, 99)
def validate_ip(value: str) -> str:
    """Return the canonical string form of *value* as an IPv4/IPv6 address.

    Raises:
        ValueError: if *value* is not a valid IP address.
    """
    v = (value or "").strip()
    try:
        return str(ipaddress.ip_address(v))
    except ValueError:
        # Narrowed from a bare Exception: ip_address only raises ValueError
        # for bad string input. Chain suppressed for a clean client message.
        raise ValueError("Invalid IP address") from None
@app.route("/favicon.ico", methods=["GET"])
def favicon():
    """Answer favicon probes with an empty 204 (no icon shipped)."""
    resp = Response(status=204)
    return resp
@app.route("/", methods=["GET"])
def index():
    """Render the converter form; fall back to JSON if templates are missing."""
    recent_links = get_recent_links()
    client_ip = get_client_ip()
    user_agent = request.headers.get("User-Agent", "Unknown")
    try:
        return render_template(
            "form.html",
            recent_links=recent_links,
            client_ip=client_ip,
            user_agent=user_agent,
        )
    except Exception:
        payload = {
            "recent_links": recent_links,
            "client_ip": client_ip,
            "user_agent": user_agent,
        }
        return jsonify(payload)
@app.route("/convert")
@limiter.limit(config.RATE_LIMIT_CONVERT)
def convert():
    """Fetch a blocklist from ?url= and stream it back in hosts format.

    Query params:
        url   -- source list URL (required; may be percent-encoded).
        ip    -- target IP prefixed to each domain (default 127.0.0.1).
        debug -- return a plain-text debug trace instead of the converted
                 list; gated by X-Debug-Key or a private client IP.

    Forwards client If-None-Match / If-Modified-Since to the upstream and
    relays its 304. Streams line-by-line, so memory stays flat.
    """
    # NOTE(review): shadows the identical module-level is_private_client_ip;
    # one of the two copies is redundant.
    def is_private_client_ip() -> bool:
        ip = get_client_ip()
        try:
            return ipaddress.ip_address(ip).is_private
        except Exception:
            return False
    requested_debug = request.args.get("debug", "").lower() in (
        "1",
        "true",
        "t",
        "yes",
        "y",
        "on",
    )
    # Debug access: either a matching X-Debug-Key (constant-time compare)
    # or a private-network client, and only when DEBUG_ENABLE is set.
    debug_allowed = False
    if config.DEBUG_ENABLE:
        header_key = request.headers.get("X-Debug-Key", "")
        if (
            config.DEBUG_KEY
            and header_key
            and hmac.compare_digest(header_key, config.DEBUG_KEY)
        ):
            debug_allowed = True
        elif is_private_client_ip():
            debug_allowed = True
    if requested_debug and not debug_allowed:
        abort(403)
    debug_mode = requested_debug and debug_allowed
    debug_lines = []
    def d(msg):
        # Collect a debug line for the response body and mirror it to the log.
        ts = datetime.now().isoformat()
        line = f"# [DEBUG {ts}] {msg}"
        debug_lines.append(line)
        app.logger.debug(line)
    def debug_response(status=200):
        # Debug responses carry the collected trace and are never cached.
        body = "\n".join(debug_lines) + ("\n" if debug_lines else "")
        resp = Response(body, mimetype="text/plain; charset=utf-8", status=status)
        resp.headers["X-Debug-Mode"] = "1"
        resp.headers["Cache-Control"] = "no-store"
        return resp
    try:
        add_recent_convert()
        redis_client.incr("stats:convert_requests")
        if debug_mode:
            d("Start /convert w trybie debug")
        encoded_url = request.args.get("url")
        if not encoded_url:
            if debug_mode:
                d("Brak parametru ?url")
                return debug_response(status=400)
            redis_client.incr("stats:errors_400")
            abort(400, description="Missing URL parameter")
        decoded_url = unquote(encoded_url)
        try:
            normalized_url = validate_and_normalize_url(decoded_url)
        except ValueError as e:
            if debug_mode:
                d(f"Błąd walidacji URL: {e}")
                return debug_response(status=400)
            redis_client.incr("stats:errors_400")
            abort(400)
        try:
            target_ip = validate_ip(request.args.get("ip", "127.0.0.1"))
        except ValueError:
            if debug_mode:
                d("Bad parametr ?ip")
                return debug_response(status=400)
            redis_client.incr("stats:errors_400")
            abort(400, description="Invalid IP")
        if debug_mode:
            d(f"URL (encoded): {encoded_url}")
            d(f"URL (decoded): {decoded_url}")
            d(f"URL (norm): {normalized_url}")
            d(f"target_ip: {target_ip}")
        track_url_request(normalized_url)
        redis_client.incr(f"stats:target_ips:{target_ip}")
        # Forward the client's conditional headers (If-*) to the upstream.
        req_headers = {}
        inm = request.headers.get("If-None-Match")
        ims = request.headers.get("If-Modified-Since")
        if inm:
            req_headers["If-None-Match"] = inm
        if ims:
            req_headers["If-Modified-Since"] = ims
        if debug_mode:
            d("Wysyłam GET do upstreamu")
            d(f"Nagłówki: {req_headers or '{}'}")
        # Streaming fetch: 10s connect / 60s read timeouts.
        r = requests.get(
            normalized_url, headers=req_headers, stream=True, timeout=(10, 60)
        )
        ct = r.headers.get("Content-Type", "")
        if debug_mode:
            d(f"Upstream status: {r.status_code}")
            d(f"Content-Type: {ct or '(brak)'}")
            d(f"ETag: {r.headers.get('ETag')}")
            d(f"Last-Modified: {r.headers.get('Last-Modified')}")
        # Accept text/*, octet-stream, or a missing Content-Type; reject the rest.
        if "text" not in ct and "octet-stream" not in ct and ct != "":
            if debug_mode:
                d("Unsupported Media Type -> 415")
                r.close()
                return debug_response(status=415)
            r.close()
            abort(415, description="Unsupported Media Type")
        # Upstream says unchanged: relay a 304 with our derived ETag.
        if r.status_code == 304:
            etag = build_etag(
                r.headers.get("ETag"), r.headers.get("Last-Modified"), target_ip
            )
            if debug_mode:
                d("Upstream 304 zwracam 304")
                r.close()
                add_recent_link(normalized_url, target_ip)
                return debug_response(status=304)
            resp = Response(status=304)
            resp.headers.update(cache_headers(etag, r.headers.get("Last-Modified")))
            resp.direct_passthrough = True
            r.close()
            add_recent_link(normalized_url, target_ip)
            return resp
        up_etag = r.headers.get("ETag")
        up_lm = r.headers.get("Last-Modified")
        etag = build_etag(up_etag, up_lm, target_ip)
        if debug_mode:
            d(f"Etag dla klienta: {etag}")
        @stream_with_context
        def body_gen():
            # Generator: converts upstream lines on the fly, closing the
            # upstream connection in all cases via the finally block.
            lines_read = 0
            lines_emitted = 0
            try:
                if debug_mode:
                    # Emit the trace collected so far, then reset it.
                    yield "\n".join(debug_lines) + "\n"
                    debug_lines.clear()
                for line in r.iter_lines(
                    decode_unicode=True, chunk_size=config.READ_CHUNK
                ):
                    if line is None:
                        continue
                    lines_read += 1
                    # Guard against pathological single-line payloads.
                    if len(line) > config.STREAM_LINE_LIMIT:
                        if debug_mode and lines_read <= 5:
                            yield f"# [DEBUG] pominięto długi wiersz ({len(line)} bajtów)\n"
                        continue
                    out = convert_host_line(line, target_ip)
                    if out:
                        lines_emitted += 1
                        yield out + "\n"
                    if debug_mode and lines_read <= 5:
                        preview = line[:200].replace("\r", "\\r").replace("\n", "\\n")
                        yield f"# [DEBUG] podgląd linii {lines_read}: {preview}\n"
                if debug_mode:
                    yield f"# [DEBUG] podsumowanie: przeczytano={lines_read}, wyemitowano={lines_emitted}\n"
                    if lines_emitted == 0:
                        yield "# [DEBUG] Uwaga: 0 linii wynikowych czy format listy pasuje?\n"
                # NOTE(review): incrby of 0 never adds to the size total while
                # the count still grows - confirm whether a real byte count
                # was intended here.
                redis_client.incrby("stats:content_size_total", 0)
                redis_client.incr("stats:content_size_count")
            finally:
                r.close()
        resp = Response(body_gen(), mimetype="text/plain; charset=utf-8")
        resp.headers.update(cache_headers(etag, up_lm))
        resp.direct_passthrough = True
        redis_client.incr("stats:conversions_success")
        add_recent_link(normalized_url, target_ip)
        return resp
    except requests.exceptions.RequestException as e:
        app.logger.error(f"Request error: {str(e)}")
        redis_client.incr("stats:errors_500")
        if debug_mode:
            d(f"Wyjątek requests: {e}")
            return debug_response(status=502)
        abort(500)
    except ValueError as e:
        app.logger.error(f"URL validation error: {str(e)}")
        redis_client.incr("stats:errors_400")
        if debug_mode:
            d(f"Wyjątek ValueError: {e}")
            return debug_response(status=400)
        abort(400)
@app.route("/convert", methods=["HEAD"])
def convert_head():
    """Cheap HEAD variant of /convert: validate params, return headers only.

    Does not contact the upstream, so the ETag is derived solely from the
    target IP (no upstream validators available).
    """
    encoded_url = request.args.get("url", config.DEFAULT_SOURCE_URL)
    if not encoded_url:
        abort(400)
    try:
        validate_and_normalize_url(unquote(encoded_url))
        target_ip = validate_ip(request.args.get("ip", "127.0.0.1"))
    except ValueError:
        # Bad input is a client error; previously the ValueError escaped
        # this handler and surfaced as a 500, unlike the GET route.
        redis_client.incr("stats:errors_400")
        abort(400)
    etag = build_etag(None, None, target_ip)
    resp = Response(status=200)
    resp.headers.update(cache_headers(etag, None))
    resp.direct_passthrough = True
    return resp
@app.route("/stats")
@basic_auth_required(
    realm=config.STATS_BASIC_AUTH_REALM,
    user=config.STATS_BASIC_AUTH_USER,
    password=config.STATS_BASIC_AUTH_PASS,
)
def stats():
    """Render the HTML statistics dashboard from the stats:* Redis keys.

    NOTE(review): the collection logic below is duplicated in stats_json();
    keep the two in sync when changing key parsing.
    """
    stats_data, target_ips, url_requests, user_agents, client_ips = {}, {}, {}, {}, {}
    # Partition all stats:* keys into per-category dicts.
    for key in redis_client.scan_iter("stats:*"):
        key_str = key.decode()
        value = (redis_client.get(key) or b"0").decode()
        if key_str.startswith("stats:target_ips:"):
            ip = key_str.split(":", 2)[2]
            target_ips[ip] = value
        elif key_str.startswith("stats:url_requests:"):
            # The URL component was stored percent-encoded.
            url = unquote(key_str.split(":", 2)[2])
            url_requests[url] = value
        elif key_str.startswith("stats:user_agents:"):
            ua = unquote(key_str.split(":", 2)[2])
            user_agents[ua] = value
        elif key_str.startswith("stats:client_ips:"):
            ip = key_str.split(":", 2)[2]
            client_ips[ip] = value
        else:
            stats_data[key_str] = value
    recent_converts = []
    for entry in redis_client.lrange("recent_converts", 0, 99):
        try:
            recent_converts.append(json.loads(entry.decode()))
        except Exception:
            # Skip corrupt entries rather than failing the whole page.
            pass
    # Detailed aggregates; averages guarded against division by zero.
    processing_time_total = float(redis_client.get("stats:processing_time_total") or 0)
    processing_time_count = int(redis_client.get("stats:processing_time_count") or 0)
    avg_processing_time = (
        processing_time_total / processing_time_count
        if processing_time_count > 0
        else 0
    )
    content_size_total = int(redis_client.get("stats:content_size_total") or 0)
    content_size_count = int(redis_client.get("stats:content_size_count") or 0)
    avg_content_size = (
        content_size_total / content_size_count if content_size_count > 0 else 0
    )
    detailed_stats = {
        "processing_time_total_sec": processing_time_total,
        "processing_time_count": processing_time_count,
        "processing_time_avg_sec": avg_processing_time,
        "processing_time_min_sec": float(
            redis_client.get("stats:processing_time_min") or 0
        ),
        "processing_time_max_sec": float(
            redis_client.get("stats:processing_time_max") or 0
        ),
        "content_size_total_bytes": content_size_total,
        "content_size_count": content_size_count,
        "content_size_avg_bytes": avg_content_size,
    }
    # Pretty-printed JSON for the "Raw JSON" section of the page.
    raw_json = _json.dumps(
        {
            **stats_data,
            "target_ips": target_ips,
            "url_requests": url_requests,
            "user_agents": user_agents,
            "client_ips": client_ips,
            "recent_converts": recent_converts,
            "detailed_stats": detailed_stats,
        },
        indent=2,
    )
    return render_template(
        "stats.html",
        stats=stats_data,
        target_ips=target_ips,
        url_requests=url_requests,
        user_agents=user_agents,
        client_ips=client_ips,
        recent=recent_converts,
        detailed=detailed_stats,
        raw_json=raw_json,
    )
@app.route("/stats.json")
@basic_auth_required(
    realm=config.STATS_BASIC_AUTH_REALM,
    user=config.STATS_BASIC_AUTH_USER,
    password=config.STATS_BASIC_AUTH_PASS,
)
def stats_json():
    """JSON twin of /stats.

    NOTE(review): the collection logic is duplicated from stats(); keep the
    two in sync when changing key parsing.
    """
    stats_data, target_ips, url_requests, user_agents, client_ips = {}, {}, {}, {}, {}
    # Partition all stats:* keys into per-category dicts.
    for key in redis_client.scan_iter("stats:*"):
        key_str = key.decode()
        value = (redis_client.get(key) or b"0").decode()
        if key_str.startswith("stats:target_ips:"):
            ip = key_str.split(":", 2)[2]
            target_ips[ip] = value
        elif key_str.startswith("stats:url_requests:"):
            # The URL component was stored percent-encoded.
            url = unquote(key_str.split(":", 2)[2])
            url_requests[url] = value
        elif key_str.startswith("stats:user_agents:"):
            ua = unquote(key_str.split(":", 2)[2])
            user_agents[ua] = value
        elif key_str.startswith("stats:client_ips:"):
            ip = key_str.split(":", 2)[2]
            client_ips[ip] = value
        else:
            stats_data[key_str] = value
    recent_converts = []
    for entry in redis_client.lrange("recent_converts", 0, 99):
        try:
            recent_converts.append(json.loads(entry.decode()))
        except Exception:
            # Skip corrupt entries rather than failing the response.
            pass
    # Detailed aggregates; averages guarded against division by zero.
    processing_time_total = float(redis_client.get("stats:processing_time_total") or 0)
    processing_time_count = int(redis_client.get("stats:processing_time_count") or 0)
    avg_processing_time = (
        processing_time_total / processing_time_count
        if processing_time_count > 0
        else 0
    )
    content_size_total = int(redis_client.get("stats:content_size_total") or 0)
    content_size_count = int(redis_client.get("stats:content_size_count") or 0)
    avg_content_size = (
        content_size_total / content_size_count if content_size_count > 0 else 0
    )
    detailed_stats = {
        "processing_time_total_sec": processing_time_total,
        "processing_time_count": processing_time_count,
        "processing_time_avg_sec": avg_processing_time,
        "processing_time_min_sec": float(
            redis_client.get("stats:processing_time_min") or 0
        ),
        "processing_time_max_sec": float(
            redis_client.get("stats:processing_time_max") or 0
        ),
        "content_size_total_bytes": content_size_total,
        "content_size_count": content_size_count,
        "content_size_avg_bytes": avg_content_size,
    }
    return jsonify(
        {
            **stats_data,
            "target_ips": target_ips,
            "url_requests": url_requests,
            "user_agents": user_agents,
            "client_ips": client_ips,
            "recent_converts": recent_converts,
            "detailed_stats": detailed_stats,
        }
    )
@app.errorhandler(400)
@app.errorhandler(403)
@app.errorhandler(404)
@app.errorhandler(413)
@app.errorhandler(415)
@app.errorhandler(500)
def handle_errors(e):
    """Render error.html for HTTP errors; fall back to JSON without templates."""
    code = getattr(e, "code", 500)
    try:
        now_iso = datetime.now().astimezone().isoformat()
        return render_template("error.html", error=e, code=code, now_iso=now_iso), code
    except Exception:
        payload = {
            "error": getattr(e, "description", str(e)),
            "code": code,
        }
        return jsonify(payload), code
if __name__ == "__main__":
    # Development entry point: Flask's built-in server.
    app.run(host=config.BIND_HOST, port=config.BIND_PORT)
else:
    # When imported by an ASGI server (e.g. uvicorn), expose an ASGI adapter
    # wrapping the WSGI app.
    from asgiref.wsgi import WsgiToAsgi
    asgi_app = WsgiToAsgi(app)