Files
lista_zakupowa_live/app.py
2025-07-12 23:25:42 +02:00

1423 lines
49 KiB
Python

import os
import secrets
import time
import mimetypes
import sys
import platform
import psutil
from datetime import datetime, timedelta
from flask import Flask, render_template, redirect, url_for, request, flash, Blueprint, send_from_directory, request, abort, session, jsonify, make_response
from markupsafe import Markup
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_compress import Compress
from flask_socketio import SocketIO, emit, join_room
from werkzeug.security import generate_password_hash, check_password_hash
from config import Config
from PIL import Image
from werkzeug.utils import secure_filename
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import func, extract
from collections import defaultdict, deque
from functools import wraps
app = Flask(__name__)
app.config.from_object(Config)
app.config['COMPRESS_ALGORITHM'] = ['zstd', 'br', 'gzip', 'deflate']
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1)
SYSTEM_PASSWORD = app.config.get('SYSTEM_PASSWORD', 'changeme')
DEFAULT_ADMIN_USERNAME = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin')
DEFAULT_ADMIN_PASSWORD = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123')
UPLOAD_FOLDER = app.config.get('UPLOAD_FOLDER', 'uploads')
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp'}
AUTHORIZED_COOKIE_VALUE = app.config.get('AUTHORIZED_COOKIE_VALUE', '80d31cdfe63539c9')
AUTH_COOKIE_MAX_AGE = app.config.get('AUTH_COOKIE_MAX_AGE', 86400)
HEALTHCHECK_TOKEN = app.config.get('HEALTHCHECK_TOKEN', 'alamapsaikota1234')
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
failed_login_attempts = defaultdict(deque)
MAX_ATTEMPTS = 10
TIME_WINDOW = 60 * 60
db = SQLAlchemy(app)
socketio = SocketIO(app, async_mode="eventlet")
login_manager = LoginManager(app)
login_manager.login_view = 'login'
# flask-compress
compress = Compress()
compress.init_app(app)
static_bp = Blueprint('static_bp', __name__)
# dla live
active_users = {}
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(150), unique=True, nullable=False)
password_hash = db.Column(db.String(150), nullable=False)
is_admin = db.Column(db.Boolean, default=False)
class ShoppingList(db.Model):
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(150), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
owner_id = db.Column(db.Integer, db.ForeignKey('user.id'))
is_temporary = db.Column(db.Boolean, default=False)
share_token = db.Column(db.String(64), unique=True, nullable=True)
expires_at = db.Column(db.DateTime, nullable=True)
owner = db.relationship('User', backref='lists', lazy=True)
is_archived = db.Column(db.Boolean, default=False)
is_public = db.Column(db.Boolean, default=True)
class Item(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id'))
name = db.Column(db.String(150), nullable=False)
added_at = db.Column(db.DateTime, default=datetime.utcnow)
added_by = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
purchased = db.Column(db.Boolean, default=False)
purchased_at = db.Column(db.DateTime, nullable=True)
quantity = db.Column(db.Integer, default=1)
note = db.Column(db.Text, nullable=True)
class SuggestedProduct(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(150), unique=True, nullable=False)
usage_count = db.Column(db.Integer, default=0)
class Expense(db.Model):
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(db.Integer, db.ForeignKey('shopping_list.id'))
amount = db.Column(db.Float, nullable=False)
added_at = db.Column(db.DateTime, default=datetime.utcnow)
receipt_filename = db.Column(db.String(255), nullable=True)
with app.app_context():
db.create_all()
from werkzeug.security import generate_password_hash
admin = User.query.filter_by(is_admin=True).first()
username = app.config.get('DEFAULT_ADMIN_USERNAME', 'admin')
password = app.config.get('DEFAULT_ADMIN_PASSWORD', 'admin123')
password_hash = generate_password_hash(password)
if admin:
if admin.username != username or not check_password_hash(admin.password_hash, password):
admin.username = username
admin.password_hash = password_hash
db.session.commit()
else:
admin = User(username=username, password_hash=password_hash, is_admin=True)
db.session.add(admin)
db.session.commit()
@static_bp.route('/static/js/<path:filename>')
def serve_js(filename):
response = send_from_directory('static/js', filename)
response.cache_control.no_cache = True
response.cache_control.no_store = True
response.cache_control.must_revalidate = True
#response.expires = 0
response.pragma = 'no-cache'
response.headers.pop('Content-Disposition', None)
response.headers.pop('Etag', None)
return response
@static_bp.route('/static/css/<path:filename>')
def serve_css(filename):
response = send_from_directory('static/css', filename)
response.headers['Cache-Control'] = 'public, max-age=3600'
response.headers.pop('Content-Disposition', None)
response.headers.pop('Etag', None)
return response
@static_bp.route('/static/lib/js/<path:filename>')
def serve_js_lib(filename):
response = send_from_directory('static/lib/js', filename)
response.headers['Cache-Control'] = 'public, max-age=604800'
response.headers.pop('Content-Disposition', None)
response.headers.pop('Etag', None)
return response
# CSS z cache na tydzień
@static_bp.route('/static/lib/css/<path:filename>')
def serve_css_lib(filename):
response = send_from_directory('static/lib/css', filename)
response.headers['Cache-Control'] = 'public, max-age=604800'
response.headers.pop('Content-Disposition', None)
response.headers.pop('Etag', None)
return response
app.register_blueprint(static_bp)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def get_list_details(list_id):
shopping_list = ShoppingList.query.get_or_404(list_id)
items = Item.query.filter_by(list_id=list_id).all()
receipt_pattern = f"list_{list_id}"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipt_files = [f for f in all_files if receipt_pattern in f]
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
return shopping_list, items, receipt_files, expenses, total_expense
def generate_share_token(length=8):
"""Generuje token do udostępniania. Parametr `length` to liczba znaków (domyślnie 4)."""
return secrets.token_hex(length // 2)
def check_list_public(shopping_list):
if not shopping_list.is_public:
flash('Ta lista nie jest publicznie dostępna', 'danger')
return False
return True
def enrich_list_data(l):
items = Item.query.filter_by(list_id=l.id).all()
l.total_count = len(items)
l.purchased_count = len([i for i in items if i.purchased])
expenses = Expense.query.filter_by(list_id=l.id).all()
l.total_expense = sum(e.amount for e in expenses)
return l
def save_resized_image(file, path: str, max_size=(2000, 2000)):
img = Image.open(file)
img.thumbnail(max_size)
img.save(path)
def redirect_with_flash(message: str, category: str = 'info', endpoint: str = 'main_page'):
flash(message, category)
return redirect(url_for(endpoint))
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated or not current_user.is_admin:
return redirect_with_flash('Brak uprawnień do tej sekcji.', 'danger')
return f(*args, **kwargs)
return decorated_function
def get_progress(list_id):
items = Item.query.filter_by(list_id=list_id).all()
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
return purchased_count, total_count, percent
def delete_receipts_for_list(list_id):
receipt_pattern = f"list_{list_id}_"
upload_folder = app.config['UPLOAD_FOLDER']
for filename in os.listdir(upload_folder):
if filename.startswith(receipt_pattern):
try:
os.remove(os.path.join(upload_folder, filename))
except Exception as e:
print(f"Nie udało się usunąć pliku {filename}: {e}")
# zabezpieczenie logowani do systemy - błędne hasła
def is_ip_blocked(ip):
now = time.time()
attempts = failed_login_attempts[ip]
while attempts and now - attempts[0] > TIME_WINDOW:
attempts.popleft()
return len(attempts) >= MAX_ATTEMPTS
def register_failed_attempt(ip):
now = time.time()
attempts = failed_login_attempts[ip]
while attempts and now - attempts[0] > TIME_WINDOW:
attempts.popleft()
attempts.append(now)
def reset_failed_attempts(ip):
failed_login_attempts[ip].clear()
def attempts_remaining(ip):
attempts = failed_login_attempts[ip]
return max(0, MAX_ATTEMPTS - len(attempts))
####################################################
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
@app.context_processor
def inject_time():
return dict(time=time)
@app.context_processor
def inject_has_authorized_cookie():
return {'has_authorized_cookie': 'authorized' in request.cookies}
@app.context_processor
def inject_is_blocked():
ip = request.access_route[0]
return {'is_blocked': is_ip_blocked(ip)}
@app.before_request
def require_system_password():
endpoint = request.endpoint
# Wyjątki: lib js/css zawsze przepuszczamy
if endpoint in ('static_bp.serve_js_lib', 'static_bp.serve_css_lib'):
return
ip = request.access_route[0]
if is_ip_blocked(ip):
abort(403)
if endpoint is None:
return
if endpoint in ('system_auth', 'healthcheck'):
return
if 'authorized' not in request.cookies and not endpoint.startswith('login') and endpoint != 'favicon':
# Dla serve_js przepuszczamy tylko toasts.js
if endpoint == 'static_bp.serve_js':
requested_file = request.view_args.get("filename", "")
if requested_file == "toasts.js":
return
if requested_file.endswith(".js"):
return redirect(url_for('system_auth', next=request.url))
return
# Blokujemy pozostałe static_bp
if endpoint.startswith('static_bp.'):
return
if request.path == '/':
return redirect(url_for('system_auth'))
from urllib.parse import urlparse, urlunparse
parsed = urlparse(request.url)
fixed_url = urlunparse(parsed._replace(netloc=request.host))
return redirect(url_for('system_auth', next=fixed_url))
@app.template_filter('filemtime')
def file_mtime_filter(path):
try:
t = os.path.getmtime(path)
return datetime.fromtimestamp(t)
except Exception:
return datetime.utcnow()
@app.template_filter('filesizeformat')
def filesizeformat_filter(path):
try:
size = os.path.getsize(path)
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
except Exception:
return "N/A"
@app.errorhandler(404)
def page_not_found(e):
return render_template(
'errors.html',
code=404,
title="Strona nie znaleziona",
message="Ups! Podana strona nie istnieje lub została przeniesiona."
), 404
@app.errorhandler(403)
def forbidden(e):
return render_template(
'errors.html',
code=403,
title="Brak dostępu",
message="Nie masz uprawnień do wyświetlenia tej strony."
), 403
@app.route('/favicon.ico')
def favicon_ico():
return redirect(url_for('static', filename='favicon.svg'))
@app.route('/favicon.svg')
def favicon():
svg = '''
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 16 16">
<text y="14" font-size="16">🛒</text>
</svg>
'''
return svg, 200, {'Content-Type': 'image/svg+xml'}
@app.route('/')
def main_page():
now = datetime.utcnow()
if current_user.is_authenticated:
user_lists = ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=False).filter(
(ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)
).order_by(ShoppingList.created_at.desc()).all()
archived_lists = ShoppingList.query.filter_by(owner_id=current_user.id, is_archived=True).order_by(ShoppingList.created_at.desc()).all()
public_lists = ShoppingList.query.filter(
ShoppingList.is_public == True,
ShoppingList.owner_id != current_user.id,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
ShoppingList.is_archived == False
).order_by(ShoppingList.created_at.desc()).all()
else:
user_lists = []
archived_lists = []
public_lists = ShoppingList.query.filter(
ShoppingList.is_public == True,
((ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)),
ShoppingList.is_archived == False
).order_by(ShoppingList.created_at.desc()).all()
for l in user_lists + public_lists + archived_lists:
enrich_list_data(l)
return render_template("main.html", user_lists=user_lists, public_lists=public_lists, archived_lists=archived_lists)
@app.route('/system-auth', methods=['GET', 'POST'])
def system_auth():
if current_user.is_authenticated or request.cookies.get('authorized') == AUTHORIZED_COOKIE_VALUE:
flash('Jesteś już zalogowany lub autoryzowany.', 'info')
return redirect(url_for('main_page'))
ip = request.access_route[0]
next_page = request.args.get('next') or url_for('main_page')
if is_ip_blocked(ip):
flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger')
return render_template('system_auth.html'), 403
if request.method == 'POST':
if request.form['password'] == SYSTEM_PASSWORD:
reset_failed_attempts(ip)
resp = redirect(next_page)
max_age = app.config.get('AUTH_COOKIE_MAX_AGE', 86400)
resp.set_cookie('authorized', AUTHORIZED_COOKIE_VALUE, max_age=max_age)
return resp
else:
register_failed_attempt(ip)
if is_ip_blocked(ip):
flash('Przekroczono limit prób logowania. Dostęp zablokowany na 1 godzinę.', 'danger')
return render_template('system_auth.html'), 403
remaining = attempts_remaining(ip)
flash(f'Nieprawidłowe hasło. Pozostało {remaining} prób.', 'warning')
return render_template('system_auth.html')
@app.route('/toggle_archive_list/<int:list_id>')
@login_required
def toggle_archive_list(list_id):
l = ShoppingList.query.get_or_404(list_id)
if l.owner_id != current_user.id:
return redirect_with_flash('Nie masz uprawnień do tej listy', 'danger')
archive = request.args.get('archive', 'true').lower() == 'true'
if archive:
l.is_archived = True
flash(f'Lista „{l.title}” została zarchiwizowana.', 'success')
else:
l.is_archived = False
flash(f'Lista „{l.title}” została przywrócona.', 'success')
db.session.commit()
return redirect(url_for('main_page'))
@app.route('/edit_my_list/<int:list_id>', methods=['GET', 'POST'])
@login_required
def edit_my_list(list_id):
l = ShoppingList.query.get_or_404(list_id)
if l.owner_id != current_user.id:
return redirect_with_flash('Nie masz uprawnień do tej listy', 'danger')
if request.method == 'POST':
new_title = request.form.get('title')
if new_title and new_title.strip():
l.title = new_title.strip()
db.session.commit()
flash('Zaktualizowano tytuł listy', 'success')
return redirect(url_for('main_page'))
else:
flash('Podaj poprawny tytuł', 'danger')
return render_template('edit_my_list.html', list=l)
@app.route('/toggle_visibility/<int:list_id>', methods=['GET', 'POST'])
@login_required
def toggle_visibility(list_id):
l = ShoppingList.query.get_or_404(list_id)
if l.owner_id != current_user.id:
if request.is_json or request.method == 'POST':
return {'error': 'Unauthorized'}, 403
flash('Nie masz uprawnień do tej listy', 'danger')
return redirect(url_for('main_page'))
l.is_public = not l.is_public
db.session.commit()
share_url = f"{request.url_root}share/{l.share_token}"
if request.is_json or request.method == 'POST':
return {'is_public': l.is_public, 'share_url': share_url}
if l.is_public:
flash('Lista została udostępniona publicznie', 'success')
else:
flash('Lista została ukryta przed gośćmi', 'info')
return redirect(url_for('main_page'))
from sqlalchemy import func
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username_input = request.form['username'].lower()
user = User.query.filter(func.lower(User.username) == username_input).first()
if user and check_password_hash(user.password_hash, request.form['password']):
login_user(user)
flash('Zalogowano pomyślnie', 'success')
return redirect(url_for('main_page'))
flash('Nieprawidłowy login lub hasło', 'danger')
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('Wylogowano pomyślnie', 'success')
return redirect(url_for('main_page'))
@app.route('/create', methods=['POST'])
@login_required
def create_list():
title = request.form.get('title')
is_temporary = 'temporary' in request.form
token = generate_share_token(8)
expires_at = datetime.utcnow() + timedelta(days=7) if is_temporary else None
new_list = ShoppingList(title=title, owner_id=current_user.id, is_temporary=is_temporary, share_token=token, expires_at=expires_at)
db.session.add(new_list)
db.session.commit()
flash('Utworzono nową listę', 'success')
return redirect(url_for('view_list', list_id=new_list.id))
@app.route('/list/<int:list_id>')
@login_required
def view_list(list_id):
shopping_list, items, receipt_files, expenses, total_expense = get_list_details(list_id)
total_count = len(items)
purchased_count = len([i for i in items if i.purchased])
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
return render_template(
'list.html',
list=shopping_list,
items=items,
receipt_files=receipt_files,
total_count=total_count,
purchased_count=purchased_count,
percent=percent,
expenses=expenses,
total_expense=total_expense
)
@app.route('/share/<token>')
@app.route('/guest-list/<int:list_id>')
def shared_list(token=None, list_id=None):
if token:
shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
if not check_list_public(shopping_list):
return redirect(url_for('main_page'))
list_id = shopping_list.id
shopping_list, items, receipt_files, expenses, total_expense = get_list_details(list_id)
return render_template(
'list_share.html',
list=shopping_list,
items=items,
receipt_files=receipt_files,
expenses=expenses,
total_expense=total_expense
)
@app.route('/copy/<int:list_id>')
@login_required
def copy_list(list_id):
original = ShoppingList.query.get_or_404(list_id)
token = generate_share_token(8)
new_list = ShoppingList(title=original.title + ' (Kopia)', owner_id=current_user.id, share_token=token)
db.session.add(new_list)
db.session.commit()
original_items = Item.query.filter_by(list_id=original.id).all()
for item in original_items:
copy_item = Item(list_id=new_list.id, name=item.name)
db.session.add(copy_item)
db.session.commit()
flash('Skopiowano listę', 'success')
return redirect(url_for('view_list', list_id=new_list.id))
@app.route('/suggest_products')
def suggest_products():
query = request.args.get('q', '')
suggestions = []
if query:
suggestions = SuggestedProduct.query.filter(SuggestedProduct.name.ilike(f'%{query}%')).limit(5).all()
return {'suggestions': [s.name for s in suggestions]}
@app.route('/all_products')
def all_products():
query = request.args.get('q', '')
top_products_query = SuggestedProduct.query
if query:
top_products_query = top_products_query.filter(SuggestedProduct.name.ilike(f'%{query}%'))
top_products = (
top_products_query
.order_by(SuggestedProduct.usage_count.desc(), SuggestedProduct.name.asc())
.limit(20)
.all()
)
top_names = [s.name for s in top_products]
rest_query = SuggestedProduct.query
if query:
rest_query = rest_query.filter(SuggestedProduct.name.ilike(f'%{query}%'))
if top_names:
rest_query = rest_query.filter(~SuggestedProduct.name.in_(top_names))
rest_products = (
rest_query
.order_by(SuggestedProduct.name.asc())
.limit(200)
.all()
)
all_names = top_names + [s.name for s in rest_products]
return {'allproducts': all_names}
""" @app.route('/upload_receipt/<int:list_id>', methods=['POST'])
def upload_receipt(list_id):
if 'receipt' not in request.files:
flash('Brak pliku', 'danger')
return redirect(request.referrer)
file = request.files['receipt']
if file.filename == '':
flash('Nie wybrano pliku', 'danger')
return redirect(request.referrer)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], f"list_{list_id}_{filename}")
save_resized_image(file, file_path)
flash('Wgrano paragon', 'success')
return redirect(request.referrer)
flash('Niedozwolony format pliku', 'danger')
return redirect(request.referrer) """
@app.route('/upload_receipt/<int:list_id>', methods=['POST'])
def upload_receipt(list_id):
if 'receipt' not in request.files:
if request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'success': False, 'message': 'Brak pliku'}), 400
flash('Brak pliku', 'danger')
return redirect(request.referrer)
file = request.files['receipt']
if file.filename == '':
if request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'success': False, 'message': 'Nie wybrano pliku'}), 400
flash('Nie wybrano pliku', 'danger')
return redirect(request.referrer)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
full_filename = f"list_{list_id}_{filename}"
file_path = os.path.join(app.config['UPLOAD_FOLDER'], full_filename)
save_resized_image(file, file_path)
if request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest':
url = url_for('uploaded_file', filename=full_filename)
socketio.emit('receipt_added', {'url': url}, to=str(list_id))
return jsonify({'success': True, 'url': url})
flash('Wgrano paragon', 'success')
return redirect(request.referrer)
if request.is_json or request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'success': False, 'message': 'Niedozwolony format pliku'}), 400
flash('Niedozwolony format pliku', 'danger')
return redirect(request.referrer)
@app.route('/uploads/<filename>')
def uploaded_file(filename):
response = send_from_directory(app.config['UPLOAD_FOLDER'], filename)
response.headers['Cache-Control'] = 'public, max-age=2592000, immutable'
response.headers.pop('Pragma', None)
response.headers.pop('Content-Disposition', None)
mime, _ = mimetypes.guess_type(filename)
if mime:
response.headers['Content-Type'] = mime
return response
@app.route('/admin')
@login_required
@admin_required
def admin_panel():
now = datetime.utcnow()
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
all_lists = ShoppingList.query.options(db.joinedload(ShoppingList.owner)).all()
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
enriched_lists = []
for l in all_lists:
enrich_list_data(l)
items = Item.query.filter_by(list_id=l.id).all()
total_count = l.total_count
purchased_count = l.purchased_count
percent = (purchased_count / total_count * 100) if total_count > 0 else 0
comments_count = len([i for i in items if i.note and i.note.strip() != ''])
receipt_pattern = f"list_{l.id}"
receipt_files = [f for f in all_files if receipt_pattern in f]
enriched_lists.append({
'list': l,
'total_count': total_count,
'purchased_count': purchased_count,
'percent': round(percent),
'comments_count': comments_count,
'receipts_count': len(receipt_files),
'total_expense': l.total_expense
})
top_products = (
db.session.query(Item.name, func.count(Item.id).label('count'))
.filter(Item.purchased == True)
.group_by(Item.name)
.order_by(func.count(Item.id).desc())
.limit(5)
.all()
)
purchased_items_count = Item.query.filter_by(purchased=True).count()
total_expense_sum = db.session.query(func.sum(Expense.amount)).scalar() or 0
current_year = datetime.utcnow().year
year_expense_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract('year', Expense.added_at) == current_year)
.scalar() or 0
)
current_month = datetime.utcnow().month
month_expense_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract('year', Expense.added_at) == current_year)
.filter(extract('month', Expense.added_at) == current_month)
.scalar() or 0
)
process = psutil.Process(os.getpid())
app_mem = process.memory_info().rss // (1024 * 1024) # MB
return render_template(
'admin/admin_panel.html',
user_count=user_count,
list_count=list_count,
item_count=item_count,
purchased_items_count=purchased_items_count,
enriched_lists=enriched_lists,
top_products=top_products,
total_expense_sum=total_expense_sum,
year_expense_sum=year_expense_sum,
month_expense_sum=month_expense_sum,
now=now,
python_version=sys.version,
system_info=platform.platform(),
app_memory=f"{app_mem} MB",
)
@app.route('/admin/delete_list/<int:list_id>')
@login_required
@admin_required
def delete_list(list_id):
delete_receipts_for_list(list_id)
list_to_delete = ShoppingList.query.get_or_404(list_id)
Item.query.filter_by(list_id=list_to_delete.id).delete()
Expense.query.filter_by(list_id=list_to_delete.id).delete()
db.session.delete(list_to_delete)
db.session.commit()
flash(f'Usunięto listę: {list_to_delete.title}', 'success')
return redirect(url_for('admin_panel'))
@app.route('/admin/add_user', methods=['POST'])
@login_required
@admin_required
def add_user():
username = request.form['username'].lower()
password = request.form['password']
if not username or not password:
flash('Wypełnij wszystkie pola', 'danger')
return redirect(url_for('list_users'))
if User.query.filter(func.lower(User.username) == username).first():
flash('Użytkownik o takiej nazwie już istnieje', 'warning')
return redirect(url_for('list_users'))
hashed_password = generate_password_hash(password)
new_user = User(username=username, password_hash=hashed_password)
db.session.add(new_user)
db.session.commit()
flash('Dodano nowego użytkownika', 'success')
return redirect(url_for('list_users'))
@app.route('/admin/users')
@login_required
@admin_required
def list_users():
users = User.query.all()
user_count = User.query.count()
list_count = ShoppingList.query.count()
item_count = Item.query.count()
activity_log = ["Utworzono listę: Zakupy weekendowe", "Dodano produkt: Mleko"]
return render_template('admin/user_management.html', users=users, user_count=user_count, list_count=list_count, item_count=item_count, activity_log=activity_log)
@app.route('/admin/change_password/<int:user_id>', methods=['POST'])
@login_required
@admin_required
def reset_password(user_id):
user = User.query.get_or_404(user_id)
new_password = request.form['password']
if not new_password:
flash('Podaj nowe hasło', 'danger')
return redirect(url_for('list_users'))
user.password_hash = generate_password_hash(new_password)
db.session.commit()
flash(f'Hasło dla użytkownika {user.username} zostało zaktualizowane', 'success')
return redirect(url_for('list_users'))
@app.route('/admin/delete_user/<int:user_id>')
@login_required
@admin_required
def delete_user(user_id):
user = User.query.get_or_404(user_id)
if user.is_admin:
admin_count = User.query.filter_by(is_admin=True).count()
if admin_count <= 1:
flash('Nie można usunąć ostatniego administratora.', 'danger')
return redirect(url_for('list_users'))
db.session.delete(user)
db.session.commit()
flash('Użytkownik usunięty', 'success')
return redirect(url_for('list_users'))
@app.route('/admin/receipts/<id>')
@login_required
@admin_required
def admin_receipts(id):
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
image_files = [f for f in all_files if allowed_file(f)]
if id == "all":
filtered_files = image_files
else:
try:
list_id = int(id)
receipt_prefix = f"list_{list_id}_"
filtered_files = [f for f in image_files if f.startswith(receipt_prefix)]
except ValueError:
flash("Nieprawidłowe ID listy.", "danger")
return redirect(url_for('admin_panel'))
return render_template(
'admin/receipts.html',
image_files=filtered_files,
upload_folder=app.config['UPLOAD_FOLDER']
)
@app.route('/admin/delete_receipt/<filename>')
@login_required
@admin_required
def delete_receipt(filename):
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
if os.path.exists(file_path):
os.remove(file_path)
flash('Plik usunięty', 'success')
else:
flash('Plik nie istnieje', 'danger')
next_url = request.args.get('next')
if next_url:
return redirect(next_url)
return redirect(url_for('admin_receipts'))
@app.route('/admin/delete_selected_lists', methods=['POST'])
@login_required
@admin_required
def delete_selected_lists():
ids = request.form.getlist('list_ids')
for list_id in ids:
lst = ShoppingList.query.get(int(list_id))
if lst:
delete_receipts_for_list(lst.id)
Item.query.filter_by(list_id=lst.id).delete()
Expense.query.filter_by(list_id=lst.id).delete()
db.session.delete(lst)
db.session.commit()
flash('Usunięto wybrane listy', 'success')
return redirect(url_for('admin_panel'))
@app.route('/admin/delete_all_items')
@login_required
@admin_required
def delete_all_items():
Item.query.delete()
db.session.commit()
flash('Usunięto wszystkie produkty', 'success')
return redirect(url_for('admin_panel'))
@app.route('/admin/edit_list/<int:list_id>', methods=['GET', 'POST'])
@login_required
@admin_required
def edit_list(list_id):
l = ShoppingList.query.get_or_404(list_id)
expenses = Expense.query.filter_by(list_id=list_id).all()
total_expense = sum(e.amount for e in expenses)
users = User.query.all()
items = Item.query.filter_by(list_id=list_id).order_by(Item.id.desc()).all()
# Pobranie listy plików paragonów
receipt_pattern = f"list_{list_id}_"
all_files = os.listdir(app.config['UPLOAD_FOLDER'])
receipts = [f for f in all_files if f.startswith(receipt_pattern)]
if request.method == 'POST':
action = request.form.get('action')
if action == 'save':
new_title = request.form.get('title', '').strip()
new_amount_str = request.form.get('amount')
is_archived = 'archived' in request.form
is_public = 'public' in request.form
new_owner_id = request.form.get('owner_id')
if new_title:
l.title = new_title
l.is_archived = is_archived
l.is_public = is_public
if new_owner_id:
try:
new_owner_id_int = int(new_owner_id)
if User.query.get(new_owner_id_int):
l.owner_id = new_owner_id_int
else:
flash('Wybrany użytkownik nie istnieje', 'danger')
return redirect(url_for('edit_list', list_id=list_id))
except ValueError:
flash('Niepoprawny ID użytkownika', 'danger')
return redirect(url_for('edit_list', list_id=list_id))
if new_amount_str:
try:
new_amount = float(new_amount_str)
for expense in expenses:
db.session.delete(expense)
db.session.commit()
new_expense = Expense(list_id=list_id, amount=new_amount)
db.session.add(new_expense)
db.session.commit()
except ValueError:
flash('Niepoprawna kwota', 'danger')
return redirect(url_for('edit_list', list_id=list_id))
db.session.commit()
flash('Zapisano zmiany listy', 'success')
return redirect(url_for('edit_list', list_id=list_id))
elif action == 'add_item':
item_name = request.form.get('item_name', '').strip()
quantity_str = request.form.get('quantity', '1')
if not item_name:
flash('Podaj nazwę produktu', 'danger')
return redirect(url_for('edit_list', list_id=list_id))
try:
quantity = int(quantity_str)
if quantity < 1:
quantity = 1
except ValueError:
quantity = 1
new_item = Item(list_id=list_id, name=item_name, quantity=quantity, added_by=current_user.id)
db.session.add(new_item)
if not SuggestedProduct.query.filter(func.lower(SuggestedProduct.name) == item_name.lower()).first():
db.session.add(SuggestedProduct(name=item_name))
db.session.commit()
flash('Dodano produkt', 'success')
return redirect(url_for('edit_list', list_id=list_id))
elif action == 'delete_item':
item_id = request.form.get('item_id')
item = Item.query.get(item_id)
if item and item.list_id == list_id:
db.session.delete(item)
db.session.commit()
flash('Usunięto produkt', 'success')
else:
flash('Nie znaleziono produktu', 'danger')
return redirect(url_for('edit_list', list_id=list_id))
elif action == 'toggle_purchased':
item_id = request.form.get('item_id')
item = Item.query.get(item_id)
if item and item.list_id == list_id:
item.purchased = not item.purchased
db.session.commit()
flash('Zmieniono status oznaczenia produktu', 'success')
else:
flash('Nie znaleziono produktu', 'danger')
return redirect(url_for('edit_list', list_id=list_id))
# Przekazanie receipts do szablonu
return render_template(
'admin/edit_list.html',
list=l,
total_expense=total_expense,
users=users,
items=items,
receipts=receipts,
upload_folder=app.config['UPLOAD_FOLDER']
)
@app.route('/admin/products')
@login_required
@admin_required
def list_products():
items = Item.query.order_by(Item.id.desc()).all()
users = User.query.all()
users_dict = {user.id: user.username for user in users}
# Stabilne sortowanie sugestii
suggestions = SuggestedProduct.query.order_by(SuggestedProduct.name.asc()).all()
suggestions_dict = {s.name.lower(): s for s in suggestions}
return render_template(
'admin/list_products.html',
items=items,
users_dict=users_dict,
suggestions_dict=suggestions_dict
)
@app.route('/admin/sync_suggestion/<int:item_id>', methods=['POST'])
@login_required
def sync_suggestion_ajax(item_id):
if not current_user.is_admin:
return jsonify({'success': False, 'message': 'Brak uprawnień'}), 403
item = Item.query.get_or_404(item_id)
existing = SuggestedProduct.query.filter(func.lower(SuggestedProduct.name) == item.name.lower()).first()
if not existing:
new_suggestion = SuggestedProduct(name=item.name)
db.session.add(new_suggestion)
db.session.commit()
return jsonify({'success': True, 'message': f'Utworzono sugestię dla produktu: {item.name}'})
else:
return jsonify({'success': True, 'message': f'Sugestia dla produktu „{item.name}” już istnieje.'})
@app.route('/admin/delete_suggestion/<int:suggestion_id>', methods=['POST'])
@login_required
def delete_suggestion_ajax(suggestion_id):
if not current_user.is_admin:
return jsonify({'success': False, 'message': 'Brak uprawnień'}), 403
suggestion = SuggestedProduct.query.get_or_404(suggestion_id)
db.session.delete(suggestion)
db.session.commit()
return jsonify({'success': True, 'message': 'Sugestia została usunięta.'})
@app.route('/admin/expenses_data')
@login_required
def admin_expenses_data():
if not current_user.is_admin:
return jsonify({'error': 'Brak uprawnień'}), 403
range_type = request.args.get('range', 'monthly')
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
now = datetime.utcnow()
labels = []
expenses = []
if start_date_str and end_date_str:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
expenses_query = (
db.session.query(
extract('year', Expense.added_at).label('year'),
extract('month', Expense.added_at).label('month'),
func.sum(Expense.amount).label('total')
)
.filter(Expense.added_at >= start_date, Expense.added_at <= end_date)
.group_by('year', 'month')
.order_by('year', 'month')
.all()
)
for row in expenses_query:
label = f"{int(row.month):02d}/{int(row.year)}"
labels.append(label)
expenses.append(round(row.total, 2))
response = make_response(jsonify({'labels': labels, 'expenses': expenses}))
response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
return response
if range_type == 'monthly':
for i in range(11, -1, -1):
year = (now - timedelta(days=i*30)).year
month = (now - timedelta(days=i*30)).month
label = f"{month:02d}/{year}"
labels.append(label)
month_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract('year', Expense.added_at) == year)
.filter(extract('month', Expense.added_at) == month)
.scalar() or 0
)
expenses.append(round(month_sum, 2))
elif range_type == 'quarterly':
for i in range(3, -1, -1):
quarter_start = now - timedelta(days=i*90)
year = quarter_start.year
quarter = (quarter_start.month - 1) // 3 + 1
label = f"Q{quarter}/{year}"
quarter_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract('year', Expense.added_at) == year)
.filter((extract('month', Expense.added_at) - 1)//3 + 1 == quarter)
.scalar() or 0
)
labels.append(label)
expenses.append(round(quarter_sum, 2))
elif range_type == 'halfyearly':
for i in range(1, -1, -1):
half_start = now - timedelta(days=i*180)
year = half_start.year
half = 1 if half_start.month <= 6 else 2
label = f"H{half}/{year}"
half_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract('year', Expense.added_at) == year)
.filter(
(extract('month', Expense.added_at) <= 6) if half == 1 else (extract('month', Expense.added_at) > 6)
)
.scalar() or 0
)
labels.append(label)
expenses.append(round(half_sum, 2))
elif range_type == 'yearly':
for i in range(4, -1, -1):
year = now.year - i
label = str(year)
year_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract('year', Expense.added_at) == year)
.scalar() or 0
)
labels.append(label)
expenses.append(round(year_sum, 2))
response = make_response(jsonify({'labels': labels, 'expenses': expenses}))
response.headers["Cache-Control"] = "no-store, no-cache"
return response
@app.route('/admin/promote_user/<int:user_id>')
@login_required
@admin_required
def promote_user(user_id):
user = User.query.get_or_404(user_id)
user.is_admin = True
db.session.commit()
flash(f'Użytkownik {user.username} został ustawiony jako admin.', 'success')
return redirect(url_for('list_users'))
@app.route('/admin/demote_user/<int:user_id>')
@login_required
@admin_required
def demote_user(user_id):
user = User.query.get_or_404(user_id)
if user.id == current_user.id:
flash('Nie możesz zdegradować samego siebie!', 'danger')
return redirect(url_for('list_users'))
admin_count = User.query.filter_by(is_admin=True).count()
if admin_count <= 1 and user.is_admin:
flash('Nie można zdegradować. Musi pozostać co najmniej jeden administrator.', 'danger')
return redirect(url_for('list_users'))
user.is_admin = False
db.session.commit()
flash(f'Użytkownik {user.username} został zdegradowany.', 'success')
return redirect(url_for('list_users'))
@app.route('/healthcheck')
def healthcheck():
header_token = request.headers.get("X-Internal-Check")
correct_token = app.config.get('HEALTHCHECK_TOKEN')
if header_token != correct_token:
abort(404)
return 'OK', 200
# =========================================================================================
# SOCKET.IO
# =========================================================================================
@socketio.on('delete_item')
def handle_delete_item(data):
item = Item.query.get(data['item_id'])
if item:
list_id = item.list_id
db.session.delete(item)
db.session.commit()
emit('item_deleted', {'item_id': item.id}, to=str(item.list_id))
purchased_count, total_count, percent = get_progress(list_id)
emit('progress_updated', {
'purchased_count': purchased_count,
'total_count': total_count,
'percent': percent
}, to=str(list_id))
@socketio.on('edit_item')
def handle_edit_item(data):
item = Item.query.get(data['item_id'])
new_name = data['new_name']
new_quantity = data.get('new_quantity', item.quantity)
if item and new_name.strip():
item.name = new_name.strip()
try:
new_quantity = int(new_quantity)
if new_quantity < 1:
new_quantity = 1
except:
new_quantity = 1
item.quantity = new_quantity
db.session.commit()
emit('item_edited', {
'item_id': item.id,
'new_name': item.name,
'new_quantity': item.quantity
}, to=str(item.list_id))
@socketio.on('join_list')
def handle_join(data):
global active_users
room = str(data['room'])
username = data.get('username', 'Gość')
join_room(room)
if room not in active_users:
active_users[room] = set()
active_users[room].add(username)
shopping_list = ShoppingList.query.get(int(data['room']))
list_title = shopping_list.title if shopping_list else "Twoja lista"
emit('user_joined', {'username': username}, to=room)
emit('user_list', {'users': list(active_users[room])}, to=room)
emit('joined_confirmation', {'room': room, 'list_title': list_title})
@socketio.on('disconnect')
def handle_disconnect(sid):
global active_users
username = current_user.username if current_user.is_authenticated else "Gość"
for room, users in active_users.items():
if username in users:
users.remove(username)
emit('user_left', {'username': username}, to=room)
emit('user_list', {'users': list(users)}, to=room)
@socketio.on('add_item')
def handle_add_item(data):
list_id = data['list_id']
name = data['name']
quantity = data.get('quantity', 1)
try:
quantity = int(quantity)
if quantity < 1:
quantity = 1
except:
quantity = 1
new_item = Item(
list_id=list_id,
name=name,
quantity=quantity,
added_by=current_user.id if current_user.is_authenticated else None
)
db.session.add(new_item)
if not SuggestedProduct.query.filter_by(name=name).first():
new_suggestion = SuggestedProduct(name=name)
db.session.add(new_suggestion)
db.session.commit()
emit('item_added', {
'id': new_item.id,
'name': new_item.name,
'quantity': new_item.quantity,
'added_by': current_user.username if current_user.is_authenticated else 'Gość'
}, to=str(list_id), include_self=True)
purchased_count, total_count, percent = get_progress(list_id)
emit('progress_updated', {
'purchased_count': purchased_count,
'total_count': total_count,
'percent': percent
}, to=str(list_id))
@socketio.on('check_item')
def handle_check_item(data):
item = Item.query.get(data['item_id'])
if item:
item.purchased = True
item.purchased_at = datetime.utcnow()
db.session.commit()
purchased_count, total_count, percent = get_progress(item.list_id)
emit('item_checked', {'item_id': item.id}, to=str(item.list_id))
emit('progress_updated', {
'purchased_count': purchased_count,
'total_count': total_count,
'percent': percent
}, to=str(item.list_id))
@socketio.on('uncheck_item')
def handle_uncheck_item(data):
item = Item.query.get(data['item_id'])
if item:
item.purchased = False
item.purchased_at = None
db.session.commit()
purchased_count, total_count, percent = get_progress(item.list_id)
emit('item_unchecked', {'item_id': item.id}, to=str(item.list_id))
emit('progress_updated', {
'purchased_count': purchased_count,
'total_count': total_count,
'percent': percent
}, to=str(item.list_id))
@socketio.on('request_full_list')
def handle_request_full_list(data):
list_id = data['list_id']
items = Item.query.filter_by(list_id=list_id).all()
items_data = []
for item in items:
items_data.append({
'id': item.id,
'name': item.name,
'quantity': item.quantity,
'purchased': item.purchased,
'note': item.note or ''
})
emit('full_list', {'items': items_data}, to=request.sid)
@socketio.on('update_note')
def handle_update_note(data):
item_id = data['item_id']
note = data['note']
item = Item.query.get(item_id)
if item:
item.note = note
db.session.commit()
emit('note_updated', {'item_id': item_id, 'note': note}, to=str(item.list_id))
@socketio.on('add_expense')
def handle_add_expense(data):
list_id = data['list_id']
amount = data['amount']
new_expense = Expense(list_id=list_id, amount=amount)
db.session.add(new_expense)
db.session.commit()
total = db.session.query(func.sum(Expense.amount)).filter_by(list_id=list_id).scalar() or 0
emit('expense_added', {
'amount': amount,
'total': total
}, to=str(list_id))
""" @socketio.on('receipt_uploaded')
def handle_receipt_uploaded(data):
list_id = data['list_id']
url = data['url']
emit('receipt_added', {
'url': url
}, to=str(list_id), include_self=False) """
@app.cli.command('create_db')
def create_db():
db.create_all()
print('Database created.')
if __name__ == '__main__':
socketio.run(app, host='0.0.0.0', port=8000, debug=True)