Mateusz Gruszczyński 47d6edafe7 comment croniter
2025-03-08 20:06:41 +01:00

258 lines
8.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import time
import logging
import difflib
import platform
import psutil
from flask import Flask, request, jsonify, abort
from flask_sslify import SSLify
from datetime import datetime, timezone, timedelta
#from croniter import croniter
app = Flask(__name__)
sslify = SSLify(app)
#
# DYNAMICZNE ŚCIEŻKI USTAWIANE PRZEZ ZMIENNE ŚRODOWISKOWE
# ------------------------------------------------------
# - Jeśli zmienna nie jest ustawiona, używamy wartości domyślnej.
#
LOG_DIR = os.environ.get("HOSTS_DAEMON_LOG_DIR", "logs")
TOKEN_FILE_PATH = os.environ.get("HOSTS_DAEMON_TOKEN_FILE", "daemon_token.txt")
def read_token_from_file(path):
"""Odczytuje token z pliku i zwraca jego zawartość (strip),
albo None, jeśli plik nie istnieje lub jest pusty."""
if os.path.isfile(path):
try:
with open(path, 'r') as f:
content = f.read().strip()
if content:
return content
except Exception as e:
logger.error(f"Nie udało się odczytać pliku tokenu: {str(e)}")
return None
# Na tym etapie nie mamy jeszcze loggera, więc jego konfiguracja będzie poniżej
#
# KONFIGURACJA LOGOWANIA
# ------------------------------------------------------
# Upewniamy się, że katalog logów istnieje
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "daemon.log")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler()
]
)
logger = logging.getLogger("hosts_daemon")
#
# WYCZYTUJEMY TOKEN
# ------------------------------------------------------
file_token = read_token_from_file(TOKEN_FILE_PATH)
if file_token:
API_TOKEN = file_token
logger.info(f"API_TOKEN wczytany z pliku: {TOKEN_FILE_PATH}")
else:
env_token = os.environ.get("HOSTS_DAEMON_API_TOKEN")
if env_token:
API_TOKEN = env_token
logger.info("API_TOKEN wczytany ze zmiennej środowiskowej HOSTS_DAEMON_API_TOKEN.")
else:
API_TOKEN = "superSecretTokenABC123"
logger.info("API_TOKEN ustawiony na wartość domyślną: superSecretTokenABC123")
# Globalne metryki
metrics = {
"total_requests": 0,
"total_time": 0.0,
"endpoints": {},
"hosts_get": 0,
"hosts_post": 0,
}
# ------------------
# FUNKCJE POMOCNICZE
# ------------------
def require_auth():
"""Wymusza autoryzację przy pomocy nagłówka Authorization,
który powinien zawierać API_TOKEN."""
token = request.headers.get("Authorization")
logger.info(f"require_auth() -> Nagłówek Authorization: {token}")
if token != API_TOKEN:
logger.warning("Nieprawidłowy token w nagłówku Authorization. Oczekiwano innego ciągu znaków.")
abort(401, description="Unauthorized")
def validate_hosts_syntax(hosts_content):
import ipaddress
seen = {}
lines = hosts_content.splitlines()
for i, line in enumerate(lines, start=1):
line_strip = line.strip()
# Pomijamy puste i komentarze
if not line_strip or line_strip.startswith('#'):
continue
parts = line_strip.split()
if len(parts) < 2:
return f"Linia {i}: Za mało elementów, wymagane IP oraz co najmniej jeden hostname."
ip_addr = parts[0]
hostnames = parts[1:]
# Prosta weryfikacja IP
try:
_ = ipaddress.ip_address(ip_addr)
except ValueError:
return f"Linia {i}: '{ip_addr}' nie jest poprawnym adresem IP"
for hn in hostnames:
key = (ip_addr, hn)
if key in seen:
return f"Linia {i}: duplikat wpisu {ip_addr} -> {hn}"
seen[key] = True
return None
# ------------------
# HOOKS LOGOWANIA / METRYK
# ------------------
@app.before_request
def before_request_logging():
request.start_time = time.time()
client_ip = request.remote_addr
endpoint = request.path
logger.info(f"Request from {client_ip} to {endpoint} [{request.method}], Auth: {request.headers.get('Authorization')}")
metrics["total_requests"] += 1
if endpoint not in metrics["endpoints"]:
metrics["endpoints"][endpoint] = {"count": 0, "total_time": 0.0}
metrics["endpoints"][endpoint]["count"] += 1
@app.after_request
def after_request_logging(response):
elapsed = time.time() - request.start_time
metrics["total_time"] += elapsed
endpoint = request.path
if endpoint in metrics["endpoints"]:
metrics["endpoints"][endpoint]["total_time"] += elapsed
logger.info(f"Completed {endpoint} in {elapsed:.3f} sec with status {response.status_code}")
return response
# ------------------
# ENDPOINTY
# ------------------
@app.route('/', methods=['GET'])
def root_index():
return jsonify({"info": "hosts_daemon is running. Try /health or /hosts"}), 200
@app.route('/hosts', methods=['GET'])
def get_hosts():
require_auth()
metrics["hosts_get"] += 1
try:
with open('/etc/hosts', 'r') as f:
content = f.read()
logger.info(f"/hosts GET successful from {request.remote_addr}")
return jsonify({"hosts": content})
except Exception as e:
logger.error(f"/hosts GET error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route('/hosts', methods=['POST'])
def update_hosts():
require_auth()
metrics["hosts_post"] += 1
data = request.get_json()
if not data or "hosts" not in data:
logger.warning(f"/hosts POST: missing 'hosts' key from {request.remote_addr}")
return jsonify({"error": "Invalid request, missing 'hosts' key"}), 400
new_content = data["hosts"]
error_msg = validate_hosts_syntax(new_content)
if error_msg:
logger.error(f"/hosts POST validation error: {error_msg}")
return jsonify({"error": error_msg}), 400
try:
with open('/etc/hosts', 'w') as f:
f.write(new_content)
logger.info(f"/hosts POST updated by {request.remote_addr}")
return jsonify({"message": "File updated successfully"})
except Exception as e:
logger.error(f"/hosts POST error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route('/health', methods=['GET'])
def health():
# Endpoint nie wymaga tokenu
uptime = time.time() - psutil.boot_time()
now = datetime.now(timezone.utc).isoformat()
logger.info(f"/health check from {request.remote_addr}")
return jsonify({
"status": "ok",
"time": now,
"uptime": f"{uptime:.1f} seconds"
}), 200
@app.route('/metrics', methods=['GET'])
def metrics_endpoint():
# Endpoint nie wymaga tokenu
avg_time = metrics["total_time"] / metrics["total_requests"] if metrics["total_requests"] > 0 else 0.0
ep_data = {}
for ep, data in metrics["endpoints"].items():
ep_avg = data["total_time"] / data["count"] if data["count"] > 0 else 0.0
ep_data[ep] = {"count": data["count"], "avg_time": ep_avg}
response_data = {
"total_requests": metrics["total_requests"],
"avg_response_time": avg_time,
"endpoints": ep_data,
"hosts_get": metrics.get("hosts_get", 0),
"hosts_post": metrics.get("hosts_post", 0)
}
logger.info(f"/metrics accessed by {request.remote_addr}")
return jsonify(response_data), 200
@app.route('/system-info', methods=['GET'])
def system_info():
info = {}
info["cpu_percent"] = psutil.cpu_percent(interval=0.1)
mem = psutil.virtual_memory()
info["memory_total"] = mem.total
info["memory_used"] = mem.used
info["memory_percent"] = mem.percent
disk = psutil.disk_usage('/')
info["disk_total"] = disk.total
info["disk_used"] = disk.used
info["disk_percent"] = disk.percent
dist = platform.platform()
info["platform"] = dist
sys_uptime = time.time() - psutil.boot_time()
info["uptime_seconds"] = sys_uptime
logger.info(f"/system-info accessed by {request.remote_addr}")
return jsonify(info), 200
if __name__ == '__main__':
logger.info("Uruchamiam hosts_daemon nasłuch na porcie 8000 (HTTPS).")
logger.info(f"LOG_DIR: {LOG_DIR}")
logger.info(f"TOKEN_FILE_PATH: {TOKEN_FILE_PATH}")
app.run(
host='0.0.0.0',
port=8000,
ssl_context=('ssl/hosts_daemon.crt', 'ssl/hosts_daemon.key')
)