"""Flask service that converts remote blocklists (AdBlock, hosts, dnsmasq and
plain-domain formats) into hosts-file entries, with Redis-backed statistics,
rate limiting and HTTP caching."""

import hashlib
import hmac
import ipaddress
import json
import re
import socket
import time
from datetime import datetime
from functools import wraps
from typing import Optional
from urllib.parse import urlparse, quote, unquote, urljoin

import redis
import requests
from flask import (
    Flask,
    request,
    render_template,
    abort,
    jsonify,
    stream_with_context,
    g,
    Response,
)
from flask_compress import Compress
from flask_limiter import Limiter

import config

app = Flask(__name__)
app.config["MAX_CONTENT_LENGTH"] = config.MAX_CONTENT_LENGTH
app.config["SECRET_KEY"] = config.SECRET_KEY
app.debug = config.FLASK_DEBUG


def build_redis():
    if config.REDIS_URL:
        return redis.Redis.from_url(config.REDIS_URL)
    return redis.Redis(
        host=config.REDIS_HOST, port=config.REDIS_PORT, db=config.REDIS_DB
    )


redis_client = build_redis()


def get_client_ip():
    xff = request.headers.get("X-Forwarded-For", "").split(",")
    if xff and xff[0].strip():
        return xff[0].strip()
    return request.remote_addr


limiter = Limiter(
    key_func=get_client_ip,
    app=app,
    default_limits=[config.RATE_LIMIT_DEFAULT],
    # Fall back to the host/port settings when REDIS_URL is unset, so the
    # limiter never receives a None storage URI.
    storage_uri=config.REDIS_URL
    or f"redis://{config.REDIS_HOST}:{config.REDIS_PORT}/{config.REDIS_DB}",
)
Compress(app)


@app.before_request
def track_request_data():
    g.start_time = time.perf_counter()
    redis_client.incr(
        f"stats:user_agents:{quote(request.headers.get('User-Agent', 'Unknown'), safe='')}"
    )
    redis_client.incr(f"stats:client_ips:{get_client_ip()}")
    redis_client.incr(f"stats:methods:{request.method}")


@app.after_request
def finalize_response(response):
    # g.start_time may be missing if a before_request hook aborted early.
    elapsed = time.perf_counter() - getattr(g, "start_time", time.perf_counter())
    redis_client.incrbyfloat("stats:processing_time_total", elapsed)
    redis_client.incr("stats:processing_time_count")
    try:
        current_min = float(redis_client.get("stats:processing_time_min") or elapsed)
        if elapsed < current_min:
            redis_client.set("stats:processing_time_min", elapsed)
    except Exception:
        redis_client.set("stats:processing_time_min", elapsed)
    try:
        current_max = float(redis_client.get("stats:processing_time_max") or elapsed)
        if elapsed > current_max:
            redis_client.set("stats:processing_time_max", elapsed)
    except Exception:
        redis_client.set("stats:processing_time_max", elapsed)

    path = request.path or "/"
    if response.status_code >= 400:
        response.headers["Cache-Control"] = "no-store"
        response.headers["Pragma"] = "no-cache"
        response.headers["Expires"] = "0"
        return response
    if path.startswith("/static/"):
        response.headers.pop("Content-Disposition", None)
        if path.endswith((".css", ".js")):
            response.headers["Cache-Control"] = "public, max-age=31536000, immutable"
        else:
            response.headers["Cache-Control"] = "public, max-age=86400"
        return response
    if path == "/":
        response.headers["Cache-Control"] = "private, no-store"
        response.headers["Pragma"] = "no-cache"
        response.headers["Expires"] = "0"
        return response
    return response


@app.template_filter("datetimeformat")
def datetimeformat_filter(value, format="%Y-%m-%d %H:%M"):
    try:
        dt = datetime.fromisoformat(value)
        return dt.strftime(format)
    except (ValueError, AttributeError):
        return value


def basic_auth_required(realm: str, user: str, password: str):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if not config.STATS_BASIC_AUTH_ENABLED:
                return f(*args, **kwargs)
            auth = request.authorization
            if (
                auth
                and auth.type == "basic"
                and auth.username == user
                and auth.password == password
            ):
                return f(*args, **kwargs)
            resp = Response(status=401)
            resp.headers["WWW-Authenticate"] = f'Basic realm="{realm}"'
            return resp

        return wrapper

    return decorator


def cache_key(source_url, ip):
    return f"cache:{source_url}:{ip}"

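
# convert_host_line() below normalizes the common blocklist syntaxes into
# "<target_ip> <domain>" hosts entries; with target_ip="0.0.0.0", for example:
#   AdBlock:  ||ads.example.com^          ->  0.0.0.0 ads.example.com
#   hosts:    127.0.0.1 ads.example.com   ->  0.0.0.0 ads.example.com
#   dnsmasq:  address=/ads.example.com/   ->  0.0.0.0 ads.example.com
#   plain:    ads.example.com             ->  0.0.0.0 ads.example.com
# Comment lines (!, #, /, ;) and cosmetic/element-hiding rules are skipped.
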
def should_ignore_domain(domain):
    return domain.startswith(".") or any(
        ch in domain for ch in ["~", "=", "$", "'", "^", "_", ">", "<", ":"]
    )


def should_ignore_line(line):
    return any(sym in line for sym in ["<", ">", "##", "###", "div", "span"])


# Compile once at import time instead of on every call.
_DOMAIN_RE = re.compile(r"^(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$")


def is_valid_domain(domain):
    return bool(_DOMAIN_RE.match(domain))


def is_private_client_ip() -> bool:
    ip = get_client_ip()
    try:
        return ipaddress.ip_address(ip).is_private
    except Exception:
        return False


def convert_host_line(line: str, target_ip: str):
    # Fast rejections.
    if not line:
        return None
    line = line.strip()
    if not line or line.startswith(("!", "#", "/", ";")):
        return None
    # Strip trailing inline comments.
    for sep in (" #", " ;"):
        idx = line.find(sep)
        if idx != -1:
            line = line[:idx].rstrip()
    if not line:
        return None

    # AdBlock syntax: ||domain^
    m = re.match(r"^\|\|([a-z0-9.-]+)\^", line, re.IGNORECASE)
    if m:
        domain = m.group(1).strip(".")
        if not should_ignore_domain(domain) and is_valid_domain(domain):
            return f"{target_ip} {domain}"
        return None

    # hosts syntax: <IPv4 or IPv6> <domain>
    parts = line.split()
    if len(parts) >= 2 and (
        re.match(r"^\d{1,3}(?:\.\d{1,3}){3}$", parts[0]) or ":" in parts[0]
    ):
        domain = parts[1].strip().split("#", 1)[0].strip().strip(".")
        if not should_ignore_domain(domain) and is_valid_domain(domain):
            return f"{target_ip} {domain}"
        return None

    # dnsmasq syntax: address=/domain/ or server=/domain/
    m = re.match(r"^(?:address|server)=/([a-z0-9.-]+)/", line, re.IGNORECASE)
    if m:
        domain = m.group(1).strip(".")
        if not should_ignore_domain(domain) and is_valid_domain(domain):
            return f"{target_ip} {domain}"
        return None

    # Bare domain.
    token = parts[0].split("#", 1)[0].strip().strip(".")
    if token and not should_ignore_domain(token) and is_valid_domain(token):
        return f"{target_ip} {token}"
    return None


def build_etag(
    up_etag: Optional[str], up_lastmod: Optional[str], target_ip: str
) -> str:
    base = (up_etag or up_lastmod or "no-upstream") + f"::{target_ip}::v1"
    return 'W/"' + hashlib.sha1(base.encode("utf-8")).hexdigest() + '"'


def cache_headers(etag: str, up_lm: Optional[str]):
    headers = {
        "ETag": etag,
        "Vary": "Accept-Encoding",
        "Content-Type": "text/plain; charset=utf-8",
        "X-Content-Type-Options": "nosniff",
        # "Content-Disposition": "inline; filename=converted_hosts.txt",
    }
    if config.CACHE_ENABLED:
        headers["Cache-Control"] = (
            f"public, s-maxage={config.CACHE_S_MAXAGE}, max-age={config.CACHE_MAX_AGE}"
        )
    else:
        headers["Cache-Control"] = "no-store"
    if up_lm:
        headers["Last-Modified"] = up_lm
    return headers


def validate_and_normalize_url(url):
    url = (url or "").strip()
    # Simple sanity check: reject CR/LF (request-splitting characters).
    if any(c in url for c in ("\r", "\n")):
        raise ValueError("Invalid characters in URL")
    parsed = urlparse(url)
    if not parsed.scheme:
        url = f"https://{url}"
        parsed = urlparse(url)
    # Accept only http/https.
    if parsed.scheme not in {"http", "https"}:
        raise ValueError(f"Unsupported scheme: {parsed.scheme}")
    if not parsed.netloc:
        raise ValueError("Missing host in URL")
    return parsed.geturl()


def track_url_request(url):
    redis_client.incr(f"stats:url_requests:{quote(url, safe='')}")


def add_recent_link(url, target_ip):
    ts = datetime.now().isoformat()
    link_data = f"{ts}|{url}|{target_ip}"
    with redis_client.pipeline() as pipe:
        pipe.lpush("recent_links", link_data)
        pipe.ltrim("recent_links", 0, 9)
        pipe.execute()
    redis_client.incr("stats:recent_links_added")


def get_recent_links():
    links = redis_client.lrange("recent_links", 0, 9)
    out = []
    for link in links:
        parts = link.decode().split("|")
        if len(parts) >= 3:
            out.append((parts[0], parts[1], parts[2]))
        elif len(parts) == 2:
            # Older entries lack the target IP; default to localhost.
            out.append((parts[0], parts[1], "127.0.0.1"))
    return out

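
# Reverse-DNS results are cached in Redis for an hour so that /convert
# requests do not block on slow PTR lookups; on failure, the IP itself is
# used as the hostname.
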
f"reverse_dns:{ip}" cached = redis_client.get(key) if cached: return cached.decode() try: hostname = socket.gethostbyaddr(ip)[0] except Exception: hostname = ip redis_client.setex(key, 3600, hostname) return hostname def add_recent_convert(): ip = get_client_ip() hostname = get_hostname(ip) ua = request.headers.get("User-Agent", "Unknown") time_str = datetime.now().astimezone().isoformat() url = request.full_path data = { "url": url, "ip": ip, "hostname": hostname, "time": time_str, "user_agent": ua, } redis_client.lpush("recent_converts", json.dumps(data)) redis_client.ltrim("recent_converts", 0, 99) def validate_ip(value: str) -> str: v = (value or "").strip() try: return str(ipaddress.ip_address(v)) except Exception: raise ValueError("Invalid IP address") @app.route("/favicon.ico", methods=["GET"]) def favicon(): return Response(status=204) @app.route("/", methods=["GET"]) def index(): generated_link = None recent_links = get_recent_links() url_param = request.args.get("url", config.DEFAULT_SOURCE_URL) raw_ip = request.args.get("ip", "127.0.0.1") try: target_ip = validate_ip(raw_ip) except ValueError: target_ip = "127.0.0.1" if url_param: try: normalized = validate_and_normalize_url(unquote(url_param)) encoded = quote(normalized, safe="") generated_link = urljoin( request.host_url, f"convert?url={encoded}&ip={target_ip}" ) add_recent_link(normalized, target_ip) recent_links = get_recent_links() except Exception as e: app.logger.error(f"Error processing URL: {str(e)}") try: return render_template( "form.html", generated_link=generated_link, recent_links=recent_links, client_ip=get_client_ip(), user_agent=request.headers.get("User-Agent", "Unknown"), ) except Exception: return jsonify( { "generated_link": generated_link, "recent_links": recent_links, "client_ip": get_client_ip(), "user_agent": request.headers.get("User-Agent", "Unknown"), } ) @app.route("/convert") @limiter.limit(config.RATE_LIMIT_CONVERT) def convert(): import hmac, ipaddress def is_private_client_ip() -> bool: ip = get_client_ip() try: return ipaddress.ip_address(ip).is_private except Exception: return False requested_debug = request.args.get("debug", "").lower() in ( "1", "true", "t", "yes", "y", "on", ) debug_allowed = False if config.DEBUG_ENABLE: header_key = request.headers.get("X-Debug-Key", "") if ( config.DEBUG_KEY and header_key and hmac.compare_digest(header_key, config.DEBUG_KEY) ): debug_allowed = True elif is_private_client_ip(): debug_allowed = True if requested_debug and not debug_allowed: abort(403) debug_mode = requested_debug and debug_allowed debug_lines = [] def d(msg): ts = datetime.now().isoformat() line = f"# [DEBUG {ts}] {msg}" debug_lines.append(line) app.logger.debug(line) def debug_response(status=200): body = "\n".join(debug_lines) + ("\n" if debug_lines else "") resp = Response(body, mimetype="text/plain; charset=utf-8", status=status) resp.headers["X-Debug-Mode"] = "1" resp.headers["Cache-Control"] = "no-store" return resp try: redis_client.incr("stats:convert_requests") add_recent_convert() if debug_mode: d("Start /convert w trybie debug") encoded_url = request.args.get("url") if not encoded_url: if debug_mode: d("Brak parametru ?url") return debug_response(status=400) redis_client.incr("stats:errors_400") abort(400, description="Missing URL parameter") decoded_url = unquote(encoded_url) try: normalized_url = validate_and_normalize_url(decoded_url) except ValueError as e: if debug_mode: d(f"Błąd walidacji URL: {e}") return debug_response(status=400) redis_client.incr("stats:errors_400") 
            abort(400)
        try:
            target_ip = validate_ip(request.args.get("ip", "127.0.0.1"))
        except ValueError:
            if debug_mode:
                d("Invalid ?ip parameter")
                return debug_response(status=400)
            redis_client.incr("stats:errors_400")
            abort(400, description="Invalid IP")

        if debug_mode:
            d(f"URL (encoded): {encoded_url}")
            d(f"URL (decoded): {decoded_url}")
            d(f"URL (norm): {normalized_url}")
            d(f"target_ip: {target_ip}")

        track_url_request(normalized_url)
        redis_client.incr(f"stats:target_ips:{target_ip}")

        # Forward conditional If-* headers to the upstream.
        req_headers = {}
        inm = request.headers.get("If-None-Match")
        ims = request.headers.get("If-Modified-Since")
        if inm:
            req_headers["If-None-Match"] = inm
        if ims:
            req_headers["If-Modified-Since"] = ims
        if debug_mode:
            d("Sending GET to upstream")
            d(f"Headers: {req_headers or '{}'}")

        # timeout=(connect, read) in seconds.
        r = requests.get(
            normalized_url, headers=req_headers, stream=True, timeout=(10, 60)
        )
        ct = r.headers.get("Content-Type", "")
        if debug_mode:
            d(f"Upstream status: {r.status_code}")
            d(f"Content-Type: {ct or '(none)'}")
            d(f"ETag: {r.headers.get('ETag')}")
            d(f"Last-Modified: {r.headers.get('Last-Modified')}")

        if "text" not in ct and "octet-stream" not in ct and ct != "":
            if debug_mode:
                d("Unsupported Media Type -> 415")
                r.close()
                return debug_response(status=415)
            r.close()
            abort(415, description="Unsupported Media Type")

        if r.status_code == 304:
            etag = build_etag(
                r.headers.get("ETag"), r.headers.get("Last-Modified"), target_ip
            )
            if debug_mode:
                d("Upstream returned 304 - replying with 304")
                r.close()
                return debug_response(status=304)
            resp = Response(status=304)
            resp.headers.update(cache_headers(etag, r.headers.get("Last-Modified")))
            resp.direct_passthrough = True
            r.close()
            return resp

        up_etag = r.headers.get("ETag")
        up_lm = r.headers.get("Last-Modified")
        etag = build_etag(up_etag, up_lm, target_ip)
        if debug_mode:
            d(f"ETag for the client: {etag}")

        @stream_with_context
        def body_gen():
            lines_read = 0
            lines_emitted = 0
            bytes_emitted = 0
            try:
                if debug_mode:
                    yield "\n".join(debug_lines) + "\n"
                    debug_lines.clear()
                for line in r.iter_lines(
                    decode_unicode=True, chunk_size=config.READ_CHUNK
                ):
                    if line is None:
                        continue
                    lines_read += 1
                    if len(line) > config.STREAM_LINE_LIMIT:
                        if debug_mode and lines_read <= 5:
                            yield f"# [DEBUG] skipped an overlong line ({len(line)} chars)\n"
                        continue
                    out = convert_host_line(line, target_ip)
                    if out:
                        lines_emitted += 1
                        bytes_emitted += len(out) + 1
                        yield out + "\n"
                    if debug_mode and lines_read <= 5:
                        preview = line[:200].replace("\r", "\\r").replace("\n", "\\n")
                        yield f"# [DEBUG] line {lines_read} preview: {preview}\n"
                if debug_mode:
                    yield f"# [DEBUG] summary: read={lines_read}, emitted={lines_emitted}\n"
                    if lines_emitted == 0:
                        yield "# [DEBUG] Warning: 0 output lines - does the list format match?\n"
                # Track the emitted payload size (previously incremented by 0,
                # so content_size_total never moved).
                redis_client.incrby("stats:content_size_total", bytes_emitted)
                redis_client.incr("stats:content_size_count")
            finally:
                r.close()

        resp = Response(body_gen(), content_type="text/plain; charset=utf-8")
        resp.headers.update(cache_headers(etag, up_lm))
        resp.direct_passthrough = True
        redis_client.incr("stats:conversions_success")
        return resp

    except requests.exceptions.RequestException as e:
        app.logger.error(f"Request error: {str(e)}")
        redis_client.incr("stats:errors_500")
        if debug_mode:
            d(f"requests exception: {e}")
            return debug_response(status=502)
        abort(500)
    except ValueError as e:
        app.logger.error(f"URL validation error: {str(e)}")
        redis_client.incr("stats:errors_400")
        if debug_mode:
            d(f"ValueError: {e}")
            return debug_response(status=400)
        abort(400)

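
# Example requests (host, port and list URL are illustrative):
#   curl "http://127.0.0.1:8000/convert?url=https%3A%2F%2Fexample.com%2Flist.txt&ip=0.0.0.0"
#   curl -H "X-Debug-Key: $DEBUG_KEY" "http://127.0.0.1:8000/convert?url=...&debug=1"
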
@app.route("/convert", methods=["HEAD"])
def convert_head():
    encoded_url = request.args.get("url", config.DEFAULT_SOURCE_URL)
    if not encoded_url:
        abort(400)
    decoded_url = unquote(encoded_url)
    validate_and_normalize_url(decoded_url)
    target_ip = validate_ip(request.args.get("ip", "127.0.0.1"))
    etag = build_etag(None, None, target_ip)
    resp = Response(status=200)
    resp.headers.update(cache_headers(etag, None))
    resp.direct_passthrough = True
    return resp


def collect_stats():
    """Aggregate stats:* counters, recent converts and derived averages from
    Redis; shared by /stats and /stats.json."""
    stats_data, target_ips, url_requests, user_agents, client_ips = {}, {}, {}, {}, {}
    # Collect the stats:* keys.
    for key in redis_client.scan_iter("stats:*"):
        key_str = key.decode()
        value = (redis_client.get(key) or b"0").decode()
        if key_str.startswith("stats:target_ips:"):
            target_ips[key_str.split(":", 2)[2]] = value
        elif key_str.startswith("stats:url_requests:"):
            url_requests[unquote(key_str.split(":", 2)[2])] = value
        elif key_str.startswith("stats:user_agents:"):
            user_agents[unquote(key_str.split(":", 2)[2])] = value
        elif key_str.startswith("stats:client_ips:"):
            client_ips[key_str.split(":", 2)[2]] = value
        else:
            stats_data[key_str] = value

    recent_converts = []
    for entry in redis_client.lrange("recent_converts", 0, 99):
        try:
            recent_converts.append(json.loads(entry.decode()))
        except Exception:
            pass

    # Detailed aggregates.
    processing_time_total = float(redis_client.get("stats:processing_time_total") or 0)
    processing_time_count = int(redis_client.get("stats:processing_time_count") or 0)
    avg_processing_time = (
        processing_time_total / processing_time_count
        if processing_time_count > 0
        else 0
    )
    content_size_total = int(redis_client.get("stats:content_size_total") or 0)
    content_size_count = int(redis_client.get("stats:content_size_count") or 0)
    avg_content_size = (
        content_size_total / content_size_count if content_size_count > 0 else 0
    )
    detailed_stats = {
        "processing_time_total_sec": processing_time_total,
        "processing_time_count": processing_time_count,
        "processing_time_avg_sec": avg_processing_time,
        "processing_time_min_sec": float(
            redis_client.get("stats:processing_time_min") or 0
        ),
        "processing_time_max_sec": float(
            redis_client.get("stats:processing_time_max") or 0
        ),
        "content_size_total_bytes": content_size_total,
        "content_size_count": content_size_count,
        "content_size_avg_bytes": avg_content_size,
    }
    return {
        "stats": stats_data,
        "target_ips": target_ips,
        "url_requests": url_requests,
        "user_agents": user_agents,
        "client_ips": client_ips,
        "recent_converts": recent_converts,
        "detailed_stats": detailed_stats,
    }


def flat_stats_payload(s):
    """Flatten collect_stats() output: top-level counters plus the grouped
    sections, as served by /stats.json and the raw JSON section of /stats."""
    return {
        **s["stats"],
        "target_ips": s["target_ips"],
        "url_requests": s["url_requests"],
        "user_agents": s["user_agents"],
        "client_ips": s["client_ips"],
        "recent_converts": s["recent_converts"],
        "detailed_stats": s["detailed_stats"],
    }


@app.route("/stats")
@basic_auth_required(
    realm=config.STATS_BASIC_AUTH_REALM,
    user=config.STATS_BASIC_AUTH_USER,
    password=config.STATS_BASIC_AUTH_PASS,
)
def stats():
    s = collect_stats()
    # Raw JSON for the "Raw JSON" section on the page.
    raw_json = json.dumps(flat_stats_payload(s), indent=2)
    return render_template(
        "stats.html",
        stats=s["stats"],
        target_ips=s["target_ips"],
        url_requests=s["url_requests"],
        user_agents=s["user_agents"],
        client_ips=s["client_ips"],
        recent=s["recent_converts"],
        detailed=s["detailed_stats"],
        raw_json=raw_json,
    )

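
# /stats.json serves the same payload as the raw JSON section of /stats,
# e.g. (credentials illustrative):
#   curl -u admin:secret http://127.0.0.1:8000/stats.json
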
@app.route("/stats.json")
@basic_auth_required(
    realm=config.STATS_BASIC_AUTH_REALM,
    user=config.STATS_BASIC_AUTH_USER,
    password=config.STATS_BASIC_AUTH_PASS,
)
def stats_json():
    return jsonify(flat_stats_payload(collect_stats()))


@app.errorhandler(400)
@app.errorhandler(403)
@app.errorhandler(404)
@app.errorhandler(413)
@app.errorhandler(415)
@app.errorhandler(500)
def handle_errors(e):
    code = getattr(e, "code", 500)
    try:
        now_iso = datetime.now().astimezone().isoformat()
        return render_template(
            "error.html", error=e, code=code, now_iso=now_iso
        ), code
    except Exception:
        # Fall back to JSON when the error template is unavailable.
        return jsonify(
            {
                "error": getattr(e, "description", str(e)),
                "code": code,
            }
        ), code


if __name__ == "__main__":
    app.run(host=config.BIND_HOST, port=config.BIND_PORT)
else:
    # When imported by an ASGI server, expose an ASGI-compatible wrapper.
    from asgiref.wsgi import WsgiToAsgi

    asgi_app = WsgiToAsgi(app)
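    # Example (the module name "app" is an assumption):
    #   uvicorn app:asgi_app --host 0.0.0.0 --port 8000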