commit1 permissions

This commit is contained in:
Mateusz Gruszczyński
2025-09-13 18:53:29 +02:00
parent ce430f0f22
commit 7bdd9239eb

520
app.py
View File

@@ -573,6 +573,57 @@ def can_view_list(sl: ShoppingList) -> bool:
).first() is not None
return bool(sl.is_public)
def db_bucket(col, kind: str = "month"):
name = db.engine.name # 'sqlite', 'mysql', 'mariadb', 'postgresql', ...
kind = (kind or "month").lower()
if kind == "day":
if name == "sqlite":
return func.strftime("%Y-%m-%d", col)
elif name in ("mysql", "mariadb"):
return func.date_format(col, "%Y-%m-%d")
else:
return func.to_char(col, "YYYY-MM-DD")
if kind == "week":
if name == "sqlite":
return func.printf("%s-W%s",
func.strftime("%Y", col),
func.strftime("%W", col))
elif name in ("mysql", "mariadb"):
return func.date_format(col, "%x-W%v")
else:
return func.to_char(col, 'IYYY-"W"IW')
if name == "sqlite":
return func.strftime("%Y-%m", col)
elif name in ("mysql", "mariadb"):
return func.date_format(col, "%Y-%m")
else:
return func.to_char(col, "YYYY-MM")
def visible_lists_clause_for_expenses(user_id: int, include_shared: bool, now_dt):
perm_subq = user_permission_subq(user_id)
base = [
ShoppingList.is_archived == False,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now_dt)),
]
if include_shared:
base.append(
or_(
ShoppingList.owner_id == user_id,
ShoppingList.is_public == True,
ShoppingList.id.in_(perm_subq),
)
)
else:
base.append(ShoppingList.owner_id == user_id)
return base
def user_permission_subq(user_id):
return db.session.query(ListPermission.list_id).filter(
@@ -700,99 +751,94 @@ def get_total_expenses_grouped_by_list_created_at(
user_id=None,
category_id=None,
):
lists_query = ShoppingList.query
# Widoczność
now = datetime.now(timezone.utc)
lists_q = ShoppingList.query.filter(
ShoppingList.is_archived == False,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
)
if admin:
pass
elif user_only:
lists_q = lists_q.filter(ShoppingList.owner_id == user_id)
elif show_all:
lists_query = lists_query.filter(
perm_subq = user_permission_subq(user_id)
lists_q = lists_q.filter(
or_(
ShoppingList.owner_id == user_id,
ShoppingList.is_public == True,
ShoppingList.id.in_(perm_subq),
)
)
else:
lists_query = lists_query.filter(ShoppingList.owner_id == user_id)
lists_q = lists_q.filter(ShoppingList.owner_id == user_id)
# Filtr kategorii (bez ucinania wyników)
if category_id:
if str(category_id) == "none":
lists_query = lists_query.filter(~ShoppingList.categories.any())
lists_q = lists_q.filter(~ShoppingList.categories.any())
else:
try:
cat_id_int = int(category_id)
except ValueError:
return {"labels": [], "expenses": []}
lists_query = lists_query.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cat_id_int)
today = datetime.now(timezone.utc).date()
if range_type == "last30days":
dt_start = today - timedelta(days=29)
dt_end = today + timedelta(days=1)
start_date, end_date = dt_start.strftime("%Y-%m-%d"), dt_end.strftime(
"%Y-%m-%d"
)
elif range_type == "currentmonth":
dt_start = today.replace(day=1)
dt_end = today + timedelta(days=1)
start_date, end_date = dt_start.strftime("%Y-%m-%d"), dt_end.strftime(
"%Y-%m-%d"
)
cid = int(category_id)
lists_q = lists_q.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
except (TypeError, ValueError):
pass
# Zakres po CREATED_AT listy (tak jak wcześniej)
if start_date and end_date:
try:
dt_start = datetime.strptime(start_date, "%Y-%m-%d")
dt_end = datetime.strptime(end_date, "%Y-%m-%d")
if dt_end.tzinfo is None:
dt_end = dt_end.replace(tzinfo=timezone.utc)
dt_end += timedelta(days=1)
dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
lists_q = lists_q.filter(
ShoppingList.created_at >= dt_start,
ShoppingList.created_at < dt_end,
)
except Exception:
return {"error": "Błędne daty", "labels": [], "expenses": []}
return {"error": "Błędne daty"}
lists_query = lists_query.filter(
ShoppingList.created_at >= dt_start, ShoppingList.created_at < dt_end
)
lists = lists_query.all()
lists = lists_q.options(joinedload(ShoppingList.categories)).all()
if not lists:
return {"labels": [], "expenses": []}
# Suma wydatków per lista (BEZ joinów nic nie utnie)
list_ids = [l.id for l in lists]
total_expenses = (
totals = (
db.session.query(
Expense.list_id, func.sum(Expense.amount).label("total_amount")
Expense.list_id,
func.coalesce(func.sum(Expense.amount), 0).label("total_amount"),
)
.filter(Expense.list_id.in_(list_ids))
.group_by(Expense.list_id)
.all()
)
expense_map = {lid: float(total or 0) for lid, total in totals}
expense_map = {lid: amt for lid, amt in total_expenses}
# Grupowanie po bucketach czasu z created_at listy
def bucket_from_dt(ts: datetime) -> str:
if range_type == "daily":
return ts.strftime("%Y-%m-%d")
elif range_type == "weekly":
# ISO-tydzień jako YYYY-Www
return f"{ts.isocalendar().year}-W{ts.isocalendar().week:02d}"
elif range_type == "quarterly":
return f"{ts.year}-Q{((ts.month - 1)//3 + 1)}"
elif range_type == "halfyearly":
return f"{ts.year}-H{1 if ts.month <= 6 else 2}"
elif range_type == "yearly":
return str(ts.year)
else:
# monthly (domyślnie)
return ts.strftime("%Y-%m")
grouped = defaultdict(float)
for sl in lists:
if sl.id in expense_map:
ts = sl.created_at or datetime.now(timezone.utc)
if range_type in ("last30days", "currentmonth"):
key = ts.strftime("%Y-%m-%d") # dzienny widok
elif range_type == "monthly":
key = ts.strftime("%Y-%m")
elif range_type == "quarterly":
key = f"{ts.year}-Q{((ts.month - 1) // 3 + 1)}"
elif range_type == "halfyearly":
key = f"{ts.year}-H{1 if ts.month <= 6 else 2}"
elif range_type == "yearly":
key = str(ts.year)
else:
key = ts.strftime("%Y-%m-%d")
grouped[key] += expense_map[sl.id]
grouped[bucket_from_dt(sl.created_at)] += expense_map.get(sl.id, 0.0)
labels = sorted(grouped)
labels = sorted(grouped.keys())
expenses = [round(grouped[l], 2) for l in labels]
return {"labels": labels, "expenses": expenses}
@@ -911,84 +957,94 @@ def category_to_color(name):
def get_total_expenses_grouped_by_category(
show_all, range_type, start_date, end_date, user_id, category_id=None
):
lists_query = ShoppingList.query
# Widoczność
now = datetime.now(timezone.utc)
lists_q = ShoppingList.query.filter(
ShoppingList.is_archived == False,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
)
if show_all:
lists_query = lists_query.filter(
or_(ShoppingList.owner_id == user_id, ShoppingList.is_public == True)
perm_subq = user_permission_subq(user_id)
lists_q = lists_q.filter(
or_(
ShoppingList.owner_id == user_id,
ShoppingList.is_public == True,
ShoppingList.id.in_(perm_subq),
)
)
else:
lists_query = lists_query.filter(ShoppingList.owner_id == user_id)
lists_q = lists_q.filter(ShoppingList.owner_id == user_id)
# Filtr kategorii na LISTACH (ważne: bez join =none nie ucina)
if category_id:
if str(category_id) == "none":
lists_query = lists_query.filter(~ShoppingList.categories.any())
lists_q = lists_q.filter(~ShoppingList.categories.any())
else:
try:
cat_id_int = int(category_id)
except ValueError:
return {"labels": [], "datasets": []}
lists_query = lists_query.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cat_id_int)
if not start_date and not end_date:
today = datetime.now(timezone.utc).date()
if range_type == "last30days":
dt_start = today - timedelta(days=29)
dt_end = today + timedelta(days=1)
start_date = dt_start.strftime("%Y-%m-%d")
end_date = dt_end.strftime("%Y-%m-%d")
elif range_type == "currentmonth":
dt_start = today.replace(day=1)
dt_end = today + timedelta(days=1)
start_date = dt_start.strftime("%Y-%m-%d")
end_date = dt_end.strftime("%Y-%m-%d")
cid = int(category_id)
lists_q = lists_q.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
except (TypeError, ValueError):
pass
# Zakres po CREATED_AT listy (jak pierwotnie)
if start_date and end_date:
try:
dt_start = datetime.strptime(start_date, "%Y-%m-%d")
dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
lists_q = lists_q.filter(
ShoppingList.created_at >= dt_start,
ShoppingList.created_at < dt_end,
)
except Exception:
return {"error": "Błędne daty"}
lists_query = lists_query.filter(
ShoppingList.created_at >= dt_start, ShoppingList.created_at < dt_end
)
lists = lists_query.options(joinedload(ShoppingList.categories)).all()
lists = lists_q.options(joinedload(ShoppingList.categories)).all()
if not lists:
return {"labels": [], "datasets": []}
# Suma wydatków per lista
list_ids = [l.id for l in lists]
totals = (
db.session.query(
Expense.list_id,
func.coalesce(func.sum(Expense.amount), 0).label("total_amount"),
)
.filter(Expense.list_id.in_(list_ids))
.group_by(Expense.list_id)
.all()
)
expense_map = {lid: float(total or 0) for lid, total in totals}
# bucket z CREATED_AT listy
def bucket_from_dt(ts: datetime) -> str:
if range_type == "daily":
return ts.strftime("%Y-%m-%d")
elif range_type == "weekly":
return f"{ts.isocalendar().year}-W{ts.isocalendar().week:02d}"
elif range_type == "quarterly":
return f"{ts.year}-Q{((ts.month - 1)//3 + 1)}"
elif range_type == "halfyearly":
return f"{ts.year}-H{1 if ts.month <= 6 else 2}"
elif range_type == "yearly":
return str(ts.year)
else:
return ts.strftime("%Y-%m")
# data_map: bucket -> {category_name -> suma}
data_map = defaultdict(lambda: defaultdict(float))
all_labels = set()
for l in lists:
total_expense = (
db.session.query(func.sum(Expense.amount))
.filter(Expense.list_id == l.id)
.scalar()
) or 0
if total_expense <= 0:
continue
if range_type == "monthly":
key = l.created_at.strftime("%Y-%m")
elif range_type == "quarterly":
key = f"{l.created_at.year}-Q{((l.created_at.month - 1) // 3 + 1)}"
elif range_type == "halfyearly":
key = f"{l.created_at.year}-H{1 if l.created_at.month <= 6 else 2}"
elif range_type == "yearly":
key = str(l.created_at.year)
else:
key = l.created_at.strftime("%Y-%m-%d")
key = bucket_from_dt(l.created_at)
all_labels.add(key)
total_expense = expense_map.get(l.id, 0.0)
if str(category_id) == "none":
if not l.categories:
data_map[key]["Bez kategorii"] += total_expense
# tu l ma brak kategorii (z filtra wyżej)
data_map[key]["Bez kategorii"] += total_expense
continue
if not l.categories:
@@ -1000,25 +1056,19 @@ def get_total_expenses_grouped_by_category(
data_map[key][c.name] += total_expense
labels = sorted(all_labels)
# kategorie, które faktycznie mają wydatki
categories_with_expenses = sorted(
{
cat
for cat_data in data_map.values()
for cat, value in cat_data.items()
if value > 0
}
{cat for bucket in data_map.values() for cat, val in bucket.items() if val > 0}
)
datasets = []
for cat in categories_with_expenses:
datasets.append(
{
"label": cat,
"data": [round(data_map[label].get(cat, 0), 2) for label in labels],
"backgroundColor": category_to_color(cat),
}
)
datasets = [
{
"label": cat,
"data": [round(data_map[label].get(cat, 0.0), 2) for label in labels],
"backgroundColor": category_to_color(cat),
}
for cat in categories_with_expenses
]
return {"labels": labels, "datasets": datasets}
@@ -1926,35 +1976,17 @@ def view_list(list_id):
@app.route("/expenses")
@login_required
def expenses():
from sqlalchemy.orm import joinedload
from sqlalchemy import or_, func
# --- wejście ---
start_date_str = request.args.get("start_date")
end_date_str = request.args.get("end_date")
category_id = request.args.get("category_id", type=str) # może być "none"
show_all = request.args.get("show_all", "true").lower() == "true"
now = datetime.now(timezone.utc)
perm_subq = user_permission_subq(current_user.id)
visible_clause = visible_lists_clause_for_expenses(
user_id=current_user.id, include_shared=show_all, now_dt=now
)
# --- baza widoczności list ---
visible_clause = [
ShoppingList.is_archived == False,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
]
if show_all:
visible_clause.append(
or_(
ShoppingList.owner_id == current_user.id,
ShoppingList.is_public == True,
ShoppingList.id.in_(perm_subq),
)
)
else:
visible_clause.append(ShoppingList.owner_id == current_user.id)
# --- kategorie dostępne z widocznych list, które mają wydatki ---
# --- lista kategorii dostępnych w obrębie widocznych list, które mają wydatki ---
categories = (
Category.query
.join(shopping_list_category, shopping_list_category.c.category_id == Category.id)
@@ -1965,8 +1997,6 @@ def expenses():
.order_by(Category.name.asc())
.all()
)
# „Bez kategorii”
from types import SimpleNamespace
categories.append(SimpleNamespace(id="none", name="Bez kategorii"))
@@ -1985,11 +2015,12 @@ def expenses():
# filtr kategorii
if category_id:
if category_id == "none":
# Bez kategorii: NIE robimy join na tabeli łączącej — inaczej utnie wynik
expenses_query = expenses_query.filter(~ShoppingList.categories.any())
else:
try:
cid = int(category_id)
except ValueError:
except (TypeError, ValueError):
cid = None
if cid:
expenses_query = expenses_query.join(
@@ -1997,20 +2028,20 @@ def expenses():
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
# filtr zakresu dat (po dacie dodania wydatku)
start = end = None
# filtr zakresu dat po dacie DODANIA WYDATKU
if start_date_str and end_date_str:
try:
start = datetime.strptime(start_date_str, "%Y-%m-%d")
end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1)
expenses_query = expenses_query.filter(Expense.added_at >= start,
Expense.added_at < end)
expenses_query = expenses_query.filter(
Expense.added_at >= start, Expense.added_at < end
)
except ValueError:
flash("Błędny zakres dat", "danger")
expenses = expenses_query.order_by(Expense.added_at.desc()).all()
# sumy per lista
# sumy per lista (po faktycznie pobranych wydatkach)
list_ids = {e.list_id for e in expenses}
totals_map = {}
if list_ids:
@@ -2023,13 +2054,12 @@ def expenses():
.group_by(Expense.list_id)
.all()
)
totals_map = {t.list_id: t.total_expense for t in totals}
totals_map = {t.list_id: float(t.total_expense or 0) for t in totals}
# dane tabeli i list (tylko z list widocznych)
expense_table = [
{
"title": e.shopping_list.title if e.shopping_list else "Nieznana",
"amount": e.amount,
"title": (e.shopping_list.title if e.shopping_list else "Nieznana"),
"amount": e.amount,
"added_at": e.added_at,
}
for e in expenses
@@ -2040,7 +2070,7 @@ def expenses():
"id": l.id,
"title": l.title,
"created_at": l.created_at,
"total_expense": totals_map.get(l.id, 0),
"total_expense": totals_map.get(l.id, 0.0),
"owner_username": l.owner.username if l.owner else "?",
"categories": [c.id for c in l.categories],
}
@@ -2060,161 +2090,37 @@ def expenses():
@app.route("/expenses_data")
@login_required
def expenses_data():
from sqlalchemy import func, or_
range_type = request.args.get("range", "monthly")
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
show_all = request.args.get("show_all", "true").lower() == "true"
range_type = request.args.get("range", "monthly")
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
show_all = request.args.get("show_all", "true").lower() == "true"
category_id = request.args.get("category_id")
by_category = request.args.get("by_category", "false").lower() == "true"
now = datetime.now(timezone.utc)
perm_subq = user_permission_subq(current_user.id)
# widoczność
visible_clause = [
ShoppingList.is_archived == False,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
]
if show_all:
visible_clause.append(
or_(
ShoppingList.owner_id == current_user.id,
ShoppingList.is_public == True,
ShoppingList.id.in_(perm_subq),
)
)
else:
visible_clause.append(ShoppingList.owner_id == current_user.id)
q = db.session.query(
Expense.id,
Expense.amount,
Expense.added_at,
ShoppingList.id.label("list_id"),
ShoppingList.title.label("list_title"),
).join(ShoppingList, ShoppingList.id == Expense.list_id).filter(*visible_clause)
# filtr kategorii
if category_id:
if category_id == "none":
q = q.filter(~ShoppingList.categories.any())
else:
try:
cid = int(category_id)
q = q.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
except (ValueError, TypeError):
pass
# zakres czasu
if range_type == "custom" and start_date and end_date:
try:
sd = datetime.strptime(start_date, "%Y-%m-%d")
ed = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
q = q.filter(Expense.added_at >= sd, Expense.added_at < ed)
except ValueError:
return jsonify({"error": "Błędny zakres dat"}), 400
# dla monthly/weekly — brak dodatkowego filtra (agregujemy po polu)
# agregacje
if by_category:
# suma po kategoriach list (lista może mieć wiele kategorii)
qc = (
db.session.query(
Category.id.label("category_id"),
Category.name.label("category_name"),
func.coalesce(func.sum(Expense.amount), 0).label("total"),
)
.join(shopping_list_category, shopping_list_category.c.category_id == Category.id)
.join(ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id)
.join(Expense, Expense.list_id == ShoppingList.id)
.filter(*visible_clause)
result = get_total_expenses_grouped_by_category(
show_all=show_all,
range_type=range_type,
start_date=start_date,
end_date=end_date,
user_id=current_user.id,
category_id=category_id,
)
if category_id:
if category_id == "none":
qc = qc.filter(~ShoppingList.categories.any())
else:
try:
cid = int(category_id)
qc = qc.filter(Category.id == cid)
except (ValueError, TypeError):
pass
if range_type == "custom" and start_date and end_date:
qc = qc.filter(Expense.added_at >= sd, Expense.added_at < ed)
rows = (
qc.group_by(Category.id, Category.name)
.order_by(Category.name.asc())
.all()
)
data = [
{"category_id": r.category_id, "category_name": r.category_name, "total": float(r.total or 0)}
for r in rows
]
return jsonify({"by": "category", "data": data})
else:
# suma po liście + bucket czasu
if range_type == "weekly":
bucket = func.date_trunc("week", Expense.added_at)
else:
# domyślnie monthly
bucket = func.date_trunc("month", Expense.added_at)
rows = (
db.session.query(
bucket.label("bucket"),
ShoppingList.id.label("list_id"),
ShoppingList.title.label("list_title"),
func.coalesce(func.sum(Expense.amount), 0).label("total"),
)
.join(ShoppingList, ShoppingList.id == Expense.list_id)
.filter(*visible_clause)
result = get_total_expenses_grouped_by_list_created_at(
user_only=True,
admin=False,
show_all=show_all,
range_type=range_type,
start_date=start_date,
end_date=end_date,
user_id=current_user.id,
category_id=category_id,
)
if category_id:
if category_id == "none":
rows = rows.filter(~ShoppingList.categories.any())
else:
try:
cid = int(category_id)
rows = rows.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
except (ValueError, TypeError):
pass
if range_type == "custom" and start_date and end_date:
rows = rows.filter(Expense.added_at >= sd, Expense.added_at < ed)
rows = (
rows.group_by("bucket", "list_id", "list_title")
.order_by("bucket", "list_title")
.all()
)
data = []
for r in rows:
# r.bucket może być datetime (Postgres) — serializuj do YYYY-MM
if r.bucket is None:
bucket_key = "unknown"
else:
# dla monthly: YYYY-MM; dla weekly: ISO week start date
bucket_key = r.bucket.strftime("%Y-%m") if range_type != "weekly" else r.bucket.strftime("%Y-%m-%d")
data.append({
"bucket": bucket_key,
"list_id": r.list_id,
"list_title": r.list_title,
"total": float(r.total or 0),
})
return jsonify({"by": "list_bucket", "range": range_type, "data": data})
if "error" in result:
return jsonify({"error": result["error"]}), 400
return jsonify(result)
@app.route("/share/<token>")