mikrotik check cert
This commit is contained in:
128
certpusher.py
128
certpusher.py
@@ -1,7 +1,7 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
CertPusher - Automated SSL Certificate Distribution Tool
|
CertPusher - Automated SSL Certificate Distribution Tool
|
||||||
Version 1.1 - With serial number normalization
|
Version 1.1 - Production Ready
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import configparser
|
import configparser
|
||||||
@@ -52,15 +52,8 @@ def normalize_serial(serial: str) -> str:
|
|||||||
"""
|
"""
|
||||||
Normalize certificate serial number
|
Normalize certificate serial number
|
||||||
Removes leading zeros, colons, spaces and converts to uppercase
|
Removes leading zeros, colons, spaces and converts to uppercase
|
||||||
|
|
||||||
Examples:
|
|
||||||
'05BC46A0...' -> '5BC46A0...'
|
|
||||||
'0005BC...' -> '5BC...'
|
|
||||||
'00' -> '0'
|
|
||||||
"""
|
"""
|
||||||
# Remove formatting characters
|
|
||||||
normalized = serial.upper().replace(':', '').replace(' ', '').replace('-', '')
|
normalized = serial.upper().replace(':', '').replace(' ', '').replace('-', '')
|
||||||
# Remove leading zeros but keep at least one digit
|
|
||||||
normalized = normalized.lstrip('0') or '0'
|
normalized = normalized.lstrip('0') or '0'
|
||||||
return normalized
|
return normalized
|
||||||
|
|
||||||
@@ -112,7 +105,6 @@ class CertificateManager:
|
|||||||
try:
|
try:
|
||||||
serial1 = normalize_serial(format(cert1.serial_number, 'X').upper())
|
serial1 = normalize_serial(format(cert1.serial_number, 'X').upper())
|
||||||
serial2 = normalize_serial(format(cert2.serial_number, 'X').upper())
|
serial2 = normalize_serial(format(cert2.serial_number, 'X').upper())
|
||||||
|
|
||||||
logger.debug(f"Comparing normalized serials: {serial1} vs {serial2}")
|
logger.debug(f"Comparing normalized serials: {serial1} vs {serial2}")
|
||||||
return serial1 == serial2
|
return serial1 == serial2
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -233,10 +225,7 @@ class SSHManager:
|
|||||||
return False, "", str(e)
|
return False, "", str(e)
|
||||||
|
|
||||||
def check_remote_certificate(self, remote_cert_path: str, source_cert: x509.Certificate) -> bool:
|
def check_remote_certificate(self, remote_cert_path: str, source_cert: x509.Certificate) -> bool:
|
||||||
"""
|
"""Check if remote certificate matches source certificate"""
|
||||||
Check if remote certificate matches source certificate
|
|
||||||
Returns True if upload needed, False if certificates match
|
|
||||||
"""
|
|
||||||
try:
|
try:
|
||||||
logger.info("Checking remote certificate via SSH")
|
logger.info("Checking remote certificate via SSH")
|
||||||
|
|
||||||
@@ -252,7 +241,6 @@ class SSHManager:
|
|||||||
remote_serial_raw = serial_match.group(1).upper()
|
remote_serial_raw = serial_match.group(1).upper()
|
||||||
source_serial_raw = format(source_cert.serial_number, 'X').upper()
|
source_serial_raw = format(source_cert.serial_number, 'X').upper()
|
||||||
|
|
||||||
# Normalize both serials
|
|
||||||
remote_serial = normalize_serial(remote_serial_raw)
|
remote_serial = normalize_serial(remote_serial_raw)
|
||||||
source_serial = normalize_serial(source_serial_raw)
|
source_serial = normalize_serial(source_serial_raw)
|
||||||
|
|
||||||
@@ -285,10 +273,63 @@ class MikroTikManager(SSHManager):
|
|||||||
|
|
||||||
def __init__(self, hostname: str, port: int, username: str, key_path: str):
|
def __init__(self, hostname: str, port: int, username: str, key_path: str):
|
||||||
super().__init__(hostname, port, username, key_path)
|
super().__init__(hostname, port, username, key_path)
|
||||||
# Use simple name without special characters
|
|
||||||
self.cert_name = "letsencrypt"
|
self.cert_name = "letsencrypt"
|
||||||
self.key_name = "letsencrypt-key"
|
self.key_name = "letsencrypt-key"
|
||||||
|
|
||||||
|
def check_certificate_expiry(self, source_cert: x509.Certificate) -> bool:
|
||||||
|
"""Check if certificate on MikroTik needs update"""
|
||||||
|
try:
|
||||||
|
logger.info("Checking MikroTik certificate")
|
||||||
|
|
||||||
|
source_expiry = source_cert.not_valid_after_utc
|
||||||
|
|
||||||
|
# Look for certificate with our name pattern
|
||||||
|
success, stdout, stderr = self.execute_command(
|
||||||
|
'/certificate print detail where name~"letsencrypt"',
|
||||||
|
ignore_error=True
|
||||||
|
)
|
||||||
|
|
||||||
|
if not success or not stdout or 'invalid-after' not in stdout.lower():
|
||||||
|
logger.info("Certificate not found. Upload needed.")
|
||||||
|
return True
|
||||||
|
|
||||||
|
logger.debug(f"Found certificate:\n{stdout[:500]}")
|
||||||
|
|
||||||
|
# Parse expiry (RouterOS 7.x format)
|
||||||
|
invalid_after_match = re.search(
|
||||||
|
r'invalid-after[:\s=]+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})',
|
||||||
|
stdout,
|
||||||
|
re.IGNORECASE
|
||||||
|
)
|
||||||
|
|
||||||
|
if not invalid_after_match:
|
||||||
|
logger.warning("Could not parse expiry")
|
||||||
|
return True
|
||||||
|
|
||||||
|
mikrotik_expiry_str = invalid_after_match.group(1)
|
||||||
|
|
||||||
|
try:
|
||||||
|
mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%Y-%m-%d %H:%M:%S')
|
||||||
|
mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc)
|
||||||
|
except ValueError:
|
||||||
|
logger.warning(f"Could not parse date: {mikrotik_expiry_str}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
logger.info(f"Source expires: {source_expiry}")
|
||||||
|
logger.info(f"MikroTik expires: {mikrotik_expiry}")
|
||||||
|
|
||||||
|
time_diff = abs((source_expiry - mikrotik_expiry).total_seconds())
|
||||||
|
|
||||||
|
if time_diff < 86400:
|
||||||
|
logger.info("✓ Certificate is current. Skipping.")
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
logger.info(f"Certificate differs ({time_diff/86400:.1f} days). Upload needed.")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Error checking: {e}")
|
||||||
|
return True
|
||||||
|
|
||||||
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]:
|
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]:
|
||||||
"""Upload certificate to MikroTik"""
|
"""Upload certificate to MikroTik"""
|
||||||
@@ -319,24 +360,17 @@ class MikroTikManager(SSHManager):
|
|||||||
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Step 3: Show current files (debug)
|
# Step 3: Upload certificate via SCP
|
||||||
logger.debug("Files before upload:")
|
|
||||||
success, stdout, stderr = self.execute_command('/file print', ignore_error=True)
|
|
||||||
if stdout:
|
|
||||||
logger.debug(stdout[:500])
|
|
||||||
|
|
||||||
# Step 4: Upload certificate via SCP
|
|
||||||
logger.info(f"Uploading certificate: {cert_path} -> letsencrypt.pem")
|
logger.info(f"Uploading certificate: {cert_path} -> letsencrypt.pem")
|
||||||
try:
|
try:
|
||||||
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
|
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
|
||||||
# Upload to root directory
|
|
||||||
scp.put(cert_path, 'letsencrypt.pem')
|
scp.put(cert_path, 'letsencrypt.pem')
|
||||||
logger.info("✓ Certificate file uploaded")
|
logger.info("✓ Certificate file uploaded")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"SCP upload failed: {e}")
|
logger.error(f"SCP upload failed: {e}")
|
||||||
return False, False
|
return False, False
|
||||||
|
|
||||||
# Step 5: Upload private key
|
# Step 4: Upload private key
|
||||||
if key_path:
|
if key_path:
|
||||||
logger.info(f"Uploading key: {key_path} -> letsencrypt-key.pem")
|
logger.info(f"Uploading key: {key_path} -> letsencrypt-key.pem")
|
||||||
try:
|
try:
|
||||||
@@ -349,7 +383,7 @@ class MikroTikManager(SSHManager):
|
|||||||
|
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|
||||||
# Step 6: Verify files were uploaded
|
# Step 5: Verify files were uploaded
|
||||||
logger.info("Verifying uploaded files...")
|
logger.info("Verifying uploaded files...")
|
||||||
success, stdout, stderr = self.execute_command(
|
success, stdout, stderr = self.execute_command(
|
||||||
'/file print where name~"letsencrypt"',
|
'/file print where name~"letsencrypt"',
|
||||||
@@ -365,18 +399,14 @@ class MikroTikManager(SSHManager):
|
|||||||
logger.info("✓ Files verified on MikroTik")
|
logger.info("✓ Files verified on MikroTik")
|
||||||
logger.debug(f"Files:\n{stdout}")
|
logger.debug(f"Files:\n{stdout}")
|
||||||
|
|
||||||
# Step 7: Import certificate
|
# Step 6: Import certificate
|
||||||
logger.info("Importing certificate into MikroTik")
|
logger.info("Importing certificate into MikroTik")
|
||||||
import_cmd = '/certificate import file-name=letsencrypt.pem passphrase=""'
|
import_cmd = '/certificate import file-name=letsencrypt.pem passphrase=""'
|
||||||
success, stdout, stderr = self.execute_command(import_cmd, timeout=30)
|
success, stdout, stderr = self.execute_command(import_cmd, timeout=30)
|
||||||
|
|
||||||
if not success:
|
if not success:
|
||||||
logger.error(f"Import failed: {stderr}")
|
logger.error(f"Import failed: {stderr}")
|
||||||
logger.error("Trying to diagnose...")
|
|
||||||
# Show file details
|
|
||||||
self.execute_command('/file print detail where name="letsencrypt.pem"')
|
self.execute_command('/file print detail where name="letsencrypt.pem"')
|
||||||
# Try to see if file is readable
|
|
||||||
self.execute_command('/file print file-name=letsencrypt.pem')
|
|
||||||
return False, False
|
return False, False
|
||||||
|
|
||||||
logger.info("✓ Certificate imported")
|
logger.info("✓ Certificate imported")
|
||||||
@@ -384,7 +414,7 @@ class MikroTikManager(SSHManager):
|
|||||||
|
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|
||||||
# Step 8: Find imported certificate
|
# Step 7: Find imported certificate
|
||||||
logger.info("Looking for imported certificate...")
|
logger.info("Looking for imported certificate...")
|
||||||
success, stdout, stderr = self.execute_command(
|
success, stdout, stderr = self.execute_command(
|
||||||
'/certificate print terse where name~"letsencrypt"'
|
'/certificate print terse where name~"letsencrypt"'
|
||||||
@@ -392,7 +422,6 @@ class MikroTikManager(SSHManager):
|
|||||||
|
|
||||||
if not success or not stdout:
|
if not success or not stdout:
|
||||||
logger.error("Could not find imported certificate!")
|
logger.error("Could not find imported certificate!")
|
||||||
logger.error("All certificates:")
|
|
||||||
self.execute_command('/certificate print')
|
self.execute_command('/certificate print')
|
||||||
return False, False
|
return False, False
|
||||||
|
|
||||||
@@ -405,37 +434,34 @@ class MikroTikManager(SSHManager):
|
|||||||
logger.error("Could not parse certificate names")
|
logger.error("Could not parse certificate names")
|
||||||
return False, False
|
return False, False
|
||||||
|
|
||||||
# Use the first one (usually the leaf certificate)
|
|
||||||
imported_cert_name = cert_names[0]
|
imported_cert_name = cert_names[0]
|
||||||
logger.info(f"Using certificate: {imported_cert_name}")
|
logger.info(f"Using certificate: {imported_cert_name}")
|
||||||
|
|
||||||
# Step 9: Configure www-ssl service
|
# Step 8: Configure www-ssl service
|
||||||
logger.info("Configuring www-ssl to use new certificate")
|
logger.info("Configuring www-ssl to use new certificate")
|
||||||
config_cmd = f'/ip service set www-ssl certificate="{imported_cert_name}"'
|
config_cmd = f'/ip service set www-ssl certificate="{imported_cert_name}"'
|
||||||
success, stdout, stderr = self.execute_command(config_cmd)
|
success, stdout, stderr = self.execute_command(config_cmd)
|
||||||
|
|
||||||
if not success:
|
if not success:
|
||||||
logger.error(f"Failed to configure service: {stderr}")
|
logger.error(f"Failed to configure service: {stderr}")
|
||||||
logger.error("Available certificates:")
|
|
||||||
self.execute_command('/certificate print')
|
self.execute_command('/certificate print')
|
||||||
return False, False
|
return False, False
|
||||||
|
|
||||||
logger.info("✓ Service configured")
|
logger.info("✓ Service configured")
|
||||||
|
|
||||||
# Step 10: Enable www-ssl
|
# Step 9: Enable www-ssl
|
||||||
logger.info("Enabling www-ssl service")
|
logger.info("Enabling www-ssl service")
|
||||||
self.execute_command('/ip service enable www-ssl')
|
self.execute_command('/ip service enable www-ssl')
|
||||||
|
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# Step 11: Verify service status
|
# Step 10: Verify service status
|
||||||
success, stdout, stderr = self.execute_command(
|
success, stdout, stderr = self.execute_command(
|
||||||
'/ip service print where name="www-ssl"'
|
'/ip service print where name="www-ssl"'
|
||||||
)
|
)
|
||||||
|
|
||||||
if success and stdout:
|
if success and stdout:
|
||||||
logger.info("Service status:")
|
logger.debug(f"Service status:\n{stdout}")
|
||||||
logger.info(stdout)
|
|
||||||
|
|
||||||
logger.info(f"✓ MikroTik deployment completed successfully")
|
logger.info(f"✓ MikroTik deployment completed successfully")
|
||||||
return True, True
|
return True, True
|
||||||
@@ -447,23 +473,15 @@ class MikroTikManager(SSHManager):
|
|||||||
return False, False
|
return False, False
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class ProxmoxManager(SSHManager):
|
class ProxmoxManager(SSHManager):
|
||||||
"""Specialized manager for Proxmox VE servers"""
|
"""Specialized manager for Proxmox VE servers"""
|
||||||
|
|
||||||
def check_certificate(self, source_cert: x509.Certificate, check_url: str) -> bool:
|
def check_certificate(self, source_cert: x509.Certificate, check_url: str) -> bool:
|
||||||
"""
|
"""Check if certificate on Proxmox needs update"""
|
||||||
Check if certificate on Proxmox needs update
|
|
||||||
Returns True if upload needed, False if certificates match
|
|
||||||
|
|
||||||
Uses two methods:
|
|
||||||
1. SSH: Direct file check (preferred)
|
|
||||||
2. URL: HTTPS check (fallback)
|
|
||||||
"""
|
|
||||||
try:
|
try:
|
||||||
logger.info("Checking Proxmox certificate")
|
logger.info("Checking Proxmox certificate")
|
||||||
|
|
||||||
# Method 1: Check via SSH - Direct file access
|
# Method 1: Check via SSH
|
||||||
success, stdout, stderr = self.execute_command(
|
success, stdout, stderr = self.execute_command(
|
||||||
'openssl x509 -in /etc/pve/local/pveproxy-ssl.pem -noout -serial -dates 2>/dev/null',
|
'openssl x509 -in /etc/pve/local/pveproxy-ssl.pem -noout -serial -dates 2>/dev/null',
|
||||||
ignore_error=True
|
ignore_error=True
|
||||||
@@ -476,7 +494,6 @@ class ProxmoxManager(SSHManager):
|
|||||||
proxmox_serial_raw = serial_match.group(1).upper()
|
proxmox_serial_raw = serial_match.group(1).upper()
|
||||||
source_serial_raw = format(source_cert.serial_number, 'X').upper()
|
source_serial_raw = format(source_cert.serial_number, 'X').upper()
|
||||||
|
|
||||||
# Normalize both serials (remove leading zeros)
|
|
||||||
proxmox_serial = normalize_serial(proxmox_serial_raw)
|
proxmox_serial = normalize_serial(proxmox_serial_raw)
|
||||||
source_serial = normalize_serial(source_serial_raw)
|
source_serial = normalize_serial(source_serial_raw)
|
||||||
|
|
||||||
@@ -497,13 +514,9 @@ class ProxmoxManager(SSHManager):
|
|||||||
remote_cert = cert_manager.get_cert_from_url(check_url)
|
remote_cert = cert_manager.get_cert_from_url(check_url)
|
||||||
|
|
||||||
if remote_cert:
|
if remote_cert:
|
||||||
# Compare using normalized serials
|
|
||||||
remote_serial = normalize_serial(format(remote_cert.serial_number, 'X').upper())
|
remote_serial = normalize_serial(format(remote_cert.serial_number, 'X').upper())
|
||||||
source_serial = normalize_serial(format(source_cert.serial_number, 'X').upper())
|
source_serial = normalize_serial(format(source_cert.serial_number, 'X').upper())
|
||||||
|
|
||||||
logger.info(f"Source serial (URL): {source_serial}")
|
|
||||||
logger.info(f"Remote serial (URL): {remote_serial}")
|
|
||||||
|
|
||||||
if remote_serial == source_serial:
|
if remote_serial == source_serial:
|
||||||
logger.info("✓ Certificates match (URL check). Skipping upload.")
|
logger.info("✓ Certificates match (URL check). Skipping upload.")
|
||||||
return False
|
return False
|
||||||
@@ -520,10 +533,7 @@ class ProxmoxManager(SSHManager):
|
|||||||
|
|
||||||
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool,
|
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool,
|
||||||
source_cert: x509.Certificate, check_url: str) -> Tuple[bool, bool]:
|
source_cert: x509.Certificate, check_url: str) -> Tuple[bool, bool]:
|
||||||
"""
|
"""Upload certificate to Proxmox"""
|
||||||
Upload certificate to Proxmox
|
|
||||||
Returns (success, was_uploaded)
|
|
||||||
"""
|
|
||||||
try:
|
try:
|
||||||
if check_first and source_cert:
|
if check_first and source_cert:
|
||||||
if not self.check_certificate(source_cert, check_url):
|
if not self.check_certificate(source_cert, check_url):
|
||||||
@@ -738,10 +748,8 @@ class CertPusher:
|
|||||||
upload_needed = True
|
upload_needed = True
|
||||||
|
|
||||||
if check_first:
|
if check_first:
|
||||||
# Try SSH check first
|
|
||||||
if not ssh.check_remote_certificate(remote_cert_path, source_cert):
|
if not ssh.check_remote_certificate(remote_cert_path, source_cert):
|
||||||
upload_needed = False
|
upload_needed = False
|
||||||
# Try URL check if SSH check failed
|
|
||||||
elif check_url:
|
elif check_url:
|
||||||
logger.info(f"SSH check failed. Trying URL: {check_url}")
|
logger.info(f"SSH check failed. Trying URL: {check_url}")
|
||||||
remote_cert = self.cert_manager.get_cert_from_url(check_url)
|
remote_cert = self.cert_manager.get_cert_from_url(check_url)
|
||||||
|
|||||||
Reference in New Issue
Block a user