This commit is contained in:
Mateusz Gruszczyński
2025-11-04 09:56:37 +01:00
parent 32ef62e4ac
commit addb21bc3e
34 changed files with 3864 additions and 367 deletions

123
utils/cert_manager.py Normal file
View File

@@ -0,0 +1,123 @@
"""Certificate Management Utilities - Parse, Validate, Save"""
import os
import re
from datetime import datetime
from cryptography import x509
from cryptography.hazmat.backends import default_backend
import logging
logger = logging.getLogger(__name__)
def parse_certificate(pem_content):
"""
Parse PEM certificate and extract metadata
Returns: dict with 'common_name', 'expires_at', 'subject_alt_names', 'error'
"""
try:
# Split cert and key if combined
cert_only = None
key_only = None
# Extract certificate
cert_match = re.search(
r'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----',
pem_content,
re.DOTALL
)
if not cert_match:
return {'error': 'No valid certificate found in PEM file'}
cert_only = cert_match.group(0)
# Extract private key if present
key_match = re.search(
r'-----BEGIN (?:RSA )?PRIVATE KEY-----.*?-----END (?:RSA )?PRIVATE KEY-----',
pem_content,
re.DOTALL
)
if key_match:
key_only = key_match.group(0)
# Parse certificate
try:
cert = x509.load_pem_x509_certificate(
cert_only.encode(),
default_backend()
)
except Exception as e:
return {'error': f'Invalid certificate: {str(e)}'}
# Extract common name
common_name = None
try:
common_name = cert.subject.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME
)[0].value
except:
pass
# Extract Subject Alternative Names (SAN)
subject_alt_names = []
try:
san_ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
for name in san_ext.value:
if isinstance(name, x509.DNSName):
subject_alt_names.append(name.value)
except:
pass
# Extract dates
issued_at = cert.not_valid_before if cert.not_valid_before else None
expires_at = cert.not_valid_after if cert.not_valid_after else None
result = {
'common_name': common_name,
'cert_only': cert_only,
'key_only': key_only,
'issued_at': issued_at,
'expires_at': expires_at,
'subject_alt_names': subject_alt_names
}
logger.info(f"[CERT] Parsed certificate: CN={common_name}, expires={expires_at}", flush=True)
return result
except Exception as e:
logger.error(f"[CERT] Error parsing certificate: {e}", flush=True)
return {'error': f'Failed to parse certificate: {str(e)}'}
def save_cert_file(cert_path, cert_content):
"""Save certificate to disk"""
try:
# Create directory if not exists
os.makedirs(os.path.dirname(cert_path), exist_ok=True)
# Write file
with open(cert_path, 'w') as f:
f.write(cert_content)
# Set permissions (owner only can read)
os.chmod(cert_path, 0o600)
logger.info(f"[CERT] Saved certificate file: {cert_path}", flush=True)
return True
except Exception as e:
logger.error(f"[CERT] Error saving certificate file: {e}", flush=True)
return False
def delete_cert_file(cert_path):
"""Delete certificate file from disk"""
try:
if os.path.exists(cert_path):
os.remove(cert_path)
logger.info(f"[CERT] Deleted certificate file: {cert_path}", flush=True)
return True
except Exception as e:
logger.error(f"[CERT] Error deleting certificate file: {e}", flush=True)
return False