Files
nagios-plugins/check_ping.py
2025-05-26 10:27:01 +02:00

100 lines
3.6 KiB
Python

#!/usr/bin/env python3
import argparse
import subprocess
import sys
import platform
import re
import time
import signal
# Fail-safe timeout
def set_fail_safe(seconds):
    """Arm a SIGALRM watchdog that ends the check after *seconds*.

    When the alarm fires, the installed handler prints an UNKNOWN status
    line and terminates the process with exit code 3.

    Args:
        seconds: Delay, in seconds, handed to ``signal.alarm``.
    """

    def _on_alarm(_signum, _frame):
        # Report the overrun on stdout, then leave with exit code 3.
        print("PING UNKNOWN - Max runtime exceeded")
        raise SystemExit(3)

    signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)
def parse_args():
    """Parse the plugin's command-line options from ``sys.argv``.

    Returns:
        argparse.Namespace: Parsed options. Single-letter flags keep their
        letter as attribute name (``H``, ``w``, ``c``, ``p``, ``t``); the
        remaining attributes are ``ipv4``, ``ipv6``, ``fails``, ``delay``,
        ``max_runtime`` and ``interface``.

    Raises:
        SystemExit: Raised by argparse on ``-h`` or on invalid arguments.
    """
    parser = argparse.ArgumentParser(
        description="check_ping replacement with delayed alerting."
    )
    parser.add_argument("-H", required=True, help="Host address")
    # argparse runs %-formatting over help text, so a literal percent sign
    # has to be written as "%%"; a bare "%" breaks help rendering.
    parser.add_argument(
        "-w", required=True, help="Warning threshold: rta,pl%%"
    )
    parser.add_argument(
        "-c", required=True, help="Critical threshold: rta,pl%%"
    )
    parser.add_argument(
        "-p", type=int, default=5,
        help="Number of packets to send (default: 5)",
    )
    parser.add_argument(
        "-t", type=int, default=5,
        help="Timeout per packet in seconds (default: 5)",
    )
    parser.add_argument(
        "-4", dest="ipv4", action="store_true", help="Force IPv4"
    )
    parser.add_argument(
        "-6", dest="ipv6", action="store_true", help="Force IPv6"
    )
    parser.add_argument(
        "--fails", type=int, default=3,
        help="Number of consecutive failures before alerting (default: 3)",
    )
    parser.add_argument(
        "--delay", type=int, default=10,
        help="Delay in seconds between retries (default: 10)",
    )
    parser.add_argument(
        "--max-runtime", type=int, default=60,
        help="Max runtime in seconds before fail-safe exit (default: 60)",
    )
    parser.add_argument(
        "--interface", help="Use specific network interface (e.g., eth0)"
    )
    return parser.parse_args()
def run_ping(host, count, timeout, ipv4, ipv6, interface):
    """Run the system ping utility once and return what it wrote to stdout.

    Args:
        host: Target address, appended as the last command argument.
        count: Number of packets, passed as ``-c``.
        timeout: Value passed as ``-W``.
        ipv4: When true (and *ipv6* is false) ``-4`` is added.
        ipv6: When true the ``ping6`` executable is used instead of ``ping``;
            this takes precedence over *ipv4*.
        interface: Optional interface name passed as ``-I``.

    Returns:
        str: The captured standard output of the ping process.

    Raises:
        SystemExit: With code 3, after printing an UNKNOWN status line, when
            the ping executable cannot be started.
    """
    import os  # local import keeps this block independent of the file header

    cmd = ["ping", "-c", str(count), "-W", str(timeout)]
    if ipv6:
        cmd[0] = "ping6"
    elif ipv4:
        cmd.insert(1, "-4")
    if interface:
        cmd.extend(["-I", interface])
    cmd.append(host)

    # The summary is parsed with English-only patterns elsewhere in this file,
    # so force the C locale to keep a translated ping from defeating them.
    env = dict(os.environ, LC_ALL="C")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, env=env)
    except OSError as exc:
        # Without this an uncaught traceback would end the plugin with exit
        # code 1 instead of the UNKNOWN code (3) used elsewhere in this file.
        print(f"PING UNKNOWN - Cannot execute {cmd[0]}: {exc}")
        sys.exit(3)
    return result.stdout
def parse_ping_output(output):
    """Extract the average round-trip time and packet loss from ping output.

    Args:
        output: Text written by ping to stdout.

    Returns:
        tuple: ``(rta, loss)`` as floats, where *rta* is the average
        round-trip time taken from the summary line and *loss* is the
        packet-loss percentage. When the loss is 100% and no timing summary
        is present, ``(0.0, loss)`` is returned so the caller can still
        evaluate the loss threshold. ``(None, None)`` is returned when the
        output cannot be interpreted.
    """
    number = r"\d+(?:\.\d+)?"

    loss_match = re.search(rf"({number})% packet loss", output)
    if not loss_match:
        return None, None
    loss = float(loss_match.group(1))

    # The summary reads "<label> min/avg/max[/dev] = a/b/c[/d] ms"; the
    # average is the SECOND value. Both the "rtt" and "round-trip" labels
    # are accepted.
    rtt_match = re.search(
        rf"(?:rtt|round-trip) [^=]*= {number}/({number})/", output
    )
    if rtt_match:
        return float(rtt_match.group(1)), loss

    if loss >= 100.0:
        # No reply means no timing line; 0.0 is a placeholder so that the
        # loss value alone decides the resulting status.
        return 0.0, loss
    return None, None
def check_thresholds(rta, loss, warn, crit):
    """Map a measurement onto a status code using ``rta,pl%`` thresholds.

    Args:
        rta: Measured round-trip time.
        loss: Measured packet loss in percent.
        warn: Warning threshold spec, two comma-separated numbers; percent
            signs at either end of a field are ignored.
        crit: Critical threshold spec in the same format.

    Returns:
        int: 2 when either critical limit is reached, 1 when either warning
        limit is reached, otherwise 0.

    Raises:
        ValueError: If a spec does not consist of exactly two numeric fields.
    """

    def _limits(spec):
        # Unpacking enforces exactly two fields per spec.
        rta_limit, loss_limit = (
            float(field.strip("%")) for field in spec.split(",")
        )
        return rta_limit, loss_limit

    warn_rta, warn_loss = _limits(warn)
    crit_rta, crit_loss = _limits(crit)

    critical = loss >= crit_loss or rta >= crit_rta
    if critical:
        return 2
    warning = loss >= warn_loss or rta >= warn_rta
    if warning:
        return 1
    return 0
def status_text(code):
    """Return the status label stored at index *code*.

    Index 0 is "OK", 1 "WARNING", 2 "CRITICAL" and 3 "UNKNOWN".
    """
    labels = ["OK", "WARNING", "CRITICAL", "UNKNOWN"]
    label = labels[code]
    return label
def main():
    """Run the ping check with delayed alerting and exit with its status.

    The host is pinged repeatedly. A run within thresholds ends the check
    immediately with OK. A run outside thresholds is counted, and only once
    ``--fails`` consecutive runs have failed is the WARNING/CRITICAL status
    of the last run reported; between failed runs the check sleeps for
    ``--delay`` seconds.

    The status line (with performance data) is always printed first; the
    per-attempt "Temporary issue" notes follow it as additional output, so
    the first line of stdout is the actual result.

    Raises:
        SystemExit: Always; the code is 0 (OK), 1 (WARNING), 2 (CRITICAL)
            or 3 (UNKNOWN, e.g. invalid thresholds or unparsable output).
    """
    args = parse_args()
    set_fail_safe(args.max_runtime)

    # Reject malformed threshold specs up front with an UNKNOWN status
    # instead of letting a ValueError traceback end the plugin later on.
    try:
        check_thresholds(0.0, 0.0, args.w, args.c)
    except ValueError:
        print(
            "PING UNKNOWN - Invalid threshold, "
            "expected <rta>,<pl>% for -w and -c"
        )
        sys.exit(3)

    # Threshold fields for the performance data never change between runs.
    warn_rta, warn_pl = args.w.split(",")
    crit_rta, crit_pl = args.c.split(",")
    warn_pl = warn_pl.strip("%")
    crit_pl = crit_pl.strip("%")

    notes = []  # "Temporary issue" lines, emitted after the status line

    def _finish(status_line, code):
        # Status line first, buffered notes afterwards, then exit.
        print(status_line)
        for note in notes:
            print(note)
        sys.exit(code)

    fail_count = 0
    while True:
        output = run_ping(
            args.H, args.p, args.t, args.ipv4, args.ipv6, args.interface
        )
        rta, loss = parse_ping_output(output)
        if rta is None or loss is None:
            _finish("PING UNKNOWN - Parsing error", 3)

        status = check_thresholds(rta, loss, args.w, args.c)
        perf = (
            f"| rta={rta}ms;{warn_rta};{crit_rta} "
            f"loss={loss}%;{warn_pl};{crit_pl}"
        )
        summary = f"PING {status_text(status)} - rta={rta}ms loss={loss}% {perf}"

        if status == 0:
            _finish(summary, 0)

        fail_count += 1
        notes.append(
            f"Temporary issue ({fail_count}/{args.fails}) - "
            f"rta={rta}ms loss={loss}%"
        )
        if fail_count >= args.fails:
            _finish(summary, status)
        time.sleep(args.delay)
# Run the check only when executed as a script, not when imported.
if __name__ == "__main__":
    main()