1849 lines
77 KiB
Python
1849 lines
77 KiB
Python
from flask import Flask, render_template, request, redirect, url_for, flash, session, Response
|
||
from flask_sqlalchemy import SQLAlchemy
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
import os, paramiko, threading, time, io, tempfile, csv
|
||
from apscheduler.schedulers.background import BackgroundScheduler
|
||
from datetime import datetime, timezone, timedelta
|
||
from io import StringIO
|
||
import socket
|
||
import ipaddress
|
||
import difflib
|
||
from croniter import croniter
|
||
from tzlocal import get_localzone
|
||
from werkzeug.serving import WSGIRequestHandler
|
||
|
||
WSGIRequestHandler.server_version = ""
|
||
WSGIRequestHandler.sys_version = ""
|
||
|
||
app = Flask(__name__)
|
||
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
|
||
app.config['SECRET_KEY'] = 'supersecretkey'
|
||
db = SQLAlchemy(app)
|
||
|
||
# MODELE
|
||
class User(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
username = db.Column(db.String(80), unique=True, nullable=False)
|
||
password = db.Column(db.String(200), nullable=False)
|
||
hosts = db.relationship('Host', backref='user', lazy=True)
|
||
hostfiles = db.relationship('HostFile', backref='user', lazy=True)
|
||
settings = db.relationship('UserSettings', backref='user', uselist=False)
|
||
|
||
class Host(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
hostname = db.Column(db.String(255), nullable=False)
|
||
username = db.Column(db.String(100), nullable=False)
|
||
password = db.Column(db.String(200), nullable=False)
|
||
type = db.Column(db.String(50), default='linux') # 'linux' albo 'mikrotik'
|
||
auth_method = db.Column(db.String(20), default='password')
|
||
private_key = db.Column(db.Text, nullable=True)
|
||
key_passphrase = db.Column(db.String(200), nullable=True)
|
||
port = db.Column(db.Integer, default=22)
|
||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
||
auto_deploy_enabled = db.Column(db.Boolean, default=True)
|
||
auto_backup_enabled = db.Column(db.Boolean, default=True)
|
||
preferred_hostfile_id = db.Column(db.Integer, db.ForeignKey('host_file.id'), nullable=True)
|
||
preferred_hostfile = db.relationship('HostFile', foreign_keys=[preferred_hostfile_id])
|
||
use_daemon = db.Column(db.Boolean, default=False)
|
||
daemon_url = db.Column(db.String(255), nullable=True)
|
||
daemon_token = db.Column(db.String(255), nullable=True)
|
||
disable_regex_deploy = db.Column(db.Boolean, default=False)
|
||
disable_local_default = db.Column(db.Boolean, default=False)
|
||
|
||
@property
|
||
def resolved_hostname(self):
|
||
try:
|
||
return socket.gethostbyaddr(self.hostname)[0]
|
||
except Exception:
|
||
return self.hostname
|
||
|
||
@property
|
||
def resolved_daemon(self):
|
||
if self.daemon_url:
|
||
try:
|
||
daemon_str = self.daemon_url.split("://")[-1]
|
||
daemon_ip = daemon_str.split(":")[0]
|
||
return socket.gethostbyaddr(daemon_ip)[0]
|
||
except Exception:
|
||
return daemon_ip
|
||
return ""
|
||
|
||
@property
|
||
def raw_ip(self):
|
||
if self.use_daemon and self.type == 'linux' and self.daemon_url:
|
||
daemon_str = self.daemon_url.split("://")[-1]
|
||
daemon_ip = daemon_str.split(":")[0]
|
||
return daemon_ip
|
||
return self.hostname
|
||
|
||
class DeployLog(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
timestamp = db.Column(db.DateTime, default=db.func.current_timestamp())
|
||
details = db.Column(db.Text, nullable=False)
|
||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
||
user = db.relationship('User', backref='deploy_logs', lazy=True)
|
||
|
||
class HostFile(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
||
title = db.Column(db.String(100), nullable=False, default='Default Hosts')
|
||
content = db.Column(db.Text, nullable=False)
|
||
|
||
class UserSettings(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
|
||
auto_deploy_enabled = db.Column(db.Boolean, default=False)
|
||
#deploy_interval = db.Column(db.Integer, default=60) # interwał wdrożeń (minuty)
|
||
#backup_interval = db.Column(db.Integer, default=60) # interwał backupów (minuty)
|
||
deploy_cron = db.Column(db.String(100), default="12 12 * * *")
|
||
backup_cron = db.Column(db.String(100), default="12 12 * * *")
|
||
auto_backup_enabled = db.Column(db.Boolean, default=False)
|
||
last_deploy_time = db.Column(db.DateTime, nullable=True)
|
||
regex_deploy_enabled = db.Column(db.Boolean, default=True)
|
||
backup_retention_days = db.Column(db.Integer, default=0)
|
||
global_ssh_key = db.Column(db.Text, nullable=True)
|
||
global_key_passphrase = db.Column(db.String(200), nullable=True)
|
||
|
||
class Backup(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
||
host_id = db.Column(db.Integer, db.ForeignKey('host.id'), nullable=True)
|
||
created_at = db.Column(db.DateTime, default=db.func.current_timestamp())
|
||
content = db.Column(db.Text, nullable=False)
|
||
description = db.Column(db.String(255), nullable=True)
|
||
host = db.relationship('Host', backref='backups', lazy=True)
|
||
|
||
class RegexHostEntry(db.Model):
|
||
__tablename__ = 'regex_host_entry'
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
||
cidr_range = db.Column(db.String(50), nullable=False) # np. '10.87.200.0/27'
|
||
gateway_ip = db.Column(db.String(50), nullable=True) # np. '10.87.200.30'
|
||
gateway_hostname = db.Column(db.String(255), nullable=True) # np. 'gw'
|
||
domain_suffix = db.Column(db.String(255), nullable=False, default="guest.domain.com")
|
||
host_prefix = db.Column(db.String(255), nullable=False, default="user")
|
||
use_gateway_ip = db.Column(db.Boolean, default=False)
|
||
comment = db.Column(db.String(255), nullable=True)
|
||
user = db.relationship('User', backref='regex_entries')
|
||
class HostFileVersion(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
hostfile_id = db.Column(db.Integer, db.ForeignKey('host_file.id'), nullable=False)
|
||
content = db.Column(db.Text, nullable=False)
|
||
timestamp = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
|
||
hostfile = db.relationship('HostFile', backref=db.backref('versions', lazy=True))
|
||
|
||
class LocalDefaultEntry(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
||
ip_address = db.Column(db.String(50), nullable=False)
|
||
hostname = db.Column(db.String(255), nullable=False)
|
||
dynamic_variable = db.Column(db.String(255), nullable=True)
|
||
entry = db.Column(db.Text, nullable=False) # To pole już istnieje
|
||
|
||
user = db.relationship('User', backref='local_defaults')
|
||
|
||
def formatted_entry(self, variables={}):
|
||
entry_content = f"{self.ip_address} {self.hostname}"
|
||
if self.dynamic_variable:
|
||
dynamic_content = self.dynamic_variable
|
||
for key, value in variables.items():
|
||
placeholder = f"${{{key}}}"
|
||
dynamic_content = dynamic_content.replace(placeholder, value)
|
||
entry_content += f" {dynamic_content}"
|
||
return entry_content
|
||
|
||
class UserDynamicVariables(db.Model):
|
||
id = db.Column(db.Integer, primary_key=True)
|
||
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
|
||
variable_name = db.Column(db.String(255), nullable=False)
|
||
variable_value = db.Column(db.String(255), nullable=False)
|
||
|
||
user = db.relationship('User', backref='dynamic_variables')
|
||
|
||
# Funkcje pomocnicze
|
||
def get_user_dynamic_variables(user_id):
|
||
user_variables = UserDynamicVariables.query.filter_by(user_id=user_id).all()
|
||
return {var.variable_name: var.variable_value for var in user_variables}
|
||
|
||
def ensure_local_defaults(content, user_id):
|
||
default_entries = LocalDefaultEntry.query.filter_by(user_id=user_id).all()
|
||
required_lines = []
|
||
|
||
for entry in default_entries:
|
||
if entry.ip_address and entry.hostname:
|
||
required_lines.append(f"{entry.ip_address} {entry.hostname}".strip())
|
||
|
||
lines = [l.rstrip() for l in content.splitlines() if l.strip()]
|
||
lines = [line for line in lines if not any(line.startswith(e.split()[0]) for e in required_lines)]
|
||
|
||
lines = required_lines + lines
|
||
|
||
final_content = "\n".join(lines) + "\n"
|
||
|
||
return final_content
|
||
|
||
def format_host(host):
|
||
resolved_name = None
|
||
# Priorytet dla Linux Daemon
|
||
if host.use_daemon and host.type == 'linux' and host.daemon_url:
|
||
resolved_name = host.resolved_daemon or host.hostname
|
||
|
||
# Dla standardowych hostów używamy resolved_hostname
|
||
if not resolved_name:
|
||
resolved_name = host.resolved_hostname or host.hostname
|
||
|
||
# Jeśli resolved_name nadal jest IP, spróbuj rozwiązać przez DNS
|
||
if resolved_name == host.raw_ip:
|
||
try:
|
||
resolved_name = socket.gethostbyaddr(host.raw_ip)[0]
|
||
except (socket.herror, socket.gaierror):
|
||
pass # Jeśli nie można rozwiązać, pozostaw IP
|
||
|
||
return f"{resolved_name} ({host.raw_ip})"
|
||
|
||
def get_user_dynamic_variables(user_id):
|
||
user_variables = UserDynamicVariables.query.filter_by(user_id=user_id).all()
|
||
variables = {var.variable_name: var.variable_value for var in user_variables}
|
||
|
||
# Automatyczne dodanie wartości systemowych
|
||
variables["hostname"] = socket.gethostname() # Nazwa hosta
|
||
variables["resolved_hostname"] = socket.getfqdn() # Pełna nazwa hosta
|
||
return variables
|
||
|
||
|
||
def wrap_content_with_comments(content):
|
||
now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
|
||
header_comment = f"# Auto-hosts upload: {now_str}\n"
|
||
footer_comment = f"\n# End of auto-hosts upload: {now_str}\n"
|
||
return header_comment + content + footer_comment
|
||
|
||
def open_ssh_connection(host_obj):
|
||
ssh = paramiko.SSHClient()
|
||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
if host_obj.auth_method in ['ssh_key', 'global_key']:
|
||
if host_obj.auth_method == 'ssh_key':
|
||
key_str = host_obj.private_key
|
||
key_passphrase = host_obj.key_passphrase if host_obj.key_passphrase else None
|
||
else: # global_key
|
||
# Pobieramy globalny klucz z ustawień użytkownika
|
||
user_settings = UserSettings.query.filter_by(user_id=host_obj.user_id).first()
|
||
if not user_settings or not user_settings.global_ssh_key:
|
||
raise Exception("Globalny klucz SSH nie został ustawiony w ustawieniach.")
|
||
key_str = user_settings.global_ssh_key
|
||
key_passphrase = user_settings.global_key_passphrase if user_settings.global_key_passphrase else None
|
||
|
||
key_file_obj = io.StringIO(key_str)
|
||
try:
|
||
pkey = paramiko.RSAKey.from_private_key(key_file_obj, password=key_passphrase)
|
||
except paramiko.SSHException as e:
|
||
raise Exception(f"Error reading private key: {str(e)}")
|
||
ssh.connect(
|
||
hostname=host_obj.hostname,
|
||
port=host_obj.port,
|
||
username=host_obj.username,
|
||
pkey=pkey,
|
||
timeout=10,
|
||
banner_timeout=30
|
||
)
|
||
else:
|
||
ssh.connect(
|
||
hostname=host_obj.hostname,
|
||
port=host_obj.port,
|
||
username=host_obj.username,
|
||
password=host_obj.password,
|
||
timeout=10,
|
||
banner_timeout=30
|
||
)
|
||
return ssh
|
||
|
||
def get_statistics(user_id):
|
||
user = db.session.get(User, user_id)
|
||
host_count = Host.query.filter_by(user_id=user_id).count()
|
||
logs = DeployLog.query.all()
|
||
filtered_logs = [log for log in logs if f"for user {user_id}" in log.details or f"for {user.username}" in log.details]
|
||
total_deployments = len(filtered_logs)
|
||
successful_deployments = len([log for log in filtered_logs if "Updated" in log.details or "Deployed" in log.details])
|
||
failed_deployments = len([log for log in filtered_logs if "Failed" in log.details])
|
||
return {
|
||
"host_count": host_count,
|
||
"total_deployments": total_deployments,
|
||
"successful_deployments": successful_deployments,
|
||
"failed_deployments": failed_deployments
|
||
}
|
||
|
||
def automated_backup_for_host(host):
|
||
try:
|
||
if host.use_daemon and host.type == 'linux':
|
||
import requests
|
||
url = host.daemon_url.rstrip('/') + '/hosts'
|
||
headers = {"Authorization": host.daemon_token}
|
||
resp = requests.get(url, headers=headers, timeout=10, verify=False)
|
||
if resp.status_code != 200:
|
||
raise Exception(f"Daemon GET error: {resp.status_code} - {resp.text}")
|
||
data = resp.json()
|
||
content = data.get("hosts", "")
|
||
# Używamy format_host, aby log był zgodny z deploy
|
||
backup_info = f"[BACKUP] Automatic backup created for server {format_host(host)}"
|
||
else:
|
||
if host.type == 'mikrotik':
|
||
ssh = open_ssh_connection(host)
|
||
stdin, stdout, stderr = ssh.exec_command("/ip dns static export")
|
||
content = stdout.read().decode('utf-8')
|
||
ssh.close()
|
||
backup_info = f"[BACKUP] Automatic backup created for server {format_host(host)}"
|
||
else:
|
||
ssh = open_ssh_connection(host)
|
||
sftp = ssh.open_sftp()
|
||
with sftp.open('/etc/hosts', 'r') as remote_file:
|
||
content = remote_file.read().decode('utf-8')
|
||
sftp.close()
|
||
ssh.close()
|
||
backup_info = f"[BACKUP] Automatic backup created for server {format_host(host)}"
|
||
|
||
backup = Backup(
|
||
user_id=host.user_id,
|
||
host_id=host.id,
|
||
content=content,
|
||
description=f'Backup from server {format_host(host)} at {datetime.now(timezone.utc).isoformat()}'
|
||
)
|
||
db.session.add(backup)
|
||
db.session.commit()
|
||
|
||
log_entry = DeployLog(details=backup_info, user_id=host.user_id)
|
||
db.session.add(log_entry)
|
||
db.session.commit()
|
||
|
||
print(f'Automated backup for server {format_host(host)} created successfully.')
|
||
except Exception as e:
|
||
print(f'Error creating automated backup for server {format_host(host)}: {str(e)}')
|
||
|
||
def automated_backups():
|
||
with app.app_context():
|
||
now = datetime.now(timezone.utc)
|
||
hosts = Host.query.all()
|
||
for host in hosts:
|
||
# Dodaj warunek: backup dla danego hosta ma być wykonywany tylko, jeśli jest włączony
|
||
if not host.auto_backup_enabled:
|
||
continue
|
||
|
||
settings = UserSettings.query.filter_by(user_id=host.user_id).first()
|
||
if not settings or not settings.auto_backup_enabled or not settings.backup_cron:
|
||
continue
|
||
# Pobieramy ostatni backup dla hosta
|
||
last_backup = Backup.query.filter_by(user_id=host.user_id, host_id=host.id)\
|
||
.order_by(Backup.created_at.desc()).first()
|
||
if last_backup:
|
||
base_time = last_backup.created_at
|
||
if base_time.tzinfo is None:
|
||
base_time = base_time.replace(tzinfo=timezone.utc)
|
||
else:
|
||
base_time = datetime.now(timezone.utc) - timedelta(minutes=1)
|
||
cron = croniter(settings.backup_cron, base_time)
|
||
next_backup_time = cron.get_next(datetime)
|
||
if now >= next_backup_time:
|
||
automated_backup_for_host(host)
|
||
db.session.commit()
|
||
|
||
def wrap_content_with_comments(content):
|
||
now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
|
||
header_comment = f"# Auto-hosts upload: {now_str}\n"
|
||
footer_comment = f"\n# End of auto-hosts upload: {now_str}\n"
|
||
return header_comment + content + footer_comment
|
||
|
||
def wrap_mikrotik_content(content):
|
||
now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H:%M:%S_UTC")
|
||
header = f"127.0.0.99 #Auto-hosts_upload:{now_str}"
|
||
footer = f"127.0.0.199 #End_of_auto-hosts_upload:{now_str}"
|
||
return header + "\n" + content + "\n" + footer
|
||
|
||
def clear_linux(host, content):
|
||
"""Zastępuje /etc/hosts na hoście Linux zawartością `content`."""
|
||
ssh = open_ssh_connection(host)
|
||
import tempfile, os
|
||
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf:
|
||
tmpf.write(content)
|
||
tmp_file_path = tmpf.name
|
||
sftp = ssh.open_sftp()
|
||
sftp.put(tmp_file_path, '/etc/hosts')
|
||
sftp.close()
|
||
ssh.close()
|
||
os.remove(tmp_file_path)
|
||
|
||
def clear_mikrotik(host):
|
||
ssh = open_ssh_connection(host)
|
||
ssh.exec_command("/ip dns static remove [find]")
|
||
ssh.close()
|
||
|
||
def generate_regex_hosts(user_id):
|
||
entries = RegexHostEntry.query.filter_by(user_id=user_id).all()
|
||
lines = []
|
||
|
||
for entry in entries:
|
||
try:
|
||
network = ipaddress.ip_network(entry.cidr_range, strict=False)
|
||
all_hosts = list(network.hosts()) # IP w puli
|
||
|
||
host_index = 1
|
||
for ip_addr in all_hosts:
|
||
if (entry.use_gateway_ip
|
||
and entry.gateway_ip
|
||
and str(ip_addr) == entry.gateway_ip):
|
||
if entry.gateway_hostname:
|
||
hostname = f"{entry.gateway_hostname}.{entry.domain_suffix}"
|
||
else:
|
||
hostname = f"gw.{entry.domain_suffix}"
|
||
lines.append(f"{ip_addr} {hostname}")
|
||
else:
|
||
hostname = f"{entry.host_prefix}{host_index}.{entry.domain_suffix}"
|
||
lines.append(f"{ip_addr} {hostname}")
|
||
host_index += 1
|
||
except ValueError:
|
||
pass
|
||
return "\n".join(lines) + "\n" if lines else ""
|
||
|
||
def cleanup_old_backups():
|
||
|
||
with app.app_context():
|
||
all_settings = UserSettings.query.all()
|
||
for setting in all_settings:
|
||
days = setting.backup_retention_days or 0
|
||
if days > 0:
|
||
cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
|
||
old_backups = Backup.query.filter(
|
||
Backup.user_id == setting.user_id,
|
||
Backup.created_at < cutoff_date
|
||
).all()
|
||
for b in old_backups:
|
||
db.session.delete(b)
|
||
db.session.commit()
|
||
|
||
def format_host(host):
|
||
if host.use_daemon and host.type == 'linux' and host.daemon_url:
|
||
resolved_name = host.resolved_daemon or host.hostname
|
||
else:
|
||
resolved_name = host.resolved_hostname or host.hostname
|
||
return f"{resolved_name} ({host.raw_ip})"
|
||
|
||
|
||
# -------------------
|
||
# LOGOWANIE, REJESTRACJA, ZMIANA HASŁA
|
||
# -------------------
|
||
@app.route('/')
|
||
def index():
|
||
if 'user_id' in session:
|
||
return redirect(url_for('dashboard'))
|
||
return redirect(url_for('login'))
|
||
|
||
@app.route('/login', methods=['GET', 'POST'])
|
||
def login():
|
||
if request.method == 'POST':
|
||
username = request.form['username']
|
||
password = request.form['password']
|
||
user = User.query.filter_by(username=username).first()
|
||
if user and check_password_hash(user.password, password):
|
||
session['user_id'] = user.id
|
||
flash('Login successful', 'success')
|
||
return redirect(url_for('dashboard'))
|
||
flash('Invalid credentials', 'danger')
|
||
return render_template('login.html')
|
||
|
||
@app.route('/register', methods=['GET', 'POST'])
|
||
def register():
|
||
if request.method == 'POST':
|
||
username = request.form['username']
|
||
password_hash = generate_password_hash(request.form['password'])
|
||
user = User(username=username, password=password_hash)
|
||
db.session.add(user)
|
||
db.session.commit()
|
||
flash('Registration successful. Please log in.', 'success')
|
||
return redirect(url_for('login'))
|
||
return render_template('register.html')
|
||
|
||
@app.route('/logout')
|
||
def logout():
|
||
session.pop('user_id', None)
|
||
flash('Logged out', 'info')
|
||
return redirect(url_for('login'))
|
||
|
||
@app.route('/change-password', methods=['GET', 'POST'])
|
||
def change_password():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
if request.method == 'POST':
|
||
user = db.session.get(User, session['user_id'])
|
||
new_password = generate_password_hash(request.form['password'])
|
||
user.password = new_password
|
||
db.session.commit()
|
||
flash('Password changed successfully', 'success')
|
||
return redirect(url_for('change_password'))
|
||
return render_template('change_password.html')
|
||
|
||
# -------------------
|
||
# ZARZĄDZANIE SERWERAMI
|
||
# -------------------
|
||
|
||
@app.route('/add_server', methods=['GET', 'POST'])
|
||
def add_server():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
|
||
# Pobieramy wszystkie pliki hosts dla użytkownika (do wyboru preferowanego)
|
||
user_hostfiles = HostFile.query.filter_by(user_id=session['user_id']).all()
|
||
|
||
if request.method == 'POST':
|
||
hostname = request.form['hostname']
|
||
username = request.form['username']
|
||
password_val = request.form['password']
|
||
host_type = request.form.get('host_type', 'linux')
|
||
auth_method = request.form.get('auth_method', 'password')
|
||
private_key = request.form.get('private_key', '').strip()
|
||
key_passphrase = request.form.get('key_passphrase', '').strip()
|
||
port_str = request.form.get('port', '22')
|
||
try:
|
||
port = int(port_str)
|
||
except ValueError:
|
||
port = 22
|
||
|
||
# Używamy danych dla demona, jeśli checkbox zaznaczony
|
||
use_daemon = bool(request.form.get('use_daemon'))
|
||
daemon_url = request.form.get('daemon_url', '').strip()
|
||
daemon_token = request.form.get('daemon_token', '').strip()
|
||
|
||
# Dla metod 'ssh_key' i 'global_key' dane są rozróżniane:
|
||
if auth_method == 'ssh_key':
|
||
stored_key = private_key if private_key else None
|
||
stored_passphrase = key_passphrase if key_passphrase else None
|
||
elif auth_method == 'global_key':
|
||
# W przypadku global_key dane lokalne nie są zapisywane – będą pobierane z ustawień użytkownika
|
||
stored_key = None
|
||
stored_passphrase = None
|
||
else:
|
||
stored_key = None
|
||
stored_passphrase = None
|
||
|
||
# Obsługa preferowanego pliku hosts
|
||
preferred_file_id_str = request.form.get('preferred_hostfile_id', '').strip()
|
||
if preferred_file_id_str == '':
|
||
chosen_file_id = None
|
||
else:
|
||
try:
|
||
chosen_file_id = int(preferred_file_id_str)
|
||
except ValueError:
|
||
chosen_file_id = None
|
||
|
||
host = Host(
|
||
hostname=hostname,
|
||
username=username,
|
||
password=password_val,
|
||
type=host_type,
|
||
auth_method=auth_method,
|
||
private_key=stored_key,
|
||
key_passphrase=stored_passphrase,
|
||
port=port,
|
||
user_id=session['user_id'],
|
||
use_daemon=use_daemon if host_type == 'linux' else False,
|
||
daemon_url=daemon_url if (use_daemon and host_type == 'linux') else None,
|
||
daemon_token=daemon_token if (use_daemon and host_type == 'linux') else None,
|
||
preferred_hostfile_id=chosen_file_id
|
||
)
|
||
|
||
db.session.add(host)
|
||
db.session.commit()
|
||
flash('Host added successfully', 'success')
|
||
return redirect(url_for('server_list'))
|
||
|
||
return render_template('add_server.html', user_hostfiles=user_hostfiles)
|
||
|
||
@app.route('/delete-server/<int:id>')
|
||
def delete_server(id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
host = db.session.get(Host, id)
|
||
if host and host.user_id == session['user_id']:
|
||
db.session.delete(host)
|
||
db.session.commit()
|
||
flash('Host deleted', 'info')
|
||
else:
|
||
flash('Host not found or unauthorized', 'danger')
|
||
return redirect(url_for('server_list'))
|
||
|
||
@app.route('/server-list', methods=['GET'])
|
||
def server_list():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
hosts = Host.query.filter_by(user_id=session['user_id']).all()
|
||
return render_template('server_list.html', hosts=hosts)
|
||
|
||
@app.route('/edit-server/<int:id>', methods=['GET', 'POST'])
|
||
def edit_server(id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
host = db.session.get(Host, id)
|
||
if not host or host.user_id != session['user_id']:
|
||
flash('Server not found or unauthorized', 'danger')
|
||
return redirect(url_for('server_list'))
|
||
|
||
# Pobieramy listę plików hosts dla użytkownika (do wyboru preferowanego)
|
||
user_hostfiles = HostFile.query.filter_by(user_id=session['user_id']).all()
|
||
|
||
if request.method == 'POST':
|
||
host.hostname = request.form['hostname']
|
||
host.username = request.form['username']
|
||
new_password = request.form['password'] or ''
|
||
if new_password:
|
||
host.password = new_password
|
||
|
||
port_str = request.form.get('port', '22')
|
||
try:
|
||
host.port = int(port_str)
|
||
except ValueError:
|
||
host.port = 22
|
||
|
||
host.type = request.form.get('host_type', 'linux')
|
||
host.auth_method = request.form.get('auth_method', 'password')
|
||
|
||
if host.auth_method == 'ssh_key':
|
||
new_private_key = request.form.get('private_key', '').strip()
|
||
new_passphrase = request.form.get('key_passphrase', '').strip()
|
||
if new_private_key:
|
||
host.private_key = new_private_key
|
||
if new_passphrase:
|
||
host.key_passphrase = new_passphrase
|
||
elif host.auth_method == 'global_key':
|
||
# Dla global_key wyczyścimy lokalne pola – dane będą pobierane z ustawień
|
||
host.private_key = None
|
||
host.key_passphrase = None
|
||
|
||
use_daemon = bool(request.form.get('use_daemon'))
|
||
daemon_url = request.form.get('daemon_url', '').strip()
|
||
daemon_token = request.form.get('daemon_token', '').strip()
|
||
if host.type == 'linux' and use_daemon:
|
||
host.use_daemon = True
|
||
host.daemon_url = daemon_url
|
||
host.daemon_token = daemon_token
|
||
else:
|
||
host.use_daemon = False
|
||
host.daemon_url = None
|
||
host.daemon_token = None
|
||
|
||
preferred_file_id_str = request.form.get('preferred_hostfile_id', '').strip()
|
||
if preferred_file_id_str == '':
|
||
host.preferred_hostfile_id = None
|
||
else:
|
||
try:
|
||
host.preferred_hostfile_id = int(preferred_file_id_str)
|
||
except ValueError:
|
||
host.preferred_hostfile_id = None
|
||
|
||
db.session.commit()
|
||
flash('Server updated successfully', 'success')
|
||
return redirect(url_for('server_list'))
|
||
|
||
return render_template('edit_server.html', host=host, user_hostfiles=user_hostfiles)
|
||
|
||
# -------------------
|
||
# TESTOWANIE POŁĄCZENIA SSH DLA HOSTA
|
||
# -------------------
|
||
@app.route('/test-server-connection/<int:id>', methods=['GET'])
|
||
def test_server_connection(id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
host = db.session.get(Host, id)
|
||
if not host or host.user_id != session['user_id']:
|
||
flash('Host not found or unauthorized', 'danger')
|
||
return redirect(url_for('server_list'))
|
||
try:
|
||
if host.use_daemon and host.type == 'linux':
|
||
import requests
|
||
headers = {"Authorization": host.daemon_token}
|
||
health_url = host.daemon_url.rstrip('/') + '/health'
|
||
resp = requests.get(health_url, headers=headers, verify=False, timeout=5)
|
||
if resp.status_code == 200:
|
||
flash(f'Demon connection successful (health OK) for {format_host(host)}', 'success')
|
||
else:
|
||
raise Exception(f"Demon health check returned {resp.status_code}")
|
||
sysinfo_url = host.daemon_url.rstrip('/') + '/system-info'
|
||
sysinfo_resp = requests.get(sysinfo_url, headers=headers, verify=False, timeout=5)
|
||
if sysinfo_resp.status_code == 200:
|
||
info = sysinfo_resp.json()
|
||
msg = (f"System-info for {format_host(host)}: "
|
||
f"CPU={info.get('cpu_percent')}%, "
|
||
f"MEM={info.get('memory_percent')}%, "
|
||
f"DISK={info.get('disk_percent')}%, "
|
||
f"UPTIME={info.get('uptime_seconds')}s")
|
||
flash(msg, 'info')
|
||
else:
|
||
raise Exception(f"Demon system-info returned {sysinfo_resp.status_code}")
|
||
else:
|
||
ssh = open_ssh_connection(host)
|
||
ssh.close()
|
||
flash(f'SSH connection to {format_host(host)} successful.', 'success')
|
||
except Exception as e:
|
||
flash(f'Connection failed for {format_host(host)}: {str(e)}', 'danger')
|
||
return redirect(url_for('server_list'))
|
||
|
||
# -------------------
|
||
# ROUTE: CZYSZCZENIE HOSTS - CAŁA GRUPA
|
||
# -------------------
|
||
@app.route('/clear-server', methods=['GET', 'POST'])
|
||
def clear_server():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
hosts = Host.query.filter_by(user_id=session['user_id']).all()
|
||
return render_template('clear_servers.html', hosts=hosts, format_host=format_host)
|
||
|
||
@app.route('/clear-single-server/<int:host_id>', methods=['POST'])
|
||
def clear_single_server(host_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
host = db.session.get(Host, host_id)
|
||
if not host or host.user_id != session['user_id']:
|
||
flash('Host not found or unauthorized', 'danger')
|
||
return redirect(url_for('clear_servers'))
|
||
|
||
default_content = ensure_local_defaults("")
|
||
try:
|
||
if host.use_daemon and host.type == 'linux':
|
||
import requests
|
||
url = host.daemon_url.rstrip('/') + '/hosts'
|
||
headers = {"Authorization": host.daemon_token}
|
||
# Zakładamy, że demon potrafi zastąpić /etc/hosts treścią "default_content"
|
||
resp = requests.post(url, json={"hosts": default_content},
|
||
headers=headers, verify=False, timeout=10)
|
||
if resp.status_code != 200:
|
||
raise Exception(f"Daemon update error: {resp.status_code} - {resp.text}")
|
||
|
||
elif host.type == 'mikrotik':
|
||
clear_mikrotik(host)
|
||
else:
|
||
# standard linux
|
||
clear_linux(host, default_content)
|
||
|
||
flash(f'Cleared host: {host.hostname}', 'success')
|
||
except Exception as e:
|
||
flash(f'Error clearing host {host.hostname}: {str(e)}', 'danger')
|
||
|
||
return redirect(url_for('clear_all_server'))
|
||
|
||
@app.route('/clear-all-server', methods=['GET', 'POST'])
|
||
def clear_all_server():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
hosts = Host.query.filter_by(user_id=session['user_id']).all()
|
||
|
||
if request.method == 'POST':
|
||
linux_clear = request.form.get('linux')
|
||
mikrotik_clear = request.form.get('mikrotik')
|
||
default_content = ensure_local_defaults("")
|
||
|
||
for h in hosts:
|
||
try:
|
||
if h.type == 'linux' and linux_clear:
|
||
if h.use_daemon:
|
||
import requests
|
||
url = h.daemon_url.rstrip('/') + '/hosts'
|
||
headers = {"Authorization": h.daemon_token}
|
||
resp = requests.post(url, json={"hosts": default_content},
|
||
headers=headers, verify=False, timeout=10)
|
||
if resp.status_code != 200:
|
||
raise Exception(f"Daemon update error: {resp.status_code} - {resp.text}")
|
||
else:
|
||
clear_linux(h, default_content)
|
||
flash(f'Cleared Linux host: {h.hostname}', 'success')
|
||
|
||
elif h.type == 'mikrotik' and mikrotik_clear:
|
||
clear_mikrotik(h)
|
||
flash(f'Cleared Mikrotik host: {h.hostname}', 'success')
|
||
|
||
except Exception as e:
|
||
flash(f'Error clearing host {h.hostname}: {str(e)}', 'danger')
|
||
|
||
return redirect(url_for('clear_all_server'))
|
||
|
||
return render_template('clear_servers.html', hosts=hosts)
|
||
|
||
# -------------------
|
||
# ZARZĄDZANIE PLIKAMI HOSTS (WIELOKROTNE PLIKI)
|
||
# -------------------
|
||
@app.route('/hosts-files')
|
||
def list_hosts_files():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
files = HostFile.query.filter_by(user_id=session['user_id']).all()
|
||
return render_template('hosts_files.html', files=files)
|
||
|
||
@app.route('/hosts-files/new', methods=['GET', 'POST'])
|
||
def new_hosts_file():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
if request.method == 'POST':
|
||
title = request.form['title']
|
||
content = request.form['content']
|
||
new_file = HostFile(user_id=session['user_id'], title=title, content=content)
|
||
db.session.add(new_file)
|
||
db.session.commit()
|
||
flash('Hosts file created', 'success')
|
||
return redirect(url_for('list_hosts_files'))
|
||
return render_template('new_edit_hosts_file.html', file=None)
|
||
|
||
@app.route('/hosts-files/<int:file_id>/edit', methods=['GET', 'POST'])
|
||
def edit_hosts_file(file_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
file = db.session.get(HostFile, file_id)
|
||
if not file or file.user_id != session['user_id']:
|
||
flash('File not found or unauthorized', 'danger')
|
||
return redirect(url_for('list_hosts_files'))
|
||
if request.method == 'POST':
|
||
new_title = request.form['title']
|
||
new_content = request.form['content']
|
||
if file.content != new_content:
|
||
version = HostFileVersion(hostfile_id=file.id, content=file.content)
|
||
db.session.add(version)
|
||
file.title = new_title
|
||
file.content = new_content
|
||
db.session.commit()
|
||
flash('Hosts file updated and previous version saved.', 'success')
|
||
return redirect(url_for('list_hosts_files'))
|
||
return render_template('new_edit_hosts_file.html', file=file)
|
||
|
||
@app.route('/hosts-files/<int:file_id>/delete', methods=['GET'])
|
||
def delete_hosts_file(file_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
file = db.session.get(HostFile, file_id)
|
||
if not file or file.user_id != session['user_id']:
|
||
flash('File not found or unauthorized', 'danger')
|
||
else:
|
||
db.session.delete(file)
|
||
db.session.commit()
|
||
flash('Hosts file deleted', 'info')
|
||
return redirect(url_for('list_hosts_files'))
|
||
|
||
# -------------------
|
||
# WDROŻENIE WYBRANEGO PLIKU HOSTS NA WYBRANE SERWERY
|
||
# -------------------
|
||
|
||
|
||
@app.route('/deploy-hosts-file/<int:file_id>', methods=['GET', 'POST'])
|
||
def deploy_hosts_file(file_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
file = db.session.get(HostFile, file_id)
|
||
if not file or file.user_id != session['user_id']:
|
||
flash('File not found or unauthorized', 'danger')
|
||
return redirect(url_for('list_hosts_files'))
|
||
hosts = Host.query.filter_by(user_id=session['user_id']).all()
|
||
|
||
# Dla hostów korzystających z demona – obliczamy IP oraz rozwiązaną nazwę
|
||
for host in hosts:
|
||
if host.use_daemon and host.type == 'linux' and host.daemon_url:
|
||
daemon_str = host.daemon_url.split("://")[-1]
|
||
daemon_ip = daemon_str.split(":")[0]
|
||
host.daemon_ip = daemon_ip
|
||
try:
|
||
resolved_daemon = socket.gethostbyaddr(daemon_ip)[0]
|
||
except Exception:
|
||
resolved_daemon = daemon_ip
|
||
host._resolved_daemon_local = resolved_daemon
|
||
|
||
if request.method == 'POST':
|
||
selected_host_ids = request.form.getlist('hosts')
|
||
for host in hosts:
|
||
if str(host.id) in selected_host_ids:
|
||
try:
|
||
adjusted_content = ensure_local_defaults(file.content)
|
||
wrapped_content = wrap_content_with_comments(adjusted_content)
|
||
|
||
if host.use_daemon and host.type == 'linux':
|
||
import requests
|
||
url = host.daemon_url.rstrip('/') + '/hosts'
|
||
headers = {"Authorization": host.daemon_token}
|
||
resp = requests.post(url, json={"hosts": wrapped_content},
|
||
headers=headers, timeout=10, verify=False)
|
||
if resp.status_code != 200:
|
||
raise Exception(f"Daemon POST error: {resp.status_code} - {resp.text}")
|
||
log_details = f'[LINUX/DAEMON] Updated {format_host(host)}'
|
||
db.session.add(DeployLog(details=log_details, user_id=session['user_id']))
|
||
elif host.type == 'mikrotik':
|
||
wrapped_mikro = wrap_mikrotik_content(file.content)
|
||
deploy_mikrotik(host, wrapped_mikro)
|
||
log_details = f'[MIKROTIK] Updated {format_host(host)}'
|
||
db.session.add(DeployLog(details=log_details, user_id=session['user_id']))
|
||
else:
|
||
ssh = open_ssh_connection(host)
|
||
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf:
|
||
tmpf.write(wrapped_content)
|
||
tmp_file_path = tmpf.name
|
||
sftp = ssh.open_sftp()
|
||
sftp.put(tmp_file_path, '/etc/hosts')
|
||
sftp.close()
|
||
ssh.close()
|
||
os.remove(tmp_file_path)
|
||
log_details = f'[LINUX] Updated {format_host(host)}'
|
||
db.session.add(DeployLog(details=log_details, user_id=session['user_id']))
|
||
db.session.commit()
|
||
flash(f'Deployed file "{file.title}" to {format_host(host)}', 'success')
|
||
except Exception as e:
|
||
flash(f'Error deploying file "{file.title}" to {format_host(host)}: {str(e)}', 'danger')
|
||
return redirect(url_for('list_hosts_files'))
|
||
|
||
return render_template('deploy_hosts_file.html', file=file, hosts=hosts)
|
||
|
||
# -------------------
|
||
# BACKUP
|
||
# -------------------
|
||
|
||
@app.route('/server-backup/<int:host_id>', methods=['GET'])
|
||
def server_backup(host_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
host = db.session.get(Host, host_id)
|
||
if not host or host.user_id != session['user_id']:
|
||
flash('Host not found or unauthorized', 'danger')
|
||
return redirect(url_for('server_list'))
|
||
try:
|
||
if host.use_daemon and host.type == 'linux':
|
||
import requests
|
||
url = host.daemon_url.rstrip('/') + '/hosts'
|
||
headers = {"Authorization": host.daemon_token}
|
||
resp = requests.get(url, headers=headers, timeout=10, verify=False)
|
||
if resp.status_code != 200:
|
||
raise Exception(f"Daemon GET error: {resp.status_code} - {resp.text}")
|
||
data = resp.json()
|
||
content = data.get("hosts", "")
|
||
description = f'Backup from server {format_host(host)} at {datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")}'
|
||
elif host.type == 'mikrotik':
|
||
ssh = open_ssh_connection(host)
|
||
stdin, stdout, stderr = ssh.exec_command("/ip dns static export")
|
||
content = stdout.read().decode('utf-8')
|
||
ssh.close()
|
||
description = f'Backup from server {format_host(host)}'
|
||
else:
|
||
ssh = open_ssh_connection(host)
|
||
sftp = ssh.open_sftp()
|
||
with sftp.open('/etc/hosts', 'r') as remote_file:
|
||
content = remote_file.read().decode('utf-8')
|
||
sftp.close()
|
||
ssh.close()
|
||
description = f'Backup from server {format_host(host)}'
|
||
backup = Backup(
|
||
user_id=session['user_id'],
|
||
host_id=host.id,
|
||
content=content,
|
||
description=description
|
||
)
|
||
db.session.add(backup)
|
||
db.session.commit()
|
||
flash(f'Backup for host {format_host(host)} created successfully.', 'success')
|
||
except Exception as e:
|
||
flash(f'Error creating backup for host {format_host(host)}: {str(e)}', 'danger')
|
||
return redirect(url_for('server_list'))
|
||
|
||
@app.route('/backups')
|
||
def backups():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
backups = Backup.query.filter_by(user_id=session['user_id']).order_by(Backup.created_at.desc()).all()
|
||
return render_template('backups.html', backups=backups)
|
||
|
||
@app.route('/restore-backup/<int:backup_id>', methods=['GET'])
|
||
def restore_backup(backup_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
backup = db.session.get(Backup, backup_id)
|
||
if not backup or backup.user_id != session['user_id']:
|
||
flash('Backup not found or unauthorized', 'danger')
|
||
return redirect(url_for('backups'))
|
||
if backup.host_id:
|
||
host = db.session.get(Host, backup.host_id)
|
||
if not host or host.user_id != session['user_id']:
|
||
flash('Associated host not found or unauthorized', 'danger')
|
||
return redirect(url_for('backups'))
|
||
try:
|
||
if host.type == 'mikrotik':
|
||
ssh = open_ssh_connection(host)
|
||
ssh.exec_command("/ip dns static remove [find]")
|
||
import time
|
||
time.sleep(1)
|
||
commands = []
|
||
for line in backup.content.splitlines():
|
||
line = line.strip()
|
||
if line.startswith("add "):
|
||
commands.append("/ip dns static " + line)
|
||
full_command = " ; ".join(commands)
|
||
ssh.exec_command(full_command)
|
||
ssh.close()
|
||
flash(f'Backup restored to {format_host(host)} successfully.', 'success')
|
||
else:
|
||
ssh = open_ssh_connection(host)
|
||
sftp = ssh.open_sftp()
|
||
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf:
|
||
tmpf.write(backup.content)
|
||
tmp_file_path = tmpf.name
|
||
sftp.put(tmp_file_path, '/etc/hosts')
|
||
sftp.close()
|
||
ssh.close()
|
||
os.remove(tmp_file_path)
|
||
flash(f'Backup restored to {format_host(host)} successfully.', 'success')
|
||
except Exception as e:
|
||
flash(f'Error restoring backup to {format_host(host)}: {str(e)}', 'danger')
|
||
else:
|
||
hostfile = HostFile.query.filter_by(user_id=session['user_id'], title="Default Hosts").first()
|
||
if not hostfile:
|
||
hostfile = HostFile(user_id=session['user_id'], title="Default Hosts", content=backup.content)
|
||
db.session.add(hostfile)
|
||
else:
|
||
hostfile.content = backup.content
|
||
db.session.commit()
|
||
deploy_user(session['user_id'])
|
||
flash('Backup restored to default configuration and deployed successfully.', 'success')
|
||
return redirect(url_for('backups'))
|
||
|
||
@app.route('/view-backup/<int:backup_id>', methods=['GET'])
|
||
def view_backup(backup_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
backup = db.session.get(Backup, backup_id)
|
||
if not backup or backup.user_id != session['user_id']:
|
||
flash('Backup not found or unauthorized', 'danger')
|
||
return redirect(url_for('backups'))
|
||
host = None
|
||
if backup.host_id:
|
||
host = db.session.get(Host, backup.host_id)
|
||
return render_template('view_backup.html', backup=backup, host=host)
|
||
|
||
@app.route('/backup-all', methods=['GET'])
|
||
def backup_all():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
user_id = session['user_id']
|
||
hosts = Host.query.filter_by(user_id=user_id).all()
|
||
for host in hosts:
|
||
try:
|
||
if host.use_daemon and host.type == 'linux':
|
||
import requests
|
||
url = host.daemon_url.rstrip('/') + '/hosts'
|
||
headers = {"Authorization": host.daemon_token}
|
||
resp = requests.get(url, headers=headers, timeout=10, verify=False)
|
||
if resp.status_code != 200:
|
||
raise Exception(f"Daemon GET error: {resp.status_code} - {resp.text}")
|
||
data = resp.json()
|
||
content = data.get("hosts", "")
|
||
description = f'Backup from server {format_host(host)}'
|
||
elif host.type == 'mikrotik':
|
||
ssh = open_ssh_connection(host)
|
||
stdin, stdout, stderr = ssh.exec_command("/ip dns static export")
|
||
content = stdout.read().decode('utf-8')
|
||
ssh.close()
|
||
description = f'Backup from server {format_host(host)}'
|
||
else:
|
||
ssh = open_ssh_connection(host)
|
||
sftp = ssh.open_sftp()
|
||
with sftp.open('/etc/hosts', 'r') as remote_file:
|
||
content = remote_file.read().decode('utf-8')
|
||
sftp.close()
|
||
ssh.close()
|
||
description = f'Backup from server {format_host(host)}'
|
||
backup = Backup(
|
||
user_id=user_id,
|
||
host_id=host.id,
|
||
content=content,
|
||
description=description
|
||
)
|
||
db.session.add(backup)
|
||
db.session.commit()
|
||
except Exception as e:
|
||
flash(f'Error creating backup for {format_host(host)}: {str(e)}', 'danger')
|
||
flash('Backup for all hosts created successfully.', 'success')
|
||
return redirect(url_for('backups'))
|
||
# -------------------
|
||
# IMPORT/EXPORT HOSTÓW
|
||
# -------------------
|
||
@app.route('/export-servers-to-csv', methods=['GET'])
|
||
def export_servers_to_csv():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
user_id = session['user_id']
|
||
hosts = Host.query.filter_by(user_id=user_id).all()
|
||
si = StringIO()
|
||
cw = csv.writer(si)
|
||
# Dodajemy wszystkie istotne pola
|
||
cw.writerow([
|
||
'id', 'hostname', 'username', 'password', 'port', 'type',
|
||
'auth_method', 'private_key', 'key_passphrase', 'auto_deploy_enabled',
|
||
'auto_backup_enabled', 'preferred_hostfile_id', 'use_daemon', 'daemon_url', 'daemon_token'
|
||
])
|
||
for host in hosts:
|
||
cw.writerow([
|
||
host.id,
|
||
host.hostname,
|
||
host.username,
|
||
host.password,
|
||
host.port,
|
||
host.type,
|
||
host.auth_method,
|
||
host.private_key or '',
|
||
host.key_passphrase or '',
|
||
host.auto_deploy_enabled,
|
||
host.auto_backup_enabled,
|
||
host.preferred_hostfile_id if host.preferred_hostfile_id is not None else '',
|
||
host.use_daemon,
|
||
host.daemon_url or '',
|
||
host.daemon_token or ''
|
||
])
|
||
output = si.getvalue()
|
||
return Response(output, mimetype="text/csv", headers={"Content-Disposition": "attachment;filename=servers_full.csv"})
|
||
|
||
@app.route('/import-servers', methods=['GET', 'POST'])
|
||
def import_servers():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
if request.method == 'POST':
|
||
file = request.files.get('file')
|
||
if not file:
|
||
flash('No file uploaded', 'danger')
|
||
return redirect(url_for('import_servers'))
|
||
stream = StringIO(file.stream.read().decode("UTF8"), newline=None)
|
||
csv_input = csv.reader(stream)
|
||
header = next(csv_input) # zakładamy, że pierwszy wiersz zawiera nagłówki
|
||
for row in csv_input:
|
||
# Sprawdzamy, czy wiersz zawiera wszystkie wymagane kolumny (w tym przypadku 15)
|
||
if len(row) < 15:
|
||
continue
|
||
# Rozpakowywanie wiersza zgodnie z kolejnością kolumn w eksporcie
|
||
(_, hostname, username, password_val, port_str, host_type, auth_method,
|
||
private_key, key_passphrase, auto_deploy_enabled, auto_backup_enabled,
|
||
preferred_hostfile_id, use_daemon, daemon_url, daemon_token) = row
|
||
|
||
try:
|
||
port = int(port_str)
|
||
except ValueError:
|
||
port = 22
|
||
|
||
# Konwersja wartości logicznych
|
||
auto_deploy_enabled = auto_deploy_enabled.lower() in ['true', '1', 'yes']
|
||
auto_backup_enabled = auto_backup_enabled.lower() in ['true', '1', 'yes']
|
||
use_daemon = use_daemon.lower() in ['true', '1', 'yes']
|
||
|
||
try:
|
||
preferred_hostfile_id = int(preferred_hostfile_id) if preferred_hostfile_id else None
|
||
except ValueError:
|
||
preferred_hostfile_id = None
|
||
|
||
host = Host(
|
||
hostname=hostname,
|
||
username=username,
|
||
password=password_val,
|
||
port=port,
|
||
type=host_type,
|
||
auth_method=auth_method,
|
||
private_key=private_key if private_key else None,
|
||
key_passphrase=key_passphrase if key_passphrase else None,
|
||
auto_deploy_enabled=auto_deploy_enabled,
|
||
auto_backup_enabled=auto_backup_enabled,
|
||
preferred_hostfile_id=preferred_hostfile_id,
|
||
use_daemon=use_daemon,
|
||
daemon_url=daemon_url if daemon_url else None,
|
||
daemon_token=daemon_token if daemon_token else None,
|
||
user_id=session['user_id']
|
||
)
|
||
db.session.add(host)
|
||
db.session.commit()
|
||
flash('Hosts imported successfully', 'success')
|
||
return redirect(url_for('server_list'))
|
||
return render_template('import_servers.html')
|
||
|
||
@app.route('/clear-host/<int:id>', methods=['GET'])
|
||
def clear_host(id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
host = db.session.get(Host, id)
|
||
if not host or host.user_id != session['user_id']:
|
||
flash('Host not found or unauthorized', 'danger')
|
||
return redirect(url_for('server_list'))
|
||
try:
|
||
if host.type == 'linux':
|
||
default_content = ensure_local_defaults("")
|
||
clear_linux(host, default_content)
|
||
elif host.type == 'mikrotik':
|
||
clear_mikrotik(host)
|
||
flash(f'Cleared host: {host.hostname}', 'success')
|
||
except Exception as e:
|
||
flash(f'Error clearing host {host.hostname}: {str(e)}', 'danger')
|
||
return redirect(url_for('server_list'))
|
||
|
||
# -------------------
|
||
# STRONA USTAWIEŃ (SETTINGS)
|
||
# -------------------
|
||
@app.route('/settings', methods=['GET', 'POST'])
|
||
def settings():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
user_id = session['user_id']
|
||
user_settings = UserSettings.query.filter_by(user_id=user_id).first()
|
||
if not user_settings:
|
||
user_settings = UserSettings(user_id=user_id)
|
||
db.session.add(user_settings)
|
||
db.session.commit()
|
||
|
||
if request.method == 'POST':
|
||
auto_deploy = request.form.get('auto_deploy')
|
||
deploy_cron = request.form.get('deploy_cron')
|
||
auto_backup = request.form.get('auto_backup')
|
||
backup_cron = request.form.get('backup_cron')
|
||
enable_regex_entries = request.form.get('enable_regex_entries')
|
||
retention_val = request.form.get('backup_retention_days', '0')
|
||
|
||
# Pobierz wartości globalnego klucza SSH z formularza
|
||
global_ssh_key = request.form.get('global_ssh_key')
|
||
global_key_passphrase = request.form.get('global_key_passphrase')
|
||
|
||
# Walidacja wyrażeń cron przy pomocy croniter
|
||
try:
|
||
croniter(deploy_cron)
|
||
except Exception as e:
|
||
flash("Błędne wyrażenie cron dla deploy: " + str(e), "danger")
|
||
return redirect(url_for('settings'))
|
||
try:
|
||
croniter(backup_cron)
|
||
except Exception as e:
|
||
flash("Błędne wyrażenie cron dla backup: " + str(e), "danger")
|
||
return redirect(url_for('settings'))
|
||
|
||
user_settings.auto_deploy_enabled = bool(auto_deploy)
|
||
user_settings.auto_backup_enabled = bool(auto_backup)
|
||
user_settings.deploy_cron = deploy_cron if deploy_cron else "12 12 * * *"
|
||
user_settings.backup_cron = backup_cron if backup_cron else "12 12 * * *"
|
||
user_settings.regex_deploy_enabled = bool(enable_regex_entries)
|
||
try:
|
||
user_settings.backup_retention_days = int(retention_val)
|
||
except ValueError:
|
||
user_settings.backup_retention_days = 0
|
||
|
||
# Zapis globalnego klucza SSH i passphrase
|
||
user_settings.global_ssh_key = global_ssh_key
|
||
user_settings.global_key_passphrase = global_key_passphrase
|
||
|
||
db.session.commit()
|
||
flash('Settings updated', 'success')
|
||
return redirect(url_for('settings'))
|
||
|
||
return render_template('settings.html', settings=user_settings)
|
||
|
||
@app.route('/delete-backup/<int:backup_id>', methods=['POST'])
|
||
def delete_backup(backup_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
backup = Backup.query.get(backup_id)
|
||
if not backup or backup.user_id != session['user_id']:
|
||
flash('Backup not found or unauthorized', 'danger')
|
||
return redirect(url_for('backups'))
|
||
db.session.delete(backup)
|
||
db.session.commit()
|
||
flash('Backup deleted successfully', 'info')
|
||
return redirect(url_for('backups'))
|
||
|
||
# -------------------
|
||
# DEPLOYMENT DOMYŚLNY DLA UŻYTKOWNIKA
|
||
# -------------------
|
||
|
||
def deploy_user(user_id):
|
||
user_settings = UserSettings.query.filter_by(user_id=user_id).first()
|
||
default_file = HostFile.query.filter_by(user_id=user_id, title="Default Hosts").first()
|
||
if not default_file:
|
||
return
|
||
|
||
regex_lines = ""
|
||
if user_settings and user_settings.regex_deploy_enabled:
|
||
regex_lines = generate_regex_hosts(user_id)
|
||
|
||
hosts = Host.query.filter_by(user_id=user_id).all()
|
||
for h in hosts:
|
||
if not h.auto_deploy_enabled:
|
||
continue
|
||
|
||
# Pobranie pliku hosts wybranego dla serwera
|
||
if h.preferred_hostfile_id:
|
||
chosen_file = HostFile.query.filter_by(id=h.preferred_hostfile_id, user_id=user_id).first()
|
||
if not chosen_file:
|
||
chosen_file = default_file
|
||
else:
|
||
chosen_file = default_file
|
||
|
||
# 🛠 Jeśli disable_local_default jest włączone → wgraj tylko wybrany plik hosts
|
||
if h.disable_local_default:
|
||
final_content = chosen_file.content.strip()
|
||
else:
|
||
# W przeciwnym wypadku dodaj regex i ensure_local_defaults
|
||
final_content = chosen_file.content.strip()
|
||
|
||
if not h.disable_regex_deploy and regex_lines.strip():
|
||
final_content = regex_lines + "\n" + final_content
|
||
|
||
final_content = ensure_local_defaults(final_content, user_id)
|
||
|
||
try:
|
||
# 🖥 Wgrywanie na Mikrotik
|
||
if h.type == 'mikrotik':
|
||
wrapped_content = wrap_mikrotik_content(final_content)
|
||
deploy_mikrotik(h, wrapped_content)
|
||
log_details = f'[MIKROTIK] Updated {format_host(h)} for user {user_id}'
|
||
|
||
# 🖥 Wgrywanie na Linux Daemon
|
||
elif h.use_daemon and h.type == 'linux':
|
||
import requests
|
||
wrapped_content = wrap_content_with_comments(final_content)
|
||
url = h.daemon_url.rstrip('/') + '/hosts'
|
||
headers = {"Authorization": h.daemon_token}
|
||
resp = requests.post(url, json={"hosts": wrapped_content}, headers=headers, timeout=10, verify=False)
|
||
if resp.status_code != 200:
|
||
raise Exception(f"Daemon POST error: {resp.status_code} - {resp.text}")
|
||
log_details = f'[LINUX/DAEMON] Updated {format_host(h)} for user {user_id}'
|
||
|
||
# 🖥 Wgrywanie na standardowy Linux przez SSH
|
||
else:
|
||
ssh = open_ssh_connection(h)
|
||
wrapped_content = wrap_content_with_comments(final_content)
|
||
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf:
|
||
tmpf.write(wrapped_content)
|
||
tmp_file_path = tmpf.name
|
||
sftp = ssh.open_sftp()
|
||
sftp.put(tmp_file_path, '/etc/hosts')
|
||
sftp.close()
|
||
ssh.close()
|
||
os.remove(tmp_file_path)
|
||
log_details = f'[LINUX] Updated {format_host(h)} for user {user_id}'
|
||
|
||
# Logowanie wdrożenia
|
||
db.session.add(DeployLog(details=log_details, user_id=user_id))
|
||
db.session.commit()
|
||
|
||
except Exception as e:
|
||
error_log = f'Failed to update {format_host(h)}: {str(e)} for user {user_id}'
|
||
db.session.add(DeployLog(details=error_log, user_id=user_id))
|
||
db.session.commit()
|
||
|
||
|
||
def deploy_mikrotik(host, hosts_content):
|
||
ssh = open_ssh_connection(host)
|
||
stdin, stdout, stderr = ssh.exec_command("/ip dns static export")
|
||
exported = stdout.read().decode('utf-8').splitlines()
|
||
existing_dns = {}
|
||
|
||
for line in exported:
|
||
line = line.strip()
|
||
if not line.startswith('add '):
|
||
continue
|
||
line = line[4:].strip()
|
||
parts = line.split()
|
||
address_val = None
|
||
name_val = None
|
||
for part in parts:
|
||
if part.startswith('address='):
|
||
address_val = part.replace('address=', '')
|
||
elif part.startswith('name='):
|
||
name_val = part.replace('name=', '')
|
||
if address_val and name_val:
|
||
existing_dns[name_val] = address_val
|
||
|
||
desired_dns = {}
|
||
for line in hosts_content.splitlines():
|
||
line = line.strip()
|
||
if (not line
|
||
or line.startswith('#')
|
||
or 'Auto-hosts_upload:' in line
|
||
or 'End_of_auto-hosts_upload:' in line):
|
||
continue
|
||
|
||
parts = line.split()
|
||
if len(parts) < 2:
|
||
continue
|
||
|
||
ip_address = parts[0]
|
||
hostnames = parts[1:]
|
||
for hname in hostnames:
|
||
desired_dns[hname] = ip_address
|
||
|
||
for name_val, ip_val in desired_dns.items():
|
||
if name_val not in existing_dns:
|
||
add_cmd = f"/ip dns static add address={ip_val} name={name_val}"
|
||
ssh.exec_command(add_cmd)
|
||
else:
|
||
current_ip = existing_dns[name_val]
|
||
if current_ip != ip_val:
|
||
remove_cmd = f"/ip dns static remove [find where name={name_val}]"
|
||
ssh.exec_command(remove_cmd)
|
||
add_cmd = f"/ip dns static add address={ip_val} name={name_val}"
|
||
ssh.exec_command(add_cmd)
|
||
for existing_name, existing_ip in existing_dns.items():
|
||
if existing_name not in desired_dns:
|
||
remove_cmd = f"/ip dns static remove [find where name={existing_name}]"
|
||
ssh.exec_command(remove_cmd)
|
||
|
||
ssh.close()
|
||
|
||
@app.route('/deploy-now')
|
||
def deploy_now():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
deploy_user(session['user_id'])
|
||
flash('Deployment complete', 'success')
|
||
return redirect(url_for('dashboard'))
|
||
|
||
# -------------------
|
||
# DASHBOARD ZE STATYSTYKAMI
|
||
# -------------------
|
||
|
||
@app.route('/dashboard')
|
||
def dashboard():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
user_id = session['user_id']
|
||
user = db.session.get(User, user_id)
|
||
logs = DeployLog.query.filter_by(user_id=user_id).order_by(DeployLog.timestamp.desc()).all()
|
||
stats = get_statistics(user_id)
|
||
for log in logs:
|
||
log.details = log.details.replace(f" for user {user_id}", "")
|
||
#pass
|
||
|
||
return render_template('dashboard.html', user=user, logs=logs, stats=stats)
|
||
|
||
# -------------------
|
||
# SCHEDULER - AUTOMATYCZNE WDROŻENIA
|
||
# -------------------
|
||
|
||
@app.route('/regex-hosts')
|
||
def list_regex_hosts():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
user_id = session['user_id']
|
||
entries = RegexHostEntry.query.filter_by(user_id=user_id).all()
|
||
return render_template('list_regex_hosts.html', entries=entries)
|
||
|
||
@app.route('/regex-hosts/new', methods=['GET', 'POST'])
|
||
def new_regex_host():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
user_id = session['user_id']
|
||
|
||
if request.method == 'POST':
|
||
cidr_range = request.form.get('cidr_range', '').strip()
|
||
gateway_ip = request.form.get('gateway_ip', '').strip()
|
||
gateway_hostname = request.form.get('gateway_hostname', '').strip()
|
||
domain_suffix = request.form.get('domain_suffix', '').strip()
|
||
host_prefix = request.form.get('host_prefix', '').strip()
|
||
use_gateway_ip = bool(request.form.get('use_gateway_ip'))
|
||
comment = request.form.get('comment', '').strip()
|
||
|
||
if not cidr_range:
|
||
flash('Please provide a CIDR range (e.g. 10.87.200.0/27).', 'danger')
|
||
return redirect(url_for('new_regex_host'))
|
||
|
||
if not domain_suffix:
|
||
domain_suffix = "domain.com"
|
||
|
||
if not host_prefix:
|
||
host_prefix = "ip"
|
||
|
||
entry = RegexHostEntry(
|
||
user_id=user_id,
|
||
cidr_range=cidr_range,
|
||
gateway_ip=gateway_ip,
|
||
gateway_hostname=gateway_hostname,
|
||
domain_suffix=domain_suffix,
|
||
host_prefix=host_prefix,
|
||
use_gateway_ip=use_gateway_ip,
|
||
comment=comment
|
||
)
|
||
db.session.add(entry)
|
||
db.session.commit()
|
||
|
||
flash('New CIDR entry with optional gateway has been added.', 'success')
|
||
return redirect(url_for('list_regex_hosts'))
|
||
|
||
return render_template('new_edit_regex_host.html', entry=None)
|
||
|
||
@app.route('/regex-hosts/<int:entry_id>/edit', methods=['GET', 'POST'])
|
||
def edit_regex_host(entry_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
entry = db.session.get(RegexHostEntry, entry_id)
|
||
if not entry or entry.user_id != session['user_id']:
|
||
flash('cant find row or row is empty', 'danger')
|
||
return redirect(url_for('list_regex_hosts'))
|
||
if request.method == 'POST':
|
||
cidr_range = request.form.get('cidr_range', '').strip()
|
||
gateway_ip = request.form.get('gateway_ip', '').strip()
|
||
gateway_hostname = request.form.get('gateway_hostname', '').strip()
|
||
domain_suffix = request.form.get('domain_suffix', '').strip()
|
||
host_prefix = request.form.get('host_prefix', '').strip()
|
||
use_gateway_ip = bool(request.form.get('use_gateway_ip'))
|
||
comment = request.form.get('comment', '').strip()
|
||
|
||
if not cidr_range:
|
||
flash('CIDR is required', 'danger')
|
||
return redirect(url_for('edit_regex_host', entry_id=entry_id))
|
||
|
||
entry.cidr_range = cidr_range
|
||
entry.gateway_ip = gateway_ip
|
||
entry.gateway_hostname = gateway_hostname
|
||
entry.domain_suffix = domain_suffix or "domain.com"
|
||
entry.host_prefix = host_prefix or "ip"
|
||
entry.use_gateway_ip = use_gateway_ip
|
||
entry.comment = comment
|
||
|
||
db.session.commit()
|
||
flash('Updated', 'success')
|
||
return redirect(url_for('list_regex_hosts'))
|
||
|
||
return render_template('new_edit_regex_host.html', entry=entry)
|
||
|
||
@app.route('/regex-hosts/<int:entry_id>/delete', methods=['POST'])
|
||
def delete_regex_host(entry_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
entry = db.session.get(RegexHostEntry, entry_id)
|
||
if not entry or entry.user_id != session['user_id']:
|
||
flash('cant find row or row is empty', 'danger')
|
||
return redirect(url_for('list_regex_hosts'))
|
||
db.session.delete(entry)
|
||
db.session.commit()
|
||
flash('Row deleted', 'info')
|
||
return redirect(url_for('list_regex_hosts'))
|
||
|
||
@app.route('/delete-selected-backups', methods=['POST'])
|
||
def delete_selected_backups():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
selected_ids = request.form.getlist('selected_backups')
|
||
for backup_id in selected_ids:
|
||
backup = db.session.get(Backup, backup_id)
|
||
if backup and backup.user_id == session['user_id']:
|
||
db.session.delete(backup)
|
||
db.session.commit()
|
||
flash('Zaznaczone backupy zostały usunięte.', 'info')
|
||
return redirect(url_for('backups'))
|
||
|
||
@app.route('/update-host-automation/<int:id>', methods=['POST'])
|
||
def update_host_automation(id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
host = db.session.get(Host, id)
|
||
if not host or host.user_id != session['user_id']:
|
||
flash('Serwer nie istnieje lub nie masz uprawnień', 'danger')
|
||
return redirect(url_for('server_list'))
|
||
setting = request.form.get('setting')
|
||
enabled = request.form.get('enabled') == '1'
|
||
if setting == 'auto_deploy':
|
||
host.auto_deploy_enabled = enabled
|
||
elif setting == 'auto_backup':
|
||
host.auto_backup_enabled = enabled
|
||
elif setting == 'disable_regex':
|
||
host.disable_regex_deploy = enabled
|
||
elif setting == 'disable_local_default':
|
||
host.disable_local_default = enabled
|
||
db.session.commit()
|
||
flash('Ustawienia automatyzacji zostały zaktualizowane.', 'success')
|
||
return redirect(url_for('server_list'))
|
||
|
||
# -------------------
|
||
# EDYCJA LOKALNEGO PLIKU HOSTS
|
||
# -------------------
|
||
|
||
@app.route('/edit-local-hosts', methods=['GET', 'POST'], endpoint='edit_local_hosts')
|
||
def edit_local_hosts():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
user_id = session['user_id']
|
||
hostfile = HostFile.query.filter_by(user_id=user_id, title="Default Hosts").first()
|
||
if not hostfile:
|
||
default_content = "# This is a sample hosts file.\n127.0.0.1 localhost\n"
|
||
hostfile = HostFile(user_id=user_id, title="Default Hosts", content=default_content)
|
||
db.session.add(hostfile)
|
||
db.session.commit()
|
||
if request.method == 'POST':
|
||
new_content = request.form['hosts_content']
|
||
# Zapisz obecną wersję do historii przed zmianą
|
||
version = HostFileVersion(hostfile_id=hostfile.id, content=hostfile.content)
|
||
db.session.add(version)
|
||
# Aktualizacja treści
|
||
hostfile.content = new_content
|
||
db.session.commit()
|
||
flash('Local hosts content updated successfully and previous version saved.', 'success')
|
||
return render_template('edit_hosts.html', content=hostfile.content, hostfile=hostfile)
|
||
|
||
|
||
@app.route('/hostfile/<int:hostfile_id>/versions', methods=['GET', 'POST'])
|
||
def hostfile_versions(hostfile_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
hostfile = HostFile.query.get(hostfile_id)
|
||
if not hostfile or hostfile.user_id != session['user_id']:
|
||
flash('Hostfile not found or unauthorized', 'danger')
|
||
return redirect(url_for('dashboard'))
|
||
|
||
if request.method == 'POST':
|
||
selected_ids = request.form.getlist('selected_versions')
|
||
for version_id in selected_ids:
|
||
version = HostFileVersion.query.get(version_id)
|
||
if version and version.hostfile.user_id == session['user_id']:
|
||
db.session.delete(version)
|
||
db.session.commit()
|
||
flash('Wybrane wersje zostały usunięte.', 'info')
|
||
return redirect(url_for('hostfile_versions', hostfile_id=hostfile_id))
|
||
|
||
versions = HostFileVersion.query.filter_by(hostfile_id=hostfile.id)\
|
||
.order_by(HostFileVersion.timestamp.desc()).all()
|
||
return render_template('hostfile_versions.html', hostfile=hostfile, versions=versions)
|
||
|
||
@app.route('/hostfile/<int:hostfile_id>/versions/delete_old/<int:days>')
|
||
def delete_old_versions(hostfile_id, days):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
hostfile = HostFile.query.get(hostfile_id)
|
||
if not hostfile or hostfile.user_id != session['user_id']:
|
||
flash('Hostfile not found or unauthorized', 'danger')
|
||
return redirect(url_for('dashboard'))
|
||
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
|
||
old_versions = HostFileVersion.query.filter(HostFileVersion.hostfile_id == hostfile_id,
|
||
HostFileVersion.timestamp < cutoff).all()
|
||
for version in old_versions:
|
||
db.session.delete(version)
|
||
db.session.commit()
|
||
flash(f'Usunięto wersje starsze niż {days} dni.', 'info')
|
||
return redirect(url_for('hostfile_versions', hostfile_id=hostfile_id))
|
||
|
||
@app.route('/hostfile/diff_current/<int:hostfile_id>')
|
||
def diff_current_hostfile(hostfile_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
hostfile = HostFile.query.get(hostfile_id)
|
||
if not hostfile or hostfile.user_id != session['user_id']:
|
||
flash('Hostfile not found or unauthorized.', 'danger')
|
||
return redirect(url_for('dashboard'))
|
||
# Używamy len() zamiast |length
|
||
if not hostfile.versions or len(hostfile.versions) == 0:
|
||
flash('Brak zapisanej historii wersji do porównania.', 'warning')
|
||
return redirect(url_for('hostfile_versions', hostfile_id=hostfile_id))
|
||
latest_version = hostfile.versions[0]
|
||
differ = difflib.HtmlDiff(wrapcolumn=80)
|
||
diff_html = differ.make_table(
|
||
latest_version.content.splitlines(),
|
||
hostfile.content.splitlines(),
|
||
fromdesc=f"Najnowsza wersja historii (ID: {latest_version.id}, {latest_version.timestamp.strftime('%Y-%m-%d %H:%M:%S')})",
|
||
todesc="Aktualna zawartość",
|
||
context=True,
|
||
numlines=3
|
||
)
|
||
return render_template('diff_versions.html', diff_html=diff_html, hostfile_id=hostfile.id)
|
||
|
||
@app.route('/hostfile/diff/<int:version1_id>/<int:version2_id>')
|
||
def diff_hostfile_versions(version1_id, version2_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
version1 = HostFileVersion.query.get(version1_id)
|
||
version2 = HostFileVersion.query.get(version2_id)
|
||
if not version1 or not version2 or version1.hostfile.user_id != session['user_id'] or version2.hostfile.user_id != session['user_id']:
|
||
flash('Wersje nie znalezione lub brak uprawnień.', 'danger')
|
||
return redirect(url_for('dashboard'))
|
||
|
||
differ = difflib.HtmlDiff(wrapcolumn=80)
|
||
diff_html = differ.make_table(
|
||
version1.content.splitlines(),
|
||
version2.content.splitlines(),
|
||
fromdesc=f"Wersja {version1.id} - {version1.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
|
||
todesc=f"Wersja {version2.id} - {version2.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
|
||
context=True,
|
||
numlines=3
|
||
)
|
||
hostfile_id = version1.hostfile_id
|
||
return render_template('diff_versions.html', diff_html=diff_html, hostfile_id=hostfile_id)
|
||
|
||
@app.route('/hostfile/version/<int:version_id>')
|
||
def view_hostfile_version(version_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
version = HostFileVersion.query.get(version_id)
|
||
if not version or version.hostfile.user_id != session['user_id']:
|
||
flash('Version not found or unauthorized', 'danger')
|
||
return redirect(url_for('dashboard'))
|
||
return render_template('view_hostfile_version.html', version=version)
|
||
|
||
@app.route('/hostfile/version/<int:version_id>/restore')
|
||
def restore_hostfile_version(version_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
version = HostFileVersion.query.get(version_id)
|
||
if not version or version.hostfile.user_id != session['user_id']:
|
||
flash('Version not found or unauthorized', 'danger')
|
||
return redirect(url_for('dashboard'))
|
||
hostfile = version.hostfile
|
||
hostfile.content = version.content
|
||
db.session.commit()
|
||
flash('Version restored successfully.', 'success')
|
||
return redirect(url_for('edit_local_hosts'))
|
||
|
||
@app.route('/hostfile/versions')
|
||
def default_hostfile_versions():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
# Zakładamy, że domyślny plik hosts ma tytuł "Default Hosts"
|
||
hostfile = HostFile.query.filter_by(user_id=session['user_id'], title="Default Hosts").first()
|
||
if not hostfile:
|
||
flash("Default Hosts file not found.", "danger")
|
||
return redirect(url_for('edit_local_hosts'))
|
||
return redirect(url_for('hostfile_versions', hostfile_id=hostfile.id))
|
||
|
||
def scheduled_deployments():
|
||
with app.app_context():
|
||
now = datetime.now(timezone.utc)
|
||
settings_list = UserSettings.query.filter_by(auto_deploy_enabled=True).all()
|
||
for setting in settings_list:
|
||
if not setting.deploy_cron:
|
||
continue
|
||
if setting.last_deploy_time:
|
||
base_time = setting.last_deploy_time
|
||
if base_time.tzinfo is None:
|
||
base_time = base_time.replace(tzinfo=timezone.utc)
|
||
else:
|
||
base_time = datetime(1970,1,1, tzinfo=timezone.utc)
|
||
cron = croniter(setting.deploy_cron, base_time)
|
||
next_deploy = cron.get_next(datetime)
|
||
if now >= next_deploy:
|
||
deploy_user(setting.user_id)
|
||
setting.last_deploy_time = now
|
||
db.session.commit()
|
||
|
||
@app.route('/server-info/<int:id>', methods=['GET'])
|
||
def server_info(id):
|
||
if 'user_id' not in session:
|
||
return {"error": "Unauthorized"}, 401
|
||
host = db.session.get(Host, id)
|
||
if not host or host.user_id != session['user_id']:
|
||
return {"error": "Host not found or unauthorized"}, 404
|
||
|
||
if host.use_daemon and host.type == 'linux':
|
||
import requests
|
||
headers = {"Authorization": host.daemon_token}
|
||
sysinfo_url = host.daemon_url.rstrip('/') + '/system-info'
|
||
try:
|
||
resp = requests.get(sysinfo_url, headers=headers, verify=False, timeout=5)
|
||
if resp.status_code == 200:
|
||
data = resp.json()
|
||
return {
|
||
"hostname": host.resolved_daemon if host.daemon_url else host.resolved_hostname,
|
||
"ip": host.raw_ip,
|
||
"cpu": data.get('cpu_percent'),
|
||
"mem": data.get('memory_percent'),
|
||
"disk": data.get('disk_percent'),
|
||
"uptime_seconds": data.get('uptime_seconds')
|
||
}
|
||
else:
|
||
return {"error": f"Błąd demona: {resp.status_code}"}, 500
|
||
except Exception as e:
|
||
return {"error": str(e)}, 500
|
||
else:
|
||
return {"error": "This server do not use daemon."}, 400
|
||
|
||
@app.errorhandler(404)
|
||
def page_not_found(error):
|
||
return render_template("404.html", error=error), 404
|
||
|
||
@app.errorhandler(500)
|
||
def internal_server_error(error):
|
||
if app.debug:
|
||
return render_template("500.html", error=error), 500
|
||
|
||
@app.route('/local-defaults', methods=['GET', 'POST'])
|
||
def local_defaults():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
|
||
user_id = session['user_id']
|
||
|
||
if request.method == 'POST':
|
||
hostname = request.form.get('hostname', '').strip()
|
||
ip_address = request.form.get('ip_address', '').strip()
|
||
dynamic_variable = request.form.get('dynamic_variable', '').strip()
|
||
|
||
if hostname and ip_address:
|
||
entry_content = f"{ip_address} {hostname} {dynamic_variable}".strip()
|
||
new_entry = LocalDefaultEntry(user_id=user_id, hostname=hostname, ip_address=ip_address, dynamic_variable=dynamic_variable or None, entry=entry_content)
|
||
db.session.add(new_entry)
|
||
db.session.commit()
|
||
flash('Dodano nowy wpis.', 'success')
|
||
else:
|
||
flash('Hostname i adres IP są wymagane.', 'danger')
|
||
|
||
return redirect(url_for('local_defaults'))
|
||
|
||
entries = LocalDefaultEntry.query.filter_by(user_id=user_id).all()
|
||
return render_template('local_defaults.html', entries=entries)
|
||
|
||
@app.route('/local-defaults/delete/<int:entry_id>', methods=['POST'])
|
||
def delete_local_default(entry_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
|
||
entry = LocalDefaultEntry.query.get(entry_id)
|
||
if not entry or entry.user_id != session['user_id']:
|
||
flash('Wpis nie istnieje lub brak uprawnień.', 'danger')
|
||
return redirect(url_for('local_defaults'))
|
||
|
||
db.session.delete(entry)
|
||
db.session.commit()
|
||
flash('Wpis został usunięty.', 'info')
|
||
return redirect(url_for('local_defaults'))
|
||
|
||
@app.route('/dynamic-variables', methods=['GET', 'POST'])
|
||
def dynamic_variables():
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
|
||
user_id = session['user_id']
|
||
|
||
if request.method == 'POST':
|
||
variable_name = request.form.get('variable_name', '').strip()
|
||
variable_value = request.form.get('variable_value', '').strip()
|
||
if variable_name and variable_value:
|
||
new_variable = UserDynamicVariables(user_id=user_id, variable_name=variable_name, variable_value=variable_value)
|
||
db.session.add(new_variable)
|
||
db.session.commit()
|
||
flash('Dodano nową zmienną dynamiczną.', 'success')
|
||
else:
|
||
flash('Nazwa i wartość zmiennej są wymagane.', 'danger')
|
||
return redirect(url_for('dynamic_variables'))
|
||
|
||
variables = UserDynamicVariables.query.filter_by(user_id=user_id).all()
|
||
return render_template('dynamic_variables.html', variables=variables)
|
||
|
||
@app.route('/dynamic-variables/delete/<int:variable_id>', methods=['POST'])
|
||
def delete_dynamic_variable(variable_id):
|
||
if 'user_id' not in session:
|
||
return redirect(url_for('login'))
|
||
|
||
variable = UserDynamicVariables.query.get(variable_id)
|
||
if not variable or variable.user_id != session['user_id']:
|
||
flash('Nie znaleziono zmiennej lub brak uprawnień.', 'danger')
|
||
return redirect(url_for('dynamic_variables'))
|
||
|
||
db.session.delete(variable)
|
||
db.session.commit()
|
||
flash('Zmienna została usunięta.', 'info')
|
||
return redirect(url_for('dynamic_variables'))
|
||
|
||
scheduler = BackgroundScheduler(timezone=get_localzone())
|
||
scheduler.add_job(func=scheduled_deployments, trigger="interval", minutes=1, next_run_time=datetime.now())
|
||
scheduler.add_job(func=automated_backups, trigger="interval", minutes=1, next_run_time=datetime.now())
|
||
scheduler.add_job(func=cleanup_old_backups, trigger="interval", hours=24, next_run_time=datetime.now())
|
||
|
||
if __name__ == '__main__':
|
||
with app.app_context():
|
||
db.create_all()
|
||
app.run(
|
||
host='0.0.0.0',
|
||
port=5580,
|
||
use_reloader=False,
|
||
)
|