Files
nagios-plugins/check_mdns.py
2025-08-23 19:05:35 +02:00

240 lines
8.4 KiB
Python

#!/usr/bin/env python3
import sys
import argparse
import socket
import time
import os
import json
import csv
import re
import http.client
# Weryfikacja zależności
REQUIRED_MODULES = {
'zeroconf': 'zeroconf',
'netifaces': 'netifaces'
}
missing = []
for mod, pkg in REQUIRED_MODULES.items():
try:
__import__(mod)
except ImportError:
missing.append(pkg)
if missing:
print("UNKNOWN: Brakuje wymaganych modułów Pythona:")
for pkg in missing:
print(f" - {pkg}")
print(f"\nZainstaluj je poleceniem:\n pip install {' '.join(missing)}")
sys.exit(3)
import netifaces
from zeroconf import Zeroconf, ServiceBrowser, ServiceListener
MDNS_TYPES = [
"_http._tcp.local.",
"_ipp._tcp.local.",
"_airprint._tcp.local."
]
class MDNSListener(ServiceListener):
def __init__(self):
self.services = {}
def add_service(self, zeroconf, type_, name):
if name not in self.services:
self.services[name] = type_
def get_ip_for_interface(interface_name):
try:
addrs = netifaces.ifaddresses(interface_name)
ip_info = addrs.get(netifaces.AF_INET)
if ip_info:
return ip_info[0]['addr']
except Exception:
return None
return None
def get_zeroconf_instance(interface=None):
if interface:
try:
ip = socket.gethostbyname(interface)
except socket.gaierror:
ip = get_ip_for_interface(interface)
if not ip:
raise ValueError(f"Nie można ustalić IP dla interfejsu: {interface}")
return Zeroconf(interfaces=[ip])
else:
return Zeroconf()
def resolve_service(zeroconf, name, type_):
info = zeroconf.get_service_info(type_, name)
if info:
return {
"name": name,
"type": type_,
"ip": socket.inet_ntoa(info.addresses[0]) if info.addresses else None,
"port": info.port,
"hostname": info.server,
"properties": {k.decode(): v.decode() if v is not None else None for k, v in info.properties.items()}
}
return None
def test_service(ip, port):
try:
conn = http.client.HTTPConnection(ip, port, timeout=5)
conn.request("GET", "/")
response = conn.getresponse()
return response.status == 200
except Exception:
return False
def scan_mdns_services(timeout=5, interface=None, filter_type=None):
zeroconf = get_zeroconf_instance(interface)
listener = MDNSListener()
try:
types_to_scan = [filter_type] if filter_type else MDNS_TYPES
browsers = [ServiceBrowser(zeroconf, service_type, listener) for service_type in types_to_scan]
time.sleep(timeout)
resolved = {}
for name, proto in listener.services.items():
info = resolve_service(zeroconf, name, proto)
if info:
resolved[name] = info
return resolved
finally:
zeroconf.close()
def export_services(services, format):
if format == 'json':
with open('mdns_services.json', 'w') as f:
json.dump(services, f, indent=2)
print("Eksportowano do mdns_services.json")
elif format == 'csv':
with open('mdns_services.csv', 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['name', 'protocol', 'ip', 'port', 'hostname'])
for name, info in services.items():
writer.writerow([name, info['type'], info['ip'], info['port'], info['hostname']])
print("Eksportowano do mdns_services.csv")
def should_test(proto, test_mode):
return test_mode == 'all' or test_mode == proto
def get_test_status(info, test_mode):
proto_clean = info['type'].split('.')[0].strip('_')
if should_test(proto_clean, test_mode):
try:
conn = http.client.HTTPConnection(info['ip'], info['port'], timeout=5)
conn.request("GET", "/")
response = conn.getresponse()
return "OK" if response.status == 200 else f"Błąd {response.status}"
except Exception:
return "NIE"
return "-"
def main():
parser = argparse.ArgumentParser(description="Skrypt Nagios do wykrywania urządzeń mDNS (Bonjour, IPP, AirPrint)")
parser.add_argument("--device", help="Fragment lub regex nazwy urządzenia (np. 'printer|tv')")
parser.add_argument("--timeout", type=int, default=5, help="Czas oczekiwania mDNS (sek)")
parser.add_argument("--scan", action="store_true", help="Skanuj sieć i wypisz urządzenia")
parser.add_argument("--interface", help="Interfejs sieciowy lub adres IP (np. eth1, 192.168.0.1)")
parser.add_argument("--export", choices=['json', 'csv'], help="Eksport wyników")
parser.add_argument("--filter-type", choices=[t.strip('.') for t in MDNS_TYPES], help="Filtruj usługę")
parser.add_argument("--match-property", help="Sprawdź frazę w rekordach TXT (np. 'UUID')")
parser.add_argument("--service-mode", action="store_true", help="Tryb ciągły (jeszcze niezaimplementowany)")
parser.add_argument("--test", choices=['ipp', 'http', 'all'], help="Testuj usługi: ipp, http lub all")
args = parser.parse_args()
if args.service_mode:
print("UNKNOWN: Tryb serwisowy niezaimplementowany")
sys.exit(3)
if args.scan:
try:
services = scan_mdns_services(args.timeout, args.interface, args.filter_type)
if services:
script_path = os.path.realpath(__file__)
print("\n📡 Wykryte urządzenia mDNS:\n")
for name, info in services.items():
proto_clean = info['type'].split('.')[0].strip('_')
print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}]")
if args.export:
export_services(services, args.export)
sys.exit(0)
else:
print("CRITICAL: Nie wykryto żadnych urządzeń mDNS.")
sys.exit(2)
except Exception as e:
print(f"UNKNOWN: Błąd podczas skanowania mDNS - {e}")
sys.exit(3)
if not args.device:
print("UNKNOWN: Musisz podać parametr --device (chyba że używasz --scan)")
sys.exit(3)
try:
services = scan_mdns_services(args.timeout, args.interface, args.filter_type)
pattern = re.compile(args.device, re.IGNORECASE)
matches = {k: v for k, v in services.items() if pattern.search(k)}
if args.match_property:
matches = {
k: v for k, v in matches.items()
if any(args.match_property.lower() in (v['properties'] or {}).get(pk, '').lower()
for pk in (v['properties'] or {}))
}
if matches:
all_ok = True
devices_output = []
for name, info in matches.items():
proto_clean = info['type'].split('.')[0].strip('_')
test_status = get_test_status(info, args.test)
if args.test and should_test(proto_clean, args.test) and test_status != "OK":
all_ok = False
devices_output.append(
f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, "
f"host: {info['hostname']}, test: {test_status}, "
f"szczegóły: {info['properties'] if info['properties'] else 'brak'}]"
)
if all_ok:
print(f"OK: Urządzenie pasujące do '{args.device}' znalezione w mDNS:")
for line in devices_output:
print(line)
sys.exit(0)
else:
print(f"WARNING: Urządzenie '{args.device}' znalezione, ale test usług się nie powiódł:")
for line in devices_output:
print(line)
sys.exit(1)
elif services:
print(f"WARNING: Nie znaleziono pasujących urządzeń do '{args.device}'. Dostępne inne:")
for name, info in services.items():
proto_clean = info['type'].split('.')[0].strip('_')
print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}]")
sys.exit(1)
else:
print("CRITICAL: Nie wykryto żadnych urządzeń mDNS.")
sys.exit(2)
except Exception as e:
print(f"UNKNOWN: Błąd podczas skanowania mDNS - {e}")
sys.exit(3)
if __name__ == "__main__":
main()