mikrotik check cert

This commit is contained in:
Mateusz Gruszczyński
2025-10-27 09:23:13 +01:00
parent 81d13e44eb
commit c42250196f

View File

@@ -270,11 +270,7 @@ class MikroTikManager(SSHManager):
self.key_name = "letsencrypt-key" self.key_name = "letsencrypt-key"
def check_certificate_expiry(self, source_cert: x509.Certificate, services: List[str]) -> bool: def check_certificate_expiry(self, source_cert: x509.Certificate, services: List[str]) -> bool:
""" """Check if certificate on MikroTik needs update"""
Check if certificate on MikroTik needs update
Also verifies that services are properly configured
Returns True if upload needed, False if everything is OK
"""
try: try:
logger.info("Checking MikroTik certificate") logger.info("Checking MikroTik certificate")
@@ -307,7 +303,7 @@ class MikroTikManager(SSHManager):
mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%Y-%m-%d %H:%M:%S') mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%Y-%m-%d %H:%M:%S')
mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc) mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc)
except ValueError: except ValueError:
logger.warning(f"Could not parse date") logger.warning("Could not parse date")
return True return True
logger.info(f"Source expires: {source_expiry}") logger.info(f"Source expires: {source_expiry}")
@@ -316,11 +312,11 @@ class MikroTikManager(SSHManager):
time_diff = abs((source_expiry - mikrotik_expiry).total_seconds()) time_diff = abs((source_expiry - mikrotik_expiry).total_seconds())
if time_diff >= 86400: if time_diff >= 86400:
logger.info(f"Certificate differs. Upload needed.") logger.info("Certificate differs. Upload needed.")
return True return True
# Certificate is current, but check if services are properly configured # Certificate is current, check services
logger.info("Certificate is current. Verifying services configuration...") logger.info("Certificate is current. Verifying services...")
cert_name = "letsencrypt.pem_0" cert_name = "letsencrypt.pem_0"
services_need_update = False services_need_update = False
@@ -332,7 +328,6 @@ class MikroTikManager(SSHManager):
) )
if success and stdout: if success and stdout:
# Check if certificate is set correctly
if f'certificate={cert_name}' not in stdout and 'certificate=letsencrypt' not in stdout: if f'certificate={cert_name}' not in stdout and 'certificate=letsencrypt' not in stdout:
logger.warning(f"Service {service} not using correct certificate") logger.warning(f"Service {service} not using correct certificate")
services_need_update = True services_need_update = True
@@ -345,7 +340,7 @@ class MikroTikManager(SSHManager):
if services_need_update: if services_need_update:
logger.info("Services need reconfiguration. Updating...") logger.info("Services need reconfiguration. Updating...")
self.configure_services(services, cert_name) self.configure_services(services, cert_name)
return False # Don't need to upload cert, just reconfigure return False
logger.info("✓ Certificate and services are current. Skipping.") logger.info("✓ Certificate and services are current. Skipping.")
return False return False
@@ -354,135 +349,132 @@ class MikroTikManager(SSHManager):
logger.warning(f"Error checking: {e}") logger.warning(f"Error checking: {e}")
return True return True
def configure_services(self, services: List[str], cert_name: str): def configure_services(self, services: List[str], cert_name: str):
"""Configure services to use certificate without re-uploading""" """Configure services to use certificate"""
try: try:
for service in services: for service in services:
logger.info(f"Configuring {service}") logger.info(f"Configuring {service}")
# Set certificate success, _, stderr = self.execute_command(
success, _, stderr = self.execute_command( f'/ip service set {service} certificate="{cert_name}"',
f'/ip service set {service} certificate="{cert_name}"',
ignore_error=True
)
if success:
logger.info(f"{service} configured")
else:
logger.warning(f"Failed to configure {service}: {stderr}")
# Ensure service is enabled
success, stdout, _ = self.execute_command(
f'/ip service print where name="{service}"',
ignore_error=True
)
if 'disabled=yes' in stdout:
logger.info(f"Enabling {service}")
self.execute_command(f'/ip service enable {service}')
except Exception as e:
logger.error(f"Service configuration failed: {e}")
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool,
source_cert: x509.Certificate, services: List[str] = None) -> Tuple[bool, bool]:
"""Upload certificate to MikroTik"""
try:
if not services:
services = ['www-ssl']
if check_first and source_cert:
if not self.check_certificate_expiry(source_cert, services):
return True, False # Certificate and services are OK
logger.info(f"Deploying certificate for: {', '.join(services)}")
# Disable services
for service in services:
logger.info(f"Disabling {service}")
self.execute_command(f'/ip service disable {service}', ignore_error=True)
import time
time.sleep(1)
# Cleanup
logger.info("Cleaning up old certificates")
cleanup = [
'/certificate remove [find name~"letsencrypt"]',
'/file remove "letsencrypt.pem"',
'/file remove "letsencrypt-key.pem"',
]
for cmd in cleanup:
self.execute_command(cmd, ignore_error=True)
time.sleep(1)
# Upload
logger.info("Uploading certificate")
try:
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
scp.put(cert_path, 'letsencrypt.pem')
logger.info("✓ Certificate uploaded")
except Exception as e:
logger.error(f"Upload failed: {e}")
return False, False
if key_path:
logger.info("Uploading key")
try:
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
scp.put(key_path, 'letsencrypt-key.pem')
logger.info("✓ Key uploaded")
except Exception as e:
logger.error(f"Key upload failed: {e}")
return False, False
time.sleep(2)
# Verify
success, stdout, stderr = self.execute_command(
'/file print where name~"letsencrypt"',
ignore_error=True ignore_error=True
) )
if not success or 'letsencrypt.pem' not in stdout: if success:
logger.error("Files not found on MikroTik!") logger.info(f"{service} configured")
return False, False else:
logger.warning(f"Failed to configure {service}: {stderr}")
logger.info("✓ Files verified") success, stdout, _ = self.execute_command(
f'/ip service print where name="{service}"',
# Import ignore_error=True
logger.info("Importing certificate")
success, stdout, stderr = self.execute_command(
'/certificate import file-name=letsencrypt.pem passphrase=""',
timeout=30
) )
if not success: if 'disabled=yes' in stdout:
logger.error(f"Import failed: {stderr}") logger.info(f"Enabling {service}")
self.execute_command(f'/ip service enable {service}')
except Exception as e:
logger.error(f"Service configuration failed: {e}")
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool,
source_cert: x509.Certificate, services: List[str] = None) -> Tuple[bool, bool]:
"""Upload certificate to MikroTik"""
try:
if not services:
services = ['www-ssl']
if check_first and source_cert:
if not self.check_certificate_expiry(source_cert, services):
return True, False
logger.info(f"Deploying certificate for: {', '.join(services)}")
# Disable services
for service in services:
logger.info(f"Disabling {service}")
self.execute_command(f'/ip service disable {service}', ignore_error=True)
import time
time.sleep(1)
# Cleanup
logger.info("Cleaning up old certificates")
cleanup = [
'/certificate remove [find name~"letsencrypt"]',
'/file remove "letsencrypt.pem"',
'/file remove "letsencrypt-key.pem"',
]
for cmd in cleanup:
self.execute_command(cmd, ignore_error=True)
time.sleep(1)
# Upload
logger.info("Uploading certificate")
try:
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
scp.put(cert_path, 'letsencrypt.pem')
logger.info("✓ Certificate uploaded")
except Exception as e:
logger.error(f"Upload failed: {e}")
return False, False
if key_path:
logger.info("Uploading key")
try:
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
scp.put(key_path, 'letsencrypt-key.pem')
logger.info("✓ Key uploaded")
except Exception as e:
logger.error(f"Key upload failed: {e}")
return False, False return False, False
logger.info("✓ Certificate imported") time.sleep(2)
time.sleep(2) # Verify
success, stdout, stderr = self.execute_command(
'/file print where name~"letsencrypt"',
ignore_error=True
)
# Use predictable name if not success or 'letsencrypt.pem' not in stdout:
imported_cert_name = "letsencrypt.pem_0" logger.error("Files not found on MikroTik!")
logger.info(f"Using certificate: {imported_cert_name}")
# Configure services using the new method
self.configure_services(services, imported_cert_name)
time.sleep(1)
logger.info(f"✓ MikroTik deployment completed")
return True, True
except Exception as e:
logger.error(f"MikroTik deployment failed: {e}")
return False, False return False, False
logger.info("✓ Files verified")
# Import
logger.info("Importing certificate")
success, stdout, stderr = self.execute_command(
'/certificate import file-name=letsencrypt.pem passphrase=""',
timeout=30
)
if not success:
logger.error(f"Import failed: {stderr}")
return False, False
logger.info("✓ Certificate imported")
time.sleep(2)
# Configure
imported_cert_name = "letsencrypt.pem_0"
logger.info(f"Using certificate: {imported_cert_name}")
self.configure_services(services, imported_cert_name)
time.sleep(1)
logger.info("✓ MikroTik deployment completed")
return True, True
except Exception as e:
logger.error(f"MikroTik deployment failed: {e}")
return False, False
class ProxmoxManager(SSHManager): class ProxmoxManager(SSHManager):
"""Specialized manager for Proxmox VE servers""" """Specialized manager for Proxmox VE servers"""