nowe funkcje i zmiany ux

This commit is contained in:
Mateusz Gruszczyński
2025-07-17 13:35:21 +02:00
parent 133b91073d
commit 377e592f90
9 changed files with 255 additions and 111 deletions

272
app.py
View File

@@ -7,7 +7,8 @@ import sys
import platform
import psutil
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC, timezone
from flask import (
Flask,
render_template,
@@ -46,17 +47,22 @@ from functools import wraps
app = Flask(__name__)
app.config.from_object(Config)
app.config["COMPRESS_ALGORITHM"] = ["zstd", "br", "gzip", "deflate"]
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
SQLALCHEMY_ECHO = True
SYSTEM_PASSWORD = app.config.get("SYSTEM_PASSWORD", "changeme")
DEFAULT_ADMIN_USERNAME = app.config.get("DEFAULT_ADMIN_USERNAME", "admin")
DEFAULT_ADMIN_PASSWORD = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123")
UPLOAD_FOLDER = app.config.get("UPLOAD_FOLDER", "uploads")
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
AUTHORIZED_COOKIE_VALUE = app.config.get("AUTHORIZED_COOKIE_VALUE", "80d31cdfe63539c9")
AUTH_COOKIE_MAX_AGE = app.config.get("AUTH_COOKIE_MAX_AGE", 86400)
HEALTHCHECK_TOKEN = app.config.get("HEALTHCHECK_TOKEN", "alamapsaikota1234")
SESSION_TIMEOUT_MINUTES = int(app.config.get("SESSION_TIMEOUT_MINUTES", 10080))
app.config["COMPRESS_ALGORITHM"] = ["zstd", "br", "gzip", "deflate"]
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=SESSION_TIMEOUT_MINUTES)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
@@ -79,6 +85,10 @@ static_bp = Blueprint("static_bp", __name__)
active_users = {}
def utcnow():
return datetime.now(timezone.utc)
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(150), unique=True, nullable=False)
@@ -93,7 +103,8 @@ class ShoppingList(db.Model):
owner_id = db.Column(db.Integer, db.ForeignKey("user.id"))
is_temporary = db.Column(db.Boolean, default=False)
share_token = db.Column(db.String(64), unique=True, nullable=True)
expires_at = db.Column(db.DateTime, nullable=True)
# expires_at = db.Column(db.DateTime, nullable=True)
expires_at = db.Column(db.DateTime(timezone=True), nullable=True)
owner = db.relationship("User", backref="lists", lazy=True)
is_archived = db.Column(db.Boolean, default=False)
is_public = db.Column(db.Boolean, default=True)
@@ -103,7 +114,8 @@ class Item(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"))
name = db.Column(db.String(150), nullable=False)
added_at = db.Column(db.DateTime, default=datetime.utcnow)
# added_at = db.Column(db.DateTime, default=datetime.utcnow)
added_at = db.Column(db.DateTime, default=utcnow)
added_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
purchased = db.Column(db.Boolean, default=False)
purchased_at = db.Column(db.DateTime, nullable=True)
@@ -271,7 +283,7 @@ def delete_receipts_for_list(list_id):
print(f"Nie udało się usunąć pliku {filename}: {e}")
# zabezpieczenie logowani do systemy - błędne hasła
# zabezpieczenie logowani do systemu - błędne hasła
def is_ip_blocked(ip):
now = time.time()
attempts = failed_login_attempts[ip]
@@ -302,7 +314,8 @@ def attempts_remaining(ip):
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# return User.query.get(int(user_id))
return db.session.get(User, int(user_id))
@app.context_processor
@@ -374,7 +387,8 @@ def file_mtime_filter(path):
t = os.path.getmtime(path)
return datetime.fromtimestamp(t)
except Exception:
return datetime.utcnow()
# return datetime.utcnow()
return datetime.now(timezone.utc)
@app.template_filter("filesizeformat")
@@ -433,7 +447,8 @@ def favicon():
@app.route("/")
def main_page():
now = datetime.utcnow()
# now = datetime.utcnow()
now = datetime.now(timezone.utc)
if current_user.is_authenticated:
user_lists = (
@@ -525,7 +540,12 @@ def system_auth():
@app.route("/toggle_archive_list/<int:list_id>")
@login_required
def toggle_archive_list(list_id):
l = ShoppingList.query.get_or_404(list_id)
# l = ShoppingList.query.get_or_404(list_id)
l = db.session.get(ShoppingList, list_id)
if l is None:
abort(404)
if l.owner_id != current_user.id:
return redirect_with_flash("Nie masz uprawnień do tej listy", "danger")
@@ -545,26 +565,60 @@ def toggle_archive_list(list_id):
@app.route("/edit_my_list/<int:list_id>", methods=["GET", "POST"])
@login_required
def edit_my_list(list_id):
l = ShoppingList.query.get_or_404(list_id)
l = db.session.get(ShoppingList, list_id)
if l is None:
abort(404)
if l.owner_id != current_user.id:
return redirect_with_flash("Nie masz uprawnień do tej listy", "danger")
if request.method == "POST":
new_title = request.form.get("title")
if new_title and new_title.strip():
l.title = new_title.strip()
db.session.commit()
flash("Zaktualizowano tytuł listy", "success")
return redirect(url_for("main_page"))
else:
new_title = request.form.get("title", "").strip()
is_public = "is_public" in request.form
is_temporary = "is_temporary" in request.form
is_archived = "is_archived" in request.form
expires_date = request.form.get("expires_date")
expires_time = request.form.get("expires_time")
# Walidacja tytułu
if not new_title:
flash("Podaj poprawny tytuł", "danger")
return redirect(url_for("edit_my_list", list_id=list_id))
l.title = new_title
l.is_public = is_public
l.is_temporary = is_temporary
l.is_archived = is_archived
# Obsługa daty wygaśnięcia
if expires_date and expires_time:
try:
combined = f"{expires_date} {expires_time}"
expires_dt = datetime.strptime(combined, "%Y-%m-%d %H:%M")
l.expires_at = expires_dt.replace(tzinfo=timezone.utc)
except ValueError:
flash("Błędna data lub godzina wygasania", "danger")
return redirect(url_for("edit_my_list", list_id=list_id))
else:
l.expires_at = None
db.session.commit()
flash("Zaktualizowano dane listy", "success")
return redirect(url_for("main_page"))
return render_template("edit_my_list.html", list=l)
@app.route("/toggle_visibility/<int:list_id>", methods=["GET", "POST"])
@login_required
def toggle_visibility(list_id):
l = ShoppingList.query.get_or_404(list_id)
# l = ShoppingList.query.get_or_404(list_id)
l = db.session.get(ShoppingList, list_id)
if l is None:
abort(404)
if l.owner_id != current_user.id:
if request.is_json or request.method == "POST":
return {"error": "Unauthorized"}, 403
@@ -587,15 +641,13 @@ def toggle_visibility(list_id):
return redirect(url_for("main_page"))
from sqlalchemy import func
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username_input = request.form["username"].lower()
user = User.query.filter(func.lower(User.username) == username_input).first()
if user and check_password_hash(user.password_hash, request.form["password"]):
session.permanent = True
login_user(user)
flash("Zalogowano pomyślnie", "success")
return redirect(url_for("main_page"))
@@ -617,7 +669,12 @@ def create_list():
title = request.form.get("title")
is_temporary = request.form.get("temporary") == "1"
token = generate_share_token(8)
expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None
# expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None
expires_at = (
datetime.now(timezone.utc) + timedelta(days=7) if is_temporary else None
)
new_list = ShoppingList(
title=title,
owner_id=current_user.id,
@@ -651,7 +708,7 @@ def view_list(list_id):
percent=percent,
expenses=expenses,
total_expense=total_expense,
is_share=False
is_share=False,
)
@@ -661,8 +718,7 @@ def user_expenses():
from sqlalchemy.orm import joinedload
expenses = (
Expense.query
.join(ShoppingList, Expense.list_id == ShoppingList.id)
Expense.query.join(ShoppingList, Expense.list_id == ShoppingList.id)
.options(joinedload(Expense.list))
.filter(ShoppingList.owner_id == current_user.id)
.order_by(Expense.added_at.desc())
@@ -673,7 +729,7 @@ def user_expenses():
{
"title": e.list.title if e.list else "Nieznana",
"amount": e.amount,
"added_at": e.added_at
"added_at": e.added_at,
}
for e in expenses
]
@@ -681,7 +737,6 @@ def user_expenses():
return render_template("user_expenses.html", expense_table=rows)
@app.route("/user/expenses_data")
@login_required
def user_expenses_data():
@@ -689,10 +744,8 @@ def user_expenses_data():
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
query = (
Expense.query
.join(ShoppingList, Expense.list_id == ShoppingList.id)
.filter(ShoppingList.owner_id == current_user.id)
query = Expense.query.join(ShoppingList, Expense.list_id == ShoppingList.id).filter(
ShoppingList.owner_id == current_user.id
)
if start_date and end_date:
@@ -707,7 +760,10 @@ def user_expenses_data():
grouped = defaultdict(float)
for e in expenses:
ts = e.added_at or datetime.utcnow()
# ts = e.added_at or datetime.utcnow()
ts = e.added_at or datetime.now(timezone.utc)
if range_type == "monthly":
key = ts.strftime("%Y-%m")
elif range_type == "quarterly":
@@ -747,7 +803,7 @@ def shared_list(token=None, list_id=None):
receipt_files=receipt_files,
expenses=expenses,
total_expense=total_expense,
is_share=True
is_share=True,
)
@@ -824,7 +880,6 @@ def all_products():
return {"allproducts": unique_names}
""" @app.route('/upload_receipt/<int:list_id>', methods=['POST'])
def upload_receipt(list_id):
if 'receipt' not in request.files:
@@ -914,8 +969,8 @@ def uploaded_file(filename):
@login_required
@admin_required
def admin_panel():
now = datetime.now(timezone.utc)
now = datetime.utcnow()
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
@@ -933,6 +988,15 @@ def admin_panel():
receipt_pattern = f"list_{l.id}"
receipt_files = [f for f in all_files if receipt_pattern in f]
# obliczenie czy wygasła
if l.is_temporary and l.expires_at:
expires_at = l.expires_at
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=timezone.utc)
is_expired = expires_at < now
else:
is_expired = False
enriched_lists.append(
{
"list": l,
@@ -942,12 +1006,13 @@ def admin_panel():
"comments_count": comments_count,
"receipts_count": len(receipt_files),
"total_expense": l.total_expense,
"expired": is_expired,
}
)
top_products = (
db.session.query(Item.name, func.count(Item.id).label("count"))
.filter(Item.purchased == True)
.filter(Item.purchased.is_(True))
.group_by(Item.name)
.order_by(func.count(Item.id).desc())
.limit(5)
@@ -957,7 +1022,10 @@ def admin_panel():
purchased_items_count = Item.query.filter_by(purchased=True).count()
total_expense_sum = db.session.query(func.sum(Expense.amount)).scalar() or 0
current_year = datetime.utcnow().year
current_time = datetime.now(timezone.utc)
current_year = current_time.year
current_month = current_time.month
year_expense_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == current_year)
@@ -965,7 +1033,6 @@ def admin_panel():
or 0
)
current_month = datetime.utcnow().month
month_expense_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == current_year)
@@ -1135,7 +1202,10 @@ def delete_receipt(filename):
def delete_selected_lists():
ids = request.form.getlist("list_ids")
for list_id in ids:
lst = ShoppingList.query.get(int(list_id))
# lst = ShoppingList.query.get(int(list_id))
lst = db.session.get(ShoppingList, int(list_id))
if lst:
delete_receipts_for_list(lst.id)
Item.query.filter_by(list_id=lst.id).delete()
@@ -1160,11 +1230,16 @@ def delete_all_items():
@login_required
@admin_required
def edit_list(list_id):
l = ShoppingList.query.get_or_404(list_id)
l = db.session.get(ShoppingList, list_id)
if l is None:
abort(404)
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
users = User.query.all()
items = Item.query.filter_by(list_id=list_id).order_by(Item.id.desc()).all()
items = (
db.session.query(Item).filter_by(list_id=list_id).order_by(Item.id.desc()).all()
)
receipt_pattern = f"list_{list_id}_"
all_files = os.listdir(app.config["UPLOAD_FOLDER"])
@@ -1179,9 +1254,11 @@ def edit_list(list_id):
is_archived = "archived" in request.form
is_public = "public" in request.form
is_temporary = "temporary" in request.form
expires_at_raw = request.form.get("expires_at")
new_owner_id = request.form.get("owner_id")
expires_date = request.form.get("expires_date")
expires_time = request.form.get("expires_time")
if new_title:
l.title = new_title
@@ -1189,18 +1266,22 @@ def edit_list(list_id):
l.is_public = is_public
l.is_temporary = is_temporary
if expires_at_raw:
if expires_date and expires_time:
try:
l.expires_at = datetime.strptime(expires_at_raw, "%Y-%m-%dT%H:%M")
combined_str = f"{expires_date} {expires_time}"
dt = datetime.strptime(combined_str, "%Y-%m-%d %H:%M")
l.expires_at = dt.replace(tzinfo=timezone.utc)
except ValueError:
l.expires_at = None
flash("Niepoprawna data lub godzina wygasania", "danger")
return redirect(url_for("edit_list", list_id=list_id))
else:
l.expires_at = None
if new_owner_id:
try:
new_owner_id_int = int(new_owner_id)
if User.query.get(new_owner_id_int):
user_obj = db.session.get(User, new_owner_id_int)
if user_obj:
l.owner_id = new_owner_id_int
else:
flash("Wybrany użytkownik nie istnieje", "danger")
@@ -1215,13 +1296,12 @@ def edit_list(list_id):
for expense in expenses:
db.session.delete(expense)
db.session.commit()
new_expense = Expense(list_id=list_id, amount=new_amount)
db.session.add(new_expense)
db.session.commit()
db.session.add(Expense(list_id=list_id, amount=new_amount))
except ValueError:
flash("Niepoprawna kwota", "danger")
return redirect(url_for("edit_list", list_id=list_id))
db.session.add(l)
db.session.commit()
flash("Zapisano zmiany listy", "success")
return redirect(url_for("edit_list", list_id=list_id))
@@ -1229,28 +1309,32 @@ def edit_list(list_id):
elif action == "add_item":
item_name = request.form.get("item_name", "").strip()
quantity_str = request.form.get("quantity", "1")
if not item_name:
flash("Podaj nazwę produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
try:
quantity = int(quantity_str)
if quantity < 1:
quantity = 1
quantity = max(1, int(quantity_str))
except ValueError:
quantity = 1
new_item = Item(
list_id=list_id,
name=item_name,
quantity=quantity,
added_by=current_user.id,
db.session.add(
Item(
list_id=list_id,
name=item_name,
quantity=quantity,
added_by=current_user.id,
)
)
db.session.add(new_item)
if not SuggestedProduct.query.filter(
func.lower(SuggestedProduct.name) == item_name.lower()
).first():
exists = (
db.session.query(SuggestedProduct)
.filter(func.lower(SuggestedProduct.name) == item_name.lower())
.first()
)
if not exists:
db.session.add(SuggestedProduct(name=item_name))
db.session.commit()
@@ -1258,8 +1342,7 @@ def edit_list(list_id):
return redirect(url_for("edit_list", list_id=list_id))
elif action == "delete_item":
item_id = request.form.get("item_id")
item = Item.query.get(item_id)
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
db.session.delete(item)
db.session.commit()
@@ -1269,8 +1352,7 @@ def edit_list(list_id):
return redirect(url_for("edit_list", list_id=list_id))
elif action == "toggle_purchased":
item_id = request.form.get("item_id")
item = Item.query.get(item_id)
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.purchased = not item.purchased
db.session.commit()
@@ -1280,8 +1362,7 @@ def edit_list(list_id):
return redirect(url_for("edit_list", list_id=list_id))
elif action == "mark_not_purchased":
item_id = request.form.get("item_id")
item = Item.query.get(item_id)
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.not_purchased = True
item.purchased = False
@@ -1293,8 +1374,7 @@ def edit_list(list_id):
return redirect(url_for("edit_list", list_id=list_id))
elif action == "unmark_not_purchased":
item_id = request.form.get("item_id")
item = Item.query.get(item_id)
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.not_purchased = False
item.not_purchased_reason = None
@@ -1317,13 +1397,13 @@ def edit_list(list_id):
)
@app.route("/admin/products")
@login_required
@admin_required
def list_products():
items = Item.query.order_by(Item.id.desc()).all()
users = User.query.all()
# users = User.query.all()
users = db.session.query(User).all()
users_dict = {user.id: user.username for user in users}
# Stabilne sortowanie sugestii
@@ -1390,7 +1470,9 @@ def admin_expenses_data():
range_type = request.args.get("range", "monthly")
start_date_str = request.args.get("start_date")
end_date_str = request.args.get("end_date")
now = datetime.utcnow()
# now = datetime.utcnow()
now = datetime.now(timezone.utc)
labels = []
expenses = []
@@ -1544,7 +1626,9 @@ def healthcheck():
@socketio.on("delete_item")
def handle_delete_item(data):
item = Item.query.get(data["item_id"])
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
if item:
list_id = item.list_id
db.session.delete(item)
@@ -1566,7 +1650,9 @@ def handle_delete_item(data):
@socketio.on("edit_item")
def handle_edit_item(data):
item = Item.query.get(data["item_id"])
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
new_name = data["new_name"]
new_quantity = data.get("new_quantity", item.quantity)
@@ -1602,7 +1688,9 @@ def handle_join(data):
active_users[room] = set()
active_users[room].add(username)
shopping_list = ShoppingList.query.get(int(data["room"]))
# shopping_list = ShoppingList.query.get(int(data["room"]))
shopping_list = db.session.get(ShoppingList, int(data["room"]))
list_title = shopping_list.title if shopping_list else "Twoja lista"
emit("user_joined", {"username": username}, to=room)
@@ -1638,7 +1726,7 @@ def handle_add_item(data):
existing_item = Item.query.filter(
Item.list_id == list_id,
func.lower(Item.name) == name.lower(),
Item.not_purchased == False
Item.not_purchased == False,
).first()
if existing_item:
@@ -1650,7 +1738,7 @@ def handle_add_item(data):
{
"item_id": existing_item.id,
"new_name": existing_item.name,
"new_quantity": existing_item.quantity
"new_quantity": existing_item.quantity,
},
to=str(list_id),
)
@@ -1663,7 +1751,9 @@ def handle_add_item(data):
)
db.session.add(new_item)
if not SuggestedProduct.query.filter(func.lower(SuggestedProduct.name) == name.lower()).first():
if not SuggestedProduct.query.filter(
func.lower(SuggestedProduct.name) == name.lower()
).first():
new_suggestion = SuggestedProduct(name=name)
db.session.add(new_suggestion)
@@ -1699,10 +1789,14 @@ def handle_add_item(data):
@socketio.on("check_item")
def handle_check_item(data):
item = Item.query.get(data["item_id"])
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
if item:
item.purchased = True
item.purchased_at = datetime.utcnow()
# item.purchased_at = datetime.utcnow()
item.purchased_at = datetime.now(UTC)
db.session.commit()
purchased_count, total_count, percent = get_progress(item.list_id)
@@ -1721,7 +1815,9 @@ def handle_check_item(data):
@socketio.on("uncheck_item")
def handle_uncheck_item(data):
item = Item.query.get(data["item_id"])
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
if item:
item.purchased = False
item.purchased_at = None
@@ -1755,7 +1851,7 @@ def handle_request_full_list(data):
"quantity": item.quantity,
"purchased": item.purchased if not item.not_purchased else False,
"not_purchased": item.not_purchased,
'not_purchased_reason': item.not_purchased_reason,
"not_purchased_reason": item.not_purchased_reason,
"note": item.note or "",
}
)
@@ -1793,7 +1889,9 @@ def handle_add_expense(data):
@socketio.on("mark_not_purchased")
def handle_mark_not_purchased(data):
item = Item.query.get(data["item_id"])
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
reason = data.get("reason", "")
if item:
item.not_purchased = True
@@ -1808,7 +1906,9 @@ def handle_mark_not_purchased(data):
@socketio.on("unmark_not_purchased")
def handle_unmark_not_purchased(data):
item = Item.query.get(data["item_id"])
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
if item:
item.not_purchased = False
item.purchased = False