Files
lista_zakupowa_live/app.py
2025-07-10 23:18:19 +02:00

1370 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import secrets
import time
import mimetypes
import sys
import platform
import psutil
from datetime import datetime, timedelta
from flask import Flask, render_template, redirect, url_for, request, flash, Blueprint, send_from_directory, request, abort, session, jsonify, make_response
from markupsafe import Markup
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_compress import Compress
from flask_socketio import SocketIO, emit, join_room
from werkzeug.security import generate_password_hash, check_password_hash
from config import Config
from PIL import Image
from werkzeug.utils import secure_filename
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import func, extract
from collections import defaultdict, deque
from functools import wraps
# Flask application object; settings are loaded from the project's Config class.
app = Flask(__name__)
app.config.from_object(Config)
# Compression algorithms made available to flask-compress.
app.config['COMPRESS_ALGORITHM'] = ['zstd', 'br', 'gzip', 'deflate']
# Honour one hop of X-Forwarded-For / -Proto / -Host set by a reverse proxy.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
# NOTE(review): the fallbacks below are static, guessable values. Confirm that
# every deployment overrides them in Config.
SYSTEM_PASSWORD = app.config.get('SYSTEM_PASSWORD', 'changeme')
DEFAULT_ADMIN_USERNAME = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin')
DEFAULT_ADMIN_PASSWORD = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123')
UPLOAD_FOLDER = app.config.get('UPLOAD_FOLDER', 'uploads')
# File extensions accepted by allowed_file().
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
# Value stored in the `authorized` cookie once the system password was entered.
AUTHORIZED_COOKIE_VALUE = app.config.get('AUTHORIZED_COOKIE_VALUE', '80d31cdfe63539c9')
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Per-IP timestamps of failed system-password attempts (kept in process memory).
failed_login_attempts = defaultdict(deque)
MAX_ATTEMPTS = 10
TIME_WINDOW = 60 * 60  # seconds
db = SQLAlchemy(app)
socketio = SocketIO(app, async_mode="eventlet")
login_manager = LoginManager(app)
# Endpoint flask-login redirects to when a login is required.
login_manager.login_view = 'login'
# flask-compress
compress = Compress()
compress.init_app(app)
# Blueprint holding the custom static-file routes defined further down.
static_bp = Blueprint('static_bp', __name__)
# for live updates
active_users = {}
class User(UserMixin, db.Model):
    """Application account usable with flask-login."""
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    # Hash produced by werkzeug's generate_password_hash.
    # NOTE(review): some werkzeug hash formats may exceed 150 characters - confirm
    # the column is wide enough on databases that enforce VARCHAR length.
    password_hash = db.Column(db.String(150), nullable=False)
    # True for accounts allowed into the /admin views.
    is_admin = db.Column(db.Boolean, default=False)
class ShoppingList(db.Model):
    """A shopping list owned by a user and optionally shared through a token."""
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(150), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    owner_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    # Temporary lists receive an `expires_at` value when they are created.
    is_temporary = db.Column(db.Boolean, default=False)
    # Token used by the /share/<token> route.
    share_token = db.Column(db.String(64), unique=True, nullable=True)
    # NULL means the list never expires.
    expires_at = db.Column(db.DateTime, nullable=True)
    # Adds a `lists` collection to User.
    owner = db.relationship('User', backref='lists', lazy=True)
    is_archived = db.Column(db.Boolean, default=False)
    # Non-public lists are refused by check_list_public().
    is_public = db.Column(db.Boolean, default=True)
class Item(db.Model):
    """A single product entry on a shopping list."""
    id = db.Column(db.Integer, primary_key=True)
    list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id'))
    name = db.Column(db.String(150), nullable=False)
    added_at = db.Column(db.DateTime, default=datetime.utcnow)
    # User who added the item; nullable, so an item may have no recorded author.
    added_by = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
    purchased = db.Column(db.Boolean, default=False)
    purchased_at = db.Column(db.DateTime, nullable=True)
    quantity = db.Column(db.Integer, default=1)
    # Free-text note; the admin panel counts non-blank notes as comments.
    note = db.Column(db.Text, nullable=True)
class SuggestedProduct(db.Model):
    """Product name offered by the suggestion endpoints."""
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(150), unique=True, nullable=False)
    # Used by /all_products to rank the most popular suggestions first.
    usage_count = db.Column(db.Integer, default=0)
class Expense(db.Model):
    """An amount of money spent on a shopping list."""
    id = db.Column(db.Integer, primary_key=True)
    list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id'))
    amount = db.Column(db.Float, nullable=False)
    added_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Optional receipt file name associated with this expense.
    receipt_filename = db.Column(db.String(255), nullable=True)
# Create the schema and make sure an administrator matching the configured
# credentials exists every time the module is imported.
with app.app_context():
    db.create_all()

    username = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin')
    password = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123')
    password_hash = generate_password_hash(password)

    admin = User.query.filter_by(is_admin=True).first()
    if not admin:
        # No administrator yet: create one from the configured credentials.
        admin = User(username=username, password_hash=password_hash, is_admin=True)
        db.session.add(admin)
        db.session.commit()
    elif admin.username != username or not check_password_hash(admin.password_hash, password):
        # The first administrator found drifted from the configuration: reset it.
        admin.username = username
        admin.password_hash = password_hash
        db.session.commit()
@static_bp.route('/static/js/<path:filename>')
def serve_js(filename):
    """Serve an application script from ``static/js`` with caching disabled.

    Args:
        filename: Path of the script relative to ``static/js``.

    Returns:
        The file response carrying ``Cache-Control: no-cache, no-store,
        must-revalidate`` and ``Pragma: no-cache``, with the
        ``Content-Disposition`` and ``ETag`` headers removed.
    """
    response = send_from_directory('static/js', filename)
    response.cache_control.no_cache = True
    response.cache_control.no_store = True
    response.cache_control.must_revalidate = True
    # Assigning `response.pragma` only creates a plain Python attribute and never
    # reaches the client, so the legacy header is written explicitly.
    response.headers['Pragma'] = 'no-cache'
    response.headers.pop('Content-Disposition', None)
    response.headers.pop('Etag', None)
    return response
@static_bp.route('/static/css/<path:filename>')
def serve_css(filename):
    """Serve an application stylesheet from ``static/css``, cacheable for one hour."""
    css_response = send_from_directory('static/css', filename)
    css_response.headers['Cache-Control'] = 'public, max-age=3600'
    for unwanted in ('Content-Disposition', 'Etag'):
        css_response.headers.pop(unwanted, None)
    return css_response
@static_bp.route('/static/lib/js/<path:filename>')
def serve_js_lib(filename):
    """Serve a vendored script from ``static/lib/js``, cacheable for one week."""
    lib_response = send_from_directory('static/lib/js', filename)
    lib_response.headers['Cache-Control'] = 'public, max-age=604800'
    for unwanted in ('Content-Disposition', 'Etag'):
        lib_response.headers.pop(unwanted, None)
    return lib_response
# CSS cached for one week
@static_bp.route('/static/lib/css/<path:filename>')
def serve_css_lib(filename):
    """Serve a vendored stylesheet from ``static/lib/css``, cacheable for one week."""
    lib_response = send_from_directory('static/lib/css', filename)
    lib_response.headers['Cache-Control'] = 'public, max-age=604800'
    for unwanted in ('Content-Disposition', 'Etag'):
        lib_response.headers.pop(unwanted, None)
    return lib_response
# Attach the custom static-file routes defined on static_bp to the application.
app.register_blueprint(static_bp)
def allowed_file(filename):
    """Return True when *filename* ends in one of ALLOWED_EXTENSIONS (case-insensitive)."""
    _stem, dot, extension = filename.rpartition('.')
    # `dot` is empty when the name contains no "." at all.
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS
def get_list_details(list_id):
    """Collect everything the list views need for one shopping list.

    Args:
        list_id: Primary key of the shopping list.

    Returns:
        A tuple ``(shopping_list, items, receipt_files, expenses, total_expense)``
        where ``receipt_files`` are the names of uploaded files belonging to the
        list and ``total_expense`` is the sum of its expense amounts.

    Aborts with 404 (via ``get_or_404``) when the list does not exist.
    """
    shopping_list = ShoppingList.query.get_or_404(list_id)
    items = Item.query.filter_by(list_id=list_id).all()

    # Receipts are stored as "list_<id>_<name>". Matching on the full prefix,
    # trailing underscore included, keeps list 1 from picking up the files of
    # lists 10, 11, 100, ... as a plain substring test would.
    receipt_prefix = f"list_{list_id}_"
    receipt_files = [
        name
        for name in os.listdir(app.config['UPLOAD_FOLDER'])
        if name.startswith(receipt_prefix)
    ]

    expenses = Expense.query.filter_by(list_id=list_id).all()
    total_expense = sum(e.amount for e in expenses)
    return shopping_list, items, receipt_files, expenses, total_expense
def generate_share_token(length=8):
    """Generate a random hexadecimal token used for sharing a list.

    Args:
        length: Number of characters in the token (default 8).

    Returns:
        A string of exactly ``length`` lowercase hexadecimal characters.
    """
    # token_hex() yields two characters per byte, so round the byte count up and
    # trim; this also gives odd lengths the exact number of characters requested.
    return secrets.token_hex((length + 1) // 2)[:length]
def check_list_public(shopping_list):
    """Return True for a public list; otherwise flash a warning and return False."""
    if shopping_list.is_public:
        return True
    flash('Ta lista nie jest publicznie dostępna', 'danger')
    return False
def enrich_list_data(l):
    """Attach item counters and the expense total to a list object and return it.

    Sets ``total_count``, ``purchased_count`` and ``total_expense`` on *l*.
    """
    entries = Item.query.filter_by(list_id=l.id).all()
    l.total_count = len(entries)
    l.purchased_count = sum(1 for entry in entries if entry.purchased)

    spendings = Expense.query.filter_by(list_id=l.id).all()
    l.total_expense = sum(spending.amount for spending in spendings)
    return l
def save_resized_image(file, path: str, max_size=(2000, 2000)):
    """Open *file* as an image, shrink it to fit within *max_size* and save it at *path*."""
    picture = Image.open(file)
    # thumbnail() resizes in place.
    picture.thumbnail(max_size)
    picture.save(path)
def redirect_with_flash(message: str, category: str = 'info', endpoint: str = 'main_page'):
    """Flash *message* under *category*, then redirect to *endpoint*."""
    flash(message, category)
    destination = url_for(endpoint)
    return redirect(destination)
def admin_required(f):
    """Decorator that lets only authenticated administrators reach the view *f*.

    Everyone else is redirected to the main page with an error message.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if current_user.is_authenticated and current_user.is_admin:
            return f(*args, **kwargs)
        return redirect_with_flash('Brak uprawnień do tej sekcji.', 'danger')
    return guarded_view
def get_progress(list_id):
    """Return ``(purchased_count, total_count, percent)`` for the items of a list.

    ``percent`` is 0 when the list has no items.
    """
    entries = Item.query.filter_by(list_id=list_id).all()
    total_count = len(entries)
    purchased_count = sum(1 for entry in entries if entry.purchased)
    if total_count > 0:
        percent = purchased_count / total_count * 100
    else:
        percent = 0
    return purchased_count, total_count, percent
def delete_receipts_for_list(list_id):
    """Remove every uploaded file whose name starts with ``list_<list_id>_``.

    Deletion is best-effort: a failure is printed and the loop carries on.
    """
    upload_folder = app.config['UPLOAD_FOLDER']
    prefix = f"list_{list_id}_"
    doomed = [name for name in os.listdir(upload_folder) if name.startswith(prefix)]
    for filename in doomed:
        target = os.path.join(upload_folder, filename)
        try:
            os.remove(target)
        except Exception as e:
            print(f"Nie udało się usunąć pliku {filename}: {e}")
# protection of the system login - wrong passwords
def is_ip_blocked(ip):
    """Return True when *ip* has MAX_ATTEMPTS or more failures within TIME_WINDOW.

    Expired entries are discarded from the front of the per-IP queue first.
    """
    now = time.time()
    history = failed_login_attempts[ip]
    while history:
        if now - history[0] <= TIME_WINDOW:
            break
        history.popleft()
    return len(history) >= MAX_ATTEMPTS
def register_failed_attempt(ip):
    """Record a failed attempt for *ip* after dropping entries older than TIME_WINDOW."""
    now = time.time()
    history = failed_login_attempts[ip]
    while history:
        if now - history[0] <= TIME_WINDOW:
            break
        history.popleft()
    history.append(now)
def reset_failed_attempts(ip):
    """Forget every failed attempt recorded for *ip*."""
    history = failed_login_attempts[ip]
    history.clear()
def attempts_remaining(ip):
    """Return how many more failures *ip* may make before being blocked (never negative)."""
    used = len(failed_login_attempts[ip])
    left = MAX_ATTEMPTS - used
    return left if left > 0 else 0
####################################################
@login_manager.user_loader
def load_user(user_id):
    """flask-login callback: look the user up by the id stored in the session."""
    numeric_id = int(user_id)
    return User.query.get(numeric_id)
@app.context_processor
def inject_time():
    """Expose the ``time`` module to every template under the name ``time``."""
    return {'time': time}
@app.context_processor
def inject_has_authorized_cookie():
    """Tell templates whether the visitor holds a valid system-access cookie.

    Returns:
        ``{'has_authorized_cookie': bool}``; True only when the ``authorized``
        cookie carries AUTHORIZED_COOKIE_VALUE.
    """
    # Compare the value rather than just checking that the cookie exists, the
    # same way system_auth() decides whether a visitor is already authorized.
    is_valid = request.cookies.get('authorized') == AUTHORIZED_COOKIE_VALUE
    return {'has_authorized_cookie': is_valid}
""" @app.before_request
def require_system_password():
if request.endpoint is None:
return
if request.endpoint in ['forbidden', 'not_found', 'internal_error', 'system_auth']:
return
if 'authorized' not in request.cookies \
and request.endpoint != 'system_auth' \
and not request.endpoint.startswith('login') \
and request.endpoint != 'favicon':
if request.endpoint == 'static_bp.serve_js':
requested_file = request.view_args.get("filename", "")
if requested_file == "toasts.js":
return
if requested_file.endswith(".js"):
return redirect(url_for('system_auth', next=request.url))
else:
return
if request.endpoint.startswith('static_bp.'):
return
if request.path == '/':
return redirect(url_for('system_auth'))
else:
from urllib.parse import urlparse, urlunparse
parsed = urlparse(request.url)
fixed_url = urlunparse(parsed._replace(netloc=request.host))
return redirect(url_for('system_auth', next=fixed_url)) """
@app.before_request
def require_system_password():
    """Gate the whole site behind the system password.

    Runs before every request. Returns ``None`` to let the request through,
    a redirect to ``system_auth`` for pages requested without a valid
    ``authorized`` cookie, or aborts with 403 for protected scripts.
    """
    endpoint = request.endpoint
    if endpoint is None:
        return
    if endpoint in ['forbidden', 'not_found', 'internal_error', 'system_auth']:
        return

    # The cookie value must match; checking only for the cookie's presence would
    # let anyone through who sets an arbitrary "authorized" cookie themselves.
    if request.cookies.get('authorized') == AUTHORIZED_COOKIE_VALUE:
        return

    # Login endpoints and the SVG favicon stay reachable without the cookie.
    if endpoint.startswith('login') or endpoint == 'favicon':
        return

    if endpoint == 'static_bp.serve_js':
        requested_file = request.view_args.get("filename", "")
        if requested_file == "toasts.js":
            return
        if requested_file.endswith(".js"):
            abort(403)  # an error suits a script request better than the auth page
        else:
            return

    if endpoint.startswith('static_bp.'):
        return

    # HTML pages: send the visitor to the password form, remembering the target.
    if request.path == '/':
        return redirect(url_for('system_auth'))
    return redirect(url_for('system_auth', next=request.url))
@app.template_filter('filemtime')
def file_mtime_filter(path):
    """Template filter: modification time of *path* as a datetime.

    Falls back to the current UTC time when the file cannot be inspected.
    """
    try:
        return datetime.fromtimestamp(os.path.getmtime(path))
    except Exception:
        return datetime.utcnow()
@app.template_filter('filesizeformat')
def filesizeformat_filter(path):
    """Template filter: human-readable size of the file at *path*, or "N/A" on error."""
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    try:
        size = os.path.getsize(path)
        step = 0
        # Scale down by 1024 until the value fits the unit; TB is the last unit.
        while size >= 1024.0 and step < len(units) - 1:
            size /= 1024.0
            step += 1
        return f"{size:.1f} {units[step]}"
    except Exception:
        return "N/A"
@app.errorhandler(404)
def page_not_found(e):
    """Render the shared error template for HTTP 404."""
    context = {
        'code': 404,
        'title': "Strona nie znaleziona",
        'message': "Ups! Podana strona nie istnieje lub została przeniesiona.",
    }
    return render_template('errors.html', **context), 404
@app.errorhandler(403)
def forbidden(e):
    """Render the shared error template for HTTP 403."""
    context = {
        'code': 403,
        'title': "Brak dostępu",
        'message': "Nie masz uprawnień do wyświetlenia tej strony.",
    }
    return render_template('errors.html', **context), 403
@app.errorhandler(500)
def internal_error(e):
    """Render the shared error template for HTTP 500."""
    context = {
        'code': 500,
        'title': "Błąd serwera",
        'message': "Wystąpił nieoczekiwany błąd. Spróbuj ponownie później.",
    }
    return render_template('errors.html', **context), 500
@app.route('/favicon.ico')
def favicon_ico():
    """Redirect legacy ``/favicon.ico`` requests to the static ``favicon.svg`` URL."""
    svg_location = url_for('static', filename='favicon.svg')
    return redirect(svg_location)
@app.route('/favicon.svg')
def favicon():
    """Return an inline SVG favicon showing a shopping-cart emoji."""
    svg = '''
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16">
<text y="14" font-size="16">🛒</text>
</svg>
'''
    headers = {'Content-Type': 'image/svg+xml'}
    return svg, 200, headers
@app.route('/')
def main_page():
    """Render the landing page with the visitor's own, archived and public lists.

    Anonymous visitors only get the public, non-archived, non-expired lists.
    """
    now = datetime.utcnow()
    # A list is still valid when it has no expiry date or the date is in the future.
    not_expired = ShoppingList.expires_at.is_(None) | (ShoppingList.expires_at > now)
    newest_first = ShoppingList.created_at.desc()

    if current_user.is_authenticated:
        me = current_user.id
        user_lists = (
            ShoppingList.query
            .filter_by(owner_id=me, is_archived=False)
            .filter(not_expired)
            .order_by(newest_first)
            .all()
        )
        archived_lists = (
            ShoppingList.query
            .filter_by(owner_id=me, is_archived=True)
            .order_by(newest_first)
            .all()
        )
        public_lists = (
            ShoppingList.query
            .filter(
                ShoppingList.is_public == True,
                ShoppingList.owner_id != me,
                not_expired,
                ShoppingList.is_archived == False,
            )
            .order_by(newest_first)
            .all()
        )
    else:
        user_lists, archived_lists = [], []
        public_lists = (
            ShoppingList.query
            .filter(
                ShoppingList.is_public == True,
                not_expired,
                ShoppingList.is_archived == False,
            )
            .order_by(newest_first)
            .all()
        )

    # Add counters and expense totals used by the template.
    for shopping_list in user_lists + public_lists + archived_lists:
        enrich_list_data(shopping_list)

    return render_template(
        "main.html",
        user_lists=user_lists,
        public_lists=public_lists,
        archived_lists=archived_lists,
    )
def _safe_redirect_target(candidate, fallback):
    """Resolve *candidate* against this host; return *fallback* if it leads elsewhere."""
    from urllib.parse import urljoin, urlparse

    if not candidate:
        return fallback
    try:
        resolved = urljoin(request.host_url, candidate)
        own_host = urlparse(request.host_url).netloc
        parsed = urlparse(resolved)
    except ValueError:
        # Malformed URLs (e.g. a broken IPv6 literal) are never followed.
        return fallback
    if parsed.scheme in ('http', 'https') and parsed.netloc == own_host:
        return resolved
    return fallback


@app.route('/system-auth', methods=['GET', 'POST'])
def system_auth():
    """Ask for the system password and hand out the ``authorized`` cookie.

    Query parameters:
        next: URL to return to after success; only targets on this host are
            followed, anything else falls back to the main page.

    Returns:
        A redirect for visitors who are already authorized or who just entered
        the right password, the password form otherwise, and the form with
        status 403 while the client IP is blocked after too many failures.
    """
    if current_user.is_authenticated or request.cookies.get('authorized') == AUTHORIZED_COOKIE_VALUE:
        flash('Jesteś już zalogowany lub autoryzowany.', 'info')
        return redirect(url_for('main_page'))

    # NOTE(review): access_route[0] is the left-most X-Forwarded-For entry, which a
    # client can forge; confirm this is the address the rate limit should key on.
    ip = request.access_route[0]

    # `next` comes straight from the query string, so it must not be allowed to
    # send the visitor to a foreign site after a successful login.
    next_page = _safe_redirect_target(request.args.get('next'), url_for('main_page'))

    if is_ip_blocked(ip):
        flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger')
        return render_template('system_auth.html'), 403

    if request.method == 'POST':
        # A missing field counts as a wrong password instead of raising a 400.
        if request.form.get('password', '') == SYSTEM_PASSWORD:
            reset_failed_attempts(ip)
            resp = redirect(next_page)
            resp.set_cookie('authorized', AUTHORIZED_COOKIE_VALUE)
            return resp

        register_failed_attempt(ip)
        if is_ip_blocked(ip):
            flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger')
            return render_template('system_auth.html'), 403
        remaining = attempts_remaining(ip)
        flash(f'Nieprawidłowe hasło. Pozostało {remaining} prób.', 'warning')

    return render_template('system_auth.html')
@app.route('/toggle_archive_list/<int:list_id>')
@login_required
def toggle_archive_list(list_id):
    """Archive or restore one of the current user's lists.

    The ``archive`` query parameter (default ``true``) selects the direction.
    """
    shopping_list = ShoppingList.query.get_or_404(list_id)
    if shopping_list.owner_id != current_user.id:
        return redirect_with_flash('Nie masz uprawnień do tej listy', 'danger')

    should_archive = request.args.get('archive', 'true').lower() == 'true'
    shopping_list.is_archived = should_archive
    if should_archive:
        notice = f'Lista „{shopping_list.title}” została zarchiwizowana.'
    else:
        notice = f'Lista „{shopping_list.title}” została przywrócona.'
    flash(notice, 'success')

    db.session.commit()
    return redirect(url_for('main_page'))
@app.route('/edit_my_list/<int:list_id>', methods=['GET', 'POST'])
@login_required
def edit_my_list(list_id):
    """Show the rename form for one of the user's lists and process its submission."""
    shopping_list = ShoppingList.query.get_or_404(list_id)
    if shopping_list.owner_id != current_user.id:
        return redirect_with_flash('Nie masz uprawnień do tej listy', 'danger')

    if request.method == 'POST':
        cleaned_title = (request.form.get('title') or '').strip()
        if not cleaned_title:
            flash('Podaj poprawny tytuł', 'danger')
        else:
            shopping_list.title = cleaned_title
            db.session.commit()
            flash('Zaktualizowano tytuł listy', 'success')
            return redirect(url_for('main_page'))

    return render_template('edit_my_list.html', list=shopping_list)
@app.route('/toggle_visibility/<int:list_id>', methods=['GET', 'POST'])
@login_required
def toggle_visibility(list_id):
    """Flip the public flag of one of the user's lists.

    JSON and POST requests receive a data response (the new state and the share
    URL, or a 403 error); plain GET requests are flashed a message and redirected.
    """
    shopping_list = ShoppingList.query.get_or_404(list_id)
    wants_data = request.is_json or request.method == 'POST'

    if shopping_list.owner_id != current_user.id:
        if wants_data:
            return {'error': 'Unauthorized'}, 403
        flash('Nie masz uprawnień do tej listy', 'danger')
        return redirect(url_for('main_page'))

    shopping_list.is_public = not shopping_list.is_public
    db.session.commit()
    share_url = f"{request.url_root}share/{shopping_list.share_token}"

    if wants_data:
        return {'is_public': shopping_list.is_public, 'share_url': share_url}

    if shopping_list.is_public:
        flash('Lista została udostępniona publicznie', 'success')
    else:
        flash('Lista została ukryta przed gośćmi', 'info')
    return redirect(url_for('main_page'))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and sign the user in when the credentials match."""
    if request.method != 'POST':
        return render_template('login.html')

    form = request.form
    account = User.query.filter_by(username=form['username']).first()
    if account and check_password_hash(account.password_hash, form['password']):
        login_user(account)
        flash('Zalogowano pomyślnie', 'success')
        return redirect(url_for('main_page'))

    flash('Nieprawidłowy login lub hasło', 'danger')
    return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
    """End the current user's session and return to the main page."""
    logout_user()
    flash('Wylogowano pomyślnie', 'success')
    home = url_for('main_page')
    return redirect(home)
@app.route('/create', methods=['POST'])
@login_required
def create_list():
    """Create a new shopping list owned by the current user.

    Form fields:
        title: Name of the list; required, surrounding whitespace is removed.
        temporary: When present, the list expires seven days after creation.

    Returns:
        A redirect to the new list, or back to the main page with an error
        message when no usable title was submitted.
    """
    # The title column is NOT NULL, so a missing or blank value is rejected here
    # instead of failing at commit time.
    title = (request.form.get('title') or '').strip()
    if not title:
        return redirect_with_flash('Podaj poprawny tytuł', 'danger')

    is_temporary = 'temporary' in request.form
    token = generate_share_token(8)
    expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None
    new_list = ShoppingList(
        title=title,
        owner_id=current_user.id,
        is_temporary=is_temporary,
        share_token=token,
        expires_at=expires_at,
    )
    db.session.add(new_list)
    db.session.commit()
    flash('Utworzono nową listę', 'success')
    return redirect(url_for('view_list', list_id=new_list.id))
@app.route('/list/<int:list_id>')
@login_required
def view_list(list_id):
    """Render the full view of a list for a logged-in user, including progress."""
    shopping_list, items, receipt_files, expenses, total_expense = get_list_details(list_id)

    total_count = len(items)
    purchased_count = sum(1 for entry in items if entry.purchased)
    if total_count > 0:
        percent = purchased_count / total_count * 100
    else:
        percent = 0

    context = {
        'list': shopping_list,
        'items': items,
        'receipt_files': receipt_files,
        'total_count': total_count,
        'purchased_count': purchased_count,
        'percent': percent,
        'expenses': expenses,
        'total_expense': total_expense,
    }
    return render_template('list.html', **context)
@app.route('/share/<token>')
@app.route('/guest-list/<int:list_id>')
def shared_list(token=None, list_id=None):
    """Render the guest view of a list, addressed by share token or by id.

    Args:
        token: Share token from ``/share/<token>``.
        list_id: Primary key from ``/guest-list/<list_id>``.

    Returns:
        The rendered guest page, or a redirect to the main page when the list
        is not public. Unknown tokens or ids abort with 404.
    """
    if token:
        shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
        if not check_list_public(shopping_list):
            return redirect(url_for('main_page'))
        list_id = shopping_list.id
    else:
        # Addressing a list by id must not let anonymous guests read lists that
        # were hidden from them; logged-in users can already open any list
        # through view_list, so they are left unaffected.
        shopping_list = ShoppingList.query.get_or_404(list_id)
        if not current_user.is_authenticated and not check_list_public(shopping_list):
            return redirect(url_for('main_page'))

    shopping_list, items, receipt_files, expenses, total_expense = get_list_details(list_id)
    return render_template(
        'list_share.html',
        list=shopping_list,
        items=items,
        receipt_files=receipt_files,
        expenses=expenses,
        total_expense=total_expense
    )
@app.route('/copy/<int:list_id>')
@login_required
def copy_list(list_id):
    """Duplicate a list (title and item names only) for the current user."""
    source = ShoppingList.query.get_or_404(list_id)

    duplicate = ShoppingList(
        title=source.title + ' (Kopia)',
        owner_id=current_user.id,
        share_token=generate_share_token(8),
    )
    db.session.add(duplicate)
    # Commit first so the duplicate has an id for its items to reference.
    db.session.commit()

    source_items = Item.query.filter_by(list_id=source.id).all()
    db.session.add_all(
        [Item(list_id=duplicate.id, name=entry.name) for entry in source_items]
    )
    db.session.commit()

    flash('Skopiowano listę', 'success')
    return redirect(url_for('view_list', list_id=duplicate.id))
@app.route('/suggest_products')
def suggest_products():
    """Return up to five suggestion names containing the ``q`` query parameter."""
    needle = request.args.get('q', '')
    if not needle:
        return {'suggestions': []}

    matches = (
        SuggestedProduct.query
        .filter(SuggestedProduct.name.ilike(f'%{needle}%'))
        .limit(5)
        .all()
    )
    return {'suggestions': [match.name for match in matches]}
@app.route('/all_products')
def all_products():
    """Return suggestion names: the 20 most used first, then up to 200 others A-Z.

    The optional ``q`` query parameter restricts both groups to matching names.
    """
    needle = request.args.get('q', '')

    def matching_products():
        # Fresh base query, narrowed by the search term when one was given.
        base = SuggestedProduct.query
        if needle:
            base = base.filter(SuggestedProduct.name.ilike(f'%{needle}%'))
        return base

    popular = (
        matching_products()
        .order_by(SuggestedProduct.usage_count.desc(), SuggestedProduct.name.asc())
        .limit(20)
        .all()
    )
    top_names = [product.name for product in popular]

    remaining = matching_products()
    if top_names:
        # Leave out the names already listed in the popular group.
        remaining = remaining.filter(~SuggestedProduct.name.in_(top_names))
    others = remaining.order_by(SuggestedProduct.name.asc()).limit(200).all()

    return {'allproducts': top_names + [product.name for product in others]}
""" @app.route('/upload_receipt/<int:list_id>', methods=['POST'])
def upload_receipt(list_id):
if 'receipt' not in request.files:
flash('Brak pliku', 'danger')
return redirect(request.referrer)
file = request.files['receipt']
if file.filename == '':
flash('Nie wybrano pliku', 'danger')
return redirect(request.referrer)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], f"list_{list_id}_{filename}")
save_resized_image(file, file_path)
flash('Wgrano paragon', 'success')
return redirect(request.referrer)
flash('Niedozwolony format pliku', 'danger')
return redirect(request.referrer) """
@app.route('/upload_receipt/<int:list_id>', methods=['POST'])
def upload_receipt(list_id):
    """Store an uploaded receipt image for a list.

    The file is expected in the ``receipt`` form field. AJAX/JSON callers get a
    JSON answer (status 400 on failure) and a ``receipt_added`` Socket.IO event
    is emitted to the list's room; other callers get a flash message and are
    sent back to the referring page.

    Args:
        list_id: Primary key of the list the receipt belongs to.
    """
    wants_json = request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest'
    # The Referer header is optional; redirecting to None would raise, so fall
    # back to the main page when it is absent.
    back_url = request.referrer or url_for('main_page')

    def reject(message):
        # Report a failed upload in the format the caller expects.
        if wants_json:
            return jsonify({'success': False, 'message': message}), 400
        flash(message, 'danger')
        return redirect(back_url)

    if 'receipt' not in request.files:
        return reject('Brak pliku')

    file = request.files['receipt']
    if file.filename == '':
        return reject('Nie wybrano pliku')

    if not file or not allowed_file(file.filename):
        return reject('Niedozwolony format pliku')

    filename = secure_filename(file.filename)
    full_filename = f"list_{list_id}_{filename}"
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], full_filename)
    try:
        save_resized_image(file, file_path)
    except (OSError, ValueError):
        # The extension looked fine but the content could not be decoded or
        # written as an image; treat it like any other unsupported file.
        return reject('Niedozwolony format pliku')

    if wants_json:
        url = url_for('uploaded_file', filename=full_filename)
        socketio.emit('receipt_added', {'url': url}, to=str(list_id))
        return jsonify({'success': True, 'url': url})

    flash('Wgrano paragon', 'success')
    return redirect(back_url)
@app.route('/uploads/<filename>')
def uploaded_file(filename):
    """Serve an uploaded file with a 30-day immutable cache policy."""
    response = send_from_directory(app.config['UPLOAD_FOLDER'], filename)
    response.headers['Cache-Control'] = 'public, max-age=2592000, immutable'
    for unwanted in ('Pragma', 'Content-Disposition'):
        response.headers.pop(unwanted, None)

    # Prefer the type guessed from the file name when there is one.
    guessed_type = mimetypes.guess_type(filename)[0]
    if guessed_type:
        response.headers['Content-Type'] = guessed_type
    return response
@app.route('/admin')
@login_required
@admin_required
def admin_panel():
    """Render the administrator dashboard.

    Gathers global counters, per-list statistics (progress, comments, receipts,
    expenses), the five most frequently purchased product names, expense totals
    (overall, current year, current month) and basic runtime information.
    """
    now = datetime.utcnow()
    user_count = User.query.count()
    list_count = ShoppingList.query.count()
    item_count = Item.query.count()
    all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all()
    all_files = os.listdir(app.config['UPLOAD_FOLDER'])

    enriched_lists = []
    for l in all_lists:
        enrich_list_data(l)
        items = Item.query.filter_by(list_id=l.id).all()
        total_count = l.total_count
        purchased_count = l.purchased_count
        percent = (purchased_count / total_count * 100) if total_count > 0 else 0
        comments_count = sum(1 for i in items if i.note and i.note.strip())
        # Receipts are stored as "list_<id>_<name>"; matching the whole prefix
        # keeps list 1 from also counting the receipts of lists 10, 11, 100, ...
        receipt_prefix = f"list_{l.id}_"
        receipts_count = sum(1 for f in all_files if f.startswith(receipt_prefix))
        enriched_lists.append({
            'list': l,
            'total_count': total_count,
            'purchased_count': purchased_count,
            'percent': round(percent),
            'comments_count': comments_count,
            'receipts_count': receipts_count,
            'total_expense': l.total_expense
        })

    top_products = (
        db.session.query(Item.name, func.count(Item.id).label('count'))
        .filter(Item.purchased == True)
        .group_by(Item.name)
        .order_by(func.count(Item.id).desc())
        .limit(5)
        .all()
    )
    purchased_items_count = Item.query.filter_by(purchased=True).count()

    total_expense_sum = db.session.query(func.sum(Expense.amount)).scalar() or 0
    # Year and month come from the same timestamp so both totals describe the
    # same moment even when the request straddles a month boundary.
    current_year = now.year
    current_month = now.month
    year_expense_sum = (
        db.session.query(func.sum(Expense.amount))
        .filter(extract('year', Expense.added_at) == current_year)
        .scalar() or 0
    )
    month_expense_sum = (
        db.session.query(func.sum(Expense.amount))
        .filter(extract('year', Expense.added_at) == current_year)
        .filter(extract('month', Expense.added_at) == current_month)
        .scalar() or 0
    )

    process = psutil.Process(os.getpid())
    app_mem = process.memory_info().rss // (1024 * 1024)  # MB

    return render_template(
        'admin/admin_panel.html',
        user_count=user_count,
        list_count=list_count,
        item_count=item_count,
        purchased_items_count=purchased_items_count,
        enriched_lists=enriched_lists,
        top_products=top_products,
        total_expense_sum=total_expense_sum,
        year_expense_sum=year_expense_sum,
        month_expense_sum=month_expense_sum,
        now=now,
        python_version=sys.version,
        system_info=platform.platform(),
        app_memory=f"{app_mem} MB",
    )
@app.route('/admin/delete_list/<int:list_id>')
@login_required
@admin_required
def delete_list(list_id):
    """Delete a list together with its receipts, items and expenses (admin only)."""
    delete_receipts_for_list(list_id)
    doomed = ShoppingList.query.get_or_404(list_id)

    # Remove dependent rows before the list itself.
    for model in (Item, Expense):
        model.query.filter_by(list_id=doomed.id).delete()
    db.session.delete(doomed)
    db.session.commit()

    flash(f'Usunięto listę: {doomed.title}', 'success')
    return redirect(url_for('admin_panel'))
@app.route('/admin/add_user', methods=['POST'])
@login_required
@admin_required
def add_user():
    """Create a regular user account from the submitted form (admin only)."""
    users_page = url_for('list_users')
    username = request.form['username']
    password = request.form['password']

    if not (username and password):
        flash('Wypełnij wszystkie pola', 'danger')
        return redirect(users_page)

    name_taken = User.query.filter_by(username=username).first()
    if name_taken:
        flash('Użytkownik o takiej nazwie już istnieje', 'warning')
        return redirect(users_page)

    account = User(username=username, password_hash=generate_password_hash(password))
    db.session.add(account)
    db.session.commit()
    flash('Dodano nowego użytkownika', 'success')
    return redirect(users_page)
@app.route('/admin/users')
@login_required
@admin_required
def list_users():
    """Render the user-management page with global counters (admin only)."""
    context = {
        'users': User.query.all(),
        'user_count': User.query.count(),
        'list_count': ShoppingList.query.count(),
        'item_count': Item.query.count(),
        # Static sample entries; nothing in this view fills the log from real data.
        'activity_log': ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"],
    }
    return render_template('admin/user_management.html', **context)
@app.route('/admin/change_password/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def reset_password(user_id):
    """Set a new password for the given user (admin only)."""
    account = User.query.get_or_404(user_id)
    new_password = request.form['password']

    if new_password:
        account.password_hash = generate_password_hash(new_password)
        db.session.commit()
        flash(f'Hasło dla użytkownika {account.username} zostało zaktualizowane', 'success')
    else:
        flash('Podaj nowe hasło', 'danger')
    return redirect(url_for('list_users'))
@app.route('/admin/delete_user/<int:user_id>')
@login_required
@admin_required
def delete_user(user_id):
    """Delete a user account, refusing to remove the last administrator."""
    account = User.query.get_or_404(user_id)

    # The count query only runs when the account being removed is an administrator.
    if account.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
        flash('Nie można usunąć ostatniego administratora.', 'danger')
        return redirect(url_for('list_users'))

    db.session.delete(account)
    db.session.commit()
    flash('Użytkownik usunięty', 'success')
    return redirect(url_for('list_users'))
@app.route('/admin/receipts')
@login_required
@admin_required
def admin_receipts():
    """List every image file in the upload folder (admin only)."""
    upload_folder = app.config['UPLOAD_FOLDER']
    image_files = [name for name in os.listdir(upload_folder) if allowed_file(name)]
    return render_template(
        'admin/receipts.html',
        image_files=image_files,
        upload_folder=app.config['UPLOAD_FOLDER'],
    )
@app.route('/admin/delete_receipt/<filename>')
@login_required
@admin_required
def delete_receipt(filename):
    """Delete one file from the upload folder (admin only)."""
    target = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    if not os.path.exists(target):
        flash('Plik nie istnieje', 'danger')
    else:
        os.remove(target)
        flash('Plik usunięty', 'success')
    return redirect(url_for('admin_receipts'))
@app.route('/admin/delete_selected_lists', methods=['POST'])
@login_required
@admin_required
def delete_selected_lists():
    """Delete every list whose id was submitted in ``list_ids`` (admin only).

    Unknown ids are skipped; receipts, items and expenses go with each list.
    """
    for raw_id in request.form.getlist('list_ids'):
        selected = ShoppingList.query.get(int(raw_id))
        if not selected:
            continue
        delete_receipts_for_list(selected.id)
        Item.query.filter_by(list_id=selected.id).delete()
        Expense.query.filter_by(list_id=selected.id).delete()
        db.session.delete(selected)
    db.session.commit()

    flash('Usunięto wybrane listy', 'success')
    return redirect(url_for('admin_panel'))
@app.route('/admin/archive_list/<int:list_id>')
@login_required
@admin_required
def archive_list(list_id):
    """Mark a list as archived (admin only)."""
    shopping_list = ShoppingList.query.get_or_404(list_id)
    shopping_list.is_archived = True
    db.session.commit()
    flash('Lista oznaczona jako archiwalna', 'success')
    dashboard = url_for('admin_panel')
    return redirect(dashboard)
@app.route('/admin/delete_all_items')
@login_required
@admin_required
def delete_all_items():
    """Delete every item from every list (admin only)."""
    Item.query.delete()
    db.session.commit()
    flash('Usunięto wszystkie produkty', 'success')
    dashboard = url_for('admin_panel')
    return redirect(dashboard)
@app.route('/admin/edit_list/<int:list_id>', methods=['GET', 'POST'])
@login_required
@admin_required
def edit_list(list_id):
    """Admin form for editing a list's title, owner, archive flag and expense.

    GET renders the form. POST applies the submitted fields and redirects to the
    admin panel; an invalid owner or amount redirects back to this form.
    """
    l = ShoppingList.query.get_or_404(list_id)
    expenses = Expense.query.filter_by(list_id=list_id).all()
    total_expense = sum(e.amount for e in expenses)
    users = User.query.all()
    if request.method == 'POST':
        new_title = request.form.get('title')
        new_amount_str = request.form.get('amount')
        is_archived = 'archived' in request.form
        new_owner_id = request.form.get('owner_id')
        # A blank title leaves the current one untouched.
        if new_title and new_title.strip():
            l.title = new_title.strip()
        l.is_archived = is_archived
        if new_owner_id:
            try:
                new_owner_id_int = int(new_owner_id)
                if User.query.get(new_owner_id_int):
                    l.owner_id = new_owner_id_int
                else:
                    flash('Wybrany użytkownik nie istnieje', 'danger')
                    return redirect(url_for('edit_list', list_id=list_id))
            except ValueError:
                flash('Niepoprawny ID użytkownika', 'danger')
                return redirect(url_for('edit_list', list_id=list_id))
        if new_amount_str:
            try:
                new_amount = float(new_amount_str)
                # The submitted amount replaces all existing expenses of the list
                # with a single new expense row.
                if expenses:
                    for expense in expenses:
                        db.session.delete(expense)
                    db.session.commit()
                new_expense = Expense(list_id=list_id, amount=new_amount)
                db.session.add(new_expense)
                db.session.commit()
                flash('Zaktualizowano tytuł, właściciela, archiwizację i/lub kwotę wydatku', 'success')
            except ValueError:
                # Returning here skips the commit, so the field changes made
                # above are not saved by this function.
                flash('Niepoprawna kwota', 'danger')
                return redirect(url_for('edit_list', list_id=list_id))
        else:
            db.session.commit()
            flash('Zaktualizowano tytuł, właściciela i/lub archiwizację', 'success')
        return redirect(url_for('admin_panel'))
    return render_template('admin/edit_list.html', list=l, total_expense=total_expense, users=users)
@app.route('/admin/products')
@login_required
@admin_required
def list_products():
    """Render all items (newest first) with author names and known suggestions."""
    items = Item.query.order_by(Item.id.desc()).all()

    users_dict = {}
    for account in User.query.all():
        users_dict[account.id] = account.username

    # Stable ordering of suggestions; keyed by lower-cased name.
    suggestions_dict = {}
    for suggestion in SuggestedProduct.query.order_by(SuggestedProduct.name.asc()).all():
        suggestions_dict[suggestion.name.lower()] = suggestion

    return render_template(
        'admin/list_products.html',
        items=items,
        users_dict=users_dict,
        suggestions_dict=suggestions_dict,
    )
@app.route('/admin/sync_suggestion/<int:item_id>', methods=['POST'])
@login_required
def sync_suggestion_ajax(item_id):
    """Create a suggestion named after an item unless one already exists.

    Responds with JSON; non-admin callers get a 403 payload.
    """
    if not current_user.is_admin:
        return jsonify({'success': False, 'message': 'Brak uprawnień'}), 403

    item = Item.query.get_or_404(item_id)
    # Case-insensitive lookup of an existing suggestion.
    already_there = (
        SuggestedProduct.query
        .filter(func.lower(SuggestedProduct.name) == item.name.lower())
        .first()
    )
    if already_there:
        return jsonify({'success': True, 'message': f'Sugestia dla produktu „{item.name}” już istnieje.'})

    db.session.add(SuggestedProduct(name=item.name))
    db.session.commit()
    return jsonify({'success': True, 'message': f'Utworzono sugestię dla produktu: {item.name}'})
@app.route('/admin/delete_suggestion/<int:suggestion_id>', methods=['POST'])
@login_required
def delete_suggestion_ajax(suggestion_id):
    """Delete the suggested product ``suggestion_id`` and report the result as JSON.

    Non-admin callers receive ``success: False`` with HTTP 403; the lookup
    goes through ``get_or_404``.
    """
    if current_user.is_admin:
        doomed = SuggestedProduct.query.get_or_404(suggestion_id)
        db.session.delete(doomed)
        db.session.commit()
        return jsonify({'success': True, 'message': 'Sugestia została usunięta.'})

    return jsonify({'success': False, 'message': 'Brak uprawnień'}), 403
def _expense_bucket_start(month_index):
    """Return midnight on the first day of the month ``month_index``.

    ``month_index`` is an absolute month number: ``year * 12 + (month - 1)``.
    """
    year, month0 = divmod(month_index, 12)
    return datetime(year, month0 + 1, 1)


def _sum_expenses_between(start, end):
    """Return the sum of expense amounts with ``start <= added_at < end``, rounded to 2 places."""
    total = (
        db.session.query(func.sum(Expense.amount))
        .filter(Expense.added_at >= start, Expense.added_at < end)
        .scalar() or 0
    )
    return round(total, 2)


@app.route('/admin/expenses_data')
@login_required
def admin_expenses_data():
    """Return expense totals for the admin chart as JSON.

    Query parameters:
        start_date, end_date: optional ``YYYY-MM-DD`` pair. When both are
            given, totals are grouped per month inside that range (both
            days inclusive) and ``range`` is ignored.
        range: ``monthly`` (default, last 12 months), ``quarterly`` (last 4
            quarters), ``halfyearly`` (last 2 half-years) or ``yearly``
            (last 5 years). Any other value yields empty lists.

    Returns:
        ``{"labels": [...], "expenses": [...]}`` with caching disabled,
        ``{"error": ...}`` with HTTP 403 for non-admins, or HTTP 400 when the
        custom dates cannot be parsed.
    """
    if not current_user.is_admin:
        return jsonify({'error': 'Brak uprawnień'}), 403

    range_type = request.args.get('range', 'monthly')
    start_date_str = request.args.get('start_date')
    end_date_str = request.args.get('end_date')
    labels = []
    expenses = []

    if start_date_str and end_date_str:
        try:
            start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
            end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
            # end_date parses to midnight, so comparing with "<=" would drop
            # everything recorded during the end day itself. Use the start
            # of the following day as an exclusive upper bound instead.
            end_exclusive = end_date + timedelta(days=1)
        except (ValueError, OverflowError):
            # Malformed client input is a bad request, not a server error.
            return jsonify({'error': 'Niepoprawny format daty'}), 400

        expenses_query = (
            db.session.query(
                extract('year', Expense.added_at).label('year'),
                extract('month', Expense.added_at).label('month'),
                func.sum(Expense.amount).label('total')
            )
            .filter(Expense.added_at >= start_date, Expense.added_at < end_exclusive)
            .group_by('year', 'month')
            .order_by('year', 'month')
            .all()
        )
        for row in expenses_query:
            labels.append(f"{int(row.month):02d}/{int(row.year)}")
            expenses.append(round(row.total or 0, 2))

        response = make_response(jsonify({'labels': labels, 'expenses': expenses}))
        response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
        return response

    # range name -> (bucket count, months per bucket, label builder taking
    # the bucket's year and zero-based starting month).
    range_specs = {
        'monthly': (12, 1, lambda year, month0: f"{month0 + 1:02d}/{year}"),
        'quarterly': (4, 3, lambda year, month0: f"Q{month0 // 3 + 1}/{year}"),
        'halfyearly': (2, 6, lambda year, month0: f"H{month0 // 6 + 1}/{year}"),
        'yearly': (5, 12, lambda year, month0: str(year)),
    }
    spec = range_specs.get(range_type)
    if spec is not None:
        bucket_count, span, make_label = spec
        now = datetime.utcnow()
        # Work in whole calendar months. Stepping back by 30/90/180 days can
        # land in the same period twice or jump over one, which produced
        # duplicated or missing chart buckets.
        current_index = now.year * 12 + (now.month - 1)
        current_index -= current_index % span  # align to the bucket start
        for offset in range(bucket_count - 1, -1, -1):
            start_index = current_index - offset * span
            year, month0 = divmod(start_index, 12)
            labels.append(make_label(year, month0))
            expenses.append(_sum_expenses_between(
                _expense_bucket_start(start_index),
                _expense_bucket_start(start_index + span),
            ))

    response = make_response(jsonify({'labels': labels, 'expenses': expenses}))
    response.headers["Cache-Control"] = "no-store, no-cache"
    return response
@app.route('/admin/promote_user/<int:user_id>')
@login_required
@admin_required
def promote_user(user_id):
    """Set ``is_admin`` on the user ``user_id`` and redirect to the user list."""
    # NOTE(review): no ``methods=`` is given, so this state change is
    # reachable with a plain GET; consider requiring POST.
    target = User.query.get_or_404(user_id)
    target.is_admin = True
    db.session.commit()

    message = f'Użytkownik {target.username} został ustawiony jako admin.'
    flash(message, 'success')
    return redirect(url_for('list_users'))
@app.route('/admin/demote_user/<int:user_id>')
@login_required
@admin_required
def demote_user(user_id):
    """Clear ``is_admin`` on the user ``user_id`` and redirect to the user list.

    The change is refused, with a flashed error, when the target is the
    current user or when it would leave no administrator.
    """
    target = User.query.get_or_404(user_id)

    if target.id == current_user.id:
        flash('Nie możesz zdegradować samego siebie!', 'danger')
    elif User.query.filter_by(is_admin=True).count() <= 1 and target.is_admin:
        flash('Nie można zdegradować. Musi pozostać co najmniej jeden administrator.', 'danger')
    else:
        target.is_admin = False
        db.session.commit()
        flash(f'Użytkownik {target.username} został zdegradowany.', 'success')

    return redirect(url_for('list_users'))
# =========================================================================================
# SOCKET.IO
# =========================================================================================
@socketio.on('delete_item')
def handle_delete_item(data):
    """Delete an item and broadcast the removal plus refreshed progress.

    Args:
        data: event payload; ``data['item_id']`` identifies the item.

    Emits ``item_deleted`` and ``progress_updated`` to the room named after
    the item's list id. Unknown item ids are ignored.
    """
    item = Item.query.get(data['item_id'])
    if not item:
        return

    # Read everything needed from the row *before* deleting it, so nothing
    # below touches attributes of an instance that no longer has a row.
    item_id = item.id
    list_id = item.list_id
    room = str(list_id)

    db.session.delete(item)
    db.session.commit()

    emit('item_deleted', {'item_id': item_id}, to=room)
    purchased_count, total_count, percent = get_progress(list_id)
    emit('progress_updated', {
        'purchased_count': purchased_count,
        'total_count': total_count,
        'percent': percent
    }, to=room)
@socketio.on('edit_item')
def handle_edit_item(data):
    """Rename an item and/or change its quantity, then broadcast the edit.

    Args:
        data: event payload with ``item_id``, ``new_name`` and an optional
            ``new_quantity`` (defaults to the item's current quantity).

    The edit is ignored when the item does not exist or the new name is
    blank. A quantity that is not an integer, or is below 1, becomes 1.
    Emits ``item_edited`` to the room named after the item's list id.
    """
    item = Item.query.get(data['item_id'])
    new_name = data['new_name'].strip()
    # Check for a missing item first: the quantity default below reads
    # ``item.quantity`` and would raise AttributeError on None.
    if not item or not new_name:
        return

    try:
        new_quantity = max(int(data.get('new_quantity', item.quantity)), 1)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures fall back to 1; other errors propagate.
        new_quantity = 1

    item.name = new_name
    item.quantity = new_quantity
    db.session.commit()

    emit('item_edited', {
        'item_id': item.id,
        'new_name': item.name,
        'new_quantity': item.quantity
    }, to=str(item.list_id))
@socketio.on('join_list')
def handle_join(data):
    """Add the client to a list's room and announce the new participant.

    Args:
        data: event payload with ``room`` and an optional ``username``
            (defaults to ``'Gość'``).

    Records the username in ``active_users`` for the room, emits
    ``user_joined`` and ``user_list`` to the room, and sends
    ``joined_confirmation`` with the list title back to the caller.
    """
    room = str(data['room'])
    username = data.get('username', 'Gość')

    join_room(room)
    active_users.setdefault(room, set()).add(username)

    shopping_list = ShoppingList.query.get(int(data['room']))
    if shopping_list:
        list_title = shopping_list.title
    else:
        list_title = "Twoja lista"

    emit('user_joined', {'username': username}, to=room)
    emit('user_list', {'users': list(active_users[room])}, to=room)
    emit('joined_confirmation', {'room': room, 'list_title': list_title})
@socketio.on('disconnect')
def handle_disconnect(sid=None):
    """Remove the disconnecting user from every room and notify the rooms.

    Args:
        sid: unused. It only absorbs the optional positional argument that
            Flask-SocketIO may pass to disconnect handlers. Depending on the
            installed version the handler is called with no arguments or
            with a disconnect reason, so the parameter needs a default to
            work with both.

    The username is taken from ``current_user`` (``"Gość"`` for anonymous
    clients). For each room in ``active_users`` that contains it, the name
    is removed and ``user_left`` / ``user_list`` are emitted to that room.
    """
    username = current_user.username if current_user.is_authenticated else "Gość"
    for room, users in active_users.items():
        if username in users:
            users.remove(username)
            emit('user_left', {'username': username}, to=room)
            emit('user_list', {'users': list(users)}, to=room)
@socketio.on('add_item')
def handle_add_item(data):
    """Create an item on a list and broadcast it with refreshed progress.

    Args:
        data: event payload with ``list_id``, ``name`` and an optional
            ``quantity`` (defaults to 1).

    Blank names are ignored. A quantity that is not an integer, or is below
    1, becomes 1. A ``SuggestedProduct`` with the same name is created when
    none exists yet. Emits ``item_added`` and ``progress_updated`` to the
    room named after the list id.
    """
    list_id = data['list_id']

    # Same rule as handle_edit_item: trim the name and refuse empty ones, so
    # blank rows are never stored or broadcast.
    name = str(data.get('name') or '').strip()
    if not name:
        return

    try:
        quantity = max(int(data.get('quantity', 1)), 1)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures fall back to 1; other errors propagate.
        quantity = 1

    new_item = Item(
        list_id=list_id,
        name=name,
        quantity=quantity,
        added_by=current_user.id if current_user.is_authenticated else None
    )
    db.session.add(new_item)

    if not SuggestedProduct.query.filter_by(name=name).first():
        db.session.add(SuggestedProduct(name=name))

    db.session.commit()

    emit('item_added', {
        'id': new_item.id,
        'name': new_item.name,
        'quantity': new_item.quantity,
        'added_by': current_user.username if current_user.is_authenticated else 'Gość'
    }, to=str(list_id), include_self=True)

    purchased_count, total_count, percent = get_progress(list_id)
    emit('progress_updated', {
        'purchased_count': purchased_count,
        'total_count': total_count,
        'percent': percent
    }, to=str(list_id))
@socketio.on('check_item')
def handle_check_item(data):
    """Mark an item as purchased and broadcast the change with new progress.

    Args:
        data: event payload; ``data['item_id']`` identifies the item.

    Stores the current UTC time in ``purchased_at``. Emits ``item_checked``
    and ``progress_updated`` to the room named after the item's list id.
    Unknown item ids are ignored.
    """
    item = Item.query.get(data['item_id'])
    if not item:
        return

    item.purchased = True
    item.purchased_at = datetime.utcnow()
    db.session.commit()

    purchased_count, total_count, percent = get_progress(item.list_id)
    progress_payload = {
        'purchased_count': purchased_count,
        'total_count': total_count,
        'percent': percent
    }
    emit('item_checked', {'item_id': item.id}, to=str(item.list_id))
    emit('progress_updated', progress_payload, to=str(item.list_id))
@socketio.on('uncheck_item')
def handle_uncheck_item(data):
    """Mark an item as not purchased and broadcast the change with new progress.

    Args:
        data: event payload; ``data['item_id']`` identifies the item.

    Clears ``purchased_at``. Emits ``item_unchecked`` and
    ``progress_updated`` to the room named after the item's list id.
    Unknown item ids are ignored.
    """
    item = Item.query.get(data['item_id'])
    if not item:
        return

    item.purchased = False
    item.purchased_at = None
    db.session.commit()

    purchased_count, total_count, percent = get_progress(item.list_id)
    progress_payload = {
        'purchased_count': purchased_count,
        'total_count': total_count,
        'percent': percent
    }
    emit('item_unchecked', {'item_id': item.id}, to=str(item.list_id))
    emit('progress_updated', progress_payload, to=str(item.list_id))
@socketio.on('request_full_list')
def handle_request_full_list(data):
    """Send the requesting client every item of one list.

    Args:
        data: event payload; ``data['list_id']`` selects the list.

    Emits ``full_list`` to the caller's own session only. Each entry carries
    ``id``, ``name``, ``quantity``, ``purchased`` and ``note`` (an empty
    string when the item has no note).
    """
    rows = Item.query.filter_by(list_id=data['list_id']).all()
    payload = [
        {
            'id': row.id,
            'name': row.name,
            'quantity': row.quantity,
            'purchased': row.purchased,
            'note': row.note or ''
        }
        for row in rows
    ]
    emit('full_list', {'items': payload}, to=request.sid)
@socketio.on('update_note')
def handle_update_note(data):
    """Store a new note on an item and broadcast it.

    Args:
        data: event payload with ``item_id`` and ``note``.

    Emits ``note_updated`` to the room named after the item's list id.
    Unknown item ids are ignored.
    """
    item_id, note = data['item_id'], data['note']
    item = Item.query.get(item_id)
    if not item:
        return

    item.note = note
    db.session.commit()
    emit('note_updated', {'item_id': item_id, 'note': note}, to=str(item.list_id))
@socketio.on('add_expense')
def handle_add_expense(data):
    """Record an expense for a list and broadcast the new running total.

    Args:
        data: event payload with ``list_id`` and ``amount``.

    An amount that cannot be converted to ``float`` is ignored. Otherwise
    emits ``expense_added`` with the stored amount and the list's summed
    total to the room named after the list id.
    """
    list_id = data['list_id']

    # The payload comes straight from the client. Convert it before it
    # reaches the database, as the admin edit form does for its own amount
    # field, so a non-numeric value is never stored or echoed to the room.
    try:
        amount = float(data['amount'])
    except (TypeError, ValueError):
        return

    db.session.add(Expense(list_id=list_id, amount=amount))
    db.session.commit()

    total = db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar() or 0
    emit('expense_added', {
        'amount': amount,
        'total': total
    }, to=str(list_id))
""" @socketio.on('receipt_uploaded')
def handle_receipt_uploaded(data):
list_id = data['list_id']
url = data['url']
emit('receipt_added', {
'url': url
}, to=str(list_id), include_self=False) """
@app.cli.command('create_db')
def create_db():
    """CLI command ``create_db``: run ``db.create_all()`` and print a confirmation."""
    db.create_all()
    print('Database created.')
if __name__ == '__main__':
    # Direct-execution entry point: serve the app through Socket.IO on all
    # interfaces, port 8000.
    # NOTE(review): debug=True together with host 0.0.0.0 exposes debug
    # behavior to the network; confirm this path is used for development only.
    socketio.run(
        app,
        host='0.0.0.0',
        port=8000,
        debug=True,
    )