2025-02-28 16:26:27 +01:00

1465 lines
60 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import difflib
import paramiko
import atexit
import io
import zipfile
import requests
import re
import smtplib
import shutil
import socket
import hashlib
from datetime import datetime
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email import encoders
from flask import jsonify
from flask import Flask
from datetime import datetime
from difflib import HtmlDiff
from datetime import datetime, timedelta
from sqlalchemy import text
from flask import (
Flask, render_template, request, redirect,
url_for, session, flash, send_file
)
from flask_sqlalchemy import SQLAlchemy
from passlib.context import CryptContext
#from flask_wtf.csrf import CSRFProtect
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
# Regex for validating device names: letters, digits, underscores and hyphens only.
ALLOWED_NAME_REGEX = re.compile(r'^[A-Za-z0-9_-]+$')
###############################################################################
# Flask configuration
###############################################################################
app = Flask(__name__)
# SECURITY: the session-signing key should not live in source control. It is
# read from the SECRET_KEY environment variable; the previous literal remains
# only as a fallback so existing deployments keep working unchanged.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'super-secret-key')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///backup_routeros.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Recorded once at import time; the dashboard view subtracts it from "now".
app_start_time = datetime.now()
#csrf = CSRFProtect(app)
db = SQLAlchemy(app)
# Directory used to store backup files (./data next to this module).
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
os.makedirs(DATA_DIR, exist_ok=True)
###############################################################################
# Modele bazy danych
###############################################################################
# Shared passlib context (bcrypt scheme) used by User to hash and verify passwords.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class User(db.Model):
    """Application account stored in the ``users`` table."""
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(120), unique=True, nullable=False)
    # Written by set_password(); compared against by check_password().
    password_hash = db.Column(db.String(255), nullable=False)
    def set_password(self, password):
        """Hash ``password`` with ``pwd_context`` and store the result on this user."""
        self.password_hash = pwd_context.hash(password)
    def check_password(self, password):
        """Return the result of verifying ``password`` against the stored hash."""
        return pwd_context.verify(password, self.password_hash)
class Router(db.Model):
    """A router record with the SSH connection settings used by the ssh_* helpers."""
    __tablename__ = 'routers'
    id = db.Column(db.Integer, primary_key=True)
    # Owning user; the router views filter on this column.
    owner_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    name = db.Column(db.String(120), nullable=False)
    host = db.Column(db.String(255), nullable=False)
    port = db.Column(db.Integer, default=22)
    ssh_user = db.Column(db.String(120), default='admin')
    ssh_key = db.Column(db.Text, nullable=True) # private key (key text or a path to a key file)
    # Used for password authentication when neither this router nor the global settings provide a key.
    ssh_password = db.Column(db.String(120), nullable=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Backups taken for this router; the backref exposes ``Backup.router``.
    backups = db.relationship('Backup', backref='router', lazy=True)
class Backup(db.Model):
    """One stored backup file belonging to a router (``backups`` table)."""
    __tablename__ = 'backups'
    id = db.Column(db.Integer, primary_key=True)
    router_id = db.Column(db.Integer, db.ForeignKey('routers.id'), nullable=False)
    file_path = db.Column(db.String(255), nullable=False) # Path to the backup file
    backup_type = db.Column(db.String(50), default='export') # 'export' or 'binary'
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Hex digest from compute_checksum(); the visible code sets it for binary backups only.
    checksum = db.Column(db.String(64), nullable=True)
class OperationLog(db.Model):
    """A timestamped log message written by log_operation()."""
    __tablename__ = 'operation_logs'
    __table_args__ = {'extend_existing': True} # Prevents table redefinition
    id = db.Column(db.Integer, primary_key=True)
    message = db.Column(db.Text, nullable=False)
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
class GlobalSettings(db.Model):
    """Application-wide settings; the code always reads the first row of this table."""
    __tablename__ = 'global_settings'
    id = db.Column(db.Integer, primary_key=True)
    # Age threshold (days) used by cleanup_old_backups(); also the interval of
    # the retention job when no retention_cron is set.
    backup_retention_days = db.Column(db.Integer, default=7)
    # Interval (days) for the auto-backup jobs when no cron expression is set.
    auto_backup_interval_days = db.Column(db.Integer, default=1)
    # Crontab expressions; an empty string means "not set".
    cron_schedule = db.Column(db.String(50), default="")
    retention_cron = db.Column(db.String(50), default="")
    export_cron = db.Column(db.String(50), default="")
    binary_cron = db.Column(db.String(50), default="")
    # When false, schedule_auto_export_job() does not register the export job.
    enable_auto_export = db.Column(db.Boolean, default=False)
    # Fallback SSH key for routers that have no key of their own.
    global_ssh_key = db.Column(db.Text, nullable=True)
    # Pushover credentials; notify() sends a push only when both are set.
    pushover_token = db.Column(db.String(255), nullable=True)
    pushover_userkey = db.Column(db.String(255), nullable=True)
    # When true, notify() skips notifications for successful operations.
    notify_failures_only = db.Column(db.Boolean, default=True)
    # SMTP settings used by notify() when smtp_notifications_enabled is true.
    smtp_host = db.Column(db.String(255), nullable=True)
    smtp_port = db.Column(db.Integer, default=587)
    smtp_login = db.Column(db.String(255), nullable=True)
    smtp_password = db.Column(db.String(255), nullable=True)
    smtp_notifications_enabled = db.Column(db.Boolean, default=False)
    # Age threshold (days) used by cleanup_old_logs().
    log_retention_days = db.Column(db.Integer, default=7)
    # Notification recipient; notify() falls back to smtp_login when this is empty.
    recipient_email = db.Column(db.String(255), nullable=True)
###############################################################################
# Inicjalizacja bazy
###############################################################################
# Create the tables at import time and seed the settings row when none exists.
with app.app_context():
    db.create_all()
    existing_settings = GlobalSettings.query.first()
    if not existing_settings:
        db.session.add(GlobalSettings())
        db.session.commit()
###############################################################################
# Pomocnicze dekoratory i funkcje
###############################################################################
def login_required(func):
    """Decorator that only lets logged-in users reach the wrapped view.

    When the session has no ``user_id`` the user is shown a flash message and
    redirected to the ``login`` endpoint; otherwise the view is called with
    its original arguments.

    Args:
        func: The view function to protect.

    Returns:
        The wrapping function, carrying ``func``'s metadata.
    """
    # Local import: functools is not imported at file level.
    from functools import wraps

    # wraps() copies __name__ (which Flask uses as the endpoint name) as well
    # as __doc__, __module__ and __wrapped__, which the manual copy dropped.
    @wraps(func)
    def wrapper(*args, **kwargs):
        if 'user_id' not in session:
            flash("Musisz być zalogowany, aby uzyskać dostęp.")
            return redirect(url_for('login'))
        return func(*args, **kwargs)
    return wrapper
def get_current_user():
    """Return the User whose id is stored in the session, or None if there is none."""
    if 'user_id' not in session:
        return None
    return User.query.get(session['user_id'])
def get_settings() -> GlobalSettings:
    """Return the first GlobalSettings row, creating and committing one if missing."""
    settings_row = GlobalSettings.query.first()
    if settings_row:
        return settings_row
    settings_row = GlobalSettings()
    db.session.add(settings_row)
    db.session.commit()
    return settings_row
def log_operation(message: str):
    """Store ``message`` as a new OperationLog row and commit the session."""
    db.session.add(OperationLog(message=message))
    db.session.commit()
def load_pkey(ssh_key_str: str):
    """Load a private SSH key given either a key-file path or the key text itself.

    RSA is tried first (the only type previously supported), followed by the
    Ed25519 and ECDSA key classes when the installed paramiko provides them.

    Args:
        ssh_key_str: Path to a private key file, or the private key contents.
            ``None`` is treated like an empty string.

    Returns:
        The loaded paramiko key object.

    Raises:
        ValueError: If the key is empty, or the key text cannot be parsed by
            any supported key class.
        Exception: For a key *file*, the error raised by the RSA loader is
            re-raised unchanged when no key class can load the file.
    """
    # Guard against None so callers get the documented ValueError instead of
    # an AttributeError from ``None.strip()``.
    key_str = (ssh_key_str or "").strip()
    if not key_str:
        raise ValueError("SSH key is empty. Upewnij się, że podałeś poprawny klucz SSH (globalny lub indywidualny).")
    key_classes = [paramiko.RSAKey]
    for class_name in ("Ed25519Key", "ECDSAKey"):
        key_class = getattr(paramiko, class_name, None)
        if key_class is not None:
            key_classes.append(key_class)
    from_file = os.path.isfile(key_str)
    first_error = None
    for key_class in key_classes:
        try:
            if from_file:
                return key_class.from_private_key_file(key_str)
            # A fresh buffer per attempt: a failed parse may have consumed it.
            return key_class.from_private_key(io.StringIO(key_str))
        except Exception as e:
            # Remember the RSA error: it is what callers saw before.
            if first_error is None:
                first_error = e
    if from_file:
        raise first_error
    raise ValueError("Nie udało się załadować klucza SSH. Sprawdź, czy klucz jest poprawny i nie jest zaszyfrowany") from first_error
def compute_checksum(file_path):
    """Return the SHA-256 hex digest of the file at ``file_path``, read in 4096-byte blocks."""
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        while True:
            block = stream.read(4096)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
###############################################################################
# Funkcje SSH
###############################################################################
def ssh_export(router: Router) -> str:
    """Run ``/export`` on the router over SSH and return its output as text.

    Authentication uses the router's own SSH key when set, otherwise the
    global SSH key from the settings, otherwise the router's password.

    Args:
        router: Router record providing host, port, user and credentials.

    Returns:
        The decoded standard output of the ``/export`` command.

    Raises:
        Exception: Any error from loading the key, connecting or running the
            command is propagated to the caller.
    """
    print(f"[DEBUG] ssh_export -> router id={router.id}, host={router.host}")
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # Key selection: the router's own key if given, otherwise the global key.
        key_source = router.ssh_key if router.ssh_key and router.ssh_key.strip() else get_settings().global_ssh_key
        if key_source and key_source.strip():
            try:
                pkey = load_pkey(key_source)
                client.connect(router.host, port=router.port, username=router.ssh_user, pkey=pkey, timeout=10)
            except Exception as e:
                print("[DEBUG] ssh_export -> błąd przy load_pkey/connect:", e)
                raise
        else:
            client.connect(router.host, port=router.port, username=router.ssh_user, password=router.ssh_password, timeout=10, allow_agent=False, look_for_keys=False, banner_timeout=10)
        _, stdout, _ = client.exec_command('/export')
        output = stdout.read().decode('utf-8', errors='ignore')
    finally:
        # Always release the connection, including when connect/exec fails.
        client.close()
    print(f"[DEBUG] ssh_export -> output length={len(output)}")
    return output
def ssh_backup(router: Router, backup_name: str) -> str:
    """Create a binary backup on the router and download it into DATA_DIR.

    Runs ``/system backup save`` over SSH, waits for the command to finish,
    fetches ``<backup_name>.backup`` via SFTP and then tries to delete the
    remote copy.

    Args:
        router: Router record providing host, port, user and credentials.
        backup_name: Base name (without extension) for the backup file.

    Returns:
        Local path of the downloaded ``.backup`` file.

    Raises:
        Exception: Errors from key loading, connecting, running the command
            or downloading the file are propagated. A failure to delete the
            remote file is only printed.
    """
    print(f"[DEBUG] ssh_backup -> router id={router.id}, backup_name={backup_name}")
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # Key selection: the router's own key if given, otherwise the global key.
        key_source = router.ssh_key if router.ssh_key and router.ssh_key.strip() else get_settings().global_ssh_key
        if key_source and key_source.strip():
            try:
                pkey = load_pkey(key_source)
                client.connect(router.host, port=router.port, username=router.ssh_user, pkey=pkey, timeout=10)
            except Exception as e:
                print("[DEBUG] ssh_backup -> błąd przy load_pkey/connect:", e)
                raise
        else:
            client.connect(router.host, port=router.port, username=router.ssh_user, password=router.ssh_password, timeout=10, allow_agent=False, look_for_keys=False, banner_timeout=10)
        command = f"/system backup save name={backup_name}"
        _, stdout, _ = client.exec_command(command)
        # Block until the router has finished writing the backup file.
        stdout.channel.recv_exit_status()
        sftp = client.open_sftp()
        try:
            remote_file = f"{backup_name}.backup"
            local_path = os.path.join(DATA_DIR, f"{backup_name}.backup")
            sftp.get(remote_file, local_path)
            # Remove the remote backup file so it does not fill up the router's storage.
            try:
                sftp.remove(remote_file)
                print(f"[DEBUG] ssh_backup -> usunięto plik {remote_file} z routera")
            except Exception as e:
                print(f"[DEBUG] ssh_backup -> błąd przy usuwaniu pliku {remote_file} z routera: {e}")
        finally:
            sftp.close()
    finally:
        # Always release the connection, including when a step above fails.
        client.close()
    print(f"[DEBUG] ssh_backup -> local_path={local_path}")
    return local_path
def ssh_upload_backup(router: Router, local_backup_path: str, expected_checksum: str = None):
    """Upload a local backup file to the router over SFTP.

    Args:
        router: Router record providing host, port, user and credentials.
        local_backup_path: Path of the file to upload; it is stored on the
            router under its base name.
        expected_checksum: Optional checksum; when given, the file is hashed
            with compute_checksum() and must match before any connection is
            made.

    Raises:
        ValueError: If ``expected_checksum`` is given and does not match.
        Exception: Errors from key loading, connecting or the transfer are
            propagated to the caller.
    """
    # Verify the checksum first, if one was supplied.
    if expected_checksum:
        local_checksum = compute_checksum(local_backup_path)
        if local_checksum != expected_checksum:
            raise ValueError("Suma kontrolna pliku nie zgadza się plik może być uszkodzony.")
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        # Key selection: the router's own key or the global one.
        key_source = router.ssh_key if router.ssh_key and router.ssh_key.strip() else get_settings().global_ssh_key
        if key_source and key_source.strip():
            try:
                pkey = load_pkey(key_source)
                client.connect(router.host, port=router.port, username=router.ssh_user, pkey=pkey, timeout=10)
            except Exception as e:
                print(f"[DEBUG] ssh_upload_backup -> błąd przy łączeniu z kluczem SSH: {e}")
                raise
        else:
            client.connect(router.host, port=router.port, username=router.ssh_user,
                           password=router.ssh_password, timeout=10,
                           allow_agent=False, look_for_keys=False, banner_timeout=10)
        # Open an SFTP session, transfer the file, then close everything.
        sftp = client.open_sftp()
        try:
            remote_file = os.path.basename(local_backup_path)
            sftp.put(local_backup_path, remote_file)
        finally:
            sftp.close()
    finally:
        # Always release the connection, including when the transfer fails.
        client.close()
    print(f"[DEBUG] ssh_upload_backup -> przesłano {local_backup_path} do routera")
def ssh_test_connection(router: Router) -> dict:
    """Test the connection to the router and return its model, uptime and hostname.

    Runs ``/system resource print without-paging`` and
    ``/system identity print`` and extracts values from ``key: value`` lines.

    Args:
        router: Router record providing host, port, user and credentials.

    Returns:
        Dict with keys ``model``, ``uptime`` and ``hostname``; a value stays
        ``"Nieznany"`` when the corresponding line is not found.

    Raises:
        Exception: Errors from key loading, connecting or running the
            commands are propagated to the caller.
    """
    client = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        key_source = router.ssh_key if router.ssh_key and router.ssh_key.strip() else get_settings().global_ssh_key
        if key_source and key_source.strip():
            pkey = load_pkey(key_source)
            client.connect(router.host, port=router.port, username=router.ssh_user, pkey=pkey, timeout=10)
        else:
            client.connect(router.host, port=router.port, username=router.ssh_user, password=router.ssh_password, timeout=10, allow_agent=False, look_for_keys=False)
        _, stdout, _ = client.exec_command('/system resource print without-paging')
        resource_output = stdout.read().decode('utf-8', errors='ignore')
        _, stdout, _ = client.exec_command('/system identity print')
        identity_output = stdout.read().decode('utf-8', errors='ignore')
    finally:
        # Always release the connection, including when connect/exec fails.
        client.close()
    model = "Nieznany"
    uptime = "Nieznany"
    hostname = "Nieznany"
    for line in resource_output.splitlines():
        # Lines without a colon carry no value; skipping them avoids the
        # IndexError that ``split(":", 1)[1]`` raised on such lines.
        _, separator, value = line.partition(":")
        if not separator:
            continue
        if "board-name" in line:
            model = value.strip()
        if "uptime" in line:
            uptime = value.strip()
    for line in identity_output.splitlines():
        _, separator, value = line.partition(":")
        if separator and "name" in line:
            hostname = value.strip()
    return {"model": model, "uptime": uptime, "hostname": hostname}
def get_valid_cron_trigger(cron_expr, default_expr):
    """Build a CronTrigger from ``cron_expr``; on ValueError use ``default_expr`` instead."""
    try:
        return CronTrigger.from_crontab(cron_expr)
    except ValueError as e:
        print(f"[DEBUG] Invalid cron expression: {cron_expr}. Error: {e}. Using default: {default_expr}")
    return CronTrigger.from_crontab(default_expr)
def get_email_template(subject, message):
    """Return the HTML e-mail body with ``subject`` as heading and ``message`` as content.

    The values are inserted as given (no HTML escaping is performed).
    """
    # (selector, declarations) pairs rendered into the <style> element.
    css_rules = (
        ("body", (
            "font-family: Arial, sans-serif;",
            "background-color: #f4f4f4;",
            "margin: 0;",
            "padding: 0;",
        )),
        (".container", (
            "max-width: 600px;",
            "margin: 20px auto;",
            "background-color: #ffffff;",
            "padding: 20px;",
            "border-radius: 5px;",
            "box-shadow: 0 0 10px rgba(0,0,0,0.1);",
        )),
        (".header", (
            "background-color: #007bff;",
            "color: #ffffff;",
            "padding: 10px;",
            "text-align: center;",
            "border-radius: 5px 5px 0 0;",
        )),
        (".content", (
            "margin: 20px 0;",
            "font-size: 16px;",
            "line-height: 1.5;",
        )),
        (".footer", (
            "text-align: center;",
            "font-size: 12px;",
            "color: #777777;",
            "border-top: 1px solid #dddddd;",
            "padding-top: 10px;",
        )),
    )
    lines = ["<html>", "<head>", "<style>"]
    for selector, declarations in css_rules:
        lines.append(selector + " {")
        lines.extend(declarations)
        lines.append("}")
    lines.extend([
        "</style>",
        "</head>",
        "<body>",
        '<div class="container">',
        '<div class="header">',
        f"<h2>{subject}</h2>",
        "</div>",
        '<div class="content">',
        f"<p>{message}</p>",
        "</div>",
        '<div class="footer">',
        "<p>Wiadomość wygenerowana automatycznie przez system RouterOS Backup.</p>",
        "</div>",
        "</div>",
        "</body>",
        "</html>",
    ])
    # The template starts with a blank line and ends with a trailing newline.
    return "\n" + "\n".join(lines) + "\n"
###############################################################################
# Powiadomienia
###############################################################################
def send_pushover(token, userkey, message, title="RouterOS Backup"):
    """Send a Pushover notification.

    Args:
        token: Pushover application token.
        userkey: Pushover user key.
        message: Notification text.
        title: Notification title.

    Returns:
        True when the API answered with HTTP 200, False otherwise (including
        on any exception, which is printed rather than raised).
    """
    try:
        resp = requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": token,
                "user": userkey,
                "message": message,
                "title": title
            },
            # Without a timeout an unresponsive endpoint would block the
            # caller (a request handler or a scheduler job) indefinitely.
            timeout=10,
        )
        return resp.status_code == 200
    except Exception as e:
        print("Pushover error:", e)
        return False
def send_mail_with_attachment(smtp_host, smtp_port, smtp_user, smtp_pass, to_address, subject, plain_body, attachment_path="", html_body=None):
    """Send an e-mail with plain-text and HTML bodies and an optional attachment.

    Port 465 uses an implicit-TLS connection; port 587 upgrades with
    STARTTLS; any other port is used without TLS.

    Args:
        smtp_host: SMTP server host name.
        smtp_port: SMTP server port.
        smtp_user: Login name, also used as the ``From`` address.
        smtp_pass: Login password.
        to_address: Recipient address.
        subject: Message subject.
        plain_body: Plain-text body.
        attachment_path: Optional path of a file to attach; ignored when it
            is empty or not an existing file.
        html_body: Optional HTML body; when None it is generated from
            ``subject`` and ``plain_body`` with get_email_template().

    Returns:
        True when the message was handed to the server, False when SMTP is
        not configured or any error occurred (the error is printed).
    """
    if not (smtp_host and smtp_host.strip() and smtp_user and smtp_user.strip() and smtp_pass and smtp_pass.strip()):
        print("SMTP not properly configured, skipping email sending.")
        return False
    try:
        if html_body is None:
            html_body = get_email_template(subject, plain_body)
        # Plain text and HTML are alternative renderings of the same content.
        body = MIMEMultipart("alternative")
        body.attach(MIMEText(plain_body, "plain"))
        body.attach(MIMEText(html_body, "html"))
        if attachment_path and os.path.isfile(attachment_path):
            # An attachment is not an alternative rendering, so it must sit
            # next to the body inside multipart/mixed, not inside
            # multipart/alternative.
            msg = MIMEMultipart("mixed")
            msg.attach(body)
            part = MIMEBase("application", "octet-stream")
            with open(attachment_path, "rb") as attachment:
                part.set_payload(attachment.read())
            encoders.encode_base64(part)
            # Passing filename as a parameter lets the email package quote it.
            part.add_header("Content-Disposition", "attachment", filename=os.path.basename(attachment_path))
            msg.attach(part)
        else:
            msg = body
        msg["From"] = smtp_user
        msg["To"] = to_address
        msg["Subject"] = subject
        use_implicit_tls = smtp_port == 465
        # The timeout keeps an unresponsive server from blocking the caller forever.
        if use_implicit_tls:
            server = smtplib.SMTP_SSL(smtp_host, smtp_port, timeout=30)
        else:
            server = smtplib.SMTP(smtp_host, smtp_port, timeout=30)
        # The context manager closes the connection even when login/send fails.
        with server:
            if not use_implicit_tls:
                server.ehlo()
                if smtp_port == 587:
                    server.starttls()
            server.login(smtp_user, smtp_pass)
            server.send_message(msg)
        return True
    except Exception as e:
        print("Mail error_send_mail_with_attachment:", e)
        return False
def send_mail(smtp_host, smtp_port, smtp_user, smtp_pass, to_address, subject, body):
    """Send a plain-text e-mail over SMTP with STARTTLS.

    Returns True on success, False when SMTP is not configured or an error
    occurred (the error is printed).
    """
    if not all((smtp_host, smtp_user, smtp_pass)):
        print("SMTP not configured, skipping email")
        return False
    try:
        message = MIMEMultipart()
        message["From"] = smtp_user
        message["To"] = to_address
        message["Subject"] = subject
        message.attach(MIMEText(body, "plain"))
        with smtplib.SMTP(smtp_host, smtp_port) as connection:
            connection.starttls()
            connection.login(smtp_user, smtp_pass)
            connection.send_message(message)
    except Exception as e:
        print("Mail error_send_mail:", e)
        return False
    return True
def notify(settings: GlobalSettings, message: str, success: bool):
    """Send ``message`` through the notification channels configured in ``settings``.

    Nothing is sent for a successful operation when ``notify_failures_only``
    is set. Pushover is used when both of its credentials are present; e-mail
    is used only when SMTP notifications are enabled and fully configured.
    """
    if settings.notify_failures_only and success:
        return
    if settings.pushover_token and settings.pushover_userkey:
        send_pushover(settings.pushover_token, settings.pushover_userkey, message)
    # E-mail is sent only when SMTP notifications are switched on.
    if not settings.smtp_notifications_enabled:
        return
    smtp_fields = (settings.smtp_host, settings.smtp_login, settings.smtp_password)
    if not all(value and value.strip() for value in smtp_fields):
        print("SMTP configuration is incomplete. Skipping email notification.")
        return
    try:
        recipient = settings.recipient_email
        if recipient and recipient.strip():
            to_address = recipient.strip()
        else:
            # No explicit recipient: send to the SMTP login address.
            to_address = settings.smtp_login.strip()
        send_mail_with_attachment(
            smtp_host=settings.smtp_host.strip(),
            smtp_port=settings.smtp_port,
            smtp_user=settings.smtp_login.strip(),
            smtp_pass=settings.smtp_password.strip(),
            to_address=to_address,
            subject="RouterOS Backup Notification",
            plain_body=message
        )
    except Exception as e:
        print("SMTP send error:", e)
###############################################################################
# Zadania cykliczne
###############################################################################
def cleanup_old_backups():
    """Delete backups older than the retention threshold and log how many were removed.

    Both the file on disk and the database row are removed; a file that is
    already missing still counts as removed.
    """
    with app.app_context():
        settings = get_settings()
        threshold = datetime.utcnow() - timedelta(days=settings.backup_retention_days)
        expired = Backup.query.filter(Backup.created_at < threshold).all()
        removed = 0
        for backup in expired:
            try:
                os.remove(backup.file_path)
            except FileNotFoundError:
                pass
            removed += 1
            db.session.delete(backup)
        db.session.commit()
        log_operation(f"Automatyczna retencja wykonana at {datetime.utcnow()}: usunięto {removed} backupów (próg: {settings.backup_retention_days} dni).")
def scheduled_auto_backup():
    """Export the configuration of every router to a ``.rsc`` file in DATA_DIR.

    Each router is handled independently: a failure is reported through
    notify() and log_operation() and the loop continues with the next router.
    """
    with app.app_context():
        settings = get_settings()
        for router in Router.query.all():
            try:
                stamped_name = f"{router.name}_{router.id}_{datetime.now():%Y%m%d_%H%M%S}"
                exported_text = ssh_export(router)
                target_path = os.path.join(DATA_DIR, f"{stamped_name}.rsc")
                with open(target_path, 'w', encoding='utf-8') as out_file:
                    out_file.write(exported_text)
                db.session.add(Backup(router_id=router.id, file_path=target_path, backup_type='export'))
                db.session.commit()
                notify(settings, f"Auto-export dla routera {router.name} OK", True)
                log_operation(f"Automatyczny export dla routera {router.name} wykonany pomyślnie at {datetime.utcnow()}.")
            except Exception as e:
                notify(settings, f"Auto-export dla routera {router.name} FAILED: {e}", False)
                log_operation(f"Automatyczny export dla routera {router.name} FAILED at {datetime.utcnow()}: {e}")
def scheduled_auto_binary_backup():
    """Take a binary backup of every router and record it with its checksum.

    Each router is handled independently: a failure is reported through
    notify() and log_operation() and the loop continues with the next router.
    """
    with app.app_context():
        settings = get_settings()
        for router in Router.query.all():
            try:
                stamped_name = f"{router.name}_{router.id}_{datetime.now():%Y%m%d_%H%M%S}"
                downloaded_path = ssh_backup(router, stamped_name)
                record = Backup(
                    router_id=router.id,
                    file_path=downloaded_path,
                    backup_type='binary',
                    checksum=compute_checksum(downloaded_path),
                )
                db.session.add(record)
                db.session.commit()
                notify(settings, f"Auto-binary backup dla routera {router.name} OK", True)
                log_operation(f"Auto-binary backup dla routera {router.name} wykonany pomyślnie at {datetime.utcnow()}.")
            except Exception as e:
                notify(settings, f"Auto-binary backup dla routera {router.name} FAILED: {e}", False)
                log_operation(f"Auto-binary backup dla routera {router.name} FAILED at {datetime.utcnow()}: {e}")
def schedule_auto_binary_backup_job():
    """(Re)register the binary backup job, by cron when ``binary_cron`` is set, else by interval."""
    settings = get_settings()
    job_id = "auto_binary_backup_job"
    try:
        scheduler.remove_job(job_id)
    except Exception:
        # The job may simply not exist yet.
        pass
    if not settings.binary_cron:
        scheduler.add_job(func=scheduled_auto_binary_backup, trigger='interval', days=settings.auto_backup_interval_days, id=job_id, replace_existing=True)
        print(f"[DEBUG] schedule_auto_binary_backup_job -> interval days={settings.auto_backup_interval_days}")
        return
    cron_trigger = get_valid_cron_trigger(settings.binary_cron, "15 2 * * *")
    scheduler.add_job(func=scheduled_auto_binary_backup, trigger=cron_trigger, id=job_id, replace_existing=True)
    print(f"[DEBUG] schedule_auto_binary_backup_job -> cron_schedule={settings.binary_cron}")
def schedule_auto_backup_job():
    """Register the auto-backup job, by cron when ``cron_schedule`` is set, else by interval."""
    settings = get_settings()
    job_id = "auto_backup_job"
    if not settings.cron_schedule:
        scheduler.add_job(func=scheduled_auto_backup, trigger='interval', days=settings.auto_backup_interval_days, id=job_id, replace_existing=True)
        print(f"[DEBUG] schedule_auto_backup_job -> interval days={settings.auto_backup_interval_days}")
        return
    cron_trigger = get_valid_cron_trigger(settings.cron_schedule, "0 */12 * * *")
    scheduler.add_job(func=scheduled_auto_backup, trigger=cron_trigger, id=job_id, replace_existing=True)
    print(f"[DEBUG] schedule_auto_backup_job -> cron_schedule={settings.cron_schedule}")
def schedule_retention_job():
    """(Re)register the backup retention job.

    Uses ``retention_cron`` when it is set and valid; otherwise the job runs
    on an interval of ``backup_retention_days`` days. An invalid cron
    expression no longer raises out of this function (which previously broke
    the settings view that triggers rescheduling); it is reported and the
    interval schedule is used instead.
    """
    s = get_settings()
    try:
        scheduler.remove_job("retention_job")
    except Exception:
        # The job may simply not exist yet.
        pass
    trigger = None
    if s.retention_cron:
        try:
            trigger = CronTrigger.from_crontab(s.retention_cron)
        except ValueError as e:
            print(f"[DEBUG] schedule_retention_job -> invalid cron expression: {s.retention_cron}. Error: {e}. Using interval instead.")
    if trigger is not None:
        scheduler.add_job(func=cleanup_old_backups, trigger=trigger, id="retention_job", replace_existing=True)
        print(f"[DEBUG] schedule_retention_job -> cron_schedule={s.retention_cron}")
    else:
        scheduler.add_job(func=cleanup_old_backups, trigger='interval', days=s.backup_retention_days, id="retention_job", replace_existing=True)
        print(f"[DEBUG] schedule_retention_job -> interval days={s.backup_retention_days}")
def schedule_auto_export_job():
    """(Re)register the auto-export job, or leave it removed when auto export is disabled.

    ``export_cron`` is used when set; otherwise the job runs on the default
    ``0 */12 * * *`` schedule.
    """
    settings = get_settings()
    job_id = "auto_export_job"
    fallback_cron = "0 */12 * * *"
    try:
        scheduler.remove_job(job_id)
    except Exception:
        # The job may simply not exist yet.
        pass
    if not settings.enable_auto_export:
        print("[DEBUG] schedule_auto_export_job -> auto export disabled")
        return
    if settings.export_cron:
        cron_trigger = get_valid_cron_trigger(settings.export_cron, fallback_cron)
        cron_used = settings.export_cron
    else:
        cron_trigger = CronTrigger.from_crontab(fallback_cron)
        cron_used = fallback_cron
    scheduler.add_job(func=scheduled_auto_backup, trigger=cron_trigger, id=job_id, replace_existing=True)
    print(f"[DEBUG] schedule_auto_export_job -> cron_schedule={cron_used}")
def cleanup_old_logs():
    """Delete operation logs older than ``log_retention_days`` and log the number removed."""
    with app.app_context():
        settings = get_settings()
        threshold = datetime.utcnow() - timedelta(days=settings.log_retention_days)
        stale_entries = OperationLog.query.filter(OperationLog.timestamp < threshold).all()
        for entry in stale_entries:
            db.session.delete(entry)
        removed = len(stale_entries)
        db.session.commit()
        log_operation(f"Automatyczna retencja logów: usunięto {removed} logów starszych niż {settings.log_retention_days} dni.")
###############################################################################
# Konfiguracja APScheduler - harmonogram zadań
###############################################################################
# Module-level background scheduler shared by all schedule_* helpers.
scheduler = BackgroundScheduler()
# Register two recurring jobs:
# 1) Cleanup of old backups (default: every 1 day)
# 2) Auto-backup (default: every 1 day)
# Each gets a unique ID, which makes re-scheduling easier.
scheduler.add_job(func=cleanup_old_backups, trigger='interval', days=1, id="cleanup_job")
scheduler.add_job(func=scheduled_auto_backup, trigger='interval', days=1, id="auto_backup_job")
scheduler.start()
# Shut the scheduler down when the interpreter exits.
atexit.register(lambda: scheduler.shutdown())
def reschedule_jobs():
    """Drop and re-create the export, retention and binary backup jobs from current settings.

    Also (re)registers the daily log cleanup job.
    """
    # Called for its side effect only: it creates the settings row if missing.
    get_settings()
    for job_id in ("auto_export_job", "retention_job", "auto_binary_backup_job"):
        try:
            scheduler.remove_job(job_id)
        except Exception:
            # The job may simply not exist yet.
            pass
    schedule_auto_export_job()
    schedule_retention_job()
    schedule_auto_binary_backup_job()
    scheduler.add_job(func=cleanup_old_logs, trigger='interval', days=1, id="cleanup_logs_job", replace_existing=True)
###############################################################################
# Filtr Jinja2 - basename
###############################################################################
@app.template_filter('basename')
def basename_filter(path):
    """Jinja filter: return the final component of ``path``; falsy values pass through unchanged."""
    if not path:
        return path
    return os.path.basename(path)
@app.template_filter('filesize')
def filesize_filter(path):
    """Jinja filter: format a size as a human-readable string such as ``1.50 KB``.

    A number is taken as a size in bytes. A string is taken as a file path
    (relative paths are resolved against DATA_DIR) and the file's size is
    used. Falsy values, other types and missing files give ``"0 B"``.
    """
    if not path or not isinstance(path, (str, int, float)):
        return "0 B"
    if isinstance(path, str):
        full_path = path if os.path.isabs(path) else os.path.join(DATA_DIR, path)
        if not os.path.exists(full_path):
            return "0 B"
        size = os.path.getsize(full_path)
    else:
        # A numeric value is already a size in bytes.
        size = path
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        if size < 1024:
            return f"{size:.2f} {unit}"
        size /= 1024
    return f"{size:.2f} PB"
@app.template_filter('resolve_dns')
def resolve_dns_filter(host):
    """Jinja filter: return the reverse-DNS host name for ``host``, or ``host`` itself on any error."""
    try:
        hostname, _aliases, _addresses = socket.gethostbyaddr(host)
    except Exception:
        return host
    return hostname
def get_data_folder_size():
    """Return the combined size in bytes of all files under DATA_DIR.

    Files whose size cannot be read are skipped.
    """
    size_sum = 0
    for root, _dirs, files in os.walk(DATA_DIR):
        for name in files:
            try:
                size_sum += os.path.getsize(os.path.join(root, name))
            except OSError:
                continue
    return size_sum
@app.template_global()
def bootstrap_alert_category(cat):
    """Template global: map a flash category to a Bootstrap alert class name.

    Matching is case-insensitive; unknown categories map to ``"info"``.
    """
    normalized = cat.lower()
    if normalized in ("error", "fail"):
        return "danger"
    if normalized in ("warn", "warning"):
        return "warning"
    if normalized in ("ok", "success"):
        return "success"
    return "info"
###############################################################################
# ROUTES
###############################################################################
@app.route('/')
def index():
    """Landing page: logged-in users are redirected to the dashboard."""
    if not get_current_user():
        return render_template('index.html')
    return redirect(url_for('dashboard'))
# Global variable holding the application start time.
# NOTE(review): app_start_time is already assigned near the top of this file
# (next to the Flask configuration); this statement overwrites that value.
app_start_time = datetime.now()
def get_data_folder_size():
    """Return the combined size in bytes of all files under DATA_DIR.

    Files whose size cannot be read count as zero.

    NOTE(review): a function with this name is also defined earlier in this
    file; this later definition is the one in effect after import.
    """
    def size_or_zero(file_path):
        try:
            return os.path.getsize(file_path)
        except OSError:
            return 0

    return sum(
        size_or_zero(os.path.join(folder, file_name))
        for folder, _subfolders, file_names in os.walk(DATA_DIR)
        for file_name in file_names
    )
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard with backup counts, recent logs, disk usage and uptime."""
    user = get_current_user()

    def count_backups(kind):
        # Number of the current user's backups of the given type.
        return db.session.query(Backup).join(Router).filter(
            Router.owner_id == user.id, Backup.backup_type == kind
        ).count()

    context = {'user': user}
    context['routers_count'] = Router.query.filter_by(owner_id=user.id).count()
    context['export_count'] = count_backups('export')
    context['binary_count'] = count_backups('binary')
    context['total_backups'] = context['export_count'] + context['binary_count']
    context['logs'] = OperationLog.query.order_by(OperationLog.timestamp.desc()).limit(10).all()
    disk = shutil.disk_usage(DATA_DIR)
    folder_used = get_data_folder_size()
    context['uptime'] = datetime.now() - app_start_time
    # Share of the whole disk taken by the data folder.
    if disk.total > 0:
        context['disk_usage_percent'] = (folder_used / disk.total) * 100
    else:
        context['disk_usage_percent'] = 0
    context['disk_total'] = disk.total
    context['disk_used'] = folder_used
    context['disk_free'] = disk.free
    context['settings'] = get_settings()
    context['success_ops'] = OperationLog.query.filter(OperationLog.message.ilike("%OK%")).count()
    context['failure_ops'] = OperationLog.query.filter(OperationLog.message.ilike("%FAILED%")).count()
    context['current_time'] = datetime.now().astimezone()
    return render_template('dashboard.html', **context)
@app.route('/advanced_schedule', methods=['GET', 'POST'])
@login_required
def advanced_schedule():
    """Show and save the scheduling settings.

    On POST the cron expressions, retention periods and the auto-export flag
    are stored and the scheduler jobs are rebuilt. Retention periods that are
    not whole numbers are rejected with a flash message instead of raising
    ValueError (which produced an HTTP 500).
    """
    s = get_settings()
    if request.method == 'POST':
        # Parse the numbers first so invalid input leaves the settings untouched.
        try:
            backup_retention_days = int(request.form.get('backup_retention_days', s.backup_retention_days))
            log_retention_days = int(request.form.get('log_retention_days', s.log_retention_days))
        except (TypeError, ValueError):
            flash("Liczba dni retencji musi być liczbą całkowitą.")
            return redirect(url_for('advanced_schedule'))
        s.retention_cron = request.form.get('retention_cron', '').strip()
        s.binary_cron = request.form.get('binary_cron', '').strip()
        s.export_cron = request.form.get('export_cron', '').strip()
        s.backup_retention_days = backup_retention_days
        s.log_retention_days = log_retention_days
        s.enable_auto_export = request.form.get('enable_auto_export') == 'on'
        db.session.commit()
        reschedule_jobs()  # Update the job schedule
        flash("Ustawienia harmonogramu zostały zapisane.")
        return redirect(url_for('advanced_schedule'))
    return render_template('advanced_schedule.html', settings=s)
@app.route('/toggle_dark_mode')
def toggle_dark_mode():
    """Flip the session's ``dark_mode`` flag (treated as on when unset) and go back."""
    session['dark_mode'] = not session.get('dark_mode', True)
    target = request.referrer or url_for('index')
    return redirect(target)
@app.route('/register', methods=['GET','POST'])
def register():
    """Show the registration form and create a new user on POST.

    An existing username is rejected with a flash message. The password is
    hashed through ``User.set_password`` so it uses the same ``pwd_context``
    that ``User.check_password`` verifies against at login.
    """
    if request.method=='POST':
        username = request.form['username']
        password = request.form['password']
        if User.query.filter_by(username=username).first():
            flash("Użytkownik już istnieje")
            return redirect(url_for('register'))
        # The previous code called ``bcrypt.hash``, but no name ``bcrypt`` is
        # imported in this file, so every registration raised NameError.
        u = User(username=username)
        u.set_password(password)
        db.session.add(u)
        db.session.commit()
        flash("Zarejestrowano, możesz się zalogować.")
        return redirect(url_for('login'))
    return render_template('register.html')
@app.route('/login', methods=['GET','POST'])
def login():
    """Show the login form; on POST verify the credentials and start a session."""
    if request.method != 'POST':
        return render_template('login.html')
    username = request.form['username']
    password = request.form['password']
    account = User.query.filter_by(username=username).first()
    if not (account and account.check_password(password)):
        flash("Nieprawidłowe dane logowania.")
        return redirect(url_for('login'))
    session['user_id'] = account.id
    flash("Zalogowano pomyślnie.")
    return redirect(url_for('dashboard'))
@app.route('/logout')
def logout():
    """Clear the whole session, flash a confirmation and return to the landing page."""
    session.clear()
    flash("Wylogowano.")
    landing_url = url_for('index')
    return redirect(landing_url)
@app.route('/routers')
@login_required
def routers_list():
    """List the current user's routers, newest first."""
    user = get_current_user()
    owned = Router.query.filter_by(owner_id=user.id)
    routers = owned.order_by(Router.created_at.desc()).all()
    return render_template('routers.html', user=user, routers=routers)
@app.route('/routers/add', methods=['GET','POST'])
@login_required
def add_router():
    """Show the add-router form and create a router for the current user on POST.

    The name must match ALLOWED_NAME_REGEX. The port must be a whole number
    between 1 and 65535 (an empty value means 22); invalid ports are rejected
    with a flash message instead of raising ValueError (HTTP 500).
    """
    if request.method=='POST':
        user = get_current_user()
        name = request.form['name'].strip()
        # Name validation: letters, digits, - and _ only.
        if not ALLOWED_NAME_REGEX.match(name):
            flash("Nazwa urządzenia może zawierać wyłącznie litery, cyfry, myślniki (-) oraz podkreślenia (_).")
            return redirect(url_for('add_router'))
        host = request.form['host'].strip()
        port_raw = request.form.get('port','22').strip() or '22'
        try:
            port = int(port_raw)
        except ValueError:
            port = None
        if port is None or not 1 <= port <= 65535:
            flash("Port musi być liczbą całkowitą z zakresu 1-65535.")
            return redirect(url_for('add_router'))
        ssh_user = request.form['ssh_user'].strip()
        ssh_key = request.form['ssh_key'].strip()
        ssh_password = request.form['ssh_password'].strip()
        r = Router(owner_id=user.id, name=name, host=host, port=port,
                   ssh_user=ssh_user, ssh_key=ssh_key, ssh_password=ssh_password)
        db.session.add(r)
        db.session.commit()
        flash("Dodano router.")
        return redirect(url_for('routers_list'))
    return render_template('add_router.html')
@app.route('/router/<int:router_id>')
@login_required
def router_details(router_id):
    """Show one of the current user's routers with its export and binary backups.

    The ``view`` query parameter selects the template: ``v2`` (the default)
    or, for any other value, the original layout.
    """
    user = get_current_user()
    router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
    if not router:
        flash("Brak dostępu do routera.")
        return redirect(url_for('routers_list'))
    newest_first = Backup.query.filter_by(router_id=router.id).order_by(Backup.created_at.desc()).all()
    by_type = {'export': [], 'binary': []}
    for backup in newest_first:
        if backup.backup_type in by_type:
            by_type[backup.backup_type].append(backup)
    if request.args.get('view', 'v2') == 'v2':
        template_name, current_view = 'router_details_v2.html', 'v2'
    else:
        template_name, current_view = 'router_details.html', 'v1'
    return render_template(template_name, router=router, export_backups=by_type['export'], binary_backups=by_type['binary'], current_view=current_view)
@app.route('/router/<int:router_id>/export', methods=['POST'])
@login_required
def router_export(router_id):
    """Export a router's configuration to a ``.rsc`` file and record the backup.

    Requests marked ``X-Requested-With: XMLHttpRequest`` get a JSON-style
    status response; other requests get a flash message and a redirect.
    """
    user = get_current_user()
    wants_json = request.headers.get("X-Requested-With") == "XMLHttpRequest"
    router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
    if not router:
        if wants_json:
            return {"status": "error", "message": "Brak routera."}, 400
        flash("Brak routera.")
        return redirect(url_for('routers_list'))
    try:
        stamped_name = f"{router.name}_{router.id}_{datetime.now():%Y%m%d_%H%M%S}"
        exported_text = ssh_export(router)
        target_path = os.path.join(DATA_DIR, f"{stamped_name}.rsc")
        with open(target_path, 'w', encoding='utf-8') as out_file:
            out_file.write(exported_text)
        db.session.add(Backup(router_id=router.id, file_path=target_path, backup_type='export'))
        db.session.commit()
        notify(get_settings(), f"Export {router.name} OK", True)
        log_operation(f"Export wykonany dla routera {router.name} at {datetime.utcnow()}")
        if wants_json:
            return {"status": "success", "message": "export zakończony."}
        flash("Export zakończony.")
    except Exception as e:
        notify(get_settings(), f"Export {router.name} FAIL: {e}", False)
        log_operation(f"Export dla routera {router.name} FAILED: {e}")
        if wants_json:
            return {"status": "error", "message": str(e)}, 500
        flash(f"Błąd: {e}")
    return redirect(url_for('router_details', router_id=router.id))
@app.route('/router/<int:router_id>/backup', methods=['POST'])
@login_required
def router_backup(router_id):
    """Create a binary backup of one router and register it with its checksum.

    Args:
        router_id: Id of the router; it must belong to the current user.

    Returns:
        A redirect to the router details page, or to the routers list when
        the router is not found for this user. The outcome is reported through
        a flash message, ``notify`` and ``log_operation``.
    """
    user = get_current_user()
    router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
    if not router:
        flash("Brak routera.")
        return redirect(url_for('routers_list'))
    # Read the name once: rollback() in the failure path expires ORM attributes.
    router_name = router.name
    try:
        backup_name = f"{router_name}_{router.id}_{datetime.now():%Y%m%d_%H%M%S}"
        local_path = ssh_backup(router, backup_name)
        checksum = compute_checksum(local_path)
        db.session.add(
            Backup(router_id=router.id, file_path=local_path,
                   backup_type='binary', checksum=checksum)
        )
        db.session.commit()
        notify(get_settings(), f"Backup {router_name} OK", True)
        log_operation(f"Backup binarny wykonany dla routera {router_name} at {datetime.utcnow()}")
        flash("Backup binarny zakończony.")
    except Exception as e:
        # Reset the session so the reporting helpers below can still use it
        # after a failed commit.
        db.session.rollback()
        notify(get_settings(), f"Backup {router_name} FAIL: {e}", False)
        log_operation(f"Backup dla routera {router_name} FAILED: {e}")
        flash(f"Błąd: {e}")
    return redirect(url_for('router_details', router_id=router_id))
@app.route('/router/<int:router_id>/upload_backup/<int:backup_id>', methods=['POST'])
@login_required
def upload_backup(router_id, backup_id):
    """Upload a stored binary backup file back to its router.

    The file's current checksum is compared with the one stored on the
    ``Backup`` row before anything is sent to the device.

    Args:
        router_id: Id of the target router; it must belong to the current user.
        backup_id: Id of a ``binary`` backup that belongs to that router.

    Returns:
        A redirect to the ``next`` form field, the referrer, or the dashboard
        (in that order of preference); the routers list when the router is
        not found.
    """
    user = get_current_user()
    router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
    if not router:
        flash("Brak routera.")
        return redirect(url_for('routers_list'))
    # NOTE(review): `next` and the Referer header are client-controlled and are
    # passed to redirect() unvalidated — consider restricting to local URLs.
    next_url = request.form.get('next') or request.referrer or url_for('dashboard')
    b = Backup.query.filter_by(id=backup_id, router_id=router.id, backup_type='binary').first()
    if not b:
        flash("Nie znaleziono backupu binarnego.")
        return redirect(next_url)
    try:
        # compute_checksum presumably reads the file from disk, so a backup
        # whose file has been removed must not crash the request.
        local_checksum = compute_checksum(b.file_path)
    except OSError as e:
        flash(f"Błąd odczytu pliku backupu: {e}")
        return redirect(next_url)
    if b.checksum != local_checksum:
        flash("Błąd: suma kontrolna backupu nie zgadza się plik może być uszkodzony.")
        return redirect(next_url)
    try:
        ssh_upload_backup(router, b.file_path, expected_checksum=b.checksum)
        log_operation(f"Backup {os.path.basename(b.file_path)} wgrany do routera {router.name} at {datetime.utcnow()}")
        flash("Plik backupu wgrany do routera.")
    except Exception as e:
        flash(f"Błąd wgrywania: {e}")
    return redirect(next_url)
@app.route('/backup/delete/<int:backup_id>', methods=['POST'])
@login_required
def delete_backup(backup_id):
    """Delete one backup: its file on disk and its database row.

    Args:
        backup_id: Id of the backup; its router must belong to the current user.

    Returns:
        A redirect to the ``next`` form field if given, otherwise to the
        dashboard.
    """
    current = get_current_user()
    backup = Backup.query.get(backup_id)
    if not backup or backup.router.owner_id != current.id:
        flash("Brak dostępu do backupu.")
        return redirect(url_for('dashboard'))

    path_on_disk = backup.file_path
    try:
        os.remove(path_on_disk)
    except FileNotFoundError:
        # The file is already gone; the row is still removed below.
        pass

    db.session.delete(backup)
    db.session.commit()
    flash("Backup usunięty.")
    return redirect(request.form.get('next') or url_for('dashboard'))
@app.route('/router/delete/<int:router_id>', methods=['POST'])
@login_required
def delete_router(router_id):
    """Delete a router together with all of its backups and their files.

    Args:
        router_id: Id of the router; it must belong to the current user.

    Returns:
        A redirect to the routers list.
    """
    owner = get_current_user()
    target = Router.query.filter_by(id=router_id, owner_id=owner.id).first()
    if not target:
        flash("Brak dostępu do routera.")
        return redirect(url_for('routers_list'))

    for backup in target.backups:
        stored_file = backup.file_path
        try:
            os.remove(stored_file)
        except FileNotFoundError:
            # A missing file does not block removal of the row.
            pass
        db.session.delete(backup)

    db.session.delete(target)
    db.session.commit()

    removed_at = datetime.utcnow()
    log_operation(f"Router {target.name} usunięty at {removed_at}")
    flash("Router usunięty.")
    return redirect(url_for('routers_list'))
@app.route('/routers/all_export', methods=['POST'], endpoint="export_all_routers")
@login_required
def export_all_routers():
    """Run a text export for every router owned by the current user.

    Each router is handled independently: a failure is logged and reported,
    and the loop continues with the next router.

    Returns:
        A redirect to the dashboard; a single flash message lists the
        per-router results joined with `` | ``.
    """
    user = get_current_user()
    routers = Router.query.filter_by(owner_id=user.id).all()
    messages = []
    for r in routers:
        # Read identifying fields up front: rollback() expires ORM attributes.
        router_name, router_pk = r.name, r.id
        try:
            backup_name = f"{router_name}_{router_pk}_{datetime.now():%Y%m%d_%H%M%S}"
            export_data = ssh_export(r)
            filepath = os.path.join(DATA_DIR, f"{backup_name}.rsc")
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(export_data)
            db.session.add(Backup(router_id=router_pk, file_path=filepath, backup_type='export'))
            db.session.commit()
            log_operation(f"Export dla routera {router_name} OK at {datetime.utcnow()}")
            messages.append(f"{router_name} OK")
        except Exception as e:
            # Without a rollback, one failed commit would leave the session
            # unusable for logging and for every remaining router.
            db.session.rollback()
            log_operation(f"Export dla routera {router_name} FAILED: {e}")
            messages.append(f"{router_name} FAIL: {e}")
    flash(" | ".join(messages))
    return redirect(url_for('dashboard'))
@app.route('/diff_selector', methods=['GET', 'POST'])
@login_required
def diff_selector():
    """Show the export-backup picker and forward a valid selection to the diff.

    On POST, the ``backup1`` and ``backup2`` form fields are parsed as
    integers; a missing or non-numeric value is treated as "not selected"
    instead of being passed to ``url_for`` for an ``<int:...>`` route.

    Returns:
        A redirect to ``diff_view`` when both ids are present, otherwise the
        rendered ``diff_selector.html`` listing the user's export backups,
        newest first.
    """
    user = get_current_user()
    if request.method == 'POST':
        # type=int yields None when the value is absent or cannot be converted.
        bid1 = request.form.get('backup1', type=int)
        bid2 = request.form.get('backup2', type=int)
        if bid1 is not None and bid2 is not None:
            return redirect(url_for('diff_view', backup_id1=bid1, backup_id2=bid2))
        flash("Wybierz dwa backupy do porównania.")
    # Query only when the page is actually rendered.
    backups = (
        Backup.query.join(Router)
        .filter(Router.owner_id == user.id, Backup.backup_type == 'export')
        .order_by(Backup.created_at.desc())
        .all()
    )
    return render_template('diff_selector.html', backups=backups)
@app.route('/all_files')
@login_required
def all_files():
    """List every backup of the current user with search, sorting and total size.

    Query-string parameters:
        search: Optional substring matched case-insensitively against the
            stored file path.
        sort_by: ``created_at`` (default) or ``file_path``; anything else
            falls back to ``created_at``.
        order: ``asc`` for ascending; any other value sorts descending.

    Returns:
        The rendered ``all_files.html`` template.
    """
    owner = get_current_user()
    listing = Backup.query.join(Router).filter(Router.owner_id == owner.id)

    # Optional substring filter on the stored path.
    search = request.args.get('search', '')
    if search:
        listing = listing.filter(Backup.file_path.ilike(f"%{search}%"))

    order = request.args.get('order', 'desc')
    sort_by = request.args.get('sort_by', 'created_at')
    if sort_by not in ('created_at', 'file_path'):
        sort_by = 'created_at'
    column = getattr(Backup, sort_by)
    ordering = column.asc() if order == 'asc' else column.desc()
    files = listing.order_by(ordering).all()

    # Sum the sizes of the files that can be stat'ed; others count as zero.
    total_size = 0
    for entry in files:
        stored = entry.file_path
        on_disk = stored if os.path.isabs(stored) else os.path.join(DATA_DIR, stored)
        try:
            total_size += os.path.getsize(on_disk)
        except OSError:
            continue

    return render_template(
        'all_files.html',
        files=files,
        total_size=total_size,
        search=search,
        sort_by=sort_by,
        order=order,
    )
@app.route('/view_export/<int:backup_id>')
@login_required
def view_export(backup_id):
    """Display the text content of one export backup.

    Args:
        backup_id: Id of an ``export`` backup whose router belongs to the
            current user.

    Returns:
        The rendered ``view_export.html`` template, or a redirect to the file
        list when access is denied, the backup is not an export, or the file
        cannot be read.
    """
    files_page = url_for('all_files')
    viewer = get_current_user()
    backup = Backup.query.get(backup_id)

    if not backup or backup.router.owner_id != viewer.id:
        flash("Brak dostępu do backupu.")
        return redirect(files_page)
    if backup.backup_type != 'export':
        flash("Wybrany backup nie jest plikiem exportu.")
        return redirect(files_page)

    try:
        with open(backup.file_path, 'r', encoding='utf-8') as handle:
            content = handle.read()
    except Exception as e:
        flash(f"Błąd odczytu pliku: {e}")
        return redirect(files_page)

    # Where the template's "back" link should lead.
    next_url = request.args.get("next") or request.referrer or files_page
    return render_template('view_export.html', backup=backup, content=content, next_url=next_url)
@app.route('/send_export_email/<int:backup_id>', methods=['POST'])
@login_required
def send_export_email(backup_id):
    """E-mail one backup file as an attachment to the configured SMTP login.

    Args:
        backup_id: Id of the backup; its router must belong to the current user.

    Returns:
        A redirect to the ``next`` form field, the referrer, or the dashboard;
        to the settings page when SMTP host, login or password is missing.
    """
    sender = get_current_user()
    backup = Backup.query.get(backup_id)
    if not backup or backup.router.owner_id != sender.id:
        flash("Brak dostępu do backupu.")
        return redirect(url_for('dashboard'))

    cfg = get_settings()
    if not all((cfg.smtp_host, cfg.smtp_login, cfg.smtp_password)):
        flash("Nie skonfigurowano ustawień SMTP w panelu.")
        return redirect(url_for('settings_view'))

    attachment_name = os.path.basename(backup.file_path)
    subject = f"RouterOS Export: {attachment_name}"
    body = f"Przesyłam export {attachment_name} z routera {backup.router.name}."

    # The SMTP login is used as the recipient address as well.
    delivered = send_mail_with_attachment(
        cfg.smtp_host, cfg.smtp_port, cfg.smtp_login, cfg.smtp_password,
        cfg.smtp_login, subject, body, backup.file_path,
    )
    flash("Wysłano export mailem." if delivered else "Błąd wysyłki mailowej.")

    return redirect(request.form.get('next') or request.referrer or url_for('dashboard'))
@app.route('/download/<path:filename>')
@login_required
def download_file(filename):
    """Send a backup file from ``DATA_DIR`` as a download.

    Only the basename of ``filename`` is used, which keeps the lookup inside
    ``DATA_DIR``. The file is served only when a ``Backup`` row with that
    basename belongs to one of the current user's routers, so one user cannot
    fetch another user's files by guessing names.

    Args:
        filename: Requested file name; any directory components are dropped.

    Returns:
        The file as an attachment, or a redirect to the file list when the
        file is missing or not owned by the current user.
    """
    user = get_current_user()
    safe_filename = os.path.basename(filename)  # prevents path traversal
    safe_path = os.path.join(DATA_DIR, safe_filename)

    owned = False
    if safe_filename:
        # LIKE only narrows the candidates (`_` and `%` act as wildcards);
        # the exact basename comparison below is the real check.
        candidates = (
            Backup.query.join(Router)
            .filter(Router.owner_id == user.id,
                    Backup.file_path.like(f"%{safe_filename}"))
            .all()
        )
        owned = any(os.path.basename(c.file_path) == safe_filename for c in candidates)

    # isfile() also rejects directories, which exists() would have accepted.
    if not owned or not os.path.isfile(safe_path):
        flash("Plik nie istnieje.")
        return redirect(url_for('all_files'))
    return send_file(safe_path, as_attachment=True)
# Alternative entry point for choosing two backups to diff.
@app.route('/diff/selected', methods=['POST'])
@login_required
def diff_selected():
    """Forward two selected backup ids to the diff view.

    The ``backup1`` and ``backup2`` form fields are parsed as integers; a
    missing or non-numeric value is treated as "not selected" instead of
    being passed to ``url_for`` for an ``<int:...>`` route.

    Returns:
        A redirect to ``diff_view`` when both ids are present, otherwise a
        redirect back to the selector with a flash message.
    """
    # type=int yields None when the value is absent or cannot be converted.
    bid1 = request.form.get('backup1', type=int)
    bid2 = request.form.get('backup2', type=int)
    if bid1 is None or bid2 is None:
        flash("Musisz wybrać dwa pliki.")
        return redirect(url_for('diff_selector'))
    return redirect(url_for('diff_view', backup_id1=bid1, backup_id2=bid2))
@app.route('/settings', methods=['GET','POST'], endpoint="settings_view")
@login_required
def settings_view():
    """Show the settings form and save notification/SMTP settings on POST.

    The SMTP port is validated before any field is changed, so an invalid
    value produces a flash message rather than an unhandled ``ValueError``
    and leaves the stored settings untouched. An empty port field falls back
    to 587.

    Returns:
        The rendered ``settings.html`` on GET; a redirect back to this page
        after a POST.
    """
    s = get_settings()
    if request.method == 'POST':
        raw_port = request.form.get('smtp_port', '').strip() or '587'
        try:
            smtp_port = int(raw_port)
        except ValueError:
            smtp_port = None
        if smtp_port is None or not 1 <= smtp_port <= 65535:
            flash("Nieprawidłowy port SMTP.")
            return redirect(url_for('settings_view'))

        # Backup and CRON schedule fields are no longer part of this form.
        s.global_ssh_key = request.form.get('global_ssh_key', '')
        s.pushover_token = request.form.get('pushover_token', '')
        s.pushover_userkey = request.form.get('pushover_userkey', '')
        s.notify_failures_only = bool(request.form.get('notify_failures_only', False))
        s.smtp_notifications_enabled = request.form.get('smtp_notifications_enabled') == 'on'
        s.smtp_host = request.form.get('smtp_host', '')
        s.smtp_port = smtp_port
        s.smtp_login = request.form.get('smtp_login', '')
        s.smtp_password = request.form.get('smtp_password', '')
        s.recipient_email = request.form.get('recipient_email', '')
        db.session.commit()
        # Job rescheduling is not triggered here; per the original note it is
        # driven from /advanced_schedule.
        flash("Zapisano ustawienia.")
        return redirect(url_for('settings_view'))
    return render_template('settings.html', settings=s)
@app.route('/router/<int:router_id>/edit', methods=['GET','POST'])
@login_required
def edit_router(router_id):
    """Show and process the edit form for one router.

    The name and the port are both validated before the router is modified,
    so a bad port yields a flash message instead of an unhandled
    ``ValueError``. An empty port field falls back to 22.

    Args:
        router_id: Id of the router; it must belong to the current user.

    Returns:
        The rendered ``edit_router.html`` on GET; on POST a redirect to the
        routers list on success or back to this form on a validation error.
    """
    user = get_current_user()
    router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
    if not router:
        flash("Brak routera.")
        return redirect(url_for('routers_list'))
    if request.method == 'POST':
        # Trim the inputs and validate the name.
        new_name = request.form['name'].strip()
        if not ALLOWED_NAME_REGEX.match(new_name):
            flash("Nazwa urządzenia może zawierać wyłącznie litery, cyfry, myślniki (-) oraz podkreślenia (_).")
            return redirect(url_for('edit_router', router_id=router_id))

        raw_port = request.form.get('port', '22').strip() or '22'
        try:
            new_port = int(raw_port)
        except ValueError:
            new_port = None
        if new_port is None or not 1 <= new_port <= 65535:
            flash("Nieprawidłowy port SSH.")
            return redirect(url_for('edit_router', router_id=router_id))

        router.name = new_name
        router.host = request.form['host'].strip()
        router.port = new_port
        router.ssh_user = request.form['ssh_user'].strip()
        router.ssh_key = request.form['ssh_key'].strip()
        router.ssh_password = request.form['ssh_password'].strip()
        db.session.commit()
        flash("Zapisano zmiany w routerze.")
        return redirect(url_for('routers_list'))
    return render_template('edit_router.html', router=router)
@app.route('/download_zip', methods=['POST'])
@login_required
def download_zip():
    """Download the selected backups as one in-memory ZIP archive.

    The ids come from the repeated ``backup_id`` form field; only backups
    whose router belongs to the current user are included, and entries whose
    file is missing on disk are skipped.

    Returns:
        ``backup_export.zip`` as an attachment, or a redirect to the dashboard
        when nothing accessible was selected.
    """
    requester = get_current_user()
    selected_ids = request.form.getlist('backup_id')

    # Restrict the selection to the requester's own backups.
    ownership = Router.owner_id == requester.id
    chosen = Backup.id.in_(selected_ids)
    backups = Backup.query.join(Router).filter(ownership, chosen).all()
    if not backups:
        flash("Nie wybrano żadnych plików lub brak dostępu.")
        return redirect(url_for('dashboard'))

    # Build the archive in memory.
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, 'w') as bundle:
        for item in backups:
            source = item.file_path
            if os.path.isfile(source):
                bundle.write(source, arcname=os.path.basename(source))
    archive.seek(0)

    return send_file(
        archive,
        mimetype='application/zip',
        as_attachment=True,
        download_name='backup_export.zip',
    )
@app.route('/send_by_email/<int:backup_id>', methods=['POST'])
@login_required
def send_by_email(backup_id):
    """E-mail one backup file as an attachment to the configured SMTP login.

    Args:
        backup_id: Id of the backup; its router must belong to the current user.

    Returns:
        A redirect to the ``next`` form field, the referrer, or the dashboard;
        to the settings page when SMTP host, login or password is missing.
    """
    sender = get_current_user()
    backup = Backup.query.get(backup_id)
    if not backup or backup.router.owner_id != sender.id:
        flash("Brak dostępu do backupu.")
        return redirect(url_for('dashboard'))

    cfg = get_settings()
    if not all((cfg.smtp_host, cfg.smtp_login, cfg.smtp_password)):
        flash("Nie skonfigurowano ustawień SMTP w panelu.")
        return redirect(url_for('settings_view'))

    attachment_name = os.path.basename(backup.file_path)
    subject = f"RouterOS Backup: {attachment_name}"
    body = f"Przesyłam backup {attachment_name} z routera {backup.router.name}."

    # The SMTP login is used as the recipient address as well.
    delivered = send_mail_with_attachment(
        cfg.smtp_host, cfg.smtp_port,
        cfg.smtp_login, cfg.smtp_password,
        cfg.smtp_login, subject, body, backup.file_path,
    )
    if delivered:
        flash("Wysłano plik mailem.")
    else:
        flash("Błąd wysyłki mailowej.")

    return redirect(request.form.get('next') or request.referrer or url_for('dashboard'))
@app.route('/diff/<int:backup_id1>/<int:backup_id2>')
@login_required
def diff_view(backup_id1, backup_id2):
    """Render a unified diff between two export backups.

    Args:
        backup_id1: Id of the "from" backup.
        backup_id2: Id of the "to" backup.

    Returns:
        The rendered ``diff.html`` template, or a redirect with a flash
        message when a backup is missing, not owned by the current user, not
        an export, or unreadable.
    """
    viewer = get_current_user()
    b1 = Backup.query.get(backup_id1)
    b2 = Backup.query.get(backup_id2)
    if not (b1 and b2):
        flash("Brak backupów.")
        return redirect(url_for('routers_list'))

    pair = (b1, b2)
    if any(item.router.owner_id != viewer.id for item in pair):
        flash("Brak dostępu.")
        return redirect(url_for('routers_list'))
    if any(item.backup_type != 'export' for item in pair):
        flash("Diff można wykonać tylko dla plików 'export'.")
        return redirect(url_for('router_details', router_id=b1.router_id))

    # Load both files as lists of lines without their line endings.
    contents = []
    try:
        for item in pair:
            with open(item.file_path, 'r', encoding='utf-8') as handle:
                contents.append(handle.read().splitlines())
    except Exception as e:
        flash(f"Błąd czytania plików: {e}")
        return redirect(url_for('router_details', router_id=b1.router_id))

    old_lines, new_lines = contents
    diff_text = "\n".join(
        difflib.unified_diff(
            old_lines,
            new_lines,
            fromfile=os.path.basename(b1.file_path),
            tofile=os.path.basename(b2.file_path),
            lineterm='',
        )
    )
    return render_template('diff.html', diff_text=diff_text, backup1=b1, backup2=b2)
@app.route('/routers/all_backup', methods=['POST'])
@login_required
def backup_all_routers():
    """Create a binary backup for every router owned by the current user.

    Each backup is stored with its checksum, matching ``router_backup``.
    ``upload_backup`` refuses to upload a backup whose stored checksum differs
    from the file's checksum, so rows saved without one could never be
    restored. Each router is handled independently; a failure is logged and
    the loop continues.

    Returns:
        A redirect to the dashboard; a single flash message lists the
        per-router results joined with `` | ``.
    """
    user = get_current_user()
    routers = Router.query.filter_by(owner_id=user.id).all()
    messages = []
    for r in routers:
        # Read identifying fields up front: rollback() expires ORM attributes.
        router_name, router_pk = r.name, r.id
        try:
            backup_name = f"{router_name}_{router_pk}_{datetime.now():%Y%m%d_%H%M%S}"
            local_path = ssh_backup(r, backup_name)
            checksum = compute_checksum(local_path)
            db.session.add(
                Backup(router_id=router_pk, file_path=local_path,
                       backup_type='binary', checksum=checksum)
            )
            db.session.commit()
            log_operation(f"Backup binarny dla routera {router_name} OK at {datetime.utcnow()}")
            messages.append(f"{router_name} OK")
        except Exception as e:
            # Without a rollback, one failed commit would leave the session
            # unusable for logging and for every remaining router.
            db.session.rollback()
            log_operation(f"Backup binarny dla routera {router_name} FAILED at {datetime.utcnow()}: {e}")
            messages.append(f"{router_name} FAIL: {e}")
    flash(" | ".join(messages))
    return redirect(url_for('dashboard'))
@app.route('/mass_actions', methods=['POST'])
@login_required
def mass_actions():
    """Apply a bulk action (``download`` or ``delete``) to selected backups.

    Form fields:
        backup_id: Repeated field with the selected backup ids.
        action: ``download`` to get a ZIP archive, ``delete`` to remove the
            files and rows.

    Only backups whose router belongs to the current user are affected.

    Returns:
        ``backup_export.zip`` as an attachment for ``download``; otherwise a
        redirect to the file list with a flash message.
    """
    files_page = url_for('all_files')
    requester = get_current_user()

    selected_ids = request.form.getlist('backup_id')
    if not selected_ids:
        flash("Nie wybrano żadnych backupów.")
        return redirect(files_page)

    action = request.form.get('action')
    if action not in ('download', 'delete'):
        flash("Nieprawidłowa akcja.")
        return redirect(files_page)

    # Both supported actions operate on the same owner-filtered selection.
    backups = (
        Backup.query.join(Router)
        .filter(Router.owner_id == requester.id, Backup.id.in_(selected_ids))
        .all()
    )
    if not backups:
        flash("Brak dostępu lub nie wybrano żadnych backupów.")
        return redirect(files_page)

    if action == 'download':
        archive = io.BytesIO()
        with zipfile.ZipFile(archive, 'w') as bundle:
            for item in backups:
                source = item.file_path
                if os.path.isfile(source):
                    bundle.write(source, arcname=os.path.basename(source))
        archive.seek(0)
        return send_file(archive, mimetype='application/zip', as_attachment=True, download_name='backup_export.zip')

    # action == 'delete'
    for item in backups:
        try:
            os.remove(item.file_path)
        except Exception:
            # Best effort: the row is removed even if the file cannot be.
            pass
        db.session.delete(item)
    db.session.commit()
    flash("Wybrane backupy zostały usunięte.")
    return redirect(files_page)
@app.route('/health', methods=['GET'])
def healthcheck():
    """Report whether the database answers a trivial query.

    Returns:
        A JSON body with ``status`` (``ok`` or ``error``) and a UTC
        ``timestamp``. The HTTP status is 200 when the probe succeeds and 503
        when it fails, so status-code-based monitors can detect the outage.
    """
    try:
        db.session.execute(text('SELECT 1'))
        status, http_code = 'ok', 200
    except Exception:
        status, http_code = 'error', 503
    payload = jsonify({
        "status": status,
        "timestamp": datetime.utcnow().isoformat() + "Z"
    })
    return payload, http_code
@app.route('/change_password', methods=['GET', 'POST'])
@login_required
def change_password():
    """Show the change-password form and update the current user's password.

    On POST the current password must verify, the new password must be
    non-empty, and it must match its confirmation. Missing form fields are
    read as empty strings so they fail validation rather than reaching the
    hashing code as ``None``.

    Returns:
        The rendered ``change_password.html`` on GET; on POST a redirect to
        the dashboard on success or back to this form on a validation error.
    """
    user = get_current_user()
    if request.method == 'POST':
        current_password = request.form.get('current_password', '')
        new_password = request.form.get('new_password', '')
        confirm_password = request.form.get('confirm_password', '')
        if not user.check_password(current_password):
            flash("Obecne hasło jest nieprawidłowe.")
            return redirect(url_for('change_password'))
        if not new_password:
            # An empty password would otherwise be hashed and stored.
            flash("Nowe hasło nie może być puste.")
            return redirect(url_for('change_password'))
        if new_password != confirm_password:
            flash("Nowe hasło i potwierdzenie nie są zgodne.")
            return redirect(url_for('change_password'))
        user.password_hash = pwd_context.hash(new_password)
        db.session.commit()
        flash("Hasło zostało zmienione pomyślnie.")
        return redirect(url_for('dashboard'))
    return render_template('change_password.html')
@app.route('/router/<int:router_id>/test_connection', methods=['GET'])
@login_required
def test_connection(router_id):
    """Run an SSH connection test against one router and show the result.

    Args:
        router_id: Id of the router; it must belong to the current user.

    Returns:
        ``test_connection_modal.html`` when the query string has ``modal=1``,
        otherwise ``test_connection.html``; a redirect to the routers list
        when the router is not accessible or the test raises.
    """
    routers_page = url_for('routers_list')
    owner = get_current_user()
    device = Router.query.filter_by(id=router_id, owner_id=owner.id).first()
    if not device:
        flash("Brak dostępu do routera.")
        return redirect(routers_page)

    try:
        outcome = ssh_test_connection(device)
    except Exception as e:
        flash(f"Błąd testu połączenia: {e}")
        return redirect(routers_page)

    as_modal = request.args.get("modal") == "1"
    template = "test_connection_modal.html" if as_modal else "test_connection.html"
    return render_template(template, router=device, result=outcome)
@app.route('/logs')
@login_required
def logs_page():
    """Render all operation log entries, newest first."""
    newest_first = OperationLog.timestamp.desc()
    entries = OperationLog.query.order_by(newest_first).all()
    return render_template('logs.html', logs=entries)
@app.route('/logs/delete', methods=['POST'])
@login_required
def delete_old_logs():
    """Delete operation log entries older than a given number of days.

    Form fields:
        delete_days: Age threshold in days; must parse as an integer >= 1.

    Returns:
        A redirect to the logs page; a flash message reports the number of
        removed entries or the validation problem.
    """
    logs_url = url_for('logs_page')
    raw_days = request.form.get('delete_days', 0)
    try:
        delete_days = int(raw_days)
    except ValueError:
        flash("Podana wartość jest nieprawidłowa.")
        return redirect(logs_url)

    if delete_days < 1:
        flash("Podaj wartość większą lub równą 1.")
        return redirect(logs_url)

    # Entries with a timestamp strictly before this moment are removed.
    threshold = datetime.utcnow() - timedelta(days=delete_days)
    stale = OperationLog.query.filter(OperationLog.timestamp < threshold).all()
    for entry in stale:
        db.session.delete(entry)
    deleted_count = len(stale)
    db.session.commit()

    flash(f"Usunięto {deleted_count} logów starszych niż {delete_days} dni.")
    return redirect(logs_url)
@app.route('/test_email', methods=['POST'])
@login_required
def test_email():
    """Send a test e-mail using the stored SMTP settings.

    The message goes to ``recipient_email`` when it is set to a non-blank
    value, otherwise to the SMTP login.

    Returns:
        A redirect to the settings page with a flash message describing the
        outcome.
    """
    settings_page = url_for('settings_view')
    cfg = get_settings()

    # All three SMTP credentials are required before attempting to send.
    if not all((cfg.smtp_host, cfg.smtp_login, cfg.smtp_password)):
        flash("Brak skonfigurowanych ustawień SMTP.")
        return redirect(settings_page)

    # Prefer the explicit recipient; fall back to the SMTP login.
    recipient = (cfg.recipient_email or '').strip()
    to_address = recipient or cfg.smtp_login.strip()

    sent = send_mail_with_attachment(
        smtp_host=cfg.smtp_host,
        smtp_port=cfg.smtp_port,
        smtp_user=cfg.smtp_login,
        smtp_pass=cfg.smtp_password,
        to_address=to_address,
        subject="Testowy e-mail z RouterOS Backup",
        plain_body="To jest testowa wiadomość e-mail wysłana z systemu RouterOS Backup.",
    )
    flash(
        "Testowy e-mail został wysłany."
        if sent
        else "Wysyłka testowego e-maila nie powiodła się."
    )
    return redirect(settings_page)
@app.route('/test_pushover', methods=['POST'])
@login_required
def test_pushover():
    """Send a test Pushover notification using the stored token and user key.

    Returns:
        A redirect to the settings page with a flash message describing the
        outcome.
    """
    settings_page = url_for('settings_view')
    cfg = get_settings()

    # Both the token and the user key are required.
    if not cfg.pushover_token or not cfg.pushover_userkey:
        flash("Brak skonfigurowanych ustawień Pushover.")
        return redirect(settings_page)

    delivered = send_pushover(
        cfg.pushover_token,
        cfg.pushover_userkey,
        "Testowe powiadomienie Pushover z systemu RouterOS Backup.",
    )
    if delivered:
        flash("Testowe powiadomienie Pushover zostało wysłane.")
    else:
        flash("Wysyłka testowego powiadomienia Pushover nie powiodła się.")
    return redirect(settings_page)
if __name__ == '__main__':
    # Scheduling runs inside an application context (presumably because
    # reschedule_jobs reads settings from the database — confirm).
    with app.app_context():
        reschedule_jobs()
    # Stop the background scheduler when the interpreter exits.
    atexit.register(lambda: scheduler.shutdown())
    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default: it lets anyone who can reach the port execute
    # code. Opt in explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=5581, use_reloader=False, debug=debug_enabled)