"""HTTP routes for client-IP detection, geo lookup and provider reload."""

from fastapi import APIRouter, Request, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from .deps import get_geo
from .config import settings
from .geo import reload_provider
import secrets
import ipaddress
import re

router = APIRouter()
security = HTTPBasic()

# Headers that carry exactly one client address, checked in this order.
VENDOR_SINGLE_IP_HEADERS = [
    "cf-connecting-ip",     # Cloudflare
    "true-client-ip",       # Akamai/F5
    "x-cluster-client-ip",  # some load balancers
    "x-real-ip",            # the classic (nginx/traefik)
]

# Matches one `for=` parameter of a Forwarded header (RFC 7239). Parameter
# names are case-insensitive; the value is either a quoted string or a token.
_FORWARDED_FOR_RE = re.compile(
    r'(?:^|[;,\s])for=(?P<val>"[^"]+"|[^;,\s]+)',
    re.IGNORECASE,
)


def _check_admin(creds: HTTPBasicCredentials):
    """Validate HTTP Basic credentials against the configured admin account.

    Returns:
        True when the credentials match.

    Raises:
        HTTPException: 403 when no admin credentials are configured,
            401 when the supplied credentials do not match.
    """
    user = settings.admin_user
    pwd = settings.admin_pass
    if not user or not pwd:
        raise HTTPException(status_code=403, detail='admin credentials not configured')

    # Constant-time compare. Operands are encoded to bytes because
    # secrets.compare_digest() raises TypeError on non-ASCII str input.
    # Both comparisons are always evaluated (no short-circuit) so the response
    # time does not reveal whether the username alone was correct.
    user_ok = secrets.compare_digest(creds.username.encode("utf-8"), user.encode("utf-8"))
    pwd_ok = secrets.compare_digest(creds.password.encode("utf-8"), pwd.encode("utf-8"))
    if not (user_ok and pwd_ok):
        raise HTTPException(
            status_code=401,
            detail='invalid credentials',
            headers={"WWW-Authenticate": "Basic"},
        )
    return True


def _normalize_ip_str(ip_raw: str | None) -> str | None:
    """Strip whitespace, surrounding quotes, IPv6 brackets and a port suffix.

    Handles ``1.2.3.4:5678``, ``"1.2.3.4"``, ``[2001:db8::1]`` and
    ``[2001:db8::1]:443``. An IPv6 zone id (``%eth0``) is left in place.
    The result is not validated as an IP address.

    Returns:
        The cleaned string, or None when nothing is left.
    """
    if not ip_raw:
        return None
    value = ip_raw.strip().strip('"').strip("'").strip()
    if value.startswith("["):
        # Bracketed IPv6, optionally followed by ":port".
        closing = value.find("]")
        if closing != -1:
            value = value[1:closing]
    elif value.count(":") == 1:
        # Exactly one colon: most likely IPv4:port.
        value = value.split(":", 1)[0]
    return value or None


def _is_ip_trusted(ip_str: str) -> bool:
    """Return True when *ip_str* belongs to one of ``settings.trusted_proxies``.

    Unparseable input is treated as untrusted. Entries of
    ``settings.trusted_proxies`` are presumably ``ipaddress`` network
    objects (TODO confirm); an entry that does not support the membership
    test is skipped.
    """
    try:
        ip = ipaddress.ip_address(ip_str.split('%')[0])
    except ValueError:
        return False
    for net in settings.trusted_proxies:
        try:
            if ip in net:
                return True
        except (TypeError, ValueError, AttributeError):
            continue
    return False


def _extract_from_forwarded(header_value: str) -> list[str]:
    """Return the ``for=`` values of a Forwarded header, in header order.

    Example header::

        Forwarded: for=192.0.2.43, for="[2001:db8:cafe::17]";proto=http;by=...

    Surrounding quotes are removed; brackets and ports are left for
    ``_normalize_ip_str``.
    """
    return [
        match.group('val').strip('"').strip("'")
        for match in _FORWARDED_FOR_RE.finditer(header_value)
    ]


def _client_from_chain(hops: list[str]) -> str | None:
    """Pick the client address from a ``client, proxy1, proxy2`` hop list.

    With trusted proxies configured, trusted hops are dropped from the RIGHT
    and the right-most untrusted hop is returned. If every hop is trusted, or
    no trusted proxies are configured, the left-most hop is returned.
    """
    if not hops:
        return None
    if settings.trusted_proxies:
        for hop in reversed(hops):
            if not _is_ip_trusted(hop):
                return hop  # last untrusted hop = client
    return hops[0]


def get_client_ip(request: Request) -> str:
    """Return the client IP for *request*.

    Sources, in priority order:

    0. CF-Connecting-IP / True-Client-IP / X-Cluster-Client-Ip / X-Real-IP
    1. X-Forwarded-For (``client, proxy1, proxy2`` order)
    2. Forwarded: for=... (RFC 7239)
    3. ``request.client.host``

    For the hop lists (1 and 2) trusted proxies are stripped from the right
    and the last remaining address is returned; when nothing remains the
    left-most address is returned. Falls back to ``"0.0.0.0"`` when no
    source yields an address.
    """
    # NOTE(review): these headers are client-controlled unless the edge proxy
    # overwrites them; the direct peer is not checked against trusted_proxies
    # before they are honoured.

    # 0) Vendor single-IP headers (preferred when present and not a trusted proxy).
    for header in VENDOR_SINGLE_IP_HEADERS:
        ip = _normalize_ip_str(request.headers.get(header))
        if ip and (not settings.trusted_proxies or not _is_ip_trusted(ip)):
            return ip
        # If the vendor header names a trusted proxy, keep looking.

    # 1) X-Forwarded-For (client, proxy1, proxy2...)
    xff = request.headers.get("x-forwarded-for")
    if xff:
        hops = [ip for part in xff.split(",") if (ip := _normalize_ip_str(part))]
        client = _client_from_chain(hops)
        if client:
            return client

    # 2) Forwarded (RFC 7239)
    fwd = request.headers.get("forwarded")
    if fwd:
        hops = [
            ip for raw in _extract_from_forwarded(fwd) if (ip := _normalize_ip_str(raw))
        ]
        client = _client_from_chain(hops)
        if client:
            return client

    # 3) Fallback: the socket peer (request.client may be None).
    peer = request.client
    host = getattr(peer, "host", None) if peer is not None else None
    if host:
        return host.split('%')[0]
    return "0.0.0.0"


@router.get('/ip')
async def my_ip(request: Request, geo=Depends(get_geo)):
    """Look up geo data for the caller's own IP address."""
    # Drop any IPv6 zone id (e.g. fe80::1%eth0) before the lookup.
    ip = get_client_ip(request).split('%')[0]
    return geo.lookup(ip)


@router.get('/ip/{ip_address}')
async def ip_lookup(ip_address: str, geo=Depends(get_geo)):
    """Look up geo data for an explicit IP address.

    Raises:
        HTTPException: 400 when *ip_address* is not a valid IPv4/IPv6 address.
    """
    # Allow an IPv6 zone index in the path but strip it for validation and
    # lookup, consistent with the /ip route.
    addr = ip_address.split('%')[0]
    try:
        ipaddress.ip_address(addr)
    except ValueError:
        raise HTTPException(status_code=400, detail='invalid IP address') from None
    return geo.lookup(addr)


@router.post('/reload')
async def reload(creds: HTTPBasicCredentials = Depends(security)):
    """Reload the geo provider; requires admin HTTP Basic credentials."""
    _check_admin(creds)
    provider = reload_provider()
    return {'reloaded': True, 'provider': type(provider).__name__}


@router.get('/health')
async def health():
    """Liveness probe."""
    return {'status': 'ok'}


#from fastapi import Request
#@router.get("/_debug/headers")
#async def debug_headers(request: Request):
#    return {"headers": dict(request.headers)}