#!/usr/bin/env python3 import os import time import logging import difflib import platform import psutil from flask import Flask, request, jsonify, abort from flask_sslify import SSLify from datetime import datetime, timezone, timedelta #from croniter import croniter app = Flask(__name__) sslify = SSLify(app) # # DYNAMICZNE ŚCIEŻKI USTAWIANE PRZEZ ZMIENNE ŚRODOWISKOWE # ------------------------------------------------------ # - Jeśli zmienna nie jest ustawiona, używamy wartości domyślnej. # LOG_DIR = os.environ.get("HOSTS_DAEMON_LOG_DIR", "logs") TOKEN_FILE_PATH = os.environ.get("HOSTS_DAEMON_TOKEN_FILE", "daemon_token.txt") def read_token_from_file(path): """Odczytuje token z pliku i zwraca jego zawartość (strip), albo None, jeśli plik nie istnieje lub jest pusty.""" if os.path.isfile(path): try: with open(path, 'r') as f: content = f.read().strip() if content: return content except Exception as e: logger.error(f"Nie udało się odczytać pliku tokenu: {str(e)}") return None # Na tym etapie nie mamy jeszcze loggera, więc jego konfiguracja będzie poniżej # # KONFIGURACJA LOGOWANIA # ------------------------------------------------------ # Upewniamy się, że katalog logów istnieje os.makedirs(LOG_DIR, exist_ok=True) LOG_FILE = os.path.join(LOG_DIR, "daemon.log") logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler(LOG_FILE), logging.StreamHandler() ] ) logger = logging.getLogger("hosts_daemon") # # WYCZYTUJEMY TOKEN # ------------------------------------------------------ file_token = read_token_from_file(TOKEN_FILE_PATH) if file_token: API_TOKEN = file_token logger.info(f"API_TOKEN wczytany z pliku: {TOKEN_FILE_PATH}") else: env_token = os.environ.get("HOSTS_DAEMON_API_TOKEN") if env_token: API_TOKEN = env_token logger.info("API_TOKEN wczytany ze zmiennej środowiskowej HOSTS_DAEMON_API_TOKEN.") else: API_TOKEN = "superSecretTokenABC123" logger.info("API_TOKEN ustawiony na wartość domyślną: superSecretTokenABC123") # Globalne metryki metrics = { "total_requests": 0, "total_time": 0.0, "endpoints": {}, "hosts_get": 0, "hosts_post": 0, } # ------------------ # FUNKCJE POMOCNICZE # ------------------ def require_auth(): """Wymusza autoryzację przy pomocy nagłówka Authorization, który powinien zawierać API_TOKEN.""" token = request.headers.get("Authorization") logger.info(f"require_auth() -> Nagłówek Authorization: {token}") if token != API_TOKEN: logger.warning("Nieprawidłowy token w nagłówku Authorization. Oczekiwano innego ciągu znaków.") abort(401, description="Unauthorized") def validate_hosts_syntax(hosts_content): import ipaddress seen = {} lines = hosts_content.splitlines() for i, line in enumerate(lines, start=1): line_strip = line.strip() # Pomijamy puste i komentarze if not line_strip or line_strip.startswith('#'): continue parts = line_strip.split() if len(parts) < 2: return f"Linia {i}: Za mało elementów, wymagane IP oraz co najmniej jeden hostname." ip_addr = parts[0] hostnames = parts[1:] # Prosta weryfikacja IP try: _ = ipaddress.ip_address(ip_addr) except ValueError: return f"Linia {i}: '{ip_addr}' nie jest poprawnym adresem IP" for hn in hostnames: key = (ip_addr, hn) if key in seen: return f"Linia {i}: duplikat wpisu {ip_addr} -> {hn}" seen[key] = True return None # ------------------ # HOOKS LOGOWANIA / METRYK # ------------------ @app.before_request def before_request_logging(): request.start_time = time.time() client_ip = request.remote_addr endpoint = request.path logger.info(f"Request from {client_ip} to {endpoint} [{request.method}], Auth: {request.headers.get('Authorization')}") metrics["total_requests"] += 1 if endpoint not in metrics["endpoints"]: metrics["endpoints"][endpoint] = {"count": 0, "total_time": 0.0} metrics["endpoints"][endpoint]["count"] += 1 @app.after_request def after_request_logging(response): elapsed = time.time() - request.start_time metrics["total_time"] += elapsed endpoint = request.path if endpoint in metrics["endpoints"]: metrics["endpoints"][endpoint]["total_time"] += elapsed logger.info(f"Completed {endpoint} in {elapsed:.3f} sec with status {response.status_code}") return response # ------------------ # ENDPOINTY # ------------------ @app.route('/', methods=['GET']) def root_index(): return jsonify({"info": "hosts_daemon is running. Try /health or /hosts"}), 200 @app.route('/hosts', methods=['GET']) def get_hosts(): require_auth() metrics["hosts_get"] += 1 try: with open('/etc/hosts', 'r') as f: content = f.read() logger.info(f"/hosts GET successful from {request.remote_addr}") return jsonify({"hosts": content}) except Exception as e: logger.error(f"/hosts GET error: {str(e)}") return jsonify({"error": str(e)}), 500 @app.route('/hosts', methods=['POST']) def update_hosts(): require_auth() metrics["hosts_post"] += 1 data = request.get_json() if not data or "hosts" not in data: logger.warning(f"/hosts POST: missing 'hosts' key from {request.remote_addr}") return jsonify({"error": "Invalid request, missing 'hosts' key"}), 400 new_content = data["hosts"] error_msg = validate_hosts_syntax(new_content) if error_msg: logger.error(f"/hosts POST validation error: {error_msg}") return jsonify({"error": error_msg}), 400 try: with open('/etc/hosts', 'w') as f: f.write(new_content) logger.info(f"/hosts POST updated by {request.remote_addr}") return jsonify({"message": "File updated successfully"}) except Exception as e: logger.error(f"/hosts POST error: {str(e)}") return jsonify({"error": str(e)}), 500 @app.route('/health', methods=['GET']) def health(): # Endpoint nie wymaga tokenu uptime = time.time() - psutil.boot_time() now = datetime.now(timezone.utc).isoformat() logger.info(f"/health check from {request.remote_addr}") return jsonify({ "status": "ok", "time": now, "uptime": f"{uptime:.1f} seconds" }), 200 @app.route('/metrics', methods=['GET']) def metrics_endpoint(): # Endpoint nie wymaga tokenu avg_time = metrics["total_time"] / metrics["total_requests"] if metrics["total_requests"] > 0 else 0.0 ep_data = {} for ep, data in metrics["endpoints"].items(): ep_avg = data["total_time"] / data["count"] if data["count"] > 0 else 0.0 ep_data[ep] = {"count": data["count"], "avg_time": ep_avg} response_data = { "total_requests": metrics["total_requests"], "avg_response_time": avg_time, "endpoints": ep_data, "hosts_get": metrics.get("hosts_get", 0), "hosts_post": metrics.get("hosts_post", 0) } logger.info(f"/metrics accessed by {request.remote_addr}") return jsonify(response_data), 200 @app.route('/system-info', methods=['GET']) def system_info(): info = {} info["cpu_percent"] = psutil.cpu_percent(interval=0.1) mem = psutil.virtual_memory() info["memory_total"] = mem.total info["memory_used"] = mem.used info["memory_percent"] = mem.percent disk = psutil.disk_usage('/') info["disk_total"] = disk.total info["disk_used"] = disk.used info["disk_percent"] = disk.percent dist = platform.platform() info["platform"] = dist sys_uptime = time.time() - psutil.boot_time() info["uptime_seconds"] = sys_uptime logger.info(f"/system-info accessed by {request.remote_addr}") return jsonify(info), 200 if __name__ == '__main__': logger.info("Uruchamiam hosts_daemon – nasłuch na porcie 8000 (HTTPS).") logger.info(f"LOG_DIR: {LOG_DIR}") logger.info(f"TOKEN_FILE_PATH: {TOKEN_FILE_PATH}") app.run( host='0.0.0.0', port=8000, ssl_context=('ssl/hosts_daemon.crt', 'ssl/hosts_daemon.key') )