1337 lines
54 KiB
Python
1337 lines
54 KiB
Python
import os
|
||
import difflib
|
||
import paramiko
|
||
import atexit
|
||
import io
|
||
import zipfile
|
||
import requests
|
||
import re
|
||
import smtplib
|
||
import shutil
|
||
import socket
|
||
from datetime import datetime
|
||
from ftplib import FTP
|
||
from email.mime.base import MIMEBase
|
||
from email.mime.multipart import MIMEMultipart
|
||
from email.mime.text import MIMEText
|
||
from email import encoders
|
||
from flask import jsonify
|
||
from flask import Flask
|
||
import shutil
|
||
from datetime import datetime
|
||
from difflib import HtmlDiff
|
||
import difflib
|
||
from datetime import datetime, timedelta
|
||
from sqlalchemy import text
|
||
|
||
from flask import (
|
||
Flask, render_template, request, redirect,
|
||
url_for, session, flash, send_file
|
||
)
|
||
from flask_sqlalchemy import SQLAlchemy
|
||
from passlib.hash import bcrypt
|
||
#from flask_wtf.csrf import CSRFProtect
|
||
|
||
from apscheduler.schedulers.background import BackgroundScheduler
|
||
from apscheduler.triggers.cron import CronTrigger
|
||
|
||
# REGEX dla nazwy urzadzenia
|
||
ALLOWED_NAME_REGEX = re.compile(r'^[A-Za-z0-9_-]+$')
|
||
|
||
###############################################################################
|
||
# Konfiguracja Flask
|
||
###############################################################################
|
||
app = Flask(__name__)
|
||
app.config['SECRET_KEY'] = 'super-secret-key'
|
||
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///backup_routeros.db'
|
||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
||
|
||
app_start_time = datetime.now()
|
||
#csrf = CSRFProtect(app)
|
||
|
||
db = SQLAlchemy(app)
|
||
|
||
# Folder do przechowywania plików (./data)
|
||
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
|
||
os.makedirs(DATA_DIR, exist_ok=True)
|
||
|
||
###############################################################################
|
||
# Modele bazy danych
|
||
###############################################################################
|
||
class User(db.Model):
|
||
__tablename__ = 'users'
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
username = db.Column(db.String(120), unique=True, nullable=False)
|
||
password_hash = db.Column(db.String(255), nullable=False)
|
||
|
||
def check_password(self, password):
|
||
return bcrypt.verify(password, self.password_hash)
|
||
|
||
class Router(db.Model):
|
||
__tablename__ = 'routers'
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
owner_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
|
||
name = db.Column(db.String(120), nullable=False)
|
||
host = db.Column(db.String(255), nullable=False)
|
||
port = db.Column(db.Integer, default=22)
|
||
ssh_user = db.Column(db.String(120), default='admin')
|
||
ssh_key = db.Column(db.Text, nullable=True) # klucz prywatny (string lub ścieżka)
|
||
ssh_password = db.Column(db.String(120), nullable=True)
|
||
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
||
backups = db.relationship('Backup', backref='router', lazy=True)
|
||
|
||
class Backup(db.Model):
|
||
__tablename__ = 'backups'
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
router_id = db.Column(db.Integer, db.ForeignKey('routers.id'), nullable=False)
|
||
file_path = db.Column(db.String(255), nullable=False) # Ścieżka do pliku
|
||
backup_type = db.Column(db.String(50), default='export') # 'export' lub 'binary'
|
||
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
||
class OperationLog(db.Model):
|
||
__tablename__ = 'operation_logs'
|
||
__table_args__ = {'extend_existing': True} # Zapobiega redefinicji tabeli
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
message = db.Column(db.Text, nullable=False)
|
||
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
|
||
class GlobalSettings(db.Model):
|
||
__tablename__ = 'global_settings'
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
backup_retention_days = db.Column(db.Integer, default=7)
|
||
auto_backup_interval_days = db.Column(db.Integer, default=1)
|
||
cron_schedule = db.Column(db.String(50), default="")
|
||
retention_cron = db.Column(db.String(50), default="")
|
||
export_cron = db.Column(db.String(50), default="")
|
||
binary_cron = db.Column(db.String(50), default="")
|
||
enable_auto_export = db.Column(db.Boolean, default=False)
|
||
global_ssh_key = db.Column(db.Text, nullable=True)
|
||
pushover_token = db.Column(db.String(255), nullable=True)
|
||
pushover_userkey = db.Column(db.String(255), nullable=True)
|
||
notify_failures_only = db.Column(db.Boolean, default=True)
|
||
smtp_host = db.Column(db.String(255), nullable=True)
|
||
smtp_port = db.Column(db.Integer, default=587)
|
||
smtp_login = db.Column(db.String(255), nullable=True)
|
||
smtp_password = db.Column(db.String(255), nullable=True)
|
||
smtp_notifications_enabled = db.Column(db.Boolean, default=False)
|
||
|
||
###############################################################################
|
||
# Inicjalizacja bazy
|
||
###############################################################################
|
||
with app.app_context():
|
||
db.create_all()
|
||
if not GlobalSettings.query.first():
|
||
default_settings = GlobalSettings()
|
||
db.session.add(default_settings)
|
||
db.session.commit()
|
||
|
||
###############################################################################
|
||
# Pomocnicze dekoratory i funkcje
|
||
###############################################################################
|
||
def login_required(func):
|
||
def wrapper(*args, **kwargs):
|
||
if 'user_id' not in session:
|
||
flash("Musisz być zalogowany, aby uzyskać dostęp.")
|
||
return redirect(url_for('login'))
|
||
return func(*args, **kwargs)
|
||
wrapper.__name__ = func.__name__
|
||
return wrapper
|
||
|
||
def get_current_user():
|
||
if 'user_id' in session:
|
||
return User.query.get(session['user_id'])
|
||
return None
|
||
|
||
def get_settings() -> GlobalSettings:
|
||
s = GlobalSettings.query.first()
|
||
if not s:
|
||
s = GlobalSettings()
|
||
db.session.add(s)
|
||
db.session.commit()
|
||
return s
|
||
|
||
def log_operation(message: str):
|
||
log = OperationLog(message=message)
|
||
db.session.add(log)
|
||
db.session.commit()
|
||
|
||
def load_pkey(ssh_key_str: str):
|
||
key_str = ssh_key_str.strip()
|
||
if not key_str:
|
||
raise ValueError("SSH key is empty. Upewnij się, że podałeś poprawny klucz SSH (globalny lub indywidualny).")
|
||
if os.path.isfile(key_str):
|
||
return paramiko.RSAKey.from_private_key_file(key_str)
|
||
else:
|
||
key_buf = io.StringIO(key_str)
|
||
try:
|
||
return paramiko.RSAKey.from_private_key(key_buf)
|
||
except Exception as e:
|
||
raise ValueError("Nie udało się załadować klucza SSH. Sprawdź, czy klucz jest poprawny i nie jest zaszyfrowany") from e
|
||
|
||
###############################################################################
|
||
# Funkcje SSH
|
||
###############################################################################
|
||
def ssh_export(router: Router) -> str:
|
||
print(f"[DEBUG] ssh_export -> router id={router.id}, host={router.host}")
|
||
client = paramiko.SSHClient()
|
||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
|
||
# Wybór klucza: użyj router.ssh_key, jeśli podany; w przeciwnym razie globalnego klucza
|
||
key_source = router.ssh_key if router.ssh_key and router.ssh_key.strip() else get_settings().global_ssh_key
|
||
|
||
if key_source and key_source.strip():
|
||
try:
|
||
pkey = load_pkey(key_source)
|
||
client.connect(router.host, port=router.port, username=router.ssh_user, pkey=pkey, timeout=10)
|
||
except Exception as e:
|
||
print("[DEBUG] ssh_export -> błąd przy load_pkey/connect:", e)
|
||
raise e
|
||
else:
|
||
client.connect(router.host, port=router.port, username=router.ssh_user, password=router.ssh_password, timeout=10, allow_agent=False, look_for_keys=False, banner_timeout=10)
|
||
stdin, stdout, stderr = client.exec_command('/export')
|
||
output = stdout.read().decode('utf-8', errors='ignore')
|
||
client.close()
|
||
print(f"[DEBUG] ssh_export -> output length={len(output)}")
|
||
return output
|
||
|
||
def ssh_backup(router: Router, backup_name: str) -> str:
|
||
print(f"[DEBUG] ssh_backup -> router id={router.id}, backup_name={backup_name}")
|
||
client = paramiko.SSHClient()
|
||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
|
||
# Wybór klucza: jeśli router.ssh_key jest podany, używamy go, w przeciwnym razie używamy global_ssh_key
|
||
key_source = router.ssh_key if router.ssh_key and router.ssh_key.strip() else get_settings().global_ssh_key
|
||
|
||
if key_source and key_source.strip():
|
||
try:
|
||
pkey = load_pkey(key_source)
|
||
client.connect(router.host, port=router.port, username=router.ssh_user, pkey=pkey, timeout=10)
|
||
except Exception as e:
|
||
print("[DEBUG] ssh_backup -> błąd przy load_pkey/connect:", e)
|
||
raise e
|
||
else:
|
||
client.connect(router.host, port=router.port, username=router.ssh_user, password=router.ssh_password, timeout=10, allow_agent=False, look_for_keys=False, banner_timeout=10)
|
||
|
||
command = f"/system backup save name={backup_name}"
|
||
stdin, stdout, stderr = client.exec_command(command)
|
||
stdout.channel.recv_exit_status()
|
||
|
||
sftp = client.open_sftp()
|
||
remote_file = f"{backup_name}.backup"
|
||
local_path = os.path.join(DATA_DIR, f"{backup_name}.backup")
|
||
sftp.get(remote_file, local_path)
|
||
# Usuwamy zdalny plik backupu, aby nie zapychać pamięci routera
|
||
try:
|
||
sftp.remove(remote_file)
|
||
print(f"[DEBUG] ssh_backup -> usunięto plik {remote_file} z routera")
|
||
except Exception as e:
|
||
print(f"[DEBUG] ssh_backup -> błąd przy usuwaniu pliku {remote_file} z routera: {e}")
|
||
sftp.close()
|
||
client.close()
|
||
print(f"[DEBUG] ssh_backup -> local_path={local_path}")
|
||
return local_path
|
||
|
||
def ssh_upload_backup(router: Router, local_backup_path: str):
|
||
print(f"[DEBUG] ssh_upload_backup -> router id={router.id}, local_backup_path={local_backup_path}")
|
||
client = paramiko.SSHClient()
|
||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
|
||
# Używamy indywidualnego klucza, a jeśli nie ma, to globalnego
|
||
key_source = router.ssh_key if router.ssh_key and router.ssh_key.strip() else get_settings().global_ssh_key
|
||
if key_source and key_source.strip():
|
||
try:
|
||
pkey = load_pkey(key_source)
|
||
client.connect(router.host, port=router.port, username=router.ssh_user, pkey=pkey, timeout=10)
|
||
except Exception as e:
|
||
print(f"[DEBUG] ssh_upload_backup -> błąd przy łączeniu z kluczem SSH: {e}")
|
||
raise e
|
||
else:
|
||
client.connect(router.host, port=router.port, username=router.ssh_user,
|
||
password=router.ssh_password, timeout=10, allow_agent=False, look_for_keys=False, banner_timeout=10)
|
||
|
||
sftp = client.open_sftp()
|
||
remote_file = os.path.basename(local_backup_path)
|
||
sftp.put(local_backup_path, remote_file)
|
||
sftp.close()
|
||
client.close()
|
||
print(f"[DEBUG] ssh_upload_backup -> przesłano {local_backup_path} do routera")
|
||
|
||
def ssh_test_connection(router: Router) -> dict:
|
||
"""Testuje połączenie z routerem i zwraca informacje: model, uptime, hostname."""
|
||
client = paramiko.SSHClient()
|
||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
|
||
key_source = router.ssh_key if router.ssh_key and router.ssh_key.strip() else get_settings().global_ssh_key
|
||
if key_source and key_source.strip():
|
||
try:
|
||
pkey = load_pkey(key_source)
|
||
client.connect(router.host, port=router.port, username=router.ssh_user, pkey=pkey, timeout=10)
|
||
except Exception as e:
|
||
raise e
|
||
else:
|
||
client.connect(router.host, port=router.port, username=router.ssh_user, password=router.ssh_password, timeout=10, allow_agent=False, look_for_keys=False)
|
||
|
||
stdin, stdout, stderr = client.exec_command('/system resource print without-paging')
|
||
resource_output = stdout.read().decode('utf-8', errors='ignore')
|
||
stdin, stdout, stderr = client.exec_command('/system identity print')
|
||
identity_output = stdout.read().decode('utf-8', errors='ignore')
|
||
client.close()
|
||
|
||
model = "Nieznany"
|
||
uptime = "Nieznany"
|
||
hostname = "Nieznany"
|
||
|
||
for line in resource_output.splitlines():
|
||
if "board-name" in line:
|
||
model = line.split(":", 1)[1].strip()
|
||
if "uptime" in line:
|
||
uptime = line.split(":", 1)[1].strip()
|
||
|
||
for line in identity_output.splitlines():
|
||
if "name" in line:
|
||
hostname = line.split(":", 1)[1].strip()
|
||
|
||
return {"model": model, "uptime": uptime, "hostname": hostname}
|
||
|
||
def get_valid_cron_trigger(cron_expr, default_expr):
|
||
try:
|
||
trigger = CronTrigger.from_crontab(cron_expr)
|
||
return trigger
|
||
except ValueError as e:
|
||
print(f"[DEBUG] Invalid cron expression: {cron_expr}. Error: {e}. Using default: {default_expr}")
|
||
return CronTrigger.from_crontab(default_expr)
|
||
|
||
def get_email_template(subject, message):
|
||
return f"""
|
||
<html>
|
||
<head>
|
||
<style>
|
||
body {{
|
||
font-family: Arial, sans-serif;
|
||
background-color: #f4f4f4;
|
||
margin: 0;
|
||
padding: 0;
|
||
}}
|
||
.container {{
|
||
max-width: 600px;
|
||
margin: 20px auto;
|
||
background-color: #ffffff;
|
||
padding: 20px;
|
||
border-radius: 5px;
|
||
box-shadow: 0 0 10px rgba(0,0,0,0.1);
|
||
}}
|
||
.header {{
|
||
background-color: #007bff;
|
||
color: #ffffff;
|
||
padding: 10px;
|
||
text-align: center;
|
||
border-radius: 5px 5px 0 0;
|
||
}}
|
||
.content {{
|
||
margin: 20px 0;
|
||
font-size: 16px;
|
||
line-height: 1.5;
|
||
}}
|
||
.footer {{
|
||
text-align: center;
|
||
font-size: 12px;
|
||
color: #777777;
|
||
border-top: 1px solid #dddddd;
|
||
padding-top: 10px;
|
||
}}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<div class="container">
|
||
<div class="header">
|
||
<h2>{subject}</h2>
|
||
</div>
|
||
<div class="content">
|
||
<p>{message}</p>
|
||
</div>
|
||
<div class="footer">
|
||
<p>Wiadomość wygenerowana automatycznie przez system RouterOS Backup.</p>
|
||
</div>
|
||
</div>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
###############################################################################
|
||
# Powiadomienia
|
||
###############################################################################
|
||
def send_pushover(token, userkey, message, title="RouterOS Backup"):
|
||
try:
|
||
resp = requests.post("https://api.pushover.net/1/messages.json", data={
|
||
"token": token,
|
||
"user": userkey,
|
||
"message": message,
|
||
"title": title
|
||
})
|
||
return resp.status_code == 200
|
||
except Exception as e:
|
||
print("Pushover error:", e)
|
||
return False
|
||
|
||
def send_mail_with_attachment(smtp_host, smtp_port, smtp_user, smtp_pass, to_address, subject, plain_body, attachment_path="", html_body=None):
|
||
if not (smtp_host and smtp_host.strip() and smtp_user and smtp_user.strip() and smtp_pass and smtp_pass.strip()):
|
||
print("SMTP not properly configured, skipping email sending.")
|
||
return False
|
||
try:
|
||
# Utwórz wiadomość typu alternative (obsługuje plain text i HTML)
|
||
msg = MIMEMultipart("alternative")
|
||
msg["From"] = smtp_user
|
||
msg["To"] = to_address
|
||
msg["Subject"] = subject
|
||
|
||
# Dodaj część tekstową
|
||
part1 = MIMEText(plain_body, "plain")
|
||
msg.attach(part1)
|
||
|
||
# Jeśli nie podano wersji HTML, wygeneruj domyślny szablon
|
||
if html_body is None:
|
||
html_body = get_email_template(subject, plain_body)
|
||
part2 = MIMEText(html_body, "html")
|
||
msg.attach(part2)
|
||
|
||
# Dodaj załącznik, jeśli podany i istnieje
|
||
if attachment_path and os.path.isfile(attachment_path):
|
||
with open(attachment_path, "rb") as attachment:
|
||
part = MIMEBase("application", "octet-stream")
|
||
part.set_payload(attachment.read())
|
||
encoders.encode_base64(part)
|
||
part.add_header("Content-Disposition", f"attachment; filename={os.path.basename(attachment_path)}")
|
||
msg.attach(part)
|
||
|
||
with smtplib.SMTP(smtp_host, smtp_port) as server:
|
||
server.starttls()
|
||
server.login(smtp_user, smtp_pass)
|
||
server.send_message(msg)
|
||
return True
|
||
except Exception as e:
|
||
print("Mail error_send_mail_with_attachment:", e)
|
||
return False
|
||
|
||
def send_mail(smtp_host, smtp_port, smtp_user, smtp_pass, to_address, subject, body):
|
||
if not (smtp_host and smtp_user and smtp_pass):
|
||
print("SMTP not configured, skipping email")
|
||
return False
|
||
try:
|
||
msg = MIMEMultipart()
|
||
msg["From"] = smtp_user
|
||
msg["To"] = to_address
|
||
msg["Subject"] = subject
|
||
msg.attach(MIMEText(body, "plain"))
|
||
|
||
with smtplib.SMTP(smtp_host, smtp_port) as server:
|
||
server.starttls()
|
||
server.login(smtp_user, smtp_pass)
|
||
server.send_message(msg)
|
||
return True
|
||
except Exception as e:
|
||
print("Mail error_send_mail:", e)
|
||
return False
|
||
|
||
def notify(settings: GlobalSettings, message: str, success: bool):
|
||
if settings.notify_failures_only and success:
|
||
return
|
||
if settings.pushover_token and settings.pushover_userkey:
|
||
send_pushover(settings.pushover_token, settings.pushover_userkey, message)
|
||
# Wysyłka maila tylko jeśli SMTP Notifications są włączone
|
||
if settings.smtp_notifications_enabled:
|
||
if (settings.smtp_host and settings.smtp_host.strip() and
|
||
settings.smtp_login and settings.smtp_login.strip() and
|
||
settings.smtp_password and settings.smtp_password.strip()):
|
||
try:
|
||
send_mail_with_attachment(
|
||
smtp_host=settings.smtp_host.strip(),
|
||
smtp_port=settings.smtp_port,
|
||
smtp_user=settings.smtp_login.strip(),
|
||
smtp_pass=settings.smtp_password.strip(),
|
||
to_address=settings.smtp_login.strip(),
|
||
subject="RouterOS Backup Notification",
|
||
plain_body=message
|
||
)
|
||
except Exception as e:
|
||
print("SMTP send error:", e)
|
||
else:
|
||
print("SMTP configuration is incomplete. Skipping email notification.")
|
||
|
||
###############################################################################
|
||
# Zadania cykliczne
|
||
###############################################################################
|
||
|
||
def cleanup_old_backups():
|
||
with app.app_context():
|
||
s = get_settings()
|
||
cutoff_date = datetime.utcnow() - timedelta(days=s.backup_retention_days)
|
||
old = Backup.query.filter(Backup.created_at < cutoff_date).all()
|
||
deleted_count = 0
|
||
for b in old:
|
||
try:
|
||
os.remove(b.file_path)
|
||
deleted_count += 1
|
||
except FileNotFoundError:
|
||
deleted_count += 1
|
||
db.session.delete(b)
|
||
db.session.commit()
|
||
log_operation(f"Automatyczna retencja wykonana at {datetime.utcnow()}: usunięto {deleted_count} backupów (próg: {s.backup_retention_days} dni).")
|
||
|
||
def scheduled_auto_backup():
|
||
with app.app_context():
|
||
s = get_settings()
|
||
routers = Router.query.all()
|
||
for r in routers:
|
||
try:
|
||
backup_name = f"{r.name}_{r.id}_{datetime.now():%Y%m%d_%H%M%S}"
|
||
export_data = ssh_export(r)
|
||
filename = f"{backup_name}.rsc"
|
||
filepath = os.path.join(DATA_DIR, filename)
|
||
with open(filepath, 'w', encoding='utf-8') as f:
|
||
f.write(export_data)
|
||
b = Backup(router_id=r.id, file_path=filepath, backup_type='export')
|
||
db.session.add(b)
|
||
db.session.commit()
|
||
notify(s, f"Auto-export dla routera {r.name} OK", True)
|
||
log_operation(f"Automatyczny eksport dla routera {r.name} wykonany pomyślnie at {datetime.utcnow()}.")
|
||
except Exception as e:
|
||
notify(s, f"Auto-export dla routera {r.name} FAILED: {e}", False)
|
||
log_operation(f"Automatyczny eksport dla routera {r.name} FAILED at {datetime.utcnow()}: {e}")
|
||
|
||
def scheduled_auto_binary_backup():
|
||
with app.app_context():
|
||
s = get_settings()
|
||
routers = Router.query.all()
|
||
for r in routers:
|
||
try:
|
||
backup_name = f"{r.name}_{r.id}_{datetime.now():%Y%m%d_%H%M%S}"
|
||
local_path = ssh_backup(r, backup_name)
|
||
b = Backup(router_id=r.id, file_path=local_path, backup_type='binary')
|
||
db.session.add(b)
|
||
db.session.commit()
|
||
notify(s, f"Auto-binary backup dla routera {r.name} OK", True)
|
||
log_operation(f"Auto-binary backup dla routera {r.name} wykonany pomyślnie at {datetime.utcnow()}.")
|
||
except Exception as e:
|
||
notify(s, f"Auto-binary backup dla routera {r.name} FAILED: {e}", False)
|
||
log_operation(f"Auto-binary backup dla routera {r.name} FAILED at {datetime.utcnow()}: {e}")
|
||
|
||
def schedule_auto_binary_backup_job():
|
||
s = get_settings()
|
||
try:
|
||
scheduler.remove_job("auto_binary_backup_job")
|
||
except Exception:
|
||
pass
|
||
if s.binary_cron:
|
||
trigger = get_valid_cron_trigger(s.binary_cron, "15 2 * * *")
|
||
scheduler.add_job(func=scheduled_auto_binary_backup, trigger=trigger, id="auto_binary_backup_job", replace_existing=True)
|
||
print(f"[DEBUG] schedule_auto_binary_backup_job -> cron_schedule={s.binary_cron}")
|
||
else:
|
||
scheduler.add_job(func=scheduled_auto_binary_backup, trigger='interval', days=s.auto_backup_interval_days, id="auto_binary_backup_job", replace_existing=True)
|
||
print(f"[DEBUG] schedule_auto_binary_backup_job -> interval days={s.auto_backup_interval_days}")
|
||
|
||
def schedule_auto_backup_job():
|
||
s = get_settings()
|
||
if s.cron_schedule:
|
||
trigger = get_valid_cron_trigger(s.cron_schedule, "0 */12 * * *")
|
||
scheduler.add_job(func=scheduled_auto_backup, trigger=trigger, id="auto_backup_job", replace_existing=True)
|
||
print(f"[DEBUG] schedule_auto_backup_job -> cron_schedule={s.cron_schedule}")
|
||
else:
|
||
scheduler.add_job(func=scheduled_auto_backup, trigger='interval', days=s.auto_backup_interval_days, id="auto_backup_job", replace_existing=True)
|
||
print(f"[DEBUG] schedule_auto_backup_job -> interval days={s.auto_backup_interval_days}")
|
||
|
||
def schedule_retention_job():
|
||
s = get_settings()
|
||
try:
|
||
scheduler.remove_job("retention_job")
|
||
except Exception:
|
||
pass
|
||
if s.retention_cron:
|
||
trigger = CronTrigger.from_crontab(s.retention_cron)
|
||
scheduler.add_job(func=cleanup_old_backups, trigger=trigger, id="retention_job", replace_existing=True)
|
||
print(f"[DEBUG] schedule_retention_job -> cron_schedule={s.retention_cron}")
|
||
else:
|
||
scheduler.add_job(func=cleanup_old_backups, trigger='interval', days=s.backup_retention_days, id="retention_job", replace_existing=True)
|
||
print(f"[DEBUG] schedule_retention_job -> interval days={s.backup_retention_days}")
|
||
|
||
def schedule_auto_export_job():
|
||
s = get_settings()
|
||
try:
|
||
scheduler.remove_job("auto_export_job")
|
||
except Exception:
|
||
pass
|
||
if not s.enable_auto_export:
|
||
print("[DEBUG] schedule_auto_export_job -> auto export disabled")
|
||
return
|
||
if s.export_cron:
|
||
trigger = get_valid_cron_trigger(s.export_cron, "0 */12 * * *")
|
||
else:
|
||
trigger = CronTrigger.from_crontab("0 */12 * * *")
|
||
scheduler.add_job(func=scheduled_auto_backup, trigger=trigger, id="auto_export_job", replace_existing=True)
|
||
cron_used = s.export_cron if s.export_cron else "0 */12 * * *"
|
||
print(f"[DEBUG] schedule_auto_export_job -> cron_schedule={cron_used}")
|
||
|
||
###############################################################################
|
||
# Konfiguracja APScheduler - harmonogram zadań
|
||
###############################################################################
|
||
scheduler = BackgroundScheduler()
|
||
|
||
# Dodajemy dwa zadania cykliczne:
|
||
# 1) Czyszczenie starych backupów (default co 1 dzień)
|
||
# 2) Auto-backup (default co 1 dzień)
|
||
|
||
# Dodajemy z unikalnymi ID, co ułatwia re-schedulowanie
|
||
scheduler.add_job(func=cleanup_old_backups, trigger='interval', days=1, id="cleanup_job")
|
||
scheduler.add_job(func=scheduled_auto_backup, trigger='interval', days=1, id="auto_backup_job")
|
||
|
||
scheduler.start()
|
||
# Sprzątanie przy zamykaniu
|
||
atexit.register(lambda: scheduler.shutdown())
|
||
|
||
def reschedule_jobs():
|
||
s = get_settings()
|
||
try:
|
||
scheduler.remove_job("auto_export_job")
|
||
except Exception:
|
||
pass
|
||
try:
|
||
scheduler.remove_job("retention_job")
|
||
except Exception:
|
||
pass
|
||
try:
|
||
scheduler.remove_job("auto_binary_backup_job")
|
||
except Exception:
|
||
pass
|
||
schedule_auto_export_job()
|
||
schedule_retention_job()
|
||
schedule_auto_binary_backup_job()
|
||
|
||
###############################################################################
|
||
# Filtr Jinja2 - basename
|
||
###############################################################################
|
||
@app.template_filter('basename')
|
||
def basename_filter(path):
|
||
return os.path.basename(path) if path else path
|
||
|
||
@app.template_filter('filesize')
|
||
def filesize_filter(path):
|
||
if not path or not isinstance(path, (str, int, float)):
|
||
return "0 B"
|
||
# Jeśli wartość jest liczbą, traktujemy ją jako rozmiar w bajtach
|
||
if isinstance(path, (int, float)):
|
||
size = path
|
||
else:
|
||
if not os.path.isabs(path):
|
||
path = os.path.join(DATA_DIR, path)
|
||
if not os.path.exists(path):
|
||
return "0 B"
|
||
size = os.path.getsize(path)
|
||
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
|
||
if size < 1024:
|
||
return f"{size:.2f} {unit}"
|
||
size /= 1024
|
||
return f"{size:.2f} PB"
|
||
|
||
@app.template_filter('resolve_dns')
|
||
def resolve_dns_filter(host):
|
||
try:
|
||
# Zwraca nazwę hosta uzyskaną z reverse DNS
|
||
resolved = socket.gethostbyaddr(host)[0]
|
||
return resolved
|
||
except Exception:
|
||
return host
|
||
|
||
def get_data_folder_size():
|
||
total_size = 0
|
||
for dirpath, dirnames, filenames in os.walk(DATA_DIR):
|
||
for f in filenames:
|
||
fp = os.path.join(dirpath, f)
|
||
try:
|
||
total_size += os.path.getsize(fp)
|
||
except OSError:
|
||
pass
|
||
return total_size
|
||
|
||
###############################################################################
|
||
# ROUTES
|
||
###############################################################################
|
||
@app.route('/')
|
||
def index():
|
||
if get_current_user():
|
||
return redirect(url_for('dashboard'))
|
||
return render_template('index.html')
|
||
|
||
# Globalna zmienna z czasem uruchomienia aplikacji
|
||
app_start_time = datetime.now()
|
||
|
||
def get_data_folder_size():
|
||
total_size = 0
|
||
for dirpath, dirnames, filenames in os.walk(DATA_DIR):
|
||
for f in filenames:
|
||
fp = os.path.join(dirpath, f)
|
||
try:
|
||
total_size += os.path.getsize(fp)
|
||
except OSError:
|
||
pass
|
||
return total_size
|
||
|
||
@app.route('/dashboard')
|
||
@login_required
|
||
def dashboard():
|
||
user = get_current_user()
|
||
routers_count = Router.query.filter_by(owner_id=user.id).count()
|
||
export_count = db.session.query(Backup).join(Router).filter(
|
||
Router.owner_id == user.id, Backup.backup_type == 'export'
|
||
).count()
|
||
binary_count = db.session.query(Backup).join(Router).filter(
|
||
Router.owner_id == user.id, Backup.backup_type == 'binary'
|
||
).count()
|
||
total_backups = export_count + binary_count
|
||
logs = OperationLog.query.order_by(OperationLog.timestamp.desc()).limit(10).all()
|
||
|
||
disk = shutil.disk_usage(DATA_DIR)
|
||
folder_used = get_data_folder_size()
|
||
uptime = datetime.now() - app_start_time
|
||
disk_usage_percent = (folder_used / disk.total) * 100 if disk.total > 0 else 0
|
||
|
||
s = get_settings()
|
||
|
||
success_ops = OperationLog.query.filter(OperationLog.message.ilike("%OK%")).count()
|
||
failure_ops = OperationLog.query.filter(OperationLog.message.ilike("%FAILED%")).count()
|
||
current_time = datetime.now().astimezone()
|
||
|
||
return render_template('dashboard.html',
|
||
user=user,
|
||
routers_count=routers_count,
|
||
export_count=export_count,
|
||
binary_count=binary_count,
|
||
total_backups=total_backups,
|
||
logs=logs,
|
||
disk_total=disk.total,
|
||
disk_used=folder_used,
|
||
disk_free=disk.free,
|
||
disk_usage_percent=disk_usage_percent,
|
||
uptime=uptime,
|
||
success_ops=success_ops,
|
||
failure_ops=failure_ops,
|
||
current_time=current_time,
|
||
settings=s)
|
||
|
||
@app.route('/advanced_schedule', methods=['GET', 'POST'])
|
||
@login_required
|
||
def advanced_schedule():
|
||
s = get_settings()
|
||
if request.method == 'POST':
|
||
s.retention_cron = request.form.get('retention_cron', '').strip()
|
||
s.binary_cron = request.form.get('binary_cron', '').strip() # nowe pole
|
||
s.export_cron = request.form.get('export_cron', '').strip()
|
||
# Checkbox: jeśli nie jest zaznaczony, nie pojawi się w formularzu, więc ustawiamy na False
|
||
s.enable_auto_export = True if request.form.get('enable_auto_export') == 'on' else False
|
||
db.session.commit()
|
||
reschedule_jobs() # Aktualizuje harmonogram zadań
|
||
flash("Zaawansowane ustawienia harmonogramu zostały zapisane.")
|
||
return redirect(url_for('advanced_schedule'))
|
||
return render_template('advanced_schedule.html', settings=s)
|
||
|
||
@app.route('/toggle_dark_mode')
|
||
def toggle_dark_mode():
|
||
current_mode = session.get('dark_mode', False)
|
||
session['dark_mode'] = not current_mode
|
||
return redirect(request.referrer or url_for('index'))
|
||
|
||
@app.route('/register', methods=['GET','POST'])
|
||
def register():
|
||
if request.method=='POST':
|
||
username = request.form['username']
|
||
password = request.form['password']
|
||
if User.query.filter_by(username=username).first():
|
||
flash("Użytkownik już istnieje")
|
||
return redirect(url_for('register'))
|
||
u = User(username=username, password_hash=bcrypt.hash(password))
|
||
db.session.add(u)
|
||
db.session.commit()
|
||
flash("Zarejestrowano, możesz się zalogować.")
|
||
return redirect(url_for('login'))
|
||
return render_template('register.html')
|
||
|
||
@app.route('/login', methods=['GET','POST'])
|
||
def login():
|
||
if request.method=='POST':
|
||
username = request.form['username']
|
||
password = request.form['password']
|
||
u = User.query.filter_by(username=username).first()
|
||
if u and u.check_password(password):
|
||
session['user_id'] = u.id
|
||
flash("Zalogowano pomyślnie.")
|
||
return redirect(url_for('dashboard'))
|
||
else:
|
||
flash("Nieprawidłowe dane logowania.")
|
||
return redirect(url_for('login'))
|
||
return render_template('login.html')
|
||
|
||
@app.route('/logout')
|
||
def logout():
|
||
session.clear()
|
||
flash("Wylogowano.")
|
||
return redirect(url_for('index'))
|
||
|
||
@app.route('/routers')
|
||
@login_required
|
||
def routers_list():
|
||
user = get_current_user()
|
||
routers = Router.query.filter_by(owner_id=user.id).order_by(Router.created_at.desc()).all()
|
||
return render_template('routers.html', user=user, routers=routers)
|
||
|
||
|
||
|
||
@app.route('/routers/add', methods=['GET','POST'])
|
||
@login_required
|
||
def add_router():
|
||
if request.method=='POST':
|
||
user = get_current_user()
|
||
name = request.form['name'].strip()
|
||
# Walidacja nazwy: tylko litery, cyfry, - i _
|
||
if not ALLOWED_NAME_REGEX.match(name):
|
||
flash("Nazwa urządzenia może zawierać wyłącznie litery, cyfry, myślniki (-) oraz podkreślenia (_).")
|
||
return redirect(url_for('add_router'))
|
||
host = request.form['host'].strip()
|
||
port = request.form.get('port','22').strip()
|
||
ssh_user = request.form['ssh_user'].strip()
|
||
ssh_key = request.form['ssh_key'].strip()
|
||
ssh_password = request.form['ssh_password'].strip()
|
||
r = Router(owner_id=user.id, name=name, host=host, port=int(port),
|
||
ssh_user=ssh_user, ssh_key=ssh_key, ssh_password=ssh_password)
|
||
db.session.add(r)
|
||
db.session.commit()
|
||
flash("Dodano router.")
|
||
return redirect(url_for('routers_list'))
|
||
return render_template('add_router.html')
|
||
|
||
@app.route('/router/<int:router_id>')
|
||
@login_required
|
||
def router_details(router_id):
|
||
user = get_current_user()
|
||
router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
|
||
if not router:
|
||
flash("Brak dostępu do routera.")
|
||
return redirect(url_for('routers_list'))
|
||
all_b = Backup.query.filter_by(router_id=router.id).order_by(Backup.created_at.desc()).all()
|
||
export_b = [x for x in all_b if x.backup_type=='export']
|
||
bin_b = [x for x in all_b if x.backup_type=='binary']
|
||
#return render_template('router_details_v2.html', router=router, export_backups=export_b, binary_backups=bin_b)
|
||
view = request.args.get('view', 'v2')
|
||
if view == 'v2':
|
||
return render_template('router_details_v2.html', router=router, export_backups=export_b, binary_backups=bin_b, current_view='v2')
|
||
else:
|
||
return render_template('router_details.html', router=router, export_backups=export_b, binary_backups=bin_b, current_view='v1')
|
||
|
||
@app.route('/router/<int:router_id>/export', methods=['POST'])
|
||
@login_required
|
||
def router_export(router_id):
|
||
user = get_current_user()
|
||
router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
|
||
if not router:
|
||
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
|
||
return {"status": "error", "message": "Brak routera."}, 400
|
||
flash("Brak routera.")
|
||
return redirect(url_for('routers_list'))
|
||
try:
|
||
backup_name = f"{router.name}_{router.id}_{datetime.now():%Y%m%d_%H%M%S}"
|
||
export_data = ssh_export(router)
|
||
filename = f"{backup_name}.rsc"
|
||
filepath = os.path.join(DATA_DIR, filename)
|
||
with open(filepath, 'w', encoding='utf-8') as f:
|
||
f.write(export_data)
|
||
b = Backup(router_id=router.id, file_path=filepath, backup_type='export')
|
||
db.session.add(b)
|
||
db.session.commit()
|
||
notify(get_settings(), f"Export {router.name} OK", True)
|
||
log_operation(f"Export wykonany dla routera {router.name} at {datetime.utcnow()}")
|
||
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
|
||
return {"status": "success", "message": "Eksport zakończony."}
|
||
flash("Eksport zakończony.")
|
||
except Exception as e:
|
||
notify(get_settings(), f"Export {router.name} FAIL: {e}", False)
|
||
log_operation(f"Export dla routera {router.name} FAILED: {e}")
|
||
if request.headers.get("X-Requested-With") == "XMLHttpRequest":
|
||
return {"status": "error", "message": str(e)}, 500
|
||
flash(f"Błąd: {e}")
|
||
return redirect(url_for('router_details', router_id=router.id))
|
||
|
||
@app.route('/router/<int:router_id>/backup', methods=['POST'])
|
||
@login_required
|
||
def router_backup(router_id):
|
||
user = get_current_user()
|
||
router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
|
||
if not router:
|
||
flash("Brak routera.")
|
||
return redirect(url_for('routers_list'))
|
||
try:
|
||
backup_name = f"{router.name}_{router.id}_{datetime.now():%Y%m%d_%H%M%S}"
|
||
local_path = ssh_backup(router, backup_name)
|
||
b = Backup(router_id=router.id, file_path=local_path, backup_type='binary')
|
||
db.session.add(b)
|
||
db.session.commit()
|
||
notify(get_settings(), f"Backup {router.name} OK", True)
|
||
log_operation(f"Backup binarny wykonany dla routera {router.name} at {datetime.utcnow()}")
|
||
flash("Backup binarny zakończony.")
|
||
except Exception as e:
|
||
notify(get_settings(), f"Backup {router.name} FAIL: {e}", False)
|
||
log_operation(f"Backup dla routera {router.name} FAILED: {e}")
|
||
flash(f"Błąd: {e}")
|
||
return redirect(url_for('router_details', router_id=router.id))
|
||
|
||
@app.route('/router/<int:router_id>/upload_backup/<int:backup_id>', methods=['POST'])
|
||
@login_required
|
||
def upload_backup(router_id, backup_id):
|
||
user = get_current_user()
|
||
router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
|
||
if not router:
|
||
flash("Brak routera.")
|
||
return redirect(url_for('routers_list'))
|
||
b = Backup.query.filter_by(id=backup_id, router_id=router.id, backup_type='binary').first()
|
||
if not b:
|
||
flash("Nie znaleziono backupu binarnego.")
|
||
return redirect(url_for('router_details', router_id=router.id))
|
||
try:
|
||
ssh_upload_backup(router, b.file_path)
|
||
log_operation(f"Backup {os.path.basename(b.file_path)} wgrany do routera {router.name} at {datetime.utcnow()}")
|
||
flash("Plik backupu wgrany do routera.")
|
||
except Exception as e:
|
||
flash(f"Błąd wgrywania: {e}")
|
||
#return redirect(url_for('router_details', router_id=router.id))
|
||
next_url = request.form.get('next') or request.referrer or url_for('dashboard')
|
||
return redirect(next_url)
|
||
|
||
@app.route('/backup/delete/<int:backup_id>', methods=['POST'])
|
||
@login_required
|
||
def delete_backup(backup_id):
|
||
user = get_current_user()
|
||
b = Backup.query.get(backup_id)
|
||
if not b or b.router.owner_id != user.id:
|
||
flash("Brak dostępu do backupu.")
|
||
return redirect(url_for('dashboard'))
|
||
try:
|
||
os.remove(b.file_path)
|
||
except FileNotFoundError:
|
||
pass
|
||
db.session.delete(b)
|
||
db.session.commit()
|
||
flash("Backup usunięty.")
|
||
#return redirect(url_for('router_details', router_id=b.router_id))
|
||
next_url = request.form.get('next') or url_for('dashboard')
|
||
return redirect(next_url)
|
||
|
||
@app.route('/router/delete/<int:router_id>', methods=['POST'])
|
||
@login_required
|
||
def delete_router(router_id):
|
||
user = get_current_user()
|
||
r = Router.query.filter_by(id=router_id, owner_id=user.id).first()
|
||
if not r:
|
||
flash("Brak dostępu do routera.")
|
||
return redirect(url_for('routers_list'))
|
||
for b in r.backups:
|
||
try:
|
||
os.remove(b.file_path)
|
||
except FileNotFoundError:
|
||
pass
|
||
db.session.delete(b)
|
||
db.session.delete(r)
|
||
db.session.commit()
|
||
log_operation(f"Router {r.name} usunięty at {datetime.utcnow()}")
|
||
flash("Router usunięty.")
|
||
return redirect(url_for('routers_list'))
|
||
|
||
@app.route('/routers/all_export', methods=['POST'], endpoint="export_all_routers")
|
||
@login_required
|
||
def export_all_routers():
|
||
user = get_current_user()
|
||
routers = Router.query.filter_by(owner_id=user.id).all()
|
||
messages = []
|
||
for r in routers:
|
||
try:
|
||
backup_name = f"{r.name}_{r.id}_{datetime.now():%Y%m%d_%H%M%S}"
|
||
export_data = ssh_export(r)
|
||
filename = f"{backup_name}.rsc"
|
||
filepath = os.path.join(DATA_DIR, filename)
|
||
with open(filepath, 'w', encoding='utf-8') as f:
|
||
f.write(export_data)
|
||
b = Backup(router_id=r.id, file_path=filepath, backup_type='export')
|
||
db.session.add(b)
|
||
db.session.commit()
|
||
log_operation(f"Export dla routera {r.name} OK at {datetime.utcnow()}")
|
||
messages.append(f"{r.name} OK")
|
||
except Exception as e:
|
||
log_operation(f"Export dla routera {r.name} FAILED: {e}")
|
||
messages.append(f"{r.name} FAIL: {e}")
|
||
flash(" | ".join(messages))
|
||
return redirect(url_for('dashboard'))
|
||
|
||
# Nowa podstrona: diff selector
|
||
@app.route('/diff_selector', methods=['GET', 'POST'])
|
||
@login_required
|
||
def diff_selector():
|
||
user = get_current_user()
|
||
backups = Backup.query.join(Router).filter(Router.owner_id==user.id, Backup.backup_type=='export').order_by(Backup.created_at.desc()).all()
|
||
if request.method == 'POST':
|
||
bid1 = request.form.get('backup1')
|
||
bid2 = request.form.get('backup2')
|
||
if bid1 and bid2:
|
||
return redirect(url_for('diff_view', backup_id1=bid1, backup_id2=bid2))
|
||
else:
|
||
flash("Wybierz dwa backupy do porównania.")
|
||
return render_template('diff_selector.html', backups=backups)
|
||
|
||
@app.route('/all_files')
|
||
@login_required
|
||
def all_files():
|
||
user = get_current_user()
|
||
query = Backup.query.join(Router).filter(Router.owner_id == user.id)
|
||
|
||
# Filtrowanie – wyszukiwanie po nazwie pliku (zastosowanie filtru "basename")
|
||
search = request.args.get('search', '')
|
||
if search:
|
||
query = query.filter(Backup.file_path.ilike(f"%{search}%"))
|
||
|
||
# Sortowanie – sort_by i order
|
||
sort_by = request.args.get('sort_by', 'created_at')
|
||
order = request.args.get('order', 'desc')
|
||
if sort_by not in ['created_at', 'file_path']:
|
||
sort_by = 'created_at'
|
||
sort_column = getattr(Backup, sort_by)
|
||
if order == 'asc':
|
||
query = query.order_by(sort_column.asc())
|
||
else:
|
||
query = query.order_by(sort_column.desc())
|
||
|
||
files = query.all()
|
||
total_size = 0
|
||
for f in files:
|
||
full_path = f.file_path if os.path.isabs(f.file_path) else os.path.join(DATA_DIR, f.file_path)
|
||
try:
|
||
total_size += os.path.getsize(full_path)
|
||
except OSError:
|
||
pass
|
||
return render_template('all_files.html', files=files, total_size=total_size, search=search, sort_by=sort_by, order=order)
|
||
|
||
@app.route('/view_export/<int:backup_id>')
|
||
@login_required
|
||
def view_export(backup_id):
|
||
user = get_current_user()
|
||
b = Backup.query.get(backup_id)
|
||
if not b or b.router.owner_id != user.id:
|
||
flash("Brak dostępu do backupu.")
|
||
return redirect(url_for('all_files'))
|
||
if b.backup_type != 'export':
|
||
flash("Wybrany backup nie jest plikiem eksportu.")
|
||
return redirect(url_for('all_files'))
|
||
try:
|
||
with open(b.file_path, 'r', encoding='utf-8') as f:
|
||
content = f.read()
|
||
except Exception as e:
|
||
flash("Błąd odczytu pliku: " + str(e))
|
||
return redirect(url_for('all_files'))
|
||
next_url = request.args.get("next") or request.referrer or url_for('all_files')
|
||
return render_template('view_export.html', backup=b, content=content, next_url=next_url)
|
||
|
||
@app.route('/send_export_email/<int:backup_id>', methods=['POST'])
|
||
@login_required
|
||
def send_export_email(backup_id):
|
||
user = get_current_user()
|
||
b = Backup.query.get(backup_id)
|
||
if not b or b.router.owner_id != user.id:
|
||
flash("Brak dostępu do backupu.")
|
||
return redirect(url_for('dashboard'))
|
||
s = get_settings()
|
||
if not (s.smtp_host and s.smtp_login and s.smtp_password):
|
||
flash("Nie skonfigurowano ustawień SMTP w panelu.")
|
||
return redirect(url_for('settings_view'))
|
||
subject = f"RouterOS Export: {os.path.basename(b.file_path)}"
|
||
body = f"Przesyłam eksport {os.path.basename(b.file_path)} z routera {b.router.name}."
|
||
if send_mail_with_attachment(s.smtp_host, s.smtp_port, s.smtp_login, s.smtp_password,
|
||
s.smtp_login, subject, body, b.file_path):
|
||
flash("Wysłano eksport mailem.")
|
||
else:
|
||
flash("Błąd wysyłki mailowej.")
|
||
#return redirect(url_for('router_details', router_id=b.router_id))
|
||
next_url = request.form.get('next') or request.referrer or url_for('dashboard')
|
||
return redirect(next_url)
|
||
|
||
@app.route('/download/<path:filename>')
|
||
@login_required
|
||
def download_file(filename):
|
||
# Upewnij się, że pobierany plik znajduje się w katalogu DATA_DIR
|
||
safe_filename = os.path.basename(filename) # zapobiega path traversal
|
||
safe_path = os.path.join(DATA_DIR, safe_filename)
|
||
if not os.path.exists(safe_path):
|
||
flash("Plik nie istnieje.")
|
||
return redirect(url_for('all_files'))
|
||
return send_file(safe_path, as_attachment=True)
|
||
|
||
# Nowa podstrona: wybór backupów do diff (alternatywnie)
|
||
@app.route('/diff/selected', methods=['POST'])
|
||
@login_required
|
||
def diff_selected():
|
||
bid1 = request.form.get('backup1')
|
||
bid2 = request.form.get('backup2')
|
||
if bid1 and bid2:
|
||
return redirect(url_for('diff_view', backup_id1=bid1, backup_id2=bid2))
|
||
else:
|
||
flash("Musisz wybrać dwa pliki.")
|
||
return redirect(url_for('diff_selector'))
|
||
|
||
@app.route('/settings', methods=['GET','POST'], endpoint="settings_view")
|
||
@login_required
|
||
def settings_view():
|
||
user = get_current_user()
|
||
s = get_settings()
|
||
if request.method == 'POST':
|
||
# Usunięto pola backup oraz harmonogram CRON
|
||
s.global_ssh_key = request.form.get('global_ssh_key', '')
|
||
s.pushover_token = request.form.get('pushover_token', '')
|
||
s.pushover_userkey = request.form.get('pushover_userkey', '')
|
||
s.notify_failures_only = bool(request.form.get('notify_failures_only', False))
|
||
s.smtp_notifications_enabled = True if request.form.get('smtp_notifications_enabled') == 'on' else False
|
||
s.smtp_host = request.form.get('smtp_host', '')
|
||
s.smtp_port = int(request.form.get('smtp_port', '587'))
|
||
s.smtp_login = request.form.get('smtp_login', '')
|
||
s.smtp_password = request.form.get('smtp_password', '')
|
||
db.session.commit()
|
||
reschedule_jobs() # Aktualizacja zadań – zadania dotyczące backupu/CRON zostaną teraz sterowane z /advanced_schedule
|
||
flash("Zapisano ustawienia.")
|
||
return redirect(url_for('settings_view'))
|
||
return render_template('settings.html', settings=s)
|
||
|
||
@app.route('/router/<int:router_id>/edit', methods=['GET','POST'])
|
||
@login_required
|
||
def edit_router(router_id):
|
||
user = get_current_user()
|
||
router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
|
||
if not router:
|
||
flash("Brak routera.")
|
||
return redirect(url_for('routers_list'))
|
||
if request.method == 'POST':
|
||
# Przytnij wejścia i waliduj nazwę
|
||
new_name = request.form['name'].strip()
|
||
if not ALLOWED_NAME_REGEX.match(new_name):
|
||
flash("Nazwa urządzenia może zawierać wyłącznie litery, cyfry, myślniki (-) oraz podkreślenia (_).")
|
||
return redirect(url_for('edit_router', router_id=router_id))
|
||
router.name = new_name
|
||
router.host = request.form['host'].strip()
|
||
router.port = int(request.form.get('port', '22').strip())
|
||
router.ssh_user = request.form['ssh_user'].strip()
|
||
router.ssh_key = request.form['ssh_key'].strip()
|
||
router.ssh_password = request.form['ssh_password'].strip()
|
||
db.session.commit()
|
||
flash("Zapisano zmiany w routerze.")
|
||
return redirect(url_for('routers_list'))
|
||
return render_template('edit_router.html', router=router)
|
||
|
||
@app.route('/download_zip', methods=['POST'])
|
||
@login_required
|
||
def download_zip():
|
||
"""
|
||
Pobieranie wybranych backupów (lista backup_id) w formie pliku ZIP.
|
||
"""
|
||
user = get_current_user()
|
||
backup_ids = request.form.getlist('backup_id') # np. checkboxy "backup_id"
|
||
|
||
# Filtrujemy tylko backupy należące do usera
|
||
backups = Backup.query.join(Router).filter(Router.owner_id==user.id, Backup.id.in_(backup_ids)).all()
|
||
if not backups:
|
||
flash("Nie wybrano żadnych plików lub brak dostępu.")
|
||
return redirect(url_for('dashboard'))
|
||
|
||
# Tworzymy ZIP w pamięci
|
||
mem_zip = io.BytesIO()
|
||
with zipfile.ZipFile(mem_zip, 'w') as zf:
|
||
for b in backups:
|
||
arcname = os.path.basename(b.file_path)
|
||
if not os.path.isfile(b.file_path):
|
||
continue
|
||
zf.write(b.file_path, arcname=arcname)
|
||
mem_zip.seek(0)
|
||
|
||
return send_file(mem_zip, mimetype='application/zip',
|
||
as_attachment=True,
|
||
download_name='backup_export.zip')
|
||
|
||
@app.route('/send_by_email/<int:backup_id>', methods=['POST'])
|
||
@login_required
|
||
def send_by_email(backup_id):
|
||
user = get_current_user()
|
||
b = Backup.query.get(backup_id)
|
||
if not b or b.router.owner_id != user.id:
|
||
flash("Brak dostępu do backupu.")
|
||
return redirect(url_for('dashboard'))
|
||
settings = get_settings()
|
||
if not (settings.smtp_host and settings.smtp_login and settings.smtp_password):
|
||
flash("Nie skonfigurowano ustawień SMTP w panelu.")
|
||
return redirect(url_for('settings_view'))
|
||
subject = f"RouterOS Backup: {os.path.basename(b.file_path)}"
|
||
body = f"Przesyłam backup {os.path.basename(b.file_path)} z routera {b.router.name}."
|
||
if send_mail_with_attachment(settings.smtp_host, settings.smtp_port,
|
||
settings.smtp_login, settings.smtp_password,
|
||
settings.smtp_login, subject, body, b.file_path):
|
||
flash("Wysłano plik mailem.")
|
||
else:
|
||
flash("Błąd wysyłki mailowej.")
|
||
#return redirect(url_for('router_details', router_id=b.router_id))
|
||
next_url = request.form.get('next') or request.referrer or url_for('dashboard')
|
||
return redirect(next_url)
|
||
|
||
@app.route('/diff/<int:backup_id1>/<int:backup_id2>')
|
||
@login_required
|
||
def diff_view(backup_id1, backup_id2):
|
||
user = get_current_user()
|
||
b1 = Backup.query.get(backup_id1)
|
||
b2 = Backup.query.get(backup_id2)
|
||
if not b1 or not b2:
|
||
flash("Brak backupów.")
|
||
return redirect(url_for('routers_list'))
|
||
if b1.router.owner_id != user.id or b2.router.owner_id != user.id:
|
||
flash("Brak dostępu.")
|
||
return redirect(url_for('routers_list'))
|
||
if b1.backup_type != 'export' or b2.backup_type != 'export':
|
||
flash("Diff można wykonać tylko dla plików 'export'.")
|
||
return redirect(url_for('router_details', router_id=b1.router_id))
|
||
try:
|
||
with open(b1.file_path, 'r', encoding='utf-8') as f1:
|
||
file1 = f1.read().splitlines()
|
||
with open(b2.file_path, 'r', encoding='utf-8') as f2:
|
||
file2 = f2.read().splitlines()
|
||
except Exception as e:
|
||
flash(f"Błąd czytania plików: {e}")
|
||
return redirect(url_for('router_details', router_id=b1.router_id))
|
||
|
||
diff_lines = list(difflib.unified_diff(
|
||
file1, file2,
|
||
fromfile=os.path.basename(b1.file_path),
|
||
tofile=os.path.basename(b2.file_path),
|
||
lineterm=''
|
||
))
|
||
diff_text = "\n".join(diff_lines)
|
||
|
||
return render_template('diff.html', diff_text=diff_text, backup1=b1, backup2=b2)
|
||
|
||
@app.route('/routers/all_backup', methods=['POST'])
|
||
@login_required
|
||
def backup_all_routers():
|
||
user = get_current_user()
|
||
routers = Router.query.filter_by(owner_id=user.id).all()
|
||
messages = []
|
||
for r in routers:
|
||
try:
|
||
backup_name = f"{r.name}_{r.id}_{datetime.now():%Y%m%d_%H%M%S}"
|
||
local_path = ssh_backup(r, backup_name)
|
||
b = Backup(router_id=r.id, file_path=local_path, backup_type='binary')
|
||
db.session.add(b)
|
||
db.session.commit()
|
||
log_operation(f"Backup binarny dla routera {r.name} OK at {datetime.utcnow()}")
|
||
messages.append(f"{r.name} OK")
|
||
except Exception as e:
|
||
log_operation(f"Backup binarny dla routera {r.name} FAILED at {datetime.utcnow()}: {e}")
|
||
messages.append(f"{r.name} FAIL: {e}")
|
||
flash(" | ".join(messages))
|
||
return redirect(url_for('dashboard'))
|
||
|
||
@app.route('/mass_actions', methods=['POST'])
|
||
@login_required
|
||
def mass_actions():
|
||
user = get_current_user()
|
||
backup_ids = request.form.getlist('backup_id')
|
||
if not backup_ids:
|
||
flash("Nie wybrano żadnych backupów.")
|
||
return redirect(url_for('all_files'))
|
||
action = request.form.get('action')
|
||
if action == 'download':
|
||
backups = Backup.query.join(Router).filter(Router.owner_id==user.id, Backup.id.in_(backup_ids)).all()
|
||
if not backups:
|
||
flash("Brak dostępu lub nie wybrano żadnych backupów.")
|
||
return redirect(url_for('all_files'))
|
||
mem_zip = io.BytesIO()
|
||
with zipfile.ZipFile(mem_zip, 'w') as zf:
|
||
for b in backups:
|
||
arcname = os.path.basename(b.file_path)
|
||
if not os.path.isfile(b.file_path):
|
||
continue
|
||
zf.write(b.file_path, arcname=arcname)
|
||
mem_zip.seek(0)
|
||
return send_file(mem_zip, mimetype='application/zip', as_attachment=True, download_name='backup_export.zip')
|
||
elif action == 'delete':
|
||
backups = Backup.query.join(Router).filter(Router.owner_id==user.id, Backup.id.in_(backup_ids)).all()
|
||
if not backups:
|
||
flash("Brak dostępu lub nie wybrano żadnych backupów.")
|
||
return redirect(url_for('all_files'))
|
||
for b in backups:
|
||
try:
|
||
os.remove(b.file_path)
|
||
except Exception:
|
||
pass
|
||
db.session.delete(b)
|
||
db.session.commit()
|
||
flash("Wybrane backupy zostały usunięte.")
|
||
return redirect(url_for('all_files'))
|
||
else:
|
||
flash("Nieprawidłowa akcja.")
|
||
return redirect(url_for('all_files'))
|
||
|
||
@app.route('/health', methods=['GET'])
|
||
def healthcheck():
|
||
try:
|
||
db.session.execute(text('SELECT 1'))
|
||
status = 'ok'
|
||
except Exception as e:
|
||
status = 'error'
|
||
return jsonify({
|
||
"status": status,
|
||
"timestamp": datetime.utcnow().isoformat() + "Z"
|
||
})
|
||
|
||
@app.route('/change_password', methods=['GET', 'POST'])
|
||
@login_required
|
||
def change_password():
|
||
user = get_current_user()
|
||
if request.method == 'POST':
|
||
current_password = request.form.get('current_password')
|
||
new_password = request.form.get('new_password')
|
||
confirm_password = request.form.get('confirm_password')
|
||
|
||
if not user.check_password(current_password):
|
||
flash("Obecne hasło jest nieprawidłowe.")
|
||
return redirect(url_for('change_password'))
|
||
if new_password != confirm_password:
|
||
flash("Nowe hasło i potwierdzenie nie są zgodne.")
|
||
return redirect(url_for('change_password'))
|
||
|
||
user.password_hash = bcrypt.hash(new_password)
|
||
db.session.commit()
|
||
flash("Hasło zostało zmienione pomyślnie.")
|
||
return redirect(url_for('dashboard'))
|
||
return render_template('change_password.html')
|
||
|
||
@app.route('/router/<int:router_id>/test_connection', methods=['GET'])
|
||
@login_required
|
||
def test_connection(router_id):
|
||
user = get_current_user()
|
||
router = Router.query.filter_by(id=router_id, owner_id=user.id).first()
|
||
if not router:
|
||
flash("Brak dostępu do routera.")
|
||
return redirect(url_for('routers_list'))
|
||
try:
|
||
result = ssh_test_connection(router)
|
||
except Exception as e:
|
||
flash(f"Błąd testu połączenia: {e}")
|
||
return redirect(url_for('routers_list'))
|
||
# Jeśli wywołanie zawiera parametr modal=1, zwracamy widok dla modalu
|
||
if request.args.get("modal") == "1":
|
||
return render_template("test_connection_modal.html", router=router, result=result)
|
||
return render_template("test_connection.html", router=router, result=result)
|
||
|
||
if __name__ == '__main__':
|
||
with app.app_context():
|
||
scheduler = BackgroundScheduler()
|
||
schedule_retention_job()
|
||
schedule_auto_export_job()
|
||
schedule_auto_binary_backup_job()
|
||
scheduler.start()
|
||
atexit.register(lambda: scheduler.shutdown())
|
||
app.run(host='0.0.0.0', port=5581, use_reloader=False, debug=True)
|