984 lines
35 KiB
Python
984 lines
35 KiB
Python
import os
|
||
import secrets
|
||
import time
|
||
import mimetypes
|
||
from datetime import datetime, timedelta
|
||
from flask import Flask, render_template, redirect, url_for, request, flash, Blueprint, send_from_directory, request, abort, session
|
||
from markupsafe import Markup
|
||
from flask_sqlalchemy import SQLAlchemy
|
||
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
||
from flask_socketio import SocketIO, emit, join_room
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
from config import Config
|
||
from PIL import Image
|
||
from werkzeug.utils import secure_filename
|
||
from werkzeug.middleware.proxy_fix import ProxyFix
|
||
from sqlalchemy import func, extract
|
||
from collections import defaultdict, deque
|
||
|
||
|
||
|
||
# --- Application and extension setup ---------------------------------------
app = Flask(__name__)
app.config.from_object(Config)
# Honour one level of X-Forwarded-For / -Proto / -Host from a reverse proxy.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)

# NOTE(review): the fallback values below are placeholders and are publicly
# visible in the source; real deployments should override them in Config.
SYSTEM_PASSWORD = app.config.get('SYSTEM_PASSWORD', 'changeme')
DEFAULT_ADMIN_USERNAME = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin')
DEFAULT_ADMIN_PASSWORD = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123')
# setdefault (rather than get) writes the fallback back into app.config, so
# the app.config['UPLOAD_FOLDER'] lookups used by the views cannot raise
# KeyError when Config does not define the key.
UPLOAD_FOLDER = app.config.setdefault('UPLOAD_FOLDER', 'uploads')
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
AUTHORIZED_COOKIE_VALUE = app.config.get('AUTHORIZED_COOKIE_VALUE', '80d31cdfe63539c9')

os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Failed system-password attempts: client IP -> deque of time.time() stamps.
failed_login_attempts = defaultdict(deque)
MAX_ATTEMPTS = 10
TIME_WINDOW = 60 * 60  # seconds

db = SQLAlchemy(app)
socketio = SocketIO(app, async_mode="eventlet")
login_manager = LoginManager(app)
login_manager.login_view = 'login'

static_bp = Blueprint('static_bp', __name__)

# Live presence tracking: room id (str) -> set of usernames currently joined.
active_users = {}
|
||
class User(UserMixin, db.Model):
    """Application account used for Flask-Login authentication."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    # 255 rather than 150: recent Werkzeug releases default to scrypt, whose
    # encoded hash string is longer than 150 characters and would be rejected
    # or truncated by databases that enforce VARCHAR length.
    password_hash = db.Column(db.String(255), nullable=False)
    # Grants access to the /admin views.
    is_admin = db.Column(db.Boolean, default=False)
|
||
|
||
class ShoppingList(db.Model):
    """A shopping list owned by a user and optionally shared via a token."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(150), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    owner_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    # Temporary lists get an expires_at value when they are created.
    is_temporary = db.Column(db.Boolean, default=False)
    # Token used by the /share/<token> URL.
    share_token = db.Column(db.String(64), unique=True, nullable=True)
    # None means the list never expires.
    expires_at = db.Column(db.DateTime, nullable=True)
    owner = db.relationship('User', backref='lists', lazy=True)
    # Archived lists are left out of the index page.
    is_archived = db.Column(db.Boolean, default=False)
    # Public lists are shown to other users and guests on the index page.
    is_public = db.Column(db.Boolean, default=True)
|
||
class Item(db.Model):
    """A single product entry on a shopping list."""

    id = db.Column(db.Integer, primary_key=True)
    list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id'))
    name = db.Column(db.String(150), nullable=False)
    added_at = db.Column(db.DateTime, default=datetime.utcnow)
    # None when the item was added by a guest who is not logged in.
    added_by = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
    purchased = db.Column(db.Boolean, default=False)
    # Set when the item is checked off and cleared when it is unchecked.
    purchased_at = db.Column(db.DateTime, nullable=True)
    quantity = db.Column(db.Integer, default=1)
    # Free-text comment attached to the item.
    note = db.Column(db.Text, nullable=True)
|
||
|
||
class SuggestedProduct(db.Model):
    """A product name remembered for autocomplete suggestions."""

    id = db.Column(db.Integer, primary_key=True)
    # Unique, so each name is stored only once.
    name = db.Column(db.String(150), unique=True, nullable=False)
|
||
|
||
class Expense(db.Model):
    """An amount of money spent on a shopping list."""

    id = db.Column(db.Integer, primary_key=True)
    list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id'))
    amount = db.Column(db.Float, nullable=False)
    added_at = db.Column(db.DateTime, default=datetime.utcnow)
    # NOTE(review): not written by any code visible in this file.
    receipt_filename = db.Column(db.String(255), nullable=True)
|
||
|
||
with app.app_context():
    # Start-up initialisation: create the tables and make sure an admin
    # account matching the configured credentials exists.
    db.create_all()

    admin = User.query.filter_by(is_admin=True).first()
    username = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin')
    password = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123')

    # The password is hashed only in the branches that store it; hashing is
    # deliberately expensive, so it is skipped when nothing changes.
    if admin is None:
        # No admin yet: create the account.
        admin = User(
            username=username,
            password_hash=generate_password_hash(password),
            is_admin=True,
        )
        db.session.add(admin)
        db.session.commit()
    elif admin.username != username or not check_password_hash(admin.password_hash, password):
        # Existing admin differs from the configuration: bring it in line.
        admin.username = username
        admin.password_hash = generate_password_hash(password)
        db.session.commit()
|
||
|
||
@static_bp.route('/static/js/<path:filename>')
def serve_js(filename):
    """Serve an application JavaScript file with caching disabled.

    Args:
        filename: Path of the file relative to ``static/js``.

    Returns:
        The file response with no-cache headers applied.
    """
    response = send_from_directory('static/js', filename)
    response.cache_control.no_cache = True
    response.cache_control.no_store = True
    response.cache_control.must_revalidate = True
    # Set the header explicitly: assigning ``response.pragma`` only creates a
    # Python attribute on the response object and sends nothing to the client.
    response.headers['Pragma'] = 'no-cache'
    response.headers.pop('Content-Disposition', None)
    response.headers.pop('Etag', None)
    return response
|
||
|
||
@static_bp.route('/static/css/<path:filename>')
def serve_css(filename):
    """Serve an application stylesheet with a one-hour public cache lifetime."""
    css_response = send_from_directory('static/css', filename)
    css_response.headers['Cache-Control'] = 'public, max-age=3600'
    # Drop the Content-Disposition and ETag headers if they are present.
    for unwanted in ('Content-Disposition', 'Etag'):
        css_response.headers.pop(unwanted, None)
    return css_response
|
||
|
||
@static_bp.route('/static/lib/js/<path:filename>')
def serve_js_lib(filename):
    """Serve a vendored JavaScript library with a one-week public cache."""
    lib_response = send_from_directory('static/lib/js', filename)
    lib_headers = lib_response.headers
    lib_headers['Cache-Control'] = 'public, max-age=604800'
    # Drop the Content-Disposition and ETag headers if they are present.
    for unwanted in ('Content-Disposition', 'Etag'):
        lib_headers.pop(unwanted, None)
    return lib_response
|
||
|
||
# Vendored CSS, cached for one week
@static_bp.route('/static/lib/css/<path:filename>')
def serve_css_lib(filename):
    """Serve a vendored stylesheet with a one-week public cache lifetime."""
    lib_response = send_from_directory('static/lib/css', filename)
    lib_headers = lib_response.headers
    lib_headers['Cache-Control'] = 'public, max-age=604800'
    # Drop the Content-Disposition and ETag headers if they are present.
    for unwanted in ('Content-Disposition', 'Etag'):
        lib_headers.pop(unwanted, None)
    return lib_response
|
||
|
||
|
||
# Attach the static-file routes defined on static_bp to the application.
app.register_blueprint(static_bp)
|
||
|
||
def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared
    case-insensitively; a name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    return dot == '.' and extension.lower() in ALLOWED_EXTENSIONS
|
||
|
||
def get_progress(list_id):
    """Return ``(purchased_count, total_count, percent)`` for a list.

    ``percent`` is 0 when the list has no items.
    """
    rows = Item.query.filter_by(list_id=list_id).all()
    total_count = len(rows)
    purchased_count = sum(1 for row in rows if row.purchased)
    if total_count > 0:
        percent = purchased_count / total_count * 100
    else:
        percent = 0
    return purchased_count, total_count, percent
|
||
|
||
def delete_receipts_for_list(list_id):
    """Delete every uploaded receipt file that belongs to *list_id*.

    Receipt files are recognised by the ``list_<id>_`` filename prefix.
    Deletion is best-effort: a file that cannot be removed is logged and
    skipped so the remaining files are still processed.
    """
    prefix = f"list_{list_id}_"
    upload_folder = app.config['UPLOAD_FOLDER']
    for filename in os.listdir(upload_folder):
        if not filename.startswith(prefix):
            continue
        try:
            os.remove(os.path.join(upload_folder, filename))
        except OSError as e:
            # Only filesystem errors are expected here; anything else is a
            # programming error and should surface.
            app.logger.warning(f"Nie udało się usunąć pliku {filename}: {e}")
|
||
|
||
# System-login protection: tracking of failed password attempts per IP
def is_ip_blocked(ip):
    """Return True when *ip* has reached MAX_ATTEMPTS within TIME_WINDOW.

    Expired attempts are discarded first. Unlike indexing the defaultdict,
    this lookup does not create an entry for an address that has never
    failed, so ordinary visitors do not grow the table.
    """
    now = time.time()
    attempts = failed_login_attempts.get(ip)
    if attempts is None:
        attempts = ()
    while attempts and now - attempts[0] > TIME_WINDOW:
        attempts.popleft()
    blocked = len(attempts) >= MAX_ATTEMPTS
    if not attempts:
        # Nothing left to remember for this address.
        failed_login_attempts.pop(ip, None)
    return blocked
|
||
|
||
def register_failed_attempt(ip):
    """Record a failed attempt for *ip*, dropping expired ones first."""
    stamp = time.time()
    history = failed_login_attempts[ip]
    while history:
        if stamp - history[0] <= TIME_WINDOW:
            break
        history.popleft()
    history.append(stamp)
|
||
|
||
def reset_failed_attempts(ip):
    """Forget all failed attempts recorded for *ip*.

    The entry is removed outright rather than cleared, so no empty deque is
    left behind (or newly created) for the address.
    """
    failed_login_attempts.pop(ip, None)
|
||
|
||
def attempts_remaining(ip):
    """Return how many more failed attempts *ip* may make (never negative).

    Uses ``.get`` so that asking about an unseen address does not insert an
    empty entry into the attempts table.
    """
    attempts = failed_login_attempts.get(ip, ())
    return max(0, MAX_ATTEMPTS - len(attempts))
|
||
|
||
####################################################
|
||
|
||
@login_manager.user_loader
def load_user(user_id):
    """Load the user for a session id, or None when the id is not valid.

    Flask-Login expects the loader to return None for an invalid id rather
    than raise, so a malformed id is treated as "no such user".
    """
    try:
        return User.query.get(int(user_id))
    except (TypeError, ValueError):
        return None
|
||
|
||
@app.context_processor
def inject_time():
    """Expose the ``time`` module to every template as ``time``."""
    return {'time': time}
|
||
|
||
@app.context_processor
def inject_has_authorized_cookie():
    """Tell templates whether the request carries an ``authorized`` cookie."""
    cookie_present = 'authorized' in request.cookies
    return dict(has_authorized_cookie=cookie_present)
|
||
|
||
@app.before_request
def require_system_password():
    """Redirect visitors without a valid system cookie to the password page.

    Exempt from the gate: the password page itself, endpoints whose name
    starts with ``login``, the favicon, and static files served by
    ``static_bp`` (except the protected JavaScript files listed below).

    Returns:
        None to let the request proceed, or a redirect response to
        ``system_auth`` (carrying the original URL in ``next``).
    """
    from urllib.parse import urlparse, urlunparse

    # Check the cookie's value, not just its presence: otherwise any client
    # could pass the gate by sending an arbitrary "authorized" cookie.
    cookie_value = request.cookies.get('authorized')
    if cookie_value is not None and secrets.compare_digest(
        cookie_value.encode('utf-8'),
        AUTHORIZED_COOKIE_VALUE.encode('utf-8'),
    ):
        return None

    # request.endpoint is None when no route matched (e.g. an unknown URL);
    # normalise it so the string checks below cannot raise AttributeError.
    endpoint = request.endpoint or ''

    if endpoint in ('system_auth', 'favicon') or endpoint.startswith('login'):
        return None

    # Static JavaScript is public except for the files listed here.
    if endpoint == 'static_bp.serve_js':
        protected_js = ["live.js", "list_guest.js", "hide_list.js", "socket_reconnect.js"]
        requested_file = (request.view_args or {}).get("filename", "")
        if requested_file in protected_js:
            return redirect(url_for('system_auth', next=request.url))
        return None

    # Every other static_bp route (CSS, vendored libraries) stays public.
    if endpoint.startswith('static_bp.'):
        return None

    if request.path == '/':
        return redirect(url_for('system_auth'))

    # Rebuild the URL with the host of this request before passing it on as
    # the post-login destination.
    parsed = urlparse(request.url)
    fixed_url = urlunparse(parsed._replace(netloc=request.host))
    return redirect(url_for('system_auth', next=fixed_url))
|
||
|
||
@app.template_filter('filemtime')
def file_mtime_filter(path):
    """Template filter: modification time of *path* as a datetime.

    Falls back to the current UTC time when the file cannot be inspected.
    """
    try:
        return datetime.fromtimestamp(os.path.getmtime(path))
    except Exception:
        return datetime.utcnow()
|
||
|
||
@app.template_filter('filesizeformat')
def filesizeformat_filter(path):
    """Template filter: human-readable size of the file at *path*.

    Returns a string such as ``"12.3 KB"``, or ``"N/A"`` when the size
    cannot be determined.
    """
    units = ('B', 'KB', 'MB', 'GB')
    try:
        remaining = os.path.getsize(path)
        for label in units:
            if remaining < 1024.0:
                return f"{remaining:.1f} {label}"
            remaining = remaining / 1024.0
        # Anything that survived four divisions is reported in terabytes.
        return f"{remaining:.1f} TB"
    except Exception:
        return "N/A"
|
||
|
||
@app.errorhandler(404)
def page_not_found(e):
    """Render the custom 404 page."""
    not_found_page = render_template('404.html')
    return not_found_page, 404
|
||
|
||
@app.errorhandler(403)
def forbidden(e):
    """Answer 403 errors with a plain-text body."""
    message = '403 Forbidden'
    return message, 403
|
||
|
||
@app.route('/favicon.svg')
def favicon():
    """Serve an inline SVG favicon showing a shopping-cart emoji."""
    svg = '''
    <svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16">
        <text y="14" font-size="16">🛒</text>
    </svg>
    '''
    return svg, 200, {'Content-Type': 'image/svg+xml'}
|
||
|
||
@app.route('/')
def index_guest():
    """Render the home page with the visitor's own lists and public lists.

    Logged-in users see their own non-archived, non-expired lists plus the
    public lists of other users; guests see public lists only. Each list is
    annotated with ``total_count``, ``purchased_count`` and
    ``total_expense`` for the template.
    """
    now = datetime.utcnow()
    # A list is still live when it has no expiry or the expiry is in the future.
    not_expired = (ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)
    newest_first = ShoppingList.created_at.desc()

    public_query = ShoppingList.query.filter(ShoppingList.is_public == True)

    if current_user.is_authenticated:
        user_lists = (
            ShoppingList.query
            .filter_by(owner_id=current_user.id, is_archived=False)
            .filter(not_expired)
            .order_by(newest_first)
            .all()
        )
        # The user's own lists are already shown above.
        public_query = public_query.filter(ShoppingList.owner_id != current_user.id)
    else:
        user_lists = []

    public_lists = (
        public_query
        .filter(not_expired, ShoppingList.is_archived == False)
        .order_by(newest_first)
        .all()
    )

    for shopping_list in user_lists + public_lists:
        list_items = Item.query.filter_by(list_id=shopping_list.id).all()
        shopping_list.total_count = len(list_items)
        shopping_list.purchased_count = sum(1 for entry in list_items if entry.purchased)
        list_expenses = Expense.query.filter_by(list_id=shopping_list.id).all()
        shopping_list.total_expense = sum(expense.amount for expense in list_expenses)

    return render_template("main.html", user_lists=user_lists, public_lists=public_lists)
|
||
|
||
def _is_safe_next_url(target):
    """Return True when *target* is a relative URL or points at this host."""
    from urllib.parse import urlparse

    # Browsers treat a backslash like a slash, which can hide a foreign host.
    if not target or '\\' in target:
        return False
    parsed = urlparse(target)
    if parsed.scheme or parsed.netloc:
        return parsed.scheme in ('http', 'https') and parsed.netloc == request.host
    return True


@app.route('/system-auth', methods=['GET', 'POST'])
def system_auth():
    """Ask for the system password and set the ``authorized`` cookie.

    Failed attempts are counted per client IP; once the limit is reached the
    page answers 403 until the attempts expire.

    Returns:
        A redirect to the ``next`` URL (or the home page) on success,
        otherwise the rendered password form.
    """
    # NOTE(review): access_route[0] is the left-most X-Forwarded-For entry,
    # which a client can set itself; confirm this suits the proxy setup.
    ip = request.access_route[0]

    # Only follow "next" when it stays on this site, so the login page
    # cannot be used to bounce visitors to an arbitrary external URL.
    next_page = request.args.get('next')
    if not _is_safe_next_url(next_page):
        next_page = url_for('index_guest')

    if is_ip_blocked(ip):
        flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger')
        return render_template('system_auth.html'), 403

    if request.method == 'POST':
        # A missing field counts as a wrong password instead of a bare 400.
        submitted = request.form.get('password', '')
        # Constant-time comparison; bytes so non-ASCII passwords work.
        if secrets.compare_digest(submitted.encode('utf-8'), SYSTEM_PASSWORD.encode('utf-8')):
            reset_failed_attempts(ip)
            resp = redirect(next_page)
            resp.set_cookie('authorized', AUTHORIZED_COOKIE_VALUE)
            return resp

        register_failed_attempt(ip)
        if is_ip_blocked(ip):
            flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger')
            return render_template('system_auth.html'), 403
        remaining = attempts_remaining(ip)
        flash(f'Nieprawidłowe hasło do systemu. Pozostało prób: {remaining}', 'warning')

    return render_template('system_auth.html')
|
||
|
||
@app.route('/archive_my_list/<int:list_id>')
@login_required
def archive_my_list(list_id):
    """Archive one of the current user's own lists and return to the index."""
    shopping_list = ShoppingList.query.get_or_404(list_id)
    home = redirect(url_for('index_guest'))

    # Only the owner may archive a list.
    if shopping_list.owner_id != current_user.id:
        flash('Nie masz uprawnień do tej listy', 'danger')
        return home

    shopping_list.is_archived = True
    db.session.commit()
    flash('Lista została zarchiwizowana', 'success')
    return home
|
||
|
||
@app.route('/edit_my_list/<int:list_id>', methods=['GET', 'POST'])
@login_required
def edit_my_list(list_id):
    """Let the owner rename a list.

    GET shows the edit form; POST stores the stripped title, or shows the
    form again with an error when the title is empty.
    """
    shopping_list = ShoppingList.query.get_or_404(list_id)
    if shopping_list.owner_id != current_user.id:
        flash('Nie masz uprawnień do tej listy', 'danger')
        return redirect(url_for('index_guest'))

    if request.method == 'POST':
        title = (request.form.get('title') or '').strip()
        if title:
            shopping_list.title = title
            db.session.commit()
            flash('Zaktualizowano tytuł listy', 'success')
            return redirect(url_for('index_guest'))
        flash('Podaj poprawny tytuł', 'danger')

    return render_template('edit_my_list.html', list=shopping_list)
|
||
|
||
@app.route('/toggle_visibility/<int:list_id>', methods=['GET', 'POST'])
@login_required
def toggle_visibility(list_id):
    """Flip the public/private flag of one of the current user's lists.

    JSON or POST requests get a JSON answer (the new state and the share
    URL, or a 403 error); plain GET requests get a flash message and a
    redirect to the index page.
    """
    shopping_list = ShoppingList.query.get_or_404(list_id)
    wants_json = request.is_json or request.method == 'POST'

    if shopping_list.owner_id != current_user.id:
        if wants_json:
            return {'error': 'Unauthorized'}, 403
        flash('Nie masz uprawnień do tej listy', 'danger')
        return redirect(url_for('index_guest'))

    shopping_list.is_public = not shopping_list.is_public
    db.session.commit()

    if wants_json:
        share_url = f"{request.url_root}share/{shopping_list.share_token}"
        return {'is_public': shopping_list.is_public, 'share_url': share_url}

    if shopping_list.is_public:
        flash('Lista została udostępniona publicznie', 'success')
    else:
        flash('Lista została ukryta przed gośćmi', 'info')
    return redirect(url_for('index_guest'))
|
||
|
||
|
||
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate the submitted credentials."""
    if request.method != 'POST':
        return render_template('login.html')

    account = User.query.filter_by(username=request.form['username']).first()
    if account is not None and check_password_hash(account.password_hash, request.form['password']):
        login_user(account)
        flash('Zalogowano pomyślnie', 'success')
        return redirect(url_for('index_guest'))

    # Same message for an unknown user and a wrong password.
    flash('Nieprawidłowy login lub hasło', 'danger')
    return render_template('login.html')
|
||
|
||
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and return to the index page."""
    logout_user()
    flash('Wylogowano pomyślnie', 'success')
    destination = url_for('index_guest')
    return redirect(destination)
|
||
|
||
@app.route('/create', methods=['POST'])
@login_required
def create_list():
    """Create a new shopping list for the current user.

    Form fields:
        title: List title; must not be empty after stripping whitespace.
        temporary: When present, the list expires after seven days.

    Returns:
        A redirect to the new list, or back to the index page with an error
        message when the title is missing.
    """
    # The title column is NOT NULL; reject a missing or blank title here
    # instead of failing with a database error.
    title = (request.form.get('title') or '').strip()
    if not title:
        flash('Podaj poprawny tytuł', 'danger')
        return redirect(url_for('index_guest'))

    is_temporary = 'temporary' in request.form
    # NOTE(review): 4 random bytes give a short share URL but only 32 bits
    # of entropy; collisions violate the unique constraint on share_token.
    token = secrets.token_hex(4)
    expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None

    new_list = ShoppingList(
        title=title,
        owner_id=current_user.id,
        is_temporary=is_temporary,
        share_token=token,
        expires_at=expires_at,
    )
    db.session.add(new_list)
    db.session.commit()
    flash('Utworzono nową listę', 'success')
    return redirect(url_for('view_list', list_id=new_list.id))
|
||
|
||
@app.route('/list/<int:list_id>')
@login_required
def view_list(list_id):
    """Render the full view of a list for a logged-in user.

    The template receives the list, its items, purchase progress, expenses
    and the names of the uploaded receipt files.
    """
    shopping_list = ShoppingList.query.get_or_404(list_id)
    items = Item.query.filter_by(list_id=list_id).all()
    total_count = len(items)
    purchased_count = sum(1 for i in items if i.purchased)
    percent = (purchased_count / total_count * 100) if total_count > 0 else 0

    expenses = Expense.query.filter_by(list_id=list_id).all()
    total_expense = sum(e.amount for e in expenses)

    # Match on the full "list_<id>_" prefix used when saving receipts; a
    # substring test for "list_1" would also pick up list_10, list_11, ...
    receipt_prefix = f"list_{list_id}_"
    receipt_files = [
        f for f in os.listdir(app.config['UPLOAD_FOLDER'])
        if f.startswith(receipt_prefix)
    ]

    return render_template(
        'list.html',
        list=shopping_list,
        items=items,
        receipt_files=receipt_files,
        total_count=total_count,
        purchased_count=purchased_count,
        percent=percent,
        expenses=expenses,
        total_expense=total_expense
    )
|
||
|
||
@app.route('/share/<token>')
def share_list(token):
    """Render the guest view of the list identified by its share token.

    Lists that are not public redirect to the index page with an error.
    """
    shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()

    if not shopping_list.is_public:
        flash('Ta lista nie jest publicznie dostępna', 'danger')
        return redirect(url_for('index_guest'))

    items = Item.query.filter_by(list_id=shopping_list.id).all()

    # Match on the full "list_<id>_" prefix used when saving receipts; a
    # substring test for "list_1" would also pick up list_10, list_11, ...
    receipt_prefix = f"list_{shopping_list.id}_"
    receipt_files = [
        f for f in os.listdir(app.config['UPLOAD_FOLDER'])
        if f.startswith(receipt_prefix)
    ]

    expenses = Expense.query.filter_by(list_id=shopping_list.id).all()
    total_expense = sum(e.amount for e in expenses)

    return render_template(
        'list_guest.html',
        list=shopping_list,
        items=items,
        receipt_files=receipt_files,
        expenses=expenses,
        total_expense=total_expense
    )
|
||
|
||
@app.route('/guest-list/<int:list_id>')
def guest_list(list_id):
    """Render the guest view of a list addressed by its numeric id."""
    # NOTE(review): unlike share_list, this view does not check is_public,
    # so a private list is reachable by guessing its id -- confirm intent.
    shopping_list = ShoppingList.query.get_or_404(list_id)
    items = Item.query.filter_by(list_id=list_id).all()

    # Match on the full "list_<id>_" prefix used when saving receipts; a
    # substring test for "list_1" would also pick up list_10, list_11, ...
    receipt_prefix = f"list_{list_id}_"
    receipt_files = [
        f for f in os.listdir(app.config['UPLOAD_FOLDER'])
        if f.startswith(receipt_prefix)
    ]

    expenses = Expense.query.filter_by(list_id=list_id).all()
    total_expense = sum(e.amount for e in expenses)

    return render_template(
        'list_guest.html',
        list=shopping_list,
        items=items,
        receipt_files=receipt_files,
        expenses=expenses,
        total_expense=total_expense
    )
|
||
|
||
@app.route('/copy/<int:list_id>')
@login_required
def copy_list(list_id):
    """Duplicate a list (title and item names) for the current user."""
    source_list = ShoppingList.query.get_or_404(list_id)
    share_token = secrets.token_hex(16)
    duplicate = ShoppingList(
        title=source_list.title + ' (Kopia)',
        owner_id=current_user.id,
        share_token=share_token,
    )
    db.session.add(duplicate)
    # Commit first so the new list has an id for its items to reference.
    db.session.commit()

    # Only the item name is carried over to the copy.
    for source_item in Item.query.filter_by(list_id=source_list.id).all():
        db.session.add(Item(list_id=duplicate.id, name=source_item.name))
    db.session.commit()

    flash('Skopiowano listę', 'success')
    return redirect(url_for('view_list', list_id=duplicate.id))
|
||
|
||
@app.route('/suggest_products')
def suggest_products():
    """Return up to five stored product names containing the ``q`` parameter."""
    term = request.args.get('q', '')
    if not term:
        return {'suggestions': []}
    matches = (
        SuggestedProduct.query
        .filter(SuggestedProduct.name.ilike(f'%{term}%'))
        .limit(5)
        .all()
    )
    return {'suggestions': [product.name for product in matches]}
|
||
|
||
@app.route('/upload_receipt/<int:list_id>', methods=['POST'])
def upload_receipt(list_id):
    """Store an uploaded receipt image for a list.

    The image is shrunk to fit within 2000x2000 pixels and saved as
    ``list_<id>_<filename>`` in the upload folder. The visitor is always
    sent back to the page they came from.
    """
    # The Referer header is optional; fall back to the home page so the
    # redirect target is never None.
    back = request.referrer or url_for('index_guest')

    upload = request.files.get('receipt')
    if upload is None:
        flash('Brak pliku', 'danger')
        return redirect(back)

    if upload.filename == '':
        flash('Nie wybrano pliku', 'danger')
        return redirect(back)

    if not allowed_file(upload.filename):
        flash('Niedozwolony format pliku', 'danger')
        return redirect(back)

    filename = secure_filename(upload.filename)
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], f"list_{list_id}_{filename}")

    try:
        with Image.open(upload) as img:
            img.thumbnail((2000, 2000))
            img.save(file_path)
    except (OSError, ValueError):
        # Not a readable image, or it cannot be written in the format the
        # extension implies: report it instead of answering with a 500.
        flash('Niedozwolony format pliku', 'danger')
        return redirect(back)

    flash('Wgrano paragon', 'success')
    return redirect(back)
|
||
|
||
@app.route('/uploads/<filename>')
def uploaded_file(filename):
    """Serve an uploaded file with a 30-day immutable cache lifetime."""
    file_response = send_from_directory(app.config['UPLOAD_FOLDER'], filename)
    headers = file_response.headers
    headers['Cache-Control'] = 'public, max-age=2592000, immutable'
    for unwanted in ('Pragma', 'Content-Disposition'):
        headers.pop(unwanted, None)
    # Use the type guessed from the filename when there is one.
    guessed_type = mimetypes.guess_type(filename)[0]
    if guessed_type:
        headers['Content-Type'] = guessed_type
    return file_response
|
||
|
||
@app.route('/admin')
@login_required
def admin_panel():
    """Render the admin dashboard with per-list and global statistics.

    Non-admin users are redirected to the index page.
    """
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    user_count = User.query.count()
    list_count = ShoppingList.query.count()
    item_count = Item.query.count()
    all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all()
    all_files = os.listdir(app.config['UPLOAD_FOLDER'])

    enriched_lists = []
    for l in all_lists:
        items = Item.query.filter_by(list_id=l.id).all()
        total_count = len(items)
        purchased_count = sum(1 for i in items if i.purchased)
        percent = (purchased_count / total_count * 100) if total_count > 0 else 0
        comments_count = sum(1 for i in items if i.note and i.note.strip() != '')
        expenses = Expense.query.filter_by(list_id=l.id).all()
        total_expense = sum(e.amount for e in expenses)
        # Match on the full "list_<id>_" prefix used when saving receipts;
        # a substring test for "list_1" would also count list_10, list_11, ...
        receipt_prefix = f"list_{l.id}_"
        receipt_files = [f for f in all_files if f.startswith(receipt_prefix)]

        enriched_lists.append({
            'list': l,
            'total_count': total_count,
            'purchased_count': purchased_count,
            'percent': round(percent),
            'comments_count': comments_count,
            'receipts_count': len(receipt_files),
            'total_expense': total_expense
        })

    # Five most frequently purchased product names.
    top_products = (
        db.session.query(Item.name, func.count(Item.id).label('count'))
        .filter(Item.purchased == True)
        .group_by(Item.name)
        .order_by(func.count(Item.id).desc())
        .limit(5)
        .all()
    )

    purchased_items_count = Item.query.filter_by(purchased=True).count()
    total_expense_sum = db.session.query(func.sum(Expense.amount)).scalar() or 0

    current_year = datetime.utcnow().year
    year_expense_sum = (
        db.session.query(func.sum(Expense.amount))
        .filter(extract('year', Expense.added_at) == current_year)
        .scalar() or 0
    )

    current_month = datetime.utcnow().month
    month_expense_sum = (
        db.session.query(func.sum(Expense.amount))
        .filter(extract('year', Expense.added_at) == current_year)
        .filter(extract('month', Expense.added_at) == current_month)
        .scalar() or 0
    )

    return render_template(
        'admin/admin_panel.html',
        user_count=user_count,
        list_count=list_count,
        item_count=item_count,
        purchased_items_count=purchased_items_count,
        enriched_lists=enriched_lists,
        top_products=top_products,
        total_expense_sum=total_expense_sum,
        year_expense_sum=year_expense_sum,
        month_expense_sum=month_expense_sum,
    )
|
||
|
||
@app.route('/admin/delete_list/<int:list_id>')
@login_required
def delete_list(list_id):
    """Delete a list with its items, expenses and receipt files (admin only)."""
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    # Look the list up first so an unknown id answers 404 before any file
    # is touched, and keep the title for the message shown afterwards.
    list_to_delete = ShoppingList.query.get_or_404(list_id)
    deleted_title = list_to_delete.title

    delete_receipts_for_list(list_to_delete.id)
    Item.query.filter_by(list_id=list_to_delete.id).delete()
    Expense.query.filter_by(list_id=list_to_delete.id).delete()
    db.session.delete(list_to_delete)
    db.session.commit()

    flash(f'Usunięto listę: {deleted_title}', 'success')
    return redirect(url_for('admin_panel'))
|
||
|
||
@app.route('/admin/add_user', methods=['GET', 'POST'])
@login_required
def add_user():
    """Create a new user account (admin only).

    GET shows the form; POST creates the account, or returns to the form
    with an error when the username is already taken.
    """
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    if request.method == 'POST':
        username = request.form['username']
        # Usernames are unique; check up front so a duplicate produces a
        # message instead of a database integrity error.
        if User.query.filter_by(username=username).first() is not None:
            flash('Użytkownik o takiej nazwie już istnieje', 'danger')
            return redirect(url_for('add_user'))

        password = generate_password_hash(request.form['password'])
        new_user = User(username=username, password_hash=password)
        db.session.add(new_user)
        db.session.commit()
        flash('Dodano nowego użytkownika', 'success')
        return redirect(url_for('admin_panel'))

    return render_template('admin/add_user.html')
|
||
|
||
@app.route('/admin/users')
@login_required
def list_users():
    """Render the user administration page (admin only)."""
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    context = {
        'users': User.query.all(),
        'user_count': User.query.count(),
        'list_count': ShoppingList.query.count(),
        'item_count': Item.query.count(),
        # Static sample entries; no real activity log is read here.
        'activity_log': ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"],
    }
    return render_template('admin/list_users.html', **context)
|
||
|
||
@app.route('/admin/reset_password/<int:user_id>', methods=['GET', 'POST'])
@login_required
def reset_password(user_id):
    """Set a new password for a user (admin only)."""
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    account = User.query.get_or_404(user_id)
    if request.method != 'POST':
        return render_template('admin/reset_password.html', user=account)

    account.password_hash = generate_password_hash(request.form['password'])
    db.session.commit()
    flash('Hasło zresetowane', 'success')
    return redirect(url_for('list_users'))
|
||
|
||
@app.route('/admin/delete_user/<int:user_id>')
@login_required
def delete_user(user_id):
    """Delete a user account (admin only)."""
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    account = User.query.get_or_404(user_id)
    db.session.delete(account)
    db.session.commit()

    flash('Użytkownik usunięty', 'success')
    users_page = url_for('list_users')
    return redirect(users_page)
|
||
|
||
@app.route('/admin/receipts')
@login_required
def admin_receipts():
    """List every uploaded image file for the admin (admin only)."""
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    folder = app.config['UPLOAD_FOLDER']
    # Keep only files whose extension is an allowed image type.
    image_files = list(filter(allowed_file, os.listdir(folder)))
    return render_template(
        'admin/receipts.html',
        image_files=image_files,
        upload_folder=folder,
    )
|
||
|
||
@app.route('/admin/delete_receipt/<filename>')
@login_required
def delete_receipt(filename):
    """Delete one uploaded receipt file (admin only)."""
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    # isfile rather than exists: a name such as ".." resolves to a
    # directory, which exists but makes os.remove raise.
    if os.path.isfile(file_path):
        os.remove(file_path)
        flash('Plik usunięty', 'success')
    else:
        flash('Plik nie istnieje', 'danger')
    return redirect(url_for('admin_receipts'))
|
||
|
||
@app.route('/admin/delete_selected_lists', methods=['POST'])
@login_required
def delete_selected_lists():
    """Delete every list whose id was submitted in ``list_ids`` (admin only).

    Ids that are not numeric or do not match a list are skipped.
    """
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    for raw_id in request.form.getlist('list_ids'):
        try:
            list_id = int(raw_id)
        except ValueError:
            # A malformed id must not abort the whole batch with a 500.
            continue
        lst = ShoppingList.query.get(list_id)
        if lst is None:
            continue
        delete_receipts_for_list(lst.id)
        Item.query.filter_by(list_id=lst.id).delete()
        Expense.query.filter_by(list_id=lst.id).delete()
        db.session.delete(lst)

    db.session.commit()
    flash('Usunięto wybrane listy', 'success')
    return redirect(url_for('admin_panel'))
|
||
|
||
@app.route('/admin/archive_list/<int:list_id>')
@login_required
def archive_list(list_id):
    """Mark any list as archived (admin only)."""
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    shopping_list = ShoppingList.query.get_or_404(list_id)
    shopping_list.is_archived = True
    db.session.commit()

    flash('Lista oznaczona jako archiwalna', 'success')
    panel_url = url_for('admin_panel')
    return redirect(panel_url)
|
||
|
||
@app.route('/admin/delete_all_items')
@login_required
def delete_all_items():
    """Delete every item on every list (admin only)."""
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    # Bulk delete of the whole Item table.
    Item.query.delete()
    db.session.commit()

    flash('Usunięto wszystkie produkty', 'success')
    panel_url = url_for('admin_panel')
    return redirect(panel_url)
|
||
|
||
@app.route('/admin/edit_list/<int:list_id>', methods=['GET', 'POST'])
@login_required
def edit_list(list_id):
    """Edit a list's title, owner, archive flag and expense (admin only).

    GET renders the edit form. POST applies the submitted fields; when an
    amount is given, all existing expenses of the list are replaced by a
    single expense with that amount.
    """
    if not current_user.is_admin:
        return redirect(url_for('index_guest'))

    l = ShoppingList.query.get_or_404(list_id)
    expenses = Expense.query.filter_by(list_id=list_id).all()
    total_expense = sum(e.amount for e in expenses)

    users = User.query.all()

    if request.method == 'POST':
        new_title = request.form.get('title')
        new_amount_str = request.form.get('amount')
        is_archived = 'archived' in request.form
        new_owner_id = request.form.get('owner_id')

        # A blank title leaves the current one unchanged.
        if new_title and new_title.strip():
            l.title = new_title.strip()

        l.is_archived = is_archived

        if new_owner_id:
            try:
                new_owner_id_int = int(new_owner_id)
                if User.query.get(new_owner_id_int):
                    l.owner_id = new_owner_id_int
                else:
                    flash('Wybrany użytkownik nie istnieje', 'danger')
                    return redirect(url_for('edit_list', list_id=list_id))
            except ValueError:
                flash('Niepoprawny ID użytkownika', 'danger')
                return redirect(url_for('edit_list', list_id=list_id))

        if new_amount_str:
            try:
                new_amount = float(new_amount_str)

                # Replace, rather than add to, the existing expenses.
                if expenses:
                    for expense in expenses:
                        db.session.delete(expense)
                    db.session.commit()

                new_expense = Expense(list_id=list_id, amount=new_amount)
                db.session.add(new_expense)
                db.session.commit()
                flash('Zaktualizowano tytuł, właściciela, archiwizację i/lub kwotę wydatku', 'success')
            except ValueError:
                flash('Niepoprawna kwota', 'danger')
                return redirect(url_for('edit_list', list_id=list_id))
        else:
            # No amount submitted: just persist the field changes above.
            db.session.commit()
            flash('Zaktualizowano tytuł, właściciela i/lub archiwizację', 'success')

        return redirect(url_for('admin_panel'))

    return render_template('admin/edit_list.html', list=l, total_expense=total_expense, users=users)
|
||
|
||
|
||
|
||
# Probably removable: this was moved to the Socket.IO events
@app.route('/update-note/<int:item_id>', methods=['POST'])
def update_note(item_id):
    """Store the submitted ``note`` form field on an item."""
    target = Item.query.get_or_404(item_id)
    target.note = request.form.get('note')
    db.session.commit()
    return dict(success=True)
|
||
|
||
# =========================================================================================
|
||
# SOCKET.IO
|
||
# =========================================================================================
|
||
|
||
@socketio.on('delete_item')
def handle_delete_item(data):
    """Delete an item and notify everyone in the list's room."""
    target = Item.query.get(data['item_id'])
    if target is None:
        return
    db.session.delete(target)
    db.session.commit()
    payload = {'item_id': target.id}
    emit('item_deleted', payload, to=str(target.list_id))
|
||
|
||
@socketio.on('edit_item')
def handle_edit_item(data):
    """Rename an item and/or change its quantity, then notify the room.

    Expected keys in *data*: ``item_id``, ``new_name`` and optionally
    ``new_quantity``. The event is ignored when the item does not exist or
    the new name is blank. A quantity that is missing, invalid or below 1
    becomes 1 (a missing quantity keeps the current one).
    """
    item = Item.query.get(data['item_id'])
    raw_name = data.get('new_name')
    new_name = raw_name.strip() if isinstance(raw_name, str) else ''

    # Check the item before touching item.quantity below; an unknown id
    # would otherwise raise AttributeError on None.
    if item is None or not new_name:
        return

    item.name = new_name

    try:
        new_quantity = max(1, int(data.get('new_quantity', item.quantity)))
    except (TypeError, ValueError, OverflowError):
        new_quantity = 1
    item.quantity = new_quantity

    db.session.commit()

    emit('item_edited', {
        'item_id': item.id,
        'new_name': item.name,
        'new_quantity': item.quantity
    }, to=str(item.list_id))
|
||
|
||
@socketio.on('join_list')
def handle_join(data):
    """Add the client to a list's room and broadcast the updated presence."""
    room = str(data['room'])
    username = data.get('username', 'Gość')
    join_room(room)

    # Remember who is present in the room.
    active_users.setdefault(room, set()).add(username)

    shopping_list = ShoppingList.query.get(int(data['room']))
    if shopping_list:
        list_title = shopping_list.title
    else:
        list_title = "Twoja lista"

    emit('user_joined', {'username': username}, to=room)
    emit('user_list', {'users': list(active_users[room])}, to=room)
    # Sent without a room, i.e. only back to the joining client.
    emit('joined_confirmation', {'room': room, 'list_title': list_title})
|
||
|
||
@socketio.on('disconnect')
def handle_disconnect():
    """Remove the disconnecting user from every room and notify the rooms.

    Anonymous visitors have no ``username`` attribute, so they are skipped
    instead of raising AttributeError.
    """
    # NOTE(review): guests join under a client-supplied name, which is not
    # known here, so their presence entries are never removed.
    username = getattr(current_user, 'username', None)
    if username is None:
        return

    # Iterate over a snapshot so a concurrent join cannot resize the dict
    # while it is being walked.
    for room, users in list(active_users.items()):
        if username in users:
            users.remove(username)
            emit('user_left', {'username': username}, to=room)
            emit('user_list', {'users': list(users)}, to=room)
|
||
|
||
@socketio.on('add_item')
def handle_add_item(data):
    """Add an item to a list and broadcast it to the list's room.

    Expected keys in *data*: ``list_id``, ``name`` and optionally
    ``quantity``. A blank or missing name is ignored. A quantity that is
    missing, invalid or below 1 becomes 1. New product names are also
    remembered as autocomplete suggestions.
    """
    list_id = data['list_id']

    # The name column is NOT NULL; ignore blank names and store the name
    # stripped, as the edit handler does.
    raw_name = data.get('name')
    name = raw_name.strip() if isinstance(raw_name, str) else ''
    if not name:
        return

    try:
        quantity = max(1, int(data.get('quantity', 1)))
    except (TypeError, ValueError, OverflowError):
        quantity = 1

    new_item = Item(
        list_id=list_id,
        name=name,
        quantity=quantity,
        added_by=current_user.id if current_user.is_authenticated else None
    )
    db.session.add(new_item)

    if not SuggestedProduct.query.filter_by(name=name).first():
        db.session.add(SuggestedProduct(name=name))

    db.session.commit()

    emit('item_added', {
        'id': new_item.id,
        'name': new_item.name,
        'quantity': new_item.quantity,
        'added_by': current_user.username if current_user.is_authenticated else 'Gość'
    }, to=str(list_id), include_self=True)
|
||
|
||
@socketio.on('check_item')
def handle_check_item(data):
    """Mark an item as purchased and broadcast the new list progress."""
    item = Item.query.get(data['item_id'])
    if not item:
        return

    item.purchased = True
    item.purchased_at = datetime.utcnow()
    db.session.commit()

    purchased_count, total_count, percent = get_progress(item.list_id)
    room = str(item.list_id)

    emit('item_checked', {'item_id': item.id}, to=room)
    progress = dict(
        purchased_count=purchased_count,
        total_count=total_count,
        percent=percent,
    )
    emit('progress_updated', progress, to=room)
|
||
|
||
@socketio.on('uncheck_item')
def handle_uncheck_item(data):
    """Mark an item as not purchased and broadcast the new list progress."""
    item = Item.query.get(data['item_id'])
    if not item:
        return

    item.purchased = False
    item.purchased_at = None
    db.session.commit()

    purchased_count, total_count, percent = get_progress(item.list_id)
    room = str(item.list_id)

    emit('item_unchecked', {'item_id': item.id}, to=room)
    progress = dict(
        purchased_count=purchased_count,
        total_count=total_count,
        percent=percent,
    )
    emit('progress_updated', progress, to=room)
|
||
|
||
@socketio.on('update_note')
def handle_update_note(data):
    """Store a new note on an item and broadcast it to the list's room."""
    item_id, note = data['item_id'], data['note']
    target = Item.query.get(item_id)
    if not target:
        return
    target.note = note
    db.session.commit()
    payload = {'item_id': item_id, 'note': note}
    emit('note_updated', payload, to=str(target.list_id))
|
||
|
||
@socketio.on('add_expense')
def handle_add_expense(data):
    """Record an expense for a list and broadcast the new total.

    Expected keys in *data*: ``list_id`` and ``amount``. The event is
    ignored when the amount is missing or is not a finite number.
    """
    import math

    list_id = data['list_id']

    # The amount comes straight from the client; coerce it to a float and
    # drop anything that would corrupt the stored totals.
    try:
        amount = float(data['amount'])
    except (KeyError, TypeError, ValueError):
        return
    if not math.isfinite(amount):
        return

    new_expense = Expense(list_id=list_id, amount=amount)
    db.session.add(new_expense)
    db.session.commit()

    total = db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar() or 0

    emit('expense_added', {
        'amount': amount,
        'total': total
    }, to=str(list_id))
|
||
|
||
@app.cli.command('create_db')
def create_db():
    """CLI command ``create_db``: create all tables defined by the models."""
    db.create_all()
    print('Database created.')
|
||
|
||
if __name__ == '__main__':
    # Take the debug flag from the configuration instead of hard-coding
    # True: with host 0.0.0.0 the interactive debugger would be reachable
    # from the network and allows arbitrary code execution.
    socketio.run(app, host='0.0.0.0', port=8000, debug=bool(app.config.get('DEBUG')))