fix w wykresach

This commit is contained in:
Mateusz Gruszczyński
2025-07-27 11:12:01 +02:00
parent 81985f7f84
commit 4b76df795b

199
app.py
View File

@@ -64,18 +64,15 @@ app = Flask(__name__)
app.config.from_object(Config)
# Konfiguracja nagłówków bezpieczeństwa z .env
csp_policy = None
if app.config.get("ENABLE_CSP", True):
csp_policy = {
"default-src": "'self'",
"script-src": "'self'", # wciąż bez inline JS
"style-src": "'self' 'unsafe-inline'", # dopuszczamy style w HTML-u
"img-src": "'self' data:", # pozwalamy na data:image (np. SVG)
"connect-src": "'self'", # WebSockety
"script-src": "'self' 'unsafe-inline'",
}
csp_policy = {
"default-src": "'self'",
"script-src": "'self' 'unsafe-inline'",
"style-src": "'self' 'unsafe-inline'",
"img-src": "'self' data:",
"connect-src": "'self'",
} if app.config.get("ENABLE_CSP", True) else None
permissions_policy = {"browsing-topics": "()"} if app.config["ENABLE_PP"] else None
permissions_policy = {"browsing-topics": "()"} if app.config.get("ENABLE_PP") else None
talisman_kwargs = {
"force_https": False,
@@ -85,11 +82,12 @@ talisman_kwargs = {
"content_security_policy": csp_policy,
"x_content_type_options": app.config.get("ENABLE_XCTO", True),
"strict_transport_security_include_subdomains": False,
"session_cookie_secure": app.config["SESSION_COOKIE_SECURE"],
"session_cookie_secure": app.config.get("SESSION_COOKIE_SECURE", False),
}
if app.config.get("REFERRER_POLICY"):
talisman_kwargs["referrer_policy"] = app.config["REFERRER_POLICY"]
referrer_policy = app.config.get("REFERRER_POLICY")
if referrer_policy:
talisman_kwargs["referrer_policy"] = referrer_policy
talisman = Talisman(app, **talisman_kwargs)
@@ -232,9 +230,9 @@ with app.app_context():
@static_bp.route("/static/js/<path:filename>")
def serve_js(filename):
response = send_from_directory("static/js", filename)
#response.cache_control.no_cache = True
#response.cache_control.no_store = True
#response.cache_control.must_revalidate = True
# response.cache_control.no_cache = True
# response.cache_control.no_store = True
# response.cache_control.must_revalidate = True
response.headers["Cache-Control"] = app.config["JS_CACHE_CONTROL"]
response.headers.pop("Content-Disposition", None)
response.headers.pop("Etag", None)
@@ -556,6 +554,7 @@ def attempts_remaining(ip):
####################################################
def get_client_ip():
# Obsługuje: X-Forwarded-For, X-Real-IP, fallback na remote_addr
for header in ["X-Forwarded-For", "X-Real-IP"]:
@@ -566,6 +565,7 @@ def get_client_ip():
return ip
return request.remote_addr
@login_manager.user_loader
def load_user(user_id):
# return User.query.get(int(user_id))
@@ -637,6 +637,7 @@ def require_system_password():
def start_timer():
request._start_time = time.time()
@app.after_request
def log_request(response):
if request.path == "/healthcheck":
@@ -651,7 +652,7 @@ def log_request(response):
duration = round((time.time() - start) * 1000, 2) if start else "-"
agent = request.headers.get("User-Agent", "-")
log_msg = f"{ip} - \"{method} {path}\" {status} {length} {duration}ms \"{agent}\""
log_msg = f'{ip} - "{method} {path}" {status} {length} {duration}ms "{agent}"'
app.logger.info(log_msg)
return response
@@ -838,7 +839,7 @@ def system_auth():
"authorized",
AUTHORIZED_COOKIE_VALUE,
max_age=max_age,
secure=request.is_secure
secure=request.is_secure,
)
return resp
else:
@@ -992,14 +993,13 @@ def login():
if user and check_password_hash(user.password_hash, request.form["password"]):
session.permanent = True
login_user(user)
#session["logged"] = True
# session["logged"] = True
flash("Zalogowano pomyślnie", "success")
return redirect(url_for("main_page"))
flash("Nieprawidłowy login lub hasło", "danger")
return render_template("login.html")
@app.route("/logout")
@login_required
def logout():
@@ -1140,33 +1140,47 @@ def user_expenses_data():
end_date = request.args.get("end_date")
show_all = request.args.get("show_all", "false").lower() == "true"
query = Expense.query.join(ShoppingList, Expense.list_id == ShoppingList.id)
base_query = Expense.query.join(ShoppingList, Expense.list_id == ShoppingList.id)
if show_all:
query = query.filter(
base_query = base_query.filter(
or_(
ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True
ShoppingList.owner_id == current_user.id,
ShoppingList.is_public == True,
)
)
else:
query = query.filter(ShoppingList.owner_id == current_user.id)
base_query = base_query.filter(ShoppingList.owner_id == current_user.id)
if start_date and end_date:
try:
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
query = query.filter(Expense.added_at >= start, Expense.added_at < end)
base_query = base_query.filter(
Expense.added_at >= start, Expense.added_at < end
)
except ValueError:
return jsonify({"error": "Błędne daty"}), 400
# Wybierz tylko najnowszy wydatek dla każdej listy
subq = (
db.session.query(
Expense.list_id,
func.max(Expense.added_at).label("latest"),
)
.group_by(Expense.list_id)
.subquery()
)
query = base_query.join(
subq, (Expense.list_id == subq.c.list_id) & (Expense.added_at == subq.c.latest)
)
expenses = query.all()
grouped = defaultdict(float)
for e in expenses:
# ts = e.added_at or datetime.utcnow()
ts = e.added_at or datetime.now(timezone.utc)
if range_type == "monthly":
key = ts.strftime("%Y-%m")
elif range_type == "quarterly":
@@ -2067,108 +2081,47 @@ def admin_expenses_data():
range_type = request.args.get("range", "monthly")
start_date_str = request.args.get("start_date")
end_date_str = request.args.get("end_date")
# now = datetime.utcnow()
now = datetime.now(timezone.utc)
labels = []
expenses = []
subq = (
db.session.query(Expense.list_id, func.max(Expense.added_at).label("latest"))
.group_by(Expense.list_id)
.subquery()
)
query = db.session.query(Expense).join(
subq, (Expense.list_id == subq.c.list_id) & (Expense.added_at == subq.c.latest)
)
if start_date_str and end_date_str:
start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
try:
start = datetime.strptime(start_date_str, "%Y-%m-%d")
end = datetime.strptime(end_date_str, "%Y-%m-%d")
query = query.filter(Expense.added_at >= start, Expense.added_at <= end)
except ValueError:
return jsonify({"error": "Błędny zakres dat"}), 400
expenses_query = (
db.session.query(
extract("year", Expense.added_at).label("year"),
extract("month", Expense.added_at).label("month"),
func.sum(Expense.amount).label("total"),
)
.filter(Expense.added_at >= start_date, Expense.added_at <= end_date)
.group_by("year", "month")
.order_by("year", "month")
.all()
)
grouped = defaultdict(float)
for e in query.all():
ts = e.added_at or now
for row in expenses_query:
label = f"{int(row.month):02d}/{int(row.year)}"
labels.append(label)
expenses.append(round(row.total, 2))
if range_type == "monthly":
key = ts.strftime("%Y-%m")
elif range_type == "quarterly":
key = f"{ts.year}-Q{((ts.month - 1) // 3 + 1)}"
elif range_type == "halfyearly":
key = f"{ts.year}-H{1 if ts.month <= 6 else 2}"
elif range_type == "yearly":
key = str(ts.year)
else:
key = ts.strftime("%Y-%m-%d")
response = make_response(jsonify({"labels": labels, "expenses": expenses}))
response.headers["Cache-Control"] = (
"no-store, no-cache, must-revalidate, max-age=0"
)
return response
grouped[key] += e.amount
if range_type == "monthly":
for i in range(11, -1, -1):
year = (now - timedelta(days=i * 30)).year
month = (now - timedelta(days=i * 30)).month
label = f"{month:02d}/{year}"
labels.append(label)
labels = sorted(grouped)
data = [round(grouped[k], 2) for k in labels]
month_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == year)
.filter(extract("month", Expense.added_at) == month)
.scalar()
or 0
)
expenses.append(round(month_sum, 2))
elif range_type == "quarterly":
for i in range(3, -1, -1):
quarter_start = now - timedelta(days=i * 90)
year = quarter_start.year
quarter = (quarter_start.month - 1) // 3 + 1
label = f"Q{quarter}/{year}"
quarter_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == year)
.filter((extract("month", Expense.added_at) - 1) // 3 + 1 == quarter)
.scalar()
or 0
)
labels.append(label)
expenses.append(round(quarter_sum, 2))
elif range_type == "halfyearly":
for i in range(1, -1, -1):
half_start = now - timedelta(days=i * 180)
year = half_start.year
half = 1 if half_start.month <= 6 else 2
label = f"H{half}/{year}"
half_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == year)
.filter(
(extract("month", Expense.added_at) <= 6)
if half == 1
else (extract("month", Expense.added_at) > 6)
)
.scalar()
or 0
)
labels.append(label)
expenses.append(round(half_sum, 2))
elif range_type == "yearly":
for i in range(4, -1, -1):
year = now.year - i
label = str(year)
year_sum = (
db.session.query(func.sum(Expense.amount))
.filter(extract("year", Expense.added_at) == year)
.scalar()
or 0
)
labels.append(label)
expenses.append(round(year_sum, 2))
response = make_response(jsonify({"labels": labels, "expenses": expenses}))
response.headers["Cache-Control"] = "no-store, no-cache"
return response
return jsonify({"labels": labels, "expenses": data})
@app.route("/admin/promote_user/<int:user_id>")
@@ -2264,6 +2217,7 @@ def recalculate_filesizes():
)
return redirect(url_for("admin_receipts", id="all"))
@app.route("/healthcheck")
def healthcheck():
header_token = request.headers.get("X-Internal-Check")
@@ -2273,6 +2227,7 @@ def healthcheck():
abort(404)
return "OK", 200
@app.route("/robots.txt")
def robots_txt():
if app.config.get("DISABLE_ROBOTS", False):