"""HTTP routes for IP geolocation lookups and provider administration."""

import ipaddress
import secrets

from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

from .deps import get_geo
from .config import settings
from .geo import reload_provider

router = APIRouter()
security = HTTPBasic()


def _strip_zone(value: str) -> str:
    """Return *value* without an IPv6 zone index (the ``%...`` suffix)."""
    return value.partition('%')[0]


def _client_ip(host: str) -> str:
    """Normalize a peer address before it is handed to the geo provider.

    The zone index is dropped, and an IPv4-mapped IPv6 address such as
    ``::ffff:1.2.3.4`` is unwrapped to its IPv4 form. A value that is not
    an IP literal is returned (zone-stripped) without further changes.
    """
    bare = _strip_zone(host)
    try:
        parsed = ipaddress.ip_address(bare)
    except ValueError:
        # Not an IP literal; leave it for the geo provider to deal with.
        return bare
    if isinstance(parsed, ipaddress.IPv6Address) and parsed.ipv4_mapped is not None:
        return str(parsed.ipv4_mapped)
    return bare


def _check_admin(creds: HTTPBasicCredentials) -> bool:
    """Validate HTTP Basic credentials against the configured admin account.

    Returns:
        ``True`` when the credentials match.

    Raises:
        HTTPException: 403 when no admin user/password is configured,
            401 when the supplied credentials do not match.
    """
    expected_user = settings.admin_user
    expected_pass = settings.admin_pass
    if not expected_user or not expected_pass:
        raise HTTPException(status_code=403, detail='admin credentials not configured')

    # compare_digest() rejects non-ASCII str with TypeError, so compare the
    # UTF-8 bytes instead. Both comparisons are always evaluated so that a
    # wrong username does not skip the password check (timing side channel).
    user_ok = secrets.compare_digest(
        creds.username.encode('utf-8'), expected_user.encode('utf-8')
    )
    pass_ok = secrets.compare_digest(
        creds.password.encode('utf-8'), expected_pass.encode('utf-8')
    )
    if not (user_ok and pass_ok):
        raise HTTPException(
            status_code=401,
            detail='invalid credentials',
            headers={"WWW-Authenticate": "Basic"},
        )
    return True


@router.get('/ip')
async def my_ip(request: Request, geo=Depends(get_geo)):
    """Look up the address of the connecting client.

    Raises:
        HTTPException: 400 when the server did not expose a client address.
    """
    client = request.client
    if client is None:
        # request.client is optional; without it there is nothing to look up.
        raise HTTPException(status_code=400, detail='client address unavailable')
    return geo.lookup(_client_ip(client.host))


@router.get('/ip/{ip_address}')
async def ip_lookup(ip_address: str, geo=Depends(get_geo)):
    """Look up an explicitly supplied IPv4 or IPv6 address.

    An IPv6 zone index is accepted and removed before validation; the
    validated, zone-free address is what gets looked up.

    Raises:
        HTTPException: 400 when the value is not a valid IP address.
    """
    addr = _strip_zone(ip_address)
    try:
        ipaddress.ip_address(addr)
    except ValueError:
        raise HTTPException(status_code=400, detail='invalid IP address') from None
    return geo.lookup(addr)


@router.post('/reload')
async def reload(creds: HTTPBasicCredentials = Depends(security)):
    """Reload the geo provider; requires valid admin credentials.

    Returns a dict with ``reloaded`` set to ``True`` and the class name of
    the object returned by ``reload_provider()``.
    """
    _check_admin(creds)
    provider = reload_provider()
    return {'reloaded': True, 'provider': type(provider).__name__}


@router.get('/health')
async def health():
    """Return a static payload indicating the service is up."""
    return {'status': 'ok'}