ssl_monitor/app.py
Mateusz Gruszczyński 2a065aba3b commit
2025-03-18 12:53:33 +01:00

788 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import ssl
import socket
from datetime import datetime, timedelta
import requests
from flask import Flask, render_template, request, redirect, url_for, session, jsonify, flash, current_app
from flask_sqlalchemy import SQLAlchemy
from apscheduler.schedulers.background import BackgroundScheduler
from flask_bcrypt import Bcrypt
import smtplib
import imaplib
from cryptography import x509
from cryptography.hazmat.backends import default_backend
# Globalna referencja do aplikacji przydatna dla scheduler'a
app_instance = None
BASEDIR = os.path.abspath(os.path.dirname(__file__))
DATABASE_FILE = os.path.join(BASEDIR, 'ssl_monitor.db')
# Globalna flaga, by operację utworzenia domyślnego użytkownika wykonać tylko raz
default_user_created = False
db = SQLAlchemy()
bcrypt = Bcrypt()
def create_app():
global app_instance
app = Flask(__name__)
# W produkcji ustaw SECRET_KEY przez zmienną środowiskową!
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'moj-klucz-session')
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DATABASE_FILE}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Konfiguracja ciasteczek sesyjnych (zalecane przy HTTPS)
app.config['SESSION_COOKIE_SECURE'] = False
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
db.init_app(app)
bcrypt.init_app(app)
with app.app_context():
db.create_all()
app_instance = app
init_scheduler(app)
register_routes(app)
return app
###############################################################################
# MODELE
###############################################################################
class User(db.Model):
"""
Model użytkownika hasło przechowywane jako hash przy użyciu bcrypt.
"""
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(50), unique=True, nullable=False)
password = db.Column(db.String(200), nullable=False)
class MonitoredService(db.Model):
id = db.Column(db.Integer, primary_key=True)
host = db.Column(db.String(256), nullable=False)
port = db.Column(db.Integer, default=443)
protocol = db.Column(db.String(50), default='https')
region = db.Column(db.String(100), nullable=True)
certificate_type = db.Column(db.String(50), nullable=True)
last_check = db.Column(db.DateTime)
expiry_date = db.Column(db.DateTime)
status = db.Column(db.String(20))
class Settings(db.Model):
id = db.Column(db.Integer, primary_key=True)
check_interval_minutes = db.Column(db.Integer, default=60)
pushover_enabled = db.Column(db.Boolean, default=False)
pushover_token = db.Column(db.String(200), nullable=True)
pushover_userkey = db.Column(db.String(200), nullable=True)
alert_threshold_30 = db.Column(db.Integer, default=30)
alert_threshold_14 = db.Column(db.Integer, default=14)
alert_threshold_7 = db.Column(db.Integer, default=7)
alert_repeat = db.Column(db.Boolean, default=False)
logs_retention_days = db.Column(db.Integer, default=30)
class History(db.Model):
id = db.Column(db.Integer, primary_key=True)
service_id = db.Column(db.Integer, db.ForeignKey('monitored_service.id'), nullable=False)
# Dodaj relację, by mieć dostęp do obiektu MonitoredService
service = db.relationship("MonitoredService", backref=db.backref("history_entries", lazy=True))
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
status = db.Column(db.String(20))
expiry_date = db.Column(db.DateTime, nullable=True)
message = db.Column(db.String(500))
class AuditLog(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
operation = db.Column(db.String(50))
timestamp = db.Column(db.DateTime, default=datetime.utcnow)
details = db.Column(db.Text)
class UserPreferences(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True)
default_grouping = db.Column(db.String(50), default='protocol') # lub "none", "host", "region", itp.
# Możesz dodać więcej pól, np. zapisanych filtrów w formie JSON:
filters = db.Column(db.Text, default='{}')
###############################################################################
# FUNKCJE POMOCNICZE SPRAWDZANIE CERTYFIKATU
###############################################################################
def check_https_cert(host, port=443):
"""
Pobiera datę wygaśnięcia certyfikatu HTTPS.
"""
context = ssl.create_default_context()
with socket.create_connection((host, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
cert = ssock.getpeercert()
not_after = cert["notAfter"]
expiry_date = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
return expiry_date
def update_certs():
with app_instance.app_context():
services = MonitoredService.query.all()
settings = Settings.query.first()
for service in services:
try:
proto = service.protocol.lower().strip().replace(" ", "_")
if proto == 'https':
exp_date = check_https_cert(service.host, service.port)
elif proto == 'smtp_starttls':
exp_date = check_smtp_starttls_cert(service.host, service.port)
elif proto == 'smtp_ssl':
exp_date = check_smtp_ssl_cert(service.host, service.port)
elif proto == 'imap_starttls':
exp_date = check_imap_starttls_cert(service.host, service.port)
elif proto == 'imap_ssl':
exp_date = check_imap_ssl_cert(service.host, service.port)
else:
service.status = 'ProtocolNotImplemented'
db.session.commit()
continue
service.expiry_date = exp_date
service.last_check = datetime.now()
if exp_date < datetime.now():
new_status = 'Expired'
elif exp_date < datetime.now() + timedelta(days=7):
new_status = 'ExpiringSoon'
else:
new_status = 'OK'
old_status = service.status
service.status = new_status
db.session.commit()
# Zapis do historii
history = History(service_id=service.id, status=new_status, expiry_date=exp_date,
message=f"Sprawdzenie. Poprzedni status: {old_status}")
db.session.add(history)
db.session.commit()
# Sprawdź progi alertów (dla certyfikatów, które nie wygasły)
if new_status in ['ExpiringSoon', 'OK']:
days_left = (exp_date - datetime.now()).days
alert_sent = False # Możesz rozwinąć logikę, by zapamiętać wysłane alerty
if days_left <= settings.alert_threshold_7:
# Wysyłamy alert o 7-dniowym progu
send_pushover_message("user_key_placeholder", "api_token_placeholder",
f"Certyfikat dla {service.host} wygasa za {days_left} dni (7-dniowy próg).",
"Alert SSL Monitor")
alert_sent = True
elif days_left <= settings.alert_threshold_14:
send_pushover_message("user_key_placeholder", "api_token_placeholder",
f"Certyfikat dla {service.host} wygasa za {days_left} dni (14-dniowy próg).",
"Alert SSL Monitor")
alert_sent = True
elif days_left <= settings.alert_threshold_30:
send_pushover_message("user_key_placeholder", "api_token_placeholder",
f"Certyfikat dla {service.host} wygasa za {days_left} dni (30-dniowy próg).",
"Alert SSL Monitor")
alert_sent = True
# Jeśli alert_repeat jest False, możesz zapisać, że alert dla tego progu został wysłany,
# aby nie wysyłać go ponownie.
except Exception as e:
service.status = 'Error'
db.session.commit()
history = History(service_id=service.id, status="Error", expiry_date=None, message=str(e))
db.session.add(history)
db.session.commit()
def check_smtp_starttls_cert(host, port=587):
context = ssl.create_default_context()
server = smtplib.SMTP(host, port, timeout=10)
server.ehlo()
server.starttls(context=context)
cert = server.sock.getpeercert()
server.quit()
not_after = cert["notAfter"]
expiry_date = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
return expiry_date
def check_smtp_ssl_cert(host, port=465):
context = ssl.create_default_context()
server = smtplib.SMTP_SSL(host, port, timeout=10, context=context)
cert = server.sock.getpeercert()
server.quit()
not_after = cert["notAfter"]
expiry_date = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
return expiry_date
def check_imap_starttls_cert(host, port=143):
context = ssl.create_default_context()
imap = imaplib.IMAP4(host, port)
imap.starttls(ssl_context=context)
cert = imap.sock.getpeercert()
imap.logout()
not_after = cert["notAfter"]
expiry_date = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
return expiry_date
def check_imap_ssl_cert(host, port=993):
context = ssl.create_default_context()
imap = imaplib.IMAP4_SSL(host, port, ssl_context=context)
cert = imap.sock.getpeercert()
imap.logout()
not_after = cert["notAfter"]
expiry_date = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
return expiry_date
def get_service_response(service):
"""
Nawiązuje połączenie z serwerem dla danej usługi i zwraca pierwsze 1024 bajty odpowiedzi.
Dla HTTPS wysyła żądanie HEAD, dla pozostałych protokołów pobiera banner.
"""
host = service.host
port = service.port
proto = service.protocol.lower().strip().replace(" ", "_")
timeout = 10
if proto == "https":
context = ssl.create_default_context()
with socket.create_connection((host, port), timeout=timeout) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
# Wysyłamy żądanie HEAD, aby uzyskać nagłówki
request = f"HEAD / HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
ssock.send(request.encode())
data = ssock.recv(1024)
return data.decode(errors='replace')
elif proto in ["smtp_starttls", "smtp_ssl"]:
if proto == "smtp_ssl":
context = ssl.create_default_context()
with socket.create_connection((host, port), timeout=timeout) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
data = ssock.recv(1024)
return data.decode(errors='replace')
else:
# smtp_starttls: banner jest wysyłany w wersji niezaszyfrowanej
with socket.create_connection((host, port), timeout=timeout) as sock:
data = sock.recv(1024)
return data.decode(errors='replace')
elif proto in ["imap_starttls", "imap_ssl"]:
if proto == "imap_ssl":
context = ssl.create_default_context()
with socket.create_connection((host, port), timeout=timeout) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
data = ssock.recv(1024)
return data.decode(errors='replace')
else:
# imap_starttls: banner wysyłany jest bez szyfrowania
with socket.create_connection((host, port), timeout=timeout) as sock:
data = sock.recv(1024)
return data.decode(errors='replace')
else:
raise Exception("Protocol not supported for response retrieval")
def send_pushover_message(user_key, api_token, message, title="SSL Monitor Alert"):
"""
Wysyła powiadomienie przez Pushover przy użyciu requests.
"""
url = "https://api.pushover.net/1/messages.json"
data = {
"token": api_token,
"user": user_key,
"message": message,
"title": title
}
response = requests.post(url, data=data)
return response.json()
def get_cert_details(service):
"""
Pobiera szczegółowe informacje o certyfikacie (HTTPS).
"""
host = service.host
port = service.port
context = ssl.create_default_context()
with socket.create_connection((host, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
cert = ssock.getpeercert()
return cert # Zwracamy cały słownik certyfikatu
def get_cert_details(svc):
"""
Pobiera szczegółowe informacje o certyfikacie dla danej usługi HTTPS.
Zwraca certyfikat w formie słownika.
"""
host = svc.host
port = svc.port
context = ssl.create_default_context()
with socket.create_connection((host, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
cert = ssock.getpeercert()
return cert
def format_cert_details_as_table(cert_details):
def flatten_pairs(value):
"""Rekurencyjnie przetwarza zagnieżdżone struktury i zwraca listę napisów 'klucz: wartość'."""
result = []
if isinstance(value, (list, tuple)):
for item in value:
# Jeżeli mamy krotkę o długości 2, a pierwszy element to napis traktuj ją jako parę
if isinstance(item, (list, tuple)) and len(item) == 2 and isinstance(item[0], str):
result.append(f"{item[0]}: {item[1]}")
else:
result.extend(flatten_pairs(item))
else:
result.append(str(value))
return result
table = '<table class="table table-sm table-dark table-bordered">'
table += '<thead><tr><th>Klucz</th><th>Wartość</th></tr></thead><tbody>'
for key, value in cert_details.items():
if isinstance(value, (list, tuple)):
flat = flatten_pairs(value)
formatted_value = "<br>".join(flat) if flat else str(value)
else:
formatted_value = str(value)
table += f'<tr><th>{key}</th><td>{formatted_value}</td></tr>'
table += '</tbody></table>'
return table
def get_cert_chain_details(host, port=443):
context = ssl.create_default_context()
with socket.create_connection((host, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
der_cert = ssock.getpeercert(binary_form=True)
cert = x509.load_der_x509_certificate(der_cert, default_backend())
# Możesz wyekstrahować informacje o certyfikacie:
details = {
"subject": cert.subject.rfc4514_string(),
"issuer": cert.issuer.rfc4514_string(),
"not_before": cert.not_valid_before.isoformat(),
"not_after": cert.not_valid_after.isoformat(),
"serial_number": str(cert.serial_number),
"signature_algorithm": cert.signature_hash_algorithm.name if cert.signature_hash_algorithm else "Unknown",
# Dla SAN: cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
}
return details
def get_cert_chain_html(host, port=443):
context = ssl.create_default_context()
with socket.create_connection((host, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=host) as ssock:
der_cert = ssock.getpeercert(binary_form=True)
cert = x509.load_der_x509_certificate(der_cert, default_backend())
# Pobranie Subject Alternative Names (SAN) jeśli są dostępne
try:
san_extension = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
san = ", ".join(san_extension.value.get_values_for_type(x509.DNSName))
except Exception:
san = "N/A"
# Można dodać inne rozszerzenia, np. Key Usage, Extended Key Usage, itd.
try:
key_usage = cert.extensions.get_extension_for_class(x509.KeyUsage).value
key_usage_str = ", ".join([
"digitalSignature" if key_usage.digital_signature else "",
"contentCommitment" if key_usage.content_commitment else "",
"keyEncipherment" if key_usage.key_encipherment else "",
"dataEncipherment" if key_usage.data_encipherment else "",
"keyAgreement" if key_usage.key_agreement else "",
"keyCertSign" if key_usage.key_cert_sign else "",
"crlSign" if key_usage.crl_sign else "",
"encipherOnly" if key_usage.encipher_only else "",
"decipherOnly" if key_usage.decipher_only else ""
]).strip(", ")
except Exception:
key_usage_str = "N/A"
details_html = f"""
<h5 class="mb-3">Szczegółowe informacje o certyfikacie</h5>
<table class="table table-sm table-dark">
<tr>
<th>Subject</th>
<td>{cert.subject.rfc4514_string()}</td>
</tr>
<tr>
<th>Issuer</th>
<td>{cert.issuer.rfc4514_string()}</td>
</tr>
<tr>
<th>Not Before</th>
<td>{cert.not_valid_before.strftime('%Y-%m-%d %H:%M:%S')}</td>
</tr>
<tr>
<th>Not After</th>
<td>{cert.not_valid_after.strftime('%Y-%m-%d %H:%M:%S')}</td>
</tr>
<tr>
<th>Serial Number</th>
<td>{cert.serial_number}</td>
</tr>
<tr>
<th>Signature Algorithm</th>
<td>{cert.signature_hash_algorithm.name if cert.signature_hash_algorithm else 'Unknown'}</td>
</tr>
<tr>
<th>Subject Alternative Names</th>
<td>{san}</td>
</tr>
<tr>
<th>Key Usage</th>
<td>{key_usage_str}</td>
</tr>
</table>
"""
return details_html
###############################################################################
# KONFIGURACJA SCHEDULERA
###############################################################################
def init_scheduler(flask_app):
global scheduler
scheduler = BackgroundScheduler()
with flask_app.app_context():
settings = Settings.query.first()
if not settings:
settings = Settings(check_interval_minutes=60)
db.session.add(settings)
db.session.commit()
interval = settings.check_interval_minutes
scheduler.add_job(func=update_certs, trigger="interval", minutes=interval)
scheduler.start()
###############################################################################
# TRASY I WIDOKI
###############################################################################
def register_routes(app):
# @app.before_request
# def ensure_default_user():
# """
# Przy pierwszym żądaniu tworzymy domyślnego użytkownika 'admin' z zahashowanym hasłem.
# """
# global default_user_created
# if not default_user_created:
# if not User.query.filter_by(username='admin').first():
# hashed_pw = bcrypt.generate_password_hash("admin").decode('utf-8')
# user = User(username='admin', password=hashed_pw)
# db.session.add(user)
# db.session.commit()
# default_user_created = True
@app.route('/')
def index():
if 'logged_in' in session and session['logged_in']:
return redirect(url_for('dashboard'))
return redirect(url_for('login'))
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user and bcrypt.check_password_hash(user.password, password):
session['logged_in'] = True
session['username'] = user.username
session['user_id'] = user.id # << Dodaj tę linię
flash("Zalogowano poprawnie.", "success")
# Log operacji logowania
log = AuditLog(user_id=user.id, operation="login", details="Użytkownik zalogowany.")
db.session.add(log)
db.session.commit()
return redirect(url_for('dashboard'))
else:
flash("Błędne dane logowania.", "danger")
return redirect(url_for('login'))
return render_template('login.html')
@app.route('/logout')
def logout():
session.clear()
flash("Wylogowano.", "success")
return redirect(url_for('login'))
@app.route('/dashboard')
def dashboard():
if 'logged_in' not in session or not session['logged_in']:
flash("Musisz się zalogować.", "warning")
return redirect(url_for('login'))
services = MonitoredService.query.all()
total = len(services)
expired = sum(1 for s in services if s.status == 'Expired')
exp_soon = sum(1 for s in services if s.status == 'ExpiringSoon')
ok = sum(1 for s in services if s.status == 'OK')
error = sum(1 for s in services if s.status == 'Error')
# Pobranie preferencji użytkownika
user = User.query.filter_by(username=session.get('username')).first()
prefs = UserPreferences.query.filter_by(user_id=user.id).first() if user else None
default_grouping = prefs.default_grouping if prefs else "protocol"
return render_template('dashboard.html',
services=services,
total=total, expired=expired,
exp_soon=exp_soon, ok=ok, error=error,
default_grouping=default_grouping)
@app.route('/api/services', methods=['GET'])
def api_get_services():
services = MonitoredService.query.all()
data = []
for s in services:
data.append({
'id': s.id,
'host': s.host,
'port': s.port,
'protocol': s.protocol,
'region': s.region, # Dodane pole region
'last_check': s.last_check.strftime('%Y-%m-%d %H:%M:%S') if s.last_check else None,
'expiry_date': s.expiry_date.strftime('%Y-%m-%d %H:%M:%S') if s.expiry_date else None,
'status': s.status
})
return jsonify(data)
@app.route('/api/services/add', methods=['POST'])
def api_add_service():
req = request.json
host = req.get('host')
port = req.get('port', 443)
protocol = req.get('protocol', 'https')
region = req.get('region', '') # Pobranie regionu z żądania
new_svc = MonitoredService(
host=host,
port=port,
protocol=protocol,
region=region, # Zapis regionu
status='Unknown'
)
db.session.add(new_svc)
db.session.commit()
return jsonify({'message': 'Service added'}), 200
@app.route('/api/services/delete/<int:service_id>', methods=['DELETE'])
def api_delete_service(service_id):
svc = MonitoredService.query.get_or_404(service_id)
db.session.delete(svc)
db.session.commit()
return jsonify({'message': 'Service deleted'}), 200
@app.route('/api/services/edit/<int:service_id>', methods=['POST'])
def api_edit_service(service_id):
req = request.json
svc = MonitoredService.query.get_or_404(service_id)
svc.host = req.get('host', svc.host)
svc.port = req.get('port', svc.port)
svc.protocol = req.get('protocol', svc.protocol)
# Pobierz region jeśli nie został podany, ustaw wartość domyślną (np. "default")
region = req.get('region', '').strip()
if not region:
region = "default"
svc.region = region
db.session.commit()
return jsonify({'message': 'Service updated'}), 200
@app.route('/api/services/update/<int:service_id>', methods=['POST'])
def api_update_service(service_id):
svc = MonitoredService.query.get_or_404(service_id)
try:
# Ujednolicenie protokołu
proto = svc.protocol.lower().strip().replace(" ", "_")
if proto == 'https':
exp_date = check_https_cert(svc.host, svc.port)
elif proto == 'smtp_starttls':
exp_date = check_smtp_starttls_cert(svc.host, svc.port)
elif proto == 'smtp_ssl':
exp_date = check_smtp_ssl_cert(svc.host, svc.port)
elif proto == 'imap_starttls':
exp_date = check_imap_starttls_cert(svc.host, svc.port)
elif proto == 'imap_ssl':
exp_date = check_imap_ssl_cert(svc.host, svc.port)
else:
svc.status = 'ProtocolNotImplemented'
db.session.commit()
return jsonify({'message': 'Protocol not implemented'}), 200
svc.expiry_date = exp_date
svc.last_check = datetime.now()
if exp_date < datetime.now():
svc.status = 'Expired'
elif exp_date < datetime.now() + timedelta(days=7):
svc.status = 'ExpiringSoon'
else:
svc.status = 'OK'
db.session.commit()
return jsonify({'message': 'Service updated'}), 200
except Exception as e:
svc.status = 'Error'
db.session.commit()
return jsonify({'message': 'Error updating service', 'error': str(e)}), 500
@app.route('/api/services/bulk_update', methods=['POST'])
def api_bulk_update():
update_certs()
return jsonify({'message': 'Bulk update completed'}), 200
@app.route('/settings', methods=['GET', 'POST'])
def app_settings():
if 'logged_in' not in session or not session['logged_in']:
flash("Musisz się zalogować.", "warning")
return redirect(url_for('login'))
s = Settings.query.first()
if not s:
s = Settings()
db.session.add(s)
db.session.commit()
if request.method == 'POST':
s.check_interval_minutes = int(request.form.get('check_interval_minutes', 60))
s.pushover_enabled = bool(request.form.get('pushover_enabled', False))
s.pushover_token = request.form.get('pushover_token', '')
s.pushover_userkey = request.form.get('pushover_userkey', '')
s.alert_threshold_30 = int(request.form.get('alert_threshold_info', 30))
s.alert_threshold_14 = int(request.form.get('alert_threshold_warning', 14))
s.alert_threshold_7 = int(request.form.get('alert_threshold_critical', 7))
s.alert_repeat = bool(request.form.get('alert_repeat', False))
# Pobierz wartość z nowego pola, domyślnie 30 dni
s.logs_retention_days = int(request.form.get('logs_retention_days', 30))
db.session.commit()
global scheduler
if scheduler:
scheduler.remove_all_jobs()
scheduler.add_job(func=update_certs,
trigger="interval",
minutes=s.check_interval_minutes)
# Usuwanie logów starszych niż podana liczba dni
cutoff_date = datetime.utcnow() - timedelta(days=s.logs_retention_days)
deleted_count = History.query.filter(History.timestamp < cutoff_date).delete()
db.session.commit()
flash(f"Ustawienia zapisane. Usunięto {deleted_count} logów starszych niż {s.logs_retention_days} dni.", "success")
return redirect(url_for('app_settings'))
return render_template('settings.html', settings=s)
@app.route('/api/service/response/<int:service_id>', methods=['GET'])
def api_service_response(service_id):
svc = MonitoredService.query.get_or_404(service_id)
try:
response_data = get_service_response(svc)
return jsonify({'response': response_data})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
confirm = request.form.get('confirm')
if password != confirm:
flash("Hasła nie są zgodne.", "danger")
return redirect(url_for('register'))
if User.query.filter_by(username=username).first():
flash("Użytkownik o tej nazwie już istnieje.", "danger")
return redirect(url_for('register'))
hashed_pw = bcrypt.generate_password_hash(password).decode('utf-8')
new_user = User(username=username, password=hashed_pw)
db.session.add(new_user)
db.session.commit()
# Rejestracja operacji w logu audytu
log = AuditLog(user_id=new_user.id, operation="register", details="Nowy użytkownik zarejestrowany.")
db.session.add(log)
db.session.commit()
flash("Rejestracja zakończona powodzeniem. Zaloguj się.", "success")
return redirect(url_for('login'))
return render_template('register.html')
@app.route('/change_password', methods=['GET', 'POST'])
def change_password():
if 'logged_in' not in session or not session['logged_in']:
flash("Musisz być zalogowany.", "warning")
return redirect(url_for('login'))
if request.method == 'POST':
current_password = request.form.get('current_password')
new_password = request.form.get('new_password')
confirm = request.form.get('confirm')
user = User.query.filter_by(username=session.get('username')).first()
if not user or not bcrypt.check_password_hash(user.password, current_password):
flash("Błędne aktualne hasło.", "danger")
return redirect(url_for('change_password'))
if new_password != confirm:
flash("Nowe hasła nie są zgodne.", "danger")
return redirect(url_for('change_password'))
user.password = bcrypt.generate_password_hash(new_password).decode('utf-8')
db.session.commit()
# Log operacji zmiany hasła
log = AuditLog(user_id=user.id, operation="change_password", details="Zmiana hasła.")
db.session.add(log)
db.session.commit()
flash("Hasło zostało zmienione.", "success")
return redirect(url_for('dashboard'))
return render_template('change_password.html')
@app.route('/api/cert_details/<int:service_id>', methods=['GET'])
def api_cert_details(service_id):
svc = MonitoredService.query.get_or_404(service_id)
try:
proto = svc.protocol.lower().strip().replace(" ", "_")
if proto != "https":
return jsonify({"error": "Szczegółowe informacje dostępne tylko dla HTTPS."}), 400
details = get_cert_details(svc)
html_table = format_cert_details_as_table(details)
return jsonify({"html": html_table})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/history')
def history():
history_records = History.query.order_by(History.timestamp.desc()).all()
return render_template('history.html', history=history_records)
@app.route('/preferences', methods=['GET', 'POST'])
def preferences():
if 'logged_in' not in session or not session['logged_in']:
flash("Musisz się zalogować.", "warning")
return redirect(url_for('login'))
user = User.query.filter_by(username=session.get('username')).first()
prefs = UserPreferences.query.filter_by(user_id=user.id).first()
if not prefs:
prefs = UserPreferences(user_id=user.id)
db.session.add(prefs)
db.session.commit()
if request.method == 'POST':
prefs.default_grouping = request.form.get('default_grouping', 'protocol')
# Przyjmijmy, że filtrujemy jako JSON np. {"region": "EU", "certificate_type": "DV"}
prefs.filters = request.form.get('filters', '{}')
db.session.commit()
flash("Preferencje widoku zapisane.", "success")
return redirect(url_for('dashboard'))
return render_template('preferences.html', prefs=prefs)
@app.route('/api/cert_chain/<int:service_id>', methods=['GET'])
def api_cert_chain(service_id):
svc = MonitoredService.query.get_or_404(service_id)
try:
proto = svc.protocol.lower().strip().replace(" ", "_")
if proto != "https":
return jsonify({"error": "Łańcuch certyfikatów dostępny tylko dla HTTPS."}), 400
html_details = get_cert_chain_html(svc.host, svc.port)
return jsonify({"html": html_details})
except Exception as e:
return jsonify({"error": str(e)}), 500
###############################################################################
# URUCHAMIANIE APLIKACJI (tryb deweloperski)
###############################################################################
if __name__ == '__main__':
app = create_app()
app.run(debug=True)