"""
Responder - high-performance UDP packet receiver for TGEN traffic.

Uses multiple receiver threads on SO_REUSEPORT UDP sockets for parallel
packet processing. Each thread has its own socket and stats counters to
avoid contention.
"""

import logging
import os
import socket
import struct
import threading
import time

from engine.packet_builder import MAGIC, HEADER_LEN

log = logging.getLogger(__name__)

DEFAULT_LISTEN_PORT = 5001
RECV_BUF_SIZE = 16 * 1024 * 1024
NUM_WORKERS = int(os.environ.get('RESPONDER_WORKERS', 4))

# Capacity of each worker's latency-sample ring buffer.
_LAT_BUF_SIZE = 4096


class _WorkerStats:
    """Per-worker stats — no sharing, no locks.

    Each instance is written by exactly one receiver thread. ``get_stats`` and
    ``reset_stats`` touch it from other threads without locking, so values
    read from outside are a best-effort snapshot.
    """

    __slots__ = ('rx_packets', 'rx_bytes', 'out_of_order', 'duplicates',
                 'last_seq', 'lat_buf', 'lat_idx', 'lat_count')

    def __init__(self):
        self.rx_packets = 0
        self.rx_bytes = 0
        self.out_of_order = 0
        self.duplicates = 0
        # Sequence number of the previous packet seen by this worker; -1 = none yet.
        self.last_seq = -1
        # Ring buffer of latency samples in milliseconds.
        self.lat_buf = [0.0] * _LAT_BUF_SIZE
        # Next write position in lat_buf.
        self.lat_idx = 0
        # Samples recorded since the last reset; may exceed len(lat_buf).
        self.lat_count = 0

    def ordered_latencies(self):
        """Return the buffered latency samples, oldest first.

        Once the ring buffer has wrapped, the oldest sample lives at the
        current write index, so the buffer is rotated to restore
        chronological order.
        """
        count = self.lat_count
        idx = self.lat_idx
        buf = self.lat_buf
        if count <= 0:
            return []
        if count < len(buf):
            return buf[:count]
        return buf[idx:] + buf[:idx]


class Responder:
    """Multi-threaded UDP receiver that counts, sequence-checks and times TGEN packets.

    With ``mode='echo'`` every valid packet is also sent back to its source
    with the local receive timestamp appended; any other mode only records
    statistics.
    """

    def __init__(self, mode: str = 'log', listen_port: int = DEFAULT_LISTEN_PORT):
        self._mode = mode
        self._listen_port = listen_port
        self._sockets = []
        self._threads = []
        self._workers = []
        self._running = False
        self._stop_event = threading.Event()

    def start(self, interface: str = None):
        """Open the worker sockets and start one receiver thread per socket.

        Does nothing if already running. ``interface`` is accepted for API
        compatibility but is currently ignored: sockets bind to 0.0.0.0.

        The worker count comes from ``NUM_WORKERS``, clamped to at least 1.
        It is forced to 1 when the platform has no ``SO_REUSEPORT``, because
        several sockets could not then share the port.

        Raises:
            OSError: if a socket cannot be created, configured or bound. All
                sockets and threads opened so far are torn down first.
        """
        if self._running:
            return
        self._stop_event.clear()

        n = max(1, NUM_WORKERS)
        reuseport = getattr(socket, 'SO_REUSEPORT', None)
        if reuseport is None and n > 1:
            log.warning('SO_REUSEPORT unavailable; using 1 worker instead of %d', n)
            n = 1

        try:
            for i in range(n):
                self._start_worker(i, reuseport)
        except Exception:
            # Don't leave half-started workers behind.
            self._teardown()
            raise

        actual_buf = self._sockets[0].getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
        self._running = True
        log.info('Responder started on port=%d mode=%s workers=%d rcvbuf=%d',
                 self._listen_port, self._mode, n, actual_buf)

    def _start_worker(self, index, reuseport):
        """Create, configure and bind one socket, then start its receiver thread."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if reuseport is not None:
                sock.setsockopt(socket.SOL_SOCKET, reuseport, 1)
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, RECV_BUF_SIZE)
            except OSError:
                # Best effort: keep the default buffer if the OS rejects the request.
                pass
            # Short timeout so the receive loop re-checks the stop event regularly.
            sock.settimeout(0.5)
            sock.bind(('0.0.0.0', self._listen_port))
        except Exception:
            sock.close()
            raise

        self._sockets.append(sock)
        ws = _WorkerStats()
        self._workers.append(ws)
        t = threading.Thread(target=self._recv_loop, args=(sock, ws), daemon=True,
                             name=f'responder-rx-{index}')
        self._threads.append(t)
        t.start()

    def _teardown(self):
        """Signal workers to exit, join them, close their sockets and clear state."""
        self._stop_event.set()
        for t in self._threads:
            if t.is_alive():
                t.join(timeout=3)
        for s in self._sockets:
            try:
                s.close()
            except OSError:
                pass
        self._sockets.clear()
        self._threads.clear()
        self._workers.clear()
        self._running = False

    def stop(self):
        """Stop all receiver threads and close their sockets.

        Per-worker stats are discarded.
        """
        self._teardown()
        log.info('Responder stopped')

    def is_running(self) -> bool:
        return self._running

    def get_stats(self) -> dict:
        """Aggregate counters and latency statistics across all workers.

        ``latency`` is an empty dict when no samples are buffered. Otherwise
        it holds:

        * ``min_ms`` / ``max_ms`` / ``avg_ms``: over every buffered sample.
        * ``jitter_ms``: the mean absolute difference between consecutive
          samples of the same worker, taken in chronological order. It is 0.0
          when no worker has two samples.
        * ``samples``: the number of buffered samples.

        Values are rounded to 3 decimals.
        """
        rx_packets = 0
        rx_bytes = 0
        out_of_order = 0
        duplicates = 0
        all_lat = []
        diff_sum = 0.0
        diff_count = 0

        # Snapshot the list so a concurrent stop() can't alter it mid-iteration.
        for ws in list(self._workers):
            rx_packets += ws.rx_packets
            rx_bytes += ws.rx_bytes
            out_of_order += ws.out_of_order
            duplicates += ws.duplicates
            samples = ws.ordered_latencies()
            all_lat.extend(samples)
            # Only neighbouring samples from the same worker are comparable.
            diff_sum += sum(abs(cur - prev) for prev, cur in zip(samples, samples[1:]))
            diff_count += max(0, len(samples) - 1)

        latency = {}
        if all_lat:
            jitter = diff_sum / diff_count if diff_count else 0.0
            latency = {
                'min_ms': round(min(all_lat), 3),
                'max_ms': round(max(all_lat), 3),
                'avg_ms': round(sum(all_lat) / len(all_lat), 3),
                'jitter_ms': round(jitter, 3),
                'samples': len(all_lat),
            }

        return {
            'rx_packets': rx_packets,
            'rx_bytes': rx_bytes,
            'out_of_order': out_of_order,
            'duplicates': duplicates,
            'latency': latency,
            'running': self._running,
        }

    def reset_stats(self):
        """Zero every worker's counters and forget buffered latency samples.

        This is lock-free, so packets arriving at the same moment may be
        partially counted.
        """
        for ws in self._workers:
            ws.rx_packets = 0
            ws.rx_bytes = 0
            ws.last_seq = -1
            ws.out_of_order = 0
            ws.duplicates = 0
            ws.lat_idx = 0
            ws.lat_count = 0
        log.info('Responder stats reset')

    def _recv_loop(self, sock, ws):
        """Per-worker receive loop.

        Datagrams shorter than HEADER_LEN, or not starting with MAGIC, are
        ignored. For the rest, the loop reads:

        * bytes 4-8: a big-endian sequence number;
        * bytes 8-16: a big-endian nanosecond timestamp (presumably the
          sender's wall clock; latency is only meaningful if the clocks are
          synchronized).

        A packet whose sequence number equals this worker's previous one
        counts as a duplicate; a smaller one counts as out-of-order.
        Latencies between 0 and 60 s are stored in the worker's ring buffer.

        NOTE(review): sequence checks are per worker, so several senders
        landing on one worker will interleave and inflate ``out_of_order``.
        """
        echo = self._mode == 'echo'
        # Bind hot-path lookups to locals.
        recvfrom = sock.recvfrom
        sendto = sock.sendto
        time_ns = time.time_ns
        stop_is_set = self._stop_event.is_set
        pack_ts = struct.Struct('!Q').pack
        lat_buf = ws.lat_buf
        lat_buf_len = len(lat_buf)
        magic = MAGIC

        while not stop_is_set():
            try:
                data, addr = recvfrom(65535)
            except socket.timeout:
                # Periodic wake-up so the stop event gets noticed.
                continue
            except OSError:
                if stop_is_set():
                    # Socket was closed during shutdown.
                    break
                log.exception('Responder receive failed on %s; worker exiting',
                              threading.current_thread().name)
                return

            rx_ns = time_ns()
            dlen = len(data)
            if dlen < HEADER_LEN or data[:4] != magic:
                continue

            seq = int.from_bytes(data[4:8], 'big')
            sender_ns = int.from_bytes(data[8:16], 'big')

            ws.rx_packets += 1
            ws.rx_bytes += dlen

            last = ws.last_seq
            if seq == last:
                ws.duplicates += 1
            elif seq < last:
                ws.out_of_order += 1
            ws.last_seq = seq

            lat_ms = (rx_ns - sender_ns) / 1_000_000
            # Drop negative (clock skew) and implausibly large values.
            if 0 < lat_ms < 60000:
                idx = ws.lat_idx
                lat_buf[idx] = lat_ms
                ws.lat_idx = (idx + 1) % lat_buf_len
                ws.lat_count += 1

            if echo:
                try:
                    sendto(data + pack_ts(rx_ns), addr)
                except OSError:
                    # Best effort: a failed echo must not stop reception.
                    pass