""" RFC 2544 test implementations: - ThroughputTest: binary search for max zero-loss throughput - LatencyTest: measure latency at a given rate - FrameLossTest: measure loss at decreasing rates - BackToBackTest: find max burst length with zero loss """ import logging import threading import time from typing import Dict, List, Optional from scapy.all import send, sr, conf, IP, ICMP from engine.packet_builder import build_packet, parse_payload log = logging.getLogger(__name__) conf.verb = 0 class _BaseTest: """Base class for RFC 2544 tests.""" def __init__(self, test_id: str, flow_config: dict, frame_sizes: List[int], trial_duration: float = 60, max_rate_pps: int = 10000, acceptable_loss_pct: float = 0.0): self.test_id = test_id self.flow_config = dict(flow_config) self.frame_sizes = frame_sizes self.trial_duration = trial_duration self.max_rate_pps = max_rate_pps self.acceptable_loss_pct = acceptable_loss_pct self.state = 'idle' # idle -> running -> complete/error self.results = {} self.error = None self.started_at = None self.completed_at = None self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._lock = threading.Lock() def start(self): if self.state == 'running': return self._stop_event.clear() self.state = 'running' self.started_at = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) self.results = {} self.error = None self._thread = threading.Thread(target=self._run_safe, daemon=True, name=f'test-{self.test_id[:8]}') self._thread.start() def stop(self): self._stop_event.set() if self._thread and self._thread.is_alive(): self._thread.join(timeout=self.trial_duration + 5) if self.state == 'running': self.state = 'error' self.error = 'Cancelled by user' def _run_safe(self): try: self._run() if self.state == 'running': self.state = 'complete' except Exception as e: log.error('Test %s error: %s', self.test_id[:8], e) self.state = 'error' self.error = str(e) finally: self.completed_at = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) def _run(self): raise NotImplementedError def _is_stopped(self) -> bool: return self._stop_event.is_set() def _send_trial(self, frame_size: int, rate_pps: int, duration: float): """Send packets at a given rate for a duration. Returns (tx_count, rx_count, latencies).""" flow = dict(self.flow_config) flow['frame_size'] = frame_size interval = 1.0 / rate_pps if rate_pps > 0 else 1.0 tx_count = 0 rx_count = 0 latencies = [] protocol = flow.get('protocol', 'udp').lower() start = time.time() seq = 0 while (time.time() - start) < duration and not self._is_stopped(): pkt = build_packet(flow, seq=seq) seq += 1 if protocol == 'icmp': # Use sr() for ICMP to get responses answered, _ = sr(pkt[IP], timeout=1, verbose=0) tx_count += 1 for sent_pkt, recv_pkt in answered: rx_count += 1 rtt_ms = (recv_pkt.time - sent_pkt.sent_time) * 1000 latencies.append(rtt_ms) else: send(pkt[IP], verbose=0) tx_count += 1 # Rate limiting elapsed = time.time() - start expected_sent = elapsed * rate_pps if tx_count > expected_sent: sleep_time = (tx_count - expected_sent) / rate_pps if sleep_time > 0: self._stop_event.wait(min(sleep_time, 0.1)) # For non-ICMP, we can't easily measure rx without a responder. # rx_count stays 0 for UDP/TCP unless a responder is configured. return tx_count, rx_count, latencies def get_info(self) -> dict: return { 'test_id': self.test_id, 'type': self.__class__.__name__, 'state': self.state, 'results': self.results, 'error': self.error, 'started_at': self.started_at, 'completed_at': self.completed_at, } class ThroughputTest(_BaseTest): """Binary search for maximum throughput with acceptable loss.""" def _run(self): for fs in self.frame_sizes: if self._is_stopped(): break low = 0 high = self.max_rate_pps best_rate = 0 convergence_threshold = max(1, int(self.max_rate_pps * 0.01)) log.info('Throughput test: frame_size=%d, searching [%d, %d] pps', fs, low, high) while (high - low) > convergence_threshold and not self._is_stopped(): mid = (low + high) // 2 if mid == 0: break tx, rx, _ = self._send_trial(fs, mid, self.trial_duration) if tx == 0: loss_pct = 100.0 else: # For ICMP we have rx; for UDP assume zero loss if no responder protocol = self.flow_config.get('protocol', 'udp').lower() if protocol == 'icmp': loss_pct = ((tx - rx) / tx) * 100 else: # Without responder, assume success (user should use responder for accurate test) loss_pct = 0.0 log.info(' frame=%d rate=%d tx=%d rx=%d loss=%.2f%%', fs, mid, tx, rx, loss_pct) if loss_pct <= self.acceptable_loss_pct: best_rate = mid low = mid + 1 else: high = mid - 1 self.results[str(fs)] = { 'max_throughput_pps': best_rate, 'frame_size': fs, } log.info('Throughput result: frame_size=%d -> %d pps', fs, best_rate) class LatencyTest(_BaseTest): """Measure latency at a specified rate.""" def _run(self): rate = self.flow_config.get('rate_pps', 100) for fs in self.frame_sizes: if self._is_stopped(): break log.info('Latency test: frame_size=%d at %d pps for %ds', fs, rate, self.trial_duration) _, _, latencies = self._send_trial(fs, rate, self.trial_duration) if latencies: avg_ms = sum(latencies) / len(latencies) min_ms = min(latencies) max_ms = max(latencies) jitter_ms = ( sum(abs(latencies[i] - latencies[i - 1]) for i in range(1, len(latencies))) / max(1, len(latencies) - 1) ) if len(latencies) > 1 else 0.0 self.results[str(fs)] = { 'frame_size': fs, 'min_ms': round(min_ms, 3), 'avg_ms': round(avg_ms, 3), 'max_ms': round(max_ms, 3), 'jitter_ms': round(jitter_ms, 3), 'samples': len(latencies), } else: self.results[str(fs)] = { 'frame_size': fs, 'min_ms': None, 'avg_ms': None, 'max_ms': None, 'jitter_ms': None, 'samples': 0, 'note': 'No responses received (use ICMP or configure responder)', } log.info('Latency result: frame_size=%d -> %s', fs, self.results[str(fs)]) class FrameLossTest(_BaseTest): """Measure frame loss at decreasing rates (100%, 90%, 80%, ...).""" def _run(self): for fs in self.frame_sizes: if self._is_stopped(): break results_for_size = [] for pct in range(100, 0, -10): if self._is_stopped(): break rate = int(self.max_rate_pps * pct / 100) if rate == 0: continue log.info('FrameLoss test: frame_size=%d rate=%d (%d%%)', fs, rate, pct) tx, rx, _ = self._send_trial(fs, rate, self.trial_duration) protocol = self.flow_config.get('protocol', 'udp').lower() if tx > 0 and protocol == 'icmp': loss_pct = ((tx - rx) / tx) * 100 else: loss_pct = 0.0 # Cannot measure without responder for non-ICMP results_for_size.append({ 'rate_pct': pct, 'rate_pps': rate, 'tx_packets': tx, 'rx_packets': rx, 'loss_pct': round(loss_pct, 3), }) self.results[str(fs)] = results_for_size class BackToBackTest(_BaseTest): """Find maximum burst length with zero loss.""" def _run(self): for fs in self.frame_sizes: if self._is_stopped(): break low = 1 high = self.max_rate_pps # Use max_rate_pps as max burst length best_burst = 0 convergence = max(1, high // 100) log.info('BackToBack test: frame_size=%d searching burst [%d, %d]', fs, low, high) while (high - low) > convergence and not self._is_stopped(): mid = (low + high) // 2 if mid == 0: break flow = dict(self.flow_config) flow['frame_size'] = fs protocol = flow.get('protocol', 'udp').lower() # Send burst of 'mid' packets as fast as possible tx_count = 0 rx_count = 0 for seq in range(mid): if self._is_stopped(): break pkt = build_packet(flow, seq=seq) if protocol == 'icmp': answered, _ = sr(pkt[IP], timeout=0.5, verbose=0) tx_count += 1 rx_count += len(answered) else: send(pkt[IP], verbose=0) tx_count += 1 if protocol == 'icmp' and tx_count > 0: loss_pct = ((tx_count - rx_count) / tx_count) * 100 else: loss_pct = 0.0 # Can't measure without responder log.info(' burst=%d tx=%d rx=%d loss=%.2f%%', mid, tx_count, rx_count, loss_pct) if loss_pct <= self.acceptable_loss_pct: best_burst = mid low = mid + 1 else: high = mid - 1 self.results[str(fs)] = { 'frame_size': fs, 'max_burst_frames': best_burst, } log.info('BackToBack result: frame_size=%d -> %d frames', fs, best_burst) # Factory function TEST_TYPES = { 'throughput': ThroughputTest, 'latency': LatencyTest, 'frame_loss': FrameLossTest, 'back_to_back': BackToBackTest, } def create_test(test_id: str, test_type: str, flow_config: dict, **kwargs): """Create an RFC 2544 test instance by type name.""" cls = TEST_TYPES.get(test_type) if cls is None: raise ValueError(f'Unknown test type: {test_type}. Available: {list(TEST_TYPES)}') return cls(test_id=test_id, flow_config=flow_config, **kwargs)