""" StatsCollector - ring buffer of per-flow traffic statistics. Stores the last 300 samples (5 minutes at 1-second intervals) per flow, with derived rates and loss calculations. """ import threading import time from collections import defaultdict, deque class StatsCollector: """Collects and stores per-flow traffic statistics in a ring buffer.""" def __init__(self, max_samples: int = 300): self._lock = threading.Lock() self._max_samples = max_samples # flow_id -> deque of sample dicts self._history = defaultdict(lambda: deque(maxlen=self._max_samples)) # flow_id -> previous counters for rate calculation self._prev = {} def record(self, flow_id: str, tx_packets: int, tx_bytes: int, rx_packets: int = 0, rx_bytes: int = 0, latency: dict = None): """Record a stats sample for a flow. Args: flow_id: Flow identifier. tx_packets: Cumulative transmitted packets. tx_bytes: Cumulative transmitted bytes. rx_packets: Cumulative received packets. rx_bytes: Cumulative received bytes. latency: Optional dict with min_ms, avg_ms, max_ms, jitter_ms. """ now = time.time() with self._lock: prev = self._prev.get(flow_id) if prev is not None: dt = now - prev['time'] if dt > 0: d_tx_pkts = tx_packets - prev['tx_packets'] d_tx_bytes = tx_bytes - prev['tx_bytes'] d_rx_pkts = rx_packets - prev['rx_packets'] d_rx_bytes = rx_bytes - prev['rx_bytes'] tx_pps = d_tx_pkts / dt rx_pps = d_rx_pkts / dt tx_mbps = (d_tx_bytes * 8) / (dt * 1_000_000) rx_mbps = (d_rx_bytes * 8) / (dt * 1_000_000) else: tx_pps = rx_pps = tx_mbps = rx_mbps = 0.0 else: tx_pps = rx_pps = tx_mbps = rx_mbps = 0.0 # Loss calculation loss_pct = 0.0 if tx_packets > 0 and rx_packets > 0: loss_pct = max(0.0, ((tx_packets - rx_packets) / tx_packets) * 100) sample = { 'timestamp': now, 'tx_packets': tx_packets, 'tx_bytes': tx_bytes, 'rx_packets': rx_packets, 'rx_bytes': rx_bytes, 'tx_pps': round(tx_pps, 2), 'rx_pps': round(rx_pps, 2), 'tx_mbps': round(tx_mbps, 4), 'rx_mbps': round(rx_mbps, 4), 'loss_pct': round(loss_pct, 3), } if latency: sample['latency'] = latency self._history[flow_id].append(sample) self._prev[flow_id] = { 'time': now, 'tx_packets': tx_packets, 'tx_bytes': tx_bytes, 'rx_packets': rx_packets, 'rx_bytes': rx_bytes, } def get_history(self, flow_id: str, count: int = 60) -> list: """Get the last N samples for a flow.""" with self._lock: history = self._history.get(flow_id) if not history: return [] samples = list(history) return samples[-count:] def get_latest(self, flow_id: str) -> dict: """Get the most recent sample for a flow.""" with self._lock: history = self._history.get(flow_id) if not history: return {} return dict(history[-1]) def get_all_latest(self) -> dict: """Get the most recent sample for every flow.""" with self._lock: result = {} for flow_id, history in self._history.items(): if history: result[flow_id] = dict(history[-1]) return result def remove_flow(self, flow_id: str): """Remove all data for a flow.""" with self._lock: self._history.pop(flow_id, None) self._prev.pop(flow_id, None)