#!/usr/bin/env python3 import sys import argparse import socket import time import os import json import csv import re import http.client # Weryfikacja zależności REQUIRED_MODULES = { 'zeroconf': 'zeroconf', 'netifaces': 'netifaces' } missing = [] for mod, pkg in REQUIRED_MODULES.items(): try: __import__(mod) except ImportError: missing.append(pkg) if missing: print("BŁĄD: Brakuje wymaganych modułów Pythona:") for pkg in missing: print(f" - {pkg}") print("\nZainstaluj je poleceniem:") print(f"pip install {' '.join(missing)}") sys.exit(3) import netifaces from zeroconf import Zeroconf, ServiceBrowser, ServiceListener, ServiceInfo MDNS_TYPES = [ "_http._tcp.local.", "_ipp._tcp.local.", "_airprint._tcp.local." ] class MDNSListener(ServiceListener): def __init__(self): self.services = {} def add_service(self, zeroconf, type_, name): if name not in self.services: self.services[name] = type_ def get_ip_for_interface(interface_name): try: addrs = netifaces.ifaddresses(interface_name) ip_info = addrs.get(netifaces.AF_INET) if ip_info: return ip_info[0]['addr'] except Exception: return None return None def get_zeroconf_instance(interface=None): if interface: try: ip = socket.gethostbyname(interface) except socket.gaierror: ip = get_ip_for_interface(interface) if not ip: raise ValueError(f"Nie można ustalić IP dla interfejsu: {interface}") return Zeroconf(interfaces=[ip]) else: return Zeroconf() def resolve_service(zeroconf, name, type_): info = zeroconf.get_service_info(type_, name) if info: return { "name": name, "type": type_, "ip": socket.inet_ntoa(info.addresses[0]) if info.addresses else None, "port": info.port, "hostname": info.server, "properties": {k.decode(): v.decode() if v is not None else None for k, v in info.properties.items()} } return None def test_service(ip, port): try: conn = http.client.HTTPConnection(ip, port, timeout=5) conn.request("GET", "/") response = conn.getresponse() return response.status == 200 except Exception: return False def scan_mdns_services(timeout=5, interface=None, filter_type=None): zeroconf = get_zeroconf_instance(interface) listener = MDNSListener() try: types_to_scan = [filter_type] if filter_type else MDNS_TYPES browsers = [ServiceBrowser(zeroconf, service_type, listener) for service_type in types_to_scan] time.sleep(timeout) resolved = {} for name, proto in listener.services.items(): info = resolve_service(zeroconf, name, proto) if info: resolved[name] = info return resolved finally: zeroconf.close() def export_services(services, format): if format == 'json': with open('mdns_services.json', 'w') as f: json.dump(services, f, indent=2) print("Eksportowano do mdns_services.json") elif format == 'csv': with open('mdns_services.csv', 'w', newline='') as f: writer = csv.writer(f) writer.writerow(['name', 'protocol', 'ip', 'port', 'hostname']) for name, info in services.items(): writer.writerow([name, info['type'], info['ip'], info['port'], info['hostname']]) print("Eksportowano do mdns_services.csv") def print_scan_report(services, script_path, interface=None, test_mode=None): print("\n📡 Wykryte urządzenia mDNS:\n") for name, info in services.items(): proto_clean = info['type'].split('.')[0].strip('_') if test_mode in ['all', proto_clean]: from http.client import HTTPConnection try: conn = HTTPConnection(info['ip'], info['port'], timeout=5) conn.request("GET", "/") resp = conn.getresponse() test_status = "OK" if resp.status == 200 else f"Błąd {resp.status}" except Exception as e: test_status = "NIE" else: test_status = "-" print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, test: {test_status}, szczegóły: {info['properties'] if info['properties'] else 'brak'}]") print("\n🔧 Propozycje komend do użycia z Nagiosem:\n") for name, info in services.items(): safe_device = name.split('.')[0].replace('"', '\\"') cmd = f"{script_path} --device \"{safe_device}\"" if interface: cmd += f" --interface {interface}" if test_mode: cmd += f" --test {test_mode}" print(f" {cmd}") def should_test(proto, test_mode): return test_mode == 'all' or test_mode == proto def get_test_status(info, test_mode): proto_clean = info['type'].split('.')[0].strip('_') if should_test(proto_clean, test_mode): try: conn = http.client.HTTPConnection(info['ip'], info['port'], timeout=5) conn.request("GET", "/") response = conn.getresponse() return "OK" if response.status == 200 else f"Błąd {response.status}" except Exception: return "NIE" return "-" def main(): parser = argparse.ArgumentParser(description="Skrypt Nagios do wykrywania urządzeń mDNS (Bonjour, IPP, AirPrint) z testem działania, eksportem, regexami, filtrowaniem i pełnym podglądem usług.") parser.add_argument("--device", help="Fragment lub regex nazwy urządzenia do wykrycia (np. 'printer|tv')") parser.add_argument("--timeout", type=int, default=5, help="Czas oczekiwania na odpowiedzi mDNS (sekundy)") parser.add_argument("--scan", action="store_true", help="Skanuj sieć i wypisz dostępne urządzenia z propozycjami komend") parser.add_argument("--interface", help="Nazwa interfejsu sieciowego lub adres IP (np. eth1 lub 192.168.10.1)") parser.add_argument("--export", choices=['json', 'csv'], help="Eksportuj wyniki skanu do pliku JSON lub CSV") parser.add_argument("--filter-type", choices=[t.strip('.') for t in MDNS_TYPES], help="Filtruj tylko po określonym typie usługi (np. _ipp._tcp.local.)") parser.add_argument("--match-property", help="Sprawdź czy w rekordach TXT znajduje się podana fraza (np. 'UUID')") parser.add_argument("--service-mode", action="store_true", help="Tryb ciągły - nasłuchiwanie urządzeń i logowanie ich w czasie rzeczywistym") parser.add_argument("--test", choices=['ipp', 'http', 'all'], help="Testuj działanie usług: ipp, http lub all") args = parser.parse_args() if args.service_mode: print("Tryb serwisowy nie został jeszcze zaimplementowany.") sys.exit(3) if args.scan: try: services = scan_mdns_services(args.timeout, args.interface, args.filter_type) if services: script_path = os.path.realpath(__file__) print_scan_report(services, script_path, args.interface, args.test) if args.export: export_services(services, args.export) sys.exit(0) else: print("CRITICAL: Nie wykryto żadnych urządzeń mDNS.") sys.exit(2) except Exception as e: print(f"UNKNOWN: Błąd podczas skanowania mDNS - {e}") sys.exit(3) if not args.device: print("BŁĄD: Musisz podać parametr --device (chyba że używasz --scan)") sys.exit(3) try: services = scan_mdns_services(args.timeout, args.interface, args.filter_type) pattern = re.compile(args.device, re.IGNORECASE) matches = {k: v for k, v in services.items() if pattern.search(k)} if args.match_property: matches = {k: v for k, v in matches.items() if any(args.match_property.lower() in (v['properties'] or {}).get(pk, '').lower() for pk in (v['properties'] or {}))} if matches: print(f"OK: Urządzenie pasujące do '{args.device}' znalezione w mDNS:") all_ok = True for name, info in matches.items(): proto_clean = info['type'].split('.')[0].strip('_') test_status = get_test_status(info, args.test) if args.test and should_test(proto_clean, args.test) and test_status != "OK": all_ok = False print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, test: {test_status}, szczegóły: {info['properties'] if info['properties'] else 'brak'}]") sys.exit(0 if all_ok else 2) elif services: print(f"WARNING: Nie znaleziono pasujących urządzeń do '{args.device}'. Wykryto inne:") for name, info in services.items(): proto_clean = info['type'].split('.')[0].strip('_') print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}]") sys.exit(1) else: print("CRITICAL: Nie wykryto żadnych urządzeń mDNS.") sys.exit(2) except Exception as e: print(f"UNKNOWN: Błąd podczas skanowania mDNS - {e}") sys.exit(3) if __name__ == "__main__": main()