commit1 permissions

This commit is contained in:
Mateusz Gruszczyński
2025-09-13 18:14:23 +02:00
parent 5674b4acbf
commit bf1c2e2a29
8 changed files with 557 additions and 81 deletions

314
app.py
View File

@@ -216,7 +216,7 @@ class ShoppingList(db.Model):
expires_at = db.Column(db.DateTime(timezone=True), nullable=True)
owner = db.relationship("User", backref="lists", lazy=True)
is_archived = db.Column(db.Boolean, default=False)
is_public = db.Column(db.Boolean, default=True)
is_public = db.Column(db.Boolean, default=False)
# Relacje
items = db.relationship("Item", back_populates="shopping_list", lazy="select")
@@ -290,6 +290,30 @@ class Receipt(db.Model):
uploaded_by_user = db.relationship("User", backref="uploaded_receipts")
class ListPermission(db.Model):
__tablename__ = "list_permission"
id = db.Column(db.Integer, primary_key=True)
list_id = db.Column(
db.Integer,
db.ForeignKey("shopping_list.id", ondelete="CASCADE"),
nullable=False,
)
user_id = db.Column(
db.Integer,
db.ForeignKey("user.id", ondelete="CASCADE"),
nullable=False,
)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
__table_args__ = (db.UniqueConstraint("list_id", "user_id", name="uq_list_user"),)
ShoppingList.permitted_users = db.relationship(
"User",
secondary="list_permission",
backref=db.backref("permitted_lists", lazy="dynamic"),
lazy="dynamic",
)
def hash_password(password):
pepper = app.config["BCRYPT_PEPPER"]
peppered = (password + pepper).encode("utf-8")
@@ -538,6 +562,10 @@ def redirect_with_flash(
flash(message, category)
return redirect(url_for(endpoint))
def user_permission_subq(user_id):
return db.session.query(ListPermission.list_id).filter(
ListPermission.user_id == user_id
)
def admin_required(f):
@wraps(f)
@@ -1392,21 +1420,27 @@ def favicon():
@app.route("/")
def main_page():
from sqlalchemy import func, or_, case # upewnij się, że masz te importy na górze pliku
perm_subq = (
user_permission_subq(current_user.id)
if current_user.is_authenticated
else None
)
now = datetime.now(timezone.utc)
month_param = request.args.get("m", None)
start = end = None
if month_param in (None, ""):
# brak wyboru -> domyślnie aktualny miesiąc
# domyślnie: bieżący miesiąc
month_str = now.strftime("%Y-%m")
start = datetime(now.year, now.month, 1, tzinfo=timezone.utc)
end = (start + timedelta(days=31)).replace(day=1)
elif month_param == "all":
month_str = "all"
start = end = None
else:
month_str = month_param
try:
@@ -1414,16 +1448,10 @@ def main_page():
start = datetime(year, month, 1, tzinfo=timezone.utc)
end = (start + timedelta(days=31)).replace(day=1)
except ValueError:
# jeśli m ma zły format pokaż wszystko
month_str = "all"
start = end = None
# dalej normalnie używasz date_filter:
def date_filter(query):
if start and end:
query = query.filter(
ShoppingList.created_at >= start, ShoppingList.created_at < end
)
return query
def date_filter(query):
if start and end:
query = query.filter(
@@ -1434,10 +1462,10 @@ def main_page():
if current_user.is_authenticated:
user_lists = (
date_filter(
ShoppingList.query.filter_by(
owner_id=current_user.id, is_archived=False
).filter(
(ShoppingList.expires_at == None) | (ShoppingList.expires_at > now)
ShoppingList.query.filter(
ShoppingList.owner_id == current_user.id,
ShoppingList.is_archived == False,
(ShoppingList.expires_at == None) | (ShoppingList.expires_at > now),
)
)
.order_by(ShoppingList.created_at.desc())
@@ -1450,21 +1478,23 @@ def main_page():
.all()
)
# publiczne cudze + udzielone mi (po list_permission)
public_lists = (
date_filter(
ShoppingList.query.filter(
ShoppingList.is_public == True,
ShoppingList.owner_id != current_user.id,
(
(ShoppingList.expires_at == None)
| (ShoppingList.expires_at > now)
),
ShoppingList.is_archived == False,
(ShoppingList.expires_at == None) | (ShoppingList.expires_at > now),
or_(
ShoppingList.is_public == True,
ShoppingList.id.in_(perm_subq),
),
)
)
.order_by(ShoppingList.created_at.desc())
.all()
)
accessible_lists = public_lists # alias do szablonu: publiczne + udostępnione
else:
user_lists = []
archived_lists = []
@@ -1472,31 +1502,31 @@ def main_page():
date_filter(
ShoppingList.query.filter(
ShoppingList.is_public == True,
(
(ShoppingList.expires_at == None)
| (ShoppingList.expires_at > now)
),
(ShoppingList.expires_at == None) | (ShoppingList.expires_at > now),
ShoppingList.is_archived == False,
)
)
.order_by(ShoppingList.created_at.desc())
.all()
)
accessible_lists = public_lists # dla gościa = tylko publiczne
# Definiujemy widoczny zakres list dla tego użytkownika
# Zakres miesięcy do selektora
if current_user.is_authenticated:
visible_lists_query = ShoppingList.query.filter(
or_(
ShoppingList.owner_id == current_user.id, ShoppingList.is_public == True
ShoppingList.owner_id == current_user.id,
ShoppingList.is_public == True,
ShoppingList.id.in_(perm_subq),
)
)
else:
visible_lists_query = ShoppingList.query.filter(ShoppingList.is_public == True)
# Teraz możemy bezpiecznie pobrać miesiące
month_options = get_active_months_query(visible_lists_query)
all_lists = user_lists + public_lists + archived_lists
# Statystyki dla wszystkich widocznych sekcji
all_lists = user_lists + accessible_lists + archived_lists
all_ids = [l.id for l in all_lists]
if all_ids:
@@ -1504,18 +1534,13 @@ def main_page():
db.session.query(
Item.list_id,
func.count(Item.id).label("total_count"),
func.sum(case((Item.purchased == True, 1), else_=0)).label(
"purchased_count"
),
func.sum(case((Item.not_purchased == True, 1), else_=0)).label(
"not_purchased_count"
),
func.sum(case((Item.purchased == True, 1), else_=0)).label("purchased_count"),
func.sum(case((Item.not_purchased == True, 1), else_=0)).label("not_purchased_count"),
)
.filter(Item.list_id.in_(all_ids))
.group_by(Item.list_id)
.all()
)
stats_map = {
s.list_id: (
s.total_count or 0,
@@ -1535,18 +1560,12 @@ def main_page():
)
for l in all_lists:
total_count, purchased_count, not_purchased_count = stats_map.get(
l.id, (0, 0, 0)
)
total_count, purchased_count, not_purchased_count = stats_map.get(l.id, (0, 0, 0))
l.total_count = total_count
l.purchased_count = purchased_count
l.not_purchased_count = not_purchased_count
l.total_expense = latest_expenses_map.get(l.id, 0)
l.category_badges = [
{"name": c.name, "color": category_to_color(c.name)}
for c in l.categories
]
l.category_badges = [{"name": c.name, "color": category_to_color(c.name)} for c in l.categories]
else:
for l in all_lists:
l.total_count = 0
@@ -1559,6 +1578,7 @@ def main_page():
"main.html",
user_lists=user_lists,
public_lists=public_lists,
accessible_lists=accessible_lists,
archived_lists=archived_lists,
now=now,
timedelta=timedelta,
@@ -1627,6 +1647,48 @@ def edit_my_list(list_id):
next_page = request.args.get("next") or request.referrer
if request.method == "POST":
grant_username = (request.form.get("grant_username") or "").strip().lower()
revoke_user_id = request.form.get("revoke_user_id")
# ——— SZYBKIE AKCJE UPRAWNIEŃ ———
if grant_username:
u = User.query.filter(func.lower(User.username) == grant_username).first()
if not u:
flash("Użytkownik nie istnieje.", "danger")
return redirect(request.url)
if u.id == current_user.id:
flash("Jesteś właścicielem tej listy.", "info")
return redirect(request.url)
exists = (
db.session.query(ListPermission.id)
.filter(
ListPermission.list_id == l.id,
ListPermission.user_id == u.id,
)
.first()
)
if not exists:
db.session.add(ListPermission(list_id=l.id, user_id=u.id))
db.session.commit()
flash(f"Nadano dostęp użytkownikowi „{u.username}”.", "success")
else:
flash("Ten użytkownik już ma dostęp.", "info")
return redirect(request.url)
if revoke_user_id:
try:
uid = int(revoke_user_id)
except ValueError:
flash("Błędny identyfikator użytkownika.", "danger")
return redirect(request.url)
ListPermission.query.filter_by(list_id=l.id, user_id=uid).delete()
db.session.commit()
flash("Odebrano dostęp użytkownikowi.", "success")
return redirect(request.url)
# ——— KONIEC AKCJI UPRAWNIEŃ ———
if "unarchive" in request.form:
l.is_archived = False
db.session.commit()
@@ -1650,7 +1712,7 @@ def edit_my_list(list_id):
flash("Nieprawidłowy format miesiąca", "danger")
return redirect(next_page or url_for("main_page"))
new_title = request.form.get("title", "").strip()
new_title = (request.form.get("title") or "").strip()
is_public = "is_public" in request.form
is_temporary = "is_temporary" in request.form
is_archived = "is_archived" in request.form
@@ -1682,12 +1744,22 @@ def edit_my_list(list_id):
flash("Zaktualizowano dane listy", "success")
return redirect(next_page or url_for("main_page"))
# Użytkownicy z dostępem (do wyświetlenia w szablonie)
permitted_users = (
db.session.query(User)
.join(ListPermission, ListPermission.user_id == User.id)
.where(ListPermission.list_id == l.id)
.order_by(User.username.asc())
.all()
)
return render_template(
"edit_my_list.html",
list=l,
receipts=receipts,
categories=categories,
selected_categories=selected_categories_ids,
permitted_users=permitted_users,
)
@@ -1996,14 +2068,38 @@ def expenses_data():
@app.route("/share/<token>")
@app.route("/guest-list/<int:list_id>")
def shared_list(token=None, list_id=None):
now = datetime.now(timezone.utc)
if token:
shopping_list = ShoppingList.query.filter_by(share_token=token).first_or_404()
if not check_list_public(shopping_list):
# jeśli lista wygasła zablokuj (spójne z resztą aplikacji)
if shopping_list.is_temporary and shopping_list.expires_at and shopping_list.expires_at <= now:
flash("Link wygasł.", "warning")
return redirect(url_for("main_page"))
# >>> KLUCZOWE: pozwól wejść nawet, gdy niepubliczna (bez check_list_public)
list_id = shopping_list.id
# >>> Jeśli zalogowany i nie jest właścicielem — auto-przypisz stałe uprawnienie
if current_user.is_authenticated and current_user.id != shopping_list.owner_id:
# dodaj wpis tylko jeśli go nie ma
exists = (
db.session.query(ListPermission.id)
.filter(
ListPermission.list_id == shopping_list.id,
ListPermission.user_id == current_user.id,
)
.first()
)
if not exists:
db.session.add(
ListPermission(list_id=shopping_list.id, user_id=current_user.id)
)
db.session.commit()
else:
shopping_list = ShoppingList.query.get_or_404(list_id)
total_expense = get_total_expense_for_list(list_id)
shopping_list, items, receipts, expenses, total_expense = get_list_details(list_id)
@@ -2031,6 +2127,7 @@ def shared_list(token=None, list_id=None):
)
@app.route("/copy/<int:list_id>")
@login_required
def copy_list(list_id):
@@ -2852,6 +2949,13 @@ def edit_list(list_id):
joinedload(ShoppingList.categories),
],
)
permitted_users = (
db.session.query(User)
.join(ListPermission, ListPermission.user_id == User.id)
.filter(ListPermission.list_id == shopping_list.id)
.order_by(User.username.asc())
.all()
)
if shopping_list is None:
abort(404)
@@ -3029,6 +3133,7 @@ def edit_list(list_id):
receipts=receipts,
categories=categories,
selected_categories=selected_categories_ids,
permitted_users=permitted_users,
)
@@ -3309,6 +3414,123 @@ def add_suggestion():
return redirect(url_for("list_products"))
# ── Admin: zarządzanie dostępem do list ───────────────────────────────────────
@app.route("/admin/lists-access", methods=["GET", "POST"])
@login_required
def admin_lists_access():
# Prosta autoryzacja admina dostosuj do swojej aplikacji
if not getattr(current_user, "is_admin", False):
abort(403)
# Paginacja
try:
page = int(request.args.get("page", 1))
except ValueError:
page = 1
try:
per_page = int(request.args.get("per_page", 25))
except ValueError:
per_page = 25
per_page = max(1, min(100, per_page))
# Filtrowanie bazowe (bez archiwalnych? tutaj pokazujemy wszystkie)
q = (
ShoppingList.query
.options(db.joinedload(ShoppingList.owner))
.order_by(ShoppingList.created_at.desc())
)
# POST: grant/revoke per-wiersz + zbiorcza zmiana statusów
if request.method == "POST":
action = request.form.get("action")
target_list_id = request.form.get("target_list_id", type=int)
# Grant pojedynczy
if action == "grant" and target_list_id:
login = (request.form.get("grant_username") or "").strip().lower()
l = db.session.get(ShoppingList, target_list_id)
if not l:
flash("Lista nie istnieje.", "danger")
return redirect(request.url)
u = User.query.filter(func.lower(User.username) == login).first()
if not u:
flash("Użytkownik nie istnieje.", "danger")
return redirect(request.url)
exists = (
db.session.query(ListPermission.id)
.filter(ListPermission.list_id == l.id, ListPermission.user_id == u.id)
.first()
)
if not exists:
db.session.add(ListPermission(list_id=l.id, user_id=u.id))
db.session.commit()
flash(f"Nadano dostęp „{u.username}” do listy #{l.id}.", "success")
else:
flash("Ten użytkownik już ma dostęp.", "info")
return redirect(request.url)
# Revoke pojedynczy
if action == "revoke" and target_list_id:
uid = request.form.get("revoke_user_id", type=int)
if uid:
ListPermission.query.filter_by(list_id=target_list_id, user_id=uid).delete()
db.session.commit()
flash("Odebrano dostęp użytkownikowi.", "success")
return redirect(request.url)
# Zbiorcze zapisy statusów (checkboxy wierszy)
if action == "save_changes":
# Zaktualizuj pola is_public / is_temporary / is_archived na podstawie POST
# Wysyłamy identyfikatory wszystkich list widocznych na stronie w ukrytym polu multiple
ids = request.form.getlist("visible_ids", type=int)
if ids:
lists = ShoppingList.query.filter(ShoppingList.id.in_(ids)).all()
posted = request.form
for l in lists:
l.is_public = (posted.get(f"is_public_{l.id}") is not None)
l.is_temporary = (posted.get(f"is_temporary_{l.id}") is not None)
l.is_archived = (posted.get(f"is_archived_{l.id}") is not None)
db.session.commit()
flash("Zapisano zmiany statusów.", "success")
return redirect(request.url)
# Dane do tabeli
pagination = q.paginate(page=page, per_page=per_page, error_out=False)
lists = pagination.items
# Zbierz uprawnionych per lista (1 zapytanie)
list_ids = [l.id for l in lists]
perms = (
db.session.query(
ListPermission.list_id,
User.id.label("uid"),
User.username.label("uname"),
)
.join(User, User.id == ListPermission.user_id)
.filter(ListPermission.list_id.in_(list_ids))
.order_by(User.username.asc())
.all()
)
permitted_by_list = {lid: [] for lid in list_ids}
for lid, uid, uname in perms:
permitted_by_list[lid].append({"id": uid, "username": uname})
# Query-string do paginacji
query_string = f"per_page={per_page}"
return render_template(
"admin/admin_lists_access.html",
lists=lists,
permitted_by_list=permitted_by_list,
page=page,
per_page=per_page,
total_pages=pagination.pages or 1,
query_string=query_string,
)
@app.route("/healthcheck")
def healthcheck():
header_token = request.headers.get("X-Internal-Check")