""" Responder - listens for TGEN-tagged packets and collects receive statistics. Two sub-modes: - echo: swaps src/dst MAC and IP, sends packet back with receive timestamp - log: records rx stats only, exposed via API """ import logging import struct import threading import time from scapy.all import ( AsyncSniffer, Ether, IP, Raw, send, conf, ) from engine.packet_builder import MAGIC, HEADER_LEN, parse_payload log = logging.getLogger(__name__) conf.verb = 0 class Responder: """Listens for TGEN packets on an interface and collects stats.""" def __init__(self, mode: str = 'log'): """ Args: mode: 'echo' to reflect packets back, 'log' to only record stats. """ self._mode = mode self._lock = threading.Lock() self._sniffer = None self._running = False # Stats self._rx_packets = 0 self._rx_bytes = 0 self._latency_samples = [] # list of (latency_ms,) self._seen_seqs = set() self._last_seq = -1 self._out_of_order = 0 self._duplicates = 0 # ------------------------------------------------------------------ # Control # ------------------------------------------------------------------ def start(self, interface: str = None): """Start sniffing for TGEN packets.""" if self._running: log.warning('Responder already running') return bpf_filter = 'ip' # broad filter; we check magic in callback kwargs = { 'prn': self._handle_packet, 'store': False, 'filter': bpf_filter, } if interface: kwargs['iface'] = interface self._sniffer = AsyncSniffer(**kwargs) self._sniffer.start() self._running = True log.info('Responder started on interface=%s mode=%s', interface or 'all', self._mode) def stop(self): """Stop sniffing.""" if self._sniffer and self._running: self._sniffer.stop() self._running = False log.info('Responder stopped') def is_running(self) -> bool: return self._running # ------------------------------------------------------------------ # Stats # ------------------------------------------------------------------ def get_stats(self) -> dict: with self._lock: latency = {} if self._latency_samples: vals = self._latency_samples latency = { 'min_ms': round(min(vals), 3), 'max_ms': round(max(vals), 3), 'avg_ms': round(sum(vals) / len(vals), 3), 'jitter_ms': round( sum(abs(vals[i] - vals[i - 1]) for i in range(1, len(vals))) / max(1, len(vals) - 1), 3 ) if len(vals) > 1 else 0.0, 'samples': len(vals), } return { 'rx_packets': self._rx_packets, 'rx_bytes': self._rx_bytes, 'out_of_order': self._out_of_order, 'duplicates': self._duplicates, 'latency': latency, 'running': self._running, } def reset_stats(self): with self._lock: self._rx_packets = 0 self._rx_bytes = 0 self._latency_samples = [] self._seen_seqs.clear() self._last_seq = -1 self._out_of_order = 0 self._duplicates = 0 log.info('Responder stats reset') # ------------------------------------------------------------------ # Packet handling # ------------------------------------------------------------------ def _handle_packet(self, pkt): """Process a received packet, checking for TGEN magic bytes.""" if not pkt.haslayer(Raw): return payload = bytes(pkt[Raw].load) parsed = parse_payload(payload) if parsed is None: return seq, sender_ts_ns = parsed rx_time_ns = time.time_ns() pkt_len = len(bytes(pkt)) # Compute one-way latency (only meaningful if clocks are synced) latency_ms = (rx_time_ns - sender_ts_ns) / 1_000_000 with self._lock: self._rx_packets += 1 self._rx_bytes += pkt_len # Duplicate detection if seq in self._seen_seqs: self._duplicates += 1 else: self._seen_seqs.add(seq) # Keep set bounded if len(self._seen_seqs) > 100000: # Remove oldest entries (approximate) to_remove = sorted(self._seen_seqs)[:50000] self._seen_seqs -= set(to_remove) # Out-of-order detection if seq < self._last_seq and seq not in self._seen_seqs: self._out_of_order += 1 self._last_seq = seq # Record latency (only if plausible: 0 < latency < 60s) if 0 < latency_ms < 60000: self._latency_samples.append(latency_ms) if len(self._latency_samples) > 10000: self._latency_samples = self._latency_samples[-5000:] # Echo mode: swap and send back if self._mode == 'echo' and pkt.haslayer(Ether) and pkt.haslayer(IP): try: echo_pkt = pkt.copy() # Swap MACs echo_pkt[Ether].src, echo_pkt[Ether].dst = pkt[Ether].dst, pkt[Ether].src # Swap IPs echo_pkt[IP].src, echo_pkt[IP].dst = pkt[IP].dst, pkt[IP].src # Append receive timestamp to payload rx_ts_bytes = struct.pack('!Q', rx_time_ns) echo_pkt[Raw].load = payload + rx_ts_bytes # Clear checksums so Scapy recalculates del echo_pkt[IP].chksum if echo_pkt.haslayer('UDP'): del echo_pkt['UDP'].chksum elif echo_pkt.haslayer('TCP'): del echo_pkt['TCP'].chksum send(echo_pkt[IP], verbose=0) except Exception as e: log.debug('Echo send error: %s', e)