mikrotik check cert

This commit is contained in:
Mateusz Gruszczyński
2025-10-27 09:10:54 +01:00
parent 864ee27d01
commit dccad5d7a4

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""
CertPusher - Automated SSL Certificate Distribution Tool
Version 1.1 - Production Ready
Version 1.2 - Production Ready with MikroTik Services Support
"""
import configparser
@@ -14,10 +14,9 @@ import re
import argparse
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple
from typing import Dict, Optional, Tuple, List
import paramiko
from scp import SCPClient
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
@@ -49,10 +48,7 @@ logger = logging.getLogger('CertPusher')
def normalize_serial(serial: str) -> str:
"""
Normalize certificate serial number
Removes leading zeros, colons, spaces and converts to uppercase
"""
"""Normalize certificate serial number"""
normalized = serial.upper().replace(':', '').replace(' ', '').replace('-', '')
normalized = normalized.lstrip('0') or '0'
return normalized
@@ -71,7 +67,7 @@ class CertificateManager:
logger.debug(f"Loaded certificate from {cert_path}")
return cert
except Exception as e:
logger.error(f"Failed to load certificate from {cert_path}: {e}")
logger.error(f"Failed to load certificate: {e}")
return None
@staticmethod
@@ -96,19 +92,19 @@ class CertificateManager:
cert = x509.load_der_x509_certificate(der_cert, default_backend())
return cert
except Exception as e:
logger.warning(f"Failed to retrieve certificate from {url}: {e}")
logger.warning(f"Failed to retrieve certificate: {e}")
return None
@staticmethod
def compare_certificates(cert1: x509.Certificate, cert2: x509.Certificate) -> bool:
"""Compare two certificates by normalized serial number"""
"""Compare two certificates by normalized serial"""
try:
serial1 = normalize_serial(format(cert1.serial_number, 'X').upper())
serial2 = normalize_serial(format(cert2.serial_number, 'X').upper())
logger.debug(f"Comparing normalized serials: {serial1} vs {serial2}")
logger.debug(f"Comparing: {serial1} vs {serial2}")
return serial1 == serial2
except Exception as e:
logger.error(f"Failed to compare certificates: {e}")
logger.error(f"Comparison failed: {e}")
return False
@staticmethod
@@ -126,7 +122,7 @@ class CertificateManager:
Expires: {valid_to}
Days left: {days_left}"""
except Exception as e:
return f"Unable to extract certificate info: {e}"
return f"Unable to extract info: {e}"
class SSHManager:
@@ -163,7 +159,7 @@ class SSHManager:
continue
if not private_key:
logger.error(f"Could not load SSH key from {self.key_path}")
logger.error(f"Could not load SSH key")
return False
self.ssh_client.connect(
@@ -225,7 +221,7 @@ class SSHManager:
return False, "", str(e)
def check_remote_certificate(self, remote_cert_path: str, source_cert: x509.Certificate) -> bool:
"""Check if remote certificate matches source certificate"""
"""Check if remote certificate matches source"""
try:
logger.info("Checking remote certificate via SSH")
@@ -238,27 +234,24 @@ class SSHManager:
serial_match = re.search(r'serial=([A-F0-9]+)', stdout, re.IGNORECASE)
if serial_match:
remote_serial_raw = serial_match.group(1).upper()
source_serial_raw = format(source_cert.serial_number, 'X').upper()
remote_serial = normalize_serial(serial_match.group(1).upper())
source_serial = normalize_serial(format(source_cert.serial_number, 'X').upper())
remote_serial = normalize_serial(remote_serial_raw)
source_serial = normalize_serial(source_serial_raw)
logger.info(f"Source serial: {source_serial}")
logger.info(f"Remote serial: {remote_serial}")
logger.info(f"Source: {source_serial}")
logger.info(f"Remote: {remote_serial}")
if source_serial == remote_serial:
logger.info("✓ Certificates match. Skipping upload.")
logger.info("✓ Certificates match. Skipping.")
return False
else:
logger.info("✗ Certificates differ. Upload needed.")
return True
logger.warning("Could not read remote certificate via SSH")
logger.warning("Could not read remote certificate")
return True
except Exception as e:
logger.warning(f"Error checking remote certificate: {e}")
logger.warning(f"Error checking: {e}")
return True
def disconnect(self):
@@ -283,7 +276,6 @@ class MikroTikManager(SSHManager):
source_expiry = source_cert.not_valid_after_utc
# Look for certificate with our name pattern
success, stdout, stderr = self.execute_command(
'/certificate print detail where name~"letsencrypt"',
ignore_error=True
@@ -295,7 +287,6 @@ class MikroTikManager(SSHManager):
logger.debug(f"Found certificate:\n{stdout[:500]}")
# Parse expiry (RouterOS 7.x format)
invalid_after_match = re.search(
r'invalid-after[:\s=]+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})',
stdout,
@@ -312,7 +303,7 @@ class MikroTikManager(SSHManager):
mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%Y-%m-%d %H:%M:%S')
mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc)
except ValueError:
logger.warning(f"Could not parse date: {mikrotik_expiry_str}")
logger.warning(f"Could not parse date")
return True
logger.info(f"Source expires: {source_expiry}")
@@ -324,165 +315,126 @@ class MikroTikManager(SSHManager):
logger.info("✓ Certificate is current. Skipping.")
return False
else:
logger.info(f"Certificate differs ({time_diff/86400:.1f} days). Upload needed.")
logger.info(f"Certificate differs. Upload needed.")
return True
except Exception as e:
logger.warning(f"Error checking: {e}")
return True
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate, services: list = None) -> Tuple[bool, bool]:
"""
Upload certificate to MikroTik
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool,
source_cert: x509.Certificate, services: List[str] = None) -> Tuple[bool, bool]:
"""Upload certificate to MikroTik"""
try:
if check_first and source_cert:
if not self.check_certificate_expiry(source_cert):
return True, False
Args:
cert_path: Path to certificate file
key_path: Path to private key file
check_first: Whether to check existing certificate first
source_cert: Source certificate object for comparison
services: List of services to configure (e.g. ['www-ssl', 'api-ssl'])
"""
if not services:
services = ['www-ssl']
logger.info(f"Deploying certificate for: {', '.join(services)}")
# Disable services
for service in services:
logger.info(f"Disabling {service}")
self.execute_command(f'/ip service disable {service}', ignore_error=True)
import time
time.sleep(1)
# Cleanup
logger.info("Cleaning up old certificates")
cleanup = [
'/certificate remove [find name~"letsencrypt"]',
'/file remove "letsencrypt.pem"',
'/file remove "letsencrypt-key.pem"',
]
for cmd in cleanup:
self.execute_command(cmd, ignore_error=True)
time.sleep(1)
# Upload
logger.info("Uploading certificate")
try:
if check_first and source_cert:
if not self.check_certificate_expiry(source_cert):
return True, False
# Default to www-ssl if not specified
if not services:
services = ['www-ssl']
logger.info(f"Deploying MikroTik certificate for services: {', '.join(services)}")
# Step 1: Disable all services that will use the certificate
for service in services:
logger.info(f"Disabling {service} service")
self.execute_command(f'/ip service disable {service}', ignore_error=True)
import time
time.sleep(1)
# Step 2: Remove old files and certificates
logger.info("Cleaning up old certificates")
cleanup_commands = [
'/certificate remove [find name~"letsencrypt"]',
'/file remove "letsencrypt.pem"',
'/file remove "letsencrypt-key.pem"',
]
for cmd in cleanup_commands:
self.execute_command(cmd, ignore_error=True)
time.sleep(1)
# Step 3: Upload certificate via SCP
logger.info(f"Uploading certificate: {cert_path} -> letsencrypt.pem")
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
scp.put(cert_path, 'letsencrypt.pem')
logger.info("✓ Certificate uploaded")
except Exception as e:
logger.error(f"Upload failed: {e}")
return False, False
if key_path:
logger.info("Uploading key")
try:
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
scp.put(cert_path, 'letsencrypt.pem')
logger.info("Certificate file uploaded")
scp.put(key_path, 'letsencrypt-key.pem')
logger.info("Key uploaded")
except Exception as e:
logger.error(f"SCP upload failed: {e}")
logger.error(f"Key upload failed: {e}")
return False, False
# Step 4: Upload private key
if key_path:
logger.info(f"Uploading key: {key_path} -> letsencrypt-key.pem")
try:
with SCPClient(self.ssh_client.get_transport(), progress=None) as scp:
scp.put(key_path, 'letsencrypt-key.pem')
logger.info("✓ Key file uploaded")
except Exception as e:
logger.error(f"Key upload failed: {e}")
return False, False
time.sleep(2)
# Step 5: Verify files were uploaded
logger.info("Verifying uploaded files...")
success, stdout, stderr = self.execute_command(
'/file print where name~"letsencrypt"',
ignore_error=True
time.sleep(2)
# Verify
success, stdout, stderr = self.execute_command(
'/file print where name~"letsencrypt"',
ignore_error=True
)
if not success or 'letsencrypt.pem' not in stdout:
logger.error("Files not found on MikroTik!")
return False, False
logger.info("✓ Files verified")
# Import
logger.info("Importing certificate")
success, stdout, stderr = self.execute_command(
'/certificate import file-name=letsencrypt.pem passphrase=""',
timeout=30
)
if not success:
logger.error(f"Import failed: {stderr}")
return False, False
logger.info("✓ Certificate imported")
time.sleep(2)
# Use predictable name
imported_cert_name = "letsencrypt.pem_0"
logger.info(f"Using certificate: {imported_cert_name}")
# Configure services
for service in services:
logger.info(f"Configuring {service}")
success, _, stderr = self.execute_command(
f'/ip service set {service} certificate="{imported_cert_name}"'
)
if not success or 'letsencrypt.pem' not in stdout:
logger.error("Certificate file not found on MikroTik!")
self.execute_command('/file print')
return False, False
logger.info("✓ Files verified on MikroTik")
logger.debug(f"Files:\n{stdout}")
# Step 6: Import certificate
logger.info("Importing certificate into MikroTik")
import_cmd = '/certificate import file-name=letsencrypt.pem passphrase=""'
success, stdout, stderr = self.execute_command(import_cmd, timeout=30)
if not success:
logger.error(f"Import failed: {stderr}")
return False, False
logger.info("✓ Certificate imported")
logger.debug(f"Import output: {stdout}")
time.sleep(2)
# Step 7: Use predictable certificate name
# MikroTik always creates filename_0 for the leaf certificate
imported_cert_name = "letsencrypt.pem_0"
logger.info(f"Using certificate: {imported_cert_name}")
# Verify it exists
success, stdout, stderr = self.execute_command(
f'/certificate print where name="{imported_cert_name}"'
)
if not success or not stdout:
logger.warning(f"Certificate {imported_cert_name} not found, trying to find it...")
success, stdout, stderr = self.execute_command('/certificate print where name~"letsencrypt"')
if stdout:
logger.debug(f"Available certificates:\n{stdout}")
# Try to parse actual name
match = re.search(r'name="?([a-zA-Z0-9._-]*letsencrypt[^"\s]*)"?', stdout)
if match:
imported_cert_name = match.group(1)
logger.info(f"Found certificate: {imported_cert_name}")
# Step 8: Configure all specified services
for service in services:
logger.info(f"Configuring {service} to use certificate")
config_cmd = f'/ip service set {service} certificate="{imported_cert_name}"'
success, stdout, stderr = self.execute_command(config_cmd)
if not success:
logger.error(f"Failed to configure {service}: {stderr}")
logger.warning(f"Continuing with other services...")
else:
logger.info(f"{service} configured")
# Step 9: Enable all services
for service in services:
logger.info(f"Enabling {service} service")
self.execute_command(f'/ip service enable {service}')
time.sleep(1)
# Step 10: Verify services status
for service in services:
success, stdout, stderr = self.execute_command(
f'/ip service print where name="{service}"'
)
if success and stdout:
logger.debug(f"{service} status:\n{stdout}")
logger.info(f"✓ MikroTik deployment completed successfully")
return True, True
except Exception as e:
logger.error(f"MikroTik deployment failed: {e}")
import traceback
logger.error(traceback.format_exc())
return False, False
logger.error(f"Failed to configure {service}: {stderr}")
else:
logger.info(f"{service} configured")
# Enable services
for service in services:
logger.info(f"Enabling {service}")
self.execute_command(f'/ip service enable {service}')
time.sleep(1)
logger.info(f"✓ MikroTik deployment completed")
return True, True
except Exception as e:
logger.error(f"MikroTik deployment failed: {e}")
return False, False
class ProxmoxManager(SSHManager):
"""Specialized manager for Proxmox VE servers"""
@@ -492,9 +444,8 @@ class ProxmoxManager(SSHManager):
try:
logger.info("Checking Proxmox certificate")
# Method 1: Check via SSH
success, stdout, stderr = self.execute_command(
'openssl x509 -in /etc/pve/local/pveproxy-ssl.pem -noout -serial -dates 2>/dev/null',
'openssl x509 -in /etc/pve/local/pveproxy-ssl.pem -noout -serial 2>/dev/null',
ignore_error=True
)
@@ -502,44 +453,34 @@ class ProxmoxManager(SSHManager):
serial_match = re.search(r'serial=([A-F0-9]+)', stdout, re.IGNORECASE)
if serial_match:
proxmox_serial_raw = serial_match.group(1).upper()
source_serial_raw = format(source_cert.serial_number, 'X').upper()
proxmox_serial = normalize_serial(proxmox_serial_raw)
source_serial = normalize_serial(source_serial_raw)
logger.info(f"Source serial: {source_serial}")
logger.info(f"Proxmox serial: {proxmox_serial}")
if source_serial == proxmox_serial:
logger.info("✓ Certificates match (SSH check). Skipping upload.")
return False
else:
logger.info("✗ Certificates differ (SSH check). Upload needed.")
return True
# Method 2: Fallback to URL check
if check_url:
logger.info("SSH check failed. Trying URL-based check...")
cert_manager = CertificateManager()
remote_cert = cert_manager.get_cert_from_url(check_url)
if remote_cert:
remote_serial = normalize_serial(format(remote_cert.serial_number, 'X').upper())
proxmox_serial = normalize_serial(serial_match.group(1).upper())
source_serial = normalize_serial(format(source_cert.serial_number, 'X').upper())
if remote_serial == source_serial:
logger.info("✓ Certificates match (URL check). Skipping upload.")
logger.info(f"Source: {source_serial}")
logger.info(f"Proxmox: {proxmox_serial}")
if source_serial == proxmox_serial:
logger.info("✓ Certificates match. Skipping.")
return False
else:
logger.info("✗ Certificates differ (URL check). Upload needed.")
logger.info("✗ Certificates differ. Upload needed.")
return True
logger.warning("Could not verify certificate. Proceeding with upload for safety.")
if check_url:
logger.info("Trying URL check...")
cert_mgr = CertificateManager()
remote_cert = cert_mgr.get_cert_from_url(check_url)
if remote_cert:
if cert_mgr.compare_certificates(source_cert, remote_cert):
logger.info("✓ Certificates match via URL. Skipping.")
return False
logger.warning("Could not verify. Proceeding with upload.")
return True
except Exception as e:
logger.warning(f"Error checking: {e}. Proceeding with upload.")
logger.warning(f"Error checking: {e}")
return True
def upload_certificate(self, cert_path: str, key_path: str, check_first: bool,
@@ -558,13 +499,8 @@ class ProxmoxManager(SSHManager):
if not self.upload_file(key_path, '/etc/pve/local/pveproxy-ssl.key'):
return False, False
commands = [
'chmod 640 /etc/pve/local/pveproxy-ssl.key',
'chown root:www-data /etc/pve/local/pveproxy-ssl.key',
]
for cmd in commands:
self.execute_command(cmd)
self.execute_command('chmod 640 /etc/pve/local/pveproxy-ssl.key')
self.execute_command('chown root:www-data /etc/pve/local/pveproxy-ssl.key')
logger.info("Restarting pveproxy")
self.execute_command('systemctl restart pveproxy', timeout=30)
@@ -584,7 +520,6 @@ class ProxmoxManager(SSHManager):
logger.error(f"Proxmox deployment failed: {e}")
return False, False
class CertPusher:
"""Main application class"""
@@ -640,7 +575,7 @@ class CertPusher:
check_first = self.config.getboolean(section, 'check_before_upload', fallback=True)
# Get services to configure (default: www-ssl)
# Get services to configure
services_str = self.config.get(section, 'mikrotik_services', fallback='www-ssl')
services = [s.strip() for s in services_str.split(',')]
@@ -657,7 +592,7 @@ class CertPusher:
source_key_path,
check_first,
source_cert,
services # Pass services list
services
)
mikrotik.disconnect()
@@ -676,7 +611,6 @@ class CertPusher:
logger.error(f"MikroTik failed: {e}")
self.stats['failed'] += 1
return False
def process_proxmox(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool:
"""Process Proxmox server"""
@@ -745,13 +679,12 @@ class CertPusher:
logger.info(f"Host: {hostname}:{port} ({device_type})")
logger.info(f"User: {username}")
# Route to specialized handlers
if device_type.lower() == 'mikrotik':
return self.process_mikrotik(section, hostname, port, username, ssh_key, source_cert_path)
elif device_type.lower() == 'proxmox':
return self.process_proxmox(section, hostname, port, username, ssh_key, source_cert_path)
# Standard host processing
# Standard host
remote_cert_path = self.config.get(section, 'remote_cert_path')
post_upload_command = self.config.get(section, 'post_upload_command', fallback='')
check_url = self.config.get(section, 'check_url', fallback='')
@@ -768,7 +701,6 @@ class CertPusher:
self.stats['failed'] += 1
return False
# Check if upload needed
upload_needed = True
if check_first:
@@ -786,13 +718,11 @@ class CertPusher:
self.stats['skipped'] += 1
return True
# Upload certificate
if not ssh.upload_file(source_cert_path, remote_cert_path):
ssh.disconnect()
self.stats['failed'] += 1
return False
# Upload key if specified
if self.config.has_option(section, 'remote_key_path'):
remote_key_path = self.config.get(section, 'remote_key_path')
source_key_path = self.get_key_path(section, source_cert_path)
@@ -800,14 +730,12 @@ class CertPusher:
if os.path.exists(source_key_path):
ssh.upload_file(source_key_path, remote_key_path)
# Additional files
if self.config.has_option(section, 'additional_files'):
for file_pair in self.config.get(section, 'additional_files').split(','):
if ':' in file_pair:
local, remote = file_pair.strip().split(':', 1)
ssh.upload_file(local, remote)
# Post-upload command
if post_upload_command:
logger.info("Executing post-upload command")
ssh.execute_command(post_upload_command)
@@ -863,14 +791,14 @@ def main():
parser = argparse.ArgumentParser(description='CertPusher - SSL Certificate Distribution')
parser.add_argument('config', help='Configuration file')
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging')
parser.add_argument('-v', '--version', action='version', version='CertPusher 1.1')
parser.add_argument('-v', '--version', action='version', version='CertPusher 1.2')
args = parser.parse_args()
setup_logging(debug=args.debug)
print("""
╔═══════════════════════════════════════════════════════════╗
║ CertPusher v1.1
║ CertPusher v1.2
║ Automated SSL Certificate Distribution Tool ║
╚═══════════════════════════════════════════════════════════╝
""")