diff --git a/.env.example b/.env.example index 1cae0cd..53719c7 100644 --- a/.env.example +++ b/.env.example @@ -29,3 +29,6 @@ CACHE_MAXSIZE=4096 # Log LOG_LEVEL=info + +# Proxy +TRUSTED_PROXIES="10.0.0.0/8,127.0.0.1" \ No newline at end of file diff --git a/app/api.py b/app/api.py index aa97c05..a904bb8 100644 --- a/app/api.py +++ b/app/api.py @@ -5,6 +5,7 @@ from .config import settings from .geo import reload_provider import secrets import ipaddress +import re router = APIRouter() security = HTTPBasic() @@ -19,9 +20,108 @@ def _check_admin(creds: HTTPBasicCredentials): raise HTTPException(status_code=401, detail='invalid credentials', headers={"WWW-Authenticate":"Basic"}) return True +def _normalize_ip_str(ip_raw: str) -> str | None: + """Usuń port, whitespace i ewentualne cudzysłowy""" + if not ip_raw: + return None + ip_raw = ip_raw.strip().strip('"').strip("'") + # usuń port, np. 1.2.3.4:5678 + if ':' in ip_raw and ip_raw.count(':') == 1: + # prawdopodobnie IPv4:port + ip_raw = ip_raw.split(':')[0] + # Pozostaw kwestie IPv6 z %zone + return ip_raw + +def _is_ip_trusted(ip_str: str) -> bool: + try: + ip = ipaddress.ip_address(ip_str.split('%')[0]) + except Exception: + return False + for net in settings.trusted_proxies: + try: + if ip in net: + return True + except Exception: + continue + return False + +def _extract_from_forwarded(header_value: str) -> list[str]: + # Forwarded: for=192.0.2.43, for="[2001:db8:cafe::17]";proto=http;by=... + ips = [] + parts = re.split(r',\s*(?=[fF]or=)', header_value) + for part in parts: + m = re.search(r'for=(?P"[^"]+"|[^;,\s]+)', part) + if m: + val = m.group('val').strip('"').strip("'") + ips.append(val) + return ips + +def get_client_ip(request: Request) -> str: + # 1) X-Forwarded-For + xff = request.headers.get("x-forwarded-for") + if xff: + # XFF: client, proxy1, proxy2 + raw_ips = [p.strip() for p in xff.split(",") if p.strip()] + # Normalizuj i usuń porty + norm_ips = [] + for raw in raw_ips: + v = _normalize_ip_str(raw) + if v: + norm_ips.append(v) + # jeśli mamy zaufane proxy -> zwracamy pierwsze IP, które NIE jest zaufane + if settings.trusted_proxies: + for ip in norm_ips: + try: + # ignoruj jeżeli to zaufane proxy + if not _is_ip_trusted(ip): + return ip + except Exception: + continue + # jeśli wszystkie były zaufane, zwróć pierwsze (najbardziej "client" lub lewo) + if norm_ips: + return norm_ips[0] + else: + # bez zaufanych proxy: przyjmujemy lewą (first) jako klienta + if norm_ips: + return norm_ips[0] + + # 2) Forwarded (RFC7239) + fwd = request.headers.get("forwarded") + if fwd: + fwd_ips = _extract_from_forwarded(fwd) + norm_ips = [_normalize_ip_str(ip) for ip in fwd_ips if _normalize_ip_str(ip)] + if norm_ips: + if settings.trusted_proxies: + for ip in norm_ips: + if not _is_ip_trusted(ip): + return ip + return norm_ips[0] + return norm_ips[0] + + # 3) X-Real-IP + xri = request.headers.get("x-real-ip") + if xri: + v = _normalize_ip_str(xri) + if v: + if settings.trusted_proxies and _is_ip_trusted(v): + # jeśli header wskazuje zaufane proxy - nie używamy go jako klienta + pass + else: + return v + + # Fallback: request.client.host (np. bez reverse-proxy lub jeśli nic innego) + try: + host = request.client.host + if host: + return host.split('%')[0] if '%' in host else host + except Exception: + pass + + return "0.0.0.0" + @router.get('/ip') async def my_ip(request: Request, geo=Depends(get_geo)): - ip = request.client.host + ip = get_client_ip(request) # handle IPv6 mapped IPv4 like ::ffff:1.2.3.4 try: ip = ip.split('%')[0] @@ -51,4 +151,4 @@ async def reload(creds: HTTPBasicCredentials = Depends(security)): @router.get('/health') async def health(): - return {'status':'ok'} + return {'status':'ok'} \ No newline at end of file diff --git a/app/config.py b/app/config.py index f162ff5..a8b7b64 100644 --- a/app/config.py +++ b/app/config.py @@ -1,9 +1,27 @@ import os from pydantic_settings import BaseSettings from dotenv import load_dotenv +import ipaddress load_dotenv() +def _parse_trusted_proxies(raw: str): + # raw: comma-separated list of IPs or CIDR ranges + items = [p.strip() for p in (raw or "").split(",") if p.strip()] + nets = [] + for p in items: + try: + if "/" in p: + nets.append(ipaddress.ip_network(p, strict=False)) + else: + # treat single IP as /32 or /128 network + ip = ipaddress.ip_address(p) + nets.append(ipaddress.ip_network(ip.exploded + ("/32" if ip.version == 4 else "/128"))) + except Exception: + # ignoruj błędne wpisy + continue + return nets + class Settings(BaseSettings): geo_provider: str = os.getenv('GEO_PROVIDER', 'maxmind') @@ -12,7 +30,7 @@ class Settings(BaseSettings): maxmind_license_key: str | None = os.getenv('MAXMIND_LICENSE_KEY') maxmind_db_name: str = os.getenv('MAXMIND_DB_NAME', 'GeoLite2-City') maxmind_db_path: str = os.getenv('MAXMIND_DB_PATH', '/data/GeoLite2-City.mmdb') - maxmind_download_url_template: str = os.getenv( + maxmind_download_url_template: str | None = os.getenv( 'MAXMIND_DOWNLOAD_URL_TEMPLATE', 'https://download.maxmind.com/app/geoip_download?edition_id={DBNAME}&license_key={LICENSE_KEY}&suffix=tar.gz' ) @@ -34,5 +52,11 @@ class Settings(BaseSettings): admin_pass: str | None = os.getenv('ADMIN_PASS') cache_maxsize: int = int(os.getenv('CACHE_MAXSIZE', '4096')) + # Nowe: lista zaufanych proxy (CIDR lub IP), oddzielone przecinkami + # Przykład: "127.0.0.1,10.0.0.0/8,192.168.1.5" + _trusted_proxies_raw: str | None = os.getenv('TRUSTED_PROXIES', '') + @property + def trusted_proxies(self): + return _parse_trusted_proxies(self._trusted_proxies_raw) settings = Settings()