#!/usr/bin/env python3
# app.py
"""Flask web application that tracks RouterOS devices and their updates.

Users register, add devices (reached through ``librouteros``), and the
application checks every stored device for available package updates on a
60-second background schedule. Results are stored per device and in a log
table; optional Pushover and e-mail notifications are sent when an update
is available.
"""
import atexit
import os
import smtplib
import threading
import time
from contextlib import contextmanager
from datetime import datetime
from datetime import timedelta
from email.mime.text import MIMEText

import librouteros
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash

# Application configuration
app = Flask(__name__)
# The session-signing key can be supplied through the environment; the
# original hard-coded value stays as the fallback so existing deployments
# keep working. Set SECRET_KEY to a random value in production.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'twoj-sekret-klucz')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

# Flask-Login configuration
login_manager = LoginManager(app)
login_manager.login_view = 'login'

# Timeouts (seconds) for outbound notification calls so that a stalled
# remote service cannot block the scheduler thread indefinitely.
PUSHOVER_TIMEOUT = 15
SMTP_TIMEOUT = 30


# DATABASE MODELS
class User(UserMixin, db.Model):
    """Application account; owns devices and one optional Settings row."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # NOTE(review): 128 characters may be too short for some werkzeug hash
    # formats on databases that enforce VARCHAR length - confirm before
    # moving off SQLite.
    password_hash = db.Column(db.String(128), nullable=False)
    devices = db.relationship('Device', backref='owner', lazy=True)
    settings = db.relationship('Settings', uselist=False, backref='user')

    def set_password(self, password):
        """Store a salted hash of ``password`` instead of the plain text."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)


class Device(db.Model):
    """A router reachable through the RouterOS API, owned by one user."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(120))  # Device name (optional)
    ip = db.Column(db.String(120), nullable=False)
    port = db.Column(db.Integer, default=8728)
    device_username = db.Column(db.String(120), nullable=False)
    # NOTE(review): the device password is stored as plain text because it
    # has to be replayed to the router; consider encrypting it at rest.
    device_password = db.Column(db.String(120), nullable=False)
    branch = db.Column(db.String(20), default="stable")  # Selected update branch
    update_required = db.Column(db.Boolean, default=False)  # True when an update is available
    last_check = db.Column(db.DateTime)
    last_log = db.Column(db.Text)
    current_version = db.Column(db.String(50))  # Current system version
    current_firmware = db.Column(db.String(50))  # Current firmware version
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)


class Settings(db.Model):
    """Per-user notification and housekeeping settings (one row per user)."""

    id = db.Column(db.Integer, primary_key=True)
    pushover_user_key = db.Column(db.String(255))
    pushover_token = db.Column(db.String(255))
    pushover_enabled = db.Column(db.Boolean, default=False)
    smtp_server = db.Column(db.String(255))
    smtp_port = db.Column(db.Integer)
    smtp_username = db.Column(db.String(255))
    smtp_password = db.Column(db.String(255))
    email_notifications_enabled = db.Column(db.Boolean, default=False)
    check_interval = db.Column(db.Integer, default=60)  # check interval in seconds
    log_retention_days = db.Column(db.Integer, default=30)  # log retention in days
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False, unique=True)


class Log(db.Model):
    """One stored result of an update check."""

    id = db.Column(db.Integer, primary_key=True)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    message = db.Column(db.Text)
    device_id = db.Column(db.Integer, db.ForeignKey('device.id'))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))


# Database initialisation (creates the tables on first run)
with app.app_context():
    db.create_all()


@login_manager.user_loader
def load_user(user_id):
    """Flask-Login callback: load the user for the id stored in the session."""
    return User.query.get(int(user_id))


# INTERNAL HELPERS
@contextmanager
def _router_api(device):
    """Open a librouteros connection to ``device`` and always close it.

    Connection errors propagate to the caller. Errors raised while closing
    are only printed, so they can neither mask an exception from the body
    nor turn a successful operation into a failure.
    """
    api = librouteros.connect(
        host=device.ip,
        username=device.device_username,
        password=device.device_password,
        port=device.port,
        timeout=15,
    )
    try:
        yield api
    finally:
        try:
            api.close()
        except Exception as e:
            print("Błąd przy zamykaniu połączenia z urządzeniem:", e)


def _parse_port(raw, default=8728):
    """Convert a submitted port value to an int.

    Returns ``default`` when ``raw`` is missing or blank. Raises ValueError
    when the value is not an integer or is outside 1-65535.
    """
    if raw is None or str(raw).strip() == '':
        return default
    port = int(raw)
    if not 1 <= port <= 65535:
        raise ValueError(f"port out of range: {port}")
    return port


def _int_or_default(raw, default):
    """Return ``int(raw)``, or ``default`` when ``raw`` is empty or missing.

    Raises ValueError when ``raw`` is non-empty but not an integer.
    """
    return int(raw) if raw else default


def _apply_check_result(device, check_result):
    """Copy the tuple returned by ``check_device_update`` onto ``device``.

    Only sets attributes; the caller is responsible for committing.
    """
    result, update_available, current_version, current_firmware = check_result
    device.last_log = result
    device.last_check = datetime.utcnow()
    device.update_required = update_available
    device.current_version = current_version
    device.current_firmware = current_firmware


# NOTIFICATION FUNCTIONS
def send_pushover_notification(user, message):
    """Send ``message`` to the user's Pushover account, if configured.

    Does nothing when the user has no settings, Pushover is disabled, or
    the user key / token is missing. Failures are printed, never raised.
    """
    settings_row = user.settings
    if (
        not settings_row
        or not settings_row.pushover_enabled
        or not settings_row.pushover_user_key
        or not settings_row.pushover_token
    ):
        return
    data = {
        "token": settings_row.pushover_token,
        "user": settings_row.pushover_user_key,
        "message": message,
    }
    try:
        response = requests.post(
            "https://api.pushover.net/1/messages.json",
            data=data,
            timeout=PUSHOVER_TIMEOUT,
        )
        # Surface HTTP-level failures (bad token, rate limit, ...) in the
        # error output instead of silently ignoring them.
        response.raise_for_status()
    except Exception as e:
        print("Błąd przy wysyłaniu powiadomienia Pushover:", e)


def send_email_notification(user, subject, message):
    """E-mail ``message`` to ``user.email`` through the user's SMTP server.

    Does nothing when the user has no settings, e-mail notifications are
    disabled, or no SMTP server is configured. Failures are printed, never
    raised.
    """
    settings_row = user.settings
    if (
        not settings_row
        or not settings_row.email_notifications_enabled
        or not settings_row.smtp_server
    ):
        return
    try:
        msg = MIMEText(message)
        msg["Subject"] = subject
        msg["From"] = settings_row.smtp_username
        msg["To"] = user.email
        # The context manager closes the connection even when STARTTLS,
        # login or sending fails.
        with smtplib.SMTP(
            settings_row.smtp_server,
            settings_row.smtp_port,
            timeout=SMTP_TIMEOUT,
        ) as s:
            s.starttls()
            s.login(settings_row.smtp_username, settings_row.smtp_password)
            s.sendmail(settings_row.smtp_username, [user.email], msg.as_string())
    except Exception as e:
        print("Błąd przy wysyłaniu powiadomienia e-mail:", e)


# DEVICE UPDATE CHECK
def check_device_update(device):
    """Query ``device`` for its identity, versions and available updates.

    Returns a tuple ``(log_text, update_available, current_version,
    current_firmware)``. Any exception (connection failure, API error) is
    converted to ``("Error: <message>", False, None, None)``.
    """
    log_entries = []
    update_available = False
    current_version = None
    current_firmware = None
    try:
        with _router_api(device) as api:
            # Basic information
            identity_resp = list(api('/system/identity/print'))
            if identity_resp:
                identity = identity_resp[0].get('name', '')
                log_entries.append(f"Identity: {identity}")

            # System version
            resource_resp = list(api('/system/resource/print'))
            if resource_resp:
                version = resource_resp[0].get('version', '')
                current_version = version
                log_entries.append(f"System Version: {version}")

            # Board information, including firmware
            board_resp = list(api('/system/routerboard/print'))
            if board_resp:
                board_info = board_resp[0]
                firmware = board_info.get(
                    'firmware',
                    board_info.get(
                        'firmware-version',
                        board_info.get('upgrade-firmware', 'N/A'),
                    ),
                )
                current_firmware = firmware
                log_entries.append(f"Firmware: {firmware}")

            # Trigger the update check
            log_entries.append("Checking for updates...")
            list(api('/system/package/update/check-for-updates'))

            # Poll up to 10 times, one second apart, until the reported
            # status no longer contains "checking".
            for _ in range(10):
                time.sleep(1)
                status_resp = list(api('/system/package/update/print'))
                if status_resp:
                    status = status_resp[0].get('status', '').lower()
                    if 'checking' not in status:
                        log_entries.append(f"Update check completed. Status: {status}")
                        break

            update_resp = list(api('/system/package/update/print'))
            if update_resp:
                for res in update_resp:
                    installed = res.get('installed-version', '')
                    latest = res.get('latest-version', '')
                    if latest and latest != installed:
                        log_entries.append(f"Updates available: {installed} -> {latest}")
                        update_available = True
                    else:
                        log_entries.append("No updates available.")
        return "\n".join(log_entries), update_available, current_version, current_firmware
    except Exception as e:
        return f"Error: {str(e)}", False, None, None


def check_all_devices():
    """Scheduled job: check every device, store the result, and notify.

    For each device the check result is written to the device row and to
    the log table; when an update is available the owner is notified
    through Pushover and e-mail (each only if enabled in their settings).
    """
    with app.app_context():
        for device in Device.query.all():
            check_result = check_device_update(device)
            _apply_check_result(device, check_result)
            # Store the same text in the log table
            db.session.add(
                Log(message=check_result[0], device_id=device.id, user_id=device.user_id)
            )
            db.session.commit()

            # Notify when an update is available
            if check_result[1]:
                user = device.owner
                message = f"Urządzenie {device.name or device.ip} ma dostępną aktualizację."
                send_pushover_notification(user, message)
                send_email_notification(user, "Aktualizacja dostępna", message)


def bytes_to_human(n):
    """Format a byte count as a string such as ``"1.50 KB"``.

    Uses 1024-based steps up to PB. A value that cannot be converted to an
    int is returned unchanged.
    """
    try:
        n = int(n)
    except (TypeError, ValueError, OverflowError):
        return n
    units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    i = 0
    while n >= 1024 and i < 5:
        n /= 1024.0
        i += 1
    return f"{n:.2f} {units[i]}"


def clean_old_logs():
    """Scheduled job: delete each user's logs older than their retention."""
    with app.app_context():
        for setting in Settings.query.all():
            if setting.log_retention_days:
                cutoff = datetime.utcnow() - timedelta(days=setting.log_retention_days)
                # Delete this user's logs older than the cutoff
                Log.query.filter(
                    Log.user_id == setting.user_id,
                    Log.timestamp < cutoff,
                ).delete()
        db.session.commit()


# Update-check schedule, driven by APScheduler
scheduler = BackgroundScheduler()
# Every 60 seconds; the per-user check_interval setting is not used here.
scheduler.add_job(func=check_all_devices, trigger="interval", seconds=60)
scheduler.start()


# APPLICATION ROUTES
@app.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')


# Registration
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form and create a new account on POST."""
    if request.method == 'POST':
        username = request.form['username']
        email = request.form['email']
        password = request.form['password']
        # Simple validation - worth extending
        if User.query.filter_by(username=username).first():
            flash("Użytkownik o tej nazwie już istnieje.")
            return redirect(url_for('register'))
        # The e-mail column is unique as well; report a duplicate instead
        # of letting the insert fail with a server error.
        if User.query.filter_by(email=email).first():
            flash("Użytkownik o tym adresie e-mail już istnieje.")
            return redirect(url_for('register'))
        new_user = User(username=username, email=email)
        new_user.set_password(password)
        # Attach the default settings through the relationship so the user
        # and their settings are saved in a single transaction.
        new_user.settings = Settings(check_interval=60)
        db.session.add(new_user)
        db.session.commit()
        flash("Rejestracja zakończona. Możesz się zalogować.")
        return redirect(url_for('login'))
    return render_template('register.html')


# Login
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate the user on POST."""
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password):
            login_user(user)
            flash("Zalogowano pomyślnie.")
            return redirect(url_for('devices'))
        else:
            flash("Nieprawidłowa nazwa użytkownika lub hasło.")
    return render_template('login.html')


# Logout
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and return to the landing page."""
    logout_user()
    flash("Wylogowano.")
    return redirect(url_for('index'))


# The current user's device list
@app.route('/devices')
@login_required
def devices():
    """List the devices owned by the current user."""
    user_devices = Device.query.filter_by(user_id=current_user.id).all()
    return render_template('devices.html', devices=user_devices)


# Adding a device
@app.route('/device/add', methods=['GET', 'POST'])
@login_required
def add_device():
    """Show the add-device form and store a new device on POST."""
    if request.method == 'POST':
        name = request.form.get('name')
        ip = request.form['ip']
        try:
            port = _parse_port(request.form.get('port'))
        except ValueError:
            flash("Nieprawidłowy numer portu.")
            return redirect(url_for('add_device'))
        device_username = request.form['device_username']
        device_password = request.form['device_password']
        new_device = Device(
            name=name,
            ip=ip,
            port=port,
            device_username=device_username,
            device_password=device_password,
            user_id=current_user.id,
        )
        db.session.add(new_device)
        db.session.commit()
        flash("Urządzenie dodane.")
        return redirect(url_for('devices'))
    return render_template('add_device.html')


# Device details
@app.route('/device/<int:device_id>')
@login_required
def device_detail(device_id):
    """Show one device together with live resource data read from it.

    When the device cannot be reached, the template receives
    ``{'error': <message>}`` as the resource data.
    """
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.")
        return redirect(url_for('devices'))
    resource_data = {}
    try:
        with _router_api(device) as api:
            res_resp = list(api('/system/resource/print'))
        if res_resp:
            resource_data = res_resp[0]
            # Convert memory and disk values to a readable format
            for key in ('free-memory', 'total-memory', 'free-hdd-space'):
                if key in resource_data:
                    resource_data[key] = bytes_to_human(resource_data[key])
    except Exception as e:
        resource_data = {'error': str(e)}
    return render_template('device_detail.html', device=device, resource=resource_data)


# Log page
@app.route('/logs')
@login_required
def logs():
    """List the current user's log entries, newest first."""
    user_logs = (
        Log.query.filter_by(user_id=current_user.id)
        .order_by(Log.timestamp.desc())
        .all()
    )
    return render_template('logs.html', logs=user_logs)


# Notification settings page
@app.route('/settings', methods=['GET', 'POST'])
@login_required
def settings():
    """Show and update the current user's settings."""
    user_settings = current_user.settings
    if user_settings is None:
        # An account without a settings row would otherwise fail below;
        # create the row on first use and commit so column defaults apply.
        user_settings = Settings(user_id=current_user.id)
        db.session.add(user_settings)
        db.session.commit()
    if request.method == 'POST':
        # Validate the numeric fields before changing anything
        try:
            smtp_port = _int_or_default(request.form.get('smtp_port'), None)
            check_interval = _int_or_default(request.form.get('check_interval'), 60)
            retention = _int_or_default(request.form.get('log_retention_days'), 30)
        except ValueError:
            flash("Nieprawidłowa wartość liczbowa w ustawieniach.")
            return redirect(url_for('settings'))
        # Pushover settings
        user_settings.pushover_user_key = request.form.get('pushover_user_key')
        user_settings.pushover_token = request.form.get('pushover_token')
        user_settings.pushover_enabled = bool(request.form.get('pushover_enabled'))
        # SMTP settings
        user_settings.smtp_server = request.form.get('smtp_server')
        user_settings.smtp_port = smtp_port
        user_settings.smtp_username = request.form.get('smtp_username')
        user_settings.smtp_password = request.form.get('smtp_password')
        user_settings.email_notifications_enabled = bool(
            request.form.get('email_notifications_enabled')
        )
        # Check interval
        user_settings.check_interval = check_interval
        # Log retention
        user_settings.log_retention_days = retention
        db.session.commit()
        flash("Ustawienia zapisane.")
        return redirect(url_for('settings'))
    return render_template('settings.html', settings=user_settings)


@app.route('/device/<int:device_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_device(device_id):
    """Show the edit form for a device and apply submitted changes."""
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.")
        return redirect(url_for('devices'))
    if request.method == 'POST':
        # Validate the port first so an invalid value leaves the device
        # completely unchanged.
        try:
            port = _parse_port(request.form.get('port'), device.port or 8728)
        except ValueError:
            flash("Nieprawidłowy numer portu.")
            return redirect(url_for('edit_device', device_id=device.id))
        # get() with the current value as default, so that not every field
        # has to be submitted
        device.name = request.form.get('name', device.name)
        device.ip = request.form.get('ip', device.ip)
        device.port = port
        device.device_username = request.form.get('device_username', device.device_username)
        device.device_password = request.form.get('device_password', device.device_password)
        device.branch = request.form.get('branch', device.branch or 'stable')
        db.session.commit()
        flash("Urządzenie zaktualizowane.")
        return redirect(url_for('devices'))
    return render_template('edit_device.html', device=device)


@app.route('/device/<int:device_id>/force_check')
@login_required
def force_check(device_id):
    """Run an update check for one device immediately and store the result."""
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.")
        return redirect(url_for('devices'))
    _apply_check_result(device, check_device_update(device))
    db.session.commit()
    flash("Sprawdzenie urządzenia zakończone.")
    return redirect(url_for('devices'))


@app.route('/device/<int:device_id>/update', methods=['POST'])
@login_required
def update_device(device_id):
    """Ask the device to install the available package update."""
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.")
        return redirect(url_for('devices'))
    try:
        with _router_api(device) as api:
            # Only the 'dev' and 'beta' branches are passed to the command;
            # 'stable' and any other value use the plain install command.
            if device.branch in ('dev', 'beta'):
                list(api('/system/package/update/install', branch=device.branch))
            else:
                list(api('/system/package/update/install'))
        flash("Aktualizacja systemu została rozpoczęta.")
    except Exception as e:
        flash(f"Błąd podczas aktualizacji: {e}")
    return redirect(url_for('device_detail', device_id=device.id))


@app.route('/device/<int:device_id>/update_firmware', methods=['POST'])
@login_required
def update_firmware(device_id):
    """Ask the device to upgrade its routerboard firmware."""
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.")
        return redirect(url_for('devices'))
    try:
        with _router_api(device) as api:
            # Example firmware upgrade command
            list(api('/system/routerboard/upgrade'))
        flash("Aktualizacja firmware została rozpoczęta.")
    except Exception as e:
        flash(f"Błąd podczas aktualizacji firmware: {e}")
    return redirect(url_for('device_detail', device_id=device.id))


# Shut the scheduler down when the application stops
atexit.register(lambda: scheduler.shutdown())

if __name__ == '__main__':
    # NOTE: the log clean-up job is only registered when the module is run
    # as a script.
    scheduler.add_job(func=clean_old_logs, trigger="interval", days=1)
    # The server listens on all interfaces, so the interactive debugger
    # (which allows arbitrary code execution) must not be on by default.
    # Set FLASK_DEBUG=1 to enable it for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=5581, use_reloader=False, debug=debug_enabled)