2025-03-04 11:00:51 +01:00

1085 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# app.py
import atexit
import logging
import os
import re
import smtplib
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta, datetime
from email.mime.text import MIMEText

import librouteros
import requests
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from bs4 import BeautifulSoup
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask import current_app as app
from flask import render_template
from flask import render_template, flash
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash

try:
    from dateutil import parser as date_parser
except ImportError:
    date_parser = None
logging.basicConfig(level=logging.INFO)

# Application configuration
app = Flask(__name__)
# Session cookies are signed with this key, so prefer a value supplied through
# the environment. The literal is kept only as a backward-compatible fallback
# and should not be relied on in production.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'twoj-sekret-klucz')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

# Flask-Login configuration; anonymous users are sent to the 'login' endpoint
login_manager = LoginManager(app)
login_manager.login_view = 'login'
# MODELE BAZY DANYCH
class User(UserMixin, db.Model):
    """Application account that owns devices and at most one Settings row."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    # Hashes produced by recent Werkzeug defaults (scrypt) are roughly 160
    # characters long, so 128 is too narrow on databases that enforce the
    # declared length. SQLite ignores the length, so existing files still work.
    password_hash = db.Column(db.String(256), nullable=False)
    devices = db.relationship('Device', backref='owner', lazy=True)
    settings = db.relationship('Settings', uselist=False, backref='user')

    def set_password(self, password):
        """Store a salted hash of ``password``; the plain text is not kept."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)
class Device(db.Model):
    """A RouterOS device registered by a user, plus the results of its last check."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(120))
    ip = db.Column(db.String(120), nullable=False)
    port = db.Column(db.Integer, default=8728)
    device_username = db.Column(db.String(120), nullable=False)
    # NOTE(review): stored as entered, because it is handed to
    # librouteros.connect() as the login password.
    device_password = db.Column(db.String(120), nullable=False)
    branch = db.Column(db.String(20), default="stable")
    # Fields below are overwritten by every update check.
    update_required = db.Column(db.Boolean, default=False)
    last_check = db.Column(db.DateTime)
    last_log = db.Column(db.Text)
    current_version = db.Column(db.String(50))
    current_firmware = db.Column(db.String(50))
    upgrade_firmware = db.Column(db.String(50))
    use_ssl = db.Column(db.Boolean, default=False)  # Whether to use SSL
    ssl_insecure = db.Column(db.Boolean, default=False)  # If True, do not verify the SSL certificate
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
class Settings(db.Model):
    """Per-user notification, scheduling and log-retention preferences."""

    id = db.Column(db.Integer, primary_key=True)
    # Pushover notifications
    pushover_user_key = db.Column(db.String(255))
    pushover_token = db.Column(db.String(255))
    pushover_enabled = db.Column(db.Boolean, default=False)
    # SMTP / e-mail notifications
    smtp_server = db.Column(db.String(255))
    smtp_port = db.Column(db.Integer)
    smtp_username = db.Column(db.String(255))
    smtp_password = db.Column(db.String(255))
    email_notifications_enabled = db.Column(db.Boolean, default=False)
    # Passed to the scheduler as a number of seconds.
    check_interval = db.Column(db.Integer, default=60)
    log_retention_days = db.Column(db.Integer, default=30)
    # Optional override of the notification recipient; the user's own e-mail
    # address is used when this is empty.
    recipient_email = db.Column(db.String(120))
    # unique=True keeps this a one-to-one relation with User.
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False, unique=True)
class Log(db.Model):
    """A timestamped log message, optionally linked to a device and a user."""

    id = db.Column(db.Integer, primary_key=True)
    # Naive UTC timestamp assigned when the row is created.
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    message = db.Column(db.Text)
    device_id = db.Column(db.Integer, db.ForeignKey('device.id'))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    # Also exposes Device.logs through the backref.
    device = db.relationship('Device', backref='logs')
class UpdateHistory(db.Model):
    """Record of an update that was started on a device."""

    id = db.Column(db.Integer, primary_key=True)
    device_id = db.Column(db.Integer, db.ForeignKey('device.id'))
    # Naive UTC timestamp assigned when the row is created.
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    # The routes in this file write "system" or "firmware" here.
    update_type = db.Column(db.String(50))
    details = db.Column(db.Text)
    # Also exposes Device.update_histories through the backref.
    device = db.relationship('Device', backref='update_histories')
class Anomaly(db.Model):
    """A detected anomaly, optionally tied to a device, with a resolved flag."""

    id = db.Column(db.Integer, primary_key=True)
    device_id = db.Column(db.Integer, db.ForeignKey('device.id'), nullable=True)
    # Naive UTC timestamp assigned when the row is created.
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    description = db.Column(db.Text)
    resolved = db.Column(db.Boolean, default=False)
    # Also exposes Device.anomalies through the backref.
    device = db.relationship('Device', backref='anomalies')
class ChangelogEntry(db.Model):
    """Changelog text for one RouterOS release."""

    id = db.Column(db.Integer, primary_key=True)
    version = db.Column(db.String(50), nullable=False)
    details = db.Column(db.Text, nullable=False)
    category = db.Column(db.String(10), nullable=False)  # "6.x" or "7.x"
    # Defaults to the insertion time; fetch_changelogs() stores the parsed
    # release date here instead.
    timestamp = db.Column(db.DateTime, default=datetime.utcnow)
    # get_release_type() yields "beta", "rc" or "stable".
    release_type = db.Column(db.String(10), nullable=False, default="stable")
# Initialise the database (create the tables on first run)
with app.app_context():
    db.create_all()
@login_manager.user_loader
def load_user(user_id):
    """Return the User for the id stored in the session, or None.

    Flask-Login expects None (not an exception) for an id that cannot be
    resolved, so a malformed value is treated as "no such user".
    """
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        return None
# FUNKCJE POWIADOMIEŃ
def send_pushover_notification(user, message):
    """Send ``message`` to the user's Pushover account (best effort).

    Does nothing unless the user has settings with Pushover enabled and both
    the user key and the token filled in. Failures are logged, never raised.
    """
    cfg = user.settings
    if not (cfg and cfg.pushover_enabled and cfg.pushover_user_key and cfg.pushover_token):
        return
    payload = {
        "token": cfg.pushover_token,
        "user": cfg.pushover_user_key,
        "message": message,
    }
    try:
        # A timeout keeps a stalled request from blocking the caller forever.
        response = requests.post(
            "https://api.pushover.net/1/messages.json", data=payload, timeout=10
        )
        # Surface HTTP-level rejections (bad token, rate limit) in the log.
        response.raise_for_status()
    except Exception as e:
        app.logger.error(f"Błąd przy wysyłaniu powiadomienia Pushover: {e}")
def send_email_notification(user, subject, message):
    """Send an HTML e-mail notification through the user's SMTP settings.

    Does nothing unless e-mail notifications are enabled and an SMTP server
    is configured. The recipient is ``settings.recipient_email`` when set,
    otherwise the address from the user's profile. Failures are logged,
    never raised.
    """
    cfg = user.settings
    if not cfg or not cfg.email_notifications_enabled or not cfg.smtp_server:
        return
    try:
        html_body = get_email_template(subject, message)
        msg = MIMEText(html_body, 'html')
        msg["Subject"] = subject
        msg["From"] = cfg.smtp_username
        to_email = cfg.recipient_email if cfg.recipient_email else user.email
        msg["To"] = to_email
        # The context manager closes the connection even when starttls(),
        # login() or sendmail() raises; the timeout bounds a hung server.
        with smtplib.SMTP(cfg.smtp_server, cfg.smtp_port, timeout=30) as s:
            s.starttls()
            s.login(cfg.smtp_username, cfg.smtp_password)
            s.sendmail(cfg.smtp_username, [to_email], msg.as_string())
        app.logger.debug("E-mail wysłany pomyślnie")
    except Exception as e:
        app.logger.error(f"Błąd przy wysyłaniu powiadomienia e-mail: {e}", exc_info=True)
# FUNKCJA SPRAWDZAJĄCA AKTUALIZACJE URZĄDZENIA
def check_device_update(device):
    """Query a RouterOS device for its versions and pending package updates.

    Returns a 5-tuple ``(log_text, update_available, current_version,
    current_firmware, upgrade_firmware)``. On any failure the tuple is
    ``("Error: <reason>", False, None, None, None)``; nothing is raised.
    The API connection is closed before returning in both cases.
    """
    log_entries = []
    update_available = False
    current_version = None
    current_firmware = None
    upgrade_firmware = None
    api = None
    try:
        app.logger.debug(f"Connecting to device {device.ip}:{device.port} using SSL: {device.use_ssl}, ssl_verify: {not device.ssl_insecure}")
        # NOTE(review): librouteros documents an ``ssl_wrapper`` argument for
        # TLS; confirm that ``ssl``/``ssl_verify`` are honoured by the
        # installed version.
        api = librouteros.connect(
            host=device.ip,
            username=device.device_username,
            password=device.device_password,
            port=device.port,
            timeout=15,
            ssl=device.use_ssl,
            ssl_verify=not device.ssl_insecure
        )
        app.logger.debug(f"Connection established to {device.ip}")

        # Device identity
        identity_resp = list(api('/system/identity/print'))
        app.logger.debug(f"Identity response: {identity_resp}")
        if identity_resp:
            identity = identity_resp[0].get('name', '')
            log_entries.append(f"Identity: {identity}")

        # System version
        resource_resp = list(api('/system/resource/print'))
        app.logger.debug(f"Resource response: {resource_resp}")
        if resource_resp:
            version = resource_resp[0].get('version', '')
            current_version = version
            log_entries.append(f"System Version: {version}")

        # RouterBOARD firmware: the current one may be reported under any of
        # three keys, so try them in order.
        board_resp = list(api('/system/routerboard/print'))
        app.logger.debug(f"Routerboard response: {board_resp}")
        if board_resp:
            board_info = board_resp[0]
            current_fw = board_info.get('firmware',
                                        board_info.get('firmware-version',
                                                       board_info.get('current-firmware', 'N/A')))
            upgrade_fw = board_info.get('upgrade-firmware', 'N/A')
            current_firmware = current_fw
            upgrade_firmware = upgrade_fw
            log_entries.append(f"Firmware: {current_fw}")
            log_entries.append(f"Upgrade Firmware: {upgrade_fw}")

        # Trigger the update check, then poll for up to ~10 seconds until
        # the reported status no longer says "checking".
        log_entries.append("Checking for updates...")
        list(api('/system/package/update/check-for-updates'))
        for _ in range(10):
            time.sleep(1)
            status_resp = list(api('/system/package/update/print'))
            app.logger.debug(f"Update status response: {status_resp}")
            if status_resp:
                status = status_resp[0].get('status', '').lower()
                if 'checking' not in status:
                    log_entries.append(f"Update check completed. Status: {status}")
                    break

        update_resp = list(api('/system/package/update/print'))
        app.logger.debug(f"Update response: {update_resp}")
        if update_resp:
            for res in update_resp:
                installed = res.get('installed-version', '')
                latest = res.get('latest-version', '')
                if latest and latest != installed:
                    log_entries.append(f"Updates available: {installed} -> {latest}")
                    update_available = True
                else:
                    log_entries.append("No updates available.")

        return "\n".join(log_entries), update_available, current_version, current_firmware, upgrade_firmware
    except Exception as e:
        app.logger.error(f"Error connecting to device {device.ip}: {e}", exc_info=True)
        return f"Error: {str(e)}", False, None, None, None
    finally:
        # Release the socket; previously every check left a connection open.
        if api is not None:
            try:
                api.close()
            except Exception:
                app.logger.debug(f"Closing connection to {device.ip} failed", exc_info=True)
def get_email_template(subject, message):
    """Return the HTML body of a notification e-mail.

    ``subject`` becomes the header and ``message`` the single content
    paragraph. Both are treated as plain text and HTML-escaped, so characters
    such as ``<``, ``>`` or ``&`` (for example in a device name) cannot break
    or inject markup.
    """
    from html import escape

    safe_subject = escape(str(subject))
    safe_message = escape(str(message))
    return f"""
<html>
<head>
<style>
body {{
font-family: Arial, sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
}}
.container {{
max-width: 600px;
margin: 20px auto;
background-color: #ffffff;
padding: 20px;
border-radius: 5px;
box-shadow: 0 0 10px rgba(0,0,0,0.1);
}}
.header {{
background-color: #007bff;
color: #ffffff;
padding: 10px;
text-align: center;
border-radius: 5px 5px 0 0;
}}
.content {{
margin: 20px 0;
font-size: 16px;
line-height: 1.5;
}}
.footer {{
text-align: center;
font-size: 12px;
color: #777777;
border-top: 1px solid #dddddd;
padding-top: 10px;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h2>{safe_subject}</h2>
</div>
<div class="content">
<p>{safe_message}</p>
</div>
<div class="footer">
<p>Wiadomość wygenerowana automatycznie przez system RouterOS Update</p>
</div>
</div>
</body>
</html>
"""
def clean_old_changelogs():
    """Delete ChangelogEntry rows whose timestamp is more than seven days old."""
    # NOTE(review): fetch_changelogs() stores the release date in
    # ``timestamp``, so this prunes by release date rather than by fetch
    # time. Confirm that is intended.
    with app.app_context():
        threshold = datetime.utcnow() - timedelta(days=7)
        stale_entries = db.session.query(ChangelogEntry).filter(
            ChangelogEntry.timestamp < threshold
        )
        stale_entries.delete()
        db.session.commit()
def check_all_devices():
    """Run an update check on every device, persist the result and notify.

    For each device the outcome of ``check_device_update`` is written to the
    device row and to a new Log entry. The owner is notified (Pushover and
    e-mail) only when the device changes from "no update pending" to "update
    available", so a pending update does not produce a new notification on
    every scheduler run.
    """
    with app.app_context():
        for device in Device.query.all():
            # Remember the previous state before it is overwritten below.
            was_pending = bool(device.update_required)
            result, update_available, current_version, current_firmware, upgrade_firmware = check_device_update(device)
            device.last_log = result
            device.last_check = datetime.utcnow()
            device.update_required = update_available
            device.current_version = current_version
            device.current_firmware = current_firmware
            device.upgrade_firmware = upgrade_firmware
            db.session.add(Log(message=result, device_id=device.id, user_id=device.user_id))
            db.session.commit()
            if update_available and not was_pending:
                user = device.owner
                message = f"Urządzenie {device.name or device.ip} ma dostępną aktualizację."
                send_pushover_notification(user, message)
                send_email_notification(user, "Aktualizacja dostępna", message)
def bytes_to_human(n):
    """Format a byte count as a string such as ``"1.50 KB"``.

    ``n`` is converted with ``int()``; when that fails the argument is
    returned unchanged. Units step by 1024 and stop at PB.
    """
    try:
        size = int(n)
    except Exception:
        return n
    units = ('B', 'KB', 'MB', 'GB', 'TB', 'PB')
    unit_index = 0
    largest = len(units) - 1
    while unit_index < largest and size >= 1024:
        size = size / 1024.0
        unit_index += 1
    return f"{size:.2f} {units[unit_index]}"
def clean_old_logs():
    """Delete each user's logs that are older than their retention setting.

    Settings rows without a (non-zero) ``log_retention_days`` are skipped.
    """
    with app.app_context():
        for user_settings in Settings.query.all():
            retention = user_settings.log_retention_days
            if not retention:
                continue
            oldest_kept = datetime.utcnow() - timedelta(days=retention)
            # Remove only this user's logs that are older than the cutoff.
            Log.query.filter(
                Log.user_id == user_settings.user_id,
                Log.timestamp < oldest_kept,
            ).delete()
        db.session.commit()
def parse_release_date(text):
    """Extract a release date from ``text``.

    A ``(YYYY-Mon-DD`` fragment is tried first; if that is absent or cannot
    be parsed, dateutil's fuzzy parser is used when it is installed.
    Returns a datetime, or None when nothing could be parsed.
    """
    found = re.search(r"\((\d{4})-([A-Za-z]{3})-(\d{2})", text)
    if found is not None:
        year, month, day = found.groups()
        try:
            return datetime.strptime(f"{year}-{month}-{day}", "%Y-%b-%d")
        except Exception as e:
            logging.error(f"Błąd parsowania daty przy użyciu strptime: {e}")
    if not date_parser:
        return None
    try:
        return date_parser.parse(text, fuzzy=True)
    except Exception as e:
        logging.error(f"Błąd parsowania daty przy użyciu dateutil: {e}")
    return None
def get_release_type(version_text):
    """Classify a version string as ``"beta"``, ``"rc"`` or ``"stable"``.

    The check is a case-insensitive substring test; "beta" takes precedence
    over "rc", and anything else is "stable".
    """
    lowered = version_text.lower()
    for marker in ("beta", "rc"):
        if marker in lowered:
            return marker
    return "stable"
@app.template_filter('format_version')
def format_version(value):
    """Template filter that normalises a scraped RouterOS version string.

    Anything in parentheses (e.g. "(2025-Feb-12 11:20)") is removed, then the
    text is read as ``major[.minor][.patch][(beta|rc)N]``. Minor, patch and
    the suffix number are cut to at most two digits, which drops a year that
    got glued onto them (e.g. "182025" -> "18", "22025" -> "2"). Text that
    does not start with a digit is returned as cleaned, otherwise unchanged.
    """
    cleaned = re.sub(r'\(.*?\)', '', value).strip()

    version_re = (
        r'^(\d+)'               # group 1: major
        r'(?:\.(\d+))?'         # group 2: minor (optional)
        r'(?:\.(\d+))?'         # group 3: patch (optional)
        r'(?:(beta|rc)(\d+))?'  # groups 4 and 5: beta|rc + number (optional)
    )
    parsed = re.match(version_re, cleaned)
    if parsed is None:
        return cleaned

    major, minor, patch, suffix, suffixnum = parsed.groups()
    pieces = [major]
    for component in (minor, patch):
        if component:
            # Slicing keeps at most the first two digits.
            pieces.append('.' + component[:2])
    if suffix:
        pieces.append(suffix)
    if suffixnum:
        # One or two digits followed by a glued "20xx" year: keep the digits.
        glued_year = re.match(r'^(\d{1,2})(20\d{2})$', suffixnum)
        if glued_year:
            suffixnum = glued_year.group(1)
        pieces.append(suffixnum[:2])
    return ''.join(pieces)
@app.template_global()
def bootstrap_alert_category(cat):
    """Map a flash-message category to a Bootstrap alert class suffix.

    Matching is case-insensitive; unknown categories fall back to 'info'.
    """
    key = cat.lower()
    if key in ('error', 'fail'):
        return 'danger'
    if key in ('warning', 'warn'):
        return 'warning'
    if key in ('ok', 'success'):
        return 'success'
    return 'info'
def fetch_changelogs(force=False):
    """Download RouterOS changelogs from mikrotik.com and store new ones.

    The changelog index page is scraped, each 6.x / 7.x section is processed
    in a worker thread (its changelog file is downloaded and its release date
    parsed), and the results are saved as ChangelogEntry rows. 7.x releases
    older than one year and 6.x releases older than two years are skipped.

    Versions that are already stored are skipped, so repeated runs do not
    create duplicate rows. The database work runs inside an application
    context, so the function can be called from a scheduler thread. Errors
    are logged, never raised.

    ``force`` is accepted for interface compatibility; it is not used here.
    """
    changelog_url = "https://mikrotik.com/download/changelogs"
    current_date = datetime.utcnow()
    # Filled from the database before the workers start; workers only read it.
    known_versions = set()

    def process_section(section):
        """Turn one accordion section into an entry dict, or None to skip it."""
        a_tag = section.find("a")
        if not a_tag:
            return None
        raw_text = a_tag.get_text(strip=True)
        logging.info(f"raw_text: {raw_text}")
        # First look for the version after the word "in".
        match = re.search(r"in\s+(\d+\.\d+(?:\.\d+)?(?:beta|rc)?\d*)", raw_text)
        if match:
            version_text = match.group(1)
            logging.info(f"Parsed version (pattern 1): {version_text}")
        else:
            # Otherwise expect the version at the start, fused with a date.
            match = re.match(r"^(\d+\.\d+(?:\.\d+)?(?:beta|rc)?\d*)(\d{4}-\d{2}-\d{2})", raw_text)
            if match:
                version_text = match.group(1)
                logging.info(f"Parsed version (pattern 2): {version_text}")
            else:
                version_text = raw_text
                logging.info("Brak dopasowania regex, używam raw_text jako wersji")
        # Only the 6.x and 7.x lines are tracked.
        if not version_text.startswith(("6.", "7.")):
            logging.info(f"Pomijam wersję {version_text} nie jest 6.x ani 7.x")
            return None
        # Skip the download entirely for versions that are already stored.
        if version_text in known_versions:
            logging.info(f"Pomijam wersję {version_text} - już zapisana w bazie")
            return None
        details_div = section.find("div", class_="content")
        changelog_file_url = details_div.get("data-url") if details_div else None
        if not changelog_file_url:
            logging.warning(f"Brak URL changeloga dla wersji {version_text}")
            return None
        try:
            changelog_response = requests.get(changelog_file_url, timeout=10)
            changelog_response.raise_for_status()
            changelog_lines = changelog_response.text.splitlines()
            if not changelog_lines:
                logging.warning(f"Pusty changelog dla wersji {version_text}")
                return None
            first_line = changelog_lines[0].strip()
            release_date_dt = parse_release_date(first_line)
            if release_date_dt is None:
                logging.warning(f"Nie udało się wyłuskać daty dla wersji {version_text}, pomijam ten changelog")
                return None
            changelog_text = "\n".join(changelog_lines).strip()
        except Exception as e:
            logging.error(f"Błąd pobierania changeloga z {changelog_file_url}: {e}")
            return None
        # Age filter: one year for 7.x, two years for 6.x.
        if version_text.startswith("7.") and release_date_dt < (current_date - timedelta(days=365)):
            logging.info(f"Pomijam wersję {version_text} - starsza niż 1 rok")
            return None
        if version_text.startswith("6.") and release_date_dt < (current_date - timedelta(days=730)):
            logging.info(f"Pomijam wersję {version_text} - starsza niż 2 lata")
            return None
        return {
            "version": version_text,
            "details": changelog_text,
            "category": "6.x" if version_text.startswith("6.") else "7.x",
            "timestamp": release_date_dt,
            "release_type": get_release_type(version_text),
        }

    # The scheduler calls this from a background thread, where no application
    # context exists; pushing one here is harmless inside a request.
    with app.app_context():
        try:
            known_versions.update(
                version for (version,) in db.session.query(ChangelogEntry.version).all()
            )
            logging.info(f"Pobieranie changelogów z {changelog_url}...")
            response = requests.get(changelog_url, timeout=30)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            changelog_sections = soup.find_all("li", class_="accordion-navigation")
            logging.info(f"Znaleziono {len(changelog_sections)} sekcji changelogów.")
            results = []
            # The sections are independent and I/O-bound, so fetch in parallel.
            with ThreadPoolExecutor(max_workers=8) as executor:
                futures = [executor.submit(process_section, section) for section in changelog_sections]
                for future in as_completed(futures):
                    try:
                        result = future.result()
                    except Exception as e:
                        # One malformed section must not abort the whole fetch.
                        logging.error(f"Błąd przetwarzania sekcji changeloga: {e}")
                        continue
                    if result is not None:
                        results.append(result)
            new_entries = 0
            for entry in results:
                # Guards against the same version appearing twice on the page.
                if entry["version"] in known_versions:
                    continue
                known_versions.add(entry["version"])
                db.session.add(ChangelogEntry(
                    version=entry["version"],
                    details=entry["details"],
                    category=entry["category"],
                    timestamp=entry["timestamp"],
                    release_type=entry["release_type"],
                ))
                new_entries += 1
            db.session.commit()
            logging.info(f"Nowe wpisy dodane: {new_entries}")
        except Exception as e:
            # Leave the session usable for whoever uses it next.
            db.session.rollback()
            logging.error(f"Błąd podczas pobierania changelogów: {e}")
def detect_anomalies():
    """Record an Anomaly for each device with repeated error logs.

    Logs from the last 24 hours are scanned for error keywords. A device
    with at least three matching logs gets an Anomaly row, unless it already
    has an unresolved anomaly created within the same window. Without that
    check every run over the sliding window would re-create the same anomaly.
    """
    with app.app_context():
        cutoff = datetime.utcnow() - timedelta(hours=24)
        logs = Log.query.filter(Log.timestamp >= cutoff).all()
        error_keywords = ("błąd", "error", "niepowodzenie", "exception")
        detected = {}
        for log in logs:
            # ``message`` is nullable; treat a missing message as empty text.
            lower_msg = (log.message or "").lower()
            if any(keyword in lower_msg for keyword in error_keywords):
                detected.setdefault(log.device_id, []).append(log.message)
        for device_id, messages in detected.items():
            if len(messages) < 3:  # example threshold
                continue
            already_reported = Anomaly.query.filter(
                Anomaly.device_id == device_id,
                Anomaly.resolved.is_(False),
                Anomaly.timestamp >= cutoff,
            ).first()
            if already_reported is not None:
                continue
            description = f"Wykryto {len(messages)} błędne logi w ciągu ostatnich 24 godzin. Przykłady: " + "; ".join(messages[:3])
            db.session.add(Anomaly(device_id=device_id, description=description))
            # A notification (e-mail or Pushover) could also be sent here.
        db.session.commit()
# Update checks are scheduled with APScheduler
scheduler = BackgroundScheduler()

# Initialise the database and read the check interval
with app.app_context():
    db.create_all()  # no-op when the tables already exist
    # The first Settings row is treated as the global configuration.
    global_settings = Settings.query.first()
    interval = 60
    if global_settings and global_settings.check_interval:
        interval = global_settings.check_interval

# (callable, job id, interval keyword arguments) for every periodic job.
# NOTE(review): clean_old_logs() is not registered here; confirm whether it is
# scheduled elsewhere.
_periodic_jobs = (
    (check_all_devices, "check_all_devices", {"seconds": interval}),
    (detect_anomalies, "detect_anomalies", {"minutes": 60}),
    (clean_old_changelogs, "clean_changelogs", {"days": 1}),
    (lambda: fetch_changelogs(force=False), "daily_changelog_fetch", {"days": 1}),
)
for _job_func, _job_id, _every in _periodic_jobs:
    scheduler.add_job(
        func=_job_func,
        trigger="interval",
        id=_job_id,
        max_instances=1,
        **_every
    )

app.logger.debug(f"Scheduler initialized with interval: {interval} seconds")
scheduler.start()
# ROUTY APLIKACJI
@app.route('/')
def index():
    """Landing page; signed-in users are redirected to the dashboard."""
    if not current_user.is_authenticated:
        return render_template('index.html')
    return redirect(url_for('dashboard'))
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard with the current user's counters and recent logs.

    Device, log, anomaly and update-history figures are limited to the
    signed-in user, matching the /devices, /logs, /anomalies and
    /update_history views, so one account cannot see another account's log
    messages. ``users_count`` and the latest stable versions are global.
    """
    uid = current_user.id
    own_devices = Device.query.filter_by(user_id=uid)
    own_logs = Log.query.filter_by(user_id=uid)

    devices_count = own_devices.count()
    pending_updates_count = own_devices.filter_by(update_required=True).count()
    logs_count = own_logs.count()
    users_count = User.query.count()
    anomalies_count = (
        Anomaly.query.join(Device)
        .filter(Device.user_id == uid, Anomaly.resolved.is_(False))
        .count()
    )
    update_history_count = (
        UpdateHistory.query.join(Device).filter(Device.user_id == uid).count()
    )
    recent_logs = own_logs.order_by(Log.timestamp.desc()).limit(5).all()
    # Newest stable release for the 7.x and 6.x lines
    latest_version_7 = ChangelogEntry.query.filter_by(category="7.x", release_type="stable").order_by(ChangelogEntry.timestamp.desc()).first()
    latest_version_6 = ChangelogEntry.query.filter_by(category="6.x", release_type="stable").order_by(ChangelogEntry.timestamp.desc()).first()
    return render_template('dashboard.html',
                           devices_count=devices_count,
                           pending_updates_count=pending_updates_count,
                           logs_count=logs_count,
                           users_count=users_count,
                           anomalies_count=anomalies_count,
                           update_history_count=update_history_count,
                           recent_logs=recent_logs,
                           latest_version_7=latest_version_7,
                           latest_version_6=latest_version_6)
# Rejestracja
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form and create a new account on POST.

    Both the username and the e-mail address must be unused; both columns
    are unique, so a duplicate would otherwise fail at commit time. The user
    and their default Settings row are saved in a single commit, so an
    account is never left without settings.
    """
    if request.method != 'POST':
        return render_template('register.html')

    username = request.form['username']
    email = request.form['email']
    password = request.form['password']
    # Basic validation only; worth extending.
    if User.query.filter_by(username=username).first():
        flash("Użytkownik o tej nazwie już istnieje.", "error")
        return redirect(url_for('register'))
    if User.query.filter_by(email=email).first():
        flash("Użytkownik o tym adresie e-mail już istnieje.", "error")
        return redirect(url_for('register'))

    new_user = User(username=username, email=email)
    new_user.set_password(password)
    # Attaching through the relationship fills Settings.user_id at flush time.
    new_user.settings = Settings(check_interval=60)
    db.session.add(new_user)
    db.session.commit()
    flash("Rejestracja zakończona. Możesz się zalogować.", "success")
    return redirect(url_for('login'))
# Logowanie
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form; on POST verify the credentials and sign in."""
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']
    account = User.query.filter_by(username=username).first()
    if account and account.check_password(password):
        login_user(account)
        flash("Zalogowano pomyślnie.", "success")
        return redirect(url_for('dashboard'))

    # The same message for an unknown user and for a wrong password.
    flash("Nieprawidłowa nazwa użytkownika lub hasło.", "error")
    return render_template('login.html')
# Wylogowanie
@app.route('/logout')
@login_required
def logout():
    """Sign the current user out and return to the landing page."""
    logout_user()
    flash("Wylogowano.", "success")
    landing = url_for('index')
    return redirect(landing)
# Lista urządzeń użytkownika
@app.route('/devices')
@login_required
def devices():
    """List the devices that belong to the current user."""
    owned = Device.query.filter_by(user_id=current_user.id)
    return render_template('devices.html', devices=owned.all())
# Dodawanie urządzenia
@app.route('/device/add', methods=['GET', 'POST'])
@login_required
def add_device():
    """Show the "add device" form and create the device on POST.

    The port defaults to 8728 when the field is missing or empty. A value
    that is not a number in 1-65535 is rejected with a flash message instead
    of an unhandled ValueError.
    """
    if request.method != 'POST':
        return render_template('add_device.html')

    name = request.form.get('name')
    ip = request.form['ip']
    try:
        # ``or`` also covers an empty string submitted by a blank field.
        port = int(request.form.get('port') or 8728)
        if not 1 <= port <= 65535:
            raise ValueError(port)
    except ValueError:
        flash("Niepoprawny numer portu.", "error")
        return redirect(url_for('add_device'))
    device_username = request.form['device_username']
    device_password = request.form['device_password']
    # Checkboxes are only present in the form data when ticked.
    use_ssl = bool(request.form.get('use_ssl'))
    ssl_insecure = bool(request.form.get('ssl_insecure'))

    new_device = Device(
        name=name,
        ip=ip,
        port=port,
        device_username=device_username,
        device_password=device_password,
        use_ssl=use_ssl,
        ssl_insecure=ssl_insecure,
        user_id=current_user.id
    )
    db.session.add(new_device)
    db.session.commit()
    flash("Urządzenie dodane.", "success")
    return redirect(url_for('devices'))
# Szczegóły urządzenia
@app.route('/device/<int:device_id>')
@login_required
def device_detail(device_id):
    """Show one device with live ``/system/resource`` data.

    Only the owner may view the device. Memory and disk figures are converted
    to human-readable strings. If the device cannot be reached, the template
    receives ``{'error': <reason>}`` as the resource data. The API connection
    is opened with the same SSL flags as the update check and is closed
    before the page is rendered.
    """
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))

    resource_data = {}
    api = None
    try:
        api = librouteros.connect(
            host=device.ip,
            username=device.device_username,
            password=device.device_password,
            port=device.port,
            timeout=15,
            ssl=device.use_ssl,
            ssl_verify=not device.ssl_insecure
        )
        res_resp = list(api('/system/resource/print'))
        if res_resp:
            resource_data = res_resp[0]
            # Convert memory and disk values to a readable format.
            for key in ('free-memory', 'total-memory', 'free-hdd-space'):
                if key in resource_data:
                    resource_data[key] = bytes_to_human(resource_data[key])
    except Exception as e:
        resource_data = {'error': str(e)}
    finally:
        # Do not leave a socket open for every page view.
        if api is not None:
            try:
                api.close()
            except Exception:
                app.logger.debug(f"Closing connection to {device.ip} failed", exc_info=True)
    return render_template('device_detail.html', device=device, resource=resource_data)
# Strona z logami
@app.route('/logs')
@login_required
def logs():
    """Show the current user's logs, newest first."""
    newest_first = Log.timestamp.desc()
    own_logs = Log.query.filter_by(user_id=current_user.id).order_by(newest_first)
    return render_template('logs.html', logs=own_logs.all())
# Strona ustawień powiadomień
def _form_int(raw, default):
    """Return ``raw`` parsed as an int, or ``default`` when empty or invalid."""
    try:
        return int(raw) if raw else default
    except (TypeError, ValueError):
        return default


@app.route('/settings', methods=['GET', 'POST'])
@login_required
def settings():
    """Show and save the current user's notification and scheduling settings.

    On POST every field is taken from the form, the row is committed and the
    "check_all_devices" job is rescheduled with the new interval. A user
    without a Settings row gets one created on save. Numeric fields that are
    empty or not numbers fall back to their defaults (no SMTP port, 60 s
    interval, 30 days retention) instead of raising.
    """
    user_settings = current_user.settings
    if request.method == 'POST':
        if user_settings is None:
            user_settings = Settings(user_id=current_user.id)
            db.session.add(user_settings)
        form = request.form
        # Pushover
        user_settings.pushover_user_key = form.get('pushover_user_key')
        user_settings.pushover_token = form.get('pushover_token')
        user_settings.pushover_enabled = bool(form.get('pushover_enabled'))
        # SMTP
        user_settings.smtp_server = form.get('smtp_server')
        user_settings.smtp_port = _form_int(form.get('smtp_port'), None)
        user_settings.smtp_username = form.get('smtp_username')
        user_settings.smtp_password = form.get('smtp_password')
        user_settings.email_notifications_enabled = bool(form.get('email_notifications_enabled'))
        # Recipient address (may differ from the profile e-mail)
        user_settings.recipient_email = form.get('recipient_email')
        # Check interval and log retention
        user_settings.check_interval = _form_int(form.get('check_interval'), 60)
        user_settings.log_retention_days = _form_int(form.get('log_retention_days'), 30)
        db.session.commit()
        try:
            # NOTE(review): the job is shared by all users, so the last user
            # to save decides the interval. Confirm this is intended.
            scheduler.reschedule_job("check_all_devices", trigger="interval", seconds=user_settings.check_interval)
            app.logger.debug(f"Scheduler rescheduled with new interval: {user_settings.check_interval} seconds")
        except Exception as e:
            app.logger.error(f"Error rescheduling job: {e}")
        flash("Ustawienia zapisane.", "success")
        return redirect(url_for('settings'))
    return render_template('settings.html', settings=user_settings)
@app.route('/device/<int:device_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_device(device_id):
    """Show the edit form for a device and save the changes on POST.

    Only the owner may edit. Text fields missing from the form keep their
    current values. The port is validated before anything is changed: an
    empty field keeps the current port (or 8728), and a value that is not a
    number in 1-65535 is rejected with a flash message instead of an
    unhandled ValueError.
    """
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))
    if request.method != 'POST':
        return render_template('edit_device.html', device=device)

    raw_port = request.form.get('port')
    try:
        port = int(raw_port) if raw_port else (device.port or 8728)
        if not 1 <= port <= 65535:
            raise ValueError(port)
    except (TypeError, ValueError):
        flash("Niepoprawny numer portu.", "error")
        return redirect(url_for('edit_device', device_id=device.id))

    device.name = request.form.get('name', device.name)
    device.ip = request.form.get('ip', device.ip)
    device.port = port
    device.device_username = request.form.get('device_username', device.device_username)
    device.device_password = request.form.get('device_password', device.device_password)
    device.branch = request.form.get('branch', device.branch or 'stable')
    # Checkboxes are only present in the form data when ticked.
    device.use_ssl = bool(request.form.get('use_ssl'))
    device.ssl_insecure = bool(request.form.get('ssl_insecure'))
    db.session.commit()
    flash("Urządzenie zaktualizowane.", "success")
    return redirect(url_for('devices'))
@app.route('/device/<int:device_id>/force_check')
@login_required
def force_check(device_id):
    """Run an update check on one device now and store the outcome.

    Only the owner may trigger the check. Unlike the scheduled check, no Log
    row is written and no notification is sent.
    """
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))

    outcome = check_device_update(device)
    (
        device.last_log,
        device.update_required,
        device.current_version,
        device.current_firmware,
        device.upgrade_firmware,
    ) = outcome
    device.last_check = datetime.utcnow()
    db.session.commit()

    flash("Sprawdzenie urządzenia zakończone.", "success")
    return redirect(url_for('devices'))
@app.route('/device/<int:device_id>/update', methods=['POST'])
@login_required
def update_device(device_id):
    """Start a RouterOS package update on a device owned by the user.

    The install command is sent with ``branch=`` for the 'dev' and 'beta'
    branches and without it otherwise. On success an UpdateHistory row of
    type "system" is stored. Errors are logged and shown as a flash message.
    The API connection is always closed afterwards.
    """
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))

    api = None
    try:
        app.logger.debug(f"Initiating system update for device {device.ip}")
        api = librouteros.connect(
            host=device.ip,
            username=device.device_username,
            password=device.device_password,
            port=device.port,
            timeout=15,
            ssl=device.use_ssl,
            ssl_verify=not device.ssl_insecure
        )
        app.logger.debug("Connection established, starting update command")
        if device.branch in ('dev', 'beta'):
            list(api('/system/package/update/install', branch=device.branch))
        else:
            # 'stable' and any unrecognised branch use the default install.
            list(api('/system/package/update/install'))
        history = UpdateHistory(
            device_id=device.id,
            update_type="system",
            details=f"Aktualizacja systemu rozpoczęta na urządzeniu {device.name or device.ip}."
        )
        db.session.add(history)
        db.session.commit()
        flash("Aktualizacja systemu została rozpoczęta.", "success")
        app.logger.debug("System update command executed successfully")
    except Exception as e:
        app.logger.error(f"Błąd podczas aktualizacji urządzenia {device.ip}: {e}", exc_info=True)
        flash(f"Błąd podczas aktualizacji: {e}", "error")
    finally:
        # The device may already be rebooting, so a failing close is ignored.
        if api is not None:
            try:
                api.close()
            except Exception:
                app.logger.debug(f"Closing connection to {device.ip} failed", exc_info=True)
    return redirect(url_for('device_detail', device_id=device.id))
@app.route('/device/<int:device_id>/update_firmware', methods=['POST'])
@login_required
def update_firmware(device_id):
    """Start a RouterBOARD firmware upgrade on a device owned by the user.

    On success an UpdateHistory row of type "firmware" is stored. Errors are
    logged and shown as a flash message. The connection uses the same SSL
    flags as the system update and is always closed afterwards.
    """
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))

    api = None
    try:
        api = librouteros.connect(
            host=device.ip,
            username=device.device_username,
            password=device.device_password,
            port=device.port,
            timeout=15,
            ssl=device.use_ssl,
            ssl_verify=not device.ssl_insecure
        )
        # Example firmware upgrade command
        list(api('/system/routerboard/upgrade'))
        history = UpdateHistory(
            device_id=device.id,
            update_type="firmware",
            details=f"Aktualizacja firmware rozpoczęta na urządzeniu {device.name or device.ip}."
        )
        db.session.add(history)
        db.session.commit()
        flash("Aktualizacja firmware została rozpoczęta.", "success")
    except Exception as e:
        app.logger.error(f"Błąd podczas aktualizacji firmware urządzenia {device.ip}: {e}", exc_info=True)
        flash(f"Błąd podczas aktualizacji firmware: {e}", "error")
    finally:
        if api is not None:
            try:
                api.close()
            except Exception:
                app.logger.debug(f"Closing connection to {device.ip} failed", exc_info=True)
    return redirect(url_for('device_detail', device_id=device.id))
@app.route('/test_pushover', methods=['POST'])
@login_required
def test_pushover():
    """Send a test Pushover notification to the current user."""
    send_pushover_notification(
        current_user,
        "To jest testowe powiadomienie Pushover z RouterOS Update.",
    )
    # Shown even when sending was skipped or failed; the helper only logs.
    flash("Test powiadomienia Pushover wysłany.", "success")
    return redirect(url_for('settings'))
@app.route('/test_email', methods=['POST'])
@login_required
def test_email():
    """Send a test e-mail to the current user."""
    send_email_notification(
        current_user,
        "Testowy E-mail z RouterOS Update",
        "To jest testowa wiadomość e-mail wysłana z RouterOS Update.",
    )
    # Shown even when sending was skipped or failed; the helper only logs.
    flash("Testowy e-mail wysłany.", "success")
    return redirect(url_for('settings'))
@app.route('/change_password', methods=['GET', 'POST'])
@login_required
def change_password():
    """Show the change-password form and update the password on POST.

    The old password must match, and the new password must be non-empty and
    equal to its confirmation. Every outcome redirects back to this view.
    The previous target, 'reset_password', is not an endpoint defined in the
    visible part of this file.
    """
    if request.method != 'POST':
        return render_template('change_password.html')

    back_to_form = redirect(url_for('change_password'))
    # A missing field becomes '' so the hash check cannot receive None.
    old_password = request.form.get('old_password') or ''
    new_password = request.form.get('new_password')
    confirm_password = request.form.get('confirm_password')

    if not current_user.check_password(old_password):
        flash("Stare hasło jest nieprawidłowe.", "error")
        return back_to_form
    if not new_password:
        flash("Nowe hasło nie może być puste.", "warning")
        return back_to_form
    if new_password != confirm_password:
        flash("Nowe hasło i potwierdzenie nie są zgodne.", "warning")
        return back_to_form

    current_user.set_password(new_password)
    db.session.commit()
    flash("Hasło zostało zresetowane.", "success")
    return back_to_form
@app.route('/logs/clean', methods=['POST'])
@login_required
def clean_logs():
    """Delete the current user's log entries older than a given number of days.

    Reads the ``days`` form field. Missing, non-numeric, negative or
    out-of-range values are rejected with a warning and nothing is deleted.

    Returns:
        A redirect to the logs page.
    """
    raw_days = request.form.get('days')
    if not raw_days:
        flash("Podaj liczbę dni.", "warning")
        return redirect(url_for('logs'))

    try:
        days = int(raw_days)
        # A negative value would move the cutoff into the future and wipe
        # every log entry of the user, so treat it as invalid input.
        if days < 0:
            raise ValueError("days must not be negative")
        # Very large values overflow timedelta / datetime arithmetic.
        cutoff = datetime.utcnow() - timedelta(days=days)
    except (ValueError, OverflowError):
        flash("Niepoprawna wartość dni.", "warning")
        return redirect(url_for('logs'))

    num_deleted = Log.query.filter(
        Log.user_id == current_user.id,
        Log.timestamp < cutoff
    ).delete()
    db.session.commit()
    flash(f"Usunięto {num_deleted} logów starszych niż {days} dni.", "success")
    return redirect(url_for('logs'))
@app.route('/update_history')
@login_required
def update_history():
    """Render the update history of all devices owned by the current user.

    Entries are ordered by timestamp, newest first.
    """
    owned = UpdateHistory.query.join(Device).filter(Device.user_id == current_user.id)
    newest_first = owned.order_by(UpdateHistory.timestamp.desc())
    return render_template('update_history.html', histories=newest_first.all())
@app.route('/anomalies')
@login_required
def anomalies():
    """Render the anomalies recorded for all devices owned by the current user.

    Entries are ordered by timestamp, newest first.
    """
    owned = Anomaly.query.join(Device).filter(Device.user_id == current_user.id)
    newest_first = owned.order_by(Anomaly.timestamp.desc())
    return render_template('anomalies.html', anomalies=newest_first.all())
@app.route('/devices/update_selected', methods=['POST'])
@login_required
def update_selected_devices():
    """Run an update check on every device selected in the submitted form.

    For each id in the ``selected_devices`` form list that refers to a
    device owned by the current user, ``check_device_update`` is called,
    its results are stored on the device and a ``Log`` entry is written.
    Unknown ids and devices owned by other users are skipped silently.

    Returns:
        A redirect to the device list.
    """
    selected_ids = request.form.getlist('selected_devices')
    if not selected_ids:
        flash("Nie wybrano żadnych urządzeń.", "error")
        return redirect(url_for('devices'))

    for device_id in selected_ids:
        device = Device.query.get(device_id)
        if device is None or device.user_id != current_user.id:
            continue

        (result, update_available, current_version,
         current_firmware, upgrade_firmware) = check_device_update(device)

        device.last_log = result
        device.last_check = datetime.utcnow()
        device.update_required = update_available
        device.current_version = current_version
        device.current_firmware = current_firmware
        device.upgrade_firmware = upgrade_firmware
        db.session.add(Log(message=result, device_id=device.id, user_id=device.user_id))
        # One commit per device: the new state and its log entry are saved
        # together, so a failure cannot leave one without the other.
        db.session.commit()

    flash("Wybrane urządzenia zostały zaktualizowane.", "success")
    return redirect(url_for('devices'))
@app.route('/routeros_changelog')
@login_required
def routeros_changelog():
    """Render the RouterOS changelog for one release channel and series.

    Query parameters:
        channel: Release channel, e.g. "stable", "rc" or "beta" (default "stable").
        series: Version series, e.g. "7.x" or "6.x" (default "7.x").
        version: Optional version to show; without it the newest entry of
            the channel/series is selected.
    """
    params = request.args
    channel = params.get('channel', 'stable')
    series = params.get('series', '7.x')
    wanted_version = params.get('version')

    scope = {'release_type': channel, 'category': series}
    # All entries for the channel and series, newest first.
    entries = (
        ChangelogEntry.query
        .filter_by(**scope)
        .order_by(ChangelogEntry.timestamp.desc())
        .all()
    )

    if wanted_version:
        selected_entry = ChangelogEntry.query.filter_by(version=wanted_version, **scope).first()
    elif entries:
        selected_entry = entries[0]
    else:
        selected_entry = None

    return render_template(
        'routeros_changelog_tabs.html',
        channel=channel,
        series=series,
        entries=entries,
        selected_entry=selected_entry
    )
@app.route('/force_fetch_changelogs')
@login_required
def force_fetch_changelogs():
    """Drop every stored changelog entry and fetch the changelogs again.

    Returns:
        A redirect to the changelog page.
    """
    # The database work runs inside an explicitly pushed application context.
    with app.app_context():
        # Remove all old entries first ...
        stale_entries = db.session.query(ChangelogEntry)
        stale_entries.delete()
        db.session.commit()
        # ... then download the changelogs from scratch.
        fetch_changelogs(force=True)
    flash("Changelog został całkowicie odświeżony.", "success")
    changelog_url = url_for('routeros_changelog')
    return redirect(changelog_url)
@app.route('/device/<int:device_id>/restart', methods=['POST'])
@login_required
def restart_device(device_id):
    """Send a reboot command to one of the current user's devices.

    Connects to the device through the RouterOS API and issues
    ``/system/reboot``. The outcome is reported via ``flash``.

    Args:
        device_id: Primary key of the ``Device`` to reboot (404 if unknown).

    Returns:
        A redirect to the device list when the device belongs to another
        user, otherwise an empty ``204`` response for the AJAX caller.
    """
    device = Device.query.get_or_404(device_id)
    if device.user_id != current_user.id:
        flash("Brak dostępu.", "error")
        return redirect(url_for('devices'))

    api = None
    try:
        api = librouteros.connect(
            host=device.ip,
            username=device.device_username,
            password=device.device_password,
            port=device.port,
            timeout=15
        )
        # Send the reboot command; list() consumes the whole response.
        list(api('/system/reboot'))
        flash("Komenda reboot została wysłana.", "success")
    except Exception as e:
        # Log with traceback so failures are visible in the server log too.
        app.logger.error(
            f"Błąd podczas wysyłania komendy reboot do urządzenia {device.ip}: {e}",
            exc_info=True
        )
        flash(f"Błąd podczas wysyłania komendy reboot: {e}", "error")
    finally:
        # Always release the API connection; previously it was never closed.
        if api is not None:
            try:
                api.close()
            except Exception:
                # The device may already be going down; a failing close must
                # not turn a sent reboot into an error response.
                app.logger.debug("Closing the RouterOS API connection failed", exc_info=True)
    return ('', 204)  # Empty response body for the AJAX request
# Shut the scheduler down when the application process exits.
atexit.register(scheduler.shutdown)

if __name__ == '__main__':
    # Run clean_old_logs once every day.
    scheduler.add_job(clean_old_logs, trigger="interval", days=1)
    # NOTE(review): debug=True together with host='0.0.0.0' exposes the
    # Werkzeug debugger on all interfaces - confirm this is development-only.
    app.run(debug=True, use_reloader=False, host='0.0.0.0', port=5582)