nowe opcje w paragonacch

This commit is contained in:
Mateusz Gruszczyński
2025-07-20 22:08:25 +02:00
parent ae89f55446
commit 62939a9e9a
5 changed files with 190 additions and 79 deletions

219
app.py
View File

@@ -6,6 +6,9 @@ import mimetypes
import sys
import platform
import psutil
import secrets
import hashlib
from pillow_heif import register_heif_opener
from datetime import datetime, timedelta, UTC, timezone
@@ -39,7 +42,7 @@ from flask_compress import Compress
from flask_socketio import SocketIO, emit, join_room
from werkzeug.security import generate_password_hash, check_password_hash
from config import Config
from PIL import Image
from PIL import Image, ExifTags
from werkzeug.utils import secure_filename
from werkzeug.middleware.proxy_fix import ProxyFix
from sqlalchemy import func, extract
@@ -150,7 +153,7 @@ class Receipt(db.Model):
uploaded_at = db.Column(db.DateTime, default=datetime.utcnow)
shopping_list = db.relationship("ShoppingList", backref="receipts", lazy=True)
filesize = db.Column(db.Integer, nullable=True)
file_hash = db.Column(db.String(64), nullable=True, unique=True)
with app.app_context():
db.create_all()
@@ -253,14 +256,38 @@ def enrich_list_data(l):
l.total_expense = sum(e.amount for e in expenses)
return l
def save_resized_image(file, path):
image = Image.open(file)
try:
image = Image.open(file)
image.verify() # sprawdzenie poprawności pliku
file.seek(0) # reset do początku
image = Image.open(file) # ponowne otwarcie po verify()
except Exception:
raise ValueError("Nieprawidłowy plik graficzny")
# Obrót na podstawie EXIF
try:
exif = image._getexif()
if exif:
orientation_key = next(
k for k, v in ExifTags.TAGS.items() if v == "Orientation"
)
orientation = exif.get(orientation_key)
if orientation == 3:
image = image.rotate(180, expand=True)
elif orientation == 6:
image = image.rotate(270, expand=True)
elif orientation == 8:
image = image.rotate(90, expand=True)
except Exception:
pass # brak lub błędny EXIF
image.thumbnail((2000, 2000))
image = image.convert("RGB")
image.info.clear()
new_path = path.rsplit(".", 1)[0] + ".webp"
image = image.convert("RGB")
image.save(new_path, format="WEBP", quality=85)
image.save(new_path, format="WEBP", quality=85, method=6)
def redirect_with_flash(
@@ -299,6 +326,13 @@ def delete_receipts_for_list(list_id):
print(f"Nie udało się usunąć pliku {filename}: {e}")
def _receipt_error(message):
if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest":
return jsonify({"success": False, "error": message}), 400
flash(message, "danger")
return redirect(request.referrer or url_for("main_page"))
# zabezpieczenie logowani do systemu - błędne hasła
def is_ip_blocked(ip):
now = time.time()
@@ -911,34 +945,6 @@ def all_products():
return {"allproducts": unique_names}
""" @app.route('/upload_receipt/<int:list_id>', methods=['POST'])
def upload_receipt(list_id):
if 'receipt' not in request.files:
flash('Brak pliku', 'danger')
return redirect(request.referrer)
file = request.files['receipt']
if file.filename == '':
flash('Nie wybrano pliku', 'danger')
return redirect(request.referrer)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], f"list_{list_id}_{filename}")
save_resized_image(file, file_path)
flash('Wgrano paragon', 'success')
return redirect(request.referrer)
flash('Niedozwolony format pliku', 'danger')
return redirect(request.referrer) """
from datetime import datetime
@app.route("/upload_receipt/<int:list_id>", methods=["POST"])
def upload_receipt(list_id):
if "receipt" not in request.files:
@@ -949,39 +955,52 @@ def upload_receipt(list_id):
return _receipt_error("Nie wybrano pliku")
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
base_name = f"list_{list_id}_{filename.rsplit('.', 1)[0]}"
webp_filename = base_name + ".webp"
import hashlib
file_bytes = file.read()
file.seek(0)
file_hash = hashlib.sha256(file_bytes).hexdigest()
existing = Receipt.query.filter_by(file_hash=file_hash).first()
if existing:
return _receipt_error("Taki plik już istnieje")
now = datetime.now(timezone.utc)
timestamp = now.strftime("%Y%m%d_%H%M")
random_part = secrets.token_hex(3)
webp_filename = f"list_{list_id}_{timestamp}_{random_part}.webp"
file_path = os.path.join(app.config["UPLOAD_FOLDER"], webp_filename)
save_resized_image(file, file_path)
try:
save_resized_image(file, file_path)
except ValueError as e:
return _receipt_error(str(e))
filesize = os.path.getsize(file_path) if os.path.exists(file_path) else None
uploaded_at = datetime.utcnow()
uploaded_at = datetime.now(timezone.utc)
new_receipt = Receipt(
list_id=list_id,
filename=webp_filename,
filesize=filesize,
uploaded_at=uploaded_at,
file_hash=file_hash,
)
db.session.add(new_receipt)
db.session.commit()
if (
request.is_json
or request.headers.get("X-Requested-With") == "XMLHttpRequest"
):
if request.is_json or request.headers.get("X-Requested-With") == "XMLHttpRequest":
url = url_for("uploaded_file", filename=webp_filename)
socketio.emit("receipt_added", {"url": url}, to=str(list_id))
return jsonify({"success": True, "url": url})
flash("Wgrano paragon", "success")
return redirect(request.referrer)
return redirect(request.referrer or url_for("main_page"))
return _receipt_error("Niedozwolony format pliku")
@app.route("/uploads/<filename>")
def uploaded_file(filename):
response = send_from_directory(app.config["UPLOAD_FOLDER"], filename)
@@ -1224,35 +1243,105 @@ def admin_receipts(id):
return render_template("admin/receipts.html", receipts=receipts)
@app.route("/admin/delete_receipt/<filename>")
@app.route("/admin/rotate_receipt/<int:receipt_id>")
@login_required
@admin_required
def delete_receipt(filename):
file_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
removed_file = False
removed_db = False
def rotate_receipt(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
filepath = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
# Usuń plik z dysku
if not os.path.exists(filepath):
flash("Plik nie istnieje", "danger")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
try:
image = Image.open(filepath)
rotated = image.rotate(-90, expand=True)
rotated.save(filepath, format="WEBP", quality=85)
flash("Obrócono paragon", "success")
except Exception as e:
flash(f"Błąd przy obracaniu: {str(e)}", "danger")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
@app.route("/admin/rename_receipt/<int:receipt_id>")
@login_required
@admin_required
def rename_receipt(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
old_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
if not os.path.exists(old_path):
flash("Plik nie istnieje", "danger")
return redirect(request.referrer)
now = datetime.now()
timestamp = now.strftime("%Y%m%d_%H%M")
random_part = secrets.token_hex(3)
new_filename = f"list_{receipt.list_id}_{timestamp}_{random_part}.webp"
new_path = os.path.join(app.config["UPLOAD_FOLDER"], new_filename)
try:
os.rename(old_path, new_path)
receipt.filename = new_filename
db.session.commit()
flash("Zmieniono nazwę pliku", "success")
except Exception as e:
flash(f"Błąd przy zmianie nazwy: {str(e)}", "danger")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
@app.route("/admin/delete_receipt/<int:receipt_id>")
@login_required
@admin_required
def delete_receipt(receipt_id):
receipt = Receipt.query.get(receipt_id)
if not receipt:
flash("Paragon nie istnieje", "danger")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
file_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
# Usuń plik
if os.path.exists(file_path):
os.remove(file_path)
removed_file = True
try:
os.remove(file_path)
except Exception as e:
flash(f"Błąd przy usuwaniu pliku: {str(e)}", "danger")
# Usuń rekord z bazy
receipt = Receipt.query.filter_by(filename=filename).first()
if receipt:
db.session.delete(receipt)
db.session.delete(receipt)
db.session.commit()
flash("Paragon usunięty", "success")
return redirect(request.referrer or url_for("admin_receipts", id="all"))
@app.route("/admin/generate_receipt_hash/<int:receipt_id>")
@login_required
@admin_required
def generate_receipt_hash(receipt_id):
receipt = Receipt.query.get_or_404(receipt_id)
if receipt.file_hash:
flash("Hash już istnieje", "info")
return redirect(request.referrer)
file_path = os.path.join(app.config["UPLOAD_FOLDER"], receipt.filename)
if not os.path.exists(file_path):
flash("Plik nie istnieje", "danger")
return redirect(request.referrer)
import hashlib
try:
with open(file_path, "rb") as f:
file_hash = hashlib.sha256(f.read()).hexdigest()
receipt.file_hash = file_hash
db.session.commit()
removed_db = True
flash("Hash wygenerowany", "success")
except Exception as e:
flash(f"Błąd przy generowaniu hasha: {e}", "danger")
# Komunikat
if removed_file or removed_db:
flash("Paragon usunięty", "success")
else:
flash("Paragon nie istnieje", "danger")
# Powrót
next_url = request.args.get("next")
return redirect(next_url or url_for("admin_receipts", id="all"))
return redirect(request.referrer)
@app.route("/admin/delete_selected_lists", methods=["POST"])