1696 lines
52 KiB
Python
1696 lines
52 KiB
Python
import os
|
|
import secrets
|
|
import time
|
|
import mimetypes
|
|
|
|
import sys
|
|
import platform
|
|
import psutil
|
|
|
|
from datetime import datetime, timedelta
|
|
from flask import (
|
|
Flask,
|
|
render_template,
|
|
redirect,
|
|
url_for,
|
|
request,
|
|
flash,
|
|
Blueprint,
|
|
send_from_directory,
|
|
request,
|
|
abort,
|
|
session,
|
|
jsonify,
|
|
make_response,
|
|
)
|
|
from markupsafe import Markup
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_login import (
|
|
LoginManager,
|
|
UserMixin,
|
|
login_user,
|
|
login_required,
|
|
logout_user,
|
|
current_user,
|
|
)
|
|
from flask_compress import Compress
|
|
from flask_socketio import SocketIO, emit, join_room
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from config import Config
|
|
from PIL import Image
|
|
from werkzeug.utils import secure_filename
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from sqlalchemy import func, extract
|
|
from collections import defaultdict, deque
|
|
from functools import wraps
|
|
|
|
app = Flask(__name__)
|
|
app.config.from_object(Config)
|
|
app.config["COMPRESS_ALGORITHM"] = ["zstd", "br", "gzip", "deflate"]
|
|
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
|
|
|
|
SYSTEM_PASSWORD = app.config.get("SYSTEM_PASSWORD", "changeme")
|
|
DEFAULT_ADMIN_USERNAME = app.config.get("DEFAULT_ADMIN_USERNAME", "admin")
|
|
DEFAULT_ADMIN_PASSWORD = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123")
|
|
UPLOAD_FOLDER = app.config.get("UPLOAD_FOLDER", "uploads")
|
|
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif", "webp"}
|
|
AUTHORIZED_COOKIE_VALUE = app.config.get("AUTHORIZED_COOKIE_VALUE", "80d31cdfe63539c9")
|
|
AUTH_COOKIE_MAX_AGE = app.config.get("AUTH_COOKIE_MAX_AGE", 86400)
|
|
HEALTHCHECK_TOKEN = app.config.get("HEALTHCHECK_TOKEN", "alamapsaikota1234")
|
|
|
|
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
|
|
|
|
failed_login_attempts = defaultdict(deque)
|
|
MAX_ATTEMPTS = 10
|
|
TIME_WINDOW = 60 * 60
|
|
|
|
db = SQLAlchemy(app)
|
|
socketio = SocketIO(app, async_mode="eventlet")
|
|
login_manager = LoginManager(app)
|
|
login_manager.login_view = "login"
|
|
|
|
# flask-compress
|
|
compress = Compress()
|
|
compress.init_app(app)
|
|
|
|
static_bp = Blueprint("static_bp", __name__)
|
|
|
|
# dla live
|
|
active_users = {}
|
|
|
|
|
|
class User(UserMixin, db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(150), unique=True, nullable=False)
|
|
password_hash = db.Column(db.String(150), nullable=False)
|
|
is_admin = db.Column(db.Boolean, default=False)
|
|
|
|
|
|
class ShoppingList(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
title = db.Column(db.String(150), nullable=False)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
owner_id = db.Column(db.Integer, db.ForeignKey("user.id"))
|
|
is_temporary = db.Column(db.Boolean, default=False)
|
|
share_token = db.Column(db.String(64), unique=True, nullable=True)
|
|
expires_at = db.Column(db.DateTime, nullable=True)
|
|
owner = db.relationship("User", backref="lists", lazy=True)
|
|
is_archived = db.Column(db.Boolean, default=False)
|
|
is_public = db.Column(db.Boolean, default=True)
|
|
|
|
|
|
class Item(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"))
|
|
name = db.Column(db.String(150), nullable=False)
|
|
added_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
added_by = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=True)
|
|
purchased = db.Column(db.Boolean, default=False)
|
|
purchased_at = db.Column(db.DateTime, nullable=True)
|
|
quantity = db.Column(db.Integer, default=1)
|
|
note = db.Column(db.Text, nullable=True)
|
|
not_purchased = db.Column(db.Boolean, default=False)
|
|
not_purchased_reason = db.Column(db.Text, nullable=True)
|
|
|
|
|
|
class SuggestedProduct(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
name = db.Column(db.String(150), unique=True, nullable=False)
|
|
usage_count = db.Column(db.Integer, default=0)
|
|
|
|
|
|
class Expense(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
list_id = db.Column(db.Integer, db.ForeignKey("shopping_list.id"))
|
|
amount = db.Column(db.Float, nullable=False)
|
|
added_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
receipt_filename = db.Column(db.String(255), nullable=True)
|
|
|
|
|
|
with app.app_context():
|
|
db.create_all()
|
|
from werkzeug.security import generate_password_hash
|
|
|
|
admin = User.query.filter_by(is_admin=True).first()
|
|
username = app.config.get("DEFAULT_ADMIN_USERNAME", "admin")
|
|
password = app.config.get("DEFAULT_ADMIN_PASSWORD", "admin123")
|
|
password_hash = generate_password_hash(password)
|
|
if admin:
|
|
if admin.username != username or not check_password_hash(
|
|
admin.password_hash, password
|
|
):
|
|
admin.username = username
|
|
admin.password_hash = password_hash
|
|
db.session.commit()
|
|
else:
|
|
admin = User(username=username, password_hash=password_hash, is_admin=True)
|
|
db.session.add(admin)
|
|
db.session.commit()
|
|
|
|
|
|
@static_bp.route("/static/js/<path:filename>")
|
|
def serve_js(filename):
|
|
response = send_from_directory("static/js", filename)
|
|
response.cache_control.no_cache = True
|
|
response.cache_control.no_store = True
|
|
response.cache_control.must_revalidate = True
|
|
# response.expires = 0
|
|
response.pragma = "no-cache"
|
|
response.headers.pop("Content-Disposition", None)
|
|
response.headers.pop("Etag", None)
|
|
return response
|
|
|
|
|
|
@static_bp.route("/static/css/<path:filename>")
|
|
def serve_css(filename):
|
|
response = send_from_directory("static/css", filename)
|
|
response.headers["Cache-Control"] = "public, max-age=3600"
|
|
response.headers.pop("Content-Disposition", None)
|
|
response.headers.pop("Etag", None)
|
|
return response
|
|
|
|
|
|
@static_bp.route("/static/lib/js/<path:filename>")
|
|
def serve_js_lib(filename):
|
|
response = send_from_directory("static/lib/js", filename)
|
|
response.headers["Cache-Control"] = "public, max-age=604800"
|
|
response.headers.pop("Content-Disposition", None)
|
|
response.headers.pop("Etag", None)
|
|
return response
|
|
|
|
|
|
# CSS z cache na tydzień
|
|
@static_bp.route("/static/lib/css/<path:filename>")
|
|
def serve_css_lib(filename):
|
|
response = send_from_directory("static/lib/css", filename)
|
|
response.headers["Cache-Control"] = "public, max-age=604800"
|
|
response.headers.pop("Content-Disposition", None)
|
|
response.headers.pop("Etag", None)
|
|
return response
|
|
|
|
|
|
app.register_blueprint(static_bp)
|
|
|
|
|
|
def allowed_file(filename):
|
|
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
|
|
|
|
|
|
def get_list_details(list_id):
|
|
shopping_list = ShoppingList.query.get_or_404(list_id)
|
|
items = Item.query.filter_by(list_id=list_id).all()
|
|
receipt_pattern = f"list_{list_id}"
|
|
all_files = os.listdir(app.config["UPLOAD_FOLDER"])
|
|
receipt_files = [f for f in all_files if receipt_pattern in f]
|
|
expenses = Expense.query.filter_by(list_id=list_id).all()
|
|
total_expense = sum(e.amount for e in expenses)
|
|
return shopping_list, items, receipt_files, expenses, total_expense
|
|
|
|
|
|
def generate_share_token(length=8):
|
|
"""Generuje token do udostępniania. Parametr `length` to liczba znaków (domyślnie 4)."""
|
|
return secrets.token_hex(length // 2)
|
|
|
|
|
|
def check_list_public(shopping_list):
|
|
if not shopping_list.is_public:
|
|
flash("Ta lista nie jest publicznie dostępna", "danger")
|
|
return False
|
|
return True
|
|
|
|
|
|
def enrich_list_data(l):
|
|
items = Item.query.filter_by(list_id=l.id).all()
|
|
l.total_count = len(items)
|
|
l.purchased_count = len([i for i in items if i.purchased])
|
|
expenses = Expense.query.filter_by(list_id=l.id).all()
|
|
l.total_expense = sum(e.amount for e in expenses)
|
|
return l
|
|
|
|
|
|
def save_resized_image(file, path: str, max_size=(2000, 2000)):
|
|
img = Image.open(file)
|
|
img.thumbnail(max_size)
|
|
img.save(path)
|
|
|
|
|
|
def redirect_with_flash(
|
|
message: str, category: str = "info", endpoint: str = "main_page"
|
|
):
|
|
flash(message, category)
|
|
return redirect(url_for(endpoint))
|
|
|
|
|
|
def admin_required(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if not current_user.is_authenticated or not current_user.is_admin:
|
|
return redirect_with_flash("Brak uprawnień do tej sekcji.", "danger")
|
|
return f(*args, **kwargs)
|
|
|
|
return decorated_function
|
|
|
|
|
|
def get_progress(list_id):
|
|
items = Item.query.filter_by(list_id=list_id).all()
|
|
total_count = len(items)
|
|
purchased_count = len([i for i in items if i.purchased])
|
|
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
|
|
return purchased_count, total_count, percent
|
|
|
|
|
|
def delete_receipts_for_list(list_id):
|
|
receipt_pattern = f"list_{list_id}_"
|
|
upload_folder = app.config["UPLOAD_FOLDER"]
|
|
for filename in os.listdir(upload_folder):
|
|
if filename.startswith(receipt_pattern):
|
|
try:
|
|
os.remove(os.path.join(upload_folder, filename))
|
|
except Exception as e:
|
|
print(f"Nie udało się usunąć pliku {filename}: {e}")
|
|
|
|
|
|
# zabezpieczenie logowani do systemy - błędne hasła
|
|
def is_ip_blocked(ip):
|
|
now = time.time()
|
|
attempts = failed_login_attempts[ip]
|
|
while attempts and now - attempts[0] > TIME_WINDOW:
|
|
attempts.popleft()
|
|
return len(attempts) >= MAX_ATTEMPTS
|
|
|
|
|
|
def register_failed_attempt(ip):
|
|
now = time.time()
|
|
attempts = failed_login_attempts[ip]
|
|
while attempts and now - attempts[0] > TIME_WINDOW:
|
|
attempts.popleft()
|
|
attempts.append(now)
|
|
|
|
|
|
def reset_failed_attempts(ip):
|
|
failed_login_attempts[ip].clear()
|
|
|
|
|
|
def attempts_remaining(ip):
|
|
attempts = failed_login_attempts[ip]
|
|
return max(0, MAX_ATTEMPTS - len(attempts))
|
|
|
|
|
|
####################################################
|
|
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
return User.query.get(int(user_id))
|
|
|
|
|
|
@app.context_processor
|
|
def inject_time():
|
|
return dict(time=time)
|
|
|
|
|
|
@app.context_processor
|
|
def inject_has_authorized_cookie():
|
|
return {"has_authorized_cookie": "authorized" in request.cookies}
|
|
|
|
|
|
@app.context_processor
|
|
def inject_is_blocked():
|
|
ip = request.access_route[0]
|
|
return {"is_blocked": is_ip_blocked(ip)}
|
|
|
|
|
|
@app.before_request
|
|
def require_system_password():
|
|
endpoint = request.endpoint
|
|
|
|
# Wyjątki: lib js/css zawsze przepuszczamy
|
|
if endpoint in ("static_bp.serve_js_lib", "static_bp.serve_css_lib"):
|
|
return
|
|
|
|
ip = request.access_route[0]
|
|
if is_ip_blocked(ip):
|
|
abort(403)
|
|
|
|
if endpoint is None:
|
|
return
|
|
|
|
if endpoint in ("system_auth", "healthcheck"):
|
|
return
|
|
|
|
if (
|
|
"authorized" not in request.cookies
|
|
and not endpoint.startswith("login")
|
|
and endpoint != "favicon"
|
|
):
|
|
|
|
# Dla serve_js przepuszczamy tylko toasts.js
|
|
if endpoint == "static_bp.serve_js":
|
|
requested_file = request.view_args.get("filename", "")
|
|
if requested_file == "toasts.js":
|
|
return
|
|
if requested_file.endswith(".js"):
|
|
return redirect(url_for("system_auth", next=request.url))
|
|
return
|
|
|
|
# Blokujemy pozostałe static_bp
|
|
if endpoint.startswith("static_bp."):
|
|
return
|
|
|
|
if request.path == "/":
|
|
return redirect(url_for("system_auth"))
|
|
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
parsed = urlparse(request.url)
|
|
fixed_url = urlunparse(parsed._replace(netloc=request.host))
|
|
return redirect(url_for("system_auth", next=fixed_url))
|
|
|
|
|
|
@app.template_filter("filemtime")
|
|
def file_mtime_filter(path):
|
|
try:
|
|
t = os.path.getmtime(path)
|
|
return datetime.fromtimestamp(t)
|
|
except Exception:
|
|
return datetime.utcnow()
|
|
|
|
|
|
@app.template_filter("filesizeformat")
|
|
def filesizeformat_filter(path):
|
|
try:
|
|
size = os.path.getsize(path)
|
|
for unit in ["B", "KB", "MB", "GB"]:
|
|
if size < 1024.0:
|
|
return f"{size:.1f} {unit}"
|
|
size /= 1024.0
|
|
return f"{size:.1f} TB"
|
|
except Exception:
|
|
return "N/A"
|
|
|
|
|
|
@app.errorhandler(404)
|
|
def page_not_found(e):
|
|
return (
|
|
render_template(
|
|
"errors.html",
|
|
code=404,
|
|
title="Strona nie znaleziona",
|
|
message="Ups! Podana strona nie istnieje lub została przeniesiona.",
|
|
),
|
|
404,
|
|
)
|
|
|
|
|
|
@app.errorhandler(403)
|
|
def forbidden(e):
|
|
return (
|
|
render_template(
|
|
"errors.html",
|
|
code=403,
|
|
title="Brak dostępu",
|
|
message="Nie masz uprawnień do wyświetlenia tej strony.",
|
|
),
|
|
403,
|
|
)
|
|
|
|
|
|
@app.route("/favicon.ico")
|
|
def favicon_ico():
|
|
return redirect(url_for("static", filename="favicon.svg"))
|
|
|
|
|
|
@app.route("/favicon.svg")
|
|
def favicon():
|
|
svg = """
|
|
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16">
|
|
<text y="14" font-size="16">🛒</text>
|
|
</svg>
|
|
"""
|
|
return svg, 200, {"Content-Type": "image/svg+xml"}
|
|
|
|
|
|
@app.route("/")
|
|
def main_page():
|
|
now = datetime.utcnow()
|
|
|
|
if current_user.is_authenticated:
|
|
user_lists = (
|
|
ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=False)
|
|
.filter((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now))
|
|
.order_by(ShoppingList.created_at.desc())
|
|
.all()
|
|
)
|
|
|
|
archived_lists = (
|
|
ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True)
|
|
.order_by(ShoppingList.created_at.desc())
|
|
.all()
|
|
)
|
|
|
|
public_lists = (
|
|
ShoppingList.query.filter(
|
|
ShoppingList.is_public == True,
|
|
ShoppingList.owner_id != current_user.id,
|
|
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
|
|
ShoppingList.is_archived == False,
|
|
)
|
|
.order_by(ShoppingList.created_at.desc())
|
|
.all()
|
|
)
|
|
else:
|
|
user_lists = []
|
|
archived_lists = []
|
|
public_lists = (
|
|
ShoppingList.query.filter(
|
|
ShoppingList.is_public == True,
|
|
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
|
|
ShoppingList.is_archived == False,
|
|
)
|
|
.order_by(ShoppingList.created_at.desc())
|
|
.all()
|
|
)
|
|
|
|
for l in user_lists + public_lists + archived_lists:
|
|
enrich_list_data(l)
|
|
|
|
return render_template(
|
|
"main.html",
|
|
user_lists=user_lists,
|
|
public_lists=public_lists,
|
|
archived_lists=archived_lists,
|
|
)
|
|
|
|
|
|
@app.route("/system-auth", methods=["GET", "POST"])
|
|
def system_auth():
|
|
if (
|
|
current_user.is_authenticated
|
|
or request.cookies.get("authorized") == AUTHORIZED_COOKIE_VALUE
|
|
):
|
|
flash("Jesteś już zalogowany lub autoryzowany.", "info")
|
|
return redirect(url_for("main_page"))
|
|
|
|
ip = request.access_route[0]
|
|
next_page = request.args.get("next") or url_for("main_page")
|
|
|
|
if is_ip_blocked(ip):
|
|
flash(
|
|
"Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.",
|
|
"danger",
|
|
)
|
|
return render_template("system_auth.html"), 403
|
|
|
|
if request.method == "POST":
|
|
if request.form["password"] == SYSTEM_PASSWORD:
|
|
reset_failed_attempts(ip)
|
|
resp = redirect(next_page)
|
|
max_age = app.config.get("AUTH_COOKIE_MAX_AGE", 86400)
|
|
resp.set_cookie("authorized", AUTHORIZED_COOKIE_VALUE, max_age=max_age)
|
|
return resp
|
|
else:
|
|
register_failed_attempt(ip)
|
|
if is_ip_blocked(ip):
|
|
flash(
|
|
"Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.",
|
|
"danger",
|
|
)
|
|
return render_template("system_auth.html"), 403
|
|
remaining = attempts_remaining(ip)
|
|
flash(f"Nieprawidłowe hasło. Pozostało {remaining} prób.", "warning")
|
|
return render_template("system_auth.html")
|
|
|
|
|
|
@app.route("/toggle_archive_list/<int:list_id>")
|
|
@login_required
|
|
def toggle_archive_list(list_id):
|
|
l = ShoppingList.query.get_or_404(list_id)
|
|
if l.owner_id != current_user.id:
|
|
return redirect_with_flash("Nie masz uprawnień do tej listy", "danger")
|
|
|
|
archive = request.args.get("archive", "true").lower() == "true"
|
|
|
|
if archive:
|
|
l.is_archived = True
|
|
flash(f"Lista „{l.title}” została zarchiwizowana.", "success")
|
|
else:
|
|
l.is_archived = False
|
|
flash(f"Lista „{l.title}” została przywrócona.", "success")
|
|
|
|
db.session.commit()
|
|
return redirect(url_for("main_page"))
|
|
|
|
|
|
@app.route("/edit_my_list/<int:list_id>", methods=["GET", "POST"])
|
|
@login_required
|
|
def edit_my_list(list_id):
|
|
l = ShoppingList.query.get_or_404(list_id)
|
|
if l.owner_id != current_user.id:
|
|
return redirect_with_flash("Nie masz uprawnień do tej listy", "danger")
|
|
|
|
if request.method == "POST":
|
|
new_title = request.form.get("title")
|
|
if new_title and new_title.strip():
|
|
l.title = new_title.strip()
|
|
db.session.commit()
|
|
flash("Zaktualizowano tytuł listy", "success")
|
|
return redirect(url_for("main_page"))
|
|
else:
|
|
flash("Podaj poprawny tytuł", "danger")
|
|
return render_template("edit_my_list.html", list=l)
|
|
|
|
|
|
@app.route("/toggle_visibility/<int:list_id>", methods=["GET", "POST"])
|
|
@login_required
|
|
def toggle_visibility(list_id):
|
|
l = ShoppingList.query.get_or_404(list_id)
|
|
if l.owner_id != current_user.id:
|
|
if request.is_json or request.method == "POST":
|
|
return {"error": "Unauthorized"}, 403
|
|
flash("Nie masz uprawnień do tej listy", "danger")
|
|
return redirect(url_for("main_page"))
|
|
|
|
l.is_public = not l.is_public
|
|
db.session.commit()
|
|
|
|
share_url = f"{request.url_root}share/{l.share_token}"
|
|
|
|
if request.is_json or request.method == "POST":
|
|
return {"is_public": l.is_public, "share_url": share_url}
|
|
|
|
if l.is_public:
|
|
flash("Lista została udostępniona publicznie", "success")
|
|
else:
|
|
flash("Lista została ukryta przed gośćmi", "info")
|
|
|
|
return redirect(url_for("main_page"))
|
|
|
|
|
|
from sqlalchemy import func
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
if request.method == "POST":
|
|
username_input = request.form["username"].lower()
|
|
user = User.query.filter(func.lower(User.username) == username_input).first()
|
|
if user and check_password_hash(user.password_hash, request.form["password"]):
|
|
login_user(user)
|
|
flash("Zalogowano pomyślnie", "success")
|
|
return redirect(url_for("main_page"))
|
|
flash("Nieprawidłowy login lub hasło", "danger")
|
|
return render_template("login.html")
|
|
|
|
|
|
@app.route("/logout")
|
|
@login_required
|
|
def logout():
|
|
logout_user()
|
|
flash("Wylogowano pomyślnie", "success")
|
|
return redirect(url_for("main_page"))
|
|
|
|
|
|
@app.route("/create", methods=["POST"])
|
|
@login_required
|
|
def create_list():
|
|
title = request.form.get("title")
|
|
is_temporary = "temporary" in request.form
|
|
token = generate_share_token(8)
|
|
expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None
|
|
new_list = ShoppingList(
|
|
title=title,
|
|
owner_id=current_user.id,
|
|
is_temporary=is_temporary,
|
|
share_token=token,
|
|
expires_at=expires_at,
|
|
)
|
|
db.session.add(new_list)
|
|
db.session.commit()
|
|
flash("Utworzono nową listę", "success")
|
|
return redirect(url_for("view_list", list_id=new_list.id))
|
|
|
|
|
|
@app.route("/list/<int:list_id>")
|
|
@login_required
|
|
def view_list(list_id):
|
|
shopping_list, items, receipt_files, expenses, total_expense = get_list_details(
|
|
list_id
|
|
)
|
|
total_count = len(items)
|
|
purchased_count = len([i for i in items if i.purchased])
|
|
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
|
|
|
|
return render_template(
|
|
"list.html",
|
|
list=shopping_list,
|
|
items=items,
|
|
receipt_files=receipt_files,
|
|
total_count=total_count,
|
|
purchased_count=purchased_count,
|
|
percent=percent,
|
|
expenses=expenses,
|
|
total_expense=total_expense,
|
|
)
|
|
|
|
|
|
@app.route("/share/<token>")
|
|
@app.route("/guest-list/<int:list_id>")
|
|
def shared_list(token=None, list_id=None):
|
|
if token:
|
|
shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
|
|
|
|
if not check_list_public(shopping_list):
|
|
return redirect(url_for("main_page"))
|
|
|
|
list_id = shopping_list.id
|
|
|
|
shopping_list, items, receipt_files, expenses, total_expense = get_list_details(
|
|
list_id
|
|
)
|
|
|
|
return render_template(
|
|
"list_share.html",
|
|
list=shopping_list,
|
|
items=items,
|
|
receipt_files=receipt_files,
|
|
expenses=expenses,
|
|
total_expense=total_expense,
|
|
)
|
|
|
|
|
|
@app.route("/copy/<int:list_id>")
|
|
@login_required
|
|
def copy_list(list_id):
|
|
original = ShoppingList.query.get_or_404(list_id)
|
|
token = generate_share_token(8)
|
|
new_list = ShoppingList(
|
|
title=original.title + " (Kopia)", owner_id=current_user.id, share_token=token
|
|
)
|
|
db.session.add(new_list)
|
|
db.session.commit()
|
|
original_items = Item.query.filter_by(list_id=original.id).all()
|
|
for item in original_items:
|
|
copy_item = Item(list_id=new_list.id, name=item.name)
|
|
db.session.add(copy_item)
|
|
db.session.commit()
|
|
flash("Skopiowano listę", "success")
|
|
return redirect(url_for("view_list", list_id=new_list.id))
|
|
|
|
|
|
@app.route("/suggest_products")
|
|
def suggest_products():
|
|
query = request.args.get("q", "")
|
|
suggestions = []
|
|
if query:
|
|
suggestions = (
|
|
SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f"%{query}%"))
|
|
.limit(5)
|
|
.all()
|
|
)
|
|
return {"suggestions": [s.name for s in suggestions]}
|
|
|
|
|
|
@app.route("/all_products")
|
|
def all_products():
|
|
query = request.args.get("q", "")
|
|
|
|
top_products_query = SuggestedProduct.query
|
|
if query:
|
|
top_products_query = top_products_query.filter(
|
|
SuggestedProduct.name.ilike(f"%{query}%")
|
|
)
|
|
top_products = (
|
|
top_products_query.order_by(
|
|
SuggestedProduct.usage_count.desc(), SuggestedProduct.name.asc()
|
|
)
|
|
.limit(20)
|
|
.all()
|
|
)
|
|
|
|
top_names = [s.name for s in top_products]
|
|
rest_query = SuggestedProduct.query
|
|
if query:
|
|
rest_query = rest_query.filter(SuggestedProduct.name.ilike(f"%{query}%"))
|
|
|
|
if top_names:
|
|
rest_query = rest_query.filter(~SuggestedProduct.name.in_(top_names))
|
|
|
|
rest_products = rest_query.order_by(SuggestedProduct.name.asc()).limit(200).all()
|
|
|
|
all_names = top_names + [s.name for s in rest_products]
|
|
|
|
return {"allproducts": all_names}
|
|
|
|
|
|
""" @app.route('/upload_receipt/<int:list_id>', methods=['POST'])
|
|
def upload_receipt(list_id):
|
|
if 'receipt' not in request.files:
|
|
flash('Brak pliku', 'danger')
|
|
return redirect(request.referrer)
|
|
|
|
file = request.files['receipt']
|
|
|
|
if file.filename == '':
|
|
flash('Nie wybrano pliku', 'danger')
|
|
return redirect(request.referrer)
|
|
|
|
if file and allowed_file(file.filename):
|
|
filename = secure_filename(file.filename)
|
|
file_path = os.path.join(app.config['UPLOAD_FOLDER'], f"list_{list_id}_{filename}")
|
|
|
|
save_resized_image(file, file_path)
|
|
|
|
flash('Wgrano paragon', 'success')
|
|
return redirect(request.referrer)
|
|
|
|
flash('Niedozwolony format pliku', 'danger')
|
|
return redirect(request.referrer) """
|
|
|
|
|
|
@app.route("/upload_receipt/<int:list_id>", methods=["POST"])
|
|
def upload_receipt(list_id):
|
|
if "receipt" not in request.files:
|
|
if (
|
|
request.is_json
|
|
or request.headers.get("X-Requested-With") == "XMLHttpRequest"
|
|
):
|
|
return jsonify({"success": False, "message": "Brak pliku"}), 400
|
|
flash("Brak pliku", "danger")
|
|
return redirect(request.referrer)
|
|
|
|
file = request.files["receipt"]
|
|
|
|
if file.filename == "":
|
|
if (
|
|
request.is_json
|
|
or request.headers.get("X-Requested-With") == "XMLHttpRequest"
|
|
):
|
|
return jsonify({"success": False, "message": "Nie wybrano pliku"}), 400
|
|
flash("Nie wybrano pliku", "danger")
|
|
return redirect(request.referrer)
|
|
|
|
if file and allowed_file(file.filename):
|
|
filename = secure_filename(file.filename)
|
|
full_filename = f"list_{list_id}_{filename}"
|
|
file_path = os.path.join(app.config["UPLOAD_FOLDER"], full_filename)
|
|
|
|
save_resized_image(file, file_path)
|
|
|
|
if (
|
|
request.is_json
|
|
or request.headers.get("X-Requested-With") == "XMLHttpRequest"
|
|
):
|
|
url = url_for("uploaded_file", filename=full_filename)
|
|
|
|
socketio.emit("receipt_added", {"url": url}, to=str(list_id))
|
|
|
|
return jsonify({"success": True, "url": url})
|
|
|
|
flash("Wgrano paragon", "success")
|
|
return redirect(request.referrer)
|
|
|
|
if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest":
|
|
return jsonify({"success": False, "message": "Niedozwolony format pliku"}), 400
|
|
flash("Niedozwolony format pliku", "danger")
|
|
return redirect(request.referrer)
|
|
|
|
|
|
@app.route("/uploads/<filename>")
|
|
def uploaded_file(filename):
|
|
response = send_from_directory(app.config["UPLOAD_FOLDER"], filename)
|
|
response.headers["Cache-Control"] = "public, max-age=2592000, immutable"
|
|
response.headers.pop("Pragma", None)
|
|
response.headers.pop("Content-Disposition", None)
|
|
mime, _ = mimetypes.guess_type(filename)
|
|
if mime:
|
|
response.headers["Content-Type"] = mime
|
|
return response
|
|
|
|
|
|
@app.route("/admin")
|
|
@login_required
|
|
@admin_required
|
|
def admin_panel():
|
|
|
|
now = datetime.utcnow()
|
|
user_count = User.query.count()
|
|
list_count = ShoppingList.query.count()
|
|
item_count = Item.query.count()
|
|
all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all()
|
|
all_files = os.listdir(app.config["UPLOAD_FOLDER"])
|
|
|
|
enriched_lists = []
|
|
for l in all_lists:
|
|
enrich_list_data(l)
|
|
items = Item.query.filter_by(list_id=l.id).all()
|
|
total_count = l.total_count
|
|
purchased_count = l.purchased_count
|
|
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
|
|
comments_count = len([i for i in items if i.note and i.note.strip() != ""])
|
|
receipt_pattern = f"list_{l.id}"
|
|
receipt_files = [f for f in all_files if receipt_pattern in f]
|
|
|
|
enriched_lists.append(
|
|
{
|
|
"list": l,
|
|
"total_count": total_count,
|
|
"purchased_count": purchased_count,
|
|
"percent": round(percent),
|
|
"comments_count": comments_count,
|
|
"receipts_count": len(receipt_files),
|
|
"total_expense": l.total_expense,
|
|
}
|
|
)
|
|
|
|
top_products = (
|
|
db.session.query(Item.name, func.count(Item.id).label("count"))
|
|
.filter(Item.purchased == True)
|
|
.group_by(Item.name)
|
|
.order_by(func.count(Item.id).desc())
|
|
.limit(5)
|
|
.all()
|
|
)
|
|
|
|
purchased_items_count = Item.query.filter_by(purchased=True).count()
|
|
total_expense_sum = db.session.query(func.sum(Expense.amount)).scalar() or 0
|
|
|
|
current_year = datetime.utcnow().year
|
|
year_expense_sum = (
|
|
db.session.query(func.sum(Expense.amount))
|
|
.filter(extract("year", Expense.added_at) == current_year)
|
|
.scalar()
|
|
or 0
|
|
)
|
|
|
|
current_month = datetime.utcnow().month
|
|
month_expense_sum = (
|
|
db.session.query(func.sum(Expense.amount))
|
|
.filter(extract("year", Expense.added_at) == current_year)
|
|
.filter(extract("month", Expense.added_at) == current_month)
|
|
.scalar()
|
|
or 0
|
|
)
|
|
|
|
process = psutil.Process(os.getpid())
|
|
app_mem = process.memory_info().rss // (1024 * 1024) # MB
|
|
|
|
return render_template(
|
|
"admin/admin_panel.html",
|
|
user_count=user_count,
|
|
list_count=list_count,
|
|
item_count=item_count,
|
|
purchased_items_count=purchased_items_count,
|
|
enriched_lists=enriched_lists,
|
|
top_products=top_products,
|
|
total_expense_sum=total_expense_sum,
|
|
year_expense_sum=year_expense_sum,
|
|
month_expense_sum=month_expense_sum,
|
|
now=now,
|
|
python_version=sys.version,
|
|
system_info=platform.platform(),
|
|
app_memory=f"{app_mem} MB",
|
|
)
|
|
|
|
|
|
@app.route("/admin/delete_list/<int:list_id>")
|
|
@login_required
|
|
@admin_required
|
|
def delete_list(list_id):
|
|
|
|
delete_receipts_for_list(list_id)
|
|
list_to_delete = ShoppingList.query.get_or_404(list_id)
|
|
Item.query.filter_by(list_id=list_to_delete.id).delete()
|
|
Expense.query.filter_by(list_id=list_to_delete.id).delete()
|
|
db.session.delete(list_to_delete)
|
|
db.session.commit()
|
|
flash(f"Usunięto listę: {list_to_delete.title}", "success")
|
|
return redirect(url_for("admin_panel"))
|
|
|
|
|
|
@app.route("/admin/add_user", methods=["POST"])
|
|
@login_required
|
|
@admin_required
|
|
def add_user():
|
|
username = request.form["username"].lower()
|
|
password = request.form["password"]
|
|
|
|
if not username or not password:
|
|
flash("Wypełnij wszystkie pola", "danger")
|
|
return redirect(url_for("list_users"))
|
|
|
|
if User.query.filter(func.lower(User.username) == username).first():
|
|
flash("Użytkownik o takiej nazwie już istnieje", "warning")
|
|
return redirect(url_for("list_users"))
|
|
|
|
hashed_password = generate_password_hash(password)
|
|
new_user = User(username=username, password_hash=hashed_password)
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
flash("Dodano nowego użytkownika", "success")
|
|
return redirect(url_for("list_users"))
|
|
|
|
|
|
@app.route("/admin/users")
|
|
@login_required
|
|
@admin_required
|
|
def list_users():
|
|
users = User.query.all()
|
|
user_count = User.query.count()
|
|
list_count = ShoppingList.query.count()
|
|
item_count = Item.query.count()
|
|
activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"]
|
|
return render_template(
|
|
"admin/user_management.html",
|
|
users=users,
|
|
user_count=user_count,
|
|
list_count=list_count,
|
|
item_count=item_count,
|
|
activity_log=activity_log,
|
|
)
|
|
|
|
|
|
@app.route("/admin/change_password/<int:user_id>", methods=["POST"])
|
|
@login_required
|
|
@admin_required
|
|
def reset_password(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
new_password = request.form["password"]
|
|
|
|
if not new_password:
|
|
flash("Podaj nowe hasło", "danger")
|
|
return redirect(url_for("list_users"))
|
|
|
|
user.password_hash = generate_password_hash(new_password)
|
|
db.session.commit()
|
|
flash(f"Hasło dla użytkownika {user.username} zostało zaktualizowane", "success")
|
|
return redirect(url_for("list_users"))
|
|
|
|
|
|
@app.route("/admin/delete_user/<int:user_id>")
|
|
@login_required
|
|
@admin_required
|
|
def delete_user(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
if user.is_admin:
|
|
admin_count = User.query.filter_by(is_admin=True).count()
|
|
if admin_count <= 1:
|
|
flash("Nie można usunąć ostatniego administratora.", "danger")
|
|
return redirect(url_for("list_users"))
|
|
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
flash("Użytkownik usunięty", "success")
|
|
return redirect(url_for("list_users"))
|
|
|
|
|
|
@app.route("/admin/receipts/<id>")
|
|
@login_required
|
|
@admin_required
|
|
def admin_receipts(id):
|
|
all_files = os.listdir(app.config["UPLOAD_FOLDER"])
|
|
image_files = [f for f in all_files if allowed_file(f)]
|
|
|
|
if id == "all":
|
|
filtered_files = image_files
|
|
else:
|
|
try:
|
|
list_id = int(id)
|
|
receipt_prefix = f"list_{list_id}_"
|
|
filtered_files = [f for f in image_files if f.startswith(receipt_prefix)]
|
|
except ValueError:
|
|
flash("Nieprawidłowe ID listy.", "danger")
|
|
return redirect(url_for("admin_panel"))
|
|
|
|
return render_template(
|
|
"admin/receipts.html",
|
|
image_files=filtered_files,
|
|
upload_folder=app.config["UPLOAD_FOLDER"],
|
|
)
|
|
|
|
|
|
@app.route("/admin/delete_receipt/<filename>")
|
|
@login_required
|
|
@admin_required
|
|
def delete_receipt(filename):
|
|
file_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
|
|
if os.path.exists(file_path):
|
|
os.remove(file_path)
|
|
flash("Plik usunięty", "success")
|
|
else:
|
|
flash("Plik nie istnieje", "danger")
|
|
|
|
next_url = request.args.get("next")
|
|
if next_url:
|
|
return redirect(next_url)
|
|
return redirect(url_for("admin_receipts"))
|
|
|
|
|
|
@app.route("/admin/delete_selected_lists", methods=["POST"])
|
|
@login_required
|
|
@admin_required
|
|
def delete_selected_lists():
|
|
ids = request.form.getlist("list_ids")
|
|
for list_id in ids:
|
|
lst = ShoppingList.query.get(int(list_id))
|
|
if lst:
|
|
delete_receipts_for_list(lst.id)
|
|
Item.query.filter_by(list_id=lst.id).delete()
|
|
Expense.query.filter_by(list_id=lst.id).delete()
|
|
db.session.delete(lst)
|
|
db.session.commit()
|
|
flash("Usunięto wybrane listy", "success")
|
|
return redirect(url_for("admin_panel"))
|
|
|
|
|
|
@app.route("/admin/delete_all_items")
|
|
@login_required
|
|
@admin_required
|
|
def delete_all_items():
|
|
Item.query.delete()
|
|
db.session.commit()
|
|
flash("Usunięto wszystkie produkty", "success")
|
|
return redirect(url_for("admin_panel"))
|
|
|
|
|
|
@app.route("/admin/edit_list/<int:list_id>", methods=["GET", "POST"])
|
|
@login_required
|
|
@admin_required
|
|
def edit_list(list_id):
|
|
l = ShoppingList.query.get_or_404(list_id)
|
|
expenses = Expense.query.filter_by(list_id=list_id).all()
|
|
total_expense = sum(e.amount for e in expenses)
|
|
users = User.query.all()
|
|
items = Item.query.filter_by(list_id=list_id).order_by(Item.id.desc()).all()
|
|
|
|
# Pobranie listy plików paragonów
|
|
receipt_pattern = f"list_{list_id}_"
|
|
all_files = os.listdir(app.config["UPLOAD_FOLDER"])
|
|
receipts = [f for f in all_files if f.startswith(receipt_pattern)]
|
|
|
|
if request.method == "POST":
|
|
action = request.form.get("action")
|
|
|
|
if action == "save":
|
|
new_title = request.form.get("title", "").strip()
|
|
new_amount_str = request.form.get("amount")
|
|
is_archived = "archived" in request.form
|
|
is_public = "public" in request.form
|
|
new_owner_id = request.form.get("owner_id")
|
|
|
|
if new_title:
|
|
l.title = new_title
|
|
|
|
l.is_archived = is_archived
|
|
l.is_public = is_public
|
|
|
|
if new_owner_id:
|
|
try:
|
|
new_owner_id_int = int(new_owner_id)
|
|
if User.query.get(new_owner_id_int):
|
|
l.owner_id = new_owner_id_int
|
|
else:
|
|
flash("Wybrany użytkownik nie istnieje", "danger")
|
|
return redirect(url_for("edit_list", list_id=list_id))
|
|
except ValueError:
|
|
flash("Niepoprawny ID użytkownika", "danger")
|
|
return redirect(url_for("edit_list", list_id=list_id))
|
|
|
|
if new_amount_str:
|
|
try:
|
|
new_amount = float(new_amount_str)
|
|
for expense in expenses:
|
|
db.session.delete(expense)
|
|
db.session.commit()
|
|
new_expense = Expense(list_id=list_id, amount=new_amount)
|
|
db.session.add(new_expense)
|
|
db.session.commit()
|
|
except ValueError:
|
|
flash("Niepoprawna kwota", "danger")
|
|
return redirect(url_for("edit_list", list_id=list_id))
|
|
|
|
db.session.commit()
|
|
flash("Zapisano zmiany listy", "success")
|
|
return redirect(url_for("edit_list", list_id=list_id))
|
|
|
|
elif action == "add_item":
|
|
item_name = request.form.get("item_name", "").strip()
|
|
quantity_str = request.form.get("quantity", "1")
|
|
if not item_name:
|
|
flash("Podaj nazwę produktu", "danger")
|
|
return redirect(url_for("edit_list", list_id=list_id))
|
|
|
|
try:
|
|
quantity = int(quantity_str)
|
|
if quantity < 1:
|
|
quantity = 1
|
|
except ValueError:
|
|
quantity = 1
|
|
|
|
new_item = Item(
|
|
list_id=list_id,
|
|
name=item_name,
|
|
quantity=quantity,
|
|
added_by=current_user.id,
|
|
)
|
|
db.session.add(new_item)
|
|
|
|
if not SuggestedProduct.query.filter(
|
|
func.lower(SuggestedProduct.name) == item_name.lower()
|
|
).first():
|
|
db.session.add(SuggestedProduct(name=item_name))
|
|
|
|
db.session.commit()
|
|
flash("Dodano produkt", "success")
|
|
return redirect(url_for("edit_list", list_id=list_id))
|
|
|
|
elif action == "delete_item":
|
|
item_id = request.form.get("item_id")
|
|
item = Item.query.get(item_id)
|
|
if item and item.list_id == list_id:
|
|
db.session.delete(item)
|
|
db.session.commit()
|
|
flash("Usunięto produkt", "success")
|
|
else:
|
|
flash("Nie znaleziono produktu", "danger")
|
|
return redirect(url_for("edit_list", list_id=list_id))
|
|
|
|
elif action == "toggle_purchased":
|
|
item_id = request.form.get("item_id")
|
|
item = Item.query.get(item_id)
|
|
if item and item.list_id == list_id:
|
|
item.purchased = not item.purchased
|
|
db.session.commit()
|
|
flash("Zmieniono status oznaczenia produktu", "success")
|
|
else:
|
|
flash("Nie znaleziono produktu", "danger")
|
|
return redirect(url_for("edit_list", list_id=list_id))
|
|
|
|
# Przekazanie receipts do szablonu
|
|
return render_template(
|
|
"admin/edit_list.html",
|
|
list=l,
|
|
total_expense=total_expense,
|
|
users=users,
|
|
items=items,
|
|
receipts=receipts,
|
|
upload_folder=app.config["UPLOAD_FOLDER"],
|
|
)
|
|
|
|
|
|
@app.route("/admin/products")
|
|
@login_required
|
|
@admin_required
|
|
def list_products():
|
|
items = Item.query.order_by(Item.id.desc()).all()
|
|
users = User.query.all()
|
|
users_dict = {user.id: user.username for user in users}
|
|
|
|
# Stabilne sortowanie sugestii
|
|
suggestions = SuggestedProduct.query.order_by(SuggestedProduct.name.asc()).all()
|
|
suggestions_dict = {s.name.lower(): s for s in suggestions}
|
|
|
|
return render_template(
|
|
"admin/list_products.html",
|
|
items=items,
|
|
users_dict=users_dict,
|
|
suggestions_dict=suggestions_dict,
|
|
)
|
|
|
|
|
|
@app.route("/admin/sync_suggestion/<int:item_id>", methods=["POST"])
|
|
@login_required
|
|
def sync_suggestion_ajax(item_id):
|
|
if not current_user.is_admin:
|
|
return jsonify({"success": False, "message": "Brak uprawnień"}), 403
|
|
|
|
item = Item.query.get_or_404(item_id)
|
|
|
|
existing = SuggestedProduct.query.filter(
|
|
func.lower(SuggestedProduct.name) == item.name.lower()
|
|
).first()
|
|
if not existing:
|
|
new_suggestion = SuggestedProduct(name=item.name)
|
|
db.session.add(new_suggestion)
|
|
db.session.commit()
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"message": f"Utworzono sugestię dla produktu: {item.name}",
|
|
}
|
|
)
|
|
else:
|
|
return jsonify(
|
|
{
|
|
"success": True,
|
|
"message": f"Sugestia dla produktu „{item.name}” już istnieje.",
|
|
}
|
|
)
|
|
|
|
|
|
@app.route("/admin/delete_suggestion/<int:suggestion_id>", methods=["POST"])
|
|
@login_required
|
|
def delete_suggestion_ajax(suggestion_id):
|
|
if not current_user.is_admin:
|
|
return jsonify({"success": False, "message": "Brak uprawnień"}), 403
|
|
|
|
suggestion = SuggestedProduct.query.get_or_404(suggestion_id)
|
|
db.session.delete(suggestion)
|
|
db.session.commit()
|
|
|
|
return jsonify({"success": True, "message": "Sugestia została usunięta."})
|
|
|
|
|
|
@app.route("/admin/expenses_data")
|
|
@login_required
|
|
def admin_expenses_data():
|
|
if not current_user.is_admin:
|
|
return jsonify({"error": "Brak uprawnień"}), 403
|
|
|
|
range_type = request.args.get("range", "monthly")
|
|
start_date_str = request.args.get("start_date")
|
|
end_date_str = request.args.get("end_date")
|
|
now = datetime.utcnow()
|
|
|
|
labels = []
|
|
expenses = []
|
|
|
|
if start_date_str and end_date_str:
|
|
start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
|
|
end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
|
|
|
|
expenses_query = (
|
|
db.session.query(
|
|
extract("year", Expense.added_at).label("year"),
|
|
extract("month", Expense.added_at).label("month"),
|
|
func.sum(Expense.amount).label("total"),
|
|
)
|
|
.filter(Expense.added_at >= start_date, Expense.added_at <= end_date)
|
|
.group_by("year", "month")
|
|
.order_by("year", "month")
|
|
.all()
|
|
)
|
|
|
|
for row in expenses_query:
|
|
label = f"{int(row.month):02d}/{int(row.year)}"
|
|
labels.append(label)
|
|
expenses.append(round(row.total, 2))
|
|
|
|
response = make_response(jsonify({"labels": labels, "expenses": expenses}))
|
|
response.headers["Cache-Control"] = (
|
|
"no-store, no-cache, must-revalidate, max-age=0"
|
|
)
|
|
return response
|
|
|
|
if range_type == "monthly":
|
|
for i in range(11, -1, -1):
|
|
year = (now - timedelta(days=i * 30)).year
|
|
month = (now - timedelta(days=i * 30)).month
|
|
label = f"{month:02d}/{year}"
|
|
labels.append(label)
|
|
|
|
month_sum = (
|
|
db.session.query(func.sum(Expense.amount))
|
|
.filter(extract("year", Expense.added_at) == year)
|
|
.filter(extract("month", Expense.added_at) == month)
|
|
.scalar()
|
|
or 0
|
|
)
|
|
expenses.append(round(month_sum, 2))
|
|
|
|
elif range_type == "quarterly":
|
|
for i in range(3, -1, -1):
|
|
quarter_start = now - timedelta(days=i * 90)
|
|
year = quarter_start.year
|
|
quarter = (quarter_start.month - 1) // 3 + 1
|
|
label = f"Q{quarter}/{year}"
|
|
quarter_sum = (
|
|
db.session.query(func.sum(Expense.amount))
|
|
.filter(extract("year", Expense.added_at) == year)
|
|
.filter((extract("month", Expense.added_at) - 1) // 3 + 1 == quarter)
|
|
.scalar()
|
|
or 0
|
|
)
|
|
labels.append(label)
|
|
expenses.append(round(quarter_sum, 2))
|
|
|
|
elif range_type == "halfyearly":
|
|
for i in range(1, -1, -1):
|
|
half_start = now - timedelta(days=i * 180)
|
|
year = half_start.year
|
|
half = 1 if half_start.month <= 6 else 2
|
|
label = f"H{half}/{year}"
|
|
half_sum = (
|
|
db.session.query(func.sum(Expense.amount))
|
|
.filter(extract("year", Expense.added_at) == year)
|
|
.filter(
|
|
(extract("month", Expense.added_at) <= 6)
|
|
if half == 1
|
|
else (extract("month", Expense.added_at) > 6)
|
|
)
|
|
.scalar()
|
|
or 0
|
|
)
|
|
labels.append(label)
|
|
expenses.append(round(half_sum, 2))
|
|
|
|
elif range_type == "yearly":
|
|
for i in range(4, -1, -1):
|
|
year = now.year - i
|
|
label = str(year)
|
|
year_sum = (
|
|
db.session.query(func.sum(Expense.amount))
|
|
.filter(extract("year", Expense.added_at) == year)
|
|
.scalar()
|
|
or 0
|
|
)
|
|
labels.append(label)
|
|
expenses.append(round(year_sum, 2))
|
|
|
|
response = make_response(jsonify({"labels": labels, "expenses": expenses}))
|
|
response.headers["Cache-Control"] = "no-store, no-cache"
|
|
return response
|
|
|
|
|
|
@app.route("/admin/promote_user/<int:user_id>")
|
|
@login_required
|
|
@admin_required
|
|
def promote_user(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
user.is_admin = True
|
|
db.session.commit()
|
|
flash(f"Użytkownik {user.username} został ustawiony jako admin.", "success")
|
|
return redirect(url_for("list_users"))
|
|
|
|
|
|
@app.route("/admin/demote_user/<int:user_id>")
|
|
@login_required
|
|
@admin_required
|
|
def demote_user(user_id):
|
|
user = User.query.get_or_404(user_id)
|
|
|
|
if user.id == current_user.id:
|
|
flash("Nie możesz zdegradować samego siebie!", "danger")
|
|
return redirect(url_for("list_users"))
|
|
|
|
admin_count = User.query.filter_by(is_admin=True).count()
|
|
if admin_count <= 1 and user.is_admin:
|
|
flash(
|
|
"Nie można zdegradować. Musi pozostać co najmniej jeden administrator.",
|
|
"danger",
|
|
)
|
|
return redirect(url_for("list_users"))
|
|
|
|
user.is_admin = False
|
|
db.session.commit()
|
|
flash(f"Użytkownik {user.username} został zdegradowany.", "success")
|
|
return redirect(url_for("list_users"))
|
|
|
|
|
|
@app.route("/healthcheck")
|
|
def healthcheck():
|
|
header_token = request.headers.get("X-Internal-Check")
|
|
correct_token = app.config.get("HEALTHCHECK_TOKEN")
|
|
|
|
if header_token != correct_token:
|
|
abort(404)
|
|
return "OK", 200
|
|
|
|
|
|
# =========================================================================================
|
|
# SOCKET.IO
|
|
# =========================================================================================
|
|
|
|
|
|
@socketio.on("delete_item")
|
|
def handle_delete_item(data):
|
|
item = Item.query.get(data["item_id"])
|
|
if item:
|
|
list_id = item.list_id
|
|
db.session.delete(item)
|
|
db.session.commit()
|
|
emit("item_deleted", {"item_id": item.id}, to=str(item.list_id))
|
|
|
|
purchased_count, total_count, percent = get_progress(list_id)
|
|
|
|
emit(
|
|
"progress_updated",
|
|
{
|
|
"purchased_count": purchased_count,
|
|
"total_count": total_count,
|
|
"percent": percent,
|
|
},
|
|
to=str(list_id),
|
|
)
|
|
|
|
|
|
@socketio.on("edit_item")
|
|
def handle_edit_item(data):
|
|
item = Item.query.get(data["item_id"])
|
|
new_name = data["new_name"]
|
|
new_quantity = data.get("new_quantity", item.quantity)
|
|
|
|
if item and new_name.strip():
|
|
item.name = new_name.strip()
|
|
|
|
try:
|
|
new_quantity = int(new_quantity)
|
|
if new_quantity < 1:
|
|
new_quantity = 1
|
|
except:
|
|
new_quantity = 1
|
|
|
|
item.quantity = new_quantity
|
|
|
|
db.session.commit()
|
|
|
|
emit(
|
|
"item_edited",
|
|
{"item_id": item.id, "new_name": item.name, "new_quantity": item.quantity},
|
|
to=str(item.list_id),
|
|
)
|
|
|
|
|
|
@socketio.on("join_list")
|
|
def handle_join(data):
|
|
global active_users
|
|
room = str(data["room"])
|
|
username = data.get("username", "Gość")
|
|
join_room(room)
|
|
|
|
if room not in active_users:
|
|
active_users[room] = set()
|
|
active_users[room].add(username)
|
|
|
|
shopping_list = ShoppingList.query.get(int(data["room"]))
|
|
list_title = shopping_list.title if shopping_list else "Twoja lista"
|
|
|
|
emit("user_joined", {"username": username}, to=room)
|
|
emit("user_list", {"users": list(active_users[room])}, to=room)
|
|
emit("joined_confirmation", {"room": room, "list_title": list_title})
|
|
|
|
|
|
@socketio.on("disconnect")
|
|
def handle_disconnect(sid):
|
|
global active_users
|
|
username = current_user.username if current_user.is_authenticated else "Gość"
|
|
for room, users in active_users.items():
|
|
if username in users:
|
|
users.remove(username)
|
|
emit("user_left", {"username": username}, to=room)
|
|
emit("user_list", {"users": list(users)}, to=room)
|
|
|
|
|
|
@socketio.on("add_item")
|
|
def handle_add_item(data):
|
|
list_id = data["list_id"]
|
|
name = data["name"]
|
|
quantity = data.get("quantity", 1)
|
|
|
|
try:
|
|
quantity = int(quantity)
|
|
if quantity < 1:
|
|
quantity = 1
|
|
except:
|
|
quantity = 1
|
|
|
|
new_item = Item(
|
|
list_id=list_id,
|
|
name=name,
|
|
quantity=quantity,
|
|
added_by=current_user.id if current_user.is_authenticated else None,
|
|
)
|
|
db.session.add(new_item)
|
|
|
|
if not SuggestedProduct.query.filter_by(name=name).first():
|
|
new_suggestion = SuggestedProduct(name=name)
|
|
db.session.add(new_suggestion)
|
|
|
|
db.session.commit()
|
|
|
|
emit(
|
|
"item_added",
|
|
{
|
|
"id": new_item.id,
|
|
"name": new_item.name,
|
|
"quantity": new_item.quantity,
|
|
"added_by": (
|
|
current_user.username if current_user.is_authenticated else "Gość"
|
|
),
|
|
},
|
|
to=str(list_id),
|
|
include_self=True,
|
|
)
|
|
|
|
purchased_count, total_count, percent = get_progress(list_id)
|
|
|
|
emit(
|
|
"progress_updated",
|
|
{
|
|
"purchased_count": purchased_count,
|
|
"total_count": total_count,
|
|
"percent": percent,
|
|
},
|
|
to=str(list_id),
|
|
)
|
|
|
|
|
|
@socketio.on("check_item")
|
|
def handle_check_item(data):
|
|
item = Item.query.get(data["item_id"])
|
|
if item:
|
|
item.purchased = True
|
|
item.purchased_at = datetime.utcnow()
|
|
db.session.commit()
|
|
|
|
purchased_count, total_count, percent = get_progress(item.list_id)
|
|
|
|
emit("item_checked", {"item_id": item.id}, to=str(item.list_id))
|
|
emit(
|
|
"progress_updated",
|
|
{
|
|
"purchased_count": purchased_count,
|
|
"total_count": total_count,
|
|
"percent": percent,
|
|
},
|
|
to=str(item.list_id),
|
|
)
|
|
|
|
|
|
@socketio.on("uncheck_item")
|
|
def handle_uncheck_item(data):
|
|
item = Item.query.get(data["item_id"])
|
|
if item:
|
|
item.purchased = False
|
|
item.purchased_at = None
|
|
db.session.commit()
|
|
|
|
purchased_count, total_count, percent = get_progress(item.list_id)
|
|
|
|
emit("item_unchecked", {"item_id": item.id}, to=str(item.list_id))
|
|
emit(
|
|
"progress_updated",
|
|
{
|
|
"purchased_count": purchased_count,
|
|
"total_count": total_count,
|
|
"percent": percent,
|
|
},
|
|
to=str(item.list_id),
|
|
)
|
|
|
|
|
|
@socketio.on("request_full_list")
|
|
def handle_request_full_list(data):
|
|
list_id = data["list_id"]
|
|
items = Item.query.filter_by(list_id=list_id).all()
|
|
|
|
items_data = []
|
|
for item in items:
|
|
items_data.append(
|
|
{
|
|
"id": item.id,
|
|
"name": item.name,
|
|
"quantity": item.quantity,
|
|
"purchased": item.purchased if not item.not_purchased else False,
|
|
"not_purchased": item.not_purchased,
|
|
"note": item.note or "",
|
|
}
|
|
)
|
|
|
|
emit("full_list", {"items": items_data}, to=request.sid)
|
|
|
|
|
|
@socketio.on("update_note")
|
|
def handle_update_note(data):
|
|
item_id = data["item_id"]
|
|
note = data["note"]
|
|
item = Item.query.get(item_id)
|
|
if item:
|
|
item.note = note
|
|
db.session.commit()
|
|
emit("note_updated", {"item_id": item_id, "note": note}, to=str(item.list_id))
|
|
|
|
|
|
@socketio.on("add_expense")
|
|
def handle_add_expense(data):
|
|
list_id = data["list_id"]
|
|
amount = data["amount"]
|
|
|
|
new_expense = Expense(list_id=list_id, amount=amount)
|
|
db.session.add(new_expense)
|
|
db.session.commit()
|
|
|
|
total = (
|
|
db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar()
|
|
or 0
|
|
)
|
|
|
|
emit("expense_added", {"amount": amount, "total": total}, to=str(list_id))
|
|
|
|
|
|
@socketio.on("mark_not_purchased")
|
|
def handle_mark_not_purchased(data):
|
|
item = Item.query.get(data["item_id"])
|
|
reason = data.get("reason", "")
|
|
if item:
|
|
item.not_purchased = True
|
|
item.not_purchased_reason = reason
|
|
db.session.commit()
|
|
emit(
|
|
"item_marked_not_purchased",
|
|
{"item_id": item.id, "reason": reason},
|
|
to=str(item.list_id),
|
|
)
|
|
|
|
|
|
@socketio.on("unmark_not_purchased")
|
|
def handle_unmark_not_purchased(data):
|
|
item = Item.query.get(data["item_id"])
|
|
if item:
|
|
item.not_purchased = False
|
|
item.purchased = False
|
|
item.purchased_at = None
|
|
item.not_purchased_reason = None
|
|
db.session.commit()
|
|
emit("item_unmarked_not_purchased", {"item_id": item.id}, to=str(item.list_id))
|
|
|
|
|
|
""" @socketio.on('receipt_uploaded')
|
|
def handle_receipt_uploaded(data):
|
|
list_id = data['list_id']
|
|
url = data['url']
|
|
|
|
emit('receipt_added', {
|
|
'url': url
|
|
}, to=str(list_id), include_self=False) """
|
|
|
|
|
|
@app.cli.command("create_db")
|
|
def create_db():
|
|
db.create_all()
|
|
print("Database created.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
socketio.run(app, host="0.0.0.0", port=8000, debug=True)
|