955 lines
32 KiB
Python
955 lines
32 KiB
Python
import markdown as md
|
|
import hashlib, os
|
|
import re
|
|
import socket
|
|
from flask import Flask, render_template, request, redirect, url_for, flash
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_login import (
|
|
LoginManager,
|
|
login_user,
|
|
login_required,
|
|
logout_user,
|
|
current_user,
|
|
UserMixin,
|
|
)
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from datetime import datetime, timezone
|
|
from markupsafe import Markup
|
|
from sqlalchemy import event, Numeric, select
|
|
from sqlalchemy.engine import Engine
|
|
from decimal import Decimal, InvalidOperation
|
|
from flask import request, flash, abort
|
|
try:
|
|
from zoneinfo import ZoneInfo # Python 3.9+
|
|
except ImportError:
|
|
from backports.zoneinfo import ZoneInfo
|
|
|
|
|
|
app = Flask(__name__)
|
|
# Ładujemy konfigurację z pliku config.py
|
|
app.config.from_object("config.Config")
|
|
|
|
db = SQLAlchemy(app)
|
|
login_manager = LoginManager(app)
|
|
login_manager.login_view = "zaloguj"
|
|
LOCAL_TZ = ZoneInfo("Europe/Warsaw")
|
|
|
|
def read_commit_and_date(filename="version.txt", root_path=None):
|
|
base = root_path or os.path.dirname(os.path.abspath(__file__))
|
|
path = os.path.join(base, filename)
|
|
if not os.path.exists(path):
|
|
return None, None
|
|
|
|
try:
|
|
commit = open(path, "r", encoding="utf-8").read().strip()
|
|
if commit:
|
|
commit = commit[:12]
|
|
except Exception:
|
|
commit = None
|
|
|
|
try:
|
|
ts = os.path.getmtime(path)
|
|
date_str = datetime.fromtimestamp(ts).strftime("%Y.%m.%d")
|
|
except Exception:
|
|
date_str = None
|
|
|
|
return date_str, commit
|
|
|
|
deploy_date, commit = read_commit_and_date("version.txt", root_path=os.path.dirname(__file__))
|
|
if not deploy_date:
|
|
deploy_date = datetime.now().strftime("%Y.%m.%d")
|
|
if not commit:
|
|
commit = "dev"
|
|
|
|
APP_VERSION = f"{deploy_date}+{commit}"
|
|
app.config["APP_VERSION"] = APP_VERSION
|
|
|
|
# MODELE
|
|
class Uzytkownik(UserMixin, db.Model):
|
|
__tablename__ = "uzytkownik"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
uzytkownik = db.Column(db.String(80), unique=True, nullable=False)
|
|
haslo_hash = db.Column(db.String(128), nullable=False)
|
|
czy_admin = db.Column(db.Boolean, default=False)
|
|
|
|
def set_password(self, password):
|
|
self.haslo_hash = generate_password_hash(password)
|
|
def check_password(self, password):
|
|
return check_password_hash(self.haslo_hash, password)
|
|
|
|
class Zbiorka(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
nazwa = db.Column(db.String(100), nullable=False)
|
|
opis = db.Column(db.Text, nullable=False)
|
|
numer_konta = db.Column(db.String(50), nullable=False)
|
|
numer_telefonu_blik = db.Column(db.String(50), nullable=False)
|
|
cel = db.Column(Numeric(12, 2), nullable=False, default=0)
|
|
stan = db.Column(Numeric(12, 2), default=0)
|
|
ukryta = db.Column(db.Boolean, default=False)
|
|
ukryj_kwote = db.Column(db.Boolean, default=False)
|
|
zrealizowana = db.Column(db.Boolean, default=False)
|
|
pokaz_postep_finanse = db.Column(db.Boolean, default=True, nullable=False)
|
|
pokaz_postep_pozycje = db.Column(db.Boolean, default=True, nullable=False)
|
|
pokaz_postep_kwotowo = db.Column(db.Boolean, default=True, nullable=False)
|
|
|
|
wplaty = db.relationship(
|
|
"Wplata",
|
|
back_populates="zbiorka",
|
|
lazy=True,
|
|
order_by="Wplata.data.desc()",
|
|
cascade="all, delete-orphan",
|
|
passive_deletes=True,
|
|
)
|
|
|
|
wydatki = db.relationship(
|
|
"Wydatek",
|
|
backref="zbiorka",
|
|
lazy=True,
|
|
order_by="Wydatek.data.desc()",
|
|
cascade="all, delete-orphan",
|
|
passive_deletes=True,
|
|
)
|
|
|
|
przedmioty = db.relationship(
|
|
"Przedmiot",
|
|
backref="zbiorka",
|
|
lazy=True,
|
|
order_by="Przedmiot.id.asc()",
|
|
cascade="all, delete-orphan",
|
|
passive_deletes=True,
|
|
)
|
|
|
|
class Przedmiot(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
zbiorka_id = db.Column(
|
|
db.Integer,
|
|
db.ForeignKey("zbiorka.id", ondelete="CASCADE"),
|
|
nullable=False,
|
|
)
|
|
nazwa = db.Column(db.String(120), nullable=False)
|
|
link = db.Column(db.String(255), nullable=True)
|
|
cena = db.Column(Numeric(12, 2), nullable=True)
|
|
kupione = db.Column(db.Boolean, default=False)
|
|
|
|
class Wplata(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
zbiorka_id = db.Column(
|
|
db.Integer,
|
|
db.ForeignKey("zbiorka.id", ondelete="CASCADE"),
|
|
nullable=False,
|
|
)
|
|
kwota = db.Column(Numeric(12, 2), nullable=False)
|
|
data = db.Column(db.DateTime, default=datetime.utcnow)
|
|
opis = db.Column(db.Text, nullable=True)
|
|
zbiorka = db.relationship("Zbiorka", back_populates="wplaty")
|
|
|
|
class Wydatek(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
zbiorka_id = db.Column(
|
|
db.Integer,
|
|
db.ForeignKey("zbiorka.id", ondelete="CASCADE"),
|
|
nullable=False,
|
|
)
|
|
kwota = db.Column(Numeric(12, 2), nullable=False)
|
|
data = db.Column(db.DateTime, default=datetime.utcnow)
|
|
opis = db.Column(db.Text, nullable=True)
|
|
|
|
|
|
class UstawieniaGlobalne(db.Model):
|
|
__tablename__ = "ustawienia_globalne"
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
numer_konta = db.Column(db.String(50), nullable=False)
|
|
numer_telefonu_blik = db.Column(db.String(50), nullable=False)
|
|
dozwolone_hosty_logowania = db.Column(db.Text, nullable=True)
|
|
logo_url = db.Column(db.String(255), nullable=True)
|
|
tytul_strony = db.Column(db.String(120), nullable=True)
|
|
pokaz_logo_w_navbar = db.Column(db.Boolean, default=False)
|
|
typ_navbar = db.Column(db.String(10), default="text")
|
|
typ_stopka = db.Column(db.String(10), default="text")
|
|
stopka_text = db.Column(db.String(200), nullable=True)
|
|
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
return db.session.get(Uzytkownik, int(user_id))
|
|
|
|
|
|
@event.listens_for(Engine, "connect")
|
|
def set_sqlite_pragma(dbapi_connection, connection_record):
|
|
try:
|
|
cursor = dbapi_connection.cursor()
|
|
cursor.execute("PRAGMA foreign_keys=ON")
|
|
cursor.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def get_real_ip():
|
|
headers = request.headers
|
|
cf_ip = headers.get("CF-Connecting-IP")
|
|
if cf_ip:
|
|
return cf_ip.split(",")[0].strip()
|
|
xff = headers.get("X-Forwarded-For")
|
|
if xff:
|
|
return xff.split(",")[0].strip()
|
|
x_real_ip = headers.get("X-Real-IP")
|
|
if x_real_ip:
|
|
return x_real_ip.strip()
|
|
return request.remote_addr
|
|
|
|
|
|
def is_allowed_ip(remote_ip, allowed_hosts_str):
|
|
if remote_ip in ("127.0.0.1", "::1"):
|
|
return True
|
|
|
|
if os.path.exists("emergency_access.txt"):
|
|
return True
|
|
|
|
allowed_hosts = re.split(r"[\n,]+", allowed_hosts_str.strip())
|
|
allowed_ips = set()
|
|
for host in allowed_hosts:
|
|
host = host.strip()
|
|
if not host:
|
|
continue
|
|
try:
|
|
resolved_ip = socket.gethostbyname(host)
|
|
allowed_ips.add(resolved_ip)
|
|
except Exception:
|
|
continue
|
|
|
|
try:
|
|
hostname = socket.gethostbyaddr(remote_ip)[0]
|
|
app.logger.info(f"Odwiedzający IP: {remote_ip}, host: {hostname}")
|
|
except Exception as e:
|
|
app.logger.warning(f"Reverse DNS nieudane dla {remote_ip}: {e}")
|
|
|
|
return remote_ip in allowed_ips
|
|
|
|
|
|
def to_local(dt):
|
|
if dt is None:
|
|
return None
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return dt.astimezone(LOCAL_TZ)
|
|
|
|
|
|
@app.template_filter("dt")
|
|
def dt_filter(dt, fmt="%Y-%m-%d %H:%M"):
|
|
try:
|
|
ldt = to_local(dt)
|
|
return ldt.strftime(fmt) if ldt else ""
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
@app.template_filter("markdown")
|
|
def markdown_filter(text):
|
|
return Markup(md.markdown(text))
|
|
|
|
|
|
@app.context_processor
|
|
def inject_globals():
|
|
settings = UstawieniaGlobalne.query.first()
|
|
allowed_hosts_str = (
|
|
settings.dozwolone_hosty_logowania if settings and settings.dozwolone_hosty_logowania else ""
|
|
)
|
|
client_ip = get_real_ip()
|
|
return {
|
|
"is_ip_allowed": is_allowed_ip(client_ip, allowed_hosts_str),
|
|
"global_settings": settings,
|
|
}
|
|
|
|
|
|
@app.context_processor
|
|
def inject_version():
|
|
return {'APP_VERSION': app.config['APP_VERSION']}
|
|
|
|
|
|
# TRASY PUBLICZNE
|
|
@app.route("/")
|
|
def index():
|
|
zbiorki = Zbiorka.query.filter_by(ukryta=False, zrealizowana=False).all()
|
|
return render_template("index.html", zbiorki=zbiorki)
|
|
|
|
|
|
@app.route("/zrealizowane")
|
|
def zbiorki_zrealizowane():
|
|
zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all()
|
|
return render_template("index.html", zbiorki=zbiorki)
|
|
|
|
|
|
@app.errorhandler(404)
|
|
def page_not_found(e):
|
|
return redirect(url_for("index"))
|
|
|
|
|
|
@app.route("/zbiorka/<int:zbiorka_id>")
|
|
def zbiorka(zbiorka_id):
|
|
zb = db.session.get(Zbiorka, zbiorka_id)
|
|
if zb is None:
|
|
abort(404)
|
|
if zb.ukryta and (not current_user.is_authenticated or not current_user.czy_admin):
|
|
abort(404)
|
|
|
|
# scalona oś czasu: wpłaty + wydatki
|
|
aktywnosci = [
|
|
{"typ": "wpłata", "kwota": w.kwota, "opis": w.opis, "data": w.data}
|
|
for w in zb.wplaty
|
|
] + [
|
|
{"typ": "wydatek", "kwota": x.kwota, "opis": x.opis, "data": x.data}
|
|
for x in zb.wydatki
|
|
]
|
|
aktywnosci.sort(key=lambda a: a["data"], reverse=True)
|
|
|
|
return render_template("zbiorka.html", zbiorka=zb, aktywnosci=aktywnosci)
|
|
|
|
|
|
# TRASY LOGOWANIA I REJESTRACJI
|
|
|
|
|
|
@app.route("/zaloguj", methods=["GET", "POST"])
|
|
def zaloguj():
|
|
settings = UstawieniaGlobalne.query.first()
|
|
allowed_hosts_str = settings.dozwolone_hosty_logowania or "" if settings else ""
|
|
client_ip = get_real_ip()
|
|
if not is_allowed_ip(client_ip, allowed_hosts_str):
|
|
flash("Dostęp do tego systemu jest zablokowany dla Twojego adresu IP", "danger")
|
|
return redirect(url_for("index"))
|
|
|
|
if request.method == "POST":
|
|
login = request.form["uzytkownik"]
|
|
password = request.form["haslo"]
|
|
user = Uzytkownik.query.filter_by(uzytkownik=login).first()
|
|
if user and user.check_password(password):
|
|
login_user(user)
|
|
flash("Zalogowano pomyślnie", "success")
|
|
next_page = request.args.get("next")
|
|
return redirect(next_page) if next_page else redirect(url_for("admin_dashboard"))
|
|
else:
|
|
flash("Nieprawidłowe dane logowania", "danger")
|
|
return render_template("login.html")
|
|
|
|
|
|
@app.route("/wyloguj")
|
|
@login_required
|
|
def wyloguj():
|
|
logout_user()
|
|
flash("Wylogowano", "success")
|
|
return redirect(url_for("zaloguj"))
|
|
|
|
|
|
@app.route("/zarejestruj", methods=["GET", "POST"])
|
|
def zarejestruj():
|
|
if not app.config.get("ALLOW_REGISTRATION", False):
|
|
flash("Rejestracja została wyłączona przez administratora", "danger")
|
|
return redirect(url_for("zaloguj"))
|
|
if request.method == "POST":
|
|
login = request.form["uzytkownik"]
|
|
password = request.form["haslo"]
|
|
if Uzytkownik.query.filter_by(uzytkownik=login).first():
|
|
flash("Użytkownik już istnieje", "danger")
|
|
return redirect(url_for("register"))
|
|
new_user = Uzytkownik(uzytkownik=login)
|
|
new_user.set_password(password)
|
|
db.session.add(new_user)
|
|
db.session.commit()
|
|
flash("Konto utworzone, możesz się zalogować", "success")
|
|
return redirect(url_for("zaloguj"))
|
|
return render_template("register.html")
|
|
|
|
|
|
# PANEL ADMINISTRACYJNY
|
|
@app.route("/admin")
|
|
@login_required
|
|
def admin_dashboard():
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień do panelu administracyjnego", "danger")
|
|
return redirect(url_for("index"))
|
|
active_zbiorki = Zbiorka.query.filter_by(zrealizowana=False).all()
|
|
completed_zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all()
|
|
return render_template(
|
|
"admin/dashboard.html",
|
|
active_zbiorki=active_zbiorki,
|
|
completed_zbiorki=completed_zbiorki,
|
|
)
|
|
|
|
|
|
@app.route("/admin/zbiorka/dodaj", methods=["GET", "POST"])
|
|
@app.route("/admin/zbiorka/edytuj/<int:zbiorka_id>", methods=["GET", "POST"])
|
|
@login_required
|
|
def formularz_zbiorek(zbiorka_id=None):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień", "danger")
|
|
return redirect(url_for("index"))
|
|
|
|
is_edit = zbiorka_id is not None
|
|
zb = None
|
|
if is_edit:
|
|
zb = db.session.get(Zbiorka, zbiorka_id)
|
|
if zb is None:
|
|
abort(404)
|
|
|
|
global_settings = UstawieniaGlobalne.query.first()
|
|
|
|
if request.method == "POST":
|
|
# Pola wspólne
|
|
nazwa = request.form.get("nazwa", "").strip()
|
|
opis = request.form.get("opis", "").strip()
|
|
numer_konta = request.form.get("numer_konta", "").strip()
|
|
numer_telefonu_blik = request.form.get("numer_telefonu_blik", "").strip()
|
|
|
|
# Widoczność kwot i poszczególnych pasków postępu
|
|
ukryj_kwote = "ukryj_kwote" in request.form
|
|
pokaz_postep_finanse = "pokaz_postep_finanse" in request.form
|
|
pokaz_postep_pozycje = "pokaz_postep_pozycje" in request.form
|
|
pokaz_postep_kwotowo = "pokaz_postep_kwotowo" in request.form
|
|
|
|
# Cel — Decimal, > 0
|
|
try:
|
|
cel_str = request.form.get("cel", "").replace(",", ".").strip()
|
|
cel = Decimal(cel_str)
|
|
if cel <= Decimal("0"):
|
|
raise InvalidOperation
|
|
except (InvalidOperation, ValueError):
|
|
flash("Podano nieprawidłową wartość dla celu zbiórki", "danger")
|
|
|
|
# Przy błędzie celu odtwórz stan formularza (również przełączniki)
|
|
if is_edit and zb:
|
|
zb.nazwa = nazwa
|
|
zb.opis = opis
|
|
zb.numer_konta = numer_konta
|
|
zb.numer_telefonu_blik = numer_telefonu_blik
|
|
zb.ukryj_kwote = ukryj_kwote
|
|
zb.pokaz_postep_finanse = pokaz_postep_finanse
|
|
zb.pokaz_postep_pozycje = pokaz_postep_pozycje
|
|
zb.pokaz_postep_kwotowo = pokaz_postep_kwotowo
|
|
temp_zb = zb
|
|
else:
|
|
temp_zb = Zbiorka(
|
|
nazwa=nazwa,
|
|
opis=opis,
|
|
numer_konta=numer_konta,
|
|
numer_telefonu_blik=numer_telefonu_blik,
|
|
cel=None,
|
|
ukryj_kwote=ukryj_kwote,
|
|
pokaz_postep_finanse=pokaz_postep_finanse,
|
|
pokaz_postep_pozycje=pokaz_postep_pozycje,
|
|
pokaz_postep_kwotowo=pokaz_postep_kwotowo,
|
|
)
|
|
|
|
return render_template(
|
|
"admin/formularz_zbiorek.html",
|
|
zbiorka=temp_zb,
|
|
global_settings=global_settings,
|
|
)
|
|
|
|
# Lista produktów
|
|
names = request.form.getlist("item_nazwa[]")
|
|
links = request.form.getlist("item_link[]")
|
|
prices = request.form.getlist("item_cena[]")
|
|
|
|
def _read_price(val):
|
|
"""Zwraca Decimal(>=0) albo None; akceptuje przecinek jako separator dziesiętny."""
|
|
if not val or not val.strip():
|
|
return None
|
|
try:
|
|
d = Decimal(val.replace(",", "."))
|
|
if d < 0:
|
|
return None
|
|
return d
|
|
except Exception:
|
|
return None
|
|
|
|
# --- ZAPIS ZBIÓRKI + PRODUKTÓW ---
|
|
if is_edit:
|
|
# Aktualizacja istniejącej zbiórki
|
|
zb.nazwa = nazwa
|
|
zb.opis = opis
|
|
zb.numer_konta = numer_konta
|
|
zb.numer_telefonu_blik = numer_telefonu_blik
|
|
zb.cel = cel
|
|
zb.ukryj_kwote = ukryj_kwote
|
|
zb.pokaz_postep_finanse = pokaz_postep_finanse
|
|
zb.pokaz_postep_pozycje = pokaz_postep_pozycje
|
|
zb.pokaz_postep_kwotowo = pokaz_postep_kwotowo
|
|
db.session.commit() # zapisz bazowe pola
|
|
|
|
# Nadpisz listę produktów
|
|
zb.przedmioty.clear()
|
|
for i, raw_name in enumerate(names):
|
|
name = (raw_name or "").strip()
|
|
if not name:
|
|
continue
|
|
link = (links[i] if i < len(links) else "").strip() or None
|
|
cena_val = _read_price(prices[i] if i < len(prices) else "")
|
|
kupione_val = request.form.get(f"item_kupione_val_{i}") == "1"
|
|
|
|
db.session.add(Przedmiot(
|
|
zbiorka_id=zb.id,
|
|
nazwa=name,
|
|
link=link,
|
|
cena=cena_val,
|
|
kupione=kupione_val
|
|
))
|
|
|
|
db.session.commit()
|
|
flash("Zbiórka została zaktualizowana", "success")
|
|
|
|
else:
|
|
# Utworzenie nowej zbiórki
|
|
nowa = Zbiorka(
|
|
nazwa=nazwa,
|
|
opis=opis,
|
|
numer_konta=numer_konta,
|
|
numer_telefonu_blik=numer_telefonu_blik,
|
|
cel=cel,
|
|
ukryj_kwote=ukryj_kwote,
|
|
pokaz_postep_finanse=pokaz_postep_finanse,
|
|
pokaz_postep_pozycje=pokaz_postep_pozycje,
|
|
pokaz_postep_kwotowo=pokaz_postep_kwotowo,
|
|
)
|
|
db.session.add(nowa)
|
|
db.session.commit() # potrzebne ID
|
|
|
|
# Dodaj produkty do nowej zbiórki
|
|
for i, raw_name in enumerate(names):
|
|
name = (raw_name or "").strip()
|
|
if not name:
|
|
continue
|
|
link = (links[i] if i < len(links) else "").strip() or None
|
|
cena_val = _read_price(prices[i] if i < len(prices) else "")
|
|
kupione_val = request.form.get(f"item_kupione_val_{i}") == "1"
|
|
|
|
db.session.add(Przedmiot(
|
|
zbiorka_id=nowa.id,
|
|
nazwa=name,
|
|
link=link,
|
|
cena=cena_val,
|
|
kupione=kupione_val
|
|
))
|
|
|
|
db.session.commit()
|
|
flash("Zbiórka została dodana", "success")
|
|
|
|
return redirect(url_for("admin_dashboard"))
|
|
|
|
# GET
|
|
return render_template(
|
|
"admin/formularz_zbiorek.html",
|
|
zbiorka=zb,
|
|
global_settings=global_settings
|
|
)
|
|
|
|
|
|
@app.route("/admin/zbiorka/<int:zbiorka_id>/wplata/dodaj", methods=["GET", "POST"])
|
|
@login_required
|
|
def dodaj_wplate(zbiorka_id):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień", "danger")
|
|
return redirect(url_for("index"))
|
|
|
|
zb = db.session.get(Zbiorka, zbiorka_id)
|
|
if not zb:
|
|
abort(404)
|
|
|
|
if request.method == "POST":
|
|
try:
|
|
kwota = Decimal(request.form.get("kwota", "").replace(",", "."))
|
|
if kwota <= 0:
|
|
raise InvalidOperation
|
|
except (InvalidOperation, ValueError):
|
|
flash("Nieprawidłowa kwota (musi być > 0)", "danger")
|
|
return redirect(url_for("dodaj_wplate", zbiorka_id=zbiorka_id))
|
|
|
|
opis = request.form.get("opis", "")
|
|
nowa_wplata = Wplata(zbiorka_id=zb.id, kwota=kwota, opis=opis)
|
|
zb.stan = (zb.stan or Decimal("0")) + kwota
|
|
db.session.add(nowa_wplata)
|
|
db.session.commit()
|
|
flash("Wpłata została dodana", "success")
|
|
|
|
next_url = request.args.get("next")
|
|
return redirect(next_url or url_for("transakcje_zbiorki", zbiorka_id=zb.id))
|
|
return render_template("admin/dodaj_wplate.html", zbiorka=zb)
|
|
|
|
|
|
@app.route("/admin/zbiorka/usun/<int:zbiorka_id>", methods=["POST"])
|
|
@login_required
|
|
def usun_zbiorka(zbiorka_id):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień", "danger")
|
|
return redirect(url_for("index"))
|
|
zb = db.session.get(Zbiorka, zbiorka_id)
|
|
if zb is None:
|
|
abort(404)
|
|
db.session.delete(zb)
|
|
db.session.commit()
|
|
flash("Zbiórka została usunięta", "success")
|
|
return redirect(url_for("admin_dashboard"))
|
|
|
|
|
|
@app.route("/admin/zbiorka/edytuj_stan/<int:zbiorka_id>", methods=["GET", "POST"])
|
|
@login_required
|
|
def edytuj_stan(zbiorka_id):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień", "danger")
|
|
return redirect(url_for("index"))
|
|
zb = db.session.get(Zbiorka, zbiorka_id)
|
|
if zb is None:
|
|
abort(404)
|
|
if request.method == "POST":
|
|
try:
|
|
nowy_stan = Decimal(request.form.get("stan", "").replace(",", "."))
|
|
except (InvalidOperation, ValueError):
|
|
flash("Nieprawidłowa wartość kwoty", "danger")
|
|
return redirect(url_for("edytuj_stan", zbiorka_id=zbiorka_id))
|
|
zb.stan = nowy_stan
|
|
db.session.commit()
|
|
flash("Stan zbiórki został zaktualizowany", "success")
|
|
return redirect(url_for("admin_dashboard"))
|
|
return render_template("admin/edytuj_stan.html", zbiorka=zb)
|
|
|
|
|
|
@app.route("/admin/zbiorka/zmien_widzialnosc/<int:zbiorka_id>", methods=["POST"])
|
|
@login_required
|
|
def zmien_widzialnosc(zbiorka_id):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień", "danger")
|
|
return redirect(url_for("index"))
|
|
zb = db.session.get(Zbiorka, zbiorka_id)
|
|
if zb is None:
|
|
abort(404)
|
|
zb.ukryta = not zb.ukryta
|
|
db.session.commit()
|
|
flash("Zbiórka została " + ("ukryta" if zb.ukryta else "przywrócona"), "success")
|
|
return redirect(url_for("admin_dashboard"))
|
|
|
|
|
|
def create_admin_account():
|
|
admin = Uzytkownik.query.filter_by(czy_admin=True).first()
|
|
if not admin:
|
|
main_admin = Uzytkownik(
|
|
uzytkownik=app.config["MAIN_ADMIN_USERNAME"],
|
|
czy_admin=True
|
|
)
|
|
main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"])
|
|
db.session.add(main_admin)
|
|
db.session.commit()
|
|
|
|
|
|
@app.after_request
|
|
def apply_headers(response):
|
|
if request.path.startswith("/static/"):
|
|
response.headers.pop("Content-Disposition", None)
|
|
response.headers["Vary"] = "Accept-Encoding"
|
|
response.headers["Cache-Control"] = app.config.get(
|
|
"CACHE_CONTROL_HEADER_STATIC"
|
|
)
|
|
if app.config.get("USE_ETAGS", True) and "ETag" not in response.headers:
|
|
response.add_etag()
|
|
response.make_conditional(request)
|
|
return response
|
|
|
|
path_norm = request.path.lstrip("/")
|
|
czy_admin = path_norm.startswith("admin/") or path_norm == "admin"
|
|
|
|
if czy_admin:
|
|
if (response.mimetype or "").startswith("text/html"):
|
|
response.headers["Cache-Control"] = "no-store, no-cache"
|
|
response.headers.pop("ETag", None)
|
|
|
|
return response
|
|
|
|
if response.status_code in (301, 302, 303, 307, 308):
|
|
response.headers.pop("Vary", None)
|
|
return response
|
|
|
|
if 400 <= response.status_code < 500:
|
|
response.headers["Cache-Control"] = "no-store"
|
|
response.headers["Content-Type"] = "text/html; charset=utf-8"
|
|
response.headers.pop("Vary", None)
|
|
elif 500 <= response.status_code < 600:
|
|
response.headers["Cache-Control"] = "no-store"
|
|
response.headers["Content-Type"] = "text/html; charset=utf-8"
|
|
response.headers["Retry-After"] = "120"
|
|
response.headers.pop("Vary", None)
|
|
else:
|
|
response.headers["Vary"] = "Cookie, Accept-Encoding"
|
|
default_cache = app.config.get("CACHE_CONTROL_HEADER") or "private, no-store"
|
|
response.headers["Cache-Control"] = default_cache
|
|
|
|
if (
|
|
app.config.get("BLOCK_BOTS", False)
|
|
and not czy_admin
|
|
and not request.path.startswith("/static/")
|
|
):
|
|
cc_override = app.config.get("CACHE_CONTROL_HEADER")
|
|
if cc_override:
|
|
response.headers["Cache-Control"] = cc_override
|
|
response.headers["X-Robots-Tag"] = (
|
|
app.config.get("ROBOTS_TAG") or "noindex, nofollow, nosnippet, noarchive"
|
|
)
|
|
|
|
return response
|
|
|
|
|
|
@app.route("/admin/ustawienia", methods=["GET", "POST"])
|
|
@login_required
|
|
def admin_ustawienia():
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień do panelu administracyjnego", "danger")
|
|
return redirect(url_for("index"))
|
|
|
|
client_ip = get_real_ip()
|
|
settings = UstawieniaGlobalne.query.first()
|
|
if request.method == "POST":
|
|
numer_konta = request.form.get("numer_konta")
|
|
numer_telefonu_blik = request.form.get("numer_telefonu_blik")
|
|
dozwolone_hosty_logowania = request.form.get("dozwolone_hosty_logowania")
|
|
logo_url = request.form.get("logo_url")
|
|
tytul_strony = request.form.get("tytul_strony")
|
|
typ_navbar = request.form.get("typ_navbar", "text")
|
|
typ_stopka = request.form.get("typ_stopka", "text")
|
|
stopka_text = request.form.get("stopka_text") or None
|
|
pokaz_logo_w_navbar = (typ_navbar == "logo")
|
|
|
|
if settings is None:
|
|
settings = UstawieniaGlobalne(
|
|
numer_konta=numer_konta,
|
|
numer_telefonu_blik=numer_telefonu_blik,
|
|
dozwolone_hosty_logowania=dozwolone_hosty_logowania,
|
|
logo_url=logo_url,
|
|
tytul_strony=tytul_strony,
|
|
pokaz_logo_w_navbar=pokaz_logo_w_navbar,
|
|
typ_navbar=typ_navbar,
|
|
typ_stopka=typ_stopka,
|
|
stopka_text=stopka_text,
|
|
)
|
|
db.session.add(settings)
|
|
else:
|
|
settings.numer_konta = numer_konta
|
|
settings.numer_telefonu_blik = numer_telefonu_blik
|
|
settings.dozwolone_hosty_logowania = dozwolone_hosty_logowania
|
|
settings.logo_url = logo_url
|
|
settings.tytul_strony = tytul_strony
|
|
settings.pokaz_logo_w_navbar = pokaz_logo_w_navbar
|
|
settings.typ_navbar = typ_navbar
|
|
settings.typ_stopka = typ_stopka
|
|
settings.stopka_text = stopka_text
|
|
|
|
db.session.commit()
|
|
flash("Ustawienia globalne zostały zaktualizowane", "success")
|
|
return redirect(url_for("admin_dashboard"))
|
|
|
|
return render_template("admin/ustawienia.html", settings=settings, client_ip=client_ip)
|
|
|
|
|
|
@app.route("/admin/zbiorka/<int:zbiorka_id>/wydatek/dodaj", methods=["GET", "POST"])
|
|
@login_required
|
|
def dodaj_wydatek(zbiorka_id):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień", "danger")
|
|
return redirect(url_for("index"))
|
|
|
|
zb = db.session.get(Zbiorka, zbiorka_id)
|
|
if zb is None:
|
|
abort(404)
|
|
|
|
if request.method == "POST":
|
|
try:
|
|
kwota = Decimal(request.form.get("kwota", "").replace(",", "."))
|
|
if kwota <= 0:
|
|
raise InvalidOperation
|
|
except (InvalidOperation, ValueError):
|
|
flash("Nieprawidłowa kwota (musi być > 0)", "danger")
|
|
return redirect(url_for("dodaj_wydatek", zbiorka_id=zbiorka_id))
|
|
|
|
opis = request.form.get("opis", "")
|
|
nowy_wydatek = Wydatek(zbiorka_id=zb.id, kwota=kwota, opis=opis)
|
|
zb.stan = (zb.stan or Decimal("0")) - kwota
|
|
db.session.add(nowy_wydatek)
|
|
db.session.commit()
|
|
flash("Wydatek został dodany", "success")
|
|
|
|
next_url = request.args.get("next")
|
|
return redirect(next_url or url_for("transakcje_zbiorki", zbiorka_id=zb.id))
|
|
|
|
return render_template("admin/dodaj_wydatek.html", zbiorka=zb)
|
|
|
|
|
|
@app.route(
|
|
"/admin/zbiorka/oznacz/niezrealizowana/<int:zbiorka_id>",
|
|
methods=["POST"],
|
|
endpoint="oznacz_niezrealizowana",
|
|
)
|
|
@app.route(
|
|
"/admin/zbiorka/oznacz/zrealizowana/<int:zbiorka_id>",
|
|
methods=["POST"],
|
|
endpoint="oznacz_zrealizowana",
|
|
)
|
|
@login_required
|
|
def oznacz_zbiorka(zbiorka_id):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień do wykonania tej operacji", "danger")
|
|
return redirect(url_for("index"))
|
|
|
|
zb = db.session.get(Zbiorka, zbiorka_id)
|
|
if zb is None:
|
|
abort(404)
|
|
|
|
if "niezrealizowana" in request.path:
|
|
zb.zrealizowana = False
|
|
msg = "Zbiórka została oznaczona jako niezrealizowana"
|
|
else:
|
|
zb.zrealizowana = True
|
|
msg = "Zbiórka została oznaczona jako zrealizowana"
|
|
|
|
db.session.commit()
|
|
flash(msg, "success")
|
|
return redirect(url_for("admin_dashboard"))
|
|
|
|
|
|
@app.route("/robots.txt")
|
|
def robots():
|
|
if app.config.get("BLOCK_BOTS", False):
|
|
robots_txt = "User-agent: *\nDisallow: /"
|
|
else:
|
|
robots_txt = "User-agent: *\nAllow: /"
|
|
return robots_txt, 200, {"Content-Type": "text/plain"}
|
|
|
|
|
|
@app.route("/admin/zbiorka/<int:zbiorka_id>/transakcje")
|
|
@login_required
|
|
def transakcje_zbiorki(zbiorka_id):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień", "danger"); return redirect(url_for("index"))
|
|
zb = db.session.get(Zbiorka, zbiorka_id)
|
|
if zb is None:
|
|
abort(404)
|
|
aktywnosci = (
|
|
[{"typ": "wpłata", "id": w.id, "kwota": w.kwota, "opis": w.opis, "data": w.data} for w in zb.wplaty] +
|
|
[{"typ": "wydatek","id": x.id, "kwota": x.kwota,"opis": x.opis,"data": x.data} for x in zb.wydatki]
|
|
)
|
|
aktywnosci.sort(key=lambda a: a["data"], reverse=True)
|
|
return render_template("admin/transakcje.html", zbiorka=zb, aktywnosci=aktywnosci)
|
|
|
|
|
|
@app.route("/admin/wplata/<int:wplata_id>/zapisz", methods=["POST"])
|
|
@login_required
|
|
def zapisz_wplate(wplata_id):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień", "danger"); return redirect(url_for("index"))
|
|
w = db.session.get(Wplata, wplata_id)
|
|
if w is None:
|
|
abort(404)
|
|
zb = w.zbiorka
|
|
try:
|
|
nowa_kwota = Decimal(request.form.get("kwota", "").replace(",", "."))
|
|
if nowa_kwota <= 0:
|
|
raise InvalidOperation
|
|
except (InvalidOperation, ValueError):
|
|
flash("Nieprawidłowa kwota (musi być > 0)", "danger")
|
|
return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id))
|
|
|
|
delta = nowa_kwota - (w.kwota or Decimal("0"))
|
|
w.kwota = nowa_kwota
|
|
w.opis = request.form.get("opis", "")
|
|
zb.stan = (zb.stan or Decimal("0")) + delta
|
|
db.session.commit()
|
|
flash("Wpłata zaktualizowana", "success")
|
|
return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id))
|
|
|
|
|
|
@app.route("/admin/wplata/<int:wplata_id>/usun", methods=["POST"])
|
|
@login_required
|
|
def usun_wplate(wplata_id):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień", "danger"); return redirect(url_for("index"))
|
|
w = db.session.get(Wplata, wplata_id)
|
|
if w is None:
|
|
abort(404)
|
|
zb = w.zbiorka
|
|
zb.stan -= w.kwota
|
|
db.session.delete(w)
|
|
db.session.commit()
|
|
flash("Wpłata usunięta", "success")
|
|
return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id))
|
|
|
|
|
|
@app.route("/admin/wydatek/<int:wydatek_id>/zapisz", methods=["POST"])
|
|
@login_required
|
|
def zapisz_wydatek(wydatek_id):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień", "danger"); return redirect(url_for("index"))
|
|
x = db.session.get(Wydatek, wydatek_id)
|
|
if x is None:
|
|
abort(404)
|
|
zb = x.zbiorka
|
|
try:
|
|
nowa_kwota = Decimal(request.form.get("kwota", "").replace(",", "."))
|
|
if nowa_kwota <= 0:
|
|
raise InvalidOperation
|
|
except (InvalidOperation, ValueError):
|
|
flash("Nieprawidłowa kwota (musi być > 0)", "danger")
|
|
return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id))
|
|
|
|
delta = nowa_kwota - (x.kwota or Decimal("0"))
|
|
x.kwota = nowa_kwota
|
|
x.opis = request.form.get("opis", "")
|
|
# wydatki zmniejszają stan; jeżeli delta>0, stan spada bardziej
|
|
zb.stan = (zb.stan or Decimal("0")) - delta
|
|
db.session.commit()
|
|
flash("Wydatek zaktualizowany", "success")
|
|
return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id))
|
|
|
|
|
|
@app.route("/admin/wydatek/<int:wydatek_id>/usun", methods=["POST"])
|
|
@login_required
|
|
def usun_wydatek(wydatek_id):
|
|
if not current_user.czy_admin:
|
|
flash("Brak uprawnień", "danger"); return redirect(url_for("index"))
|
|
x = db.session.get(Wydatek, wydatek_id)
|
|
if x is None:
|
|
abort(404)
|
|
zb = x.zbiorka
|
|
zb.stan += x.kwota
|
|
db.session.delete(x)
|
|
db.session.commit()
|
|
flash("Wydatek usunięty", "success")
|
|
return redirect(url_for("transakcje_zbiorki", zbiorka_id=zb.id))
|
|
|
|
|
|
@app.route("/favicon.ico")
|
|
def favicon():
|
|
return "", 204
|
|
|
|
|
|
@app.route("/healthcheck")
|
|
def healthcheck():
|
|
header_token = request.headers.get("X-Internal-Check")
|
|
correct_token = app.config.get("HEALTHCHECK_TOKEN")
|
|
|
|
if header_token != correct_token:
|
|
abort(404)
|
|
return "OK", 200
|
|
|
|
|
|
if __name__ == "__main__":
|
|
with app.app_context():
|
|
db.create_all()
|
|
stmt = select(Uzytkownik).filter_by(czy_admin=True)
|
|
admin = db.session.execute(stmt).scalars().first()
|
|
if not admin:
|
|
main_admin = Uzytkownik(
|
|
uzytkownik=app.config["MAIN_ADMIN_USERNAME"],
|
|
czy_admin=True
|
|
)
|
|
main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"])
|
|
db.session.add(main_admin)
|
|
db.session.commit()
|
|
|
|
app.run(debug=True)
|