mikrotik check cert
This commit is contained in:
		
							
								
								
									
										195
									
								
								certpusher.py
									
									
									
									
									
								
							
							
						
						
									
										195
									
								
								certpusher.py
									
									
									
									
									
								
							| @@ -331,18 +331,32 @@ class MikroTikManager(SSHManager): | |||||||
|             logger.warning(f"Error checking: {e}") |             logger.warning(f"Error checking: {e}") | ||||||
|             return True |             return True | ||||||
|      |      | ||||||
|     def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: |         def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate, services: list = None) -> Tuple[bool, bool]: | ||||||
|         """Upload certificate to MikroTik""" |         """ | ||||||
|  |         Upload certificate to MikroTik | ||||||
|  |          | ||||||
|  |         Args: | ||||||
|  |             cert_path: Path to certificate file | ||||||
|  |             key_path: Path to private key file | ||||||
|  |             check_first: Whether to check existing certificate first | ||||||
|  |             source_cert: Source certificate object for comparison | ||||||
|  |             services: List of services to configure (e.g. ['www-ssl', 'api-ssl']) | ||||||
|  |         """ | ||||||
|         try: |         try: | ||||||
|             if check_first and source_cert: |             if check_first and source_cert: | ||||||
|                 if not self.check_certificate_expiry(source_cert): |                 if not self.check_certificate_expiry(source_cert): | ||||||
|                     return True, False |                     return True, False | ||||||
|              |              | ||||||
|             logger.info("Deploying MikroTik certificate") |             # Default to www-ssl if not specified | ||||||
|  |             if not services: | ||||||
|  |                 services = ['www-ssl'] | ||||||
|              |              | ||||||
|             # Step 1: Disable www-ssl |             logger.info(f"Deploying MikroTik certificate for services: {', '.join(services)}") | ||||||
|             logger.info("Disabling www-ssl service") |              | ||||||
|             self.execute_command('/ip service disable www-ssl', ignore_error=True) |             # Step 1: Disable all services that will use the certificate | ||||||
|  |             for service in services: | ||||||
|  |                 logger.info(f"Disabling {service} service") | ||||||
|  |                 self.execute_command(f'/ip service disable {service}', ignore_error=True) | ||||||
|              |              | ||||||
|             import time |             import time | ||||||
|             time.sleep(1) |             time.sleep(1) | ||||||
| @@ -392,7 +406,6 @@ class MikroTikManager(SSHManager): | |||||||
|              |              | ||||||
|             if not success or 'letsencrypt.pem' not in stdout: |             if not success or 'letsencrypt.pem' not in stdout: | ||||||
|                 logger.error("Certificate file not found on MikroTik!") |                 logger.error("Certificate file not found on MikroTik!") | ||||||
|                 logger.error("Available files:") |  | ||||||
|                 self.execute_command('/file print') |                 self.execute_command('/file print') | ||||||
|                 return False, False |                 return False, False | ||||||
|              |              | ||||||
| @@ -406,7 +419,6 @@ class MikroTikManager(SSHManager): | |||||||
|              |              | ||||||
|             if not success: |             if not success: | ||||||
|                 logger.error(f"Import failed: {stderr}") |                 logger.error(f"Import failed: {stderr}") | ||||||
|                 self.execute_command('/file print detail where name="letsencrypt.pem"') |  | ||||||
|                 return False, False |                 return False, False | ||||||
|              |              | ||||||
|             logger.info("✓ Certificate imported") |             logger.info("✓ Certificate imported") | ||||||
| @@ -414,76 +426,54 @@ class MikroTikManager(SSHManager): | |||||||
|              |              | ||||||
|             time.sleep(2) |             time.sleep(2) | ||||||
|              |              | ||||||
|             # Step 7: Find imported certificate |             # Step 7: Use predictable certificate name | ||||||
|             logger.info("Looking for imported certificate...") |             # MikroTik always creates filename_0 for the leaf certificate | ||||||
|  |             imported_cert_name = "letsencrypt.pem_0" | ||||||
|  |             logger.info(f"Using certificate: {imported_cert_name}") | ||||||
|  |              | ||||||
|  |             # Verify it exists | ||||||
|             success, stdout, stderr = self.execute_command( |             success, stdout, stderr = self.execute_command( | ||||||
|                 '/certificate print terse where name~"letsencrypt"' |                 f'/certificate print where name="{imported_cert_name}"' | ||||||
|             ) |             ) | ||||||
|              |              | ||||||
|             if not success or not stdout: |             if not success or not stdout: | ||||||
|                 logger.error("Could not find imported certificate!") |                 logger.warning(f"Certificate {imported_cert_name} not found, trying to find it...") | ||||||
|                 self.execute_command('/certificate print') |                 success, stdout, stderr = self.execute_command('/certificate print where name~"letsencrypt"') | ||||||
|                 return False, False |                 if stdout: | ||||||
|  |                     logger.debug(f"Available certificates:\n{stdout}") | ||||||
|  |                     # Try to parse actual name | ||||||
|  |                     match = re.search(r'name="?([a-zA-Z0-9._-]*letsencrypt[^"\s]*)"?', stdout) | ||||||
|  |                     if match: | ||||||
|  |                         imported_cert_name = match.group(1) | ||||||
|  |                         logger.info(f"Found certificate: {imported_cert_name}") | ||||||
|              |              | ||||||
|             logger.debug(f"Found certificates:\n{stdout}") |             # Step 8: Configure all specified services | ||||||
|  |             for service in services: | ||||||
|  |                 logger.info(f"Configuring {service} to use certificate") | ||||||
|  |                 config_cmd = f'/ip service set {service} certificate="{imported_cert_name}"' | ||||||
|  |                 success, stdout, stderr = self.execute_command(config_cmd) | ||||||
|                  |                  | ||||||
|             # Parse certificate names - terse format: "154 LT name=letsencrypt.pem_0" |                 if not success: | ||||||
|             # Try both formats (with and without quotes) |                     logger.error(f"Failed to configure {service}: {stderr}") | ||||||
|             cert_names = re.findall(r'name="?([^"\s]+)"?', stdout) |                     logger.warning(f"Continuing with other services...") | ||||||
|  |                 else: | ||||||
|  |                     logger.info(f"✓ {service} configured") | ||||||
|              |              | ||||||
|             if not cert_names: |             # Step 9: Enable all services | ||||||
|                 logger.error("Could not parse certificate names") |             for service in services: | ||||||
|                 logger.error("Trying alternative parsing...") |                 logger.info(f"Enabling {service} service") | ||||||
|                 # Alternative: parse lines |                 self.execute_command(f'/ip service enable {service}') | ||||||
|                 for line in stdout.split('\n'): |  | ||||||
|                     if 'name=' in line and 'letsencrypt' in line: |  | ||||||
|                         match = re.search(r'name=([^\s]+)', line) |  | ||||||
|                         if match: |  | ||||||
|                             cert_names.append(match.group(1)) |  | ||||||
|                  |  | ||||||
|                 if not cert_names: |  | ||||||
|                     logger.error("Still could not find certificate name!") |  | ||||||
|                     return False, False |  | ||||||
|  |  | ||||||
|             # Filter to get the leaf certificate (not intermediate CA) |  | ||||||
|             # Usually it's the first one or the one with common-name matching our domain |  | ||||||
|             imported_cert_name = None |  | ||||||
|             for name in cert_names: |  | ||||||
|                 if '_0' in name:  # Usually the leaf cert |  | ||||||
|                     imported_cert_name = name |  | ||||||
|                     break |  | ||||||
|  |  | ||||||
|             if not imported_cert_name: |  | ||||||
|                 imported_cert_name = cert_names[0] |  | ||||||
|  |  | ||||||
|             logger.info(f"Using certificate: {imported_cert_name}") |  | ||||||
|  |  | ||||||
|              |  | ||||||
|             # Step 8: Configure www-ssl service |  | ||||||
|             logger.info("Configuring www-ssl to use new certificate") |  | ||||||
|             config_cmd = f'/ip service set www-ssl certificate="{imported_cert_name}"' |  | ||||||
|             success, stdout, stderr = self.execute_command(config_cmd) |  | ||||||
|              |  | ||||||
|             if not success: |  | ||||||
|                 logger.error(f"Failed to configure service: {stderr}") |  | ||||||
|                 self.execute_command('/certificate print') |  | ||||||
|                 return False, False |  | ||||||
|              |  | ||||||
|             logger.info("✓ Service configured") |  | ||||||
|              |  | ||||||
|             # Step 9: Enable www-ssl |  | ||||||
|             logger.info("Enabling www-ssl service") |  | ||||||
|             self.execute_command('/ip service enable www-ssl') |  | ||||||
|              |              | ||||||
|             time.sleep(1) |             time.sleep(1) | ||||||
|              |              | ||||||
|             # Step 10: Verify service status |             # Step 10: Verify services status | ||||||
|             success, stdout, stderr = self.execute_command( |             for service in services: | ||||||
|                 '/ip service print where name="www-ssl"' |                 success, stdout, stderr = self.execute_command( | ||||||
|             ) |                     f'/ip service print where name="{service}"' | ||||||
|  |                 ) | ||||||
|                  |                  | ||||||
|             if success and stdout: |                 if success and stdout: | ||||||
|                 logger.debug(f"Service status:\n{stdout}") |                     logger.debug(f"{service} status:\n{stdout}") | ||||||
|              |              | ||||||
|             logger.info(f"✓ MikroTik deployment completed successfully") |             logger.info(f"✓ MikroTik deployment completed successfully") | ||||||
|             return True, True |             return True, True | ||||||
| @@ -637,45 +627,58 @@ class CertPusher: | |||||||
|         return cert_path.replace('fullchain.pem', 'privkey.pem').replace('cert.pem', 'privkey.pem') |         return cert_path.replace('fullchain.pem', 'privkey.pem').replace('cert.pem', 'privkey.pem') | ||||||
|      |      | ||||||
|     def process_mikrotik(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool: |     def process_mikrotik(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool: | ||||||
|         """Process MikroTik device""" |     """Process MikroTik device""" | ||||||
|         try: |     try: | ||||||
|             source_key_path = self.get_key_path(section, source_cert_path) |         source_key_path = self.get_key_path(section, source_cert_path) | ||||||
|          |          | ||||||
|             if not os.path.exists(source_key_path): |         if not os.path.exists(source_key_path): | ||||||
|                 logger.error(f"Key not found: {source_key_path}") |             logger.error(f"Key not found: {source_key_path}") | ||||||
|                 return False |             return False | ||||||
|          |          | ||||||
|             source_cert = self.cert_manager.get_cert_from_file(source_cert_path) |         source_cert = self.cert_manager.get_cert_from_file(source_cert_path) | ||||||
|             if not source_cert: |         if not source_cert: | ||||||
|                 return False |             return False | ||||||
|          |          | ||||||
|             check_first = self.config.getboolean(section, 'check_before_upload', fallback=True) |         check_first = self.config.getboolean(section, 'check_before_upload', fallback=True) | ||||||
|          |          | ||||||
|             mikrotik = MikroTikManager(hostname, port, username, ssh_key) |         # Get services to configure (default: www-ssl) | ||||||
|  |         services_str = self.config.get(section, 'mikrotik_services', fallback='www-ssl') | ||||||
|  |         services = [s.strip() for s in services_str.split(',')] | ||||||
|          |          | ||||||
|             if not mikrotik.connect(): |         logger.info(f"Target services: {', '.join(services)}") | ||||||
|                 self.stats['failed'] += 1 |  | ||||||
|                 return False |  | ||||||
|          |          | ||||||
|             success, was_uploaded = mikrotik.upload_certificate(source_cert_path, source_key_path, check_first, source_cert) |         mikrotik = MikroTikManager(hostname, port, username, ssh_key) | ||||||
|             mikrotik.disconnect() |  | ||||||
|          |          | ||||||
|             if success: |         if not mikrotik.connect(): | ||||||
|                 if was_uploaded: |  | ||||||
|                     self.stats['uploaded'] += 1 |  | ||||||
|                 else: |  | ||||||
|                     self.stats['skipped'] += 1 |  | ||||||
|                 logger.info("✓ MikroTik processed") |  | ||||||
|                 return True |  | ||||||
|             else: |  | ||||||
|                 self.stats['failed'] += 1 |  | ||||||
|                 return False |  | ||||||
|              |  | ||||||
|         except Exception as e: |  | ||||||
|             logger.error(f"MikroTik failed: {e}") |  | ||||||
|             self.stats['failed'] += 1 |             self.stats['failed'] += 1 | ||||||
|             return False |             return False | ||||||
|          |          | ||||||
|  |         success, was_uploaded = mikrotik.upload_certificate( | ||||||
|  |             source_cert_path,  | ||||||
|  |             source_key_path,  | ||||||
|  |             check_first,  | ||||||
|  |             source_cert, | ||||||
|  |             services  # Pass services list | ||||||
|  |         ) | ||||||
|  |         mikrotik.disconnect() | ||||||
|  |          | ||||||
|  |         if success: | ||||||
|  |             if was_uploaded: | ||||||
|  |                 self.stats['uploaded'] += 1 | ||||||
|  |             else: | ||||||
|  |                 self.stats['skipped'] += 1 | ||||||
|  |             logger.info("✓ MikroTik processed") | ||||||
|  |             return True | ||||||
|  |         else: | ||||||
|  |             self.stats['failed'] += 1 | ||||||
|  |             return False | ||||||
|  |          | ||||||
|  |     except Exception as e: | ||||||
|  |         logger.error(f"MikroTik failed: {e}") | ||||||
|  |         self.stats['failed'] += 1 | ||||||
|  |         return False | ||||||
|  |  | ||||||
|  |      | ||||||
|     def process_proxmox(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool: |     def process_proxmox(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool: | ||||||
|         """Process Proxmox server""" |         """Process Proxmox server""" | ||||||
|         try: |         try: | ||||||
|   | |||||||
| @@ -23,6 +23,8 @@ username = admin | |||||||
| ssh_key_path = /root/.ssh/id_rsa_proxy | ssh_key_path = /root/.ssh/id_rsa_proxy | ||||||
| # Check certificate before upload (default: true) | # Check certificate before upload (default: true) | ||||||
| check_before_upload = true | check_before_upload = true | ||||||
|  | mikrotik_services = www-ssl,api-ssl | ||||||
|  |  | ||||||
|  |  | ||||||
| [mikrotik_switch] | [mikrotik_switch] | ||||||
| type = mikrotik | type = mikrotik | ||||||
| @@ -31,6 +33,8 @@ port = 22 | |||||||
| username = admin | username = admin | ||||||
| # Always upload without checking | # Always upload without checking | ||||||
| check_before_upload = false | check_before_upload = false | ||||||
|  | mikrotik_services = www-ssl,api-ssl | ||||||
|  |  | ||||||
|  |  | ||||||
| # ═══════════════════════════════════════════════════════════ | # ═══════════════════════════════════════════════════════════ | ||||||
| # PROXMOX VE SERVERS | # PROXMOX VE SERVERS | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Mateusz Gruszczyński
					Mateusz Gruszczyński