diff --git a/app/api.py b/app/api.py index 930878f..837ab42 100644 --- a/app/api.py +++ b/app/api.py @@ -4,11 +4,13 @@ from .deps import get_geo from .config import settings from .geo import reload_provider from urllib.parse import quote +from starlette.responses import Response import secrets import ipaddress import re import json import unicodedata +import json router = APIRouter() @@ -170,27 +172,46 @@ def get_client_ip(request: Request) -> str: return "0.0.0.0" +def _safe_hdr(v: str) -> str: + try: + v.encode("latin-1") + return v + except UnicodeEncodeError: + return unicodedata.normalize("NFKD", v).encode("ascii", "ignore").decode("ascii") or "?" + +def _sanitize_headers(headers: dict) -> dict: + out = {} + for k, v in (headers or {}).items(): + ks = str(k) + vs = _safe_hdr(str(v)) + # ostateczny test + try: + ks.encode("latin-1"); vs.encode("latin-1") + except UnicodeEncodeError: + ks = unicodedata.normalize("NFKD", ks).encode("ascii", "ignore").decode("ascii") or "X-Header" + vs = unicodedata.normalize("NFKD", vs).encode("ascii", "ignore").decode("ascii") or "?" + out[ks] = vs + return out + @router.api_route('/ip', methods=["GET", "HEAD"]) async def my_ip(request: Request, geo=Depends(get_geo)): - ip = get_client_ip(request) # pobieranie IP:contentReference[oaicite:0]{index=0} - data = geo.lookup(ip) # geo lookup:contentReference[oaicite:1]{index=1}:contentReference[oaicite:2]{index=2} - headers = geo_headers(data) + ip = get_client_ip(request) + data = geo.lookup(ip) + headers = _sanitize_headers(geo_headers(data)) if request.method == "HEAD": return Response(status_code=200, headers=headers) body = json.dumps(data, ensure_ascii=False) + "\n" return Response(content=body, media_type="application/json", headers=headers) - @router.api_route('/ip/{ip_address}', methods=["GET", "HEAD"]) async def ip_lookup(ip_address: str, request: Request, geo=Depends(get_geo)): data = geo.lookup(ip_address) - headers = geo_headers(data) + headers = _sanitize_headers(geo_headers(data)) if request.method == "HEAD": return Response(status_code=200, headers=headers) body = json.dumps(data, ensure_ascii=False) + "\n" return Response(content=body, media_type="application/json", headers=headers) - @router.post("/reload") async def reload(creds: HTTPBasicCredentials = Depends(security)): _check_admin(creds) diff --git a/app/main.py b/app/main.py index 7ac1f7b..2955334 100644 --- a/app/main.py +++ b/app/main.py @@ -11,14 +11,28 @@ app.include_router(router) async def add_geo_headers(request, call_next): + import unicodedata + from starlette.responses import Response # tylko dla typu + + def _ascii(s: str) -> str: + try: + s.encode("latin-1") + return s + except UnicodeEncodeError: + return unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii") or "?" + ip = get_client_ip(request) geo = get_geo() data = geo.lookup(ip) response: Response = await call_next(request) - for k, v in geo_headers(data).items(): - response.headers[k] = v + for k, v in (geo_headers(data) or {}).items(): + ks = _ascii(str(k)) + vs = _ascii(str(v)) + # gwarancja, że przejdzie przez kodowanie nagłówków Starlette + response.headers[ks] = vs + return response