From fd5c9ef00c3be8cfc92aec8cec9b0a94ef6dea31 Mon Sep 17 00:00:00 2001 From: gru Date: Fri, 23 May 2025 10:00:08 +0200 Subject: [PATCH] Add check_mdns.py --- check_mdns.py | 192 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 check_mdns.py diff --git a/check_mdns.py b/check_mdns.py new file mode 100644 index 0000000..51cb49d --- /dev/null +++ b/check_mdns.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 + +import sys +import argparse +import socket +import time +import os +import json +import csv +import re + +# Weryfikacja zależności +REQUIRED_MODULES = { + 'zeroconf': 'zeroconf', + 'netifaces': 'netifaces' +} + +missing = [] +for mod, pkg in REQUIRED_MODULES.items(): + try: + __import__(mod) + except ImportError: + missing.append(pkg) + +if missing: + print("BŁĄD: Brakuje wymaganych modułów Pythona:") + for pkg in missing: + print(f" - {pkg}") + print("\nZainstaluj je poleceniem:") + print(f"pip install {' '.join(missing)}") + sys.exit(3) + +import netifaces +from zeroconf import Zeroconf, ServiceBrowser, ServiceListener, ServiceInfo + +MDNS_TYPES = [ + "_http._tcp.local.", + "_ipp._tcp.local.", + "_airprint._tcp.local." +] + +class MDNSListener(ServiceListener): + def __init__(self): + self.services = {} + + def add_service(self, zeroconf, type_, name): + if name not in self.services: + self.services[name] = type_ + +def get_ip_for_interface(interface_name): + try: + addrs = netifaces.ifaddresses(interface_name) + ip_info = addrs.get(netifaces.AF_INET) + if ip_info: + return ip_info[0]['addr'] + except Exception: + return None + return None + +def get_zeroconf_instance(interface=None): + if interface: + try: + ip = socket.gethostbyname(interface) + except socket.gaierror: + ip = get_ip_for_interface(interface) + if not ip: + raise ValueError(f"Nie można ustalić IP dla interfejsu: {interface}") + return Zeroconf(interfaces=[ip]) + else: + return Zeroconf() + +def resolve_service(zeroconf, name, type_): + info = zeroconf.get_service_info(type_, name) + if info: + return { + "name": name, + "type": type_, + "ip": socket.inet_ntoa(info.addresses[0]) if info.addresses else None, + "port": info.port, + "hostname": info.server, + "properties": {k.decode(): v.decode() if v is not None else None for k, v in info.properties.items()} + } + return None + +def scan_mdns_services(timeout=5, interface=None, filter_type=None): + zeroconf = get_zeroconf_instance(interface) + listener = MDNSListener() + try: + types_to_scan = [filter_type] if filter_type else MDNS_TYPES + browsers = [ServiceBrowser(zeroconf, service_type, listener) for service_type in types_to_scan] + time.sleep(timeout) + resolved = {} + for name, proto in listener.services.items(): + info = resolve_service(zeroconf, name, proto) + if info: + resolved[name] = info + return resolved + finally: + zeroconf.close() + +def print_scan_report(services, script_path, interface=None): + print("\n📡 Wykryte urządzenia mDNS:\n") + for name, info in services.items(): + print(f" - {name} [protokoł: {info['type'].split('.')[0].strip('_')}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, szczegóły: {info['properties'] if info['properties'] else 'brak'}]") + + print("\n🔧 Propozycje komend do użycia z Nagiosem:\n") + for name, info in services.items(): + safe_device = name.split('.')[0].replace('"', '\\"') + cmd = f"{script_path} --device \"{safe_device}\"" + if interface: + cmd += f" --interface {interface}" + print(f" {cmd}") + +def export_services(services, format): + if format == 'json': + with open('mdns_services.json', 'w') as f: + json.dump(services, f, indent=2) + print("Eksportowano do mdns_services.json") + elif format == 'csv': + with open('mdns_services.csv', 'w', newline='') as f: + writer = csv.writer(f) + writer.writerow(['name', 'protocol', 'ip', 'port', 'hostname']) + for name, info in services.items(): + writer.writerow([name, info['type'], info['ip'], info['port'], info['hostname']]) + print("Eksportowano do mdns_services.csv") + +def main(): + parser = argparse.ArgumentParser(description="Skrypt Nagios do wykrywania urządzeń mDNS (Bonjour, IPP, AirPrint) z obsługą interfejsów, eksportu, regexów, szczegółów usług i filtrowania.") + parser.add_argument("--device", help="Fragment lub regex nazwy urządzenia do wykrycia (np. 'printer|tv')") + parser.add_argument("--timeout", type=int, default=5, help="Czas oczekiwania na odpowiedzi mDNS (sekundy)") + parser.add_argument("--scan", action="store_true", help="Skanuj sieć i wypisz dostępne urządzenia z propozycjami komend") + parser.add_argument("--interface", help="Nazwa interfejsu sieciowego lub adres IP (np. eth1 lub 192.168.10.1)") + parser.add_argument("--export", choices=['json', 'csv'], help="Eksportuj wyniki skanu do pliku JSON lub CSV") + parser.add_argument("--filter-type", choices=[t.strip('.') for t in MDNS_TYPES], help="Filtruj tylko po określonym typie usługi (np. _ipp._tcp.local.)") + parser.add_argument("--match-property", help="Sprawdź czy w rekordach TXT znajduje się podana fraza (np. 'UUID')") + parser.add_argument("--service-mode", action="store_true", help="Tryb ciągły - nasłuchiwanie urządzeń i logowanie ich w czasie rzeczywistym") + + args = parser.parse_args() + + if args.service_mode: + print("Tryb serwisowy nie został jeszcze zaimplementowany.") + sys.exit(3) + + if args.scan: + try: + services = scan_mdns_services(args.timeout, args.interface, args.filter_type) + if services: + script_path = os.path.realpath(__file__) + print_scan_report(services, script_path, args.interface) + if args.export: + export_services(services, args.export) + sys.exit(0) + else: + print("CRITICAL: Nie wykryto żadnych urządzeń mDNS.") + sys.exit(2) + except Exception as e: + print(f"UNKNOWN: Błąd podczas skanowania mDNS - {e}") + sys.exit(3) + + if not args.device: + print("BŁĄD: Musisz podać parametr --device (chyba że używasz --scan)") + sys.exit(3) + + try: + services = scan_mdns_services(args.timeout, args.interface, args.filter_type) + pattern = re.compile(args.device, re.IGNORECASE) + matches = {k: v for k, v in services.items() if pattern.search(k)} + + if args.match_property: + matches = {k: v for k, v in matches.items() if any(args.match_property.lower() in (v['properties'] or {}).get(pk, '').lower() for pk in (v['properties'] or {}))} + + if matches: + print(f"OK: Urządzenie pasujące do '{args.device}' znalezione w mDNS:") + for name, info in matches.items(): + proto_clean = info['type'].split('.')[0].strip('_') + print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, szczegóły: {info['properties'] if info['properties'] else 'brak'}]") + sys.exit(0) + elif services: + print(f"WARNING: Nie znaleziono pasujących urządzeń do '{args.device}'. Wykryto inne:") + for name, info in services.items(): + proto_clean = info['type'].split('.')[0].strip('_') + print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}]") + sys.exit(1) + else: + print("CRITICAL: Nie wykryto żadnych urządzeń mDNS.") + sys.exit(2) + except Exception as e: + print(f"UNKNOWN: Błąd podczas skanowania mDNS - {e}") + sys.exit(3) + +if __name__ == "__main__": + main()