Files
ip-geo-api/app/api.py
Mateusz Gruszczyński 07a190d067 praca za proxy
2025-10-06 10:03:25 +02:00

174 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Request, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from .deps import get_geo
from .config import settings
from .geo import reload_provider
import secrets
import ipaddress
import re
# Router that collects the endpoints declared in this module.
router = APIRouter()
# HTTP Basic scheme; used as a dependency by the /reload endpoint.
security = HTTPBasic()
# Vendor-specific headers that carry a single client IP.
# get_client_ip() probes them in this order before X-Forwarded-For / Forwarded.
VENDOR_SINGLE_IP_HEADERS = [
    "cf-connecting-ip",  # Cloudflare
    "true-client-ip",  # Akamai/F5
    "x-cluster-client-ip",  # some load balancers
    "x-real-ip",  # the classic one (nginx/traefik)
]
def _check_admin(creds: HTTPBasicCredentials):
    """Validate HTTP Basic credentials against the configured admin account.

    Args:
        creds: Credentials parsed from the request's ``Authorization`` header.

    Returns:
        ``True`` when both username and password match.

    Raises:
        HTTPException: 403 when ``settings.admin_user`` or
            ``settings.admin_pass`` is empty; 401 (with a ``Basic``
            challenge) when the supplied credentials do not match.
    """
    user = settings.admin_user
    pwd = settings.admin_pass
    if not user or not pwd:
        raise HTTPException(status_code=403, detail='admin credentials not configured')
    # compare_digest() raises TypeError for str arguments containing non-ASCII
    # characters, so compare UTF-8 bytes instead of raw strings.
    # Both comparisons are evaluated unconditionally so that a wrong username
    # does not skip the password comparison (no short-circuit timing signal).
    user_ok = secrets.compare_digest(creds.username.encode("utf-8"), user.encode("utf-8"))
    pwd_ok = secrets.compare_digest(creds.password.encode("utf-8"), pwd.encode("utf-8"))
    if not (user_ok and pwd_ok):
        raise HTTPException(
            status_code=401,
            detail='invalid credentials',
            headers={"WWW-Authenticate": "Basic"},
        )
    return True
def _normalize_ip_str(ip_raw: str) -> str | None:
"""Usuń port, whitespace i ewentualne cudzysłowy"""
if not ip_raw:
return None
ip_raw = ip_raw.strip().strip('"').strip("'")
# usuń port, np. 1.2.3.4:5678
if ':' in ip_raw and ip_raw.count(':') == 1:
# prawdopodobnie IPv4:port
ip_raw = ip_raw.split(':')[0]
# Pozostaw kwestie IPv6 z %zone
return ip_raw
def _is_ip_trusted(ip_str: str) -> bool:
    """Tell whether *ip_str* lies inside any entry of ``settings.trusted_proxies``.

    An IPv6 zone suffix (``%eth0``) is dropped before parsing. A value that
    cannot be parsed as an IP address is reported as not trusted, and an
    entry whose membership test raises is skipped.
    """
    try:
        address = ipaddress.ip_address(ip_str.split('%')[0])
    except Exception:
        return False

    def _covers(network) -> bool:
        # A failing membership test counts as "no match" for this entry.
        try:
            return address in network
        except Exception:
            return False

    return any(_covers(network) for network in settings.trusted_proxies)
def _extract_from_forwarded(header_value: str) -> list[str]:
# Forwarded: for=192.0.2.43, for="[2001:db8:cafe::17]";proto=http;by=...
ips = []
parts = re.split(r',\s*(?=[fF]or=)', header_value)
for part in parts:
m = re.search(r'for=(?P<val>"[^"]+"|[^;,\s]+)', part)
if m:
val = m.group('val').strip('"').strip("'")
ips.append(val)
return ips
def _client_from_hop_chain(hops: list[str]) -> str | None:
    """Pick the client address from a chain ordered ``client, proxy1, proxy2``.

    With trusted proxies configured, trusted hops are skipped from the RIGHT
    and the first untrusted one is returned. If every hop is trusted, or no
    trusted proxies are configured, the left-most hop is returned.
    Returns ``None`` for an empty chain.
    """
    if not hops:
        return None
    if settings.trusted_proxies:
        for hop in reversed(hops):
            if not _is_ip_trusted(hop):
                return hop
    return hops[0]


def get_client_ip(request: Request) -> str:
    """Return the client IP for *request*, taking proxy headers into account.

    Sources, in order of precedence:
      0. CF-Connecting-IP / True-Client-IP / X-Cluster-Client-Ip / X-Real-IP
         (first one present whose value is not a trusted proxy).
      1. X-Forwarded-For (``client, proxy1, proxy2``).
      2. Forwarded (RFC 7239, ``for=...``).
      3. The peer address of the connection (``request.client.host``).

    For the list-style headers (1 and 2) trusted proxies are stripped from the
    right; the last remaining hop is the client. If nothing remains, or no
    trusted proxies are configured, the left-most hop is used.

    Returns:
        The selected address with no validation applied to header-derived
        values; ``"0.0.0.0"`` when no source yields anything.

    NOTE(review): the headers are honoured without checking that the direct
    peer is itself a trusted proxy, so a client connecting directly can
    supply any of them - confirm the service is only reachable via the proxy.
    """
    headers = request.headers

    # 0) Vendor headers carrying a single IP. A value that points at a
    #    trusted proxy is ignored and the next source is tried.
    for header_name in VENDOR_SINGLE_IP_HEADERS:
        raw_value = headers.get(header_name)
        if not raw_value:
            continue
        ip = _normalize_ip_str(raw_value)
        if ip and (not settings.trusted_proxies or not _is_ip_trusted(ip)):
            return ip

    # 1) X-Forwarded-For (client, proxy1, proxy2...)
    xff = headers.get("x-forwarded-for")
    if xff:
        normalized = (_normalize_ip_str(part.strip()) for part in xff.split(","))
        client = _client_from_hop_chain([ip for ip in normalized if ip])
        if client:
            return client

    # 2) Forwarded (RFC 7239)
    fwd = headers.get("forwarded")
    if fwd:
        normalized = (_normalize_ip_str(value) for value in _extract_from_forwarded(fwd))
        client = _client_from_hop_chain([ip for ip in normalized if ip])
        if client:
            return client

    # 3) Fallback: the socket peer. request.client may be None.
    peer = request.client
    host = peer.host if peer else None
    if host:
        # Drop an IPv6 zone suffix such as "%eth0".
        return host.split('%')[0]
    return "0.0.0.0"
@router.get('/ip')
async def my_ip(request: Request, geo=Depends(get_geo)):
    """Look up geo data for the caller's own address.

    The address comes from get_client_ip(); an IPv6 zone suffix
    (``%eth0``) is removed before it is handed to ``geo.lookup``.
    """
    client_ip = get_client_ip(request)
    # Only the zone id is stripped here; IPv4-mapped IPv6 addresses such as
    # ::ffff:1.2.3.4 are passed through unchanged.
    zone_free_ip, _, _ = client_ip.partition('%')
    return geo.lookup(zone_free_ip)
@router.get('/ip/{ip_address}')
async def ip_lookup(ip_address: str, geo=Depends(get_geo)):
    """Look up geo data for an explicitly supplied IP address.

    Args:
        ip_address: IPv4 or IPv6 address from the URL path; an IPv6 zone
            suffix (``%eth0``) is accepted and ignored.
        geo: Geo provider injected via ``get_geo``.

    Returns:
        Whatever ``geo.lookup`` returns for the address.

    Raises:
        HTTPException: 400 when the value is not a valid IP address.
    """
    # Strip the zone index once, then validate AND look up the same value,
    # so the provider never receives the "%zone" suffix (as in /ip).
    addr = ip_address.split('%')[0]
    try:
        ipaddress.ip_address(addr)
    except ValueError:
        raise HTTPException(status_code=400, detail='invalid IP address') from None
    return geo.lookup(addr)
@router.post('/reload')
async def reload(creds: HTTPBasicCredentials = Depends(security)):
    """Reload the geo provider; requires admin HTTP Basic credentials.

    _check_admin() raises before anything is reloaded when the credentials
    are rejected. The response reports the class name of the provider
    returned by reload_provider().
    """
    _check_admin(creds)
    provider_cls = type(reload_provider())
    return {'reloaded': True, 'provider': provider_cls.__name__}
@router.get('/health')
async def health():
    """Liveness probe: always answers with a fixed ``status: ok`` payload."""
    return dict(status='ok')
#from fastapi import Request
#@router.get("/_debug/headers")
#async def debug_headers(request: Request):
# return {"headers": dict(request.headers)}