"""Background collector for GPON ONU metrics gathered over telnet and HTTP."""

import asyncio
import logging
import re
import time
from datetime import datetime
from threading import Event, Lock, Thread

import aiohttp
import telnetlib3
from aiohttp import BasicAuth

logger = logging.getLogger(__name__)


class GPONCollector:
    """Poll a GPON device from a daemon thread and keep the latest snapshot.

    Each cycle logs in over telnet, runs ``omcicli``/``flash``/``cat``
    commands, then scrapes two pages of the device's web interface. The most
    recent successful snapshot is exposed through :meth:`get_data` and the
    alerts derived from it through :meth:`get_alerts`.

    The ``config`` object must provide ``GPON_HOST``, ``GPON_PORT``,
    ``GPON_USERNAME``, ``GPON_PASSWORD``, ``POLL_INTERVAL`` (seconds) and
    ``THRESHOLDS`` (a mapping with ``rx_power_min``, ``rx_power_max``,
    ``tx_power_min``, ``tx_power_max`` and optionally ``temperature_max``).
    """

    # Upper bound, in seconds, for a single read of command output. Without
    # it a silent device would block the collector thread forever.
    _READ_TIMEOUT = 5

    # Fields copied from the previous snapshot during a fast collection.
    _CACHED_KEYS = (
        'serial_number', 'vendor_id', 'model', 'version', 'hw_version',
        'mac_address', 'olt_vendor_info', 'olt_version_info', 'olt_mode',
        'lan_mode', 'pon_mode', 'omcc_version', 'fec_corrected',
        'fec_uncorrected',
    )

    # NOTE(review): the patterns below are reproduced exactly as found in the
    # file. The leading "]*>" fragments look like remnants of HTML tag
    # matchers (something like "<td[^>]*>") - verify them against the
    # device's actual page markup.
    _PON_STATUS_FIELDS = (
        ('temperature', r']*>Temperature\s*]*>([\d.]+)\s*C'),
        ('voltage', r']*>Voltage\s*]*>([\d.]+)\s*V'),
        ('tx_bias_current', r']*>Bias Current\s*]*>([\d.]+)\s*mA'),
    )
    _PON_STATS_FIELDS = (
        ('tx_packets', r']*>Packets Sent:\s*]*>([\d,]+)'),
        ('rx_packets', r']*>Packets Received:\s*]*>([\d,]+)'),
        ('tx_bytes', r']*>Bytes Sent:\s*]*>([\d,]+)'),
        ('rx_bytes', r']*>Bytes Received:\s*]*>([\d,]+)'),
    )

    def __init__(self, config):
        """Store ``config`` and initialise empty state; no I/O happens here."""
        self.config = config
        self.data = {}
        self.lock = Lock()  # guards self.data and self.alerts
        self.running = False
        self.thread = None
        self.alerts = []
        self.last_full_collect = 0
        self.full_collect_interval = 3600  # seconds between full collections
        self.first_run = True
        # Set by stop() so the worker wakes from its inter-poll wait at once.
        self._stop_event = Event()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def start(self):
        """Start the background collection thread; no-op if already running."""
        if self.running:
            return
        self.running = True
        self._stop_event.clear()
        self.thread = Thread(target=self._collect_loop, daemon=True)
        self.thread.start()
        logger.info("Collector started")

    def stop(self):
        """Ask the worker to stop and wait up to 5 seconds for it to exit."""
        self.running = False
        self._stop_event.set()
        if self.thread:
            self.thread.join(timeout=5)
        logger.info("Collector stopped")

    def get_data(self):
        """Return a shallow copy of the latest successful snapshot."""
        with self.lock:
            return self.data.copy()

    def get_alerts(self):
        """Return a shallow copy of the alerts from the latest snapshot."""
        with self.lock:
            return self.alerts.copy()

    # ------------------------------------------------------------------
    # Collection loop
    # ------------------------------------------------------------------

    def _collect_loop(self):
        """Run collection cycles until ``self.running`` becomes false."""
        while self.running:
            try:
                self._collect_once()
            except Exception as e:
                logger.error(f"Error in collector loop: {e}", exc_info=True)
                self.first_run = False
            # Interruptible replacement for time.sleep(): returns early as
            # soon as stop() sets the event.
            self._stop_event.wait(self.config.POLL_INTERVAL)

    def _collect_once(self):
        """Run one collection cycle and publish the result if online."""
        collect_start = time.time()
        now = time.time()
        full_collect = (
            self.first_run
            or (now - self.last_full_collect) >= self.full_collect_interval
        )

        loop = asyncio.new_event_loop()
        try:
            asyncio.set_event_loop(loop)
            data = loop.run_until_complete(
                self._collect_all_data(full_collect=full_collect)
            )
        finally:
            # Close the loop even when the coroutine raised.
            loop.close()

        collect_duration = time.time() - collect_start

        if not data or data.get('status') != 'online':
            logger.warning(f"No data or device offline (took {collect_duration:.2f}s)")
            self.first_run = False
            return

        # Only a successful full collection resets the timer; otherwise a
        # failed attempt would leave the static-data cache empty until the
        # next interval elapsed.
        if full_collect:
            self.last_full_collect = now

        with self.lock:
            self.data = data
            # Publish alerts under the same lock get_alerts() takes.
            self._check_alerts(data)

        rx = data.get('rx_power', 0)
        tx = data.get('tx_power', 0)
        temp = data.get('temperature', 0)
        uptime = data.get('uptime', 0)
        uptime_days = uptime // 86400
        uptime_hours = (uptime % 86400) // 3600
        uptime_minutes = (uptime % 3600) // 60

        if self.first_run:
            logger.info("=" * 70)
            logger.info("Initial data collection completed successfully")
            logger.info(f"Device: {data.get('vendor_id', 'N/A')} {data.get('model', 'N/A')}")
            logger.info(f"Serial: {data.get('serial_number', 'N/A')}")
            logger.info(f"Optical: RX={rx:.2f}dBm TX={tx:.2f}dBm")
            logger.info(f"Status: Temperature={temp:.1f}°C Uptime={uptime_days}d {uptime_hours}h {uptime_minutes}m")
            logger.info(f"Collection time: {collect_duration:.2f}s")
            logger.info("=" * 70)
            self.first_run = False
        else:
            logger.info(f"Collected: RX={rx:.2f}dBm TX={tx:.2f}dBm Temp={temp:.1f}°C Uptime={uptime_days}d {uptime_hours}h {uptime_minutes}m (took {collect_duration:.2f}s)")

    async def _collect_all_data(self, full_collect=False):
        """Merge telnet and web data; return ``None`` if telnet yielded nothing.

        Web values override telnet values for the same key, except that
        ``None`` web values are ignored.
        """
        omci_data = await self._collect_omci_async(full_collect=full_collect)
        if not omci_data:
            return None

        web_data = await self._scrape_web_interface()
        if web_data:
            omci_data.update(
                {key: value for key, value in web_data.items() if value is not None}
            )
        return omci_data

    # ------------------------------------------------------------------
    # Web scraping
    # ------------------------------------------------------------------

    async def _scrape_web_interface(self):
        """Scrape ``status_pon.asp`` and ``admin/pon-stats.asp`` over HTTP.

        Returns a dict of whatever fields could be parsed. A failure on one
        page is logged and does not prevent the other page from being read.
        Returns ``{}`` if the session itself could not be set up.
        """
        try:
            scrape_start = time.time()
            auth = BasicAuth(self.config.GPON_USERNAME, self.config.GPON_PASSWORD)
            base_url = f'http://{self.config.GPON_HOST}'
            data = {}

            connector = aiohttp.TCPConnector(ssl=False)
            timeout = aiohttp.ClientTimeout(total=10, connect=5)
            async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
                try:
                    fetched = await self._scrape_page(
                        session, auth, f'{base_url}/status_pon.asp',
                        'status_pon.asp', self._PON_STATUS_FIELDS, float, data,
                    )
                    if fetched:
                        logger.debug(f"[WEB] status_pon.asp: temp={data.get('temperature')} voltage={data.get('voltage')} bias={data.get('tx_bias_current')}")
                except Exception as e:
                    logger.error(f"[WEB] Error fetching status_pon.asp: {e}")

                try:
                    fetched = await self._scrape_page(
                        session, auth, f'{base_url}/admin/pon-stats.asp',
                        'pon-stats.asp', self._PON_STATS_FIELDS,
                        self._parse_grouped_int, data,
                    )
                    if fetched:
                        logger.debug(f"[WEB] pon-stats.asp: rx_pkts={data.get('rx_packets')} tx_pkts={data.get('tx_packets')}")
                except Exception as e:
                    logger.error(f"[WEB] Error fetching pon-stats.asp: {e}")

            scrape_duration = time.time() - scrape_start
            logger.info(f"[WEB] Scraped {len(data)} fields in {scrape_duration:.2f}s")
            return data
        except Exception as e:
            logger.error(f"[WEB] Web scraping failed: {e}")
            return {}

    async def _scrape_page(self, session, auth, url, label, fields, convert, data):
        """Fetch ``url`` and store every matching field of ``fields`` in ``data``.

        ``fields`` is an iterable of ``(key, regex)`` pairs; group 1 of each
        match is passed through ``convert``. Returns ``True`` when the page
        answered with HTTP 200, ``False`` (after a warning) otherwise.
        Exceptions propagate to the caller.
        """
        async with session.get(url, auth=auth) as response:
            if response.status != 200:
                logger.warning(f"[WEB] {label} returned {response.status}")
                return False
            html = await response.text()

        for key, pattern in fields:
            match = re.search(pattern, html, re.IGNORECASE)
            if match:
                data[key] = convert(match.group(1))
        return True

    @staticmethod
    def _parse_grouped_int(text):
        """Convert a counter such as ``'1,234,567'`` to an int."""
        return int(text.replace(',', ''))

    # ------------------------------------------------------------------
    # Telnet collection
    # ------------------------------------------------------------------

    async def _collect_omci_async(self, full_collect=False):
        """Log in over telnet and gather uptime, optical and static data.

        With ``full_collect`` true, every static field is queried from the
        device; otherwise static fields are copied from the previous
        snapshot. ``status`` is ``'online'`` only when both ``rx_power`` and
        ``tx_power`` were parsed.

        Returns the data dict, or ``None`` on timeout or any other error.
        """
        writer = None
        try:
            telnet_start = time.time()

            reader, writer = await asyncio.wait_for(
                telnetlib3.open_connection(
                    self.config.GPON_HOST,
                    self.config.GPON_PORT
                ),
                timeout=10
            )

            await asyncio.wait_for(reader.readuntil(b'login: '), timeout=5)
            writer.write(self.config.GPON_USERNAME + '\n')
            await asyncio.wait_for(reader.readuntil(b'Password: '), timeout=5)
            writer.write(self.config.GPON_PASSWORD + '\n')
            await asyncio.sleep(2)

            initial_output = await self._read(reader, 2048)
            # When one of these prompts is seen after login, an "exit" is
            # sent before the shell commands below are issued.
            if 'RTK.0>' in initial_output or 'command:#' in initial_output:
                await self._run_command(reader, writer, 'exit', 1, 1024)

            data = {
                'timestamp': datetime.now().isoformat(),
                'status': 'offline',
            }

            uptime = self._parse_proc_uptime(
                await self._run_command(reader, writer, 'cat /proc/uptime', 0.5, 1024)
            )
            if uptime:
                data['uptime'] = uptime

            optical = self._parse_anig(
                await self._run_command(reader, writer, 'omcicli mib get Anig', 1.5, 16384)
            )
            if optical:
                data.update(optical)

            if full_collect:
                await self._collect_static_data(reader, writer, data)
            else:
                self._apply_cached_static_data(data)

            writer.write('exit\n')
            await asyncio.sleep(0.3)

            if data.get('rx_power') is not None and data.get('tx_power') is not None:
                data['status'] = 'online'

            telnet_duration = time.time() - telnet_start
            logger.info(f"[TELNET] Collected {len(data)} fields (full={full_collect}) in {telnet_duration:.2f}s")
            return data
        except asyncio.TimeoutError:
            logger.error("[TELNET] Timeout during telnet connection")
            return None
        except Exception as e:
            logger.error(f"[TELNET] Telnet error: {e}", exc_info=True)
            return None
        finally:
            # Release the connection on every path, including failures.
            if writer is not None:
                try:
                    writer.close()
                except Exception as e:
                    logger.debug(f"[TELNET] Error closing connection: {e}")

    async def _read(self, reader, size):
        """Read up to ``size`` units of output, bounded by ``_READ_TIMEOUT``."""
        return await asyncio.wait_for(reader.read(size), timeout=self._READ_TIMEOUT)

    async def _run_command(self, reader, writer, command, delay, size):
        """Send ``command``, wait ``delay`` seconds, and return what was read."""
        writer.write(command + '\n')
        await asyncio.sleep(delay)
        return await self._read(reader, size)

    async def _collect_static_data(self, reader, writer, data):
        """Query serial, FEC, ONT2-G, OLT-G, MAC and flash values into ``data``."""
        logger.info("[TELNET] Full collection mode - gathering all static data")

        sn = self._parse_serial(
            await self._run_command(reader, writer, 'omcicli get sn', 0.5, 1024)
        )
        if sn:
            data['serial_number'] = sn
            if len(sn) >= 4:
                data['vendor_id'] = sn[:4]

        fec = self._parse_fec(
            await self._run_command(reader, writer, 'omcicli mib get FecPmhd', 1, 8192)
        )
        if fec:
            data.update(fec)

        device_info = self._parse_ont2g(
            await self._run_command(reader, writer, 'omcicli mib get Ont2g', 1.5, 16384)
        )
        if device_info:
            data.update(device_info)

        olt_info = self._parse_oltg(
            await self._run_command(reader, writer, 'omcicli mib get OltG', 1, 8192)
        )
        if olt_info:
            data.update(olt_info)

        await self._collect_mac_address(reader, writer, data)
        await self._collect_flash_params(reader, writer, data)

    async def _collect_mac_address(self, reader, writer, data):
        """Try PptpEthUni, then VeipUni, then sysfs for the MAC address."""
        mac_info = self._parse_pptpethuni(
            await self._run_command(reader, writer, 'omcicli mib get PptpEthUni', 1, 8192)
        )
        if mac_info and mac_info.get('mac_address'):
            data.update(mac_info)
            return

        mac_info = self._parse_veipuni(
            await self._run_command(reader, writer, 'omcicli mib get VeipUni', 1, 8192)
        )
        if mac_info and mac_info.get('mac_address'):
            data.update(mac_info)
            return

        mac = self._parse_sys_mac(
            await self._run_command(reader, writer, 'cat /sys/class/net/eth0/address', 0.5, 2048)
        )
        if mac:
            data['mac_address'] = mac

    async def _collect_flash_params(self, reader, writer, data):
        """Read the ``flash get`` parameters and store their decoded values."""
        # flash parameter -> (key in data, decoder or None for the raw value)
        targets = {
            'OMCI_SW_VER1': ('version', None),
            'HW_HWVER': ('hw_version', None),
            'OMCI_OLT_MODE': ('olt_mode', self._decode_olt_mode),
            'LAN_SDS_MODE': ('lan_mode', self._decode_lan_mode),
            'PON_MODE': ('pon_mode', self._decode_pon_mode),
            'OMCC_VER': (
                'omcc_version',
                lambda v: hex(int(v)) if v.isdigit() else v,
            ),
        }
        for param, (key, decode) in targets.items():
            param_output = await self._run_command(
                reader, writer, f'flash get {param}', 0.3, 4096
            )
            value = self._parse_flash_value(param_output, param)
            if value:
                data[key] = decode(value) if decode else value

    def _apply_cached_static_data(self, data):
        """Copy static fields from the previous snapshot into ``data``."""
        logger.info("[TELNET] Fast collection mode - using cached static data")
        with self.lock:
            logger.info(f"[CACHE] self.data contains {len(self.data)} keys")
            if not self.data:
                logger.warning("[CACHE] self.data is empty! Cache not available.")
                return
            cached = {
                key: self.data[key] for key in self._CACHED_KEYS if key in self.data
            }
            data.update(cached)
            logger.info(f"[CACHE] Copied {len(cached)}/{len(self._CACHED_KEYS)} cached fields")

    # ------------------------------------------------------------------
    # Parsers
    # ------------------------------------------------------------------

    def _parse_proc_uptime(self, output):
        """Return the first decimal number in ``output`` as whole seconds, or ``None``."""
        try:
            match = re.search(r'(\d+\.\d+)', output)
            if match:
                return int(float(match.group(1)))
        except Exception as e:
            logger.error(f"Error parsing /proc/uptime: {e}")
        return None

    def _parse_anig(self, output):
        """Extract ``rx_power``/``tx_power`` from ANI-G output; ``{}`` on error."""
        try:
            data = {}
            rx_match = re.search(r'OpticalSignalLevel:\s*(0x[0-9a-fA-F]+)', output)
            if rx_match:
                data['rx_power'] = self._convert_optical_power(int(rx_match.group(1), 16))
            tx_match = re.search(r'TranOpticLevel:\s*(0x[0-9a-fA-F]+)', output)
            if tx_match:
                data['tx_power'] = self._convert_optical_power(int(tx_match.group(1), 16))
            return data
        except Exception as e:
            logger.error(f"Error parsing ANI-G: {e}")
            return {}

    def _parse_fec(self, output):
        """Extract FEC code-word counters; a missing counter is reported as 0."""
        try:
            data = {}
            match = re.search(r'CorCodeWords:\s*(0x[0-9a-fA-F]+)', output)
            data['fec_corrected'] = int(match.group(1), 16) if match else 0
            match = re.search(r'UncorCodeWords:\s*(0x[0-9a-fA-F]+)', output)
            data['fec_uncorrected'] = int(match.group(1), 16) if match else 0
            return data
        except Exception as e:
            logger.error(f"Error parsing FEC: {e}")
            return {'fec_corrected': 0, 'fec_uncorrected': 0}

    def _parse_ont2g(self, output):
        """Decode the hex ``EqtID`` attribute into ``model``; ``{}`` if absent."""
        try:
            data = {}
            match = re.search(r'EqtID:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
            if match:
                try:
                    model = bytes.fromhex(match.group(1)).decode('ascii', errors='ignore').strip('\x00').strip()
                except ValueError:
                    # Odd-length hex string: treat the attribute as missing.
                    model = ''
                if model:
                    data['model'] = model
            return data
        except Exception as e:
            logger.error(f"Error parsing ONT2-G: {e}")
            return {}

    def _parse_oltg(self, output):
        """Extract ``olt_vendor_info`` and ``olt_version_info`` from OLT-G output.

        ``OltVendorId`` may be printed as ``0x``-prefixed hex (decoded as
        ASCII) or as four plain characters.
        """
        try:
            data = {}
            # The hex form must be tried first: the plain four-character
            # pattern would otherwise match the leading "0x48" of a hex
            # value and hide the real vendor ID.
            hex_match = re.search(r'OltVendorId:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
            if hex_match:
                try:
                    olt_vendor = bytes.fromhex(hex_match.group(1)).decode('ascii', errors='ignore').strip('\x00').strip()
                except ValueError:
                    # Odd-length hex string: treat the attribute as missing.
                    olt_vendor = ''
                if olt_vendor:
                    data['olt_vendor_info'] = olt_vendor
            else:
                match = re.search(r'OltVendorId:\s*([A-Z0-9]{4})', output, re.IGNORECASE)
                if match:
                    data['olt_vendor_info'] = match.group(1)

            match = re.search(r'Version:\s*(\d+)', output, re.IGNORECASE)
            if match:
                data['olt_version_info'] = match.group(1)
            return data
        except Exception as e:
            logger.error(f"Error parsing OLT-G: {e}")
            return {}

    def _parse_mac_attribute(self, output, source):
        """Extract ``mac_address`` from a ``MacAddress:`` attribute line.

        A ``0x``-prefixed value of exactly 12 hex digits is reformatted as
        ``aa:bb:cc:dd:ee:ff``; any other ``0x`` value is ignored. Otherwise
        the run of hex digits and colons after the label is returned as-is.
        ``source`` only labels the error log message.
        """
        try:
            data = {}
            # The hex form must be tried first: the loose pattern below would
            # otherwise stop at the "x" and report "0" as the MAC address.
            hex_match = re.search(r'MacAddress:\s*0x([0-9a-fA-F]+)', output, re.IGNORECASE)
            if hex_match:
                hex_str = hex_match.group(1)
                if len(hex_str) == 12:
                    data['mac_address'] = ':'.join(
                        hex_str[i:i + 2] for i in range(0, 12, 2)
                    )
                return data

            match = re.search(r'MacAddress:\s*([0-9a-fA-F:]+)', output, re.IGNORECASE)
            if match:
                data['mac_address'] = match.group(1)
            return data
        except Exception as e:
            logger.error(f"Error parsing {source}: {e}")
            return {}

    def _parse_pptpethuni(self, output):
        """Extract ``mac_address`` from PptpEthUni output; ``{}`` if absent."""
        return self._parse_mac_attribute(output, 'PptpEthUni')

    def _parse_veipuni(self, output):
        """Extract ``mac_address`` from VeipUni output; ``{}`` if absent."""
        return self._parse_mac_attribute(output, 'VeipUni')

    def _parse_sys_mac(self, output):
        """Return a colon-separated MAC from sysfs output, or ``None``."""
        try:
            # Drop the echoed command and the shell prompt before matching.
            output = output.replace('cat /sys/class/net/eth0/address', '')
            output = output.replace('# ', '').strip()

            match = re.search(r'([0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2})', output)
            if match:
                return match.group(1)

            match = re.search(r'([0-9a-fA-F]{12})', output)
            if match:
                mac = match.group(1)
                return ':'.join(mac[i:i + 2] for i in range(0, 12, 2))
        except Exception as e:
            logger.error(f"Error parsing /sys MAC: {e}")
        return None

    def _parse_serial(self, output):
        """Return the token following ``SerialNumber:``, or ``None``."""
        try:
            match = re.search(r'SerialNumber:\s*(\S+)', output, re.IGNORECASE)
            if match:
                return match.group(1)
        except Exception as e:
            logger.error(f"Error parsing serial: {e}")
        return None

    def _parse_flash_value(self, output, key):
        """Return the value of ``key=value`` in ``output``.

        Returns ``None`` when the key is missing, the value is empty, or the
        value is the literal ``DEFAULT``.
        """
        try:
            # Escape the key so it is always matched literally.
            pattern = rf'{re.escape(key)}=(.+?)(?:\r|\n|$)'
            match = re.search(pattern, output, re.IGNORECASE)
            if match:
                value = match.group(1).strip()
                if value and value != 'DEFAULT':
                    return value
            return None
        except Exception as e:
            logger.error(f"Error parsing flash value {key}: {e}")
            return None

    # ------------------------------------------------------------------
    # Decoders
    # ------------------------------------------------------------------

    def _decode_olt_mode(self, value):
        """Map an ``OMCI_OLT_MODE`` value to a label."""
        modes = {
            '0': 'Disabled',
            '1': 'Huawei',
            '2': 'ZTE',
            '3': 'Customized',
            '21': 'Auto'
        }
        return modes.get(value, f'Unknown ({value})')

    def _decode_lan_mode(self, value):
        """Map a ``LAN_SDS_MODE`` value to a label."""
        modes = {
            '0': 'Auto',
            '1': 'Fiber 1G',
            '8': 'HSGMII 2.5G'
        }
        return modes.get(value, f'Mode {value}')

    def _decode_pon_mode(self, value):
        """Map a ``PON_MODE`` value to a label."""
        modes = {
            '1': 'GPON',
            '2': 'EPON'
        }
        return modes.get(value, f'Mode {value}')

    def _convert_optical_power(self, hex_value):
        """Convert a 16-bit two's-complement raw level to a power reading.

        The signed value is scaled by 0.002; callers log the result as dBm.
        """
        signed_value = hex_value - 0x10000 if hex_value > 0x7fff else hex_value
        return signed_value * 0.002

    # ------------------------------------------------------------------
    # Alerts
    # ------------------------------------------------------------------

    @staticmethod
    def _build_alert(severity, category, message, value):
        """Return an alert dict stamped with the current local time."""
        return {
            'severity': severity,
            'category': category,
            'message': message,
            'value': value,
            'timestamp': datetime.now().isoformat()
        }

    def _check_alerts(self, data):
        """Recompute ``self.alerts`` from ``data`` against the configured thresholds.

        The alert list is replaced on every call, keeping at most the last
        20 entries. Raises ``KeyError`` if a required threshold is missing.
        """
        new_alerts = []
        thresholds = self.config.THRESHOLDS

        rx = data.get('rx_power')
        if rx is not None:
            if rx < thresholds['rx_power_min']:
                new_alerts.append(self._build_alert(
                    'critical', 'optical',
                    f'RX Power critically low: {rx:.2f} dBm', rx,
                ))
            elif rx > thresholds['rx_power_max']:
                new_alerts.append(self._build_alert(
                    'warning', 'optical',
                    f'RX Power high: {rx:.2f} dBm', rx,
                ))

        tx = data.get('tx_power')
        if tx is not None:
            if tx < thresholds['tx_power_min'] or tx > thresholds['tx_power_max']:
                new_alerts.append(self._build_alert(
                    'warning', 'optical',
                    f'TX Power out of range: {tx:.2f} dBm', tx,
                ))

        temp = data.get('temperature')
        if temp is not None and temp > thresholds.get('temperature_max', 85):
            new_alerts.append(self._build_alert(
                'warning', 'temperature',
                f'Temperature high: {temp:.1f}°C', temp,
            ))

        # "or 0" also covers an explicit None stored under the key.
        fec_uncor = data.get('fec_uncorrected') or 0
        if fec_uncor > 0:
            new_alerts.append(self._build_alert(
                'critical', 'transmission',
                f'FEC Uncorrected Errors: {fec_uncor}', fec_uncor,
            ))

        self.alerts = new_alerts[-20:]

        if new_alerts:
            logger.warning(f"Generated {len(new_alerts)} alerts")