import ipaddress
import threading
from functools import lru_cache, wraps
from typing import Dict
from pathlib import Path

from .config import settings

try:
    import geoip2.database
    from geoip2.errors import AddressNotFoundError

    try:  # geoip2<5
        from geoip2.errors import InvalidDatabaseError  # type: ignore
    except Exception:  # geoip2>=5
        from maxminddb.errors import InvalidDatabaseError  # type: ignore
except Exception as e:
    print("Import geoip2 failed:", e)
    geoip2 = None

    # Fallback aliases so the ``except`` clauses below still resolve when
    # geoip2 is not installed.
    class _TmpErr(Exception): ...

    AddressNotFoundError = _TmpErr
    InvalidDatabaseError = _TmpErr

try:
    import IP2Location
except Exception:
    IP2Location = None


class GeoLookupBase:
    """Common interface for the geo-IP lookup providers in this module."""

    def lookup(self, ip: str) -> Dict:
        """Return a dict describing ``ip``; subclasses must implement this."""
        raise NotImplementedError

    def reload(self):
        """Re-open the underlying database; a no-op in the base class."""
        pass

    def is_valid_ip(self, ip: str) -> bool:
        """Return True if ``ip`` parses as an IPv4/IPv6 address.

        Anything after a ``%`` (an IPv6 zone identifier such as
        ``fe80::1%eth0``) is ignored for validation. Non-string input is
        reported as invalid rather than raising.
        """
        try:
            ipaddress.ip_address(ip.split("%", 1)[0])
        except (ValueError, TypeError, AttributeError):
            return False
        return True


def make_cached(func, maxsize: int):
    """Wrap the single-argument ``func`` in an LRU cache of ``maxsize`` entries.

    The returned wrapper takes one positional argument (the IP string) and
    exposes ``cache_clear()`` so callers can drop every cached entry.
    """
    cached = lru_cache(maxsize=maxsize)(func)

    @wraps(func)
    def wrapper(ip):
        return cached(ip)

    wrapper.cache_clear = cached.cache_clear  # type: ignore[attr-defined]
    return wrapper


class MaxMindGeo(GeoLookupBase):
    """Lookup provider backed by a MaxMind ``.mmdb`` file read through geoip2."""

    def __init__(self, db_path: str | None = None, cache_maxsize: int = 4096):
        """Open the database and set up the per-instance lookup cache.

        Args:
            db_path: Path to the database file; falls back to
                ``settings.maxmind_db_path`` when falsy.
            cache_maxsize: Maximum number of cached lookup results.

        Raises:
            RuntimeError: If geoip2 could not be imported or the database
                file does not exist.
        """
        if geoip2 is None:
            raise RuntimeError("Brak biblioteki geoip2. Zainstaluj `geoip2`")
        self.db_path = db_path or settings.maxmind_db_path
        self._reader = None
        self._db_type = ""
        self._lock = threading.RLock()
        self._open()
        self.lookup_cached = make_cached(self._lookup_inner, cache_maxsize)

    def _metadata_db_type(self) -> str:
        """Return the reader's declared database type, or "" if unavailable.

        geoip2's ``Reader`` exposes ``metadata`` as a method, so it has to be
        called; a plain attribute is accepted too. Any failure is treated as
        "unknown" so detection can fall back to the other heuristics.
        """
        meta = getattr(self._reader, "metadata", None)
        if callable(meta):
            try:
                meta = meta()
            except Exception:
                return ""
        return getattr(meta, "database_type", "") or ""

    def _detect_db_type(self):
        """Determine the database type from metadata, file name or probe queries.

        Returns:
            The lower-cased metadata type when the reader reports one;
            otherwise ``"city"``, ``"country"`` or ``"asn"`` from the
            fallbacks, or ``""`` when nothing matched.
        """
        declared = self._metadata_db_type()
        if declared:
            return declared.lower()

        # Match against the file name only, so directory names such as
        # ".../country/GeoLite2-ASN.mmdb" cannot cause a wrong guess.
        name = Path(self.db_path or "").name.lower()
        for key in ("city", "country", "asn"):
            if key in name:
                return key

        # Last resort: try each query method against a test address.
        probes = [
            ("city", self._reader.city),
            ("country", self._reader.country),
            ("asn", self._reader.asn),
        ]
        test_ip = "1.1.1.1"
        for key, fn in probes:
            try:
                fn(test_ip)
            except InvalidDatabaseError:
                continue
            except AddressNotFoundError:
                # The method was accepted; the address is just not in the DB.
                return key
            except Exception:
                continue
            else:
                return key
        return ""

    def _open(self):
        """(Re)open the database file and refresh the detected type.

        The new reader is opened before the previous one is closed, so a
        failed re-open leaves the instance with its previous, still usable
        reader.

        Raises:
            RuntimeError: If ``self.db_path`` does not exist.
        """
        with self._lock:
            if not Path(self.db_path).exists():
                raise RuntimeError(f"DB not found: {self.db_path}")
            new_reader = geoip2.database.Reader(self.db_path)
            old_reader, self._reader = self._reader, new_reader
            self._db_type = self._detect_db_type()
            if old_reader:
                try:
                    old_reader.close()
                except Exception:
                    # Best effort: the old reader is being discarded anyway.
                    pass
            print(
                f"[MaxMindGeo] opened {self.db_path} type={self._db_type or 'unknown'}"
            )

    def _lookup_inner(self, ip: str):
        """Query the reader for ``ip`` using the method matching the DB type.

        Raises:
            RuntimeError: If the detected type is none of asn/city/country.
            Exception: Whatever the reader raises for the query.
        """
        t = (self._db_type or "").lower()
        if "asn" in t:
            rec = self._reader.asn(ip)
            return {
                "ip": ip,
                "asn": {
                    "number": getattr(rec, "autonomous_system_number", None),
                    "organization": getattr(
                        rec, "autonomous_system_organization", None
                    ),
                },
                "database_type": self._db_type,
            }
        if "city" in t:
            rec = self._reader.city(ip)
            return {
                "ip": ip,
                "country": {"iso_code": rec.country.iso_code, "name": rec.country.name},
                "continent": getattr(rec.continent, "name", None),
                "subdivisions": [sub.name for sub in rec.subdivisions],
                "city": getattr(rec.city, "name", None),
                "location": {
                    "latitude": getattr(rec.location, "latitude", None),
                    "longitude": getattr(rec.location, "longitude", None),
                    "time_zone": getattr(rec.location, "time_zone", None),
                },
                "postal": getattr(rec.postal, "code", None),
                "database_type": self._db_type,
            }
        if "country" in t:
            rec = self._reader.country(ip)
            return {
                "ip": ip,
                "country": {"iso_code": rec.country.iso_code, "name": rec.country.name},
                "continent": getattr(rec.continent, "name", None),
                "database_type": self._db_type,
            }
        raise RuntimeError(
            f"Nieobsługiwany / niewykryty typ bazy: {self._db_type} (plik: {self.db_path})"
        )

    def lookup(self, ip: str):
        """Return geo data for ``ip``, or ``{"ip", "error"}`` on any failure.

        Successful results come from the LRU cache, so repeated calls for the
        same address return the same dict object; treat it as read-only.
        """
        if not self.is_valid_ip(ip):
            return {"ip": ip, "error": "invalid IP"}
        try:
            return self.lookup_cached(ip)
        except Exception as e:
            return {"ip": ip, "error": str(e)}

    def reload(self):
        """Re-open the database file and drop all cached lookups."""
        with self._lock:
            self._open()
            try:
                self.lookup_cached.cache_clear()  # type: ignore[attr-defined]
            except Exception:
                pass


class IP2LocationGeo(GeoLookupBase):
    """Lookup provider backed by an IP2Location database file."""

    def __init__(self, db_path: str | None = None, cache_maxsize: int = 4096):
        """Open the database and set up the per-instance lookup cache.

        Args:
            db_path: Path to the database file; falls back to
                ``settings.ip2location_db_path`` when falsy.
            cache_maxsize: Maximum number of cached lookup results.

        Raises:
            RuntimeError: If the IP2Location library could not be imported.
        """
        if IP2Location is None:
            raise RuntimeError("Brak biblioteki IP2Location. Zainstaluj `IP2Location`")
        self.db_path = db_path or settings.ip2location_db_path
        self._lock = threading.RLock()
        self._db = IP2Location.IP2Location(self.db_path)
        self.lookup_cached = make_cached(self._lookup_inner, cache_maxsize)

    def _lookup_inner(self, ip: str):
        """Fetch the full record for ``ip`` and flatten it into a dict."""
        r = self._db.get_all(ip)
        return {
            "ip": ip,
            "country": {"iso_code": r.country_short, "name": r.country_long},
            "region": r.region,
            "city": r.city,
            "latitude": r.latitude,
            "longitude": r.longitude,
            "zip_code": r.zipcode,
            "timezone": r.timezone,
        }

    def lookup(self, ip: str):
        """Return geo data for ``ip``, or ``{"ip", "error"}`` on any failure.

        Successful results come from the LRU cache, so repeated calls for the
        same address return the same dict object; treat it as read-only.
        """
        if not self.is_valid_ip(ip):
            return {"ip": ip, "error": "invalid IP"}
        try:
            return self.lookup_cached(ip)
        except Exception as e:
            return {"ip": ip, "error": str(e)}

    def reload(self):
        """Re-open the database file (best effort) and drop cached lookups.

        If re-opening fails, the previously opened database stays in use and
        the failure is printed instead of being silently discarded.
        """
        with self._lock:
            try:
                self._db = IP2Location.IP2Location(self.db_path)
            except Exception as exc:
                print(
                    f"[IP2LocationGeo] reload failed, keeping previous database: {exc}"
                )
            try:
                self.lookup_cached.cache_clear()  # type: ignore[attr-defined]
            except Exception:
                pass


# Lazily created, process-wide provider instance guarded by _provider_lock.
_provider = None
_provider_lock = threading.RLock()


def _create_provider():
    """Build the provider selected by ``settings.geo_provider``.

    ``"ip2location"`` (case-insensitive) selects IP2LocationGeo; any other
    value selects MaxMindGeo.
    """
    provider = settings.geo_provider.lower()
    if provider == "ip2location":
        return IP2LocationGeo(
            db_path=settings.ip2location_db_path, cache_maxsize=settings.cache_maxsize
        )
    return MaxMindGeo(
        db_path=settings.maxmind_db_path, cache_maxsize=settings.cache_maxsize
    )


def get_provider_instance():
    """Return the shared provider, creating it on first use."""
    global _provider
    with _provider_lock:
        if _provider is None:
            _provider = _create_provider()
        return _provider


def reload_provider():
    """Reload the shared provider and return it.

    Creates the provider if none exists yet. If reloading the existing one
    raises, a fresh provider is built from the current settings instead.
    """
    global _provider
    with _provider_lock:
        if _provider is None:
            _provider = _create_provider()
        else:
            try:
                _provider.reload()
            except Exception:
                _provider = _create_provider()
        return _provider