110 lines
4.1 KiB
Python
110 lines
4.1 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import requests
|
|
import sys
|
|
from requests.auth import HTTPDigestAuth
|
|
|
|
def get_input_status(host, port, user, password, input_uuid, timeout):
|
|
url = f"http://{host}:{port}/api/status/inputs"
|
|
try:
|
|
r = requests.get(url, auth=HTTPDigestAuth(user, password), timeout=timeout)
|
|
if r.status_code != 200:
|
|
print(f"CRITICAL - API HTTP error: {r.status_code} {r.text}")
|
|
sys.exit(2)
|
|
try:
|
|
data = r.json()
|
|
except Exception as e:
|
|
print(f"CRITICAL - API returned invalid JSON: {e} | Response: {r.text}")
|
|
sys.exit(2)
|
|
if isinstance(data, dict) and "entries" in data:
|
|
entries = data["entries"]
|
|
elif isinstance(data, list):
|
|
entries = data
|
|
else:
|
|
print(f"CRITICAL - Unexpected API response type: {type(data)} | Response: {data}")
|
|
sys.exit(2)
|
|
for entry in entries:
|
|
if entry.get("uuid") == input_uuid:
|
|
return entry
|
|
print("CRITICAL - Input (tuner/mux) not found in API response")
|
|
sys.exit(2)
|
|
except Exception as e:
|
|
print(f"CRITICAL - API error: {e}")
|
|
sys.exit(2)
|
|
|
|
def interpret_status(status, args):
|
|
signal = status.get("signal")
|
|
snr = status.get("snr")
|
|
ber = status.get("ber")
|
|
mer = status.get("mer")
|
|
ec_bit = status.get("ec_bit")
|
|
ec_block = status.get("ec_block")
|
|
unc = status.get("unc")
|
|
cc = status.get("cc")
|
|
|
|
exit_code = 0
|
|
messages = []
|
|
|
|
if ber is not None:
|
|
if ber >= args.critical_ber:
|
|
exit_code = 2
|
|
messages.append(f"CRITICAL BER={ber}")
|
|
elif ber >= args.warning_ber:
|
|
exit_code = max(exit_code, 1)
|
|
messages.append(f"WARNING BER={ber}")
|
|
else:
|
|
messages.append(f"OK BER={ber}")
|
|
if unc is not None:
|
|
if unc >= args.critical_unc:
|
|
exit_code = 2
|
|
messages.append(f"CRITICAL UNC={unc}")
|
|
elif unc >= args.warning_unc:
|
|
exit_code = max(exit_code, 1)
|
|
messages.append(f"WARNING UNC={unc}")
|
|
else:
|
|
messages.append(f"OK UNC={unc}")
|
|
messages.append(f"Signal={signal}")
|
|
messages.append(f"SNR={snr}")
|
|
if mer is not None:
|
|
messages.append(f"MER={mer}")
|
|
if ec_bit is not None:
|
|
messages.append(f"EC_BIT={ec_bit}")
|
|
if ec_block is not None:
|
|
messages.append(f"EC_BLOCK={ec_block}")
|
|
if cc is not None:
|
|
messages.append(f"CC={cc}")
|
|
return exit_code, messages
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Nagios plugin for TVHeadend DVB-T advanced monitoring (Digest Auth, multi-card, optional second card)")
|
|
parser.add_argument("--host", required=True)
|
|
parser.add_argument("--port", type=int, default=9981)
|
|
parser.add_argument("--user", required=True)
|
|
parser.add_argument("--password", required=True)
|
|
parser.add_argument("--input_uuid1", required=True, help="UUID tunera/muxa 1")
|
|
parser.add_argument("--input_uuid2", required=False, help="UUID tunera/muxa 2 (opcjonalnie)")
|
|
parser.add_argument("--timeout", type=int, default=10)
|
|
parser.add_argument("--warning_ber", type=float, default=1e-5)
|
|
parser.add_argument("--critical_ber", type=float, default=1e-4)
|
|
parser.add_argument("--warning_unc", type=int, default=10)
|
|
parser.add_argument("--critical_unc", type=int, default=100)
|
|
args = parser.parse_args()
|
|
|
|
status1 = get_input_status(args.host, args.port, args.user, args.password, args.input_uuid1, args.timeout)
|
|
code1, msg1 = interpret_status(status1, args)
|
|
|
|
if args.input_uuid2:
|
|
status2 = get_input_status(args.host, args.port, args.user, args.password, args.input_uuid2, args.timeout)
|
|
code2, msg2 = interpret_status(status2, args)
|
|
final_code = max(code1, code2)
|
|
status_text = ["OK", "WARNING", "CRITICAL"][final_code]
|
|
print(f"{status_text} - Karta1: {' | '.join(msg1)} || Karta2: {' | '.join(msg2)}")
|
|
sys.exit(final_code)
|
|
else:
|
|
status_text = ["OK", "WARNING", "CRITICAL"][code1]
|
|
print(f"{status_text} - Karta1: {' | '.join(msg1)}")
|
|
sys.exit(code1)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|