This commit is contained in:
Mateusz Gruszczyński
2025-10-27 07:18:12 +01:00
parent 5a37e451a5
commit 92f9e1edd1
2 changed files with 215 additions and 319 deletions

1
.gitignore vendored
View File

@@ -9,3 +9,4 @@ config.ini
venv/ venv/
*.swp *.swp
.DS_Store .DS_Store
logs/*.log

View File

@@ -12,6 +12,7 @@ import os
import ssl import ssl
import socket import socket
import re import re
import argparse
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Dict, Optional, Tuple from typing import Dict, Optional, Tuple
@@ -21,16 +22,31 @@ import requests
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
# Create logs directory if it doesn't exist
LOG_DIR = Path(__file__).parent / 'logs'
LOG_DIR.mkdir(exist_ok=True)
# Logging configuration # Logging configuration
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig( LOG_FILE = LOG_DIR / f'certpusher_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
level=logging.DEBUG,
format=LOG_FORMAT, def setup_logging(debug: bool = False):
handlers=[ """Setup logging with configurable level"""
logging.FileHandler(f'certpusher_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'), log_level = logging.DEBUG if debug else logging.INFO
logging.StreamHandler(sys.stdout)
] logging.basicConfig(
) level=log_level,
format=LOG_FORMAT,
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler(sys.stdout)
]
)
# Reduce paramiko logging noise
logging.getLogger('paramiko').setLevel(logging.WARNING)
logging.getLogger('paramiko.transport').setLevel(logging.WARNING)
logger = logging.getLogger('CertPusher') logger = logging.getLogger('CertPusher')
@@ -45,8 +61,6 @@ class CertificateManager:
cert_data = f.read() cert_data = f.read()
cert = x509.load_pem_x509_certificate(cert_data, default_backend()) cert = x509.load_pem_x509_certificate(cert_data, default_backend())
logger.debug(f"Loaded certificate from {cert_path}") logger.debug(f"Loaded certificate from {cert_path}")
logger.debug(f"Certificate subject: {cert.subject}")
logger.debug(f"Certificate expires: {cert.not_valid_after_utc}")
return cert return cert
except Exception as e: except Exception as e:
logger.error(f"Failed to load certificate from {cert_path}: {e}") logger.error(f"Failed to load certificate from {cert_path}: {e}")
@@ -62,7 +76,7 @@ class CertificateManager:
if ':' in url.replace('https://', '').replace('http://', '').split('/')[0]: if ':' in url.replace('https://', '').replace('http://', '').split('/')[0]:
port = int(url.replace('https://', '').replace('http://', '').split('/')[0].split(':')[1]) port = int(url.replace('https://', '').replace('http://', '').split('/')[0].split(':')[1])
logger.debug(f"Connecting to {hostname}:{port} to retrieve certificate") logger.debug(f"Checking certificate at {hostname}:{port}")
context = ssl.create_default_context() context = ssl.create_default_context()
context.check_hostname = False context.check_hostname = False
@@ -72,8 +86,6 @@ class CertificateManager:
with context.wrap_socket(sock, server_hostname=hostname) as ssock: with context.wrap_socket(sock, server_hostname=hostname) as ssock:
der_cert = ssock.getpeercert(binary_form=True) der_cert = ssock.getpeercert(binary_form=True)
cert = x509.load_der_x509_certificate(der_cert, default_backend()) cert = x509.load_der_x509_certificate(der_cert, default_backend())
logger.debug(f"Retrieved certificate from {url}")
logger.debug(f"Certificate expires: {cert.not_valid_after_utc}")
return cert return cert
except Exception as e: except Exception as e:
logger.warning(f"Failed to retrieve certificate from {url}: {e}") logger.warning(f"Failed to retrieve certificate from {url}: {e}")
@@ -85,13 +97,11 @@ class CertificateManager:
try: try:
same_serial = cert1.serial_number == cert2.serial_number same_serial = cert1.serial_number == cert2.serial_number
# Compare fingerprints
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
fingerprint1 = cert1.fingerprint(hashes.SHA256()) fingerprint1 = cert1.fingerprint(hashes.SHA256())
fingerprint2 = cert2.fingerprint(hashes.SHA256()) fingerprint2 = cert2.fingerprint(hashes.SHA256())
same_fingerprint = fingerprint1 == fingerprint2 same_fingerprint = fingerprint1 == fingerprint2
logger.debug(f"Certificate comparison - Serial match: {same_serial}, Fingerprint match: {same_fingerprint}")
return same_serial and same_fingerprint return same_serial and same_fingerprint
except Exception as e: except Exception as e:
logger.error(f"Failed to compare certificates: {e}") logger.error(f"Failed to compare certificates: {e}")
@@ -106,7 +116,6 @@ class CertificateManager:
valid_from = cert.not_valid_before_utc valid_from = cert.not_valid_before_utc
valid_to = cert.not_valid_after_utc valid_to = cert.not_valid_after_utc
# Convert to naive datetime for comparison
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
days_left = (valid_to - now).days days_left = (valid_to - now).days
@@ -139,9 +148,7 @@ class SSHManager:
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
logger.debug(f"Connecting to {self.username}@{self.hostname}:{self.port}") logger.debug(f"Connecting to {self.username}@{self.hostname}:{self.port}")
logger.debug(f"Using SSH key: {self.key_path}")
# Try to load different key types (DSS removed in paramiko 3.0+)
private_key = None private_key = None
key_types = [ key_types = [
('RSA', paramiko.RSAKey), ('RSA', paramiko.RSAKey),
@@ -152,10 +159,9 @@ class SSHManager:
for key_name, key_class in key_types: for key_name, key_class in key_types:
try: try:
private_key = key_class.from_private_key_file(self.key_path) private_key = key_class.from_private_key_file(self.key_path)
logger.debug(f"Successfully loaded {key_name} key") logger.debug(f"Loaded {key_name} key")
break break
except Exception as e: except Exception:
logger.debug(f"Not a {key_name} key: {e}")
continue continue
if not private_key: if not private_key:
@@ -172,7 +178,7 @@ class SSHManager:
auth_timeout=30 auth_timeout=30
) )
logger.info(f"Successfully connected to {self.hostname}:{self.port}") logger.info(f"Connected to {self.hostname}:{self.port}")
return True return True
except Exception as e: except Exception as e:
@@ -182,9 +188,8 @@ class SSHManager:
def upload_file(self, local_path: str, remote_path: str) -> bool: def upload_file(self, local_path: str, remote_path: str) -> bool:
"""Upload file via SCP""" """Upload file via SCP"""
try: try:
logger.debug(f"Uploading {local_path} to {self.hostname}:{remote_path}") logger.info(f"Uploading to {self.hostname}:{remote_path}")
# Ensure remote directory exists
remote_dir = os.path.dirname(remote_path) remote_dir = os.path.dirname(remote_path)
if remote_dir: if remote_dir:
self.execute_command(f"mkdir -p {remote_dir}", ignore_error=True) self.execute_command(f"mkdir -p {remote_dir}", ignore_error=True)
@@ -192,17 +197,17 @@ class SSHManager:
with SCPClient(self.ssh_client.get_transport()) as scp: with SCPClient(self.ssh_client.get_transport()) as scp:
scp.put(local_path, remote_path) scp.put(local_path, remote_path)
logger.info(f"Successfully uploaded {local_path} to {self.hostname}:{remote_path}") logger.info(f"Upload successful")
return True return True
except Exception as e: except Exception as e:
logger.error(f"File upload failed: {e}") logger.error(f"Upload failed: {e}")
return False return False
def execute_command(self, command: str, timeout: int = 60, ignore_error: bool = False) -> Tuple[bool, str, str]: def execute_command(self, command: str, timeout: int = 60, ignore_error: bool = False) -> Tuple[bool, str, str]:
"""Execute command on remote server""" """Execute command on remote server"""
try: try:
logger.debug(f"Executing command on {self.hostname}: {command}") logger.debug(f"Executing: {command}")
stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout) stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=timeout)
exit_status = stdout.channel.recv_exit_status() exit_status = stdout.channel.recv_exit_status()
@@ -211,15 +216,12 @@ class SSHManager:
stderr_text = stderr.read().decode('utf-8', errors='ignore') stderr_text = stderr.read().decode('utf-8', errors='ignore')
if exit_status == 0: if exit_status == 0:
logger.info(f"Command executed successfully on {self.hostname}") logger.debug(f"Command completed successfully")
if stdout_text:
logger.debug(f"STDOUT: {stdout_text}")
else: else:
if not ignore_error: if not ignore_error:
logger.error(f"Command failed with exit code {exit_status}") logger.error(f"Command failed with exit code {exit_status}")
logger.error(f"STDERR: {stderr_text}") if stderr_text:
else: logger.error(f"Error: {stderr_text}")
logger.debug(f"Command failed (ignored): {stderr_text}")
return exit_status == 0, stdout_text, stderr_text return exit_status == 0, stdout_text, stderr_text
@@ -248,85 +250,58 @@ class MikroTikManager(SSHManager):
Returns True if upload needed, False if current cert is OK Returns True if upload needed, False if current cert is OK
""" """
try: try:
logger.info("Checking certificate on MikroTik") logger.info("Checking MikroTik certificate")
# Get certificate details from MikroTik
success, stdout, stderr = self.execute_command( success, stdout, stderr = self.execute_command(
f'/certificate print detail where name~"{self.cert_name}"', f'/certificate print detail where name~"{self.cert_name}"',
ignore_error=True ignore_error=True
) )
if not success or not stdout: if not success or not stdout:
logger.info("No certificate found on MikroTik or cannot read it. Upload needed.") logger.info("No certificate found on MikroTik. Upload needed.")
return True return True
logger.debug(f"MikroTik certificate info:\n{stdout}")
# Parse expiry date from output
# Looking for "invalid-after" field
invalid_after_match = re.search(r'invalid-after:\s+([a-zA-Z]{3}/\d{2}/\d{4}\s+\d{2}:\d{2}:\d{2})', stdout) invalid_after_match = re.search(r'invalid-after:\s+([a-zA-Z]{3}/\d{2}/\d{4}\s+\d{2}:\d{2}:\d{2})', stdout)
if not invalid_after_match: if not invalid_after_match:
logger.warning("Could not parse certificate expiry date from MikroTik. Proceeding with upload.") logger.warning("Could not parse certificate expiry. Proceeding with upload.")
return True return True
mikrotik_expiry_str = invalid_after_match.group(1) mikrotik_expiry_str = invalid_after_match.group(1)
logger.debug(f"MikroTik certificate expires: {mikrotik_expiry_str}")
# Parse MikroTik date format (e.g., "jan/24/2026 08:34:12")
try: try:
mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%b/%d/%Y %H:%M:%S') mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%b/%d/%Y %H:%M:%S')
mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc) mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc)
except Exception as e: except Exception as e:
logger.warning(f"Could not parse MikroTik date: {e}. Proceeding with upload.") logger.warning(f"Could not parse date: {e}. Proceeding with upload.")
return True return True
# Compare with source certificate
source_expiry = source_cert.not_valid_after_utc source_expiry = source_cert.not_valid_after_utc
# Also check fingerprint/serial if available
fingerprint_match = re.search(r'fingerprint:\s+([a-f0-9]+)', stdout)
if fingerprint_match:
mikrotik_fingerprint = fingerprint_match.group(1)
logger.debug(f"MikroTik certificate fingerprint: {mikrotik_fingerprint}")
# Compare expiry dates (allowing 1 day tolerance for timezone differences)
time_diff = abs((source_expiry - mikrotik_expiry).total_seconds()) time_diff = abs((source_expiry - mikrotik_expiry).total_seconds())
if time_diff < 86400: # Less than 24 hours difference if time_diff < 86400:
logger.info("Certificate on MikroTik is current. Skipping upload.") logger.info("MikroTik certificate is current. Skipping upload.")
return False return False
else: else:
logger.info(f"Certificate on MikroTik differs. Source expires: {source_expiry}, MikroTik expires: {mikrotik_expiry}") logger.info(f"MikroTik certificate differs. Upload needed.")
return True return True
except Exception as e: except Exception as e:
logger.warning(f"Error checking MikroTik certificate: {e}. Proceeding with upload.") logger.warning(f"Error checking certificate: {e}. Proceeding with upload.")
return True return True
def upload_certificate(self, cert_path: str, key_path: str = None, check_first: bool = True, source_cert: x509.Certificate = None) -> bool: def upload_certificate(self, cert_path: str, key_path: str = None, check_first: bool = True, source_cert: x509.Certificate = None) -> bool:
""" """Upload and import certificate to MikroTik RouterOS"""
Upload and import certificate to MikroTik RouterOS
Args:
cert_path: Path to certificate file (PEM format, can be fullchain)
key_path: Optional path to private key file
check_first: If True, check existing certificate before uploading
source_cert: Source certificate object for comparison
"""
try: try:
logger.info(f"Starting MikroTik certificate deployment to {self.hostname}") logger.info(f"MikroTik certificate deployment")
# Check if upload is needed
if check_first and source_cert: if check_first and source_cert:
if not self.check_certificate_expiry(source_cert): if not self.check_certificate_expiry(source_cert):
return True # Certificate is current, no upload needed return True
# Step 1: Disable www-ssl service
logger.debug("Disabling www-ssl service") logger.debug("Disabling www-ssl service")
self.execute_command('/ip service disable www-ssl', ignore_error=True) self.execute_command('/ip service disable www-ssl', ignore_error=True)
# Step 2: Remove old certificates
logger.debug("Removing old certificates") logger.debug("Removing old certificates")
cleanup_commands = [ cleanup_commands = [
f'/certificate remove [find name~"{self.cert_name}"]', f'/certificate remove [find name~"{self.cert_name}"]',
@@ -339,21 +314,16 @@ class MikroTikManager(SSHManager):
for cmd in cleanup_commands: for cmd in cleanup_commands:
self.execute_command(cmd, ignore_error=True) self.execute_command(cmd, ignore_error=True)
# Step 3: Upload certificate file logger.info("Uploading certificate")
logger.debug(f"Uploading certificate to MikroTik")
with SCPClient(self.ssh_client.get_transport()) as scp: with SCPClient(self.ssh_client.get_transport()) as scp:
scp.put(cert_path, f'{self.cert_name}.pem') scp.put(cert_path, f'{self.cert_name}.pem')
logger.info(f"✓ Certificate file uploaded")
# Step 4: Upload private key if provided
if key_path: if key_path:
logger.debug(f"Uploading private key to MikroTik") logger.info("Uploading private key")
with SCPClient(self.ssh_client.get_transport()) as scp: with SCPClient(self.ssh_client.get_transport()) as scp:
scp.put(key_path, f'{self.key_name}.pem') scp.put(key_path, f'{self.key_name}.pem')
logger.info(f"✓ Private key file uploaded")
# Step 5: Import certificate logger.info("Importing certificate")
logger.debug("Importing certificate into RouterOS")
import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""' import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""'
success, stdout, stderr = self.execute_command(import_cmd, timeout=30) success, stdout, stderr = self.execute_command(import_cmd, timeout=30)
@@ -361,49 +331,36 @@ class MikroTikManager(SSHManager):
logger.error(f"Certificate import failed: {stderr}") logger.error(f"Certificate import failed: {stderr}")
return False return False
# Give RouterOS time to process
import time import time
time.sleep(2) time.sleep(2)
# Step 6: Verify certificate was imported logger.info("Configuring www-ssl service")
success, stdout, stderr = self.execute_command(
f'/certificate print where name~"{self.cert_name}"'
)
if success and stdout:
logger.debug(f"Certificates after import:\n{stdout}")
# Step 7: Configure services to use new certificate
logger.info("Configuring www-ssl service to use new certificate")
config_commands = [ config_commands = [
f'/ip service set www-ssl certificate={self.cert_name}_0', f'/ip service set www-ssl certificate={self.cert_name}_0',
'/ip service enable www-ssl', '/ip service enable www-ssl',
] ]
for cmd in config_commands: for cmd in config_commands:
success, stdout, stderr = self.execute_command(cmd, ignore_error=True) self.execute_command(cmd, ignore_error=True)
logger.info(f"Successfully deployed certificate to MikroTik {self.hostname}") logger.info(f"MikroTik deployment successful")
return True return True
except Exception as e: except Exception as e:
logger.error(f"MikroTik certificate deployment failed: {e}", exc_info=True) logger.error(f"MikroTik deployment failed: {e}")
return False return False
def verify_certificate(self) -> bool: def verify_certificate(self) -> bool:
"""Verify certificate is properly installed""" """Verify certificate is properly installed"""
try: try:
logger.debug("Verifying certificate installation")
success, stdout, stderr = self.execute_command( success, stdout, stderr = self.execute_command(
'/certificate print detail where name~"ssl-cert"' '/certificate print detail where name~"ssl-cert"'
) )
if success and stdout: if success and stdout:
logger.info(f"Certificate verification:\n{stdout}") logger.debug(f"Certificate verified")
return True return True
logger.warning("Could not verify certificate installation")
return False return False
except Exception as e: except Exception as e:
@@ -415,32 +372,19 @@ class ProxmoxManager(SSHManager):
"""Specialized manager for Proxmox VE servers""" """Specialized manager for Proxmox VE servers"""
def upload_certificate(self, cert_path: str, key_path: str) -> bool: def upload_certificate(self, cert_path: str, key_path: str) -> bool:
""" """Upload certificate to Proxmox VE"""
Upload certificate to Proxmox VE
Proxmox uses two separate files:
- /etc/pve/local/pveproxy-ssl.pem (certificate)
- /etc/pve/local/pveproxy-ssl.key (private key)
Args:
cert_path: Path to certificate file (fullchain)
key_path: Path to private key file
"""
try: try:
logger.info(f"Starting Proxmox certificate deployment to {self.hostname}") logger.info(f"Proxmox certificate deployment")
# Step 1: Upload certificate logger.info("Uploading certificate")
logger.debug("Uploading certificate to Proxmox")
if not self.upload_file(cert_path, '/etc/pve/local/pveproxy-ssl.pem'): if not self.upload_file(cert_path, '/etc/pve/local/pveproxy-ssl.pem'):
return False return False
# Step 2: Upload private key logger.info("Uploading private key")
logger.debug("Uploading private key to Proxmox")
if not self.upload_file(key_path, '/etc/pve/local/pveproxy-ssl.key'): if not self.upload_file(key_path, '/etc/pve/local/pveproxy-ssl.key'):
return False return False
# Step 3: Set correct permissions logger.debug("Setting permissions")
logger.debug("Setting file permissions")
commands = [ commands = [
'chmod 640 /etc/pve/local/pveproxy-ssl.key', 'chmod 640 /etc/pve/local/pveproxy-ssl.key',
'chown root:www-data /etc/pve/local/pveproxy-ssl.key', 'chown root:www-data /etc/pve/local/pveproxy-ssl.key',
@@ -449,30 +393,26 @@ class ProxmoxManager(SSHManager):
for cmd in commands: for cmd in commands:
self.execute_command(cmd, ignore_error=False) self.execute_command(cmd, ignore_error=False)
# Step 4: Restart pveproxy logger.info("Restarting pveproxy")
logger.info("Restarting pveproxy service")
success, stdout, stderr = self.execute_command('systemctl restart pveproxy', timeout=30) success, stdout, stderr = self.execute_command('systemctl restart pveproxy', timeout=30)
if not success: if not success:
logger.error(f"Failed to restart pveproxy: {stderr}") logger.error(f"Failed to restart pveproxy")
return False return False
# Step 5: Verify service is running
import time import time
time.sleep(3) time.sleep(3)
success, stdout, stderr = self.execute_command('systemctl is-active pveproxy') success, stdout, stderr = self.execute_command('systemctl is-active pveproxy')
if success and 'active' in stdout: if success and 'active' in stdout:
logger.info(f"Successfully deployed certificate to Proxmox {self.hostname}") logger.info(f"Proxmox deployment successful")
return True return True
else: else:
logger.error("pveproxy service is not active after restart") logger.error("pveproxy service is not active")
# Show journal logs for debugging
self.execute_command('journalctl -u pveproxy -n 20 --no-pager')
return False return False
except Exception as e: except Exception as e:
logger.error(f"Proxmox certificate deployment failed: {e}", exc_info=True) logger.error(f"Proxmox deployment failed: {e}")
return False return False
@@ -506,8 +446,8 @@ class CertPusher:
logger.error(f"Missing required global option: {key}") logger.error(f"Missing required global option: {key}")
return False return False
logger.info(f"✓ Configuration loaded successfully") logger.info(f"✓ Configuration loaded")
logger.debug(f"Found {len(self.config.sections()) - 1} host(s) in configuration") logger.info(f"Found {len(self.config.sections()) - 1} host(s)")
return True return True
except Exception as e: except Exception as e:
@@ -515,80 +455,52 @@ class CertPusher:
return False return False
def get_key_path(self, section: str, cert_path: str) -> str: def get_key_path(self, section: str, cert_path: str) -> str:
""" """Get private key path for certificate"""
Get private key path for certificate
Priority: section-specific > global > derived from cert path
"""
# Check section-specific key
if self.config.has_option(section, 'source_key_path'): if self.config.has_option(section, 'source_key_path'):
return self.config.get(section, 'source_key_path') return self.config.get(section, 'source_key_path')
# Check global key
if self.config.has_option('global', 'source_key_path'): if self.config.has_option('global', 'source_key_path'):
return self.config.get('global', 'source_key_path') return self.config.get('global', 'source_key_path')
# Derive from certificate path
key_path = cert_path.replace('fullchain.pem', 'privkey.pem').replace('cert.pem', 'privkey.pem') key_path = cert_path.replace('fullchain.pem', 'privkey.pem').replace('cert.pem', 'privkey.pem')
logger.debug(f"Derived key path: {key_path}")
return key_path return key_path
def process_mikrotik(self, section: str, hostname: str, port: int, def process_mikrotik(self, section: str, hostname: str, port: int,
username: str, ssh_key: str, source_cert_path: str) -> bool: username: str, ssh_key: str, source_cert_path: str) -> bool:
"""Process MikroTik device specifically""" """Process MikroTik device specifically"""
try: try:
logger.info("Using MikroTik-specific deployment method") logger.info("Using MikroTik deployment method")
# Get private key path
source_key_path = self.get_key_path(section, source_cert_path) source_key_path = self.get_key_path(section, source_cert_path)
logger.info(f"Certificate: {source_cert_path}")
logger.info(f"Private key: {source_key_path}")
if not os.path.exists(source_key_path): if not os.path.exists(source_key_path):
logger.error(f"Private key file not found: {source_key_path}") logger.error(f"Private key not found: {source_key_path}")
return False return False
# Load source certificate for comparison
source_cert = self.cert_manager.get_cert_from_file(source_cert_path) source_cert = self.cert_manager.get_cert_from_file(source_cert_path)
# Check if certificate check is enabled
check_first = self.config.getboolean(section, 'check_before_upload', fallback=True) check_first = self.config.getboolean(section, 'check_before_upload', fallback=True)
# Connect
mikrotik = MikroTikManager(hostname, port, username, ssh_key) mikrotik = MikroTikManager(hostname, port, username, ssh_key)
if not mikrotik.connect(): if not mikrotik.connect():
self.stats['failed'] += 1 self.stats['failed'] += 1
return False return False
# Upload and import certificate (with optional check) result = mikrotik.upload_certificate(source_cert_path, source_key_path, check_first, source_cert)
if not mikrotik.upload_certificate(source_cert_path, source_key_path, check_first, source_cert):
if not result:
mikrotik.disconnect() mikrotik.disconnect()
self.stats['failed'] += 1 self.stats['failed'] += 1
return False return False
# If we got here and check was enabled, certificate was actually uploaded
if check_first and source_cert:
# Check if it was actually uploaded or skipped
success, stdout, stderr = mikrotik.execute_command(
f'/certificate print where name~"{mikrotik.cert_name}"'
)
if success and stdout and mikrotik.cert_name in stdout:
self.stats['uploaded'] += 1
else:
self.stats['skipped'] += 1
else:
self.stats['uploaded'] += 1
# Verify installation
mikrotik.verify_certificate() mikrotik.verify_certificate()
mikrotik.disconnect() mikrotik.disconnect()
logger.info(f"✓ Successfully processed MikroTik {section}") self.stats['uploaded'] += 1
logger.info(f"✓ MikroTik processed successfully")
return True return True
except Exception as e: except Exception as e:
logger.error(f"MikroTik processing failed: {e}", exc_info=True) logger.error(f"MikroTik processing failed: {e}")
self.stats['failed'] += 1 self.stats['failed'] += 1
return False return False
@@ -596,26 +508,20 @@ class CertPusher:
username: str, ssh_key: str, source_cert_path: str) -> bool: username: str, ssh_key: str, source_cert_path: str) -> bool:
"""Process Proxmox VE server specifically""" """Process Proxmox VE server specifically"""
try: try:
logger.info("Using Proxmox-specific deployment method") logger.info("Using Proxmox deployment method")
# Get private key path
source_key_path = self.get_key_path(section, source_cert_path) source_key_path = self.get_key_path(section, source_cert_path)
logger.info(f"Certificate: {source_cert_path}")
logger.info(f"Private key: {source_key_path}")
if not os.path.exists(source_key_path): if not os.path.exists(source_key_path):
logger.error(f"Private key file not found: {source_key_path}") logger.error(f"Private key not found: {source_key_path}")
return False return False
# Connect
proxmox = ProxmoxManager(hostname, port, username, ssh_key) proxmox = ProxmoxManager(hostname, port, username, ssh_key)
if not proxmox.connect(): if not proxmox.connect():
self.stats['failed'] += 1 self.stats['failed'] += 1
return False return False
# Upload certificate
if not proxmox.upload_certificate(source_cert_path, source_key_path): if not proxmox.upload_certificate(source_cert_path, source_key_path):
proxmox.disconnect() proxmox.disconnect()
self.stats['failed'] += 1 self.stats['failed'] += 1
@@ -623,159 +529,139 @@ class CertPusher:
proxmox.disconnect() proxmox.disconnect()
self.stats['uploaded'] += 1 self.stats['uploaded'] += 1
logger.info(f"Successfully processed Proxmox {section}") logger.info(f"Proxmox processed successfully")
return True return True
except Exception as e: except Exception as e:
logger.error(f"Proxmox processing failed: {e}", exc_info=True) logger.error(f"Proxmox processing failed: {e}")
self.stats['failed'] += 1 self.stats['failed'] += 1
return False return False
def process_host(self, section: str) -> bool: def process_host(self, section: str) -> bool:
"""Process certificate deployment for a single host""" """Process certificate deployment for a single host"""
try: try:
logger.info(f"\n{'='*60}") logger.info(f"\n{'='*60}")
logger.info(f"Processing host: {section}") logger.info(f"Processing: {section}")
logger.info(f"{'='*60}") logger.info(f"{'='*60}")
self.stats['total'] += 1
# Get configuration
hostname = self.config.get(section, 'hostname')
port = self.config.getint(section, 'port', fallback=22)
username = self.config.get(section, 'username', fallback='root')
device_type = self.config.get(section, 'type', fallback='standard')
# Determine SSH key to use
if self.config.has_option(section, 'ssh_key_path'):
ssh_key = self.config.get(section, 'ssh_key_path')
else:
ssh_key = self.config.get('global', 'default_ssh_key')
# Allow per-host certificate override
if self.config.has_option(section, 'source_cert_path'):
source_cert_path = self.config.get(section, 'source_cert_path')
logger.info(f"Using host-specific certificate: {source_cert_path}")
else:
source_cert_path = self.config.get('global', 'source_cert_path')
logger.debug(f"Using global certificate: {source_cert_path}")
# Verify certificate exists
if not os.path.exists(source_cert_path):
logger.error(f"Certificate file not found: {source_cert_path}")
self.stats['failed'] += 1
return False
logger.info(f"Host: {hostname}:{port}")
logger.info(f"Type: {device_type}")
logger.info(f"Username: {username}")
logger.info(f"SSH Key: {ssh_key}")
# Handle device-specific deployments
if device_type.lower() == 'mikrotik':
return self.process_mikrotik(section, hostname, port, username, ssh_key, source_cert_path)
elif device_type.lower() == 'proxmox':
return self.process_proxmox(section, hostname, port, username, ssh_key, source_cert_path)
# Standard processing for other devices
remote_cert_path = self.config.get(section, 'remote_cert_path')
post_upload_command = self.config.get(section, 'post_upload_command', fallback='')
check_url = self.config.get(section, 'check_url', fallback='')
logger.info(f"Remote certificate path: {remote_cert_path}")
# Check if upload is needed
if check_url:
logger.info(f"Checking current certificate at: {check_url}")
source_cert = self.cert_manager.get_cert_from_file(source_cert_path)
remote_cert = self.cert_manager.get_cert_from_url(check_url)
if source_cert and remote_cert: self.stats['total'] += 1
if self.cert_manager.compare_certificates(source_cert, remote_cert):
logger.info(f"✓ Certificate on {hostname} is already up to date. Skipping upload.") hostname = self.config.get(section, 'hostname')
self.stats['skipped'] += 1 port = self.config.getint(section, 'port', fallback=22)
return True username = self.config.get(section, 'username', fallback='root')
else: device_type = self.config.get(section, 'type', fallback='standard')
logger.info(f"Certificate on {hostname} is outdated. Upload needed.")
logger.debug(self.cert_manager.get_certificate_info(source_cert)) if self.config.has_option(section, 'ssh_key_path'):
ssh_key = self.config.get(section, 'ssh_key_path')
else: else:
logger.warning(f"Could not compare certificates. Proceeding with upload.") ssh_key = self.config.get('global', 'default_ssh_key')
# Connect and upload
ssh = SSHManager(hostname, port, username, ssh_key)
if not ssh.connect():
self.stats['failed'] += 1
return False
# Upload certificate
if not ssh.upload_file(source_cert_path, remote_cert_path):
ssh.disconnect()
self.stats['failed'] += 1
return False
# Upload private key if remote_key_path is specified
if self.config.has_option(section, 'remote_key_path'):
remote_key_path = self.config.get(section, 'remote_key_path')
source_key_path = self.get_key_path(section, source_cert_path)
logger.info(f"Remote key path: {remote_key_path}") if self.config.has_option(section, 'source_cert_path'):
logger.info(f"Uploading private key: {source_key_path} -> {remote_key_path}") source_cert_path = self.config.get(section, 'source_cert_path')
logger.info(f"Using host-specific certificate")
else:
source_cert_path = self.config.get('global', 'source_cert_path')
if not os.path.exists(source_key_path): if not os.path.exists(source_cert_path):
logger.error(f"Private key file not found: {source_key_path}") logger.error(f"Certificate not found: {source_cert_path}")
self.stats['failed'] += 1
return False
logger.info(f"Host: {hostname}:{port}")
logger.info(f"Type: {device_type}")
logger.info(f"User: {username}")
if device_type.lower() == 'mikrotik':
return self.process_mikrotik(section, hostname, port, username, ssh_key, source_cert_path)
elif device_type.lower() == 'proxmox':
return self.process_proxmox(section, hostname, port, username, ssh_key, source_cert_path)
remote_cert_path = self.config.get(section, 'remote_cert_path')
post_upload_command = self.config.get(section, 'post_upload_command', fallback='')
check_url = self.config.get(section, 'check_url', fallback='')
if check_url:
logger.info(f"Checking certificate at {check_url}")
source_cert = self.cert_manager.get_cert_from_file(source_cert_path)
remote_cert = self.cert_manager.get_cert_from_url(check_url)
if source_cert and remote_cert:
if self.cert_manager.compare_certificates(source_cert, remote_cert):
logger.info(f"✓ Certificate is up to date. Skipping.")
self.stats['skipped'] += 1
return True
else:
logger.info(f"Certificate is outdated. Uploading.")
else:
logger.warning(f"Could not compare certificates. Proceeding.")
ssh = SSHManager(hostname, port, username, ssh_key)
if not ssh.connect():
self.stats['failed'] += 1
return False
if not ssh.upload_file(source_cert_path, remote_cert_path):
ssh.disconnect() ssh.disconnect()
self.stats['failed'] += 1 self.stats['failed'] += 1
return False return False
if not ssh.upload_file(source_key_path, remote_key_path): if self.config.has_option(section, 'remote_key_path'):
logger.warning(f"Failed to upload private key to {remote_key_path}") remote_key_path = self.config.get(section, 'remote_key_path')
# Continue anyway - key might be uploaded via additional_files source_key_path = self.get_key_path(section, source_cert_path)
# Upload additional files if specified logger.info(f"Uploading private key")
if self.config.has_option(section, 'additional_files'):
additional_files = self.config.get(section, 'additional_files') if not os.path.exists(source_key_path):
# Format: local_path:remote_path,local_path:remote_path logger.error(f"Private key not found: {source_key_path}")
for file_pair in additional_files.split(','): ssh.disconnect()
if ':' in file_pair: self.stats['failed'] += 1
local, remote = file_pair.strip().split(':', 1) return False
logger.info(f"Uploading additional file: {local} -> {remote}")
if not ssh.upload_file(local, remote): if not ssh.upload_file(source_key_path, remote_key_path):
logger.warning(f"Failed to upload additional file: {local}") logger.warning(f"Failed to upload private key")
# Execute post-upload command
if post_upload_command:
logger.info(f"Executing post-upload command: {post_upload_command}")
success, stdout, stderr = ssh.execute_command(post_upload_command)
if not success: if self.config.has_option(section, 'additional_files'):
logger.warning(f"Post-upload command failed, but file was uploaded successfully") additional_files = self.config.get(section, 'additional_files')
else: for file_pair in additional_files.split(','):
logger.info(f"✓ Post-upload command completed successfully") if ':' in file_pair:
local, remote = file_pair.strip().split(':', 1)
ssh.disconnect() logger.info(f"Uploading additional: {os.path.basename(local)}")
self.stats['uploaded'] += 1 if not ssh.upload_file(local, remote):
logger.info(f"✓ Successfully processed {section}") logger.warning(f"Failed to upload additional file")
return True
if post_upload_command:
except Exception as e: logger.info(f"Executing post-upload command")
logger.error(f"Failed to process host {section}: {e}", exc_info=True) success, stdout, stderr = ssh.execute_command(post_upload_command)
self.stats['failed'] += 1
return False if not success:
logger.warning(f"Post-upload command failed")
else:
logger.info(f"✓ Post-upload command completed")
ssh.disconnect()
self.stats['uploaded'] += 1
logger.info(f"✓ Host processed successfully")
return True
except Exception as e:
logger.error(f"Failed to process host: {e}")
self.stats['failed'] += 1
return False
def run(self): def run(self):
"""Main execution method""" """Main execution method"""
logger.info("="*60) logger.info("="*60)
logger.info(" CertPusher - SSL Certificate Distribution Tool") logger.info(" CertPusher - SSL Certificate Distribution")
logger.info("="*60) logger.info("="*60)
logger.info(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") logger.info(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"Log file: {LOG_FILE}")
logger.info("") logger.info("")
if not self.load_config(): if not self.load_config():
logger.error("Configuration loading failed. Exiting.") logger.error("Configuration loading failed")
sys.exit(1) sys.exit(1)
# Verify source certificate exists
source_cert = self.config.get('global', 'source_cert_path') source_cert = self.config.get('global', 'source_cert_path')
if not os.path.exists(source_cert): if not os.path.exists(source_cert):
logger.error(f"Source certificate not found: {source_cert}") logger.error(f"Source certificate not found: {source_cert}")
@@ -783,12 +669,10 @@ class CertPusher:
logger.info(f"Source certificate: {source_cert}") logger.info(f"Source certificate: {source_cert}")
# Display certificate info
cert = self.cert_manager.get_cert_from_file(source_cert) cert = self.cert_manager.get_cert_from_file(source_cert)
if cert: if cert:
logger.info(self.cert_manager.get_certificate_info(cert)) logger.info(self.cert_manager.get_certificate_info(cert))
# Process each host
for section in self.config.sections(): for section in self.config.sections():
if section == 'global': if section == 'global':
continue continue
@@ -796,19 +680,18 @@ class CertPusher:
try: try:
self.process_host(section) self.process_host(section)
except Exception as e: except Exception as e:
logger.error(f"Unexpected error processing {section}: {e}", exc_info=True) logger.error(f"Unexpected error: {e}")
self.stats['failed'] += 1 self.stats['failed'] += 1
# Print summary
logger.info("\n" + "="*60) logger.info("\n" + "="*60)
logger.info(" DEPLOYMENT SUMMARY") logger.info(" SUMMARY")
logger.info("="*60) logger.info("="*60)
logger.info(f"Total hosts: {self.stats['total']}") logger.info(f"Total hosts: {self.stats['total']}")
logger.info(f"✓ Uploaded: {self.stats['uploaded']}") logger.info(f"✓ Uploaded: {self.stats['uploaded']}")
logger.info(f"○ Skipped (up to date): {self.stats['skipped']}") logger.info(f"○ Skipped: {self.stats['skipped']}")
logger.info(f"✗ Failed: {self.stats['failed']}") logger.info(f"✗ Failed: {self.stats['failed']}")
logger.info("="*60) logger.info("="*60)
logger.info(f"Finished at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") logger.info(f"Finished: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if self.stats['failed'] > 0: if self.stats['failed'] > 0:
sys.exit(1) sys.exit(1)
@@ -816,6 +699,26 @@ class CertPusher:
def main(): def main():
"""Entry point""" """Entry point"""
parser = argparse.ArgumentParser(
description='CertPusher - SSL Certificate Distribution Tool',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s config.ini # Normal operation
%(prog)s config.ini --debug # Debug mode with verbose logging
%(prog)s config.ini -d # Same as above
"""
)
parser.add_argument('config', help='Path to configuration file')
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging')
parser.add_argument('-v', '--version', action='version', version='CertPusher 1.0')
args = parser.parse_args()
# Setup logging based on debug flag
setup_logging(debug=args.debug)
print(""" print("""
╔═══════════════════════════════════════════════════════════╗ ╔═══════════════════════════════════════════════════════════╗
║ CertPusher v1.0 ║ ║ CertPusher v1.0 ║
@@ -823,20 +726,12 @@ def main():
╚═══════════════════════════════════════════════════════════╝ ╚═══════════════════════════════════════════════════════════╝
""") """)
if len(sys.argv) < 2: if not os.path.exists(args.config):
print("Usage: python certpusher.py <config_file>") print(f"Error: Configuration file '{args.config}' not found")
print("Example: python certpusher.py config.ini")
print("")
sys.exit(1)
config_file = sys.argv[1]
if not os.path.exists(config_file):
print(f"Error: Configuration file '{config_file}' not found")
sys.exit(1) sys.exit(1)
try: try:
pusher = CertPusher(config_file) pusher = CertPusher(args.config)
pusher.run() pusher.run()
except KeyboardInterrupt: except KeyboardInterrupt:
print("\n\nInterrupted by user. Exiting...") print("\n\nInterrupted by user. Exiting...")