mikrotik check cert
This commit is contained in:
426
certpusher.py
426
certpusher.py
@@ -289,315 +289,163 @@ class MikroTikManager(SSHManager):
|
|||||||
self.cert_name = "letsencrypt"
|
self.cert_name = "letsencrypt"
|
||||||
self.key_name = "letsencrypt-key"
|
self.key_name = "letsencrypt-key"
|
||||||
|
|
||||||
def check_certificate_expiry(self, source_cert: x509.Certificate) -> bool:
|
|
||||||
"""Check if certificate on MikroTik needs update"""
|
|
||||||
try:
|
|
||||||
logger.info("Checking MikroTik certificate")
|
|
||||||
|
|
||||||
# Get source certificate common-name
|
|
||||||
source_cn = source_cert.subject.rfc4514_string()
|
|
||||||
source_expiry = source_cert.not_valid_after_utc
|
|
||||||
|
|
||||||
# Extract CN value (e.g., "*.linuxiarz.pl" from "CN=*.linuxiarz.pl")
|
|
||||||
cn_value = source_cn.split("CN=")[1].split(",")[0] if "CN=" in source_cn else source_cn
|
|
||||||
|
|
||||||
logger.info(f"Looking for cert with CN: {cn_value}")
|
|
||||||
logger.info(f"Source expires: {source_expiry}")
|
|
||||||
|
|
||||||
# Search by common-name (escape wildcards for MikroTik)
|
|
||||||
cn_search = cn_value.replace("*", "\\*")
|
|
||||||
success, stdout, stderr = self.execute_command(
|
|
||||||
f'/certificate print detail where common-name="{cn_search}"',
|
|
||||||
ignore_error=True
|
|
||||||
)
|
|
||||||
|
|
||||||
if not success or not stdout or 'invalid-after' not in stdout.lower():
|
|
||||||
logger.info("Certificate not found. Upload needed.")
|
|
||||||
return True
|
|
||||||
|
|
||||||
logger.debug(f"Found certificate:\n{stdout[:500]}")
|
|
||||||
|
|
||||||
# Parse expiry (RouterOS 7.x format: 2026-01-22 08:34:12)
|
|
||||||
invalid_after_match = re.search(
|
|
||||||
r'invalid-after[:\s=]+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})',
|
|
||||||
stdout,
|
|
||||||
re.IGNORECASE
|
|
||||||
)
|
|
||||||
|
|
||||||
if not invalid_after_match:
|
|
||||||
logger.warning("Could not parse expiry")
|
|
||||||
return True
|
|
||||||
|
|
||||||
mikrotik_expiry_str = invalid_after_match.group(1)
|
|
||||||
|
|
||||||
|
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]:
|
||||||
|
"""Upload certificate to MikroTik"""
|
||||||
try:
|
try:
|
||||||
mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%Y-%m-%d %H:%M:%S')
|
if check_first and source_cert:
|
||||||
mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc)
|
if not self.check_certificate_expiry(source_cert):
|
||||||
except ValueError:
|
return True, False
|
||||||
logger.warning(f"Could not parse date: {mikrotik_expiry_str}")
|
|
||||||
return True
|
|
||||||
|
|
||||||
logger.info(f"MikroTik expires: {mikrotik_expiry}")
|
logger.info("Deploying MikroTik certificate")
|
||||||
|
|
||||||
time_diff = abs((source_expiry - mikrotik_expiry).total_seconds())
|
# Step 1: Disable www-ssl
|
||||||
|
logger.info("Disabling www-ssl service")
|
||||||
|
self.execute_command('/ip service disable www-ssl', ignore_error=True)
|
||||||
|
|
||||||
# Allow 24h tolerance
|
import time
|
||||||
if time_diff < 86400:
|
time.sleep(1)
|
||||||
logger.info("✓ Certificate is current. Skipping.")
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
logger.info(f"Certificate differs ({time_diff/86400:.1f} days). Upload needed.")
|
|
||||||
return True
|
|
||||||
|
|
||||||
except Exception as e:
|
# Step 2: Remove old files and certificates
|
||||||
logger.warning(f"Error checking: {e}")
|
logger.info("Cleaning up old certificates")
|
||||||
import traceback
|
cleanup_commands = [
|
||||||
logger.debug(traceback.format_exc())
|
'/certificate remove [find name~"letsencrypt"]',
|
||||||
return True
|
'/file remove "letsencrypt.pem"',
|
||||||
|
'/file remove "letsencrypt-key.pem"',
|
||||||
|
]
|
||||||
|
|
||||||
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]:
|
for cmd in cleanup_commands:
|
||||||
"""Upload certificate to MikroTik"""
|
self.execute_command(cmd, ignore_error=True)
|
||||||
try:
|
|
||||||
if check_first and source_cert:
|
|
||||||
if not self.check_certificate_expiry(source_cert):
|
|
||||||
return True, False
|
|
||||||
|
|
||||||
logger.info("Deploying MikroTik certificate")
|
time.sleep(1)
|
||||||
|
|
||||||
# Step 1: Disable www-ssl
|
# Step 3: Show current files (debug)
|
||||||
logger.debug("Disabling www-ssl service")
|
logger.debug("Files before upload:")
|
||||||
self.execute_command('/ip service disable www-ssl', ignore_error=True)
|
success, stdout, stderr = self.execute_command('/file print', ignore_error=True)
|
||||||
|
if stdout:
|
||||||
|
logger.debug(stdout[:500])
|
||||||
|
|
||||||
import time
|
# Step 4: Upload certificate via SCP
|
||||||
time.sleep(1)
|
logger.info(f"Uploading certificate: {cert_path} -> letsencrypt.pem")
|
||||||
|
try:
|
||||||
|
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
|
||||||
|
# Upload to root directory
|
||||||
|
scp.put(cert_path, 'letsencrypt.pem')
|
||||||
|
logger.info("✓ Certificate file uploaded")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"SCP upload failed: {e}")
|
||||||
|
return False, False
|
||||||
|
|
||||||
# Step 2: Remove old certificates with our name
|
# Step 5: Upload private key
|
||||||
logger.debug("Cleaning up old certificates")
|
if key_path:
|
||||||
cleanup_commands = [
|
logger.info(f"Uploading key: {key_path} -> letsencrypt-key.pem")
|
||||||
f'/certificate remove [find name~"{self.cert_name}"]',
|
try:
|
||||||
f'/file remove "{self.cert_name}.pem"',
|
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
|
||||||
f'/file remove "{self.key_name}.pem"',
|
scp.put(key_path, 'letsencrypt-key.pem')
|
||||||
]
|
logger.info("✓ Key file uploaded")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Key upload failed: {e}")
|
||||||
|
return False, False
|
||||||
|
|
||||||
for cmd in cleanup_commands:
|
time.sleep(2)
|
||||||
self.execute_command(cmd, ignore_error=True)
|
|
||||||
|
|
||||||
time.sleep(1)
|
# Step 6: Verify files were uploaded
|
||||||
|
logger.info("Verifying uploaded files...")
|
||||||
|
success, stdout, stderr = self.execute_command(
|
||||||
|
'/file print where name~"letsencrypt"',
|
||||||
|
ignore_error=True
|
||||||
|
)
|
||||||
|
|
||||||
# Step 3: Upload certificate
|
if not success or 'letsencrypt.pem' not in stdout:
|
||||||
logger.info(f"Uploading certificate as: {self.cert_name}.pem")
|
logger.error("Certificate file not found on MikroTik!")
|
||||||
with SCPClient(self.ssh_client.get_transport()) as scp:
|
logger.error("Available files:")
|
||||||
scp.put(cert_path, f'{self.cert_name}.pem')
|
self.execute_command('/file print')
|
||||||
|
return False, False
|
||||||
|
|
||||||
# Step 4: Upload private key
|
logger.info("✓ Files verified on MikroTik")
|
||||||
if key_path:
|
logger.debug(f"Files:\n{stdout}")
|
||||||
logger.info(f"Uploading key as: {self.key_name}.pem")
|
|
||||||
with SCPClient(self.ssh_client.get_transport()) as scp:
|
|
||||||
scp.put(key_path, f'{self.key_name}.pem')
|
|
||||||
|
|
||||||
time.sleep(1)
|
# Step 7: Import certificate
|
||||||
|
logger.info("Importing certificate into MikroTik")
|
||||||
|
import_cmd = '/certificate import file-name=letsencrypt.pem passphrase=""'
|
||||||
|
success, stdout, stderr = self.execute_command(import_cmd, timeout=30)
|
||||||
|
|
||||||
# Step 5: Import certificate (fullchain includes both cert and chain)
|
if not success:
|
||||||
logger.info("Importing certificate")
|
logger.error(f"Import failed: {stderr}")
|
||||||
import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""'
|
logger.error("Trying to diagnose...")
|
||||||
success, stdout, stderr = self.execute_command(import_cmd, timeout=30)
|
# Show file details
|
||||||
|
self.execute_command('/file print detail where name="letsencrypt.pem"')
|
||||||
|
# Try to see if file is readable
|
||||||
|
self.execute_command('/file print file-name=letsencrypt.pem')
|
||||||
|
return False, False
|
||||||
|
|
||||||
if not success:
|
logger.info("✓ Certificate imported")
|
||||||
logger.error(f"Certificate import failed: {stderr}")
|
logger.debug(f"Import output: {stdout}")
|
||||||
# Show available files for debugging
|
|
||||||
self.execute_command('/file print where name~".pem"')
|
time.sleep(2)
|
||||||
|
|
||||||
|
# Step 8: Find imported certificate
|
||||||
|
logger.info("Looking for imported certificate...")
|
||||||
|
success, stdout, stderr = self.execute_command(
|
||||||
|
'/certificate print terse where name~"letsencrypt"'
|
||||||
|
)
|
||||||
|
|
||||||
|
if not success or not stdout:
|
||||||
|
logger.error("Could not find imported certificate!")
|
||||||
|
logger.error("All certificates:")
|
||||||
|
self.execute_command('/certificate print')
|
||||||
|
return False, False
|
||||||
|
|
||||||
|
logger.debug(f"Found certificates:\n{stdout}")
|
||||||
|
|
||||||
|
# Parse certificate names
|
||||||
|
cert_names = re.findall(r'name="([^"]+)"', stdout)
|
||||||
|
|
||||||
|
if not cert_names:
|
||||||
|
logger.error("Could not parse certificate names")
|
||||||
|
return False, False
|
||||||
|
|
||||||
|
# Use the first one (usually the leaf certificate)
|
||||||
|
imported_cert_name = cert_names[0]
|
||||||
|
logger.info(f"Using certificate: {imported_cert_name}")
|
||||||
|
|
||||||
|
# Step 9: Configure www-ssl service
|
||||||
|
logger.info("Configuring www-ssl to use new certificate")
|
||||||
|
config_cmd = f'/ip service set www-ssl certificate="{imported_cert_name}"'
|
||||||
|
success, stdout, stderr = self.execute_command(config_cmd)
|
||||||
|
|
||||||
|
if not success:
|
||||||
|
logger.error(f"Failed to configure service: {stderr}")
|
||||||
|
logger.error("Available certificates:")
|
||||||
|
self.execute_command('/certificate print')
|
||||||
|
return False, False
|
||||||
|
|
||||||
|
logger.info("✓ Service configured")
|
||||||
|
|
||||||
|
# Step 10: Enable www-ssl
|
||||||
|
logger.info("Enabling www-ssl service")
|
||||||
|
self.execute_command('/ip service enable www-ssl')
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Step 11: Verify service status
|
||||||
|
success, stdout, stderr = self.execute_command(
|
||||||
|
'/ip service print where name="www-ssl"'
|
||||||
|
)
|
||||||
|
|
||||||
|
if success and stdout:
|
||||||
|
logger.info("Service status:")
|
||||||
|
logger.info(stdout)
|
||||||
|
|
||||||
|
logger.info(f"✓ MikroTik deployment completed successfully")
|
||||||
|
return True, True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"MikroTik deployment failed: {e}")
|
||||||
|
import traceback
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
return False, False
|
return False, False
|
||||||
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
# Step 6: Find imported certificate name
|
|
||||||
success, stdout, stderr = self.execute_command(
|
|
||||||
f'/certificate print terse where name~"{self.cert_name}"'
|
|
||||||
)
|
|
||||||
|
|
||||||
if not success or not stdout:
|
|
||||||
logger.error("Could not find imported certificate")
|
|
||||||
return False, False
|
|
||||||
|
|
||||||
logger.debug(f"Imported certificates:\n{stdout}")
|
|
||||||
|
|
||||||
# Extract certificate name (MikroTik adds _0, _1 suffixes)
|
|
||||||
# Format: 0 name="letsencrypt_0" ...
|
|
||||||
cert_names = re.findall(r'name="([^"]+)"', stdout)
|
|
||||||
|
|
||||||
if not cert_names:
|
|
||||||
logger.error("Could not parse certificate names")
|
|
||||||
return False, False
|
|
||||||
|
|
||||||
# Use first certificate (usually the actual cert, not CA)
|
|
||||||
imported_cert_name = cert_names[0]
|
|
||||||
logger.info(f"Using certificate: {imported_cert_name}")
|
|
||||||
|
|
||||||
# Step 7: Configure www-ssl service
|
|
||||||
logger.info("Configuring www-ssl service")
|
|
||||||
config_cmd = f'/ip service set www-ssl certificate={imported_cert_name}'
|
|
||||||
success, stdout, stderr = self.execute_command(config_cmd)
|
|
||||||
|
|
||||||
if not success:
|
|
||||||
logger.error(f"Failed to set certificate: {stderr}")
|
|
||||||
# Show available certificates
|
|
||||||
self.execute_command('/certificate print')
|
|
||||||
return False, False
|
|
||||||
|
|
||||||
# Step 8: Enable www-ssl
|
|
||||||
self.execute_command('/ip service enable www-ssl', ignore_error=True)
|
|
||||||
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
# Step 9: Verify
|
|
||||||
success, stdout, stderr = self.execute_command(
|
|
||||||
'/ip service print where name="www-ssl"'
|
|
||||||
)
|
|
||||||
|
|
||||||
if success:
|
|
||||||
logger.debug(f"Service status:\n{stdout}")
|
|
||||||
|
|
||||||
logger.info(f"✓ MikroTik deployment successful")
|
|
||||||
return True, True
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"MikroTik deployment failed: {e}")
|
|
||||||
import traceback
|
|
||||||
logger.debug(traceback.format_exc())
|
|
||||||
return False, False
|
|
||||||
|
|
||||||
|
|
||||||
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]:
|
|
||||||
"""Upload certificate to MikroTik"""
|
|
||||||
try:
|
|
||||||
if check_first and source_cert:
|
|
||||||
if not self.check_certificate_expiry(source_cert):
|
|
||||||
return True, False
|
|
||||||
|
|
||||||
logger.info("Deploying MikroTik certificate")
|
|
||||||
|
|
||||||
# Disable service
|
|
||||||
self.execute_command('/ip service disable www-ssl', ignore_error=True)
|
|
||||||
|
|
||||||
# Remove OLD certificates with our naming pattern
|
|
||||||
cleanup_commands = [
|
|
||||||
f'/certificate remove [find name~"certpusher"]',
|
|
||||||
f'/file remove [find name~"certpusher"]',
|
|
||||||
]
|
|
||||||
|
|
||||||
for cmd in cleanup_commands:
|
|
||||||
self.execute_command(cmd, ignore_error=True)
|
|
||||||
|
|
||||||
# Upload with unique name
|
|
||||||
logger.info(f"Uploading as: {self.cert_name}")
|
|
||||||
with SCPClient(self.ssh_client.get_transport()) as scp:
|
|
||||||
scp.put(cert_path, f'{self.cert_name}.pem')
|
|
||||||
|
|
||||||
if key_path:
|
|
||||||
with SCPClient(self.ssh_client.get_transport()) as scp:
|
|
||||||
scp.put(key_path, f'{self.cert_name}-key.pem')
|
|
||||||
|
|
||||||
# Import certificate
|
|
||||||
logger.info("Importing certificate")
|
|
||||||
import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""'
|
|
||||||
if key_path:
|
|
||||||
import_cmd += f' name={self.cert_name}'
|
|
||||||
|
|
||||||
self.execute_command(import_cmd, timeout=30)
|
|
||||||
|
|
||||||
import time
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
# Find the imported certificate (MikroTik adds _0, _1 suffixes)
|
|
||||||
success, stdout, stderr = self.execute_command(
|
|
||||||
f'/certificate print where name~"{self.cert_name}"'
|
|
||||||
)
|
|
||||||
|
|
||||||
if success and stdout:
|
|
||||||
logger.debug(f"Imported certificates:\n{stdout}")
|
|
||||||
|
|
||||||
# Extract certificate name (usually cert_name_0)
|
|
||||||
cert_match = re.search(r'name="([^"]+)"', stdout)
|
|
||||||
if cert_match:
|
|
||||||
imported_name = cert_match.group(1)
|
|
||||||
logger.info(f"Certificate imported as: {imported_name}")
|
|
||||||
|
|
||||||
# Configure service
|
|
||||||
config_commands = [
|
|
||||||
f'/ip service set www-ssl certificate={imported_name}',
|
|
||||||
'/ip service enable www-ssl',
|
|
||||||
]
|
|
||||||
|
|
||||||
for cmd in config_commands:
|
|
||||||
self.execute_command(cmd, ignore_error=True)
|
|
||||||
else:
|
|
||||||
logger.warning("Could not find imported certificate name, using default")
|
|
||||||
self.execute_command(f'/ip service set www-ssl certificate={self.cert_name}_0', ignore_error=True)
|
|
||||||
self.execute_command('/ip service enable www-ssl', ignore_error=True)
|
|
||||||
|
|
||||||
logger.info(f"✓ MikroTik deployment successful")
|
|
||||||
return True, True
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"MikroTik deployment failed: {e}")
|
|
||||||
return False, False
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]:
|
|
||||||
"""
|
|
||||||
Upload certificate to MikroTik
|
|
||||||
Returns (success, was_uploaded)
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
if check_first and source_cert:
|
|
||||||
if not self.check_certificate_expiry(source_cert):
|
|
||||||
return True, False
|
|
||||||
|
|
||||||
logger.info("Deploying MikroTik certificate")
|
|
||||||
|
|
||||||
self.execute_command('/ip service disable www-ssl', ignore_error=True)
|
|
||||||
|
|
||||||
cleanup_commands = [
|
|
||||||
f'/certificate remove [find name~"{self.cert_name}"]',
|
|
||||||
f'/file remove "{self.cert_name}.pem"',
|
|
||||||
]
|
|
||||||
|
|
||||||
if key_path:
|
|
||||||
cleanup_commands.append(f'/file remove "ssl-key.pem"')
|
|
||||||
|
|
||||||
for cmd in cleanup_commands:
|
|
||||||
self.execute_command(cmd, ignore_error=True)
|
|
||||||
|
|
||||||
logger.info("Uploading certificate")
|
|
||||||
with SCPClient(self.ssh_client.get_transport()) as scp:
|
|
||||||
scp.put(cert_path, f'{self.cert_name}.pem')
|
|
||||||
|
|
||||||
if key_path:
|
|
||||||
logger.info("Uploading private key")
|
|
||||||
with SCPClient(self.ssh_client.get_transport()) as scp:
|
|
||||||
scp.put(key_path, 'ssl-key.pem')
|
|
||||||
|
|
||||||
logger.info("Importing certificate")
|
|
||||||
self.execute_command(f'/certificate import file-name={self.cert_name}.pem passphrase=""', timeout=30)
|
|
||||||
|
|
||||||
import time
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
config_commands = [
|
|
||||||
f'/ip service set www-ssl certificate={self.cert_name}_0',
|
|
||||||
'/ip service enable www-ssl',
|
|
||||||
]
|
|
||||||
|
|
||||||
for cmd in config_commands:
|
|
||||||
self.execute_command(cmd, ignore_error=True)
|
|
||||||
|
|
||||||
logger.info(f"✓ MikroTik deployment successful")
|
|
||||||
return True, True
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"MikroTik deployment failed: {e}")
|
|
||||||
return False, False
|
|
||||||
|
|
||||||
|
|
||||||
class ProxmoxManager(SSHManager):
|
class ProxmoxManager(SSHManager):
|
||||||
|
|||||||
Reference in New Issue
Block a user