import os import secrets import time import mimetypes from datetime import datetime, timedelta from flask import Flask, render_template, redirect, url_for, request, flash, Blueprint, send_from_directory, request, abort from markupsafe import Markup from flask_sqlalchemy import SQLAlchemy from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user from flask_socketio import SocketIO, emit, join_room from werkzeug.security import generate_password_hash, check_password_hash from config import Config from PIL import Image from werkzeug.utils import secure_filename from werkzeug.middleware.proxy_fix import ProxyFix from sqlalchemy import func, extract from collections import defaultdict, deque app = Flask(__name__) app.config.from_object(Config) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1) SYSTEM_PASSWORD = app.config.get('SYSTEM_PASSWORD', 'changeme') DEFAULT_ADMIN_USERNAME = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin') DEFAULT_ADMIN_PASSWORD = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123') UPLOAD_FOLDER = app.config.get('UPLOAD_FOLDER', 'uploads') ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'} AUTHORIZED_COOKIE_VALUE = app.config.get('AUTHORIZED_COOKIE_VALUE', '80d31cdfe63539c9') os.makedirs(UPLOAD_FOLDER, exist_ok=True) failed_login_attempts = defaultdict(deque) MAX_ATTEMPTS = 10 TIME_WINDOW = 60 * 60 db = SQLAlchemy(app) socketio = SocketIO(app, async_mode="eventlet") login_manager = LoginManager(app) login_manager.login_view = 'login' static_bp = Blueprint('static_bp', __name__) class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(150), unique=True, nullable=False) password_hash = db.Column(db.String(150), nullable=False) is_admin = db.Column(db.Boolean, default=False) class ShoppingList(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(150), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) owner_id = db.Column(db.Integer, db.ForeignKey('user.id')) is_temporary = db.Column(db.Boolean, default=False) share_token = db.Column(db.String(64), unique=True, nullable=True) expires_at = db.Column(db.DateTime, nullable=True) owner = db.relationship('User', backref='lists', lazy=True) is_archived = db.Column(db.Boolean, default=False) is_public = db.Column(db.Boolean, default=True) class Item(db.Model): id = db.Column(db.Integer, primary_key=True) list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id')) name = db.Column(db.String(150), nullable=False) added_at = db.Column(db.DateTime, default=datetime.utcnow) added_by = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) purchased = db.Column(db.Boolean, default=False) purchased_at = db.Column(db.DateTime, nullable=True) quantity = db.Column(db.Integer, default=1) note = db.Column(db.Text, nullable=True) class SuggestedProduct(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(150), unique=True, nullable=False) class Expense(db.Model): id = db.Column(db.Integer, primary_key=True) list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id')) amount = db.Column(db.Float, nullable=False) added_at = db.Column(db.DateTime, default=datetime.utcnow) receipt_filename = db.Column(db.String(255), nullable=True) with app.app_context(): # Twój kod inicjalizacyjny, np. utworzenie konta admina db.create_all() from werkzeug.security import generate_password_hash admin = User.query.filter_by(is_admin=True).first() username = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin') password = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123') password_hash = generate_password_hash(password) if admin: # Aktualizacja jeśli dane się różnią if admin.username != username or not check_password_hash(admin.password_hash, password): admin.username = username admin.password_hash = password_hash db.session.commit() else: # Brak admina – utwórz nowe konto admin = User(username=username, password_hash=password_hash, is_admin=True) db.session.add(admin) db.session.commit() @static_bp.route('/static/js/') def serve_js(filename): response = send_from_directory('static/js', filename) response.cache_control.no_cache = True response.cache_control.no_store = True response.cache_control.must_revalidate = True #response.expires = 0 response.pragma = 'no-cache' response.headers.pop('Content-Disposition', None) response.headers.pop('Etag', None) return response @static_bp.route('/static/css/') def serve_css(filename): response = send_from_directory('static/css', filename) #response.cache_control.public = True #response.cache_control.max_age = 3600 response.headers['Cache-Control'] = 'public, max-age=3600' response.headers.pop('Content-Disposition', None) response.headers.pop('Etag', None) #response.expires = 0 return response app.register_blueprint(static_bp) def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def get_progress(list_id): items = Item.query.filter_by(list_id=list_id).all() total_count = len(items) purchased_count = len([i for i in items if i.purchased]) percent = (purchased_count / total_count * 100) if total_count > 0 else 0 return purchased_count, total_count, percent def delete_receipts_for_list(list_id): receipt_pattern = f"list_{list_id}_" upload_folder = app.config['UPLOAD_FOLDER'] for filename in os.listdir(upload_folder): if filename.startswith(receipt_pattern): try: os.remove(os.path.join(upload_folder, filename)) except Exception as e: print(f"Nie udało się usunąć pliku {filename}: {e}") # zabezpieczenie logowani do systemy - błędne hasła def is_ip_blocked(ip): now = time.time() attempts = failed_login_attempts[ip] while attempts and now - attempts[0] > TIME_WINDOW: attempts.popleft() return len(attempts) >= MAX_ATTEMPTS def register_failed_attempt(ip): now = time.time() attempts = failed_login_attempts[ip] while attempts and now - attempts[0] > TIME_WINDOW: attempts.popleft() attempts.append(now) def reset_failed_attempts(ip): failed_login_attempts[ip].clear() def attempts_remaining(ip): attempts = failed_login_attempts[ip] return max(0, MAX_ATTEMPTS - len(attempts)) #################################################### @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) @app.context_processor def inject_time(): return dict(time=time) @app.context_processor def inject_has_authorized_cookie(): return {'has_authorized_cookie': 'authorized' in request.cookies} @app.before_request def require_system_password(): if 'authorized' not in request.cookies \ and request.endpoint != 'system_auth' \ and not request.endpoint.startswith('login') \ and request.endpoint != 'favicon': # specjalny wyjątek dla statycznych, ale sprawdzany ręcznie niżej if request.endpoint == 'static_bp.serve_js': # tu sprawdzamy czy to JS, który ma być chroniony protected_js = ["live.js", "list_guest.js", "hide_list.js", "socket_reconnect.js"] requested_file = request.view_args.get("filename", "") if requested_file in protected_js: return redirect(url_for('system_auth', next=request.url)) else: return # pozwól na inne pliki statyczne if request.endpoint.startswith('static_bp.'): return # np. CSS, favicon, inne — pozwól if request.path == '/': return redirect(url_for('system_auth')) else: from urllib.parse import urlparse, urlunparse parsed = urlparse(request.url) fixed_url = urlunparse(parsed._replace(netloc=request.host)) return redirect(url_for('system_auth', next=fixed_url)) @app.template_filter('filemtime') def file_mtime_filter(path): try: t = os.path.getmtime(path) return datetime.fromtimestamp(t) except Exception: return datetime.utcnow() @app.template_filter('filesizeformat') def filesizeformat_filter(path): try: size = os.path.getsize(path) for unit in ['B', 'KB', 'MB', 'GB']: if size < 1024.0: return f"{size:.1f} {unit}" size /= 1024.0 return f"{size:.1f} TB" except Exception: return "N/A" @app.errorhandler(404) def page_not_found(e): return render_template('404.html'), 404 @app.errorhandler(403) def forbidden(e): return '403 Forbidden', 403 @app.route('/favicon.svg') def favicon(): svg = ''' 🛒 ''' return svg, 200, {'Content-Type': 'image/svg+xml'} @app.route('/') def index_guest(): now = datetime.utcnow() if current_user.is_authenticated: # Twoje listy user_lists = ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=False).filter( (ShoppingList.expires_at == None) | (ShoppingList.expires_at > now) ).order_by(ShoppingList.created_at.desc()).all() public_lists = ShoppingList.query.filter( ShoppingList.is_public == True, ShoppingList.owner_id != current_user.id, ((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)), ShoppingList.is_archived == False ).order_by(ShoppingList.created_at.desc()).all() else: user_lists = [] public_lists = ShoppingList.query.filter( ShoppingList.is_public == True, ((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)), ShoppingList.is_archived == False ).order_by(ShoppingList.created_at.desc()).all() for l in user_lists + public_lists: items = Item.query.filter_by(list_id=l.id).all() l.total_count = len(items) l.purchased_count = len([i for i in items if i.purchased]) expenses = Expense.query.filter_by(list_id=l.id).all() l.total_expense = sum(e.amount for e in expenses) return render_template("main.html", user_lists=user_lists, public_lists=public_lists) @app.route('/system-auth', methods=['GET', 'POST']) def system_auth(): #ip = request.remote_addr ip = request.access_route[0] next_page = request.args.get('next') or url_for('index_guest') if is_ip_blocked(ip): flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger') return render_template('system_auth.html'), 403 if request.method == 'POST': if request.form['password'] == SYSTEM_PASSWORD: reset_failed_attempts(ip) resp = redirect(next_page) resp.set_cookie('authorized', AUTHORIZED_COOKIE_VALUE) return resp else: register_failed_attempt(ip) if is_ip_blocked(ip): flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger') return render_template('system_auth.html'), 403 remaining = attempts_remaining(ip) flash(f'Nieprawidłowe hasło do systemu. Pozostało prób: {remaining}', 'warning') return render_template('system_auth.html') @app.route('/archive_my_list/') @login_required def archive_my_list(list_id): l = ShoppingList.query.get_or_404(list_id) if l.owner_id != current_user.id: flash('Nie masz uprawnień do tej listy', 'danger') return redirect(url_for('index_guest')) l.is_archived = True db.session.commit() flash('Lista została zarchiwizowana', 'success') return redirect(url_for('index_guest')) @app.route('/edit_my_list/', methods=['GET', 'POST']) @login_required def edit_my_list(list_id): l = ShoppingList.query.get_or_404(list_id) if l.owner_id != current_user.id: flash('Nie masz uprawnień do tej listy', 'danger') return redirect(url_for('index_guest')) if request.method == 'POST': new_title = request.form.get('title') if new_title and new_title.strip(): l.title = new_title.strip() db.session.commit() flash('Zaktualizowano tytuł listy', 'success') return redirect(url_for('index_guest')) else: flash('Podaj poprawny tytuł', 'danger') return render_template('edit_my_list.html', list=l) @app.route('/toggle_visibility/', methods=['GET', 'POST']) @login_required def toggle_visibility(list_id): l = ShoppingList.query.get_or_404(list_id) if l.owner_id != current_user.id: if request.is_json or request.method == 'POST': return {'error': 'Unauthorized'}, 403 flash('Nie masz uprawnień do tej listy', 'danger') return redirect(url_for('index_guest')) l.is_public = not l.is_public db.session.commit() share_url = f"{request.url_root}share/{l.share_token}" if request.is_json or request.method == 'POST': return {'is_public': l.is_public, 'share_url': share_url} if l.is_public: flash('Lista została udostępniona publicznie', 'success') else: flash('Lista została ukryta przed gośćmi', 'info') return redirect(url_for('index_guest')) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': user = User.query.filter_by(username=request.form['username']).first() if user and check_password_hash(user.password_hash, request.form['password']): login_user(user) flash('Zalogowano pomyślnie', 'success') return redirect(url_for('index_guest')) flash('Nieprawidłowy login lub hasło', 'danger') return render_template('login.html') @app.route('/logout') @login_required def logout(): logout_user() flash('Wylogowano pomyślnie', 'success') return redirect(url_for('index_guest')) @app.route('/create', methods=['POST']) @login_required def create_list(): title = request.form.get('title') is_temporary = 'temporary' in request.form #token = secrets.token_hex(16) token = secrets.token_hex(4) expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None new_list = ShoppingList(title=title, owner_id=current_user.id, is_temporary=is_temporary, share_token=token, expires_at=expires_at) db.session.add(new_list) db.session.commit() flash('Utworzono nową listę', 'success') return redirect(url_for('view_list', list_id=new_list.id)) @app.route('/list/') @login_required def view_list(list_id): shopping_list = ShoppingList.query.get_or_404(list_id) items = Item.query.filter_by(list_id=list_id).all() total_count = len(items) purchased_count = len([i for i in items if i.purchased]) percent = (purchased_count / total_count * 100) if total_count > 0 else 0 expenses = Expense.query.filter_by(list_id=list_id).all() total_expense = sum(e.amount for e in expenses) receipt_pattern = f"list_{list_id}" all_files = os.listdir(app.config['UPLOAD_FOLDER']) receipt_files = [f for f in all_files if receipt_pattern in f] return render_template( 'list.html', list=shopping_list, items=items, receipt_files=receipt_files, total_count=total_count, purchased_count=purchased_count, percent=percent, expenses=expenses, total_expense=total_expense ) @app.route('/share/') def share_list(token): shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404() if not shopping_list.is_public: flash('Ta lista nie jest publicznie dostępna', 'danger') return redirect(url_for('index_guest')) items = Item.query.filter_by(list_id=shopping_list.id).all() receipt_pattern = f"list_{shopping_list.id}" all_files = os.listdir(app.config['UPLOAD_FOLDER']) receipt_files = [f for f in all_files if receipt_pattern in f] expenses = Expense.query.filter_by(list_id=shopping_list.id).all() total_expense = sum(e.amount for e in expenses) return render_template( 'list_guest.html', list=shopping_list, items=items, receipt_files=receipt_files, expenses=expenses, total_expense=total_expense ) @app.route('/guest-list/') def guest_list(list_id): shopping_list = ShoppingList.query.get_or_404(list_id) items = Item.query.filter_by(list_id=list_id).all() receipt_pattern = f"list_{list_id}" all_files = os.listdir(app.config['UPLOAD_FOLDER']) receipt_files = [f for f in all_files if receipt_pattern in f] expenses = Expense.query.filter_by(list_id=list_id).all() total_expense = sum(e.amount for e in expenses) return render_template( 'list_guest.html', list=shopping_list, items=items, receipt_files=receipt_files, expenses=expenses, total_expense=total_expense ) @app.route('/copy/') @login_required def copy_list(list_id): original = ShoppingList.query.get_or_404(list_id) token = secrets.token_hex(16) new_list = ShoppingList(title=original.title + ' (Kopia)', owner_id=current_user.id, share_token=token) db.session.add(new_list) db.session.commit() original_items = Item.query.filter_by(list_id=original.id).all() for item in original_items: copy_item = Item(list_id=new_list.id, name=item.name) db.session.add(copy_item) db.session.commit() flash('Skopiowano listę', 'success') return redirect(url_for('view_list', list_id=new_list.id)) @app.route('/suggest_products') def suggest_products(): query = request.args.get('q', '') suggestions = [] if query: suggestions = SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f'%{query}%')).limit(5).all() return {'suggestions': [s.name for s in suggestions]} @app.route('/upload_receipt/', methods=['POST']) def upload_receipt(list_id): if 'receipt' not in request.files: flash('Brak pliku', 'danger') return redirect(request.referrer) file = request.files['receipt'] if file.filename == '': flash('Nie wybrano pliku', 'danger') return redirect(request.referrer) if file and allowed_file(file.filename): filename = secure_filename(file.filename) file_path = os.path.join(app.config['UPLOAD_FOLDER'], f"list_{list_id}_{filename}") img = Image.open(file) img.thumbnail((2000, 2000)) img.save(file_path) flash('Wgrano paragon', 'success') return redirect(request.referrer) flash('Niedozwolony format pliku', 'danger') return redirect(request.referrer) @app.route('/uploads/') def uploaded_file(filename): response = send_from_directory(app.config['UPLOAD_FOLDER'], filename) response.headers['Cache-Control'] = 'public, max-age=2592000, immutable' response.headers.pop('Pragma', None) response.headers.pop('Content-Disposition', None) mime, _ = mimetypes.guess_type(filename) if mime: response.headers['Content-Type'] = mime return response @app.route('/admin') @login_required def admin_panel(): if not current_user.is_admin: return redirect(url_for('index_guest')) user_count = User.query.count() list_count = ShoppingList.query.count() item_count = Item.query.count() all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all() all_files = os.listdir(app.config['UPLOAD_FOLDER']) enriched_lists = [] for l in all_lists: items = Item.query.filter_by(list_id=l.id).all() total_count = len(items) purchased_count = len([i for i in items if i.purchased]) percent = (purchased_count / total_count * 100) if total_count > 0 else 0 comments_count = len([i for i in items if i.note and i.note.strip() != '']) expenses = Expense.query.filter_by(list_id=l.id).all() total_expense = sum(e.amount for e in expenses) receipt_pattern = f"list_{l.id}" receipt_files = [f for f in all_files if receipt_pattern in f] enriched_lists.append({ 'list': l, 'total_count': total_count, 'purchased_count': purchased_count, 'percent': round(percent), 'comments_count': comments_count, 'receipts_count': len(receipt_files), 'total_expense': total_expense }) top_products = ( db.session.query(Item.name, func.count(Item.id).label('count')) .filter(Item.purchased == True) .group_by(Item.name) .order_by(func.count(Item.id).desc()) .limit(5) .all() ) purchased_items_count = Item.query.filter_by(purchased=True).count() total_expense_sum = db.session.query(func.sum(Expense.amount)).scalar() or 0 current_year = datetime.utcnow().year year_expense_sum = ( db.session.query(func.sum(Expense.amount)) .filter(extract('year', Expense.added_at) == current_year) .scalar() or 0 ) current_month = datetime.utcnow().month month_expense_sum = ( db.session.query(func.sum(Expense.amount)) .filter(extract('year', Expense.added_at) == current_year) .filter(extract('month', Expense.added_at) == current_month) .scalar() or 0 ) return render_template( 'admin/admin_panel.html', user_count=user_count, list_count=list_count, item_count=item_count, purchased_items_count=purchased_items_count, enriched_lists=enriched_lists, top_products=top_products, total_expense_sum=total_expense_sum, year_expense_sum=year_expense_sum, month_expense_sum=month_expense_sum, ) @app.route('/admin/delete_list/') @login_required def delete_list(list_id): if not current_user.is_admin: return redirect(url_for('index_guest')) delete_receipts_for_list(list_id) list_to_delete = ShoppingList.query.get_or_404(list_id) Item.query.filter_by(list_id=list_to_delete.id).delete() Expense.query.filter_by(list_id=list_to_delete.id).delete() db.session.delete(list_to_delete) db.session.commit() flash(f'Usunięto listę: {list_to_delete.title}', 'success') return redirect(url_for('admin_panel')) @app.route('/admin/add_user', methods=['GET', 'POST']) @login_required def add_user(): if not current_user.is_admin: return redirect(url_for('index_guest')) if request.method == 'POST': username = request.form['username'] password = generate_password_hash(request.form['password']) new_user = User(username=username, password_hash=password) db.session.add(new_user) db.session.commit() flash('Dodano nowego użytkownika', 'success') return redirect(url_for('admin_panel')) return render_template('admin/add_user.html') @app.route('/admin/users') @login_required def list_users(): if not current_user.is_admin: return redirect(url_for('index_guest')) users = User.query.all() user_count = User.query.count() list_count = ShoppingList.query.count() item_count = Item.query.count() activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"] return render_template('admin/list_users.html', users=users, user_count=user_count, list_count=list_count, item_count=item_count, activity_log=activity_log) @app.route('/admin/reset_password/', methods=['GET', 'POST']) @login_required def reset_password(user_id): if not current_user.is_admin: return redirect(url_for('index_guest')) user = User.query.get_or_404(user_id) if request.method == 'POST': new_password = generate_password_hash(request.form['password']) user.password_hash = new_password db.session.commit() flash('Hasło zresetowane', 'success') return redirect(url_for('list_users')) return render_template('admin/reset_password.html', user=user) @app.route('/admin/delete_user/') @login_required def delete_user(user_id): if not current_user.is_admin: return redirect(url_for('index_guest')) user = User.query.get_or_404(user_id) db.session.delete(user) db.session.commit() flash('Użytkownik usunięty', 'success') return redirect(url_for('list_users')) @app.route('/admin/receipts') @login_required def admin_receipts(): if not current_user.is_admin: return redirect(url_for('index_guest')) all_files = os.listdir(app.config['UPLOAD_FOLDER']) image_files = [f for f in all_files if allowed_file(f)] return render_template( 'admin/receipts.html', image_files=image_files, upload_folder=app.config['UPLOAD_FOLDER'] ) @app.route('/admin/delete_receipt/') @login_required def delete_receipt(filename): if not current_user.is_admin: return redirect(url_for('index_guest')) file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) if os.path.exists(file_path): os.remove(file_path) flash('Plik usunięty', 'success') else: flash('Plik nie istnieje', 'danger') return redirect(url_for('admin_receipts')) @app.route('/admin/delete_selected_lists', methods=['POST']) @login_required def delete_selected_lists(): if not current_user.is_admin: return redirect(url_for('index_guest')) ids = request.form.getlist('list_ids') for list_id in ids: lst = ShoppingList.query.get(int(list_id)) if lst: delete_receipts_for_list(lst.id) Item.query.filter_by(list_id=lst.id).delete() Expense.query.filter_by(list_id=lst.id).delete() db.session.delete(lst) db.session.commit() flash('Usunięto wybrane listy', 'success') return redirect(url_for('admin_panel')) @app.route('/admin/archive_list/') @login_required def archive_list(list_id): if not current_user.is_admin: return redirect(url_for('index_guest')) l = ShoppingList.query.get_or_404(list_id) l.is_archived = True db.session.commit() flash('Lista oznaczona jako archiwalna', 'success') return redirect(url_for('admin_panel')) @app.route('/admin/delete_all_items') @login_required def delete_all_items(): if not current_user.is_admin: return redirect(url_for('index_guest')) Item.query.delete() db.session.commit() flash('Usunięto wszystkie produkty', 'success') return redirect(url_for('admin_panel')) @app.route('/admin/edit_list/', methods=['GET', 'POST']) @login_required def edit_list(list_id): if not current_user.is_admin: return redirect(url_for('index_guest')) l = ShoppingList.query.get_or_404(list_id) expenses = Expense.query.filter_by(list_id=list_id).all() total_expense = sum(e.amount for e in expenses) users = User.query.all() if request.method == 'POST': new_title = request.form.get('title') new_amount_str = request.form.get('amount') is_archived = 'archived' in request.form new_owner_id = request.form.get('owner_id') if new_title and new_title.strip(): l.title = new_title.strip() l.is_archived = is_archived if new_owner_id: try: new_owner_id_int = int(new_owner_id) if User.query.get(new_owner_id_int): l.owner_id = new_owner_id_int else: flash('Wybrany użytkownik nie istnieje', 'danger') return redirect(url_for('edit_list', list_id=list_id)) except ValueError: flash('Niepoprawny ID użytkownika', 'danger') return redirect(url_for('edit_list', list_id=list_id)) if new_amount_str: try: new_amount = float(new_amount_str) if expenses: for expense in expenses: db.session.delete(expense) db.session.commit() new_expense = Expense(list_id=list_id, amount=new_amount) db.session.add(new_expense) db.session.commit() flash('Zaktualizowano tytuł, właściciela, archiwizację i/lub kwotę wydatku', 'success') except ValueError: flash('Niepoprawna kwota', 'danger') return redirect(url_for('edit_list', list_id=list_id)) else: db.session.commit() flash('Zaktualizowano tytuł, właściciela i/lub archiwizację', 'success') return redirect(url_for('admin_panel')) return render_template('admin/edit_list.html', list=l, total_expense=total_expense, users=users) # chyba do usuniecia przeniesione na eventy socket.io @app.route('/update-note/', methods=['POST']) def update_note(item_id): item = Item.query.get_or_404(item_id) note = request.form.get('note') item.note = note db.session.commit() return {'success': True} # ========================================================================================= # SOCKET.IO # ========================================================================================= @socketio.on('delete_item') def handle_delete_item(data): item = Item.query.get(data['item_id']) if item: db.session.delete(item) db.session.commit() emit('item_deleted', {'item_id': item.id}, to=str(item.list_id)) @socketio.on('edit_item') def handle_edit_item(data): item = Item.query.get(data['item_id']) new_name = data['new_name'] new_quantity = data.get('new_quantity', item.quantity) if item and new_name.strip(): item.name = new_name.strip() try: new_quantity = int(new_quantity) if new_quantity < 1: new_quantity = 1 except: new_quantity = 1 item.quantity = new_quantity db.session.commit() emit('item_edited', { 'item_id': item.id, 'new_name': item.name, 'new_quantity': item.quantity }, to=str(item.list_id)) @socketio.on('join_list') def handle_join(data): room = str(data['room']) username = data.get('username', 'Gość') join_room(room) emit('user_joined', {'username': username}, to=room) emit('joined_confirmation', {'room': room}) @socketio.on('add_item') def handle_add_item(data): list_id = data['list_id'] name = data['name'] quantity = data.get('quantity', 1) try: quantity = int(quantity) if quantity < 1: quantity = 1 except: quantity = 1 new_item = Item( list_id=list_id, name=name, quantity=quantity, added_by=current_user.id if current_user.is_authenticated else None ) db.session.add(new_item) if not SuggestedProduct.query.filter_by(name=name).first(): new_suggestion = SuggestedProduct(name=name) db.session.add(new_suggestion) db.session.commit() emit('item_added', { 'id': new_item.id, 'name': new_item.name, 'quantity': new_item.quantity, 'added_by': current_user.username if current_user.is_authenticated else 'Gość' }, to=str(list_id), include_self=True) @socketio.on('check_item') def handle_check_item(data): item = Item.query.get(data['item_id']) if item: item.purchased = True item.purchased_at = datetime.utcnow() db.session.commit() purchased_count, total_count, percent = get_progress(item.list_id) emit('item_checked', {'item_id': item.id}, to=str(item.list_id)) emit('progress_updated', { 'purchased_count': purchased_count, 'total_count': total_count, 'percent': percent }, to=str(item.list_id)) @socketio.on('uncheck_item') def handle_uncheck_item(data): item = Item.query.get(data['item_id']) if item: item.purchased = False item.purchased_at = None db.session.commit() purchased_count, total_count, percent = get_progress(item.list_id) emit('item_unchecked', {'item_id': item.id}, to=str(item.list_id)) emit('progress_updated', { 'purchased_count': purchased_count, 'total_count': total_count, 'percent': percent }, to=str(item.list_id)) @socketio.on('update_note') def handle_update_note(data): item_id = data['item_id'] note = data['note'] item = Item.query.get(item_id) if item: item.note = note db.session.commit() emit('note_updated', {'item_id': item_id, 'note': note}, to=str(item.list_id)) @socketio.on('add_expense') def handle_add_expense(data): list_id = data['list_id'] amount = data['amount'] new_expense = Expense(list_id=list_id, amount=amount) db.session.add(new_expense) db.session.commit() total = db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar() or 0 emit('expense_added', { 'amount': amount, 'total': total }, to=str(list_id)) @app.cli.command('create_db') def create_db(): db.create_all() print('Database created.') if __name__ == '__main__': socketio.run(app, host='0.0.0.0', port=8000, debug=True)