Files
certpusher/certpusher.py
Mateusz Gruszczyński 7fc8b3894c proxmox class
2025-10-26 23:41:57 +01:00

755 lines
30 KiB
Python

#!/usr/bin/env python3
"""
CertPusher - Automated SSL Certificate Distribution Tool
Distributes SSL certificates to remote servers via SSH/SCP
Supports standard Linux servers, MikroTik RouterOS, and Proxmox VE
"""
import configparser
import logging
import sys
import os
import ssl
import socket
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple
import paramiko
from scp import SCPClient
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
# Logging configuration: every run logs at DEBUG level to stdout and to a
# fresh, timestamped file created in the current working directory.
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(
    format=LOG_FORMAT,
    level=logging.DEBUG,
    handlers=[
        # The format spec below is applied through datetime.strftime().
        logging.FileHandler(f'certpusher_{datetime.now():%Y%m%d_%H%M%S}.log'),
        logging.StreamHandler(sys.stdout),
    ],
)
# Shared application logger used by every class in this module.
logger = logging.getLogger('CertPusher')
class CertificateManager:
    """Load, fetch, compare and describe X.509 certificates.

    Every helper is a static method, so the class acts purely as a namespace.
    Failures are logged and signalled through the return value (``None`` or
    ``False``) instead of being raised to the caller.
    """

    @staticmethod
    def get_cert_from_file(cert_path: str) -> Optional[x509.Certificate]:
        """Load a PEM-encoded certificate from disk.

        Args:
            cert_path: Path of the PEM file to read.

        Returns:
            The parsed certificate, or ``None`` if the file could not be read
            or parsed (the error is logged).
        """
        try:
            with open(cert_path, 'rb') as f:
                cert_data = f.read()
            cert = x509.load_pem_x509_certificate(cert_data, default_backend())
            logger.debug(f"Loaded certificate from {cert_path}")
            logger.debug(f"Certificate subject: {cert.subject}")
            logger.debug(f"Certificate expires: {cert.not_valid_after_utc}")
            return cert
        except Exception as e:
            logger.error(f"Failed to load certificate from {cert_path}: {e}")
            return None

    @staticmethod
    def get_cert_from_url(url: str, timeout: int = 10) -> Optional[x509.Certificate]:
        """Fetch the certificate presented by the TLS server behind ``url``.

        Args:
            url: ``http(s)://host[:port]/...`` or a bare ``host[:port]``.
                The port defaults to 443 whatever the scheme.
            timeout: Socket connect timeout in seconds.

        Returns:
            The server's certificate, or ``None`` if it could not be
            retrieved (a warning is logged).
        """
        try:
            # Imported here, like the other rarely used helpers in this file.
            from urllib.parse import urlsplit

            # urlsplit() only recognises a network location after "//", so a
            # bare "host[:port]" gets one prepended. Unlike manual splitting,
            # this also copes with user-info and bracketed IPv6 literals.
            parsed = urlsplit(url if '://' in url else f'//{url}')
            hostname = parsed.hostname
            if not hostname:
                raise ValueError(f"no hostname found in {url!r}")
            port = parsed.port or 443
            logger.debug(f"Connecting to {hostname}:{port} to retrieve certificate")
            # Verification is disabled on purpose: the goal is to read
            # whatever certificate is currently served (possibly expired or
            # self-signed), not to establish a trusted connection.
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            with socket.create_connection((hostname, port), timeout=timeout) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    der_cert = ssock.getpeercert(binary_form=True)
            cert = x509.load_der_x509_certificate(der_cert, default_backend())
            logger.debug(f"Retrieved certificate from {url}")
            logger.debug(f"Certificate expires: {cert.not_valid_after_utc}")
            return cert
        except Exception as e:
            logger.warning(f"Failed to retrieve certificate from {url}: {e}")
            return None

    @staticmethod
    def compare_certificates(cert1: x509.Certificate, cert2: x509.Certificate) -> bool:
        """Return True when both certificates share serial number and SHA-256 fingerprint.

        Any error during the comparison is logged and reported as ``False``.
        """
        try:
            from cryptography.hazmat.primitives import hashes

            same_serial = cert1.serial_number == cert2.serial_number
            same_fingerprint = (
                cert1.fingerprint(hashes.SHA256()) == cert2.fingerprint(hashes.SHA256())
            )
            logger.debug(f"Certificate comparison - Serial match: {same_serial}, Fingerprint match: {same_fingerprint}")
            return same_serial and same_fingerprint
        except Exception as e:
            logger.error(f"Failed to compare certificates: {e}")
            return False

    @staticmethod
    def get_certificate_info(cert: x509.Certificate) -> str:
        """Return a multi-line, human-readable summary of ``cert``.

        On failure the returned string describes the error instead.
        """
        try:
            subject = cert.subject.rfc4514_string()
            issuer = cert.issuer.rfc4514_string()
            valid_from = cert.not_valid_before_utc
            valid_to = cert.not_valid_after_utc
            # Both operands are timezone-aware (UTC), so they subtract safely.
            now = datetime.now(timezone.utc)
            days_left = (valid_to - now).days
            return (
                "\n"
                "Certificate Info:\n"
                f"Subject: {subject}\n"
                f"Issuer: {issuer}\n"
                f"Valid From: {valid_from}\n"
                f"Valid To: {valid_to}\n"
                f"Days Until Expiry: {days_left}\n"
            )
        except Exception as e:
            return f"Unable to extract certificate info: {e}"

    @staticmethod
    def create_combined_cert(cert_path: str, key_path: str, output_path: str) -> bool:
        """Write certificate followed by private key into a single PEM file.

        Args:
            cert_path: Path of the certificate (PEM text).
            key_path: Path of the private key (PEM text).
            output_path: Destination file; overwritten if it exists.

        Returns:
            True on success, False if any file operation failed (logged).
        """
        try:
            logger.debug(f"Creating combined certificate: {cert_path} + {key_path} -> {output_path}")
            with open(cert_path, 'r') as cert_file:
                cert_content = cert_file.read()
            with open(key_path, 'r') as key_file:
                key_content = key_file.read()
            # Combined format: certificate + private key
            combined_content = cert_content.strip() + "\n" + key_content.strip() + "\n"
            # The result contains the private key, so create it owner-only
            # instead of inheriting whatever the process umask allows.
            fd = os.open(output_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w') as combined_file:
                try:
                    # The mode given to os.open() is ignored when the file
                    # already existed, so tighten it explicitly before the
                    # key material is written.
                    os.chmod(output_path, 0o600)
                except OSError as e:
                    logger.warning(f"Could not restrict permissions on {output_path}: {e}")
                combined_file.write(combined_content)
            logger.info(f"✓ Combined certificate created at {output_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to create combined certificate: {e}")
            return False
class SSHManager:
    """Manages one key-authenticated SSH session: connect, SCP upload, remote commands.

    All public methods report failure through their return value and log the
    cause; none of them raise.
    """

    def __init__(self, hostname: str, port: int, username: str, key_path: str):
        """Store connection parameters; no network activity happens here.

        Args:
            hostname: Remote host name or address.
            port: SSH port.
            username: Remote login name.
            key_path: Path of the private key file used for authentication.
        """
        self.hostname = hostname
        self.port = port
        self.username = username
        self.key_path = key_path
        # Set by connect(); None whenever no session is open.
        self.ssh_client = None

    def _load_private_key(self):
        """Return the key at ``self.key_path`` as a paramiko key object, or None.

        Each supported key class is tried in turn because the file itself
        does not tell us up front which type it holds.
        """
        # DSS removed in paramiko 3.0+
        key_types = [
            ('RSA', paramiko.RSAKey),
            ('Ed25519', paramiko.Ed25519Key),
            ('ECDSA', paramiko.ECDSAKey),
        ]
        for key_name, key_class in key_types:
            try:
                private_key = key_class.from_private_key_file(self.key_path)
            except Exception as e:
                logger.debug(f"Not a {key_name} key: {e}")
                continue
            logger.debug(f"Successfully loaded {key_name} key")
            return private_key
        return None

    def _discard_client(self) -> None:
        """Close and forget a client whose connection attempt failed."""
        client, self.ssh_client = self.ssh_client, None
        if client is None:
            return
        try:
            client.close()
        except Exception as e:
            # Cleanup only: the failure that brought us here is already logged.
            logger.debug(f"Error while closing SSH client for {self.hostname}: {e}")

    def connect(self) -> bool:
        """Establish the SSH connection.

        Returns:
            True when connected. False if the key could not be loaded or the
            connection/authentication failed; in that case the half-open
            client is closed and ``self.ssh_client`` is reset to None.
        """
        try:
            self.ssh_client = paramiko.SSHClient()
            # NOTE(security): AutoAddPolicy accepts host keys that have never
            # been seen before, so the first connection to a host is not
            # protected against man-in-the-middle attacks. Kept for backward
            # compatibility with existing deployments.
            self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            logger.debug(f"Connecting to {self.username}@{self.hostname}:{self.port}")
            logger.debug(f"Using SSH key: {self.key_path}")
            private_key = self._load_private_key()
            if private_key is None:
                logger.error(f"Could not load SSH key from {self.key_path}")
                self._discard_client()
                return False
            self.ssh_client.connect(
                hostname=self.hostname,
                port=self.port,
                username=self.username,
                pkey=private_key,
                timeout=30,
                banner_timeout=30,
                auth_timeout=30,
            )
            logger.info(f"✓ Successfully connected to {self.hostname}:{self.port}")
            return True
        except Exception as e:
            logger.error(f"SSH connection failed to {self.hostname}:{self.port}: {e}")
            # Do not leave a partially set-up client/transport behind.
            self._discard_client()
            return False

    def upload_file(self, local_path: str, remote_path: str) -> bool:
        """Upload ``local_path`` to ``remote_path`` via SCP.

        The remote parent directory is created first (best effort).

        Returns:
            True on success, False if the transfer failed (logged).
        """
        try:
            logger.debug(f"Uploading {local_path} to {self.hostname}:{remote_path}")
            # Ensure remote directory exists
            remote_dir = os.path.dirname(remote_path)
            if remote_dir:
                # Imported here, like the other rarely used helpers in this file.
                import shlex

                # Quote the path so spaces or shell metacharacters in it
                # cannot split or alter the remote command.
                self.execute_command(f"mkdir -p {shlex.quote(remote_dir)}", ignore_error=True)
            with SCPClient(self.ssh_client.get_transport()) as scp:
                scp.put(local_path, remote_path)
            logger.info(f"✓ Successfully uploaded {local_path} to {self.hostname}:{remote_path}")
            return True
        except Exception as e:
            logger.error(f"File upload failed: {e}")
            return False

    def execute_command(self, command: str, timeout: int = 60, ignore_error: bool = False) -> Tuple[bool, str, str]:
        """Run ``command`` on the remote host.

        Args:
            command: Command line to execute.
            timeout: Channel timeout in seconds passed to paramiko.
            ignore_error: When True, a non-zero exit status is logged at
                DEBUG level instead of ERROR.

        Returns:
            ``(success, stdout, stderr)`` where ``success`` is True only for
            exit status 0. If the command could not be run at all, the result
            is ``(False, "", <error text>)``.
        """
        try:
            logger.debug(f"Executing command on {self.hostname}: {command}")
            _stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout)
            # Drain the output streams *before* asking for the exit status:
            # paramiko documents that recv_exit_status() can block forever
            # when the command produced more output than the channel window
            # holds and nothing has read it yet.
            stdout_text = stdout.read().decode('utf-8', errors='ignore')
            stderr_text = stderr.read().decode('utf-8', errors='ignore')
            exit_status = stdout.channel.recv_exit_status()
            if exit_status == 0:
                logger.info(f"✓ Command executed successfully on {self.hostname}")
                if stdout_text:
                    logger.debug(f"STDOUT: {stdout_text}")
            elif not ignore_error:
                logger.error(f"Command failed with exit code {exit_status}")
                logger.error(f"STDERR: {stderr_text}")
            else:
                logger.debug(f"Command failed (ignored): {stderr_text}")
            return exit_status == 0, stdout_text, stderr_text
        except Exception as e:
            logger.error(f"Command execution failed: {e}")
            return False, "", str(e)

    def disconnect(self):
        """Close the SSH connection if one is open; safe to call repeatedly."""
        if self.ssh_client:
            self.ssh_client.close()
            self.ssh_client = None
            logger.debug(f"Disconnected from {self.hostname}")
class MikroTikManager(SSHManager):
    """Specialized manager for MikroTik RouterOS devices.

    Files are uploaded to the router under fixed names derived from
    ``cert_name``/``key_name`` and then imported through ``/certificate``.
    """

    def __init__(self, hostname: str, port: int, username: str, key_path: str):
        super().__init__(hostname, port, username, key_path)
        # Base names (without ".pem") of the files placed on the router.
        self.cert_name = "ssl-cert"
        self.key_name = "ssl-key"

    def _import_file(self, file_name: str) -> bool:
        """Run ``/certificate import`` for ``file_name``; False on reported failure."""
        import_cmd = f'/certificate import file-name={file_name} passphrase=""'
        success, _stdout, stderr = self.execute_command(import_cmd, timeout=30)
        # An import counts as failed only when the command exits non-zero AND
        # the router reports "failure" on stderr.
        # NOTE(review): if RouterOS reports errors on stdout with exit status
        # 0, this check will not see them - confirm on a real device.
        if not success and "failure" in stderr.lower():
            logger.error(f"Certificate import failed: {stderr}")
            return False
        return True

    def upload_certificate(self, cert_path: str, key_path: Optional[str] = None) -> bool:
        """
        Upload and import certificate to MikroTik RouterOS

        The www-ssl service is disabled for the duration of the update and
        re-enabled at the end. If an import fails the method returns early
        and the service is left disabled.

        Args:
            cert_path: Path to certificate file (PEM format, can be fullchain)
            key_path: Optional path to private key file

        Returns:
            True if the files were uploaded and imported, False otherwise.
        """
        try:
            logger.info(f"Starting MikroTik certificate deployment to {self.hostname}")
            cert_file = f'{self.cert_name}.pem'
            key_file = f'{self.key_name}.pem'

            # Step 1: Disable www-ssl service
            logger.debug("Disabling www-ssl service")
            self.execute_command('/ip service disable www-ssl', ignore_error=True)

            # Step 2: Remove old certificates (best effort; they may not exist)
            logger.debug("Removing old certificates")
            cleanup_commands = [
                f'/certificate remove [find name~"{self.cert_name}"]',
                f'/file remove "{cert_file}"',
            ]
            if key_path:
                cleanup_commands.append(f'/file remove "{key_file}"')
            for cmd in cleanup_commands:
                self.execute_command(cmd, ignore_error=True)

            # Steps 3-4: Upload certificate and, if provided, private key
            with SCPClient(self.ssh_client.get_transport()) as scp:
                logger.debug("Uploading certificate to MikroTik")
                scp.put(cert_path, cert_file)
                logger.info("✓ Certificate file uploaded")
                if key_path:
                    logger.debug("Uploading private key to MikroTik")
                    scp.put(key_path, key_file)
                    logger.info("✓ Private key file uploaded")

            # Step 5: Import certificate, then the private key. The key file
            # was previously uploaded but never imported, leaving the
            # certificate without a key for www-ssl to use.
            logger.debug("Importing certificate into RouterOS")
            if not self._import_file(cert_file):
                return False
            if key_path:
                logger.debug("Importing private key into RouterOS")
                key_imported = self._import_file(key_file)
                # Do not leave the private key lying in router file storage,
                # whether or not the import worked.
                self.execute_command(f'/file remove "{key_file}"', ignore_error=True)
                if not key_imported:
                    return False

            # Give RouterOS time to process
            import time
            time.sleep(2)

            # Step 6: Show what the router now holds (diagnostic only)
            success, stdout, stderr = self.execute_command(
                f'/certificate print where name~"{self.cert_name}"'
            )
            if success and stdout:
                logger.debug(f"Certificates after import:\n{stdout}")

            # Step 7: Configure services to use new certificate
            # NOTE(review): assumes RouterOS names the imported certificate
            # "<cert_name>_0"; some versions derive the name from the full
            # file name (including ".pem") - confirm on the target device.
            logger.info("Configuring www-ssl service to use new certificate")
            config_commands = [
                f'/ip service set www-ssl certificate={self.cert_name}_0',
                '/ip service enable www-ssl',
            ]
            for cmd in config_commands:
                success, stdout, stderr = self.execute_command(cmd, ignore_error=True)
                if not success:
                    # Still best effort, but no longer silent.
                    logger.warning(f"MikroTik configuration command failed: {cmd}: {stderr}")

            logger.info(f"✓ Successfully deployed certificate to MikroTik {self.hostname}")
            return True
        except Exception as e:
            logger.error(f"MikroTik certificate deployment failed: {e}", exc_info=True)
            return False

    def verify_certificate(self) -> bool:
        """Verify certificate is properly installed.

        Returns:
            True if the router lists a certificate whose name matches
            ``self.cert_name``; False otherwise (a warning or error is logged).
        """
        try:
            logger.debug("Verifying certificate installation")
            # Use the configured name rather than a hard-coded literal so
            # verification follows cert_name if it is ever changed.
            success, stdout, stderr = self.execute_command(
                f'/certificate print detail where name~"{self.cert_name}"'
            )
            if success and stdout:
                logger.info(f"Certificate verification:\n{stdout}")
                return True
            logger.warning("Could not verify certificate installation")
            return False
        except Exception as e:
            logger.error(f"Certificate verification failed: {e}")
            return False
class ProxmoxManager(SSHManager):
    """Specialized manager for Proxmox VE servers"""

    def upload_certificate(self, cert_path: str, key_path: str) -> bool:
        """
        Upload certificate to Proxmox VE

        Proxmox uses two separate files:
        - /etc/pve/local/pveproxy-ssl.pem (certificate)
        - /etc/pve/local/pveproxy-ssl.key (private key)

        After both uploads the key's permissions are adjusted, pveproxy is
        restarted, and the service state is checked.

        Args:
            cert_path: Path to certificate file (fullchain)
            key_path: Path to private key file

        Returns:
            True if both files were uploaded and pveproxy is active after
            the restart, False otherwise.
        """
        try:
            logger.info(f"Starting Proxmox certificate deployment to {self.hostname}")

            # Step 1: Upload certificate
            logger.debug("Uploading certificate to Proxmox")
            if not self.upload_file(cert_path, '/etc/pve/local/pveproxy-ssl.pem'):
                return False

            # Step 2: Upload private key
            logger.debug("Uploading private key to Proxmox")
            if not self.upload_file(key_path, '/etc/pve/local/pveproxy-ssl.key'):
                return False

            # Step 3: Set correct permissions. Failures are logged by
            # execute_command() but do not abort the deployment.
            logger.debug("Setting file permissions")
            commands = [
                'chmod 640 /etc/pve/local/pveproxy-ssl.key',
                'chown root:www-data /etc/pve/local/pveproxy-ssl.key',
            ]
            for cmd in commands:
                self.execute_command(cmd, ignore_error=False)

            # Step 4: Restart pveproxy
            logger.info("Restarting pveproxy service")
            success, stdout, stderr = self.execute_command('systemctl restart pveproxy', timeout=30)
            if not success:
                logger.error(f"Failed to restart pveproxy: {stderr}")
                return False

            # Step 5: Verify service is running (give it a moment to settle)
            import time
            time.sleep(3)
            success, stdout, stderr = self.execute_command('systemctl is-active pveproxy')
            # Compare the whole state word: a substring test would also
            # accept "inactive" or "activating".
            if success and stdout.strip() == 'active':
                logger.info(f"✓ Successfully deployed certificate to Proxmox {self.hostname}")
                return True

            logger.error("pveproxy service is not active after restart")
            # Show journal logs for debugging
            self.execute_command('journalctl -u pveproxy -n 20 --no-pager')
            return False
        except Exception as e:
            logger.error(f"Proxmox certificate deployment failed: {e}", exc_info=True)
            return False
class CertPusher:
    """Main application class.

    Reads the INI configuration, deploys the certificate to every configured
    host and keeps per-run statistics that drive the summary and exit code.
    """

    def __init__(self, config_file: str):
        """Remember the configuration path; nothing is read until load_config()."""
        self.config_file = config_file
        self.config = configparser.ConfigParser()
        self.cert_manager = CertificateManager()
        # Per-run counters; a non-zero 'failed' makes run() exit with status 1.
        self.stats = {
            'total': 0,
            'uploaded': 0,
            'skipped': 0,
            'failed': 0,
        }

    def load_config(self) -> bool:
        """Load configuration from INI file.

        Returns:
            True if the file has a ``[global]`` section containing
            ``source_cert_path`` and ``default_ssh_key``; False otherwise.
        """
        try:
            logger.info(f"Loading configuration from {self.config_file}")
            self.config.read(self.config_file)
            if 'global' not in self.config:
                logger.error("Missing [global] section in config file")
                return False
            required_global = ['source_cert_path', 'default_ssh_key']
            for key in required_global:
                if not self.config.has_option('global', key):
                    logger.error(f"Missing required global option: {key}")
                    return False
            logger.info("✓ Configuration loaded successfully")
            # Every section except [global] describes one host.
            logger.debug(f"Found {len(self.config.sections()) - 1} host(s) in configuration")
            return True
        except Exception as e:
            logger.error(f"Failed to load configuration: {e}")
            return False

    def process_mikrotik(self, section: str, hostname: str, port: int,
                         username: str, ssh_key: str, source_cert_path: str) -> bool:
        """Process MikroTik device specifically.

        Updates ``stats['uploaded']`` or ``stats['failed']`` and returns
        True/False accordingly.
        """
        try:
            logger.info("Using MikroTik-specific deployment method")
            # Get optional private key path
            source_key_path = self.config.get(section, 'source_key_path', fallback=None)
            if source_key_path:
                logger.info(f"Private key: {source_key_path}")
                if not os.path.exists(source_key_path):
                    logger.error(f"Private key file not found: {source_key_path}")
                    # Count the host as failed so the summary and the exit
                    # code reflect it.
                    self.stats['failed'] += 1
                    return False

            mikrotik = MikroTikManager(hostname, port, username, ssh_key)
            if not mikrotik.connect():
                self.stats['failed'] += 1
                return False
            try:
                # Upload and import certificate
                if not mikrotik.upload_certificate(source_cert_path, source_key_path):
                    self.stats['failed'] += 1
                    return False
                # Verify installation (informational; result not enforced)
                mikrotik.verify_certificate()
            finally:
                # Close the session on every path, including exceptions.
                mikrotik.disconnect()

            self.stats['uploaded'] += 1
            logger.info(f"✓ Successfully processed MikroTik {section}")
            return True
        except Exception as e:
            logger.error(f"MikroTik processing failed: {e}", exc_info=True)
            self.stats['failed'] += 1
            return False

    def process_proxmox(self, section: str, hostname: str, port: int,
                        username: str, ssh_key: str, source_cert_path: str) -> bool:
        """Process Proxmox VE server specifically.

        Updates ``stats['uploaded']`` or ``stats['failed']`` and returns
        True/False accordingly.
        """
        try:
            logger.info("Using Proxmox-specific deployment method")
            # Get private key path
            if self.config.has_option(section, 'source_key_path'):
                source_key_path = self.config.get(section, 'source_key_path')
            else:
                # Try to derive from cert path
                source_key_path = source_cert_path.replace('fullchain.pem', 'privkey.pem')
            logger.info(f"Certificate: {source_cert_path}")
            logger.info(f"Private key: {source_key_path}")
            if not os.path.exists(source_key_path):
                logger.error(f"Private key file not found: {source_key_path}")
                # Count the host as failed so the summary and the exit code
                # reflect it.
                self.stats['failed'] += 1
                return False

            proxmox = ProxmoxManager(hostname, port, username, ssh_key)
            if not proxmox.connect():
                self.stats['failed'] += 1
                return False
            try:
                if not proxmox.upload_certificate(source_cert_path, source_key_path):
                    self.stats['failed'] += 1
                    return False
            finally:
                # Close the session on every path, including exceptions.
                proxmox.disconnect()

            self.stats['uploaded'] += 1
            logger.info(f"✓ Successfully processed Proxmox {section}")
            return True
        except Exception as e:
            logger.error(f"Proxmox processing failed: {e}", exc_info=True)
            self.stats['failed'] += 1
            return False

    def _is_up_to_date(self, hostname: str, check_url: str, source_cert_path: str) -> bool:
        """Return True when the certificate served at ``check_url`` equals the source one."""
        logger.info(f"Checking current certificate at: {check_url}")
        source_cert = self.cert_manager.get_cert_from_file(source_cert_path)
        remote_cert = self.cert_manager.get_cert_from_url(check_url)
        if not (source_cert and remote_cert):
            # When in doubt, upload.
            logger.warning("Could not compare certificates. Proceeding with upload.")
            return False
        if self.cert_manager.compare_certificates(source_cert, remote_cert):
            logger.info(f"✓ Certificate on {hostname} is already up to date. Skipping upload.")
            return True
        logger.info(f"Certificate on {hostname} is outdated. Upload needed.")
        logger.debug(self.cert_manager.get_certificate_info(source_cert))
        return False

    def _upload_additional_files(self, ssh, additional_files: str) -> None:
        """Upload extra ``local:remote`` pairs; failures only produce warnings."""
        # Format: local_path:remote_path,local_path:remote_path
        for file_pair in additional_files.split(','):
            if ':' not in file_pair:
                continue
            local, remote = (part.strip() for part in file_pair.split(':', 1))
            logger.info(f"Uploading additional file: {local} -> {remote}")
            if not ssh.upload_file(local, remote):
                logger.warning(f"Failed to upload additional file: {local}")

    def process_host(self, section: str) -> bool:
        """Process certificate deployment for a single host.

        Args:
            section: Name of the host's section in the configuration.

        Returns:
            True if the host was updated or is already up to date, False on
            failure. The matching ``stats`` counter is updated either way.
        """
        try:
            logger.info(f"\n{'='*60}")
            logger.info(f"Processing host: {section}")
            logger.info(f"{'='*60}")
            self.stats['total'] += 1

            # Get configuration
            hostname = self.config.get(section, 'hostname')
            port = self.config.getint(section, 'port', fallback=22)
            username = self.config.get(section, 'username', fallback='root')
            device_type = self.config.get(section, 'type', fallback='standard')

            # Determine SSH key to use
            if self.config.has_option(section, 'ssh_key_path'):
                ssh_key = self.config.get(section, 'ssh_key_path')
            else:
                ssh_key = self.config.get('global', 'default_ssh_key')

            # Allow per-host certificate override
            if self.config.has_option(section, 'source_cert_path'):
                source_cert_path = self.config.get(section, 'source_cert_path')
                logger.info(f"Using host-specific certificate: {source_cert_path}")
            else:
                source_cert_path = self.config.get('global', 'source_cert_path')
                logger.debug(f"Using global certificate: {source_cert_path}")

            # Verify certificate exists
            if not os.path.exists(source_cert_path):
                logger.error(f"Certificate file not found: {source_cert_path}")
                self.stats['failed'] += 1
                return False

            logger.info(f"Host: {hostname}:{port}")
            logger.info(f"Type: {device_type}")
            logger.info(f"Username: {username}")
            logger.info(f"SSH Key: {ssh_key}")

            # Handle device-specific deployments (they do their own counting)
            device_kind = device_type.lower()
            if device_kind == 'mikrotik':
                return self.process_mikrotik(section, hostname, port, username, ssh_key, source_cert_path)
            if device_kind == 'proxmox':
                return self.process_proxmox(section, hostname, port, username, ssh_key, source_cert_path)

            # Standard processing for other devices
            remote_cert_path = self.config.get(section, 'remote_cert_path')
            post_upload_command = self.config.get(section, 'post_upload_command', fallback='')
            check_url = self.config.get(section, 'check_url', fallback='')
            logger.info(f"Remote path: {remote_cert_path}")

            # Check if upload is needed
            if check_url and self._is_up_to_date(hostname, check_url, source_cert_path):
                self.stats['skipped'] += 1
                return True

            # Connect and upload
            ssh = SSHManager(hostname, port, username, ssh_key)
            if not ssh.connect():
                self.stats['failed'] += 1
                return False
            try:
                if not ssh.upload_file(source_cert_path, remote_cert_path):
                    self.stats['failed'] += 1
                    return False

                # Upload additional files if specified
                if self.config.has_option(section, 'additional_files'):
                    self._upload_additional_files(
                        ssh, self.config.get(section, 'additional_files')
                    )

                # Execute post-upload command; its failure does not fail the host
                if post_upload_command:
                    logger.info(f"Executing post-upload command: {post_upload_command}")
                    success, stdout, stderr = ssh.execute_command(post_upload_command)
                    if not success:
                        logger.warning("Post-upload command failed, but file was uploaded successfully")
                    else:
                        logger.info("✓ Post-upload command completed successfully")
            finally:
                # Close the session on every path, including exceptions.
                ssh.disconnect()

            self.stats['uploaded'] += 1
            logger.info(f"✓ Successfully processed {section}")
            return True
        except Exception as e:
            logger.error(f"Failed to process host {section}: {e}", exc_info=True)
            self.stats['failed'] += 1
            return False

    def run(self):
        """Main execution method.

        Loads the configuration, processes every non-``global`` section and
        logs a summary. Calls ``sys.exit(1)`` if the configuration is
        invalid, the global source certificate is missing, or any host
        failed.
        """
        logger.info("="*60)
        logger.info(" CertPusher - SSL Certificate Distribution Tool")
        logger.info("="*60)
        logger.info(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info("")
        if not self.load_config():
            logger.error("Configuration loading failed. Exiting.")
            sys.exit(1)

        # Verify source certificate exists
        source_cert = self.config.get('global', 'source_cert_path')
        if not os.path.exists(source_cert):
            logger.error(f"Source certificate not found: {source_cert}")
            sys.exit(1)
        logger.info(f"Source certificate: {source_cert}")

        # Display certificate info
        cert = self.cert_manager.get_cert_from_file(source_cert)
        if cert:
            logger.info(self.cert_manager.get_certificate_info(cert))

        # Process each host
        for section in self.config.sections():
            if section == 'global':
                continue
            try:
                self.process_host(section)
            except Exception as e:
                logger.error(f"Unexpected error processing {section}: {e}", exc_info=True)
                self.stats['failed'] += 1

        # Print summary
        logger.info("\n" + "="*60)
        logger.info(" DEPLOYMENT SUMMARY")
        logger.info("="*60)
        logger.info(f"Total hosts: {self.stats['total']}")
        logger.info(f"✓ Uploaded: {self.stats['uploaded']}")
        logger.info(f"○ Skipped (up to date): {self.stats['skipped']}")
        logger.info(f"✗ Failed: {self.stats['failed']}")
        logger.info("="*60)
        logger.info(f"Finished at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        if self.stats['failed'] > 0:
            sys.exit(1)
def main():
    """Command-line entry point.

    Prints the banner, validates the single required argument (the path of
    the configuration file) and runs CertPusher with it. Exits with status 1
    on usage or fatal errors and 130 when interrupted from the keyboard.
    """
    print("""
╔═══════════════════════════════════════════════════════════╗
║ CertPusher v1.0 ║
║ Automated SSL Certificate Distribution Tool ║
╚═══════════════════════════════════════════════════════════╝
""")
    cli_args = sys.argv[1:]
    if not cli_args:
        usage_lines = (
            "Usage: python certpusher.py <config_file>",
            "Example: python certpusher.py config.ini",
            "",
        )
        for usage_line in usage_lines:
            print(usage_line)
        sys.exit(1)

    # Only the first argument is used; any extra ones are ignored.
    config_path = cli_args[0]
    if not os.path.exists(config_path):
        print(f"Error: Configuration file '{config_path}' not found")
        sys.exit(1)

    try:
        CertPusher(config_path).run()
    except KeyboardInterrupt:
        print("\n\nInterrupted by user. Exiting...")
        sys.exit(130)
    except Exception as exc:
        # sys.exit() raised inside run() is SystemExit and passes straight through.
        logger.error(f"Fatal error: {exc}", exc_info=True)
        sys.exit(1)


if __name__ == "__main__":
    main()