Add traffic generator improvements: mode switching, ping, responder echo, RFC2544 fixes

Adds sender/responder mode switching via API, QuickPing component, echo-mode
responder with dedicated container, improved flow state sync, and RFC2544
test runner enhancements. Includes UI improvements across all traffic-gen
components.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
sam 2026-05-15 14:22:41 -07:00
parent c28c9b2527
commit 1f0936763b
16 changed files with 1119 additions and 385 deletions

View File

@ -6,7 +6,7 @@
<span class="logo-icon">&#9889;</span> <span class="logo-icon">&#9889;</span>
<h1>Traffic Generator</h1> <h1>Traffic Generator</h1>
</div> </div>
<StatusBar :health="health" :api-error="apiError" /> <StatusBar :health="health" :api-error="apiError" @modeChanged="fetchHealth(); fetchAll()" />
</header> </header>
<!-- ERROR BANNER --> <!-- ERROR BANNER -->
@ -19,7 +19,8 @@
<div class="main-content"> <div class="main-content">
<!-- LEFT COLUMN: Flow Builder --> <!-- LEFT COLUMN: Flow Builder -->
<aside class="left-col"> <aside class="left-col">
<FlowBuilder @created="fetchFlows" @updated="fetchFlows" /> <FlowBuilder :key="editFlow ? editFlow.id : 'new'" :editFlow="editFlow"
@created="onFlowSaved" @updated="onFlowSaved" @cancel="editFlow = null" />
</aside> </aside>
<!-- RIGHT COLUMN: Tabs --> <!-- RIGHT COLUMN: Tabs -->
@ -37,9 +38,16 @@
</div> </div>
<div class="tab-content"> <div class="tab-content">
<FlowTable v-if="activeTab === 'flows'" :flows="flows" @refresh="fetchFlows" /> <div v-if="activeTab === 'flows'">
<TestBuilder v-else-if="activeTab === 'tests'" :flows="flows" @created="fetchTests" @refresh="fetchAll" /> <QuickPing />
<TestRunner v-else-if="activeTab === 'runner'" :tests="tests" @refresh="fetchTests" /> <FlowTable :flows="flows" @refresh="fetchFlows" @edit="startEdit" />
</div>
<div v-else-if="activeTab === 'tests'">
<TestBuilder @created="fetchTests" @refresh="fetchAll" />
<div style="margin-top: 20px;">
<TestRunner :tests="tests" @refresh="fetchTests" />
</div>
</div>
<ResultsPanel v-else-if="activeTab === 'results'" :tests="tests" /> <ResultsPanel v-else-if="activeTab === 'results'" :tests="tests" />
<StatsMonitor v-else-if="activeTab === 'monitor'" :flows="flows" /> <StatsMonitor v-else-if="activeTab === 'monitor'" :flows="flows" />
</div> </div>
@ -50,19 +58,20 @@
<footer class="app-footer"> <footer class="app-footer">
<span>Refreshing every 5s (health) / 3s (flows)</span> <span>Refreshing every 5s (health) / 3s (flows)</span>
<span class="footer-sep">|</span> <span class="footer-sep">|</span>
<a href="http://localhost:3000" target="_blank" class="footer-link">Grafana: :3000</a> <a :href="baseUrl + ':3000'" target="_blank" class="footer-link">Grafana: :3000</a>
<span class="footer-sep">|</span> <span class="footer-sep">|</span>
<a href="http://localhost:5001" target="_blank" class="footer-link">Route Injector: :5001</a> <a :href="baseUrl + ':5001'" target="_blank" class="footer-link">Route Injector: :5001</a>
</footer> </footer>
</div> </div>
</template> </template>
<script setup> <script setup>
import { ref, onMounted, onUnmounted } from 'vue' import { ref, computed, onMounted, onUnmounted } from 'vue'
import { api } from './api.js' import { api } from './api.js'
import StatusBar from './components/StatusBar.vue' import StatusBar from './components/StatusBar.vue'
import FlowBuilder from './components/FlowBuilder.vue' import FlowBuilder from './components/FlowBuilder.vue'
import FlowTable from './components/FlowTable.vue' import FlowTable from './components/FlowTable.vue'
import QuickPing from './components/QuickPing.vue'
import TestBuilder from './components/TestBuilder.vue' import TestBuilder from './components/TestBuilder.vue'
import TestRunner from './components/TestRunner.vue' import TestRunner from './components/TestRunner.vue'
import ResultsPanel from './components/ResultsPanel.vue' import ResultsPanel from './components/ResultsPanel.vue'
@ -73,11 +82,15 @@ const flows = ref([])
const tests = ref([]) const tests = ref([])
const apiError = ref(null) const apiError = ref(null)
const activeTab = ref('flows') const activeTab = ref('flows')
const editFlow = ref(null)
const baseUrl = computed(() => `${window.location.protocol}//${window.location.hostname}`)
function startEdit(flow) { editFlow.value = { ...flow } }
function onFlowSaved() { editFlow.value = null; fetchFlows() }
const tabs = [ const tabs = [
{ id: 'flows', label: 'Flows' }, { id: 'flows', label: 'Flows' },
{ id: 'tests', label: 'Tests' }, { id: 'tests', label: 'Tests' },
{ id: 'runner', label: 'Runner' },
{ id: 'results', label: 'Results' }, { id: 'results', label: 'Results' },
{ id: 'monitor', label: 'Monitor' }, { id: 'monitor', label: 'Monitor' },
] ]

View File

@ -42,14 +42,23 @@
<input v-model.number="form.frame_size" type="number" min="64" max="9000" /> <input v-model.number="form.frame_size" type="number" min="64" max="9000" />
</div> </div>
<div class="form-row"> <div class="form-row">
<label>Rate (pps)</label> <label>Rate</label>
<input v-model.number="form.rate_pps" type="number" min="1" max="100000" /> <input v-model.number="form.rate_val" type="number" min="1" step="any" />
<select v-model="form.rate_unit" class="rate-unit-standalone">
<option value="pps">pps</option>
<option value="kbps">Kbps</option>
<option value="mbps">Mbps</option>
</select>
</div> </div>
</div> </div>
<div class="form-row-pair"> <div class="form-row-pair">
<div class="form-row"> <div class="form-row">
<label>Duration (sec)</label> <label>Duration (sec)</label>
<input v-model.number="form.duration" type="number" min="0" /> <input v-model.number="form.duration" type="number" min="0" :disabled="form.continuous" />
<label class="checkbox-inline">
<input type="checkbox" v-model="form.continuous" @change="onContinuousChange" />
Continuous
</label>
</div> </div>
<div class="form-row"> <div class="form-row">
<label>DSCP</label> <label>DSCP</label>
@ -71,32 +80,53 @@
</template> </template>
<script setup> <script setup>
import { reactive, watch } from 'vue' import { reactive, computed } from 'vue'
import { api } from '../api.js' import { api } from '../api.js'
const props = defineProps({ editFlow: Object }) const props = defineProps({ editFlow: Object })
const emit = defineEmits(['created', 'updated', 'cancel']) const emit = defineEmits(['created', 'updated', 'cancel'])
const editing = !!props.editFlow const editing = computed(() => !!props.editFlow)
const defaults = { const defaults = {
name: '', dst_ip: '', src_ip: '', dst_mac: '', name: '', dst_ip: '', src_ip: '', dst_mac: '',
protocol: 'udp', src_port: 50000, dst_port: 5001, protocol: 'udp', src_port: 50000, dst_port: 5001,
frame_size: 512, rate_pps: 1000, duration: 30, frame_size: 512, rate_val: 1000, rate_unit: 'pps', duration: 30,
dscp: 0, responder_url: '', dscp: 0, responder_url: '', continuous: false,
} }
const form = reactive({ ...defaults, ...(props.editFlow || {}) }) function ppsToDisplay(pps, frameSize) {
// Convert stored PPS to a friendlier unit if it was originally set that way
return { rate_val: pps, rate_unit: 'pps' }
}
const initData = props.editFlow
? { ...props.editFlow, continuous: props.editFlow.duration === 0, ...ppsToDisplay(props.editFlow.rate_pps, props.editFlow.frame_size) }
: {}
const form = reactive({ ...defaults, ...initData })
function onContinuousChange() { if (form.continuous) form.duration = 0 }
function computePps(val, unit, frameSize) {
if (unit === 'kbps') return Math.max(1, Math.round((val * 1000) / (frameSize * 8)))
if (unit === 'mbps') return Math.max(1, Math.round((val * 1_000_000) / (frameSize * 8)))
return Math.round(val)
}
async function submit() { async function submit() {
try { try {
const payload = { ...form } const payload = { ...form }
payload.rate_pps = computePps(form.rate_val, form.rate_unit, form.frame_size)
delete payload.rate_val
delete payload.rate_unit
delete payload.continuous
if (form.continuous) payload.duration = 0
if (!payload.src_ip) delete payload.src_ip if (!payload.src_ip) delete payload.src_ip
if (!payload.dst_mac) delete payload.dst_mac if (!payload.dst_mac) delete payload.dst_mac
if (!payload.responder_url) delete payload.responder_url if (!payload.responder_url) delete payload.responder_url
if (!payload.name) payload.name = `${payload.protocol.toUpperCase()} -> ${payload.dst_ip}` if (!payload.name) payload.name = `${payload.protocol.toUpperCase()} -> ${payload.dst_ip}`
if (editing) { if (editing.value) {
await api.updateFlow(props.editFlow.id, payload) await api.updateFlow(props.editFlow.id, payload)
emit('updated') emit('updated')
} else { } else {
@ -123,4 +153,7 @@ h3 { font-size: 15px; margin-bottom: 12px; color: var(--accent); }
.btn-accent:hover { opacity: 0.9; } .btn-accent:hover { opacity: 0.9; }
.btn-accent:disabled { opacity: 0.4; } .btn-accent:disabled { opacity: 0.4; }
.btn-muted { background: var(--border); color: var(--text); } .btn-muted { background: var(--border); color: var(--text); }
.rate-unit-standalone { width: 100%; margin-top: 4px; }
.checkbox-inline { display: inline-flex !important; align-items: center; gap: 4px; margin-top: 4px; font-size: 12px; cursor: pointer; }
.checkbox-inline input { width: auto; }
</style> </style>

View File

@ -10,7 +10,9 @@
<th>Size</th> <th>Size</th>
<th>Rate</th> <th>Rate</th>
<th>State</th> <th>State</th>
<th>TX Pkts</th>
<th>TX pps</th> <th>TX pps</th>
<th>RX Pkts</th>
<th>Actions</th> <th>Actions</th>
</tr> </tr>
</thead> </thead>
@ -20,14 +22,17 @@
<td class="mono">{{ f.dst_ip }}</td> <td class="mono">{{ f.dst_ip }}</td>
<td>{{ f.protocol.toUpperCase() }}</td> <td>{{ f.protocol.toUpperCase() }}</td>
<td>{{ f.frame_size }}B</td> <td>{{ f.frame_size }}B</td>
<td>{{ f.rate_pps }} pps</td> <td>{{ formatRate(f) }}</td>
<td> <td>
<span class="state-badge" :class="'state-' + f.state">{{ f.state }}</span> <span class="state-badge" :class="'state-' + f.state">{{ f.state }}</span>
</td> </td>
<td class="mono">{{ stats[f.id]?.tx_pps || 0 }}</td> <td class="mono">{{ formatNum(f.stats?.tx_packets || 0) }}</td>
<td class="mono">{{ pps[f.id] || 0 }}</td>
<td class="mono">{{ formatNum(f.stats?.rx_packets || 0) }}</td>
<td class="actions"> <td class="actions">
<button v-if="f.state !== 'running'" class="btn-sm btn-go" @click="start(f.id)">Start</button> <button v-if="f.state !== 'running'" class="btn-sm btn-go" @click="start(f.id)">Start</button>
<button v-else class="btn-sm btn-stop" @click="stop(f.id)">Stop</button> <button v-else class="btn-sm btn-stop" @click="stop(f.id)">Stop</button>
<button class="btn-sm btn-edit" @click="emit('edit', f)" :disabled="f.state === 'running'">Edit</button>
<button class="btn-sm btn-del" @click="del(f.id)" :disabled="f.state === 'running'">Del</button> <button class="btn-sm btn-del" @click="del(f.id)" :disabled="f.state === 'running'">Del</button>
</td> </td>
</tr> </tr>
@ -41,23 +46,40 @@ import { ref, onMounted, onUnmounted } from 'vue'
import { api } from '../api.js' import { api } from '../api.js'
const props = defineProps({ flows: Array }) const props = defineProps({ flows: Array })
const emit = defineEmits(['refresh']) const emit = defineEmits(['refresh', 'edit'])
const stats = ref({}) const pps = ref({})
const prevTx = ref({})
let statsTimer = null let statsTimer = null
async function fetchStats() { function computePps() {
for (const f of (props.flows || [])) { for (const f of (props.flows || [])) {
if (f.state === 'running') { const txNow = f.stats?.tx_packets || 0
try { const prev = prevTx.value[f.id] || 0
const s = await api.flowStats(f.id) if (f.state === 'running' && prev > 0) {
stats.value[f.id] = s pps.value[f.id] = Math.max(0, txNow - prev)
} catch (_) {} } else if (f.state !== 'running') {
pps.value[f.id] = 0
} }
prevTx.value[f.id] = txNow
} }
} }
onMounted(() => { statsTimer = setInterval(fetchStats, 1000) }) function formatRate(f) {
const pps = f.rate_pps || 0
const mbps = (pps * (f.frame_size || 64) * 8) / 1_000_000
if (mbps >= 1) return mbps.toFixed(1) + ' Mbps'
if (mbps >= 0.001) return (mbps * 1000).toFixed(0) + ' Kbps'
return pps + ' pps'
}
function formatNum(n) {
if (n >= 1000000) return (n / 1000000).toFixed(1) + 'M'
if (n >= 1000) return (n / 1000).toFixed(1) + 'K'
return n
}
onMounted(() => { statsTimer = setInterval(computePps, 1000) })
onUnmounted(() => { clearInterval(statsTimer) }) onUnmounted(() => { clearInterval(statsTimer) })
async function start(id) { async function start(id) {
@ -88,5 +110,7 @@ tr.running { background: rgba(79,156,249,0.05); }
.btn-go { background: var(--success); color: #fff; } .btn-go { background: var(--success); color: #fff; }
.btn-stop { background: var(--warning); color: #000; } .btn-stop { background: var(--warning); color: #000; }
.btn-del { background: rgba(252,129,129,0.15); color: var(--danger); } .btn-del { background: rgba(252,129,129,0.15); color: var(--danger); }
.btn-edit { background: rgba(79,156,249,0.15); color: var(--accent); }
.btn-edit:disabled { opacity: 0.3; }
.btn-del:disabled { opacity: 0.3; } .btn-del:disabled { opacity: 0.3; }
</style> </style>

View File

@ -0,0 +1,76 @@
<template>
<div class="quick-ping">
<div class="ping-row">
<input
v-model="target"
placeholder="IP address to ping..."
@keyup.enter="runPing"
:disabled="pinging"
/>
<button class="btn-ping" @click="runPing" :disabled="!target || pinging">
{{ pinging ? 'Pinging...' : 'Ping' }}
</button>
</div>
<div v-if="result" class="ping-result" :class="result.reachable ? 'reachable' : 'unreachable'">
<div class="ping-summary">
<span class="ping-target">{{ result.target }}</span>
<span v-if="result.reachable" class="ping-status ok">Reachable</span>
<span v-else class="ping-status fail">Unreachable</span>
</div>
<div v-if="result.reachable && result.stats" class="ping-stats">
<span>{{ result.received }}/{{ result.sent }} replies</span>
<span>Min: {{ result.stats.min_ms }}ms</span>
<span>Avg: {{ result.stats.avg_ms }}ms</span>
<span>Max: {{ result.stats.max_ms }}ms</span>
<span>Loss: {{ result.loss_pct }}%</span>
</div>
<div v-if="result.error" class="ping-error">{{ result.error }}</div>
</div>
</div>
</template>
<script setup>
import { ref } from 'vue'
import { api } from '../api.js'
const target = ref('')
const pinging = ref(false)
const result = ref(null)
async function runPing() {
if (!target.value || pinging.value) return
pinging.value = true
result.value = null
try {
result.value = await api.ping(target.value, 5)
} catch (e) {
result.value = { target: target.value, reachable: false, error: e.message }
} finally {
pinging.value = false
}
}
</script>
<style scoped>
.quick-ping { margin-bottom: 16px; }
.ping-row { display: flex; gap: 6px; }
.ping-row input { flex: 1; }
.btn-ping {
padding: 6px 16px; font-weight: 600; font-size: 13px;
background: var(--accent); color: #fff; white-space: nowrap;
}
.btn-ping:disabled { opacity: 0.4; }
.ping-result {
margin-top: 8px; padding: 8px 12px;
border-radius: var(--radius); font-size: 13px;
}
.ping-result.reachable { background: rgba(72,187,120,0.1); border: 1px solid rgba(72,187,120,0.3); }
.ping-result.unreachable { background: rgba(252,129,129,0.1); border: 1px solid rgba(252,129,129,0.3); }
.ping-summary { display: flex; align-items: center; gap: 10px; }
.ping-target { font-weight: 600; font-family: monospace; }
.ping-status { font-size: 11px; padding: 2px 8px; border-radius: 10px; font-weight: 600; }
.ping-status.ok { background: rgba(72,187,120,0.2); color: var(--success); }
.ping-status.fail { background: rgba(252,129,129,0.2); color: var(--danger); }
.ping-stats { display: flex; gap: 12px; margin-top: 6px; font-size: 12px; color: var(--muted); font-family: monospace; }
.ping-error { margin-top: 4px; color: var(--danger); font-size: 12px; }
</style>

View File

@ -13,8 +13,30 @@
</div> </div>
</div> </div>
<div v-if="t.results" class="result-table"> <div v-if="t.error" class="error-msg">Error: {{ t.error }}</div>
<div v-if="t.results && Object.keys(t.results).length" class="result-table">
<!-- Frame Loss: array of rate steps per frame size -->
<template v-if="t.type === 'frame_loss'">
<div v-for="(rates, size) in t.results" :key="size" class="fl-section">
<div class="fl-title">Frame Size: {{ size }} B</div>
<table> <table>
<thead>
<tr><th>Rate %</th><th>Rate (pps)</th><th>TX Packets</th><th>RX Packets</th><th>Loss %</th></tr>
</thead>
<tbody>
<tr v-for="r in rates" :key="r.rate_pct">
<td class="mono">{{ r.rate_pct }}%</td>
<td class="mono">{{ r.rate_pps }}</td>
<td class="mono">{{ r.tx_packets }}</td>
<td class="mono">{{ r.rx_packets }}</td>
<td class="mono">{{ r.loss_pct }}%</td>
</tr>
</tbody>
</table>
</div>
</template>
<!-- Other test types -->
<table v-else>
<thead> <thead>
<tr> <tr>
<th>Frame Size (B)</th> <th>Frame Size (B)</th>
@ -32,7 +54,7 @@
</table> </table>
</div> </div>
<div v-if="t.results" class="result-chart"> <div v-if="t.results && t.type !== 'frame_loss'" class="result-chart">
<div class="bar-chart"> <div class="bar-chart">
<div v-for="(val, size) in t.results" :key="size" class="bar-item"> <div v-for="(val, size) in t.results" :key="size" class="bar-item">
<div class="bar-fill" :style="{ height: barHeight(t, val) + '%' }"></div> <div class="bar-fill" :style="{ height: barHeight(t, val) + '%' }"></div>
@ -50,7 +72,7 @@ import { computed } from 'vue'
const props = defineProps({ tests: Array }) const props = defineProps({ tests: Array })
const completedTests = computed(() => const completedTests = computed(() =>
(props.tests || []).filter(t => t.state === 'complete' && t.results) (props.tests || []).filter(t => (t.state === 'complete' || t.state === 'error') && (t.results || t.error))
) )
function resultColumns(t) { function resultColumns(t) {
@ -63,22 +85,28 @@ function resultColumns(t) {
function formatVal(val, col) { function formatVal(val, col) {
if (typeof val === 'object') { if (typeof val === 'object') {
if (col.includes('Rate')) return val.max_rate_pps ?? '-' if (col.includes('Rate')) return val.max_throughput_pps ?? val.max_rate_pps ?? '-'
if (col.includes('Throughput')) return val.throughput_mbps ?? '-' if (col.includes('Throughput')) {
if (col.includes('Min')) return val.min_ms ?? '-' const pps = val.max_throughput_pps ?? val.max_rate_pps ?? 0
if (col.includes('Avg')) return val.avg_ms ?? '-' const fs = val.frame_size ?? 64
if (col.includes('Max') && col.includes('ms')) return val.max_ms ?? '-' return pps ? ((pps * fs * 8) / 1_000_000).toFixed(2) : '-'
if (col.includes('Jitter')) return val.jitter_ms ?? '-' }
if (col.includes('Min')) return val.min_ms != null ? val.min_ms.toFixed(2) : '-'
if (col.includes('Avg')) return val.avg_ms != null ? val.avg_ms.toFixed(2) : '-'
if (col.includes('Max') && col.includes('ms')) return val.max_ms != null ? val.max_ms.toFixed(2) : '-'
if (col.includes('Jitter')) return val.jitter_ms != null ? val.jitter_ms.toFixed(2) : '-'
if (col.includes('Loss')) return val.loss_pct ?? '-' if (col.includes('Loss')) return val.loss_pct ?? '-'
if (col.includes('Burst')) return val.max_burst ?? '-' if (col.includes('Burst')) return val.max_burst_frames ?? val.max_burst ?? '-'
return JSON.stringify(val) return JSON.stringify(val)
} }
return val return val
} }
function barHeight(t, val) { function barHeight(t, val) {
const v = typeof val === 'object' ? (val.max_rate_pps || val.avg_ms || val.loss_pct || val.max_burst || 0) : val const v = typeof val === 'object' ? (val.max_throughput_pps || val.max_rate_pps || val.avg_ms || val.loss_pct || val.max_burst_frames || 0) : val
return Math.min(100, Math.max(5, v / 100)) const allVals = Object.values(t.results).map(r => typeof r === 'object' ? (r.max_throughput_pps || r.max_rate_pps || r.avg_ms || r.loss_pct || r.max_burst_frames || 0) : r)
const maxVal = Math.max(...allVals, 1)
return Math.min(100, Math.max(5, (v / maxVal) * 100))
} }
function exportJSON(t) { function exportJSON(t) {
@ -123,4 +151,7 @@ td { font-size: 13px; padding: 4px 8px; }
.bar-item { flex: 1; display: flex; flex-direction: column; align-items: center; height: 100%; } .bar-item { flex: 1; display: flex; flex-direction: column; align-items: center; height: 100%; }
.bar-fill { width: 100%; background: var(--accent); border-radius: 3px 3px 0 0; min-height: 4px; transition: height 0.3s; margin-top: auto; } .bar-fill { width: 100%; background: var(--accent); border-radius: 3px 3px 0 0; min-height: 4px; transition: height 0.3s; margin-top: auto; }
.bar-label { font-size: 10px; color: var(--muted); margin-top: 4px; } .bar-label { font-size: 10px; color: var(--muted); margin-top: 4px; }
.fl-section { margin-bottom: 12px; }
.fl-title { font-size: 12px; font-weight: 600; color: var(--accent); margin-bottom: 4px; }
.error-msg { color: var(--danger); font-size: 13px; padding: 8px 0; }
</style> </style>

View File

@ -78,12 +78,79 @@ let timer = null
async function fetchStats() { async function fetchStats() {
try { try {
if (selectedFlow.value) { if (selectedFlow.value) {
// Single flow: /flows/<id>/stats returns {flow_id, counters, rates}
// rates contains: tx_pps, rx_pps, tx_mbps, rx_mbps, loss_pct, tx_packets, tx_bytes, etc.
const s = await api.flowStats(selectedFlow.value) const s = await api.flowStats(selectedFlow.value)
current.value = s const rates = s.rates || {}
const counters = s.counters || {}
current.value = {
tx_pps: Math.round(rates.tx_pps || 0),
rx_pps: Math.round(rates.rx_pps || 0),
tx_mbps: rates.tx_mbps || 0,
rx_mbps: rates.rx_mbps || 0,
loss_pct: rates.loss_pct || 0,
avg_latency_ms: rates.latency ? rates.latency.avg_ms : null,
tx_packets: counters.tx_packets || 0,
tx_bytes: counters.tx_bytes || 0,
rx_packets: counters.rx_packets || 0,
rx_bytes: counters.rx_bytes || 0,
}
// Append to history for sparkline
history.value.push({ tx_pps: current.value.tx_pps, rx_pps: current.value.rx_pps })
if (history.value.length > 60) history.value = history.value.slice(-60)
} else { } else {
// All flows: /stats/history returns {history: {flow_id: [samples]}}
const h = await api.statsHistory() const h = await api.statsHistory()
if (h.current) current.value = h.current const allHistory = h.history || {}
if (h.history) history.value = h.history.slice(-60) // Aggregate latest sample across all flows
let txPps = 0, rxPps = 0, txMbps = 0, rxMbps = 0
let txPkts = 0, txBytes = 0, rxPkts = 0, rxBytes = 0
let lossPcts = [], latencies = []
for (const [, samples] of Object.entries(allHistory)) {
if (!samples.length) continue
const latest = samples[samples.length - 1]
txPps += latest.tx_pps || 0
rxPps += latest.rx_pps || 0
txMbps += latest.tx_mbps || 0
rxMbps += latest.rx_mbps || 0
txPkts += latest.tx_packets || 0
txBytes += latest.tx_bytes || 0
rxPkts += latest.rx_packets || 0
rxBytes += latest.rx_bytes || 0
if (latest.loss_pct > 0) lossPcts.push(latest.loss_pct)
if (latest.latency && latest.latency.avg_ms) latencies.push(latest.latency.avg_ms)
}
current.value = {
tx_pps: Math.round(txPps),
rx_pps: Math.round(rxPps),
tx_mbps: txMbps,
rx_mbps: rxMbps,
loss_pct: txPkts > 0 ? Math.max(0, ((txPkts - rxPkts) / txPkts) * 100) : 0,
avg_latency_ms: latencies.length ? latencies.reduce((a, b) => a + b, 0) / latencies.length : null,
tx_packets: txPkts,
tx_bytes: txBytes,
rx_packets: rxPkts,
rx_bytes: rxBytes,
}
// Build aggregated sparkline from history samples
// Find max sample count across all flows
const flowIds = Object.keys(allHistory)
if (flowIds.length) {
const maxLen = Math.max(...flowIds.map(id => allHistory[id].length))
const sparkData = []
for (let i = Math.max(0, maxLen - 60); i < maxLen; i++) {
let sTx = 0, sRx = 0
for (const fid of flowIds) {
const s = allHistory[fid][i]
if (s) { sTx += s.tx_pps || 0; sRx += s.rx_pps || 0 }
}
sparkData.push({ tx_pps: Math.round(sTx), rx_pps: Math.round(sRx) })
}
history.value = sparkData
}
} }
} catch (_) {} } catch (_) {}
} }

View File

@ -2,25 +2,45 @@
<div class="status-bar"> <div class="status-bar">
<div class="status-badges"> <div class="status-badges">
<span class="badge" :class="connected ? 'badge-ok' : 'badge-err'"> <span class="badge" :class="connected ? 'badge-ok' : 'badge-err'">
{{ connected ? 'Connected' : 'Offline' }} {{ connected ? 'API Connected' : 'API Offline' }}
</span>
<span v-if="health" class="badge badge-mode" :class="'mode-' + (health.mode || 'sender')" @click="toggleMode">
{{ (health.mode || 'sender').toUpperCase() }}
</span> </span>
<span v-if="health" class="badge badge-info"> <span v-if="health" class="badge badge-info">
Mode: {{ health.mode || 'sender' }} Active Flows: {{ health.active_flows || 0 }}
</span> </span>
<span v-if="health" class="badge badge-info"> <span v-if="health" class="badge badge-info">
Flows: {{ health.active_flows || 0 }} Active Tests: {{ health.active_tests || 0 }}
</span>
<span v-if="health" class="badge badge-info">
Tests: {{ health.active_tests || 0 }}
</span> </span>
</div> </div>
</div> </div>
</template> </template>
<script setup> <script setup>
import { computed } from 'vue' import { computed, ref } from 'vue'
import { api } from '../api.js'
const props = defineProps({ health: Object, apiError: String }) const props = defineProps({ health: Object, apiError: String })
const emit = defineEmits(['modeChanged'])
const connected = computed(() => !props.apiError && props.health) const connected = computed(() => !props.apiError && props.health)
const switching = ref(false)
async function toggleMode() {
if (switching.value || !props.health) return
const current = props.health.mode || 'sender'
const next = current === 'sender' ? 'responder' : 'sender'
if (!confirm(`Switch to ${next.toUpperCase()} mode? This will stop all active flows/tests.`)) return
switching.value = true
try {
await api.setMode(next)
emit('modeChanged')
} catch (e) {
alert('Failed to switch mode: ' + e.message)
} finally {
switching.value = false
}
}
</script> </script>
<style scoped> <style scoped>
@ -33,5 +53,9 @@ const connected = computed(() => !props.apiError && props.health)
.badge-ok { background: rgba(72,187,120,0.15); color: var(--success); } .badge-ok { background: rgba(72,187,120,0.15); color: var(--success); }
.badge-err { background: rgba(252,129,129,0.15); color: var(--danger); animation: pulse 1.5s infinite; } .badge-err { background: rgba(252,129,129,0.15); color: var(--danger); animation: pulse 1.5s infinite; }
.badge-info { background: rgba(79,156,249,0.12); color: var(--accent); } .badge-info { background: rgba(79,156,249,0.12); color: var(--accent); }
.badge-mode { cursor: pointer; transition: background 0.2s; }
.badge-mode:hover { opacity: 0.8; }
.mode-sender { background: rgba(72,187,120,0.2); color: var(--success); }
.mode-responder { background: rgba(246,173,85,0.2); color: var(--warning); }
@keyframes pulse { 0%,100% { opacity: 1; } 50% { opacity: 0.5; } } @keyframes pulse { 0%,100% { opacity: 1; } 50% { opacity: 0.5; } }
</style> </style>

View File

@ -13,14 +13,24 @@
</div> </div>
<div class="form-row"> <div class="form-row">
<label>Base Flow</label> <label>Destination IP</label>
<select v-model="form.flow_id"> <input v-model="form.dst_ip" placeholder="10.100.0.1" />
<option value="" disabled>Select a flow...</option> </div>
<option v-for="f in flows" :key="f.id" :value="f.id">
{{ f.name || f.dst_ip }} ({{ f.protocol }}) <div class="form-row-pair">
</option> <div class="form-row">
<label>Protocol</label>
<select v-model="form.protocol">
<option value="udp">UDP</option>
<option value="icmp">ICMP</option>
<option value="tcp">TCP</option>
</select> </select>
</div> </div>
<div class="form-row">
<label>Source IP</label>
<input v-model="form.src_ip" placeholder="auto" />
</div>
</div>
<div class="form-row"> <div class="form-row">
<label>Frame Sizes</label> <label>Frame Sizes</label>
@ -37,8 +47,13 @@
<input v-model.number="form.trial_duration" type="number" min="5" max="300" /> <input v-model.number="form.trial_duration" type="number" min="5" max="300" />
</div> </div>
<div class="form-row"> <div class="form-row">
<label>Max Rate (pps)</label> <label>Max Rate</label>
<input v-model.number="form.max_rate_pps" type="number" min="10" max="100000" /> <input v-model.number="form.max_rate_val" type="number" min="1" step="any" />
<select v-model="form.max_rate_unit" class="rate-unit-standalone">
<option value="pps">pps</option>
<option value="kbps">Kbps</option>
<option value="mbps">Mbps</option>
</select>
</div> </div>
</div> </div>
@ -47,7 +62,7 @@
<input v-model.number="form.acceptable_loss_pct" type="number" min="0" max="100" step="0.1" /> <input v-model.number="form.acceptable_loss_pct" type="number" min="0" max="100" step="0.1" />
</div> </div>
<button class="btn btn-accent" @click="create" :disabled="!form.flow_id"> <button class="btn btn-accent" @click="create" :disabled="!form.dst_ip">
Create & Run Test Create & Run Test
</button> </button>
@ -67,36 +82,60 @@
import { reactive, ref, onMounted } from 'vue' import { reactive, ref, onMounted } from 'vue'
import { api } from '../api.js' import { api } from '../api.js'
const props = defineProps({ flows: Array })
const emit = defineEmits(['created', 'refresh']) const emit = defineEmits(['created', 'refresh'])
const standardSizes = [64, 128, 256, 512, 1024, 1280, 1518] const standardSizes = [64, 128, 256, 512, 1024, 1280, 1518, 2048, 4096, 9000]
const presets = ref({}) const presets = ref({})
const form = reactive({ const form = reactive({
type: 'throughput', type: 'throughput',
flow_id: '', dst_ip: '',
src_ip: '',
protocol: 'udp',
frame_sizes: [64, 512, 1518], frame_sizes: [64, 512, 1518],
trial_duration: 30, trial_duration: 30,
max_rate_pps: 10000, max_rate_val: 10,
max_rate_unit: 'mbps',
acceptable_loss_pct: 0.0, acceptable_loss_pct: 0.0,
}) })
function computePps(val, unit) {
if (unit === 'kbps') return Math.max(1, Math.round((val * 1000) / (512 * 8)))
if (unit === 'mbps') return Math.max(1, Math.round((val * 1_000_000) / (512 * 8)))
return Math.round(val)
}
onMounted(async () => { onMounted(async () => {
try { const r = await api.presets(); presets.value = r.presets || r } catch (_) {} try { const r = await api.presets(); presets.value = r.presets || r } catch (_) {}
}) })
async function create() { async function create() {
try { try {
const test = await api.createTest({ ...form }) const payload = {
type: form.type,
flow_config: {
dst_ip: form.dst_ip,
src_ip: form.src_ip || 'auto',
protocol: form.protocol,
src_port: 50000,
dst_port: 5001,
},
frame_sizes: form.frame_sizes,
trial_duration: form.trial_duration,
max_rate_pps: computePps(form.max_rate_val, form.max_rate_unit),
acceptable_loss_pct: form.acceptable_loss_pct,
}
const test = await api.createTest(payload)
await api.startTest(test.id) await api.startTest(test.id)
emit('created') emit('created')
} catch (e) { alert(e.message) } } catch (e) { alert(e.message) }
} }
async function loadPreset(name) { async function loadPreset(name) {
const dstIp = prompt('Destination IP for this preset:', '10.100.0.100')
if (!dstIp) return
try { try {
await api.loadPreset(name, {}) await api.loadPreset(name, { dst_ip: dstIp })
emit('refresh') emit('refresh')
} catch (e) { alert(e.message) } } catch (e) { alert(e.message) }
} }
@ -109,6 +148,7 @@ h4 { font-size: 13px; margin: 16px 0 8px; color: var(--muted); }
.form-row label { display: block; font-size: 11px; color: var(--muted); margin-bottom: 3px; text-transform: uppercase; letter-spacing: 0.05em; } .form-row label { display: block; font-size: 11px; color: var(--muted); margin-bottom: 3px; text-transform: uppercase; letter-spacing: 0.05em; }
.form-row input, .form-row select { width: 100%; } .form-row input, .form-row select { width: 100%; }
.form-row-pair { display: grid; grid-template-columns: 1fr 1fr; gap: 8px; } .form-row-pair { display: grid; grid-template-columns: 1fr 1fr; gap: 8px; }
.rate-unit-standalone { width: 100%; margin-top: 4px; }
.frame-sizes { display: flex; flex-wrap: wrap; gap: 8px; } .frame-sizes { display: flex; flex-wrap: wrap; gap: 8px; }
.checkbox-label { font-size: 12px; display: flex; align-items: center; gap: 4px; color: var(--text); cursor: pointer; } .checkbox-label { font-size: 12px; display: flex; align-items: center; gap: 4px; color: var(--text); cursor: pointer; }
.btn { padding: 8px 16px; font-weight: 600; font-size: 13px; width: 100%; margin-top: 8px; } .btn { padding: 8px 16px; font-weight: 600; font-size: 13px; width: 100%; margin-top: 8px; }

View File

@ -1,67 +1,145 @@
<template> <template>
<div class="test-runner"> <div class="test-runner">
<div v-if="!tests.length" class="empty">No tests created yet. Use the Test Builder tab.</div> <h3>Running Tests</h3>
<div v-for="t in tests" :key="t.id" class="test-card" :class="'state-' + t.state"> <div v-if="!tests.length" class="empty">No tests yet. Create one above and click "Create & Run Test".</div>
<div v-for="t in sortedTests" :key="t.id" class="test-card" :class="'state-' + t.state">
<div class="test-header"> <div class="test-header">
<div> <div class="test-title">
<strong>{{ t.type }}</strong> <strong>{{ t.type }}</strong>
<span class="test-state">{{ t.state }}</span> <span class="test-state" :class="'ts-' + t.state">{{ t.state }}</span>
<span v-if="t.frame_sizes" class="test-sizes">{{ t.frame_sizes.length }} frame sizes</span>
</div> </div>
<div class="test-actions"> <div class="test-actions">
<button v-if="t.state === 'idle'" class="btn-sm btn-go" @click="start(t.id)">Start</button> <button v-if="t.state === 'idle'" class="btn-sm btn-go" @click="start(t.id)">Start</button>
<button v-if="t.state === 'running'" class="btn-sm btn-stop" @click="stop(t.id)">Stop</button> <button v-if="t.state === 'running'" class="btn-sm btn-stop" @click="stop(t.id)">Stop</button>
<button v-if="t.state === 'complete' || t.state === 'error'" class="btn-sm btn-del" @click="del(t.id)">Remove</button>
</div> </div>
</div> </div>
<!-- RUNNING: live progress -->
<div v-if="t.state === 'running'" class="progress-section"> <div v-if="t.state === 'running'" class="progress-section">
<div class="progress-label">Running {{ t.type }} test...</div> <div class="progress-detail">
<span v-if="t.progress">{{ t.progress.message }}</span>
<span v-else>Starting...</span>
<span v-if="t.progress" class="progress-counter">
{{ (t.progress.completed_sizes || []).length }}/{{ t.progress.total_frames }} sizes done
</span>
</div>
<div class="progress-bar"> <div class="progress-bar">
<div class="progress-fill" :style="{ width: progressPct(t) + '%' }"></div> <div class="progress-fill" :style="{ width: progressPct(t) + '%' }"></div>
</div> </div>
<!-- Show partial results as they come in -->
<div v-if="t.results && Object.keys(t.results).length" class="partial-results">
<div v-for="(val, size) in t.results" :key="size" class="partial-item">
<span class="partial-size">{{ size }}B</span>
<span class="partial-val" v-if="!Array.isArray(val) && val.max_throughput_pps != null">{{ val.max_throughput_pps }} pps</span>
<span class="partial-val" v-else-if="!Array.isArray(val) && val.avg_ms != null">{{ val.avg_ms }}ms avg</span>
<span class="partial-val" v-else-if="!Array.isArray(val) && val.max_burst_frames != null">{{ val.max_burst_frames }} frames</span>
<span class="partial-val" v-else-if="Array.isArray(val)">{{ val.length }} rate steps</span>
</div>
</div>
</div> </div>
<div v-if="t.state === 'complete' && t.results" class="results-preview"> <!-- ERROR -->
<div v-if="t.state === 'error'" class="error-msg">{{ t.error || 'Test failed' }}</div>
<!-- COMPLETE: inline results summary -->
<div v-if="t.state === 'complete' && t.results && Object.keys(t.results).length" class="results-preview">
<!-- Frame Loss has a different structure (array per size) -->
<template v-if="t.type === 'frame_loss'">
<div v-for="(rates, size) in t.results" :key="size" class="fl-section">
<div class="fl-title">Frame Size: {{ size }} B</div>
<table> <table>
<thead>
<tr>
<th>Rate %</th>
<th>Rate (pps)</th>
<th>TX Packets</th>
<th>RX Packets</th>
<th>Loss %</th>
</tr>
</thead>
<tbody>
<tr v-for="r in rates" :key="r.rate_pct">
<td class="mono">{{ r.rate_pct }}%</td>
<td class="mono">{{ r.rate_pps }}</td>
<td class="mono">{{ r.tx_packets }}</td>
<td class="mono">{{ r.rx_packets }}</td>
<td class="mono">{{ r.loss_pct }}%</td>
</tr>
</tbody>
</table>
</div>
</template>
<!-- Other test types: single value per size -->
<table v-else>
<thead> <thead>
<tr> <tr>
<th>Frame Size</th> <th>Frame Size</th>
<th v-if="t.type === 'throughput'">Max Rate (pps)</th> <th v-if="t.type === 'throughput'">Max Rate (pps)</th>
<th v-if="t.type === 'latency'">Avg Latency (ms)</th> <th v-if="t.type === 'throughput'">Throughput</th>
<th v-if="t.type === 'frame_loss'">Loss @ Max %</th> <th v-if="t.type === 'latency'">Avg (ms)</th>
<th v-if="t.type === 'latency'">Min/Max (ms)</th>
<th v-if="t.type === 'back_to_back'">Max Burst</th> <th v-if="t.type === 'back_to_back'">Max Burst</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
<tr v-for="(val, size) in t.results" :key="size"> <tr v-for="(val, size) in t.results" :key="size">
<td>{{ size }} B</td> <td>{{ size }} B</td>
<td v-if="t.type === 'throughput'">{{ val.max_rate_pps || val }}</td> <td v-if="t.type === 'throughput'" class="mono">{{ val.max_throughput_pps || '-' }}</td>
<td v-if="t.type === 'latency'">{{ val.avg_ms || val }}</td> <td v-if="t.type === 'throughput'" class="mono">{{ formatMbps(val) }}</td>
<td v-if="t.type === 'frame_loss'">{{ val.loss_pct || val }}%</td> <td v-if="t.type === 'latency'" class="mono">{{ val.avg_ms != null ? val.avg_ms.toFixed(2) : '-' }}</td>
<td v-if="t.type === 'back_to_back'">{{ val.max_burst || val }}</td> <td v-if="t.type === 'latency'" class="mono">{{ val.min_ms != null ? val.min_ms.toFixed(2) + ' / ' + val.max_ms.toFixed(2) : '-' }}</td>
<td v-if="t.type === 'back_to_back'" class="mono">{{ val.max_burst_frames ?? '-' }}</td>
</tr> </tr>
</tbody> </tbody>
</table> </table>
</div> </div>
<div class="test-meta"> <div class="test-meta">
<span>Created: {{ t.created_at || '-' }}</span>
<span v-if="t.started_at">Started: {{ t.started_at }}</span> <span v-if="t.started_at">Started: {{ t.started_at }}</span>
<span v-if="t.completed_at">Completed: {{ t.completed_at }}</span> <span v-if="t.completed_at">Completed: {{ t.completed_at }}</span>
<span v-if="t.state === 'running' && t.started_at">Elapsed: {{ elapsed(t) }}</span>
</div> </div>
</div> </div>
</div> </div>
</template> </template>
<script setup> <script setup>
import { computed } from 'vue'
import { api } from '../api.js' import { api } from '../api.js'
const props = defineProps({ tests: Array }) const props = defineProps({ tests: Array })
const emit = defineEmits(['refresh']) const emit = defineEmits(['refresh'])
const sortedTests = computed(() => {
const order = { running: 0, idle: 1, complete: 2, error: 3 }
return [...(props.tests || [])].sort((a, b) => (order[a.state] ?? 9) - (order[b.state] ?? 9))
})
function progressPct(t) { function progressPct(t) {
if (!t.results || !t.frame_sizes) return 20 if (!t.progress || !t.progress.total_frames) return 10
const done = Object.keys(t.results).length const done = (t.progress.completed_sizes || []).length
return Math.min(95, (done / t.frame_sizes.length) * 100) const partial = t.progress.frame_idx > done ? 0.5 : 0
return Math.min(95, ((done + partial) / t.progress.total_frames) * 100)
}
function formatMbps(val) {
const pps = val.max_throughput_pps || 0
const fs = val.frame_size || 64
if (!pps) return '-'
const mbps = (pps * fs * 8) / 1_000_000
return mbps.toFixed(1) + ' Mbps'
}
function elapsed(t) {
if (!t.started_at) return ''
const start = new Date(t.started_at).getTime()
const secs = Math.round((Date.now() - start) / 1000)
const m = Math.floor(secs / 60)
const s = secs % 60
return m > 0 ? `${m}m ${s}s` : `${s}s`
} }
async function start(id) { async function start(id) {
@ -70,28 +148,48 @@ async function start(id) {
async function stop(id) { async function stop(id) {
try { await api.stopTest(id); emit('refresh') } catch (e) { alert(e.message) } try { await api.stopTest(id); emit('refresh') } catch (e) { alert(e.message) }
} }
async function del(id) {
emit('refresh')
}
</script> </script>
<style scoped> <style scoped>
.empty { color: var(--muted); padding: 20px; text-align: center; } h3 { font-size: 15px; margin-bottom: 12px; color: var(--accent); }
.empty { color: var(--muted); padding: 16px; text-align: center; font-size: 13px; }
.test-card { background: var(--card-bg); border: 1px solid var(--border); border-radius: var(--radius); padding: 12px; margin-bottom: 10px; } .test-card { background: var(--card-bg); border: 1px solid var(--border); border-radius: var(--radius); padding: 12px; margin-bottom: 10px; }
.test-card.state-running { border-color: var(--accent); } .test-card.state-running { border-color: var(--accent); }
.test-card.state-complete { border-color: var(--success); } .test-card.state-complete { border-color: var(--success); }
.test-header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 8px; } .test-card.state-error { border-color: var(--danger); }
.test-header strong { font-size: 14px; text-transform: capitalize; } .test-header { display: flex; justify-content: space-between; align-items: center; }
.test-state { font-size: 11px; padding: 2px 8px; border-radius: 10px; margin-left: 8px; font-weight: 600; background: rgba(113,128,150,0.2); color: var(--muted); } .test-title { display: flex; align-items: center; gap: 8px; }
.state-running .test-state { background: rgba(79,156,249,0.15); color: var(--accent); } .test-title strong { font-size: 14px; text-transform: capitalize; }
.state-complete .test-state { background: rgba(72,187,120,0.15); color: var(--success); } .test-state { font-size: 11px; padding: 2px 8px; border-radius: 10px; font-weight: 600; }
.ts-idle { background: rgba(113,128,150,0.2); color: var(--muted); }
.ts-running { background: rgba(79,156,249,0.15); color: var(--accent); }
.ts-complete { background: rgba(72,187,120,0.15); color: var(--success); }
.ts-error { background: rgba(252,129,129,0.15); color: var(--danger); }
.test-sizes { font-size: 11px; color: var(--muted); }
.test-actions { display: flex; gap: 4px; } .test-actions { display: flex; gap: 4px; }
.btn-sm { padding: 3px 10px; font-size: 11px; font-weight: 600; border-radius: 6px; } .btn-sm { padding: 3px 10px; font-size: 11px; font-weight: 600; border-radius: 6px; }
.btn-go { background: var(--success); color: #fff; } .btn-go { background: var(--success); color: #fff; }
.btn-stop { background: var(--warning); color: #000; } .btn-stop { background: var(--warning); color: #000; }
.progress-section { margin: 8px 0; } .btn-del { background: rgba(252,129,129,0.15); color: var(--danger); }
.progress-label { font-size: 12px; color: var(--muted); margin-bottom: 4px; } .progress-section { margin: 10px 0; }
.progress-detail { display: flex; justify-content: space-between; font-size: 12px; color: var(--muted); margin-bottom: 6px; font-family: monospace; }
.progress-counter { color: var(--accent); font-weight: 600; }
.progress-bar { height: 6px; background: var(--border); border-radius: 3px; overflow: hidden; } .progress-bar { height: 6px; background: var(--border); border-radius: 3px; overflow: hidden; }
.progress-fill { height: 100%; background: var(--accent); border-radius: 3px; transition: width 0.5s; } .progress-fill { height: 100%; background: var(--accent); border-radius: 3px; transition: width 0.5s; }
.results-preview table { width: 100%; border-collapse: collapse; margin-top: 8px; } .partial-results { display: flex; gap: 8px; flex-wrap: wrap; margin-top: 8px; }
.partial-item { background: rgba(72,187,120,0.1); border: 1px solid rgba(72,187,120,0.2); padding: 2px 8px; border-radius: 6px; font-size: 11px; }
.partial-size { font-weight: 600; color: var(--success); }
.partial-val { color: var(--text); margin-left: 4px; font-family: monospace; }
.error-msg { color: var(--danger); font-size: 13px; padding: 8px 0; }
.results-preview { margin-top: 10px; }
.results-preview table { width: 100%; border-collapse: collapse; }
.results-preview th { font-size: 11px; color: var(--muted); text-align: left; padding: 4px 8px; border-bottom: 1px solid var(--border); } .results-preview th { font-size: 11px; color: var(--muted); text-align: left; padding: 4px 8px; border-bottom: 1px solid var(--border); }
.results-preview td { font-size: 13px; padding: 4px 8px; font-family: monospace; } .results-preview td { font-size: 13px; padding: 4px 8px; }
.mono { font-family: monospace; }
.fl-section { margin-bottom: 12px; }
.fl-title { font-size: 12px; font-weight: 600; color: var(--accent); margin-bottom: 4px; }
.test-meta { display: flex; gap: 12px; margin-top: 8px; font-size: 11px; color: var(--muted); } .test-meta { display: flex; gap: 12px; margin-top: 8px; font-size: 11px; color: var(--muted); }
</style> </style>

View File

@ -1,6 +1,6 @@
FROM python:3.11-slim FROM python:3.11-slim
RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && apt-get install -y --no-install-recommends \
tcpreplay libpcap-dev procps && rm -rf /var/lib/apt/lists/* tcpreplay libpcap-dev procps iputils-ping && rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir flask scapy psutil RUN pip install --no-cache-dir flask scapy psutil
COPY . /traffic-gen/ COPY . /traffic-gen/
WORKDIR /traffic-gen WORKDIR /traffic-gen

View File

@ -87,7 +87,7 @@ def build_packet(flow_config: dict, seq: int = 0):
# --- Layer 3 --- # --- Layer 3 ---
ip_kwargs = {'dst': flow_config['dst_ip']} ip_kwargs = {'dst': flow_config['dst_ip']}
src_ip = flow_config.get('src_ip') src_ip = flow_config.get('src_ip')
if src_ip: if src_ip and src_ip != 'auto':
ip_kwargs['src'] = src_ip ip_kwargs['src'] = src_ip
dscp = flow_config.get('dscp', 0) dscp = flow_config.get('dscp', 0)
@ -99,13 +99,13 @@ def build_packet(flow_config: dict, seq: int = 0):
# --- Layer 4 --- # --- Layer 4 ---
if protocol == 'udp': if protocol == 'udp':
src_port = flow_config.get('src_port', 12000) src_port = flow_config.get('src_port') or 12000
dst_port = flow_config.get('dst_port', 5001) dst_port = flow_config.get('dst_port') or 5001
pkt = pkt / UDP(sport=int(src_port), dport=int(dst_port)) pkt = pkt / UDP(sport=int(src_port), dport=int(dst_port))
header_overhead += 8 header_overhead += 8
elif protocol == 'tcp': elif protocol == 'tcp':
src_port = flow_config.get('src_port', 12000) src_port = flow_config.get('src_port') or 12000
dst_port = flow_config.get('dst_port', 80) dst_port = flow_config.get('dst_port') or 80
pkt = pkt / TCP(sport=int(src_port), dport=int(dst_port), flags='S') pkt = pkt / TCP(sport=int(src_port), dport=int(dst_port), flags='S')
header_overhead += 20 header_overhead += 20
elif protocol == 'icmp': elif protocol == 'icmp':

View File

@ -1,185 +1,204 @@
""" """
Responder - listens for TGEN-tagged packets and collects receive statistics. Responder - high-performance UDP packet receiver for TGEN traffic.
Two sub-modes: Uses multiple receiver threads on SO_REUSEPORT UDP sockets for parallel
- echo: swaps src/dst MAC and IP, sends packet back with receive timestamp packet processing. Each thread has its own socket and stats counters
- log: records rx stats only, exposed via API to avoid contention.
""" """
import logging import logging
import os
import socket
import struct import struct
import threading import threading
import time import time
from scapy.all import ( from engine.packet_builder import MAGIC, HEADER_LEN
AsyncSniffer, Ether, IP, Raw, send, conf,
)
from engine.packet_builder import MAGIC, HEADER_LEN, parse_payload
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
conf.verb = 0
DEFAULT_LISTEN_PORT = 5001
RECV_BUF_SIZE = 16 * 1024 * 1024
NUM_WORKERS = int(os.environ.get('RESPONDER_WORKERS', 4))
class _WorkerStats:
"""Per-worker stats — no sharing, no locks."""
__slots__ = ('rx_packets', 'rx_bytes', 'out_of_order', 'duplicates',
'last_seq', 'lat_buf', 'lat_idx', 'lat_count')
def __init__(self):
self.rx_packets = 0
self.rx_bytes = 0
self.out_of_order = 0
self.duplicates = 0
self.last_seq = -1
self.lat_buf = [0.0] * 4096
self.lat_idx = 0
self.lat_count = 0
class Responder: class Responder:
"""Listens for TGEN packets on an interface and collects stats.""" def __init__(self, mode: str = 'log', listen_port: int = DEFAULT_LISTEN_PORT):
def __init__(self, mode: str = 'log'):
"""
Args:
mode: 'echo' to reflect packets back, 'log' to only record stats.
"""
self._mode = mode self._mode = mode
self._lock = threading.Lock() self._listen_port = listen_port
self._sniffer = None self._sockets = []
self._threads = []
self._workers = []
self._running = False self._running = False
self._stop_event = threading.Event()
# Stats
self._rx_packets = 0
self._rx_bytes = 0
self._latency_samples = [] # list of (latency_ms,)
self._seen_seqs = set()
self._last_seq = -1
self._out_of_order = 0
self._duplicates = 0
# ------------------------------------------------------------------
# Control
# ------------------------------------------------------------------
def start(self, interface: str = None): def start(self, interface: str = None):
"""Start sniffing for TGEN packets."""
if self._running: if self._running:
log.warning('Responder already running')
return return
bpf_filter = 'ip' # broad filter; we check magic in callback self._stop_event.clear()
kwargs = { n = NUM_WORKERS
'prn': self._handle_packet,
'store': False,
'filter': bpf_filter,
}
if interface:
kwargs['iface'] = interface
self._sniffer = AsyncSniffer(**kwargs) for i in range(n):
self._sniffer.start() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, RECV_BUF_SIZE)
except OSError:
pass
sock.settimeout(0.5)
sock.bind(('0.0.0.0', self._listen_port))
self._sockets.append(sock)
ws = _WorkerStats()
self._workers.append(ws)
t = threading.Thread(target=self._recv_loop, args=(sock, ws),
daemon=True, name=f'responder-rx-{i}')
self._threads.append(t)
t.start()
actual_buf = self._sockets[0].getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
self._running = True self._running = True
log.info('Responder started on interface=%s mode=%s', interface or 'all', self._mode) log.info('Responder started on port=%d mode=%s workers=%d rcvbuf=%d',
self._listen_port, self._mode, n, actual_buf)
def stop(self): def stop(self):
"""Stop sniffing.""" self._stop_event.set()
if self._sniffer and self._running: for t in self._threads:
self._sniffer.stop() if t.is_alive():
t.join(timeout=3)
for s in self._sockets:
try:
s.close()
except Exception:
pass
self._sockets.clear()
self._threads.clear()
self._workers.clear()
self._running = False self._running = False
log.info('Responder stopped') log.info('Responder stopped')
def is_running(self) -> bool: def is_running(self) -> bool:
return self._running return self._running
# ------------------------------------------------------------------
# Stats
# ------------------------------------------------------------------
def get_stats(self) -> dict: def get_stats(self) -> dict:
with self._lock: rx_packets = 0
rx_bytes = 0
out_of_order = 0
duplicates = 0
all_lat = []
for ws in self._workers:
rx_packets += ws.rx_packets
rx_bytes += ws.rx_bytes
out_of_order += ws.out_of_order
duplicates += ws.duplicates
n = min(ws.lat_count, len(ws.lat_buf))
if n > 0:
all_lat.extend(ws.lat_buf[:n] if ws.lat_count <= len(ws.lat_buf) else ws.lat_buf[:])
latency = {} latency = {}
if self._latency_samples: if all_lat:
vals = self._latency_samples avg = sum(all_lat) / len(all_lat)
mn = min(all_lat)
mx = max(all_lat)
jitter = 0.0
if len(all_lat) > 1:
jitter = sum(abs(all_lat[i] - all_lat[i-1]) for i in range(1, len(all_lat))) / (len(all_lat) - 1)
latency = { latency = {
'min_ms': round(min(vals), 3), 'min_ms': round(mn, 3),
'max_ms': round(max(vals), 3), 'max_ms': round(mx, 3),
'avg_ms': round(sum(vals) / len(vals), 3), 'avg_ms': round(avg, 3),
'jitter_ms': round( 'jitter_ms': round(jitter, 3),
sum(abs(vals[i] - vals[i - 1]) for i in range(1, len(vals))) / max(1, len(vals) - 1), 'samples': len(all_lat),
3
) if len(vals) > 1 else 0.0,
'samples': len(vals),
} }
return { return {
'rx_packets': self._rx_packets, 'rx_packets': rx_packets,
'rx_bytes': self._rx_bytes, 'rx_bytes': rx_bytes,
'out_of_order': self._out_of_order, 'out_of_order': out_of_order,
'duplicates': self._duplicates, 'duplicates': duplicates,
'latency': latency, 'latency': latency,
'running': self._running, 'running': self._running,
} }
def reset_stats(self): def reset_stats(self):
with self._lock: for ws in self._workers:
self._rx_packets = 0 ws.rx_packets = 0
self._rx_bytes = 0 ws.rx_bytes = 0
self._latency_samples = [] ws.last_seq = -1
self._seen_seqs.clear() ws.out_of_order = 0
self._last_seq = -1 ws.duplicates = 0
self._out_of_order = 0 ws.lat_idx = 0
self._duplicates = 0 ws.lat_count = 0
log.info('Responder stats reset') log.info('Responder stats reset')
# ------------------------------------------------------------------ def _recv_loop(self, sock, ws):
# Packet handling """Per-worker receive loop."""
# ------------------------------------------------------------------ echo = self._mode == 'echo'
recvfrom = sock.recvfrom
time_ns = time.time_ns
stop_is_set = self._stop_event.is_set
lat_buf = ws.lat_buf
lat_buf_len = len(lat_buf)
magic = MAGIC
def _handle_packet(self, pkt): while not stop_is_set():
"""Process a received packet, checking for TGEN magic bytes."""
if not pkt.haslayer(Raw):
return
payload = bytes(pkt[Raw].load)
parsed = parse_payload(payload)
if parsed is None:
return
seq, sender_ts_ns = parsed
rx_time_ns = time.time_ns()
pkt_len = len(bytes(pkt))
# Compute one-way latency (only meaningful if clocks are synced)
latency_ms = (rx_time_ns - sender_ts_ns) / 1_000_000
with self._lock:
self._rx_packets += 1
self._rx_bytes += pkt_len
# Duplicate detection
if seq in self._seen_seqs:
self._duplicates += 1
else:
self._seen_seqs.add(seq)
# Keep set bounded
if len(self._seen_seqs) > 100000:
# Remove oldest entries (approximate)
to_remove = sorted(self._seen_seqs)[:50000]
self._seen_seqs -= set(to_remove)
# Out-of-order detection
if seq < self._last_seq and seq not in self._seen_seqs:
self._out_of_order += 1
self._last_seq = seq
# Record latency (only if plausible: 0 < latency < 60s)
if 0 < latency_ms < 60000:
self._latency_samples.append(latency_ms)
if len(self._latency_samples) > 10000:
self._latency_samples = self._latency_samples[-5000:]
# Echo mode: swap and send back
if self._mode == 'echo' and pkt.haslayer(Ether) and pkt.haslayer(IP):
try: try:
echo_pkt = pkt.copy() data, addr = recvfrom(65535)
# Swap MACs except socket.timeout:
echo_pkt[Ether].src, echo_pkt[Ether].dst = pkt[Ether].dst, pkt[Ether].src continue
# Swap IPs except OSError:
echo_pkt[IP].src, echo_pkt[IP].dst = pkt[IP].dst, pkt[IP].src if stop_is_set():
# Append receive timestamp to payload break
rx_ts_bytes = struct.pack('!Q', rx_time_ns) raise
echo_pkt[Raw].load = payload + rx_ts_bytes
# Clear checksums so Scapy recalculates rx_ns = time_ns()
del echo_pkt[IP].chksum dlen = len(data)
if echo_pkt.haslayer('UDP'):
del echo_pkt['UDP'].chksum if dlen < HEADER_LEN or data[:4] != magic:
elif echo_pkt.haslayer('TCP'): continue
del echo_pkt['TCP'].chksum
send(echo_pkt[IP], verbose=0) seq = int.from_bytes(data[4:8], 'big')
except Exception as e: sender_ns = int.from_bytes(data[8:16], 'big')
log.debug('Echo send error: %s', e)
ws.rx_packets += 1
ws.rx_bytes += dlen
last = ws.last_seq
if seq == last:
ws.duplicates += 1
elif seq < last:
ws.out_of_order += 1
ws.last_seq = seq
lat_ms = (rx_ns - sender_ns) / 1_000_000
if 0 < lat_ms < 60000:
idx = ws.lat_idx
lat_buf[idx] = lat_ms
ws.lat_idx = (idx + 1) % lat_buf_len
ws.lat_count += 1
if echo:
try:
sock.sendto(data + struct.pack('!Q', rx_ns), addr)
except Exception:
pass

View File

@ -7,13 +7,17 @@ RFC 2544 test implementations:
""" """
import logging import logging
import socket
import struct
import threading import threading
import time import time
import urllib.request
import json
from typing import Dict, List, Optional from typing import Dict, List, Optional
from scapy.all import send, sr, conf, IP, ICMP from scapy.all import send, sr, conf, IP, ICMP
from engine.packet_builder import build_packet, parse_payload from engine.packet_builder import build_packet, parse_payload, MAGIC
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
conf.verb = 0 conf.verb = 0
@ -24,13 +28,14 @@ class _BaseTest:
def __init__(self, test_id: str, flow_config: dict, frame_sizes: List[int], def __init__(self, test_id: str, flow_config: dict, frame_sizes: List[int],
trial_duration: float = 60, max_rate_pps: int = 10000, trial_duration: float = 60, max_rate_pps: int = 10000,
acceptable_loss_pct: float = 0.0): acceptable_loss_pct: float = 0.0, responder_url: str = None):
self.test_id = test_id self.test_id = test_id
self.flow_config = dict(flow_config) self.flow_config = dict(flow_config)
self.frame_sizes = frame_sizes self.frame_sizes = frame_sizes
self.trial_duration = trial_duration self.trial_duration = trial_duration
self.max_rate_pps = max_rate_pps self.max_rate_pps = max_rate_pps
self.acceptable_loss_pct = acceptable_loss_pct self.acceptable_loss_pct = acceptable_loss_pct
self.responder_url = responder_url # e.g. "http://172.30.0.10:5053"
self.state = 'idle' # idle -> running -> complete/error self.state = 'idle' # idle -> running -> complete/error
self.results = {} self.results = {}
@ -38,6 +43,11 @@ class _BaseTest:
self.started_at = None self.started_at = None
self.completed_at = None self.completed_at = None
# Progress tracking
self._progress_msg = ''
self._current_frame_idx = 0
self._current_trial_tx = 0
self._thread: Optional[threading.Thread] = None self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._lock = threading.Lock() self._lock = threading.Lock()
@ -80,72 +90,155 @@ class _BaseTest:
def _is_stopped(self) -> bool: def _is_stopped(self) -> bool:
return self._stop_event.is_set() return self._stop_event.is_set()
def _responder_reset(self):
"""Reset responder stats before a trial."""
if not self.responder_url:
return
try:
req = urllib.request.Request(
f'{self.responder_url}/responder/reset', method='POST',
data=b'{}', headers={'Content-Type': 'application/json'})
urllib.request.urlopen(req, timeout=3)
except Exception as e:
log.warning('Responder reset failed: %s', e)
def _responder_stats(self) -> Optional[dict]:
"""Query responder for rx stats after a trial."""
if not self.responder_url:
return None
try:
req = urllib.request.Request(f'{self.responder_url}/responder/stats')
resp = urllib.request.urlopen(req, timeout=5)
return json.loads(resp.read())
except Exception as e:
log.warning('Responder stats query failed: %s', e)
return None
def _send_trial(self, frame_size: int, rate_pps: int, duration: float): def _send_trial(self, frame_size: int, rate_pps: int, duration: float):
"""Send packets at a given rate for a duration. Returns (tx_count, rx_count, latencies).""" """Send packets at a given rate for a duration. Returns (tx_count, rx_count, latencies)."""
flow = dict(self.flow_config) flow = dict(self.flow_config)
flow['frame_size'] = frame_size flow['frame_size'] = frame_size
protocol = flow.get('protocol', 'udp').lower()
# Reset responder counters before trial
self._responder_reset()
interval = 1.0 / rate_pps if rate_pps > 0 else 1.0
tx_count = 0 tx_count = 0
rx_count = 0 rx_count = 0
latencies = [] latencies = []
protocol = flow.get('protocol', 'udp').lower()
start = time.time() start = time.time()
seq = 0
if protocol == 'icmp':
# ICMP: use sr() to measure latency from responses
seq = 0
while (time.time() - start) < duration and not self._is_stopped(): while (time.time() - start) < duration and not self._is_stopped():
pkt = build_packet(flow, seq=seq) pkt = build_packet(flow, seq=seq)
seq += 1 seq += 1
if protocol == 'icmp':
# Use sr() for ICMP to get responses
answered, _ = sr(pkt[IP], timeout=1, verbose=0) answered, _ = sr(pkt[IP], timeout=1, verbose=0)
tx_count += 1 tx_count += 1
for sent_pkt, recv_pkt in answered: for sent_pkt, recv_pkt in answered:
rx_count += 1 rx_count += 1
rtt_ms = (recv_pkt.time - sent_pkt.sent_time) * 1000 rtt_ms = (recv_pkt.time - sent_pkt.sent_time) * 1000
latencies.append(rtt_ms) latencies.append(rtt_ms)
else:
send(pkt[IP], verbose=0)
tx_count += 1
# Rate limiting
elapsed = time.time() - start elapsed = time.time() - start
expected_sent = elapsed * rate_pps expected = elapsed * rate_pps
if tx_count > expected_sent: if tx_count > expected:
sleep_time = (tx_count - expected_sent) / rate_pps sleep_time = (tx_count - expected) / rate_pps
if sleep_time > 0: if sleep_time > 0:
self._stop_event.wait(min(sleep_time, 0.1)) self._stop_event.wait(min(sleep_time, 0.1))
else:
# UDP/TCP: high-performance raw socket path
dst_ip = flow['dst_ip']
pkt_template = build_packet(flow, seq=0)
ip_template = bytes(pkt_template[pkt_template.firstlayer().payload.__class__])
magic_offset = ip_template.find(MAGIC)
# Find and zero UDP checksum in template so receivers accept packets
# IP header length from IHL field (byte 0, low nibble) * 4
ip_ihl = (ip_template[0] & 0x0F) * 4
ip_proto = ip_template[9] # protocol field
udp_csum_offset = ip_ihl + 6 if ip_proto == 17 else -1 # 17 = UDP
raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
raw_sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
batch_size = max(1, min(rate_pps // 5, 500))
interval = batch_size / rate_pps if rate_pps > 0 else 1.0
seq = 0
try:
while (time.time() - start) < duration and not self._is_stopped():
batch_start = time.time()
for _ in range(batch_size):
pkt_bytes = bytearray(ip_template)
if magic_offset >= 0:
struct.pack_into('!I', pkt_bytes, magic_offset + 4, seq)
struct.pack_into('!Q', pkt_bytes, magic_offset + 8, time.time_ns())
pkt_bytes[10:12] = b'\x00\x00' # zero IP checksum
if udp_csum_offset > 0:
pkt_bytes[udp_csum_offset:udp_csum_offset + 2] = b'\x00\x00'
try:
raw_sock.sendto(bytes(pkt_bytes), (dst_ip, 0))
tx_count += 1
except Exception:
pass
seq += 1
batch_elapsed = time.time() - batch_start
sleep_time = interval - batch_elapsed
if sleep_time > 0:
self._stop_event.wait(sleep_time)
finally:
raw_sock.close()
# Query responder for actual rx stats (UDP/TCP path)
if protocol != 'icmp':
resp_stats = self._responder_stats()
if resp_stats and resp_stats.get('rx_packets', 0) > 0:
rx_count = resp_stats['rx_packets']
lat = resp_stats.get('latency', {})
if lat.get('samples', 0) > 0:
latencies = [lat['avg_ms']] # Use avg as representative
# For non-ICMP, we can't easily measure rx without a responder.
# rx_count stays 0 for UDP/TCP unless a responder is configured.
return tx_count, rx_count, latencies return tx_count, rx_count, latencies
def get_info(self) -> dict: def get_info(self) -> dict:
return { # Reverse-lookup the slug for this test class
type_slug = next((k for k, v in TEST_TYPES.items() if v is self.__class__), self.__class__.__name__)
info = {
'id': self.test_id,
'test_id': self.test_id, 'test_id': self.test_id,
'type': self.__class__.__name__, 'type': type_slug,
'state': self.state, 'state': self.state,
'results': self.results, 'results': self.results,
'error': self.error, 'error': self.error,
'frame_sizes': self.frame_sizes,
'started_at': self.started_at, 'started_at': self.started_at,
'completed_at': self.completed_at, 'completed_at': self.completed_at,
} }
if self.state == 'running':
info['progress'] = {
'frame_idx': self._current_frame_idx,
'total_frames': len(self.frame_sizes),
'message': self._progress_msg,
'completed_sizes': list(self.results.keys()),
}
return info
class ThroughputTest(_BaseTest): class ThroughputTest(_BaseTest):
"""Binary search for maximum throughput with acceptable loss.""" """Binary search for maximum throughput with acceptable loss."""
def _run(self): def _run(self):
for fs in self.frame_sizes: for idx, fs in enumerate(self.frame_sizes):
if self._is_stopped(): if self._is_stopped():
break break
self._current_frame_idx = idx
low = 0 low = 0
high = self.max_rate_pps high = self.max_rate_pps
best_rate = 0 best_rate = 0
convergence_threshold = max(1, int(self.max_rate_pps * 0.01)) convergence_threshold = max(1, int(self.max_rate_pps * 0.01))
step = 0
log.info('Throughput test: frame_size=%d, searching [%d, %d] pps', fs, low, high) log.info('Throughput test: frame_size=%d, searching [%d, %d] pps', fs, low, high)
@ -153,19 +246,17 @@ class ThroughputTest(_BaseTest):
mid = (low + high) // 2 mid = (low + high) // 2
if mid == 0: if mid == 0:
break break
step += 1
self._progress_msg = f'Frame {fs}B: trial {step}, testing {mid} pps [{low}-{high}]'
tx, rx, _ = self._send_trial(fs, mid, self.trial_duration) tx, rx, _ = self._send_trial(fs, mid, self.trial_duration)
if tx == 0: if tx == 0:
loss_pct = 100.0 loss_pct = 100.0
else: elif rx > 0:
# For ICMP we have rx; for UDP assume zero loss if no responder
protocol = self.flow_config.get('protocol', 'udp').lower()
if protocol == 'icmp':
loss_pct = ((tx - rx) / tx) * 100 loss_pct = ((tx - rx) / tx) * 100
else: else:
# Without responder, assume success (user should use responder for accurate test) loss_pct = 0.0 # No responder/ICMP — assume success
loss_pct = 0.0
log.info(' frame=%d rate=%d tx=%d rx=%d loss=%.2f%%', log.info(' frame=%d rate=%d tx=%d rx=%d loss=%.2f%%',
fs, mid, tx, rx, loss_pct) fs, mid, tx, rx, loss_pct)
@ -189,10 +280,12 @@ class LatencyTest(_BaseTest):
def _run(self): def _run(self):
rate = self.flow_config.get('rate_pps', 100) rate = self.flow_config.get('rate_pps', 100)
for fs in self.frame_sizes: for idx, fs in enumerate(self.frame_sizes):
if self._is_stopped(): if self._is_stopped():
break break
self._current_frame_idx = idx
self._progress_msg = f'Frame {fs}B: sending at {rate} pps for {self.trial_duration}s'
log.info('Latency test: frame_size=%d at %d pps for %ds', fs, rate, self.trial_duration) log.info('Latency test: frame_size=%d at %d pps for %ds', fs, rate, self.trial_duration)
_, _, latencies = self._send_trial(fs, rate, self.trial_duration) _, _, latencies = self._send_trial(fs, rate, self.trial_duration)
@ -227,10 +320,11 @@ class FrameLossTest(_BaseTest):
"""Measure frame loss at decreasing rates (100%, 90%, 80%, ...).""" """Measure frame loss at decreasing rates (100%, 90%, 80%, ...)."""
def _run(self): def _run(self):
for fs in self.frame_sizes: for idx, fs in enumerate(self.frame_sizes):
if self._is_stopped(): if self._is_stopped():
break break
self._current_frame_idx = idx
results_for_size = [] results_for_size = []
for pct in range(100, 0, -10): for pct in range(100, 0, -10):
if self._is_stopped(): if self._is_stopped():
@ -240,14 +334,16 @@ class FrameLossTest(_BaseTest):
if rate == 0: if rate == 0:
continue continue
self._progress_msg = f'Frame {fs}B: testing at {pct}% rate ({rate} pps)'
log.info('FrameLoss test: frame_size=%d rate=%d (%d%%)', fs, rate, pct) log.info('FrameLoss test: frame_size=%d rate=%d (%d%%)', fs, rate, pct)
tx, rx, _ = self._send_trial(fs, rate, self.trial_duration) tx, rx, _ = self._send_trial(fs, rate, self.trial_duration)
protocol = self.flow_config.get('protocol', 'udp').lower() if tx > 0 and rx > 0:
if tx > 0 and protocol == 'icmp':
loss_pct = ((tx - rx) / tx) * 100 loss_pct = ((tx - rx) / tx) * 100
elif tx > 0 and rx == 0:
loss_pct = 0.0 # No responder — cannot measure
else: else:
loss_pct = 0.0 # Cannot measure without responder for non-ICMP loss_pct = 100.0
results_for_size.append({ results_for_size.append({
'rate_pct': pct, 'rate_pct': pct,
@ -264,7 +360,7 @@ class BackToBackTest(_BaseTest):
"""Find maximum burst length with zero loss.""" """Find maximum burst length with zero loss."""
def _run(self): def _run(self):
for fs in self.frame_sizes: for idx, fs in enumerate(self.frame_sizes):
if self._is_stopped(): if self._is_stopped():
break break
@ -273,6 +369,7 @@ class BackToBackTest(_BaseTest):
best_burst = 0 best_burst = 0
convergence = max(1, high // 100) convergence = max(1, high // 100)
self._current_frame_idx = idx
log.info('BackToBack test: frame_size=%d searching burst [%d, %d]', fs, low, high) log.info('BackToBack test: frame_size=%d searching burst [%d, %d]', fs, low, high)
while (high - low) > convergence and not self._is_stopped(): while (high - low) > convergence and not self._is_stopped():
@ -299,10 +396,12 @@ class BackToBackTest(_BaseTest):
send(pkt[IP], verbose=0) send(pkt[IP], verbose=0)
tx_count += 1 tx_count += 1
if protocol == 'icmp' and tx_count > 0: if tx_count > 0 and rx_count > 0:
loss_pct = ((tx_count - rx_count) / tx_count) * 100 loss_pct = ((tx_count - rx_count) / tx_count) * 100
elif tx_count > 0:
loss_pct = 0.0 # No responder — cannot measure
else: else:
loss_pct = 0.0 # Can't measure without responder loss_pct = 100.0
log.info(' burst=%d tx=%d rx=%d loss=%.2f%%', mid, tx_count, rx_count, loss_pct) log.info(' burst=%d tx=%d rx=%d loss=%.2f%%', mid, tx_count, rx_count, loss_pct)

View File

@ -4,6 +4,8 @@ FlowSender - manages traffic generation with background threads per flow.
import logging import logging
import shutil import shutil
import socket
import struct
import threading import threading
import time import time
import urllib.request import urllib.request
@ -11,7 +13,7 @@ import json
from scapy.all import send, sendpfast, sr, conf from scapy.all import send, sendpfast, sr, conf
from engine.packet_builder import build_packet, stamp_payload from engine.packet_builder import build_packet, stamp_payload, MAGIC, HEADER_LEN
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -139,14 +141,113 @@ class FlowSender:
pkt_template = build_packet(flow, seq=0) pkt_template = build_packet(flow, seq=0)
pkt_bytes_len = len(bytes(pkt_template)) pkt_bytes_len = len(bytes(pkt_template))
# Calculate sleep interval: send in batches for efficiency
batch_size = max(1, min(rate_pps // 10, 100))
interval = batch_size / rate_pps if rate_pps > 0 else 1.0
seq = 0 seq = 0
start_time = time.time() start_time = time.time()
last_responder_poll = 0 last_responder_poll = 0
log.info('Flow %s: starting send loop at %d pps for %ds', flow_id[:8], rate_pps, duration) log.info('Flow %s: starting send loop at %d pps for %ds',
flow_id[:8], rate_pps, duration)
# Capture responder baseline so we report deltas, not cumulative totals
responder_baseline_rx = 0
responder_baseline_bytes = 0
if responder_url:
try:
base = self._fetch_responder(responder_url)
responder_baseline_rx = base.get('rx_packets', 0)
responder_baseline_bytes = base.get('rx_bytes', 0)
# Also reset responder so baseline is clean
self._reset_responder(responder_url)
responder_baseline_rx = 0
responder_baseline_bytes = 0
except Exception:
pass
raw_sock = None
try:
if use_icmp_sr:
self._send_loop_icmp(flow_id, flow, stop_event, rate_pps, duration)
return
# --- High-performance path: raw socket ---
dst_ip = flow['dst_ip']
# Build template as raw IP bytes (strip Ethernet layer)
ip_template = bytes(pkt_template[pkt_template.firstlayer().payload.__class__])
# Find where TGEN magic starts in the IP-layer bytes
magic_offset = ip_template.find(MAGIC)
# Find and zero UDP checksum offset in template
ip_ihl = (ip_template[0] & 0x0F) * 4
ip_proto = ip_template[9]
udp_csum_offset = ip_ihl + 6 if ip_proto == 17 else -1 # 17 = UDP
raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_RAW)
raw_sock.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
# Adaptive batching: send bursts then sleep to hit target rate
batch_size = max(1, min(rate_pps // 5, 500))
interval = batch_size / rate_pps if rate_pps > 0 else 1.0
while not stop_event.is_set():
elapsed = time.time() - start_time
if duration and elapsed >= duration:
break
batch_start = time.time()
sent_this_batch = 0
for _ in range(batch_size):
pkt_bytes = bytearray(ip_template)
if magic_offset >= 0:
struct.pack_into('!I', pkt_bytes, magic_offset + 4, seq)
struct.pack_into('!Q', pkt_bytes, magic_offset + 8, time.time_ns())
pkt_bytes[10:12] = b'\x00\x00' # zero IP checksum
if udp_csum_offset > 0:
pkt_bytes[udp_csum_offset:udp_csum_offset + 2] = b'\x00\x00'
try:
raw_sock.sendto(bytes(pkt_bytes), (dst_ip, 0))
sent_this_batch += 1
except Exception:
pass
seq += 1
with self._lock:
stats = self._stats.get(flow_id)
if stats:
stats['tx_packets'] += sent_this_batch
stats['tx_bytes'] += pkt_bytes_len * sent_this_batch
# Poll responder for rx stats periodically
if responder_url and (time.time() - last_responder_poll) >= 1.0:
self._poll_responder(flow_id, responder_url,
responder_baseline_rx, responder_baseline_bytes)
last_responder_poll = time.time()
# Precise rate limiting: sleep remaining time for this batch
batch_elapsed = time.time() - batch_start
sleep_time = interval - batch_elapsed
if sleep_time > 0:
stop_event.wait(sleep_time)
except Exception as e:
log.error('Flow %s: send loop error: %s', flow_id[:8], e)
finally:
if raw_sock:
raw_sock.close()
with self._lock:
if flow_id in self._flows:
self._flows[flow_id]['state'] = 'stopped'
# Final responder poll
if responder_url:
self._poll_responder(flow_id, responder_url,
responder_baseline_rx, responder_baseline_bytes)
log.info('Flow %s: send loop finished. seq=%d', flow_id[:8], seq)
def _send_loop_icmp(self, flow_id, flow, stop_event, rate_pps, duration):
"""ICMP mode: use sr() to measure latency from router responses."""
pkt_template = build_packet(flow, seq=0)
pkt_bytes_len = len(bytes(pkt_template))
seq = 0
start_time = time.time()
try: try:
while not stop_event.is_set(): while not stop_event.is_set():
@ -154,8 +255,6 @@ class FlowSender:
if duration and elapsed >= duration: if duration and elapsed >= duration:
break break
if use_icmp_sr:
# ICMP mode: use sr() to measure latency from responses
pkt = build_packet(flow, seq=seq) pkt = build_packet(flow, seq=seq)
answered, _ = sr(pkt[pkt.firstlayer().payload.__class__], answered, _ = sr(pkt[pkt.firstlayer().payload.__class__],
timeout=1, verbose=0) timeout=1, verbose=0)
@ -169,75 +268,47 @@ class FlowSender:
stats['rx_packets'] += 1 stats['rx_packets'] += 1
stats['rx_bytes'] += len(bytes(recv_pkt)) stats['rx_bytes'] += len(bytes(recv_pkt))
stats['latency_samples'].append(rtt_ms) stats['latency_samples'].append(rtt_ms)
# Keep only last 1000 samples
if len(stats['latency_samples']) > 1000: if len(stats['latency_samples']) > 1000:
stats['latency_samples'] = stats['latency_samples'][-1000:] stats['latency_samples'] = stats['latency_samples'][-1000:]
seq += 1 seq += 1
# Rate limit for ICMP
sleep_time = (1.0 / rate_pps) - (time.time() - start_time - elapsed) sleep_time = (1.0 / rate_pps) - (time.time() - start_time - elapsed)
if sleep_time > 0: if sleep_time > 0:
stop_event.wait(sleep_time) stop_event.wait(sleep_time)
else:
# UDP/TCP mode: send batches
packets = []
for _ in range(batch_size):
pkt = build_packet(flow, seq=seq)
packets.append(pkt)
seq += 1
try:
if HAS_TCPREPLAY and rate_pps >= 1000:
sendpfast(packets, pps=rate_pps, loop=0)
else:
for p in packets:
send(p[p.firstlayer().payload.__class__], verbose=0)
except Exception as e: except Exception as e:
# Fallback: basic send log.error('Flow %s: ICMP send error: %s', flow_id[:8], e)
log.debug('Send error (falling back): %s', e)
for p in packets:
try:
send(p[p.firstlayer().payload.__class__], verbose=0)
except Exception:
pass
with self._lock:
stats = self._stats.get(flow_id)
if stats:
stats['tx_packets'] += len(packets)
stats['tx_bytes'] += pkt_bytes_len * len(packets)
# Poll responder for rx stats periodically
if responder_url and (time.time() - last_responder_poll) >= 2.0:
self._poll_responder(flow_id, responder_url)
last_responder_poll = time.time()
# Rate limit
stop_event.wait(interval)
except Exception as e:
log.error('Flow %s: send loop error: %s', flow_id[:8], e)
finally: finally:
with self._lock: with self._lock:
if flow_id in self._flows: if flow_id in self._flows:
self._flows[flow_id]['state'] = 'stopped' self._flows[flow_id]['state'] = 'stopped'
# Final responder poll
if responder_url:
self._poll_responder(flow_id, responder_url)
log.info('Flow %s: send loop finished. seq=%d', flow_id[:8], seq)
def _poll_responder(self, flow_id: str, responder_url: str): def _fetch_responder(self, responder_url: str) -> dict:
"""Poll a responder's /responder/stats endpoint for rx metrics.""" """Fetch raw stats from the responder."""
try:
url = responder_url.rstrip('/') + '/responder/stats' url = responder_url.rstrip('/') + '/responder/stats'
req = urllib.request.Request(url, method='GET') req = urllib.request.Request(url, method='GET')
req.add_header('Accept', 'application/json') req.add_header('Accept', 'application/json')
with urllib.request.urlopen(req, timeout=2) as resp: with urllib.request.urlopen(req, timeout=2) as resp:
data = json.loads(resp.read().decode()) return json.loads(resp.read().decode())
def _reset_responder(self, responder_url: str):
"""Reset responder counters."""
url = responder_url.rstrip('/') + '/responder/reset'
req = urllib.request.Request(url, method='POST')
req.add_header('Content-Type', 'application/json')
with urllib.request.urlopen(req, timeout=2) as resp:
resp.read()
def _poll_responder(self, flow_id: str, responder_url: str,
baseline_rx: int = 0, baseline_bytes: int = 0):
"""Poll a responder's /responder/stats endpoint for rx metrics."""
try:
data = self._fetch_responder(responder_url)
rx_pkts = data.get('rx_packets', 0) - baseline_rx
rx_bytes = data.get('rx_bytes', 0) - baseline_bytes
with self._lock: with self._lock:
stats = self._stats.get(flow_id) stats = self._stats.get(flow_id)
if stats: if stats:
stats['rx_packets'] = data.get('rx_packets', 0) stats['rx_packets'] = max(0, rx_pkts)
stats['rx_bytes'] = data.get('rx_bytes', 0) stats['rx_bytes'] = max(0, rx_bytes)
lat = data.get('latency', {}) lat = data.get('latency', {})
if lat.get('avg_ms') is not None: if lat.get('avg_ms') is not None:
stats['latency_samples'].append(lat['avg_ms']) stats['latency_samples'].append(lat['avg_ms'])

View File

@ -56,9 +56,14 @@ class StatsCollector:
else: else:
tx_pps = rx_pps = tx_mbps = rx_mbps = 0.0 tx_pps = rx_pps = tx_mbps = rx_mbps = 0.0
# Loss calculation # Loss calculation: use rate-based when actively sending (avoids
# poll-lag artifacts), cumulative when flow has stopped
loss_pct = 0.0 loss_pct = 0.0
if tx_packets > 0 and rx_packets > 0: if prev is not None and tx_pps > 0 and rx_pps > 0:
# Rate-based: compare instantaneous tx/rx rates
loss_pct = max(0.0, ((tx_pps - rx_pps) / tx_pps) * 100)
elif tx_packets > 0 and rx_packets > 0 and tx_pps == 0:
# Flow stopped: use cumulative counters (final accurate value)
loss_pct = max(0.0, ((tx_packets - rx_packets) / tx_packets) * 100) loss_pct = max(0.0, ((tx_packets - rx_packets) / tx_packets) * 100)
sample = { sample = {

View File

@ -78,6 +78,7 @@ _tests_lock = threading.Lock()
_responder = None # Responder instance _responder = None # Responder instance
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Helper # Helper
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -86,6 +87,7 @@ def _now_iso():
return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
def _flow_response(flow_id: str) -> dict: def _flow_response(flow_id: str) -> dict:
"""Build a serializable flow dict.""" """Build a serializable flow dict."""
with _flows_lock: with _flows_lock:
@ -94,8 +96,15 @@ def _flow_response(flow_id: str) -> dict:
return None return None
result = dict(meta) result = dict(meta)
if _sender: if _sender:
result['is_running'] = _sender.is_running(flow_id) running = _sender.is_running(flow_id)
result['is_running'] = running
result['stats'] = _sender.get_stats(flow_id) result['stats'] = _sender.get_stats(flow_id)
# Sync state: if sender thread finished but meta still says running
if not running and result.get('state') == 'running':
with _flows_lock:
if flow_id in _flows_meta:
_flows_meta[flow_id]['state'] = 'stopped'
result['state'] = 'stopped'
return result return result
@ -148,6 +157,114 @@ def get_mode():
return jsonify({'mode': MODE}) return jsonify({'mode': MODE})
@app.route('/mode', methods=['POST'])
def set_mode():
global MODE, _sender, _stats_collector, _responder
data = request.get_json(force=True)
new_mode = data.get('mode', '').lower()
if new_mode not in ('sender', 'responder'):
return jsonify({'error': 'mode must be "sender" or "responder"'}), 400
if new_mode == MODE:
return jsonify({'mode': MODE, 'changed': False})
listen_iface = os.environ.get('TRAFFIC_GEN_INTERFACE', None)
responder_sub_mode = os.environ.get('TRAFFIC_GEN_RESPONDER_MODE', 'log')
# Tear down current mode
if MODE == 'sender':
# Stop all running flows
if _sender:
for fid in list(_sender.get_all_flows().keys()):
_sender.stop(fid)
with _flows_lock:
_flows_meta.clear()
with _tests_lock:
for t in _tests.values():
if t.state == 'running':
t.stop()
_tests.clear()
elif MODE == 'responder':
if _responder:
_responder.stop()
_responder = None
# Start new mode
if new_mode == 'sender':
from engine.sender import FlowSender
from engine.stats import StatsCollector
_sender = FlowSender()
_stats_collector = StatsCollector()
_responder = None
elif new_mode == 'responder':
from engine.responder import Responder
_sender = None
_stats_collector = None
_responder = Responder(mode=responder_sub_mode)
_responder.start(interface=listen_iface)
MODE = new_mode
log.info('Mode switched to %s', MODE)
return jsonify({'mode': MODE, 'changed': True})
# ---------------------------------------------------------------------------
# Quick Ping (works in any mode)
# ---------------------------------------------------------------------------
@app.route('/ping', methods=['POST'])
def quick_ping():
"""Send ICMP pings to a target and return results."""
import subprocess
data = request.get_json(force=True)
target = data.get('target', '').strip()
count = min(int(data.get('count', 5)), 20)
if not target:
return jsonify({'error': 'target is required'}), 400
try:
result = subprocess.run(
['ping', '-c', str(count), '-W', '2', target],
capture_output=True, text=True, timeout=count * 3 + 5
)
output = result.stdout + result.stderr
# Parse ping output
lines = output.strip().split('\n')
replies = []
for line in lines:
if 'time=' in line:
try:
time_ms = float(line.split('time=')[1].split(' ')[0])
replies.append(time_ms)
except (IndexError, ValueError):
pass
stats = {}
if replies:
stats = {
'min_ms': round(min(replies), 2),
'avg_ms': round(sum(replies) / len(replies), 2),
'max_ms': round(max(replies), 2),
}
return jsonify({
'target': target,
'sent': count,
'received': len(replies),
'loss_pct': round(((count - len(replies)) / count) * 100, 1),
'replies': replies,
'stats': stats,
'reachable': len(replies) > 0,
})
except subprocess.TimeoutExpired:
return jsonify({'target': target, 'error': 'Ping timed out', 'reachable': False})
except Exception as e:
return jsonify({'target': target, 'error': str(e), 'reachable': False})
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Sender-mode: Flow endpoints # Sender-mode: Flow endpoints
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -193,7 +310,7 @@ def create_flow():
'dscp': int(data.get('dscp', 0)), 'dscp': int(data.get('dscp', 0)),
'vlan_id': data.get('vlan_id'), 'vlan_id': data.get('vlan_id'),
'state': 'idle', 'state': 'idle',
'responder_url': data.get('responder_url'), 'responder_url': data.get('responder_url') or os.environ.get('RESPONDER_URL') or None,
'created_at': _now_iso(), 'created_at': _now_iso(),
} }
@ -335,34 +452,51 @@ def create_test():
from engine.rfc2544 import create_test as _create_test from engine.rfc2544 import create_test as _create_test
data = request.get_json(force=True) data = request.get_json(force=True)
flow_id = data.get('flow_id')
test_type = data.get('type') test_type = data.get('type')
if not flow_id or not test_type: if not test_type:
return jsonify({'error': 'flow_id and type are required'}), 400 return jsonify({'error': 'type is required'}), 400
# Accept either flow_config directly or flow_id to look up
flow_config = data.get('flow_config')
flow_id = data.get('flow_id')
if flow_config:
# Direct flow config provided — no flow_id needed
if not flow_config.get('dst_ip'):
return jsonify({'error': 'flow_config.dst_ip is required'}), 400
flow_config.setdefault('src_ip', 'auto')
flow_config.setdefault('protocol', 'udp')
flow_config.setdefault('src_port', 50000)
flow_config.setdefault('dst_port', 5001)
elif flow_id:
with _flows_lock: with _flows_lock:
flow_meta = _flows_meta.get(flow_id) flow_meta = _flows_meta.get(flow_id)
if flow_meta is None: if flow_meta is None:
return jsonify({'error': 'Flow not found'}), 404 return jsonify({'error': 'Flow not found'}), 404
flow_config = dict(flow_meta)
else:
return jsonify({'error': 'flow_config or flow_id is required'}), 400
test_id = str(uuid.uuid4()) test_id = str(uuid.uuid4())
kwargs = { kwargs = {
'frame_sizes': data.get('frame_sizes', [64, 512, 1518]), 'frame_sizes': data.get('frame_sizes', [64, 512, 1518]),
'trial_duration': float(data.get('trial_duration', 60)), 'trial_duration': float(data.get('trial_duration', 60)),
'max_rate_pps': int(data.get('max_rate_pps', flow_meta.get('rate_pps', 10000))), 'max_rate_pps': int(data.get('max_rate_pps', flow_config.get('rate_pps', 10000))),
'acceptable_loss_pct': float(data.get('acceptable_loss_pct', 0.0)), 'acceptable_loss_pct': float(data.get('acceptable_loss_pct', 0.0)),
'responder_url': data.get('responder_url') or os.environ.get('RESPONDER_URL') or None,
} }
try: try:
test = _create_test(test_id, test_type, dict(flow_meta), **kwargs) test = _create_test(test_id, test_type, flow_config, **kwargs)
except ValueError as e: except ValueError as e:
return jsonify({'error': str(e)}), 400 return jsonify({'error': str(e)}), 400
with _tests_lock: with _tests_lock:
_tests[test_id] = test _tests[test_id] = test
log.info('Created test %s (type=%s, flow=%s)', test_id[:8], test_type, flow_id[:8]) log.info('Created test %s (type=%s, dst=%s)', test_id[:8], test_type,
flow_config.get('dst_ip', '?'))
return jsonify(test.get_info()), 201 return jsonify(test.get_info()), 201
@ -483,7 +617,7 @@ def load_preset(name):
'dscp': int(flow_data.get('dscp', 0)), 'dscp': int(flow_data.get('dscp', 0)),
'vlan_id': flow_data.get('vlan_id'), 'vlan_id': flow_data.get('vlan_id'),
'state': 'idle', 'state': 'idle',
'responder_url': flow_data.get('responder_url'), 'responder_url': flow_data.get('responder_url') or os.environ.get('RESPONDER_URL') or None,
'created_at': _now_iso(), 'created_at': _now_iso(),
} }