#!/usr/bin/env python3
# app.py
"""RouterOS Update: a Flask application that tracks MikroTik devices.

The module defines the database models, notification helpers, the device
update check, the RouterOS changelog scraper, the background scheduler jobs
and the HTTP routes of the application.
"""
import atexit
import html
import logging
import os
import re
import smtplib
import ssl
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta, datetime
from email.mime.text import MIMEText

import librouteros
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from bs4 import BeautifulSoup
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash

try:
    from dateutil import parser as date_parser
except ImportError:
    date_parser = None

logging.basicConfig(level=logging.INFO)

# Application configuration
app = Flask(__name__)
# The secret can be supplied through the environment; the literal is kept only
# as a backward-compatible fallback and should not be used in production.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'twoj-sekret-klucz')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)

# Flask-Login configuration
login_manager = LoginManager(app)
login_manager.login_view = 'login'


# DATABASE MODELS
class User(UserMixin, db.Model):
    """Application account that owns devices and one Settings row."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # 256 characters leave room for the longer hash strings produced by
    # current Werkzeug hashing methods.
    password_hash = db.Column(db.String(256), nullable=False)
    devices = db.relationship('Device', backref='owner', lazy=True)
    settings = db.relationship('Settings', uselist=False, backref='user')

    def set_password(self, password):
        """Store a salted hash of *password*."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when *password* matches the stored hash."""
        return check_password_hash(self.password_hash, password)


class Device(db.Model):
    """A RouterOS device together with the result of its last check."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(120))
    ip = db.Column(db.String(120), nullable=False)
    port = db.Column(db.Integer, default=8728)
    device_username = db.Column(db.String(120), nullable=False)
    device_password = db.Column(db.String(120), nullable=False)
    branch = db.Column(db.String(20), default="stable")
    update_required = db.Column(db.Boolean, default=False)
    last_check = db.Column(db.DateTime)
    last_log = db.Column(db.Text)
    current_version = db.Column(db.String(50))
    current_firmware = db.Column(db.String(50))
    upgrade_firmware = db.Column(db.String(50))
    use_ssl = db.Column(db.Boolean, default=False)  # Connect over SSL?
    ssl_insecure = db.Column(db.Boolean, default=False)  # If True, do not verify the SSL certificate
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)


class Settings(db.Model):
    """Per-user notification, check-interval and log-retention settings."""

    id = db.Column(db.Integer, primary_key=True)
    pushover_user_key = db.Column(db.String(255))
    pushover_token = db.Column(db.String(255))
    pushover_enabled = db.Column(db.Boolean, default=False)
    smtp_server = db.Column(db.String(255))
    smtp_port = db.Column(db.Integer)
    smtp_username = db.Column(db.String(255))
    smtp_password = db.Column(db.String(255))
    email_notifications_enabled = db.Column(db.Boolean, default=False)
    check_interval = db.Column(db.Integer, default=60)
    log_retention_days = db.Column(db.Integer, default=30)
    recipient_email = db.Column(db.String(120))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False, unique=True)


class Log(db.Model):
    """Text output of a single device check."""

    id = db.Column(db.Integer, primary_key=True)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    message = db.Column(db.Text)
    device_id = db.Column(db.Integer, db.ForeignKey('device.id'))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    device = db.relationship('Device', backref='logs')


class UpdateHistory(db.Model):
    """Record of a system or firmware update started from the application."""

    id = db.Column(db.Integer, primary_key=True)
    device_id = db.Column(db.Integer, db.ForeignKey('device.id'))
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    update_type = db.Column(db.String(50))
    details = db.Column(db.Text)
    device = db.relationship('Device', backref='update_histories')


class Anomaly(db.Model):
    """Anomaly recorded by detect_anomalies() for a device."""

    id = db.Column(db.Integer, primary_key=True)
    device_id = db.Column(db.Integer, db.ForeignKey('device.id'), nullable=True)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    description = db.Column(db.Text)
    resolved = db.Column(db.Boolean, default=False)
    device = db.relationship('Device', backref='anomalies')


class ChangelogEntry(db.Model):
    """One RouterOS changelog downloaded from mikrotik.com."""

    id = db.Column(db.Integer, primary_key=True)
    version = db.Column(db.String(50), nullable=False)
    details = db.Column(db.Text, nullable=False)
    category = db.Column(db.String(10), nullable=False)  # "6.x" or "7.x"
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    release_type = db.Column(db.String(10), nullable=False, default="stable")


# Create the database tables on first start
with app.app_context():
    db.create_all()


@login_manager.user_loader
def load_user(user_id):
    """Return the User for the id stored in the session, or None."""
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        # A malformed session value must log the user out, not raise.
        return None


# DEVICE CONNECTION HELPERS
def _connect_device(device):
    """Open a RouterOS API connection that honours the device's SSL settings.

    librouteros takes TLS configuration through the ``ssl_wrapper`` keyword
    (a callable that wraps the connected socket); the ``ssl`` and
    ``ssl_verify`` keywords passed previously are not part of its API.
    """
    options = {
        "host": device.ip,
        "username": device.device_username,
        "password": device.device_password,
        "port": device.port,
        "timeout": 15,
    }
    if device.use_ssl:
        context = ssl.create_default_context()
        if device.ssl_insecure:
            # check_hostname has to be disabled before verify_mode is relaxed.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        # NOTE(review): devices running api-ssl without any certificate may
        # additionally need anonymous-DH ciphers - confirm against real setups.
        hostname = device.ip
        options["ssl_wrapper"] = lambda sock: context.wrap_socket(sock, server_hostname=hostname)
    return librouteros.connect(**options)


def _close_api(api):
    """Close a RouterOS API connection, ignoring errors raised while closing."""
    if api is None:
        return
    try:
        api.close()
    except Exception as e:
        app.logger.debug(f"Error while closing device connection: {e}")


def _form_int(field, default=None):
    """Return form field *field* as int, or *default* when it is blank.

    Raises:
        ValueError: when the field holds a non-numeric value.
    """
    raw = (request.form.get(field) or "").strip()
    if not raw:
        return default
    return int(raw)


# NOTIFICATION FUNCTIONS
def send_pushover_notification(user, message):
    """Send *message* to the user's Pushover account when it is configured.

    Best effort: failures are logged and never raised to the caller.
    """
    cfg = user.settings
    if not cfg or not cfg.pushover_enabled or not cfg.pushover_user_key or not cfg.pushover_token:
        return
    data = {
        "token": cfg.pushover_token,
        "user": cfg.pushover_user_key,
        "message": message
    }
    try:
        # The timeout keeps a stalled request from blocking the scheduler thread.
        response = requests.post("https://api.pushover.net/1/messages.json", data=data, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        app.logger.error(f"Błąd przy wysyłaniu powiadomienia Pushover: {e}")


def send_email_notification(user, subject, message):
    """Send an HTML e-mail using the SMTP settings stored for *user*.

    Best effort: failures are logged and never raised to the caller.
    """
    cfg = user.settings
    if not cfg or not cfg.email_notifications_enabled or not cfg.smtp_server:
        return
    try:
        html_body = get_email_template(subject, message)
        msg = MIMEText(html_body, 'html')
        msg["Subject"] = subject
        msg["From"] = cfg.smtp_username
        # Use the address from the settings when given, otherwise the profile address
        to_email = cfg.recipient_email if cfg.recipient_email else user.email
        msg["To"] = to_email
        # The context manager closes the connection even when login/send fails.
        with smtplib.SMTP(cfg.smtp_server, cfg.smtp_port, timeout=30) as smtp:
            smtp.starttls()
            smtp.login(cfg.smtp_username, cfg.smtp_password)
            smtp.sendmail(cfg.smtp_username, [to_email], msg.as_string())
        app.logger.debug("E-mail wysłany pomyślnie")
    except Exception as e:
        app.logger.error(f"Błąd przy wysyłaniu powiadomienia e-mail: {e}", exc_info=True)


# DEVICE UPDATE CHECK
def check_device_update(device):
    """Query *device* over the RouterOS API and report its update status.

    Returns:
        A 5-tuple ``(log_text, update_available, current_version,
        current_firmware, upgrade_firmware)``. When anything fails the tuple
        is ``("Error: <reason>", False, None, None, None)``.
    """
    log_entries = []
    update_available = False
    current_version = None
    current_firmware = None
    upgrade_firmware = None
    api = None
    try:
        app.logger.debug(f"Connecting to device {device.ip}:{device.port} using SSL: {device.use_ssl}, ssl_verify: {not device.ssl_insecure}")
        api = _connect_device(device)
        app.logger.debug(f"Connection established to {device.ip}")

        # Device identity
        identity_resp = list(api('/system/identity/print'))
        app.logger.debug(f"Identity response: {identity_resp}")
        if identity_resp:
            identity = identity_resp[0].get('name', '')
            log_entries.append(f"Identity: {identity}")

        # System version
        resource_resp = list(api('/system/resource/print'))
        app.logger.debug(f"Resource response: {resource_resp}")
        if resource_resp:
            version = resource_resp[0].get('version', '')
            current_version = version
            log_entries.append(f"System Version: {version}")

        # Routerboard (firmware) information
        board_resp = list(api('/system/routerboard/print'))
        app.logger.debug(f"Routerboard response: {board_resp}")
        if board_resp:
            board_info = board_resp[0]
            # Both versions are read separately; the key name is tried in
            # three spellings before falling back to 'N/A'.
            current_fw = board_info.get('firmware', board_info.get('firmware-version', board_info.get('current-firmware', 'N/A')))
            upgrade_fw = board_info.get('upgrade-firmware', 'N/A')
            current_firmware = current_fw
            upgrade_firmware = upgrade_fw
            log_entries.append(f"Firmware: {current_fw}")
            log_entries.append(f"Upgrade Firmware: {upgrade_fw}")

        # Ask the device to look for updates and poll until it stops "checking"
        log_entries.append("Checking for updates...")
        list(api('/system/package/update/check-for-updates'))
        for _ in range(10):
            time.sleep(1)
            status_resp = list(api('/system/package/update/print'))
            app.logger.debug(f"Update status response: {status_resp}")
            if status_resp:
                status = status_resp[0].get('status', '').lower()
                if 'checking' not in status:
                    log_entries.append(f"Update check completed. Status: {status}")
                    break

        update_resp = list(api('/system/package/update/print'))
        app.logger.debug(f"Update response: {update_resp}")
        if update_resp:
            for res in update_resp:
                installed = res.get('installed-version', '')
                latest = res.get('latest-version', '')
                if latest and latest != installed:
                    log_entries.append(f"Updates available: {installed} -> {latest}")
                    update_available = True
                else:
                    log_entries.append("No updates available.")

        return "\n".join(log_entries), update_available, current_version, current_firmware, upgrade_firmware
    except Exception as e:
        app.logger.error(f"Error connecting to device {device.ip}: {e}", exc_info=True)
        return f"Error: {str(e)}", False, None, None, None
    finally:
        _close_api(api)


def _refresh_device_status(device, write_log=True):
    """Run check_device_update() and persist its result on *device*.

    When *write_log* is true a Log row with the check output is stored too.
    Returns the ``update_available`` flag of the check.
    """
    result, update_available, current_version, current_firmware, upgrade_firmware = check_device_update(device)
    device.last_log = result
    device.last_check = datetime.utcnow()
    device.update_required = update_available
    device.current_version = current_version
    device.current_firmware = current_firmware
    device.upgrade_firmware = upgrade_firmware
    if write_log:
        db.session.add(Log(message=result, device_id=device.id, user_id=device.user_id))
    db.session.commit()
    return update_available


def get_email_template(subject, message):
    """Return a minimal HTML document for a notification e-mail.

    The result is sent as ``text/html``, so both values are HTML-escaped and
    line breaks in *message* become ``<br>`` elements.
    """
    safe_subject = html.escape(str(subject))
    safe_message = html.escape(str(message)).replace("\n", "<br>")
    return f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>{safe_subject}</title>
</head>
<body>
<h2>{safe_subject}</h2>
<p>{safe_message}</p>
</body>
</html>
"""


def clean_old_changelogs():
    """Delete changelog entries whose timestamp is more than 7 days old."""
    # NOTE(review): fetch_changelogs() stores the release date in `timestamp`,
    # so this removes every release older than a week - confirm that is intended.
    with app.app_context():
        cutoff_time = datetime.utcnow() - timedelta(days=7)
        db.session.query(ChangelogEntry).filter(ChangelogEntry.timestamp < cutoff_time).delete()
        db.session.commit()


def check_all_devices():
    """Scheduler job: check every device and notify owners about updates."""
    with app.app_context():
        for device in Device.query.all():
            device_ip = device.ip
            try:
                update_available = _refresh_device_status(device, write_log=True)
            except Exception as e:
                # One failing device must not stop the checks of the others.
                db.session.rollback()
                app.logger.error(f"Error storing check result for device {device_ip}: {e}", exc_info=True)
                continue
            # NOTE(review): a notification is sent on every run while an update
            # stays available - confirm whether it should fire only once.
            if update_available:
                user = device.owner
                message = f"Urządzenie {device.name or device.ip} ma dostępną aktualizację."
                send_pushover_notification(user, message)
                send_email_notification(user, "Aktualizacja dostępna", message)


def bytes_to_human(n):
    """Format a byte count as e.g. ``"1.50 MB"``.

    Values that cannot be converted to int are returned unchanged.
    """
    try:
        n = int(n)
    except Exception:
        return n
    units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
    i = 0
    while n >= 1024 and i < 5:
        n /= 1024.0
        i += 1
    return f"{n:.2f} {units[i]}"


def clean_old_logs():
    """Delete each user's logs that are older than their retention period."""
    with app.app_context():
        for setting in Settings.query.all():
            if setting.log_retention_days:
                cutoff = datetime.utcnow() - timedelta(days=setting.log_retention_days)
                # Remove this user's logs that are older than the cutoff
                Log.query.filter(Log.user_id == setting.user_id, Log.timestamp < cutoff).delete()
                db.session.commit()


def parse_release_date(text):
    """Extract a release date from *text*.

    A ``(YYYY-Mon-DD`` pattern is tried first; when that fails, dateutil's
    fuzzy parser is used if it is installed. The result is always a naive
    datetime so it can be compared with ``datetime.utcnow()``; None is
    returned when no date can be found.
    """
    date_match = re.search(r"\((\d{4})-([A-Za-z]{3})-(\d{2})", text)
    if date_match:
        try:
            return datetime.strptime(f"{date_match.group(1)}-{date_match.group(2)}-{date_match.group(3)}", "%Y-%b-%d")
        except Exception as e:
            logging.error(f"Błąd parsowania daty przy użyciu strptime: {e}")
    if date_parser:
        try:
            parsed = date_parser.parse(text, fuzzy=True)
            # Drop tzinfo: mixing aware and naive datetimes raises TypeError.
            return parsed.replace(tzinfo=None)
        except Exception as e:
            logging.error(f"Błąd parsowania daty przy użyciu dateutil: {e}")
    return None


def get_release_type(version_text):
    """Classify a version string as ``"beta"``, ``"rc"`` or ``"stable"``.

    "beta" is returned when the text contains "beta", "rc" when it contains
    "rc", and "stable" otherwise.
    """
    lower = version_text.lower()
    if "beta" in lower:
        return "beta"
    elif "rc" in lower:
        return "rc"
    else:
        return "stable"


@app.template_filter('format_version')
def format_version(value):
    """Template filter that shortens a raw version string for display.

    Parenthesised text is removed, numeric components are cut to two digits
    and a year glued to a beta/rc number is dropped. Empty values and values
    that do not start with a digit are returned unchanged.
    """
    if not value:
        return value

    # Step 1: remove everything in parentheses (e.g. "(2025-Feb-12 11:20)").
    value = re.sub(r'\(.*?\)', '', value).strip()

    # Step 2: match the structure major.minor(.patch)?(beta|rc)(num)?
    pattern = r'^(\d+)'                  # group 1: major
    pattern += r'(?:\.(\d+))?'           # group 2: minor (optional)
    pattern += r'(?:\.(\d+))?'           # group 3: patch (optional)
    pattern += r'(?:(beta|rc)(\d+))?'    # groups 4 and 5: beta|rc + number (optional)

    match = re.match(pattern, value)
    if not match:
        # Nothing matched: return the cleaned-up input
        return value

    major, minor, patch, suffix, suffixnum = match.groups()

    def truncate_2_digits(num_str):
        """Cut a digit string to at most two characters."""
        return num_str[:2] if len(num_str) > 2 else num_str

    # Assemble the main version components
    result = major
    if minor:
        # e.g. "182025" -> "18"
        result += '.' + truncate_2_digits(minor)
    if patch:
        # e.g. "182025" -> "18"
        result += '.' + truncate_2_digits(patch)

    if suffix:
        # Append "beta"/"rc" itself
        result += suffix
        if suffixnum:
            # 1) A year may be glued to the suffix number, e.g. "22025" is
            #    "2" followed by "2025": up to two digits plus "20xx".
            m_year = re.match(r'^(\d{1,2})(20\d{2})$', suffixnum)
            if m_year:
                # Keep only the first group (e.g. "14" from "14" + "2025")
                suffixnum = m_year.group(1)
            # 2) Finally cut to two digits (e.g. for "beta123")
            suffixnum = truncate_2_digits(suffixnum)
            result += suffixnum

    return result


@app.template_global()
def bootstrap_alert_category(cat):
    """Map a flash category to a Bootstrap alert class (default: 'info')."""
    mapping = {
        'error': 'danger',
        'fail': 'danger',
        'warning': 'warning',
        'warn': 'warning',
        'ok': 'success',
        'success': 'success',
        'info': 'info'
    }
    # A missing category falls back to the default instead of raising.
    return mapping.get((cat or '').lower(), 'info')


def fetch_changelogs(force=False):
    """Download RouterOS 6.x/7.x changelogs and store them in the database.

    Args:
        force: when False (default) versions already stored are skipped, so
            repeated runs do not create duplicates. When True every fetched
            changelog is stored; the caller is expected to have cleared the
            table beforehand.

    The function pushes its own application context, so it can run from the
    background scheduler as well as from a request. Errors are logged and
    never raised to the caller.
    """
    changelog_url = "https://mikrotik.com/download/changelogs"
    current_date = datetime.utcnow()
    known_versions = set()

    def process_section(section):
        """Turn one accordion section into an entry dict, or None to skip it."""
        a_tag = section.find("a")
        if not a_tag:
            return None

        raw_text = a_tag.get_text(strip=True)
        logging.info(f"raw_text: {raw_text}")

        # First try to find the version with the "in <version>" pattern
        match = re.search(r"in\s+(\d+\.\d+(?:\.\d+)?(?:beta|rc)?\d*)", raw_text)
        if match:
            version_text = match.group(1)
            logging.info(f"Parsed version (pattern 1): {version_text}")
        else:
            # Otherwise look for a version at the start with a date glued to it
            match = re.match(r"^(\d+\.\d+(?:\.\d+)?(?:beta|rc)?\d*)(\d{4}-\d{2}-\d{2})", raw_text)
            if match:
                version_text = match.group(1)
                logging.info(f"Parsed version (pattern 2): {version_text}")
            else:
                version_text = raw_text
                logging.info("Brak dopasowania regex, używam raw_text jako wersji")

        # Skip versions that do not start with "6." or "7."
        if not (version_text.startswith("6.") or version_text.startswith("7.")):
            logging.info(f"Pomijam wersję {version_text} – nie jest 6.x ani 7.x")
            return None

        # Already stored: skip before spending an HTTP request on it.
        if version_text in known_versions:
            return None

        details_div = section.find("div", class_="content")
        changelog_file_url = details_div.get("data-url") if details_div else None
        if not changelog_file_url:
            logging.warning(f"Brak URL changeloga dla wersji {version_text}")
            return None

        try:
            changelog_response = requests.get(changelog_file_url, timeout=10)
            changelog_response.raise_for_status()
            changelog_lines = changelog_response.text.splitlines()
            if not changelog_lines:
                logging.warning(f"Pusty changelog dla wersji {version_text}")
                return None

            first_line = changelog_lines[0].strip()
            release_date_dt = parse_release_date(first_line)
            if release_date_dt is None:
                logging.warning(f"Nie udało się wyłuskać daty dla wersji {version_text}, pomijam ten changelog")
                return None

            changelog_text = "\n".join(changelog_lines).strip()
        except Exception as e:
            logging.error(f"Błąd pobierania changeloga z {changelog_file_url}: {e}")
            return None

        # Date filter: skip 7.x older than 1 year and 6.x older than 2 years
        if version_text.startswith("7.") and release_date_dt < (current_date - timedelta(days=365)):
            logging.info(f"Pomijam wersję {version_text} - starsza niż 1 rok")
            return None
        if version_text.startswith("6.") and release_date_dt < (current_date - timedelta(days=730)):
            logging.info(f"Pomijam wersję {version_text} - starsza niż 2 lata")
            return None

        release_type = get_release_type(version_text)
        return {
            "version": version_text,
            "details": changelog_text,
            "category": "6.x" if version_text.startswith("6.") else "7.x",
            "timestamp": release_date_dt,
            "release_type": release_type
        }

    try:
        with app.app_context():
            if not force:
                known_versions.update(row[0] for row in db.session.query(ChangelogEntry.version).all())

            logging.info(f"Pobieranie changelogów z {changelog_url}...")
            response = requests.get(changelog_url, timeout=30)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            changelog_sections = soup.find_all("li", class_="accordion-navigation")
            logging.info(f"Znaleziono {len(changelog_sections)} sekcji changelogów.")

            results = []
            # Sections are processed in parallel; each one costs an HTTP request
            with ThreadPoolExecutor(max_workers=8) as executor:
                futures = [executor.submit(process_section, section) for section in changelog_sections]
                for future in as_completed(futures):
                    try:
                        result = future.result()
                    except Exception as e:
                        # A single broken section must not abort the whole fetch.
                        logging.error(f"Błąd podczas pobierania changelogów: {e}")
                        continue
                    if result is not None:
                        results.append(result)

            # Store the results in the database
            new_entries = 0
            for entry in results:
                if not force:
                    if entry["version"] in known_versions:
                        continue
                    known_versions.add(entry["version"])
                db.session.add(ChangelogEntry(
                    version=entry["version"],
                    details=entry["details"],
                    category=entry["category"],
                    timestamp=entry["timestamp"],
                    release_type=entry["release_type"]
                ))
                new_entries += 1
            db.session.commit()
            logging.info(f"Nowe wpisy dodane: {new_entries}")
    except Exception as e:
        logging.error(f"Błąd podczas pobierania changelogów: {e}")


def detect_anomalies():
    """Scheduler job: record an Anomaly for devices with repeated error logs.

    A device gets an anomaly when at least three of its logs from the last
    24 hours contain one of the error keywords.
    """
    with app.app_context():
        # Analysis window: the last 24 hours
        cutoff = datetime.utcnow() - timedelta(hours=24)
        logs = Log.query.filter(Log.timestamp >= cutoff).all()

        # Example analysis: find logs that contain specific keywords
        error_keywords = ["błąd", "error", "niepowodzenie", "exception"]
        detected = {}
        for log in logs:
            # Log.message is nullable, so guard against None.
            lower_msg = (log.message or "").lower()
            if any(keyword in lower_msg for keyword in error_keywords):
                detected.setdefault(log.device_id, []).append(log.message)

        # Store an anomaly for every device that reaches the threshold
        for device_id, messages in detected.items():
            if len(messages) >= 3:  # example threshold
                description = f"Wykryto {len(messages)} błędne logi w ciągu ostatnich 24 godzin. Przykłady: " + "; ".join(messages[:3])
                anomaly = Anomaly(device_id=device_id, description=description)
                db.session.add(anomaly)
                # A notification (e-mail or Pushover) could be sent here as well
        db.session.commit()


# Update-check schedule, driven by APScheduler
scheduler = BackgroundScheduler()

# Initialise the database and the scheduler
with app.app_context():
    db.create_all()  # no-op when the tables already exist
    # Global settings: the first Settings row is treated as the global one
    global_settings = Settings.query.first()
    if global_settings and global_settings.check_interval and global_settings.check_interval > 0:
        interval = global_settings.check_interval
    else:
        interval = 60

    scheduler.add_job(
        func=check_all_devices,
        trigger="interval",
        seconds=interval,
        id="check_all_devices",
        max_instances=1
    )
    scheduler.add_job(
        func=detect_anomalies,
        trigger="interval",
        minutes=60,
        id="detect_anomalies",
        max_instances=1
    )
    scheduler.add_job(
        func=clean_old_changelogs,
        trigger="interval",
        days=1,
        id="clean_changelogs",
        max_instances=1
    )
    scheduler.add_job(
        func=fetch_changelogs,
        kwargs={"force": False},
        trigger="interval",
        days=1,
        id="daily_changelog_fetch",
        max_instances=1
    )
    app.logger.debug(f"Scheduler initialized with interval: {interval} seconds")
    scheduler.start()


# APPLICATION ROUTES
@app.route('/')
def index():
    """Landing page; signed-in users are sent to the dashboard."""
    if current_user.is_authenticated:
        return redirect(url_for('dashboard'))
    return render_template('index.html')


@app.route('/dashboard')
@login_required
def dashboard():
    """Show summary counters and the most recent logs of the current user."""
    # Every figure except the user count is limited to the signed-in user's
    # own data, matching how the other views scope their queries.
    own_devices = Device.query.filter_by(user_id=current_user.id)
    own_logs = Log.query.filter_by(user_id=current_user.id)

    devices_count = own_devices.count()
    pending_updates_count = own_devices.filter_by(update_required=True).count()
    logs_count = own_logs.count()
    users_count = User.query.count()
    anomalies_count = Anomaly.query.join(Device).filter(
        Device.user_id == current_user.id, Anomaly.resolved.is_(False)
    ).count()
    update_history_count = UpdateHistory.query.join(Device).filter(
        Device.user_id == current_user.id
    ).count()
    recent_logs = own_logs.order_by(Log.timestamp.desc()).limit(5).all()

    # Newest stable versions for 7.x and 6.x
    latest_version_7 = ChangelogEntry.query.filter_by(category="7.x", release_type="stable").order_by(ChangelogEntry.timestamp.desc()).first()
    latest_version_6 = ChangelogEntry.query.filter_by(category="6.x", release_type="stable").order_by(ChangelogEntry.timestamp.desc()).first()

    return render_template('dashboard.html',
                           devices_count=devices_count,
                           pending_updates_count=pending_updates_count,
                           logs_count=logs_count,
                           users_count=users_count,
                           anomalies_count=anomalies_count,
                           update_history_count=update_history_count,
                           recent_logs=recent_logs,
                           latest_version_7=latest_version_7,
                           latest_version_6=latest_version_6)


# Registration
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Create a new account together with its default settings."""
    if request.method == 'POST':
        username = (request.form.get('username') or '').strip()
        email = (request.form.get('email') or '').strip()
        password = request.form.get('password') or ''

        if not username or not email or not password:
            flash("Nazwa użytkownika, e-mail i hasło są wymagane.", "error")
            return redirect(url_for('register'))
        if User.query.filter_by(username=username).first():
            flash("Użytkownik o tej nazwie już istnieje.", "error")
            return redirect(url_for('register'))
        # The e-mail column is unique as well; check it to avoid an IntegrityError.
        if User.query.filter_by(email=email).first():
            flash("Użytkownik o tym adresie e-mail już istnieje.", "error")
            return redirect(url_for('register'))

        new_user = User(username=username, email=email)
        new_user.set_password(password)
        # Default settings are created in the same transaction, so an account
        # can never end up without a Settings row.
        new_user.settings = Settings(check_interval=60)
        db.session.add(new_user)
        db.session.commit()

        flash("Rejestracja zakończona. Możesz się zalogować.", "success")
        return redirect(url_for('login'))
    return render_template('register.html')


# Login
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Authenticate a user by username and password."""
    if request.method == 'POST':
        username = request.form.get('username') or ''
        password = request.form.get('password') or ''
        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password):
            login_user(user)
            flash("Zalogowano pomyślnie.", "success")
            return redirect(url_for('dashboard'))
        else:
            flash("Nieprawidłowa nazwa użytkownika lub hasło.", "error")
    return render_template('login.html')


# Logout
@app.route('/logout')
@login_required
def logout():
    """End the current session."""
    logout_user()
    flash("Wylogowano.", "success")
    return redirect(url_for('index'))


# The user's device list
@app.route('/devices')
@login_required
def devices():
    """List the devices owned by the current user."""
    user_devices = Device.query.filter_by(user_id=current_user.id).all()
    return render_template('devices.html', devices=user_devices)


# Adding a device
@app.route('/device/add', methods=['GET', 'POST'])
@login_required
def add_device():
    """Create a device owned by the current user."""
    if request.method == 'POST':
        name = request.form.get('name')
        ip = (request.form.get('ip') or '').strip()
        device_username = request.form.get('device_username') or ''
        device_password = request.form.get('device_password') or ''
        use_ssl = bool(request.form.get('use_ssl'))
        ssl_insecure = bool(request.form.get('ssl_insecure'))

        if not ip or not device_username or not device_password:
            flash("Adres IP, login i hasło urządzenia są wymagane.", "error")
            return redirect(url_for('add_device'))
        try:
            port = _form_int('port', 8728)
        except ValueError:
            flash("Niepoprawny numer portu.", "error")
            return redirect(url_for('add_device'))

        new_device = Device(
            name=name,
            ip=ip,
            port=port,
            device_username=device_username,
            device_password=device_password,
            use_ssl=use_ssl,
            ssl_insecure=ssl_insecure,
            user_id=current_user.id
        )
        db.session.add(new_device)
        db.session.commit()
        flash("Urządzenie dodane.", "success")
        return redirect(url_for('devices'))
    return render_template('add_device.html')


# Device details
@app.route('/device/<int:device_id>')
@login_required
def device_detail(device_id):
    """Show a device together with live resource data read from it."""
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))

    resource_data = {}
    api = None
    try:
        api = _connect_device(device)
        res_resp = list(api('/system/resource/print'))
        if res_resp:
            resource_data = res_resp[0]
            # Convert memory and disk values to a human-readable format
            for key in ('free-memory', 'total-memory', 'free-hdd-space'):
                if key in resource_data:
                    resource_data[key] = bytes_to_human(resource_data[key])
    except Exception as e:
        resource_data = {'error': str(e)}
    finally:
        _close_api(api)
    return render_template('device_detail.html', device=device, resource=resource_data)


# Logs page
@app.route('/logs')
@login_required
def logs():
    """List the current user's logs, newest first."""
    user_logs = Log.query.filter_by(user_id=current_user.id).order_by(Log.timestamp.desc()).all()
    return render_template('logs.html', logs=user_logs)


# Notification settings page
@app.route('/settings', methods=['GET', 'POST'])
@login_required
def settings():
    """Show and update the current user's settings."""
    user_settings = current_user.settings
    if user_settings is None:
        # Accounts without a Settings row get the defaults on first visit.
        user_settings = Settings(user_id=current_user.id, check_interval=60)
        db.session.add(user_settings)
        db.session.commit()

    if request.method == 'POST':
        try:
            smtp_port = _form_int('smtp_port', None)
            check_interval = _form_int('check_interval', 60)
            retention_days = _form_int('log_retention_days', 30)
        except ValueError:
            flash("Niepoprawna wartość liczbowa w ustawieniach.", "warning")
            return redirect(url_for('settings'))
        if check_interval < 1 or retention_days < 0:
            flash("Niepoprawna wartość liczbowa w ustawieniach.", "warning")
            return redirect(url_for('settings'))

        # Pushover settings
        user_settings.pushover_user_key = request.form.get('pushover_user_key')
        user_settings.pushover_token = request.form.get('pushover_token')
        user_settings.pushover_enabled = bool(request.form.get('pushover_enabled'))
        # SMTP settings
        user_settings.smtp_server = request.form.get('smtp_server')
        user_settings.smtp_port = smtp_port
        user_settings.smtp_username = request.form.get('smtp_username')
        user_settings.smtp_password = request.form.get('smtp_password')
        user_settings.email_notifications_enabled = bool(request.form.get('email_notifications_enabled'))
        # Recipient address (may differ from the profile e-mail)
        user_settings.recipient_email = request.form.get('recipient_email')
        # Check interval and log retention
        user_settings.check_interval = check_interval
        user_settings.log_retention_days = retention_days
        db.session.commit()

        # NOTE(review): the check job is shared by all users, so the interval
        # saved last wins - confirm this is the intended behaviour.
        try:
            scheduler.reschedule_job("check_all_devices", trigger="interval", seconds=user_settings.check_interval)
            app.logger.debug(f"Scheduler rescheduled with new interval: {user_settings.check_interval} seconds")
        except Exception as e:
            app.logger.error(f"Error rescheduling job: {e}")

        flash("Ustawienia zapisane.", "success")
        return redirect(url_for('settings'))
    return render_template('settings.html', settings=user_settings)


@app.route('/device/<int:device_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_device(device_id):
    """Edit a device owned by the current user."""
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))

    if request.method == 'POST':
        try:
            port = _form_int('port', device.port or 8728)
        except ValueError:
            flash("Niepoprawny numer portu.", "error")
            return redirect(url_for('edit_device', device_id=device.id))

        device.name = request.form.get('name', device.name)
        # Required columns keep their current value when the field is blank.
        device.ip = request.form.get('ip') or device.ip
        device.port = port
        device.device_username = request.form.get('device_username') or device.device_username
        device.device_password = request.form.get('device_password') or device.device_password
        device.branch = request.form.get('branch', device.branch or 'stable')
        device.use_ssl = bool(request.form.get('use_ssl'))
        device.ssl_insecure = bool(request.form.get('ssl_insecure'))
        db.session.commit()
        flash("Urządzenie zaktualizowane.", "success")
        return redirect(url_for('devices'))
    return render_template('edit_device.html', device=device)


@app.route('/device/<int:device_id>/force_check')
@login_required
def force_check(device_id):
    """Run an update check for one device immediately."""
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))
    # A forced check only refreshes the device row; no Log entry is written.
    _refresh_device_status(device, write_log=False)
    flash("Sprawdzenie urządzenia zakończone.", "success")
    return redirect(url_for('devices'))


@app.route('/device/<int:device_id>/update', methods=['POST'])
@login_required
def update_device(device_id):
    """Start a system package update on the device."""
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))

    api = None
    try:
        app.logger.debug(f"Initiating system update for device {device.ip}")
        api = _connect_device(device)
        app.logger.debug("Connection established, starting update command")
        # Only the 'dev' and 'beta' branches are passed to the install command.
        if device.branch in ('dev', 'beta'):
            list(api('/system/package/update/install', branch=device.branch))
        else:
            list(api('/system/package/update/install'))

        history = UpdateHistory(
            device_id=device.id,
            update_type="system",
            details=f"Aktualizacja systemu rozpoczęta na urządzeniu {device.name or device.ip}."
        )
        db.session.add(history)
        db.session.commit()
        flash("Aktualizacja systemu została rozpoczęta.", "success")
        app.logger.debug("System update command executed successfully")
    except Exception as e:
        app.logger.error(f"Błąd podczas aktualizacji urządzenia {device.ip}: {e}", exc_info=True)
        flash(f"Błąd podczas aktualizacji: {e}", "error")
    finally:
        _close_api(api)
    return redirect(url_for('device_detail', device_id=device.id))


@app.route('/device/<int:device_id>/update_firmware', methods=['POST'])
@login_required
def update_firmware(device_id):
    """Start a routerboard firmware upgrade on the device."""
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))

    api = None
    try:
        api = _connect_device(device)
        # Example firmware upgrade command
        list(api('/system/routerboard/upgrade'))
        history = UpdateHistory(
            device_id=device.id,
            update_type="firmware",
            details=f"Aktualizacja firmware rozpoczęta na urządzeniu {device.name or device.ip}."
        )
        db.session.add(history)
        db.session.commit()
        flash("Aktualizacja firmware została rozpoczęta.", "success")
    except Exception as e:
        flash(f"Błąd podczas aktualizacji firmware: {e}", "error")
    finally:
        _close_api(api)
    return redirect(url_for('device_detail', device_id=device.id))


@app.route('/test_pushover', methods=['POST'])
@login_required
def test_pushover():
    """Send a test Pushover notification to the current user."""
    message = "To jest testowe powiadomienie Pushover z RouterOS Update."
    send_pushover_notification(current_user, message)
    flash("Test powiadomienia Pushover wysłany.", "success")
    return redirect(url_for('settings'))


@app.route('/test_email', methods=['POST'])
@login_required
def test_email():
    """Send a test e-mail to the current user."""
    subject = "Testowy E-mail z RouterOS Update"
    message = "To jest testowa wiadomość e-mail wysłana z RouterOS Update."
    send_email_notification(current_user, subject, message)
    flash("Testowy e-mail wysłany.", "success")
    return redirect(url_for('settings'))


@app.route('/change_password', methods=['GET', 'POST'])
@login_required
def change_password():
    """Change the current user's password after verifying the old one."""
    if request.method == 'POST':
        old_password = request.form.get('old_password') or ''
        new_password = request.form.get('new_password') or ''
        confirm_password = request.form.get('confirm_password') or ''

        # Redirects target this view: no 'reset_password' endpoint exists.
        if not current_user.check_password(old_password):
            flash("Stare hasło jest nieprawidłowe.", "error")
            return redirect(url_for('change_password'))
        if not new_password:
            flash("Nowe hasło nie może być puste.", "warning")
            return redirect(url_for('change_password'))
        if new_password != confirm_password:
            flash("Nowe hasło i potwierdzenie nie są zgodne.", "warning")
            return redirect(url_for('change_password'))

        current_user.set_password(new_password)
        db.session.commit()
        flash("Hasło zostało zresetowane.", "success")
        return redirect(url_for('change_password'))
    return render_template('change_password.html')


@app.route('/logs/clean', methods=['POST'])
@login_required
def clean_logs():
    """Delete the current user's logs older than the submitted number of days."""
    days = request.form.get('days')
    if not days:
        flash("Podaj liczbę dni.", "warning")
        return redirect(url_for('logs'))
    try:
        days = int(days)
    except ValueError:
        flash("Niepoprawna wartość dni.", "warning")
        return redirect(url_for('logs'))
    if days < 0:
        # A negative value would put the cutoff in the future and wipe all logs.
        flash("Niepoprawna wartość dni.", "warning")
        return redirect(url_for('logs'))

    cutoff = datetime.utcnow() - timedelta(days=days)
    num_deleted = Log.query.filter(Log.user_id == current_user.id, Log.timestamp < cutoff).delete()
    db.session.commit()
    flash(f"Usunięto {num_deleted} logów starszych niż {days} dni.", "success")
    return redirect(url_for('logs'))


@app.route('/update_history')
@login_required
def update_history():
    """List update history entries of the current user's devices."""
    histories = UpdateHistory.query.join(Device).filter(Device.user_id == current_user.id).order_by(UpdateHistory.timestamp.desc()).all()
    return render_template('update_history.html', histories=histories)


@app.route('/anomalies')
@login_required
def anomalies():
    """List anomalies recorded for the current user's devices."""
    user_anomalies = Anomaly.query.join(Device).filter(Device.user_id == current_user.id).order_by(Anomaly.timestamp.desc()).all()
    return render_template('anomalies.html', anomalies=user_anomalies)

@app.route('/devices/update_selected', methods=['POST'])
@login_required
def update_selected_devices():
    """Run an update check for every selected device of the current user."""
    selected_ids = request.form.getlist('selected_devices')
    if not selected_ids:
        flash("Nie wybrano żadnych urządzeń.", "error")
        return redirect(url_for('devices'))

    for device_id in selected_ids:
        # Ids come from the form as strings; anything non-numeric is skipped.
        if not str(device_id).isdigit():
            continue
        device = Device.query.get(int(device_id))
        if device and device.user_id == current_user.id:
            result, update_available, current_version, current_firmware, upgrade_firmware = check_device_update(device)
            device.last_log = result
            device.last_check = datetime.utcnow()
            device.update_required = update_available
            device.current_version = current_version
            device.current_firmware = current_firmware
            device.upgrade_firmware = upgrade_firmware
            # The device row and its log entry are stored in one transaction.
            db.session.add(Log(message=result, device_id=device.id, user_id=device.user_id))
            db.session.commit()

    flash("Wybrane urządzenia zostały zaktualizowane.", "success")
    return redirect(url_for('devices'))


@app.route('/routeros_changelog')
@login_required
def routeros_changelog():
    """Show stored changelogs for the requested channel and series."""
    channel = request.args.get('channel', 'stable')  # "stable", "rc" or "beta"
    series = request.args.get('series', '7.x')       # "7.x" or "6.x"
    selected_version = request.args.get('version')

    # All entries for the channel and series, newest first
    entries = ChangelogEntry.query.filter_by(release_type=channel, category=series).order_by(ChangelogEntry.timestamp.desc()).all()

    if selected_version:
        selected_entry = ChangelogEntry.query.filter_by(version=selected_version, release_type=channel, category=series).first()
    else:
        selected_entry = entries[0] if entries else None

    return render_template(
        'routeros_changelog_tabs.html',
        channel=channel,
        series=series,
        entries=entries,
        selected_entry=selected_entry
    )


@app.route('/force_fetch_changelogs')
@login_required
def force_fetch_changelogs():
    """Delete all stored changelogs and download them again."""
    # NOTE(review): this GET request deletes data, and the table stays empty
    # when the download fails - consider POST and fetching before deleting.
    # Remove all old entries (a request already runs inside an app context)
    db.session.query(ChangelogEntry).delete()
    db.session.commit()
    # Download the changelogs from scratch
    fetch_changelogs(force=True)
    flash("Changelog został całkowicie odświeżony.", "success")
    return redirect(url_for('routeros_changelog'))


@app.route('/device/<int:device_id>/restart', methods=['POST'])
@login_required
def restart_device(device_id):
    """Send the reboot command to a device owned by the current user.

    Returns an empty 204 response because the route is called through AJAX.
    """
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))

    api = None
    try:
        connect_options = {
            "host": device.ip,
            "username": device.device_username,
            "password": device.device_password,
            "port": device.port,
            "timeout": 15,
        }
        if device.use_ssl:
            # librouteros takes TLS configuration through `ssl_wrapper`.
            import ssl
            context = ssl.create_default_context()
            if device.ssl_insecure:
                # check_hostname has to be disabled before verify_mode is relaxed.
                context.check_hostname = False
                context.verify_mode = ssl.CERT_NONE
            hostname = device.ip
            connect_options["ssl_wrapper"] = lambda sock: context.wrap_socket(sock, server_hostname=hostname)
        api = librouteros.connect(**connect_options)
        # Send the reboot command
        list(api('/system/reboot'))
        flash("Komenda reboot została wysłana.", "success")
    except Exception as e:
        flash(f"Błąd podczas wysyłania komendy reboot: {e}", "error")
    finally:
        if api is not None:
            try:
                api.close()
            except Exception as e:
                # The device may already have dropped the connection.
                app.logger.debug(f"Error while closing device connection: {e}")
    return ('', 204)  # Empty response for the AJAX request


def _shutdown_scheduler():
    """Stop the scheduler at interpreter exit when it is still running."""
    if scheduler.running:
        scheduler.shutdown()


# Stop the scheduler when the application exits
atexit.register(_shutdown_scheduler)

if __name__ == '__main__':
    import os

    scheduler.add_job(func=clean_old_logs, trigger="interval", days=1, id="clean_old_logs", max_instances=1)
    # The Werkzeug debugger allows code execution, so on a server bound to
    # 0.0.0.0 it is enabled only on explicit request (FLASK_DEBUG=1).
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=5582, use_reloader=False, debug=debug_enabled)