Files
ip-geo-api/app/api.py
2025-10-09 16:56:58 +02:00

201 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Request, Depends, HTTPException, status, Response
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from .deps import get_geo
from .config import settings
from .geo import reload_provider
import secrets
import ipaddress
import re
import json
# Router that collects every endpoint declared in this module.
router = APIRouter()
# HTTP Basic auth dependency; the /reload endpoint uses it to obtain credentials.
security = HTTPBasic()
# Headers that carry a single client IP. get_client_ip checks them in this order.
VENDOR_SINGLE_IP_HEADERS = [
    "cf-connecting-ip",  # Cloudflare
    "true-client-ip",  # Akamai/F5
    "x-cluster-client-ip",  # some load balancers
    "x-real-ip",  # the classic one (nginx/traefik)
]
def _check_admin(creds: HTTPBasicCredentials):
    """Validate HTTP Basic credentials against the configured admin account.

    Args:
        creds: Username/password pair supplied by the client.

    Returns:
        True when both the username and the password match.

    Raises:
        HTTPException: 403 when no admin user or password is configured,
            401 (with a ``WWW-Authenticate: Basic`` challenge) when the
            supplied credentials do not match.
    """
    user = settings.admin_user
    pwd = settings.admin_pass
    if not user or not pwd:
        raise HTTPException(status_code=403, detail="admin credentials not configured")

    # compare_digest() rejects str arguments containing non-ASCII characters
    # with a TypeError, so compare the UTF-8 bytes instead.
    # Both comparisons are evaluated unconditionally so that a wrong username
    # does not skip the password comparison.
    user_ok = secrets.compare_digest(
        creds.username.encode("utf-8"), user.encode("utf-8")
    )
    pwd_ok = secrets.compare_digest(
        creds.password.encode("utf-8"), pwd.encode("utf-8")
    )
    if not (user_ok and pwd_ok):
        raise HTTPException(
            status_code=401,
            detail="invalid credentials",
            headers={"WWW-Authenticate": "Basic"},
        )
    return True
def _normalize_ip_str(ip_raw: str) -> str | None:
"""Usuń port, whitespace i ewentualne cudzysłowy"""
if not ip_raw:
return None
ip_raw = ip_raw.strip().strip('"').strip("'")
# usuń port, np. 1.2.3.4:5678
if ":" in ip_raw and ip_raw.count(":") == 1:
# prawdopodobnie IPv4:port
ip_raw = ip_raw.split(":")[0]
# Pozostaw kwestie IPv6 z %zone
return ip_raw
def _is_ip_trusted(ip_str: str) -> bool:
    """Tell whether *ip_str* belongs to one of ``settings.trusted_proxies``.

    Everything from the first ``%`` onwards is dropped before parsing.
    A value that cannot be parsed as an IP address is never trusted.
    """
    try:
        bare_address = ip_str.split("%")[0]
        address = ipaddress.ip_address(bare_address)
    except Exception:
        return False

    for network in settings.trusted_proxies:
        try:
            matched = address in network
        except Exception:
            # An entry that cannot be tested for membership is skipped.
            continue
        if matched:
            return True
    return False
def _extract_from_forwarded(header_value: str) -> list[str]:
# Forwarded: for=192.0.2.43, for="[2001:db8:cafe::17]";proto=http;by=...
ips = []
parts = re.split(r",\s*(?=[fF]or=)", header_value)
for part in parts:
m = re.search(r'for=(?P<val>"[^"]+"|[^;,\s]+)', part)
if m:
val = m.group("val").strip('"').strip("'")
ips.append(val)
return ips
def _header_safe(value) -> str:
    """Return *value* as text that can be encoded as an HTTP header value.

    Text that fits Latin-1 is returned unchanged; anything else is
    percent-encoded from its UTF-8 bytes so encoding the header cannot fail.
    """
    from urllib.parse import quote  # local import keeps this block self-contained

    text = str(value)
    try:
        text.encode("latin-1")
    except UnicodeEncodeError:
        return quote(text)
    return text


def geo_headers(data: dict) -> dict:
    """Build the ``X-IP-ADDRESS`` / ``X-COUNTRY`` / ``X-CITY`` response headers.

    Args:
        data: Lookup result; the keys read are ``ip``, ``city`` and
            ``country`` (a mapping with a ``name`` entry).

    Returns:
        A dict of header names to values. It is empty unless both the IP and
        the country name are present.
    """
    country = (data.get("country") or {}).get("name")
    ip_val = data.get("ip")
    city = data.get("city")

    headers = {}
    if ip_val and country:
        headers["X-IP-ADDRESS"] = _header_safe(ip_val)
        headers["X-COUNTRY"] = _header_safe(country)
        # NOTE(review): the city header is emitted only together with the
        # IP/country pair - confirm this nesting matches the intended behavior.
        if city:
            headers["X-CITY"] = _header_safe(city)
    return headers
def _client_from_chain(chain: list[str]) -> str:
    """Pick the client address from a non-empty ``client, proxy1, proxy2`` list."""
    if settings.trusted_proxies:
        # Walk from the right-most hop and stop at the first one that is not
        # a trusted proxy: that hop is the client.
        for hop in reversed(chain):
            if not _is_ip_trusted(hop):
                return hop
    # No trusted-proxy list configured, or every hop is trusted:
    # fall back to the left-most entry.
    return chain[0]


def get_client_ip(request: Request) -> str:
    """Return the client IP for *request*, taking proxy headers into account.

    Sources are consulted in this order:

    0. The single-IP headers in ``VENDOR_SINGLE_IP_HEADERS``. The first value
       is used unless trusted proxies are configured and it is one of them.
    1. ``X-Forwarded-For`` (``client, proxy1, proxy2``).
    2. ``Forwarded`` (``for=...``).
    3. ``request.client.host``.

    For the two list headers, trusted proxies are stripped from the right and
    the last remaining hop is returned. If every hop is trusted, or no trusted
    proxies are configured, the left-most hop is returned.

    Returns:
        The selected address text, or ``"0.0.0.0"`` when nothing is available.
        Header values are normalized but not validated as IP addresses.
    """
    # NOTE(security): these headers are accepted from any peer. They are only
    # reliable when clients cannot reach the service without passing through
    # the proxy that sets them.
    for header_name in VENDOR_SINGLE_IP_HEADERS:
        raw_value = request.headers.get(header_name)
        if not raw_value:
            continue
        candidate = _normalize_ip_str(raw_value)
        if candidate and (
            not settings.trusted_proxies or not _is_ip_trusted(candidate)
        ):
            return candidate
        # The header names a trusted proxy (or is empty): keep looking.

    # 1) X-Forwarded-For (client, proxy1, proxy2, ...)
    xff = request.headers.get("x-forwarded-for")
    if xff:
        hops = [ip for ip in map(_normalize_ip_str, xff.split(",")) if ip]
        if hops:
            return _client_from_chain(hops)

    # 2) Forwarded (RFC 7239)
    forwarded = request.headers.get("forwarded")
    if forwarded:
        hops = [
            ip
            for ip in map(_normalize_ip_str, _extract_from_forwarded(forwarded))
            if ip
        ]
        if hops:
            return _client_from_chain(hops)

    # 3) Fallback: the address of the direct peer, without any "%zone" suffix.
    client = request.client
    host = client.host if client is not None else None
    if host:
        return host.split("%")[0]
    return "0.0.0.0"
@router.api_route('/ip', methods=["GET", "HEAD"])
async def my_ip(request: Request, geo=Depends(get_geo)):
    """Look up the caller's own address.

    The geo headers are sent for both methods; GET additionally returns the
    lookup result as a JSON document terminated by a newline.
    """
    client_ip = get_client_ip(request)
    geo_data = geo.lookup(client_ip)
    extra_headers = geo_headers(geo_data)

    if request.method != "HEAD":
        payload = json.dumps(geo_data, ensure_ascii=False) + "\n"
        return Response(
            content=payload,
            media_type="application/json",
            headers=extra_headers,
        )
    # HEAD: headers only, no body.
    return Response(status_code=200, headers=extra_headers)
@router.api_route('/ip/{ip_address}', methods=["GET", "HEAD"])
async def ip_lookup(ip_address: str, request: Request, geo=Depends(get_geo)):
    """Look up the address given in the URL path.

    The geo headers are sent for both methods; GET additionally returns the
    lookup result as a JSON document terminated by a newline.

    Args:
        ip_address: Address taken from the path; passed to the lookup as is.
        request: Current request, needed to tell HEAD from GET.
        geo: Geo provider supplied by the ``get_geo`` dependency.
    """
    data = geo.lookup(ip_address)
    headers = geo_headers(data)
    # `request` must be a declared parameter: the handler previously read an
    # undefined name here and failed with NameError on every call.
    if request.method == "HEAD":
        return Response(status_code=200, headers=headers)
    body = json.dumps(data, ensure_ascii=False) + "\n"
    return Response(content=body, media_type="application/json", headers=headers)
@router.post("/reload")
async def reload(creds: HTTPBasicCredentials = Depends(security)):
    """Reload the geo provider after checking the admin credentials.

    Returns a JSON object with the class name of the provider that
    ``reload_provider()`` returned.
    """
    _check_admin(creds)
    provider_name = type(reload_provider()).__name__
    return {"reloaded": True, "provider": provider_name}
@router.get("/health")
async def health():
    """Health endpoint: always answers with a fixed ``status: ok`` payload."""
    payload = {"status": "ok"}
    return payload
# from fastapi import Request
# @router.get("/_debug/headers")
# async def debug_headers(request: Request):
# return {"headers": dict(request.headers)}