From 32c2dbc281c18847992b5e68c650bec32e348b4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Mon, 27 Oct 2025 09:00:57 +0100 Subject: [PATCH] mikrotik check cert --- certpusher.py | 215 +++++++++++++++++++++++---------------------- config.ini.example | 4 + 2 files changed, 113 insertions(+), 106 deletions(-) diff --git a/certpusher.py b/certpusher.py index 382c9b6..9754e9c 100644 --- a/certpusher.py +++ b/certpusher.py @@ -331,18 +331,32 @@ class MikroTikManager(SSHManager): logger.warning(f"Error checking: {e}") return True - def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: - """Upload certificate to MikroTik""" + def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate, services: list = None) -> Tuple[bool, bool]: + """ + Upload certificate to MikroTik + + Args: + cert_path: Path to certificate file + key_path: Path to private key file + check_first: Whether to check existing certificate first + source_cert: Source certificate object for comparison + services: List of services to configure (e.g. ['www-ssl', 'api-ssl']) + """ try: if check_first and source_cert: if not self.check_certificate_expiry(source_cert): return True, False - logger.info("Deploying MikroTik certificate") + # Default to www-ssl if not specified + if not services: + services = ['www-ssl'] - # Step 1: Disable www-ssl - logger.info("Disabling www-ssl service") - self.execute_command('/ip service disable www-ssl', ignore_error=True) + logger.info(f"Deploying MikroTik certificate for services: {', '.join(services)}") + + # Step 1: Disable all services that will use the certificate + for service in services: + logger.info(f"Disabling {service} service") + self.execute_command(f'/ip service disable {service}', ignore_error=True) import time time.sleep(1) @@ -392,7 +406,6 @@ class MikroTikManager(SSHManager): if not success or 'letsencrypt.pem' not in stdout: logger.error("Certificate file not found on MikroTik!") - logger.error("Available files:") self.execute_command('/file print') return False, False @@ -406,7 +419,6 @@ class MikroTikManager(SSHManager): if not success: logger.error(f"Import failed: {stderr}") - self.execute_command('/file print detail where name="letsencrypt.pem"') return False, False logger.info("✓ Certificate imported") @@ -414,76 +426,54 @@ class MikroTikManager(SSHManager): time.sleep(2) - # Step 7: Find imported certificate - logger.info("Looking for imported certificate...") - success, stdout, stderr = self.execute_command( - '/certificate print terse where name~"letsencrypt"' - ) - - if not success or not stdout: - logger.error("Could not find imported certificate!") - self.execute_command('/certificate print') - return False, False - - logger.debug(f"Found certificates:\n{stdout}") - - # Parse certificate names - terse format: "154 LT name=letsencrypt.pem_0" - # Try both formats (with and without quotes) - cert_names = re.findall(r'name="?([^"\s]+)"?', stdout) - - if not cert_names: - logger.error("Could not parse certificate names") - logger.error("Trying alternative parsing...") - # Alternative: parse lines - for line in stdout.split('\n'): - if 'name=' in line and 'letsencrypt' in line: - match = re.search(r'name=([^\s]+)', line) - if match: - cert_names.append(match.group(1)) - - if not cert_names: - logger.error("Still could not find certificate name!") - return False, False - - # Filter to get the leaf certificate (not intermediate CA) - # Usually it's the first one or the one with common-name matching our domain - imported_cert_name = None - for name in cert_names: - if '_0' in name: # Usually the leaf cert - imported_cert_name = name - break - - if not imported_cert_name: - imported_cert_name = cert_names[0] - + # Step 7: Use predictable certificate name + # MikroTik always creates filename_0 for the leaf certificate + imported_cert_name = "letsencrypt.pem_0" logger.info(f"Using certificate: {imported_cert_name}") - - # Step 8: Configure www-ssl service - logger.info("Configuring www-ssl to use new certificate") - config_cmd = f'/ip service set www-ssl certificate="{imported_cert_name}"' - success, stdout, stderr = self.execute_command(config_cmd) + # Verify it exists + success, stdout, stderr = self.execute_command( + f'/certificate print where name="{imported_cert_name}"' + ) - if not success: - logger.error(f"Failed to configure service: {stderr}") - self.execute_command('/certificate print') - return False, False + if not success or not stdout: + logger.warning(f"Certificate {imported_cert_name} not found, trying to find it...") + success, stdout, stderr = self.execute_command('/certificate print where name~"letsencrypt"') + if stdout: + logger.debug(f"Available certificates:\n{stdout}") + # Try to parse actual name + match = re.search(r'name="?([a-zA-Z0-9._-]*letsencrypt[^"\s]*)"?', stdout) + if match: + imported_cert_name = match.group(1) + logger.info(f"Found certificate: {imported_cert_name}") - logger.info("✓ Service configured") + # Step 8: Configure all specified services + for service in services: + logger.info(f"Configuring {service} to use certificate") + config_cmd = f'/ip service set {service} certificate="{imported_cert_name}"' + success, stdout, stderr = self.execute_command(config_cmd) + + if not success: + logger.error(f"Failed to configure {service}: {stderr}") + logger.warning(f"Continuing with other services...") + else: + logger.info(f"✓ {service} configured") - # Step 9: Enable www-ssl - logger.info("Enabling www-ssl service") - self.execute_command('/ip service enable www-ssl') + # Step 9: Enable all services + for service in services: + logger.info(f"Enabling {service} service") + self.execute_command(f'/ip service enable {service}') time.sleep(1) - # Step 10: Verify service status - success, stdout, stderr = self.execute_command( - '/ip service print where name="www-ssl"' - ) - - if success and stdout: - logger.debug(f"Service status:\n{stdout}") + # Step 10: Verify services status + for service in services: + success, stdout, stderr = self.execute_command( + f'/ip service print where name="{service}"' + ) + + if success and stdout: + logger.debug(f"{service} status:\n{stdout}") logger.info(f"✓ MikroTik deployment completed successfully") return True, True @@ -637,44 +627,57 @@ class CertPusher: return cert_path.replace('fullchain.pem', 'privkey.pem').replace('cert.pem', 'privkey.pem') def process_mikrotik(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool: - """Process MikroTik device""" - try: - source_key_path = self.get_key_path(section, source_cert_path) - - if not os.path.exists(source_key_path): - logger.error(f"Key not found: {source_key_path}") - return False - - source_cert = self.cert_manager.get_cert_from_file(source_cert_path) - if not source_cert: - return False - - check_first = self.config.getboolean(section, 'check_before_upload', fallback=True) - - mikrotik = MikroTikManager(hostname, port, username, ssh_key) - - if not mikrotik.connect(): - self.stats['failed'] += 1 - return False - - success, was_uploaded = mikrotik.upload_certificate(source_cert_path, source_key_path, check_first, source_cert) - mikrotik.disconnect() - - if success: - if was_uploaded: - self.stats['uploaded'] += 1 - else: - self.stats['skipped'] += 1 - logger.info("✓ MikroTik processed") - return True - else: - self.stats['failed'] += 1 - return False - - except Exception as e: - logger.error(f"MikroTik failed: {e}") + """Process MikroTik device""" + try: + source_key_path = self.get_key_path(section, source_cert_path) + + if not os.path.exists(source_key_path): + logger.error(f"Key not found: {source_key_path}") + return False + + source_cert = self.cert_manager.get_cert_from_file(source_cert_path) + if not source_cert: + return False + + check_first = self.config.getboolean(section, 'check_before_upload', fallback=True) + + # Get services to configure (default: www-ssl) + services_str = self.config.get(section, 'mikrotik_services', fallback='www-ssl') + services = [s.strip() for s in services_str.split(',')] + + logger.info(f"Target services: {', '.join(services)}") + + mikrotik = MikroTikManager(hostname, port, username, ssh_key) + + if not mikrotik.connect(): self.stats['failed'] += 1 return False + + success, was_uploaded = mikrotik.upload_certificate( + source_cert_path, + source_key_path, + check_first, + source_cert, + services # Pass services list + ) + mikrotik.disconnect() + + if success: + if was_uploaded: + self.stats['uploaded'] += 1 + else: + self.stats['skipped'] += 1 + logger.info("✓ MikroTik processed") + return True + else: + self.stats['failed'] += 1 + return False + + except Exception as e: + logger.error(f"MikroTik failed: {e}") + self.stats['failed'] += 1 + return False + def process_proxmox(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool: """Process Proxmox server""" diff --git a/config.ini.example b/config.ini.example index 7b228ae..3ae2d14 100644 --- a/config.ini.example +++ b/config.ini.example @@ -23,6 +23,8 @@ username = admin ssh_key_path = /root/.ssh/id_rsa_proxy # Check certificate before upload (default: true) check_before_upload = true +mikrotik_services = www-ssl,api-ssl + [mikrotik_switch] type = mikrotik @@ -31,6 +33,8 @@ port = 22 username = admin # Always upload without checking check_before_upload = false +mikrotik_services = www-ssl,api-ssl + # ═══════════════════════════════════════════════════════════ # PROXMOX VE SERVERS