From c42250196fdc3fa1d54734b4a01b23d02b2c014e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Mon, 27 Oct 2025 09:23:13 +0100 Subject: [PATCH] mikrotik check cert --- certpusher.py | 254 ++++++++++++++++++++++++-------------------------- 1 file changed, 123 insertions(+), 131 deletions(-) diff --git a/certpusher.py b/certpusher.py index 3940e8f..57ad726 100644 --- a/certpusher.py +++ b/certpusher.py @@ -270,11 +270,7 @@ class MikroTikManager(SSHManager): self.key_name = "letsencrypt-key" def check_certificate_expiry(self, source_cert: x509.Certificate, services: List[str]) -> bool: - """ - Check if certificate on MikroTik needs update - Also verifies that services are properly configured - Returns True if upload needed, False if everything is OK - """ + """Check if certificate on MikroTik needs update""" try: logger.info("Checking MikroTik certificate") @@ -307,7 +303,7 @@ class MikroTikManager(SSHManager): mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%Y-%m-%d %H:%M:%S') mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc) except ValueError: - logger.warning(f"Could not parse date") + logger.warning("Could not parse date") return True logger.info(f"Source expires: {source_expiry}") @@ -316,11 +312,11 @@ class MikroTikManager(SSHManager): time_diff = abs((source_expiry - mikrotik_expiry).total_seconds()) if time_diff >= 86400: - logger.info(f"Certificate differs. Upload needed.") + logger.info("Certificate differs. Upload needed.") return True - # Certificate is current, but check if services are properly configured - logger.info("Certificate is current. Verifying services configuration...") + # Certificate is current, check services + logger.info("Certificate is current. Verifying services...") cert_name = "letsencrypt.pem_0" services_need_update = False @@ -332,7 +328,6 @@ class MikroTikManager(SSHManager): ) if success and stdout: - # Check if certificate is set correctly if f'certificate={cert_name}' not in stdout and 'certificate=letsencrypt' not in stdout: logger.warning(f"Service {service} not using correct certificate") services_need_update = True @@ -345,7 +340,7 @@ class MikroTikManager(SSHManager): if services_need_update: logger.info("Services need reconfiguration. Updating...") self.configure_services(services, cert_name) - return False # Don't need to upload cert, just reconfigure + return False logger.info("✓ Certificate and services are current. Skipping.") return False @@ -353,135 +348,132 @@ class MikroTikManager(SSHManager): except Exception as e: logger.warning(f"Error checking: {e}") return True - - def configure_services(self, services: List[str], cert_name: str): - """Configure services to use certificate without re-uploading""" - try: - for service in services: - logger.info(f"Configuring {service}") - - # Set certificate - success, _, stderr = self.execute_command( - f'/ip service set {service} certificate="{cert_name}"', - ignore_error=True - ) - - if success: - logger.info(f"✓ {service} configured") - else: - logger.warning(f"Failed to configure {service}: {stderr}") - - # Ensure service is enabled - success, stdout, _ = self.execute_command( - f'/ip service print where name="{service}"', - ignore_error=True - ) - - if 'disabled=yes' in stdout: - logger.info(f"Enabling {service}") - self.execute_command(f'/ip service enable {service}') - - except Exception as e: - logger.error(f"Service configuration failed: {e}") - - def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, - source_cert: x509.Certificate, services: List[str] = None) -> Tuple[bool, bool]: - """Upload certificate to MikroTik""" - try: - if not services: - services = ['www-ssl'] + + def configure_services(self, services: List[str], cert_name: str): + """Configure services to use certificate""" + try: + for service in services: + logger.info(f"Configuring {service}") - if check_first and source_cert: - if not self.check_certificate_expiry(source_cert, services): - return True, False # Certificate and services are OK - - logger.info(f"Deploying certificate for: {', '.join(services)}") - - # Disable services - for service in services: - logger.info(f"Disabling {service}") - self.execute_command(f'/ip service disable {service}', ignore_error=True) - - import time - time.sleep(1) - - # Cleanup - logger.info("Cleaning up old certificates") - cleanup = [ - '/certificate remove [find name~"letsencrypt"]', - '/file remove "letsencrypt.pem"', - '/file remove "letsencrypt-key.pem"', - ] - - for cmd in cleanup: - self.execute_command(cmd, ignore_error=True) - - time.sleep(1) - - # Upload - logger.info("Uploading certificate") - try: - with SCPClient(self.ssh_client.get_transport(), progress=None) as scp: - scp.put(cert_path, 'letsencrypt.pem') - logger.info("✓ Certificate uploaded") - except Exception as e: - logger.error(f"Upload failed: {e}") - return False, False - - if key_path: - logger.info("Uploading key") - try: - with SCPClient(self.ssh_client.get_transport(), progress=None) as scp: - scp.put(key_path, 'letsencrypt-key.pem') - logger.info("✓ Key uploaded") - except Exception as e: - logger.error(f"Key upload failed: {e}") - return False, False - - time.sleep(2) - - # Verify - success, stdout, stderr = self.execute_command( - '/file print where name~"letsencrypt"', + success, _, stderr = self.execute_command( + f'/ip service set {service} certificate="{cert_name}"', ignore_error=True ) - if not success or 'letsencrypt.pem' not in stdout: - logger.error("Files not found on MikroTik!") - return False, False + if success: + logger.info(f"✓ {service} configured") + else: + logger.warning(f"Failed to configure {service}: {stderr}") - logger.info("✓ Files verified") - - # Import - logger.info("Importing certificate") - success, stdout, stderr = self.execute_command( - '/certificate import file-name=letsencrypt.pem passphrase=""', - timeout=30 + success, stdout, _ = self.execute_command( + f'/ip service print where name="{service}"', + ignore_error=True ) - if not success: - logger.error(f"Import failed: {stderr}") - return False, False - - logger.info("✓ Certificate imported") - - time.sleep(2) - - # Use predictable name - imported_cert_name = "letsencrypt.pem_0" - logger.info(f"Using certificate: {imported_cert_name}") - - # Configure services using the new method - self.configure_services(services, imported_cert_name) - - time.sleep(1) - - logger.info(f"✓ MikroTik deployment completed") - return True, True - + if 'disabled=yes' in stdout: + logger.info(f"Enabling {service}") + self.execute_command(f'/ip service enable {service}') + + except Exception as e: + logger.error(f"Service configuration failed: {e}") + + def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, + source_cert: x509.Certificate, services: List[str] = None) -> Tuple[bool, bool]: + """Upload certificate to MikroTik""" + try: + if not services: + services = ['www-ssl'] + + if check_first and source_cert: + if not self.check_certificate_expiry(source_cert, services): + return True, False + + logger.info(f"Deploying certificate for: {', '.join(services)}") + + # Disable services + for service in services: + logger.info(f"Disabling {service}") + self.execute_command(f'/ip service disable {service}', ignore_error=True) + + import time + time.sleep(1) + + # Cleanup + logger.info("Cleaning up old certificates") + cleanup = [ + '/certificate remove [find name~"letsencrypt"]', + '/file remove "letsencrypt.pem"', + '/file remove "letsencrypt-key.pem"', + ] + + for cmd in cleanup: + self.execute_command(cmd, ignore_error=True) + + time.sleep(1) + + # Upload + logger.info("Uploading certificate") + try: + with SCPClient(self.ssh_client.get_transport(), progress=None) as scp: + scp.put(cert_path, 'letsencrypt.pem') + logger.info("✓ Certificate uploaded") except Exception as e: - logger.error(f"MikroTik deployment failed: {e}") + logger.error(f"Upload failed: {e}") return False, False + + if key_path: + logger.info("Uploading key") + try: + with SCPClient(self.ssh_client.get_transport(), progress=None) as scp: + scp.put(key_path, 'letsencrypt-key.pem') + logger.info("✓ Key uploaded") + except Exception as e: + logger.error(f"Key upload failed: {e}") + return False, False + + time.sleep(2) + + # Verify + success, stdout, stderr = self.execute_command( + '/file print where name~"letsencrypt"', + ignore_error=True + ) + + if not success or 'letsencrypt.pem' not in stdout: + logger.error("Files not found on MikroTik!") + return False, False + + logger.info("✓ Files verified") + + # Import + logger.info("Importing certificate") + success, stdout, stderr = self.execute_command( + '/certificate import file-name=letsencrypt.pem passphrase=""', + timeout=30 + ) + + if not success: + logger.error(f"Import failed: {stderr}") + return False, False + + logger.info("✓ Certificate imported") + + time.sleep(2) + + # Configure + imported_cert_name = "letsencrypt.pem_0" + logger.info(f"Using certificate: {imported_cert_name}") + + self.configure_services(services, imported_cert_name) + + time.sleep(1) + + logger.info("✓ MikroTik deployment completed") + return True, True + + except Exception as e: + logger.error(f"MikroTik deployment failed: {e}") + return False, False class ProxmoxManager(SSHManager):