diff --git a/.gitignore b/.gitignore index e471238..64d42d8 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ config.ini venv/ *.swp .DS_Store +logs/*.log \ No newline at end of file diff --git a/certpusher.py b/certpusher.py index 5364f7f..2667fa2 100644 --- a/certpusher.py +++ b/certpusher.py @@ -12,6 +12,7 @@ import os import ssl import socket import re +import argparse from datetime import datetime, timezone from pathlib import Path from typing import Dict, Optional, Tuple @@ -21,16 +22,31 @@ import requests from cryptography import x509 from cryptography.hazmat.backends import default_backend +# Create logs directory if it doesn't exist +LOG_DIR = Path(__file__).parent / 'logs' +LOG_DIR.mkdir(exist_ok=True) + # Logging configuration LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' -logging.basicConfig( - level=logging.DEBUG, - format=LOG_FORMAT, - handlers=[ - logging.FileHandler(f'certpusher_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'), - logging.StreamHandler(sys.stdout) - ] -) +LOG_FILE = LOG_DIR / f'certpusher_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' + +def setup_logging(debug: bool = False): + """Setup logging with configurable level""" + log_level = logging.DEBUG if debug else logging.INFO + + logging.basicConfig( + level=log_level, + format=LOG_FORMAT, + handlers=[ + logging.FileHandler(LOG_FILE), + logging.StreamHandler(sys.stdout) + ] + ) + + # Reduce paramiko logging noise + logging.getLogger('paramiko').setLevel(logging.WARNING) + logging.getLogger('paramiko.transport').setLevel(logging.WARNING) + logger = logging.getLogger('CertPusher') @@ -45,8 +61,6 @@ class CertificateManager: cert_data = f.read() cert = x509.load_pem_x509_certificate(cert_data, default_backend()) logger.debug(f"Loaded certificate from {cert_path}") - logger.debug(f"Certificate subject: {cert.subject}") - logger.debug(f"Certificate expires: {cert.not_valid_after_utc}") return cert except Exception as e: logger.error(f"Failed to load certificate from {cert_path}: {e}") @@ -62,7 +76,7 @@ class CertificateManager: if ':' in url.replace('https://', '').replace('http://', '').split('/')[0]: port = int(url.replace('https://', '').replace('http://', '').split('/')[0].split(':')[1]) - logger.debug(f"Connecting to {hostname}:{port} to retrieve certificate") + logger.debug(f"Checking certificate at {hostname}:{port}") context = ssl.create_default_context() context.check_hostname = False @@ -72,8 +86,6 @@ class CertificateManager: with context.wrap_socket(sock, server_hostname=hostname) as ssock: der_cert = ssock.getpeercert(binary_form=True) cert = x509.load_der_x509_certificate(der_cert, default_backend()) - logger.debug(f"Retrieved certificate from {url}") - logger.debug(f"Certificate expires: {cert.not_valid_after_utc}") return cert except Exception as e: logger.warning(f"Failed to retrieve certificate from {url}: {e}") @@ -85,13 +97,11 @@ class CertificateManager: try: same_serial = cert1.serial_number == cert2.serial_number - # Compare fingerprints from cryptography.hazmat.primitives import hashes fingerprint1 = cert1.fingerprint(hashes.SHA256()) fingerprint2 = cert2.fingerprint(hashes.SHA256()) same_fingerprint = fingerprint1 == fingerprint2 - logger.debug(f"Certificate comparison - Serial match: {same_serial}, Fingerprint match: {same_fingerprint}") return same_serial and same_fingerprint except Exception as e: logger.error(f"Failed to compare certificates: {e}") @@ -106,7 +116,6 @@ class CertificateManager: valid_from = cert.not_valid_before_utc valid_to = cert.not_valid_after_utc - # Convert to naive datetime for comparison now = datetime.now(timezone.utc) days_left = (valid_to - now).days @@ -139,9 +148,7 @@ class SSHManager: self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) logger.debug(f"Connecting to {self.username}@{self.hostname}:{self.port}") - logger.debug(f"Using SSH key: {self.key_path}") - # Try to load different key types (DSS removed in paramiko 3.0+) private_key = None key_types = [ ('RSA', paramiko.RSAKey), @@ -152,10 +159,9 @@ class SSHManager: for key_name, key_class in key_types: try: private_key = key_class.from_private_key_file(self.key_path) - logger.debug(f"Successfully loaded {key_name} key") + logger.debug(f"Loaded {key_name} key") break - except Exception as e: - logger.debug(f"Not a {key_name} key: {e}") + except Exception: continue if not private_key: @@ -172,7 +178,7 @@ class SSHManager: auth_timeout=30 ) - logger.info(f"✓ Successfully connected to {self.hostname}:{self.port}") + logger.info(f"✓ Connected to {self.hostname}:{self.port}") return True except Exception as e: @@ -182,9 +188,8 @@ class SSHManager: def upload_file(self, local_path: str, remote_path: str) -> bool: """Upload file via SCP""" try: - logger.debug(f"Uploading {local_path} to {self.hostname}:{remote_path}") + logger.info(f"Uploading to {self.hostname}:{remote_path}") - # Ensure remote directory exists remote_dir = os.path.dirname(remote_path) if remote_dir: self.execute_command(f"mkdir -p {remote_dir}", ignore_error=True) @@ -192,17 +197,17 @@ class SSHManager: with SCPClient(self.ssh_client.get_transport()) as scp: scp.put(local_path, remote_path) - logger.info(f"✓ Successfully uploaded {local_path} to {self.hostname}:{remote_path}") + logger.info(f"✓ Upload successful") return True except Exception as e: - logger.error(f"File upload failed: {e}") + logger.error(f"Upload failed: {e}") return False def execute_command(self, command: str, timeout: int = 60, ignore_error: bool = False) -> Tuple[bool, str, str]: """Execute command on remote server""" try: - logger.debug(f"Executing command on {self.hostname}: {command}") + logger.debug(f"Executing: {command}") stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout) exit_status = stdout.channel.recv_exit_status() @@ -211,15 +216,12 @@ class SSHManager: stderr_text = stderr.read().decode('utf-8', errors='ignore') if exit_status == 0: - logger.info(f"✓ Command executed successfully on {self.hostname}") - if stdout_text: - logger.debug(f"STDOUT: {stdout_text}") + logger.debug(f"Command completed successfully") else: if not ignore_error: logger.error(f"Command failed with exit code {exit_status}") - logger.error(f"STDERR: {stderr_text}") - else: - logger.debug(f"Command failed (ignored): {stderr_text}") + if stderr_text: + logger.error(f"Error: {stderr_text}") return exit_status == 0, stdout_text, stderr_text @@ -248,85 +250,58 @@ class MikroTikManager(SSHManager): Returns True if upload needed, False if current cert is OK """ try: - logger.info("Checking certificate on MikroTik") + logger.info("Checking MikroTik certificate") - # Get certificate details from MikroTik success, stdout, stderr = self.execute_command( f'/certificate print detail where name~"{self.cert_name}"', ignore_error=True ) if not success or not stdout: - logger.info("No certificate found on MikroTik or cannot read it. Upload needed.") + logger.info("No certificate found on MikroTik. Upload needed.") return True - logger.debug(f"MikroTik certificate info:\n{stdout}") - - # Parse expiry date from output - # Looking for "invalid-after" field invalid_after_match = re.search(r'invalid-after:\s+([a-zA-Z]{3}/\d{2}/\d{4}\s+\d{2}:\d{2}:\d{2})', stdout) if not invalid_after_match: - logger.warning("Could not parse certificate expiry date from MikroTik. Proceeding with upload.") + logger.warning("Could not parse certificate expiry. Proceeding with upload.") return True mikrotik_expiry_str = invalid_after_match.group(1) - logger.debug(f"MikroTik certificate expires: {mikrotik_expiry_str}") - # Parse MikroTik date format (e.g., "jan/24/2026 08:34:12") try: mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%b/%d/%Y %H:%M:%S') mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc) except Exception as e: - logger.warning(f"Could not parse MikroTik date: {e}. Proceeding with upload.") + logger.warning(f"Could not parse date: {e}. Proceeding with upload.") return True - # Compare with source certificate source_expiry = source_cert.not_valid_after_utc - - # Also check fingerprint/serial if available - fingerprint_match = re.search(r'fingerprint:\s+([a-f0-9]+)', stdout) - if fingerprint_match: - mikrotik_fingerprint = fingerprint_match.group(1) - logger.debug(f"MikroTik certificate fingerprint: {mikrotik_fingerprint}") - - # Compare expiry dates (allowing 1 day tolerance for timezone differences) time_diff = abs((source_expiry - mikrotik_expiry).total_seconds()) - if time_diff < 86400: # Less than 24 hours difference - logger.info("✓ Certificate on MikroTik is current. Skipping upload.") + if time_diff < 86400: + logger.info("✓ MikroTik certificate is current. Skipping upload.") return False else: - logger.info(f"Certificate on MikroTik differs. Source expires: {source_expiry}, MikroTik expires: {mikrotik_expiry}") + logger.info(f"MikroTik certificate differs. Upload needed.") return True except Exception as e: - logger.warning(f"Error checking MikroTik certificate: {e}. Proceeding with upload.") + logger.warning(f"Error checking certificate: {e}. Proceeding with upload.") return True def upload_certificate(self, cert_path: str, key_path: str = None, check_first: bool = True, source_cert: x509.Certificate = None) -> bool: - """ - Upload and import certificate to MikroTik RouterOS - - Args: - cert_path: Path to certificate file (PEM format, can be fullchain) - key_path: Optional path to private key file - check_first: If True, check existing certificate before uploading - source_cert: Source certificate object for comparison - """ + """Upload and import certificate to MikroTik RouterOS""" try: - logger.info(f"Starting MikroTik certificate deployment to {self.hostname}") + logger.info(f"MikroTik certificate deployment") - # Check if upload is needed if check_first and source_cert: if not self.check_certificate_expiry(source_cert): - return True # Certificate is current, no upload needed + return True - # Step 1: Disable www-ssl service logger.debug("Disabling www-ssl service") self.execute_command('/ip service disable www-ssl', ignore_error=True) - # Step 2: Remove old certificates logger.debug("Removing old certificates") cleanup_commands = [ f'/certificate remove [find name~"{self.cert_name}"]', @@ -339,21 +314,16 @@ class MikroTikManager(SSHManager): for cmd in cleanup_commands: self.execute_command(cmd, ignore_error=True) - # Step 3: Upload certificate file - logger.debug(f"Uploading certificate to MikroTik") + logger.info("Uploading certificate") with SCPClient(self.ssh_client.get_transport()) as scp: scp.put(cert_path, f'{self.cert_name}.pem') - logger.info(f"✓ Certificate file uploaded") - # Step 4: Upload private key if provided if key_path: - logger.debug(f"Uploading private key to MikroTik") + logger.info("Uploading private key") with SCPClient(self.ssh_client.get_transport()) as scp: scp.put(key_path, f'{self.key_name}.pem') - logger.info(f"✓ Private key file uploaded") - # Step 5: Import certificate - logger.debug("Importing certificate into RouterOS") + logger.info("Importing certificate") import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""' success, stdout, stderr = self.execute_command(import_cmd, timeout=30) @@ -361,49 +331,36 @@ class MikroTikManager(SSHManager): logger.error(f"Certificate import failed: {stderr}") return False - # Give RouterOS time to process import time time.sleep(2) - # Step 6: Verify certificate was imported - success, stdout, stderr = self.execute_command( - f'/certificate print where name~"{self.cert_name}"' - ) - - if success and stdout: - logger.debug(f"Certificates after import:\n{stdout}") - - # Step 7: Configure services to use new certificate - logger.info("Configuring www-ssl service to use new certificate") - + logger.info("Configuring www-ssl service") config_commands = [ f'/ip service set www-ssl certificate={self.cert_name}_0', '/ip service enable www-ssl', ] for cmd in config_commands: - success, stdout, stderr = self.execute_command(cmd, ignore_error=True) + self.execute_command(cmd, ignore_error=True) - logger.info(f"✓ Successfully deployed certificate to MikroTik {self.hostname}") + logger.info(f"✓ MikroTik deployment successful") return True except Exception as e: - logger.error(f"MikroTik certificate deployment failed: {e}", exc_info=True) + logger.error(f"MikroTik deployment failed: {e}") return False def verify_certificate(self) -> bool: """Verify certificate is properly installed""" try: - logger.debug("Verifying certificate installation") success, stdout, stderr = self.execute_command( '/certificate print detail where name~"ssl-cert"' ) if success and stdout: - logger.info(f"Certificate verification:\n{stdout}") + logger.debug(f"Certificate verified") return True - logger.warning("Could not verify certificate installation") return False except Exception as e: @@ -415,32 +372,19 @@ class ProxmoxManager(SSHManager): """Specialized manager for Proxmox VE servers""" def upload_certificate(self, cert_path: str, key_path: str) -> bool: - """ - Upload certificate to Proxmox VE - - Proxmox uses two separate files: - - /etc/pve/local/pveproxy-ssl.pem (certificate) - - /etc/pve/local/pveproxy-ssl.key (private key) - - Args: - cert_path: Path to certificate file (fullchain) - key_path: Path to private key file - """ + """Upload certificate to Proxmox VE""" try: - logger.info(f"Starting Proxmox certificate deployment to {self.hostname}") + logger.info(f"Proxmox certificate deployment") - # Step 1: Upload certificate - logger.debug("Uploading certificate to Proxmox") + logger.info("Uploading certificate") if not self.upload_file(cert_path, '/etc/pve/local/pveproxy-ssl.pem'): return False - # Step 2: Upload private key - logger.debug("Uploading private key to Proxmox") + logger.info("Uploading private key") if not self.upload_file(key_path, '/etc/pve/local/pveproxy-ssl.key'): return False - # Step 3: Set correct permissions - logger.debug("Setting file permissions") + logger.debug("Setting permissions") commands = [ 'chmod 640 /etc/pve/local/pveproxy-ssl.key', 'chown root:www-data /etc/pve/local/pveproxy-ssl.key', @@ -449,30 +393,26 @@ class ProxmoxManager(SSHManager): for cmd in commands: self.execute_command(cmd, ignore_error=False) - # Step 4: Restart pveproxy - logger.info("Restarting pveproxy service") + logger.info("Restarting pveproxy") success, stdout, stderr = self.execute_command('systemctl restart pveproxy', timeout=30) if not success: - logger.error(f"Failed to restart pveproxy: {stderr}") + logger.error(f"Failed to restart pveproxy") return False - # Step 5: Verify service is running import time time.sleep(3) success, stdout, stderr = self.execute_command('systemctl is-active pveproxy') if success and 'active' in stdout: - logger.info(f"✓ Successfully deployed certificate to Proxmox {self.hostname}") + logger.info(f"✓ Proxmox deployment successful") return True else: - logger.error("pveproxy service is not active after restart") - # Show journal logs for debugging - self.execute_command('journalctl -u pveproxy -n 20 --no-pager') + logger.error("pveproxy service is not active") return False except Exception as e: - logger.error(f"Proxmox certificate deployment failed: {e}", exc_info=True) + logger.error(f"Proxmox deployment failed: {e}") return False @@ -506,8 +446,8 @@ class CertPusher: logger.error(f"Missing required global option: {key}") return False - logger.info(f"✓ Configuration loaded successfully") - logger.debug(f"Found {len(self.config.sections()) - 1} host(s) in configuration") + logger.info(f"✓ Configuration loaded") + logger.info(f"Found {len(self.config.sections()) - 1} host(s)") return True except Exception as e: @@ -515,80 +455,52 @@ class CertPusher: return False def get_key_path(self, section: str, cert_path: str) -> str: - """ - Get private key path for certificate - Priority: section-specific > global > derived from cert path - """ - # Check section-specific key + """Get private key path for certificate""" if self.config.has_option(section, 'source_key_path'): return self.config.get(section, 'source_key_path') - # Check global key if self.config.has_option('global', 'source_key_path'): return self.config.get('global', 'source_key_path') - # Derive from certificate path key_path = cert_path.replace('fullchain.pem', 'privkey.pem').replace('cert.pem', 'privkey.pem') - logger.debug(f"Derived key path: {key_path}") return key_path def process_mikrotik(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool: """Process MikroTik device specifically""" try: - logger.info("Using MikroTik-specific deployment method") + logger.info("Using MikroTik deployment method") - # Get private key path source_key_path = self.get_key_path(section, source_cert_path) - logger.info(f"Certificate: {source_cert_path}") - logger.info(f"Private key: {source_key_path}") - if not os.path.exists(source_key_path): - logger.error(f"Private key file not found: {source_key_path}") + logger.error(f"Private key not found: {source_key_path}") return False - # Load source certificate for comparison source_cert = self.cert_manager.get_cert_from_file(source_cert_path) - - # Check if certificate check is enabled check_first = self.config.getboolean(section, 'check_before_upload', fallback=True) - # Connect mikrotik = MikroTikManager(hostname, port, username, ssh_key) if not mikrotik.connect(): self.stats['failed'] += 1 return False - # Upload and import certificate (with optional check) - if not mikrotik.upload_certificate(source_cert_path, source_key_path, check_first, source_cert): + result = mikrotik.upload_certificate(source_cert_path, source_key_path, check_first, source_cert) + + if not result: mikrotik.disconnect() self.stats['failed'] += 1 return False - # If we got here and check was enabled, certificate was actually uploaded - if check_first and source_cert: - # Check if it was actually uploaded or skipped - success, stdout, stderr = mikrotik.execute_command( - f'/certificate print where name~"{mikrotik.cert_name}"' - ) - if success and stdout and mikrotik.cert_name in stdout: - self.stats['uploaded'] += 1 - else: - self.stats['skipped'] += 1 - else: - self.stats['uploaded'] += 1 - - # Verify installation mikrotik.verify_certificate() - mikrotik.disconnect() - logger.info(f"✓ Successfully processed MikroTik {section}") + self.stats['uploaded'] += 1 + logger.info(f"✓ MikroTik processed successfully") return True except Exception as e: - logger.error(f"MikroTik processing failed: {e}", exc_info=True) + logger.error(f"MikroTik processing failed: {e}") self.stats['failed'] += 1 return False @@ -596,26 +508,20 @@ class CertPusher: username: str, ssh_key: str, source_cert_path: str) -> bool: """Process Proxmox VE server specifically""" try: - logger.info("Using Proxmox-specific deployment method") + logger.info("Using Proxmox deployment method") - # Get private key path source_key_path = self.get_key_path(section, source_cert_path) - logger.info(f"Certificate: {source_cert_path}") - logger.info(f"Private key: {source_key_path}") - if not os.path.exists(source_key_path): - logger.error(f"Private key file not found: {source_key_path}") + logger.error(f"Private key not found: {source_key_path}") return False - # Connect proxmox = ProxmoxManager(hostname, port, username, ssh_key) if not proxmox.connect(): self.stats['failed'] += 1 return False - # Upload certificate if not proxmox.upload_certificate(source_cert_path, source_key_path): proxmox.disconnect() self.stats['failed'] += 1 @@ -623,159 +529,139 @@ class CertPusher: proxmox.disconnect() self.stats['uploaded'] += 1 - logger.info(f"✓ Successfully processed Proxmox {section}") + logger.info(f"✓ Proxmox processed successfully") return True except Exception as e: - logger.error(f"Proxmox processing failed: {e}", exc_info=True) + logger.error(f"Proxmox processing failed: {e}") self.stats['failed'] += 1 return False def process_host(self, section: str) -> bool: - """Process certificate deployment for a single host""" - try: - logger.info(f"\n{'='*60}") - logger.info(f"Processing host: {section}") - logger.info(f"{'='*60}") - - self.stats['total'] += 1 - - # Get configuration - hostname = self.config.get(section, 'hostname') - port = self.config.getint(section, 'port', fallback=22) - username = self.config.get(section, 'username', fallback='root') - device_type = self.config.get(section, 'type', fallback='standard') - - # Determine SSH key to use - if self.config.has_option(section, 'ssh_key_path'): - ssh_key = self.config.get(section, 'ssh_key_path') - else: - ssh_key = self.config.get('global', 'default_ssh_key') - - # Allow per-host certificate override - if self.config.has_option(section, 'source_cert_path'): - source_cert_path = self.config.get(section, 'source_cert_path') - logger.info(f"Using host-specific certificate: {source_cert_path}") - else: - source_cert_path = self.config.get('global', 'source_cert_path') - logger.debug(f"Using global certificate: {source_cert_path}") - - # Verify certificate exists - if not os.path.exists(source_cert_path): - logger.error(f"Certificate file not found: {source_cert_path}") - self.stats['failed'] += 1 - return False - - logger.info(f"Host: {hostname}:{port}") - logger.info(f"Type: {device_type}") - logger.info(f"Username: {username}") - logger.info(f"SSH Key: {ssh_key}") - - # Handle device-specific deployments - if device_type.lower() == 'mikrotik': - return self.process_mikrotik(section, hostname, port, username, ssh_key, source_cert_path) - elif device_type.lower() == 'proxmox': - return self.process_proxmox(section, hostname, port, username, ssh_key, source_cert_path) - - # Standard processing for other devices - remote_cert_path = self.config.get(section, 'remote_cert_path') - post_upload_command = self.config.get(section, 'post_upload_command', fallback='') - check_url = self.config.get(section, 'check_url', fallback='') - - logger.info(f"Remote certificate path: {remote_cert_path}") - - # Check if upload is needed - if check_url: - logger.info(f"Checking current certificate at: {check_url}") - source_cert = self.cert_manager.get_cert_from_file(source_cert_path) - remote_cert = self.cert_manager.get_cert_from_url(check_url) + """Process certificate deployment for a single host""" + try: + logger.info(f"\n{'='*60}") + logger.info(f"Processing: {section}") + logger.info(f"{'='*60}") - if source_cert and remote_cert: - if self.cert_manager.compare_certificates(source_cert, remote_cert): - logger.info(f"✓ Certificate on {hostname} is already up to date. Skipping upload.") - self.stats['skipped'] += 1 - return True - else: - logger.info(f"Certificate on {hostname} is outdated. Upload needed.") - logger.debug(self.cert_manager.get_certificate_info(source_cert)) + self.stats['total'] += 1 + + hostname = self.config.get(section, 'hostname') + port = self.config.getint(section, 'port', fallback=22) + username = self.config.get(section, 'username', fallback='root') + device_type = self.config.get(section, 'type', fallback='standard') + + if self.config.has_option(section, 'ssh_key_path'): + ssh_key = self.config.get(section, 'ssh_key_path') else: - logger.warning(f"Could not compare certificates. Proceeding with upload.") - - # Connect and upload - ssh = SSHManager(hostname, port, username, ssh_key) - - if not ssh.connect(): - self.stats['failed'] += 1 - return False - - # Upload certificate - if not ssh.upload_file(source_cert_path, remote_cert_path): - ssh.disconnect() - self.stats['failed'] += 1 - return False - - # Upload private key if remote_key_path is specified - if self.config.has_option(section, 'remote_key_path'): - remote_key_path = self.config.get(section, 'remote_key_path') - source_key_path = self.get_key_path(section, source_cert_path) + ssh_key = self.config.get('global', 'default_ssh_key') - logger.info(f"Remote key path: {remote_key_path}") - logger.info(f"Uploading private key: {source_key_path} -> {remote_key_path}") + if self.config.has_option(section, 'source_cert_path'): + source_cert_path = self.config.get(section, 'source_cert_path') + logger.info(f"Using host-specific certificate") + else: + source_cert_path = self.config.get('global', 'source_cert_path') - if not os.path.exists(source_key_path): - logger.error(f"Private key file not found: {source_key_path}") + if not os.path.exists(source_cert_path): + logger.error(f"Certificate not found: {source_cert_path}") + self.stats['failed'] += 1 + return False + + logger.info(f"Host: {hostname}:{port}") + logger.info(f"Type: {device_type}") + logger.info(f"User: {username}") + + if device_type.lower() == 'mikrotik': + return self.process_mikrotik(section, hostname, port, username, ssh_key, source_cert_path) + elif device_type.lower() == 'proxmox': + return self.process_proxmox(section, hostname, port, username, ssh_key, source_cert_path) + + remote_cert_path = self.config.get(section, 'remote_cert_path') + post_upload_command = self.config.get(section, 'post_upload_command', fallback='') + check_url = self.config.get(section, 'check_url', fallback='') + + if check_url: + logger.info(f"Checking certificate at {check_url}") + source_cert = self.cert_manager.get_cert_from_file(source_cert_path) + remote_cert = self.cert_manager.get_cert_from_url(check_url) + + if source_cert and remote_cert: + if self.cert_manager.compare_certificates(source_cert, remote_cert): + logger.info(f"✓ Certificate is up to date. Skipping.") + self.stats['skipped'] += 1 + return True + else: + logger.info(f"Certificate is outdated. Uploading.") + else: + logger.warning(f"Could not compare certificates. Proceeding.") + + ssh = SSHManager(hostname, port, username, ssh_key) + + if not ssh.connect(): + self.stats['failed'] += 1 + return False + + if not ssh.upload_file(source_cert_path, remote_cert_path): ssh.disconnect() self.stats['failed'] += 1 return False - if not ssh.upload_file(source_key_path, remote_key_path): - logger.warning(f"Failed to upload private key to {remote_key_path}") - # Continue anyway - key might be uploaded via additional_files - - # Upload additional files if specified - if self.config.has_option(section, 'additional_files'): - additional_files = self.config.get(section, 'additional_files') - # Format: local_path:remote_path,local_path:remote_path - for file_pair in additional_files.split(','): - if ':' in file_pair: - local, remote = file_pair.strip().split(':', 1) - logger.info(f"Uploading additional file: {local} -> {remote}") - if not ssh.upload_file(local, remote): - logger.warning(f"Failed to upload additional file: {local}") - - # Execute post-upload command - if post_upload_command: - logger.info(f"Executing post-upload command: {post_upload_command}") - success, stdout, stderr = ssh.execute_command(post_upload_command) + if self.config.has_option(section, 'remote_key_path'): + remote_key_path = self.config.get(section, 'remote_key_path') + source_key_path = self.get_key_path(section, source_cert_path) + + logger.info(f"Uploading private key") + + if not os.path.exists(source_key_path): + logger.error(f"Private key not found: {source_key_path}") + ssh.disconnect() + self.stats['failed'] += 1 + return False + + if not ssh.upload_file(source_key_path, remote_key_path): + logger.warning(f"Failed to upload private key") - if not success: - logger.warning(f"Post-upload command failed, but file was uploaded successfully") - else: - logger.info(f"✓ Post-upload command completed successfully") - - ssh.disconnect() - self.stats['uploaded'] += 1 - logger.info(f"✓ Successfully processed {section}") - return True - - except Exception as e: - logger.error(f"Failed to process host {section}: {e}", exc_info=True) - self.stats['failed'] += 1 - return False + if self.config.has_option(section, 'additional_files'): + additional_files = self.config.get(section, 'additional_files') + for file_pair in additional_files.split(','): + if ':' in file_pair: + local, remote = file_pair.strip().split(':', 1) + logger.info(f"Uploading additional: {os.path.basename(local)}") + if not ssh.upload_file(local, remote): + logger.warning(f"Failed to upload additional file") + + if post_upload_command: + logger.info(f"Executing post-upload command") + success, stdout, stderr = ssh.execute_command(post_upload_command) + + if not success: + logger.warning(f"Post-upload command failed") + else: + logger.info(f"✓ Post-upload command completed") + + ssh.disconnect() + self.stats['uploaded'] += 1 + logger.info(f"✓ Host processed successfully") + return True + + except Exception as e: + logger.error(f"Failed to process host: {e}") + self.stats['failed'] += 1 + return False def run(self): """Main execution method""" logger.info("="*60) - logger.info(" CertPusher - SSL Certificate Distribution Tool") + logger.info(" CertPusher - SSL Certificate Distribution") logger.info("="*60) - logger.info(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + logger.info(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + logger.info(f"Log file: {LOG_FILE}") logger.info("") if not self.load_config(): - logger.error("Configuration loading failed. Exiting.") + logger.error("Configuration loading failed") sys.exit(1) - # Verify source certificate exists source_cert = self.config.get('global', 'source_cert_path') if not os.path.exists(source_cert): logger.error(f"Source certificate not found: {source_cert}") @@ -783,12 +669,10 @@ class CertPusher: logger.info(f"Source certificate: {source_cert}") - # Display certificate info cert = self.cert_manager.get_cert_from_file(source_cert) if cert: logger.info(self.cert_manager.get_certificate_info(cert)) - # Process each host for section in self.config.sections(): if section == 'global': continue @@ -796,19 +680,18 @@ class CertPusher: try: self.process_host(section) except Exception as e: - logger.error(f"Unexpected error processing {section}: {e}", exc_info=True) + logger.error(f"Unexpected error: {e}") self.stats['failed'] += 1 - # Print summary logger.info("\n" + "="*60) - logger.info(" DEPLOYMENT SUMMARY") + logger.info(" SUMMARY") logger.info("="*60) - logger.info(f"Total hosts: {self.stats['total']}") - logger.info(f"✓ Uploaded: {self.stats['uploaded']}") - logger.info(f"○ Skipped (up to date): {self.stats['skipped']}") - logger.info(f"✗ Failed: {self.stats['failed']}") + logger.info(f"Total hosts: {self.stats['total']}") + logger.info(f"✓ Uploaded: {self.stats['uploaded']}") + logger.info(f"○ Skipped: {self.stats['skipped']}") + logger.info(f"✗ Failed: {self.stats['failed']}") logger.info("="*60) - logger.info(f"Finished at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + logger.info(f"Finished: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") if self.stats['failed'] > 0: sys.exit(1) @@ -816,6 +699,26 @@ class CertPusher: def main(): """Entry point""" + parser = argparse.ArgumentParser( + description='CertPusher - SSL Certificate Distribution Tool', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s config.ini # Normal operation + %(prog)s config.ini --debug # Debug mode with verbose logging + %(prog)s config.ini -d # Same as above + """ + ) + + parser.add_argument('config', help='Path to configuration file') + parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging') + parser.add_argument('-v', '--version', action='version', version='CertPusher 1.0') + + args = parser.parse_args() + + # Setup logging based on debug flag + setup_logging(debug=args.debug) + print(""" ╔═══════════════════════════════════════════════════════════╗ ║ CertPusher v1.0 ║ @@ -823,20 +726,12 @@ def main(): ╚═══════════════════════════════════════════════════════════╝ """) - if len(sys.argv) < 2: - print("Usage: python certpusher.py ") - print("Example: python certpusher.py config.ini") - print("") - sys.exit(1) - - config_file = sys.argv[1] - - if not os.path.exists(config_file): - print(f"Error: Configuration file '{config_file}' not found") + if not os.path.exists(args.config): + print(f"Error: Configuration file '{args.config}' not found") sys.exit(1) try: - pusher = CertPusher(config_file) + pusher = CertPusher(args.config) pusher.run() except KeyboardInterrupt: print("\n\nInterrupted by user. Exiting...")