"""HTTP routes for client-IP geolocation, provider reload and health check."""

import ipaddress
import re
import secrets

from fastapi import APIRouter, Request, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

from .deps import get_geo
from .config import settings
from .geo import reload_provider

router = APIRouter()
security = HTTPBasic()

# Matches one `for=` parameter of an RFC 7239 Forwarded header; the value is
# either a quoted string or a bare token ending at `;`, `,` or whitespace.
_FORWARDED_FOR_RE = re.compile(r'for=(?P<val>"[^"]+"|[^;,\s]+)', re.IGNORECASE)


def _check_admin(creds: HTTPBasicCredentials) -> bool:
    """Validate HTTP Basic credentials against the configured admin account.

    Returns:
        True when the credentials match.

    Raises:
        HTTPException: 403 when no admin user/password is configured,
            401 (with a ``WWW-Authenticate: Basic`` challenge) on mismatch.
    """
    user = settings.admin_user
    pwd = settings.admin_pass
    if not user or not pwd:
        raise HTTPException(status_code=403, detail='admin credentials not configured')

    # compare_digest() rejects non-ASCII *str* arguments with TypeError, so
    # compare UTF-8 bytes. Both comparisons are always evaluated so that the
    # response time does not reveal which of the two fields was wrong.
    user_ok = secrets.compare_digest(creds.username.encode('utf-8'), user.encode('utf-8'))
    pass_ok = secrets.compare_digest(creds.password.encode('utf-8'), pwd.encode('utf-8'))
    if not (user_ok and pass_ok):
        raise HTTPException(
            status_code=401,
            detail='invalid credentials',
            headers={"WWW-Authenticate": "Basic"},
        )
    return True


def _normalize_ip_str(ip_raw: str) -> str | None:
    """Strip whitespace, surrounding quotes, IPv6 brackets and a port.

    Handles ``1.2.3.4:5678``, ``[2001:db8::1]`` and ``[2001:db8::1]:5678``.
    A bare IPv6 address (several colons, no brackets) is returned untouched,
    including any ``%zone`` suffix.

    Returns:
        The cleaned address text, or None when nothing is left.
    """
    if not ip_raw:
        return None
    value = ip_raw.strip().strip('"').strip("'")

    # Bracketed IPv6, optionally followed by ":port" (RFC 7239 node syntax).
    if value.startswith('['):
        closing = value.find(']')
        if closing != -1:
            return value[1:closing] or None

    # Exactly one colon: most likely IPv4:port, so drop the port.
    if value.count(':') == 1:
        value = value.split(':')[0]
    return value or None


def _is_ip_trusted(ip_str: str) -> bool:
    """Return True when *ip_str* belongs to any of ``settings.trusted_proxies``.

    Unparsable input is never trusted. Entries of ``trusted_proxies`` are
    presumably ``ipaddress`` network objects (defined in config, not visible
    here); entries that do not support the membership test are skipped.
    """
    try:
        ip = ipaddress.ip_address(ip_str.split('%')[0])
    except (ValueError, AttributeError):
        return False
    for net in settings.trusted_proxies:
        try:
            if ip in net:
                return True
        except Exception:
            # Best effort: a malformed config entry must not break requests.
            continue
    return False


def _extract_from_forwarded(header_value: str) -> list[str]:
    """Collect the values of all ``for=`` parameters of a Forwarded header.

    Example header:
        ``Forwarded: for=192.0.2.43, for="[2001:db8:cafe::17]";proto=http;by=...``

    Returns:
        The ``for`` values in header order with surrounding quotes removed.
        IPv6 brackets and ports are left in place for the caller to normalize.
    """
    return [
        match.group('val').strip('"').strip("'")
        for match in _FORWARDED_FOR_RE.finditer(header_value)
    ]


def _normalize_all(raw_values: list[str]) -> list[str]:
    """Normalize each raw address and drop the ones that end up empty."""
    return [ip for raw in raw_values if (ip := _normalize_ip_str(raw))]


def _pick_client_ip(candidates: list[str]) -> str | None:
    """Pick the client address from an ordered (client-first) candidate list.

    With trusted proxies configured, the first candidate that is not a
    trusted proxy wins; if every candidate is trusted, or no proxies are
    configured, the first (left-most) candidate is used.

    Returns:
        The chosen address, or None for an empty list.
    """
    if not candidates:
        return None
    if settings.trusted_proxies:
        # NOTE(review): scanning left-to-right trusts client-supplied leading
        # entries; scanning from the right is the spoof-resistant variant.
        # Order kept as-is to preserve existing behavior.
        for ip in candidates:
            if not _is_ip_trusted(ip):
                return ip
    return candidates[0]


def get_client_ip(request: Request) -> str:
    """Determine the caller's IP address from proxy headers or the socket peer.

    Sources are consulted in this order: ``X-Forwarded-For``, ``Forwarded``
    (RFC 7239), ``X-Real-IP``, then ``request.client.host``.

    Returns:
        The selected address as text, or ``"0.0.0.0"`` when none is available.
    """
    # 1) X-Forwarded-For: "client, proxy1, proxy2"
    xff = request.headers.get("x-forwarded-for")
    if xff:
        chosen = _pick_client_ip(_normalize_all(xff.split(",")))
        if chosen:
            return chosen

    # 2) Forwarded (RFC 7239)
    fwd = request.headers.get("forwarded")
    if fwd:
        chosen = _pick_client_ip(_normalize_all(_extract_from_forwarded(fwd)))
        if chosen:
            return chosen

    # 3) X-Real-IP: ignored when it merely names one of our trusted proxies.
    xri = request.headers.get("x-real-ip")
    if xri:
        real_ip = _normalize_ip_str(xri)
        if real_ip and not (settings.trusted_proxies and _is_ip_trusted(real_ip)):
            return real_ip

    # Fallback: the direct peer (no reverse proxy, or no usable header).
    # request.client may be None, hence getattr with a default.
    host = getattr(request.client, "host", None)
    if host:
        return host.split('%')[0]
    return "0.0.0.0"


@router.get('/ip')
async def my_ip(request: Request, geo=Depends(get_geo)):
    """Look up geolocation data for the calling client's address."""
    ip = get_client_ip(request)
    # Drop an IPv6 zone index (e.g. "fe80::1%eth0") before the lookup.
    ip = ip.split('%')[0]
    return geo.lookup(ip)


@router.get('/ip/{ip_address}')
async def ip_lookup(ip_address: str, geo=Depends(get_geo)):
    """Look up geolocation data for an explicitly supplied address.

    Raises:
        HTTPException: 400 when *ip_address* is not a valid IPv4/IPv6 address.
    """
    # An IPv6 zone index is accepted but stripped for validation and lookup,
    # so this route passes the same address form to the provider as /ip.
    addr = ip_address.split('%')[0]
    try:
        ipaddress.ip_address(addr)
    except ValueError:
        raise HTTPException(status_code=400, detail='invalid IP address') from None
    return geo.lookup(addr)


@router.post('/reload')
async def reload(creds: HTTPBasicCredentials = Depends(security)):
    """Reload the geo provider; requires valid admin Basic credentials."""
    _check_admin(creds)
    provider = reload_provider()
    return {'reloaded': True, 'provider': type(provider).__name__}


@router.get('/health')
async def health():
    """Liveness probe; always reports ``ok``."""
    return {'status': 'ok'}