Add ssh_system_update.py

This commit is contained in:
gru
2025-08-22 15:04:00 +02:00
parent ad7f74ce28
commit 1e265fcec7

375
ssh_system_update.py Normal file
View File

@@ -0,0 +1,375 @@
#!/usr/bin/env python3
"""
SSH Updater
===========
Czyta listę hostów z pliku INI i przez SSH:
1) wykrywa dystrybucję Linuksa,
2) wykonuje update/upgrade odpowiednim menedżerem pakietów,
3) czytelnie pokazuje postęp,
4) sprząta po aktualizacji (np. apt-get clean),
5) opcjonalnie działa równolegle na wielu hostach (--parallel).
Wspierane: Debian/Ubuntu (apt), openSUSE (zypper), Arch Linux (pacman), RHEL/CentOS/Alma/Rocky (dnf/yum).
Przykładowy plik hosts.ini:
[global] ; domyślne ustawienia dla wszystkich hostów (opcjonalne)
user = root
port = 22
password = supersecret
keyfile = /home/me/.ssh/id_rsa
use_sudo = false
[server1]
host = 192.0.2.10
user = ubuntu
port = 22
keyfile = /home/me/.ssh/id_rsa
use_sudo = true
[server2]
host = example.org ; odziedziczy user/port/hasło z [global], chyba że nadpiszesz je poniżej
Uwaga: możesz pominąć "password" i skorzystać z kluczy SSH. Jeśli
"use_sudo = true" i konto nie ma NOPASSWD, podaj --ask-sudo przy uruchomieniu.
Uruchomienie:
python ssh_updater.py --hosts hosts.ini --ask-sudo
Równoległość (np. 8 jednocześnie):
python ssh_updater.py --hosts hosts.ini --parallel 8
Zasada dziedziczenia:
- wartości z sekcji hosta **nadpisują** globalne, ale tylko jeśli są **podane i niepuste**,
- pusta wartość (np. `port =`) oznacza „użyj globalnej”.
Wymagania:
pip install paramiko rich
"""
from __future__ import annotations
import argparse
import configparser
import getpass
import re
import sys
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional, Tuple
import paramiko
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
from rich.table import Table
console = Console()
SUPPORTED_IDS = {
"debian": "apt",
"ubuntu": "apt",
"opensuse": "zypper",
"opensuse-leap": "zypper",
"opensuse-tumbleweed": "zypper",
"sles": "zypper",
"arch": "pacman",
"rhel": "dnf",
"centos": "dnf",
"rocky": "dnf",
"almalinux": "dnf",
}
PKG_COMMANDS = {
"apt": [
"apt-get update",
"DEBIAN_FRONTEND=noninteractive apt-get -y upgrade",
"DEBIAN_FRONTEND=noninteractive apt-get -y autoremove",
"apt-get clean",
],
"zypper": [
"zypper --non-interactive refresh",
"zypper --non-interactive update",
"zypper clean --all",
],
"pacman": [
"pacman -Syu --noconfirm",
"pacman -Sc --noconfirm",
],
"dnf": [
"dnf -y update",
"dnf -y autoremove",
"dnf clean all",
],
}
@dataclass
class Host:
name: str
host: str
user: str
port: int = 22
keyfile: Optional[str] = None
password: Optional[str] = None
use_sudo: bool = True
def parse_hosts(path: Path) -> list[Host]:
cfg = configparser.ConfigParser()
with path.open() as f:
cfg.read_file(f)
def _get_default(key: str, fallback: Optional[str] = None) -> Optional[str]:
for sect in ("global", "ssh"):
if cfg.has_section(sect) and cfg.has_option(sect, key):
val = cfg.get(sect, key)
if val is not None and str(val).strip() != "":
return val
val = cfg.defaults().get(key, fallback)
if val is not None and str(val).strip() == "":
return fallback
return val
def _get_opt(section: str, key: str) -> Optional[str]:
if cfg.has_option(section, key):
val = cfg.get(section, key)
if val is not None and str(val).strip() != "":
return val
return None
global_user = _get_default("user", fallback="root")
try:
global_port = int(_get_default("port", fallback="22"))
except (TypeError, ValueError):
global_port = 22
global_keyfile = _get_default("keyfile")
global_password = _get_default("password")
global_use_sudo_str = _get_default("use_sudo", fallback="true") or "true"
global_use_sudo = str(global_use_sudo_str).strip().lower() in ("1", "true", "yes", "on")
hosts: list[Host] = []
for section in cfg.sections():
if section.lower() in {"ssh", "global", "defaults"}:
continue
host_addr = cfg.get(section, "host")
user = _get_opt(section, "user") or global_user
port_opt = _get_opt(section, "port")
if port_opt is not None:
try:
port = int(port_opt)
except ValueError:
port = global_port
else:
port = global_port
keyfile = _get_opt(section, "keyfile") or global_keyfile
password = _get_opt(section, "password") or global_password
use_sudo_opt = _get_opt(section, "use_sudo")
if use_sudo_opt is not None:
use_sudo = str(use_sudo_opt).strip().lower() in ("1", "true", "yes", "on")
else:
use_sudo = global_use_sudo
hosts.append(Host(section, host_addr, user, port, keyfile, password, use_sudo))
return hosts
def connect(h: Host) -> paramiko.SSHClient:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(
h.host,
port=h.port,
username=h.user,
key_filename=h.keyfile,
password=h.password,
look_for_keys=True,
allow_agent=True,
timeout=20,
)
return client
except Exception as e:
raise RuntimeError(f"[{h.name}] Błąd połączenia: {e}")
def run(client: paramiko.SSHClient, command: str, sudo_password: Optional[str] = None) -> Tuple[int, str, str]:
"""Uruchom zdalne polecenie. Jeśli w komendzie jest 'sudo', a podano hasło, wyślij je na stdin."""
stdin, stdout, stderr = client.exec_command(command, get_pty=True)
if sudo_password and re.search(r"\\bsudo\\b", command):
try:
stdin.write(sudo_password + "\n")
stdin.flush()
except Exception:
pass
exit_status = stdout.channel.recv_exit_status()
out = stdout.read().decode(errors="ignore")
err = stderr.read().decode(errors="ignore")
return exit_status, out, err
def has_sudo(client: paramiko.SSHClient) -> bool:
code, out, _ = run(client, "command -v sudo >/dev/null 2>&1; echo $?")
try:
return int(out.strip().splitlines()[-1]) == 0
except Exception:
return False
def detect_os(client: paramiko.SSHClient) -> Tuple[str, str]:
code, out, err = run(client, "cat /etc/os-release || uname -a")
if code != 0:
raise RuntimeError(f"Nie można odczytać /etc/os-release: {err}")
os_id = ""
pretty = ""
for line in out.splitlines():
if line.startswith("ID="):
os_id = line.split("=", 1)[1].strip().strip('"')
if line.startswith("PRETTY_NAME="):
pretty = line.split("=", 1)[1].strip().strip('"')
if not os_id:
for line in out.splitlines():
if line.startswith("NAME="):
os_id = line.split("=", 1)[1].strip().strip('"').lower()
if not pretty:
pretty = os_id
return os_id, pretty
def pkg_manager(os_id: str) -> Optional[str]:
os_id = os_id.lower()
if os_id in SUPPORTED_IDS:
return SUPPORTED_IDS[os_id]
if any(k in os_id for k in ["debian", "ubuntu"]):
return "apt"
if "suse" in os_id:
return "zypper"
if "arch" in os_id:
return "pacman"
return None
def upgrade_host(h: Host, ask_sudo_pw: Optional[str]) -> Tuple[str, str]:
"""Zwraca (status, message)."""
try:
client = connect(h)
except Exception as e:
return ("error", str(e))
try:
os_id, pretty = detect_os(client)
mgr = pkg_manager(os_id)
if not mgr:
return ("skip", f"[{h.name}] Nieobsługiwany system: {pretty} (ID: {os_id})")
sudo_available = has_sudo(client)
effective_use_sudo = h.use_sudo and h.user != "root" and sudo_available
if h.use_sudo and h.user != "root" and not sudo_available:
return ("error", f"[{h.name}] Brak polecenia sudo, a użytkownik {h.user} nie jest root. Ustaw use_sudo=false lub użyj konta root.")
sudo_pw = ask_sudo_pw if effective_use_sudo else None
steps = PKG_COMMANDS[mgr]
for step in steps:
cmd = f"sudo -S -p '' {step}" if effective_use_sudo else step
code, out, err = run(client, cmd, sudo_password=sudo_pw)
if code != 0:
return ("error", f"[{h.name}] Błąd przy komendzie: {cmd}\n{err or out}")
return ("ok", f"[{h.name}] Zaktualizowano: {pretty} ({mgr})")
finally:
client.close()
def main():
parser = argparse.ArgumentParser(description="SSH updater dla wielu hostów z pliku INI")
parser.add_argument("--hosts", required=True, help="Ścieżka do pliku INI z hostami")
parser.add_argument("--ask-pass", action="store_true", help="Zapytaj o hasło SSH (dla hostów bez klucza)")
parser.add_argument("--ask-sudo", action="store_true", help="Zapytaj o hasło do sudo (używane gdy user != root)")
parser.add_argument("--parallel", type=int, default=1, help="Liczba hostów aktualizowanych równolegle (domyślnie 1)")
args = parser.parse_args()
hosts = parse_hosts(Path(args.hosts))
if not hosts:
console.print("[red]Brak hostów w pliku.[/red]")
sys.exit(1)
ssh_pw: Optional[str] = None
sudo_pw: Optional[str] = None
if args.ask_pass:
ssh_pw = getpass.getpass("Hasło SSH: ")
for h in hosts:
if h.password is None:
h.password = ssh_pw
if args.ask_sudo:
sudo_pw = getpass.getpass("Hasło sudo: ")
# (inicjalizacja hostów i haseł została przeniesiona powyżej)
table = Table(title="Aktualizacja systemów przez SSH", show_lines=False)
table.add_column("Host", style="bold")
table.add_column("Adres")
table.add_column("Użytkownik")
table.add_column("Sudo")
for h in hosts:
table.add_row(h.name, h.host, h.user, "tak" if h.use_sudo else "nie")
console.print(table)
results: list[Tuple[str, str]] = []
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TimeElapsedColumn(),
transient=True,
) as progress:
task = progress.add_task("Praca…", total=len(hosts))
if args.parallel <= 1:
# sekwencyjnie
for h in hosts:
progress.update(task, description=f"[bold]{h.name}[/bold]: łączenie i aktualizacja…")
status, msg = upgrade_host(h, sudo_pw)
results.append((status, msg))
progress.advance(task)
else:
# równolegle
max_workers = max(1, args.parallel)
with ThreadPoolExecutor(max_workers=max_workers) as ex:
future_map = {ex.submit(upgrade_host, h, sudo_pw): h for h in hosts}
for fut in as_completed(future_map):
h = future_map[fut]
progress.update(task, description=f"[bold]{h.name}[/bold]: łączenie i aktualizacja…")
try:
status, msg = fut.result()
except Exception as e:
status, msg = ("error", f"[{h.name}] Wyjątek: {e}")
results.append((status, msg))
progress.advance(task)
# Podsumowanie
ok = sum(1 for s, _ in results if s == "ok")
skip = sum(1 for s, _ in results if s == "skip")
err = [m for s, m in results if s == "error"]
summary = Table(title="Podsumowanie", show_lines=False)
summary.add_column("OK")
summary.add_column("Pominięte")
summary.add_column("Błędy")
summary.add_row(str(ok), str(skip), str(len(err)))
console.print(summary)
if err:
console.print("[red]Błędy:[/red]")
for m in err:
console.print(f"{m}")
sys.exit(0 if not err else 2)
if __name__ == "__main__":
main()