From 40e8a092fc59edf28b6d2b55c28b4c0122f042df Mon Sep 17 00:00:00 2001 From: gru Date: Sat, 23 Aug 2025 19:05:35 +0200 Subject: [PATCH] Update check_mdns.py --- check_mdns.py | 104 +++++++++++++++++++++++++------------------------- 1 file changed, 53 insertions(+), 51 deletions(-) diff --git a/check_mdns.py b/check_mdns.py index 5490c63..3451231 100644 --- a/check_mdns.py +++ b/check_mdns.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 - import sys import argparse import socket @@ -24,15 +23,14 @@ for mod, pkg in REQUIRED_MODULES.items(): missing.append(pkg) if missing: - print("BŁĄD: Brakuje wymaganych modułów Pythona:") + print("UNKNOWN: Brakuje wymaganych modułów Pythona:") for pkg in missing: print(f" - {pkg}") - print("\nZainstaluj je poleceniem:") - print(f"pip install {' '.join(missing)}") + print(f"\nZainstaluj je poleceniem:\n pip install {' '.join(missing)}") sys.exit(3) import netifaces -from zeroconf import Zeroconf, ServiceBrowser, ServiceListener, ServiceInfo +from zeroconf import Zeroconf, ServiceBrowser, ServiceListener MDNS_TYPES = [ "_http._tcp.local.", @@ -48,6 +46,7 @@ class MDNSListener(ServiceListener): if name not in self.services: self.services[name] = type_ + def get_ip_for_interface(interface_name): try: addrs = netifaces.ifaddresses(interface_name) @@ -58,6 +57,7 @@ def get_ip_for_interface(interface_name): return None return None + def get_zeroconf_instance(interface=None): if interface: try: @@ -70,6 +70,7 @@ def get_zeroconf_instance(interface=None): else: return Zeroconf() + def resolve_service(zeroconf, name, type_): info = zeroconf.get_service_info(type_, name) if info: @@ -83,6 +84,7 @@ def resolve_service(zeroconf, name, type_): } return None + def test_service(ip, port): try: conn = http.client.HTTPConnection(ip, port, timeout=5) @@ -92,6 +94,7 @@ def test_service(ip, port): except Exception: return False + def scan_mdns_services(timeout=5, interface=None, filter_type=None): zeroconf = get_zeroconf_instance(interface) listener = MDNSListener() @@ -108,6 +111,7 @@ def scan_mdns_services(timeout=5, interface=None, filter_type=None): finally: zeroconf.close() + def export_services(services, format): if format == 'json': with open('mdns_services.json', 'w') as f: @@ -121,37 +125,11 @@ def export_services(services, format): writer.writerow([name, info['type'], info['ip'], info['port'], info['hostname']]) print("Eksportowano do mdns_services.csv") -def print_scan_report(services, script_path, interface=None, test_mode=None): - print("\n📡 Wykryte urządzenia mDNS:\n") - for name, info in services.items(): - proto_clean = info['type'].split('.')[0].strip('_') - if test_mode in ['all', proto_clean]: - from http.client import HTTPConnection - try: - conn = HTTPConnection(info['ip'], info['port'], timeout=5) - conn.request("GET", "/") - resp = conn.getresponse() - test_status = "OK" if resp.status == 200 else f"Błąd {resp.status}" - except Exception as e: - test_status = "NIE" - else: - test_status = "-" - - print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, test: {test_status}, szczegóły: {info['properties'] if info['properties'] else 'brak'}]") - - print("\n🔧 Propozycje komend do użycia z Nagiosem:\n") - for name, info in services.items(): - safe_device = name.split('.')[0].replace('"', '\\"') - cmd = f"{script_path} --device \"{safe_device}\"" - if interface: - cmd += f" --interface {interface}" - if test_mode: - cmd += f" --test {test_mode}" - print(f" {cmd}") def should_test(proto, test_mode): return test_mode == 'all' or test_mode == proto + def get_test_status(info, test_mode): proto_clean = info['type'].split('.')[0].strip('_') if should_test(proto_clean, test_mode): @@ -164,22 +142,23 @@ def get_test_status(info, test_mode): return "NIE" return "-" + def main(): - parser = argparse.ArgumentParser(description="Skrypt Nagios do wykrywania urządzeń mDNS (Bonjour, IPP, AirPrint) z testem działania, eksportem, regexami, filtrowaniem i pełnym podglądem usług.") - parser.add_argument("--device", help="Fragment lub regex nazwy urządzenia do wykrycia (np. 'printer|tv')") - parser.add_argument("--timeout", type=int, default=5, help="Czas oczekiwania na odpowiedzi mDNS (sekundy)") - parser.add_argument("--scan", action="store_true", help="Skanuj sieć i wypisz dostępne urządzenia z propozycjami komend") - parser.add_argument("--interface", help="Nazwa interfejsu sieciowego lub adres IP (np. eth1 lub 192.168.10.1)") - parser.add_argument("--export", choices=['json', 'csv'], help="Eksportuj wyniki skanu do pliku JSON lub CSV") - parser.add_argument("--filter-type", choices=[t.strip('.') for t in MDNS_TYPES], help="Filtruj tylko po określonym typie usługi (np. _ipp._tcp.local.)") - parser.add_argument("--match-property", help="Sprawdź czy w rekordach TXT znajduje się podana fraza (np. 'UUID')") - parser.add_argument("--service-mode", action="store_true", help="Tryb ciągły - nasłuchiwanie urządzeń i logowanie ich w czasie rzeczywistym") - parser.add_argument("--test", choices=['ipp', 'http', 'all'], help="Testuj działanie usług: ipp, http lub all") + parser = argparse.ArgumentParser(description="Skrypt Nagios do wykrywania urządzeń mDNS (Bonjour, IPP, AirPrint)") + parser.add_argument("--device", help="Fragment lub regex nazwy urządzenia (np. 'printer|tv')") + parser.add_argument("--timeout", type=int, default=5, help="Czas oczekiwania mDNS (sek)") + parser.add_argument("--scan", action="store_true", help="Skanuj sieć i wypisz urządzenia") + parser.add_argument("--interface", help="Interfejs sieciowy lub adres IP (np. eth1, 192.168.0.1)") + parser.add_argument("--export", choices=['json', 'csv'], help="Eksport wyników") + parser.add_argument("--filter-type", choices=[t.strip('.') for t in MDNS_TYPES], help="Filtruj usługę") + parser.add_argument("--match-property", help="Sprawdź frazę w rekordach TXT (np. 'UUID')") + parser.add_argument("--service-mode", action="store_true", help="Tryb ciągły (jeszcze niezaimplementowany)") + parser.add_argument("--test", choices=['ipp', 'http', 'all'], help="Testuj usługi: ipp, http lub all") args = parser.parse_args() if args.service_mode: - print("Tryb serwisowy nie został jeszcze zaimplementowany.") + print("UNKNOWN: Tryb serwisowy niezaimplementowany") sys.exit(3) if args.scan: @@ -187,7 +166,10 @@ def main(): services = scan_mdns_services(args.timeout, args.interface, args.filter_type) if services: script_path = os.path.realpath(__file__) - print_scan_report(services, script_path, args.interface, args.test) + print("\n📡 Wykryte urządzenia mDNS:\n") + for name, info in services.items(): + proto_clean = info['type'].split('.')[0].strip('_') + print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}]") if args.export: export_services(services, args.export) sys.exit(0) @@ -199,7 +181,7 @@ def main(): sys.exit(3) if not args.device: - print("BŁĄD: Musisz podać parametr --device (chyba że używasz --scan)") + print("UNKNOWN: Musisz podać parametr --device (chyba że używasz --scan)") sys.exit(3) try: @@ -208,30 +190,50 @@ def main(): matches = {k: v for k, v in services.items() if pattern.search(k)} if args.match_property: - matches = {k: v for k, v in matches.items() if any(args.match_property.lower() in (v['properties'] or {}).get(pk, '').lower() for pk in (v['properties'] or {}))} + matches = { + k: v for k, v in matches.items() + if any(args.match_property.lower() in (v['properties'] or {}).get(pk, '').lower() + for pk in (v['properties'] or {})) + } if matches: - print(f"OK: Urządzenie pasujące do '{args.device}' znalezione w mDNS:") all_ok = True + devices_output = [] for name, info in matches.items(): proto_clean = info['type'].split('.')[0].strip('_') test_status = get_test_status(info, args.test) if args.test and should_test(proto_clean, args.test) and test_status != "OK": all_ok = False - print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, test: {test_status}, szczegóły: {info['properties'] if info['properties'] else 'brak'}]") - sys.exit(0 if all_ok else 2) + devices_output.append( + f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, " + f"host: {info['hostname']}, test: {test_status}, " + f"szczegóły: {info['properties'] if info['properties'] else 'brak'}]" + ) + + if all_ok: + print(f"OK: Urządzenie pasujące do '{args.device}' znalezione w mDNS:") + for line in devices_output: + print(line) + sys.exit(0) + else: + print(f"WARNING: Urządzenie '{args.device}' znalezione, ale test usług się nie powiódł:") + for line in devices_output: + print(line) + sys.exit(1) elif services: - print(f"WARNING: Nie znaleziono pasujących urządzeń do '{args.device}'. Wykryto inne:") + print(f"WARNING: Nie znaleziono pasujących urządzeń do '{args.device}'. Dostępne inne:") for name, info in services.items(): proto_clean = info['type'].split('.')[0].strip('_') - print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}]") + print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}]") sys.exit(1) else: print("CRITICAL: Nie wykryto żadnych urządzeń mDNS.") sys.exit(2) + except Exception as e: print(f"UNKNOWN: Błąd podczas skanowania mDNS - {e}") sys.exit(3) + if __name__ == "__main__": main()