from flask import Flask, render_template, request, redirect, url_for, flash from flask_sqlalchemy import SQLAlchemy from flask_login import ( LoginManager, login_user, login_required, logout_user, current_user, UserMixin, ) from werkzeug.security import generate_password_hash, check_password_hash from datetime import datetime from markupsafe import Markup from sqlalchemy import event from sqlalchemy.engine import Engine from decimal import Decimal, InvalidOperation import markdown as md from flask import request, flash, abort import os import re import socket app = Flask(__name__) # Ładujemy konfigurację z pliku config.py app.config.from_object("config.Config") db = SQLAlchemy(app) login_manager = LoginManager(app) login_manager.login_view = "login" # MODELE class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) password_hash = db.Column(db.String(128), nullable=False) is_admin = db.Column(db.Boolean, default=False) # Flaga głównego administratora def set_password(self, password): self.password_hash = generate_password_hash(password) def check_password(self, password): return check_password_hash(self.password_hash, password) class Zbiorka(db.Model): id = db.Column(db.Integer, primary_key=True) nazwa = db.Column(db.String(100), nullable=False) opis = db.Column(db.Text, nullable=False) numer_konta = db.Column(db.String(50), nullable=False) numer_telefonu_blik = db.Column(db.String(50), nullable=False) cel = db.Column(db.Float, nullable=False, default=0.0) stan = db.Column(db.Float, default=0.0) ukryta = db.Column(db.Boolean, default=False) ukryj_kwote = db.Column(db.Boolean, default=False) zrealizowana = db.Column(db.Boolean, default=False) wplaty = db.relationship( "Wplata", back_populates="zbiorka", lazy=True, order_by="Wplata.data.desc()", cascade="all, delete-orphan", passive_deletes=True, ) class Wplata(db.Model): id = db.Column(db.Integer, primary_key=True) zbiorka_id = db.Column( db.Integer, db.ForeignKey("zbiorka.id", ondelete="CASCADE"), nullable=False, ) kwota = db.Column(db.Float, nullable=False) data = db.Column(db.DateTime, default=datetime.utcnow) opis = db.Column(db.Text, nullable=True) zbiorka = db.relationship("Zbiorka", back_populates="wplaty") class GlobalSettings(db.Model): id = db.Column(db.Integer, primary_key=True) numer_konta = db.Column(db.String(50), nullable=False) numer_telefonu_blik = db.Column(db.String(50), nullable=False) allowed_login_hosts = db.Column(db.Text, nullable=True) logo_url = db.Column(db.String(255), nullable=True) site_title = db.Column(db.String(120), nullable=True) show_logo_in_navbar = db.Column(db.Boolean, default=False) show_logo_in_navbar = db.Column(db.Boolean, default=False) navbar_brand_mode = db.Column(db.String(10), default="text") footer_brand_mode = db.Column(db.String(10), default="text") footer_text = db.Column(db.String(200), nullable=True) @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) @event.listens_for(Engine, "connect") def set_sqlite_pragma(dbapi_connection, connection_record): try: cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON") cursor.close() except Exception: pass def get_real_ip(): headers = request.headers cf_ip = headers.get("CF-Connecting-IP") if cf_ip: return cf_ip.split(",")[0].strip() xff = headers.get("X-Forwarded-For") if xff: return xff.split(",")[0].strip() x_real_ip = headers.get("X-Real-IP") if x_real_ip: return x_real_ip.strip() return request.remote_addr def is_allowed_ip(remote_ip, allowed_hosts_str): if remote_ip in ("127.0.0.1", "::1"): return True if os.path.exists("emergency_access.txt"): return True allowed_hosts = re.split(r"[\n,]+", allowed_hosts_str.strip()) allowed_ips = set() for host in allowed_hosts: host = host.strip() if not host: continue try: resolved_ip = socket.gethostbyname(host) allowed_ips.add(resolved_ip) except Exception: continue try: hostname = socket.gethostbyaddr(remote_ip)[0] app.logger.info(f"Odwiedzający IP: {remote_ip}, host: {hostname}") except Exception as e: app.logger.warning(f"Reverse DNS nieudane dla {remote_ip}: {e}") return remote_ip in allowed_ips # Dodaj filtr Markdown – pozwala na zagnieżdżanie linków i obrazków w opisie @app.template_filter("markdown") def markdown_filter(text): return Markup(md.markdown(text)) @app.context_processor def inject_globals(): settings = GlobalSettings.query.first() allowed_hosts_str = ( settings.allowed_login_hosts if settings and settings.allowed_login_hosts else "" ) client_ip = get_real_ip() return { "is_ip_allowed": is_allowed_ip(client_ip, allowed_hosts_str), "global_settings": settings, } # TRASY PUBLICZNE @app.route("/") def index(): zbiorki = Zbiorka.query.filter_by(ukryta=False, zrealizowana=False).all() return render_template("index.html", zbiorki=zbiorki) @app.route("/zbiorki_zrealizowane") def zbiorki_zrealizowane(): zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all() return render_template("index.html", zbiorki=zbiorki) @app.errorhandler(404) def page_not_found(e): return redirect(url_for("index")) @app.route("/zbiorka/") def zbiorka(zbiorka_id): zb = Zbiorka.query.get_or_404(zbiorka_id) # Jeżeli zbiórka jest ukryta i użytkownik nie jest administratorem, zwróć 404 if zb.ukryta and (not current_user.is_authenticated or not current_user.is_admin): abort(404) return render_template("zbiorka.html", zbiorka=zb) # TRASY LOGOWANIA I REJESTRACJI @app.route("/zaloguj", methods=["GET", "POST"]) def zaloguj(): # Pobierz ustawienia globalne, w tym dozwolone hosty settings = GlobalSettings.query.first() allowed_hosts_str = "" if settings and settings.allowed_login_hosts: allowed_hosts_str = settings.allowed_login_hosts # Sprawdzenie, czy adres IP klienta jest dozwolony client_ip = get_real_ip() if not is_allowed_ip(client_ip, allowed_hosts_str): flash( "Dostęp do endpointu /login jest zablokowany dla Twojego adresu IP", "danger", ) return redirect(url_for("index")) if request.method == "POST": username = request.form["username"] password = request.form["password"] user = User.query.filter_by(username=username).first() if user and user.check_password(password): login_user(user) flash("Zalogowano pomyślnie", "success") next_page = request.args.get("next") return ( redirect(next_page) if next_page else redirect(url_for("admin_dashboard")) ) else: flash("Nieprawidłowe dane logowania", "danger") return render_template("login.html") @app.route("/wyloguj") @login_required def wyloguj(): logout_user() flash("Wylogowano", "success") return redirect(url_for("zaloguj")) @app.route("/zarejestruj", methods=["GET", "POST"]) def zarejestruj(): if not app.config.get("ALLOW_REGISTRATION", False): flash("Rejestracja została wyłączona przez administratora", "danger") return redirect(url_for("zaloguj")) if request.method == "POST": username = request.form["username"] password = request.form["password"] if User.query.filter_by(username=username).first(): flash("Użytkownik już istnieje", "danger") return redirect(url_for("register")) new_user = User(username=username) new_user.set_password(password) db.session.add(new_user) db.session.commit() flash("Konto utworzone, możesz się zalogować", "success") return redirect(url_for("zaloguj")) return render_template("register.html") # PANEL ADMINISTRACYJNY @app.route("/admin") @login_required def admin_dashboard(): if not current_user.is_admin: flash("Brak uprawnień do panelu administracyjnego", "danger") return redirect(url_for("index")) active_zbiorki = Zbiorka.query.filter_by(zrealizowana=False).all() completed_zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all() return render_template( "admin/dashboard.html", active_zbiorki=active_zbiorki, completed_zbiorki=completed_zbiorki, ) @app.route("/admin/zbiorka/dodaj", methods=["GET", "POST"]) @app.route("/admin/zbiorka/edytuj/", methods=["GET", "POST"]) @login_required def formularz_zbiorek(zbiorka_id=None): if not current_user.is_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) # Tryb is_edit = zbiorka_id is not None zb = Zbiorka.query.get_or_404(zbiorka_id) if is_edit else None global_settings = GlobalSettings.query.first() if request.method == "POST": # Pola wspólne nazwa = request.form.get("nazwa", "").strip() opis = request.form.get("opis", "").strip() # IBAN/telefon — oczyść z nadmiarowych znaków odstępu (zostaw spacje w prezentacji frontu) numer_konta = request.form.get("numer_konta", "").strip() numer_telefonu_blik = request.form.get("numer_telefonu_blik", "").strip() # Cel — walidacja liczby try: cel_str = request.form.get("cel", "").replace(",", ".").strip() cel = Decimal(cel_str) if cel <= Decimal("0"): raise InvalidOperation except (InvalidOperation, ValueError): flash("Podano nieprawidłową wartość dla celu zbiórki", "danger") # render z dotychczasowo wpisanymi danymi (w trybie dodawania tworzymy tymczasowy obiekt) temp_zb = zb or Zbiorka( nazwa=nazwa, opis=opis, numer_konta=numer_konta, numer_telefonu_blik=numer_telefonu_blik, cel=None, ukryj_kwote=("ukryj_kwote" in request.form), ) return render_template( "admin/formularz_zbiorek.html", zbiorka=temp_zb if is_edit else None if zb is None else temp_zb, global_settings=global_settings, ) ukryj_kwote = "ukryj_kwote" in request.form if is_edit: # Aktualizacja istniejącej zb.nazwa = nazwa zb.opis = opis zb.numer_konta = numer_konta zb.numer_telefonu_blik = numer_telefonu_blik zb.cel = float(cel) # jeśli masz Decimal w modelu, przypisz bez konwersji zb.ukryj_kwote = ukryj_kwote db.session.commit() flash("Zbiórka została zaktualizowana", "success") else: # Utworzenie nowej nowa = Zbiorka( nazwa=nazwa, opis=opis, numer_konta=numer_konta, numer_telefonu_blik=numer_telefonu_blik, cel=float(cel), ukryj_kwote=ukryj_kwote, ) db.session.add(nowa) db.session.commit() flash("Zbiórka została dodana", "success") return redirect(url_for("admin_dashboard")) # GET return render_template( "admin/formularz_zbiorek.html", zbiorka=zb, global_settings=global_settings ) # TRASA DODAWANIA WPŁATY Z OPISEM # TRASA DODAWANIA WPŁATY W PANELU ADMINA @app.route("/admin/zbiorka//wplata/dodaj", methods=["GET", "POST"]) @login_required def dodaj_wplate(zbiorka_id): if not current_user.is_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = Zbiorka.query.get_or_404(zbiorka_id) if request.method == "POST": kwota = float(request.form["kwota"]) opis = request.form.get("opis", "") nowa_wplata = Wplata(zbiorka_id=zb.id, kwota=kwota, opis=opis) zb.stan += kwota # Aktualizacja stanu zbiórki db.session.add(nowa_wplata) db.session.commit() flash("Wpłata została dodana", "success") return redirect(url_for("admin_dashboard")) return render_template("admin/dodaj_wplate.html", zbiorka=zb) @app.route("/admin/zbiorka/usun/", methods=["POST"]) @login_required def usun_zbiorka(zbiorka_id): if not current_user.is_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = Zbiorka.query.get_or_404(zbiorka_id) db.session.delete(zb) db.session.commit() flash("Zbiórka została usunięta", "success") return redirect(url_for("admin_dashboard")) @app.route("/admin/zbiorka/edytuj_stan/", methods=["GET", "POST"]) @login_required def edytuj_stan(zbiorka_id): if not current_user.is_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = Zbiorka.query.get_or_404(zbiorka_id) if request.method == "POST": try: nowy_stan = float(request.form["stan"]) except ValueError: flash("Nieprawidłowa wartość kwoty", "danger") return redirect(url_for("edytuj_stan", zbiorka_id=zbiorka_id)) zb.stan = nowy_stan db.session.commit() flash("Stan zbiórki został zaktualizowany", "success") return redirect(url_for("admin_dashboard")) return render_template("admin/edytuj_stan.html", zbiorka=zb) @app.route("/admin/zbiorka/zmien_widzialnosc/", methods=["POST"]) @login_required def zmien_widzialnosc(zbiorka_id): if not current_user.is_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = Zbiorka.query.get_or_404(zbiorka_id) zb.ukryta = not zb.ukryta db.session.commit() flash("Zbiórka została " + ("ukryta" if zb.ukryta else "przywrócona"), "success") return redirect(url_for("admin_dashboard")) def create_admin_account(): admin = User.query.filter_by(is_admin=True).first() if not admin: main_admin = User(username=app.config["MAIN_ADMIN_USERNAME"], is_admin=True) main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"]) db.session.add(main_admin) db.session.commit() from flask import request @app.after_request def apply_headers(response): # --- STATIC: jak wcześniej --- if request.path.startswith("/static/"): response.headers.pop("Content-Disposition", None) response.headers["Vary"] = "Accept-Encoding" response.headers["Cache-Control"] = app.config.get( "CACHE_CONTROL_HEADER_STATIC" ) if app.config.get("USE_ETAGS", True) and "ETag" not in response.headers: response.add_etag() response.make_conditional(request) return response path_norm = request.path.lstrip("/") is_admin = path_norm.startswith("admin/") or path_norm == "admin" if is_admin: if (response.mimetype or "").startswith("text/html"): response.headers["Cache-Control"] = "no-store, no-cache" response.headers.pop("ETag", None) return response if response.status_code in (301, 302, 303, 307, 308): response.headers.pop("Vary", None) return response if 400 <= response.status_code < 500: response.headers["Cache-Control"] = "no-store" response.headers["Content-Type"] = "text/html; charset=utf-8" response.headers.pop("Vary", None) elif 500 <= response.status_code < 600: response.headers["Cache-Control"] = "no-store" response.headers["Content-Type"] = "text/html; charset=utf-8" response.headers["Retry-After"] = "120" response.headers.pop("Vary", None) else: response.headers["Vary"] = "Cookie, Accept-Encoding" default_cache = app.config.get("CACHE_CONTROL_HEADER") or "private, max-age=0" response.headers["Cache-Control"] = default_cache if ( app.config.get("BLOCK_BOTS", False) and not is_admin and not request.path.startswith("/static/") ): cc_override = app.config.get("CACHE_CONTROL_HEADER") if cc_override: response.headers["Cache-Control"] = cc_override response.headers["X-Robots-Tag"] = ( app.config.get("ROBOTS_TAG") or "noindex, nofollow, nosnippet, noarchive" ) return response @app.route("/admin/ustawienia", methods=["GET", "POST"]) @login_required def admin_ustawienia(): if not current_user.is_admin: flash("Brak uprawnień do panelu administracyjnego", "danger") return redirect(url_for("index")) client_ip = get_real_ip() settings = GlobalSettings.query.first() if request.method == "POST": numer_konta = request.form.get("numer_konta") numer_telefonu_blik = request.form.get("numer_telefonu_blik") allowed_login_hosts = request.form.get("allowed_login_hosts") logo_url = request.form.get("logo_url") site_title = request.form.get("site_title") navbar_brand_mode = request.form.get("navbar_brand_mode", "text") footer_brand_mode = request.form.get("footer_brand_mode", "text") footer_text = request.form.get("footer_text") or None show_logo_in_navbar = navbar_brand_mode == "logo" if settings is None: settings = GlobalSettings( numer_konta=numer_konta, numer_telefonu_blik=numer_telefonu_blik, allowed_login_hosts=allowed_login_hosts, logo_url=logo_url, site_title=site_title, show_logo_in_navbar=show_logo_in_navbar, navbar_brand_mode=navbar_brand_mode, footer_brand_mode=footer_brand_mode, footer_text=footer_text, ) db.session.add(settings) else: settings.numer_konta = numer_konta settings.numer_telefonu_blik = numer_telefonu_blik settings.allowed_login_hosts = allowed_login_hosts settings.logo_url = logo_url settings.site_title = site_title settings.show_logo_in_navbar = show_logo_in_navbar settings.navbar_brand_mode = navbar_brand_mode settings.footer_brand_mode = footer_brand_mode settings.footer_text = footer_text db.session.commit() flash("Ustawienia globalne zostały zaktualizowane", "success") return redirect(url_for("admin_dashboard")) return render_template( "admin/ustawienia.html", settings=settings, client_ip=client_ip ) @app.route( "/admin/zbiorka/oznacz/niezrealizowana/", methods=["POST"], endpoint="oznacz_niezrealizowana", ) @app.route( "/admin/zbiorka/oznacz/zrealizowana/", methods=["POST"], endpoint="oznacz_zrealizowana", ) @login_required def oznacz_zbiorka(zbiorka_id): if not current_user.is_admin: flash("Brak uprawnień do wykonania tej operacji", "danger") return redirect(url_for("index")) zb = Zbiorka.query.get_or_404(zbiorka_id) if "niezrealizowana" in request.path: zb.zrealizowana = False msg = "Zbiórka została oznaczona jako niezrealizowana" else: zb.zrealizowana = True msg = "Zbiórka została oznaczona jako zrealizowana" db.session.commit() flash(msg, "success") return redirect(url_for("admin_dashboard")) @app.route("/robots.txt") def robots(): if app.config.get("BLOCK_BOTS", False): robots_txt = "User-agent: *\nDisallow: /" else: robots_txt = "User-agent: *\nAllow: /" return robots_txt, 200, {"Content-Type": "text/plain"} @app.route("/favicon.ico") def favicon(): return "", 204 @app.route("/healthcheck") def healthcheck(): header_token = request.headers.get("X-Internal-Check") correct_token = app.config.get("HEALTHCHECK_TOKEN") if header_token != correct_token: abort(404) return "OK", 200 if __name__ == "__main__": with app.app_context(): db.create_all() # Tworzenie konta głównego admina, jeśli nie istnieje if not User.query.filter_by(is_admin=True).first(): main_admin = User(username=app.config["MAIN_ADMIN_USERNAME"], is_admin=True) main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"]) db.session.add(main_admin) db.session.commit() app.run(debug=True)