rewrite
This commit is contained in:
@@ -1,10 +1,9 @@
|
|||||||
"""Authentication routes - Login, Logout, Register"""
|
"""Authentication routes - Login, Logout"""
|
||||||
|
|
||||||
from flask import Blueprint, render_template, request, redirect, url_for, session, jsonify, flash
|
from flask import Blueprint, render_template, request, redirect, url_for, session, jsonify
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
from database import db
|
from database import db
|
||||||
from database.models import User
|
from database.models import User
|
||||||
from datetime import datetime
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
auth_bp = Blueprint('auth', __name__)
|
auth_bp = Blueprint('auth', __name__)
|
||||||
@@ -12,91 +11,94 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
def login_required(f):
|
def login_required(f):
|
||||||
"""Decorator to require login"""
|
"""Decorator - require user to be logged in"""
|
||||||
@wraps(f)
|
|
||||||
def decorated_function(*args, **kwargs):
|
|
||||||
if 'user_id' not in session:
|
|
||||||
return redirect(url_for('auth.login', next=request.url))
|
|
||||||
return f(*args, **kwargs)
|
|
||||||
return decorated_function
|
|
||||||
|
|
||||||
|
|
||||||
def admin_required(f):
|
|
||||||
"""Decorator to require admin role"""
|
|
||||||
@wraps(f)
|
@wraps(f)
|
||||||
def decorated_function(*args, **kwargs):
|
def decorated_function(*args, **kwargs):
|
||||||
if 'user_id' not in session:
|
if 'user_id' not in session:
|
||||||
return redirect(url_for('auth.login'))
|
return redirect(url_for('auth.login'))
|
||||||
|
|
||||||
user = User.query.get(session['user_id'])
|
|
||||||
if not user or not user.is_admin:
|
|
||||||
flash('Admin access required', 'danger')
|
|
||||||
return redirect(url_for('main.index'))
|
|
||||||
|
|
||||||
return f(*args, **kwargs)
|
return f(*args, **kwargs)
|
||||||
return decorated_function
|
return decorated_function
|
||||||
|
|
||||||
|
|
||||||
@auth_bp.route('/login', methods=['GET', 'POST'])
|
@auth_bp.route('/login', methods=['GET', 'POST'])
|
||||||
def login():
|
def login():
|
||||||
"""Login page"""
|
"""Login page and authentication"""
|
||||||
if request.method == 'POST':
|
if request.method == 'GET':
|
||||||
username = request.form.get('username')
|
if 'user_id' in session:
|
||||||
password = request.form.get('password')
|
|
||||||
|
|
||||||
if not username or not password:
|
|
||||||
flash('Username and password required', 'warning')
|
|
||||||
return redirect(url_for('auth.login'))
|
|
||||||
|
|
||||||
user = User.query.filter_by(username=username).first()
|
|
||||||
|
|
||||||
if user and user.check_password(password):
|
|
||||||
session['user_id'] = user.id
|
|
||||||
session['username'] = user.username
|
|
||||||
session['is_admin'] = user.is_admin
|
|
||||||
session.permanent = True
|
|
||||||
|
|
||||||
user.last_login = datetime.utcnow()
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
logger.info(f"[AUTH] User '{username}' logged in", flush=True)
|
|
||||||
|
|
||||||
next_page = request.args.get('next')
|
|
||||||
if next_page and next_page.startswith('/'):
|
|
||||||
return redirect(next_page)
|
|
||||||
return redirect(url_for('main.index'))
|
return redirect(url_for('main.index'))
|
||||||
|
|
||||||
logger.warning(f"[AUTH] Failed login attempt for '{username}'", flush=True)
|
return render_template('auth/login.html')
|
||||||
flash('Invalid username or password', 'danger')
|
|
||||||
|
|
||||||
return render_template('login.html')
|
# POST - process login
|
||||||
|
username = request.form.get('username', '').strip()
|
||||||
|
password = request.form.get('password', '').strip()
|
||||||
|
|
||||||
|
if not username or not password:
|
||||||
|
return render_template('auth/login.html', error='Username and password required'), 400
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Find user
|
||||||
|
user = User.query.filter_by(username=username).first()
|
||||||
|
|
||||||
|
if not user:
|
||||||
|
logger.warning(f"[AUTH] Login failed - user '{username}' not found", flush=True)
|
||||||
|
return render_template('auth/login.html', error='Invalid credentials'), 401
|
||||||
|
|
||||||
|
# Check password
|
||||||
|
if not user.check_password(password):
|
||||||
|
logger.warning(f"[AUTH] Login failed - wrong password for '{username}'", flush=True)
|
||||||
|
return render_template('auth/login.html', error='Invalid credentials'), 401
|
||||||
|
|
||||||
|
session.clear()
|
||||||
|
session['user_id'] = user.id
|
||||||
|
session['username'] = user.username
|
||||||
|
session['is_admin'] = user.is_admin
|
||||||
|
session.permanent = True
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
user.last_login = datetime.utcnow()
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
logger.info(f"[AUTH] User '{username}' logged in successfully", flush=True)
|
||||||
|
|
||||||
|
return redirect(url_for('main.index'))
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[AUTH] Login error: {e}", flush=True)
|
||||||
|
return render_template('auth/login.html', error='Login error'), 500
|
||||||
|
|
||||||
|
|
||||||
@auth_bp.route('/logout')
|
@auth_bp.route('/logout', methods=['GET', 'POST'])
|
||||||
def logout():
|
def logout():
|
||||||
"""Logout"""
|
"""Logout"""
|
||||||
username = session.get('username', 'Unknown')
|
username = session.get('username', 'unknown')
|
||||||
session.clear()
|
session.clear()
|
||||||
logger.info(f"[AUTH] User '{username}' logged out", flush=True)
|
logger.info(f"[AUTH] User '{username}' logged out", flush=True)
|
||||||
flash('You have been logged out', 'info')
|
|
||||||
return redirect(url_for('auth.login'))
|
return redirect(url_for('auth.login'))
|
||||||
|
|
||||||
|
|
||||||
@auth_bp.route('/api/current-user')
|
@auth_bp.route('/api/current-user', methods=['GET'])
|
||||||
def api_current_user():
|
def current_user():
|
||||||
"""Get current user info (API)"""
|
"""Get current logged in user info"""
|
||||||
if 'user_id' not in session:
|
if 'user_id' not in session:
|
||||||
return jsonify({'error': 'Not authenticated'}), 401
|
return jsonify({'error': 'Not authenticated', 'success': False}), 401
|
||||||
|
|
||||||
user = User.query.get(session['user_id'])
|
try:
|
||||||
if not user:
|
user = User.query.get(session['user_id'])
|
||||||
session.clear()
|
|
||||||
return jsonify({'error': 'User not found'}), 404
|
|
||||||
|
|
||||||
return jsonify({
|
if not user:
|
||||||
'id': user.id,
|
session.clear()
|
||||||
'username': user.username,
|
return jsonify({'error': 'User not found', 'success': False}), 401
|
||||||
'is_admin': user.is_admin,
|
|
||||||
'created_at': user.created_at.isoformat() if user.created_at else None,
|
return jsonify({
|
||||||
'last_login': user.last_login.isoformat() if user.last_login else None
|
'success': True,
|
||||||
})
|
'user': {
|
||||||
|
'id': user.id,
|
||||||
|
'username': user.username,
|
||||||
|
'is_admin': user.is_admin
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[AUTH] Error getting current user: {e}", flush=True)
|
||||||
|
return jsonify({'error': str(e), 'success': False}), 500
|
||||||
|
|||||||
Reference in New Issue
Block a user