From 0e4375b561d53932b50d417486b8243d83ebca61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Sat, 13 Sep 2025 22:18:07 +0200 Subject: [PATCH] commit1 permissions --- app.py | 363 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 179 insertions(+), 184 deletions(-) diff --git a/app.py b/app.py index 7b78a9f..dba782b 100644 --- a/app.py +++ b/app.py @@ -741,108 +741,6 @@ def handle_crop_receipt(receipt_id, file): return {"success": False, "error": str(e)} -def get_total_expenses_grouped_by_list_created_at( - user_only=False, - admin=False, - show_all=False, - range_type="monthly", - start_date=None, - end_date=None, - user_id=None, - category_id=None, -): - # Widoczność - now = datetime.now(timezone.utc) - lists_q = ShoppingList.query.filter( - ShoppingList.is_archived == False, - ((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)), - ) - - if admin: - pass - elif user_only: - lists_q = lists_q.filter(ShoppingList.owner_id == user_id) - elif show_all: - perm_subq = user_permission_subq(user_id) - lists_q = lists_q.filter( - or_( - ShoppingList.owner_id == user_id, - ShoppingList.is_public == True, - ShoppingList.id.in_(perm_subq), - ) - ) - else: - lists_q = lists_q.filter(ShoppingList.owner_id == user_id) - - # Filtr kategorii (bez ucinania wyników) - if category_id: - if str(category_id) == "none": - lists_q = lists_q.filter(~ShoppingList.categories.any()) - else: - try: - cid = int(category_id) - lists_q = lists_q.join( - shopping_list_category, - shopping_list_category.c.shopping_list_id == ShoppingList.id, - ).filter(shopping_list_category.c.category_id == cid) - except (TypeError, ValueError): - pass - - # Zakres po CREATED_AT listy (tak jak wcześniej) - if start_date and end_date: - try: - dt_start = datetime.strptime(start_date, "%Y-%m-%d") - dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1) - lists_q = lists_q.filter( - ShoppingList.created_at >= dt_start, - ShoppingList.created_at < dt_end, - ) - except Exception: - return {"error": "Błędne daty"} - - lists = lists_q.options(joinedload(ShoppingList.categories)).all() - if not lists: - return {"labels": [], "expenses": []} - - # Suma wydatków per lista (BEZ joinów – nic nie utnie) - list_ids = [l.id for l in lists] - totals = ( - db.session.query( - Expense.list_id, - func.coalesce(func.sum(Expense.amount), 0).label("total_amount"), - ) - .filter(Expense.list_id.in_(list_ids)) - .group_by(Expense.list_id) - .all() - ) - expense_map = {lid: float(total or 0) for lid, total in totals} - - # Grupowanie po bucketach czasu z created_at listy - def bucket_from_dt(ts: datetime) -> str: - if range_type == "daily": - return ts.strftime("%Y-%m-%d") - elif range_type == "weekly": - # ISO-tydzień jako YYYY-Www - return f"{ts.isocalendar().year}-W{ts.isocalendar().week:02d}" - elif range_type == "quarterly": - return f"{ts.year}-Q{((ts.month - 1)//3 + 1)}" - elif range_type == "halfyearly": - return f"{ts.year}-H{1 if ts.month <= 6 else 2}" - elif range_type == "yearly": - return str(ts.year) - else: - # monthly (domyślnie) - return ts.strftime("%Y-%m") - - grouped = defaultdict(float) - for sl in lists: - grouped[bucket_from_dt(sl.created_at)] += expense_map.get(sl.id, 0.0) - - labels = sorted(grouped.keys()) - expenses = [round(grouped[l], 2) for l in labels] - return {"labels": labels, "expenses": expenses} - - def recalculate_filesizes(receipt_id: int = None): updated = 0 not_found = 0 @@ -953,11 +851,9 @@ def category_to_color(name): r, g, b = colorsys.hls_to_rgb(hue, lightness, saturation) return f"#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}" - def get_total_expenses_grouped_by_category( show_all, range_type, start_date, end_date, user_id, category_id=None ): - # Widoczność now = datetime.now(timezone.utc) lists_q = ShoppingList.query.filter( ShoppingList.is_archived == False, @@ -976,7 +872,6 @@ def get_total_expenses_grouped_by_category( else: lists_q = lists_q.filter(ShoppingList.owner_id == user_id) - # Filtr kategorii na LISTACH (ważne: bez join =none nie ucina) if category_id: if str(category_id) == "none": lists_q = lists_q.filter(~ShoppingList.categories.any()) @@ -990,15 +885,13 @@ def get_total_expenses_grouped_by_category( except (TypeError, ValueError): pass - # Zakres po CREATED_AT listy (jak pierwotnie) + # ZAKRES: zawsze po created_at LISTY if start_date and end_date: try: dt_start = datetime.strptime(start_date, "%Y-%m-%d") - dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1) - lists_q = lists_q.filter( - ShoppingList.created_at >= dt_start, - ShoppingList.created_at < dt_end, - ) + dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1) + lists_q = lists_q.filter(ShoppingList.created_at >= dt_start, + ShoppingList.created_at < dt_end) except Exception: return {"error": "Błędne daty"} @@ -1006,7 +899,7 @@ def get_total_expenses_grouped_by_category( if not lists: return {"labels": [], "datasets": []} - # Suma wydatków per lista + # SUMY: po wszystkich wydatkach tych list (bez filtra dat po Expense) list_ids = [l.id for l in lists] totals = ( db.session.query( @@ -1019,7 +912,7 @@ def get_total_expenses_grouped_by_category( ) expense_map = {lid: float(total or 0) for lid, total in totals} - # bucket z CREATED_AT listy + # bucket wg created_at LISTY def bucket_from_dt(ts: datetime) -> str: if range_type == "daily": return ts.strftime("%Y-%m-%d") @@ -1034,7 +927,6 @@ def get_total_expenses_grouped_by_category( else: return ts.strftime("%Y-%m") - # data_map: bucket -> {category_name -> suma} data_map = defaultdict(lambda: defaultdict(float)) all_labels = set() @@ -1042,8 +934,8 @@ def get_total_expenses_grouped_by_category( key = bucket_from_dt(l.created_at) all_labels.add(key) total_expense = expense_map.get(l.id, 0.0) + if str(category_id) == "none": - # tu l ma brak kategorii (z filtra wyżej) data_map[key]["Bez kategorii"] += total_expense continue @@ -1056,10 +948,7 @@ def get_total_expenses_grouped_by_category( data_map[key][c.name] += total_expense labels = sorted(all_labels) - # kategorie, które faktycznie mają wydatki - categories_with_expenses = sorted( - {cat for bucket in data_map.values() for cat, val in bucket.items() if val > 0} - ) + cats = sorted({cat for b in data_map.values() for cat,v in b.items() if v > 0}) datasets = [ { @@ -1067,11 +956,106 @@ def get_total_expenses_grouped_by_category( "data": [round(data_map[label].get(cat, 0.0), 2) for label in labels], "backgroundColor": category_to_color(cat), } - for cat in categories_with_expenses + for cat in cats ] - return {"labels": labels, "datasets": datasets} +def get_total_expenses_grouped_by_list_created_at( + user_only=False, + admin=False, + show_all=False, + range_type="monthly", + start_date=None, + end_date=None, + user_id=None, + category_id=None, +): + now = datetime.now(timezone.utc) + lists_q = ShoppingList.query.filter( + ShoppingList.is_archived == False, + ((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)), + ) + + if admin: + pass + elif user_only: + lists_q = lists_q.filter(ShoppingList.owner_id == user_id) + elif show_all: + perm_subq = user_permission_subq(user_id) + lists_q = lists_q.filter( + or_( + ShoppingList.owner_id == user_id, + ShoppingList.is_public == True, + ShoppingList.id.in_(perm_subq), + ) + ) + else: + lists_q = lists_q.filter(ShoppingList.owner_id == user_id) + + # kategorie (bez ucinania „none”) + if category_id: + if str(category_id) == "none": + lists_q = lists_q.filter(~ShoppingList.categories.any()) + else: + try: + cid = int(category_id) + lists_q = lists_q.join( + shopping_list_category, + shopping_list_category.c.shopping_list_id == ShoppingList.id, + ).filter(shopping_list_category.c.category_id == cid) + except (TypeError, ValueError): + pass + + # ZAKRES: zawsze po created_at LISTY + if start_date and end_date: + try: + dt_start = datetime.strptime(start_date, "%Y-%m-%d") + dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1) + lists_q = lists_q.filter(ShoppingList.created_at >= dt_start, + ShoppingList.created_at < dt_end) + except Exception: + return {"error": "Błędne daty"} + + lists = lists_q.options(joinedload(ShoppingList.categories)).all() + if not lists: + return {"labels": [], "expenses": []} + + # SUMY: po wszystkich wydatkach tych list (bez filtra dat po Expense) + list_ids = [l.id for l in lists] + totals = ( + db.session.query( + Expense.list_id, + func.coalesce(func.sum(Expense.amount), 0).label("total_amount"), + ) + .filter(Expense.list_id.in_(list_ids)) + .group_by(Expense.list_id) + .all() + ) + expense_map = {lid: float(total or 0) for lid, total in totals} + + # bucket wg created_at LISTY + def bucket_from_dt(ts: datetime) -> str: + if range_type == "daily": + return ts.strftime("%Y-%m-%d") + elif range_type == "weekly": + return f"{ts.isocalendar().year}-W{ts.isocalendar().week:02d}" + elif range_type == "quarterly": + return f"{ts.year}-Q{((ts.month - 1)//3 + 1)}" + elif range_type == "halfyearly": + return f"{ts.year}-H{1 if ts.month <= 6 else 2}" + elif range_type == "yearly": + return str(ts.year) + else: + return ts.strftime("%Y-%m") + + grouped = defaultdict(float) + for sl in lists: + grouped[bucket_from_dt(sl.created_at)] += expense_map.get(sl.id, 0.0) + + labels = sorted(grouped.keys()) + expenses = [round(grouped[l], 2) for l in labels] + return {"labels": labels, "expenses": expenses} + def save_pdf_as_webp(file, path): try: @@ -1976,86 +1960,97 @@ def view_list(list_id): @app.route("/expenses") @login_required def expenses(): + from sqlalchemy.orm import joinedload + from types import SimpleNamespace + start_date_str = request.args.get("start_date") end_date_str = request.args.get("end_date") category_id = request.args.get("category_id", type=str) # może być "none" show_all = request.args.get("show_all", "true").lower() == "true" now = datetime.now(timezone.utc) + + # --- 1) Widoczne listy (właściciel/publiczne/udzielone), aktywne (niearch./niewygasłe) --- visible_clause = visible_lists_clause_for_expenses( user_id=current_user.id, include_shared=show_all, now_dt=now ) - # --- lista kategorii dostępnych w obrębie widocznych list, które mają wydatki --- - categories = ( - Category.query - .join(shopping_list_category, shopping_list_category.c.category_id == Category.id) - .join(ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id) - .join(Expense, Expense.list_id == ShoppingList.id) - .filter(*visible_clause) - .distinct() - .order_by(Category.name.asc()) - .all() - ) - from types import SimpleNamespace - categories.append(SimpleNamespace(id="none", name="Bez kategorii")) + lists_q = ShoppingList.query.filter(*visible_clause) - # --- główne zapytanie o wydatki --- - expenses_query = ( - Expense.query - .options( - joinedload(Expense.shopping_list).joinedload(ShoppingList.owner), - joinedload(Expense.shopping_list).joinedload(ShoppingList.expenses), - joinedload(Expense.shopping_list).joinedload(ShoppingList.categories), - ) - .join(ShoppingList, Expense.list_id == ShoppingList.id) - .filter(*visible_clause) - ) - - # filtr kategorii - if category_id: - if category_id == "none": - # Bez kategorii: NIE robimy join na tabeli łączącej — inaczej utnie wynik - expenses_query = expenses_query.filter(~ShoppingList.categories.any()) - else: - try: - cid = int(category_id) - except (TypeError, ValueError): - cid = None - if cid: - expenses_query = expenses_query.join( - shopping_list_category, - shopping_list_category.c.shopping_list_id == ShoppingList.id, - ).filter(shopping_list_category.c.category_id == cid) - - # filtr zakresu dat po dacie DODANIA WYDATKU + # Filtr zakresu po CREATED_AT LISTY (nie po Expense) if start_date_str and end_date_str: try: start = datetime.strptime(start_date_str, "%Y-%m-%d") end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1) - expenses_query = expenses_query.filter( - Expense.added_at >= start, Expense.added_at < end + lists_q = lists_q.filter( + ShoppingList.created_at >= start, + ShoppingList.created_at < end, ) except ValueError: flash("Błędny zakres dat", "danger") - expenses = expenses_query.order_by(Expense.added_at.desc()).all() + # Filtr kategorii na LISTACH (bez joinów, żeby nie ucinać „none”) + if category_id: + if category_id == "none": + lists_q = lists_q.filter(~ShoppingList.categories.any()) + else: + try: + cid = int(category_id) + lists_q = lists_q.join( + shopping_list_category, + shopping_list_category.c.shopping_list_id == ShoppingList.id, + ).filter(shopping_list_category.c.category_id == cid) + except (TypeError, ValueError): + pass - # sumy per lista (po faktycznie pobranych wydatkach) - list_ids = {e.list_id for e in expenses} - totals_map = {} - if list_ids: - totals = ( - db.session.query( - Expense.list_id, - func.coalesce(func.sum(Expense.amount), 0).label("total_expense") - ) - .filter(Expense.list_id.in_(list_ids)) - .group_by(Expense.list_id) - .all() + # Materializacja list (to jest pełna, finalna lista do wyświetlenia) + lists_filtered = ( + lists_q + .options(joinedload(ShoppingList.owner), joinedload(ShoppingList.categories)) + .order_by(ShoppingList.created_at.desc()) + .all() + ) + list_ids = [l.id for l in lists_filtered] or [-1] + + # --- 2) Wydatki: po wybranych listach (bez dodatkowego filtra dat, bo zakres dotyczy list) --- + expenses = ( + Expense.query + .options( + joinedload(Expense.shopping_list).joinedload(ShoppingList.owner), + joinedload(Expense.shopping_list).joinedload(ShoppingList.categories), ) - totals_map = {t.list_id: float(t.total_expense or 0) for t in totals} + .filter(Expense.list_id.in_(list_ids)) + .order_by(Expense.added_at.desc()) + .all() + ) + # --- 3) Sumy per lista (LEFT OUTER JOIN + COALESCE; nie utnie list bez wydatków) --- + totals_rows = ( + db.session.query( + ShoppingList.id.label("lid"), + func.coalesce(func.sum(Expense.amount), 0).label("total_expense"), + ) + .select_from(ShoppingList) + .filter(ShoppingList.id.in_(list_ids)) + .outerjoin(Expense, Expense.list_id == ShoppingList.id) + .group_by(ShoppingList.id) + .all() + ) + totals_map = {row.lid: float(row.total_expense or 0) for row in totals_rows} + + # --- 4) Kategorie do filtra (z samych widocznych list po filtrach) --- + categories = ( + Category.query + .join(shopping_list_category, shopping_list_category.c.category_id == Category.id) + .join(ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id) + .filter(ShoppingList.id.in_(list_ids)) + .distinct() + .order_by(Category.name.asc()) + .all() + ) + categories.append(SimpleNamespace(id="none", name="Bez kategorii")) + + # --- 5) Dane do widoku (format bez zmian) --- expense_table = [ { "title": (e.shopping_list.title if e.shopping_list else "Nieznana"), @@ -2070,11 +2065,11 @@ def expenses(): "id": l.id, "title": l.title, "created_at": l.created_at, - "total_expense": totals_map.get(l.id, 0.0), + "total_expense": totals_map.get(l.id, 0.0), # 0.0 gdy brak wydatków "owner_username": l.owner.username if l.owner else "?", "categories": [c.id for c in l.categories], } - for l in {e.shopping_list for e in expenses if e.shopping_list} + for l in lists_filtered ] return render_template( @@ -2092,8 +2087,8 @@ def expenses(): def expenses_data(): range_type = request.args.get("range", "monthly") start_date = request.args.get("start_date") - end_date = request.args.get("end_date") - show_all = request.args.get("show_all", "true").lower() == "true" + end_date = request.args.get("end_date") + show_all = request.args.get("show_all", "true").lower() == "true" category_id = request.args.get("category_id") by_category = request.args.get("by_category", "false").lower() == "true" @@ -2108,7 +2103,7 @@ def expenses_data(): ) else: result = get_total_expenses_grouped_by_list_created_at( - user_only=True, + user_only=False, admin=False, show_all=show_all, range_type=range_type,