nowe funkcje

This commit is contained in:
Mateusz Gruszczyński 2025-03-06 19:20:06 +01:00
parent e3f26f546e
commit 556d74816e
2 changed files with 109 additions and 13 deletions

120
app.py
View File

@ -3,17 +3,18 @@ import os
import time import time
import logging import logging
import difflib import difflib
import platform
import psutil
from flask import Flask, request, jsonify, abort from flask import Flask, request, jsonify, abort
from flask_sslify import SSLify from flask_sslify import SSLify
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from croniter import croniter from croniter import croniter
import psutil
app = Flask(__name__) app = Flask(__name__)
sslify = SSLify(app) sslify = SSLify(app)
# Konfiguracja tokenu autoryzacyjnego (przyjmujemy pre-shared secret) # Konfiguracja tokenu autoryzacyjnego (przyjmujemy pre-shared secret)
API_TOKEN = os.environ.get("HOSTS_DAEMON_API_TOKEN", "changeme") API_TOKEN = os.environ.get("HOSTS_DAEMON_API_TOKEN", "superSecretTokenABC123")
# Konfiguracja logowania logi zapisywane do pliku /opt/hosts_daemon/logs/daemon.log # Konfiguracja logowania logi zapisywane do pliku /opt/hosts_daemon/logs/daemon.log
LOG_DIR = "/opt/hosts_daemon/logs" LOG_DIR = "/opt/hosts_daemon/logs"
@ -38,13 +39,63 @@ metrics = {
"hosts_post": 0, "hosts_post": 0,
} }
# Funkcja sprawdzająca autoryzację # ------------------
# FUNKCJE POMOCNICZE
# ------------------
def require_auth(): def require_auth():
"""Wymusza autoryzację przy pomocy nagłówka Authorization, który powinien zawierać API_TOKEN."""
token = request.headers.get("Authorization") token = request.headers.get("Authorization")
if token != API_TOKEN: if token != API_TOKEN:
abort(401, description="Unauthorized") abort(401, description="Unauthorized")
# Logowanie i zbieranie metryk logujemy czas rozpoczęcia, adres IP, endpoint i metodę def validate_hosts_syntax(hosts_content):
"""
Sprawdza podstawową poprawność składni /etc/hosts:
- Pomija puste linie i komentarze (zaczynające się od '#')
- Wymaga poprawnego IP (IPv4 lub IPv6) na początku
- Wymaga co najmniej jednego hostnname w linii
- Wykrywa powielone wpisy (IP+hostname) w obrębie *tej* zawartości
(ograniczone do nowej treści, nie sprawdza starych plików).
Zwraca None, jeśli OK, w przeciwnym razie string z opisem błędu.
"""
# Słownik do wykrywania duplikatów: (ip, hostname) -> bool
seen = {}
lines = hosts_content.splitlines()
for i, line in enumerate(lines, start=1):
line_strip = line.strip()
# Pomijamy puste i komentarze
if not line_strip or line_strip.startswith('#'):
continue
parts = line_strip.split()
if len(parts) < 2:
return f"Linia {i}: Za mało elementów, wymagane IP oraz co najmniej jeden hostname."
ip_addr = parts[0]
hostnames = parts[1:]
# Prosta weryfikacja IP - w Pythonie można użyć ipaddress, ale zrobimy skrótowo:
import ipaddress
try:
_ = ipaddress.ip_address(ip_addr)
except ValueError:
return f"Linia {i}: '{ip_addr}' nie jest poprawnym adresem IP"
if not hostnames:
return f"Linia {i}: brak hostnamów."
for hn in hostnames:
key = (ip_addr, hn)
if key in seen:
return f"Linia {i}: duplikat wpisu {ip_addr} -> {hn}"
seen[key] = True
return None
# ------------------
# HOOKS LOGOWANIA / METRYK
# ------------------
@app.before_request @app.before_request
def before_request_logging(): def before_request_logging():
request.start_time = time.time() request.start_time = time.time()
@ -66,7 +117,16 @@ def after_request_logging(response):
logger.info(f"Completed {endpoint} in {elapsed:.3f} sec with status {response.status_code}") logger.info(f"Completed {endpoint} in {elapsed:.3f} sec with status {response.status_code}")
return response return response
# Endpoint GET /hosts pobiera zawartość /etc/hosts
# ------------------
# ENDPOINTY
# ------------------
@app.route('/', methods=['GET'])
def root_index():
# Możesz zwrócić coś bardziej przyjaznego niż 404
return jsonify({"info": "hosts_daemon is running. Try /health or /hosts"}), 200
@app.route('/hosts', methods=['GET']) @app.route('/hosts', methods=['GET'])
def get_hosts(): def get_hosts():
require_auth() require_auth()
@ -80,7 +140,6 @@ def get_hosts():
logger.error(f"/hosts GET error: {str(e)}") logger.error(f"/hosts GET error: {str(e)}")
return jsonify({"error": str(e)}), 500 return jsonify({"error": str(e)}), 500
# Endpoint POST /hosts aktualizuje plik /etc/hosts
@app.route('/hosts', methods=['POST']) @app.route('/hosts', methods=['POST'])
def update_hosts(): def update_hosts():
require_auth() require_auth()
@ -89,7 +148,14 @@ def update_hosts():
if not data or "hosts" not in data: if not data or "hosts" not in data:
logger.warning(f"/hosts POST: missing 'hosts' key from {request.remote_addr}") logger.warning(f"/hosts POST: missing 'hosts' key from {request.remote_addr}")
return jsonify({"error": "Invalid request, missing 'hosts' key"}), 400 return jsonify({"error": "Invalid request, missing 'hosts' key"}), 400
new_content = data["hosts"] new_content = data["hosts"]
# Walidacja nowej zawartości:
error_msg = validate_hosts_syntax(new_content)
if error_msg:
logger.error(f"/hosts POST validation error: {error_msg}")
return jsonify({"error": error_msg}), 400
try: try:
with open('/etc/hosts', 'w') as f: with open('/etc/hosts', 'w') as f:
f.write(new_content) f.write(new_content)
@ -99,19 +165,21 @@ def update_hosts():
logger.error(f"/hosts POST error: {str(e)}") logger.error(f"/hosts POST error: {str(e)}")
return jsonify({"error": str(e)}), 500 return jsonify({"error": str(e)}), 500
# Endpoint /health zwraca status usługi
@app.route('/health', methods=['GET']) @app.route('/health', methods=['GET'])
def health(): def health():
#require_auth() # Endpoint nie wymaga tokenu
uptime = time.time() - psutil.boot_time() uptime = time.time() - psutil.boot_time()
now = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc).isoformat()
logger.info(f"/health check from {request.remote_addr}") logger.info(f"/health check from {request.remote_addr}")
return jsonify({"status": "ok", "time": now, "uptime": uptime}) return jsonify({
"status": "ok",
"time": now,
"uptime": f"{uptime:.1f} seconds"
}), 200
# Endpoint /metrics zwraca metryki
@app.route('/metrics', methods=['GET']) @app.route('/metrics', methods=['GET'])
def metrics_endpoint(): def metrics_endpoint():
#require_auth() # Endpoint nie wymaga tokenu
avg_time = metrics["total_time"] / metrics["total_requests"] if metrics["total_requests"] > 0 else 0.0 avg_time = metrics["total_time"] / metrics["total_requests"] if metrics["total_requests"] > 0 else 0.0
ep_data = {} ep_data = {}
for ep, data in metrics["endpoints"].items(): for ep, data in metrics["endpoints"].items():
@ -125,7 +193,33 @@ def metrics_endpoint():
"hosts_post": metrics.get("hosts_post", 0) "hosts_post": metrics.get("hosts_post", 0)
} }
logger.info(f"/metrics accessed by {request.remote_addr}") logger.info(f"/metrics accessed by {request.remote_addr}")
return jsonify(response_data) return jsonify(response_data), 200
@app.route('/system-info', methods=['GET'])
def system_info():
# Przykładowe dane systemowe:
info = {}
# CPU
info["cpu_percent"] = psutil.cpu_percent(interval=0.1)
# RAM
mem = psutil.virtual_memory()
info["memory_total"] = mem.total
info["memory_used"] = mem.used
info["memory_percent"] = mem.percent
# Dysk (root '/')
disk = psutil.disk_usage('/')
info["disk_total"] = disk.total
info["disk_used"] = disk.used
info["disk_percent"] = disk.percent
# Dystrybucja, wersja
dist = platform.platform() # np. 'Linux-5.4.0-135-generic-x86_64-with-glibc2.29'
info["platform"] = dist
# Uptime
sys_uptime = time.time() - psutil.boot_time()
info["uptime_seconds"] = sys_uptime
logger.info(f"/system-info accessed by {request.remote_addr}")
return jsonify(info), 200
if __name__ == '__main__': if __name__ == '__main__':
app.run( app.run(

2
config.ini Normal file
View File

@ -0,0 +1,2 @@
[daemon]
API_TOKEN = superSecretTokenABC123