dane w headerach i inne funkcje

This commit is contained in:
Mateusz Gruszczyński
2025-10-09 16:40:56 +02:00
parent cb109b63ae
commit eb137c87b0
7 changed files with 161 additions and 76 deletions

View File

@@ -8,6 +8,7 @@ from .config import settings
try:
import geoip2.database
from geoip2.errors import AddressNotFoundError
try:
# geoip2<5
from geoip2.errors import InvalidDatabaseError # type: ignore
@@ -17,8 +18,10 @@ try:
except Exception as e:
print("Import geoip2 failed:", e)
geoip2 = None
# awaryjne aliasy, aby kod dalej działał
class _TmpErr(Exception): ...
AddressNotFoundError = _TmpErr
InvalidDatabaseError = _TmpErr
@@ -67,8 +70,10 @@ class MaxMindGeo(GeoLookupBase):
def _detect_db_type(self):
"""Próbuje określić typ bazy na podstawie metadanych, nazwy lub próbnych zapytań."""
t = (getattr(self._reader, "metadata", None)
and getattr(self._reader.metadata, "database_type", "")) or ""
t = (
getattr(self._reader, "metadata", None)
and getattr(self._reader.metadata, "database_type", "")
) or ""
if t:
return t.lower()
@@ -80,7 +85,7 @@ class MaxMindGeo(GeoLookupBase):
probes = [
("city", self._reader.city),
("country", self._reader.country),
("asn", self._reader.asn)
("asn", self._reader.asn),
]
test_ip = "1.1.1.1"
for key, fn in probes:
@@ -107,7 +112,9 @@ class MaxMindGeo(GeoLookupBase):
pass
self._reader = geoip2.database.Reader(self.db_path)
self._db_type = self._detect_db_type()
print(f"[MaxMindGeo] opened {self.db_path} type={self._db_type or 'unknown'}")
print(
f"[MaxMindGeo] opened {self.db_path} type={self._db_type or 'unknown'}"
)
def _lookup_inner(self, ip: str):
t = (self._db_type or "").lower()
@@ -117,7 +124,9 @@ class MaxMindGeo(GeoLookupBase):
"ip": ip,
"asn": {
"number": getattr(rec, "autonomous_system_number", None),
"organization": getattr(rec, "autonomous_system_organization", None),
"organization": getattr(
rec, "autonomous_system_organization", None
),
},
"database_type": self._db_type,
}
@@ -145,7 +154,9 @@ class MaxMindGeo(GeoLookupBase):
"continent": getattr(rec.continent, "name", None),
"database_type": self._db_type,
}
raise RuntimeError(f"Nieobsługiwany / niewykryty typ bazy: {self._db_type} (plik: {self.db_path})")
raise RuntimeError(
f"Nieobsługiwany / niewykryty typ bazy: {self._db_type} (plik: {self.db_path})"
)
def lookup(self, ip: str):
if not self.is_valid_ip(ip):
@@ -213,8 +224,12 @@ _provider_lock = threading.RLock()
def _create_provider():
provider = settings.geo_provider.lower()
if provider == "ip2location":
return IP2LocationGeo(db_path=settings.ip2location_db_path, cache_maxsize=settings.cache_maxsize)
return MaxMindGeo(db_path=settings.maxmind_db_path, cache_maxsize=settings.cache_maxsize)
return IP2LocationGeo(
db_path=settings.ip2location_db_path, cache_maxsize=settings.cache_maxsize
)
return MaxMindGeo(
db_path=settings.maxmind_db_path, cache_maxsize=settings.cache_maxsize
)
def get_provider_instance():