import subprocess
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


class RRDManager:
    """Thin wrapper around the ``rrdtool`` CLI managing a fixed set of
    round-robin databases (optical, traffic, fec, system) in one directory.

    All rrdtool interaction goes through ``subprocess.run`` with argument
    lists (no shell). Update failures are logged, never raised, so a bad
    sample cannot kill the collection loop.
    """

    # Named fetch windows mapped to seconds (used by fetch()).
    _PERIODS = {
        '1h': 3600, '6h': 21600, '12h': 43200, '24h': 86400,
        '3d': 259200, '7d': 604800, '14d': 1209600, '30d': 2592000,
        '60d': 5184000, '90d': 7776000, '120d': 10368000,
        '1y': 31536000, '2y': 63072000, '5y': 157680000,
    }

    def __init__(self, rrd_dir):
        """Create ``rrd_dir``, verify rrdtool is installed, and ensure all
        four database files exist (creating any that are missing).

        Raises:
            RuntimeError: if the ``rrdtool`` binary cannot be executed.
        """
        self.rrd_dir = Path(rrd_dir)
        self.rrd_dir.mkdir(parents=True, exist_ok=True)
        try:
            result = subprocess.run(['rrdtool', '--version'],
                                    capture_output=True, check=True, text=True)
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            logger.error("RRDtool not installed!")
            # BUGFIX: chain the original exception so the cause is preserved.
            raise RuntimeError("Install rrdtool: sudo apt-get install rrdtool") from e
        # splitlines() + guard: tolerate an empty stdout from odd builds.
        version = result.stdout.splitlines()[0] if result.stdout else ''
        logger.info(f"RRDtool available: {version}")

        self.rrd_files = {
            'optical': self.rrd_dir / 'optical.rrd',
            'traffic': self.rrd_dir / 'traffic.rrd',
            'fec': self.rrd_dir / 'fec.rrd',
            'system': self.rrd_dir / 'system.rrd',
        }
        self._init_rrds()

    def _create_rrd(self, name, schema):
        """Create the RRD for ``name`` with the DS/RRA lines in ``schema``
        unless the file already exists. All databases share a 60 s step."""
        rrd_file = self.rrd_files[name]
        if rrd_file.exists():
            logger.info(f"{name}.rrd exists: {rrd_file}")
            return
        logger.info(f"Creating {rrd_file}")
        cmd = ['rrdtool', 'create', str(rrd_file),
               '--step', '60', '--start', 'now-10s'] + schema
        subprocess.run(cmd, check=True)
        logger.info(f"{name}.rrd created")

    def _init_rrds(self):
        """Create any missing RRD files.

        Retention per RRA set (60 s base step): 1-min points for 24 h,
        5-min for 7 d, 30-min for 31 d, 6-h for ~2 y (where defined).
        Heartbeat is 300 s on every data source.
        """
        self._create_rrd('optical', [
            'DS:rx_power:GAUGE:300:-40:10',
            'DS:tx_power:GAUGE:300:-10:10',
            'DS:voltage:GAUGE:300:0:5',
            'DS:tx_bias:GAUGE:300:0:200',
            'DS:temperature:GAUGE:300:-20:100',
            'RRA:LAST:0.5:1:1440',
            'RRA:AVERAGE:0.5:1:1440',
            'RRA:AVERAGE:0.5:5:2016',
            'RRA:AVERAGE:0.5:30:1488',
            'RRA:AVERAGE:0.5:360:730',
            'RRA:MAX:0.5:1:1440',
            'RRA:MAX:0.5:5:2016',
            'RRA:MIN:0.5:1:1440',
            'RRA:MIN:0.5:5:2016',
        ])
        # DERIVE with min 0: counters that only increase; rate is per-second.
        self._create_rrd('traffic', [
            'DS:rx_packets:DERIVE:300:0:U',
            'DS:tx_packets:DERIVE:300:0:U',
            'DS:rx_bytes:DERIVE:300:0:U',
            'DS:tx_bytes:DERIVE:300:0:U',
            'RRA:LAST:0.5:1:1440',
            'RRA:AVERAGE:0.5:1:1440',
            'RRA:AVERAGE:0.5:5:2016',
            'RRA:AVERAGE:0.5:30:1488',
            'RRA:AVERAGE:0.5:360:730',
            'RRA:MAX:0.5:1:1440',
            'RRA:MAX:0.5:5:2016',
        ])
        self._create_rrd('fec', [
            'DS:corrected:DERIVE:300:0:U',
            'DS:uncorrected:DERIVE:300:0:U',
            'DS:total_codewords:DERIVE:300:0:U',
            'RRA:LAST:0.5:1:1440',
            'RRA:AVERAGE:0.5:1:1440',
            'RRA:AVERAGE:0.5:5:2016',
            'RRA:AVERAGE:0.5:30:1488',
            'RRA:MAX:0.5:1:1440',
        ])
        self._create_rrd('system', [
            'DS:uptime:GAUGE:300:0:U',
            'DS:status:GAUGE:300:0:1',
            'RRA:LAST:0.5:1:1440',
            'RRA:AVERAGE:0.5:1:1440',
            'RRA:AVERAGE:0.5:5:2016',
            'RRA:AVERAGE:0.5:30:1488',
        ])

    def _update_rrd(self, db, values):
        """Run ``rrdtool update`` for database ``db`` with a pre-joined
        ``values`` string. Returns True on success, False on failure
        (failure is logged with rrdtool's stderr, not raised)."""
        try:
            subprocess.run(['rrdtool', 'update', str(self.rrd_files[db]), values],
                           check=True, capture_output=True, text=True)
            return True
        except subprocess.CalledProcessError as e:
            logger.error(f"RRD {db} failed: {e.stderr}")
            return False

    def update(self, data):
        """Push one sample dict into the RRDs.

        ``data`` keys read here: rx_power, tx_power, voltage,
        tx_bias_current, temperature, rx_packets, tx_packets, rx_bytes,
        tx_bytes, fec_corrected, fec_uncorrected, fec_total_codewords,
        uptime, status. A metric group is skipped (with a warning) when
        its gating key is missing; individual rrdtool failures are logged
        and counted but never raised.
        """
        timestamp = 'N'  # 'N' lets rrdtool stamp the sample with "now"
        updates_ok = 0
        updates_fail = 0
        try:
            # --- optical: requires both RX and TX power ---
            if data.get('rx_power') is not None and data.get('tx_power') is not None:
                values = ':'.join([
                    timestamp,
                    str(data.get('rx_power', 'U')),
                    str(data.get('tx_power', 'U')),
                    str(data.get('voltage', 'U')),      # 'U' = unknown to rrdtool
                    str(data.get('tx_bias_current', 'U')),
                    str(data.get('temperature', 'U')),
                ])
                if self._update_rrd('optical', values):
                    logger.info(f"RRD optical: RX={data.get('rx_power'):.2f} TX={data.get('tx_power'):.2f} Temp={data.get('temperature', 'N/A')}")
                    updates_ok += 1
                else:
                    updates_fail += 1
            else:
                logger.warning("RRD optical: missing RX/TX power data")

            # --- traffic counters ---
            if data.get('rx_packets') is not None:
                values = ':'.join([
                    timestamp,
                    str(data.get('rx_packets', 0)),
                    str(data.get('tx_packets', 0)),
                    str(data.get('rx_bytes', 0)),
                    str(data.get('tx_bytes', 0)),
                ])
                if self._update_rrd('traffic', values):
                    logger.info(f"RRD traffic: RX={data.get('rx_packets'):,} TX={data.get('tx_packets'):,} pkts")
                    updates_ok += 1
                else:
                    updates_fail += 1
            else:
                logger.warning("RRD traffic: missing packets data")

            # --- FEC counters ---
            if data.get('fec_corrected') is not None:
                values = ':'.join([
                    timestamp,
                    str(data.get('fec_corrected', 0)),
                    str(data.get('fec_uncorrected', 0)),
                    str(data.get('fec_total_codewords', 0)),
                ])
                if self._update_rrd('fec', values):
                    logger.info(f"RRD fec: corrected={data.get('fec_corrected')} uncorrected={data.get('fec_uncorrected')}")
                    updates_ok += 1
                else:
                    updates_fail += 1
            else:
                logger.warning("RRD fec: missing FEC data")

            # --- system: always updated (status defaults to offline/0) ---
            status_val = 1 if data.get('status') == 'online' else 0
            values = f"{timestamp}:{data.get('uptime', 0)}:{status_val}"
            if self._update_rrd('system', values):
                logger.info(f"RRD system: uptime={data.get('uptime', 0)}s status={data.get('status')}")
                updates_ok += 1
            else:
                updates_fail += 1

            logger.info(f"RRD update summary: {updates_ok} OK, {updates_fail} failed")
        except Exception as e:
            # Boundary catch: a malformed sample must not kill the caller's loop.
            logger.error(f"RRD update error: {e}", exc_info=True)

    @staticmethod
    def _resolution_for(seconds):
        """Pick a fetch resolution (seconds per point) matching the RRA
        granularity that covers the requested window."""
        if seconds <= 3600:
            return 60
        # BUGFIX: the original had two consecutive branches (<=12h, <=7d)
        # both returning 300 — merged into one.
        if seconds <= 604800:
            return 300
        if seconds <= 2592000:
            return 1800
        return 21600

    def fetch(self, metric, period='6h', cf=None):
        """Fetch ``metric`` data for the named ``period`` (default '6h';
        unknown periods fall back to 6 h) using consolidation function
        ``cf`` (default 'AVERAGE').

        Returns the parsed dict from :meth:`_parse_fetch_output`
        (downsampled to ~800 points if rrdtool over-delivers), or None on
        missing file, empty data, or rrdtool error.
        """
        seconds = self._PERIODS.get(period, 21600)
        start = f"-{seconds}s"
        rrd_file = self.rrd_files.get(metric)
        if not rrd_file or not rrd_file.exists():
            logger.warning(f"RRD file not found: {metric} -> {rrd_file}")
            return None
        if cf is None:
            # BUGFIX: the original if/else picked 'AVERAGE' in both branches;
            # the dead metric check is removed.
            cf = 'AVERAGE'
        resolution = self._resolution_for(seconds)
        try:
            cmd = [
                'rrdtool', 'fetch', str(rrd_file), cf,
                '--start', start, '--end', 'now',
                '--resolution', str(resolution),
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            parsed = self._parse_fetch_output(result.stdout)
            if not parsed:
                logger.warning(f"RRD fetch {metric}/{period}: no data parsed")
                return parsed
            data_points = len(parsed['timestamps'])
            max_expected_points = int(seconds / resolution)
            if data_points > max_expected_points * 2:
                logger.warning(f"RRD returned too many points: {data_points} (expected ~{max_expected_points}), applying downsampling")
                parsed = self._downsample_data(parsed, max_points=800)
                data_points = len(parsed['timestamps'])
            # BUGFIX: count non-null values AFTER any downsampling so the
            # log line describes the data actually returned.
            first_ds = parsed['ds_names'][0] if parsed['ds_names'] else None
            non_null_count = (sum(1 for v in parsed['data'].get(first_ds, []) if v is not None)
                              if first_ds else 0)
            logger.info(f"RRD fetch {metric}/{period}/{cf} resolution={resolution}s: {data_points} points, {non_null_count} non-null")
            return parsed
        except subprocess.CalledProcessError as e:
            logger.error(f"RRD fetch error {metric}: {e.stderr}")
            return None

    def _downsample_data(self, parsed, max_points=800):
        """Reduce a parsed fetch result to at most ~``max_points`` points by
        averaging fixed-size chunks (all-null chunks stay None). Returns the
        input unchanged when it is already small enough."""
        if len(parsed['timestamps']) <= max_points:
            return parsed
        step = len(parsed['timestamps']) // max_points
        new_timestamps = parsed['timestamps'][::step]
        new_data = {}
        for ds_name in parsed['ds_names']:
            original = parsed['data'][ds_name]
            downsampled = []
            for i in range(0, len(original), step):
                chunk = original[i:i + step]
                valid = [v for v in chunk if v is not None]
                if valid:
                    downsampled.append(sum(valid) / len(valid))
                else:
                    downsampled.append(None)
            new_data[ds_name] = downsampled
        logger.info(f"Backend downsampled: {len(parsed['timestamps'])} -> {len(new_timestamps)} points")
        return {
            'start': parsed['start'],
            'end': parsed['end'],
            'step': parsed['step'] * step,
            'ds_names': parsed['ds_names'],
            'timestamps': new_timestamps,
            'data': new_data,
        }

    def _parse_fetch_output(self, output):
        """Parse ``rrdtool fetch`` text output.

        Expected shape: a header line of DS names, then ``timestamp: v1 v2``
        lines. 'nan' values become None; decimal commas (locale output) are
        normalized to dots. Returns a dict with start/end/step/ds_names/
        timestamps/data, or None when no usable rows are found.
        """
        lines = output.strip().split('\n')
        if len(lines) < 2:
            logger.warning("RRD fetch output too short")
            return None
        ds_names = lines[0].split()
        logger.debug(f"RRD DS names: {ds_names}")
        timestamps = []
        data = {name: [] for name in ds_names}
        for line in lines[1:]:
            if ':' not in line:
                continue
            parts = line.split(':')
            try:
                timestamp = int(parts[0].strip())
            except ValueError:
                continue  # not a data row (e.g. stray text)
            values = parts[1].strip().split()
            timestamps.append(timestamp)
            for i, value_str in enumerate(values):
                if i < len(ds_names):
                    try:
                        if 'nan' in value_str.lower():
                            value = None
                        else:
                            # rrdtool may emit decimal commas under some locales
                            value_str = value_str.replace(',', '.')
                            value = float(value_str)
                    except ValueError:
                        value = None
                    data[ds_names[i]].append(value)
        if not timestamps:
            logger.warning("No timestamps found in RRD output")
            return None
        # Infer step from the first interval; single-row output assumes 60 s.
        step = (timestamps[1] - timestamps[0]) if len(timestamps) > 1 else 60
        for ds_name in ds_names:
            non_null = sum(1 for v in data[ds_name] if v is not None)
            logger.debug(f"RRD {ds_name}: {non_null}/{len(data[ds_name])} non-null values")
        return {
            'start': timestamps[0],
            'end': timestamps[-1],
            'step': step,
            'ds_names': ds_names,
            'timestamps': timestamps,
            'data': data,
        }