From 8fb1d3ab6d922f4dd6292b66d2e20525129da703 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20Gruszczy=C5=84ski?= Date: Wed, 7 Jan 2026 13:23:40 +0100 Subject: [PATCH] optimalizations in collector --- collector.py | 560 +++++++++++++++++++++++++++----------- config.py | 1 + static/js/dashboard.js | 14 +- templates/index.html | 13 +- tests/collector_old.py | 399 +++++++++++++++++++++++++++ tests/collector_opti_1.py | 508 ++++++++++++++++++++++++++++++++++ 6 files changed, 1325 insertions(+), 170 deletions(-) create mode 100644 tests/collector_old.py create mode 100644 tests/collector_opti_1.py diff --git a/collector.py b/collector.py index 7043e99..31fe607 100644 --- a/collector.py +++ b/collector.py @@ -8,8 +8,10 @@ import re import asyncio import aiohttp + logger = logging.getLogger(__name__) + class GPONCollector: def __init__(self, config): self.config = config @@ -18,38 +20,56 @@ class GPONCollector: self.running = False self.thread = None self.alerts = [] + self.last_full_collect = 0 + self.full_collect_interval = 3600 + self.first_run = True + def start(self): if self.running: return - self.running = True self.thread = Thread(target=self._collect_loop, daemon=True) self.thread.start() logger.info("Collector started") + def stop(self): self.running = False if self.thread: self.thread.join(timeout=5) logger.info("Collector stopped") + def get_data(self): with self.lock: return self.data.copy() + def get_alerts(self): with self.lock: return self.alerts.copy() + def _collect_loop(self): while self.running: try: + collect_start = time.time() + loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - data = loop.run_until_complete(self._collect_all_data()) + + now = time.time() + full_collect = self.first_run or (now - self.last_full_collect) >= self.full_collect_interval + + data = loop.run_until_complete(self._collect_all_data(full_collect=full_collect)) loop.close() + collect_duration = time.time() - collect_start + + if full_collect: + self.last_full_collect = now + if data and data.get('status') == 'online': with self.lock: self.data = data @@ -60,23 +80,38 @@ class GPONCollector: temp = data.get('temperature', 0) uptime = data.get('uptime', 0) uptime_days = uptime // 86400 - logger.info(f"Collected: RX={rx:.2f}dBm TX={tx:.2f}dBm Temp={temp:.1f}C Uptime={uptime}s ({uptime_days}d)") + uptime_hours = (uptime % 86400) // 3600 + uptime_minutes = (uptime % 3600) // 60 + + if self.first_run: + logger.info("=" * 70) + logger.info("Initial data collection completed successfully") + logger.info(f"Device: {data.get('vendor_id', 'N/A')} {data.get('model', 'N/A')}") + logger.info(f"Serial: {data.get('serial_number', 'N/A')}") + logger.info(f"Optical: RX={rx:.2f}dBm TX={tx:.2f}dBm") + logger.info(f"Status: Temperature={temp:.1f}°C Uptime={uptime_days}d {uptime_hours}h {uptime_minutes}m") + logger.info(f"Collection time: {collect_duration:.2f}s") + logger.info("=" * 70) + self.first_run = False + else: + logger.info(f"Collected: RX={rx:.2f}dBm TX={tx:.2f}dBm Temp={temp:.1f}°C Uptime={uptime_days}d {uptime_hours}h {uptime_minutes}m (took {collect_duration:.2f}s)") else: - logger.warning("No data or device offline") + logger.warning(f"No data or device offline (took {collect_duration:.2f}s)") + self.first_run = False except Exception as e: logger.error(f"Error in collector loop: {e}", exc_info=True) + self.first_run = False time.sleep(self.config.POLL_INTERVAL) - async def _collect_all_data(self): - omci_data = await self._collect_omci_async() + async def _collect_all_data(self, full_collect=False): + omci_data = await self._collect_omci_async(full_collect=full_collect) if not omci_data: return None web_data = await self._scrape_web_interface() - if web_data: for key, value in web_data.items(): if value is not None: @@ -84,17 +119,18 @@ class GPONCollector: return omci_data + async def _scrape_web_interface(self): try: + scrape_start = time.time() auth = BasicAuth(self.config.GPON_USERNAME, self.config.GPON_PASSWORD) base_url = f'http://{self.config.GPON_HOST}' - data = {} + connector = aiohttp.TCPConnector(ssl=False) timeout = aiohttp.ClientTimeout(total=10, connect=5) async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: - try: url = f'{base_url}/status_pon.asp' async with session.get(url, auth=auth) as response: @@ -104,113 +140,51 @@ class GPONCollector: temp_match = re.search(r']*>Temperature\s*]*>([\d.]+)\s*C', html, re.IGNORECASE) if temp_match: data['temperature'] = float(temp_match.group(1)) - logger.debug(f"[WEB] Temperature: {data['temperature']}") volt_match = re.search(r']*>Voltage\s*]*>([\d.]+)\s*V', html, re.IGNORECASE) if volt_match: data['voltage'] = float(volt_match.group(1)) - logger.debug(f"[WEB] Voltage: {data['voltage']}") bias_match = re.search(r']*>Bias Current\s*]*>([\d.]+)\s*mA', html, re.IGNORECASE) if bias_match: data['tx_bias_current'] = float(bias_match.group(1)) - logger.debug(f"[WEB] TX Bias: {data['tx_bias_current']}") - logger.info(f"[WEB] status_pon.asp OK: temp={data.get('temperature')} voltage={data.get('voltage')} bias={data.get('tx_bias_current')}") + logger.debug(f"[WEB] status_pon.asp: temp={data.get('temperature')} voltage={data.get('voltage')} bias={data.get('tx_bias_current')}") else: logger.warning(f"[WEB] status_pon.asp returned {response.status}") except Exception as e: logger.error(f"[WEB] Error fetching status_pon.asp: {e}") - try: - url = f'{base_url}/status.asp' - async with session.get(url, auth=auth) as response: - if response.status == 200: - html = await response.text() - - uptime_match = re.search( - r']*>Uptime\s*]*>\s*([^<]+)', - html, - re.IGNORECASE - ) - if uptime_match: - uptime_str = uptime_match.group(1).strip() - logger.debug(f"[WEB] Raw uptime: '{uptime_str}'") - - days_min_match = re.search(r'(\d+)\s*days?,\s*(\d+)\s*min', uptime_str, re.IGNORECASE) - if days_min_match: - days = int(days_min_match.group(1)) - minutes = int(days_min_match.group(2)) - data['uptime'] = (days * 86400) + (minutes * 60) - logger.info(f"[WEB] Uptime: {days}d {minutes}m = {data['uptime']}s") - - elif days_hm_match := re.search(r'(\d+)\s*days?,\s*(\d+):(\d+)', uptime_str): - days = int(days_hm_match.group(1)) - hours = int(days_hm_match.group(2)) - minutes = int(days_hm_match.group(3)) - data['uptime'] = (days * 86400) + (hours * 3600) + (minutes * 60) - logger.info(f"[WEB] Uptime: {days}d {hours}h {minutes}m = {data['uptime']}s") - - elif hm_match := re.search(r'(\d+)\s*h(?:our)?[s]?,?\s*(\d+)\s*min', uptime_str, re.IGNORECASE): - hours = int(hm_match.group(1)) - minutes = int(hm_match.group(2)) - data['uptime'] = (hours * 3600) + (minutes * 60) - logger.info(f"[WEB] Uptime: {hours}h {minutes}m = {data['uptime']}s") - - elif time_match := re.search(r'^(\d+):(\d+)$', uptime_str): - hours = int(time_match.group(1)) - minutes = int(time_match.group(2)) - data['uptime'] = (hours * 3600) + (minutes * 60) - logger.info(f"[WEB] Uptime: {hours}h {minutes}m = {data['uptime']}s") - - else: - logger.warning(f"[WEB] Could not parse uptime: '{uptime_str}'") - else: - logger.warning("[WEB] Uptime not found in status.asp") - - logger.info(f"[WEB] status.asp OK: uptime={data.get('uptime')}") - else: - logger.warning(f"[WEB] status.asp returned {response.status}") - except Exception as e: - logger.error(f"[WEB] Error fetching status.asp: {e}") - try: url = f'{base_url}/admin/pon-stats.asp' async with session.get(url, auth=auth) as response: if response.status == 200: html = await response.text() - tx_pkts_match = re.search(r']*>Packets Sent:\s*]*>(\d+)', html, re.IGNORECASE) + tx_pkts_match = re.search(r']*>Packets Sent:\s*]*>([\d,]+)', html, re.IGNORECASE) if tx_pkts_match: - data['tx_packets'] = int(tx_pkts_match.group(1)) - logger.debug(f"[WEB] TX Packets: {data['tx_packets']}") + data['tx_packets'] = int(tx_pkts_match.group(1).replace(',', '')) - rx_pkts_match = re.search(r']*>Packets Received:\s*]*>(\d+)', html, re.IGNORECASE) + rx_pkts_match = re.search(r']*>Packets Received:\s*]*>([\d,]+)', html, re.IGNORECASE) if rx_pkts_match: - data['rx_packets'] = int(rx_pkts_match.group(1)) - logger.debug(f"[WEB] RX Packets: {data['rx_packets']}") + data['rx_packets'] = int(rx_pkts_match.group(1).replace(',', '')) - tx_bytes_match = re.search(r']*>Bytes Sent:\s*]*>(\d+)', html, re.IGNORECASE) + tx_bytes_match = re.search(r']*>Bytes Sent:\s*]*>([\d,]+)', html, re.IGNORECASE) if tx_bytes_match: - data['tx_bytes'] = int(tx_bytes_match.group(1)) - logger.debug(f"[WEB] TX Bytes: {data['tx_bytes']}") + data['tx_bytes'] = int(tx_bytes_match.group(1).replace(',', '')) - rx_bytes_match = re.search(r']*>Bytes Received:\s*]*>(\d+)', html, re.IGNORECASE) + rx_bytes_match = re.search(r']*>Bytes Received:\s*]*>([\d,]+)', html, re.IGNORECASE) if rx_bytes_match: - data['rx_bytes'] = int(rx_bytes_match.group(1)) - logger.debug(f"[WEB] RX Bytes: {data['rx_bytes']}") + data['rx_bytes'] = int(rx_bytes_match.group(1).replace(',', '')) - logger.info(f"[WEB] pon-stats.asp OK: rx_pkts={data.get('rx_packets')} tx_pkts={data.get('tx_packets')}") + logger.debug(f"[WEB] pon-stats.asp: rx_pkts={data.get('rx_packets')} tx_pkts={data.get('tx_packets')}") else: logger.warning(f"[WEB] pon-stats.asp returned {response.status}") except Exception as e: logger.error(f"[WEB] Error fetching pon-stats.asp: {e}") - if data: - logger.info(f"[WEB] Scraped {len(data)} fields from web interface") - else: - logger.warning("[WEB] No data scraped from web interface") - + scrape_duration = time.time() - scrape_start + logger.info(f"[WEB] Scraped {len(data)} fields in {scrape_duration:.2f}s") return data except Exception as e: @@ -218,8 +192,10 @@ class GPONCollector: return {} - async def _collect_omci_async(self): + async def _collect_omci_async(self, full_collect=False): try: + telnet_start = time.time() + reader, writer = await asyncio.wait_for( telnetlib3.open_connection( self.config.GPON_HOST, @@ -242,9 +218,114 @@ class GPONCollector: await asyncio.sleep(1) await reader.read(1024) - writer.write('omcicli mib get all\n') - await asyncio.sleep(3) - full_output = await reader.read(102400) + data = { + 'timestamp': datetime.now().isoformat(), + 'status': 'offline', + } + + writer.write('cat /proc/uptime\n') + await asyncio.sleep(0.5) + uptime_output = await reader.read(1024) + uptime = self._parse_proc_uptime(uptime_output) + if uptime: + data['uptime'] = uptime + + writer.write('omcicli mib get Anig\n') + await asyncio.sleep(1.5) + anig_output = await reader.read(16384) + optical = self._parse_anig(anig_output) + if optical: + data.update(optical) + + if full_collect: + logger.info("[TELNET] Full collection mode - gathering all static data") + + writer.write('omcicli get sn\n') + await asyncio.sleep(0.5) + sn_output = await reader.read(1024) + sn = self._parse_serial(sn_output) + if sn: + data['serial_number'] = sn + if len(sn) >= 4: + data['vendor_id'] = sn[:4] + + writer.write('omcicli mib get FecPmhd\n') + await asyncio.sleep(1) + fec_output = await reader.read(8192) + fec = self._parse_fec(fec_output) + if fec: + data.update(fec) + + writer.write('omcicli mib get Ont2g\n') + await asyncio.sleep(1.5) + ont2g_output = await reader.read(16384) + device_info = self._parse_ont2g(ont2g_output) + if device_info: + data.update(device_info) + + writer.write('omcicli mib get OltG\n') + await asyncio.sleep(1) + oltg_output = await reader.read(8192) + olt_info = self._parse_oltg(oltg_output) + if olt_info: + data.update(olt_info) + + writer.write('omcicli mib get PptpEthUni\n') + await asyncio.sleep(1) + pptpethuni_output = await reader.read(8192) + mac_info = self._parse_pptpethuni(pptpethuni_output) + if mac_info and mac_info.get('mac_address'): + data.update(mac_info) + else: + writer.write('omcicli mib get VeipUni\n') + await asyncio.sleep(1) + veipuni_output = await reader.read(8192) + mac_info = self._parse_veipuni(veipuni_output) + if mac_info and mac_info.get('mac_address'): + data.update(mac_info) + else: + writer.write('cat /sys/class/net/eth0/address\n') + await asyncio.sleep(0.5) + sys_mac_output = await reader.read(2048) + mac = self._parse_sys_mac(sys_mac_output) + if mac: + data['mac_address'] = mac + + flash_params = ['OMCI_SW_VER1', 'HW_HWVER', 'OMCI_OLT_MODE', 'LAN_SDS_MODE', 'PON_MODE', 'OMCC_VER'] + for param in flash_params: + writer.write(f'flash get {param}\n') + await asyncio.sleep(0.3) + param_output = await reader.read(4096) + value = self._parse_flash_value(param_output, param) + if value: + if param == 'OMCI_SW_VER1': + data['version'] = value + elif param == 'HW_HWVER': + data['hw_version'] = value + elif param == 'OMCI_OLT_MODE': + data['olt_mode'] = self._decode_olt_mode(value) + elif param == 'LAN_SDS_MODE': + data['lan_mode'] = self._decode_lan_mode(value) + elif param == 'PON_MODE': + data['pon_mode'] = self._decode_pon_mode(value) + elif param == 'OMCC_VER': + data['omcc_version'] = hex(int(value)) if value.isdigit() else value + else: + logger.info("[TELNET] Fast collection mode - using cached static data") + with self.lock: + logger.info(f"[CACHE] self.data contains {len(self.data)} keys") + if self.data: + cached_keys = ['serial_number', 'vendor_id', 'model', 'version', 'hw_version', + 'mac_address', 'olt_vendor_info', 'olt_version_info', 'olt_mode', + 'lan_mode', 'pon_mode', 'omcc_version', 'fec_corrected', 'fec_uncorrected'] + cached_count = 0 + for key in cached_keys: + if key in self.data: + data[key] = self.data[key] + cached_count += 1 + logger.info(f"[CACHE] Copied {cached_count}/{len(cached_keys)} cached fields") + else: + logger.warning("[CACHE] self.data is empty! Cache not available.") writer.write('exit\n') await asyncio.sleep(0.3) @@ -254,83 +335,237 @@ class GPONCollector: except: pass - logger.debug(f"[OMCI] Received {len(full_output)} bytes") - - return self._parse_omci(full_output) - - except asyncio.TimeoutError: - logger.error("[OMCI] Timeout during telnet connection") - return None - except Exception as e: - logger.error(f"[OMCI] Telnet error: {e}", exc_info=True) - return None - - def _parse_omci(self, raw_data): - try: - with open('/tmp/omci_debug.txt', 'w') as f: - f.write(raw_data) - except: - pass - - data = { - 'timestamp': datetime.now().isoformat(), - 'status': 'offline', - } - - try: - match = re.search(r'OpticalSignalLevel:\s*(0x[0-9a-fA-F]+)', raw_data) - if match: - rx_hex = int(match.group(1), 16) - data['rx_power'] = self._convert_optical_power(rx_hex) - - match = re.search(r'TranOpticLevel:\s*(0x[0-9a-fA-F]+)', raw_data) - if match: - tx_hex = int(match.group(1), 16) - data['tx_power'] = self._convert_optical_power(tx_hex) - - match = re.search(r'SerialNum:\s*(\S+)', raw_data) - if match: - data['serial_number'] = match.group(1) - - match = re.search(r'Version:\s*(M\d+\S*)', raw_data) - if match: - data['version'] = match.group(1) - - match = re.search(r'VID:\s*(\S+)', raw_data) - if match: - data['vendor_id'] = match.group(1) - - match = re.search(r'OltVendorId:\s*(\S+)', raw_data) - if match: - data['olt_vendor_info'] = match.group(1) - - match = re.search(r'OltG.*?Version:\s*(\d+)', raw_data, re.DOTALL) - if match: - data['olt_version_info'] = match.group(1) - - match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', raw_data) - if match: - data['mac_address'] = match.group(1) - - fec_section = re.search( - r'FecPmhd.*?CorCodeWords:\s*(0x[0-9a-fA-F]+).*?UncorCodeWords:\s*(0x[0-9a-fA-F]+)', - raw_data, - re.DOTALL - ) - if fec_section: - data['fec_corrected'] = int(fec_section.group(1), 16) - data['fec_uncorrected'] = int(fec_section.group(2), 16) - else: - data['fec_corrected'] = 0 - data['fec_uncorrected'] = 0 - if data.get('rx_power') is not None and data.get('tx_power') is not None: data['status'] = 'online' - except Exception as e: - logger.error(f"[OMCI] Parsing error: {e}", exc_info=True) + telnet_duration = time.time() - telnet_start + logger.info(f"[TELNET] Collected {len(data)} fields (full={full_collect}) in {telnet_duration:.2f}s") + return data + + except asyncio.TimeoutError: + logger.error("[TELNET] Timeout during telnet connection") + return None + except Exception as e: + logger.error(f"[TELNET] Telnet error: {e}", exc_info=True) + return None + + + def _parse_proc_uptime(self, output): + try: + match = re.search(r'(\d+\.\d+)', output) + if match: + return int(float(match.group(1))) + except Exception as e: + logger.error(f"Error parsing /proc/uptime: {e}") + return None + + + def _parse_anig(self, output): + try: + data = {} + + rx_match = re.search(r'OpticalSignalLevel:\s*(0x[0-9a-fA-F]+)', output) + if rx_match: + rx_hex = int(rx_match.group(1), 16) + data['rx_power'] = self._convert_optical_power(rx_hex) + + tx_match = re.search(r'TranOpticLevel:\s*(0x[0-9a-fA-F]+)', output) + if tx_match: + tx_hex = int(tx_match.group(1), 16) + data['tx_power'] = self._convert_optical_power(tx_hex) + + return data + except Exception as e: + logger.error(f"Error parsing ANI-G: {e}") + return {} + + + def _parse_fec(self, output): + try: + data = {} + + match = re.search(r'CorCodeWords:\s*(0x[0-9a-fA-F]+)', output) + if match: + data['fec_corrected'] = int(match.group(1), 16) + else: + data['fec_corrected'] = 0 + + match = re.search(r'UncorCodeWords:\s*(0x[0-9a-fA-F]+)', output) + if match: + data['fec_uncorrected'] = int(match.group(1), 16) + else: + data['fec_uncorrected'] = 0 + + return data + except Exception as e: + logger.error(f"Error parsing FEC: {e}") + return {'fec_corrected': 0, 'fec_uncorrected': 0} + + + def _parse_ont2g(self, output): + try: + data = {} + + match = re.search(r'EqtID:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE) + if match: + hex_str = match.group(1) + try: + model = bytes.fromhex(hex_str).decode('ascii', errors='ignore').strip('\x00').strip() + if model: + data['model'] = model + except: + pass + + return data + except Exception as e: + logger.error(f"Error parsing ONT2-G: {e}") + return {} + + + def _parse_oltg(self, output): + try: + data = {} + + match = re.search(r'OltVendorId:\s*([A-Z0-9]{4})', output, re.IGNORECASE) + if match: + data['olt_vendor_info'] = match.group(1) + else: + match = re.search(r'OltVendorId:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE) + if match: + hex_str = match.group(1) + try: + olt_vendor = bytes.fromhex(hex_str).decode('ascii', errors='ignore').strip('\x00').strip() + if olt_vendor: + data['olt_vendor_info'] = olt_vendor + except: + pass + + match = re.search(r'Version:\s*(\d+)', output, re.IGNORECASE) + if match: + data['olt_version_info'] = match.group(1) + + return data + except Exception as e: + logger.error(f"Error parsing OLT-G: {e}") + return {} + + + def _parse_pptpethuni(self, output): + try: + data = {} + + match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', output, re.IGNORECASE) + if match: + data['mac_address'] = match.group(1) + return data + + match = re.search(r'MacAddress:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE) + if match: + hex_str = match.group(1) + if len(hex_str) == 12: + mac = ':'.join([hex_str[i:i+2] for i in range(0, 12, 2)]) + data['mac_address'] = mac + return data + + return data + except Exception as e: + logger.error(f"Error parsing PptpEthUni: {e}") + return {} + + + def _parse_veipuni(self, output): + try: + data = {} + + match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', output, re.IGNORECASE) + if match: + data['mac_address'] = match.group(1) + return data + + match = re.search(r'MacAddress:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE) + if match: + hex_str = match.group(1) + if len(hex_str) == 12: + mac = ':'.join([hex_str[i:i+2] for i in range(0, 12, 2)]) + data['mac_address'] = mac + return data + + return data + except Exception as e: + logger.error(f"Error parsing VeipUni: {e}") + return {} + + + def _parse_sys_mac(self, output): + try: + output = output.replace('cat /sys/class/net/eth0/address', '') + output = output.replace('# ', '').strip() + + match = re.search(r'([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2})', output) + if match: + return match.group(1) + + match = re.search(r'([0-9a-fA-F]{12})', output) + if match: + mac = match.group(1) + return ':'.join([mac[i:i+2] for i in range(0, 12, 2)]) + + except Exception as e: + logger.error(f"Error parsing /sys MAC: {e}") + return None + + + def _parse_serial(self, output): + try: + match = re.search(r'SerialNumber:\s*(\S+)', output, re.IGNORECASE) + if match: + return match.group(1) + except Exception as e: + logger.error(f"Error parsing serial: {e}") + return None + + + def _parse_flash_value(self, output, key): + try: + pattern = rf'{key}=(.+?)(?:\r|\n|$)' + match = re.search(pattern, output, re.IGNORECASE) + if match: + value = match.group(1).strip() + if value and value != 'DEFAULT': + return value + return None + except Exception as e: + logger.error(f"Error parsing flash value {key}: {e}") + return None + + + def _decode_olt_mode(self, value): + modes = { + '0': 'Disabled', + '1': 'Huawei', + '2': 'ZTE', + '3': 'Customized', + '21': 'Auto' + } + return modes.get(value, f'Unknown ({value})') + + + def _decode_lan_mode(self, value): + modes = { + '0': 'Auto', + '1': 'Fiber 1G', + '8': 'HSGMII 2.5G' + } + return modes.get(value, f'Mode {value}') + + + def _decode_pon_mode(self, value): + modes = { + '1': 'GPON', + '2': 'EPON' + } + return modes.get(value, f'Mode {value}') - return data def _convert_optical_power(self, hex_value): if hex_value > 0x7fff: @@ -339,6 +574,7 @@ class GPONCollector: signed_value = hex_value return signed_value * 0.002 + def _check_alerts(self, data): new_alerts = [] thresholds = self.config.THRESHOLDS @@ -378,7 +614,7 @@ class GPONCollector: new_alerts.append({ 'severity': 'warning', 'category': 'temperature', - 'message': f'Temperature high: {temp:.1f}C', + 'message': f'Temperature high: {temp:.1f}°C', 'value': temp, 'timestamp': datetime.now().isoformat() }) @@ -396,4 +632,4 @@ class GPONCollector: self.alerts = new_alerts[-20:] if new_alerts: - logger.warning(f"Generated {len(new_alerts)} alerts") \ No newline at end of file + logger.warning(f"Generated {len(new_alerts)} alerts") diff --git a/config.py b/config.py index 950b732..5fb6d22 100644 --- a/config.py +++ b/config.py @@ -8,6 +8,7 @@ class Config: # GPON Device GPON_HOST = os.getenv('GPON_HOST', '192.168.100.1') GPON_PORT = int(os.getenv('GPON_PORT', 23)) + GPON_TELNET_PORT = int(os.getenv('GPON_PORT', 23)) GPON_USERNAME = os.getenv('GPON_USERNAME', 'leox') GPON_PASSWORD = os.getenv('GPON_PASSWORD', 'leolabs_7') diff --git a/static/js/dashboard.js b/static/js/dashboard.js index 2cc6575..3acb470 100644 --- a/static/js/dashboard.js +++ b/static/js/dashboard.js @@ -90,10 +90,15 @@ class GPONDashboard { updateDeviceInfo(data) { this.setElementText('vendor-id', data.vendor_id); + this.setElementText('model', data.model); this.setElementText('serial-number', data.serial_number); this.setElementText('version', data.version); + this.setElementText('hw-version', data.hw_version); this.setElementText('mac-address', data.mac_address); - + this.setElementText('omcc-version', data.omcc_version); + this.setElementText('olt-mode', data.olt_mode); + this.setElementText('pon-mode', data.pon_mode); + this.setElementText('lan-mode', data.lan_mode); this.setElementText('olt-vendor-info', data.olt_vendor_info); this.setElementText('olt-version-info', data.olt_version_info); @@ -106,12 +111,10 @@ class GPONDashboard { const days = Math.floor(seconds / 86400); const hours = Math.floor((seconds % 86400) / 3600); const minutes = Math.floor((seconds % 3600) / 60); - let connTime = ''; if (days > 0) connTime += days + 'd '; if (hours > 0 || days > 0) connTime += hours + 'h '; connTime += minutes + 'm'; - connTimeElem.textContent = connTime.trim(); } else { connTimeElem.textContent = '--'; @@ -148,9 +151,9 @@ class GPONDashboard { const statusElem = document.getElementById('device-status'); if (statusElem) { if (data.status === 'online') { - statusElem.innerHTML = 'Online'; + statusElem.innerHTML = ' Online'; } else { - statusElem.innerHTML = 'Offline'; + statusElem.innerHTML = ' Offline'; } } @@ -174,6 +177,7 @@ class GPONDashboard { } } + updateStatus(data) { const indicator = document.getElementById('status-indicator'); if (!indicator) return; diff --git a/templates/index.html b/templates/index.html index f82e89b..5b139e2 100644 --- a/templates/index.html +++ b/templates/index.html @@ -145,22 +145,28 @@ - + + +
Vendor ID:--
Serial Number:--
Version:--
Model:--
SW Version:--
HW Version:--
MAC Address:--
-
OLT Info
+
OLT & OMCI
+ +
OLT Vendor:--
OLT Version:--
OLT Mode:--
OMCC Version:--
Connection Time:--
Statistics
- + + + @@ -181,6 +187,7 @@ +
diff --git a/tests/collector_old.py b/tests/collector_old.py new file mode 100644 index 0000000..7043e99 --- /dev/null +++ b/tests/collector_old.py @@ -0,0 +1,399 @@ +from aiohttp import BasicAuth +import telnetlib3 +import time +import logging +from threading import Thread, Lock +from datetime import datetime +import re +import asyncio +import aiohttp + +logger = logging.getLogger(__name__) + +class GPONCollector: + def __init__(self, config): + self.config = config + self.data = {} + self.lock = Lock() + self.running = False + self.thread = None + self.alerts = [] + + def start(self): + if self.running: + return + + self.running = True + self.thread = Thread(target=self._collect_loop, daemon=True) + self.thread.start() + logger.info("Collector started") + + def stop(self): + self.running = False + if self.thread: + self.thread.join(timeout=5) + logger.info("Collector stopped") + + def get_data(self): + with self.lock: + return self.data.copy() + + def get_alerts(self): + with self.lock: + return self.alerts.copy() + + def _collect_loop(self): + while self.running: + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + data = loop.run_until_complete(self._collect_all_data()) + loop.close() + + if data and data.get('status') == 'online': + with self.lock: + self.data = data + self._check_alerts(data) + + rx = data.get('rx_power', 0) + tx = data.get('tx_power', 0) + temp = data.get('temperature', 0) + uptime = data.get('uptime', 0) + uptime_days = uptime // 86400 + logger.info(f"Collected: RX={rx:.2f}dBm TX={tx:.2f}dBm Temp={temp:.1f}C Uptime={uptime}s ({uptime_days}d)") + else: + logger.warning("No data or device offline") + + except Exception as e: + logger.error(f"Error in collector loop: {e}", exc_info=True) + + time.sleep(self.config.POLL_INTERVAL) + + async def _collect_all_data(self): + omci_data = await self._collect_omci_async() + + if not omci_data: + return None + + web_data = await self._scrape_web_interface() + + if web_data: + for key, value in web_data.items(): + if value is not None: + omci_data[key] = value + + return omci_data + + async def _scrape_web_interface(self): + try: + auth = BasicAuth(self.config.GPON_USERNAME, self.config.GPON_PASSWORD) + base_url = f'http://{self.config.GPON_HOST}' + + data = {} + connector = aiohttp.TCPConnector(ssl=False) + timeout = aiohttp.ClientTimeout(total=10, connect=5) + + async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: + + try: + url = f'{base_url}/status_pon.asp' + async with session.get(url, auth=auth) as response: + if response.status == 200: + html = await response.text() + + temp_match = re.search(r']*>Temperature\s*]*>([\d.]+)\s*C', html, re.IGNORECASE) + if temp_match: + data['temperature'] = float(temp_match.group(1)) + logger.debug(f"[WEB] Temperature: {data['temperature']}") + + volt_match = re.search(r']*>Voltage\s*]*>([\d.]+)\s*V', html, re.IGNORECASE) + if volt_match: + data['voltage'] = float(volt_match.group(1)) + logger.debug(f"[WEB] Voltage: {data['voltage']}") + + bias_match = re.search(r']*>Bias Current\s*]*>([\d.]+)\s*mA', html, re.IGNORECASE) + if bias_match: + data['tx_bias_current'] = float(bias_match.group(1)) + logger.debug(f"[WEB] TX Bias: {data['tx_bias_current']}") + + logger.info(f"[WEB] status_pon.asp OK: temp={data.get('temperature')} voltage={data.get('voltage')} bias={data.get('tx_bias_current')}") + else: + logger.warning(f"[WEB] status_pon.asp returned {response.status}") + except Exception as e: + logger.error(f"[WEB] Error fetching status_pon.asp: {e}") + + try: + url = f'{base_url}/status.asp' + async with session.get(url, auth=auth) as response: + if response.status == 200: + html = await response.text() + + uptime_match = re.search( + r']*>Uptime\s*]*>\s*([^<]+)', + html, + re.IGNORECASE + ) + if uptime_match: + uptime_str = uptime_match.group(1).strip() + logger.debug(f"[WEB] Raw uptime: '{uptime_str}'") + + days_min_match = re.search(r'(\d+)\s*days?,\s*(\d+)\s*min', uptime_str, re.IGNORECASE) + if days_min_match: + days = int(days_min_match.group(1)) + minutes = int(days_min_match.group(2)) + data['uptime'] = (days * 86400) + (minutes * 60) + logger.info(f"[WEB] Uptime: {days}d {minutes}m = {data['uptime']}s") + + elif days_hm_match := re.search(r'(\d+)\s*days?,\s*(\d+):(\d+)', uptime_str): + days = int(days_hm_match.group(1)) + hours = int(days_hm_match.group(2)) + minutes = int(days_hm_match.group(3)) + data['uptime'] = (days * 86400) + (hours * 3600) + (minutes * 60) + logger.info(f"[WEB] Uptime: {days}d {hours}h {minutes}m = {data['uptime']}s") + + elif hm_match := re.search(r'(\d+)\s*h(?:our)?[s]?,?\s*(\d+)\s*min', uptime_str, re.IGNORECASE): + hours = int(hm_match.group(1)) + minutes = int(hm_match.group(2)) + data['uptime'] = (hours * 3600) + (minutes * 60) + logger.info(f"[WEB] Uptime: {hours}h {minutes}m = {data['uptime']}s") + + elif time_match := re.search(r'^(\d+):(\d+)$', uptime_str): + hours = int(time_match.group(1)) + minutes = int(time_match.group(2)) + data['uptime'] = (hours * 3600) + (minutes * 60) + logger.info(f"[WEB] Uptime: {hours}h {minutes}m = {data['uptime']}s") + + else: + logger.warning(f"[WEB] Could not parse uptime: '{uptime_str}'") + else: + logger.warning("[WEB] Uptime not found in status.asp") + + logger.info(f"[WEB] status.asp OK: uptime={data.get('uptime')}") + else: + logger.warning(f"[WEB] status.asp returned {response.status}") + except Exception as e: + logger.error(f"[WEB] Error fetching status.asp: {e}") + + try: + url = f'{base_url}/admin/pon-stats.asp' + async with session.get(url, auth=auth) as response: + if response.status == 200: + html = await response.text() + + tx_pkts_match = re.search(r']*>Packets Sent:\s*]*>(\d+)', html, re.IGNORECASE) + if tx_pkts_match: + data['tx_packets'] = int(tx_pkts_match.group(1)) + logger.debug(f"[WEB] TX Packets: {data['tx_packets']}") + + rx_pkts_match = re.search(r']*>Packets Received:\s*]*>(\d+)', html, re.IGNORECASE) + if rx_pkts_match: + data['rx_packets'] = int(rx_pkts_match.group(1)) + logger.debug(f"[WEB] RX Packets: {data['rx_packets']}") + + tx_bytes_match = re.search(r']*>Bytes Sent:\s*]*>(\d+)', html, re.IGNORECASE) + if tx_bytes_match: + data['tx_bytes'] = int(tx_bytes_match.group(1)) + logger.debug(f"[WEB] TX Bytes: {data['tx_bytes']}") + + rx_bytes_match = re.search(r']*>Bytes Received:\s*]*>(\d+)', html, re.IGNORECASE) + if rx_bytes_match: + data['rx_bytes'] = int(rx_bytes_match.group(1)) + logger.debug(f"[WEB] RX Bytes: {data['rx_bytes']}") + + logger.info(f"[WEB] pon-stats.asp OK: rx_pkts={data.get('rx_packets')} tx_pkts={data.get('tx_packets')}") + else: + logger.warning(f"[WEB] pon-stats.asp returned {response.status}") + except Exception as e: + logger.error(f"[WEB] Error fetching pon-stats.asp: {e}") + + if data: + logger.info(f"[WEB] Scraped {len(data)} fields from web interface") + else: + logger.warning("[WEB] No data scraped from web interface") + + return data + + except Exception as e: + logger.error(f"[WEB] Web scraping failed: {e}") + return {} + + + async def _collect_omci_async(self): + try: + reader, writer = await asyncio.wait_for( + telnetlib3.open_connection( + self.config.GPON_HOST, + self.config.GPON_PORT + ), + timeout=10 + ) + + await asyncio.wait_for(reader.readuntil(b'login: '), timeout=5) + writer.write(self.config.GPON_USERNAME + '\n') + + await asyncio.wait_for(reader.readuntil(b'Password: '), timeout=5) + writer.write(self.config.GPON_PASSWORD + '\n') + + await asyncio.sleep(2) + initial_output = await reader.read(2048) + + if 'RTK.0>' in initial_output or 'command:#' in initial_output: + writer.write('exit\n') + await asyncio.sleep(1) + await reader.read(1024) + + writer.write('omcicli mib get all\n') + await asyncio.sleep(3) + full_output = await reader.read(102400) + + writer.write('exit\n') + await asyncio.sleep(0.3) + + try: + writer.close() + except: + pass + + logger.debug(f"[OMCI] Received {len(full_output)} bytes") + + return self._parse_omci(full_output) + + except asyncio.TimeoutError: + logger.error("[OMCI] Timeout during telnet connection") + return None + except Exception as e: + logger.error(f"[OMCI] Telnet error: {e}", exc_info=True) + return None + + def _parse_omci(self, raw_data): + try: + with open('/tmp/omci_debug.txt', 'w') as f: + f.write(raw_data) + except: + pass + + data = { + 'timestamp': datetime.now().isoformat(), + 'status': 'offline', + } + + try: + match = re.search(r'OpticalSignalLevel:\s*(0x[0-9a-fA-F]+)', raw_data) + if match: + rx_hex = int(match.group(1), 16) + data['rx_power'] = self._convert_optical_power(rx_hex) + + match = re.search(r'TranOpticLevel:\s*(0x[0-9a-fA-F]+)', raw_data) + if match: + tx_hex = int(match.group(1), 16) + data['tx_power'] = self._convert_optical_power(tx_hex) + + match = re.search(r'SerialNum:\s*(\S+)', raw_data) + if match: + data['serial_number'] = match.group(1) + + match = re.search(r'Version:\s*(M\d+\S*)', raw_data) + if match: + data['version'] = match.group(1) + + match = re.search(r'VID:\s*(\S+)', raw_data) + if match: + data['vendor_id'] = match.group(1) + + match = re.search(r'OltVendorId:\s*(\S+)', raw_data) + if match: + data['olt_vendor_info'] = match.group(1) + + match = re.search(r'OltG.*?Version:\s*(\d+)', raw_data, re.DOTALL) + if match: + data['olt_version_info'] = match.group(1) + + match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', raw_data) + if match: + data['mac_address'] = match.group(1) + + fec_section = re.search( + r'FecPmhd.*?CorCodeWords:\s*(0x[0-9a-fA-F]+).*?UncorCodeWords:\s*(0x[0-9a-fA-F]+)', + raw_data, + re.DOTALL + ) + if fec_section: + data['fec_corrected'] = int(fec_section.group(1), 16) + data['fec_uncorrected'] = int(fec_section.group(2), 16) + else: + data['fec_corrected'] = 0 + data['fec_uncorrected'] = 0 + + if data.get('rx_power') is not None and data.get('tx_power') is not None: + data['status'] = 'online' + + except Exception as e: + logger.error(f"[OMCI] Parsing error: {e}", exc_info=True) + + return data + + def _convert_optical_power(self, hex_value): + if hex_value > 0x7fff: + signed_value = hex_value - 0x10000 + else: + signed_value = hex_value + return signed_value * 0.002 + + def _check_alerts(self, data): + new_alerts = [] + thresholds = self.config.THRESHOLDS + + rx = data.get('rx_power') + if rx is not None: + if rx < thresholds['rx_power_min']: + new_alerts.append({ + 'severity': 'critical', + 'category': 'optical', + 'message': f'RX Power critically low: {rx:.2f} dBm', + 'value': rx, + 'timestamp': datetime.now().isoformat() + }) + elif rx > thresholds['rx_power_max']: + new_alerts.append({ + 'severity': 'warning', + 'category': 'optical', + 'message': f'RX Power high: {rx:.2f} dBm', + 'value': rx, + 'timestamp': datetime.now().isoformat() + }) + + tx = data.get('tx_power') + if tx is not None: + if tx < thresholds['tx_power_min'] or tx > thresholds['tx_power_max']: + new_alerts.append({ + 'severity': 'warning', + 'category': 'optical', + 'message': f'TX Power out of range: {tx:.2f} dBm', + 'value': tx, + 'timestamp': datetime.now().isoformat() + }) + + temp = data.get('temperature') + if temp is not None and temp > thresholds.get('temperature_max', 85): + new_alerts.append({ + 'severity': 'warning', + 'category': 'temperature', + 'message': f'Temperature high: {temp:.1f}C', + 'value': temp, + 'timestamp': datetime.now().isoformat() + }) + + fec_uncor = data.get('fec_uncorrected', 0) + if fec_uncor > 0: + new_alerts.append({ + 'severity': 'critical', + 'category': 'transmission', + 'message': f'FEC Uncorrected Errors: {fec_uncor}', + 'value': fec_uncor, + 'timestamp': datetime.now().isoformat() + }) + + self.alerts = new_alerts[-20:] + + if new_alerts: + logger.warning(f"Generated {len(new_alerts)} alerts") \ No newline at end of file diff --git a/tests/collector_opti_1.py b/tests/collector_opti_1.py new file mode 100644 index 0000000..ac47536 --- /dev/null +++ b/tests/collector_opti_1.py @@ -0,0 +1,508 @@ +from aiohttp import BasicAuth +import telnetlib3 +import time +import logging +from threading import Thread, Lock +from datetime import datetime +import re +import asyncio +import aiohttp + +logger = logging.getLogger(__name__) + +class GPONCollector: + def __init__(self, config): + self.config = config + self.data = {} + self.lock = Lock() + self.running = False + self.thread = None + self.alerts = [] + + def start(self): + if self.running: + return + self.running = True + self.thread = Thread(target=self._collect_loop, daemon=True) + self.thread.start() + logger.info("Collector started") + + def stop(self): + self.running = False + if self.thread: + self.thread.join(timeout=5) + logger.info("Collector stopped") + + def get_data(self): + with self.lock: + return self.data.copy() + + def get_alerts(self): + with self.lock: + return self.alerts.copy() + + def _collect_loop(self): + while self.running: + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + data = loop.run_until_complete(self._collect_all_data()) + loop.close() + + if data and data.get('status') == 'online': + with self.lock: + self.data = data + self._check_alerts(data) + + rx = data.get('rx_power', 0) + tx = data.get('tx_power', 0) + temp = data.get('temperature', 0) + uptime = data.get('uptime', 0) + uptime_days = uptime // 86400 + logger.info(f"Collected: RX={rx:.2f}dBm TX={tx:.2f}dBm Temp={temp:.1f}°C Uptime={uptime_days}d") + else: + logger.warning("No data or device offline") + + except Exception as e: + logger.error(f"Error in collector loop: {e}", exc_info=True) + + time.sleep(self.config.POLL_INTERVAL) + + async def _collect_all_data(self): + omci_data = await self._collect_omci_async() + if not omci_data: + return None + + web_data = await self._scrape_web_interface() + if web_data: + for key, value in web_data.items(): + if value is not None: + omci_data[key] = value + + return omci_data + + async def _scrape_web_interface(self): + try: + auth = BasicAuth(self.config.GPON_USERNAME, self.config.GPON_PASSWORD) + base_url = f'http://{self.config.GPON_HOST}' + data = {} + + connector = aiohttp.TCPConnector(ssl=False) + timeout = aiohttp.ClientTimeout(total=10, connect=5) + + async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: + try: + url = f'{base_url}/status_pon.asp' + async with session.get(url, auth=auth) as response: + if response.status == 200: + html = await response.text() + + temp_match = re.search(r']*>Temperature\s*]*>([\d.]+)\s*C', html, re.IGNORECASE) + if temp_match: + data['temperature'] = float(temp_match.group(1)) + logger.debug(f"[WEB] Temperature: {data['temperature']}") + + volt_match = re.search(r']*>Voltage\s*]*>([\d.]+)\s*V', html, re.IGNORECASE) + if volt_match: + data['voltage'] = float(volt_match.group(1)) + logger.debug(f"[WEB] Voltage: {data['voltage']}") + + bias_match = re.search(r']*>Bias Current\s*]*>([\d.]+)\s*mA', html, re.IGNORECASE) + if bias_match: + data['tx_bias_current'] = float(bias_match.group(1)) + logger.debug(f"[WEB] TX Bias: {data['tx_bias_current']}") + + logger.info(f"[WEB] status_pon.asp OK: temp={data.get('temperature')} voltage={data.get('voltage')} bias={data.get('tx_bias_current')}") + else: + logger.warning(f"[WEB] status_pon.asp returned {response.status}") + except Exception as e: + logger.error(f"[WEB] Error fetching status_pon.asp: {e}") + + if data: + logger.info(f"[WEB] Scraped {len(data)} fields from web interface") + else: + logger.warning("[WEB] No data scraped from web interface") + + return data + + except Exception as e: + logger.error(f"[WEB] Web scraping failed: {e}") + return {} + + async def _collect_omci_async(self): + try: + reader, writer = await asyncio.wait_for( + telnetlib3.open_connection( + self.config.GPON_HOST, + self.config.GPON_PORT + ), + timeout=10 + ) + + await asyncio.wait_for(reader.readuntil(b'login: '), timeout=5) + writer.write(self.config.GPON_USERNAME + '\n') + + await asyncio.wait_for(reader.readuntil(b'Password: '), timeout=5) + writer.write(self.config.GPON_PASSWORD + '\n') + + await asyncio.sleep(2) + initial_output = await reader.read(2048) + + if 'RTK.0>' in initial_output or 'command:#' in initial_output: + writer.write('exit\n') + await asyncio.sleep(1) + await reader.read(1024) + + data = { + 'timestamp': datetime.now().isoformat(), + 'status': 'offline', + } + + writer.write('cat /proc/uptime\n') + await asyncio.sleep(0.5) + uptime_output = await reader.read(1024) + uptime = self._parse_proc_uptime(uptime_output) + if uptime: + data['uptime'] = uptime + logger.debug(f"[TELNET] Uptime from /proc: {uptime}s") + + writer.write('cat /proc/net/dev\n') + await asyncio.sleep(0.5) + netdev_output = await reader.read(4096) + traffic = self._parse_proc_net_dev(netdev_output) + if traffic: + data.update(traffic) + logger.debug(f"[TELNET] Traffic: RX={traffic.get('rx_bytes')} TX={traffic.get('tx_bytes')}") + + writer.write('omcicli get sn\n') + await asyncio.sleep(0.5) + sn_output = await reader.read(1024) + sn = self._parse_serial(sn_output) + if sn: + data['serial_number'] = sn + logger.debug(f"[TELNET] Serial: {sn}") + + writer.write('omcicli mib get Anig\n') + await asyncio.sleep(1.5) + anig_output = await reader.read(16384) + optical = self._parse_anig(anig_output) + if optical: + data.update(optical) + logger.debug(f"[TELNET] Optical: RX={optical.get('rx_power')} TX={optical.get('tx_power')}") + + writer.write('omcicli mib get FecPmhd\n') + await asyncio.sleep(1) + fec_output = await reader.read(8192) + fec = self._parse_fec(fec_output) + if fec: + data.update(fec) + logger.debug(f"[TELNET] FEC: corrected={fec.get('fec_corrected')} uncorrected={fec.get('fec_uncorrected')}") + + writer.write('omcicli mib get Ont2g\n') + await asyncio.sleep(1.5) + ont2g_output = await reader.read(16384) + device_info = self._parse_ont2g(ont2g_output) + if device_info: + data.update(device_info) + logger.debug(f"[TELNET] Device: vendor={device_info.get('vendor_id')} version={device_info.get('version')}") + + writer.write('omcicli mib get OltG\n') + await asyncio.sleep(1) + oltg_output = await reader.read(8192) + olt_info = self._parse_oltg(oltg_output) + if olt_info: + data.update(olt_info) + logger.debug(f"[TELNET] OLT: vendor={olt_info.get('olt_vendor_info')} version={olt_info.get('olt_version_info')}") + + writer.write('omcicli mib get PptpEthUni\n') + await asyncio.sleep(1) + pptpethuni_output = await reader.read(8192) + mac_info = self._parse_pptpethuni(pptpethuni_output) + if mac_info and mac_info.get('mac_address'): + data.update(mac_info) + logger.debug(f"[TELNET] MAC from PptpEthUni: {mac_info.get('mac_address')}") + else: + writer.write('omcicli mib get VeipUni\n') + await asyncio.sleep(1) + veipuni_output = await reader.read(8192) + mac_info = self._parse_veipuni(veipuni_output) + if mac_info and mac_info.get('mac_address'): + data.update(mac_info) + logger.debug(f"[TELNET] MAC from VeipUni: {mac_info.get('mac_address')}") + else: + writer.write('cat /sys/class/net/eth0/address\n') + await asyncio.sleep(0.5) + sys_mac_output = await reader.read(2048) + mac = self._parse_sys_mac(sys_mac_output) + if mac: + data['mac_address'] = mac + logger.debug(f"[TELNET] MAC from /sys: {mac}") + + writer.write('exit\n') + await asyncio.sleep(0.3) + + try: + writer.close() + except: + pass + + if data.get('rx_power') is not None and data.get('tx_power') is not None: + data['status'] = 'online' + + logger.info(f"[TELNET] Collected {len(data)} fields") + return data + + except asyncio.TimeoutError: + logger.error("[TELNET] Timeout during telnet connection") + return None + except Exception as e: + logger.error(f"[TELNET] Telnet error: {e}", exc_info=True) + return None + + def _parse_proc_uptime(self, output): + try: + match = re.search(r'(\d+\.\d+)', output) + if match: + return int(float(match.group(1))) + except Exception as e: + logger.error(f"Error parsing /proc/uptime: {e}") + return None + + def _parse_proc_net_dev(self, output): + try: + data = {} + for line in output.split('\n'): + if 'eth0:' in line or 'br0:' in line: + parts = line.split() + if len(parts) >= 10: + data['rx_bytes'] = int(parts[1]) + data['tx_bytes'] = int(parts[9]) + data['rx_packets'] = int(parts[2]) + data['tx_packets'] = int(parts[10]) + break + return data + except Exception as e: + logger.error(f"Error parsing /proc/net/dev: {e}") + return {} + + def _parse_anig(self, output): + try: + data = {} + + rx_match = re.search(r'OpticalSignalLevel:\s*(0x[0-9a-fA-F]+)', output) + if rx_match: + rx_hex = int(rx_match.group(1), 16) + data['rx_power'] = self._convert_optical_power(rx_hex) + + tx_match = re.search(r'TranOpticLevel:\s*(0x[0-9a-fA-F]+)', output) + if tx_match: + tx_hex = int(tx_match.group(1), 16) + data['tx_power'] = self._convert_optical_power(tx_hex) + + return data + except Exception as e: + logger.error(f"Error parsing ANI-G: {e}") + return {} + + def _parse_fec(self, output): + try: + data = {} + + match = re.search(r'CorCodeWords:\s*(0x[0-9a-fA-F]+)', output) + if match: + data['fec_corrected'] = int(match.group(1), 16) + else: + data['fec_corrected'] = 0 + + match = re.search(r'UncorCodeWords:\s*(0x[0-9a-fA-F]+)', output) + if match: + data['fec_uncorrected'] = int(match.group(1), 16) + else: + data['fec_uncorrected'] = 0 + + return data + except Exception as e: + logger.error(f"Error parsing FEC: {e}") + return {'fec_corrected': 0, 'fec_uncorrected': 0} + + def _parse_ont2g(self, output): + try: + data = {} + + match = re.search(r'EqtID:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE) + if match: + hex_str = match.group(1) + try: + vendor_version = bytes.fromhex(hex_str).decode('ascii', errors='ignore').strip('\x00') + if vendor_version: + data['vendor_id'] = 'ALCL' + data['version'] = vendor_version + except: + pass + + match = re.search(r'VendorId:\s*(\S+)', output, re.IGNORECASE) + if match: + data['vendor_id'] = match.group(1) + + match = re.search(r'Version:\s*(\S+)', output, re.IGNORECASE) + if match: + data['version'] = match.group(1) + + return data + except Exception as e: + logger.error(f"Error parsing ONT2-G: {e}") + return {} + + def _parse_oltg(self, output): + try: + data = {} + + match = re.search(r'OltVendorId:\s*(\S+)', output, re.IGNORECASE) + if match: + data['olt_vendor_info'] = match.group(1) + + match = re.search(r'Version:\s*(\S+)', output, re.IGNORECASE) + if match: + data['olt_version_info'] = match.group(1) + + return data + except Exception as e: + logger.error(f"Error parsing OLT-G: {e}") + return {} + + def _parse_pptpethuni(self, output): + try: + data = {} + + match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', output, re.IGNORECASE) + if match: + data['mac_address'] = match.group(1) + return data + + match = re.search(r'MacAddress:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE) + if match: + hex_str = match.group(1) + if len(hex_str) == 12: + mac = ':'.join([hex_str[i:i+2] for i in range(0, 12, 2)]) + data['mac_address'] = mac + return data + + return data + except Exception as e: + logger.error(f"Error parsing PptpEthUni: {e}") + return {} + + def _parse_veipuni(self, output): + try: + data = {} + + match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', output, re.IGNORECASE) + if match: + data['mac_address'] = match.group(1) + return data + + match = re.search(r'MacAddress:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE) + if match: + hex_str = match.group(1) + if len(hex_str) == 12: + mac = ':'.join([hex_str[i:i+2] for i in range(0, 12, 2)]) + data['mac_address'] = mac + return data + + return data + except Exception as e: + logger.error(f"Error parsing VeipUni: {e}") + return {} + + def _parse_sys_mac(self, output): + try: + output = output.replace('cat /sys/class/net/eth0/address', '') + output = output.replace('# ', '').strip() + + match = re.search(r'([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2})', output) + if match: + return match.group(1) + + match = re.search(r'([0-9a-fA-F]{12})', output) + if match: + mac = match.group(1) + return ':'.join([mac[i:i+2] for i in range(0, 12, 2)]) + + except Exception as e: + logger.error(f"Error parsing /sys MAC: {e}") + return None + + def _parse_serial(self, output): + try: + match = re.search(r'SerialNumber:\s*(\S+)', output, re.IGNORECASE) + if match: + return match.group(1) + except Exception as e: + logger.error(f"Error parsing serial: {e}") + return None + + def _convert_optical_power(self, hex_value): + if hex_value > 0x7fff: + signed_value = hex_value - 0x10000 + else: + signed_value = hex_value + return signed_value * 0.002 + + def _check_alerts(self, data): + new_alerts = [] + thresholds = self.config.THRESHOLDS + + rx = data.get('rx_power') + if rx is not None: + if rx < thresholds['rx_power_min']: + new_alerts.append({ + 'severity': 'critical', + 'category': 'optical', + 'message': f'RX Power critically low: {rx:.2f} dBm', + 'value': rx, + 'timestamp': datetime.now().isoformat() + }) + elif rx > thresholds['rx_power_max']: + new_alerts.append({ + 'severity': 'warning', + 'category': 'optical', + 'message': f'RX Power high: {rx:.2f} dBm', + 'value': rx, + 'timestamp': datetime.now().isoformat() + }) + + tx = data.get('tx_power') + if tx is not None: + if tx < thresholds['tx_power_min'] or tx > thresholds['tx_power_max']: + new_alerts.append({ + 'severity': 'warning', + 'category': 'optical', + 'message': f'TX Power out of range: {tx:.2f} dBm', + 'value': tx, + 'timestamp': datetime.now().isoformat() + }) + + temp = data.get('temperature') + if temp is not None and temp > thresholds.get('temperature_max', 85): + new_alerts.append({ + 'severity': 'warning', + 'category': 'temperature', + 'message': f'Temperature high: {temp:.1f}°C', + 'value': temp, + 'timestamp': datetime.now().isoformat() + }) + + fec_uncor = data.get('fec_uncorrected', 0) + if fec_uncor > 0: + new_alerts.append({ + 'severity': 'critical', + 'category': 'transmission', + 'message': f'FEC Uncorrected Errors: {fec_uncor}', + 'value': fec_uncor, + 'timestamp': datetime.now().isoformat() + }) + + self.alerts = new_alerts[-20:] + + if new_alerts: + logger.warning(f"Generated {len(new_alerts)} alerts")
RX Packets:--
PON Mode:--
LAN Mode:--
RX Packets:--
TX Packets:--
FEC Corrected:--
FEC Uncorrected:--