Files
tools_scripts/usr/local/bin/check_dns_restart.py

172 lines
5.6 KiB
Python

#!/usr/bin/env python3
"""
Skrypt sprawdzający dostępność usług DNS na portach 53, 54, 55
i restartujący je jeśli połączenie z internetem jest dostępne
"""
import dns.resolver
import dns.message
import dns.query
import subprocess
import sys
import logging
import socket
from datetime import datetime
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/dns_check_restart.log'),
logging.StreamHandler()
]
)
DNS_PORTS = {
53: {'service': 'dnsdist', 'name': 'dnsdist'},
54: {'service': 'dnsmasq', 'name': 'dnsmasq'},
55: {'service': 'unbound', 'name': 'unbound'}
}
TIMEOUT = 3
def check_internet_connection():
"""Sprawdza czy jest dostępne połączenie z internetem używając zewnętrznych DNS"""
logging.info("Sprawdzam połączenie z internetem...")
external_resolvers = ['8.8.8.8', '1.1.1.1', '9.9.9.9']
for resolver_ip in external_resolvers:
try:
resolver = dns.resolver.Resolver()
resolver.nameservers = [resolver_ip]
resolver.timeout = TIMEOUT
resolver.lifetime = TIMEOUT
answer = resolver.resolve('google.com', 'A')
if answer:
logging.info(f"Połączenie z internetem OK (resolver: {resolver_ip})")
return True
except Exception as e:
logging.debug(f"Błąd przy sprawdzaniu z {resolver_ip}: {e}")
continue
logging.warning("Brak połączenia z internetem")
return False
def test_dns_port(port, server='127.0.0.1'):
"""Testuje czy DNS odpowiada na danym porcie używając niskopoziomowego dns.query"""
logging.info(f"Testuję DNS na porcie {port} (serwer: {server})...")
try:
# Tworzymy zapytanie DNS dla google.com
query = dns.message.make_query('google.com', dns.rdatatype.A)
# Wysyłamy zapytanie UDP
response = dns.query.udp(query, server, timeout=TIMEOUT, port=port)
# Sprawdzamy czy otrzymaliśmy odpowiedź
if response and len(response.answer) > 0:
logging.info(f"Port {port}: OK - otrzymano odpowiedź ({len(response.answer)} rekordów)")
return True
elif response:
# Otrzymaliśmy odpowiedź, ale bez answer section (może NXDOMAIN)
logging.info(f"Port {port}: OK - serwer odpowiada (rcode: {response.rcode()})")
return True
else:
logging.error(f"Port {port}: BŁĄD - brak odpowiedzi")
return False
except dns.exception.Timeout:
logging.error(f"Port {port}: TIMEOUT - brak odpowiedzi")
return False
except socket.error as e:
logging.error(f"Port {port}: BŁĄD SOCKET - {e}")
return False
except Exception as e:
logging.error(f"Port {port}: BŁĄD - {type(e).__name__}: {e}")
return False
def restart_service(service_name):
"""Restartuje usługę systemd"""
try:
logging.info(f"Restartuję usługę {service_name}...")
result = subprocess.run(
['systemctl', 'restart', service_name],
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
logging.info(f"Usługa {service_name} zrestartowana pomyślnie")
return True
else:
logging.error(f"Błąd restartowania {service_name}: {result.stderr}")
return False
except subprocess.TimeoutExpired:
logging.error(f"Timeout przy restartowaniu {service_name}")
return False
except Exception as e:
logging.error(f"Wyjątek przy restartowaniu {service_name}: {e}")
return False
def main():
logging.info("=" * 60)
logging.info("START: DNS Health Check & Restart Script")
logging.info("=" * 60)
if not check_internet_connection():
logging.error("Brak połączenia z internetem - przerywam działanie skryptu")
sys.exit(1)
results = {}
services_to_restart = []
for port, info in DNS_PORTS.items():
# Wszystkie testy wykonujemy na 127.0.0.1
is_working = test_dns_port(port, server='127.0.0.1')
results[port] = is_working
if not is_working:
services_to_restart.append(info['service'])
logging.info("")
logging.info("=" * 60)
logging.info("PODSUMOWANIE TESTÓW DNS:")
for port, status in results.items():
status_text = "✓ OK" if status else "✗ BŁĄD"
logging.info(f" Port {port} ({DNS_PORTS[port]['name']}): {status_text}")
logging.info("=" * 60)
logging.info("")
if services_to_restart:
logging.info(f"Usługi do restartu: {', '.join(services_to_restart)}")
for service in services_to_restart:
restart_service(service)
logging.info("")
logging.info("Oczekuję 5 sekund na uruchomienie usług...")
import time
time.sleep(5)
logging.info("")
logging.info("PONOWNE SPRAWDZENIE PO RESTARCIE:")
for port, info in DNS_PORTS.items():
if info['service'] in services_to_restart:
is_working = test_dns_port(port, server='127.0.0.1')
status_text = "✓ DZIAŁA" if is_working else "✗ NADAL NIE DZIAŁA"
logging.info(f" Port {port} ({info['name']}): {status_text}")
else:
logging.info("Wszystkie usługi DNS działają prawidłowo - brak potrzeby restartu")
logging.info("=" * 60)
logging.info("KONIEC: DNS Health Check & Restart Script")
logging.info("=" * 60)
if __name__ == "__main__":
main()