Mateusz Gruszczyński 04c1e6d49a new options
2025-02-24 10:18:52 +01:00

667 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# app.py
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, current_user, login_required
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
import librouteros
import threading
import time
import requests
import smtplib
from email.mime.text import MIMEText
from flask import current_app as app
from flask import render_template
import atexit
from datetime import timedelta
# Konfiguracja aplikacji
app = Flask(__name__)
app.config['SECRET_KEY'] = 'twoj-sekret-klucz'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# Konfiguracja Flask-Login
login_manager = LoginManager(app)
login_manager.login_view = 'login'
# MODELE BAZY DANYCH
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
devices = db.relationship('Device', backref='owner', lazy=True)
settings = db.relationship('Settings', uselist=False, backref='user')
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
class Device(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(120))
ip = db.Column(db.String(120), nullable=False)
port = db.Column(db.Integer, default=8728)
device_username = db.Column(db.String(120), nullable=False)
device_password = db.Column(db.String(120), nullable=False)
branch = db.Column(db.String(20), default="stable")
update_required = db.Column(db.Boolean, default=False)
last_check = db.Column(db.DateTime)
last_log = db.Column(db.Text)
current_version = db.Column(db.String(50))
current_firmware = db.Column(db.String(50))
use_ssl = db.Column(db.Boolean, default=False) # Czy używać SSL?
ssl_insecure = db.Column(db.Boolean, default=False) # Jeśli True nie weryfikować certyfikatu SSL
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
class Settings(db.Model):
id = db.Column(db.Integer, primary_key=True)
pushover_user_key = db.Column(db.String(255))
pushover_token = db.Column(db.String(255))
pushover_enabled = db.Column(db.Boolean, default=False)
smtp_server = db.Column(db.String(255))
smtp_port = db.Column(db.Integer)
smtp_username = db.Column(db.String(255))
smtp_password = db.Column(db.String(255))
email_notifications_enabled = db.Column(db.Boolean, default=False)
check_interval = db.Column(db.Integer, default=60)
log_retention_days = db.Column(db.Integer, default=30)
recipient_email = db.Column(db.String(120))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False, unique=True)
class Log(db.Model):
id = db.Column(db.Integer, primary_key=True)
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
message = db.Column(db.Text)
device_id = db.Column(db.Integer, db.ForeignKey('device.id'))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
device = db.relationship('Device', backref='logs')
# Inicjalizacja bazy (utworzyć bazę przy pierwszym uruchomieniu)
with app.app_context():
db.create_all()
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# FUNKCJE POWIADOMIEŃ
def send_pushover_notification(user, message):
# Sprawdzamy, czy użytkownik posiada ustawienia oraz wymagane pola
if not user.settings or not user.settings.pushover_enabled or not user.settings.pushover_user_key or not user.settings.pushover_token:
return
data = {
"token": user.settings.pushover_token, # Używamy pushover_token z ustawień
"user": user.settings.pushover_user_key,
"message": message
}
try:
r = requests.post("https://api.pushover.net/1/messages.json", data=data)
# Możesz dodać logowanie odpowiedzi, jeśli potrzebne
except Exception as e:
print("Błąd przy wysyłaniu powiadomienia Pushover:", e)
def send_email_notification(user, subject, message):
if not user.settings or not user.settings.email_notifications_enabled or not user.settings.smtp_server:
return
try:
html_body = get_email_template(subject, message)
msg = MIMEText(html_body, 'html')
msg["Subject"] = subject
msg["From"] = user.settings.smtp_username
# Używamy adresu z ustawień, jeśli został podany, lub domyślnie adresu z profilu użytkownika
to_email = user.settings.recipient_email if user.settings.recipient_email else user.email
msg["To"] = to_email
s = smtplib.SMTP(user.settings.smtp_server, user.settings.smtp_port)
s.starttls()
s.login(user.settings.smtp_username, user.settings.smtp_password)
s.sendmail(user.settings.smtp_username, [to_email], msg.as_string())
s.quit()
app.logger.debug("E-mail wysłany pomyślnie")
except Exception as e:
app.logger.error(f"Błąd przy wysyłaniu powiadomienia e-mail: {e}", exc_info=True)
# FUNKCJA SPRAWDZAJĄCA AKTUALIZACJE URZĄDZENIA
def check_device_update(device):
log_entries = []
update_available = False
current_version = None
current_firmware = None
try:
app.logger.debug(f"Connecting to device {device.ip}:{device.port} using SSL: {device.use_ssl}, ssl_verify: {not device.ssl_insecure}")
api = librouteros.connect(
host=device.ip,
username=device.device_username,
password=device.device_password,
port=device.port,
timeout=15,
ssl=device.use_ssl,
ssl_verify=not device.ssl_insecure
)
app.logger.debug(f"Connection established to {device.ip}")
# Pobranie informacji o tożsamości urządzenia
identity_resp = list(api('/system/identity/print'))
app.logger.debug(f"Identity response: {identity_resp}")
if identity_resp:
identity = identity_resp[0].get('name', '')
log_entries.append(f"Identity: {identity}")
# Pobranie wersji systemu
resource_resp = list(api('/system/resource/print'))
app.logger.debug(f"Resource response: {resource_resp}")
if resource_resp:
version = resource_resp[0].get('version', '')
current_version = version
log_entries.append(f"System Version: {version}")
# Pobranie informacji o routerboard (firmware)
board_resp = list(api('/system/routerboard/print'))
app.logger.debug(f"Routerboard response: {board_resp}")
if board_resp:
board_info = board_resp[0]
# Próba odczytania firmware z kilku możliwych kluczy
firmware = board_info.get('firmware', board_info.get('firmware-version', board_info.get('upgrade-firmware', 'N/A')))
current_firmware = firmware
log_entries.append(f"Firmware: {firmware}")
# Sprawdzenie dostępnych aktualizacji
log_entries.append("Checking for updates...")
list(api('/system/package/update/check-for-updates'))
for _ in range(10):
time.sleep(1)
status_resp = list(api('/system/package/update/print'))
app.logger.debug(f"Update status response: {status_resp}")
if status_resp:
status = status_resp[0].get('status', '').lower()
if 'checking' not in status:
log_entries.append(f"Update check completed. Status: {status}")
break
update_resp = list(api('/system/package/update/print'))
app.logger.debug(f"Update response: {update_resp}")
if update_resp:
for res in update_resp:
installed = res.get('installed-version', '')
latest = res.get('latest-version', '')
if latest and latest != installed:
log_entries.append(f"Updates available: {installed} -> {latest}")
update_available = True
else:
log_entries.append("No updates available.")
return "\n".join(log_entries), update_available, current_version, current_firmware
except Exception as e:
app.logger.error(f"Error connecting to device {device.ip}: {e}", exc_info=True)
return f"Error: {str(e)}", False, None, None
def get_email_template(subject, message):
return f"""
<html>
<head>
<style>
body {{
font-family: Arial, sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
}}
.container {{
max-width: 600px;
margin: 20px auto;
background-color: #ffffff;
padding: 20px;
border-radius: 5px;
box-shadow: 0 0 10px rgba(0,0,0,0.1);
}}
.header {{
background-color: #007bff;
color: #ffffff;
padding: 10px;
text-align: center;
border-radius: 5px 5px 0 0;
}}
.content {{
margin: 20px 0;
font-size: 16px;
line-height: 1.5;
}}
.footer {{
text-align: center;
font-size: 12px;
color: #777777;
border-top: 1px solid #dddddd;
padding-top: 10px;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h2>{subject}</h2>
</div>
<div class="content">
<p>{message}</p>
</div>
<div class="footer">
<p>Wiadomość wygenerowana automatycznie przez system RouterOS Backup.</p>
</div>
</div>
</body>
</html>
"""
def check_all_devices():
with app.app_context():
devices = Device.query.all()
for device in devices:
result, update_available, current_version, current_firmware = check_device_update(device)
device.last_log = result
device.last_check = datetime.utcnow()
device.update_required = update_available
device.current_version = current_version
device.current_firmware = current_firmware
db.session.commit()
# Zapis do tabeli logów
#log_entry = Log(message=result, device_id=device.id, user_id=device.user_id)
log_message = f"Urządzenie {device.name or device.ip} - {result}"
log_entry = Log(message=log_message, device_id=device.id, user_id=device.user_id)
db.session.add(log_entry)
db.session.commit()
# Powiadomienia, jeśli dostępna aktualizacja
if update_available:
user = device.owner
message = f"Urządzenie {device.name or device.ip} ma dostępną aktualizację."
send_pushover_notification(user, message)
send_email_notification(user, "Aktualizacja dostępna", message)
def bytes_to_human(n):
try:
n = int(n)
except Exception:
return n
i = 0
while n >= 1024 and i < 5:
n /= 1024.0
i += 1
units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
return f"{n:.2f} {units[i]}"
def clean_old_logs():
with app.app_context():
all_settings = Settings.query.all()
for setting in all_settings:
if setting.log_retention_days:
cutoff = datetime.utcnow() - timedelta(days=setting.log_retention_days)
# Usuwamy logi starsze niż cutoff dla danego użytkownika
Log.query.filter(Log.user_id == setting.user_id, Log.timestamp < cutoff).delete()
db.session.commit()
# Harmonogram sprawdzania aktualizacji wykorzystujemy APScheduler
scheduler = BackgroundScheduler()
# Inicjalizacja bazy i schedulera
with app.app_context():
db.create_all() # lub już wcześniej utworzona baza
# Pobranie globalnych ustawień zakładamy, że Settings.query.first() zwróci ustawienia globalne
global_settings = Settings.query.first()
if global_settings and global_settings.check_interval:
interval = global_settings.check_interval
else:
interval = 60
scheduler.add_job(
func=check_all_devices,
trigger="interval",
seconds=interval,
id="check_all_devices",
max_instances=1
)
app.logger.debug(f"Scheduler initialized with interval: {interval} seconds")
scheduler.start()
# ROUTY APLIKACJI
@app.route('/')
def index():
if current_user.is_authenticated:
return redirect(url_for('dashboard'))
return render_template('index.html')
@app.route('/dashboard')
@login_required
def dashboard():
devices_count = Device.query.count()
pending_updates_count = Device.query.filter_by(update_required=True).count()
logs_count = Log.query.count()
users_count = User.query.count()
return render_template('dashboard.html',
devices_count=devices_count,
pending_updates_count=pending_updates_count,
logs_count=logs_count,
users_count=users_count)
# Rejestracja
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
email = request.form['email']
password = request.form['password']
# Prosta walidacja warto rozszerzyć
if User.query.filter_by(username=username).first():
flash("Użytkownik o tej nazwie już istnieje.")
return redirect(url_for('register'))
new_user = User(username=username, email=email)
new_user.set_password(password)
db.session.add(new_user)
db.session.commit()
# Utwórz domyślne ustawienia dla użytkownika
default_settings = Settings(user_id=new_user.id, check_interval=60)
db.session.add(default_settings)
db.session.commit()
flash("Rejestracja zakończona. Możesz się zalogować.")
return redirect(url_for('login'))
return render_template('register.html')
# Logowanie
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
login_user(user)
flash("Zalogowano pomyślnie.")
return redirect(url_for('dashboard'))
else:
flash("Nieprawidłowa nazwa użytkownika lub hasło.")
return render_template('login.html')
# Wylogowanie
@app.route('/logout')
@login_required
def logout():
logout_user()
flash("Wylogowano.")
return redirect(url_for('index'))
# Lista urządzeń użytkownika
@app.route('/devices')
@login_required
def devices():
user_devices = Device.query.filter_by(user_id=current_user.id).all()
return render_template('devices.html', devices=user_devices)
# Dodawanie urządzenia
@app.route('/device/add', methods=['GET', 'POST'])
@login_required
def add_device():
if request.method == 'POST':
name = request.form.get('name')
ip = request.form['ip']
port = int(request.form.get('port', 8728))
device_username = request.form['device_username']
device_password = request.form['device_password']
use_ssl = bool(request.form.get('use_ssl'))
ssl_insecure = bool(request.form.get('ssl_insecure'))
new_device = Device(
name=name,
ip=ip,
port=port,
device_username=device_username,
device_password=device_password,
use_ssl=use_ssl,
ssl_insecure=ssl_insecure,
user_id=current_user.id
)
db.session.add(new_device)
db.session.commit()
flash("Urządzenie dodane.")
return redirect(url_for('devices'))
return render_template('add_device.html')
# Szczegóły urządzenia
@app.route('/device/<int:device_id>')
@login_required
def device_detail(device_id):
device = Device.query.get_or_404(device_id)
if device.user_id != current_user.id:
flash("Brak dostępu.")
return redirect(url_for('devices'))
resource_data = {}
try:
api = librouteros.connect(
host=device.ip,
username=device.device_username,
password=device.device_password,
port=device.port,
timeout=15
)
res_resp = list(api('/system/resource/print'))
if res_resp:
resource_data = res_resp[0]
# Konwersja wartości pamięci i dysku na czytelny format
if 'free-memory' in resource_data:
resource_data['free-memory'] = bytes_to_human(resource_data['free-memory'])
if 'total-memory' in resource_data:
resource_data['total-memory'] = bytes_to_human(resource_data['total-memory'])
if 'free-hdd-space' in resource_data:
resource_data['free-hdd-space'] = bytes_to_human(resource_data['free-hdd-space'])
except Exception as e:
resource_data = {'error': str(e)}
return render_template('device_detail.html', device=device, resource=resource_data)
# Strona z logami
@app.route('/logs')
@login_required
def logs():
user_logs = Log.query.filter_by(user_id=current_user.id).order_by(Log.timestamp.desc()).all()
return render_template('logs.html', logs=user_logs)
# Strona ustawień powiadomień
@app.route('/settings', methods=['GET', 'POST'])
@login_required
def settings():
user_settings = current_user.settings
if request.method == 'POST':
# Aktualizacja ustawień Pushover
user_settings.pushover_user_key = request.form.get('pushover_user_key')
user_settings.pushover_token = request.form.get('pushover_token')
user_settings.pushover_enabled = bool(request.form.get('pushover_enabled'))
# Aktualizacja ustawień SMTP
user_settings.smtp_server = request.form.get('smtp_server')
smtp_port = request.form.get('smtp_port')
user_settings.smtp_port = int(smtp_port) if smtp_port else None
user_settings.smtp_username = request.form.get('smtp_username')
user_settings.smtp_password = request.form.get('smtp_password')
user_settings.email_notifications_enabled = bool(request.form.get('email_notifications_enabled'))
# Aktualizacja adresu e-mail odbiorcy (może być inny niż email z profilu)
user_settings.recipient_email = request.form.get('recipient_email')
# Aktualizacja interwału sprawdzania
interval = request.form.get('check_interval')
user_settings.check_interval = int(interval) if interval else 60
# Aktualizacja retencji logów
retention = request.form.get('log_retention_days')
user_settings.log_retention_days = int(retention) if retention else 30
db.session.commit()
try:
scheduler.reschedule_job("check_all_devices", trigger="interval", seconds=user_settings.check_interval)
app.logger.debug(f"Scheduler rescheduled with new interval: {user_settings.check_interval} seconds")
except Exception as e:
app.logger.error(f"Error rescheduling job: {e}")
flash("Ustawienia zapisane.")
return redirect(url_for('settings'))
return render_template('settings.html', settings=user_settings)
@app.route('/device/<int:device_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_device(device_id):
device = Device.query.get_or_404(device_id)
if device.user_id != current_user.id:
flash("Brak dostępu.")
return redirect(url_for('devices'))
if request.method == 'POST':
device.name = request.form.get('name', device.name)
device.ip = request.form.get('ip', device.ip)
device.port = int(request.form.get('port', device.port or 8728))
device.device_username = request.form.get('device_username', device.device_username)
device.device_password = request.form.get('device_password', device.device_password)
device.branch = request.form.get('branch', device.branch or 'stable')
device.use_ssl = bool(request.form.get('use_ssl'))
device.ssl_insecure = bool(request.form.get('ssl_insecure'))
db.session.commit()
flash("Urządzenie zaktualizowane.")
return redirect(url_for('devices'))
return render_template('edit_device.html', device=device)
@app.route('/device/<int:device_id>/force_check')
@login_required
def force_check(device_id):
device = Device.query.get_or_404(device_id)
if device.user_id != current_user.id:
flash("Brak dostępu.")
return redirect(url_for('devices'))
result, update_available, current_version, current_firmware = check_device_update(device)
device.last_log = result
device.last_check = datetime.utcnow()
device.update_required = update_available
device.current_version = current_version
device.current_firmware = current_firmware
db.session.commit()
flash("Sprawdzenie urządzenia zakończone.")
return redirect(url_for('devices'))
@app.route('/device/<int:device_id>/update', methods=['POST'])
@login_required
def update_device(device_id):
device = Device.query.get_or_404(device_id)
if device.user_id != current_user.id:
flash("Brak dostępu.")
return redirect(url_for('devices'))
try:
app.logger.debug(f"Initiating system update for device {device.ip}")
api = librouteros.connect(
host=device.ip,
username=device.device_username,
password=device.device_password,
port=device.port,
timeout=15,
ssl=device.use_ssl,
ssl_verify=not device.ssl_insecure
)
app.logger.debug("Connection established, starting update command")
if device.branch == 'stable':
list(api('/system/package/update/install'))
elif device.branch == 'dev':
list(api('/system/package/update/install', branch='dev'))
elif device.branch == 'beta':
list(api('/system/package/update/install', branch='beta'))
else:
list(api('/system/package/update/install'))
flash("Aktualizacja systemu została rozpoczęta.")
app.logger.debug("System update command executed successfully")
except Exception as e:
app.logger.error(f"Błąd podczas aktualizacji urządzenia {device.ip}: {e}", exc_info=True)
flash(f"Błąd podczas aktualizacji: {e}")
return redirect(url_for('device_detail', device_id=device.id))
@app.route('/device/<int:device_id>/update_firmware', methods=['POST'])
@login_required
def update_firmware(device_id):
device = Device.query.get_or_404(device_id)
if device.user_id != current_user.id:
flash("Brak dostępu.")
return redirect(url_for('devices'))
try:
api = librouteros.connect(
host=device.ip,
username=device.device_username,
password=device.device_password,
port=device.port,
timeout=15
)
# Przykładowa komenda aktualizacji firmware
list(api('/system/routerboard/upgrade'))
flash("Aktualizacja firmware została rozpoczęta.")
except Exception as e:
flash(f"Błąd podczas aktualizacji firmware: {e}")
return redirect(url_for('device_detail', device_id=device.id))
@app.route('/test_pushover', methods=['POST'])
@login_required
def test_pushover():
message = "To jest testowe powiadomienie Pushover z RouterOS Update."
send_pushover_notification(current_user, message)
flash("Test powiadomienia Pushover wysłany.")
return redirect(url_for('settings'))
@app.route('/test_email', methods=['POST'])
@login_required
def test_email():
subject = "Testowy E-mail z RouterOS Update"
message = "To jest testowa wiadomość e-mail wysłana z RouterOS Update."
send_email_notification(current_user, subject, message)
flash("Testowy e-mail wysłany.")
return redirect(url_for('settings'))
@app.route('/reset_password', methods=['GET', 'POST'])
@login_required
def reset_password():
if request.method == 'POST':
old_password = request.form.get('old_password')
new_password = request.form.get('new_password')
confirm_password = request.form.get('confirm_password')
if not current_user.check_password(old_password):
flash("Stare hasło jest nieprawidłowe.")
return redirect(url_for('reset_password'))
if new_password != confirm_password:
flash("Nowe hasło i potwierdzenie nie są zgodne.")
return redirect(url_for('reset_password'))
current_user.set_password(new_password)
db.session.commit()
flash("Hasło zostało zresetowane.")
return redirect(url_for('reset_password'))
return render_template('reset_password.html')
@app.route('/logs/clean', methods=['POST'])
@login_required
def clean_logs():
days = request.form.get('days')
if not days:
flash("Podaj liczbę dni.")
return redirect(url_for('logs'))
try:
days = int(days)
except ValueError:
flash("Niepoprawna wartość dni.")
return redirect(url_for('logs'))
cutoff = datetime.utcnow() - timedelta(days=days)
num_deleted = Log.query.filter(Log.user_id == current_user.id, Log.timestamp < cutoff).delete()
db.session.commit()
flash(f"Usunięto {num_deleted} logów starszych niż {days} dni.")
return redirect(url_for('logs'))
# Zamknięcie harmonogramu przy zatrzymaniu aplikacji
atexit.register(lambda: scheduler.shutdown())
if __name__ == '__main__':
scheduler.add_job(func=clean_old_logs, trigger="interval", days=1)
app.run(host='0.0.0.0', port=5582, use_reloader=False, debug=True)