diff --git a/certpusher.py b/certpusher.py index fb750da..b720ce5 100644 --- a/certpusher.py +++ b/certpusher.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ CertPusher - Automated SSL Certificate Distribution Tool -Version 1.1 - With unified certificate checking for all host types +Version 1.1 - With serial number normalization """ import configparser @@ -48,6 +48,23 @@ def setup_logging(debug: bool = False): logger = logging.getLogger('CertPusher') +def normalize_serial(serial: str) -> str: + """ + Normalize certificate serial number + Removes leading zeros, colons, spaces and converts to uppercase + + Examples: + '05BC46A0...' -> '5BC46A0...' + '0005BC...' -> '5BC...' + '00' -> '0' + """ + # Remove formatting characters + normalized = serial.upper().replace(':', '').replace(' ', '').replace('-', '') + # Remove leading zeros but keep at least one digit + normalized = normalized.lstrip('0') or '0' + return normalized + + class CertificateManager: """Manages certificate comparison and validation""" @@ -91,12 +108,12 @@ class CertificateManager: @staticmethod def compare_certificates(cert1: x509.Certificate, cert2: x509.Certificate) -> bool: - """Compare two certificates by serial number""" + """Compare two certificates by normalized serial number""" try: - serial1 = format(cert1.serial_number, 'X').upper() - serial2 = format(cert2.serial_number, 'X').upper() + serial1 = normalize_serial(format(cert1.serial_number, 'X').upper()) + serial2 = normalize_serial(format(cert2.serial_number, 'X').upper()) - logger.debug(f"Comparing serials: {serial1} vs {serial2}") + logger.debug(f"Comparing normalized serials: {serial1} vs {serial2}") return serial1 == serial2 except Exception as e: logger.error(f"Failed to compare certificates: {e}") @@ -110,9 +127,10 @@ class CertificateManager: valid_to = cert.not_valid_after_utc now = datetime.now(timezone.utc) days_left = (valid_to - now).days + serial = normalize_serial(format(cert.serial_number, 'X').upper()) return f"""Certificate: {subject} - Serial: {format(cert.serial_number, 'X').upper()} + Serial: {serial} Expires: {valid_to} Days left: {days_left}""" except Exception as e: @@ -222,7 +240,6 @@ class SSHManager: try: logger.info("Checking remote certificate via SSH") - # Try to read and compare certificate via SSH success, stdout, stderr = self.execute_command( f'openssl x509 -in {remote_cert_path} -noout -serial 2>/dev/null || echo "NOTFOUND"', ignore_error=True @@ -232,8 +249,12 @@ class SSHManager: serial_match = re.search(r'serial=([A-F0-9]+)', stdout, re.IGNORECASE) if serial_match: - remote_serial = serial_match.group(1).upper() - source_serial = format(source_cert.serial_number, 'X').upper() + remote_serial_raw = serial_match.group(1).upper() + source_serial_raw = format(source_cert.serial_number, 'X').upper() + + # Normalize both serials + remote_serial = normalize_serial(remote_serial_raw) + source_serial = normalize_serial(source_serial_raw) logger.info(f"Source serial: {source_serial}") logger.info(f"Remote serial: {remote_serial}") @@ -246,7 +267,7 @@ class SSHManager: return True logger.warning("Could not read remote certificate via SSH") - return True # Upload if we can't verify + return True except Exception as e: logger.warning(f"Error checking remote certificate: {e}") @@ -316,7 +337,7 @@ class MikroTikManager(SSHManager): try: if check_first and source_cert: if not self.check_certificate_expiry(source_cert): - return True, False # Success but skipped + return True, False logger.info("Deploying MikroTik certificate") @@ -357,7 +378,7 @@ class MikroTikManager(SSHManager): self.execute_command(cmd, ignore_error=True) logger.info(f"✓ MikroTik deployment successful") - return True, True # Success and uploaded + return True, True except Exception as e: logger.error(f"MikroTik deployment failed: {e}") @@ -368,11 +389,18 @@ class ProxmoxManager(SSHManager): """Specialized manager for Proxmox VE servers""" def check_certificate(self, source_cert: x509.Certificate, check_url: str) -> bool: - """Check if certificate on Proxmox needs update""" + """ + Check if certificate on Proxmox needs update + Returns True if upload needed, False if certificates match + + Uses two methods: + 1. SSH: Direct file check (preferred) + 2. URL: HTTPS check (fallback) + """ try: logger.info("Checking Proxmox certificate") - # Method 1: Check via SSH + # Method 1: Check via SSH - Direct file access success, stdout, stderr = self.execute_command( 'openssl x509 -in /etc/pve/local/pveproxy-ssl.pem -noout -serial -dates 2>/dev/null', ignore_error=True @@ -382,30 +410,45 @@ class ProxmoxManager(SSHManager): serial_match = re.search(r'serial=([A-F0-9]+)', stdout, re.IGNORECASE) if serial_match: - proxmox_serial = serial_match.group(1).upper() - source_serial = format(source_cert.serial_number, 'X').upper() + proxmox_serial_raw = serial_match.group(1).upper() + source_serial_raw = format(source_cert.serial_number, 'X').upper() + + # Normalize both serials (remove leading zeros) + proxmox_serial = normalize_serial(proxmox_serial_raw) + source_serial = normalize_serial(source_serial_raw) logger.info(f"Source serial: {source_serial}") logger.info(f"Proxmox serial: {proxmox_serial}") if source_serial == proxmox_serial: - logger.info("✓ Certificates match. Skipping upload.") + logger.info("✓ Certificates match (SSH check). Skipping upload.") return False else: - logger.info("✗ Certificates differ. Upload needed.") + logger.info("✗ Certificates differ (SSH check). Upload needed.") return True # Method 2: Fallback to URL check if check_url: - logger.info("Trying URL-based check") + logger.info("SSH check failed. Trying URL-based check...") cert_manager = CertificateManager() remote_cert = cert_manager.get_cert_from_url(check_url) - if remote_cert and cert_manager.compare_certificates(source_cert, remote_cert): - logger.info("✓ Certificates match via URL. Skipping.") - return False + if remote_cert: + # Compare using normalized serials + remote_serial = normalize_serial(format(remote_cert.serial_number, 'X').upper()) + source_serial = normalize_serial(format(source_cert.serial_number, 'X').upper()) + + logger.info(f"Source serial (URL): {source_serial}") + logger.info(f"Remote serial (URL): {remote_serial}") + + if remote_serial == source_serial: + logger.info("✓ Certificates match (URL check). Skipping upload.") + return False + else: + logger.info("✗ Certificates differ (URL check). Upload needed.") + return True - logger.warning("Could not verify. Proceeding with upload.") + logger.warning("Could not verify certificate. Proceeding with upload for safety.") return True except Exception as e: @@ -421,7 +464,7 @@ class ProxmoxManager(SSHManager): try: if check_first and source_cert: if not self.check_certificate(source_cert, check_url): - return True, False # Success but skipped + return True, False logger.info("Deploying Proxmox certificate") @@ -637,7 +680,7 @@ class CertPusher: upload_needed = False # Try URL check if SSH check failed elif check_url: - logger.info(f"Checking via URL: {check_url}") + logger.info(f"SSH check failed. Trying URL: {check_url}") remote_cert = self.cert_manager.get_cert_from_url(check_url) if remote_cert and self.cert_manager.compare_certificates(source_cert, remote_cert): logger.info("✓ Certificate up to date via URL. Skipping.")