import os import secrets import time import mimetypes import sys import platform import psutil import hashlib import re import traceback from pillow_heif import register_heif_opener from datetime import datetime, timedelta, UTC, timezone from urllib.parse import urlparse, urlunparse from flask import ( Flask, render_template, redirect, url_for, request, flash, Blueprint, send_from_directory, request, abort, session, jsonify, make_response, ) from markupsafe import Markup from flask_sqlalchemy import SQLAlchemy from flask_login import ( LoginManager, UserMixin, login_user, login_required, logout_user, current_user, ) from flask_compress import Compress from flask_socketio import SocketIO, emit, join_room from werkzeug.security import generate_password_hash, check_password_hash from config import Config from PIL import Image, ExifTags, ImageFilter, ImageOps from werkzeug.utils import secure_filename from werkzeug.middleware.proxy_fix import ProxyFix from sqlalchemy import func, extract, inspect, or_ from sqlalchemy.orm import joinedload from collections import defaultdict, deque from functools import wraps from flask_talisman import Talisman # OCR import pytesseract from collections import Counter from pytesseract import Output import logging app = Flask(__name__) app.config.from_object(Config) # Konfiguracja nagłówków bezpieczeństwa z .env csp_policy = None if app.config.get("ENABLE_CSP", True): csp_policy = { "default-src": "'self'", "script-src": "'self'", # wciąż bez inline JS "style-src": "'self' 'unsafe-inline'", # dopuszczamy style w HTML-u "img-src": "'self' data:", # pozwalamy na data:image (np. SVG) "connect-src": "'self'", # WebSockety "script-src": "'self' 'unsafe-inline'", } permissions_policy = {"browsing-topics": "()"} if app.config["ENABLE_PP"] else None talisman = Talisman( app, force_https=False, strict_transport_security=app.config.get("ENABLE_HSTS", True), frame_options="DENY" if app.config.get("ENABLE_XFO", True) else None, permissions_policy=permissions_policy, content_security_policy=csp_policy, x_content_type_options=app.config.get("ENABLE_XCTO", True), strict_transport_security_include_subdomains=False, ) register_heif_opener() # pillow_heif dla HEIC ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp", "heic"} SQLALCHEMY_ECHO = True SYSTEM_PASSWORD = app.config.get("SYSTEM_PASSWORD", "changeme") DEFAULT_ADMIN_USERNAME = app.config.get("DEFAULT_ADMIN_USERNAME", "admin") DEFAULT_ADMIN_PASSWORD = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123") UPLOAD_FOLDER = app.config.get("UPLOAD_FOLDER", "uploads") AUTHORIZED_COOKIE_VALUE = app.config.get("AUTHORIZED_COOKIE_VALUE", "80d31cdfe63539c9") AUTH_COOKIE_MAX_AGE = app.config.get("AUTH_COOKIE_MAX_AGE", 86400) HEALTHCHECK_TOKEN = app.config.get("HEALTHCHECK_TOKEN", "alamapsaikota1234") SESSION_TIMEOUT_MINUTES = int(app.config.get("SESSION_TIMEOUT_MINUTES", 10080)) app.config["COMPRESS_ALGORITHM"] = ["zstd", "br", "gzip", "deflate"] app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=SESSION_TIMEOUT_MINUTES) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1) DEBUG_MODE = app.config.get("DEBUG_MODE", False) os.makedirs(UPLOAD_FOLDER, exist_ok=True) failed_login_attempts = defaultdict(deque) MAX_ATTEMPTS = 10 TIME_WINDOW = 60 * 60 db = SQLAlchemy(app) socketio = SocketIO(app, async_mode="eventlet") login_manager = LoginManager(app) login_manager.login_view = "login" # flask-compress compress = Compress() compress.init_app(app) static_bp = Blueprint("static_bp", __name__) # dla live active_users = {} def utcnow(): return datetime.now(timezone.utc) app_start_time = utcnow() class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(150), unique=True, nullable=False) password_hash = db.Column(db.String(512), nullable=False) is_admin = db.Column(db.Boolean, default=False) class ShoppingList(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(150), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) owner_id = db.Column(db.Integer, db.ForeignKey("user.id")) owner = db.relationship("User", backref="lists", foreign_keys=[owner_id]) is_temporary = db.Column(db.Boolean, default=False) share_token = db.Column(db.String(64), unique=True, nullable=True) # expires_at = db.Column(db.DateTime, nullable=True) expires_at = db.Column(db.DateTime(timezone=True), nullable=True) owner = db.relationship("User", backref="lists", lazy=True) is_archived = db.Column(db.Boolean, default=False) is_public = db.Column(db.Boolean, default=True) class Item(db.Model): id = db.Column(db.Integer, primary_key=True) list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id")) name = db.Column(db.String(150), nullable=False) # added_at = db.Column(db.DateTime, default=datetime.utcnow) added_at = db.Column(db.DateTime, default=utcnow) added_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True) added_by_user = db.relationship( "User", backref="added_items", lazy=True, foreign_keys=[added_by] ) purchased = db.Column(db.Boolean, default=False) purchased_at = db.Column(db.DateTime, nullable=True) quantity = db.Column(db.Integer, default=1) note = db.Column(db.Text, nullable=True) not_purchased = db.Column(db.Boolean, default=False) not_purchased_reason = db.Column(db.Text, nullable=True) position = db.Column(db.Integer, default=0) class SuggestedProduct(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(150), unique=True, nullable=False) usage_count = db.Column(db.Integer, default=0) class Expense(db.Model): id = db.Column(db.Integer, primary_key=True) list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id")) amount = db.Column(db.Float, nullable=False) added_at = db.Column(db.DateTime, default=datetime.utcnow) receipt_filename = db.Column(db.String(255), nullable=True) list = db.relationship("ShoppingList", backref="expenses", lazy=True) class Receipt(db.Model): id = db.Column(db.Integer, primary_key=True) list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"), nullable=False) filename = db.Column(db.String(255), nullable=False) uploaded_at = db.Column(db.DateTime, default=datetime.utcnow) shopping_list = db.relationship("ShoppingList", backref="receipts", lazy=True) filesize = db.Column(db.Integer, nullable=True) file_hash = db.Column(db.String(64), nullable=True, unique=True) with app.app_context(): db.create_all() admin = User.query.filter_by(is_admin=True).first() username = app.config.get("DEFAULT_ADMIN_USERNAME", "admin") password = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123") password_hash = generate_password_hash(password) if admin: if admin.username != username or not check_password_hash( admin.password_hash, password ): admin.username = username admin.password_hash = password_hash db.session.commit() else: admin = User(username=username, password_hash=password_hash, is_admin=True) db.session.add(admin) db.session.commit() @static_bp.route("/static/js/") def serve_js(filename): response = send_from_directory("static/js", filename) #response.cache_control.no_cache = True #response.cache_control.no_store = True #response.cache_control.must_revalidate = True response.headers["Cache-Control"] = app.config["JS_CACHE_CONTROL"] response.headers.pop("Content-Disposition", None) response.headers.pop("Etag", None) return response @static_bp.route("/static/css/") def serve_css(filename): response = send_from_directory("static/css", filename) response.headers["Cache-Control"] = app.config["CSS_CACHE_CONTROL"] response.headers.pop("Content-Disposition", None) response.headers.pop("Etag", None) return response @static_bp.route("/static/lib/js/") def serve_js_lib(filename): response = send_from_directory("static/lib/js", filename) response.headers["Cache-Control"] = app.config["LIB_JS_CACHE_CONTROL"] response.headers.pop("Content-Disposition", None) response.headers.pop("Etag", None) return response # CSS z cache na tydzień @static_bp.route("/static/lib/css/") def serve_css_lib(filename): response = send_from_directory("static/lib/css", filename) response.headers["Cache-Control"] = app.config["LIB_CSS_CACHE_CONTROL"] response.headers.pop("Content-Disposition", None) response.headers.pop("Etag", None) return response app.register_blueprint(static_bp) def allowed_file(filename): return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS def get_list_details(list_id): shopping_list = ShoppingList.query.get_or_404(list_id) items = Item.query.filter_by(list_id=list_id).order_by(Item.position.asc()).all() expenses = Expense.query.filter_by(list_id=list_id).all() total_expense = sum(e.amount for e in expenses) receipts = Receipt.query.filter_by(list_id=list_id).all() receipt_files = [r.filename for r in receipts] return shopping_list, items, receipt_files, expenses, total_expense def generate_share_token(length=8): return secrets.token_hex(length // 2) def check_list_public(shopping_list): if not shopping_list.is_public: flash("Ta lista nie jest publicznie dostępna", "danger") return False return True def enrich_list_data(l): items = Item.query.filter_by(list_id=l.id).all() l.total_count = len(items) l.purchased_count = len([i for i in items if i.purchased]) expenses = Expense.query.filter_by(list_id=l.id).all() l.total_expense = sum(e.amount for e in expenses) return l def save_resized_image(file, path): try: # Otwórz i sprawdź poprawność pliku image = Image.open(file) image.verify() file.seek(0) image = Image.open(file) except Exception: raise ValueError("Nieprawidłowy plik graficzny") try: # Automatyczna rotacja według EXIF (np. zdjęcia z telefonu) image = ImageOps.exif_transpose(image) except Exception: pass # ignorujemy, jeśli EXIF jest uszkodzony lub brak try: image.thumbnail((2000, 2000)) image = image.convert("RGB") image.info.clear() new_path = path.rsplit(".", 1)[0] + ".webp" # image.save(new_path, format="WEBP", quality=100, method=0) image.save(new_path, format="WEBP", lossless=True, method=6) except Exception as e: raise ValueError(f"Błąd podczas przetwarzania obrazu: {e}") def redirect_with_flash( message: str, category: str = "info", endpoint: str = "main_page" ): flash(message, category) return redirect(url_for(endpoint)) def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated or not current_user.is_admin: return redirect_with_flash("Brak uprawnień do tej sekcji.", "danger") return f(*args, **kwargs) return decorated_function def get_progress(list_id): items = Item.query.filter_by(list_id=list_id).order_by(Item.position.asc()).all() total_count = len(items) purchased_count = len([i for i in items if i.purchased]) percent = (purchased_count / total_count * 100) if total_count > 0 else 0 return purchased_count, total_count, percent def delete_receipts_for_list(list_id): receipt_pattern = f"list_{list_id}_" upload_folder = app.config["UPLOAD_FOLDER"] for filename in os.listdir(upload_folder): if filename.startswith(receipt_pattern): try: os.remove(os.path.join(upload_folder, filename)) except Exception as e: print(f"Nie udało się usunąć pliku {filename}: {e}") def _receipt_error(message): if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest": return jsonify({"success": False, "error": message}), 400 flash(message, "danger") return redirect(request.referrer or url_for("main_page")) def rotate_receipt_by_id(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) if not os.path.exists(old_path): raise FileNotFoundError("Plik nie istnieje") image = Image.open(old_path) rotated = image.rotate(-90, expand=True) new_filename = generate_new_receipt_filename(receipt.list_id) new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename) rotated.save(new_path, format="WEBP", quality=100) os.remove(old_path) receipt.filename = new_filename db.session.commit() return receipt def delete_receipt_by_id(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) if os.path.exists(filepath): os.remove(filepath) db.session.delete(receipt) db.session.commit() return receipt def generate_new_receipt_filename(list_id): timestamp = datetime.now().strftime("%Y%m%d_%H%M") random_part = secrets.token_hex(3) return f"list_{list_id}_{timestamp}_{random_part}.webp" ############# OCR ########################### def preprocess_image_for_tesseract(image): image = ImageOps.autocontrast(image) image = image.point(lambda x: 0 if x < 150 else 255) # mocniejsza binarizacja image = image.resize( (image.width * 2, image.height * 2), Image.BICUBIC ) # większe powiększenie return image def extract_total_tesseract(image): text = pytesseract.image_to_string(image, lang="pol", config="--psm 4") lines = text.splitlines() candidates = [] blacklist_keywords = re.compile(r"\b(ptu|vat|podatek|stawka)\b", re.IGNORECASE) priority_keywords = re.compile( r""" \b( razem\s*do\s*zap[łl][aąo0]ty | do\s*zap[łl][aąo0]ty | suma | kwota | warto[śćs] | płatno[śćs] | total | amount )\b """, re.IGNORECASE | re.VERBOSE, ) for line in lines: if not line.strip(): continue if blacklist_keywords.search(line): continue is_priority = priority_keywords.search(line) matches = re.findall(r"\d{1,4}[.,]\d{2}", line) for match in matches: try: val = float(match.replace(",", ".")) if 0.1 <= val <= 100000: candidates.append((val, line, is_priority is not None)) except: continue # Tylko w liniach priorytetowych: sprawdzamy spaced fallback if is_priority: spaced = re.findall(r"\d{1,4}\s\d{2}", line) for match in spaced: try: val = float(match.replace(" ", ".")) if 0.1 <= val <= 100000: candidates.append((val, line, True)) except: continue # Preferujemy linie priorytetowe preferred = [(val, line) for val, line, is_pref in candidates if is_pref] if preferred: best_val = max(preferred, key=lambda x: x[0])[0] if best_val < 99999: return round(best_val, 2), lines if candidates: best_val = max(candidates, key=lambda x: x[0])[0] if best_val < 99999: return round(best_val, 2), lines # Fallback: największy font + bold data = pytesseract.image_to_data( image, lang="pol", config="--psm 4", output_type=Output.DICT ) font_candidates = [] for i in range(len(data["text"])): word = data["text"][i].strip() if not word or not re.match(r"^\d{1,5}[.,\s]\d{2}$", word): continue try: val = float(word.replace(",", ".").replace(" ", ".")) height = data["height"][i] conf = int(data.get("conf", ["0"] * len(data["text"]))[i]) if 0.1 <= val <= 100000: font_candidates.append((val, height, conf)) except: continue if font_candidates: # Preferuj najwyższy font z sensownym confidence best = max(font_candidates, key=lambda x: (x[1], x[2])) return round(best[0], 2), lines return 0.0, lines ############# END OCR ####################### # zabezpieczenie logowani do systemu - błędne hasła def is_ip_blocked(ip): now = time.time() attempts = failed_login_attempts[ip] while attempts and now - attempts[0] > TIME_WINDOW: attempts.popleft() return len(attempts) >= MAX_ATTEMPTS def register_failed_attempt(ip): now = time.time() attempts = failed_login_attempts[ip] while attempts and now - attempts[0] > TIME_WINDOW: attempts.popleft() attempts.append(now) def reset_failed_attempts(ip): failed_login_attempts[ip].clear() def attempts_remaining(ip): attempts = failed_login_attempts[ip] return max(0, MAX_ATTEMPTS - len(attempts)) #################################################### def get_client_ip(): # Obsługuje: X-Forwarded-For, X-Real-IP, fallback na remote_addr for header in ["X-Forwarded-For", "X-Real-IP"]: if header in request.headers: # Pierwszy IP w X-Forwarded-For jest najczęściej klientem ip = request.headers[header].split(",")[0].strip() if ip: return ip return request.remote_addr @login_manager.user_loader def load_user(user_id): # return User.query.get(int(user_id)) return db.session.get(User, int(user_id)) @app.context_processor def inject_time(): return dict(time=time) @app.context_processor def inject_has_authorized_cookie(): return {"has_authorized_cookie": "authorized" in request.cookies} @app.context_processor def inject_is_blocked(): ip = request.access_route[0] return {"is_blocked": is_ip_blocked(ip)} @app.before_request def require_system_password(): endpoint = request.endpoint # Wyjątki: lib js/css zawsze przepuszczamy if endpoint in ("static_bp.serve_js_lib", "static_bp.serve_css_lib"): return ip = request.access_route[0] if is_ip_blocked(ip): abort(403) if endpoint is None: return if endpoint in ("system_auth", "healthcheck"): return if ( "authorized" not in request.cookies and not endpoint.startswith("login") and endpoint != "favicon" ): # Dla serve_js przepuszczamy tylko toasts.js if endpoint == "static_bp.serve_js": requested_file = request.view_args.get("filename", "") if requested_file == "toasts.js": return if requested_file.endswith(".js"): return redirect(url_for("system_auth", next=request.url)) return # Blokujemy pozostałe static_bp if endpoint.startswith("static_bp."): return if request.path == "/": return redirect(url_for("system_auth")) parsed = urlparse(request.url) fixed_url = urlunparse(parsed._replace(netloc=request.host)) return redirect(url_for("system_auth", next=fixed_url)) @app.before_request def start_timer(): request._start_time = time.time() @app.after_request def log_request(response): if request.path == "/healthcheck": return response ip = get_client_ip() method = request.method path = request.path status = response.status_code length = response.content_length or "-" start = getattr(request, "_start_time", None) duration = round((time.time() - start) * 1000, 2) if start else "-" agent = request.headers.get("User-Agent", "-") log_msg = f"{ip} - \"{method} {path}\" {status} {length} {duration}ms \"{agent}\"" app.logger.info(log_msg) return response @app.template_filter("filemtime") def file_mtime_filter(path): try: t = os.path.getmtime(path) return datetime.fromtimestamp(t) except Exception: # return datetime.utcnow() return datetime.now(timezone.utc) @app.template_filter("filesizeformat") def filesizeformat_filter(path): try: size = os.path.getsize(path) for unit in ["B", "KB", "MB", "GB"]: if size < 1024.0: return f"{size:.1f} {unit}" size /= 1024.0 return f"{size:.1f} TB" except Exception: return "N/A" @app.errorhandler(404) def page_not_found(e): return ( render_template( "errors.html", code=404, title="Strona nie znaleziona", message="Ups! Podana strona nie istnieje lub została przeniesiona.", ), 404, ) @app.errorhandler(403) def forbidden(e): return ( render_template( "errors.html", code=403, title="Brak dostępu", message=( e.description if e.description else "Nie masz uprawnień do wyświetlenia tej strony." ), ), 403, ) @app.route("/favicon.ico") def favicon_ico(): return redirect(url_for("static", filename="favicon.svg")) @app.route("/favicon.svg") def favicon(): svg = """ 🛒 """ return svg, 200, {"Content-Type": "image/svg+xml"} @app.route("/") def main_page(): now = datetime.now(timezone.utc) month_str = request.args.get("month") start = end = None if month_str: try: year, month = map(int, month_str.split("-")) start = datetime(year, month, 1, tzinfo=timezone.utc) end = (start + timedelta(days=31)).replace(day=1) except: start = end = None def date_filter(query): if start and end: query = query.filter( ShoppingList.created_at >= start, ShoppingList.created_at < end ) return query if current_user.is_authenticated: user_lists = ( date_filter( ShoppingList.query.filter_by( owner_id=current_user.id, is_archived=False ).filter( (ShoppingList.expires_at == None) | (ShoppingList.expires_at > now) ) ) .order_by(ShoppingList.created_at.desc()) .all() ) archived_lists = ( ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True) .order_by(ShoppingList.created_at.desc()) .all() ) public_lists = ( date_filter( ShoppingList.query.filter( ShoppingList.is_public == True, ShoppingList.owner_id != current_user.id, ( (ShoppingList.expires_at == None) | (ShoppingList.expires_at > now) ), ShoppingList.is_archived == False, ) ) .order_by(ShoppingList.created_at.desc()) .all() ) else: user_lists = [] archived_lists = [] public_lists = ( date_filter( ShoppingList.query.filter( ShoppingList.is_public == True, ( (ShoppingList.expires_at == None) | (ShoppingList.expires_at > now) ), ShoppingList.is_archived == False, ) ) .order_by(ShoppingList.created_at.desc()) .all() ) for l in user_lists + public_lists + archived_lists: enrich_list_data(l) return render_template( "main.html", user_lists=user_lists, public_lists=public_lists, archived_lists=archived_lists, now=now, timedelta=timedelta, ) @app.route("/system-auth", methods=["GET", "POST"]) def system_auth(): if ( current_user.is_authenticated or request.cookies.get("authorized") == AUTHORIZED_COOKIE_VALUE ): flash("Jesteś już zalogowany lub autoryzowany.", "info") return redirect(url_for("main_page")) ip = request.access_route[0] next_page = request.args.get("next") or url_for("main_page") if is_ip_blocked(ip): flash( "Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.", "danger", ) return render_template("system_auth.html"), 403 if request.method == "POST": if request.form["password"] == SYSTEM_PASSWORD: reset_failed_attempts(ip) resp = redirect(next_page) max_age = app.config.get("AUTH_COOKIE_MAX_AGE", 86400) resp.set_cookie("authorized", AUTHORIZED_COOKIE_VALUE, max_age=max_age) return resp else: register_failed_attempt(ip) if is_ip_blocked(ip): flash( "Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.", "danger", ) return render_template("system_auth.html"), 403 remaining = attempts_remaining(ip) flash(f"Nieprawidłowe hasło. Pozostało {remaining} prób.", "warning") return render_template("system_auth.html") @app.route("/toggle_archive_list/") @login_required def toggle_archive_list(list_id): # l = ShoppingList.query.get_or_404(list_id) l = db.session.get(ShoppingList, list_id) if l is None: abort(404) if l.owner_id != current_user.id: return redirect_with_flash("Nie masz uprawnień do tej listy", "danger") archive = request.args.get("archive", "true").lower() == "true" if archive: l.is_archived = True flash(f"Lista „{l.title}” została zarchiwizowana.", "success") else: l.is_archived = False flash(f"Lista „{l.title}” została przywrócona.", "success") db.session.commit() return redirect(url_for("main_page")) @app.route("/edit_my_list/", methods=["GET", "POST"]) @login_required def edit_my_list(list_id): receipts = ( Receipt.query.filter_by(list_id=list_id) .order_by(Receipt.uploaded_at.desc()) .all() ) l = db.session.get(ShoppingList, list_id) if l is None: abort(404) if l.owner_id != current_user.id: abort(403, description="Nie jesteś właścicielem tej listy.") if request.method == "POST": new_title = request.form.get("title", "").strip() is_public = "is_public" in request.form is_temporary = "is_temporary" in request.form is_archived = "is_archived" in request.form expires_date = request.form.get("expires_date") expires_time = request.form.get("expires_time") # Walidacja tytułu if not new_title: flash("Podaj poprawny tytuł", "danger") return redirect(url_for("edit_my_list", list_id=list_id)) l.title = new_title l.is_public = is_public l.is_temporary = is_temporary l.is_archived = is_archived # Obsługa daty wygaśnięcia if expires_date and expires_time: try: combined = f"{expires_date} {expires_time}" expires_dt = datetime.strptime(combined, "%Y-%m-%d %H:%M") l.expires_at = expires_dt.replace(tzinfo=timezone.utc) except ValueError: flash("Błędna data lub godzina wygasania", "danger") return redirect(url_for("edit_my_list", list_id=list_id)) else: l.expires_at = None db.session.commit() flash("Zaktualizowano dane listy", "success") return redirect(url_for("main_page")) return render_template("edit_my_list.html", list=l, receipts=receipts) @app.route("/delete_user_list/", methods=["POST"]) @login_required def delete_user_list(list_id): l = db.session.get(ShoppingList, list_id) if l is None or l.owner_id != current_user.id: abort(403, description="Nie jesteś właścicielem tej listy.") l = db.session.get(ShoppingList, list_id) if l is None or l.owner_id != current_user.id: abort(403) delete_receipts_for_list(list_id) Item.query.filter_by(list_id=list_id).delete() Expense.query.filter_by(list_id=list_id).delete() db.session.delete(l) db.session.commit() flash("Lista została usunięta", "success") return redirect(url_for("main_page")) @app.route("/toggle_visibility/", methods=["GET", "POST"]) @login_required def toggle_visibility(list_id): # l = ShoppingList.query.get_or_404(list_id) l = db.session.get(ShoppingList, list_id) if l is None: abort(404) if l.owner_id != current_user.id: if request.is_json or request.method == "POST": return {"error": "Unauthorized"}, 403 flash("Nie masz uprawnień do tej listy", "danger") return redirect(url_for("main_page")) l.is_public = not l.is_public db.session.commit() share_url = f"{request.url_root}share/{l.share_token}" if request.is_json or request.method == "POST": return {"is_public": l.is_public, "share_url": share_url} if l.is_public: flash("Lista została udostępniona publicznie", "success") else: flash("Lista została ukryta przed gośćmi", "info") return redirect(url_for("main_page")) @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": username_input = request.form["username"].lower() user = User.query.filter(func.lower(User.username) == username_input).first() if user and check_password_hash(user.password_hash, request.form["password"]): session.permanent = True login_user(user) flash("Zalogowano pomyślnie", "success") return redirect(url_for("main_page")) flash("Nieprawidłowy login lub hasło", "danger") return render_template("login.html") @app.route("/logout") @login_required def logout(): logout_user() flash("Wylogowano pomyślnie", "success") return redirect(url_for("main_page")) @app.route("/create", methods=["POST"]) @login_required def create_list(): title = request.form.get("title") is_temporary = request.form.get("temporary") == "1" token = generate_share_token(8) # expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None expires_at = ( datetime.now(timezone.utc) + timedelta(days=7) if is_temporary else None ) new_list = ShoppingList( title=title, owner_id=current_user.id, is_temporary=is_temporary, share_token=token, expires_at=expires_at, ) db.session.add(new_list) db.session.commit() flash("Utworzono nową listę", "success") return redirect(url_for("view_list", list_id=new_list.id)) @app.route("/list/") @login_required def view_list(list_id): shopping_list, items, receipt_files, expenses, total_expense = get_list_details( list_id ) total_count = len(items) purchased_count = len([i for i in items if i.purchased]) percent = (purchased_count / total_count * 100) if total_count > 0 else 0 return render_template( "list.html", list=shopping_list, items=items, receipt_files=receipt_files, total_count=total_count, purchased_count=purchased_count, percent=percent, expenses=expenses, total_expense=total_expense, is_share=False, ) @app.route("/user_expenses") @login_required def user_expenses(): start_date_str = request.args.get("start_date") end_date_str = request.args.get("end_date") show_all = request.args.get("show_all", "false").lower() == "true" start = None end = None expenses_query = Expense.query.join( ShoppingList, Expense.list_id == ShoppingList.id ).options(joinedload(Expense.list)) # Jeśli show_all to False, filtruj tylko po bieżącym użytkowniku if not show_all: expenses_query = expenses_query.filter(ShoppingList.owner_id == current_user.id) else: expenses_query = expenses_query.filter( or_( ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True ) ) if start_date_str and end_date_str: try: start = datetime.strptime(start_date_str, "%Y-%m-%d") end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1) expenses_query = expenses_query.filter( Expense.added_at >= start, Expense.added_at < end ) except ValueError: flash("Błędny zakres dat", "danger") expenses = expenses_query.order_by(Expense.added_at.desc()).all() list_ids = {e.list_id for e in expenses} lists = ( ShoppingList.query.filter(ShoppingList.id.in_(list_ids)) .order_by(ShoppingList.created_at.desc()) .all() ) expense_table = [ { "title": e.list.title if e.list else "Nieznana", "amount": e.amount, "added_at": e.added_at, } for e in expenses ] lists_data = [ { "id": l.id, "title": l.title, "created_at": l.created_at, "total_expense": sum( e.amount for e in l.expenses if (not start or not end) or (e.added_at >= start and e.added_at < end) ), "owner_username": l.owner.username if l.owner else "?", } for l in lists ] return render_template( "user_expenses.html", expense_table=expense_table, lists_data=lists_data, show_all=show_all, ) @app.route("/user_expenses_data") @login_required def user_expenses_data(): range_type = request.args.get("range", "monthly") start_date = request.args.get("start_date") end_date = request.args.get("end_date") show_all = request.args.get("show_all", "false").lower() == "true" query = Expense.query.join(ShoppingList, Expense.list_id == ShoppingList.id) if show_all: query = query.filter( or_( ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True ) ) else: query = query.filter(ShoppingList.owner_id == current_user.id) if start_date and end_date: try: start = datetime.strptime(start_date, "%Y-%m-%d") end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1) query = query.filter(Expense.added_at >= start, Expense.added_at < end) except ValueError: return jsonify({"error": "Błędne daty"}), 400 expenses = query.all() grouped = defaultdict(float) for e in expenses: # ts = e.added_at or datetime.utcnow() ts = e.added_at or datetime.now(timezone.utc) if range_type == "monthly": key = ts.strftime("%Y-%m") elif range_type == "quarterly": key = f"{ts.year}-Q{((ts.month - 1) // 3) + 1}" elif range_type == "halfyearly": key = f"{ts.year}-H{1 if ts.month <= 6 else 2}" elif range_type == "yearly": key = str(ts.year) else: key = ts.strftime("%Y-%m-%d") grouped[key] += e.amount labels = sorted(grouped) data = [round(grouped[label], 2) for label in labels] return jsonify({"labels": labels, "expenses": data}) @app.route("/share/") @app.route("/guest-list/") def shared_list(token=None, list_id=None): if token: shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404() if not check_list_public(shopping_list): return redirect(url_for("main_page")) list_id = shopping_list.id shopping_list, items, receipt_files, expenses, total_expense = get_list_details( list_id ) return render_template( "list_share.html", list=shopping_list, items=items, receipt_files=receipt_files, expenses=expenses, total_expense=total_expense, is_share=True, ) @app.route("/copy/") @login_required def copy_list(list_id): original = ShoppingList.query.get_or_404(list_id) token = generate_share_token(8) new_list = ShoppingList( title=original.title + " (Kopia)", owner_id=current_user.id, share_token=token ) db.session.add(new_list) db.session.commit() original_items = Item.query.filter_by(list_id=original.id).all() for item in original_items: copy_item = Item(list_id=new_list.id, name=item.name) db.session.add(copy_item) db.session.commit() flash("Skopiowano listę", "success") return redirect(url_for("view_list", list_id=new_list.id)) @app.route("/suggest_products") def suggest_products(): query = request.args.get("q", "") suggestions = [] if query: suggestions = ( SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f"%{query}%")) .limit(5) .all() ) return {"suggestions": [s.name for s in suggestions]} @app.route("/all_products") def all_products(): query = request.args.get("q", "") top_products_query = SuggestedProduct.query if query: top_products_query = top_products_query.filter( SuggestedProduct.name.ilike(f"%{query}%") ) top_products = ( top_products_query.order_by( SuggestedProduct.name.asc(), # musi być pierwsze SuggestedProduct.usage_count.desc(), ) .distinct(SuggestedProduct.name) .limit(20) .all() ) top_names = [s.name for s in top_products] rest_query = SuggestedProduct.query if query: rest_query = rest_query.filter(SuggestedProduct.name.ilike(f"%{query}%")) if top_names: rest_query = rest_query.filter(~SuggestedProduct.name.in_(top_names)) rest_products = rest_query.order_by(SuggestedProduct.name.asc()).limit(200).all() all_names = top_names + [s.name for s in rest_products] seen = set() unique_names = [] for name in all_names: name_lower = name.strip().lower() if name_lower not in seen: unique_names.append(name) seen.add(name_lower) return {"allproducts": unique_names} @app.route("/upload_receipt/", methods=["POST"]) @login_required def upload_receipt(list_id): l = db.session.get(ShoppingList, list_id) # if l is None or l.owner_id != current_user.id: # return _receipt_error("Nie masz uprawnień do tej listy.") if "receipt" not in request.files: return _receipt_error("Brak pliku") file = request.files["receipt"] if file.filename == "": return _receipt_error("Nie wybrano pliku") if file and allowed_file(file.filename): file_bytes = file.read() file.seek(0) file_hash = hashlib.sha256(file_bytes).hexdigest() existing = Receipt.query.filter_by(file_hash=file_hash).first() if existing: return _receipt_error("Taki plik już istnieje") now = datetime.now(timezone.utc) timestamp = now.strftime("%Y%m%d_%H%M") random_part = secrets.token_hex(3) webp_filename = f"list_{list_id}_{timestamp}_{random_part}.webp" file_path = os.path.join(app.config["UPLOAD_FOLDER"], webp_filename) try: save_resized_image(file, file_path) except ValueError as e: return _receipt_error(str(e)) filesize = os.path.getsize(file_path) if os.path.exists(file_path) else None uploaded_at = datetime.now(timezone.utc) new_receipt = Receipt( list_id=list_id, filename=webp_filename, filesize=filesize, uploaded_at=uploaded_at, file_hash=file_hash, ) db.session.add(new_receipt) db.session.commit() if ( request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest" ): url = url_for("uploaded_file", filename=webp_filename) socketio.emit("receipt_added", {"url": url}, to=str(list_id)) return jsonify({"success": True, "url": url}) flash("Wgrano paragon", "success") return redirect(request.referrer or url_for("main_page")) return _receipt_error("Niedozwolony format pliku") @app.route("/uploads/") def uploaded_file(filename): response = send_from_directory(app.config["UPLOAD_FOLDER"], filename) response.headers["Cache-Control"] = app.config["UPLOADS_CACHE_CONTROL"] response.headers.pop("Pragma", None) response.headers.pop("Content-Disposition", None) mime, _ = mimetypes.guess_type(filename) if mime: response.headers["Content-Type"] = mime return response @app.route("/reorder_items", methods=["POST"]) @login_required def reorder_items(): data = request.get_json() list_id = data.get("list_id") order = data.get("order") for index, item_id in enumerate(order): item = db.session.get(Item, item_id) if item and item.list_id == list_id: item.position = index db.session.commit() socketio.emit( "items_reordered", {"list_id": list_id, "order": order}, to=str(list_id) ) return jsonify(success=True) @app.route("/rotate_receipt/") @login_required def rotate_receipt_user(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) list_obj = ShoppingList.query.get_or_404(receipt.list_id) if not (current_user.is_admin or current_user.id == list_obj.owner_id): flash("Brak uprawnień do tej operacji", "danger") return redirect(url_for("main_page")) try: rotate_receipt_by_id(receipt_id) flash("Obrócono paragon", "success") except FileNotFoundError: flash("Plik nie istnieje", "danger") except Exception as e: flash(f"Błąd przy obracaniu: {str(e)}", "danger") return redirect(request.referrer or url_for("main_page")) @app.route("/delete_receipt/") @login_required def delete_receipt_user(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) list_obj = ShoppingList.query.get_or_404(receipt.list_id) if not (current_user.is_admin or current_user.id == list_obj.owner_id): flash("Brak uprawnień do tej operacji", "danger") return redirect(url_for("main_page")) try: delete_receipt_by_id(receipt_id) flash("Paragon usunięty", "success") except Exception as e: flash(f"Błąd przy usuwaniu pliku: {str(e)}", "danger") return redirect(request.referrer or url_for("main_page")) # OCR @app.route("/lists//analyze", methods=["POST"]) @login_required def analyze_receipts_for_list(list_id): receipt_objs = Receipt.query.filter_by(list_id=list_id).all() existing_expenses = { e.receipt_filename for e in Expense.query.filter_by(list_id=list_id).all() if e.receipt_filename } results = [] total = 0.0 for receipt in receipt_objs: filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) if not os.path.exists(filepath): continue try: raw_image = Image.open(filepath).convert("RGB") image = preprocess_image_for_tesseract(raw_image) value, lines = extract_total_tesseract(image) except Exception as e: print(f"OCR error for {receipt.filename}:\n{traceback.format_exc()}") value = 0.0 lines = [] already_added = receipt.filename in existing_expenses results.append( { "id": receipt.id, "filename": receipt.filename, "amount": round(value, 2), "debug_text": lines, "already_added": already_added, } ) if not already_added: total += value return jsonify({"results": results, "total": round(total, 2)}) @app.route("/admin") @login_required @admin_required def admin_panel(): now = datetime.now(timezone.utc) user_count = User.query.count() list_count = ShoppingList.query.count() item_count = Item.query.count() all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all() all_files = os.listdir(app.config["UPLOAD_FOLDER"]) enriched_lists = [] for l in all_lists: enrich_list_data(l) items = Item.query.filter_by(list_id=l.id).all() total_count = l.total_count purchased_count = l.purchased_count percent = (purchased_count / total_count * 100) if total_count > 0 else 0 comments_count = len([i for i in items if i.note and i.note.strip() != ""]) receipt_pattern = f"list_{l.id}" receipt_files = [f for f in all_files if receipt_pattern in f] # obliczenie czy wygasła if l.is_temporary and l.expires_at: expires_at = l.expires_at if expires_at.tzinfo is None: expires_at = expires_at.replace(tzinfo=timezone.utc) is_expired = expires_at < now else: is_expired = False enriched_lists.append( { "list": l, "total_count": total_count, "purchased_count": purchased_count, "percent": round(percent), "comments_count": comments_count, "receipts_count": len(receipt_files), "total_expense": l.total_expense, "expired": is_expired, } ) top_products = ( db.session.query(Item.name, func.count(Item.id).label("count")) .filter(Item.purchased.is_(True)) .group_by(Item.name) .order_by(func.count(Item.id).desc()) .limit(5) .all() ) purchased_items_count = Item.query.filter_by(purchased=True).count() total_expense_sum = db.session.query(func.sum(Expense.amount)).scalar() or 0 current_time = datetime.now(timezone.utc) current_year = current_time.year current_month = current_time.month year_expense_sum = ( db.session.query(func.sum(Expense.amount)) .filter(extract("year", Expense.added_at) == current_year) .scalar() or 0 ) month_expense_sum = ( db.session.query(func.sum(Expense.amount)) .filter(extract("year", Expense.added_at) == current_year) .filter(extract("month", Expense.added_at) == current_month) .scalar() or 0 ) process = psutil.Process(os.getpid()) app_mem = process.memory_info().rss // (1024 * 1024) # MB # Engine info db_engine = db.engine db_info = { "engine": db_engine.name, "version": getattr(db_engine.dialect, "server_version_info", None), "url": str(db_engine.url).split("?")[0], } # Tabele inspector = inspect(db_engine) table_count = len(inspector.get_table_names()) # Rekordy (szybkie zliczenie) record_total = ( db.session.query(func.count(User.id)).scalar() + db.session.query(func.count(ShoppingList.id)).scalar() + db.session.query(func.count(Item.id)).scalar() + db.session.query(func.count(Receipt.id)).scalar() + db.session.query(func.count(Expense.id)).scalar() ) # Uptime uptime_minutes = int( (datetime.now(timezone.utc) - app_start_time).total_seconds() // 60 ) return render_template( "admin/admin_panel.html", user_count=user_count, list_count=list_count, item_count=item_count, purchased_items_count=purchased_items_count, enriched_lists=enriched_lists, top_products=top_products, total_expense_sum=total_expense_sum, year_expense_sum=year_expense_sum, month_expense_sum=month_expense_sum, now=now, python_version=sys.version, system_info=platform.platform(), app_memory=f"{app_mem} MB", db_info=db_info, table_count=table_count, record_total=record_total, uptime_minutes=uptime_minutes, ) @app.route("/admin/delete_list/") @login_required @admin_required def delete_list(list_id): delete_receipts_for_list(list_id) list_to_delete = ShoppingList.query.get_or_404(list_id) Item.query.filter_by(list_id=list_to_delete.id).delete() Expense.query.filter_by(list_id=list_to_delete.id).delete() db.session.delete(list_to_delete) db.session.commit() flash(f"Usunięto listę: {list_to_delete.title}", "success") return redirect(url_for("admin_panel")) @app.route("/admin/add_user", methods=["POST"]) @login_required @admin_required def add_user(): username = request.form["username"].lower() password = request.form["password"] if not username or not password: flash("Wypełnij wszystkie pola", "danger") return redirect(url_for("list_users")) if User.query.filter(func.lower(User.username) == username).first(): flash("Użytkownik o takiej nazwie już istnieje", "warning") return redirect(url_for("list_users")) hashed_password = generate_password_hash(password) new_user = User(username=username, password_hash=hashed_password) db.session.add(new_user) db.session.commit() flash("Dodano nowego użytkownika", "success") return redirect(url_for("list_users")) @app.route("/admin/users") @login_required @admin_required def list_users(): users = User.query.all() user_count = User.query.count() list_count = ShoppingList.query.count() item_count = Item.query.count() activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"] return render_template( "admin/user_management.html", users=users, user_count=user_count, list_count=list_count, item_count=item_count, activity_log=activity_log, ) @app.route("/admin/change_password/", methods=["POST"]) @login_required @admin_required def reset_password(user_id): user = User.query.get_or_404(user_id) new_password = request.form["password"] if not new_password: flash("Podaj nowe hasło", "danger") return redirect(url_for("list_users")) user.password_hash = generate_password_hash(new_password) db.session.commit() flash(f"Hasło dla użytkownika {user.username} zostało zaktualizowane", "success") return redirect(url_for("list_users")) @app.route("/admin/delete_user/") @login_required @admin_required def delete_user(user_id): user = User.query.get_or_404(user_id) if user.is_admin: admin_count = User.query.filter_by(is_admin=True).count() if admin_count <= 1: flash("Nie można usunąć ostatniego administratora.", "danger") return redirect(url_for("list_users")) db.session.delete(user) db.session.commit() flash("Użytkownik usunięty", "success") return redirect(url_for("list_users")) @app.route("/admin/receipts/") @login_required @admin_required def admin_receipts(id): try: if id == "all": receipts = Receipt.query.order_by(Receipt.uploaded_at.desc()).all() else: list_id = int(id) receipts = ( Receipt.query.filter_by(list_id=list_id) .order_by(Receipt.uploaded_at.desc()) .all() ) except ValueError: flash("Nieprawidłowe ID listy.", "danger") return redirect(url_for("admin_panel")) return render_template("admin/receipts.html", receipts=receipts) @app.route("/admin/rotate_receipt/") @login_required @admin_required def rotate_receipt(receipt_id): try: rotate_receipt_by_id(receipt_id) flash("Obrócono paragon", "success") except FileNotFoundError: flash("Plik nie istnieje", "danger") except Exception as e: flash(f"Błąd przy obracaniu: {str(e)}", "danger") return redirect(request.referrer or url_for("admin_receipts", id="all")) @app.route("/admin/delete_receipt/") @login_required @admin_required def delete_receipt(receipt_id): try: delete_receipt_by_id(receipt_id) flash("Paragon usunięty", "success") except Exception as e: flash(f"Błąd przy usuwaniu pliku: {str(e)}", "danger") return redirect(request.referrer or url_for("admin_receipts", id="all")) @app.route("/admin/rename_receipt/") @login_required @admin_required def rename_receipt(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) if not os.path.exists(old_path): flash("Plik nie istnieje", "danger") return redirect(request.referrer) new_filename = generate_new_receipt_filename(receipt.list_id) new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename) try: os.rename(old_path, new_path) receipt.filename = new_filename db.session.commit() flash("Zmieniono nazwę pliku", "success") except Exception as e: flash(f"Błąd przy zmianie nazwy: {str(e)}", "danger") return redirect(request.referrer or url_for("admin_receipts", id="all")) @app.route("/admin/generate_receipt_hash/") @login_required @admin_required def generate_receipt_hash(receipt_id): receipt = Receipt.query.get_or_404(receipt_id) if receipt.file_hash: flash("Hash już istnieje", "info") return redirect(request.referrer) file_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) if not os.path.exists(file_path): flash("Plik nie istnieje", "danger") return redirect(request.referrer) try: with open(file_path, "rb") as f: file_hash = hashlib.sha256(f.read()).hexdigest() receipt.file_hash = file_hash db.session.commit() flash("Hash wygenerowany", "success") except Exception as e: flash(f"Błąd przy generowaniu hasha: {e}", "danger") return redirect(request.referrer) @app.route("/admin/delete_selected_lists", methods=["POST"]) @login_required @admin_required def delete_selected_lists(): ids = request.form.getlist("list_ids") for list_id in ids: # lst = ShoppingList.query.get(int(list_id)) lst = db.session.get(ShoppingList, int(list_id)) if lst: delete_receipts_for_list(lst.id) Item.query.filter_by(list_id=lst.id).delete() Expense.query.filter_by(list_id=lst.id).delete() db.session.delete(lst) db.session.commit() flash("Usunięto wybrane listy", "success") return redirect(url_for("admin_panel")) @app.route("/admin/edit_list/", methods=["GET", "POST"]) @login_required @admin_required def edit_list(list_id): l = db.session.get(ShoppingList, list_id) if l is None: abort(404) expenses = Expense.query.filter_by(list_id=list_id).all() total_expense = sum(e.amount for e in expenses) users = User.query.all() items = ( db.session.query(Item).filter_by(list_id=list_id).order_by(Item.id.desc()).all() ) receipts = ( Receipt.query.filter_by(list_id=list_id) .order_by(Receipt.uploaded_at.desc()) .all() ) if request.method == "POST": action = request.form.get("action") if action == "save": new_title = request.form.get("title", "").strip() new_amount_str = request.form.get("amount") is_archived = "archived" in request.form is_public = "public" in request.form is_temporary = "temporary" in request.form new_owner_id = request.form.get("owner_id") expires_date = request.form.get("expires_date") expires_time = request.form.get("expires_time") if new_title: l.title = new_title l.is_archived = is_archived l.is_public = is_public l.is_temporary = is_temporary if expires_date and expires_time: try: combined_str = f"{expires_date} {expires_time}" dt = datetime.strptime(combined_str, "%Y-%m-%d %H:%M") l.expires_at = dt.replace(tzinfo=timezone.utc) except ValueError: flash("Niepoprawna data lub godzina wygasania", "danger") return redirect(url_for("edit_list", list_id=list_id)) else: l.expires_at = None if new_owner_id: try: new_owner_id_int = int(new_owner_id) user_obj = db.session.get(User, new_owner_id_int) if user_obj: l.owner_id = new_owner_id_int else: flash("Wybrany użytkownik nie istnieje", "danger") return redirect(url_for("edit_list", list_id=list_id)) except ValueError: flash("Niepoprawny ID użytkownika", "danger") return redirect(url_for("edit_list", list_id=list_id)) if new_amount_str: try: new_amount = float(new_amount_str) for expense in expenses: db.session.delete(expense) db.session.commit() db.session.add(Expense(list_id=list_id, amount=new_amount)) except ValueError: flash("Niepoprawna kwota", "danger") return redirect(url_for("edit_list", list_id=list_id)) db.session.add(l) db.session.commit() flash("Zapisano zmiany listy", "success") return redirect(url_for("edit_list", list_id=list_id)) elif action == "add_item": item_name = request.form.get("item_name", "").strip() quantity_str = request.form.get("quantity", "1") if not item_name: flash("Podaj nazwę produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) try: quantity = max(1, int(quantity_str)) except ValueError: quantity = 1 db.session.add( Item( list_id=list_id, name=item_name, quantity=quantity, added_by=current_user.id, ) ) exists = ( db.session.query(SuggestedProduct) .filter(func.lower(SuggestedProduct.name) == item_name.lower()) .first() ) if not exists: db.session.add(SuggestedProduct(name=item_name)) db.session.commit() flash("Dodano produkt", "success") return redirect(url_for("edit_list", list_id=list_id)) elif action == "delete_item": item = db.session.get(Item, request.form.get("item_id")) if item and item.list_id == list_id: db.session.delete(item) db.session.commit() flash("Usunięto produkt", "success") else: flash("Nie znaleziono produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) elif action == "toggle_purchased": item = db.session.get(Item, request.form.get("item_id")) if item and item.list_id == list_id: item.purchased = not item.purchased db.session.commit() flash("Zmieniono status oznaczenia produktu", "success") else: flash("Nie znaleziono produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) elif action == "mark_not_purchased": item = db.session.get(Item, request.form.get("item_id")) if item and item.list_id == list_id: item.not_purchased = True item.purchased = False item.purchased_at = None db.session.commit() flash("Oznaczono produkt jako niekupione", "success") else: flash("Nie znaleziono produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) elif action == "unmark_not_purchased": item = db.session.get(Item, request.form.get("item_id")) if item and item.list_id == list_id: item.not_purchased = False item.not_purchased_reason = None item.purchased = False item.purchased_at = None db.session.commit() flash("Przywrócono produkt do listy", "success") else: flash("Nie znaleziono produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) elif action == "edit_quantity": item = db.session.get(Item, request.form.get("item_id")) if item and item.list_id == list_id: try: new_quantity = int(request.form.get("quantity")) if new_quantity > 0: item.quantity = new_quantity db.session.commit() flash("Zmieniono ilość produktu", "success") except ValueError: flash("Nieprawidłowa ilość", "danger") else: flash("Nie znaleziono produktu", "danger") return redirect(url_for("edit_list", list_id=list_id)) return render_template( "admin/edit_list.html", list=l, total_expense=total_expense, users=users, items=items, receipts=receipts, upload_folder=app.config["UPLOAD_FOLDER"], ) @app.route("/admin/products") @login_required @admin_required def list_products(): items = Item.query.order_by(Item.id.desc()).all() # users = User.query.all() users = db.session.query(User).all() users_dict = {user.id: user.username for user in users} # Stabilne sortowanie sugestii suggestions = SuggestedProduct.query.order_by(SuggestedProduct.name.asc()).all() suggestions_dict = {s.name.lower(): s for s in suggestions} return render_template( "admin/list_products.html", items=items, users_dict=users_dict, suggestions_dict=suggestions_dict, ) @app.route("/admin/sync_suggestion/", methods=["POST"]) @login_required def sync_suggestion_ajax(item_id): if not current_user.is_admin: return jsonify({"success": False, "message": "Brak uprawnień"}), 403 item = Item.query.get_or_404(item_id) existing = SuggestedProduct.query.filter( func.lower(SuggestedProduct.name) == item.name.lower() ).first() if not existing: new_suggestion = SuggestedProduct(name=item.name) db.session.add(new_suggestion) db.session.commit() return jsonify( { "success": True, "message": f"Utworzono sugestię dla produktu: {item.name}", } ) else: return jsonify( { "success": True, "message": f"Sugestia dla produktu „{item.name}” już istnieje.", } ) @app.route("/admin/delete_suggestion/", methods=["POST"]) @login_required def delete_suggestion_ajax(suggestion_id): if not current_user.is_admin: return jsonify({"success": False, "message": "Brak uprawnień"}), 403 suggestion = SuggestedProduct.query.get_or_404(suggestion_id) db.session.delete(suggestion) db.session.commit() return jsonify({"success": True, "message": "Sugestia została usunięta."}) @app.route("/admin/expenses_data") @login_required def admin_expenses_data(): if not current_user.is_admin: return jsonify({"error": "Brak uprawnień"}), 403 range_type = request.args.get("range", "monthly") start_date_str = request.args.get("start_date") end_date_str = request.args.get("end_date") # now = datetime.utcnow() now = datetime.now(timezone.utc) labels = [] expenses = [] if start_date_str and end_date_str: start_date = datetime.strptime(start_date_str, "%Y-%m-%d") end_date = datetime.strptime(end_date_str, "%Y-%m-%d") expenses_query = ( db.session.query( extract("year", Expense.added_at).label("year"), extract("month", Expense.added_at).label("month"), func.sum(Expense.amount).label("total"), ) .filter(Expense.added_at >= start_date, Expense.added_at <= end_date) .group_by("year", "month") .order_by("year", "month") .all() ) for row in expenses_query: label = f"{int(row.month):02d}/{int(row.year)}" labels.append(label) expenses.append(round(row.total, 2)) response = make_response(jsonify({"labels": labels, "expenses": expenses})) response.headers["Cache-Control"] = ( "no-store, no-cache, must-revalidate, max-age=0" ) return response if range_type == "monthly": for i in range(11, -1, -1): year = (now - timedelta(days=i * 30)).year month = (now - timedelta(days=i * 30)).month label = f"{month:02d}/{year}" labels.append(label) month_sum = ( db.session.query(func.sum(Expense.amount)) .filter(extract("year", Expense.added_at) == year) .filter(extract("month", Expense.added_at) == month) .scalar() or 0 ) expenses.append(round(month_sum, 2)) elif range_type == "quarterly": for i in range(3, -1, -1): quarter_start = now - timedelta(days=i * 90) year = quarter_start.year quarter = (quarter_start.month - 1) // 3 + 1 label = f"Q{quarter}/{year}" quarter_sum = ( db.session.query(func.sum(Expense.amount)) .filter(extract("year", Expense.added_at) == year) .filter((extract("month", Expense.added_at) - 1) // 3 + 1 == quarter) .scalar() or 0 ) labels.append(label) expenses.append(round(quarter_sum, 2)) elif range_type == "halfyearly": for i in range(1, -1, -1): half_start = now - timedelta(days=i * 180) year = half_start.year half = 1 if half_start.month <= 6 else 2 label = f"H{half}/{year}" half_sum = ( db.session.query(func.sum(Expense.amount)) .filter(extract("year", Expense.added_at) == year) .filter( (extract("month", Expense.added_at) <= 6) if half == 1 else (extract("month", Expense.added_at) > 6) ) .scalar() or 0 ) labels.append(label) expenses.append(round(half_sum, 2)) elif range_type == "yearly": for i in range(4, -1, -1): year = now.year - i label = str(year) year_sum = ( db.session.query(func.sum(Expense.amount)) .filter(extract("year", Expense.added_at) == year) .scalar() or 0 ) labels.append(label) expenses.append(round(year_sum, 2)) response = make_response(jsonify({"labels": labels, "expenses": expenses})) response.headers["Cache-Control"] = "no-store, no-cache" return response @app.route("/admin/promote_user/") @login_required @admin_required def promote_user(user_id): user = User.query.get_or_404(user_id) user.is_admin = True db.session.commit() flash(f"Użytkownik {user.username} został ustawiony jako admin.", "success") return redirect(url_for("list_users")) @app.route("/admin/demote_user/") @login_required @admin_required def demote_user(user_id): user = User.query.get_or_404(user_id) if user.id == current_user.id: flash("Nie możesz zdegradować samego siebie!", "danger") return redirect(url_for("list_users")) admin_count = User.query.filter_by(is_admin=True).count() if admin_count <= 1 and user.is_admin: flash( "Nie można zdegradować. Musi pozostać co najmniej jeden administrator.", "danger", ) return redirect(url_for("list_users")) user.is_admin = False db.session.commit() flash(f"Użytkownik {user.username} został zdegradowany.", "success") return redirect(url_for("list_users")) @app.route("/admin/crop_receipt", methods=["POST"]) @login_required @admin_required def crop_receipt(): receipt_id = request.form.get("receipt_id") file = request.files.get("cropped_image") if not receipt_id or not file: return jsonify(success=False, error="Brak danych") receipt = Receipt.query.get_or_404(receipt_id) old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename) try: new_filename = generate_new_receipt_filename(receipt.list_id) new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename) save_resized_image(file, new_path) if os.path.exists(old_path): os.remove(old_path) receipt.filename = os.path.basename(new_path) db.session.commit() return jsonify(success=True) except Exception as e: return jsonify(success=False, error=str(e)) @app.route("/admin/recalculate_filesizes") @login_required @admin_required def recalculate_filesizes(): updated = 0 not_found = 0 unchanged = 0 receipts = Receipt.query.all() for r in receipts: filepath = os.path.join(app.config["UPLOAD_FOLDER"], r.filename) if os.path.exists(filepath): real_size = os.path.getsize(filepath) if r.filesize != real_size: r.filesize = real_size updated += 1 else: unchanged += 1 else: not_found += 1 db.session.commit() flash( f"Zaktualizowano: {updated}, bez zmian: {unchanged}, brak pliku: {not_found}", "success", ) return redirect(url_for("admin_receipts", id="all")) @app.route("/healthcheck") def healthcheck(): header_token = request.headers.get("X-Internal-Check") correct_token = app.config.get("HEALTHCHECK_TOKEN") if header_token != correct_token: abort(404) return "OK", 200 @app.route("/robots.txt") def robots_txt(): if app.config.get("DISABLE_ROBOTS", False): return "User-agent: *\nDisallow: /", 200, {"Content-Type": "text/plain"} return "User-agent: *\nAllow: /", 200, {"Content-Type": "text/plain"} # ========================================================================================= # SOCKET.IO # ========================================================================================= @socketio.on("delete_item") def handle_delete_item(data): # item = Item.query.get(data["item_id"]) item = db.session.get(Item, data["item_id"]) if item: list_id = item.list_id db.session.delete(item) db.session.commit() emit("item_deleted", {"item_id": item.id}, to=str(item.list_id)) purchased_count, total_count, percent = get_progress(list_id) emit( "progress_updated", { "purchased_count": purchased_count, "total_count": total_count, "percent": percent, }, to=str(list_id), ) @socketio.on("edit_item") def handle_edit_item(data): # item = Item.query.get(data["item_id"]) item = db.session.get(Item, data["item_id"]) new_name = data["new_name"] new_quantity = data.get("new_quantity", item.quantity) if item and new_name.strip(): item.name = new_name.strip() try: new_quantity = int(new_quantity) if new_quantity < 1: new_quantity = 1 except: new_quantity = 1 item.quantity = new_quantity db.session.commit() emit( "item_edited", {"item_id": item.id, "new_name": item.name, "new_quantity": item.quantity}, to=str(item.list_id), ) @socketio.on("join_list") def handle_join(data): global active_users room = str(data["room"]) username = data.get("username", "Gość") join_room(room) if room not in active_users: active_users[room] = set() active_users[room].add(username) # shopping_list = ShoppingList.query.get(int(data["room"])) shopping_list = db.session.get(ShoppingList, int(data["room"])) list_title = shopping_list.title if shopping_list else "Twoja lista" emit("user_joined", {"username": username}, to=room) emit("user_list", {"users": list(active_users[room])}, to=room) emit("joined_confirmation", {"room": room, "list_title": list_title}) @socketio.on("disconnect") def handle_disconnect(sid): global active_users username = current_user.username if current_user.is_authenticated else "Gość" for room, users in active_users.items(): if username in users: users.remove(username) emit("user_left", {"username": username}, to=room) emit("user_list", {"users": list(users)}, to=room) @socketio.on("add_item") def handle_add_item(data): list_id = data["list_id"] name = data["name"].strip() quantity = data.get("quantity", 1) list_obj = db.session.get(ShoppingList, list_id) if not list_obj: return try: quantity = int(quantity) if quantity < 1: quantity = 1 except: quantity = 1 existing_item = Item.query.filter( Item.list_id == list_id, func.lower(Item.name) == name.lower(), Item.not_purchased == False, ).first() if existing_item: existing_item.quantity += quantity db.session.commit() emit( "item_edited", { "item_id": existing_item.id, "new_name": existing_item.name, "new_quantity": existing_item.quantity, }, to=str(list_id), ) else: max_position = ( db.session.query(func.max(Item.position)) .filter_by(list_id=list_id) .scalar() ) if max_position is None: max_position = 0 user_id = current_user.id if current_user.is_authenticated else None user_name = current_user.username if current_user.is_authenticated else "Gość" new_item = Item( list_id=list_id, name=name, quantity=quantity, position=max_position + 1, added_by=user_id, ) db.session.add(new_item) if not SuggestedProduct.query.filter( func.lower(SuggestedProduct.name) == name.lower() ).first(): new_suggestion = SuggestedProduct(name=name) db.session.add(new_suggestion) db.session.commit() emit( "item_added", { "id": new_item.id, "name": new_item.name, "quantity": new_item.quantity, "added_by": user_name, "added_by_id": user_id, "owner_id": list_obj.owner_id, }, to=str(list_id), include_self=True, ) purchased_count, total_count, percent = get_progress(list_id) emit( "progress_updated", { "purchased_count": purchased_count, "total_count": total_count, "percent": percent, }, to=str(list_id), ) @socketio.on("check_item") def handle_check_item(data): # item = Item.query.get(data["item_id"]) item = db.session.get(Item, data["item_id"]) if item: item.purchased = True # item.purchased_at = datetime.utcnow() item.purchased_at = datetime.now(UTC) db.session.commit() purchased_count, total_count, percent = get_progress(item.list_id) emit("item_checked", {"item_id": item.id}, to=str(item.list_id)) emit( "progress_updated", { "purchased_count": purchased_count, "total_count": total_count, "percent": percent, }, to=str(item.list_id), ) @socketio.on("uncheck_item") def handle_uncheck_item(data): # item = Item.query.get(data["item_id"]) item = db.session.get(Item, data["item_id"]) if item: item.purchased = False item.purchased_at = None db.session.commit() purchased_count, total_count, percent = get_progress(item.list_id) emit("item_unchecked", {"item_id": item.id}, to=str(item.list_id)) emit( "progress_updated", { "purchased_count": purchased_count, "total_count": total_count, "percent": percent, }, to=str(item.list_id), ) @socketio.on("request_full_list") def handle_request_full_list(data): list_id = data["list_id"] shopping_list = db.session.get(ShoppingList, list_id) if not shopping_list: return owner_id = shopping_list.owner_id items = ( Item.query.options(joinedload(Item.added_by_user)) .filter_by(list_id=list_id) .order_by(Item.position.asc()) .all() ) items_data = [] for item in items: items_data.append( { "id": item.id, "name": item.name, "quantity": item.quantity, "purchased": item.purchased if not item.not_purchased else False, "not_purchased": item.not_purchased, "not_purchased_reason": item.not_purchased_reason, "note": item.note or "", "added_by": item.added_by_user.username if item.added_by_user else None, "added_by_id": item.added_by_user.id if item.added_by_user else None, "owner_id": owner_id, } ) emit("full_list", {"items": items_data}, to=request.sid) @socketio.on("update_note") def handle_update_note(data): item_id = data["item_id"] note = data["note"] item = Item.query.get(item_id) if item: item.note = note db.session.commit() emit("note_updated", {"item_id": item_id, "note": note}, to=str(item.list_id)) @socketio.on("add_expense") def handle_add_expense(data): list_id = data["list_id"] amount = data["amount"] receipt_filename = data.get("receipt_filename") if receipt_filename: existing = Expense.query.filter_by( list_id=list_id, receipt_filename=receipt_filename ).first() if existing: return new_expense = Expense( list_id=list_id, amount=amount, receipt_filename=receipt_filename ) db.session.add(new_expense) db.session.commit() total = ( db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar() or 0 ) emit("expense_added", {"amount": amount, "total": total}, to=str(list_id)) @socketio.on("mark_not_purchased") def handle_mark_not_purchased(data): # item = Item.query.get(data["item_id"]) item = db.session.get(Item, data["item_id"]) reason = data.get("reason", "") if item: item.not_purchased = True item.not_purchased_reason = reason db.session.commit() emit( "item_marked_not_purchased", {"item_id": item.id, "reason": reason}, to=str(item.list_id), ) @socketio.on("unmark_not_purchased") def handle_unmark_not_purchased(data): # item = Item.query.get(data["item_id"]) item = db.session.get(Item, data["item_id"]) if item: item.not_purchased = False item.purchased = False item.purchased_at = None item.not_purchased_reason = None db.session.commit() emit("item_unmarked_not_purchased", {"item_id": item.id}, to=str(item.list_id)) @app.cli.command("create_db") def create_db(): inspector = inspect(db.engine) expected_tables = set(db.Model.metadata.tables.keys()) actual_tables = set(inspector.get_table_names()) missing_tables = expected_tables - actual_tables extra_tables = actual_tables - expected_tables if missing_tables: print(f"Brakuje tabel: {', '.join(sorted(missing_tables))}") if extra_tables: print(f"Dodatkowe tabele w bazie: {', '.join(sorted(extra_tables))}") critical_error = False for table in expected_tables & actual_tables: expected_columns = set(c.name for c in db.Model.metadata.tables[table].columns) actual_columns = set(c["name"] for c in inspector.get_columns(table)) missing_cols = expected_columns - actual_columns extra_cols = actual_columns - expected_columns if missing_cols: print( f"Brakuje kolumn w tabeli '{table}': {', '.join(sorted(missing_cols))}" ) critical_error = True if extra_cols: print( f"Dodatkowe kolumny w tabeli '{table}': {', '.join(sorted(extra_cols))}" ) if missing_tables or critical_error: print("Struktura bazy jest niekompletna lub niezgodna. Przerwano.") return if not actual_tables: db.create_all() print("Utworzono strukturę bazy danych.") else: print("Struktura bazy danych jest poprawna.") if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO) socketio.run(app, host="0.0.0.0", port=8000, debug=DEBUG_MODE)