refaktoryzacja kodu #1

This commit is contained in:
Mateusz Gruszczyński
2025-07-08 09:55:42 +02:00
parent f9b655defd
commit 0700affb4e
2 changed files with 98 additions and 135 deletions

231
app.py
View File

@@ -21,6 +21,7 @@ from werkzeug.utils import secure_filename
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import func, extract
from collections import defaultdict, deque
from functools import wraps
app = Flask(__name__)
app.config.from_object(Config)
@@ -105,7 +106,6 @@ class Expense(db.Model):
receipt_filename = db.Column(db.String(255), nullable=True)
with app.app_context():
# Twój kod inicjalizacyjny, np. utworzenie konta admina
db.create_all()
from werkzeug.security import generate_password_hash
admin = User.query.filter_by(is_admin=True).first()
@@ -113,13 +113,11 @@ with app.app_context():
password = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123')
password_hash = generate_password_hash(password)
if admin:
# Aktualizacja jeśli dane się różnią
if admin.username != username or not check_password_hash(admin.password_hash, password):
admin.username = username
admin.password_hash = password_hash
db.session.commit()
else:
# Brak admina utwórz nowe konto
admin = User(username=username, password_hash=password_hash, is_admin=True)
db.session.add(admin)
db.session.commit()
@@ -139,12 +137,9 @@ def serve_js(filename):
@static_bp.route('/static/css/<path:filename>')
def serve_css(filename):
response = send_from_directory('static/css', filename)
#response.cache_control.public = True
#response.cache_control.max_age = 3600
response.headers['Cache-Control'] = 'public, max-age=3600'
response.headers.pop('Content-Disposition', None)
response.headers.pop('Etag', None)
#response.expires = 0
return response
@static_bp.route('/static/lib/js/<path:filename>')
@@ -169,6 +164,51 @@ app.register_blueprint(static_bp)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def get_list_details(list_id):
shopping_list = ShoppingList.query.get_or_404(list_id)
items = Item.query.filter_by(list_id=list_id).all()
receipt_pattern = f"list_{list_id}"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipt_files = [f for f in all_files if receipt_pattern in f]
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
return shopping_list, items, receipt_files, expenses, total_expense
def generate_share_token(length=8):
"""Generuje token do udostępniania. Parametr `length` to liczba znaków (domyślnie 4)."""
return secrets.token_hex(length // 2)
def check_list_public(shopping_list):
if not shopping_list.is_public:
flash('Ta lista nie jest publicznie dostępna', 'danger')
return False
return True
def enrich_list_data(l):
items = Item.query.filter_by(list_id=l.id).all()
l.total_count = len(items)
l.purchased_count = len([i for i in items if i.purchased])
expenses = Expense.query.filter_by(list_id=l.id).all()
l.total_expense = sum(e.amount for e in expenses)
return l
def save_resized_image(file, path: str, max_size=(2000, 2000)):
img = Image.open(file)
img.thumbnail(max_size)
img.save(path)
def redirect_with_flash(message: str, category: str = 'info', endpoint: str = 'main_page'):
flash(message, category)
return redirect(url_for(endpoint))
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or not current_user.is_admin:
return redirect_with_flash('Brak uprawnień do tej sekcji.', 'danger')
return f(*args, **kwargs)
return decorated_function
def get_progress(list_id):
items = Item.query.filter_by(list_id=list_id).all()
total_count = len(items)
@@ -207,7 +247,6 @@ def reset_failed_attempts(ip):
def attempts_remaining(ip):
attempts = failed_login_attempts[ip]
return max(0, MAX_ATTEMPTS - len(attempts))
####################################################
@login_manager.user_loader
@@ -234,10 +273,10 @@ def require_system_password():
if requested_file in PROTECTED_JS_FILES:
return redirect(url_for('system_auth', next=request.url))
else:
return # pozwól na inne pliki statyczne
return
if request.endpoint.startswith('static_bp.'):
return # np. CSS, favicon, inne — pozwól
return
if request.path == '/':
return redirect(url_for('system_auth'))
@@ -247,7 +286,6 @@ def require_system_password():
fixed_url = urlunparse(parsed._replace(netloc=request.host))
return redirect(url_for('system_auth', next=fixed_url))
@app.template_filter('filemtime')
def file_mtime_filter(path):
try:
@@ -309,15 +347,12 @@ def main_page():
now = datetime.utcnow()
if current_user.is_authenticated:
# Twoje listy aktywne
user_lists = ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=False).filter(
(ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)
).order_by(ShoppingList.created_at.desc()).all()
# Zarchiwizowane listy
archived_lists = ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True).order_by(ShoppingList.created_at.desc()).all()
# Publiczne listy innych użytkowników
public_lists = ShoppingList.query.filter(
ShoppingList.is_public == True,
ShoppingList.owner_id != current_user.id,
@@ -333,13 +368,8 @@ def main_page():
ShoppingList.is_archived == False
).order_by(ShoppingList.created_at.desc()).all()
# Dodajemy dane o przedmiotach i wydatkach
for l in user_lists + public_lists + archived_lists:
items = Item.query.filter_by(list_id=l.id).all()
l.total_count = len(items)
l.purchased_count = len([i for i in items if i.purchased])
expenses = Expense.query.filter_by(list_id=l.id).all()
l.total_expense = sum(e.amount for e in expenses)
enrich_list_data(l)
return render_template("main.html", user_lists=user_lists, public_lists=public_lists, archived_lists=archived_lists)
@@ -376,10 +406,8 @@ def system_auth():
def toggle_archive_list(list_id):
l = ShoppingList.query.get_or_404(list_id)
if l.owner_id != current_user.id:
flash('Nie masz uprawnień do tej listy', 'danger')
return redirect(url_for('main_page'))
return redirect_with_flash('Nie masz uprawnień do tej listy', 'danger')
# Pobieramy parametr archive z query string
archive = request.args.get('archive', 'true').lower() == 'true'
if archive:
@@ -397,8 +425,7 @@ def toggle_archive_list(list_id):
def edit_my_list(list_id):
l = ShoppingList.query.get_or_404(list_id)
if l.owner_id != current_user.id:
flash('Nie masz uprawnień do tej listy', 'danger')
return redirect(url_for('main_page'))
return redirect_with_flash('Nie masz uprawnień do tej listy', 'danger')
if request.method == 'POST':
new_title = request.form.get('title')
@@ -459,8 +486,7 @@ def logout():
def create_list():
title = request.form.get('title')
is_temporary = 'temporary' in request.form
#token = secrets.token_hex(16)
token = secrets.token_hex(4)
token = generate_share_token(8)
expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None
new_list = ShoppingList(title=title, owner_id=current_user.id, is_temporary=is_temporary, share_token=token, expires_at=expires_at)
db.session.add(new_list)
@@ -471,16 +497,10 @@ def create_list():
@app.route('/list/<int:list_id>')
@login_required
def view_list(list_id):
shopping_list = ShoppingList.query.get_or_404(list_id)
items = Item.query.filter_by(list_id=list_id).all()
shopping_list, items, receipt_files, expenses, total_expense = get_list_details(list_id)
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
receipt_pattern = f"list_{list_id}"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipt_files = [f for f in all_files if receipt_pattern in f]
return render_template(
'list.html',
@@ -495,40 +515,18 @@ def view_list(list_id):
)
@app.route('/share/<token>')
def share_list(token):
shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
if not shopping_list.is_public:
flash('Ta lista nie jest publicznie dostępna', 'danger')
return redirect(url_for('main_page'))
items = Item.query.filter_by(list_id=shopping_list.id).all()
receipt_pattern = f"list_{shopping_list.id}"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipt_files = [f for f in all_files if receipt_pattern in f]
expenses = Expense.query.filter_by(list_id=shopping_list.id).all()
total_expense = sum(e.amount for e in expenses)
return render_template(
'list_share.html',
list=shopping_list,
items=items,
receipt_files=receipt_files,
expenses=expenses,
total_expense=total_expense
)
@app.route('/guest-list/<int:list_id>')
def guest_list(list_id):
shopping_list = ShoppingList.query.get_or_404(list_id)
items = Item.query.filter_by(list_id=list_id).all()
receipt_pattern = f"list_{list_id}"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipt_files = [f for f in all_files if receipt_pattern in f]
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
def shared_list(token=None, list_id=None):
if token:
shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
if not check_list_public(shopping_list):
return redirect(url_for('main_page'))
list_id = shopping_list.id
shopping_list, items, receipt_files, expenses, total_expense = get_list_details(list_id)
return render_template(
'list_share.html',
list=shopping_list,
@@ -542,7 +540,7 @@ def guest_list(list_id):
@login_required
def copy_list(list_id):
original = ShoppingList.query.get_or_404(list_id)
token = secrets.token_hex(8)
token = generate_share_token(8)
new_list = ShoppingList(title=original.title + ' (Kopia)', owner_id=current_user.id, share_token=token)
db.session.add(new_list)
db.session.commit()
@@ -578,9 +576,7 @@ def upload_receipt(list_id):
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], f"list_{list_id}_{filename}")
img = Image.open(file)
img.thumbnail((2000, 2000))
img.save(file_path)
save_resized_image(file, file_path)
flash('Wgrano paragon', 'success')
return redirect(request.referrer)
@@ -601,9 +597,8 @@ def uploaded_file(filename):
@app.route('/admin')
@login_required
@admin_required
def admin_panel():
if not current_user.is_admin:
return redirect(url_for('main_page'))
now = datetime.utcnow()
user_count = User.query.count()
@@ -614,13 +609,12 @@ def admin_panel():
enriched_lists = []
for l in all_lists:
enrich_list_data(l)
items = Item.query.filter_by(list_id=l.id).all()
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
total_count = l.total_count
purchased_count = l.purchased_count
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
comments_count = len([i for i in items if i.note and i.note.strip() != ''])
expenses = Expense.query.filter_by(list_id=l.id).all()
total_expense = sum(e.amount for e in expenses)
receipt_pattern = f"list_{l.id}"
receipt_files = [f for f in all_files if receipt_pattern in f]
@@ -631,7 +625,7 @@ def admin_panel():
'percent': round(percent),
'comments_count': comments_count,
'receipts_count': len(receipt_files),
'total_expense': total_expense
'total_expense': l.total_expense
})
top_products = (
@@ -662,7 +656,7 @@ def admin_panel():
)
process = psutil.Process(os.getpid())
app_mem = process.memory_info().rss // (1024 * 1024) # w MB
app_mem = process.memory_info().rss // (1024 * 1024) # MB
return render_template(
'admin/admin_panel.html',
@@ -681,11 +675,12 @@ def admin_panel():
app_memory=f"{app_mem} MB",
)
@app.route('/admin/delete_list/<int:list_id>')
@login_required
@admin_required
def delete_list(list_id):
if not current_user.is_admin:
return redirect(url_for('main_page'))
delete_receipts_for_list(list_id)
list_to_delete = ShoppingList.query.get_or_404(list_id)
Item.query.filter_by(list_id=list_to_delete.id).delete()
@@ -697,10 +692,8 @@ def delete_list(list_id):
@app.route('/admin/add_user', methods=['POST'])
@login_required
@admin_required
def add_user():
if not current_user.is_admin:
return redirect(url_for('main_page'))
username = request.form['username']
password = request.form['password']
@@ -721,9 +714,8 @@ def add_user():
@app.route('/admin/users')
@login_required
@admin_required
def list_users():
if not current_user.is_admin:
return redirect(url_for('main_page'))
users = User.query.all()
user_count = User.query.count()
list_count = ShoppingList.query.count()
@@ -733,10 +725,8 @@ def list_users():
@app.route('/admin/change_password/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def reset_password(user_id):
if not current_user.is_admin:
return redirect(url_for('main_page'))
user = User.query.get_or_404(user_id)
new_password = request.form['password']
@@ -751,13 +741,10 @@ def reset_password(user_id):
@app.route('/admin/delete_user/<int:user_id>')
@login_required
@admin_required
def delete_user(user_id):
if not current_user.is_admin:
return redirect(url_for('main_page'))
user = User.query.get_or_404(user_id)
# Zabezpieczenie: sprawdź ilu adminów
if user.is_admin:
admin_count = User.query.filter_by(is_admin=True).count()
if admin_count <= 1:
@@ -769,12 +756,10 @@ def delete_user(user_id):
flash('Użytkownik usunięty', 'success')
return redirect(url_for('list_users'))
@app.route('/admin/receipts')
@login_required
@admin_required
def admin_receipts():
if not current_user.is_admin:
return redirect(url_for('main_page'))
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
image_files = [f for f in all_files if allowed_file(f)]
return render_template(
@@ -785,9 +770,8 @@ def admin_receipts():
@app.route('/admin/delete_receipt/<filename>')
@login_required
@admin_required
def delete_receipt(filename):
if not current_user.is_admin:
return redirect(url_for('main_page'))
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
if os.path.exists(file_path):
os.remove(file_path)
@@ -798,9 +782,8 @@ def delete_receipt(filename):
@app.route('/admin/delete_selected_lists', methods=['POST'])
@login_required
def delete_selected_lists():
if not current_user.is_admin:
return redirect(url_for('main_page'))
@admin_required
def delete_selected_lists():
ids = request.form.getlist('list_ids')
for list_id in ids:
lst = ShoppingList.query.get(int(list_id))
@@ -815,9 +798,8 @@ def delete_selected_lists():
@app.route('/admin/archive_list/<int:list_id>')
@login_required
def archive_list(list_id):
if not current_user.is_admin:
return redirect(url_for('main_page'))
@admin_required
def archive_list(list_id):
l = ShoppingList.query.get_or_404(list_id)
l.is_archived = True
db.session.commit()
@@ -826,9 +808,8 @@ def archive_list(list_id):
@app.route('/admin/delete_all_items')
@login_required
def delete_all_items():
if not current_user.is_admin:
return redirect(url_for('main_page'))
@admin_required
def delete_all_items():
Item.query.delete()
db.session.commit()
flash('Usunięto wszystkie produkty', 'success')
@@ -836,10 +817,8 @@ def delete_all_items():
@app.route('/admin/edit_list/<int:list_id>', methods=['GET', 'POST'])
@login_required
@admin_required
def edit_list(list_id):
if not current_user.is_admin:
return redirect(url_for('main_page'))
l = ShoppingList.query.get_or_404(list_id)
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
@@ -895,10 +874,8 @@ def edit_list(list_id):
@app.route('/admin/products')
@login_required
@admin_required
def list_products():
if not current_user.is_admin:
return redirect(url_for('main_page'))
items = Item.query.order_by(Item.id.desc()).all()
users = User.query.all()
users_dict = {user.id: user.username for user in users}
@@ -947,7 +924,7 @@ def delete_suggestion_ajax(suggestion_id):
@login_required
def admin_expenses_data():
if not current_user.is_admin:
return jsonify({'error': 'Unauthorized'}), 403
return jsonify({'error': 'Brak uprawnień'}), 403
range_type = request.args.get('range', 'monthly')
start_date_str = request.args.get('start_date')
@@ -1047,9 +1024,8 @@ def admin_expenses_data():
@app.route('/admin/promote_user/<int:user_id>')
@login_required
@admin_required
def promote_user(user_id):
if not current_user.is_admin:
return redirect(url_for('main_page'))
user = User.query.get_or_404(user_id)
user.is_admin = True
db.session.commit()
@@ -1058,17 +1034,14 @@ def promote_user(user_id):
@app.route('/admin/demote_user/<int:user_id>')
@login_required
@admin_required
def demote_user(user_id):
if not current_user.is_admin:
return redirect(url_for('main_page'))
user = User.query.get_or_404(user_id)
# Nie pozwalamy zdegradować siebie
if user.id == current_user.id:
flash('Nie możesz zdegradować samego siebie!', 'danger')
return redirect(url_for('list_users'))
# Zabezpieczenie: sprawdź ilu jest adminów
admin_count = User.query.filter_by(is_admin=True).count()
if admin_count <= 1 and user.is_admin:
flash('Nie można zdegradować. Musi pozostać co najmniej jeden administrator.', 'danger')
@@ -1079,17 +1052,6 @@ def demote_user(user_id):
flash(f'Użytkownik {user.username} został zdegradowany.', 'success')
return redirect(url_for('list_users'))
# chyba do usuniecia przeniesione na eventy socket.io
@app.route('/update-note/<int:item_id>', methods=['POST'])
def update_note(item_id):
item = Item.query.get_or_404(item_id)
note = request.form.get('note')
item.note = note
db.session.commit()
return {'success': True}
# =========================================================================================
# SOCKET.IO
# =========================================================================================
@@ -1147,12 +1109,13 @@ def handle_join(data):
emit('joined_confirmation', {'room': room, 'list_title': list_title})
@socketio.on('disconnect')
def handle_disconnect():
def handle_disconnect(sid):
global active_users
username = current_user.username if current_user.is_authenticated else "Gość"
for room, users in active_users.items():
if current_user.username in users:
users.remove(current_user.username)
emit('user_left', {'username': current_user.username}, to=room)
if username in users:
users.remove(username)
emit('user_left', {'username': username}, to=room)
emit('user_list', {'users': list(users)}, to=room)
@socketio.on('add_item')

View File

@@ -61,7 +61,7 @@
{% if l.is_public %}
<a href="/toggle_visibility/{{ l.id }}" class="btn btn-sm btn-outline-light me-1 mb-1">🙈 Ukryj</a>
{% else %}
<a href="/toggle_visibility/{{ l.id }}" class="btn btn-sm btn-outline-light me-1 mb-1">👁️ Udostępnij</a>
<a href="/toggle_visibility/{{ l.id }}" class="btn btn-sm btn-outline-light me-1 mb-1">👁️ Odkryj</a>
{% endif %}
</div>
</div>