#!/usr/bin/env python3 """ Skrypt sprawdzający dostępność usług DNS na portach 53, 54, 55 i restartujący je jeśli połączenie z internetem jest dostępne """ import dns.resolver import dns.message import dns.query import subprocess import sys import logging import socket from datetime import datetime logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('/var/log/dns_check_restart.log'), logging.StreamHandler() ] ) DNS_PORTS = { 53: {'service': 'dnsdist', 'name': 'dnsdist'}, 54: {'service': 'dnsmasq', 'name': 'dnsmasq'}, 55: {'service': 'unbound', 'name': 'unbound'} } TIMEOUT = 3 def check_internet_connection(): """Sprawdza czy jest dostępne połączenie z internetem używając zewnętrznych DNS""" logging.info("Sprawdzam połączenie z internetem...") external_resolvers = ['8.8.8.8', '1.1.1.1', '9.9.9.9'] for resolver_ip in external_resolvers: try: resolver = dns.resolver.Resolver() resolver.nameservers = [resolver_ip] resolver.timeout = TIMEOUT resolver.lifetime = TIMEOUT answer = resolver.resolve('google.com', 'A') if answer: logging.info(f"Połączenie z internetem OK (resolver: {resolver_ip})") return True except Exception as e: logging.debug(f"Błąd przy sprawdzaniu z {resolver_ip}: {e}") continue logging.warning("Brak połączenia z internetem") return False def test_dns_port(port, server='127.0.0.1'): """Testuje czy DNS odpowiada na danym porcie używając niskopoziomowego dns.query""" logging.info(f"Testuję DNS na porcie {port} (serwer: {server})...") try: # Tworzymy zapytanie DNS dla google.com query = dns.message.make_query('google.com', dns.rdatatype.A) # Wysyłamy zapytanie UDP response = dns.query.udp(query, server, timeout=TIMEOUT, port=port) # Sprawdzamy czy otrzymaliśmy odpowiedź if response and len(response.answer) > 0: logging.info(f"Port {port}: OK - otrzymano odpowiedź ({len(response.answer)} rekordów)") return True elif response: # Otrzymaliśmy odpowiedź, ale bez answer section (może NXDOMAIN) logging.info(f"Port {port}: OK - serwer odpowiada (rcode: {response.rcode()})") return True else: logging.error(f"Port {port}: BŁĄD - brak odpowiedzi") return False except dns.exception.Timeout: logging.error(f"Port {port}: TIMEOUT - brak odpowiedzi") return False except socket.error as e: logging.error(f"Port {port}: BŁĄD SOCKET - {e}") return False except Exception as e: logging.error(f"Port {port}: BŁĄD - {type(e).__name__}: {e}") return False def restart_service(service_name): """Restartuje usługę systemd""" try: logging.info(f"Restartuję usługę {service_name}...") result = subprocess.run( ['systemctl', 'restart', service_name], capture_output=True, text=True, timeout=30 ) if result.returncode == 0: logging.info(f"Usługa {service_name} zrestartowana pomyślnie") return True else: logging.error(f"Błąd restartowania {service_name}: {result.stderr}") return False except subprocess.TimeoutExpired: logging.error(f"Timeout przy restartowaniu {service_name}") return False except Exception as e: logging.error(f"Wyjątek przy restartowaniu {service_name}: {e}") return False def main(): logging.info("=" * 60) logging.info("START: DNS Health Check & Restart Script") logging.info("=" * 60) if not check_internet_connection(): logging.error("Brak połączenia z internetem - przerywam działanie skryptu") sys.exit(1) results = {} services_to_restart = [] for port, info in DNS_PORTS.items(): # Wszystkie testy wykonujemy na 127.0.0.1 is_working = test_dns_port(port, server='127.0.0.1') results[port] = is_working if not is_working: services_to_restart.append(info['service']) logging.info("") logging.info("=" * 60) logging.info("PODSUMOWANIE TESTÓW DNS:") for port, status in results.items(): status_text = "✓ OK" if status else "✗ BŁĄD" logging.info(f" Port {port} ({DNS_PORTS[port]['name']}): {status_text}") logging.info("=" * 60) logging.info("") if services_to_restart: logging.info(f"Usługi do restartu: {', '.join(services_to_restart)}") for service in services_to_restart: restart_service(service) logging.info("") logging.info("Oczekuję 5 sekund na uruchomienie usług...") import time time.sleep(5) logging.info("") logging.info("PONOWNE SPRAWDZENIE PO RESTARCIE:") for port, info in DNS_PORTS.items(): if info['service'] in services_to_restart: is_working = test_dns_port(port, server='127.0.0.1') status_text = "✓ DZIAŁA" if is_working else "✗ NADAL NIE DZIAŁA" logging.info(f" Port {port} ({info['name']}): {status_text}") else: logging.info("Wszystkie usługi DNS działają prawidłowo - brak potrzeby restartu") logging.info("=" * 60) logging.info("KONIEC: DNS Health Check & Restart Script") logging.info("=" * 60) if __name__ == "__main__": main()