#!/usr/bin/env python3
"""
SSH Updater
===========
Reads a list of hosts from an INI file and, over SSH:
1) detects the Linux distribution,
2) runs update/upgrade with the matching package manager,
3) shows progress in a readable way,
4) cleans up after the update (e.g. apt-get clean),
5) optionally works on many hosts in parallel (--parallel).

Supported: Debian/Ubuntu (apt), openSUSE (zypper), Arch Linux (pacman),
RHEL/CentOS/Alma/Rocky (dnf, falling back to yum when dnf is not installed).

Example hosts.ini:

    [global]
    ; default settings for all hosts (optional)
    user = root
    port = 22
    password = supersecret
    keyfile = /home/me/.ssh/id_rsa
    use_sudo = false

    [server1]
    host = 192.0.2.10
    user = ubuntu
    port = 22
    keyfile = /home/me/.ssh/id_rsa
    use_sudo = true

    [server2]
    host = example.org
    ; inherits user/port/password from [global] unless overridden below

Note: you may omit "password" and rely on SSH keys.
If "use_sudo = true" and the account has no NOPASSWD rule, pass --ask-sudo
on the command line.

Usage:
    python ssh_updater.py --hosts hosts.ini --ask-sudo

Parallel run (e.g. 8 hosts at once):
    python ssh_updater.py --hosts hosts.ini --parallel 8

Inheritance rule:
- values from a host section **override** the global ones, but only when they
  are **present and non-empty**,
- an empty value (e.g. `port =`) means "use the global value".

Requirements:
    pip install paramiko rich
"""

from __future__ import annotations

import argparse
import configparser
import getpass
import re
import sys
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional, Tuple

import paramiko
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
from rich.table import Table

console = Console()

# Maps the ID field of /etc/os-release to the package manager family to use.
SUPPORTED_IDS = {
    "debian": "apt",
    "ubuntu": "apt",
    "opensuse": "zypper",
    "opensuse-leap": "zypper",
    "opensuse-tumbleweed": "zypper",
    "sles": "zypper",
    "arch": "pacman",
    "rhel": "dnf",
    "centos": "dnf",
    "rocky": "dnf",
    "almalinux": "dnf",
}

# Ordered update + clean-up steps for every package manager.
PKG_COMMANDS = {
    "apt": [
        "apt-get update",
        "DEBIAN_FRONTEND=noninteractive apt-get -y upgrade",
        "DEBIAN_FRONTEND=noninteractive apt-get -y autoremove",
        "apt-get clean",
    ],
    "zypper": [
        "zypper --non-interactive refresh",
        "zypper --non-interactive update",
        "zypper clean --all",
    ],
    "pacman": [
        "pacman -Syu --noconfirm",
        "pacman -Sc --noconfirm",
    ],
    "dnf": [
        "dnf -y update",
        "dnf -y autoremove",
        "dnf clean all",
    ],
    # Used instead of "dnf" on RHEL-family hosts that do not ship dnf.
    "yum": [
        "yum -y update",
        "yum -y autoremove",
        "yum clean all",
    ],
}

# Strings accepted as "true" for boolean INI options (compared lower-cased).
_TRUTHY = ("1", "true", "yes", "on")

# Matches the word "sudo" anywhere in a command line.
_SUDO_RE = re.compile(r"\bsudo\b")


@dataclass
class Host:
    """Connection settings for one host section of the INI file."""

    name: str  # INI section name, used as the label in messages
    host: str  # address passed to the SSH client
    user: str
    port: int = 22
    keyfile: Optional[str] = None
    password: Optional[str] = None
    use_sudo: bool = True


def _as_bool(value: object) -> bool:
    """Return True when *value*, stripped and lower-cased, is a truthy word."""
    return str(value).strip().lower() in _TRUTHY


def parse_hosts(path: Path) -> list[Host]:
    """Parse the INI file at *path* into a list of Host entries.

    Sections named "ssh", "global" or "defaults" (case-insensitive) are not
    hosts. Defaults are read from the "global" section, then "ssh", then the
    INI DEFAULT section. A host option overrides the default only when it is
    present and non-blank.

    Raises:
        OSError: if the file cannot be opened.
        configparser.Error: on malformed INI, or when a host section has no
            "host" option.
    """
    cfg = configparser.ConfigParser()
    with path.open() as f:
        cfg.read_file(f)

    def _get_default(key: str, fallback: Optional[str] = None) -> Optional[str]:
        # The first non-blank value found in [global] or [ssh] wins.
        for sect in ("global", "ssh"):
            if cfg.has_section(sect) and cfg.has_option(sect, key):
                val = cfg.get(sect, key)
                if val is not None and str(val).strip() != "":
                    return val
        val = cfg.defaults().get(key, fallback)
        if val is not None and str(val).strip() == "":
            return fallback
        return val

    def _get_opt(section: str, key: str) -> Optional[str]:
        # A blank value counts as "not set" so the global value applies.
        if cfg.has_option(section, key):
            val = cfg.get(section, key)
            if val is not None and str(val).strip() != "":
                return val
        return None

    global_user = _get_default("user", fallback="root")
    try:
        global_port = int(_get_default("port", fallback="22"))
    except (TypeError, ValueError):
        global_port = 22
    global_keyfile = _get_default("keyfile")
    global_password = _get_default("password")
    global_use_sudo_str = _get_default("use_sudo", fallback="true") or "true"
    global_use_sudo = _as_bool(global_use_sudo_str)

    hosts: list[Host] = []
    for section in cfg.sections():
        if section.lower() in {"ssh", "global", "defaults"}:
            continue

        host_addr = cfg.get(section, "host")
        user = _get_opt(section, "user") or global_user

        port = global_port
        port_opt = _get_opt(section, "port")
        if port_opt is not None:
            try:
                port = int(port_opt)
            except ValueError:
                # An unparsable per-host port falls back to the global one.
                port = global_port

        keyfile = _get_opt(section, "keyfile") or global_keyfile
        password = _get_opt(section, "password") or global_password

        use_sudo_opt = _get_opt(section, "use_sudo")
        use_sudo = _as_bool(use_sudo_opt) if use_sudo_opt is not None else global_use_sudo

        hosts.append(Host(section, host_addr, user, port, keyfile, password, use_sudo))
    return hosts


def connect(h: Host) -> paramiko.SSHClient:
    """Open an SSH connection to *h* and return the connected client.

    Raises:
        RuntimeError: when the connection cannot be established; the original
            exception is attached as the cause.
    """
    client = paramiko.SSHClient()
    # NOTE(security): AutoAddPolicy accepts unknown host keys without
    # verification, which leaves the first connection open to
    # man-in-the-middle attacks. Kept for backward compatibility.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(
            h.host,
            port=h.port,
            username=h.user,
            key_filename=h.keyfile,
            password=h.password,
            look_for_keys=True,
            allow_agent=True,
            timeout=20,
        )
    except Exception as e:
        # Release the half-initialised client before reporting the failure.
        client.close()
        raise RuntimeError(f"[{h.name}] Błąd połączenia: {e}") from e
    return client


def run(client: paramiko.SSHClient, command: str, sudo_password: Optional[str] = None) -> Tuple[int, str, str]:
    """Run a remote command and return (exit_status, stdout, stderr).

    The command is executed with a PTY. When *command* contains the word
    "sudo" and *sudo_password* is given, the password is written to the
    command's stdin. Any occurrence of the password in the captured output is
    masked, because the PTY may echo what was typed.
    """
    stdin, stdout, stderr = client.exec_command(command, get_pty=True)

    password_sent = False
    if sudo_password and _SUDO_RE.search(command):
        password_sent = True
        try:
            stdin.write(sudo_password + "\n")
            stdin.flush()
        except (OSError, EOFError):
            # Best effort: the channel may already be closed (e.g. the command
            # failed immediately); the exit status below reports the outcome.
            pass

    # Drain the output BEFORE asking for the exit status: waiting for the
    # status first can hang when the command produces more output than the
    # channel window holds.
    out = stdout.read().decode(errors="ignore")
    err = stderr.read().decode(errors="ignore")
    exit_status = stdout.channel.recv_exit_status()

    if password_sent:
        out = out.replace(sudo_password, "***")
        err = err.replace(sudo_password, "***")
    return exit_status, out, err


def _has_command(client: paramiko.SSHClient, name: str) -> bool:
    """Return True when `command -v <name>` succeeds on the remote host."""
    _, out, _ = run(client, f"command -v {name} >/dev/null 2>&1; echo $?")
    try:
        # The last output line is the exit code printed by `echo $?`.
        return int(out.strip().splitlines()[-1]) == 0
    except (IndexError, ValueError):
        return False


def has_sudo(client: paramiko.SSHClient) -> bool:
    """Return True when the sudo command is available on the remote host."""
    return _has_command(client, "sudo")


def detect_os(client: paramiko.SSHClient) -> Tuple[str, str]:
    """Return (os_id, pretty_name) read from the remote /etc/os-release.

    os_id comes from the ID= line, or from the lower-cased NAME= line when
    ID= is absent; it is empty when neither is found. pretty_name falls back
    to os_id when PRETTY_NAME= is missing.

    Raises:
        RuntimeError: when the remote command exits with a non-zero status.
    """
    code, out, err = run(client, "cat /etc/os-release || uname -a")
    if code != 0:
        raise RuntimeError(f"Nie można odczytać /etc/os-release: {err}")

    def _value(line: str) -> str:
        # os-release values may be wrapped in double or single quotes.
        return line.split("=", 1)[1].strip().strip("\"'")

    lines = out.splitlines()
    os_id = ""
    pretty = ""
    for line in lines:
        if line.startswith("ID="):
            os_id = _value(line)
        if line.startswith("PRETTY_NAME="):
            pretty = _value(line)
    if not os_id:
        for line in lines:
            if line.startswith("NAME="):
                os_id = _value(line).lower()
    if not pretty:
        pretty = os_id
    return os_id, pretty


def pkg_manager(os_id: str) -> Optional[str]:
    """Return the package manager key for *os_id*, or None when unsupported.

    Exact matches against SUPPORTED_IDS are tried first, then substring
    matches for Debian/Ubuntu, SUSE and Arch derivatives.
    """
    os_id = os_id.lower()
    if os_id in SUPPORTED_IDS:
        return SUPPORTED_IDS[os_id]
    if any(k in os_id for k in ("debian", "ubuntu")):
        return "apt"
    if "suse" in os_id:
        return "zypper"
    if "arch" in os_id:
        return "pacman"
    return None


def upgrade_host(h: Host, ask_sudo_pw: Optional[str]) -> Tuple[str, str]:
    """Update one host and return (status, message).

    status is "ok", "skip" (unsupported system) or "error". Failures are
    reported through the return value rather than raised, so a failing host
    does not stop the remaining hosts in either sequential or parallel mode.
    """
    try:
        client = connect(h)
    except Exception as e:
        return ("error", str(e))

    try:
        os_id, pretty = detect_os(client)
        mgr = pkg_manager(os_id)
        if not mgr:
            return ("skip", f"[{h.name}] Nieobsługiwany system: {pretty} (ID: {os_id})")

        # RHEL-family hosts without dnf are updated with yum instead.
        if mgr == "dnf" and not _has_command(client, "dnf"):
            mgr = "yum"

        sudo_available = has_sudo(client)
        wants_sudo = h.use_sudo and h.user != "root"
        if wants_sudo and not sudo_available:
            return ("error", f"[{h.name}] Brak polecenia sudo, a użytkownik {h.user} nie jest root. Ustaw use_sudo=false lub użyj konta root.")
        effective_use_sudo = wants_sudo and sudo_available

        sudo_pw = ask_sudo_pw if effective_use_sudo else None
        # With a password: read it from stdin (-S) with an empty prompt.
        # Without one: -n makes sudo fail at once instead of blocking forever
        # on a password prompt nobody will answer.
        sudo_prefix = "sudo -S -p ''" if sudo_pw else "sudo -n"

        for step in PKG_COMMANDS[mgr]:
            cmd = f"{sudo_prefix} {step}" if effective_use_sudo else step
            code, out, err = run(client, cmd, sudo_password=sudo_pw)
            if code != 0:
                return ("error", f"[{h.name}] Błąd przy komendzie: {cmd}\n{err or out}")

        return ("ok", f"[{h.name}] Zaktualizowano: {pretty} ({mgr})")
    except Exception as e:
        # Per-host boundary: e.g. a connection dropped mid-update must be
        # reported for this host only.
        return ("error", f"[{h.name}] Wyjątek: {e}")
    finally:
        client.close()


def main():
    """Command-line entry point: parse arguments, update hosts, print a summary.

    Exits with status 1 when the INI file defines no hosts, 2 when at least
    one host failed, and 0 otherwise.
    """
    parser = argparse.ArgumentParser(description="SSH updater dla wielu hostów z pliku INI")
    parser.add_argument("--hosts", required=True, help="Ścieżka do pliku INI z hostami")
    parser.add_argument("--ask-pass", action="store_true", help="Zapytaj o hasło SSH (dla hostów bez klucza)")
    parser.add_argument("--ask-sudo", action="store_true", help="Zapytaj o hasło do sudo (używane gdy user != root)")
    parser.add_argument("--parallel", type=int, default=1, help="Liczba hostów aktualizowanych równolegle (domyślnie 1)")
    args = parser.parse_args()

    hosts = parse_hosts(Path(args.hosts))
    if not hosts:
        console.print("[red]Brak hostów w pliku.[/red]")
        sys.exit(1)

    ssh_pw: Optional[str] = None
    sudo_pw: Optional[str] = None
    if args.ask_pass:
        ssh_pw = getpass.getpass("Hasło SSH: ")
        # The prompted password only fills in hosts that have none configured.
        for h in hosts:
            if h.password is None:
                h.password = ssh_pw
    if args.ask_sudo:
        sudo_pw = getpass.getpass("Hasło sudo: ")

    table = Table(title="Aktualizacja systemów przez SSH", show_lines=False)
    table.add_column("Host", style="bold")
    table.add_column("Adres")
    table.add_column("Użytkownik")
    table.add_column("Sudo")
    for h in hosts:
        table.add_row(h.name, h.host, h.user, "tak" if h.use_sudo else "nie")
    console.print(table)

    results: list[Tuple[str, str]] = []

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TimeElapsedColumn(),
        transient=True,
    ) as progress:
        task = progress.add_task("Praca…", total=len(hosts))

        if args.parallel <= 1:
            # Sequential mode.
            for h in hosts:
                progress.update(task, description=f"[bold]{h.name}[/bold]: łączenie i aktualizacja…")
                status, msg = upgrade_host(h, sudo_pw)
                results.append((status, msg))
                progress.advance(task)
        else:
            # Parallel mode.
            max_workers = max(1, args.parallel)
            with ThreadPoolExecutor(max_workers=max_workers) as ex:
                future_map = {ex.submit(upgrade_host, h, sudo_pw): h for h in hosts}
                for fut in as_completed(future_map):
                    h = future_map[fut]
                    progress.update(task, description=f"[bold]{h.name}[/bold]: łączenie i aktualizacja…")
                    try:
                        status, msg = fut.result()
                    except Exception as e:
                        status, msg = ("error", f"[{h.name}] Wyjątek: {e}")
                    results.append((status, msg))
                    progress.advance(task)

    # Summary.
    ok = sum(1 for s, _ in results if s == "ok")
    skip = sum(1 for s, _ in results if s == "skip")
    err = [m for s, m in results if s == "error"]

    summary = Table(title="Podsumowanie", show_lines=False)
    summary.add_column("OK")
    summary.add_column("Pominięte")
    summary.add_column("Błędy")
    summary.add_row(str(ok), str(skip), str(len(err)))
    console.print(summary)

    if err:
        console.print("[red]Błędy:[/red]")
        for m in err:
            # Messages contain "[host]" prefixes and raw remote output, so
            # they must be printed verbatim rather than parsed as rich markup.
            console.print(f"• {m}", markup=False)

    sys.exit(0 if not err else 2)


if __name__ == "__main__":
    main()