791 lines
26 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Traffic Generator API Server
Scapy-based traffic generator with a Flask API. Runs in two modes
controlled by TRAFFIC_GEN_MODE env var:
- sender (default): generates traffic, runs RFC 2544 tests
- responder: listens for test packets, echoes/timestamps, reports rx stats
API endpoints:
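GET /healthz - health + active flows/tests count
GET /interfaces - list available network interfaces
GET /mode - current mode
POST /mode - switch between sender and responder mode at runtime
POST /ping - quick ICMP ping to a target (works in any mode)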
Sender mode:
GET /flows - list all flows
POST /flows - create flow
GET /flows/<id> - get flow details + stats
PUT /flows/<id> - update flow (only if idle)
DELETE /flows/<id> - delete flow
POST /flows/<id>/start - start sending
POST /flows/<id>/stop - stop sending
GET /flows/<id>/stats - real-time stats
GET /tests - list all tests
POST /tests - create RFC 2544 test
GET /tests/<id> - test details + results
POST /tests/<id>/start - start test
POST /tests/<id>/stop - abort test
GET /tests/<id>/results - exportable results
GET /presets - list presets
POST /presets/<name> - create flow/test from preset
GET /stats/history - historical stats for all flows
Responder mode:
GET /responder/stats - receive statistics
POST /responder/reset - reset receive counters
"""
import sys
import os
import json
import threading
import time
import logging
import uuid
import psutil
from flask import Flask, request, jsonify
# --- logging to stderr so stdout stays clean ---
# Root logger configuration: INFO and above, each record tagged with [TGEN].
logging.basicConfig(
stream=sys.stderr,
level=logging.INFO,
format='%(asctime)s [TGEN] %(levelname)s %(message)s',
)
# Module-level logger used by the handlers and loops in this file.
log = logging.getLogger(__name__)
# Flask application on which every @app.route handler below is registered.
app = Flask(__name__)
# ---------------------------------------------------------------------------
# Global state (set in main())
# ---------------------------------------------------------------------------
# Active mode. main() overwrites it from TRAFFIC_GEN_MODE and POST /mode
# changes it at runtime.
MODE = 'sender' # or 'responder'
# Sender-mode globals
_sender = None # FlowSender instance (set to None when switching to responder mode)
_stats_collector = None # StatsCollector instance (set to None when switching to responder mode)
_flows_meta = {} # flow_id -> metadata dict (includes config)
_flows_lock = threading.Lock() # guards _flows_meta; plain Lock, so it must not be re-acquired while held
_tests = {} # test_id -> RFC 2544 test instance
_tests_lock = threading.Lock() # guards _tests
# Responder-mode globals
_responder = None # Responder instance (set to None when switching to sender mode)
# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------
def _now_iso():
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    utc_now = time.gmtime()
    return time.strftime('%Y-%m-%dT%H:%M:%SZ', utc_now)
def _flow_response(flow_id: str) -> dict:
    """Assemble the serializable view of a single flow.

    The stored metadata is copied and, when a sender object exists, the live
    ``is_running`` flag and ``stats`` counters reported by the sender are
    added. If the metadata still says ``running`` while the sender reports the
    flow is not running, the stored state and the returned copy are both set
    to ``stopped``.

    Returns ``None`` (despite the annotation) when ``flow_id`` is unknown.
    """
    with _flows_lock:
        stored = _flows_meta.get(flow_id)
        if stored is None:
            return None
        view = dict(stored)

    if not _sender:
        return view

    alive = _sender.is_running(flow_id)
    view['is_running'] = alive
    view['stats'] = _sender.get_stats(flow_id)

    # Sync state: the sender thread finished but the metadata still says running.
    if (not alive) and view.get('state') == 'running':
        # _flows_lock is a plain Lock, so it is taken again only now that the
        # first critical section has been left.
        with _flows_lock:
            entry = _flows_meta.get(flow_id)
            if entry is not None:
                entry['state'] = 'stopped'
        view['state'] = 'stopped'
    return view
# ---------------------------------------------------------------------------
# Common endpoints
# ---------------------------------------------------------------------------
@app.route('/healthz')
def health():
    """Report liveness, the current mode and the number of running flows/tests.

    Both counts are 0 outside sender mode. In responder mode, when a responder
    object exists, its running flag is included as ``responder_running``.
    """
    active_flows = 0
    with _flows_lock:
        if MODE == 'sender':
            active_flows = len([
                fid for fid in _flows_meta
                if _sender and _sender.is_running(fid)
            ])

    active_tests = 0
    with _tests_lock:
        if MODE == 'sender':
            active_tests = len([
                test for test in _tests.values() if test.state == 'running'
            ])

    payload = {
        'status': 'ok',
        'mode': MODE,
        'active_flows': active_flows,
        'active_tests': active_tests,
    }
    if MODE == 'responder' and _responder:
        payload['responder_running'] = _responder.is_running()
    return jsonify(payload)
@app.route('/interfaces')
def list_interfaces():
    """List network interfaces with their IPv4 address, MAC address and link state.

    Returns:
        JSON ``{'interfaces': [{'name', 'ip', 'mac', 'is_up'}, ...]}``.
        ``ip`` and ``mac`` stay ``None`` when psutil reports no such address
        for the interface; ``is_up`` stays ``False`` when psutil reports no
        stats entry for it. If an interface has several IPv4 addresses, the
        last one reported wins.
    """
    ifaces = []
    addrs = psutil.net_if_addrs()
    stats = psutil.net_if_stats()
    for name, addr_list in addrs.items():
        info = {'name': name, 'ip': None, 'mac': None, 'is_up': False}
        for addr in addr_list:
            if addr.family.name == 'AF_INET':
                info['ip'] = addr.address
            elif addr.family == psutil.AF_LINK:
                # psutil.AF_LINK is psutil's platform-independent marker for a
                # MAC address entry; the family name 'AF_PACKET' only appears
                # on Linux.
                info['mac'] = addr.address
        if name in stats:
            info['is_up'] = stats[name].isup
        ifaces.append(info)
    return jsonify({'interfaces': ifaces})
@app.route('/mode')
def get_mode():
    """Return the mode the server is currently in."""
    current = {'mode': MODE}
    return jsonify(current)
@app.route('/mode', methods=['POST'])
def set_mode():
    """Switch between sender and responder mode at runtime.

    Request JSON: ``{"mode": "sender" | "responder"}`` (case-insensitive).

    The current mode is torn down first: in sender mode every flow known to
    the sender is stopped, flow metadata is cleared, running tests are stopped
    and the test table is cleared; in responder mode the responder is stopped.
    The engine objects for the new mode are then created. The responder's
    interface and sub-mode come from TRAFFIC_GEN_INTERFACE and
    TRAFFIC_GEN_RESPONDER_MODE.

    Returns:
        ``{'mode': ..., 'changed': bool}`` on success, or a 400 error when the
        body is not a JSON object or names an unknown mode.
    """
    global MODE, _sender, _stats_collector, _responder
    data = request.get_json(force=True)
    # A JSON body that is not an object (null, list, number, ...) has no
    # .get(); reject it here instead of failing with a 500.
    if not isinstance(data, dict):
        return jsonify({'error': 'mode must be "sender" or "responder"'}), 400
    # str() keeps a non-string value such as {"mode": 1} on the 400 path.
    new_mode = str(data.get('mode', '')).lower()
    if new_mode not in ('sender', 'responder'):
        return jsonify({'error': 'mode must be "sender" or "responder"'}), 400
    if new_mode == MODE:
        return jsonify({'mode': MODE, 'changed': False})
    listen_iface = os.environ.get('TRAFFIC_GEN_INTERFACE', None)
    responder_sub_mode = os.environ.get('TRAFFIC_GEN_RESPONDER_MODE', 'log')
    # Tear down current mode
    if MODE == 'sender':
        # Stop all running flows
        if _sender:
            for fid in list(_sender.get_all_flows().keys()):
                _sender.stop(fid)
        with _flows_lock:
            _flows_meta.clear()
        with _tests_lock:
            for t in _tests.values():
                if t.state == 'running':
                    t.stop()
            _tests.clear()
    elif MODE == 'responder':
        if _responder:
            _responder.stop()
        _responder = None
    # Start new mode
    if new_mode == 'sender':
        from engine.sender import FlowSender
        from engine.stats import StatsCollector
        _sender = FlowSender()
        _stats_collector = StatsCollector()
        _responder = None
        # main() starts the stats loop only when the process boots in sender
        # mode. Start one here if none is alive so that a responder -> sender
        # switch still gets per-flow stats recorded. The thread is a daemon
        # and ends with the process.
        loop_alive = any(
            t.name == 'stats-loop' and t.is_alive()
            for t in threading.enumerate()
        )
        if not loop_alive:
            threading.Thread(
                target=_stats_loop, args=(threading.Event(),),
                daemon=True, name='stats-loop',
            ).start()
    elif new_mode == 'responder':
        from engine.responder import Responder
        _sender = None
        _stats_collector = None
        _responder = Responder(mode=responder_sub_mode)
        _responder.start(interface=listen_iface)
    MODE = new_mode
    log.info('Mode switched to %s', MODE)
    return jsonify({'mode': MODE, 'changed': True})
# ---------------------------------------------------------------------------
# Quick Ping (works in any mode)
# ---------------------------------------------------------------------------
@app.route('/ping', methods=['POST'])
def quick_ping():
    """Send ICMP pings to a target and return results.

    Request JSON:
        target: host name or IP address to ping (required).
        count:  number of echo requests; default 5, clamped to 1..20.

    Returns:
        JSON with ``target``, ``sent``, ``received``, ``loss_pct``, the
        individual ``replies`` (round-trip times parsed from ``time=`` in the
        ping output), min/avg/max ``stats`` and ``reachable``. Invalid input
        yields a 400. A timeout or any other failure while running ``ping``
        yields ``{'target', 'error', 'reachable': False}``.
    """
    import subprocess
    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'target is required'}), 400
    raw_target = data.get('target', '')
    target = raw_target.strip() if isinstance(raw_target, str) else ''
    if not target:
        return jsonify({'error': 'target is required'}), 400
    # The target is handed to ping as a command-line argument; a leading '-'
    # would be parsed as a ping option instead of a host.
    if target.startswith('-'):
        return jsonify({'error': 'target must be a host name or IP address'}), 400
    try:
        count = int(data.get('count', 5))
    except (TypeError, ValueError):
        return jsonify({'error': 'count must be an integer'}), 400
    # Clamp to 1..20: zero or a negative count would divide by zero in the
    # loss calculation below.
    count = max(1, min(count, 20))
    try:
        result = subprocess.run(
            ['ping', '-c', str(count), '-W', '2', target],
            capture_output=True, text=True, timeout=count * 3 + 5
        )
        output = result.stdout + result.stderr
        # Parse ping output: every reply line carries "time=<rtt> ms".
        replies = []
        for line in output.strip().split('\n'):
            if 'time=' in line:
                try:
                    time_ms = float(line.split('time=')[1].split(' ')[0])
                    replies.append(time_ms)
                except (IndexError, ValueError):
                    # Line mentions time= but not in the expected format; skip it.
                    pass
        stats = {}
        if replies:
            stats = {
                'min_ms': round(min(replies), 2),
                'avg_ms': round(sum(replies) / len(replies), 2),
                'max_ms': round(max(replies), 2),
            }
        return jsonify({
            'target': target,
            'sent': count,
            'received': len(replies),
            'loss_pct': round(((count - len(replies)) / count) * 100, 1),
            'replies': replies,
            'stats': stats,
            'reachable': len(replies) > 0,
        })
    except subprocess.TimeoutExpired:
        return jsonify({'target': target, 'error': 'Ping timed out', 'reachable': False})
    except Exception as e:
        # Request boundary: report any other failure (e.g. ping binary missing)
        # to the caller rather than raising.
        return jsonify({'target': target, 'error': str(e), 'reachable': False})
# ---------------------------------------------------------------------------
# Sender-mode: Flow endpoints
# ---------------------------------------------------------------------------
@app.route('/flows', methods=['GET'])
def list_flows():
    """Return every known flow together with its live state and counters."""
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    # Snapshot the ids under the lock; _flow_response acquires the lock itself.
    with _flows_lock:
        known_ids = tuple(_flows_meta)
    flows = list(map(_flow_response, known_ids))
    return jsonify({'count': len(flows), 'flows': flows})
@app.route('/flows', methods=['POST'])
def create_flow():
    """Create a new idle flow from the JSON request body.

    Required keys: ``dst_ip`` and ``protocol`` (udp, tcp or icmp,
    case-insensitive). Optional keys: name, src_mac, dst_mac, src_ip,
    src_port, dst_port, frame_size (512), rate_pps (1000), duration (30),
    dscp (0), vlan_id and responder_url (falls back to the RESPONDER_URL
    environment variable).

    Returns:
        201 with the flow view on success; 400 when not in sender mode, when
        the body is not a JSON object, when a required key is missing, when
        the protocol is unsupported or when a numeric field is not an integer.
    """
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'request body must be a JSON object'}), 400
    # Validate required fields
    if 'dst_ip' not in data:
        return jsonify({'error': 'dst_ip is required'}), 400
    if 'protocol' not in data:
        return jsonify({'error': 'protocol is required'}), 400
    protocol = data['protocol']
    # A non-string protocol (e.g. a number) has no .lower(); treat it as invalid.
    if not isinstance(protocol, str) or protocol.lower() not in ('udp', 'tcp', 'icmp'):
        return jsonify({'error': 'protocol must be udp, tcp, or icmp'}), 400
    # Convert numeric fields up front so bad input is a 400, not a 500.
    try:
        frame_size = int(data.get('frame_size', 512))
        rate_pps = int(data.get('rate_pps', 1000))
        duration = int(data.get('duration', 30))
        dscp = int(data.get('dscp', 0))
    except (TypeError, ValueError):
        return jsonify(
            {'error': 'frame_size, rate_pps, duration and dscp must be integers'}
        ), 400
    flow_id = str(uuid.uuid4())
    flow_config = {
        'id': flow_id,
        'name': data.get('name', f'flow-{flow_id[:8]}'),
        'src_mac': data.get('src_mac', 'auto'),
        'dst_mac': data.get('dst_mac'),
        'src_ip': data.get('src_ip'),
        'dst_ip': data['dst_ip'],
        'protocol': protocol.lower(),
        'src_port': data.get('src_port'),
        'dst_port': data.get('dst_port'),
        'frame_size': frame_size,
        'rate_pps': rate_pps,
        'duration': duration,
        'dscp': dscp,
        'vlan_id': data.get('vlan_id'),
        'state': 'idle',
        'responder_url': data.get('responder_url') or os.environ.get('RESPONDER_URL') or None,
        'created_at': _now_iso(),
    }
    with _flows_lock:
        _flows_meta[flow_id] = flow_config
    # The sender receives its own copy of the configuration.
    _sender.add_flow(flow_id, dict(flow_config))
    # 'src_ip' is always present as a key (possibly None), so a .get() default
    # would never apply; use `or` to log 'auto' when no source IP was given.
    log.info('Created flow %s (%s -> %s, %s)',
             flow_id[:8], flow_config['src_ip'] or 'auto',
             flow_config['dst_ip'], flow_config['protocol'])
    return jsonify(_flow_response(flow_id)), 201
@app.route('/flows/<flow_id>', methods=['GET'])
def get_flow(flow_id):
    """Return one flow's view, or 404 when the id is unknown."""
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    flow = _flow_response(flow_id)
    if flow is not None:
        return jsonify(flow)
    return jsonify({'error': 'Flow not found'}), 404
@app.route('/flows/<flow_id>', methods=['PUT'])
def update_flow(flow_id):
    """Update the configuration of a flow that is not running.

    Only the keys listed in ``updatable`` are taken from the JSON body. The
    protocol is validated and lowercased, and frame_size, rate_pps, duration
    and dscp are converted to ``int``, matching what flow creation stores.

    Returns:
        The updated flow view; 400 for sender-mode/body/value errors, 404 for
        an unknown flow, 409 when the stored state is ``running``.
    """
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    updatable = [
        'name', 'src_mac', 'dst_mac', 'src_ip', 'dst_ip', 'protocol',
        'src_port', 'dst_port', 'frame_size', 'rate_pps', 'duration',
        'dscp', 'vlan_id', 'responder_url',
    ]
    int_fields = ('frame_size', 'rate_pps', 'duration', 'dscp')
    # The existence/state check and the write happen under one lock
    # acquisition, so a concurrent delete or start cannot slip in between.
    with _flows_lock:
        meta = _flows_meta.get(flow_id)
        if meta is None:
            return jsonify({'error': 'Flow not found'}), 404
        if meta.get('state') == 'running':
            return jsonify({'error': 'Cannot update a running flow'}), 409
        data = request.get_json(force=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'request body must be a JSON object'}), 400
        changes = {key: data[key] for key in updatable if key in data}
        if 'protocol' in changes:
            protocol = changes['protocol']
            if not isinstance(protocol, str) or protocol.lower() not in ('udp', 'tcp', 'icmp'):
                return jsonify({'error': 'protocol must be udp, tcp, or icmp'}), 400
            changes['protocol'] = protocol.lower()
        try:
            for key in int_fields:
                if key in changes:
                    changes[key] = int(changes[key])
        except (TypeError, ValueError):
            return jsonify(
                {'error': 'frame_size, rate_pps, duration and dscp must be integers'}
            ), 400
        # Nothing is written until every value has been validated.
        meta.update(changes)
        snapshot = dict(meta)
    _sender.update_flow(flow_id, snapshot)
    return jsonify(_flow_response(flow_id))
@app.route('/flows/<flow_id>', methods=['DELETE'])
def delete_flow(flow_id):
    """Remove a flow from the sender, the stats collector and the metadata table."""
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400

    with _flows_lock:
        known = flow_id in _flows_meta
    if not known:
        return jsonify({'error': 'Flow not found'}), 404

    _sender.remove_flow(flow_id)
    if _stats_collector:
        _stats_collector.remove_flow(flow_id)

    # pop() with a default tolerates the entry having vanished in the meantime.
    with _flows_lock:
        _flows_meta.pop(flow_id, None)

    log.info('Deleted flow %s', flow_id[:8])
    return jsonify({'deleted': flow_id})
@app.route('/flows/<flow_id>/start', methods=['POST'])
def start_flow(flow_id):
    """Mark a flow as running and ask the sender to start it.

    Returns:
        The flow view on success; 400 outside sender mode; 404 when the flow
        is unknown to this module or when the sender raises ``KeyError``.
    """
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    with _flows_lock:
        meta = _flows_meta.get(flow_id)
        if meta is None:
            return jsonify({'error': 'Flow not found'}), 404
        previous_state = meta.get('state')
        meta['state'] = 'running'
    try:
        _sender.start(flow_id)
    except KeyError:
        # The sender never started the flow: restore the previous state so the
        # metadata does not keep claiming 'running'.
        with _flows_lock:
            meta = _flows_meta.get(flow_id)
            if meta is not None and meta.get('state') == 'running':
                meta['state'] = previous_state
        return jsonify({'error': 'Flow not found in sender'}), 404
    log.info('Started flow %s', flow_id[:8])
    return jsonify(_flow_response(flow_id))
@app.route('/flows/<flow_id>/stop', methods=['POST'])
def stop_flow(flow_id):
    """Mark a flow as stopped and ask the sender to stop it."""
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400

    with _flows_lock:
        entry = _flows_meta.get(flow_id)
        if entry is None:
            return jsonify({'error': 'Flow not found'}), 404
        entry['state'] = 'stopped'

    _sender.stop(flow_id)
    log.info('Stopped flow %s', flow_id[:8])
    return jsonify(_flow_response(flow_id))
@app.route('/flows/<flow_id>/stats', methods=['GET'])
def flow_stats(flow_id):
    """Return the sender's counters and the collector's latest sample for a flow.

    ``rates`` is an empty dict when no stats collector exists.
    """
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400

    with _flows_lock:
        missing = flow_id not in _flows_meta
    if missing:
        return jsonify({'error': 'Flow not found'}), 404

    counters = _sender.get_stats(flow_id)
    rates = {}
    if _stats_collector:
        rates = _stats_collector.get_latest(flow_id)
    return jsonify({'flow_id': flow_id, 'counters': counters, 'rates': rates})
# ---------------------------------------------------------------------------
# Sender-mode: Test endpoints
# ---------------------------------------------------------------------------
@app.route('/tests', methods=['GET'])
def list_tests():
    """Return the info dict of every registered test."""
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    infos = []
    with _tests_lock:
        for test in _tests.values():
            infos.append(test.get_info())
    return jsonify({'count': len(infos), 'tests': infos})
@app.route('/tests', methods=['POST'])
def create_test():
    """Create an RFC 2544 test from the JSON request body.

    Required: ``type`` plus either ``flow_config`` (an object with at least
    ``dst_ip``; src_ip, protocol, src_port and dst_port get defaults) or
    ``flow_id`` naming an existing flow whose metadata is copied.
    Optional: frame_sizes, trial_duration, max_rate_pps, acceptable_loss_pct,
    responder_url (falls back to the RESPONDER_URL environment variable).

    Returns:
        201 with the test info on success; 400 for invalid input (including
        a ``ValueError`` raised by the test factory); 404 when ``flow_id`` is
        unknown.
    """
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    from engine.rfc2544 import create_test as _create_test
    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'request body must be a JSON object'}), 400
    test_type = data.get('type')
    if not test_type:
        return jsonify({'error': 'type is required'}), 400
    # Accept either flow_config directly or flow_id to look up
    flow_config = data.get('flow_config')
    flow_id = data.get('flow_id')
    if flow_config:
        # Direct flow config provided — no flow_id needed
        if not isinstance(flow_config, dict):
            return jsonify({'error': 'flow_config must be a JSON object'}), 400
        if not flow_config.get('dst_ip'):
            return jsonify({'error': 'flow_config.dst_ip is required'}), 400
        flow_config.setdefault('src_ip', 'auto')
        flow_config.setdefault('protocol', 'udp')
        flow_config.setdefault('src_port', 50000)
        flow_config.setdefault('dst_port', 5001)
    elif flow_id:
        with _flows_lock:
            flow_meta = _flows_meta.get(flow_id)
            if flow_meta is None:
                return jsonify({'error': 'Flow not found'}), 404
            flow_config = dict(flow_meta)
    else:
        return jsonify({'error': 'flow_config or flow_id is required'}), 400
    test_id = str(uuid.uuid4())
    # Numeric conversions are guarded so that malformed values produce a 400
    # rather than an unhandled exception.
    try:
        kwargs = {
            'frame_sizes': data.get('frame_sizes', [64, 512, 1518]),
            'trial_duration': float(data.get('trial_duration', 60)),
            'max_rate_pps': int(data.get('max_rate_pps', flow_config.get('rate_pps', 10000))),
            'acceptable_loss_pct': float(data.get('acceptable_loss_pct', 0.0)),
            'responder_url': data.get('responder_url') or os.environ.get('RESPONDER_URL') or None,
        }
    except (TypeError, ValueError):
        return jsonify(
            {'error': 'trial_duration, max_rate_pps and acceptable_loss_pct must be numeric'}
        ), 400
    try:
        test = _create_test(test_id, test_type, flow_config, **kwargs)
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    with _tests_lock:
        _tests[test_id] = test
    log.info('Created test %s (type=%s, dst=%s)', test_id[:8], test_type,
             flow_config.get('dst_ip', '?'))
    return jsonify(test.get_info()), 201
@app.route('/tests/<test_id>', methods=['GET'])
def get_test(test_id):
    """Return the info dict of one test, or 404 when the id is unknown."""
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    with _tests_lock:
        found = _tests.get(test_id)
    if found is not None:
        return jsonify(found.get_info())
    return jsonify({'error': 'Test not found'}), 404
@app.route('/tests/<test_id>/start', methods=['POST'])
def start_test(test_id):
    """Start a registered test and return its info dict."""
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    with _tests_lock:
        found = _tests.get(test_id)
    if found is None:
        return jsonify({'error': 'Test not found'}), 404
    found.start()
    log.info('Started test %s', test_id[:8])
    return jsonify(found.get_info())
@app.route('/tests/<test_id>/stop', methods=['POST'])
def stop_test(test_id):
    """Stop (abort) a registered test and return its info dict."""
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    with _tests_lock:
        found = _tests.get(test_id)
    if found is None:
        return jsonify({'error': 'Test not found'}), 404
    found.stop()
    log.info('Stopped test %s', test_id[:8])
    return jsonify(found.get_info())
@app.route('/tests/<test_id>/results', methods=['GET'])
def test_results(test_id):
    """Return a test's class name, state, results and start/completion stamps."""
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    with _tests_lock:
        found = _tests.get(test_id)
    if found is None:
        return jsonify({'error': 'Test not found'}), 404
    export = {'test_id': test_id}
    export['type'] = found.__class__.__name__
    export['state'] = found.state
    export['results'] = found.results
    export['started_at'] = found.started_at
    export['completed_at'] = found.completed_at
    return jsonify(export)
# ---------------------------------------------------------------------------
# Sender-mode: Presets
# ---------------------------------------------------------------------------
@app.route('/presets', methods=['GET'])
def list_presets():
    """Return each preset's description and whether it also defines a test."""
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    from presets import PRESETS
    summary = {
        preset_name: {
            'description': spec.get('description', ''),
            'has_test': 'test' in spec,
        }
        for preset_name, spec in PRESETS.items()
    }
    return jsonify({'presets': summary})
@app.route('/presets/<name>', methods=['POST'])
def load_preset(name):
    """Create a flow (and optionally a test) from a named preset.

    The optional JSON body may override ``dst_ip``, ``src_ip``,
    ``responder_url`` and ``name``. ``dst_ip`` falls back to the preset's
    value and then to ``10.0.0.1``; the protocol defaults to ``udp``.

    When the preset defines a test, the test object is built before anything
    is registered, so a failing test definition does not leave a flow behind.
    The test receives the flow's ``responder_url``, as tests created through
    POST /tests do.

    Returns:
        201 with ``{'flow': ..., 'test': ...}`` (``test`` only when the preset
        defines one); 400 outside sender mode; 404 for an unknown preset; 500
        when the preset's test definition is rejected with ``ValueError``.
    """
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    from presets import PRESETS
    from engine.rfc2544 import create_test as _create_test
    if name not in PRESETS:
        return jsonify({'error': f'Unknown preset: {name}. Available: {list(PRESETS)}'}), 404
    preset = PRESETS[name]
    data = request.get_json(silent=True) or {}
    # The body is optional; ignore a JSON value that is not an object.
    if not isinstance(data, dict):
        data = {}
    # The preset flow needs a dst_ip; caller can override
    flow_data = dict(preset['flow'])
    flow_data['dst_ip'] = data.get('dst_ip', flow_data.get('dst_ip', '10.0.0.1'))
    if 'src_ip' in data:
        flow_data['src_ip'] = data['src_ip']
    if 'responder_url' in data:
        flow_data['responder_url'] = data['responder_url']
    # Validate protocol present
    if 'protocol' not in flow_data:
        flow_data['protocol'] = 'udp'
    flow_id = str(uuid.uuid4())
    flow_config = {
        'id': flow_id,
        'name': data.get('name', f'{name}-{flow_id[:8]}'),
        'src_mac': flow_data.get('src_mac', 'auto'),
        'dst_mac': flow_data.get('dst_mac'),
        'src_ip': flow_data.get('src_ip'),
        'dst_ip': flow_data['dst_ip'],
        'protocol': flow_data['protocol'],
        'src_port': flow_data.get('src_port'),
        'dst_port': flow_data.get('dst_port'),
        'frame_size': int(flow_data.get('frame_size', 512)),
        'rate_pps': int(flow_data.get('rate_pps', 1000)),
        'duration': int(flow_data.get('duration', 30)),
        'dscp': int(flow_data.get('dscp', 0)),
        'vlan_id': flow_data.get('vlan_id'),
        'state': 'idle',
        'responder_url': flow_data.get('responder_url') or os.environ.get('RESPONDER_URL') or None,
        'created_at': _now_iso(),
    }
    # Optionally build a test. This happens before the flow is registered so
    # that a rejected test definition leaves no orphan flow.
    test = None
    test_id = None
    if 'test' in preset:
        test_cfg = preset['test']
        test_id = str(uuid.uuid4())
        kwargs = {
            'frame_sizes': test_cfg.get('frame_sizes', [64, 512, 1518]),
            'trial_duration': float(test_cfg.get('trial_duration', 60)),
            'max_rate_pps': int(test_cfg.get('max_rate_pps', flow_config.get('rate_pps', 10000))),
            'acceptable_loss_pct': float(test_cfg.get('acceptable_loss_pct', 0.0)),
            'responder_url': flow_config['responder_url'],
        }
        try:
            test = _create_test(test_id, test_cfg['type'], dict(flow_config), **kwargs)
        except ValueError as e:
            log.error('Preset %s has an invalid test definition: %s', name, e)
            return jsonify({'error': f'Preset {name} has an invalid test definition: {e}'}), 500
    with _flows_lock:
        _flows_meta[flow_id] = flow_config
    _sender.add_flow(flow_id, dict(flow_config))
    result = {'flow': _flow_response(flow_id)}
    if test is not None:
        with _tests_lock:
            _tests[test_id] = test
        result['test'] = test.get_info()
    log.info('Loaded preset %s -> flow %s', name, flow_id[:8])
    return jsonify(result), 201
# ---------------------------------------------------------------------------
# Sender-mode: Stats history
# ---------------------------------------------------------------------------
@app.route('/stats/history', methods=['GET'])
def stats_history():
    """Return the stats collector's history for every known flow.

    Query parameters:
        count: passed to the collector's ``get_history`` for each flow;
            defaults to 60, also when the supplied value is not an integer.
    """
    if MODE != 'sender':
        return jsonify({'error': 'Not in sender mode'}), 400
    # type=int makes a non-numeric ?count= fall back to the default instead of
    # raising ValueError.
    count = request.args.get('count', 60, type=int)
    with _flows_lock:
        flow_ids = list(_flows_meta.keys())
    history = {fid: _stats_collector.get_history(fid, count) for fid in flow_ids}
    return jsonify({'history': history})
# ---------------------------------------------------------------------------
# Responder-mode endpoints
# ---------------------------------------------------------------------------
@app.route('/responder/stats', methods=['GET'])
def responder_stats():
    """Return the responder's statistics."""
    responder_ready = MODE == 'responder' and _responder is not None
    if not responder_ready:
        return jsonify({'error': 'Not in responder mode'}), 400
    rx_stats = _responder.get_stats()
    return jsonify(rx_stats)
@app.route('/responder/reset', methods=['POST'])
def responder_reset():
    """Reset the responder's statistics."""
    responder_ready = MODE == 'responder' and _responder is not None
    if not responder_ready:
        return jsonify({'error': 'Not in responder mode'}), 400
    _responder.reset_stats()
    acknowledgement = {'status': 'reset'}
    return jsonify(acknowledgement)
# ---------------------------------------------------------------------------
# Stats collection loop (sender mode)
# ---------------------------------------------------------------------------
def _stats_loop(stop_event: threading.Event):
    """Runs every 1s, recording per-flow stats into the StatsCollector.

    Args:
        stop_event: set it to make the loop exit; it is also used for the
            one-second wait between iterations.

    Each iteration reads all per-flow stats from the sender, reduces any
    ``latency_samples`` to min/max/avg (rounded to 3 decimals) and records the
    counters. Iterations are skipped while no sender or collector exists,
    which is the case after a switch to responder mode.
    """
    while not stop_event.is_set():
        # Snapshot the globals: POST /mode may replace them or set them to
        # None at any time, and one iteration should use a consistent pair.
        sender, collector = _sender, _stats_collector
        if sender is not None and collector is not None:
            try:
                all_stats = sender.get_all_stats()
                for flow_id, s in all_stats.items():
                    latency = None
                    samples = s.get('latency_samples', [])
                    if samples:
                        latency = {
                            'min_ms': round(min(samples), 3),
                            'max_ms': round(max(samples), 3),
                            'avg_ms': round(sum(samples) / len(samples), 3),
                        }
                    collector.record(
                        flow_id,
                        tx_packets=s.get('tx_packets', 0),
                        tx_bytes=s.get('tx_bytes', 0),
                        rx_packets=s.get('rx_packets', 0),
                        rx_bytes=s.get('rx_bytes', 0),
                        latency=latency,
                    )
            except Exception as e:
                # Best effort: a failed sample must not kill the loop.
                log.debug('Stats loop error: %s', e)
        stop_event.wait(1.0)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
    """Read the environment, start the engine for the chosen mode and serve the API.

    Environment variables:
        TRAFFIC_GEN_MODE: 'sender' (default) or 'responder'.
        TRAFFIC_GEN_PORT: API port, default 5051.
        TRAFFIC_GEN_INTERFACE: interface handed to the responder.
        TRAFFIC_GEN_RESPONDER_MODE: responder sub-mode, default 'log'.

    Flask runs in a daemon thread while the main thread sleeps until
    KeyboardInterrupt. An unknown mode is logged and the process exits with
    status 1.
    """
    global MODE, _sender, _stats_collector, _responder
    MODE = os.environ.get('TRAFFIC_GEN_MODE', 'sender').lower()
    api_port = int(os.environ.get('TRAFFIC_GEN_PORT', 5051))
    listen_iface = os.environ.get('TRAFFIC_GEN_INTERFACE', None)
    responder_sub_mode = os.environ.get('TRAFFIC_GEN_RESPONDER_MODE', 'log')  # echo or log
    log.info('Traffic Generator starting in %s mode', MODE)

    def serve_api():
        app.run(host='0.0.0.0', port=api_port, threaded=True)

    def launch_flask():
        # Start Flask in a background daemon thread.
        threading.Thread(target=serve_api, daemon=True).start()
        log.info('Flask API listening on port %d', api_port)

    def idle_forever():
        # Main thread: keep the process alive.
        while True:
            time.sleep(1)

    if MODE not in ('sender', 'responder'):
        log.error('Unknown mode: %s. Use "sender" or "responder".', MODE)
        sys.exit(1)

    if MODE == 'sender':
        from engine.sender import FlowSender
        from engine.stats import StatsCollector
        _sender = FlowSender()
        _stats_collector = StatsCollector()
        # Start stats collection loop
        stats_stop = threading.Event()
        threading.Thread(
            target=_stats_loop, args=(stats_stop,), daemon=True, name='stats-loop'
        ).start()
        launch_flask()
        try:
            idle_forever()
        except KeyboardInterrupt:
            log.info('Shutting down sender...')
            stats_stop.set()
        return

    from engine.responder import Responder
    _responder = Responder(mode=responder_sub_mode)
    _responder.start(interface=listen_iface)
    launch_flask()
    try:
        idle_forever()
    except KeyboardInterrupt:
        log.info('Shutting down responder...')
        _responder.stop()


if __name__ == '__main__':
    main()