#!/usr/bin/env python3 """ CertPusher - Automated SSL Certificate Distribution Tool Version 1.1 - With serial number normalization """ import configparser import logging import sys import os import ssl import socket import re import argparse from datetime import datetime, timezone from pathlib import Path from typing import Dict, Optional, Tuple import paramiko from scp import SCPClient import requests from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes # Create logs directory LOG_DIR = Path(__file__).parent / 'logs' LOG_DIR.mkdir(exist_ok=True) LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' LOG_FILE = LOG_DIR / f'certpusher_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' def setup_logging(debug: bool = False): """Setup logging with configurable level""" log_level = logging.DEBUG if debug else logging.INFO logging.basicConfig( level=log_level, format=LOG_FORMAT, handlers=[ logging.FileHandler(LOG_FILE), logging.StreamHandler(sys.stdout) ] ) logging.getLogger('paramiko').setLevel(logging.WARNING) logging.getLogger('paramiko.transport').setLevel(logging.WARNING) logger = logging.getLogger('CertPusher') def normalize_serial(serial: str) -> str: """ Normalize certificate serial number Removes leading zeros, colons, spaces and converts to uppercase Examples: '05BC46A0...' -> '5BC46A0...' '0005BC...' -> '5BC...' '00' -> '0' """ # Remove formatting characters normalized = serial.upper().replace(':', '').replace(' ', '').replace('-', '') # Remove leading zeros but keep at least one digit normalized = normalized.lstrip('0') or '0' return normalized class CertificateManager: """Manages certificate comparison and validation""" @staticmethod def get_cert_from_file(cert_path: str) -> Optional[x509.Certificate]: """Load certificate from file""" try: with open(cert_path, 'rb') as f: cert_data = f.read() cert = x509.load_pem_x509_certificate(cert_data, default_backend()) logger.debug(f"Loaded certificate from {cert_path}") return cert except Exception as e: logger.error(f"Failed to load certificate from {cert_path}: {e}") return None @staticmethod def get_cert_from_url(url: str, timeout: int = 10) -> Optional[x509.Certificate]: """Retrieve certificate from HTTPS URL""" try: hostname = url.replace('https://', '').replace('http://', '').split('/')[0].split(':')[0] port = 443 if ':' in url.replace('https://', '').replace('http://', '').split('/')[0]: port = int(url.replace('https://', '').replace('http://', '').split('/')[0].split(':')[1]) logger.debug(f"Checking certificate at {hostname}:{port}") context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE with socket.create_connection((hostname, port), timeout=timeout) as sock: with context.wrap_socket(sock, server_hostname=hostname) as ssock: der_cert = ssock.getpeercert(binary_form=True) cert = x509.load_der_x509_certificate(der_cert, default_backend()) return cert except Exception as e: logger.warning(f"Failed to retrieve certificate from {url}: {e}") return None @staticmethod def compare_certificates(cert1: x509.Certificate, cert2: x509.Certificate) -> bool: """Compare two certificates by normalized serial number""" try: serial1 = normalize_serial(format(cert1.serial_number, 'X').upper()) serial2 = normalize_serial(format(cert2.serial_number, 'X').upper()) logger.debug(f"Comparing normalized serials: {serial1} vs {serial2}") return serial1 == serial2 except Exception as e: logger.error(f"Failed to compare certificates: {e}") return False @staticmethod def get_certificate_info(cert: x509.Certificate) -> str: """Get human-readable certificate information""" try: subject = cert.subject.rfc4514_string() valid_to = cert.not_valid_after_utc now = datetime.now(timezone.utc) days_left = (valid_to - now).days serial = normalize_serial(format(cert.serial_number, 'X').upper()) return f"""Certificate: {subject} Serial: {serial} Expires: {valid_to} Days left: {days_left}""" except Exception as e: return f"Unable to extract certificate info: {e}" class SSHManager: """Manages SSH connections and file transfers""" def __init__(self, hostname: str, port: int, username: str, key_path: str): self.hostname = hostname self.port = port self.username = username self.key_path = key_path self.ssh_client = None def connect(self) -> bool: """Establish SSH connection""" try: self.ssh_client = paramiko.SSHClient() self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) logger.debug(f"Connecting to {self.username}@{self.hostname}:{self.port}") private_key = None key_types = [ ('RSA', paramiko.RSAKey), ('Ed25519', paramiko.Ed25519Key), ('ECDSA', paramiko.ECDSAKey), ] for key_name, key_class in key_types: try: private_key = key_class.from_private_key_file(self.key_path) logger.debug(f"Loaded {key_name} key") break except Exception: continue if not private_key: logger.error(f"Could not load SSH key from {self.key_path}") return False self.ssh_client.connect( hostname=self.hostname, port=self.port, username=self.username, pkey=private_key, timeout=30, banner_timeout=30, auth_timeout=30 ) logger.info(f"✓ Connected to {self.hostname}:{self.port}") return True except Exception as e: logger.error(f"SSH connection failed: {e}") return False def upload_file(self, local_path: str, remote_path: str) -> bool: """Upload file via SCP""" try: logger.info(f"Uploading to {remote_path}") remote_dir = os.path.dirname(remote_path) if remote_dir: self.execute_command(f"mkdir -p {remote_dir}", ignore_error=True) with SCPClient(self.ssh_client.get_transport()) as scp: scp.put(local_path, remote_path) logger.info(f"✓ Upload successful") return True except Exception as e: logger.error(f"Upload failed: {e}") return False def execute_command(self, command: str, timeout: int = 60, ignore_error: bool = False) -> Tuple[bool, str, str]: """Execute command on remote server""" try: logger.debug(f"Executing: {command}") stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout) exit_status = stdout.channel.recv_exit_status() stdout_text = stdout.read().decode('utf-8', errors='ignore') stderr_text = stderr.read().decode('utf-8', errors='ignore') if exit_status != 0 and not ignore_error: logger.error(f"Command failed with exit code {exit_status}") if stderr_text: logger.debug(f"Error: {stderr_text}") return exit_status == 0, stdout_text, stderr_text except Exception as e: logger.error(f"Command execution failed: {e}") return False, "", str(e) def check_remote_certificate(self, remote_cert_path: str, source_cert: x509.Certificate) -> bool: """ Check if remote certificate matches source certificate Returns True if upload needed, False if certificates match """ try: logger.info("Checking remote certificate via SSH") success, stdout, stderr = self.execute_command( f'openssl x509 -in {remote_cert_path} -noout -serial 2>/dev/null || echo "NOTFOUND"', ignore_error=True ) if success and stdout and "NOTFOUND" not in stdout: serial_match = re.search(r'serial=([A-F0-9]+)', stdout, re.IGNORECASE) if serial_match: remote_serial_raw = serial_match.group(1).upper() source_serial_raw = format(source_cert.serial_number, 'X').upper() # Normalize both serials remote_serial = normalize_serial(remote_serial_raw) source_serial = normalize_serial(source_serial_raw) logger.info(f"Source serial: {source_serial}") logger.info(f"Remote serial: {remote_serial}") if source_serial == remote_serial: logger.info("✓ Certificates match. Skipping upload.") return False else: logger.info("✗ Certificates differ. Upload needed.") return True logger.warning("Could not read remote certificate via SSH") return True except Exception as e: logger.warning(f"Error checking remote certificate: {e}") return True def disconnect(self): """Close SSH connection""" if self.ssh_client: self.ssh_client.close() logger.debug(f"Disconnected") class MikroTikManager(SSHManager): """Specialized manager for MikroTik RouterOS devices""" def __init__(self, hostname: str, port: int, username: str, key_path: str): super().__init__(hostname, port, username, key_path) # Use unique, timestamped certificate name self.cert_name = f"certpusher-{datetime.now().strftime('%Y%m')}" # This will create names like: certpusher-202510 def check_certificate_expiry(self, source_cert: x509.Certificate) -> bool: """Check if certificate on MikroTik needs update""" try: logger.info("Checking MikroTik certificate") # Get source certificate common-name for matching source_cn = source_cert.subject.rfc4514_string() source_expiry = source_cert.not_valid_after_utc logger.info(f"Looking for cert with CN: {source_cn}") logger.info(f"Source expires: {source_expiry}") # Search for certificate by common-name (more reliable than name) success, stdout, stderr = self.execute_command( f'/certificate print detail where common-name~"{source_cn.split("CN=")[1] if "CN=" in source_cn else source_cn}"', ignore_error=True ) if not success or not stdout or 'invalid-after' not in stdout.lower(): logger.info("Certificate not found on MikroTik. Upload needed.") return True logger.debug(f"Found certificate:\n{stdout}") # Parse expiry date (multiple formats) invalid_after_match = re.search( r'invalid-after[:\s=]+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', stdout, re.IGNORECASE ) if not invalid_after_match: invalid_after_match = re.search( r'invalid-after[:\s=]+([a-zA-Z]{3}[/\s]\d{1,2}[/\s]\d{4}\s+\d{2}:\d{2}:\d{2})', stdout, re.IGNORECASE ) if not invalid_after_match: logger.warning("Could not parse expiry date") return True mikrotik_expiry_str = invalid_after_match.group(1) logger.debug(f"Parsed expiry: {mikrotik_expiry_str}") # Parse date mikrotik_expiry = None for fmt in ['%Y-%m-%d %H:%M:%S', '%b/%d/%Y %H:%M:%S', '%b %d %Y %H:%M:%S']: try: mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, fmt) mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc) break except ValueError: continue if not mikrotik_expiry: logger.warning(f"Could not parse date: {mikrotik_expiry_str}") return True logger.info(f"MikroTik expires: {mikrotik_expiry}") time_diff = abs((source_expiry - mikrotik_expiry).total_seconds()) if time_diff < 86400: logger.info("✓ Certificate is current (within 24h). Skipping.") return False else: logger.info(f"Certificate differs ({time_diff/86400:.1f} days). Upload needed.") return True except Exception as e: logger.warning(f"Error checking: {e}") import traceback logger.debug(traceback.format_exc()) return True def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: """Upload certificate to MikroTik""" try: if check_first and source_cert: if not self.check_certificate_expiry(source_cert): return True, False logger.info("Deploying MikroTik certificate") # Disable service self.execute_command('/ip service disable www-ssl', ignore_error=True) # Remove OLD certificates with our naming pattern cleanup_commands = [ f'/certificate remove [find name~"certpusher"]', f'/file remove [find name~"certpusher"]', ] for cmd in cleanup_commands: self.execute_command(cmd, ignore_error=True) # Upload with unique name logger.info(f"Uploading as: {self.cert_name}") with SCPClient(self.ssh_client.get_transport()) as scp: scp.put(cert_path, f'{self.cert_name}.pem') if key_path: with SCPClient(self.ssh_client.get_transport()) as scp: scp.put(key_path, f'{self.cert_name}-key.pem') # Import certificate logger.info("Importing certificate") import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""' if key_path: import_cmd += f' name={self.cert_name}' self.execute_command(import_cmd, timeout=30) import time time.sleep(2) # Find the imported certificate (MikroTik adds _0, _1 suffixes) success, stdout, stderr = self.execute_command( f'/certificate print where name~"{self.cert_name}"' ) if success and stdout: logger.debug(f"Imported certificates:\n{stdout}") # Extract certificate name (usually cert_name_0) cert_match = re.search(r'name="([^"]+)"', stdout) if cert_match: imported_name = cert_match.group(1) logger.info(f"Certificate imported as: {imported_name}") # Configure service config_commands = [ f'/ip service set www-ssl certificate={imported_name}', '/ip service enable www-ssl', ] for cmd in config_commands: self.execute_command(cmd, ignore_error=True) else: logger.warning("Could not find imported certificate name, using default") self.execute_command(f'/ip service set www-ssl certificate={self.cert_name}_0', ignore_error=True) self.execute_command('/ip service enable www-ssl', ignore_error=True) logger.info(f"✓ MikroTik deployment successful") return True, True except Exception as e: logger.error(f"MikroTik deployment failed: {e}") return False, False def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: """ Upload certificate to MikroTik Returns (success, was_uploaded) """ try: if check_first and source_cert: if not self.check_certificate_expiry(source_cert): return True, False logger.info("Deploying MikroTik certificate") self.execute_command('/ip service disable www-ssl', ignore_error=True) cleanup_commands = [ f'/certificate remove [find name~"{self.cert_name}"]', f'/file remove "{self.cert_name}.pem"', ] if key_path: cleanup_commands.append(f'/file remove "ssl-key.pem"') for cmd in cleanup_commands: self.execute_command(cmd, ignore_error=True) logger.info("Uploading certificate") with SCPClient(self.ssh_client.get_transport()) as scp: scp.put(cert_path, f'{self.cert_name}.pem') if key_path: logger.info("Uploading private key") with SCPClient(self.ssh_client.get_transport()) as scp: scp.put(key_path, 'ssl-key.pem') logger.info("Importing certificate") self.execute_command(f'/certificate import file-name={self.cert_name}.pem passphrase=""', timeout=30) import time time.sleep(2) config_commands = [ f'/ip service set www-ssl certificate={self.cert_name}_0', '/ip service enable www-ssl', ] for cmd in config_commands: self.execute_command(cmd, ignore_error=True) logger.info(f"✓ MikroTik deployment successful") return True, True except Exception as e: logger.error(f"MikroTik deployment failed: {e}") return False, False class ProxmoxManager(SSHManager): """Specialized manager for Proxmox VE servers""" def check_certificate(self, source_cert: x509.Certificate, check_url: str) -> bool: """ Check if certificate on Proxmox needs update Returns True if upload needed, False if certificates match Uses two methods: 1. SSH: Direct file check (preferred) 2. URL: HTTPS check (fallback) """ try: logger.info("Checking Proxmox certificate") # Method 1: Check via SSH - Direct file access success, stdout, stderr = self.execute_command( 'openssl x509 -in /etc/pve/local/pveproxy-ssl.pem -noout -serial -dates 2>/dev/null', ignore_error=True ) if success and stdout: serial_match = re.search(r'serial=([A-F0-9]+)', stdout, re.IGNORECASE) if serial_match: proxmox_serial_raw = serial_match.group(1).upper() source_serial_raw = format(source_cert.serial_number, 'X').upper() # Normalize both serials (remove leading zeros) proxmox_serial = normalize_serial(proxmox_serial_raw) source_serial = normalize_serial(source_serial_raw) logger.info(f"Source serial: {source_serial}") logger.info(f"Proxmox serial: {proxmox_serial}") if source_serial == proxmox_serial: logger.info("✓ Certificates match (SSH check). Skipping upload.") return False else: logger.info("✗ Certificates differ (SSH check). Upload needed.") return True # Method 2: Fallback to URL check if check_url: logger.info("SSH check failed. Trying URL-based check...") cert_manager = CertificateManager() remote_cert = cert_manager.get_cert_from_url(check_url) if remote_cert: # Compare using normalized serials remote_serial = normalize_serial(format(remote_cert.serial_number, 'X').upper()) source_serial = normalize_serial(format(source_cert.serial_number, 'X').upper()) logger.info(f"Source serial (URL): {source_serial}") logger.info(f"Remote serial (URL): {remote_serial}") if remote_serial == source_serial: logger.info("✓ Certificates match (URL check). Skipping upload.") return False else: logger.info("✗ Certificates differ (URL check). Upload needed.") return True logger.warning("Could not verify certificate. Proceeding with upload for safety.") return True except Exception as e: logger.warning(f"Error checking: {e}. Proceeding with upload.") return True def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate, check_url: str) -> Tuple[bool, bool]: """ Upload certificate to Proxmox Returns (success, was_uploaded) """ try: if check_first and source_cert: if not self.check_certificate(source_cert, check_url): return True, False logger.info("Deploying Proxmox certificate") if not self.upload_file(cert_path, '/etc/pve/local/pveproxy-ssl.pem'): return False, False if not self.upload_file(key_path, '/etc/pve/local/pveproxy-ssl.key'): return False, False commands = [ 'chmod 640 /etc/pve/local/pveproxy-ssl.key', 'chown root:www-data /etc/pve/local/pveproxy-ssl.key', ] for cmd in commands: self.execute_command(cmd) logger.info("Restarting pveproxy") self.execute_command('systemctl restart pveproxy', timeout=30) import time time.sleep(3) success, stdout, _ = self.execute_command('systemctl is-active pveproxy') if success and 'active' in stdout: logger.info(f"✓ Proxmox deployment successful") return True, True else: logger.error("pveproxy not active") return False, False except Exception as e: logger.error(f"Proxmox deployment failed: {e}") return False, False class CertPusher: """Main application class""" def __init__(self, config_file: str): self.config_file = config_file self.config = configparser.ConfigParser() self.cert_manager = CertificateManager() self.stats = {'total': 0, 'uploaded': 0, 'skipped': 0, 'failed': 0} def load_config(self) -> bool: """Load configuration""" try: logger.info(f"Loading config: {self.config_file}") self.config.read(self.config_file) if 'global' not in self.config: logger.error("Missing [global] section") return False required = ['source_cert_path', 'default_ssh_key'] for key in required: if not self.config.has_option('global', key): logger.error(f"Missing: {key}") return False logger.info(f"✓ Config loaded ({len(self.config.sections()) - 1} hosts)") return True except Exception as e: logger.error(f"Config load failed: {e}") return False def get_key_path(self, section: str, cert_path: str) -> str: """Get private key path""" if self.config.has_option(section, 'source_key_path'): return self.config.get(section, 'source_key_path') if self.config.has_option('global', 'source_key_path'): return self.config.get('global', 'source_key_path') return cert_path.replace('fullchain.pem', 'privkey.pem').replace('cert.pem', 'privkey.pem') def process_mikrotik(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool: """Process MikroTik device""" try: source_key_path = self.get_key_path(section, source_cert_path) if not os.path.exists(source_key_path): logger.error(f"Key not found: {source_key_path}") return False source_cert = self.cert_manager.get_cert_from_file(source_cert_path) if not source_cert: return False check_first = self.config.getboolean(section, 'check_before_upload', fallback=True) mikrotik = MikroTikManager(hostname, port, username, ssh_key) if not mikrotik.connect(): self.stats['failed'] += 1 return False success, was_uploaded = mikrotik.upload_certificate(source_cert_path, source_key_path, check_first, source_cert) mikrotik.disconnect() if success: if was_uploaded: self.stats['uploaded'] += 1 else: self.stats['skipped'] += 1 logger.info("✓ MikroTik processed") return True else: self.stats['failed'] += 1 return False except Exception as e: logger.error(f"MikroTik failed: {e}") self.stats['failed'] += 1 return False def process_proxmox(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool: """Process Proxmox server""" try: source_key_path = self.get_key_path(section, source_cert_path) if not os.path.exists(source_key_path): logger.error(f"Key not found: {source_key_path}") return False source_cert = self.cert_manager.get_cert_from_file(source_cert_path) if not source_cert: return False logger.info(f"Using cert: {source_cert_path}") logger.info(self.cert_manager.get_certificate_info(source_cert)) check_url = self.config.get(section, 'check_url', fallback=None) check_first = self.config.getboolean(section, 'check_before_upload', fallback=True) proxmox = ProxmoxManager(hostname, port, username, ssh_key) if not proxmox.connect(): self.stats['failed'] += 1 return False success, was_uploaded = proxmox.upload_certificate(source_cert_path, source_key_path, check_first, source_cert, check_url) proxmox.disconnect() if success: if was_uploaded: self.stats['uploaded'] += 1 else: self.stats['skipped'] += 1 logger.info("✓ Proxmox processed") return True else: self.stats['failed'] += 1 return False except Exception as e: logger.error(f"Proxmox failed: {e}") self.stats['failed'] += 1 return False def process_host(self, section: str) -> bool: """Process standard host""" try: logger.info(f"\n{'='*60}\nProcessing: {section}\n{'='*60}") self.stats['total'] += 1 hostname = self.config.get(section, 'hostname') port = self.config.getint(section, 'port', fallback=22) username = self.config.get(section, 'username', fallback='root') device_type = self.config.get(section, 'type', fallback='standard') ssh_key = self.config.get(section, 'ssh_key_path', fallback=None) or self.config.get('global', 'default_ssh_key') source_cert_path = self.config.get(section, 'source_cert_path', fallback=None) or self.config.get('global', 'source_cert_path') if not os.path.exists(source_cert_path): logger.error(f"Certificate not found: {source_cert_path}") self.stats['failed'] += 1 return False logger.info(f"Host: {hostname}:{port} ({device_type})") logger.info(f"User: {username}") # Route to specialized handlers if device_type.lower() == 'mikrotik': return self.process_mikrotik(section, hostname, port, username, ssh_key, source_cert_path) elif device_type.lower() == 'proxmox': return self.process_proxmox(section, hostname, port, username, ssh_key, source_cert_path) # Standard host processing remote_cert_path = self.config.get(section, 'remote_cert_path') post_upload_command = self.config.get(section, 'post_upload_command', fallback='') check_url = self.config.get(section, 'check_url', fallback='') check_first = self.config.getboolean(section, 'check_before_upload', fallback=True) source_cert = self.cert_manager.get_cert_from_file(source_cert_path) if not source_cert: self.stats['failed'] += 1 return False ssh = SSHManager(hostname, port, username, ssh_key) if not ssh.connect(): self.stats['failed'] += 1 return False # Check if upload needed upload_needed = True if check_first: # Try SSH check first if not ssh.check_remote_certificate(remote_cert_path, source_cert): upload_needed = False # Try URL check if SSH check failed elif check_url: logger.info(f"SSH check failed. Trying URL: {check_url}") remote_cert = self.cert_manager.get_cert_from_url(check_url) if remote_cert and self.cert_manager.compare_certificates(source_cert, remote_cert): logger.info("✓ Certificate up to date via URL. Skipping.") upload_needed = False if not upload_needed: ssh.disconnect() self.stats['skipped'] += 1 return True # Upload certificate if not ssh.upload_file(source_cert_path, remote_cert_path): ssh.disconnect() self.stats['failed'] += 1 return False # Upload key if specified if self.config.has_option(section, 'remote_key_path'): remote_key_path = self.config.get(section, 'remote_key_path') source_key_path = self.get_key_path(section, source_cert_path) if os.path.exists(source_key_path): ssh.upload_file(source_key_path, remote_key_path) # Additional files if self.config.has_option(section, 'additional_files'): for file_pair in self.config.get(section, 'additional_files').split(','): if ':' in file_pair: local, remote = file_pair.strip().split(':', 1) ssh.upload_file(local, remote) # Post-upload command if post_upload_command: logger.info("Executing post-upload command") ssh.execute_command(post_upload_command) ssh.disconnect() self.stats['uploaded'] += 1 logger.info("✓ Host processed") return True except Exception as e: logger.error(f"Failed: {e}") self.stats['failed'] += 1 return False def run(self): """Main execution""" logger.info("="*60) logger.info(" CertPusher - SSL Certificate Distribution") logger.info("="*60) logger.info(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") logger.info(f"Log: {LOG_FILE}\n") if not self.load_config(): sys.exit(1) source_cert = self.config.get('global', 'source_cert_path') if not os.path.exists(source_cert): logger.error(f"Source certificate not found: {source_cert}") sys.exit(1) logger.info(f"Global certificate: {source_cert}\n") for section in self.config.sections(): if section == 'global': continue self.process_host(section) logger.info("\n" + "="*60) logger.info(" SUMMARY") logger.info("="*60) logger.info(f"Total: {self.stats['total']}") logger.info(f"✓ Uploaded: {self.stats['uploaded']}") logger.info(f"○ Skipped: {self.stats['skipped']}") logger.info(f"✗ Failed: {self.stats['failed']}") logger.info("="*60) if self.stats['failed'] > 0: sys.exit(1) def main(): """Entry point""" parser = argparse.ArgumentParser(description='CertPusher - SSL Certificate Distribution') parser.add_argument('config', help='Configuration file') parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging') parser.add_argument('-v', '--version', action='version', version='CertPusher 1.1') args = parser.parse_args() setup_logging(debug=args.debug) print(""" ╔═══════════════════════════════════════════════════════════╗ ║ CertPusher v1.1 ║ ║ Automated SSL Certificate Distribution Tool ║ ╚═══════════════════════════════════════════════════════════╝ """) if not os.path.exists(args.config): print(f"Error: Config file '{args.config}' not found") sys.exit(1) try: pusher = CertPusher(args.config) pusher.run() except KeyboardInterrupt: print("\n\nInterrupted. Exiting...") sys.exit(130) except Exception as e: logger.error(f"Fatal error: {e}", exc_info=True) sys.exit(1) if __name__ == '__main__': main()