# leox-gpon-monitoring/collector.py
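"""Data collector for a LEOX GPON ONT.

Polls the ONT over telnet (OMCI CLI) and scrapes its embedded web status
pages, merging both sources into a single snapshot guarded by a lock.
"""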

import asyncio
import logging
import re
import time
from datetime import datetime
from threading import Thread, Lock

import aiohttp
import telnetlib3
from aiohttp import BasicAuth

logger = logging.getLogger(__name__)


class GPONCollector:
    def __init__(self, config):
        self.config = config
        self.data = {}
        self.lock = Lock()
        self.running = False
        self.thread = None
        self.alerts = []
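
    # Lifecycle: start() launches the poll loop in a daemon thread so the
    # process can exit without an explicit join; stop() asks the loop to
    # finish and waits up to five seconds for it.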
    def start(self):
        if self.running:
            return
        self.running = True
        self.thread = Thread(target=self._collect_loop, daemon=True)
        self.thread.start()
        logger.info("Collector started")

    def stop(self):
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
        logger.info("Collector stopped")
    def get_data(self):
        with self.lock:
            return self.data.copy()

    def get_alerts(self):
        with self.lock:
            return self.alerts.copy()
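
    # Main poll loop. A fresh event loop is created per cycle because the
    # loop runs in a plain worker thread; run_until_complete() drives the
    # async telnet and HTTP collection to completion each round.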
    def _collect_loop(self):
        while self.running:
            try:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                data = loop.run_until_complete(self._collect_all_data())
                loop.close()
                if data and data.get('status') == 'online':
                    with self.lock:
                        self.data = data
                        self._check_alerts(data)
                    rx = data.get('rx_power', 0)
                    tx = data.get('tx_power', 0)
                    temp = data.get('temperature', 0)
                    uptime = data.get('uptime', 0)
                    uptime_days = uptime // 86400
                    logger.info(
                        f"Collected: RX={rx:.2f}dBm TX={tx:.2f}dBm "
                        f"Temp={temp:.1f}C Uptime={uptime}s ({uptime_days}d)"
                    )
                else:
                    logger.warning("No data or device offline")
            except Exception as e:
                logger.error(f"Error in collector loop: {e}", exc_info=True)
            time.sleep(self.config.POLL_INTERVAL)
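
    # Collection strategy: OMCI (telnet) is authoritative; if it fails the
    # whole sample is dropped. Web-scraped fields are merged on top and
    # override OMCI values when both sources report the same key.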
    async def _collect_all_data(self):
        omci_data = await self._collect_omci_async()
        if not omci_data:
            return None
        web_data = await self._scrape_web_interface()
        if web_data:
            for key, value in web_data.items():
                if value is not None:
                    omci_data[key] = value
        return omci_data
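
    # Scrape the ONT's embedded web UI with HTTP basic auth. Three pages are
    # read independently so one failing page does not spoil the others:
    # status_pon.asp (optics), status.asp (uptime), admin/pon-stats.asp
    # (packet/byte counters).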
    async def _scrape_web_interface(self):
        try:
            auth = BasicAuth(self.config.GPON_USERNAME, self.config.GPON_PASSWORD)
            base_url = f'http://{self.config.GPON_HOST}'
            data = {}
            connector = aiohttp.TCPConnector(ssl=False)
            timeout = aiohttp.ClientTimeout(total=10, connect=5)
            async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
                try:
                    url = f'{base_url}/status_pon.asp'
                    async with session.get(url, auth=auth) as response:
                        if response.status == 200:
                            html = await response.text()
                            temp_match = re.search(r'<th[^>]*>Temperature</th>\s*<td[^>]*>([\d.]+)\s*C', html, re.IGNORECASE)
                            if temp_match:
                                data['temperature'] = float(temp_match.group(1))
                                logger.debug(f"[WEB] Temperature: {data['temperature']}")
                            volt_match = re.search(r'<th[^>]*>Voltage</th>\s*<td[^>]*>([\d.]+)\s*V', html, re.IGNORECASE)
                            if volt_match:
                                data['voltage'] = float(volt_match.group(1))
                                logger.debug(f"[WEB] Voltage: {data['voltage']}")
                            bias_match = re.search(r'<th[^>]*>Bias Current</th>\s*<td[^>]*>([\d.]+)\s*mA', html, re.IGNORECASE)
                            if bias_match:
                                data['tx_bias_current'] = float(bias_match.group(1))
                                logger.debug(f"[WEB] TX Bias: {data['tx_bias_current']}")
                            logger.info(f"[WEB] status_pon.asp OK: temp={data.get('temperature')} voltage={data.get('voltage')} bias={data.get('tx_bias_current')}")
                        else:
                            logger.warning(f"[WEB] status_pon.asp returned {response.status}")
                except Exception as e:
                    logger.error(f"[WEB] Error fetching status_pon.asp: {e}")
                try:
                    url = f'{base_url}/status.asp'
                    async with session.get(url, auth=auth) as response:
                        if response.status == 200:
                            html = await response.text()
                            uptime_match = re.search(
                                r'<th[^>]*>Uptime</th>\s*<td[^>]*>\s*([^<]+)</td>',
                                html,
                                re.IGNORECASE
                            )
                            if uptime_match:
                                uptime_str = uptime_match.group(1).strip()
                                logger.debug(f"[WEB] Raw uptime: '{uptime_str}'")
                                days_match = re.search(r'(\d+)\s*days?,\s*(\d+):(\d+)', uptime_str)
                                if days_match:
                                    days = int(days_match.group(1))
                                    hours = int(days_match.group(2))
                                    minutes = int(days_match.group(3))
                                    data['uptime'] = (days * 86400) + (hours * 3600) + (minutes * 60)
                                    logger.info(f"[WEB] Uptime: {days}d {hours}h {minutes}m = {data['uptime']}s")
                                else:
                                    time_match = re.search(r'(\d+):(\d+)', uptime_str)
                                    if time_match:
                                        hours = int(time_match.group(1))
                                        minutes = int(time_match.group(2))
                                        data['uptime'] = (hours * 3600) + (minutes * 60)
                                        logger.info(f"[WEB] Uptime: {hours}h {minutes}m = {data['uptime']}s")
                                    else:
                                        logger.warning(f"[WEB] Could not parse uptime: '{uptime_str}'")
                            else:
                                logger.warning("[WEB] Uptime not found in status.asp")
                            logger.info(f"[WEB] status.asp OK: uptime={data.get('uptime')}")
                        else:
                            logger.warning(f"[WEB] status.asp returned {response.status}")
                except Exception as e:
                    logger.error(f"[WEB] Error fetching status.asp: {e}")
                try:
                    url = f'{base_url}/admin/pon-stats.asp'
                    async with session.get(url, auth=auth) as response:
                        if response.status == 200:
                            html = await response.text()
                            tx_pkts_match = re.search(r'<th[^>]*>Packets Sent:</th>\s*<td[^>]*>(\d+)</td>', html, re.IGNORECASE)
                            if tx_pkts_match:
                                data['tx_packets'] = int(tx_pkts_match.group(1))
                                logger.debug(f"[WEB] TX Packets: {data['tx_packets']}")
                            rx_pkts_match = re.search(r'<th[^>]*>Packets Received:</th>\s*<td[^>]*>(\d+)</td>', html, re.IGNORECASE)
                            if rx_pkts_match:
                                data['rx_packets'] = int(rx_pkts_match.group(1))
                                logger.debug(f"[WEB] RX Packets: {data['rx_packets']}")
                            tx_bytes_match = re.search(r'<th[^>]*>Bytes Sent:</th>\s*<td[^>]*>(\d+)</td>', html, re.IGNORECASE)
                            if tx_bytes_match:
                                data['tx_bytes'] = int(tx_bytes_match.group(1))
                                logger.debug(f"[WEB] TX Bytes: {data['tx_bytes']}")
                            rx_bytes_match = re.search(r'<th[^>]*>Bytes Received:</th>\s*<td[^>]*>(\d+)</td>', html, re.IGNORECASE)
                            if rx_bytes_match:
                                data['rx_bytes'] = int(rx_bytes_match.group(1))
                                logger.debug(f"[WEB] RX Bytes: {data['rx_bytes']}")
                            logger.info(f"[WEB] pon-stats.asp OK: rx_pkts={data.get('rx_packets')} tx_pkts={data.get('tx_packets')}")
                        else:
                            logger.warning(f"[WEB] pon-stats.asp returned {response.status}")
                except Exception as e:
                    logger.error(f"[WEB] Error fetching pon-stats.asp: {e}")
            if data:
                logger.info(f"[WEB] Scraped {len(data)} fields from web interface")
            else:
                logger.warning("[WEB] No data scraped from web interface")
            return data
        except Exception as e:
            logger.error(f"[WEB] Web scraping failed: {e}")
            return {}
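
    # OMCI collection over telnet. Logs in, backs out of the Realtek
    # 'RTK.0>' shell into the regular CLI if that prompt appears, dumps the
    # full MIB with 'omcicli mib get all', and hands the raw text to
    # _parse_omci().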
    async def _collect_omci_async(self):
        try:
            reader, writer = await asyncio.wait_for(
                telnetlib3.open_connection(
                    self.config.GPON_HOST,
                    self.config.GPON_PORT
                ),
                timeout=10
            )
            await asyncio.wait_for(reader.readuntil(b'login: '), timeout=5)
            writer.write(self.config.GPON_USERNAME + '\n')
            await asyncio.wait_for(reader.readuntil(b'Password: '), timeout=5)
            writer.write(self.config.GPON_PASSWORD + '\n')
            await asyncio.sleep(2)
            initial_output = await reader.read(2048)
            if 'RTK.0>' in initial_output or 'command:#' in initial_output:
                writer.write('exit\n')
                await asyncio.sleep(1)
                await reader.read(1024)
            writer.write('omcicli mib get all\n')
            await asyncio.sleep(3)
            full_output = await reader.read(102400)
            writer.write('exit\n')
            await asyncio.sleep(0.3)
            try:
                writer.close()
            except Exception:
                pass
            logger.debug(f"[OMCI] Received {len(full_output)} bytes")
            return self._parse_omci(full_output)
        except asyncio.TimeoutError:
            logger.error("[OMCI] Timeout during telnet connection")
            return None
        except Exception as e:
            logger.error(f"[OMCI] Telnet error: {e}", exc_info=True)
            return None
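
    # Parse the raw 'omcicli mib get all' dump with regexes. Optical power
    # fields arrive as 16-bit hex in 0.002 dBm units; the snapshot is only
    # marked 'online' once both RX and TX power were parsed. The raw dump is
    # mirrored to /tmp/omci_debug.txt to ease regex debugging.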
    def _parse_omci(self, raw_data):
        try:
            with open('/tmp/omci_debug.txt', 'w') as f:
                f.write(raw_data)
        except Exception:
            pass
        data = {
            'timestamp': datetime.now().isoformat(),
            'status': 'offline',
        }
        try:
            match = re.search(r'OpticalSignalLevel:\s*(0x[0-9a-fA-F]+)', raw_data)
            if match:
                rx_hex = int(match.group(1), 16)
                data['rx_power'] = self._convert_optical_power(rx_hex)
            match = re.search(r'TranOpticLevel:\s*(0x[0-9a-fA-F]+)', raw_data)
            if match:
                tx_hex = int(match.group(1), 16)
                data['tx_power'] = self._convert_optical_power(tx_hex)
            match = re.search(r'SerialNum:\s*(\S+)', raw_data)
            if match:
                data['serial_number'] = match.group(1)
            match = re.search(r'Version:\s*(M\d+\S*)', raw_data)
            if match:
                data['version'] = match.group(1)
            match = re.search(r'VID:\s*(\S+)', raw_data)
            if match:
                data['vendor_id'] = match.group(1)
            match = re.search(r'OltVendorId:\s*(\S+)', raw_data)
            if match:
                data['olt_vendor_info'] = match.group(1)
            match = re.search(r'OltG.*?Version:\s*(\d+)', raw_data, re.DOTALL)
            if match:
                data['olt_version_info'] = match.group(1)
            match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', raw_data)
            if match:
                data['mac_address'] = match.group(1)
            fec_section = re.search(
                r'FecPmhd.*?CorCodeWords:\s*(0x[0-9a-fA-F]+).*?UncorCodeWords:\s*(0x[0-9a-fA-F]+)',
                raw_data,
                re.DOTALL
            )
            if fec_section:
                data['fec_corrected'] = int(fec_section.group(1), 16)
                data['fec_uncorrected'] = int(fec_section.group(2), 16)
            else:
                data['fec_corrected'] = 0
                data['fec_uncorrected'] = 0
            if data.get('rx_power') is not None and data.get('tx_power') is not None:
                data['status'] = 'online'
        except Exception as e:
            logger.error(f"[OMCI] Parsing error: {e}", exc_info=True)
        return data
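
    # Convert a raw 16-bit OMCI optical power reading (two's complement,
    # 0.002 dBm per LSB) to dBm. Example: 0xD8F0 -> 55536 - 65536 = -10000
    # -> -10000 * 0.002 = -20.0 dBm.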
    def _convert_optical_power(self, hex_value):
        if hex_value > 0x7fff:
            signed_value = hex_value - 0x10000
        else:
            signed_value = hex_value
        return signed_value * 0.002
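
    # Evaluate thresholds from config against the latest snapshot and
    # replace the alert list wholesale (capped at the 20 most recent), so
    # stale alerts clear themselves on the next healthy poll.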
    def _check_alerts(self, data):
        new_alerts = []
        thresholds = self.config.THRESHOLDS
        rx = data.get('rx_power')
        if rx is not None:
            if rx < thresholds['rx_power_min']:
                new_alerts.append({
                    'severity': 'critical',
                    'category': 'optical',
                    'message': f'RX Power critically low: {rx:.2f} dBm',
                    'value': rx,
                    'timestamp': datetime.now().isoformat()
                })
            elif rx > thresholds['rx_power_max']:
                new_alerts.append({
                    'severity': 'warning',
                    'category': 'optical',
                    'message': f'RX Power high: {rx:.2f} dBm',
                    'value': rx,
                    'timestamp': datetime.now().isoformat()
                })
        tx = data.get('tx_power')
        if tx is not None:
            if tx < thresholds['tx_power_min'] or tx > thresholds['tx_power_max']:
                new_alerts.append({
                    'severity': 'warning',
                    'category': 'optical',
                    'message': f'TX Power out of range: {tx:.2f} dBm',
                    'value': tx,
                    'timestamp': datetime.now().isoformat()
                })
        temp = data.get('temperature')
        if temp is not None and temp > thresholds.get('temperature_max', 85):
            new_alerts.append({
                'severity': 'warning',
                'category': 'temperature',
                'message': f'Temperature high: {temp:.1f}C',
                'value': temp,
                'timestamp': datetime.now().isoformat()
            })
        fec_uncor = data.get('fec_uncorrected', 0)
        if fec_uncor > 0:
            new_alerts.append({
                'severity': 'critical',
                'category': 'transmission',
                'message': f'FEC Uncorrected Errors: {fec_uncor}',
                'value': fec_uncor,
                'timestamp': datetime.now().isoformat()
            })
        self.alerts = new_alerts[-20:]
        if new_alerts:
            logger.warning(f"Generated {len(new_alerts)} alerts")
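

# A minimal wiring sketch (assumption: the real config class lives elsewhere
# in this repo; the attribute names below mirror the ones this module reads,
# and all values are purely illustrative):
#
#     class Config:
#         GPON_HOST = '192.168.1.1'    # ONT address (example)
#         GPON_PORT = 23               # telnet port (example)
#         GPON_USERNAME = 'admin'      # example credentials
#         GPON_PASSWORD = 'admin'
#         POLL_INTERVAL = 60           # seconds between polls (example)
#         THRESHOLDS = {
#             'rx_power_min': -28.0, 'rx_power_max': -8.0,  # dBm (example)
#             'tx_power_min': 0.5, 'tx_power_max': 5.0,     # dBm (example)
#             'temperature_max': 85,                        # Celsius
#         }
#
#     collector = GPONCollector(Config())
#     collector.start()                  # begins background polling
#     snapshot = collector.get_data()    # thread-safe copy of latest sample
#     collector.stop()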