Files
adlist_mikrotik/app.py
Mateusz Gruszczyński 01b8ff656e refactor
2025-08-28 22:54:52 +02:00

457 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import hashlib
import hmac
import json
import re
import socket
import time
from datetime import datetime
from functools import wraps
from typing import Optional
from urllib.parse import urlparse, quote, unquote, urljoin

import redis
import requests
from flask import Flask, request, render_template, abort, jsonify, stream_with_context, g, Response
from flask_compress import Compress
from flask_limiter import Limiter

import config
# Flask application instance; request-body size limit, secret key and debug
# mode are all driven by the project-local config module.
app = Flask(__name__)
app.config["MAX_CONTENT_LENGTH"] = config.MAX_CONTENT_LENGTH  # reject oversized request bodies
app.config["SECRET_KEY"] = config.SECRET_KEY
app.debug = config.FLASK_DEBUG
def build_redis():
    """Create the shared Redis client.

    A configured ``REDIS_URL`` takes precedence; otherwise fall back to the
    discrete host/port/db settings.
    """
    url = config.REDIS_URL
    if url:
        return redis.Redis.from_url(url)
    return redis.Redis(host=config.REDIS_HOST, port=config.REDIS_PORT, db=config.REDIS_DB)


# Single client reused by every handler in this module.
redis_client = build_redis()
def get_client_ip():
    """Return the originating client IP, honouring X-Forwarded-For when present.

    Only the first (left-most) hop of the header is used; when the header is
    absent or empty, fall back to the socket peer address.
    """
    forwarded = request.headers.get("X-Forwarded-For", "")
    first_hop = forwarded.split(",")[0].strip()
    return first_hop if first_hop else request.remote_addr
# Rate limiting keyed by the (possibly forwarded) client IP, with counters in Redis.
# NOTE(review): storage_uri uses config.REDIS_URL directly, while build_redis()
# tolerates an unset REDIS_URL by falling back to host/port — confirm config
# always provides REDIS_URL, otherwise the limiter has no storage backend.
limiter = Limiter(
    key_func=get_client_ip,
    app=app,
    default_limits=[config.RATE_LIMIT_DEFAULT],
    storage_uri=config.REDIS_URL
)
# Enable gzip/deflate compression of responses.
Compress(app)
@app.before_request
def track_request_data():
    """Start the per-request timer and bump per-UA / per-IP / per-method counters."""
    g.start_time = time.perf_counter()
    user_agent = request.headers.get("User-Agent", "Unknown")
    # NOTE: counter keys are unbounded (one per distinct UA / IP), as designed.
    redis_client.incr(f"stats:user_agents:{quote(user_agent, safe='')}")
    redis_client.incr(f"stats:client_ips:{get_client_ip()}")
    redis_client.incr(f"stats:methods:{request.method}")
@app.after_request
def after_request(response):
    """Accumulate request-duration statistics in Redis, then pass the response through."""

    def update_bound(key, value, is_better):
        # Overwrite the stored bound when the new sample beats it, or when the
        # stored value is missing/unparsable.
        try:
            stored = float(redis_client.get(key) or value)
            if is_better(value, stored):
                redis_client.set(key, value)
        except Exception:
            redis_client.set(key, value)

    elapsed = time.perf_counter() - g.start_time
    redis_client.incrbyfloat("stats:processing_time_total", elapsed)
    redis_client.incr("stats:processing_time_count")
    update_bound("stats:processing_time_min", elapsed, lambda a, b: a < b)
    update_bound("stats:processing_time_max", elapsed, lambda a, b: a > b)
    return response
@app.template_filter("datetimeformat")
def datetimeformat_filter(value, format="%Y-%m-%d %H:%M"):
    """Jinja filter: render an ISO-8601 timestamp string using *format*.

    Returns *value* unchanged when it cannot be parsed.  Fix: also catch
    ``TypeError``, which ``datetime.fromisoformat`` raises for ``None`` and
    other non-string inputs — previously those crashed template rendering
    instead of falling back.
    """
    try:
        return datetime.fromisoformat(value).strftime(format)
    except (TypeError, ValueError, AttributeError):
        return value
def basic_auth_required(realm: str, user: str, password: str):
    """Decorator enforcing HTTP Basic auth when ``STATS_BASIC_AUTH_ENABLED`` is set.

    Missing, malformed or wrong credentials yield a 401 response carrying a
    ``WWW-Authenticate`` challenge for *realm*.

    Fix: compare credentials with ``hmac.compare_digest`` instead of ``==``
    so the check runs in constant time and does not leak credential
    length/content through timing differences.
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if not config.STATS_BASIC_AUTH_ENABLED:
                return f(*args, **kwargs)
            auth = request.headers.get("Authorization", "")
            if auth.startswith("Basic "):
                try:
                    decoded = base64.b64decode(auth[6:]).decode("utf-8", errors="ignore")
                    u, p = decoded.split(":", 1)
                    # Constant-time comparison of both fields.
                    if hmac.compare_digest(u, user) and hmac.compare_digest(p, password):
                        return f(*args, **kwargs)
                except Exception:
                    # Any decode/split failure is treated as bad credentials.
                    pass
            resp = Response(status=401)
            resp.headers["WWW-Authenticate"] = f'Basic realm="{realm}"'
            return resp
        return wrapper
    return decorator
def cache_key(source_url, ip):
    """Build the Redis key under which a converted list for (URL, client IP) is cached."""
    return "cache:{}:{}".format(source_url, ip)
def should_ignore_domain(domain):
    """True for domains to skip: leading dot, or any adblock-syntax character."""
    if domain.startswith("."):
        return True
    forbidden = "~=$'^_><:"
    return any(ch in domain for ch in forbidden)
def should_ignore_line(line):
    """True when the line looks like stray HTML/markup rather than a host entry."""
    markers = ("<", ">", "##", "###", "div", "span")
    return any(marker in line for marker in markers)
def is_valid_domain(domain):
    """True if *domain* matches ``label(.label)*.tld`` with an alphabetic TLD of 2+ chars."""
    # re.match consults the module's pattern cache, so the pattern compiles once.
    return re.match(r"^(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$", domain) is not None
def convert_host_line(line: str, target_ip: str):
    """Convert one line of an upstream blocklist to ``"<target_ip> <domain>"``.

    Supports classic hosts files, AdGuard/uBlock ``||domain^`` rules, dnsmasq
    ``address=/.../`` / ``server=/.../`` entries and bare domain lines.
    Returns None for comments, blank lines and anything that does not yield a
    valid domain.
    """
    # quick rejections
    if not line:
        return None
    line = line.strip()
    # comments / empty lines
    if not line or line.startswith(("!", "#", "/", ";")):
        return None
    # strip a trailing comment (# or ;) carefully, so 'http://' survives:
    # only cut from ' #' or ' ;' (a space before the comment marker)
    for sep in (" #", " ;"):
        idx = line.find(sep)
        if idx != -1:
            line = line[:idx].rstrip()
    if not line:
        return None
    # 1) AdGuard / uBlock DNS: ||domain^ (optionally with extras after '^')
    m = re.match(r"^\|\|([a-z0-9.-]+)\^", line, re.IGNORECASE)
    if m:
        domain = m.group(1).strip(".")
        if not should_ignore_domain(domain) and is_valid_domain(domain):
            return f"{target_ip} {domain}"
        return None
    parts = line.split()
    # 2) Classic hosts format: "IP domain [...]" (IPv4 or IPv6)
    if len(parts) >= 2 and (
        re.match(r"^\d{1,3}(?:\.\d{1,3}){3}$", parts[0]) or ":" in parts[0]
    ):
        domain = parts[1].strip().split("#", 1)[0].strip().strip(".")
        if not should_ignore_domain(domain) and is_valid_domain(domain):
            return f"{target_ip} {domain}"
        return None
    # 3) dnsmasq: address=/domain/0.0.0.0 or server=/domain/...
    m = re.match(r"^(?:address|server)=/([a-z0-9.-]+)/", line, re.IGNORECASE)
    if m:
        domain = m.group(1).strip(".")
        if not should_ignore_domain(domain) and is_valid_domain(domain):
            return f"{target_ip} {domain}"
        return None
    # 4) Domain-only: "example.com" or "example.com # comment"
    token = parts[0].split("#", 1)[0].strip().strip(".")
    if token and not should_ignore_domain(token) and is_valid_domain(token):
        return f"{target_ip} {token}"
    return None
def build_etag(up_etag: Optional[str], up_lastmod: Optional[str], target_ip: str) -> str:
    """Derive a weak ETag from the upstream validators plus the target IP.

    Preference order for the seed: upstream ETag, then Last-Modified, then a
    fixed "no-upstream" marker.
    """
    seed = up_etag or up_lastmod or "no-upstream"
    digest = hashlib.sha1(f"{seed}::{target_ip}::v1".encode("utf-8")).hexdigest()
    return f'W/"{digest}"'
def cache_headers(etag: str, up_lm: Optional[str]):
    """Assemble response headers for a converted list (caching + content type)."""
    headers = {
        "ETag": etag,
        "Vary": "Accept-Encoding",
        "Content-Type": "text/plain; charset=utf-8",
        "X-Content-Type-Options": "nosniff",
        "Content-Disposition": "inline; filename=converted_hosts.txt",
    }
    if config.CACHE_ENABLED:
        cache_control = f"public, s-maxage={config.CACHE_S_MAXAGE}, max-age={config.CACHE_MAX_AGE}"
    else:
        cache_control = "no-store"
    headers["Cache-Control"] = cache_control
    # Propagate the upstream Last-Modified only when one was supplied.
    if up_lm:
        headers["Last-Modified"] = up_lm
    return headers
def validate_and_normalize_url(url):
    """Normalize *url*, defaulting to ``https://`` when no scheme is given.

    Raises:
        ValueError: when no host can be extracted from the URL.
    """
    parsed = urlparse(url)
    if not parsed.scheme:
        parsed = urlparse(f"https://{url}")
    if not parsed.netloc:
        raise ValueError("Missing host in URL")
    return parsed.geturl()
def track_url_request(url):
    """Bump the per-URL request counter (URL percent-encoded into the key)."""
    encoded = quote(url, safe="")
    redis_client.incr(f"stats:url_requests:{encoded}")
def add_recent_link(url, target_ip):
    """Prepend a "timestamp|url|ip" record to recent_links, keeping the last 10."""
    record = "|".join([datetime.now().isoformat(), url, target_ip])
    # Push and trim atomically in one round trip.
    with redis_client.pipeline() as pipe:
        pipe.lpush("recent_links", record)
        pipe.ltrim("recent_links", 0, 9)
        pipe.execute()
    redis_client.incr("stats:recent_links_added")
def get_recent_links():
    """Return up to 10 recent ``(timestamp, url, target_ip)`` tuples from Redis.

    Records are stored as ``"timestamp|url|ip"``; legacy records without an
    ip field get ``127.0.0.1`` substituted.

    Fix: a URL that itself contains ``|`` used to be truncated (and the ip
    field misread); since the timestamp and ip never contain ``|``, the URL
    is now reassembled from all the middle fields.
    """
    out = []
    for raw in redis_client.lrange("recent_links", 0, 9):
        parts = raw.decode().split("|")
        if len(parts) >= 3:
            # first field = timestamp, last = ip, everything between = URL
            out.append((parts[0], "|".join(parts[1:-1]), parts[-1]))
        elif len(parts) == 2:
            # legacy record without a target ip
            out.append((parts[0], parts[1], "127.0.0.1"))
    return out
def get_hostname(ip):
    """Reverse-resolve *ip* via DNS, caching the answer in Redis for one hour.

    Falls back to returning the IP itself when resolution fails.
    """
    redis_key = f"reverse_dns:{ip}"
    cached = redis_client.get(redis_key)
    if cached:
        return cached.decode()
    try:
        resolved = socket.gethostbyaddr(ip)[0]
    except Exception:
        resolved = ip
    redis_client.setex(redis_key, 3600, resolved)
    return resolved
def add_recent_convert():
    """Record the current /convert request (caller, UA, time) in recent_converts (max 50)."""
    caller_ip = get_client_ip()
    entry = {
        "url": request.full_path,
        "ip": caller_ip,
        "hostname": get_hostname(caller_ip),
        "time": datetime.now().astimezone().isoformat(),
        "user_agent": request.headers.get("User-Agent", "Unknown"),
    }
    redis_client.lpush("recent_converts", json.dumps(entry))
    redis_client.ltrim("recent_converts", 0, 49)
@app.route("/favicon.ico", methods=["GET"])
def favicon():
    """Serve no favicon; an empty 204 keeps browser requests out of the error stats."""
    resp = Response(status=204)
    return resp
@app.route("/", methods=["GET"])
def index():
    """Landing page: build a shareable /convert link for the given url/ip params.

    Renders form.html; if template rendering fails, degrades to a JSON
    payload carrying the same data.
    """
    generated_link = None
    recent_links = get_recent_links()
    url_param = request.args.get("url", config.DEFAULT_SOURCE_URL)
    target_ip = request.args.get("ip", "127.0.0.1")
    if url_param:
        try:
            normalized = validate_and_normalize_url(unquote(url_param))
            encoded = quote(normalized, safe="")
            generated_link = urljoin(request.host_url, f"convert?url={encoded}&ip={target_ip}")
            add_recent_link(normalized, target_ip)
            # re-read so the just-added link appears in this response
            recent_links = get_recent_links()
        except Exception as e:
            # bad URL: log and render the page without a generated link
            app.logger.error(f"Error processing URL: {str(e)}")
    try:
        return render_template(
            "form.html",
            generated_link=generated_link,
            recent_links=recent_links,
            client_ip=get_client_ip(),
            user_agent=request.headers.get("User-Agent", "Unknown"),
        )
    except Exception:
        # template missing/broken -> JSON fallback keeps the endpoint usable
        return jsonify(
            {
                "generated_link": generated_link,
                "recent_links": recent_links,
                "client_ip": get_client_ip(),
                "user_agent": request.headers.get("User-Agent", "Unknown"),
            }
        )
@app.route("/convert")
@limiter.limit(config.RATE_LIMIT_CONVERT)
def convert():
    """Fetch an upstream blocklist and stream it converted to hosts format.

    Query params: ``url`` (percent-encoded source URL, required) and ``ip``
    (target IP written before each domain, default 127.0.0.1).  Conditional
    request headers are forwarded upstream and its 304 is honoured.
    """
    try:
        redis_client.incr("stats:convert_requests")
        add_recent_convert()
        encoded_url = request.args.get("url")
        if not encoded_url:
            redis_client.incr("stats:errors_400")
            abort(400, description="Missing URL parameter")
        decoded_url = unquote(encoded_url)
        normalized_url = validate_and_normalize_url(decoded_url)
        target_ip = request.args.get("ip", "127.0.0.1")
        track_url_request(normalized_url)
        redis_client.incr(f"stats:target_ips:{target_ip}")
        # forward the client's conditional headers so upstream can answer 304
        req_headers = {}
        inm = request.headers.get("If-None-Match")
        ims = request.headers.get("If-Modified-Since")
        if inm:
            req_headers["If-None-Match"] = inm
        if ims:
            req_headers["If-Modified-Since"] = ims
        with requests.get(normalized_url, headers=req_headers, stream=True, timeout=(10, 60)) as r:
            ct = r.headers.get("Content-Type", "")
            # allow text/* and octet-stream (commonly used by lists);
            # a missing Content-Type is tolerated as well
            if "text" not in ct and "octet-stream" not in ct and ct != "":
                abort(415, description="Unsupported Media Type")
            if r.status_code == 304:
                # upstream unchanged: answer 304 with our derived ETag
                etag = build_etag(r.headers.get("ETag"), r.headers.get("Last-Modified"), target_ip)
                resp = Response(status=304)
                resp.headers.update(cache_headers(etag, r.headers.get("Last-Modified")))
                resp.direct_passthrough = True
                return resp
            up_etag = r.headers.get("ETag")
            up_lm = r.headers.get("Last-Modified")
            etag = build_etag(up_etag, up_lm, target_ip)
            @stream_with_context
            def body_gen():
                # NOTE(review): this generator is consumed after the view
                # returns, i.e. after the enclosing `with` has closed `r` —
                # confirm the upstream body is still readable at that point.
                total = 0
                # iter_lines splits reliably on \n/\r\n and decodes to str
                for line in r.iter_lines(decode_unicode=True, chunk_size=config.READ_CHUNK):
                    if line is None:
                        continue
                    # guard against extremely long lines
                    if len(line) > config.STREAM_LINE_LIMIT:
                        continue
                    out = convert_host_line(line, target_ip)
                    if out:
                        s = out + "\n"
                        total += len(s)
                        yield s
                # record size stats once the stream finishes
                redis_client.incrby("stats:content_size_total", total)
                redis_client.incr("stats:content_size_count")
            resp = Response(body_gen(), mimetype="text/plain; charset=utf-8")
            resp.headers.update(cache_headers(etag, up_lm))
            # disable compression/buffering for the stream
            resp.direct_passthrough = True
            redis_client.incr("stats:conversions_success")
            return resp
    except requests.exceptions.RequestException as e:
        # upstream fetch failed (DNS, timeout, connection, ...)
        app.logger.error(f"Request error: {str(e)}")
        redis_client.incr("stats:errors_500")
        abort(500)
    except ValueError as e:
        # raised by validate_and_normalize_url on a host-less URL
        app.logger.error(f"URL validation error: {str(e)}")
        redis_client.incr("stats:errors_400")
        abort(400)
@app.route("/convert", methods=["HEAD"])
def convert_head():
    """HEAD handler for /convert: validate parameters and return headers only.

    No upstream request is made here, so the ETag is derived solely from the
    target IP (upstream validators are unknown at this point).
    """
    encoded_url = request.args.get("url", config.DEFAULT_SOURCE_URL)
    if not encoded_url:
        abort(400)
    validate_and_normalize_url(unquote(encoded_url))
    target_ip = request.args.get("ip", "127.0.0.1")
    resp = Response(status=200)
    resp.headers.update(cache_headers(build_etag(None, None, target_ip), None))
    resp.direct_passthrough = True
    return resp
@app.route("/stats")
@basic_auth_required(
    realm=config.STATS_BASIC_AUTH_REALM,
    user=config.STATS_BASIC_AUTH_USER,
    password=config.STATS_BASIC_AUTH_PASS,
)
def stats():
    """Aggregate all ``stats:*`` Redis counters into one JSON report.

    Buckets keys into target IPs, requested URLs, user agents and client
    IPs; all other counters land at the top level.  Also includes the
    recent-convert log and derived timing/size aggregates.
    """
    stats_data, target_ips, url_requests, user_agents, client_ips = {}, {}, {}, {}, {}
    for key in redis_client.scan_iter("stats:*"):
        key_str = key.decode()
        value = redis_client.get(key).decode()
        # keys look like "stats:<bucket>:<id>"; split twice to keep ':' in the id
        if key_str.startswith("stats:target_ips:"):
            ip = key_str.split(":", 2)[2]
            target_ips[ip] = value
        elif key_str.startswith("stats:url_requests:"):
            url = unquote(key_str.split(":", 2)[2])
            url_requests[url] = value
        elif key_str.startswith("stats:user_agents:"):
            ua = unquote(key_str.split(":", 2)[2])
            user_agents[ua] = value
        elif key_str.startswith("stats:client_ips:"):
            ip = key_str.split(":", 2)[2]
            client_ips[ip] = value
        else:
            stats_data[key_str] = value
    recent_converts = []
    for entry in redis_client.lrange("recent_converts", 0, 49):
        try:
            recent_converts.append(json.loads(entry.decode()))
        except Exception:
            # skip malformed history entries rather than failing the report
            pass
    processing_time_total = float(redis_client.get("stats:processing_time_total") or 0)
    processing_time_count = int(redis_client.get("stats:processing_time_count") or 0)
    avg_processing_time = processing_time_total / processing_time_count if processing_time_count > 0 else 0
    content_size_total = int(redis_client.get("stats:content_size_total") or 0)
    content_size_count = int(redis_client.get("stats:content_size_count") or 0)
    avg_content_size = content_size_total / content_size_count if content_size_count > 0 else 0
    detailed_stats = {
        "processing_time_total_sec": processing_time_total,
        "processing_time_count": processing_time_count,
        "processing_time_avg_sec": avg_processing_time,
        "processing_time_min_sec": float(redis_client.get("stats:processing_time_min") or 0),
        "processing_time_max_sec": float(redis_client.get("stats:processing_time_max") or 0),
        "content_size_total_bytes": content_size_total,
        "content_size_count": content_size_count,
        "content_size_avg_bytes": avg_content_size,
    }
    return jsonify(
        {
            **stats_data,
            "target_ips": target_ips,
            "url_requests": url_requests,
            "user_agents": user_agents,
            "client_ips": client_ips,
            "recent_converts": recent_converts,
            "detailed_stats": detailed_stats,
        }
    )
@app.errorhandler(400)
@app.errorhandler(403)
@app.errorhandler(404)
@app.errorhandler(413)
@app.errorhandler(415)
@app.errorhandler(500)
def handle_errors(e):
    """Render error.html for HTTP errors, falling back to JSON when the template fails."""
    try:
        return render_template("error.html", error=e), e.code
    except Exception:
        payload = {"error": getattr(e, "description", str(e)), "code": e.code}
        return jsonify(payload), e.code
if __name__ == "__main__":
    # Direct execution: run Flask's built-in development server.
    app.run(host=config.BIND_HOST, port=config.BIND_PORT)
else:
    # Imported by an ASGI server (e.g. uvicorn): expose a WSGI-to-ASGI adapter.
    from asgiref.wsgi import WsgiToAsgi
    asgi_app = WsgiToAsgi(app)