#!/usr/bin/env python3
"""
CertPusher - Automated SSL Certificate Distribution Tool
Distributes SSL certificates to remote servers via SSH/SCP
Supports standard Linux servers, MikroTik RouterOS, and Proxmox VE
"""

import configparser
import logging
import sys
import os
import shlex
import ssl
import socket
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple
from urllib.parse import urlsplit

import paramiko
from scp import SCPClient
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes

# Logging configuration
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(
    level=logging.DEBUG,
    format=LOG_FORMAT,
    handlers=[
        logging.FileHandler(f'certpusher_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger('CertPusher')


class CertificateManager:
    """Manages certificate comparison and validation"""

    @staticmethod
    def get_cert_from_file(cert_path: str) -> Optional[x509.Certificate]:
        """
        Load a PEM certificate from file

        Args:
            cert_path: Path to a PEM file; for a fullchain file the first
                certificate in the file is the one returned

        Returns:
            The parsed certificate, or None if the file cannot be read or parsed
        """
        try:
            with open(cert_path, 'rb') as f:
                cert_data = f.read()
            cert = x509.load_pem_x509_certificate(cert_data, default_backend())
            logger.debug(f"Loaded certificate from {cert_path}")
            logger.debug(f"Certificate subject: {cert.subject}")
            logger.debug(f"Certificate expires: {cert.not_valid_after_utc}")
            return cert
        except Exception as e:
            logger.error(f"Failed to load certificate from {cert_path}: {e}")
            return None

    @staticmethod
    def _split_host_port(url: str, default_port: int = 443) -> Tuple[str, int]:
        """
        Extract (hostname, port) from a URL or a bare "host[:port]" string

        Raises:
            ValueError: if no hostname is present or the port is not numeric
        """
        # urlsplit only recognises the network location after "//", so bare
        # "host:port" values (accepted by earlier versions) get one prepended.
        target = url if '://' in url else f'//{url}'
        parts = urlsplit(target)
        if not parts.hostname:
            raise ValueError(f"No hostname in URL: {url!r}")
        return parts.hostname, parts.port or default_port

    @staticmethod
    def get_cert_from_url(url: str, timeout: int = 10) -> Optional[x509.Certificate]:
        """
        Retrieve the certificate currently served at an HTTPS URL

        Args:
            url: URL (or bare host[:port]) of the TLS endpoint; port defaults to 443
            timeout: Socket timeout in seconds

        Returns:
            The served leaf certificate, or None if it cannot be retrieved
        """
        try:
            hostname, port = CertificateManager._split_host_port(url)
            logger.debug(f"Connecting to {hostname}:{port} to retrieve certificate")

            # Verification is disabled on purpose: the goal is to read whatever
            # certificate is served (even expired or self-signed), not to trust it.
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

            with socket.create_connection((hostname, port), timeout=timeout) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    der_cert = ssock.getpeercert(binary_form=True)

            cert = x509.load_der_x509_certificate(der_cert, default_backend())
            logger.debug(f"Retrieved certificate from {url}")
            logger.debug(f"Certificate expires: {cert.not_valid_after_utc}")
            return cert
        except Exception as e:
            logger.warning(f"Failed to retrieve certificate from {url}: {e}")
            return None

    @staticmethod
    def compare_certificates(cert1: x509.Certificate, cert2: x509.Certificate) -> bool:
        """
        Compare two certificates by serial number and SHA-256 fingerprint

        Returns:
            True only if both serial number and fingerprint match; False on
            mismatch or if the comparison itself fails
        """
        try:
            same_serial = cert1.serial_number == cert2.serial_number
            same_fingerprint = (
                cert1.fingerprint(hashes.SHA256()) == cert2.fingerprint(hashes.SHA256())
            )
            logger.debug(f"Certificate comparison - Serial match: {same_serial}, Fingerprint match: {same_fingerprint}")
            return same_serial and same_fingerprint
        except Exception as e:
            logger.error(f"Failed to compare certificates: {e}")
            return False

    @staticmethod
    def get_certificate_info(cert: x509.Certificate) -> str:
        """
        Get human-readable certificate information

        Returns:
            A multi-line summary (subject, issuer, validity window, days left),
            or an explanatory message if the fields cannot be extracted
        """
        try:
            subject = cert.subject.rfc4514_string()
            issuer = cert.issuer.rfc4514_string()
            valid_from = cert.not_valid_before_utc
            valid_to = cert.not_valid_after_utc

            # Both datetimes are timezone-aware (UTC), so they subtract safely.
            now = datetime.now(timezone.utc)
            days_left = (valid_to - now).days

            return f"""
Certificate Info:
  Subject: {subject}
  Issuer: {issuer}
  Valid From: {valid_from}
  Valid To: {valid_to}
  Days Until Expiry: {days_left}
"""
        except Exception as e:
            return f"Unable to extract certificate info: {e}"


class SSHManager:
    """Manages SSH connections and file transfers"""

    # Key classes tried in order when loading the private key
    # (DSS removed in paramiko 3.0+)
    _KEY_TYPES = (
        ('RSA', paramiko.RSAKey),
        ('Ed25519', paramiko.Ed25519Key),
        ('ECDSA', paramiko.ECDSAKey),
    )

    def __init__(self, hostname: str, port: int, username: str, key_path: str):
        self.hostname = hostname
        self.port = port
        self.username = username
        self.key_path = key_path
        # Set by connect(); None while disconnected
        self.ssh_client: Optional[paramiko.SSHClient] = None

    def _load_private_key(self) -> Optional[paramiko.PKey]:
        """Load the private key at self.key_path, trying each supported key type"""
        for key_name, key_class in self._KEY_TYPES:
            try:
                private_key = key_class.from_private_key_file(self.key_path)
            except Exception as e:
                logger.debug(f"Not a {key_name} key: {e}")
                continue
            logger.debug(f"Successfully loaded {key_name} key")
            return private_key
        return None

    def connect(self) -> bool:
        """
        Establish SSH connection using key authentication

        Returns:
            True on success; False if the key cannot be loaded or the
            connection fails (the failure is logged, never raised)
        """
        client = None
        try:
            logger.debug(f"Connecting to {self.username}@{self.hostname}:{self.port}")
            logger.debug(f"Using SSH key: {self.key_path}")

            private_key = self._load_private_key()
            if private_key is None:
                logger.error(f"Could not load SSH key from {self.key_path}")
                return False

            client = paramiko.SSHClient()
            # NOTE(security): AutoAddPolicy accepts unknown host keys without
            # verification, so a first connection is open to man-in-the-middle.
            # Kept for compatibility with existing deployments.
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(
                hostname=self.hostname,
                port=self.port,
                username=self.username,
                pkey=private_key,
                timeout=30,
                banner_timeout=30,
                auth_timeout=30
            )
            self.ssh_client = client
            logger.info(f"✓ Successfully connected to {self.hostname}:{self.port}")
            return True
        except Exception as e:
            # Release the socket/transport of a half-open client
            if client is not None:
                client.close()
            logger.error(f"SSH connection failed to {self.hostname}:{self.port}: {e}")
            return False

    def upload_file(self, local_path: str, remote_path: str) -> bool:
        """
        Upload file via SCP, creating the remote directory first

        Args:
            local_path: File to send
            remote_path: Destination path on the remote host

        Returns:
            True if the transfer succeeded, False otherwise
        """
        try:
            logger.debug(f"Uploading {local_path} to {self.hostname}:{remote_path}")

            # Ensure remote directory exists; quoted so paths containing
            # spaces or shell metacharacters are passed through literally.
            remote_dir = os.path.dirname(remote_path)
            if remote_dir:
                self.execute_command(f"mkdir -p {shlex.quote(remote_dir)}", ignore_error=True)

            with SCPClient(self.ssh_client.get_transport()) as scp:
                scp.put(local_path, remote_path)

            logger.info(f"✓ Successfully uploaded {local_path} to {self.hostname}:{remote_path}")
            return True
        except Exception as e:
            logger.error(f"File upload failed: {e}")
            return False

    def execute_command(self, command: str, timeout: int = 60,
                        ignore_error: bool = False) -> Tuple[bool, str, str]:
        """
        Execute command on remote server

        Args:
            command: Command line to run remotely
            timeout: Channel timeout in seconds
            ignore_error: If True, a non-zero exit is logged at debug level only

        Returns:
            Tuple of (exit status was 0, stdout text, stderr text). If the
            command could not be run at all: (False, "", error message)
        """
        try:
            logger.debug(f"Executing command on {self.hostname}: {command}")
            _stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout)

            # Drain the output BEFORE waiting for the exit status: with large
            # output the remote side blocks on a full channel window and
            # recv_exit_status() would never return.
            stdout_text = stdout.read().decode('utf-8', errors='ignore')
            stderr_text = stderr.read().decode('utf-8', errors='ignore')
            exit_status = stdout.channel.recv_exit_status()

            if exit_status == 0:
                logger.info(f"✓ Command executed successfully on {self.hostname}")
                if stdout_text:
                    logger.debug(f"STDOUT: {stdout_text}")
            elif not ignore_error:
                logger.error(f"Command failed with exit code {exit_status}")
                logger.error(f"STDERR: {stderr_text}")
            else:
                logger.debug(f"Command failed (ignored): {stderr_text}")

            return exit_status == 0, stdout_text, stderr_text
        except Exception as e:
            logger.error(f"Command execution failed: {e}")
            return False, "", str(e)

    def disconnect(self):
        """Close SSH connection (safe to call more than once)"""
        if self.ssh_client:
            self.ssh_client.close()
            self.ssh_client = None
            logger.debug(f"Disconnected from {self.hostname}")


class MikroTikManager(SSHManager):
    """Specialized manager for MikroTik RouterOS devices"""

    # (regex, strptime format) pairs for the "invalid-after" field.
    # First: legacy format, e.g. "jan/24/2026 08:34:12".
    # Second: ISO format, e.g. "2026-01-24 08:34:12".
    _EXPIRY_FORMATS = (
        (re.compile(r'invalid-after:\s+([a-zA-Z]{3}/\d{2}/\d{4}\s+\d{2}:\d{2}:\d{2})'),
         '%b/%d/%Y %H:%M:%S'),
        (re.compile(r'invalid-after:\s+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})'),
         '%Y-%m-%d %H:%M:%S'),
    )

    def __init__(self, hostname: str, port: int, username: str, key_path: str):
        super().__init__(hostname, port, username, key_path)
        self.cert_name = "ssl-cert"
        self.key_name = "ssl-key"
        # True when the most recent upload_certificate() call found the
        # device certificate current and skipped the upload.
        self.last_upload_skipped = False

    def check_certificate_expiry(self, source_cert: x509.Certificate) -> bool:
        """
        Check if certificate on MikroTik needs update

        Compares the "invalid-after" date reported by the device with the
        expiry of the source certificate. Any problem reading or parsing the
        device output results in "upload needed".

        Returns True if upload needed, False if current cert is OK
        """
        try:
            logger.info("Checking certificate on MikroTik")

            # Get certificate details from MikroTik
            success, stdout, _stderr = self.execute_command(
                f'/certificate print detail where name~"{self.cert_name}"',
                ignore_error=True
            )
            if not success or not stdout:
                logger.info("No certificate found on MikroTik or cannot read it. Upload needed.")
                return True

            logger.debug(f"MikroTik certificate info:\n{stdout}")

            # Parse expiry date from the "invalid-after" field
            expiry_match = None
            expiry_format = None
            for pattern, date_format in self._EXPIRY_FORMATS:
                expiry_match = pattern.search(stdout)
                if expiry_match:
                    expiry_format = date_format
                    break
            if not expiry_match:
                logger.warning("Could not parse certificate expiry date from MikroTik. Proceeding with upload.")
                return True

            mikrotik_expiry_str = expiry_match.group(1)
            logger.debug(f"MikroTik certificate expires: {mikrotik_expiry_str}")

            try:
                mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, expiry_format)
                # NOTE(review): the device time is treated as UTC; the 24 h
                # tolerance below absorbs a local-time offset.
                mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc)
            except Exception as e:
                logger.warning(f"Could not parse MikroTik date: {e}. Proceeding with upload.")
                return True

            source_expiry = source_cert.not_valid_after_utc

            # The fingerprint is logged for diagnostics only; it does not
            # take part in the decision.
            fingerprint_match = re.search(r'fingerprint:\s+([a-f0-9]+)', stdout)
            if fingerprint_match:
                logger.debug(f"MikroTik certificate fingerprint: {fingerprint_match.group(1)}")

            # Compare expiry dates (allowing 1 day tolerance for timezone differences)
            time_diff = abs((source_expiry - mikrotik_expiry).total_seconds())
            if time_diff < 86400:  # Less than 24 hours difference
                logger.info("✓ Certificate on MikroTik is current. Skipping upload.")
                return False

            logger.info(f"Certificate on MikroTik differs. Source expires: {source_expiry}, MikroTik expires: {mikrotik_expiry}")
            return True
        except Exception as e:
            logger.warning(f"Error checking MikroTik certificate: {e}. Proceeding with upload.")
            return True

    def upload_certificate(self, cert_path: str, key_path: Optional[str] = None,
                           check_first: bool = True,
                           source_cert: Optional[x509.Certificate] = None) -> bool:
        """
        Upload and import certificate to MikroTik RouterOS

        Args:
            cert_path: Path to certificate file (PEM format, can be fullchain)
            key_path: Optional path to private key file
            check_first: If True, check existing certificate before uploading
            source_cert: Source certificate object for comparison

        Returns:
            True if the certificate was deployed or was already current
            (self.last_upload_skipped tells the two apart), False on failure
        """
        self.last_upload_skipped = False
        try:
            logger.info(f"Starting MikroTik certificate deployment to {self.hostname}")

            # Check if upload is needed
            if check_first and source_cert:
                if not self.check_certificate_expiry(source_cert):
                    self.last_upload_skipped = True
                    return True  # Certificate is current, no upload needed

            # Step 1: Disable www-ssl service
            logger.debug("Disabling www-ssl service")
            self.execute_command('/ip service disable www-ssl', ignore_error=True)

            # Step 2: Remove old certificates (best effort; they may not exist)
            logger.debug("Removing old certificates")
            cleanup_commands = [
                f'/certificate remove [find name~"{self.cert_name}"]',
                f'/file remove "{self.cert_name}.pem"',
            ]
            if key_path:
                cleanup_commands.append(f'/file remove "{self.key_name}.pem"')
            for cmd in cleanup_commands:
                self.execute_command(cmd, ignore_error=True)

            # Step 3: Upload certificate file
            logger.debug("Uploading certificate to MikroTik")
            with SCPClient(self.ssh_client.get_transport()) as scp:
                scp.put(cert_path, f'{self.cert_name}.pem')
            logger.info("✓ Certificate file uploaded")

            # Step 4: Upload private key if provided
            if key_path:
                logger.debug("Uploading private key to MikroTik")
                with SCPClient(self.ssh_client.get_transport()) as scp:
                    scp.put(key_path, f'{self.key_name}.pem')
                logger.info("✓ Private key file uploaded")

            # Step 5: Import certificate
            logger.debug("Importing certificate into RouterOS")
            import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""'
            success, _stdout, stderr = self.execute_command(import_cmd, timeout=30)
            if not success and "failure" in stderr.lower():
                logger.error(f"Certificate import failed: {stderr}")
                return False

            # Step 5b: Import the private key so RouterOS can pair it with the
            # certificate; uploading the key file alone does not install it.
            if key_path:
                logger.debug("Importing private key into RouterOS")
                key_import_cmd = f'/certificate import file-name={self.key_name}.pem passphrase=""'
                key_ok, _key_out, key_err = self.execute_command(
                    key_import_cmd, timeout=30, ignore_error=True
                )
                if not key_ok:
                    logger.warning(f"Private key import reported an error: {key_err}")

            # Give RouterOS time to process
            time.sleep(2)

            # Step 6: Verify certificate was imported
            success, stdout, _stderr = self.execute_command(
                f'/certificate print where name~"{self.cert_name}"'
            )
            if success and stdout:
                logger.debug(f"Certificates after import:\n{stdout}")

            # Step 7: Configure services to use new certificate
            logger.info("Configuring www-ssl service to use new certificate")
            config_commands = [
                f'/ip service set www-ssl certificate={self.cert_name}_0',
                '/ip service enable www-ssl',
            ]
            for cmd in config_commands:
                cmd_ok, _out, cmd_err = self.execute_command(cmd, ignore_error=True)
                if not cmd_ok:
                    # Still best effort, but no longer silent
                    logger.warning(f"Service configuration command failed: {cmd} ({cmd_err})")

            logger.info(f"✓ Successfully deployed certificate to MikroTik {self.hostname}")
            return True
        except Exception as e:
            logger.error(f"MikroTik certificate deployment failed: {e}", exc_info=True)
            return False

    def verify_certificate(self) -> bool:
        """
        Verify certificate is properly installed

        Returns:
            True if the device lists a certificate matching self.cert_name
        """
        try:
            logger.debug("Verifying certificate installation")
            success, stdout, _stderr = self.execute_command(
                f'/certificate print detail where name~"{self.cert_name}"'
            )
            if success and stdout:
                logger.info(f"Certificate verification:\n{stdout}")
                return True
            logger.warning("Could not verify certificate installation")
            return False
        except Exception as e:
            logger.error(f"Certificate verification failed: {e}")
            return False


class ProxmoxManager(SSHManager):
    """Specialized manager for Proxmox VE servers"""

    def upload_certificate(self, cert_path: str, key_path: str) -> bool:
        """
        Upload certificate to Proxmox VE

        Proxmox uses two separate files:
        - /etc/pve/local/pveproxy-ssl.pem (certificate)
        - /etc/pve/local/pveproxy-ssl.key (private key)

        Args:
            cert_path: Path to certificate file (fullchain)
            key_path: Path to private key file

        Returns:
            True if both files were uploaded and pveproxy is active after the
            restart, False otherwise
        """
        try:
            logger.info(f"Starting Proxmox certificate deployment to {self.hostname}")

            # Step 1: Upload certificate
            logger.debug("Uploading certificate to Proxmox")
            if not self.upload_file(cert_path, '/etc/pve/local/pveproxy-ssl.pem'):
                return False

            # Step 2: Upload private key
            logger.debug("Uploading private key to Proxmox")
            if not self.upload_file(key_path, '/etc/pve/local/pveproxy-ssl.key'):
                return False

            # Step 3: Set correct permissions. Failures are logged by
            # execute_command but do not abort the deployment.
            # NOTE(review): confirm chmod/chown are permitted under /etc/pve.
            logger.debug("Setting file permissions")
            permission_commands = [
                'chmod 640 /etc/pve/local/pveproxy-ssl.key',
                'chown root:www-data /etc/pve/local/pveproxy-ssl.key',
            ]
            for cmd in permission_commands:
                self.execute_command(cmd, ignore_error=False)

            # Step 4: Restart pveproxy
            logger.info("Restarting pveproxy service")
            success, _stdout, stderr = self.execute_command('systemctl restart pveproxy', timeout=30)
            if not success:
                logger.error(f"Failed to restart pveproxy: {stderr}")
                return False

            # Step 5: Verify service is running
            time.sleep(3)
            success, stdout, _stderr = self.execute_command('systemctl is-active pveproxy')
            # Exact comparison: a substring test would also match "inactive"
            if success and stdout.strip() == 'active':
                logger.info(f"✓ Successfully deployed certificate to Proxmox {self.hostname}")
                return True

            logger.error("pveproxy service is not active after restart")
            # Show journal logs for debugging
            self.execute_command('journalctl -u pveproxy -n 20 --no-pager')
            return False
        except Exception as e:
            logger.error(f"Proxmox certificate deployment failed: {e}", exc_info=True)
            return False


class CertPusher:
    """Main application class"""

    def __init__(self, config_file: str):
        self.config_file = config_file
        self.config = configparser.ConfigParser()
        self.cert_manager = CertificateManager()
        # Per-run counters reported in the deployment summary
        self.stats = {
            'total': 0,
            'uploaded': 0,
            'skipped': 0,
            'failed': 0
        }

    def load_config(self) -> bool:
        """
        Load configuration from INI file

        Returns:
            True if the file was read and contains a [global] section with
            the required options, False otherwise
        """
        try:
            logger.info(f"Loading configuration from {self.config_file}")
            # ConfigParser.read() skips unreadable files silently and returns
            # the list of files it actually parsed.
            if not self.config.read(self.config_file):
                logger.error(f"Could not read configuration file: {self.config_file}")
                return False

            if 'global' not in self.config:
                logger.error("Missing [global] section in config file")
                return False

            required_global = ['source_cert_path', 'default_ssh_key']
            for key in required_global:
                if not self.config.has_option('global', key):
                    logger.error(f"Missing required global option: {key}")
                    return False

            logger.info("✓ Configuration loaded successfully")
            logger.debug(f"Found {len(self.config.sections()) - 1} host(s) in configuration")
            return True
        except Exception as e:
            logger.error(f"Failed to load configuration: {e}")
            return False

    def get_key_path(self, section: str, cert_path: str) -> str:
        """
        Get private key path for certificate

        Priority: section-specific > global > derived from cert path
        """
        # Check section-specific key
        if self.config.has_option(section, 'source_key_path'):
            return self.config.get(section, 'source_key_path')

        # Check global key
        if self.config.has_option('global', 'source_key_path'):
            return self.config.get('global', 'source_key_path')

        # Derive from certificate path
        key_path = cert_path.replace('fullchain.pem', 'privkey.pem').replace('cert.pem', 'privkey.pem')
        logger.debug(f"Derived key path: {key_path}")
        return key_path

    def process_mikrotik(self, section: str, hostname: str, port: int, username: str,
                         ssh_key: str, source_cert_path: str) -> bool:
        """
        Process MikroTik device specifically

        Updates self.stats ('uploaded', 'skipped' or 'failed') for this host.

        Returns:
            True if the certificate was deployed or already current
        """
        try:
            logger.info("Using MikroTik-specific deployment method")

            # Get private key path
            source_key_path = self.get_key_path(section, source_cert_path)
            logger.info(f"Certificate: {source_cert_path}")
            logger.info(f"Private key: {source_key_path}")

            if not os.path.exists(source_key_path):
                logger.error(f"Private key file not found: {source_key_path}")
                self.stats['failed'] += 1
                return False

            # Load source certificate for comparison
            source_cert = self.cert_manager.get_cert_from_file(source_cert_path)

            # Check if certificate check is enabled
            check_first = self.config.getboolean(section, 'check_before_upload', fallback=True)

            # Connect
            mikrotik = MikroTikManager(hostname, port, username, ssh_key)
            if not mikrotik.connect():
                self.stats['failed'] += 1
                return False

            try:
                # Upload and import certificate (with optional check)
                deployed = mikrotik.upload_certificate(
                    source_cert_path, source_key_path, check_first, source_cert
                )
                skipped = mikrotik.last_upload_skipped
                if deployed:
                    # Verify installation
                    mikrotik.verify_certificate()
            finally:
                # Always release the SSH session, even on unexpected errors
                mikrotik.disconnect()

            if not deployed:
                self.stats['failed'] += 1
                return False

            # The manager records whether it skipped; querying the device
            # cannot tell, since the certificate exists in both cases.
            self.stats['skipped' if skipped else 'uploaded'] += 1
            logger.info(f"✓ Successfully processed MikroTik {section}")
            return True
        except Exception as e:
            logger.error(f"MikroTik processing failed: {e}", exc_info=True)
            self.stats['failed'] += 1
            return False

    def process_proxmox(self, section: str, hostname: str, port: int, username: str,
                        ssh_key: str, source_cert_path: str) -> bool:
        """
        Process Proxmox VE server specifically

        Updates self.stats ('uploaded' or 'failed') for this host.

        Returns:
            True if the certificate was deployed
        """
        try:
            logger.info("Using Proxmox-specific deployment method")

            # Get private key path
            source_key_path = self.get_key_path(section, source_cert_path)
            logger.info(f"Certificate: {source_cert_path}")
            logger.info(f"Private key: {source_key_path}")

            if not os.path.exists(source_key_path):
                logger.error(f"Private key file not found: {source_key_path}")
                self.stats['failed'] += 1
                return False

            # Connect
            proxmox = ProxmoxManager(hostname, port, username, ssh_key)
            if not proxmox.connect():
                self.stats['failed'] += 1
                return False

            try:
                deployed = proxmox.upload_certificate(source_cert_path, source_key_path)
            finally:
                # Always release the SSH session, even on unexpected errors
                proxmox.disconnect()

            if not deployed:
                self.stats['failed'] += 1
                return False

            self.stats['uploaded'] += 1
            logger.info(f"✓ Successfully processed Proxmox {section}")
            return True
        except Exception as e:
            logger.error(f"Proxmox processing failed: {e}", exc_info=True)
            self.stats['failed'] += 1
            return False

    def _deploy_standard(self, ssh: SSHManager, section: str, source_cert_path: str,
                         remote_cert_path: str, post_upload_command: str) -> bool:
        """
        Upload certificate, optional key and extra files over an open SSH session

        Returns:
            False only if the certificate upload fails or the private key file
            is missing locally; other failures are logged as warnings
        """
        # Upload certificate
        if not ssh.upload_file(source_cert_path, remote_cert_path):
            return False

        # Upload private key if remote_key_path is specified
        if self.config.has_option(section, 'remote_key_path'):
            remote_key_path = self.config.get(section, 'remote_key_path')
            source_key_path = self.get_key_path(section, source_cert_path)
            logger.info(f"Remote key path: {remote_key_path}")
            logger.info(f"Uploading private key: {source_key_path} -> {remote_key_path}")

            if not os.path.exists(source_key_path):
                logger.error(f"Private key file not found: {source_key_path}")
                return False

            if not ssh.upload_file(source_key_path, remote_key_path):
                # Continue anyway - key might be uploaded via additional_files
                logger.warning(f"Failed to upload private key to {remote_key_path}")

        # Upload additional files if specified
        # Format: local_path:remote_path,local_path:remote_path
        if self.config.has_option(section, 'additional_files'):
            additional_files = self.config.get(section, 'additional_files')
            for file_pair in additional_files.split(','):
                if ':' not in file_pair:
                    continue
                local, remote = (part.strip() for part in file_pair.split(':', 1))
                logger.info(f"Uploading additional file: {local} -> {remote}")
                if not ssh.upload_file(local, remote):
                    logger.warning(f"Failed to upload additional file: {local}")

        # Execute post-upload command
        if post_upload_command:
            logger.info(f"Executing post-upload command: {post_upload_command}")
            success, _stdout, _stderr = ssh.execute_command(post_upload_command)
            if not success:
                logger.warning("Post-upload command failed, but file was uploaded successfully")
            else:
                logger.info("✓ Post-upload command completed successfully")

        return True

    def process_host(self, section: str) -> bool:
        """
        Process certificate deployment for a single host

        Reads the host's settings from its config section, dispatches to the
        MikroTik/Proxmox handlers when 'type' says so, and otherwise performs
        a standard SCP deployment. Updates self.stats for the host.

        Returns:
            True if the host was deployed to or was already up to date
        """
        try:
            logger.info(f"\n{'='*60}")
            logger.info(f"Processing host: {section}")
            logger.info(f"{'='*60}")

            self.stats['total'] += 1

            # Get configuration
            hostname = self.config.get(section, 'hostname')
            port = self.config.getint(section, 'port', fallback=22)
            username = self.config.get(section, 'username', fallback='root')
            device_type = self.config.get(section, 'type', fallback='standard')

            # Determine SSH key to use
            if self.config.has_option(section, 'ssh_key_path'):
                ssh_key = self.config.get(section, 'ssh_key_path')
            else:
                ssh_key = self.config.get('global', 'default_ssh_key')

            # Allow per-host certificate override
            if self.config.has_option(section, 'source_cert_path'):
                source_cert_path = self.config.get(section, 'source_cert_path')
                logger.info(f"Using host-specific certificate: {source_cert_path}")
            else:
                source_cert_path = self.config.get('global', 'source_cert_path')
                logger.debug(f"Using global certificate: {source_cert_path}")

            # Verify certificate exists
            if not os.path.exists(source_cert_path):
                logger.error(f"Certificate file not found: {source_cert_path}")
                self.stats['failed'] += 1
                return False

            logger.info(f"Host: {hostname}:{port}")
            logger.info(f"Type: {device_type}")
            logger.info(f"Username: {username}")
            logger.info(f"SSH Key: {ssh_key}")

            # Handle device-specific deployments
            if device_type.lower() == 'mikrotik':
                return self.process_mikrotik(section, hostname, port, username, ssh_key, source_cert_path)
            elif device_type.lower() == 'proxmox':
                return self.process_proxmox(section, hostname, port, username, ssh_key, source_cert_path)

            # Standard processing for other devices
            remote_cert_path = self.config.get(section, 'remote_cert_path')
            post_upload_command = self.config.get(section, 'post_upload_command', fallback='')
            check_url = self.config.get(section, 'check_url', fallback='')

            logger.info(f"Remote certificate path: {remote_cert_path}")

            # Check if upload is needed
            if check_url:
                logger.info(f"Checking current certificate at: {check_url}")
                source_cert = self.cert_manager.get_cert_from_file(source_cert_path)
                remote_cert = self.cert_manager.get_cert_from_url(check_url)

                if source_cert and remote_cert:
                    if self.cert_manager.compare_certificates(source_cert, remote_cert):
                        logger.info(f"✓ Certificate on {hostname} is already up to date. Skipping upload.")
                        self.stats['skipped'] += 1
                        return True
                    logger.info(f"Certificate on {hostname} is outdated. Upload needed.")
                    logger.debug(self.cert_manager.get_certificate_info(source_cert))
                else:
                    logger.warning("Could not compare certificates. Proceeding with upload.")

            # Connect and upload
            ssh = SSHManager(hostname, port, username, ssh_key)
            if not ssh.connect():
                self.stats['failed'] += 1
                return False

            try:
                deployed = self._deploy_standard(
                    ssh, section, source_cert_path, remote_cert_path, post_upload_command
                )
            finally:
                # Always release the SSH session, even on unexpected errors
                ssh.disconnect()

            if not deployed:
                self.stats['failed'] += 1
                return False

            self.stats['uploaded'] += 1
            logger.info(f"✓ Successfully processed {section}")
            return True
        except Exception as e:
            logger.error(f"Failed to process host {section}: {e}", exc_info=True)
            self.stats['failed'] += 1
            return False

    def run(self):
        """
        Main execution method

        Loads the configuration, processes every non-global section as a
        host and logs a summary. Exits the process with status 1 if the
        configuration or global source certificate is unusable, or if any
        host failed.
        """
        logger.info("="*60)
        logger.info(" CertPusher - SSL Certificate Distribution Tool")
        logger.info("="*60)
        logger.info(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info("")

        if not self.load_config():
            logger.error("Configuration loading failed. Exiting.")
            sys.exit(1)

        # Verify source certificate exists
        source_cert = self.config.get('global', 'source_cert_path')
        if not os.path.exists(source_cert):
            logger.error(f"Source certificate not found: {source_cert}")
            sys.exit(1)

        logger.info(f"Source certificate: {source_cert}")

        # Display certificate info
        cert = self.cert_manager.get_cert_from_file(source_cert)
        if cert:
            logger.info(self.cert_manager.get_certificate_info(cert))

        # Process each host
        for section in self.config.sections():
            if section == 'global':
                continue
            try:
                self.process_host(section)
            except Exception as e:
                logger.error(f"Unexpected error processing {section}: {e}", exc_info=True)
                self.stats['failed'] += 1

        # Print summary
        logger.info("\n" + "="*60)
        logger.info(" DEPLOYMENT SUMMARY")
        logger.info("="*60)
        logger.info(f"Total hosts: {self.stats['total']}")
        logger.info(f"✓ Uploaded: {self.stats['uploaded']}")
        logger.info(f"○ Skipped (up to date): {self.stats['skipped']}")
        logger.info(f"✗ Failed: {self.stats['failed']}")
        logger.info("="*60)
        logger.info(f"Finished at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        if self.stats['failed'] > 0:
            sys.exit(1)


def main():
    """
    Entry point

    Expects the configuration file path as the only command-line argument.
    Exit codes: 1 for usage/config errors or failed hosts, 130 on Ctrl-C.
    """
    # The box is generated so its borders stay aligned with the text
    width = 59
    print("\n".join([
        "",
        f"╔{'═' * width}╗",
        f"║{'CertPusher v1.0':^{width}}║",
        f"║{'Automated SSL Certificate Distribution Tool':^{width}}║",
        f"╚{'═' * width}╝",
        "",
    ]))

    if len(sys.argv) < 2:
        print("Usage: python certpusher.py <config_file>")
        print("Example: python certpusher.py config.ini")
        print("")
        sys.exit(1)

    config_file = sys.argv[1]
    if not os.path.exists(config_file):
        print(f"Error: Configuration file '{config_file}' not found")
        sys.exit(1)

    try:
        pusher = CertPusher(config_file)
        pusher.run()
    except KeyboardInterrupt:
        print("\n\nInterrupted by user. Exiting...")
        sys.exit(130)
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)


if __name__ == '__main__':
    main()