Files
certpusher/certpusher.py
Mateusz Gruszczyński cb9eb16e96 mikrotik check cert
2025-10-27 08:12:12 +01:00

1015 lines
40 KiB
Python

#!/usr/bin/env python3
"""
CertPusher - Automated SSL Certificate Distribution Tool
Version 1.1 - With serial number normalization
"""
import configparser
import logging
import sys
import os
import ssl
import socket
import re
import argparse
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple
import paramiko
from scp import SCPClient
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
# Create logs directory
# The directory sits next to this script and is created at import time if it
# does not exist yet.
LOG_DIR = Path(__file__).parent / 'logs'
LOG_DIR.mkdir(exist_ok=True)
# Record layout passed to logging.basicConfig() in setup_logging().
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# One log file per process start, named after the local start timestamp.
LOG_FILE = LOG_DIR / f'certpusher_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
def setup_logging(debug: bool = False):
    """Configure root logging to write to LOG_FILE and to stdout.

    Args:
        debug: When true the root level is DEBUG, otherwise INFO.

    The paramiko loggers are always capped at WARNING so that SSH transport
    chatter does not drown the application's own messages.
    """
    if debug:
        root_level = logging.DEBUG
    else:
        root_level = logging.INFO

    file_handler = logging.FileHandler(LOG_FILE)
    console_handler = logging.StreamHandler(sys.stdout)
    logging.basicConfig(
        level=root_level,
        format=LOG_FORMAT,
        handlers=[file_handler, console_handler],
    )

    for noisy_logger in ('paramiko', 'paramiko.transport'):
        logging.getLogger(noisy_logger).setLevel(logging.WARNING)
# Named application logger used by the functions and classes in this module.
logger = logging.getLogger('CertPusher')
def normalize_serial(serial: str) -> str:
    """
    Bring a certificate serial number into a canonical comparable form.

    The value is upper-cased, the separators ':', ' ' and '-' are dropped and
    leading zeros are stripped; an all-zero or empty input yields '0'.

    Examples:
        '05BC46A0...' -> '5BC46A0...'
        '00:05:bc'    -> '5BC'
        '00'          -> '0'
    """
    cleaned = serial.upper()
    for separator in (':', ' ', '-'):
        cleaned = cleaned.replace(separator, '')

    # Keep a single digit when the whole serial consisted of zeros.
    significant = cleaned.lstrip('0')
    return significant if significant else '0'
class CertificateManager:
    """Load, fetch, compare and describe X.509 certificates.

    Every method is static and keeps no state. Failures are logged and
    reported through the return value (None / False) instead of being raised.
    """

    @staticmethod
    def get_cert_from_file(cert_path: str) -> Optional[x509.Certificate]:
        """Load a PEM-encoded certificate from ``cert_path``.

        Returns:
            The parsed certificate, or None (after logging an error) when the
            file cannot be read or parsed.
        """
        try:
            with open(cert_path, 'rb') as f:
                cert_data = f.read()
            cert = x509.load_pem_x509_certificate(cert_data, default_backend())
            logger.debug(f"Loaded certificate from {cert_path}")
            return cert
        except Exception as e:
            logger.error(f"Failed to load certificate from {cert_path}: {e}")
            return None

    @staticmethod
    def _split_host_port(url: str, default_port: int = 443) -> Tuple[str, int]:
        """Extract ``(hostname, port)`` from a URL or a bare ``host[:port]``.

        Handles IPv6 literals (``https://[::1]:8443/``) and userinfo
        (``https://user@host/``). The scheme is not used to pick the port:
        the connection is always TLS, so a missing port means
        ``default_port``.

        Raises:
            ValueError: if no hostname can be found or the port is invalid.
        """
        from urllib.parse import urlsplit

        target = url.strip()
        if '://' not in target:
            # urlsplit only recognises a network location after "//".
            target = '//' + target
        parts = urlsplit(target)
        if not parts.hostname:
            raise ValueError(f"No hostname in URL: {url!r}")
        return parts.hostname, parts.port or default_port

    @staticmethod
    def get_cert_from_url(url: str, timeout: int = 10) -> Optional[x509.Certificate]:
        """Fetch the certificate presented by the TLS server behind ``url``.

        Args:
            url: ``https://host[:port]/...`` or a bare ``host[:port]``.
            timeout: Socket connect timeout in seconds.

        Returns:
            The server certificate, or None (after logging a warning) when
            the endpoint cannot be reached or presents no certificate.
        """
        try:
            hostname, port = CertificateManager._split_host_port(url)
            logger.debug(f"Checking certificate at {hostname}:{port}")

            # Verification is switched off on purpose: the certificate is
            # only fetched so its serial can be compared, so an expired or
            # self-signed certificate must still be retrievable.
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE

            with socket.create_connection((hostname, port), timeout=timeout) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    der_cert = ssock.getpeercert(binary_form=True)

            if not der_cert:
                logger.warning(f"Failed to retrieve certificate from {url}: no certificate presented")
                return None
            return x509.load_der_x509_certificate(der_cert, default_backend())
        except Exception as e:
            logger.warning(f"Failed to retrieve certificate from {url}: {e}")
            return None

    @staticmethod
    def compare_certificates(cert1: x509.Certificate, cert2: x509.Certificate) -> bool:
        """Return True when both certificates have the same serial number.

        Only the serial is compared (as normalized upper-case hex); any error
        while reading a serial is logged and treated as "not equal".
        """
        try:
            serial1 = normalize_serial(format(cert1.serial_number, 'X'))
            serial2 = normalize_serial(format(cert2.serial_number, 'X'))
            logger.debug(f"Comparing normalized serials: {serial1} vs {serial2}")
            return serial1 == serial2
        except Exception as e:
            logger.error(f"Failed to compare certificates: {e}")
            return False

    @staticmethod
    def get_certificate_info(cert: x509.Certificate) -> str:
        """Return a four-line summary: subject, serial, expiry, days left.

        On any error a one-line "Unable to extract certificate info" message
        is returned instead of raising.
        """
        try:
            subject = cert.subject.rfc4514_string()
            valid_to = cert.not_valid_after_utc
            now = datetime.now(timezone.utc)
            days_left = (valid_to - now).days
            serial = normalize_serial(format(cert.serial_number, 'X'))
            return (
                f"Certificate: {subject}\n"
                f"Serial: {serial}\n"
                f"Expires: {valid_to}\n"
                f"Days left: {days_left}"
            )
        except Exception as e:
            return f"Unable to extract certificate info: {e}"
class SSHManager:
    """Key-authenticated SSH session with helpers for SCP upload and commands.

    Typical use: ``connect()``, then ``upload_file()`` / ``execute_command()``
    / ``check_remote_certificate()``, then ``disconnect()``. Methods report
    failure through their return value and log the reason; they do not raise.
    """

    def __init__(self, hostname: str, port: int, username: str, key_path: str):
        """Store connection parameters; no network activity happens here."""
        self.hostname = hostname
        self.port = port
        self.username = username
        self.key_path = key_path
        self.ssh_client = None  # paramiko.SSHClient while connected, else None

    def _load_private_key(self):
        """Return the key at ``self.key_path`` parsed as RSA, Ed25519 or ECDSA.

        The key classes are tried in that order; None is returned when none
        of them can parse the file.
        """
        key_types = [
            ('RSA', paramiko.RSAKey),
            ('Ed25519', paramiko.Ed25519Key),
            ('ECDSA', paramiko.ECDSAKey),
        ]
        for key_name, key_class in key_types:
            try:
                private_key = key_class.from_private_key_file(self.key_path)
            except Exception:
                # Wrong key type (or unreadable file): try the next class.
                continue
            logger.debug(f"Loaded {key_name} key")
            return private_key
        return None

    def connect(self) -> bool:
        """Open the SSH connection.

        Returns:
            True on success. On failure the reason is logged, any partially
            opened client is closed, and False is returned.
        """
        try:
            logger.debug(f"Connecting to {self.username}@{self.hostname}:{self.port}")

            private_key = self._load_private_key()
            if not private_key:
                logger.error(f"Could not load SSH key from {self.key_path}")
                return False

            client = paramiko.SSHClient()
            # NOTE(security): no known-hosts file is loaded and AutoAddPolicy
            # accepts any host key, so the server identity is never verified.
            # Kept as-is for compatibility; consider loading known hosts.
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_client = client

            client.connect(
                hostname=self.hostname,
                port=self.port,
                username=self.username,
                pkey=private_key,
                timeout=30,
                banner_timeout=30,
                auth_timeout=30
            )
            logger.info(f"✓ Connected to {self.hostname}:{self.port}")
            return True
        except Exception as e:
            logger.error(f"SSH connection failed: {e}")
            # Do not leave a half-open client (and its socket) behind.
            self.disconnect()
            return False

    def upload_file(self, local_path: str, remote_path: str) -> bool:
        """Copy ``local_path`` to ``remote_path`` via SCP.

        The remote parent directory is created first with ``mkdir -p``
        (best effort). Returns True on success, False after logging the
        error otherwise.
        """
        import shlex

        try:
            logger.info(f"Uploading to {remote_path}")
            remote_dir = os.path.dirname(remote_path)
            if remote_dir:
                # Quote the path so spaces or shell metacharacters in it
                # cannot split or alter the command.
                self.execute_command(f"mkdir -p {shlex.quote(remote_dir)}", ignore_error=True)
            with SCPClient(self.ssh_client.get_transport()) as scp:
                scp.put(local_path, remote_path)
            logger.info("✓ Upload successful")
            return True
        except Exception as e:
            logger.error(f"Upload failed: {e}")
            return False

    def execute_command(self, command: str, timeout: int = 60, ignore_error: bool = False) -> Tuple[bool, str, str]:
        """Run ``command`` on the remote host.

        Args:
            command: Command line passed verbatim to the remote side.
            timeout: Channel timeout in seconds handed to paramiko.
            ignore_error: Suppress the error log for a non-zero exit status.

        Returns:
            ``(exit_status == 0, stdout_text, stderr_text)``. If the command
            cannot be run at all: ``(False, "", <error message>)``.
        """
        try:
            logger.debug(f"Executing: {command}")
            stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout)
            # Drain the output BEFORE asking for the exit status: waiting for
            # the status first can block forever once the remote side has
            # filled the channel window with unread output.
            stdout_text = stdout.read().decode('utf-8', errors='ignore')
            stderr_text = stderr.read().decode('utf-8', errors='ignore')
            exit_status = stdout.channel.recv_exit_status()
            if exit_status != 0 and not ignore_error:
                logger.error(f"Command failed with exit code {exit_status}")
                if stderr_text:
                    logger.debug(f"Error: {stderr_text}")
            return exit_status == 0, stdout_text, stderr_text
        except Exception as e:
            logger.error(f"Command execution failed: {e}")
            return False, "", str(e)

    def check_remote_certificate(self, remote_cert_path: str, source_cert: x509.Certificate) -> bool:
        """
        Compare the serial of the remote certificate file with ``source_cert``.

        Returns True if an upload is needed, False if the serials match.
        True is also returned when the remote file is missing, unreadable or
        cannot be parsed, so callers cannot tell "differs" from "unknown".
        """
        import shlex

        try:
            logger.info("Checking remote certificate via SSH")
            success, stdout, stderr = self.execute_command(
                f'openssl x509 -in {shlex.quote(remote_cert_path)} -noout -serial 2>/dev/null || echo "NOTFOUND"',
                ignore_error=True
            )
            if success and stdout and "NOTFOUND" not in stdout:
                serial_match = re.search(r'serial=([A-F0-9]+)', stdout, re.IGNORECASE)
                if serial_match:
                    # Normalize both sides so leading zeros and case do not
                    # cause a false mismatch.
                    remote_serial = normalize_serial(serial_match.group(1))
                    source_serial = normalize_serial(format(source_cert.serial_number, 'X'))
                    logger.info(f"Source serial: {source_serial}")
                    logger.info(f"Remote serial: {remote_serial}")
                    if source_serial == remote_serial:
                        logger.info("✓ Certificates match. Skipping upload.")
                        return False
                    logger.info("✗ Certificates differ. Upload needed.")
                    return True
            logger.warning("Could not read remote certificate via SSH")
            return True
        except Exception as e:
            logger.warning(f"Error checking remote certificate: {e}")
            return True

    def disconnect(self):
        """Close the SSH connection if one is open; safe to call repeatedly."""
        if self.ssh_client:
            self.ssh_client.close()
            self.ssh_client = None
            logger.debug("Disconnected")
class MikroTikManager(SSHManager):
    """Specialized manager for MikroTik RouterOS devices.

    The certificate and key are uploaded under fixed file names, imported
    into the RouterOS certificate store and assigned to the ``www-ssl``
    service.
    """

    def __init__(self, hostname: str, port: int, username: str, key_path: str):
        super().__init__(hostname, port, username, key_path)
        # Use simple names without special characters
        self.cert_name = "letsencrypt"
        self.key_name = "letsencrypt-key"

    def check_certificate_expiry(self, source_cert: x509.Certificate) -> bool:
        """Decide whether the router needs a new certificate.

        The router is asked for a certificate whose common-name equals the
        source certificate's CN, and the two expiry timestamps are compared
        with a 24 hour tolerance.

        Returns:
            True if an upload is needed (no certificate found, output not
            parseable, expiry differs, or any error), False if the router's
            certificate is considered current.
        """
        try:
            logger.info("Checking MikroTik certificate")

            source_cn = source_cert.subject.rfc4514_string()
            source_expiry = source_cert.not_valid_after_utc

            # Extract the CN value, e.g. "*.example.com" from "CN=*.example.com"
            cn_value = source_cn.split("CN=")[1].split(",")[0] if "CN=" in source_cn else source_cn
            logger.info(f"Looking for cert with CN: {cn_value}")
            logger.info(f"Source expires: {source_expiry}")

            # Search by common-name (escape wildcards for MikroTik)
            cn_search = cn_value.replace("*", "\\*")
            success, stdout, stderr = self.execute_command(
                f'/certificate print detail where common-name="{cn_search}"',
                ignore_error=True
            )
            if not success or not stdout or 'invalid-after' not in stdout.lower():
                logger.info("Certificate not found. Upload needed.")
                return True
            logger.debug(f"Found certificate:\n{stdout[:500]}")

            # Parse expiry (RouterOS 7.x format: 2026-01-22 08:34:12)
            invalid_after_match = re.search(
                r'invalid-after[:\s=]+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})',
                stdout,
                re.IGNORECASE
            )
            if not invalid_after_match:
                logger.warning("Could not parse expiry")
                return True

            mikrotik_expiry_str = invalid_after_match.group(1)
            try:
                mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%Y-%m-%d %H:%M:%S')
                # NOTE(review): the router timestamp is treated as UTC; if it
                # is local time, the 24h tolerance below absorbs the offset.
                mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc)
            except ValueError:
                logger.warning(f"Could not parse date: {mikrotik_expiry_str}")
                return True
            logger.info(f"MikroTik expires: {mikrotik_expiry}")

            time_diff = abs((source_expiry - mikrotik_expiry).total_seconds())
            # Allow 24h tolerance
            if time_diff < 86400:
                logger.info("✓ Certificate is current. Skipping.")
                return False
            logger.info(f"Certificate differs ({time_diff/86400:.1f} days). Upload needed.")
            return True
        except Exception as e:
            logger.warning(f"Error checking: {e}")
            logger.debug("Traceback:", exc_info=True)
            return True

    def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]:
        """
        Upload certificate (and private key) to MikroTik and bind it to www-ssl.

        Args:
            cert_path: Local certificate (full chain) PEM file.
            key_path: Local private key PEM file; falsy to skip the key.
            check_first: Skip deployment when check_certificate_expiry()
                reports the router's certificate as current.
            source_cert: Parsed form of ``cert_path`` used for that check.

        Returns:
            (success, was_uploaded)
        """
        import time

        cert_file = f'{self.cert_name}.pem'
        key_file = f'{self.key_name}.pem'
        try:
            if check_first and source_cert:
                if not self.check_certificate_expiry(source_cert):
                    return True, False

            logger.info("Deploying MikroTik certificate")

            # Step 1: Disable www-ssl while its certificate is being replaced
            logger.debug("Disabling www-ssl service")
            self.execute_command('/ip service disable www-ssl', ignore_error=True)
            time.sleep(1)

            # Step 2: Remove old certificates and files (best effort).
            # "ssl-key.pem" is the key file name used by an earlier revision
            # of this method; it is removed so no stale key stays behind.
            logger.debug("Cleaning up old certificates")
            cleanup_commands = [
                f'/certificate remove [find name~"{self.cert_name}"]',
                f'/file remove "{cert_file}"',
                f'/file remove "{key_file}"',
                '/file remove "ssl-key.pem"',
            ]
            for cmd in cleanup_commands:
                self.execute_command(cmd, ignore_error=True)
            time.sleep(1)

            # Step 3: Upload certificate
            logger.info(f"Uploading certificate as: {cert_file}")
            with SCPClient(self.ssh_client.get_transport()) as scp:
                scp.put(cert_path, cert_file)

            # Step 4: Upload private key
            if key_path:
                logger.info(f"Uploading key as: {key_file}")
                with SCPClient(self.ssh_client.get_transport()) as scp:
                    scp.put(key_path, key_file)
            time.sleep(1)

            # Step 5: Import certificate (fullchain includes both cert and chain)
            logger.info("Importing certificate")
            success, stdout, stderr = self.execute_command(
                f'/certificate import file-name={cert_file} passphrase=""', timeout=30
            )
            if not success:
                logger.error(f"Certificate import failed: {stderr}")
                # Show available files for debugging
                self.execute_command('/file print where name~".pem"')
                return False, False

            # Step 5b: Import the private key. It lives in its own file and
            # has to be imported separately, otherwise the certificate has
            # no key attached and cannot serve TLS.
            if key_path:
                logger.info("Importing private key")
                success, stdout, stderr = self.execute_command(
                    f'/certificate import file-name={key_file} passphrase=""', timeout=30
                )
                if not success:
                    logger.error(f"Private key import failed: {stderr}")
                    return False, False
            time.sleep(2)

            # The PEM files are no longer needed once imported; do not leave
            # the private key lying in the router's file storage.
            for leftover in (cert_file, key_file):
                self.execute_command(f'/file remove "{leftover}"', ignore_error=True)

            # Step 6: Find imported certificate name
            success, stdout, stderr = self.execute_command(
                f'/certificate print terse where name~"{self.cert_name}"'
            )
            if not success or not stdout:
                logger.error("Could not find imported certificate")
                return False, False
            logger.debug(f"Imported certificates:\n{stdout}")

            # MikroTik adds _0, _1 suffixes to imported names. Accept both
            # name="x" and name=x, and skip keys that merely end in "name"
            # such as common-name.
            cert_names = re.findall(r'(?<![\w-])name="?([^"\s]+)"?', stdout)
            if not cert_names:
                logger.error("Could not parse certificate names")
                return False, False

            # Use first certificate (usually the actual cert, not CA)
            imported_cert_name = cert_names[0]
            logger.info(f"Using certificate: {imported_cert_name}")

            # Step 7: Configure www-ssl service
            logger.info("Configuring www-ssl service")
            success, stdout, stderr = self.execute_command(
                f'/ip service set www-ssl certificate={imported_cert_name}'
            )
            if not success:
                logger.error(f"Failed to set certificate: {stderr}")
                # Show available certificates
                self.execute_command('/certificate print')
                return False, False

            # Step 8: Enable www-ssl
            self.execute_command('/ip service enable www-ssl', ignore_error=True)
            time.sleep(1)

            # Step 9: Verify (informational only; result goes to the debug log)
            success, stdout, stderr = self.execute_command(
                '/ip service print where name="www-ssl"'
            )
            if success:
                logger.debug(f"Service status:\n{stdout}")

            logger.info("✓ MikroTik deployment successful")
            return True, True
        except Exception as e:
            logger.error(f"MikroTik deployment failed: {e}")
            logger.debug("Traceback:", exc_info=True)
            return False, False
class ProxmoxManager(SSHManager):
    """SSH manager variant that deploys certificates to Proxmox VE servers."""

    def check_certificate(self, source_cert: x509.Certificate, check_url: str) -> bool:
        """
        Decide whether the Proxmox host needs a new certificate.

        Returns True if an upload is needed, False if the serials match.

        Two checks are tried in order:
        1. SSH: read the serial of /etc/pve/local/pveproxy-ssl.pem directly.
        2. URL: fetch the certificate served at ``check_url`` (only when the
           SSH check produced no serial and ``check_url`` is set).

        If neither check yields a verdict, or an error occurs, True is
        returned so that the upload proceeds.
        """
        try:
            logger.info("Checking Proxmox certificate")

            # Check 1: serial of the installed file, read over SSH.
            ok, output, _errors = self.execute_command(
                'openssl x509 -in /etc/pve/local/pveproxy-ssl.pem -noout -serial -dates 2>/dev/null',
                ignore_error=True
            )
            found = None
            if ok and output:
                found = re.search(r'serial=([A-F0-9]+)', output, re.IGNORECASE)
            if found:
                # Normalized form ignores leading zeros and letter case.
                installed = normalize_serial(found.group(1).upper())
                wanted = normalize_serial(format(source_cert.serial_number, 'X').upper())
                logger.info(f"Source serial: {wanted}")
                logger.info(f"Proxmox serial: {installed}")
                if wanted != installed:
                    logger.info("✗ Certificates differ (SSH check). Upload needed.")
                    return True
                logger.info("✓ Certificates match (SSH check). Skipping upload.")
                return False

            # Check 2: certificate actually served over HTTPS.
            if check_url:
                logger.info("SSH check failed. Trying URL-based check...")
                served = CertificateManager().get_cert_from_url(check_url)
                if served:
                    served_serial = normalize_serial(format(served.serial_number, 'X').upper())
                    wanted = normalize_serial(format(source_cert.serial_number, 'X').upper())
                    logger.info(f"Source serial (URL): {wanted}")
                    logger.info(f"Remote serial (URL): {served_serial}")
                    if served_serial != wanted:
                        logger.info("✗ Certificates differ (URL check). Upload needed.")
                        return True
                    logger.info("✓ Certificates match (URL check). Skipping upload.")
                    return False

            logger.warning("Could not verify certificate. Proceeding with upload for safety.")
            return True
        except Exception as exc:
            logger.warning(f"Error checking: {exc}. Proceeding with upload.")
            return True

    def upload_certificate(self, cert_path: str, key_path: str, check_first: bool,
                           source_cert: x509.Certificate, check_url: str) -> Tuple[bool, bool]:
        """
        Install certificate and key for pveproxy and restart the service.

        Returns (success, was_uploaded). (True, False) means the existing
        certificate already matched and nothing was changed.
        """
        try:
            if check_first and source_cert and not self.check_certificate(source_cert, check_url):
                return True, False

            logger.info("Deploying Proxmox certificate")

            # Certificate first, then key; stop at the first failed upload.
            transfers = (
                (cert_path, '/etc/pve/local/pveproxy-ssl.pem'),
                (key_path, '/etc/pve/local/pveproxy-ssl.key'),
            )
            for local_file, remote_file in transfers:
                if not self.upload_file(local_file, remote_file):
                    return False, False

            # Ownership/permission fixes; their outcome is not checked here.
            self.execute_command('chmod 640 /etc/pve/local/pveproxy-ssl.key')
            self.execute_command('chown root:www-data /etc/pve/local/pveproxy-ssl.key')

            logger.info("Restarting pveproxy")
            self.execute_command('systemctl restart pveproxy', timeout=30)

            # Give the service a moment to come up before probing it.
            import time
            time.sleep(3)

            running, state, _ = self.execute_command('systemctl is-active pveproxy')
            if not (running and 'active' in state):
                logger.error("pveproxy not active")
                return False, False

            logger.info(f"✓ Proxmox deployment successful")
            return True, True
        except Exception as exc:
            logger.error(f"Proxmox deployment failed: {exc}")
            return False, False
class CertPusher:
    """Main application class.

    Reads an INI configuration with a ``[global]`` section plus one section
    per target host, pushes the certificate to every host and keeps counters
    in ``self.stats`` (total / uploaded / skipped / failed).
    """

    def __init__(self, config_file: str):
        self.config_file = config_file
        self.config = configparser.ConfigParser()
        self.cert_manager = CertificateManager()
        self.stats = {'total': 0, 'uploaded': 0, 'skipped': 0, 'failed': 0}

    def load_config(self) -> bool:
        """Load and validate the configuration file.

        Returns:
            True when the file was read and ``[global]`` contains
            ``source_cert_path`` and ``default_ssh_key``; otherwise False
            after logging what is missing.
        """
        try:
            logger.info(f"Loading config: {self.config_file}")
            # ConfigParser.read() silently skips unreadable files and returns
            # the list of files it actually parsed.
            if not self.config.read(self.config_file):
                logger.error(f"Could not read config file: {self.config_file}")
                return False
            if 'global' not in self.config:
                logger.error("Missing [global] section")
                return False
            required = ['source_cert_path', 'default_ssh_key']
            for key in required:
                if not self.config.has_option('global', key):
                    logger.error(f"Missing: {key}")
                    return False
            logger.info(f"✓ Config loaded ({len(self.config.sections()) - 1} hosts)")
            return True
        except Exception as e:
            logger.error(f"Config load failed: {e}")
            return False

    def get_key_path(self, section: str, cert_path: str) -> str:
        """Return the private key path for ``section``.

        Lookup order: the section's ``source_key_path``, then the global
        ``source_key_path``, then ``cert_path`` with ``fullchain.pem`` /
        ``cert.pem`` replaced by ``privkey.pem``.
        """
        if self.config.has_option(section, 'source_key_path'):
            return self.config.get(section, 'source_key_path')
        if self.config.has_option('global', 'source_key_path'):
            return self.config.get('global', 'source_key_path')
        return cert_path.replace('fullchain.pem', 'privkey.pem').replace('cert.pem', 'privkey.pem')

    def _load_device_material(self, section: str, source_cert_path: str):
        """Resolve the key path and load the certificate for a device handler.

        Returns ``(source_key_path, source_cert)``, or None after logging the
        problem and counting the host as failed.
        """
        source_key_path = self.get_key_path(section, source_cert_path)
        if not os.path.exists(source_key_path):
            logger.error(f"Key not found: {source_key_path}")
            self.stats['failed'] += 1
            return None
        source_cert = self.cert_manager.get_cert_from_file(source_cert_path)
        if not source_cert:
            self.stats['failed'] += 1
            return None
        return source_key_path, source_cert

    def _record_result(self, success: bool, was_uploaded: bool, done_message: str) -> bool:
        """Update the counters from a (success, was_uploaded) pair; return success."""
        if not success:
            self.stats['failed'] += 1
            return False
        if was_uploaded:
            self.stats['uploaded'] += 1
        else:
            self.stats['skipped'] += 1
        logger.info(done_message)
        return True

    def process_mikrotik(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool:
        """Deploy to a MikroTik device; every failure path is counted in stats."""
        try:
            material = self._load_device_material(section, source_cert_path)
            if material is None:
                return False
            source_key_path, source_cert = material

            check_first = self.config.getboolean(section, 'check_before_upload', fallback=True)
            mikrotik = MikroTikManager(hostname, port, username, ssh_key)
            if not mikrotik.connect():
                self.stats['failed'] += 1
                return False
            try:
                success, was_uploaded = mikrotik.upload_certificate(
                    source_cert_path, source_key_path, check_first, source_cert
                )
            finally:
                mikrotik.disconnect()
            return self._record_result(success, was_uploaded, "✓ MikroTik processed")
        except Exception as e:
            logger.error(f"MikroTik failed: {e}")
            self.stats['failed'] += 1
            return False

    def process_proxmox(self, section: str, hostname: str, port: int, username: str, ssh_key: str, source_cert_path: str) -> bool:
        """Deploy to a Proxmox server; every failure path is counted in stats."""
        try:
            material = self._load_device_material(section, source_cert_path)
            if material is None:
                return False
            source_key_path, source_cert = material

            logger.info(f"Using cert: {source_cert_path}")
            logger.info(self.cert_manager.get_certificate_info(source_cert))
            check_url = self.config.get(section, 'check_url', fallback=None)
            check_first = self.config.getboolean(section, 'check_before_upload', fallback=True)

            proxmox = ProxmoxManager(hostname, port, username, ssh_key)
            if not proxmox.connect():
                self.stats['failed'] += 1
                return False
            try:
                success, was_uploaded = proxmox.upload_certificate(
                    source_cert_path, source_key_path, check_first, source_cert, check_url
                )
            finally:
                proxmox.disconnect()
            return self._record_result(success, was_uploaded, "✓ Proxmox processed")
        except Exception as e:
            logger.error(f"Proxmox failed: {e}")
            self.stats['failed'] += 1
            return False

    def process_host(self, section: str) -> bool:
        """Process one configured host section.

        Sections with ``type = mikrotik`` or ``type = proxmox`` are handed to
        the specialized handlers. Any other type is a standard host: the
        certificate (and optionally key and additional files) is copied via
        SCP and ``post_upload_command`` is run afterwards.

        Returns:
            True when the host is up to date or was updated successfully.
        """
        ssh = None
        try:
            logger.info(f"\n{'='*60}\nProcessing: {section}\n{'='*60}")
            self.stats['total'] += 1

            hostname = self.config.get(section, 'hostname')
            port = self.config.getint(section, 'port', fallback=22)
            username = self.config.get(section, 'username', fallback='root')
            device_type = self.config.get(section, 'type', fallback='standard')
            ssh_key = self.config.get(section, 'ssh_key_path', fallback=None) or self.config.get('global', 'default_ssh_key')
            source_cert_path = self.config.get(section, 'source_cert_path', fallback=None) or self.config.get('global', 'source_cert_path')

            if not os.path.exists(source_cert_path):
                logger.error(f"Certificate not found: {source_cert_path}")
                self.stats['failed'] += 1
                return False

            logger.info(f"Host: {hostname}:{port} ({device_type})")
            logger.info(f"User: {username}")

            # Route to specialized handlers
            if device_type.lower() == 'mikrotik':
                return self.process_mikrotik(section, hostname, port, username, ssh_key, source_cert_path)
            elif device_type.lower() == 'proxmox':
                return self.process_proxmox(section, hostname, port, username, ssh_key, source_cert_path)

            # Standard host processing
            remote_cert_path = self.config.get(section, 'remote_cert_path')
            post_upload_command = self.config.get(section, 'post_upload_command', fallback='')
            check_url = self.config.get(section, 'check_url', fallback='')
            check_first = self.config.getboolean(section, 'check_before_upload', fallback=True)

            source_cert = self.cert_manager.get_cert_from_file(source_cert_path)
            if not source_cert:
                self.stats['failed'] += 1
                return False

            ssh = SSHManager(hostname, port, username, ssh_key)
            if not ssh.connect():
                self.stats['failed'] += 1
                return False

            # Check if upload needed
            upload_needed = True
            if check_first:
                if not ssh.check_remote_certificate(remote_cert_path, source_cert):
                    upload_needed = False
                # check_remote_certificate() returns True both for "differs"
                # and for "could not check", so the URL check is consulted in
                # either case when a check_url is configured.
                elif check_url:
                    logger.info(f"SSH check failed. Trying URL: {check_url}")
                    remote_cert = self.cert_manager.get_cert_from_url(check_url)
                    if remote_cert and self.cert_manager.compare_certificates(source_cert, remote_cert):
                        logger.info("✓ Certificate up to date via URL. Skipping.")
                        upload_needed = False

            if not upload_needed:
                self.stats['skipped'] += 1
                return True

            # Upload certificate
            if not ssh.upload_file(source_cert_path, remote_cert_path):
                self.stats['failed'] += 1
                return False

            deployed_ok = True

            # Upload key if specified
            if self.config.has_option(section, 'remote_key_path'):
                remote_key_path = self.config.get(section, 'remote_key_path')
                source_key_path = self.get_key_path(section, source_cert_path)
                if os.path.exists(source_key_path):
                    if not ssh.upload_file(source_key_path, remote_key_path):
                        logger.error(f"Key upload failed: {remote_key_path}")
                        deployed_ok = False
                else:
                    logger.warning(f"Key not found, skipping key upload: {source_key_path}")

            # Additional files ("local:remote" pairs, comma separated)
            if self.config.has_option(section, 'additional_files'):
                for file_pair in self.config.get(section, 'additional_files').split(','):
                    if ':' in file_pair:
                        local, remote = file_pair.strip().split(':', 1)
                        if not ssh.upload_file(local, remote):
                            logger.error(f"Additional file upload failed: {local}")
                            deployed_ok = False

            # Post-upload command. Skipped after a failed upload so a service
            # is not reloaded with a mismatched certificate and key.
            if post_upload_command and deployed_ok:
                logger.info("Executing post-upload command")
                command_ok, _, _ = ssh.execute_command(post_upload_command)
                if not command_ok:
                    logger.error("Post-upload command failed")
                    deployed_ok = False

            if not deployed_ok:
                self.stats['failed'] += 1
                return False

            self.stats['uploaded'] += 1
            logger.info("✓ Host processed")
            return True
        except Exception as e:
            logger.error(f"Failed: {e}")
            self.stats['failed'] += 1
            return False
        finally:
            # Always release the SSH session, including on exceptions.
            if ssh is not None:
                ssh.disconnect()

    def run(self):
        """Load the config, process every host section and log a summary.

        Exits the process with status 1 when the configuration or the global
        source certificate is missing, or when at least one host failed.
        """
        logger.info("="*60)
        logger.info(" CertPusher - SSL Certificate Distribution")
        logger.info("="*60)
        logger.info(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info(f"Log: {LOG_FILE}\n")

        if not self.load_config():
            sys.exit(1)

        source_cert = self.config.get('global', 'source_cert_path')
        if not os.path.exists(source_cert):
            logger.error(f"Source certificate not found: {source_cert}")
            sys.exit(1)
        logger.info(f"Global certificate: {source_cert}\n")

        for section in self.config.sections():
            if section == 'global':
                continue
            self.process_host(section)

        logger.info("\n" + "="*60)
        logger.info(" SUMMARY")
        logger.info("="*60)
        logger.info(f"Total: {self.stats['total']}")
        logger.info(f"✓ Uploaded: {self.stats['uploaded']}")
        logger.info(f"○ Skipped: {self.stats['skipped']}")
        logger.info(f"✗ Failed: {self.stats['failed']}")
        logger.info("="*60)

        if self.stats['failed'] > 0:
            sys.exit(1)
def main():
    """Command-line entry point.

    Parses the arguments, configures logging, prints the banner and runs
    CertPusher on the given configuration file. Exit status: 1 when the
    config file is missing or an unexpected error occurs, 130 on Ctrl-C.
    """
    cli = argparse.ArgumentParser(description='CertPusher - SSL Certificate Distribution')
    cli.add_argument('config', help='Configuration file')
    cli.add_argument('-d', '--debug', action='store_true', help='Enable debug logging')
    cli.add_argument('-v', '--version', action='version', version='CertPusher 1.1')
    options = cli.parse_args()

    setup_logging(debug=options.debug)

    print("""
╔═══════════════════════════════════════════════════════════╗
║ CertPusher v1.1 ║
║ Automated SSL Certificate Distribution Tool ║
╚═══════════════════════════════════════════════════════════╝
""")

    config_path = options.config
    if not os.path.exists(config_path):
        print(f"Error: Config file '{config_path}' not found")
        sys.exit(1)

    try:
        CertPusher(config_path).run()
    except KeyboardInterrupt:
        print("\n\nInterrupted. Exiting...")
        sys.exit(130)
    except Exception as exc:
        logger.error(f"Fatal error: {exc}", exc_info=True)
        sys.exit(1)


if __name__ == '__main__':
    main()