hosts_app/app.py

1617 lines
67 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, request, redirect, url_for, flash, session, Response
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
import os, paramiko, threading, time, io, tempfile, csv
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime, timezone, timedelta
from io import StringIO
import socket
import ipaddress
import difflib
from croniter import croniter
from tzlocal import get_localzone
from werkzeug.serving import WSGIRequestHandler
# Blank the version strings Werkzeug's request handler would otherwise use
# when building its "Server" response header.
WSGIRequestHandler.server_version = ""
WSGIRequestHandler.sys_version = ""

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
# Prefer a secret supplied through the environment. The historical hard-coded
# value is kept only as a fallback so existing deployments (and their session
# cookies) keep working when SECRET_KEY is not set.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'supersecretkey')
db = SQLAlchemy(app)
# MODELE
class User(db.Model):
    """Application account; owns hosts, hosts files and one settings row."""
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    # Holds the value produced by generate_password_hash (set by the
    # register and change-password views), not the plain password.
    password = db.Column(db.String(200), nullable=False)
    # Each relationship below adds a `user` backref on the related model.
    hosts = db.relationship('Host', backref='user', lazy=True)
    hostfiles = db.relationship('HostFile', backref='user', lazy=True)
    # uselist=False: a single UserSettings object rather than a list.
    settings = db.relationship('UserSettings', backref='user', uselist=False)
class Host(db.Model):
    """A remote machine (Linux or Mikrotik) managed on behalf of one user."""
    id = db.Column(db.Integer, primary_key=True)
    hostname = db.Column(db.String(255), nullable=False)
    username = db.Column(db.String(100), nullable=False)
    # Stored exactly as submitted by the add/edit server forms; it is passed
    # to paramiko as the SSH password, so it is not hashed.
    password = db.Column(db.String(200), nullable=False)
    type = db.Column(db.String(50), default='linux') # 'linux' or 'mikrotik'
    # open_ssh_connection uses key authentication only when this is 'ssh_key'
    # and private_key is set; otherwise it falls back to the password.
    auth_method = db.Column(db.String(20), default='password')
    private_key = db.Column(db.Text, nullable=True)
    key_passphrase = db.Column(db.String(200), nullable=True)
    port = db.Column(db.Integer, default=22)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    auto_deploy_enabled = db.Column(db.Boolean, default=True)
    # Checked per host by automated_backups before any backup is taken.
    auto_backup_enabled = db.Column(db.Boolean, default=True)
    preferred_hostfile_id = db.Column(db.Integer, db.ForeignKey('host_file.id'), nullable=True)
    preferred_hostfile = db.relationship('HostFile', foreign_keys=[preferred_hostfile_id])
    # Integration with hosts_daemon (HTTP API used instead of SSH for Linux hosts)
    use_daemon = db.Column(db.Boolean, default=False) # <--- NEW
    daemon_url = db.Column(db.String(255), nullable=True) # <--- NEW
    daemon_token = db.Column(db.String(255), nullable=True) # <--- NEW

    @property
    def resolved_hostname(self):
        """Return the name from a reverse lookup of ``hostname``.

        Falls back to the stored ``hostname`` unchanged when the lookup
        raises any exception.
        """
        try:
            return socket.gethostbyaddr(self.hostname)[0]
        except Exception:
            return self.hostname
class DeployLog(db.Model):
    """Free-text log entry recorded for deployments and automated backups."""
    id = db.Column(db.Integer, primary_key=True)
    # Filled in by the database (CURRENT_TIMESTAMP) when not given explicitly.
    timestamp = db.Column(db.DateTime, default=db.func.current_timestamp())
    # Human-readable message; get_statistics classifies entries by searching
    # this text for markers such as "Deployed", "Updated" and "Failed".
    details = db.Column(db.Text, nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    user = db.relationship('User', backref='deploy_logs', lazy=True)
class HostFile(db.Model):
    """A named hosts-file body owned by a user and deployable to hosts."""
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    # restore_backup looks the file up by the title "Default Hosts" when a
    # backup is not tied to a host.
    title = db.Column(db.String(100), nullable=False, default='Default Hosts')
    content = db.Column(db.Text, nullable=False)
class UserSettings(db.Model):
    """Per-user automation settings (one row per user; user_id is unique)."""
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
    auto_deploy_enabled = db.Column(db.Boolean, default=False)
    # Former minute-based intervals, superseded by the cron columns below:
    #deploy_interval = db.Column(db.Integer, default=60) # deployment interval (minutes)
    #backup_interval = db.Column(db.Integer, default=60) # backup interval (minutes)
    deploy_cron = db.Column(db.String(100), default="12 12 * * *")
    # Cron expression handed to croniter by automated_backups.
    backup_cron = db.Column(db.String(100), default="12 12 * * *")
    auto_backup_enabled = db.Column(db.Boolean, default=False)
    last_deploy_time = db.Column(db.DateTime, nullable=True)
    # NOTE(review): not read anywhere in the visible part of this module;
    # presumably toggles inclusion of generate_regex_hosts output — confirm.
    regex_deploy_enabled = db.Column(db.Boolean, default=True)
    # cleanup_old_backups deletes nothing when this is 0 or NULL.
    backup_retention_days = db.Column(db.Integer, default=0)
class Backup(db.Model):
    """Snapshot of a host's hosts/DNS configuration stored as text."""
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    # Nullable: restore_backup treats a backup without a host as content for
    # the user's "Default Hosts" file instead of pushing it to one machine.
    host_id = db.Column(db.Integer, db.ForeignKey('host.id'), nullable=True)
    # Filled in by the database (CURRENT_TIMESTAMP) when not given explicitly.
    created_at = db.Column(db.DateTime, default=db.func.current_timestamp())
    content = db.Column(db.Text, nullable=False)
    description = db.Column(db.String(255), nullable=True)
    host = db.relationship('Host', backref='backups', lazy=True)
class RegexHostEntry(db.Model):
    """Rule that generate_regex_hosts expands into one hosts line per IP."""
    __tablename__ = 'regex_host_entry'
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    cidr_range = db.Column(db.String(50), nullable=False) # e.g. '10.87.200.0/27'
    gateway_ip = db.Column(db.String(50), nullable=True) # e.g. '10.87.200.30'
    gateway_hostname = db.Column(db.String(255), nullable=True) # e.g. 'gw'
    # Generated names have the form "<host_prefix><n>.<domain_suffix>".
    domain_suffix = db.Column(db.String(255), nullable=False, default="guest.domain.com")
    host_prefix = db.Column(db.String(255), nullable=False, default="user")
    # When true (and gateway_ip is set) the gateway address gets the gateway
    # hostname instead of a numbered one.
    use_gateway_ip = db.Column(db.Boolean, default=False)
    comment = db.Column(db.String(255), nullable=True)
    user = db.relationship('User', backref='regex_entries')
class HostFileVersion(db.Model):
    """Previous content of a HostFile, saved by edit_hosts_file on change."""
    id = db.Column(db.Integer, primary_key=True)
    hostfile_id = db.Column(db.Integer, db.ForeignKey('host_file.id'), nullable=False)
    content = db.Column(db.Text, nullable=False)
    # Python-side default: timezone-aware "now" in UTC at insert time.
    timestamp = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
    # Adds a `versions` list on HostFile.
    hostfile = db.relationship('HostFile', backref=db.backref('versions', lazy=True))
# Funkcje pomocnicze
def ensure_local_defaults(content):
    """Return ``content`` with the standard local entries moved to the top.

    Every line is right-stripped; lines equal to one of the mandatory
    entries are dropped and the mandatory entries are emitted first, once
    each. The result always ends with a single trailing newline.
    """
    mandatory = [
        "127.0.0.1 localhost",
        "::1 localhost ip6-localhost ip6-loopback",
        "255.255.255.255 broadcasthost"
    ]
    remaining = []
    for raw_line in content.splitlines():
        trimmed = raw_line.rstrip()
        if trimmed not in mandatory:
            remaining.append(trimmed)
    return "\n".join(mandatory + remaining) + "\n"
def wrap_content_with_comments(content):
    """Surround ``content`` with timestamped "auto-hosts upload" comment lines.

    The same UTC timestamp is used in the header and the footer.
    NOTE: an identical function of the same name is defined again further
    down in this module; that later definition is the one bound at import.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    opening = f"# Auto-hosts upload: {stamp}\n"
    closing = f"\n# End of auto-hosts upload: {stamp}\n"
    wrapped = opening + content
    wrapped = wrapped + closing
    return wrapped
def _load_private_key(key_text, passphrase):
    """Parse ``key_text`` as an RSA, ECDSA or Ed25519 private key.

    RSA is tried first, so RSA keys behave exactly as before. If no key class
    accepts the text, the error reported is the one from the RSA attempt.

    Raises:
        Exception: "Error reading private key: ..." when the key cannot be
            loaded, chained to the underlying paramiko.SSHException.
    """
    first_error = None
    for key_class in (paramiko.RSAKey, paramiko.ECDSAKey, paramiko.Ed25519Key):
        try:
            # A fresh StringIO per attempt: a failed parse consumes the stream.
            return key_class.from_private_key(io.StringIO(key_text), password=passphrase)
        except paramiko.SSHException as e:
            if first_error is None:
                first_error = e
    raise Exception(f"Error reading private key: {str(first_error)}") from first_error


def open_ssh_connection(host_obj):
    """Open and return a connected ``paramiko.SSHClient`` for ``host_obj``.

    Uses key authentication when ``auth_method`` is 'ssh_key' and a private
    key is stored, otherwise password authentication. The caller owns the
    returned client and must close it.

    Raises:
        Exception: when the stored private key cannot be parsed, or whatever
            ``SSHClient.connect`` raises. The client is closed before the
            error propagates, so a failed attempt does not leak a socket.
    """
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification; kept as-is because existing hosts rely on it.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    connect_kwargs = {
        "hostname": host_obj.hostname,
        "port": host_obj.port,
        "username": host_obj.username,
        "timeout": 10,         # TCP connection timeout
        "banner_timeout": 30,  # wait longer for the SSH banner
    }
    if host_obj.auth_method == 'ssh_key' and host_obj.private_key:
        passphrase = host_obj.key_passphrase if host_obj.key_passphrase else None
        connect_kwargs["pkey"] = _load_private_key(host_obj.private_key, passphrase)
    else:
        connect_kwargs["password"] = host_obj.password

    try:
        ssh.connect(**connect_kwargs)
    except Exception:
        ssh.close()
        raise
    return ssh
def get_statistics(user_id):
    """Return host and deployment counters for the given user.

    Deployment counters are derived from the text of DeployLog entries: an
    entry belongs to the user when it mentions "for user <id>" or
    "for <username>". The id/username must not be followed by further
    digits/word characters, so user 1 no longer picks up entries written for
    user 12. A missing user row no longer raises; only the id marker is
    matched in that case.

    Returns:
        dict with keys "host_count", "total_deployments",
        "successful_deployments" and "failed_deployments".
    """
    import re

    user = db.session.get(User, user_id)
    host_count = Host.query.filter_by(user_id=user_id).count()

    markers = [re.compile(rf"for user {re.escape(str(user_id))}(?!\d)")]
    if user is not None:
        markers.append(re.compile(rf"for {re.escape(user.username)}(?!\w)"))

    matching_details = [
        log.details
        for log in DeployLog.query.all()
        if any(marker.search(log.details) for marker in markers)
    ]
    successful = sum(
        1 for details in matching_details
        if "Updated" in details or "Deployed" in details
    )
    failed = sum(1 for details in matching_details if "Failed" in details)
    return {
        "host_count": host_count,
        "total_deployments": len(matching_details),
        "successful_deployments": successful,
        "failed_deployments": failed
    }
def automated_backup_for_host(host):
    """Fetch the current hosts/DNS configuration of ``host`` and store it.

    The content comes from the daemon's ``/hosts`` endpoint (Linux hosts with
    ``use_daemon``), from ``/ip dns static export`` (Mikrotik) or from
    ``/etc/hosts`` over SFTP (plain Linux). A Backup row and a "[BACKUP]"
    DeployLog row are committed on success.

    Best-effort: any exception is printed, not raised. SSH/SFTP handles are
    closed on error too, and the session is rolled back so a failed commit
    does not break later database use by the caller.
    """
    try:
        if host.use_daemon and host.type == 'linux':
            import requests
            # Fetch /etc/hosts through the daemon.
            url = host.daemon_url.rstrip('/') + '/hosts'
            # The token is sent verbatim as the Authorization header value.
            headers = {"Authorization": host.daemon_token}
            resp = requests.get(url, headers=headers, timeout=10, verify=False)
            if resp.status_code != 200:
                raise Exception(f"Daemon GET error: {resp.status_code} - {resp.text}")
            data = resp.json()
            content = data.get("hosts", "")
        else:
            # Standard path: SSH.
            ssh = open_ssh_connection(host)
            try:
                if host.type == 'mikrotik':
                    _stdin, stdout, _stderr = ssh.exec_command("/ip dns static export")
                    content = stdout.read().decode('utf-8')
                else:
                    sftp = ssh.open_sftp()
                    try:
                        with sftp.open('/etc/hosts', 'r') as remote_file:
                            content = remote_file.read().decode('utf-8')
                    finally:
                        sftp.close()
            finally:
                ssh.close()
        backup = Backup(
            user_id=host.user_id,
            host_id=host.id,
            content=content,
            description=f'Automated backup from {host.hostname} at {datetime.now(timezone.utc).isoformat()}'
        )
        db.session.add(backup)
        db.session.commit()
        log_entry = DeployLog(details=f'[BACKUP] Automatic backup created for host {host.hostname}',
                              user_id=host.user_id)
        db.session.add(log_entry)
        db.session.commit()
        print(f'Automated backup for host {host.hostname} created successfully.')
    except Exception as e:
        print(f'Error creating automated backup for host {host.hostname}: {str(e)}')
        # Discard any half-finished transaction so the session stays usable.
        db.session.rollback()
def automated_backups():
    """Run the automated backup for every host whose cron schedule is due.

    A host is considered only when backups are enabled both on the host and
    in its owner's settings and a backup cron expression is set. The next
    run time is computed with croniter from the host's latest backup (naive
    timestamps are taken as UTC) or, without one, from one minute ago.
    """
    with app.app_context():
        now = datetime.now(timezone.utc)
        for host in Host.query.all():
            # Per-host switch comes first.
            if not host.auto_backup_enabled:
                continue

            settings = UserSettings.query.filter_by(user_id=host.user_id).first()
            if not settings:
                continue
            if not settings.auto_backup_enabled or not settings.backup_cron:
                continue

            # Most recent backup of this host, if any.
            latest = (
                Backup.query
                .filter_by(user_id=host.user_id, host_id=host.id)
                .order_by(Backup.created_at.desc())
                .first()
            )
            if not latest:
                reference_time = datetime.now(timezone.utc) - timedelta(minutes=1)
            else:
                reference_time = latest.created_at
                if reference_time.tzinfo is None:
                    reference_time = reference_time.replace(tzinfo=timezone.utc)

            schedule = croniter(settings.backup_cron, reference_time)
            due_at = schedule.get_next(datetime)
            if now >= due_at:
                automated_backup_for_host(host)
        db.session.commit()
def wrap_content_with_comments(content):
    """Return ``content`` framed by timestamped upload marker comments.

    Header and footer carry the same UTC timestamp. This repeats, with the
    same body, a function of the same name defined earlier in this module.
    """
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
    pieces = (
        f"# Auto-hosts upload: {timestamp}\n",
        content,
        f"\n# End of auto-hosts upload: {timestamp}\n",
    )
    return pieces[0] + pieces[1] + pieces[2]
def wrap_mikrotik_content(content):
    """Frame ``content`` with Mikrotik-style marker lines.

    The markers are hosts-style lines on 127.0.0.99 / 127.0.0.199 whose
    comment part carries the same whitespace-free UTC timestamp.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H:%M:%S_UTC")
    first_line = f"127.0.0.99 #Auto-hosts_upload:{stamp}"
    last_line = f"127.0.0.199 #End_of_auto-hosts_upload:{stamp}"
    body = first_line + "\n" + content
    return body + "\n" + last_line
def clear_linux(host, content):
    """Replace /etc/hosts on a Linux host with ``content``.

    The content is written to a local temporary file and uploaded over SFTP.
    The temporary file, the SFTP session and the SSH client are released
    even when connecting or uploading fails; the error itself propagates.
    """
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf:
        tmpf.write(content)
        tmp_file_path = tmpf.name
    try:
        ssh = open_ssh_connection(host)
        try:
            sftp = ssh.open_sftp()
            try:
                sftp.put(tmp_file_path, '/etc/hosts')
            finally:
                sftp.close()
        finally:
            ssh.close()
    finally:
        os.remove(tmp_file_path)
def clear_mikrotik(host):
    """Remove all static DNS entries from a Mikrotik host over SSH.

    ``exec_command`` returns once the request is sent, so stdout is read to
    EOF before closing; otherwise the connection could be torn down while
    the command is still running. The client is closed on error as well.
    """
    ssh = open_ssh_connection(host)
    try:
        _stdin, stdout, _stderr = ssh.exec_command("/ip dns static remove [find]")
        stdout.read()  # blocks until the remote command has finished
    finally:
        ssh.close()
def generate_regex_hosts(user_id):
    """Expand the user's RegexHostEntry rules into hosts-file lines.

    For every usable address of each entry's CIDR range one line
    "<ip> <name>" is produced. The name is "<host_prefix><n>.<domain_suffix>"
    with ``n`` counting from 1 per entry; when ``use_gateway_ip`` is set and
    the address equals ``gateway_ip``, the name is
    "<gateway_hostname or 'gw'>.<domain_suffix>" and ``n`` is not advanced.

    Entries whose ``cidr_range`` is not a valid network are skipped.

    Returns:
        The lines joined by newlines with a trailing newline, or "" when
        nothing was generated.
    """
    lines = []
    for entry in RegexHostEntry.query.filter_by(user_id=user_id).all():
        try:
            network = ipaddress.ip_network(entry.cidr_range, strict=False)
        except ValueError:
            # Deliberate best-effort: one malformed range must not prevent
            # the remaining entries from being generated.
            continue

        gateway_ip = entry.gateway_ip if (entry.use_gateway_ip and entry.gateway_ip) else None
        host_index = 1
        # Iterate lazily: a large range would otherwise be materialised as a
        # list of every address in the pool.
        for ip_addr in network.hosts():
            if gateway_ip is not None and str(ip_addr) == gateway_ip:
                gateway_name = entry.gateway_hostname or "gw"
                hostname = f"{gateway_name}.{entry.domain_suffix}"
            else:
                hostname = f"{entry.host_prefix}{host_index}.{entry.domain_suffix}"
                host_index += 1
            lines.append(f"{ip_addr} {hostname}")
    return "\n".join(lines) + "\n" if lines else ""
def cleanup_old_backups():
    """Delete backups older than each user's retention period.

    A retention of 0 (or NULL) days disables cleanup for that user. Runs
    inside its own application context and commits the deletions.
    """
    with app.app_context():
        for setting in UserSettings.query.all():
            retention_days = setting.backup_retention_days or 0
            if retention_days <= 0:
                continue
            cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)
            expired = Backup.query.filter(
                Backup.user_id == setting.user_id,
                Backup.created_at < cutoff_date
            ).all()
            for stale_backup in expired:
                db.session.delete(stale_backup)
        db.session.commit()
# -------------------
# LOGOWANIE, REJESTRACJA, ZMIANA HASŁA
# -------------------
@app.route('/')
def index():
    """Send logged-in users to the dashboard and everyone else to login."""
    endpoint = 'dashboard' if 'user_id' in session else 'login'
    return redirect(url_for(endpoint))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate the credentials (POST).

    On success the user id is stored in the session and the browser is
    redirected to the dashboard; on failure the form is shown again with an
    "Invalid credentials" message.
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']
    account = User.query.filter_by(username=username).first()
    if not (account and check_password_hash(account.password, password)):
        flash('Invalid credentials', 'danger')
        return render_template('login.html')

    session['user_id'] = account.id
    flash('Login successful', 'success')
    return redirect(url_for('dashboard'))
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a new account (POST).

    The password is stored as a hash. A username that already exists is
    rejected with a flash message and the form is shown again; previously
    the insert hit the unique constraint on ``User.username`` and failed the
    request with a server error.
    """
    if request.method == 'POST':
        username = request.form['username']
        if User.query.filter_by(username=username).first() is not None:
            flash('Username already taken', 'danger')
            return render_template('register.html')
        password_hash = generate_password_hash(request.form['password'])
        user = User(username=username, password=password_hash)
        db.session.add(user)
        db.session.commit()
        flash('Registration successful. Please log in.', 'success')
        return redirect(url_for('login'))
    return render_template('register.html')
@app.route('/logout')
def logout():
    """Drop the user id from the session and return to the login page."""
    session.pop('user_id', None)  # no error when nobody is logged in
    flash('Logged out', 'info')
    login_page = url_for('login')
    return redirect(login_page)
@app.route('/change-password', methods=['GET', 'POST'])
def change_password():
    """Show the change-password form (GET) or store a new password (POST).

    Requires a logged-in session. The submitted password is hashed before it
    is saved on the current user.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    if request.method != 'POST':
        return render_template('change_password.html')

    account = db.session.get(User, session['user_id'])
    hashed = generate_password_hash(request.form['password'])
    account.password = hashed
    db.session.commit()
    flash('Password changed successfully', 'success')
    return redirect(url_for('change_password'))
# -------------------
# ZARZĄDZANIE HOSTAMI
# -------------------
@app.route('/add_server', methods=['GET', 'POST'])
def add_server():
    """Show the add-server form (GET) or create a Host for the user (POST).

    Form handling:
      * ``port`` falls back to 22 when it is not an integer in 1..65535.
      * Private key and passphrase are stored only for ``ssh_key`` auth.
      * Daemon settings are stored only for Linux hosts with the checkbox set.
      * ``preferred_hostfile_id`` is accepted only when it names one of the
        current user's own hosts files; anything else is stored as None, so
        a crafted form cannot attach another user's file to the host.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    # All hosts files of this user, e.g. for the <select> in the form.
    user_hostfiles = HostFile.query.filter_by(user_id=session['user_id']).all()
    if request.method == 'POST':
        hostname = request.form['hostname']
        username = request.form['username']
        password_val = request.form['password']
        host_type = request.form.get('host_type', 'linux')
        auth_method = request.form.get('auth_method', 'password')
        private_key = request.form.get('private_key', '').strip()
        key_passphrase = request.form.get('key_passphrase', '').strip()
        port_str = request.form.get('port', '22')
        try:
            port = int(port_str)
        except ValueError:
            port = 22
        if not 1 <= port <= 65535:
            port = 22
        # Should the daemon be used?
        use_daemon = bool(request.form.get('use_daemon'))
        daemon_url = request.form.get('daemon_url', '').strip()
        daemon_token = request.form.get('daemon_token', '').strip()
        # Key material is kept only when auth_method == 'ssh_key'.
        stored_key = private_key if (auth_method == 'ssh_key' and private_key) else None
        stored_passphrase = key_passphrase if (auth_method == 'ssh_key' and key_passphrase) else None
        # Preferred hosts file: empty, non-numeric or foreign ids become None.
        preferred_file_id_str = request.form.get('preferred_hostfile_id', '').strip()
        try:
            chosen_file_id = int(preferred_file_id_str)
        except ValueError:
            chosen_file_id = None
        if chosen_file_id not in {hostfile.id for hostfile in user_hostfiles}:
            chosen_file_id = None
        daemon_active = use_daemon and host_type == 'linux'
        host = Host(
            hostname=hostname,
            username=username,
            password=password_val,
            type=host_type,
            auth_method=auth_method,
            private_key=stored_key,
            key_passphrase=stored_passphrase,
            port=port,
            user_id=session['user_id'],
            # Daemon support only for host_type == 'linux' with the box ticked.
            use_daemon=use_daemon if host_type == 'linux' else False,
            daemon_url=daemon_url if daemon_active else None,
            daemon_token=daemon_token if daemon_active else None,
            preferred_hostfile_id=chosen_file_id
        )
        db.session.add(host)
        db.session.commit()
        flash('Host added successfully', 'success')
        return redirect(url_for('server_list'))
    # GET -> render the form with the list of the user's hosts files.
    return render_template('add_server.html', user_hostfiles=user_hostfiles)
@app.route('/delete-server/<int:id>')
def delete_server(id):
    """Delete the host with the given id if it belongs to the current user."""
    if 'user_id' not in session:
        return redirect(url_for('login'))

    target = db.session.get(Host, id)
    owned = bool(target) and target.user_id == session['user_id']
    if not owned:
        flash('Host not found or unauthorized', 'danger')
    else:
        db.session.delete(target)
        db.session.commit()
        flash('Host deleted', 'info')
    return redirect(url_for('server_list'))
@app.route('/server-list', methods=['GET'])
def server_list():
    """Render the list of hosts owned by the logged-in user."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    owner_id = session['user_id']
    own_hosts = Host.query.filter_by(user_id=owner_id).all()
    return render_template('server_list.html', hosts=own_hosts)
@app.route('/edit-server/<int:id>', methods=['GET', 'POST'])
def edit_server(id):
    """Show the edit form for a host (GET) or apply the submitted changes (POST).

    Only the owner may edit a host. Password, private key and passphrase are
    replaced only when a non-empty value is submitted. ``port`` falls back to
    22 when it is not an integer in 1..65535. Daemon settings are kept only
    for Linux hosts with the checkbox set. ``preferred_hostfile_id`` is
    accepted only when it names one of the user's own hosts files; anything
    else clears the preference, so a crafted form cannot attach another
    user's file to the host.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    host = db.session.get(Host, id)
    if not host or host.user_id != session['user_id']:
        flash('Server not found or unauthorized', 'danger')
        return redirect(url_for('server_list'))
    # The user's hosts files for the <select>.
    user_hostfiles = HostFile.query.filter_by(user_id=session['user_id']).all()
    if request.method == 'POST':
        host.hostname = request.form['hostname']
        host.username = request.form['username']
        new_password = request.form['password'] or ''
        if new_password:
            host.password = new_password
        port_str = request.form.get('port', '22')
        try:
            new_port = int(port_str)
        except ValueError:
            new_port = 22
        if not 1 <= new_port <= 65535:
            new_port = 22
        host.port = new_port
        host.type = request.form.get('host_type', 'linux')
        host.auth_method = request.form.get('auth_method', 'password')
        new_private_key = request.form.get('private_key', '').strip()
        new_passphrase = request.form.get('key_passphrase', '').strip()
        if host.auth_method == 'ssh_key' and new_private_key:
            host.private_key = new_private_key
        if host.auth_method == 'ssh_key' and new_passphrase:
            host.key_passphrase = new_passphrase
        # Daemon:
        use_daemon = bool(request.form.get('use_daemon'))
        daemon_url = request.form.get('daemon_url', '').strip()
        daemon_token = request.form.get('daemon_token', '').strip()
        if host.type == 'linux' and use_daemon:
            host.use_daemon = True
            host.daemon_url = daemon_url
            host.daemon_token = daemon_token
        else:
            host.use_daemon = False
            host.daemon_url = None
            host.daemon_token = None
        # Preferred hosts file: empty, non-numeric or foreign ids become None.
        preferred_file_id_str = request.form.get('preferred_hostfile_id', '').strip()
        try:
            chosen_file_id = int(preferred_file_id_str)
        except ValueError:
            chosen_file_id = None
        if chosen_file_id not in {hostfile.id for hostfile in user_hostfiles}:
            chosen_file_id = None
        host.preferred_hostfile_id = chosen_file_id
        db.session.commit()
        flash('Server updated successfully', 'success')
        return redirect(url_for('server_list'))
    # GET -> render with user_hostfiles.
    return render_template('edit_server.html', host=host, user_hostfiles=user_hostfiles)
# -------------------
# TESTOWANIE POŁĄCZENIA SSH DLA HOSTA
# -------------------
@app.route('/test-server-connection/<int:id>', methods=['GET'])
def test_server_connection(id):
    """Check that a host is reachable and report the outcome via flash.

    Linux hosts configured for the daemon are probed over HTTP (``/health``
    and then ``/system-info``, certificate verification disabled); all other
    hosts get a plain SSH connect/close. Any failure is flashed as
    "Connection failed ...". Always redirects back to the server list.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    host = db.session.get(Host, id)
    if not host or host.user_id != session['user_id']:
        flash('Host not found or unauthorized', 'danger')
        return redirect(url_for('server_list'))

    try:
        if not (host.use_daemon and host.type == 'linux'):
            # Standard check over SSH.
            connection = open_ssh_connection(host)
            connection.close()
            flash(f'SSH connection to {host.hostname} successful.', 'success')
        else:
            # Daemon path (self-signed certificates, hence verify=False).
            import requests
            headers = {"Authorization": host.daemon_token}

            # Step 1: /health
            health_url = host.daemon_url.rstrip('/') + '/health'
            resp = requests.get(health_url, headers=headers, verify=False, timeout=5)
            if resp.status_code != 200:
                raise Exception(f"Demon health check returned {resp.status_code}")
            flash(f'Demon connection successful (health OK) for {host.hostname}', 'success')

            # Step 2: /system-info
            sysinfo_url = host.daemon_url.rstrip('/') + '/system-info'
            sysinfo_resp = requests.get(sysinfo_url, headers=headers, verify=False, timeout=5)
            if sysinfo_resp.status_code != 200:
                raise Exception(f"Demon system-info returned {sysinfo_resp.status_code}")
            info = sysinfo_resp.json()
            # Show a few sample figures in the flash message.
            msg = (f"System-info for {host.hostname}: "
                   f"CPU={info.get('cpu_percent')}%, "
                   f"MEM={info.get('memory_percent')}%, "
                   f"DISK={info.get('disk_percent')}%, "
                   f"UPTIME={info.get('uptime_seconds')}s")
            flash(msg, 'info')
    except Exception as e:
        flash(f'Connection failed for {host.hostname}: {str(e)}', 'danger')
    return redirect(url_for('server_list'))
# -------------------
# ROUTE: CZYSZCZENIE HOSTS - CAŁA GRUPA
# -------------------
@app.route('/clear-server', methods=['GET', 'POST'])
def clear_server():
    """Render the clear-servers page listing the current user's hosts."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    owner_id = session['user_id']
    own_hosts = Host.query.filter_by(user_id=owner_id).all()
    return render_template('clear_servers.html', hosts=own_hosts)
@app.route('/clear-single-server/<int:host_id>', methods=['POST'])
def clear_single_server(host_id):
    """Reset a single host to the default local entries.

    Linux hosts are reset to the output of ``ensure_local_defaults("")``
    (through the daemon when configured, otherwise over SFTP); Mikrotik
    hosts get all static DNS entries removed. The outcome is flashed and the
    browser is sent back to the clear-servers page.

    The "not found/unauthorized" branch used to redirect to the endpoint
    name 'clear_servers', which no view in the visible part of this module
    defines; it now uses 'clear_all_server' like the normal exit path.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    host = db.session.get(Host, host_id)
    if not host or host.user_id != session['user_id']:
        flash('Host not found or unauthorized', 'danger')
        return redirect(url_for('clear_all_server'))
    default_content = ensure_local_defaults("")
    try:
        if host.use_daemon and host.type == 'linux':
            import requests
            url = host.daemon_url.rstrip('/') + '/hosts'
            headers = {"Authorization": host.daemon_token}
            # Assumes the daemon replaces /etc/hosts with the posted content.
            resp = requests.post(url, json={"hosts": default_content},
                                 headers=headers, verify=False, timeout=10)
            if resp.status_code != 200:
                raise Exception(f"Daemon update error: {resp.status_code} - {resp.text}")
        elif host.type == 'mikrotik':
            clear_mikrotik(host)
        else:
            # Standard Linux over SSH.
            clear_linux(host, default_content)
        flash(f'Cleared host: {host.hostname}', 'success')
    except Exception as e:
        flash(f'Error clearing host {host.hostname}: {str(e)}', 'danger')
    return redirect(url_for('clear_all_server'))
@app.route('/clear-all-server', methods=['GET', 'POST'])
def clear_all_server():
    """Show the clear-servers page (GET) or clear the selected host types (POST).

    The form fields ``linux`` and ``mikrotik`` select which host types are
    cleared. Linux hosts are reset to the default local entries (through the
    daemon when configured, else over SFTP); Mikrotik hosts have their
    static DNS entries removed. Each host's result is flashed separately.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    hosts = Host.query.filter_by(user_id=session['user_id']).all()
    if request.method != 'POST':
        return render_template('clear_servers.html', hosts=hosts)

    linux_clear = request.form.get('linux')
    mikrotik_clear = request.form.get('mikrotik')
    default_content = ensure_local_defaults("")
    for h in hosts:
        try:
            if h.type == 'linux' and linux_clear:
                if not h.use_daemon:
                    clear_linux(h, default_content)
                else:
                    import requests
                    url = h.daemon_url.rstrip('/') + '/hosts'
                    headers = {"Authorization": h.daemon_token}
                    resp = requests.post(url, json={"hosts": default_content},
                                         headers=headers, verify=False, timeout=10)
                    if resp.status_code != 200:
                        raise Exception(f"Daemon update error: {resp.status_code} - {resp.text}")
                flash(f'Cleared Linux host: {h.hostname}', 'success')
            elif h.type == 'mikrotik' and mikrotik_clear:
                clear_mikrotik(h)
                flash(f'Cleared Mikrotik host: {h.hostname}', 'success')
        except Exception as e:
            flash(f'Error clearing host {h.hostname}: {str(e)}', 'danger')
    return redirect(url_for('clear_all_server'))
# -------------------
# ZARZĄDZANIE PLIKAMI HOSTS (WIELOKROTNE PLIKI)
# -------------------
@app.route('/hosts-files')
def list_hosts_files():
    """Render the list of hosts files owned by the logged-in user."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    owner_id = session['user_id']
    own_files = HostFile.query.filter_by(user_id=owner_id).all()
    return render_template('hosts_files.html', files=own_files)
@app.route('/hosts-files/new', methods=['GET', 'POST'])
def new_hosts_file():
    """Show the empty hosts-file form (GET) or create a new file (POST)."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    if request.method != 'POST':
        return render_template('new_edit_hosts_file.html', file=None)

    title = request.form['title']
    content = request.form['content']
    created = HostFile(user_id=session['user_id'], title=title, content=content)
    db.session.add(created)
    db.session.commit()
    flash('Hosts file created', 'success')
    return redirect(url_for('list_hosts_files'))
@app.route('/hosts-files/<int:file_id>/edit', methods=['GET', 'POST'])
def edit_hosts_file(file_id):
    """Show a hosts file for editing (GET) or save the changes (POST).

    Only the owner may edit. When the submitted content differs from the
    stored one, the old content is first saved as a HostFileVersion.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    file = db.session.get(HostFile, file_id)
    if not file or file.user_id != session['user_id']:
        flash('File not found or unauthorized', 'danger')
        return redirect(url_for('list_hosts_files'))
    if request.method != 'POST':
        return render_template('new_edit_hosts_file.html', file=file)

    new_title = request.form['title']
    new_content = request.form['content']
    content_changed = file.content != new_content
    if content_changed:
        # Keep the previous body before it is overwritten.
        db.session.add(HostFileVersion(hostfile_id=file.id, content=file.content))
    file.title = new_title
    file.content = new_content
    db.session.commit()
    flash('Hosts file updated and previous version saved.', 'success')
    return redirect(url_for('list_hosts_files'))
@app.route('/hosts-files/<int:file_id>/delete', methods=['GET'])
def delete_hosts_file(file_id):
    """Delete one of the current user's hosts files with its dependents.

    Saved versions of the file are deleted first: ``hostfile_id`` on
    HostFileVersion is NOT NULL, so deleting only the parent (which makes
    the ORM null out the children's foreign key) would fail once a file has
    history. Hosts that had this file as their preferred file get the
    preference cleared instead of keeping a dangling id.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    file = db.session.get(HostFile, file_id)
    if not file or file.user_id != session['user_id']:
        flash('File not found or unauthorized', 'danger')
    else:
        HostFileVersion.query.filter_by(hostfile_id=file.id).delete(synchronize_session=False)
        Host.query.filter_by(preferred_hostfile_id=file.id).update(
            {'preferred_hostfile_id': None}, synchronize_session=False
        )
        db.session.delete(file)
        db.session.commit()
        flash('Hosts file deleted', 'info')
    return redirect(url_for('list_hosts_files'))
# -------------------
# WDROŻENIE WYBRANEGO PLIKU HOSTS NA WYBRANE SERWERY
# -------------------
@app.route('/deploy-hosts-file/<int:file_id>', methods=['GET', 'POST'])
def deploy_hosts_file(file_id):
    """Show the deploy form (GET) or push a hosts file to selected hosts (POST).

    For each selected host owned by the user the file is deployed through
    the daemon (Linux with ``use_daemon``), via ``deploy_mikrotik`` (Mikrotik)
    or by uploading to ``/etc/hosts`` over SFTP (plain Linux). Linux content
    gets the local default entries and the timestamped comment wrapper;
    Mikrotik content gets the Mikrotik wrapper only. Each success writes a
    DeployLog row; each failure is flashed and the loop continues.

    SSH/SFTP handles and the temporary upload file are released on failure
    too, and the session is rolled back after a failed host so that the
    remaining hosts can still be logged.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    file = db.session.get(HostFile, file_id)
    if not file or file.user_id != session['user_id']:
        flash('File not found or unauthorized', 'danger')
        return redirect(url_for('list_hosts_files'))
    hosts = Host.query.filter_by(user_id=session['user_id']).all()
    if request.method == 'POST':
        selected_host_ids = set(request.form.getlist('hosts'))
        for host in hosts:
            if str(host.id) not in selected_host_ids:
                continue
            try:
                # Prepare the content to upload.
                adjusted_content = ensure_local_defaults(file.content)
                wrapped_content = wrap_content_with_comments(adjusted_content)
                if host.use_daemon and host.type == 'linux':
                    import requests
                    url = host.daemon_url.rstrip('/') + '/hosts'
                    headers = {"Authorization": host.daemon_token}
                    resp = requests.post(url, json={"hosts": wrapped_content},
                                         headers=headers, timeout=10, verify=False)
                    if resp.status_code != 200:
                        raise Exception(f"Daemon POST error: {resp.status_code} - {resp.text}")
                    db.session.add(DeployLog(
                        details=f'[LINUX/DAEMON] Deployed file "{file.title}" to {host.hostname} for user {session["user_id"]}',
                        user_id=session['user_id']
                    ))
                elif host.type == 'mikrotik':
                    # Mikrotik
                    wrapped_mikro = wrap_mikrotik_content(file.content)
                    deploy_mikrotik(host, wrapped_mikro)
                    db.session.add(DeployLog(
                        details=f'[MIKROTIK] Deployed file "{file.title}" to {host.hostname} for user {session["user_id"]}',
                        user_id=session['user_id']
                    ))
                else:
                    # Standard Linux (SSH)
                    with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf:
                        tmpf.write(wrapped_content)
                        tmp_file_path = tmpf.name
                    try:
                        ssh = open_ssh_connection(host)
                        try:
                            sftp = ssh.open_sftp()
                            try:
                                sftp.put(tmp_file_path, '/etc/hosts')
                            finally:
                                sftp.close()
                        finally:
                            ssh.close()
                    finally:
                        os.remove(tmp_file_path)
                    db.session.add(DeployLog(
                        details=f'[LINUX] Deployed file "{file.title}" to {host.hostname} for user {session["user_id"]}',
                        user_id=session['user_id']
                    ))
                db.session.commit()
                flash(f'Deployed file "{file.title}" to {host.hostname}', 'success')
            except Exception as e:
                flash(f'Error deploying file "{file.title}" to {host.hostname}: {str(e)}', 'danger')
                # Drop any pending/failed transaction before the next host.
                db.session.rollback()
        return redirect(url_for('list_hosts_files'))
    return render_template('deploy_hosts_file.html', file=file, hosts=hosts)
# -------------------
# BACKUP
# -------------------
@app.route('/server-backup/<int:host_id>', methods=['GET'])
def server_backup(host_id):
    """Create a backup of one host's current hosts/DNS configuration.

    The content is read from the daemon's ``/hosts`` endpoint (Linux with
    ``use_daemon``), from ``/ip dns static export`` (Mikrotik) or from
    ``/etc/hosts`` over SFTP (plain Linux) and stored as a Backup row. The
    outcome is flashed; SSH/SFTP handles are closed even when reading fails.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    host = db.session.get(Host, host_id)
    if not host or host.user_id != session['user_id']:
        flash('Host not found or unauthorized', 'danger')
        return redirect(url_for('server_list'))
    try:
        if host.use_daemon and host.type == 'linux':
            import requests
            url = host.daemon_url.rstrip('/') + '/hosts'
            headers = {"Authorization": host.daemon_token}
            resp = requests.get(url, headers=headers, timeout=10, verify=False)
            if resp.status_code != 200:
                raise Exception(f"Daemon GET error: {resp.status_code} - {resp.text}")
            data = resp.json()
            content = data.get("hosts", "")
            description = f'Backup (daemon) from {host.hostname}'
        elif host.type == 'mikrotik':
            ssh = open_ssh_connection(host)
            try:
                _stdin, stdout, _stderr = ssh.exec_command("/ip dns static export")
                content = stdout.read().decode('utf-8')
            finally:
                ssh.close()
            description = f'Backup (mikrotik) from {host.hostname}'
        else:
            # Standard Linux (SSH)
            ssh = open_ssh_connection(host)
            try:
                sftp = ssh.open_sftp()
                try:
                    with sftp.open('/etc/hosts', 'r') as remote_file:
                        content = remote_file.read().decode('utf-8')
                finally:
                    sftp.close()
            finally:
                ssh.close()
            description = f'Backup from {host.hostname}'
        backup = Backup(
            user_id=session['user_id'],
            host_id=host.id,
            content=content,
            description=description
        )
        db.session.add(backup)
        db.session.commit()
        flash(f'Backup for host {host.hostname} created successfully.', 'success')
    except Exception as e:
        flash(f'Error creating backup for host {host.hostname}: {str(e)}', 'danger')
    return redirect(url_for('server_list'))
@app.route('/backups')
def backups():
    """Render the current user's backups, newest first."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    own_backups = (
        Backup.query
        .filter_by(user_id=session['user_id'])
        .order_by(Backup.created_at.desc())
        .all()
    )
    return render_template('backups.html', backups=own_backups)
@app.route('/restore-backup/<int:backup_id>', methods=['GET'])
def restore_backup(backup_id):
    """Restore a backup to its host, or to the user's default hosts file.

    Backups tied to a host are pushed back over SSH: Mikrotik hosts get all
    static DNS entries removed and every ``add ...`` line of the backup
    re-applied; other hosts get ``/etc/hosts`` overwritten via SFTP. Backups
    without a host replace the content of the user's "Default Hosts" file
    (created if missing) and trigger ``deploy_user``.

    Changes from the previous version: each Mikrotik command is waited for
    (stdout read to EOF) before the next step or the close, replacing the
    fixed one-second sleep; no empty command is sent when the backup has no
    ``add`` lines; SSH/SFTP handles and the temporary file are released even
    when the restore fails.

    NOTE(review): hosts configured with ``use_daemon`` are still restored
    over SSH here, unlike deploy/backup — confirm whether that is intended.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    backup = db.session.get(Backup, backup_id)
    if not backup or backup.user_id != session['user_id']:
        flash('Backup not found or unauthorized', 'danger')
        return redirect(url_for('backups'))
    if backup.host_id:
        host = db.session.get(Host, backup.host_id)
        if not host or host.user_id != session['user_id']:
            flash('Associated host not found or unauthorized', 'danger')
            return redirect(url_for('backups'))
        try:
            if host.type == 'mikrotik':
                # Build a single command line that re-adds every entry.
                commands = [
                    "/ip dns static " + stripped
                    for stripped in (line.strip() for line in backup.content.splitlines())
                    if stripped.startswith("add ")
                ]
                ssh = open_ssh_connection(host)
                try:
                    # Remove the existing entries and wait for completion.
                    _stdin, stdout, _stderr = ssh.exec_command("/ip dns static remove [find]")
                    stdout.read()
                    if commands:
                        _stdin, stdout, _stderr = ssh.exec_command(" ; ".join(commands))
                        stdout.read()
                finally:
                    ssh.close()
                flash(f'Backup restored to mikrotik host {host.hostname} successfully.', 'success')
            else:
                with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf:
                    tmpf.write(backup.content)
                    tmp_file_path = tmpf.name
                try:
                    ssh = open_ssh_connection(host)
                    try:
                        sftp = ssh.open_sftp()
                        try:
                            sftp.put(tmp_file_path, '/etc/hosts')
                        finally:
                            sftp.close()
                    finally:
                        ssh.close()
                finally:
                    os.remove(tmp_file_path)
                flash(f'Backup restored to host {host.hostname} successfully.', 'success')
        except Exception as e:
            flash(f'Error restoring backup to host {host.hostname}: {str(e)}', 'danger')
    else:
        # Restore the backup as the default configuration (Default Hosts).
        hostfile = HostFile.query.filter_by(user_id=session['user_id'], title="Default Hosts").first()
        if not hostfile:
            hostfile = HostFile(user_id=session['user_id'], title="Default Hosts", content=backup.content)
            db.session.add(hostfile)
        else:
            hostfile.content = backup.content
        db.session.commit()
        deploy_user(session['user_id'])
        flash('Backup restored to default configuration and deployed successfully.', 'success')
    return redirect(url_for('backups'))
@app.route('/view-backup/<int:backup_id>', methods=['GET'])
def view_backup(backup_id):
    """Render one of the current user's backups together with its host.

    ``host`` is None in the template when the backup has no host id.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    backup = db.session.get(Backup, backup_id)
    if not backup or backup.user_id != session['user_id']:
        flash('Backup not found or unauthorized', 'danger')
        return redirect(url_for('backups'))
    related_host = db.session.get(Host, backup.host_id) if backup.host_id else None
    return render_template('view_backup.html', backup=backup, host=related_host)
@app.route('/backup-all', methods=['GET'])
def backup_all():
    """Back up the hosts configuration of every host owned by the user.

    For each host the content is fetched through the host daemon (Linux
    hosts with ``use_daemon``), through ``/ip dns static export`` (Mikrotik)
    or by reading ``/etc/hosts`` over SFTP (everything else) and stored as a
    ``Backup`` row. A failure on one host is reported with a flash message
    and does not stop the remaining hosts. The final success message is only
    shown when no host failed.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    user_id = session['user_id']
    hosts = Host.query.filter_by(user_id=user_id).all()
    failed = 0
    for host in hosts:
        try:
            if host.use_daemon and host.type == 'linux':
                import requests
                url = host.daemon_url.rstrip('/') + '/hosts'
                headers = {"Authorization": host.daemon_token}
                # NOTE(review): verify=False disables TLS certificate checks for
                # the daemon request; kept as in the original behaviour.
                resp = requests.get(url, headers=headers, timeout=10, verify=False)
                if resp.status_code != 200:
                    raise Exception(f"Daemon GET error: {resp.status_code} - {resp.text}")
                content = resp.json().get("hosts", "")
                description = f'Backup (daemon) from {host.hostname}'
            elif host.type == 'mikrotik':
                ssh = open_ssh_connection(host)
                try:
                    _, stdout, _ = ssh.exec_command("/ip dns static export")
                    content = stdout.read().decode('utf-8')
                finally:
                    # Always release the connection, even when the export fails.
                    ssh.close()
                description = f'Backup (mikrotik) from {host.hostname}'
            else:
                # Standard Linux host: read /etc/hosts over SFTP.
                ssh = open_ssh_connection(host)
                try:
                    sftp = ssh.open_sftp()
                    try:
                        with sftp.open('/etc/hosts', 'r') as remote_file:
                            content = remote_file.read().decode('utf-8')
                    finally:
                        sftp.close()
                finally:
                    ssh.close()
                description = f'Backup from {host.hostname}'
            db.session.add(Backup(
                user_id=user_id,
                host_id=host.id,
                content=content,
                description=description
            ))
            db.session.commit()
        except Exception as e:
            # Leave the session usable for the next host if the commit failed.
            db.session.rollback()
            failed += 1
            flash(f'Error creating backup for host {host.hostname}: {str(e)}', 'danger')
    if failed == 0:
        flash('Backup for all hosts created successfully.', 'success')
    else:
        flash(f'Backup finished with errors: {failed} of {len(hosts)} host(s) failed.', 'warning')
    return redirect(url_for('backups'))
# -------------------
# IMPORT/EXPORT HOSTÓW
# -------------------
@app.route('/export-servers-to-csv', methods=['GET'])
def export_servers_to_csv():
    """Send all hosts of the logged-in user as a ``servers.csv`` download.

    The file has a header row followed by one row per host. Passwords,
    private keys and key passphrases are written exactly as stored; missing
    private keys / passphrases are written as empty strings.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    owned_hosts = Host.query.filter_by(user_id=session['user_id']).all()
    buffer = StringIO()
    writer = csv.writer(buffer)
    writer.writerow(['hostname', 'username', 'password', 'port', 'type', 'auth_method', 'private_key', 'key_passphrase'])
    writer.writerows(
        [
            h.hostname,
            h.username,
            h.password,
            h.port,
            h.type,
            h.auth_method,
            h.private_key or '',
            h.key_passphrase or '',
        ]
        for h in owned_hosts
    )
    return Response(
        buffer.getvalue(),
        mimetype="text/csv",
        headers={"Content-Disposition": "attachment;filename=servers.csv"},
    )
@app.route('/import-servers', methods=['GET', 'POST'])
def import_servers():
    """Import hosts for the logged-in user from an uploaded CSV file.

    GET renders the upload form. POST expects a ``file`` upload whose first
    row is a header and whose data rows carry, in order: hostname, username,
    password, port, type, auth_method, private_key, key_passphrase. Rows
    with fewer than eight columns are skipped, extra columns are ignored and
    a non-numeric port falls back to 22.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    if request.method == 'POST':
        file = request.files.get('file')
        if not file:
            flash('No file uploaded', 'danger')
            return redirect(url_for('import_servers'))
        try:
            # utf-8-sig also accepts files that start with a byte-order mark.
            text = file.stream.read().decode("utf-8-sig")
        except UnicodeDecodeError:
            flash('Uploaded file is not valid UTF-8 text', 'danger')
            return redirect(url_for('import_servers'))
        csv_input = csv.reader(StringIO(text, newline=None))
        # Skip the header row; the default avoids StopIteration on an empty file.
        next(csv_input, None)
        imported = 0
        for row in csv_input:
            if len(row) < 8:
                continue
            # Only the first eight columns are meaningful; ignore any extras
            # instead of failing the tuple unpacking.
            (hostname, username, password_val, port_str,
             host_type, auth_method, private_key, key_passphrase) = row[:8]
            try:
                port = int(port_str)
            except ValueError:
                port = 22
            db.session.add(Host(
                hostname=hostname,
                username=username,
                password=password_val,
                port=port,
                type=host_type,
                auth_method=auth_method,
                private_key=private_key if private_key else None,
                key_passphrase=key_passphrase if key_passphrase else None,
                user_id=session['user_id']
            ))
            imported += 1
        db.session.commit()
        if imported:
            flash('Hosts imported successfully', 'success')
        else:
            flash('No valid host rows found in the uploaded file', 'warning')
        return redirect(url_for('server_list'))
    return render_template('import_servers.html')
@app.route('/clear-host/<int:id>', methods=['GET'])
def clear_host(id):
    """Clear the managed hosts entries on one host owned by the user.

    Linux hosts are cleared with ``clear_linux`` using the content returned
    by ``ensure_local_defaults("")``; Mikrotik hosts with ``clear_mikrotik``.
    Any other host type is reported as unsupported instead of being reported
    as cleared. Errors raised while clearing are shown as a flash message.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    host = db.session.get(Host, id)
    if not host or host.user_id != session['user_id']:
        flash('Host not found or unauthorized', 'danger')
        return redirect(url_for('server_list'))
    try:
        if host.type == 'linux':
            clear_linux(host, ensure_local_defaults(""))
        elif host.type == 'mikrotik':
            clear_mikrotik(host)
        else:
            # Nothing was done, so do not claim success.
            flash(f'Cannot clear host {host.hostname}: unsupported host type {host.type!r}', 'warning')
            return redirect(url_for('server_list'))
        flash(f'Cleared host: {host.hostname}', 'success')
    except Exception as e:
        flash(f'Error clearing host {host.hostname}: {str(e)}', 'danger')
    return redirect(url_for('server_list'))
# -------------------
# STRONA USTAWIEŃ (SETTINGS)
# -------------------
@app.route('/settings', methods=['GET', 'POST'])
def settings():
    """Show and update the per-user automation settings.

    A ``UserSettings`` row is created on first access. On POST the form
    values are read, blank cron fields fall back to ``"12 12 * * *"``, both
    cron expressions are validated with croniter and the settings are saved.
    An invalid cron expression is reported and nothing is saved. A
    non-numeric retention value is stored as 0.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    user_id = session['user_id']
    user_settings = UserSettings.query.filter_by(user_id=user_id).first()
    if not user_settings:
        user_settings = UserSettings(user_id=user_id)
        db.session.add(user_settings)
        db.session.commit()
    if request.method == 'POST':
        default_cron = "12 12 * * *"
        auto_deploy = request.form.get('auto_deploy')
        auto_backup = request.form.get('auto_backup')
        enable_regex_entries = request.form.get('enable_regex_entries')
        retention_val = request.form.get('backup_retention_days', '0')
        # Apply the default BEFORE validating: croniter rejects None/empty
        # input, which previously made the fallback unreachable.
        deploy_cron = (request.form.get('deploy_cron') or '').strip() or default_cron
        backup_cron = (request.form.get('backup_cron') or '').strip() or default_cron
        # Validate the cron expressions with croniter.
        try:
            croniter(deploy_cron)
        except Exception as e:
            flash("Błędne wyrażenie cron dla deploy: " + str(e), "danger")
            return redirect(url_for('settings'))
        try:
            croniter(backup_cron)
        except Exception as e:
            flash("Błędne wyrażenie cron dla backup: " + str(e), "danger")
            return redirect(url_for('settings'))
        user_settings.auto_deploy_enabled = bool(auto_deploy)
        user_settings.auto_backup_enabled = bool(auto_backup)
        user_settings.deploy_cron = deploy_cron
        user_settings.backup_cron = backup_cron
        user_settings.regex_deploy_enabled = bool(enable_regex_entries)
        try:
            user_settings.backup_retention_days = int(retention_val)
        except ValueError:
            user_settings.backup_retention_days = 0
        db.session.commit()
        flash('Settings updated', 'success')
        return redirect(url_for('settings'))
    return render_template('settings.html', settings=user_settings)
@app.route('/delete-backup/<int:backup_id>', methods=['POST'])
def delete_backup(backup_id):
    """Delete one backup that belongs to the logged-in user.

    Redirects to the backup list in every case; a missing backup or one
    owned by another user is reported and left untouched.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    # Session.get matches the other routes in this file; Query.get is the
    # legacy form of the same primary-key lookup.
    backup = db.session.get(Backup, backup_id)
    if not backup or backup.user_id != session['user_id']:
        flash('Backup not found or unauthorized', 'danger')
        return redirect(url_for('backups'))
    db.session.delete(backup)
    db.session.commit()
    flash('Backup deleted successfully', 'info')
    return redirect(url_for('backups'))
# -------------------
# EDYCJA LOKALNEGO PLIKU HOSTS
# -------------------
# @app.route('/edit-local-hosts', methods=['GET', 'POST'], endpoint='edit_local_hosts')
# def edit_local_hosts():
# if 'user_id' not in session:
# return redirect(url_for('login'))
# user_id = session['user_id']
# hostfile = HostFile.query.filter_by(user_id=user_id, title="Default Hosts").first()
# if not hostfile:
# default_content = "# This is a sample hosts file.\n127.0.0.1 localhost\n"
# hostfile = HostFile(user_id=user_id, title="Default Hosts", content=default_content)
# db.session.add(hostfile)
# db.session.commit()
# if request.method == 'POST':
# new_content = request.form['hosts_content']
# hostfile.content = new_content
# db.session.commit()
# flash('Local hosts content updated successfully', 'success')
# return render_template('edit_hosts.html', content=hostfile.content)
# -------------------
# DEPLOYMENT DOMYŚLNY DLA UŻYTKOWNIKA
# -------------------
def _upload_hosts_via_sftp(host, content):
    """Write *content* to ``/etc/hosts`` on *host* over SFTP.

    The SSH connection, the SFTP session and the local temporary file are
    released even when the upload fails.
    """
    tmp_file_path = None
    ssh = open_ssh_connection(host)
    try:
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf:
            # Remember the path first so cleanup works if the write fails.
            tmp_file_path = tmpf.name
            tmpf.write(content)
        sftp = ssh.open_sftp()
        try:
            sftp.put(tmp_file_path, '/etc/hosts')
        finally:
            sftp.close()
    finally:
        ssh.close()
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.remove(tmp_file_path)


def deploy_user(user_id):
    """Deploy the hosts file to every auto-deploy host of *user_id*.

    Does nothing when the user has no "Default Hosts" file. For each host
    with ``auto_deploy_enabled`` the preferred host file is used when it
    exists and belongs to the user, otherwise the default file. Generated
    regex lines are prepended when ``regex_deploy_enabled`` is set in the
    user's settings. The result of each host is recorded as a ``DeployLog``
    row for the user; a failure on one host does not stop the others.
    """
    user_settings = UserSettings.query.filter_by(user_id=user_id).first()
    # The "Default Hosts" file is the fallback for every host.
    default_file = HostFile.query.filter_by(user_id=user_id, title="Default Hosts").first()
    if not default_file:
        # Nothing to deploy without a default file.
        return
    # Optional generated (regex/CIDR) entries.
    regex_lines = ""
    if user_settings and user_settings.regex_deploy_enabled:
        regex_lines = generate_regex_hosts(user_id)
    hosts = Host.query.filter_by(user_id=user_id).all()
    for h in hosts:
        if not h.auto_deploy_enabled:
            continue
        # Capture the name up front; it is needed for logging after a rollback.
        hostname = h.hostname
        # Pick the file to upload: preferred file if usable, else the default.
        chosen_file = None
        if h.preferred_hostfile_id:
            chosen_file = HostFile.query.filter_by(id=h.preferred_hostfile_id, user_id=user_id).first()
        if not chosen_file:
            chosen_file = default_file
        final_content = regex_lines + (chosen_file.content or "")
        try:
            if h.type == 'mikrotik':
                deploy_mikrotik(h, wrap_mikrotik_content(final_content))
                label = 'MIKROTIK'
            elif h.use_daemon and h.type == 'linux':
                # Linux host managed through the HTTP daemon.
                import requests
                wrapped_content = wrap_content_with_comments(ensure_local_defaults(final_content))
                url = h.daemon_url.rstrip('/') + '/hosts'
                headers = {"Authorization": h.daemon_token}
                resp = requests.post(url, json={"hosts": wrapped_content}, headers=headers, timeout=10)
                if resp.status_code != 200:
                    raise Exception(f"Daemon POST error: {resp.status_code} - {resp.text}")
                label = 'LINUX/DAEMON'
            else:
                # Standard Linux host: upload over SSH/SFTP.
                wrapped_content = wrap_content_with_comments(ensure_local_defaults(final_content))
                _upload_hosts_via_sftp(h, wrapped_content)
                label = 'LINUX'
            db.session.add(DeployLog(
                details=f'[{label}] Updated {hostname} for user {user_id}',
                user_id=user_id
            ))
            db.session.commit()
        except Exception as e:
            # Discard any half-finished transaction before logging the failure.
            db.session.rollback()
            # user_id is required here: the dashboard lists logs by user_id,
            # so a failure entry without it would never be shown.
            db.session.add(DeployLog(
                details=f'Failed to update {hostname}: {str(e)} for user {user_id}',
                user_id=user_id
            ))
            db.session.commit()
def _parse_mikrotik_dns_export(exported):
    """Return ``{name: address}`` for every ``add`` line of a DNS static export."""
    existing = {}
    for line in exported.splitlines():
        line = line.strip()
        if not line.startswith('add '):
            continue
        address_val = None
        name_val = None
        for part in line[4:].split():
            if part.startswith('address='):
                address_val = part[len('address='):]
            elif part.startswith('name='):
                name_val = part[len('name='):]
        if address_val and name_val:
            existing[name_val] = address_val
    return existing


def _parse_hosts_entries(hosts_content):
    """Return ``{hostname: ip}`` parsed from hosts-file formatted text."""
    desired = {}
    for line in hosts_content.splitlines():
        line = line.strip()
        if (not line
                or line.startswith('#')
                or 'Auto-hosts_upload:' in line
                or 'End_of_auto-hosts_upload:' in line):
            continue
        # Drop trailing "# comment" text so it is not deployed as DNS names.
        parts = line.split('#', 1)[0].split()
        if len(parts) < 2:
            continue
        ip_address = parts[0]
        for hname in parts[1:]:
            desired[hname] = ip_address
    return desired


def deploy_mikrotik(host, hosts_content):
    """Synchronise the static DNS entries on a Mikrotik host with *hosts_content*.

    The current entries are read with ``/ip dns static export``. Names that
    are missing are added, names whose address differs are removed and
    re-added, and names that are no longer present in *hosts_content* are
    removed. The SSH connection is closed even when a command fails.
    """
    ssh = open_ssh_connection(host)
    try:
        def run(command):
            # exec_command returns before the remote command has finished;
            # drain the output and wait for the exit status so commands run
            # in order and are complete before the connection is closed.
            _, stdout, _ = ssh.exec_command(command)
            output = stdout.read().decode('utf-8')
            stdout.channel.recv_exit_status()
            return output

        existing_dns = _parse_mikrotik_dns_export(run("/ip dns static export"))
        desired_dns = _parse_hosts_entries(hosts_content)

        for name_val, ip_val in desired_dns.items():
            current_ip = existing_dns.get(name_val)
            if current_ip == ip_val:
                continue
            if current_ip is not None:
                # Address changed: replace the existing record.
                run(f"/ip dns static remove [find where name={name_val}]")
            run(f"/ip dns static add address={ip_val} name={name_val}")

        for existing_name in existing_dns:
            if existing_name not in desired_dns:
                run(f"/ip dns static remove [find where name={existing_name}]")
    finally:
        ssh.close()
@app.route('/deploy-now')
def deploy_now():
    """Run a deployment for the logged-in user and return to the dashboard."""
    if 'user_id' in session:
        deploy_user(session['user_id'])
        flash('Deployment complete', 'success')
        return redirect(url_for('dashboard'))
    return redirect(url_for('login'))
# -------------------
# DASHBOARD ZE STATYSTYKAMI
# -------------------
@app.route('/dashboard')
def dashboard():
    """Render the dashboard with the user's deploy logs and statistics.

    Logs are listed newest first. The " for user <id>" fragment is removed
    from each log's details on the loaded objects before rendering.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    uid = session['user_id']
    account = db.session.get(User, uid)
    deploy_logs = (
        DeployLog.query
        .filter_by(user_id=uid)
        .order_by(DeployLog.timestamp.desc())
        .all()
    )
    statistics = get_statistics(uid)

    owner_fragment = f" for user {uid}"
    for entry in deploy_logs:
        entry.details = entry.details.replace(owner_fragment, "")

    return render_template('dashboard.html', user=account, logs=deploy_logs, stats=statistics)
# -------------------
# REGEX / CIDR HOST ENTRIES
# -------------------
@app.route('/regex-hosts')
def list_regex_hosts():
    """List every regex/CIDR host entry of the logged-in user."""
    if 'user_id' not in session:
        return redirect(url_for('login'))

    owned_entries = RegexHostEntry.query.filter_by(user_id=session['user_id']).all()
    return render_template('list_regex_hosts.html', entries=owned_entries)
@app.route('/regex-hosts/new', methods=['GET', 'POST'])
def new_regex_host():
    """Create a new regex/CIDR host entry for the logged-in user.

    GET renders the empty form. POST requires ``cidr_range``, which must be
    parseable as an IP network; ``domain_suffix`` defaults to "domain.com"
    and ``host_prefix`` to "ip" when left blank.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    user_id = session['user_id']
    if request.method == 'POST':
        cidr_range = request.form.get('cidr_range', '').strip()
        gateway_ip = request.form.get('gateway_ip', '').strip()
        gateway_hostname = request.form.get('gateway_hostname', '').strip()
        domain_suffix = request.form.get('domain_suffix', '').strip()
        host_prefix = request.form.get('host_prefix', '').strip()
        use_gateway_ip = bool(request.form.get('use_gateway_ip'))
        comment = request.form.get('comment', '').strip()
        if not cidr_range:
            flash('Please provide a CIDR range (e.g. 10.87.200.0/27).', 'danger')
            return redirect(url_for('new_regex_host'))
        try:
            # strict=False also accepts an address with host bits set
            # (e.g. 10.87.200.5/27); only unparseable input is rejected.
            ipaddress.ip_network(cidr_range, strict=False)
        except ValueError:
            flash(f'Invalid CIDR range: {cidr_range}', 'danger')
            return redirect(url_for('new_regex_host'))
        entry = RegexHostEntry(
            user_id=user_id,
            cidr_range=cidr_range,
            gateway_ip=gateway_ip,
            gateway_hostname=gateway_hostname,
            domain_suffix=domain_suffix or "domain.com",
            host_prefix=host_prefix or "ip",
            use_gateway_ip=use_gateway_ip,
            comment=comment
        )
        db.session.add(entry)
        db.session.commit()
        flash('New CIDR entry with optional gateway has been added.', 'success')
        return redirect(url_for('list_regex_hosts'))
    return render_template('new_edit_regex_host.html', entry=None)
@app.route('/regex-hosts/<int:entry_id>/edit', methods=['GET', 'POST'])
def edit_regex_host(entry_id):
    """Edit an existing regex/CIDR host entry owned by the logged-in user.

    GET renders the form pre-filled with the entry. POST requires
    ``cidr_range``, which must be parseable as an IP network;
    ``domain_suffix`` defaults to "domain.com" and ``host_prefix`` to "ip"
    when left blank.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    entry = db.session.get(RegexHostEntry, entry_id)
    if not entry or entry.user_id != session['user_id']:
        flash('cant find row or row is empty', 'danger')
        return redirect(url_for('list_regex_hosts'))
    if request.method == 'POST':
        cidr_range = request.form.get('cidr_range', '').strip()
        gateway_ip = request.form.get('gateway_ip', '').strip()
        gateway_hostname = request.form.get('gateway_hostname', '').strip()
        domain_suffix = request.form.get('domain_suffix', '').strip()
        host_prefix = request.form.get('host_prefix', '').strip()
        use_gateway_ip = bool(request.form.get('use_gateway_ip'))
        comment = request.form.get('comment', '').strip()
        if not cidr_range:
            flash('CIDR is required', 'danger')
            return redirect(url_for('edit_regex_host', entry_id=entry_id))
        try:
            # strict=False also accepts an address with host bits set;
            # only unparseable input is rejected.
            ipaddress.ip_network(cidr_range, strict=False)
        except ValueError:
            flash(f'Invalid CIDR range: {cidr_range}', 'danger')
            return redirect(url_for('edit_regex_host', entry_id=entry_id))
        entry.cidr_range = cidr_range
        entry.gateway_ip = gateway_ip
        entry.gateway_hostname = gateway_hostname
        entry.domain_suffix = domain_suffix or "domain.com"
        entry.host_prefix = host_prefix or "ip"
        entry.use_gateway_ip = use_gateway_ip
        entry.comment = comment
        db.session.commit()
        flash('Updated', 'success')
        return redirect(url_for('list_regex_hosts'))
    return render_template('new_edit_regex_host.html', entry=entry)
@app.route('/regex-hosts/<int:entry_id>/delete', methods=['POST'])
def delete_regex_host(entry_id):
    """Remove one regex/CIDR host entry owned by the logged-in user."""
    if 'user_id' not in session:
        return redirect(url_for('login'))

    target = db.session.get(RegexHostEntry, entry_id)
    if target and target.user_id == session['user_id']:
        db.session.delete(target)
        db.session.commit()
        flash('Row deleted', 'info')
    else:
        flash('cant find row or row is empty', 'danger')
    return redirect(url_for('list_regex_hosts'))
@app.route('/delete-selected-backups', methods=['POST'])
def delete_selected_backups():
    """Delete every backup selected in the form that belongs to the user.

    IDs come from the ``selected_backups`` form field. Values that are not
    integers, unknown IDs and backups owned by other users are ignored.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    for raw_id in request.form.getlist('selected_backups'):
        try:
            # Form values are strings; the primary key lookup expects an int.
            backup_id = int(raw_id)
        except (TypeError, ValueError):
            continue
        backup = db.session.get(Backup, backup_id)
        if backup and backup.user_id == session['user_id']:
            db.session.delete(backup)
    db.session.commit()
    flash('Zaznaczone backupy zostały usunięte.', 'info')
    return redirect(url_for('backups'))
@app.route('/update-host-automation/<int:id>', methods=['POST'])
def update_host_automation(id):
    """Switch the auto-deploy or auto-backup flag of one host on or off.

    The form field ``setting`` selects the flag ('auto_deploy' or
    'auto_backup') and ``enabled`` equal to '1' turns it on; any other
    ``setting`` value leaves the host unchanged.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    target = db.session.get(Host, id)
    if not (target and target.user_id == session['user_id']):
        flash('Serwer nie istnieje lub nie masz uprawnień', 'danger')
        return redirect(url_for('server_list'))

    # Maps the form's setting name onto the Host attribute it controls.
    flag_by_setting = {
        'auto_deploy': 'auto_deploy_enabled',
        'auto_backup': 'auto_backup_enabled',
    }
    flag_name = flag_by_setting.get(request.form.get('setting'))
    turned_on = request.form.get('enabled') == '1'
    if flag_name is not None:
        setattr(target, flag_name, turned_on)

    db.session.commit()
    flash('Ustawienia automatyzacji zostały zaktualizowane.', 'success')
    return redirect(url_for('server_list'))
@app.route('/edit-local-hosts', methods=['GET', 'POST'], endpoint='edit_local_hosts')
def edit_local_hosts():
    """Edit the user's "Default Hosts" file.

    The file is created with sample content on first access. On POST the
    current content is stored as a ``HostFileVersion`` before it is replaced
    with the submitted ``hosts_content``.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    owner_id = session['user_id']
    default_file = HostFile.query.filter_by(user_id=owner_id, title="Default Hosts").first()
    if not default_file:
        default_file = HostFile(
            user_id=owner_id,
            title="Default Hosts",
            content="# This is a sample hosts file.\n127.0.0.1 localhost\n",
        )
        db.session.add(default_file)
        db.session.commit()

    if request.method == 'POST':
        submitted = request.form['hosts_content']
        # Keep the content being replaced in the version history.
        db.session.add(HostFileVersion(hostfile_id=default_file.id, content=default_file.content))
        default_file.content = submitted
        db.session.commit()
        flash('Local hosts content updated successfully and previous version saved.', 'success')

    return render_template('edit_hosts.html', content=default_file.content, hostfile=default_file)
@app.route('/hostfile/<int:hostfile_id>/versions', methods=['GET', 'POST'])
def hostfile_versions(hostfile_id):
    """List the saved versions of a host file, or bulk-delete selected ones.

    GET renders the versions newest first. POST deletes the versions listed
    in the ``selected_versions`` form field that belong to the logged-in
    user; values that are not integers and unknown IDs are ignored.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    # Session.get matches the other routes in this file; Query.get is the
    # legacy form of the same primary-key lookup.
    hostfile = db.session.get(HostFile, hostfile_id)
    if not hostfile or hostfile.user_id != session['user_id']:
        flash('Hostfile not found or unauthorized', 'danger')
        return redirect(url_for('dashboard'))
    if request.method == 'POST':
        # Bulk delete of the selected versions.
        for raw_id in request.form.getlist('selected_versions'):
            try:
                # Form values are strings; the primary key lookup expects an int.
                version_id = int(raw_id)
            except (TypeError, ValueError):
                continue
            version = db.session.get(HostFileVersion, version_id)
            if version and version.hostfile.user_id == session['user_id']:
                db.session.delete(version)
        db.session.commit()
        flash('Wybrane wersje zostały usunięte.', 'info')
        return redirect(url_for('hostfile_versions', hostfile_id=hostfile_id))
    versions = (HostFileVersion.query
                .filter_by(hostfile_id=hostfile.id)
                .order_by(HostFileVersion.timestamp.desc())
                .all())
    return render_template('hostfile_versions.html', hostfile=hostfile, versions=versions)
@app.route('/hostfile/<int:hostfile_id>/versions/delete_old/<int:days>')
def delete_old_versions(hostfile_id, days):
    """Delete the versions of a host file that are older than *days* days.

    The cutoff is computed from the current UTC time. Only a host file owned
    by the logged-in user is accepted.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    # Session.get matches the other routes in this file; Query.get is the
    # legacy form of the same primary-key lookup.
    hostfile = db.session.get(HostFile, hostfile_id)
    if not hostfile or hostfile.user_id != session['user_id']:
        flash('Hostfile not found or unauthorized', 'danger')
        return redirect(url_for('dashboard'))
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    old_versions = HostFileVersion.query.filter(
        HostFileVersion.hostfile_id == hostfile_id,
        HostFileVersion.timestamp < cutoff,
    ).all()
    for version in old_versions:
        db.session.delete(version)
    db.session.commit()
    flash(f'Usunięto wersje starsze niż {days} dni.', 'info')
    return redirect(url_for('hostfile_versions', hostfile_id=hostfile_id))
@app.route('/hostfile/diff_current/<int:hostfile_id>')
def diff_current_hostfile(hostfile_id):
    """Show an HTML diff between the newest saved version and the current content.

    Redirects to the version list with a warning when the host file has no
    saved versions yet.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    # Session.get matches the other routes in this file; Query.get is the
    # legacy form of the same primary-key lookup.
    hostfile = db.session.get(HostFile, hostfile_id)
    if not hostfile or hostfile.user_id != session['user_id']:
        flash('Hostfile not found or unauthorized.', 'danger')
        return redirect(url_for('dashboard'))
    # Select the newest version explicitly instead of relying on the order
    # in which the relationship happens to load its rows.
    latest_version = (HostFileVersion.query
                      .filter_by(hostfile_id=hostfile.id)
                      .order_by(HostFileVersion.timestamp.desc())
                      .first())
    if latest_version is None:
        flash('Brak zapisanej historii wersji do porównania.', 'warning')
        return redirect(url_for('hostfile_versions', hostfile_id=hostfile_id))
    differ = difflib.HtmlDiff(wrapcolumn=80)
    diff_html = differ.make_table(
        (latest_version.content or "").splitlines(),
        (hostfile.content or "").splitlines(),
        fromdesc=f"Najnowsza wersja historii (ID: {latest_version.id}, {latest_version.timestamp.strftime('%Y-%m-%d %H:%M:%S')})",
        todesc="Aktualna zawartość",
        context=True,
        numlines=3
    )
    return render_template('diff_versions.html', diff_html=diff_html, hostfile_id=hostfile.id)
@app.route('/hostfile/diff/<int:version1_id>/<int:version2_id>')
def diff_hostfile_versions(version1_id, version2_id):
    """Show an HTML diff between two saved host file versions.

    Both versions must exist and belong to host files owned by the logged-in
    user. The template receives the host file ID of the first version.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    # Session.get matches the other routes in this file; Query.get is the
    # legacy form of the same primary-key lookup.
    version1 = db.session.get(HostFileVersion, version1_id)
    version2 = db.session.get(HostFileVersion, version2_id)
    current_user_id = session['user_id']
    if (not version1 or not version2
            or version1.hostfile.user_id != current_user_id
            or version2.hostfile.user_id != current_user_id):
        flash('Wersje nie znalezione lub brak uprawnień.', 'danger')
        return redirect(url_for('dashboard'))
    differ = difflib.HtmlDiff(wrapcolumn=80)
    diff_html = differ.make_table(
        version1.content.splitlines(),
        version2.content.splitlines(),
        fromdesc=f"Wersja {version1.id} - {version1.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
        todesc=f"Wersja {version2.id} - {version2.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
        context=True,
        numlines=3
    )
    # NOTE(review): this assumes both versions belong to the same host file;
    # that is not checked here.
    hostfile_id = version1.hostfile_id
    return render_template('diff_versions.html', diff_html=diff_html, hostfile_id=hostfile_id)
@app.route('/hostfile/version/<int:version_id>')
def view_hostfile_version(version_id):
    """Render one saved host file version owned by the logged-in user."""
    if 'user_id' not in session:
        return redirect(url_for('login'))
    # Session.get matches the other routes in this file; Query.get is the
    # legacy form of the same primary-key lookup.
    version = db.session.get(HostFileVersion, version_id)
    if not version or version.hostfile.user_id != session['user_id']:
        flash('Version not found or unauthorized', 'danger')
        return redirect(url_for('dashboard'))
    return render_template('view_hostfile_version.html', version=version)
@app.route('/hostfile/version/<int:version_id>/restore')
def restore_hostfile_version(version_id):
    """Restore a saved version as the current content of its host file.

    The content being replaced is stored as a new ``HostFileVersion`` first,
    as the edit view does, so a restore can itself be undone. Nothing is
    stored when the current content already equals the version's content.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))
    # Session.get matches the other routes in this file; Query.get is the
    # legacy form of the same primary-key lookup.
    version = db.session.get(HostFileVersion, version_id)
    if not version or version.hostfile.user_id != session['user_id']:
        flash('Version not found or unauthorized', 'danger')
        return redirect(url_for('dashboard'))
    hostfile = version.hostfile
    if hostfile.content != version.content:
        # Keep the content that is about to be overwritten in the history.
        db.session.add(HostFileVersion(hostfile_id=hostfile.id, content=hostfile.content))
        hostfile.content = version.content
    db.session.commit()
    flash('Version restored successfully.', 'success')
    return redirect(url_for('edit_local_hosts'))
@app.route('/hostfile/versions')
def default_hostfile_versions():
    """Redirect to the version list of the user's "Default Hosts" file.

    Falls back to the editor with an error message when the user has no
    file with that title.
    """
    if 'user_id' not in session:
        return redirect(url_for('login'))

    default_file = HostFile.query.filter_by(
        user_id=session['user_id'],
        title="Default Hosts",
    ).first()
    if default_file:
        return redirect(url_for('hostfile_versions', hostfile_id=default_file.id))

    flash("Default Hosts file not found.", "danger")
    return redirect(url_for('edit_local_hosts'))
def scheduled_deployments():
    """Run the deployment of every user whose deploy cron is due.

    For each ``UserSettings`` row with auto deploy enabled, the next run
    time is computed from the cron expression and the last deploy time (or
    1970-01-01 UTC when there is none; naive timestamps are treated as UTC).
    When that time has passed, the user's deployment runs and the last
    deploy time is updated. An error for one user is logged and does not
    stop the remaining users.
    """
    with app.app_context():
        now = datetime.now(timezone.utc)
        settings_list = UserSettings.query.filter_by(auto_deploy_enabled=True).all()
        for setting in settings_list:
            if not setting.deploy_cron:
                continue
            # Read the ID up front; it is needed for logging after a rollback.
            user_id = setting.user_id
            try:
                base_time = setting.last_deploy_time or datetime(1970, 1, 1, tzinfo=timezone.utc)
                if base_time.tzinfo is None:
                    base_time = base_time.replace(tzinfo=timezone.utc)
                next_deploy = croniter(setting.deploy_cron, base_time).get_next(datetime)
                if now >= next_deploy:
                    deploy_user(user_id)
                    setting.last_deploy_time = now
                    db.session.commit()
            except Exception:
                # A bad stored cron expression or a failing deployment must
                # not prevent the other users from being processed.
                db.session.rollback()
                app.logger.exception("Scheduled deployment failed for user %s", user_id)
# Background scheduler using the machine's local time zone. Each job is an
# interval job whose first run is scheduled for "now".
scheduler = BackgroundScheduler(timezone=get_localzone())
for _job_func, _job_interval in (
    (scheduled_deployments, {"minutes": 1}),
    (automated_backups, {"minutes": 1}),
    (cleanup_old_backups, {"hours": 24}),
):
    scheduler.add_job(
        func=_job_func,
        trigger="interval",
        next_run_time=datetime.now(),
        **_job_interval,
    )
if __name__ == '__main__':
    # Create any missing tables before serving requests.
    with app.app_context():
        db.create_all()
    # NOTE(review): no scheduler.start() call is visible in this part of the
    # file; without one the registered jobs never run. The guard keeps this
    # safe if the scheduler is already started elsewhere - confirm.
    if not scheduler.running:
        scheduler.start()
    # The reloader stays disabled: it would import the module a second time.
    app.run(
        host='0.0.0.0',
        port=5580,
        use_reloader=False,
    )