Files
zbiorki_app/app.py
2025-08-28 12:42:57 +02:00

608 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, request, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from flask_login import (
LoginManager,
login_user,
login_required,
logout_user,
current_user,
UserMixin,
)
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
from markupsafe import Markup
from sqlalchemy import event
from sqlalchemy.engine import Engine
from decimal import Decimal, InvalidOperation
import markdown as md
from flask import request, flash, abort
import os
import re
import socket
app = Flask(__name__)
# Ładujemy konfigurację z pliku config.py
app.config.from_object("config.Config")
db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = "login"
# MODELE
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
is_admin = db.Column(db.Boolean, default=False) # Flaga głównego administratora
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
class Zbiorka(db.Model):
id = db.Column(db.Integer, primary_key=True)
nazwa = db.Column(db.String(100), nullable=False)
opis = db.Column(db.Text, nullable=False)
numer_konta = db.Column(db.String(50), nullable=False)
numer_telefonu_blik = db.Column(db.String(50), nullable=False)
cel = db.Column(db.Float, nullable=False, default=0.0)
stan = db.Column(db.Float, default=0.0)
ukryta = db.Column(db.Boolean, default=False)
ukryj_kwote = db.Column(db.Boolean, default=False)
zrealizowana = db.Column(db.Boolean, default=False)
wplaty = db.relationship(
"Wplata",
back_populates="zbiorka",
lazy=True,
order_by="Wplata.data.desc()",
cascade="all, delete-orphan",
passive_deletes=True,
)
class Wplata(db.Model):
id = db.Column(db.Integer, primary_key=True)
zbiorka_id = db.Column(
db.Integer,
db.ForeignKey("zbiorka.id", ondelete="CASCADE"),
nullable=False,
)
kwota = db.Column(db.Float, nullable=False)
data = db.Column(db.DateTime, default=datetime.utcnow)
opis = db.Column(db.Text, nullable=True)
zbiorka = db.relationship("Zbiorka", back_populates="wplaty")
class GlobalSettings(db.Model):
id = db.Column(db.Integer, primary_key=True)
numer_konta = db.Column(db.String(50), nullable=False)
numer_telefonu_blik = db.Column(db.String(50), nullable=False)
allowed_login_hosts = db.Column(db.Text, nullable=True)
logo_url = db.Column(db.String(255), nullable=True)
site_title = db.Column(db.String(120), nullable=True)
show_logo_in_navbar = db.Column(db.Boolean, default=False)
show_logo_in_navbar = db.Column(db.Boolean, default=False)
navbar_brand_mode = db.Column(db.String(10), default="text")
footer_brand_mode = db.Column(db.String(10), default="text")
footer_text = db.Column(db.String(200), nullable=True)
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
try:
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
except Exception:
pass
def get_real_ip():
headers = request.headers
cf_ip = headers.get("CF-Connecting-IP")
if cf_ip:
return cf_ip.split(",")[0].strip()
xff = headers.get("X-Forwarded-For")
if xff:
return xff.split(",")[0].strip()
x_real_ip = headers.get("X-Real-IP")
if x_real_ip:
return x_real_ip.strip()
return request.remote_addr
def is_allowed_ip(remote_ip, allowed_hosts_str):
if remote_ip in ("127.0.0.1", "::1"):
return True
if os.path.exists("emergency_access.txt"):
return True
allowed_hosts = re.split(r"[\n,]+", allowed_hosts_str.strip())
allowed_ips = set()
for host in allowed_hosts:
host = host.strip()
if not host:
continue
try:
resolved_ip = socket.gethostbyname(host)
allowed_ips.add(resolved_ip)
except Exception:
continue
try:
hostname = socket.gethostbyaddr(remote_ip)[0]
app.logger.info(f"Odwiedzający IP: {remote_ip}, host: {hostname}")
except Exception as e:
app.logger.warning(f"Reverse DNS nieudane dla {remote_ip}: {e}")
return remote_ip in allowed_ips
# Dodaj filtr Markdown pozwala na zagnieżdżanie linków i obrazków w opisie
@app.template_filter("markdown")
def markdown_filter(text):
return Markup(md.markdown(text))
@app.context_processor
def inject_globals():
settings = GlobalSettings.query.first()
allowed_hosts_str = (
settings.allowed_login_hosts
if settings and settings.allowed_login_hosts
else ""
)
client_ip = get_real_ip()
return {
"is_ip_allowed": is_allowed_ip(client_ip, allowed_hosts_str),
"global_settings": settings,
}
# TRASY PUBLICZNE
@app.route("/")
def index():
zbiorki = Zbiorka.query.filter_by(ukryta=False, zrealizowana=False).all()
return render_template("index.html", zbiorki=zbiorki)
@app.route("/zbiorki_zrealizowane")
def zbiorki_zrealizowane():
zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all()
return render_template("index.html", zbiorki=zbiorki)
@app.errorhandler(404)
def page_not_found(e):
return redirect(url_for("index"))
@app.route("/zbiorka/<int:zbiorka_id>")
def zbiorka(zbiorka_id):
zb = Zbiorka.query.get_or_404(zbiorka_id)
# Jeżeli zbiórka jest ukryta i użytkownik nie jest administratorem, zwróć 404
if zb.ukryta and (not current_user.is_authenticated or not current_user.is_admin):
abort(404)
return render_template("zbiorka.html", zbiorka=zb)
# TRASY LOGOWANIA I REJESTRACJI
@app.route("/zaloguj", methods=["GET", "POST"])
def zaloguj():
# Pobierz ustawienia globalne, w tym dozwolone hosty
settings = GlobalSettings.query.first()
allowed_hosts_str = ""
if settings and settings.allowed_login_hosts:
allowed_hosts_str = settings.allowed_login_hosts
# Sprawdzenie, czy adres IP klienta jest dozwolony
client_ip = get_real_ip()
if not is_allowed_ip(client_ip, allowed_hosts_str):
flash(
"Dostęp do endpointu /login jest zablokowany dla Twojego adresu IP",
"danger",
)
return redirect(url_for("index"))
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
login_user(user)
flash("Zalogowano pomyślnie", "success")
next_page = request.args.get("next")
return (
redirect(next_page)
if next_page
else redirect(url_for("admin_dashboard"))
)
else:
flash("Nieprawidłowe dane logowania", "danger")
return render_template("login.html")
@app.route("/wyloguj")
@login_required
def wyloguj():
logout_user()
flash("Wylogowano", "success")
return redirect(url_for("zaloguj"))
@app.route("/zarejestruj", methods=["GET", "POST"])
def zarejestruj():
if not app.config.get("ALLOW_REGISTRATION", False):
flash("Rejestracja została wyłączona przez administratora", "danger")
return redirect(url_for("zaloguj"))
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
if User.query.filter_by(username=username).first():
flash("Użytkownik już istnieje", "danger")
return redirect(url_for("register"))
new_user = User(username=username)
new_user.set_password(password)
db.session.add(new_user)
db.session.commit()
flash("Konto utworzone, możesz się zalogować", "success")
return redirect(url_for("zaloguj"))
return render_template("register.html")
# PANEL ADMINISTRACYJNY
@app.route("/admin")
@login_required
def admin_dashboard():
if not current_user.is_admin:
flash("Brak uprawnień do panelu administracyjnego", "danger")
return redirect(url_for("index"))
active_zbiorki = Zbiorka.query.filter_by(zrealizowana=False).all()
completed_zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all()
return render_template(
"admin/dashboard.html",
active_zbiorki=active_zbiorki,
completed_zbiorki=completed_zbiorki,
)
@app.route("/admin/zbiorka/dodaj", methods=["GET", "POST"])
@app.route("/admin/zbiorka/edytuj/<int:zbiorka_id>", methods=["GET", "POST"])
@login_required
def formularz_zbiorek(zbiorka_id=None):
if not current_user.is_admin:
flash("Brak uprawnień", "danger")
return redirect(url_for("index"))
# Tryb
is_edit = zbiorka_id is not None
zb = Zbiorka.query.get_or_404(zbiorka_id) if is_edit else None
global_settings = GlobalSettings.query.first()
if request.method == "POST":
# Pola wspólne
nazwa = request.form.get("nazwa", "").strip()
opis = request.form.get("opis", "").strip()
# IBAN/telefon — oczyść z nadmiarowych znaków odstępu (zostaw spacje w prezentacji frontu)
numer_konta = request.form.get("numer_konta", "").strip()
numer_telefonu_blik = request.form.get("numer_telefonu_blik", "").strip()
# Cel — walidacja liczby
try:
cel_str = request.form.get("cel", "").replace(",", ".").strip()
cel = Decimal(cel_str)
if cel <= Decimal("0"):
raise InvalidOperation
except (InvalidOperation, ValueError):
flash("Podano nieprawidłową wartość dla celu zbiórki", "danger")
# render z dotychczasowo wpisanymi danymi (w trybie dodawania tworzymy tymczasowy obiekt)
temp_zb = zb or Zbiorka(
nazwa=nazwa,
opis=opis,
numer_konta=numer_konta,
numer_telefonu_blik=numer_telefonu_blik,
cel=None,
ukryj_kwote=("ukryj_kwote" in request.form),
)
return render_template(
"admin/formularz_zbiorek.html",
zbiorka=temp_zb if is_edit else None if zb is None else temp_zb,
global_settings=global_settings,
)
ukryj_kwote = "ukryj_kwote" in request.form
if is_edit:
# Aktualizacja istniejącej
zb.nazwa = nazwa
zb.opis = opis
zb.numer_konta = numer_konta
zb.numer_telefonu_blik = numer_telefonu_blik
zb.cel = float(cel) # jeśli masz Decimal w modelu, przypisz bez konwersji
zb.ukryj_kwote = ukryj_kwote
db.session.commit()
flash("Zbiórka została zaktualizowana", "success")
else:
# Utworzenie nowej
nowa = Zbiorka(
nazwa=nazwa,
opis=opis,
numer_konta=numer_konta,
numer_telefonu_blik=numer_telefonu_blik,
cel=float(cel),
ukryj_kwote=ukryj_kwote,
)
db.session.add(nowa)
db.session.commit()
flash("Zbiórka została dodana", "success")
return redirect(url_for("admin_dashboard"))
# GET
return render_template(
"admin/formularz_zbiorek.html", zbiorka=zb, global_settings=global_settings
)
# TRASA DODAWANIA WPŁATY Z OPISEM
# TRASA DODAWANIA WPŁATY W PANELU ADMINA
@app.route("/admin/zbiorka/<int:zbiorka_id>/wplata/dodaj", methods=["GET", "POST"])
@login_required
def dodaj_wplate(zbiorka_id):
if not current_user.is_admin:
flash("Brak uprawnień", "danger")
return redirect(url_for("index"))
zb = Zbiorka.query.get_or_404(zbiorka_id)
if request.method == "POST":
kwota = float(request.form["kwota"])
opis = request.form.get("opis", "")
nowa_wplata = Wplata(zbiorka_id=zb.id, kwota=kwota, opis=opis)
zb.stan += kwota # Aktualizacja stanu zbiórki
db.session.add(nowa_wplata)
db.session.commit()
flash("Wpłata została dodana", "success")
return redirect(url_for("admin_dashboard"))
return render_template("admin/dodaj_wplate.html", zbiorka=zb)
@app.route("/admin/zbiorka/usun/<int:zbiorka_id>", methods=["POST"])
@login_required
def usun_zbiorka(zbiorka_id):
if not current_user.is_admin:
flash("Brak uprawnień", "danger")
return redirect(url_for("index"))
zb = Zbiorka.query.get_or_404(zbiorka_id)
db.session.delete(zb)
db.session.commit()
flash("Zbiórka została usunięta", "success")
return redirect(url_for("admin_dashboard"))
@app.route("/admin/zbiorka/edytuj_stan/<int:zbiorka_id>", methods=["GET", "POST"])
@login_required
def edytuj_stan(zbiorka_id):
if not current_user.is_admin:
flash("Brak uprawnień", "danger")
return redirect(url_for("index"))
zb = Zbiorka.query.get_or_404(zbiorka_id)
if request.method == "POST":
try:
nowy_stan = float(request.form["stan"])
except ValueError:
flash("Nieprawidłowa wartość kwoty", "danger")
return redirect(url_for("edytuj_stan", zbiorka_id=zbiorka_id))
zb.stan = nowy_stan
db.session.commit()
flash("Stan zbiórki został zaktualizowany", "success")
return redirect(url_for("admin_dashboard"))
return render_template("admin/edytuj_stan.html", zbiorka=zb)
@app.route("/admin/zbiorka/zmien_widzialnosc/<int:zbiorka_id>", methods=["POST"])
@login_required
def zmien_widzialnosc(zbiorka_id):
if not current_user.is_admin:
flash("Brak uprawnień", "danger")
return redirect(url_for("index"))
zb = Zbiorka.query.get_or_404(zbiorka_id)
zb.ukryta = not zb.ukryta
db.session.commit()
flash("Zbiórka została " + ("ukryta" if zb.ukryta else "przywrócona"), "success")
return redirect(url_for("admin_dashboard"))
def create_admin_account():
admin = User.query.filter_by(is_admin=True).first()
if not admin:
main_admin = User(username=app.config["MAIN_ADMIN_USERNAME"], is_admin=True)
main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"])
db.session.add(main_admin)
db.session.commit()
from flask import request
@app.after_request
def apply_headers(response):
# --- STATIC: jak wcześniej ---
if request.path.startswith("/static/"):
response.headers.pop("Content-Disposition", None)
response.headers["Vary"] = "Accept-Encoding"
response.headers["Cache-Control"] = app.config.get(
"CACHE_CONTROL_HEADER_STATIC"
)
if app.config.get("USE_ETAGS", True) and "ETag" not in response.headers:
response.add_etag()
response.make_conditional(request)
return response
path_norm = request.path.lstrip("/")
is_admin = path_norm.startswith("admin/") or path_norm == "admin"
if is_admin:
if (response.mimetype or "").startswith("text/html"):
response.headers["Cache-Control"] = "no-store, no-cache"
response.headers.pop("ETag", None)
return response
if response.status_code in (301, 302, 303, 307, 308):
response.headers.pop("Vary", None)
return response
if 400 <= response.status_code < 500:
response.headers["Cache-Control"] = "no-store"
response.headers["Content-Type"] = "text/html; charset=utf-8"
response.headers.pop("Vary", None)
elif 500 <= response.status_code < 600:
response.headers["Cache-Control"] = "no-store"
response.headers["Content-Type"] = "text/html; charset=utf-8"
response.headers["Retry-After"] = "120"
response.headers.pop("Vary", None)
else:
response.headers["Vary"] = "Cookie, Accept-Encoding"
default_cache = app.config.get("CACHE_CONTROL_HEADER") or "private, max-age=0"
response.headers["Cache-Control"] = default_cache
if (
app.config.get("BLOCK_BOTS", False)
and not is_admin
and not request.path.startswith("/static/")
):
cc_override = app.config.get("CACHE_CONTROL_HEADER")
if cc_override:
response.headers["Cache-Control"] = cc_override
response.headers["X-Robots-Tag"] = (
app.config.get("ROBOTS_TAG") or "noindex, nofollow, nosnippet, noarchive"
)
return response
@app.route("/admin/ustawienia", methods=["GET", "POST"])
@login_required
def admin_ustawienia():
if not current_user.is_admin:
flash("Brak uprawnień do panelu administracyjnego", "danger")
return redirect(url_for("index"))
client_ip = get_real_ip()
settings = GlobalSettings.query.first()
if request.method == "POST":
numer_konta = request.form.get("numer_konta")
numer_telefonu_blik = request.form.get("numer_telefonu_blik")
allowed_login_hosts = request.form.get("allowed_login_hosts")
logo_url = request.form.get("logo_url")
site_title = request.form.get("site_title")
navbar_brand_mode = request.form.get("navbar_brand_mode", "text")
footer_brand_mode = request.form.get("footer_brand_mode", "text")
footer_text = request.form.get("footer_text") or None
show_logo_in_navbar = navbar_brand_mode == "logo"
if settings is None:
settings = GlobalSettings(
numer_konta=numer_konta,
numer_telefonu_blik=numer_telefonu_blik,
allowed_login_hosts=allowed_login_hosts,
logo_url=logo_url,
site_title=site_title,
show_logo_in_navbar=show_logo_in_navbar,
navbar_brand_mode=navbar_brand_mode,
footer_brand_mode=footer_brand_mode,
footer_text=footer_text,
)
db.session.add(settings)
else:
settings.numer_konta = numer_konta
settings.numer_telefonu_blik = numer_telefonu_blik
settings.allowed_login_hosts = allowed_login_hosts
settings.logo_url = logo_url
settings.site_title = site_title
settings.show_logo_in_navbar = show_logo_in_navbar
settings.navbar_brand_mode = navbar_brand_mode
settings.footer_brand_mode = footer_brand_mode
settings.footer_text = footer_text
db.session.commit()
flash("Ustawienia globalne zostały zaktualizowane", "success")
return redirect(url_for("admin_dashboard"))
return render_template(
"admin/ustawienia.html", settings=settings, client_ip=client_ip
)
@app.route(
"/admin/zbiorka/oznacz/niezrealizowana/<int:zbiorka_id>",
methods=["POST"],
endpoint="oznacz_niezrealizowana",
)
@app.route(
"/admin/zbiorka/oznacz/zrealizowana/<int:zbiorka_id>",
methods=["POST"],
endpoint="oznacz_zrealizowana",
)
@login_required
def oznacz_zbiorka(zbiorka_id):
if not current_user.is_admin:
flash("Brak uprawnień do wykonania tej operacji", "danger")
return redirect(url_for("index"))
zb = Zbiorka.query.get_or_404(zbiorka_id)
if "niezrealizowana" in request.path:
zb.zrealizowana = False
msg = "Zbiórka została oznaczona jako niezrealizowana"
else:
zb.zrealizowana = True
msg = "Zbiórka została oznaczona jako zrealizowana"
db.session.commit()
flash(msg, "success")
return redirect(url_for("admin_dashboard"))
@app.route("/robots.txt")
def robots():
if app.config.get("BLOCK_BOTS", False):
robots_txt = "User-agent: *\nDisallow: /"
else:
robots_txt = "User-agent: *\nAllow: /"
return robots_txt, 200, {"Content-Type": "text/plain"}
@app.route("/favicon.ico")
def favicon():
return "", 204
if __name__ == "__main__":
with app.app_context():
db.create_all()
# Tworzenie konta głównego admina, jeśli nie istnieje
if not User.query.filter_by(is_admin=True).first():
main_admin = User(username=app.config["MAIN_ADMIN_USERNAME"], is_admin=True)
main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"])
db.session.add(main_admin)
db.session.commit()
app.run(debug=True)