"""
RFC 2544 test implementations:
- ThroughputTest: binary search for max zero-loss throughput
- LatencyTest: measure latency at a given rate
- FrameLossTest: measure loss at decreasing rates
- BackToBackTest: find max burst length with zero loss
"""
import logging
import socket
import struct
import threading
import time
import urllib.request
import json
from typing import Dict, List, Optional

from scapy.all import send, sr, conf, IP, ICMP

from engine.packet_builder import build_packet, parse_payload, MAGIC

log = logging.getLogger(__name__)
conf.verb = 0

_IPPROTO_UDP = 17  # IP protocol number for UDP


def _utc_timestamp() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())


class _BaseTest:
    """Base class for RFC 2544 tests.

    A test runs in a daemon thread started by :meth:`start`. Subclasses
    implement :meth:`_run`, which fills ``self.results`` (keyed by frame size
    as a string) and should poll :meth:`_is_stopped` so :meth:`stop` can
    cancel it.

    ``state`` moves ``idle -> running -> complete`` or ``error``; a cancelled
    run ends in ``error`` with ``error == 'Cancelled by user'``.
    """

    def __init__(self, test_id: str, flow_config: dict, frame_sizes: List[int],
                 trial_duration: float = 60, max_rate_pps: int = 10000,
                 acceptable_loss_pct: float = 0.0,
                 responder_url: Optional[str] = None):
        self.test_id = test_id
        self.flow_config = dict(flow_config)  # copy: trials must not mutate caller's dict
        self.frame_sizes = frame_sizes
        self.trial_duration = trial_duration
        self.max_rate_pps = max_rate_pps
        self.acceptable_loss_pct = acceptable_loss_pct
        self.responder_url = responder_url  # e.g. "http://172.30.0.10:5053"

        self.state = 'idle'  # idle -> running -> complete/error
        self.results = {}
        self.error = None
        self.started_at = None
        self.completed_at = None

        # Progress tracking
        self._progress_msg = ''
        self._current_frame_idx = 0
        self._current_trial_tx = 0

        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        # NOTE(review): not acquired anywhere in this module's visible code.
        self._lock = threading.Lock()

    def start(self):
        """Start the test in a background daemon thread (no-op if running)."""
        if self.state == 'running':
            return
        self._stop_event.clear()
        self.state = 'running'
        self.started_at = _utc_timestamp()
        self.results = {}
        self.error = None
        self._thread = threading.Thread(target=self._run_safe, daemon=True,
                                        name=f'test-{self.test_id[:8]}')
        self._thread.start()

    def stop(self):
        """Request cancellation and wait up to one trial (+5 s) for the thread."""
        self._stop_event.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=self.trial_duration + 5)
        # Still 'running' means the worker did not finish within the timeout.
        if self.state == 'running':
            self.state = 'error'
            self.error = 'Cancelled by user'

    def _run_safe(self):
        """Thread entry point: run the test and record the final state."""
        try:
            self._run()
        except Exception as e:
            log.exception('Test %s error: %s', self.test_id[:8], e)
            self.state = 'error'
            self.error = str(e)
        else:
            if self.state == 'running':
                if self._is_stopped():
                    # _run() returned early because stop() was called; do not
                    # report a partial run as a successful completion.
                    self.state = 'error'
                    self.error = 'Cancelled by user'
                else:
                    self.state = 'complete'
        finally:
            self.completed_at = _utc_timestamp()

    def _run(self):
        """Execute the test; implemented by subclasses."""
        raise NotImplementedError

    def _is_stopped(self) -> bool:
        """Return True once :meth:`stop` has been requested."""
        return self._stop_event.is_set()

    def _loss_pct(self, tx: int, rx: int) -> float:
        """Return the loss percentage for a trial of ``tx`` sent / ``rx`` received.

        - Nothing sent: 100 % (the trial itself failed).
        - ``rx > 0``: measured loss, clamped at 0 so a receiver that counted
          more packets than were sent never yields a negative percentage.
        - ``rx == 0`` over ICMP: 100 %, because the ICMP path counts echo
          replies directly, so zero replies is real loss.
        - ``rx == 0`` otherwise: 0 %, because without usable responder
          statistics loss cannot be measured and success is assumed.
        """
        if tx <= 0:
            return 100.0
        if rx > 0:
            return max(0.0, ((tx - rx) / tx) * 100)
        if self.flow_config.get('protocol', 'udp').lower() == 'icmp':
            return 100.0
        return 0.0  # No responder — cannot measure

    def _responder_reset(self):
        """Reset responder stats before a trial (best effort)."""
        if not self.responder_url:
            return
        try:
            req = urllib.request.Request(
                f'{self.responder_url}/responder/reset',
                method='POST', data=b'{}',
                headers={'Content-Type': 'application/json'})
            with urllib.request.urlopen(req, timeout=3):
                pass
        except Exception as e:
            log.warning('Responder reset failed: %s', e)

    def _responder_stats(self) -> Optional[dict]:
        """Query responder for rx stats after a trial; None if unavailable."""
        if not self.responder_url:
            return None
        try:
            req = urllib.request.Request(f'{self.responder_url}/responder/stats')
            with urllib.request.urlopen(req, timeout=5) as resp:
                return json.loads(resp.read())
        except Exception as e:
            log.warning('Responder stats query failed: %s', e)
            return None

    def _send_trial(self, frame_size: int, rate_pps: int, duration: float):
        """Send packets at a given rate for a duration.

        Returns ``(tx_count, rx_count, latencies)``. For ICMP, ``rx_count`` and
        ``latencies`` (ms) come from the echo replies. For other protocols they
        come from the responder's stats endpoint, and stay ``0`` / ``[]`` when
        no responder is configured or it reports nothing received.
        """
        flow = dict(self.flow_config)
        flow['frame_size'] = frame_size
        protocol = flow.get('protocol', 'udp').lower()

        # Reset responder counters before trial
        self._responder_reset()

        if protocol == 'icmp':
            return self._send_icmp_trial(flow, rate_pps, duration)

        tx_count = self._send_raw_trial(flow, rate_pps, duration)
        rx_count = 0
        latencies = []

        # Query responder for actual rx stats (UDP/TCP path)
        resp_stats = self._responder_stats()
        if resp_stats and resp_stats.get('rx_packets', 0) > 0:
            rx_count = resp_stats['rx_packets']
            lat = resp_stats.get('latency', {})
            if lat.get('samples', 0) > 0:
                latencies = [lat['avg_ms']]  # Use avg as representative
        return tx_count, rx_count, latencies

    def _send_icmp_trial(self, flow: dict, rate_pps: int, duration: float):
        """ICMP path: send with sr() and measure RTT from the replies."""
        # A non-positive rate falls back to 1 pps, matching the raw-socket
        # path's 1 s interval, instead of dividing by zero below.
        pace_pps = rate_pps if rate_pps > 0 else 1
        tx_count = 0
        rx_count = 0
        latencies = []
        start = time.time()
        while (time.time() - start) < duration and not self._is_stopped():
            pkt = build_packet(flow, seq=tx_count)
            answered, _ = sr(pkt[IP], timeout=1, verbose=0)
            tx_count += 1
            for sent_pkt, recv_pkt in answered:
                rx_count += 1
                latencies.append((recv_pkt.time - sent_pkt.sent_time) * 1000)

            # Pace: sleep only when ahead of the target packet count.
            expected = (time.time() - start) * pace_pps
            if tx_count > expected:
                sleep_time = (tx_count - expected) / pace_pps
                if sleep_time > 0:
                    self._stop_event.wait(min(sleep_time, 0.1))
        return tx_count, rx_count, latencies

    def _send_raw_trial(self, flow: dict, rate_pps: int, duration: float) -> int:
        """UDP/TCP path: blast pre-built packets through a raw socket.

        Returns the number of packets handed to the kernel successfully.
        """
        start = time.time()
        dst_ip = flow['dst_ip']
        pkt_template = build_packet(flow, seq=0)
        ip_bytes = bytes(pkt_template[pkt_template.firstlayer().payload.__class__])
        magic_offset = ip_bytes.find(MAGIC)

        # Zero the checksums once on the template rather than per packet.
        # Assumes Linux raw-socket behaviour where the kernel fills in the IP
        # header checksum with IP_HDRINCL — see raw(7). A zero UDP checksum
        # means "no checksum" for IPv4, so receivers accept the packets even
        # though seq/timestamp are rewritten below.
        template = bytearray(ip_bytes)
        template[10:12] = b'\x00\x00'
        # IP header length from IHL field (byte 0, low nibble) * 4
        ip_ihl = (template[0] & 0x0F) * 4
        if template[9] == _IPPROTO_UDP:  # byte 9 = protocol field
            udp_csum_offset = ip_ihl + 6
            template[udp_csum_offset:udp_csum_offset + 2] = b'\x00\x00'

        batch_size = max(1, min(rate_pps // 5, 500))
        interval = batch_size / rate_pps if rate_pps > 0 else 1.0

        tx_count = 0
        send_errors = 0
        seq = 0
        with socket.socket(socket.AF_INET, socket.SOCK_RAW,
                           socket.IPPROTO_RAW) as raw_sock:
            raw_sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
            while (time.time() - start) < duration and not self._is_stopped():
                batch_start = time.time()
                for _ in range(batch_size):
                    pkt_bytes = bytearray(template)
                    if magic_offset >= 0:
                        # Payload layout as written here: 32-bit seq at
                        # MAGIC+4, 64-bit ns timestamp at MAGIC+8. The mask
                        # keeps seq within the 32-bit field on long runs.
                        struct.pack_into('!I', pkt_bytes, magic_offset + 4,
                                         seq & 0xFFFFFFFF)
                        struct.pack_into('!Q', pkt_bytes, magic_offset + 8,
                                         time.time_ns())
                    try:
                        raw_sock.sendto(pkt_bytes, (dst_ip, 0))
                        tx_count += 1
                    except OSError:
                        # Best effort (e.g. full send buffer): the packet is
                        # simply not counted as transmitted.
                        send_errors += 1
                    seq += 1

                sleep_time = interval - (time.time() - batch_start)
                if sleep_time > 0:
                    self._stop_event.wait(sleep_time)

        if send_errors:
            log.warning('Raw send failed for %d of %d packets',
                        send_errors, seq)
        return tx_count

    def get_info(self) -> dict:
        """Return a JSON-serialisable snapshot of the test's state and results."""
        # Reverse-lookup the slug for this test class
        type_slug = next((k for k, v in TEST_TYPES.items() if v is self.__class__),
                         self.__class__.__name__)
        info = {
            'id': self.test_id,
            'test_id': self.test_id,
            'type': type_slug,
            'state': self.state,
            'results': self.results,
            'error': self.error,
            'frame_sizes': self.frame_sizes,
            'started_at': self.started_at,
            'completed_at': self.completed_at,
        }
        if self.state == 'running':
            info['progress'] = {
                'frame_idx': self._current_frame_idx,
                'total_frames': len(self.frame_sizes),
                'message': self._progress_msg,
                'completed_sizes': list(self.results.keys()),
            }
        return info


class ThroughputTest(_BaseTest):
    """Binary search for maximum throughput with acceptable loss."""

    def _run(self):
        for idx, fs in enumerate(self.frame_sizes):
            if self._is_stopped():
                break
            self._current_frame_idx = idx

            low = 0
            high = self.max_rate_pps
            best_rate = 0
            # Stop searching once the window is within 1% of the max rate.
            convergence_threshold = max(1, int(self.max_rate_pps * 0.01))
            step = 0

            log.info('Throughput test: frame_size=%d, searching [%d, %d] pps',
                     fs, low, high)

            while (high - low) > convergence_threshold and not self._is_stopped():
                mid = (low + high) // 2
                if mid == 0:
                    break
                step += 1
                self._progress_msg = (
                    f'Frame {fs}B: trial {step}, testing {mid} pps [{low}-{high}]')

                tx, rx, _ = self._send_trial(fs, mid, self.trial_duration)
                loss_pct = self._loss_pct(tx, rx)

                log.info('  frame=%d rate=%d tx=%d rx=%d loss=%.2f%%',
                         fs, mid, tx, rx, loss_pct)

                if loss_pct <= self.acceptable_loss_pct:
                    best_rate = mid
                    low = mid + 1
                else:
                    high = mid - 1

            self.results[str(fs)] = {
                'max_throughput_pps': best_rate,
                'frame_size': fs,
            }
            log.info('Throughput result: frame_size=%d -> %d pps', fs, best_rate)


class LatencyTest(_BaseTest):
    """Measure latency at a specified rate."""

    def _run(self):
        rate = self.flow_config.get('rate_pps', 100)

        for idx, fs in enumerate(self.frame_sizes):
            if self._is_stopped():
                break
            self._current_frame_idx = idx
            self._progress_msg = (
                f'Frame {fs}B: sending at {rate} pps for {self.trial_duration}s')

            log.info('Latency test: frame_size=%d at %d pps for %ds',
                     fs, rate, self.trial_duration)

            _, _, latencies = self._send_trial(fs, rate, self.trial_duration)

            if latencies:
                avg_ms = sum(latencies) / len(latencies)
                min_ms = min(latencies)
                max_ms = max(latencies)
                # Jitter = mean absolute difference between consecutive samples.
                if len(latencies) > 1:
                    deltas = [abs(cur - prev)
                              for prev, cur in zip(latencies, latencies[1:])]
                    jitter_ms = sum(deltas) / len(deltas)
                else:
                    jitter_ms = 0.0

                self.results[str(fs)] = {
                    'frame_size': fs,
                    'min_ms': round(min_ms, 3),
                    'avg_ms': round(avg_ms, 3),
                    'max_ms': round(max_ms, 3),
                    'jitter_ms': round(jitter_ms, 3),
                    'samples': len(latencies),
                }
            else:
                self.results[str(fs)] = {
                    'frame_size': fs,
                    'min_ms': None,
                    'avg_ms': None,
                    'max_ms': None,
                    'jitter_ms': None,
                    'samples': 0,
                    'note': 'No responses received (use ICMP or configure responder)',
                }

            log.info('Latency result: frame_size=%d -> %s', fs, self.results[str(fs)])


class FrameLossTest(_BaseTest):
    """Measure frame loss at decreasing rates (100%, 90%, 80%, ...)."""

    def _run(self):
        for idx, fs in enumerate(self.frame_sizes):
            if self._is_stopped():
                break
            self._current_frame_idx = idx
            results_for_size = []

            for pct in range(100, 0, -10):
                if self._is_stopped():
                    break

                rate = int(self.max_rate_pps * pct / 100)
                if rate == 0:
                    continue

                self._progress_msg = f'Frame {fs}B: testing at {pct}% rate ({rate} pps)'
                log.info('FrameLoss test: frame_size=%d rate=%d (%d%%)', fs, rate, pct)

                tx, rx, _ = self._send_trial(fs, rate, self.trial_duration)
                loss_pct = self._loss_pct(tx, rx)

                results_for_size.append({
                    'rate_pct': pct,
                    'rate_pps': rate,
                    'tx_packets': tx,
                    'rx_packets': rx,
                    'loss_pct': round(loss_pct, 3),
                })

            self.results[str(fs)] = results_for_size


class BackToBackTest(_BaseTest):
    """Find maximum burst length with zero loss."""

    def _run(self):
        for idx, fs in enumerate(self.frame_sizes):
            if self._is_stopped():
                break

            low = 1
            high = self.max_rate_pps  # Use max_rate_pps as max burst length
            best_burst = 0
            convergence = max(1, high // 100)
            self._current_frame_idx = idx

            log.info('BackToBack test: frame_size=%d searching burst [%d, %d]',
                     fs, low, high)

            while (high - low) > convergence and not self._is_stopped():
                mid = (low + high) // 2
                if mid == 0:
                    break

                flow = dict(self.flow_config)
                flow['frame_size'] = fs
                protocol = flow.get('protocol', 'udp').lower()

                # Send burst of 'mid' packets as fast as possible
                tx_count = 0
                rx_count = 0
                for seq in range(mid):
                    if self._is_stopped():
                        break
                    pkt = build_packet(flow, seq=seq)
                    if protocol == 'icmp':
                        answered, _ = sr(pkt[IP], timeout=0.5, verbose=0)
                        tx_count += 1
                        rx_count += len(answered)
                    else:
                        send(pkt[IP], verbose=0)
                        tx_count += 1

                loss_pct = self._loss_pct(tx_count, rx_count)

                log.info('  burst=%d tx=%d rx=%d loss=%.2f%%',
                         mid, tx_count, rx_count, loss_pct)

                if loss_pct <= self.acceptable_loss_pct:
                    best_burst = mid
                    low = mid + 1
                else:
                    high = mid - 1

            self.results[str(fs)] = {
                'frame_size': fs,
                'max_burst_frames': best_burst,
            }
            log.info('BackToBack result: frame_size=%d -> %d frames', fs, best_burst)


# Factory function
TEST_TYPES = {
    'throughput': ThroughputTest,
    'latency': LatencyTest,
    'frame_loss': FrameLossTest,
    'back_to_back': BackToBackTest,
}


def create_test(test_id: str, test_type: str, flow_config: dict, **kwargs):
    """Create an RFC 2544 test instance by type name.

    Raises:
        ValueError: if ``test_type`` is not a key of ``TEST_TYPES``.
    """
    cls = TEST_TYPES.get(test_type)
    if cls is None:
        raise ValueError(f'Unknown test type: {test_type}. Available: {list(TEST_TYPES)}')
    return cls(test_id=test_id, flow_config=flow_config, **kwargs)