Files
skrypty_narzedzia/node_exporter_manager.py
2025-06-08 14:49:09 +02:00

474 lines
16 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import sys
import subprocess
import requests
import tarfile
import shutil
import logging
import pwd
from pathlib import Path
import bcrypt
import hashlib
import platform
import distro
# Stałe ścieżki i konfiguracja
BIN_TARGET = '/usr/local/bin/node_exporter'
SERVICE_FILE = '/etc/systemd/system/node_exporter.service'
LOG_FILE = '/var/log/node_exporter_installer.log'
USER_NAME = 'node_exporter'
USER_HOME = '/var/lib/node_exporter'
CONFIG_DIR = '/etc/node_exporter'
CONFIG_PATH = os.path.join(CONFIG_DIR, 'config.yml')
# Konfiguracja logowania
logging.basicConfig(
filename=LOG_FILE,
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
# Rozpoznanie architektury
ARCH_MAP = {
'x86_64': 'amd64',
'aarch64': 'arm64',
'armv7l': 'armv7',
'armv6l': 'armv6',
'armv5l': 'armv5',
'i686': '386',
'i386': '386'
}
DRY_RUN = '--dry-run' in sys.argv
# ----------------- FUNKCJE POMOCNICZE -----------------
def ensure_root():
if os.geteuid() != 0:
print("Ten skrypt musi być uruchomiony jako root.")
sys.exit(1)
def run_cmd(cmd, check=True):
if DRY_RUN:
print(f"[dry-run] {' '.join(cmd)}")
return ""
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=check, text=True)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
logging.error(f"Błąd komendy: {cmd}{e.stderr}")
raise
def get_latest_version():
try:
r = requests.get('https://api.github.com/repos/prometheus/node_exporter/releases/latest', timeout=10)
r.raise_for_status()
return r.json()['tag_name'].lstrip('v'), r.json()
except requests.RequestException as e:
logging.error(f"Błąd pobierania wersji z GitHub: {e}")
raise
def get_local_version():
if Path(BIN_TARGET).exists():
try:
output = run_cmd([BIN_TARGET, '--version'])
for line in output.splitlines():
if "version" in line:
parts = line.split()
if len(parts) >= 3:
return parts[2]
except Exception as e:
logging.warning(f"Nie udało się odczytać wersji lokalnej: {e}")
return None
def download_and_extract(url, download_path='/tmp'):
filename = os.path.join(download_path, url.split('/')[-1])
extract_dirname = filename.replace('.tar.gz', '')
extract_path = Path(extract_dirname)
try:
with requests.get(url, stream=True, timeout=30) as r:
r.raise_for_status()
with open(filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
except requests.RequestException as e:
logging.error(f"Błąd pobierania pliku: {e}")
raise
with tarfile.open(filename) as tar:
if sys.version_info >= (3, 12):
tar.extractall(path=download_path, filter='data')
else:
tar.extractall(path=download_path)
if not extract_path.is_dir():
raise Exception(f"Nie znaleziono rozpakowanego katalogu: {extract_path}")
return extract_path
def verify_checksum(filepath, sha256sum):
h = hashlib.sha256()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b''):
h.update(chunk)
return h.hexdigest() == sha256sum
def get_sha256_from_release(release, filename):
for asset in release['assets']:
if asset['name'] == 'sha256sums.txt':
sha_url = asset['browser_download_url']
try:
r = requests.get(sha_url, timeout=10)
r.raise_for_status()
for line in r.text.strip().splitlines():
if filename in line:
return line.split()[0]
except requests.RequestException:
return None
return None
def check_node_exporter():
try:
subprocess.run(['curl', '-f', 'http://localhost:9100/metrics'], check=True, stdout=subprocess.DEVNULL)
print("✅ Node Exporter działa poprawnie")
except subprocess.CalledProcessError:
print("❌ Node Exporter nie odpowiada — restart...")
run_cmd(['systemctl', 'restart', 'node_exporter'])
def detect_os_family():
try:
os_id = distro.id().lower()
if os_id in ['ubuntu', 'debian']:
return 'debian'
elif os_id in ['arch', 'manjaro']:
return 'arch'
elif os_id in ['opensuse', 'suse']:
return 'suse'
elif os_id in ['centos', 'rhel', 'fedora']:
return 'rhel'
else:
return 'unknown'
except Exception:
return 'unknown'
# ----------------- INSTALACJA I KONFIGURACJA -----------------
def install_binary(extracted_dir):
src = Path(extracted_dir) / 'node_exporter'
if Path(BIN_TARGET).exists():
shutil.copy(BIN_TARGET, BIN_TARGET + '.bak')
logging.info("Backup binarki zrobiony jako node_exporter.bak")
shutil.copy(src, BIN_TARGET)
os.chmod(BIN_TARGET, 0o755)
logging.info("Zainstalowano binarkę do /usr/local/bin")
def create_user():
try:
pwd.getpwnam(USER_NAME)
logging.info("Użytkownik node_exporter już istnieje.")
except KeyError:
run_cmd(['useradd', '--system', '--home', USER_HOME, '--shell', '/bin/false', USER_NAME])
logging.info("Utworzono użytkownika node_exporter")
home_path = Path(USER_HOME)
home_path.mkdir(parents=True, exist_ok=True)
shutil.chown(home_path, user=USER_NAME, group=USER_NAME)
logging.info(f"Utworzono katalog {USER_HOME} i przypisano właściciela.")
def setup_service():
service_content = f"""[Unit]
Description=Node Exporter
Wants=network-online.target
After=network-online.target
[Service]
User={USER_NAME}
Group={USER_NAME}
WorkingDirectory={USER_HOME}
ExecStart={BIN_TARGET}
Restart=on-failure
[Install]
WantedBy=default.target
"""
with open(SERVICE_FILE, 'w') as f:
f.write(service_content)
logging.info("Zapisano konfigurację usługi systemd")
run_cmd(['systemctl', 'daemon-reload'])
run_cmd(['systemctl', 'enable', '--now', 'node_exporter'])
logging.info("Włączono i uruchomiono usługę node_exporter")
def setup_secured_config():
os.makedirs(CONFIG_DIR, exist_ok=True)
subprocess.run([
"openssl", "req", "-new", "-newkey", "rsa:4096", "-days", "3650", "-nodes", "-x509",
"-subj", "/C=PL/ST=X/L=X/O=secure/CN=localhost",
"-keyout", f"{CONFIG_DIR}/node_exporter.key",
"-out", f"{CONFIG_DIR}/node_exporter.crt"
], check=True)
config = """basic_auth_users:
root: $2y$10$SNr5iyJMvqiecOx6tXgDTuBpxyd40Byp2j.iBM5lR/oQnlpi8nAje
tls_server_config:
cert_file: node_exporter.crt
key_file: node_exporter.key
"""
with open(CONFIG_PATH, "w") as f:
f.write(config)
shutil.chown(CONFIG_DIR, user=USER_NAME, group=USER_NAME)
for f_name in os.listdir(CONFIG_DIR):
shutil.chown(os.path.join(CONFIG_DIR, f_name), user=USER_NAME, group=USER_NAME)
def change_password(user, password):
if not os.path.exists(CONFIG_PATH):
print("Brak pliku config.yml najpierw użyj --install-secured")
return
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
with open(CONFIG_PATH, 'r') as f:
lines = f.readlines()
new_lines = []
in_auth = False
user_updated = False
for line in lines:
if line.strip().startswith("basic_auth_users:"):
in_auth = True
new_lines.append(line)
continue
if in_auth and line.startswith(" ") and ":" in line:
existing_user = line.split(":")[0].strip()
if existing_user == user:
new_lines.append(f" {user}: {hashed}\n")
user_updated = True
else:
new_lines.append(line)
elif in_auth and not line.startswith(" "):
if not user_updated:
new_lines.append(f" {user}: {hashed}\n")
user_updated = True
in_auth = False
new_lines.append(line)
else:
new_lines.append(line)
if in_auth and not user_updated:
new_lines.append(f" {user}: {hashed}\n")
with open(CONFIG_PATH, "w") as f:
f.writelines(new_lines)
print(f"Zmieniono hasło dla użytkownika '{user}'")
def uninstall():
if Path(SERVICE_FILE).exists():
run_cmd(['systemctl', 'disable', '--now', 'node_exporter'])
os.remove(SERVICE_FILE)
logging.info("Usunięto plik usługi i zatrzymano node_exporter")
if Path(BIN_TARGET).exists():
os.remove(BIN_TARGET)
logging.info("Usunięto binarkę node_exporter")
try:
pwd.getpwnam(USER_NAME)
run_cmd(['userdel', USER_NAME])
logging.info("Usunięto użytkownika node_exporter")
except KeyError:
logging.info("Użytkownik już nie istnieje")
if Path(USER_HOME).exists():
shutil.rmtree(USER_HOME)
logging.info("Usunięto katalog /var/lib/node_exporter")
def install():
ensure_root()
if Path(BIN_TARGET).exists():
print("Node Exporter już zainstalowany. Użyj --update.")
return
version, release = get_latest_version()
url = next(asset['browser_download_url'] for asset in release['assets'] if 'linux-amd64.tar.gz' in asset['browser_download_url'])
extracted = download_and_extract(url)
# Weryfikacja sumy SHA256
filename = url.split('/')[-1]
sha256_expected = get_sha256_from_release(release, filename)
local_path = os.path.join('/tmp', filename)
if sha256_expected and not verify_checksum(local_path, sha256_expected):
print("❌ Weryfikacja SHA256 nie powiodła się.")
sys.exit(1)
install_binary(extracted)
create_user()
setup_service()
logging.info("Instalacja zakończona")
print("✅ Node Exporter został zainstalowany")
def update():
ensure_root()
local_version = get_local_version()
latest_version, release = get_latest_version()
# Sprawdź czy mamy wymusić aktualizację (--force lub --force-update)
force_update = '--force' in sys.argv or '--force-update' in sys.argv
if not force_update and local_version == latest_version:
print(f"Node Exporter już aktualny ({local_version})")
print("Użyj --update --force aby wymusić aktualizację")
return
print(f"Aktualizacja z {local_version} do {latest_version}...")
run_cmd(['systemctl', 'stop', 'node_exporter'])
# Pobierz odpowiedni plik dla architektury
machine = platform.machine().lower()
arch = ARCH_MAP.get(machine, 'amd64') # Domyślnie amd64 jeśli architektura nieznana
url = next(
asset['browser_download_url'] for asset in release['assets']
if f'linux-{arch}.tar.gz' in asset['browser_download_url']
)
extracted = download_and_extract(url)
# Weryfikacja sumy SHA256
filename = url.split('/')[-1]
sha256_expected = get_sha256_from_release(release, filename)
local_path = os.path.join('/tmp', filename)
if sha256_expected and not verify_checksum(local_path, sha256_expected):
print("❌ Weryfikacja SHA256 nie powiodła się.")
sys.exit(1)
install_binary(extracted)
run_cmd(['systemctl', 'start', 'node_exporter'])
check_node_exporter()
logging.info(f"Zaktualizowano Node Exporter do wersji {latest_version}")
print(f"✅ Node Exporter został zaktualizowany do wersji {latest_version}")
def setup():
source_path = Path(__file__).resolve()
target_path = Path('/usr/local/bin/node_exporter_manager.py')
if source_path == target_path:
print(" Skrypt już działa z docelowej lokalizacji.")
elif not target_path.exists():
shutil.copy(source_path, target_path)
target_path.chmod(0o755)
logging.info(f"Zainstalowano skrypt jako {target_path}")
print(f"✅ Skrypt zainstalowany w {target_path}")
else:
print(f" Skrypt już zainstalowany w {target_path}")
cron_line = f"15 3 * * * {target_path} --update >> /var/log/node_exporter_cron.log 2>&1"
try:
cron_result = run_cmd(['crontab', '-l'], check=False)
except Exception:
cron_result = ""
if cron_line not in cron_result:
with open('/tmp/node_exporter_cron', 'w') as f:
f.write(cron_result.strip() + '\n' + cron_line + '\n')
run_cmd(['crontab', '/tmp/node_exporter_cron'])
os.remove('/tmp/node_exporter_cron')
logging.info("Dodano wpis do crontaba")
print("✅ Zadanie cron dodane")
else:
print(" Wpis cron już istnieje.")
logrotate_path = '/etc/logrotate.d/node_exporter_manager'
logrotate_config = f"""{LOG_FILE} /var/log/node_exporter_cron.log {{
weekly
rotate 4
compress
missingok
notifempty
create 644 root root
}}
"""
with open(logrotate_path, 'w') as f:
f.write(logrotate_config)
logging.info("Skonfigurowano logrotate")
print(f"✅ Logrotate dodany w {logrotate_path}")
def print_status():
print("📦 Status Node Exporter")
version = get_local_version()
if version:
print(f"✔️ Zainstalowana wersja: {version}")
else:
print("❌ Node Exporter nie jest zainstalowany")
service_status = subprocess.run(['systemctl', 'is-active', 'node_exporter'], stdout=subprocess.PIPE)
if service_status.returncode == 0:
print("✔️ Usługa node_exporter działa")
else:
print("❌ Usługa node_exporter nie działa")
if Path(CONFIG_PATH).exists():
print(f"✔️ Znaleziono config.yml: {CONFIG_PATH}")
else:
print(" Brak pliku config.yml")
# ----------------- GŁÓWNY BLOK -----------------
def main():
if os.geteuid() != 0:
print("Ten skrypt musi być uruchomiony jako root.")
sys.exit(1)
if len(sys.argv) < 2:
print("""Użycie:
node_exporter_manager.py --install # Instaluje node_exporter i uruchamia usługę
node_exporter_manager.py --update # Aktualizuje node_exporter do najnowszej wersji (jeśli potrzeba)
node_exporter_manager.py --update --force # Wymusza aktualizację nawet jeśli wersja jest taka sama
node_exporter_manager.py --uninstall # Usuwa node_exporter, usługę, użytkownika
node_exporter_manager.py --setup # Instaluje ten skrypt do /usr/local/bin, dodaje CRON i logrotate
node_exporter_manager.py --setup --force # Wymusza nadpisanie istniejącego skryptu
node_exporter_manager.py --install-secured # Instalacja z TLS + basic auth (certyfikat + config.yml)
node_exporter_manager.py --set-password=user:haslo # Zmiana hasła w config.yml
node_exporter_manager.py --dry-run # Symuluje działania bez wprowadzania zmian
node_exporter_manager.py --status # Pokazuje status node_exportera
""")
sys.exit(1)
try:
if sys.argv[1] == '--install':
install()
elif sys.argv[1] == '--update':
update()
elif sys.argv[1] == '--force-update':
sys.argv.insert(1, '--update')
sys.argv.insert(2, '--force')
update()
elif sys.argv[1] == '--uninstall':
uninstall()
elif sys.argv[1] == '--setup':
setup()
elif sys.argv[1] == '--install-secured':
install()
setup_secured_config()
elif sys.argv[1].startswith('--set-password='):
user_pass = sys.argv[1].split('=')[1]
if ":" not in user_pass:
print("Użyj formatu --set-password=user:haslo")
sys.exit(1)
u, p = user_pass.split(":", 1)
change_password(u, p)
elif sys.argv[1] == '--status':
print_status()
else:
print("Nieznany argument. Użyj --help")
sys.exit(1)
except Exception as e:
logging.error(f"Błąd krytyczny: {e}")
print(f"Wystąpił błąd: {e}")
sys.exit(1)
if __name__ == '__main__':
main()