Files
lista_zakupowa_live/app.py
Mateusz Gruszczyński d15d83eea2 poprawki
2025-08-16 13:35:10 +02:00

3473 lines
106 KiB
Python

import os
import secrets
import time
import mimetypes
import sys
import platform
import psutil
import hashlib
import re
import traceback
import bcrypt
import colorsys
from pillow_heif import register_heif_opener
from datetime import datetime, timedelta, UTC, timezone
from urllib.parse import urlparse, urlunparse
from flask import (
Flask,
render_template,
redirect,
url_for,
request,
flash,
Blueprint,
send_from_directory,
request,
abort,
session,
jsonify,
)
from flask_sqlalchemy import SQLAlchemy
from flask_login import (
LoginManager,
UserMixin,
login_user,
login_required,
logout_user,
current_user,
)
from flask_compress import Compress
from flask_socketio import SocketIO, emit, join_room
from config import Config
from PIL import Image, ExifTags, ImageFilter, ImageOps
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import func, extract, inspect, or_, case, text
from sqlalchemy.orm import joinedload, load_only, aliased
from collections import defaultdict, deque
from functools import wraps
# from flask_talisman import Talisman # import niżej pod warunkiem
from flask_session import Session
from types import SimpleNamespace
from pdf2image import convert_from_bytes
from urllib.parse import urlencode
# OCR
import pytesseract
from pytesseract import Output
import logging
from types import SimpleNamespace
app = Flask(__name__)
app.config.from_object(Config)
# Konfiguracja nagłówków bezpieczeństwa z .env
csp_policy = (
{
"default-src": "'self'",
"script-src": "'self' 'unsafe-inline'",
"style-src": "'self' 'unsafe-inline'",
"img-src": "'self' data:",
"connect-src": "'self'",
}
if app.config.get("ENABLE_CSP", True)
else None
)
permissions_policy = {"browsing-topics": "()"} if app.config.get("ENABLE_PP") else None
talisman_kwargs = {
"force_https": False,
"strict_transport_security": app.config.get("ENABLE_HSTS", True),
"frame_options": "DENY" if app.config.get("ENABLE_XFO", True) else None,
"permissions_policy": permissions_policy,
"content_security_policy": csp_policy,
"x_content_type_options": app.config.get("ENABLE_XCTO", True),
"strict_transport_security_include_subdomains": False,
}
referrer_policy = app.config.get("REFERRER_POLICY")
if referrer_policy:
talisman_kwargs["referrer_policy"] = referrer_policy
# jak naglowki wylaczone, nie ładuj talisman z pominięciem referrer_policy
effective_headers = {
k: v
for k, v in talisman_kwargs.items()
if k != "referrer_policy" and v not in (None, False)
}
if effective_headers:
from flask_talisman import Talisman
talisman = Talisman(
app,
session_cookie_secure=app.config.get("SESSION_COOKIE_SECURE", True),
**talisman_kwargs,
)
print("[TALISMAN] Włączony z nagłówkami:", list(effective_headers.keys()))
else:
print("[TALISMAN] Pominięty — wszystkie nagłówki security wyłączone.")
register_heif_opener() # pillow_heif dla HEIC
SQLALCHEMY_ECHO = True
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp", "heic", "pdf"}
SYSTEM_PASSWORD = app.config.get("SYSTEM_PASSWORD")
DEFAULT_ADMIN_USERNAME = app.config.get("DEFAULT_ADMIN_USERNAME")
DEFAULT_ADMIN_PASSWORD = app.config.get("DEFAULT_ADMIN_PASSWORD")
UPLOAD_FOLDER = app.config.get("UPLOAD_FOLDER")
AUTHORIZED_COOKIE_VALUE = app.config.get("AUTHORIZED_COOKIE_VALUE")
AUTH_COOKIE_MAX_AGE = app.config.get("AUTH_COOKIE_MAX_AGE")
HEALTHCHECK_TOKEN = app.config.get("HEALTHCHECK_TOKEN")
SESSION_TIMEOUT_MINUTES = int(app.config.get("SESSION_TIMEOUT_MINUTES"))
SESSION_COOKIE_SECURE = app.config.get("SESSION_COOKIE_SECURE")
app.config["COMPRESS_ALGORITHM"] = ["zstd", "br", "gzip", "deflate"]
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=SESSION_TIMEOUT_MINUTES)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
DEBUG_MODE = app.config.get("DEBUG_MODE", False)
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
failed_login_attempts = defaultdict(deque)
MAX_ATTEMPTS = 10
TIME_WINDOW = 60 * 60
WEBP_SAVE_PARAMS = {
"format": "WEBP",
"lossless": True, # lub False jeśli chcesz używać quality
"method": 6,
# "quality": 95, # tylko jeśli lossless=False
}
db = SQLAlchemy(app)
socketio = SocketIO(app, async_mode="eventlet")
login_manager = LoginManager(app)
login_manager.login_view = "login"
# flask-session
app.config["SESSION_TYPE"] = "sqlalchemy"
app.config["SESSION_SQLALCHEMY"] = db
Session(app)
# flask-compress
compress = Compress()
compress.init_app(app)
static_bp = Blueprint("static_bp", __name__)
# dla live
active_users = {}
def utcnow():
return datetime.now(timezone.utc)
app_start_time = utcnow()
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(150), unique=True, nullable=False)
password_hash = db.Column(db.String(512), nullable=False)
is_admin = db.Column(db.Boolean, default=False)
# Tabela pośrednia
shopping_list_category = db.Table(
"shopping_list_category",
db.Column(
"shopping_list_id",
db.Integer,
db.ForeignKey("shopping_list.id"),
primary_key=True,
),
db.Column(
"category_id", db.Integer, db.ForeignKey("category.id"), primary_key=True
),
)
class Category(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), unique=True, nullable=False)
class ShoppingList(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(150), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
owner_id = db.Column(db.Integer, db.ForeignKey("user.id"))
owner = db.relationship("User", backref="lists", foreign_keys=[owner_id])
is_temporary = db.Column(db.Boolean, default=False)
share_token = db.Column(db.String(64), unique=True, nullable=True)
expires_at = db.Column(db.DateTime(timezone=True), nullable=True)
owner = db.relationship("User", backref="lists", lazy=True)
is_archived = db.Column(db.Boolean, default=False)
is_public = db.Column(db.Boolean, default=True)
# Relacje
items = db.relationship("Item", back_populates="shopping_list", lazy="select")
receipts = db.relationship("Receipt", back_populates="shopping_list", lazy="select")
expenses = db.relationship("Expense", back_populates="shopping_list", lazy="select")
# Nowa relacja wiele-do-wielu
categories = db.relationship(
"Category",
secondary=shopping_list_category,
backref=db.backref("shopping_lists", lazy="dynamic"),
)
class Item(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"))
name = db.Column(db.String(150), nullable=False)
# added_at = db.Column(db.DateTime, default=datetime.utcnow)
added_at = db.Column(db.DateTime, default=utcnow)
added_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
added_by_user = db.relationship(
"User", backref="added_items", lazy="joined", foreign_keys=[added_by]
)
purchased = db.Column(db.Boolean, default=False)
purchased_at = db.Column(db.DateTime, nullable=True)
quantity = db.Column(db.Integer, default=1)
note = db.Column(db.Text, nullable=True)
not_purchased = db.Column(db.Boolean, default=False)
not_purchased_reason = db.Column(db.Text, nullable=True)
position = db.Column(db.Integer, default=0)
shopping_list = db.relationship("ShoppingList", back_populates="items")
class SuggestedProduct(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(150), unique=True, nullable=False)
usage_count = db.Column(db.Integer, default=0)
class Expense(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"))
amount = db.Column(db.Float, nullable=False)
added_at = db.Column(db.DateTime, default=datetime.utcnow)
receipt_filename = db.Column(db.String(255), nullable=True)
shopping_list = db.relationship("ShoppingList", back_populates="expenses")
class Receipt(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"), nullable=False)
filename = db.Column(db.String(255), nullable=False)
uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
filesize = db.Column(db.Integer, nullable=True)
file_hash = db.Column(db.String(64), nullable=True, unique=True)
shopping_list = db.relationship("ShoppingList", back_populates="receipts")
def hash_password(password):
pepper = app.config["BCRYPT_PEPPER"]
peppered = (password + pepper).encode("utf-8")
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(peppered, salt)
return hashed.decode("utf-8")
def check_password(stored_hash, password_input):
pepper = app.config["BCRYPT_PEPPER"]
peppered = (password_input + pepper).encode("utf-8")
if stored_hash.startswith("$2b$") or stored_hash.startswith("$2a$"):
try:
return bcrypt.checkpw(peppered, stored_hash.encode("utf-8"))
except Exception:
return False
return False
def set_authorized_cookie(response):
secure_flag = app.config["SESSION_COOKIE_SECURE"]
max_age = app.config.get("AUTH_COOKIE_MAX_AGE", 86400)
response.set_cookie(
"authorized",
AUTHORIZED_COOKIE_VALUE,
max_age=max_age,
secure=secure_flag,
httponly=True,
)
return response
if app.config["SQLALCHEMY_DATABASE_URI"].startswith("sqlite:///"):
db_path = app.config["SQLALCHEMY_DATABASE_URI"].replace("sqlite:///", "", 1)
db_dir = os.path.dirname(db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
print(f"Utworzono katalog bazy: {db_dir}")
with app.app_context():
db.create_all()
# --- Tworzenie admina ---
admin_username = DEFAULT_ADMIN_USERNAME
admin_password = DEFAULT_ADMIN_PASSWORD
password_hash = hash_password(admin_password)
admin = User.query.filter_by(username=admin_username).first()
if admin:
if not admin.is_admin:
admin.is_admin = True
if not check_password(admin.password_hash, admin_password):
admin.password_hash = password_hash
print(f"[INFO] Zmieniono hasło admina '{admin_username}' z konfiguracji.")
db.session.commit()
else:
db.session.add(
User(username=admin_username, password_hash=password_hash, is_admin=True)
)
db.session.commit()
default_categories = app.config["DEFAULT_CATEGORIES"]
existing_names = {
c.name for c in Category.query.filter(Category.name.isnot(None)).all()
}
existing_names_lower = {name.lower() for name in existing_names}
missing = [
cat for cat in default_categories if cat.lower() not in existing_names_lower
]
if missing:
db.session.add_all(Category(name=cat) for cat in missing)
db.session.commit()
print(f"[INFO] Dodano brakujące kategorie: {', '.join(missing)}")
# else:
# print("[INFO] Wszystkie domyślne kategorie już istnieją")
@static_bp.route("/static/js/<path:filename>")
def serve_js(filename):
response = send_from_directory("static/js", filename)
response.headers["Cache-Control"] = app.config["JS_CACHE_CONTROL"]
response.headers.pop("Content-Disposition", None)
return response
@static_bp.route("/static/css/<path:filename>")
def serve_css(filename):
response = send_from_directory("static/css", filename)
response.headers["Cache-Control"] = app.config["CSS_CACHE_CONTROL"]
response.headers.pop("Content-Disposition", None)
return response
@static_bp.route("/static/lib/js/<path:filename>")
def serve_js_lib(filename):
response = send_from_directory("static/lib/js", filename)
response.headers["Cache-Control"] = app.config["LIB_JS_CACHE_CONTROL"]
response.headers.pop("Content-Disposition", None)
return response
@static_bp.route("/static/lib/css/<path:filename>")
def serve_css_lib(filename):
response = send_from_directory("static/lib/css", filename)
response.headers["Cache-Control"] = app.config["LIB_CSS_CACHE_CONTROL"]
response.headers.pop("Content-Disposition", None)
return response
app.register_blueprint(static_bp)
def allowed_file(filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
def get_list_details(list_id):
shopping_list = ShoppingList.query.options(
joinedload(ShoppingList.items).joinedload(Item.added_by_user),
joinedload(ShoppingList.expenses),
joinedload(ShoppingList.receipts),
).get_or_404(list_id)
items = sorted(shopping_list.items, key=lambda i: i.position or 0)
expenses = shopping_list.expenses
total_expense = sum(e.amount for e in expenses) if expenses else 0
receipt_files = [r.filename for r in shopping_list.receipts]
return shopping_list, items, receipt_files, expenses, total_expense
def get_total_expense_for_list(list_id, start_date=None, end_date=None):
query = db.session.query(func.sum(Expense.amount)).filter(
Expense.list_id == list_id
)
if start_date and end_date:
query = query.filter(
Expense.added_at >= start_date, Expense.added_at < end_date
)
return query.scalar() or 0
def update_list_categories_from_form(shopping_list, form):
category_ids = form.getlist("categories")
shopping_list.categories.clear()
if category_ids:
cats = Category.query.filter(Category.id.in_(category_ids)).all()
shopping_list.categories.extend(cats)
def generate_share_token(length=8):
return secrets.token_hex(length // 2)
def check_list_public(shopping_list):
if not shopping_list.is_public:
flash("Ta lista nie jest publicznie dostępna", "danger")
return False
return True
def enrich_list_data(l):
counts = (
db.session.query(
func.count(Item.id),
func.sum(case((Item.purchased == True, 1), else_=0)),
func.sum(Expense.amount),
)
.outerjoin(Expense, Expense.list_id == Item.list_id)
.filter(Item.list_id == l.id)
.first()
)
l.total_count = counts[0] or 0
l.purchased_count = counts[1] or 0
l.total_expense = counts[2] or 0
return l
def get_total_records():
total = 0
inspector = inspect(db.engine)
with db.engine.connect() as conn:
for table_name in inspector.get_table_names():
count = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")).scalar()
total += count
return total
def save_resized_image(file, path):
try:
image = Image.open(file)
image.verify()
file.seek(0)
image = Image.open(file)
except Exception:
raise ValueError("Nieprawidłowy plik graficzny")
try:
image = ImageOps.exif_transpose(image)
except Exception:
pass
try:
image.thumbnail((2000, 2000))
image = image.convert("RGB")
image.info.clear()
new_path = path.rsplit(".", 1)[0] + ".webp"
image.save(new_path, **WEBP_SAVE_PARAMS)
except Exception as e:
raise ValueError(f"Błąd podczas przetwarzania obrazu: {e}")
def redirect_with_flash(
message: str, category: str = "info", endpoint: str = "main_page"
):
flash(message, category)
return redirect(url_for(endpoint))
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or not current_user.is_admin:
return redirect_with_flash("Brak uprawnień do tej sekcji.", "danger")
return f(*args, **kwargs)
return decorated_function
def get_progress(list_id):
total_count, purchased_count = (
db.session.query(
func.count(Item.id), func.sum(case((Item.purchased == True, 1), else_=0))
)
.filter(Item.list_id == list_id)
.first()
)
total_count = total_count or 0
purchased_count = purchased_count or 0
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
return purchased_count, total_count, percent
def delete_receipts_for_list(list_id):
receipt_pattern = f"list_{list_id}_"
upload_folder = app.config["UPLOAD_FOLDER"]
for filename in os.listdir(upload_folder):
if filename.startswith(receipt_pattern):
try:
os.remove(os.path.join(upload_folder, filename))
except Exception as e:
print(f"Nie udało się usunąć pliku {filename}: {e}")
def receipt_error(message):
if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest":
return jsonify({"success": False, "error": message}), 400
flash(message, "danger")
return redirect(request.referrer or url_for("main_page"))
def rotate_receipt_by_id(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
if not os.path.exists(old_path):
raise FileNotFoundError("Plik nie istnieje")
image = Image.open(old_path)
rotated = image.rotate(-90, expand=True)
new_filename = generate_new_receipt_filename(receipt.list_id)
new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename)
rotated.save(new_path, **WEBP_SAVE_PARAMS)
os.remove(old_path)
receipt.filename = new_filename
db.session.commit()
return receipt
def delete_receipt_by_id(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
if os.path.exists(filepath):
os.remove(filepath)
db.session.delete(receipt)
db.session.commit()
return receipt
def generate_new_receipt_filename(list_id):
timestamp = datetime.now().strftime("%Y%m%d_%H%M")
random_part = secrets.token_hex(3)
return f"list_{list_id}_{timestamp}_{random_part}.webp"
def handle_crop_receipt(receipt_id, file):
if not receipt_id or not file:
return {"success": False, "error": "Brak danych"}
receipt = Receipt.query.get_or_404(receipt_id)
old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
try:
new_filename = generate_new_receipt_filename(receipt.list_id)
new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename)
save_resized_image(file, new_path)
if os.path.exists(old_path):
os.remove(old_path)
receipt.filename = os.path.basename(new_path)
db.session.commit()
recalculate_filesizes(receipt.id)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
def get_total_expenses_grouped_by_list_created_at(
user_only=False,
admin=False,
show_all=False,
range_type="monthly",
start_date=None,
end_date=None,
user_id=None,
category_id=None,
):
lists_query = ShoppingList.query
if admin:
pass
elif show_all:
lists_query = lists_query.filter(
or_(
ShoppingList.owner_id == user_id,
ShoppingList.is_public == True,
)
)
else:
lists_query = lists_query.filter(ShoppingList.owner_id == user_id)
if category_id:
if str(category_id) == "none":
lists_query = lists_query.filter(~ShoppingList.categories.any())
else:
try:
cat_id_int = int(category_id)
except ValueError:
return {"labels": [], "expenses": []}
lists_query = lists_query.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cat_id_int)
today = datetime.now(timezone.utc).date()
if range_type == "last30days":
dt_start = today - timedelta(days=29)
dt_end = today + timedelta(days=1)
start_date, end_date = dt_start.strftime("%Y-%m-%d"), dt_end.strftime(
"%Y-%m-%d"
)
elif range_type == "currentmonth":
dt_start = today.replace(day=1)
dt_end = today + timedelta(days=1)
start_date, end_date = dt_start.strftime("%Y-%m-%d"), dt_end.strftime(
"%Y-%m-%d"
)
if start_date and end_date:
try:
dt_start = datetime.strptime(start_date, "%Y-%m-%d")
dt_end = datetime.strptime(end_date, "%Y-%m-%d")
if dt_end.tzinfo is None:
dt_end = dt_end.replace(tzinfo=timezone.utc)
dt_end += timedelta(days=1)
except Exception:
return {"error": "Błędne daty", "labels": [], "expenses": []}
lists_query = lists_query.filter(
ShoppingList.created_at >= dt_start, ShoppingList.created_at < dt_end
)
lists = lists_query.all()
if not lists:
return {"labels": [], "expenses": []}
list_ids = [l.id for l in lists]
total_expenses = (
db.session.query(
Expense.list_id, func.sum(Expense.amount).label("total_amount")
)
.filter(Expense.list_id.in_(list_ids))
.group_by(Expense.list_id)
.all()
)
expense_map = {lid: amt for lid, amt in total_expenses}
grouped = defaultdict(float)
for sl in lists:
if sl.id in expense_map:
ts = sl.created_at or datetime.now(timezone.utc)
if range_type in ("last30days", "currentmonth"):
key = ts.strftime("%Y-%m-%d") # dzienny widok
elif range_type == "monthly":
key = ts.strftime("%Y-%m")
elif range_type == "quarterly":
key = f"{ts.year}-Q{((ts.month - 1) // 3 + 1)}"
elif range_type == "halfyearly":
key = f"{ts.year}-H{1 if ts.month <= 6 else 2}"
elif range_type == "yearly":
key = str(ts.year)
else:
key = ts.strftime("%Y-%m-%d")
grouped[key] += expense_map[sl.id]
labels = sorted(grouped)
expenses = [round(grouped[l], 2) for l in labels]
return {"labels": labels, "expenses": expenses}
def recalculate_filesizes(receipt_id: int = None):
updated = 0
not_found = 0
unchanged = 0
if receipt_id is not None:
receipt = db.session.get(Receipt, receipt_id)
receipts = [receipt] if receipt else []
else:
receipts = db.session.execute(db.select(Receipt)).scalars().all()
for r in receipts:
if not r:
continue
filepath = os.path.join(app.config["UPLOAD_FOLDER"], r.filename)
if os.path.exists(filepath):
real_size = os.path.getsize(filepath)
if r.filesize != real_size:
r.filesize = real_size
updated += 1
else:
unchanged += 1
else:
not_found += 1
db.session.commit()
return updated, unchanged, not_found
def get_admin_expense_summary():
now = datetime.now(timezone.utc)
current_year = now.year
current_month = now.month
def calc_sum(base_query):
total = base_query.scalar() or 0
year_total = (
base_query.filter(
extract("year", ShoppingList.created_at) == current_year
).scalar()
or 0
)
month_total = (
base_query.filter(extract("year", ShoppingList.created_at) == current_year)
.filter(extract("month", ShoppingList.created_at) == current_month)
.scalar()
or 0
)
return {"total": total, "year": year_total, "month": month_total}
base = db.session.query(func.sum(Expense.amount)).join(
ShoppingList, ShoppingList.id == Expense.list_id
)
all_lists = calc_sum(base)
active_lists = calc_sum(
base.filter(
ShoppingList.is_archived == False,
~(
(ShoppingList.is_temporary == True)
& (ShoppingList.expires_at != None)
& (ShoppingList.expires_at <= now)
),
)
)
archived_lists = calc_sum(base.filter(ShoppingList.is_archived == True))
expired_lists = calc_sum(
base.filter(
ShoppingList.is_archived == False,
(ShoppingList.is_temporary == True),
(ShoppingList.expires_at != None),
(ShoppingList.expires_at <= now),
)
)
return {
"all": all_lists,
"active": active_lists,
"archived": archived_lists,
"expired": expired_lists,
}
def category_to_color(name):
hash_val = int(hashlib.md5(name.encode("utf-8")).hexdigest(), 16)
hue = (hash_val % 360) / 360.0
saturation = 0.60 + ((hash_val >> 8) % 17) / 100.0
lightness = 0.28 + ((hash_val >> 16) % 11) / 100.0
r, g, b = colorsys.hls_to_rgb(hue, lightness, saturation)
return f"#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}"
def get_total_expenses_grouped_by_category(
show_all, range_type, start_date, end_date, user_id, category_id=None
):
lists_query = ShoppingList.query
if show_all:
lists_query = lists_query.filter(
or_(ShoppingList.owner_id == user_id, ShoppingList.is_public == True)
)
else:
lists_query = lists_query.filter(ShoppingList.owner_id == user_id)
if category_id:
if str(category_id) == "none":
lists_query = lists_query.filter(~ShoppingList.categories.any())
else:
try:
cat_id_int = int(category_id)
except ValueError:
return {"labels": [], "datasets": []}
lists_query = lists_query.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == cat_id_int)
if start_date and end_date:
try:
dt_start = datetime.strptime(start_date, "%Y-%m-%d")
dt_end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
except Exception:
return {"error": "Błędne daty"}
lists_query = lists_query.filter(
ShoppingList.created_at >= dt_start, ShoppingList.created_at < dt_end
)
lists = lists_query.options(joinedload(ShoppingList.categories)).all()
if not lists:
return {"labels": [], "datasets": []}
data_map = defaultdict(lambda: defaultdict(float))
all_labels = set()
for l in lists:
total_expense = (
db.session.query(func.sum(Expense.amount))
.filter(Expense.list_id == l.id)
.scalar()
) or 0
if total_expense <= 0:
continue
if range_type == "monthly":
key = l.created_at.strftime("%Y-%m")
elif range_type == "quarterly":
key = f"{l.created_at.year}-Q{((l.created_at.month - 1) // 3 + 1)}"
elif range_type == "halfyearly":
key = f"{l.created_at.year}-H{1 if l.created_at.month <= 6 else 2}"
elif range_type == "yearly":
key = str(l.created_at.year)
else:
key = l.created_at.strftime("%Y-%m-%d")
all_labels.add(key)
if str(category_id) == "none":
if not l.categories:
data_map[key]["Bez kategorii"] += total_expense
continue
if not l.categories:
data_map[key]["Bez kategorii"] += total_expense
else:
for c in l.categories:
if category_id and str(c.id) != str(category_id):
continue
data_map[key][c.name] += total_expense
labels = sorted(all_labels)
categories_with_expenses = sorted(
{
cat
for cat_data in data_map.values()
for cat, value in cat_data.items()
if value > 0
}
)
datasets = []
for cat in categories_with_expenses:
datasets.append(
{
"label": cat,
"data": [round(data_map[label].get(cat, 0), 2) for label in labels],
"backgroundColor": category_to_color(cat),
}
)
return {"labels": labels, "datasets": datasets}
def save_pdf_as_webp(file, path):
try:
images = convert_from_bytes(file.read(), dpi=300)
if not images:
raise ValueError("Nie udało się przekonwertować PDF na obraz.")
total_height = sum(img.height for img in images)
max_width = max(img.width for img in images)
combined = Image.new("RGB", (max_width, total_height), (255, 255, 255))
y_offset = 0
for img in images:
combined.paste(img, (0, y_offset))
y_offset += img.height
combined.thumbnail((2000, 20000))
new_path = path.rsplit(".", 1)[0] + ".webp"
combined.save(new_path, **WEBP_SAVE_PARAMS)
except Exception as e:
raise ValueError(f"Błąd podczas przetwarzania PDF: {e}")
def get_active_months_query(visible_lists_query=None):
if db.engine.name == "sqlite":
month_col = func.strftime("%Y-%m", ShoppingList.created_at)
else:
month_col = func.to_char(ShoppingList.created_at, "YYYY-MM")
query = db.session.query(month_col.label("month"))
if visible_lists_query is not None:
query = query.select_from(visible_lists_query.subquery())
active_months = (
query.filter(ShoppingList.created_at != None).distinct().order_by("month").all()
)
return [row.month for row in active_months]
def normalize_name(name):
if not name:
return ""
return re.sub(r'\s+', ' ', name).strip().lower()
############# OCR ###########################
def preprocess_image_for_tesseract(image):
image = ImageOps.autocontrast(image)
image = image.point(lambda x: 0 if x < 150 else 255)
image = image.resize((image.width * 2, image.height * 2), Image.BICUBIC)
return image
def extract_total_tesseract(image):
text = pytesseract.image_to_string(image, lang="pol", config="--psm 4")
lines = text.splitlines()
candidates = []
blacklist_keywords = re.compile(r"\b(ptu|vat|podatek|stawka)\b", re.IGNORECASE)
priority_keywords = re.compile(
r"""
\b(
razem\s*do\s*zap[łl][aąo0]ty |
do\s*zap[łl][aąo0]ty |
suma |
kwota |
warto[śćs] |
płatno[śćs] |
total |
amount
)\b
""",
re.IGNORECASE | re.VERBOSE,
)
for line in lines:
if not line.strip():
continue
if blacklist_keywords.search(line):
continue
is_priority = priority_keywords.search(line)
matches = re.findall(r"\d{1,4}[.,]\d{2}", line)
for match in matches:
try:
val = float(match.replace(",", "."))
if 0.1 <= val <= 100000:
candidates.append((val, line, is_priority is not None))
except:
continue
if is_priority:
spaced = re.findall(r"\d{1,4}\s\d{2}", line)
for match in spaced:
try:
val = float(match.replace(" ", "."))
if 0.1 <= val <= 100000:
candidates.append((val, line, True))
except:
continue
preferred = [(val, line) for val, line, is_pref in candidates if is_pref]
if preferred:
best_val = max(preferred, key=lambda x: x[0])[0]
if best_val < 99999:
return round(best_val, 2), lines
if candidates:
best_val = max(candidates, key=lambda x: x[0])[0]
if best_val < 99999:
return round(best_val, 2), lines
data = pytesseract.image_to_data(
image, lang="pol", config="--psm 4", output_type=Output.DICT
)
font_candidates = []
for i in range(len(data["text"])):
word = data["text"][i].strip()
if not word or not re.match(r"^\d{1,5}[.,\s]\d{2}$", word):
continue
try:
val = float(word.replace(",", ".").replace(" ", "."))
height = data["height"][i]
conf = int(data.get("conf", ["0"] * len(data["text"]))[i])
if 0.1 <= val <= 100000:
font_candidates.append((val, height, conf))
except:
continue
if font_candidates:
best = max(font_candidates, key=lambda x: (x[1], x[2]))
return round(best[0], 2), lines
return 0.0, lines
############# END OCR #######################
# zabezpieczenie logowani do systemu - błędne hasła
def is_ip_blocked(ip):
now = time.time()
attempts = failed_login_attempts[ip]
while attempts and now - attempts[0] > TIME_WINDOW:
attempts.popleft()
return len(attempts) >= MAX_ATTEMPTS
def register_failed_attempt(ip):
now = time.time()
attempts = failed_login_attempts[ip]
while attempts and now - attempts[0] > TIME_WINDOW:
attempts.popleft()
attempts.append(now)
def reset_failed_attempts(ip):
failed_login_attempts[ip].clear()
def attempts_remaining(ip):
attempts = failed_login_attempts[ip]
return max(0, MAX_ATTEMPTS - len(attempts))
####################################################
def get_client_ip():
for header in ["X-Forwarded-For", "X-Real-IP"]:
if header in request.headers:
ip = request.headers[header].split(",")[0].strip()
if ip:
return ip
return request.remote_addr
@login_manager.user_loader
def load_user(user_id):
return db.session.get(User, int(user_id))
@app.context_processor
def inject_time():
return dict(time=time)
@app.context_processor
def inject_has_authorized_cookie():
return {"has_authorized_cookie": "authorized" in request.cookies}
@app.context_processor
def inject_is_blocked():
ip = request.access_route[0]
return {"is_blocked": is_ip_blocked(ip)}
@app.before_request
def require_system_password():
endpoint = request.endpoint
if endpoint in ("static_bp.serve_js_lib", "static_bp.serve_css_lib"):
return
ip = request.access_route[0]
if is_ip_blocked(ip):
abort(403)
if endpoint is None:
return
if endpoint in ("system_auth", "healthcheck", "robots_txt"):
return
if (
"authorized" not in request.cookies
and not endpoint.startswith("login")
and endpoint != "favicon"
):
if endpoint == "static_bp.serve_js":
requested_file = request.view_args.get("filename", "")
if requested_file == "toasts.js":
return
if requested_file.endswith(".js"):
return redirect(url_for("system_auth", next=request.url))
return
if endpoint.startswith("static_bp."):
return
if request.path == "/":
return redirect(url_for("system_auth"))
parsed = urlparse(request.url)
fixed_url = urlunparse(parsed._replace(netloc=request.host))
return redirect(url_for("system_auth", next=fixed_url))
@app.before_request
def start_timer():
request._start_time = time.time()
@app.after_request
def log_request(response):
if request.path == "/healthcheck":
return response
ip = get_client_ip()
method = request.method
path = request.path
status = response.status_code
length = response.content_length or "-"
start = getattr(request, "_start_time", None)
duration = round((time.time() - start) * 1000, 2) if start else "-"
agent = request.headers.get("User-Agent", "-")
if status == 304:
app.logger.info(
f'REVALIDATED: {ip} - "{method} {path}" {status} {length} {duration}ms "{agent}"'
)
else:
app.logger.info(
f'{ip} - "{method} {path}" {status} {length} {duration}ms "{agent}"'
)
app.logger.debug(f"Request headers: {dict(request.headers)}")
app.logger.debug(f"Response headers: {dict(response.headers)}")
return response
@app.template_filter("filemtime")
def file_mtime_filter(path):
try:
t = os.path.getmtime(path)
return datetime.fromtimestamp(t)
except Exception:
# return datetime.utcnow()
return datetime.now(timezone.utc)
@app.template_filter("todatetime")
def to_datetime_filter(s):
return datetime.strptime(s, "%Y-%m-%d")
@app.template_filter("filesizeformat")
def filesizeformat_filter(path):
try:
size = os.path.getsize(path)
for unit in ["B", "KB", "MB", "GB"]:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
except Exception:
return "N/A"
@app.errorhandler(404)
def page_not_found(e):
return (
render_template(
"errors.html",
code=404,
title="Strona nie znaleziona",
message="Ups! Podana strona nie istnieje lub została przeniesiona.",
),
404,
)
@app.errorhandler(403)
def forbidden(e):
return (
render_template(
"errors.html",
code=403,
title="Brak dostępu",
message=(
e.description
if e.description
else "Nie masz uprawnień do wyświetlenia tej strony."
),
),
403,
)
@app.route("/favicon.ico")
def favicon_ico():
return redirect(url_for("static", filename="favicon.svg"))
@app.route("/favicon.svg")
def favicon():
svg = """
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16">
<text y="14" font-size="16">🛒</text>
</svg>
"""
return svg, 200, {"Content-Type": "image/svg+xml"}
@app.route("/")
def main_page():
now = datetime.now(timezone.utc)
month_param = request.args.get("m", None)
start = end = None
if month_param in (None, ""):
# brak wyboru -> domyślnie aktualny miesiąc
month_str = now.strftime("%Y-%m")
start = datetime(now.year, now.month, 1, tzinfo=timezone.utc)
end = (start + timedelta(days=31)).replace(day=1)
elif month_param == "all":
month_str = "all"
start = end = None
else:
month_str = month_param
try:
year, month = map(int, month_str.split("-"))
start = datetime(year, month, 1, tzinfo=timezone.utc)
end = (start + timedelta(days=31)).replace(day=1)
except ValueError:
start = end = None
# dalej normalnie używasz date_filter:
def date_filter(query):
if start and end:
query = query.filter(
ShoppingList.created_at >= start, ShoppingList.created_at < end
)
return query
def date_filter(query):
if start and end:
query = query.filter(
ShoppingList.created_at >= start, ShoppingList.created_at < end
)
return query
if current_user.is_authenticated:
user_lists = (
date_filter(
ShoppingList.query.filter_by(
owner_id=current_user.id, is_archived=False
).filter(
(ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)
)
)
.order_by(ShoppingList.created_at.desc())
.all()
)
archived_lists = (
ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True)
.order_by(ShoppingList.created_at.desc())
.all()
)
public_lists = (
date_filter(
ShoppingList.query.filter(
ShoppingList.is_public == True,
ShoppingList.owner_id != current_user.id,
(
(ShoppingList.expires_at == None)
| (ShoppingList.expires_at > now)
),
ShoppingList.is_archived == False,
)
)
.order_by(ShoppingList.created_at.desc())
.all()
)
else:
user_lists = []
archived_lists = []
public_lists = (
date_filter(
ShoppingList.query.filter(
ShoppingList.is_public == True,
(
(ShoppingList.expires_at == None)
| (ShoppingList.expires_at > now)
),
ShoppingList.is_archived == False,
)
)
.order_by(ShoppingList.created_at.desc())
.all()
)
# Definiujemy widoczny zakres list dla tego użytkownika
if current_user.is_authenticated:
visible_lists_query = ShoppingList.query.filter(
or_(
ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True
)
)
else:
visible_lists_query = ShoppingList.query.filter(ShoppingList.is_public == True)
# Teraz możemy bezpiecznie pobrać miesiące
month_options = get_active_months_query(visible_lists_query)
all_lists = user_lists + public_lists + archived_lists
all_ids = [l.id for l in all_lists]
if all_ids:
stats = (
db.session.query(
Item.list_id,
func.count(Item.id).label("total_count"),
func.sum(case((Item.purchased == True, 1), else_=0)).label(
"purchased_count"
),
func.sum(case((Item.not_purchased == True, 1), else_=0)).label(
"not_purchased_count"
),
)
.filter(Item.list_id.in_(all_ids))
.group_by(Item.list_id)
.all()
)
stats_map = {
s.list_id: (
s.total_count or 0,
s.purchased_count or 0,
s.not_purchased_count or 0,
)
for s in stats
}
latest_expenses_map = dict(
db.session.query(
Expense.list_id, func.coalesce(func.sum(Expense.amount), 0)
)
.filter(Expense.list_id.in_(all_ids))
.group_by(Expense.list_id)
.all()
)
for l in all_lists:
total_count, purchased_count, not_purchased_count = stats_map.get(
l.id, (0, 0, 0)
)
l.total_count = total_count
l.purchased_count = purchased_count
l.not_purchased_count = not_purchased_count
l.total_expense = latest_expenses_map.get(l.id, 0)
l.category_badges = [
{"name": c.name, "color": category_to_color(c.name)}
for c in l.categories
]
else:
for l in all_lists:
l.total_count = 0
l.purchased_count = 0
l.not_purchased_count = 0
l.total_expense = 0
l.category_badges = []
return render_template(
"main.html",
user_lists=user_lists,
public_lists=public_lists,
archived_lists=archived_lists,
now=now,
timedelta=timedelta,
month_options=month_options,
selected_month=month_str,
)
@app.route("/system-auth", methods=["GET", "POST"])
def system_auth():
if (
current_user.is_authenticated
or request.cookies.get("authorized") == AUTHORIZED_COOKIE_VALUE
):
flash("Jesteś już zalogowany lub autoryzowany.", "info")
return redirect(url_for("main_page"))
ip = request.access_route[0]
next_page = request.args.get("next") or url_for("main_page")
if is_ip_blocked(ip):
flash(
"Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.",
"danger",
)
return render_template("system_auth.html"), 403
if request.method == "POST":
if request.form["password"] == SYSTEM_PASSWORD:
reset_failed_attempts(ip)
resp = redirect(next_page)
return set_authorized_cookie(resp)
else:
register_failed_attempt(ip)
if is_ip_blocked(ip):
flash(
"Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.",
"danger",
)
return render_template("system_auth.html"), 403
remaining = attempts_remaining(ip)
flash(f"Nieprawidłowe hasło. Pozostało {remaining} prób.", "warning")
return render_template("system_auth.html")
@app.route("/toggle_archive_list/<int:list_id>")
@login_required
def toggle_archive_list(list_id):
l = db.session.get(ShoppingList, list_id)
if l is None:
abort(404)
if l.owner_id != current_user.id:
return redirect_with_flash("Nie masz uprawnień do tej listy", "danger")
archive = request.args.get("archive", "true").lower() == "true"
if archive:
l.is_archived = True
flash(f"Lista „{l.title}” została zarchiwizowana.", "success")
else:
l.is_archived = False
flash(f"Lista „{l.title}” została przywrócona.", "success")
db.session.commit()
return redirect(url_for("main_page"))
@app.route("/edit_my_list/<int:list_id>", methods=["GET", "POST"])
@login_required
def edit_my_list(list_id):
receipts = (
Receipt.query.filter_by(list_id=list_id)
.order_by(Receipt.uploaded_at.desc())
.all()
)
l = db.session.get(ShoppingList, list_id)
if l is None:
abort(404)
if l.owner_id != current_user.id:
abort(403, description="Nie jesteś właścicielem tej listy.")
categories = Category.query.order_by(Category.name.asc()).all()
selected_categories_ids = {c.id for c in l.categories}
next_page = request.args.get("next") or request.referrer
if request.method == "POST":
move_to_month = request.form.get("move_to_month")
if move_to_month:
try:
year, month = map(int, move_to_month.split("-"))
new_created_at = datetime(year, month, 1, tzinfo=timezone.utc)
l.created_at = new_created_at
db.session.commit()
flash(
f"Zmieniono datę utworzenia listy na {new_created_at.strftime('%Y-%m-%d')}",
"success",
)
return redirect(next_page or url_for("main_page"))
except ValueError:
flash("Nieprawidłowy format miesiąca", "danger")
return redirect(next_page or url_for("main_page"))
new_title = request.form.get("title", "").strip()
is_public = "is_public" in request.form
is_temporary = "is_temporary" in request.form
is_archived = "is_archived" in request.form
expires_date = request.form.get("expires_date")
expires_time = request.form.get("expires_time")
if not new_title:
flash("Podaj poprawny tytuł", "danger")
return redirect(next_page or url_for("main_page"))
l.title = new_title
l.is_public = is_public
l.is_temporary = is_temporary
l.is_archived = is_archived
if expires_date and expires_time:
try:
combined = f"{expires_date} {expires_time}"
expires_dt = datetime.strptime(combined, "%Y-%m-%d %H:%M")
l.expires_at = expires_dt.replace(tzinfo=timezone.utc)
except ValueError:
flash("Błędna data lub godzina wygasania", "danger")
return redirect(next_page or url_for("main_page"))
else:
l.expires_at = None
update_list_categories_from_form(l, request.form)
db.session.commit()
flash("Zaktualizowano dane listy", "success")
return redirect(next_page or url_for("main_page"))
return render_template(
"edit_my_list.html",
list=l,
receipts=receipts,
categories=categories,
selected_categories=selected_categories_ids,
)
@app.route("/delete_user_list/<int:list_id>", methods=["POST"])
@login_required
def delete_user_list(list_id):
l = db.session.get(ShoppingList, list_id)
if l is None or l.owner_id != current_user.id:
abort(403, description="Nie jesteś właścicielem tej listy.")
l = db.session.get(ShoppingList, list_id)
if l is None or l.owner_id != current_user.id:
abort(403)
delete_receipts_for_list(list_id)
Item.query.filter_by(list_id=list_id).delete()
Expense.query.filter_by(list_id=list_id).delete()
db.session.delete(l)
db.session.commit()
flash("Lista została usunięta", "success")
return redirect(url_for("main_page"))
@app.route("/toggle_visibility/<int:list_id>", methods=["GET", "POST"])
@login_required
def toggle_visibility(list_id):
l = db.session.get(ShoppingList, list_id)
if l is None:
abort(404)
if l.owner_id != current_user.id:
if request.is_json or request.method == "POST":
return {"error": "Unauthorized"}, 403
flash("Nie masz uprawnień do tej listy", "danger")
return redirect(url_for("main_page"))
l.is_public = not l.is_public
db.session.commit()
share_url = f"{request.url_root}share/{l.share_token}"
if request.is_json or request.method == "POST":
return {"is_public": l.is_public, "share_url": share_url}
if l.is_public:
flash("Lista została udostępniona publicznie", "success")
else:
flash("Lista została ukryta przed gośćmi", "info")
return redirect(url_for("main_page"))
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username_input = request.form["username"].lower()
user = User.query.filter(func.lower(User.username) == username_input).first()
if user and check_password(user.password_hash, request.form["password"]):
session.permanent = True
login_user(user)
session.modified = True
flash("Zalogowano pomyślnie", "success")
return redirect(url_for("main_page"))
flash("Nieprawidłowy login lub hasło", "danger")
return render_template("login.html")
@app.route("/logout")
@login_required
def logout():
logout_user()
flash("Wylogowano pomyślnie", "success")
return redirect(url_for("main_page"))
@app.route("/create", methods=["POST"])
@login_required
def create_list():
title = request.form.get("title")
is_temporary = request.form.get("temporary") == "1"
token = generate_share_token(8)
expires_at = (
datetime.now(timezone.utc) + timedelta(days=7) if is_temporary else None
)
new_list = ShoppingList(
title=title,
owner_id=current_user.id,
is_temporary=is_temporary,
share_token=token,
expires_at=expires_at,
)
db.session.add(new_list)
db.session.commit()
flash("Utworzono nową listę", "success")
return redirect(url_for("view_list", list_id=new_list.id))
@app.route("/list/<int:list_id>")
@login_required
def view_list(list_id):
shopping_list, items, receipt_files, expenses, total_expense = get_list_details(
list_id
)
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
is_owner = current_user.id == shopping_list.owner_id
for item in items:
if item.added_by != shopping_list.owner_id:
item.added_by_display = (
item.added_by_user.username if item.added_by_user else "?"
)
else:
item.added_by_display = None
shopping_list.category_badges = [
{"name": c.name, "color": category_to_color(c.name)}
for c in shopping_list.categories
]
return render_template(
"list.html",
list=shopping_list,
items=items,
receipt_files=receipt_files,
total_count=total_count,
purchased_count=purchased_count,
percent=percent,
expenses=expenses,
total_expense=total_expense,
is_share=False,
is_owner=is_owner,
)
@app.route("/expenses")
@login_required
def expenses():
start_date_str = request.args.get("start_date")
end_date_str = request.args.get("end_date")
category_id = request.args.get("category_id", type=int)
show_all = request.args.get("show_all", "true").lower() == "true"
categories = (
Category.query.join(
shopping_list_category, shopping_list_category.c.category_id == Category.id
)
.join(
ShoppingList, ShoppingList.id == shopping_list_category.c.shopping_list_id
)
.join(Expense, Expense.list_id == ShoppingList.id)
.filter(
or_(
ShoppingList.owner_id == current_user.id,
(
ShoppingList.is_public == True
if show_all
else ShoppingList.owner_id == current_user.id
),
)
)
.distinct()
.order_by(Category.name.asc())
.all()
)
categories.append(SimpleNamespace(id="none", name="Bez kategorii"))
start = None
end = None
expenses_query = Expense.query.options(
joinedload(Expense.shopping_list).joinedload(ShoppingList.owner),
joinedload(Expense.shopping_list).joinedload(ShoppingList.expenses),
joinedload(Expense.shopping_list).joinedload(ShoppingList.categories),
).join(ShoppingList, Expense.list_id == ShoppingList.id)
if not show_all:
expenses_query = expenses_query.filter(ShoppingList.owner_id == current_user.id)
else:
expenses_query = expenses_query.filter(
or_(
ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True
)
)
if category_id:
if str(category_id) == "none": # Bez kategorii
lists_query = lists_query.filter(~ShoppingList.categories.any())
else:
lists_query = lists_query.join(
shopping_list_category,
shopping_list_category.c.shopping_list_id == ShoppingList.id,
).filter(shopping_list_category.c.category_id == category_id)
if start_date_str and end_date_str:
try:
start = datetime.strptime(start_date_str, "%Y-%m-%d")
end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1)
expenses_query = expenses_query.filter(
Expense.added_at >= start, Expense.added_at < end
)
except ValueError:
flash("Błędny zakres dat", "danger")
expenses = expenses_query.order_by(Expense.added_at.desc()).all()
list_ids = {e.list_id for e in expenses}
totals_map = {}
if list_ids:
totals = (
db.session.query(
Expense.list_id, func.sum(Expense.amount).label("total_expense")
)
.filter(Expense.list_id.in_(list_ids))
.group_by(Expense.list_id)
.all()
)
totals_map = {t.list_id: t.total_expense or 0 for t in totals}
expense_table = [
{
"title": e.shopping_list.title if e.shopping_list else "Nieznana",
"amount": e.amount,
"added_at": e.added_at,
}
for e in expenses
]
lists_data = [
{
"id": l.id,
"title": l.title,
"created_at": l.created_at,
"total_expense": totals_map.get(l.id, 0),
"owner_username": l.owner.username if l.owner else "?",
"categories": [c.id for c in l.categories],
}
for l in {e.shopping_list for e in expenses if e.shopping_list}
]
return render_template(
"expenses.html",
expense_table=expense_table,
lists_data=lists_data,
categories=categories,
selected_category=category_id,
show_all=show_all,
)
@app.route("/expenses_data")
@login_required
def expenses_data():
range_type = request.args.get("range", "monthly")
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
show_all = request.args.get("show_all", "true").lower() == "true"
category_id = request.args.get("category_id")
by_category = request.args.get("by_category", "false").lower() == "true"
if by_category:
result = get_total_expenses_grouped_by_category(
show_all=show_all,
range_type=range_type,
start_date=start_date,
end_date=end_date,
user_id=current_user.id,
category_id=category_id,
)
else:
result = get_total_expenses_grouped_by_list_created_at(
user_only=True,
admin=False,
show_all=show_all,
range_type=range_type,
start_date=start_date,
end_date=end_date,
user_id=current_user.id,
category_id=category_id,
)
if "error" in result:
return jsonify({"error": result["error"]}), 400
return jsonify(result)
@app.route("/share/<token>")
@app.route("/guest-list/<int:list_id>")
def shared_list(token=None, list_id=None):
if token:
shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
if not check_list_public(shopping_list):
return redirect(url_for("main_page"))
list_id = shopping_list.id
total_expense = get_total_expense_for_list(list_id)
shopping_list, items, receipt_files, expenses, total_expense = get_list_details(
list_id
)
shopping_list.category_badges = [
{"name": c.name, "color": category_to_color(c.name)}
for c in shopping_list.categories
]
for item in items:
if item.added_by != shopping_list.owner_id:
item.added_by_display = (
item.added_by_user.username if item.added_by_user else "?"
)
else:
item.added_by_display = None
return render_template(
"list_share.html",
list=shopping_list,
items=items,
receipt_files=receipt_files,
expenses=expenses,
total_expense=total_expense,
is_share=True,
)
@app.route("/copy/<int:list_id>")
@login_required
def copy_list(list_id):
original = ShoppingList.query.get_or_404(list_id)
token = generate_share_token(8)
new_list = ShoppingList(
title=original.title + " (Kopia)", owner_id=current_user.id, share_token=token
)
db.session.add(new_list)
db.session.commit()
original_items = Item.query.filter_by(list_id=original.id).all()
for item in original_items:
copy_item = Item(list_id=new_list.id, name=item.name)
db.session.add(copy_item)
db.session.commit()
flash("Skopiowano listę", "success")
return redirect(url_for("view_list", list_id=new_list.id))
@app.route("/suggest_products")
def suggest_products():
query = request.args.get("q", "")
suggestions = []
if query:
suggestions = (
SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f"%{query}%"))
.limit(5)
.all()
)
return {"suggestions": [s.name for s in suggestions]}
@app.route("/all_products")
def all_products():
sort = request.args.get("sort", "popularity")
limit = request.args.get("limit", type=int) or 100
offset = request.args.get("offset", type=int) or 0
ItemAlias = aliased(Item)
SuggestedAlias = aliased(SuggestedProduct)
base_query = db.session.query(
func.lower(func.trim(ItemAlias.name)).label("normalized_name"),
func.count(func.distinct(ItemAlias.list_id)).label("count"),
func.min(ItemAlias.name).label("original_name")
).join(
SuggestedAlias,
func.lower(func.trim(ItemAlias.name)) == func.lower(func.trim(SuggestedAlias.name))
).group_by("normalized_name")
if sort == "popularity":
base_query = base_query.order_by(func.count(func.distinct(ItemAlias.list_id)).desc(), "normalized_name")
else:
base_query = base_query.order_by("normalized_name")
results = base_query.offset(offset).limit(limit).all()
total_count = db.session.query(func.count()).select_from(
base_query.subquery()
).scalar()
products = [{"name": row.original_name, "count": row.count} for row in results]
return jsonify({
"products": products,
"total_count": total_count
})
@app.route("/upload_receipt/<int:list_id>", methods=["POST"])
@login_required
def upload_receipt(list_id):
l = db.session.get(ShoppingList, list_id)
if "receipt" not in request.files:
return receipt_error("Brak pliku")
file = request.files["receipt"]
if file.filename == "":
return receipt_error("Nie wybrano pliku")
if file and allowed_file(file.filename):
file_bytes = file.read()
file.seek(0)
file_hash = hashlib.sha256(file_bytes).hexdigest()
existing = Receipt.query.filter_by(file_hash=file_hash).first()
if existing:
return receipt_error("Taki plik już istnieje")
now = datetime.now(timezone.utc)
timestamp = now.strftime("%Y%m%d_%H%M")
random_part = secrets.token_hex(3)
webp_filename = f"list_{list_id}_{timestamp}_{random_part}.webp"
file_path = os.path.join(app.config["UPLOAD_FOLDER"], webp_filename)
try:
if file.filename.lower().endswith(".pdf"):
file.seek(0)
save_pdf_as_webp(file, file_path)
else:
save_resized_image(file, file_path)
except ValueError as e:
return receipt_error(str(e))
filesize = os.path.getsize(file_path)
uploaded_at = datetime.now(timezone.utc)
new_receipt = Receipt(
list_id=list_id,
filename=webp_filename,
filesize=filesize,
uploaded_at=uploaded_at,
file_hash=file_hash,
)
db.session.add(new_receipt)
db.session.commit()
if (
request.is_json
or request.headers.get("X-Requested-With") == "XMLHttpRequest"
):
url = url_for("uploaded_file", filename=webp_filename)
socketio.emit("receipt_added", {"url": url}, to=str(list_id))
return jsonify({"success": True, "url": url})
flash("Wgrano paragon", "success")
return redirect(request.referrer or url_for("main_page"))
return receipt_error("Niedozwolony format pliku")
@app.route("/uploads/<filename>")
def uploaded_file(filename):
response = send_from_directory(app.config["UPLOAD_FOLDER"], filename)
response.headers["Cache-Control"] = app.config["UPLOADS_CACHE_CONTROL"]
response.headers.pop("Pragma", None)
response.headers.pop("Content-Disposition", None)
mime, _ = mimetypes.guess_type(filename)
if mime:
response.headers["Content-Type"] = mime
return response
@app.route("/reorder_items", methods=["POST"])
@login_required
def reorder_items():
data = request.get_json()
list_id = data.get("list_id")
order = data.get("order")
for index, item_id in enumerate(order):
item = db.session.get(Item, item_id)
if item and item.list_id == list_id:
item.position = index
db.session.commit()
socketio.emit(
"items_reordered", {"list_id": list_id, "order": order}, to=str(list_id)
)
return jsonify(success=True)
@app.route("/rotate_receipt/<int:receipt_id>")
@login_required
def rotate_receipt_user(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
list_obj = ShoppingList.query.get_or_404(receipt.list_id)
if not (current_user.is_admin or current_user.id == list_obj.owner_id):
flash("Brak uprawnień do tej operacji", "danger")
return redirect(url_for("main_page"))
try:
rotate_receipt_by_id(receipt_id)
recalculate_filesizes(receipt_id)
flash("Obrócono paragon", "success")
except FileNotFoundError:
flash("Plik nie istnieje", "danger")
except Exception as e:
flash(f"Błąd przy obracaniu: {str(e)}", "danger")
return redirect(request.referrer or url_for("main_page"))
@app.route("/delete_receipt/<int:receipt_id>")
@login_required
def delete_receipt_user(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
list_obj = ShoppingList.query.get_or_404(receipt.list_id)
if not (current_user.is_admin or current_user.id == list_obj.owner_id):
flash("Brak uprawnień do tej operacji", "danger")
return redirect(url_for("main_page"))
try:
delete_receipt_by_id(receipt_id)
flash("Paragon usunięty", "success")
except Exception as e:
flash(f"Błąd przy usuwaniu pliku: {str(e)}", "danger")
return redirect(request.referrer or url_for("main_page"))
# OCR
@app.route("/lists/<int:list_id>/analyze", methods=["POST"])
@login_required
def analyze_receipts_for_list(list_id):
receipt_objs = Receipt.query.filter_by(list_id=list_id).all()
existing_expenses = {
e.receipt_filename
for e in Expense.query.filter_by(list_id=list_id).all()
if e.receipt_filename
}
results = []
total = 0.0
for receipt in receipt_objs:
filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
if not os.path.exists(filepath):
continue
try:
raw_image = Image.open(filepath).convert("RGB")
image = preprocess_image_for_tesseract(raw_image)
value, lines = extract_total_tesseract(image)
except Exception as e:
print(f"OCR error for {receipt.filename}:\n{traceback.format_exc()}")
value = 0.0
lines = []
already_added = receipt.filename in existing_expenses
results.append(
{
"id": receipt.id,
"filename": receipt.filename,
"amount": round(value, 2),
"debug_text": lines,
"already_added": already_added,
}
)
if not already_added:
total += value
return jsonify({"results": results, "total": round(total, 2)})
@app.route("/user_crop_receipt", methods=["POST"])
@login_required
def crop_receipt_user():
receipt_id = request.form.get("receipt_id")
file = request.files.get("cropped_image")
receipt = Receipt.query.get_or_404(receipt_id)
list_obj = ShoppingList.query.get_or_404(receipt.list_id)
if list_obj.owner_id != current_user.id and not current_user.is_admin:
return jsonify(success=False, error="Brak dostępu"), 403
result = handle_crop_receipt(receipt_id, file)
return jsonify(result)
@app.route("/admin")
@login_required
@admin_required
def admin_panel():
month_str = request.args.get("m")
if not month_str:
month_str = datetime.now(timezone.utc).strftime("%Y-%m")
show_all = month_str == "all"
if not show_all:
try:
if month_str:
year, month = map(int, month_str.split("-"))
now = datetime(year, month, 1, tzinfo=timezone.utc)
else:
now = datetime.now(timezone.utc)
month_str = now.strftime("%Y-%m")
except Exception:
now = datetime.now(timezone.utc)
month_str = now.strftime("%Y-%m")
start = now
end = (start + timedelta(days=31)).replace(day=1)
else:
now = datetime.now(timezone.utc)
start = end = None
# Liczniki globalne
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
base_query = ShoppingList.query.options(
joinedload(ShoppingList.owner),
joinedload(ShoppingList.items),
joinedload(ShoppingList.receipts),
joinedload(ShoppingList.expenses),
joinedload(ShoppingList.categories),
)
if not show_all and start and end:
base_query = base_query.filter(
ShoppingList.created_at >= start, ShoppingList.created_at < end
)
all_lists = base_query.all()
# tylko listy z danych miesięcy
month_options = get_active_months_query()
all_ids = [l.id for l in all_lists]
stats_map = {}
latest_expenses_map = {}
if all_ids:
# Statystyki produktów
stats = (
db.session.query(
Item.list_id,
func.count(Item.id).label("total_count"),
func.sum(case((Item.purchased == True, 1), else_=0)).label(
"purchased_count"
),
)
.filter(Item.list_id.in_(all_ids))
.group_by(Item.list_id)
.all()
)
stats_map = {
s.list_id: (s.total_count or 0, s.purchased_count or 0) for s in stats
}
latest_expenses_map = dict(
db.session.query(
Expense.list_id, func.coalesce(func.sum(Expense.amount), 0)
)
.filter(Expense.list_id.in_(all_ids))
.group_by(Expense.list_id)
.all()
)
enriched_lists = []
for l in all_lists:
total_count, purchased_count = stats_map.get(l.id, (0, 0))
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
comments_count = sum(1 for i in l.items if i.note and i.note.strip() != "")
receipts_count = len(l.receipts)
total_expense = latest_expenses_map.get(l.id, 0)
if l.is_temporary and l.expires_at:
expires_at = l.expires_at
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=timezone.utc)
is_expired = expires_at < now
else:
is_expired = False
enriched_lists.append(
{
"list": l,
"total_count": total_count,
"purchased_count": purchased_count,
"percent": round(percent),
"comments_count": comments_count,
"receipts_count": receipts_count,
"total_expense": total_expense,
"expired": is_expired,
"categories": l.categories,
}
)
top_products = (
db.session.query(Item.name, func.count(Item.id).label("count"))
.filter(Item.purchased.is_(True))
.group_by(Item.name)
.order_by(func.count(Item.id).desc())
.limit(5)
.all()
)
purchased_items_count = Item.query.filter_by(purchased=True).count()
expense_summary = get_admin_expense_summary()
process = psutil.Process(os.getpid())
app_mem = process.memory_info().rss // (1024 * 1024)
db_engine = db.engine
db_info = {
"engine": db_engine.name,
"version": getattr(db_engine.dialect, "server_version_info", None),
"url": str(db_engine.url).split("?")[0],
}
inspector = inspect(db_engine)
table_count = len(inspector.get_table_names())
record_total = get_total_records()
uptime_minutes = int(
(datetime.now(timezone.utc) - app_start_time).total_seconds() // 60
)
return render_template(
"admin/admin_panel.html",
user_count=user_count,
list_count=list_count,
item_count=item_count,
purchased_items_count=purchased_items_count,
enriched_lists=enriched_lists,
top_products=top_products,
expense_summary=expense_summary,
now=now,
python_version=sys.version,
system_info=platform.platform(),
app_memory=f"{app_mem} MB",
db_info=db_info,
table_count=table_count,
record_total=record_total,
uptime_minutes=uptime_minutes,
timedelta=timedelta,
show_all=show_all,
month_str=month_str,
month_options=month_options,
)
@app.route("/admin/delete_list/<int:list_id>")
@login_required
@admin_required
def delete_list(list_id):
delete_receipts_for_list(list_id)
list_to_delete = ShoppingList.query.get_or_404(list_id)
Item.query.filter_by(list_id=list_to_delete.id).delete()
Expense.query.filter_by(list_id=list_to_delete.id).delete()
db.session.delete(list_to_delete)
db.session.commit()
flash(f"Usunięto listę: {list_to_delete.title}", "success")
return redirect(url_for("admin_panel"))
@app.route("/admin/add_user", methods=["POST"])
@login_required
@admin_required
def add_user():
username = request.form["username"].lower()
password = request.form["password"]
if not username or not password:
flash("Wypełnij wszystkie pola", "danger")
return redirect(url_for("list_users"))
if User.query.filter(func.lower(User.username) == username).first():
flash("Użytkownik o takiej nazwie już istnieje", "warning")
return redirect(url_for("list_users"))
hashed_password = hash_password(password)
new_user = User(username=username, password_hash=hashed_password)
db.session.add(new_user)
db.session.commit()
flash("Dodano nowego użytkownika", "success")
return redirect(url_for("list_users"))
@app.route("/admin/users")
@login_required
@admin_required
def list_users():
users = User.query.all()
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"]
return render_template(
"admin/user_management.html",
users=users,
user_count=user_count,
list_count=list_count,
item_count=item_count,
activity_log=activity_log,
)
@app.route("/admin/change_password/<int:user_id>", methods=["POST"])
@login_required
@admin_required
def reset_password(user_id):
user = User.query.get_or_404(user_id)
new_password = request.form["password"]
if not new_password:
flash("Podaj nowe hasło", "danger")
return redirect(url_for("list_users"))
user.password_hash = hash_password(new_password)
db.session.commit()
flash(f"Hasło dla użytkownika {user.username} zostało zaktualizowane", "success")
return redirect(url_for("list_users"))
@app.route("/admin/delete_user/<int:user_id>")
@login_required
@admin_required
def delete_user(user_id):
user = User.query.get_or_404(user_id)
if user.is_admin:
admin_count = User.query.filter_by(is_admin=True).count()
if admin_count <= 1:
flash("Nie można usunąć ostatniego administratora.", "danger")
return redirect(url_for("list_users"))
db.session.delete(user)
db.session.commit()
flash("Użytkownik usunięty", "success")
return redirect(url_for("list_users"))
@app.route("/admin/receipts/<id>")
@login_required
@admin_required
def admin_receipts(id):
try:
page = request.args.get("page", 1, type=int)
per_page = request.args.get("per_page", 24, type=int)
per_page = max(1, min(per_page, 200)) # sanity check
if id == "all":
all_filenames = {r.filename for r in Receipt.query.all()}
pagination = Receipt.query.order_by(Receipt.uploaded_at.desc()).paginate(
page=page, per_page=per_page, error_out=False
)
receipts_paginated = pagination.items
total_pages = pagination.pages
upload_folder = app.config["UPLOAD_FOLDER"]
files_on_disk = set(os.listdir(upload_folder))
orphan_files = [
f
for f in files_on_disk
if f.endswith(".webp")
and f not in all_filenames
and f.startswith("list_")
]
else:
list_id = int(id)
receipts_paginated = (
Receipt.query.filter_by(list_id=list_id)
.order_by(Receipt.uploaded_at.desc())
.all()
)
orphan_files = []
page = 1
total_pages = 1
per_page = len(receipts_paginated) or 1
except ValueError:
flash("Nieprawidłowe ID listy.", "danger")
return redirect(url_for("admin_panel"))
query_string = urlencode({k: v for k, v in request.args.items() if k != "page"})
return render_template(
"admin/receipts.html",
receipts=receipts_paginated,
orphan_files=orphan_files,
orphan_files_count=len(orphan_files),
page=page,
per_page=per_page,
total_pages=total_pages,
id=id,
query_string=query_string,
)
@app.route("/admin/rotate_receipt/<int:receipt_id>")
@login_required
@admin_required
def rotate_receipt(receipt_id):
try:
rotate_receipt_by_id(receipt_id)
recalculate_filesizes(receipt_id)
flash("Obrócono paragon", "success")
except FileNotFoundError:
flash("Plik nie istnieje", "danger")
except Exception as e:
flash(f"Błąd przy obracaniu: {str(e)}", "danger")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
@app.route("/admin/delete_receipt/<int:receipt_id>")
@app.route("/admin/delete_receipt/orphan/<path:filename>")
@login_required
@admin_required
def delete_receipt(receipt_id=None, filename=None):
if filename: # tryb orphan
safe_filename = os.path.basename(filename)
if Receipt.query.filter_by(filename=safe_filename).first():
flash("Nie można usunąć pliku powiązanego z bazą!", "danger")
else:
file_path = os.path.join(app.config["UPLOAD_FOLDER"], safe_filename)
if os.path.exists(file_path):
try:
os.remove(file_path)
flash(f"Usunięto plik: {safe_filename}", "success")
except Exception as e:
flash(f"Błąd przy usuwaniu pliku: {e}", "danger")
else:
flash("Plik już nie istnieje.", "warning")
return redirect(url_for("admin_receipts", id="all"))
try:
delete_receipt_by_id(receipt_id)
flash("Paragon usunięty", "success")
except Exception as e:
flash(f"Błąd przy usuwaniu pliku: {str(e)}", "danger")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
@app.route("/admin/rename_receipt/<int:receipt_id>")
@login_required
@admin_required
def rename_receipt(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
if not os.path.exists(old_path):
flash("Plik nie istnieje", "danger")
return redirect(request.referrer)
new_filename = generate_new_receipt_filename(receipt.list_id)
new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename)
try:
os.rename(old_path, new_path)
receipt.filename = new_filename
db.session.flush()
recalculate_filesizes(receipt.id)
db.session.commit()
flash("Zmieniono nazwę pliku", "success")
except Exception as e:
flash(f"Błąd przy zmianie nazwy: {str(e)}", "danger")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
@app.route("/admin/generate_receipt_hash/<int:receipt_id>")
@login_required
@admin_required
def generate_receipt_hash(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
if receipt.file_hash:
flash("Hash już istnieje", "info")
return redirect(request.referrer)
file_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
if not os.path.exists(file_path):
flash("Plik nie istnieje", "danger")
return redirect(request.referrer)
try:
with open(file_path, "rb") as f:
file_hash = hashlib.sha256(f.read()).hexdigest()
receipt.file_hash = file_hash
db.session.commit()
flash("Hash wygenerowany", "success")
except Exception as e:
flash(f"Błąd przy generowaniu hasha: {e}", "danger")
return redirect(request.referrer)
@app.route("/admin/delete_selected_lists", methods=["POST"])
@login_required
@admin_required
def delete_selected_lists():
ids = request.form.getlist("list_ids")
for list_id in ids:
lst = db.session.get(ShoppingList, int(list_id))
if lst:
delete_receipts_for_list(lst.id)
Item.query.filter_by(list_id=lst.id).delete()
Expense.query.filter_by(list_id=lst.id).delete()
db.session.delete(lst)
db.session.commit()
flash("Usunięto wybrane listy", "success")
return redirect(url_for("admin_panel"))
@app.route("/admin/edit_list/<int:list_id>", methods=["GET", "POST"])
@login_required
@admin_required
def edit_list(list_id):
l = db.session.get(
ShoppingList,
list_id,
options=[
joinedload(ShoppingList.expenses),
joinedload(ShoppingList.receipts),
joinedload(ShoppingList.owner),
joinedload(ShoppingList.items),
joinedload(ShoppingList.categories),
],
)
if l is None:
abort(404)
total_expense = get_total_expense_for_list(l.id)
categories = Category.query.order_by(Category.name.asc()).all()
selected_categories_ids = {c.id for c in l.categories}
if request.method == "POST":
action = request.form.get("action")
if action == "save":
new_title = request.form.get("title", "").strip()
new_amount_str = request.form.get("amount")
is_archived = "archived" in request.form
is_public = "public" in request.form
is_temporary = "temporary" in request.form
new_owner_id = request.form.get("owner_id")
expires_date = request.form.get("expires_date")
expires_time = request.form.get("expires_time")
if new_title:
l.title = new_title
l.is_archived = is_archived
l.is_public = is_public
l.is_temporary = is_temporary
if expires_date and expires_time:
try:
combined_str = f"{expires_date} {expires_time}"
dt = datetime.strptime(combined_str, "%Y-%m-%d %H:%M")
l.expires_at = dt.replace(tzinfo=timezone.utc)
except ValueError:
flash("Niepoprawna data lub godzina wygasania", "danger")
return redirect(url_for("edit_list", list_id=list_id))
else:
l.expires_at = None
if new_owner_id:
try:
new_owner_id_int = int(new_owner_id)
user_obj = db.session.get(User, new_owner_id_int)
if user_obj:
l.owner_id = new_owner_id_int
else:
flash("Wybrany użytkownik nie istnieje", "danger")
return redirect(url_for("edit_list", list_id=list_id))
except ValueError:
flash("Niepoprawny ID użytkownika", "danger")
return redirect(url_for("edit_list", list_id=list_id))
if new_amount_str:
try:
new_amount = float(new_amount_str)
for expense in l.expenses:
db.session.delete(expense)
db.session.commit()
db.session.add(Expense(list_id=list_id, amount=new_amount))
except ValueError:
flash("Niepoprawna kwota", "danger")
return redirect(url_for("edit_list", list_id=list_id))
created_month = request.form.get("created_month")
if created_month:
try:
year, month = map(int, created_month.split("-"))
l.created_at = datetime(year, month, 1, tzinfo=timezone.utc)
except ValueError:
flash(
"Nieprawidłowy format miesiąca (przeniesienie daty utworzenia)",
"danger",
)
return redirect(url_for("edit_list", list_id=list_id))
update_list_categories_from_form(l, request.form)
db.session.add(l)
db.session.commit()
flash("Zapisano zmiany listy", "success")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "add_item":
item_name = request.form.get("item_name", "").strip()
quantity_str = request.form.get("quantity", "1")
if not item_name:
flash("Podaj nazwę produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
try:
quantity = max(1, int(quantity_str))
except ValueError:
quantity = 1
db.session.add(
Item(
list_id=list_id,
name=item_name,
quantity=quantity,
added_by=current_user.id,
)
)
exists = (
db.session.query(SuggestedProduct)
.filter(func.lower(SuggestedProduct.name) == item_name.lower())
.first()
)
if not exists:
db.session.add(SuggestedProduct(name=item_name))
db.session.commit()
flash("Dodano produkt", "success")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "delete_item":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
db.session.delete(item)
db.session.commit()
flash("Usunięto produkt", "success")
else:
flash("Nie znaleziono produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "toggle_purchased":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.purchased = not item.purchased
db.session.commit()
flash("Zmieniono status oznaczenia produktu", "success")
else:
flash("Nie znaleziono produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "mark_not_purchased":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.not_purchased = True
item.purchased = False
item.purchased_at = None
db.session.commit()
flash("Oznaczono produkt jako niekupione", "success")
else:
flash("Nie znaleziono produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "unmark_not_purchased":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.not_purchased = False
item.not_purchased_reason = None
item.purchased = False
item.purchased_at = None
db.session.commit()
flash("Przywrócono produkt do listy", "success")
else:
flash("Nie znaleziono produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "edit_quantity":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
try:
new_quantity = int(request.form.get("quantity"))
if new_quantity > 0:
item.quantity = new_quantity
db.session.commit()
flash("Zmieniono ilość produktu", "success")
except ValueError:
flash("Nieprawidłowa ilość", "danger")
else:
flash("Nie znaleziono produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
users = User.query.all()
items = l.items
receipts = l.receipts
return render_template(
"admin/edit_list.html",
list=l,
total_expense=total_expense,
users=users,
items=items,
receipts=receipts,
categories=categories,
selected_categories=selected_categories_ids,
)
@app.route("/admin/products")
@login_required
@admin_required
def list_products():
page = request.args.get("page", 1, type=int)
per_page = request.args.get("per_page", 100, type=int)
per_page = max(1, min(per_page, 300))
all_items = (
Item.query.options(
joinedload(Item.added_by_user),
)
.order_by(Item.id.desc())
.all()
)
seen_names = set()
unique_items = []
for item in all_items:
key = normalize_name(item.name)
if key not in seen_names:
unique_items.append(item)
seen_names.add(key)
usage_counts = dict(
db.session.query(
func.lower(Item.name),
func.coalesce(func.sum(Item.quantity), 0)
)
.group_by(func.lower(Item.name))
.all()
)
total_items = len(unique_items)
total_pages = (total_items + per_page - 1) // per_page
start = (page - 1) * per_page
end = start + per_page
items = unique_items[start:end]
user_ids = {item.added_by for item in items if item.added_by}
users = User.query.filter(User.id.in_(user_ids)).all() if user_ids else []
users_dict = {u.id: u.username for u in users}
suggestions = SuggestedProduct.query.all()
all_suggestions_dict = {
normalize_name(s.name): s
for s in suggestions
if s.name and s.name.strip()
}
used_suggestion_names = {normalize_name(i.name) for i in unique_items}
suggestions_dict = {
name: all_suggestions_dict[name]
for name in used_suggestion_names
if name in all_suggestions_dict
}
orphan_suggestions = [
s for name, s in all_suggestions_dict.items()
if name not in used_suggestion_names
]
query_string = urlencode({k: v for k, v in request.args.items() if k != "page"})
synced_names = set(suggestions_dict.keys())
return render_template(
"admin/list_products.html",
items=items,
users_dict=users_dict,
suggestions_dict=suggestions_dict,
orphan_suggestions=orphan_suggestions,
page=page,
per_page=per_page,
total_pages=total_pages,
query_string=query_string,
total_items=total_items,
usage_counts=usage_counts,
synced_names=synced_names
)
@app.route("/admin/sync_suggestion/<int:item_id>", methods=["POST"])
@login_required
def sync_suggestion_ajax(item_id):
if not current_user.is_admin:
return jsonify({"success": False, "message": "Brak uprawnień"}), 403
item = Item.query.get_or_404(item_id)
existing = SuggestedProduct.query.filter(
func.lower(SuggestedProduct.name) == item.name.lower()
).first()
if not existing:
new_suggestion = SuggestedProduct(name=item.name)
db.session.add(new_suggestion)
db.session.commit()
return jsonify(
{
"success": True,
"message": f"Utworzono sugestię dla produktu: {item.name}",
}
)
else:
return jsonify(
{
"success": True,
"message": f"Sugestia dla produktu „{item.name}” już istnieje.",
}
)
@app.route("/admin/delete_suggestion/<int:suggestion_id>", methods=["POST"])
@login_required
def delete_suggestion_ajax(suggestion_id):
if not current_user.is_admin:
return jsonify({"success": False, "message": "Brak uprawnień"}), 403
suggestion = SuggestedProduct.query.get_or_404(suggestion_id)
db.session.delete(suggestion)
db.session.commit()
return jsonify({"success": True, "message": "Sugestia została usunięta."})
@app.route("/admin/promote_user/<int:user_id>")
@login_required
@admin_required
def promote_user(user_id):
user = User.query.get_or_404(user_id)
user.is_admin = True
db.session.commit()
flash(f"Użytkownik {user.username} został ustawiony jako admin.", "success")
return redirect(url_for("list_users"))
@app.route("/admin/demote_user/<int:user_id>")
@login_required
@admin_required
def demote_user(user_id):
user = User.query.get_or_404(user_id)
if user.id == current_user.id:
flash("Nie możesz zdegradować samego siebie!", "danger")
return redirect(url_for("list_users"))
admin_count = User.query.filter_by(is_admin=True).count()
if admin_count <= 1 and user.is_admin:
flash(
"Nie można zdegradować. Musi pozostać co najmniej jeden administrator.",
"danger",
)
return redirect(url_for("list_users"))
user.is_admin = False
db.session.commit()
flash(f"Użytkownik {user.username} został zdegradowany.", "success")
return redirect(url_for("list_users"))
@app.route("/admin/crop_receipt", methods=["POST"])
@login_required
@admin_required
def crop_receipt_admin():
receipt_id = request.form.get("receipt_id")
file = request.files.get("cropped_image")
result = handle_crop_receipt(receipt_id, file)
return jsonify(result)
@app.route("/admin/recalculate_filesizes")
@login_required
@admin_required
def recalculate_filesizes_all():
updated, unchanged, not_found = recalculate_filesizes()
flash(
f"Zaktualizowano: {updated}, bez zmian: {unchanged}, brak pliku: {not_found}",
"success",
)
return redirect(url_for("admin_receipts", id="all"))
@app.route("/admin/mass_edit_categories", methods=["GET", "POST"])
@login_required
@admin_required
def admin_mass_edit_categories():
page = request.args.get("page", 1, type=int)
per_page = request.args.get("per_page", 50, type=int)
per_page = max(1, min(per_page, 200)) # ogranicz do sensownych wartości
lists_query = ShoppingList.query.options(
joinedload(ShoppingList.categories),
joinedload(ShoppingList.items),
joinedload(ShoppingList.owner),
).order_by(ShoppingList.created_at.desc())
pagination = lists_query.paginate(page=page, per_page=per_page, error_out=False)
lists = pagination.items
categories = Category.query.order_by(Category.name.asc()).all()
for l in lists:
l.total_count = len(l.items)
l.owner_name = l.owner.username if l.owner else "?"
l.category_count = len(l.categories)
if request.method == "POST":
for l in lists:
selected_ids = request.form.getlist(f"categories_{l.id}")
l.categories.clear()
if selected_ids:
cats = Category.query.filter(Category.id.in_(selected_ids)).all()
l.categories.extend(cats)
db.session.commit()
flash("Zaktualizowano kategorie dla wybranych list", "success")
return redirect(
url_for("admin_mass_edit_categories", page=page, per_page=per_page)
)
query_string = urlencode({k: v for k, v in request.args.items() if k != "page"})
return render_template(
"admin/mass_edit_categories.html",
lists=lists,
categories=categories,
page=page,
per_page=per_page,
total_pages=pagination.pages,
total_items=pagination.total,
query_string=query_string,
)
@app.route("/admin/list_items/<int:list_id>")
@login_required
@admin_required
def admin_list_items_json(list_id):
l = db.session.get(ShoppingList, list_id)
if not l:
return jsonify({"error": "Lista nie istnieje"}), 404
items = [
{
"name": item.name,
"quantity": item.quantity,
"purchased": item.purchased,
"not_purchased": item.not_purchased,
}
for item in l.items
]
purchased_count = sum(1 for item in l.items if item.purchased)
total_expense = sum(exp.amount for exp in l.expenses)
return jsonify(
{
"title": l.title,
"items": items,
"total_count": len(l.items),
"purchased_count": purchased_count,
"total_expense": round(total_expense, 2),
}
)
@app.route("/healthcheck")
def healthcheck():
header_token = request.headers.get("X-Internal-Check")
correct_token = app.config.get("HEALTHCHECK_TOKEN")
if header_token != correct_token:
abort(404)
return "OK", 200
@app.route("/robots.txt")
def robots_txt():
content = (
"User-agent: *\nDisallow: /"
if app.config.get("DISABLE_ROBOTS")
else "User-agent: *\nAllow: /"
)
return content, 200, {"Content-Type": "text/plain"}
# =========================================================================================
# SOCKET.IO
# =========================================================================================
@socketio.on("delete_item")
def handle_delete_item(data):
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
if item:
list_id = item.list_id
db.session.delete(item)
db.session.commit()
emit("item_deleted", {"item_id": item.id}, to=str(item.list_id))
purchased_count, total_count, percent = get_progress(list_id)
emit(
"progress_updated",
{
"purchased_count": purchased_count,
"total_count": total_count,
"percent": percent,
},
to=str(list_id),
)
@socketio.on("edit_item")
def handle_edit_item(data):
item = db.session.get(Item, data["item_id"])
new_name = data["new_name"]
new_quantity = data.get("new_quantity", item.quantity)
if item and new_name.strip():
item.name = new_name.strip()
try:
new_quantity = int(new_quantity)
if new_quantity < 1:
new_quantity = 1
except:
new_quantity = 1
item.quantity = new_quantity
db.session.commit()
emit(
"item_edited",
{"item_id": item.id, "new_name": item.name, "new_quantity": item.quantity},
to=str(item.list_id),
)
@socketio.on("join_list")
def handle_join(data):
global active_users
room = str(data["room"])
username = data.get("username", "Gość")
join_room(room)
if room not in active_users:
active_users[room] = set()
active_users[room].add(username)
shopping_list = db.session.get(ShoppingList, int(data["room"]))
list_title = shopping_list.title if shopping_list else "Twoja lista"
emit("user_joined", {"username": username}, to=room)
emit("user_list", {"users": list(active_users[room])}, to=room)
emit("joined_confirmation", {"room": room, "list_title": list_title})
@socketio.on("disconnect")
def handle_disconnect(sid):
global active_users
username = current_user.username if current_user.is_authenticated else "Gość"
for room, users in active_users.items():
if username in users:
users.remove(username)
emit("user_left", {"username": username}, to=room)
emit("user_list", {"users": list(users)}, to=room)
@socketio.on("add_item")
def handle_add_item(data):
list_id = data["list_id"]
name = data["name"].strip()
quantity = data.get("quantity", 1)
list_obj = db.session.get(ShoppingList, list_id)
if not list_obj:
return
try:
quantity = int(quantity)
if quantity < 1:
quantity = 1
except:
quantity = 1
existing_item = Item.query.filter(
Item.list_id == list_id,
func.lower(Item.name) == name.lower(),
Item.not_purchased == False,
).first()
if existing_item:
existing_item.quantity += quantity
db.session.commit()
emit(
"item_edited",
{
"item_id": existing_item.id,
"new_name": existing_item.name,
"new_quantity": existing_item.quantity,
},
to=str(list_id),
)
else:
max_position = (
db.session.query(func.max(Item.position))
.filter_by(list_id=list_id)
.scalar()
)
if max_position is None:
max_position = 0
user_id = current_user.id if current_user.is_authenticated else None
user_name = current_user.username if current_user.is_authenticated else "Gość"
new_item = Item(
list_id=list_id,
name=name,
quantity=quantity,
position=max_position + 1,
added_by=user_id,
)
db.session.add(new_item)
if not SuggestedProduct.query.filter(
func.lower(SuggestedProduct.name) == name.lower()
).first():
new_suggestion = SuggestedProduct(name=name)
db.session.add(new_suggestion)
db.session.commit()
emit(
"item_added",
{
"id": new_item.id,
"name": new_item.name,
"quantity": new_item.quantity,
"added_by": user_name,
"added_by_id": user_id,
"owner_id": list_obj.owner_id,
},
to=str(list_id),
include_self=True,
)
purchased_count, total_count, percent = get_progress(list_id)
emit(
"progress_updated",
{
"purchased_count": purchased_count,
"total_count": total_count,
"percent": percent,
},
to=str(list_id),
)
@socketio.on("check_item")
def handle_check_item(data):
item = db.session.get(Item, data["item_id"])
if item:
item.purchased = True
item.purchased_at = datetime.now(UTC)
db.session.commit()
purchased_count, total_count, percent = get_progress(item.list_id)
emit("item_checked", {"item_id": item.id}, to=str(item.list_id))
emit(
"progress_updated",
{
"purchased_count": purchased_count,
"total_count": total_count,
"percent": percent,
},
to=str(item.list_id),
)
@socketio.on("uncheck_item")
def handle_uncheck_item(data):
item = db.session.get(Item, data["item_id"])
if item:
item.purchased = False
item.purchased_at = None
db.session.commit()
purchased_count, total_count, percent = get_progress(item.list_id)
emit("item_unchecked", {"item_id": item.id}, to=str(item.list_id))
emit(
"progress_updated",
{
"purchased_count": purchased_count,
"total_count": total_count,
"percent": percent,
},
to=str(item.list_id),
)
@socketio.on("request_full_list")
def handle_request_full_list(data):
list_id = data["list_id"]
shopping_list = db.session.get(ShoppingList, list_id)
if not shopping_list:
return
owner_id = shopping_list.owner_id
items = (
Item.query.options(joinedload(Item.added_by_user))
.filter_by(list_id=list_id)
.order_by(Item.position.asc())
.all()
)
items_data = []
for item in items:
items_data.append(
{
"id": item.id,
"name": item.name,
"quantity": item.quantity,
"purchased": item.purchased if not item.not_purchased else False,
"not_purchased": item.not_purchased,
"not_purchased_reason": item.not_purchased_reason,
"note": item.note or "",
"added_by": item.added_by_user.username if item.added_by_user else None,
"added_by_id": item.added_by_user.id if item.added_by_user else None,
"owner_id": owner_id,
}
)
emit("full_list", {"items": items_data}, to=request.sid)
@socketio.on("update_note")
def handle_update_note(data):
item_id = data["item_id"]
note = data["note"]
item = Item.query.get(item_id)
if item:
item.note = note
db.session.commit()
emit("note_updated", {"item_id": item_id, "note": note}, to=str(item.list_id))
@socketio.on("add_expense")
def handle_add_expense(data):
list_id = data["list_id"]
amount = data["amount"]
receipt_filename = data.get("receipt_filename")
if receipt_filename:
existing = Expense.query.filter_by(
list_id=list_id, receipt_filename=receipt_filename
).first()
if existing:
return
new_expense = Expense(
list_id=list_id, amount=amount, receipt_filename=receipt_filename
)
db.session.add(new_expense)
db.session.commit()
total = (
db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar()
or 0
)
emit("expense_added", {"amount": amount, "total": total}, to=str(list_id))
@socketio.on("mark_not_purchased")
def handle_mark_not_purchased(data):
item = db.session.get(Item, data["item_id"])
reason = data.get("reason", "")
if item:
item.not_purchased = True
item.not_purchased_reason = reason
db.session.commit()
emit(
"item_marked_not_purchased",
{"item_id": item.id, "reason": reason},
to=str(item.list_id),
)
@socketio.on("unmark_not_purchased")
def handle_unmark_not_purchased(data):
item = db.session.get(Item, data["item_id"])
if item:
item.not_purchased = False
item.purchased = False
item.purchased_at = None
item.not_purchased_reason = None
db.session.commit()
emit("item_unmarked_not_purchased", {"item_id": item.id}, to=str(item.list_id))
@app.cli.command("db_info")
def create_db():
with app.app_context():
inspector = inspect(db.engine)
actual_tables = inspector.get_table_names()
table_count = len(actual_tables)
record_total = 0
with db.engine.connect() as conn:
for table in actual_tables:
try:
count = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar()
record_total += count
except Exception:
pass
print("\nStruktura bazy danych jest poprawna.")
print(f"Silnik: {db.engine.name}")
print(f"Liczba tabel: {table_count}")
print(f"Łączna liczba rekordów: {record_total}")
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO)
socketio.run(app, host="0.0.0.0", port=8000, debug=False)