205 lines
6.0 KiB
Python
Raw Normal View History

"""
Responder - high-performance UDP packet receiver for TGEN traffic.
Uses multiple receiver threads on SO_REUSEPORT UDP sockets for parallel
packet processing. Each thread has its own socket and stats counters
to avoid contention.
"""
import logging
import os
import socket
import struct
import threading
import time
from engine.packet_builder import MAGIC, HEADER_LEN
log = logging.getLogger(__name__)

# UDP port the responder binds to when the caller does not pass one.
DEFAULT_LISTEN_PORT = 5001
# SO_RCVBUF size requested for every receive socket (16 MiB); the size actually
# granted is read back and logged when the responder starts.
RECV_BUF_SIZE = 16 << 20
# Number of receiver sockets/threads; overridable via the RESPONDER_WORKERS
# environment variable (must parse as an int).
NUM_WORKERS = int(os.environ.get('RESPONDER_WORKERS', '4'))
class _WorkerStats:
"""Per-worker stats — no sharing, no locks."""
__slots__ = ('rx_packets', 'rx_bytes', 'out_of_order', 'duplicates',
'last_seq', 'lat_buf', 'lat_idx', 'lat_count')
def __init__(self):
self.rx_packets = 0
self.rx_bytes = 0
self.out_of_order = 0
self.duplicates = 0
self.last_seq = -1
self.lat_buf = [0.0] * 4096
self.lat_idx = 0
self.lat_count = 0
class Responder:
    """Multi-threaded UDP receiver that counts, times and optionally echoes TGEN packets.

    One socket plus one receive thread is created per worker. All sockets bind
    the same port with SO_REUSEPORT, so the kernel distributes datagrams among
    them. Each worker writes only to its own ``_WorkerStats``;
    :meth:`get_stats` aggregates them on demand.

    Args:
        mode: ``'echo'`` sends every valid packet back to its sender with the
            8-byte receive timestamp appended; any other value only records
            statistics (default ``'log'``).
        listen_port: UDP port bound on all IPv4 interfaces.
    """

    def __init__(self, mode: str = 'log', listen_port: int = DEFAULT_LISTEN_PORT):
        self._mode = mode
        self._listen_port = listen_port
        self._sockets = []   # one bound UDP socket per worker
        self._threads = []   # receive thread per socket (same order)
        self._workers = []   # _WorkerStats per socket (same order)
        self._running = False
        self._stop_event = threading.Event()

    def start(self, interface: str = None):
        """Open the worker sockets and start one receive thread per socket.

        Does nothing if already running. If any socket cannot be created,
        configured or bound, everything opened so far is torn down and the
        original exception is re-raised, so a failed start leaves no stray
        sockets or threads behind.

        Args:
            interface: Accepted but currently ignored; sockets always bind to
                ``0.0.0.0``. NOTE(review): kept for caller compatibility —
                confirm whether per-interface binding was intended.
        """
        if self._running:
            return
        self._stop_event.clear()
        # A worker count <= 0 would open no sockets and crash on [0] below.
        n = max(1, NUM_WORKERS)
        try:
            for i in range(n):
                sock = self._open_socket()
                self._sockets.append(sock)
                ws = _WorkerStats()
                self._workers.append(ws)
                t = threading.Thread(target=self._recv_loop, args=(sock, ws),
                                     daemon=True, name=f'responder-rx-{i}')
                self._threads.append(t)
                t.start()
        except Exception:
            self._teardown()
            raise
        actual_buf = self._sockets[0].getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
        self._running = True
        log.info('Responder started on port=%d mode=%s workers=%d rcvbuf=%d',
                 self._listen_port, self._mode, n, actual_buf)

    def _open_socket(self):
        """Create, configure and bind one receive socket; close it if any step fails."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, RECV_BUF_SIZE)
            except OSError:
                # Best effort: on failure the OS default buffer size is kept.
                pass
            # Finite timeout so the receive loop wakes up to re-check the stop event.
            sock.settimeout(0.5)
            sock.bind(('0.0.0.0', self._listen_port))
        except Exception:
            sock.close()
            raise
        return sock

    def _teardown(self):
        """Signal workers to stop, join them, close sockets and drop all per-worker state."""
        self._stop_event.set()
        for t in self._threads:
            # Threads that never started report not-alive and are skipped.
            if t.is_alive():
                t.join(timeout=3)
        for s in self._sockets:
            try:
                s.close()
            except OSError:
                pass
        self._sockets.clear()
        self._threads.clear()
        self._workers.clear()
        self._running = False

    def stop(self):
        """Stop all workers and close their sockets.

        Per-worker stats are discarded, so :meth:`get_stats` reports zeros
        afterwards. Safe to call when not running.
        """
        self._teardown()
        log.info('Responder stopped')

    def is_running(self) -> bool:
        """Return True between a successful :meth:`start` and the next :meth:`stop`."""
        return self._running

    @staticmethod
    def _ordered_latencies(ws):
        """Return the latency samples held in ``ws``'s ring buffer, oldest first."""
        buf = ws.lat_buf
        count = ws.lat_count
        idx = ws.lat_idx
        if count <= len(buf):
            return buf[:count]
        # The buffer has wrapped: the oldest surviving sample is at the write index.
        return buf[idx:] + buf[:idx]

    def get_stats(self) -> dict:
        """Return statistics summed over all workers.

        Returns:
            Dict with ``rx_packets``, ``rx_bytes``, ``out_of_order``,
            ``duplicates``, ``running`` and ``latency``. ``latency`` is an
            empty dict when no samples exist; otherwise it holds ``min_ms``,
            ``max_ms``, ``avg_ms``, ``jitter_ms`` (all rounded to 3 decimals)
            and ``samples``, computed over the samples currently retained in
            each worker's ring buffer. Jitter is the mean absolute difference
            between consecutive samples of the same worker, in arrival order.
        """
        rx_packets = rx_bytes = out_of_order = duplicates = 0
        all_lat = []
        diff_sum = 0.0
        diff_count = 0
        for ws in self._workers:
            rx_packets += ws.rx_packets
            rx_bytes += ws.rx_bytes
            out_of_order += ws.out_of_order
            duplicates += ws.duplicates
            samples = self._ordered_latencies(ws)
            all_lat.extend(samples)
            # Differences are taken within one worker only: pairing the last
            # sample of one worker with the first of the next is meaningless.
            diff_sum += sum(abs(cur - prev) for prev, cur in zip(samples, samples[1:]))
            diff_count += max(len(samples) - 1, 0)
        latency = {}
        if all_lat:
            jitter = diff_sum / diff_count if diff_count else 0.0
            latency = {
                'min_ms': round(min(all_lat), 3),
                'max_ms': round(max(all_lat), 3),
                'avg_ms': round(sum(all_lat) / len(all_lat), 3),
                'jitter_ms': round(jitter, 3),
                'samples': len(all_lat),
            }
        return {
            'rx_packets': rx_packets,
            'rx_bytes': rx_bytes,
            'out_of_order': out_of_order,
            'duplicates': duplicates,
            'latency': latency,
            'running': self._running,
        }

    def reset_stats(self):
        """Zero every worker's counters, sequence tracking and latency history.

        No lock is taken, so a packet being processed concurrently by a worker
        may still be reflected in the counters after the reset. ``lat_buf``
        contents are left in place; zeroing ``lat_idx``/``lat_count`` is what
        hides the old samples from :meth:`get_stats`.
        """
        for ws in self._workers:
            ws.rx_packets = 0
            ws.rx_bytes = 0
            ws.last_seq = -1
            ws.out_of_order = 0
            ws.duplicates = 0
            ws.lat_idx = 0
            ws.lat_count = 0
        log.info('Responder stats reset')

    def _recv_loop(self, sock, ws):
        """Receive packets on ``sock`` until the stop event is set, updating ``ws``.

        Datagrams shorter than ``HEADER_LEN`` or whose first 4 bytes differ
        from ``MAGIC`` are ignored. For the rest, the big-endian integer in
        bytes 4..8 is the sequence number (duplicate/out-of-order counting) and
        bytes 8..16 the sender timestamp in ns (one-way latency sample). In
        ``'echo'`` mode the packet is sent back to its source with the 8-byte
        receive timestamp appended. Socket errors other than timeouts propagate
        and end this worker thread unless a stop was requested.
        """
        echo = self._mode == 'echo'
        # Bind hot-path attributes to locals once; this loop runs per packet.
        recvfrom = sock.recvfrom
        sendto = sock.sendto
        time_ns = time.time_ns
        stop_is_set = self._stop_event.is_set
        lat_buf = ws.lat_buf
        lat_buf_len = len(lat_buf)
        magic = MAGIC
        while not stop_is_set():
            try:
                data, addr = recvfrom(65535)
            except socket.timeout:
                # Periodic wake-up so the stop event is re-checked.
                continue
            except OSError:
                # e.g. the socket being closed during shutdown.
                if stop_is_set():
                    break
                raise
            rx_ns = time_ns()
            dlen = len(data)
            if dlen < HEADER_LEN or data[:4] != magic:
                continue
            seq = int.from_bytes(data[4:8], 'big')
            sender_ns = int.from_bytes(data[8:16], 'big')
            ws.rx_packets += 1
            ws.rx_bytes += dlen
            last = ws.last_seq
            if seq == last:
                ws.duplicates += 1
            elif seq < last:
                ws.out_of_order += 1
            # Tracks the most recent sequence (not the maximum), so e.g. a
            # sender restarting at a lower sequence counts as one reordering.
            ws.last_seq = seq
            lat_ms = (rx_ns - sender_ns) / 1_000_000
            # Drop non-positive or >= 60 s values; presumably these come from
            # unsynchronized clocks or bogus timestamps.
            if 0 < lat_ms < 60000:
                idx = ws.lat_idx
                lat_buf[idx] = lat_ms
                ws.lat_idx = (idx + 1) % lat_buf_len
                ws.lat_count += 1
            if echo:
                try:
                    sendto(data + struct.pack('!Q', rx_ns), addr)
                except OSError:
                    # Echo is best effort; a failed reply must not stop the receiver.
                    pass