mikrotik check cert

This commit is contained in:
Mateusz Gruszczyński
2025-10-27 09:21:24 +01:00
parent dccad5d7a4
commit 81d13e44eb

View File

@@ -269,8 +269,12 @@ class MikroTikManager(SSHManager):
self.cert_name = "letsencrypt"
self.key_name = "letsencrypt-key"
def check_certificate_expiry(self, source_cert: x509.Certificate) -> bool:
"""Check if certificate on MikroTik needs update"""
def check_certificate_expiry(self, source_cert: x509.Certificate, services: List[str]) -> bool:
"""
Check if certificate on MikroTik needs update
Also verifies that services are properly configured
Returns True if upload needed, False if everything is OK
"""
try:
logger.info("Checking MikroTik certificate")
@@ -311,129 +315,173 @@ class MikroTikManager(SSHManager):
time_diff = abs((source_expiry - mikrotik_expiry).total_seconds())
if time_diff < 86400:
logger.info("✓ Certificate is current. Skipping.")
return False
else:
if time_diff >= 86400:
logger.info(f"Certificate differs. Upload needed.")
return True
# Certificate is current, but check if services are properly configured
logger.info("Certificate is current. Verifying services configuration...")
cert_name = "letsencrypt.pem_0"
services_need_update = False
for service in services:
success, stdout, stderr = self.execute_command(
f'/ip service print where name="{service}"',
ignore_error=True
)
if success and stdout:
# Check if certificate is set correctly
if f'certificate={cert_name}' not in stdout and 'certificate=letsencrypt' not in stdout:
logger.warning(f"Service {service} not using correct certificate")
services_need_update = True
else:
logger.info(f"✓ Service {service} properly configured")
else:
logger.warning(f"Could not check service {service}")
services_need_update = True
if services_need_update:
logger.info("Services need reconfiguration. Updating...")
self.configure_services(services, cert_name)
return False # Don't need to upload cert, just reconfigure
logger.info("✓ Certificate and services are current. Skipping.")
return False
except Exception as e:
logger.warning(f"Error checking: {e}")
return True
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool,
source_cert: x509.Certificate, services: List[str] = None) -> Tuple[bool, bool]:
"""Upload certificate to MikroTik"""
try:
if check_first and source_cert:
if not self.check_certificate_expiry(source_cert):
return True, False
if not services:
services = ['www-ssl']
logger.info(f"Deploying certificate for: {', '.join(services)}")
# Disable services
for service in services:
logger.info(f"Disabling {service}")
self.execute_command(f'/ip service disable {service}', ignore_error=True)
import time
time.sleep(1)
# Cleanup
logger.info("Cleaning up old certificates")
cleanup = [
'/certificate remove [find name~"letsencrypt"]',
'/file remove "letsencrypt.pem"',
'/file remove "letsencrypt-key.pem"',
]
for cmd in cleanup:
self.execute_command(cmd, ignore_error=True)
time.sleep(1)
# Upload
logger.info("Uploading certificate")
def configure_services(self, services: List[str], cert_name: str):
"""Configure services to use certificate without re-uploading"""
try:
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
scp.put(cert_path, 'letsencrypt.pem')
logger.info("✓ Certificate uploaded")
for service in services:
logger.info(f"Configuring {service}")
# Set certificate
success, _, stderr = self.execute_command(
f'/ip service set {service} certificate="{cert_name}"',
ignore_error=True
)
if success:
logger.info(f"{service} configured")
else:
logger.warning(f"Failed to configure {service}: {stderr}")
# Ensure service is enabled
success, stdout, _ = self.execute_command(
f'/ip service print where name="{service}"',
ignore_error=True
)
if 'disabled=yes' in stdout:
logger.info(f"Enabling {service}")
self.execute_command(f'/ip service enable {service}')
except Exception as e:
logger.error(f"Upload failed: {e}")
return False, False
if key_path:
logger.info("Uploading key")
logger.error(f"Service configuration failed: {e}")
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool,
source_cert: x509.Certificate, services: List[str] = None) -> Tuple[bool, bool]:
"""Upload certificate to MikroTik"""
try:
if not services:
services = ['www-ssl']
if check_first and source_cert:
if not self.check_certificate_expiry(source_cert, services):
return True, False # Certificate and services are OK
logger.info(f"Deploying certificate for: {', '.join(services)}")
# Disable services
for service in services:
logger.info(f"Disabling {service}")
self.execute_command(f'/ip service disable {service}', ignore_error=True)
import time
time.sleep(1)
# Cleanup
logger.info("Cleaning up old certificates")
cleanup = [
'/certificate remove [find name~"letsencrypt"]',
'/file remove "letsencrypt.pem"',
'/file remove "letsencrypt-key.pem"',
]
for cmd in cleanup:
self.execute_command(cmd, ignore_error=True)
time.sleep(1)
# Upload
logger.info("Uploading certificate")
try:
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
scp.put(key_path, 'letsencrypt-key.pem')
logger.info("Key uploaded")
scp.put(cert_path, 'letsencrypt.pem')
logger.info("Certificate uploaded")
except Exception as e:
logger.error(f"Key upload failed: {e}")
logger.error(f"Upload failed: {e}")
return False, False
time.sleep(2)
# Verify
success, stdout, stderr = self.execute_command(
'/file print where name~"letsencrypt"',
ignore_error=True
)
if not success or 'letsencrypt.pem' not in stdout:
logger.error("Files not found on MikroTik!")
return False, False
logger.info("✓ Files verified")
# Import
logger.info("Importing certificate")
success, stdout, stderr = self.execute_command(
'/certificate import file-name=letsencrypt.pem passphrase=""',
timeout=30
)
if not success:
logger.error(f"Import failed: {stderr}")
return False, False
logger.info("✓ Certificate imported")
time.sleep(2)
# Use predictable name
imported_cert_name = "letsencrypt.pem_0"
logger.info(f"Using certificate: {imported_cert_name}")
# Configure services
for service in services:
logger.info(f"Configuring {service}")
success, _, stderr = self.execute_command(
f'/ip service set {service} certificate="{imported_cert_name}"'
if key_path:
logger.info("Uploading key")
try:
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
scp.put(key_path, 'letsencrypt-key.pem')
logger.info("✓ Key uploaded")
except Exception as e:
logger.error(f"Key upload failed: {e}")
return False, False
time.sleep(2)
# Verify
success, stdout, stderr = self.execute_command(
'/file print where name~"letsencrypt"',
ignore_error=True
)
if not success or 'letsencrypt.pem' not in stdout:
logger.error("Files not found on MikroTik!")
return False, False
logger.info("✓ Files verified")
# Import
logger.info("Importing certificate")
success, stdout, stderr = self.execute_command(
'/certificate import file-name=letsencrypt.pem passphrase=""',
timeout=30
)
if not success:
logger.error(f"Failed to configure {service}: {stderr}")
else:
logger.info(f"{service} configured")
# Enable services
for service in services:
logger.info(f"Enabling {service}")
self.execute_command(f'/ip service enable {service}')
time.sleep(1)
logger.info(f"✓ MikroTik deployment completed")
return True, True
except Exception as e:
logger.error(f"MikroTik deployment failed: {e}")
return False, False
logger.error(f"Import failed: {stderr}")
return False, False
logger.info("✓ Certificate imported")
time.sleep(2)
# Use predictable name
imported_cert_name = "letsencrypt.pem_0"
logger.info(f"Using certificate: {imported_cert_name}")
# Configure services using the new method
self.configure_services(services, imported_cert_name)
time.sleep(1)
logger.info(f"✓ MikroTik deployment completed")
return True, True
except Exception as e:
logger.error(f"MikroTik deployment failed: {e}")
return False, False
class ProxmoxManager(SSHManager):