Adds sender/responder mode switching via API, QuickPing component, echo-mode responder with dedicated container, improved flow state sync, and RFC2544 test runner enhancements. Includes UI improvements across all traffic-gen components. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
436 lines
16 KiB
Python
436 lines
16 KiB
Python
"""
|
|
RFC 2544 test implementations:
|
|
- ThroughputTest: binary search for max zero-loss throughput
|
|
- LatencyTest: measure latency at a given rate
|
|
- FrameLossTest: measure loss at decreasing rates
|
|
- BackToBackTest: find max burst length with zero loss
|
|
"""
|
|
|
|
import logging
|
|
import socket
|
|
import struct
|
|
import threading
|
|
import time
|
|
import urllib.request
|
|
import json
|
|
from typing import Dict, List, Optional
|
|
|
|
from scapy.all import send, sr, conf, IP, ICMP
|
|
|
|
from engine.packet_builder import build_packet, parse_payload, MAGIC
|
|
|
|
log = logging.getLogger(__name__)
|
|
conf.verb = 0
|
|
|
|
|
|
class _BaseTest:
|
|
"""Base class for RFC 2544 tests."""
|
|
|
|
def __init__(self, test_id: str, flow_config: dict, frame_sizes: List[int],
|
|
trial_duration: float = 60, max_rate_pps: int = 10000,
|
|
acceptable_loss_pct: float = 0.0, responder_url: str = None):
|
|
self.test_id = test_id
|
|
self.flow_config = dict(flow_config)
|
|
self.frame_sizes = frame_sizes
|
|
self.trial_duration = trial_duration
|
|
self.max_rate_pps = max_rate_pps
|
|
self.acceptable_loss_pct = acceptable_loss_pct
|
|
self.responder_url = responder_url # e.g. "http://172.30.0.10:5053"
|
|
|
|
self.state = 'idle' # idle -> running -> complete/error
|
|
self.results = {}
|
|
self.error = None
|
|
self.started_at = None
|
|
self.completed_at = None
|
|
|
|
# Progress tracking
|
|
self._progress_msg = ''
|
|
self._current_frame_idx = 0
|
|
self._current_trial_tx = 0
|
|
|
|
self._thread: Optional[threading.Thread] = None
|
|
self._stop_event = threading.Event()
|
|
self._lock = threading.Lock()
|
|
|
|
def start(self):
|
|
if self.state == 'running':
|
|
return
|
|
self._stop_event.clear()
|
|
self.state = 'running'
|
|
self.started_at = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
|
|
self.results = {}
|
|
self.error = None
|
|
self._thread = threading.Thread(target=self._run_safe, daemon=True,
|
|
name=f'test-{self.test_id[:8]}')
|
|
self._thread.start()
|
|
|
|
def stop(self):
|
|
self._stop_event.set()
|
|
if self._thread and self._thread.is_alive():
|
|
self._thread.join(timeout=self.trial_duration + 5)
|
|
if self.state == 'running':
|
|
self.state = 'error'
|
|
self.error = 'Cancelled by user'
|
|
|
|
def _run_safe(self):
|
|
try:
|
|
self._run()
|
|
if self.state == 'running':
|
|
self.state = 'complete'
|
|
except Exception as e:
|
|
log.error('Test %s error: %s', self.test_id[:8], e)
|
|
self.state = 'error'
|
|
self.error = str(e)
|
|
finally:
|
|
self.completed_at = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
|
|
|
|
def _run(self):
|
|
raise NotImplementedError
|
|
|
|
def _is_stopped(self) -> bool:
|
|
return self._stop_event.is_set()
|
|
|
|
def _responder_reset(self):
|
|
"""Reset responder stats before a trial."""
|
|
if not self.responder_url:
|
|
return
|
|
try:
|
|
req = urllib.request.Request(
|
|
f'{self.responder_url}/responder/reset', method='POST',
|
|
data=b'{}', headers={'Content-Type': 'application/json'})
|
|
urllib.request.urlopen(req, timeout=3)
|
|
except Exception as e:
|
|
log.warning('Responder reset failed: %s', e)
|
|
|
|
def _responder_stats(self) -> Optional[dict]:
|
|
"""Query responder for rx stats after a trial."""
|
|
if not self.responder_url:
|
|
return None
|
|
try:
|
|
req = urllib.request.Request(f'{self.responder_url}/responder/stats')
|
|
resp = urllib.request.urlopen(req, timeout=5)
|
|
return json.loads(resp.read())
|
|
except Exception as e:
|
|
log.warning('Responder stats query failed: %s', e)
|
|
return None
|
|
|
|
def _send_trial(self, frame_size: int, rate_pps: int, duration: float):
|
|
"""Send packets at a given rate for a duration. Returns (tx_count, rx_count, latencies)."""
|
|
flow = dict(self.flow_config)
|
|
flow['frame_size'] = frame_size
|
|
protocol = flow.get('protocol', 'udp').lower()
|
|
|
|
# Reset responder counters before trial
|
|
self._responder_reset()
|
|
|
|
tx_count = 0
|
|
rx_count = 0
|
|
latencies = []
|
|
start = time.time()
|
|
|
|
if protocol == 'icmp':
|
|
# ICMP: use sr() to measure latency from responses
|
|
seq = 0
|
|
while (time.time() - start) < duration and not self._is_stopped():
|
|
pkt = build_packet(flow, seq=seq)
|
|
seq += 1
|
|
answered, _ = sr(pkt[IP], timeout=1, verbose=0)
|
|
tx_count += 1
|
|
for sent_pkt, recv_pkt in answered:
|
|
rx_count += 1
|
|
rtt_ms = (recv_pkt.time - sent_pkt.sent_time) * 1000
|
|
latencies.append(rtt_ms)
|
|
elapsed = time.time() - start
|
|
expected = elapsed * rate_pps
|
|
if tx_count > expected:
|
|
sleep_time = (tx_count - expected) / rate_pps
|
|
if sleep_time > 0:
|
|
self._stop_event.wait(min(sleep_time, 0.1))
|
|
else:
|
|
# UDP/TCP: high-performance raw socket path
|
|
dst_ip = flow['dst_ip']
|
|
pkt_template = build_packet(flow, seq=0)
|
|
ip_template = bytes(pkt_template[pkt_template.firstlayer().payload.__class__])
|
|
magic_offset = ip_template.find(MAGIC)
|
|
|
|
# Find and zero UDP checksum in template so receivers accept packets
|
|
# IP header length from IHL field (byte 0, low nibble) * 4
|
|
ip_ihl = (ip_template[0] & 0x0F) * 4
|
|
ip_proto = ip_template[9] # protocol field
|
|
udp_csum_offset = ip_ihl + 6 if ip_proto == 17 else -1 # 17 = UDP
|
|
|
|
raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
|
|
raw_sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
|
|
|
|
batch_size = max(1, min(rate_pps // 5, 500))
|
|
interval = batch_size / rate_pps if rate_pps > 0 else 1.0
|
|
seq = 0
|
|
|
|
try:
|
|
while (time.time() - start) < duration and not self._is_stopped():
|
|
batch_start = time.time()
|
|
for _ in range(batch_size):
|
|
pkt_bytes = bytearray(ip_template)
|
|
if magic_offset >= 0:
|
|
struct.pack_into('!I', pkt_bytes, magic_offset + 4, seq)
|
|
struct.pack_into('!Q', pkt_bytes, magic_offset + 8, time.time_ns())
|
|
pkt_bytes[10:12] = b'\x00\x00' # zero IP checksum
|
|
if udp_csum_offset > 0:
|
|
pkt_bytes[udp_csum_offset:udp_csum_offset + 2] = b'\x00\x00'
|
|
try:
|
|
raw_sock.sendto(bytes(pkt_bytes), (dst_ip, 0))
|
|
tx_count += 1
|
|
except Exception:
|
|
pass
|
|
seq += 1
|
|
batch_elapsed = time.time() - batch_start
|
|
sleep_time = interval - batch_elapsed
|
|
if sleep_time > 0:
|
|
self._stop_event.wait(sleep_time)
|
|
finally:
|
|
raw_sock.close()
|
|
|
|
# Query responder for actual rx stats (UDP/TCP path)
|
|
if protocol != 'icmp':
|
|
resp_stats = self._responder_stats()
|
|
if resp_stats and resp_stats.get('rx_packets', 0) > 0:
|
|
rx_count = resp_stats['rx_packets']
|
|
lat = resp_stats.get('latency', {})
|
|
if lat.get('samples', 0) > 0:
|
|
latencies = [lat['avg_ms']] # Use avg as representative
|
|
|
|
return tx_count, rx_count, latencies
|
|
|
|
def get_info(self) -> dict:
|
|
# Reverse-lookup the slug for this test class
|
|
type_slug = next((k for k, v in TEST_TYPES.items() if v is self.__class__), self.__class__.__name__)
|
|
info = {
|
|
'id': self.test_id,
|
|
'test_id': self.test_id,
|
|
'type': type_slug,
|
|
'state': self.state,
|
|
'results': self.results,
|
|
'error': self.error,
|
|
'frame_sizes': self.frame_sizes,
|
|
'started_at': self.started_at,
|
|
'completed_at': self.completed_at,
|
|
}
|
|
if self.state == 'running':
|
|
info['progress'] = {
|
|
'frame_idx': self._current_frame_idx,
|
|
'total_frames': len(self.frame_sizes),
|
|
'message': self._progress_msg,
|
|
'completed_sizes': list(self.results.keys()),
|
|
}
|
|
return info
|
|
|
|
|
|
class ThroughputTest(_BaseTest):
|
|
"""Binary search for maximum throughput with acceptable loss."""
|
|
|
|
def _run(self):
|
|
for idx, fs in enumerate(self.frame_sizes):
|
|
if self._is_stopped():
|
|
break
|
|
|
|
self._current_frame_idx = idx
|
|
low = 0
|
|
high = self.max_rate_pps
|
|
best_rate = 0
|
|
convergence_threshold = max(1, int(self.max_rate_pps * 0.01))
|
|
step = 0
|
|
|
|
log.info('Throughput test: frame_size=%d, searching [%d, %d] pps', fs, low, high)
|
|
|
|
while (high - low) > convergence_threshold and not self._is_stopped():
|
|
mid = (low + high) // 2
|
|
if mid == 0:
|
|
break
|
|
step += 1
|
|
|
|
self._progress_msg = f'Frame {fs}B: trial {step}, testing {mid} pps [{low}-{high}]'
|
|
tx, rx, _ = self._send_trial(fs, mid, self.trial_duration)
|
|
|
|
if tx == 0:
|
|
loss_pct = 100.0
|
|
elif rx > 0:
|
|
loss_pct = ((tx - rx) / tx) * 100
|
|
else:
|
|
loss_pct = 0.0 # No responder/ICMP — assume success
|
|
|
|
log.info(' frame=%d rate=%d tx=%d rx=%d loss=%.2f%%',
|
|
fs, mid, tx, rx, loss_pct)
|
|
|
|
if loss_pct <= self.acceptable_loss_pct:
|
|
best_rate = mid
|
|
low = mid + 1
|
|
else:
|
|
high = mid - 1
|
|
|
|
self.results[str(fs)] = {
|
|
'max_throughput_pps': best_rate,
|
|
'frame_size': fs,
|
|
}
|
|
log.info('Throughput result: frame_size=%d -> %d pps', fs, best_rate)
|
|
|
|
|
|
class LatencyTest(_BaseTest):
|
|
"""Measure latency at a specified rate."""
|
|
|
|
def _run(self):
|
|
rate = self.flow_config.get('rate_pps', 100)
|
|
|
|
for idx, fs in enumerate(self.frame_sizes):
|
|
if self._is_stopped():
|
|
break
|
|
|
|
self._current_frame_idx = idx
|
|
self._progress_msg = f'Frame {fs}B: sending at {rate} pps for {self.trial_duration}s'
|
|
log.info('Latency test: frame_size=%d at %d pps for %ds', fs, rate, self.trial_duration)
|
|
_, _, latencies = self._send_trial(fs, rate, self.trial_duration)
|
|
|
|
if latencies:
|
|
avg_ms = sum(latencies) / len(latencies)
|
|
min_ms = min(latencies)
|
|
max_ms = max(latencies)
|
|
jitter_ms = (
|
|
sum(abs(latencies[i] - latencies[i - 1]) for i in range(1, len(latencies)))
|
|
/ max(1, len(latencies) - 1)
|
|
) if len(latencies) > 1 else 0.0
|
|
|
|
self.results[str(fs)] = {
|
|
'frame_size': fs,
|
|
'min_ms': round(min_ms, 3),
|
|
'avg_ms': round(avg_ms, 3),
|
|
'max_ms': round(max_ms, 3),
|
|
'jitter_ms': round(jitter_ms, 3),
|
|
'samples': len(latencies),
|
|
}
|
|
else:
|
|
self.results[str(fs)] = {
|
|
'frame_size': fs,
|
|
'min_ms': None, 'avg_ms': None, 'max_ms': None,
|
|
'jitter_ms': None, 'samples': 0,
|
|
'note': 'No responses received (use ICMP or configure responder)',
|
|
}
|
|
log.info('Latency result: frame_size=%d -> %s', fs, self.results[str(fs)])
|
|
|
|
|
|
class FrameLossTest(_BaseTest):
|
|
"""Measure frame loss at decreasing rates (100%, 90%, 80%, ...)."""
|
|
|
|
def _run(self):
|
|
for idx, fs in enumerate(self.frame_sizes):
|
|
if self._is_stopped():
|
|
break
|
|
|
|
self._current_frame_idx = idx
|
|
results_for_size = []
|
|
for pct in range(100, 0, -10):
|
|
if self._is_stopped():
|
|
break
|
|
|
|
rate = int(self.max_rate_pps * pct / 100)
|
|
if rate == 0:
|
|
continue
|
|
|
|
self._progress_msg = f'Frame {fs}B: testing at {pct}% rate ({rate} pps)'
|
|
log.info('FrameLoss test: frame_size=%d rate=%d (%d%%)', fs, rate, pct)
|
|
tx, rx, _ = self._send_trial(fs, rate, self.trial_duration)
|
|
|
|
if tx > 0 and rx > 0:
|
|
loss_pct = ((tx - rx) / tx) * 100
|
|
elif tx > 0 and rx == 0:
|
|
loss_pct = 0.0 # No responder — cannot measure
|
|
else:
|
|
loss_pct = 100.0
|
|
|
|
results_for_size.append({
|
|
'rate_pct': pct,
|
|
'rate_pps': rate,
|
|
'tx_packets': tx,
|
|
'rx_packets': rx,
|
|
'loss_pct': round(loss_pct, 3),
|
|
})
|
|
|
|
self.results[str(fs)] = results_for_size
|
|
|
|
|
|
class BackToBackTest(_BaseTest):
|
|
"""Find maximum burst length with zero loss."""
|
|
|
|
def _run(self):
|
|
for idx, fs in enumerate(self.frame_sizes):
|
|
if self._is_stopped():
|
|
break
|
|
|
|
low = 1
|
|
high = self.max_rate_pps # Use max_rate_pps as max burst length
|
|
best_burst = 0
|
|
convergence = max(1, high // 100)
|
|
|
|
self._current_frame_idx = idx
|
|
log.info('BackToBack test: frame_size=%d searching burst [%d, %d]', fs, low, high)
|
|
|
|
while (high - low) > convergence and not self._is_stopped():
|
|
mid = (low + high) // 2
|
|
if mid == 0:
|
|
break
|
|
|
|
flow = dict(self.flow_config)
|
|
flow['frame_size'] = fs
|
|
protocol = flow.get('protocol', 'udp').lower()
|
|
|
|
# Send burst of 'mid' packets as fast as possible
|
|
tx_count = 0
|
|
rx_count = 0
|
|
for seq in range(mid):
|
|
if self._is_stopped():
|
|
break
|
|
pkt = build_packet(flow, seq=seq)
|
|
if protocol == 'icmp':
|
|
answered, _ = sr(pkt[IP], timeout=0.5, verbose=0)
|
|
tx_count += 1
|
|
rx_count += len(answered)
|
|
else:
|
|
send(pkt[IP], verbose=0)
|
|
tx_count += 1
|
|
|
|
if tx_count > 0 and rx_count > 0:
|
|
loss_pct = ((tx_count - rx_count) / tx_count) * 100
|
|
elif tx_count > 0:
|
|
loss_pct = 0.0 # No responder — cannot measure
|
|
else:
|
|
loss_pct = 100.0
|
|
|
|
log.info(' burst=%d tx=%d rx=%d loss=%.2f%%', mid, tx_count, rx_count, loss_pct)
|
|
|
|
if loss_pct <= self.acceptable_loss_pct:
|
|
best_burst = mid
|
|
low = mid + 1
|
|
else:
|
|
high = mid - 1
|
|
|
|
self.results[str(fs)] = {
|
|
'frame_size': fs,
|
|
'max_burst_frames': best_burst,
|
|
}
|
|
log.info('BackToBack result: frame_size=%d -> %d frames', fs, best_burst)
|
|
|
|
|
|
# Factory function
|
|
TEST_TYPES = {
|
|
'throughput': ThroughputTest,
|
|
'latency': LatencyTest,
|
|
'frame_loss': FrameLossTest,
|
|
'back_to_back': BackToBackTest,
|
|
}
|
|
|
|
|
|
def create_test(test_id: str, test_type: str, flow_config: dict, **kwargs):
|
|
"""Create an RFC 2544 test instance by type name."""
|
|
cls = TEST_TYPES.get(test_type)
|
|
if cls is None:
|
|
raise ValueError(f'Unknown test type: {test_type}. Available: {list(TEST_TYPES)}')
|
|
return cls(test_id=test_id, flow_config=flow_config, **kwargs)
|