Files
zbiorki_app/app.py
Mateusz Gruszczyński e9db945bb4 przebudowa systemu
2025-08-28 10:27:06 +02:00

548 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, request, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from flask_login import (
LoginManager,
login_user,
login_required,
logout_user,
current_user,
UserMixin,
)
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
from markupsafe import Markup
from sqlalchemy import event
from sqlalchemy.engine import Engine
import markdown as md
from flask import request, flash, abort
import os
import re
import socket
app = Flask(__name__)
# Ładujemy konfigurację z pliku config.py
app.config.from_object("config.Config")
db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = "login"
# MODELE
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
is_admin = db.Column(db.Boolean, default=False) # Flaga głównego administratora
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
class Zbiorka(db.Model):
id = db.Column(db.Integer, primary_key=True)
nazwa = db.Column(db.String(100), nullable=False)
opis = db.Column(db.Text, nullable=False)
numer_konta = db.Column(db.String(50), nullable=False)
numer_telefonu_blik = db.Column(db.String(50), nullable=False)
cel = db.Column(db.Float, nullable=False, default=0.0)
stan = db.Column(db.Float, default=0.0)
ukryta = db.Column(db.Boolean, default=False)
ukryj_kwote = db.Column(db.Boolean, default=False)
zrealizowana = db.Column(db.Boolean, default=False)
wplaty = db.relationship(
"Wplata",
back_populates="zbiorka",
lazy=True,
order_by="Wplata.data.desc()",
cascade="all, delete-orphan",
passive_deletes=True,
)
class Wplata(db.Model):
id = db.Column(db.Integer, primary_key=True)
zbiorka_id = db.Column(
db.Integer,
db.ForeignKey("zbiorka.id", ondelete="CASCADE"),
nullable=False,
)
kwota = db.Column(db.Float, nullable=False)
data = db.Column(db.DateTime, default=datetime.utcnow)
opis = db.Column(db.Text, nullable=True)
zbiorka = db.relationship("Zbiorka", back_populates="wplaty")
class GlobalSettings(db.Model):
id = db.Column(db.Integer, primary_key=True)
numer_konta = db.Column(db.String(50), nullable=False)
numer_telefonu_blik = db.Column(db.String(50), nullable=False)
allowed_login_hosts = db.Column(db.Text, nullable=True)
logo_url = db.Column(db.String(255), nullable=True)
site_title = db.Column(db.String(120), nullable=True)
show_logo_in_navbar = db.Column(db.Boolean, default=False)
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
try:
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
except Exception:
pass
def get_real_ip():
headers = request.headers
cf_ip = headers.get("CF-Connecting-IP")
if cf_ip:
return cf_ip.split(",")[0].strip()
xff = headers.get("X-Forwarded-For")
if xff:
return xff.split(",")[0].strip()
x_real_ip = headers.get("X-Real-IP")
if x_real_ip:
return x_real_ip.strip()
return request.remote_addr
def is_allowed_ip(remote_ip, allowed_hosts_str):
if remote_ip in ("127.0.0.1", "::1"):
return True
if os.path.exists("emergency_access.txt"):
return True
allowed_hosts = re.split(r"[\n,]+", allowed_hosts_str.strip())
allowed_ips = set()
for host in allowed_hosts:
host = host.strip()
if not host:
continue
try:
resolved_ip = socket.gethostbyname(host)
allowed_ips.add(resolved_ip)
except Exception:
continue
try:
hostname = socket.gethostbyaddr(remote_ip)[0]
app.logger.info(f"Odwiedzający IP: {remote_ip}, host: {hostname}")
except Exception as e:
app.logger.warning(f"Reverse DNS nieudane dla {remote_ip}: {e}")
return remote_ip in allowed_ips
# Dodaj filtr Markdown pozwala na zagnieżdżanie linków i obrazków w opisie
@app.template_filter("markdown")
def markdown_filter(text):
return Markup(md.markdown(text))
@app.context_processor
def inject_globals():
settings = GlobalSettings.query.first()
allowed_hosts_str = (
settings.allowed_login_hosts
if settings and settings.allowed_login_hosts
else ""
)
client_ip = get_real_ip()
return {
"is_ip_allowed": is_allowed_ip(client_ip, allowed_hosts_str),
"global_settings": settings,
}
# TRASY PUBLICZNE
@app.route("/")
def index():
zbiorki = Zbiorka.query.filter_by(ukryta=False, zrealizowana=False).all()
return render_template("index.html", zbiorki=zbiorki)
@app.route("/zbiorki_zrealizowane")
def zbiorki_zrealizowane():
zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all()
return render_template("index.html", zbiorki=zbiorki)
@app.errorhandler(404)
def page_not_found(e):
return redirect(url_for("index"))
@app.route("/zbiorka/<int:zbiorka_id>")
def zbiorka(zbiorka_id):
zb = Zbiorka.query.get_or_404(zbiorka_id)
# Jeżeli zbiórka jest ukryta i użytkownik nie jest administratorem, zwróć 404
if zb.ukryta and (not current_user.is_authenticated or not current_user.is_admin):
abort(404)
return render_template("zbiorka.html", zbiorka=zb)
# TRASY LOGOWANIA I REJESTRACJI
@app.route("/zaloguj", methods=["GET", "POST"])
def zaloguj():
# Pobierz ustawienia globalne, w tym dozwolone hosty
settings = GlobalSettings.query.first()
allowed_hosts_str = ""
if settings and settings.allowed_login_hosts:
allowed_hosts_str = settings.allowed_login_hosts
# Sprawdzenie, czy adres IP klienta jest dozwolony
client_ip = get_real_ip()
if not is_allowed_ip(client_ip, allowed_hosts_str):
flash(
"Dostęp do endpointu /login jest zablokowany dla Twojego adresu IP",
"danger",
)
return redirect(url_for("index"))
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
login_user(user)
flash("Zalogowano pomyślnie", "success")
next_page = request.args.get("next")
return (
redirect(next_page)
if next_page
else redirect(url_for("admin_dashboard"))
)
else:
flash("Nieprawidłowe dane logowania", "danger")
return render_template("login.html")
@app.route("/wyloguj")
@login_required
def wyloguj():
logout_user()
flash("Wylogowano", "success")
return redirect(url_for("zaloguj"))
@app.route("/zarejestruj", methods=["GET", "POST"])
def zarejestruj():
if not app.config.get("ALLOW_REGISTRATION", False):
flash("Rejestracja została wyłączona przez administratora", "danger")
return redirect(url_for("zaloguj"))
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
if User.query.filter_by(username=username).first():
flash("Użytkownik już istnieje", "danger")
return redirect(url_for("register"))
new_user = User(username=username)
new_user.set_password(password)
db.session.add(new_user)
db.session.commit()
flash("Konto utworzone, możesz się zalogować", "success")
return redirect(url_for("zaloguj"))
return render_template("register.html")
# PANEL ADMINISTRACYJNY
@app.route("/admin")
@login_required
def admin_dashboard():
if not current_user.is_admin:
flash("Brak uprawnień do panelu administracyjnego", "danger")
return redirect(url_for("index"))
active_zbiorki = Zbiorka.query.filter_by(zrealizowana=False).all()
completed_zbiorki = Zbiorka.query.filter_by(zrealizowana=True).all()
return render_template(
"admin/dashboard.html",
active_zbiorki=active_zbiorki,
completed_zbiorki=completed_zbiorki,
)
@app.route("/admin/zbiorka/dodaj", methods=["GET", "POST"])
@login_required
def dodaj_zbiorke():
if not current_user.is_admin:
flash("Brak uprawnień", "danger")
return redirect(url_for("index"))
global_settings = GlobalSettings.query.first() # Pobieramy globalne ustawienia
if request.method == "POST":
nazwa = request.form["nazwa"]
opis = request.form["opis"]
# Pozyskujemy numer konta i telefon z formularza (mogą być nadpisane ręcznie)
numer_konta = request.form["numer_konta"]
numer_telefonu_blik = request.form["numer_telefonu_blik"]
cel = float(request.form["cel"])
ukryj_kwote = "ukryj_kwote" in request.form
nowa_zbiorka = Zbiorka(
nazwa=nazwa,
opis=opis,
numer_konta=numer_konta,
numer_telefonu_blik=numer_telefonu_blik,
cel=cel,
ukryj_kwote=ukryj_kwote,
)
db.session.add(nowa_zbiorka)
db.session.commit()
flash("Zbiórka została dodana", "success")
return redirect(url_for("admin_dashboard"))
return render_template("admin/dodaj_zbiorke.html", global_settings=global_settings)
@app.route("/admin/zbiorka/edytuj/<int:zbiorka_id>", methods=["GET", "POST"])
@login_required
def edytuj_zbiorka(zbiorka_id):
if not current_user.is_admin:
flash("Brak uprawnień", "danger")
return redirect(url_for("index"))
zb = Zbiorka.query.get_or_404(zbiorka_id)
global_settings = GlobalSettings.query.first() # Pobieramy globalne ustawienia
if request.method == "POST":
zb.nazwa = request.form["nazwa"]
zb.opis = request.form["opis"]
zb.numer_konta = request.form["numer_konta"]
zb.numer_telefonu_blik = request.form["numer_telefonu_blik"]
try:
zb.cel = float(request.form["cel"])
except ValueError:
flash("Podano nieprawidłową wartość dla celu zbiórki", "danger")
return render_template(
"admin/edytuj_zbiorke.html", zbiorka=zb, global_settings=global_settings
)
zb.ukryj_kwote = "ukryj_kwote" in request.form
db.session.commit()
flash("Zbiórka została zaktualizowana", "success")
return redirect(url_for("admin_dashboard"))
return render_template(
"admin/edytuj_zbiorke.html", zbiorka=zb, global_settings=global_settings
)
# TRASA DODAWANIA WPŁATY Z OPISEM
# TRASA DODAWANIA WPŁATY W PANELU ADMINA
@app.route("/admin/zbiorka/<int:zbiorka_id>/wplata/dodaj", methods=["GET", "POST"])
@login_required
def dodaj_wplate(zbiorka_id):
if not current_user.is_admin:
flash("Brak uprawnień", "danger")
return redirect(url_for("index"))
zb = Zbiorka.query.get_or_404(zbiorka_id)
if request.method == "POST":
kwota = float(request.form["kwota"])
opis = request.form.get("opis", "")
nowa_wplata = Wplata(zbiorka_id=zb.id, kwota=kwota, opis=opis)
zb.stan += kwota # Aktualizacja stanu zbiórki
db.session.add(nowa_wplata)
db.session.commit()
flash("Wpłata została dodana", "success")
return redirect(url_for("admin_dashboard"))
return render_template("admin/dodaj_wplate.html", zbiorka=zb)
@app.route("/admin/zbiorka/usun/<int:zbiorka_id>", methods=["POST"])
@login_required
def usun_zbiorka(zbiorka_id):
if not current_user.is_admin:
flash("Brak uprawnień", "danger")
return redirect(url_for("index"))
zb = Zbiorka.query.get_or_404(zbiorka_id)
db.session.delete(zb)
db.session.commit()
flash("Zbiórka została usunięta", "success")
return redirect(url_for("admin_dashboard"))
@app.route("/admin/zbiorka/edytuj_stan/<int:zbiorka_id>", methods=["GET", "POST"])
@login_required
def edytuj_stan(zbiorka_id):
if not current_user.is_admin:
flash("Brak uprawnień", "danger")
return redirect(url_for("index"))
zb = Zbiorka.query.get_or_404(zbiorka_id)
if request.method == "POST":
try:
nowy_stan = float(request.form["stan"])
except ValueError:
flash("Nieprawidłowa wartość kwoty", "danger")
return redirect(url_for("edytuj_stan", zbiorka_id=zbiorka_id))
zb.stan = nowy_stan
db.session.commit()
flash("Stan zbiórki został zaktualizowany", "success")
return redirect(url_for("admin_dashboard"))
return render_template("admin/edytuj_stan.html", zbiorka=zb)
@app.route("/admin/zbiorka/zmien_widzialnosc/<int:zbiorka_id>", methods=["POST"])
@login_required
def zmien_widzialnosc(zbiorka_id):
if not current_user.is_admin:
flash("Brak uprawnień", "danger")
return redirect(url_for("index"))
zb = Zbiorka.query.get_or_404(zbiorka_id)
zb.ukryta = not zb.ukryta
db.session.commit()
flash("Zbiórka została " + ("ukryta" if zb.ukryta else "przywrócona"), "success")
return redirect(url_for("admin_dashboard"))
def create_admin_account():
admin = User.query.filter_by(is_admin=True).first()
if not admin:
main_admin = User(username=app.config["MAIN_ADMIN_USERNAME"], is_admin=True)
main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"])
db.session.add(main_admin)
db.session.commit()
@app.after_request
def apply_headers(response):
# Nagłówki niestandardowe
custom_headers = app.config.get("ADD_HEADERS", {})
if isinstance(custom_headers, dict):
for header, value in custom_headers.items():
response.headers[header] = str(value)
# Wykluczenia
if response.status_code in (301, 302, 303, 307, 308):
response.headers.pop("Vary", None)
return response
if 400 <= response.status_code < 500:
response.headers["Cache-Control"] = "no-store"
response.headers["Content-Type"] = "text/html; charset=utf-8"
response.headers.pop("Vary", None)
elif 500 <= response.status_code < 600:
response.headers["Cache-Control"] = "no-store"
response.headers["Content-Type"] = "text/html; charset=utf-8"
response.headers["Retry-After"] = "120"
response.headers.pop("Vary", None)
elif request.path.startswith("/admin"):
response.headers.pop("Vary", None)
response.headers["Cache-Control"] = (
"no-store, no-cache, must-revalidate, max-age=0"
)
else:
response.headers["Vary"] = "Cookie, Accept-Encoding"
default_cache = app.config.get("CACHE_CONTROL_HEADER") or "private, max-age=0"
response.headers["Cache-Control"] = default_cache
# Blokowanie botów
if app.config.get("BLOCK_BOTS", False):
cc = (
app.config.get("CACHE_CONTROL_HEADER")
or "no-store, no-cache, must-revalidate, max-age=0"
)
response.headers["Cache-Control"] = cc
response.headers["X-Robots-Tag"] = (
app.config.get("ROBOTS_TAG") or "noindex, nofollow, nosnippet, noarchive"
)
return response
@app.route("/admin/ustawienia", methods=["GET", "POST"])
@login_required
def admin_ustawienia():
if not current_user.is_admin:
flash("Brak uprawnień do panelu administracyjnego", "danger")
return redirect(url_for("index"))
client_ip = get_real_ip()
settings = GlobalSettings.query.first()
if request.method == "POST":
numer_konta = request.form.get("numer_konta")
numer_telefonu_blik = request.form.get("numer_telefonu_blik")
allowed_login_hosts = request.form.get("allowed_login_hosts")
logo_url = request.form.get("logo_url")
site_title = request.form.get("site_title")
show_logo_in_navbar = "show_logo_in_navbar" in request.form
if settings is None:
settings = GlobalSettings(
numer_konta=numer_konta,
numer_telefonu_blik=numer_telefonu_blik,
allowed_login_hosts=allowed_login_hosts,
logo_url=logo_url,
site_title=site_title,
show_logo_in_navbar=show_logo_in_navbar,
)
db.session.add(settings)
else:
settings.numer_konta = numer_konta
settings.numer_telefonu_blik = numer_telefonu_blik
settings.allowed_login_hosts = allowed_login_hosts
settings.logo_url = logo_url
settings.site_title = site_title
settings.show_logo_in_navbar = show_logo_in_navbar
db.session.commit()
flash("Ustawienia globalne zostały zaktualizowane", "success")
return redirect(url_for("admin_dashboard"))
return render_template(
"admin/ustawienia.html", settings=settings, client_ip=client_ip
)
@app.route("/admin/zbiorka/oznacz/<int:zbiorka_id>", methods=["POST"])
@login_required
def oznacz_zbiorka(zbiorka_id):
if not current_user.is_admin:
flash("Brak uprawnień do wykonania tej operacji", "danger")
return redirect(url_for("index"))
zb = Zbiorka.query.get_or_404(zbiorka_id)
zb.zrealizowana = True
db.session.commit()
flash("Zbiórka została oznaczona jako zrealizowana", "success")
return redirect(url_for("admin_dashboard"))
@app.route("/robots.txt")
def robots():
if app.config.get("BLOCK_BOTS", False):
robots_txt = "User-agent: *\nDisallow: /"
else:
robots_txt = "User-agent: *\nAllow: /"
return robots_txt, 200, {"Content-Type": "text/plain"}
@app.route("/favicon.ico")
def favicon():
return "", 204
if __name__ == "__main__":
with app.app_context():
db.create_all()
# Tworzenie konta głównego admina, jeśli nie istnieje
if not User.query.filter_by(is_admin=True).first():
main_admin = User(username=app.config["MAIN_ADMIN_USERNAME"], is_admin=True)
main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"])
db.session.add(main_admin)
db.session.commit()
app.run(debug=True)