218 lines
7.6 KiB
Python
218 lines
7.6 KiB
Python
"""Certificate Management - Upload, List, Delete"""
|
|
|
|
from flask import Blueprint, request, jsonify, session
|
|
from functools import wraps
|
|
from database import db
|
|
from database.models import Certificate, VirtualHost
|
|
from utils.cert_manager import parse_certificate, save_cert_file, delete_cert_file
|
|
from config.settings import UPLOAD_FOLDER, MAX_CONTENT_LENGTH
|
|
from datetime import datetime
|
|
import os
|
|
import logging
|
|
|
|
# Module-level logger; the handlers in this module tag their messages "[CERTS]".
logger = logging.getLogger(__name__)

# Blueprint grouping the certificate endpoints under /api/certificates.
cert_bp = Blueprint('certs', __name__, url_prefix='/api/certificates')
|
|
|
|
|
|
def login_required_api(f):
    """Decorate a view so that callers without a session get a JSON 401.

    The wrapped view runs only when ``'user_id'`` is present in the Flask
    ``session``; otherwise the response is
    ``{'error': 'Not authenticated', 'success': False}`` with status 401.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        # Authenticated: hand the call straight through to the real view.
        if 'user_id' in session:
            return f(*args, **kwargs)

        denied = {'error': 'Not authenticated', 'success': False}
        return jsonify(denied), 401

    return guarded_view
|
|
|
|
|
|
@cert_bp.route('', methods=['GET'])
@login_required_api
def list_certificates():
    """Return a JSON summary of every certificate, newest first.

    Returns:
        ``{'success': True, 'certificates': [...]}`` where each entry holds
        ``id``, ``name``, ``common_name``, ``expires_at``, ``created_at``,
        ``vhost_count`` and ``is_expired``; or a 500 response with
        ``{'error': ..., 'success': False}`` if anything raises.
    """
    try:
        certs = Certificate.query.order_by(Certificate.created_at.desc()).all()

        # Take the timestamp once so every row is judged against the same instant.
        # NOTE(review): naive UTC comparison - presumably expires_at is stored
        # as a naive UTC datetime; confirm against the model.
        now = datetime.utcnow()

        return jsonify({
            'success': True,
            'certificates': [{
                'id': c.id,
                'name': c.name,
                'common_name': c.common_name,
                'expires_at': c.expires_at.isoformat() if c.expires_at else None,
                'created_at': c.created_at.isoformat(),
                'vhost_count': len(c.vhosts),
                # A certificate without an expiry date is reported as not expired.
                'is_expired': c.expires_at < now if c.expires_at else False,
            } for c in certs],
        })
    except Exception as e:
        # Logger methods do not accept print()'s ``flush`` keyword; passing it
        # raised TypeError here and masked the original error.
        logger.exception("[CERTS] Error listing: %s", e)
        return jsonify({'error': str(e), 'success': False}), 500
|
|
|
|
|
|
@cert_bp.route('/<int:cert_id>', methods=['GET'])
@login_required_api
def get_certificate(cert_id):
    """Return the details of one certificate as JSON.

    Args:
        cert_id: Primary key of the certificate, taken from the URL.

    Returns:
        ``{'success': True, 'certificate': {...}}`` including the SAN list
        from ``cert.get_san_list()`` and the ``id``/``name``/``hostname`` of
        each attached vhost; 404 if no certificate has that id; 500 with the
        error text if anything raises.
    """
    try:
        cert = Certificate.query.get(cert_id)
        if not cert:
            return jsonify({'error': 'Certificate not found', 'success': False}), 404

        return jsonify({
            'success': True,
            'certificate': {
                'id': cert.id,
                'name': cert.name,
                'common_name': cert.common_name,
                'subject_alt_names': cert.get_san_list(),
                'issued_at': cert.issued_at.isoformat() if cert.issued_at else None,
                'expires_at': cert.expires_at.isoformat() if cert.expires_at else None,
                'created_at': cert.created_at.isoformat(),
                'vhosts': [{
                    'id': v.id,
                    'name': v.name,
                    'hostname': v.hostname,
                } for v in cert.vhosts],
            },
        })
    except Exception as e:
        # Logger methods do not accept print()'s ``flush`` keyword; passing it
        # raised TypeError here and masked the original error.
        logger.exception("[CERTS] Error getting cert %s: %s", cert_id, e)
        return jsonify({'error': str(e), 'success': False}), 500
|
|
|
|
|
|
def _is_safe_cert_name(name):
    """Return True if *name* can be used as a single file-name component.

    The upload handler builds ``<UPLOAD_FOLDER>/<name>.pem`` from a
    client-supplied name, so path separators and NUL bytes are rejected to
    keep the resulting path inside the upload folder.
    """
    return not any(ch in name for ch in ('/', '\\', '\x00'))


@cert_bp.route('', methods=['POST'])
@login_required_api
def upload_certificate():
    """Upload a PEM certificate and register it under a unique name.

    Expects a multipart form with a ``cert_file`` part and a ``name`` field.

    Returns:
        201 with ``id``, ``name`` and ``message`` on success.
        400 when the file or name is missing, the name contains path
        separators, the name is already taken, the payload is not UTF-8
        text, or ``parse_certificate`` reports an ``'error'``.
        500 when the file cannot be written or any other exception occurs
        (the database session is rolled back first).
    """
    try:
        if 'cert_file' not in request.files:
            return jsonify({'error': 'No certificate file provided', 'success': False}), 400

        upload = request.files['cert_file']
        cert_name = request.form.get('name', '').strip()

        if not upload or upload.filename == '':
            return jsonify({'error': 'No selected file', 'success': False}), 400

        if not cert_name:
            return jsonify({'error': 'Certificate name required', 'success': False}), 400

        # The name becomes part of a filesystem path below; refuse anything
        # that could escape UPLOAD_FOLDER (e.g. "../x" or an absolute path).
        if not _is_safe_cert_name(cert_name):
            return jsonify({
                'error': 'Certificate name must not contain path separators',
                'success': False
            }), 400

        if Certificate.query.filter_by(name=cert_name).first():
            return jsonify({'error': 'Certificate name already exists', 'success': False}), 400

        # A binary (e.g. DER) upload is a client error, not a server failure.
        try:
            cert_content = upload.read().decode('utf-8')
        except UnicodeDecodeError:
            return jsonify({
                'error': 'Certificate file must be PEM (text) encoded',
                'success': False
            }), 400

        # parse_certificate signals failure through an 'error' key.
        cert_data = parse_certificate(cert_content)
        if 'error' in cert_data:
            return jsonify({'error': cert_data['error'], 'success': False}), 400

        cert = Certificate(
            name=cert_name,
            cert_content=cert_content,
            cert_only=cert_data.get('cert_only'),
            key_only=cert_data.get('key_only'),
            common_name=cert_data.get('common_name'),
            issued_at=cert_data.get('issued_at'),
            expires_at=cert_data.get('expires_at'),
        )

        if cert_data.get('subject_alt_names'):
            cert.set_san_list(cert_data['subject_alt_names'])

        # Flush (not commit) so the row can still be rolled back if writing
        # the file to disk fails.
        db.session.add(cert)
        db.session.flush()

        cert_path = os.path.join(UPLOAD_FOLDER, f'{cert_name}.pem')
        if not save_cert_file(cert_path, cert_content):
            db.session.rollback()
            return jsonify({'error': 'Failed to save certificate file', 'success': False}), 500

        db.session.commit()

        # Logger methods do not accept print()'s ``flush`` keyword; with INFO
        # enabled the old call raised TypeError after the commit succeeded.
        logger.info(
            "[CERTS] Uploaded certificate '%s' by %s",
            cert_name,
            session.get('username'),
        )

        return jsonify({
            'success': True,
            'id': cert.id,
            'name': cert.name,
            'message': 'Certificate uploaded successfully'
        }), 201

    except Exception as e:
        db.session.rollback()
        logger.exception("[CERTS] Error uploading cert: %s", e)
        return jsonify({'error': str(e), 'success': False}), 500
|
|
|
|
|
|
@cert_bp.route('/<int:cert_id>', methods=['DELETE'])
@login_required_api
def delete_certificate(cert_id):
    """Delete a certificate's file and database record.

    Args:
        cert_id: Primary key of the certificate, taken from the URL.

    Returns:
        200 with a confirmation message on success; 404 if no certificate
        has that id; 400 listing the vhost names if the certificate is still
        attached to any vhost; 500 with the error text if anything raises
        (the database session is rolled back first).
    """
    try:
        cert = Certificate.query.get(cert_id)
        if not cert:
            return jsonify({'error': 'Certificate not found', 'success': False}), 404

        # Refuse to delete a certificate that vhosts still reference.
        if cert.vhosts:
            vhost_names = [v.name for v in cert.vhosts]
            return jsonify({
                'error': f'Certificate is in use by vhosts: {", ".join(vhost_names)}',
                'success': False
            }), 400

        # Keep the name: it is needed for the log/response after the delete.
        cert_name = cert.name

        # NOTE(review): the file is removed before the DB commit, so a failed
        # commit leaves the record without its file on disk.
        cert_path = os.path.join(UPLOAD_FOLDER, f'{cert_name}.pem')
        delete_cert_file(cert_path)

        db.session.delete(cert)
        db.session.commit()

        # Logger methods do not accept print()'s ``flush`` keyword; with INFO
        # enabled the old call raised TypeError after the commit succeeded.
        logger.info(
            "[CERTS] Deleted certificate '%s' by %s",
            cert_name,
            session.get('username'),
        )

        return jsonify({
            'success': True,
            'message': f'Certificate {cert_name} deleted successfully'
        })

    except Exception as e:
        db.session.rollback()
        logger.exception("[CERTS] Error deleting cert: %s", e)
        return jsonify({'error': str(e), 'success': False}), 500
|
|
|
|
|
|
@cert_bp.route('/<int:cert_id>/export', methods=['GET'])
@login_required_api
def export_certificate(cert_id):
    """Return a certificate's PEM text, read from its file on disk.

    Args:
        cert_id: Primary key of the certificate, taken from the URL.

    Returns:
        ``{'success': True, 'name': ..., 'content': ...}`` on success; 404 if
        the record or its ``<UPLOAD_FOLDER>/<name>.pem`` file does not exist;
        500 with the error text if anything else raises.
    """
    try:
        cert = Certificate.query.get(cert_id)
        if not cert:
            return jsonify({'error': 'Certificate not found', 'success': False}), 404

        cert_path = os.path.join(UPLOAD_FOLDER, f'{cert.name}.pem')

        # Open directly instead of checking os.path.exists first, so a file
        # removed between the check and the read still yields a clean 404.
        try:
            with open(cert_path, 'r') as f:
                cert_content = f.read()
        except FileNotFoundError:
            return jsonify({'error': 'Certificate file not found', 'success': False}), 404

        return jsonify({
            'success': True,
            'name': cert.name,
            'content': cert_content
        })

    except Exception as e:
        # Logger methods do not accept print()'s ``flush`` keyword; passing it
        # raised TypeError here and masked the original error.
        logger.exception("[CERTS] Error exporting cert: %s", e)
        return jsonify({'error': str(e), 'success': False}), 500
|