import os import secrets import time import mimetypes import sys import platform import psutil import hashlib import re import traceback import bcrypt import colorsys from pillow_heif import register_heif_opener from datetime import datetime, timedelta, UTC, timezone from urllib.parse import urlparse, urlunparse from flask import ( Flask, render_template, redirect, url_for, request, flash, Blueprint, send_from_directory, request, abort, session, jsonify, ) from flask_sqlalchemy import SQLAlchemy from flask_login import ( LoginManager, UserMixin, login_user, login_required, logout_user, current_user, ) from flask_compress import Compress from flask_socketio import SocketIO, emit, join_room from config import Config from PIL import Image, ExifTags, ImageFilter, ImageOps from werkzeug.middleware.proxy_fix import ProxyFix from sqlalchemy import func, extract, inspect, or_, case, text from sqlalchemy.orm import joinedload, load_only from collections import defaultdict, deque from functools import wraps # from flask_talisman import Talisman # import niżej pod warunkiem from flask_session import Session from types import SimpleNamespace from pdf2image import convert_from_bytes from urllib.parse import urlencode # OCR import pytesseract from pytesseract import Output import logging from types import SimpleNamespace app = Flask(__name__) app.config.from_object(Config) # Konfiguracja nagłówków bezpieczeństwa z .env csp_policy = ( { "default-src": "'self'", "script-src": "'self' 'unsafe-inline'", "style-src": "'self' 'unsafe-inline'", "img-src": "'self' data:", "connect-src": "'self'", } if app.config.get("ENABLE_CSP", True) else None ) permissions_policy = {"browsing-topics": "()"} if app.config.get("ENABLE_PP") else None talisman_kwargs = { "force_https": False, "strict_transport_security": app.config.get("ENABLE_HSTS", True), "frame_options": "DENY" if app.config.get("ENABLE_XFO", True) else None, "permissions_policy": permissions_policy, "content_security_policy": csp_policy, "x_content_type_options": app.config.get("ENABLE_XCTO", True), "strict_transport_security_include_subdomains": False, } referrer_policy = app.config.get("REFERRER_POLICY") if referrer_policy: talisman_kwargs["referrer_policy"] = referrer_policy # jak naglowki wylaczone, nie ładuj talisman z pominięciem referrer_policy effective_headers = { k: v for k, v in talisman_kwargs.items() if k != "referrer_policy" and v not in (None, False) } if effective_headers: from flask_talisman import Talisman talisman = Talisman( app, session_cookie_secure=app.config.get("SESSION_COOKIE_SECURE", True), **talisman_kwargs, ) print("[TALISMAN] Włączony z nagłówkami:", list(effective_headers.keys())) else: print("[TALISMAN] Pominięty — wszystkie nagłówki security wyłączone.") register_heif_opener() # pillow_heif dla HEIC SQLALCHEMY_ECHO = True ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp", "heic", "pdf"} SYSTEM_PASSWORD = app.config.get("SYSTEM_PASSWORD") DEFAULT_ADMIN_USERNAME = app.config.get("DEFAULT_ADMIN_USERNAME") DEFAULT_ADMIN_PASSWORD = app.config.get("DEFAULT_ADMIN_PASSWORD") UPLOAD_FOLDER = app.config.get("UPLOAD_FOLDER") AUTHORIZED_COOKIE_VALUE = app.config.get("AUTHORIZED_COOKIE_VALUE") AUTH_COOKIE_MAX_AGE = app.config.get("AUTH_COOKIE_MAX_AGE") HEALTHCHECK_TOKEN = app.config.get("HEALTHCHECK_TOKEN") SESSION_TIMEOUT_MINUTES = int(app.config.get("SESSION_TIMEOUT_MINUTES")) SESSION_COOKIE_SECURE = app.config.get("SESSION_COOKIE_SECURE") app.config["COMPRESS_ALGORITHM"] = ["zstd", "br", "gzip", "deflate"] app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=SESSION_TIMEOUT_MINUTES) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1) DEBUG_MODE = app.config.get("DEBUG_MODE", False) os.makedirs(UPLOAD_FOLDER, exist_ok=True) failed_login_attempts = defaultdict(deque) MAX_ATTEMPTS = 10 TIME_WINDOW = 60 * 60 WEBP_SAVE_PARAMS = { "format": "WEBP", "lossless": True, # lub False jeśli chcesz używać quality "method": 6, # "quality": 95, # tylko jeśli lossless=False } db = SQLAlchemy(app) socketio = SocketIO(app, async_mode="eventlet") login_manager = LoginManager(app) login_manager.login_view = "login" # flask-session app.config["SESSION_TYPE"] = "sqlalchemy" app.config["SESSION_SQLALCHEMY"] = db Session(app) # flask-compress compress = Compress() compress.init_app(app) static_bp = Blueprint("static_bp", __name__) # dla live active_users = {} def utcnow(): return datetime.now(timezone.utc) app_start_time = utcnow() class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(150), unique=True, nullable=False) password_hash = db.Column(db.String(512), nullable=False) is_admin = db.Column(db.Boolean, default=False) # Tabela pośrednia shopping_list_category = db.Table( "shopping_list_category", db.Column( "shopping_list_id", db.Integer, db.ForeignKey("shopping_list.id"), primary_key=True, ), db.Column( "category_id", db.Integer, db.ForeignKey("category.id"), primary_key=True ), ) class Category(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(100), unique=True, nullable=False) class ShoppingList(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(150), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) owner_id = db.Column(db.Integer, db.ForeignKey("user.id")) owner = db.relationship("User", backref="lists", foreign_keys=[owner_id]) is_temporary = db.Column(db.Boolean, default=False) share_token = db.Column(db.String(64), unique=True, nullable=True) expires_at = db.Column(db.DateTime(timezone=True), nullable=True) owner = db.relationship("User", backref="lists", lazy=True) is_archived = db.Column(db.Boolean, default=False) is_public = db.Column(db.Boolean, default=True) # Relacje items = db.relationship("Item", back_populates="shopping_list", lazy="select") receipts = db.relationship("Receipt", back_populates="shopping_list", lazy="select") expenses = db.relationship("Expense", back_populates="shopping_list", lazy="select") # Nowa relacja wiele-do-wielu categories = db.relationship( "Category", secondary=shopping_list_category, backref=db.backref("shopping_lists", lazy="dynamic"), ) class Item(db.Model): id = db.Column(db.Integer, primary_key=True) list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id")) name = db.Column(db.String(150), nullable=False) # added_at = db.Column(db.DateTime, default=datetime.utcnow) added_at = db.Column(db.DateTime, default=utcnow) added_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True) added_by_user = db.relationship( "User", backref="added_items", lazy="joined", foreign_keys=[added_by] ) purchased = db.Column(db.Boolean, default=False) purchased_at = db.Column(db.DateTime, nullable=True) quantity = db.Column(db.Integer, default=1) note = db.Column(db.Text, nullable=True) not_purchased = db.Column(db.Boolean, default=False) not_purchased_reason = db.Column(db.Text, nullable=True) position = db.Column(db.Integer, default=0) shopping_list = db.relationship("ShoppingList", back_populates="items") class SuggestedProduct(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(150), unique=True, nullable=False) usage_count = db.Column(db.Integer, default=0) class Expense(db.Model): id = db.Column(db.Integer, primary_key=True) list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id")) amount = db.Column(db.Float, nullable=False) added_at = db.Column(db.DateTime, default=datetime.utcnow) receipt_filename = db.Column(db.String(255), nullable=True) shopping_list = db.relationship("ShoppingList", back_populates="expenses") class Receipt(db.Model): id = db.Column(db.Integer, primary_key=True) list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"), nullable=False) filename = db.Column(db.String(255), nullable=False) uploaded_at = db.Column(db.DateTime, default=datetime.utcnow) filesize = db.Column(db.Integer, nullable=True) file_hash = db.Column(db.String(64), nullable=True, unique=True) shopping_list = db.relationship("ShoppingList", back_populates="receipts") def hash_password(password): pepper = app.config["BCRYPT_PEPPER"] peppered = (password + pepper).encode("utf-8") salt = bcrypt.gensalt() hashed = bcrypt.hashpw(peppered, salt) return hashed.decode("utf-8") def check_password(stored_hash, password_input): pepper = app.config["BCRYPT_PEPPER"] peppered = (password_input + pepper).encode("utf-8") if stored_hash.startswith("$2b$") or stored_hash.startswith("$2a$"): try: return bcrypt.checkpw(peppered, stored_hash.encode("utf-8")) except Exception: return False return False def set_authorized_cookie(response): secure_flag = app.config["SESSION_COOKIE_SECURE"] max_age = app.config.get("AUTH_COOKIE_MAX_AGE", 86400) response.set_cookie( "authorized", AUTHORIZED_COOKIE_VALUE, max_age=max_age, secure=secure_flag, httponly=True, ) return response if app.config["SQLALCHEMY_DATABASE_URI"].startswith("sqlite:///"): db_path = app.config["SQLALCHEMY_DATABASE_URI"].replace("sqlite:///", "", 1) db_dir = os.path.dirname(db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True) print(f"Utworzono katalog bazy: {db_dir}") with app.app_context(): db.create_all() # --- Tworzenie admina --- admin_username = DEFAULT_ADMIN_USERNAME admin_password = DEFAULT_ADMIN_PASSWORD password_hash = hash_password(admin_password) admin = User.query.filter_by(username=admin_username).first() if admin: if not admin.is_admin: admin.is_admin = True if not check_password(admin.password_hash, admin_password): admin.password_hash = password_hash print(f"[INFO] Zmieniono hasło admina '{admin_username}' z konfiguracji.") db.session.commit() else: db.session.add( User(username=admin_username, password_hash=password_hash, is_admin=True) ) db.session.commit() default_categories = app.config["DEFAULT_CATEGORIES"] existing_names = { c.name for c in Category.query.filter(Category.name.isnot(None)).all() } existing_names_lower = {name.lower() for name in existing_names} missing = [ cat for cat in default_categories if cat.lower() not in existing_names_lower ] if missing: db.session.add_all(Category(name=cat) for cat in missing) db.session.commit() print(f"[INFO] Dodano brakujące kategorie: {', '.join(missing)}") # else: # print("[INFO] Wszystkie domyślne kategorie już istnieją") @static_bp.route("/static/js/") def serve_js(filename): response = send_from_directory("static/js", filename) response.headers["Cache-Control"] = app.config["JS_CACHE_CONTROL"] response.headers.pop("Content-Disposition", None) return response @static_bp.route("/static/css/") def serve_css(filename): response = send_from_directory("static/css", filename) response.headers["Cache-Control"] = app.config["CSS_CACHE_CONTROL"] response.headers.pop("Content-Disposition", None) return response @static_bp.route("/static/lib/js/") def serve_js_lib(filename): response = send_from_directory("static/lib/js", filename) response.headers["Cache-Control"] = app.config["LIB_JS_CACHE_CONTROL"] response.headers.pop("Content-Disposition", None) return response @static_bp.route("/static/lib/css/") def serve_css_lib(filename): response = send_from_directory("static/lib/css", filename) response.headers["Cache-Control"] = app.config["LIB_CSS_CACHE_CONTROL"] response.headers.pop("Content-Disposition", None) return response app.register_blueprint(static_bp) def allowed_file(filename): return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS def get_list_details(list_id): shopping_list = ShoppingList.query.options( joinedload(ShoppingList.items).joinedload(Item.added_by_user), joinedload(ShoppingList.expenses), joinedload(ShoppingList.receipts), ).get_or_404(list_id) items = sorted(shopping_list.items, key=lambda i: i.position or 0) expenses = shopping_list.expenses total_expense = sum(e.amount for e in expenses) if expenses else 0 receipt_files = [r.filename for r in shopping_list.receipts] return shopping_list, items, receipt_files, expenses, total_expense def get_total_expense_for_list(list_id, start_date=None, end_date=None): query = db.session.query(func.sum(Expense.amount)).filter( Expense.list_id == list_id ) if start_date and end_date: query = query.filter( Expense.added_at >= start_date, Expense.added_at < end_date ) return query.scalar() or 0 def update_list_categories_from_form(shopping_list, form): category_ids = form.getlist("categories") shopping_list.categories.clear() if category_ids: cats = Category.query.filter(Category.id.in_(category_ids)).all() shopping_list.categories.extend(cats) def generate_share_token(length=8): return secrets.token_hex(length // 2) def check_list_public(shopping_list): if not shopping_list.is_public: flash("Ta lista nie jest publicznie dostępna", "danger") return False return True def enrich_list_data(l): counts = ( db.session.query( func.count(Item.id), func.sum(case((Item.purchased == True, 1), else_=0)), func.sum(Expense.amount), ) .outerjoin(Expense, Expense.list_id == Item.list_id) .filter(Item.list_id == l.id) .first() ) l.total_count = counts[0] or 0 l.purchased_count = counts[1] or 0 l.total_expense = counts[2] or 0 return l def get_total_records(): total = 0 inspector = inspect(db.engine) with db.engine.connect() as conn: for table_name in inspector.get_table_names(): count = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")).scalar() total += count return total def save_resized_image(file, path): try: image = Image.open(file) image.verify() file.seek(0) image = Image.open(file) except Exception: raise ValueError("Nieprawidłowy plik graficzny") try: image = ImageOps.exif_transpose(image) except Exception: pass try: image.thumbnail((2000, 2000)) image = image.convert("RGB") image.info.clear() new_path = path.rsplit(".", 1)[0] + ".webp" image.save(new_path, **WEBP_SAVE_PARAMS) except Exception as e: raise ValueError(f"Błąd podczas przetwarzania obrazu: {e}") def redirect_with_flash( message: str, category: str = "info", endpoint: str = "main_page" ): flash(message, category) return redirect(url_for(endpoint)) def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated or not current_user.is_admin: return redirect_with_flash("Brak uprawnień do tej sekcji.", "danger") return f(*args, **kwargs) return decorated_function def get_progress(list_id): total_count, purchased_count = ( db.session.query( func.count(Item.id), func.sum(case((Item.purchased == True, 1), else_=0)) ) .filter(Item.list_id == list_id) .first() ) total_count = total_count or 0 purchased_count = purchased_count or 0 percent = (purchased_count / total_count * 100) if total_count > 0 else 0 return purchased_count, total_count, percent def delete_receipts_for_list(list_id): receipt_pattern = f"list_{list_id}_" upload_folder = app.config["UPLOAD_FOLDER"] for filename in os.listdir(upload_folder): if filename.startswith(receipt_pattern): try: os.remove(os.path.join(upload_folder, filename)) except Exception as e: print(f"Nie udało się usunąć pliku {filename}: {e}") def receipt_error(message): if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest": return jsonify({"success": False, "error": message}), 400 flash(message, "danger") return redirect(request.referrer or url_for("main_page")) def rotate_receipt_by_id(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) if not os.path.exists(old_path): raise FileNotFoundError("Plik nie istnieje") image = Image.open(old_path) rotated = image.rotate(-90, expand=True) new_filename = generate_new_receipt_filename(receipt.list_id) new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename) rotated.save(new_path, **WEBP_SAVE_PARAMS) os.remove(old_path) receipt.filename = new_filename db.session.commit() return receipt def delete_receipt_by_id(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) if os.path.exists(filepath): os.remove(filepath) db.session.delete(receipt) db.session.commit() return receipt def generate_new_receipt_filename(list_id): timestamp = datetime.now().strftime("%Y%m%d_%H%M") random_part = secrets.token_hex(3) return f"list_{list_id}_{timestamp}_{random_part}.webp" def handle_crop_receipt(receipt_id, file): if not receipt_id or not file: return {"success": False, "error": "Brak danych"} receipt = Receipt.query.get_or_404(receipt_id) old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) try: new_filename = generate_new_receipt_filename(receipt.list_id) new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename) save_resized_image(file, new_path) if os.path.exists(old_path): os.remove(old_path) receipt.filename = os.path.basename(new_path) db.session.commit() recalculate_filesizes(receipt.id) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} def get_total_expenses_grouped_by_list_created_at( user_only=False, admin=False, show_all=False, range_type="monthly", start_date=None, end_date=None, user_id=None, category_id=None, ): lists_query = ShoppingList.query if admin: pass elif show_all: lists_query = lists_query.filter( or_( ShoppingList.owner_id == user_id, ShoppingList.is_public == True, ) ) else: lists_query = lists_query.filter(ShoppingList.owner_id == user_id) if category_id: if str(category_id) == "none": lists_query = lists_query.filter(~ShoppingList.categories.any()) else: try: cat_id_int = int(category_id) except ValueError: return {"labels": [], "expenses": []} lists_query = lists_query.join( shopping_list_category, shopping_list_category.c.shopping_list_id == ShoppingList.id, ).filter(shopping_list_category.c.category_id == cat_id_int) today = datetime.now(timezone.utc).date() if range_type == "last30days": dt_start = today - timedelta(days=29) dt_end = today + timedelta(days=1) start_date, end_date = dt_start.strftime("%Y-%m-%d"), dt_end.strftime( "%Y-%m-%d" ) elif range_type == "currentmonth": dt_start = today.replace(day=1) dt_end = today + timedelta(days=1) start_date, end_date = dt_start.strftime("%Y-%m-%d"), dt_end.strftime( "%Y-%m-%d" ) if start_date and end_date: try: dt_start = datetime.strptime(start_date, "%Y-%m-%d") dt_end = datetime.strptime(end_date, "%Y-%m-%d") if dt_end.tzinfo is None: dt_end = dt_end.replace(tzinfo=timezone.utc) dt_end += timedelta(days=1) except Exception: return {"error": "Błędne daty", "labels": [], "expenses": []} lists_query = lists_query.filter( ShoppingList.created_at >= dt_start, ShoppingList.created_at < dt_end ) lists = lists_query.all() if not lists: return {"labels": [], "expenses": []} list_ids = [l.id for l in lists] total_expenses = ( db.session.query( Expense.list_id, func.sum(Expense.amount).label("total_amount") ) .filter(Expense.list_id.in_(list_ids)) .group_by(Expense.list_id) .all() ) expense_map = {lid: amt for lid, amt in total_expenses} grouped = defaultdict(float) for sl in lists: if sl.id in expense_map: ts = sl.created_at or datetime.now(timezone.utc) if range_type in ("last30days", "currentmonth"): key = ts.strftime("%Y-%m-%d") # dzienny widok elif range_type == "monthly": key = ts.strftime("%Y-%m") elif range_type == "quarterly": key = f"{ts.year}-Q{((ts.month - 1) // 3 + 1)}" elif range_type == "halfyearly": key = f"{ts.year}-H{1 if ts.month <= 6 else 2}" elif range_type == "yearly": key = str(ts.year) else: key = ts.strftime("%Y-%m-%d") grouped[key] += expense_map[sl.id] labels = sorted(grouped) expenses = [round(grouped[l], 2) for l in labels] return {"labels": labels, "expenses": expenses} def recalculate_filesizes(receipt_id: int = None): updated = 0 not_found = 0 unchanged = 0 if receipt_id is not None: receipt = db.session.get(Receipt, receipt_id) receipts = [receipt] if receipt else [] else: receipts = db.session.execute(db.select(Receipt)).scalars().all() for r in receipts: if not r: continue filepath = os.path.join(app.config["UPLOAD_FOLDER"], r.filename) if os.path.exists(filepath): real_size = os.path.getsize(filepath) if r.filesize != real_size: r.filesize = real_size updated += 1 else: unchanged += 1 else: not_found += 1 db.session.commit() return updated, unchanged, not_found def get_admin_expense_summary(): now = datetime.now(timezone.utc) current_year = now.year current_month = now.month def calc_sum(base_query): total = base_query.scalar() or 0 year_total = ( base_query.filter( extract("year", ShoppingList.created_at) == current_year ).scalar() or 0 ) month_total = ( base_query.filter(extract("year", ShoppingList.created_at) == current_year) .filter(extract("month", ShoppingList.created_at) == current_month) .scalar() or 0 ) return {"total": total, "year": year_total, "month": month_total} base = db.session.query(func.sum(Expense.amount)).join( ShoppingList, ShoppingList.id == Expense.list_id ) all_lists = calc_sum(base) active_lists = calc_sum( base.filter( ShoppingList.is_archived == False, ~( (ShoppingList.is_temporary == True) & (ShoppingList.expires_at != None) & (ShoppingList.expires_at <= now) ), ) ) archived_lists = calc_sum(base.filter(ShoppingList.is_archived == True)) expired_lists = calc_sum( base.filter( ShoppingList.is_archived == False, (ShoppingList.is_temporary == True), (ShoppingList.expires_at != None), (ShoppingList.expires_at <= now), ) ) return { "all": all_lists, "active": active_lists, "archived": archived_lists, "expired": expired_lists, } def category_to_color(name): hash_val = int(hashlib.md5(name.encode("utf-8")).hexdigest(), 16) hue = (hash_val % 360) / 360.0 saturation = 0.60 + ((hash_val >> 8) % 17) / 100.0 lightness = 0.28 + ((hash_val >> 16) % 11) / 100.0 r, g, b = colorsys.hls_to_rgb(hue, lightness, saturation) return f"#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}" def get_total_expenses_grouped_by_category( show_all, range_type, start_date, end_date, user_id, category_id=None ): lists_query = ShoppingList.query if show_all: lists_query = lists_query.filter( or_(ShoppingList.owner_id == user_id, ShoppingList.is_public == True) ) else: lists_query = lists_query.filter(ShoppingList.owner_id == user_id) if category_id: if str(category_id) == "none": lists_query = lists_query.filter(~ShoppingList.categories.any()) else: try: cat_id_int = int(category_id) except ValueError: return {"labels": [], "datasets": []} lists_query = lists_query.join( shopping_list_category, shopping_list_category.c.shopping_list_id == ShoppingList.id, ).filter(shopping_list_category.c.category_id == cat_id_int) if start_date and end_date: try: dt_start = datetime.strptime(start_date, "%Y-%m-%d") dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1) except Exception: return {"error": "Błędne daty"} lists_query = lists_query.filter( ShoppingList.created_at >= dt_start, ShoppingList.created_at < dt_end ) lists = lists_query.options(joinedload(ShoppingList.categories)).all() if not lists: return {"labels": [], "datasets": []} data_map = defaultdict(lambda: defaultdict(float)) all_labels = set() for l in lists: total_expense = ( db.session.query(func.sum(Expense.amount)) .filter(Expense.list_id == l.id) .scalar() ) or 0 if total_expense <= 0: continue if range_type == "monthly": key = l.created_at.strftime("%Y-%m") elif range_type == "quarterly": key = f"{l.created_at.year}-Q{((l.created_at.month - 1) // 3 + 1)}" elif range_type == "halfyearly": key = f"{l.created_at.year}-H{1 if l.created_at.month <= 6 else 2}" elif range_type == "yearly": key = str(l.created_at.year) else: key = l.created_at.strftime("%Y-%m-%d") all_labels.add(key) if str(category_id) == "none": if not l.categories: data_map[key]["Bez kategorii"] += total_expense continue if not l.categories: data_map[key]["Bez kategorii"] += total_expense else: for c in l.categories: if category_id and str(c.id) != str(category_id): continue data_map[key][c.name] += total_expense labels = sorted(all_labels) categories_with_expenses = sorted( { cat for cat_data in data_map.values() for cat, value in cat_data.items() if value > 0 } ) datasets = [] for cat in categories_with_expenses: datasets.append( { "label": cat, "data": [round(data_map[label].get(cat, 0), 2) for label in labels], "backgroundColor": category_to_color(cat), } ) return {"labels": labels, "datasets": datasets} def save_pdf_as_webp(file, path): try: images = convert_from_bytes(file.read(), dpi=300) if not images: raise ValueError("Nie udało się przekonwertować PDF na obraz.") total_height = sum(img.height for img in images) max_width = max(img.width for img in images) combined = Image.new("RGB", (max_width, total_height), (255, 255, 255)) y_offset = 0 for img in images: combined.paste(img, (0, y_offset)) y_offset += img.height combined.thumbnail((2000, 20000)) new_path = path.rsplit(".", 1)[0] + ".webp" combined.save(new_path, **WEBP_SAVE_PARAMS) except Exception as e: raise ValueError(f"Błąd podczas przetwarzania PDF: {e}") def get_active_months_query(visible_lists_query=None): if db.engine.name == "sqlite": month_col = func.strftime("%Y-%m", ShoppingList.created_at) else: month_col = func.to_char(ShoppingList.created_at, "YYYY-MM") query = db.session.query(month_col.label("month")) if visible_lists_query is not None: query = query.select_from(visible_lists_query.subquery()) active_months = ( query.filter(ShoppingList.created_at != None).distinct().order_by("month").all() ) return [row.month for row in active_months] def normalize_name(name): if not name: return "" return re.sub(r'\s+', ' ', name).strip().lower() ############# OCR ########################### def preprocess_image_for_tesseract(image): image = ImageOps.autocontrast(image) image = image.point(lambda x: 0 if x < 150 else 255) image = image.resize((image.width * 2, image.height * 2), Image.BICUBIC) return image def extract_total_tesseract(image): text = pytesseract.image_to_string(image, lang="pol", config="--psm 4") lines = text.splitlines() candidates = [] blacklist_keywords = re.compile(r"\b(ptu|vat|podatek|stawka)\b", re.IGNORECASE) priority_keywords = re.compile( r""" \b( razem\s*do\s*zap[łl][aąo0]ty | do\s*zap[łl][aąo0]ty | suma | kwota | warto[śćs] | płatno[śćs] | total | amount )\b """, re.IGNORECASE | re.VERBOSE, ) for line in lines: if not line.strip(): continue if blacklist_keywords.search(line): continue is_priority = priority_keywords.search(line) matches = re.findall(r"\d{1,4}[.,]\d{2}", line) for match in matches: try: val = float(match.replace(",", ".")) if 0.1 <= val <= 100000: candidates.append((val, line, is_priority is not None)) except: continue if is_priority: spaced = re.findall(r"\d{1,4}\s\d{2}", line) for match in spaced: try: val = float(match.replace(" ", ".")) if 0.1 <= val <= 100000: candidates.append((val, line, True)) except: continue preferred = [(val, line) for val, line, is_pref in candidates if is_pref] if preferred: best_val = max(preferred, key=lambda x: x[0])[0] if best_val < 99999: return round(best_val, 2), lines if candidates: best_val = max(candidates, key=lambda x: x[0])[0] if best_val < 99999: return round(best_val, 2), lines data = pytesseract.image_to_data( image, lang="pol", config="--psm 4", output_type=Output.DICT ) font_candidates = [] for i in range(len(data["text"])): word = data["text"][i].strip() if not word or not re.match(r"^\d{1,5}[.,\s]\d{2}$", word): continue try: val = float(word.replace(",", ".").replace(" ", ".")) height = data["height"][i] conf = int(data.get("conf", ["0"] * len(data["text"]))[i]) if 0.1 <= val <= 100000: font_candidates.append((val, height, conf)) except: continue if font_candidates: best = max(font_candidates, key=lambda x: (x[1], x[2])) return round(best[0], 2), lines return 0.0, lines ############# END OCR ####################### # zabezpieczenie logowani do systemu - błędne hasła def is_ip_blocked(ip): now = time.time() attempts = failed_login_attempts[ip] while attempts and now - attempts[0] > TIME_WINDOW: attempts.popleft() return len(attempts) >= MAX_ATTEMPTS def register_failed_attempt(ip): now = time.time() attempts = failed_login_attempts[ip] while attempts and now - attempts[0] > TIME_WINDOW: attempts.popleft() attempts.append(now) def reset_failed_attempts(ip): failed_login_attempts[ip].clear() def attempts_remaining(ip): attempts = failed_login_attempts[ip] return max(0, MAX_ATTEMPTS - len(attempts)) #################################################### def get_client_ip(): for header in ["X-Forwarded-For", "X-Real-IP"]: if header in request.headers: ip = request.headers[header].split(",")[0].strip() if ip: return ip return request.remote_addr @login_manager.user_loader def load_user(user_id): return db.session.get(User, int(user_id)) @app.context_processor def inject_time(): return dict(time=time) @app.context_processor def inject_has_authorized_cookie(): return {"has_authorized_cookie": "authorized" in request.cookies} @app.context_processor def inject_is_blocked(): ip = request.access_route[0] return {"is_blocked": is_ip_blocked(ip)} @app.before_request def require_system_password(): endpoint = request.endpoint if endpoint in ("static_bp.serve_js_lib", "static_bp.serve_css_lib"): return ip = request.access_route[0] if is_ip_blocked(ip): abort(403) if endpoint is None: return if endpoint in ("system_auth", "healthcheck", "robots_txt"): return if ( "authorized" not in request.cookies and not endpoint.startswith("login") and endpoint != "favicon" ): if endpoint == "static_bp.serve_js": requested_file = request.view_args.get("filename", "") if requested_file == "toasts.js": return if requested_file.endswith(".js"): return redirect(url_for("system_auth", next=request.url)) return if endpoint.startswith("static_bp."): return if request.path == "/": return redirect(url_for("system_auth")) parsed = urlparse(request.url) fixed_url = urlunparse(parsed._replace(netloc=request.host)) return redirect(url_for("system_auth", next=fixed_url)) @app.before_request def start_timer(): request._start_time = time.time() @app.after_request def log_request(response): if request.path == "/healthcheck": return response ip = get_client_ip() method = request.method path = request.path status = response.status_code length = response.content_length or "-" start = getattr(request, "_start_time", None) duration = round((time.time() - start) * 1000, 2) if start else "-" agent = request.headers.get("User-Agent", "-") if status == 304: app.logger.info( f'REVALIDATED: {ip} - "{method} {path}" {status} {length} {duration}ms "{agent}"' ) else: app.logger.info( f'{ip} - "{method} {path}" {status} {length} {duration}ms "{agent}"' ) app.logger.debug(f"Request headers: {dict(request.headers)}") app.logger.debug(f"Response headers: {dict(response.headers)}") return response @app.template_filter("filemtime") def file_mtime_filter(path): try: t = os.path.getmtime(path) return datetime.fromtimestamp(t) except Exception: # return datetime.utcnow() return datetime.now(timezone.utc) @app.template_filter("todatetime") def to_datetime_filter(s): return datetime.strptime(s, "%Y-%m-%d") @app.template_filter("filesizeformat") def filesizeformat_filter(path): try: size = os.path.getsize(path) for unit in ["B", "KB", "MB", "GB"]: if size < 1024.0: return f"{size:.1f} {unit}" size /= 1024.0 return f"{size:.1f} TB" except Exception: return "N/A" @app.errorhandler(404) def page_not_found(e): return ( render_template( "errors.html", code=404, title="Strona nie znaleziona", message="Ups! Podana strona nie istnieje lub została przeniesiona.", ), 404, ) @app.errorhandler(403) def forbidden(e): return ( render_template( "errors.html", code=403, title="Brak dostępu", message=( e.description if e.description else "Nie masz uprawnień do wyświetlenia tej strony." ), ), 403, ) @app.route("/favicon.ico") def favicon_ico(): return redirect(url_for("static", filename="favicon.svg")) @app.route("/favicon.svg") def favicon(): svg = """ 🛒 """ return svg, 200, {"Content-Type": "image/svg+xml"} @app.route("/") def main_page(): now = datetime.now(timezone.utc) month_param = request.args.get("m", None) start = end = None if month_param in (None, ""): # brak wyboru -> domyślnie aktualny miesiąc month_str = now.strftime("%Y-%m") start = datetime(now.year, now.month, 1, tzinfo=timezone.utc) end = (start + timedelta(days=31)).replace(day=1) elif month_param == "all": month_str = "all" start = end = None else: month_str = month_param try: year, month = map(int, month_str.split("-")) start = datetime(year, month, 1, tzinfo=timezone.utc) end = (start + timedelta(days=31)).replace(day=1) except ValueError: start = end = None # dalej normalnie używasz date_filter: def date_filter(query): if start and end: query = query.filter( ShoppingList.created_at >= start, ShoppingList.created_at < end ) return query def date_filter(query): if start and end: query = query.filter( ShoppingList.created_at >= start, ShoppingList.created_at < end ) return query if current_user.is_authenticated: user_lists = ( date_filter( ShoppingList.query.filter_by( owner_id=current_user.id, is_archived=False ).filter( (ShoppingList.expires_at == None) | (ShoppingList.expires_at > now) ) ) .order_by(ShoppingList.created_at.desc()) .all() ) archived_lists = ( ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True) .order_by(ShoppingList.created_at.desc()) .all() ) public_lists = ( date_filter( ShoppingList.query.filter( ShoppingList.is_public == True, ShoppingList.owner_id != current_user.id, ( (ShoppingList.expires_at == None) | (ShoppingList.expires_at > now) ), ShoppingList.is_archived == False, ) ) .order_by(ShoppingList.created_at.desc()) .all() ) else: user_lists = [] archived_lists = [] public_lists = ( date_filter( ShoppingList.query.filter( ShoppingList.is_public == True, ( (ShoppingList.expires_at == None) | (ShoppingList.expires_at > now) ), ShoppingList.is_archived == False, ) ) .order_by(ShoppingList.created_at.desc()) .all() ) # Definiujemy widoczny zakres list dla tego użytkownika if current_user.is_authenticated: visible_lists_query = ShoppingList.query.filter( or_( ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True ) ) else: visible_lists_query = ShoppingList.query.filter(ShoppingList.is_public == True) # Teraz możemy bezpiecznie pobrać miesiące month_options = get_active_months_query(visible_lists_query) all_lists = user_lists + public_lists + archived_lists all_ids = [l.id for l in all_lists] if all_ids: stats = ( db.session.query( Item.list_id, func.count(Item.id).label("total_count"), func.sum(case((Item.purchased == True, 1), else_=0)).label( "purchased_count" ), func.sum(case((Item.not_purchased == True, 1), else_=0)).label( "not_purchased_count" ), ) .filter(Item.list_id.in_(all_ids)) .group_by(Item.list_id) .all() ) stats_map = { s.list_id: ( s.total_count or 0, s.purchased_count or 0, s.not_purchased_count or 0, ) for s in stats } latest_expenses_map = dict( db.session.query( Expense.list_id, func.coalesce(func.sum(Expense.amount), 0) ) .filter(Expense.list_id.in_(all_ids)) .group_by(Expense.list_id) .all() ) for l in all_lists: total_count, purchased_count, not_purchased_count = stats_map.get( l.id, (0, 0, 0) ) l.total_count = total_count l.purchased_count = purchased_count l.not_purchased_count = not_purchased_count l.total_expense = latest_expenses_map.get(l.id, 0) l.category_badges = [ {"name": c.name, "color": category_to_color(c.name)} for c in l.categories ] else: for l in all_lists: l.total_count = 0 l.purchased_count = 0 l.not_purchased_count = 0 l.total_expense = 0 l.category_badges = [] return render_template( "main.html", user_lists=user_lists, public_lists=public_lists, archived_lists=archived_lists, now=now, timedelta=timedelta, month_options=month_options, selected_month=month_str, ) @app.route("/system-auth", methods=["GET", "POST"]) def system_auth(): if ( current_user.is_authenticated or request.cookies.get("authorized") == AUTHORIZED_COOKIE_VALUE ): flash("Jesteś już zalogowany lub autoryzowany.", "info") return redirect(url_for("main_page")) ip = request.access_route[0] next_page = request.args.get("next") or url_for("main_page") if is_ip_blocked(ip): flash( "Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.", "danger", ) return render_template("system_auth.html"), 403 if request.method == "POST": if request.form["password"] == SYSTEM_PASSWORD: reset_failed_attempts(ip) resp = redirect(next_page) return set_authorized_cookie(resp) else: register_failed_attempt(ip) if is_ip_blocked(ip): flash( "Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.", "danger", ) return render_template("system_auth.html"), 403 remaining = attempts_remaining(ip) flash(f"Nieprawidłowe hasło. Pozostało {remaining} prób.", "warning") return render_template("system_auth.html") @app.route("/toggle_archive_list/") @login_required def toggle_archive_list(list_id): l = db.session.get(ShoppingList, list_id) if l is None: abort(404) if l.owner_id != current_user.id: return redirect_with_flash("Nie masz uprawnień do tej listy", "danger") archive = request.args.get("archive", "true").lower() == "true" if archive: l.is_archived = True flash(f"Lista „{l.title}” została zarchiwizowana.", "success") else: l.is_archived = False flash(f"Lista „{l.title}” została przywrócona.", "success") db.session.commit() return redirect(url_for("main_page")) @app.route("/edit_my_list/", methods=["GET", "POST"]) @login_required def edit_my_list(list_id): receipts = ( Receipt.query.filter_by(list_id=list_id) .order_by(Receipt.uploaded_at.desc()) .all() ) l = db.session.get(ShoppingList, list_id) if l is None: abort(404) if l.owner_id != current_user.id: abort(403, description="Nie jesteś właścicielem tej listy.") categories = Category.query.order_by(Category.name.asc()).all() selected_categories_ids = {c.id for c in l.categories} next_page = request.args.get("next") or request.referrer if request.method == "POST": move_to_month = request.form.get("move_to_month") if move_to_month: try: year, month = map(int, move_to_month.split("-")) new_created_at = datetime(year, month, 1, tzinfo=timezone.utc) l.created_at = new_created_at db.session.commit() flash( f"Zmieniono datę utworzenia listy na {new_created_at.strftime('%Y-%m-%d')}", "success", ) return redirect(next_page or url_for("main_page")) except ValueError: flash("Nieprawidłowy format miesiąca", "danger") return redirect(next_page or url_for("main_page")) new_title = request.form.get("title", "").strip() is_public = "is_public" in request.form is_temporary = "is_temporary" in request.form is_archived = "is_archived" in request.form expires_date = request.form.get("expires_date") expires_time = request.form.get("expires_time") if not new_title: flash("Podaj poprawny tytuł", "danger") return redirect(next_page or url_for("main_page")) l.title = new_title l.is_public = is_public l.is_temporary = is_temporary l.is_archived = is_archived if expires_date and expires_time: try: combined = f"{expires_date} {expires_time}" expires_dt = datetime.strptime(combined, "%Y-%m-%d %H:%M") l.expires_at = expires_dt.replace(tzinfo=timezone.utc) except ValueError: flash("Błędna data lub godzina wygasania", "danger") return redirect(next_page or url_for("main_page")) else: l.expires_at = None update_list_categories_from_form(l, request.form) db.session.commit() flash("Zaktualizowano dane listy", "success") return redirect(next_page or url_for("main_page")) return render_template( "edit_my_list.html", list=l, receipts=receipts, categories=categories, selected_categories=selected_categories_ids, ) @app.route("/delete_user_list/", methods=["POST"]) @login_required def delete_user_list(list_id): l = db.session.get(ShoppingList, list_id) if l is None or l.owner_id != current_user.id: abort(403, description="Nie jesteś właścicielem tej listy.") l = db.session.get(ShoppingList, list_id) if l is None or l.owner_id != current_user.id: abort(403) delete_receipts_for_list(list_id) Item.query.filter_by(list_id=list_id).delete() Expense.query.filter_by(list_id=list_id).delete() db.session.delete(l) db.session.commit() flash("Lista została usunięta", "success") return redirect(url_for("main_page")) @app.route("/toggle_visibility/", methods=["GET", "POST"]) @login_required def toggle_visibility(list_id): l = db.session.get(ShoppingList, list_id) if l is None: abort(404) if l.owner_id != current_user.id: if request.is_json or request.method == "POST": return {"error": "Unauthorized"}, 403 flash("Nie masz uprawnień do tej listy", "danger") return redirect(url_for("main_page")) l.is_public = not l.is_public db.session.commit() share_url = f"{request.url_root}share/{l.share_token}" if request.is_json or request.method == "POST": return {"is_public": l.is_public, "share_url": share_url} if l.is_public: flash("Lista została udostępniona publicznie", "success") else: flash("Lista została ukryta przed gośćmi", "info") return redirect(url_for("main_page")) @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": username_input = request.form["username"].lower() user = User.query.filter(func.lower(User.username) == username_input).first() if user and check_password(user.password_hash, request.form["password"]): session.permanent = True login_user(user) session.modified = True flash("Zalogowano pomyślnie", "success") return redirect(url_for("main_page")) flash("Nieprawidłowy login lub hasło", "danger") return render_template("login.html") @app.route("/logout") @login_required def logout(): logout_user() flash("Wylogowano pomyślnie", "success") return redirect(url_for("main_page")) @app.route("/create", methods=["POST"]) @login_required def create_list(): title = request.form.get("title") is_temporary = request.form.get("temporary") == "1" token = generate_share_token(8) expires_at = ( datetime.now(timezone.utc) + timedelta(days=7) if is_temporary else None ) new_list = ShoppingList( title=title, owner_id=current_user.id, is_temporary=is_temporary, share_token=token, expires_at=expires_at, ) db.session.add(new_list) db.session.commit() flash("Utworzono nową listę", "success") return redirect(url_for("view_list", list_id=new_list.id)) @app.route("/list/") @login_required def view_list(list_id): shopping_list, items, receipt_files, expenses, total_expense = get_list_details( list_id ) total_count = len(items) purchased_count = len([i for i in items if i.purchased]) percent = (purchased_count / total_count * 100) if total_count > 0 else 0 is_owner = current_user.id == shopping_list.owner_id for item in items: if item.added_by != shopping_list.owner_id: item.added_by_display = ( item.added_by_user.username if item.added_by_user else "?" ) else: item.added_by_display = None shopping_list.category_badges = [ {"name": c.name, "color": category_to_color(c.name)} for c in shopping_list.categories ] return render_template( "list.html", list=shopping_list, items=items, receipt_files=receipt_files, total_count=total_count, purchased_count=purchased_count, percent=percent, expenses=expenses, total_expense=total_expense, is_share=False, is_owner=is_owner, ) @app.route("/expenses") @login_required def expenses(): start_date_str = request.args.get("start_date") end_date_str = request.args.get("end_date") category_id = request.args.get("category_id", type=int) show_all = request.args.get("show_all", "true").lower() == "true" categories = ( Category.query.join( shopping_list_category, shopping_list_category.c.category_id == Category.id ) .join( ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id ) .join(Expense, Expense.list_id == ShoppingList.id) .filter( or_( ShoppingList.owner_id == current_user.id, ( ShoppingList.is_public == True if show_all else ShoppingList.owner_id == current_user.id ), ) ) .distinct() .order_by(Category.name.asc()) .all() ) categories.append(SimpleNamespace(id="none", name="Bez kategorii")) start = None end = None expenses_query = Expense.query.options( joinedload(Expense.shopping_list).joinedload(ShoppingList.owner), joinedload(Expense.shopping_list).joinedload(ShoppingList.expenses), joinedload(Expense.shopping_list).joinedload(ShoppingList.categories), ).join(ShoppingList, Expense.list_id == ShoppingList.id) if not show_all: expenses_query = expenses_query.filter(ShoppingList.owner_id == current_user.id) else: expenses_query = expenses_query.filter( or_( ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True ) ) if category_id: if str(category_id) == "none": # Bez kategorii lists_query = lists_query.filter(~ShoppingList.categories.any()) else: lists_query = lists_query.join( shopping_list_category, shopping_list_category.c.shopping_list_id == ShoppingList.id, ).filter(shopping_list_category.c.category_id == category_id) if start_date_str and end_date_str: try: start = datetime.strptime(start_date_str, "%Y-%m-%d") end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1) expenses_query = expenses_query.filter( Expense.added_at >= start, Expense.added_at < end ) except ValueError: flash("Błędny zakres dat", "danger") expenses = expenses_query.order_by(Expense.added_at.desc()).all() list_ids = {e.list_id for e in expenses} totals_map = {} if list_ids: totals = ( db.session.query( Expense.list_id, func.sum(Expense.amount).label("total_expense") ) .filter(Expense.list_id.in_(list_ids)) .group_by(Expense.list_id) .all() ) totals_map = {t.list_id: t.total_expense or 0 for t in totals} expense_table = [ { "title": e.shopping_list.title if e.shopping_list else "Nieznana", "amount": e.amount, "added_at": e.added_at, } for e in expenses ] lists_data = [ { "id": l.id, "title": l.title, "created_at": l.created_at, "total_expense": totals_map.get(l.id, 0), "owner_username": l.owner.username if l.owner else "?", "categories": [c.id for c in l.categories], } for l in {e.shopping_list for e in expenses if e.shopping_list} ] return render_template( "expenses.html", expense_table=expense_table, lists_data=lists_data, categories=categories, selected_category=category_id, show_all=show_all, ) @app.route("/expenses_data") @login_required def expenses_data(): range_type = request.args.get("range", "monthly") start_date = request.args.get("start_date") end_date = request.args.get("end_date") show_all = request.args.get("show_all", "true").lower() == "true" category_id = request.args.get("category_id") by_category = request.args.get("by_category", "false").lower() == "true" if by_category: result = get_total_expenses_grouped_by_category( show_all=show_all, range_type=range_type, start_date=start_date, end_date=end_date, user_id=current_user.id, category_id=category_id, ) else: result = get_total_expenses_grouped_by_list_created_at( user_only=True, admin=False, show_all=show_all, range_type=range_type, start_date=start_date, end_date=end_date, user_id=current_user.id, category_id=category_id, ) if "error" in result: return jsonify({"error": result["error"]}), 400 return jsonify(result) @app.route("/share/") @app.route("/guest-list/") def shared_list(token=None, list_id=None): if token: shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404() if not check_list_public(shopping_list): return redirect(url_for("main_page")) list_id = shopping_list.id total_expense = get_total_expense_for_list(list_id) shopping_list, items, receipt_files, expenses, total_expense = get_list_details( list_id ) shopping_list.category_badges = [ {"name": c.name, "color": category_to_color(c.name)} for c in shopping_list.categories ] for item in items: if item.added_by != shopping_list.owner_id: item.added_by_display = ( item.added_by_user.username if item.added_by_user else "?" ) else: item.added_by_display = None return render_template( "list_share.html", list=shopping_list, items=items, receipt_files=receipt_files, expenses=expenses, total_expense=total_expense, is_share=True, ) @app.route("/copy/") @login_required def copy_list(list_id): original = ShoppingList.query.get_or_404(list_id) token = generate_share_token(8) new_list = ShoppingList( title=original.title + " (Kopia)", owner_id=current_user.id, share_token=token ) db.session.add(new_list) db.session.commit() original_items = Item.query.filter_by(list_id=original.id).all() for item in original_items: copy_item = Item(list_id=new_list.id, name=item.name) db.session.add(copy_item) db.session.commit() flash("Skopiowano listę", "success") return redirect(url_for("view_list", list_id=new_list.id)) @app.route("/suggest_products") def suggest_products(): query = request.args.get("q", "") suggestions = [] if query: suggestions = ( SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f"%{query}%")) .limit(5) .all() ) return {"suggestions": [s.name for s in suggestions]} @app.route("/all_products") def all_products(): query = request.args.get("q", "") top_products_query = SuggestedProduct.query if query: top_products_query = top_products_query.filter( SuggestedProduct.name.ilike(f"%{query}%") ) top_products = ( db.session.query( func.lower(Item.name).label("name"), func.sum(Item.quantity).label("count") ) .join(ShoppingList, ShoppingList.id == Item.list_id) .filter(Item.purchased.is_(True)) .group_by(func.lower(Item.name)) .order_by(func.sum(Item.quantity).desc()) .limit(5) .all() ) top_names = [s.name for s in top_products] rest_query = SuggestedProduct.query if query: rest_query = rest_query.filter(SuggestedProduct.name.ilike(f"%{query}%")) if top_names: rest_query = rest_query.filter(~SuggestedProduct.name.in_(top_names)) rest_products = rest_query.order_by(SuggestedProduct.name.asc()).limit(200).all() all_names = top_names + [s.name for s in rest_products] seen = set() unique_names = [] for name in all_names: name_lower = name.strip().lower() if name_lower not in seen: unique_names.append(name) seen.add(name_lower) return {"allproducts": unique_names} @app.route("/upload_receipt/", methods=["POST"]) @login_required def upload_receipt(list_id): l = db.session.get(ShoppingList, list_id) if "receipt" not in request.files: return receipt_error("Brak pliku") file = request.files["receipt"] if file.filename == "": return receipt_error("Nie wybrano pliku") if file and allowed_file(file.filename): file_bytes = file.read() file.seek(0) file_hash = hashlib.sha256(file_bytes).hexdigest() existing = Receipt.query.filter_by(file_hash=file_hash).first() if existing: return receipt_error("Taki plik już istnieje") now = datetime.now(timezone.utc) timestamp = now.strftime("%Y%m%d_%H%M") random_part = secrets.token_hex(3) webp_filename = f"list_{list_id}_{timestamp}_{random_part}.webp" file_path = os.path.join(app.config["UPLOAD_FOLDER"], webp_filename) try: if file.filename.lower().endswith(".pdf"): file.seek(0) save_pdf_as_webp(file, file_path) else: save_resized_image(file, file_path) except ValueError as e: return receipt_error(str(e)) filesize = os.path.getsize(file_path) uploaded_at = datetime.now(timezone.utc) new_receipt = Receipt( list_id=list_id, filename=webp_filename, filesize=filesize, uploaded_at=uploaded_at, file_hash=file_hash, ) db.session.add(new_receipt) db.session.commit() if ( request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest" ): url = url_for("uploaded_file", filename=webp_filename) socketio.emit("receipt_added", {"url": url}, to=str(list_id)) return jsonify({"success": True, "url": url}) flash("Wgrano paragon", "success") return redirect(request.referrer or url_for("main_page")) return receipt_error("Niedozwolony format pliku") @app.route("/uploads/") def uploaded_file(filename): response = send_from_directory(app.config["UPLOAD_FOLDER"], filename) response.headers["Cache-Control"] = app.config["UPLOADS_CACHE_CONTROL"] response.headers.pop("Pragma", None) response.headers.pop("Content-Disposition", None) mime, _ = mimetypes.guess_type(filename) if mime: response.headers["Content-Type"] = mime return response @app.route("/reorder_items", methods=["POST"]) @login_required def reorder_items(): data = request.get_json() list_id = data.get("list_id") order = data.get("order") for index, item_id in enumerate(order): item = db.session.get(Item, item_id) if item and item.list_id == list_id: item.position = index db.session.commit() socketio.emit( "items_reordered", {"list_id": list_id, "order": order}, to=str(list_id) ) return jsonify(success=True) @app.route("/rotate_receipt/") @login_required def rotate_receipt_user(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) list_obj = ShoppingList.query.get_or_404(receipt.list_id) if not (current_user.is_admin or current_user.id == list_obj.owner_id): flash("Brak uprawnień do tej operacji", "danger") return redirect(url_for("main_page")) try: rotate_receipt_by_id(receipt_id) recalculate_filesizes(receipt_id) flash("Obrócono paragon", "success") except FileNotFoundError: flash("Plik nie istnieje", "danger") except Exception as e: flash(f"Błąd przy obracaniu: {str(e)}", "danger") return redirect(request.referrer or url_for("main_page")) @app.route("/delete_receipt/") @login_required def delete_receipt_user(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) list_obj = ShoppingList.query.get_or_404(receipt.list_id) if not (current_user.is_admin or current_user.id == list_obj.owner_id): flash("Brak uprawnień do tej operacji", "danger") return redirect(url_for("main_page")) try: delete_receipt_by_id(receipt_id) flash("Paragon usunięty", "success") except Exception as e: flash(f"Błąd przy usuwaniu pliku: {str(e)}", "danger") return redirect(request.referrer or url_for("main_page")) # OCR @app.route("/lists//analyze", methods=["POST"]) @login_required def analyze_receipts_for_list(list_id): receipt_objs = Receipt.query.filter_by(list_id=list_id).all() existing_expenses = { e.receipt_filename for e in Expense.query.filter_by(list_id=list_id).all() if e.receipt_filename } results = [] total = 0.0 for receipt in receipt_objs: filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) if not os.path.exists(filepath): continue try: raw_image = Image.open(filepath).convert("RGB") image = preprocess_image_for_tesseract(raw_image) value, lines = extract_total_tesseract(image) except Exception as e: print(f"OCR error for {receipt.filename}:\n{traceback.format_exc()}") value = 0.0 lines = [] already_added = receipt.filename in existing_expenses results.append( { "id": receipt.id, "filename": receipt.filename, "amount": round(value, 2), "debug_text": lines, "already_added": already_added, } ) if not already_added: total += value return jsonify({"results": results, "total": round(total, 2)}) @app.route("/user_crop_receipt", methods=["POST"]) @login_required def crop_receipt_user(): receipt_id = request.form.get("receipt_id") file = request.files.get("cropped_image") receipt = Receipt.query.get_or_404(receipt_id) list_obj = ShoppingList.query.get_or_404(receipt.list_id) if list_obj.owner_id != current_user.id and not current_user.is_admin: return jsonify(success=False, error="Brak dostępu"), 403 result = handle_crop_receipt(receipt_id, file) return jsonify(result) @app.route("/admin") @login_required @admin_required def admin_panel(): month_str = request.args.get("m") if not month_str: month_str = datetime.now(timezone.utc).strftime("%Y-%m") show_all = month_str == "all" if not show_all: try: if month_str: year, month = map(int, month_str.split("-")) now = datetime(year, month, 1, tzinfo=timezone.utc) else: now = datetime.now(timezone.utc) month_str = now.strftime("%Y-%m") except Exception: now = datetime.now(timezone.utc) month_str = now.strftime("%Y-%m") start = now end = (start + timedelta(days=31)).replace(day=1) else: now = datetime.now(timezone.utc) start = end = None # Liczniki globalne user_count = User.query.count() list_count = ShoppingList.query.count() item_count = Item.query.count() base_query = ShoppingList.query.options( joinedload(ShoppingList.owner), joinedload(ShoppingList.items), joinedload(ShoppingList.receipts), joinedload(ShoppingList.expenses), joinedload(ShoppingList.categories), ) if not show_all and start and end: base_query = base_query.filter( ShoppingList.created_at >= start, ShoppingList.created_at < end ) all_lists = base_query.all() # tylko listy z danych miesięcy month_options = get_active_months_query() all_ids = [l.id for l in all_lists] stats_map = {} latest_expenses_map = {} if all_ids: # Statystyki produktów stats = ( db.session.query( Item.list_id, func.count(Item.id).label("total_count"), func.sum(case((Item.purchased == True, 1), else_=0)).label( "purchased_count" ), ) .filter(Item.list_id.in_(all_ids)) .group_by(Item.list_id) .all() ) stats_map = { s.list_id: (s.total_count or 0, s.purchased_count or 0) for s in stats } latest_expenses_map = dict( db.session.query( Expense.list_id, func.coalesce(func.sum(Expense.amount), 0) ) .filter(Expense.list_id.in_(all_ids)) .group_by(Expense.list_id) .all() ) enriched_lists = [] for l in all_lists: total_count, purchased_count = stats_map.get(l.id, (0, 0)) percent = (purchased_count / total_count * 100) if total_count > 0 else 0 comments_count = sum(1 for i in l.items if i.note and i.note.strip() != "") receipts_count = len(l.receipts) total_expense = latest_expenses_map.get(l.id, 0) if l.is_temporary and l.expires_at: expires_at = l.expires_at if expires_at.tzinfo is None: expires_at = expires_at.replace(tzinfo=timezone.utc) is_expired = expires_at < now else: is_expired = False enriched_lists.append( { "list": l, "total_count": total_count, "purchased_count": purchased_count, "percent": round(percent), "comments_count": comments_count, "receipts_count": receipts_count, "total_expense": total_expense, "expired": is_expired, "categories": l.categories, } ) top_products = ( db.session.query(Item.name, func.count(Item.id).label("count")) .filter(Item.purchased.is_(True)) .group_by(Item.name) .order_by(func.count(Item.id).desc()) .limit(5) .all() ) purchased_items_count = Item.query.filter_by(purchased=True).count() expense_summary = get_admin_expense_summary() process = psutil.Process(os.getpid()) app_mem = process.memory_info().rss // (1024 * 1024) db_engine = db.engine db_info = { "engine": db_engine.name, "version": getattr(db_engine.dialect, "server_version_info", None), "url": str(db_engine.url).split("?")[0], } inspector = inspect(db_engine) table_count = len(inspector.get_table_names()) record_total = get_total_records() uptime_minutes = int( (datetime.now(timezone.utc) - app_start_time).total_seconds() // 60 ) return render_template( "admin/admin_panel.html", user_count=user_count, list_count=list_count, item_count=item_count, purchased_items_count=purchased_items_count, enriched_lists=enriched_lists, top_products=top_products, expense_summary=expense_summary, now=now, python_version=sys.version, system_info=platform.platform(), app_memory=f"{app_mem} MB", db_info=db_info, table_count=table_count, record_total=record_total, uptime_minutes=uptime_minutes, timedelta=timedelta, show_all=show_all, month_str=month_str, month_options=month_options, ) @app.route("/admin/delete_list/") @login_required @admin_required def delete_list(list_id): delete_receipts_for_list(list_id) list_to_delete = ShoppingList.query.get_or_404(list_id) Item.query.filter_by(list_id=list_to_delete.id).delete() Expense.query.filter_by(list_id=list_to_delete.id).delete() db.session.delete(list_to_delete) db.session.commit() flash(f"Usunięto listę: {list_to_delete.title}", "success") return redirect(url_for("admin_panel")) @app.route("/admin/add_user", methods=["POST"]) @login_required @admin_required def add_user(): username = request.form["username"].lower() password = request.form["password"] if not username or not password: flash("Wypełnij wszystkie pola", "danger") return redirect(url_for("list_users")) if User.query.filter(func.lower(User.username) == username).first(): flash("Użytkownik o takiej nazwie już istnieje", "warning") return redirect(url_for("list_users")) hashed_password = hash_password(password) new_user = User(username=username, password_hash=hashed_password) db.session.add(new_user) db.session.commit() flash("Dodano nowego użytkownika", "success") return redirect(url_for("list_users")) @app.route("/admin/users") @login_required @admin_required def list_users(): users = User.query.all() user_count = User.query.count() list_count = ShoppingList.query.count() item_count = Item.query.count() activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"] return render_template( "admin/user_management.html", users=users, user_count=user_count, list_count=list_count, item_count=item_count, activity_log=activity_log, ) @app.route("/admin/change_password/", methods=["POST"]) @login_required @admin_required def reset_password(user_id): user = User.query.get_or_404(user_id) new_password = request.form["password"] if not new_password: flash("Podaj nowe hasło", "danger") return redirect(url_for("list_users")) user.password_hash = hash_password(new_password) db.session.commit() flash(f"Hasło dla użytkownika {user.username} zostało zaktualizowane", "success") return redirect(url_for("list_users")) @app.route("/admin/delete_user/") @login_required @admin_required def delete_user(user_id): user = User.query.get_or_404(user_id) if user.is_admin: admin_count = User.query.filter_by(is_admin=True).count() if admin_count <= 1: flash("Nie można usunąć ostatniego administratora.", "danger") return redirect(url_for("list_users")) db.session.delete(user) db.session.commit() flash("Użytkownik usunięty", "success") return redirect(url_for("list_users")) @app.route("/admin/receipts/") @login_required @admin_required def admin_receipts(id): try: page = request.args.get("page", 1, type=int) per_page = request.args.get("per_page", 24, type=int) per_page = max(1, min(per_page, 200)) # sanity check if id == "all": all_filenames = {r.filename for r in Receipt.query.all()} pagination = Receipt.query.order_by(Receipt.uploaded_at.desc()).paginate( page=page, per_page=per_page, error_out=False ) receipts_paginated = pagination.items total_pages = pagination.pages upload_folder = app.config["UPLOAD_FOLDER"] files_on_disk = set(os.listdir(upload_folder)) orphan_files = [ f for f in files_on_disk if f.endswith(".webp") and f not in all_filenames and f.startswith("list_") ] else: list_id = int(id) receipts_paginated = ( Receipt.query.filter_by(list_id=list_id) .order_by(Receipt.uploaded_at.desc()) .all() ) orphan_files = [] page = 1 total_pages = 1 per_page = len(receipts_paginated) or 1 except ValueError: flash("Nieprawidłowe ID listy.", "danger") return redirect(url_for("admin_panel")) query_string = urlencode({k: v for k, v in request.args.items() if k != "page"}) return render_template( "admin/receipts.html", receipts=receipts_paginated, orphan_files=orphan_files, orphan_files_count=len(orphan_files), page=page, per_page=per_page, total_pages=total_pages, id=id, query_string=query_string, ) @app.route("/admin/rotate_receipt/") @login_required @admin_required def rotate_receipt(receipt_id): try: rotate_receipt_by_id(receipt_id) recalculate_filesizes(receipt_id) flash("Obrócono paragon", "success") except FileNotFoundError: flash("Plik nie istnieje", "danger") except Exception as e: flash(f"Błąd przy obracaniu: {str(e)}", "danger") return redirect(request.referrer or url_for("admin_receipts", id="all")) @app.route("/admin/delete_receipt/") @app.route("/admin/delete_receipt/orphan/") @login_required @admin_required def delete_receipt(receipt_id=None, filename=None): if filename: # tryb orphan safe_filename = os.path.basename(filename) if Receipt.query.filter_by(filename=safe_filename).first(): flash("Nie można usunąć pliku powiązanego z bazą!", "danger") else: file_path = os.path.join(app.config["UPLOAD_FOLDER"], safe_filename) if os.path.exists(file_path): try: os.remove(file_path) flash(f"Usunięto plik: {safe_filename}", "success") except Exception as e: flash(f"Błąd przy usuwaniu pliku: {e}", "danger") else: flash("Plik już nie istnieje.", "warning") return redirect(url_for("admin_receipts", id="all")) try: delete_receipt_by_id(receipt_id) flash("Paragon usunięty", "success") except Exception as e: flash(f"Błąd przy usuwaniu pliku: {str(e)}", "danger") return redirect(request.referrer or url_for("admin_receipts", id="all")) @app.route("/admin/rename_receipt/") @login_required @admin_required def rename_receipt(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) if not os.path.exists(old_path): flash("Plik nie istnieje", "danger") return redirect(request.referrer) new_filename = generate_new_receipt_filename(receipt.list_id) new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename) try: os.rename(old_path, new_path) receipt.filename = new_filename db.session.flush() recalculate_filesizes(receipt.id) db.session.commit() flash("Zmieniono nazwę pliku", "success") except Exception as e: flash(f"Błąd przy zmianie nazwy: {str(e)}", "danger") return redirect(request.referrer or url_for("admin_receipts", id="all")) @app.route("/admin/generate_receipt_hash/") @login_required @admin_required def generate_receipt_hash(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) if receipt.file_hash: flash("Hash już istnieje", "info") return redirect(request.referrer) file_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) if not os.path.exists(file_path): flash("Plik nie istnieje", "danger") return redirect(request.referrer) try: with open(file_path, "rb") as f: file_hash = hashlib.sha256(f.read()).hexdigest() receipt.file_hash = file_hash db.session.commit() flash("Hash wygenerowany", "success") except Exception as e: flash(f"Błąd przy generowaniu hasha: {e}", "danger") return redirect(request.referrer) @app.route("/admin/delete_selected_lists", methods=["POST"]) @login_required @admin_required def delete_selected_lists(): ids = request.form.getlist("list_ids") for list_id in ids: lst = db.session.get(ShoppingList, int(list_id)) if lst: delete_receipts_for_list(lst.id) Item.query.filter_by(list_id=lst.id).delete() Expense.query.filter_by(list_id=lst.id).delete() db.session.delete(lst) db.session.commit() flash("Usunięto wybrane listy", "success") return redirect(url_for("admin_panel")) @app.route("/admin/edit_list/", methods=["GET", "POST"]) @login_required @admin_required def edit_list(list_id): l = db.session.get( ShoppingList, list_id, options=[ joinedload(ShoppingList.expenses), joinedload(ShoppingList.receipts), joinedload(ShoppingList.owner), joinedload(ShoppingList.items), joinedload(ShoppingList.categories), ], ) if l is None: abort(404) total_expense = get_total_expense_for_list(l.id) categories = Category.query.order_by(Category.name.asc()).all() selected_categories_ids = {c.id for c in l.categories} if request.method == "POST": action = request.form.get("action") if action == "save": new_title = request.form.get("title", "").strip() new_amount_str = request.form.get("amount") is_archived = "archived" in request.form is_public = "public" in request.form is_temporary = "temporary" in request.form new_owner_id = request.form.get("owner_id") expires_date = request.form.get("expires_date") expires_time = request.form.get("expires_time") if new_title: l.title = new_title l.is_archived = is_archived l.is_public = is_public l.is_temporary = is_temporary if expires_date and expires_time: try: combined_str = f"{expires_date} {expires_time}" dt = datetime.strptime(combined_str, "%Y-%m-%d %H:%M") l.expires_at = dt.replace(tzinfo=timezone.utc) except ValueError: flash("Niepoprawna data lub godzina wygasania", "danger") return redirect(url_for("edit_list", list_id=list_id)) else: l.expires_at = None if new_owner_id: try: new_owner_id_int = int(new_owner_id) user_obj = db.session.get(User, new_owner_id_int) if user_obj: l.owner_id = new_owner_id_int else: flash("Wybrany użytkownik nie istnieje", "danger") return redirect(url_for("edit_list", list_id=list_id)) except ValueError: flash("Niepoprawny ID użytkownika", "danger") return redirect(url_for("edit_list", list_id=list_id)) if new_amount_str: try: new_amount = float(new_amount_str) for expense in l.expenses: db.session.delete(expense) db.session.commit() db.session.add(Expense(list_id=list_id, amount=new_amount)) except ValueError: flash("Niepoprawna kwota", "danger") return redirect(url_for("edit_list", list_id=list_id)) created_month = request.form.get("created_month") if created_month: try: year, month = map(int, created_month.split("-")) l.created_at = datetime(year, month, 1, tzinfo=timezone.utc) except ValueError: flash( "Nieprawidłowy format miesiąca (przeniesienie daty utworzenia)", "danger", ) return redirect(url_for("edit_list", list_id=list_id)) update_list_categories_from_form(l, request.form) db.session.add(l) db.session.commit() flash("Zapisano zmiany listy", "success") return redirect(url_for("edit_list", list_id=list_id)) elif action == "add_item": item_name = request.form.get("item_name", "").strip() quantity_str = request.form.get("quantity", "1") if not item_name: flash("Podaj nazwę produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) try: quantity = max(1, int(quantity_str)) except ValueError: quantity = 1 db.session.add( Item( list_id=list_id, name=item_name, quantity=quantity, added_by=current_user.id, ) ) exists = ( db.session.query(SuggestedProduct) .filter(func.lower(SuggestedProduct.name) == item_name.lower()) .first() ) if not exists: db.session.add(SuggestedProduct(name=item_name)) db.session.commit() flash("Dodano produkt", "success") return redirect(url_for("edit_list", list_id=list_id)) elif action == "delete_item": item = db.session.get(Item, request.form.get("item_id")) if item and item.list_id == list_id: db.session.delete(item) db.session.commit() flash("Usunięto produkt", "success") else: flash("Nie znaleziono produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) elif action == "toggle_purchased": item = db.session.get(Item, request.form.get("item_id")) if item and item.list_id == list_id: item.purchased = not item.purchased db.session.commit() flash("Zmieniono status oznaczenia produktu", "success") else: flash("Nie znaleziono produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) elif action == "mark_not_purchased": item = db.session.get(Item, request.form.get("item_id")) if item and item.list_id == list_id: item.not_purchased = True item.purchased = False item.purchased_at = None db.session.commit() flash("Oznaczono produkt jako niekupione", "success") else: flash("Nie znaleziono produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) elif action == "unmark_not_purchased": item = db.session.get(Item, request.form.get("item_id")) if item and item.list_id == list_id: item.not_purchased = False item.not_purchased_reason = None item.purchased = False item.purchased_at = None db.session.commit() flash("Przywrócono produkt do listy", "success") else: flash("Nie znaleziono produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) elif action == "edit_quantity": item = db.session.get(Item, request.form.get("item_id")) if item and item.list_id == list_id: try: new_quantity = int(request.form.get("quantity")) if new_quantity > 0: item.quantity = new_quantity db.session.commit() flash("Zmieniono ilość produktu", "success") except ValueError: flash("Nieprawidłowa ilość", "danger") else: flash("Nie znaleziono produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) users = User.query.all() items = l.items receipts = l.receipts return render_template( "admin/edit_list.html", list=l, total_expense=total_expense, users=users, items=items, receipts=receipts, categories=categories, selected_categories=selected_categories_ids, ) @app.route("/admin/products") @login_required @admin_required def list_products(): page = request.args.get("page", 1, type=int) per_page = request.args.get("per_page", 100, type=int) per_page = max(1, min(per_page, 300)) all_items = ( Item.query.options( joinedload(Item.added_by_user), ) .order_by(Item.id.desc()) .all() ) seen_names = set() unique_items = [] for item in all_items: key = normalize_name(item.name) if key not in seen_names: unique_items.append(item) seen_names.add(key) usage_counts = dict( db.session.query( func.lower(Item.name), func.coalesce(func.sum(Item.quantity), 0) ) .group_by(func.lower(Item.name)) .all() ) total_items = len(unique_items) total_pages = (total_items + per_page - 1) // per_page start = (page - 1) * per_page end = start + per_page items = unique_items[start:end] user_ids = {item.added_by for item in items if item.added_by} users = User.query.filter(User.id.in_(user_ids)).all() if user_ids else [] users_dict = {u.id: u.username for u in users} suggestions = SuggestedProduct.query.all() all_suggestions_dict = { normalize_name(s.name): s for s in suggestions if s.name and s.name.strip() } used_suggestion_names = {normalize_name(i.name) for i in unique_items} suggestions_dict = { name: all_suggestions_dict[name] for name in used_suggestion_names if name in all_suggestions_dict } orphan_suggestions = [ s for name, s in all_suggestions_dict.items() if name not in used_suggestion_names ] query_string = urlencode({k: v for k, v in request.args.items() if k != "page"}) synced_names = set(suggestions_dict.keys()) return render_template( "admin/list_products.html", items=items, users_dict=users_dict, suggestions_dict=suggestions_dict, orphan_suggestions=orphan_suggestions, page=page, per_page=per_page, total_pages=total_pages, query_string=query_string, total_items=total_items, usage_counts=usage_counts, synced_names=synced_names ) @app.route("/admin/sync_suggestion/", methods=["POST"]) @login_required def sync_suggestion_ajax(item_id): if not current_user.is_admin: return jsonify({"success": False, "message": "Brak uprawnień"}), 403 item = Item.query.get_or_404(item_id) existing = SuggestedProduct.query.filter( func.lower(SuggestedProduct.name) == item.name.lower() ).first() if not existing: new_suggestion = SuggestedProduct(name=item.name) db.session.add(new_suggestion) db.session.commit() return jsonify( { "success": True, "message": f"Utworzono sugestię dla produktu: {item.name}", } ) else: return jsonify( { "success": True, "message": f"Sugestia dla produktu „{item.name}” już istnieje.", } ) @app.route("/admin/delete_suggestion/", methods=["POST"]) @login_required def delete_suggestion_ajax(suggestion_id): if not current_user.is_admin: return jsonify({"success": False, "message": "Brak uprawnień"}), 403 suggestion = SuggestedProduct.query.get_or_404(suggestion_id) db.session.delete(suggestion) db.session.commit() return jsonify({"success": True, "message": "Sugestia została usunięta."}) @app.route("/admin/promote_user/") @login_required @admin_required def promote_user(user_id): user = User.query.get_or_404(user_id) user.is_admin = True db.session.commit() flash(f"Użytkownik {user.username} został ustawiony jako admin.", "success") return redirect(url_for("list_users")) @app.route("/admin/demote_user/") @login_required @admin_required def demote_user(user_id): user = User.query.get_or_404(user_id) if user.id == current_user.id: flash("Nie możesz zdegradować samego siebie!", "danger") return redirect(url_for("list_users")) admin_count = User.query.filter_by(is_admin=True).count() if admin_count <= 1 and user.is_admin: flash( "Nie można zdegradować. Musi pozostać co najmniej jeden administrator.", "danger", ) return redirect(url_for("list_users")) user.is_admin = False db.session.commit() flash(f"Użytkownik {user.username} został zdegradowany.", "success") return redirect(url_for("list_users")) @app.route("/admin/crop_receipt", methods=["POST"]) @login_required @admin_required def crop_receipt_admin(): receipt_id = request.form.get("receipt_id") file = request.files.get("cropped_image") result = handle_crop_receipt(receipt_id, file) return jsonify(result) @app.route("/admin/recalculate_filesizes") @login_required @admin_required def recalculate_filesizes_all(): updated, unchanged, not_found = recalculate_filesizes() flash( f"Zaktualizowano: {updated}, bez zmian: {unchanged}, brak pliku: {not_found}", "success", ) return redirect(url_for("admin_receipts", id="all")) @app.route("/admin/mass_edit_categories", methods=["GET", "POST"]) @login_required @admin_required def admin_mass_edit_categories(): page = request.args.get("page", 1, type=int) per_page = request.args.get("per_page", 50, type=int) per_page = max(1, min(per_page, 200)) # ogranicz do sensownych wartości lists_query = ShoppingList.query.options( joinedload(ShoppingList.categories), joinedload(ShoppingList.items), joinedload(ShoppingList.owner), ).order_by(ShoppingList.created_at.desc()) pagination = lists_query.paginate(page=page, per_page=per_page, error_out=False) lists = pagination.items categories = Category.query.order_by(Category.name.asc()).all() for l in lists: l.total_count = len(l.items) l.owner_name = l.owner.username if l.owner else "?" l.category_count = len(l.categories) if request.method == "POST": for l in lists: selected_ids = request.form.getlist(f"categories_{l.id}") l.categories.clear() if selected_ids: cats = Category.query.filter(Category.id.in_(selected_ids)).all() l.categories.extend(cats) db.session.commit() flash("Zaktualizowano kategorie dla wybranych list", "success") return redirect( url_for("admin_mass_edit_categories", page=page, per_page=per_page) ) query_string = urlencode({k: v for k, v in request.args.items() if k != "page"}) return render_template( "admin/mass_edit_categories.html", lists=lists, categories=categories, page=page, per_page=per_page, total_pages=pagination.pages, total_items=pagination.total, query_string=query_string, ) @app.route("/admin/list_items/") @login_required @admin_required def admin_list_items_json(list_id): l = db.session.get(ShoppingList, list_id) if not l: return jsonify({"error": "Lista nie istnieje"}), 404 items = [ { "name": item.name, "quantity": item.quantity, "purchased": item.purchased, "not_purchased": item.not_purchased, } for item in l.items ] purchased_count = sum(1 for item in l.items if item.purchased) total_expense = sum(exp.amount for exp in l.expenses) return jsonify( { "title": l.title, "items": items, "total_count": len(l.items), "purchased_count": purchased_count, "total_expense": round(total_expense, 2), } ) @app.route("/healthcheck") def healthcheck(): header_token = request.headers.get("X-Internal-Check") correct_token = app.config.get("HEALTHCHECK_TOKEN") if header_token != correct_token: abort(404) return "OK", 200 @app.route("/robots.txt") def robots_txt(): content = ( "User-agent: *\nDisallow: /" if app.config.get("DISABLE_ROBOTS") else "User-agent: *\nAllow: /" ) return content, 200, {"Content-Type": "text/plain"} # ========================================================================================= # SOCKET.IO # ========================================================================================= @socketio.on("delete_item") def handle_delete_item(data): # item = Item.query.get(data["item_id"]) item = db.session.get(Item, data["item_id"]) if item: list_id = item.list_id db.session.delete(item) db.session.commit() emit("item_deleted", {"item_id": item.id}, to=str(item.list_id)) purchased_count, total_count, percent = get_progress(list_id) emit( "progress_updated", { "purchased_count": purchased_count, "total_count": total_count, "percent": percent, }, to=str(list_id), ) @socketio.on("edit_item") def handle_edit_item(data): item = db.session.get(Item, data["item_id"]) new_name = data["new_name"] new_quantity = data.get("new_quantity", item.quantity) if item and new_name.strip(): item.name = new_name.strip() try: new_quantity = int(new_quantity) if new_quantity < 1: new_quantity = 1 except: new_quantity = 1 item.quantity = new_quantity db.session.commit() emit( "item_edited", {"item_id": item.id, "new_name": item.name, "new_quantity": item.quantity}, to=str(item.list_id), ) @socketio.on("join_list") def handle_join(data): global active_users room = str(data["room"]) username = data.get("username", "Gość") join_room(room) if room not in active_users: active_users[room] = set() active_users[room].add(username) shopping_list = db.session.get(ShoppingList, int(data["room"])) list_title = shopping_list.title if shopping_list else "Twoja lista" emit("user_joined", {"username": username}, to=room) emit("user_list", {"users": list(active_users[room])}, to=room) emit("joined_confirmation", {"room": room, "list_title": list_title}) @socketio.on("disconnect") def handle_disconnect(sid): global active_users username = current_user.username if current_user.is_authenticated else "Gość" for room, users in active_users.items(): if username in users: users.remove(username) emit("user_left", {"username": username}, to=room) emit("user_list", {"users": list(users)}, to=room) @socketio.on("add_item") def handle_add_item(data): list_id = data["list_id"] name = data["name"].strip() quantity = data.get("quantity", 1) list_obj = db.session.get(ShoppingList, list_id) if not list_obj: return try: quantity = int(quantity) if quantity < 1: quantity = 1 except: quantity = 1 existing_item = Item.query.filter( Item.list_id == list_id, func.lower(Item.name) == name.lower(), Item.not_purchased == False, ).first() if existing_item: existing_item.quantity += quantity db.session.commit() emit( "item_edited", { "item_id": existing_item.id, "new_name": existing_item.name, "new_quantity": existing_item.quantity, }, to=str(list_id), ) else: max_position = ( db.session.query(func.max(Item.position)) .filter_by(list_id=list_id) .scalar() ) if max_position is None: max_position = 0 user_id = current_user.id if current_user.is_authenticated else None user_name = current_user.username if current_user.is_authenticated else "Gość" new_item = Item( list_id=list_id, name=name, quantity=quantity, position=max_position + 1, added_by=user_id, ) db.session.add(new_item) if not SuggestedProduct.query.filter( func.lower(SuggestedProduct.name) == name.lower() ).first(): new_suggestion = SuggestedProduct(name=name) db.session.add(new_suggestion) db.session.commit() emit( "item_added", { "id": new_item.id, "name": new_item.name, "quantity": new_item.quantity, "added_by": user_name, "added_by_id": user_id, "owner_id": list_obj.owner_id, }, to=str(list_id), include_self=True, ) purchased_count, total_count, percent = get_progress(list_id) emit( "progress_updated", { "purchased_count": purchased_count, "total_count": total_count, "percent": percent, }, to=str(list_id), ) @socketio.on("check_item") def handle_check_item(data): item = db.session.get(Item, data["item_id"]) if item: item.purchased = True item.purchased_at = datetime.now(UTC) db.session.commit() purchased_count, total_count, percent = get_progress(item.list_id) emit("item_checked", {"item_id": item.id}, to=str(item.list_id)) emit( "progress_updated", { "purchased_count": purchased_count, "total_count": total_count, "percent": percent, }, to=str(item.list_id), ) @socketio.on("uncheck_item") def handle_uncheck_item(data): item = db.session.get(Item, data["item_id"]) if item: item.purchased = False item.purchased_at = None db.session.commit() purchased_count, total_count, percent = get_progress(item.list_id) emit("item_unchecked", {"item_id": item.id}, to=str(item.list_id)) emit( "progress_updated", { "purchased_count": purchased_count, "total_count": total_count, "percent": percent, }, to=str(item.list_id), ) @socketio.on("request_full_list") def handle_request_full_list(data): list_id = data["list_id"] shopping_list = db.session.get(ShoppingList, list_id) if not shopping_list: return owner_id = shopping_list.owner_id items = ( Item.query.options(joinedload(Item.added_by_user)) .filter_by(list_id=list_id) .order_by(Item.position.asc()) .all() ) items_data = [] for item in items: items_data.append( { "id": item.id, "name": item.name, "quantity": item.quantity, "purchased": item.purchased if not item.not_purchased else False, "not_purchased": item.not_purchased, "not_purchased_reason": item.not_purchased_reason, "note": item.note or "", "added_by": item.added_by_user.username if item.added_by_user else None, "added_by_id": item.added_by_user.id if item.added_by_user else None, "owner_id": owner_id, } ) emit("full_list", {"items": items_data}, to=request.sid) @socketio.on("update_note") def handle_update_note(data): item_id = data["item_id"] note = data["note"] item = Item.query.get(item_id) if item: item.note = note db.session.commit() emit("note_updated", {"item_id": item_id, "note": note}, to=str(item.list_id)) @socketio.on("add_expense") def handle_add_expense(data): list_id = data["list_id"] amount = data["amount"] receipt_filename = data.get("receipt_filename") if receipt_filename: existing = Expense.query.filter_by( list_id=list_id, receipt_filename=receipt_filename ).first() if existing: return new_expense = Expense( list_id=list_id, amount=amount, receipt_filename=receipt_filename ) db.session.add(new_expense) db.session.commit() total = ( db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar() or 0 ) emit("expense_added", {"amount": amount, "total": total}, to=str(list_id)) @socketio.on("mark_not_purchased") def handle_mark_not_purchased(data): item = db.session.get(Item, data["item_id"]) reason = data.get("reason", "") if item: item.not_purchased = True item.not_purchased_reason = reason db.session.commit() emit( "item_marked_not_purchased", {"item_id": item.id, "reason": reason}, to=str(item.list_id), ) @socketio.on("unmark_not_purchased") def handle_unmark_not_purchased(data): item = db.session.get(Item, data["item_id"]) if item: item.not_purchased = False item.purchased = False item.purchased_at = None item.not_purchased_reason = None db.session.commit() emit("item_unmarked_not_purchased", {"item_id": item.id}, to=str(item.list_id)) @app.cli.command("db_info") def create_db(): with app.app_context(): inspector = inspect(db.engine) actual_tables = inspector.get_table_names() table_count = len(actual_tables) record_total = 0 with db.engine.connect() as conn: for table in actual_tables: try: count = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar() record_total += count except Exception: pass print("\nStruktura bazy danych jest poprawna.") print(f"Silnik: {db.engine.name}") print(f"Liczba tabel: {table_count}") print(f"Łączna liczba rekordów: {record_total}") if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO) socketio.run(app, host="0.0.0.0", port=8000, debug=False)