dodatki i funckje

This commit is contained in:
Mateusz Gruszczyński
2025-08-16 22:34:45 +02:00
parent bf57b6b4e3
commit d526f392b8
4 changed files with 183 additions and 146 deletions

307
app.py
View File

@@ -54,14 +54,13 @@ from flask_session import Session
from types import SimpleNamespace
from pdf2image import convert_from_bytes
from urllib.parse import urlencode
from typing import Sequence, Any
# OCR
import pytesseract
from pytesseract import Output
import logging
from types import SimpleNamespace
app = Flask(__name__)
app.config.from_object(Config)
@@ -276,8 +275,10 @@ class Receipt(db.Model):
uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
filesize = db.Column(db.Integer, nullable=True)
file_hash = db.Column(db.String(64), nullable=True, unique=True)
uploaded_by = db.Column(db.Integer, db.ForeignKey("user.id"))
shopping_list = db.relationship("ShoppingList", back_populates="receipts")
uploaded_by_user = db.relationship("User", backref="uploaded_receipts")
def hash_password(password):
@@ -519,17 +520,22 @@ def admin_required(f):
return decorated_function
def get_progress(list_id):
total_count, purchased_count = (
def get_progress(list_id: int) -> tuple[int, int, float]:
result = (
db.session.query(
func.count(Item.id), func.sum(case((Item.purchased == True, 1), else_=0))
func.count(Item.id),
func.sum(case((Item.purchased == True, 1), else_=0)),
)
.filter(Item.list_id == list_id)
.first()
)
total_count = total_count or 0
purchased_count = purchased_count or 0
if result is None:
total_count = 0
purchased_count = 0
else:
total_count = result[0] or 0
purchased_count = result[1] or 0
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
return purchased_count, total_count, percent
@@ -963,7 +969,33 @@ def get_active_months_query(visible_lists_query=None):
def normalize_name(name):
if not name:
return ""
return re.sub(r'\s+', ' ', name).strip().lower()
return re.sub(r"\s+", " ", name).strip().lower()
def get_valid_item_or_404(item_id: int, list_id: int) -> Item:
item = db.session.get(Item, item_id)
if not item or item.list_id != list_id:
abort(404, description="Nie znaleziono produktu")
return item
def paginate_items(
items: Sequence[Any], page: int, per_page: int
) -> tuple[list, int, int]:
total_items = len(items)
total_pages = (total_items + per_page - 1) // per_page
start = (page - 1) * per_page
end = start + per_page
return items[start:end], total_items, total_pages
def get_page_args(
default_per_page: int = 100, max_per_page: int = 300
) -> tuple[int, int]:
page = request.args.get("page", 1, type=int)
per_page = request.args.get("per_page", default_per_page, type=int)
per_page = max(1, min(per_page, max_per_page))
return page, per_page
############# OCR ###########################
@@ -1958,32 +1990,36 @@ def all_products():
ItemAlias = aliased(Item)
SuggestedAlias = aliased(SuggestedProduct)
base_query = db.session.query(
func.lower(func.trim(ItemAlias.name)).label("normalized_name"),
func.count(func.distinct(ItemAlias.list_id)).label("count"),
func.min(ItemAlias.name).label("original_name")
).join(
SuggestedAlias,
func.lower(func.trim(ItemAlias.name)) == func.lower(func.trim(SuggestedAlias.name))
).group_by("normalized_name")
base_query = (
db.session.query(
func.lower(func.trim(ItemAlias.name)).label("normalized_name"),
func.count(func.distinct(ItemAlias.list_id)).label("count"),
func.min(ItemAlias.name).label("original_name"),
)
.join(
SuggestedAlias,
func.lower(func.trim(ItemAlias.name))
== func.lower(func.trim(SuggestedAlias.name)),
)
.group_by("normalized_name")
)
if sort == "popularity":
base_query = base_query.order_by(func.count(func.distinct(ItemAlias.list_id)).desc(), "normalized_name")
base_query = base_query.order_by(
func.count(func.distinct(ItemAlias.list_id)).desc(), "normalized_name"
)
else:
base_query = base_query.order_by("normalized_name")
results = base_query.offset(offset).limit(limit).all()
total_count = db.session.query(func.count()).select_from(
base_query.subquery()
).scalar()
total_count = (
db.session.query(func.count()).select_from(base_query.subquery()).scalar()
)
products = [{"name": row.original_name, "count": row.count} for row in results]
return jsonify({
"products": products,
"total_count": total_count
})
return jsonify({"products": products, "total_count": total_count})
@app.route("/upload_receipt/<int:list_id>", methods=["POST"])
@@ -2032,6 +2068,7 @@ def upload_receipt(list_id):
filesize=filesize,
uploaded_at=uploaded_at,
file_hash=file_hash,
uploaded_by=current_user.id,
)
db.session.add(new_receipt)
db.session.commit()
@@ -2393,17 +2430,28 @@ def add_user():
@admin_required
def list_users():
users = User.query.all()
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"]
user_data = []
for user in users:
list_count = ShoppingList.query.filter_by(owner_id=user.id).count()
item_count = Item.query.filter_by(added_by=user.id).count()
receipt_count = Receipt.query.filter_by(uploaded_by=user.id).count()
user_data.append(
{
"user": user,
"list_count": list_count,
"item_count": item_count,
"receipt_count": receipt_count,
}
)
total_users = len(users)
return render_template(
"admin/user_management.html",
users=users,
user_count=user_count,
list_count=list_count,
item_count=item_count,
activity_log=activity_log,
user_data=user_data,
total_users=total_users,
)
@@ -2442,20 +2490,25 @@ def delete_user(user_id):
return redirect(url_for("list_users"))
from sqlalchemy.orm import joinedload
@app.route("/admin/receipts/<id>")
@login_required
@admin_required
def admin_receipts(id):
try:
page = request.args.get("page", 1, type=int)
per_page = request.args.get("per_page", 24, type=int)
per_page = max(1, min(per_page, 200)) # sanity check
page, per_page = get_page_args(default_per_page=24, max_per_page=200)
if id == "all":
all_filenames = {r.filename for r in Receipt.query.all()}
all_filenames = {
r.filename for r in Receipt.query.with_entities(Receipt.filename).all()
}
pagination = Receipt.query.order_by(Receipt.uploaded_at.desc()).paginate(
page=page, per_page=per_page, error_out=False
pagination = (
Receipt.query.options(joinedload(Receipt.uploaded_by_user))
.order_by(Receipt.uploaded_at.desc())
.paginate(page=page, per_page=per_page, error_out=False)
)
receipts_paginated = pagination.items
@@ -2472,15 +2525,17 @@ def admin_receipts(id):
]
else:
list_id = int(id)
receipts_paginated = (
Receipt.query.filter_by(list_id=list_id)
all_receipts = (
Receipt.query.options(joinedload(Receipt.uploaded_by_user))
.filter_by(list_id=list_id)
.order_by(Receipt.uploaded_at.desc())
.all()
)
receipts_paginated, total_items, total_pages = paginate_items(
all_receipts, page, per_page
)
orphan_files = []
page = 1
total_pages = 1
per_page = len(receipts_paginated) or 1
except ValueError:
flash("Nieprawidłowe ID listy.", "danger")
return redirect(url_for("admin_panel"))
@@ -2622,7 +2677,7 @@ def delete_selected_lists():
@login_required
@admin_required
def edit_list(list_id):
l = db.session.get(
shopping_list = db.session.get(
ShoppingList,
list_id,
options=[
@@ -2634,13 +2689,12 @@ def edit_list(list_id):
],
)
if l is None:
if shopping_list is None:
abort(404)
total_expense = get_total_expense_for_list(l.id)
total_expense = get_total_expense_for_list(shopping_list.id)
categories = Category.query.order_by(Category.name.asc()).all()
selected_categories_ids = {c.id for c in l.categories}
selected_categories_ids = {c.id for c in shopping_list.categories}
if request.method == "POST":
action = request.form.get("action")
@@ -2652,34 +2706,33 @@ def edit_list(list_id):
is_public = "public" in request.form
is_temporary = "temporary" in request.form
new_owner_id = request.form.get("owner_id")
expires_date = request.form.get("expires_date")
expires_time = request.form.get("expires_time")
if new_title:
l.title = new_title
shopping_list.title = new_title
l.is_archived = is_archived
l.is_public = is_public
l.is_temporary = is_temporary
shopping_list.is_archived = is_archived
shopping_list.is_public = is_public
shopping_list.is_temporary = is_temporary
if expires_date and expires_time:
try:
combined_str = f"{expires_date} {expires_time}"
dt = datetime.strptime(combined_str, "%Y-%m-%d %H:%M")
l.expires_at = dt.replace(tzinfo=timezone.utc)
combined = f"{expires_date} {expires_time}"
dt = datetime.strptime(combined, "%Y-%m-%d %H:%M")
shopping_list.expires_at = dt.replace(tzinfo=timezone.utc)
except ValueError:
flash("Niepoprawna data lub godzina wygasania", "danger")
return redirect(url_for("edit_list", list_id=list_id))
else:
l.expires_at = None
shopping_list.expires_at = None
if new_owner_id:
try:
new_owner_id_int = int(new_owner_id)
user_obj = db.session.get(User, new_owner_id_int)
if user_obj:
l.owner_id = new_owner_id_int
shopping_list.owner_id = new_owner_id_int
else:
flash("Wybrany użytkownik nie istnieje", "danger")
return redirect(url_for("edit_list", list_id=list_id))
@@ -2690,7 +2743,7 @@ def edit_list(list_id):
if new_amount_str:
try:
new_amount = float(new_amount_str)
for expense in l.expenses:
for expense in shopping_list.expenses:
db.session.delete(expense)
db.session.commit()
db.session.add(Expense(list_id=list_id, amount=new_amount))
@@ -2702,17 +2755,14 @@ def edit_list(list_id):
if created_month:
try:
year, month = map(int, created_month.split("-"))
l.created_at = datetime(year, month, 1, tzinfo=timezone.utc)
except ValueError:
flash(
"Nieprawidłowy format miesiąca (przeniesienie daty utworzenia)",
"danger",
shopping_list.created_at = datetime(
year, month, 1, tzinfo=timezone.utc
)
except ValueError:
flash("Nieprawidłowy format miesiąca", "danger")
return redirect(url_for("edit_list", list_id=list_id))
update_list_categories_from_form(l, request.form)
db.session.add(l)
update_list_categories_from_form(shopping_list, request.form)
db.session.commit()
flash("Zapisano zmiany listy", "success")
return redirect(url_for("edit_list", list_id=list_id))
@@ -2744,7 +2794,6 @@ def edit_list(list_id):
.filter(func.lower(SuggestedProduct.name) == item_name.lower())
.first()
)
if not exists:
db.session.add(SuggestedProduct(name=item_name))
@@ -2753,72 +2802,57 @@ def edit_list(list_id):
return redirect(url_for("edit_list", list_id=list_id))
elif action == "delete_item":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
db.session.delete(item)
db.session.commit()
flash("Usunięto produkt", "success")
else:
flash("Nie znaleziono produktu", "danger")
item = get_valid_item_or_404(request.form.get("item_id"), list_id)
db.session.delete(item)
db.session.commit()
flash("Usunięto produkt", "success")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "toggle_purchased":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.purchased = not item.purchased
db.session.commit()
flash("Zmieniono status oznaczenia produktu", "success")
else:
flash("Nie znaleziono produktu", "danger")
item = get_valid_item_or_404(request.form.get("item_id"), list_id)
item.purchased = not item.purchased
db.session.commit()
flash("Zmieniono status oznaczenia produktu", "success")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "mark_not_purchased":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.not_purchased = True
item.purchased = False
item.purchased_at = None
db.session.commit()
flash("Oznaczono produkt jako niekupione", "success")
else:
flash("Nie znaleziono produktu", "danger")
item = get_valid_item_or_404(request.form.get("item_id"), list_id)
item.not_purchased = True
item.purchased = False
item.purchased_at = None
db.session.commit()
flash("Oznaczono produkt jako niekupione", "success")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "unmark_not_purchased":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.not_purchased = False
item.not_purchased_reason = None
item.purchased = False
item.purchased_at = None
db.session.commit()
flash("Przywrócono produkt do listy", "success")
else:
flash("Nie znaleziono produktu", "danger")
item = get_valid_item_or_404(request.form.get("item_id"), list_id)
item.not_purchased = False
item.not_purchased_reason = None
item.purchased = False
item.purchased_at = None
db.session.commit()
flash("Przywrócono produkt do listy", "success")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "edit_quantity":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
try:
new_quantity = int(request.form.get("quantity"))
if new_quantity > 0:
item.quantity = new_quantity
db.session.commit()
flash("Zmieniono ilość produktu", "success")
except ValueError:
flash("Nieprawidłowa ilość", "danger")
else:
flash("Nie znaleziono produktu", "danger")
item = get_valid_item_or_404(request.form.get("item_id"), list_id)
try:
new_quantity = int(request.form.get("quantity"))
if new_quantity > 0:
item.quantity = new_quantity
db.session.commit()
flash("Zmieniono ilość produktu", "success")
except ValueError:
flash("Nieprawidłowa ilość", "danger")
return redirect(url_for("edit_list", list_id=list_id))
users = User.query.all()
items = l.items
receipts = l.receipts
items = shopping_list.items
receipts = shopping_list.receipts
return render_template(
"admin/edit_list.html",
list=l,
list=shopping_list,
total_expense=total_expense,
users=users,
items=items,
@@ -2832,14 +2866,10 @@ def edit_list(list_id):
@login_required
@admin_required
def list_products():
page = request.args.get("page", 1, type=int)
per_page = request.args.get("per_page", 100, type=int)
per_page = max(1, min(per_page, 300))
page, per_page = get_page_args()
all_items = (
Item.query.options(
joinedload(Item.added_by_user),
)
Item.query.options(joinedload(Item.added_by_user))
.order_by(Item.id.desc())
.all()
)
@@ -2852,32 +2882,27 @@ def list_products():
unique_items.append(item)
seen_names.add(key)
usage_counts = dict(
usage_results = (
db.session.query(
func.lower(Item.name),
func.coalesce(func.sum(Item.quantity), 0)
func.lower(Item.name).label("name"),
func.count(func.distinct(Item.list_id)).label("usage_count"),
)
.group_by(func.lower(Item.name))
.all()
)
usage_counts = {row.name: row.usage_count for row in usage_results}
total_items = len(unique_items)
total_pages = (total_items + per_page - 1) // per_page
start = (page - 1) * per_page
end = start + per_page
items = unique_items[start:end]
items, total_items, total_pages = paginate_items(unique_items, page, per_page)
user_ids = {item.added_by for item in items if item.added_by}
users = User.query.filter(User.id.in_(user_ids)).all() if user_ids else []
users_dict = {u.id: u.username for u in users}
suggestions = SuggestedProduct.query.all()
all_suggestions_dict = {
normalize_name(s.name): s
for s in suggestions
if s.name and s.name.strip()
normalize_name(s.name): s for s in suggestions if s.name and s.name.strip()
}
used_suggestion_names = {normalize_name(i.name) for i in unique_items}
suggestions_dict = {
@@ -2885,12 +2910,13 @@ def list_products():
for name in used_suggestion_names
if name in all_suggestions_dict
}
orphan_suggestions = [
s for name, s in all_suggestions_dict.items()
s
for name, s in all_suggestions_dict.items()
if name not in used_suggestion_names
]
query_string = urlencode({k: v for k, v in request.args.items() if k != "page"})
synced_names = set(suggestions_dict.keys())
@@ -2906,11 +2932,10 @@ def list_products():
query_string=query_string,
total_items=total_items,
usage_counts=usage_counts,
synced_names=synced_names
synced_names=synced_names,
)
@app.route("/admin/sync_suggestion/<int:item_id>", methods=["POST"])
@login_required
def sync_suggestion_ajax(item_id):
@@ -3015,9 +3040,7 @@ def recalculate_filesizes_all():
@login_required
@admin_required
def admin_mass_edit_categories():
page = request.args.get("page", 1, type=int)
per_page = request.args.get("per_page", 50, type=int)
per_page = max(1, min(per_page, 200)) # ogranicz do sensownych wartości
page, per_page = get_page_args(default_per_page=50, max_per_page=200)
lists_query = ShoppingList.query.options(
joinedload(ShoppingList.categories),

View File

@@ -83,9 +83,11 @@
<div class="row mb-4">
<div class="col-md-6">
<label class="form-label">📆 Utworzono</label>
<p class="form-control-plaintext text-white">
{{ list.created_at.strftime('%Y-%m-%d') }}
</p>
<div>
<span class="badge rounded-pill bg-success rounded-pill text-dark ms-1">
{{ list.created_at.strftime('%Y-%m-%d') }}
</span>
</div>
</div>
<div class="col-md-6">
<label class="form-label">📁 Przenieś do miesiąca (format: rok-miesiąc np 2026-01)</label>

View File

@@ -26,6 +26,10 @@
<div class="card-body text-center">
<p class="small text-truncate mb-1">{{ r.filename }}</p>
<p class="small mb-1">Wgrano: {{ r.uploaded_at.strftime('%Y-%m-%d %H:%M') }}</p>
<p class="small mb-1">
Uploader: {{ r.uploaded_by_user.username if r.uploaded_by_user else "?" }}
</p>
{% if r.filesize and r.filesize >= 1024 * 1024 %}
<p class="small mb-1">Rozmiar: {{ (r.filesize / 1024 / 1024) | round(2) }} MB</p>
{% elif r.filesize %}

View File

@@ -37,11 +37,16 @@
<th>ID</th>
<th>Login</th>
<th>Rola</th>
<th>Listy</th>
<th>Produkty</th>
<th>Paragony</th>
<th>Akcje</th>
</tr>
</thead>
<tbody>
{% for user in users %}
{% for entry in user_data %}
{% set user = entry.user %}
<tr>
<td>{{ user.id }}</td>
<td class="fw-bold">{{ user.username }}</td>
@@ -52,6 +57,9 @@
<span class="badge rounded-pill bg-secondary">Użytkownik</span>
{% endif %}
</td>
<td>{{ entry.list_count }}</td>
<td>{{ entry.item_count }}</td>
<td>{{ entry.receipt_count }}</td>
<td>
<button class="btn btn-sm btn-outline-warning me-1" data-bs-toggle="modal"
data-bs-target="#resetPasswordModal" data-user-id="{{ user.id }}" data-username="{{ user.username }}">