fix with token

This commit is contained in:
Mateusz Gruszczyński 2025-03-07 08:24:35 +01:00
parent 86d4bd1904
commit 26643e1aa3

67
app.py
View File

@ -13,11 +13,16 @@ from croniter import croniter
app = Flask(__name__)
sslify = SSLify(app)
# Ścieżka do pliku z tokenem:
TOKEN_FILE_PATH = "/opt/hosts_daemon/token"
#
# DYNAMICZNE ŚCIEŻKI USTAWIANE PRZEZ ZMIENNE ŚRODOWISKOWE
# ------------------------------------------------------
# - Jeśli zmienna nie jest ustawiona, używamy wartości domyślnej.
#
LOG_DIR = os.environ.get("HOSTS_DAEMON_LOG_DIR", "logs")
TOKEN_FILE_PATH = os.environ.get("HOSTS_DAEMON_TOKEN_FILE", "daemon_token.txt")
def read_token_from_file(path):
"""Odczytuje token z pliku i zwraca jego zawartość (strip),
"""Odczytuje token z pliku i zwraca jego zawartość (strip),
albo None, jeśli plik nie istnieje lub jest pusty."""
if os.path.isfile(path):
try:
@ -26,14 +31,18 @@ def read_token_from_file(path):
if content:
return content
except Exception as e:
# Możesz zalogować błąd, jeśli np. plik jest niewidoczny/brak uprawnień
logger.error(f"Nie udało się odczytać pliku tokenu: {str(e)}")
return None
# LOGOWANIE
LOG_DIR = "/opt/hosts_daemon/logs"
# Na tym etapie nie mamy jeszcze loggera, więc jego konfiguracja będzie poniżej
#
# KONFIGURACJA LOGOWANIA
# ------------------------------------------------------
# Upewniamy się, że katalog logów istnieje
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "daemon.log")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
@ -44,24 +53,24 @@ logging.basicConfig(
)
logger = logging.getLogger("hosts_daemon")
# 1) Najpierw próbujemy odczytać token z pliku
file_token = read_token_from_file(TOKEN_FILE_PATH)
# 2) Jeśli w pliku nie ma tokenu, sprawdzamy zmienną środowiskową
#
# WYCZYTUJEMY TOKEN
# ------------------------------------------------------
file_token = read_token_from_file(TOKEN_FILE_PATH)
if file_token:
# Użyjemy tokenu z pliku
API_TOKEN = file_token
logger.info("API_TOKEN wczytany z pliku.")
logger.info(f"API_TOKEN wczytany z pliku: {TOKEN_FILE_PATH}")
else:
# Fallback: odczytujemy z ENV albo dajemy domyślny
env_token = os.environ.get("HOSTS_DAEMON_API_TOKEN")
if env_token:
API_TOKEN = env_token
logger.info("API_TOKEN wczytany ze zmiennej środowiskowej.")
logger.info("API_TOKEN wczytany ze zmiennej środowiskowej HOSTS_DAEMON_API_TOKEN.")
else:
API_TOKEN = "superSecretTokenABC123"
logger.info("API_TOKEN ustawiony na wartość domyślną: superSecretTokenABC123")
# Globalne metryki
metrics = {
"total_requests": 0,
@ -76,7 +85,8 @@ metrics = {
# ------------------
def require_auth():
"""Wymusza autoryzację przy pomocy nagłówka Authorization, który powinien zawierać API_TOKEN."""
"""Wymusza autoryzację przy pomocy nagłówka Authorization,
który powinien zawierać API_TOKEN."""
token = request.headers.get("Authorization")
logger.info(f"require_auth() -> Nagłówek Authorization: {token}")
if token != API_TOKEN:
@ -84,17 +94,7 @@ def require_auth():
abort(401, description="Unauthorized")
def validate_hosts_syntax(hosts_content):
"""
Sprawdza podstawową poprawność składni /etc/hosts:
- Pomija puste linie i komentarze (zaczynające się od '#')
- Wymaga poprawnego IP (IPv4 lub IPv6) na początku
- Wymaga co najmniej jednego hostnname w linii
- Wykrywa powielone wpisy (IP+hostname) w obrębie *tej* zawartości
(ograniczone do nowej treści, nie sprawdza starych plików).
Zwraca None, jeśli OK, w przeciwnym razie string z opisem błędu.
"""
import ipaddress
seen = {}
lines = hosts_content.splitlines()
for i, line in enumerate(lines, start=1):
@ -155,7 +155,6 @@ def after_request_logging(response):
@app.route('/', methods=['GET'])
def root_index():
# Możesz zwrócić coś bardziej przyjaznego niż 404
return jsonify({"info": "hosts_daemon is running. Try /health or /hosts"}), 200
@app.route('/hosts', methods=['GET'])
@ -181,7 +180,6 @@ def update_hosts():
return jsonify({"error": "Invalid request, missing 'hosts' key"}), 400
new_content = data["hosts"]
# Walidacja nowej zawartości:
error_msg = validate_hosts_syntax(new_content)
if error_msg:
logger.error(f"/hosts POST validation error: {error_msg}")
@ -228,24 +226,20 @@ def metrics_endpoint():
@app.route('/system-info', methods=['GET'])
def system_info():
# Przykładowe dane systemowe:
info = {}
# CPU
info["cpu_percent"] = psutil.cpu_percent(interval=0.1)
# RAM
mem = psutil.virtual_memory()
info["memory_total"] = mem.total
info["memory_used"] = mem.used
info["memory_percent"] = mem.percent
# Dysk (root '/')
disk = psutil.disk_usage('/')
info["disk_total"] = disk.total
info["disk_used"] = disk.used
info["disk_percent"] = disk.percent
# Dystrybucja, wersja
dist = platform.platform() # np. 'Linux-5.4.0-135-generic-x86_64-with-glibc2.29'
dist = platform.platform()
info["platform"] = dist
# Uptime
sys_uptime = time.time() - psutil.boot_time()
info["uptime_seconds"] = sys_uptime
@ -253,10 +247,11 @@ def system_info():
return jsonify(info), 200
if __name__ == '__main__':
# Możesz zalogować tylko informację, że startujesz:
logger.info("Uruchamiam hosts_daemon token wczytany, nasłuch na porcie 8000 (HTTPS).")
logger.info("Uruchamiam hosts_daemon nasłuch na porcie 8000 (HTTPS).")
logger.info(f"LOG_DIR: {LOG_DIR}")
logger.info(f"TOKEN_FILE_PATH: {TOKEN_FILE_PATH}")
app.run(
host='0.0.0.0',
port=8000,
ssl_context=('/opt/hosts_daemon/ssl/hosts_daemon.crt', '/opt/hosts_daemon/ssl/hosts_daemon.key')
ssl_context=('ssl/hosts_daemon.crt', 'ssl/hosts_daemon.key')
)