Update scan.py

This commit is contained in:
gru 2025-05-06 11:02:35 +02:00
parent 48fd8d0cad
commit 5eb9ef8cc1

107
scan.py
View File

@ -1,21 +1,23 @@
import ipaddress import ipaddress
import subprocess import subprocess
import argparse import argparse
import csv
import json
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
# Lista OID-ów do sprawdzenia (etykieta => OID) # Domyślny OID tylko sysName
oids = { default_oids = {
"sysName": "1.3.6.1.2.1.1.5.0", "sysName": "1.3.6.1.2.1.1.5.0",
"sysDescr": "1.3.6.1.2.1.1.1.0", #"sysDescr": "1.3.6.1.2.1.1.1.0",
"sysLocation": "1.3.6.1.2.1.1.6.0", #"sysLocation": "1.3.6.1.2.1.1.6.0",
"sysContact": "1.3.6.1.2.1.1.4.0" #"sysContact": "1.3.6.1.2.1.1.4.0"
} }
def get_oid_value(ip, community, oid): def get_oid_value(ip, community, oid, timeout=1):
try: try:
result = subprocess.run( result = subprocess.run(
["snmpget", "-v2c", "-c", community, str(ip), oid], ["snmpget", "-v2c", "-c", community, str(ip), oid],
capture_output=True, text=True, timeout=2 capture_output=True, text=True, timeout=timeout
) )
if result.returncode == 0 and "STRING" in result.stdout: if result.returncode == 0 and "STRING" in result.stdout:
return result.stdout.split("STRING:")[-1].strip() return result.stdout.split("STRING:")[-1].strip()
@ -23,36 +25,91 @@ def get_oid_value(ip, community, oid):
return "Timeout" return "Timeout"
return "Brak danych" return "Brak danych"
def query_all_oids(ip, community): def query_all_oids(ip, community, oids, timeout=1):
values = {name: get_oid_value(ip, community, oid) for name, oid in oids.items()} values = {name: get_oid_value(ip, community, oid, timeout) for name, oid in oids.items()}
has_data = any(val not in ("Timeout", "Brak danych") for val in values.values()) has_data = any(val not in ("Timeout", "Brak danych") for val in values.values())
if has_data: if has_data:
return (str(ip), values) return (str(ip), values)
return None return None
def scan_subnet(subnet, community, oids):
results = []
for ip in subnet.hosts():
result = query_all_oids(ip, community, oids, timeout=1)
if result:
results.append(result)
return results
def split_subnet(supernet, new_prefix=24):
try:
network = ipaddress.IPv4Network(supernet)
return list(network.subnets(new_prefix=new_prefix))
except ValueError as e:
print(f"Błąd podsieci: {e}")
return []
def read_subnets_from_file(filename):
subnets = []
try:
with open(filename, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
subnets.append(line)
except Exception as e:
print(f"Błąd podczas wczytywania pliku podsieci: {e}")
return subnets
def save_to_csv(results, oids, filename="wyniki.csv"):
with open(filename, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
headers = ["IP"] + list(oids.keys())
writer.writerow(headers)
for ip, values in results:
row = [ip] + [values.get(key, "") for key in oids.keys()]
writer.writerow(row)
def main(): def main():
parser = argparse.ArgumentParser(description="Skaner SNMP dla wielu podsieci.") parser = argparse.ArgumentParser(description="Skaner SNMP z podziałem podsieci i eksportem do CSV.")
parser.add_argument("subnets", nargs="+", help="Podsieci w formacie CIDR (np. 10.1.1.0/24 10.2.2.0/24)") parser.add_argument("subnets", nargs="*", help="Podsieci w formacie CIDR (np. 172.16.0.0/16)")
parser.add_argument("-s", "--subnet-file", help="Plik tekstowy z listą podsieci CIDR (jedna na linię)")
parser.add_argument("-c", "--community", default="public", help="SNMP community (domyślnie: public)") parser.add_argument("-c", "--community", default="public", help="SNMP community (domyślnie: public)")
parser.add_argument("-o", "--oids", help="OID-y w formacie JSON, np. '{\"sysDescr\":\"1.3.6.1.2.1.1.1.0\"}'")
parser.add_argument("-w", "--workers", type=int, default=10, help="Liczba równoległych wątków (domyślnie: 10)")
parser.add_argument("-p", "--prefix", type=int, default=24, help="Wielkość podsieci do podziału (domyślnie: 24)")
parser.add_argument("-f", "--file", default="wyniki.csv", help="Nazwa pliku CSV do zapisu (domyślnie: wyniki.csv)")
args = parser.parse_args() args = parser.parse_args()
ips = [] oids = json.loads(args.oids) if args.oids else default_oids
for subnet in args.subnets:
try:
ips.extend(ipaddress.IPv4Network(subnet))
except ValueError:
print(f"Błędny format podsieci: {subnet}")
print(f"{'IP':<15} {'sysName':<25} {'sysDescr':<40} {'sysLocation':<25} {'sysContact'}") all_input_subnets = list(args.subnets)
print("-" * 130) if args.subnet_file:
all_input_subnets.extend(read_subnets_from_file(args.subnet_file))
with ThreadPoolExecutor(max_workers=6) as executor: if not all_input_subnets:
futures = {executor.submit(query_all_oids, ip, args.community): ip for ip in ips} print("Brak podsieci do przeskanowania (ani z linii poleceń, ani z pliku).")
return
subnets_to_scan = []
for subnet in all_input_subnets:
subnets_to_scan.extend(split_subnet(subnet, new_prefix=args.prefix))
print(f"{'IP':<15} " + " ".join([f"{name:<25}" for name in oids.keys()]))
print("-" * (15 + 26 * len(oids)))
all_results = []
with ThreadPoolExecutor(max_workers=args.workers) as executor:
futures = {
executor.submit(scan_subnet, subnet, args.community, oids): subnet
for subnet in subnets_to_scan
}
for future in as_completed(futures): for future in as_completed(futures):
result = future.result() subnet_results = future.result()
if result: for ip_str, values in subnet_results:
ip_str, values = result all_results.append((ip_str, values))
print(f"{ip_str:<15} {values['sysName']:<25} {values['sysDescr']:<40} {values['sysLocation']:<25} {values['sysContact']}") print(f"{ip_str:<15} " + " ".join([f"{values[name]:<25}" for name in oids.keys()]))
save_to_csv(all_results, oids, filename=args.file)
if __name__ == "__main__": if __name__ == "__main__":
main() main()