Add check_mdns.py
This commit is contained in:
192
check_mdns.py
Normal file
192
check_mdns.py
Normal file
@ -0,0 +1,192 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
import socket
|
||||
import time
|
||||
import os
|
||||
import json
|
||||
import csv
|
||||
import re
|
||||
|
||||
# Weryfikacja zależności
|
||||
REQUIRED_MODULES = {
|
||||
'zeroconf': 'zeroconf',
|
||||
'netifaces': 'netifaces'
|
||||
}
|
||||
|
||||
missing = []
|
||||
for mod, pkg in REQUIRED_MODULES.items():
|
||||
try:
|
||||
__import__(mod)
|
||||
except ImportError:
|
||||
missing.append(pkg)
|
||||
|
||||
if missing:
|
||||
print("BŁĄD: Brakuje wymaganych modułów Pythona:")
|
||||
for pkg in missing:
|
||||
print(f" - {pkg}")
|
||||
print("\nZainstaluj je poleceniem:")
|
||||
print(f"pip install {' '.join(missing)}")
|
||||
sys.exit(3)
|
||||
|
||||
import netifaces
|
||||
from zeroconf import Zeroconf, ServiceBrowser, ServiceListener, ServiceInfo
|
||||
|
||||
MDNS_TYPES = [
|
||||
"_http._tcp.local.",
|
||||
"_ipp._tcp.local.",
|
||||
"_airprint._tcp.local."
|
||||
]
|
||||
|
||||
class MDNSListener(ServiceListener):
|
||||
def __init__(self):
|
||||
self.services = {}
|
||||
|
||||
def add_service(self, zeroconf, type_, name):
|
||||
if name not in self.services:
|
||||
self.services[name] = type_
|
||||
|
||||
def get_ip_for_interface(interface_name):
|
||||
try:
|
||||
addrs = netifaces.ifaddresses(interface_name)
|
||||
ip_info = addrs.get(netifaces.AF_INET)
|
||||
if ip_info:
|
||||
return ip_info[0]['addr']
|
||||
except Exception:
|
||||
return None
|
||||
return None
|
||||
|
||||
def get_zeroconf_instance(interface=None):
|
||||
if interface:
|
||||
try:
|
||||
ip = socket.gethostbyname(interface)
|
||||
except socket.gaierror:
|
||||
ip = get_ip_for_interface(interface)
|
||||
if not ip:
|
||||
raise ValueError(f"Nie można ustalić IP dla interfejsu: {interface}")
|
||||
return Zeroconf(interfaces=[ip])
|
||||
else:
|
||||
return Zeroconf()
|
||||
|
||||
def resolve_service(zeroconf, name, type_):
|
||||
info = zeroconf.get_service_info(type_, name)
|
||||
if info:
|
||||
return {
|
||||
"name": name,
|
||||
"type": type_,
|
||||
"ip": socket.inet_ntoa(info.addresses[0]) if info.addresses else None,
|
||||
"port": info.port,
|
||||
"hostname": info.server,
|
||||
"properties": {k.decode(): v.decode() if v is not None else None for k, v in info.properties.items()}
|
||||
}
|
||||
return None
|
||||
|
||||
def scan_mdns_services(timeout=5, interface=None, filter_type=None):
|
||||
zeroconf = get_zeroconf_instance(interface)
|
||||
listener = MDNSListener()
|
||||
try:
|
||||
types_to_scan = [filter_type] if filter_type else MDNS_TYPES
|
||||
browsers = [ServiceBrowser(zeroconf, service_type, listener) for service_type in types_to_scan]
|
||||
time.sleep(timeout)
|
||||
resolved = {}
|
||||
for name, proto in listener.services.items():
|
||||
info = resolve_service(zeroconf, name, proto)
|
||||
if info:
|
||||
resolved[name] = info
|
||||
return resolved
|
||||
finally:
|
||||
zeroconf.close()
|
||||
|
||||
def print_scan_report(services, script_path, interface=None):
|
||||
print("\n📡 Wykryte urządzenia mDNS:\n")
|
||||
for name, info in services.items():
|
||||
print(f" - {name} [protokoł: {info['type'].split('.')[0].strip('_')}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, szczegóły: {info['properties'] if info['properties'] else 'brak'}]")
|
||||
|
||||
print("\n🔧 Propozycje komend do użycia z Nagiosem:\n")
|
||||
for name, info in services.items():
|
||||
safe_device = name.split('.')[0].replace('"', '\\"')
|
||||
cmd = f"{script_path} --device \"{safe_device}\""
|
||||
if interface:
|
||||
cmd += f" --interface {interface}"
|
||||
print(f" {cmd}")
|
||||
|
||||
def export_services(services, format):
|
||||
if format == 'json':
|
||||
with open('mdns_services.json', 'w') as f:
|
||||
json.dump(services, f, indent=2)
|
||||
print("Eksportowano do mdns_services.json")
|
||||
elif format == 'csv':
|
||||
with open('mdns_services.csv', 'w', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(['name', 'protocol', 'ip', 'port', 'hostname'])
|
||||
for name, info in services.items():
|
||||
writer.writerow([name, info['type'], info['ip'], info['port'], info['hostname']])
|
||||
print("Eksportowano do mdns_services.csv")
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Skrypt Nagios do wykrywania urządzeń mDNS (Bonjour, IPP, AirPrint) z obsługą interfejsów, eksportu, regexów, szczegółów usług i filtrowania.")
|
||||
parser.add_argument("--device", help="Fragment lub regex nazwy urządzenia do wykrycia (np. 'printer|tv')")
|
||||
parser.add_argument("--timeout", type=int, default=5, help="Czas oczekiwania na odpowiedzi mDNS (sekundy)")
|
||||
parser.add_argument("--scan", action="store_true", help="Skanuj sieć i wypisz dostępne urządzenia z propozycjami komend")
|
||||
parser.add_argument("--interface", help="Nazwa interfejsu sieciowego lub adres IP (np. eth1 lub 192.168.10.1)")
|
||||
parser.add_argument("--export", choices=['json', 'csv'], help="Eksportuj wyniki skanu do pliku JSON lub CSV")
|
||||
parser.add_argument("--filter-type", choices=[t.strip('.') for t in MDNS_TYPES], help="Filtruj tylko po określonym typie usługi (np. _ipp._tcp.local.)")
|
||||
parser.add_argument("--match-property", help="Sprawdź czy w rekordach TXT znajduje się podana fraza (np. 'UUID')")
|
||||
parser.add_argument("--service-mode", action="store_true", help="Tryb ciągły - nasłuchiwanie urządzeń i logowanie ich w czasie rzeczywistym")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.service_mode:
|
||||
print("Tryb serwisowy nie został jeszcze zaimplementowany.")
|
||||
sys.exit(3)
|
||||
|
||||
if args.scan:
|
||||
try:
|
||||
services = scan_mdns_services(args.timeout, args.interface, args.filter_type)
|
||||
if services:
|
||||
script_path = os.path.realpath(__file__)
|
||||
print_scan_report(services, script_path, args.interface)
|
||||
if args.export:
|
||||
export_services(services, args.export)
|
||||
sys.exit(0)
|
||||
else:
|
||||
print("CRITICAL: Nie wykryto żadnych urządzeń mDNS.")
|
||||
sys.exit(2)
|
||||
except Exception as e:
|
||||
print(f"UNKNOWN: Błąd podczas skanowania mDNS - {e}")
|
||||
sys.exit(3)
|
||||
|
||||
if not args.device:
|
||||
print("BŁĄD: Musisz podać parametr --device (chyba że używasz --scan)")
|
||||
sys.exit(3)
|
||||
|
||||
try:
|
||||
services = scan_mdns_services(args.timeout, args.interface, args.filter_type)
|
||||
pattern = re.compile(args.device, re.IGNORECASE)
|
||||
matches = {k: v for k, v in services.items() if pattern.search(k)}
|
||||
|
||||
if args.match_property:
|
||||
matches = {k: v for k, v in matches.items() if any(args.match_property.lower() in (v['properties'] or {}).get(pk, '').lower() for pk in (v['properties'] or {}))}
|
||||
|
||||
if matches:
|
||||
print(f"OK: Urządzenie pasujące do '{args.device}' znalezione w mDNS:")
|
||||
for name, info in matches.items():
|
||||
proto_clean = info['type'].split('.')[0].strip('_')
|
||||
print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, szczegóły: {info['properties'] if info['properties'] else 'brak'}]")
|
||||
sys.exit(0)
|
||||
elif services:
|
||||
print(f"WARNING: Nie znaleziono pasujących urządzeń do '{args.device}'. Wykryto inne:")
|
||||
for name, info in services.items():
|
||||
proto_clean = info['type'].split('.')[0].strip('_')
|
||||
print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}]")
|
||||
sys.exit(1)
|
||||
else:
|
||||
print("CRITICAL: Nie wykryto żadnych urządzeń mDNS.")
|
||||
sys.exit(2)
|
||||
except Exception as e:
|
||||
print(f"UNKNOWN: Błąd podczas skanowania mDNS - {e}")
|
||||
sys.exit(3)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Reference in New Issue
Block a user