praca za proxy

This commit is contained in:
Mateusz Gruszczyński
2025-10-06 09:32:14 +02:00
parent feba31ce6f
commit 06b23dcd96

View File

@@ -10,6 +10,13 @@ import re
router = APIRouter() router = APIRouter()
security = HTTPBasic() security = HTTPBasic()
VENDOR_SINGLE_IP_HEADERS = [
"cf-connecting-ip", # Cloudflare
"true-client-ip", # Akamai/F5
"x-cluster-client-ip", # niektóre load balancery
"x-real-ip", # klasyk (nginx/traefik)
]
def _check_admin(creds: HTTPBasicCredentials): def _check_admin(creds: HTTPBasicCredentials):
user = settings.admin_user user = settings.admin_user
pwd = settings.admin_pass pwd = settings.admin_pass
@@ -57,32 +64,48 @@ def _extract_from_forwarded(header_value: str) -> list[str]:
return ips return ips
def get_client_ip(request: Request) -> str: def get_client_ip(request: Request) -> str:
# 1) X-Forwarded-For """
Zwraca IP klienta biorąc pod uwagę:
- CF-Connecting-IP / True-Client-IP / X-Cluster-Client-Ip / X-Real-IP
- X-Forwarded-For (RFC7239 semantyka: client, proxy1, proxy2)
- Forwarded: for=...
Logika XFF:
- weź listę IP
- zdejmuj od PRAWEJ strony te, które są zaufanymi proxy
- zwróć ostatni pozostały (jeśli nic nie zostało, zwróć lewy skrajny)
"""
# 0) Vendorowe nagłówki z pojedynczym IP (preferowane, jeśli są i nie są zaufane)
for h in VENDOR_SINGLE_IP_HEADERS:
v = request.headers.get(h)
if v:
ip = _normalize_ip_str(v)
if ip:
if not settings.trusted_proxies or not _is_ip_trusted(ip):
return ip
# jeśli vendor wskazuje zaufane proxy, idź dalej
# 1) X-Forwarded-For (client, proxy1, proxy2...)
xff = request.headers.get("x-forwarded-for") xff = request.headers.get("x-forwarded-for")
if xff: if xff:
# XFF: client, proxy1, proxy2
raw_ips = [p.strip() for p in xff.split(",") if p.strip()] raw_ips = [p.strip() for p in xff.split(",") if p.strip()]
# Normalizuj i usuń porty
norm_ips = [] norm_ips = []
for raw in raw_ips: for raw in raw_ips:
v = _normalize_ip_str(raw) v = _normalize_ip_str(raw)
if v: if v:
norm_ips.append(v) norm_ips.append(v)
# jeśli mamy zaufane proxy -> zwracamy pierwsze IP, które NIE jest zaufane
if settings.trusted_proxies: if norm_ips:
for ip in norm_ips: if settings.trusted_prox ies:
try: # zdejmuj od PRAWEJ strony zaufane hopy
# ignoruj jeżeli to zaufane proxy tmp = norm_ips[:]
if not _is_ip_trusted(ip): while tmp and _is_ip_trusted(tmp[-1]):
return ip tmp.pop()
except Exception: if tmp:
continue return tmp[-1] # ostatni niezaufany = klient
# jeśli wszystkie były zaufane, zwróć pierwsze (najbardziej "client" lub lewo) # w skrajnym przypadku wszystkie zaufane zwróć najbardziej „kliencki” (lewy)
if norm_ips:
return norm_ips[0] return norm_ips[0]
else: else:
# bez zaufanych proxy: przyjmujemy lewą (first) jako klienta # bez listy zaufanych proxy bierzemy lewy skrajny
if norm_ips:
return norm_ips[0] return norm_ips[0]
# 2) Forwarded (RFC7239) # 2) Forwarded (RFC7239)
@@ -92,24 +115,15 @@ def get_client_ip(request: Request) -> str:
norm_ips = [_normalize_ip_str(ip) for ip in fwd_ips if _normalize_ip_str(ip)] norm_ips = [_normalize_ip_str(ip) for ip in fwd_ips if _normalize_ip_str(ip)]
if norm_ips: if norm_ips:
if settings.trusted_proxies: if settings.trusted_proxies:
for ip in norm_ips: tmp = norm_ips[:]
if not _is_ip_trusted(ip): while tmp and _is_ip_trusted(tmp[-1]):
return ip tmp.pop()
if tmp:
return tmp[-1]
return norm_ips[0] return norm_ips[0]
return norm_ips[0] return norm_ips[0]
# 3) X-Real-IP # 3) Fallback: request.client.host
xri = request.headers.get("x-real-ip")
if xri:
v = _normalize_ip_str(xri)
if v:
if settings.trusted_proxies and _is_ip_trusted(v):
# jeśli header wskazuje zaufane proxy - nie używamy go jako klienta
pass
else:
return v
# Fallback: request.client.host (np. bez reverse-proxy lub jeśli nic innego)
try: try:
host = request.client.host host = request.client.host
if host: if host: