Files
lista_zakupowa_live/app.py
Mateusz Gruszczyński c8a5db6715 talisman skip_if=csp_exempt
2025-07-25 21:25:44 +02:00

2617 lines
80 KiB
Python

import os
import secrets
import time
import mimetypes
import sys
import platform
import psutil
import hashlib
import re
import traceback
from pillow_heif import register_heif_opener
from datetime import datetime, timedelta, UTC, timezone
from urllib.parse import urlparse, urlunparse
from flask import (
Flask,
render_template,
redirect,
url_for,
request,
flash,
Blueprint,
send_from_directory,
request,
abort,
session,
jsonify,
make_response,
)
from markupsafe import Markup
from flask_sqlalchemy import SQLAlchemy
from flask_login import (
LoginManager,
UserMixin,
login_user,
login_required,
logout_user,
current_user,
)
from flask_compress import Compress
from flask_socketio import SocketIO, emit, join_room
from werkzeug.security import generate_password_hash, check_password_hash
from config import Config
from PIL import Image, ExifTags, ImageFilter, ImageOps
from werkzeug.utils import secure_filename
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import func, extract, inspect, or_
from sqlalchemy.orm import joinedload
from collections import defaultdict, deque
from functools import wraps
from flask_talisman import Talisman
# OCR
import pytesseract
from collections import Counter
from pytesseract import Output
import logging
logging.getLogger("werkzeug").setLevel(logging.INFO)

app = Flask(__name__)
app.config.from_object(Config)

# Security headers; each one is switched on/off by a flag read from the app config.
csp_policy = None
if app.config.get("ENABLE_CSP", True):
    csp_policy = {
        "default-src": "'self'",
        # Inline scripts are allowed. A duplicate, stricter "script-src" key used to
        # precede this value in the literal, but Python keeps only the last one.
        "script-src": "'self' 'unsafe-inline'",
        "style-src": "'self' 'unsafe-inline'",  # inline styles in the HTML are allowed
        "img-src": "'self' data:",  # allow data: images (e.g. SVG)
        "connect-src": "'self'",  # WebSockets
    }

permissions_policy = {"browsing-topics": "()"} if app.config["ENABLE_PP"] else None

talisman = Talisman(
    app,
    force_https=False,
    strict_transport_security=app.config.get("ENABLE_HSTS", True),
    frame_options="DENY" if app.config.get("ENABLE_XFO", True) else None,
    permissions_policy=permissions_policy,
    content_security_policy=csp_policy,
    x_content_type_options=app.config.get("ENABLE_XCTO", True),
    strict_transport_security_include_subdomains=False,
)

register_heif_opener()  # pillow_heif for HEIC

ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp", "heic"}
SQLALCHEMY_ECHO = True

# NOTE(review): the fallbacks below are hard-coded passwords/tokens; confirm that Config
# always overrides them outside of local development.
SYSTEM_PASSWORD = app.config.get("SYSTEM_PASSWORD", "changeme")
DEFAULT_ADMIN_USERNAME = app.config.get("DEFAULT_ADMIN_USERNAME", "admin")
DEFAULT_ADMIN_PASSWORD = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123")
UPLOAD_FOLDER = app.config.get("UPLOAD_FOLDER", "uploads")
AUTHORIZED_COOKIE_VALUE = app.config.get("AUTHORIZED_COOKIE_VALUE", "80d31cdfe63539c9")
AUTH_COOKIE_MAX_AGE = app.config.get("AUTH_COOKIE_MAX_AGE", 86400)
HEALTHCHECK_TOKEN = app.config.get("HEALTHCHECK_TOKEN", "alamapsaikota1234")
SESSION_TIMEOUT_MINUTES = int(app.config.get("SESSION_TIMEOUT_MINUTES", 10080))

app.config["COMPRESS_ALGORITHM"] = ["zstd", "br", "gzip", "deflate"]
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=SESSION_TIMEOUT_MINUTES)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
DEBUG_MODE = app.config.get("DEBUG_MODE", False)

os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Failed system-password attempts: client IP -> deque of attempt timestamps.
failed_login_attempts = defaultdict(deque)
MAX_ATTEMPTS = 10
TIME_WINDOW = 60 * 60  # seconds

db = SQLAlchemy(app)
socketio = SocketIO(app, async_mode="eventlet")
login_manager = LoginManager(app)
login_manager.login_view = "login"

# flask-compress
compress = Compress()
compress.init_app(app)

static_bp = Blueprint("static_bp", __name__)

# live state
active_users = {}
def utcnow():
    """Return the current moment as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


# Taken once, when the module is imported.
app_start_time = utcnow()
class User(UserMixin, db.Model):
    """Application account; ``is_admin`` marks administrator accounts."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    # Hash string as produced by werkzeug's generate_password_hash.
    password_hash = db.Column(db.String(512), nullable=False)
    is_admin = db.Column(db.Boolean, default=False)
class ShoppingList(db.Model):
    """A shopping list owned by a user, optionally temporary, archived or public."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(150), nullable=False)
    # NOTE(review): naive timestamp (datetime.utcnow) while expires_at is timezone-aware.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    owner_id = db.Column(db.Integer, db.ForeignKey("user.id"))
    is_temporary = db.Column(db.Boolean, default=False)
    share_token = db.Column(db.String(64), unique=True, nullable=True)
    expires_at = db.Column(db.DateTime(timezone=True), nullable=True)
    # The class body used to assign ``owner`` twice; only this, the last assignment,
    # ever took effect, so the shadowed first one has been dropped.
    owner = db.relationship("User", backref="lists", lazy=True)
    is_archived = db.Column(db.Boolean, default=False)
    is_public = db.Column(db.Boolean, default=True)
class Item(db.Model):
    """A single entry on a shopping list."""

    id = db.Column(db.Integer, primary_key=True)
    list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"))
    name = db.Column(db.String(150), nullable=False)
    # Default comes from the module's utcnow() helper.
    added_at = db.Column(db.DateTime, default=utcnow)
    added_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
    added_by_user = db.relationship(
        "User", backref="added_items", lazy=True, foreign_keys=[added_by]
    )
    purchased = db.Column(db.Boolean, default=False)
    purchased_at = db.Column(db.DateTime, nullable=True)
    quantity = db.Column(db.Integer, default=1)
    note = db.Column(db.Text, nullable=True)
    # Separate flag (with an optional reason) from ``purchased``.
    not_purchased = db.Column(db.Boolean, default=False)
    not_purchased_reason = db.Column(db.Text, nullable=True)
    # Sort key used when the items of a list are queried (order_by position asc).
    position = db.Column(db.Integer, default=0)
class SuggestedProduct(db.Model):
    """Product name offered as a suggestion, with a usage counter."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(150), unique=True, nullable=False)
    usage_count = db.Column(db.Integer, default=0)
class Expense(db.Model):
    """An amount spent, attached to a shopping list."""

    id = db.Column(db.Integer, primary_key=True)
    list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"))
    amount = db.Column(db.Float, nullable=False)
    added_at = db.Column(db.DateTime, default=datetime.utcnow)
    receipt_filename = db.Column(db.String(255), nullable=True)
    # The attribute name shadows the builtin ``list`` inside this class body; other code
    # in this file reads it as ``expense.list``, so it cannot simply be renamed.
    list = db.relationship("ShoppingList", backref="expenses", lazy=True)
class Receipt(db.Model):
    """An uploaded receipt image that belongs to a shopping list."""

    id = db.Column(db.Integer, primary_key=True)
    list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"), nullable=False)
    # File name inside the upload folder.
    filename = db.Column(db.String(255), nullable=False)
    uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
    shopping_list = db.relationship("ShoppingList", backref="receipts", lazy=True)
    filesize = db.Column(db.Integer, nullable=True)
    # Unique across all receipts, so the same hash cannot be stored twice.
    file_hash = db.Column(db.String(64), nullable=True, unique=True)
# Create the tables and make sure one administrator account matches the configured
# credentials (created when missing, overwritten when they differ).
with app.app_context():
    db.create_all()

    username = app.config.get("DEFAULT_ADMIN_USERNAME", "admin")
    password = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123")
    password_hash = generate_password_hash(password)

    admin = User.query.filter_by(is_admin=True).first()
    if admin is None:
        admin = User(username=username, password_hash=password_hash, is_admin=True)
        db.session.add(admin)
        db.session.commit()
    elif admin.username != username or not check_password_hash(
        admin.password_hash, password
    ):
        admin.username = username
        admin.password_hash = password_hash
        db.session.commit()
@static_bp.route("/static/js/<path:filename>")
def serve_js(filename):
    """Serve application JavaScript with caching disabled."""
    response = send_from_directory("static/js", filename)
    response.cache_control.no_cache = True
    response.cache_control.no_store = True
    response.cache_control.must_revalidate = True
    # Assigning ``response.pragma`` only created a plain attribute and sent nothing;
    # the header has to be written through ``headers``.
    response.headers["Pragma"] = "no-cache"
    response.headers.pop("Content-Disposition", None)
    response.headers.pop("Etag", None)
    return response
@static_bp.route("/static/css/<path:filename>")
def serve_css(filename):
    """Serve application CSS, publicly cacheable for one hour."""
    resp = send_from_directory("static/css", filename)
    headers = resp.headers
    headers["Cache-Control"] = "public, max-age=3600"
    for unwanted in ("Content-Disposition", "Etag"):
        headers.pop(unwanted, None)
    return resp
@static_bp.route("/static/lib/js/<path:filename>")
def serve_js_lib(filename):
    """Serve vendored JavaScript, publicly cacheable for one week."""
    resp = send_from_directory("static/lib/js", filename)
    headers = resp.headers
    headers["Cache-Control"] = "public, max-age=604800"
    for unwanted in ("Content-Disposition", "Etag"):
        headers.pop(unwanted, None)
    return resp
# Vendored CSS, cached for a week
@static_bp.route("/static/lib/css/<path:filename>")
def serve_css_lib(filename):
    """Serve vendored CSS, publicly cacheable for one week."""
    resp = send_from_directory("static/lib/css", filename)
    headers = resp.headers
    headers["Cache-Control"] = "public, max-age=604800"
    for unwanted in ("Content-Disposition", "Etag"):
        headers.pop(unwanted, None)
    return resp


app.register_blueprint(static_bp)
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    _, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def get_list_details(list_id):
    """Load a list (404 when missing) with its items, receipt names and expenses.

    Returns ``(shopping_list, items, receipt_files, expenses, total_expense)``;
    items are ordered by position.
    """
    shopping_list = ShoppingList.query.get_or_404(list_id)
    by_list = {"list_id": list_id}

    items = Item.query.filter_by(**by_list).order_by(Item.position.asc()).all()
    expenses = Expense.query.filter_by(**by_list).all()
    total_expense = sum(expense.amount for expense in expenses)
    receipt_files = [
        receipt.filename for receipt in Receipt.query.filter_by(**by_list).all()
    ]
    return shopping_list, items, receipt_files, expenses, total_expense
def generate_share_token(length=8):
    """Return a random hex token built from ``length // 2`` random bytes."""
    byte_count = length // 2
    return secrets.token_hex(byte_count)
def check_list_public(shopping_list):
    """Return True for a public list; otherwise flash a message and return False."""
    if shopping_list.is_public:
        return True
    flash("Ta lista nie jest publicznie dostępna", "danger")
    return False
def enrich_list_data(l):
    """Attach total_count, purchased_count and total_expense to *l* and return it."""
    list_items = Item.query.filter_by(list_id=l.id).all()
    l.total_count = len(list_items)
    l.purchased_count = sum(1 for item in list_items if item.purchased)

    list_expenses = Expense.query.filter_by(list_id=l.id).all()
    l.total_expense = sum(expense.amount for expense in list_expenses)
    return l
def save_resized_image(file, path):
    """Validate an uploaded image and store it as a lossless WebP file.

    The output is written next to *path*, with the extension replaced by ``.webp``.
    The image is shrunk to fit 2000x2000, converted to RGB and saved with its
    ``info`` dictionary emptied.

    Raises:
        ValueError: when the file is not a readable image or processing fails.
    """
    try:
        # Check the file, then rewind and open it again for the real work
        Image.open(file).verify()
        file.seek(0)
        picture = Image.open(file)
    except Exception:
        raise ValueError("Nieprawidłowy plik graficzny")

    try:
        # Rotate according to EXIF (e.g. phone photos)
        picture = ImageOps.exif_transpose(picture)
    except Exception:
        pass  # missing or damaged EXIF is ignored on purpose

    try:
        picture.thumbnail((2000, 2000))
        picture = picture.convert("RGB")
        picture.info.clear()
        stem = path.rsplit(".", 1)[0]
        picture.save(stem + ".webp", format="WEBP", lossless=True, method=6)
    except Exception as e:
        raise ValueError(f"Błąd podczas przetwarzania obrazu: {e}")
def redirect_with_flash(
    message: str, category: str = "info", endpoint: str = "main_page"
):
    """Flash *message* under *category* and return a redirect to *endpoint*."""
    flash(message, category)
    target = url_for(endpoint)
    return redirect(target)
def admin_required(f):
    """Decorator: run the view only for an authenticated admin, otherwise redirect."""

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if current_user.is_authenticated and current_user.is_admin:
            return f(*args, **kwargs)
        return redirect_with_flash("Brak uprawnień do tej sekcji.", "danger")

    return decorated_function
def get_progress(list_id):
    """Return ``(purchased_count, total_count, percent)`` for the items of a list."""
    list_items = (
        Item.query.filter_by(list_id=list_id).order_by(Item.position.asc()).all()
    )
    total_count = len(list_items)
    purchased_count = sum(1 for item in list_items if item.purchased)
    if total_count > 0:
        percent = purchased_count / total_count * 100
    else:
        percent = 0
    return purchased_count, total_count, percent
def delete_receipts_for_list(list_id):
    """Remove every file in the upload folder whose name starts with ``list_<id>_``.

    A file that cannot be removed is reported with ``print`` and skipped.
    """
    upload_folder = app.config["UPLOAD_FOLDER"]
    prefix = f"list_{list_id}_"
    for filename in os.listdir(upload_folder):
        if not filename.startswith(prefix):
            continue
        target = os.path.join(upload_folder, filename)
        try:
            os.remove(target)
        except Exception as e:
            print(f"Nie udało się usunąć pliku {filename}: {e}")
def _receipt_error(message):
    """Report a receipt error: JSON with status 400 for AJAX, flash + redirect otherwise."""
    is_ajax = (
        request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest"
    )
    if not is_ajax:
        flash(message, "danger")
        return redirect(request.referrer or url_for("main_page"))
    return jsonify({"success": False, "error": message}), 400
def rotate_receipt_by_id(receipt_id):
    """Rotate a stored receipt with ``rotate(-90, expand=True)`` and save it under a new name.

    The rotated image is written as a new WebP file, the old file is removed and
    the Receipt row is updated (filename and file size) and committed.

    Raises:
        FileNotFoundError: when the stored file is missing from the upload folder.
    Aborts with 404 when the receipt does not exist.
    """
    receipt = Receipt.query.get_or_404(receipt_id)
    upload_folder = app.config["UPLOAD_FOLDER"]
    old_path = os.path.join(upload_folder, receipt.filename)
    if not os.path.exists(old_path):
        raise FileNotFoundError("Plik nie istnieje")

    new_filename = generate_new_receipt_filename(receipt.list_id)
    new_path = os.path.join(upload_folder, new_filename)

    # Close the source file before deleting it; an open handle leaks a descriptor
    # and makes the removal fail on platforms that lock open files.
    with Image.open(old_path) as image:
        rotated = image.rotate(-90, expand=True)
        rotated.save(new_path, format="WEBP", quality=100)

    os.remove(old_path)
    receipt.filename = new_filename
    # Keep the stored size in step with the file that now exists on disk.
    receipt.filesize = os.path.getsize(new_path)
    db.session.commit()
    return receipt
def delete_receipt_by_id(receipt_id):
    """Delete a receipt's file (when present) and its database row; 404 if unknown."""
    receipt = Receipt.query.get_or_404(receipt_id)

    stored_file = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
    file_present = os.path.exists(stored_file)
    if file_present:
        os.remove(stored_file)

    db.session.delete(receipt)
    db.session.commit()
    return receipt
def generate_new_receipt_filename(list_id):
    """Build ``list_<id>_<YYYYmmdd_HHMM>_<6 hex chars>.webp`` from the local time."""
    stamp = f"{datetime.now():%Y%m%d_%H%M}"
    suffix = secrets.token_hex(3)
    return "_".join(("list", str(list_id), stamp, suffix)) + ".webp"
############# OCR ###########################
def preprocess_image_for_tesseract(image):
    """Return *image* after autocontrast, thresholding at 150 and 2x bicubic upscaling."""
    contrasted = ImageOps.autocontrast(image)
    # Hard binarisation: values below 150 become 0, the rest 255
    binarized = contrasted.point(lambda px: 255 if px >= 150 else 0)
    doubled_size = (binarized.width * 2, binarized.height * 2)
    return binarized.resize(doubled_size, Image.BICUBIC)
def extract_total_tesseract(image):
    """Guess the receipt total from an image using Tesseract OCR.

    Strategy, in order:
      1. the largest amount found on a line containing a "total"-like keyword,
      2. the largest amount found on any non-blacklisted line,
      3. the amount-looking word with the greatest height (ties: confidence).

    Returns:
        ``(total, lines)`` where *total* is rounded to 2 decimals (``0.0`` when
        nothing was found) and *lines* is the OCR text split into lines.
    """
    text = pytesseract.image_to_string(image, lang="pol", config="--psm 4")
    lines = text.splitlines()
    candidates = []

    blacklist_keywords = re.compile(r"\b(ptu|vat|podatek|stawka)\b", re.IGNORECASE)
    # The optional trailing [ćc] lets "wartość"/"płatność" match; with a single
    # character class followed by \b the complete words could never match.
    priority_keywords = re.compile(
        r"""
        \b(
            razem\s*do\s*zap[łl][aąo0]ty |
            do\s*zap[łl][aąo0]ty |
            suma |
            kwota |
            warto[śćs][ćc]? |
            płatno[śćs][ćc]? |
            total |
            amount
        )\b
        """,
        re.IGNORECASE | re.VERBOSE,
    )

    for line in lines:
        if not line.strip() or blacklist_keywords.search(line):
            continue

        is_priority = priority_keywords.search(line) is not None

        for match in re.findall(r"\d{1,4}[.,]\d{2}", line):
            val = float(match.replace(",", "."))
            if 0.1 <= val <= 100000:
                candidates.append((val, line, is_priority))

        # Only on priority lines: accept "123 45" as a fallback for a lost separator
        if is_priority:
            for match in re.findall(r"\d{1,4}\s\d{2}", line):
                try:
                    val = float(match.replace(" ", "."))
                except ValueError:
                    # the separator was whitespace other than a plain space
                    continue
                if 0.1 <= val <= 100000:
                    candidates.append((val, line, True))

    # Priority lines win over the rest
    preferred = [val for val, _, is_pref in candidates if is_pref]
    if preferred:
        best_val = max(preferred)
        if best_val < 99999:
            return round(best_val, 2), lines

    if candidates:
        best_val = max(val for val, _, _ in candidates)
        if best_val < 99999:
            return round(best_val, 2), lines

    # Fallback: the tallest amount-looking word
    data = pytesseract.image_to_data(
        image, lang="pol", config="--psm 4", output_type=Output.DICT
    )
    words = data["text"]
    confidences = data.get("conf", ["0"] * len(words))
    amount_pattern = re.compile(r"^\d{1,5}[.,\s]\d{2}$")

    font_candidates = []
    for i, raw_word in enumerate(words):
        word = raw_word.strip()
        if not word or not amount_pattern.match(word):
            continue
        try:
            val = float(word.replace(",", ".").replace(" ", "."))
            height = data["height"][i]
            # Confidence may be a float or a float-formatted string; int("96.5")
            # would raise and the candidate used to be dropped silently.
            conf = int(float(confidences[i]))
        except (ValueError, TypeError, LookupError):
            continue
        if 0.1 <= val <= 100000:
            font_candidates.append((val, height, conf))

    if font_candidates:
        # Prefer the tallest word, then the higher confidence
        best = max(font_candidates, key=lambda c: (c[1], c[2]))
        return round(best[0], 2), lines

    return 0.0, lines
############# END OCR #######################
# Protection of the system login against repeated wrong passwords
def is_ip_blocked(ip):
    """Return True when *ip* has at least MAX_ATTEMPTS failures within TIME_WINDOW.

    This runs on every request, so the defaultdict is read with ``get``: plain
    indexing would create (and keep forever) an empty deque per client IP.
    """
    attempts = failed_login_attempts.get(ip)
    if not attempts:
        return False

    now = time.time()
    while attempts and now - attempts[0] > TIME_WINDOW:
        attempts.popleft()

    if not attempts:
        # Everything expired: drop the entry instead of keeping an empty deque.
        failed_login_attempts.pop(ip, None)
        return False
    return len(attempts) >= MAX_ATTEMPTS
def register_failed_attempt(ip):
    """Record a failed attempt for *ip* after discarding entries older than TIME_WINDOW."""
    timestamp = time.time()
    history = failed_login_attempts[ip]
    while history:
        if timestamp - history[0] <= TIME_WINDOW:
            break
        history.popleft()
    history.append(timestamp)
def reset_failed_attempts(ip):
    """Forget all failed attempts recorded for *ip*.

    The entry is removed outright: indexing the defaultdict just to clear it
    would leave an empty deque behind for every successfully authorised IP.
    """
    failed_login_attempts.pop(ip, None)
def attempts_remaining(ip):
    """Return how many more failures *ip* may record before being blocked (never negative).

    Reads with ``get`` so that asking about an unknown IP does not add an entry
    to the defaultdict.
    """
    recorded = len(failed_login_attempts.get(ip, ()))
    return max(0, MAX_ATTEMPTS - recorded)
####################################################
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login loader: return the User for *user_id*, or None when there is none.

    A malformed id (for example from a stale or tampered session) is treated as
    "no such user" rather than raised as an error.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return db.session.get(User, primary_key)
@app.context_processor
def inject_time():
    """Expose the ``time`` module to templates under the name ``time``."""
    return {"time": time}
@app.context_processor
def inject_has_authorized_cookie():
    """Tell templates whether the request carries a valid system-auth cookie.

    The value is compared with AUTHORIZED_COOKIE_VALUE, as system_auth does;
    a cookie that is merely present (stale or forged) does not count.
    """
    is_authorized = request.cookies.get("authorized") == AUTHORIZED_COOKIE_VALUE
    return {"has_authorized_cookie": is_authorized}
@app.context_processor
def inject_is_blocked():
    """Expose ``is_blocked`` for the first address of the request's access route."""
    client_ip = request.access_route[0]
    blocked = is_ip_blocked(client_ip)
    return {"is_blocked": blocked}
@app.before_request
def require_system_password():
    """Gate every request behind the system password cookie.

    Returning None lets the request continue; a redirect sends the client to
    the system-auth page. Blocked IPs get 403.
    """
    endpoint = request.endpoint

    # Vendored js/css is always served
    if endpoint in ("static_bp.serve_js_lib", "static_bp.serve_css_lib"):
        return

    ip = request.access_route[0]
    if is_ip_blocked(ip):
        abort(403)

    if endpoint is None:
        return
    if endpoint in ("system_auth", "healthcheck"):
        return

    # The cookie VALUE is verified. Checking only for its presence let any client
    # skip the system password by sending an arbitrary "authorized" cookie.
    if request.cookies.get("authorized") == AUTHORIZED_COOKIE_VALUE:
        return
    if endpoint.startswith("login") or endpoint == "favicon":
        return

    # Application JS: toasts.js is served, other .js files redirect to system-auth
    if endpoint == "static_bp.serve_js":
        requested_file = request.view_args.get("filename", "")
        if requested_file == "toasts.js":
            return
        if requested_file.endswith(".js"):
            return redirect(url_for("system_auth", next=request.url))
        return

    # The remaining static_bp endpoints are served without the cookie
    if endpoint.startswith("static_bp."):
        return

    if request.path == "/":
        return redirect(url_for("system_auth"))

    # Rebuild the URL with the request's host before passing it on as ``next``
    parsed = urlparse(request.url)
    fixed_url = urlunparse(parsed._replace(netloc=request.host))
    return redirect(url_for("system_auth", next=fixed_url))
@app.template_filter("filemtime")
def file_mtime_filter(path):
    """Template filter: modification time of *path* as a datetime.

    Falls back to the current UTC time when the file cannot be inspected.
    NOTE(review): the success value is naive while the fallback is timezone-aware.
    """
    try:
        modified = os.path.getmtime(path)
        return datetime.fromtimestamp(modified)
    except Exception:
        return datetime.now(timezone.utc)
@app.template_filter("filesizeformat")
def filesizeformat_filter(path):
    """Template filter: size of the file at *path* as text, or "N/A" on any error."""
    units = ("B", "KB", "MB", "GB")
    try:
        size = os.path.getsize(path)
        index = 0
        while index < len(units):
            if size < 1024.0:
                return f"{size:.1f} {units[index]}"
            size /= 1024.0
            index += 1
        return f"{size:.1f} TB"
    except Exception:
        return "N/A"
@app.errorhandler(404)
def page_not_found(e):
    """Render the shared error template for HTTP 404."""
    body = render_template(
        "errors.html",
        code=404,
        title="Strona nie znaleziona",
        message="Ups! Podana strona nie istnieje lub została przeniesiona.",
    )
    return body, 404
@app.errorhandler(403)
def forbidden(e):
    """Render the shared error template for HTTP 403, preferring the error's description."""
    message = e.description or "Nie masz uprawnień do wyświetlenia tej strony."
    body = render_template(
        "errors.html",
        code=403,
        title="Brak dostępu",
        message=message,
    )
    return body, 403
@app.route("/favicon.ico")
def favicon_ico():
    """Redirect ``/favicon.ico`` to the static ``favicon.svg`` file."""
    svg_url = url_for("static", filename="favicon.svg")
    return redirect(svg_url)
@app.route("/favicon.svg")
def favicon():
    """Return an inline SVG favicon (shopping-cart emoji)."""
    headers = {"Content-Type": "image/svg+xml"}
    svg = """
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16">
<text y="14" font-size="16">🛒</text>
</svg>
"""
    return svg, 200, headers
@app.route("/")
def main_page():
    """Render the main page with the user's lists, archived lists and public lists.

    An optional ``month=YYYY-MM`` query parameter limits the user's active lists
    and the public lists to those created in that month; an unparsable value is
    ignored. Anonymous visitors only get public lists.
    """
    now = datetime.now(timezone.utc)
    month_str = request.args.get("month")
    start = end = None
    if month_str:
        try:
            year, month = map(int, month_str.split("-"))
            start = datetime(year, month, 1, tzinfo=timezone.utc)
            # First day of the following month
            end = (start + timedelta(days=31)).replace(day=1)
        except (ValueError, OverflowError):
            # Only the failures a bad ``month`` value can cause are handled
            # (a bare ``except`` also swallowed unrelated errors).
            start = end = None

    def date_filter(query):
        if start and end:
            query = query.filter(
                ShoppingList.created_at >= start, ShoppingList.created_at < end
            )
        return query

    not_expired = ShoppingList.expires_at.is_(None) | (ShoppingList.expires_at > now)
    newest_first = ShoppingList.created_at.desc()

    if current_user.is_authenticated:
        user_lists = (
            date_filter(
                ShoppingList.query.filter_by(
                    owner_id=current_user.id, is_archived=False
                ).filter(not_expired)
            )
            .order_by(newest_first)
            .all()
        )
        archived_lists = (
            ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True)
            .order_by(newest_first)
            .all()
        )
        public_lists = (
            date_filter(
                ShoppingList.query.filter(
                    ShoppingList.is_public == True,  # noqa: E712
                    ShoppingList.owner_id != current_user.id,
                    not_expired,
                    ShoppingList.is_archived == False,  # noqa: E712
                )
            )
            .order_by(newest_first)
            .all()
        )
    else:
        user_lists = []
        archived_lists = []
        public_lists = (
            date_filter(
                ShoppingList.query.filter(
                    ShoppingList.is_public == True,  # noqa: E712
                    not_expired,
                    ShoppingList.is_archived == False,  # noqa: E712
                )
            )
            .order_by(newest_first)
            .all()
        )

    # Adds the item counters and expense total the template displays
    for l in user_lists + public_lists + archived_lists:
        enrich_list_data(l)

    return render_template(
        "main.html",
        user_lists=user_lists,
        public_lists=public_lists,
        archived_lists=archived_lists,
        now=now,
        timedelta=timedelta,
    )
def _is_safe_next_url(target, host):
    """Return True when *target* is a local absolute path or an http(s) URL on *host*."""
    if not target or "\\" in target:
        return False
    parsed = urlparse(target)
    if parsed.netloc:
        return parsed.scheme in ("http", "https") and parsed.netloc == host
    return not parsed.scheme and target.startswith("/")


@app.route("/system-auth", methods=["GET", "POST"])
def system_auth():
    """Ask for the system password and, on success, set the ``authorized`` cookie.

    Failed attempts are counted per client IP; a blocked IP gets the form with
    status 403. After success the client is redirected to ``next`` when that
    points at this host, otherwise to the main page.
    """
    # Short-circuit only on a valid cookie. A logged-in user without it must still
    # see the form: redirecting them to the main page bounced them straight back
    # here from the before_request gate, in an endless loop.
    if request.cookies.get("authorized") == AUTHORIZED_COOKIE_VALUE:
        flash("Jesteś już zalogowany lub autoryzowany.", "info")
        return redirect(url_for("main_page"))

    ip = request.access_route[0]

    # ``next`` is attacker-controllable, so off-site targets are discarded
    next_page = request.args.get("next")
    if not _is_safe_next_url(next_page, request.host):
        next_page = url_for("main_page")

    if is_ip_blocked(ip):
        flash(
            "Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.",
            "danger",
        )
        return render_template("system_auth.html"), 403

    if request.method == "POST":
        if request.form["password"] == SYSTEM_PASSWORD:
            reset_failed_attempts(ip)
            resp = redirect(next_page)
            max_age = app.config.get("AUTH_COOKIE_MAX_AGE", 86400)
            resp.set_cookie("authorized", AUTHORIZED_COOKIE_VALUE, max_age=max_age)
            return resp

        register_failed_attempt(ip)
        if is_ip_blocked(ip):
            flash(
                "Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.",
                "danger",
            )
            return render_template("system_auth.html"), 403
        remaining = attempts_remaining(ip)
        flash(f"Nieprawidłowe hasło. Pozostało {remaining} prób.", "warning")

    return render_template("system_auth.html")
@app.route("/toggle_archive_list/<int:list_id>")
@login_required
def toggle_archive_list(list_id):
    """Archive or restore one of the current user's lists.

    ``?archive=true`` (the default) archives; any other value restores.
    """
    shopping_list = db.session.get(ShoppingList, list_id)
    if shopping_list is None:
        abort(404)
    if shopping_list.owner_id != current_user.id:
        return redirect_with_flash("Nie masz uprawnień do tej listy", "danger")

    should_archive = request.args.get("archive", "true").lower() == "true"
    shopping_list.is_archived = should_archive
    if should_archive:
        flash(f"Lista „{shopping_list.title}” została zarchiwizowana.", "success")
    else:
        flash(f"Lista „{shopping_list.title}” została przywrócona.", "success")

    db.session.commit()
    return redirect(url_for("main_page"))
@app.route("/edit_my_list/<int:list_id>", methods=["GET", "POST"])
@login_required
def edit_my_list(list_id):
    """Show (GET) or save (POST) the settings of a list owned by the current user."""
    receipts = (
        Receipt.query.filter_by(list_id=list_id)
        .order_by(Receipt.uploaded_at.desc())
        .all()
    )

    l = db.session.get(ShoppingList, list_id)
    if l is None:
        abort(404)
    if l.owner_id != current_user.id:
        abort(403, description="Nie jesteś właścicielem tej listy.")

    if request.method != "POST":
        return render_template("edit_my_list.html", list=l, receipts=receipts)

    form = request.form
    back_to_form = url_for("edit_my_list", list_id=list_id)

    # Title validation
    new_title = form.get("title", "").strip()
    if not new_title:
        flash("Podaj poprawny tytuł", "danger")
        return redirect(back_to_form)

    l.title = new_title
    l.is_public = "is_public" in form
    l.is_temporary = "is_temporary" in form
    l.is_archived = "is_archived" in form

    # Expiry: needs both the date and the time, otherwise it is cleared
    expires_date = form.get("expires_date")
    expires_time = form.get("expires_time")
    if expires_date and expires_time:
        try:
            parsed = datetime.strptime(
                f"{expires_date} {expires_time}", "%Y-%m-%d %H:%M"
            )
            l.expires_at = parsed.replace(tzinfo=timezone.utc)
        except ValueError:
            flash("Błędna data lub godzina wygasania", "danger")
            return redirect(back_to_form)
    else:
        l.expires_at = None

    db.session.commit()
    flash("Zaktualizowano dane listy", "success")
    return redirect(url_for("main_page"))
@app.route("/delete_user_list/<int:list_id>", methods=["POST"])
@login_required
def delete_user_list(list_id):
    """Delete a list owned by the current user, with its items, expenses and receipts.

    Responds with 403 when the list does not exist or belongs to someone else.
    """
    l = db.session.get(ShoppingList, list_id)
    if l is None or l.owner_id != current_user.id:
        abort(403, description="Nie jesteś właścicielem tej listy.")

    # Receipt rows must go as well: Receipt.list_id is NOT NULL, and a row left
    # behind keeps its unique file_hash, blocking a later re-upload of that file.
    Receipt.query.filter_by(list_id=list_id).delete()
    Item.query.filter_by(list_id=list_id).delete()
    Expense.query.filter_by(list_id=list_id).delete()
    db.session.delete(l)
    db.session.commit()

    # Files are removed only after the database change has been committed.
    delete_receipts_for_list(list_id)

    flash("Lista została usunięta", "success")
    return redirect(url_for("main_page"))
@app.route("/toggle_visibility/<int:list_id>", methods=["GET", "POST"])
@login_required
def toggle_visibility(list_id):
    """Flip ``is_public`` on a list owned by the current user.

    JSON/POST callers get a dict response; plain GET callers get a flash
    message and a redirect to the main page.
    """
    shopping_list = db.session.get(ShoppingList, list_id)
    if shopping_list is None:
        abort(404)

    wants_dict = request.is_json or request.method == "POST"

    if shopping_list.owner_id != current_user.id:
        if wants_dict:
            return {"error": "Unauthorized"}, 403
        flash("Nie masz uprawnień do tej listy", "danger")
        return redirect(url_for("main_page"))

    shopping_list.is_public = not shopping_list.is_public
    db.session.commit()

    share_url = f"{request.url_root}share/{shopping_list.share_token}"
    if wants_dict:
        return {"is_public": shopping_list.is_public, "share_url": share_url}

    if shopping_list.is_public:
        flash("Lista została udostępniona publicznie", "success")
    else:
        flash("Lista została ukryta przed gośćmi", "info")
    return redirect(url_for("main_page"))
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or sign the user in (POST); usernames match case-insensitively."""
    if request.method != "POST":
        return render_template("login.html")

    username_input = request.form["username"].lower()
    user = User.query.filter(func.lower(User.username) == username_input).first()
    credentials_ok = user and check_password_hash(
        user.password_hash, request.form["password"]
    )
    if credentials_ok:
        session.permanent = True
        login_user(user)
        flash("Zalogowano pomyślnie", "success")
        return redirect(url_for("main_page"))

    flash("Nieprawidłowy login lub hasło", "danger")
    return render_template("login.html")
@app.route("/logout")
@login_required
def logout():
    """Sign the current user out and go back to the main page."""
    logout_user()
    return redirect_with_flash("Wylogowano pomyślnie", "success")
@app.route("/create", methods=["POST"])
@login_required
def create_list():
    """Create a shopping list for the current user and open it.

    A temporary list (form field ``temporary=1``) expires after 7 days.
    A missing or blank title is rejected with a flash message.
    """
    # ShoppingList.title is NOT NULL; validate here, as edit_my_list does,
    # instead of letting the insert fail.
    title = (request.form.get("title") or "").strip()
    if not title:
        return redirect_with_flash("Podaj poprawny tytuł", "danger")

    is_temporary = request.form.get("temporary") == "1"
    token = generate_share_token(8)
    expires_at = (
        datetime.now(timezone.utc) + timedelta(days=7) if is_temporary else None
    )

    new_list = ShoppingList(
        title=title,
        owner_id=current_user.id,
        is_temporary=is_temporary,
        share_token=token,
        expires_at=expires_at,
    )
    db.session.add(new_list)
    db.session.commit()

    flash("Utworzono nową listę", "success")
    return redirect(url_for("view_list", list_id=new_list.id))
@app.route("/list/<int:list_id>")
@login_required
def view_list(list_id):
    """Render a list with its items, receipts, expenses and purchase progress."""
    details = get_list_details(list_id)
    shopping_list, items, receipt_files, expenses, total_expense = details

    total_count = len(items)
    purchased_count = sum(1 for item in items if item.purchased)
    if total_count > 0:
        percent = purchased_count / total_count * 100
    else:
        percent = 0

    return render_template(
        "list.html",
        list=shopping_list,
        items=items,
        receipt_files=receipt_files,
        total_count=total_count,
        purchased_count=purchased_count,
        percent=percent,
        expenses=expenses,
        total_expense=total_expense,
        is_share=False,
    )
@app.route("/user_expenses")
@login_required
def user_expenses():
    """Render the expenses page.

    Query parameters: ``start_date`` / ``end_date`` (``YYYY-MM-DD``, both needed
    for the filter, end day inclusive) and ``show_all`` (``true`` adds expenses
    of public lists to the user's own).
    """
    params = request.args
    start_date_str = params.get("start_date")
    end_date_str = params.get("end_date")
    show_all = params.get("show_all", "false").lower() == "true"
    start = end = None

    # Without show_all only the current user's lists count
    owned_by_me = ShoppingList.owner_id == current_user.id
    if show_all:
        scope = or_(owned_by_me, ShoppingList.is_public == True)  # noqa: E712
    else:
        scope = owned_by_me

    expenses_query = (
        Expense.query.join(ShoppingList, Expense.list_id == ShoppingList.id)
        .options(joinedload(Expense.list))
        .filter(scope)
    )

    if start_date_str and end_date_str:
        try:
            start = datetime.strptime(start_date_str, "%Y-%m-%d")
            end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1)
            expenses_query = expenses_query.filter(
                Expense.added_at >= start, Expense.added_at < end
            )
        except ValueError:
            flash("Błędny zakres dat", "danger")

    expenses = expenses_query.order_by(Expense.added_at.desc()).all()

    list_ids = {expense.list_id for expense in expenses}
    lists = (
        ShoppingList.query.filter(ShoppingList.id.in_(list_ids))
        .order_by(ShoppingList.created_at.desc())
        .all()
    )

    def in_selected_range(expense):
        if not start or not end:
            return True
        return expense.added_at >= start and expense.added_at < end

    expense_table = []
    for expense in expenses:
        expense_table.append(
            {
                "title": expense.list.title if expense.list else "Nieznana",
                "amount": expense.amount,
                "added_at": expense.added_at,
            }
        )

    lists_data = []
    for shopping_list in lists:
        lists_data.append(
            {
                "id": shopping_list.id,
                "title": shopping_list.title,
                "created_at": shopping_list.created_at,
                "total_expense": sum(
                    e.amount for e in shopping_list.expenses if in_selected_range(e)
                ),
                "owner_username": (
                    shopping_list.owner.username if shopping_list.owner else "?"
                ),
            }
        )

    return render_template(
        "user_expenses.html",
        expense_table=expense_table,
        lists_data=lists_data,
        show_all=show_all,
    )
@app.route("/user_expenses_data")
@login_required
def user_expenses_data():
    """Return expense sums grouped by period as JSON (``labels`` and ``expenses``).

    ``range`` selects the bucket: monthly (default), quarterly, halfyearly or
    yearly; any other value groups by day. Bad dates give a 400 JSON error.
    """
    params = request.args
    range_type = params.get("range", "monthly")
    start_date = params.get("start_date")
    end_date = params.get("end_date")
    show_all = params.get("show_all", "false").lower() == "true"

    owned_by_me = ShoppingList.owner_id == current_user.id
    if show_all:
        scope = or_(owned_by_me, ShoppingList.is_public == True)  # noqa: E712
    else:
        scope = owned_by_me
    query = Expense.query.join(ShoppingList, Expense.list_id == ShoppingList.id)
    query = query.filter(scope)

    if start_date and end_date:
        try:
            start = datetime.strptime(start_date, "%Y-%m-%d")
            end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
            query = query.filter(Expense.added_at >= start, Expense.added_at < end)
        except ValueError:
            return jsonify({"error": "Błędne daty"}), 400

    bucket_makers = {
        "monthly": lambda ts: ts.strftime("%Y-%m"),
        "quarterly": lambda ts: f"{ts.year}-Q{((ts.month - 1) // 3) + 1}",
        "halfyearly": lambda ts: f"{ts.year}-H{1 if ts.month <= 6 else 2}",
        "yearly": lambda ts: str(ts.year),
    }
    make_key = bucket_makers.get(range_type, lambda ts: ts.strftime("%Y-%m-%d"))

    grouped = defaultdict(float)
    for expense in query.all():
        # Expenses without a timestamp are counted under the current time
        moment = expense.added_at or datetime.now(timezone.utc)
        grouped[make_key(moment)] += expense.amount

    labels = sorted(grouped)
    data = [round(grouped[label], 2) for label in labels]
    return jsonify({"labels": labels, "expenses": data})
@app.route("/share/<token>")
@app.route("/guest-list/<int:list_id>")
def shared_list(token=None, list_id=None):
    """Render the guest view of a list, addressed by share token or by list id.

    The public-visibility check is applied on the token route only.
    """
    if token:
        by_token = ShoppingList.query.filter_by(share_token=token).first_or_404()
        if not check_list_public(by_token):
            return redirect(url_for("main_page"))
        list_id = by_token.id

    details = get_list_details(list_id)
    shopping_list, items, receipt_files, expenses, total_expense = details

    return render_template(
        "list_share.html",
        list=shopping_list,
        items=items,
        receipt_files=receipt_files,
        expenses=expenses,
        total_expense=total_expense,
        is_share=True,
    )
@app.route("/copy/<int:list_id>")
@login_required
def copy_list(list_id):
    """Copy a list (title and item names only) into a new list owned by the current user."""
    source = ShoppingList.query.get_or_404(list_id)

    duplicate = ShoppingList(
        title=source.title + " (Kopia)",
        owner_id=current_user.id,
        share_token=generate_share_token(8),
    )
    db.session.add(duplicate)
    db.session.commit()  # committed first so the new list has an id for its items

    for source_item in Item.query.filter_by(list_id=source.id).all():
        db.session.add(Item(list_id=duplicate.id, name=source_item.name))
    db.session.commit()

    flash("Skopiowano listę", "success")
    return redirect(url_for("view_list", list_id=duplicate.id))
@app.route("/suggest_products")
def suggest_products():
    """Return up to five product names containing ``q`` (case-insensitive)."""
    query = request.args.get("q", "")
    if not query:
        return {"suggestions": []}

    name_matches = SuggestedProduct.name.ilike(f"%{query}%")
    found = SuggestedProduct.query.filter(name_matches).limit(5).all()
    return {"suggestions": [product.name for product in found]}
@app.route("/all_products")
def all_products():
    """Return product names (optionally filtered by ``q``), de-duplicated case-insensitively.

    The first 20 names come from a DISTINCT query ordered by name and usage
    count; up to 200 further names follow in alphabetical order.
    """
    query = request.args.get("q", "")

    def matching_products():
        base = SuggestedProduct.query
        if query:
            base = base.filter(SuggestedProduct.name.ilike(f"%{query}%"))
        return base

    top_products = (
        matching_products()
        .order_by(
            SuggestedProduct.name.asc(),  # has to come first
            SuggestedProduct.usage_count.desc(),
        )
        .distinct(SuggestedProduct.name)
        .limit(20)
        .all()
    )
    top_names = [product.name for product in top_products]

    rest_query = matching_products()
    if top_names:
        rest_query = rest_query.filter(~SuggestedProduct.name.in_(top_names))
    rest_products = rest_query.order_by(SuggestedProduct.name.asc()).limit(200).all()

    seen = set()
    unique_names = []
    for name in top_names + [product.name for product in rest_products]:
        normalized = name.strip().lower()
        if normalized in seen:
            continue
        seen.add(normalized)
        unique_names.append(name)

    return {"allproducts": unique_names}
@app.route("/upload_receipt/<int:list_id>", methods=["POST"])
@login_required
def upload_receipt(list_id):
    """Store an uploaded receipt image for a shopping list.

    The upload is rejected (via ``_receipt_error``) when no file is sent, the
    filename is empty, the extension is not allowed, a receipt with the same
    SHA-256 hash already exists, or the image cannot be converted. On success
    the image is saved under a generated ``.webp`` name and a ``Receipt`` row
    is created. AJAX/JSON requests get a JSON response and trigger a
    ``receipt_added`` socket event; other requests are redirected back.

    Responds with 404 when the target list does not exist.
    """
    # Previously the list was fetched but never checked, so receipts could be
    # attached to a list id that does not exist.
    # NOTE(review): an owner check used to be sketched here but was disabled;
    # only existence is verified - confirm whether ownership should matter.
    if db.session.get(ShoppingList, list_id) is None:
        abort(404)

    if "receipt" not in request.files:
        return _receipt_error("Brak pliku")

    file = request.files["receipt"]
    if file.filename == "":
        return _receipt_error("Nie wybrano pliku")

    if not (file and allowed_file(file.filename)):
        return _receipt_error("Niedozwolony format pliku")

    # Hash the raw upload to detect duplicates, then rewind the stream so the
    # image can still be read when it is saved.
    file_bytes = file.read()
    file.seek(0)
    file_hash = hashlib.sha256(file_bytes).hexdigest()
    if Receipt.query.filter_by(file_hash=file_hash).first():
        return _receipt_error("Taki plik już istnieje")

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M")
    random_part = secrets.token_hex(3)
    webp_filename = f"list_{list_id}_{timestamp}_{random_part}.webp"
    file_path = os.path.join(app.config["UPLOAD_FOLDER"], webp_filename)

    try:
        save_resized_image(file, file_path)
    except ValueError as e:
        return _receipt_error(str(e))

    filesize = os.path.getsize(file_path) if os.path.exists(file_path) else None
    new_receipt = Receipt(
        list_id=list_id,
        filename=webp_filename,
        filesize=filesize,
        uploaded_at=datetime.now(timezone.utc),
        file_hash=file_hash,
    )
    db.session.add(new_receipt)
    db.session.commit()

    is_ajax = (
        request.is_json
        or request.headers.get("X-Requested-With") == "XMLHttpRequest"
    )
    if is_ajax:
        url = url_for("uploaded_file", filename=webp_filename)
        socketio.emit("receipt_added", {"url": url}, to=str(list_id))
        return jsonify({"success": True, "url": url})

    flash("Wgrano paragon", "success")
    return redirect(request.referrer or url_for("main_page"))
@app.route("/uploads/<filename>")
def uploaded_file(filename):
    """Serve a file from the upload folder with long-lived caching headers.

    ``Pragma`` and ``Content-Disposition`` are removed from the response and
    the content type is overridden when it can be guessed from the filename.
    """
    response = send_from_directory(app.config["UPLOAD_FOLDER"], filename)
    headers = response.headers
    headers["Cache-Control"] = "public, max-age=2592000, immutable"
    for unwanted in ("Pragma", "Content-Disposition"):
        headers.pop(unwanted, None)

    guessed_type = mimetypes.guess_type(filename)[0]
    if guessed_type:
        headers["Content-Type"] = guessed_type
    return response
@app.route("/reorder_items", methods=["POST"])
@login_required
def reorder_items():
    """Persist a new item order for a list and broadcast it to the list room.

    Expects a JSON object with ``list_id`` and ``order`` (a list of item ids
    in their new order). Items that do not exist or belong to another list
    are skipped. Returns ``{"success": true}`` on success, or a 400 JSON error
    when the payload is missing or malformed.
    """
    # silent=True: a missing or invalid JSON body yields None instead of an
    # exception, so every malformed payload gets the same 400 response below
    # (previously a missing body or "order" key crashed with a 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify(success=False, error="Brak danych"), 400

    list_id = data.get("list_id")
    order = data.get("order")
    if list_id is None or not isinstance(order, list):
        return jsonify(success=False, error="Brak danych"), 400

    for index, item_id in enumerate(order):
        item = db.session.get(Item, item_id)
        if item and item.list_id == list_id:
            item.position = index
    db.session.commit()

    socketio.emit(
        "items_reordered", {"list_id": list_id, "order": order}, to=str(list_id)
    )
    return jsonify(success=True)
@app.route("/rotate_receipt/<int:receipt_id>")
@login_required
def rotate_receipt_user(receipt_id):
    """Rotate a receipt image on behalf of the list owner or an admin.

    Responds with 404 for an unknown receipt or list. Users who are neither
    admin nor owner are redirected to the main page with an error message.
    """
    receipt = Receipt.query.get_or_404(receipt_id)
    owning_list = ShoppingList.query.get_or_404(receipt.list_id)

    may_modify = current_user.is_admin or current_user.id == owning_list.owner_id
    if not may_modify:
        flash("Brak uprawnień do tej operacji", "danger")
        return redirect(url_for("main_page"))

    try:
        rotate_receipt_by_id(receipt_id)
        flash("Obrócono paragon", "success")
    except FileNotFoundError:
        flash("Plik nie istnieje", "danger")
    except Exception as exc:
        flash(f"Błąd przy obracaniu: {str(exc)}", "danger")

    target = request.referrer or url_for("main_page")
    return redirect(target)
@app.route("/delete_receipt/<int:receipt_id>")
@login_required
def delete_receipt_user(receipt_id):
    """Delete a receipt on behalf of the list owner or an admin.

    Responds with 404 for an unknown receipt or list. Users who are neither
    admin nor owner are redirected to the main page with an error message.
    """
    receipt = Receipt.query.get_or_404(receipt_id)
    owning_list = ShoppingList.query.get_or_404(receipt.list_id)

    may_modify = current_user.is_admin or current_user.id == owning_list.owner_id
    if not may_modify:
        flash("Brak uprawnień do tej operacji", "danger")
        return redirect(url_for("main_page"))

    try:
        delete_receipt_by_id(receipt_id)
        flash("Paragon usunięty", "success")
    except Exception as exc:
        flash(f"Błąd przy usuwaniu pliku: {str(exc)}", "danger")

    target = request.referrer or url_for("main_page")
    return redirect(target)
# OCR
@app.route("/lists/<int:list_id>/analyze", methods=["POST"])
@login_required
def analyze_receipts_for_list(list_id):
    """Run OCR on every receipt of a list and report the detected totals.

    Returns JSON with one entry per receipt whose file exists on disk
    (``id``, ``filename``, ``amount``, ``debug_text``, ``already_added``) and
    ``total``, the sum of amounts for receipts that have no expense recorded
    yet. A receipt whose OCR fails is reported with amount 0 and no debug
    lines; the traceback is printed.
    """
    receipt_objs = Receipt.query.filter_by(list_id=list_id).all()
    # Filenames that already have an expense, so they are not counted twice.
    existing_expenses = {
        e.receipt_filename
        for e in Expense.query.filter_by(list_id=list_id).all()
        if e.receipt_filename
    }

    results = []
    total = 0.0
    for receipt in receipt_objs:
        filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
        if not os.path.exists(filepath):
            continue

        try:
            # The context manager closes the source file deterministically;
            # convert() returns an independent copy that stays usable.
            with Image.open(filepath) as source_image:
                raw_image = source_image.convert("RGB")
            image = preprocess_image_for_tesseract(raw_image)
            value, lines = extract_total_tesseract(image)
        except Exception:
            # Best effort: one unreadable receipt must not abort the analysis.
            print(f"OCR error for {receipt.filename}:\n{traceback.format_exc()}")
            value = 0.0
            lines = []

        already_added = receipt.filename in existing_expenses
        results.append(
            {
                "id": receipt.id,
                "filename": receipt.filename,
                "amount": round(value, 2),
                "debug_text": lines,
                "already_added": already_added,
            }
        )
        if not already_added:
            total += value

    return jsonify({"results": results, "total": round(total, 2)})
@app.route("/admin")
@login_required
@admin_required
def admin_panel():
    """Render the admin dashboard with list, expense and runtime statistics.

    Collects per-list progress, comment, receipt and expiry data, global
    counters, expense sums (overall, current year, current month) plus
    process and database details, and passes them to
    ``admin/admin_panel.html``.
    """
    now = datetime.now(timezone.utc)
    user_count = User.query.count()
    list_count = ShoppingList.query.count()
    item_count = Item.query.count()
    all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all()
    all_files = os.listdir(app.config["UPLOAD_FOLDER"])

    enriched_lists = []
    for lst in all_lists:
        # presumably populates total_count / purchased_count / total_expense
        # on the list object, which are read below - verify in enrich_list_data
        enrich_list_data(lst)
        items = Item.query.filter_by(list_id=lst.id).all()
        total_count = lst.total_count
        purchased_count = lst.purchased_count
        percent = (purchased_count / total_count * 100) if total_count > 0 else 0
        comments_count = sum(1 for i in items if i.note and i.note.strip() != "")

        # The trailing underscore is essential: matching on "list_1" alone
        # also counted the files of lists 10, 11, 100, ...
        # NOTE(review): assumes receipt filenames follow the
        # "list_<id>_<timestamp>_..." scheme used by the upload view.
        receipt_marker = f"list_{lst.id}_"
        receipts_count = sum(1 for f in all_files if receipt_marker in f)

        # Determine whether a temporary list has expired; naive timestamps
        # are treated as UTC so they can be compared with `now`.
        if lst.is_temporary and lst.expires_at:
            expires_at = lst.expires_at
            if expires_at.tzinfo is None:
                expires_at = expires_at.replace(tzinfo=timezone.utc)
            is_expired = expires_at < now
        else:
            is_expired = False

        enriched_lists.append(
            {
                "list": lst,
                "total_count": total_count,
                "purchased_count": purchased_count,
                "percent": round(percent),
                "comments_count": comments_count,
                "receipts_count": receipts_count,
                "total_expense": lst.total_expense,
                "expired": is_expired,
            }
        )

    # Five most frequently purchased item names.
    top_products = (
        db.session.query(Item.name, func.count(Item.id).label("count"))
        .filter(Item.purchased.is_(True))
        .group_by(Item.name)
        .order_by(func.count(Item.id).desc())
        .limit(5)
        .all()
    )
    purchased_items_count = Item.query.filter_by(purchased=True).count()

    total_expense_sum = db.session.query(func.sum(Expense.amount)).scalar() or 0
    current_year = now.year
    current_month = now.month
    year_expense_sum = (
        db.session.query(func.sum(Expense.amount))
        .filter(extract("year", Expense.added_at) == current_year)
        .scalar()
        or 0
    )
    month_expense_sum = (
        db.session.query(func.sum(Expense.amount))
        .filter(extract("year", Expense.added_at) == current_year)
        .filter(extract("month", Expense.added_at) == current_month)
        .scalar()
        or 0
    )

    # Resident memory of this process in MB.
    process = psutil.Process(os.getpid())
    app_mem = process.memory_info().rss // (1024 * 1024)

    # Database engine details; the URL is cut at "?" to drop query options.
    db_engine = db.engine
    db_info = {
        "engine": db_engine.name,
        "version": getattr(db_engine.dialect, "server_version_info", None),
        "url": str(db_engine.url).split("?")[0],
    }

    # Number of tables present in the database.
    inspector = inspect(db_engine)
    table_count = len(inspector.get_table_names())

    # Total number of rows across the main tables.
    record_total = (
        db.session.query(func.count(User.id)).scalar()
        + db.session.query(func.count(ShoppingList.id)).scalar()
        + db.session.query(func.count(Item.id)).scalar()
        + db.session.query(func.count(Receipt.id)).scalar()
        + db.session.query(func.count(Expense.id)).scalar()
    )

    # Whole minutes elapsed since app_start_time.
    uptime_minutes = int(
        (datetime.now(timezone.utc) - app_start_time).total_seconds() // 60
    )

    return render_template(
        "admin/admin_panel.html",
        user_count=user_count,
        list_count=list_count,
        item_count=item_count,
        purchased_items_count=purchased_items_count,
        enriched_lists=enriched_lists,
        top_products=top_products,
        total_expense_sum=total_expense_sum,
        year_expense_sum=year_expense_sum,
        month_expense_sum=month_expense_sum,
        now=now,
        python_version=sys.version,
        system_info=platform.platform(),
        app_memory=f"{app_mem} MB",
        db_info=db_info,
        table_count=table_count,
        record_total=record_total,
        uptime_minutes=uptime_minutes,
    )
@app.route("/admin/delete_list/<int:list_id>")
@login_required
@admin_required
def delete_list(list_id):
    """Delete a shopping list together with its receipts, items and expenses.

    Responds with 404 when the list does not exist; otherwise redirects to
    the admin panel with a confirmation message.
    """
    # Look the list up first so an unknown id results in a 404 before any
    # receipt is touched (receipts used to be deleted ahead of this check).
    list_to_delete = ShoppingList.query.get_or_404(list_id)
    # Keep the title: the instance should not be read once it is deleted.
    title = list_to_delete.title

    delete_receipts_for_list(list_id)
    Item.query.filter_by(list_id=list_id).delete()
    Expense.query.filter_by(list_id=list_id).delete()
    db.session.delete(list_to_delete)
    db.session.commit()

    flash(f"Usunięto listę: {title}", "success")
    return redirect(url_for("admin_panel"))
@app.route("/admin/add_user", methods=["POST"])
@login_required
@admin_required
def add_user():
    """Create a user from the submitted ``username`` and ``password`` fields.

    The username is stored lower-cased and must be unique when compared
    case-insensitively. Every outcome redirects to the user list with a
    flash message.
    """
    username = request.form["username"].lower()
    password = request.form["password"]

    if not (username and password):
        flash("Wypełnij wszystkie pola", "danger")
        return redirect(url_for("list_users"))

    same_name = func.lower(User.username) == username
    existing_user = User.query.filter(same_name).first()
    if existing_user:
        flash("Użytkownik o takiej nazwie już istnieje", "warning")
        return redirect(url_for("list_users"))

    db.session.add(
        User(username=username, password_hash=generate_password_hash(password))
    )
    db.session.commit()
    flash("Dodano nowego użytkownika", "success")
    return redirect(url_for("list_users"))
@app.route("/admin/users")
@login_required
@admin_required
def list_users():
    """Render the user management page with users and global counters."""
    context = {
        "users": User.query.all(),
        "user_count": User.query.count(),
        "list_count": ShoppingList.query.count(),
        "item_count": Item.query.count(),
        # Static sample entries; no real activity log is read here.
        "activity_log": [
            "Utworzono listę: Zakupy weekendowe",
            "Dodano produkt: Mleko",
        ],
    }
    return render_template("admin/user_management.html", **context)
@app.route("/admin/change_password/<int:user_id>", methods=["POST"])
@login_required
@admin_required
def reset_password(user_id):
    """Set a new password for the given user from the ``password`` field.

    Responds with 404 for an unknown user. An empty password is rejected
    with a flash message; every outcome redirects to the user list.
    """
    user = User.query.get_or_404(user_id)
    new_password = request.form["password"]

    if new_password:
        user.password_hash = generate_password_hash(new_password)
        db.session.commit()
        flash(
            f"Hasło dla użytkownika {user.username} zostało zaktualizowane",
            "success",
        )
    else:
        flash("Podaj nowe hasło", "danger")

    return redirect(url_for("list_users"))
@app.route("/admin/delete_user/<int:user_id>")
@login_required
@admin_required
def delete_user(user_id):
    """Delete a user unless that would remove the last administrator.

    Responds with 404 for an unknown user; otherwise redirects to the user
    list with a flash message describing the outcome.
    """
    user = User.query.get_or_404(user_id)

    # The admin count is only queried when the target is an admin.
    is_last_admin = (
        user.is_admin and User.query.filter_by(is_admin=True).count() <= 1
    )
    if is_last_admin:
        flash("Nie można usunąć ostatniego administratora.", "danger")
        return redirect(url_for("list_users"))

    db.session.delete(user)
    db.session.commit()
    flash("Użytkownik usunięty", "success")
    return redirect(url_for("list_users"))
@app.route("/admin/receipts/<id>")
@login_required
@admin_required
def admin_receipts(id):
    """List receipts, newest first, for all lists or for a single list.

    ``id`` is either the literal ``"all"`` or a list id. A value that cannot
    be parsed as an integer redirects to the admin panel with an error.
    """
    try:
        receipt_query = Receipt.query
        if id != "all":
            receipt_query = receipt_query.filter_by(list_id=int(id))
        receipts = receipt_query.order_by(Receipt.uploaded_at.desc()).all()
    except ValueError:
        flash("Nieprawidłowe ID listy.", "danger")
        return redirect(url_for("admin_panel"))

    return render_template("admin/receipts.html", receipts=receipts)
@app.route("/admin/rotate_receipt/<int:receipt_id>")
@login_required
@admin_required
def rotate_receipt(receipt_id):
    """Rotate a receipt image (admin) and return to the referring page.

    Failures are reported through flash messages instead of being raised.
    """
    try:
        rotate_receipt_by_id(receipt_id)
        flash("Obrócono paragon", "success")
    except FileNotFoundError:
        flash("Plik nie istnieje", "danger")
    except Exception as exc:
        flash(f"Błąd przy obracaniu: {str(exc)}", "danger")

    target = request.referrer or url_for("admin_receipts", id="all")
    return redirect(target)
@app.route("/admin/delete_receipt/<int:receipt_id>")
@login_required
@admin_required
def delete_receipt(receipt_id):
    """Delete a receipt (admin) and return to the referring page.

    Failures are reported through a flash message instead of being raised.
    """
    try:
        delete_receipt_by_id(receipt_id)
        flash("Paragon usunięty", "success")
    except Exception as exc:
        flash(f"Błąd przy usuwaniu pliku: {str(exc)}", "danger")

    target = request.referrer or url_for("admin_receipts", id="all")
    return redirect(target)
@app.route("/admin/rename_receipt/<int:receipt_id>")
@login_required
@admin_required
def rename_receipt(receipt_id):
    """Rename a receipt file to a freshly generated name and update its row.

    Responds with 404 for an unknown receipt. Every other outcome redirects
    to the referring page, or to the full receipt list when the request
    carries no referrer, with a flash message.
    """
    receipt = Receipt.query.get_or_404(receipt_id)
    # One redirect target for every exit: the early return used to pass a
    # possibly missing referrer straight to redirect().
    back_url = request.referrer or url_for("admin_receipts", id="all")

    old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
    if not os.path.exists(old_path):
        flash("Plik nie istnieje", "danger")
        return redirect(back_url)

    new_filename = generate_new_receipt_filename(receipt.list_id)
    new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename)
    try:
        os.rename(old_path, new_path)
        receipt.filename = new_filename
        db.session.commit()
        flash("Zmieniono nazwę pliku", "success")
    except Exception as e:
        # Discard the pending filename change so the session stays usable.
        db.session.rollback()
        flash(f"Błąd przy zmianie nazwy: {str(e)}", "danger")

    return redirect(back_url)
@app.route("/admin/generate_receipt_hash/<int:receipt_id>")
@login_required
@admin_required
def generate_receipt_hash(receipt_id):
    """Compute and store the SHA-256 hash of a receipt file that lacks one.

    Responds with 404 for an unknown receipt. Every other outcome redirects
    to the referring page, or to the full receipt list when the request
    carries no referrer, with a flash message.
    """
    receipt = Receipt.query.get_or_404(receipt_id)
    # redirect() needs a real location; requests without a Referer header
    # previously passed None here on all three exits.
    back_url = request.referrer or url_for("admin_receipts", id="all")

    if receipt.file_hash:
        flash("Hash już istnieje", "info")
        return redirect(back_url)

    file_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
    if not os.path.exists(file_path):
        flash("Plik nie istnieje", "danger")
        return redirect(back_url)

    try:
        with open(file_path, "rb") as f:
            file_hash = hashlib.sha256(f.read()).hexdigest()
        receipt.file_hash = file_hash
        db.session.commit()
        flash("Hash wygenerowany", "success")
    except Exception as e:
        flash(f"Błąd przy generowaniu hasha: {e}", "danger")

    return redirect(back_url)
@app.route("/admin/delete_selected_lists", methods=["POST"])
@login_required
@admin_required
def delete_selected_lists():
    """Delete every list whose id was submitted in the ``list_ids`` field.

    Receipts, items and expenses of each list are removed with it. Values
    that are not integers and ids of lists that do not exist are ignored.
    Redirects to the admin panel.
    """
    # Parse all ids before deleting anything: a malformed value used to raise
    # in the middle of the loop, after earlier lists' receipts had already
    # been processed but before the final commit.
    list_ids = []
    for raw_id in request.form.getlist("list_ids"):
        try:
            list_ids.append(int(raw_id))
        except ValueError:
            continue

    for list_id in list_ids:
        lst = db.session.get(ShoppingList, list_id)
        if lst:
            delete_receipts_for_list(lst.id)
            Item.query.filter_by(list_id=lst.id).delete()
            Expense.query.filter_by(list_id=lst.id).delete()
            db.session.delete(lst)
    db.session.commit()

    flash("Usunięto wybrane listy", "success")
    return redirect(url_for("admin_panel"))
@app.route("/admin/edit_list/<int:list_id>", methods=["GET", "POST"])
@login_required
@admin_required
def edit_list(list_id):
    """Admin page for editing a shopping list and its items.

    GET renders ``admin/edit_list.html``. POST dispatches on the ``action``
    form field (``save``, ``add_item``, ``delete_item``, ``toggle_purchased``,
    ``mark_not_purchased``, ``unmark_not_purchased``, ``edit_quantity``) and
    redirects back to this page with a flash message; an unknown action falls
    through to rendering the page. Responds with 404 for an unknown list.
    """
    shopping_list = db.session.get(ShoppingList, list_id)
    if shopping_list is None:
        abort(404)

    expenses = Expense.query.filter_by(list_id=list_id).all()
    total_expense = sum(e.amount for e in expenses)
    users = User.query.all()
    items = (
        db.session.query(Item).filter_by(list_id=list_id).order_by(Item.id.desc()).all()
    )
    receipts = (
        Receipt.query.filter_by(list_id=list_id)
        .order_by(Receipt.uploaded_at.desc())
        .all()
    )

    def back_to_form():
        """Redirect to this edit page (used after every POST action)."""
        return redirect(url_for("edit_list", list_id=list_id))

    def item_from_form():
        """Return the item named by ``item_id`` if it belongs to this list."""
        item = db.session.get(Item, request.form.get("item_id"))
        return item if item and item.list_id == list_id else None

    if request.method == "POST":
        action = request.form.get("action")

        if action == "save":
            new_title = request.form.get("title", "").strip()
            new_amount_str = request.form.get("amount")
            new_owner_id = request.form.get("owner_id")
            expires_date = request.form.get("expires_date")
            expires_time = request.form.get("expires_time")

            # An empty title keeps the current one.
            if new_title:
                shopping_list.title = new_title
            # Checkbox fields are only present in the form when ticked.
            shopping_list.is_archived = "archived" in request.form
            shopping_list.is_public = "public" in request.form
            shopping_list.is_temporary = "temporary" in request.form

            if expires_date and expires_time:
                try:
                    combined_str = f"{expires_date} {expires_time}"
                    dt = datetime.strptime(combined_str, "%Y-%m-%d %H:%M")
                    # The submitted wall-clock value is stored as UTC.
                    shopping_list.expires_at = dt.replace(tzinfo=timezone.utc)
                except ValueError:
                    flash("Niepoprawna data lub godzina wygasania", "danger")
                    return back_to_form()
            else:
                shopping_list.expires_at = None

            if new_owner_id:
                try:
                    new_owner_id_int = int(new_owner_id)
                except ValueError:
                    flash("Niepoprawny ID użytkownika", "danger")
                    return back_to_form()
                if db.session.get(User, new_owner_id_int) is None:
                    flash("Wybrany użytkownik nie istnieje", "danger")
                    return back_to_form()
                shopping_list.owner_id = new_owner_id_int

            if new_amount_str:
                try:
                    new_amount = float(new_amount_str)
                except ValueError:
                    flash("Niepoprawna kwota", "danger")
                    return back_to_form()
                # Replace all expenses of the list with one entry. No commit
                # in between: the deletions and the new expense are saved
                # together with the other changes by the single commit below.
                for expense in expenses:
                    db.session.delete(expense)
                db.session.add(Expense(list_id=list_id, amount=new_amount))

            db.session.add(shopping_list)
            db.session.commit()
            flash("Zapisano zmiany listy", "success")
            return back_to_form()

        elif action == "add_item":
            item_name = request.form.get("item_name", "").strip()
            if not item_name:
                flash("Podaj nazwę produktu", "danger")
                return back_to_form()

            # Invalid quantities fall back to 1; values below 1 are raised to 1.
            try:
                quantity = max(1, int(request.form.get("quantity", "1")))
            except ValueError:
                quantity = 1

            db.session.add(
                Item(
                    list_id=list_id,
                    name=item_name,
                    quantity=quantity,
                    added_by=current_user.id,
                )
            )
            # Register the name as a suggestion unless one already exists
            # (compared case-insensitively).
            exists = (
                db.session.query(SuggestedProduct)
                .filter(func.lower(SuggestedProduct.name) == item_name.lower())
                .first()
            )
            if not exists:
                db.session.add(SuggestedProduct(name=item_name))
            db.session.commit()
            flash("Dodano produkt", "success")
            return back_to_form()

        elif action == "delete_item":
            item = item_from_form()
            if item:
                db.session.delete(item)
                db.session.commit()
                flash("Usunięto produkt", "success")
            else:
                flash("Nie znaleziono produktu", "danger")
            return back_to_form()

        elif action == "toggle_purchased":
            item = item_from_form()
            if item:
                item.purchased = not item.purchased
                db.session.commit()
                flash("Zmieniono status oznaczenia produktu", "success")
            else:
                flash("Nie znaleziono produktu", "danger")
            return back_to_form()

        elif action == "mark_not_purchased":
            item = item_from_form()
            if item:
                item.not_purchased = True
                item.purchased = False
                item.purchased_at = None
                db.session.commit()
                flash("Oznaczono produkt jako niekupione", "success")
            else:
                flash("Nie znaleziono produktu", "danger")
            return back_to_form()

        elif action == "unmark_not_purchased":
            item = item_from_form()
            if item:
                item.not_purchased = False
                item.not_purchased_reason = None
                item.purchased = False
                item.purchased_at = None
                db.session.commit()
                flash("Przywrócono produkt do listy", "success")
            else:
                flash("Nie znaleziono produktu", "danger")
            return back_to_form()

        elif action == "edit_quantity":
            item = item_from_form()
            if item:
                try:
                    new_quantity = int(request.form.get("quantity"))
                except (TypeError, ValueError):
                    # TypeError covers a missing "quantity" field (int(None)),
                    # which previously escaped as an unhandled error.
                    new_quantity = None
                if new_quantity is not None and new_quantity > 0:
                    item.quantity = new_quantity
                    db.session.commit()
                    flash("Zmieniono ilość produktu", "success")
                else:
                    # Non-positive values used to be ignored without feedback.
                    flash("Nieprawidłowa ilość", "danger")
            else:
                flash("Nie znaleziono produktu", "danger")
            return back_to_form()

    return render_template(
        "admin/edit_list.html",
        list=shopping_list,
        total_expense=total_expense,
        users=users,
        items=items,
        receipts=receipts,
        upload_folder=app.config["UPLOAD_FOLDER"],
    )
@app.route("/admin/products")
@login_required
@admin_required
def list_products():
    """Render the admin product overview.

    Passes all items (newest id first), a user-id to username map, and a map
    from lower-cased suggestion name to suggestion to the template.
    """
    items = Item.query.order_by(Item.id.desc()).all()

    users_dict = {}
    for user in db.session.query(User).all():
        users_dict[user.id] = user.username

    # Sorted by name so the mapping is built in a stable order; on duplicate
    # lower-cased names the later row wins.
    ordered_suggestions = SuggestedProduct.query.order_by(
        SuggestedProduct.name.asc()
    ).all()
    suggestions_dict = dict(
        (suggestion.name.lower(), suggestion) for suggestion in ordered_suggestions
    )

    return render_template(
        "admin/list_products.html",
        items=items,
        users_dict=users_dict,
        suggestions_dict=suggestions_dict,
    )
@app.route("/admin/sync_suggestion/<int:item_id>", methods=["POST"])
@login_required
def sync_suggestion_ajax(item_id):
    """Ensure a product suggestion exists for the given item's name.

    Returns JSON with ``success`` and ``message``. Non-admins get a 403 JSON
    response; an unknown item id results in 404.
    """
    if not current_user.is_admin:
        return jsonify({"success": False, "message": "Brak uprawnień"}), 403

    item = Item.query.get_or_404(item_id)
    same_name = func.lower(SuggestedProduct.name) == item.name.lower()
    existing = SuggestedProduct.query.filter(same_name).first()

    if existing:
        message = f"Sugestia dla produktu „{item.name}” już istnieje."
    else:
        db.session.add(SuggestedProduct(name=item.name))
        db.session.commit()
        message = f"Utworzono sugestię dla produktu: {item.name}"

    return jsonify({"success": True, "message": message})
@app.route("/admin/delete_suggestion/<int:suggestion_id>", methods=["POST"])
@login_required
def delete_suggestion_ajax(suggestion_id):
    """Delete a product suggestion and confirm with a JSON response.

    Non-admins get a 403 JSON response; an unknown id results in 404.
    """
    if current_user.is_admin:
        suggestion = SuggestedProduct.query.get_or_404(suggestion_id)
        db.session.delete(suggestion)
        db.session.commit()
        return jsonify({"success": True, "message": "Sugestia została usunięta."})

    return jsonify({"success": False, "message": "Brak uprawnień"}), 403
def _recent_periods(now, count, months_per_period):
    """Return ``(year, period)`` for the last ``count`` calendar periods.

    A period spans ``months_per_period`` months (1 = month, 3 = quarter,
    6 = half-year, 12 = year). The result is ordered oldest first, ends with
    the period containing ``now``, and ``period`` is 1-based within its year.
    """
    periods_per_year = 12 // months_per_period
    current = now.year * periods_per_year + (now.month - 1) // months_per_period
    periods = []
    for offset in range(count - 1, -1, -1):
        year, index = divmod(current - offset, periods_per_year)
        periods.append((year, index + 1))
    return periods


@app.route("/admin/expenses_data")
@login_required
def admin_expenses_data():
    """Return expense sums as JSON chart data (``labels`` and ``expenses``).

    With both ``start_date`` and ``end_date`` (``YYYY-MM-DD``) the sums are
    grouped per month inside that range, end date included. Otherwise
    ``range`` selects the last 12 months (``monthly``, the default), 4
    quarters, 2 half-years or 5 years; an unknown value yields empty lists.
    Non-admins get a 403 JSON error and unparseable dates a 400 JSON error.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Brak uprawnień"}), 403

    range_type = request.args.get("range", "monthly")
    start_date_str = request.args.get("start_date")
    end_date_str = request.args.get("end_date")
    now = datetime.now(timezone.utc)
    labels = []
    expenses = []

    if start_date_str and end_date_str:
        try:
            start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
            end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
        except ValueError:
            # Malformed dates used to surface as an unhandled 500.
            return jsonify({"error": "Niepoprawny format daty"}), 400

        # end_date parses to midnight, so "<= end_date" dropped everything
        # recorded during the final day; compare against the next midnight.
        end_exclusive = end_date + timedelta(days=1)
        expenses_query = (
            db.session.query(
                extract("year", Expense.added_at).label("year"),
                extract("month", Expense.added_at).label("month"),
                func.sum(Expense.amount).label("total"),
            )
            .filter(Expense.added_at >= start_date, Expense.added_at < end_exclusive)
            .group_by("year", "month")
            .order_by("year", "month")
            .all()
        )
        for row in expenses_query:
            labels.append(f"{int(row.month):02d}/{int(row.year)}")
            expenses.append(round(row.total, 2))

        response = make_response(jsonify({"labels": labels, "expenses": expenses}))
        response.headers["Cache-Control"] = (
            "no-store, no-cache, must-revalidate, max-age=0"
        )
        return response

    # range value -> (number of periods, months per period, label formatter)
    period_specs = {
        "monthly": (12, 1, lambda year, period: f"{period:02d}/{year}"),
        "quarterly": (4, 3, lambda year, period: f"Q{period}/{year}"),
        "halfyearly": (2, 6, lambda year, period: f"H{period}/{year}"),
        "yearly": (5, 12, lambda year, period: str(year)),
    }
    spec = period_specs.get(range_type)
    if spec is not None:
        count, months_per_period, make_label = spec
        # Periods are stepped on the calendar. The former 30/90/180-day steps
        # could land twice in one period or skip one (e.g. on the 31st).
        for year, period in _recent_periods(now, count, months_per_period):
            first_month = (period - 1) * months_per_period + 1
            last_month = first_month + months_per_period - 1
            # A month range avoids depending on how the database divides the
            # extracted month value (integer vs. fractional division).
            period_sum = (
                db.session.query(func.sum(Expense.amount))
                .filter(extract("year", Expense.added_at) == year)
                .filter(
                    extract("month", Expense.added_at).between(
                        first_month, last_month
                    )
                )
                .scalar()
                or 0
            )
            labels.append(make_label(year, period))
            expenses.append(round(period_sum, 2))

    response = make_response(jsonify({"labels": labels, "expenses": expenses}))
    response.headers["Cache-Control"] = "no-store, no-cache"
    return response
@app.route("/admin/promote_user/<int:user_id>")
@login_required
@admin_required
def promote_user(user_id):
    """Grant admin rights to the given user and return to the user list.

    Responds with 404 for an unknown user.
    """
    target_user = User.query.get_or_404(user_id)
    target_user.is_admin = True
    db.session.commit()

    message = f"Użytkownik {target_user.username} został ustawiony jako admin."
    flash(message, "success")
    return redirect(url_for("list_users"))
@app.route("/admin/demote_user/<int:user_id>")
@login_required
@admin_required
def demote_user(user_id):
    """Revoke admin rights from a user.

    Refuses to demote the current user or the only remaining administrator.
    Responds with 404 for an unknown user; every other outcome redirects to
    the user list with a flash message.
    """
    target_user = User.query.get_or_404(user_id)

    if target_user.id == current_user.id:
        flash("Nie możesz zdegradować samego siebie!", "danger")
        return redirect(url_for("list_users"))

    admin_count = User.query.filter_by(is_admin=True).count()
    is_last_admin = admin_count <= 1 and target_user.is_admin
    if is_last_admin:
        flash(
            "Nie można zdegradować. Musi pozostać co najmniej jeden administrator.",
            "danger",
        )
        return redirect(url_for("list_users"))

    target_user.is_admin = False
    db.session.commit()
    flash(f"Użytkownik {target_user.username} został zdegradowany.", "success")
    return redirect(url_for("list_users"))
@app.route("/admin/crop_receipt", methods=["POST"])
@login_required
@admin_required
def crop_receipt():
    """Replace a receipt image with an uploaded cropped version.

    Expects form field ``receipt_id`` and file ``cropped_image``. The cropped
    image is saved under a newly generated filename, the receipt row is
    pointed at it and the previous file is removed. Returns JSON with
    ``success`` and, on failure, ``error``. An unknown receipt gives 404.
    """
    receipt_id = request.form.get("receipt_id")
    file = request.files.get("cropped_image")
    if not receipt_id or not file:
        return jsonify(success=False, error="Brak danych")

    receipt = Receipt.query.get_or_404(receipt_id)
    old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)

    try:
        new_filename = generate_new_receipt_filename(receipt.list_id)
        new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename)
        save_resized_image(file, new_path)
        receipt.filename = new_filename
        # Keep the stored size in step with the file that is now on disk.
        receipt.filesize = os.path.getsize(new_path)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        return jsonify(success=False, error=str(e))

    # Remove the previous file only after the database points at the new one,
    # so a failed save or commit can no longer leave the row without a file.
    if old_path != new_path and os.path.exists(old_path):
        try:
            os.remove(old_path)
        except OSError:
            # The crop itself succeeded; a leftover file is only logged.
            app.logger.warning("Could not remove old receipt file %s", old_path)

    return jsonify(success=True)
@app.route("/admin/recalculate_filesizes")
@login_required
@admin_required
def recalculate_filesizes():
    """Synchronise stored receipt file sizes with the files on disk.

    Flashes how many rows were updated, left unchanged, or had no file, then
    redirects to the full receipt list.
    """
    updated = unchanged = not_found = 0

    for receipt in Receipt.query.all():
        filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
        if not os.path.exists(filepath):
            not_found += 1
            continue

        size_on_disk = os.path.getsize(filepath)
        if receipt.filesize != size_on_disk:
            receipt.filesize = size_on_disk
            updated += 1
        else:
            unchanged += 1

    db.session.commit()
    summary = (
        f"Zaktualizowano: {updated}, bez zmian: {unchanged}, brak pliku: {not_found}"
    )
    flash(summary, "success")
    return redirect(url_for("admin_receipts", id="all"))
@app.route("/healthcheck")
def healthcheck():
    """Answer "OK" when ``X-Internal-Check`` equals ``HEALTHCHECK_TOKEN``.

    Any other header value results in a 404.
    """
    provided_token = request.headers.get("X-Internal-Check")
    expected_token = app.config.get("HEALTHCHECK_TOKEN")
    if provided_token == expected_token:
        return "OK", 200
    abort(404)
# =========================================================================================
# SOCKET.IO
# =========================================================================================
@socketio.on("delete_item")
def handle_delete_item(data):
    """Delete an item and broadcast the removal and the new list progress.

    ``data`` must contain ``item_id``. Unknown ids are ignored.
    """
    item = db.session.get(Item, data["item_id"])
    if not item:
        return

    # Capture identifiers before deleting: the events below should not read
    # attributes from an instance that is already deleted and committed.
    item_id = item.id
    list_id = item.list_id
    room = str(list_id)

    db.session.delete(item)
    db.session.commit()
    emit("item_deleted", {"item_id": item_id}, to=room)

    purchased_count, total_count, percent = get_progress(list_id)
    emit(
        "progress_updated",
        {
            "purchased_count": purchased_count,
            "total_count": total_count,
            "percent": percent,
        },
        to=room,
    )
@socketio.on("edit_item")
def handle_edit_item(data):
    """Rename an item and/or change its quantity, then broadcast the change.

    ``data`` must contain ``item_id`` and ``new_name``; ``new_quantity`` is
    optional and defaults to the current quantity. Unknown items and blank
    names are ignored. Quantities that are not integers or are below 1
    become 1.
    """
    item = db.session.get(Item, data["item_id"])
    new_name = data["new_name"].strip()
    # Bail out before touching item.quantity: the default quantity used to be
    # read from `item` ahead of the None check, crashing for unknown ids.
    if not item or not new_name:
        return

    try:
        new_quantity = max(1, int(data.get("new_quantity", item.quantity)))
    except (TypeError, ValueError, OverflowError):
        new_quantity = 1

    item.name = new_name
    item.quantity = new_quantity
    db.session.commit()

    emit(
        "item_edited",
        {"item_id": item.id, "new_name": item.name, "new_quantity": item.quantity},
        to=str(item.list_id),
    )
@socketio.on("join_list")
def handle_join(data):
    """Add the client to a list room and announce the updated user list.

    ``data`` must contain ``room`` (the list id); ``username`` defaults to
    "Gość". The room's users are tracked in the module-level ``active_users``.
    """
    room = str(data["room"])
    username = data.get("username", "Gość")

    join_room(room)
    active_users.setdefault(room, set()).add(username)

    shopping_list = db.session.get(ShoppingList, int(data["room"]))
    if shopping_list:
        list_title = shopping_list.title
    else:
        list_title = "Twoja lista"

    emit("user_joined", {"username": username}, to=room)
    emit("user_list", {"users": list(active_users[room])}, to=room)
    # No target is passed for the confirmation, unlike the two events above.
    emit("joined_confirmation", {"room": room, "list_title": list_title})
@socketio.on("disconnect")
def handle_disconnect(sid):
    """Remove the disconnecting user's name from every room it appears in.

    The name is the logged-in username or "Gość" for anonymous clients; each
    affected room is notified with ``user_left`` and the refreshed
    ``user_list``.
    """
    if current_user.is_authenticated:
        username = current_user.username
    else:
        username = "Gość"

    for room in active_users:
        members = active_users[room]
        if username not in members:
            continue
        members.remove(username)
        emit("user_left", {"username": username}, to=room)
        emit("user_list", {"users": list(members)}, to=room)
@socketio.on("add_item")
def handle_add_item(data):
    """Add an item to a list, or raise the quantity of a matching item.

    ``data`` must contain ``list_id`` and ``name``; ``quantity`` defaults to
    1, and values that are not integers or are below 1 become 1. When the
    list already holds an item with the same name (case-insensitive) that is
    not marked "not purchased", its quantity is increased and ``item_edited``
    is emitted. Otherwise a new item is appended at the end, a suggestion is
    registered for an unseen name, and ``item_added`` is emitted. Unknown
    lists and blank names are ignored. The list progress is broadcast last.
    """
    list_id = data["list_id"]
    name = data["name"].strip()
    quantity = data.get("quantity", 1)

    list_obj = db.session.get(ShoppingList, list_id)
    # A blank name is rejected, matching the rename handler and the admin form.
    if not list_obj or not name:
        return

    try:
        quantity = max(1, int(quantity))
    except (TypeError, ValueError, OverflowError):
        quantity = 1

    existing_item = Item.query.filter(
        Item.list_id == list_id,
        func.lower(Item.name) == name.lower(),
        Item.not_purchased.is_(False),
    ).first()

    if existing_item:
        existing_item.quantity += quantity
        db.session.commit()
        emit(
            "item_edited",
            {
                "item_id": existing_item.id,
                "new_name": existing_item.name,
                "new_quantity": existing_item.quantity,
            },
            to=str(list_id),
        )
    else:
        max_position = (
            db.session.query(func.max(Item.position))
            .filter_by(list_id=list_id)
            .scalar()
        )
        if max_position is None:
            max_position = 0

        user_id = current_user.id if current_user.is_authenticated else None
        user_name = current_user.username if current_user.is_authenticated else "Gość"

        new_item = Item(
            list_id=list_id,
            name=name,
            quantity=quantity,
            position=max_position + 1,
            added_by=user_id,
        )
        db.session.add(new_item)

        # Register the name as a suggestion unless one already exists.
        if not SuggestedProduct.query.filter(
            func.lower(SuggestedProduct.name) == name.lower()
        ).first():
            db.session.add(SuggestedProduct(name=name))

        db.session.commit()
        emit(
            "item_added",
            {
                "id": new_item.id,
                "name": new_item.name,
                "quantity": new_item.quantity,
                "added_by": user_name,
                "added_by_id": user_id,
                "owner_id": list_obj.owner_id,
            },
            to=str(list_id),
            include_self=True,
        )

    purchased_count, total_count, percent = get_progress(list_id)
    emit(
        "progress_updated",
        {
            "purchased_count": purchased_count,
            "total_count": total_count,
            "percent": percent,
        },
        to=str(list_id),
    )
@socketio.on("check_item")
def handle_check_item(data):
    """Mark an item as purchased and broadcast the change and new progress.

    ``data`` must contain ``item_id``. Unknown ids are ignored.
    """
    item = db.session.get(Item, data["item_id"])
    if not item:
        return

    item.purchased = True
    item.purchased_at = datetime.now(UTC)
    db.session.commit()

    purchased_count, total_count, percent = get_progress(item.list_id)
    progress = {
        "purchased_count": purchased_count,
        "total_count": total_count,
        "percent": percent,
    }
    emit("item_checked", {"item_id": item.id}, to=str(item.list_id))
    emit("progress_updated", progress, to=str(item.list_id))
@socketio.on("uncheck_item")
def handle_uncheck_item(data):
    """Clear an item's purchased state and broadcast the change and progress.

    ``data`` must contain ``item_id``. Unknown ids are ignored.
    """
    item = db.session.get(Item, data["item_id"])
    if not item:
        return

    item.purchased = False
    item.purchased_at = None
    db.session.commit()

    purchased_count, total_count, percent = get_progress(item.list_id)
    progress = {
        "purchased_count": purchased_count,
        "total_count": total_count,
        "percent": percent,
    }
    emit("item_unchecked", {"item_id": item.id}, to=str(item.list_id))
    emit("progress_updated", progress, to=str(item.list_id))
@socketio.on("request_full_list")
def handle_request_full_list(data):
    """Send the complete item list of a shopping list to the requester only.

    ``data`` must contain ``list_id``. Unknown lists are ignored. Items are
    ordered by position.
    """
    list_id = data["list_id"]
    shopping_list = db.session.get(ShoppingList, list_id)
    if not shopping_list:
        return

    owner_id = shopping_list.owner_id

    def serialize(item):
        """Build the payload entry for a single item."""
        author = item.added_by_user
        return {
            "id": item.id,
            "name": item.name,
            "quantity": item.quantity,
            # An item marked "not purchased" is never reported as purchased.
            "purchased": False if item.not_purchased else item.purchased,
            "not_purchased": item.not_purchased,
            "not_purchased_reason": item.not_purchased_reason,
            "note": item.note or "",
            "added_by": author.username if author else None,
            "added_by_id": author.id if author else None,
            "owner_id": owner_id,
        }

    ordered_items = (
        Item.query.options(joinedload(Item.added_by_user))
        .filter_by(list_id=list_id)
        .order_by(Item.position.asc())
        .all()
    )
    items_data = [serialize(item) for item in ordered_items]

    emit("full_list", {"items": items_data}, to=request.sid)
@socketio.on("update_note")
def handle_update_note(data):
    """Store a note on an item and broadcast it to the item's list room.

    ``data`` must contain ``item_id`` and ``note``. Unknown ids are ignored.
    """
    item_id = data["item_id"]
    note = data["note"]

    # db.session.get replaces the legacy Query.get call, matching every
    # other socket handler in this module.
    item = db.session.get(Item, item_id)
    if item:
        item.note = note
        db.session.commit()
        emit("note_updated", {"item_id": item_id, "note": note}, to=str(item.list_id))
@socketio.on("add_expense")
def handle_add_expense(data):
    """Record an expense for a list and emit the updated running total.

    Reads ``data["list_id"]``, ``data["amount"]`` and the optional
    ``receipt_filename``. If a receipt filename is given and an expense
    with the same list and filename already exists, the handler returns
    without adding anything. Otherwise the expense is committed and
    ``expense_added`` (amount plus summed total) is emitted to
    ``str(list_id)``.
    """
    list_id = data["list_id"]
    amount = data["amount"]
    receipt_filename = data.get("receipt_filename")

    # The duplicate lookup only runs when a receipt filename was supplied.
    if (
        receipt_filename
        and Expense.query.filter_by(
            list_id=list_id, receipt_filename=receipt_filename
        ).first()
    ):
        return

    db.session.add(
        Expense(list_id=list_id, amount=amount, receipt_filename=receipt_filename)
    )
    db.session.commit()

    summed = (
        db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar()
    )
    total = summed or 0

    emit("expense_added", {"amount": amount, "total": total}, to=str(list_id))
@socketio.on("mark_not_purchased")
def handle_mark_not_purchased(data):
    """Flag an item as not purchased, with an optional reason.

    Reads ``data["item_id"]`` and the optional ``reason`` (defaults to an
    empty string). When no matching item is found nothing is emitted.
    Otherwise the flag and reason are committed and
    ``item_marked_not_purchased`` is emitted to ``str(item.list_id)``.
    """
    item = db.session.get(Item, data["item_id"])
    reason = data.get("reason", "")
    if not item:
        return

    item.not_purchased = True
    item.not_purchased_reason = reason
    db.session.commit()

    payload = {"item_id": item.id, "reason": reason}
    emit("item_marked_not_purchased", payload, to=str(item.list_id))
@socketio.on("unmark_not_purchased")
def handle_unmark_not_purchased(data):
    """Clear the not-purchased flag of an item and reset its purchase state.

    Reads ``data["item_id"]``. When no matching item is found nothing is
    emitted. Otherwise the not-purchased flag and reason, the purchased
    flag and the purchase timestamp are all reset, the change is committed
    and ``item_unmarked_not_purchased`` is emitted to ``str(item.list_id)``.
    """
    item = db.session.get(Item, data["item_id"])
    if not item:
        return

    # Return the item to a plain "not yet bought" state.
    item.not_purchased = False
    item.not_purchased_reason = None
    item.purchased = False
    item.purchased_at = None
    db.session.commit()

    emit(
        "item_unmarked_not_purchased",
        {"item_id": item.id},
        to=str(item.list_id),
    )
@app.cli.command("create_db")
def create_db():
    """Create the schema on an empty database, otherwise verify it.

    * Empty database (no tables at all): run ``db.create_all()`` and report
      that the structure was created.
    * Non-empty database: compare tables and columns against the model
      metadata. Missing tables or missing columns are reported and treated
      as fatal; extra tables or columns are only reported.

    Nothing is returned; all results are printed to stdout.
    """
    inspector = inspect(db.engine)
    model_tables = db.Model.metadata.tables
    expected_tables = set(model_tables.keys())
    actual_tables = set(inspector.get_table_names())

    # Handle the empty database first. Previously every expected table was
    # counted as "missing" in this case, so the command aborted before it
    # could ever reach db.create_all().
    if not actual_tables:
        db.create_all()
        print("Utworzono strukturę bazy danych.")
        return

    missing_tables = expected_tables - actual_tables
    extra_tables = actual_tables - expected_tables
    if missing_tables:
        print(f"Brakuje tabel: {', '.join(sorted(missing_tables))}")
    if extra_tables:
        print(f"Dodatkowe tabele w bazie: {', '.join(sorted(extra_tables))}")

    critical_error = False
    # Sorted so the report order is deterministic between runs.
    for table in sorted(expected_tables & actual_tables):
        expected_columns = {c.name for c in model_tables[table].columns}
        actual_columns = {c["name"] for c in inspector.get_columns(table)}
        missing_cols = expected_columns - actual_columns
        extra_cols = actual_columns - expected_columns
        if missing_cols:
            print(
                f"Brakuje kolumn w tabeli '{table}': {', '.join(sorted(missing_cols))}"
            )
            # A missing column makes the schema unusable for the models.
            critical_error = True
        if extra_cols:
            print(
                f"Dodatkowe kolumny w tabeli '{table}': {', '.join(sorted(extra_cols))}"
            )

    if missing_tables or critical_error:
        print("Struktura bazy jest niekompletna lub niezgodna. Przerwano.")
        return

    print("Struktura bazy danych jest poprawna.")
if __name__ == "__main__":
    # Direct-execution entry point: serve the app through Socket.IO on all
    # interfaces, port 8000, with the debug flag taken from DEBUG_MODE.
    socketio.run(
        app,
        host="0.0.0.0",
        port=8000,
        debug=DEBUG_MODE,
    )