#!/usr/bin/env python3
"""check_ping replacement with delayed alerting.

Runs the system ``ping`` command and compares the average round-trip time and
the packet loss against warning/critical thresholds.  A non-OK result is only
reported after ``--fails`` consecutive failed attempts, with ``--delay``
seconds between attempts.  Exit codes follow the Nagios plugin convention:
0 = OK, 1 = WARNING, 2 = CRITICAL, 3 = UNKNOWN.
"""
import argparse
import os
import platform
import re
import signal
import subprocess
import sys
import time
from typing import List, Optional, Tuple

# Packet loss percentage from the ping summary line.
_LOSS_RE = re.compile(r"(\d+(?:\.\d+)?)% packet loss")
# Average RTT: the *second* field of "min/avg/max[/mdev]".  Both the Linux
# ("rtt") and BSD/macOS ("round-trip") summary labels are accepted.
_RTT_AVG_RE = re.compile(
    r"(?:rtt|round-trip) [^=]*= \d+(?:\.\d+)?/(\d+(?:\.\d+)?)/"
)


# Fail-safe timeout
def set_fail_safe(seconds: int) -> None:
    """Abort the whole check with UNKNOWN (exit 3) after ``seconds`` seconds.

    Installs a ``SIGALRM`` handler and arms the alarm, so this relies on a
    platform that provides ``signal.SIGALRM``.  Passing 0 arms no alarm.
    """

    def handler(signum, frame):
        print("PING UNKNOWN - Max runtime exceeded")
        sys.exit(3)

    signal.signal(signal.SIGALRM, handler)
    signal.alarm(seconds)


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the check."""
    parser = argparse.ArgumentParser(
        description="check_ping replacement with delayed alerting."
    )
    parser.add_argument("-H", "--host", required=True, help="Host address")
    # argparse %-formats help strings, so a literal percent sign must be
    # written as "%%"; a bare "%" makes --help raise ValueError.
    parser.add_argument("-w", "--warning", required=True,
                        help="Warning threshold: rta,pl%%")
    parser.add_argument("-c", "--critical", required=True,
                        help="Critical threshold: rta,pl%%")
    parser.add_argument("-p", "--packets", type=int, default=5,
                        help="Number of packets to send (default: 5)")
    parser.add_argument("-t", "--timeout", type=int, default=5,
                        help="Timeout per packet in seconds (default: 5)")
    parser.add_argument("-4", "--ipv4", dest="ipv4", action="store_true",
                        help="Force IPv4")
    parser.add_argument("-6", "--ipv6", dest="ipv6", action="store_true",
                        help="Force IPv6")
    parser.add_argument("--fails", "-f", type=int, default=3,
                        help="Number of consecutive failures before alerting "
                             "(default: 3)")
    parser.add_argument("--delay", "-d", type=int, default=10,
                        help="Delay in seconds between retries (default: 10)")
    parser.add_argument("--max-runtime", "-m", type=int, default=60,
                        help="Max runtime in seconds before fail-safe exit "
                             "(default: 60)")
    parser.add_argument("--interface", "-i",
                        help="Use specific network interface (e.g., eth0)")
    return parser


def parse_args() -> argparse.Namespace:
    """Parse ``sys.argv`` and return the resulting namespace."""
    return build_parser().parse_args()


def run_ping(host, count, timeout, ipv4, ipv6, interface) -> str:
    """Run the system ping command once and return its standard output.

    ``ipv6`` takes precedence over ``ipv4``: it switches the executable to
    ``ping6``, otherwise ``ipv4`` adds ``-4``.  The ping exit status is not
    inspected; callers judge the result from the parsed output.

    Raises:
        OSError: if the ping executable cannot be started.
    """
    cmd = ["ping", "-c", str(count), "-W", str(timeout)]
    if ipv6:
        cmd[0] = "ping6"
    elif ipv4:
        cmd.insert(1, "-4")
    if interface:
        cmd.extend(["-I", interface])
    cmd.append(host)
    # Force the C locale so the summary lines stay in the English wording
    # that the regular expressions in parse_ping_output() expect.
    env = dict(os.environ, LC_ALL="C")
    result = subprocess.run(
        cmd, capture_output=True, text=True, env=env, check=False
    )
    return result.stdout


def parse_ping_output(output: str) -> Tuple[Optional[float], Optional[float]]:
    """Extract ``(average_rtt_ms, packet_loss_percent)`` from ping output.

    Returns:
        ``(None, None)`` when no packet-loss summary is found at all.
        ``(None, loss)`` when the loss is known but there is no RTT line,
        which is what ping prints when no reply was received.
        ``(rta, loss)`` otherwise.
    """
    loss_match = _LOSS_RE.search(output)
    if loss_match is None:
        return None, None
    loss = float(loss_match.group(1))
    rtt_match = _RTT_AVG_RE.search(output)
    if rtt_match is None:
        return None, loss
    return float(rtt_match.group(1)), loss


def _threshold_fields(spec: str) -> Tuple[str, str]:
    """Split a ``rta,pl%`` threshold into its two cleaned text fields.

    Raises:
        ValueError: if ``spec`` does not consist of exactly two fields.
    """
    parts = spec.split(",")
    if len(parts) != 2:
        raise ValueError(f"Invalid threshold {spec!r}: expected rta,pl%")
    rta_txt, pl_txt = (part.strip().strip("%").strip() for part in parts)
    return rta_txt, pl_txt


def _parse_threshold(spec: str) -> Tuple[float, float]:
    """Convert a ``rta,pl%`` threshold string to ``(rta, loss)`` floats.

    Raises:
        ValueError: if the string is malformed or not numeric.
    """
    rta_txt, pl_txt = _threshold_fields(spec)
    try:
        return float(rta_txt), float(pl_txt)
    except ValueError:
        raise ValueError(
            f"Invalid threshold {spec!r}: expected rta,pl%"
        ) from None


def check_thresholds(rta, loss, warn, crit) -> int:
    """Return the status code (0/1/2) for a measurement.

    Args:
        rta: average round-trip time in ms, or ``None`` when unknown; in that
            case only the packet loss is compared.
        loss: packet loss in percent.
        warn: warning threshold string in ``rta,pl%`` form.
        crit: critical threshold string in ``rta,pl%`` form.

    Raises:
        ValueError: if a threshold string is malformed.
    """
    warn_rta, warn_loss = _parse_threshold(warn)
    crit_rta, crit_loss = _parse_threshold(crit)
    if loss >= crit_loss or (rta is not None and rta >= crit_rta):
        return 2
    if loss >= warn_loss or (rta is not None and rta >= warn_rta):
        return 1
    return 0


def status_text(code: int) -> str:
    """Map a status code (0-3) to its textual name."""
    return ["OK", "WARNING", "CRITICAL", "UNKNOWN"][code]


def _emit(status_line: str, notes: List[str]) -> None:
    """Print the status line first, then the per-attempt notes.

    The status line must come first because monitoring systems take the
    first output line as the plugin status; the notes follow as long output.
    """
    print(status_line)
    for note in notes:
        print(note)


def main() -> None:
    """Run the check, retrying on failure, and exit with the status code."""
    args = parse_args()

    # Validate thresholds before pinging so a typo yields UNKNOWN instead of
    # an uncaught traceback.
    try:
        warn_rta_txt, warn_pl_txt = _threshold_fields(args.warning)
        crit_rta_txt, crit_pl_txt = _threshold_fields(args.critical)
        _parse_threshold(args.warning)
        _parse_threshold(args.critical)
    except ValueError as exc:
        print(f"PING UNKNOWN - {exc}")
        sys.exit(3)

    set_fail_safe(args.max_runtime)

    notes: List[str] = []
    fail_count = 0
    while True:
        try:
            output = run_ping(args.host, args.packets, args.timeout,
                              args.ipv4, args.ipv6, args.interface)
        except OSError as exc:
            _emit(f"PING UNKNOWN - Unable to run ping: {exc}", notes)
            sys.exit(3)

        rta, loss = parse_ping_output(output)
        if loss is None:
            _emit("PING UNKNOWN - Parsing error", notes)
            sys.exit(3)

        status = check_thresholds(rta, loss, args.warning, args.critical)

        # With no reply there is no RTT; perfdata uses "U" (value unknown).
        if rta is None:
            summary = f"rta=n/a loss={loss}%"
            rta_perf = "rta=U"
        else:
            summary = f"rta={rta}ms loss={loss}%"
            rta_perf = f"rta={rta}ms"
        perf = (
            f"| {rta_perf};{warn_rta_txt};{crit_rta_txt} "
            f"loss={loss}%;{warn_pl_txt};{crit_pl_txt}"
        )

        if status == 0:
            _emit(f"PING OK - {summary} {perf}", notes)
            sys.exit(0)

        fail_count += 1
        notes.append(
            f"Temporary issue ({fail_count}/{args.fails}) - {summary}"
        )
        if fail_count >= args.fails:
            _emit(f"PING {status_text(status)} - {summary} {perf}", notes)
            sys.exit(status)
        time.sleep(args.delay)


if __name__ == "__main__":
    main()