Files
lista_zakupowa_live/app.py
Mateusz Gruszczyński a90fc66c06 livs offline
2025-07-06 17:09:46 +02:00

984 lines
35 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import secrets
import time
import mimetypes
from datetime import datetime, timedelta
from flask import Flask, render_template, redirect, url_for, request, flash, Blueprint, send_from_directory, request, abort, session
from markupsafe import Markup
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_socketio import SocketIO, emit, join_room
from werkzeug.security import generate_password_hash, check_password_hash
from config import Config
from PIL import Image
from werkzeug.utils import secure_filename
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import func, extract
from collections import defaultdict, deque
app = Flask(__name__)
app.config.from_object(Config)
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
SYSTEM_PASSWORD = app.config.get('SYSTEM_PASSWORD', 'changeme')
DEFAULT_ADMIN_USERNAME = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin')
DEFAULT_ADMIN_PASSWORD = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123')
UPLOAD_FOLDER = app.config.get('UPLOAD_FOLDER', 'uploads')
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
AUTHORIZED_COOKIE_VALUE = app.config.get('AUTHORIZED_COOKIE_VALUE', '80d31cdfe63539c9')
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
failed_login_attempts = defaultdict(deque)
MAX_ATTEMPTS = 10
TIME_WINDOW = 60 * 60
db = SQLAlchemy(app)
socketio = SocketIO(app, async_mode="eventlet")
login_manager = LoginManager(app)
login_manager.login_view = 'login'
static_bp = Blueprint('static_bp', __name__)
# dla live
active_users = {}
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(150), unique=True, nullable=False)
password_hash = db.Column(db.String(150), nullable=False)
is_admin = db.Column(db.Boolean, default=False)
class ShoppingList(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(150), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
owner_id = db.Column(db.Integer, db.ForeignKey('user.id'))
is_temporary = db.Column(db.Boolean, default=False)
share_token = db.Column(db.String(64), unique=True, nullable=True)
expires_at = db.Column(db.DateTime, nullable=True)
owner = db.relationship('User', backref='lists', lazy=True)
is_archived = db.Column(db.Boolean, default=False)
is_public = db.Column(db.Boolean, default=True)
class Item(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id'))
name = db.Column(db.String(150), nullable=False)
added_at = db.Column(db.DateTime, default=datetime.utcnow)
added_by = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
purchased = db.Column(db.Boolean, default=False)
purchased_at = db.Column(db.DateTime, nullable=True)
quantity = db.Column(db.Integer, default=1)
note = db.Column(db.Text, nullable=True)
class SuggestedProduct(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(150), unique=True, nullable=False)
class Expense(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id'))
amount = db.Column(db.Float, nullable=False)
added_at = db.Column(db.DateTime, default=datetime.utcnow)
receipt_filename = db.Column(db.String(255), nullable=True)
with app.app_context():
# Twój kod inicjalizacyjny, np. utworzenie konta admina
db.create_all()
from werkzeug.security import generate_password_hash
admin = User.query.filter_by(is_admin=True).first()
username = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin')
password = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123')
password_hash = generate_password_hash(password)
if admin:
# Aktualizacja jeśli dane się różnią
if admin.username != username or not check_password_hash(admin.password_hash, password):
admin.username = username
admin.password_hash = password_hash
db.session.commit()
else:
# Brak admina utwórz nowe konto
admin = User(username=username, password_hash=password_hash, is_admin=True)
db.session.add(admin)
db.session.commit()
@static_bp.route('/static/js/<path:filename>')
def serve_js(filename):
response = send_from_directory('static/js', filename)
response.cache_control.no_cache = True
response.cache_control.no_store = True
response.cache_control.must_revalidate = True
#response.expires = 0
response.pragma = 'no-cache'
response.headers.pop('Content-Disposition', None)
response.headers.pop('Etag', None)
return response
@static_bp.route('/static/css/<path:filename>')
def serve_css(filename):
response = send_from_directory('static/css', filename)
#response.cache_control.public = True
#response.cache_control.max_age = 3600
response.headers['Cache-Control'] = 'public, max-age=3600'
response.headers.pop('Content-Disposition', None)
response.headers.pop('Etag', None)
#response.expires = 0
return response
@static_bp.route('/static/lib/js/<path:filename>')
def serve_js_lib(filename):
response = send_from_directory('static/lib/js', filename)
response.headers['Cache-Control'] = 'public, max-age=604800'
response.headers.pop('Content-Disposition', None)
response.headers.pop('Etag', None)
return response
# CSS z cache na tydzień
@static_bp.route('/static/lib/css/<path:filename>')
def serve_css_lib(filename):
response = send_from_directory('static/lib/css', filename)
response.headers['Cache-Control'] = 'public, max-age=604800'
response.headers.pop('Content-Disposition', None)
response.headers.pop('Etag', None)
return response
app.register_blueprint(static_bp)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def get_progress(list_id):
items = Item.query.filter_by(list_id=list_id).all()
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
return purchased_count, total_count, percent
def delete_receipts_for_list(list_id):
receipt_pattern = f"list_{list_id}_"
upload_folder = app.config['UPLOAD_FOLDER']
for filename in os.listdir(upload_folder):
if filename.startswith(receipt_pattern):
try:
os.remove(os.path.join(upload_folder, filename))
except Exception as e:
print(f"Nie udało się usunąć pliku {filename}: {e}")
# zabezpieczenie logowani do systemy - błędne hasła
def is_ip_blocked(ip):
now = time.time()
attempts = failed_login_attempts[ip]
while attempts and now - attempts[0] > TIME_WINDOW:
attempts.popleft()
return len(attempts) >= MAX_ATTEMPTS
def register_failed_attempt(ip):
now = time.time()
attempts = failed_login_attempts[ip]
while attempts and now - attempts[0] > TIME_WINDOW:
attempts.popleft()
attempts.append(now)
def reset_failed_attempts(ip):
failed_login_attempts[ip].clear()
def attempts_remaining(ip):
attempts = failed_login_attempts[ip]
return max(0, MAX_ATTEMPTS - len(attempts))
####################################################
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
@app.context_processor
def inject_time():
return dict(time=time)
@app.context_processor
def inject_has_authorized_cookie():
return {'has_authorized_cookie': 'authorized' in request.cookies}
@app.before_request
def require_system_password():
if 'authorized' not in request.cookies \
and request.endpoint != 'system_auth' \
and not request.endpoint.startswith('login') \
and request.endpoint != 'favicon':
# specjalny wyjątek dla statycznych, ale sprawdzany ręcznie niżej
if request.endpoint == 'static_bp.serve_js':
# tu sprawdzamy czy to JS, który ma być chroniony
protected_js = ["live.js", "list_guest.js", "hide_list.js", "socket_reconnect.js"]
requested_file = request.view_args.get("filename", "")
if requested_file in protected_js:
return redirect(url_for('system_auth', next=request.url))
else:
return # pozwól na inne pliki statyczne
if request.endpoint.startswith('static_bp.'):
return # np. CSS, favicon, inne — pozwól
if request.path == '/':
return redirect(url_for('system_auth'))
else:
from urllib.parse import urlparse, urlunparse
parsed = urlparse(request.url)
fixed_url = urlunparse(parsed._replace(netloc=request.host))
return redirect(url_for('system_auth', next=fixed_url))
@app.template_filter('filemtime')
def file_mtime_filter(path):
try:
t = os.path.getmtime(path)
return datetime.fromtimestamp(t)
except Exception:
return datetime.utcnow()
@app.template_filter('filesizeformat')
def filesizeformat_filter(path):
try:
size = os.path.getsize(path)
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
except Exception:
return "N/A"
@app.errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404
@app.errorhandler(403)
def forbidden(e):
return '403 Forbidden', 403
@app.route('/favicon.svg')
def favicon():
svg = '''
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16">
<text y="14" font-size="16">🛒</text>
</svg>
'''
return svg, 200, {'Content-Type': 'image/svg+xml'}
@app.route('/')
def index_guest():
now = datetime.utcnow()
if current_user.is_authenticated:
# Twoje listy
user_lists = ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=False).filter(
(ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)
).order_by(ShoppingList.created_at.desc()).all()
public_lists = ShoppingList.query.filter(
ShoppingList.is_public == True,
ShoppingList.owner_id != current_user.id,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
ShoppingList.is_archived == False
).order_by(ShoppingList.created_at.desc()).all()
else:
user_lists = []
public_lists = ShoppingList.query.filter(
ShoppingList.is_public == True,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
ShoppingList.is_archived == False
).order_by(ShoppingList.created_at.desc()).all()
for l in user_lists + public_lists:
items = Item.query.filter_by(list_id=l.id).all()
l.total_count = len(items)
l.purchased_count = len([i for i in items if i.purchased])
expenses = Expense.query.filter_by(list_id=l.id).all()
l.total_expense = sum(e.amount for e in expenses)
return render_template("main.html", user_lists=user_lists, public_lists=public_lists)
@app.route('/system-auth', methods=['GET', 'POST'])
def system_auth():
#ip = request.remote_addr
ip = request.access_route[0]
next_page = request.args.get('next') or url_for('index_guest')
if is_ip_blocked(ip):
flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger')
return render_template('system_auth.html'), 403
if request.method == 'POST':
if request.form['password'] == SYSTEM_PASSWORD:
reset_failed_attempts(ip)
resp = redirect(next_page)
resp.set_cookie('authorized', AUTHORIZED_COOKIE_VALUE)
return resp
else:
register_failed_attempt(ip)
if is_ip_blocked(ip):
flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger')
return render_template('system_auth.html'), 403
remaining = attempts_remaining(ip)
flash(f'Nieprawidłowe hasło do systemu. Pozostało prób: {remaining}', 'warning')
return render_template('system_auth.html')
@app.route('/archive_my_list/<int:list_id>')
@login_required
def archive_my_list(list_id):
l = ShoppingList.query.get_or_404(list_id)
if l.owner_id != current_user.id:
flash('Nie masz uprawnień do tej listy', 'danger')
return redirect(url_for('index_guest'))
l.is_archived = True
db.session.commit()
flash('Lista została zarchiwizowana', 'success')
return redirect(url_for('index_guest'))
@app.route('/edit_my_list/<int:list_id>', methods=['GET', 'POST'])
@login_required
def edit_my_list(list_id):
l = ShoppingList.query.get_or_404(list_id)
if l.owner_id != current_user.id:
flash('Nie masz uprawnień do tej listy', 'danger')
return redirect(url_for('index_guest'))
if request.method == 'POST':
new_title = request.form.get('title')
if new_title and new_title.strip():
l.title = new_title.strip()
db.session.commit()
flash('Zaktualizowano tytuł listy', 'success')
return redirect(url_for('index_guest'))
else:
flash('Podaj poprawny tytuł', 'danger')
return render_template('edit_my_list.html', list=l)
@app.route('/toggle_visibility/<int:list_id>', methods=['GET', 'POST'])
@login_required
def toggle_visibility(list_id):
l = ShoppingList.query.get_or_404(list_id)
if l.owner_id != current_user.id:
if request.is_json or request.method == 'POST':
return {'error': 'Unauthorized'}, 403
flash('Nie masz uprawnień do tej listy', 'danger')
return redirect(url_for('index_guest'))
l.is_public = not l.is_public
db.session.commit()
share_url = f"{request.url_root}share/{l.share_token}"
if request.is_json or request.method == 'POST':
return {'is_public': l.is_public, 'share_url': share_url}
if l.is_public:
flash('Lista została udostępniona publicznie', 'success')
else:
flash('Lista została ukryta przed gośćmi', 'info')
return redirect(url_for('index_guest'))
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
user = User.query.filter_by(username=request.form['username']).first()
if user and check_password_hash(user.password_hash, request.form['password']):
login_user(user)
flash('Zalogowano pomyślnie', 'success')
return redirect(url_for('index_guest'))
flash('Nieprawidłowy login lub hasło', 'danger')
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('Wylogowano pomyślnie', 'success')
return redirect(url_for('index_guest'))
@app.route('/create', methods=['POST'])
@login_required
def create_list():
title = request.form.get('title')
is_temporary = 'temporary' in request.form
#token = secrets.token_hex(16)
token = secrets.token_hex(4)
expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None
new_list = ShoppingList(title=title, owner_id=current_user.id, is_temporary=is_temporary, share_token=token, expires_at=expires_at)
db.session.add(new_list)
db.session.commit()
flash('Utworzono nową listę', 'success')
return redirect(url_for('view_list', list_id=new_list.id))
@app.route('/list/<int:list_id>')
@login_required
def view_list(list_id):
shopping_list = ShoppingList.query.get_or_404(list_id)
items = Item.query.filter_by(list_id=list_id).all()
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
receipt_pattern = f"list_{list_id}"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipt_files = [f for f in all_files if receipt_pattern in f]
return render_template(
'list.html',
list=shopping_list,
items=items,
receipt_files=receipt_files,
total_count=total_count,
purchased_count=purchased_count,
percent=percent,
expenses=expenses,
total_expense=total_expense
)
@app.route('/share/<token>')
def share_list(token):
shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
if not shopping_list.is_public:
flash('Ta lista nie jest publicznie dostępna', 'danger')
return redirect(url_for('index_guest'))
items = Item.query.filter_by(list_id=shopping_list.id).all()
receipt_pattern = f"list_{shopping_list.id}"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipt_files = [f for f in all_files if receipt_pattern in f]
expenses = Expense.query.filter_by(list_id=shopping_list.id).all()
total_expense = sum(e.amount for e in expenses)
return render_template(
'list_guest.html',
list=shopping_list,
items=items,
receipt_files=receipt_files,
expenses=expenses,
total_expense=total_expense
)
@app.route('/guest-list/<int:list_id>')
def guest_list(list_id):
shopping_list = ShoppingList.query.get_or_404(list_id)
items = Item.query.filter_by(list_id=list_id).all()
receipt_pattern = f"list_{list_id}"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipt_files = [f for f in all_files if receipt_pattern in f]
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
return render_template(
'list_guest.html',
list=shopping_list,
items=items,
receipt_files=receipt_files,
expenses=expenses,
total_expense=total_expense
)
@app.route('/copy/<int:list_id>')
@login_required
def copy_list(list_id):
original = ShoppingList.query.get_or_404(list_id)
token = secrets.token_hex(16)
new_list = ShoppingList(title=original.title + ' (Kopia)', owner_id=current_user.id, share_token=token)
db.session.add(new_list)
db.session.commit()
original_items = Item.query.filter_by(list_id=original.id).all()
for item in original_items:
copy_item = Item(list_id=new_list.id, name=item.name)
db.session.add(copy_item)
db.session.commit()
flash('Skopiowano listę', 'success')
return redirect(url_for('view_list', list_id=new_list.id))
@app.route('/suggest_products')
def suggest_products():
query = request.args.get('q', '')
suggestions = []
if query:
suggestions = SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f'%{query}%')).limit(5).all()
return {'suggestions': [s.name for s in suggestions]}
@app.route('/upload_receipt/<int:list_id>', methods=['POST'])
def upload_receipt(list_id):
if 'receipt' not in request.files:
flash('Brak pliku', 'danger')
return redirect(request.referrer)
file = request.files['receipt']
if file.filename == '':
flash('Nie wybrano pliku', 'danger')
return redirect(request.referrer)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], f"list_{list_id}_{filename}")
img = Image.open(file)
img.thumbnail((2000, 2000))
img.save(file_path)
flash('Wgrano paragon', 'success')
return redirect(request.referrer)
flash('Niedozwolony format pliku', 'danger')
return redirect(request.referrer)
@app.route('/uploads/<filename>')
def uploaded_file(filename):
response = send_from_directory(app.config['UPLOAD_FOLDER'], filename)
response.headers['Cache-Control'] = 'public, max-age=2592000, immutable'
response.headers.pop('Pragma', None)
response.headers.pop('Content-Disposition', None)
mime, _ = mimetypes.guess_type(filename)
if mime:
response.headers['Content-Type'] = mime
return response
@app.route('/admin')
@login_required
def admin_panel():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all()
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
enriched_lists = []
for l in all_lists:
items = Item.query.filter_by(list_id=l.id).all()
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
comments_count = len([i for i in items if i.note and i.note.strip() != ''])
expenses = Expense.query.filter_by(list_id=l.id).all()
total_expense = sum(e.amount for e in expenses)
receipt_pattern = f"list_{l.id}"
receipt_files = [f for f in all_files if receipt_pattern in f]
enriched_lists.append({
'list': l,
'total_count': total_count,
'purchased_count': purchased_count,
'percent': round(percent),
'comments_count': comments_count,
'receipts_count': len(receipt_files),
'total_expense': total_expense
})
top_products = (
db.session.query(Item.name, func.count(Item.id).label('count'))
.filter(Item.purchased == True)
.group_by(Item.name)
.order_by(func.count(Item.id).desc())
.limit(5)
.all()
)
purchased_items_count = Item.query.filter_by(purchased=True).count()
total_expense_sum = db.session.query(func.sum(Expense.amount)).scalar() or 0
current_year = datetime.utcnow().year
year_expense_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract('year', Expense.added_at) == current_year)
.scalar() or 0
)
current_month = datetime.utcnow().month
month_expense_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract('year', Expense.added_at) == current_year)
.filter(extract('month', Expense.added_at) == current_month)
.scalar() or 0
)
return render_template(
'admin/admin_panel.html',
user_count=user_count,
list_count=list_count,
item_count=item_count,
purchased_items_count=purchased_items_count,
enriched_lists=enriched_lists,
top_products=top_products,
total_expense_sum=total_expense_sum,
year_expense_sum=year_expense_sum,
month_expense_sum=month_expense_sum,
)
@app.route('/admin/delete_list/<int:list_id>')
@login_required
def delete_list(list_id):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
delete_receipts_for_list(list_id)
list_to_delete = ShoppingList.query.get_or_404(list_id)
Item.query.filter_by(list_id=list_to_delete.id).delete()
Expense.query.filter_by(list_id=list_to_delete.id).delete()
db.session.delete(list_to_delete)
db.session.commit()
flash(f'Usunięto listę: {list_to_delete.title}', 'success')
return redirect(url_for('admin_panel'))
@app.route('/admin/add_user', methods=['GET', 'POST'])
@login_required
def add_user():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
if request.method == 'POST':
username = request.form['username']
password = generate_password_hash(request.form['password'])
new_user = User(username=username, password_hash=password)
db.session.add(new_user)
db.session.commit()
flash('Dodano nowego użytkownika', 'success')
return redirect(url_for('admin_panel'))
return render_template('admin/add_user.html')
@app.route('/admin/users')
@login_required
def list_users():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
users = User.query.all()
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"]
return render_template('admin/list_users.html', users=users, user_count=user_count, list_count=list_count, item_count=item_count, activity_log=activity_log)
@app.route('/admin/reset_password/<int:user_id>', methods=['GET', 'POST'])
@login_required
def reset_password(user_id):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
user = User.query.get_or_404(user_id)
if request.method == 'POST':
new_password = generate_password_hash(request.form['password'])
user.password_hash = new_password
db.session.commit()
flash('Hasło zresetowane', 'success')
return redirect(url_for('list_users'))
return render_template('admin/reset_password.html', user=user)
@app.route('/admin/delete_user/<int:user_id>')
@login_required
def delete_user(user_id):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
user = User.query.get_or_404(user_id)
db.session.delete(user)
db.session.commit()
flash('Użytkownik usunięty', 'success')
return redirect(url_for('list_users'))
@app.route('/admin/receipts')
@login_required
def admin_receipts():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
image_files = [f for f in all_files if allowed_file(f)]
return render_template(
'admin/receipts.html',
image_files=image_files,
upload_folder=app.config['UPLOAD_FOLDER']
)
@app.route('/admin/delete_receipt/<filename>')
@login_required
def delete_receipt(filename):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
if os.path.exists(file_path):
os.remove(file_path)
flash('Plik usunięty', 'success')
else:
flash('Plik nie istnieje', 'danger')
return redirect(url_for('admin_receipts'))
@app.route('/admin/delete_selected_lists', methods=['POST'])
@login_required
def delete_selected_lists():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
ids = request.form.getlist('list_ids')
for list_id in ids:
lst = ShoppingList.query.get(int(list_id))
if lst:
delete_receipts_for_list(lst.id)
Item.query.filter_by(list_id=lst.id).delete()
Expense.query.filter_by(list_id=lst.id).delete()
db.session.delete(lst)
db.session.commit()
flash('Usunięto wybrane listy', 'success')
return redirect(url_for('admin_panel'))
@app.route('/admin/archive_list/<int:list_id>')
@login_required
def archive_list(list_id):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
l = ShoppingList.query.get_or_404(list_id)
l.is_archived = True
db.session.commit()
flash('Lista oznaczona jako archiwalna', 'success')
return redirect(url_for('admin_panel'))
@app.route('/admin/delete_all_items')
@login_required
def delete_all_items():
if not current_user.is_admin:
return redirect(url_for('index_guest'))
Item.query.delete()
db.session.commit()
flash('Usunięto wszystkie produkty', 'success')
return redirect(url_for('admin_panel'))
@app.route('/admin/edit_list/<int:list_id>', methods=['GET', 'POST'])
@login_required
def edit_list(list_id):
if not current_user.is_admin:
return redirect(url_for('index_guest'))
l = ShoppingList.query.get_or_404(list_id)
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
users = User.query.all()
if request.method == 'POST':
new_title = request.form.get('title')
new_amount_str = request.form.get('amount')
is_archived = 'archived' in request.form
new_owner_id = request.form.get('owner_id')
if new_title and new_title.strip():
l.title = new_title.strip()
l.is_archived = is_archived
if new_owner_id:
try:
new_owner_id_int = int(new_owner_id)
if User.query.get(new_owner_id_int):
l.owner_id = new_owner_id_int
else:
flash('Wybrany użytkownik nie istnieje', 'danger')
return redirect(url_for('edit_list', list_id=list_id))
except ValueError:
flash('Niepoprawny ID użytkownika', 'danger')
return redirect(url_for('edit_list', list_id=list_id))
if new_amount_str:
try:
new_amount = float(new_amount_str)
if expenses:
for expense in expenses:
db.session.delete(expense)
db.session.commit()
new_expense = Expense(list_id=list_id, amount=new_amount)
db.session.add(new_expense)
db.session.commit()
flash('Zaktualizowano tytuł, właściciela, archiwizację i/lub kwotę wydatku', 'success')
except ValueError:
flash('Niepoprawna kwota', 'danger')
return redirect(url_for('edit_list', list_id=list_id))
else:
db.session.commit()
flash('Zaktualizowano tytuł, właściciela i/lub archiwizację', 'success')
return redirect(url_for('admin_panel'))
return render_template('admin/edit_list.html', list=l, total_expense=total_expense, users=users)
# chyba do usuniecia przeniesione na eventy socket.io
@app.route('/update-note/<int:item_id>', methods=['POST'])
def update_note(item_id):
item = Item.query.get_or_404(item_id)
note = request.form.get('note')
item.note = note
db.session.commit()
return {'success': True}
# =========================================================================================
# SOCKET.IO
# =========================================================================================
@socketio.on('delete_item')
def handle_delete_item(data):
item = Item.query.get(data['item_id'])
if item:
db.session.delete(item)
db.session.commit()
emit('item_deleted', {'item_id': item.id}, to=str(item.list_id))
@socketio.on('edit_item')
def handle_edit_item(data):
item = Item.query.get(data['item_id'])
new_name = data['new_name']
new_quantity = data.get('new_quantity', item.quantity)
if item and new_name.strip():
item.name = new_name.strip()
try:
new_quantity = int(new_quantity)
if new_quantity < 1:
new_quantity = 1
except:
new_quantity = 1
item.quantity = new_quantity
db.session.commit()
emit('item_edited', {
'item_id': item.id,
'new_name': item.name,
'new_quantity': item.quantity
}, to=str(item.list_id))
@socketio.on('join_list')
def handle_join(data):
global active_users
room = str(data['room'])
username = data.get('username', 'Gość')
join_room(room)
if room not in active_users:
active_users[room] = set()
active_users[room].add(username)
shopping_list = ShoppingList.query.get(int(data['room']))
list_title = shopping_list.title if shopping_list else "Twoja lista"
emit('user_joined', {'username': username}, to=room)
emit('user_list', {'users': list(active_users[room])}, to=room)
emit('joined_confirmation', {'room': room, 'list_title': list_title})
@socketio.on('disconnect')
def handle_disconnect():
global active_users
for room, users in active_users.items():
if current_user.username in users:
users.remove(current_user.username)
emit('user_left', {'username': current_user.username}, to=room)
emit('user_list', {'users': list(users)}, to=room)
@socketio.on('add_item')
def handle_add_item(data):
list_id = data['list_id']
name = data['name']
quantity = data.get('quantity', 1)
try:
quantity = int(quantity)
if quantity < 1:
quantity = 1
except:
quantity = 1
new_item = Item(
list_id=list_id,
name=name,
quantity=quantity,
added_by=current_user.id if current_user.is_authenticated else None
)
db.session.add(new_item)
if not SuggestedProduct.query.filter_by(name=name).first():
new_suggestion = SuggestedProduct(name=name)
db.session.add(new_suggestion)
db.session.commit()
emit('item_added', {
'id': new_item.id,
'name': new_item.name,
'quantity': new_item.quantity,
'added_by': current_user.username if current_user.is_authenticated else 'Gość'
}, to=str(list_id), include_self=True)
@socketio.on('check_item')
def handle_check_item(data):
item = Item.query.get(data['item_id'])
if item:
item.purchased = True
item.purchased_at = datetime.utcnow()
db.session.commit()
purchased_count, total_count, percent = get_progress(item.list_id)
emit('item_checked', {'item_id': item.id}, to=str(item.list_id))
emit('progress_updated', {
'purchased_count': purchased_count,
'total_count': total_count,
'percent': percent
}, to=str(item.list_id))
@socketio.on('uncheck_item')
def handle_uncheck_item(data):
item = Item.query.get(data['item_id'])
if item:
item.purchased = False
item.purchased_at = None
db.session.commit()
purchased_count, total_count, percent = get_progress(item.list_id)
emit('item_unchecked', {'item_id': item.id}, to=str(item.list_id))
emit('progress_updated', {
'purchased_count': purchased_count,
'total_count': total_count,
'percent': percent
}, to=str(item.list_id))
@socketio.on('update_note')
def handle_update_note(data):
item_id = data['item_id']
note = data['note']
item = Item.query.get(item_id)
if item:
item.note = note
db.session.commit()
emit('note_updated', {'item_id': item_id, 'note': note}, to=str(item.list_id))
@socketio.on('add_expense')
def handle_add_expense(data):
list_id = data['list_id']
amount = data['amount']
new_expense = Expense(list_id=list_id, amount=amount)
db.session.add(new_expense)
db.session.commit()
total = db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar() or 0
emit('expense_added', {
'amount': amount,
'total': total
}, to=str(list_id))
@app.cli.command('create_db')
def create_db():
db.create_all()
print('Database created.')
if __name__ == '__main__':
socketio.run(app, host='0.0.0.0', port=8000, debug=True)