Files
lista_zakupowa_live/app.py
Mateusz Gruszczyński 84d902deb1 cookie value
2025-07-03 22:52:09 +02:00

635 lines
22 KiB
Python

import os
import secrets
import time
from datetime import datetime, timedelta
from flask import Flask, render_template, redirect, url_for, request, flash, Blueprint, send_from_directory, request
from markupsafe import Markup
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_socketio import SocketIO, emit, join_room
from werkzeug.security import generate_password_hash, check_password_hash
from config import Config
from PIL import Image
from werkzeug.utils import secure_filename
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import func
app = Flask(__name__)
app.config.from_object(Config)
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1, x_host=1)
SYSTEM_PASSWORD = app.config.get('SYSTEM_PASSWORD', 'changeme')
DEFAULT_ADMIN_USERNAME = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin')
DEFAULT_ADMIN_PASSWORD = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123')
UPLOAD_FOLDER = app.config.get('UPLOAD_FOLDER', 'uploads')
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
AUTHORIZED_COOKIE_VALUE = app.config.get('AUTHORIZED_COOKIE_VALUE', '80d31cdfe63539c9')
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
db = SQLAlchemy(app)
socketio = SocketIO(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
static_bp = Blueprint('static_bp', __name__)
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(150), unique=True, nullable=False)
password_hash = db.Column(db.String(150), nullable=False)
is_admin = db.Column(db.Boolean, default=False)
class ShoppingList(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(150), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
owner_id = db.Column(db.Integer, db.ForeignKey('user.id'))
is_temporary = db.Column(db.Boolean, default=False)
share_token = db.Column(db.String(64), unique=True, nullable=True)
expires_at = db.Column(db.DateTime, nullable=True)
owner = db.relationship('User', backref='lists', lazy=True)
is_archived = db.Column(db.Boolean, default=False)
class Item(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id'))
name = db.Column(db.String(150), nullable=False)
added_at = db.Column(db.DateTime, default=datetime.utcnow)
added_by = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
purchased = db.Column(db.Boolean, default=False)
purchased_at = db.Column(db.DateTime, nullable=True)
note = db.Column(db.Text, nullable=True)
class SuggestedProduct(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(150), unique=True, nullable=False)
@static_bp.route('/static/js/live.js')
def serve_live_js():
response = send_from_directory('static/js', 'live.js')
response.cache_control.no_cache = True
response.cache_control.no_store = True
response.cache_control.must_revalidate = True
response.expires = 0
response.pragma = 'no-cache'
return response
app.register_blueprint(static_bp)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def get_progress(list_id):
items = Item.query.filter_by(list_id=list_id).all()
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
return purchased_count, total_count, percent
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
@app.context_processor
def inject_time():
return dict(time=time)
@app.context_processor
def inject_has_authorized_cookie():
return {'has_authorized_cookie': 'authorized' in request.cookies}
@app.before_request
def require_system_password():
if 'authorized' not in request.cookies \
and request.endpoint != 'system_auth' \
and not request.endpoint.startswith('static') \
and not request.endpoint.startswith('login'):
# Jeśli wchodzi na '/', nie dodawaj next
if request.path == '/':
return redirect(url_for('system_auth'))
else:
# W innym przypadku poprawiamy URL jak wcześniej
from urllib.parse import urlparse, urlunparse
parsed = urlparse(request.url)
fixed_url = urlunparse(parsed._replace(netloc=request.host))
return redirect(url_for('system_auth', next=fixed_url))
@app.template_filter('filemtime')
def file_mtime_filter(path):
try:
t = os.path.getmtime(path)
return datetime.fromtimestamp(t)
except Exception:
return datetime.utcnow()
@app.template_filter('filesizeformat')
def filesizeformat_filter(path):
try:
size = os.path.getsize(path)
# Jeśli chcesz dokładniejszy format, np. KB, MB
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
except Exception:
return "N/A"
@app.route('/system-auth', methods=['GET', 'POST'])
def system_auth():
next_page = request.args.get('next') or url_for('index_guest')
if request.method == 'POST':
if request.form['password'] == SYSTEM_PASSWORD:
db.create_all()
if not User.query.filter_by(is_admin=True).first():
admin_user = User(
username=DEFAULT_ADMIN_USERNAME,
password_hash=generate_password_hash(DEFAULT_ADMIN_PASSWORD),
is_admin=True
)
db.session.add(admin_user)
db.session.commit()
flash(f'Utworzono konto administratora: login={DEFAULT_ADMIN_USERNAME}, hasło={DEFAULT_ADMIN_PASSWORD}')
resp = redirect(next_page)
resp.set_cookie('authorized', AUTHORIZED_COOKIE_VALUE)
return resp
flash('Nieprawidłowe hasło do systemu','danger')
return render_template('system_auth.html')
@app.route('/')
def index_guest():
lists = ShoppingList.query.all()
for l in lists:
items = Item.query.filter_by(list_id=l.id).all()
l.total_count = len(items)
l.purchased_count = len([i for i in items if i.purchased])
return render_template('index.html', lists=lists)
@app.errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404
@app.route('/favicon.svg')
def favicon():
svg = '''
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16">
<text y="14" font-size="16">🛒</text>
</svg>
'''
return svg, 200, {'Content-Type': 'image/svg+xml'}
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
user = User.query.filter_by(username=request.form['username']).first()
if user and check_password_hash(user.password_hash, request.form['password']):
login_user(user)
flash('Zalogowano pomyślnie', 'success')
return redirect(url_for('index_guest'))
flash('Nieprawidłowy login lub hasło', 'danger')
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('Wylogowano pomyślnie', 'success')
return redirect(url_for('login'))
@app.route('/create', methods=['POST'])
@login_required
def create_list():
title = request.form.get('title')
is_temporary = 'temporary' in request.form
token = secrets.token_hex(16)
expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None
new_list = ShoppingList(title=title, owner_id=current_user.id, is_temporary=is_temporary, share_token=token, expires_at=expires_at)
db.session.add(new_list)
db.session.commit()
flash('Utworzono nową listę', 'success')
return redirect(url_for('view_list', list_id=new_list.id))
@app.route('/list/<int:list_id>')
@login_required
def view_list(list_id):
shopping_list = ShoppingList.query.get_or_404(list_id)
items = Item.query.filter_by(list_id=list_id).all()
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
receipt_pattern = f"list_{list_id}"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipt_files = [f for f in all_files if receipt_pattern in f]
return render_template(
'list.html',
list=shopping_list,
items=items,
receipt_files=receipt_files,
total_count=total_count,
purchased_count=purchased_count,
percent=percent
)
@app.route('/share/<token>')
def share_list(token):
shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
items = Item.query.filter_by(list_id=shopping_list.id).all()
receipt_pattern = f"list_{shopping_list.id}"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipt_files = [f for f in all_files if receipt_pattern in f]
return render_template('list_guest.html', list=shopping_list, items=items, receipt_files=receipt_files)
@app.route('/guest-list/<int:list_id>')
def guest_list(list_id):
shopping_list = ShoppingList.query.get_or_404(list_id)
items = Item.query.filter_by(list_id=list_id).all()
receipt_pattern = f"list_{list_id}"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipt_files = [f for f in all_files if receipt_pattern in f]
return render_template('list_guest.html', list=shopping_list, items=items, receipt_files=receipt_files)
@app.route('/copy/<int:list_id>')
@login_required
def copy_list(list_id):
original = ShoppingList.query.get_or_404(list_id)
token = secrets.token_hex(16)
new_list = ShoppingList(title=original.title + ' (Kopia)', owner_id=current_user.id, share_token=token)
db.session.add(new_list)
db.session.commit()
original_items = Item.query.filter_by(list_id=original.id).all()
for item in original_items:
copy_item = Item(list_id=new_list.id, name=item.name)
db.session.add(copy_item)
db.session.commit()
flash('Skopiowano listę', 'success')
return redirect(url_for('view_list', list_id=new_list.id))
@app.route('/suggest_products')
def suggest_products():
query = request.args.get('q', '')
suggestions = []
if query:
suggestions = SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f'%{query}%')).limit(5).all()
return {'suggestions': [s.name for s in suggestions]}
@app.route('/upload_receipt/<int:list_id>', methods=['POST'])
def upload_receipt(list_id):
if 'receipt' not in request.files:
flash('Brak pliku', 'danger')
return redirect(request.referrer)
file = request.files['receipt']
if file.filename == '':
flash('Nie wybrano pliku', 'danger')
return redirect(request.referrer)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], f"list_{list_id}_{filename}")
img = Image.open(file)
img.thumbnail((800, 800))
img.save(file_path)
flash('Wgrano paragon', 'success')
return redirect(request.referrer)
flash('Niedozwolony format pliku', 'danger')
return redirect(request.referrer)
@app.route('/uploads/<filename>')
def uploaded_file(filename):
response = send_from_directory(app.config['UPLOAD_FOLDER'], filename)
response.headers['Cache-Control'] = 'public, max-age=2592000, immutable'
response.headers.pop('Pragma', None)
return response
@app.route('/update-note/<int:item_id>', methods=['POST'])
def update_note(item_id):
item = Item.query.get_or_404(item_id)
note = request.form.get('note')
item.note = note
db.session.commit()
return {'success': True}
@app.route('/admin')
@login_required
def admin_panel():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all()
# Pobierz folder uploadów
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
enriched_lists = []
for l in all_lists:
items = Item.query.filter_by(list_id=l.id).all()
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
comments_count = len([i for i in items if i.note and i.note.strip() != ''])
purchased_items_count = Item.query.filter_by(purchased=True).count()
receipt_pattern = f"list_{l.id}"
receipt_files = [f for f in all_files if receipt_pattern in f]
enriched_lists.append({
'list': l,
'total_count': total_count,
'purchased_count': purchased_count,
'percent': round(percent),
'comments_count': comments_count,
'receipts_count': len(receipt_files)
})
top_products = (
db.session.query(Item.name, func.count(Item.id).label('count'))
.filter(Item.purchased == True)
.group_by(Item.name)
.order_by(func.count(Item.id).desc())
.limit(5)
.all()
)
return render_template(
'admin/admin_panel.html',
user_count=user_count,
list_count=list_count,
item_count=item_count,
purchased_items_count=purchased_items_count,
enriched_lists=enriched_lists,
top_products=top_products,
)
@app.route('/admin/delete_list/<int:list_id>')
@login_required
def delete_list(list_id):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
list_to_delete = ShoppingList.query.get_or_404(list_id)
Item.query.filter_by(list_id=list_to_delete.id).delete()
db.session.delete(list_to_delete)
db.session.commit()
flash(f'Usunięto listę: {list_to_delete.title}', 'success')
return redirect(url_for('admin_panel'))
@app.route('/admin/delete_all_lists')
@login_required
def delete_all_lists():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
Item.query.delete()
ShoppingList.query.delete()
db.session.commit()
flash('Usunięto wszystkie listy', 'success')
return redirect(url_for('admin_panel'))
@app.route('/admin/add_user', methods=['GET', 'POST'])
@login_required
def add_user():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
if request.method == 'POST':
username = request.form['username']
password = generate_password_hash(request.form['password'])
new_user = User(username=username, password_hash=password)
db.session.add(new_user)
db.session.commit()
flash('Dodano nowego użytkownika', 'success')
return redirect(url_for('admin_panel'))
return render_template('admin/add_user.html')
@app.route('/admin/users')
@login_required
def list_users():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
users = User.query.all()
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"]
return render_template('admin/list_users.html', users=users, user_count=user_count, list_count=list_count, item_count=item_count, activity_log=activity_log)
@app.route('/admin/reset_password/<int:user_id>', methods=['GET', 'POST'])
@login_required
def reset_password(user_id):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
user = User.query.get_or_404(user_id)
if request.method == 'POST':
new_password = generate_password_hash(request.form['password'])
user.password_hash = new_password
db.session.commit()
flash('Hasło zresetowane', 'success')
return redirect(url_for('list_users'))
return render_template('admin/reset_password.html', user=user)
@app.route('/admin/delete_user/<int:user_id>')
@login_required
def delete_user(user_id):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
user = User.query.get_or_404(user_id)
db.session.delete(user)
db.session.commit()
flash('Użytkownik usunięty', 'success')
return redirect(url_for('list_users'))
@app.route('/admin/receipts')
@login_required
def admin_receipts():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
image_files = [f for f in all_files if allowed_file(f)]
return render_template(
'admin/receipts.html',
image_files=image_files,
upload_folder=app.config['UPLOAD_FOLDER']
)
@app.route('/admin/delete_receipt/<filename>')
@login_required
def delete_receipt(filename):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
if os.path.exists(file_path):
os.remove(file_path)
flash('Plik usunięty', 'success')
else:
flash('Plik nie istnieje', 'danger')
return redirect(url_for('admin_receipts'))
@app.route('/admin/edit_list/<int:list_id>', methods=['GET', 'POST'])
@login_required
def edit_list(list_id):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
l = ShoppingList.query.get_or_404(list_id)
if request.method == 'POST':
new_title = request.form.get('title')
if new_title:
l.title = new_title
db.session.commit()
flash('Zaktualizowano tytuł listy', 'success')
return redirect(url_for('admin_panel'))
return render_template('admin/edit_list.html', list=l)
@app.route('/admin/delete_selected_lists', methods=['POST'])
@login_required
def delete_selected_lists():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
ids = request.form.getlist('list_ids')
for list_id in ids:
lst = ShoppingList.query.get(int(list_id))
if lst:
Item.query.filter_by(list_id=lst.id).delete()
db.session.delete(lst)
db.session.commit()
flash('Usunięto wybrane listy', 'success')
return redirect(url_for('admin_panel'))
@app.route('/admin/archive_list/<int:list_id>')
@login_required
def archive_list(list_id):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
l = ShoppingList.query.get_or_404(list_id)
l.is_archived = True
db.session.commit()
flash('Lista oznaczona jako archiwalna', 'success')
return redirect(url_for('admin_panel'))
@app.route('/admin/delete_all_items')
@login_required
def delete_all_items():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
Item.query.delete()
db.session.commit()
flash('Usunięto wszystkie produkty', 'success')
return redirect(url_for('admin_panel'))
@socketio.on('delete_item')
def handle_delete_item(data):
item = Item.query.get(data['item_id'])
if item:
db.session.delete(item)
db.session.commit()
emit('item_deleted', {'item_id': item.id}, to=str(item.list_id))
@socketio.on('edit_item')
def handle_edit_item(data):
item = Item.query.get(data['item_id'])
new_name = data['new_name']
if item and new_name.strip():
item.name = new_name
db.session.commit()
emit('item_edited', {'item_id': item.id, 'new_name': item.name}, to=str(item.list_id))
@socketio.on('join_list')
def handle_join(data):
room = str(data['room'])
username = data.get('username', 'Gość')
join_room(room)
emit('user_joined', {'username': username}, to=room)
@socketio.on('add_item')
def handle_add_item(data):
list_id = data['list_id']
name = data['name']
new_item = Item(
list_id=list_id,
name=name,
added_by=current_user.id if current_user.is_authenticated else None
)
db.session.add(new_item)
if not SuggestedProduct.query.filter_by(name=name).first():
new_suggestion = SuggestedProduct(name=name)
db.session.add(new_suggestion)
db.session.commit()
emit('item_added', {
'id': new_item.id,
'name': new_item.name,
'added_by': current_user.username if current_user.is_authenticated else 'Gość'
}, to=str(list_id), include_self=True)
@socketio.on('check_item')
def handle_check_item(data):
item = Item.query.get(data['item_id'])
if item:
item.purchased = True
item.purchased_at = datetime.utcnow()
db.session.commit()
purchased_count, total_count, percent = get_progress(item.list_id)
emit('item_checked', {'item_id': item.id}, to=str(item.list_id))
emit('progress_updated', {
'purchased_count': purchased_count,
'total_count': total_count,
'percent': percent
}, to=str(item.list_id))
@socketio.on('uncheck_item')
def handle_uncheck_item(data):
item = Item.query.get(data['item_id'])
if item:
item.purchased = False
item.purchased_at = None
db.session.commit()
purchased_count, total_count, percent = get_progress(item.list_id)
emit('item_unchecked', {'item_id': item.id}, to=str(item.list_id))
emit('progress_updated', {
'purchased_count': purchased_count,
'total_count': total_count,
'percent': percent
}, to=str(item.list_id))
@socketio.on('update_note')
def handle_update_note(data):
item_id = data['item_id']
note = data['note']
item = Item.query.get(item_id)
if item:
item.note = note
db.session.commit()
emit('note_updated', {'item_id': item_id, 'note': note}, to=str(item.list_id))
@app.cli.command('create_db')
def create_db():
db.create_all()
print('Database created.')
if __name__ == '__main__':
socketio.run(app, host='0.0.0.0', port=8000, debug=True)