Files
lista_zakupowa_live/app.py
2025-07-25 19:58:05 +02:00

2625 lines
81 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import secrets
import time
import mimetypes
import sys
import platform
import psutil
import hashlib
import re
from pillow_heif import register_heif_opener
from datetime import datetime, timedelta, UTC, timezone
from flask import (
Flask,
render_template,
redirect,
url_for,
request,
flash,
Blueprint,
send_from_directory,
request,
abort,
session,
jsonify,
make_response,
)
from markupsafe import Markup
from flask_sqlalchemy import SQLAlchemy
from flask_login import (
LoginManager,
UserMixin,
login_user,
login_required,
logout_user,
current_user,
)
from flask_compress import Compress
from flask_socketio import SocketIO, emit, join_room
from werkzeug.security import generate_password_hash, check_password_hash
from config import Config
from PIL import Image, ExifTags, ImageFilter, ImageOps
from werkzeug.utils import secure_filename
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import func, extract, inspect, or_
from sqlalchemy.orm import joinedload
from collections import defaultdict, deque
from functools import wraps
from flask_talisman import Talisman
# OCR
from collections import Counter
import pytesseract
from pytesseract import Output
# Startup guard: when the FLASK_RUN_FROM_CLI environment variable is "true",
# print the (Polish) notice telling the operator to launch the app with
# `python app.py` instead of `flask run`, then exit with status 1.
if os.getenv("FLASK_RUN_FROM_CLI") == "true":
    print("""
NIE URUCHAMIAJ aplikacji przez `flask run`!
Socket.IO wymaga uruchamiania przez `python app.py`, bo `flask run`
nie obsługuje WebSocketów poprawnie (działa tylko z Werkzeugem).
Użyj: `python app.py`
""")
    raise SystemExit(1)
app = Flask(__name__)
app.config.from_object(Config)

# Security headers; each feature is toggled by a flag read from the app config.
csp_policy = None
if app.config.get("ENABLE_CSP", True):
    csp_policy = {
        "default-src": "'self'",
        # Inline scripts are allowed. A second, stricter 'script-src' entry in
        # this literal was dead code: later duplicate keys win in a dict
        # literal, so this is the value that was always in effect.
        # NOTE(review): drop 'unsafe-inline' once no template needs inline JS.
        "script-src": "'self' 'unsafe-inline'",
        "style-src": "'self' 'unsafe-inline'",  # inline styles in the HTML are allowed
        "img-src": "'self' data:",  # data: images are allowed (e.g. SVG)
        "connect-src": "'self'",  # WebSockets
    }

talisman = Talisman(
    app,
    # NOTE(review): HTTPS redirection is driven by the same ENABLE_HSTS flag
    # as the HSTS header itself.
    force_https=app.config.get("ENABLE_HSTS", True),
    strict_transport_security=app.config.get("ENABLE_HSTS", True),
    frame_options="DENY" if app.config.get("ENABLE_XFO", True) else None,
    content_security_policy=csp_policy,
    x_content_type_options=app.config.get("ENABLE_XCTO", True),
    strict_transport_security_include_subdomains=False,
)
# Register the pillow_heif opener with Pillow (for HEIC uploads).
register_heif_opener()  # pillow_heif, for HEIC
# File extensions accepted by allowed_file().
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp", "heic"}
# NOTE(review): plain module-level variable; nothing in this part of the file
# copies it into app.config — confirm whether it has any effect.
SQLALCHEMY_ECHO = True
# Settings read from the app config, with hard-coded fallbacks.
# NOTE(review): the fallbacks include credentials/tokens; make sure real
# deployments override them.
SYSTEM_PASSWORD = app.config.get("SYSTEM_PASSWORD", "changeme")
DEFAULT_ADMIN_USERNAME = app.config.get("DEFAULT_ADMIN_USERNAME", "admin")
DEFAULT_ADMIN_PASSWORD = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123")
UPLOAD_FOLDER = app.config.get("UPLOAD_FOLDER", "uploads")
AUTHORIZED_COOKIE_VALUE = app.config.get("AUTHORIZED_COOKIE_VALUE", "80d31cdfe63539c9")
AUTH_COOKIE_MAX_AGE = app.config.get("AUTH_COOKIE_MAX_AGE", 86400)
HEALTHCHECK_TOKEN = app.config.get("HEALTHCHECK_TOKEN", "alamapsaikota1234")
SESSION_TIMEOUT_MINUTES = int(app.config.get("SESSION_TIMEOUT_MINUTES", 10080))
# Compression algorithms handed to flask-compress.
app.config["COMPRESS_ALGORITHM"] = ["zstd", "br", "gzip", "deflate"]
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(minutes=SESSION_TIMEOUT_MINUTES)
# Honour one hop of X-Forwarded-For / -Proto / -Host from a reverse proxy.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Failed-login bookkeeping: maps a key (the helpers below pass a client IP)
# to a deque of time.time() timestamps.
failed_login_attempts = defaultdict(deque)
# Number of recorded failures at which is_ip_blocked() reports a block.
MAX_ATTEMPTS = 10
# Age in seconds after which a recorded failure is discarded.
TIME_WINDOW = 60 * 60
db = SQLAlchemy(app)
socketio = SocketIO(app, async_mode="eventlet")
login_manager = LoginManager(app)
login_manager.login_view = "login"
# flask-compress
compress = Compress()
compress.init_app(app)
# Blueprint that carries the custom static-file routes defined below.
static_bp = Blueprint("static_bp", __name__)
# For the live features.
# NOTE(review): not referenced in the visible part of the file — confirm usage.
active_users = {}
def utcnow():
    """Return the current time as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(tz=timezone.utc)


# Captured once, when this module is imported.
app_start_time = utcnow()
class User(UserMixin, db.Model):
    """Application account; ``UserMixin`` provides the Flask-Login interface."""

    id = db.Column(db.Integer, primary_key=True)
    # Unique login name.
    username = db.Column(db.String(150), unique=True, nullable=False)
    # Password hash (produced with generate_password_hash elsewhere in this file).
    password_hash = db.Column(db.String(512), nullable=False)
    # Checked by the admin_required decorator.
    is_admin = db.Column(db.Boolean, default=False)
class ShoppingList(db.Model):
    """A shopping list owned by a user, optionally shared via a token."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(150), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    owner_id = db.Column(db.Integer, db.ForeignKey("user.id"))
    # The class body previously assigned `owner` twice; only the later
    # assignment ever reached the mapper, so that definition is the one kept.
    # It also creates the `User.lists` backref.
    owner = db.relationship("User", backref="lists", lazy=True)
    is_temporary = db.Column(db.Boolean, default=False)
    # Token used by the /share/<token> route.
    share_token = db.Column(db.String(64), unique=True, nullable=True)
    # Timezone-aware expiry; NULL means the list does not expire.
    expires_at = db.Column(db.DateTime(timezone=True), nullable=True)
    is_archived = db.Column(db.Boolean, default=False)
    is_public = db.Column(db.Boolean, default=True)
class Item(db.Model):
    """A single entry on a shopping list."""

    id = db.Column(db.Integer, primary_key=True)
    list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"))
    name = db.Column(db.String(150), nullable=False)
    # Defaults to the module's utcnow() helper (timezone-aware UTC).
    added_at = db.Column(db.DateTime, default=utcnow)
    # User who added the item; nullable.
    added_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
    added_by_user = db.relationship(
        "User", backref="added_items", lazy=True, foreign_keys=[added_by]
    )
    purchased = db.Column(db.Boolean, default=False)
    purchased_at = db.Column(db.DateTime, nullable=True)
    quantity = db.Column(db.Integer, default=1)
    note = db.Column(db.Text, nullable=True)
    # Separate flag (with optional reason) from `purchased`.
    not_purchased = db.Column(db.Boolean, default=False)
    not_purchased_reason = db.Column(db.Text, nullable=True)
    # Sort key used when items are listed (order_by(Item.position.asc())).
    position = db.Column(db.Integer, default=0)
class SuggestedProduct(db.Model):
    """Product name offered by the suggestion endpoints."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(150), unique=True, nullable=False)
    # Used as a secondary sort key in all_products().
    usage_count = db.Column(db.Integer, default=0)
class Expense(db.Model):
    """An amount spent, attached to a shopping list."""

    id = db.Column(db.Integer, primary_key=True)
    list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"))
    amount = db.Column(db.Float, nullable=False)
    added_at = db.Column(db.DateTime, default=datetime.utcnow)
    receipt_filename = db.Column(db.String(255), nullable=True)
    # Parent list; also creates the `ShoppingList.expenses` backref.
    # The attribute name shadows the builtin `list` inside this class body only.
    list = db.relationship("ShoppingList", backref="expenses", lazy=True)
class Receipt(db.Model):
    """An uploaded receipt image belonging to a shopping list."""

    id = db.Column(db.Integer, primary_key=True)
    list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"), nullable=False)
    # File name inside the upload folder.
    filename = db.Column(db.String(255), nullable=False)
    uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Parent list; also creates the `ShoppingList.receipts` backref.
    shopping_list = db.relationship("ShoppingList", backref="receipts", lazy=True)
    filesize = db.Column(db.Integer, nullable=True)
    # SHA-256 hex digest of the uploaded bytes (set in upload_receipt);
    # unique, so the same upload cannot be stored twice.
    file_hash = db.Column(db.String(64), nullable=True, unique=True)
# Import-time bootstrap: create the tables, then make sure an admin account
# exists whose username and password match the configured defaults.
with app.app_context():
    db.create_all()

    username = app.config.get("DEFAULT_ADMIN_USERNAME", "admin")
    password = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123")
    password_hash = generate_password_hash(password)

    admin = User.query.filter_by(is_admin=True).first()
    if admin is None:
        # No admin yet: create one from the configured credentials.
        admin = User(username=username, password_hash=password_hash, is_admin=True)
        db.session.add(admin)
        db.session.commit()
    else:
        credentials_match = admin.username == username and check_password_hash(
            admin.password_hash, password
        )
        if not credentials_match:
            # Bring the first admin found in line with the configuration.
            admin.username = username
            admin.password_hash = password_hash
            db.session.commit()
@static_bp.route("/static/js/<path:filename>")
def serve_js(filename):
    """Serve a file from ``static/js`` with client-side caching disabled."""
    response = send_from_directory("static/js", filename)
    response.cache_control.no_cache = True
    response.cache_control.no_store = True
    response.cache_control.must_revalidate = True
    # Werkzeug's Response has no `pragma` header property, so assigning
    # `response.pragma` only set a Python attribute and no header was sent.
    # Write the header explicitly.
    response.headers["Pragma"] = "no-cache"
    response.headers.pop("Content-Disposition", None)
    response.headers.pop("Etag", None)
    return response
@static_bp.route("/static/css/<path:filename>")
def serve_css(filename):
    """Serve a file from ``static/css`` with a one-hour public cache lifetime."""
    resp = send_from_directory("static/css", filename)
    resp.headers.set("Cache-Control", "public, max-age=3600")
    # Strip headers the caller does not want on stylesheet responses.
    for header_name in ("Content-Disposition", "Etag"):
        resp.headers.pop(header_name, None)
    return resp
@static_bp.route("/static/lib/js/<path:filename>")
def serve_js_lib(filename):
    """Serve a file from ``static/lib/js`` with a one-week public cache lifetime."""
    resp = send_from_directory("static/lib/js", filename)
    resp.headers.set("Cache-Control", "public, max-age=604800")
    for header_name in ("Content-Disposition", "Etag"):
        resp.headers.pop(header_name, None)
    return resp
# Library CSS, cached for one week.
@static_bp.route("/static/lib/css/<path:filename>")
def serve_css_lib(filename):
    """Serve a file from ``static/lib/css`` with a one-week public cache lifetime."""
    resp = send_from_directory("static/lib/css", filename)
    resp.headers.set("Cache-Control", "public, max-age=604800")
    for header_name in ("Content-Disposition", "Etag"):
        resp.headers.pop(header_name, None)
    return resp


# Attach the static routes above to the application.
app.register_blueprint(static_bp)
def allowed_file(filename):
    """Return True if the text after the last dot of ``filename`` is an allowed extension.

    The comparison is case-insensitive; a name without a dot is rejected.
    """
    _stem, dot, extension = filename.rpartition(".")
    return dot == "." and extension.lower() in ALLOWED_EXTENSIONS
def get_list_details(list_id):
    """Load a list together with its items, receipt file names and expenses.

    Responds with 404 (via ``get_or_404``) when the list does not exist.

    Returns:
        Tuple ``(shopping_list, items, receipt_files, expenses, total_expense)``;
        items are ordered by ``position`` and ``total_expense`` is the sum of
        the expense amounts.
    """
    shopping_list = ShoppingList.query.get_or_404(list_id)
    items = (
        Item.query.filter(Item.list_id == list_id)
        .order_by(Item.position.asc())
        .all()
    )
    expenses = Expense.query.filter(Expense.list_id == list_id).all()
    total_expense = sum(expense.amount for expense in expenses)
    stored_receipts = Receipt.query.filter(Receipt.list_id == list_id).all()
    receipt_files = [stored.filename for stored in stored_receipts]
    return shopping_list, items, receipt_files, expenses, total_expense
def generate_share_token(length=8):
    """Return a random lowercase hexadecimal token of exactly ``length`` characters.

    Args:
        length: Number of hex characters wanted (default 8).

    Raises:
        ValueError: if ``length`` is negative.
    """
    if length < 0:
        raise ValueError("length must be non-negative")
    # token_hex(n) produces 2*n characters. Round the byte count up and trim,
    # so an odd length is honoured instead of coming back one character short.
    return secrets.token_hex((length + 1) // 2)[:length]
def check_list_public(shopping_list):
    """Return True for a public list; otherwise flash a notice and return False."""
    if shopping_list.is_public:
        return True
    flash("Ta lista nie jest publicznie dostępna", "danger")
    return False
def enrich_list_data(l):
    """Attach summary figures to a list object and return that same object.

    Sets ``total_count`` (number of items), ``purchased_count`` (items marked
    purchased) and ``total_expense`` (sum of expense amounts) as attributes.
    """
    list_items = Item.query.filter(Item.list_id == l.id).all()
    list_expenses = Expense.query.filter(Expense.list_id == l.id).all()

    l.total_count = len(list_items)
    l.purchased_count = sum(1 for entry in list_items if entry.purchased)
    l.total_expense = sum(expense.amount for expense in list_expenses)
    return l
def save_resized_image(file, path):
    """Validate an uploaded image and store it as a lossless WebP file.

    The output is written to ``path`` with everything after its last dot
    replaced by ``webp``. The image is passed through ``thumbnail((2000, 2000))``,
    converted to RGB, and its ``info`` metadata is cleared before saving.

    Raises:
        ValueError: if the file cannot be opened/verified as an image, or if
            resizing, converting or saving fails.
    """
    try:
        # verify() is followed by a rewind and a fresh open before the image
        # is actually processed.
        Image.open(file).verify()
        file.seek(0)
        picture = Image.open(file)
    except Exception:
        raise ValueError("Nieprawidłowy plik graficzny")

    try:
        # Apply the EXIF orientation (e.g. photos taken with a phone).
        picture = ImageOps.exif_transpose(picture)
    except Exception:
        # Deliberately ignored: EXIF data may be missing or corrupted.
        pass

    try:
        picture.thumbnail((2000, 2000))
        picture = picture.convert("RGB")
        picture.info.clear()
        target_path = path.rsplit(".", 1)[0] + ".webp"
        picture.save(target_path, format="WEBP", lossless=True, method=6)
    except Exception as e:
        raise ValueError(f"Błąd podczas przetwarzania obrazu: {e}")
def redirect_with_flash(
    message: str, category: str = "info", endpoint: str = "main_page"
):
    """Flash ``message`` under ``category`` and return a redirect to ``endpoint``."""
    flash(message, category)
    destination = url_for(endpoint)
    return redirect(destination)
def admin_required(f):
    """Decorator: run the view only for an authenticated admin user.

    Anyone else gets a flashed message and a redirect (see redirect_with_flash).
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        if current_user.is_authenticated and current_user.is_admin:
            return f(*args, **kwargs)
        return redirect_with_flash("Brak uprawnień do tej sekcji.", "danger")

    return wrapper
def get_progress(list_id):
    """Return ``(purchased_count, total_count, percent)`` for a list's items.

    ``percent`` is 0 when the list has no items.
    """
    rows = Item.query.filter_by(list_id=list_id).order_by(Item.position.asc()).all()
    total_count = len(rows)
    purchased_count = sum(1 for row in rows if row.purchased)
    if total_count > 0:
        percent = purchased_count / total_count * 100
    else:
        percent = 0
    return purchased_count, total_count, percent
def delete_receipts_for_list(list_id):
    """Remove every file in the upload folder whose name starts with ``list_<id>_``.

    A file that cannot be removed is reported with ``print`` and skipped.
    """
    folder = app.config["UPLOAD_FOLDER"]
    prefix = f"list_{list_id}_"
    matching = [name for name in os.listdir(folder) if name.startswith(prefix)]
    for filename in matching:
        try:
            os.remove(os.path.join(folder, filename))
        except Exception as e:
            print(f"Nie udało się usunąć pliku {filename}: {e}")
def _receipt_error(message):
    """Report a receipt-handling error in the form the client expects.

    JSON / XMLHttpRequest callers get a JSON body with status 400; everyone
    else gets a flashed message and a redirect back to the referrer (or the
    main page when there is none).
    """
    wants_json = (
        request.is_json
        or request.headers.get("X-Requested-With") == "XMLHttpRequest"
    )
    if not wants_json:
        flash(message, "danger")
        return redirect(request.referrer or url_for("main_page"))
    return jsonify({"success": False, "error": message}), 400
def rotate_receipt_by_id(receipt_id):
    """Rotate a stored receipt image by ``-90`` degrees and save it under a new name.

    The rotated image is written as a new WebP file, the old file is removed
    and the receipt row is updated to point at the new file.

    Returns:
        The updated ``Receipt``.

    Raises:
        FileNotFoundError: if the receipt's file is missing from the upload
            folder. A missing receipt row results in a 404 (``get_or_404``).
    """
    receipt = Receipt.query.get_or_404(receipt_id)
    upload_folder = app.config["UPLOAD_FOLDER"]
    old_path = os.path.join(upload_folder, receipt.filename)
    if not os.path.exists(old_path):
        raise FileNotFoundError("Plik nie istnieje")

    new_filename = generate_new_receipt_filename(receipt.list_id)
    new_path = os.path.join(upload_folder, new_filename)

    # Context manager closes the source file handle; previously it stayed
    # open, which leaks a descriptor and can block the removal below on
    # platforms that lock open files.
    with Image.open(old_path) as image:
        rotated = image.rotate(-90, expand=True)
        rotated.save(new_path, format="WEBP", quality=100)

    # Guard against the (unlikely) case where the generated name is unchanged.
    if new_path != old_path:
        os.remove(old_path)

    receipt.filename = new_filename
    # Keep the stored size in step with the re-encoded file.
    receipt.filesize = os.path.getsize(new_path)
    db.session.commit()
    return receipt
def delete_receipt_by_id(receipt_id):
    """Delete a receipt's file (when present) and its database row.

    Responds with 404 when the receipt does not exist. Returns the deleted
    ``Receipt`` object.
    """
    receipt = Receipt.query.get_or_404(receipt_id)

    stored_file = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
    file_present = os.path.exists(stored_file)
    if file_present:
        os.remove(stored_file)

    db.session.delete(receipt)
    db.session.commit()
    return receipt
def generate_new_receipt_filename(list_id):
    """Build a receipt file name: ``list_<id>_<YYYYmmdd_HHMM>_<6 hex chars>.webp``.

    The timestamp comes from ``datetime.now()`` and the suffix from
    ``secrets.token_hex(3)``.
    """
    stamp = format(datetime.now(), "%Y%m%d_%H%M")
    suffix = secrets.token_hex(3)
    return f"list_{list_id}_{stamp}_{suffix}.webp"
############# OCR ###########################
def preprocess_image_for_tesseract(image):
    """Prepare an image for OCR: auto-contrast, threshold at 150, upscale 2x.

    Returns the processed image.
    """
    contrasted = ImageOps.autocontrast(image)
    # Hard threshold: values below 150 become 0, everything else 255.
    binarized = contrasted.point(lambda level: 255 if level >= 150 else 0)
    doubled_size = (binarized.width * 2, binarized.height * 2)
    return binarized.resize(doubled_size, Image.BICUBIC)
def extract_total_tesseract(image):
    """Estimate the total amount printed on a receipt image using Tesseract.

    Strategy, first success wins:

    1. Amounts found on lines that contain a "total"-like keyword (the
       largest such amount is chosen).
    2. The largest amount on any line that is not blacklisted (tax/VAT lines).
    3. Word-level fallback: among amount-shaped words, the one with the
       greatest box height, ties broken by OCR confidence.

    Args:
        image: Image object accepted by pytesseract.

    Returns:
        Tuple ``(total, lines)``: ``total`` rounded to two decimals, or ``0.0``
        when nothing was found; ``lines`` is the OCR text split into lines.
    """
    import pytesseract
    from pytesseract import Output
    import re

    text = pytesseract.image_to_string(image, lang="pol", config="--psm 4")
    lines = text.splitlines()
    candidates = []

    blacklist_keywords = re.compile(r"\b(ptu|vat|podatek|stawka)\b", re.IGNORECASE)
    priority_keywords = re.compile(
        r"""
        \b(
        razem\s*do\s*zap[łl][aąo0]ty |
        do\s*zap[łl][aąo0]ty |
        suma |
        kwota |
        warto[śćs] |
        płatno[śćs] |
        total |
        amount
        )\b
        """,
        re.IGNORECASE | re.VERBOSE,
    )
    # Compiled once instead of on every line.
    amount_pattern = re.compile(r"\d{1,4}[.,]\d{2}")
    spaced_pattern = re.compile(r"\d{1,4}\s\d{2}")

    for line in lines:
        if not line.strip() or blacklist_keywords.search(line):
            continue

        is_priority = priority_keywords.search(line) is not None

        for match in amount_pattern.findall(line):
            try:
                val = float(match.replace(",", "."))
            except ValueError:
                continue
            if 0.1 <= val <= 100000:
                candidates.append((val, line, is_priority))

        # Only on priority lines: also accept "12 34"-style amounts where the
        # decimal separator was read as a space.
        if is_priority:
            for match in spaced_pattern.findall(line):
                try:
                    # Separators other than a plain space fail to parse and
                    # are skipped.
                    val = float(match.replace(" ", "."))
                except ValueError:
                    continue
                if 0.1 <= val <= 100000:
                    candidates.append((val, line, True))

    # Priority lines are preferred over everything else.
    preferred = [val for val, _line, is_pref in candidates if is_pref]
    if preferred:
        best_val = max(preferred)
        if best_val < 99999:
            return round(best_val, 2), lines

    if candidates:
        best_val = max(val for val, _line, _is_pref in candidates)
        if best_val < 99999:
            return round(best_val, 2), lines

    # Fallback: pick by word box height and confidence.
    data = pytesseract.image_to_data(
        image, lang="pol", config="--psm 4", output_type=Output.DICT
    )
    words = data["text"]
    confidences = data.get("conf", ["0"] * len(words))
    word_pattern = re.compile(r"^\d{1,5}[.,\s]\d{2}$")

    font_candidates = []
    for i, raw_word in enumerate(words):
        word = raw_word.strip()
        if not word or not word_pattern.match(word):
            continue
        try:
            val = float(word.replace(",", ".").replace(" ", "."))
            height = data["height"][i]
            # Going through float() accepts confidences delivered as "95.5";
            # int("95.5") raised and silently discarded the candidate.
            conf = int(float(confidences[i]))
        except (ValueError, TypeError, KeyError, IndexError):
            # Best effort: skip entries with malformed OCR metadata.
            continue
        if 0.1 <= val <= 100000:
            font_candidates.append((val, height, conf))

    if font_candidates:
        best = max(font_candidates, key=lambda entry: (entry[1], entry[2]))
        return round(best[0], 2), lines

    return 0.0, lines
############# END OCR #######################
# Login protection for the system password: failed-attempt tracking.
def is_ip_blocked(ip):
    """Return True when ``ip`` has at least MAX_ATTEMPTS recent failures.

    Failures older than TIME_WINDOW seconds are discarded as a side effect.
    """
    # Use .get() instead of indexing. This function runs for every request,
    # and indexing the defaultdict would create (and keep forever) an empty
    # deque for each distinct client address.
    attempts = failed_login_attempts.get(ip)
    if attempts:
        now = time.time()
        while attempts and now - attempts[0] > TIME_WINDOW:
            attempts.popleft()

    count = len(attempts) if attempts else 0
    if not count:
        # Nothing left to remember for this address; release the entry.
        failed_login_attempts.pop(ip, None)
    return count >= MAX_ATTEMPTS
def register_failed_attempt(ip):
    """Record a failed attempt for ``ip`` at the current time.

    Entries older than TIME_WINDOW seconds are dropped first.
    """
    timestamp = time.time()
    history = failed_login_attempts[ip]
    while history:
        if timestamp - history[0] <= TIME_WINDOW:
            break
        history.popleft()
    history.append(timestamp)
def reset_failed_attempts(ip):
    """Forget all recorded failed attempts for ``ip``."""
    history = failed_login_attempts[ip]
    history.clear()
def attempts_remaining(ip):
    """Return how many more failures ``ip`` may record before reaching MAX_ATTEMPTS.

    Never negative. Uses the currently stored attempts without pruning them.
    """
    used = len(failed_login_attempts[ip])
    left = MAX_ATTEMPTS - used
    return left if left > 0 else 0
####################################################
@login_manager.user_loader
def load_user(user_id):
    """Flask-Login user loader: fetch the ``User`` with primary key ``int(user_id)``."""
    primary_key = int(user_id)
    return db.session.get(User, primary_key)
@app.context_processor
def inject_time():
    """Expose the ``time`` module to templates under the name ``time``."""
    return {"time": time}
@app.context_processor
def inject_has_authorized_cookie():
    """Tell templates whether the request carries an ``authorized`` cookie.

    Only the presence of the cookie is reported, not its value.
    """
    cookie_present = "authorized" in request.cookies
    return dict(has_authorized_cookie=cookie_present)
@app.context_processor
def inject_is_blocked():
    """Tell templates whether the client address is currently blocked."""
    client_ip = request.access_route[0]
    blocked = is_ip_blocked(client_ip)
    return dict(is_blocked=blocked)
@app.before_request
def require_system_password():
    """Gate requests behind the shared system password.

    Order of checks:

    * library JS/CSS is always served;
    * a blocked client address gets 403;
    * unmatched routes, ``system_auth``, ``healthcheck``, ``login*`` and
      ``favicon`` endpoints pass through;
    * requests whose ``authorized`` cookie carries the expected value pass
      through;
    * everything else is redirected to ``system_auth`` (static assets have
      their own rules, see below).

    Returns ``None`` to let the request continue, or a redirect response.
    """
    endpoint = request.endpoint

    # Exceptions: library JS/CSS is always let through.
    if endpoint in ("static_bp.serve_js_lib", "static_bp.serve_css_lib"):
        return None

    if is_ip_blocked(request.access_route[0]):
        abort(403)

    if endpoint is None or endpoint in ("system_auth", "healthcheck"):
        return None
    if endpoint.startswith("login") or endpoint == "favicon":
        return None

    # Compare the cookie VALUE. Checking only that a cookie named
    # "authorized" exists lets any client bypass the gate by setting an
    # arbitrary cookie. compare_digest keeps the comparison constant-time;
    # both sides are encoded so non-ASCII input cannot raise.
    cookie_value = request.cookies.get("authorized", "")
    expected_value = str(AUTHORIZED_COOKIE_VALUE)
    if secrets.compare_digest(
        cookie_value.encode("utf-8"), expected_value.encode("utf-8")
    ):
        return None

    # Application JS: only toasts.js is served without authorisation.
    if endpoint == "static_bp.serve_js":
        requested_file = request.view_args.get("filename", "")
        if requested_file == "toasts.js":
            return None
        if requested_file.endswith(".js"):
            return redirect(url_for("system_auth", next=request.url))
        return None

    # Remaining static_bp assets are served without the cookie.
    # NOTE(review): the earlier comment here said these are blocked, but the
    # code has always let them through — confirm which is intended.
    if endpoint.startswith("static_bp."):
        return None

    if request.path == "/":
        return redirect(url_for("system_auth"))

    from urllib.parse import urlparse, urlunparse

    # Rebuild the URL with the request's host before passing it on as the
    # post-login target.
    parsed = urlparse(request.url)
    fixed_url = urlunparse(parsed._replace(netloc=request.host))
    return redirect(url_for("system_auth", next=fixed_url))
@app.template_filter("filemtime")
def file_mtime_filter(path):
    """Template filter: return the modification time of ``path`` as a datetime.

    On any error the current UTC time (timezone-aware) is returned instead.
    """
    try:
        modified = datetime.fromtimestamp(os.path.getmtime(path))
    except Exception:
        return datetime.now(timezone.utc)
    return modified
@app.template_filter("filesizeformat")
def filesizeformat_filter(path):
    """Template filter: size of the file at ``path`` as text, e.g. ``"1.5 MB"``.

    Units step by 1024 from B up to TB. Returns ``"N/A"`` on any error.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    try:
        size = os.path.getsize(path)
        index = 0
        # Scale down until the value fits the unit or the largest unit is reached.
        while size >= 1024.0 and index < len(units) - 1:
            size /= 1024.0
            index += 1
        return f"{size:.1f} {units[index]}"
    except Exception:
        return "N/A"
@app.errorhandler(404)
def page_not_found(e):
    """Render the shared error template for 404 responses."""
    body = render_template(
        "errors.html",
        code=404,
        title="Strona nie znaleziona",
        message="Ups! Podana strona nie istnieje lub została przeniesiona.",
    )
    return body, 404
@app.errorhandler(403)
def forbidden(e):
    """Render the shared error template for 403 responses.

    Uses the error's own description when it has one, otherwise a default text.
    """
    message = e.description or "Nie masz uprawnień do wyświetlenia tej strony."
    body = render_template(
        "errors.html",
        code=403,
        title="Brak dostępu",
        message=message,
    )
    return body, 403
@app.route("/favicon.ico")
def favicon_ico():
    """Redirect ``/favicon.ico`` to the static ``favicon.svg`` URL."""
    target = url_for("static", filename="favicon.svg")
    return redirect(target)
@app.route("/favicon.svg")
def favicon():
    """Serve an inline SVG favicon (a shopping-cart emoji)."""
    headers = {"Content-Type": "image/svg+xml"}
    svg = """
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16">
<text y="14" font-size="16">🛒</text>
</svg>
"""
    return svg, 200, headers
@app.route("/")
def main_page():
    """Render the main page with the visitor's lists and other public lists.

    Query parameters:
        month: optional ``YYYY-MM``; restricts active and public lists to
            those created in that month. Invalid values are ignored.

    Logged-in users see their own active lists, their archived lists and
    other users' public lists; anonymous visitors see public lists only.
    Expired or archived lists are left out of the active/public sections.
    """
    now = datetime.now(timezone.utc)
    month_str = request.args.get("month")
    start = end = None

    if month_str:
        try:
            year, month = map(int, month_str.split("-"))
            start = datetime(year, month, 1, tzinfo=timezone.utc)
            # Adding 31 days always lands in the next month; snap to its first day.
            end = (start + timedelta(days=31)).replace(day=1)
        except (ValueError, OverflowError):
            # Malformed or out-of-range month: behave as if no filter was given.
            # (Narrowed from a bare `except:` so unrelated errors are not hidden.)
            start = end = None

    def date_filter(query):
        """Apply the optional month window on ``created_at``."""
        if start and end:
            query = query.filter(
                ShoppingList.created_at >= start, ShoppingList.created_at < end
            )
        return query

    # Shared predicate: the list has no expiry or has not expired yet.
    not_expired = or_(
        ShoppingList.expires_at.is_(None), ShoppingList.expires_at > now
    )
    newest_first = ShoppingList.created_at.desc()

    if current_user.is_authenticated:
        user_lists = (
            date_filter(
                ShoppingList.query.filter_by(
                    owner_id=current_user.id, is_archived=False
                ).filter(not_expired)
            )
            .order_by(newest_first)
            .all()
        )
        archived_lists = (
            ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True)
            .order_by(newest_first)
            .all()
        )
        public_lists = (
            date_filter(
                ShoppingList.query.filter(
                    ShoppingList.is_public == True,  # noqa: E712 - SQL expression
                    ShoppingList.owner_id != current_user.id,
                    not_expired,
                    ShoppingList.is_archived == False,  # noqa: E712 - SQL expression
                )
            )
            .order_by(newest_first)
            .all()
        )
    else:
        user_lists = []
        archived_lists = []
        public_lists = (
            date_filter(
                ShoppingList.query.filter(
                    ShoppingList.is_public == True,  # noqa: E712 - SQL expression
                    not_expired,
                    ShoppingList.is_archived == False,  # noqa: E712 - SQL expression
                )
            )
            .order_by(newest_first)
            .all()
        )

    # Attach item counts and expense totals for the template.
    for l in user_lists + public_lists + archived_lists:
        enrich_list_data(l)

    return render_template(
        "main.html",
        user_lists=user_lists,
        public_lists=public_lists,
        archived_lists=archived_lists,
        now=now,
        timedelta=timedelta,
    )
def _is_safe_next_url(target):
    """Return True when ``target`` keeps the browser on the host serving this request."""
    from urllib.parse import urlparse

    if not target or "\\" in target or target != target.strip():
        return False
    # Browsers strip control characters from URLs, which can turn an
    # innocent-looking path into a scheme-relative one.
    if any(ord(ch) < 32 for ch in target):
        return False
    parsed = urlparse(target)
    if parsed.netloc:
        return parsed.scheme in ("http", "https") and parsed.netloc == request.host
    # Host-less targets must be absolute paths and not "//host" style.
    return (
        not parsed.scheme
        and target.startswith("/")
        and not target.startswith("//")
    )


@app.route("/system-auth", methods=["GET", "POST"])
def system_auth():
    """Ask for the shared system password and set the ``authorized`` cookie.

    Already logged-in or authorised visitors are sent to the main page. A
    blocked client address gets the form with status 403. On success the
    visitor is redirected to ``next`` (same-host targets only) or to the main
    page; on failure the attempt is counted and a warning is flashed.
    """
    if (
        current_user.is_authenticated
        or request.cookies.get("authorized") == AUTHORIZED_COOKIE_VALUE
    ):
        flash("Jesteś już zalogowany lub autoryzowany.", "info")
        return redirect(url_for("main_page"))

    ip = request.access_route[0]

    # `next` is attacker-controllable; accept it only when it stays on this
    # host, otherwise fall back to the main page (prevents an open redirect).
    next_page = request.args.get("next")
    if not _is_safe_next_url(next_page):
        next_page = url_for("main_page")

    if is_ip_blocked(ip):
        flash(
            "Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.",
            "danger",
        )
        return render_template("system_auth.html"), 403

    if request.method == "POST":
        # A missing field is treated as a wrong password instead of a 400.
        submitted = request.form.get("password", "")
        password_ok = secrets.compare_digest(
            submitted.encode("utf-8"), str(SYSTEM_PASSWORD).encode("utf-8")
        )
        if password_ok:
            reset_failed_attempts(ip)
            resp = redirect(next_page)
            max_age = app.config.get("AUTH_COOKIE_MAX_AGE", 86400)
            # NOTE(review): consider httponly/samesite/secure flags here once
            # it is confirmed that no client-side script reads this cookie.
            resp.set_cookie("authorized", AUTHORIZED_COOKIE_VALUE, max_age=max_age)
            return resp

        register_failed_attempt(ip)
        if is_ip_blocked(ip):
            flash(
                "Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.",
                "danger",
            )
            return render_template("system_auth.html"), 403
        remaining = attempts_remaining(ip)
        flash(f"Nieprawidłowe hasło. Pozostało {remaining} prób.", "warning")

    return render_template("system_auth.html")
@app.route("/toggle_archive_list/<int:list_id>")
@login_required
def toggle_archive_list(list_id):
    """Archive or restore one of the current user's lists.

    Query parameters:
        archive: ``"true"`` (default) archives the list; anything else restores it.

    Responds with 404 for an unknown list; non-owners are redirected with a
    flashed message.
    """
    l = db.session.get(ShoppingList, list_id)
    if l is None:
        abort(404)
    if l.owner_id != current_user.id:
        return redirect_with_flash("Nie masz uprawnień do tej listy", "danger")

    archive = request.args.get("archive", "true").lower() == "true"
    l.is_archived = archive
    if archive:
        message = f"Lista „{l.title}” została zarchiwizowana."
    else:
        message = f"Lista „{l.title}” została przywrócona."
    flash(message, "success")

    db.session.commit()
    return redirect(url_for("main_page"))
@app.route("/edit_my_list/<int:list_id>", methods=["GET", "POST"])
@login_required
def edit_my_list(list_id):
    """Show (GET) or apply (POST) the owner's edit form for a list.

    POST updates title, visibility, temporary/archived flags and the optional
    expiry (date and time combined and stored as UTC). Responds with 404 for
    an unknown list and 403 when the current user is not the owner.
    """
    receipts = (
        Receipt.query.filter_by(list_id=list_id)
        .order_by(Receipt.uploaded_at.desc())
        .all()
    )

    l = db.session.get(ShoppingList, list_id)
    if l is None:
        abort(404)
    if l.owner_id != current_user.id:
        abort(403, description="Nie jesteś właścicielem tej listy.")

    if request.method != "POST":
        return render_template("edit_my_list.html", list=l, receipts=receipts)

    form = request.form
    new_title = form.get("title", "").strip()
    is_public = "is_public" in form
    is_temporary = "is_temporary" in form
    is_archived = "is_archived" in form
    expires_date = form.get("expires_date")
    expires_time = form.get("expires_time")

    # Title validation.
    if not new_title:
        flash("Podaj poprawny tytuł", "danger")
        return redirect(url_for("edit_my_list", list_id=list_id))

    l.title = new_title
    l.is_public = is_public
    l.is_temporary = is_temporary
    l.is_archived = is_archived

    # Expiry handling: both parts are required, otherwise the expiry is cleared.
    if not (expires_date and expires_time):
        l.expires_at = None
    else:
        try:
            combined = f"{expires_date} {expires_time}"
            expires_dt = datetime.strptime(combined, "%Y-%m-%d %H:%M")
            l.expires_at = expires_dt.replace(tzinfo=timezone.utc)
        except ValueError:
            flash("Błędna data lub godzina wygasania", "danger")
            return redirect(url_for("edit_my_list", list_id=list_id))

    db.session.commit()
    flash("Zaktualizowano dane listy", "success")
    return redirect(url_for("main_page"))
@app.route("/delete_user_list/<int:list_id>", methods=["POST"])
@login_required
def delete_user_list(list_id):
    """Delete one of the current user's lists with its items, expenses and receipts.

    Responds with 403 when the list does not exist or belongs to someone else.
    """
    l = db.session.get(ShoppingList, list_id)
    if l is None or l.owner_id != current_user.id:
        abort(403, description="Nie jesteś właścicielem tej listy.")

    # Receipt rows must go as well: Receipt.list_id is NOT NULL, so deleting
    # the parent while receipts remain either fails at flush time or leaves
    # rows whose unique file_hash blocks re-uploading the same receipt.
    Receipt.query.filter_by(list_id=list_id).delete()
    Item.query.filter_by(list_id=list_id).delete()
    Expense.query.filter_by(list_id=list_id).delete()
    db.session.delete(l)
    db.session.commit()

    # Remove the files only after the database change has succeeded, so a
    # failed commit does not leave rows pointing at deleted files.
    delete_receipts_for_list(list_id)

    flash("Lista została usunięta", "success")
    return redirect(url_for("main_page"))
@app.route("/toggle_visibility/<int:list_id>", methods=["GET", "POST"])
@login_required
def toggle_visibility(list_id):
    """Flip the ``is_public`` flag of one of the current user's lists.

    JSON or POST callers receive a dict (new state and share URL, or an error
    with status 403); other callers get a flashed message and a redirect.
    Responds with 404 for an unknown list.
    """
    l = db.session.get(ShoppingList, list_id)
    if l is None:
        abort(404)

    api_style = request.is_json or request.method == "POST"

    if l.owner_id != current_user.id:
        if api_style:
            return {"error": "Unauthorized"}, 403
        flash("Nie masz uprawnień do tej listy", "danger")
        return redirect(url_for("main_page"))

    l.is_public = not l.is_public
    db.session.commit()

    share_url = f"{request.url_root}share/{l.share_token}"
    if api_style:
        return {"is_public": l.is_public, "share_url": share_url}

    if l.is_public:
        flash("Lista została udostępniona publicznie", "success")
    else:
        flash("Lista została ukryta przed gośćmi", "info")
    return redirect(url_for("main_page"))
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate a user (POST).

    The username is matched case-insensitively. A successful login marks the
    session permanent and redirects to the main page; a failed one re-renders
    the form with a flashed error.
    """
    if request.method != "POST":
        return render_template("login.html")

    username_input = request.form["username"].lower()
    user = User.query.filter(func.lower(User.username) == username_input).first()

    credentials_ok = user is not None and check_password_hash(
        user.password_hash, request.form["password"]
    )
    if not credentials_ok:
        flash("Nieprawidłowy login lub hasło", "danger")
        return render_template("login.html")

    session.permanent = True
    login_user(user)
    flash("Zalogowano pomyślnie", "success")
    return redirect(url_for("main_page"))
@app.route("/logout")
@login_required
def logout():
    """Log the current user out and return to the main page."""
    logout_user()
    flash("Wylogowano pomyślnie", "success")
    destination = url_for("main_page")
    return redirect(destination)
@app.route("/create", methods=["POST"])
@login_required
def create_list():
    """Create a new shopping list for the current user.

    Form fields:
        title: required; surrounding whitespace is stripped.
        temporary: ``"1"`` makes the list expire seven days from now.

    Redirects to the new list, or back to the main page with an error when
    the title is missing.
    """
    # The title column is NOT NULL. Reject a missing or blank title here
    # instead of letting the insert fail with a server error.
    title = (request.form.get("title") or "").strip()
    if not title:
        return redirect_with_flash("Podaj poprawny tytuł", "danger")

    is_temporary = request.form.get("temporary") == "1"
    token = generate_share_token(8)
    expires_at = (
        datetime.now(timezone.utc) + timedelta(days=7) if is_temporary else None
    )

    new_list = ShoppingList(
        title=title,
        owner_id=current_user.id,
        is_temporary=is_temporary,
        share_token=token,
        expires_at=expires_at,
    )
    db.session.add(new_list)
    db.session.commit()

    flash("Utworzono nową listę", "success")
    return redirect(url_for("view_list", list_id=new_list.id))
@app.route("/list/<int:list_id>")
@login_required
def view_list(list_id):
    """Render a list with its items, receipts, expenses and purchase progress."""
    details = get_list_details(list_id)
    shopping_list, items, receipt_files, expenses, total_expense = details

    total_count = len(items)
    purchased_count = sum(1 for entry in items if entry.purchased)
    if total_count > 0:
        percent = purchased_count / total_count * 100
    else:
        percent = 0

    context = dict(
        list=shopping_list,
        items=items,
        receipt_files=receipt_files,
        total_count=total_count,
        purchased_count=purchased_count,
        percent=percent,
        expenses=expenses,
        total_expense=total_expense,
        is_share=False,
    )
    return render_template("list.html", **context)
@app.route("/user_expenses")
@login_required
def user_expenses():
    """Render the expense overview for the current user.

    Query parameters:
        start_date, end_date: optional ``YYYY-MM-DD`` bounds (end inclusive);
            both must be given for the range to apply.
        show_all: ``"true"`` also includes expenses of public lists.
    """
    args = request.args
    start_date_str = args.get("start_date")
    end_date_str = args.get("end_date")
    show_all = args.get("show_all", "false").lower() == "true"

    start = None
    end = None

    # Without show_all only the current user's lists count; with it, public
    # lists are included as well.
    if show_all:
        visibility = or_(
            ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True
        )
    else:
        visibility = ShoppingList.owner_id == current_user.id

    expenses_query = (
        Expense.query.join(ShoppingList, Expense.list_id == ShoppingList.id)
        .options(joinedload(Expense.list))
        .filter(visibility)
    )

    if start_date_str and end_date_str:
        try:
            start = datetime.strptime(start_date_str, "%Y-%m-%d")
            # One extra day makes the end date inclusive.
            end = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(days=1)
            expenses_query = expenses_query.filter(
                Expense.added_at >= start, Expense.added_at < end
            )
        except ValueError:
            flash("Błędny zakres dat", "danger")

    expenses = expenses_query.order_by(Expense.added_at.desc()).all()

    list_ids = {expense.list_id for expense in expenses}
    lists = (
        ShoppingList.query.filter(ShoppingList.id.in_(list_ids))
        .order_by(ShoppingList.created_at.desc())
        .all()
    )

    expense_table = []
    for expense in expenses:
        expense_table.append(
            {
                "title": expense.list.title if expense.list else "Nieznana",
                "amount": expense.amount,
                "added_at": expense.added_at,
            }
        )

    def counts_towards_total(expense):
        """True when no complete range is set or the expense falls inside it."""
        if not start or not end:
            return True
        return expense.added_at >= start and expense.added_at < end

    lists_data = []
    for shopping_list in lists:
        lists_data.append(
            {
                "id": shopping_list.id,
                "title": shopping_list.title,
                "created_at": shopping_list.created_at,
                "total_expense": sum(
                    expense.amount
                    for expense in shopping_list.expenses
                    if counts_towards_total(expense)
                ),
                "owner_username": (
                    shopping_list.owner.username if shopping_list.owner else "?"
                ),
            }
        )

    return render_template(
        "user_expenses.html",
        expense_table=expense_table,
        lists_data=lists_data,
        show_all=show_all,
    )
@app.route("/user_expenses_data")
@login_required
def user_expenses_data():
    """Return expense totals grouped by period, as JSON.

    Query parameters:
        range: ``monthly`` (default), ``quarterly``, ``halfyearly`` or
            ``yearly``; any other value groups by day.
        start_date, end_date: optional ``YYYY-MM-DD`` bounds (end inclusive).
        show_all: ``"true"`` also includes expenses of public lists.

    Responds with ``{"labels": [...], "expenses": [...]}``, or status 400 when
    the dates cannot be parsed.
    """
    args = request.args
    range_type = args.get("range", "monthly")
    start_date = args.get("start_date")
    end_date = args.get("end_date")
    show_all = args.get("show_all", "false").lower() == "true"

    if show_all:
        visibility = or_(
            ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True
        )
    else:
        visibility = ShoppingList.owner_id == current_user.id

    query = Expense.query.join(
        ShoppingList, Expense.list_id == ShoppingList.id
    ).filter(visibility)

    if start_date and end_date:
        try:
            start = datetime.strptime(start_date, "%Y-%m-%d")
            end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
            query = query.filter(Expense.added_at >= start, Expense.added_at < end)
        except ValueError:
            return jsonify({"error": "Błędne daty"}), 400

    # One label builder per supported range; unknown ranges group by day.
    label_builders = {
        "monthly": lambda ts: ts.strftime("%Y-%m"),
        "quarterly": lambda ts: f"{ts.year}-Q{((ts.month - 1) // 3) + 1}",
        "halfyearly": lambda ts: f"{ts.year}-H{1 if ts.month <= 6 else 2}",
        "yearly": lambda ts: str(ts.year),
    }
    build_label = label_builders.get(range_type, lambda ts: ts.strftime("%Y-%m-%d"))

    grouped = defaultdict(float)
    for expense in query.all():
        # Expenses without a timestamp are counted under the current time.
        moment = expense.added_at or datetime.now(timezone.utc)
        grouped[build_label(moment)] += expense.amount

    labels = sorted(grouped)
    totals = [round(grouped[label], 2) for label in labels]
    return jsonify({"labels": labels, "expenses": totals})
@app.route("/share/<token>")
@app.route("/guest-list/<int:list_id>")
def shared_list(token=None, list_id=None):
    """Render the guest view of a list, addressed by share token or by id.

    A list that is not public is not shown: the visitor is redirected to the
    main page with a flashed notice. The owner may still open their own list
    through the id route. Responds with 404 for an unknown token or id.
    """
    if token:
        shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
        if not check_list_public(shopping_list):
            return redirect(url_for("main_page"))
    else:
        shopping_list = ShoppingList.query.get_or_404(list_id)
        # The id route used to skip the visibility check that the token route
        # enforces, exposing private lists to anyone who guessed an id.
        is_owner = (
            current_user.is_authenticated
            and shopping_list.owner_id == current_user.id
        )
        if not is_owner and not check_list_public(shopping_list):
            return redirect(url_for("main_page"))

    shopping_list, items, receipt_files, expenses, total_expense = get_list_details(
        shopping_list.id
    )
    return render_template(
        "list_share.html",
        list=shopping_list,
        items=items,
        receipt_files=receipt_files,
        expenses=expenses,
        total_expense=total_expense,
        is_share=True,
    )
@app.route("/copy/<int:list_id>")
@login_required
def copy_list(list_id):
    """Copy a list (title and item names only) into a new list owned by the current user.

    Responds with 404 when the source list does not exist; otherwise
    redirects to the new list.
    """
    original = ShoppingList.query.get_or_404(list_id)

    new_list = ShoppingList(
        title=original.title + " (Kopia)",
        owner_id=current_user.id,
        share_token=generate_share_token(8),
    )
    db.session.add(new_list)
    # Commit first so the new list has an id for its items.
    db.session.commit()

    source_items = Item.query.filter_by(list_id=original.id).all()
    db.session.add_all(
        [Item(list_id=new_list.id, name=source.name) for source in source_items]
    )
    db.session.commit()

    flash("Skopiowano listę", "success")
    return redirect(url_for("view_list", list_id=new_list.id))
@app.route("/suggest_products")
def suggest_products():
    """Return up to five product names containing the ``q`` query parameter.

    The match is case-insensitive; an empty ``q`` yields an empty list.
    """
    query = request.args.get("q", "")
    if not query:
        return {"suggestions": []}

    matches = (
        SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f"%{query}%"))
        .limit(5)
        .all()
    )
    return {"suggestions": [match.name for match in matches]}
@app.route("/all_products")
def all_products():
    """Return product names, optionally filtered by the ``q`` query parameter.

    The result is a first batch of up to 20 names followed by up to 200 other
    names in alphabetical order, with duplicates (compared case-insensitively
    after stripping whitespace) removed.
    """
    query = request.args.get("q", "")

    def with_search(base_query):
        """Apply the optional case-insensitive substring filter."""
        if query:
            return base_query.filter(SuggestedProduct.name.ilike(f"%{query}%"))
        return base_query

    top_products = (
        with_search(SuggestedProduct.query)
        .order_by(
            # The name must be the first ORDER BY term.
            # NOTE(review): presumably required by the DISTINCT on name below
            # — confirm against the database in use.
            SuggestedProduct.name.asc(),
            SuggestedProduct.usage_count.desc(),
        )
        .distinct(SuggestedProduct.name)
        .limit(20)
        .all()
    )
    top_names = [product.name for product in top_products]

    rest_query = with_search(SuggestedProduct.query)
    if top_names:
        rest_query = rest_query.filter(~SuggestedProduct.name.in_(top_names))
    rest_products = rest_query.order_by(SuggestedProduct.name.asc()).limit(200).all()
    rest_names = [product.name for product in rest_products]

    # The first spelling seen wins; dict insertion order keeps the sequence.
    first_by_key = {}
    for name in top_names + rest_names:
        first_by_key.setdefault(name.strip().lower(), name)

    return {"allproducts": list(first_by_key.values())}
@app.route("/upload_receipt/<int:list_id>", methods=["POST"])
@login_required
def upload_receipt(list_id):
    """Store an uploaded receipt image for a shopping list.

    The upload is rejected (via ``_receipt_error``) when no file was sent, the
    filename is empty, the extension is not allowed, a receipt with the same
    SHA-256 hash already exists, or the image cannot be converted. On success
    the image is saved as WebP and a ``Receipt`` row is created.

    Returns JSON (and emits ``receipt_added`` to the list's room) for
    JSON/XHR requests; otherwise flashes a message and redirects back.
    Responds with 404 when the list does not exist.
    """
    # Refuse uploads for lists that do not exist, so no file or Receipt row is
    # created for a dangling list_id.
    # NOTE(review): the ownership check was commented out in the original and
    # is left disabled here - confirm whether non-owners may upload.
    if db.session.get(ShoppingList, list_id) is None:
        abort(404)

    if "receipt" not in request.files:
        return _receipt_error("Brak pliku")
    file = request.files["receipt"]
    if file.filename == "":
        return _receipt_error("Nie wybrano pliku")
    if not (file and allowed_file(file.filename)):
        return _receipt_error("Niedozwolony format pliku")

    # Hash the raw upload to detect duplicates, then rewind the stream so the
    # image can still be read when it is saved.
    file_bytes = file.read()
    file.seek(0)
    file_hash = hashlib.sha256(file_bytes).hexdigest()
    if Receipt.query.filter_by(file_hash=file_hash).first():
        return _receipt_error("Taki plik już istnieje")

    now = datetime.now(timezone.utc)
    timestamp = now.strftime("%Y%m%d_%H%M")
    random_part = secrets.token_hex(3)
    webp_filename = f"list_{list_id}_{timestamp}_{random_part}.webp"
    file_path = os.path.join(app.config["UPLOAD_FOLDER"], webp_filename)

    try:
        save_resized_image(file, file_path)
    except ValueError as e:
        return _receipt_error(str(e))

    filesize = os.path.getsize(file_path) if os.path.exists(file_path) else None
    db.session.add(
        Receipt(
            list_id=list_id,
            filename=webp_filename,
            filesize=filesize,
            uploaded_at=now,
            file_hash=file_hash,
        )
    )
    db.session.commit()

    if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest":
        url = url_for("uploaded_file", filename=webp_filename)
        socketio.emit("receipt_added", {"url": url}, to=str(list_id))
        return jsonify({"success": True, "url": url})

    flash("Wgrano paragon", "success")
    return redirect(request.referrer or url_for("main_page"))
@app.route("/uploads/<filename>")
def uploaded_file(filename):
    """Serve a file from the upload folder with long-lived caching headers.

    ``Pragma`` and ``Content-Disposition`` are removed from the response and
    the ``Content-Type`` is overridden when ``mimetypes`` can guess one from
    the filename.
    """
    response = send_from_directory(app.config["UPLOAD_FOLDER"], filename)
    headers = response.headers
    # 2592000 seconds = 30 days.
    headers["Cache-Control"] = "public, max-age=2592000, immutable"
    for unwanted in ("Pragma", "Content-Disposition"):
        headers.pop(unwanted, None)

    guessed_type = mimetypes.guess_type(filename)[0]
    if guessed_type:
        headers["Content-Type"] = guessed_type
    return response
@app.route("/reorder_items", methods=["POST"])
@login_required
def reorder_items():
    """Persist a new item order for a list and broadcast it to the list's room.

    Expects a JSON object with ``list_id`` and ``order`` (a list of item ids in
    their new order). Items that do not exist or belong to another list are
    ignored. Responds with HTTP 400 when the payload is missing or malformed.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify(success=False, error="Brak danych"), 400

    list_id = data.get("list_id")
    order = data.get("order")
    if list_id is None or not isinstance(order, list):
        return jsonify(success=False, error="Brak danych"), 400

    for index, item_id in enumerate(order):
        item = db.session.get(Item, item_id)
        # Only touch items that really belong to the list being reordered.
        if item and item.list_id == list_id:
            item.position = index
    db.session.commit()

    socketio.emit(
        "items_reordered", {"list_id": list_id, "order": order}, to=str(list_id)
    )
    return jsonify(success=True)
@app.route("/rotate_receipt/<int:receipt_id>")
@login_required
def rotate_receipt_user(receipt_id):
    """Rotate a receipt image on behalf of the list owner or an admin.

    Responds with 404 when the receipt or its list is missing. Users who are
    neither admin nor owner are redirected to the main page with an error.
    """
    receipt = Receipt.query.get_or_404(receipt_id)
    owning_list = ShoppingList.query.get_or_404(receipt.list_id)

    may_modify = current_user.is_admin or current_user.id == owning_list.owner_id
    if not may_modify:
        flash("Brak uprawnień do tej operacji", "danger")
        return redirect(url_for("main_page"))

    try:
        rotate_receipt_by_id(receipt_id)
    except FileNotFoundError:
        flash("Plik nie istnieje", "danger")
    except Exception as e:
        flash(f"Błąd przy obracaniu: {str(e)}", "danger")
    else:
        flash("Obrócono paragon", "success")

    return redirect(request.referrer or url_for("main_page"))
@app.route("/delete_receipt/<int:receipt_id>")
@login_required
def delete_receipt_user(receipt_id):
    """Delete a receipt on behalf of the list owner or an admin.

    Responds with 404 when the receipt or its list is missing. Users who are
    neither admin nor owner are redirected to the main page with an error.
    """
    receipt = Receipt.query.get_or_404(receipt_id)
    owning_list = ShoppingList.query.get_or_404(receipt.list_id)

    may_modify = current_user.is_admin or current_user.id == owning_list.owner_id
    if not may_modify:
        flash("Brak uprawnień do tej operacji", "danger")
        return redirect(url_for("main_page"))

    try:
        delete_receipt_by_id(receipt_id)
    except Exception as e:
        flash(f"Błąd przy usuwaniu pliku: {str(e)}", "danger")
    else:
        flash("Paragon usunięty", "success")

    return redirect(request.referrer or url_for("main_page"))
# OCR
@app.route("/lists/<int:list_id>/analyze", methods=["POST"])
@login_required
def analyze_receipts_for_list(list_id):
    """Run OCR over every receipt of a list and report the detected totals.

    Receipts whose file is missing on disk are skipped. When OCR fails for a
    receipt, the traceback is printed and the receipt is reported with an
    amount of 0.0 and no debug lines. Receipts already referenced by an
    expense of this list are flagged ``already_added`` and excluded from the
    returned ``total``.
    """
    receipt_objs = Receipt.query.filter_by(list_id=list_id).all()
    existing_expenses = {
        e.receipt_filename
        for e in Expense.query.filter_by(list_id=list_id).all()
        if e.receipt_filename
    }

    results = []
    total = 0.0
    for receipt in receipt_objs:
        filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
        if not os.path.exists(filepath):
            continue

        try:
            # Open inside a context manager so the file handle is released as
            # soon as the pixel data has been converted to a new image.
            with Image.open(filepath) as source_image:
                raw_image = source_image.convert("RGB")
            image = preprocess_image_for_tesseract(raw_image)
            value, lines = extract_total_tesseract(image)
        except Exception:
            # Best effort: one unreadable receipt must not abort the analysis.
            import traceback

            print(f"OCR error for {receipt.filename}:\n{traceback.format_exc()}")
            value = 0.0
            lines = []

        already_added = receipt.filename in existing_expenses
        results.append(
            {
                "id": receipt.id,
                "filename": receipt.filename,
                "amount": round(value, 2),
                "debug_text": lines,
                "already_added": already_added,
            }
        )
        if not already_added:
            total += value

    return jsonify({"results": results, "total": round(total, 2)})
@app.route("/admin")
@login_required
@admin_required
def admin_panel():
    """Render the admin dashboard.

    Collects global counters, per-list statistics (progress, comments,
    receipt files, expiry), the five most frequently purchased item names,
    expense sums (overall, current year, current month) and basic runtime
    and database information, then renders ``admin/admin_panel.html``.
    """
    now = datetime.now(timezone.utc)

    user_count = User.query.count()
    list_count = ShoppingList.query.count()
    item_count = Item.query.count()

    all_lists = ShoppingList.query.options(joinedload(ShoppingList.owner)).all()
    all_files = os.listdir(app.config["UPLOAD_FOLDER"])

    enriched_lists = []
    for shopping_list in all_lists:
        enrich_list_data(shopping_list)
        items = Item.query.filter_by(list_id=shopping_list.id).all()
        total_count = shopping_list.total_count
        purchased_count = shopping_list.purchased_count
        percent = (purchased_count / total_count * 100) if total_count > 0 else 0
        comments_count = sum(1 for i in items if i.note and i.note.strip())

        # Uploaded receipts are named "list_<id>_<timestamp>_...". Match the
        # id together with its trailing underscore so that list 1 does not
        # also count the files of lists 10, 11, 12, ...
        receipt_prefix = f"list_{shopping_list.id}_"
        receipt_files = [f for f in all_files if f.startswith(receipt_prefix)]

        # Work out whether a temporary list has already expired.
        is_expired = False
        if shopping_list.is_temporary and shopping_list.expires_at:
            expires_at = shopping_list.expires_at
            if expires_at.tzinfo is None:
                # Naive timestamps are interpreted as UTC for the comparison.
                expires_at = expires_at.replace(tzinfo=timezone.utc)
            is_expired = expires_at < now

        enriched_lists.append(
            {
                "list": shopping_list,
                "total_count": total_count,
                "purchased_count": purchased_count,
                "percent": round(percent),
                "comments_count": comments_count,
                "receipts_count": len(receipt_files),
                "total_expense": shopping_list.total_expense,
                "expired": is_expired,
            }
        )

    top_products = (
        db.session.query(Item.name, func.count(Item.id).label("count"))
        .filter(Item.purchased.is_(True))
        .group_by(Item.name)
        .order_by(func.count(Item.id).desc())
        .limit(5)
        .all()
    )
    purchased_items_count = Item.query.filter_by(purchased=True).count()

    total_expense_sum = db.session.query(func.sum(Expense.amount)).scalar() or 0
    year_expense_sum = (
        db.session.query(func.sum(Expense.amount))
        .filter(extract("year", Expense.added_at) == now.year)
        .scalar()
        or 0
    )
    month_expense_sum = (
        db.session.query(func.sum(Expense.amount))
        .filter(extract("year", Expense.added_at) == now.year)
        .filter(extract("month", Expense.added_at) == now.month)
        .scalar()
        or 0
    )

    process = psutil.Process(os.getpid())
    app_mem = process.memory_info().rss // (1024 * 1024)  # MB

    # Engine info
    db_engine = db.engine
    db_info = {
        "engine": db_engine.name,
        "version": getattr(db_engine.dialect, "server_version_info", None),
        # Drop the query string of the connection URL.
        "url": str(db_engine.url).split("?")[0],
    }

    # Tables
    inspector = inspect(db_engine)
    table_count = len(inspector.get_table_names())

    # Records (quick count over the main tables)
    record_total = (
        db.session.query(func.count(User.id)).scalar()
        + db.session.query(func.count(ShoppingList.id)).scalar()
        + db.session.query(func.count(Item.id)).scalar()
        + db.session.query(func.count(Receipt.id)).scalar()
        + db.session.query(func.count(Expense.id)).scalar()
    )

    # Uptime
    uptime_minutes = int(
        (datetime.now(timezone.utc) - app_start_time).total_seconds() // 60
    )

    return render_template(
        "admin/admin_panel.html",
        user_count=user_count,
        list_count=list_count,
        item_count=item_count,
        purchased_items_count=purchased_items_count,
        enriched_lists=enriched_lists,
        top_products=top_products,
        total_expense_sum=total_expense_sum,
        year_expense_sum=year_expense_sum,
        month_expense_sum=month_expense_sum,
        now=now,
        python_version=sys.version,
        system_info=platform.platform(),
        app_memory=f"{app_mem} MB",
        db_info=db_info,
        table_count=table_count,
        record_total=record_total,
        uptime_minutes=uptime_minutes,
    )
@app.route("/admin/delete_list/<int:list_id>")
@login_required
@admin_required
def delete_list(list_id):
    """Delete a shopping list together with its receipts, items and expenses.

    Responds with 404 when the list does not exist; otherwise flashes a
    confirmation and redirects to the admin panel.
    """
    # Resolve the list first so that an unknown id results in a 404 before
    # any receipt cleanup is attempted.
    list_to_delete = ShoppingList.query.get_or_404(list_id)
    # Remember the title now; the instance is deleted by the time it is shown.
    title = list_to_delete.title

    delete_receipts_for_list(list_id)
    Item.query.filter_by(list_id=list_id).delete()
    Expense.query.filter_by(list_id=list_id).delete()
    db.session.delete(list_to_delete)
    db.session.commit()

    flash(f"Usunięto listę: {title}", "success")
    return redirect(url_for("admin_panel"))
@app.route("/admin/add_user", methods=["POST"])
@login_required
@admin_required
def add_user():
    """Create a new user from the ``username`` and ``password`` form fields.

    The username is trimmed and lower-cased before it is stored. Missing or
    blank fields and already-taken usernames are reported with a flash
    message. Always redirects to the user list.
    """
    # .get() with a default lets a missing field reach the validation message
    # below; stripping rejects whitespace-only usernames.
    username = request.form.get("username", "").strip().lower()
    password = request.form.get("password", "")

    if not username or not password:
        flash("Wypełnij wszystkie pola", "danger")
        return redirect(url_for("list_users"))

    if User.query.filter(func.lower(User.username) == username).first():
        flash("Użytkownik o takiej nazwie już istnieje", "warning")
        return redirect(url_for("list_users"))

    new_user = User(username=username, password_hash=generate_password_hash(password))
    db.session.add(new_user)
    db.session.commit()

    flash("Dodano nowego użytkownika", "success")
    return redirect(url_for("list_users"))
@app.route("/admin/users")
@login_required
@admin_required
def list_users():
    """Render the user management page with all users and global counters."""
    # NOTE(review): the activity log is a hard-coded placeholder, not real data.
    template_context = {
        "users": User.query.all(),
        "user_count": User.query.count(),
        "list_count": ShoppingList.query.count(),
        "item_count": Item.query.count(),
        "activity_log": [
            "Utworzono listę: Zakupy weekendowe",
            "Dodano produkt: Mleko",
        ],
    }
    return render_template("admin/user_management.html", **template_context)
@app.route("/admin/change_password/<int:user_id>", methods=["POST"])
@login_required
@admin_required
def reset_password(user_id):
    """Set a new password for the given user from the ``password`` form field.

    Responds with 404 for an unknown user. An empty password is rejected with
    a flash message. Always redirects to the user list.
    """
    account = User.query.get_or_404(user_id)
    submitted_password = request.form["password"]

    if submitted_password:
        account.password_hash = generate_password_hash(submitted_password)
        db.session.commit()
        flash(
            f"Hasło dla użytkownika {account.username} zostało zaktualizowane",
            "success",
        )
    else:
        flash("Podaj nowe hasło", "danger")

    return redirect(url_for("list_users"))
@app.route("/admin/delete_user/<int:user_id>")
@login_required
@admin_required
def delete_user(user_id):
    """Delete a user, refusing to remove the last remaining administrator.

    Responds with 404 for an unknown user. Always redirects to the user list.
    """
    account = User.query.get_or_404(user_id)

    # Count admins only when the target is an admin.
    is_last_admin = (
        account.is_admin and User.query.filter_by(is_admin=True).count() <= 1
    )
    if is_last_admin:
        flash("Nie można usunąć ostatniego administratora.", "danger")
        return redirect(url_for("list_users"))

    db.session.delete(account)
    db.session.commit()
    flash("Użytkownik usunięty", "success")
    return redirect(url_for("list_users"))
@app.route("/admin/receipts/<id>")
@login_required
@admin_required
def admin_receipts(id):
    """List receipts, newest first, for one list or for all lists.

    ``id`` is either the literal ``"all"`` or a numeric list id. A value that
    cannot be parsed as an integer flashes an error and redirects to the
    admin panel.
    """
    receipt_query = Receipt.query
    if id != "all":
        try:
            wanted_list_id = int(id)
        except ValueError:
            flash("Nieprawidłowe ID listy.", "danger")
            return redirect(url_for("admin_panel"))
        receipt_query = receipt_query.filter_by(list_id=wanted_list_id)

    receipts = receipt_query.order_by(Receipt.uploaded_at.desc()).all()
    return render_template("admin/receipts.html", receipts=receipts)
@app.route("/admin/rotate_receipt/<int:receipt_id>")
@login_required
@admin_required
def rotate_receipt(receipt_id):
    """Rotate a receipt image (admin variant) and return to the previous page."""
    try:
        rotate_receipt_by_id(receipt_id)
    except FileNotFoundError:
        flash("Plik nie istnieje", "danger")
    except Exception as e:
        flash(f"Błąd przy obracaniu: {str(e)}", "danger")
    else:
        flash("Obrócono paragon", "success")

    fallback_url = url_for("admin_receipts", id="all")
    return redirect(request.referrer or fallback_url)
@app.route("/admin/delete_receipt/<int:receipt_id>")
@login_required
@admin_required
def delete_receipt(receipt_id):
    """Delete a receipt (admin variant) and return to the previous page."""
    try:
        delete_receipt_by_id(receipt_id)
    except Exception as e:
        flash(f"Błąd przy usuwaniu pliku: {str(e)}", "danger")
    else:
        flash("Paragon usunięty", "success")

    fallback_url = url_for("admin_receipts", id="all")
    return redirect(request.referrer or fallback_url)
@app.route("/admin/rename_receipt/<int:receipt_id>")
@login_required
@admin_required
def rename_receipt(receipt_id):
    """Give a receipt file a freshly generated filename.

    Responds with 404 for an unknown receipt. The file is renamed on disk and
    the new name is stored on the receipt; problems are reported with a flash
    message. Redirects to the referring page, or to the receipt overview when
    the request carries no referrer.
    """
    # A missing Referer header must not break the redirect.
    back_url = request.referrer or url_for("admin_receipts", id="all")

    receipt = Receipt.query.get_or_404(receipt_id)
    old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
    if not os.path.exists(old_path):
        flash("Plik nie istnieje", "danger")
        return redirect(back_url)

    new_filename = generate_new_receipt_filename(receipt.list_id)
    new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename)
    try:
        os.rename(old_path, new_path)
        receipt.filename = new_filename
        db.session.commit()
        flash("Zmieniono nazwę pliku", "success")
    except Exception as e:
        # Discard the pending filename change so the session stays usable.
        db.session.rollback()
        flash(f"Błąd przy zmianie nazwy: {str(e)}", "danger")

    return redirect(back_url)
@app.route("/admin/generate_receipt_hash/<int:receipt_id>")
@login_required
@admin_required
def generate_receipt_hash(receipt_id):
    """Compute and store the SHA-256 hash of a receipt file that has none yet.

    Responds with 404 for an unknown receipt. Nothing is changed when a hash
    already exists or the file is missing. Redirects to the referring page,
    or to the receipt overview when the request carries no referrer.
    """
    # A missing Referer header must not break the redirect.
    back_url = request.referrer or url_for("admin_receipts", id="all")

    receipt = Receipt.query.get_or_404(receipt_id)
    if receipt.file_hash:
        flash("Hash już istnieje", "info")
        return redirect(back_url)

    file_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
    if not os.path.exists(file_path):
        flash("Plik nie istnieje", "danger")
        return redirect(back_url)

    try:
        # NOTE(review): this hashes the stored file, whereas the upload view
        # hashes the raw uploaded bytes - confirm the two are meant to be
        # comparable for duplicate detection.
        with open(file_path, "rb") as f:
            receipt.file_hash = hashlib.sha256(f.read()).hexdigest()
        db.session.commit()
        flash("Hash wygenerowany", "success")
    except Exception as e:
        flash(f"Błąd przy generowaniu hasha: {e}", "danger")

    return redirect(back_url)
@app.route("/admin/delete_selected_lists", methods=["POST"])
@login_required
@admin_required
def delete_selected_lists():
    """Delete every list whose id was submitted in the ``list_ids`` form field.

    Each existing list is removed together with its receipts, items and
    expenses. Unknown or non-numeric ids are skipped. Redirects to the admin
    panel.
    """
    for raw_id in request.form.getlist("list_ids"):
        try:
            list_id = int(raw_id)
        except ValueError:
            # A malformed id must not abort the whole batch.
            continue

        lst = db.session.get(ShoppingList, list_id)
        if lst is None:
            continue

        delete_receipts_for_list(lst.id)
        Item.query.filter_by(list_id=lst.id).delete()
        Expense.query.filter_by(list_id=lst.id).delete()
        db.session.delete(lst)

    db.session.commit()
    flash("Usunięto wybrane listy", "success")
    return redirect(url_for("admin_panel"))
@app.route("/admin/edit_list/<int:list_id>", methods=["GET", "POST"])
@login_required
@admin_required
def edit_list(list_id):
    """Admin editor for a single shopping list.

    GET renders ``admin/edit_list.html``. POST dispatches on the ``action``
    form field:

    * ``save`` - update title, flags, expiry, owner and (optionally) replace
      all expenses of the list with one expense of the given amount,
    * ``add_item`` - add a product (and a suggestion when none exists yet),
    * ``delete_item``, ``toggle_purchased``, ``mark_not_purchased``,
      ``unmark_not_purchased``, ``edit_quantity`` - modify the item given by
      ``item_id``, provided it belongs to this list.

    Every handled action redirects back to this editor; an unknown action
    falls through to rendering the page. Responds with 404 when the list does
    not exist.
    """
    shopping_list = db.session.get(ShoppingList, list_id)
    if shopping_list is None:
        abort(404)

    expenses = Expense.query.filter_by(list_id=list_id).all()
    total_expense = sum(e.amount for e in expenses)
    users = User.query.all()
    items = (
        db.session.query(Item).filter_by(list_id=list_id).order_by(Item.id.desc()).all()
    )
    receipts = (
        Receipt.query.filter_by(list_id=list_id)
        .order_by(Receipt.uploaded_at.desc())
        .all()
    )

    def back_to_editor():
        # Every POST action ends with a redirect to this same page.
        return redirect(url_for("edit_list", list_id=list_id))

    def posted_item():
        # Return the item named by the form, but only if it is on this list.
        item = db.session.get(Item, request.form.get("item_id"))
        if item and item.list_id == list_id:
            return item
        return None

    if request.method == "POST":
        action = request.form.get("action")

        if action == "save":
            new_title = request.form.get("title", "").strip()
            new_amount_str = request.form.get("amount")
            new_owner_id = request.form.get("owner_id")
            expires_date = request.form.get("expires_date")
            expires_time = request.form.get("expires_time")

            if new_title:
                shopping_list.title = new_title
            # Checkbox fields are present in the form only when ticked.
            shopping_list.is_archived = "archived" in request.form
            shopping_list.is_public = "public" in request.form
            shopping_list.is_temporary = "temporary" in request.form

            if expires_date and expires_time:
                try:
                    combined_str = f"{expires_date} {expires_time}"
                    dt = datetime.strptime(combined_str, "%Y-%m-%d %H:%M")
                except ValueError:
                    flash("Niepoprawna data lub godzina wygasania", "danger")
                    return back_to_editor()
                # The submitted wall-clock value is stored as UTC.
                shopping_list.expires_at = dt.replace(tzinfo=timezone.utc)
            else:
                shopping_list.expires_at = None

            if new_owner_id:
                try:
                    new_owner_id_int = int(new_owner_id)
                except ValueError:
                    flash("Niepoprawny ID użytkownika", "danger")
                    return back_to_editor()
                if db.session.get(User, new_owner_id_int) is None:
                    flash("Wybrany użytkownik nie istnieje", "danger")
                    return back_to_editor()
                shopping_list.owner_id = new_owner_id_int

            if new_amount_str:
                try:
                    new_amount = float(new_amount_str)
                except ValueError:
                    flash("Niepoprawna kwota", "danger")
                    return back_to_editor()
                # Replace all existing expenses with a single one. No commit
                # in between, so the whole save is applied atomically below.
                for expense in expenses:
                    db.session.delete(expense)
                db.session.add(Expense(list_id=list_id, amount=new_amount))

            db.session.commit()
            flash("Zapisano zmiany listy", "success")
            return back_to_editor()

        elif action == "add_item":
            item_name = request.form.get("item_name", "").strip()
            quantity_str = request.form.get("quantity", "1")
            if not item_name:
                flash("Podaj nazwę produktu", "danger")
                return back_to_editor()

            try:
                quantity = max(1, int(quantity_str))
            except ValueError:
                quantity = 1

            db.session.add(
                Item(
                    list_id=list_id,
                    name=item_name,
                    quantity=quantity,
                    added_by=current_user.id,
                )
            )
            # Remember the name as a suggestion unless one already exists
            # (compared case-insensitively).
            exists = (
                db.session.query(SuggestedProduct)
                .filter(func.lower(SuggestedProduct.name) == item_name.lower())
                .first()
            )
            if not exists:
                db.session.add(SuggestedProduct(name=item_name))

            db.session.commit()
            flash("Dodano produkt", "success")
            return back_to_editor()

        elif action == "delete_item":
            item = posted_item()
            if item:
                db.session.delete(item)
                db.session.commit()
                flash("Usunięto produkt", "success")
            else:
                flash("Nie znaleziono produktu", "danger")
            return back_to_editor()

        elif action == "toggle_purchased":
            item = posted_item()
            if item:
                item.purchased = not item.purchased
                db.session.commit()
                flash("Zmieniono status oznaczenia produktu", "success")
            else:
                flash("Nie znaleziono produktu", "danger")
            return back_to_editor()

        elif action == "mark_not_purchased":
            item = posted_item()
            if item:
                item.not_purchased = True
                item.purchased = False
                item.purchased_at = None
                db.session.commit()
                flash("Oznaczono produkt jako niekupione", "success")
            else:
                flash("Nie znaleziono produktu", "danger")
            return back_to_editor()

        elif action == "unmark_not_purchased":
            item = posted_item()
            if item:
                item.not_purchased = False
                item.not_purchased_reason = None
                item.purchased = False
                item.purchased_at = None
                db.session.commit()
                flash("Przywrócono produkt do listy", "success")
            else:
                flash("Nie znaleziono produktu", "danger")
            return back_to_editor()

        elif action == "edit_quantity":
            item = posted_item()
            if item:
                try:
                    new_quantity = int(request.form.get("quantity"))
                except (TypeError, ValueError):
                    # TypeError covers a missing "quantity" field (None).
                    new_quantity = None
                if new_quantity is not None and new_quantity > 0:
                    item.quantity = new_quantity
                    db.session.commit()
                    flash("Zmieniono ilość produktu", "success")
                else:
                    # Non-numeric, missing and non-positive values all get
                    # the same feedback instead of being ignored silently.
                    flash("Nieprawidłowa ilość", "danger")
            else:
                flash("Nie znaleziono produktu", "danger")
            return back_to_editor()

    return render_template(
        "admin/edit_list.html",
        list=shopping_list,
        total_expense=total_expense,
        users=users,
        items=items,
        receipts=receipts,
        upload_folder=app.config["UPLOAD_FOLDER"],
    )
@app.route("/admin/products")
@login_required
@admin_required
def list_products():
    """Render the admin product overview.

    Passes all items (newest id first), a user-id to username mapping and the
    suggestions keyed by lower-cased name to ``admin/list_products.html``.
    """
    all_items = Item.query.order_by(Item.id.desc()).all()

    usernames_by_id = {}
    for account in db.session.query(User).all():
        usernames_by_id[account.id] = account.username

    # Stable ordering of suggestions; when two names differ only in case, the
    # later one in this order wins the dictionary slot.
    suggestions_by_name = {}
    for suggestion in SuggestedProduct.query.order_by(SuggestedProduct.name.asc()).all():
        suggestions_by_name[suggestion.name.lower()] = suggestion

    return render_template(
        "admin/list_products.html",
        items=all_items,
        users_dict=usernames_by_id,
        suggestions_dict=suggestions_by_name,
    )
@app.route("/admin/sync_suggestion/<int:item_id>", methods=["POST"])
@login_required
def sync_suggestion_ajax(item_id):
    """Ensure a suggestion exists for the given item's name (AJAX endpoint).

    Non-admins receive a JSON error with HTTP 403; an unknown item yields 404.
    The name comparison is case-insensitive. The JSON reply reports whether a
    suggestion was created or already existed.
    """
    if not current_user.is_admin:
        return jsonify({"success": False, "message": "Brak uprawnień"}), 403

    item = Item.query.get_or_404(item_id)
    already_present = SuggestedProduct.query.filter(
        func.lower(SuggestedProduct.name) == item.name.lower()
    ).first()

    if already_present:
        message = f"Sugestia dla produktu „{item.name}” już istnieje."
    else:
        db.session.add(SuggestedProduct(name=item.name))
        db.session.commit()
        message = f"Utworzono sugestię dla produktu: {item.name}"

    return jsonify({"success": True, "message": message})
@app.route("/admin/delete_suggestion/<int:suggestion_id>", methods=["POST"])
@login_required
def delete_suggestion_ajax(suggestion_id):
    """Delete a product suggestion (AJAX endpoint).

    Non-admins receive a JSON error with HTTP 403; an unknown suggestion
    yields 404.
    """
    if current_user.is_admin:
        target = SuggestedProduct.query.get_or_404(suggestion_id)
        db.session.delete(target)
        db.session.commit()
        return jsonify({"success": True, "message": "Sugestia została usunięta."})

    return jsonify({"success": False, "message": "Brak uprawnień"}), 403
def _admin_expenses_sum(year, first_month=1, last_month=12):
    """Return the summed expense amount for *year*, limited to the inclusive
    month range ``first_month``..``last_month`` (0 when there are no rows)."""
    query = db.session.query(func.sum(Expense.amount)).filter(
        extract("year", Expense.added_at) == year
    )
    if (first_month, last_month) != (1, 12):
        month = extract("month", Expense.added_at)
        query = query.filter(month >= first_month, month <= last_month)
    return query.scalar() or 0


def _admin_expenses_response(labels, expenses):
    """Build the uncached JSON response shared by all chart ranges."""
    response = make_response(jsonify({"labels": labels, "expenses": expenses}))
    response.headers["Cache-Control"] = (
        "no-store, no-cache, must-revalidate, max-age=0"
    )
    return response


@app.route("/admin/expenses_data")
@login_required
def admin_expenses_data():
    """Return expense totals for the admin chart as JSON.

    Query arguments:

    * ``start_date`` and ``end_date`` (``YYYY-MM-DD``) - when both are given,
      totals are grouped per month within that range, both days inclusive;
    * ``range`` - otherwise one of ``monthly`` (last 12 months), ``quarterly``
      (last 4 quarters), ``halfyearly`` (last 2 half-years) or ``yearly``
      (last 5 years); defaults to ``monthly``. Any other value yields empty
      lists.

    The reply has the shape ``{"labels": [...], "expenses": [...]}``, oldest
    period first. Non-admins get HTTP 403 and malformed dates HTTP 400.
    """
    if not current_user.is_admin:
        return jsonify({"error": "Brak uprawnień"}), 403

    range_type = request.args.get("range", "monthly")
    start_date_str = request.args.get("start_date")
    end_date_str = request.args.get("end_date")
    now = datetime.now(timezone.utc)
    labels = []
    expenses = []

    if start_date_str and end_date_str:
        try:
            start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
            end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
        except ValueError:
            return jsonify({"error": "Niepoprawny format daty"}), 400

        rows = (
            db.session.query(
                extract("year", Expense.added_at).label("year"),
                extract("month", Expense.added_at).label("month"),
                func.sum(Expense.amount).label("total"),
            )
            # end_date parses to midnight, so compare against the start of the
            # following day to include expenses added during the end day.
            .filter(
                Expense.added_at >= start_date,
                Expense.added_at < end_date + timedelta(days=1),
            )
            .group_by("year", "month")
            .order_by("year", "month")
            .all()
        )
        for row in rows:
            labels.append(f"{int(row.month):02d}/{int(row.year)}")
            expenses.append(round(row.total, 2))
        return _admin_expenses_response(labels, expenses)

    # Periods are stepped with calendar arithmetic on a running period index
    # (year * periods_per_year + period). Stepping back by a fixed number of
    # days can skip or repeat a month, quarter or half-year.
    if range_type == "monthly":
        current = now.year * 12 + (now.month - 1)
        for offset in range(11, -1, -1):
            year, month_index = divmod(current - offset, 12)
            month = month_index + 1
            labels.append(f"{month:02d}/{year}")
            expenses.append(round(_admin_expenses_sum(year, month, month), 2))

    elif range_type == "quarterly":
        current = now.year * 4 + (now.month - 1) // 3
        for offset in range(3, -1, -1):
            year, quarter_index = divmod(current - offset, 4)
            first_month = quarter_index * 3 + 1
            labels.append(f"Q{quarter_index + 1}/{year}")
            expenses.append(
                round(_admin_expenses_sum(year, first_month, first_month + 2), 2)
            )

    elif range_type == "halfyearly":
        current = now.year * 2 + (0 if now.month <= 6 else 1)
        for offset in range(1, -1, -1):
            year, half_index = divmod(current - offset, 2)
            first_month = half_index * 6 + 1
            labels.append(f"H{half_index + 1}/{year}")
            expenses.append(
                round(_admin_expenses_sum(year, first_month, first_month + 5), 2)
            )

    elif range_type == "yearly":
        for offset in range(4, -1, -1):
            year = now.year - offset
            labels.append(str(year))
            expenses.append(round(_admin_expenses_sum(year), 2))

    return _admin_expenses_response(labels, expenses)
@app.route("/admin/promote_user/<int:user_id>")
@login_required
@admin_required
def promote_user(user_id):
    """Grant administrator rights to a user (404 when the user is unknown)."""
    account = User.query.get_or_404(user_id)
    account.is_admin = True
    db.session.commit()

    confirmation = f"Użytkownik {account.username} został ustawiony jako admin."
    flash(confirmation, "success")
    return redirect(url_for("list_users"))
@app.route("/admin/demote_user/<int:user_id>")
@login_required
@admin_required
def demote_user(user_id):
    """Revoke administrator rights from a user.

    Refuses to demote the currently logged-in user and refuses to demote the
    last remaining administrator. Responds with 404 for an unknown user and
    always redirects to the user list.
    """
    users_page = url_for("list_users")
    account = User.query.get_or_404(user_id)

    if account.id == current_user.id:
        flash("Nie możesz zdegradować samego siebie!", "danger")
        return redirect(users_page)

    remaining_admins = User.query.filter_by(is_admin=True).count()
    if remaining_admins <= 1 and account.is_admin:
        flash(
            "Nie można zdegradować. Musi pozostać co najmniej jeden administrator.",
            "danger",
        )
        return redirect(users_page)

    account.is_admin = False
    db.session.commit()
    flash(f"Użytkownik {account.username} został zdegradowany.", "success")
    return redirect(users_page)
@app.route("/admin/crop_receipt", methods=["POST"])
@login_required
@admin_required
def crop_receipt():
    """Replace a receipt image with an uploaded, cropped version.

    Expects the form field ``receipt_id`` and the file ``cropped_image``. The
    cropped image is saved under a newly generated filename, the receipt row
    is pointed at it (including the new file size) and the previous file is
    removed. Returns JSON ``{"success": bool, "error": str}``; an unknown
    receipt yields 404.
    """
    receipt_id = request.form.get("receipt_id")
    file = request.files.get("cropped_image")
    if not receipt_id or not file:
        return jsonify(success=False, error="Brak danych")

    receipt = Receipt.query.get_or_404(receipt_id)
    old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)

    try:
        new_filename = generate_new_receipt_filename(receipt.list_id)
        new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename)
        save_resized_image(file, new_path)

        receipt.filename = os.path.basename(new_path)
        # Keep the stored size in sync with the file that is now referenced.
        # NOTE(review): file_hash is left untouched - confirm whether it
        # should be refreshed after a crop.
        receipt.filesize = os.path.getsize(new_path)
        db.session.commit()

        # Remove the previous file only after the database points at the new
        # one, so a failed commit never leaves the row without its image.
        if old_path != new_path and os.path.exists(old_path):
            os.remove(old_path)
        return jsonify(success=True)
    except Exception as e:
        db.session.rollback()
        return jsonify(success=False, error=str(e))
@app.route("/admin/recalculate_filesizes")
@login_required
@admin_required
def recalculate_filesizes():
    """Synchronise the stored size of every receipt with the file on disk.

    Flashes a summary of how many receipts were updated, were already correct
    or have no file, then redirects to the receipt overview.
    """
    upload_dir = app.config["UPLOAD_FOLDER"]
    updated = unchanged = not_found = 0

    for receipt in Receipt.query.all():
        path_on_disk = os.path.join(upload_dir, receipt.filename)
        if not os.path.exists(path_on_disk):
            not_found += 1
            continue

        size_on_disk = os.path.getsize(path_on_disk)
        if receipt.filesize == size_on_disk:
            unchanged += 1
        else:
            receipt.filesize = size_on_disk
            updated += 1

    db.session.commit()
    flash(
        f"Zaktualizowano: {updated}, bez zmian: {unchanged}, brak pliku: {not_found}",
        "success",
    )
    return redirect(url_for("admin_receipts", id="all"))
@app.route("/healthcheck")
def healthcheck():
    """Answer ``OK`` when the ``X-Internal-Check`` header equals the configured
    ``HEALTHCHECK_TOKEN``; any other request gets a 404."""
    expected_token = app.config.get("HEALTHCHECK_TOKEN")
    provided_token = request.headers.get("X-Internal-Check")
    if provided_token == expected_token:
        return "OK", 200
    abort(404)
# =========================================================================================
# SOCKET.IO
# =========================================================================================
@socketio.on("delete_item")
def handle_delete_item(data):
    """Delete an item and broadcast the removal plus the updated progress.

    ``data`` must contain ``item_id``. Unknown items are ignored.
    """
    item = db.session.get(Item, data["item_id"])
    if not item:
        return

    # Capture the identifiers before the row is deleted, so nothing is read
    # from the instance once it no longer exists in the database.
    item_id = item.id
    list_id = item.list_id
    room = str(list_id)

    db.session.delete(item)
    db.session.commit()

    emit("item_deleted", {"item_id": item_id}, to=room)
    purchased_count, total_count, percent = get_progress(list_id)
    emit(
        "progress_updated",
        {
            "purchased_count": purchased_count,
            "total_count": total_count,
            "percent": percent,
        },
        to=room,
    )
@socketio.on("edit_item")
def handle_edit_item(data):
    """Rename an item and/or change its quantity, then broadcast the change.

    ``data`` must contain ``item_id`` and ``new_name``; ``new_quantity`` is
    optional and defaults to the current quantity. Unknown items and blank
    names are ignored. A quantity that is not a number or is below 1 is
    stored as 1.
    """
    item = db.session.get(Item, data["item_id"])
    new_name = data["new_name"].strip()
    # Check the item before touching its attributes; the quantity default
    # below reads item.quantity.
    if not item or not new_name:
        return

    try:
        new_quantity = max(1, int(data.get("new_quantity", item.quantity)))
    except (TypeError, ValueError, OverflowError):
        new_quantity = 1

    item.name = new_name
    item.quantity = new_quantity
    db.session.commit()

    emit(
        "item_edited",
        {"item_id": item.id, "new_name": item.name, "new_quantity": item.quantity},
        to=str(item.list_id),
    )
@socketio.on("join_list")
def handle_join(data):
    """Add the client to a list's room and announce the user to that room.

    ``data`` must contain ``room`` (the list id); ``username`` defaults to
    "Gość". The username is recorded in ``active_users`` for the room, the
    room is told about the join and its current user list, and the joining
    client receives a confirmation with the list title.
    """
    room_name = str(data["room"])
    display_name = data.get("username", "Gość")

    join_room(room_name)
    members = active_users.setdefault(room_name, set())
    members.add(display_name)

    joined_list = db.session.get(ShoppingList, int(data["room"]))
    if joined_list:
        list_title = joined_list.title
    else:
        list_title = "Twoja lista"

    emit("user_joined", {"username": display_name}, to=room_name)
    emit("user_list", {"users": list(members)}, to=room_name)
    emit("joined_confirmation", {"room": room_name, "list_title": list_title})
@socketio.on("disconnect")
def handle_disconnect(sid=None):
    """Remove the disconnecting user from every room's active-user set.

    The user is identified by username ("Gość" for anonymous clients). Each
    room the name was listed in is notified with ``user_left`` and the
    refreshed ``user_list``.

    The single argument is unused. It has a default so the handler works
    whether or not the server passes a positional argument.
    NOTE(review): depending on the installed Flask-SocketIO version the
    handler is called with no argument or with the disconnect reason -
    confirm against the version in use.
    """
    username = current_user.username if current_user.is_authenticated else "Gość"
    for room, users in active_users.items():
        if username in users:
            users.remove(username)
            emit("user_left", {"username": username}, to=room)
            emit("user_list", {"users": list(users)}, to=room)
@socketio.on("add_item")
def handle_add_item(data):
    """Add an item to a list, or raise the quantity of an existing one.

    ``data`` must contain ``list_id`` and ``name``; ``quantity`` defaults to 1
    and is stored as 1 when it is not a number or is below 1. Unknown lists
    and blank names are ignored.

    When the list already holds an item with the same name (case-insensitive)
    that is not marked "not purchased", its quantity is increased and
    ``item_edited`` is broadcast. Otherwise a new item is appended at the
    end, a suggestion is created when none exists for that name, and
    ``item_added`` followed by ``progress_updated`` is broadcast.
    """
    list_id = data["list_id"]
    name = data["name"].strip()
    if not name:
        # Never create nameless items.
        return

    list_obj = db.session.get(ShoppingList, list_id)
    if not list_obj:
        return

    try:
        quantity = max(1, int(data.get("quantity", 1)))
    except (TypeError, ValueError, OverflowError):
        quantity = 1

    room = str(list_id)
    existing_item = Item.query.filter(
        Item.list_id == list_id,
        func.lower(Item.name) == name.lower(),
        Item.not_purchased.is_(False),
    ).first()

    if existing_item:
        existing_item.quantity += quantity
        db.session.commit()
        emit(
            "item_edited",
            {
                "item_id": existing_item.id,
                "new_name": existing_item.name,
                "new_quantity": existing_item.quantity,
            },
            to=room,
        )
        return

    max_position = (
        db.session.query(func.max(Item.position)).filter_by(list_id=list_id).scalar()
    )
    if max_position is None:
        max_position = 0

    if current_user.is_authenticated:
        user_id = current_user.id
        user_name = current_user.username
    else:
        user_id = None
        user_name = "Gość"

    new_item = Item(
        list_id=list_id,
        name=name,
        quantity=quantity,
        position=max_position + 1,
        added_by=user_id,
    )
    db.session.add(new_item)

    # Remember the name as a suggestion unless one already exists.
    suggestion_exists = SuggestedProduct.query.filter(
        func.lower(SuggestedProduct.name) == name.lower()
    ).first()
    if not suggestion_exists:
        db.session.add(SuggestedProduct(name=name))

    db.session.commit()

    emit(
        "item_added",
        {
            "id": new_item.id,
            "name": new_item.name,
            "quantity": new_item.quantity,
            "added_by": user_name,
            "added_by_id": user_id,
            "owner_id": list_obj.owner_id,
        },
        to=room,
        include_self=True,
    )

    purchased_count, total_count, percent = get_progress(list_id)
    emit(
        "progress_updated",
        {
            "purchased_count": purchased_count,
            "total_count": total_count,
            "percent": percent,
        },
        to=room,
    )
@socketio.on("check_item")
def handle_check_item(data):
    """Mark an item as purchased (timestamped in UTC) and broadcast the change
    together with the list's updated progress. Unknown items are ignored."""
    item = db.session.get(Item, data["item_id"])
    if not item:
        return

    item.purchased = True
    item.purchased_at = datetime.now(UTC)
    db.session.commit()

    purchased_count, total_count, percent = get_progress(item.list_id)
    progress_payload = {
        "purchased_count": purchased_count,
        "total_count": total_count,
        "percent": percent,
    }
    emit("item_checked", {"item_id": item.id}, to=str(item.list_id))
    emit("progress_updated", progress_payload, to=str(item.list_id))
@socketio.on("uncheck_item")
def handle_uncheck_item(data):
    """Clear an item's purchased state and broadcast the change together with
    the list's updated progress. Unknown items are ignored."""
    item = db.session.get(Item, data["item_id"])
    if not item:
        return

    item.purchased = False
    item.purchased_at = None
    db.session.commit()

    purchased_count, total_count, percent = get_progress(item.list_id)
    progress_payload = {
        "purchased_count": purchased_count,
        "total_count": total_count,
        "percent": percent,
    }
    emit("item_unchecked", {"item_id": item.id}, to=str(item.list_id))
    emit("progress_updated", progress_payload, to=str(item.list_id))
@socketio.on("request_full_list")
def handle_request_full_list(data):
    """Send the complete item list of a shopping list to the requesting client.

    ``data`` must contain ``list_id``. Items are ordered by position. An item
    marked "not purchased" is always reported with ``purchased`` set to
    False. Nothing is sent when the list does not exist.
    """
    list_id = data["list_id"]
    shopping_list = db.session.get(ShoppingList, list_id)
    if not shopping_list:
        return

    owner_id = shopping_list.owner_id
    ordered_items = (
        Item.query.options(joinedload(Item.added_by_user))
        .filter_by(list_id=list_id)
        .order_by(Item.position.asc())
        .all()
    )

    def serialize(entry):
        # added_by_user may be missing (items without a recorded author).
        author = entry.added_by_user
        return {
            "id": entry.id,
            "name": entry.name,
            "quantity": entry.quantity,
            "purchased": entry.purchased if not entry.not_purchased else False,
            "not_purchased": entry.not_purchased,
            "not_purchased_reason": entry.not_purchased_reason,
            "note": entry.note or "",
            "added_by": author.username if author else None,
            "added_by_id": author.id if author else None,
            "owner_id": owner_id,
        }

    payload = {"items": [serialize(entry) for entry in ordered_items]}
    emit("full_list", payload, to=request.sid)
@socketio.on("update_note")
def handle_update_note(data):
    """Store a note on an item and broadcast it to the item's list room.

    ``data`` must contain ``item_id`` and ``note``. Unknown items are ignored.
    """
    item_id = data["item_id"]
    note = data["note"]
    # Use Session.get like the other handlers; Query.get is the legacy API.
    item = db.session.get(Item, item_id)
    if item:
        item.note = note
        db.session.commit()
        emit("note_updated", {"item_id": item_id, "note": note}, to=str(item.list_id))
@socketio.on("add_expense")
def handle_add_expense(data):
    """Socket.IO handler: record an expense for a list and notify its room.

    Reads ``data["list_id"]``, ``data["amount"]`` and the optional
    ``data["receipt_filename"]``. If a receipt filename is given and an expense
    with that filename already exists for the list, the handler returns without
    doing anything. Otherwise the expense is stored, and ``expense_added`` is
    emitted to the room named by the list id with the submitted amount and the
    list's summed expenses (0 when the sum is empty).
    """
    list_id = data["list_id"]
    amount = data["amount"]
    receipt = data.get("receipt_filename")

    # The duplicate check only applies to expenses tied to a receipt file.
    if receipt and Expense.query.filter_by(
        list_id=list_id, receipt_filename=receipt
    ).first():
        return

    db.session.add(Expense(list_id=list_id, amount=amount, receipt_filename=receipt))
    db.session.commit()

    running_total = (
        db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar()
    )
    if not running_total:
        running_total = 0

    emit("expense_added", {"amount": amount, "total": running_total}, to=str(list_id))
@socketio.on("mark_not_purchased")
def handle_mark_not_purchased(data):
    """Socket.IO handler: flag an item as "not purchased" with an optional reason.

    Reads ``data["item_id"]`` and the optional ``data["reason"]`` (defaults to
    an empty string). When the item exists, ``not_purchased`` is set to True
    and the reason is stored, the change is committed, and
    ``item_marked_not_purchased`` is emitted to the room named by the item's
    list id. An unknown item id is ignored.
    """
    entry = db.session.get(Item, data["item_id"])
    why = data.get("reason", "")
    if not entry:
        return

    entry.not_purchased = True
    entry.not_purchased_reason = why
    db.session.commit()

    notification = {"item_id": entry.id, "reason": why}
    emit("item_marked_not_purchased", notification, to=str(entry.list_id))
@socketio.on("unmark_not_purchased")
def handle_unmark_not_purchased(data):
    """Socket.IO handler: return an item to its neutral, unpurchased state.

    Reads ``data["item_id"]``. When the item exists, the "not purchased" flag
    and its reason are cleared together with ``purchased`` and
    ``purchased_at``, the change is committed, and
    ``item_unmarked_not_purchased`` is emitted to the room named by the item's
    list id. An unknown item id is ignored.
    """
    entry = db.session.get(Item, data["item_id"])
    if not entry:
        return

    # Reset both the "not purchased" marker and any purchase information.
    entry.not_purchased = False
    entry.purchased = False
    entry.purchased_at = None
    entry.not_purchased_reason = None
    db.session.commit()

    emit(
        "item_unmarked_not_purchased",
        {"item_id": entry.id},
        to=str(entry.list_id),
    )
@app.cli.command("create_db")
def create_db():
    """CLI command: create the schema on an empty database, else verify it.

    On a database with no tables at all, every model table is created with
    ``db.create_all()``. Otherwise the existing tables and columns are compared
    with the model metadata:

    * missing tables or missing columns are reported and treated as fatal
      (the command prints an abort message and returns);
    * extra tables or extra columns are reported but tolerated.

    All messages are printed to stdout; nothing is returned.
    """
    inspector = inspect(db.engine)
    expected_tables = set(db.Model.metadata.tables.keys())
    actual_tables = set(inspector.get_table_names())

    # An empty database has to be handled before the comparison below:
    # otherwise every expected table is reported as missing and the command
    # aborts without ever reaching db.create_all().
    if not actual_tables:
        db.create_all()
        print("Utworzono strukturę bazy danych.")
        return

    missing_tables = expected_tables - actual_tables
    extra_tables = actual_tables - expected_tables

    if missing_tables:
        print(f"Brakuje tabel: {', '.join(sorted(missing_tables))}")

    if extra_tables:
        print(f"Dodatkowe tabele w bazie: {', '.join(sorted(extra_tables))}")

    critical_error = False

    # Column-level comparison is only meaningful for tables present on both sides.
    for table in expected_tables & actual_tables:
        expected_columns = {c.name for c in db.Model.metadata.tables[table].columns}
        actual_columns = {c["name"] for c in inspector.get_columns(table)}
        missing_cols = expected_columns - actual_columns
        extra_cols = actual_columns - expected_columns

        if missing_cols:
            print(f"Brakuje kolumn w tabeli '{table}': {', '.join(sorted(missing_cols))}")
            critical_error = True

        if extra_cols:
            print(f"Dodatkowe kolumny w tabeli '{table}': {', '.join(sorted(extra_cols))}")

    if missing_tables or critical_error:
        print("Struktura bazy jest niekompletna lub niezgodna. Przerwano.")
        return

    print("Struktura bazy danych jest poprawna.")
if __name__ == "__main__":
    # Start through Socket.IO's own runner; the startup check at the top of
    # this file rejects `flask run` because it does not handle WebSockets.
    # NOTE(review): debug=True together with a bind on all interfaces looks
    # like a development setup - confirm it is not used in production.
    socketio.run(
        app,
        host="0.0.0.0",
        port=8000,
        debug=True,
    )