337 lines
12 KiB
Python
Raw Permalink Normal View History

"""
RFC 2544 test implementations:
- ThroughputTest: binary search for max throughput within the acceptable loss (zero by default)
- LatencyTest: measure latency at a given rate
- FrameLossTest: measure loss at decreasing rates
- BackToBackTest: find max burst length within the acceptable loss (zero by default)
"""
import logging
import threading
import time
from typing import Dict, List, Optional
from scapy.all import send, sr, conf, IP, ICMP
from engine.packet_builder import build_packet, parse_payload
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Set scapy's global verbosity to 0 so its own console output is suppressed.
# NOTE(review): `conf` is scapy's shared configuration object, so this
# presumably affects every scapy user in the process -- confirm that is intended.
conf.verb = 0
class _BaseTest:
    """Base class for RFC 2544 tests.

    Owns the lifecycle shared by every test type: ``start()`` launches a
    daemon worker thread that executes the subclass's ``_run()``, and the
    instance tracks ``state`` (``idle`` -> ``running`` -> ``complete`` or
    ``error``), ``results``, ``error`` and the start/completion timestamps.
    Subclasses implement ``_run()`` and should poll ``_is_stopped()`` so
    that ``stop()`` can interrupt them.
    """

    _TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
    _CANCELLED = 'Cancelled by user'

    def __init__(self, test_id: str, flow_config: dict, frame_sizes: List[int],
                 trial_duration: float = 60, max_rate_pps: int = 10000,
                 acceptable_loss_pct: float = 0.0):
        """Store the test parameters; nothing is sent until ``start()``.

        Args:
            test_id: Identifier reported by ``get_info()``; its first eight
                characters are used in the thread name and in log messages.
            flow_config: Flow description handed to ``build_packet``. A
                shallow copy is kept so later caller mutations are not seen.
            frame_sizes: Frame sizes to iterate over.
            trial_duration: Length of one trial, in seconds.
            max_rate_pps: Upper bound used by the rate/burst searches.
            acceptable_loss_pct: Highest loss percentage that still counts
                as a passing trial.
        """
        self.test_id = test_id
        self.flow_config = dict(flow_config)
        self.frame_sizes = frame_sizes
        self.trial_duration = trial_duration
        self.max_rate_pps = max_rate_pps
        self.acceptable_loss_pct = acceptable_loss_pct
        self.state = 'idle'  # idle -> running -> complete/error
        self.results = {}
        self.error = None
        self.started_at = None
        self.completed_at = None
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        # Guards the state transitions made by start(), stop() and the worker.
        self._lock = threading.Lock()

    @classmethod
    def _utc_timestamp(cls) -> str:
        """Return the current UTC time formatted like ``2024-01-31T12:00:00Z``."""
        return time.strftime(cls._TIMESTAMP_FORMAT, time.gmtime())

    def start(self):
        """Start the test on a background daemon thread.

        Does nothing if the test is already running, or if a worker from a
        previous run is still alive (clearing the stop event at that point
        would revive the old worker next to the new one).
        """
        with self._lock:
            if self.state == 'running':
                return
            if self._thread is not None and self._thread.is_alive():
                log.warning('Test %s: previous run is still shutting down; not restarting',
                            self.test_id[:8])
                return
            self._stop_event.clear()
            self.state = 'running'
            self.started_at = self._utc_timestamp()
            self.completed_at = None  # do not report the previous run's end time
            self.results = {}
            self.error = None
            self._thread = threading.Thread(target=self._run_safe, daemon=True,
                                            name=f'test-{self.test_id[:8]}')
            self._thread.start()

    def stop(self):
        """Request cancellation and wait for the worker to finish.

        Waits at most ``trial_duration + 5`` seconds. If the test is still
        marked ``running`` afterwards it is flagged as cancelled here; a
        worker that exits in time records the cancellation itself.
        """
        self._stop_event.set()
        worker = self._thread
        # The lock must not be held while joining: the worker takes it to
        # publish its final state.
        if worker is not None and worker.is_alive():
            worker.join(timeout=self.trial_duration + 5)
        with self._lock:
            if self.state == 'running':
                self.state = 'error'
                self.error = self._CANCELLED

    def _run_safe(self):
        """Thread entry point: run ``_run()`` and record the final state."""
        try:
            self._run()
        except Exception as e:
            # log.exception keeps the traceback, which log.error dropped.
            log.exception('Test %s error: %s', self.test_id[:8], e)
            with self._lock:
                self.state = 'error'
                self.error = str(e)
        else:
            with self._lock:
                if self.state == 'running':
                    if self._is_stopped():
                        # _run() returned only because stop() was called, so
                        # the results are partial: never report 'complete'.
                        self.state = 'error'
                        self.error = self._CANCELLED
                    else:
                        self.state = 'complete'
        finally:
            self.completed_at = self._utc_timestamp()

    def _run(self):
        """Execute the test body; must be overridden by subclasses."""
        raise NotImplementedError

    def _is_stopped(self) -> bool:
        """Return True once ``stop()`` has been requested."""
        return self._stop_event.is_set()

    def _send_trial(self, frame_size: int, rate_pps: int, duration: float):
        """Send packets at a given rate for a duration.

        Args:
            frame_size: Value stored under ``'frame_size'`` in the flow
                passed to ``build_packet``.
            rate_pps: Target send rate in packets per second; must be > 0.
            duration: Trial length in seconds.

        Returns:
            ``(tx_count, rx_count, latencies)``. ``rx_count`` and
            ``latencies`` (round-trip times in milliseconds) are only filled
            for ICMP flows, where ``sr()`` waits for the reply; for other
            protocols they stay ``0`` and ``[]``.

        Raises:
            ValueError: If ``rate_pps`` is not positive.
        """
        if rate_pps <= 0:
            # Previously 0 raised ZeroDivisionError after one packet had been
            # sent, and a negative rate sent unpaced.
            raise ValueError(f'rate_pps must be positive, got {rate_pps!r}')

        flow = dict(self.flow_config)
        flow['frame_size'] = frame_size
        is_icmp = flow.get('protocol', 'udp').lower() == 'icmp'

        tx_count = 0
        rx_count = 0
        latencies = []
        # Monotonic clock: pacing must not be disturbed by wall-clock changes.
        start = time.monotonic()
        while (time.monotonic() - start) < duration and not self._is_stopped():
            pkt = build_packet(flow, seq=tx_count)
            if is_icmp:
                # Use sr() for ICMP to get responses
                answered, _ = sr(pkt[IP], timeout=1, verbose=0)
                for sent_pkt, recv_pkt in answered:
                    rx_count += 1
                    # float() keeps the sample a plain number even if scapy
                    # hands back a decimal-like timestamp.
                    latencies.append(float(recv_pkt.time - sent_pkt.sent_time) * 1000)
            else:
                send(pkt[IP], verbose=0)
            tx_count += 1

            # Rate limiting: sleep until the send schedule catches up. The
            # wait is on the stop event, so stop() wakes it immediately, and
            # it is bounded by the time left in the trial.
            elapsed = time.monotonic() - start
            ahead_by = tx_count / rate_pps - elapsed
            if ahead_by > 0:
                self._stop_event.wait(min(ahead_by, max(0.0, duration - elapsed)))
        # For non-ICMP, we can't easily measure rx without a responder.
        # rx_count stays 0 for UDP/TCP unless a responder is configured.
        return tx_count, rx_count, latencies

    def get_info(self) -> dict:
        """Return a snapshot of the test's identity, state and results."""
        return {
            'test_id': self.test_id,
            'type': self.__class__.__name__,
            'state': self.state,
            'results': self.results,
            'error': self.error,
            'started_at': self.started_at,
            'completed_at': self.completed_at,
        }
class ThroughputTest(_BaseTest):
    """Binary search for maximum throughput with acceptable loss.

    For every frame size the test first probes ``max_rate_pps`` itself and,
    if that trial fails, bisects the range below it until the window is no
    wider than 1% of ``max_rate_pps`` (at least 1 pps). Results are stored
    per frame size as ``{'max_throughput_pps': ..., 'frame_size': ...}``.

    Loss is only measured for ICMP flows; for any other protocol every trial
    that sent at least one packet counts as loss-free.
    """

    def _run(self):
        """Run the search for each configured frame size."""
        # Loop-invariant: the protocol decides whether loss can be measured.
        protocol = self.flow_config.get('protocol', 'udp').lower()
        for fs in self.frame_sizes:
            if self._is_stopped():
                break
            best_rate = self._search_max_rate(fs, protocol)
            self.results[str(fs)] = {
                'max_throughput_pps': best_rate,
                'frame_size': fs,
            }
            log.info('Throughput result: frame_size=%d -> %d pps', fs, best_rate)

    def _search_max_rate(self, frame_size, protocol):
        """Return the highest rate (pps) whose trial passed, or 0 if none did."""
        ceiling = self.max_rate_pps
        log.info('Throughput test: frame_size=%d, searching [%d, %d] pps',
                 frame_size, 0, ceiling)
        if ceiling <= 0:
            return 0

        # Probe the ceiling first: the bisection below stops once the window
        # is narrower than the threshold, so on its own it would never try
        # max_rate_pps and could not report it even on a loss-free path.
        passed = self._trial_passes(frame_size, ceiling, protocol)
        if passed is None:
            return 0
        if passed:
            return ceiling

        low = 0
        high = ceiling - 1
        best_rate = 0
        convergence_threshold = max(1, int(self.max_rate_pps * 0.01))
        while (high - low) > convergence_threshold and not self._is_stopped():
            mid = (low + high) // 2
            if mid == 0:
                break
            passed = self._trial_passes(frame_size, mid, protocol)
            if passed is None:
                break
            if passed:
                best_rate = mid
                low = mid + 1
            else:
                high = mid - 1
        return best_rate

    def _trial_passes(self, frame_size, rate, protocol):
        """Run one trial; return True/False, or None if stop() cut it short."""
        tx, rx, _ = self._send_trial(frame_size, rate, self.trial_duration)
        if self._is_stopped():
            # A truncated trial did not sustain the rate for the full
            # duration, so it must not move the search bounds.
            return None
        if tx == 0:
            loss_pct = 100.0
        elif protocol == 'icmp':
            loss_pct = ((tx - rx) / tx) * 100
        else:
            # Without responder, assume success (user should use responder for accurate test)
            loss_pct = 0.0
        log.info(' frame=%d rate=%d tx=%d rx=%d loss=%.2f%%',
                 frame_size, rate, tx, rx, loss_pct)
        return loss_pct <= self.acceptable_loss_pct
class LatencyTest(_BaseTest):
    """Measure latency at a specified rate.

    The rate comes from ``flow_config['rate_pps']`` (default 100). For each
    frame size one trial of ``trial_duration`` seconds is run and the
    latency samples it returned are summarised as min/avg/max and jitter
    (mean absolute difference between consecutive samples), all in
    milliseconds rounded to three decimals.
    """

    def _run(self):
        """Run one latency trial per frame size and store its summary.

        Raises:
            ValueError: If the configured ``rate_pps`` is not positive.
        """
        rate = self.flow_config.get('rate_pps', 100)
        if rate <= 0:
            # Reject this before anything is sent instead of failing inside
            # the trial loop.
            raise ValueError(f'rate_pps must be positive, got {rate!r}')
        for fs in self.frame_sizes:
            if self._is_stopped():
                break
            # %g rather than %d: trial_duration is a float and may be fractional.
            log.info('Latency test: frame_size=%d at %d pps for %gs',
                     fs, rate, self.trial_duration)
            _, _, latencies = self._send_trial(fs, rate, self.trial_duration)
            self.results[str(fs)] = self._summarize(fs, latencies)
            log.info('Latency result: frame_size=%d -> %s', fs, self.results[str(fs)])

    @staticmethod
    def _summarize(frame_size, latencies):
        """Build the result entry for one frame size from its latency samples."""
        if not latencies:
            return {
                'frame_size': frame_size,
                'min_ms': None, 'avg_ms': None, 'max_ms': None,
                'jitter_ms': None, 'samples': 0,
                'note': 'No responses received (use ICMP or configure responder)',
            }
        count = len(latencies)
        if count > 1:
            # Mean absolute delta between consecutive samples.
            deltas = [abs(cur - prev) for prev, cur in zip(latencies, latencies[1:])]
            jitter_ms = sum(deltas) / (count - 1)
        else:
            jitter_ms = 0.0
        return {
            'frame_size': frame_size,
            'min_ms': round(min(latencies), 3),
            'avg_ms': round(sum(latencies) / count, 3),
            'max_ms': round(max(latencies), 3),
            'jitter_ms': round(jitter_ms, 3),
            'samples': count,
        }
class FrameLossTest(_BaseTest):
    """Measure frame loss at decreasing rates (100%, 90%, 80%, ...).

    For every frame size, one trial is run at each 10% step of
    ``max_rate_pps`` from 100% down to 10%; steps whose integer rate is 0
    are skipped. Each trial is recorded as a dict with the step, rate,
    tx/rx counts and loss percentage, and the list of trials is stored in
    ``results`` under the frame size. Loss is only computed for ICMP flows;
    for other protocols it is reported as 0.0.
    """

    def _run(self):
        """Run the descending-rate sweep for each configured frame size."""
        for frame_size in self.frame_sizes:
            if self._is_stopped():
                break
            samples = []
            for step_pct in range(100, 0, -10):
                if self._is_stopped():
                    break
                step_rate = int(self.max_rate_pps * step_pct / 100)
                if step_rate != 0:
                    log.info('FrameLoss test: frame_size=%d rate=%d (%d%%)',
                             frame_size, step_rate, step_pct)
                    sent, received, _ = self._send_trial(
                        frame_size, step_rate, self.trial_duration)
                    is_icmp = self.flow_config.get('protocol', 'udp').lower() == 'icmp'
                    if is_icmp and sent > 0:
                        loss = ((sent - received) / sent) * 100
                    else:
                        # Cannot measure without responder for non-ICMP
                        loss = 0.0
                    samples.append({
                        'rate_pct': step_pct,
                        'rate_pps': step_rate,
                        'tx_packets': sent,
                        'rx_packets': received,
                        'loss_pct': round(loss, 3),
                    })
            self.results[str(frame_size)] = samples
class BackToBackTest(_BaseTest):
    """Find maximum burst length with acceptable loss.

    ``max_rate_pps`` doubles as the largest burst length to try. For every
    frame size the test first sends a burst of that maximum length and, if
    it fails, bisects the lengths below it until the window is no wider
    than 1% of the maximum (at least 1 frame). Results are stored per frame
    size as ``{'frame_size': ..., 'max_burst_frames': ...}``.

    Loss is only measured for ICMP flows; for any other protocol every
    burst that was sent in full counts as loss-free.
    """

    def _run(self):
        """Run the burst-length search for each configured frame size."""
        # Loop-invariant: the protocol decides how a burst is sent and scored.
        protocol = self.flow_config.get('protocol', 'udp').lower()
        for fs in self.frame_sizes:
            if self._is_stopped():
                break
            # One flow copy per frame size instead of one per search step.
            flow = dict(self.flow_config)
            flow['frame_size'] = fs
            best_burst = self._search_max_burst(flow, protocol)
            self.results[str(fs)] = {
                'frame_size': fs,
                'max_burst_frames': best_burst,
            }
            log.info('BackToBack result: frame_size=%d -> %d frames', fs, best_burst)

    def _search_max_burst(self, flow, protocol):
        """Return the longest burst length that passed, or 0 if none did."""
        ceiling = self.max_rate_pps  # Use max_rate_pps as max burst length
        log.info('BackToBack test: frame_size=%d searching burst [%d, %d]',
                 flow['frame_size'], 1, ceiling)
        if ceiling < 1:
            return 0

        # Probe the ceiling first: the bisection below stops once the window
        # is narrower than the convergence step, so on its own it would
        # never try the maximum burst length.
        passed = self._burst_passes(flow, protocol, ceiling)
        if passed is None:
            return 0
        if passed:
            return ceiling

        low = 1
        high = ceiling - 1
        best_burst = 0
        convergence = max(1, ceiling // 100)
        while (high - low) > convergence and not self._is_stopped():
            mid = (low + high) // 2
            if mid == 0:
                break
            passed = self._burst_passes(flow, protocol, mid)
            if passed is None:
                break
            if passed:
                best_burst = mid
                low = mid + 1
            else:
                high = mid - 1
        return best_burst

    def _burst_passes(self, flow, protocol, length):
        """Send one burst; return True/False, or None if stop() interrupted it."""
        is_icmp = protocol == 'icmp'
        tx_count = 0
        rx_count = 0
        # Send burst of 'length' packets as fast as possible
        for seq in range(length):
            if self._is_stopped():
                break
            pkt = build_packet(flow, seq=seq)
            if is_icmp:
                answered, _ = sr(pkt[IP], timeout=0.5, verbose=0)
                rx_count += len(answered)
            else:
                send(pkt[IP], verbose=0)
            tx_count += 1

        if tx_count < length:
            # Interrupted: fewer frames than requested went out, so this
            # burst length was never actually exercised.
            return None

        if is_icmp and tx_count > 0:
            loss_pct = ((tx_count - rx_count) / tx_count) * 100
        else:
            loss_pct = 0.0  # Can't measure without responder
        log.info(' burst=%d tx=%d rx=%d loss=%.2f%%', length, tx_count, rx_count, loss_pct)
        return loss_pct <= self.acceptable_loss_pct
# Registry mapping each test-type name accepted by create_test() to its class.
TEST_TYPES = {
    'throughput': ThroughputTest,
    'latency': LatencyTest,
    'frame_loss': FrameLossTest,
    'back_to_back': BackToBackTest,
}


def create_test(test_id: str, test_type: str, flow_config: dict, **kwargs):
    """Instantiate the RFC 2544 test registered under ``test_type``.

    Args:
        test_id: Identifier passed to the test's constructor.
        test_type: One of the keys of ``TEST_TYPES``.
        flow_config: Flow description passed to the test's constructor.
        **kwargs: Forwarded unchanged to the test's constructor.

    Returns:
        A new instance of the class registered for ``test_type``.

    Raises:
        ValueError: If ``test_type`` is not a key of ``TEST_TYPES``.
    """
    if test_type not in TEST_TYPES:
        raise ValueError(f'Unknown test type: {test_type}. Available: {list(TEST_TYPES)}')
    test_cls = TEST_TYPES[test_type]
    return test_cls(test_id=test_id, flow_config=flow_config, **kwargs)