fix w zakresie ostatnih 30 dni

This commit is contained in:
Mateusz Gruszczyński
2025-08-22 11:43:05 +02:00
parent 8685e65d22
commit 5415e3435e

103
app.py
View File

@@ -409,9 +409,11 @@ app.register_blueprint(static_bp)
def allowed_file(filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
def generate_version_token():
return secrets.token_hex(8)
def get_list_details(list_id):
shopping_list = ShoppingList.query.options(
joinedload(ShoppingList.items).joinedload(Item.added_by_user),
@@ -508,7 +510,7 @@ def save_resized_image(file, path):
image.info.clear()
new_path = path.rsplit(".", 1)[0] + ".webp"
#image.save(new_path, **WEBP_SAVE_PARAMS)
# image.save(new_path, **WEBP_SAVE_PARAMS)
image.save(new_path, format="WEBP", method=6, quality=100)
except Exception as e:
@@ -876,6 +878,19 @@ def get_total_expenses_grouped_by_category(
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cat_id_int)
if not start_date and not end_date:
today = datetime.now(timezone.utc).date()
if range_type == "last30days":
dt_start = today - timedelta(days=29)
dt_end = today + timedelta(days=1)
start_date = dt_start.strftime("%Y-%m-%d")
end_date = dt_end.strftime("%Y-%m-%d")
elif range_type == "currentmonth":
dt_start = today.replace(day=1)
dt_end = today + timedelta(days=1)
start_date = dt_start.strftime("%Y-%m-%d")
end_date = dt_end.strftime("%Y-%m-%d")
if start_date and end_date:
try:
dt_start = datetime.strptime(start_date, "%Y-%m-%d")
@@ -969,7 +984,7 @@ def save_pdf_as_webp(file, path):
y_offset += img.height
new_path = path.rsplit(".", 1)[0] + ".webp"
#combined.save(new_path, **WEBP_SAVE_PARAMS)
# combined.save(new_path, **WEBP_SAVE_PARAMS)
combined.save(new_path, format="WEBP")
except Exception as e:
@@ -1742,14 +1757,17 @@ def view_list(list_id):
is_owner = current_user.id == shopping_list.owner_id
if not is_owner:
flash("Nie jesteś właścicielem listy, przekierowano do widoku publicznego.", "warning")
flash(
"Nie jesteś właścicielem listy, przekierowano do widoku publicznego.",
"warning",
)
if current_user.is_admin:
flash("W celu modyfikacji listy, przejdź do panelu administracyjnego.", "info")
flash(
"W celu modyfikacji listy, przejdź do panelu administracyjnego.", "info"
)
return redirect(url_for("shared_list", token=shopping_list.share_token))
shopping_list, items, receipts, expenses, total_expense = get_list_details(
list_id
)
shopping_list, items, receipts, expenses, total_expense = get_list_details(list_id)
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
@@ -1947,9 +1965,7 @@ def shared_list(token=None, list_id=None):
list_id = shopping_list.id
total_expense = get_total_expense_for_list(list_id)
shopping_list, items, receipts, expenses, total_expense = get_list_details(
list_id
)
shopping_list, items, receipts, expenses, total_expense = get_list_details(list_id)
shopping_list.category_badges = [
{"name": c.name, "color": category_to_color(c.name)}
@@ -2062,20 +2078,18 @@ def all_products():
{"products": products, "total_count": (total_count if products else len(products))}
) """
@app.route("/all_products")
def all_products():
sort = request.args.get("sort", "popularity")
limit = request.args.get("limit", type=int) or 100
offset = request.args.get("offset", type=int) or 0
products_from_items = (
db.session.query(
func.lower(func.trim(Item.name)).label("normalized_name"),
func.min(Item.name).label("display_name"),
func.count(func.distinct(Item.list_id)).label("count"),
)
.group_by(func.lower(func.trim(Item.name)))
)
products_from_items = db.session.query(
func.lower(func.trim(Item.name)).label("normalized_name"),
func.min(Item.name).label("display_name"),
func.count(func.distinct(Item.list_id)).label("count"),
).group_by(func.lower(func.trim(Item.name)))
products_from_suggested = (
db.session.query(
@@ -2083,29 +2097,32 @@ def all_products():
func.min(SuggestedProduct.name).label("display_name"),
db.literal(1).label("count"),
)
.filter(~func.lower(func.trim(SuggestedProduct.name)).in_(
db.session.query(func.lower(func.trim(Item.name)))
))
.filter(
~func.lower(func.trim(SuggestedProduct.name)).in_(
db.session.query(func.lower(func.trim(Item.name)))
)
)
.group_by(func.lower(func.trim(SuggestedProduct.name)))
)
union_q = products_from_items.union_all(products_from_suggested).subquery()
final_q = (
db.session.query(
union_q.c.normalized_name,
union_q.c.display_name,
func.sum(union_q.c.count).label("count"),
)
.group_by(union_q.c.normalized_name, union_q.c.display_name)
)
final_q = db.session.query(
union_q.c.normalized_name,
union_q.c.display_name,
func.sum(union_q.c.count).label("count"),
).group_by(union_q.c.normalized_name, union_q.c.display_name)
if sort == "alphabetical":
final_q = final_q.order_by(func.lower(union_q.c.display_name).asc())
else:
final_q = final_q.order_by(func.sum(union_q.c.count).desc(), func.lower(union_q.c.display_name).asc())
final_q = final_q.order_by(
func.sum(union_q.c.count).desc(), func.lower(union_q.c.display_name).asc()
)
total_count = db.session.query(func.count()).select_from(final_q.subquery()).scalar()
total_count = (
db.session.query(func.count()).select_from(final_q.subquery()).scalar()
)
products = final_q.offset(offset).limit(limit).all()
out = [{"name": row.display_name, "count": row.count} for row in products]
@@ -2113,7 +2130,6 @@ def all_products():
return jsonify({"products": out, "total_count": total_count})
@app.route("/upload_receipt/<int:list_id>", methods=["POST"])
@login_required
def upload_receipt(list_id):
@@ -2165,7 +2181,10 @@ def upload_receipt(list_id):
return receipt_error(f"Błąd zapisu do bazy: {str(e)}")
if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest":
url = url_for("uploaded_file", filename=webp_filename) + f"?v={new_receipt.version_token or '0'}"
url = (
url_for("uploaded_file", filename=webp_filename)
+ f"?v={new_receipt.version_token or '0'}"
)
socketio.emit("receipt_added", {"url": url}, to=str(list_id))
return jsonify({"success": True, "url": url})
@@ -2287,7 +2306,7 @@ def analyze_receipts_for_list(list_id):
}
)
#if not already_added:
# if not already_added:
total += value
return jsonify({"results": results, "total": round(total, 2)})
@@ -2422,11 +2441,17 @@ def admin_panel():
avg_list_expense = round(total_expense / list_count, 2) if list_count else 0
if db.engine.name == "sqlite":
timestamp_diff = func.strftime("%s", Item.purchased_at) - func.strftime("%s", Item.added_at)
timestamp_diff = func.strftime("%s", Item.purchased_at) - func.strftime(
"%s", Item.added_at
)
elif db.engine.name in ("postgresql", "postgres"):
timestamp_diff = func.extract("epoch", Item.purchased_at) - func.extract("epoch", Item.added_at)
timestamp_diff = func.extract("epoch", Item.purchased_at) - func.extract(
"epoch", Item.added_at
)
elif db.engine.name in ("mysql", "mariadb"):
timestamp_diff = func.timestampdiff(text("SECOND"), Item.added_at, Item.purchased_at)
timestamp_diff = func.timestampdiff(
text("SECOND"), Item.added_at, Item.purchased_at
)
else:
timestamp_diff = None
@@ -2686,9 +2711,7 @@ def admin_receipts(id):
flash("Nieprawidłowe ID listy.", "danger")
return redirect(url_for("admin_panel"))
total_filesize = (
db.session.query(func.sum(Receipt.filesize)).scalar() or 0
)
total_filesize = db.session.query(func.sum(Receipt.filesize)).scalar() or 0
page_filesize = sum(r.filesize or 0 for r in receipts_paginated)