diff --git a/certpusher.py b/certpusher.py index cb160a3..319caa1 100644 --- a/certpusher.py +++ b/certpusher.py @@ -285,93 +285,58 @@ class MikroTikManager(SSHManager): def __init__(self, hostname: str, port: int, username: str, key_path: str): super().__init__(hostname, port, username, key_path) - self.cert_name = "ssl-cert" + # Use unique, timestamped certificate name + self.cert_name = f"certpusher-{datetime.now().strftime('%Y%m')}" + # This will create names like: certpusher-202510 def check_certificate_expiry(self, source_cert: x509.Certificate) -> bool: """Check if certificate on MikroTik needs update""" try: logger.info("Checking MikroTik certificate") - # First, check what certificates exist + # Get source certificate common-name for matching + source_cn = source_cert.subject.rfc4514_string() + source_expiry = source_cert.not_valid_after_utc + + logger.info(f"Looking for cert with CN: {source_cn}") + logger.info(f"Source expires: {source_expiry}") + + # Search for certificate by common-name (more reliable than name) success, stdout, stderr = self.execute_command( - '/certificate print', + f'/certificate print detail where common-name~"{source_cn.split("CN=")[1] if "CN=" in source_cn else source_cn}"', ignore_error=True ) - if success and stdout: - logger.debug(f"Certificates on MikroTik:\n{stdout}") - - # Get detailed info about our certificate - # Try multiple patterns to find our cert - patterns = [ - f'name~"{self.cert_name}"', # ssl-cert - f'name~"{self.cert_name}_0"', # ssl-cert_0 - 'common-name~".*"', # Any cert - ] - - cert_output = None - used_pattern = None - - for pattern in patterns: - success, stdout, stderr = self.execute_command( - f'/certificate print detail where {pattern}', - ignore_error=True - ) - - if success and stdout and 'invalid-after' in stdout.lower(): - cert_output = stdout - used_pattern = pattern - logger.debug(f"Found certificate using pattern: {pattern}") - break - - if not cert_output: - logger.info("No certificate found on MikroTik. Upload needed.") + if not success or not stdout or 'invalid-after' not in stdout.lower(): + logger.info("Certificate not found on MikroTik. Upload needed.") return True - # Show raw output for debugging - logger.debug(f"Certificate details:\n{cert_output}") + logger.debug(f"Found certificate:\n{stdout}") - # Try multiple date formats + # Parse expiry date (multiple formats) invalid_after_match = re.search( - r'invalid-after[:\s=]+([a-zA-Z]{3}[/\s]\d{1,2}[/\s]\d{4}\s+\d{2}:\d{2}:\d{2})', - cert_output, + r'invalid-after[:\s=]+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', + stdout, re.IGNORECASE ) if not invalid_after_match: invalid_after_match = re.search( - r'invalid-after[:\s=]+(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', - cert_output, + r'invalid-after[:\s=]+([a-zA-Z]{3}[/\s]\d{1,2}[/\s]\d{4}\s+\d{2}:\d{2}:\d{2})', + stdout, re.IGNORECASE ) if not invalid_after_match: - # Try to find ANY date pattern - invalid_after_match = re.search( - r'(jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)[/\s-]\d{1,2}[/\s-]\d{4}\s+\d{2}:\d{2}:\d{2}', - cert_output, - re.IGNORECASE - ) - - if not invalid_after_match: - logger.warning(f"Could not parse expiry date") - logger.info(f"Raw output to help debug:") - logger.info(cert_output[:500]) # First 500 chars - logger.info("Cannot verify - proceeding with upload for safety.") + logger.warning("Could not parse expiry date") return True mikrotik_expiry_str = invalid_after_match.group(1) - logger.info(f"Found expiry: {mikrotik_expiry_str}") + logger.debug(f"Parsed expiry: {mikrotik_expiry_str}") - # Parse with flexible format + # Parse date mikrotik_expiry = None - date_formats = [ - '%b/%d/%Y %H:%M:%S', - '%b %d %Y %H:%M:%S', - '%Y-%m-%d %H:%M:%S', - ] - - for fmt in date_formats: + for fmt in ['%Y-%m-%d %H:%M:%S', '%b/%d/%Y %H:%M:%S', '%b %d %Y %H:%M:%S']: try: mikrotik_expiry = datetime.strptime(mikrotik_expiry_str, fmt) mikrotik_expiry = mikrotik_expiry.replace(tzinfo=timezone.utc) @@ -383,17 +348,15 @@ class MikroTikManager(SSHManager): logger.warning(f"Could not parse date: {mikrotik_expiry_str}") return True - source_expiry = source_cert.not_valid_after_utc - time_diff = abs((source_expiry - mikrotik_expiry).total_seconds()) - - logger.info(f"Source expires: {source_expiry}") logger.info(f"MikroTik expires: {mikrotik_expiry}") + time_diff = abs((source_expiry - mikrotik_expiry).total_seconds()) + if time_diff < 86400: - logger.info("✓ MikroTik certificate is current. Skipping.") + logger.info("✓ Certificate is current (within 24h). Skipping.") return False else: - logger.info(f"Certificate differs (diff: {time_diff/86400:.1f} days). Upload needed.") + logger.info(f"Certificate differs ({time_diff/86400:.1f} days). Upload needed.") return True except Exception as e: @@ -401,6 +364,82 @@ class MikroTikManager(SSHManager): import traceback logger.debug(traceback.format_exc()) return True + + def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: + """Upload certificate to MikroTik""" + try: + if check_first and source_cert: + if not self.check_certificate_expiry(source_cert): + return True, False + + logger.info("Deploying MikroTik certificate") + + # Disable service + self.execute_command('/ip service disable www-ssl', ignore_error=True) + + # Remove OLD certificates with our naming pattern + cleanup_commands = [ + f'/certificate remove [find name~"certpusher"]', + f'/file remove [find name~"certpusher"]', + ] + + for cmd in cleanup_commands: + self.execute_command(cmd, ignore_error=True) + + # Upload with unique name + logger.info(f"Uploading as: {self.cert_name}") + with SCPClient(self.ssh_client.get_transport()) as scp: + scp.put(cert_path, f'{self.cert_name}.pem') + + if key_path: + with SCPClient(self.ssh_client.get_transport()) as scp: + scp.put(key_path, f'{self.cert_name}-key.pem') + + # Import certificate + logger.info("Importing certificate") + import_cmd = f'/certificate import file-name={self.cert_name}.pem passphrase=""' + if key_path: + import_cmd += f' name={self.cert_name}' + + self.execute_command(import_cmd, timeout=30) + + import time + time.sleep(2) + + # Find the imported certificate (MikroTik adds _0, _1 suffixes) + success, stdout, stderr = self.execute_command( + f'/certificate print where name~"{self.cert_name}"' + ) + + if success and stdout: + logger.debug(f"Imported certificates:\n{stdout}") + + # Extract certificate name (usually cert_name_0) + cert_match = re.search(r'name="([^"]+)"', stdout) + if cert_match: + imported_name = cert_match.group(1) + logger.info(f"Certificate imported as: {imported_name}") + + # Configure service + config_commands = [ + f'/ip service set www-ssl certificate={imported_name}', + '/ip service enable www-ssl', + ] + + for cmd in config_commands: + self.execute_command(cmd, ignore_error=True) + else: + logger.warning("Could not find imported certificate name, using default") + self.execute_command(f'/ip service set www-ssl certificate={self.cert_name}_0', ignore_error=True) + self.execute_command('/ip service enable www-ssl', ignore_error=True) + + logger.info(f"✓ MikroTik deployment successful") + return True, True + + except Exception as e: + logger.error(f"MikroTik deployment failed: {e}") + return False, False + def upload_certificate(self, cert_path: str, key_path: str, check_first: bool, source_cert: x509.Certificate) -> Tuple[bool, bool]: