commit1 permissions

This commit is contained in:
Mateusz Gruszczyński
2025-09-13 22:18:07 +02:00
parent 7bdd9239eb
commit 0e4375b561

363
app.py
View File

@@ -741,108 +741,6 @@ def handle_crop_receipt(receipt_id, file):
return {"success": False, "error": str(e)}
def get_total_expenses_grouped_by_list_created_at(
user_only=False,
admin=False,
show_all=False,
range_type="monthly",
start_date=None,
end_date=None,
user_id=None,
category_id=None,
):
# Widoczność
now = datetime.now(timezone.utc)
lists_q = ShoppingList.query.filter(
ShoppingList.is_archived == False,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
)
if admin:
pass
elif user_only:
lists_q = lists_q.filter(ShoppingList.owner_id == user_id)
elif show_all:
perm_subq = user_permission_subq(user_id)
lists_q = lists_q.filter(
or_(
ShoppingList.owner_id == user_id,
ShoppingList.is_public == True,
ShoppingList.id.in_(perm_subq),
)
)
else:
lists_q = lists_q.filter(ShoppingList.owner_id == user_id)
# Filtr kategorii (bez ucinania wyników)
if category_id:
if str(category_id) == "none":
lists_q = lists_q.filter(~ShoppingList.categories.any())
else:
try:
cid = int(category_id)
lists_q = lists_q.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
except (TypeError, ValueError):
pass
# Zakres po CREATED_AT listy (tak jak wcześniej)
if start_date and end_date:
try:
dt_start = datetime.strptime(start_date, "%Y-%m-%d")
dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
lists_q = lists_q.filter(
ShoppingList.created_at >= dt_start,
ShoppingList.created_at < dt_end,
)
except Exception:
return {"error": "Błędne daty"}
lists = lists_q.options(joinedload(ShoppingList.categories)).all()
if not lists:
return {"labels": [], "expenses": []}
# Suma wydatków per lista (BEZ joinów nic nie utnie)
list_ids = [l.id for l in lists]
totals = (
db.session.query(
Expense.list_id,
func.coalesce(func.sum(Expense.amount), 0).label("total_amount"),
)
.filter(Expense.list_id.in_(list_ids))
.group_by(Expense.list_id)
.all()
)
expense_map = {lid: float(total or 0) for lid, total in totals}
# Grupowanie po bucketach czasu z created_at listy
def bucket_from_dt(ts: datetime) -> str:
if range_type == "daily":
return ts.strftime("%Y-%m-%d")
elif range_type == "weekly":
# ISO-tydzień jako YYYY-Www
return f"{ts.isocalendar().year}-W{ts.isocalendar().week:02d}"
elif range_type == "quarterly":
return f"{ts.year}-Q{((ts.month - 1)//3 + 1)}"
elif range_type == "halfyearly":
return f"{ts.year}-H{1 if ts.month <= 6 else 2}"
elif range_type == "yearly":
return str(ts.year)
else:
# monthly (domyślnie)
return ts.strftime("%Y-%m")
grouped = defaultdict(float)
for sl in lists:
grouped[bucket_from_dt(sl.created_at)] += expense_map.get(sl.id, 0.0)
labels = sorted(grouped.keys())
expenses = [round(grouped[l], 2) for l in labels]
return {"labels": labels, "expenses": expenses}
def recalculate_filesizes(receipt_id: int = None):
updated = 0
not_found = 0
@@ -953,11 +851,9 @@ def category_to_color(name):
r, g, b = colorsys.hls_to_rgb(hue, lightness, saturation)
return f"#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}"
def get_total_expenses_grouped_by_category(
show_all, range_type, start_date, end_date, user_id, category_id=None
):
# Widoczność
now = datetime.now(timezone.utc)
lists_q = ShoppingList.query.filter(
ShoppingList.is_archived == False,
@@ -976,7 +872,6 @@ def get_total_expenses_grouped_by_category(
else:
lists_q = lists_q.filter(ShoppingList.owner_id == user_id)
# Filtr kategorii na LISTACH (ważne: bez join =none nie ucina)
if category_id:
if str(category_id) == "none":
lists_q = lists_q.filter(~ShoppingList.categories.any())
@@ -990,15 +885,13 @@ def get_total_expenses_grouped_by_category(
except (TypeError, ValueError):
pass
# Zakres po CREATED_AT listy (jak pierwotnie)
# ZAKRES: zawsze po created_at LISTY
if start_date and end_date:
try:
dt_start = datetime.strptime(start_date, "%Y-%m-%d")
dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
lists_q = lists_q.filter(
ShoppingList.created_at >= dt_start,
ShoppingList.created_at < dt_end,
)
dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
lists_q = lists_q.filter(ShoppingList.created_at >= dt_start,
ShoppingList.created_at < dt_end)
except Exception:
return {"error": "Błędne daty"}
@@ -1006,7 +899,7 @@ def get_total_expenses_grouped_by_category(
if not lists:
return {"labels": [], "datasets": []}
# Suma wydatków per lista
# SUMY: po wszystkich wydatkach tych list (bez filtra dat po Expense)
list_ids = [l.id for l in lists]
totals = (
db.session.query(
@@ -1019,7 +912,7 @@ def get_total_expenses_grouped_by_category(
)
expense_map = {lid: float(total or 0) for lid, total in totals}
# bucket z CREATED_AT listy
# bucket wg created_at LISTY
def bucket_from_dt(ts: datetime) -> str:
if range_type == "daily":
return ts.strftime("%Y-%m-%d")
@@ -1034,7 +927,6 @@ def get_total_expenses_grouped_by_category(
else:
return ts.strftime("%Y-%m")
# data_map: bucket -> {category_name -> suma}
data_map = defaultdict(lambda: defaultdict(float))
all_labels = set()
@@ -1042,8 +934,8 @@ def get_total_expenses_grouped_by_category(
key = bucket_from_dt(l.created_at)
all_labels.add(key)
total_expense = expense_map.get(l.id, 0.0)
if str(category_id) == "none":
# tu l ma brak kategorii (z filtra wyżej)
data_map[key]["Bez kategorii"] += total_expense
continue
@@ -1056,10 +948,7 @@ def get_total_expenses_grouped_by_category(
data_map[key][c.name] += total_expense
labels = sorted(all_labels)
# kategorie, które faktycznie mają wydatki
categories_with_expenses = sorted(
{cat for bucket in data_map.values() for cat, val in bucket.items() if val > 0}
)
cats = sorted({cat for b in data_map.values() for cat,v in b.items() if v > 0})
datasets = [
{
@@ -1067,11 +956,106 @@ def get_total_expenses_grouped_by_category(
"data": [round(data_map[label].get(cat, 0.0), 2) for label in labels],
"backgroundColor": category_to_color(cat),
}
for cat in categories_with_expenses
for cat in cats
]
return {"labels": labels, "datasets": datasets}
def get_total_expenses_grouped_by_list_created_at(
user_only=False,
admin=False,
show_all=False,
range_type="monthly",
start_date=None,
end_date=None,
user_id=None,
category_id=None,
):
now = datetime.now(timezone.utc)
lists_q = ShoppingList.query.filter(
ShoppingList.is_archived == False,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
)
if admin:
pass
elif user_only:
lists_q = lists_q.filter(ShoppingList.owner_id == user_id)
elif show_all:
perm_subq = user_permission_subq(user_id)
lists_q = lists_q.filter(
or_(
ShoppingList.owner_id == user_id,
ShoppingList.is_public == True,
ShoppingList.id.in_(perm_subq),
)
)
else:
lists_q = lists_q.filter(ShoppingList.owner_id == user_id)
# kategorie (bez ucinania „none”)
if category_id:
if str(category_id) == "none":
lists_q = lists_q.filter(~ShoppingList.categories.any())
else:
try:
cid = int(category_id)
lists_q = lists_q.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
except (TypeError, ValueError):
pass
# ZAKRES: zawsze po created_at LISTY
if start_date and end_date:
try:
dt_start = datetime.strptime(start_date, "%Y-%m-%d")
dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
lists_q = lists_q.filter(ShoppingList.created_at >= dt_start,
ShoppingList.created_at < dt_end)
except Exception:
return {"error": "Błędne daty"}
lists = lists_q.options(joinedload(ShoppingList.categories)).all()
if not lists:
return {"labels": [], "expenses": []}
# SUMY: po wszystkich wydatkach tych list (bez filtra dat po Expense)
list_ids = [l.id for l in lists]
totals = (
db.session.query(
Expense.list_id,
func.coalesce(func.sum(Expense.amount), 0).label("total_amount"),
)
.filter(Expense.list_id.in_(list_ids))
.group_by(Expense.list_id)
.all()
)
expense_map = {lid: float(total or 0) for lid, total in totals}
# bucket wg created_at LISTY
def bucket_from_dt(ts: datetime) -> str:
if range_type == "daily":
return ts.strftime("%Y-%m-%d")
elif range_type == "weekly":
return f"{ts.isocalendar().year}-W{ts.isocalendar().week:02d}"
elif range_type == "quarterly":
return f"{ts.year}-Q{((ts.month - 1)//3 + 1)}"
elif range_type == "halfyearly":
return f"{ts.year}-H{1 if ts.month <= 6 else 2}"
elif range_type == "yearly":
return str(ts.year)
else:
return ts.strftime("%Y-%m")
grouped = defaultdict(float)
for sl in lists:
grouped[bucket_from_dt(sl.created_at)] += expense_map.get(sl.id, 0.0)
labels = sorted(grouped.keys())
expenses = [round(grouped[l], 2) for l in labels]
return {"labels": labels, "expenses": expenses}
def save_pdf_as_webp(file, path):
try:
@@ -1976,86 +1960,97 @@ def view_list(list_id):
@app.route("/expenses")
@login_required
def expenses():
from sqlalchemy.orm import joinedload
from types import SimpleNamespace
start_date_str = request.args.get("start_date")
end_date_str = request.args.get("end_date")
category_id = request.args.get("category_id", type=str) # może być "none"
show_all = request.args.get("show_all", "true").lower() == "true"
now = datetime.now(timezone.utc)
# --- 1) Widoczne listy (właściciel/publiczne/udzielone), aktywne (niearch./niewygasłe) ---
visible_clause = visible_lists_clause_for_expenses(
user_id=current_user.id, include_shared=show_all, now_dt=now
)
# --- lista kategorii dostępnych w obrębie widocznych list, które mają wydatki ---
categories = (
Category.query
.join(shopping_list_category, shopping_list_category.c.category_id == Category.id)
.join(ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id)
.join(Expense, Expense.list_id == ShoppingList.id)
.filter(*visible_clause)
.distinct()
.order_by(Category.name.asc())
.all()
)
from types import SimpleNamespace
categories.append(SimpleNamespace(id="none", name="Bez kategorii"))
lists_q = ShoppingList.query.filter(*visible_clause)
# --- główne zapytanie o wydatki ---
expenses_query = (
Expense.query
.options(
joinedload(Expense.shopping_list).joinedload(ShoppingList.owner),
joinedload(Expense.shopping_list).joinedload(ShoppingList.expenses),
joinedload(Expense.shopping_list).joinedload(ShoppingList.categories),
)
.join(ShoppingList, Expense.list_id == ShoppingList.id)
.filter(*visible_clause)
)
# filtr kategorii
if category_id:
if category_id == "none":
# Bez kategorii: NIE robimy join na tabeli łączącej — inaczej utnie wynik
expenses_query = expenses_query.filter(~ShoppingList.categories.any())
else:
try:
cid = int(category_id)
except (TypeError, ValueError):
cid = None
if cid:
expenses_query = expenses_query.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
# filtr zakresu dat po dacie DODANIA WYDATKU
# Filtr zakresu po CREATED_AT LISTY (nie po Expense)
if start_date_str and end_date_str:
try:
start = datetime.strptime(start_date_str, "%Y-%m-%d")
end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1)
expenses_query = expenses_query.filter(
Expense.added_at >= start, Expense.added_at < end
lists_q = lists_q.filter(
ShoppingList.created_at >= start,
ShoppingList.created_at < end,
)
except ValueError:
flash("Błędny zakres dat", "danger")
expenses = expenses_query.order_by(Expense.added_at.desc()).all()
# Filtr kategorii na LISTACH (bez joinów, żeby nie ucinać „none”)
if category_id:
if category_id == "none":
lists_q = lists_q.filter(~ShoppingList.categories.any())
else:
try:
cid = int(category_id)
lists_q = lists_q.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
except (TypeError, ValueError):
pass
# sumy per lista (po faktycznie pobranych wydatkach)
list_ids = {e.list_id for e in expenses}
totals_map = {}
if list_ids:
totals = (
db.session.query(
Expense.list_id,
func.coalesce(func.sum(Expense.amount), 0).label("total_expense")
)
.filter(Expense.list_id.in_(list_ids))
.group_by(Expense.list_id)
.all()
# Materializacja list (to jest pełna, finalna lista do wyświetlenia)
lists_filtered = (
lists_q
.options(joinedload(ShoppingList.owner), joinedload(ShoppingList.categories))
.order_by(ShoppingList.created_at.desc())
.all()
)
list_ids = [l.id for l in lists_filtered] or [-1]
# --- 2) Wydatki: po wybranych listach (bez dodatkowego filtra dat, bo zakres dotyczy list) ---
expenses = (
Expense.query
.options(
joinedload(Expense.shopping_list).joinedload(ShoppingList.owner),
joinedload(Expense.shopping_list).joinedload(ShoppingList.categories),
)
totals_map = {t.list_id: float(t.total_expense or 0) for t in totals}
.filter(Expense.list_id.in_(list_ids))
.order_by(Expense.added_at.desc())
.all()
)
# --- 3) Sumy per lista (LEFT OUTER JOIN + COALESCE; nie utnie list bez wydatków) ---
totals_rows = (
db.session.query(
ShoppingList.id.label("lid"),
func.coalesce(func.sum(Expense.amount), 0).label("total_expense"),
)
.select_from(ShoppingList)
.filter(ShoppingList.id.in_(list_ids))
.outerjoin(Expense, Expense.list_id == ShoppingList.id)
.group_by(ShoppingList.id)
.all()
)
totals_map = {row.lid: float(row.total_expense or 0) for row in totals_rows}
# --- 4) Kategorie do filtra (z samych widocznych list po filtrach) ---
categories = (
Category.query
.join(shopping_list_category, shopping_list_category.c.category_id == Category.id)
.join(ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id)
.filter(ShoppingList.id.in_(list_ids))
.distinct()
.order_by(Category.name.asc())
.all()
)
categories.append(SimpleNamespace(id="none", name="Bez kategorii"))
# --- 5) Dane do widoku (format bez zmian) ---
expense_table = [
{
"title": (e.shopping_list.title if e.shopping_list else "Nieznana"),
@@ -2070,11 +2065,11 @@ def expenses():
"id": l.id,
"title": l.title,
"created_at": l.created_at,
"total_expense": totals_map.get(l.id, 0.0),
"total_expense": totals_map.get(l.id, 0.0), # 0.0 gdy brak wydatków
"owner_username": l.owner.username if l.owner else "?",
"categories": [c.id for c in l.categories],
}
for l in {e.shopping_list for e in expenses if e.shopping_list}
for l in lists_filtered
]
return render_template(
@@ -2092,8 +2087,8 @@ def expenses():
def expenses_data():
range_type = request.args.get("range", "monthly")
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
show_all = request.args.get("show_all", "true").lower() == "true"
end_date = request.args.get("end_date")
show_all = request.args.get("show_all", "true").lower() == "true"
category_id = request.args.get("category_id")
by_category = request.args.get("by_category", "false").lower() == "true"
@@ -2108,7 +2103,7 @@ def expenses_data():
)
else:
result = get_total_expenses_grouped_by_list_created_at(
user_only=True,
user_only=False,
admin=False,
show_all=show_all,
range_type=range_type,