import atexit
import difflib
import functools
import hashlib
import io
import os
import re
import shutil
import smtplib
import socket
import zipfile
from datetime import datetime, timedelta
from difflib import HtmlDiff
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from html import escape as html_escape

import paramiko
import requests
from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from flask import (
    Flask, render_template, request, redirect, url_for,
    session, flash, send_file, jsonify
)
from flask_sqlalchemy import SQLAlchemy
from passlib.context import CryptContext
from sqlalchemy import text
# from flask_wtf.csrf import CSRFProtect

# Regex for the device name: letters, digits, '-' and '_' only.
ALLOWED_NAME_REGEX = re.compile(r'^[A-Za-z0-9_-]+$')

# Network timeouts (seconds) for outbound notification calls.
PUSHOVER_TIMEOUT = 10
SMTP_TIMEOUT = 30

###############################################################################
# Flask configuration
###############################################################################

app = Flask(__name__)
# The secret can be overridden from the environment; the literal is kept only
# as a backward-compatible fallback and should not be used in production.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'super-secret-key')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///backup_routeros.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app_start_time = datetime.now()
# NOTE(review): CSRF protection is disabled although the app exposes
# state-changing POST routes - consider enabling it.
# csrf = CSRFProtect(app)
db = SQLAlchemy(app)

# Folder used to store backup files (./data)
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
os.makedirs(DATA_DIR, exist_ok=True)

###############################################################################
# Database models
###############################################################################

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class User(db.Model):
    """Application account; stores only a bcrypt hash of the password."""

    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(255), nullable=False)

    def set_password(self, password):
        """Hash ``password`` and store the hash on the instance."""
        self.password_hash = pwd_context.hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        return pwd_context.verify(password, self.password_hash)


class Router(db.Model):
    """A RouterOS device owned by a user, with its SSH connection data."""

    __tablename__ = 'routers'
    id = db.Column(db.Integer, primary_key=True)
    owner_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    name = db.Column(db.String(120), nullable=False)
    host = db.Column(db.String(255), nullable=False)
    port = db.Column(db.Integer, default=22)
    ssh_user = db.Column(db.String(120), default='admin')
    ssh_key = db.Column(db.Text, nullable=True)  # private key (string or path)
    # NOTE(review): the SSH password is stored in plain text.
    ssh_password = db.Column(db.String(120), nullable=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    backups = db.relationship('Backup', backref='router', lazy=True)


class Backup(db.Model):
    """A backup file stored locally for a router."""

    __tablename__ = 'backups'
    id = db.Column(db.Integer, primary_key=True)
    router_id = db.Column(db.Integer, db.ForeignKey('routers.id'), nullable=False)
    file_path = db.Column(db.String(255), nullable=False)  # path to the file
    backup_type = db.Column(db.String(50), default='export')  # 'export' or 'binary'
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    checksum = db.Column(db.String(64), nullable=True)


class OperationLog(db.Model):
    """A timestamped, free-text entry describing a performed operation."""

    __tablename__ = 'operation_logs'
    __table_args__ = {'extend_existing': True}  # prevents table redefinition
    id = db.Column(db.Integer, primary_key=True)
    message = db.Column(db.Text, nullable=False)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)


class GlobalSettings(db.Model):
    """Application-wide settings: retention, schedules and notifications."""

    __tablename__ = 'global_settings'
    id = db.Column(db.Integer, primary_key=True)
    backup_retention_days = db.Column(db.Integer, default=7)
    auto_backup_interval_days = db.Column(db.Integer, default=1)
    cron_schedule = db.Column(db.String(50), default="")
    retention_cron = db.Column(db.String(50), default="")
    export_cron = db.Column(db.String(50), default="")
    binary_cron = db.Column(db.String(50), default="")
    enable_auto_export = db.Column(db.Boolean, default=False)
    global_ssh_key = db.Column(db.Text, nullable=True)
    pushover_token = db.Column(db.String(255), nullable=True)
    pushover_userkey = db.Column(db.String(255), nullable=True)
    notify_failures_only = db.Column(db.Boolean, default=True)
    smtp_host = db.Column(db.String(255), nullable=True)
    smtp_port = db.Column(db.Integer, default=587)
    smtp_login = db.Column(db.String(255), nullable=True)
    smtp_password = db.Column(db.String(255), nullable=True)
    smtp_notifications_enabled = db.Column(db.Boolean, default=False)
    log_retention_days = db.Column(db.Integer, default=7)
    recipient_email = db.Column(db.String(255), nullable=True)


###############################################################################
# Database initialisation
###############################################################################

with app.app_context():
    db.create_all()
    # Make sure a settings row exists from the very first start.
    if not GlobalSettings.query.first():
        db.session.add(GlobalSettings())
        db.session.commit()


###############################################################################
# Helper decorators and functions
###############################################################################

def login_required(func):
    """Redirect to the login page unless a user id is stored in the session."""
    @functools.wraps(func)  # keeps __name__ (Flask endpoint name) and __doc__
    def wrapper(*args, **kwargs):
        if 'user_id' not in session:
            flash("Musisz być zalogowany, aby uzyskać dostęp.")
            return redirect(url_for('login'))
        return func(*args, **kwargs)
    return wrapper


def get_current_user():
    """Return the logged-in ``User`` or None when nobody is logged in."""
    if 'user_id' in session:
        return User.query.get(session['user_id'])
    return None


def get_settings() -> GlobalSettings:
    """Return the settings row, creating it with defaults when missing."""
    s = GlobalSettings.query.first()
    if not s:
        s = GlobalSettings()
        db.session.add(s)
        db.session.commit()
    return s


def log_operation(message: str):
    """Persist ``message`` as a new ``OperationLog`` entry."""
    db.session.add(OperationLog(message=message))
    db.session.commit()


# Key types tried in order; RSA first to keep the previous behaviour.
_PKEY_CLASS_NAMES = ("RSAKey", "Ed25519Key", "ECDSAKey")


def _pkey_classes():
    """Return the paramiko key classes offered by the installed version."""
    return [getattr(paramiko, name) for name in _PKEY_CLASS_NAMES
            if hasattr(paramiko, name)]


def load_pkey(ssh_key_str: str):
    """Load a private key given either its text or a path to a key file.

    RSA is tried first, then Ed25519 and ECDSA.

    Raises:
        ValueError: when the input is empty, or when key text cannot be
            parsed by any supported key class.
        Exception: when a key *file* cannot be loaded, the last paramiko
            error is re-raised unchanged.
    """
    key_str = (ssh_key_str or "").strip()
    if not key_str:
        raise ValueError("SSH key is empty. Upewnij się, że podałeś poprawny klucz SSH (globalny lub indywidualny).")

    from_file = os.path.isfile(key_str)
    last_error = None
    for key_cls in _pkey_classes():
        try:
            if from_file:
                return key_cls.from_private_key_file(key_str)
            return key_cls.from_private_key(io.StringIO(key_str))
        except Exception as e:
            last_error = e

    if from_file and last_error is not None:
        raise last_error
    raise ValueError("Nie udało się załadować klucza SSH. Sprawdź, czy klucz jest poprawny i nie jest zaszyfrowany") from last_error


def compute_checksum(file_path):
    """Return the hex SHA-256 digest of the file at ``file_path``."""
    sha256 = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b""):
            sha256.update(chunk)
    return sha256.hexdigest()


###############################################################################
# SSH functions
###############################################################################

def _connect_to_router(router: Router, caller: str) -> paramiko.SSHClient:
    """Return an SSH client connected to ``router``.

    Uses the router's own key when set, otherwise the global key, otherwise
    password authentication. ``caller`` only labels the debug output.
    The client is closed before any connection error is re-raised.
    """
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy trusts unknown host keys, which allows
    # man-in-the-middle attacks; consider pinning known hosts.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    own_key = router.ssh_key if router.ssh_key and router.ssh_key.strip() else None
    key_source = own_key or get_settings().global_ssh_key
    try:
        if key_source and key_source.strip():
            pkey = load_pkey(key_source)
            client.connect(router.host, port=router.port, username=router.ssh_user,
                           pkey=pkey, timeout=10)
        else:
            client.connect(router.host, port=router.port, username=router.ssh_user,
                           password=router.ssh_password, timeout=10,
                           allow_agent=False, look_for_keys=False, banner_timeout=10)
    except Exception as e:
        print(f"[DEBUG] {caller} -> błąd przy load_pkey/connect: {e}")
        client.close()
        raise
    return client


def ssh_export(router: Router) -> str:
    """Run ``/export`` on the router and return its decoded output."""
    print(f"[DEBUG] ssh_export -> router id={router.id}, host={router.host}")
    client = _connect_to_router(router, "ssh_export")
    try:
        _stdin, stdout, _stderr = client.exec_command('/export')
        output = stdout.read().decode('utf-8', errors='ignore')
    finally:
        client.close()
    print(f"[DEBUG] ssh_export -> output length={len(output)}")
    return output


def ssh_backup(router: Router, backup_name: str) -> str:
    """Create a binary backup on the router and download it into DATA_DIR.

    Returns the local path of the downloaded ``<backup_name>.backup`` file.
    """
    print(f"[DEBUG] ssh_backup -> router id={router.id}, backup_name={backup_name}")
    remote_file = f"{backup_name}.backup"
    local_path = os.path.join(DATA_DIR, remote_file)
    client = _connect_to_router(router, "ssh_backup")
    try:
        command = f"/system backup save name={backup_name}"
        _stdin, stdout, _stderr = client.exec_command(command)
        # Wait until the router finished writing the backup file.
        stdout.channel.recv_exit_status()
        sftp = client.open_sftp()
        try:
            sftp.get(remote_file, local_path)
            # Remove the remote file so it does not fill the router's storage;
            # a failure here is reported but does not fail the backup.
            try:
                sftp.remove(remote_file)
                print(f"[DEBUG] ssh_backup -> usunięto plik {remote_file} z routera")
            except Exception as e:
                print(f"[DEBUG] ssh_backup -> błąd przy usuwaniu pliku {remote_file} z routera: {e}")
        finally:
            sftp.close()
    finally:
        client.close()
    print(f"[DEBUG] ssh_backup -> local_path={local_path}")
    return local_path


def ssh_upload_backup(router: Router, local_backup_path: str, expected_checksum: str = None):
    """Upload a local backup file to the router over SFTP.

    Raises:
        ValueError: when ``expected_checksum`` is given and does not match
            the SHA-256 of the local file.
    """
    if expected_checksum:
        local_checksum = compute_checksum(local_backup_path)
        if local_checksum != expected_checksum:
            raise ValueError("Suma kontrolna pliku nie zgadza się – plik może być uszkodzony.")

    client = _connect_to_router(router, "ssh_upload_backup")
    try:
        sftp = client.open_sftp()
        try:
            remote_file = os.path.basename(local_backup_path)
            sftp.put(local_backup_path, remote_file)
        finally:
            sftp.close()
    finally:
        client.close()
    print(f"[DEBUG] ssh_upload_backup -> przesłano {local_backup_path} do routera")


def ssh_test_connection(router: Router) -> dict:
    """Test the connection and return the router's model, uptime and hostname.

    Values that cannot be found in the command output stay "Nieznany".
    """
    client = _connect_to_router(router, "ssh_test_connection")
    try:
        _stdin, stdout, _stderr = client.exec_command('/system resource print without-paging')
        resource_output = stdout.read().decode('utf-8', errors='ignore')
        _stdin, stdout, _stderr = client.exec_command('/system identity print')
        identity_output = stdout.read().decode('utf-8', errors='ignore')
    finally:
        client.close()

    model = "Nieznany"
    uptime = "Nieznany"
    hostname = "Nieznany"
    for line in resource_output.splitlines():
        _key, sep, value = line.partition(":")
        if not sep:
            continue  # a matching word without "key: value" shape
        if "board-name" in line:
            model = value.strip()
        if "uptime" in line:
            uptime = value.strip()
    for line in identity_output.splitlines():
        _key, sep, value = line.partition(":")
        if sep and "name" in line:
            hostname = value.strip()
    return {"model": model, "uptime": uptime, "hostname": hostname}


def get_valid_cron_trigger(cron_expr, default_expr):
    """Build a CronTrigger from ``cron_expr``, or from ``default_expr`` if invalid."""
    try:
        return CronTrigger.from_crontab(cron_expr)
    except ValueError as e:
        print(f"[DEBUG] Invalid cron expression: {cron_expr}. Error: {e}. Using default: {default_expr}")
        return CronTrigger.from_crontab(default_expr)


def get_email_template(subject, message):
    """Return a minimal HTML e-mail body for ``subject`` and ``message``.

    Both values are HTML-escaped and line breaks in the message are kept.
    """
    safe_subject = html_escape(str(subject))
    safe_message = html_escape(str(message)).replace("\n", "<br>\n")
    return f"""<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8">
    <title>{safe_subject}</title>
  </head>
  <body>
    <h2>{safe_subject}</h2>
    <p>{safe_message}</p>
  </body>
</html>
"""


###############################################################################
# Notifications
###############################################################################

def send_pushover(token, userkey, message, title="RouterOS Backup"):
    """Send a Pushover message; return True when the API answers HTTP 200."""
    try:
        resp = requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": token,
                "user": userkey,
                "message": message,
                "title": title
            },
            timeout=PUSHOVER_TIMEOUT,  # never block a backup job on the API
        )
        return resp.status_code == 200
    except Exception as e:
        print("Pushover error:", e)
        return False


def send_mail_with_attachment(smtp_host, smtp_port, smtp_user, smtp_pass, to_address, subject, plain_body, attachment_path="", html_body=None):
    """Send a plain+HTML e-mail, optionally with one file attached.

    Port 465 uses implicit TLS, port 587 uses STARTTLS, any other port is
    used without encryption. Returns True on success and False when SMTP is
    not configured or sending fails (the error is printed, not raised).
    """
    if not (smtp_host and smtp_host.strip() and smtp_user and smtp_user.strip() and smtp_pass and smtp_pass.strip()):
        print("SMTP not properly configured, skipping email sending.")
        return False
    try:
        if html_body is None:
            html_body = get_email_template(subject, plain_body)
        body = MIMEMultipart("alternative")
        body.attach(MIMEText(plain_body, "plain"))
        body.attach(MIMEText(html_body, "html"))

        if attachment_path and os.path.isfile(attachment_path):
            part = MIMEBase("application", "octet-stream")
            with open(attachment_path, "rb") as attachment:
                part.set_payload(attachment.read())
            encoders.encode_base64(part)
            # The filename= keyword lets the email package quote/encode names.
            part.add_header("Content-Disposition", "attachment",
                            filename=os.path.basename(attachment_path))
            # Attachments belong next to, not inside, the alternative part.
            msg = MIMEMultipart("mixed")
            msg.attach(body)
            msg.attach(part)
        else:
            msg = body

        msg["From"] = smtp_user
        msg["To"] = to_address
        msg["Subject"] = subject

        if smtp_port == 465:
            with smtplib.SMTP_SSL(smtp_host, smtp_port, timeout=SMTP_TIMEOUT) as server:
                server.login(smtp_user, smtp_pass)
                server.send_message(msg)
        else:
            with smtplib.SMTP(smtp_host, smtp_port, timeout=SMTP_TIMEOUT) as server:
                server.ehlo()
                if smtp_port == 587:
                    server.starttls()
                server.login(smtp_user, smtp_pass)
                server.send_message(msg)
        return True
    except Exception as e:
        print("Mail error_send_mail_with_attachment:", e)
        return False


def send_mail(smtp_host, smtp_port, smtp_user, smtp_pass, to_address, subject, body):
    """Send a plain-text e-mail over STARTTLS; return True on success."""
    if not (smtp_host and smtp_user and smtp_pass):
        print("SMTP not configured, skipping email")
        return False
    try:
        msg = MIMEMultipart()
        msg["From"] = smtp_user
        msg["To"] = to_address
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain"))
        with smtplib.SMTP(smtp_host, smtp_port, timeout=SMTP_TIMEOUT) as server:
            server.starttls()
            server.login(smtp_user, smtp_pass)
            server.send_message(msg)
        return True
    except Exception as e:
        print("Mail error_send_mail:", e)
        return False


def notify(settings: GlobalSettings, message: str, success: bool):
    """Send ``message`` through every configured notification channel.

    Nothing is sent for successful operations when
    ``settings.notify_failures_only`` is set.
    """
    if settings.notify_failures_only and success:
        return
    if settings.pushover_token and settings.pushover_userkey:
        send_pushover(settings.pushover_token, settings.pushover_userkey, message)

    # E-mail is sent only when SMTP notifications are enabled.
    if not settings.smtp_notifications_enabled:
        return
    smtp_ready = (settings.smtp_host and settings.smtp_host.strip() and
                  settings.smtp_login and settings.smtp_login.strip() and
                  settings.smtp_password and settings.smtp_password.strip())
    if not smtp_ready:
        print("SMTP configuration is incomplete. Skipping email notification.")
        return
    try:
        # Fall back to the SMTP login when no recipient address is configured.
        if settings.recipient_email and settings.recipient_email.strip():
            to_address = settings.recipient_email.strip()
        else:
            to_address = settings.smtp_login.strip()
        send_mail_with_attachment(
            smtp_host=settings.smtp_host.strip(),
            smtp_port=settings.smtp_port,
            smtp_user=settings.smtp_login.strip(),
            smtp_pass=settings.smtp_password.strip(),
            to_address=to_address,
            subject="RouterOS Backup Notification",
            plain_body=message
        )
    except Exception as e:
        print("SMTP send error:", e)


###############################################################################
# Periodic jobs
###############################################################################

def cleanup_old_backups():
    """Delete backups (file and DB row) older than the retention period."""
    with app.app_context():
        s = get_settings()
        cutoff_date = datetime.utcnow() - timedelta(days=s.backup_retention_days)
        old = Backup.query.filter(Backup.created_at < cutoff_date).all()
        deleted_count = 0
        for b in old:
            try:
                os.remove(b.file_path)
            except FileNotFoundError:
                pass  # file already gone; still drop the DB row
            except OSError as e:
                # Keep the row so the file can be retried on the next run,
                # and do not let one bad file stop the whole cleanup.
                print(f"[DEBUG] cleanup_old_backups -> nie można usunąć {b.file_path}: {e}")
                continue
            db.session.delete(b)
            deleted_count += 1
        db.session.commit()
        log_operation(f"Automatyczna retencja wykonana at {datetime.utcnow()}: usunięto {deleted_count} backupów (próg: {s.backup_retention_days} dni).")


def scheduled_auto_backup():
    """Export the configuration of every router into a ``.rsc`` file."""
    with app.app_context():
        s = get_settings()
        for r in Router.query.all():
            try:
                backup_name = f"{r.name}_{r.id}_{datetime.now():%Y%m%d_%H%M%S}"
                export_data = ssh_export(r)
                filepath = os.path.join(DATA_DIR, f"{backup_name}.rsc")
                with open(filepath, 'w', encoding='utf-8') as f:
                    f.write(export_data)
                db.session.add(Backup(router_id=r.id, file_path=filepath, backup_type='export'))
                db.session.commit()
                notify(s, f"Auto-export dla routera {r.name} OK", True)
                log_operation(f"Automatyczny export dla routera {r.name} wykonany pomyślnie at {datetime.utcnow()}.")
            except Exception as e:
                # A failed commit leaves the session unusable until rolled back.
                db.session.rollback()
                notify(s, f"Auto-export dla routera {r.name} FAILED: {e}", False)
                log_operation(f"Automatyczny export dla routera {r.name} FAILED at {datetime.utcnow()}: {e}")


def scheduled_auto_binary_backup():
    """Create and download a binary backup of every router."""
    with app.app_context():
        s = get_settings()
        for r in Router.query.all():
            try:
                backup_name = f"{r.name}_{r.id}_{datetime.now():%Y%m%d_%H%M%S}"
                local_path = ssh_backup(r, backup_name)
                checksum = compute_checksum(local_path)
                db.session.add(Backup(router_id=r.id, file_path=local_path,
                                      backup_type='binary', checksum=checksum))
                db.session.commit()
                notify(s, f"Auto-binary backup dla routera {r.name} OK", True)
                log_operation(f"Auto-binary backup dla routera {r.name} wykonany pomyślnie at {datetime.utcnow()}.")
            except Exception as e:
                # A failed commit leaves the session unusable until rolled back.
                db.session.rollback()
                notify(s, f"Auto-binary backup dla routera {r.name} FAILED: {e}", False)
                log_operation(f"Auto-binary backup dla routera {r.name} FAILED at {datetime.utcnow()}: {e}")


def _remove_job_if_present(job_id):
    """Remove a scheduler job, ignoring the case where it does not exist."""
    try:
        scheduler.remove_job(job_id)
    except JobLookupError:
        pass


def schedule_auto_binary_backup_job():
    """(Re)schedule the binary backup job from cron or interval settings."""
    s = get_settings()
    _remove_job_if_present("auto_binary_backup_job")
    if s.binary_cron:
        trigger = get_valid_cron_trigger(s.binary_cron, "15 2 * * *")
        scheduler.add_job(func=scheduled_auto_binary_backup, trigger=trigger,
                          id="auto_binary_backup_job", replace_existing=True)
        print(f"[DEBUG] schedule_auto_binary_backup_job -> cron_schedule={s.binary_cron}")
    else:
        scheduler.add_job(func=scheduled_auto_binary_backup, trigger='interval',
                          days=s.auto_backup_interval_days,
                          id="auto_binary_backup_job", replace_existing=True)
        print(f"[DEBUG] schedule_auto_binary_backup_job -> interval days={s.auto_backup_interval_days}")


def schedule_auto_backup_job():
    """(Re)schedule the export job "auto_backup_job" from the settings."""
    s = get_settings()
    if s.cron_schedule:
        trigger = get_valid_cron_trigger(s.cron_schedule, "0 */12 * * *")
        scheduler.add_job(func=scheduled_auto_backup, trigger=trigger,
                          id="auto_backup_job", replace_existing=True)
        print(f"[DEBUG] schedule_auto_backup_job -> cron_schedule={s.cron_schedule}")
    else:
        scheduler.add_job(func=scheduled_auto_backup, trigger='interval',
                          days=s.auto_backup_interval_days,
                          id="auto_backup_job", replace_existing=True)
        print(f"[DEBUG] schedule_auto_backup_job -> interval days={s.auto_backup_interval_days}")


def schedule_retention_job():
    """(Re)schedule backup retention; an invalid cron falls back to interval."""
    s = get_settings()
    _remove_job_if_present("retention_job")
    trigger = None
    if s.retention_cron:
        try:
            trigger = CronTrigger.from_crontab(s.retention_cron)
        except ValueError as e:
            print(f"[DEBUG] Invalid cron expression: {s.retention_cron}. Error: {e}. Using interval instead.")
    if trigger is not None:
        scheduler.add_job(func=cleanup_old_backups, trigger=trigger,
                          id="retention_job", replace_existing=True)
        print(f"[DEBUG] schedule_retention_job -> cron_schedule={s.retention_cron}")
    else:
        scheduler.add_job(func=cleanup_old_backups, trigger='interval',
                          days=s.backup_retention_days,
                          id="retention_job", replace_existing=True)
        print(f"[DEBUG] schedule_retention_job -> interval days={s.backup_retention_days}")


def schedule_auto_export_job():
    """(Re)schedule the export job, or remove it when auto export is off."""
    s = get_settings()
    _remove_job_if_present("auto_export_job")
    if not s.enable_auto_export:
        print("[DEBUG] schedule_auto_export_job -> auto export disabled")
        return
    if s.export_cron:
        trigger = get_valid_cron_trigger(s.export_cron, "0 */12 * * *")
    else:
        trigger = CronTrigger.from_crontab("0 */12 * * *")
    scheduler.add_job(func=scheduled_auto_backup, trigger=trigger,
                      id="auto_export_job", replace_existing=True)
    cron_used = s.export_cron if s.export_cron else "0 */12 * * *"
    print(f"[DEBUG] schedule_auto_export_job -> cron_schedule={cron_used}")


def cleanup_old_logs():
    """Delete operation-log entries older than the log retention period."""
    with app.app_context():
        s = get_settings()
        cutoff_date = datetime.utcnow() - timedelta(days=s.log_retention_days)
        # Bulk delete: one statement instead of loading every row.
        deleted_count = OperationLog.query.filter(
            OperationLog.timestamp < cutoff_date
        ).delete(synchronize_session=False)
        db.session.commit()
        log_operation(f"Automatyczna retencja logów: usunięto {deleted_count} logów starszych niż {s.log_retention_days} dni.")


###############################################################################
# APScheduler configuration - job schedule
###############################################################################

scheduler = BackgroundScheduler()

# Two periodic jobs are registered at import time, each with a unique id so
# that it can be re-scheduled later:
#   1) cleanup of old backups (once a day)
#   2) auto export (once a day)
# NOTE(review): "auto_backup_job" runs regardless of enable_auto_export and
# alongside "auto_export_job"; "cleanup_job" likewise runs alongside
# "retention_job" - confirm that this duplication is intended.
scheduler.add_job(func=cleanup_old_backups, trigger='interval', days=1, id="cleanup_job")
scheduler.add_job(func=scheduled_auto_backup, trigger='interval', days=1, id="auto_backup_job")
scheduler.start()

# Clean up when the process exits.
atexit.register(lambda: scheduler.shutdown())


def reschedule_jobs():
    """Re-create all settings-driven jobs after the settings changed."""
    # Each schedule_* function removes its own previous job first.
    schedule_auto_export_job()
    schedule_retention_job()
    schedule_auto_binary_backup_job()
    scheduler.add_job(func=cleanup_old_logs, trigger='interval', days=1,
                      id="cleanup_logs_job", replace_existing=True)


###############################################################################
# Jinja2 filters and globals
###############################################################################

@app.template_filter('basename')
def basename_filter(path):
    """Return the final path component, or the value itself when falsy."""
    return os.path.basename(path) if path else path


@app.template_filter('filesize')
def filesize_filter(path):
    """Format a byte count, or the size of the file at ``path``, for display.

    Relative paths are resolved against DATA_DIR. Missing files, falsy
    values and unsupported types give "0 B".
    """
    if not path or not isinstance(path, (str, int, float)):
        return "0 B"
    # A number is treated as a size in bytes.
    if isinstance(path, (int, float)):
        size = path
    else:
        if not os.path.isabs(path):
            path = os.path.join(DATA_DIR, path)
        if not os.path.exists(path):
            return "0 B"
        size = os.path.getsize(path)
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if size < 1024:
            return f"{size:.2f} {unit}"
        size /= 1024
    return f"{size:.2f} PB"


@app.template_filter('resolve_dns')
def resolve_dns_filter(host):
    """Return the reverse-DNS name of ``host``, or ``host`` when lookup fails."""
    try:
        return socket.gethostbyaddr(host)[0]
    except Exception:
        return host


def get_data_folder_size():
    """Return the total size in bytes of all files under DATA_DIR."""
    total_size = 0
    for dirpath, _dirnames, filenames in os.walk(DATA_DIR):
        for f in filenames:
            try:
                total_size += os.path.getsize(os.path.join(dirpath, f))
            except OSError:
                pass  # file vanished or is unreadable; skip it
    return total_size


@app.template_global()
def bootstrap_alert_category(cat):
    """Map a flash category to a Bootstrap alert class (default "info")."""
    mapping = {
        "error": "danger",
        "fail": "danger",
        "warn": "warning",
        "warning": "warning",
        "ok": "success",
        "success": "success",
        "info": "info"
    }
    # Tolerate a missing or non-string category instead of raising.
    return mapping.get(str(cat or "").lower(), "info")


###############################################################################
# ROUTES
###############################################################################
@app.route('/')
def index():
    """Landing page; logged-in users are sent straight to the dashboard."""
    if get_current_user():
        return redirect(url_for('dashboard'))
    return render_template('index.html')


# Global variable holding the application start time.
app_start_time = datetime.now()


# NOTE(review): an identical get_data_folder_size is defined earlier in this
# file; this later definition is the one in effect at runtime.
def get_data_folder_size():
    """Return the total size in bytes of all files under DATA_DIR."""
    total_size = 0
    for dirpath, _dirnames, filenames in os.walk(DATA_DIR):
        for f in filenames:
            try:
                total_size += os.path.getsize(os.path.join(dirpath, f))
            except OSError:
                pass  # file vanished or is unreadable; skip it
    return total_size


def _wants_json():
    """Return True when the request was sent by an XMLHttpRequest client."""
    return request.headers.get("X-Requested-With") == "XMLHttpRequest"


def _safe_next_url():
    """Return a same-site redirect target for the current request.

    Takes the ``next`` form field, then the referrer, and accepts a value
    only when it is relative or points at this host; otherwise the
    dashboard URL is returned. This prevents open redirects through the
    user-controlled ``next`` field.
    """
    from urllib.parse import urlparse

    for candidate in (request.form.get('next'), request.referrer):
        # Backslashes are treated like slashes by some browsers.
        if not candidate or '\\' in candidate:
            continue
        parsed = urlparse(candidate)
        if parsed.scheme not in ('', 'http', 'https'):
            continue
        if not parsed.netloc or parsed.netloc == request.host:
            return candidate
    return url_for('dashboard')


@app.route('/dashboard')
@login_required
def dashboard():
    """Show backup counts, recent logs, disk usage and uptime."""
    user = get_current_user()
    routers_count = Router.query.filter_by(owner_id=user.id).count()

    def count_backups(kind):
        # Only backups of routers owned by the current user are counted.
        return db.session.query(Backup).join(Router).filter(
            Router.owner_id == user.id,
            Backup.backup_type == kind
        ).count()

    export_count = count_backups('export')
    binary_count = count_backups('binary')
    logs = OperationLog.query.order_by(OperationLog.timestamp.desc()).limit(10).all()
    disk = shutil.disk_usage(DATA_DIR)
    folder_used = get_data_folder_size()
    # Share of the whole disk taken by the data folder, not overall usage.
    disk_usage_percent = (folder_used / disk.total) * 100 if disk.total > 0 else 0
    success_ops = OperationLog.query.filter(OperationLog.message.ilike("%OK%")).count()
    failure_ops = OperationLog.query.filter(OperationLog.message.ilike("%FAILED%")).count()
    return render_template(
        'dashboard.html',
        user=user,
        routers_count=routers_count,
        export_count=export_count,
        binary_count=binary_count,
        total_backups=export_count + binary_count,
        logs=logs,
        disk_total=disk.total,
        disk_used=folder_used,
        disk_free=disk.free,
        disk_usage_percent=disk_usage_percent,
        uptime=datetime.now() - app_start_time,
        success_ops=success_ops,
        failure_ops=failure_ops,
        current_time=datetime.now().astimezone(),
        settings=get_settings(),
    )


@app.route('/advanced_schedule', methods=['GET', 'POST'])
@login_required
def advanced_schedule():
    """Show and save cron schedules and retention periods."""
    s = get_settings()
    if request.method == 'POST':
        # Validate the numbers before touching the settings object so that
        # bad input neither raises a 500 nor leaves a half-updated row.
        try:
            backup_days = int(request.form.get('backup_retention_days', s.backup_retention_days))
            log_days = int(request.form.get('log_retention_days', s.log_retention_days))
            if backup_days < 0 or log_days < 0:
                raise ValueError("negative retention")
        except (TypeError, ValueError):
            flash("Okres retencji musi być nieujemną liczbą całkowitą.")
            return redirect(url_for('advanced_schedule'))

        s.retention_cron = request.form.get('retention_cron', '').strip()
        s.binary_cron = request.form.get('binary_cron', '').strip()
        s.export_cron = request.form.get('export_cron', '').strip()
        s.backup_retention_days = backup_days
        s.log_retention_days = log_days
        s.enable_auto_export = request.form.get('enable_auto_export') == 'on'
        db.session.commit()
        reschedule_jobs()  # apply the new schedule to the running scheduler
        flash("Ustawienia harmonogramu zostały zapisane.")
        return redirect(url_for('advanced_schedule'))
    return render_template('advanced_schedule.html', settings=s)


@app.route('/toggle_dark_mode')
def toggle_dark_mode():
    """Flip the session's dark-mode flag (default on) and go back."""
    session['dark_mode'] = not session.get('dark_mode', True)
    return redirect(request.referrer or url_for('index'))


@app.route('/register', methods=['GET', 'POST'])
def register():
    """Create a new user account."""
    if request.method == 'POST':
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        if not username or not password:
            flash("Nazwa użytkownika i hasło są wymagane.")
            return redirect(url_for('register'))
        if User.query.filter_by(username=username).first():
            flash("Użytkownik już istnieje")
            return redirect(url_for('register'))
        u = User(username=username)
        # Hash through the model's own context; the previous code called an
        # undefined name ``bcrypt`` here.
        u.set_password(password)
        db.session.add(u)
        db.session.commit()
        flash("Zarejestrowano, możesz się zalogować.")
        return redirect(url_for('login'))
    return render_template('register.html')


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Authenticate a user and store the user id in the session."""
    if request.method == 'POST':
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        u = User.query.filter_by(username=username).first()
        if u and u.check_password(password):
            session['user_id'] = u.id
            flash("Zalogowano pomyślnie.")
            return redirect(url_for('dashboard'))
        flash("Nieprawidłowe dane logowania.")
        return redirect(url_for('login'))
    return render_template('login.html')


@app.route('/logout')
def logout():
    """Clear the session and return to the landing page."""
    session.clear()
    flash("Wylogowano.")
    return redirect(url_for('index'))


@app.route('/routers')
@login_required
def routers_list():
    """List the current user's routers, newest first."""
    user = get_current_user()
    routers = (Router.query
               .filter_by(owner_id=user.id)
               .order_by(Router.created_at.desc())
               .all())
    return render_template('routers.html', user=user, routers=routers)


@app.route('/routers/add', methods=['GET', 'POST'])
@login_required
def add_router():
    """Add a router for the current user after validating the form."""
    if request.method == 'POST':
        user = get_current_user()
        name = request.form['name'].strip()
        # Name validation: letters, digits, '-' and '_' only.
        if not ALLOWED_NAME_REGEX.match(name):
            flash("Nazwa urządzenia może zawierać wyłącznie litery, cyfry, myślniki (-) oraz podkreślenia (_).")
            return redirect(url_for('add_router'))
        host = request.form['host'].strip()
        if not host:
            flash("Adres hosta jest wymagany.")
            return redirect(url_for('add_router'))
        try:
            port = int(request.form.get('port', '22').strip() or '22')
            if not 1 <= port <= 65535:
                raise ValueError("port out of range")
        except ValueError:
            flash("Port musi być liczbą z zakresu 1-65535.")
            return redirect(url_for('add_router'))
        ssh_user = request.form['ssh_user'].strip()
        # Key and password are optional, so a missing field is not an error.
        ssh_key = request.form.get('ssh_key', '').strip()
        ssh_password = request.form.get('ssh_password', '').strip()
        r = Router(owner_id=user.id, name=name, host=host, port=port,
                   ssh_user=ssh_user, ssh_key=ssh_key, ssh_password=ssh_password)
        db.session.add(r)
        db.session.commit()
        flash("Dodano router.")
        return redirect(url_for('routers_list'))
    return render_template('add_router.html')


@app.route('/router/<int:router_id>')
@login_required
def router_details(router_id):
    """Show one router with its export and binary backups, newest first."""
    user = get_current_user()
    router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
    if not router:
        flash("Brak dostępu do routera.")
        return redirect(url_for('routers_list'))
    all_b = (Backup.query
             .filter_by(router_id=router.id)
             .order_by(Backup.created_at.desc())
             .all())
    export_b = [x for x in all_b if x.backup_type == 'export']
    bin_b = [x for x in all_b if x.backup_type == 'binary']
    # ?view=v2 (the default) selects the newer template, anything else v1.
    if request.args.get('view', 'v2') == 'v2':
        template, current_view = 'router_details_v2.html', 'v2'
    else:
        template, current_view = 'router_details.html', 'v1'
    return render_template(template, router=router, export_backups=export_b,
                           binary_backups=bin_b, current_view=current_view)


@app.route('/router/<int:router_id>/export', methods=['POST'])
@login_required
def router_export(router_id):
    """Export a router's configuration to a ``.rsc`` file and record it.

    XMLHttpRequest callers get a JSON status; others get a flash message
    and a redirect to the router page.
    """
    user = get_current_user()
    router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
    if not router:
        if _wants_json():
            return {"status": "error", "message": "Brak routera."}, 400
        flash("Brak routera.")
        return redirect(url_for('routers_list'))
    try:
        backup_name = f"{router.name}_{router.id}_{datetime.now():%Y%m%d_%H%M%S}"
        export_data = ssh_export(router)
        filepath = os.path.join(DATA_DIR, f"{backup_name}.rsc")
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(export_data)
        db.session.add(Backup(router_id=router.id, file_path=filepath, backup_type='export'))
        db.session.commit()
        notify(get_settings(), f"Export {router.name} OK", True)
        log_operation(f"Export wykonany dla routera {router.name} at {datetime.utcnow()}")
        if _wants_json():
            return {"status": "success", "message": "export zakończony."}
        flash("Export zakończony.")
    except Exception as e:
        # A failed commit leaves the session unusable until rolled back.
        db.session.rollback()
        notify(get_settings(), f"Export {router.name} FAIL: {e}", False)
        log_operation(f"Export dla routera {router.name} FAILED: {e}")
        if _wants_json():
            return {"status": "error", "message": str(e)}, 500
        flash(f"Błąd: {e}")
    return redirect(url_for('router_details', router_id=router.id))


@app.route('/router/<int:router_id>/backup', methods=['POST'])
@login_required
def router_backup(router_id):
    """Create a binary backup of a router, download it and record it."""
    user = get_current_user()
    router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
    if not router:
        flash("Brak routera.")
        return redirect(url_for('routers_list'))
    try:
        backup_name = f"{router.name}_{router.id}_{datetime.now():%Y%m%d_%H%M%S}"
        local_path = ssh_backup(router, backup_name)
        checksum = compute_checksum(local_path)
        db.session.add(Backup(router_id=router.id, file_path=local_path,
                              backup_type='binary', checksum=checksum))
        db.session.commit()
        notify(get_settings(), f"Backup {router.name} OK", True)
        log_operation(f"Backup binarny wykonany dla routera {router.name} at {datetime.utcnow()}")
        flash("Backup binarny zakończony.")
    except Exception as e:
        # A failed commit leaves the session unusable until rolled back.
        db.session.rollback()
        notify(get_settings(), f"Backup {router.name} FAIL: {e}", False)
        log_operation(f"Backup dla routera {router.name} FAILED: {e}")
        flash(f"Błąd: {e}")
    return redirect(url_for('router_details', router_id=router.id))


@app.route('/router/<int:router_id>/upload_backup/<int:backup_id>', methods=['POST'])
@login_required
def upload_backup(router_id, backup_id):
    """Upload a stored binary backup to its router after a checksum check."""
    user = get_current_user()
    router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
    if not router:
        flash("Brak routera.")
        return redirect(url_for('routers_list'))

    next_url = _safe_next_url()
    b = Backup.query.filter_by(id=backup_id, router_id=router.id, backup_type='binary').first()
    if not b:
        flash("Nie znaleziono backupu binarnego.")
        return redirect(next_url)

    try:
        local_checksum = compute_checksum(b.file_path)
    except OSError as e:
        # The DB row exists but the file is missing or unreadable.
        flash(f"Błąd wgrywania: {e}")
        return redirect(next_url)
    if b.checksum != local_checksum:
        flash("Błąd: suma kontrolna backupu nie zgadza się – plik może być uszkodzony.")
        return redirect(next_url)

    try:
        ssh_upload_backup(router, b.file_path, expected_checksum=b.checksum)
        log_operation(f"Backup {os.path.basename(b.file_path)} wgrany do routera {router.name} at {datetime.utcnow()}")
        flash("Plik backupu wgrany do routera.")
    except Exception as e:
        flash(f"Błąd wgrywania: {e}")
    return redirect(next_url)


@app.route('/backup/delete/<int:backup_id>', methods=['POST'])
@login_required
def delete_backup(backup_id):
    # NOTE(review): this view continues past the end of the reviewed chunk;
    # the visible part is unchanged apart from the restored URL converter.
    user = get_current_user()
    b = Backup.query.get(backup_id)
    if not b or b.router.owner_id != user.id:
        flash("Brak dostępu do backupu.")
        return redirect(url_for('dashboard'))
    try:
        os.remove(b.file_path)
    except FileNotFoundError:
        pass
db.session.delete(b) db.session.commit() flash("Backup usunięty.") #return redirect(url_for('router_details', router_id=b.router_id)) next_url = request.form.get('next') or url_for('dashboard') return redirect(next_url) @app.route('/router/delete/', methods=['POST']) @login_required def delete_router(router_id): user = get_current_user() r = Router.query.filter_by(id=router_id, owner_id=user.id).first() if not r: flash("Brak dostępu do routera.") return redirect(url_for('routers_list')) for b in r.backups: try: os.remove(b.file_path) except FileNotFoundError: pass db.session.delete(b) db.session.delete(r) db.session.commit() log_operation(f"Router {r.name} usunięty at {datetime.utcnow()}") flash("Router usunięty.") return redirect(url_for('routers_list')) @app.route('/routers/all_export', methods=['POST'], endpoint="export_all_routers") @login_required def export_all_routers(): user = get_current_user() routers = Router.query.filter_by(owner_id=user.id).all() messages = [] for r in routers: try: backup_name = f"{r.name}_{r.id}_{datetime.now():%Y%m%d_%H%M%S}" export_data = ssh_export(r) filename = f"{backup_name}.rsc" filepath = os.path.join(DATA_DIR, filename) with open(filepath, 'w', encoding='utf-8') as f: f.write(export_data) b = Backup(router_id=r.id, file_path=filepath, backup_type='export') db.session.add(b) db.session.commit() log_operation(f"Export dla routera {r.name} OK at {datetime.utcnow()}") messages.append(f"{r.name} OK") except Exception as e: log_operation(f"Export dla routera {r.name} FAILED: {e}") messages.append(f"{r.name} FAIL: {e}") flash(" | ".join(messages)) return redirect(url_for('dashboard')) @app.route('/diff_selector', methods=['GET', 'POST']) @login_required def diff_selector(): user = get_current_user() backups = Backup.query.join(Router).filter(Router.owner_id==user.id, Backup.backup_type=='export').order_by(Backup.created_at.desc()).all() if request.method == 'POST': bid1 = request.form.get('backup1') bid2 = 
request.form.get('backup2') if bid1 and bid2: return redirect(url_for('diff_view', backup_id1=bid1, backup_id2=bid2)) else: flash("Wybierz dwa backupy do porównania.") return render_template('diff_selector.html', backups=backups) @app.route('/all_files') @login_required def all_files(): user = get_current_user() query = Backup.query.join(Router).filter(Router.owner_id == user.id) # Filtrowanie – wyszukiwanie po nazwie pliku (zastosowanie filtru "basename") search = request.args.get('search', '') if search: query = query.filter(Backup.file_path.ilike(f"%{search}%")) sort_by = request.args.get('sort_by', 'created_at') order = request.args.get('order', 'desc') if sort_by not in ['created_at', 'file_path']: sort_by = 'created_at' sort_column = getattr(Backup, sort_by) if order == 'asc': query = query.order_by(sort_column.asc()) else: query = query.order_by(sort_column.desc()) files = query.all() total_size = 0 for f in files: full_path = f.file_path if os.path.isabs(f.file_path) else os.path.join(DATA_DIR, f.file_path) try: total_size += os.path.getsize(full_path) except OSError: pass return render_template('all_files.html', files=files, total_size=total_size, search=search, sort_by=sort_by, order=order) @app.route('/view_export/') @login_required def view_export(backup_id): user = get_current_user() b = Backup.query.get(backup_id) if not b or b.router.owner_id != user.id: flash("Brak dostępu do backupu.") return redirect(url_for('all_files')) if b.backup_type != 'export': flash("Wybrany backup nie jest plikiem exportu.") return redirect(url_for('all_files')) try: with open(b.file_path, 'r', encoding='utf-8') as f: content = f.read() except Exception as e: flash("Błąd odczytu pliku: " + str(e)) return redirect(url_for('all_files')) next_url = request.args.get("next") or request.referrer or url_for('all_files') return render_template('view_export.html', backup=b, content=content, next_url=next_url) @app.route('/send_export_email/', methods=['POST']) @login_required def 
send_export_email(backup_id): user = get_current_user() b = Backup.query.get(backup_id) if not b or b.router.owner_id != user.id: flash("Brak dostępu do backupu.") return redirect(url_for('dashboard')) s = get_settings() if not (s.smtp_host and s.smtp_login and s.smtp_password): flash("Nie skonfigurowano ustawień SMTP w panelu.") return redirect(url_for('settings_view')) subject = f"RouterOS Export: {os.path.basename(b.file_path)}" body = f"Przesyłam export {os.path.basename(b.file_path)} z routera {b.router.name}." if send_mail_with_attachment(s.smtp_host, s.smtp_port, s.smtp_login, s.smtp_password, s.smtp_login, subject, body, b.file_path): flash("Wysłano export mailem.") else: flash("Błąd wysyłki mailowej.") #return redirect(url_for('router_details', router_id=b.router_id)) next_url = request.form.get('next') or request.referrer or url_for('dashboard') return redirect(next_url) @app.route('/download/') @login_required def download_file(filename): # Upewnij się, że pobierany plik znajduje się w katalogu DATA_DIR safe_filename = os.path.basename(filename) # zapobiega path traversal safe_path = os.path.join(DATA_DIR, safe_filename) if not os.path.exists(safe_path): flash("Plik nie istnieje.") return redirect(url_for('all_files')) return send_file(safe_path, as_attachment=True) # Nowa podstrona: wybór backupów do diff (alternatywnie) @app.route('/diff/selected', methods=['POST']) @login_required def diff_selected(): bid1 = request.form.get('backup1') bid2 = request.form.get('backup2') if bid1 and bid2: return redirect(url_for('diff_view', backup_id1=bid1, backup_id2=bid2)) else: flash("Musisz wybrać dwa pliki.") return redirect(url_for('diff_selector')) @app.route('/settings', methods=['GET','POST'], endpoint="settings_view") @login_required def settings_view(): user = get_current_user() s = get_settings() if request.method == 'POST': # Usunięto pola backup oraz harmonogram CRON s.global_ssh_key = request.form.get('global_ssh_key', '') s.pushover_token = 
request.form.get('pushover_token', '') s.pushover_userkey = request.form.get('pushover_userkey', '') s.notify_failures_only = bool(request.form.get('notify_failures_only', False)) s.smtp_notifications_enabled = True if request.form.get('smtp_notifications_enabled') == 'on' else False s.smtp_host = request.form.get('smtp_host', '') s.smtp_port = int(request.form.get('smtp_port', '587')) s.smtp_login = request.form.get('smtp_login', '') s.smtp_password = request.form.get('smtp_password', '') s.recipient_email = request.form.get('recipient_email', '') db.session.commit() #reschedule_jobs() # Aktualizacja zadań – zadania dotyczące backupu/CRON zostaną teraz sterowane z /advanced_schedule flash("Zapisano ustawienia.") return redirect(url_for('settings_view')) return render_template('settings.html', settings=s) @app.route('/router//edit', methods=['GET','POST']) @login_required def edit_router(router_id): user = get_current_user() router = Router.query.filter_by(id=router_id, owner_id=user.id).first() if not router: flash("Brak routera.") return redirect(url_for('routers_list')) if request.method == 'POST': # Przytnij wejścia i waliduj nazwę new_name = request.form['name'].strip() if not ALLOWED_NAME_REGEX.match(new_name): flash("Nazwa urządzenia może zawierać wyłącznie litery, cyfry, myślniki (-) oraz podkreślenia (_).") return redirect(url_for('edit_router', router_id=router_id)) router.name = new_name router.host = request.form['host'].strip() router.port = int(request.form.get('port', '22').strip()) router.ssh_user = request.form['ssh_user'].strip() router.ssh_key = request.form['ssh_key'].strip() router.ssh_password = request.form['ssh_password'].strip() db.session.commit() flash("Zapisano zmiany w routerze.") return redirect(url_for('routers_list')) return render_template('edit_router.html', router=router) @app.route('/download_zip', methods=['POST']) @login_required def download_zip(): """ Pobieranie wybranych backupów (lista backup_id) w formie pliku ZIP. 
""" user = get_current_user() backup_ids = request.form.getlist('backup_id') # np. checkboxy "backup_id" # Filtrujemy tylko backupy należące do usera backups = Backup.query.join(Router).filter(Router.owner_id==user.id, Backup.id.in_(backup_ids)).all() if not backups: flash("Nie wybrano żadnych plików lub brak dostępu.") return redirect(url_for('dashboard')) # Tworzymy ZIP w pamięci mem_zip = io.BytesIO() with zipfile.ZipFile(mem_zip, 'w') as zf: for b in backups: arcname = os.path.basename(b.file_path) if not os.path.isfile(b.file_path): continue zf.write(b.file_path, arcname=arcname) mem_zip.seek(0) return send_file(mem_zip, mimetype='application/zip', as_attachment=True, download_name='backup_export.zip') @app.route('/send_by_email/', methods=['POST']) @login_required def send_by_email(backup_id): user = get_current_user() b = Backup.query.get(backup_id) if not b or b.router.owner_id != user.id: flash("Brak dostępu do backupu.") return redirect(url_for('dashboard')) settings = get_settings() if not (settings.smtp_host and settings.smtp_login and settings.smtp_password): flash("Nie skonfigurowano ustawień SMTP w panelu.") return redirect(url_for('settings_view')) subject = f"RouterOS Backup: {os.path.basename(b.file_path)}" body = f"Przesyłam backup {os.path.basename(b.file_path)} z routera {b.router.name}." 
if send_mail_with_attachment(settings.smtp_host, settings.smtp_port, settings.smtp_login, settings.smtp_password, settings.smtp_login, subject, body, b.file_path): flash("Wysłano plik mailem.") else: flash("Błąd wysyłki mailowej.") #return redirect(url_for('router_details', router_id=b.router_id)) next_url = request.form.get('next') or request.referrer or url_for('dashboard') return redirect(next_url) @app.route('/diff//') @login_required def diff_view(backup_id1, backup_id2): user = get_current_user() b1 = Backup.query.get(backup_id1) b2 = Backup.query.get(backup_id2) if not b1 or not b2: flash("Brak backupów.") return redirect(url_for('routers_list')) if b1.router.owner_id != user.id or b2.router.owner_id != user.id: flash("Brak dostępu.") return redirect(url_for('routers_list')) if b1.backup_type != 'export' or b2.backup_type != 'export': flash("Diff można wykonać tylko dla plików 'export'.") return redirect(url_for('router_details', router_id=b1.router_id)) try: with open(b1.file_path, 'r', encoding='utf-8') as f1: file1 = f1.read().splitlines() with open(b2.file_path, 'r', encoding='utf-8') as f2: file2 = f2.read().splitlines() except Exception as e: flash(f"Błąd czytania plików: {e}") return redirect(url_for('router_details', router_id=b1.router_id)) diff_lines = list(difflib.unified_diff( file1, file2, fromfile=os.path.basename(b1.file_path), tofile=os.path.basename(b2.file_path), lineterm='' )) diff_text = "\n".join(diff_lines) return render_template('diff.html', diff_text=diff_text, backup1=b1, backup2=b2) @app.route('/routers/all_backup', methods=['POST']) @login_required def backup_all_routers(): user = get_current_user() routers = Router.query.filter_by(owner_id=user.id).all() messages = [] for r in routers: try: backup_name = f"{r.name}_{r.id}_{datetime.now():%Y%m%d_%H%M%S}" local_path = ssh_backup(r, backup_name) b = Backup(router_id=r.id, file_path=local_path, backup_type='binary') db.session.add(b) db.session.commit() log_operation(f"Backup binarny 
dla routera {r.name} OK at {datetime.utcnow()}") messages.append(f"{r.name} OK") except Exception as e: log_operation(f"Backup binarny dla routera {r.name} FAILED at {datetime.utcnow()}: {e}") messages.append(f"{r.name} FAIL: {e}") flash(" | ".join(messages)) return redirect(url_for('dashboard')) @app.route('/mass_actions', methods=['POST']) @login_required def mass_actions(): user = get_current_user() backup_ids = request.form.getlist('backup_id') if not backup_ids: flash("Nie wybrano żadnych backupów.") return redirect(url_for('all_files')) action = request.form.get('action') if action == 'download': backups = Backup.query.join(Router).filter(Router.owner_id==user.id, Backup.id.in_(backup_ids)).all() if not backups: flash("Brak dostępu lub nie wybrano żadnych backupów.") return redirect(url_for('all_files')) mem_zip = io.BytesIO() with zipfile.ZipFile(mem_zip, 'w') as zf: for b in backups: arcname = os.path.basename(b.file_path) if not os.path.isfile(b.file_path): continue zf.write(b.file_path, arcname=arcname) mem_zip.seek(0) return send_file(mem_zip, mimetype='application/zip', as_attachment=True, download_name='backup_export.zip') elif action == 'delete': backups = Backup.query.join(Router).filter(Router.owner_id==user.id, Backup.id.in_(backup_ids)).all() if not backups: flash("Brak dostępu lub nie wybrano żadnych backupów.") return redirect(url_for('all_files')) for b in backups: try: os.remove(b.file_path) except Exception: pass db.session.delete(b) db.session.commit() flash("Wybrane backupy zostały usunięte.") return redirect(url_for('all_files')) else: flash("Nieprawidłowa akcja.") return redirect(url_for('all_files')) @app.route('/health', methods=['GET']) def healthcheck(): try: db.session.execute(text('SELECT 1')) status = 'ok' except Exception as e: status = 'error' return jsonify({ "status": status, "timestamp": datetime.utcnow().isoformat() + "Z" }) @app.route('/change_password', methods=['GET', 'POST']) @login_required def change_password(): user = 
get_current_user() if request.method == 'POST': current_password = request.form.get('current_password') new_password = request.form.get('new_password') confirm_password = request.form.get('confirm_password') if not user.check_password(current_password): flash("Obecne hasło jest nieprawidłowe.") return redirect(url_for('change_password')) if new_password != confirm_password: flash("Nowe hasło i potwierdzenie nie są zgodne.") return redirect(url_for('change_password')) user.password_hash = pwd_context.hash(new_password) db.session.commit() flash("Hasło zostało zmienione pomyślnie.") return redirect(url_for('dashboard')) return render_template('change_password.html') @app.route('/router//test_connection', methods=['GET']) @login_required def test_connection(router_id): user = get_current_user() router = Router.query.filter_by(id=router_id, owner_id=user.id).first() if not router: flash("Brak dostępu do routera.") return redirect(url_for('routers_list')) try: result = ssh_test_connection(router) except Exception as e: flash(f"Błąd testu połączenia: {e}") return redirect(url_for('routers_list')) if request.args.get("modal") == "1": return render_template("test_connection_modal.html", router=router, result=result) return render_template("test_connection.html", router=router, result=result) @app.route('/logs') @login_required def logs_page(): logs = OperationLog.query.order_by(OperationLog.timestamp.desc()).all() return render_template('logs.html', logs=logs) @app.route('/logs/delete', methods=['POST']) @login_required def delete_old_logs(): try: delete_days = int(request.form.get('delete_days', 0)) except ValueError: flash("Podana wartość jest nieprawidłowa.") return redirect(url_for('logs_page')) if delete_days < 1: flash("Podaj wartość większą lub równą 1.") return redirect(url_for('logs_page')) cutoff_date = datetime.utcnow() - timedelta(days=delete_days) old_logs = OperationLog.query.filter(OperationLog.timestamp < cutoff_date).all() deleted_count = 0 for log in 
old_logs: db.session.delete(log) deleted_count += 1 db.session.commit() flash(f"Usunięto {deleted_count} logów starszych niż {delete_days} dni.") return redirect(url_for('logs_page')) @app.route('/test_email', methods=['POST']) @login_required def test_email(): s = get_settings() # Sprawdzamy, czy ustawienia SMTP są skonfigurowane if not (s.smtp_host and s.smtp_login and s.smtp_password): flash("Brak skonfigurowanych ustawień SMTP.") return redirect(url_for('settings_view')) subject = "Testowy e-mail z RouterOS Backup" body = "To jest testowa wiadomość e-mail wysłana z systemu RouterOS Backup." # Używamy recipient_email, jeśli jest ustawiony, w przeciwnym razie smtp_login to_address = s.recipient_email.strip() if s.recipient_email and s.recipient_email.strip() else s.smtp_login.strip() success = send_mail_with_attachment( smtp_host=s.smtp_host, smtp_port=s.smtp_port, smtp_user=s.smtp_login, smtp_pass=s.smtp_password, to_address=to_address, subject=subject, plain_body=body ) if success: flash("Testowy e-mail został wysłany.") else: flash("Wysyłka testowego e-maila nie powiodła się.") return redirect(url_for('settings_view')) @app.route('/test_pushover', methods=['POST']) @login_required def test_pushover(): s = get_settings() # Sprawdzamy, czy ustawienia Pushover są skonfigurowane if not (s.pushover_token and s.pushover_userkey): flash("Brak skonfigurowanych ustawień Pushover.") return redirect(url_for('settings_view')) message = "Testowe powiadomienie Pushover z systemu RouterOS Backup." success = send_pushover(s.pushover_token, s.pushover_userkey, message) if success: flash("Testowe powiadomienie Pushover zostało wysłane.") else: flash("Wysyłka testowego powiadomienia Pushover nie powiodła się.") return redirect(url_for('settings_view')) if __name__ == '__main__': with app.app_context(): reschedule_jobs() atexit.register(lambda: scheduler.shutdown()) app.run(host='0.0.0.0', port=5581, use_reloader=False, debug=True)