import ipaddress import subprocess import argparse import csv import json import time import threading import sys from concurrent.futures import ThreadPoolExecutor, as_completed default_oids = { "sysName": "1.3.6.1.2.1.1.5.0" } class Scanner: def __init__(self): self.stop_event = threading.Event() self.lock = threading.Lock() self.counter = [0] self.results = [] self.last_ip = None self.errors = [0] def get_oid_value(self, ip, community, oid, timeout=1): if self.stop_event.is_set(): return "Przerwano" try: result = subprocess.run( ["snmpget", "-v2c", "-c", community, str(ip), oid], capture_output=True, text=True, timeout=timeout ) if result.returncode == 0 and "STRING" in result.stdout: return result.stdout.split("STRING:")[-1].strip() else: with self.lock: self.errors[0] += 1 except subprocess.TimeoutExpired: with self.lock: self.errors[0] += 1 return "Timeout" except Exception: with self.lock: self.errors[0] += 1 return "Błąd" return "Brak danych" def query_all_oids(self, ip, community, oids, timeout=1): if self.stop_event.is_set(): return None values = {name: self.get_oid_value(ip, community, oid, timeout) for name, oid in oids.items()} has_data = any(val not in ("Timeout", "Brak danych", "Błąd", "Przerwano") for val in values.values()) if has_data: return (str(ip), values) return None def scan_subnet(self, subnet, community, oids, oids_order, live_mode): for ip in subnet.hosts(): if self.stop_event.is_set(): return result = self.query_all_oids(ip, community, oids) with self.lock: self.last_ip = str(ip) self.counter[0] += 1 if result: self.results.append(result) if live_mode: ip_str, values = result row = f"{ip_str:<15} " + " ".join([f"{values.get(k, ''):<25}" for k in oids_order]) sys.stdout.write("\r" + " " * 120 + "\r") sys.stdout.write(row + "\n") sys.stdout.flush() def show_progress(self, total): start_time = time.time() while not self.stop_event.is_set(): with self.lock: done = self.counter[0] last_ip = self.last_ip or "-" errors = self.errors[0] elapsed = time.time() - start_time speed = done / elapsed if elapsed > 0 else 0 avg_time_per_host = elapsed / done if done else 0 eta = (total - done) / speed if speed > 0 else float("inf") eta_str = time.strftime("%H:%M:%S", time.gmtime(eta)) if eta != float("inf") else "N/A" sys.stdout.write( f"\rPostęp: {done}/{total} ({(done/total)*100:.2f}%)" f" | {speed:.2f} hostów/s" f" | Śr. czas/hosta: {avg_time_per_host:.3f} s" f" | Błędy: {errors}" f" | Ostatni IP: {last_ip}" f" | ETA: {eta_str}" ) sys.stdout.flush() time.sleep(0.1) sys.stdout.write("\n") def read_subnets_from_file(filename): subnets = [] try: with open(filename, "r", encoding="utf-8") as f: for line in f: line = line.strip() if line: subnets.append(line) except Exception as e: print(f"Błąd podczas wczytywania pliku: {e}") return subnets def split_subnet(supernet, new_prefix=24): try: network = ipaddress.IPv4Network(supernet) return list(network.subnets(new_prefix=new_prefix)) except ValueError as e: print(f"Błąd podsieci: {e}") return [] def save_to_csv(results, oids, filename="wyniki.csv"): with open(filename, "w", newline="", encoding="utf-8") as f: writer = csv.writer(f) headers = ["IP"] + list(oids.keys()) writer.writerow(headers) for ip, values in results: row = [ip] + [values.get(key, "") for key in oids.keys()] writer.writerow(row) def sort_results(results, key): return sorted(results, key=lambda x: x[1].get(key, "").lower()) def main(): parser = argparse.ArgumentParser(description="SNMP skaner https://gitea.linuxiarz.pl/gru/snmp_skaner") parser.add_argument("subnets", nargs="*", help="Podsieci w formacie CIDR (np. 192.168.1.0/24)") parser.add_argument("-s", "--subnet-file", help="Plik z listą podsieci (jedna na linię)") parser.add_argument("-c", "--community", default="public", help="SNMP community (domyślnie: public)") parser.add_argument("-o", "--oids", help="OID-y w formacie JSON, np. '{\"sysName\":\"...\"}'") parser.add_argument("-w", "--workers", type=int, default=10, help="Liczba wątków (domyślnie: 10)") parser.add_argument("-p", "--prefix", type=int, default=24, help="Prefix dla podsieci (domyślnie: 24)") parser.add_argument("-f", "--file", default="wyniki.csv", help="Plik CSV wyników") parser.add_argument("--live", action="store_true", help="Tryb wypisywania wyników na żywo") parser.add_argument("--sort", help="Pole OID do sortowania wyników (np. sysName)") args = parser.parse_args() scanner = Scanner() try: oids = json.loads(args.oids) if args.oids else default_oids oids_order = list(oids.keys()) all_subnets = [] if args.subnet_file: all_subnets += read_subnets_from_file(args.subnet_file) if args.subnets: all_subnets += args.subnets if not all_subnets: print("Brak podsieci do skanowania.") return subnets_to_scan = [] for subnet in all_subnets: subnets_to_scan.extend(split_subnet(subnet, args.prefix)) total_ips = sum(1 for subnet in subnets_to_scan for _ in subnet.hosts()) if args.live: print(f"\n{'IP':<15} " + " ".join([f"{name:<25}" for name in oids_order])) print("-" * (15 + 26 * len(oids_order))) progress_thread = threading.Thread(target=scanner.show_progress, args=(total_ips,)) progress_thread.daemon = True progress_thread.start() with ThreadPoolExecutor(max_workers=args.workers) as executor: futures = [] for subnet in subnets_to_scan: if scanner.stop_event.is_set(): break futures.append(executor.submit( scanner.scan_subnet, subnet, args.community, oids, oids_order, args.live )) try: for future in as_completed(futures): future.result() except KeyboardInterrupt: scanner.stop_event.set() print("\nPrzerwano przez użytkownika. Zapisuję dotychczasowe wyniki...") if not args.live: if args.sort and args.sort in oids: scanner.results = sort_results(scanner.results, args.sort) print(f"\n{'IP':<15} " + " ".join([f"{name:<25}" for name in oids_order])) print("-" * (15 + 26 * len(oids_order))) for ip, values in scanner.results: print(f"{ip:<15} " + " ".join([f"{values.get(k, ''):<25}" for k in oids_order])) save_to_csv(scanner.results, oids, filename=args.file) print(f"\nZapisano do pliku: {args.file}") except KeyboardInterrupt: scanner.stop_event.set() print("\nPrzerwano przez użytkownika. Zapisuję dotychczasowe wyniki...") save_to_csv(scanner.results, oids, filename=args.file) except Exception as e: scanner.stop_event.set() print(f"\nWystąpił błąd: {str(e)}") save_to_csv(scanner.results, oids, filename=args.file) if __name__ == "__main__": main()