#!/usr/bin/env python3
"""
CertPusher - Automated SSL Certificate Distribution Tool
Distributes SSL certificates to remote servers via SSH/SCP
Supports standard Linux servers, MikroTik RouterOS, and Proxmox VE
"""

import argparse
import configparser
import logging
import os
import re
import shlex
import socket
import ssl
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple
from urllib.parse import urlsplit

import paramiko
import requests  # noqa: F401  (not used in this module; kept so the import set is unchanged)
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from scp import SCPClient

# Create logs directory if it doesn't exist
LOG_DIR = Path(__file__).parent / 'logs'
LOG_DIR.mkdir(exist_ok=True)

# Logging configuration
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
LOG_FILE = LOG_DIR / f'certpusher_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'


def setup_logging(debug: bool = False):
    """Configure root logging to write to LOG_FILE and stdout.

    Args:
        debug: When True the level is DEBUG, otherwise INFO.
    """
    log_level = logging.DEBUG if debug else logging.INFO

    logging.basicConfig(
        level=log_level,
        format=LOG_FORMAT,
        handlers=[
            logging.FileHandler(LOG_FILE),
            logging.StreamHandler(sys.stdout),
        ],
    )

    # Reduce paramiko logging noise
    logging.getLogger('paramiko').setLevel(logging.WARNING)
    logging.getLogger('paramiko.transport').setLevel(logging.WARNING)


logger = logging.getLogger('CertPusher')


class CertificateManager:
    """Manages certificate comparison and validation"""

    @staticmethod
    def get_cert_from_file(cert_path: str) -> Optional[x509.Certificate]:
        """Load a PEM certificate from ``cert_path``.

        Returns the parsed certificate, or None (after logging an error) when
        the file cannot be read or parsed.
        """
        try:
            with open(cert_path, 'rb') as f:
                cert_data = f.read()
            cert = x509.load_pem_x509_certificate(cert_data, default_backend())
            logger.debug(f"Loaded certificate from {cert_path}")
            return cert
        except Exception as e:
            logger.error(f"Failed to load certificate from {cert_path}: {e}")
            return None

    @staticmethod
    def _split_host_port(url: str, default_port: int = 443) -> Tuple[str, int]:
        """Extract ``(hostname, port)`` from a URL or bare ``host[:port]`` string.

        The port defaults to ``default_port`` regardless of scheme. Raises
        ValueError when no hostname can be found or the port is not numeric.
        """
        target = url.strip()
        if '://' not in target:
            # urlsplit only recognises the network location after a scheme.
            target = f'https://{target}'
        parts = urlsplit(target)
        if not parts.hostname:
            raise ValueError(f"no hostname in URL {url!r}")
        return parts.hostname, parts.port or default_port

    @staticmethod
    def get_cert_from_url(url: str, timeout: int = 10) -> Optional[x509.Certificate]:
        """Retrieve the certificate presented by the TLS server behind ``url``.

        Args:
            url: ``https://host[:port]/...`` or a bare ``host[:port]``.
            timeout: TCP connect timeout in seconds.

        Returns:
            The server's leaf certificate, or None (after logging a warning)
            on any failure.
        """
        try:
            hostname, port = CertificateManager._split_host_port(url)
            logger.debug(f"Checking certificate at {hostname}:{port}")

            # Verification is intentionally disabled: the goal is to inspect
            # whatever certificate is currently served, even if it is expired
            # or self-signed.
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

            with socket.create_connection((hostname, port), timeout=timeout) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    der_cert = ssock.getpeercert(binary_form=True)

            return x509.load_der_x509_certificate(der_cert, default_backend())
        except Exception as e:
            logger.warning(f"Failed to retrieve certificate from {url}: {e}")
            return None

    @staticmethod
    def compare_certificates(cert1: x509.Certificate, cert2: x509.Certificate) -> bool:
        """Return True when both certificates share serial number and SHA-256 fingerprint.

        Any error during comparison is logged and reported as "not equal".
        """
        try:
            same_serial = cert1.serial_number == cert2.serial_number
            same_fingerprint = (
                cert1.fingerprint(hashes.SHA256()) == cert2.fingerprint(hashes.SHA256())
            )
            return same_serial and same_fingerprint
        except Exception as e:
            logger.error(f"Failed to compare certificates: {e}")
            return False

    @staticmethod
    def get_certificate_info(cert: x509.Certificate) -> str:
        """Return a multi-line, human-readable summary of ``cert``."""
        try:
            subject = cert.subject.rfc4514_string()
            issuer = cert.issuer.rfc4514_string()
            valid_from = cert.not_valid_before_utc
            valid_to = cert.not_valid_after_utc

            now = datetime.now(timezone.utc)
            days_left = (valid_to - now).days

            return f"""
Certificate Info:
  Subject: {subject}
  Issuer: {issuer}
  Valid From: {valid_from}
  Valid To: {valid_to}
  Days Until Expiry: {days_left}
"""
        except Exception as e:
            return f"Unable to extract certificate info: {e}"


class SSHManager:
    """Manages SSH connections and file transfers"""

    def __init__(self, hostname: str, port: int, username: str, key_path: str):
        self.hostname = hostname
        self.port = port
        self.username = username
        self.key_path = key_path
        self.ssh_client = None  # paramiko.SSHClient once connect() has run

    def _load_private_key(self):
        """Try each supported key type in turn; return the key or None."""
        key_types = [
            ('RSA', paramiko.RSAKey),
            ('Ed25519', paramiko.Ed25519Key),
            ('ECDSA', paramiko.ECDSAKey),
        ]
        for key_name, key_class in key_types:
            try:
                private_key = key_class.from_private_key_file(self.key_path)
            except Exception as e:
                # Expected for every type that does not match the key file.
                logger.debug(f"Key is not {key_name}: {e}")
                continue
            logger.debug(f"Loaded {key_name} key")
            return private_key
        return None

    def connect(self) -> bool:
        """Establish the SSH connection using key-based authentication.

        Returns:
            True on success; False (after logging) if the key cannot be loaded
            or the connection fails.
        """
        try:
            self.ssh_client = paramiko.SSHClient()
            # NOTE(review): AutoAddPolicy accepts unknown host keys without
            # verification, which leaves the connection open to MITM. Kept
            # as-is because switching policy would break existing deployments.
            self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            logger.debug(f"Connecting to {self.username}@{self.hostname}:{self.port}")

            private_key = self._load_private_key()
            if not private_key:
                logger.error(f"Could not load SSH key from {self.key_path}")
                return False

            self.ssh_client.connect(
                hostname=self.hostname,
                port=self.port,
                username=self.username,
                pkey=private_key,
                timeout=30,
                banner_timeout=30,
                auth_timeout=30,
            )

            logger.info(f"✓ Connected to {self.hostname}:{self.port}")
            return True
        except Exception as e:
            logger.error(f"SSH connection failed to {self.hostname}:{self.port}: {e}")
            # Do not leave a half-open client/transport behind.
            if self.ssh_client is not None:
                self.ssh_client.close()
            return False

    def upload_file(self, local_path: str, remote_path: str) -> bool:
        """Upload ``local_path`` to ``remote_path`` via SCP.

        The remote parent directory is created first (best effort).
        Returns True on success, False (after logging) on any failure.
        """
        try:
            logger.info(f"Uploading to {self.hostname}:{remote_path}")

            remote_dir = os.path.dirname(remote_path)
            if remote_dir:
                # Quote the path so spaces or shell metacharacters in the
                # configured directory cannot alter the command.
                self.execute_command(
                    f"mkdir -p {shlex.quote(remote_dir)}", ignore_error=True
                )

            with SCPClient(self.ssh_client.get_transport()) as scp:
                scp.put(local_path, remote_path)

            logger.info("✓ Upload successful")
            return True
        except Exception as e:
            logger.error(f"Upload failed: {e}")
            return False

    def execute_command(self, command: str, timeout: int = 60,
                        ignore_error: bool = False) -> Tuple[bool, str, str]:
        """Execute ``command`` on the remote host.

        Args:
            command: Command line passed to the remote side.
            timeout: Channel timeout in seconds.
            ignore_error: When True, a non-zero exit status is not logged.

        Returns:
            ``(succeeded, stdout_text, stderr_text)`` where ``succeeded`` is
            True only for exit status 0. Exceptions are logged and reported as
            ``(False, "", str(error))``.
        """
        try:
            logger.debug(f"Executing: {command}")

            stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout)

            # Drain the output streams BEFORE asking for the exit status:
            # waiting on the status first can hang when the remote output
            # exceeds the channel window.
            stdout_text = stdout.read().decode('utf-8', errors='ignore')
            stderr_text = stderr.read().decode('utf-8', errors='ignore')
            exit_status = stdout.channel.recv_exit_status()

            if exit_status == 0:
                logger.debug("Command completed successfully")
            elif not ignore_error:
                logger.error(f"Command failed with exit code {exit_status}")
                if stderr_text:
                    logger.error(f"Error: {stderr_text}")

            return exit_status == 0, stdout_text, stderr_text
        except Exception as e:
            logger.error(f"Command execution failed: {e}")
            return False, "", str(e)

    def disconnect(self):
        """Close the SSH connection if one is open (safe to call repeatedly)."""
        if self.ssh_client:
            self.ssh_client.close()
            self.ssh_client = None
            logger.debug(f"Disconnected from {self.hostname}")


class MikroTikManager(SSHManager):
    """Specialized manager for MikroTik RouterOS devices"""

    # Matches the expiry in `/certificate print detail`, in either the
    # "jan/31/2025 12:00:00" form or the ISO "2025-01-31 12:00:00" form.
    _EXPIRY_RE = re.compile(
        r'invalid-after:\s+'
        r'((?:[a-zA-Z]{3}/\d{2}/\d{4}|\d{4}-\d{2}-\d{2})\s+\d{2}:\d{2}:\d{2})'
    )
    _EXPIRY_FORMATS = ('%b/%d/%Y %H:%M:%S', '%Y-%m-%d %H:%M:%S')

    # Expiry timestamps closer than this are treated as the same certificate.
    _EXPIRY_TOLERANCE_SECONDS = 86400

    def __init__(self, hostname: str, port: int, username: str, key_path: str):
        super().__init__(hostname, port, username, key_path)
        self.cert_name = "ssl-cert"
        self.key_name = "ssl-key"

    @classmethod
    def _parse_expiry(cls, text: str) -> Optional[datetime]:
        """Parse a RouterOS expiry string; return an aware UTC datetime or None."""
        for fmt in cls._EXPIRY_FORMATS:
            try:
                return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
            except ValueError:
                continue
        return None

    def check_certificate_expiry(self, source_cert: x509.Certificate) -> bool:
        """
        Check if certificate on MikroTik needs update

        Compares the device certificate's ``invalid-after`` timestamp with the
        source certificate's expiry (within a one-day tolerance).

        Returns True if upload needed, False if current cert is OK
        """
        try:
            logger.info("Checking MikroTik certificate")

            success, stdout, stderr = self.execute_command(
                f'/certificate print detail where name~"{self.cert_name}"',
                ignore_error=True,
            )

            if not success or not stdout:
                logger.info("No certificate found on MikroTik. Upload needed.")
                return True

            invalid_after_match = self._EXPIRY_RE.search(stdout)
            if not invalid_after_match:
                logger.warning("Could not parse certificate expiry. Proceeding with upload.")
                return True

            mikrotik_expiry_str = invalid_after_match.group(1)
            mikrotik_expiry = self._parse_expiry(mikrotik_expiry_str)
            if mikrotik_expiry is None:
                logger.warning(
                    f"Could not parse date: {mikrotik_expiry_str!r}. Proceeding with upload."
                )
                return True

            source_expiry = source_cert.not_valid_after_utc
            time_diff = abs((source_expiry - mikrotik_expiry).total_seconds())

            if time_diff < self._EXPIRY_TOLERANCE_SECONDS:
                logger.info("✓ MikroTik certificate is current. Skipping upload.")
                return False

            logger.info("MikroTik certificate differs. Upload needed.")
            return True
        except Exception as e:
            logger.warning(f"Error checking certificate: {e}. Proceeding with upload.")
            return True

    def upload_certificate(self, cert_path: str, key_path: Optional[str] = None,
                           check_first: bool = True,
                           source_cert: Optional[x509.Certificate] = None) -> bool:
        """Upload and import certificate to MikroTik RouterOS

        Args:
            cert_path: Local path of the PEM certificate.
            key_path: Optional local path of the PEM private key.
            check_first: When True (and ``source_cert`` is given) the upload is
                skipped if the device certificate already matches.
            source_cert: Parsed source certificate used for the check.

        Returns:
            True when the deployment succeeded or was not needed, else False.
        """
        try:
            logger.info("MikroTik certificate deployment")

            if check_first and source_cert:
                if not self.check_certificate_expiry(source_cert):
                    return True

            logger.debug("Disabling www-ssl service")
            self.execute_command('/ip service disable www-ssl', ignore_error=True)

            logger.debug("Removing old certificates")
            cleanup_commands = [
                f'/certificate remove [find name~"{self.cert_name}"]',
                f'/file remove "{self.cert_name}.pem"',
            ]
            if key_path:
                cleanup_commands.append(f'/file remove "{self.key_name}.pem"')

            for cmd in cleanup_commands:
                self.execute_command(cmd, ignore_error=True)

            logger.info("Uploading certificate")
            with SCPClient(self.ssh_client.get_transport()) as scp:
                scp.put(cert_path, f'{self.cert_name}.pem')

            if key_path:
                logger.info("Uploading private key")
                with SCPClient(self.ssh_client.get_transport()) as scp:
                    scp.put(key_path, f'{self.key_name}.pem')

            logger.info("Importing certificate")
            import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""'
            success, stdout, stderr = self.execute_command(import_cmd, timeout=30)
            if not success and "failure" in stderr.lower():
                logger.error(f"Certificate import failed: {stderr}")
                return False

            if key_path:
                # The key file must be imported as well; uploading it alone
                # does not attach it to the certificate.
                logger.info("Importing private key")
                key_import_cmd = (
                    f'/certificate import file-name={self.key_name}.pem passphrase=""'
                )
                success, stdout, stderr = self.execute_command(key_import_cmd, timeout=30)
                if not success and "failure" in stderr.lower():
                    logger.error(f"Private key import failed: {stderr}")
                    return False

            # Give RouterOS a moment to register the imported certificate.
            time.sleep(2)

            logger.info("Configuring www-ssl service")
            config_commands = [
                f'/ip service set www-ssl certificate={self.cert_name}_0',
                '/ip service enable www-ssl',
            ]
            for cmd in config_commands:
                self.execute_command(cmd, ignore_error=True)

            logger.info("✓ MikroTik deployment successful")
            return True
        except Exception as e:
            logger.error(f"MikroTik deployment failed: {e}")
            return False

    def verify_certificate(self) -> bool:
        """Return True when a certificate matching ``self.cert_name`` is listed."""
        try:
            success, stdout, stderr = self.execute_command(
                f'/certificate print detail where name~"{self.cert_name}"'
            )
            if success and stdout:
                logger.debug("Certificate verified")
                return True
            return False
        except Exception as e:
            logger.error(f"Certificate verification failed: {e}")
            return False


class ProxmoxManager(SSHManager):
    """Specialized manager for Proxmox VE servers"""

    def check_certificate(self, source_cert: x509.Certificate, check_url: str) -> bool:
        """
        Check if certificate on Proxmox needs update

        First compares serial numbers by reading the installed certificate over
        SSH; if that is not possible and ``check_url`` is set, compares against
        the certificate served at that URL.

        Returns True if upload needed, False if current cert is OK
        """
        try:
            logger.info("Checking Proxmox certificate")

            # Method 1: Check via SSH - read cert file directly
            success, stdout, stderr = self.execute_command(
                'openssl x509 -in /etc/pve/local/pveproxy-ssl.pem -noout -serial -dates',
                ignore_error=True,
            )

            if success and stdout:
                logger.debug(f"Proxmox certificate info:\n{stdout}")

                serial_match = re.search(r'serial=([A-Fa-f0-9]+)', stdout)
                not_after_match = re.search(r'notAfter=(.+)', stdout)

                if serial_match and not_after_match:
                    # Compare numerically: openssl pads the hex serial to whole
                    # bytes (e.g. "03AB"), so a string comparison against an
                    # unpadded hex rendering would never match such serials.
                    proxmox_serial = int(serial_match.group(1), 16)
                    source_serial = source_cert.serial_number

                    logger.debug(f"Source serial: {source_serial:X}")
                    logger.debug(f"Proxmox serial: {proxmox_serial:X}")

                    if source_serial == proxmox_serial:
                        logger.info("✓ Proxmox certificate is current. Skipping upload.")
                        return False

                    logger.info("Proxmox certificate differs. Upload needed.")
                    return True

            # Method 2: Fallback - try URL check
            if check_url:
                cert_manager = CertificateManager()
                remote_cert = cert_manager.get_cert_from_url(check_url)
                if remote_cert:
                    if cert_manager.compare_certificates(source_cert, remote_cert):
                        logger.info("✓ Certificate verified via URL. Skipping upload.")
                        return False

            # If we can't verify, proceed with upload
            logger.warning("Could not verify certificate. Proceeding with upload.")
            return True
        except Exception as e:
            logger.warning(f"Error checking certificate: {e}. Proceeding with upload.")
            return True

    def upload_certificate(self, cert_path: str, key_path: str, check_first: bool = True,
                           source_cert: Optional[x509.Certificate] = None,
                           check_url: Optional[str] = None) -> bool:
        """Upload certificate to Proxmox VE

        Copies certificate and key to ``/etc/pve/local``, fixes key
        permissions, restarts ``pveproxy`` and confirms it is active.

        Returns:
            True when the deployment succeeded or was not needed, else False.
        """
        try:
            logger.info("Proxmox certificate deployment")

            # Check if upload is needed
            if check_first and source_cert:
                if not self.check_certificate(source_cert, check_url):
                    return True  # Certificate is current, skip upload

            logger.info("Uploading certificate")
            if not self.upload_file(cert_path, '/etc/pve/local/pveproxy-ssl.pem'):
                return False

            logger.info("Uploading private key")
            if not self.upload_file(key_path, '/etc/pve/local/pveproxy-ssl.key'):
                return False

            logger.debug("Setting permissions")
            commands = [
                'chmod 640 /etc/pve/local/pveproxy-ssl.key',
                'chown root:www-data /etc/pve/local/pveproxy-ssl.key',
            ]
            for cmd in commands:
                # Failures are logged by execute_command but do not abort.
                self.execute_command(cmd, ignore_error=False)

            logger.info("Restarting pveproxy")
            success, stdout, stderr = self.execute_command(
                'systemctl restart pveproxy', timeout=30
            )
            if not success:
                logger.error("Failed to restart pveproxy")
                return False

            # Let the service settle before querying its state.
            time.sleep(3)

            success, stdout, stderr = self.execute_command('systemctl is-active pveproxy')
            # Exact match: a substring test would also accept "inactive".
            if success and stdout.strip() == 'active':
                logger.info("✓ Proxmox deployment successful")
                return True

            logger.error("pveproxy service is not active")
            return False
        except Exception as e:
            logger.error(f"Proxmox deployment failed: {e}")
            return False


class CertPusher:
    """Main application class"""

    def __init__(self, config_file: str):
        self.config_file = config_file
        self.config = configparser.ConfigParser()
        self.cert_manager = CertificateManager()
        # Per-run counters reported in the summary and used for the exit code.
        self.stats: Dict[str, int] = {
            'total': 0,
            'uploaded': 0,
            'skipped': 0,
            'failed': 0,
        }

    def load_config(self) -> bool:
        """Load configuration from INI file

        Returns True when the file has a ``[global]`` section containing
        ``source_cert_path`` and ``default_ssh_key``; otherwise logs the
        problem and returns False.
        """
        try:
            logger.info(f"Loading configuration from {self.config_file}")
            self.config.read(self.config_file)

            if 'global' not in self.config:
                logger.error("Missing [global] section in config file")
                return False

            required_global = ['source_cert_path', 'default_ssh_key']
            for key in required_global:
                if not self.config.has_option('global', key):
                    logger.error(f"Missing required global option: {key}")
                    return False

            logger.info("✓ Configuration loaded")
            logger.info(f"Found {len(self.config.sections()) - 1} host(s)")
            return True
        except Exception as e:
            logger.error(f"Failed to load configuration: {e}")
            return False

    def get_key_path(self, section: str, cert_path: str) -> str:
        """Get private key path for certificate

        Resolution order: the host section's ``source_key_path``, the global
        ``source_key_path``, then ``cert_path`` with ``fullchain.pem`` /
        ``cert.pem`` replaced by ``privkey.pem``.
        """
        if self.config.has_option(section, 'source_key_path'):
            return self.config.get(section, 'source_key_path')

        if self.config.has_option('global', 'source_key_path'):
            return self.config.get('global', 'source_key_path')

        return cert_path.replace('fullchain.pem', 'privkey.pem').replace(
            'cert.pem', 'privkey.pem'
        )

    def process_mikrotik(self, section: str, hostname: str, port: int, username: str,
                         ssh_key: str, source_cert_path: str) -> bool:
        """Process MikroTik device specifically

        Updates ``self.stats`` (uploaded / skipped / failed) and returns True
        when the host ended up with a current certificate.
        """
        try:
            logger.info("Using MikroTik deployment method")

            source_key_path = self.get_key_path(section, source_cert_path)
            if not os.path.exists(source_key_path):
                logger.error(f"Private key not found: {source_key_path}")
                # Count it, otherwise the run could exit 0 despite the failure.
                self.stats['failed'] += 1
                return False

            source_cert = self.cert_manager.get_cert_from_file(source_cert_path)
            check_first = self.config.getboolean(
                section, 'check_before_upload', fallback=True
            )

            mikrotik = MikroTikManager(hostname, port, username, ssh_key)
            if not mikrotik.connect():
                self.stats['failed'] += 1
                return False

            try:
                # Run the freshness check here so an unchanged certificate is
                # reported as "skipped" rather than "uploaded".
                if check_first and source_cert:
                    if not mikrotik.check_certificate_expiry(source_cert):
                        self.stats['skipped'] += 1
                        return True

                if not mikrotik.upload_certificate(
                    source_cert_path, source_key_path,
                    check_first=False, source_cert=source_cert,
                ):
                    self.stats['failed'] += 1
                    return False

                mikrotik.verify_certificate()
            finally:
                mikrotik.disconnect()

            self.stats['uploaded'] += 1
            logger.info("✓ MikroTik processed successfully")
            return True
        except Exception as e:
            logger.error(f"MikroTik processing failed: {e}")
            self.stats['failed'] += 1
            return False

    def process_proxmox(self, section: str, hostname: str, port: int, username: str,
                        ssh_key: str, source_cert_path: str) -> bool:
        """Process Proxmox VE server specifically

        Updates ``self.stats`` (uploaded / skipped / failed) and returns True
        when the host ended up with a current certificate.
        """
        try:
            logger.info("Using Proxmox deployment method")

            source_key_path = self.get_key_path(section, source_cert_path)
            if not os.path.exists(source_key_path):
                logger.error(f"Private key not found: {source_key_path}")
                # Count it, otherwise the run could exit 0 despite the failure.
                self.stats['failed'] += 1
                return False

            # Load source certificate for comparison
            source_cert = self.cert_manager.get_cert_from_file(source_cert_path)

            # Get check URL if available
            check_url = self.config.get(section, 'check_url', fallback=None)

            # Check if we should verify before upload
            check_first = self.config.getboolean(
                section, 'check_before_upload', fallback=True
            )

            proxmox = ProxmoxManager(hostname, port, username, ssh_key)
            if not proxmox.connect():
                self.stats['failed'] += 1
                return False

            try:
                # Run the freshness check here so an unchanged certificate is
                # reported as "skipped" rather than "uploaded".
                if check_first and source_cert:
                    if not proxmox.check_certificate(source_cert, check_url):
                        self.stats['skipped'] += 1
                        return True

                if not proxmox.upload_certificate(
                    source_cert_path, source_key_path,
                    check_first=False, source_cert=source_cert, check_url=check_url,
                ):
                    self.stats['failed'] += 1
                    return False
            finally:
                proxmox.disconnect()

            self.stats['uploaded'] += 1
            logger.info("✓ Proxmox processed successfully")
            return True
        except Exception as e:
            logger.error(f"Proxmox processing failed: {e}")
            self.stats['failed'] += 1
            return False

    def _process_standard(self, section: str, hostname: str, port: int, username: str,
                          ssh_key: str, source_cert_path: str) -> bool:
        """Deploy to a generic SSH host: optional URL check, upload, post command.

        Exceptions propagate to ``process_host``, which records the failure.
        """
        remote_cert_path = self.config.get(section, 'remote_cert_path')
        post_upload_command = self.config.get(section, 'post_upload_command', fallback='')
        check_url = self.config.get(section, 'check_url', fallback='')

        if check_url:
            logger.info(f"Checking certificate at {check_url}")
            source_cert = self.cert_manager.get_cert_from_file(source_cert_path)
            remote_cert = self.cert_manager.get_cert_from_url(check_url)

            if source_cert and remote_cert:
                if self.cert_manager.compare_certificates(source_cert, remote_cert):
                    logger.info("✓ Certificate is up to date. Skipping.")
                    self.stats['skipped'] += 1
                    return True
                logger.info("Certificate is outdated. Uploading.")
            else:
                logger.warning("Could not compare certificates. Proceeding.")

        ssh = SSHManager(hostname, port, username, ssh_key)
        if not ssh.connect():
            self.stats['failed'] += 1
            return False

        try:
            if not ssh.upload_file(source_cert_path, remote_cert_path):
                self.stats['failed'] += 1
                return False

            if self.config.has_option(section, 'remote_key_path'):
                remote_key_path = self.config.get(section, 'remote_key_path')
                source_key_path = self.get_key_path(section, source_cert_path)

                logger.info("Uploading private key")
                if not os.path.exists(source_key_path):
                    logger.error(f"Private key not found: {source_key_path}")
                    self.stats['failed'] += 1
                    return False

                # NOTE(review): a failed key upload only warns, so the
                # post-upload command may reload a service with a mismatched
                # certificate/key pair. Confirm whether this should fail.
                if not ssh.upload_file(source_key_path, remote_key_path):
                    logger.warning("Failed to upload private key")

            if self.config.has_option(section, 'additional_files'):
                additional_files = self.config.get(section, 'additional_files')
                for file_pair in additional_files.split(','):
                    entry = file_pair.strip()
                    if ':' not in entry:
                        if entry:
                            logger.warning(
                                f"Ignoring malformed additional_files entry: {entry}"
                            )
                        continue
                    local, remote = entry.split(':', 1)
                    logger.info(f"Uploading additional: {os.path.basename(local)}")
                    if not ssh.upload_file(local, remote):
                        logger.warning("Failed to upload additional file")

            if post_upload_command:
                logger.info("Executing post-upload command")
                success, stdout, stderr = ssh.execute_command(post_upload_command)
                if not success:
                    logger.warning("Post-upload command failed")
                else:
                    logger.info("✓ Post-upload command completed")
        finally:
            # Always release the connection, including on unexpected errors.
            ssh.disconnect()

        self.stats['uploaded'] += 1
        logger.info("✓ Host processed successfully")
        return True

    def process_host(self, section: str) -> bool:
        """Process certificate deployment for a single host

        Reads the host's settings from ``section``, dispatches on its ``type``
        (``mikrotik``, ``proxmox`` or the standard SSH path) and updates
        ``self.stats``. Returns True on success or skip, False on failure.
        """
        try:
            logger.info(f"\n{'='*60}")
            logger.info(f"Processing: {section}")
            logger.info(f"{'='*60}")

            self.stats['total'] += 1

            hostname = self.config.get(section, 'hostname')
            port = self.config.getint(section, 'port', fallback=22)
            username = self.config.get(section, 'username', fallback='root')
            device_type = self.config.get(section, 'type', fallback='standard')

            if self.config.has_option(section, 'ssh_key_path'):
                ssh_key = self.config.get(section, 'ssh_key_path')
            else:
                ssh_key = self.config.get('global', 'default_ssh_key')

            if self.config.has_option(section, 'source_cert_path'):
                source_cert_path = self.config.get(section, 'source_cert_path')
                logger.info("Using host-specific certificate")
            else:
                source_cert_path = self.config.get('global', 'source_cert_path')

            if not os.path.exists(source_cert_path):
                logger.error(f"Certificate not found: {source_cert_path}")
                self.stats['failed'] += 1
                return False

            logger.info(f"Host: {hostname}:{port}")
            logger.info(f"Type: {device_type}")
            logger.info(f"User: {username}")

            device = device_type.lower()
            if device == 'mikrotik':
                return self.process_mikrotik(
                    section, hostname, port, username, ssh_key, source_cert_path
                )
            if device == 'proxmox':
                return self.process_proxmox(
                    section, hostname, port, username, ssh_key, source_cert_path
                )

            return self._process_standard(
                section, hostname, port, username, ssh_key, source_cert_path
            )
        except Exception as e:
            logger.error(f"Failed to process host: {e}")
            self.stats['failed'] += 1
            return False

    def run(self):
        """Main execution method

        Loads the configuration, processes every non-global section and logs a
        summary. Exits the process with status 1 when the configuration or the
        global source certificate is missing, or when any host failed.
        """
        logger.info("="*60)
        logger.info(" CertPusher - SSL Certificate Distribution")
        logger.info("="*60)
        logger.info(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info(f"Log file: {LOG_FILE}")
        logger.info("")

        if not self.load_config():
            logger.error("Configuration loading failed")
            sys.exit(1)

        source_cert = self.config.get('global', 'source_cert_path')
        if not os.path.exists(source_cert):
            logger.error(f"Source certificate not found: {source_cert}")
            sys.exit(1)

        logger.info(f"Source certificate: {source_cert}")

        cert = self.cert_manager.get_cert_from_file(source_cert)
        if cert:
            logger.info(self.cert_manager.get_certificate_info(cert))

        for section in self.config.sections():
            if section == 'global':
                continue
            try:
                self.process_host(section)
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                self.stats['failed'] += 1

        logger.info("\n" + "="*60)
        logger.info(" SUMMARY")
        logger.info("="*60)
        logger.info(f"Total hosts: {self.stats['total']}")
        logger.info(f"✓ Uploaded: {self.stats['uploaded']}")
        logger.info(f"○ Skipped: {self.stats['skipped']}")
        logger.info(f"✗ Failed: {self.stats['failed']}")
        logger.info("="*60)
        logger.info(f"Finished: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        if self.stats['failed'] > 0:
            sys.exit(1)


def _render_banner(width: int = 59) -> str:
    """Build the boxed start-up banner with centred text lines."""
    bar = '═' * width
    rows = [
        'CertPusher v1.0',
        'Automated SSL Certificate Distribution Tool',
    ]
    body = '\n'.join(f'║{row.center(width)}║' for row in rows)
    return f'\n╔{bar}╗\n{body}\n╚{bar}╝\n'


def main():
    """Entry point"""
    parser = argparse.ArgumentParser(
        description='CertPusher - SSL Certificate Distribution Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s config.ini              # Normal operation
  %(prog)s config.ini --debug      # Debug mode with verbose logging
  %(prog)s config.ini -d           # Same as above
""",
    )
    parser.add_argument('config', help='Path to configuration file')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='Enable debug logging')
    parser.add_argument('-v', '--version', action='version',
                        version='CertPusher 1.0')

    args = parser.parse_args()

    # Setup logging based on debug flag
    setup_logging(debug=args.debug)

    print(_render_banner())

    if not os.path.exists(args.config):
        print(f"Error: Configuration file '{args.config}' not found")
        sys.exit(1)

    try:
        pusher = CertPusher(args.config)
        pusher.run()
    except KeyboardInterrupt:
        print("\n\nInterrupted by user. Exiting...")
        sys.exit(130)
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)


if __name__ == '__main__':
    main()