436 lines
16 KiB
Python
Raw Normal View History

"""
RFC 2544 test implementations:
- ThroughputTest: binary search for max zero-loss throughput
- LatencyTest: measure latency at a given rate
- FrameLossTest: measure loss at decreasing rates
- BackToBackTest: find max burst length with zero loss
"""
import logging
import socket
import struct
import threading
import time
import urllib.request
import json
from typing import Dict, List, Optional
from scapy.all import send, sr, conf, IP, ICMP
from engine.packet_builder import build_packet, parse_payload, MAGIC
# Turn off scapy's global console verbosity for everything this module sends.
conf.verb = 0
# Module logger; test progress and results are reported through it.
log = logging.getLogger(__name__)
class _BaseTest:
    """Shared lifecycle, traffic generation and reporting for RFC 2544 tests.

    Subclasses implement ``_run()``; ``start()`` executes it on a daemon
    thread. ``state`` moves 'idle' -> 'running' -> 'complete' or 'error'.
    A run ended through ``stop()`` finishes in 'error' with
    ``error == 'Cancelled by user'``; an exception escaping ``_run()`` also
    yields 'error', with the exception text stored in ``error``.
    """

    # Seconds to wait after the last frame of a non-ICMP trial before reading
    # the responder's counters, so frames still in flight are counted
    # (RFC 2544 section 23 allows two seconds for residual frames).
    residual_wait_s = 2.0

    def __init__(self, test_id: str, flow_config: dict, frame_sizes: List[int],
                 trial_duration: float = 60, max_rate_pps: int = 10000,
                 acceptable_loss_pct: float = 0.0, responder_url: Optional[str] = None):
        """
        Args:
            test_id: Identifier reported by get_info(); its first 8 characters
                name the worker thread and appear in log lines.
            flow_config: Flow settings handed to build_packet (copied).
            frame_sizes: Frame sizes to test, in order (copied).
            trial_duration: Seconds of traffic per trial.
            max_rate_pps: Upper bound for the rate (or, for back-to-back,
                burst-length) search.
            acceptable_loss_pct: Loss percentage at or below which a search
                trial counts as passing.
            responder_url: Base URL of the responder HTTP API,
                e.g. "http://172.30.0.10:5053"; None disables responder
                counters.
        """
        self.test_id = test_id
        self.flow_config = dict(flow_config)
        self.frame_sizes = list(frame_sizes)
        self.trial_duration = trial_duration
        self.max_rate_pps = max_rate_pps
        self.acceptable_loss_pct = acceptable_loss_pct
        self.responder_url = responder_url
        self.state = 'idle'  # idle -> running -> complete/error
        self.results = {}
        self.error = None
        self.started_at = None
        self.completed_at = None
        # Progress tracking, read by get_info() while running
        self._progress_msg = ''
        self._current_frame_idx = 0
        self._current_trial_tx = 0
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._lock = threading.Lock()

    @staticmethod
    def _utc_now() -> str:
        """Return the current UTC time as 'YYYY-MM-DDTHH:MM:SSZ'."""
        return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

    def start(self):
        """Launch ``_run`` on a daemon thread; a no-op while already running."""
        if self.state == 'running':
            return
        self._stop_event.clear()
        self.state = 'running'
        self.started_at = self._utc_now()
        # Clear everything left over from a previous run of this instance.
        self.completed_at = None
        self.results = {}
        self.error = None
        self._progress_msg = ''
        self._current_frame_idx = 0
        self._thread = threading.Thread(target=self._run_safe, daemon=True,
                                        name=f'test-{self.test_id[:8]}')
        self._thread.start()

    def stop(self):
        """Request cancellation and wait (bounded) for the worker to exit."""
        self._stop_event.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=self.trial_duration + 5)
        # Worker still busy after the join timeout: report the cancel now.
        if self.state == 'running':
            self.state = 'error'
            self.error = 'Cancelled by user'

    def _run_safe(self):
        """Thread entry point: run the test and record its final state."""
        try:
            self._run()
        except Exception as e:
            log.exception('Test %s error: %s', self.test_id[:8], e)
            self.state = 'error'
            self.error = str(e)
        else:
            if self.state == 'running':
                if self._is_stopped():
                    # _run() returned because stop() was requested; do not
                    # present partial results as a completed test.
                    self.state = 'error'
                    self.error = 'Cancelled by user'
                else:
                    self.state = 'complete'
        finally:
            self.completed_at = self._utc_now()

    def _run(self):
        """Execute the test body; implemented by subclasses."""
        raise NotImplementedError

    def _is_stopped(self) -> bool:
        """True once stop() has been requested for the current run."""
        return self._stop_event.is_set()

    def _responder_reset(self):
        """Reset responder stats before a trial (best effort; failures are logged)."""
        if not self.responder_url:
            return
        req = urllib.request.Request(
            f'{self.responder_url}/responder/reset', method='POST',
            data=b'{}', headers={'Content-Type': 'application/json'})
        try:
            with urllib.request.urlopen(req, timeout=3):
                pass
        except Exception as e:
            log.warning('Responder reset failed: %s', e)

    def _responder_stats(self) -> Optional[dict]:
        """Return the responder's JSON stats, or None if unset/unreachable."""
        if not self.responder_url:
            return None
        req = urllib.request.Request(f'{self.responder_url}/responder/stats')
        try:
            with urllib.request.urlopen(req, timeout=5) as resp:
                return json.loads(resp.read())
        except Exception as e:
            log.warning('Responder stats query failed: %s', e)
            return None

    def _send_trial(self, frame_size: int, rate_pps: int, duration: float):
        """Send one trial of traffic at ``rate_pps`` for ``duration`` seconds.

        ICMP flows count echo replies directly; other protocols are sent from
        a raw socket and rx figures come from the responder, when configured.

        Returns:
            (tx_count, rx_count, latencies_ms). Without a responder, non-ICMP
            trials report rx_count 0 and no latencies.
        """
        flow = dict(self.flow_config)
        flow['frame_size'] = frame_size
        protocol = flow.get('protocol', 'udp').lower()
        # Reset responder counters before the trial
        self._responder_reset()
        if protocol == 'icmp':
            return self._send_icmp_trial(flow, rate_pps, duration)
        tx_count = self._send_raw_trial(flow, rate_pps, duration)
        rx_count, latencies = self._collect_responder_rx()
        return tx_count, rx_count, latencies

    def _send_icmp_trial(self, flow: dict, rate_pps: int, duration: float):
        """Send ICMP one packet at a time via sr(); return (tx, rx, rtts_ms)."""
        tx_count = 0
        rx_count = 0
        latencies = []
        seq = 0
        start = time.time()
        while (time.time() - start) < duration and not self._is_stopped():
            pkt = build_packet(flow, seq=seq)
            seq += 1
            # sr() waits up to 1 s for the reply, so the achievable rate is
            # bounded by the round-trip time.
            answered, _ = sr(pkt[IP], timeout=1, verbose=0)
            tx_count += 1
            for sent_pkt, recv_pkt in answered:
                rx_count += 1
                latencies.append((recv_pkt.time - sent_pkt.sent_time) * 1000)
            if rate_pps > 0:  # guard: a zero rate would divide by zero
                expected = (time.time() - start) * rate_pps
                if tx_count > expected:
                    self._stop_event.wait(min((tx_count - expected) / rate_pps, 0.1))
        return tx_count, rx_count, latencies

    def _send_raw_trial(self, flow: dict, rate_pps: int, duration: float) -> int:
        """Blast pre-built IP packets from a raw socket; return frames sent."""
        dst_ip = flow['dst_ip']
        pkt_template = build_packet(flow, seq=0)
        # Serialise from the layer under the outermost one (presumably IP
        # below an Ethernet header -- confirm against build_packet).
        buf = bytearray(bytes(pkt_template[pkt_template.firstlayer().payload.__class__]))
        magic_offset = buf.find(MAGIC)
        # Checksums are zeroed once, in the template: the per-packet
        # seq/timestamp rewrite would invalidate them. The IP checksum is
        # left for the kernel to fill in under IP_HDRINCL; a zero UDP
        # checksum means "not computed" for IPv4, so receivers accept it.
        # NOTE(review): a TCP checksum cannot be zeroed and is left stale.
        buf[10:12] = b'\x00\x00'
        ip_ihl = (buf[0] & 0x0F) * 4  # IHL (low nibble of byte 0) * 4
        if buf[9] == 17:  # protocol field: 17 = UDP
            buf[ip_ihl + 6:ip_ihl + 8] = b'\x00\x00'

        batch_size = max(1, min(rate_pps // 5, 500))
        interval = batch_size / rate_pps if rate_pps > 0 else 1.0
        tx_count = 0
        send_errors = 0
        seq = 0
        start = time.time()
        with socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW) as raw_sock:
            raw_sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
            while (time.time() - start) < duration and not self._is_stopped():
                batch_start = time.time()
                for _ in range(batch_size):
                    if magic_offset >= 0:
                        # 4-byte seq + 8-byte ns timestamp after the MAGIC
                        # marker (presumably the layout parse_payload reads).
                        struct.pack_into('!IQ', buf, magic_offset + 4, seq, time.time_ns())
                    try:
                        # sendto copies the buffer, so it is safely reused.
                        raw_sock.sendto(buf, (dst_ip, 0))
                    except OSError:
                        send_errors += 1
                    else:
                        tx_count += 1
                    seq = (seq + 1) & 0xFFFFFFFF  # wrap to fit the 32-bit field
                sleep_time = interval - (time.time() - batch_start)
                if sleep_time > 0:
                    self._stop_event.wait(sleep_time)
        if send_errors:
            log.warning('Test %s: %d raw sends failed', self.test_id[:8], send_errors)
        return tx_count

    def _collect_responder_rx(self):
        """Return (rx_count, latencies_ms) reported by the responder, if any."""
        if not self.responder_url:
            return 0, []
        # Let frames still in flight reach the responder before reading.
        self._stop_event.wait(self.residual_wait_s)
        stats = self._responder_stats()
        if not stats or stats.get('rx_packets', 0) <= 0:
            return 0, []
        lat = stats.get('latency', {})
        # Only the responder's average is available, used as one sample.
        latencies = [lat['avg_ms']] if lat.get('samples', 0) > 0 else []
        return stats['rx_packets'], latencies

    def get_info(self) -> dict:
        """Return a snapshot of the test's identity, state and results."""
        # Reverse-lookup the slug registered for this class in TEST_TYPES.
        type_slug = next((slug for slug, cls in TEST_TYPES.items() if cls is type(self)),
                         type(self).__name__)
        # Shallow copy: the worker thread may add entries concurrently.
        results = dict(self.results)
        info = {
            'id': self.test_id,
            'test_id': self.test_id,
            'type': type_slug,
            'state': self.state,
            'results': results,
            'error': self.error,
            'frame_sizes': self.frame_sizes,
            'started_at': self.started_at,
            'completed_at': self.completed_at,
        }
        if self.state == 'running':
            info['progress'] = {
                'frame_idx': self._current_frame_idx,
                'total_frames': len(self.frame_sizes),
                'message': self._progress_msg,
                'completed_sizes': list(results),
            }
        return info
class ThroughputTest(_BaseTest):
    """Binary search, per frame size, for the highest rate with acceptable loss.

    Rates in [0, max_rate_pps] are probed until the search window is no wider
    than 1% of max_rate_pps (at least 1 pps). Result per size:
    {'max_throughput_pps': ..., 'frame_size': ...}.
    """

    def _run(self):
        protocol = self.flow_config.get('protocol', 'udp').lower()
        # Loss is observable only when replies are counted directly (ICMP) or
        # a responder reports its receive counter. In those cases rx == 0
        # means total loss, not "unknown".
        loss_measurable = protocol == 'icmp' or bool(self.responder_url)
        convergence_threshold = max(1, int(self.max_rate_pps * 0.01))
        for idx, fs in enumerate(self.frame_sizes):
            if self._is_stopped():
                break
            self._current_frame_idx = idx
            low = 0
            high = self.max_rate_pps
            best_rate = 0
            step = 0
            log.info('Throughput test: frame_size=%d, searching [%d, %d] pps', fs, low, high)
            # high - low > threshold >= 1 implies mid >= 1, so no zero-rate trial.
            while (high - low) > convergence_threshold and not self._is_stopped():
                mid = (low + high) // 2
                step += 1
                self._progress_msg = f'Frame {fs}B: trial {step}, testing {mid} pps [{low}-{high}]'
                tx, rx, _ = self._send_trial(fs, mid, self.trial_duration)
                if tx == 0:
                    loss_pct = 100.0
                elif rx > 0 or loss_measurable:
                    # Clamp: extra frames counted by a responder are not "negative loss".
                    loss_pct = max(0.0, ((tx - rx) / tx) * 100)
                else:
                    loss_pct = 0.0  # No responder — loss cannot be measured; assume success
                log.info(' frame=%d rate=%d tx=%d rx=%d loss=%.2f%%',
                         fs, mid, tx, rx, loss_pct)
                if loss_pct <= self.acceptable_loss_pct:
                    best_rate = mid
                    low = mid + 1
                else:
                    high = mid - 1
            self.results[str(fs)] = {
                'max_throughput_pps': best_rate,
                'frame_size': fs,
            }
            log.info('Throughput result: frame_size=%d -> %d pps', fs, best_rate)
class LatencyTest(_BaseTest):
    """Measure latency at a fixed rate (flow_config['rate_pps'], default 100).

    Result per size: min/avg/max/jitter in ms plus the sample count, or None
    values with a note when no latency samples came back. Jitter is the mean
    absolute difference between consecutive samples.
    """

    def _run(self):
        rate = self.flow_config.get('rate_pps', 100)
        for idx, fs in enumerate(self.frame_sizes):
            if self._is_stopped():
                break
            self._current_frame_idx = idx
            self._progress_msg = f'Frame {fs}B: sending at {rate} pps for {self.trial_duration}s'
            # %g rather than %d: trial_duration is a float and %d truncated it.
            log.info('Latency test: frame_size=%d at %d pps for %gs', fs, rate, self.trial_duration)
            _, _, latencies = self._send_trial(fs, rate, self.trial_duration)
            if latencies:
                deltas = [abs(cur - prev) for prev, cur in zip(latencies, latencies[1:])]
                jitter_ms = sum(deltas) / len(deltas) if deltas else 0.0
                result = {
                    'frame_size': fs,
                    'min_ms': round(min(latencies), 3),
                    'avg_ms': round(sum(latencies) / len(latencies), 3),
                    'max_ms': round(max(latencies), 3),
                    'jitter_ms': round(jitter_ms, 3),
                    # NOTE: responder-based (non-ICMP) trials yield a single
                    # averaged sample, so min == max and jitter is 0 there.
                    'samples': len(latencies),
                }
            else:
                result = {
                    'frame_size': fs,
                    'min_ms': None, 'avg_ms': None, 'max_ms': None,
                    'jitter_ms': None, 'samples': 0,
                    'note': 'No responses received (use ICMP or configure responder)',
                }
            self.results[str(fs)] = result
            log.info('Latency result: frame_size=%d -> %s', fs, result)
class FrameLossTest(_BaseTest):
    """Measure frame loss at 100%, 90%, ..., 10% of max_rate_pps per frame size.

    Result per size: a list of {'rate_pct', 'rate_pps', 'tx_packets',
    'rx_packets', 'loss_pct'} rows, one per step actually run.
    """

    def _run(self):
        protocol = self.flow_config.get('protocol', 'udp').lower()
        # Loss is observable only for ICMP (replies counted directly) or with
        # a responder; there rx == 0 means total loss, not "unknown".
        loss_measurable = protocol == 'icmp' or bool(self.responder_url)
        for idx, fs in enumerate(self.frame_sizes):
            if self._is_stopped():
                break
            self._current_frame_idx = idx
            results_for_size = []
            for pct in range(100, 0, -10):
                if self._is_stopped():
                    break
                rate = int(self.max_rate_pps * pct / 100)
                if rate <= 0:
                    continue
                self._progress_msg = f'Frame {fs}B: testing at {pct}% rate ({rate} pps)'
                log.info('FrameLoss test: frame_size=%d rate=%d (%d%%)', fs, rate, pct)
                tx, rx, _ = self._send_trial(fs, rate, self.trial_duration)
                if tx == 0:
                    loss_pct = 100.0
                elif rx > 0 or loss_measurable:
                    # Clamp: extra frames counted by a responder are not "negative loss".
                    loss_pct = max(0.0, ((tx - rx) / tx) * 100)
                else:
                    loss_pct = 0.0  # No responder — cannot measure
                results_for_size.append({
                    'rate_pct': pct,
                    'rate_pps': rate,
                    'tx_packets': tx,
                    'rx_packets': rx,
                    'loss_pct': round(loss_pct, 3),
                })
            self.results[str(fs)] = results_for_size
class BackToBackTest(_BaseTest):
    """Binary search, per frame size, for the longest burst with acceptable loss.

    ``max_rate_pps`` doubles as the upper bound on burst length. Each probe
    builds ``mid`` frames and transmits them in a single call. Arrivals are
    counted from ICMP replies directly, otherwise from the responder's rx
    counter when a responder is configured. Without a responder, non-ICMP
    loss cannot be observed and every burst is assumed to pass.
    Result per size: {'frame_size': ..., 'max_burst_frames': ...}.
    """

    # Seconds to wait after a burst before reading the responder's counters,
    # so frames still in flight are counted (RFC 2544 section 23 allows two
    # seconds for residual frames).
    residual_wait_s = 2.0

    def _run(self):
        protocol = self.flow_config.get('protocol', 'udp').lower()
        loss_measurable = protocol == 'icmp' or bool(self.responder_url)
        for idx, fs in enumerate(self.frame_sizes):
            if self._is_stopped():
                break
            self._current_frame_idx = idx
            flow = dict(self.flow_config, frame_size=fs)
            low = 1
            high = self.max_rate_pps  # Use max_rate_pps as max burst length
            best_burst = 0
            convergence = max(1, high // 100)
            step = 0
            log.info('BackToBack test: frame_size=%d searching burst [%d, %d]', fs, low, high)
            # high - low > convergence >= 1 with low >= 1 implies mid >= 2.
            while (high - low) > convergence and not self._is_stopped():
                mid = (low + high) // 2
                step += 1
                self._progress_msg = f'Frame {fs}B: trial {step}, burst of {mid} frames [{low}-{high}]'
                tx_count, rx_count = self._send_burst(flow, protocol, mid)
                if tx_count == 0:
                    loss_pct = 100.0
                elif rx_count > 0 or loss_measurable:
                    loss_pct = max(0.0, ((tx_count - rx_count) / tx_count) * 100)
                else:
                    loss_pct = 0.0  # No responder — cannot measure
                log.info(' burst=%d tx=%d rx=%d loss=%.2f%%', mid, tx_count, rx_count, loss_pct)
                if loss_pct <= self.acceptable_loss_pct:
                    best_burst = mid
                    low = mid + 1
                else:
                    high = mid - 1
            self.results[str(fs)] = {
                'frame_size': fs,
                'max_burst_frames': best_burst,
            }
            log.info('BackToBack result: frame_size=%d -> %d frames', fs, best_burst)

    def _send_burst(self, flow: dict, protocol: str, count: int):
        """Transmit ``count`` frames back-to-back; return (tx_count, rx_count)."""
        # Build everything first so transmission is not paced by packet building.
        burst = [build_packet(flow, seq=seq)[IP] for seq in range(count)]
        if protocol == 'icmp':
            # One sr() call sends the whole list, then collects replies until
            # `timeout` seconds after the last frame.
            answered, _ = sr(burst, timeout=0.5, verbose=0)
            return len(burst), len(answered)
        self._responder_reset()
        send(burst, verbose=0)  # a single call for the whole burst
        rx_count = 0
        if self.responder_url:
            self._stop_event.wait(self.residual_wait_s)
            stats = self._responder_stats()
            if stats:
                rx_count = stats.get('rx_packets', 0)
        return len(burst), rx_count
# Registry: public test-type slug -> implementing class. create_test() looks
# classes up here and _BaseTest.get_info() reverse-maps a class to its slug.
TEST_TYPES = dict(
    throughput=ThroughputTest,
    latency=LatencyTest,
    frame_loss=FrameLossTest,
    back_to_back=BackToBackTest,
)
def create_test(test_id: str, test_type: str, flow_config: dict, **kwargs):
    """Instantiate the RFC 2544 test registered under ``test_type``.

    Extra keyword arguments are forwarded to the test class constructor.

    Raises:
        ValueError: if ``test_type`` is not a key of TEST_TYPES.
    """
    try:
        test_cls = TEST_TYPES[test_type]
    except KeyError:
        raise ValueError(f'Unknown test type: {test_type}. Available: {list(TEST_TYPES)}') from None
    return test_cls(test_id=test_id, flow_config=flow_config, **kwargs)