From d3a10d2734bdc13efba754a44dcb6637952d6bff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Mon, 27 Oct 2025 08:32:26 +0100 Subject: [PATCH] mikrotik check cert --- certpusher.py | 380 ++++++++++++++++++++++++++------------------------ 1 file changed, 194 insertions(+), 186 deletions(-) diff --git a/certpusher.py b/certpusher.py index 8b0117e..abd15c2 100644 --- a/certpusher.py +++ b/certpusher.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ CertPusher - Automated SSL Certificate Distribution Tool -Version 1.1 - With serial number normalization +Version 1.1 - Production Ready """ import configparser @@ -52,15 +52,8 @@ def normalize_serial(serial: str) -> str: """ Normalize certificate serial number Removes leading zeros, colons, spaces and converts to uppercase - - Examples: - '05BC46A0...' -> '5BC46A0...' - '0005BC...' -> '5BC...' - '00' -> '0' """ - # Remove formatting characters normalized = serial.upper().replace(':', '').replace(' ', '').replace('-', '') - # Remove leading zeros but keep at least one digit normalized = normalized.lstrip('0') or '0' return normalized @@ -112,7 +105,6 @@ class CertificateManager: try: serial1 = normalize_serial(format(cert1.serial_number, 'X').upper()) serial2 = normalize_serial(format(cert2.serial_number, 'X').upper()) - logger.debug(f"Comparing normalized serials: {serial1} vs {serial2}") return serial1 == serial2 except Exception as e: @@ -233,10 +225,7 @@ class SSHManager: return False, "", str(e) def check_remote_certificate(self, remote_cert_path: str, source_cert: x509.Certificate) -> bool: - """ - Check if remote certificate matches source certificate - Returns True if upload needed, False if certificates match - """ + """Check if remote certificate matches source certificate""" try: logger.info("Checking remote certificate via SSH") @@ -252,7 +241,6 @@ class SSHManager: remote_serial_raw = serial_match.group(1).upper() source_serial_raw = format(source_cert.serial_number, 'X').upper() - # Normalize both serials remote_serial = normalize_serial(remote_serial_raw) source_serial = normalize_serial(source_serial_raw) @@ -285,185 +273,215 @@ class MikroTikManager(SSHManager): def __init__(self, hostname: str, port: int, username: str, key_path: str): super().__init__(hostname, port, username, key_path) - # Use simple name without special characters self.cert_name = "letsencrypt" self.key_name = "letsencrypt-key" - - def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: - """Upload certificate to MikroTik""" + def check_certificate_expiry(self, source_cert: x509.Certificate) -> bool: + """Check if certificate on MikroTik needs update""" + try: + logger.info("Checking MikroTik certificate") + + source_expiry = source_cert.not_valid_after_utc + + # Look for certificate with our name pattern + success, stdout, stderr = self.execute_command( + '/certificate print detail where name~"letsencrypt"', + ignore_error=True + ) + + if not success or not stdout or 'invalid-after' not in stdout.lower(): + logger.info("Certificate not found. Upload needed.") + return True + + logger.debug(f"Found certificate:\n{stdout[:500]}") + + # Parse expiry (RouterOS 7.x format) + invalid_after_match = re.search( + r'invalid-after[:\s=]+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', + stdout, + re.IGNORECASE + ) + + if not invalid_after_match: + logger.warning("Could not parse expiry") + return True + + mikrotik_expiry_str = invalid_after_match.group(1) + try: - if check_first and source_cert: - if not self.check_certificate_expiry(source_cert): - return True, False + mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%Y-%m-%d %H:%M:%S') + mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc) + except ValueError: + logger.warning(f"Could not parse date: {mikrotik_expiry_str}") + return True + + logger.info(f"Source expires: {source_expiry}") + logger.info(f"MikroTik expires: {mikrotik_expiry}") + + time_diff = abs((source_expiry - mikrotik_expiry).total_seconds()) + + if time_diff < 86400: + logger.info("✓ Certificate is current. Skipping.") + return False + else: + logger.info(f"Certificate differs ({time_diff/86400:.1f} days). Upload needed.") + return True - logger.info("Deploying MikroTik certificate") - - # Step 1: Disable www-ssl - logger.info("Disabling www-ssl service") - self.execute_command('/ip service disable www-ssl', ignore_error=True) - - import time - time.sleep(1) - - # Step 2: Remove old files and certificates - logger.info("Cleaning up old certificates") - cleanup_commands = [ - '/certificate remove [find name~"letsencrypt"]', - '/file remove "letsencrypt.pem"', - '/file remove "letsencrypt-key.pem"', - ] - - for cmd in cleanup_commands: - self.execute_command(cmd, ignore_error=True) - - time.sleep(1) - - # Step 3: Show current files (debug) - logger.debug("Files before upload:") - success, stdout, stderr = self.execute_command('/file print', ignore_error=True) - if stdout: - logger.debug(stdout[:500]) - - # Step 4: Upload certificate via SCP - logger.info(f"Uploading certificate: {cert_path} -> letsencrypt.pem") + except Exception as e: + logger.warning(f"Error checking: {e}") + return True + + def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: + """Upload certificate to MikroTik""" + try: + if check_first and source_cert: + if not self.check_certificate_expiry(source_cert): + return True, False + + logger.info("Deploying MikroTik certificate") + + # Step 1: Disable www-ssl + logger.info("Disabling www-ssl service") + self.execute_command('/ip service disable www-ssl', ignore_error=True) + + import time + time.sleep(1) + + # Step 2: Remove old files and certificates + logger.info("Cleaning up old certificates") + cleanup_commands = [ + '/certificate remove [find name~"letsencrypt"]', + '/file remove "letsencrypt.pem"', + '/file remove "letsencrypt-key.pem"', + ] + + for cmd in cleanup_commands: + self.execute_command(cmd, ignore_error=True) + + time.sleep(1) + + # Step 3: Upload certificate via SCP + logger.info(f"Uploading certificate: {cert_path} -> letsencrypt.pem") + try: + with SCPClient(self.ssh_client.get_transport(), progress=None) as scp: + scp.put(cert_path, 'letsencrypt.pem') + logger.info("✓ Certificate file uploaded") + except Exception as e: + logger.error(f"SCP upload failed: {e}") + return False, False + + # Step 4: Upload private key + if key_path: + logger.info(f"Uploading key: {key_path} -> letsencrypt-key.pem") try: with SCPClient(self.ssh_client.get_transport(), progress=None) as scp: - # Upload to root directory - scp.put(cert_path, 'letsencrypt.pem') - logger.info("✓ Certificate file uploaded") + scp.put(key_path, 'letsencrypt-key.pem') + logger.info("✓ Key file uploaded") except Exception as e: - logger.error(f"SCP upload failed: {e}") + logger.error(f"Key upload failed: {e}") return False, False - - # Step 5: Upload private key - if key_path: - logger.info(f"Uploading key: {key_path} -> letsencrypt-key.pem") - try: - with SCPClient(self.ssh_client.get_transport(), progress=None) as scp: - scp.put(key_path, 'letsencrypt-key.pem') - logger.info("✓ Key file uploaded") - except Exception as e: - logger.error(f"Key upload failed: {e}") - return False, False - - time.sleep(2) - - # Step 6: Verify files were uploaded - logger.info("Verifying uploaded files...") - success, stdout, stderr = self.execute_command( - '/file print where name~"letsencrypt"', - ignore_error=True - ) - - if not success or 'letsencrypt.pem' not in stdout: - logger.error("Certificate file not found on MikroTik!") - logger.error("Available files:") - self.execute_command('/file print') - return False, False - - logger.info("✓ Files verified on MikroTik") - logger.debug(f"Files:\n{stdout}") - - # Step 7: Import certificate - logger.info("Importing certificate into MikroTik") - import_cmd = '/certificate import file-name=letsencrypt.pem passphrase=""' - success, stdout, stderr = self.execute_command(import_cmd, timeout=30) - - if not success: - logger.error(f"Import failed: {stderr}") - logger.error("Trying to diagnose...") - # Show file details - self.execute_command('/file print detail where name="letsencrypt.pem"') - # Try to see if file is readable - self.execute_command('/file print file-name=letsencrypt.pem') - return False, False - - logger.info("✓ Certificate imported") - logger.debug(f"Import output: {stdout}") - - time.sleep(2) - - # Step 8: Find imported certificate - logger.info("Looking for imported certificate...") - success, stdout, stderr = self.execute_command( - '/certificate print terse where name~"letsencrypt"' - ) - - if not success or not stdout: - logger.error("Could not find imported certificate!") - logger.error("All certificates:") - self.execute_command('/certificate print') - return False, False - - logger.debug(f"Found certificates:\n{stdout}") - - # Parse certificate names - cert_names = re.findall(r'name="([^"]+)"', stdout) - - if not cert_names: - logger.error("Could not parse certificate names") - return False, False - - # Use the first one (usually the leaf certificate) - imported_cert_name = cert_names[0] - logger.info(f"Using certificate: {imported_cert_name}") - - # Step 9: Configure www-ssl service - logger.info("Configuring www-ssl to use new certificate") - config_cmd = f'/ip service set www-ssl certificate="{imported_cert_name}"' - success, stdout, stderr = self.execute_command(config_cmd) - - if not success: - logger.error(f"Failed to configure service: {stderr}") - logger.error("Available certificates:") - self.execute_command('/certificate print') - return False, False - - logger.info("✓ Service configured") - - # Step 10: Enable www-ssl - logger.info("Enabling www-ssl service") - self.execute_command('/ip service enable www-ssl') - - time.sleep(1) - - # Step 11: Verify service status - success, stdout, stderr = self.execute_command( - '/ip service print where name="www-ssl"' - ) - - if success and stdout: - logger.info("Service status:") - logger.info(stdout) - - logger.info(f"✓ MikroTik deployment completed successfully") - return True, True - - except Exception as e: - logger.error(f"MikroTik deployment failed: {e}") - import traceback - logger.error(traceback.format_exc()) + + time.sleep(2) + + # Step 5: Verify files were uploaded + logger.info("Verifying uploaded files...") + success, stdout, stderr = self.execute_command( + '/file print where name~"letsencrypt"', + ignore_error=True + ) + + if not success or 'letsencrypt.pem' not in stdout: + logger.error("Certificate file not found on MikroTik!") + logger.error("Available files:") + self.execute_command('/file print') return False, False - + + logger.info("✓ Files verified on MikroTik") + logger.debug(f"Files:\n{stdout}") + + # Step 6: Import certificate + logger.info("Importing certificate into MikroTik") + import_cmd = '/certificate import file-name=letsencrypt.pem passphrase=""' + success, stdout, stderr = self.execute_command(import_cmd, timeout=30) + + if not success: + logger.error(f"Import failed: {stderr}") + self.execute_command('/file print detail where name="letsencrypt.pem"') + return False, False + + logger.info("✓ Certificate imported") + logger.debug(f"Import output: {stdout}") + + time.sleep(2) + + # Step 7: Find imported certificate + logger.info("Looking for imported certificate...") + success, stdout, stderr = self.execute_command( + '/certificate print terse where name~"letsencrypt"' + ) + + if not success or not stdout: + logger.error("Could not find imported certificate!") + self.execute_command('/certificate print') + return False, False + + logger.debug(f"Found certificates:\n{stdout}") + + # Parse certificate names + cert_names = re.findall(r'name="([^"]+)"', stdout) + + if not cert_names: + logger.error("Could not parse certificate names") + return False, False + + imported_cert_name = cert_names[0] + logger.info(f"Using certificate: {imported_cert_name}") + + # Step 8: Configure www-ssl service + logger.info("Configuring www-ssl to use new certificate") + config_cmd = f'/ip service set www-ssl certificate="{imported_cert_name}"' + success, stdout, stderr = self.execute_command(config_cmd) + + if not success: + logger.error(f"Failed to configure service: {stderr}") + self.execute_command('/certificate print') + return False, False + + logger.info("✓ Service configured") + + # Step 9: Enable www-ssl + logger.info("Enabling www-ssl service") + self.execute_command('/ip service enable www-ssl') + + time.sleep(1) + + # Step 10: Verify service status + success, stdout, stderr = self.execute_command( + '/ip service print where name="www-ssl"' + ) + + if success and stdout: + logger.debug(f"Service status:\n{stdout}") + + logger.info(f"✓ MikroTik deployment completed successfully") + return True, True + + except Exception as e: + logger.error(f"MikroTik deployment failed: {e}") + import traceback + logger.error(traceback.format_exc()) + return False, False class ProxmoxManager(SSHManager): """Specialized manager for Proxmox VE servers""" def check_certificate(self, source_cert: x509.Certificate, check_url: str) -> bool: - """ - Check if certificate on Proxmox needs update - Returns True if upload needed, False if certificates match - - Uses two methods: - 1. SSH: Direct file check (preferred) - 2. URL: HTTPS check (fallback) - """ + """Check if certificate on Proxmox needs update""" try: logger.info("Checking Proxmox certificate") - # Method 1: Check via SSH - Direct file access + # Method 1: Check via SSH success, stdout, stderr = self.execute_command( 'openssl x509 -in /etc/pve/local/pveproxy-ssl.pem -noout -serial -dates 2>/dev/null', ignore_error=True @@ -476,7 +494,6 @@ class ProxmoxManager(SSHManager): proxmox_serial_raw = serial_match.group(1).upper() source_serial_raw = format(source_cert.serial_number, 'X').upper() - # Normalize both serials (remove leading zeros) proxmox_serial = normalize_serial(proxmox_serial_raw) source_serial = normalize_serial(source_serial_raw) @@ -497,13 +514,9 @@ class ProxmoxManager(SSHManager): remote_cert = cert_manager.get_cert_from_url(check_url) if remote_cert: - # Compare using normalized serials remote_serial = normalize_serial(format(remote_cert.serial_number, 'X').upper()) source_serial = normalize_serial(format(source_cert.serial_number, 'X').upper()) - logger.info(f"Source serial (URL): {source_serial}") - logger.info(f"Remote serial (URL): {remote_serial}") - if remote_serial == source_serial: logger.info("✓ Certificates match (URL check). Skipping upload.") return False @@ -518,12 +531,9 @@ class ProxmoxManager(SSHManager): logger.warning(f"Error checking: {e}. Proceeding with upload.") return True - def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, + def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate, check_url: str) -> Tuple[bool, bool]: - """ - Upload certificate to Proxmox - Returns (success, was_uploaded) - """ + """Upload certificate to Proxmox""" try: if check_first and source_cert: if not self.check_certificate(source_cert, check_url): @@ -738,10 +748,8 @@ class CertPusher: upload_needed = True if check_first: - # Try SSH check first if not ssh.check_remote_certificate(remote_cert_path, source_cert): upload_needed = False - # Try URL check if SSH check failed elif check_url: logger.info(f"SSH check failed. Trying URL: {check_url}") remote_cert = self.cert_manager.get_cert_from_url(check_url)