Files
skrypty_narzedzia/service_watcher.py
2025-10-07 11:55:34 +02:00

168 lines
5.5 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
service_watcher.py — prosty watcher do crona
DZIAŁANIE
- Oblicza sumę SHA256 monitorowanego pliku.
- Porównuje z poprzednią wartością zapisaną w /dev/shm (tmpfs, RAM).
- Gdy są zmiany → restartuje podane usługi (systemd) i aktualizuje sumę.
UŻYCIE
python3 service_watcher.py -f /ścieżka/do/pliku -s nginx -s php-fpm
python3 service_watcher.py -f /etc/nginx/nginx.conf -s nginx
# lista po przecinku też działa:
python3 service_watcher.py -f /etc/my.conf -s "nginx,php-fpm@1,redis"
CRON (przykład: co minutę)
* * * * * /usr/bin/python3 /usr/local/bin/service_watcher.py -f /etc/nginx/nginx.conf -s nginx
* * * * * /usr/bin/python3 /usr/local/bin/service_watcher.py -f /etc/hosts -s dnsmasq.service -s dnsdist -s unbound -q
WYMAGANIA
- systemd (systemctl), uprawnienia do restartu usług (root lub sudoers).
- Python 3.6+
OPCJE
-f/--file monitorowany plik (wymagane)
-s/--service usługa; można podawać wiele razy lub jako CSV
--state-dir katalog na stan (domyślnie: /dev/shm/service_watcher)
--no-hash użyj tylko mtime+size (szybciej, mniej dokładnie)
-q/--quiet bez logów (kody wyjścia nadal znaczące)
KODY WYJŚCIA
0: brak zmian / restart(-y) OK
1: błąd wykonania
2: plik nie istnieje
"""
import argparse
import hashlib
import os
import sys
import json
import fcntl
import subprocess
from pathlib import Path
from typing import List
def parse_args():
p = argparse.ArgumentParser(description="Restartuje usługi po zmianie pliku.")
p.add_argument("-f", "--file", required=True, help="Monitorowany plik.")
p.add_argument("-s", "--service", action="append", default=[],
help="Usługa (systemd). Można powtarzać lub podać CSV.")
p.add_argument("--state-dir", default="/dev/shm/service_watcher",
help="Katalog na stan (domyślnie: /dev/shm/service_watcher).")
p.add_argument("--no-hash", action="store_true",
help="Porównuj tylko mtime+size (bez SHA256).")
p.add_argument("-q", "--quiet", action="store_true", help="Tryb cichy.")
return p.parse_args()
def log(msg: str, quiet: bool):
if not quiet:
print(msg)
def split_services(svcs: List[str]) -> List[str]:
out = []
for s in svcs:
if s:
out.extend([x.strip() for x in s.split(",") if x.strip()])
return list(dict.fromkeys(out)) # bez duplikatów, zachowaj kolejność
def file_signature(path: Path, use_hash: bool) -> str:
st = path.stat()
base = f"{st.st_mtime_ns}:{st.st_size}"
if not use_hash:
return base
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return base + ":" + h.hexdigest()
def lockfile(path: Path):
path.parent.mkdir(parents=True, exist_ok=True)
f = path.open("a+")
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
return f
def state_paths(state_dir: Path, target_file: Path):
safe = target_file.as_posix().replace("/", "_")
meta = state_dir / f"{safe}.json"
lck = state_dir / f"{safe}.lock"
return meta, lck
def load_state(p: Path) -> dict:
if not p.exists():
return {}
try:
return json.loads(p.read_text() or "{}")
except Exception:
return {}
def save_state(p: Path, data: dict):
p.parent.mkdir(parents=True, exist_ok=True)
tmp = p.with_suffix(".json.tmp")
tmp.write_text(json.dumps(data, ensure_ascii=False))
os.replace(tmp, p)
def restart_service(svc: str) -> subprocess.CompletedProcess:
# pozwala działać z sudoers bez hasła dla systemctl
return subprocess.run(
["systemctl", "restart", svc],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=False
)
def main():
args = parse_args()
file_path = Path(args.file)
if not file_path.exists():
print(f"Plik nie istnieje: {file_path}", file=sys.stderr)
sys.exit(2)
services = split_services(args.service)
if not services:
print("Podaj co najmniej jedną usługę (-s/--service).", file=sys.stderr)
sys.exit(1)
state_dir = Path(args.state_dir)
meta_path, lock_path = state_paths(state_dir, file_path)
# blokada współbieżnych uruchomień (np. overlap w cron)
with lockfile(lock_path):
sig = file_signature(file_path, use_hash=not args.no_hash)
state = load_state(meta_path)
prev = state.get("signature")
if prev == sig:
log("Brak zmian — nic do zrobienia.", args.quiet)
sys.exit(0)
log(f"Wykryto zmianę w {file_path}. Restart usług: {', '.join(services)}", args.quiet)
failures = []
for svc in services:
res = restart_service(svc)
if res.returncode != 0:
failures.append((svc, res.stderr.strip()))
log(f"[FAIL] {svc}: {res.stderr.strip()}", args.quiet)
else:
log(f"[OK] {svc}", args.quiet)
if failures:
# nie zapisuj nowej sygnatury, aby ponowić przy następnym uruchomieniu
print("Co najmniej jeden restart nie powiódł się.", file=sys.stderr)
for svc, err in failures:
print(f"{svc}: {err}", file=sys.stderr)
sys.exit(1)
# zapisz nową sygnaturę po udanych restartach
state.update({"signature": sig})
save_state(meta_path, state)
log("Zakończono pomyślnie.", args.quiet)
sys.exit(0)
if __name__ == "__main__":
main()