"""Background collector for a GPON ONU, polled over telnet (OMCI) and HTTP."""

import asyncio
import logging
import re
import time  # retained from the original module; the poll loop now waits on an Event
from datetime import datetime
from threading import Event, Lock, Thread

import aiohttp
import telnetlib3
from aiohttp import BasicAuth

logger = logging.getLogger(__name__)


class GPONCollector:
    """Poll a GPON device periodically and keep the latest snapshot and alerts.

    A daemon thread runs one collection cycle every ``config.POLL_INTERVAL``
    seconds.  Each cycle reads the OMCI MIB over telnet, enriches it with
    values scraped from the device's web pages, stores the result and
    re-evaluates the alert thresholds.

    The ``config`` object must expose ``GPON_HOST``, ``GPON_PORT``,
    ``GPON_USERNAME``, ``GPON_PASSWORD``, ``POLL_INTERVAL`` and ``THRESHOLDS``.
    """

    # Raw OMCI output of the last poll is dumped here (best effort) for debugging.
    OMCI_DEBUG_DUMP = '/tmp/omci_debug.txt'

    # A well-formed decimal number, so float() on the capture can never raise.
    _NUMBER = r'(-?\d+(?:\.\d+)?)'

    # (result key, pattern, flags) for OMCI attributes stored as plain strings.
    _OMCI_TEXT_FIELDS = (
        ('serial_number', r'SerialNum:\s*(\S+)', 0),
        ('version', r'Version:\s*(M\d+\S*)', 0),
        ('vendor_id', r'VID:\s*(\S+)', 0),
        ('olt_vendor_info', r'OltVendorId:\s*(\S+)', 0),
        ('olt_version_info', r'OltG.*?Version:\s*(\d+)', re.DOTALL),
        ('mac_address', r'MacAddress:\s*([0-9a-fA-F:]+)', 0),
    )

    # Uptime formats tried in order; missing named groups count as zero.
    _UPTIME_PATTERNS = (
        re.compile(r'(?P<days>\d+)\s*days?,\s*(?P<minutes>\d+)\s*min', re.IGNORECASE),
        re.compile(r'(?P<days>\d+)\s*days?,\s*(?P<hours>\d+):(?P<minutes>\d+)', re.IGNORECASE),
        re.compile(r'(?P<hours>\d+)\s*h(?:our)?[s]?,?\s*(?P<minutes>\d+)\s*min', re.IGNORECASE),
        re.compile(r'^(?P<hours>\d+):(?P<minutes>\d+)$'),
        re.compile(r'^(?P<minutes>\d+)\s*min', re.IGNORECASE),
    )

    def __init__(self, config):
        """Store the configuration; no I/O happens until ``start()``."""
        self.config = config
        self.data = {}
        self.lock = Lock()
        self.running = False
        self.thread = None
        self.alerts = []
        # Lets stop() interrupt the wait between polls instead of sleeping it out.
        self._stop_event = Event()

    def start(self) -> None:
        """Start the background polling thread; a no-op if already running."""
        if self.running:
            return
        self.running = True
        self._stop_event.clear()
        self.thread = Thread(target=self._collect_loop, daemon=True)
        self.thread.start()
        logger.info("Collector started")

    def stop(self) -> None:
        """Ask the polling thread to stop and wait up to five seconds for it."""
        self.running = False
        self._stop_event.set()
        if self.thread:
            self.thread.join(timeout=5)
        logger.info("Collector stopped")

    def get_data(self) -> dict:
        """Return a shallow copy of the most recent online snapshot."""
        with self.lock:
            return self.data.copy()

    def get_alerts(self) -> list:
        """Return a copy of the alerts raised by the most recent online poll."""
        with self.lock:
            return self.alerts.copy()

    def _collect_loop(self) -> None:
        """Thread body: poll, publish and log until ``running`` is cleared."""
        while self.running:
            try:
                # asyncio.run creates a fresh loop and always closes it, even
                # when the coroutine raises.
                data = asyncio.run(self._collect_all_data())
                if data and data.get('status') == 'online':
                    with self.lock:
                        self.data = data
                    self._check_alerts(data)
                    rx = data.get('rx_power', 0)
                    tx = data.get('tx_power', 0)
                    temp = data.get('temperature', 0)
                    uptime = data.get('uptime', 0)
                    uptime_days = uptime // 86400
                    logger.info(
                        f"Collected: RX={rx:.2f}dBm TX={tx:.2f}dBm "
                        f"Temp={temp:.1f}C Uptime={uptime}s ({uptime_days}d)"
                    )
                else:
                    # NOTE(review): self.data keeps the last online snapshot
                    # here; confirm consumers expect stale data when offline.
                    logger.warning("No data or device offline")
            except Exception as e:
                logger.error(f"Error in collector loop: {e}", exc_info=True)
            # Returns early as soon as stop() sets the event.
            self._stop_event.wait(self.config.POLL_INTERVAL)

    async def _collect_all_data(self):
        """Collect OMCI data and overlay non-None web values.

        Returns the merged dict, or ``None`` when OMCI collection failed.
        """
        omci_data = await self._collect_omci_async()
        if not omci_data:
            return None
        web_data = await self._scrape_web_interface()
        if web_data:
            omci_data.update(
                {key: value for key, value in web_data.items() if value is not None}
            )
        return omci_data

    @staticmethod
    def _search_labelled(html, label, value_pattern):
        """Search ``html`` for ``label`` in one element followed by a value.

        The label must directly follow a tag; one or more tags (with optional
        whitespace) may separate it from the value.  Returns the match object
        or ``None``.

        NOTE(review): the patterns this replaces began with a stray ``]*>``
        and could not match HTML; they are rebuilt here without assuming tag
        names -- verify against the device's actual pages.
        """
        pattern = rf'<[^>]*>{re.escape(label)}\s*(?:<[^>]*>\s*)+{value_pattern}'
        return re.search(pattern, html, re.IGNORECASE)

    def _parse_pon_status(self, html) -> dict:
        """Extract temperature, voltage and TX bias current from status_pon.asp."""
        fields = (
            ('temperature', 'Temperature', r'\s*C', 'Temperature'),
            ('voltage', 'Voltage', r'\s*V', 'Voltage'),
            ('tx_bias_current', 'Bias Current', r'\s*mA', 'TX Bias'),
        )
        found = {}
        for key, label, unit, log_name in fields:
            match = self._search_labelled(html, label, self._NUMBER + unit)
            if match:
                found[key] = float(match.group(1))
                logger.debug(f"[WEB] {log_name}: {found[key]}")
        logger.info(
            f"[WEB] status_pon.asp OK: temp={found.get('temperature')} "
            f"voltage={found.get('voltage')} bias={found.get('tx_bias_current')}"
        )
        return found

    def _parse_uptime(self, uptime_str):
        """Convert an uptime string to seconds, or ``None`` if unrecognised.

        Handles ``"N days, M min"``, ``"N days, H:MM"``, ``"H hours, M min"``,
        ``"H:MM"`` and ``"M min"``.
        """
        for pattern in self._UPTIME_PATTERNS:
            match = pattern.search(uptime_str)
            if match:
                parts = {name: int(value) for name, value in match.groupdict().items()}
                return (
                    parts.get('days', 0) * 86400
                    + parts.get('hours', 0) * 3600
                    + parts.get('minutes', 0) * 60
                )
        return None

    def _parse_status(self, html) -> dict:
        """Extract the device uptime (in seconds) from status.asp."""
        found = {}
        match = self._search_labelled(html, 'Uptime', r'([^<]+)')
        if match:
            uptime_str = match.group(1).strip()
            logger.debug(f"[WEB] Raw uptime: '{uptime_str}'")
            seconds = self._parse_uptime(uptime_str)
            if seconds is None:
                logger.warning(f"[WEB] Could not parse uptime: '{uptime_str}'")
            else:
                found['uptime'] = seconds
                logger.info(f"[WEB] Uptime: '{uptime_str}' = {seconds}s")
        else:
            logger.warning("[WEB] Uptime not found in status.asp")
        logger.info(f"[WEB] status.asp OK: uptime={found.get('uptime')}")
        return found

    def _parse_pon_stats(self, html) -> dict:
        """Extract packet and byte counters from pon-stats.asp."""
        fields = (
            ('tx_packets', 'Packets Sent:', 'TX Packets'),
            ('rx_packets', 'Packets Received:', 'RX Packets'),
            ('tx_bytes', 'Bytes Sent:', 'TX Bytes'),
            ('rx_bytes', 'Bytes Received:', 'RX Bytes'),
        )
        found = {}
        for key, label, log_name in fields:
            match = self._search_labelled(html, label, r'(\d+)')
            if match:
                found[key] = int(match.group(1))
                logger.debug(f"[WEB] {log_name}: {found[key]}")
        logger.info(
            f"[WEB] pon-stats.asp OK: rx_pkts={found.get('rx_packets')} "
            f"tx_pkts={found.get('tx_packets')}"
        )
        return found

    async def _scrape_web_interface(self) -> dict:
        """Scrape the device's web pages with HTTP basic auth.

        Each page is fetched and parsed independently, so a failure on one
        page does not discard values from the others.  Returns a dict of the
        fields found; it is empty when nothing could be scraped.
        """
        try:
            auth = BasicAuth(self.config.GPON_USERNAME, self.config.GPON_PASSWORD)
            base_url = f'http://{self.config.GPON_HOST}'
            data = {}
            pages = (
                ('status_pon.asp', '/status_pon.asp', self._parse_pon_status),
                ('status.asp', '/status.asp', self._parse_status),
                ('pon-stats.asp', '/admin/pon-stats.asp', self._parse_pon_stats),
            )
            connector = aiohttp.TCPConnector(ssl=False)
            timeout = aiohttp.ClientTimeout(total=10, connect=5)
            async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
                for page_name, path, parse in pages:
                    try:
                        async with session.get(f'{base_url}{path}', auth=auth) as response:
                            if response.status != 200:
                                logger.warning(f"[WEB] {page_name} returned {response.status}")
                                continue
                            html = await response.text()
                        data.update(parse(html))
                    except Exception as e:
                        logger.error(f"[WEB] Error fetching {page_name}: {e}")
            if data:
                logger.info(f"[WEB] Scraped {len(data)} fields from web interface")
            else:
                logger.warning("[WEB] No data scraped from web interface")
            return data
        except Exception as e:
            logger.error(f"[WEB] Web scraping failed: {e}")
            return {}

    async def _collect_omci_async(self):
        """Log in over telnet, run ``omcicli mib get all`` and parse the output.

        Returns the dict built by ``_parse_omci``, or ``None`` on timeout or
        any telnet error.  The connection is closed on every path.
        """
        writer = None
        try:
            reader, writer = await asyncio.wait_for(
                telnetlib3.open_connection(
                    self.config.GPON_HOST,
                    self.config.GPON_PORT
                ),
                timeout=10
            )
            await asyncio.wait_for(reader.readuntil(b'login: '), timeout=5)
            writer.write(self.config.GPON_USERNAME + '\n')
            await asyncio.wait_for(reader.readuntil(b'Password: '), timeout=5)
            writer.write(self.config.GPON_PASSWORD + '\n')
            await asyncio.sleep(2)
            # Reads are bounded so a silent device cannot hang the thread.
            initial_output = await asyncio.wait_for(reader.read(2048), timeout=5)
            if 'RTK.0>' in initial_output or 'command:#' in initial_output:
                # One of these prompts appeared after login: send "exit" and
                # discard the reply before issuing the command.
                writer.write('exit\n')
                await asyncio.sleep(1)
                await asyncio.wait_for(reader.read(1024), timeout=5)
            writer.write('omcicli mib get all\n')
            await asyncio.sleep(3)
            full_output = await asyncio.wait_for(reader.read(102400), timeout=10)
            writer.write('exit\n')
            await asyncio.sleep(0.3)
            logger.debug(f"[OMCI] Received {len(full_output)} bytes")
            return self._parse_omci(full_output)
        except asyncio.TimeoutError:
            logger.error("[OMCI] Timeout during telnet connection")
            return None
        except Exception as e:
            logger.error(f"[OMCI] Telnet error: {e}", exc_info=True)
            return None
        finally:
            if writer is not None:
                try:
                    writer.close()
                except Exception as e:
                    # Closing is best effort; the poll result is already decided.
                    logger.debug(f"[OMCI] Error closing telnet connection: {e}")

    def _parse_omci(self, raw_data) -> dict:
        """Parse raw ``omcicli`` output into a snapshot dict.

        The result always has ``timestamp`` and ``status``.  ``status`` is
        ``'online'`` only when both RX and TX optical power were found.
        Parsing errors are logged and the partial result is returned.
        """
        try:
            with open(self.OMCI_DEBUG_DUMP, 'w') as f:
                f.write(raw_data)
        except (OSError, TypeError, ValueError) as e:
            # The dump is only a debugging aid; never let it fail a poll.
            logger.debug(f"[OMCI] Could not write debug dump: {e}")

        data = {
            'timestamp': datetime.now().isoformat(),
            'status': 'offline',
        }
        try:
            power_fields = (
                ('rx_power', r'OpticalSignalLevel:\s*(0x[0-9a-fA-F]+)'),
                ('tx_power', r'TranOpticLevel:\s*(0x[0-9a-fA-F]+)'),
            )
            for key, pattern in power_fields:
                match = re.search(pattern, raw_data)
                if match:
                    data[key] = self._convert_optical_power(int(match.group(1), 16))

            for key, pattern, flags in self._OMCI_TEXT_FIELDS:
                match = re.search(pattern, raw_data, flags)
                if match:
                    data[key] = match.group(1)

            fec_section = re.search(
                r'FecPmhd.*?CorCodeWords:\s*(0x[0-9a-fA-F]+).*?UncorCodeWords:\s*(0x[0-9a-fA-F]+)',
                raw_data,
                re.DOTALL
            )
            # Missing FEC counters are reported as zero.
            data['fec_corrected'] = int(fec_section.group(1), 16) if fec_section else 0
            data['fec_uncorrected'] = int(fec_section.group(2), 16) if fec_section else 0

            if data.get('rx_power') is not None and data.get('tx_power') is not None:
                data['status'] = 'online'
        except Exception as e:
            logger.error(f"[OMCI] Parsing error: {e}", exc_info=True)
        return data

    def _convert_optical_power(self, hex_value) -> float:
        """Convert a raw optical level to the value the class reports as dBm.

        Values above ``0x7fff`` are treated as negative 16-bit two's
        complement numbers; the signed value is scaled by 0.002.
        """
        signed_value = hex_value - 0x10000 if hex_value > 0x7fff else hex_value
        return signed_value * 0.002

    def _check_alerts(self, data) -> None:
        """Rebuild ``self.alerts`` from ``data`` and ``config.THRESHOLDS``.

        Reads ``rx_power_min``, ``rx_power_max``, ``tx_power_min`` and
        ``tx_power_max`` (required keys) and ``temperature_max`` (default 85).
        At most the last 20 alerts of this evaluation are kept.
        """
        thresholds = self.config.THRESHOLDS
        new_alerts = []

        def add_alert(severity, category, message, value):
            new_alerts.append({
                'severity': severity,
                'category': category,
                'message': message,
                'value': value,
                'timestamp': datetime.now().isoformat()
            })

        rx = data.get('rx_power')
        if rx is not None:
            if rx < thresholds['rx_power_min']:
                add_alert('critical', 'optical', f'RX Power critically low: {rx:.2f} dBm', rx)
            elif rx > thresholds['rx_power_max']:
                add_alert('warning', 'optical', f'RX Power high: {rx:.2f} dBm', rx)

        tx = data.get('tx_power')
        if tx is not None:
            if tx < thresholds['tx_power_min'] or tx > thresholds['tx_power_max']:
                add_alert('warning', 'optical', f'TX Power out of range: {tx:.2f} dBm', tx)

        temp = data.get('temperature')
        if temp is not None and temp > thresholds.get('temperature_max', 85):
            add_alert('warning', 'temperature', f'Temperature high: {temp:.1f}C', temp)

        fec_uncor = data.get('fec_uncorrected', 0)
        if fec_uncor > 0:
            add_alert('critical', 'transmission', f'FEC Uncorrected Errors: {fec_uncor}', fec_uncor)

        # get_alerts() reads under the lock, so publish under it as well.
        with self.lock:
            self.alerts = new_alerts[-20:]
        if new_alerts:
            logger.warning(f"Generated {len(new_alerts)} alerts")