commit1 permissions

This commit is contained in:
Mateusz Gruszczyński
2025-09-13 18:32:54 +02:00
parent bf1c2e2a29
commit ce430f0f22

298
app.py
View File

@@ -562,6 +562,18 @@ def redirect_with_flash(
flash(message, category)
return redirect(url_for(endpoint))
def can_view_list(sl: ShoppingList) -> bool:
if current_user.is_authenticated:
if sl.owner_id == current_user.id:
return True
if sl.is_public:
return True
return db.session.query(ListPermission.id).filter_by(
list_id=sl.id, user_id=current_user.id
).first() is not None
return bool(sl.is_public)
def user_permission_subq(user_id):
return db.session.query(ListPermission.list_id).filter(
ListPermission.user_id == user_id
@@ -1420,8 +1432,6 @@ def favicon():
@app.route("/")
def main_page():
from sqlalchemy import func, or_, case # upewnij się, że masz te importy na górze pliku
perm_subq = (
user_permission_subq(current_user.id)
if current_user.is_authenticated
@@ -1916,91 +1926,109 @@ def view_list(list_id):
@app.route("/expenses")
@login_required
def expenses():
start_date_str = request.args.get("start_date")
end_date_str = request.args.get("end_date")
category_id = request.args.get("category_id", type=int)
show_all = request.args.get("show_all", "true").lower() == "true"
from sqlalchemy.orm import joinedload
from sqlalchemy import or_, func
categories = (
Category.query.join(
shopping_list_category, shopping_list_category.c.category_id == Category.id
)
.join(
ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id
)
.join(Expense, Expense.list_id == ShoppingList.id)
.filter(
# --- wejście ---
start_date_str = request.args.get("start_date")
end_date_str = request.args.get("end_date")
category_id = request.args.get("category_id", type=str) # może być "none"
show_all = request.args.get("show_all", "true").lower() == "true"
now = datetime.now(timezone.utc)
perm_subq = user_permission_subq(current_user.id)
# --- baza widoczności list ---
visible_clause = [
ShoppingList.is_archived == False,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
]
if show_all:
visible_clause.append(
or_(
ShoppingList.owner_id == current_user.id,
(
ShoppingList.is_public == True
if show_all
else ShoppingList.owner_id == current_user.id
),
ShoppingList.is_public == True,
ShoppingList.id.in_(perm_subq),
)
)
else:
visible_clause.append(ShoppingList.owner_id == current_user.id)
# --- kategorie dostępne z widocznych list, które mają wydatki ---
categories = (
Category.query
.join(shopping_list_category, shopping_list_category.c.category_id == Category.id)
.join(ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id)
.join(Expense, Expense.list_id == ShoppingList.id)
.filter(*visible_clause)
.distinct()
.order_by(Category.name.asc())
.all()
)
# „Bez kategorii”
from types import SimpleNamespace
categories.append(SimpleNamespace(id="none", name="Bez kategorii"))
start = None
end = None
expenses_query = Expense.query.options(
joinedload(Expense.shopping_list).joinedload(ShoppingList.owner),
joinedload(Expense.shopping_list).joinedload(ShoppingList.expenses),
joinedload(Expense.shopping_list).joinedload(ShoppingList.categories),
).join(ShoppingList, Expense.list_id == ShoppingList.id)
if not show_all:
expenses_query = expenses_query.filter(ShoppingList.owner_id == current_user.id)
else:
expenses_query = expenses_query.filter(
or_(
ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True
)
# --- główne zapytanie o wydatki ---
expenses_query = (
Expense.query
.options(
joinedload(Expense.shopping_list).joinedload(ShoppingList.owner),
joinedload(Expense.shopping_list).joinedload(ShoppingList.expenses),
joinedload(Expense.shopping_list).joinedload(ShoppingList.categories),
)
.join(ShoppingList, Expense.list_id == ShoppingList.id)
.filter(*visible_clause)
)
# filtr kategorii
if category_id:
if str(category_id) == "none": # Bez kategorii
lists_query = lists_query.filter(~ShoppingList.categories.any())
if category_id == "none":
expenses_query = expenses_query.filter(~ShoppingList.categories.any())
else:
lists_query = lists_query.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == category_id)
try:
cid = int(category_id)
except ValueError:
cid = None
if cid:
expenses_query = expenses_query.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
# filtr zakresu dat (po dacie dodania wydatku)
start = end = None
if start_date_str and end_date_str:
try:
start = datetime.strptime(start_date_str, "%Y-%m-%d")
end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1)
expenses_query = expenses_query.filter(
Expense.added_at >= start, Expense.added_at < end
)
end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1)
expenses_query = expenses_query.filter(Expense.added_at >= start,
Expense.added_at < end)
except ValueError:
flash("Błędny zakres dat", "danger")
expenses = expenses_query.order_by(Expense.added_at.desc()).all()
# sumy per lista
list_ids = {e.list_id for e in expenses}
totals_map = {}
if list_ids:
totals = (
db.session.query(
Expense.list_id, func.sum(Expense.amount).label("total_expense")
Expense.list_id,
func.coalesce(func.sum(Expense.amount), 0).label("total_expense")
)
.filter(Expense.list_id.in_(list_ids))
.group_by(Expense.list_id)
.all()
)
totals_map = {t.list_id: t.total_expense or 0 for t in totals}
totals_map = {t.list_id: t.total_expense for t in totals}
# dane tabeli i list (tylko z list widocznych)
expense_table = [
{
"title": e.shopping_list.title if e.shopping_list else "Nieznana",
"title": e.shopping_list.title if e.shopping_list else "Nieznana",
"amount": e.amount,
"added_at": e.added_at,
}
@@ -2032,37 +2060,161 @@ def expenses():
@app.route("/expenses_data")
@login_required
def expenses_data():
range_type = request.args.get("range", "monthly")
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
show_all = request.args.get("show_all", "true").lower() == "true"
from sqlalchemy import func, or_
range_type = request.args.get("range", "monthly")
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
show_all = request.args.get("show_all", "true").lower() == "true"
category_id = request.args.get("category_id")
by_category = request.args.get("by_category", "false").lower() == "true"
if by_category:
result = get_total_expenses_grouped_by_category(
show_all=show_all,
range_type=range_type,
start_date=start_date,
end_date=end_date,
user_id=current_user.id,
category_id=category_id,
now = datetime.now(timezone.utc)
perm_subq = user_permission_subq(current_user.id)
# widoczność
visible_clause = [
ShoppingList.is_archived == False,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
]
if show_all:
visible_clause.append(
or_(
ShoppingList.owner_id == current_user.id,
ShoppingList.is_public == True,
ShoppingList.id.in_(perm_subq),
)
)
else:
result = get_total_expenses_grouped_by_list_created_at(
user_only=True,
admin=False,
show_all=show_all,
range_type=range_type,
start_date=start_date,
end_date=end_date,
user_id=current_user.id,
category_id=category_id,
visible_clause.append(ShoppingList.owner_id == current_user.id)
q = db.session.query(
Expense.id,
Expense.amount,
Expense.added_at,
ShoppingList.id.label("list_id"),
ShoppingList.title.label("list_title"),
).join(ShoppingList, ShoppingList.id == Expense.list_id).filter(*visible_clause)
# filtr kategorii
if category_id:
if category_id == "none":
q = q.filter(~ShoppingList.categories.any())
else:
try:
cid = int(category_id)
q = q.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
except (ValueError, TypeError):
pass
# zakres czasu
if range_type == "custom" and start_date and end_date:
try:
sd = datetime.strptime(start_date, "%Y-%m-%d")
ed = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
q = q.filter(Expense.added_at >= sd, Expense.added_at < ed)
except ValueError:
return jsonify({"error": "Błędny zakres dat"}), 400
# dla monthly/weekly — brak dodatkowego filtra (agregujemy po polu)
# agregacje
if by_category:
# suma po kategoriach list (lista może mieć wiele kategorii)
qc = (
db.session.query(
Category.id.label("category_id"),
Category.name.label("category_name"),
func.coalesce(func.sum(Expense.amount), 0).label("total"),
)
.join(shopping_list_category, shopping_list_category.c.category_id == Category.id)
.join(ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id)
.join(Expense, Expense.list_id == ShoppingList.id)
.filter(*visible_clause)
)
if "error" in result:
return jsonify({"error": result["error"]}), 400
return jsonify(result)
if category_id:
if category_id == "none":
qc = qc.filter(~ShoppingList.categories.any())
else:
try:
cid = int(category_id)
qc = qc.filter(Category.id == cid)
except (ValueError, TypeError):
pass
if range_type == "custom" and start_date and end_date:
qc = qc.filter(Expense.added_at >= sd, Expense.added_at < ed)
rows = (
qc.group_by(Category.id, Category.name)
.order_by(Category.name.asc())
.all()
)
data = [
{"category_id": r.category_id, "category_name": r.category_name, "total": float(r.total or 0)}
for r in rows
]
return jsonify({"by": "category", "data": data})
else:
# suma po liście + bucket czasu
if range_type == "weekly":
bucket = func.date_trunc("week", Expense.added_at)
else:
# domyślnie monthly
bucket = func.date_trunc("month", Expense.added_at)
rows = (
db.session.query(
bucket.label("bucket"),
ShoppingList.id.label("list_id"),
ShoppingList.title.label("list_title"),
func.coalesce(func.sum(Expense.amount), 0).label("total"),
)
.join(ShoppingList, ShoppingList.id == Expense.list_id)
.filter(*visible_clause)
)
if category_id:
if category_id == "none":
rows = rows.filter(~ShoppingList.categories.any())
else:
try:
cid = int(category_id)
rows = rows.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cid)
except (ValueError, TypeError):
pass
if range_type == "custom" and start_date and end_date:
rows = rows.filter(Expense.added_at >= sd, Expense.added_at < ed)
rows = (
rows.group_by("bucket", "list_id", "list_title")
.order_by("bucket", "list_title")
.all()
)
data = []
for r in rows:
# r.bucket może być datetime (Postgres) — serializuj do YYYY-MM
if r.bucket is None:
bucket_key = "unknown"
else:
# dla monthly: YYYY-MM; dla weekly: ISO week start date
bucket_key = r.bucket.strftime("%Y-%m") if range_type != "weekly" else r.bucket.strftime("%Y-%m-%d")
data.append({
"bucket": bucket_key,
"list_id": r.list_id,
"list_title": r.list_title,
"total": float(r.total or 0),
})
return jsonify({"by": "list_bucket", "range": range_type, "data": data})
@app.route("/share/<token>")