sam dcebf15bb3 Add Phase 4: gNMI streaming telemetry and traffic generator
- gNMI integration: NETCONF script to enable gRPC on all 9 routers,
  Telegraf container with gnmi input plugin, InfluxDB for time-series
  storage, 3 Grafana telemetry dashboards (utilization, errors, combined)
- Traffic generator: Scapy-based dual-mode container (sender/responder)
  with Flask API, RFC 2544 test suite (throughput, latency, frame-loss,
  back-to-back), Vue 3 web UI with flow builder, test runner, real-time
  stats monitor, and results export
- docker-compose.yml updated with influxdb, telegraf, traffic-gen,
  traffic-gen-ui services
- Full documentation in DOCS.md sections 15-16

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 15:29:44 -07:00

120 lines
4.1 KiB
Python

"""
StatsCollector - ring buffer of per-flow traffic statistics.
Stores the most recent samples per flow (300 by default: 5 minutes at 1-second intervals),
with derived rates and loss calculations.
"""
import threading
import time
from collections import defaultdict, deque
class StatsCollector:
    """Collects and stores per-flow traffic statistics in a ring buffer.

    Each flow has its own bounded history of sample dicts. A sample holds the
    cumulative counters passed to :meth:`record` plus rates derived from the
    previous sample of the same flow and a cumulative loss percentage.
    All public methods take an internal lock before touching shared state.
    """

    def __init__(self, max_samples: int = 300):
        """Create an empty collector.

        Args:
            max_samples: Maximum number of samples retained per flow; once
                reached, the oldest sample is dropped for each new one.
        """
        self._lock = threading.Lock()
        self._max_samples = max_samples
        # flow_id -> deque of sample dicts (maxlen turns it into a ring buffer)
        self._history = defaultdict(lambda: deque(maxlen=self._max_samples))
        # flow_id -> previous cumulative counters for rate calculation
        self._prev = {}

    def record(self, flow_id: str, tx_packets: int, tx_bytes: int,
               rx_packets: int = 0, rx_bytes: int = 0,
               latency: dict = None):
        """Record a stats sample for a flow.

        Rates are computed from the difference to the previously recorded
        counters of the same flow. The first sample of a flow, and any sample
        whose elapsed time is not positive, gets rates of 0.0.

        Args:
            flow_id: Flow identifier.
            tx_packets: Cumulative transmitted packets.
            tx_bytes: Cumulative transmitted bytes.
            rx_packets: Cumulative received packets.
            rx_bytes: Cumulative received bytes.
            latency: Optional dict with min_ms, avg_ms, max_ms, jitter_ms.
                Stored under the 'latency' key only when non-empty.
        """
        now = time.time()
        with self._lock:
            tx_pps = rx_pps = tx_mbps = rx_mbps = 0.0
            prev = self._prev.get(flow_id)
            if prev is not None:
                dt = now - prev['time']
                if dt > 0:
                    # Counters are cumulative; a value lower than the previous
                    # one means the counter was reset (e.g. the flow was
                    # restarted). Clamp the delta to zero so a reset never
                    # shows up as a negative rate.
                    d_tx_pkts = max(0, tx_packets - prev['tx_packets'])
                    d_tx_bytes = max(0, tx_bytes - prev['tx_bytes'])
                    d_rx_pkts = max(0, rx_packets - prev['rx_packets'])
                    d_rx_bytes = max(0, rx_bytes - prev['rx_bytes'])
                    tx_pps = d_tx_pkts / dt
                    rx_pps = d_rx_pkts / dt
                    tx_mbps = (d_tx_bytes * 8) / (dt * 1_000_000)
                    rx_mbps = (d_rx_bytes * 8) / (dt * 1_000_000)

            # Loss is computed from the cumulative counters, not the deltas.
            # It is only evaluated when both counters are non-zero, so a flow
            # with rx_packets == 0 reports 0.0 rather than 100.0. Receiving
            # more than was sent is clamped to 0.0.
            loss_pct = 0.0
            if tx_packets > 0 and rx_packets > 0:
                loss_pct = max(
                    0.0, ((tx_packets - rx_packets) / tx_packets) * 100)

            sample = {
                'timestamp': now,
                'tx_packets': tx_packets,
                'tx_bytes': tx_bytes,
                'rx_packets': rx_packets,
                'rx_bytes': rx_bytes,
                'tx_pps': round(tx_pps, 2),
                'rx_pps': round(rx_pps, 2),
                'tx_mbps': round(tx_mbps, 4),
                'rx_mbps': round(rx_mbps, 4),
                'loss_pct': round(loss_pct, 3),
            }
            if latency:
                sample['latency'] = latency
            self._history[flow_id].append(sample)
            self._prev[flow_id] = {
                'time': now,
                'tx_packets': tx_packets,
                'tx_bytes': tx_bytes,
                'rx_packets': rx_packets,
                'rx_bytes': rx_bytes,
            }

    def get_history(self, flow_id: str, count: int = 60) -> list:
        """Get the last N samples for a flow, oldest first.

        Args:
            flow_id: Flow identifier.
            count: Maximum number of samples to return.

        Returns:
            A list of shallow copies of the most recent samples. Empty when
            the flow is unknown, has no samples, or ``count`` is not positive.
        """
        # Guard explicitly: samples[-0:] would return the whole history and a
        # negative count would slice from the wrong end.
        if count <= 0:
            return []
        with self._lock:
            # .get() avoids creating an empty deque for unknown flows.
            history = self._history.get(flow_id)
            if not history:
                return []
            # Copy each sample so callers cannot mutate the stored history.
            return [dict(sample) for sample in list(history)[-count:]]

    def get_latest(self, flow_id: str) -> dict:
        """Get the most recent sample for a flow.

        Returns:
            A shallow copy of the newest sample, or an empty dict when the
            flow is unknown or has no samples.
        """
        with self._lock:
            history = self._history.get(flow_id)
            if not history:
                return {}
            return dict(history[-1])

    def get_all_latest(self) -> dict:
        """Get the most recent sample for every flow.

        Returns:
            A dict mapping flow_id to a shallow copy of that flow's newest
            sample. Flows without samples are omitted.
        """
        with self._lock:
            return {
                flow_id: dict(history[-1])
                for flow_id, history in self._history.items()
                if history
            }

    def remove_flow(self, flow_id: str):
        """Remove all data for a flow; unknown flows are ignored."""
        with self._lock:
            self._history.pop(flow_id, None)
            self._prev.pop(flow_id, None)