diff --git a/app.py b/app.py index aebc889..e169907 100644 --- a/app.py +++ b/app.py @@ -21,6 +21,7 @@ from werkzeug.utils import secure_filename from werkzeug.middleware.proxy_fix import ProxyFix from sqlalchemy import func, extract from collections import defaultdict, deque +from functools import wraps app = Flask(__name__) app.config.from_object(Config) @@ -105,7 +106,6 @@ class Expense(db.Model): receipt_filename = db.Column(db.String(255), nullable=True) with app.app_context(): - # Twój kod inicjalizacyjny, np. utworzenie konta admina db.create_all() from werkzeug.security import generate_password_hash admin = User.query.filter_by(is_admin=True).first() @@ -113,13 +113,11 @@ with app.app_context(): password = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123') password_hash = generate_password_hash(password) if admin: - # Aktualizacja jeśli dane się różnią if admin.username != username or not check_password_hash(admin.password_hash, password): admin.username = username admin.password_hash = password_hash db.session.commit() else: - # Brak admina – utwórz nowe konto admin = User(username=username, password_hash=password_hash, is_admin=True) db.session.add(admin) db.session.commit() @@ -139,12 +137,9 @@ def serve_js(filename): @static_bp.route('/static/css/') def serve_css(filename): response = send_from_directory('static/css', filename) - #response.cache_control.public = True - #response.cache_control.max_age = 3600 response.headers['Cache-Control'] = 'public, max-age=3600' response.headers.pop('Content-Disposition', None) response.headers.pop('Etag', None) - #response.expires = 0 return response @static_bp.route('/static/lib/js/') @@ -169,6 +164,51 @@ app.register_blueprint(static_bp) def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS +def get_list_details(list_id): + shopping_list = ShoppingList.query.get_or_404(list_id) + items = Item.query.filter_by(list_id=list_id).all() + receipt_pattern = f"list_{list_id}" + all_files = os.listdir(app.config['UPLOAD_FOLDER']) + receipt_files = [f for f in all_files if receipt_pattern in f] + expenses = Expense.query.filter_by(list_id=list_id).all() + total_expense = sum(e.amount for e in expenses) + return shopping_list, items, receipt_files, expenses, total_expense + +def generate_share_token(length=8): + """Generuje token do udostępniania. Parametr `length` to liczba znaków (domyślnie 4).""" + return secrets.token_hex(length // 2) + +def check_list_public(shopping_list): + if not shopping_list.is_public: + flash('Ta lista nie jest publicznie dostępna', 'danger') + return False + return True + +def enrich_list_data(l): + items = Item.query.filter_by(list_id=l.id).all() + l.total_count = len(items) + l.purchased_count = len([i for i in items if i.purchased]) + expenses = Expense.query.filter_by(list_id=l.id).all() + l.total_expense = sum(e.amount for e in expenses) + return l + +def save_resized_image(file, path: str, max_size=(2000, 2000)): + img = Image.open(file) + img.thumbnail(max_size) + img.save(path) + +def redirect_with_flash(message: str, category: str = 'info', endpoint: str = 'main_page'): + flash(message, category) + return redirect(url_for(endpoint)) + +def admin_required(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if not current_user.is_authenticated or not current_user.is_admin: + return redirect_with_flash('Brak uprawnień do tej sekcji.', 'danger') + return f(*args, **kwargs) + return decorated_function + def get_progress(list_id): items = Item.query.filter_by(list_id=list_id).all() total_count = len(items) @@ -207,7 +247,6 @@ def reset_failed_attempts(ip): def attempts_remaining(ip): attempts = failed_login_attempts[ip] return max(0, MAX_ATTEMPTS - len(attempts)) - #################################################### @login_manager.user_loader @@ -234,10 +273,10 @@ def require_system_password(): if requested_file in PROTECTED_JS_FILES: return redirect(url_for('system_auth', next=request.url)) else: - return # pozwól na inne pliki statyczne + return if request.endpoint.startswith('static_bp.'): - return # np. CSS, favicon, inne — pozwól + return if request.path == '/': return redirect(url_for('system_auth')) @@ -247,7 +286,6 @@ def require_system_password(): fixed_url = urlunparse(parsed._replace(netloc=request.host)) return redirect(url_for('system_auth', next=fixed_url)) - @app.template_filter('filemtime') def file_mtime_filter(path): try: @@ -309,15 +347,12 @@ def main_page(): now = datetime.utcnow() if current_user.is_authenticated: - # Twoje listy aktywne user_lists = ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=False).filter( (ShoppingList.expires_at == None) | (ShoppingList.expires_at > now) ).order_by(ShoppingList.created_at.desc()).all() - # Zarchiwizowane listy archived_lists = ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True).order_by(ShoppingList.created_at.desc()).all() - # Publiczne listy innych użytkowników public_lists = ShoppingList.query.filter( ShoppingList.is_public == True, ShoppingList.owner_id != current_user.id, @@ -333,13 +368,8 @@ def main_page(): ShoppingList.is_archived == False ).order_by(ShoppingList.created_at.desc()).all() - # Dodajemy dane o przedmiotach i wydatkach for l in user_lists + public_lists + archived_lists: - items = Item.query.filter_by(list_id=l.id).all() - l.total_count = len(items) - l.purchased_count = len([i for i in items if i.purchased]) - expenses = Expense.query.filter_by(list_id=l.id).all() - l.total_expense = sum(e.amount for e in expenses) + enrich_list_data(l) return render_template("main.html", user_lists=user_lists, public_lists=public_lists, archived_lists=archived_lists) @@ -376,10 +406,8 @@ def system_auth(): def toggle_archive_list(list_id): l = ShoppingList.query.get_or_404(list_id) if l.owner_id != current_user.id: - flash('Nie masz uprawnień do tej listy', 'danger') - return redirect(url_for('main_page')) + return redirect_with_flash('Nie masz uprawnień do tej listy', 'danger') - # Pobieramy parametr archive z query string archive = request.args.get('archive', 'true').lower() == 'true' if archive: @@ -397,8 +425,7 @@ def toggle_archive_list(list_id): def edit_my_list(list_id): l = ShoppingList.query.get_or_404(list_id) if l.owner_id != current_user.id: - flash('Nie masz uprawnień do tej listy', 'danger') - return redirect(url_for('main_page')) + return redirect_with_flash('Nie masz uprawnień do tej listy', 'danger') if request.method == 'POST': new_title = request.form.get('title') @@ -459,8 +486,7 @@ def logout(): def create_list(): title = request.form.get('title') is_temporary = 'temporary' in request.form - #token = secrets.token_hex(16) - token = secrets.token_hex(4) + token = generate_share_token(8) expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None new_list = ShoppingList(title=title, owner_id=current_user.id, is_temporary=is_temporary, share_token=token, expires_at=expires_at) db.session.add(new_list) @@ -471,16 +497,10 @@ def create_list(): @app.route('/list/') @login_required def view_list(list_id): - shopping_list = ShoppingList.query.get_or_404(list_id) - items = Item.query.filter_by(list_id=list_id).all() + shopping_list, items, receipt_files, expenses, total_expense = get_list_details(list_id) total_count = len(items) purchased_count = len([i for i in items if i.purchased]) percent = (purchased_count / total_count * 100) if total_count > 0 else 0 - expenses = Expense.query.filter_by(list_id=list_id).all() - total_expense = sum(e.amount for e in expenses) - receipt_pattern = f"list_{list_id}" - all_files = os.listdir(app.config['UPLOAD_FOLDER']) - receipt_files = [f for f in all_files if receipt_pattern in f] return render_template( 'list.html', @@ -495,40 +515,18 @@ def view_list(list_id): ) @app.route('/share/') -def share_list(token): - shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404() - - if not shopping_list.is_public: - flash('Ta lista nie jest publicznie dostępna', 'danger') - return redirect(url_for('main_page')) - - items = Item.query.filter_by(list_id=shopping_list.id).all() - - receipt_pattern = f"list_{shopping_list.id}" - all_files = os.listdir(app.config['UPLOAD_FOLDER']) - receipt_files = [f for f in all_files if receipt_pattern in f] - - expenses = Expense.query.filter_by(list_id=shopping_list.id).all() - total_expense = sum(e.amount for e in expenses) - - return render_template( - 'list_share.html', - list=shopping_list, - items=items, - receipt_files=receipt_files, - expenses=expenses, - total_expense=total_expense - ) - @app.route('/guest-list/') -def guest_list(list_id): - shopping_list = ShoppingList.query.get_or_404(list_id) - items = Item.query.filter_by(list_id=list_id).all() - receipt_pattern = f"list_{list_id}" - all_files = os.listdir(app.config['UPLOAD_FOLDER']) - receipt_files = [f for f in all_files if receipt_pattern in f] - expenses = Expense.query.filter_by(list_id=list_id).all() - total_expense = sum(e.amount for e in expenses) +def shared_list(token=None, list_id=None): + if token: + shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404() + + if not check_list_public(shopping_list): + return redirect(url_for('main_page')) + + list_id = shopping_list.id + + shopping_list, items, receipt_files, expenses, total_expense = get_list_details(list_id) + return render_template( 'list_share.html', list=shopping_list, @@ -542,7 +540,7 @@ def guest_list(list_id): @login_required def copy_list(list_id): original = ShoppingList.query.get_or_404(list_id) - token = secrets.token_hex(8) + token = generate_share_token(8) new_list = ShoppingList(title=original.title + ' (Kopia)', owner_id=current_user.id, share_token=token) db.session.add(new_list) db.session.commit() @@ -578,9 +576,7 @@ def upload_receipt(list_id): filename = secure_filename(file.filename) file_path = os.path.join(app.config['UPLOAD_FOLDER'], f"list_{list_id}_{filename}") - img = Image.open(file) - img.thumbnail((2000, 2000)) - img.save(file_path) + save_resized_image(file, file_path) flash('Wgrano paragon', 'success') return redirect(request.referrer) @@ -601,9 +597,8 @@ def uploaded_file(filename): @app.route('/admin') @login_required +@admin_required def admin_panel(): - if not current_user.is_admin: - return redirect(url_for('main_page')) now = datetime.utcnow() user_count = User.query.count() @@ -614,13 +609,12 @@ def admin_panel(): enriched_lists = [] for l in all_lists: + enrich_list_data(l) items = Item.query.filter_by(list_id=l.id).all() - total_count = len(items) - purchased_count = len([i for i in items if i.purchased]) + total_count = l.total_count + purchased_count = l.purchased_count percent = (purchased_count / total_count * 100) if total_count > 0 else 0 comments_count = len([i for i in items if i.note and i.note.strip() != '']) - expenses = Expense.query.filter_by(list_id=l.id).all() - total_expense = sum(e.amount for e in expenses) receipt_pattern = f"list_{l.id}" receipt_files = [f for f in all_files if receipt_pattern in f] @@ -631,7 +625,7 @@ def admin_panel(): 'percent': round(percent), 'comments_count': comments_count, 'receipts_count': len(receipt_files), - 'total_expense': total_expense + 'total_expense': l.total_expense }) top_products = ( @@ -662,7 +656,7 @@ def admin_panel(): ) process = psutil.Process(os.getpid()) - app_mem = process.memory_info().rss // (1024 * 1024) # w MB + app_mem = process.memory_info().rss // (1024 * 1024) # MB return render_template( 'admin/admin_panel.html', @@ -681,11 +675,12 @@ def admin_panel(): app_memory=f"{app_mem} MB", ) + @app.route('/admin/delete_list/') @login_required +@admin_required def delete_list(list_id): - if not current_user.is_admin: - return redirect(url_for('main_page')) + delete_receipts_for_list(list_id) list_to_delete = ShoppingList.query.get_or_404(list_id) Item.query.filter_by(list_id=list_to_delete.id).delete() @@ -697,10 +692,8 @@ def delete_list(list_id): @app.route('/admin/add_user', methods=['POST']) @login_required +@admin_required def add_user(): - if not current_user.is_admin: - return redirect(url_for('main_page')) - username = request.form['username'] password = request.form['password'] @@ -721,9 +714,8 @@ def add_user(): @app.route('/admin/users') @login_required +@admin_required def list_users(): - if not current_user.is_admin: - return redirect(url_for('main_page')) users = User.query.all() user_count = User.query.count() list_count = ShoppingList.query.count() @@ -733,10 +725,8 @@ def list_users(): @app.route('/admin/change_password/', methods=['POST']) @login_required +@admin_required def reset_password(user_id): - if not current_user.is_admin: - return redirect(url_for('main_page')) - user = User.query.get_or_404(user_id) new_password = request.form['password'] @@ -751,13 +741,10 @@ def reset_password(user_id): @app.route('/admin/delete_user/') @login_required +@admin_required def delete_user(user_id): - if not current_user.is_admin: - return redirect(url_for('main_page')) - user = User.query.get_or_404(user_id) - # Zabezpieczenie: sprawdź ilu adminów if user.is_admin: admin_count = User.query.filter_by(is_admin=True).count() if admin_count <= 1: @@ -769,12 +756,10 @@ def delete_user(user_id): flash('Użytkownik usunięty', 'success') return redirect(url_for('list_users')) - @app.route('/admin/receipts') @login_required +@admin_required def admin_receipts(): - if not current_user.is_admin: - return redirect(url_for('main_page')) all_files = os.listdir(app.config['UPLOAD_FOLDER']) image_files = [f for f in all_files if allowed_file(f)] return render_template( @@ -785,9 +770,8 @@ def admin_receipts(): @app.route('/admin/delete_receipt/') @login_required +@admin_required def delete_receipt(filename): - if not current_user.is_admin: - return redirect(url_for('main_page')) file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) if os.path.exists(file_path): os.remove(file_path) @@ -798,9 +782,8 @@ def delete_receipt(filename): @app.route('/admin/delete_selected_lists', methods=['POST']) @login_required -def delete_selected_lists(): - if not current_user.is_admin: - return redirect(url_for('main_page')) +@admin_required +def delete_selected_lists(): ids = request.form.getlist('list_ids') for list_id in ids: lst = ShoppingList.query.get(int(list_id)) @@ -815,9 +798,8 @@ def delete_selected_lists(): @app.route('/admin/archive_list/') @login_required -def archive_list(list_id): - if not current_user.is_admin: - return redirect(url_for('main_page')) +@admin_required +def archive_list(list_id): l = ShoppingList.query.get_or_404(list_id) l.is_archived = True db.session.commit() @@ -826,9 +808,8 @@ def archive_list(list_id): @app.route('/admin/delete_all_items') @login_required -def delete_all_items(): - if not current_user.is_admin: - return redirect(url_for('main_page')) +@admin_required +def delete_all_items(): Item.query.delete() db.session.commit() flash('Usunięto wszystkie produkty', 'success') @@ -836,10 +817,8 @@ def delete_all_items(): @app.route('/admin/edit_list/', methods=['GET', 'POST']) @login_required +@admin_required def edit_list(list_id): - if not current_user.is_admin: - return redirect(url_for('main_page')) - l = ShoppingList.query.get_or_404(list_id) expenses = Expense.query.filter_by(list_id=list_id).all() total_expense = sum(e.amount for e in expenses) @@ -895,10 +874,8 @@ def edit_list(list_id): @app.route('/admin/products') @login_required +@admin_required def list_products(): - if not current_user.is_admin: - return redirect(url_for('main_page')) - items = Item.query.order_by(Item.id.desc()).all() users = User.query.all() users_dict = {user.id: user.username for user in users} @@ -947,7 +924,7 @@ def delete_suggestion_ajax(suggestion_id): @login_required def admin_expenses_data(): if not current_user.is_admin: - return jsonify({'error': 'Unauthorized'}), 403 + return jsonify({'error': 'Brak uprawnień'}), 403 range_type = request.args.get('range', 'monthly') start_date_str = request.args.get('start_date') @@ -1047,9 +1024,8 @@ def admin_expenses_data(): @app.route('/admin/promote_user/') @login_required +@admin_required def promote_user(user_id): - if not current_user.is_admin: - return redirect(url_for('main_page')) user = User.query.get_or_404(user_id) user.is_admin = True db.session.commit() @@ -1058,17 +1034,14 @@ def promote_user(user_id): @app.route('/admin/demote_user/') @login_required +@admin_required def demote_user(user_id): - if not current_user.is_admin: - return redirect(url_for('main_page')) user = User.query.get_or_404(user_id) - # Nie pozwalamy zdegradować siebie if user.id == current_user.id: flash('Nie możesz zdegradować samego siebie!', 'danger') return redirect(url_for('list_users')) - # Zabezpieczenie: sprawdź ilu jest adminów admin_count = User.query.filter_by(is_admin=True).count() if admin_count <= 1 and user.is_admin: flash('Nie można zdegradować. Musi pozostać co najmniej jeden administrator.', 'danger') @@ -1079,17 +1052,6 @@ def demote_user(user_id): flash(f'Użytkownik {user.username} został zdegradowany.', 'success') return redirect(url_for('list_users')) - - -# chyba do usuniecia przeniesione na eventy socket.io -@app.route('/update-note/', methods=['POST']) -def update_note(item_id): - item = Item.query.get_or_404(item_id) - note = request.form.get('note') - item.note = note - db.session.commit() - return {'success': True} - # ========================================================================================= # SOCKET.IO # ========================================================================================= @@ -1147,12 +1109,13 @@ def handle_join(data): emit('joined_confirmation', {'room': room, 'list_title': list_title}) @socketio.on('disconnect') -def handle_disconnect(): +def handle_disconnect(sid): global active_users + username = current_user.username if current_user.is_authenticated else "Gość" for room, users in active_users.items(): - if current_user.username in users: - users.remove(current_user.username) - emit('user_left', {'username': current_user.username}, to=room) + if username in users: + users.remove(username) + emit('user_left', {'username': username}, to=room) emit('user_list', {'users': list(users)}, to=room) @socketio.on('add_item') diff --git a/templates/main.html b/templates/main.html index 8ff9762..3fe4220 100644 --- a/templates/main.html +++ b/templates/main.html @@ -61,7 +61,7 @@ {% if l.is_public %} 🙈 Ukryj {% else %} - 👁️ Udostępnij + 👁️ Odkryj {% endif %}