from flask import Flask, render_template, request, redirect, url_for, flash from flask_sqlalchemy import SQLAlchemy from flask_login import ( LoginManager, login_user, login_required, logout_user, current_user, UserMixin, ) from werkzeug.security import generate_password_hash, check_password_hash from datetime import datetime from markupsafe import Markup from sqlalchemy import event from sqlalchemy.engine import Engine import markdown as md from flask import request, flash, abort import os import re import socket app = Flask(__name__) # Ładujemy konfigurację z pliku config.py app.config.from_object("config.Config") db = SQLAlchemy(app) login_manager = LoginManager(app) login_manager.login_view = "login" # MODELE class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False) password_hash = db.Column(db.String(128), nullable=False) is_admin = db.Column(db.Boolean, default=False) # Flaga głównego administratora def set_password(self, password): self.password_hash = generate_password_hash(password) def check_password(self, password): return check_password_hash(self.password_hash, password) class Zbiorka(db.Model): id = db.Column(db.Integer, primary_key=True) nazwa = db.Column(db.String(100), nullable=False) opis = db.Column(db.Text, nullable=False) numer_konta = db.Column(db.String(50), nullable=False) numer_telefonu_blik = db.Column(db.String(50), nullable=False) cel = db.Column(db.Float, nullable=False, default=0.0) stan = db.Column(db.Float, default=0.0) ukryta = db.Column(db.Boolean, default=False) ukryj_kwote = db.Column(db.Boolean, default=False) zrealizowana = db.Column(db.Boolean, default=False) wplaty = db.relationship( "Wplata", back_populates="zbiorka", lazy=True, order_by="Wplata.data.desc()", cascade="all, delete-orphan", passive_deletes=True, ) class Wplata(db.Model): id = db.Column(db.Integer, primary_key=True) zbiorka_id = db.Column( db.Integer, db.ForeignKey("zbiorka.id", ondelete="CASCADE"), nullable=False, ) kwota = db.Column(db.Float, nullable=False) data = db.Column(db.DateTime, default=datetime.utcnow) opis = db.Column(db.Text, nullable=True) zbiorka = db.relationship("Zbiorka", back_populates="wplaty") class GlobalSettings(db.Model): id = db.Column(db.Integer, primary_key=True) numer_konta = db.Column(db.String(50), nullable=False) numer_telefonu_blik = db.Column(db.String(50), nullable=False) allowed_login_hosts = db.Column(db.Text, nullable=True) logo_url = db.Column(db.String(255), nullable=True) site_title = db.Column(db.String(120), nullable=True) show_logo_in_navbar = db.Column(db.Boolean, default=False) show_logo_in_navbar = db.Column(db.Boolean, default=False) navbar_brand_mode = db.Column(db.String(10), default="text") footer_brand_mode = db.Column(db.String(10), default="text") footer_text = db.Column(db.String(200), nullable=True) @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) @event.listens_for(Engine, "connect") def set_sqlite_pragma(dbapi_connection, connection_record): try: cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON") cursor.close() except Exception: pass def get_real_ip(): headers = request.headers cf_ip = headers.get("CF-Connecting-IP") if cf_ip: return cf_ip.split(",")[0].strip() xff = headers.get("X-Forwarded-For") if xff: return xff.split(",")[0].strip() x_real_ip = headers.get("X-Real-IP") if x_real_ip: return x_real_ip.strip() return request.remote_addr def is_allowed_ip(remote_ip, allowed_hosts_str): if remote_ip in ("127.0.0.1", "::1"): return True if os.path.exists("emergency_access.txt"): return True allowed_hosts = re.split(r"[\n,]+", allowed_hosts_str.strip()) allowed_ips = set() for host in allowed_hosts: host = host.strip() if not host: continue try: resolved_ip = socket.gethostbyname(host) allowed_ips.add(resolved_ip) except Exception: continue try: hostname = socket.gethostbyaddr(remote_ip)[0] app.logger.info(f"Odwiedzający IP: {remote_ip}, host: {hostname}") except Exception as e: app.logger.warning(f"Reverse DNS nieudane dla {remote_ip}: {e}") return remote_ip in allowed_ips # Dodaj filtr Markdown – pozwala na zagnieżdżanie linków i obrazków w opisie @app.template_filter("markdown") def markdown_filter(text): return Markup(md.markdown(text)) @app.context_processor def inject_globals(): settings = GlobalSettings.query.first() allowed_hosts_str = ( settings.allowed_login_hosts if settings and settings.allowed_login_hosts else "" ) client_ip = get_real_ip() return { "is_ip_allowed": is_allowed_ip(client_ip, allowed_hosts_str), "global_settings": settings, } # TRASY PUBLICZNE @app.route("/") def index(): zbiorki = Zbiorka.query.filter_by(ukryta=False, zrealizowana=False).all() return render_template("index.html", zbiorki=zbiorki) @app.route("/zbiorki_zrealizowane") def zbiorki_zrealizowane(): zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all() return render_template("index.html", zbiorki=zbiorki) @app.errorhandler(404) def page_not_found(e): return redirect(url_for("index")) @app.route("/zbiorka/") def zbiorka(zbiorka_id): zb = Zbiorka.query.get_or_404(zbiorka_id) # Jeżeli zbiórka jest ukryta i użytkownik nie jest administratorem, zwróć 404 if zb.ukryta and (not current_user.is_authenticated or not current_user.is_admin): abort(404) return render_template("zbiorka.html", zbiorka=zb) # TRASY LOGOWANIA I REJESTRACJI @app.route("/zaloguj", methods=["GET", "POST"]) def zaloguj(): # Pobierz ustawienia globalne, w tym dozwolone hosty settings = GlobalSettings.query.first() allowed_hosts_str = "" if settings and settings.allowed_login_hosts: allowed_hosts_str = settings.allowed_login_hosts # Sprawdzenie, czy adres IP klienta jest dozwolony client_ip = get_real_ip() if not is_allowed_ip(client_ip, allowed_hosts_str): flash( "Dostęp do endpointu /login jest zablokowany dla Twojego adresu IP", "danger", ) return redirect(url_for("index")) if request.method == "POST": username = request.form["username"] password = request.form["password"] user = User.query.filter_by(username=username).first() if user and user.check_password(password): login_user(user) flash("Zalogowano pomyślnie", "success") next_page = request.args.get("next") return ( redirect(next_page) if next_page else redirect(url_for("admin_dashboard")) ) else: flash("Nieprawidłowe dane logowania", "danger") return render_template("login.html") @app.route("/wyloguj") @login_required def wyloguj(): logout_user() flash("Wylogowano", "success") return redirect(url_for("zaloguj")) @app.route("/zarejestruj", methods=["GET", "POST"]) def zarejestruj(): if not app.config.get("ALLOW_REGISTRATION", False): flash("Rejestracja została wyłączona przez administratora", "danger") return redirect(url_for("zaloguj")) if request.method == "POST": username = request.form["username"] password = request.form["password"] if User.query.filter_by(username=username).first(): flash("Użytkownik już istnieje", "danger") return redirect(url_for("register")) new_user = User(username=username) new_user.set_password(password) db.session.add(new_user) db.session.commit() flash("Konto utworzone, możesz się zalogować", "success") return redirect(url_for("zaloguj")) return render_template("register.html") # PANEL ADMINISTRACYJNY @app.route("/admin") @login_required def admin_dashboard(): if not current_user.is_admin: flash("Brak uprawnień do panelu administracyjnego", "danger") return redirect(url_for("index")) active_zbiorki = Zbiorka.query.filter_by(zrealizowana=False).all() completed_zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all() return render_template( "admin/dashboard.html", active_zbiorki=active_zbiorki, completed_zbiorki=completed_zbiorki, ) @app.route("/admin/zbiorka/dodaj", methods=["GET", "POST"]) @login_required def dodaj_zbiorke(): if not current_user.is_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) global_settings = GlobalSettings.query.first() # Pobieramy globalne ustawienia if request.method == "POST": nazwa = request.form["nazwa"] opis = request.form["opis"] # Pozyskujemy numer konta i telefon z formularza (mogą być nadpisane ręcznie) numer_konta = request.form["numer_konta"] numer_telefonu_blik = request.form["numer_telefonu_blik"] cel = float(request.form["cel"]) ukryj_kwote = "ukryj_kwote" in request.form nowa_zbiorka = Zbiorka( nazwa=nazwa, opis=opis, numer_konta=numer_konta, numer_telefonu_blik=numer_telefonu_blik, cel=cel, ukryj_kwote=ukryj_kwote, ) db.session.add(nowa_zbiorka) db.session.commit() flash("Zbiórka została dodana", "success") return redirect(url_for("admin_dashboard")) return render_template("admin/dodaj_zbiorke.html", global_settings=global_settings) @app.route("/admin/zbiorka/edytuj/", methods=["GET", "POST"]) @login_required def edytuj_zbiorka(zbiorka_id): if not current_user.is_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = Zbiorka.query.get_or_404(zbiorka_id) global_settings = GlobalSettings.query.first() # Pobieramy globalne ustawienia if request.method == "POST": zb.nazwa = request.form["nazwa"] zb.opis = request.form["opis"] zb.numer_konta = request.form["numer_konta"] zb.numer_telefonu_blik = request.form["numer_telefonu_blik"] try: zb.cel = float(request.form["cel"]) except ValueError: flash("Podano nieprawidłową wartość dla celu zbiórki", "danger") return render_template( "admin/edytuj_zbiorke.html", zbiorka=zb, global_settings=global_settings ) zb.ukryj_kwote = "ukryj_kwote" in request.form db.session.commit() flash("Zbiórka została zaktualizowana", "success") return redirect(url_for("admin_dashboard")) return render_template( "admin/edytuj_zbiorke.html", zbiorka=zb, global_settings=global_settings ) # TRASA DODAWANIA WPŁATY Z OPISEM # TRASA DODAWANIA WPŁATY W PANELU ADMINA @app.route("/admin/zbiorka//wplata/dodaj", methods=["GET", "POST"]) @login_required def dodaj_wplate(zbiorka_id): if not current_user.is_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = Zbiorka.query.get_or_404(zbiorka_id) if request.method == "POST": kwota = float(request.form["kwota"]) opis = request.form.get("opis", "") nowa_wplata = Wplata(zbiorka_id=zb.id, kwota=kwota, opis=opis) zb.stan += kwota # Aktualizacja stanu zbiórki db.session.add(nowa_wplata) db.session.commit() flash("Wpłata została dodana", "success") return redirect(url_for("admin_dashboard")) return render_template("admin/dodaj_wplate.html", zbiorka=zb) @app.route("/admin/zbiorka/usun/", methods=["POST"]) @login_required def usun_zbiorka(zbiorka_id): if not current_user.is_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = Zbiorka.query.get_or_404(zbiorka_id) db.session.delete(zb) db.session.commit() flash("Zbiórka została usunięta", "success") return redirect(url_for("admin_dashboard")) @app.route("/admin/zbiorka/edytuj_stan/", methods=["GET", "POST"]) @login_required def edytuj_stan(zbiorka_id): if not current_user.is_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = Zbiorka.query.get_or_404(zbiorka_id) if request.method == "POST": try: nowy_stan = float(request.form["stan"]) except ValueError: flash("Nieprawidłowa wartość kwoty", "danger") return redirect(url_for("edytuj_stan", zbiorka_id=zbiorka_id)) zb.stan = nowy_stan db.session.commit() flash("Stan zbiórki został zaktualizowany", "success") return redirect(url_for("admin_dashboard")) return render_template("admin/edytuj_stan.html", zbiorka=zb) @app.route("/admin/zbiorka/zmien_widzialnosc/", methods=["POST"]) @login_required def zmien_widzialnosc(zbiorka_id): if not current_user.is_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = Zbiorka.query.get_or_404(zbiorka_id) zb.ukryta = not zb.ukryta db.session.commit() flash("Zbiórka została " + ("ukryta" if zb.ukryta else "przywrócona"), "success") return redirect(url_for("admin_dashboard")) def create_admin_account(): admin = User.query.filter_by(is_admin=True).first() if not admin: main_admin = User(username=app.config["MAIN_ADMIN_USERNAME"], is_admin=True) main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"]) db.session.add(main_admin) db.session.commit() @app.after_request def apply_headers(response): # Nagłówki niestandardowe custom_headers = app.config.get("ADD_HEADERS", {}) if isinstance(custom_headers, dict): for header, value in custom_headers.items(): response.headers[header] = str(value) # Wykluczenia if response.status_code in (301, 302, 303, 307, 308): response.headers.pop("Vary", None) return response if 400 <= response.status_code < 500: response.headers["Cache-Control"] = "no-store" response.headers["Content-Type"] = "text/html; charset=utf-8" response.headers.pop("Vary", None) elif 500 <= response.status_code < 600: response.headers["Cache-Control"] = "no-store" response.headers["Content-Type"] = "text/html; charset=utf-8" response.headers["Retry-After"] = "120" response.headers.pop("Vary", None) elif request.path.startswith("/admin"): response.headers.pop("Vary", None) response.headers["Cache-Control"] = ( "no-store, no-cache, must-revalidate, max-age=0" ) else: response.headers["Vary"] = "Cookie, Accept-Encoding" default_cache = app.config.get("CACHE_CONTROL_HEADER") or "private, max-age=0" response.headers["Cache-Control"] = default_cache # Blokowanie botów if app.config.get("BLOCK_BOTS", False): cc = ( app.config.get("CACHE_CONTROL_HEADER") or "no-store, no-cache, must-revalidate, max-age=0" ) response.headers["Cache-Control"] = cc response.headers["X-Robots-Tag"] = ( app.config.get("ROBOTS_TAG") or "noindex, nofollow, nosnippet, noarchive" ) return response @app.route("/admin/ustawienia", methods=["GET", "POST"]) @login_required def admin_ustawienia(): if not current_user.is_admin: flash("Brak uprawnień do panelu administracyjnego", "danger") return redirect(url_for("index")) client_ip = get_real_ip() settings = GlobalSettings.query.first() if request.method == "POST": numer_konta = request.form.get("numer_konta") numer_telefonu_blik = request.form.get("numer_telefonu_blik") allowed_login_hosts = request.form.get("allowed_login_hosts") logo_url = request.form.get("logo_url") site_title = request.form.get("site_title") navbar_brand_mode = request.form.get("navbar_brand_mode", "text") footer_brand_mode = request.form.get("footer_brand_mode", "text") footer_text = request.form.get("footer_text") or None show_logo_in_navbar = (navbar_brand_mode == "logo") if settings is None: settings = GlobalSettings( numer_konta=numer_konta, numer_telefonu_blik=numer_telefonu_blik, allowed_login_hosts=allowed_login_hosts, logo_url=logo_url, site_title=site_title, show_logo_in_navbar=show_logo_in_navbar, navbar_brand_mode=navbar_brand_mode, footer_brand_mode=footer_brand_mode, footer_text=footer_text, ) db.session.add(settings) else: settings.numer_konta = numer_konta settings.numer_telefonu_blik = numer_telefonu_blik settings.allowed_login_hosts = allowed_login_hosts settings.logo_url = logo_url settings.site_title = site_title settings.show_logo_in_navbar = show_logo_in_navbar settings.navbar_brand_mode = navbar_brand_mode settings.footer_brand_mode = footer_brand_mode settings.footer_text = footer_text db.session.commit() flash("Ustawienia globalne zostały zaktualizowane", "success") return redirect(url_for("admin_dashboard")) return render_template( "admin/ustawienia.html", settings=settings, client_ip=client_ip ) @app.route("/admin/zbiorka/oznacz/", methods=["POST"]) @login_required def oznacz_zbiorka(zbiorka_id): if not current_user.is_admin: flash("Brak uprawnień do wykonania tej operacji", "danger") return redirect(url_for("index")) zb = Zbiorka.query.get_or_404(zbiorka_id) zb.zrealizowana = True db.session.commit() flash("Zbiórka została oznaczona jako zrealizowana", "success") return redirect(url_for("admin_dashboard")) @app.route("/robots.txt") def robots(): if app.config.get("BLOCK_BOTS", False): robots_txt = "User-agent: *\nDisallow: /" else: robots_txt = "User-agent: *\nAllow: /" return robots_txt, 200, {"Content-Type": "text/plain"} @app.route("/favicon.ico") def favicon(): return "", 204 if __name__ == "__main__": with app.app_context(): db.create_all() # Tworzenie konta głównego admina, jeśli nie istnieje if not User.query.filter_by(is_admin=True).first(): main_admin = User(username=app.config["MAIN_ADMIN_USERNAME"], is_admin=True) main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"]) db.session.add(main_admin) db.session.commit() app.run(debug=True)