From ce430f0f22c429b96e5fd0bf0facd148da1cc2a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Sat, 13 Sep 2025 18:32:54 +0200 Subject: [PATCH] commit1 permissions --- app.py | 298 +++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 225 insertions(+), 73 deletions(-) diff --git a/app.py b/app.py index eeb120c..2f68690 100644 --- a/app.py +++ b/app.py @@ -562,6 +562,18 @@ def redirect_with_flash( flash(message, category) return redirect(url_for(endpoint)) +def can_view_list(sl: ShoppingList) -> bool: + if current_user.is_authenticated: + if sl.owner_id == current_user.id: + return True + if sl.is_public: + return True + return db.session.query(ListPermission.id).filter_by( + list_id=sl.id, user_id=current_user.id + ).first() is not None + return bool(sl.is_public) + + def user_permission_subq(user_id): return db.session.query(ListPermission.list_id).filter( ListPermission.user_id == user_id @@ -1420,8 +1432,6 @@ def favicon(): @app.route("/") def main_page(): - from sqlalchemy import func, or_, case # upewnij się, że masz te importy na górze pliku - perm_subq = ( user_permission_subq(current_user.id) if current_user.is_authenticated @@ -1916,91 +1926,109 @@ def view_list(list_id): @app.route("/expenses") @login_required def expenses(): - start_date_str = request.args.get("start_date") - end_date_str = request.args.get("end_date") - category_id = request.args.get("category_id", type=int) - show_all = request.args.get("show_all", "true").lower() == "true" + from sqlalchemy.orm import joinedload + from sqlalchemy import or_, func - categories = ( - Category.query.join( - shopping_list_category, shopping_list_category.c.category_id == Category.id - ) - .join( - ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id - ) - .join(Expense, Expense.list_id == ShoppingList.id) - .filter( + # --- wejście --- + start_date_str = request.args.get("start_date") + end_date_str = request.args.get("end_date") + category_id = request.args.get("category_id", type=str) # może być "none" + show_all = request.args.get("show_all", "true").lower() == "true" + + now = datetime.now(timezone.utc) + perm_subq = user_permission_subq(current_user.id) + + # --- baza widoczności list --- + visible_clause = [ + ShoppingList.is_archived == False, + ((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)), + ] + if show_all: + visible_clause.append( or_( ShoppingList.owner_id == current_user.id, - ( - ShoppingList.is_public == True - if show_all - else ShoppingList.owner_id == current_user.id - ), + ShoppingList.is_public == True, + ShoppingList.id.in_(perm_subq), ) ) + else: + visible_clause.append(ShoppingList.owner_id == current_user.id) + + # --- kategorie dostępne z widocznych list, które mają wydatki --- + categories = ( + Category.query + .join(shopping_list_category, shopping_list_category.c.category_id == Category.id) + .join(ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id) + .join(Expense, Expense.list_id == ShoppingList.id) + .filter(*visible_clause) .distinct() .order_by(Category.name.asc()) .all() ) + # „Bez kategorii” + from types import SimpleNamespace categories.append(SimpleNamespace(id="none", name="Bez kategorii")) - start = None - end = None - - expenses_query = Expense.query.options( - joinedload(Expense.shopping_list).joinedload(ShoppingList.owner), - joinedload(Expense.shopping_list).joinedload(ShoppingList.expenses), - joinedload(Expense.shopping_list).joinedload(ShoppingList.categories), - ).join(ShoppingList, Expense.list_id == ShoppingList.id) - - if not show_all: - expenses_query = expenses_query.filter(ShoppingList.owner_id == current_user.id) - else: - expenses_query = expenses_query.filter( - or_( - ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True - ) + # --- główne zapytanie o wydatki --- + expenses_query = ( + Expense.query + .options( + joinedload(Expense.shopping_list).joinedload(ShoppingList.owner), + joinedload(Expense.shopping_list).joinedload(ShoppingList.expenses), + joinedload(Expense.shopping_list).joinedload(ShoppingList.categories), ) + .join(ShoppingList, Expense.list_id == ShoppingList.id) + .filter(*visible_clause) + ) + # filtr kategorii if category_id: - if str(category_id) == "none": # Bez kategorii - lists_query = lists_query.filter(~ShoppingList.categories.any()) + if category_id == "none": + expenses_query = expenses_query.filter(~ShoppingList.categories.any()) else: - lists_query = lists_query.join( - shopping_list_category, - shopping_list_category.c.shopping_list_id == ShoppingList.id, - ).filter(shopping_list_category.c.category_id == category_id) + try: + cid = int(category_id) + except ValueError: + cid = None + if cid: + expenses_query = expenses_query.join( + shopping_list_category, + shopping_list_category.c.shopping_list_id == ShoppingList.id, + ).filter(shopping_list_category.c.category_id == cid) + # filtr zakresu dat (po dacie dodania wydatku) + start = end = None if start_date_str and end_date_str: try: start = datetime.strptime(start_date_str, "%Y-%m-%d") - end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1) - expenses_query = expenses_query.filter( - Expense.added_at >= start, Expense.added_at < end - ) + end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1) + expenses_query = expenses_query.filter(Expense.added_at >= start, + Expense.added_at < end) except ValueError: flash("Błędny zakres dat", "danger") expenses = expenses_query.order_by(Expense.added_at.desc()).all() + # sumy per lista list_ids = {e.list_id for e in expenses} totals_map = {} if list_ids: totals = ( db.session.query( - Expense.list_id, func.sum(Expense.amount).label("total_expense") + Expense.list_id, + func.coalesce(func.sum(Expense.amount), 0).label("total_expense") ) .filter(Expense.list_id.in_(list_ids)) .group_by(Expense.list_id) .all() ) - totals_map = {t.list_id: t.total_expense or 0 for t in totals} + totals_map = {t.list_id: t.total_expense for t in totals} + # dane tabeli i list (tylko z list widocznych) expense_table = [ { - "title": e.shopping_list.title if e.shopping_list else "Nieznana", + "title": e.shopping_list.title if e.shopping_list else "Nieznana", "amount": e.amount, "added_at": e.added_at, } @@ -2032,37 +2060,161 @@ def expenses(): @app.route("/expenses_data") @login_required def expenses_data(): - range_type = request.args.get("range", "monthly") - start_date = request.args.get("start_date") - end_date = request.args.get("end_date") - show_all = request.args.get("show_all", "true").lower() == "true" + from sqlalchemy import func, or_ + + range_type = request.args.get("range", "monthly") + start_date = request.args.get("start_date") + end_date = request.args.get("end_date") + show_all = request.args.get("show_all", "true").lower() == "true" category_id = request.args.get("category_id") by_category = request.args.get("by_category", "false").lower() == "true" - if by_category: - result = get_total_expenses_grouped_by_category( - show_all=show_all, - range_type=range_type, - start_date=start_date, - end_date=end_date, - user_id=current_user.id, - category_id=category_id, + now = datetime.now(timezone.utc) + perm_subq = user_permission_subq(current_user.id) + + # widoczność + visible_clause = [ + ShoppingList.is_archived == False, + ((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)), + ] + if show_all: + visible_clause.append( + or_( + ShoppingList.owner_id == current_user.id, + ShoppingList.is_public == True, + ShoppingList.id.in_(perm_subq), + ) ) else: - result = get_total_expenses_grouped_by_list_created_at( - user_only=True, - admin=False, - show_all=show_all, - range_type=range_type, - start_date=start_date, - end_date=end_date, - user_id=current_user.id, - category_id=category_id, + visible_clause.append(ShoppingList.owner_id == current_user.id) + + q = db.session.query( + Expense.id, + Expense.amount, + Expense.added_at, + ShoppingList.id.label("list_id"), + ShoppingList.title.label("list_title"), + ).join(ShoppingList, ShoppingList.id == Expense.list_id).filter(*visible_clause) + + # filtr kategorii + if category_id: + if category_id == "none": + q = q.filter(~ShoppingList.categories.any()) + else: + try: + cid = int(category_id) + q = q.join( + shopping_list_category, + shopping_list_category.c.shopping_list_id == ShoppingList.id, + ).filter(shopping_list_category.c.category_id == cid) + except (ValueError, TypeError): + pass + + # zakres czasu + if range_type == "custom" and start_date and end_date: + try: + sd = datetime.strptime(start_date, "%Y-%m-%d") + ed = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1) + q = q.filter(Expense.added_at >= sd, Expense.added_at < ed) + except ValueError: + return jsonify({"error": "Błędny zakres dat"}), 400 + # dla monthly/weekly — brak dodatkowego filtra (agregujemy po polu) + + # agregacje + if by_category: + # suma po kategoriach list (lista może mieć wiele kategorii) + qc = ( + db.session.query( + Category.id.label("category_id"), + Category.name.label("category_name"), + func.coalesce(func.sum(Expense.amount), 0).label("total"), + ) + .join(shopping_list_category, shopping_list_category.c.category_id == Category.id) + .join(ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id) + .join(Expense, Expense.list_id == ShoppingList.id) + .filter(*visible_clause) ) - if "error" in result: - return jsonify({"error": result["error"]}), 400 - return jsonify(result) + if category_id: + if category_id == "none": + qc = qc.filter(~ShoppingList.categories.any()) + else: + try: + cid = int(category_id) + qc = qc.filter(Category.id == cid) + except (ValueError, TypeError): + pass + + if range_type == "custom" and start_date and end_date: + qc = qc.filter(Expense.added_at >= sd, Expense.added_at < ed) + + rows = ( + qc.group_by(Category.id, Category.name) + .order_by(Category.name.asc()) + .all() + ) + data = [ + {"category_id": r.category_id, "category_name": r.category_name, "total": float(r.total or 0)} + for r in rows + ] + return jsonify({"by": "category", "data": data}) + + else: + # suma po liście + bucket czasu + if range_type == "weekly": + bucket = func.date_trunc("week", Expense.added_at) + else: + # domyślnie monthly + bucket = func.date_trunc("month", Expense.added_at) + + rows = ( + db.session.query( + bucket.label("bucket"), + ShoppingList.id.label("list_id"), + ShoppingList.title.label("list_title"), + func.coalesce(func.sum(Expense.amount), 0).label("total"), + ) + .join(ShoppingList, ShoppingList.id == Expense.list_id) + .filter(*visible_clause) + ) + + if category_id: + if category_id == "none": + rows = rows.filter(~ShoppingList.categories.any()) + else: + try: + cid = int(category_id) + rows = rows.join( + shopping_list_category, + shopping_list_category.c.shopping_list_id == ShoppingList.id, + ).filter(shopping_list_category.c.category_id == cid) + except (ValueError, TypeError): + pass + + if range_type == "custom" and start_date and end_date: + rows = rows.filter(Expense.added_at >= sd, Expense.added_at < ed) + + rows = ( + rows.group_by("bucket", "list_id", "list_title") + .order_by("bucket", "list_title") + .all() + ) + + data = [] + for r in rows: + # r.bucket może być datetime (Postgres) — serializuj do YYYY-MM + if r.bucket is None: + bucket_key = "unknown" + else: + # dla monthly: YYYY-MM; dla weekly: ISO week start date + bucket_key = r.bucket.strftime("%Y-%m") if range_type != "weekly" else r.bucket.strftime("%Y-%m-%d") + data.append({ + "bucket": bucket_key, + "list_id": r.list_id, + "list_title": r.list_title, + "total": float(r.total or 0), + }) + return jsonify({"by": "list_bucket", "range": range_type, "data": data}) @app.route("/share/")