import markdown as md import hashlib, os import re import socket from flask import Flask, render_template, request, redirect, url_for, flash from flask_sqlalchemy import SQLAlchemy from flask_login import ( LoginManager, login_user, login_required, logout_user, current_user, UserMixin, ) from werkzeug.security import generate_password_hash, check_password_hash from datetime import datetime, timezone from markupsafe import Markup from sqlalchemy import event, Numeric, select from sqlalchemy.engine import Engine from decimal import Decimal, InvalidOperation from flask import request, flash, abort try: from zoneinfo import ZoneInfo # Python 3.9+ except ImportError: from backports.zoneinfo import ZoneInfo app = Flask(__name__) # Ładujemy konfigurację z pliku config.py app.config.from_object("config.Config") db = SQLAlchemy(app) login_manager = LoginManager(app) login_manager.login_view = "zaloguj" LOCAL_TZ = ZoneInfo("Europe/Warsaw") def read_commit_and_date(filename="version.txt", root_path=None): base = root_path or os.path.dirname(os.path.abspath(__file__)) path = os.path.join(base, filename) if not os.path.exists(path): return None, None try: commit = open(path, "r", encoding="utf-8").read().strip() if commit: commit = commit[:12] except Exception: commit = None try: ts = os.path.getmtime(path) date_str = datetime.fromtimestamp(ts).strftime("%Y.%m.%d") except Exception: date_str = None return date_str, commit deploy_date, commit = read_commit_and_date("version.txt", root_path=os.path.dirname(__file__)) if not deploy_date: deploy_date = datetime.now().strftime("%Y.%m.%d") if not commit: commit = "dev" APP_VERSION = f"{deploy_date}+{commit}" app.config["APP_VERSION"] = APP_VERSION # MODELE class Uzytkownik(UserMixin, db.Model): __tablename__ = "uzytkownik" id = db.Column(db.Integer, primary_key=True) uzytkownik = db.Column(db.String(80), unique=True, nullable=False) haslo_hash = db.Column(db.String(128), nullable=False) czy_admin = db.Column(db.Boolean, default=False) def set_password(self, password): self.haslo_hash = generate_password_hash(password) def check_password(self, password): return check_password_hash(self.haslo_hash, password) class Zbiorka(db.Model): id = db.Column(db.Integer, primary_key=True) nazwa = db.Column(db.String(100), nullable=False) opis = db.Column(db.Text, nullable=False) numer_konta = db.Column(db.String(50), nullable=True) numer_telefonu_blik = db.Column(db.String(50), nullable=True) cel = db.Column(Numeric(12, 2), nullable=False, default=0) stan = db.Column(Numeric(12, 2), default=0) ukryta = db.Column(db.Boolean, default=False) ukryj_kwote = db.Column(db.Boolean, default=False) zrealizowana = db.Column(db.Boolean, default=False) pokaz_postep_finanse = db.Column(db.Boolean, default=True, nullable=False) pokaz_postep_pozycje = db.Column(db.Boolean, default=True, nullable=False) pokaz_postep_kwotowo = db.Column(db.Boolean, default=True, nullable=False) uzyj_konta = db.Column(db.Boolean, default=True, nullable=False) uzyj_blik = db.Column(db.Boolean, default=True, nullable=False) wplaty = db.relationship( "Wplata", back_populates="zbiorka", lazy=True, order_by="Wplata.data.desc()", cascade="all, delete-orphan", passive_deletes=True, ) wydatki = db.relationship( "Wydatek", backref="zbiorka", lazy=True, order_by="Wydatek.data.desc()", cascade="all, delete-orphan", passive_deletes=True, ) przedmioty = db.relationship( "Przedmiot", backref="zbiorka", lazy=True, order_by="Przedmiot.id.asc()", cascade="all, delete-orphan", passive_deletes=True, ) class Przedmiot(db.Model): id = db.Column(db.Integer, primary_key=True) zbiorka_id = db.Column( db.Integer, db.ForeignKey("zbiorka.id", ondelete="CASCADE"), nullable=False, ) nazwa = db.Column(db.String(120), nullable=False) link = db.Column(db.String(255), nullable=True) cena = db.Column(Numeric(12, 2), nullable=True) kupione = db.Column(db.Boolean, default=False) class Wplata(db.Model): id = db.Column(db.Integer, primary_key=True) zbiorka_id = db.Column( db.Integer, db.ForeignKey("zbiorka.id", ondelete="CASCADE"), nullable=False, ) kwota = db.Column(Numeric(12, 2), nullable=False) data = db.Column(db.DateTime, default=datetime.utcnow) opis = db.Column(db.Text, nullable=True) zbiorka = db.relationship("Zbiorka", back_populates="wplaty") ukryta = db.Column(db.Boolean, nullable=False, default=False) class Wydatek(db.Model): id = db.Column(db.Integer, primary_key=True) zbiorka_id = db.Column( db.Integer, db.ForeignKey("zbiorka.id", ondelete="CASCADE"), nullable=False, ) kwota = db.Column(Numeric(12, 2), nullable=False) data = db.Column(db.DateTime, default=datetime.utcnow) opis = db.Column(db.Text, nullable=True) ukryta = db.Column(db.Boolean, nullable=False, default=False) class UstawieniaGlobalne(db.Model): __tablename__ = "ustawienia_globalne" id = db.Column(db.Integer, primary_key=True) numer_konta = db.Column(db.String(50), nullable=False) numer_telefonu_blik = db.Column(db.String(50), nullable=False) dozwolone_hosty_logowania = db.Column(db.Text, nullable=True) logo_url = db.Column(db.String(255), nullable=True) tytul_strony = db.Column(db.String(120), nullable=True) pokaz_logo_w_navbar = db.Column(db.Boolean, default=False) typ_navbar = db.Column(db.String(10), default="text") typ_stopka = db.Column(db.String(10), default="text") stopka_text = db.Column(db.String(200), nullable=True) @login_manager.user_loader def load_user(user_id): return db.session.get(Uzytkownik, int(user_id)) @event.listens_for(Engine, "connect") def set_sqlite_pragma(dbapi_connection, connection_record): if dbapi_connection.__class__.__module__.startswith('sqlite3'): try: cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON") cursor.close() except Exception: pass def get_real_ip(): headers = request.headers cf_ip = headers.get("CF-Connecting-IP") if cf_ip: return cf_ip.split(",")[0].strip() xff = headers.get("X-Forwarded-For") if xff: return xff.split(",")[0].strip() x_real_ip = headers.get("X-Real-IP") if x_real_ip: return x_real_ip.strip() return request.remote_addr def is_allowed_ip(remote_ip, allowed_hosts_str): if remote_ip in ("127.0.0.1", "::1"): return True if os.path.exists("emergency_access.txt"): return True allowed_hosts = re.split(r"[\n,]+", allowed_hosts_str.strip()) allowed_ips = set() for host in allowed_hosts: host = host.strip() if not host: continue try: resolved_ip = socket.gethostbyname(host) allowed_ips.add(resolved_ip) except Exception: continue try: hostname = socket.gethostbyaddr(remote_ip)[0] app.logger.info(f"Odwiedzający IP: {remote_ip}, host: {hostname}") except Exception as e: app.logger.warning(f"Reverse DNS nieudane dla {remote_ip}: {e}") return remote_ip in allowed_ips def to_local(dt): if dt is None: return None if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(LOCAL_TZ) @app.template_filter("dt") def dt_filter(dt, fmt="%Y-%m-%d %H:%M"): try: ldt = to_local(dt) return ldt.strftime(fmt) if ldt else "" except Exception: return "" @app.template_filter("markdown") def markdown_filter(text): return Markup(md.markdown(text)) @app.context_processor def inject_globals(): settings = UstawieniaGlobalne.query.first() allowed_hosts_str = ( settings.dozwolone_hosty_logowania if settings and settings.dozwolone_hosty_logowania else "" ) client_ip = get_real_ip() return { "is_ip_allowed": is_allowed_ip(client_ip, allowed_hosts_str), "global_settings": settings, } @app.context_processor def inject_version(): return {'APP_VERSION': app.config['APP_VERSION']} # TRASY PUBLICZNE @app.route("/") def index(): zbiorki = Zbiorka.query.filter_by(ukryta=False, zrealizowana=False).all() return render_template("index.html", zbiorki=zbiorki) @app.route("/zrealizowane") def zbiorki_zrealizowane(): zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all() return render_template("index.html", zbiorki=zbiorki) @app.errorhandler(404) def page_not_found(e): return redirect(url_for("index")) @app.route("/zbiorka/") def zbiorka(zbiorka_id): zb = db.session.get(Zbiorka, zbiorka_id) if zb is None: abort(404) if zb.ukryta and (not current_user.is_authenticated or not current_user.czy_admin): abort(404) is_admin = current_user.is_authenticated and current_user.czy_admin show_hidden = is_admin and (request.args.get("show_hidden") in ("1", "true", "yes")) # wpłaty / wydatki z filtrem ukrycia wplaty = [ {"typ": "wpłata", "kwota": w.kwota, "opis": w.opis, "data": w.data, "ukryta": getattr(w, "ukryta", False)} for w in zb.wplaty if show_hidden or not getattr(w, "ukryta", False) ] wydatki = [ {"typ": "wydatek", "kwota": x.kwota, "opis": x.opis, "data": x.data, "ukryta": getattr(x, "ukryta", False)} for x in zb.wydatki if show_hidden or not getattr(x, "ukryta", False) ] aktywnosci = wplaty + wydatki aktywnosci.sort(key=lambda a: a["data"], reverse=True) return render_template("zbiorka.html", zbiorka=zb, aktywnosci=aktywnosci, show_hidden=show_hidden) # TRASY LOGOWANIA I REJESTRACJI @app.route("/zaloguj", methods=["GET", "POST"]) def zaloguj(): settings = UstawieniaGlobalne.query.first() allowed_hosts_str = settings.dozwolone_hosty_logowania or "" if settings else "" client_ip = get_real_ip() if not is_allowed_ip(client_ip, allowed_hosts_str): flash("Dostęp do tego systemu jest zablokowany dla Twojego adresu IP", "danger") return redirect(url_for("index")) if request.method == "POST": login = request.form["uzytkownik"] password = request.form["haslo"] user = Uzytkownik.query.filter_by(uzytkownik=login).first() if user and user.check_password(password): login_user(user) flash("Zalogowano pomyślnie", "success") next_page = request.args.get("next") return redirect(next_page) if next_page else redirect(url_for("admin_dashboard")) else: flash("Nieprawidłowe dane logowania", "danger") return render_template("login.html") @app.route("/wyloguj") @login_required def wyloguj(): logout_user() flash("Wylogowano", "success") return redirect(url_for("zaloguj")) @app.route("/zarejestruj", methods=["GET", "POST"]) def zarejestruj(): if not app.config.get("ALLOW_REGISTRATION", False): flash("Rejestracja została wyłączona przez administratora", "danger") return redirect(url_for("zaloguj")) if request.method == "POST": login = request.form["uzytkownik"] password = request.form["haslo"] if Uzytkownik.query.filter_by(uzytkownik=login).first(): flash("Użytkownik już istnieje", "danger") return redirect(url_for("register")) new_user = Uzytkownik(uzytkownik=login) new_user.set_password(password) db.session.add(new_user) db.session.commit() flash("Konto utworzone, możesz się zalogować", "success") return redirect(url_for("zaloguj")) return render_template("register.html") # PANEL ADMINISTRACYJNY @app.route("/admin") @login_required def admin_dashboard(): if not current_user.czy_admin: flash("Brak uprawnień do panelu administracyjnego", "danger") return redirect(url_for("index")) active_zbiorki = Zbiorka.query.filter_by(zrealizowana=False).all() completed_zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all() return render_template( "admin/dashboard.html", active_zbiorki=active_zbiorki, completed_zbiorki=completed_zbiorki, ) @app.route("/admin/zbiorka/dodaj", methods=["GET", "POST"]) @app.route("/admin/zbiorka/edytuj/", methods=["GET", "POST"]) @login_required def formularz_zbiorek(zbiorka_id=None): if not current_user.czy_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) is_edit = zbiorka_id is not None zb = db.session.get(Zbiorka, zbiorka_id) if is_edit else None if is_edit and zb is None: abort(404) global_settings = UstawieniaGlobalne.query.first() def _temp_obj(): t = zb or Zbiorka() t.nazwa = (request.form.get("nazwa", "") or "").strip() t.opis = (request.form.get("opis", "") or "").strip() t.numer_konta = (request.form.get("numer_konta", "") or "").strip() t.numer_telefonu_blik = (request.form.get("numer_telefonu_blik", "") or "").strip() t.ukryj_kwote = "ukryj_kwote" in request.form t.pokaz_postep_finanse = "pokaz_postep_finanse" in request.form t.pokaz_postep_pozycje = "pokaz_postep_pozycje" in request.form t.pokaz_postep_kwotowo = "pokaz_postep_kwotowo" in request.form t.uzyj_konta = "uzyj_konta" in request.form t.uzyj_blik = "uzyj_blik" in request.form return t if request.method == "POST": # Pola nazwa = (request.form.get("nazwa", "") or "").strip() opis = (request.form.get("opis", "") or "").strip() numer_konta = (request.form.get("numer_konta", "") or "").strip() numer_telefonu_blik = (request.form.get("numer_telefonu_blik", "") or "").strip() # Przełączniki płatności uzyj_konta = "uzyj_konta" in request.form uzyj_blik = "uzyj_blik" in request.form # Widoczność/metryki ukryj_kwote = "ukryj_kwote" in request.form pokaz_postep_finanse = "pokaz_postep_finanse" in request.form pokaz_postep_pozycje = "pokaz_postep_pozycje" in request.form pokaz_postep_kwotowo = "pokaz_postep_kwotowo" in request.form # Walidacje if not nazwa: flash("Nazwa jest wymagana", "danger") return render_template("admin/formularz_zbiorek.html", zbiorka=_temp_obj(), global_settings=global_settings) if not opis: flash("Opis jest wymagany", "danger") return render_template("admin/formularz_zbiorek.html", zbiorka=_temp_obj(), global_settings=global_settings) # Co najmniej jeden kanał if not (uzyj_konta or uzyj_blik): flash("Włącz co najmniej jeden kanał wpłat (konto lub BLIK).", "danger") return render_template("admin/formularz_zbiorek.html", zbiorka=_temp_obj(), global_settings=global_settings) # Warunkowe wartości if uzyj_konta and not numer_konta: flash("Numer konta jest wymagany (kanał przelewu włączony).", "danger") return render_template("admin/formularz_zbiorek.html", zbiorka=_temp_obj(), global_settings=global_settings) if uzyj_blik and not numer_telefonu_blik: flash("Numer telefonu BLIK jest wymagany (kanał BLIK włączony).", "danger") return render_template("admin/formularz_zbiorek.html", zbiorka=_temp_obj(), global_settings=global_settings) # Cel > 0 cel_raw = (request.form.get("cel", "") or "") cel_norm = cel_raw.replace(" ", "").replace("\u00A0", "").replace(",", ".").strip() try: if not cel_norm: raise InvalidOperation cel = Decimal(cel_norm) if cel <= Decimal("0"): raise InvalidOperation except (InvalidOperation, ValueError): flash("Podano nieprawidłową wartość dla celu zbiórki", "danger") return render_template("admin/formularz_zbiorek.html", zbiorka=_temp_obj(), global_settings=global_settings) # Produkty names = request.form.getlist("item_nazwa[]") links = request.form.getlist("item_link[]") prices = request.form.getlist("item_cena[]") def _read_price(val: str): if not val or not val.strip(): return None try: d = Decimal(val.replace(",", ".")) return d if d >= 0 else None except Exception: return None # Zapis if is_edit: zb.nazwa = nazwa zb.opis = opis # NOT NULL-safe: puste stringi gdy wyłączone zb.uzyj_konta = uzyj_konta zb.uzyj_blik = uzyj_blik zb.numer_konta = numer_konta if uzyj_konta else "" zb.numer_telefonu_blik = numer_telefonu_blik if uzyj_blik else "" zb.cel = cel zb.ukryj_kwote = ukryj_kwote zb.pokaz_postep_finanse = pokaz_postep_finanse zb.pokaz_postep_pozycje = pokaz_postep_pozycje zb.pokaz_postep_kwotowo = pokaz_postep_kwotowo db.session.commit() # Nadpisz pozycje zb.przedmioty.clear() for i, raw_name in enumerate(names): name = (raw_name or "").strip() if not name: continue link = (links[i] if i < len(links) else "").strip() or None cena_val = _read_price(prices[i] if i < len(prices) else "") kupione_val = request.form.get(f"item_kupione_val_{i}") == "1" db.session.add(Przedmiot( zbiorka_id=zb.id, nazwa=name, link=link, cena=cena_val, kupione=kupione_val )) db.session.commit() flash("Zbiórka została zaktualizowana", "success") else: nowa = Zbiorka( nazwa=nazwa, opis=opis, uzyj_konta=uzyj_konta, uzyj_blik=uzyj_blik, numer_konta=(numer_konta if uzyj_konta else ""), numer_telefonu_blik=(numer_telefonu_blik if uzyj_blik else ""), cel=cel, ukryj_kwote=ukryj_kwote, pokaz_postep_finanse=pokaz_postep_finanse, pokaz_postep_pozycje=pokaz_postep_pozycje, pokaz_postep_kwotowo=pokaz_postep_kwotowo, ) db.session.add(nowa) db.session.commit() # potrzebne ID for i, raw_name in enumerate(names): name = (raw_name or "").strip() if not name: continue link = (links[i] if i < len(links) else "").strip() or None cena_val = _read_price(prices[i] if i < len(prices) else "") kupione_val = request.form.get(f"item_kupione_val_{i}") == "1" db.session.add(Przedmiot( zbiorka_id=nowa.id, nazwa=name, link=link, cena=cena_val, kupione=kupione_val )) db.session.commit() flash("Zbiórka została dodana", "success") return redirect(url_for("admin_dashboard")) # GET return render_template( "admin/formularz_zbiorek.html", zbiorka=zb, global_settings=global_settings ) @app.route("/admin/zbiorka//wplata/dodaj", methods=["GET", "POST"]) @login_required def dodaj_wplate(zbiorka_id): if not current_user.czy_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = db.session.get(Zbiorka, zbiorka_id) if not zb: abort(404) if request.method == "POST": try: kwota = Decimal(request.form.get("kwota", "").replace(",", ".")) if kwota <= 0: raise InvalidOperation except (InvalidOperation, ValueError): flash("Nieprawidłowa kwota (musi być > 0)", "danger") return redirect(url_for("dodaj_wplate", zbiorka_id=zbiorka_id)) opis = request.form.get("opis", "") nowa_wplata = Wplata(zbiorka_id=zb.id, kwota=kwota, opis=opis) zb.stan = (zb.stan or Decimal("0")) + kwota db.session.add(nowa_wplata) db.session.commit() flash("Wpłata została dodana", "success") next_url = request.args.get("next") return redirect(next_url or url_for("transakcje_zbiorki", zbiorka_id=zb.id)) return render_template("admin/dodaj_wplate.html", zbiorka=zb) @app.route("/admin/zbiorka/usun/", methods=["POST"]) @login_required def usun_zbiorka(zbiorka_id): if not current_user.czy_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = db.session.get(Zbiorka, zbiorka_id) if zb is None: abort(404) db.session.delete(zb) db.session.commit() flash("Zbiórka została usunięta", "success") return redirect(url_for("admin_dashboard")) @app.route("/admin/zbiorka/edytuj_stan/", methods=["GET", "POST"]) @login_required def edytuj_stan(zbiorka_id): if not current_user.czy_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = db.session.get(Zbiorka, zbiorka_id) if zb is None: abort(404) if request.method == "POST": try: nowy_stan = Decimal(request.form.get("stan", "").replace(",", ".")) except (InvalidOperation, ValueError): flash("Nieprawidłowa wartość kwoty", "danger") return redirect(url_for("edytuj_stan", zbiorka_id=zbiorka_id)) zb.stan = nowy_stan db.session.commit() flash("Stan zbiórki został zaktualizowany", "success") return redirect(url_for("admin_dashboard")) return render_template("admin/edytuj_stan.html", zbiorka=zb) @app.route("/admin/zbiorka/zmien_widzialnosc/", methods=["POST"]) @login_required def zmien_widzialnosc(zbiorka_id): if not current_user.czy_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = db.session.get(Zbiorka, zbiorka_id) if zb is None: abort(404) zb.ukryta = not zb.ukryta db.session.commit() flash("Zbiórka została " + ("ukryta" if zb.ukryta else "przywrócona"), "success") return redirect(url_for("admin_dashboard")) def create_admin_account(): admin = Uzytkownik.query.filter_by(czy_admin=True).first() if not admin: main_admin = Uzytkownik( uzytkownik=app.config["MAIN_ADMIN_USERNAME"], czy_admin=True ) main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"]) db.session.add(main_admin) db.session.commit() @app.after_request def apply_headers(response): if request.path.startswith("/static/"): response.headers.pop("Content-Disposition", None) response.headers["Vary"] = "Accept-Encoding" response.headers["Cache-Control"] = app.config.get( "CACHE_CONTROL_HEADER_STATIC" ) if app.config.get("USE_ETAGS", True) and "ETag" not in response.headers: response.add_etag() response.make_conditional(request) return response path_norm = request.path.lstrip("/") czy_admin = path_norm.startswith("admin/") or path_norm == "admin" if czy_admin: if (response.mimetype or "").startswith("text/html"): response.headers["Cache-Control"] = "no-store, no-cache" response.headers.pop("ETag", None) return response if response.status_code in (301, 302, 303, 307, 308): response.headers.pop("Vary", None) return response if 400 <= response.status_code < 500: response.headers["Cache-Control"] = "no-store" response.headers["Content-Type"] = "text/html; charset=utf-8" response.headers.pop("Vary", None) elif 500 <= response.status_code < 600: response.headers["Cache-Control"] = "no-store" response.headers["Content-Type"] = "text/html; charset=utf-8" response.headers["Retry-After"] = "120" response.headers.pop("Vary", None) else: response.headers["Vary"] = "Cookie, Accept-Encoding" default_cache = app.config.get("CACHE_CONTROL_HEADER") or "private, no-store" response.headers["Cache-Control"] = default_cache if ( app.config.get("BLOCK_BOTS", False) and not czy_admin and not request.path.startswith("/static/") ): cc_override = app.config.get("CACHE_CONTROL_HEADER") if cc_override: response.headers["Cache-Control"] = cc_override response.headers["X-Robots-Tag"] = ( app.config.get("ROBOTS_TAG") or "noindex, nofollow, nosnippet, noarchive" ) return response @app.route("/admin/ustawienia", methods=["GET", "POST"]) @login_required def admin_ustawienia(): if not current_user.czy_admin: flash("Brak uprawnień do panelu administracyjnego", "danger") return redirect(url_for("index")) client_ip = get_real_ip() settings = UstawieniaGlobalne.query.first() if request.method == "POST": numer_konta = request.form.get("numer_konta") numer_telefonu_blik = request.form.get("numer_telefonu_blik") dozwolone_hosty_logowania = request.form.get("dozwolone_hosty_logowania") logo_url = request.form.get("logo_url") tytul_strony = request.form.get("tytul_strony") typ_navbar = request.form.get("typ_navbar", "text") typ_stopka = request.form.get("typ_stopka", "text") stopka_text = request.form.get("stopka_text") or None pokaz_logo_w_navbar = (typ_navbar == "logo") if settings is None: settings = UstawieniaGlobalne( numer_konta=numer_konta, numer_telefonu_blik=numer_telefonu_blik, dozwolone_hosty_logowania=dozwolone_hosty_logowania, logo_url=logo_url, tytul_strony=tytul_strony, pokaz_logo_w_navbar=pokaz_logo_w_navbar, typ_navbar=typ_navbar, typ_stopka=typ_stopka, stopka_text=stopka_text, ) db.session.add(settings) else: settings.numer_konta = numer_konta settings.numer_telefonu_blik = numer_telefonu_blik settings.dozwolone_hosty_logowania = dozwolone_hosty_logowania settings.logo_url = logo_url settings.tytul_strony = tytul_strony settings.pokaz_logo_w_navbar = pokaz_logo_w_navbar settings.typ_navbar = typ_navbar settings.typ_stopka = typ_stopka settings.stopka_text = stopka_text db.session.commit() flash("Ustawienia globalne zostały zaktualizowane", "success") return redirect(url_for("admin_dashboard")) return render_template("admin/ustawienia.html", settings=settings, client_ip=client_ip) @app.route("/admin/zbiorka//wydatek/dodaj", methods=["GET", "POST"]) @login_required def dodaj_wydatek(zbiorka_id): if not current_user.czy_admin: flash("Brak uprawnień", "danger") return redirect(url_for("index")) zb = db.session.get(Zbiorka, zbiorka_id) if zb is None: abort(404) if request.method == "POST": try: kwota = Decimal(request.form.get("kwota", "").replace(",", ".")) if kwota <= 0: raise InvalidOperation except (InvalidOperation, ValueError): flash("Nieprawidłowa kwota (musi być > 0)", "danger") return redirect(url_for("dodaj_wydatek", zbiorka_id=zbiorka_id)) opis = request.form.get("opis", "") nowy_wydatek = Wydatek(zbiorka_id=zb.id, kwota=kwota, opis=opis) zb.stan = (zb.stan or Decimal("0")) - kwota db.session.add(nowy_wydatek) db.session.commit() flash("Wydatek został dodany", "success") next_url = request.args.get("next") return redirect(next_url or url_for("transakcje_zbiorki", zbiorka_id=zb.id)) return render_template("admin/dodaj_wydatek.html", zbiorka=zb) @app.route( "/admin/zbiorka/oznacz/niezrealizowana/", methods=["POST"], endpoint="oznacz_niezrealizowana", ) @app.route( "/admin/zbiorka/oznacz/zrealizowana/", methods=["POST"], endpoint="oznacz_zrealizowana", ) @login_required def oznacz_zbiorka(zbiorka_id): if not current_user.czy_admin: flash("Brak uprawnień do wykonania tej operacji", "danger") return redirect(url_for("index")) zb = db.session.get(Zbiorka, zbiorka_id) if zb is None: abort(404) if "niezrealizowana" in request.path: zb.zrealizowana = False msg = "Zbiórka została oznaczona jako niezrealizowana" else: zb.zrealizowana = True msg = "Zbiórka została oznaczona jako zrealizowana" db.session.commit() flash(msg, "success") return redirect(url_for("admin_dashboard")) @app.route("/robots.txt") def robots(): if app.config.get("BLOCK_BOTS", False): robots_txt = "User-agent: *\nDisallow: /" else: robots_txt = "User-agent: *\nAllow: /" return robots_txt, 200, {"Content-Type": "text/plain"} @app.route("/admin/zbiorka//transakcje") @login_required def transakcje_zbiorki(zbiorka_id): if not current_user.czy_admin: flash("Brak uprawnień", "danger"); return redirect(url_for("index")) zb = db.session.get(Zbiorka, zbiorka_id) if zb is None: abort(404) aktywnosci = ( [ { "typ": "wpłata", "id": w.id, "kwota": w.kwota, "opis": w.opis, "data": w.data, "ukryta": bool(w.ukryta), } for w in zb.wplaty ] + [ { "typ": "wydatek", "id": x.id, "kwota": x.kwota, "opis": x.opis, "data": x.data, "ukryta": bool(x.ukryta), } for x in zb.wydatki ] ) aktywnosci.sort(key=lambda a: a["data"], reverse=True) return render_template("admin/transakcje.html", zbiorka=zb, aktywnosci=aktywnosci) @app.route("/admin/wplata//zapisz", methods=["POST"]) @login_required def zapisz_wplate(wplata_id): if not current_user.czy_admin: flash("Brak uprawnień", "danger"); return redirect(url_for("index")) w = db.session.get(Wplata, wplata_id) if w is None: abort(404) zb = w.zbiorka try: nowa_kwota = Decimal(request.form.get("kwota", "").replace(",", ".")) if nowa_kwota <= 0: raise InvalidOperation except (InvalidOperation, ValueError): flash("Nieprawidłowa kwota (musi być > 0)", "danger") return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id)) delta = nowa_kwota - (w.kwota or Decimal("0")) w.kwota = nowa_kwota w.opis = request.form.get("opis", "") zb.stan = (zb.stan or Decimal("0")) + delta db.session.commit() flash("Wpłata zaktualizowana", "success") return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id)) @app.post("/wplata//ukryj") @login_required def ukryj_wplate(wplata_id): if not current_user.czy_admin: abort(403) w = db.session.get(Wplata, wplata_id) if not w: abort(404) w.ukryta = True db.session.commit() flash("Wpłata ukryta.", "success") return redirect(request.referrer or url_for("admin_dashboard")) @app.post("/wplata//odkryj") @login_required def odkryj_wplate(wplata_id): if not current_user.czy_admin: abort(403) w = db.session.get(Wplata, wplata_id) if not w: abort(404) w.ukryta = False db.session.commit() flash("Wpłata odkryta.", "success") return redirect(request.referrer or url_for("admin_dashboard")) @app.post("/wydatek//ukryj") @login_required def ukryj_wydatek(wydatek_id): if not current_user.czy_admin: abort(403) w = db.session.get(Wydatek, wydatek_id) if not w: abort(404) w.ukryta = True db.session.commit() flash("Wydatek ukryty.", "success") return redirect(request.referrer or url_for("admin_dashboard")) @app.post("/wydatek//odkryj") @login_required def odkryj_wydatek(wydatek_id): if not current_user.czy_admin: abort(403) w = db.session.get(Wydatek, wydatek_id) if not w: abort(404) w.ukryta = False db.session.commit() flash("Wydatek odkryty.", "success") return redirect(request.referrer or url_for("admin_dashboard")) @app.route("/admin/wplata//usun", methods=["POST"]) @login_required def usun_wplate(wplata_id): if not current_user.czy_admin: flash("Brak uprawnień", "danger"); return redirect(url_for("index")) w = db.session.get(Wplata, wplata_id) if w is None: abort(404) zb = w.zbiorka zb.stan -= w.kwota db.session.delete(w) db.session.commit() flash("Wpłata usunięta", "success") return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id)) @app.route("/admin/wydatek//zapisz", methods=["POST"]) @login_required def zapisz_wydatek(wydatek_id): if not current_user.czy_admin: flash("Brak uprawnień", "danger"); return redirect(url_for("index")) x = db.session.get(Wydatek, wydatek_id) if x is None: abort(404) zb = x.zbiorka try: nowa_kwota = Decimal(request.form.get("kwota", "").replace(",", ".")) if nowa_kwota <= 0: raise InvalidOperation except (InvalidOperation, ValueError): flash("Nieprawidłowa kwota (musi być > 0)", "danger") return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id)) delta = nowa_kwota - (x.kwota or Decimal("0")) x.kwota = nowa_kwota x.opis = request.form.get("opis", "") # wydatki zmniejszają stan; jeżeli delta>0, stan spada bardziej zb.stan = (zb.stan or Decimal("0")) - delta db.session.commit() flash("Wydatek zaktualizowany", "success") return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id)) @app.route("/admin/wydatek//usun", methods=["POST"]) @login_required def usun_wydatek(wydatek_id): if not current_user.czy_admin: flash("Brak uprawnień", "danger"); return redirect(url_for("index")) x = db.session.get(Wydatek, wydatek_id) if x is None: abort(404) zb = x.zbiorka zb.stan += x.kwota db.session.delete(x) db.session.commit() flash("Wydatek usunięty", "success") return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id)) @app.route("/favicon.ico") def favicon(): return "", 204 @app.route("/healthcheck") def healthcheck(): header_token = request.headers.get("X-Internal-Check") correct_token = app.config.get("HEALTHCHECK_TOKEN") if header_token != correct_token: abort(404) return "OK", 200 if __name__ == "__main__": with app.app_context(): db.create_all() stmt = select(Uzytkownik).filter_by(czy_admin=True) admin = db.session.execute(stmt).scalars().first() if not admin: main_admin = Uzytkownik( uzytkownik=app.config["MAIN_ADMIN_USERNAME"], czy_admin=True ) main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"]) db.session.add(main_admin) db.session.commit() app.run(debug=True)