1085 lines
43 KiB
Python
1085 lines
43 KiB
Python
#!/usr/bin/env python3
|
||
# app.py
|
||
|
||
from flask import Flask, render_template, request, redirect, url_for, flash, session
|
||
from flask_sqlalchemy import SQLAlchemy
|
||
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
from apscheduler.schedulers.background import BackgroundScheduler
|
||
import librouteros
|
||
import threading
|
||
import time
|
||
import requests
|
||
import smtplib
|
||
from email.mime.text import MIMEText
|
||
from flask import current_app as app
|
||
from flask import render_template
|
||
import atexit
|
||
from datetime import timedelta, datetime
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
from flask import render_template, flash
|
||
import logging
|
||
import re
|
||
try:
|
||
from dateutil import parser as date_parser
|
||
except ImportError:
|
||
date_parser = None
|
||
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
|
||
logging.basicConfig(level=logging.INFO)
|
||
|
||
# Konfiguracja aplikacji
|
||
app = Flask(__name__)
|
||
app.config['SECRET_KEY'] = 'twoj-sekret-klucz'
|
||
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
|
||
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
||
|
||
db = SQLAlchemy(app)
|
||
|
||
# Konfiguracja Flask-Login
|
||
login_manager = LoginManager(app)
|
||
login_manager.login_view = 'login'
|
||
|
||
# MODELE BAZY DANYCH
|
||
class User(UserMixin, db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
username = db.Column(db.String(80), unique=True, nullable=False)
|
||
email = db.Column(db.String(120), unique=True, nullable=False)
|
||
password_hash = db.Column(db.String(128), nullable=False)
|
||
devices = db.relationship('Device', backref='owner', lazy=True)
|
||
settings = db.relationship('Settings', uselist=False, backref='user')
|
||
|
||
def set_password(self, password):
|
||
self.password_hash = generate_password_hash(password)
|
||
|
||
def check_password(self, password):
|
||
return check_password_hash(self.password_hash, password)
|
||
class Device(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
name = db.Column(db.String(120))
|
||
ip = db.Column(db.String(120), nullable=False)
|
||
port = db.Column(db.Integer, default=8728)
|
||
device_username = db.Column(db.String(120), nullable=False)
|
||
device_password = db.Column(db.String(120), nullable=False)
|
||
branch = db.Column(db.String(20), default="stable")
|
||
update_required = db.Column(db.Boolean, default=False)
|
||
last_check = db.Column(db.DateTime)
|
||
last_log = db.Column(db.Text)
|
||
current_version = db.Column(db.String(50))
|
||
current_firmware = db.Column(db.String(50))
|
||
upgrade_firmware = db.Column(db.String(50))
|
||
use_ssl = db.Column(db.Boolean, default=False) # Czy używać SSL?
|
||
ssl_insecure = db.Column(db.Boolean, default=False) # Jeśli True – nie weryfikować certyfikatu SSL
|
||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
||
class Settings(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
pushover_user_key = db.Column(db.String(255))
|
||
pushover_token = db.Column(db.String(255))
|
||
pushover_enabled = db.Column(db.Boolean, default=False)
|
||
smtp_server = db.Column(db.String(255))
|
||
smtp_port = db.Column(db.Integer)
|
||
smtp_username = db.Column(db.String(255))
|
||
smtp_password = db.Column(db.String(255))
|
||
email_notifications_enabled = db.Column(db.Boolean, default=False)
|
||
check_interval = db.Column(db.Integer, default=60)
|
||
log_retention_days = db.Column(db.Integer, default=30)
|
||
recipient_email = db.Column(db.String(120))
|
||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False, unique=True)
|
||
class Log(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
|
||
message = db.Column(db.Text)
|
||
device_id = db.Column(db.Integer, db.ForeignKey('device.id'))
|
||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
|
||
device = db.relationship('Device', backref='logs')
|
||
class UpdateHistory(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
device_id = db.Column(db.Integer, db.ForeignKey('device.id'))
|
||
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
|
||
update_type = db.Column(db.String(50))
|
||
details = db.Column(db.Text)
|
||
device = db.relationship('Device', backref='update_histories')
|
||
class Anomaly(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
device_id = db.Column(db.Integer, db.ForeignKey('device.id'), nullable=True)
|
||
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
|
||
description = db.Column(db.Text)
|
||
resolved = db.Column(db.Boolean, default=False)
|
||
device = db.relationship('Device', backref='anomalies')
|
||
class ChangelogEntry(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
version = db.Column(db.String(50), nullable=False)
|
||
details = db.Column(db.Text, nullable=False)
|
||
category = db.Column(db.String(10), nullable=False) # "6.x" or "7.x"
|
||
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
|
||
release_type = db.Column(db.String(10), nullable=False, default="stable")
|
||
|
||
# Inicjalizacja bazy (utworzyć bazę przy pierwszym uruchomieniu)
|
||
with app.app_context():
|
||
db.create_all()
|
||
|
||
@login_manager.user_loader
|
||
def load_user(user_id):
|
||
return User.query.get(int(user_id))
|
||
|
||
# FUNKCJE POWIADOMIEŃ
|
||
def send_pushover_notification(user, message):
|
||
if not user.settings or not user.settings.pushover_enabled or not user.settings.pushover_user_key or not user.settings.pushover_token:
|
||
return
|
||
data = {
|
||
"token": user.settings.pushover_token,
|
||
"user": user.settings.pushover_user_key,
|
||
"message": message
|
||
}
|
||
try:
|
||
r = requests.post("https://api.pushover.net/1/messages.json", data=data)
|
||
except Exception as e:
|
||
print("Błąd przy wysyłaniu powiadomienia Pushover:", e)
|
||
|
||
def send_email_notification(user, subject, message):
|
||
if not user.settings or not user.settings.email_notifications_enabled or not user.settings.smtp_server:
|
||
return
|
||
try:
|
||
html_body = get_email_template(subject, message)
|
||
msg = MIMEText(html_body, 'html')
|
||
msg["Subject"] = subject
|
||
msg["From"] = user.settings.smtp_username
|
||
# Używamy adresu z ustawień, jeśli został podany, lub domyślnie adresu z profilu użytkownika
|
||
to_email = user.settings.recipient_email if user.settings.recipient_email else user.email
|
||
msg["To"] = to_email
|
||
|
||
s = smtplib.SMTP(user.settings.smtp_server, user.settings.smtp_port)
|
||
s.starttls()
|
||
s.login(user.settings.smtp_username, user.settings.smtp_password)
|
||
s.sendmail(user.settings.smtp_username, [to_email], msg.as_string())
|
||
s.quit()
|
||
app.logger.debug("E-mail wysłany pomyślnie")
|
||
except Exception as e:
|
||
app.logger.error(f"Błąd przy wysyłaniu powiadomienia e-mail: {e}", exc_info=True)
|
||
|
||
# FUNKCJA SPRAWDZAJĄCA AKTUALIZACJE URZĄDZENIA
|
||
def check_device_update(device):
|
||
log_entries = []
|
||
update_available = False
|
||
current_version = None
|
||
current_firmware = None
|
||
upgrade_firmware = None
|
||
try:
|
||
app.logger.debug(f"Connecting to device {device.ip}:{device.port} using SSL: {device.use_ssl}, ssl_verify: {not device.ssl_insecure}")
|
||
api = librouteros.connect(
|
||
host=device.ip,
|
||
username=device.device_username,
|
||
password=device.device_password,
|
||
port=device.port,
|
||
timeout=15,
|
||
ssl=device.use_ssl,
|
||
ssl_verify=not device.ssl_insecure
|
||
)
|
||
app.logger.debug(f"Connection established to {device.ip}")
|
||
|
||
# Pobranie informacji o tożsamości urządzenia
|
||
identity_resp = list(api('/system/identity/print'))
|
||
app.logger.debug(f"Identity response: {identity_resp}")
|
||
if identity_resp:
|
||
identity = identity_resp[0].get('name', '')
|
||
log_entries.append(f"Identity: {identity}")
|
||
|
||
# Pobranie wersji systemu
|
||
resource_resp = list(api('/system/resource/print'))
|
||
app.logger.debug(f"Resource response: {resource_resp}")
|
||
if resource_resp:
|
||
version = resource_resp[0].get('version', '')
|
||
current_version = version
|
||
log_entries.append(f"System Version: {version}")
|
||
|
||
# Pobranie informacji o routerboard (firmware)
|
||
board_resp = list(api('/system/routerboard/print'))
|
||
app.logger.debug(f"Routerboard response: {board_resp}")
|
||
if board_resp:
|
||
board_info = board_resp[0]
|
||
# Pobieramy oddzielnie obie wersje:
|
||
current_fw = board_info.get('firmware',
|
||
board_info.get('firmware-version',
|
||
board_info.get('current-firmware', 'N/A')))
|
||
upgrade_fw = board_info.get('upgrade-firmware', 'N/A')
|
||
current_firmware = current_fw
|
||
upgrade_firmware = upgrade_fw
|
||
log_entries.append(f"Firmware: {current_fw}")
|
||
log_entries.append(f"Upgrade Firmware: {upgrade_fw}")
|
||
|
||
# Sprawdzenie dostępnych aktualizacji
|
||
log_entries.append("Checking for updates...")
|
||
list(api('/system/package/update/check-for-updates'))
|
||
for _ in range(10):
|
||
time.sleep(1)
|
||
status_resp = list(api('/system/package/update/print'))
|
||
app.logger.debug(f"Update status response: {status_resp}")
|
||
if status_resp:
|
||
status = status_resp[0].get('status', '').lower()
|
||
if 'checking' not in status:
|
||
log_entries.append(f"Update check completed. Status: {status}")
|
||
break
|
||
|
||
update_resp = list(api('/system/package/update/print'))
|
||
app.logger.debug(f"Update response: {update_resp}")
|
||
if update_resp:
|
||
for res in update_resp:
|
||
installed = res.get('installed-version', '')
|
||
latest = res.get('latest-version', '')
|
||
if latest and latest != installed:
|
||
log_entries.append(f"Updates available: {installed} -> {latest}")
|
||
update_available = True
|
||
else:
|
||
log_entries.append("No updates available.")
|
||
# Zwracamy 5-elementową krotkę
|
||
return "\n".join(log_entries), update_available, current_version, current_firmware, upgrade_firmware
|
||
except Exception as e:
|
||
app.logger.error(f"Error connecting to device {device.ip}: {e}", exc_info=True)
|
||
return f"Error: {str(e)}", False, None, None, None
|
||
|
||
|
||
def get_email_template(subject, message):
|
||
return f"""
|
||
<html>
|
||
<head>
|
||
<style>
|
||
body {{
|
||
font-family: Arial, sans-serif;
|
||
background-color: #f4f4f4;
|
||
margin: 0;
|
||
padding: 0;
|
||
}}
|
||
.container {{
|
||
max-width: 600px;
|
||
margin: 20px auto;
|
||
background-color: #ffffff;
|
||
padding: 20px;
|
||
border-radius: 5px;
|
||
box-shadow: 0 0 10px rgba(0,0,0,0.1);
|
||
}}
|
||
.header {{
|
||
background-color: #007bff;
|
||
color: #ffffff;
|
||
padding: 10px;
|
||
text-align: center;
|
||
border-radius: 5px 5px 0 0;
|
||
}}
|
||
.content {{
|
||
margin: 20px 0;
|
||
font-size: 16px;
|
||
line-height: 1.5;
|
||
}}
|
||
.footer {{
|
||
text-align: center;
|
||
font-size: 12px;
|
||
color: #777777;
|
||
border-top: 1px solid #dddddd;
|
||
padding-top: 10px;
|
||
}}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<div class="container">
|
||
<div class="header">
|
||
<h2>{subject}</h2>
|
||
</div>
|
||
<div class="content">
|
||
<p>{message}</p>
|
||
</div>
|
||
<div class="footer">
|
||
<p>Wiadomość wygenerowana automatycznie przez system RouterOS Update</p>
|
||
</div>
|
||
</div>
|
||
</body>
|
||
</html>
|
||
"""
|
||
def clean_old_changelogs():
|
||
with app.app_context():
|
||
cutoff_time = datetime.utcnow() - timedelta(days=7)
|
||
db.session.query(ChangelogEntry).filter(ChangelogEntry.timestamp < cutoff_time).delete()
|
||
db.session.commit()
|
||
|
||
def check_all_devices():
|
||
with app.app_context():
|
||
devices = Device.query.all()
|
||
for device in devices:
|
||
result, update_available, current_version, current_firmware, upgrade_firmware = check_device_update(device)
|
||
device.last_log = result
|
||
device.last_check = datetime.utcnow()
|
||
device.update_required = update_available
|
||
device.current_version = current_version
|
||
device.current_firmware = current_firmware
|
||
device.upgrade_firmware = upgrade_firmware
|
||
db.session.commit()
|
||
log_entry = Log(message=result, device_id=device.id, user_id=device.user_id)
|
||
db.session.add(log_entry)
|
||
db.session.commit()
|
||
if update_available:
|
||
user = device.owner
|
||
message = f"Urządzenie {device.name or device.ip} ma dostępną aktualizację."
|
||
send_pushover_notification(user, message)
|
||
send_email_notification(user, "Aktualizacja dostępna", message)
|
||
|
||
def bytes_to_human(n):
|
||
try:
|
||
n = int(n)
|
||
except Exception:
|
||
return n
|
||
i = 0
|
||
while n >= 1024 and i < 5:
|
||
n /= 1024.0
|
||
i += 1
|
||
units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
|
||
return f"{n:.2f} {units[i]}"
|
||
|
||
def clean_old_logs():
|
||
with app.app_context():
|
||
all_settings = Settings.query.all()
|
||
for setting in all_settings:
|
||
if setting.log_retention_days:
|
||
cutoff = datetime.utcnow() - timedelta(days=setting.log_retention_days)
|
||
# Usuwamy logi starsze niż cutoff dla danego użytkownika
|
||
Log.query.filter(Log.user_id == setting.user_id, Log.timestamp < cutoff).delete()
|
||
db.session.commit()
|
||
|
||
def parse_release_date(text):
|
||
"""
|
||
Próbuje wyłuskać datę z tekstu.
|
||
Najpierw używa regex, a w razie niepowodzenia – dateutil (jeśli dostępny).
|
||
"""
|
||
date_match = re.search(r"\((\d{4})-([A-Za-z]{3})-(\d{2})", text)
|
||
if date_match:
|
||
try:
|
||
return datetime.strptime(f"{date_match.group(1)}-{date_match.group(2)}-{date_match.group(3)}", "%Y-%b-%d")
|
||
except Exception as e:
|
||
logging.error(f"Błąd parsowania daty przy użyciu strptime: {e}")
|
||
if date_parser:
|
||
try:
|
||
return date_parser.parse(text, fuzzy=True)
|
||
except Exception as e:
|
||
logging.error(f"Błąd parsowania daty przy użyciu dateutil: {e}")
|
||
return None
|
||
|
||
def get_release_type(version_text):
|
||
"""
|
||
Określa typ wydania na podstawie numeru wersji:
|
||
- Jeśli w tekście występuje "beta" – zwraca "beta"
|
||
- Jeśli w tekście występuje "rc" – zwraca "rc"
|
||
- W przeciwnym wypadku – "stable"
|
||
"""
|
||
lower = version_text.lower()
|
||
if "beta" in lower:
|
||
return "beta"
|
||
elif "rc" in lower:
|
||
return "rc"
|
||
else:
|
||
return "stable"
|
||
|
||
@app.template_filter('format_version')
|
||
def format_version(value):
|
||
# Krok 1: Usuń wszystko w nawiasach (np. „(2025-Feb-12 11:20)”).
|
||
value = re.sub(r'\(.*?\)', '', value).strip()
|
||
|
||
# Krok 2: Spróbuj dopasować strukturę: major.minor(.patch)?(beta|rc)(num)?
|
||
# 1) major (np. 7)
|
||
# 2) minor (np. 18)
|
||
# 3) patch (np. 20)
|
||
# 4) suffix (beta|rc)
|
||
# 5) suffixnum (np. 2025)
|
||
pattern = r'^(\d+)' # grupa 1: major
|
||
pattern += r'(?:\.(\d+))?' # grupa 2: minor (opcjonalnie)
|
||
pattern += r'(?:\.(\d+))?' # grupa 3: patch (opcjonalnie)
|
||
pattern += r'(?:(beta|rc)(\d+))?'# grupa 4 i 5: beta|rc + numer (opcjonalnie)
|
||
|
||
match = re.match(pattern, value)
|
||
if not match:
|
||
# Jeśli nie uda się dopasować, zwracamy oryginał
|
||
return value
|
||
|
||
major, minor, patch, suffix, suffixnum = match.groups()
|
||
|
||
# Funkcja pomocnicza do przycinania łańcucha cyfr do maksymalnie 2 znaków
|
||
def truncate_2_digits(num_str):
|
||
return num_str[:2] if len(num_str) > 2 else num_str
|
||
|
||
# Składamy główne części wersji
|
||
result = major
|
||
if minor:
|
||
# np. "182025" → "18"
|
||
result += '.' + truncate_2_digits(minor)
|
||
if patch:
|
||
# np. "182025" → "18"
|
||
result += '.' + truncate_2_digits(patch)
|
||
|
||
if suffix:
|
||
# Dodajemy samo "beta"/"rc"
|
||
result += suffix
|
||
if suffixnum:
|
||
# 1) Najpierw sprawdźmy, czy w numerze sufiksu jest (zlepiony) rok, np. "22025" → "2" i "2025"
|
||
# Wzorzec: do 2 cyfr + "20xx", np. "14" + "2025", "2" + "2025" itd.
|
||
m_year = re.match(r'^(\d{1,2})(20\d{2})$', suffixnum)
|
||
if m_year:
|
||
# Jeśli tak, zostawiamy tylko pierwszą grupę (np. "14" z "14"+"2025")
|
||
suffixnum = m_year.group(1)
|
||
|
||
# 2) Ostatecznie przycinamy do 2 cyfr (jeśli ktoś wpisał np. "beta123")
|
||
suffixnum = truncate_2_digits(suffixnum)
|
||
|
||
result += suffixnum
|
||
|
||
return result
|
||
|
||
@app.template_global()
|
||
def bootstrap_alert_category(cat):
|
||
mapping = {
|
||
'error': 'danger',
|
||
'fail': 'danger',
|
||
'warning': 'warning',
|
||
'warn': 'warning',
|
||
'ok': 'success',
|
||
'success': 'success',
|
||
'info': 'info'
|
||
}
|
||
return mapping.get(cat.lower(), 'info')
|
||
|
||
def fetch_changelogs(force=False):
|
||
changelog_url = "https://mikrotik.com/download/changelogs"
|
||
current_date = datetime.utcnow()
|
||
|
||
def process_section(section):
|
||
a_tag = section.find("a")
|
||
if not a_tag:
|
||
return None
|
||
|
||
raw_text = a_tag.get_text(strip=True)
|
||
logging.info(f"raw_text: {raw_text}")
|
||
|
||
# Najpierw próbujemy znaleźć wersję za pomocą wzorca z "in"
|
||
match = re.search(r"in\s+(\d+\.\d+(?:\.\d+)?(?:beta|rc)?\d*)", raw_text)
|
||
if match:
|
||
version_text = match.group(1)
|
||
logging.info(f"Parsed version (pattern 1): {version_text}")
|
||
else:
|
||
# Jeśli nie znaleziono, próbujemy wychwycić wersję na początku łańcucha z dołączoną datą
|
||
match = re.match(r"^(\d+\.\d+(?:\.\d+)?(?:beta|rc)?\d*)(\d{4}-\d{2}-\d{2})", raw_text)
|
||
if match:
|
||
version_text = match.group(1)
|
||
logging.info(f"Parsed version (pattern 2): {version_text}")
|
||
else:
|
||
version_text = raw_text
|
||
logging.info("Brak dopasowania regex, używam raw_text jako wersji")
|
||
|
||
# Pomijamy wersje, które nie zaczynają się od "6." lub "7."
|
||
if not (version_text.startswith("6.") or version_text.startswith("7.")):
|
||
logging.info(f"Pomijam wersję {version_text} – nie jest 6.x ani 7.x")
|
||
return None
|
||
|
||
details_div = section.find("div", class_="content")
|
||
changelog_file_url = details_div.get("data-url") if details_div else None
|
||
if not changelog_file_url:
|
||
logging.warning(f"Brak URL changeloga dla wersji {version_text}")
|
||
return None
|
||
|
||
try:
|
||
changelog_response = requests.get(changelog_file_url, timeout=10)
|
||
changelog_response.raise_for_status()
|
||
changelog_lines = changelog_response.text.splitlines()
|
||
|
||
if not changelog_lines:
|
||
logging.warning(f"Pusty changelog dla wersji {version_text}")
|
||
return None
|
||
|
||
first_line = changelog_lines[0].strip()
|
||
release_date_dt = parse_release_date(first_line)
|
||
if release_date_dt is None:
|
||
logging.warning(f"Nie udało się wyłuskać daty dla wersji {version_text}, pomijam ten changelog")
|
||
return None
|
||
|
||
changelog_text = "\n".join(changelog_lines).strip()
|
||
except Exception as e:
|
||
logging.error(f"Błąd pobierania changeloga z {changelog_file_url}: {e}")
|
||
return None
|
||
|
||
# Filtrowanie według daty: dla 7.x pomijamy wersje starsze niż 1 rok, dla 6.x starsze niż 2 lata
|
||
if version_text.startswith("7.") and release_date_dt < (current_date - timedelta(days=365)):
|
||
logging.info(f"Pomijam wersję {version_text} - starsza niż 1 rok")
|
||
return None
|
||
if version_text.startswith("6.") and release_date_dt < (current_date - timedelta(days=730)):
|
||
logging.info(f"Pomijam wersję {version_text} - starsza niż 2 lata")
|
||
return None
|
||
|
||
release_type = get_release_type(version_text)
|
||
return {
|
||
"version": version_text,
|
||
"details": changelog_text,
|
||
"category": "6.x" if version_text.startswith("6.") else "7.x",
|
||
"timestamp": release_date_dt,
|
||
"release_type": release_type
|
||
}
|
||
|
||
try:
|
||
logging.info(f"Pobieranie changelogów z {changelog_url}...")
|
||
response = requests.get(changelog_url, timeout=30)
|
||
response.raise_for_status()
|
||
soup = BeautifulSoup(response.text, "html.parser")
|
||
changelog_sections = soup.find_all("li", class_="accordion-navigation")
|
||
logging.info(f"Znaleziono {len(changelog_sections)} sekcji changelogów.")
|
||
|
||
new_entries = 0
|
||
results = []
|
||
# Używamy równoległego przetwarzania sekcji
|
||
with ThreadPoolExecutor(max_workers=8) as executor:
|
||
futures = [executor.submit(process_section, section) for section in changelog_sections]
|
||
for future in as_completed(futures):
|
||
result = future.result()
|
||
if result is not None:
|
||
results.append(result)
|
||
|
||
# Dodajemy wyniki do bazy danych
|
||
for entry in results:
|
||
new_entry = ChangelogEntry(
|
||
version=entry["version"],
|
||
details=entry["details"],
|
||
category=entry["category"],
|
||
timestamp=entry["timestamp"],
|
||
release_type=entry["release_type"]
|
||
)
|
||
db.session.add(new_entry)
|
||
new_entries += 1
|
||
|
||
db.session.commit()
|
||
logging.info(f"Nowe wpisy dodane: {new_entries}")
|
||
except Exception as e:
|
||
logging.error(f"Błąd podczas pobierania changelogów: {e}")
|
||
|
||
|
||
|
||
def detect_anomalies():
|
||
with app.app_context():
|
||
# Ustal okres analizy, np. ostatnie 24 godziny
|
||
cutoff = datetime.utcnow() - timedelta(hours=24)
|
||
# Pobierz logi użytkowników (lub logi globalne) z tego okresu
|
||
logs = Log.query.filter(Log.timestamp >= cutoff).all()
|
||
|
||
# Przykładowa analiza: wykryj logi zawierające określone słowa kluczowe
|
||
error_keywords = ["błąd", "error", "niepowodzenie", "exception"]
|
||
detected = {}
|
||
for log in logs:
|
||
lower_msg = log.message.lower()
|
||
if any(keyword in lower_msg for keyword in error_keywords):
|
||
detected.setdefault(log.device_id, []).append(log.message)
|
||
|
||
# Dla każdego urządzenia, jeżeli wykryto więcej niż określony próg błędów, zapisz anomalię
|
||
for device_id, messages in detected.items():
|
||
if len(messages) >= 3: # przykładowy próg
|
||
description = f"Wykryto {len(messages)} błędne logi w ciągu ostatnich 24 godzin. Przykłady: " + "; ".join(messages[:3])
|
||
anomaly = Anomaly(device_id=device_id, description=description)
|
||
db.session.add(anomaly)
|
||
# Możesz również wysłać powiadomienie, np. e-mail lub Pushover
|
||
db.session.commit()
|
||
|
||
# Harmonogram sprawdzania aktualizacji – wykorzystujemy APScheduler
|
||
scheduler = BackgroundScheduler()
|
||
|
||
# Inicjalizacja bazy i schedulera
|
||
with app.app_context():
|
||
db.create_all() # lub już wcześniej utworzona baza
|
||
# Pobranie globalnych ustawień – zakładamy, że Settings.query.first() zwróci ustawienia globalne
|
||
global_settings = Settings.query.first()
|
||
if global_settings and global_settings.check_interval:
|
||
interval = global_settings.check_interval
|
||
else:
|
||
interval = 60
|
||
|
||
scheduler.add_job(
|
||
func=check_all_devices,
|
||
trigger="interval",
|
||
seconds=interval,
|
||
id="check_all_devices",
|
||
max_instances=1
|
||
)
|
||
scheduler.add_job(
|
||
func=detect_anomalies,
|
||
trigger="interval",
|
||
minutes=60,
|
||
id="detect_anomalies",
|
||
max_instances=1
|
||
)
|
||
scheduler.add_job(
|
||
func=clean_old_changelogs,
|
||
trigger="interval",
|
||
days=1,
|
||
id="clean_changelogs",
|
||
max_instances=1
|
||
)
|
||
scheduler.add_job(
|
||
func=lambda: fetch_changelogs(force=False),
|
||
trigger="interval",
|
||
days=1,
|
||
id="daily_changelog_fetch",
|
||
max_instances=1
|
||
)
|
||
|
||
app.logger.debug(f"Scheduler initialized with interval: {interval} seconds")
|
||
|
||
scheduler.start()
|
||
|
||
# ROUTY APLIKACJI
|
||
@app.route('/')
|
||
def index():
|
||
if current_user.is_authenticated:
|
||
return redirect(url_for('dashboard'))
|
||
return render_template('index.html')
|
||
|
||
@app.route('/dashboard')
|
||
@login_required
|
||
def dashboard():
|
||
devices_count = Device.query.count()
|
||
pending_updates_count = Device.query.filter_by(update_required=True).count()
|
||
logs_count = Log.query.count()
|
||
users_count = User.query.count()
|
||
anomalies_count = Anomaly.query.filter_by(resolved=False).count()
|
||
update_history_count = UpdateHistory.query.count()
|
||
recent_logs = Log.query.order_by(Log.timestamp.desc()).limit(5).all()
|
||
|
||
# Pobieramy najnowsze wersje stabilne dla 7.x i 6.x
|
||
latest_version_7 = ChangelogEntry.query.filter_by(category="7.x", release_type="stable").order_by(ChangelogEntry.timestamp.desc()).first()
|
||
latest_version_6 = ChangelogEntry.query.filter_by(category="6.x", release_type="stable").order_by(ChangelogEntry.timestamp.desc()).first()
|
||
|
||
return render_template('dashboard.html',
|
||
devices_count=devices_count,
|
||
pending_updates_count=pending_updates_count,
|
||
logs_count=logs_count,
|
||
users_count=users_count,
|
||
anomalies_count=anomalies_count,
|
||
update_history_count=update_history_count,
|
||
recent_logs=recent_logs,
|
||
latest_version_7=latest_version_7,
|
||
latest_version_6=latest_version_6)
|
||
|
||
# Rejestracja
|
||
@app.route('/register', methods=['GET', 'POST'])
|
||
def register():
|
||
if request.method == 'POST':
|
||
username = request.form['username']
|
||
email = request.form['email']
|
||
password = request.form['password']
|
||
# Prosta walidacja – warto rozszerzyć
|
||
if User.query.filter_by(username=username).first():
|
||
flash("Użytkownik o tej nazwie już istnieje.", "error")
|
||
return redirect(url_for('register'))
|
||
new_user = User(username=username, email=email)
|
||
new_user.set_password(password)
|
||
db.session.add(new_user)
|
||
db.session.commit()
|
||
# Utwórz domyślne ustawienia dla użytkownika
|
||
default_settings = Settings(user_id=new_user.id, check_interval=60)
|
||
db.session.add(default_settings)
|
||
db.session.commit()
|
||
flash("Rejestracja zakończona. Możesz się zalogować.", "success")
|
||
return redirect(url_for('login'))
|
||
return render_template('register.html')
|
||
|
||
# Logowanie
|
||
@app.route('/login', methods=['GET', 'POST'])
|
||
def login():
|
||
if request.method == 'POST':
|
||
username = request.form['username']
|
||
password = request.form['password']
|
||
user = User.query.filter_by(username=username).first()
|
||
if user and user.check_password(password):
|
||
login_user(user)
|
||
flash("Zalogowano pomyślnie.", "success")
|
||
return redirect(url_for('dashboard'))
|
||
else:
|
||
flash("Nieprawidłowa nazwa użytkownika lub hasło.", "error")
|
||
return render_template('login.html')
|
||
|
||
# Wylogowanie
|
||
@app.route('/logout')
|
||
@login_required
|
||
def logout():
|
||
logout_user()
|
||
flash("Wylogowano.", "success")
|
||
return redirect(url_for('index'))
|
||
|
||
# Lista urządzeń użytkownika
|
||
@app.route('/devices')
|
||
@login_required
|
||
def devices():
|
||
user_devices = Device.query.filter_by(user_id=current_user.id).all()
|
||
return render_template('devices.html', devices=user_devices)
|
||
|
||
# Dodawanie urządzenia
|
||
@app.route('/device/add', methods=['GET', 'POST'])
|
||
@login_required
|
||
def add_device():
|
||
if request.method == 'POST':
|
||
name = request.form.get('name')
|
||
ip = request.form['ip']
|
||
port = int(request.form.get('port', 8728))
|
||
device_username = request.form['device_username']
|
||
device_password = request.form['device_password']
|
||
use_ssl = bool(request.form.get('use_ssl'))
|
||
ssl_insecure = bool(request.form.get('ssl_insecure'))
|
||
new_device = Device(
|
||
name=name,
|
||
ip=ip,
|
||
port=port,
|
||
device_username=device_username,
|
||
device_password=device_password,
|
||
use_ssl=use_ssl,
|
||
ssl_insecure=ssl_insecure,
|
||
user_id=current_user.id
|
||
)
|
||
db.session.add(new_device)
|
||
db.session.commit()
|
||
flash("Urządzenie dodane.", "success")
|
||
return redirect(url_for('devices'))
|
||
return render_template('add_device.html')
|
||
|
||
# Szczegóły urządzenia
|
||
@app.route('/device/<int:device_id>')
|
||
@login_required
|
||
def device_detail(device_id):
|
||
device = Device.query.get_or_404(device_id)
|
||
if device.user_id != current_user.id:
|
||
flash("Brak dostępu.", "error")
|
||
return redirect(url_for('devices'))
|
||
resource_data = {}
|
||
try:
|
||
api = librouteros.connect(
|
||
host=device.ip,
|
||
username=device.device_username,
|
||
password=device.device_password,
|
||
port=device.port,
|
||
timeout=15
|
||
)
|
||
res_resp = list(api('/system/resource/print'))
|
||
if res_resp:
|
||
resource_data = res_resp[0]
|
||
# Konwersja wartości pamięci i dysku na czytelny format
|
||
if 'free-memory' in resource_data:
|
||
resource_data['free-memory'] = bytes_to_human(resource_data['free-memory'])
|
||
if 'total-memory' in resource_data:
|
||
resource_data['total-memory'] = bytes_to_human(resource_data['total-memory'])
|
||
if 'free-hdd-space' in resource_data:
|
||
resource_data['free-hdd-space'] = bytes_to_human(resource_data['free-hdd-space'])
|
||
except Exception as e:
|
||
resource_data = {'error': str(e)}
|
||
return render_template('device_detail.html', device=device, resource=resource_data)
|
||
|
||
# Strona z logami
|
||
@app.route('/logs')
|
||
@login_required
|
||
def logs():
|
||
user_logs = Log.query.filter_by(user_id=current_user.id).order_by(Log.timestamp.desc()).all()
|
||
return render_template('logs.html', logs=user_logs)
|
||
|
||
# Strona ustawień powiadomień
|
||
@app.route('/settings', methods=['GET', 'POST'])
|
||
@login_required
|
||
def settings():
|
||
user_settings = current_user.settings
|
||
if request.method == 'POST':
|
||
# Aktualizacja ustawień Pushover
|
||
user_settings.pushover_user_key = request.form.get('pushover_user_key')
|
||
user_settings.pushover_token = request.form.get('pushover_token')
|
||
user_settings.pushover_enabled = bool(request.form.get('pushover_enabled'))
|
||
# Aktualizacja ustawień SMTP
|
||
user_settings.smtp_server = request.form.get('smtp_server')
|
||
smtp_port = request.form.get('smtp_port')
|
||
user_settings.smtp_port = int(smtp_port) if smtp_port else None
|
||
user_settings.smtp_username = request.form.get('smtp_username')
|
||
user_settings.smtp_password = request.form.get('smtp_password')
|
||
user_settings.email_notifications_enabled = bool(request.form.get('email_notifications_enabled'))
|
||
# Aktualizacja adresu e-mail odbiorcy (może być inny niż email z profilu)
|
||
user_settings.recipient_email = request.form.get('recipient_email')
|
||
# Aktualizacja interwału sprawdzania
|
||
interval = request.form.get('check_interval')
|
||
user_settings.check_interval = int(interval) if interval else 60
|
||
# Aktualizacja retencji logów
|
||
retention = request.form.get('log_retention_days')
|
||
user_settings.log_retention_days = int(retention) if retention else 30
|
||
db.session.commit()
|
||
try:
|
||
scheduler.reschedule_job("check_all_devices", trigger="interval", seconds=user_settings.check_interval)
|
||
app.logger.debug(f"Scheduler rescheduled with new interval: {user_settings.check_interval} seconds")
|
||
except Exception as e:
|
||
app.logger.error(f"Error rescheduling job: {e}")
|
||
|
||
flash("Ustawienia zapisane.", "success")
|
||
return redirect(url_for('settings'))
|
||
return render_template('settings.html', settings=user_settings)
|
||
|
||
@app.route('/device/<int:device_id>/edit', methods=['GET', 'POST'])
|
||
@login_required
|
||
def edit_device(device_id):
|
||
device = Device.query.get_or_404(device_id)
|
||
if device.user_id != current_user.id:
|
||
flash("Brak dostępu.", "error")
|
||
return redirect(url_for('devices'))
|
||
if request.method == 'POST':
|
||
device.name = request.form.get('name', device.name)
|
||
device.ip = request.form.get('ip', device.ip)
|
||
device.port = int(request.form.get('port', device.port or 8728))
|
||
device.device_username = request.form.get('device_username', device.device_username)
|
||
device.device_password = request.form.get('device_password', device.device_password)
|
||
device.branch = request.form.get('branch', device.branch or 'stable')
|
||
device.use_ssl = bool(request.form.get('use_ssl'))
|
||
device.ssl_insecure = bool(request.form.get('ssl_insecure'))
|
||
db.session.commit()
|
||
flash("Urządzenie zaktualizowane.", "success")
|
||
return redirect(url_for('devices'))
|
||
return render_template('edit_device.html', device=device)
|
||
|
||
@app.route('/device/<int:device_id>/force_check')
|
||
@login_required
|
||
def force_check(device_id):
|
||
device = Device.query.get_or_404(device_id)
|
||
if device.user_id != current_user.id:
|
||
flash("Brak dostępu.", "error")
|
||
return redirect(url_for('devices'))
|
||
result, update_available, current_version, current_firmware, upgrade_firmware = check_device_update(device)
|
||
device.last_log = result
|
||
device.last_check = datetime.utcnow()
|
||
device.update_required = update_available
|
||
device.current_version = current_version
|
||
device.current_firmware = current_firmware
|
||
device.upgrade_firmware = upgrade_firmware
|
||
db.session.commit()
|
||
flash("Sprawdzenie urządzenia zakończone.", "success")
|
||
return redirect(url_for('devices'))
|
||
|
||
@app.route('/device/<int:device_id>/update', methods=['POST'])
|
||
@login_required
|
||
def update_device(device_id):
|
||
device = Device.query.get_or_404(device_id)
|
||
if device.user_id != current_user.id:
|
||
flash("Brak dostępu.", "error")
|
||
return redirect(url_for('devices'))
|
||
try:
|
||
app.logger.debug(f"Initiating system update for device {device.ip}")
|
||
api = librouteros.connect(
|
||
host=device.ip,
|
||
username=device.device_username,
|
||
password=device.device_password,
|
||
port=device.port,
|
||
timeout=15,
|
||
ssl=device.use_ssl,
|
||
ssl_verify=not device.ssl_insecure
|
||
)
|
||
app.logger.debug("Connection established, starting update command")
|
||
if device.branch == 'stable':
|
||
list(api('/system/package/update/install'))
|
||
elif device.branch == 'dev':
|
||
list(api('/system/package/update/install', branch='dev'))
|
||
elif device.branch == 'beta':
|
||
list(api('/system/package/update/install', branch='beta'))
|
||
else:
|
||
list(api('/system/package/update/install'))
|
||
|
||
history = UpdateHistory(
|
||
device_id=device.id,
|
||
update_type="system",
|
||
details=f"Aktualizacja systemu rozpoczęta na urządzeniu {device.name or device.ip}."
|
||
)
|
||
db.session.add(history)
|
||
db.session.commit()
|
||
|
||
flash("Aktualizacja systemu została rozpoczęta.", "success")
|
||
app.logger.debug("System update command executed successfully")
|
||
except Exception as e:
|
||
app.logger.error(f"Błąd podczas aktualizacji urządzenia {device.ip}: {e}", exc_info=True)
|
||
flash(f"Błąd podczas aktualizacji: {e}", "error")
|
||
return redirect(url_for('device_detail', device_id=device.id))
|
||
|
||
@app.route('/device/<int:device_id>/update_firmware', methods=['POST'])
|
||
@login_required
|
||
def update_firmware(device_id):
|
||
device = Device.query.get_or_404(device_id)
|
||
if device.user_id != current_user.id:
|
||
flash("Brak dostępu.", "error")
|
||
return redirect(url_for('devices'))
|
||
try:
|
||
api = librouteros.connect(
|
||
host=device.ip,
|
||
username=device.device_username,
|
||
password=device.device_password,
|
||
port=device.port,
|
||
timeout=15
|
||
)
|
||
# Przykładowa komenda aktualizacji firmware
|
||
list(api('/system/routerboard/upgrade'))
|
||
|
||
history = UpdateHistory(
|
||
device_id=device.id,
|
||
update_type="firmware",
|
||
details=f"Aktualizacja firmware rozpoczęta na urządzeniu {device.name or device.ip}."
|
||
)
|
||
db.session.add(history)
|
||
db.session.commit()
|
||
|
||
flash("Aktualizacja firmware została rozpoczęta.", "success")
|
||
except Exception as e:
|
||
flash(f"Błąd podczas aktualizacji firmware: {e}", "error")
|
||
return redirect(url_for('device_detail', device_id=device.id))
|
||
|
||
@app.route('/test_pushover', methods=['POST'])
|
||
@login_required
|
||
def test_pushover():
|
||
message = "To jest testowe powiadomienie Pushover z RouterOS Update."
|
||
send_pushover_notification(current_user, message)
|
||
flash("Test powiadomienia Pushover wysłany.", "success")
|
||
return redirect(url_for('settings'))
|
||
|
||
@app.route('/test_email', methods=['POST'])
|
||
@login_required
|
||
def test_email():
|
||
subject = "Testowy E-mail z RouterOS Update"
|
||
message = "To jest testowa wiadomość e-mail wysłana z RouterOS Update."
|
||
send_email_notification(current_user, subject, message)
|
||
flash("Testowy e-mail wysłany.", "success")
|
||
return redirect(url_for('settings'))
|
||
|
||
@app.route('/change_password', methods=['GET', 'POST'])
|
||
@login_required
|
||
def change_password():
|
||
if request.method == 'POST':
|
||
old_password = request.form.get('old_password')
|
||
new_password = request.form.get('new_password')
|
||
confirm_password = request.form.get('confirm_password')
|
||
if not current_user.check_password(old_password):
|
||
flash("Stare hasło jest nieprawidłowe.", "error")
|
||
return redirect(url_for('reset_password'))
|
||
if new_password != confirm_password:
|
||
flash("Nowe hasło i potwierdzenie nie są zgodne.", "warning")
|
||
return redirect(url_for('reset_password'))
|
||
current_user.set_password(new_password)
|
||
db.session.commit()
|
||
flash("Hasło zostało zresetowane.", "success")
|
||
return redirect(url_for('reset_password'))
|
||
return render_template('change_password.html')
|
||
|
||
@app.route('/logs/clean', methods=['POST'])
|
||
@login_required
|
||
def clean_logs():
|
||
days = request.form.get('days')
|
||
if not days:
|
||
flash("Podaj liczbę dni.", "warning")
|
||
return redirect(url_for('logs'))
|
||
try:
|
||
days = int(days)
|
||
except ValueError:
|
||
flash("Niepoprawna wartość dni.", "warning")
|
||
return redirect(url_for('logs'))
|
||
cutoff = datetime.utcnow() - timedelta(days=days)
|
||
num_deleted = Log.query.filter(Log.user_id == current_user.id, Log.timestamp < cutoff).delete()
|
||
db.session.commit()
|
||
flash(f"Usunięto {num_deleted} logów starszych niż {days} dni.", "success")
|
||
return redirect(url_for('logs'))
|
||
|
||
@app.route('/update_history')
|
||
@login_required
|
||
def update_history():
|
||
histories = UpdateHistory.query.join(Device).filter(Device.user_id == current_user.id).order_by(UpdateHistory.timestamp.desc()).all()
|
||
return render_template('update_history.html', histories=histories)
|
||
|
||
@app.route('/anomalies')
|
||
@login_required
|
||
def anomalies():
|
||
anomalies = Anomaly.query.join(Device).filter(Device.user_id == current_user.id).order_by(Anomaly.timestamp.desc()).all()
|
||
return render_template('anomalies.html', anomalies=anomalies)
|
||
|
||
@app.route('/devices/update_selected', methods=['POST'])
|
||
@login_required
|
||
def update_selected_devices():
|
||
selected_ids = request.form.getlist('selected_devices')
|
||
if not selected_ids:
|
||
flash("Nie wybrano żadnych urządzeń.", "error")
|
||
return redirect(url_for('devices'))
|
||
for device_id in selected_ids:
|
||
device = Device.query.get(device_id)
|
||
if device and device.user_id == current_user.id:
|
||
result, update_available, current_version, current_firmware, upgrade_firmware = check_device_update(device)
|
||
device.last_log = result
|
||
device.last_check = datetime.utcnow()
|
||
device.update_required = update_available
|
||
device.current_version = current_version
|
||
device.current_firmware = current_firmware
|
||
device.upgrade_firmware = upgrade_firmware
|
||
db.session.commit()
|
||
log_entry = Log(message=result, device_id=device.id, user_id=device.user_id)
|
||
db.session.add(log_entry)
|
||
db.session.commit()
|
||
flash("Wybrane urządzenia zostały zaktualizowane.", "success")
|
||
return redirect(url_for('devices'))
|
||
|
||
@app.route('/routeros_changelog')
|
||
@login_required
|
||
def routeros_changelog():
|
||
channel = request.args.get('channel', 'stable') # "stable", "rc" lub "beta"
|
||
series = request.args.get('series', '7.x') # "7.x" lub "6.x"
|
||
selected_version = request.args.get('version')
|
||
|
||
# Pobieramy wszystkie wpisy dla danego kanału i serii, posortowane malejąco wg daty
|
||
entries = ChangelogEntry.query.filter_by(release_type=channel, category=series).order_by(ChangelogEntry.timestamp.desc()).all()
|
||
|
||
if selected_version:
|
||
selected_entry = ChangelogEntry.query.filter_by(version=selected_version, release_type=channel, category=series).first()
|
||
else:
|
||
selected_entry = entries[0] if entries else None
|
||
|
||
return render_template(
|
||
'routeros_changelog_tabs.html',
|
||
channel=channel,
|
||
series=series,
|
||
entries=entries,
|
||
selected_entry=selected_entry
|
||
)
|
||
|
||
@app.route('/force_fetch_changelogs')
|
||
@login_required
|
||
def force_fetch_changelogs():
|
||
with app.app_context():
|
||
# Usuwamy wszystkie stare wpisy
|
||
db.session.query(ChangelogEntry).delete()
|
||
db.session.commit()
|
||
|
||
# Pobieramy changelogi od nowa
|
||
fetch_changelogs(force=True)
|
||
|
||
flash("Changelog został całkowicie odświeżony.", "success")
|
||
return redirect(url_for('routeros_changelog'))
|
||
|
||
@app.route('/device/<int:device_id>/restart', methods=['POST'])
|
||
@login_required
|
||
def restart_device(device_id):
|
||
device = Device.query.get_or_404(device_id)
|
||
if device.user_id != current_user.id:
|
||
flash("Brak dostępu.", "error")
|
||
return redirect(url_for('devices'))
|
||
try:
|
||
api = librouteros.connect(
|
||
host=device.ip,
|
||
username=device.device_username,
|
||
password=device.device_password,
|
||
port=device.port,
|
||
timeout=15
|
||
)
|
||
# Wysyłamy komendę reboot
|
||
list(api('/system/reboot'))
|
||
|
||
flash("Komenda reboot została wysłana.", "success")
|
||
except Exception as e:
|
||
flash(f"Błąd podczas wysyłania komendy reboot: {e}", "error")
|
||
return ('', 204) # Zwracamy odpowiedź bez treści dla żądania AJAX
|
||
|
||
# Zamknięcie harmonogramu przy zatrzymaniu aplikacji
|
||
atexit.register(lambda: scheduler.shutdown())
|
||
|
||
if __name__ == '__main__':
|
||
scheduler.add_job(func=clean_old_logs, trigger="interval", days=1)
|
||
app.run(host='0.0.0.0', port=5582, use_reloader=False, debug=True) |