Update check_mdns.py
This commit is contained in:
@ -8,6 +8,7 @@ import os
|
|||||||
import json
|
import json
|
||||||
import csv
|
import csv
|
||||||
import re
|
import re
|
||||||
|
import http.client
|
||||||
|
|
||||||
# Weryfikacja zależności
|
# Weryfikacja zależności
|
||||||
REQUIRED_MODULES = {
|
REQUIRED_MODULES = {
|
||||||
@ -82,6 +83,15 @@ def resolve_service(zeroconf, name, type_):
|
|||||||
}
|
}
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def test_service(ip, port):
|
||||||
|
try:
|
||||||
|
conn = http.client.HTTPConnection(ip, port, timeout=5)
|
||||||
|
conn.request("GET", "/")
|
||||||
|
response = conn.getresponse()
|
||||||
|
return response.status == 200
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
def scan_mdns_services(timeout=5, interface=None, filter_type=None):
|
def scan_mdns_services(timeout=5, interface=None, filter_type=None):
|
||||||
zeroconf = get_zeroconf_instance(interface)
|
zeroconf = get_zeroconf_instance(interface)
|
||||||
listener = MDNSListener()
|
listener = MDNSListener()
|
||||||
@ -98,19 +108,6 @@ def scan_mdns_services(timeout=5, interface=None, filter_type=None):
|
|||||||
finally:
|
finally:
|
||||||
zeroconf.close()
|
zeroconf.close()
|
||||||
|
|
||||||
def print_scan_report(services, script_path, interface=None):
|
|
||||||
print("\n📡 Wykryte urządzenia mDNS:\n")
|
|
||||||
for name, info in services.items():
|
|
||||||
print(f" - {name} [protokoł: {info['type'].split('.')[0].strip('_')}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, szczegóły: {info['properties'] if info['properties'] else 'brak'}]")
|
|
||||||
|
|
||||||
print("\n🔧 Propozycje komend do użycia z Nagiosem:\n")
|
|
||||||
for name, info in services.items():
|
|
||||||
safe_device = name.split('.')[0].replace('"', '\\"')
|
|
||||||
cmd = f"{script_path} --device \"{safe_device}\""
|
|
||||||
if interface:
|
|
||||||
cmd += f" --interface {interface}"
|
|
||||||
print(f" {cmd}")
|
|
||||||
|
|
||||||
def export_services(services, format):
|
def export_services(services, format):
|
||||||
if format == 'json':
|
if format == 'json':
|
||||||
with open('mdns_services.json', 'w') as f:
|
with open('mdns_services.json', 'w') as f:
|
||||||
@ -125,7 +122,7 @@ def export_services(services, format):
|
|||||||
print("Eksportowano do mdns_services.csv")
|
print("Eksportowano do mdns_services.csv")
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description="Skrypt Nagios do wykrywania urządzeń mDNS (Bonjour, IPP, AirPrint) z obsługą interfejsów, eksportu, regexów, szczegółów usług i filtrowania.")
|
parser = argparse.ArgumentParser(description="Skrypt Nagios do wykrywania urządzeń mDNS (Bonjour, IPP, AirPrint) z testem działania, eksportem, regexami, filtrowaniem i pełnym podglądem usług.")
|
||||||
parser.add_argument("--device", help="Fragment lub regex nazwy urządzenia do wykrycia (np. 'printer|tv')")
|
parser.add_argument("--device", help="Fragment lub regex nazwy urządzenia do wykrycia (np. 'printer|tv')")
|
||||||
parser.add_argument("--timeout", type=int, default=5, help="Czas oczekiwania na odpowiedzi mDNS (sekundy)")
|
parser.add_argument("--timeout", type=int, default=5, help="Czas oczekiwania na odpowiedzi mDNS (sekundy)")
|
||||||
parser.add_argument("--scan", action="store_true", help="Skanuj sieć i wypisz dostępne urządzenia z propozycjami komend")
|
parser.add_argument("--scan", action="store_true", help="Skanuj sieć i wypisz dostępne urządzenia z propozycjami komend")
|
||||||
@ -134,6 +131,7 @@ def main():
|
|||||||
parser.add_argument("--filter-type", choices=[t.strip('.') for t in MDNS_TYPES], help="Filtruj tylko po określonym typie usługi (np. _ipp._tcp.local.)")
|
parser.add_argument("--filter-type", choices=[t.strip('.') for t in MDNS_TYPES], help="Filtruj tylko po określonym typie usługi (np. _ipp._tcp.local.)")
|
||||||
parser.add_argument("--match-property", help="Sprawdź czy w rekordach TXT znajduje się podana fraza (np. 'UUID')")
|
parser.add_argument("--match-property", help="Sprawdź czy w rekordach TXT znajduje się podana fraza (np. 'UUID')")
|
||||||
parser.add_argument("--service-mode", action="store_true", help="Tryb ciągły - nasłuchiwanie urządzeń i logowanie ich w czasie rzeczywistym")
|
parser.add_argument("--service-mode", action="store_true", help="Tryb ciągły - nasłuchiwanie urządzeń i logowanie ich w czasie rzeczywistym")
|
||||||
|
parser.add_argument("--test", choices=['ipp', 'http', 'all'], help="Testuj działanie usług: ipp, http lub all")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
@ -141,12 +139,18 @@ def main():
|
|||||||
print("Tryb serwisowy nie został jeszcze zaimplementowany.")
|
print("Tryb serwisowy nie został jeszcze zaimplementowany.")
|
||||||
sys.exit(3)
|
sys.exit(3)
|
||||||
|
|
||||||
|
def should_test(proto):
|
||||||
|
if args.test == 'all': return proto in ['ipp', 'http']
|
||||||
|
return args.test == proto
|
||||||
|
|
||||||
if args.scan:
|
if args.scan:
|
||||||
try:
|
try:
|
||||||
services = scan_mdns_services(args.timeout, args.interface, args.filter_type)
|
services = scan_mdns_services(args.timeout, args.interface, args.filter_type)
|
||||||
if services:
|
if services:
|
||||||
script_path = os.path.realpath(__file__)
|
for name, info in services.items():
|
||||||
print_scan_report(services, script_path, args.interface)
|
proto_clean = info['type'].split('.')[0].strip('_')
|
||||||
|
test_status = "OK" if should_test(proto_clean) and test_service(info['ip'], info['port']) else "NIE" if args.test and should_test(proto_clean) else "-"
|
||||||
|
print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, test: {test_status}]")
|
||||||
if args.export:
|
if args.export:
|
||||||
export_services(services, args.export)
|
export_services(services, args.export)
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
@ -173,7 +177,8 @@ def main():
|
|||||||
print(f"OK: Urządzenie pasujące do '{args.device}' znalezione w mDNS:")
|
print(f"OK: Urządzenie pasujące do '{args.device}' znalezione w mDNS:")
|
||||||
for name, info in matches.items():
|
for name, info in matches.items():
|
||||||
proto_clean = info['type'].split('.')[0].strip('_')
|
proto_clean = info['type'].split('.')[0].strip('_')
|
||||||
print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, szczegóły: {info['properties'] if info['properties'] else 'brak'}]")
|
test_status = "OK" if should_test(proto_clean) and test_service(info['ip'], info['port']) else "NIE" if args.test and should_test(proto_clean) else "-"
|
||||||
|
print(f" - {name} [protokoł: {proto_clean}, IP: {info['ip']}, port: {info['port']}, host: {info['hostname']}, test: {test_status}, szczegóły: {info['properties'] if info['properties'] else 'brak'}]")
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
elif services:
|
elif services:
|
||||||
print(f"WARNING: Nie znaleziono pasujących urządzeń do '{args.device}'. Wykryto inne:")
|
print(f"WARNING: Nie znaleziono pasujących urządzeń do '{args.device}'. Wykryto inne:")
|
||||||
@ -189,4 +194,4 @@ def main():
|
|||||||
sys.exit(3)
|
sys.exit(3)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
Reference in New Issue
Block a user