# leox-gpon-monitoring/collector.py
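"""Background data collector for a LEOX GPON ONT.

Polls the ONT over telnet (shell commands and omcicli) and scrapes its built-in
web interface, keeps the most recent snapshot in memory, and derives threshold
alerts for the rest of the monitoring application.
"""
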
import asyncio
import logging
import re
import time
from datetime import datetime
from threading import Thread, Lock

import aiohttp
import telnetlib3
from aiohttp import BasicAuth

logger = logging.getLogger(__name__)


class GPONCollector:
    def __init__(self, config):
        self.config = config
        self.data = {}
        self.lock = Lock()
        self.running = False
        self.thread = None
        self.alerts = []
        self.last_full_collect = 0
        self.full_collect_interval = 3600
        self.first_run = True

    def start(self):
        if self.running:
            return
        self.running = True
        self.thread = Thread(target=self._collect_loop, daemon=True)
        self.thread.start()
        logger.info("Collector started")

    def stop(self):
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
        logger.info("Collector stopped")

    def get_data(self):
        with self.lock:
            return self.data.copy()

    def get_alerts(self):
        with self.lock:
            return self.alerts.copy()

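    # One collection round per loop iteration: a fresh asyncio event loop is
    # created in this worker thread, the async collectors are run to completion,
    # and the thread then sleeps for POLL_INTERVAL. The first round and every
    # full_collect_interval seconds thereafter run a "full" collection that also
    # re-reads static identity data; other rounds reuse the cached values.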
    def _collect_loop(self):
        while self.running:
            try:
                collect_start = time.time()
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                now = time.time()
                full_collect = self.first_run or (now - self.last_full_collect) >= self.full_collect_interval
                data = loop.run_until_complete(self._collect_all_data(full_collect=full_collect))
                loop.close()
                collect_duration = time.time() - collect_start
                if full_collect:
                    self.last_full_collect = now
                if data and data.get('status') == 'online':
                    # Publish the snapshot and refresh alerts under the same lock.
                    with self.lock:
                        self.data = data
                        self._check_alerts(data)
                    rx = data.get('rx_power', 0)
                    tx = data.get('tx_power', 0)
                    temp = data.get('temperature', 0)
                    uptime = data.get('uptime', 0)
                    uptime_days = uptime // 86400
                    uptime_hours = (uptime % 86400) // 3600
                    uptime_minutes = (uptime % 3600) // 60
                    if self.first_run:
                        logger.info("=" * 70)
                        logger.info("Initial data collection completed successfully")
                        logger.info(f"Device: {data.get('vendor_id', 'N/A')} {data.get('model', 'N/A')}")
                        logger.info(f"Serial: {data.get('serial_number', 'N/A')}")
                        logger.info(f"Optical: RX={rx:.2f}dBm TX={tx:.2f}dBm")
                        logger.info(f"Status: Temperature={temp:.1f}°C Uptime={uptime_days}d {uptime_hours}h {uptime_minutes}m")
                        logger.info(f"Collection time: {collect_duration:.2f}s")
                        logger.info("=" * 70)
                        self.first_run = False
                    else:
                        logger.info(f"Collected: RX={rx:.2f}dBm TX={tx:.2f}dBm Temp={temp:.1f}°C Uptime={uptime_days}d {uptime_hours}h {uptime_minutes}m (took {collect_duration:.2f}s)")
                else:
                    logger.warning(f"No data or device offline (took {collect_duration:.2f}s)")
                    self.first_run = False
            except Exception as e:
                logger.error(f"Error in collector loop: {e}", exc_info=True)
                self.first_run = False
            time.sleep(self.config.POLL_INTERVAL)

    async def _collect_all_data(self, full_collect=False):
        omci_data = await self._collect_omci_async(full_collect=full_collect)
        if not omci_data:
            return None
        # Web-scraped fields overlay the OMCI data when present (e.g. temperature,
        # voltage, packet/byte counters).
        web_data = await self._scrape_web_interface()
        if web_data:
            for key, value in web_data.items():
                if value is not None:
                    omci_data[key] = value
        return omci_data

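    # The ONT's stock web UI exposes a few values that are easier to read over
    # HTTP than OMCI: transceiver temperature/voltage/bias on status_pon.asp and
    # PON packet/byte counters on admin/pon-stats.asp. Failures here are logged
    # and tolerated; the web data only supplements the telnet collection.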
    async def _scrape_web_interface(self):
        try:
            scrape_start = time.time()
            auth = BasicAuth(self.config.GPON_USERNAME, self.config.GPON_PASSWORD)
            base_url = f'http://{self.config.GPON_HOST}'
            data = {}
            connector = aiohttp.TCPConnector(ssl=False)
            timeout = aiohttp.ClientTimeout(total=10, connect=5)
            async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
                try:
                    url = f'{base_url}/status_pon.asp'
                    async with session.get(url, auth=auth) as response:
                        if response.status == 200:
                            html = await response.text()
                            temp_match = re.search(r'<th[^>]*>Temperature</th>\s*<td[^>]*>([\d.]+)\s*C', html, re.IGNORECASE)
                            if temp_match:
                                data['temperature'] = float(temp_match.group(1))
                            volt_match = re.search(r'<th[^>]*>Voltage</th>\s*<td[^>]*>([\d.]+)\s*V', html, re.IGNORECASE)
                            if volt_match:
                                data['voltage'] = float(volt_match.group(1))
                            bias_match = re.search(r'<th[^>]*>Bias Current</th>\s*<td[^>]*>([\d.]+)\s*mA', html, re.IGNORECASE)
                            if bias_match:
                                data['tx_bias_current'] = float(bias_match.group(1))
                            logger.debug(f"[WEB] status_pon.asp: temp={data.get('temperature')} voltage={data.get('voltage')} bias={data.get('tx_bias_current')}")
                        else:
                            logger.warning(f"[WEB] status_pon.asp returned {response.status}")
                except Exception as e:
                    logger.error(f"[WEB] Error fetching status_pon.asp: {e}")
                try:
                    url = f'{base_url}/admin/pon-stats.asp'
                    async with session.get(url, auth=auth) as response:
                        if response.status == 200:
                            html = await response.text()
                            tx_pkts_match = re.search(r'<th[^>]*>Packets Sent:</th>\s*<td[^>]*>([\d,]+)</td>', html, re.IGNORECASE)
                            if tx_pkts_match:
                                data['tx_packets'] = int(tx_pkts_match.group(1).replace(',', ''))
                            rx_pkts_match = re.search(r'<th[^>]*>Packets Received:</th>\s*<td[^>]*>([\d,]+)</td>', html, re.IGNORECASE)
                            if rx_pkts_match:
                                data['rx_packets'] = int(rx_pkts_match.group(1).replace(',', ''))
                            tx_bytes_match = re.search(r'<th[^>]*>Bytes Sent:</th>\s*<td[^>]*>([\d,]+)</td>', html, re.IGNORECASE)
                            if tx_bytes_match:
                                data['tx_bytes'] = int(tx_bytes_match.group(1).replace(',', ''))
                            rx_bytes_match = re.search(r'<th[^>]*>Bytes Received:</th>\s*<td[^>]*>([\d,]+)</td>', html, re.IGNORECASE)
                            if rx_bytes_match:
                                data['rx_bytes'] = int(rx_bytes_match.group(1).replace(',', ''))
                            logger.debug(f"[WEB] pon-stats.asp: rx_pkts={data.get('rx_packets')} tx_pkts={data.get('tx_packets')}")
                        else:
                            logger.warning(f"[WEB] pon-stats.asp returned {response.status}")
                except Exception as e:
                    logger.error(f"[WEB] Error fetching pon-stats.asp: {e}")
            scrape_duration = time.time() - scrape_start
            logger.info(f"[WEB] Scraped {len(data)} fields in {scrape_duration:.2f}s")
            return data
        except Exception as e:
            logger.error(f"[WEB] Web scraping failed: {e}")
            return {}

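    # Telnet/OMCI collection: log in, drop out of the vendor CLI ("RTK.0>" /
    # "command:#") to a shell if needed, then read /proc/uptime and query the
    # OMCI MIB with omcicli. Static identity fields (serial number, versions,
    # MAC address, OLT info, flash parameters) are only gathered on a full
    # collection; fast rounds copy them from the cached snapshot.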
    async def _collect_omci_async(self, full_collect=False):
        try:
            telnet_start = time.time()
            reader, writer = await asyncio.wait_for(
                telnetlib3.open_connection(
                    self.config.GPON_HOST,
                    self.config.GPON_PORT
                ),
                timeout=10
            )
            await asyncio.wait_for(reader.readuntil(b'login: '), timeout=5)
            writer.write(self.config.GPON_USERNAME + '\n')
            await asyncio.wait_for(reader.readuntil(b'Password: '), timeout=5)
            writer.write(self.config.GPON_PASSWORD + '\n')
            await asyncio.sleep(2)
            initial_output = await reader.read(2048)
            if 'RTK.0>' in initial_output or 'command:#' in initial_output:
                # Landed in the vendor CLI rather than a shell; exit drops to busybox.
                writer.write('exit\n')
                await asyncio.sleep(1)
                await reader.read(1024)
            data = {
                'timestamp': datetime.now().isoformat(),
                'status': 'offline',
            }
            writer.write('cat /proc/uptime\n')
            await asyncio.sleep(0.5)
            uptime_output = await reader.read(1024)
            uptime = self._parse_proc_uptime(uptime_output)
            if uptime:
                data['uptime'] = uptime
            writer.write('omcicli mib get Anig\n')
            await asyncio.sleep(1.5)
            anig_output = await reader.read(16384)
            optical = self._parse_anig(anig_output)
            if optical:
                data.update(optical)
            if full_collect:
                logger.info("[TELNET] Full collection mode - gathering all static data")
                writer.write('omcicli get sn\n')
                await asyncio.sleep(0.5)
                sn_output = await reader.read(1024)
                sn = self._parse_serial(sn_output)
                if sn:
                    data['serial_number'] = sn
                    if len(sn) >= 4:
                        data['vendor_id'] = sn[:4]
                writer.write('omcicli mib get FecPmhd\n')
                await asyncio.sleep(1)
                fec_output = await reader.read(8192)
                fec = self._parse_fec(fec_output)
                if fec:
                    data.update(fec)
                writer.write('omcicli mib get Ont2g\n')
                await asyncio.sleep(1.5)
                ont2g_output = await reader.read(16384)
                device_info = self._parse_ont2g(ont2g_output)
                if device_info:
                    data.update(device_info)
                writer.write('omcicli mib get OltG\n')
                await asyncio.sleep(1)
                oltg_output = await reader.read(8192)
                olt_info = self._parse_oltg(oltg_output)
                if olt_info:
                    data.update(olt_info)
                # MAC address: try PptpEthUni, then VeipUni, then the kernel's eth0 entry.
                writer.write('omcicli mib get PptpEthUni\n')
                await asyncio.sleep(1)
                pptpethuni_output = await reader.read(8192)
                mac_info = self._parse_pptpethuni(pptpethuni_output)
                if mac_info and mac_info.get('mac_address'):
                    data.update(mac_info)
                else:
                    writer.write('omcicli mib get VeipUni\n')
                    await asyncio.sleep(1)
                    veipuni_output = await reader.read(8192)
                    mac_info = self._parse_veipuni(veipuni_output)
                    if mac_info and mac_info.get('mac_address'):
                        data.update(mac_info)
                    else:
                        writer.write('cat /sys/class/net/eth0/address\n')
                        await asyncio.sleep(0.5)
                        sys_mac_output = await reader.read(2048)
                        mac = self._parse_sys_mac(sys_mac_output)
                        if mac:
                            data['mac_address'] = mac
                flash_params = ['OMCI_SW_VER1', 'HW_HWVER', 'OMCI_OLT_MODE', 'LAN_SDS_MODE', 'PON_MODE', 'OMCC_VER']
                for param in flash_params:
                    writer.write(f'flash get {param}\n')
                    await asyncio.sleep(0.3)
                    param_output = await reader.read(4096)
                    value = self._parse_flash_value(param_output, param)
                    if value:
                        if param == 'OMCI_SW_VER1':
                            data['version'] = value
                        elif param == 'HW_HWVER':
                            data['hw_version'] = value
                        elif param == 'OMCI_OLT_MODE':
                            data['olt_mode'] = self._decode_olt_mode(value)
                        elif param == 'LAN_SDS_MODE':
                            data['lan_mode'] = self._decode_lan_mode(value)
                        elif param == 'PON_MODE':
                            data['pon_mode'] = self._decode_pon_mode(value)
                        elif param == 'OMCC_VER':
                            data['omcc_version'] = hex(int(value)) if value.isdigit() else value
            else:
                logger.info("[TELNET] Fast collection mode - using cached static data")
                with self.lock:
                    logger.info(f"[CACHE] self.data contains {len(self.data)} keys")
                    if self.data:
                        cached_keys = ['serial_number', 'vendor_id', 'model', 'version', 'hw_version',
                                       'mac_address', 'olt_vendor_info', 'olt_version_info', 'olt_mode',
                                       'lan_mode', 'pon_mode', 'omcc_version', 'fec_corrected', 'fec_uncorrected']
                        cached_count = 0
                        for key in cached_keys:
                            if key in self.data:
                                data[key] = self.data[key]
                                cached_count += 1
                        logger.info(f"[CACHE] Copied {cached_count}/{len(cached_keys)} cached fields")
                    else:
                        logger.warning("[CACHE] self.data is empty! Cache not available.")
            writer.write('exit\n')
            await asyncio.sleep(0.3)
            try:
                writer.close()
            except Exception:
                pass
            if data.get('rx_power') is not None and data.get('tx_power') is not None:
                data['status'] = 'online'
            telnet_duration = time.time() - telnet_start
            logger.info(f"[TELNET] Collected {len(data)} fields (full={full_collect}) in {telnet_duration:.2f}s")
            return data
        except asyncio.TimeoutError:
            logger.error("[TELNET] Timeout during telnet connection")
            return None
        except Exception as e:
            logger.error(f"[TELNET] Telnet error: {e}", exc_info=True)
            return None

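    # --- Output parsers ------------------------------------------------------
    # omcicli and the shell commands return plain text with "Name: value" lines,
    # often hex-encoded (0x...). The helpers below extract the fields used above
    # and fall back to {} or None when an attribute is missing.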
    def _parse_proc_uptime(self, output):
        try:
            match = re.search(r'(\d+\.\d+)', output)
            if match:
                return int(float(match.group(1)))
        except Exception as e:
            logger.error(f"Error parsing /proc/uptime: {e}")
        return None

    def _parse_anig(self, output):
        try:
            data = {}
            rx_match = re.search(r'OpticalSignalLevel:\s*(0x[0-9a-fA-F]+)', output)
            if rx_match:
                rx_hex = int(rx_match.group(1), 16)
                data['rx_power'] = self._convert_optical_power(rx_hex)
            tx_match = re.search(r'TranOpticLevel:\s*(0x[0-9a-fA-F]+)', output)
            if tx_match:
                tx_hex = int(tx_match.group(1), 16)
                data['tx_power'] = self._convert_optical_power(tx_hex)
            return data
        except Exception as e:
            logger.error(f"Error parsing ANI-G: {e}")
            return {}

    def _parse_fec(self, output):
        try:
            data = {}
            match = re.search(r'CorCodeWords:\s*(0x[0-9a-fA-F]+)', output)
            if match:
                data['fec_corrected'] = int(match.group(1), 16)
            else:
                data['fec_corrected'] = 0
            match = re.search(r'UncorCodeWords:\s*(0x[0-9a-fA-F]+)', output)
            if match:
                data['fec_uncorrected'] = int(match.group(1), 16)
            else:
                data['fec_uncorrected'] = 0
            return data
        except Exception as e:
            logger.error(f"Error parsing FEC: {e}")
            return {'fec_corrected': 0, 'fec_uncorrected': 0}

    def _parse_ont2g(self, output):
        try:
            data = {}
            # EqtID is reported as hex-encoded ASCII (the equipment/model identifier).
            match = re.search(r'EqtID:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
            if match:
                hex_str = match.group(1)
                try:
                    model = bytes.fromhex(hex_str).decode('ascii', errors='ignore').strip('\x00').strip()
                    if model:
                        data['model'] = model
                except Exception:
                    pass
            return data
        except Exception as e:
            logger.error(f"Error parsing ONT2-G: {e}")
            return {}

    def _parse_oltg(self, output):
        try:
            data = {}
            # Check the hex form first so "0x..." output is decoded rather than being
            # partially matched by the plain 4-character vendor-id pattern.
            match = re.search(r'OltVendorId:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
            if match:
                hex_str = match.group(1)
                try:
                    olt_vendor = bytes.fromhex(hex_str).decode('ascii', errors='ignore').strip('\x00').strip()
                    if olt_vendor:
                        data['olt_vendor_info'] = olt_vendor
                except Exception:
                    pass
            else:
                match = re.search(r'OltVendorId:\s*([A-Z0-9]{4})', output, re.IGNORECASE)
                if match:
                    data['olt_vendor_info'] = match.group(1)
            match = re.search(r'Version:\s*(\d+)', output, re.IGNORECASE)
            if match:
                data['olt_version_info'] = match.group(1)
            return data
        except Exception as e:
            logger.error(f"Error parsing OLT-G: {e}")
            return {}

    def _parse_pptpethuni(self, output):
        try:
            data = {}
            # Require the full aa:bb:cc:dd:ee:ff shape so a hex "0x..." value is not
            # partially matched; fall back to decoding the 12-digit hex form.
            match = re.search(r'MacAddress:\s*((?:[0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2})', output, re.IGNORECASE)
            if match:
                data['mac_address'] = match.group(1)
                return data
            match = re.search(r'MacAddress:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
            if match:
                hex_str = match.group(1)
                if len(hex_str) == 12:
                    mac = ':'.join([hex_str[i:i+2] for i in range(0, 12, 2)])
                    data['mac_address'] = mac
                    return data
            return data
        except Exception as e:
            logger.error(f"Error parsing PptpEthUni: {e}")
            return {}

    def _parse_veipuni(self, output):
        try:
            data = {}
            # Same matching rules as _parse_pptpethuni.
            match = re.search(r'MacAddress:\s*((?:[0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2})', output, re.IGNORECASE)
            if match:
                data['mac_address'] = match.group(1)
                return data
            match = re.search(r'MacAddress:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
            if match:
                hex_str = match.group(1)
                if len(hex_str) == 12:
                    mac = ':'.join([hex_str[i:i+2] for i in range(0, 12, 2)])
                    data['mac_address'] = mac
                    return data
            return data
        except Exception as e:
            logger.error(f"Error parsing VeipUni: {e}")
            return {}

    def _parse_sys_mac(self, output):
        try:
            # Strip the echoed command and shell prompt before matching.
            output = output.replace('cat /sys/class/net/eth0/address', '')
            output = output.replace('# ', '').strip()
            match = re.search(r'([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2})', output)
            if match:
                return match.group(1)
            match = re.search(r'([0-9a-fA-F]{12})', output)
            if match:
                mac = match.group(1)
                return ':'.join([mac[i:i+2] for i in range(0, 12, 2)])
        except Exception as e:
            logger.error(f"Error parsing /sys MAC: {e}")
        return None

    def _parse_serial(self, output):
        try:
            match = re.search(r'SerialNumber:\s*(\S+)', output, re.IGNORECASE)
            if match:
                return match.group(1)
        except Exception as e:
            logger.error(f"Error parsing serial: {e}")
        return None

    def _parse_flash_value(self, output, key):
        try:
            pattern = rf'{key}=(.+?)(?:\r|\n|$)'
            match = re.search(pattern, output, re.IGNORECASE)
            if match:
                value = match.group(1).strip()
                if value and value != 'DEFAULT':
                    return value
            return None
        except Exception as e:
            logger.error(f"Error parsing flash value {key}: {e}")
            return None

    def _decode_olt_mode(self, value):
        modes = {
            '0': 'Disabled',
            '1': 'Huawei',
            '2': 'ZTE',
            '3': 'Customized',
            '21': 'Auto'
        }
        return modes.get(value, f'Unknown ({value})')

    def _decode_lan_mode(self, value):
        modes = {
            '0': 'Auto',
            '1': 'Fiber 1G',
            '8': 'HSGMII 2.5G'
        }
        return modes.get(value, f'Mode {value}')

    def _decode_pon_mode(self, value):
        modes = {
            '1': 'GPON',
            '2': 'EPON'
        }
        return modes.get(value, f'Mode {value}')

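    # ANI-G optical levels are 16-bit two's-complement values with 0.002 dBm
    # granularity, e.g. 0xF830 -> -2000 * 0.002 = -4.0 dBm.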
    def _convert_optical_power(self, hex_value):
        if hex_value > 0x7fff:
            signed_value = hex_value - 0x10000
        else:
            signed_value = hex_value
        return signed_value * 0.002

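    # Compares the latest snapshot against config.THRESHOLDS and rebuilds the
    # alert list each round, keeping at most the 20 most recent entries.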
    def _check_alerts(self, data):
        new_alerts = []
        thresholds = self.config.THRESHOLDS
        rx = data.get('rx_power')
        if rx is not None:
            if rx < thresholds['rx_power_min']:
                new_alerts.append({
                    'severity': 'critical',
                    'category': 'optical',
                    'message': f'RX Power critically low: {rx:.2f} dBm',
                    'value': rx,
                    'timestamp': datetime.now().isoformat()
                })
            elif rx > thresholds['rx_power_max']:
                new_alerts.append({
                    'severity': 'warning',
                    'category': 'optical',
                    'message': f'RX Power high: {rx:.2f} dBm',
                    'value': rx,
                    'timestamp': datetime.now().isoformat()
                })
        tx = data.get('tx_power')
        if tx is not None:
            if tx < thresholds['tx_power_min'] or tx > thresholds['tx_power_max']:
                new_alerts.append({
                    'severity': 'warning',
                    'category': 'optical',
                    'message': f'TX Power out of range: {tx:.2f} dBm',
                    'value': tx,
                    'timestamp': datetime.now().isoformat()
                })
        temp = data.get('temperature')
        if temp is not None and temp > thresholds.get('temperature_max', 85):
            new_alerts.append({
                'severity': 'warning',
                'category': 'temperature',
                'message': f'Temperature high: {temp:.1f}°C',
                'value': temp,
                'timestamp': datetime.now().isoformat()
            })
        fec_uncor = data.get('fec_uncorrected', 0)
        if fec_uncor > 0:
            new_alerts.append({
                'severity': 'critical',
                'category': 'transmission',
                'message': f'FEC Uncorrected Errors: {fec_uncor}',
                'value': fec_uncor,
                'timestamp': datetime.now().isoformat()
            })
        self.alerts = new_alerts[-20:]
        if new_alerts:
            logger.warning(f"Generated {len(new_alerts)} alerts")