fix filtrowania
This commit is contained in:
189
app.py
189
app.py
@@ -417,6 +417,75 @@ def generate_new_receipt_filename(list_id):
|
||||
return f"list_{list_id}_{timestamp}_{random_part}.webp"
|
||||
|
||||
|
||||
def get_expenses_aggregated_by_list_created_at(
|
||||
user_only=False, admin=False, show_all=False,
|
||||
range_type="monthly", start_date=None, end_date=None, user_id=None
|
||||
):
|
||||
"""
|
||||
Wspólna logika: sumujemy najnowszy wydatek z każdej listy,
|
||||
ale do agregacji/filtra bierzemy ShoppingList.created_at!
|
||||
"""
|
||||
lists_query = ShoppingList.query
|
||||
# Uprawnienia
|
||||
if admin:
|
||||
# admin widzi wszystko, ewentualnie: dodać filtrowanie wg potrzeb
|
||||
pass
|
||||
elif show_all:
|
||||
lists_query = lists_query.filter(
|
||||
or_(
|
||||
ShoppingList.owner_id == user_id,
|
||||
ShoppingList.is_public == True,
|
||||
)
|
||||
)
|
||||
else:
|
||||
lists_query = lists_query.filter(ShoppingList.owner_id == user_id)
|
||||
|
||||
# Filtrowanie po created_at listy
|
||||
if start_date and end_date:
|
||||
try:
|
||||
dt_start = datetime.strptime(start_date, "%Y-%m-%d")
|
||||
dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
|
||||
except Exception:
|
||||
return {"error": "Błędne daty", "labels": [], "expenses": []}
|
||||
lists_query = lists_query.filter(
|
||||
ShoppingList.created_at >= dt_start,
|
||||
ShoppingList.created_at < dt_end
|
||||
)
|
||||
lists = lists_query.all()
|
||||
|
||||
# Najnowszy wydatek każdej listy
|
||||
data = []
|
||||
for sl in lists:
|
||||
latest_exp = (
|
||||
Expense.query
|
||||
.filter_by(list_id=sl.id)
|
||||
.order_by(Expense.added_at.desc())
|
||||
.first()
|
||||
)
|
||||
if latest_exp:
|
||||
data.append({"created_at": sl.created_at, "amount": latest_exp.amount})
|
||||
|
||||
# Grupowanie po wybranym zakresie wg utworzenia listy
|
||||
grouped = defaultdict(float)
|
||||
for rec in data:
|
||||
ts = rec["created_at"] or datetime.now(timezone.utc)
|
||||
if range_type == "monthly":
|
||||
key = ts.strftime("%Y-%m")
|
||||
elif range_type == "quarterly":
|
||||
key = f"{ts.year}-Q{((ts.month - 1) // 3 + 1)}"
|
||||
elif range_type == "halfyearly":
|
||||
key = f"{ts.year}-H{1 if ts.month <= 6 else 2}"
|
||||
elif range_type == "yearly":
|
||||
key = str(ts.year)
|
||||
else:
|
||||
key = ts.strftime("%Y-%m-%d")
|
||||
grouped[key] += rec["amount"]
|
||||
|
||||
labels = sorted(grouped)
|
||||
expenses = [round(grouped[l], 2) for l in labels]
|
||||
return {"labels": labels, "expenses": expenses}
|
||||
|
||||
|
||||
############# OCR ###########################
|
||||
|
||||
|
||||
@@ -1140,62 +1209,18 @@ def user_expenses_data():
|
||||
end_date = request.args.get("end_date")
|
||||
show_all = request.args.get("show_all", "false").lower() == "true"
|
||||
|
||||
base_query = Expense.query.join(ShoppingList, Expense.list_id == ShoppingList.id)
|
||||
|
||||
if show_all:
|
||||
base_query = base_query.filter(
|
||||
or_(
|
||||
ShoppingList.owner_id == current_user.id,
|
||||
ShoppingList.is_public == True,
|
||||
)
|
||||
)
|
||||
else:
|
||||
base_query = base_query.filter(ShoppingList.owner_id == current_user.id)
|
||||
|
||||
if start_date and end_date:
|
||||
try:
|
||||
start = datetime.strptime(start_date, "%Y-%m-%d")
|
||||
end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
|
||||
base_query = base_query.filter(
|
||||
Expense.added_at >= start, Expense.added_at < end
|
||||
)
|
||||
except ValueError:
|
||||
return jsonify({"error": "Błędne daty"}), 400
|
||||
|
||||
# Wybierz tylko najnowszy wydatek dla każdej listy
|
||||
subq = (
|
||||
db.session.query(
|
||||
Expense.list_id,
|
||||
func.max(Expense.added_at).label("latest"),
|
||||
)
|
||||
.group_by(Expense.list_id)
|
||||
.subquery()
|
||||
result = get_expenses_aggregated_by_list_created_at(
|
||||
user_only=True,
|
||||
admin=False,
|
||||
show_all=show_all,
|
||||
range_type=range_type,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
user_id=current_user.id
|
||||
)
|
||||
|
||||
query = base_query.join(
|
||||
subq, (Expense.list_id == subq.c.list_id) & (Expense.added_at == subq.c.latest)
|
||||
)
|
||||
|
||||
expenses = query.all()
|
||||
|
||||
grouped = defaultdict(float)
|
||||
for e in expenses:
|
||||
ts = e.added_at or datetime.now(timezone.utc)
|
||||
if range_type == "monthly":
|
||||
key = ts.strftime("%Y-%m")
|
||||
elif range_type == "quarterly":
|
||||
key = f"{ts.year}-Q{((ts.month - 1) // 3) + 1}"
|
||||
elif range_type == "halfyearly":
|
||||
key = f"{ts.year}-H{1 if ts.month <= 6 else 2}"
|
||||
elif range_type == "yearly":
|
||||
key = str(ts.year)
|
||||
else:
|
||||
key = ts.strftime("%Y-%m-%d")
|
||||
grouped[key] += e.amount
|
||||
|
||||
labels = sorted(grouped)
|
||||
data = [round(grouped[label], 2) for label in labels]
|
||||
return jsonify({"labels": labels, "expenses": data})
|
||||
if "error" in result:
|
||||
return jsonify({"error": result["error"]}), 400
|
||||
return jsonify(result)
|
||||
|
||||
|
||||
@app.route("/share/<token>")
|
||||
@@ -2079,49 +2104,21 @@ def admin_expenses_data():
|
||||
return jsonify({"error": "Brak uprawnień"}), 403
|
||||
|
||||
range_type = request.args.get("range", "monthly")
|
||||
start_date_str = request.args.get("start_date")
|
||||
end_date_str = request.args.get("end_date")
|
||||
now = datetime.now(timezone.utc)
|
||||
start_date = request.args.get("start_date")
|
||||
end_date = request.args.get("end_date")
|
||||
|
||||
subq = (
|
||||
db.session.query(Expense.list_id, func.max(Expense.added_at).label("latest"))
|
||||
.group_by(Expense.list_id)
|
||||
.subquery()
|
||||
result = get_expenses_aggregated_by_list_created_at(
|
||||
user_only=False,
|
||||
admin=True,
|
||||
show_all=True,
|
||||
range_type=range_type,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
user_id=None,
|
||||
)
|
||||
|
||||
query = db.session.query(Expense).join(
|
||||
subq, (Expense.list_id == subq.c.list_id) & (Expense.added_at == subq.c.latest)
|
||||
)
|
||||
|
||||
if start_date_str and end_date_str:
|
||||
try:
|
||||
start = datetime.strptime(start_date_str, "%Y-%m-%d")
|
||||
end = datetime.strptime(end_date_str, "%Y-%m-%d")
|
||||
query = query.filter(Expense.added_at >= start, Expense.added_at <= end)
|
||||
except ValueError:
|
||||
return jsonify({"error": "Błędny zakres dat"}), 400
|
||||
|
||||
grouped = defaultdict(float)
|
||||
for e in query.all():
|
||||
ts = e.added_at or now
|
||||
|
||||
if range_type == "monthly":
|
||||
key = ts.strftime("%Y-%m")
|
||||
elif range_type == "quarterly":
|
||||
key = f"{ts.year}-Q{((ts.month - 1) // 3 + 1)}"
|
||||
elif range_type == "halfyearly":
|
||||
key = f"{ts.year}-H{1 if ts.month <= 6 else 2}"
|
||||
elif range_type == "yearly":
|
||||
key = str(ts.year)
|
||||
else:
|
||||
key = ts.strftime("%Y-%m-%d")
|
||||
|
||||
grouped[key] += e.amount
|
||||
|
||||
labels = sorted(grouped)
|
||||
data = [round(grouped[k], 2) for k in labels]
|
||||
|
||||
return jsonify({"labels": labels, "expenses": data})
|
||||
if "error" in result:
|
||||
return jsonify({"error": result["error"]}), 400
|
||||
return jsonify(result)
|
||||
|
||||
|
||||
@app.route("/admin/promote_user/<int:user_id>")
|
||||
|
Reference in New Issue
Block a user