diff --git a/tvheadend_advenced.py b/tvheadend_advenced.py new file mode 100644 index 0000000..46e450c --- /dev/null +++ b/tvheadend_advenced.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +import argparse +import requests +import sys +from requests.auth import HTTPDigestAuth + +def get_input_status(host, port, user, password, input_uuid, timeout): + url = f"http://{host}:{port}/api/status/inputs" + try: + r = requests.get(url, auth=HTTPDigestAuth(user, password), timeout=timeout) + if r.status_code != 200: + print(f"CRITICAL - API HTTP error: {r.status_code} {r.text}") + sys.exit(2) + try: + data = r.json() + except Exception as e: + print(f"CRITICAL - API returned invalid JSON: {e} | Response: {r.text}") + sys.exit(2) + # Obsługa odpowiedzi jako dict z kluczem 'entries' + if isinstance(data, dict) and "entries" in data: + entries = data["entries"] + elif isinstance(data, list): + entries = data + else: + print(f"CRITICAL - Unexpected API response type: {type(data)} | Response: {data}") + sys.exit(2) + for entry in entries: + if entry.get("uuid") == input_uuid: + return entry + print("CRITICAL - Input (tuner/mux) not found in API response") + sys.exit(2) + except Exception as e: + print(f"CRITICAL - API error: {e}") + sys.exit(2) + +def main(): + parser = argparse.ArgumentParser(description="Nagios plugin for TVHeadend DVB-T advanced monitoring (Digest Auth)") + parser.add_argument("--host", required=True) + parser.add_argument("--port", type=int, default=9981) + parser.add_argument("--user", required=True) + parser.add_argument("--password", required=True) + parser.add_argument("--input_uuid", required=True, help="UUID tunera lub muxa (sprawdź przez API TVHeadend)") + parser.add_argument("--timeout", type=int, default=10) + parser.add_argument("--warning_ber", type=float, default=1e-5) + parser.add_argument("--critical_ber", type=float, default=1e-4) + parser.add_argument("--warning_unc", type=int, default=10) + parser.add_argument("--critical_unc", type=int, default=100) + args = parser.parse_args() + + status = get_input_status(args.host, args.port, args.user, args.password, args.input_uuid, args.timeout) + if not status: + print("CRITICAL - Input (tuner/mux) not found or API error") + sys.exit(2) + + # Pobieranie parametrów + signal = status.get("signal") + snr = status.get("snr") + ber = status.get("ber") + mer = status.get("mer") + ec_bit = status.get("ec_bit") + ec_block = status.get("ec_block") + unc = status.get("unc") + cc = status.get("cc") + + # Ustalanie statusu + exit_code = 0 + messages = [] + + if ber is not None: + if ber >= args.critical_ber: + exit_code = 2 + messages.append(f"CRITICAL BER={ber}") + elif ber >= args.warning_ber: + exit_code = max(exit_code, 1) + messages.append(f"WARNING BER={ber}") + else: + messages.append(f"OK BER={ber}") + if unc is not None: + if unc >= args.critical_unc: + exit_code = 2 + messages.append(f"CRITICAL UNC={unc}") + elif unc >= args.warning_unc: + exit_code = max(exit_code, 1) + messages.append(f"WARNING UNC={unc}") + else: + messages.append(f"OK UNC={unc}") + + # Dodaj inne parametry do komunikatu + messages.append(f"Signal={signal}") + messages.append(f"SNR={snr}") + if mer is not None: + messages.append(f"MER={mer}") + if ec_bit is not None: + messages.append(f"EC_BIT={ec_bit}") + if ec_block is not None: + messages.append(f"EC_BLOCK={ec_block}") + if cc is not None: + messages.append(f"CC={cc}") + + # Komunikat końcowy + status_text = ["OK", "WARNING", "CRITICAL"][exit_code] + print(f"{status_text} - " + " | ".join(messages)) + sys.exit(exit_code) + +if __name__ == "__main__": + main() \ No newline at end of file