first commit

This commit is contained in:
Mateusz Gruszczyński
2025-10-26 22:47:38 +01:00
commit ff578b7fad
5 changed files with 465 additions and 0 deletions

11
.gitignore vendored Normal file
View File

@@ -0,0 +1,11 @@
*.pyc
__pycache__/
*.log
config.ini
*.pem
*.key
*.crt
.venv/
venv/
*.swp
.DS_Store

72
README.md Normal file
View File

@@ -0,0 +1,72 @@
# CertPusher
Automated SSL certificate distribution tool for deploying certificates to multiple remote servers via SSH/SCP.
## Features
- **Multi-server deployment**: Deploy certificates to unlimited number of servers
- **Smart certificate comparison**: Checks if remote certificate needs updating via HTTPS
- **Flexible SSH authentication**: Global or per-host SSH key configuration
- **Post-deployment commands**: Execute commands after certificate upload (reload services, etc.)
- **Comprehensive logging**: Debug-level logging with timestamped log files
- **Safe execution**: Compares certificates before uploading to avoid unnecessary restarts
## Installation
git clone https://github.com/yourusername/certpusher.git
cd certpusher
pip install -r requirements.txt
## Configuration
1. Copy the example configuration:
cp config.ini.example config.ini
2. Edit `config.ini` with your server details:
### Global Section
- `source_cert_path`: Path to the SSL certificate to distribute
- `default_ssh_key`: Default SSH private key path
### Host Sections
Each host requires:
- `hostname`: IP address or hostname
- `port`: SSH port (default: 22)
- `username`: SSH username
- `remote_cert_path`: Destination path for the certificate
- `post_upload_command`: Command to run after upload (optional)
- `check_url`: HTTPS URL to check current certificate (optional)
- `ssh_key_path`: Override default SSH key (optional)
## Usage
python certpusher.py config.ini
## SSH Key Setup
Generate SSH key for authentication:
ssh-keygen -t ed25519 -f ~/.ssh/certpusher_key
ssh-copy-id -i ~/.ssh/certpusher_key.pub user@remote-host
2025-10-26 22:00:00 - CertPusher - INFO - ============================================================
2025-10-26 22:00:00 - CertPusher - INFO - CertPusher - SSL Certificate Distribution Tool
2025-10-26 22:00:00 - CertPusher - INFO - ============================================================
2025-10-26 22:00:01 - CertPusher - INFO - Processing host: webserver1
2025-10-26 22:00:02 - CertPusher - INFO - ✓ Successfully processed webserver1
## Security Considerations
- Store SSH private keys securely with proper permissions (chmod 600)
- Use dedicated SSH keys for certificate deployment
- Limit SSH key access with `authorized_keys` restrictions
- Consider using SSH certificates for enhanced security
- Rotate SSH keys regularly
## License
MIT License

338
certpusher.py Normal file
View File

@@ -0,0 +1,338 @@
#!/usr/bin/env python3
"""
CertPusher - Automated SSL Certificate Distribution Tool
Distributes SSL certificates to remote servers via SSH/SCP
"""
import configparser
import logging
import sys
import os
import ssl
import socket
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Tuple
import paramiko
from scp import SCPClient
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
# Logging configuration
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(
level=logging.DEBUG,
format=LOG_FORMAT,
handlers=[
logging.FileHandler(f'certpusher_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger('CertPusher')
class CertificateManager:
"""Manages certificate comparison and validation"""
@staticmethod
def get_cert_from_file(cert_path: str) -> Optional[x509.Certificate]:
"""Load certificate from file"""
try:
with open(cert_path, 'rb') as f:
cert_data = f.read()
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
logger.debug(f"Loaded certificate from {cert_path}")
logger.debug(f"Certificate expires: {cert.not_valid_after}")
return cert
except Exception as e:
logger.error(f"Failed to load certificate from {cert_path}: {e}")
return None
@staticmethod
def get_cert_from_url(url: str, timeout: int = 10) -> Optional[x509.Certificate]:
"""Retrieve certificate from HTTPS URL"""
try:
hostname = url.replace('https://', '').replace('http://', '').split('/')[0].split(':')[0]
port = 443
if ':' in url.replace('https://', '').replace('http://', '').split('/')[0]:
port = int(url.replace('https://', '').replace('http://', '').split('/')[0].split(':')[1])
logger.debug(f"Connecting to {hostname}:{port} to retrieve certificate")
context = ssl.create_default_context()
with socket.create_connection((hostname, port), timeout=timeout) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
der_cert = ssock.getpeercert(binary_form=True)
cert = x509.load_der_x509_certificate(der_cert, default_backend())
logger.debug(f"Retrieved certificate from {url}")
logger.debug(f"Certificate expires: {cert.not_valid_after}")
return cert
except Exception as e:
logger.warning(f"Failed to retrieve certificate from {url}: {e}")
return None
@staticmethod
def compare_certificates(cert1: x509.Certificate, cert2: x509.Certificate) -> bool:
"""Compare two certificates by serial number and fingerprint"""
try:
same_serial = cert1.serial_number == cert2.serial_number
same_fingerprint = cert1.fingerprint(cert1.signature_hash_algorithm) == \
cert2.fingerprint(cert2.signature_hash_algorithm)
logger.debug(f"Certificate comparison - Serial match: {same_serial}, Fingerprint match: {same_fingerprint}")
return same_serial and same_fingerprint
except Exception as e:
logger.error(f"Failed to compare certificates: {e}")
return False
class SSHManager:
"""Manages SSH connections and file transfers"""
def __init__(self, hostname: str, port: int, username: str, key_path: str):
self.hostname = hostname
self.port = port
self.username = username
self.key_path = key_path
self.ssh_client = None
def connect(self) -> bool:
"""Establish SSH connection"""
try:
self.ssh_client = paramiko.SSHClient()
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
logger.debug(f"Connecting to {self.username}@{self.hostname}:{self.port}")
logger.debug(f"Using SSH key: {self.key_path}")
private_key = paramiko.RSAKey.from_private_key_file(self.key_path)
self.ssh_client.connect(
hostname=self.hostname,
port=self.port,
username=self.username,
pkey=private_key,
timeout=30,
banner_timeout=30
)
logger.info(f"Successfully connected to {self.hostname}:{self.port}")
return True
except Exception as e:
logger.error(f"SSH connection failed to {self.hostname}:{self.port}: {e}")
return False
def upload_file(self, local_path: str, remote_path: str) -> bool:
"""Upload file via SCP"""
try:
logger.debug(f"Uploading {local_path} to {self.hostname}:{remote_path}")
with SCPClient(self.ssh_client.get_transport()) as scp:
scp.put(local_path, remote_path)
logger.info(f"Successfully uploaded {local_path} to {self.hostname}:{remote_path}")
return True
except Exception as e:
logger.error(f"File upload failed: {e}")
return False
def execute_command(self, command: str) -> Tuple[bool, str, str]:
"""Execute command on remote server"""
try:
logger.debug(f"Executing command on {self.hostname}: {command}")
stdin, stdout, stderr = self.ssh_client.exec_command(command, timeout=60)
exit_status = stdout.channel.recv_exit_status()
stdout_text = stdout.read().decode('utf-8')
stderr_text = stderr.read().decode('utf-8')
if exit_status == 0:
logger.info(f"Command executed successfully on {self.hostname}")
logger.debug(f"STDOUT: {stdout_text}")
else:
logger.error(f"Command failed with exit code {exit_status}")
logger.error(f"STDERR: {stderr_text}")
return exit_status == 0, stdout_text, stderr_text
except Exception as e:
logger.error(f"Command execution failed: {e}")
return False, "", str(e)
def disconnect(self):
"""Close SSH connection"""
if self.ssh_client:
self.ssh_client.close()
logger.debug(f"Disconnected from {self.hostname}")
class CertPusher:
"""Main application class"""
def __init__(self, config_file: str):
self.config_file = config_file
self.config = configparser.ConfigParser()
self.cert_manager = CertificateManager()
self.stats = {
'total': 0,
'uploaded': 0,
'skipped': 0,
'failed': 0
}
def load_config(self) -> bool:
"""Load configuration from INI file"""
try:
logger.info(f"Loading configuration from {self.config_file}")
self.config.read(self.config_file)
if 'global' not in self.config:
logger.error("Missing [global] section in config file")
return False
logger.info(f"Configuration loaded successfully")
logger.debug(f"Found {len(self.config.sections()) - 1} host(s) in configuration")
return True
except Exception as e:
logger.error(f"Failed to load configuration: {e}")
return False
def process_host(self, section: str) -> bool:
"""Process certificate deployment for a single host"""
try:
logger.info(f"\n{'='*60}")
logger.info(f"Processing host: {section}")
logger.info(f"{'='*60}")
self.stats['total'] += 1
# Get configuration
hostname = self.config.get(section, 'hostname')
port = self.config.getint(section, 'port', fallback=22)
username = self.config.get(section, 'username', fallback='root')
remote_cert_path = self.config.get(section, 'remote_cert_path')
post_upload_command = self.config.get(section, 'post_upload_command', fallback='')
check_url = self.config.get(section, 'check_url', fallback='')
# Determine SSH key to use
if self.config.has_option(section, 'ssh_key_path'):
ssh_key = self.config.get(section, 'ssh_key_path')
else:
ssh_key = self.config.get('global', 'default_ssh_key')
source_cert_path = self.config.get('global', 'source_cert_path')
logger.info(f"Host: {hostname}:{port}")
logger.info(f"Username: {username}")
logger.info(f"SSH Key: {ssh_key}")
logger.info(f"Remote path: {remote_cert_path}")
# Check if upload is needed
if check_url:
logger.info(f"Checking current certificate at: {check_url}")
source_cert = self.cert_manager.get_cert_from_file(source_cert_path)
remote_cert = self.cert_manager.get_cert_from_url(check_url)
if source_cert and remote_cert:
if self.cert_manager.compare_certificates(source_cert, remote_cert):
logger.info(f"Certificate on {hostname} is already up to date. Skipping upload.")
self.stats['skipped'] += 1
return True
else:
logger.info(f"Certificate on {hostname} is outdated. Upload needed.")
else:
logger.warning(f"Could not compare certificates. Proceeding with upload.")
# Connect and upload
ssh = SSHManager(hostname, port, username, ssh_key)
if not ssh.connect():
self.stats['failed'] += 1
return False
if not ssh.upload_file(source_cert_path, remote_cert_path):
ssh.disconnect()
self.stats['failed'] += 1
return False
# Execute post-upload command
if post_upload_command:
logger.info(f"Executing post-upload command: {post_upload_command}")
success, stdout, stderr = ssh.execute_command(post_upload_command)
if not success:
logger.warning(f"Post-upload command failed, but file was uploaded successfully")
ssh.disconnect()
self.stats['uploaded'] += 1
logger.info(f"✓ Successfully processed {section}")
return True
except Exception as e:
logger.error(f"Failed to process host {section}: {e}", exc_info=True)
self.stats['failed'] += 1
return False
def run(self):
"""Main execution method"""
logger.info("="*60)
logger.info("CertPusher - SSL Certificate Distribution Tool")
logger.info("="*60)
if not self.load_config():
logger.error("Configuration loading failed. Exiting.")
sys.exit(1)
# Verify source certificate exists
source_cert = self.config.get('global', 'source_cert_path')
if not os.path.exists(source_cert):
logger.error(f"Source certificate not found: {source_cert}")
sys.exit(1)
logger.info(f"Source certificate: {source_cert}")
# Process each host
for section in self.config.sections():
if section == 'global':
continue
try:
self.process_host(section)
except Exception as e:
logger.error(f"Unexpected error processing {section}: {e}", exc_info=True)
self.stats['failed'] += 1
# Print summary
logger.info("\n" + "="*60)
logger.info("DEPLOYMENT SUMMARY")
logger.info("="*60)
logger.info(f"Total hosts: {self.stats['total']}")
logger.info(f"✓ Uploaded: {self.stats['uploaded']}")
logger.info(f"○ Skipped (up to date): {self.stats['skipped']}")
logger.info(f"✗ Failed: {self.stats['failed']}")
logger.info("="*60)
if self.stats['failed'] > 0:
sys.exit(1)
if __name__ == '__main__':
if len(sys.argv) < 2:
print("Usage: python certpusher.py <config_file>")
print("Example: python certpusher.py config.ini")
sys.exit(1)
config_file = sys.argv[1]
if not os.path.exists(config_file):
print(f"Error: Configuration file '{config_file}' not found")
sys.exit(1)
pusher = CertPusher(config_file)
pusher.run()

40
config.ini.example Normal file
View File

@@ -0,0 +1,40 @@
[global]
# Path to source SSL certificate (can be fullchain or single cert)
source_cert_path = /etc/letsencrypt/live/example.com/fullchain.pem
# Default SSH key for all hosts (can be overridden per host)
default_ssh_key = /root/.ssh/id_rsa
[webserver1]
hostname = 192.168.1.100
port = 22
username = root
remote_cert_path = /etc/nginx/ssl/certificate.pem
post_upload_command = systemctl reload nginx
check_url = https://example.com
[webserver2]
hostname = 192.168.1.101
port = 2222
username = admin
# Override SSH key for this host
ssh_key_path = /root/.ssh/webserver2_key
remote_cert_path = /etc/apache2/ssl/fullchain.pem
post_upload_command = systemctl reload apache2
check_url = https://subdomain.example.com
[mailserver]
hostname = mail.example.com
port = 22
username = root
remote_cert_path = /etc/postfix/ssl/cert.pem
post_upload_command = systemctl restart postfix && systemctl restart dovecot
# No check_url - always upload
[proxmox]
hostname = 10.0.0.50
port = 22
username = root
remote_cert_path = /etc/pve/local/pveproxy-ssl.pem
post_upload_command = systemctl restart pveproxy
check_url = https://10.0.0.50:8006

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
paramiko>=3.4.0
scp>=0.14.5
cryptography>=41.0.0
requests>=2.31.0