565 lines
19 KiB
Python
565 lines
19 KiB
Python
from flask import Flask, render_template, request, redirect, url_for, flash
|
||
from flask_sqlalchemy import SQLAlchemy
|
||
from flask_login import (
|
||
LoginManager,
|
||
login_user,
|
||
login_required,
|
||
logout_user,
|
||
current_user,
|
||
UserMixin,
|
||
)
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
from datetime import datetime
|
||
from markupsafe import Markup
|
||
from sqlalchemy import event
|
||
from sqlalchemy.engine import Engine
|
||
|
||
import markdown as md
|
||
from flask import request, flash, abort
|
||
import os
|
||
import re
|
||
import socket
|
||
|
||
# Application object; every route, filter and hook below is registered on it.
app = Flask(__name__)
# Load configuration from the Config class in config.py.
app.config.from_object("config.Config")

# Database handle bound to the application.
db = SQLAlchemy(app)
login_manager = LoginManager(app)
# Must name an existing endpoint: the login view in this file is `zaloguj`.
# The previous value "login" matched no route, so every redirect issued by
# @login_required failed while building the URL.
login_manager.login_view = "zaloguj"
|
||
|
||
# MODELE
|
||
|
||
|
||
class User(UserMixin, db.Model):
    """Account that can log in to the application."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    # Sized for the hashes produced by generate_password_hash(): recent
    # Werkzeug releases default to scrypt, whose encoded hash is longer than
    # the 128 characters previously reserved here.
    # NOTE(review): existing databases that enforce column lengths need a
    # migration for this change to take effect.
    password_hash = db.Column(db.String(256), nullable=False)
    is_admin = db.Column(db.Boolean, default=False)  # Main-administrator flag

    def set_password(self, password):
        """Hash ``password`` and store the hash on this user."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)
|
||
|
||
|
||
class Zbiorka(db.Model):
    """A fundraiser ("zbiórka") together with its payment details and progress."""

    id = db.Column(db.Integer, primary_key=True)
    # Name and description; the description is filled in from the admin forms.
    nazwa = db.Column(db.String(100), nullable=False)
    opis = db.Column(db.Text, nullable=False)
    # Payment details: account number and BLIK phone number.
    numer_konta = db.Column(db.String(50), nullable=False)
    numer_telefonu_blik = db.Column(db.String(50), nullable=False)
    # Target amount and the amount recorded so far (`stan` is increased by
    # dodaj_wplate and can be overwritten by edytuj_stan).
    cel = db.Column(db.Float, nullable=False, default=0.0)
    stan = db.Column(db.Float, default=0.0)
    # Hidden fundraisers are left out of the index and return 404 to non-admins.
    ukryta = db.Column(db.Boolean, default=False)
    # Set from the "ukryj_kwote" form checkbox; presumably hides amounts in
    # the templates -- verify against the templates.
    ukryj_kwote = db.Column(db.Boolean, default=False)
    # Marks the fundraiser as completed (set by oznacz_zbiorka).
    zrealizowana = db.Column(db.Boolean, default=False)

    # Payments of this fundraiser, newest first.  Deleting the fundraiser
    # cascades to its payments; passive_deletes leaves the row removal to the
    # database-level ON DELETE CASCADE declared on Wplata.zbiorka_id.
    wplaty = db.relationship(
        "Wplata",
        back_populates="zbiorka",
        lazy=True,
        order_by="Wplata.data.desc()",
        cascade="all, delete-orphan",
        passive_deletes=True,
    )
|
||
|
||
|
||
class Wplata(db.Model):
    """A single payment ("wpłata") recorded against a fundraiser."""

    id = db.Column(db.Integer, primary_key=True)
    # Owning fundraiser; the row is removed by the database when the
    # fundraiser is deleted (ON DELETE CASCADE).
    zbiorka_id = db.Column(
        db.Integer,
        db.ForeignKey("zbiorka.id", ondelete="CASCADE"),
        nullable=False,
    )
    kwota = db.Column(db.Float, nullable=False)
    # Defaults to the naive UTC time at which the row is created.
    data = db.Column(db.DateTime, default=datetime.utcnow)
    # Optional free-text note; may be NULL.
    opis = db.Column(db.Text, nullable=True)

    # Back-reference to the owning fundraiser (pairs with Zbiorka.wplaty).
    zbiorka = db.relationship("Zbiorka", back_populates="wplaty")
|
||
|
||
|
||
class GlobalSettings(db.Model):
    """Site-wide settings; the application reads the first row of this table."""

    id = db.Column(db.Integer, primary_key=True)
    # Default payment details offered when creating or editing a fundraiser.
    numer_konta = db.Column(db.String(50), nullable=False)
    numer_telefonu_blik = db.Column(db.String(50), nullable=False)
    # Hosts allowed to reach the login page, separated by newlines or commas
    # (parsed by is_allowed_ip).
    allowed_login_hosts = db.Column(db.Text, nullable=True)
    logo_url = db.Column(db.String(255), nullable=True)
    site_title = db.Column(db.String(120), nullable=True)
    # Kept in sync with navbar_brand_mode == "logo" by admin_ustawienia.
    # (This column used to be declared twice; the duplicate was removed.)
    show_logo_in_navbar = db.Column(db.Boolean, default=False)
    navbar_brand_mode = db.Column(db.String(10), default="text")
    footer_brand_mode = db.Column(db.String(10), default="text")
    footer_text = db.Column(db.String(200), nullable=True)
|
||
|
||
@login_manager.user_loader
def load_user(user_id):
    """Return the User for the id stored in the session, or None.

    Flask-Login expects None (not an exception) when the id is invalid, so a
    malformed session value is treated as "no such user".
    """
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(numeric_id)
|
||
|
||
|
||
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Turn on foreign-key enforcement for every new DB-API connection.

    The PRAGMA is SQLite syntax; the listener is registered for every Engine,
    so failures are deliberately swallowed (best effort) and never abort the
    connection.  The cursor is always closed, even when the PRAGMA fails.
    """
    try:
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute("PRAGMA foreign_keys=ON")
        finally:
            cursor.close()
    except Exception:
        # Best effort only: leave a trace for debugging instead of failing.
        app.logger.debug("PRAGMA foreign_keys=ON was not applied", exc_info=True)
|
||
|
||
|
||
def get_real_ip():
    """Return the client IP for the current request.

    Checks, in order, CF-Connecting-IP, X-Forwarded-For (first entry of a
    comma-separated list) and X-Real-IP, then falls back to
    ``request.remote_addr``.  A header that is present but blank is skipped
    instead of being returned as an empty address.

    NOTE(review): these headers are client-supplied unless a trusted proxy
    overwrites them; since the result feeds is_allowed_ip, confirm the
    deployment always sits behind such a proxy.
    """
    headers = request.headers

    for name in ("CF-Connecting-IP", "X-Forwarded-For"):
        raw_value = headers.get(name)
        if raw_value:
            candidate = raw_value.split(",")[0].strip()
            if candidate:
                return candidate

    x_real_ip = (headers.get("X-Real-IP") or "").strip()
    if x_real_ip:
        return x_real_ip

    return request.remote_addr
|
||
|
||
|
||
def is_allowed_ip(remote_ip, allowed_hosts_str):
    """Decide whether ``remote_ip`` may access the login page.

    Args:
        remote_ip: Client address as a string.
        allowed_hosts_str: Host names or IPs separated by newlines and/or
            commas; ``None`` or an empty string means "no extra hosts".

    Returns:
        True for loopback addresses, when the file ``emergency_access.txt``
        exists in the working directory, or when ``remote_ip`` equals the
        IPv4 address one of the listed hosts resolves to; otherwise False.
    """
    if remote_ip in ("127.0.0.1", "::1"):
        return True

    # Escape hatch: the presence of this file disables the restriction.
    if os.path.exists("emergency_access.txt"):
        return True

    allowed_ips = set()
    # `or ""` tolerates a NULL settings value passed straight through.
    for host in re.split(r"[\n,]+", (allowed_hosts_str or "").strip()):
        host = host.strip()
        if not host:
            continue
        try:
            allowed_ips.add(socket.gethostbyname(host))
        except (OSError, ValueError):
            # Unresolvable or malformed entry: ignore it, keep the others.
            continue

    # Reverse DNS is diagnostic only; it must never affect the decision,
    # hence the broad except.
    try:
        hostname = socket.gethostbyaddr(remote_ip)[0]
        app.logger.info(f"Odwiedzający IP: {remote_ip}, host: {hostname}")
    except Exception as e:
        app.logger.warning(f"Reverse DNS nieudane dla {remote_ip}: {e}")

    return remote_ip in allowed_ips
|
||
|
||
|
||
# Markdown filter: allows links and images to be embedded in descriptions.
@app.template_filter("markdown")
def markdown_filter(text):
    """Render Markdown ``text`` to HTML marked as safe for templates.

    ``None`` (for example a payment without a note) renders as empty markup
    instead of raising.

    NOTE(review): the result is marked safe without sanitizing, and Markdown
    passes raw HTML through; confirm only trusted users author this text.
    """
    return Markup(md.markdown(text or ""))
|
||
|
||
|
||
@app.context_processor
def inject_globals():
    """Expose the settings row and the caller's login eligibility to templates."""
    settings = GlobalSettings.query.first()

    hosts = ""
    if settings and settings.allowed_login_hosts:
        hosts = settings.allowed_login_hosts

    ip_allowed = is_allowed_ip(get_real_ip(), hosts)
    return dict(is_ip_allowed=ip_allowed, global_settings=settings)
|
||
|
||
|
||
# TRASY PUBLICZNE
|
||
@app.route("/")
def index():
    """List fundraisers that are neither hidden nor completed."""
    visible_active = Zbiorka.query.filter_by(ukryta=False, zrealizowana=False)
    return render_template("index.html", zbiorki=visible_active.all())
|
||
|
||
|
||
@app.route("/zbiorki_zrealizowane")
def zbiorki_zrealizowane():
    """List completed fundraisers using the index template."""
    completed = Zbiorka.query.filter_by(zrealizowana=True)
    return render_template("index.html", zbiorki=completed.all())
|
||
|
||
|
||
@app.errorhandler(404)
def page_not_found(e):
    """Send any 404 back to the index page instead of showing an error."""
    home_url = url_for("index")
    return redirect(home_url)
|
||
|
||
|
||
@app.route("/zbiorka/<int:zbiorka_id>")
def zbiorka(zbiorka_id):
    """Show one fundraiser; hidden ones are visible to administrators only."""
    fundraiser = Zbiorka.query.get_or_404(zbiorka_id)
    if fundraiser.ukryta:
        # Anonymous users are rejected before is_admin is ever looked at.
        viewer_is_admin = current_user.is_authenticated and current_user.is_admin
        if not viewer_is_admin:
            abort(404)
    return render_template("zbiorka.html", zbiorka=fundraiser)
|
||
|
||
|
||
# TRASY LOGOWANIA I REJESTRACJI
|
||
|
||
|
||
def _is_safe_next_url(target):
    """Return True when ``target`` is a same-site path safe to redirect to."""
    from urllib.parse import urlparse

    if not target or not target.startswith("/") or target.startswith("//"):
        return False
    # Browsers treat backslashes like slashes and drop control characters,
    # so either could turn a "local" path into an external URL.
    if "\\" in target or any(ord(ch) < 0x20 for ch in target):
        return False
    parts = urlparse(target)
    return not parts.scheme and not parts.netloc


@app.route("/zaloguj", methods=["GET", "POST"])
def zaloguj():
    """Log a user in, provided the client IP is allowed to reach this page.

    GET renders the login form.  POST checks the credentials and redirects to
    the ``next`` query parameter when it is a local path, otherwise to the
    admin dashboard.  Clients from disallowed addresses are sent to the index.
    """
    # Fetch the global settings, including the allowed hosts.
    settings = GlobalSettings.query.first()
    allowed_hosts_str = ""
    if settings and settings.allowed_login_hosts:
        allowed_hosts_str = settings.allowed_login_hosts

    # Check whether the client's IP address is allowed.
    client_ip = get_real_ip()
    if not is_allowed_ip(client_ip, allowed_hosts_str):
        flash(
            "Dostęp do endpointu /login jest zablokowany dla Twojego adresu IP",
            "danger",
        )
        return redirect(url_for("index"))

    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]
        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password):
            login_user(user)
            flash("Zalogowano pomyślnie", "success")
            next_page = request.args.get("next")
            # `next` is attacker-controllable: only follow local paths so the
            # login form cannot be abused as an open redirect.
            if _is_safe_next_url(next_page):
                return redirect(next_page)
            return redirect(url_for("admin_dashboard"))
        else:
            flash("Nieprawidłowe dane logowania", "danger")
    return render_template("login.html")
|
||
|
||
|
||
@app.route("/wyloguj")
@login_required
def wyloguj():
    """End the current session and return to the login page."""
    logout_user()
    flash("Wylogowano", "success")
    login_url = url_for("zaloguj")
    return redirect(login_url)
|
||
|
||
|
||
@app.route("/zarejestruj", methods=["GET", "POST"])
def zarejestruj():
    """Register a new (non-admin) account when registration is enabled.

    Controlled by the ``ALLOW_REGISTRATION`` config flag (off by default).
    GET renders the form; POST creates the account unless the username is
    already taken.
    """
    if not app.config.get("ALLOW_REGISTRATION", False):
        flash("Rejestracja została wyłączona przez administratora", "danger")
        return redirect(url_for("zaloguj"))
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]
        if User.query.filter_by(username=username).first():
            flash("Użytkownik już istnieje", "danger")
            # The endpoint is this view's function name; "register" matched
            # no route and made url_for raise.
            return redirect(url_for("zarejestruj"))
        new_user = User(username=username)
        new_user.set_password(password)
        db.session.add(new_user)
        db.session.commit()
        flash("Konto utworzone, możesz się zalogować", "success")
        return redirect(url_for("zaloguj"))
    return render_template("register.html")
|
||
|
||
|
||
# PANEL ADMINISTRACYJNY
|
||
|
||
|
||
@app.route("/admin")
@login_required
def admin_dashboard():
    """Render the admin dashboard with active and completed fundraisers."""
    if not current_user.is_admin:
        flash("Brak uprawnień do panelu administracyjnego", "danger")
        return redirect(url_for("index"))

    context = {
        "active_zbiorki": Zbiorka.query.filter_by(zrealizowana=False).all(),
        "completed_zbiorki": Zbiorka.query.filter_by(zrealizowana=True).all(),
    }
    return render_template("admin/dashboard.html", **context)
|
||
|
||
|
||
@app.route("/admin/zbiorka/dodaj", methods=["GET", "POST"])
@login_required
def dodaj_zbiorke():
    """Create a new fundraiser (administrators only).

    GET renders the form pre-filled from the global settings.  POST stores
    the fundraiser; a non-numeric goal re-renders the form with an error
    message instead of failing with a server error.
    """
    if not current_user.is_admin:
        flash("Brak uprawnień", "danger")
        return redirect(url_for("index"))

    global_settings = GlobalSettings.query.first()  # Fetch the global settings

    if request.method == "POST":
        nazwa = request.form["nazwa"]
        opis = request.form["opis"]
        # Account number and phone come from the form (may be overridden by hand).
        numer_konta = request.form["numer_konta"]
        numer_telefonu_blik = request.form["numer_telefonu_blik"]
        try:
            cel = float(request.form["cel"])
        except ValueError:
            # Same handling and message as edytuj_zbiorka.
            flash("Podano nieprawidłową wartość dla celu zbiórki", "danger")
            return render_template(
                "admin/dodaj_zbiorke.html", global_settings=global_settings
            )
        ukryj_kwote = "ukryj_kwote" in request.form

        nowa_zbiorka = Zbiorka(
            nazwa=nazwa,
            opis=opis,
            numer_konta=numer_konta,
            numer_telefonu_blik=numer_telefonu_blik,
            cel=cel,
            ukryj_kwote=ukryj_kwote,
        )
        db.session.add(nowa_zbiorka)
        db.session.commit()
        flash("Zbiórka została dodana", "success")
        return redirect(url_for("admin_dashboard"))

    return render_template("admin/dodaj_zbiorke.html", global_settings=global_settings)
|
||
|
||
|
||
@app.route("/admin/zbiorka/edytuj/<int:zbiorka_id>", methods=["GET", "POST"])
@login_required
def edytuj_zbiorka(zbiorka_id):
    """Edit an existing fundraiser (administrators only)."""
    if not current_user.is_admin:
        flash("Brak uprawnień", "danger")
        return redirect(url_for("index"))

    zb = Zbiorka.query.get_or_404(zbiorka_id)
    global_settings = GlobalSettings.query.first()

    def render_form():
        # One place for the template call shared by GET and the error path.
        return render_template(
            "admin/edytuj_zbiorke.html", zbiorka=zb, global_settings=global_settings
        )

    if request.method != "POST":
        return render_form()

    form = request.form
    zb.nazwa = form["nazwa"]
    zb.opis = form["opis"]
    zb.numer_konta = form["numer_konta"]
    zb.numer_telefonu_blik = form["numer_telefonu_blik"]
    try:
        zb.cel = float(form["cel"])
    except ValueError:
        # Re-render with the values typed so far; nothing has been committed.
        flash("Podano nieprawidłową wartość dla celu zbiórki", "danger")
        return render_form()

    zb.ukryj_kwote = "ukryj_kwote" in form
    db.session.commit()
    flash("Zbiórka została zaktualizowana", "success")
    return redirect(url_for("admin_dashboard"))
|
||
|
||
|
||
# TRASA DODAWANIA WPŁATY Z OPISEM
|
||
# TRASA DODAWANIA WPŁATY W PANELU ADMINA
|
||
@app.route("/admin/zbiorka/<int:zbiorka_id>/wplata/dodaj", methods=["GET", "POST"])
@login_required
def dodaj_wplate(zbiorka_id):
    """Record a payment for a fundraiser (administrators only).

    GET renders the form.  POST stores the payment and adds its amount to the
    fundraiser's running total; a non-numeric amount redirects back to the
    form with an error message instead of failing with a server error.
    """
    if not current_user.is_admin:
        flash("Brak uprawnień", "danger")
        return redirect(url_for("index"))
    zb = Zbiorka.query.get_or_404(zbiorka_id)
    if request.method == "POST":
        try:
            kwota = float(request.form["kwota"])
        except ValueError:
            # Same handling and message as edytuj_stan.
            flash("Nieprawidłowa wartość kwoty", "danger")
            return redirect(url_for("dodaj_wplate", zbiorka_id=zbiorka_id))
        opis = request.form.get("opis", "")
        nowa_wplata = Wplata(zbiorka_id=zb.id, kwota=kwota, opis=opis)
        # `stan` is nullable, so treat a missing total as zero.
        zb.stan = (zb.stan or 0.0) + kwota
        db.session.add(nowa_wplata)
        db.session.commit()
        flash("Wpłata została dodana", "success")
        return redirect(url_for("admin_dashboard"))
    return render_template("admin/dodaj_wplate.html", zbiorka=zb)
|
||
|
||
|
||
@app.route("/admin/zbiorka/usun/<int:zbiorka_id>", methods=["POST"])
@login_required
def usun_zbiorka(zbiorka_id):
    """Delete a fundraiser (administrators only)."""
    if current_user.is_admin:
        doomed = Zbiorka.query.get_or_404(zbiorka_id)
        db.session.delete(doomed)
        db.session.commit()
        message, category, endpoint = (
            "Zbiórka została usunięta",
            "success",
            "admin_dashboard",
        )
    else:
        message, category, endpoint = "Brak uprawnień", "danger", "index"

    flash(message, category)
    return redirect(url_for(endpoint))
|
||
|
||
|
||
@app.route("/admin/zbiorka/edytuj_stan/<int:zbiorka_id>", methods=["GET", "POST"])
@login_required
def edytuj_stan(zbiorka_id):
    """Overwrite the collected amount of a fundraiser (administrators only)."""
    if not current_user.is_admin:
        flash("Brak uprawnień", "danger")
        return redirect(url_for("index"))

    fundraiser = Zbiorka.query.get_or_404(zbiorka_id)
    if request.method != "POST":
        return render_template("admin/edytuj_stan.html", zbiorka=fundraiser)

    try:
        parsed_amount = float(request.form["stan"])
    except ValueError:
        flash("Nieprawidłowa wartość kwoty", "danger")
        return redirect(url_for("edytuj_stan", zbiorka_id=zbiorka_id))
    else:
        fundraiser.stan = parsed_amount
        db.session.commit()
        flash("Stan zbiórki został zaktualizowany", "success")
        return redirect(url_for("admin_dashboard"))
|
||
|
||
|
||
@app.route("/admin/zbiorka/zmien_widzialnosc/<int:zbiorka_id>", methods=["POST"])
@login_required
def zmien_widzialnosc(zbiorka_id):
    """Toggle the hidden flag of a fundraiser (administrators only)."""
    if not current_user.is_admin:
        flash("Brak uprawnień", "danger")
        return redirect(url_for("index"))

    fundraiser = Zbiorka.query.get_or_404(zbiorka_id)
    fundraiser.ukryta = not fundraiser.ukryta
    db.session.commit()

    # Read the flag after the commit, as the message describes the new state.
    if fundraiser.ukryta:
        outcome = "ukryta"
    else:
        outcome = "przywrócona"
    flash("Zbiórka została " + outcome, "success")
    return redirect(url_for("admin_dashboard"))
|
||
|
||
|
||
def create_admin_account():
    """Create the main administrator from the config if no admin exists yet."""
    if User.query.filter_by(is_admin=True).first():
        return

    main_admin = User(username=app.config["MAIN_ADMIN_USERNAME"], is_admin=True)
    main_admin.set_password(app.config["MAIN_ADMIN_PASSWORD"])
    db.session.add(main_admin)
    db.session.commit()
|
||
|
||
|
||
@app.after_request
def apply_headers(response):
    """Apply custom, caching and robots headers to every response.

    Order of decisions:
      1. Copy ``ADD_HEADERS`` from the config onto the response.
      2. ``/static/`` paths get static caching headers and conditional
         handling, then return immediately.
      3. Redirects only lose ``Vary`` and return immediately.
      4. 4xx/5xx and ``/admin`` responses are marked non-cacheable; everything
         else gets the default cache policy.
      5. With ``BLOCK_BOTS`` enabled, ``X-Robots-Tag`` is added and
         ``CACHE_CONTROL_HEADER`` (when set) overrides Cache-Control.
    """
    # Custom headers
    custom_headers = app.config.get("ADD_HEADERS", {})
    if isinstance(custom_headers, dict):
        for header, value in custom_headers.items():
            response.headers[header] = str(value)

    if request.path.startswith("/static/"):
        response.headers.pop("Content-Disposition", None)
        response.headers["Vary"] = "Accept-Encoding"
        # Only set the header when configured: assigning a missing (None)
        # value would emit the literal header "Cache-Control: None".
        static_cache = app.config.get("CACHE_CONTROL_HEADER_STATIC")
        if static_cache:
            response.headers["Cache-Control"] = static_cache
        if app.config.get("USE_ETAGS", True) and "ETag" not in response.headers:
            response.add_etag()
        response.make_conditional(request)
        return response

    # Exclusions
    if response.status_code in (301, 302, 303, 307, 308):
        response.headers.pop("Vary", None)
        return response

    if 400 <= response.status_code < 500:
        response.headers["Cache-Control"] = "no-store"
        response.headers["Content-Type"] = "text/html; charset=utf-8"
        response.headers.pop("Vary", None)
    elif 500 <= response.status_code < 600:
        response.headers["Cache-Control"] = "no-store"
        response.headers["Content-Type"] = "text/html; charset=utf-8"
        response.headers["Retry-After"] = "120"
        response.headers.pop("Vary", None)
    elif request.path.startswith("/admin"):
        response.headers.pop("Vary", None)
        response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
    else:
        response.headers["Vary"] = "Cookie, Accept-Encoding"
        default_cache = app.config.get("CACHE_CONTROL_HEADER") or "private, max-age=0"
        response.headers["Cache-Control"] = default_cache

    # Bot blocking (but NOT for /static/)
    if app.config.get("BLOCK_BOTS", False) and not request.path.startswith("/static/"):
        # NOTE(review): this override also replaces the "no-store" set above
        # for error and /admin responses -- confirm that is intended.
        cc_override = app.config.get("CACHE_CONTROL_HEADER")
        if cc_override:
            response.headers["Cache-Control"] = cc_override
        response.headers["X-Robots-Tag"] = (
            app.config.get("ROBOTS_TAG") or "noindex, nofollow, nosnippet, noarchive"
        )

    return response
|
||
|
||
|
||
@app.route("/admin/ustawienia", methods=["GET", "POST"])
@login_required
def admin_ustawienia():
    """Show and update the global settings row (administrators only)."""
    if not current_user.is_admin:
        flash("Brak uprawnień do panelu administracyjnego", "danger")
        return redirect(url_for("index"))

    client_ip = get_real_ip()
    settings = GlobalSettings.query.first()

    if request.method != "POST":
        return render_template(
            "admin/ustawienia.html", settings=settings, client_ip=client_ip
        )

    form = request.form
    navbar_mode = form.get("navbar_brand_mode", "text")
    # Column name -> submitted value, in the order the fields are assigned.
    submitted = {
        "numer_konta": form.get("numer_konta"),
        "numer_telefonu_blik": form.get("numer_telefonu_blik"),
        "allowed_login_hosts": form.get("allowed_login_hosts"),
        "logo_url": form.get("logo_url"),
        "site_title": form.get("site_title"),
        # The legacy boolean simply mirrors the navbar mode.
        "show_logo_in_navbar": navbar_mode == "logo",
        "navbar_brand_mode": navbar_mode,
        "footer_brand_mode": form.get("footer_brand_mode", "text"),
        "footer_text": form.get("footer_text") or None,
    }

    if settings is None:
        # First save: create the settings row.
        settings = GlobalSettings(**submitted)
        db.session.add(settings)
    else:
        for column, value in submitted.items():
            setattr(settings, column, value)

    db.session.commit()
    flash("Ustawienia globalne zostały zaktualizowane", "success")
    return redirect(url_for("admin_dashboard"))
|
||
|
||
|
||
@app.route("/admin/zbiorka/oznacz/<int:zbiorka_id>", methods=["POST"])
@login_required
def oznacz_zbiorka(zbiorka_id):
    """Mark a fundraiser as completed (administrators only)."""
    if current_user.is_admin:
        fundraiser = Zbiorka.query.get_or_404(zbiorka_id)
        fundraiser.zrealizowana = True
        db.session.commit()
        flash("Zbiórka została oznaczona jako zrealizowana", "success")
        destination = "admin_dashboard"
    else:
        flash("Brak uprawnień do wykonania tej operacji", "danger")
        destination = "index"
    return redirect(url_for(destination))
|
||
|
||
|
||
@app.route("/robots.txt")
def robots():
    """Serve robots.txt: disallow everything when BLOCK_BOTS is on, else allow."""
    block_bots = app.config.get("BLOCK_BOTS", False)
    robots_txt = (
        "User-agent: *\nDisallow: /" if block_bots else "User-agent: *\nAllow: /"
    )
    response_headers = {"Content-Type": "text/plain"}
    return robots_txt, 200, response_headers
|
||
|
||
|
||
@app.route("/favicon.ico")
def favicon():
    """Answer favicon requests with an empty 204 (No Content) response."""
    empty_body = ""
    return empty_body, 204
|
||
|
||
|
||
if __name__ == "__main__":
    with app.app_context():
        db.create_all()
        # Create the main admin account if none exists; reuse the helper
        # instead of repeating its body here.
        create_admin_account()
    # NOTE(review): debug=True enables Werkzeug's interactive debugger; keep
    # this entry point for local development only.
    app.run(debug=True)
|