import os import secrets import time from datetime import datetime, timedelta from flask import Flask, render_template, redirect, url_for, request, flash, Blueprint, send_from_directory from flask_sqlalchemy import SQLAlchemy from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user from flask_socketio import SocketIO, emit, join_room from werkzeug.security import generate_password_hash, check_password_hash from config import Config app = Flask(__name__) app.config.from_object(Config) SYSTEM_PASSWORD = app.config.get('SYSTEM_PASSWORD', 'changeme') DEFAULT_ADMIN_USERNAME = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin') DEFAULT_ADMIN_PASSWORD = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123') db = SQLAlchemy(app) socketio = SocketIO(app) login_manager = LoginManager(app) login_manager.login_view = 'login' static_bp = Blueprint('static_bp', __name__) @static_bp.route('/static/js/live.js') def serve_live_js(): response = send_from_directory('static/js', 'live.js') response.cache_control.no_cache = True response.cache_control.no_store = True response.cache_control.must_revalidate = True response.expires = 0 response.pragma = 'no-cache' return response app.register_blueprint(static_bp) class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(150), unique=True, nullable=False) password_hash = db.Column(db.String(150), nullable=False) is_admin = db.Column(db.Boolean, default=False) class ShoppingList(db.Model): id = db.Column(db.Integer, primary_key=True) title = db.Column(db.String(150), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) owner_id = db.Column(db.Integer, db.ForeignKey('user.id')) is_temporary = db.Column(db.Boolean, default=False) share_token = db.Column(db.String(64), unique=True, nullable=True) expires_at = db.Column(db.DateTime, nullable=True) owner = db.relationship('User', backref='lists', lazy=True) class Item(db.Model): id = db.Column(db.Integer, primary_key=True) list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id')) name = db.Column(db.String(150), nullable=False) added_at = db.Column(db.DateTime, default=datetime.utcnow) added_by = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True) purchased = db.Column(db.Boolean, default=False) purchased_at = db.Column(db.DateTime, nullable=True) class SuggestedProduct(db.Model): id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(150), unique=True, nullable=False) @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) @app.context_processor def inject_time(): return dict(time=time) @app.before_request def require_system_password(): if 'authorized' not in request.cookies \ and request.endpoint != 'system_auth' \ and not request.endpoint.startswith('static') \ and not request.endpoint.startswith('login'): return redirect(url_for('system_auth', next=request.url)) @app.route('/system-auth', methods=['GET', 'POST']) def system_auth(): DEFAULT_ADMIN_USERNAME = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin') DEFAULT_ADMIN_PASSWORD = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123') next_page = request.args.get('next') or url_for('index_guest') if request.method == 'POST': if request.form['password'] == SYSTEM_PASSWORD: db.create_all() if not User.query.filter_by(is_admin=True).first(): admin_user = User( username=DEFAULT_ADMIN_USERNAME, password_hash=generate_password_hash(DEFAULT_ADMIN_PASSWORD), is_admin=True ) db.session.add(admin_user) db.session.commit() flash(f'Utworzono konto administratora: login={DEFAULT_ADMIN_USERNAME}, hasło={DEFAULT_ADMIN_PASSWORD}') resp = redirect(next_page) resp.set_cookie('authorized', 'true') return resp flash('Nieprawidłowe hasło do systemu') return render_template('system_auth.html') @app.route('/') def index_guest(): lists = ShoppingList.query.all() for l in lists: items = Item.query.filter_by(list_id=l.id).all() l.total_count = len(items) l.purchased_count = len([i for i in items if i.purchased]) return render_template('index.html', lists=lists) @app.errorhandler(404) def page_not_found(e): return render_template('404.html'), 404 @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': user = User.query.filter_by(username=request.form['username']).first() if user and check_password_hash(user.password_hash, request.form['password']): login_user(user) flash('Zalogowano pomyślnie', 'success') return redirect(url_for('index_guest')) flash('Nieprawidłowy login lub hasło', 'danger') return render_template('login.html') @app.route('/logout') @login_required def logout(): logout_user() flash('Wylogowano pomyślnie', 'success') return redirect(url_for('login')) @app.route('/create', methods=['POST']) @login_required def create_list(): title = request.form.get('title') is_temporary = 'temporary' in request.form token = secrets.token_hex(16) expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None new_list = ShoppingList(title=title, owner_id=current_user.id, is_temporary=is_temporary, share_token=token, expires_at=expires_at) db.session.add(new_list) db.session.commit() flash('Utworzono nową listę', 'success') return redirect(url_for('view_list', list_id=new_list.id)) @app.route('/list/') @login_required def view_list(list_id): shopping_list = ShoppingList.query.get_or_404(list_id) items = Item.query.filter_by(list_id=list_id).all() return render_template('list.html', list=shopping_list, items=items) @app.route('/share/') def share_list(token): shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404() items = Item.query.filter_by(list_id=shopping_list.id).all() return render_template('list_guest.html', list=shopping_list, items=items) @app.route('/guest-list/') def guest_list(list_id): shopping_list = ShoppingList.query.get_or_404(list_id) items = Item.query.filter_by(list_id=list_id).all() return render_template('list_guest.html', list=shopping_list, items=items) @app.route('/copy/') @login_required def copy_list(list_id): original = ShoppingList.query.get_or_404(list_id) token = secrets.token_hex(16) new_list = ShoppingList(title=original.title + ' (Kopia)', owner_id=current_user.id, share_token=token) db.session.add(new_list) db.session.commit() original_items = Item.query.filter_by(list_id=original.id).all() for item in original_items: copy_item = Item(list_id=new_list.id, name=item.name) db.session.add(copy_item) db.session.commit() flash('Skopiowano listę', 'success') return redirect(url_for('view_list', list_id=new_list.id)) @app.route('/suggest_products') def suggest_products(): query = request.args.get('q', '') suggestions = [] if query: suggestions = SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f'%{query}%')).limit(5).all() return {'suggestions': [s.name for s in suggestions]} @app.route('/admin') @login_required def admin_panel(): if not current_user.is_admin: return redirect(url_for('index_guest')) user_count = User.query.count() list_count = ShoppingList.query.count() item_count = Item.query.count() all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all() return render_template('admin/admin_panel.html', user_count=user_count, list_count=list_count, item_count=item_count, all_lists=all_lists) @app.route('/admin/delete_list/') @login_required def delete_list(list_id): if not current_user.is_admin: return redirect(url_for('index_guest')) list_to_delete = ShoppingList.query.get_or_404(list_id) Item.query.filter_by(list_id=list_to_delete.id).delete() db.session.delete(list_to_delete) db.session.commit() flash(f'Usunięto listę: {list_to_delete.title}', 'success') return redirect(url_for('admin_panel')) @app.route('/admin/delete_all_lists') @login_required def delete_all_lists(): if not current_user.is_admin: return redirect(url_for('index_guest')) Item.query.delete() ShoppingList.query.delete() db.session.commit() flash('Usunięto wszystkie listy', 'success') return redirect(url_for('admin_panel')) @app.route('/admin/add_user', methods=['GET', 'POST']) @login_required def add_user(): if not current_user.is_admin: return redirect(url_for('index_guest')) if request.method == 'POST': username = request.form['username'] password = generate_password_hash(request.form['password']) new_user = User(username=username, password_hash=password) db.session.add(new_user) db.session.commit() flash('Dodano nowego użytkownika', 'success') return redirect(url_for('admin_panel')) return render_template('admin/add_user.html') @app.route('/admin/users') @login_required def list_users(): if not current_user.is_admin: return redirect(url_for('index_guest')) users = User.query.all() user_count = User.query.count() list_count = ShoppingList.query.count() item_count = Item.query.count() activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"] return render_template('admin/list_users.html', users=users, user_count=user_count, list_count=list_count, item_count=item_count, activity_log=activity_log) @app.route('/admin/reset_password/', methods=['GET', 'POST']) @login_required def reset_password(user_id): if not current_user.is_admin: return redirect(url_for('index_guest')) user = User.query.get_or_404(user_id) if request.method == 'POST': new_password = generate_password_hash(request.form['password']) user.password_hash = new_password db.session.commit() flash('Hasło zresetowane', 'success') return redirect(url_for('list_users')) return render_template('admin/reset_password.html', user=user) @app.route('/admin/delete_user/') @login_required def delete_user(user_id): if not current_user.is_admin: return redirect(url_for('index_guest')) user = User.query.get_or_404(user_id) db.session.delete(user) db.session.commit() flash('Użytkownik usunięty', 'success') return redirect(url_for('list_users')) @socketio.on('delete_item') def handle_delete_item(data): item = Item.query.get(data['item_id']) if item: db.session.delete(item) db.session.commit() emit('item_deleted', {'item_id': item.id}, to=str(item.list_id)) @socketio.on('edit_item') def handle_edit_item(data): item = Item.query.get(data['item_id']) new_name = data['new_name'] if item and new_name.strip(): item.name = new_name db.session.commit() emit('item_edited', {'item_id': item.id, 'new_name': item.name}, to=str(item.list_id)) @socketio.on('join_list') def handle_join(data): room = str(data['room']) username = data.get('username', 'Gość') join_room(room) emit('user_joined', {'username': username}, to=room) @socketio.on('add_item') def handle_add_item(data): list_id = data['list_id'] name = data['name'] new_item = Item( list_id=list_id, name=name, added_by=current_user.id if current_user.is_authenticated else None ) db.session.add(new_item) if not SuggestedProduct.query.filter_by(name=name).first(): new_suggestion = SuggestedProduct(name=name) db.session.add(new_suggestion) db.session.commit() emit('item_added', { 'id': new_item.id, 'name': new_item.name, 'added_by': current_user.username if current_user.is_authenticated else 'Gość' }, to=str(list_id), include_self=True) @socketio.on('check_item') def handle_check_item(data): item = Item.query.get(data['item_id']) if item: item.purchased = True item.purchased_at = datetime.utcnow() db.session.commit() emit('item_checked', {'item_id': item.id}, to=str(item.list_id)) @socketio.on('uncheck_item') def handle_uncheck_item(data): item = Item.query.get(data['item_id']) if item: item.purchased = False item.purchased_at = None db.session.commit() emit('item_unchecked', {'item_id': item.id}, to=str(item.list_id)) @app.cli.command('create_db') def create_db(): db.create_all() print('Database created.') if __name__ == '__main__': socketio.run(app, host='0.0.0.0', port=8000, debug=True)