From 3b94f93892fc0e266e83872cde40f5bf80a1421b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Tue, 15 Jul 2025 22:48:25 +0200 Subject: [PATCH] funckja niekupione --- app.py | 1198 +++++++++++++++++++++++-------------- static/js/functions.js | 85 ++- static/js/live.js | 4 + static/js/sockets.js | 8 + templates/list.html | 13 +- templates/list_share.html | 35 +- 6 files changed, 870 insertions(+), 473 deletions(-) diff --git a/app.py b/app.py index 2403f1a..c9c34d2 100644 --- a/app.py +++ b/app.py @@ -8,10 +8,31 @@ import platform import psutil from datetime import datetime, timedelta -from flask import Flask, render_template, redirect, url_for, request, flash, Blueprint, send_from_directory, request, abort, session, jsonify, make_response +from flask import ( + Flask, + render_template, + redirect, + url_for, + request, + flash, + Blueprint, + send_from_directory, + request, + abort, + session, + jsonify, + make_response, +) from markupsafe import Markup from flask_sqlalchemy import SQLAlchemy -from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user +from flask_login import ( + LoginManager, + UserMixin, + login_user, + login_required, + logout_user, + current_user, +) from flask_compress import Compress from flask_socketio import SocketIO, emit, join_room from werkzeug.security import generate_password_hash, check_password_hash @@ -25,17 +46,17 @@ from functools import wraps app = Flask(__name__) app.config.from_object(Config) -app.config['COMPRESS_ALGORITHM'] = ['zstd', 'br', 'gzip', 'deflate'] +app.config["COMPRESS_ALGORITHM"] = ["zstd", "br", "gzip", "deflate"] app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1) -SYSTEM_PASSWORD = app.config.get('SYSTEM_PASSWORD', 'changeme') -DEFAULT_ADMIN_USERNAME = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin') -DEFAULT_ADMIN_PASSWORD = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123') -UPLOAD_FOLDER = app.config.get('UPLOAD_FOLDER', 'uploads') -ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'} -AUTHORIZED_COOKIE_VALUE = app.config.get('AUTHORIZED_COOKIE_VALUE', '80d31cdfe63539c9') -AUTH_COOKIE_MAX_AGE = app.config.get('AUTH_COOKIE_MAX_AGE', 86400) -HEALTHCHECK_TOKEN = app.config.get('HEALTHCHECK_TOKEN', 'alamapsaikota1234') +SYSTEM_PASSWORD = app.config.get("SYSTEM_PASSWORD", "changeme") +DEFAULT_ADMIN_USERNAME = app.config.get("DEFAULT_ADMIN_USERNAME", "admin") +DEFAULT_ADMIN_PASSWORD = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123") +UPLOAD_FOLDER = app.config.get("UPLOAD_FOLDER", "uploads") +ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"} +AUTHORIZED_COOKIE_VALUE = app.config.get("AUTHORIZED_COOKIE_VALUE", "80d31cdfe63539c9") +AUTH_COOKIE_MAX_AGE = app.config.get("AUTH_COOKIE_MAX_AGE", 86400) +HEALTHCHECK_TOKEN = app.config.get("HEALTHCHECK_TOKEN", "alamapsaikota1234") os.makedirs(UPLOAD_FOLDER, exist_ok=True) @@ -46,65 +67,78 @@ TIME_WINDOW = 60 * 60 db = SQLAlchemy(app) socketio = SocketIO(app, async_mode="eventlet") login_manager = LoginManager(app) -login_manager.login_view = 'login' +login_manager.login_view = "login" # flask-compress compress = Compress() compress.init_app(app) -static_bp = Blueprint('static_bp', __name__) +static_bp = Blueprint("static_bp", __name__) # dla live active_users = {} + class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(150), unique=True, nullable=False) password_hash = db.Column(db.String(150), nullable=False) is_admin = db.Column(db.Boolean, default=False) + class ShoppingList(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(150), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) - owner_id = db.Column(db.Integer, db.ForeignKey('user.id')) + owner_id = db.Column(db.Integer, db.ForeignKey("user.id")) is_temporary = db.Column(db.Boolean, default=False) share_token = db.Column(db.String(64), unique=True, nullable=True) expires_at = db.Column(db.DateTime, nullable=True) - owner = db.relationship('User', backref='lists', lazy=True) + owner = db.relationship("User", backref="lists", lazy=True) is_archived = db.Column(db.Boolean, default=False) is_public = db.Column(db.Boolean, default=True) + + class Item(db.Model): id = db.Column(db.Integer, primary_key=True) - list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id')) + list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id")) name = db.Column(db.String(150), nullable=False) added_at = db.Column(db.DateTime, default=datetime.utcnow) - added_by = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) + added_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True) purchased = db.Column(db.Boolean, default=False) purchased_at = db.Column(db.DateTime, nullable=True) quantity = db.Column(db.Integer, default=1) note = db.Column(db.Text, nullable=True) + not_purchased = db.Column(db.Boolean, default=False) + not_purchased_reason = db.Column(db.Text, nullable=True) + class SuggestedProduct(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(150), unique=True, nullable=False) usage_count = db.Column(db.Integer, default=0) + + class Expense(db.Model): id = db.Column(db.Integer, primary_key=True) - list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id')) + list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id")) amount = db.Column(db.Float, nullable=False) added_at = db.Column(db.DateTime, default=datetime.utcnow) receipt_filename = db.Column(db.String(255), nullable=True) + with app.app_context(): db.create_all() from werkzeug.security import generate_password_hash + admin = User.query.filter_by(is_admin=True).first() - username = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin') - password = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123') + username = app.config.get("DEFAULT_ADMIN_USERNAME", "admin") + password = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123") password_hash = generate_password_hash(password) if admin: - if admin.username != username or not check_password_hash(admin.password_hash, password): + if admin.username != username or not check_password_hash( + admin.password_hash, password + ): admin.username = username admin.password_hash = password_hash db.session.commit() @@ -113,68 +147,78 @@ with app.app_context(): db.session.add(admin) db.session.commit() -@static_bp.route('/static/js/') + +@static_bp.route("/static/js/") def serve_js(filename): - response = send_from_directory('static/js', filename) + response = send_from_directory("static/js", filename) response.cache_control.no_cache = True response.cache_control.no_store = True response.cache_control.must_revalidate = True - #response.expires = 0 - response.pragma = 'no-cache' - response.headers.pop('Content-Disposition', None) - response.headers.pop('Etag', None) + # response.expires = 0 + response.pragma = "no-cache" + response.headers.pop("Content-Disposition", None) + response.headers.pop("Etag", None) return response -@static_bp.route('/static/css/') + +@static_bp.route("/static/css/") def serve_css(filename): - response = send_from_directory('static/css', filename) - response.headers['Cache-Control'] = 'public, max-age=3600' - response.headers.pop('Content-Disposition', None) - response.headers.pop('Etag', None) + response = send_from_directory("static/css", filename) + response.headers["Cache-Control"] = "public, max-age=3600" + response.headers.pop("Content-Disposition", None) + response.headers.pop("Etag", None) return response -@static_bp.route('/static/lib/js/') + +@static_bp.route("/static/lib/js/") def serve_js_lib(filename): - response = send_from_directory('static/lib/js', filename) - response.headers['Cache-Control'] = 'public, max-age=604800' - response.headers.pop('Content-Disposition', None) - response.headers.pop('Etag', None) + response = send_from_directory("static/lib/js", filename) + response.headers["Cache-Control"] = "public, max-age=604800" + response.headers.pop("Content-Disposition", None) + response.headers.pop("Etag", None) return response + # CSS z cache na tydzień -@static_bp.route('/static/lib/css/') +@static_bp.route("/static/lib/css/") def serve_css_lib(filename): - response = send_from_directory('static/lib/css', filename) - response.headers['Cache-Control'] = 'public, max-age=604800' - response.headers.pop('Content-Disposition', None) - response.headers.pop('Etag', None) + response = send_from_directory("static/lib/css", filename) + response.headers["Cache-Control"] = "public, max-age=604800" + response.headers.pop("Content-Disposition", None) + response.headers.pop("Etag", None) return response + app.register_blueprint(static_bp) + def allowed_file(filename): - return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS + return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS + def get_list_details(list_id): shopping_list = ShoppingList.query.get_or_404(list_id) items = Item.query.filter_by(list_id=list_id).all() receipt_pattern = f"list_{list_id}" - all_files = os.listdir(app.config['UPLOAD_FOLDER']) + all_files = os.listdir(app.config["UPLOAD_FOLDER"]) receipt_files = [f for f in all_files if receipt_pattern in f] expenses = Expense.query.filter_by(list_id=list_id).all() total_expense = sum(e.amount for e in expenses) return shopping_list, items, receipt_files, expenses, total_expense + def generate_share_token(length=8): """Generuje token do udostępniania. Parametr `length` to liczba znaków (domyślnie 4).""" return secrets.token_hex(length // 2) + def check_list_public(shopping_list): if not shopping_list.is_public: - flash('Ta lista nie jest publicznie dostępna', 'danger') + flash("Ta lista nie jest publicznie dostępna", "danger") return False return True + def enrich_list_data(l): items = Item.query.filter_by(list_id=l.id).all() l.total_count = len(items) @@ -183,23 +227,30 @@ def enrich_list_data(l): l.total_expense = sum(e.amount for e in expenses) return l + def save_resized_image(file, path: str, max_size=(2000, 2000)): img = Image.open(file) img.thumbnail(max_size) img.save(path) -def redirect_with_flash(message: str, category: str = 'info', endpoint: str = 'main_page'): + +def redirect_with_flash( + message: str, category: str = "info", endpoint: str = "main_page" +): flash(message, category) return redirect(url_for(endpoint)) + def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated or not current_user.is_admin: - return redirect_with_flash('Brak uprawnień do tej sekcji.', 'danger') + return redirect_with_flash("Brak uprawnień do tej sekcji.", "danger") return f(*args, **kwargs) + return decorated_function + def get_progress(list_id): items = Item.query.filter_by(list_id=list_id).all() total_count = len(items) @@ -207,9 +258,10 @@ def get_progress(list_id): percent = (purchased_count / total_count * 100) if total_count > 0 else 0 return purchased_count, total_count, percent + def delete_receipts_for_list(list_id): receipt_pattern = f"list_{list_id}_" - upload_folder = app.config['UPLOAD_FOLDER'] + upload_folder = app.config["UPLOAD_FOLDER"] for filename in os.listdir(upload_folder): if filename.startswith(receipt_pattern): try: @@ -217,6 +269,7 @@ def delete_receipts_for_list(list_id): except Exception as e: print(f"Nie udało się usunąć pliku {filename}: {e}") + # zabezpieczenie logowani do systemy - błędne hasła def is_ip_blocked(ip): now = time.time() @@ -225,6 +278,7 @@ def is_ip_blocked(ip): attempts.popleft() return len(attempts) >= MAX_ATTEMPTS + def register_failed_attempt(ip): now = time.time() attempts = failed_login_attempts[ip] @@ -232,37 +286,46 @@ def register_failed_attempt(ip): attempts.popleft() attempts.append(now) + def reset_failed_attempts(ip): failed_login_attempts[ip].clear() + def attempts_remaining(ip): attempts = failed_login_attempts[ip] return max(0, MAX_ATTEMPTS - len(attempts)) + + #################################################### + @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) + @app.context_processor def inject_time(): return dict(time=time) + @app.context_processor def inject_has_authorized_cookie(): - return {'has_authorized_cookie': 'authorized' in request.cookies} + return {"has_authorized_cookie": "authorized" in request.cookies} + @app.context_processor def inject_is_blocked(): ip = request.access_route[0] - return {'is_blocked': is_ip_blocked(ip)} + return {"is_blocked": is_ip_blocked(ip)} + @app.before_request def require_system_password(): endpoint = request.endpoint # Wyjątki: lib js/css zawsze przepuszczamy - if endpoint in ('static_bp.serve_js_lib', 'static_bp.serve_css_lib'): + if endpoint in ("static_bp.serve_js_lib", "static_bp.serve_css_lib"): return ip = request.access_route[0] @@ -272,34 +335,39 @@ def require_system_password(): if endpoint is None: return - if endpoint in ('system_auth', 'healthcheck'): + if endpoint in ("system_auth", "healthcheck"): return - if 'authorized' not in request.cookies and not endpoint.startswith('login') and endpoint != 'favicon': + if ( + "authorized" not in request.cookies + and not endpoint.startswith("login") + and endpoint != "favicon" + ): # Dla serve_js przepuszczamy tylko toasts.js - if endpoint == 'static_bp.serve_js': + if endpoint == "static_bp.serve_js": requested_file = request.view_args.get("filename", "") if requested_file == "toasts.js": return if requested_file.endswith(".js"): - return redirect(url_for('system_auth', next=request.url)) + return redirect(url_for("system_auth", next=request.url)) return # Blokujemy pozostałe static_bp - if endpoint.startswith('static_bp.'): + if endpoint.startswith("static_bp."): return - if request.path == '/': - return redirect(url_for('system_auth')) + if request.path == "/": + return redirect(url_for("system_auth")) from urllib.parse import urlparse, urlunparse + parsed = urlparse(request.url) fixed_url = urlunparse(parsed._replace(netloc=request.host)) - return redirect(url_for('system_auth', next=fixed_url)) + return redirect(url_for("system_auth", next=fixed_url)) -@app.template_filter('filemtime') +@app.template_filter("filemtime") def file_mtime_filter(path): try: t = os.path.getmtime(path) @@ -307,11 +375,12 @@ def file_mtime_filter(path): except Exception: return datetime.utcnow() -@app.template_filter('filesizeformat') + +@app.template_filter("filesizeformat") def filesizeformat_filter(path): try: size = os.path.getsize(path) - for unit in ['B', 'KB', 'MB', 'GB']: + for unit in ["B", "KB", "MB", "GB"]: if size < 1024.0: return f"{size:.1f} {unit}" size /= 1024.0 @@ -319,203 +388,260 @@ def filesizeformat_filter(path): except Exception: return "N/A" + @app.errorhandler(404) def page_not_found(e): - return render_template( - 'errors.html', - code=404, - title="Strona nie znaleziona", - message="Ups! Podana strona nie istnieje lub została przeniesiona." - ), 404 + return ( + render_template( + "errors.html", + code=404, + title="Strona nie znaleziona", + message="Ups! Podana strona nie istnieje lub została przeniesiona.", + ), + 404, + ) + @app.errorhandler(403) def forbidden(e): - return render_template( - 'errors.html', - code=403, - title="Brak dostępu", - message="Nie masz uprawnień do wyświetlenia tej strony." - ), 403 + return ( + render_template( + "errors.html", + code=403, + title="Brak dostępu", + message="Nie masz uprawnień do wyświetlenia tej strony.", + ), + 403, + ) -@app.route('/favicon.ico') + +@app.route("/favicon.ico") def favicon_ico(): - return redirect(url_for('static', filename='favicon.svg')) + return redirect(url_for("static", filename="favicon.svg")) -@app.route('/favicon.svg') + +@app.route("/favicon.svg") def favicon(): - svg = ''' + svg = """ 🛒 - ''' - return svg, 200, {'Content-Type': 'image/svg+xml'} + """ + return svg, 200, {"Content-Type": "image/svg+xml"} -@app.route('/') + +@app.route("/") def main_page(): now = datetime.utcnow() if current_user.is_authenticated: - user_lists = ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=False).filter( - (ShoppingList.expires_at == None) | (ShoppingList.expires_at > now) - ).order_by(ShoppingList.created_at.desc()).all() + user_lists = ( + ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=False) + .filter((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)) + .order_by(ShoppingList.created_at.desc()) + .all() + ) - archived_lists = ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True).order_by(ShoppingList.created_at.desc()).all() + archived_lists = ( + ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True) + .order_by(ShoppingList.created_at.desc()) + .all() + ) - public_lists = ShoppingList.query.filter( - ShoppingList.is_public == True, - ShoppingList.owner_id != current_user.id, - ((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)), - ShoppingList.is_archived == False - ).order_by(ShoppingList.created_at.desc()).all() + public_lists = ( + ShoppingList.query.filter( + ShoppingList.is_public == True, + ShoppingList.owner_id != current_user.id, + ((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)), + ShoppingList.is_archived == False, + ) + .order_by(ShoppingList.created_at.desc()) + .all() + ) else: user_lists = [] archived_lists = [] - public_lists = ShoppingList.query.filter( - ShoppingList.is_public == True, - ((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)), - ShoppingList.is_archived == False - ).order_by(ShoppingList.created_at.desc()).all() + public_lists = ( + ShoppingList.query.filter( + ShoppingList.is_public == True, + ((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)), + ShoppingList.is_archived == False, + ) + .order_by(ShoppingList.created_at.desc()) + .all() + ) for l in user_lists + public_lists + archived_lists: enrich_list_data(l) - return render_template("main.html", user_lists=user_lists, public_lists=public_lists, archived_lists=archived_lists) + return render_template( + "main.html", + user_lists=user_lists, + public_lists=public_lists, + archived_lists=archived_lists, + ) -@app.route('/system-auth', methods=['GET', 'POST']) + +@app.route("/system-auth", methods=["GET", "POST"]) def system_auth(): - if current_user.is_authenticated or request.cookies.get('authorized') == AUTHORIZED_COOKIE_VALUE: - flash('Jesteś już zalogowany lub autoryzowany.', 'info') - return redirect(url_for('main_page')) + if ( + current_user.is_authenticated + or request.cookies.get("authorized") == AUTHORIZED_COOKIE_VALUE + ): + flash("Jesteś już zalogowany lub autoryzowany.", "info") + return redirect(url_for("main_page")) ip = request.access_route[0] - next_page = request.args.get('next') or url_for('main_page') + next_page = request.args.get("next") or url_for("main_page") if is_ip_blocked(ip): - flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger') - return render_template('system_auth.html'), 403 + flash( + "Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.", + "danger", + ) + return render_template("system_auth.html"), 403 - if request.method == 'POST': - if request.form['password'] == SYSTEM_PASSWORD: + if request.method == "POST": + if request.form["password"] == SYSTEM_PASSWORD: reset_failed_attempts(ip) resp = redirect(next_page) - max_age = app.config.get('AUTH_COOKIE_MAX_AGE', 86400) - resp.set_cookie('authorized', AUTHORIZED_COOKIE_VALUE, max_age=max_age) + max_age = app.config.get("AUTH_COOKIE_MAX_AGE", 86400) + resp.set_cookie("authorized", AUTHORIZED_COOKIE_VALUE, max_age=max_age) return resp else: register_failed_attempt(ip) if is_ip_blocked(ip): - flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger') - return render_template('system_auth.html'), 403 + flash( + "Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.", + "danger", + ) + return render_template("system_auth.html"), 403 remaining = attempts_remaining(ip) - flash(f'Nieprawidłowe hasło. Pozostało {remaining} prób.', 'warning') - return render_template('system_auth.html') + flash(f"Nieprawidłowe hasło. Pozostało {remaining} prób.", "warning") + return render_template("system_auth.html") -@app.route('/toggle_archive_list/') + +@app.route("/toggle_archive_list/") @login_required def toggle_archive_list(list_id): l = ShoppingList.query.get_or_404(list_id) if l.owner_id != current_user.id: - return redirect_with_flash('Nie masz uprawnień do tej listy', 'danger') + return redirect_with_flash("Nie masz uprawnień do tej listy", "danger") - archive = request.args.get('archive', 'true').lower() == 'true' + archive = request.args.get("archive", "true").lower() == "true" if archive: l.is_archived = True - flash(f'Lista „{l.title}” została zarchiwizowana.', 'success') + flash(f"Lista „{l.title}” została zarchiwizowana.", "success") else: l.is_archived = False - flash(f'Lista „{l.title}” została przywrócona.', 'success') + flash(f"Lista „{l.title}” została przywrócona.", "success") db.session.commit() - return redirect(url_for('main_page')) + return redirect(url_for("main_page")) -@app.route('/edit_my_list/', methods=['GET', 'POST']) + +@app.route("/edit_my_list/", methods=["GET", "POST"]) @login_required def edit_my_list(list_id): l = ShoppingList.query.get_or_404(list_id) if l.owner_id != current_user.id: - return redirect_with_flash('Nie masz uprawnień do tej listy', 'danger') + return redirect_with_flash("Nie masz uprawnień do tej listy", "danger") - if request.method == 'POST': - new_title = request.form.get('title') + if request.method == "POST": + new_title = request.form.get("title") if new_title and new_title.strip(): l.title = new_title.strip() db.session.commit() - flash('Zaktualizowano tytuł listy', 'success') - return redirect(url_for('main_page')) + flash("Zaktualizowano tytuł listy", "success") + return redirect(url_for("main_page")) else: - flash('Podaj poprawny tytuł', 'danger') - return render_template('edit_my_list.html', list=l) + flash("Podaj poprawny tytuł", "danger") + return render_template("edit_my_list.html", list=l) -@app.route('/toggle_visibility/', methods=['GET', 'POST']) + +@app.route("/toggle_visibility/", methods=["GET", "POST"]) @login_required def toggle_visibility(list_id): l = ShoppingList.query.get_or_404(list_id) if l.owner_id != current_user.id: - if request.is_json or request.method == 'POST': - return {'error': 'Unauthorized'}, 403 - flash('Nie masz uprawnień do tej listy', 'danger') - return redirect(url_for('main_page')) + if request.is_json or request.method == "POST": + return {"error": "Unauthorized"}, 403 + flash("Nie masz uprawnień do tej listy", "danger") + return redirect(url_for("main_page")) l.is_public = not l.is_public db.session.commit() share_url = f"{request.url_root}share/{l.share_token}" - if request.is_json or request.method == 'POST': - return {'is_public': l.is_public, 'share_url': share_url} + if request.is_json or request.method == "POST": + return {"is_public": l.is_public, "share_url": share_url} if l.is_public: - flash('Lista została udostępniona publicznie', 'success') + flash("Lista została udostępniona publicznie", "success") else: - flash('Lista została ukryta przed gośćmi', 'info') + flash("Lista została ukryta przed gośćmi", "info") + + return redirect(url_for("main_page")) - return redirect(url_for('main_page')) from sqlalchemy import func -@app.route('/login', methods=['GET', 'POST']) -def login(): - if request.method == 'POST': - username_input = request.form['username'].lower() - user = User.query.filter(func.lower(User.username) == username_input).first() - if user and check_password_hash(user.password_hash, request.form['password']): - login_user(user) - flash('Zalogowano pomyślnie', 'success') - return redirect(url_for('main_page')) - flash('Nieprawidłowy login lub hasło', 'danger') - return render_template('login.html') -@app.route('/logout') +@app.route("/login", methods=["GET", "POST"]) +def login(): + if request.method == "POST": + username_input = request.form["username"].lower() + user = User.query.filter(func.lower(User.username) == username_input).first() + if user and check_password_hash(user.password_hash, request.form["password"]): + login_user(user) + flash("Zalogowano pomyślnie", "success") + return redirect(url_for("main_page")) + flash("Nieprawidłowy login lub hasło", "danger") + return render_template("login.html") + + +@app.route("/logout") @login_required def logout(): logout_user() - flash('Wylogowano pomyślnie', 'success') - return redirect(url_for('main_page')) + flash("Wylogowano pomyślnie", "success") + return redirect(url_for("main_page")) -@app.route('/create', methods=['POST']) + +@app.route("/create", methods=["POST"]) @login_required def create_list(): - title = request.form.get('title') - is_temporary = 'temporary' in request.form + title = request.form.get("title") + is_temporary = "temporary" in request.form token = generate_share_token(8) expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None - new_list = ShoppingList(title=title, owner_id=current_user.id, is_temporary=is_temporary, share_token=token, expires_at=expires_at) + new_list = ShoppingList( + title=title, + owner_id=current_user.id, + is_temporary=is_temporary, + share_token=token, + expires_at=expires_at, + ) db.session.add(new_list) db.session.commit() - flash('Utworzono nową listę', 'success') - return redirect(url_for('view_list', list_id=new_list.id)) + flash("Utworzono nową listę", "success") + return redirect(url_for("view_list", list_id=new_list.id)) -@app.route('/list/') + +@app.route("/list/") @login_required def view_list(list_id): - shopping_list, items, receipt_files, expenses, total_expense = get_list_details(list_id) + shopping_list, items, receipt_files, expenses, total_expense = get_list_details( + list_id + ) total_count = len(items) purchased_count = len([i for i in items if i.purchased]) percent = (purchased_count / total_count * 100) if total_count > 0 else 0 return render_template( - 'list.html', + "list.html", list=shopping_list, items=items, receipt_files=receipt_files, @@ -523,37 +649,43 @@ def view_list(list_id): purchased_count=purchased_count, percent=percent, expenses=expenses, - total_expense=total_expense + total_expense=total_expense, ) -@app.route('/share/') -@app.route('/guest-list/') + +@app.route("/share/") +@app.route("/guest-list/") def shared_list(token=None, list_id=None): if token: shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404() if not check_list_public(shopping_list): - return redirect(url_for('main_page')) + return redirect(url_for("main_page")) list_id = shopping_list.id - shopping_list, items, receipt_files, expenses, total_expense = get_list_details(list_id) + shopping_list, items, receipt_files, expenses, total_expense = get_list_details( + list_id + ) return render_template( - 'list_share.html', + "list_share.html", list=shopping_list, items=items, receipt_files=receipt_files, expenses=expenses, - total_expense=total_expense + total_expense=total_expense, ) -@app.route('/copy/') + +@app.route("/copy/") @login_required def copy_list(list_id): original = ShoppingList.query.get_or_404(list_id) token = generate_share_token(8) - new_list = ShoppingList(title=original.title + ' (Kopia)', owner_id=current_user.id, share_token=token) + new_list = ShoppingList( + title=original.title + " (Kopia)", owner_id=current_user.id, share_token=token + ) db.session.add(new_list) db.session.commit() original_items = Item.query.filter_by(list_id=original.id).all() @@ -561,27 +693,36 @@ def copy_list(list_id): copy_item = Item(list_id=new_list.id, name=item.name) db.session.add(copy_item) db.session.commit() - flash('Skopiowano listę', 'success') - return redirect(url_for('view_list', list_id=new_list.id)) + flash("Skopiowano listę", "success") + return redirect(url_for("view_list", list_id=new_list.id)) -@app.route('/suggest_products') + +@app.route("/suggest_products") def suggest_products(): - query = request.args.get('q', '') + query = request.args.get("q", "") suggestions = [] if query: - suggestions = SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f'%{query}%')).limit(5).all() - return {'suggestions': [s.name for s in suggestions]} + suggestions = ( + SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f"%{query}%")) + .limit(5) + .all() + ) + return {"suggestions": [s.name for s in suggestions]} -@app.route('/all_products') + +@app.route("/all_products") def all_products(): - query = request.args.get('q', '') + query = request.args.get("q", "") top_products_query = SuggestedProduct.query if query: - top_products_query = top_products_query.filter(SuggestedProduct.name.ilike(f'%{query}%')) + top_products_query = top_products_query.filter( + SuggestedProduct.name.ilike(f"%{query}%") + ) top_products = ( - top_products_query - .order_by(SuggestedProduct.usage_count.desc(), SuggestedProduct.name.asc()) + top_products_query.order_by( + SuggestedProduct.usage_count.desc(), SuggestedProduct.name.asc() + ) .limit(20) .all() ) @@ -589,21 +730,17 @@ def all_products(): top_names = [s.name for s in top_products] rest_query = SuggestedProduct.query if query: - rest_query = rest_query.filter(SuggestedProduct.name.ilike(f'%{query}%')) + rest_query = rest_query.filter(SuggestedProduct.name.ilike(f"%{query}%")) if top_names: rest_query = rest_query.filter(~SuggestedProduct.name.in_(top_names)) - rest_products = ( - rest_query - .order_by(SuggestedProduct.name.asc()) - .limit(200) - .all() - ) + rest_products = rest_query.order_by(SuggestedProduct.name.asc()).limit(200).all() all_names = top_names + [s.name for s in rest_products] - return {'allproducts': all_names} + return {"allproducts": all_names} + """ @app.route('/upload_receipt/', methods=['POST']) def upload_receipt(list_id): @@ -629,67 +766,78 @@ def upload_receipt(list_id): flash('Niedozwolony format pliku', 'danger') return redirect(request.referrer) """ -@app.route('/upload_receipt/', methods=['POST']) + +@app.route("/upload_receipt/", methods=["POST"]) def upload_receipt(list_id): - if 'receipt' not in request.files: - if request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest': - return jsonify({'success': False, 'message': 'Brak pliku'}), 400 - flash('Brak pliku', 'danger') + if "receipt" not in request.files: + if ( + request.is_json + or request.headers.get("X-Requested-With") == "XMLHttpRequest" + ): + return jsonify({"success": False, "message": "Brak pliku"}), 400 + flash("Brak pliku", "danger") return redirect(request.referrer) - file = request.files['receipt'] + file = request.files["receipt"] - if file.filename == '': - if request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest': - return jsonify({'success': False, 'message': 'Nie wybrano pliku'}), 400 - flash('Nie wybrano pliku', 'danger') + if file.filename == "": + if ( + request.is_json + or request.headers.get("X-Requested-With") == "XMLHttpRequest" + ): + return jsonify({"success": False, "message": "Nie wybrano pliku"}), 400 + flash("Nie wybrano pliku", "danger") return redirect(request.referrer) if file and allowed_file(file.filename): filename = secure_filename(file.filename) full_filename = f"list_{list_id}_{filename}" - file_path = os.path.join(app.config['UPLOAD_FOLDER'], full_filename) + file_path = os.path.join(app.config["UPLOAD_FOLDER"], full_filename) save_resized_image(file, file_path) - if request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest': - url = url_for('uploaded_file', filename=full_filename) + if ( + request.is_json + or request.headers.get("X-Requested-With") == "XMLHttpRequest" + ): + url = url_for("uploaded_file", filename=full_filename) - socketio.emit('receipt_added', {'url': url}, to=str(list_id)) + socketio.emit("receipt_added", {"url": url}, to=str(list_id)) - return jsonify({'success': True, 'url': url}) + return jsonify({"success": True, "url": url}) - flash('Wgrano paragon', 'success') + flash("Wgrano paragon", "success") return redirect(request.referrer) - if request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest': - return jsonify({'success': False, 'message': 'Niedozwolony format pliku'}), 400 - flash('Niedozwolony format pliku', 'danger') + if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest": + return jsonify({"success": False, "message": "Niedozwolony format pliku"}), 400 + flash("Niedozwolony format pliku", "danger") return redirect(request.referrer) -@app.route('/uploads/') +@app.route("/uploads/") def uploaded_file(filename): - response = send_from_directory(app.config['UPLOAD_FOLDER'], filename) - response.headers['Cache-Control'] = 'public, max-age=2592000, immutable' - response.headers.pop('Pragma', None) - response.headers.pop('Content-Disposition', None) + response = send_from_directory(app.config["UPLOAD_FOLDER"], filename) + response.headers["Cache-Control"] = "public, max-age=2592000, immutable" + response.headers.pop("Pragma", None) + response.headers.pop("Content-Disposition", None) mime, _ = mimetypes.guess_type(filename) if mime: - response.headers['Content-Type'] = mime + response.headers["Content-Type"] = mime return response -@app.route('/admin') + +@app.route("/admin") @login_required @admin_required def admin_panel(): - + now = datetime.utcnow() user_count = User.query.count() list_count = ShoppingList.query.count() item_count = Item.query.count() all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all() - all_files = os.listdir(app.config['UPLOAD_FOLDER']) + all_files = os.listdir(app.config["UPLOAD_FOLDER"]) enriched_lists = [] for l in all_lists: @@ -698,22 +846,24 @@ def admin_panel(): total_count = l.total_count purchased_count = l.purchased_count percent = (purchased_count / total_count * 100) if total_count > 0 else 0 - comments_count = len([i for i in items if i.note and i.note.strip() != '']) + comments_count = len([i for i in items if i.note and i.note.strip() != ""]) receipt_pattern = f"list_{l.id}" receipt_files = [f for f in all_files if receipt_pattern in f] - enriched_lists.append({ - 'list': l, - 'total_count': total_count, - 'purchased_count': purchased_count, - 'percent': round(percent), - 'comments_count': comments_count, - 'receipts_count': len(receipt_files), - 'total_expense': l.total_expense - }) + enriched_lists.append( + { + "list": l, + "total_count": total_count, + "purchased_count": purchased_count, + "percent": round(percent), + "comments_count": comments_count, + "receipts_count": len(receipt_files), + "total_expense": l.total_expense, + } + ) top_products = ( - db.session.query(Item.name, func.count(Item.id).label('count')) + db.session.query(Item.name, func.count(Item.id).label("count")) .filter(Item.purchased == True) .group_by(Item.name) .order_by(func.count(Item.id).desc()) @@ -727,23 +877,25 @@ def admin_panel(): current_year = datetime.utcnow().year year_expense_sum = ( db.session.query(func.sum(Expense.amount)) - .filter(extract('year', Expense.added_at) == current_year) - .scalar() or 0 + .filter(extract("year", Expense.added_at) == current_year) + .scalar() + or 0 ) current_month = datetime.utcnow().month month_expense_sum = ( db.session.query(func.sum(Expense.amount)) - .filter(extract('year', Expense.added_at) == current_year) - .filter(extract('month', Expense.added_at) == current_month) - .scalar() or 0 + .filter(extract("year", Expense.added_at) == current_year) + .filter(extract("month", Expense.added_at) == current_month) + .scalar() + or 0 ) process = psutil.Process(os.getpid()) app_mem = process.memory_info().rss // (1024 * 1024) # MB return render_template( - 'admin/admin_panel.html', + "admin/admin_panel.html", user_count=user_count, list_count=list_count, item_count=item_count, @@ -760,43 +912,45 @@ def admin_panel(): ) -@app.route('/admin/delete_list/') +@app.route("/admin/delete_list/") @login_required @admin_required def delete_list(list_id): - + delete_receipts_for_list(list_id) list_to_delete = ShoppingList.query.get_or_404(list_id) Item.query.filter_by(list_id=list_to_delete.id).delete() Expense.query.filter_by(list_id=list_to_delete.id).delete() db.session.delete(list_to_delete) db.session.commit() - flash(f'Usunięto listę: {list_to_delete.title}', 'success') - return redirect(url_for('admin_panel')) + flash(f"Usunięto listę: {list_to_delete.title}", "success") + return redirect(url_for("admin_panel")) -@app.route('/admin/add_user', methods=['POST']) + +@app.route("/admin/add_user", methods=["POST"]) @login_required @admin_required def add_user(): - username = request.form['username'].lower() - password = request.form['password'] + username = request.form["username"].lower() + password = request.form["password"] if not username or not password: - flash('Wypełnij wszystkie pola', 'danger') - return redirect(url_for('list_users')) + flash("Wypełnij wszystkie pola", "danger") + return redirect(url_for("list_users")) if User.query.filter(func.lower(User.username) == username).first(): - flash('Użytkownik o takiej nazwie już istnieje', 'warning') - return redirect(url_for('list_users')) + flash("Użytkownik o takiej nazwie już istnieje", "warning") + return redirect(url_for("list_users")) hashed_password = generate_password_hash(password) new_user = User(username=username, password_hash=hashed_password) db.session.add(new_user) db.session.commit() - flash('Dodano nowego użytkownika', 'success') - return redirect(url_for('list_users')) + flash("Dodano nowego użytkownika", "success") + return redirect(url_for("list_users")) -@app.route('/admin/users') + +@app.route("/admin/users") @login_required @admin_required def list_users(): @@ -805,25 +959,34 @@ def list_users(): list_count = ShoppingList.query.count() item_count = Item.query.count() activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"] - return render_template('admin/user_management.html', users=users, user_count=user_count, list_count=list_count, item_count=item_count, activity_log=activity_log) + return render_template( + "admin/user_management.html", + users=users, + user_count=user_count, + list_count=list_count, + item_count=item_count, + activity_log=activity_log, + ) -@app.route('/admin/change_password/', methods=['POST']) + +@app.route("/admin/change_password/", methods=["POST"]) @login_required @admin_required def reset_password(user_id): user = User.query.get_or_404(user_id) - new_password = request.form['password'] + new_password = request.form["password"] if not new_password: - flash('Podaj nowe hasło', 'danger') - return redirect(url_for('list_users')) + flash("Podaj nowe hasło", "danger") + return redirect(url_for("list_users")) user.password_hash = generate_password_hash(new_password) db.session.commit() - flash(f'Hasło dla użytkownika {user.username} zostało zaktualizowane', 'success') - return redirect(url_for('list_users')) + flash(f"Hasło dla użytkownika {user.username} zostało zaktualizowane", "success") + return redirect(url_for("list_users")) -@app.route('/admin/delete_user/') + +@app.route("/admin/delete_user/") @login_required @admin_required def delete_user(user_id): @@ -832,19 +995,20 @@ def delete_user(user_id): if user.is_admin: admin_count = User.query.filter_by(is_admin=True).count() if admin_count <= 1: - flash('Nie można usunąć ostatniego administratora.', 'danger') - return redirect(url_for('list_users')) + flash("Nie można usunąć ostatniego administratora.", "danger") + return redirect(url_for("list_users")) db.session.delete(user) db.session.commit() - flash('Użytkownik usunięty', 'success') - return redirect(url_for('list_users')) + flash("Użytkownik usunięty", "success") + return redirect(url_for("list_users")) -@app.route('/admin/receipts/') + +@app.route("/admin/receipts/") @login_required @admin_required def admin_receipts(id): - all_files = os.listdir(app.config['UPLOAD_FOLDER']) + all_files = os.listdir(app.config["UPLOAD_FOLDER"]) image_files = [f for f in all_files if allowed_file(f)] if id == "all": @@ -856,35 +1020,37 @@ def admin_receipts(id): filtered_files = [f for f in image_files if f.startswith(receipt_prefix)] except ValueError: flash("Nieprawidłowe ID listy.", "danger") - return redirect(url_for('admin_panel')) + return redirect(url_for("admin_panel")) return render_template( - 'admin/receipts.html', + "admin/receipts.html", image_files=filtered_files, - upload_folder=app.config['UPLOAD_FOLDER'] + upload_folder=app.config["UPLOAD_FOLDER"], ) -@app.route('/admin/delete_receipt/') + +@app.route("/admin/delete_receipt/") @login_required @admin_required def delete_receipt(filename): - file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) + file_path = os.path.join(app.config["UPLOAD_FOLDER"], filename) if os.path.exists(file_path): os.remove(file_path) - flash('Plik usunięty', 'success') + flash("Plik usunięty", "success") else: - flash('Plik nie istnieje', 'danger') - - next_url = request.args.get('next') + flash("Plik nie istnieje", "danger") + + next_url = request.args.get("next") if next_url: return redirect(next_url) - return redirect(url_for('admin_receipts')) + return redirect(url_for("admin_receipts")) -@app.route('/admin/delete_selected_lists', methods=['POST']) + +@app.route("/admin/delete_selected_lists", methods=["POST"]) @login_required @admin_required -def delete_selected_lists(): - ids = request.form.getlist('list_ids') +def delete_selected_lists(): + ids = request.form.getlist("list_ids") for list_id in ids: lst = ShoppingList.query.get(int(list_id)) if lst: @@ -893,19 +1059,21 @@ def delete_selected_lists(): Expense.query.filter_by(list_id=lst.id).delete() db.session.delete(lst) db.session.commit() - flash('Usunięto wybrane listy', 'success') - return redirect(url_for('admin_panel')) + flash("Usunięto wybrane listy", "success") + return redirect(url_for("admin_panel")) -@app.route('/admin/delete_all_items') + +@app.route("/admin/delete_all_items") @login_required @admin_required -def delete_all_items(): +def delete_all_items(): Item.query.delete() db.session.commit() - flash('Usunięto wszystkie produkty', 'success') - return redirect(url_for('admin_panel')) + flash("Usunięto wszystkie produkty", "success") + return redirect(url_for("admin_panel")) -@app.route('/admin/edit_list/', methods=['GET', 'POST']) + +@app.route("/admin/edit_list/", methods=["GET", "POST"]) @login_required @admin_required def edit_list(list_id): @@ -917,18 +1085,18 @@ def edit_list(list_id): # Pobranie listy plików paragonów receipt_pattern = f"list_{list_id}_" - all_files = os.listdir(app.config['UPLOAD_FOLDER']) + all_files = os.listdir(app.config["UPLOAD_FOLDER"]) receipts = [f for f in all_files if f.startswith(receipt_pattern)] - if request.method == 'POST': - action = request.form.get('action') + if request.method == "POST": + action = request.form.get("action") - if action == 'save': - new_title = request.form.get('title', '').strip() - new_amount_str = request.form.get('amount') - is_archived = 'archived' in request.form - is_public = 'public' in request.form - new_owner_id = request.form.get('owner_id') + if action == "save": + new_title = request.form.get("title", "").strip() + new_amount_str = request.form.get("amount") + is_archived = "archived" in request.form + is_public = "public" in request.form + new_owner_id = request.form.get("owner_id") if new_title: l.title = new_title @@ -942,11 +1110,11 @@ def edit_list(list_id): if User.query.get(new_owner_id_int): l.owner_id = new_owner_id_int else: - flash('Wybrany użytkownik nie istnieje', 'danger') - return redirect(url_for('edit_list', list_id=list_id)) + flash("Wybrany użytkownik nie istnieje", "danger") + return redirect(url_for("edit_list", list_id=list_id)) except ValueError: - flash('Niepoprawny ID użytkownika', 'danger') - return redirect(url_for('edit_list', list_id=list_id)) + flash("Niepoprawny ID użytkownika", "danger") + return redirect(url_for("edit_list", list_id=list_id)) if new_amount_str: try: @@ -958,19 +1126,19 @@ def edit_list(list_id): db.session.add(new_expense) db.session.commit() except ValueError: - flash('Niepoprawna kwota', 'danger') - return redirect(url_for('edit_list', list_id=list_id)) + flash("Niepoprawna kwota", "danger") + return redirect(url_for("edit_list", list_id=list_id)) db.session.commit() - flash('Zapisano zmiany listy', 'success') - return redirect(url_for('edit_list', list_id=list_id)) + flash("Zapisano zmiany listy", "success") + return redirect(url_for("edit_list", list_id=list_id)) - elif action == 'add_item': - item_name = request.form.get('item_name', '').strip() - quantity_str = request.form.get('quantity', '1') + elif action == "add_item": + item_name = request.form.get("item_name", "").strip() + quantity_str = request.form.get("quantity", "1") if not item_name: - flash('Podaj nazwę produktu', 'danger') - return redirect(url_for('edit_list', list_id=list_id)) + flash("Podaj nazwę produktu", "danger") + return redirect(url_for("edit_list", list_id=list_id)) try: quantity = int(quantity_str) @@ -979,50 +1147,58 @@ def edit_list(list_id): except ValueError: quantity = 1 - new_item = Item(list_id=list_id, name=item_name, quantity=quantity, added_by=current_user.id) + new_item = Item( + list_id=list_id, + name=item_name, + quantity=quantity, + added_by=current_user.id, + ) db.session.add(new_item) - if not SuggestedProduct.query.filter(func.lower(SuggestedProduct.name) == item_name.lower()).first(): + if not SuggestedProduct.query.filter( + func.lower(SuggestedProduct.name) == item_name.lower() + ).first(): db.session.add(SuggestedProduct(name=item_name)) db.session.commit() - flash('Dodano produkt', 'success') - return redirect(url_for('edit_list', list_id=list_id)) + flash("Dodano produkt", "success") + return redirect(url_for("edit_list", list_id=list_id)) - elif action == 'delete_item': - item_id = request.form.get('item_id') + elif action == "delete_item": + item_id = request.form.get("item_id") item = Item.query.get(item_id) if item and item.list_id == list_id: db.session.delete(item) db.session.commit() - flash('Usunięto produkt', 'success') + flash("Usunięto produkt", "success") else: - flash('Nie znaleziono produktu', 'danger') - return redirect(url_for('edit_list', list_id=list_id)) + flash("Nie znaleziono produktu", "danger") + return redirect(url_for("edit_list", list_id=list_id)) - elif action == 'toggle_purchased': - item_id = request.form.get('item_id') + elif action == "toggle_purchased": + item_id = request.form.get("item_id") item = Item.query.get(item_id) if item and item.list_id == list_id: item.purchased = not item.purchased db.session.commit() - flash('Zmieniono status oznaczenia produktu', 'success') + flash("Zmieniono status oznaczenia produktu", "success") else: - flash('Nie znaleziono produktu', 'danger') - return redirect(url_for('edit_list', list_id=list_id)) + flash("Nie znaleziono produktu", "danger") + return redirect(url_for("edit_list", list_id=list_id)) # Przekazanie receipts do szablonu return render_template( - 'admin/edit_list.html', + "admin/edit_list.html", list=l, total_expense=total_expense, users=users, items=items, receipts=receipts, - upload_folder=app.config['UPLOAD_FOLDER'] + upload_folder=app.config["UPLOAD_FOLDER"], ) -@app.route('/admin/products') + +@app.route("/admin/products") @login_required @admin_required def list_products(): @@ -1035,68 +1211,83 @@ def list_products(): suggestions_dict = {s.name.lower(): s for s in suggestions} return render_template( - 'admin/list_products.html', + "admin/list_products.html", items=items, users_dict=users_dict, - suggestions_dict=suggestions_dict + suggestions_dict=suggestions_dict, ) -@app.route('/admin/sync_suggestion/', methods=['POST']) + +@app.route("/admin/sync_suggestion/", methods=["POST"]) @login_required def sync_suggestion_ajax(item_id): if not current_user.is_admin: - return jsonify({'success': False, 'message': 'Brak uprawnień'}), 403 + return jsonify({"success": False, "message": "Brak uprawnień"}), 403 item = Item.query.get_or_404(item_id) - existing = SuggestedProduct.query.filter(func.lower(SuggestedProduct.name) == item.name.lower()).first() + existing = SuggestedProduct.query.filter( + func.lower(SuggestedProduct.name) == item.name.lower() + ).first() if not existing: new_suggestion = SuggestedProduct(name=item.name) db.session.add(new_suggestion) db.session.commit() - return jsonify({'success': True, 'message': f'Utworzono sugestię dla produktu: {item.name}'}) + return jsonify( + { + "success": True, + "message": f"Utworzono sugestię dla produktu: {item.name}", + } + ) else: - return jsonify({'success': True, 'message': f'Sugestia dla produktu „{item.name}” już istnieje.'}) + return jsonify( + { + "success": True, + "message": f"Sugestia dla produktu „{item.name}” już istnieje.", + } + ) -@app.route('/admin/delete_suggestion/', methods=['POST']) + +@app.route("/admin/delete_suggestion/", methods=["POST"]) @login_required def delete_suggestion_ajax(suggestion_id): if not current_user.is_admin: - return jsonify({'success': False, 'message': 'Brak uprawnień'}), 403 + return jsonify({"success": False, "message": "Brak uprawnień"}), 403 suggestion = SuggestedProduct.query.get_or_404(suggestion_id) db.session.delete(suggestion) db.session.commit() - return jsonify({'success': True, 'message': 'Sugestia została usunięta.'}) + return jsonify({"success": True, "message": "Sugestia została usunięta."}) -@app.route('/admin/expenses_data') + +@app.route("/admin/expenses_data") @login_required def admin_expenses_data(): if not current_user.is_admin: - return jsonify({'error': 'Brak uprawnień'}), 403 + return jsonify({"error": "Brak uprawnień"}), 403 - range_type = request.args.get('range', 'monthly') - start_date_str = request.args.get('start_date') - end_date_str = request.args.get('end_date') + range_type = request.args.get("range", "monthly") + start_date_str = request.args.get("start_date") + end_date_str = request.args.get("end_date") now = datetime.utcnow() labels = [] expenses = [] if start_date_str and end_date_str: - start_date = datetime.strptime(start_date_str, '%Y-%m-%d') - end_date = datetime.strptime(end_date_str, '%Y-%m-%d') + start_date = datetime.strptime(start_date_str, "%Y-%m-%d") + end_date = datetime.strptime(end_date_str, "%Y-%m-%d") expenses_query = ( db.session.query( - extract('year', Expense.added_at).label('year'), - extract('month', Expense.added_at).label('month'), - func.sum(Expense.amount).label('total') + extract("year", Expense.added_at).label("year"), + extract("month", Expense.added_at).label("month"), + func.sum(Expense.amount).label("total"), ) .filter(Expense.added_at >= start_date, Expense.added_at <= end_date) - .group_by('year', 'month') - .order_by('year', 'month') + .group_by("year", "month") + .order_by("year", "month") .all() ) @@ -1105,138 +1296,159 @@ def admin_expenses_data(): labels.append(label) expenses.append(round(row.total, 2)) - response = make_response(jsonify({'labels': labels, 'expenses': expenses})) - response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" + response = make_response(jsonify({"labels": labels, "expenses": expenses})) + response.headers["Cache-Control"] = ( + "no-store, no-cache, must-revalidate, max-age=0" + ) return response - if range_type == 'monthly': + if range_type == "monthly": for i in range(11, -1, -1): - year = (now - timedelta(days=i*30)).year - month = (now - timedelta(days=i*30)).month + year = (now - timedelta(days=i * 30)).year + month = (now - timedelta(days=i * 30)).month label = f"{month:02d}/{year}" labels.append(label) month_sum = ( db.session.query(func.sum(Expense.amount)) - .filter(extract('year', Expense.added_at) == year) - .filter(extract('month', Expense.added_at) == month) - .scalar() or 0 + .filter(extract("year", Expense.added_at) == year) + .filter(extract("month", Expense.added_at) == month) + .scalar() + or 0 ) expenses.append(round(month_sum, 2)) - elif range_type == 'quarterly': + elif range_type == "quarterly": for i in range(3, -1, -1): - quarter_start = now - timedelta(days=i*90) + quarter_start = now - timedelta(days=i * 90) year = quarter_start.year quarter = (quarter_start.month - 1) // 3 + 1 label = f"Q{quarter}/{year}" quarter_sum = ( db.session.query(func.sum(Expense.amount)) - .filter(extract('year', Expense.added_at) == year) - .filter((extract('month', Expense.added_at) - 1)//3 + 1 == quarter) - .scalar() or 0 + .filter(extract("year", Expense.added_at) == year) + .filter((extract("month", Expense.added_at) - 1) // 3 + 1 == quarter) + .scalar() + or 0 ) labels.append(label) expenses.append(round(quarter_sum, 2)) - elif range_type == 'halfyearly': + elif range_type == "halfyearly": for i in range(1, -1, -1): - half_start = now - timedelta(days=i*180) + half_start = now - timedelta(days=i * 180) year = half_start.year half = 1 if half_start.month <= 6 else 2 label = f"H{half}/{year}" half_sum = ( db.session.query(func.sum(Expense.amount)) - .filter(extract('year', Expense.added_at) == year) + .filter(extract("year", Expense.added_at) == year) .filter( - (extract('month', Expense.added_at) <= 6) if half == 1 else (extract('month', Expense.added_at) > 6) + (extract("month", Expense.added_at) <= 6) + if half == 1 + else (extract("month", Expense.added_at) > 6) ) - .scalar() or 0 + .scalar() + or 0 ) labels.append(label) expenses.append(round(half_sum, 2)) - elif range_type == 'yearly': + elif range_type == "yearly": for i in range(4, -1, -1): year = now.year - i label = str(year) year_sum = ( db.session.query(func.sum(Expense.amount)) - .filter(extract('year', Expense.added_at) == year) - .scalar() or 0 + .filter(extract("year", Expense.added_at) == year) + .scalar() + or 0 ) labels.append(label) expenses.append(round(year_sum, 2)) - response = make_response(jsonify({'labels': labels, 'expenses': expenses})) + response = make_response(jsonify({"labels": labels, "expenses": expenses})) response.headers["Cache-Control"] = "no-store, no-cache" return response -@app.route('/admin/promote_user/') + +@app.route("/admin/promote_user/") @login_required @admin_required def promote_user(user_id): user = User.query.get_or_404(user_id) user.is_admin = True db.session.commit() - flash(f'Użytkownik {user.username} został ustawiony jako admin.', 'success') - return redirect(url_for('list_users')) + flash(f"Użytkownik {user.username} został ustawiony jako admin.", "success") + return redirect(url_for("list_users")) -@app.route('/admin/demote_user/') + +@app.route("/admin/demote_user/") @login_required @admin_required def demote_user(user_id): user = User.query.get_or_404(user_id) - + if user.id == current_user.id: - flash('Nie możesz zdegradować samego siebie!', 'danger') - return redirect(url_for('list_users')) + flash("Nie możesz zdegradować samego siebie!", "danger") + return redirect(url_for("list_users")) admin_count = User.query.filter_by(is_admin=True).count() if admin_count <= 1 and user.is_admin: - flash('Nie można zdegradować. Musi pozostać co najmniej jeden administrator.', 'danger') - return redirect(url_for('list_users')) + flash( + "Nie można zdegradować. Musi pozostać co najmniej jeden administrator.", + "danger", + ) + return redirect(url_for("list_users")) user.is_admin = False db.session.commit() - flash(f'Użytkownik {user.username} został zdegradowany.', 'success') - return redirect(url_for('list_users')) + flash(f"Użytkownik {user.username} został zdegradowany.", "success") + return redirect(url_for("list_users")) -@app.route('/healthcheck') + +@app.route("/healthcheck") def healthcheck(): header_token = request.headers.get("X-Internal-Check") - correct_token = app.config.get('HEALTHCHECK_TOKEN') + correct_token = app.config.get("HEALTHCHECK_TOKEN") if header_token != correct_token: abort(404) - return 'OK', 200 + return "OK", 200 + # ========================================================================================= # SOCKET.IO # ========================================================================================= -@socketio.on('delete_item') + +@socketio.on("delete_item") def handle_delete_item(data): - item = Item.query.get(data['item_id']) + item = Item.query.get(data["item_id"]) if item: list_id = item.list_id db.session.delete(item) db.session.commit() - emit('item_deleted', {'item_id': item.id}, to=str(item.list_id)) + emit("item_deleted", {"item_id": item.id}, to=str(item.list_id)) purchased_count, total_count, percent = get_progress(list_id) - emit('progress_updated', { - 'purchased_count': purchased_count, - 'total_count': total_count, - 'percent': percent - }, to=str(list_id)) + emit( + "progress_updated", + { + "purchased_count": purchased_count, + "total_count": total_count, + "percent": percent, + }, + to=str(list_id), + ) -@socketio.on('edit_item') + +@socketio.on("edit_item") def handle_edit_item(data): - item = Item.query.get(data['item_id']) - new_name = data['new_name'] - new_quantity = data.get('new_quantity', item.quantity) + item = Item.query.get(data["item_id"]) + new_name = data["new_name"] + new_quantity = data.get("new_quantity", item.quantity) if item and new_name.strip(): item.name = new_name.strip() @@ -1252,45 +1464,48 @@ def handle_edit_item(data): db.session.commit() - emit('item_edited', { - 'item_id': item.id, - 'new_name': item.name, - 'new_quantity': item.quantity - }, to=str(item.list_id)) + emit( + "item_edited", + {"item_id": item.id, "new_name": item.name, "new_quantity": item.quantity}, + to=str(item.list_id), + ) -@socketio.on('join_list') + +@socketio.on("join_list") def handle_join(data): global active_users - room = str(data['room']) - username = data.get('username', 'Gość') + room = str(data["room"]) + username = data.get("username", "Gość") join_room(room) if room not in active_users: active_users[room] = set() active_users[room].add(username) - shopping_list = ShoppingList.query.get(int(data['room'])) + shopping_list = ShoppingList.query.get(int(data["room"])) list_title = shopping_list.title if shopping_list else "Twoja lista" - emit('user_joined', {'username': username}, to=room) - emit('user_list', {'users': list(active_users[room])}, to=room) - emit('joined_confirmation', {'room': room, 'list_title': list_title}) + emit("user_joined", {"username": username}, to=room) + emit("user_list", {"users": list(active_users[room])}, to=room) + emit("joined_confirmation", {"room": room, "list_title": list_title}) -@socketio.on('disconnect') + +@socketio.on("disconnect") def handle_disconnect(sid): global active_users username = current_user.username if current_user.is_authenticated else "Gość" for room, users in active_users.items(): if username in users: users.remove(username) - emit('user_left', {'username': username}, to=room) - emit('user_list', {'users': list(users)}, to=room) + emit("user_left", {"username": username}, to=room) + emit("user_list", {"users": list(users)}, to=room) -@socketio.on('add_item') + +@socketio.on("add_item") def handle_add_item(data): - list_id = data['list_id'] - name = data['name'] - quantity = data.get('quantity', 1) + list_id = data["list_id"] + name = data["name"] + quantity = data.get("quantity", 1) try: quantity = int(quantity) @@ -1303,7 +1518,7 @@ def handle_add_item(data): list_id=list_id, name=name, quantity=quantity, - added_by=current_user.id if current_user.is_authenticated else None + added_by=current_user.id if current_user.is_authenticated else None, ) db.session.add(new_item) @@ -1313,24 +1528,36 @@ def handle_add_item(data): db.session.commit() - emit('item_added', { - 'id': new_item.id, - 'name': new_item.name, - 'quantity': new_item.quantity, - 'added_by': current_user.username if current_user.is_authenticated else 'Gość' - }, to=str(list_id), include_self=True) + emit( + "item_added", + { + "id": new_item.id, + "name": new_item.name, + "quantity": new_item.quantity, + "added_by": ( + current_user.username if current_user.is_authenticated else "Gość" + ), + }, + to=str(list_id), + include_self=True, + ) purchased_count, total_count, percent = get_progress(list_id) - emit('progress_updated', { - 'purchased_count': purchased_count, - 'total_count': total_count, - 'percent': percent - }, to=str(list_id)) + emit( + "progress_updated", + { + "purchased_count": purchased_count, + "total_count": total_count, + "percent": percent, + }, + to=str(list_id), + ) -@socketio.on('check_item') + +@socketio.on("check_item") def handle_check_item(data): - item = Item.query.get(data['item_id']) + item = Item.query.get(data["item_id"]) if item: item.purchased = True item.purchased_at = datetime.utcnow() @@ -1338,16 +1565,21 @@ def handle_check_item(data): purchased_count, total_count, percent = get_progress(item.list_id) - emit('item_checked', {'item_id': item.id}, to=str(item.list_id)) - emit('progress_updated', { - 'purchased_count': purchased_count, - 'total_count': total_count, - 'percent': percent - }, to=str(item.list_id)) + emit("item_checked", {"item_id": item.id}, to=str(item.list_id)) + emit( + "progress_updated", + { + "purchased_count": purchased_count, + "total_count": total_count, + "percent": percent, + }, + to=str(item.list_id), + ) -@socketio.on('uncheck_item') + +@socketio.on("uncheck_item") def handle_uncheck_item(data): - item = Item.query.get(data['item_id']) + item = Item.query.get(data["item_id"]) if item: item.purchased = False item.purchased_at = None @@ -1355,55 +1587,93 @@ def handle_uncheck_item(data): purchased_count, total_count, percent = get_progress(item.list_id) - emit('item_unchecked', {'item_id': item.id}, to=str(item.list_id)) - emit('progress_updated', { - 'purchased_count': purchased_count, - 'total_count': total_count, - 'percent': percent - }, to=str(item.list_id)) + emit("item_unchecked", {"item_id": item.id}, to=str(item.list_id)) + emit( + "progress_updated", + { + "purchased_count": purchased_count, + "total_count": total_count, + "percent": percent, + }, + to=str(item.list_id), + ) -@socketio.on('request_full_list') + +@socketio.on("request_full_list") def handle_request_full_list(data): - list_id = data['list_id'] + list_id = data["list_id"] items = Item.query.filter_by(list_id=list_id).all() items_data = [] for item in items: - items_data.append({ - 'id': item.id, - 'name': item.name, - 'quantity': item.quantity, - 'purchased': item.purchased, - 'note': item.note or '' - }) + items_data.append( + { + "id": item.id, + "name": item.name, + "quantity": item.quantity, + "purchased": item.purchased if not item.not_purchased else False, + "not_purchased": item.not_purchased, + "note": item.note or "", + } + ) - emit('full_list', {'items': items_data}, to=request.sid) + emit("full_list", {"items": items_data}, to=request.sid) -@socketio.on('update_note') + +@socketio.on("update_note") def handle_update_note(data): - item_id = data['item_id'] - note = data['note'] + item_id = data["item_id"] + note = data["note"] item = Item.query.get(item_id) if item: item.note = note - db.session.commit() - emit('note_updated', {'item_id': item_id, 'note': note}, to=str(item.list_id)) + db.session.commit() + emit("note_updated", {"item_id": item_id, "note": note}, to=str(item.list_id)) -@socketio.on('add_expense') + +@socketio.on("add_expense") def handle_add_expense(data): - list_id = data['list_id'] - amount = data['amount'] + list_id = data["list_id"] + amount = data["amount"] new_expense = Expense(list_id=list_id, amount=amount) db.session.add(new_expense) db.session.commit() - total = db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar() or 0 + total = ( + db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar() + or 0 + ) + + emit("expense_added", {"amount": amount, "total": total}, to=str(list_id)) + + +@socketio.on("mark_not_purchased") +def handle_mark_not_purchased(data): + item = Item.query.get(data["item_id"]) + reason = data.get("reason", "") + if item: + item.not_purchased = True + item.not_purchased_reason = reason + db.session.commit() + emit( + "item_marked_not_purchased", + {"item_id": item.id, "reason": reason}, + to=str(item.list_id), + ) + + +@socketio.on("unmark_not_purchased") +def handle_unmark_not_purchased(data): + item = Item.query.get(data["item_id"]) + if item: + item.not_purchased = False + item.purchased = False + item.purchased_at = None + item.not_purchased_reason = None + db.session.commit() + emit("item_unmarked_not_purchased", {"item_id": item.id}, to=str(item.list_id)) - emit('expense_added', { - 'amount': amount, - 'total': total - }, to=str(list_id)) """ @socketio.on('receipt_uploaded') def handle_receipt_uploaded(data): @@ -1414,10 +1684,12 @@ def handle_receipt_uploaded(data): 'url': url }, to=str(list_id), include_self=False) """ -@app.cli.command('create_db') + +@app.cli.command("create_db") def create_db(): db.create_all() - print('Database created.') + print("Database created.") -if __name__ == '__main__': - socketio.run(app, host='0.0.0.0', port=8000, debug=True) \ No newline at end of file + +if __name__ == "__main__": + socketio.run(app, host="0.0.0.0", port=8000, debug=True) diff --git a/static/js/functions.js b/static/js/functions.js index 45f6811..e88c39a 100644 --- a/static/js/functions.js +++ b/static/js/functions.js @@ -254,6 +254,14 @@ function toggleVisibility(listId) { }); } +function markNotPurchasedModal(e, id) { + e.stopPropagation(); + const reason = prompt("Podaj powód oznaczenia jako niekupione:"); + if (reason !== null) { + socket.emit('mark_not_purchased', { item_id: id, reason: reason }); + } +} + function showToast(message, type = 'primary') { const toastContainer = document.getElementById('toast-container'); const toast = document.createElement('div'); @@ -278,7 +286,7 @@ function isListDifferent(oldItems, newItems) { return false; } -function updateListSmoothly(newItems) { +/* function updateListSmoothly(newItems) { const itemsContainer = document.getElementById('items'); const existingItemsMap = new Map(); @@ -354,6 +362,81 @@ function updateListSmoothly(newItems) { itemsContainer.innerHTML = ''; itemsContainer.appendChild(fragment); + updateProgressBar(); + toggleEmptyPlaceholder(); + applyHidePurchased(); +} */ + +function updateListSmoothly(newItems) { + const itemsContainer = document.getElementById('items'); + const existingItemsMap = new Map(); + + Array.from(itemsContainer.querySelectorAll('li')).forEach(li => { + const id = parseInt(li.id.replace('item-', ''), 10); + existingItemsMap.set(id, li); + }); + + const fragment = document.createDocumentFragment(); + + newItems.forEach(item => { + // 🔥 Logujemy każdy item + console.log('Item:', item.name, 'Purchased:', item.purchased, 'Not purchased:', item.not_purchased); + + let li = existingItemsMap.get(item.id); + let quantityBadge = ''; + if (item.quantity && item.quantity > 1) { + quantityBadge = `x${item.quantity}`; + } + + if (!li) { + li = document.createElement('li'); + li.className = `list-group-item d-flex justify-content-between align-items-center flex-wrap clickable-item`; + li.id = `item-${item.id}`; + } + + // Ustaw klasy tła + li.className = `list-group-item d-flex justify-content-between align-items-center flex-wrap clickable-item ${ + item.purchased ? 'bg-success text-white' : + item.not_purchased ? 'bg-warning text-dark' : 'item-not-checked' + }`; + + // HTML wewnętrzny + li.innerHTML = ` +
+ ${!item.not_purchased ? ` + + ` : ` + 🚫 + `} + ${item.name} ${quantityBadge} + ${item.note ? `[ ${item.note} ]` : ''} +
+
+ ${item.not_purchased ? ` + + ` : ` + + + `} +
+ `; + + fragment.appendChild(li); + }); + + itemsContainer.innerHTML = ''; + itemsContainer.appendChild(fragment); + updateProgressBar(); toggleEmptyPlaceholder(); applyHidePurchased(); diff --git a/static/js/live.js b/static/js/live.js index bed3dbb..4bae47b 100644 --- a/static/js/live.js +++ b/static/js/live.js @@ -225,4 +225,8 @@ function setupList(listId, username) { window.LIST_ID = listId; window.usernameForReconnect = username; +} + +function unmarkNotPurchased(itemId) { + socket.emit('unmark_not_purchased', { item_id: itemId }); } \ No newline at end of file diff --git a/static/js/sockets.js b/static/js/sockets.js index 8cc9027..18f418d 100644 --- a/static/js/sockets.js +++ b/static/js/sockets.js @@ -119,4 +119,12 @@ socket.on('full_list', function (data) { showToast('Lista została zaktualizowana', 'info'); } didReceiveFirstFullList = true; +}); + +socket.on('item_marked_not_purchased', data => { + socket.emit('request_full_list', { list_id: window.LIST_ID }); +}); + +socket.on('item_unmarked_not_purchased', data => { + socket.emit('request_full_list', { list_id: window.LIST_ID }); }); \ No newline at end of file diff --git a/templates/list.html b/templates/list.html index 3562ab7..b0f9838 100644 --- a/templates/list.html +++ b/templates/list.html @@ -87,9 +87,16 @@ Lista: {{ list.title }}
    {% for item in items %} -
  • -
    - +
  • + +
    + + + {{ item.name }} {% if item.quantity and item.quantity > 1 %} diff --git a/templates/list_share.html b/templates/list_share.html index 052356a..2897e47 100644 --- a/templates/list_share.html +++ b/templates/list_share.html @@ -27,8 +27,16 @@
      {% for item in items %} -
    • - +
    • + +
      + + + {{ item.name }} {% if item.quantity and item.quantity > 1 %} @@ -40,10 +48,25 @@ [ {{ item.note }} ] {% endif %}
      - +
      + {% if item.not_purchased %} + + {% else %} + + + {% endif %} +
      + +
    • {% else %}