"""Certificate Management - Upload, List, Delete""" from flask import Blueprint, request, jsonify, session from functools import wraps from database import db from database.models import Certificate, VirtualHost from utils.cert_manager import parse_certificate, save_cert_file, delete_cert_file from config.settings import UPLOAD_FOLDER, MAX_CONTENT_LENGTH from datetime import datetime import os import logging cert_bp = Blueprint('certs', __name__, url_prefix='/api/certificates') logger = logging.getLogger(__name__) def login_required_api(f): """API version of login required""" @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session: return jsonify({'error': 'Not authenticated', 'success': False}), 401 return f(*args, **kwargs) return decorated_function @cert_bp.route('', methods=['GET']) @login_required_api def list_certificates(): """Get all certificates""" try: certs = Certificate.query.order_by(Certificate.created_at.desc()).all() return jsonify({ 'success': True, 'certificates': [{ 'id': c.id, 'name': c.name, 'common_name': c.common_name, 'expires_at': c.expires_at.isoformat() if c.expires_at else None, 'created_at': c.created_at.isoformat(), 'vhost_count': len(c.vhosts), 'is_expired': c.expires_at < datetime.utcnow() if c.expires_at else False } for c in certs] }) except Exception as e: logger.error(f"[CERTS] Error listing: {e}", flush=True) return jsonify({'error': str(e), 'success': False}), 500 @cert_bp.route('/', methods=['GET']) @login_required_api def get_certificate(cert_id): """Get certificate details""" try: cert = Certificate.query.get(cert_id) if not cert: return jsonify({'error': 'Certificate not found', 'success': False}), 404 return jsonify({ 'success': True, 'certificate': { 'id': cert.id, 'name': cert.name, 'common_name': cert.common_name, 'subject_alt_names': cert.get_san_list(), 'issued_at': cert.issued_at.isoformat() if cert.issued_at else None, 'expires_at': cert.expires_at.isoformat() if cert.expires_at else None, 'created_at': cert.created_at.isoformat(), 'vhosts': [{ 'id': v.id, 'name': v.name, 'hostname': v.hostname } for v in cert.vhosts] } }) except Exception as e: logger.error(f"[CERTS] Error getting cert {cert_id}: {e}", flush=True) return jsonify({'error': str(e), 'success': False}), 500 @cert_bp.route('', methods=['POST']) @login_required_api def upload_certificate(): """Upload SSL certificate (PEM format)""" try: # Check if file provided if 'cert_file' not in request.files: return jsonify({'error': 'No certificate file provided', 'success': False}), 400 file = request.files['cert_file'] cert_name = request.form.get('name', '').strip() if not file or file.filename == '': return jsonify({'error': 'No selected file', 'success': False}), 400 if not cert_name: return jsonify({'error': 'Certificate name required', 'success': False}), 400 # Check if name already exists if Certificate.query.filter_by(name=cert_name).first(): return jsonify({'error': 'Certificate name already exists', 'success': False}), 400 # Read file content cert_content = file.read().decode('utf-8') # Validate and parse certificate cert_data = parse_certificate(cert_content) if 'error' in cert_data: return jsonify({'error': cert_data['error'], 'success': False}), 400 # Create certificate record cert = Certificate( name=cert_name, cert_content=cert_content, cert_only=cert_data.get('cert_only'), key_only=cert_data.get('key_only'), common_name=cert_data.get('common_name'), issued_at=cert_data.get('issued_at'), expires_at=cert_data.get('expires_at') ) if cert_data.get('subject_alt_names'): cert.set_san_list(cert_data['subject_alt_names']) db.session.add(cert) db.session.flush() # Save cert file to disk cert_path = os.path.join(UPLOAD_FOLDER, f'{cert_name}.pem') if not save_cert_file(cert_path, cert_content): db.session.rollback() return jsonify({'error': 'Failed to save certificate file', 'success': False}), 500 db.session.commit() logger.info(f"[CERTS] Uploaded certificate '{cert_name}' by {session.get('username')}", flush=True) return jsonify({ 'success': True, 'id': cert.id, 'name': cert.name, 'message': 'Certificate uploaded successfully' }), 201 except Exception as e: db.session.rollback() logger.error(f"[CERTS] Error uploading cert: {e}", flush=True) return jsonify({'error': str(e), 'success': False}), 500 @cert_bp.route('/', methods=['DELETE']) @login_required_api def delete_certificate(cert_id): """Delete certificate""" try: cert = Certificate.query.get(cert_id) if not cert: return jsonify({'error': 'Certificate not found', 'success': False}), 404 # Check if certificate is in use if cert.vhosts: vhost_names = [v.name for v in cert.vhosts] return jsonify({ 'error': f'Certificate is in use by vhosts: {", ".join(vhost_names)}', 'success': False }), 400 cert_name = cert.name # Delete file from disk cert_path = os.path.join(UPLOAD_FOLDER, f'{cert_name}.pem') delete_cert_file(cert_path) # Delete from database db.session.delete(cert) db.session.commit() logger.info(f"[CERTS] Deleted certificate '{cert_name}' by {session.get('username')}", flush=True) return jsonify({ 'success': True, 'message': f'Certificate {cert_name} deleted successfully' }) except Exception as e: db.session.rollback() logger.error(f"[CERTS] Error deleting cert: {e}", flush=True) return jsonify({'error': str(e), 'success': False}), 500 @cert_bp.route('//export', methods=['GET']) @login_required_api def export_certificate(cert_id): """Download certificate in PEM format""" try: cert = Certificate.query.get(cert_id) if not cert: return jsonify({'error': 'Certificate not found', 'success': False}), 404 # Read from disk cert_path = os.path.join(UPLOAD_FOLDER, f'{cert.name}.pem') if not os.path.exists(cert_path): return jsonify({'error': 'Certificate file not found', 'success': False}), 404 with open(cert_path, 'r') as f: cert_content = f.read() return jsonify({ 'success': True, 'name': cert.name, 'content': cert_content }) except Exception as e: logger.error(f"[CERTS] Error exporting cert: {e}", flush=True) return jsonify({'error': str(e), 'success': False}), 500