Files
lista_zakupowa_live/app.py
Mateusz Gruszczyński db6f70349e ocr usprawnienia
2025-07-22 11:28:11 +02:00

2275 lines
70 KiB
Python

import os
import secrets
import time
import mimetypes
import sys
import platform
import psutil
import secrets
import hashlib
import re
import numpy as np
from pillow_heif import register_heif_opener
from datetime import datetime, timedelta, UTC, timezone
from flask import (
Flask,
render_template,
redirect,
url_for,
request,
flash,
Blueprint,
send_from_directory,
request,
abort,
session,
jsonify,
make_response,
)
from markupsafe import Markup
from flask_sqlalchemy import SQLAlchemy
from flask_login import (
LoginManager,
UserMixin,
login_user,
login_required,
logout_user,
current_user,
)
from flask_compress import Compress
from flask_socketio import SocketIO, emit, join_room
from werkzeug.security import generate_password_hash, check_password_hash
from config import Config
from PIL import Image, ExifTags, ImageFilter, ImageOps
from werkzeug.utils import secure_filename
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import func, extract
from collections import defaultdict, deque
from functools import wraps
# OCR
from collections import Counter
import pytesseract
from pytesseract import Output
app = Flask(__name__)
app.config.from_object(Config)
register_heif_opener() # pillow_heif dla HEIC
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp", "heic"}
SQLALCHEMY_ECHO = True
SYSTEM_PASSWORD = app.config.get("SYSTEM_PASSWORD", "changeme")
DEFAULT_ADMIN_USERNAME = app.config.get("DEFAULT_ADMIN_USERNAME", "admin")
DEFAULT_ADMIN_PASSWORD = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123")
UPLOAD_FOLDER = app.config.get("UPLOAD_FOLDER", "uploads")
AUTHORIZED_COOKIE_VALUE = app.config.get("AUTHORIZED_COOKIE_VALUE", "80d31cdfe63539c9")
AUTH_COOKIE_MAX_AGE = app.config.get("AUTH_COOKIE_MAX_AGE", 86400)
HEALTHCHECK_TOKEN = app.config.get("HEALTHCHECK_TOKEN", "alamapsaikota1234")
SESSION_TIMEOUT_MINUTES = int(app.config.get("SESSION_TIMEOUT_MINUTES", 10080))
app.config["COMPRESS_ALGORITHM"] = ["zstd", "br", "gzip", "deflate"]
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=SESSION_TIMEOUT_MINUTES)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
failed_login_attempts = defaultdict(deque)
MAX_ATTEMPTS = 10
TIME_WINDOW = 60 * 60
db = SQLAlchemy(app)
socketio = SocketIO(app, async_mode="eventlet")
login_manager = LoginManager(app)
login_manager.login_view = "login"
# flask-compress
compress = Compress()
compress.init_app(app)
static_bp = Blueprint("static_bp", __name__)
# dla live
active_users = {}
def utcnow():
return datetime.now(timezone.utc)
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(150), unique=True, nullable=False)
password_hash = db.Column(db.String(150), nullable=False)
is_admin = db.Column(db.Boolean, default=False)
class ShoppingList(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(150), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
owner_id = db.Column(db.Integer, db.ForeignKey("user.id"))
is_temporary = db.Column(db.Boolean, default=False)
share_token = db.Column(db.String(64), unique=True, nullable=True)
# expires_at = db.Column(db.DateTime, nullable=True)
expires_at = db.Column(db.DateTime(timezone=True), nullable=True)
owner = db.relationship("User", backref="lists", lazy=True)
is_archived = db.Column(db.Boolean, default=False)
is_public = db.Column(db.Boolean, default=True)
class Item(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"))
name = db.Column(db.String(150), nullable=False)
# added_at = db.Column(db.DateTime, default=datetime.utcnow)
added_at = db.Column(db.DateTime, default=utcnow)
added_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
purchased = db.Column(db.Boolean, default=False)
purchased_at = db.Column(db.DateTime, nullable=True)
quantity = db.Column(db.Integer, default=1)
note = db.Column(db.Text, nullable=True)
not_purchased = db.Column(db.Boolean, default=False)
not_purchased_reason = db.Column(db.Text, nullable=True)
position = db.Column(db.Integer, default=0)
class SuggestedProduct(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(150), unique=True, nullable=False)
usage_count = db.Column(db.Integer, default=0)
class Expense(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"))
amount = db.Column(db.Float, nullable=False)
added_at = db.Column(db.DateTime, default=datetime.utcnow)
receipt_filename = db.Column(db.String(255), nullable=True)
list = db.relationship("ShoppingList", backref="expenses", lazy=True)
class Receipt(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"), nullable=False)
filename = db.Column(db.String(255), nullable=False)
uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
shopping_list = db.relationship("ShoppingList", backref="receipts", lazy=True)
filesize = db.Column(db.Integer, nullable=True)
file_hash = db.Column(db.String(64), nullable=True, unique=True)
with app.app_context():
db.create_all()
from werkzeug.security import generate_password_hash
admin = User.query.filter_by(is_admin=True).first()
username = app.config.get("DEFAULT_ADMIN_USERNAME", "admin")
password = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123")
password_hash = generate_password_hash(password)
if admin:
if admin.username != username or not check_password_hash(
admin.password_hash, password
):
admin.username = username
admin.password_hash = password_hash
db.session.commit()
else:
admin = User(username=username, password_hash=password_hash, is_admin=True)
db.session.add(admin)
db.session.commit()
@static_bp.route("/static/js/<path:filename>")
def serve_js(filename):
response = send_from_directory("static/js", filename)
response.cache_control.no_cache = True
response.cache_control.no_store = True
response.cache_control.must_revalidate = True
# response.expires = 0
response.pragma = "no-cache"
response.headers.pop("Content-Disposition", None)
response.headers.pop("Etag", None)
return response
@static_bp.route("/static/css/<path:filename>")
def serve_css(filename):
response = send_from_directory("static/css", filename)
response.headers["Cache-Control"] = "public, max-age=3600"
response.headers.pop("Content-Disposition", None)
response.headers.pop("Etag", None)
return response
@static_bp.route("/static/lib/js/<path:filename>")
def serve_js_lib(filename):
response = send_from_directory("static/lib/js", filename)
response.headers["Cache-Control"] = "public, max-age=604800"
response.headers.pop("Content-Disposition", None)
response.headers.pop("Etag", None)
return response
# CSS z cache na tydzień
@static_bp.route("/static/lib/css/<path:filename>")
def serve_css_lib(filename):
response = send_from_directory("static/lib/css", filename)
response.headers["Cache-Control"] = "public, max-age=604800"
response.headers.pop("Content-Disposition", None)
response.headers.pop("Etag", None)
return response
app.register_blueprint(static_bp)
def allowed_file(filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
def get_list_details(list_id):
shopping_list = ShoppingList.query.get_or_404(list_id)
items = Item.query.filter_by(list_id=list_id).order_by(Item.position.asc()).all()
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
receipts = Receipt.query.filter_by(list_id=list_id).all()
receipt_files = [r.filename for r in receipts]
return shopping_list, items, receipt_files, expenses, total_expense
def generate_share_token(length=8):
return secrets.token_hex(length // 2)
def check_list_public(shopping_list):
if not shopping_list.is_public:
flash("Ta lista nie jest publicznie dostępna", "danger")
return False
return True
def enrich_list_data(l):
items = Item.query.filter_by(list_id=l.id).all()
l.total_count = len(items)
l.purchased_count = len([i for i in items if i.purchased])
expenses = Expense.query.filter_by(list_id=l.id).all()
l.total_expense = sum(e.amount for e in expenses)
return l
def save_resized_image(file, path):
try:
image = Image.open(file)
image.verify() # sprawdzenie poprawności pliku
file.seek(0) # reset do początku
image = Image.open(file) # ponowne otwarcie po verify()
except Exception:
raise ValueError("Nieprawidłowy plik graficzny")
# Obrót na podstawie EXIF
try:
exif = image._getexif()
if exif:
orientation_key = next(
k for k, v in ExifTags.TAGS.items() if v == "Orientation"
)
orientation = exif.get(orientation_key)
if orientation == 3:
image = image.rotate(180, expand=True)
elif orientation == 6:
image = image.rotate(270, expand=True)
elif orientation == 8:
image = image.rotate(90, expand=True)
except Exception:
pass # brak lub błędny EXIF
image.thumbnail((2000, 2000))
image = image.convert("RGB")
image.info.clear()
new_path = path.rsplit(".", 1)[0] + ".webp"
#image.save(new_path, format="WEBP", quality=85, method=6)
image.save(new_path, format="WEBP", quality=100, method=0)
def redirect_with_flash(
message: str, category: str = "info", endpoint: str = "main_page"
):
flash(message, category)
return redirect(url_for(endpoint))
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or not current_user.is_admin:
return redirect_with_flash("Brak uprawnień do tej sekcji.", "danger")
return f(*args, **kwargs)
return decorated_function
def get_progress(list_id):
items = Item.query.filter_by(list_id=list_id).order_by(Item.position.asc()).all()
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
return purchased_count, total_count, percent
def delete_receipts_for_list(list_id):
receipt_pattern = f"list_{list_id}_"
upload_folder = app.config["UPLOAD_FOLDER"]
for filename in os.listdir(upload_folder):
if filename.startswith(receipt_pattern):
try:
os.remove(os.path.join(upload_folder, filename))
except Exception as e:
print(f"Nie udało się usunąć pliku {filename}: {e}")
def _receipt_error(message):
if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest":
return jsonify({"success": False, "error": message}), 400
flash(message, "danger")
return redirect(request.referrer or url_for("main_page"))
############# OCR ###########################
def preprocess_image_for_tesseract(image):
image = ImageOps.autocontrast(image)
image = image.point(lambda x: 0 if x < 160 else 255) # mocniejsza binarizacja
#image = image.resize((image.width * 2, image.height * 2), Image.BICUBIC) # większe powiększenie
return image
def extract_total_tesseract(image):
text = pytesseract.image_to_string(image, lang="pol", config="--psm 4")
lines = text.splitlines()
candidates = []
keyword_lines_debug = []
fuzzy_regex = re.compile(r"[\dOo][.,:;g9zZ][\d]{2}")
keyword_pattern = re.compile(
r"""
\b(
[5s]u[mn][aąo0]? |
razem |
zap[łl][aąo0]ty |
do\s+zap[łl][aąo0]ty |
kwota |
płatno[śćs] |
warto[śćs] |
total |
amount
)\b
""",
re.IGNORECASE | re.VERBOSE
)
for idx, line in enumerate(lines):
if keyword_pattern.search(line[:30]):
keyword_lines_debug.append((idx, line))
for line in lines:
if not line.strip():
continue
matches = re.findall(r"\d{1,4}\s?[.,]\d{2}", line)
for match in matches:
try:
val = float(match.replace(" ", "").replace(",", "."))
if 0.1 <= val <= 100000:
candidates.append((val, line))
except:
continue
spaced = re.findall(r"\d{1,4}\s\d{2}", line)
for match in spaced:
try:
val = float(match.replace(" ", "."))
if 0.1 <= val <= 100000:
candidates.append((val, line))
except:
continue
fuzzy_matches = fuzzy_regex.findall(line)
for match in fuzzy_matches:
cleaned = (
match.replace("O", "0")
.replace("o", "0")
.replace(":", ".")
.replace(";", ".")
.replace(",", ".")
.replace("g", "9")
.replace("z", "9")
.replace("Z", "9")
)
try:
val = float(cleaned)
if 0.1 <= val <= 100000:
candidates.append((val, line))
except:
continue
preferred = [
(val, line)
for val, line in candidates
if keyword_pattern.search(line.lower())
]
if preferred:
max_val = max(preferred, key=lambda x: x[0])[0]
return round(max_val, 2), lines
if candidates:
max_val = max([val for val, _ in candidates])
return round(max_val, 2), lines
data = pytesseract.image_to_data(image, lang="pol", config="--psm 4", output_type=Output.DICT)
font_candidates = []
for i in range(len(data["text"])):
word = data["text"][i].strip()
if not word:
continue
if re.match(r"^\d{1,5}[.,\s]\d{2}$", word):
try:
val = float(word.replace(",", ".").replace(" ", "."))
height = data["height"][i]
if 0.1 <= val <= 10000:
font_candidates.append((val, height, word))
except:
continue
if font_candidates:
best = max(font_candidates, key=lambda x: x[1])
return round(best[0], 2), lines
return 0.0, lines
############# END OCR #######################
# zabezpieczenie logowani do systemu - błędne hasła
def is_ip_blocked(ip):
now = time.time()
attempts = failed_login_attempts[ip]
while attempts and now - attempts[0] > TIME_WINDOW:
attempts.popleft()
return len(attempts) >= MAX_ATTEMPTS
def register_failed_attempt(ip):
now = time.time()
attempts = failed_login_attempts[ip]
while attempts and now - attempts[0] > TIME_WINDOW:
attempts.popleft()
attempts.append(now)
def reset_failed_attempts(ip):
failed_login_attempts[ip].clear()
def attempts_remaining(ip):
attempts = failed_login_attempts[ip]
return max(0, MAX_ATTEMPTS - len(attempts))
####################################################
@login_manager.user_loader
def load_user(user_id):
# return User.query.get(int(user_id))
return db.session.get(User, int(user_id))
@app.context_processor
def inject_time():
return dict(time=time)
@app.context_processor
def inject_has_authorized_cookie():
return {"has_authorized_cookie": "authorized" in request.cookies}
@app.context_processor
def inject_is_blocked():
ip = request.access_route[0]
return {"is_blocked": is_ip_blocked(ip)}
@app.before_request
def require_system_password():
endpoint = request.endpoint
# Wyjątki: lib js/css zawsze przepuszczamy
if endpoint in ("static_bp.serve_js_lib", "static_bp.serve_css_lib"):
return
ip = request.access_route[0]
if is_ip_blocked(ip):
abort(403)
if endpoint is None:
return
if endpoint in ("system_auth", "healthcheck"):
return
if (
"authorized" not in request.cookies
and not endpoint.startswith("login")
and endpoint != "favicon"
):
# Dla serve_js przepuszczamy tylko toasts.js
if endpoint == "static_bp.serve_js":
requested_file = request.view_args.get("filename", "")
if requested_file == "toasts.js":
return
if requested_file.endswith(".js"):
return redirect(url_for("system_auth", next=request.url))
return
# Blokujemy pozostałe static_bp
if endpoint.startswith("static_bp."):
return
if request.path == "/":
return redirect(url_for("system_auth"))
from urllib.parse import urlparse, urlunparse
parsed = urlparse(request.url)
fixed_url = urlunparse(parsed._replace(netloc=request.host))
return redirect(url_for("system_auth", next=fixed_url))
@app.template_filter("filemtime")
def file_mtime_filter(path):
try:
t = os.path.getmtime(path)
return datetime.fromtimestamp(t)
except Exception:
# return datetime.utcnow()
return datetime.now(timezone.utc)
@app.template_filter("filesizeformat")
def filesizeformat_filter(path):
try:
size = os.path.getsize(path)
for unit in ["B", "KB", "MB", "GB"]:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
except Exception:
return "N/A"
@app.errorhandler(404)
def page_not_found(e):
return (
render_template(
"errors.html",
code=404,
title="Strona nie znaleziona",
message="Ups! Podana strona nie istnieje lub została przeniesiona.",
),
404,
)
@app.errorhandler(403)
def forbidden(e):
return (
render_template(
"errors.html",
code=403,
title="Brak dostępu",
message="Nie masz uprawnień do wyświetlenia tej strony.",
),
403,
)
@app.route("/favicon.ico")
def favicon_ico():
return redirect(url_for("static", filename="favicon.svg"))
@app.route("/favicon.svg")
def favicon():
svg = """
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16">
<text y="14" font-size="16">🛒</text>
</svg>
"""
return svg, 200, {"Content-Type": "image/svg+xml"}
@app.route("/")
def main_page():
# now = datetime.utcnow()
now = datetime.now(timezone.utc)
if current_user.is_authenticated:
user_lists = (
ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=False)
.filter((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now))
.order_by(ShoppingList.created_at.desc())
.all()
)
archived_lists = (
ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True)
.order_by(ShoppingList.created_at.desc())
.all()
)
public_lists = (
ShoppingList.query.filter(
ShoppingList.is_public == True,
ShoppingList.owner_id != current_user.id,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
ShoppingList.is_archived == False,
)
.order_by(ShoppingList.created_at.desc())
.all()
)
else:
user_lists = []
archived_lists = []
public_lists = (
ShoppingList.query.filter(
ShoppingList.is_public == True,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
ShoppingList.is_archived == False,
)
.order_by(ShoppingList.created_at.desc())
.all()
)
for l in user_lists + public_lists + archived_lists:
enrich_list_data(l)
return render_template(
"main.html",
user_lists=user_lists,
public_lists=public_lists,
archived_lists=archived_lists,
)
@app.route("/system-auth", methods=["GET", "POST"])
def system_auth():
if (
current_user.is_authenticated
or request.cookies.get("authorized") == AUTHORIZED_COOKIE_VALUE
):
flash("Jesteś już zalogowany lub autoryzowany.", "info")
return redirect(url_for("main_page"))
ip = request.access_route[0]
next_page = request.args.get("next") or url_for("main_page")
if is_ip_blocked(ip):
flash(
"Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.",
"danger",
)
return render_template("system_auth.html"), 403
if request.method == "POST":
if request.form["password"] == SYSTEM_PASSWORD:
reset_failed_attempts(ip)
resp = redirect(next_page)
max_age = app.config.get("AUTH_COOKIE_MAX_AGE", 86400)
resp.set_cookie("authorized", AUTHORIZED_COOKIE_VALUE, max_age=max_age)
return resp
else:
register_failed_attempt(ip)
if is_ip_blocked(ip):
flash(
"Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.",
"danger",
)
return render_template("system_auth.html"), 403
remaining = attempts_remaining(ip)
flash(f"Nieprawidłowe hasło. Pozostało {remaining} prób.", "warning")
return render_template("system_auth.html")
@app.route("/toggle_archive_list/<int:list_id>")
@login_required
def toggle_archive_list(list_id):
# l = ShoppingList.query.get_or_404(list_id)
l = db.session.get(ShoppingList, list_id)
if l is None:
abort(404)
if l.owner_id != current_user.id:
return redirect_with_flash("Nie masz uprawnień do tej listy", "danger")
archive = request.args.get("archive", "true").lower() == "true"
if archive:
l.is_archived = True
flash(f"Lista „{l.title}” została zarchiwizowana.", "success")
else:
l.is_archived = False
flash(f"Lista „{l.title}” została przywrócona.", "success")
db.session.commit()
return redirect(url_for("main_page"))
@app.route("/edit_my_list/<int:list_id>", methods=["GET", "POST"])
@login_required
def edit_my_list(list_id):
l = db.session.get(ShoppingList, list_id)
if l is None:
abort(404)
if l.owner_id != current_user.id:
return redirect_with_flash("Nie masz uprawnień do tej listy", "danger")
if request.method == "POST":
new_title = request.form.get("title", "").strip()
is_public = "is_public" in request.form
is_temporary = "is_temporary" in request.form
is_archived = "is_archived" in request.form
expires_date = request.form.get("expires_date")
expires_time = request.form.get("expires_time")
# Walidacja tytułu
if not new_title:
flash("Podaj poprawny tytuł", "danger")
return redirect(url_for("edit_my_list", list_id=list_id))
l.title = new_title
l.is_public = is_public
l.is_temporary = is_temporary
l.is_archived = is_archived
# Obsługa daty wygaśnięcia
if expires_date and expires_time:
try:
combined = f"{expires_date} {expires_time}"
expires_dt = datetime.strptime(combined, "%Y-%m-%d %H:%M")
l.expires_at = expires_dt.replace(tzinfo=timezone.utc)
except ValueError:
flash("Błędna data lub godzina wygasania", "danger")
return redirect(url_for("edit_my_list", list_id=list_id))
else:
l.expires_at = None
db.session.commit()
flash("Zaktualizowano dane listy", "success")
return redirect(url_for("main_page"))
return render_template("edit_my_list.html", list=l)
@app.route("/delete_user_list/<int:list_id>", methods=["POST"])
@login_required
def delete_user_list(list_id):
l = db.session.get(ShoppingList, list_id)
if l is None or l.owner_id != current_user.id:
abort(403)
delete_receipts_for_list(list_id)
Item.query.filter_by(list_id=list_id).delete()
Expense.query.filter_by(list_id=list_id).delete()
db.session.delete(l)
db.session.commit()
flash("Lista została usunięta", "success")
return redirect(url_for("main_page"))
@app.route("/toggle_visibility/<int:list_id>", methods=["GET", "POST"])
@login_required
def toggle_visibility(list_id):
# l = ShoppingList.query.get_or_404(list_id)
l = db.session.get(ShoppingList, list_id)
if l is None:
abort(404)
if l.owner_id != current_user.id:
if request.is_json or request.method == "POST":
return {"error": "Unauthorized"}, 403
flash("Nie masz uprawnień do tej listy", "danger")
return redirect(url_for("main_page"))
l.is_public = not l.is_public
db.session.commit()
share_url = f"{request.url_root}share/{l.share_token}"
if request.is_json or request.method == "POST":
return {"is_public": l.is_public, "share_url": share_url}
if l.is_public:
flash("Lista została udostępniona publicznie", "success")
else:
flash("Lista została ukryta przed gośćmi", "info")
return redirect(url_for("main_page"))
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username_input = request.form["username"].lower()
user = User.query.filter(func.lower(User.username) == username_input).first()
if user and check_password_hash(user.password_hash, request.form["password"]):
session.permanent = True
login_user(user)
flash("Zalogowano pomyślnie", "success")
return redirect(url_for("main_page"))
flash("Nieprawidłowy login lub hasło", "danger")
return render_template("login.html")
@app.route("/logout")
@login_required
def logout():
logout_user()
flash("Wylogowano pomyślnie", "success")
return redirect(url_for("main_page"))
@app.route("/create", methods=["POST"])
@login_required
def create_list():
title = request.form.get("title")
is_temporary = request.form.get("temporary") == "1"
token = generate_share_token(8)
# expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None
expires_at = (
datetime.now(timezone.utc) + timedelta(days=7) if is_temporary else None
)
new_list = ShoppingList(
title=title,
owner_id=current_user.id,
is_temporary=is_temporary,
share_token=token,
expires_at=expires_at,
)
db.session.add(new_list)
db.session.commit()
flash("Utworzono nową listę", "success")
return redirect(url_for("view_list", list_id=new_list.id))
@app.route("/list/<int:list_id>")
@login_required
def view_list(list_id):
shopping_list, items, receipt_files, expenses, total_expense = get_list_details(
list_id
)
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
return render_template(
"list.html",
list=shopping_list,
items=items,
receipt_files=receipt_files,
total_count=total_count,
purchased_count=purchased_count,
percent=percent,
expenses=expenses,
total_expense=total_expense,
is_share=False,
)
@app.route("/user_expenses")
@login_required
def user_expenses():
from sqlalchemy.orm import joinedload
expenses = (
Expense.query.join(ShoppingList, Expense.list_id == ShoppingList.id)
.options(joinedload(Expense.list))
.filter(ShoppingList.owner_id == current_user.id)
.order_by(Expense.added_at.desc())
.all()
)
rows = [
{
"title": e.list.title if e.list else "Nieznana",
"amount": e.amount,
"added_at": e.added_at,
}
for e in expenses
]
return render_template("user_expenses.html", expense_table=rows)
@app.route("/user/expenses_data")
@login_required
def user_expenses_data():
range_type = request.args.get("range", "monthly")
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
query = Expense.query.join(ShoppingList, Expense.list_id == ShoppingList.id).filter(
ShoppingList.owner_id == current_user.id
)
if start_date and end_date:
try:
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
query = query.filter(Expense.timestamp >= start, Expense.timestamp < end)
except ValueError:
return jsonify({"error": "Błędne daty"}), 400
expenses = query.all()
grouped = defaultdict(float)
for e in expenses:
# ts = e.added_at or datetime.utcnow()
ts = e.added_at or datetime.now(timezone.utc)
if range_type == "monthly":
key = ts.strftime("%Y-%m")
elif range_type == "quarterly":
key = f"{ts.year}-Q{((ts.month - 1) // 3) + 1}"
elif range_type == "halfyearly":
key = f"{ts.year}-H{1 if ts.month <= 6 else 2}"
elif range_type == "yearly":
key = str(ts.year)
else:
key = ts.strftime("%Y-%m-%d")
grouped[key] += e.amount
labels = sorted(grouped)
data = [round(grouped[label], 2) for label in labels]
return jsonify({"labels": labels, "expenses": data})
@app.route("/share/<token>")
@app.route("/guest-list/<int:list_id>")
def shared_list(token=None, list_id=None):
if token:
shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
if not check_list_public(shopping_list):
return redirect(url_for("main_page"))
list_id = shopping_list.id
shopping_list, items, receipt_files, expenses, total_expense = get_list_details(
list_id
)
return render_template(
"list_share.html",
list=shopping_list,
items=items,
receipt_files=receipt_files,
expenses=expenses,
total_expense=total_expense,
is_share=True,
)
@app.route("/copy/<int:list_id>")
@login_required
def copy_list(list_id):
original = ShoppingList.query.get_or_404(list_id)
token = generate_share_token(8)
new_list = ShoppingList(
title=original.title + " (Kopia)", owner_id=current_user.id, share_token=token
)
db.session.add(new_list)
db.session.commit()
original_items = Item.query.filter_by(list_id=original.id).all()
for item in original_items:
copy_item = Item(list_id=new_list.id, name=item.name)
db.session.add(copy_item)
db.session.commit()
flash("Skopiowano listę", "success")
return redirect(url_for("view_list", list_id=new_list.id))
@app.route("/suggest_products")
def suggest_products():
query = request.args.get("q", "")
suggestions = []
if query:
suggestions = (
SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f"%{query}%"))
.limit(5)
.all()
)
return {"suggestions": [s.name for s in suggestions]}
@app.route("/all_products")
def all_products():
query = request.args.get("q", "")
top_products_query = SuggestedProduct.query
if query:
top_products_query = top_products_query.filter(
SuggestedProduct.name.ilike(f"%{query}%")
)
top_products = (
top_products_query.order_by(
SuggestedProduct.usage_count.desc(), SuggestedProduct.name.asc()
)
.distinct(SuggestedProduct.name)
.limit(20)
.all()
)
top_names = [s.name for s in top_products]
rest_query = SuggestedProduct.query
if query:
rest_query = rest_query.filter(SuggestedProduct.name.ilike(f"%{query}%"))
if top_names:
rest_query = rest_query.filter(~SuggestedProduct.name.in_(top_names))
rest_products = rest_query.order_by(SuggestedProduct.name.asc()).limit(200).all()
all_names = top_names + [s.name for s in rest_products]
seen = set()
unique_names = []
for name in all_names:
name_lower = name.strip().lower()
if name_lower not in seen:
unique_names.append(name)
seen.add(name_lower)
return {"allproducts": unique_names}
@app.route("/upload_receipt/<int:list_id>", methods=["POST"])
@login_required
def upload_receipt(list_id):
if "receipt" not in request.files:
return _receipt_error("Brak pliku")
file = request.files["receipt"]
if file.filename == "":
return _receipt_error("Nie wybrano pliku")
if file and allowed_file(file.filename):
file_bytes = file.read()
file.seek(0)
file_hash = hashlib.sha256(file_bytes).hexdigest()
existing = Receipt.query.filter_by(file_hash=file_hash).first()
if existing:
return _receipt_error("Taki plik już istnieje")
now = datetime.now(timezone.utc)
timestamp = now.strftime("%Y%m%d_%H%M")
random_part = secrets.token_hex(3)
webp_filename = f"list_{list_id}_{timestamp}_{random_part}.webp"
file_path = os.path.join(app.config["UPLOAD_FOLDER"], webp_filename)
try:
save_resized_image(file, file_path)
except ValueError as e:
return _receipt_error(str(e))
filesize = os.path.getsize(file_path) if os.path.exists(file_path) else None
uploaded_at = datetime.now(timezone.utc)
new_receipt = Receipt(
list_id=list_id,
filename=webp_filename,
filesize=filesize,
uploaded_at=uploaded_at,
file_hash=file_hash,
)
db.session.add(new_receipt)
db.session.commit()
if (
request.is_json
or request.headers.get("X-Requested-With") == "XMLHttpRequest"
):
url = url_for("uploaded_file", filename=webp_filename)
socketio.emit("receipt_added", {"url": url}, to=str(list_id))
return jsonify({"success": True, "url": url})
flash("Wgrano paragon", "success")
return redirect(request.referrer or url_for("main_page"))
return _receipt_error("Niedozwolony format pliku")
@app.route("/uploads/<filename>")
def uploaded_file(filename):
response = send_from_directory(app.config["UPLOAD_FOLDER"], filename)
response.headers["Cache-Control"] = "public, max-age=2592000, immutable"
response.headers.pop("Pragma", None)
response.headers.pop("Content-Disposition", None)
mime, _ = mimetypes.guess_type(filename)
if mime:
response.headers["Content-Type"] = mime
return response
@app.route("/reorder_items", methods=["POST"])
@login_required
def reorder_items():
data = request.get_json()
list_id = data.get("list_id")
order = data.get("order")
for index, item_id in enumerate(order):
item = db.session.get(Item, item_id)
if item and item.list_id == list_id:
item.position = index
db.session.commit()
socketio.emit(
"items_reordered", {"list_id": list_id, "order": order}, to=str(list_id)
)
return jsonify(success=True)
# OCR
@app.route("/lists/<int:list_id>/analyze", methods=["POST"])
@login_required
def analyze_receipts_for_list(list_id):
receipt_objs = Receipt.query.filter_by(list_id=list_id).all()
existing_expenses = {
e.receipt_filename
for e in Expense.query.filter_by(list_id=list_id).all()
if e.receipt_filename
}
results = []
total = 0.0
for receipt in receipt_objs:
filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
if not os.path.exists(filepath):
continue
try:
raw_image = Image.open(filepath).convert("RGB")
image = preprocess_image_for_tesseract(raw_image)
value, lines = extract_total_tesseract(image)
except Exception as e:
import traceback
print(f"OCR error for {receipt.filename}:\n{traceback.format_exc()}")
value = 0.0
lines = []
already_added = receipt.filename in existing_expenses
results.append(
{
"id": receipt.id,
"filename": receipt.filename,
"amount": round(value, 2),
"debug_text": lines,
"already_added": already_added,
}
)
if not already_added:
total += value
return jsonify({"results": results, "total": round(total, 2)})
@app.route("/admin")
@login_required
@admin_required
def admin_panel():
now = datetime.now(timezone.utc)
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all()
all_files = os.listdir(app.config["UPLOAD_FOLDER"])
enriched_lists = []
for l in all_lists:
enrich_list_data(l)
items = Item.query.filter_by(list_id=l.id).all()
total_count = l.total_count
purchased_count = l.purchased_count
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
comments_count = len([i for i in items if i.note and i.note.strip() != ""])
receipt_pattern = f"list_{l.id}"
receipt_files = [f for f in all_files if receipt_pattern in f]
# obliczenie czy wygasła
if l.is_temporary and l.expires_at:
expires_at = l.expires_at
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=timezone.utc)
is_expired = expires_at < now
else:
is_expired = False
enriched_lists.append(
{
"list": l,
"total_count": total_count,
"purchased_count": purchased_count,
"percent": round(percent),
"comments_count": comments_count,
"receipts_count": len(receipt_files),
"total_expense": l.total_expense,
"expired": is_expired,
}
)
top_products = (
db.session.query(Item.name, func.count(Item.id).label("count"))
.filter(Item.purchased.is_(True))
.group_by(Item.name)
.order_by(func.count(Item.id).desc())
.limit(5)
.all()
)
purchased_items_count = Item.query.filter_by(purchased=True).count()
total_expense_sum = db.session.query(func.sum(Expense.amount)).scalar() or 0
current_time = datetime.now(timezone.utc)
current_year = current_time.year
current_month = current_time.month
year_expense_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == current_year)
.scalar()
or 0
)
month_expense_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == current_year)
.filter(extract("month", Expense.added_at) == current_month)
.scalar()
or 0
)
process = psutil.Process(os.getpid())
app_mem = process.memory_info().rss // (1024 * 1024) # MB
return render_template(
"admin/admin_panel.html",
user_count=user_count,
list_count=list_count,
item_count=item_count,
purchased_items_count=purchased_items_count,
enriched_lists=enriched_lists,
top_products=top_products,
total_expense_sum=total_expense_sum,
year_expense_sum=year_expense_sum,
month_expense_sum=month_expense_sum,
now=now,
python_version=sys.version,
system_info=platform.platform(),
app_memory=f"{app_mem} MB",
)
@app.route("/admin/delete_list/<int:list_id>")
@login_required
@admin_required
def delete_list(list_id):
delete_receipts_for_list(list_id)
list_to_delete = ShoppingList.query.get_or_404(list_id)
Item.query.filter_by(list_id=list_to_delete.id).delete()
Expense.query.filter_by(list_id=list_to_delete.id).delete()
db.session.delete(list_to_delete)
db.session.commit()
flash(f"Usunięto listę: {list_to_delete.title}", "success")
return redirect(url_for("admin_panel"))
@app.route("/admin/add_user", methods=["POST"])
@login_required
@admin_required
def add_user():
username = request.form["username"].lower()
password = request.form["password"]
if not username or not password:
flash("Wypełnij wszystkie pola", "danger")
return redirect(url_for("list_users"))
if User.query.filter(func.lower(User.username) == username).first():
flash("Użytkownik o takiej nazwie już istnieje", "warning")
return redirect(url_for("list_users"))
hashed_password = generate_password_hash(password)
new_user = User(username=username, password_hash=hashed_password)
db.session.add(new_user)
db.session.commit()
flash("Dodano nowego użytkownika", "success")
return redirect(url_for("list_users"))
@app.route("/admin/users")
@login_required
@admin_required
def list_users():
users = User.query.all()
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"]
return render_template(
"admin/user_management.html",
users=users,
user_count=user_count,
list_count=list_count,
item_count=item_count,
activity_log=activity_log,
)
@app.route("/admin/change_password/<int:user_id>", methods=["POST"])
@login_required
@admin_required
def reset_password(user_id):
user = User.query.get_or_404(user_id)
new_password = request.form["password"]
if not new_password:
flash("Podaj nowe hasło", "danger")
return redirect(url_for("list_users"))
user.password_hash = generate_password_hash(new_password)
db.session.commit()
flash(f"Hasło dla użytkownika {user.username} zostało zaktualizowane", "success")
return redirect(url_for("list_users"))
@app.route("/admin/delete_user/<int:user_id>")
@login_required
@admin_required
def delete_user(user_id):
user = User.query.get_or_404(user_id)
if user.is_admin:
admin_count = User.query.filter_by(is_admin=True).count()
if admin_count <= 1:
flash("Nie można usunąć ostatniego administratora.", "danger")
return redirect(url_for("list_users"))
db.session.delete(user)
db.session.commit()
flash("Użytkownik usunięty", "success")
return redirect(url_for("list_users"))
@app.route("/admin/receipts/<id>")
@login_required
@admin_required
def admin_receipts(id):
try:
if id == "all":
receipts = Receipt.query.order_by(Receipt.uploaded_at.desc()).all()
else:
list_id = int(id)
receipts = (
Receipt.query.filter_by(list_id=list_id)
.order_by(Receipt.uploaded_at.desc())
.all()
)
except ValueError:
flash("Nieprawidłowe ID listy.", "danger")
return redirect(url_for("admin_panel"))
return render_template("admin/receipts.html", receipts=receipts)
@app.route("/admin/rotate_receipt/<int:receipt_id>")
@login_required
@admin_required
def rotate_receipt(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
if not os.path.exists(filepath):
flash("Plik nie istnieje", "danger")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
try:
image = Image.open(filepath)
rotated = image.rotate(-90, expand=True)
rotated.save(filepath, format="WEBP", quality=85)
flash("Obrócono paragon", "success")
except Exception as e:
flash(f"Błąd przy obracaniu: {str(e)}", "danger")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
@app.route("/admin/rename_receipt/<int:receipt_id>")
@login_required
@admin_required
def rename_receipt(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
if not os.path.exists(old_path):
flash("Plik nie istnieje", "danger")
return redirect(request.referrer)
now = datetime.now()
timestamp = now.strftime("%Y%m%d_%H%M")
random_part = secrets.token_hex(3)
new_filename = f"list_{receipt.list_id}_{timestamp}_{random_part}.webp"
new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename)
try:
os.rename(old_path, new_path)
receipt.filename = new_filename
db.session.commit()
flash("Zmieniono nazwę pliku", "success")
except Exception as e:
flash(f"Błąd przy zmianie nazwy: {str(e)}", "danger")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
@app.route("/admin/delete_receipt/<int:receipt_id>")
@login_required
@admin_required
def delete_receipt(receipt_id):
receipt = Receipt.query.get(receipt_id)
if not receipt:
flash("Paragon nie istnieje", "danger")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
file_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
# Usuń plik
if os.path.exists(file_path):
try:
os.remove(file_path)
except Exception as e:
flash(f"Błąd przy usuwaniu pliku: {str(e)}", "danger")
# Usuń rekord z bazy
db.session.delete(receipt)
db.session.commit()
flash("Paragon usunięty", "success")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
@app.route("/admin/generate_receipt_hash/<int:receipt_id>")
@login_required
@admin_required
def generate_receipt_hash(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
if receipt.file_hash:
flash("Hash już istnieje", "info")
return redirect(request.referrer)
file_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
if not os.path.exists(file_path):
flash("Plik nie istnieje", "danger")
return redirect(request.referrer)
try:
with open(file_path, "rb") as f:
file_hash = hashlib.sha256(f.read()).hexdigest()
receipt.file_hash = file_hash
db.session.commit()
flash("Hash wygenerowany", "success")
except Exception as e:
flash(f"Błąd przy generowaniu hasha: {e}", "danger")
return redirect(request.referrer)
@app.route("/admin/delete_selected_lists", methods=["POST"])
@login_required
@admin_required
def delete_selected_lists():
ids = request.form.getlist("list_ids")
for list_id in ids:
# lst = ShoppingList.query.get(int(list_id))
lst = db.session.get(ShoppingList, int(list_id))
if lst:
delete_receipts_for_list(lst.id)
Item.query.filter_by(list_id=lst.id).delete()
Expense.query.filter_by(list_id=lst.id).delete()
db.session.delete(lst)
db.session.commit()
flash("Usunięto wybrane listy", "success")
return redirect(url_for("admin_panel"))
@app.route("/admin/edit_list/<int:list_id>", methods=["GET", "POST"])
@login_required
@admin_required
def edit_list(list_id):
l = db.session.get(ShoppingList, list_id)
if l is None:
abort(404)
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
users = User.query.all()
items = (
db.session.query(Item).filter_by(list_id=list_id).order_by(Item.id.desc()).all()
)
receipts = (
Receipt.query.filter_by(list_id=list_id)
.order_by(Receipt.uploaded_at.desc())
.all()
)
if request.method == "POST":
action = request.form.get("action")
if action == "save":
new_title = request.form.get("title", "").strip()
new_amount_str = request.form.get("amount")
is_archived = "archived" in request.form
is_public = "public" in request.form
is_temporary = "temporary" in request.form
new_owner_id = request.form.get("owner_id")
expires_date = request.form.get("expires_date")
expires_time = request.form.get("expires_time")
if new_title:
l.title = new_title
l.is_archived = is_archived
l.is_public = is_public
l.is_temporary = is_temporary
if expires_date and expires_time:
try:
combined_str = f"{expires_date} {expires_time}"
dt = datetime.strptime(combined_str, "%Y-%m-%d %H:%M")
l.expires_at = dt.replace(tzinfo=timezone.utc)
except ValueError:
flash("Niepoprawna data lub godzina wygasania", "danger")
return redirect(url_for("edit_list", list_id=list_id))
else:
l.expires_at = None
if new_owner_id:
try:
new_owner_id_int = int(new_owner_id)
user_obj = db.session.get(User, new_owner_id_int)
if user_obj:
l.owner_id = new_owner_id_int
else:
flash("Wybrany użytkownik nie istnieje", "danger")
return redirect(url_for("edit_list", list_id=list_id))
except ValueError:
flash("Niepoprawny ID użytkownika", "danger")
return redirect(url_for("edit_list", list_id=list_id))
if new_amount_str:
try:
new_amount = float(new_amount_str)
for expense in expenses:
db.session.delete(expense)
db.session.commit()
db.session.add(Expense(list_id=list_id, amount=new_amount))
except ValueError:
flash("Niepoprawna kwota", "danger")
return redirect(url_for("edit_list", list_id=list_id))
db.session.add(l)
db.session.commit()
flash("Zapisano zmiany listy", "success")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "add_item":
item_name = request.form.get("item_name", "").strip()
quantity_str = request.form.get("quantity", "1")
if not item_name:
flash("Podaj nazwę produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
try:
quantity = max(1, int(quantity_str))
except ValueError:
quantity = 1
db.session.add(
Item(
list_id=list_id,
name=item_name,
quantity=quantity,
added_by=current_user.id,
)
)
exists = (
db.session.query(SuggestedProduct)
.filter(func.lower(SuggestedProduct.name) == item_name.lower())
.first()
)
if not exists:
db.session.add(SuggestedProduct(name=item_name))
db.session.commit()
flash("Dodano produkt", "success")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "delete_item":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
db.session.delete(item)
db.session.commit()
flash("Usunięto produkt", "success")
else:
flash("Nie znaleziono produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "toggle_purchased":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.purchased = not item.purchased
db.session.commit()
flash("Zmieniono status oznaczenia produktu", "success")
else:
flash("Nie znaleziono produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "mark_not_purchased":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.not_purchased = True
item.purchased = False
item.purchased_at = None
db.session.commit()
flash("Oznaczono produkt jako niekupione", "success")
else:
flash("Nie znaleziono produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
elif action == "unmark_not_purchased":
item = db.session.get(Item, request.form.get("item_id"))
if item and item.list_id == list_id:
item.not_purchased = False
item.not_purchased_reason = None
item.purchased = False
item.purchased_at = None
db.session.commit()
flash("Przywrócono produkt do listy", "success")
else:
flash("Nie znaleziono produktu", "danger")
return redirect(url_for("edit_list", list_id=list_id))
return render_template(
"admin/edit_list.html",
list=l,
total_expense=total_expense,
users=users,
items=items,
receipts=receipts,
upload_folder=app.config["UPLOAD_FOLDER"],
)
@app.route("/admin/products")
@login_required
@admin_required
def list_products():
items = Item.query.order_by(Item.id.desc()).all()
# users = User.query.all()
users = db.session.query(User).all()
users_dict = {user.id: user.username for user in users}
# Stabilne sortowanie sugestii
suggestions = SuggestedProduct.query.order_by(SuggestedProduct.name.asc()).all()
suggestions_dict = {s.name.lower(): s for s in suggestions}
return render_template(
"admin/list_products.html",
items=items,
users_dict=users_dict,
suggestions_dict=suggestions_dict,
)
@app.route("/admin/sync_suggestion/<int:item_id>", methods=["POST"])
@login_required
def sync_suggestion_ajax(item_id):
if not current_user.is_admin:
return jsonify({"success": False, "message": "Brak uprawnień"}), 403
item = Item.query.get_or_404(item_id)
existing = SuggestedProduct.query.filter(
func.lower(SuggestedProduct.name) == item.name.lower()
).first()
if not existing:
new_suggestion = SuggestedProduct(name=item.name)
db.session.add(new_suggestion)
db.session.commit()
return jsonify(
{
"success": True,
"message": f"Utworzono sugestię dla produktu: {item.name}",
}
)
else:
return jsonify(
{
"success": True,
"message": f"Sugestia dla produktu „{item.name}” już istnieje.",
}
)
@app.route("/admin/delete_suggestion/<int:suggestion_id>", methods=["POST"])
@login_required
def delete_suggestion_ajax(suggestion_id):
if not current_user.is_admin:
return jsonify({"success": False, "message": "Brak uprawnień"}), 403
suggestion = SuggestedProduct.query.get_or_404(suggestion_id)
db.session.delete(suggestion)
db.session.commit()
return jsonify({"success": True, "message": "Sugestia została usunięta."})
@app.route("/admin/expenses_data")
@login_required
def admin_expenses_data():
if not current_user.is_admin:
return jsonify({"error": "Brak uprawnień"}), 403
range_type = request.args.get("range", "monthly")
start_date_str = request.args.get("start_date")
end_date_str = request.args.get("end_date")
# now = datetime.utcnow()
now = datetime.now(timezone.utc)
labels = []
expenses = []
if start_date_str and end_date_str:
start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
expenses_query = (
db.session.query(
extract("year", Expense.added_at).label("year"),
extract("month", Expense.added_at).label("month"),
func.sum(Expense.amount).label("total"),
)
.filter(Expense.added_at >= start_date, Expense.added_at <= end_date)
.group_by("year", "month")
.order_by("year", "month")
.all()
)
for row in expenses_query:
label = f"{int(row.month):02d}/{int(row.year)}"
labels.append(label)
expenses.append(round(row.total, 2))
response = make_response(jsonify({"labels": labels, "expenses": expenses}))
response.headers["Cache-Control"] = (
"no-store, no-cache, must-revalidate, max-age=0"
)
return response
if range_type == "monthly":
for i in range(11, -1, -1):
year = (now - timedelta(days=i * 30)).year
month = (now - timedelta(days=i * 30)).month
label = f"{month:02d}/{year}"
labels.append(label)
month_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == year)
.filter(extract("month", Expense.added_at) == month)
.scalar()
or 0
)
expenses.append(round(month_sum, 2))
elif range_type == "quarterly":
for i in range(3, -1, -1):
quarter_start = now - timedelta(days=i * 90)
year = quarter_start.year
quarter = (quarter_start.month - 1) // 3 + 1
label = f"Q{quarter}/{year}"
quarter_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == year)
.filter((extract("month", Expense.added_at) - 1) // 3 + 1 == quarter)
.scalar()
or 0
)
labels.append(label)
expenses.append(round(quarter_sum, 2))
elif range_type == "halfyearly":
for i in range(1, -1, -1):
half_start = now - timedelta(days=i * 180)
year = half_start.year
half = 1 if half_start.month <= 6 else 2
label = f"H{half}/{year}"
half_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == year)
.filter(
(extract("month", Expense.added_at) <= 6)
if half == 1
else (extract("month", Expense.added_at) > 6)
)
.scalar()
or 0
)
labels.append(label)
expenses.append(round(half_sum, 2))
elif range_type == "yearly":
for i in range(4, -1, -1):
year = now.year - i
label = str(year)
year_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == year)
.scalar()
or 0
)
labels.append(label)
expenses.append(round(year_sum, 2))
response = make_response(jsonify({"labels": labels, "expenses": expenses}))
response.headers["Cache-Control"] = "no-store, no-cache"
return response
@app.route("/admin/promote_user/<int:user_id>")
@login_required
@admin_required
def promote_user(user_id):
user = User.query.get_or_404(user_id)
user.is_admin = True
db.session.commit()
flash(f"Użytkownik {user.username} został ustawiony jako admin.", "success")
return redirect(url_for("list_users"))
@app.route("/admin/demote_user/<int:user_id>")
@login_required
@admin_required
def demote_user(user_id):
user = User.query.get_or_404(user_id)
if user.id == current_user.id:
flash("Nie możesz zdegradować samego siebie!", "danger")
return redirect(url_for("list_users"))
admin_count = User.query.filter_by(is_admin=True).count()
if admin_count <= 1 and user.is_admin:
flash(
"Nie można zdegradować. Musi pozostać co najmniej jeden administrator.",
"danger",
)
return redirect(url_for("list_users"))
user.is_admin = False
db.session.commit()
flash(f"Użytkownik {user.username} został zdegradowany.", "success")
return redirect(url_for("list_users"))
@app.route("/healthcheck")
def healthcheck():
header_token = request.headers.get("X-Internal-Check")
correct_token = app.config.get("HEALTHCHECK_TOKEN")
if header_token != correct_token:
abort(404)
return "OK", 200
# =========================================================================================
# SOCKET.IO
# =========================================================================================
@socketio.on("delete_item")
def handle_delete_item(data):
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
if item:
list_id = item.list_id
db.session.delete(item)
db.session.commit()
emit("item_deleted", {"item_id": item.id}, to=str(item.list_id))
purchased_count, total_count, percent = get_progress(list_id)
emit(
"progress_updated",
{
"purchased_count": purchased_count,
"total_count": total_count,
"percent": percent,
},
to=str(list_id),
)
@socketio.on("edit_item")
def handle_edit_item(data):
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
new_name = data["new_name"]
new_quantity = data.get("new_quantity", item.quantity)
if item and new_name.strip():
item.name = new_name.strip()
try:
new_quantity = int(new_quantity)
if new_quantity < 1:
new_quantity = 1
except:
new_quantity = 1
item.quantity = new_quantity
db.session.commit()
emit(
"item_edited",
{"item_id": item.id, "new_name": item.name, "new_quantity": item.quantity},
to=str(item.list_id),
)
@socketio.on("join_list")
def handle_join(data):
global active_users
room = str(data["room"])
username = data.get("username", "Gość")
join_room(room)
if room not in active_users:
active_users[room] = set()
active_users[room].add(username)
# shopping_list = ShoppingList.query.get(int(data["room"]))
shopping_list = db.session.get(ShoppingList, int(data["room"]))
list_title = shopping_list.title if shopping_list else "Twoja lista"
emit("user_joined", {"username": username}, to=room)
emit("user_list", {"users": list(active_users[room])}, to=room)
emit("joined_confirmation", {"room": room, "list_title": list_title})
@socketio.on("disconnect")
def handle_disconnect(sid):
global active_users
username = current_user.username if current_user.is_authenticated else "Gość"
for room, users in active_users.items():
if username in users:
users.remove(username)
emit("user_left", {"username": username}, to=room)
emit("user_list", {"users": list(users)}, to=room)
@socketio.on("add_item")
def handle_add_item(data):
list_id = data["list_id"]
name = data["name"].strip()
quantity = data.get("quantity", 1)
try:
quantity = int(quantity)
if quantity < 1:
quantity = 1
except:
quantity = 1
existing_item = Item.query.filter(
Item.list_id == list_id,
func.lower(Item.name) == name.lower(),
Item.not_purchased == False,
).first()
if existing_item:
existing_item.quantity += quantity
db.session.commit()
emit(
"item_edited",
{
"item_id": existing_item.id,
"new_name": existing_item.name,
"new_quantity": existing_item.quantity,
},
to=str(list_id),
)
else:
max_position = (
db.session.query(func.max(Item.position))
.filter_by(list_id=list_id)
.scalar()
)
if max_position is None:
max_position = 0
new_item = Item(
list_id=list_id,
name=name,
quantity=quantity,
position=max_position + 1,
added_by=current_user.id if current_user.is_authenticated else None,
)
db.session.add(new_item)
if not SuggestedProduct.query.filter(
func.lower(SuggestedProduct.name) == name.lower()
).first():
new_suggestion = SuggestedProduct(name=name)
db.session.add(new_suggestion)
db.session.commit()
emit(
"item_added",
{
"id": new_item.id,
"name": new_item.name,
"quantity": new_item.quantity,
"added_by": (
current_user.username if current_user.is_authenticated else "Gość"
),
},
to=str(list_id),
include_self=True,
)
purchased_count, total_count, percent = get_progress(list_id)
emit(
"progress_updated",
{
"purchased_count": purchased_count,
"total_count": total_count,
"percent": percent,
},
to=str(list_id),
)
@socketio.on("check_item")
def handle_check_item(data):
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
if item:
item.purchased = True
# item.purchased_at = datetime.utcnow()
item.purchased_at = datetime.now(UTC)
db.session.commit()
purchased_count, total_count, percent = get_progress(item.list_id)
emit("item_checked", {"item_id": item.id}, to=str(item.list_id))
emit(
"progress_updated",
{
"purchased_count": purchased_count,
"total_count": total_count,
"percent": percent,
},
to=str(item.list_id),
)
@socketio.on("uncheck_item")
def handle_uncheck_item(data):
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
if item:
item.purchased = False
item.purchased_at = None
db.session.commit()
purchased_count, total_count, percent = get_progress(item.list_id)
emit("item_unchecked", {"item_id": item.id}, to=str(item.list_id))
emit(
"progress_updated",
{
"purchased_count": purchased_count,
"total_count": total_count,
"percent": percent,
},
to=str(item.list_id),
)
@socketio.on("request_full_list")
def handle_request_full_list(data):
list_id = data["list_id"]
items = Item.query.filter_by(list_id=list_id).order_by(Item.position.asc()).all()
items_data = []
for item in items:
items_data.append(
{
"id": item.id,
"name": item.name,
"quantity": item.quantity,
"purchased": item.purchased if not item.not_purchased else False,
"not_purchased": item.not_purchased,
"not_purchased_reason": item.not_purchased_reason,
"note": item.note or "",
}
)
emit("full_list", {"items": items_data}, to=request.sid)
@socketio.on("update_note")
def handle_update_note(data):
item_id = data["item_id"]
note = data["note"]
item = Item.query.get(item_id)
if item:
item.note = note
db.session.commit()
emit("note_updated", {"item_id": item_id, "note": note}, to=str(item.list_id))
@socketio.on("add_expense")
def handle_add_expense(data):
list_id = data["list_id"]
amount = data["amount"]
receipt_filename = data.get("receipt_filename")
if receipt_filename:
existing = Expense.query.filter_by(
list_id=list_id, receipt_filename=receipt_filename
).first()
if existing:
return
new_expense = Expense(
list_id=list_id, amount=amount, receipt_filename=receipt_filename
)
db.session.add(new_expense)
db.session.commit()
total = (
db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar()
or 0
)
emit("expense_added", {"amount": amount, "total": total}, to=str(list_id))
@socketio.on("mark_not_purchased")
def handle_mark_not_purchased(data):
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
reason = data.get("reason", "")
if item:
item.not_purchased = True
item.not_purchased_reason = reason
db.session.commit()
emit(
"item_marked_not_purchased",
{"item_id": item.id, "reason": reason},
to=str(item.list_id),
)
@socketio.on("unmark_not_purchased")
def handle_unmark_not_purchased(data):
# item = Item.query.get(data["item_id"])
item = db.session.get(Item, data["item_id"])
if item:
item.not_purchased = False
item.purchased = False
item.purchased_at = None
item.not_purchased_reason = None
db.session.commit()
emit("item_unmarked_not_purchased", {"item_id": item.id}, to=str(item.list_id))
""" @socketio.on('receipt_uploaded')
def handle_receipt_uploaded(data):
list_id = data['list_id']
url = data['url']
emit('receipt_added', {
'url': url
}, to=str(list_id), include_self=False) """
@app.cli.command("create_db")
def create_db():
db.create_all()
print("Database created.")
if __name__ == "__main__":
socketio.run(app, host="0.0.0.0", port=8000, debug=True)