#!/usr/bin/env python3
"""Nagios plugin that checks one TVHeadend input (tuner/mux) over the HTTP API.

The plugin fetches ``/api/status/inputs`` using HTTP Digest authentication,
selects the entry whose ``uuid`` matches ``--input_uuid`` and compares its
``ber`` and ``unc`` values against warning/critical thresholds.

The status line never contains the ``|`` character: Nagios treats everything
after the first pipe as performance data, so free text must stay pipe-free.
"""
import argparse
import sys

try:
    import requests
    from requests.auth import HTTPDigestAuth
except ImportError:
    # Keep the module importable so the missing dependency can be reported as
    # a plugin state by get_input_status() instead of an import traceback
    # (an uncaught traceback exits with status 1, i.e. WARNING for Nagios).
    requests = None
    HTTPDigestAuth = None

# Plugin exit codes.
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3
STATE_NAMES = ("OK", "WARNING", "CRITICAL", "UNKNOWN")

# Order used when combining several states, least to most severe.
_PRECEDENCE = (OK, UNKNOWN, WARNING, CRITICAL)


def _one_line(text, limit=200):
    """Return *text* as a single pipe-free line of at most *limit* characters."""
    flat = " ".join(str(text).split()).replace("|", "/")
    return flat if len(flat) <= limit else flat[:limit] + "..."


def _worst(first, second):
    """Return whichever of two states ranks higher in ``_PRECEDENCE``."""
    return max(first, second, key=_PRECEDENCE.index)


def _check_threshold(value, warning, critical):
    """Classify *value* against the thresholds.

    Returns CRITICAL when ``value >= critical``, WARNING when
    ``value >= warning``, OK otherwise, and UNKNOWN when *value* cannot be
    interpreted as a number.
    """
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return UNKNOWN
    if number >= critical:
        return CRITICAL
    if number >= warning:
        return WARNING
    return OK


def find_input_entry(data, input_uuid):
    """Return the entry of *data* whose ``uuid`` equals *input_uuid*.

    *data* is the decoded API response: either a dict holding the list under
    the ``entries`` key, or the list itself. Items that are not dicts are
    ignored.

    Returns:
        The matching entry dict, or ``None`` when no entry matches.

    Raises:
        TypeError: if *data* has neither of the accepted shapes.
    """
    if isinstance(data, dict) and "entries" in data:
        entries = data["entries"]
    elif isinstance(data, list):
        entries = data
    else:
        raise TypeError(f"unexpected response type {type(data)}")
    if not isinstance(entries, list):
        raise TypeError(f"unexpected entries type {type(entries)}")

    for entry in entries:
        if isinstance(entry, dict) and entry.get("uuid") == input_uuid:
            return entry
    return None


def evaluate_status(status, warning_ber, critical_ber, warning_unc, critical_unc):
    """Build the plugin result for one input entry.

    Args:
        status: entry dict as returned by the API.
        warning_ber, critical_ber: thresholds applied to ``status["ber"]``.
        warning_unc, critical_unc: thresholds applied to ``status["unc"]``.

    Returns:
        ``(exit_code, line)`` where *line* is the full status line to print.
        Metrics that are absent (``None``) are not evaluated; metrics that are
        present but not numeric are reported as UNKNOWN.
    """
    state = OK
    parts = []

    checked = (
        ("BER", "ber", warning_ber, critical_ber),
        ("UNC", "unc", warning_unc, critical_unc),
    )
    for label, key, warning, critical in checked:
        value = status.get(key)
        if value is None:
            continue
        metric_state = _check_threshold(value, warning, critical)
        state = _worst(state, metric_state)
        parts.append(f"{STATE_NAMES[metric_state]} {label}={value}")

    # Signal and SNR are always shown, even when the API did not supply them.
    signal = status.get("signal")
    snr = status.get("snr")
    parts.append(f"Signal={signal}")
    parts.append(f"SNR={snr}")

    # Remaining values are informational and shown only when present.
    for label, key in (("MER", "mer"), ("EC_BIT", "ec_bit"),
                       ("EC_BLOCK", "ec_block"), ("CC", "cc")):
        value = status.get(key)
        if value is not None:
            parts.append(f"{label}={value}")

    # Joined with ", " rather than " | " so nothing is parsed as perfdata.
    return state, f"{STATE_NAMES[state]} - " + ", ".join(parts)


def get_input_status(host, port, user, password, input_uuid, timeout):
    """Fetch the input list from TVHeadend and return the matching entry.

    Args:
        host, port: address of the TVHeadend HTTP API.
        user, password: credentials for HTTP Digest authentication.
        input_uuid: ``uuid`` of the entry to return.
        timeout: request timeout passed to ``requests.get``.

    Returns:
        The entry dict whose ``uuid`` equals *input_uuid*.

    On any failure (request error, non-200 status, invalid JSON, unexpected
    response shape, entry not found) a one-line message is printed and the
    process exits with CRITICAL; a missing ``requests`` module exits UNKNOWN.
    """
    if requests is None:
        print("UNKNOWN - Python 'requests' module is not installed")
        sys.exit(UNKNOWN)

    url = f"http://{host}:{port}/api/status/inputs"
    try:
        response = requests.get(
            url, auth=HTTPDigestAuth(user, password), timeout=timeout
        )
    except (requests.RequestException, OSError, ValueError) as exc:
        print(f"CRITICAL - API error: {_one_line(exc)}")
        sys.exit(CRITICAL)

    if response.status_code != 200:
        print(
            f"CRITICAL - API HTTP error: {response.status_code} "
            f"{_one_line(response.text)}"
        )
        sys.exit(CRITICAL)

    try:
        data = response.json()
    except ValueError as exc:
        print(
            f"CRITICAL - API returned invalid JSON: {_one_line(exc)}; "
            f"response: {_one_line(response.text)}"
        )
        sys.exit(CRITICAL)

    try:
        entry = find_input_entry(data, input_uuid)
    except TypeError:
        print(
            f"CRITICAL - Unexpected API response type: {type(data)}; "
            f"response: {_one_line(data)}"
        )
        sys.exit(CRITICAL)

    if entry is None:
        print("CRITICAL - Input (tuner/mux) not found in API response")
        sys.exit(CRITICAL)
    return entry


def main():
    """Parse the command line, query TVHeadend and exit with the plugin state."""
    parser = argparse.ArgumentParser(description="Nagios plugin for TVHeadend DVB-T advanced monitoring (Digest Auth)")
    parser.add_argument("--host", required=True)
    parser.add_argument("--port", type=int, default=9981)
    parser.add_argument("--user", required=True)
    parser.add_argument("--password", required=True)
    parser.add_argument("--input_uuid", required=True, help="UUID tunera lub muxa (sprawdź przez API TVHeadend)")
    parser.add_argument("--timeout", type=int, default=10)
    parser.add_argument("--warning_ber", type=float, default=1e-5)
    parser.add_argument("--critical_ber", type=float, default=1e-4)
    parser.add_argument("--warning_unc", type=int, default=10)
    parser.add_argument("--critical_unc", type=int, default=100)
    args = parser.parse_args()

    status = get_input_status(
        args.host, args.port, args.user, args.password,
        args.input_uuid, args.timeout,
    )
    if not status:
        print("CRITICAL - Input (tuner/mux) not found or API error")
        sys.exit(CRITICAL)

    exit_code, line = evaluate_status(
        status,
        args.warning_ber, args.critical_ber,
        args.warning_unc, args.critical_unc,
    )
    print(line)
    sys.exit(exit_code)


if __name__ == "__main__":
    main()