# Files
# leox-gpon-monitoring/tests/collector_opti_1.py
# 2026-01-07 13:23:40 +01:00
#
# 509 lines
# 19 KiB
# Python

from aiohttp import BasicAuth
import telnetlib3
import time
import logging
from threading import Thread, Lock
from datetime import datetime
import re
import asyncio
import aiohttp
# Module-level logger named after this module; used for all collector log output.
logger = logging.getLogger(__name__)
class GPONCollector:
    """Background poller for a GPON device reachable over telnet and HTTP.

    A daemon thread periodically opens a telnet session, runs a fixed list of
    shell / ``omcicli`` commands, parses their textual output, merges in the
    values scraped from the device's ``status_pon.asp`` page and publishes the
    result through :meth:`get_data` and :meth:`get_alerts`.

    ``config`` must expose ``GPON_HOST``, ``GPON_PORT``, ``GPON_USERNAME``,
    ``GPON_PASSWORD``, ``POLL_INTERVAL`` (seconds between polls) and a
    ``THRESHOLDS`` mapping used by :meth:`_check_alerts`.
    """

    # Upper bound (seconds) for one read of command output, so that a silent
    # device cannot block the collector thread forever.
    _READ_TIMEOUT = 10

    # (result key, <th> label, unit suffix, label used in debug logs) for the
    # values scraped from status_pon.asp.
    _STATUS_PON_FIELDS = (
        ('temperature', 'Temperature', 'C', 'Temperature'),
        ('voltage', 'Voltage', 'V', 'Voltage'),
        ('tx_bias_current', 'Bias Current', 'mA', 'TX Bias'),
    )

    def __init__(self, config):
        """Store ``config`` and initialise empty state; no I/O happens here."""
        self.config = config
        self.data = {}          # last successful sample
        self.lock = Lock()      # guards self.data and self.alerts
        self.running = False
        self.thread = None
        self.alerts = []

    def start(self):
        """Start the polling thread; does nothing if already running."""
        if self.running:
            return
        self.running = True
        self.thread = Thread(target=self._collect_loop, daemon=True)
        self.thread.start()
        logger.info("Collector started")

    def stop(self):
        """Ask the polling thread to finish and wait up to 5 s for it."""
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
        logger.info("Collector stopped")

    def get_data(self):
        """Return a shallow copy of the last successful sample."""
        with self.lock:
            return self.data.copy()

    def get_alerts(self):
        """Return a copy of the alerts raised by the last successful sample."""
        with self.lock:
            return self.alerts.copy()

    def _collect_loop(self):
        """Thread body: poll, publish and sleep until ``running`` is cleared."""
        while self.running:
            try:
                # asyncio.run creates a fresh loop and always closes it, even
                # when the coroutine raises.
                data = asyncio.run(self._collect_all_data())
                if data and data.get('status') == 'online':
                    with self.lock:
                        self.data = data
                        self._check_alerts(data)
                    rx = data.get('rx_power', 0)
                    tx = data.get('tx_power', 0)
                    temp = data.get('temperature', 0)
                    uptime = data.get('uptime', 0)
                    uptime_days = uptime // 86400
                    logger.info(f"Collected: RX={rx:.2f}dBm TX={tx:.2f}dBm Temp={temp:.1f}°C Uptime={uptime_days}d")
                else:
                    logger.warning("No data or device offline")
            except Exception as e:
                # Top-level boundary of the worker thread: log and keep polling.
                logger.error(f"Error in collector loop: {e}", exc_info=True)
            time.sleep(self.config.POLL_INTERVAL)

    async def _collect_all_data(self):
        """Collect telnet data and overlay non-``None`` web-scraped values.

        Returns ``None`` when the telnet collection yields nothing.
        """
        omci_data = await self._collect_omci_async()
        if not omci_data:
            return None
        web_data = await self._scrape_web_interface()
        if web_data:
            for key, value in web_data.items():
                if value is not None:
                    omci_data[key] = value
        return omci_data

    def _parse_status_pon(self, html):
        """Extract temperature, voltage and bias current from the page HTML.

        Returns a dict holding only the fields that were found and parsed;
        a value that is not a valid number is skipped.
        """
        data = {}
        for key, label, unit, log_label in self._STATUS_PON_FIELDS:
            match = re.search(
                rf'<th[^>]*>{label}</th>\s*<td[^>]*>([\d.]+)\s*{unit}',
                html,
                re.IGNORECASE,
            )
            if not match:
                continue
            try:
                data[key] = float(match.group(1))
            except ValueError:
                # e.g. "1.2.3": keep the other fields instead of losing all.
                logger.warning(f"[WEB] Unparsable {log_label} value: {match.group(1)!r}")
                continue
            logger.debug(f"[WEB] {log_label}: {data[key]}")
        return data

    async def _scrape_web_interface(self):
        """Fetch ``status_pon.asp`` with basic auth and parse its values.

        Never raises: any failure is logged and an empty dict is returned.
        """
        try:
            auth = BasicAuth(self.config.GPON_USERNAME, self.config.GPON_PASSWORD)
            base_url = f'http://{self.config.GPON_HOST}'
            data = {}
            connector = aiohttp.TCPConnector(ssl=False)
            timeout = aiohttp.ClientTimeout(total=10, connect=5)
            async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
                try:
                    url = f'{base_url}/status_pon.asp'
                    async with session.get(url, auth=auth) as response:
                        if response.status == 200:
                            html = await response.text()
                            data = self._parse_status_pon(html)
                            logger.info(f"[WEB] status_pon.asp OK: temp={data.get('temperature')} voltage={data.get('voltage')} bias={data.get('tx_bias_current')}")
                        else:
                            logger.warning(f"[WEB] status_pon.asp returned {response.status}")
                except Exception as e:
                    logger.error(f"[WEB] Error fetching status_pon.asp: {e}")
            if data:
                logger.info(f"[WEB] Scraped {len(data)} fields from web interface")
            else:
                logger.warning("[WEB] No data scraped from web interface")
            return data
        except Exception as e:
            logger.error(f"[WEB] Web scraping failed: {e}")
            return {}

    async def _run_command(self, reader, writer, command, delay, max_bytes):
        """Send ``command``, wait ``delay`` seconds, then read the reply.

        Reads at most ``max_bytes`` and raises ``asyncio.TimeoutError`` if
        nothing arrives within ``_READ_TIMEOUT`` seconds.
        """
        writer.write(command + '\n')
        await asyncio.sleep(delay)
        return await asyncio.wait_for(reader.read(max_bytes), timeout=self._READ_TIMEOUT)

    async def _collect_omci_async(self):
        """Log in over telnet, run the command sequence and parse the output.

        Returns the collected dict (``status`` is ``'online'`` only when both
        ``rx_power`` and ``tx_power`` were obtained), or ``None`` on timeout
        or any other error. The connection is closed on every path.
        """
        writer = None
        try:
            reader, writer = await asyncio.wait_for(
                telnetlib3.open_connection(
                    self.config.GPON_HOST,
                    self.config.GPON_PORT
                ),
                timeout=10
            )
            await asyncio.wait_for(reader.readuntil(b'login: '), timeout=5)
            writer.write(self.config.GPON_USERNAME + '\n')
            await asyncio.wait_for(reader.readuntil(b'Password: '), timeout=5)
            writer.write(self.config.GPON_PASSWORD + '\n')
            await asyncio.sleep(2)
            initial_output = await asyncio.wait_for(
                reader.read(2048), timeout=self._READ_TIMEOUT
            )
            # NOTE(review): these prompts presumably mean login landed in a
            # vendor CLI rather than a shell; 'exit' is sent to leave it.
            if 'RTK.0>' in initial_output or 'command:#' in initial_output:
                await self._run_command(reader, writer, 'exit', 1, 1024)

            data = {
                'timestamp': datetime.now().isoformat(),
                'status': 'offline',
            }

            uptime_output = await self._run_command(reader, writer, 'cat /proc/uptime', 0.5, 1024)
            uptime = self._parse_proc_uptime(uptime_output)
            if uptime:
                data['uptime'] = uptime
                logger.debug(f"[TELNET] Uptime from /proc: {uptime}s")

            netdev_output = await self._run_command(reader, writer, 'cat /proc/net/dev', 0.5, 4096)
            traffic = self._parse_proc_net_dev(netdev_output)
            if traffic:
                data.update(traffic)
                logger.debug(f"[TELNET] Traffic: RX={traffic.get('rx_bytes')} TX={traffic.get('tx_bytes')}")

            sn_output = await self._run_command(reader, writer, 'omcicli get sn', 0.5, 1024)
            sn = self._parse_serial(sn_output)
            if sn:
                data['serial_number'] = sn
                logger.debug(f"[TELNET] Serial: {sn}")

            anig_output = await self._run_command(reader, writer, 'omcicli mib get Anig', 1.5, 16384)
            optical = self._parse_anig(anig_output)
            if optical:
                data.update(optical)
                logger.debug(f"[TELNET] Optical: RX={optical.get('rx_power')} TX={optical.get('tx_power')}")

            fec_output = await self._run_command(reader, writer, 'omcicli mib get FecPmhd', 1, 8192)
            fec = self._parse_fec(fec_output)
            if fec:
                data.update(fec)
                logger.debug(f"[TELNET] FEC: corrected={fec.get('fec_corrected')} uncorrected={fec.get('fec_uncorrected')}")

            ont2g_output = await self._run_command(reader, writer, 'omcicli mib get Ont2g', 1.5, 16384)
            device_info = self._parse_ont2g(ont2g_output)
            if device_info:
                data.update(device_info)
                logger.debug(f"[TELNET] Device: vendor={device_info.get('vendor_id')} version={device_info.get('version')}")

            oltg_output = await self._run_command(reader, writer, 'omcicli mib get OltG', 1, 8192)
            olt_info = self._parse_oltg(oltg_output)
            if olt_info:
                data.update(olt_info)
                logger.debug(f"[TELNET] OLT: vendor={olt_info.get('olt_vendor_info')} version={olt_info.get('olt_version_info')}")

            # MAC address: try PptpEthUni, then VeipUni, then the kernel's
            # view of eth0; stop at the first source that yields one.
            pptpethuni_output = await self._run_command(reader, writer, 'omcicli mib get PptpEthUni', 1, 8192)
            mac_info = self._parse_pptpethuni(pptpethuni_output)
            if mac_info and mac_info.get('mac_address'):
                data.update(mac_info)
                logger.debug(f"[TELNET] MAC from PptpEthUni: {mac_info.get('mac_address')}")
            else:
                veipuni_output = await self._run_command(reader, writer, 'omcicli mib get VeipUni', 1, 8192)
                mac_info = self._parse_veipuni(veipuni_output)
                if mac_info and mac_info.get('mac_address'):
                    data.update(mac_info)
                    logger.debug(f"[TELNET] MAC from VeipUni: {mac_info.get('mac_address')}")
                else:
                    sys_mac_output = await self._run_command(reader, writer, 'cat /sys/class/net/eth0/address', 0.5, 2048)
                    mac = self._parse_sys_mac(sys_mac_output)
                    if mac:
                        data['mac_address'] = mac
                        logger.debug(f"[TELNET] MAC from /sys: {mac}")

            writer.write('exit\n')
            await asyncio.sleep(0.3)

            if data.get('rx_power') is not None and data.get('tx_power') is not None:
                data['status'] = 'online'
            logger.info(f"[TELNET] Collected {len(data)} fields")
            return data
        except asyncio.TimeoutError:
            logger.error("[TELNET] Timeout during telnet connection")
            return None
        except Exception as e:
            logger.error(f"[TELNET] Telnet error: {e}", exc_info=True)
            return None
        finally:
            # Close on success and on every failure path alike.
            if writer is not None:
                try:
                    writer.close()
                except Exception as e:
                    logger.debug(f"[TELNET] Error closing connection: {e}")

    def _parse_proc_uptime(self, output):
        """Return the first decimal number in ``output`` as whole seconds, or ``None``."""
        try:
            match = re.search(r'(\d+\.\d+)', output)
            if match:
                return int(float(match.group(1)))
        except Exception as e:
            logger.error(f"Error parsing /proc/uptime: {e}")
        return None

    def _parse_proc_net_dev(self, output):
        """Return byte/packet counters of the first ``eth0`` or ``br0`` line.

        The interface name is split off at the colon, so both ``eth0: 123``
        and the fused form ``eth0:123`` are handled. Returns ``{}`` when no
        suitable line is found or the counters cannot be parsed.
        """
        try:
            for line in output.splitlines():
                name, colon, counters = line.partition(':')
                if not colon or name.strip() not in ('eth0', 'br0'):
                    continue
                fields = counters.split()
                # Counter columns: 0 = rx bytes, 1 = rx packets,
                # 8 = tx bytes, 9 = tx packets.
                if len(fields) < 10:
                    continue
                return {
                    'rx_bytes': int(fields[0]),
                    'tx_bytes': int(fields[8]),
                    'rx_packets': int(fields[1]),
                    'tx_packets': int(fields[9]),
                }
            return {}
        except Exception as e:
            logger.error(f"Error parsing /proc/net/dev: {e}")
            return {}

    def _parse_anig(self, output):
        """Parse ``OpticalSignalLevel`` / ``TranOpticLevel`` hex values.

        Returns a dict with ``rx_power`` and/or ``tx_power`` for the
        attributes present, converted by :meth:`_convert_optical_power`.
        """
        try:
            data = {}
            rx_match = re.search(r'OpticalSignalLevel:\s*(0x[0-9a-fA-F]+)', output)
            if rx_match:
                data['rx_power'] = self._convert_optical_power(int(rx_match.group(1), 16))
            tx_match = re.search(r'TranOpticLevel:\s*(0x[0-9a-fA-F]+)', output)
            if tx_match:
                data['tx_power'] = self._convert_optical_power(int(tx_match.group(1), 16))
            return data
        except Exception as e:
            logger.error(f"Error parsing ANI-G: {e}")
            return {}

    def _parse_fec(self, output):
        """Parse the FEC code-word counters; a missing counter is reported as 0."""
        try:
            data = {}
            for key, attribute in (
                ('fec_corrected', 'CorCodeWords'),
                ('fec_uncorrected', 'UncorCodeWords'),
            ):
                # The ':' right after the name keeps 'CorCodeWords' from being
                # confused with 'UncorCodeWords' only by position of match;
                # each pattern is searched independently as in the output.
                match = re.search(rf'(?<![A-Za-z]){attribute}:\s*(0x[0-9a-fA-F]+)', output)
                data[key] = int(match.group(1), 16) if match else 0
            return data
        except Exception as e:
            logger.error(f"Error parsing FEC: {e}")
            return {'fec_corrected': 0, 'fec_uncorrected': 0}

    def _parse_ont2g(self, output):
        """Parse vendor and version information from the Ont2g output.

        A hex ``EqtID`` is decoded as ASCII and used as ``version``; explicit
        ``VendorId:`` / ``Version:`` attributes, when present, take precedence.
        """
        try:
            data = {}
            match = re.search(r'EqtID:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
            if match:
                try:
                    vendor_version = bytes.fromhex(match.group(1)).decode('ascii', errors='ignore').strip('\x00')
                except ValueError:
                    # Odd number of hex digits: not decodable, ignore EqtID.
                    vendor_version = ''
                if vendor_version:
                    # NOTE(review): vendor is hard-coded whenever EqtID
                    # decodes; confirm this holds for all supported devices.
                    data['vendor_id'] = 'ALCL'
                    data['version'] = vendor_version
            match = re.search(r'VendorId:\s*(\S+)', output, re.IGNORECASE)
            if match:
                data['vendor_id'] = match.group(1)
            match = re.search(r'Version:\s*(\S+)', output, re.IGNORECASE)
            if match:
                data['version'] = match.group(1)
            return data
        except Exception as e:
            logger.error(f"Error parsing ONT2-G: {e}")
            return {}

    def _parse_oltg(self, output):
        """Parse ``OltVendorId`` and ``Version`` from the OltG output."""
        try:
            data = {}
            match = re.search(r'OltVendorId:\s*(\S+)', output, re.IGNORECASE)
            if match:
                data['olt_vendor_info'] = match.group(1)
            match = re.search(r'Version:\s*(\S+)', output, re.IGNORECASE)
            if match:
                data['olt_version_info'] = match.group(1)
            return data
        except Exception as e:
            logger.error(f"Error parsing OLT-G: {e}")
            return {}

    def _parse_mac_attribute(self, output):
        """Return the MAC from a ``MacAddress:`` attribute, or ``None``.

        The ``0x``-prefixed form is tried first: the colon-separated pattern
        would otherwise match just the leading ``0`` of ``0x...``. A ``0x``
        value that is not exactly 12 hex digits is rejected.
        """
        hex_match = re.search(r'MacAddress:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
        if hex_match:
            digits = hex_match.group(1)
            if len(digits) != 12:
                return None
            return ':'.join(digits[i:i + 2] for i in range(0, 12, 2))
        plain_match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', output, re.IGNORECASE)
        if plain_match:
            return plain_match.group(1)
        return None

    def _parse_pptpethuni(self, output):
        """Return ``{'mac_address': ...}`` from PptpEthUni output, else ``{}``."""
        try:
            mac = self._parse_mac_attribute(output)
            return {'mac_address': mac} if mac else {}
        except Exception as e:
            logger.error(f"Error parsing PptpEthUni: {e}")
            return {}

    def _parse_veipuni(self, output):
        """Return ``{'mac_address': ...}`` from VeipUni output, else ``{}``."""
        try:
            mac = self._parse_mac_attribute(output)
            return {'mac_address': mac} if mac else {}
        except Exception as e:
            logger.error(f"Error parsing VeipUni: {e}")
            return {}

    def _parse_sys_mac(self, output):
        """Extract a MAC from ``cat /sys/class/net/eth0/address`` output.

        The echoed command and ``# `` prompts are removed first. Accepts the
        colon-separated form or 12 bare hex digits; returns ``None`` otherwise.
        """
        try:
            output = output.replace('cat /sys/class/net/eth0/address', '')
            output = output.replace('# ', '').strip()
            match = re.search(r'([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2})', output)
            if match:
                return match.group(1)
            match = re.search(r'([0-9a-fA-F]{12})', output)
            if match:
                mac = match.group(1)
                return ':'.join(mac[i:i + 2] for i in range(0, 12, 2))
        except Exception as e:
            logger.error(f"Error parsing /sys MAC: {e}")
        return None

    def _parse_serial(self, output):
        """Return the value following ``SerialNumber:``, or ``None``."""
        try:
            match = re.search(r'SerialNumber:\s*(\S+)', output, re.IGNORECASE)
            if match:
                return match.group(1)
        except Exception as e:
            logger.error(f"Error parsing serial: {e}")
        return None

    def _convert_optical_power(self, hex_value):
        """Convert a raw optical level to a float.

        Values above ``0x7fff`` are treated as negative 16-bit two's
        complement; the signed value is scaled by 0.002. The result is
        reported as dBm in this class's log and alert messages.
        """
        signed_value = hex_value - 0x10000 if hex_value > 0x7fff else hex_value
        return signed_value * 0.002

    def _check_alerts(self, data):
        """Recompute ``self.alerts`` from ``data`` and ``config.THRESHOLDS``.

        Requires the ``rx_power_min/max`` and ``tx_power_min/max`` threshold
        keys; ``temperature_max`` defaults to 85. At most the last 20 alerts
        are kept. The caller is expected to hold ``self.lock``.
        """
        new_alerts = []
        thresholds = self.config.THRESHOLDS

        def add(severity, category, message, value):
            new_alerts.append({
                'severity': severity,
                'category': category,
                'message': message,
                'value': value,
                'timestamp': datetime.now().isoformat()
            })

        rx = data.get('rx_power')
        if rx is not None:
            if rx < thresholds['rx_power_min']:
                add('critical', 'optical', f'RX Power critically low: {rx:.2f} dBm', rx)
            elif rx > thresholds['rx_power_max']:
                add('warning', 'optical', f'RX Power high: {rx:.2f} dBm', rx)

        tx = data.get('tx_power')
        if tx is not None:
            if tx < thresholds['tx_power_min'] or tx > thresholds['tx_power_max']:
                add('warning', 'optical', f'TX Power out of range: {tx:.2f} dBm', tx)

        temp = data.get('temperature')
        if temp is not None and temp > thresholds.get('temperature_max', 85):
            add('warning', 'temperature', f'Temperature high: {temp:.1f}°C', temp)

        # 'or 0' tolerates an explicit None stored under the key.
        fec_uncor = data.get('fec_uncorrected', 0) or 0
        if fec_uncor > 0:
            add('critical', 'transmission', f'FEC Uncorrected Errors: {fec_uncor}', fec_uncor)

        self.alerts = new_alerts[-20:]
        if new_alerts:
            logger.warning(f"Generated {len(new_alerts)} alerts")