diff --git a/certpusher.py b/certpusher.py index 319caa1..2fe89e7 100644 --- a/certpusher.py +++ b/certpusher.py @@ -285,35 +285,39 @@ class MikroTikManager(SSHManager): def __init__(self, hostname: str, port: int, username: str, key_path: str): super().__init__(hostname, port, username, key_path) - # Use unique, timestamped certificate name - self.cert_name = f"certpusher-{datetime.now().strftime('%Y%m')}" - # This will create names like: certpusher-202510 + # Use simple name without special characters + self.cert_name = "letsencrypt" + self.key_name = "letsencrypt-key" def check_certificate_expiry(self, source_cert: x509.Certificate) -> bool: """Check if certificate on MikroTik needs update""" try: logger.info("Checking MikroTik certificate") - # Get source certificate common-name for matching + # Get source certificate common-name source_cn = source_cert.subject.rfc4514_string() source_expiry = source_cert.not_valid_after_utc - logger.info(f"Looking for cert with CN: {source_cn}") + # Extract CN value (e.g., "*.linuxiarz.pl" from "CN=*.linuxiarz.pl") + cn_value = source_cn.split("CN=")[1].split(",")[0] if "CN=" in source_cn else source_cn + + logger.info(f"Looking for cert with CN: {cn_value}") logger.info(f"Source expires: {source_expiry}") - # Search for certificate by common-name (more reliable than name) + # Search by common-name (escape wildcards for MikroTik) + cn_search = cn_value.replace("*", "\\*") success, stdout, stderr = self.execute_command( - f'/certificate print detail where common-name~"{source_cn.split("CN=")[1] if "CN=" in source_cn else source_cn}"', + f'/certificate print detail where common-name="{cn_search}"', ignore_error=True ) if not success or not stdout or 'invalid-after' not in stdout.lower(): - logger.info("Certificate not found on MikroTik. Upload needed.") + logger.info("Certificate not found. Upload needed.") return True - logger.debug(f"Found certificate:\n{stdout}") + logger.debug(f"Found certificate:\n{stdout[:500]}") - # Parse expiry date (multiple formats) + # Parse expiry (RouterOS 7.x format: 2026-01-22 08:34:12) invalid_after_match = re.search( r'invalid-after[:\s=]+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', stdout, @@ -321,30 +325,15 @@ class MikroTikManager(SSHManager): ) if not invalid_after_match: - invalid_after_match = re.search( - r'invalid-after[:\s=]+([a-zA-Z]{3}[/\s]\d{1,2}[/\s]\d{4}\s+\d{2}:\d{2}:\d{2})', - stdout, - re.IGNORECASE - ) - - if not invalid_after_match: - logger.warning("Could not parse expiry date") + logger.warning("Could not parse expiry") return True mikrotik_expiry_str = invalid_after_match.group(1) - logger.debug(f"Parsed expiry: {mikrotik_expiry_str}") - # Parse date - mikrotik_expiry = None - for fmt in ['%Y-%m-%d %H:%M:%S', '%b/%d/%Y %H:%M:%S', '%b %d %Y %H:%M:%S']: - try: - mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, fmt) - mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc) - break - except ValueError: - continue - - if not mikrotik_expiry: + try: + mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, '%Y-%m-%d %H:%M:%S') + mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc) + except ValueError: logger.warning(f"Could not parse date: {mikrotik_expiry_str}") return True @@ -352,8 +341,9 @@ class MikroTikManager(SSHManager): time_diff = abs((source_expiry - mikrotik_expiry).total_seconds()) + # Allow 24h tolerance if time_diff < 86400: - logger.info("✓ Certificate is current (within 24h). Skipping.") + logger.info("✓ Certificate is current. Skipping.") return False else: logger.info(f"Certificate differs ({time_diff/86400:.1f} days). Upload needed.") @@ -365,6 +355,118 @@ class MikroTikManager(SSHManager): logger.debug(traceback.format_exc()) return True + def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: + """Upload certificate to MikroTik""" + try: + if check_first and source_cert: + if not self.check_certificate_expiry(source_cert): + return True, False + + logger.info("Deploying MikroTik certificate") + + # Step 1: Disable www-ssl + logger.debug("Disabling www-ssl service") + self.execute_command('/ip service disable www-ssl', ignore_error=True) + + import time + time.sleep(1) + + # Step 2: Remove old certificates with our name + logger.debug("Cleaning up old certificates") + cleanup_commands = [ + f'/certificate remove [find name~"{self.cert_name}"]', + f'/file remove "{self.cert_name}.pem"', + f'/file remove "{self.key_name}.pem"', + ] + + for cmd in cleanup_commands: + self.execute_command(cmd, ignore_error=True) + + time.sleep(1) + + # Step 3: Upload certificate + logger.info(f"Uploading certificate as: {self.cert_name}.pem") + with SCPClient(self.ssh_client.get_transport()) as scp: + scp.put(cert_path, f'{self.cert_name}.pem') + + # Step 4: Upload private key + if key_path: + logger.info(f"Uploading key as: {self.key_name}.pem") + with SCPClient(self.ssh_client.get_transport()) as scp: + scp.put(key_path, f'{self.key_name}.pem') + + time.sleep(1) + + # Step 5: Import certificate (fullchain includes both cert and chain) + logger.info("Importing certificate") + import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""' + success, stdout, stderr = self.execute_command(import_cmd, timeout=30) + + if not success: + logger.error(f"Certificate import failed: {stderr}") + # Show available files for debugging + self.execute_command('/file print where name~".pem"') + return False, False + + time.sleep(2) + + # Step 6: Find imported certificate name + success, stdout, stderr = self.execute_command( + f'/certificate print terse where name~"{self.cert_name}"' + ) + + if not success or not stdout: + logger.error("Could not find imported certificate") + return False, False + + logger.debug(f"Imported certificates:\n{stdout}") + + # Extract certificate name (MikroTik adds _0, _1 suffixes) + # Format: 0 name="letsencrypt_0" ... + cert_names = re.findall(r'name="([^"]+)"', stdout) + + if not cert_names: + logger.error("Could not parse certificate names") + return False, False + + # Use first certificate (usually the actual cert, not CA) + imported_cert_name = cert_names[0] + logger.info(f"Using certificate: {imported_cert_name}") + + # Step 7: Configure www-ssl service + logger.info("Configuring www-ssl service") + config_cmd = f'/ip service set www-ssl certificate={imported_cert_name}' + success, stdout, stderr = self.execute_command(config_cmd) + + if not success: + logger.error(f"Failed to set certificate: {stderr}") + # Show available certificates + self.execute_command('/certificate print') + return False, False + + # Step 8: Enable www-ssl + self.execute_command('/ip service enable www-ssl', ignore_error=True) + + time.sleep(1) + + # Step 9: Verify + success, stdout, stderr = self.execute_command( + '/ip service print where name="www-ssl"' + ) + + if success: + logger.debug(f"Service status:\n{stdout}") + + logger.info(f"✓ MikroTik deployment successful") + return True, True + + except Exception as e: + logger.error(f"MikroTik deployment failed: {e}") + import traceback + logger.debug(traceback.format_exc()) + return False, False + + def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: """Upload certificate to MikroTik""" try: