from fastapi import APIRouter, Request, Depends, HTTPException, status, Response from fastapi.security import HTTPBasic, HTTPBasicCredentials from .deps import get_geo from .config import settings from .geo import reload_provider import secrets import ipaddress import re import json router = APIRouter() security = HTTPBasic() VENDOR_SINGLE_IP_HEADERS = [ "cf-connecting-ip", # Cloudflare "true-client-ip", # Akamai/F5 "x-cluster-client-ip", # niektóre load balancery "x-real-ip", # klasyk (nginx/traefik) ] def _check_admin(creds: HTTPBasicCredentials): user = settings.admin_user pwd = settings.admin_pass if not user or not pwd: raise HTTPException(status_code=403, detail="admin credentials not configured") # constant-time compare if not ( secrets.compare_digest(creds.username, user) and secrets.compare_digest(creds.password, pwd) ): raise HTTPException( status_code=401, detail="invalid credentials", headers={"WWW-Authenticate": "Basic"}, ) return True def _normalize_ip_str(ip_raw: str) -> str | None: """Usuń port, whitespace i ewentualne cudzysłowy""" if not ip_raw: return None ip_raw = ip_raw.strip().strip('"').strip("'") # usuń port, np. 1.2.3.4:5678 if ":" in ip_raw and ip_raw.count(":") == 1: # prawdopodobnie IPv4:port ip_raw = ip_raw.split(":")[0] # Pozostaw kwestie IPv6 z %zone return ip_raw def _is_ip_trusted(ip_str: str) -> bool: try: ip = ipaddress.ip_address(ip_str.split("%")[0]) except Exception: return False for net in settings.trusted_proxies: try: if ip in net: return True except Exception: continue return False def _extract_from_forwarded(header_value: str) -> list[str]: # Forwarded: for=192.0.2.43, for="[2001:db8:cafe::17]";proto=http;by=... ips = [] parts = re.split(r",\s*(?=[fF]or=)", header_value) for part in parts: m = re.search(r'for=(?P"[^"]+"|[^;,\s]+)', part) if m: val = m.group("val").strip('"').strip("'") ips.append(val) return ips def geo_headers(data: dict) -> dict: h = {} country = data.get("country", {}).get("name") if data.get("country") else None city = data.get("city") ip_val = data.get("ip") if ip_val and country: h["X-IP-ADDRESS"] = ip_val h["X-COUNTRY"] = country if city: h["X-CITY"] = city return h def get_client_ip(request: Request) -> str: """ Zwraca IP klienta biorąc pod uwagę: - CF-Connecting-IP / True-Client-IP / X-Cluster-Client-Ip / X-Real-IP - X-Forwarded-For (RFC7239 semantyka: client, proxy1, proxy2) - Forwarded: for=... Logika XFF: - weź listę IP - zdejmuj od PRAWEJ strony te, które są zaufanymi proxy - zwróć ostatni pozostały (jeśli nic nie zostało, zwróć lewy skrajny) """ # 0) Vendorowe nagłówki z pojedynczym IP (preferowane, jeśli są i nie są zaufane) for h in VENDOR_SINGLE_IP_HEADERS: v = request.headers.get(h) if v: ip = _normalize_ip_str(v) if ip: if not settings.trusted_proxies or not _is_ip_trusted(ip): return ip # jeśli vendor wskazuje zaufane proxy, idź dalej # 1) X-Forwarded-For (client, proxy1, proxy2...) xff = request.headers.get("x-forwarded-for") if xff: raw_ips = [p.strip() for p in xff.split(",") if p.strip()] norm_ips = [] for raw in raw_ips: v = _normalize_ip_str(raw) if v: norm_ips.append(v) if norm_ips: if settings.trusted_proxies: # zdejmuj od PRAWEJ strony zaufane hop’y tmp = norm_ips[:] while tmp and _is_ip_trusted(tmp[-1]): tmp.pop() if tmp: return tmp[-1] # ostatni niezaufany = klient # w skrajnym przypadku wszystkie są zaufane – zwróć najbardziej „kliencki” (lewy) return norm_ips[0] else: # bez listy zaufanych proxy bierzemy lewy skrajny return norm_ips[0] # 2) Forwarded (RFC7239) fwd = request.headers.get("forwarded") if fwd: fwd_ips = _extract_from_forwarded(fwd) norm_ips = [_normalize_ip_str(ip) for ip in fwd_ips if _normalize_ip_str(ip)] if norm_ips: if settings.trusted_proxies: tmp = norm_ips[:] while tmp and _is_ip_trusted(tmp[-1]): tmp.pop() if tmp: return tmp[-1] return norm_ips[0] return norm_ips[0] # 3) Fallback: request.client.host try: host = request.client.host if host: return host.split("%")[0] if "%" in host else host except Exception: pass return "0.0.0.0" @router.api_route('/ip', methods=["GET", "HEAD"]) async def my_ip(request: Request, geo=Depends(get_geo)): ip = get_client_ip(request) # pobieranie IP:contentReference[oaicite:0]{index=0} data = geo.lookup(ip) # geo lookup:contentReference[oaicite:1]{index=1}:contentReference[oaicite:2]{index=2} headers = geo_headers(data) if request.method == "HEAD": return Response(status_code=200, headers=headers) body = json.dumps(data, ensure_ascii=False) + "\n" return Response(content=body, media_type="application/json", headers=headers) @router.api_route('/ip/{ip_address}', methods=["GET", "HEAD"]) async def ip_lookup(ip_address: str, request: Request, geo=Depends(get_geo)): data = geo.lookup(ip_address) headers = geo_headers(data) if request.method == "HEAD": return Response(status_code=200, headers=headers) body = json.dumps(data, ensure_ascii=False) + "\n" return Response(content=body, media_type="application/json", headers=headers) @router.post("/reload") async def reload(creds: HTTPBasicCredentials = Depends(security)): _check_admin(creds) provider = reload_provider() return {"reloaded": True, "provider": type(provider).__name__} @router.get("/health") async def health(): return {"status": "ok"} # from fastapi import Request # @router.get("/_debug/headers") # async def debug_headers(request: Request): # return {"headers": dict(request.headers)}