- | RX Packets: | -- |
+ | PON Mode: | -- |
+ | LAN Mode: | -- |
+ | RX Packets: | -- |
| TX Packets: | -- |
| FEC Corrected: | -- |
| FEC Uncorrected: | -- |
@@ -181,6 +187,7 @@
+
diff --git a/tests/collector_old.py b/tests/collector_old.py
new file mode 100644
index 0000000..7043e99
--- /dev/null
+++ b/tests/collector_old.py
@@ -0,0 +1,399 @@
+from aiohttp import BasicAuth
+import telnetlib3
+import time
+import logging
+from threading import Thread, Lock
+from datetime import datetime
+import re
+import asyncio
+import aiohttp
+
+logger = logging.getLogger(__name__)
+
+class GPONCollector:
+ def __init__(self, config):
+ self.config = config
+ self.data = {}
+ self.lock = Lock()
+ self.running = False
+ self.thread = None
+ self.alerts = []
+
+ def start(self):
+ if self.running:
+ return
+
+ self.running = True
+ self.thread = Thread(target=self._collect_loop, daemon=True)
+ self.thread.start()
+ logger.info("Collector started")
+
+ def stop(self):
+ self.running = False
+ if self.thread:
+ self.thread.join(timeout=5)
+ logger.info("Collector stopped")
+
+ def get_data(self):
+ with self.lock:
+ return self.data.copy()
+
+ def get_alerts(self):
+ with self.lock:
+ return self.alerts.copy()
+
+ def _collect_loop(self):
+ while self.running:
+ try:
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ data = loop.run_until_complete(self._collect_all_data())
+ loop.close()
+
+ if data and data.get('status') == 'online':
+ with self.lock:
+ self.data = data
+ self._check_alerts(data)
+
+ rx = data.get('rx_power', 0)
+ tx = data.get('tx_power', 0)
+ temp = data.get('temperature', 0)
+ uptime = data.get('uptime', 0)
+ uptime_days = uptime // 86400
+ logger.info(f"Collected: RX={rx:.2f}dBm TX={tx:.2f}dBm Temp={temp:.1f}C Uptime={uptime}s ({uptime_days}d)")
+ else:
+ logger.warning("No data or device offline")
+
+ except Exception as e:
+ logger.error(f"Error in collector loop: {e}", exc_info=True)
+
+ time.sleep(self.config.POLL_INTERVAL)
+
+ async def _collect_all_data(self):
+ omci_data = await self._collect_omci_async()
+
+ if not omci_data:
+ return None
+
+ web_data = await self._scrape_web_interface()
+
+ if web_data:
+ for key, value in web_data.items():
+ if value is not None:
+ omci_data[key] = value
+
+ return omci_data
+
+ async def _scrape_web_interface(self):
+ try:
+ auth = BasicAuth(self.config.GPON_USERNAME, self.config.GPON_PASSWORD)
+ base_url = f'http://{self.config.GPON_HOST}'
+
+ data = {}
+ connector = aiohttp.TCPConnector(ssl=False)
+ timeout = aiohttp.ClientTimeout(total=10, connect=5)
+
+ async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
+
+ try:
+ url = f'{base_url}/status_pon.asp'
+ async with session.get(url, auth=auth) as response:
+ if response.status == 200:
+ html = await response.text()
+
+ temp_match = re.search(r'
]*>Temperature | \s*]*>([\d.]+)\s*C', html, re.IGNORECASE)
+ if temp_match:
+ data['temperature'] = float(temp_match.group(1))
+ logger.debug(f"[WEB] Temperature: {data['temperature']}")
+
+ volt_match = re.search(r' | ]*>Voltage | \s*]*>([\d.]+)\s*V', html, re.IGNORECASE)
+ if volt_match:
+ data['voltage'] = float(volt_match.group(1))
+ logger.debug(f"[WEB] Voltage: {data['voltage']}")
+
+ bias_match = re.search(r' | ]*>Bias Current | \s*]*>([\d.]+)\s*mA', html, re.IGNORECASE)
+ if bias_match:
+ data['tx_bias_current'] = float(bias_match.group(1))
+ logger.debug(f"[WEB] TX Bias: {data['tx_bias_current']}")
+
+ logger.info(f"[WEB] status_pon.asp OK: temp={data.get('temperature')} voltage={data.get('voltage')} bias={data.get('tx_bias_current')}")
+ else:
+ logger.warning(f"[WEB] status_pon.asp returned {response.status}")
+ except Exception as e:
+ logger.error(f"[WEB] Error fetching status_pon.asp: {e}")
+
+ try:
+ url = f'{base_url}/status.asp'
+ async with session.get(url, auth=auth) as response:
+ if response.status == 200:
+ html = await response.text()
+
+ uptime_match = re.search(
+ r' | ]*>Uptime | \s*]*>\s*([^<]+) | ',
+ html,
+ re.IGNORECASE
+ )
+ if uptime_match:
+ uptime_str = uptime_match.group(1).strip()
+ logger.debug(f"[WEB] Raw uptime: '{uptime_str}'")
+
+ days_min_match = re.search(r'(\d+)\s*days?,\s*(\d+)\s*min', uptime_str, re.IGNORECASE)
+ if days_min_match:
+ days = int(days_min_match.group(1))
+ minutes = int(days_min_match.group(2))
+ data['uptime'] = (days * 86400) + (minutes * 60)
+ logger.info(f"[WEB] Uptime: {days}d {minutes}m = {data['uptime']}s")
+
+ elif days_hm_match := re.search(r'(\d+)\s*days?,\s*(\d+):(\d+)', uptime_str):
+ days = int(days_hm_match.group(1))
+ hours = int(days_hm_match.group(2))
+ minutes = int(days_hm_match.group(3))
+ data['uptime'] = (days * 86400) + (hours * 3600) + (minutes * 60)
+ logger.info(f"[WEB] Uptime: {days}d {hours}h {minutes}m = {data['uptime']}s")
+
+ elif hm_match := re.search(r'(\d+)\s*h(?:our)?[s]?,?\s*(\d+)\s*min', uptime_str, re.IGNORECASE):
+ hours = int(hm_match.group(1))
+ minutes = int(hm_match.group(2))
+ data['uptime'] = (hours * 3600) + (minutes * 60)
+ logger.info(f"[WEB] Uptime: {hours}h {minutes}m = {data['uptime']}s")
+
+ elif time_match := re.search(r'^(\d+):(\d+)$', uptime_str):
+ hours = int(time_match.group(1))
+ minutes = int(time_match.group(2))
+ data['uptime'] = (hours * 3600) + (minutes * 60)
+ logger.info(f"[WEB] Uptime: {hours}h {minutes}m = {data['uptime']}s")
+
+ else:
+ logger.warning(f"[WEB] Could not parse uptime: '{uptime_str}'")
+ else:
+ logger.warning("[WEB] Uptime not found in status.asp")
+
+ logger.info(f"[WEB] status.asp OK: uptime={data.get('uptime')}")
+ else:
+ logger.warning(f"[WEB] status.asp returned {response.status}")
+ except Exception as e:
+ logger.error(f"[WEB] Error fetching status.asp: {e}")
+
+ try:
+ url = f'{base_url}/admin/pon-stats.asp'
+ async with session.get(url, auth=auth) as response:
+ if response.status == 200:
+ html = await response.text()
+
+ tx_pkts_match = re.search(r']*>Packets Sent: | \s*]*>(\d+) | ', html, re.IGNORECASE)
+ if tx_pkts_match:
+ data['tx_packets'] = int(tx_pkts_match.group(1))
+ logger.debug(f"[WEB] TX Packets: {data['tx_packets']}")
+
+ rx_pkts_match = re.search(r']*>Packets Received: | \s*]*>(\d+) | ', html, re.IGNORECASE)
+ if rx_pkts_match:
+ data['rx_packets'] = int(rx_pkts_match.group(1))
+ logger.debug(f"[WEB] RX Packets: {data['rx_packets']}")
+
+ tx_bytes_match = re.search(r']*>Bytes Sent: | \s*]*>(\d+) | ', html, re.IGNORECASE)
+ if tx_bytes_match:
+ data['tx_bytes'] = int(tx_bytes_match.group(1))
+ logger.debug(f"[WEB] TX Bytes: {data['tx_bytes']}")
+
+ rx_bytes_match = re.search(r']*>Bytes Received: | \s*]*>(\d+) | ', html, re.IGNORECASE)
+ if rx_bytes_match:
+ data['rx_bytes'] = int(rx_bytes_match.group(1))
+ logger.debug(f"[WEB] RX Bytes: {data['rx_bytes']}")
+
+ logger.info(f"[WEB] pon-stats.asp OK: rx_pkts={data.get('rx_packets')} tx_pkts={data.get('tx_packets')}")
+ else:
+ logger.warning(f"[WEB] pon-stats.asp returned {response.status}")
+ except Exception as e:
+ logger.error(f"[WEB] Error fetching pon-stats.asp: {e}")
+
+ if data:
+ logger.info(f"[WEB] Scraped {len(data)} fields from web interface")
+ else:
+ logger.warning("[WEB] No data scraped from web interface")
+
+ return data
+
+ except Exception as e:
+ logger.error(f"[WEB] Web scraping failed: {e}")
+ return {}
+
+
+ async def _collect_omci_async(self):
+ try:
+ reader, writer = await asyncio.wait_for(
+ telnetlib3.open_connection(
+ self.config.GPON_HOST,
+ self.config.GPON_PORT
+ ),
+ timeout=10
+ )
+
+ await asyncio.wait_for(reader.readuntil(b'login: '), timeout=5)
+ writer.write(self.config.GPON_USERNAME + '\n')
+
+ await asyncio.wait_for(reader.readuntil(b'Password: '), timeout=5)
+ writer.write(self.config.GPON_PASSWORD + '\n')
+
+ await asyncio.sleep(2)
+ initial_output = await reader.read(2048)
+
+ if 'RTK.0>' in initial_output or 'command:#' in initial_output:
+ writer.write('exit\n')
+ await asyncio.sleep(1)
+ await reader.read(1024)
+
+ writer.write('omcicli mib get all\n')
+ await asyncio.sleep(3)
+ full_output = await reader.read(102400)
+
+ writer.write('exit\n')
+ await asyncio.sleep(0.3)
+
+ try:
+ writer.close()
+ except:
+ pass
+
+ logger.debug(f"[OMCI] Received {len(full_output)} bytes")
+
+ return self._parse_omci(full_output)
+
+ except asyncio.TimeoutError:
+ logger.error("[OMCI] Timeout during telnet connection")
+ return None
+ except Exception as e:
+ logger.error(f"[OMCI] Telnet error: {e}", exc_info=True)
+ return None
+
+ def _parse_omci(self, raw_data):
+ try:
+ with open('/tmp/omci_debug.txt', 'w') as f:
+ f.write(raw_data)
+ except:
+ pass
+
+ data = {
+ 'timestamp': datetime.now().isoformat(),
+ 'status': 'offline',
+ }
+
+ try:
+ match = re.search(r'OpticalSignalLevel:\s*(0x[0-9a-fA-F]+)', raw_data)
+ if match:
+ rx_hex = int(match.group(1), 16)
+ data['rx_power'] = self._convert_optical_power(rx_hex)
+
+ match = re.search(r'TranOpticLevel:\s*(0x[0-9a-fA-F]+)', raw_data)
+ if match:
+ tx_hex = int(match.group(1), 16)
+ data['tx_power'] = self._convert_optical_power(tx_hex)
+
+ match = re.search(r'SerialNum:\s*(\S+)', raw_data)
+ if match:
+ data['serial_number'] = match.group(1)
+
+ match = re.search(r'Version:\s*(M\d+\S*)', raw_data)
+ if match:
+ data['version'] = match.group(1)
+
+ match = re.search(r'VID:\s*(\S+)', raw_data)
+ if match:
+ data['vendor_id'] = match.group(1)
+
+ match = re.search(r'OltVendorId:\s*(\S+)', raw_data)
+ if match:
+ data['olt_vendor_info'] = match.group(1)
+
+ match = re.search(r'OltG.*?Version:\s*(\d+)', raw_data, re.DOTALL)
+ if match:
+ data['olt_version_info'] = match.group(1)
+
+ match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', raw_data)
+ if match:
+ data['mac_address'] = match.group(1)
+
+ fec_section = re.search(
+ r'FecPmhd.*?CorCodeWords:\s*(0x[0-9a-fA-F]+).*?UncorCodeWords:\s*(0x[0-9a-fA-F]+)',
+ raw_data,
+ re.DOTALL
+ )
+ if fec_section:
+ data['fec_corrected'] = int(fec_section.group(1), 16)
+ data['fec_uncorrected'] = int(fec_section.group(2), 16)
+ else:
+ data['fec_corrected'] = 0
+ data['fec_uncorrected'] = 0
+
+ if data.get('rx_power') is not None and data.get('tx_power') is not None:
+ data['status'] = 'online'
+
+ except Exception as e:
+ logger.error(f"[OMCI] Parsing error: {e}", exc_info=True)
+
+ return data
+
+ def _convert_optical_power(self, hex_value):
+ if hex_value > 0x7fff:
+ signed_value = hex_value - 0x10000
+ else:
+ signed_value = hex_value
+ return signed_value * 0.002
+
+ def _check_alerts(self, data):
+ new_alerts = []
+ thresholds = self.config.THRESHOLDS
+
+ rx = data.get('rx_power')
+ if rx is not None:
+ if rx < thresholds['rx_power_min']:
+ new_alerts.append({
+ 'severity': 'critical',
+ 'category': 'optical',
+ 'message': f'RX Power critically low: {rx:.2f} dBm',
+ 'value': rx,
+ 'timestamp': datetime.now().isoformat()
+ })
+ elif rx > thresholds['rx_power_max']:
+ new_alerts.append({
+ 'severity': 'warning',
+ 'category': 'optical',
+ 'message': f'RX Power high: {rx:.2f} dBm',
+ 'value': rx,
+ 'timestamp': datetime.now().isoformat()
+ })
+
+ tx = data.get('tx_power')
+ if tx is not None:
+ if tx < thresholds['tx_power_min'] or tx > thresholds['tx_power_max']:
+ new_alerts.append({
+ 'severity': 'warning',
+ 'category': 'optical',
+ 'message': f'TX Power out of range: {tx:.2f} dBm',
+ 'value': tx,
+ 'timestamp': datetime.now().isoformat()
+ })
+
+ temp = data.get('temperature')
+ if temp is not None and temp > thresholds.get('temperature_max', 85):
+ new_alerts.append({
+ 'severity': 'warning',
+ 'category': 'temperature',
+ 'message': f'Temperature high: {temp:.1f}C',
+ 'value': temp,
+ 'timestamp': datetime.now().isoformat()
+ })
+
+ fec_uncor = data.get('fec_uncorrected', 0)
+ if fec_uncor > 0:
+ new_alerts.append({
+ 'severity': 'critical',
+ 'category': 'transmission',
+ 'message': f'FEC Uncorrected Errors: {fec_uncor}',
+ 'value': fec_uncor,
+ 'timestamp': datetime.now().isoformat()
+ })
+
+ self.alerts = new_alerts[-20:]
+
+ if new_alerts:
+ logger.warning(f"Generated {len(new_alerts)} alerts")
\ No newline at end of file
diff --git a/tests/collector_opti_1.py b/tests/collector_opti_1.py
new file mode 100644
index 0000000..ac47536
--- /dev/null
+++ b/tests/collector_opti_1.py
@@ -0,0 +1,508 @@
+from aiohttp import BasicAuth
+import telnetlib3
+import time
+import logging
+from threading import Thread, Lock
+from datetime import datetime
+import re
+import asyncio
+import aiohttp
+
+logger = logging.getLogger(__name__)
+
+class GPONCollector:
+ def __init__(self, config):
+ self.config = config
+ self.data = {}
+ self.lock = Lock()
+ self.running = False
+ self.thread = None
+ self.alerts = []
+
+ def start(self):
+ if self.running:
+ return
+ self.running = True
+ self.thread = Thread(target=self._collect_loop, daemon=True)
+ self.thread.start()
+ logger.info("Collector started")
+
+ def stop(self):
+ self.running = False
+ if self.thread:
+ self.thread.join(timeout=5)
+ logger.info("Collector stopped")
+
+ def get_data(self):
+ with self.lock:
+ return self.data.copy()
+
+ def get_alerts(self):
+ with self.lock:
+ return self.alerts.copy()
+
+ def _collect_loop(self):
+ while self.running:
+ try:
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ data = loop.run_until_complete(self._collect_all_data())
+ loop.close()
+
+ if data and data.get('status') == 'online':
+ with self.lock:
+ self.data = data
+ self._check_alerts(data)
+
+ rx = data.get('rx_power', 0)
+ tx = data.get('tx_power', 0)
+ temp = data.get('temperature', 0)
+ uptime = data.get('uptime', 0)
+ uptime_days = uptime // 86400
+ logger.info(f"Collected: RX={rx:.2f}dBm TX={tx:.2f}dBm Temp={temp:.1f}°C Uptime={uptime_days}d")
+ else:
+ logger.warning("No data or device offline")
+
+ except Exception as e:
+ logger.error(f"Error in collector loop: {e}", exc_info=True)
+
+ time.sleep(self.config.POLL_INTERVAL)
+
+ async def _collect_all_data(self):
+ omci_data = await self._collect_omci_async()
+ if not omci_data:
+ return None
+
+ web_data = await self._scrape_web_interface()
+ if web_data:
+ for key, value in web_data.items():
+ if value is not None:
+ omci_data[key] = value
+
+ return omci_data
+
+ async def _scrape_web_interface(self):
+ try:
+ auth = BasicAuth(self.config.GPON_USERNAME, self.config.GPON_PASSWORD)
+ base_url = f'http://{self.config.GPON_HOST}'
+ data = {}
+
+ connector = aiohttp.TCPConnector(ssl=False)
+ timeout = aiohttp.ClientTimeout(total=10, connect=5)
+
+ async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
+ try:
+ url = f'{base_url}/status_pon.asp'
+ async with session.get(url, auth=auth) as response:
+ if response.status == 200:
+ html = await response.text()
+
+ temp_match = re.search(r']*>Temperature | \s*]*>([\d.]+)\s*C', html, re.IGNORECASE)
+ if temp_match:
+ data['temperature'] = float(temp_match.group(1))
+ logger.debug(f"[WEB] Temperature: {data['temperature']}")
+
+ volt_match = re.search(r' | ]*>Voltage | \s*]*>([\d.]+)\s*V', html, re.IGNORECASE)
+ if volt_match:
+ data['voltage'] = float(volt_match.group(1))
+ logger.debug(f"[WEB] Voltage: {data['voltage']}")
+
+ bias_match = re.search(r' | ]*>Bias Current | \s*]*>([\d.]+)\s*mA', html, re.IGNORECASE)
+ if bias_match:
+ data['tx_bias_current'] = float(bias_match.group(1))
+ logger.debug(f"[WEB] TX Bias: {data['tx_bias_current']}")
+
+ logger.info(f"[WEB] status_pon.asp OK: temp={data.get('temperature')} voltage={data.get('voltage')} bias={data.get('tx_bias_current')}")
+ else:
+ logger.warning(f"[WEB] status_pon.asp returned {response.status}")
+ except Exception as e:
+ logger.error(f"[WEB] Error fetching status_pon.asp: {e}")
+
+ if data:
+ logger.info(f"[WEB] Scraped {len(data)} fields from web interface")
+ else:
+ logger.warning("[WEB] No data scraped from web interface")
+
+ return data
+
+ except Exception as e:
+ logger.error(f"[WEB] Web scraping failed: {e}")
+ return {}
+
+ async def _collect_omci_async(self):
+ try:
+ reader, writer = await asyncio.wait_for(
+ telnetlib3.open_connection(
+ self.config.GPON_HOST,
+ self.config.GPON_PORT
+ ),
+ timeout=10
+ )
+
+ await asyncio.wait_for(reader.readuntil(b'login: '), timeout=5)
+ writer.write(self.config.GPON_USERNAME + '\n')
+
+ await asyncio.wait_for(reader.readuntil(b'Password: '), timeout=5)
+ writer.write(self.config.GPON_PASSWORD + '\n')
+
+ await asyncio.sleep(2)
+ initial_output = await reader.read(2048)
+
+ if 'RTK.0>' in initial_output or 'command:#' in initial_output:
+ writer.write('exit\n')
+ await asyncio.sleep(1)
+ await reader.read(1024)
+
+ data = {
+ 'timestamp': datetime.now().isoformat(),
+ 'status': 'offline',
+ }
+
+ writer.write('cat /proc/uptime\n')
+ await asyncio.sleep(0.5)
+ uptime_output = await reader.read(1024)
+ uptime = self._parse_proc_uptime(uptime_output)
+ if uptime:
+ data['uptime'] = uptime
+ logger.debug(f"[TELNET] Uptime from /proc: {uptime}s")
+
+ writer.write('cat /proc/net/dev\n')
+ await asyncio.sleep(0.5)
+ netdev_output = await reader.read(4096)
+ traffic = self._parse_proc_net_dev(netdev_output)
+ if traffic:
+ data.update(traffic)
+ logger.debug(f"[TELNET] Traffic: RX={traffic.get('rx_bytes')} TX={traffic.get('tx_bytes')}")
+
+ writer.write('omcicli get sn\n')
+ await asyncio.sleep(0.5)
+ sn_output = await reader.read(1024)
+ sn = self._parse_serial(sn_output)
+ if sn:
+ data['serial_number'] = sn
+ logger.debug(f"[TELNET] Serial: {sn}")
+
+ writer.write('omcicli mib get Anig\n')
+ await asyncio.sleep(1.5)
+ anig_output = await reader.read(16384)
+ optical = self._parse_anig(anig_output)
+ if optical:
+ data.update(optical)
+ logger.debug(f"[TELNET] Optical: RX={optical.get('rx_power')} TX={optical.get('tx_power')}")
+
+ writer.write('omcicli mib get FecPmhd\n')
+ await asyncio.sleep(1)
+ fec_output = await reader.read(8192)
+ fec = self._parse_fec(fec_output)
+ if fec:
+ data.update(fec)
+ logger.debug(f"[TELNET] FEC: corrected={fec.get('fec_corrected')} uncorrected={fec.get('fec_uncorrected')}")
+
+ writer.write('omcicli mib get Ont2g\n')
+ await asyncio.sleep(1.5)
+ ont2g_output = await reader.read(16384)
+ device_info = self._parse_ont2g(ont2g_output)
+ if device_info:
+ data.update(device_info)
+ logger.debug(f"[TELNET] Device: vendor={device_info.get('vendor_id')} version={device_info.get('version')}")
+
+ writer.write('omcicli mib get OltG\n')
+ await asyncio.sleep(1)
+ oltg_output = await reader.read(8192)
+ olt_info = self._parse_oltg(oltg_output)
+ if olt_info:
+ data.update(olt_info)
+ logger.debug(f"[TELNET] OLT: vendor={olt_info.get('olt_vendor_info')} version={olt_info.get('olt_version_info')}")
+
+ writer.write('omcicli mib get PptpEthUni\n')
+ await asyncio.sleep(1)
+ pptpethuni_output = await reader.read(8192)
+ mac_info = self._parse_pptpethuni(pptpethuni_output)
+ if mac_info and mac_info.get('mac_address'):
+ data.update(mac_info)
+ logger.debug(f"[TELNET] MAC from PptpEthUni: {mac_info.get('mac_address')}")
+ else:
+ writer.write('omcicli mib get VeipUni\n')
+ await asyncio.sleep(1)
+ veipuni_output = await reader.read(8192)
+ mac_info = self._parse_veipuni(veipuni_output)
+ if mac_info and mac_info.get('mac_address'):
+ data.update(mac_info)
+ logger.debug(f"[TELNET] MAC from VeipUni: {mac_info.get('mac_address')}")
+ else:
+ writer.write('cat /sys/class/net/eth0/address\n')
+ await asyncio.sleep(0.5)
+ sys_mac_output = await reader.read(2048)
+ mac = self._parse_sys_mac(sys_mac_output)
+ if mac:
+ data['mac_address'] = mac
+ logger.debug(f"[TELNET] MAC from /sys: {mac}")
+
+ writer.write('exit\n')
+ await asyncio.sleep(0.3)
+
+ try:
+ writer.close()
+ except:
+ pass
+
+ if data.get('rx_power') is not None and data.get('tx_power') is not None:
+ data['status'] = 'online'
+
+ logger.info(f"[TELNET] Collected {len(data)} fields")
+ return data
+
+ except asyncio.TimeoutError:
+ logger.error("[TELNET] Timeout during telnet connection")
+ return None
+ except Exception as e:
+ logger.error(f"[TELNET] Telnet error: {e}", exc_info=True)
+ return None
+
+ def _parse_proc_uptime(self, output):
+ try:
+ match = re.search(r'(\d+\.\d+)', output)
+ if match:
+ return int(float(match.group(1)))
+ except Exception as e:
+ logger.error(f"Error parsing /proc/uptime: {e}")
+ return None
+
+ def _parse_proc_net_dev(self, output):
+ try:
+ data = {}
+ for line in output.split('\n'):
+ if 'eth0:' in line or 'br0:' in line:
+ parts = line.split()
+ if len(parts) >= 10:
+ data['rx_bytes'] = int(parts[1])
+ data['tx_bytes'] = int(parts[9])
+ data['rx_packets'] = int(parts[2])
+ data['tx_packets'] = int(parts[10])
+ break
+ return data
+ except Exception as e:
+ logger.error(f"Error parsing /proc/net/dev: {e}")
+ return {}
+
+ def _parse_anig(self, output):
+ try:
+ data = {}
+
+ rx_match = re.search(r'OpticalSignalLevel:\s*(0x[0-9a-fA-F]+)', output)
+ if rx_match:
+ rx_hex = int(rx_match.group(1), 16)
+ data['rx_power'] = self._convert_optical_power(rx_hex)
+
+ tx_match = re.search(r'TranOpticLevel:\s*(0x[0-9a-fA-F]+)', output)
+ if tx_match:
+ tx_hex = int(tx_match.group(1), 16)
+ data['tx_power'] = self._convert_optical_power(tx_hex)
+
+ return data
+ except Exception as e:
+ logger.error(f"Error parsing ANI-G: {e}")
+ return {}
+
+ def _parse_fec(self, output):
+ try:
+ data = {}
+
+ match = re.search(r'CorCodeWords:\s*(0x[0-9a-fA-F]+)', output)
+ if match:
+ data['fec_corrected'] = int(match.group(1), 16)
+ else:
+ data['fec_corrected'] = 0
+
+ match = re.search(r'UncorCodeWords:\s*(0x[0-9a-fA-F]+)', output)
+ if match:
+ data['fec_uncorrected'] = int(match.group(1), 16)
+ else:
+ data['fec_uncorrected'] = 0
+
+ return data
+ except Exception as e:
+ logger.error(f"Error parsing FEC: {e}")
+ return {'fec_corrected': 0, 'fec_uncorrected': 0}
+
+ def _parse_ont2g(self, output):
+ try:
+ data = {}
+
+ match = re.search(r'EqtID:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
+ if match:
+ hex_str = match.group(1)
+ try:
+ vendor_version = bytes.fromhex(hex_str).decode('ascii', errors='ignore').strip('\x00')
+ if vendor_version:
+ data['vendor_id'] = 'ALCL'
+ data['version'] = vendor_version
+ except:
+ pass
+
+ match = re.search(r'VendorId:\s*(\S+)', output, re.IGNORECASE)
+ if match:
+ data['vendor_id'] = match.group(1)
+
+ match = re.search(r'Version:\s*(\S+)', output, re.IGNORECASE)
+ if match:
+ data['version'] = match.group(1)
+
+ return data
+ except Exception as e:
+ logger.error(f"Error parsing ONT2-G: {e}")
+ return {}
+
+ def _parse_oltg(self, output):
+ try:
+ data = {}
+
+ match = re.search(r'OltVendorId:\s*(\S+)', output, re.IGNORECASE)
+ if match:
+ data['olt_vendor_info'] = match.group(1)
+
+ match = re.search(r'Version:\s*(\S+)', output, re.IGNORECASE)
+ if match:
+ data['olt_version_info'] = match.group(1)
+
+ return data
+ except Exception as e:
+ logger.error(f"Error parsing OLT-G: {e}")
+ return {}
+
+ def _parse_pptpethuni(self, output):
+ try:
+ data = {}
+
+ match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', output, re.IGNORECASE)
+ if match:
+ data['mac_address'] = match.group(1)
+ return data
+
+ match = re.search(r'MacAddress:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
+ if match:
+ hex_str = match.group(1)
+ if len(hex_str) == 12:
+ mac = ':'.join([hex_str[i:i+2] for i in range(0, 12, 2)])
+ data['mac_address'] = mac
+ return data
+
+ return data
+ except Exception as e:
+ logger.error(f"Error parsing PptpEthUni: {e}")
+ return {}
+
+ def _parse_veipuni(self, output):
+ try:
+ data = {}
+
+ match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', output, re.IGNORECASE)
+ if match:
+ data['mac_address'] = match.group(1)
+ return data
+
+ match = re.search(r'MacAddress:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
+ if match:
+ hex_str = match.group(1)
+ if len(hex_str) == 12:
+ mac = ':'.join([hex_str[i:i+2] for i in range(0, 12, 2)])
+ data['mac_address'] = mac
+ return data
+
+ return data
+ except Exception as e:
+ logger.error(f"Error parsing VeipUni: {e}")
+ return {}
+
+ def _parse_sys_mac(self, output):
+ try:
+ output = output.replace('cat /sys/class/net/eth0/address', '')
+ output = output.replace('# ', '').strip()
+
+ match = re.search(r'([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2})', output)
+ if match:
+ return match.group(1)
+
+ match = re.search(r'([0-9a-fA-F]{12})', output)
+ if match:
+ mac = match.group(1)
+ return ':'.join([mac[i:i+2] for i in range(0, 12, 2)])
+
+ except Exception as e:
+ logger.error(f"Error parsing /sys MAC: {e}")
+ return None
+
+ def _parse_serial(self, output):
+ try:
+ match = re.search(r'SerialNumber:\s*(\S+)', output, re.IGNORECASE)
+ if match:
+ return match.group(1)
+ except Exception as e:
+ logger.error(f"Error parsing serial: {e}")
+ return None
+
+ def _convert_optical_power(self, hex_value):
+ if hex_value > 0x7fff:
+ signed_value = hex_value - 0x10000
+ else:
+ signed_value = hex_value
+ return signed_value * 0.002
+
+ def _check_alerts(self, data):
+ new_alerts = []
+ thresholds = self.config.THRESHOLDS
+
+ rx = data.get('rx_power')
+ if rx is not None:
+ if rx < thresholds['rx_power_min']:
+ new_alerts.append({
+ 'severity': 'critical',
+ 'category': 'optical',
+ 'message': f'RX Power critically low: {rx:.2f} dBm',
+ 'value': rx,
+ 'timestamp': datetime.now().isoformat()
+ })
+ elif rx > thresholds['rx_power_max']:
+ new_alerts.append({
+ 'severity': 'warning',
+ 'category': 'optical',
+ 'message': f'RX Power high: {rx:.2f} dBm',
+ 'value': rx,
+ 'timestamp': datetime.now().isoformat()
+ })
+
+ tx = data.get('tx_power')
+ if tx is not None:
+ if tx < thresholds['tx_power_min'] or tx > thresholds['tx_power_max']:
+ new_alerts.append({
+ 'severity': 'warning',
+ 'category': 'optical',
+ 'message': f'TX Power out of range: {tx:.2f} dBm',
+ 'value': tx,
+ 'timestamp': datetime.now().isoformat()
+ })
+
+ temp = data.get('temperature')
+ if temp is not None and temp > thresholds.get('temperature_max', 85):
+ new_alerts.append({
+ 'severity': 'warning',
+ 'category': 'temperature',
+ 'message': f'Temperature high: {temp:.1f}°C',
+ 'value': temp,
+ 'timestamp': datetime.now().isoformat()
+ })
+
+ fec_uncor = data.get('fec_uncorrected', 0)
+ if fec_uncor > 0:
+ new_alerts.append({
+ 'severity': 'critical',
+ 'category': 'transmission',
+ 'message': f'FEC Uncorrected Errors: {fec_uncor}',
+ 'value': fec_uncor,
+ 'timestamp': datetime.now().isoformat()
+ })
+
+ self.alerts = new_alerts[-20:]
+
+ if new_alerts:
+ logger.warning(f"Generated {len(new_alerts)} alerts")
|