282 lines
10 KiB
Python
282 lines
10 KiB
Python
import os
|
|
import secrets
|
|
from datetime import datetime, timedelta
|
|
from flask import Flask, render_template, redirect, url_for, request, flash
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
|
from flask_socketio import SocketIO, emit, join_room
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from config import Config
|
|
|
|
app = Flask(__name__)
|
|
app.config.from_object(Config)
|
|
SYSTEM_PASSWORD = app.config.get('SYSTEM_PASSWORD', 'changeme')
|
|
|
|
db = SQLAlchemy(app)
|
|
socketio = SocketIO(app)
|
|
login_manager = LoginManager(app)
|
|
login_manager.login_view = 'login'
|
|
|
|
class User(UserMixin, db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(150), unique=True, nullable=False)
|
|
password_hash = db.Column(db.String(150), nullable=False)
|
|
is_admin = db.Column(db.Boolean, default=False)
|
|
|
|
class ShoppingList(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
title = db.Column(db.String(150), nullable=False)
|
|
created_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
owner_id = db.Column(db.Integer, db.ForeignKey('user.id'))
|
|
is_temporary = db.Column(db.Boolean, default=False)
|
|
share_token = db.Column(db.String(64), unique=True, nullable=True)
|
|
expires_at = db.Column(db.DateTime, nullable=True)
|
|
|
|
class Item(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id'))
|
|
name = db.Column(db.String(150), nullable=False)
|
|
added_at = db.Column(db.DateTime, default=datetime.utcnow)
|
|
added_by = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
|
|
purchased = db.Column(db.Boolean, default=False)
|
|
purchased_at = db.Column(db.DateTime, nullable=True)
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
return User.query.get(int(user_id))
|
|
|
|
@app.before_request
|
|
def require_system_password():
|
|
if 'authorized' not in request.cookies and request.endpoint != 'system_auth' and not request.endpoint.startswith('static') and not request.endpoint.startswith('login'):
|
|
return redirect(url_for('system_auth'))
|
|
|
|
@app.route('/system-auth', methods=['GET', 'POST'])
|
|
def system_auth():
|
|
if request.method == 'POST':
|
|
if request.form['password'] == SYSTEM_PASSWORD:
|
|
db.create_all()
|
|
if not User.query.filter_by(is_admin=True).first():
|
|
admin_user = User(username='admin', password_hash=generate_password_hash('admin123'), is_admin=True)
|
|
db.session.add(admin_user)
|
|
db.session.commit()
|
|
flash('Utworzono konto administratora: login=admin, hasło=admin123')
|
|
resp = redirect(url_for('index_guest'))
|
|
resp.set_cookie('authorized', 'true')
|
|
return resp
|
|
flash('Nieprawidłowe hasło do systemu')
|
|
return render_template('system_auth.html')
|
|
|
|
@app.route('/')
|
|
def index_guest():
|
|
lists = ShoppingList.query.all()
|
|
return render_template('index.html', lists=lists)
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if request.method == 'POST':
|
|
user = User.query.filter_by(username=request.form['username']).first()
|
|
if user and check_password_hash(user.password_hash, request.form['password']):
|
|
login_user(user)
|
|
flash('Zalogowano pomyślnie')
|
|
return redirect(url_for('dashboard'))
|
|
flash('Nieprawidłowy login lub hasło')
|
|
return render_template('login.html')
|
|
|
|
@app.route('/dashboard')
|
|
@login_required
|
|
def dashboard():
|
|
lists = ShoppingList.query.filter_by(owner_id=current_user.id).all()
|
|
return render_template('dashboard.html', lists=lists)
|
|
|
|
@app.route('/logout')
|
|
@login_required
|
|
def logout():
|
|
logout_user()
|
|
return redirect(url_for('login'))
|
|
|
|
@app.route('/create', methods=['POST'])
|
|
@login_required
|
|
def create_list():
|
|
title = request.form.get('title')
|
|
is_temporary = 'temporary' in request.form
|
|
token = secrets.token_hex(16)
|
|
expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None
|
|
new_list = ShoppingList(title=title, owner_id=current_user.id, is_temporary=is_temporary, share_token=token, expires_at=expires_at)
|
|
db.session.add(new_list)
|
|
db.session.commit()
|
|
return redirect(url_for('view_list', list_id=new_list.id))
|
|
|
|
@app.route('/list/<int:list_id>')
|
|
@login_required
|
|
def view_list(list_id):
|
|
shopping_list = ShoppingList.query.get_or_404(list_id)
|
|
items = Item.query.filter_by(list_id=list_id).all()
|
|
return render_template('list.html', list=shopping_list, items=items)
|
|
|
|
@app.route('/share/<token>')
|
|
def share_list(token):
|
|
shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
|
|
items = Item.query.filter_by(list_id=shopping_list.id).all()
|
|
return render_template('list_guest.html', list=shopping_list, items=items)
|
|
|
|
@app.route('/copy/<int:list_id>')
|
|
@login_required
|
|
def copy_list(list_id):
|
|
original = ShoppingList.query.get_or_404(list_id)
|
|
token = secrets.token_hex(16)
|
|
new_list = ShoppingList(title=original.title + ' (Kopia)', owner_id=current_user.id, share_token=token)
|
|
db.session.add(new_list)
|
|
db.session.commit()
|
|
original_items = Item.query.filter_by(list_id=original.id).all()
|
|
for item in original_items:
|
|
copy_item = Item(list_id=new_list.id, name=item.name)
|
|
db.session.add(copy_item)
|
|
db.session.commit()
|
|
return redirect(url_for('view_list', list_id=new_list.id))
|
|
|
|
@app.route('/admin')
|
|
@login_required
|
|
def admin_panel():
|
|
if not current_user.is_admin:
|
|
return redirect(url_for('index_guest'))
|
|
user_count = User.query.count()
|
|
list_count = ShoppingList.query.count()
|
|
item_count = Item.query.count()
|
|
all_lists = ShoppingList.query.all()
|
|
return render_template('admin/admin_panel.html', user_count=user_count, list_count=list_count, item_count=item_count, all_lists=all_lists)
|
|
|
|
@app.route('/admin/delete_list/<int:list_id>')
|
|
@login_required
|
|
def delete_list(list_id):
|
|
if not current_user.is_admin:
|
|
return redirect(url_for('index_guest'))
|
|
list_to_delete = ShoppingList.query.get_or_404(list_id)
|
|
# Usuń wszystkie powiązane produkty
|
|
Item.query.filter_by(list_id=list_to_delete.id).delete()
|
|
db.session.delete(list_to_delete)
|
|
db.session.commit()
|
|
flash(f'Usunięto listę: {list_to_delete.title}')
|
|
return redirect(url_for('admin_panel'))
|
|
|
|
@app.route('/admin/delete_all_lists')
|
|
@login_required
|
|
def delete_all_lists():
|
|
if not current_user.is_admin:
|
|
return redirect(url_for('index_guest'))
|
|
Item.query.delete()
|
|
ShoppingList.query.delete()
|
|
db.session.commit()
|
|
flash('Usunięto wszystkie listy')
|
|
return redirect(url_for('admin_panel'))
|
|
|
|
@app.route('/admin/add_user', methods=['GET', 'POST'])
|
|
@login_required
|
|
def add_user():
|
|
if not current_user.is_admin:
|
|
return redirect(url_for('index_guest'))
|
|
if request.method == 'POST':
|
|
username = request.form['username']
|
|
password = generate_password_hash(request.form['password'])
|
|
new_user = User(username=username, password_hash=password)
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
flash('Dodano nowego użytkownika')
|
|
return redirect(url_for('admin_panel'))
|
|
return render_template('admin/add_user.html')
|
|
|
|
@app.route('/admin/users')
|
|
@login_required
|
|
def list_users():
|
|
if not current_user.is_admin:
|
|
return redirect(url_for('index_guest'))
|
|
users = User.query.all()
|
|
user_count = User.query.count()
|
|
list_count = ShoppingList.query.count()
|
|
item_count = Item.query.count()
|
|
activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"]
|
|
return render_template('admin/list_users.html', users=users, user_count=user_count, list_count=list_count, item_count=item_count, activity_log=activity_log)
|
|
|
|
@app.route('/admin/reset_password/<int:user_id>', methods=['GET', 'POST'])
|
|
@login_required
|
|
def reset_password(user_id):
|
|
if not current_user.is_admin:
|
|
return redirect(url_for('index_guest'))
|
|
user = User.query.get_or_404(user_id)
|
|
if request.method == 'POST':
|
|
new_password = generate_password_hash(request.form['password'])
|
|
user.password_hash = new_password
|
|
db.session.commit()
|
|
flash('Hasło zresetowane')
|
|
return redirect(url_for('list_users'))
|
|
return render_template('admin/reset_password.html', user=user)
|
|
|
|
@app.route('/admin/delete_user/<int:user_id>')
|
|
@login_required
|
|
def delete_user(user_id):
|
|
if not current_user.is_admin:
|
|
return redirect(url_for('index_guest'))
|
|
user = User.query.get_or_404(user_id)
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
flash('Użytkownik usunięty')
|
|
return redirect(url_for('list_users'))
|
|
|
|
@socketio.on('delete_item')
|
|
def handle_delete_item(data):
|
|
item = Item.query.get(data['item_id'])
|
|
if item:
|
|
db.session.delete(item)
|
|
db.session.commit()
|
|
emit('item_deleted', {'item_id': item.id}, to=str(item.list_id))
|
|
|
|
@socketio.on('edit_item')
|
|
def handle_edit_item(data):
|
|
item = Item.query.get(data['item_id'])
|
|
new_name = data['new_name']
|
|
if item and new_name.strip():
|
|
item.name = new_name
|
|
db.session.commit()
|
|
emit('item_edited', {'item_id': item.id, 'new_name': item.name}, to=str(item.list_id))
|
|
|
|
|
|
@socketio.on('join_list')
|
|
def handle_join(data):
|
|
room = str(data['room'])
|
|
username = data.get('username', 'Gość')
|
|
join_room(room)
|
|
emit('user_joined', {'username': username}, to=room)
|
|
|
|
@socketio.on('add_item')
|
|
def handle_add_item(data):
|
|
list_id = data['list_id']
|
|
name = data['name']
|
|
new_item = Item(
|
|
list_id=list_id,
|
|
name=name,
|
|
added_by=current_user.id if current_user.is_authenticated else None
|
|
)
|
|
db.session.add(new_item)
|
|
db.session.commit()
|
|
emit('item_added', {
|
|
'id': new_item.id,
|
|
'name': new_item.name,
|
|
'added_by': current_user.username if current_user.is_authenticated else 'Gość'
|
|
}, to=str(list_id), include_self=True)
|
|
|
|
@socketio.on('check_item')
|
|
def handle_check_item(data):
|
|
item = Item.query.get(data['item_id'])
|
|
if item:
|
|
item.purchased = True
|
|
item.purchased_at = datetime.utcnow()
|
|
db.session.commit()
|
|
emit('item_checked', {'item_id': item.id}, to=str(item.list_id))
|
|
|
|
@app.cli.command('create_db')
|
|
def create_db():
|
|
db.create_all()
|
|
print('Database created.')
|
|
|
|
if __name__ == '__main__':
|
|
socketio.run(app, debug=True)
|