proxmox class

This commit is contained in:
Mateusz Gruszczyński
2025-10-27 07:58:05 +01:00
parent afafb8aeef
commit b9ae19865c

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""
CertPusher - Automated SSL Certificate Distribution Tool
Version 1.1 - With unified certificate checking for all host types
Version 1.1 - With serial number normalization
"""
import configparser
@@ -48,6 +48,23 @@ def setup_logging(debug: bool = False):
logger = logging.getLogger('CertPusher')
def normalize_serial(serial: str) -> str:
"""
Normalize certificate serial number
Removes leading zeros, colons, spaces and converts to uppercase
Examples:
'05BC46A0...' -> '5BC46A0...'
'0005BC...' -> '5BC...'
'00' -> '0'
"""
# Remove formatting characters
normalized = serial.upper().replace(':', '').replace(' ', '').replace('-', '')
# Remove leading zeros but keep at least one digit
normalized = normalized.lstrip('0') or '0'
return normalized
class CertificateManager:
"""Manages certificate comparison and validation"""
@@ -91,12 +108,12 @@ class CertificateManager:
@staticmethod
def compare_certificates(cert1: x509.Certificate, cert2: x509.Certificate) -> bool:
"""Compare two certificates by serial number"""
"""Compare two certificates by normalized serial number"""
try:
serial1 = format(cert1.serial_number, 'X').upper()
serial2 = format(cert2.serial_number, 'X').upper()
serial1 = normalize_serial(format(cert1.serial_number, 'X').upper())
serial2 = normalize_serial(format(cert2.serial_number, 'X').upper())
logger.debug(f"Comparing serials: {serial1} vs {serial2}")
logger.debug(f"Comparing normalized serials: {serial1} vs {serial2}")
return serial1 == serial2
except Exception as e:
logger.error(f"Failed to compare certificates: {e}")
@@ -110,9 +127,10 @@ class CertificateManager:
valid_to = cert.not_valid_after_utc
now = datetime.now(timezone.utc)
days_left = (valid_to - now).days
serial = normalize_serial(format(cert.serial_number, 'X').upper())
return f"""Certificate: {subject}
Serial: {format(cert.serial_number, 'X').upper()}
Serial: {serial}
Expires: {valid_to}
Days left: {days_left}"""
except Exception as e:
@@ -222,7 +240,6 @@ class SSHManager:
try:
logger.info("Checking remote certificate via SSH")
# Try to read and compare certificate via SSH
success, stdout, stderr = self.execute_command(
f'openssl x509 -in {remote_cert_path} -noout -serial 2>/dev/null || echo "NOTFOUND"',
ignore_error=True
@@ -232,8 +249,12 @@ class SSHManager:
serial_match = re.search(r'serial=([A-F0-9]+)', stdout, re.IGNORECASE)
if serial_match:
remote_serial = serial_match.group(1).upper()
source_serial = format(source_cert.serial_number, 'X').upper()
remote_serial_raw = serial_match.group(1).upper()
source_serial_raw = format(source_cert.serial_number, 'X').upper()
# Normalize both serials
remote_serial = normalize_serial(remote_serial_raw)
source_serial = normalize_serial(source_serial_raw)
logger.info(f"Source serial: {source_serial}")
logger.info(f"Remote serial: {remote_serial}")
@@ -246,7 +267,7 @@ class SSHManager:
return True
logger.warning("Could not read remote certificate via SSH")
return True # Upload if we can't verify
return True
except Exception as e:
logger.warning(f"Error checking remote certificate: {e}")
@@ -316,7 +337,7 @@ class MikroTikManager(SSHManager):
try:
if check_first and source_cert:
if not self.check_certificate_expiry(source_cert):
return True, False # Success but skipped
return True, False
logger.info("Deploying MikroTik certificate")
@@ -357,7 +378,7 @@ class MikroTikManager(SSHManager):
self.execute_command(cmd, ignore_error=True)
logger.info(f"✓ MikroTik deployment successful")
return True, True # Success and uploaded
return True, True
except Exception as e:
logger.error(f"MikroTik deployment failed: {e}")
@@ -368,11 +389,18 @@ class ProxmoxManager(SSHManager):
"""Specialized manager for Proxmox VE servers"""
def check_certificate(self, source_cert: x509.Certificate, check_url: str) -> bool:
"""Check if certificate on Proxmox needs update"""
"""
Check if certificate on Proxmox needs update
Returns True if upload needed, False if certificates match
Uses two methods:
1. SSH: Direct file check (preferred)
2. URL: HTTPS check (fallback)
"""
try:
logger.info("Checking Proxmox certificate")
# Method 1: Check via SSH
# Method 1: Check via SSH - Direct file access
success, stdout, stderr = self.execute_command(
'openssl x509 -in /etc/pve/local/pveproxy-ssl.pem -noout -serial -dates 2>/dev/null',
ignore_error=True
@@ -382,30 +410,45 @@ class ProxmoxManager(SSHManager):
serial_match = re.search(r'serial=([A-F0-9]+)', stdout, re.IGNORECASE)
if serial_match:
proxmox_serial = serial_match.group(1).upper()
source_serial = format(source_cert.serial_number, 'X').upper()
proxmox_serial_raw = serial_match.group(1).upper()
source_serial_raw = format(source_cert.serial_number, 'X').upper()
# Normalize both serials (remove leading zeros)
proxmox_serial = normalize_serial(proxmox_serial_raw)
source_serial = normalize_serial(source_serial_raw)
logger.info(f"Source serial: {source_serial}")
logger.info(f"Proxmox serial: {proxmox_serial}")
if source_serial == proxmox_serial:
logger.info("✓ Certificates match. Skipping upload.")
logger.info("✓ Certificates match (SSH check). Skipping upload.")
return False
else:
logger.info("✗ Certificates differ. Upload needed.")
logger.info("✗ Certificates differ (SSH check). Upload needed.")
return True
# Method 2: Fallback to URL check
if check_url:
logger.info("Trying URL-based check")
logger.info("SSH check failed. Trying URL-based check...")
cert_manager = CertificateManager()
remote_cert = cert_manager.get_cert_from_url(check_url)
if remote_cert and cert_manager.compare_certificates(source_cert, remote_cert):
logger.info("✓ Certificates match via URL. Skipping.")
return False
if remote_cert:
# Compare using normalized serials
remote_serial = normalize_serial(format(remote_cert.serial_number, 'X').upper())
source_serial = normalize_serial(format(source_cert.serial_number, 'X').upper())
logger.info(f"Source serial (URL): {source_serial}")
logger.info(f"Remote serial (URL): {remote_serial}")
if remote_serial == source_serial:
logger.info("✓ Certificates match (URL check). Skipping upload.")
return False
else:
logger.info("✗ Certificates differ (URL check). Upload needed.")
return True
logger.warning("Could not verify. Proceeding with upload.")
logger.warning("Could not verify certificate. Proceeding with upload for safety.")
return True
except Exception as e:
@@ -421,7 +464,7 @@ class ProxmoxManager(SSHManager):
try:
if check_first and source_cert:
if not self.check_certificate(source_cert, check_url):
return True, False # Success but skipped
return True, False
logger.info("Deploying Proxmox certificate")
@@ -637,7 +680,7 @@ class CertPusher:
upload_needed = False
# Try URL check if SSH check failed
elif check_url:
logger.info(f"Checking via URL: {check_url}")
logger.info(f"SSH check failed. Trying URL: {check_url}")
remote_cert = self.cert_manager.get_cert_from_url(check_url)
if remote_cert and self.cert_manager.compare_certificates(source_cert, remote_cert):
logger.info("✓ Certificate up to date via URL. Skipping.")