nowa funkcja / domyslne wpisy

This commit is contained in:
Mateusz Gruszczyński
2025-03-10 12:04:22 +01:00
parent 7fb796357f
commit 9cffcb0ca6
7 changed files with 363 additions and 82 deletions

149
app.py
View File

@@ -48,6 +48,7 @@ class Host(db.Model):
daemon_url = db.Column(db.String(255), nullable=True)
daemon_token = db.Column(db.String(255), nullable=True)
disable_regex_deploy = db.Column(db.Boolean, default=False)
disable_local_default = db.Column(db.Boolean, default=False)
@property
def resolved_hostname(self):
@@ -81,11 +82,13 @@ class DeployLog(db.Model):
details = db.Column(db.Text, nullable=False)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
user = db.relationship('User', backref='deploy_logs', lazy=True)
class HostFile(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
title = db.Column(db.String(100), nullable=False, default='Default Hosts')
content = db.Column(db.Text, nullable=False)
class UserSettings(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), unique=True, nullable=False)
@@ -100,6 +103,7 @@ class UserSettings(db.Model):
backup_retention_days = db.Column(db.Integer, default=0)
global_ssh_key = db.Column(db.Text, nullable=True)
global_key_passphrase = db.Column(db.String(200), nullable=True)
class Backup(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
@@ -128,18 +132,60 @@ class HostFileVersion(db.Model):
timestamp = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
hostfile = db.relationship('HostFile', backref=db.backref('versions', lazy=True))
class LocalDefaultEntry(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
ip_address = db.Column(db.String(50), nullable=False)
hostname = db.Column(db.String(255), nullable=False)
dynamic_variable = db.Column(db.String(255), nullable=True)
entry = db.Column(db.Text, nullable=False) # To pole już istnieje
user = db.relationship('User', backref='local_defaults')
def formatted_entry(self, variables={}):
entry_content = f"{self.ip_address} {self.hostname}"
if self.dynamic_variable:
dynamic_content = self.dynamic_variable
for key, value in variables.items():
placeholder = f"${{{key}}}"
dynamic_content = dynamic_content.replace(placeholder, value)
entry_content += f" {dynamic_content}"
return entry_content
class UserDynamicVariables(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
variable_name = db.Column(db.String(255), nullable=False)
variable_value = db.Column(db.String(255), nullable=False)
user = db.relationship('User', backref='dynamic_variables')
# Funkcje pomocnicze
def ensure_local_defaults(content):
required_lines = [
"127.0.0.1 localhost",
"::1 localhost ip6-localhost ip6-loopback",
"255.255.255.255 broadcasthost"
]
def get_user_dynamic_variables(user_id):
user_variables = UserDynamicVariables.query.filter_by(user_id=user_id).all()
return {var.variable_name: var.variable_value for var in user_variables}
def ensure_local_defaults(content, user_id):
default_entries = LocalDefaultEntry.query.filter_by(user_id=user_id).all()
required_lines = [entry.hostname for entry in default_entries]
lines = [l.rstrip() for l in content.splitlines()]
lines = [line for line in lines if line not in required_lines]
lines = required_lines + lines
return "\n".join(lines) + "\n"
def get_user_dynamic_variables(user_id):
user_variables = UserDynamicVariables.query.filter_by(user_id=user_id).all()
variables = {var.variable_name: var.variable_value for var in user_variables}
# Automatyczne dodanie wartości systemowych
variables["hostname"] = socket.gethostname() # Nazwa hosta
variables["resolved_hostname"] = socket.getfqdn() # Pełna nazwa hosta
return variables
def wrap_content_with_comments(content):
now_str = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
header_comment = f"# Auto-hosts upload: {now_str}\n"
@@ -1193,7 +1239,11 @@ def deploy_user(user_id):
chosen_file = default_file
else:
chosen_file = default_file
final_content = ("" if h.disable_regex_deploy else regex_lines) + chosen_file.content
# Nowa logika: jeśli `disable_local_default` jest włączone, pomijamy lokalne ustawienia
final_content = ("" if h.disable_regex_deploy else regex_lines) + \
("" if h.disable_local_default else ensure_local_defaults(chosen_file.content))
try:
if h.type == 'mikrotik':
wrapped_content = wrap_mikrotik_content(final_content)
@@ -1202,7 +1252,7 @@ def deploy_user(user_id):
db.session.add(DeployLog(details=log_details, user_id=user_id))
elif h.use_daemon and h.type == 'linux':
import requests
adjusted_content = ensure_local_defaults(final_content)
adjusted_content = ensure_local_defaults(final_content) if not h.disable_local_default else final_content
wrapped_content = wrap_content_with_comments(adjusted_content)
url = h.daemon_url.rstrip('/') + '/hosts'
headers = {"Authorization": h.daemon_token}
@@ -1213,7 +1263,7 @@ def deploy_user(user_id):
db.session.add(DeployLog(details=log_details, user_id=user_id))
else:
ssh = open_ssh_connection(h)
adjusted_content = ensure_local_defaults(final_content)
adjusted_content = ensure_local_defaults(final_content) if not h.disable_local_default else final_content
wrapped_content = wrap_content_with_comments(adjusted_content)
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmpf:
tmpf.write(wrapped_content)
@@ -1230,7 +1280,6 @@ def deploy_user(user_id):
db.session.add(DeployLog(details=f'Failed to update {format_host(h)}: {str(e)} for user {user_id}', user_id=user_id))
db.session.commit()
def deploy_mikrotik(host, hosts_content):
ssh = open_ssh_connection(host)
stdin, stdout, stderr = ssh.exec_command("/ip dns static export")
@@ -1447,6 +1496,8 @@ def update_host_automation(id):
host.auto_backup_enabled = enabled
elif setting == 'disable_regex':
host.disable_regex_deploy = enabled
elif setting == 'disable_local_default':
host.disable_local_default = enabled
db.session.commit()
flash('Ustawienia automatyzacji zostały zaktualizowane.', 'success')
return redirect(url_for('server_list'))
@@ -1659,6 +1710,84 @@ def internal_server_error(error):
if app.debug:
return render_template("500.html", error=error), 500
@app.route('/local-defaults', methods=['GET', 'POST'])
def local_defaults():
if 'user_id' not in session:
return redirect(url_for('login'))
user_id = session['user_id']
if request.method == 'POST':
hostname = request.form.get('hostname', '').strip()
ip_address = request.form.get('ip_address', '').strip()
dynamic_variable = request.form.get('dynamic_variable', '').strip()
if hostname and ip_address:
entry_content = f"{ip_address} {hostname} {dynamic_variable}".strip()
new_entry = LocalDefaultEntry(user_id=user_id, hostname=hostname, ip_address=ip_address, dynamic_variable=dynamic_variable or None, entry=entry_content)
db.session.add(new_entry)
db.session.commit()
flash('Dodano nowy wpis.', 'success')
else:
flash('Hostname i adres IP są wymagane.', 'danger')
return redirect(url_for('local_defaults'))
entries = LocalDefaultEntry.query.filter_by(user_id=user_id).all()
return render_template('local_defaults.html', entries=entries)
@app.route('/local-defaults/delete/<int:entry_id>', methods=['POST'])
def delete_local_default(entry_id):
if 'user_id' not in session:
return redirect(url_for('login'))
entry = LocalDefaultEntry.query.get(entry_id)
if not entry or entry.user_id != session['user_id']:
flash('Wpis nie istnieje lub brak uprawnień.', 'danger')
return redirect(url_for('local_defaults'))
db.session.delete(entry)
db.session.commit()
flash('Wpis został usunięty.', 'info')
return redirect(url_for('local_defaults'))
@app.route('/dynamic-variables', methods=['GET', 'POST'])
def dynamic_variables():
if 'user_id' not in session:
return redirect(url_for('login'))
user_id = session['user_id']
if request.method == 'POST':
variable_name = request.form.get('variable_name', '').strip()
variable_value = request.form.get('variable_value', '').strip()
if variable_name and variable_value:
new_variable = UserDynamicVariables(user_id=user_id, variable_name=variable_name, variable_value=variable_value)
db.session.add(new_variable)
db.session.commit()
flash('Dodano nową zmienną dynamiczną.', 'success')
else:
flash('Nazwa i wartość zmiennej są wymagane.', 'danger')
return redirect(url_for('dynamic_variables'))
variables = UserDynamicVariables.query.filter_by(user_id=user_id).all()
return render_template('dynamic_variables.html', variables=variables)
@app.route('/dynamic-variables/delete/<int:variable_id>', methods=['POST'])
def delete_dynamic_variable(variable_id):
if 'user_id' not in session:
return redirect(url_for('login'))
variable = UserDynamicVariables.query.get(variable_id)
if not variable or variable.user_id != session['user_id']:
flash('Nie znaleziono zmiennej lub brak uprawnień.', 'danger')
return redirect(url_for('dynamic_variables'))
db.session.delete(variable)
db.session.commit()
flash('Zmienna została usunięta.', 'info')
return redirect(url_for('dynamic_variables'))
scheduler = BackgroundScheduler(timezone=get_localzone())
scheduler.add_job(func=scheduled_deployments, trigger="interval", minutes=1, next_run_time=datetime.now())
scheduler.add_job(func=automated_backups, trigger="interval", minutes=1, next_run_time=datetime.now())