212 lines
7.2 KiB
Python
212 lines
7.2 KiB
Python
"""User management routes - Admin only"""
|
|
|
|
from flask import Blueprint, request, jsonify, session
|
|
from functools import wraps
|
|
from database import db
|
|
from database.models import User
|
|
from routes.auth_routes import admin_required
|
|
import logging
|
|
|
|
# Blueprint that groups the user-management endpoints under the /api/users prefix.
user_bp = Blueprint('users', __name__, url_prefix='/api/users')
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def api_admin_required(f):
    """Wrap a view so that only a logged-in admin can reach it.

    The wrapper answers with a JSON error instead of calling ``f`` when the
    caller is not allowed in:

    * 401 ``{'error': 'Not authenticated'}`` when the session carries no
      ``user_id``.
    * 403 ``{'error': 'Admin access required'}`` when the session's user
      cannot be loaded or is not flagged ``is_admin``.

    Otherwise the wrapped view runs with its original arguments and its
    result is returned unchanged.
    """

    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'user_id' in session:
            # The admin flag is read from the stored user record on every
            # request rather than from anything cached in the session.
            account = User.query.get(session['user_id'])
            if account and account.is_admin:
                return f(*args, **kwargs)
            return jsonify({'error': 'Admin access required'}), 403
        return jsonify({'error': 'Not authenticated'}), 401

    return guarded_view
|
|
|
|
|
|
@user_bp.route('', methods=['GET'])
@api_admin_required
def list_users():
    """List all users.

    Returns:
        On success, JSON ``{'success': True, 'users': [...]}`` where every
        entry carries ``id``, ``username``, ``is_admin``, ``created_at`` and
        ``last_login``. Timestamps are ISO-8601 strings, or ``None`` when the
        value is unset.
        On failure, ``{'error': <message>, 'success': False}`` with HTTP 500.
    """
    try:
        users = User.query.all()
        return jsonify({
            'success': True,
            'users': [{
                'id': u.id,
                'username': u.username,
                'is_admin': u.is_admin,
                # created_at is guarded the same way as last_login so a single
                # row without a timestamp cannot fail the whole listing.
                'created_at': u.created_at.isoformat() if u.created_at else None,
                'last_login': u.last_login.isoformat() if u.last_login else None,
            } for u in users]
        })
    except Exception as e:
        # Logger methods do not take print()'s ``flush`` keyword; passing it
        # raised TypeError inside this handler and hid the original error.
        # logger.exception also records the traceback.
        logger.exception("[USERS] Error listing users: %s", e)
        return jsonify({'error': str(e), 'success': False}), 500
|
|
|
|
|
|
@user_bp.route('', methods=['POST'])
@api_admin_required
def create_user():
    """Create a new user from a JSON request body.

    Expected body keys:
        username (str): required, at least 3 characters after stripping.
        password (str): required, at least 6 characters after stripping.
        is_admin: optional, defaults to ``False``.

    Returns:
        201 with ``success``, ``id``, ``username``, ``is_admin`` and
        ``message`` when the user was stored.
        400 with ``{'error': ..., 'success': False}`` for a missing/invalid
        body, failed validation or an already-taken username.
        500 with ``{'error': ..., 'success': False}`` on unexpected errors
        (the database session is rolled back first).
    """
    try:
        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests become a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object', 'success': False}), 400

        username = data.get('username', '')
        password = data.get('password', '')
        # Non-string values (numbers, null, lists) have no .strip(); reject
        # them explicitly instead of failing with AttributeError.
        if not isinstance(username, str) or not isinstance(password, str):
            return jsonify({'error': 'Username and password must be strings', 'success': False}), 400

        username = username.strip()
        password = password.strip()
        is_admin = data.get('is_admin', False)

        if not username or not password:
            return jsonify({'error': 'Username and password required', 'success': False}), 400

        if len(username) < 3:
            return jsonify({'error': 'Username must be at least 3 characters', 'success': False}), 400

        if len(password) < 6:
            return jsonify({'error': 'Password must be at least 6 characters', 'success': False}), 400

        # Check if exists
        if User.query.filter_by(username=username).first():
            return jsonify({'error': 'Username already exists', 'success': False}), 400

        user = User(username=username, is_admin=is_admin)
        user.set_password(password)

        db.session.add(user)
        db.session.commit()

        # No ``flush`` keyword here: Logger methods reject it with TypeError,
        # which previously turned an already-committed creation into a 500.
        logger.info("[USERS] Created user '%s' by %s", username, session.get('username'))

        return jsonify({
            'success': True,
            'id': user.id,
            'username': user.username,
            'is_admin': user.is_admin,
            'message': 'User created successfully'
        }), 201

    except Exception as e:
        db.session.rollback()
        logger.exception("[USERS] Error creating user: %s", e)
        return jsonify({'error': str(e), 'success': False}), 500
|
|
|
|
|
|
@user_bp.route('/<int:user_id>', methods=['PUT'])
@api_admin_required
def update_user(user_id):
    """Update a user's admin flag and/or password.

    Args:
        user_id: id of the user to modify, taken from the URL.

    Expected body keys (all optional):
        is_admin: new admin flag. An admin cannot clear their own flag.
        password (str): new password, at least 6 characters after stripping.
            An empty/falsy value is ignored.

    Returns:
        200 with ``success``, ``message``, ``id``, ``username`` and
        ``is_admin`` after a successful commit.
        404 when the user does not exist.
        400 for an invalid body, a self-demotion attempt or a bad password.
        500 on unexpected errors (the database session is rolled back first).
    """
    try:
        user = User.query.get(user_id)
        if not user:
            return jsonify({'error': 'User not found', 'success': False}), 404

        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests become a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object', 'success': False}), 400

        # Don't allow self-demotion
        if user.id == session['user_id'] and 'is_admin' in data and not data['is_admin']:
            return jsonify({'error': 'Cannot remove admin role from yourself', 'success': False}), 400

        # Validate everything before touching the model so that a rejected
        # request never leaves a half-applied change pending in the session.
        new_password = None
        raw_password = data.get('password')
        if raw_password:
            if not isinstance(raw_password, str):
                return jsonify({'error': 'Password must be a string', 'success': False}), 400
            new_password = raw_password.strip()
            if len(new_password) < 6:
                return jsonify({'error': 'Password must be at least 6 characters', 'success': False}), 400

        if 'is_admin' in data:
            user.is_admin = data['is_admin']

        if new_password is not None:
            user.set_password(new_password)

        db.session.commit()

        # No ``flush`` keyword here: Logger methods reject it with TypeError.
        logger.info("[USERS] Updated user '%s' by %s", user.username, session.get('username'))

        return jsonify({
            'success': True,
            'message': 'User updated successfully',
            'id': user.id,
            'username': user.username,
            'is_admin': user.is_admin
        })

    except Exception as e:
        db.session.rollback()
        logger.exception("[USERS] Error updating user: %s", e)
        return jsonify({'error': str(e), 'success': False}), 500
|
|
|
|
|
|
@user_bp.route('/<int:user_id>', methods=['DELETE'])
@api_admin_required
def delete_user(user_id):
    """Delete a user.

    Args:
        user_id: id of the user to remove, taken from the URL.

    Returns:
        200 with ``{'success': True, 'message': ...}`` once the deletion is
        committed.
        400 when the caller tries to delete their own account.
        404 when the user does not exist.
        500 on unexpected errors (the database session is rolled back first).
    """
    try:
        # Don't allow self-deletion
        if user_id == session['user_id']:
            return jsonify({'error': 'Cannot delete your own account', 'success': False}), 400

        user = User.query.get(user_id)
        if not user:
            return jsonify({'error': 'User not found', 'success': False}), 404

        # Keep the name in a local: it is still needed for the log line and
        # the response after the row has been deleted.
        username = user.username
        db.session.delete(user)
        db.session.commit()

        # No ``flush`` keyword here: Logger methods reject it with TypeError,
        # which previously turned an already-committed deletion into a 500.
        logger.info("[USERS] Deleted user '%s' by %s", username, session.get('username'))

        return jsonify({
            'success': True,
            'message': f'User {username} deleted successfully'
        })

    except Exception as e:
        db.session.rollback()
        logger.exception("[USERS] Error deleting user: %s", e)
        return jsonify({'error': str(e), 'success': False}), 500
|
|
|
|
|
|
@user_bp.route('/<int:user_id>/change-password', methods=['POST'])
def change_password(user_id):
    """Change a user's password.

    A logged-in user may change their own password and must supply the
    current one. An admin may set another user's password without it.

    Args:
        user_id: id of the account whose password is changed, from the URL.

    Expected body keys:
        old_password (str): required when changing one's own password.
        new_password (str): at least 6 characters after stripping.

    Returns:
        200 with ``{'success': True, 'message': ...}`` once committed.
        401 when nobody is logged in.
        404 when the target user does not exist.
        403 when a non-admin targets someone else's account.
        400 for an invalid body, a wrong current password or a new password
        that is too short.
        500 on unexpected errors (the database session is rolled back first).
    """
    try:
        # Check if logged in
        if 'user_id' not in session:
            return jsonify({'error': 'Not authenticated', 'success': False}), 401

        # Can only change own password, or admin can change others
        user = User.query.get(user_id)
        if not user:
            return jsonify({'error': 'User not found', 'success': False}), 404

        is_own_account = session['user_id'] == user_id

        if not is_own_account:
            # Read the admin flag from the stored user record, the same way
            # api_admin_required does, instead of trusting a value cached in
            # the session that may be stale after a demotion.
            acting_user = User.query.get(session['user_id'])
            if not acting_user or not acting_user.is_admin:
                return jsonify({'error': 'Forbidden', 'success': False}), 403

        # silent=True yields None instead of raising on a missing or malformed
        # JSON body, so bad requests become a 400 rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object', 'success': False}), 400

        # If changing own password, require old password
        if is_own_account:
            old_password = data.get('old_password', '')
            # A non-string value can never match; treat it as a wrong password
            # rather than failing on .strip().
            if not isinstance(old_password, str) or not user.check_password(old_password.strip()):
                return jsonify({'error': 'Current password is incorrect', 'success': False}), 400

        new_password = data.get('new_password', '')
        if not isinstance(new_password, str):
            return jsonify({'error': 'Password must be a string', 'success': False}), 400
        new_password = new_password.strip()
        if len(new_password) < 6:
            return jsonify({'error': 'Password must be at least 6 characters', 'success': False}), 400

        user.set_password(new_password)
        db.session.commit()

        # No ``flush`` keyword here: Logger methods reject it with TypeError,
        # which previously turned an already-committed change into a 500.
        logger.info("[USERS] Password changed for '%s'", user.username)

        return jsonify({
            'success': True,
            'message': 'Password changed successfully'
        })

    except Exception as e:
        db.session.rollback()
        logger.exception("[USERS] Error changing password: %s", e)
        return jsonify({'error': str(e), 'success': False}), 500
|