fix latin-1 errors

This commit is contained in:
Mateusz Gruszczyński
2025-10-24 20:22:34 +02:00
parent c2eb7c765d
commit c75ad0b564
2 changed files with 43 additions and 8 deletions

View File

@@ -4,11 +4,13 @@ from .deps import get_geo
from .config import settings from .config import settings
from .geo import reload_provider from .geo import reload_provider
from urllib.parse import quote from urllib.parse import quote
from starlette.responses import Response
import secrets import secrets
import ipaddress import ipaddress
import re import re
import json import json
import unicodedata import unicodedata
import json
router = APIRouter() router = APIRouter()
@@ -170,27 +172,46 @@ def get_client_ip(request: Request) -> str:
return "0.0.0.0" return "0.0.0.0"
def _safe_hdr(v: str) -> str:
try:
v.encode("latin-1")
return v
except UnicodeEncodeError:
return unicodedata.normalize("NFKD", v).encode("ascii", "ignore").decode("ascii") or "?"
def _sanitize_headers(headers: dict) -> dict:
out = {}
for k, v in (headers or {}).items():
ks = str(k)
vs = _safe_hdr(str(v))
# ostateczny test
try:
ks.encode("latin-1"); vs.encode("latin-1")
except UnicodeEncodeError:
ks = unicodedata.normalize("NFKD", ks).encode("ascii", "ignore").decode("ascii") or "X-Header"
vs = unicodedata.normalize("NFKD", vs).encode("ascii", "ignore").decode("ascii") or "?"
out[ks] = vs
return out
@router.api_route('/ip', methods=["GET", "HEAD"]) @router.api_route('/ip', methods=["GET", "HEAD"])
async def my_ip(request: Request, geo=Depends(get_geo)): async def my_ip(request: Request, geo=Depends(get_geo)):
ip = get_client_ip(request) # pobieranie IP:contentReference[oaicite:0]{index=0} ip = get_client_ip(request)
data = geo.lookup(ip) # geo lookup:contentReference[oaicite:1]{index=1}:contentReference[oaicite:2]{index=2} data = geo.lookup(ip)
headers = geo_headers(data) headers = _sanitize_headers(geo_headers(data))
if request.method == "HEAD": if request.method == "HEAD":
return Response(status_code=200, headers=headers) return Response(status_code=200, headers=headers)
body = json.dumps(data, ensure_ascii=False) + "\n" body = json.dumps(data, ensure_ascii=False) + "\n"
return Response(content=body, media_type="application/json", headers=headers) return Response(content=body, media_type="application/json", headers=headers)
@router.api_route('/ip/{ip_address}', methods=["GET", "HEAD"]) @router.api_route('/ip/{ip_address}', methods=["GET", "HEAD"])
async def ip_lookup(ip_address: str, request: Request, geo=Depends(get_geo)): async def ip_lookup(ip_address: str, request: Request, geo=Depends(get_geo)):
data = geo.lookup(ip_address) data = geo.lookup(ip_address)
headers = geo_headers(data) headers = _sanitize_headers(geo_headers(data))
if request.method == "HEAD": if request.method == "HEAD":
return Response(status_code=200, headers=headers) return Response(status_code=200, headers=headers)
body = json.dumps(data, ensure_ascii=False) + "\n" body = json.dumps(data, ensure_ascii=False) + "\n"
return Response(content=body, media_type="application/json", headers=headers) return Response(content=body, media_type="application/json", headers=headers)
@router.post("/reload") @router.post("/reload")
async def reload(creds: HTTPBasicCredentials = Depends(security)): async def reload(creds: HTTPBasicCredentials = Depends(security)):
_check_admin(creds) _check_admin(creds)

View File

@@ -11,14 +11,28 @@ app.include_router(router)
async def add_geo_headers(request, call_next): async def add_geo_headers(request, call_next):
import unicodedata
from starlette.responses import Response # tylko dla typu
def _ascii(s: str) -> str:
try:
s.encode("latin-1")
return s
except UnicodeEncodeError:
return unicodedata.normalize("NFKD", s).encode("ascii", "ignore").decode("ascii") or "?"
ip = get_client_ip(request) ip = get_client_ip(request)
geo = get_geo() geo = get_geo()
data = geo.lookup(ip) data = geo.lookup(ip)
response: Response = await call_next(request) response: Response = await call_next(request)
for k, v in geo_headers(data).items(): for k, v in (geo_headers(data) or {}).items():
response.headers[k] = v ks = _ascii(str(k))
vs = _ascii(str(v))
# gwarancja, że przejdzie przez kodowanie nagłówków Starlette
response.headers[ks] = vs
return response return response