hosts_daemon/app.py
Mateusz Gruszczyński 86d4bd1904 fix
2025-03-06 23:01:21 +01:00

263 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import time
import logging
import difflib
import platform
import psutil
from flask import Flask, request, jsonify, abort
from flask_sslify import SSLify
from datetime import datetime, timezone, timedelta
from croniter import croniter
app = Flask(__name__)
sslify = SSLify(app)
# Ścieżka do pliku z tokenem:
TOKEN_FILE_PATH = "/opt/hosts_daemon/token"
def read_token_from_file(path):
"""Odczytuje token z pliku i zwraca jego zawartość (strip),
albo None, jeśli plik nie istnieje lub jest pusty."""
if os.path.isfile(path):
try:
with open(path, 'r') as f:
content = f.read().strip()
if content:
return content
except Exception as e:
# Możesz zalogować błąd, jeśli np. plik jest niewidoczny/brak uprawnień
logger.error(f"Nie udało się odczytać pliku tokenu: {str(e)}")
return None
# LOGOWANIE
LOG_DIR = "/opt/hosts_daemon/logs"
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "daemon.log")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler()
]
)
logger = logging.getLogger("hosts_daemon")
# 1) Najpierw próbujemy odczytać token z pliku
file_token = read_token_from_file(TOKEN_FILE_PATH)
# 2) Jeśli w pliku nie ma tokenu, sprawdzamy zmienną środowiskową
if file_token:
# Użyjemy tokenu z pliku
API_TOKEN = file_token
logger.info("API_TOKEN wczytany z pliku.")
else:
# Fallback: odczytujemy z ENV albo dajemy domyślny
env_token = os.environ.get("HOSTS_DAEMON_API_TOKEN")
if env_token:
API_TOKEN = env_token
logger.info("API_TOKEN wczytany ze zmiennej środowiskowej.")
else:
API_TOKEN = "superSecretTokenABC123"
logger.info("API_TOKEN ustawiony na wartość domyślną: superSecretTokenABC123")
# Globalne metryki
metrics = {
"total_requests": 0,
"total_time": 0.0,
"endpoints": {},
"hosts_get": 0,
"hosts_post": 0,
}
# ------------------
# FUNKCJE POMOCNICZE
# ------------------
def require_auth():
"""Wymusza autoryzację przy pomocy nagłówka Authorization, który powinien zawierać API_TOKEN."""
token = request.headers.get("Authorization")
logger.info(f"require_auth() -> Nagłówek Authorization: {token}")
if token != API_TOKEN:
logger.warning("Nieprawidłowy token w nagłówku Authorization. Oczekiwano innego ciągu znaków.")
abort(401, description="Unauthorized")
def validate_hosts_syntax(hosts_content):
"""
Sprawdza podstawową poprawność składni /etc/hosts:
- Pomija puste linie i komentarze (zaczynające się od '#')
- Wymaga poprawnego IP (IPv4 lub IPv6) na początku
- Wymaga co najmniej jednego hostnname w linii
- Wykrywa powielone wpisy (IP+hostname) w obrębie *tej* zawartości
(ograniczone do nowej treści, nie sprawdza starych plików).
Zwraca None, jeśli OK, w przeciwnym razie string z opisem błędu.
"""
import ipaddress
seen = {}
lines = hosts_content.splitlines()
for i, line in enumerate(lines, start=1):
line_strip = line.strip()
# Pomijamy puste i komentarze
if not line_strip or line_strip.startswith('#'):
continue
parts = line_strip.split()
if len(parts) < 2:
return f"Linia {i}: Za mało elementów, wymagane IP oraz co najmniej jeden hostname."
ip_addr = parts[0]
hostnames = parts[1:]
# Prosta weryfikacja IP
try:
_ = ipaddress.ip_address(ip_addr)
except ValueError:
return f"Linia {i}: '{ip_addr}' nie jest poprawnym adresem IP"
for hn in hostnames:
key = (ip_addr, hn)
if key in seen:
return f"Linia {i}: duplikat wpisu {ip_addr} -> {hn}"
seen[key] = True
return None
# ------------------
# HOOKS LOGOWANIA / METRYK
# ------------------
@app.before_request
def before_request_logging():
request.start_time = time.time()
client_ip = request.remote_addr
endpoint = request.path
logger.info(f"Request from {client_ip} to {endpoint} [{request.method}], Auth: {request.headers.get('Authorization')}")
metrics["total_requests"] += 1
if endpoint not in metrics["endpoints"]:
metrics["endpoints"][endpoint] = {"count": 0, "total_time": 0.0}
metrics["endpoints"][endpoint]["count"] += 1
@app.after_request
def after_request_logging(response):
elapsed = time.time() - request.start_time
metrics["total_time"] += elapsed
endpoint = request.path
if endpoint in metrics["endpoints"]:
metrics["endpoints"][endpoint]["total_time"] += elapsed
logger.info(f"Completed {endpoint} in {elapsed:.3f} sec with status {response.status_code}")
return response
# ------------------
# ENDPOINTY
# ------------------
@app.route('/', methods=['GET'])
def root_index():
# Możesz zwrócić coś bardziej przyjaznego niż 404
return jsonify({"info": "hosts_daemon is running. Try /health or /hosts"}), 200
@app.route('/hosts', methods=['GET'])
def get_hosts():
require_auth()
metrics["hosts_get"] += 1
try:
with open('/etc/hosts', 'r') as f:
content = f.read()
logger.info(f"/hosts GET successful from {request.remote_addr}")
return jsonify({"hosts": content})
except Exception as e:
logger.error(f"/hosts GET error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route('/hosts', methods=['POST'])
def update_hosts():
require_auth()
metrics["hosts_post"] += 1
data = request.get_json()
if not data or "hosts" not in data:
logger.warning(f"/hosts POST: missing 'hosts' key from {request.remote_addr}")
return jsonify({"error": "Invalid request, missing 'hosts' key"}), 400
new_content = data["hosts"]
# Walidacja nowej zawartości:
error_msg = validate_hosts_syntax(new_content)
if error_msg:
logger.error(f"/hosts POST validation error: {error_msg}")
return jsonify({"error": error_msg}), 400
try:
with open('/etc/hosts', 'w') as f:
f.write(new_content)
logger.info(f"/hosts POST updated by {request.remote_addr}")
return jsonify({"message": "File updated successfully"})
except Exception as e:
logger.error(f"/hosts POST error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route('/health', methods=['GET'])
def health():
# Endpoint nie wymaga tokenu
uptime = time.time() - psutil.boot_time()
now = datetime.now(timezone.utc).isoformat()
logger.info(f"/health check from {request.remote_addr}")
return jsonify({
"status": "ok",
"time": now,
"uptime": f"{uptime:.1f} seconds"
}), 200
@app.route('/metrics', methods=['GET'])
def metrics_endpoint():
# Endpoint nie wymaga tokenu
avg_time = metrics["total_time"] / metrics["total_requests"] if metrics["total_requests"] > 0 else 0.0
ep_data = {}
for ep, data in metrics["endpoints"].items():
ep_avg = data["total_time"] / data["count"] if data["count"] > 0 else 0.0
ep_data[ep] = {"count": data["count"], "avg_time": ep_avg}
response_data = {
"total_requests": metrics["total_requests"],
"avg_response_time": avg_time,
"endpoints": ep_data,
"hosts_get": metrics.get("hosts_get", 0),
"hosts_post": metrics.get("hosts_post", 0)
}
logger.info(f"/metrics accessed by {request.remote_addr}")
return jsonify(response_data), 200
@app.route('/system-info', methods=['GET'])
def system_info():
# Przykładowe dane systemowe:
info = {}
# CPU
info["cpu_percent"] = psutil.cpu_percent(interval=0.1)
# RAM
mem = psutil.virtual_memory()
info["memory_total"] = mem.total
info["memory_used"] = mem.used
info["memory_percent"] = mem.percent
# Dysk (root '/')
disk = psutil.disk_usage('/')
info["disk_total"] = disk.total
info["disk_used"] = disk.used
info["disk_percent"] = disk.percent
# Dystrybucja, wersja
dist = platform.platform() # np. 'Linux-5.4.0-135-generic-x86_64-with-glibc2.29'
info["platform"] = dist
# Uptime
sys_uptime = time.time() - psutil.boot_time()
info["uptime_seconds"] = sys_uptime
logger.info(f"/system-info accessed by {request.remote_addr}")
return jsonify(info), 200
if __name__ == '__main__':
# Możesz zalogować tylko informację, że startujesz:
logger.info("Uruchamiam hosts_daemon token wczytany, nasłuch na porcie 8000 (HTTPS).")
app.run(
host='0.0.0.0',
port=8000,
ssl_context=('/opt/hosts_daemon/ssl/hosts_daemon.crt', '/opt/hosts_daemon/ssl/hosts_daemon.key')
)