poprawki i optymalizacje kodu

This commit is contained in:
Mateusz Gruszczyński
2025-08-17 17:07:43 +02:00
parent 8b9483952e
commit 6a8305b640
9 changed files with 436 additions and 272 deletions

255
app.py
View File

@@ -29,6 +29,7 @@ from flask import (
abort,
session,
jsonify,
g,
)
from flask_sqlalchemy import SQLAlchemy
from flask_login import (
@@ -44,7 +45,7 @@ from flask_socketio import SocketIO, emit, join_room
from config import Config
from PIL import Image, ExifTags, ImageFilter, ImageOps
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import func, extract, inspect, or_, case, text
from sqlalchemy import func, extract, inspect, or_, case, text, and_
from sqlalchemy.orm import joinedload, load_only, aliased
from collections import defaultdict, deque
from functools import wraps
@@ -219,10 +220,13 @@ class ShoppingList(db.Model):
# Relacje
items = db.relationship("Item", back_populates="shopping_list", lazy="select")
receipts = db.relationship("Receipt", back_populates="shopping_list", lazy="select")
receipts = db.relationship(
"Receipt",
back_populates="shopping_list",
cascade="all, delete-orphan",
lazy="select",
)
expenses = db.relationship("Expense", back_populates="shopping_list", lazy="select")
# Nowa relacja wiele-do-wielu
categories = db.relationship(
"Category",
secondary=shopping_list_category,
@@ -270,7 +274,11 @@ class Expense(db.Model):
class Receipt(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"), nullable=False)
list_id = db.Column(
db.Integer,
db.ForeignKey("shopping_list.id", ondelete="CASCADE"),
nullable=False,
)
filename = db.Column(db.String(255), nullable=False)
uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
filesize = db.Column(db.Integer, nullable=True)
@@ -763,55 +771,70 @@ def get_admin_expense_summary():
current_year = now.year
current_month = now.month
def calc_sum(base_query):
total = base_query.scalar() or 0
def calc_summary(expense_query, list_query):
total = expense_query.scalar() or 0
year_total = (
base_query.filter(
expense_query.filter(
extract("year", ShoppingList.created_at) == current_year
).scalar()
or 0
)
month_total = (
base_query.filter(extract("year", ShoppingList.created_at) == current_year)
.filter(extract("month", ShoppingList.created_at) == current_month)
.scalar()
expense_query.filter(
extract("year", ShoppingList.created_at) == current_year,
extract("month", ShoppingList.created_at) == current_month,
).scalar()
or 0
)
return {"total": total, "year": year_total, "month": month_total}
list_count = list_query.count()
avg = round(total / list_count, 2) if list_count else 0
return {
"total": total,
"year": year_total,
"month": month_total,
"count": list_count,
"avg": avg,
}
base = db.session.query(func.sum(Expense.amount)).join(
expense_base = db.session.query(func.sum(Expense.amount)).join(
ShoppingList, ShoppingList.id == Expense.list_id
)
list_base = ShoppingList.query
all_lists = calc_sum(base)
all = calc_summary(expense_base, list_base)
active_lists = calc_sum(
base.filter(
ShoppingList.is_archived == False,
~(
(ShoppingList.is_temporary == True)
& (ShoppingList.expires_at != None)
& (ShoppingList.expires_at <= now)
),
)
active_condition = and_(
ShoppingList.is_archived == False,
~(
(ShoppingList.is_temporary == True)
& (ShoppingList.expires_at != None)
& (ShoppingList.expires_at <= now)
),
)
active = calc_summary(
expense_base.filter(active_condition), list_base.filter(active_condition)
)
archived_lists = calc_sum(base.filter(ShoppingList.is_archived == True))
archived_condition = ShoppingList.is_archived == True
archived = calc_summary(
expense_base.filter(archived_condition), list_base.filter(archived_condition)
)
expired_lists = calc_sum(
base.filter(
ShoppingList.is_archived == False,
(ShoppingList.is_temporary == True),
(ShoppingList.expires_at != None),
(ShoppingList.expires_at <= now),
)
expired_condition = and_(
ShoppingList.is_archived == False,
ShoppingList.is_temporary == True,
ShoppingList.expires_at != None,
ShoppingList.expires_at <= now,
)
expired = calc_summary(
expense_base.filter(expired_condition), list_base.filter(expired_condition)
)
return {
"all": all_lists,
"active": active_lists,
"archived": archived_lists,
"expired": expired_lists,
"all": all,
"active": active,
"archived": archived,
"expired": expired,
}
@@ -1205,7 +1228,7 @@ def require_system_password():
@app.before_request
def start_timer():
request._start_time = time.time()
g.start_time = time.time()
@app.after_request
@@ -1218,9 +1241,10 @@ def log_request(response):
path = request.path
status = response.status_code
length = response.content_length or "-"
start = getattr(request, "_start_time", None)
start = getattr(g, "start_time", None)
duration = round((time.time() - start) * 1000, 2) if start else "-"
agent = request.headers.get("User-Agent", "-")
if status == 304:
app.logger.info(
f'REVALIDATED: {ip} - "{method} {path}" {status} {length} {duration}ms "{agent}"'
@@ -1229,6 +1253,7 @@ def log_request(response):
app.logger.info(
f'{ip} - "{method} {path}" {status} {length} {duration}ms "{agent}"'
)
app.logger.debug(f"Request headers: {dict(request.headers)}")
app.logger.debug(f"Response headers: {dict(response.headers)}")
return response
@@ -2014,14 +2039,31 @@ def all_products():
base_query = base_query.order_by("normalized_name")
results = base_query.offset(offset).limit(limit).all()
total_count = (
db.session.query(func.count()).select_from(base_query.subquery()).scalar()
)
products = [{"name": row.original_name, "count": row.count} for row in results]
used_names = set(row.original_name.strip().lower() for row in results)
extra_suggestions = (
db.session.query(SuggestedProduct.name)
.filter(~func.lower(func.trim(SuggestedProduct.name)).in_(used_names))
.all()
)
return jsonify({"products": products, "total_count": total_count})
suggested_fallbacks = [
{"name": row.name.strip(), "count": 0} for row in extra_suggestions
]
if sort == "alphabetical":
products += suggested_fallbacks
products.sort(key=lambda x: x["name"].lower())
else:
products += suggested_fallbacks
return jsonify(
{"products": products, "total_count": total_count + len(suggested_fallbacks)}
)
@app.route("/upload_receipt/<int:list_id>", methods=["POST"])
@@ -2251,7 +2293,6 @@ def admin_panel():
now = datetime.now(timezone.utc)
start = end = None
# Liczniki globalne
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
@@ -2270,17 +2311,12 @@ def admin_panel():
)
all_lists = base_query.all()
# tylko listy z danych miesięcy
month_options = get_active_months_query()
all_ids = [l.id for l in all_lists]
stats_map = {}
latest_expenses_map = {}
if all_ids:
# Statystyki produktów
stats = (
db.session.query(
Item.list_id,
@@ -2336,17 +2372,57 @@ def admin_panel():
}
)
purchased_items_count = Item.query.filter_by(purchased=True).count()
not_purchased_count = Item.query.filter_by(not_purchased=True).count()
items_with_notes = Item.query.filter(Item.note.isnot(None), Item.note != "").count()
total_expense = db.session.query(func.sum(Expense.amount)).scalar() or 0
avg_list_expense = round(total_expense / list_count, 2) if list_count else 0
time_to_purchase = (
db.session.query(
func.avg(
func.strftime("%s", Item.purchased_at)
- func.strftime("%s", Item.added_at)
)
)
.filter(
Item.purchased == True,
Item.purchased_at.isnot(None),
Item.added_at.isnot(None),
)
.scalar()
)
avg_hours_to_purchase = round(time_to_purchase / 3600, 2) if time_to_purchase else 0
first_list = db.session.query(func.min(ShoppingList.created_at)).scalar()
last_list = db.session.query(func.max(ShoppingList.created_at)).scalar()
now_dt = datetime.now(timezone.utc)
if first_list and first_list.tzinfo is None:
first_list = first_list.replace(tzinfo=timezone.utc)
if last_list and last_list.tzinfo is None:
last_list = last_list.replace(tzinfo=timezone.utc)
if first_list and last_list:
days_span = max((now_dt - first_list).days, 1)
avg_per_day = list_count / days_span
avg_per_week = round(avg_per_day * 7, 2)
avg_per_month = round(avg_per_day * 30.44, 2)
avg_per_year = round(avg_per_day * 365, 2)
else:
avg_per_week = avg_per_month = avg_per_year = 0
top_products = (
db.session.query(Item.name, func.count(Item.id).label("count"))
.filter(Item.purchased.is_(True))
.group_by(Item.name)
.order_by(func.count(Item.id).desc())
.limit(5)
.limit(7)
.all()
)
purchased_items_count = Item.query.filter_by(purchased=True).count()
expense_summary = get_admin_expense_summary()
process = psutil.Process(os.getpid())
app_mem = process.memory_info().rss // (1024 * 1024)
@@ -2365,12 +2441,21 @@ def admin_panel():
(datetime.now(timezone.utc) - app_start_time).total_seconds() // 60
)
month_options = get_active_months_query()
return render_template(
"admin/admin_panel.html",
user_count=user_count,
list_count=list_count,
item_count=item_count,
purchased_items_count=purchased_items_count,
not_purchased_count=not_purchased_count,
items_with_notes=items_with_notes,
avg_hours_to_purchase=avg_hours_to_purchase,
avg_list_expense=avg_list_expense,
avg_per_week=avg_per_week,
avg_per_month=avg_per_month,
avg_per_year=avg_per_year,
enriched_lists=enriched_lists,
top_products=top_products,
expense_summary=expense_summary,
@@ -2389,21 +2474,6 @@ def admin_panel():
)
@app.route("/admin/delete_list/<int:list_id>")
@login_required
@admin_required
def delete_list(list_id):
delete_receipts_for_list(list_id)
list_to_delete = ShoppingList.query.get_or_404(list_id)
Item.query.filter_by(list_id=list_to_delete.id).delete()
Expense.query.filter_by(list_id=list_to_delete.id).delete()
db.session.delete(list_to_delete)
db.session.commit()
flash(f"Usunięto listę: {list_to_delete.title}", "success")
return redirect(url_for("admin_panel"))
@app.route("/admin/add_user", methods=["POST"])
@login_required
@admin_required
@@ -2481,20 +2551,41 @@ def delete_user(user_id):
user = User.query.get_or_404(user_id)
if user.is_admin:
admin_count = User.query.filter_by(is_admin=True).count()
if admin_count <= 1:
flash("Nie można usunąć ostatniego administratora.", "danger")
return redirect(url_for("list_users"))
flash("Nie można usunąć konta administratora.", "warning")
return redirect(url_for("list_users"))
admin_user = User.query.filter_by(is_admin=True).first()
if not admin_user:
flash("Brak konta administratora do przeniesienia zawartości.", "danger")
return redirect(url_for("list_users"))
lists_owned = ShoppingList.query.filter_by(owner_id=user.id).count()
if lists_owned > 0:
ShoppingList.query.filter_by(owner_id=user.id).update(
{"owner_id": admin_user.id}
)
Receipt.query.filter_by(uploaded_by=user.id).update(
{"uploaded_by": admin_user.id}
)
Item.query.filter_by(added_by=user.id).update({"added_by": admin_user.id})
db.session.commit()
flash(
f"Użytkownik '{user.username}' został usunięty, a jego zawartość przeniesiona na administratora.",
"success",
)
else:
flash(
f"Użytkownik '{user.username}' został usunięty. Nie posiadał żadnych list zakupowych.",
"info",
)
db.session.delete(user)
db.session.commit()
flash("Użytkownik usunięty", "success")
return redirect(url_for("list_users"))
from sqlalchemy.orm import joinedload
@app.route("/admin/receipts/<id>")
@login_required
@admin_required
@@ -2656,23 +2747,27 @@ def generate_receipt_hash(receipt_id):
return redirect(request.referrer)
@app.route("/admin/delete_selected_lists", methods=["POST"])
@app.route("/admin/delete_list", methods=["POST"])
@login_required
@admin_required
def delete_selected_lists():
def admin_delete_list():
ids = request.form.getlist("list_ids")
single_id = request.form.get("single_list_id")
if single_id:
ids.append(single_id)
for list_id in ids:
lst = db.session.get(ShoppingList, int(list_id))
if lst:
delete_receipts_for_list(lst.id)
Receipt.query.filter_by(list_id=lst.id).delete()
Item.query.filter_by(list_id=lst.id).delete()
Expense.query.filter_by(list_id=lst.id).delete()
db.session.delete(lst)
db.session.commit()
flash("Usunięto wybrane listy", "success")
return redirect(url_for("admin_panel"))
flash(f"Usunięto {len(ids)} list(y)", "success")
return redirect(request.referrer or url_for("admin_panel"))
@app.route("/admin/edit_list/<int:list_id>", methods=["GET", "POST"])
@@ -2735,6 +2830,12 @@ def edit_list(list_id):
user_obj = db.session.get(User, new_owner_id_int)
if user_obj:
shopping_list.owner_id = new_owner_id_int
Item.query.filter_by(list_id=list_id).update(
{"added_by": new_owner_id_int}
)
Receipt.query.filter_by(list_id=list_id).update(
{"uploaded_by": new_owner_id_int}
)
else:
flash("Wybrany użytkownik nie istnieje", "danger")
return redirect(url_for("edit_list", list_id=list_id))