hosts_daemon/app.py
Mateusz Gruszczyński fa73c9ea6d first commit
2025-03-06 15:07:51 +01:00

136 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import time
import logging
import difflib
from flask import Flask, request, jsonify, abort
from flask_sslify import SSLify
from datetime import datetime, timezone, timedelta
from croniter import croniter
import psutil
# Flask application instance; the route and request-hook decorators below attach to it.
app = Flask(__name__)
# Wrap the app with flask_sslify's SSLify extension
# (presumably to force HTTPS -- confirm against the Flask-SSLify documentation).
sslify = SSLify(app)
# Authorization token configuration (a pre-shared secret is assumed).
# The value is read from the HOSTS_DAEMON_API_TOKEN environment variable and
# falls back to the literal "changeme" when that variable is not set.
# NOTE(review): the fallback is a guessable placeholder -- confirm that every
# deployment sets the environment variable.
API_TOKEN = os.environ.get("HOSTS_DAEMON_API_TOKEN", "changeme")
# Logging configuration: records are written to /opt/hosts_daemon/logs/daemon.log
# and, through a StreamHandler, to the process's stream output as well.
LOG_DIR = "/opt/hosts_daemon/logs"
LOG_FILE = os.path.join(LOG_DIR, "daemon.log")

# The directory has to exist before FileHandler opens the log file.
os.makedirs(LOG_DIR, exist_ok=True)

logging.basicConfig(
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Logger used by every hook and endpoint in this module.
logger = logging.getLogger("hosts_daemon")
# Global in-process metrics, updated by the request hooks and the /hosts handlers:
#   total_requests / total_time - request count and summed handling time (seconds)
#   endpoints                   - per-path {"count", "total_time"} entries
#   hosts_get / hosts_post      - number of GET / POST calls to /hosts
metrics = dict(
    total_requests=0,
    total_time=0.0,
    endpoints={},
    hosts_get=0,
    hosts_post=0,
)
# Authorization check called at the top of every endpoint.
def require_auth():
    """Abort the current request with 401 unless it carries the API token.

    The raw ``Authorization`` header value must equal ``API_TOKEN`` exactly;
    no scheme prefix (such as ``Bearer``) is stripped. A missing or empty
    header is always rejected.

    Raises:
        The HTTP 401 exception raised by ``abort`` when the token is absent
        or does not match.
    """
    import hmac  # local import keeps this block self-contained

    token = request.headers.get("Authorization")
    # Constant-time comparison so response timing does not reveal how much of
    # a guessed token is correct. Both sides are encoded to bytes because
    # compare_digest raises TypeError for str values with non-ASCII characters.
    if not token or not hmac.compare_digest(
        token.encode("utf-8"), API_TOKEN.encode("utf-8")
    ):
        abort(401, description="Unauthorized")
# Request logging and metrics collection: record the start time and log the
# client IP, endpoint and method.
@app.before_request
def before_request_logging():
    """Stamp the request with its start time, log it, and count it.

    Stores ``time.time()`` on ``request.start_time`` for the after-request
    hook, writes one INFO line describing the request, and increments both the
    global and the per-path request counters in ``metrics``.
    """
    request.start_time = time.time()
    client_ip = request.remote_addr
    endpoint = request.path

    # The Authorization header holds the API secret, so never write its value
    # to the log; only note whether a credential was sent at all.
    auth_state = "provided" if request.headers.get("Authorization") else "missing"
    logger.info(
        f"Request from {client_ip} to {endpoint} [{request.method}], Auth: {auth_state}"
    )

    metrics["total_requests"] += 1
    # NOTE(review): entries are keyed by the raw request path, so every
    # distinct path requested creates a new entry -- consider bounding this.
    stats = metrics["endpoints"].setdefault(endpoint, {"count": 0, "total_time": 0.0})
    stats["count"] += 1
@app.after_request
def after_request_logging(response):
    """Record the handling time of the finished request and log its status.

    Args:
        response: The response object about to be sent to the client.

    Returns:
        The same ``response`` object, unmodified.
    """
    endpoint = request.path

    # after_request hooks also run when an earlier before_request hook (for
    # example one registered by an extension) answered the request itself; in
    # that case the timing hook never ran and ``start_time`` does not exist.
    started = getattr(request, "start_time", None)
    if started is None:
        logger.info(
            f"Completed {endpoint} with status {response.status_code} (no timing recorded)"
        )
        return response

    elapsed = time.time() - started
    metrics["total_time"] += elapsed
    if endpoint in metrics["endpoints"]:
        metrics["endpoints"][endpoint]["total_time"] += elapsed
    logger.info(f"Completed {endpoint} in {elapsed:.3f} sec with status {response.status_code}")
    return response
# Endpoint GET /hosts: returns the contents of /etc/hosts.
@app.route('/hosts', methods=['GET'])
def get_hosts():
    """Return the current /etc/hosts text as JSON.

    Requires a valid API token and increments the ``hosts_get`` counter.

    Returns:
        ``{"hosts": <file text>}`` on success, or ``({"error": <message>}, 500)``
        when reading the file or building the response fails.
    """
    require_auth()
    metrics["hosts_get"] += 1
    try:
        with open('/etc/hosts', 'r') as hosts_file:
            hosts_text = hosts_file.read()
        logger.info(f"/hosts GET successful from {request.remote_addr}")
        payload = {"hosts": hosts_text}
        return jsonify(payload)
    except Exception as exc:
        reason = str(exc)
        logger.error(f"/hosts GET error: {reason}")
        return jsonify({"error": reason}), 500
# Endpoint POST /hosts: replaces the contents of /etc/hosts.
@app.route('/hosts', methods=['POST'])
def update_hosts():
    """Overwrite /etc/hosts with the ``hosts`` string from the JSON body.

    Requires a valid API token and increments the ``hosts_post`` counter.
    The body must be a JSON object of the form ``{"hosts": "<file text>"}``.

    Returns:
        ``{"message": "File updated successfully"}`` on success;
        ``({"error": ...}, 400)`` when the body is not a JSON object, lacks the
        ``hosts`` key, or ``hosts`` is not a string;
        ``({"error": <message>}, 500)`` when writing the file fails.
    """
    require_auth()
    metrics["hosts_post"] += 1
    data = request.get_json()

    # A JSON string or list would also pass an ``in`` test, then fail on
    # ``data["hosts"]``; accept only JSON objects.
    if not isinstance(data, dict) or "hosts" not in data:
        logger.warning(f"/hosts POST: missing 'hosts' key from {request.remote_addr}")
        return jsonify({"error": "Invalid request, missing 'hosts' key"}), 400

    new_content = data["hosts"]
    # Validate before open(): mode 'w' truncates the file immediately, so a
    # write() that fails on a non-string value would leave /etc/hosts empty.
    if not isinstance(new_content, str):
        logger.warning(f"/hosts POST: 'hosts' is not a string from {request.remote_addr}")
        return jsonify({"error": "Invalid request, 'hosts' must be a string"}), 400

    try:
        with open('/etc/hosts', 'w') as f:
            f.write(new_content)
        logger.info(f"/hosts POST updated by {request.remote_addr}")
        return jsonify({"message": "File updated successfully"})
    except Exception as e:
        logger.error(f"/hosts POST error: {str(e)}")
        return jsonify({"error": str(e)}), 500
# Endpoint /health: returns the service status.
@app.route('/health', methods=['GET'])
def health():
    """Report liveness together with the current UTC time and an uptime figure.

    Requires a valid API token.

    Returns:
        JSON with ``status`` ("ok"), ``time`` (ISO-8601, UTC) and ``uptime``
        (seconds elapsed since the boot time reported by ``psutil.boot_time()``).
    """
    require_auth()
    # NOTE(review): this measures time since machine boot, not since the
    # daemon started -- confirm that is the intended meaning of "uptime".
    seconds_up = time.time() - psutil.boot_time()
    timestamp = datetime.now(timezone.utc).isoformat()
    logger.info(f"/health check from {request.remote_addr}")
    body = {"status": "ok", "time": timestamp, "uptime": seconds_up}
    return jsonify(body)
# Endpoint /metrics: returns the collected metrics.
@app.route('/metrics', methods=['GET'])
def metrics_endpoint():
    """Return request counters and average handling times as JSON.

    Requires a valid API token.

    Returns:
        JSON with ``total_requests``, ``avg_response_time``, a per-path
        ``endpoints`` mapping of ``{"count", "avg_time"}``, and the
        ``hosts_get`` / ``hosts_post`` counters. Averages are 0.0 when the
        corresponding count is zero.
    """
    require_auth()

    request_total = metrics["total_requests"]
    if request_total > 0:
        overall_avg = metrics["total_time"] / request_total
    else:
        overall_avg = 0.0

    per_endpoint = {
        path: {
            "count": stats["count"],
            "avg_time": stats["total_time"] / stats["count"] if stats["count"] > 0 else 0.0,
        }
        for path, stats in metrics["endpoints"].items()
    }

    summary = {
        "total_requests": metrics["total_requests"],
        "avg_response_time": overall_avg,
        "endpoints": per_endpoint,
        "hosts_get": metrics.get("hosts_get", 0),
        "hosts_post": metrics.get("hosts_post", 0),
    }
    logger.info(f"/metrics accessed by {request.remote_addr}")
    return jsonify(summary)
if __name__ == '__main__':
    # When run as a script: listen on all interfaces, port 8000, passing the
    # certificate/key pair below to Flask's built-in server as its ssl_context.
    tls_cert = '/opt/hosts_daemon/ssl/hosts_daemon.crt'
    tls_key = '/opt/hosts_daemon/ssl/hosts_daemon.key'
    app.run(host='0.0.0.0', port=8000, ssl_context=(tls_cert, tls_key))