From 37698e935ac81e64372641a498ec205ff0f9ce48 Mon Sep 17 00:00:00 2001 From: gru Date: Fri, 23 May 2025 10:09:59 +0200 Subject: [PATCH] Update check_mdns.py --- check_mdns.py | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/check_mdns.py b/check_mdns.py index 51cb49d..8cb6327 100644 --- a/check_mdns.py +++ b/check_mdns.py @@ -8,6 +8,7 @@ import os import json import csv import re +import http.client # Weryfikacja zale偶no艣ci REQUIRED_MODULES = { @@ -82,6 +83,15 @@ def resolve_service(zeroconf, name, type_): } return None +def test_service(ip, port): + try: + conn = http.client.HTTPConnection(ip, port, timeout=5) + conn.request("GET", "/") + response = conn.getresponse() + return response.status == 200 + except Exception: + return False + def scan_mdns_services(timeout=5, interface=None, filter_type=None): zeroconf = get_zeroconf_instance(interface) listener = MDNSListener() @@ -98,19 +108,6 @@ def scan_mdns_services(timeout=5, interface=None, filter_type=None): finally: zeroconf.close() -def print_scan_report(services, script_path, interface=None): - print("\n馃摗 Wykryte urz膮dzenia mDNS:\n") - for name, info in services.items(): - print(f" - {name} [protoko艂: {info['type'].split('.')[0].strip('_')}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, szczeg贸艂y: {info['properties'] if info['properties'] else 'brak'}]") - - print("\n馃敡 Propozycje komend do u偶ycia z Nagiosem:\n") - for name, info in services.items(): - safe_device = name.split('.')[0].replace('"', '\\"') - cmd = f"{script_path} --device \"{safe_device}\"" - if interface: - cmd += f" --interface {interface}" - print(f" {cmd}") - def export_services(services, format): if format == 'json': with open('mdns_services.json', 'w') as f: @@ -125,7 +122,7 @@ def export_services(services, format): print("Eksportowano do mdns_services.csv") def main(): - parser = argparse.ArgumentParser(description="Skrypt Nagios do wykrywania urz膮dze艅 mDNS (Bonjour, IPP, AirPrint) z obs艂ug膮 interfejs贸w, eksportu, regex贸w, szczeg贸艂贸w us艂ug i filtrowania.") + parser = argparse.ArgumentParser(description="Skrypt Nagios do wykrywania urz膮dze艅 mDNS (Bonjour, IPP, AirPrint) z testem dzia艂ania, eksportem, regexami, filtrowaniem i pe艂nym podgl膮dem us艂ug.") parser.add_argument("--device", help="Fragment lub regex nazwy urz膮dzenia do wykrycia (np. 'printer|tv')") parser.add_argument("--timeout", type=int, default=5, help="Czas oczekiwania na odpowiedzi mDNS (sekundy)") parser.add_argument("--scan", action="store_true", help="Skanuj sie膰 i wypisz dost臋pne urz膮dzenia z propozycjami komend") @@ -134,6 +131,7 @@ def main(): parser.add_argument("--filter-type", choices=[t.strip('.') for t in MDNS_TYPES], help="Filtruj tylko po okre艣lonym typie us艂ugi (np. _ipp._tcp.local.)") parser.add_argument("--match-property", help="Sprawd藕 czy w rekordach TXT znajduje si臋 podana fraza (np. 'UUID')") parser.add_argument("--service-mode", action="store_true", help="Tryb ci膮g艂y - nas艂uchiwanie urz膮dze艅 i logowanie ich w czasie rzeczywistym") + parser.add_argument("--test", choices=['ipp', 'http', 'all'], help="Testuj dzia艂anie us艂ug: ipp, http lub all") args = parser.parse_args() @@ -141,12 +139,18 @@ def main(): print("Tryb serwisowy nie zosta艂 jeszcze zaimplementowany.") sys.exit(3) + def should_test(proto): + if args.test == 'all': return proto in ['ipp', 'http'] + return args.test == proto + if args.scan: try: services = scan_mdns_services(args.timeout, args.interface, args.filter_type) if services: - script_path = os.path.realpath(__file__) - print_scan_report(services, script_path, args.interface) + for name, info in services.items(): + proto_clean = info['type'].split('.')[0].strip('_') + test_status = "OK" if should_test(proto_clean) and test_service(info['ip'], info['port']) else "NIE" if args.test and should_test(proto_clean) else "-" + print(f" - {name} [protoko艂: {proto_clean}, IP: {info['ip']}, port: {info['port']}, test: {test_status}]") if args.export: export_services(services, args.export) sys.exit(0) @@ -173,7 +177,8 @@ def main(): print(f"OK: Urz膮dzenie pasuj膮ce do '{args.device}' znalezione w mDNS:") for name, info in matches.items(): proto_clean = info['type'].split('.')[0].strip('_') - print(f" - {name} [protoko艂: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, szczeg贸艂y: {info['properties'] if info['properties'] else 'brak'}]") + test_status = "OK" if should_test(proto_clean) and test_service(info['ip'], info['port']) else "NIE" if args.test and should_test(proto_clean) else "-" + print(f" - {name} [protoko艂: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, test: {test_status}, szczeg贸艂y: {info['properties'] if info['properties'] else 'brak'}]") sys.exit(0) elif services: print(f"WARNING: Nie znaleziono pasuj膮cych urz膮dze艅 do '{args.device}'. Wykryto inne:") @@ -189,4 +194,4 @@ def main(): sys.exit(3) if __name__ == "__main__": - main() + main() \ No newline at end of file