Add Phase 4: gNMI streaming telemetry and traffic generator
- gNMI integration: NETCONF script to enable gRPC on all 9 routers,
Telegraf container with gnmi input plugin, InfluxDB for time-series
storage, 3 Grafana telemetry dashboards (utilization, errors, combined)
- Traffic generator: Scapy-based dual-mode container (sender/responder)
with Flask API, RFC 2544 test suite (throughput, latency, frame-loss,
back-to-back), Vue 3 web UI with flow builder, test runner, real-time
stats monitor, and results export
- docker-compose.yml updated with influxdb, telegraf, traffic-gen,
traffic-gen-ui services
- Full documentation in DOCS.md sections 15-16
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 15:29:44 -07:00
|
|
|
"""
|
|
|
|
|
StatsCollector - ring buffer of per-flow traffic statistics.
|
|
|
|
|
|
|
|
|
|
By default, stores the last 300 samples per flow (5 minutes at 1-second
polling intervals), along with derived packet/bit rates and loss percentages.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import threading
|
|
|
|
|
import time
|
|
|
|
|
from collections import defaultdict, deque
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StatsCollector:
    """Collects and stores per-flow traffic statistics in a ring buffer.

    Each flow keeps at most ``max_samples`` samples in a bounded ``deque``;
    once full, appending a new sample discards the oldest one. Every public
    method takes an internal lock before touching shared state.
    """

    def __init__(self, max_samples: int = 300):
        """Create an empty collector.

        Args:
            max_samples: Maximum number of samples retained per flow.
        """
        self._lock = threading.Lock()
        self._max_samples = max_samples
        # flow_id -> deque of sample dicts (oldest first, bounded length)
        self._history = defaultdict(lambda: deque(maxlen=self._max_samples))
        # flow_id -> time and counters from the previous record() call,
        # used to derive per-interval rates
        self._prev = {}

    def record(self, flow_id: str, tx_packets: int, tx_bytes: int,
               rx_packets: int = 0, rx_bytes: int = 0,
               latency: dict = None):
        """Record a stats sample for a flow.

        Rates are derived from the change in the cumulative counters since the
        previous call for the same flow. The first sample for a flow (or a
        sample taken with no elapsed wall-clock time) reports zero rates.

        Args:
            flow_id: Flow identifier.
            tx_packets: Cumulative transmitted packets.
            tx_bytes: Cumulative transmitted bytes.
            rx_packets: Cumulative received packets.
            rx_bytes: Cumulative received bytes.
            latency: Optional dict with min_ms, avg_ms, max_ms, jitter_ms.
                Stored on the sample only when truthy (non-empty).
        """
        now = time.time()

        with self._lock:
            prev = self._prev.get(flow_id)

            tx_pps = rx_pps = tx_mbps = rx_mbps = 0.0
            if prev is not None:
                dt = now - prev['time']
                if dt > 0:
                    # Cumulative counters never legitimately decrease; a drop
                    # means the counter source was reset. Clamp the delta to
                    # zero instead of reporting a negative rate.
                    d_tx_pkts = max(0, tx_packets - prev['tx_packets'])
                    d_tx_bytes = max(0, tx_bytes - prev['tx_bytes'])
                    d_rx_pkts = max(0, rx_packets - prev['rx_packets'])
                    d_rx_bytes = max(0, rx_bytes - prev['rx_bytes'])

                    tx_pps = d_tx_pkts / dt
                    rx_pps = d_rx_pkts / dt
                    tx_mbps = (d_tx_bytes * 8) / (dt * 1_000_000)
                    rx_mbps = (d_rx_bytes * 8) / (dt * 1_000_000)

            # Loss calculation: use rate-based when actively sending (avoids
            # poll-lag artifacts), cumulative when flow has stopped.
            # NOTE(review): while transmitting, an interval with zero received
            # packets reports 0% loss rather than 100%, because both branches
            # require rx > 0 (presumably so tx-only flows with no responder
            # are not flagged) -- confirm this is intended.
            loss_pct = 0.0
            if prev is not None and tx_pps > 0 and rx_pps > 0:
                # Rate-based: compare this interval's tx/rx rates
                loss_pct = max(0.0, ((tx_pps - rx_pps) / tx_pps) * 100)
            elif tx_packets > 0 and rx_packets > 0 and tx_pps == 0:
                # Flow stopped: use cumulative counters (final accurate value)
                loss_pct = max(0.0, ((tx_packets - rx_packets) / tx_packets) * 100)

            sample = {
                'timestamp': now,
                'tx_packets': tx_packets,
                'tx_bytes': tx_bytes,
                'rx_packets': rx_packets,
                'rx_bytes': rx_bytes,
                'tx_pps': round(tx_pps, 2),
                'rx_pps': round(rx_pps, 2),
                'tx_mbps': round(tx_mbps, 4),
                'rx_mbps': round(rx_mbps, 4),
                'loss_pct': round(loss_pct, 3),
            }

            if latency:
                sample['latency'] = latency

            self._history[flow_id].append(sample)
            self._prev[flow_id] = {
                'time': now,
                'tx_packets': tx_packets,
                'tx_bytes': tx_bytes,
                'rx_packets': rx_packets,
                'rx_bytes': rx_bytes,
            }

    def get_history(self, flow_id: str, count: int = 60) -> list:
        """Get the last N samples for a flow, oldest first.

        Args:
            flow_id: Flow identifier.
            count: Maximum number of most-recent samples to return.

        Returns:
            A new list of sample dicts. It is empty for an unknown flow or
            when ``count`` <= 0.
        """
        if count <= 0:
            # samples[-0:] would return the *entire* history, and a negative
            # count would silently drop the oldest samples instead.
            return []
        with self._lock:
            history = self._history.get(flow_id)
            if not history:
                return []
            samples = list(history)
        return samples[-count:]

    def get_latest(self, flow_id: str) -> dict:
        """Get a shallow copy of the most recent sample for a flow.

        Returns an empty dict if the flow has no samples.
        """
        with self._lock:
            history = self._history.get(flow_id)
            if not history:
                return {}
            return dict(history[-1])

    def get_all_latest(self) -> dict:
        """Get a shallow copy of the most recent sample for every flow.

        Returns:
            Mapping of flow_id -> latest sample dict, for flows that have at
            least one sample.
        """
        with self._lock:
            return {
                flow_id: dict(history[-1])
                for flow_id, history in self._history.items()
                if history
            }

    def remove_flow(self, flow_id: str):
        """Remove all data for a flow. Unknown flow ids are ignored."""
        with self._lock:
            self._history.pop(flow_id, None)
            self._prev.pop(flow_id, None)