From b8751516bdb6008c74fbbc7d7bbd4b67fa2f83bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Mon, 27 Oct 2025 08:16:50 +0100 Subject: [PATCH] mikrotik check cert --- certpusher.py | 458 +++++++++++++++++--------------------------------- 1 file changed, 153 insertions(+), 305 deletions(-) diff --git a/certpusher.py b/certpusher.py index 2fe89e7..8b0117e 100644 --- a/certpusher.py +++ b/certpusher.py @@ -289,316 +289,164 @@ class MikroTikManager(SSHManager): self.cert_name = "letsencrypt" self.key_name = "letsencrypt-key" - def check_certificate_expiry(self, source_cert: x509.Certificate) -> bool: - """Check if certificate on MikroTik needs update""" - try: - logger.info("Checking MikroTik certificate") - - # Get source certificate common-name - source_cn = source_cert.subject.rfc4514_string() - source_expiry = source_cert.not_valid_after_utc - - # Extract CN value (e.g., "*.linuxiarz.pl" from "CN=*.linuxiarz.pl") - cn_value = source_cn.split("CN=")[1].split(",")[0] if "CN=" in source_cn else source_cn - - logger.info(f"Looking for cert with CN: {cn_value}") - logger.info(f"Source expires: {source_expiry}") - - # Search by common-name (escape wildcards for MikroTik) - cn_search = cn_value.replace("*", "\\*") - success, stdout, stderr = self.execute_command( - f'/certificate print detail where common-name="{cn_search}"', - ignore_error=True - ) - - if not success or not stdout or 'invalid-after' not in stdout.lower(): - logger.info("Certificate not found. Upload needed.") - return True - - logger.debug(f"Found certificate:\n{stdout[:500]}") - - # Parse expiry (RouterOS 7.x format: 2026-01-22 08:34:12) - invalid_after_match = re.search( - r'invalid-after[:\s=]+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', - stdout, - re.IGNORECASE - ) - - if not invalid_after_match: - logger.warning("Could not parse expiry") - return True - - mikrotik_expiry_str = invalid_after_match.group(1) - + + def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: + """Upload certificate to MikroTik""" try: - mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%Y-%m-%d %H:%M:%S') - mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc) - except ValueError: - logger.warning(f"Could not parse date: {mikrotik_expiry_str}") - return True - - logger.info(f"MikroTik expires: {mikrotik_expiry}") - - time_diff = abs((source_expiry - mikrotik_expiry).total_seconds()) - - # Allow 24h tolerance - if time_diff < 86400: - logger.info("✓ Certificate is current. Skipping.") - return False - else: - logger.info(f"Certificate differs ({time_diff/86400:.1f} days). Upload needed.") - return True + if check_first and source_cert: + if not self.check_certificate_expiry(source_cert): + return True, False - except Exception as e: - logger.warning(f"Error checking: {e}") - import traceback - logger.debug(traceback.format_exc()) - return True - - def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: - """Upload certificate to MikroTik""" - try: - if check_first and source_cert: - if not self.check_certificate_expiry(source_cert): - return True, False - - logger.info("Deploying MikroTik certificate") - - # Step 1: Disable www-ssl - logger.debug("Disabling www-ssl service") - self.execute_command('/ip service disable www-ssl', ignore_error=True) - - import time - time.sleep(1) - - # Step 2: Remove old certificates with our name - logger.debug("Cleaning up old certificates") - cleanup_commands = [ - f'/certificate remove [find name~"{self.cert_name}"]', - f'/file remove "{self.cert_name}.pem"', - f'/file remove "{self.key_name}.pem"', - ] - - for cmd in cleanup_commands: - self.execute_command(cmd, ignore_error=True) - - time.sleep(1) - - # Step 3: Upload certificate - logger.info(f"Uploading certificate as: {self.cert_name}.pem") - with SCPClient(self.ssh_client.get_transport()) as scp: - scp.put(cert_path, f'{self.cert_name}.pem') - - # Step 4: Upload private key - if key_path: - logger.info(f"Uploading key as: {self.key_name}.pem") - with SCPClient(self.ssh_client.get_transport()) as scp: - scp.put(key_path, f'{self.key_name}.pem') - - time.sleep(1) - - # Step 5: Import certificate (fullchain includes both cert and chain) - logger.info("Importing certificate") - import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""' - success, stdout, stderr = self.execute_command(import_cmd, timeout=30) - - if not success: - logger.error(f"Certificate import failed: {stderr}") - # Show available files for debugging - self.execute_command('/file print where name~".pem"') - return False, False - - time.sleep(2) - - # Step 6: Find imported certificate name - success, stdout, stderr = self.execute_command( - f'/certificate print terse where name~"{self.cert_name}"' - ) - - if not success or not stdout: - logger.error("Could not find imported certificate") - return False, False - - logger.debug(f"Imported certificates:\n{stdout}") - - # Extract certificate name (MikroTik adds _0, _1 suffixes) - # Format: 0 name="letsencrypt_0" ... - cert_names = re.findall(r'name="([^"]+)"', stdout) - - if not cert_names: - logger.error("Could not parse certificate names") - return False, False - - # Use first certificate (usually the actual cert, not CA) - imported_cert_name = cert_names[0] - logger.info(f"Using certificate: {imported_cert_name}") - - # Step 7: Configure www-ssl service - logger.info("Configuring www-ssl service") - config_cmd = f'/ip service set www-ssl certificate={imported_cert_name}' - success, stdout, stderr = self.execute_command(config_cmd) - - if not success: - logger.error(f"Failed to set certificate: {stderr}") - # Show available certificates - self.execute_command('/certificate print') - return False, False - - # Step 8: Enable www-ssl - self.execute_command('/ip service enable www-ssl', ignore_error=True) - - time.sleep(1) - - # Step 9: Verify - success, stdout, stderr = self.execute_command( - '/ip service print where name="www-ssl"' - ) - - if success: - logger.debug(f"Service status:\n{stdout}") - - logger.info(f"✓ MikroTik deployment successful") - return True, True - - except Exception as e: - logger.error(f"MikroTik deployment failed: {e}") - import traceback - logger.debug(traceback.format_exc()) - return False, False - - - def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: - """Upload certificate to MikroTik""" - try: - if check_first and source_cert: - if not self.check_certificate_expiry(source_cert): - return True, False - - logger.info("Deploying MikroTik certificate") - - # Disable service - self.execute_command('/ip service disable www-ssl', ignore_error=True) - - # Remove OLD certificates with our naming pattern - cleanup_commands = [ - f'/certificate remove [find name~"certpusher"]', - f'/file remove [find name~"certpusher"]', - ] - - for cmd in cleanup_commands: - self.execute_command(cmd, ignore_error=True) - - # Upload with unique name - logger.info(f"Uploading as: {self.cert_name}") - with SCPClient(self.ssh_client.get_transport()) as scp: - scp.put(cert_path, f'{self.cert_name}.pem') - - if key_path: - with SCPClient(self.ssh_client.get_transport()) as scp: - scp.put(key_path, f'{self.cert_name}-key.pem') - - # Import certificate - logger.info("Importing certificate") - import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""' - if key_path: - import_cmd += f' name={self.cert_name}' - - self.execute_command(import_cmd, timeout=30) - - import time - time.sleep(2) - - # Find the imported certificate (MikroTik adds _0, _1 suffixes) - success, stdout, stderr = self.execute_command( - f'/certificate print where name~"{self.cert_name}"' - ) - - if success and stdout: - logger.debug(f"Imported certificates:\n{stdout}") + logger.info("Deploying MikroTik certificate") - # Extract certificate name (usually cert_name_0) - cert_match = re.search(r'name="([^"]+)"', stdout) - if cert_match: - imported_name = cert_match.group(1) - logger.info(f"Certificate imported as: {imported_name}") - - # Configure service - config_commands = [ - f'/ip service set www-ssl certificate={imported_name}', - '/ip service enable www-ssl', - ] - - for cmd in config_commands: - self.execute_command(cmd, ignore_error=True) - else: - logger.warning("Could not find imported certificate name, using default") - self.execute_command(f'/ip service set www-ssl certificate={self.cert_name}_0', ignore_error=True) - self.execute_command('/ip service enable www-ssl', ignore_error=True) - - logger.info(f"✓ MikroTik deployment successful") - return True, True - - except Exception as e: - logger.error(f"MikroTik deployment failed: {e}") - return False, False + # Step 1: Disable www-ssl + logger.info("Disabling www-ssl service") + self.execute_command('/ip service disable www-ssl', ignore_error=True) + + import time + time.sleep(1) + + # Step 2: Remove old files and certificates + logger.info("Cleaning up old certificates") + cleanup_commands = [ + '/certificate remove [find name~"letsencrypt"]', + '/file remove "letsencrypt.pem"', + '/file remove "letsencrypt-key.pem"', + ] + + for cmd in cleanup_commands: + self.execute_command(cmd, ignore_error=True) + + time.sleep(1) + + # Step 3: Show current files (debug) + logger.debug("Files before upload:") + success, stdout, stderr = self.execute_command('/file print', ignore_error=True) + if stdout: + logger.debug(stdout[:500]) + + # Step 4: Upload certificate via SCP + logger.info(f"Uploading certificate: {cert_path} -> letsencrypt.pem") + try: + with SCPClient(self.ssh_client.get_transport(), progress=None) as scp: + # Upload to root directory + scp.put(cert_path, 'letsencrypt.pem') + logger.info("✓ Certificate file uploaded") + except Exception as e: + logger.error(f"SCP upload failed: {e}") + return False, False + + # Step 5: Upload private key + if key_path: + logger.info(f"Uploading key: {key_path} -> letsencrypt-key.pem") + try: + with SCPClient(self.ssh_client.get_transport(), progress=None) as scp: + scp.put(key_path, 'letsencrypt-key.pem') + logger.info("✓ Key file uploaded") + except Exception as e: + logger.error(f"Key upload failed: {e}") + return False, False + + time.sleep(2) + + # Step 6: Verify files were uploaded + logger.info("Verifying uploaded files...") + success, stdout, stderr = self.execute_command( + '/file print where name~"letsencrypt"', + ignore_error=True + ) + + if not success or 'letsencrypt.pem' not in stdout: + logger.error("Certificate file not found on MikroTik!") + logger.error("Available files:") + self.execute_command('/file print') + return False, False + + logger.info("✓ Files verified on MikroTik") + logger.debug(f"Files:\n{stdout}") + + # Step 7: Import certificate + logger.info("Importing certificate into MikroTik") + import_cmd = '/certificate import file-name=letsencrypt.pem passphrase=""' + success, stdout, stderr = self.execute_command(import_cmd, timeout=30) + + if not success: + logger.error(f"Import failed: {stderr}") + logger.error("Trying to diagnose...") + # Show file details + self.execute_command('/file print detail where name="letsencrypt.pem"') + # Try to see if file is readable + self.execute_command('/file print file-name=letsencrypt.pem') + return False, False + + logger.info("✓ Certificate imported") + logger.debug(f"Import output: {stdout}") + + time.sleep(2) + + # Step 8: Find imported certificate + logger.info("Looking for imported certificate...") + success, stdout, stderr = self.execute_command( + '/certificate print terse where name~"letsencrypt"' + ) + + if not success or not stdout: + logger.error("Could not find imported certificate!") + logger.error("All certificates:") + self.execute_command('/certificate print') + return False, False + + logger.debug(f"Found certificates:\n{stdout}") + + # Parse certificate names + cert_names = re.findall(r'name="([^"]+)"', stdout) + + if not cert_names: + logger.error("Could not parse certificate names") + return False, False + + # Use the first one (usually the leaf certificate) + imported_cert_name = cert_names[0] + logger.info(f"Using certificate: {imported_cert_name}") + + # Step 9: Configure www-ssl service + logger.info("Configuring www-ssl to use new certificate") + config_cmd = f'/ip service set www-ssl certificate="{imported_cert_name}"' + success, stdout, stderr = self.execute_command(config_cmd) + + if not success: + logger.error(f"Failed to configure service: {stderr}") + logger.error("Available certificates:") + self.execute_command('/certificate print') + return False, False + + logger.info("✓ Service configured") + + # Step 10: Enable www-ssl + logger.info("Enabling www-ssl service") + self.execute_command('/ip service enable www-ssl') + + time.sleep(1) + + # Step 11: Verify service status + success, stdout, stderr = self.execute_command( + '/ip service print where name="www-ssl"' + ) + + if success and stdout: + logger.info("Service status:") + logger.info(stdout) + + logger.info(f"✓ MikroTik deployment completed successfully") + return True, True + + except Exception as e: + logger.error(f"MikroTik deployment failed: {e}") + import traceback + logger.error(traceback.format_exc()) + return False, False - - def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: - """ - Upload certificate to MikroTik - Returns (success, was_uploaded) - """ - try: - if check_first and source_cert: - if not self.check_certificate_expiry(source_cert): - return True, False - - logger.info("Deploying MikroTik certificate") - - self.execute_command('/ip service disable www-ssl', ignore_error=True) - - cleanup_commands = [ - f'/certificate remove [find name~"{self.cert_name}"]', - f'/file remove "{self.cert_name}.pem"', - ] - - if key_path: - cleanup_commands.append(f'/file remove "ssl-key.pem"') - - for cmd in cleanup_commands: - self.execute_command(cmd, ignore_error=True) - - logger.info("Uploading certificate") - with SCPClient(self.ssh_client.get_transport()) as scp: - scp.put(cert_path, f'{self.cert_name}.pem') - - if key_path: - logger.info("Uploading private key") - with SCPClient(self.ssh_client.get_transport()) as scp: - scp.put(key_path, 'ssl-key.pem') - - logger.info("Importing certificate") - self.execute_command(f'/certificate import file-name={self.cert_name}.pem passphrase=""', timeout=30) - - import time - time.sleep(2) - - config_commands = [ - f'/ip service set www-ssl certificate={self.cert_name}_0', - '/ip service enable www-ssl', - ] - - for cmd in config_commands: - self.execute_command(cmd, ignore_error=True) - - logger.info(f"✓ MikroTik deployment successful") - return True, True - - except Exception as e: - logger.error(f"MikroTik deployment failed: {e}") - return False, False - class ProxmoxManager(SSHManager): """Specialized manager for Proxmox VE servers"""