2025-02-26 14:29:36 +01:00

966 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# app.py
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
from werkzeug.security import generate_password_hash, check_password_hash
from apscheduler.schedulers.background import BackgroundScheduler
import librouteros
import threading
import time
import requests
import smtplib
from email.mime.text import MIMEText
from flask import current_app as app
from flask import render_template
import atexit
from datetime import timedelta, datetime
import requests
from bs4 import BeautifulSoup
from flask import render_template, flash
import logging
import re
try:
from dateutil import parser as date_parser
except ImportError:
date_parser = None
# Konfiguracja aplikacji
app = Flask(__name__)
app.config['SECRET_KEY'] = 'twoj-sekret-klucz'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# Konfiguracja Flask-Login
login_manager = LoginManager(app)
login_manager.login_view = 'login'
# MODELE BAZY DANYCH
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
devices = db.relationship('Device', backref='owner', lazy=True)
settings = db.relationship('Settings', uselist=False, backref='user')
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
class Device(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(120))
ip = db.Column(db.String(120), nullable=False)
port = db.Column(db.Integer, default=8728)
device_username = db.Column(db.String(120), nullable=False)
device_password = db.Column(db.String(120), nullable=False)
branch = db.Column(db.String(20), default="stable")
update_required = db.Column(db.Boolean, default=False)
last_check = db.Column(db.DateTime)
last_log = db.Column(db.Text)
current_version = db.Column(db.String(50))
current_firmware = db.Column(db.String(50))
use_ssl = db.Column(db.Boolean, default=False) # Czy używać SSL?
ssl_insecure = db.Column(db.Boolean, default=False) # Jeśli True nie weryfikować certyfikatu SSL
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
class Settings(db.Model):
id = db.Column(db.Integer, primary_key=True)
pushover_user_key = db.Column(db.String(255))
pushover_token = db.Column(db.String(255))
pushover_enabled = db.Column(db.Boolean, default=False)
smtp_server = db.Column(db.String(255))
smtp_port = db.Column(db.Integer)
smtp_username = db.Column(db.String(255))
smtp_password = db.Column(db.String(255))
email_notifications_enabled = db.Column(db.Boolean, default=False)
check_interval = db.Column(db.Integer, default=60)
log_retention_days = db.Column(db.Integer, default=30)
recipient_email = db.Column(db.String(120))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False, unique=True)
class Log(db.Model):
id = db.Column(db.Integer, primary_key=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
message = db.Column(db.Text)
device_id = db.Column(db.Integer, db.ForeignKey('device.id'))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
device = db.relationship('Device', backref='logs')
class UpdateHistory(db.Model):
id = db.Column(db.Integer, primary_key=True)
device_id = db.Column(db.Integer, db.ForeignKey('device.id'))
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
update_type = db.Column(db.String(50))
details = db.Column(db.Text)
device = db.relationship('Device', backref='update_histories')
class Anomaly(db.Model):
id = db.Column(db.Integer, primary_key=True)
device_id = db.Column(db.Integer, db.ForeignKey('device.id'), nullable=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
description = db.Column(db.Text)
resolved = db.Column(db.Boolean, default=False)
device = db.relationship('Device', backref='anomalies')
class ChangelogEntry(db.Model):
id = db.Column(db.Integer, primary_key=True)
version = db.Column(db.String(50), nullable=False)
details = db.Column(db.Text, nullable=False)
category = db.Column(db.String(10), nullable=False) # "6.x" or "7.x"
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
release_type = db.Column(db.String(10), nullable=False, default="stable")
# Inicjalizacja bazy (utworzyć bazę przy pierwszym uruchomieniu)
with app.app_context():
db.create_all()
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# FUNKCJE POWIADOMIEŃ
def send_pushover_notification(user, message):
# Sprawdzamy, czy użytkownik posiada ustawienia oraz wymagane pola
if not user.settings or not user.settings.pushover_enabled or not user.settings.pushover_user_key or not user.settings.pushover_token:
return
data = {
"token": user.settings.pushover_token, # Używamy pushover_token z ustawień
"user": user.settings.pushover_user_key,
"message": message
}
try:
r = requests.post("https://api.pushover.net/1/messages.json", data=data)
# Możesz dodać logowanie odpowiedzi, jeśli potrzebne
except Exception as e:
print("Błąd przy wysyłaniu powiadomienia Pushover:", e)
def send_email_notification(user, subject, message):
if not user.settings or not user.settings.email_notifications_enabled or not user.settings.smtp_server:
return
try:
html_body = get_email_template(subject, message)
msg = MIMEText(html_body, 'html')
msg["Subject"] = subject
msg["From"] = user.settings.smtp_username
# Używamy adresu z ustawień, jeśli został podany, lub domyślnie adresu z profilu użytkownika
to_email = user.settings.recipient_email if user.settings.recipient_email else user.email
msg["To"] = to_email
s = smtplib.SMTP(user.settings.smtp_server, user.settings.smtp_port)
s.starttls()
s.login(user.settings.smtp_username, user.settings.smtp_password)
s.sendmail(user.settings.smtp_username, [to_email], msg.as_string())
s.quit()
app.logger.debug("E-mail wysłany pomyślnie")
except Exception as e:
app.logger.error(f"Błąd przy wysyłaniu powiadomienia e-mail: {e}", exc_info=True)
# FUNKCJA SPRAWDZAJĄCA AKTUALIZACJE URZĄDZENIA
def check_device_update(device):
log_entries = []
update_available = False
current_version = None
current_firmware = None
try:
app.logger.debug(f"Connecting to device {device.ip}:{device.port} using SSL: {device.use_ssl}, ssl_verify: {not device.ssl_insecure}")
api = librouteros.connect(
host=device.ip,
username=device.device_username,
password=device.device_password,
port=device.port,
timeout=15,
ssl=device.use_ssl,
ssl_verify=not device.ssl_insecure
)
app.logger.debug(f"Connection established to {device.ip}")
# Pobranie informacji o tożsamości urządzenia
identity_resp = list(api('/system/identity/print'))
app.logger.debug(f"Identity response: {identity_resp}")
if identity_resp:
identity = identity_resp[0].get('name', '')
log_entries.append(f"Identity: {identity}")
# Pobranie wersji systemu
resource_resp = list(api('/system/resource/print'))
app.logger.debug(f"Resource response: {resource_resp}")
if resource_resp:
version = resource_resp[0].get('version', '')
current_version = version
log_entries.append(f"System Version: {version}")
# Pobranie informacji o routerboard (firmware)
board_resp = list(api('/system/routerboard/print'))
app.logger.debug(f"Routerboard response: {board_resp}")
if board_resp:
board_info = board_resp[0]
# Próba odczytania firmware z kilku możliwych kluczy
firmware = board_info.get('firmware', board_info.get('firmware-version', board_info.get('upgrade-firmware', 'N/A')))
current_firmware = firmware
log_entries.append(f"Firmware: {firmware}")
# Sprawdzenie dostępnych aktualizacji
log_entries.append("Checking for updates...")
list(api('/system/package/update/check-for-updates'))
for _ in range(10):
time.sleep(1)
status_resp = list(api('/system/package/update/print'))
app.logger.debug(f"Update status response: {status_resp}")
if status_resp:
status = status_resp[0].get('status', '').lower()
if 'checking' not in status:
log_entries.append(f"Update check completed. Status: {status}")
break
update_resp = list(api('/system/package/update/print'))
app.logger.debug(f"Update response: {update_resp}")
if update_resp:
for res in update_resp:
installed = res.get('installed-version', '')
latest = res.get('latest-version', '')
if latest and latest != installed:
log_entries.append(f"Updates available: {installed} -> {latest}")
update_available = True
else:
log_entries.append("No updates available.")
return "\n".join(log_entries), update_available, current_version, current_firmware
except Exception as e:
app.logger.error(f"Error connecting to device {device.ip}: {e}", exc_info=True)
return f"Error: {str(e)}", False, None, None
def get_email_template(subject, message):
return f"""
<html>
<head>
<style>
body {{
font-family: Arial, sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
}}
.container {{
max-width: 600px;
margin: 20px auto;
background-color: #ffffff;
padding: 20px;
border-radius: 5px;
box-shadow: 0 0 10px rgba(0,0,0,0.1);
}}
.header {{
background-color: #007bff;
color: #ffffff;
padding: 10px;
text-align: center;
border-radius: 5px 5px 0 0;
}}
.content {{
margin: 20px 0;
font-size: 16px;
line-height: 1.5;
}}
.footer {{
text-align: center;
font-size: 12px;
color: #777777;
border-top: 1px solid #dddddd;
padding-top: 10px;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h2>{subject}</h2>
</div>
<div class="content">
<p>{message}</p>
</div>
<div class="footer">
<p>Wiadomość wygenerowana automatycznie przez system RouterOS Backup.</p>
</div>
</div>
</body>
</html>
"""
def clean_old_changelogs():
with app.app_context():
cutoff_time = datetime.utcnow() - timedelta(days=7)
db.session.query(ChangelogEntry).filter(ChangelogEntry.timestamp < cutoff_time).delete()
db.session.commit()
def check_all_devices():
with app.app_context():
devices = Device.query.all()
for device in devices:
result, update_available, current_version, current_firmware = check_device_update(device)
device.last_log = result
device.last_check = datetime.utcnow()
device.update_required = update_available
device.current_version = current_version
device.current_firmware = current_firmware
db.session.commit()
log_entry = Log(message=result, device_id=device.id, user_id=device.user_id)
#log_message = f"Urządzenie {device.name or device.ip} - {result}"
#log_entry = Log(message=log_message, device_id=device.id, user_id=device.user_id)
db.session.add(log_entry)
db.session.commit()
# Powiadomienia, jeśli dostępna aktualizacja
if update_available:
user = device.owner
message = f"Urządzenie {device.name or device.ip} ma dostępną aktualizację."
send_pushover_notification(user, message)
send_email_notification(user, "Aktualizacja dostępna", message)
def bytes_to_human(n):
try:
n = int(n)
except Exception:
return n
i = 0
while n >= 1024 and i < 5:
n /= 1024.0
i += 1
units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
return f"{n:.2f} {units[i]}"
def clean_old_logs():
with app.app_context():
all_settings = Settings.query.all()
for setting in all_settings:
if setting.log_retention_days:
cutoff = datetime.utcnow() - timedelta(days=setting.log_retention_days)
# Usuwamy logi starsze niż cutoff dla danego użytkownika
Log.query.filter(Log.user_id == setting.user_id, Log.timestamp < cutoff).delete()
db.session.commit()
def parse_release_date(text):
"""
Próbuje wyłuskać datę z tekstu.
Najpierw używa regex, a w razie niepowodzenia dateutil (jeśli dostępny).
"""
date_match = re.search(r"\((\d{4})-([A-Za-z]{3})-(\d{2})", text)
if date_match:
try:
return datetime.strptime(f"{date_match.group(1)}-{date_match.group(2)}-{date_match.group(3)}", "%Y-%b-%d")
except Exception as e:
logging.error(f"Błąd parsowania daty przy użyciu strptime: {e}")
if date_parser:
try:
return date_parser.parse(text, fuzzy=True)
except Exception as e:
logging.error(f"Błąd parsowania daty przy użyciu dateutil: {e}")
return None
def get_release_type(version_text):
"""
Określa typ wydania na podstawie numeru wersji:
- Jeśli w tekście występuje "beta" zwraca "beta"
- Jeśli w tekście występuje "rc" zwraca "rc"
- W przeciwnym wypadku "stable"
"""
lower = version_text.lower()
if "beta" in lower:
return "beta"
elif "rc" in lower:
return "rc"
else:
return "stable"
def fetch_changelogs(force=False):
changelog_url = "https://mikrotik.com/download/changelogs"
current_date = datetime.utcnow()
try:
logging.info(f"Pobieranie changelogów z {changelog_url}...")
response = requests.get(changelog_url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
changelog_sections = soup.find_all("li", class_="accordion-navigation")
logging.info(f"Znaleziono {len(changelog_sections)} sekcji changelogów.")
new_entries = 0
for section in changelog_sections:
a_tag = section.find("a")
if not a_tag:
continue
raw_text = a_tag.get_text(strip=True)
match = re.match(r"([0-9.]+[a-zA-Z0-9]*)", raw_text)
version_text = match.group(1) if match else raw_text
# Pomijamy wersje, które nie zaczynają się od "6." lub "7."
if not (version_text.startswith("6.") or version_text.startswith("7.")):
logging.info(f"Pomijam wersję {version_text} nie jest 6.x ani 7.x")
continue
details_div = section.find("div", class_="content")
changelog_file_url = details_div.get("data-url") if details_div else None
if not changelog_file_url:
logging.warning(f"Brak URL changeloga dla wersji {version_text}")
continue
try:
changelog_response = requests.get(changelog_file_url, timeout=10)
changelog_response.raise_for_status()
changelog_lines = changelog_response.text.splitlines()
if not changelog_lines:
logging.warning(f"Pusty changelog dla wersji {version_text}")
continue
first_line = changelog_lines[0].strip()
release_date_dt = parse_release_date(first_line)
if release_date_dt is None:
logging.warning(f"Nie udało się wyłuskać daty dla wersji {version_text}, pomijam ten changelog")
continue # Pomijamy wpis bez daty
changelog_text = "\n".join(changelog_lines).strip()
except Exception as e:
logging.error(f"Błąd pobierania changeloga z {changelog_file_url}: {e}")
continue
# Filtrowanie: dla 7.x pomijamy wersje starsze niż 1 rok, dla 6.x starsze niż 2 lata
if version_text.startswith("7.") and release_date_dt < (current_date - timedelta(days=365)):
logging.info(f"Pomijam wersję {version_text} - starsza niż 1 rok")
continue
if version_text.startswith("6.") and release_date_dt < (current_date - timedelta(days=730)):
logging.info(f"Pomijam wersję {version_text} - starsza niż 2 lata")
continue
# Określenie typu wydania (stable, rc, beta)
release_type = get_release_type(version_text)
new_entry = ChangelogEntry(
version=version_text,
details=changelog_text,
category="6.x" if version_text.startswith("6.") else "7.x",
timestamp=release_date_dt,
release_type=release_type
)
db.session.add(new_entry)
new_entries += 1
db.session.commit()
logging.info(f"Nowe wpisy dodane: {new_entries}")
except Exception as e:
logging.error(f"Błąd podczas pobierania changelogów: {e}")
def detect_anomalies():
with app.app_context():
# Ustal okres analizy, np. ostatnie 24 godziny
cutoff = datetime.utcnow() - timedelta(hours=24)
# Pobierz logi użytkowników (lub logi globalne) z tego okresu
logs = Log.query.filter(Log.timestamp >= cutoff).all()
# Przykładowa analiza: wykryj logi zawierające określone słowa kluczowe
error_keywords = ["błąd", "error", "niepowodzenie", "exception"]
detected = {}
for log in logs:
lower_msg = log.message.lower()
if any(keyword in lower_msg for keyword in error_keywords):
detected.setdefault(log.device_id, []).append(log.message)
# Dla każdego urządzenia, jeżeli wykryto więcej niż określony próg błędów, zapisz anomalię
for device_id, messages in detected.items():
if len(messages) >= 3: # przykładowy próg
description = f"Wykryto {len(messages)} błędne logi w ciągu ostatnich 24 godzin. Przykłady: " + "; ".join(messages[:3])
anomaly = Anomaly(device_id=device_id, description=description)
db.session.add(anomaly)
# Możesz również wysłać powiadomienie, np. e-mail lub Pushover
db.session.commit()
# Harmonogram sprawdzania aktualizacji wykorzystujemy APScheduler
scheduler = BackgroundScheduler()
# Inicjalizacja bazy i schedulera
with app.app_context():
db.create_all() # lub już wcześniej utworzona baza
# Pobranie globalnych ustawień zakładamy, że Settings.query.first() zwróci ustawienia globalne
global_settings = Settings.query.first()
if global_settings and global_settings.check_interval:
interval = global_settings.check_interval
else:
interval = 60
scheduler.add_job(
func=check_all_devices,
trigger="interval",
seconds=interval,
id="check_all_devices",
max_instances=1
)
scheduler.add_job(
func=detect_anomalies,
trigger="interval",
minutes=60,
id="detect_anomalies",
max_instances=1
)
scheduler.add_job(
func=clean_old_changelogs,
trigger="interval",
days=1,
id="clean_changelogs",
max_instances=1
)
scheduler.add_job(
func=lambda: fetch_changelogs(force=False),
trigger="interval",
days=1,
id="daily_changelog_fetch",
max_instances=1
)
app.logger.debug(f"Scheduler initialized with interval: {interval} seconds")
scheduler.start()
# ROUTY APLIKACJI
@app.template_filter('format_version')
def format_version(value):
import re
# Zamieniamy ciąg, który składa się z: liczba.część_1 + czterocyfrowy rok, na samą część_1.
# Przykładowo: "7.182025" => "7.18"
return re.sub(r"(\d+\.\d+)\d{4}", r"\1", value)
@app.route('/')
def index():
if current_user.is_authenticated:
return redirect(url_for('dashboard'))
return render_template('index.html')
@app.route('/dashboard')
@login_required
def dashboard():
devices_count = Device.query.count()
pending_updates_count = Device.query.filter_by(update_required=True).count()
logs_count = Log.query.count()
users_count = User.query.count()
anomalies_count = Anomaly.query.filter_by(resolved=False).count()
update_history_count = UpdateHistory.query.count()
recent_logs = Log.query.order_by(Log.timestamp.desc()).limit(5).all()
# Pobieramy najnowsze wersje stabilne dla 7.x i 6.x
latest_version_7 = ChangelogEntry.query.filter_by(category="7.x", release_type="stable").order_by(ChangelogEntry.timestamp.desc()).first()
latest_version_6 = ChangelogEntry.query.filter_by(category="6.x", release_type="stable").order_by(ChangelogEntry.timestamp.desc()).first()
return render_template('dashboard.html',
devices_count=devices_count,
pending_updates_count=pending_updates_count,
logs_count=logs_count,
users_count=users_count,
anomalies_count=anomalies_count,
update_history_count=update_history_count,
recent_logs=recent_logs,
latest_version_7=latest_version_7,
latest_version_6=latest_version_6)
# Rejestracja
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
email = request.form['email']
password = request.form['password']
# Prosta walidacja warto rozszerzyć
if User.query.filter_by(username=username).first():
flash("Użytkownik o tej nazwie już istnieje.")
return redirect(url_for('register'))
new_user = User(username=username, email=email)
new_user.set_password(password)
db.session.add(new_user)
db.session.commit()
# Utwórz domyślne ustawienia dla użytkownika
default_settings = Settings(user_id=new_user.id, check_interval=60)
db.session.add(default_settings)
db.session.commit()
flash("Rejestracja zakończona. Możesz się zalogować.")
return redirect(url_for('login'))
return render_template('register.html')
# Logowanie
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
login_user(user)
flash("Zalogowano pomyślnie.")
return redirect(url_for('dashboard'))
else:
flash("Nieprawidłowa nazwa użytkownika lub hasło.")
return render_template('login.html')
# Wylogowanie
@app.route('/logout')
@login_required
def logout():
logout_user()
flash("Wylogowano.")
return redirect(url_for('index'))
# Lista urządzeń użytkownika
@app.route('/devices')
@login_required
def devices():
user_devices = Device.query.filter_by(user_id=current_user.id).all()
return render_template('devices.html', devices=user_devices)
# Dodawanie urządzenia
@app.route('/device/add', methods=['GET', 'POST'])
@login_required
def add_device():
if request.method == 'POST':
name = request.form.get('name')
ip = request.form['ip']
port = int(request.form.get('port', 8728))
device_username = request.form['device_username']
device_password = request.form['device_password']
use_ssl = bool(request.form.get('use_ssl'))
ssl_insecure = bool(request.form.get('ssl_insecure'))
new_device = Device(
name=name,
ip=ip,
port=port,
device_username=device_username,
device_password=device_password,
use_ssl=use_ssl,
ssl_insecure=ssl_insecure,
user_id=current_user.id
)
db.session.add(new_device)
db.session.commit()
flash("Urządzenie dodane.")
return redirect(url_for('devices'))
return render_template('add_device.html')
# Szczegóły urządzenia
@app.route('/device/<int:device_id>')
@login_required
def device_detail(device_id):
device = Device.query.get_or_404(device_id)
if device.user_id != current_user.id:
flash("Brak dostępu.")
return redirect(url_for('devices'))
resource_data = {}
try:
api = librouteros.connect(
host=device.ip,
username=device.device_username,
password=device.device_password,
port=device.port,
timeout=15
)
res_resp = list(api('/system/resource/print'))
if res_resp:
resource_data = res_resp[0]
# Konwersja wartości pamięci i dysku na czytelny format
if 'free-memory' in resource_data:
resource_data['free-memory'] = bytes_to_human(resource_data['free-memory'])
if 'total-memory' in resource_data:
resource_data['total-memory'] = bytes_to_human(resource_data['total-memory'])
if 'free-hdd-space' in resource_data:
resource_data['free-hdd-space'] = bytes_to_human(resource_data['free-hdd-space'])
except Exception as e:
resource_data = {'error': str(e)}
return render_template('device_detail.html', device=device, resource=resource_data)
# Strona z logami
@app.route('/logs')
@login_required
def logs():
user_logs = Log.query.filter_by(user_id=current_user.id).order_by(Log.timestamp.desc()).all()
return render_template('logs.html', logs=user_logs)
# Strona ustawień powiadomień
@app.route('/settings', methods=['GET', 'POST'])
@login_required
def settings():
user_settings = current_user.settings
if request.method == 'POST':
# Aktualizacja ustawień Pushover
user_settings.pushover_user_key = request.form.get('pushover_user_key')
user_settings.pushover_token = request.form.get('pushover_token')
user_settings.pushover_enabled = bool(request.form.get('pushover_enabled'))
# Aktualizacja ustawień SMTP
user_settings.smtp_server = request.form.get('smtp_server')
smtp_port = request.form.get('smtp_port')
user_settings.smtp_port = int(smtp_port) if smtp_port else None
user_settings.smtp_username = request.form.get('smtp_username')
user_settings.smtp_password = request.form.get('smtp_password')
user_settings.email_notifications_enabled = bool(request.form.get('email_notifications_enabled'))
# Aktualizacja adresu e-mail odbiorcy (może być inny niż email z profilu)
user_settings.recipient_email = request.form.get('recipient_email')
# Aktualizacja interwału sprawdzania
interval = request.form.get('check_interval')
user_settings.check_interval = int(interval) if interval else 60
# Aktualizacja retencji logów
retention = request.form.get('log_retention_days')
user_settings.log_retention_days = int(retention) if retention else 30
db.session.commit()
try:
scheduler.reschedule_job("check_all_devices", trigger="interval", seconds=user_settings.check_interval)
app.logger.debug(f"Scheduler rescheduled with new interval: {user_settings.check_interval} seconds")
except Exception as e:
app.logger.error(f"Error rescheduling job: {e}")
flash("Ustawienia zapisane.")
return redirect(url_for('settings'))
return render_template('settings.html', settings=user_settings)
@app.route('/device/<int:device_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_device(device_id):
device = Device.query.get_or_404(device_id)
if device.user_id != current_user.id:
flash("Brak dostępu.")
return redirect(url_for('devices'))
if request.method == 'POST':
device.name = request.form.get('name', device.name)
device.ip = request.form.get('ip', device.ip)
device.port = int(request.form.get('port', device.port or 8728))
device.device_username = request.form.get('device_username', device.device_username)
device.device_password = request.form.get('device_password', device.device_password)
device.branch = request.form.get('branch', device.branch or 'stable')
device.use_ssl = bool(request.form.get('use_ssl'))
device.ssl_insecure = bool(request.form.get('ssl_insecure'))
db.session.commit()
flash("Urządzenie zaktualizowane.")
return redirect(url_for('devices'))
return render_template('edit_device.html', device=device)
@app.route('/device/<int:device_id>/force_check')
@login_required
def force_check(device_id):
device = Device.query.get_or_404(device_id)
if device.user_id != current_user.id:
flash("Brak dostępu.")
return redirect(url_for('devices'))
result, update_available, current_version, current_firmware = check_device_update(device)
device.last_log = result
device.last_check = datetime.utcnow()
device.update_required = update_available
device.current_version = current_version
device.current_firmware = current_firmware
db.session.commit()
flash("Sprawdzenie urządzenia zakończone.")
return redirect(url_for('devices'))
@app.route('/device/<int:device_id>/update', methods=['POST'])
@login_required
def update_device(device_id):
device = Device.query.get_or_404(device_id)
if device.user_id != current_user.id:
flash("Brak dostępu.")
return redirect(url_for('devices'))
try:
app.logger.debug(f"Initiating system update for device {device.ip}")
api = librouteros.connect(
host=device.ip,
username=device.device_username,
password=device.device_password,
port=device.port,
timeout=15,
ssl=device.use_ssl,
ssl_verify=not device.ssl_insecure
)
app.logger.debug("Connection established, starting update command")
if device.branch == 'stable':
list(api('/system/package/update/install'))
elif device.branch == 'dev':
list(api('/system/package/update/install', branch='dev'))
elif device.branch == 'beta':
list(api('/system/package/update/install', branch='beta'))
else:
list(api('/system/package/update/install'))
history = UpdateHistory(
device_id=device.id,
update_type="system",
details=f"Aktualizacja systemu rozpoczęta na urządzeniu {device.name or device.ip}."
)
db.session.add(history)
db.session.commit()
flash("Aktualizacja systemu została rozpoczęta.")
app.logger.debug("System update command executed successfully")
except Exception as e:
app.logger.error(f"Błąd podczas aktualizacji urządzenia {device.ip}: {e}", exc_info=True)
flash(f"Błąd podczas aktualizacji: {e}")
return redirect(url_for('device_detail', device_id=device.id))
@app.route('/device/<int:device_id>/update_firmware', methods=['POST'])
@login_required
def update_firmware(device_id):
device = Device.query.get_or_404(device_id)
if device.user_id != current_user.id:
flash("Brak dostępu.")
return redirect(url_for('devices'))
try:
api = librouteros.connect(
host=device.ip,
username=device.device_username,
password=device.device_password,
port=device.port,
timeout=15
)
# Przykładowa komenda aktualizacji firmware
list(api('/system/routerboard/upgrade'))
history = UpdateHistory(
device_id=device.id,
update_type="firmware",
details=f"Aktualizacja firmware rozpoczęta na urządzeniu {device.name or device.ip}."
)
db.session.add(history)
db.session.commit()
flash("Aktualizacja firmware została rozpoczęta.")
except Exception as e:
flash(f"Błąd podczas aktualizacji firmware: {e}")
return redirect(url_for('device_detail', device_id=device.id))
@app.route('/test_pushover', methods=['POST'])
@login_required
def test_pushover():
message = "To jest testowe powiadomienie Pushover z RouterOS Update."
send_pushover_notification(current_user, message)
flash("Test powiadomienia Pushover wysłany.")
return redirect(url_for('settings'))
@app.route('/test_email', methods=['POST'])
@login_required
def test_email():
subject = "Testowy E-mail z RouterOS Update"
message = "To jest testowa wiadomość e-mail wysłana z RouterOS Update."
send_email_notification(current_user, subject, message)
flash("Testowy e-mail wysłany.")
return redirect(url_for('settings'))
@app.route('/change_password', methods=['GET', 'POST'])
@login_required
def change_password():
if request.method == 'POST':
old_password = request.form.get('old_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
if not current_user.check_password(old_password):
flash("Stare hasło jest nieprawidłowe.")
return redirect(url_for('reset_password'))
if new_password != confirm_password:
flash("Nowe hasło i potwierdzenie nie są zgodne.")
return redirect(url_for('reset_password'))
current_user.set_password(new_password)
db.session.commit()
flash("Hasło zostało zresetowane.")
return redirect(url_for('reset_password'))
return render_template('change_password.html')
@app.route('/logs/clean', methods=['POST'])
@login_required
def clean_logs():
days = request.form.get('days')
if not days:
flash("Podaj liczbę dni.")
return redirect(url_for('logs'))
try:
days = int(days)
except ValueError:
flash("Niepoprawna wartość dni.")
return redirect(url_for('logs'))
cutoff = datetime.utcnow() - timedelta(days=days)
num_deleted = Log.query.filter(Log.user_id == current_user.id, Log.timestamp < cutoff).delete()
db.session.commit()
flash(f"Usunięto {num_deleted} logów starszych niż {days} dni.")
return redirect(url_for('logs'))
@app.route('/update_history')
@login_required
def update_history():
histories = UpdateHistory.query.join(Device).filter(Device.user_id == current_user.id).order_by(UpdateHistory.timestamp.desc()).all()
return render_template('update_history.html', histories=histories)
@app.route('/anomalies')
@login_required
def anomalies():
anomalies = Anomaly.query.join(Device).filter(Device.user_id == current_user.id).order_by(Anomaly.timestamp.desc()).all()
return render_template('anomalies.html', anomalies=anomalies)
@app.route('/devices/update_selected', methods=['POST'])
@login_required
def update_selected_devices():
selected_ids = request.form.getlist('selected_devices')
if not selected_ids:
flash("Nie wybrano żadnych urządzeń.")
return redirect(url_for('devices'))
for device_id in selected_ids:
device = Device.query.get(device_id)
if device and device.user_id == current_user.id:
result, update_available, current_version, current_firmware = check_device_update(device)
device.last_log = result
device.last_check = datetime.utcnow()
device.update_required = update_available
device.current_version = current_version
device.current_firmware = current_firmware
db.session.commit()
# Dodaj log dla aktualizacji
log_entry = Log(message=result, device_id=device.id, user_id=device.user_id)
db.session.add(log_entry)
db.session.commit()
flash("Wybrane urządzenia zostały zaktualizowane.")
return redirect(url_for('devices'))
@app.route('/routeros_changelog')
@login_required
def routeros_changelog():
channel = request.args.get('channel', 'stable') # "stable", "rc" lub "beta"
series = request.args.get('series', '7.x') # "7.x" lub "6.x"
selected_version = request.args.get('version')
# Pobieramy wszystkie wpisy dla danego kanału i serii, posortowane malejąco wg daty
entries = ChangelogEntry.query.filter_by(release_type=channel, category=series).order_by(ChangelogEntry.timestamp.desc()).all()
if selected_version:
selected_entry = ChangelogEntry.query.filter_by(version=selected_version, release_type=channel, category=series).first()
else:
selected_entry = entries[0] if entries else None
return render_template(
'routeros_changelog_tabs.html',
channel=channel,
series=series,
entries=entries,
selected_entry=selected_entry
)
@app.route('/force_fetch_changelogs')
@login_required
def force_fetch_changelogs():
with app.app_context():
# Usuwamy wszystkie stare wpisy
db.session.query(ChangelogEntry).delete()
db.session.commit()
# Pobieramy changelogi od nowa
fetch_changelogs(force=True)
flash("Changelog został całkowicie odświeżony.", "success")
return redirect(url_for('routeros_changelog'))
# Zamknięcie harmonogramu przy zatrzymaniu aplikacji
atexit.register(lambda: scheduler.shutdown())
if __name__ == '__main__':
scheduler.add_job(func=clean_old_logs, trigger="interval", days=1)
app.run(host='0.0.0.0', port=5582, use_reloader=False, debug=True)