Files
adlist_mikrotik/app_timeout.py
2025-08-28 21:45:02 +02:00

384 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import redis
import requests
import aiohttp
import asyncio
import socket
import time
import json
from datetime import datetime
from flask import Flask, request, render_template, abort, jsonify, g
from urllib.parse import urlparse, quote, unquote, urljoin
from functools import wraps
from flask_compress import Compress
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 2 * 1024 * 1024 * 1024 # limit
redis_client = redis.Redis(host='localhost', port=6379, db=7)
# Ustawienia do rate limiting 100 żądań na minutę
def get_client_ip():
"""Pobranie prawdziwego adresu IP klienta (uwzględniając proxy)"""
x_forwarded_for = request.headers.get('X-Forwarded-For', '').split(',')
if x_forwarded_for and x_forwarded_for[0].strip():
return x_forwarded_for[0].strip()
return request.remote_addr
limiter = Limiter(key_func=get_client_ip, default_limits=["100 per minute"], app=app)
Compress(app)
ALLOWED_IPS = {'127.0.0.1', '109.173.163.86'}
ALLOWED_DOMAIN = ''
@app.before_request
def track_request_data():
"""Rejestracja IP klienta, User-Agent, metody HTTP oraz rozpoczęcie pomiaru czasu requestu"""
g.start_time = time.perf_counter() # rozpoczęcie pomiaru czasu
client_ip = get_client_ip()
user_agent = request.headers.get('User-Agent', 'Unknown')
method = request.method
# Rejestracja User-Agent
redis_client.incr(f'stats:user_agents:{quote(user_agent, safe="")}')
# Rejestracja adresu IP klienta
redis_client.incr(f'stats:client_ips:{client_ip}')
# Rejestracja metody HTTP
redis_client.incr(f'stats:methods:{method}')
@app.after_request
def after_request(response):
"""Pomiar i rejestracja czasu przetwarzania żądania"""
elapsed = time.perf_counter() - g.start_time
# Aktualizacja statystyk czasu przetwarzania (w sekundach)
redis_client.incrbyfloat('stats:processing_time_total', elapsed)
redis_client.incr('stats:processing_time_count')
# Aktualizacja minimalnego czasu przetwarzania
try:
current_min = float(redis_client.get('stats:processing_time_min') or elapsed)
if elapsed < current_min:
redis_client.set('stats:processing_time_min', elapsed)
except Exception:
redis_client.set('stats:processing_time_min', elapsed)
# Aktualizacja maksymalnego czasu przetwarzania
try:
current_max = float(redis_client.get('stats:processing_time_max') or elapsed)
if elapsed > current_max:
redis_client.set('stats:processing_time_max', elapsed)
except Exception:
redis_client.set('stats:processing_time_max', elapsed)
return response
@app.template_filter('datetimeformat')
def datetimeformat_filter(value, format='%Y-%m-%d %H:%M'):
try:
dt = datetime.fromisoformat(value)
return dt.strftime(format)
except (ValueError, AttributeError):
return value
def ip_restriction(f):
@wraps(f)
def decorated(*args, **kwargs):
client_ip = get_client_ip()
host = request.host.split(':')[0]
allowed_conditions = [
client_ip in ALLOWED_IPS,
host == ALLOWED_DOMAIN,
request.headers.get('X-Forwarded-For', '').split(',')[0].strip() in ALLOWED_IPS
]
if any(allowed_conditions):
return f(*args, **kwargs)
redis_client.incr('stats:errors_403')
abort(403)
return decorated
def cache_key(source_url, ip):
return f"cache:{source_url}:{ip}"
def convert_hosts(content, target_ip):
"""Konwersja treści pliku hosts z uwzględnieniem walidacji"""
converted = []
for line in content.splitlines():
line = line.strip()
# Pomijanie pustych linii i komentarzy
if not line or line[0] in ('!', '#', '/') or '$' in line:
continue
# Reguły AdGuard
if line.startswith(('||', '|')):
domain = line.split('^')[0].lstrip('|')
if 1 < len(domain) <= 253 and '.' in domain[1:-1]:
converted.append(f"{target_ip} {domain}")
continue
# Klasyczny format hosts
if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\s+', line):
converted.append(re.sub(r'^\S+', target_ip, line, count=1))
return '\n'.join(converted)
def validate_and_normalize_url(url):
"""Walidacja i normalizacja adresu URL"""
parsed = urlparse(url)
if not parsed.scheme:
url = f'https://{url}'
parsed = urlparse(url)
if not parsed.netloc:
raise ValueError("Missing host in URL")
return parsed.geturl()
def track_url_request(url):
"""Rejestracja żądania dla określonego URL"""
redis_key = f"stats:url_requests:{quote(url, safe='')}"
redis_client.incr(redis_key)
def add_recent_link(url, target_ip):
"""Dodanie ostatniego linku do historii (ostatnie 10)"""
timestamp = datetime.now().isoformat()
link_data = f"{timestamp}|{url}|{target_ip}"
with redis_client.pipeline() as pipe:
pipe.lpush("recent_links", link_data)
pipe.ltrim("recent_links", 0, 9)
pipe.execute()
redis_client.incr('stats:recent_links_added')
def get_recent_links():
"""Pobranie ostatnich 10 linków"""
links = redis_client.lrange("recent_links", 0, 9)
parsed_links = []
for link in links:
parts = link.decode().split("|")
if len(parts) >= 3:
parsed_links.append((parts[0], parts[1], parts[2]))
elif len(parts) == 2:
parsed_links.append((parts[0], parts[1], "127.0.0.1"))
return parsed_links
def get_hostname(ip):
"""Cacheowanie wyników reverse DNS dla danego IP"""
key = f"reverse_dns:{ip}"
cached = redis_client.get(key)
if cached:
return cached.decode()
try:
hostname = socket.gethostbyaddr(ip)[0]
except Exception:
hostname = ip
# Cache na 1 godzinę
redis_client.setex(key, 3600, hostname)
return hostname
# Nowa funkcja do logowania requestów dla endpointu /convert
def add_recent_convert():
"""Dodaje dane żądania do listy ostatnich konwersji (/convert)"""
ip = get_client_ip()
hostname = get_hostname(ip)
user_agent = request.headers.get('User-Agent', 'Unknown')
time_str = datetime.now().astimezone().isoformat()
url = request.full_path # pełna ścieżka wraz z query string
data = {
"url": url,
"ip": ip,
"hostname": hostname,
"time": time_str,
"user_agent": user_agent
}
json_data = json.dumps(data)
redis_client.lpush("recent_converts", json_data)
redis_client.ltrim("recent_converts", 0, 49)
@app.route('/', methods=['GET'])
def index():
"""Strona główna z formularzem"""
generated_link = None
recent_links = get_recent_links()
url_param = request.args.get('url')
target_ip = request.args.get('ip', '127.0.0.1')
if url_param:
try:
normalized_url = validate_and_normalize_url(unquote(url_param))
encoded_url = quote(normalized_url, safe='')
generated_link = urljoin(
request.host_url,
f"convert?url={encoded_url}&ip={target_ip}"
)
add_recent_link(normalized_url, target_ip)
recent_links = get_recent_links()
except Exception as e:
app.logger.error(f"Error processing URL: {str(e)}")
return render_template('form.html',
generated_link=generated_link,
recent_links=recent_links)
@app.route('/convert')
@limiter.limit("100 per minute")
async def convert():
"""Asynchroniczny endpoint do konwersji z weryfikacją typu zawartości"""
try:
redis_client.incr('stats:convert_requests')
# Logowanie danych dla requestu do /convert
add_recent_convert()
encoded_url = request.args.get('url')
if not encoded_url:
redis_client.incr('stats:errors_400')
abort(400, description="Missing URL parameter")
decoded_url = unquote(encoded_url)
normalized_url = validate_and_normalize_url(decoded_url)
target_ip = request.args.get('ip', '127.0.0.1')
# Rejestracja statystyk dotyczących URL
track_url_request(normalized_url)
redis_client.incr(f'stats:target_ips:{target_ip}')
# Sprawdzenie pamięci podręcznej
cached = redis_client.get(cache_key(normalized_url, target_ip))
if cached:
redis_client.incr('stats:cache_hits')
return cached.decode('utf-8'), 200, {'Content-Type': 'text/plain'}
redis_client.incr('stats:cache_misses')
# Asynchroniczne pobranie zasobu za pomocą aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(normalized_url, timeout=15) as response:
# Sprawdzanie typu zawartości musi zawierać "text"
content_type = response.headers.get("Content-Type", "")
if "text" not in content_type:
abort(415, description="Unsupported Media Type")
content = b""
while True:
chunk = await response.content.read(2048)
if not chunk:
break
content += chunk
if len(content) > app.config['MAX_CONTENT_LENGTH']:
redis_client.incr('stats:errors_413')
abort(413)
# Rejestracja rozmiaru pobranej treści
content_size = len(content)
redis_client.incrby('stats:content_size_total', content_size)
redis_client.incr('stats:content_size_count')
converted = convert_hosts(content.decode('utf-8'), target_ip)
redis_client.setex(cache_key(normalized_url, target_ip), 43200, converted) # 12h cache
redis_client.incr('stats:conversions_success')
return converted, 200, {'Content-Type': 'text/plain'}
except aiohttp.ClientError as e:
app.logger.error(f"Request error: {str(e)}")
redis_client.incr('stats:errors_500')
abort(500)
except ValueError as e:
app.logger.error(f"URL validation error: {str(e)}")
redis_client.incr('stats:errors_400')
abort(400)
@app.route('/stats')
@ip_restriction
def stats():
"""Endpoint statystyk"""
stats_data = {}
target_ips = {}
url_requests = {}
user_agents = {}
client_ips = {}
# Agregacja statystyk z Redisa
for key in redis_client.scan_iter("stats:*"):
key_str = key.decode()
value = redis_client.get(key).decode()
if key_str.startswith('stats:target_ips:'):
ip = key_str.split(':', 2)[2]
target_ips[ip] = value
elif key_str.startswith('stats:url_requests:'):
url = unquote(key_str.split(':', 2)[2])
url_requests[url] = value
elif key_str.startswith('stats:user_agents:'):
ua = unquote(key_str.split(':', 2)[2])
user_agents[ua] = value
elif key_str.startswith('stats:client_ips:'):
ip = key_str.split(':', 2)[2]
client_ips[ip] = value
else:
stats_data[key_str] = value
# Pobranie ostatnich 50 requestów dla endpointu /convert
recent_converts = []
convert_entries = redis_client.lrange("recent_converts", 0, 49)
for entry in convert_entries:
try:
data = json.loads(entry.decode())
recent_converts.append(data)
except Exception:
pass
# Obliczenie średniego czasu przetwarzania żądań
processing_time_total = float(redis_client.get('stats:processing_time_total') or 0)
processing_time_count = int(redis_client.get('stats:processing_time_count') or 0)
avg_processing_time = processing_time_total / processing_time_count if processing_time_count > 0 else 0
# Obliczenie średniego rozmiaru pobranej treści dla /convert
content_size_total = int(redis_client.get('stats:content_size_total') or 0)
content_size_count = int(redis_client.get('stats:content_size_count') or 0)
avg_content_size = content_size_total / content_size_count if content_size_count > 0 else 0
# Rozszerzone statystyki dotyczące wydajności i rozmiarów danych
detailed_stats = {
"processing_time_total_sec": processing_time_total,
"processing_time_count": processing_time_count,
"processing_time_avg_sec": avg_processing_time,
"processing_time_min_sec": float(redis_client.get('stats:processing_time_min') or 0),
"processing_time_max_sec": float(redis_client.get('stats:processing_time_max') or 0),
"content_size_total_bytes": content_size_total,
"content_size_count": content_size_count,
"content_size_avg_bytes": avg_content_size
}
# Struktura odpowiedzi
response_data = {
**stats_data,
'target_ips': target_ips,
'url_requests': url_requests,
'user_agents': user_agents,
'client_ips': client_ips,
'recent_converts': recent_converts,
'detailed_stats': detailed_stats
}
return jsonify(response_data)
@app.errorhandler(400)
@app.errorhandler(403)
@app.errorhandler(404)
@app.errorhandler(413)
@app.errorhandler(415)
@app.errorhandler(500)
def handle_errors(e):
"""Obsługa błędów"""
return render_template('error.html', error=e), e.code
# Jeśli aplikacja jest uruchamiana bezpośrednio, korzystamy z Flask's run
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8283)
# W przeciwnym razie (np. przy uruchamianiu przez Gunicorn) opakowujemy aplikację w adapter ASGI
else:
from asgiref.wsgi import WsgiToAsgi
asgi_app = WsgiToAsgi(app)