# Add Phase 4: gNMI streaming telemetry and traffic generator
# - gNMI integration: NETCONF script to enable gRPC on all 9 routers,
#   Telegraf container with gnmi input plugin, InfluxDB for time-series
#   storage, 3 Grafana telemetry dashboards (utilization, errors, combined)
# - Traffic generator: Scapy-based dual-mode container (sender/responder)
#   with Flask API, RFC 2544 test suite (throughput, latency, frame-loss,
#   back-to-back), Vue 3 web UI with flow builder, test runner, real-time
#   stats monitor, and results export
# - docker-compose.yml updated with influxdb, telegraf, traffic-gen,
#   traffic-gen-ui services
# - Full documentation in DOCS.md sections 15-16
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# 2026-03-06 15:29:44 -07:00
"""
Responder - high-performance UDP packet receiver for TGEN traffic.

Runs several receiver threads, each with its own UDP socket bound to the
same port via SO_REUSEPORT, so the kernel spreads incoming packets across
them. On Linux the kernel hashes each flow (source and destination address
and port) to one socket, so a single flow is always handled by a single
worker. Each thread keeps its own stats counters to avoid lock contention.
CPython threads share the GIL, so per-packet parsing is serialized; the
extra sockets mainly add buffering and let blocking recvfrom() calls overlap.
"""

import logging
import os
import socket
import struct
import threading
import time

from engine.packet_builder import MAGIC, HEADER_LEN

log = logging.getLogger(__name__)

DEFAULT_LISTEN_PORT = 5001
# Requested per-socket receive buffer (16 MiB). Linux silently caps it at
# net.core.rmem_max, so start() logs the size actually granted.
RECV_BUF_SIZE = 16 * 1024 * 1024
# Number of receiver threads/sockets; override with RESPONDER_WORKERS.
NUM_WORKERS = int(os.environ.get('RESPONDER_WORKERS', 4))


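# Sketch (hypothetical helpers, not used by Responder): a minimal
# illustration of the header prefix that _recv_loop parses, assuming the
# first 16 bytes are magic (4 B), sequence number (uint32, big-endian) and
# sender timestamp in ns (uint64, big-endian), as implied by the byte slices
# it reads. The authoritative layout and HEADER_LEN live in
# engine.packet_builder; _HDR_FMT, _pack_probe_header and
# _unpack_probe_header are illustrative names only.
import struct  # re-imported so the sketch is self-contained

# '!' = network byte order, no padding: 4s magic, I uint32 seq, Q uint64 ns.
_HDR_FMT = '!4sIQ'


def _pack_probe_header(magic: bytes, seq: int, sender_ns: int) -> bytes:
    """Build the 16-byte header prefix read by _recv_loop (hypothetical)."""
    return struct.pack(_HDR_FMT, magic, seq, sender_ns)


def _unpack_probe_header(data: bytes):
    """Return (magic, seq, sender_ns), matching _recv_loop's int.from_bytes slices."""
    return struct.unpack_from(_HDR_FMT, data, 0)

# Usage: _unpack_probe_header(_pack_probe_header(b'TGEN', 7, 123)) returns
# (b'TGEN', 7, 123). b'TGEN' is an assumed magic value, not MAGIC itself.

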
class _WorkerStats:
    """Per-worker stats, not shared between receive threads, so no locks."""

    __slots__ = ('rx_packets', 'rx_bytes', 'out_of_order', 'duplicates',
                 'last_seq', 'lat_buf', 'lat_idx', 'lat_count')

    def __init__(self):
        self.rx_packets = 0
        self.rx_bytes = 0
        self.out_of_order = 0
        self.duplicates = 0
        self.last_seq = -1           # no packet seen yet
        # Ring buffer holding the most recent 4096 latency samples (ms).
        self.lat_buf = [0.0] * 4096
        self.lat_idx = 0             # next slot to write
        self.lat_count = 0           # total samples written (may exceed 4096)


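# Sketch (hypothetical, not used by Responder): get_stats() reports jitter
# as the mean absolute difference between successive one-way latency
# samples. For comparison, RFC 3550 (RTP) defines a smoothed interarrival
# jitter, J += (|D| - J) / 16, where D is the change in transit time between
# consecutive packets. Both are shown below over latency samples (ms) in
# arrival order; the function names are illustrative only.


def _mean_abs_successive_diff(lat_ms):
    """Mean |L[i] - L[i-1]|, the jitter definition get_stats() uses."""
    if len(lat_ms) < 2:
        return 0.0
    return sum(abs(b - a) for a, b in zip(lat_ms, lat_ms[1:])) / (len(lat_ms) - 1)


def _rfc3550_jitter(lat_ms):
    """RFC 3550 interarrival jitter estimate from per-packet transit times."""
    j = 0.0
    for a, b in zip(lat_ms, lat_ms[1:]):
        # D = transit(i) - transit(i-1); each step moves J 1/16 toward |D|.
        j += (abs(b - a) - j) / 16
    return j

# The plain mean reacts to every sample equally; the RFC 3550 estimator is
# an exponential moving average, so a single spike moves it only by 1/16.

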
class Responder:
    """Multi-threaded UDP responder for TGEN test traffic.

    mode='echo' reflects each valid packet back to its sender with the
    receive timestamp appended; any other mode (default 'log') only counts
    packets and records latency.
    """

    def __init__(self, mode: str = 'log', listen_port: int = DEFAULT_LISTEN_PORT):
        self._mode = mode
        self._listen_port = listen_port
        self._sockets = []
        self._threads = []
        self._workers = []
        self._running = False
        self._stop_event = threading.Event()

    def start(self, interface: str = None):
        """Bind one socket and start one receive thread per worker.

        ``interface`` is currently unused; every socket binds to 0.0.0.0 on
        the listen port.
        """
        if self._running:
            return

        self._stop_event.clear()
        n = max(1, NUM_WORKERS)  # guard against RESPONDER_WORKERS=0

        try:
            for i in range(n):
                sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                # Track the socket immediately so stop() closes it on failure.
                self._sockets.append(sock)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                # SO_REUSEPORT (Linux 3.9+) lets all workers bind the same port;
                # the kernel then spreads incoming flows across the sockets.
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
                try:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, RECV_BUF_SIZE)
                except OSError:
                    pass  # best effort; the granted size is logged below
                # Short timeout so the loop re-checks the stop event regularly.
                sock.settimeout(0.5)
                sock.bind(('0.0.0.0', self._listen_port))

                ws = _WorkerStats()
                self._workers.append(ws)

                t = threading.Thread(target=self._recv_loop, args=(sock, ws),
                                     daemon=True, name=f'responder-rx-{i}')
                self._threads.append(t)
                t.start()
        except OSError:
            # e.g. port already in use: release whatever was opened, then re-raise.
            self.stop()
            raise

        # Linux reports double the requested value (kernel bookkeeping overhead).
        actual_buf = self._sockets[0].getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
        self._running = True
        log.info('Responder started on port=%d mode=%s workers=%d rcvbuf=%d',
                 self._listen_port, self._mode, n, actual_buf)

    def stop(self):
        """Signal workers to exit, wait for them, then close the sockets.

        Per-worker stats are discarded, so read get_stats() before stopping.
        """
        self._stop_event.set()
        # Workers re-check the event at least every 0.5 s (socket timeout).
        for t in self._threads:
            if t.is_alive():
                t.join(timeout=3)
        for s in self._sockets:
            try:
                s.close()
            except Exception:
                pass
        self._sockets.clear()
        self._threads.clear()
        self._workers.clear()
        self._running = False
        log.info('Responder stopped')

    def is_running(self) -> bool:
        return self._running

    def get_stats(self) -> dict:
        """Aggregate counters and latency statistics across all workers.

        Counters are read without locks while workers may be updating them,
        so the result is a best-effort snapshot.
        """
        rx_packets = 0
        rx_bytes = 0
        out_of_order = 0
        duplicates = 0
        all_lat = []
        diff_sum = 0.0   # sum of |L[i] - L[i-1]| within each worker
        diff_count = 0   # number of successive sample pairs

        for ws in self._workers:
            rx_packets += ws.rx_packets
            rx_bytes += ws.rx_bytes
            out_of_order += ws.out_of_order
            duplicates += ws.duplicates

            buf, idx, count = ws.lat_buf, ws.lat_idx, ws.lat_count
            if count == 0:
                continue
            if count >= len(buf):
                # Buffer has wrapped: the oldest sample sits at lat_idx.
                samples = buf[idx:] + buf[:idx]
            else:
                samples = buf[:count]
            all_lat.extend(samples)
            # Jitter uses successive samples of the same worker in arrival order,
            # so neither worker boundaries nor the ring wrap point distort it.
            diff_sum += sum(abs(b - a) for a, b in zip(samples, samples[1:]))
            diff_count += len(samples) - 1

        latency = {}
        if all_lat:
            jitter = diff_sum / diff_count if diff_count else 0.0
            latency = {
                'min_ms': round(min(all_lat), 3),
                'max_ms': round(max(all_lat), 3),
                'avg_ms': round(sum(all_lat) / len(all_lat), 3),
                'jitter_ms': round(jitter, 3),
                'samples': len(all_lat),
            }

        return {
            'rx_packets': rx_packets,
            'rx_bytes': rx_bytes,
            'out_of_order': out_of_order,
            'duplicates': duplicates,
            'latency': latency,
            'running': self._running,
        }

    def reset_stats(self):
        for ws in self._workers:
            ws.rx_packets = 0
            ws.rx_bytes = 0
            ws.last_seq = -1
            ws.out_of_order = 0
            ws.duplicates = 0
            # Stale values left in lat_buf are ignored once lat_count is 0.
            ws.lat_idx = 0
            ws.lat_count = 0
        log.info('Responder stats reset')

    def _recv_loop(self, sock, ws):
        """Per-worker receive loop.

        Validates each datagram's TGEN header, updates this worker's counters
        and latency ring buffer and, in 'echo' mode, sends the datagram back
        with the receive timestamp appended.
        """
        # Hot-path names bound to locals to avoid repeated attribute lookups.
        echo = self._mode == 'echo'
        recvfrom = sock.recvfrom
        time_ns = time.time_ns
        stop_is_set = self._stop_event.is_set
        lat_buf = ws.lat_buf
        lat_buf_len = len(lat_buf)
        magic = MAGIC

        while not stop_is_set():
            try:
                data, addr = recvfrom(65535)
            except socket.timeout:
                continue  # no traffic; loop back to re-check the stop event
            except OSError:
                # Expected if the socket was closed during shutdown.
                if stop_is_set():
                    break
                raise

            rx_ns = time_ns()
            dlen = len(data)

            # Drop anything that is not a TGEN packet.
            if dlen < HEADER_LEN or data[:4] != magic:
                continue

            # Header starts with magic (4 B), seq (uint32 BE), sender time_ns (uint64 BE).
            seq = int.from_bytes(data[4:8], 'big')
            sender_ns = int.from_bytes(data[8:16], 'big')

            ws.rx_packets += 1
            ws.rx_bytes += dlen

            # Compared only with this worker's previous packet: an immediate
            # repeat counts as a duplicate, a lower seq as out of order.
            last = ws.last_seq
            if seq == last:
                ws.duplicates += 1
            elif seq < last:
                ws.out_of_order += 1
            ws.last_seq = seq

            # One-way latency assumes sender and responder clocks are in sync;
            # samples outside (0, 60000) ms are treated as invalid (e.g. clock skew).
            lat_ms = (rx_ns - sender_ns) / 1_000_000
            if 0 < lat_ms < 60000:
                idx = ws.lat_idx
                lat_buf[idx] = lat_ms
                ws.lat_idx = (idx + 1) % lat_buf_len
                ws.lat_count += 1

            if echo:
                try:
                    # Reflect the datagram with rx_ns appended (8 B, big-endian).
                    sock.sendto(data + struct.pack('!Q', rx_ns), addr)
                except OSError:
                    pass  # best effort; a failed echo must not stop the loop
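

# Sketch (hypothetical, not used by Responder): how a sender might read an
# 'echo' reply. _recv_loop returns the original datagram with its receive
# timestamp appended as an 8-byte big-endian integer (struct '!Q'). Round-trip
# time needs only the sender's own clock, unlike the one-way latency above,
# which assumes synchronized clocks. _parse_echo_reply is an illustrative
# name, and the 16-byte header prefix (magic, seq, sender_ns) is assumed from
# the slices _recv_loop reads; the real minimum length is HEADER_LEN + 8.
import struct  # re-imported so the sketch is self-contained


def _parse_echo_reply(reply: bytes, now_ns: int):
    """Return (seq, rtt_ms, responder_rx_ns) for an echoed TGEN packet."""
    if len(reply) < 16 + 8:
        raise ValueError('reply too short for header prefix + echo timestamp')
    seq = int.from_bytes(reply[4:8], 'big')
    sender_ns = int.from_bytes(reply[8:16], 'big')
    # The responder's receive timestamp is the last 8 bytes of the reply.
    (responder_rx_ns,) = struct.unpack('!Q', reply[-8:])
    rtt_ms = (now_ns - sender_ns) / 1_000_000
    return seq, rtt_ms, responder_rx_ns

# responder_rx_ns is only comparable with sender timestamps when both clocks
# are synchronized; rtt_ms is valid regardless.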