from flask import Flask, render_template, request, redirect, url_for, flash, session, Response from flask_sqlalchemy import SQLAlchemy from werkzeug.security import generate_password_hash, check_password_hash import os, paramiko, threading, time, io, tempfile, csv from datetime import datetime, timedelta, timezone from apscheduler.schedulers.background import BackgroundScheduler from io import StringIO import socket import ipaddress from werkzeug.serving import WSGIRequestHandler WSGIRequestHandler.server_version = "" WSGIRequestHandler.sys_version = "" app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db' app.config['SECRET_KEY'] = 'supersecretkey' db = SQLAlchemy(app) # MODELE class User(db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) password = db.Column(db.String(200), nullable=False) hosts = db.relationship('Host', backref='user', lazy=True) hostfiles = db.relationship('HostFile', backref='user', lazy=True) settings = db.relationship('UserSettings', backref='user', uselist=False) class Host(db.Model): id = db.Column(db.Integer, primary_key=True) hostname = db.Column(db.String(255), nullable=False) username = db.Column(db.String(100), nullable=False) password = db.Column(db.String(200), nullable=False) type = db.Column(db.String(50), default='linux') # 'linux' albo 'mikrotik' auth_method = db.Column(db.String(20), default='password') private_key = db.Column(db.Text, nullable=True) key_passphrase = db.Column(db.String(200), nullable=True) port = db.Column(db.Integer, default=22) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) @property def resolved_hostname(self): try: return socket.gethostbyaddr(self.hostname)[0] except Exception: return self.hostname class DeployLog(db.Model): id = db.Column(db.Integer, primary_key=True) timestamp = db.Column(db.DateTime, default=db.func.current_timestamp()) details = db.Column(db.Text, nullable=False) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) user = db.relationship('User', backref='deploy_logs', lazy=True) class HostFile(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) title = db.Column(db.String(100), nullable=False, default='Default Hosts') content = db.Column(db.Text, nullable=False) class UserSettings(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False) auto_deploy_enabled = db.Column(db.Boolean, default=False) deploy_interval = db.Column(db.Integer, default=60) # interwał wdrożeń (minuty) backup_interval = db.Column(db.Integer, default=60) # interwał backupów (minuty) last_deploy_time = db.Column(db.DateTime, nullable=True) regex_deploy_enabled = db.Column(db.Boolean, default=True) backup_retention_days = db.Column(db.Integer, default=0) class Backup(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) host_id = db.Column(db.Integer, db.ForeignKey('host.id'), nullable=True) created_at = db.Column(db.DateTime, default=db.func.current_timestamp()) content = db.Column(db.Text, nullable=False) description = db.Column(db.String(255), nullable=True) host = db.relationship('Host', backref='backups', lazy=True) class RegexHostEntry(db.Model): __tablename__ = 'regex_host_entry' id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) cidr_range = db.Column(db.String(50), nullable=False) # np. '10.87.200.0/27' gateway_ip = db.Column(db.String(50), nullable=True) # np. '10.87.200.30' gateway_hostname = db.Column(db.String(255), nullable=True) # np. 'gw' domain_suffix = db.Column(db.String(255), nullable=False, default="guest.domain.com") host_prefix = db.Column(db.String(255), nullable=False, default="user") use_gateway_ip = db.Column(db.Boolean, default=False) comment = db.Column(db.String(255), nullable=True) user = db.relationship('User', backref='regex_entries') # Funkcje pomocnicze def ensure_local_defaults(content): required_lines = [ "127.0.0.1 localhost", "::1 localhost ip6-localhost ip6-loopback", "255.255.255.255 broadcasthost" ] lines = [l.rstrip() for l in content.splitlines()] lines = [line for line in lines if line not in required_lines] lines = required_lines + lines return "\n".join(lines) + "\n" def wrap_content_with_comments(content): now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") header_comment = f"# Auto-hosts upload: {now_str}\n" footer_comment = f"\n# End of auto-hosts upload: {now_str}\n" return header_comment + content + footer_comment def open_ssh_connection(host_obj): ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if host_obj.auth_method == 'ssh_key' and host_obj.private_key: key_file_obj = io.StringIO(host_obj.private_key) passphrase = host_obj.key_passphrase if host_obj.key_passphrase else None try: pkey = paramiko.RSAKey.from_private_key(key_file_obj, password=passphrase) except paramiko.SSHException as e: raise Exception(f"Error reading private key: {str(e)}") ssh.connect( hostname=host_obj.hostname, port=host_obj.port, username=host_obj.username, pkey=pkey, timeout=10, # TCP connection timeout banner_timeout=30 # Wait longer for SSH banner ) else: ssh.connect( hostname=host_obj.hostname, port=host_obj.port, username=host_obj.username, password=host_obj.password, timeout=10, banner_timeout=30 ) return ssh def get_statistics(user_id): user = db.session.get(User, user_id) host_count = Host.query.filter_by(user_id=user_id).count() logs = DeployLog.query.all() filtered_logs = [log for log in logs if f"for user {user_id}" in log.details or f"for {user.username}" in log.details] total_deployments = len(filtered_logs) successful_deployments = len([log for log in filtered_logs if "Updated" in log.details or "Deployed" in log.details]) failed_deployments = len([log for log in filtered_logs if "Failed" in log.details]) return { "host_count": host_count, "total_deployments": total_deployments, "successful_deployments": successful_deployments, "failed_deployments": failed_deployments } def automated_backup_for_host(host): try: ssh = open_ssh_connection(host) sftp = ssh.open_sftp() with sftp.open('/etc/hosts', 'r') as remote_file: content = remote_file.read().decode('utf-8') sftp.close() ssh.close() backup = Backup( user_id=host.user_id, host_id=host.id, content=content, description=f'Automated backup from {host.hostname} at {datetime.now(timezone.utc).isoformat()}' ) db.session.add(backup) db.session.commit() print(f'Automated backup for host {host.hostname} created successfully.') except Exception as e: print(f'Error creating automated backup for host {host.hostname}: {str(e)}') def automated_backups(): with app.app_context(): hosts = Host.query.all() for host in hosts: automated_backup_for_host(host) def wrap_content_with_comments(content): now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") header_comment = f"# Auto-hosts upload: {now_str}\n" footer_comment = f"\n# End of auto-hosts upload: {now_str}\n" return header_comment + content + footer_comment def wrap_mikrotik_content(content): now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H:%M:%S_UTC") header = f"127.0.0.99 #Auto-hosts_upload:{now_str}" footer = f"127.0.0.199 #End_of_auto-hosts_upload:{now_str}" return header + "\n" + content + "\n" + footer def clear_linux(host, content): """Zastępuje /etc/hosts na hoście Linux zawartością `content`.""" ssh = open_ssh_connection(host) import tempfile, os with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf: tmpf.write(content) tmp_file_path = tmpf.name sftp = ssh.open_sftp() sftp.put(tmp_file_path, '/etc/hosts') sftp.close() ssh.close() os.remove(tmp_file_path) def clear_mikrotik(host): ssh = open_ssh_connection(host) ssh.exec_command("/ip dns static remove [find]") ssh.close() def generate_regex_hosts(user_id): entries = RegexHostEntry.query.filter_by(user_id=user_id).all() lines = [] for entry in entries: try: network = ipaddress.ip_network(entry.cidr_range, strict=False) all_hosts = list(network.hosts()) # IP w puli host_index = 1 for ip_addr in all_hosts: if (entry.use_gateway_ip and entry.gateway_ip and str(ip_addr) == entry.gateway_ip): if entry.gateway_hostname: hostname = f"{entry.gateway_hostname}.{entry.domain_suffix}" else: hostname = f"gw.{entry.domain_suffix}" lines.append(f"{ip_addr} {hostname}") else: hostname = f"{entry.host_prefix}{host_index}.{entry.domain_suffix}" lines.append(f"{ip_addr} {hostname}") host_index += 1 except ValueError: pass return "\n".join(lines) + "\n" if lines else "" def cleanup_old_backups(): with app.app_context(): all_settings = UserSettings.query.all() for setting in all_settings: days = setting.backup_retention_days or 0 if days > 0: cutoff_date = datetime.now(timezone.utc) - timedelta(days=days) old_backups = Backup.query.filter( Backup.user_id == setting.user_id, Backup.created_at < cutoff_date ).all() for b in old_backups: db.session.delete(b) db.session.commit() # ------------------- # LOGOWANIE, REJESTRACJA, ZMIANA HASŁA # ------------------- @app.route('/') def index(): if 'user_id' in session: return redirect(url_for('dashboard')) return redirect(url_for('login')) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] user = User.query.filter_by(username=username).first() if user and check_password_hash(user.password, password): session['user_id'] = user.id flash('Login successful', 'success') return redirect(url_for('dashboard')) flash('Invalid credentials', 'danger') return render_template('login.html') @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'POST': username = request.form['username'] password_hash = generate_password_hash(request.form['password']) user = User(username=username, password=password_hash) db.session.add(user) db.session.commit() flash('Registration successful. Please log in.', 'success') return redirect(url_for('login')) return render_template('register.html') @app.route('/logout') def logout(): session.pop('user_id', None) flash('Logged out', 'info') return redirect(url_for('login')) @app.route('/change-password', methods=['GET', 'POST']) def change_password(): if 'user_id' not in session: return redirect(url_for('login')) if request.method == 'POST': user = db.session.get(User, session['user_id']) new_password = generate_password_hash(request.form['password']) user.password = new_password db.session.commit() flash('Password changed successfully', 'success') return redirect(url_for('dashboard')) return render_template('change_password.html') # ------------------- # ZARZĄDZANIE HOSTAMI # ------------------- @app.route('/hosts', methods=['GET', 'POST']) def manage_hosts(): if 'user_id' not in session: return redirect(url_for('login')) if request.method == 'POST': hostname = request.form['hostname'] username = request.form['username'] password_val = request.form['password'] host_type = request.form.get('host_type', 'linux') auth_method = request.form.get('auth_method', 'password') private_key = request.form.get('private_key', '').strip() key_passphrase = request.form.get('key_passphrase', '').strip() port_str = request.form.get('port', '22') try: port = int(port_str) except ValueError: port = 22 stored_key = private_key if (auth_method == 'ssh_key' and private_key) else None stored_passphrase = key_passphrase if (auth_method == 'ssh_key' and key_passphrase) else None host = Host( hostname=hostname, username=username, password=password_val, type=host_type, auth_method=auth_method, private_key=stored_key, key_passphrase=stored_passphrase, port=port, user_id=session['user_id'] ) db.session.add(host) db.session.commit() flash('Host added successfully', 'success') hosts = Host.query.filter_by(user_id=session['user_id']).all() return render_template('hosts.html', hosts=hosts) @app.route('/delete-host/<int:id>') def delete_host(id): if 'user_id' not in session: return redirect(url_for('login')) host = db.session.get(Host, id) if host and host.user_id == session['user_id']: db.session.delete(host) db.session.commit() flash('Host deleted', 'info') else: flash('Host not found or unauthorized', 'danger') return redirect(url_for('manage_hosts')) @app.route('/edit-host/<int:id>', methods=['GET', 'POST']) def edit_host(id): if 'user_id' not in session: return redirect(url_for('login')) host = db.session.get(Host, id) if not host or host.user_id != session['user_id']: flash('Host not found or unauthorized', 'danger') return redirect(url_for('manage_hosts')) if request.method == 'POST': host.hostname = request.form['hostname'] host.username = request.form['username'] host.password = request.form['password'] or '' port_str = request.form.get('port', '22') try: host.port = int(port_str) except ValueError: host.port = 22 host.type = request.form.get('host_type', 'linux') host.auth_method = request.form.get('auth_method', 'password') new_private_key = request.form.get('private_key', '').strip() new_passphrase = request.form.get('key_passphrase', '').strip() if host.auth_method == 'ssh_key' and new_private_key: host.private_key = new_private_key if host.auth_method == 'ssh_key' and new_passphrase: host.key_passphrase = new_passphrase db.session.commit() flash('Host updated successfully', 'success') return redirect(url_for('manage_hosts')) return render_template('edit_host.html', host=host) # ------------------- # TESTOWANIE POŁĄCZENIA SSH DLA HOSTA # ------------------- @app.route('/test-host/<int:id>', methods=['GET']) def test_host(id): if 'user_id' not in session: return redirect(url_for('login')) host = db.session.get(Host, id) if not host or host.user_id != session['user_id']: flash('Host not found or unauthorized', 'danger') return redirect(url_for('manage_hosts')) try: ssh = open_ssh_connection(host) ssh.close() flash(f'SSH connection to {host.hostname} successful.', 'success') except Exception as e: flash(f'SSH connection to {host.hostname} failed: {str(e)}', 'danger') return redirect(url_for('manage_hosts')) # ------------------- # ROUTE: CZYSZCZENIE HOSTS - CAŁA GRUPA # ------------------- @app.route('/clear-hosts', methods=['GET', 'POST']) def clear_all_hosts(): if 'user_id' not in session: return redirect(url_for('login')) if request.method == 'POST': user_id = session['user_id'] linux_clear = request.form.get('linux') mikrotik_clear = request.form.get('mikrotik') hosts = Host.query.filter_by(user_id=user_id).all() default_content = ensure_local_defaults("") for h in hosts: if h.type == 'linux' and linux_clear: try: clear_linux(h, default_content) flash(f'Cleared Linux host: {h.hostname}', 'success') except Exception as e: flash(f'Error clearing Linux host {h.hostname}: {str(e)}', 'danger') elif h.type == 'mikrotik' and mikrotik_clear: try: clear_mikrotik(h) flash(f'Cleared Mikrotik host: {h.hostname}', 'success') except Exception as e: flash(f'Error clearing Mikrotik host {h.hostname}: {str(e)}', 'danger') return redirect(url_for('dashboard')) return render_template('clear_hosts.html') # ------------------- # ZARZĄDZANIE PLIKAMI HOSTS (WIELOKROTNE PLIKI) # ------------------- @app.route('/hosts-files') def list_hosts_files(): if 'user_id' not in session: return redirect(url_for('login')) files = HostFile.query.filter_by(user_id=session['user_id']).all() return render_template('hosts_files.html', files=files) @app.route('/hosts-files/new', methods=['GET', 'POST']) def new_hosts_file(): if 'user_id' not in session: return redirect(url_for('login')) if request.method == 'POST': title = request.form['title'] content = request.form['content'] new_file = HostFile(user_id=session['user_id'], title=title, content=content) db.session.add(new_file) db.session.commit() flash('Hosts file created', 'success') return redirect(url_for('list_hosts_files')) return render_template('new_edit_hosts_file.html', file=None) @app.route('/hosts-files/<int:file_id>/edit', methods=['GET', 'POST']) def edit_hosts_file(file_id): if 'user_id' not in session: return redirect(url_for('login')) file = db.session.get(HostFile, file_id) if not file or file.user_id != session['user_id']: flash('File not found or unauthorized', 'danger') return redirect(url_for('list_hosts_files')) if request.method == 'POST': file.title = request.form['title'] file.content = request.form['content'] db.session.commit() flash('Hosts file updated', 'success') return redirect(url_for('list_hosts_files')) return render_template('new_edit_hosts_file.html', file=file) @app.route('/hosts-files/<int:file_id>/delete', methods=['GET']) def delete_hosts_file(file_id): if 'user_id' not in session: return redirect(url_for('login')) file = db.session.get(HostFile, file_id) if not file or file.user_id != session['user_id']: flash('File not found or unauthorized', 'danger') else: db.session.delete(file) db.session.commit() flash('Hosts file deleted', 'info') return redirect(url_for('list_hosts_files')) # ------------------- # WDROŻENIE WYBRANEGO PLIKU HOSTS NA WYBRANE SERWERY # ------------------- @app.route('/deploy-hosts-file/<int:file_id>', methods=['GET', 'POST']) def deploy_hosts_file(file_id): if 'user_id' not in session: return redirect(url_for('login')) file = db.session.get(HostFile, file_id) if not file or file.user_id != session['user_id']: flash('File not found or unauthorized', 'danger') return redirect(url_for('list_hosts_files')) hosts = Host.query.filter_by(user_id=session['user_id']).all() if request.method == 'POST': selected_host_ids = request.form.getlist('hosts') for host in hosts: if str(host.id) in selected_host_ids: try: if host.type == 'linux': ssh = open_ssh_connection(host) adjusted_content = ensure_local_defaults(hosts_content) wrapped_content = wrap_content_with_comments(adjusted_content) with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf: tmpf.write(wrapped_content) tmp_file_path = tmpf.name sftp = ssh.open_sftp() sftp.put(tmp_file_path, '/etc/hosts') sftp.close() ssh.close() os.remove(tmp_file_path) db.session.add(DeployLog(details=f'[LINUX] Deployed file "{file.title}" to {host.hostname} for user {session["user_id"]}')) elif host.type == 'mikrotik': wrapped_content = wrap_mikrotik_content(file.content) deploy_mikrotik(host, wrapped_content) db.session.add(DeployLog(details=f'[MIKROTIK] Deployed file "{file.title}" to {host.hostname} for user {session["user_id"]}')) db.session.commit() flash(f'Deployed file "{file.title}" to {host.hostname}', 'success') except Exception as e: flash(f'Error deploying file "{file.title}" to {host.hostname}: {str(e)}', 'danger') return redirect(url_for('list_hosts_files')) return render_template('deploy_hosts_file.html', file=file, hosts=hosts) # ------------------- # BACKUP # ------------------- @app.route('/backup-host/<int:host_id>', methods=['GET']) def backup_host(host_id): if 'user_id' not in session: return redirect(url_for('login')) host = db.session.get(Host, host_id) if not host or host.user_id != session['user_id']: flash('Host not found or unauthorized', 'danger') return redirect(url_for('manage_hosts')) try: if host.type == 'mikrotik': ssh = open_ssh_connection(host) stdin, stdout, stderr = ssh.exec_command("/ip dns static export") content = stdout.read().decode('utf-8') ssh.close() description = f'Backup (mikrotik) from {host.hostname}' else: ssh = open_ssh_connection(host) sftp = ssh.open_sftp() with sftp.open('/etc/hosts', 'r') as remote_file: content = remote_file.read().decode('utf-8') sftp.close() ssh.close() description = f'Backup from {host.hostname}' backup = Backup( user_id=session['user_id'], host_id=host.id, content=content, description=description ) db.session.add(backup) db.session.commit() flash(f'Backup for host {host.hostname} created successfully.', 'success') except Exception as e: flash(f'Error creating backup for host {host.hostname}: {str(e)}', 'danger') return redirect(url_for('manage_hosts')) @app.route('/backups') def backups(): if 'user_id' not in session: return redirect(url_for('login')) backups = Backup.query.filter_by(user_id=session['user_id']).order_by(Backup.created_at.desc()).all() return render_template('backups.html', backups=backups) @app.route('/restore-backup/<int:backup_id>', methods=['GET']) def restore_backup(backup_id): if 'user_id' not in session: return redirect(url_for('login')) backup = db.session.get(Backup, backup_id) if not backup or backup.user_id != session['user_id']: flash('Backup not found or unauthorized', 'danger') return redirect(url_for('backups')) if backup.host_id: host = db.session.get(Host, backup.host_id) if not host or host.user_id != session['user_id']: flash('Associated host not found or unauthorized', 'danger') return redirect(url_for('backups')) try: if host.type == 'mikrotik': ssh = open_ssh_connection(host) ssh.exec_command("/ip dns static remove [find]") for line in backup.content.splitlines(): line = line.strip() if line.startswith("add "): ssh.exec_command(line) ssh.close() flash(f'Backup restored to mikrotik host {host.hostname} successfully.', 'success') else: ssh = open_ssh_connection(host) sftp = ssh.open_sftp() with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf: tmpf.write(backup.content) tmp_file_path = tmpf.name sftp.put(tmp_file_path, '/etc/hosts') sftp.close() ssh.close() os.remove(tmp_file_path) flash(f'Backup restored to host {host.hostname} successfully.', 'success') except Exception as e: flash(f'Error restoring backup to host {host.hostname}: {str(e)}', 'danger') else: # Przywrócenie backupu jako domyślnej konfiguracji (Default Hosts) hostfile = HostFile.query.filter_by(user_id=session['user_id'], title="Default Hosts").first() if not hostfile: hostfile = HostFile(user_id=session['user_id'], title="Default Hosts", content=backup.content) db.session.add(hostfile) else: hostfile.content = backup.content db.session.commit() flash('Backup restored to default configuration successfully.', 'success') return redirect(url_for('backups')) @app.route('/view-backup/<int:backup_id>', methods=['GET']) def view_backup(backup_id): if 'user_id' not in session: return redirect(url_for('login')) backup = db.session.get(Backup, backup_id) if not backup or backup.user_id != session['user_id']: flash('Backup not found or unauthorized', 'danger') return redirect(url_for('backups')) host = None if backup.host_id: host = db.session.get(Host, backup.host_id) return render_template('view_backup.html', backup=backup, host=host) @app.route('/backup-all', methods=['GET']) def backup_all(): if 'user_id' not in session: return redirect(url_for('login')) user_id = session['user_id'] hosts = Host.query.filter_by(user_id=user_id).all() for host in hosts: try: if host.type == 'mikrotik': ssh = open_ssh_connection(host) stdin, stdout, stderr = ssh.exec_command("/ip dns static export") content = stdout.read().decode('utf-8') ssh.close() description = f'Backup (mikrotik) from {host.hostname}' else: ssh = open_ssh_connection(host) sftp = ssh.open_sftp() with sftp.open('/etc/hosts', 'r') as remote_file: content = remote_file.read().decode('utf-8') sftp.close() ssh.close() description = f'Backup from {host.hostname}' backup = Backup( user_id=user_id, host_id=host.id, content=content, description=description ) db.session.add(backup) db.session.commit() except Exception as e: flash(f'Error creating backup for host {host.hostname}: {str(e)}', 'danger') flash('Backup for all hosts created successfully.', 'success') return redirect(url_for('backups')) # ------------------- # IMPORT/EXPORT HOSTÓW # ------------------- @app.route('/export-hosts', methods=['GET']) def export_hosts(): if 'user_id' not in session: return redirect(url_for('login')) user_id = session['user_id'] hosts = Host.query.filter_by(user_id=user_id).all() si = StringIO() cw = csv.writer(si) cw.writerow(['hostname', 'username', 'password', 'port', 'type', 'auth_method', 'private_key', 'key_passphrase']) for host in hosts: cw.writerow([host.hostname, host.username, host.password, host.port, host.type, host.auth_method, host.private_key or '', host.key_passphrase or '']) output = si.getvalue() return Response(output, mimetype="text/csv", headers={"Content-Disposition": "attachment;filename=hosts.csv"}) @app.route('/import-hosts', methods=['GET', 'POST']) def import_hosts(): if 'user_id' not in session: return redirect(url_for('login')) if request.method == 'POST': file = request.files.get('file') if not file: flash('No file uploaded', 'danger') return redirect(url_for('import_hosts')) stream = StringIO(file.stream.read().decode("UTF8"), newline=None) csv_input = csv.reader(stream) header = next(csv_input) for row in csv_input: if len(row) < 8: continue hostname, username, password_val, port_str, host_type, auth_method, private_key, key_passphrase = row try: port = int(port_str) except ValueError: port = 22 host = Host( hostname=hostname, username=username, password=password_val, port=port, type=host_type, auth_method=auth_method, private_key=private_key if private_key else None, key_passphrase=key_passphrase if key_passphrase else None, user_id=session['user_id'] ) db.session.add(host) db.session.commit() flash('Hosts imported successfully', 'success') return redirect(url_for('manage_hosts')) return render_template('import_hosts.html') @app.route('/clear-host/<int:id>', methods=['GET']) def clear_host(id): if 'user_id' not in session: return redirect(url_for('login')) host = db.session.get(Host, id) if not host or host.user_id != session['user_id']: flash('Host not found or unauthorized', 'danger') return redirect(url_for('manage_hosts')) try: if host.type == 'linux': default_content = ensure_local_defaults("") clear_linux(host, default_content) elif host.type == 'mikrotik': clear_mikrotik(host) flash(f'Cleared host: {host.hostname}', 'success') except Exception as e: flash(f'Error clearing host {host.hostname}: {str(e)}', 'danger') return redirect(url_for('manage_hosts')) # ------------------- # STRONA USTAWIEŃ (SETTINGS) # ------------------- @app.route('/settings', methods=['GET', 'POST']) def settings(): if 'user_id' not in session: return redirect(url_for('login')) user_id = session['user_id'] user_settings = UserSettings.query.filter_by(user_id=user_id).first() if not user_settings: user_settings = UserSettings(user_id=user_id) db.session.add(user_settings) db.session.commit() if request.method == 'POST': auto_deploy = request.form.get('auto_deploy') deploy_interval = request.form.get('deploy_interval') backup_interval = request.form.get('backup_interval') enable_regex_entries = request.form.get('enable_regex_entries') retention_val = request.form.get('backup_retention_days', '0') user_settings.auto_deploy_enabled = bool(auto_deploy) try: user_settings.deploy_interval = int(deploy_interval) except ValueError: user_settings.deploy_interval = 60 try: user_settings.backup_interval = int(backup_interval) except ValueError: user_settings.backup_interval = 60 user_settings.regex_deploy_enabled = bool(enable_regex_entries) try: user_settings.backup_retention_days = int(retention_val) except ValueError: user_settings.backup_retention_days = 0 db.session.commit() flash('Settings updated', 'success') return redirect(url_for('dashboard')) return render_template('settings.html', settings=user_settings) @app.route('/delete-backup/<int:backup_id>', methods=['POST']) def delete_backup(backup_id): if 'user_id' not in session: return redirect(url_for('login')) backup = Backup.query.get(backup_id) if not backup or backup.user_id != session['user_id']: flash('Backup not found or unauthorized', 'danger') return redirect(url_for('backups')) db.session.delete(backup) db.session.commit() flash('Backup deleted successfully', 'info') return redirect(url_for('backups')) # ------------------- # EDYCJA LOKALNEGO PLIKU HOSTS # ------------------- @app.route('/edit-local-hosts', methods=['GET', 'POST'], endpoint='edit_local_hosts') def edit_local_hosts(): if 'user_id' not in session: return redirect(url_for('login')) user_id = session['user_id'] hostfile = HostFile.query.filter_by(user_id=user_id, title="Default Hosts").first() if not hostfile: default_content = "# This is a sample hosts file.\n127.0.0.1 localhost\n" hostfile = HostFile(user_id=user_id, title="Default Hosts", content=default_content) db.session.add(hostfile) db.session.commit() if request.method == 'POST': new_content = request.form['hosts_content'] hostfile.content = new_content db.session.commit() flash('Local hosts content updated successfully', 'success') return render_template('edit_hosts.html', content=hostfile.content) # ------------------- # DEPLOYMENT DOMYŚLNY DLA UŻYTKOWNIKA # ------------------- def deploy_user(user_id): user_settings = UserSettings.query.filter_by(user_id=user_id).first() hostfile = HostFile.query.filter_by(user_id=user_id).first() if not hostfile: return hosts_content = hostfile.content if user_settings and user_settings.regex_deploy_enabled: regex_lines = generate_regex_hosts(user_id) else: regex_lines = "" final_content = regex_lines + hosts_content hosts = Host.query.filter_by(user_id=user_id).all() for h in hosts: try: if h.type == 'linux': ssh = open_ssh_connection(h) adjusted_content = ensure_local_defaults(final_content) wrapped_content = wrap_content_with_comments(adjusted_content) with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf: tmpf.write(wrapped_content) tmp_file_path = tmpf.name sftp = ssh.open_sftp() sftp.put(tmp_file_path, '/etc/hosts') sftp.close() ssh.close() os.remove(tmp_file_path) db.session.add(DeployLog(details=f'[LINUX] Updated {h.hostname} for user {user_id}',user_id=user_id)) elif h.type == 'mikrotik': wrapped_content = wrap_mikrotik_content(final_content) deploy_mikrotik(h, wrapped_content) db.session.add(DeployLog(details=f'[MIKROTIK] Updated {h.hostname} for user {user_id}',user_id=user_id)) db.session.commit() except Exception as e: db.session.add(DeployLog(details=f'Failed to update {h.hostname}: {str(e)} for user {user_id}')) db.session.commit() def deploy_mikrotik(host, hosts_content): ssh = open_ssh_connection(host) stdin, stdout, stderr = ssh.exec_command("/ip dns static export") exported = stdout.read().decode('utf-8').splitlines() existing_dns = {} for line in exported: line = line.strip() if not line.startswith('add '): continue line = line[4:].strip() parts = line.split() address_val = None name_val = None for part in parts: if part.startswith('address='): address_val = part.replace('address=', '') elif part.startswith('name='): name_val = part.replace('name=', '') if address_val and name_val: existing_dns[name_val] = address_val desired_dns = {} for line in hosts_content.splitlines(): line = line.strip() if (not line or line.startswith('#') or 'Auto-hosts_upload:' in line or 'End_of_auto-hosts_upload:' in line): continue parts = line.split() if len(parts) < 2: continue ip_address = parts[0] hostnames = parts[1:] for hname in hostnames: desired_dns[hname] = ip_address for name_val, ip_val in desired_dns.items(): if name_val not in existing_dns: add_cmd = f"/ip dns static add address={ip_val} name={name_val}" ssh.exec_command(add_cmd) else: current_ip = existing_dns[name_val] if current_ip != ip_val: remove_cmd = f"/ip dns static remove [find where name={name_val}]" ssh.exec_command(remove_cmd) add_cmd = f"/ip dns static add address={ip_val} name={name_val}" ssh.exec_command(add_cmd) for existing_name, existing_ip in existing_dns.items(): if existing_name not in desired_dns: remove_cmd = f"/ip dns static remove [find where name={existing_name}]" ssh.exec_command(remove_cmd) ssh.close() @app.route('/deploy-now') def deploy_now(): if 'user_id' not in session: return redirect(url_for('login')) deploy_user(session['user_id']) flash('Deployment complete', 'success') return redirect(url_for('dashboard')) # ------------------- # DASHBOARD ZE STATYSTYKAMI # ------------------- @app.route('/dashboard') def dashboard(): if 'user_id' not in session: return redirect(url_for('login')) user_id = session['user_id'] user = db.session.get(User, user_id) logs = DeployLog.query.filter_by(user_id=user_id).order_by(DeployLog.timestamp.desc()).all() stats = get_statistics(user_id) for log in logs: log.details = log.details.replace(f" for user {user_id}", "") return render_template('dashboard.html', user=user, logs=logs, stats=stats) # ------------------- # SCHEDULER - AUTOMATYCZNE WDROŻENIA # ------------------- @app.route('/regex-hosts') def list_regex_hosts(): if 'user_id' not in session: return redirect(url_for('login')) user_id = session['user_id'] entries = RegexHostEntry.query.filter_by(user_id=user_id).all() return render_template('list_regex_hosts.html', entries=entries) @app.route('/regex-hosts/new', methods=['GET', 'POST']) def new_regex_host(): if 'user_id' not in session: return redirect(url_for('login')) user_id = session['user_id'] if request.method == 'POST': cidr_range = request.form.get('cidr_range', '').strip() gateway_ip = request.form.get('gateway_ip', '').strip() gateway_hostname = request.form.get('gateway_hostname', '').strip() domain_suffix = request.form.get('domain_suffix', '').strip() host_prefix = request.form.get('host_prefix', '').strip() use_gateway_ip = bool(request.form.get('use_gateway_ip')) comment = request.form.get('comment', '').strip() if not cidr_range: flash('Please provide a CIDR range (e.g. 10.87.200.0/27).', 'danger') return redirect(url_for('new_regex_host')) if not domain_suffix: domain_suffix = "domain.com" if not host_prefix: host_prefix = "ip" entry = RegexHostEntry( user_id=user_id, cidr_range=cidr_range, gateway_ip=gateway_ip, gateway_hostname=gateway_hostname, domain_suffix=domain_suffix, host_prefix=host_prefix, use_gateway_ip=use_gateway_ip, comment=comment ) db.session.add(entry) db.session.commit() flash('New CIDR entry with optional gateway has been added.', 'success') return redirect(url_for('list_regex_hosts')) return render_template('new_edit_regex_host.html', entry=None) @app.route('/regex-hosts/<int:entry_id>/edit', methods=['GET', 'POST']) def edit_regex_host(entry_id): if 'user_id' not in session: return redirect(url_for('login')) entry = db.session.get(RegexHostEntry, entry_id) if not entry or entry.user_id != session['user_id']: flash('cant find row or row is empty', 'danger') return redirect(url_for('list_regex_hosts')) if request.method == 'POST': cidr_range = request.form.get('cidr_range', '').strip() gateway_ip = request.form.get('gateway_ip', '').strip() gateway_hostname = request.form.get('gateway_hostname', '').strip() domain_suffix = request.form.get('domain_suffix', '').strip() host_prefix = request.form.get('host_prefix', '').strip() use_gateway_ip = bool(request.form.get('use_gateway_ip')) comment = request.form.get('comment', '').strip() if not cidr_range: flash('CIDR is required', 'danger') return redirect(url_for('edit_regex_host', entry_id=entry_id)) entry.cidr_range = cidr_range entry.gateway_ip = gateway_ip entry.gateway_hostname = gateway_hostname entry.domain_suffix = domain_suffix or "domain.com" entry.host_prefix = host_prefix or "ip" entry.use_gateway_ip = use_gateway_ip entry.comment = comment db.session.commit() flash('Updated', 'success') return redirect(url_for('list_regex_hosts')) return render_template('new_edit_regex_host.html', entry=entry) @app.route('/regex-hosts/<int:entry_id>/delete', methods=['POST']) def delete_regex_host(entry_id): if 'user_id' not in session: return redirect(url_for('login')) entry = db.session.get(RegexHostEntry, entry_id) if not entry or entry.user_id != session['user_id']: flash('cant find row or row is empty', 'danger') return redirect(url_for('list_regex_hosts')) db.session.delete(entry) db.session.commit() flash('Row deleted', 'info') return redirect(url_for('list_regex_hosts')) def scheduled_deployments(): with app.app_context(): now = datetime.now(timezone.utc) all_settings = UserSettings.query.filter_by(auto_deploy_enabled=True).all() for setting in all_settings: last_deploy_time = setting.last_deploy_time if last_deploy_time: last_deploy_time = last_deploy_time.replace(tzinfo=timezone.utc) if not last_deploy_time or now - last_deploy_time >= timedelta(minutes=setting.deploy_interval): deploy_user(setting.user_id) setting.last_deploy_time = now db.session.commit() scheduler = BackgroundScheduler() scheduler.add_job(func=scheduled_deployments, trigger="interval", minutes=5) scheduler.add_job(func=automated_backups, trigger="interval", minutes=60) scheduler.add_job(func=cleanup_old_backups, trigger="interval", hours=24) scheduler.start() if __name__ == '__main__': with app.app_context(): db.create_all() app.run( host='0.0.0.0', port=5580, use_reloader=False, )