diff --git a/DOCS.md b/DOCS.md index 7fee8cd..a76eaba 100644 --- a/DOCS.md +++ b/DOCS.md @@ -16,6 +16,8 @@ 12. [Troubleshooting](#12-troubleshooting) 13. [Data Retention](#13-data-retention) 14. [Environment Variables Reference](#14-environment-variables-reference) +15. [gNMI Streaming Telemetry (Phase 4)](#15-gnmi-streaming-telemetry-phase-4) +16. [Traffic Generator (Phase 4)](#16-traffic-generator-phase-4) --- @@ -28,7 +30,7 @@ This is a **BGP Monitoring Platform (BMP) lab stack** deployed via Docker Compos - Receives BMP (BGP Monitoring Protocol, RFC 7854) telemetry from routers on TCP port 5000 - Streams BMP data through Kafka into a TimescaleDB/PostgreSQL database -- Provides **27 Grafana dashboards** (17 operational + 6 learning + 4 advanced analytics) for real-time and historical BGP analysis +- Provides **30 Grafana dashboards** (17 operational + 6 learning + 4 advanced analytics + 3 streaming telemetry) for real-time and historical BGP analysis - Includes an **ExaBGP route injector** that peers with the two CORE routers and injects synthetic BGP routes, enabling testing of BGP policy, route propagation, and Grafana dashboards without needing internet connectivity - Provides a **Vue 3 web UI** at `:5001` for point-and-click scenario management, live route tables, and peer monitoring @@ -64,7 +66,7 @@ IOS-XR Routers (9x, AS 65020) PostgreSQL 14 + TimescaleDB | +---------> obmp-grafana (grafana/grafana:9.1.7) :3000 - | 23 dashboards, PostgreSQL datasource + | 30 dashboards, PostgreSQL + InfluxDB datasources +---------> obmp-whois (openbmp/whois:2.2.0) :4300 WHOIS query server backed by the DB @@ -73,6 +75,24 @@ ExaBGP (obmp-exabgp, built locally) Peers eBGP to CORE-01 and CORE-02 (AS 65100 -> AS 65020) HTTP API on :5050 — inject/withdraw routes on demand Routes propagate via iBGP mesh to all 9 routers -> BMP -> DB -> Grafana + +gNMI Streaming Telemetry (Phase 4): + IOS-XR Routers (gRPC :57400) + | + v + obmp-telegraf (telegraf:1.28 + gnmi plugin) + | + v + 
obmp-influxdb (influxdb:2.7) :8086 + | + v + obmp-grafana (InfluxDB datasource -> Telemetry dashboards) + +Traffic Generator (Phase 4): + obmp-traffic-gen (python:3.11 + Scapy + Flask) :5051 + Dual-mode: sender (generate traffic) / responder (echo/log) + RFC 2544 testing, custom packet flows + obmp-traffic-gen-ui (Vue 3 + NGINX) :5002 ``` ### Container Summary @@ -87,7 +107,11 @@ ExaBGP (obmp-exabgp, built locally) | obmp-grafana | grafana/grafana:9.1.7 | 3000 | Visualization | | obmp-whois | openbmp/whois:2.2.0 | 4300 | WHOIS query server | | obmp-exabgp | local build | 5050 (host net) | BGP route injector | -| obmp-exabgp-ui | local build | 5001 (host net) | Vue 3 web control panel | +| obmp-exabgp-ui | local build | 5001 (host net) | Route injector web UI | +| obmp-influxdb | influxdb:2.7 | 8086 | Time-series DB for telemetry | +| obmp-telegraf | local build | - (host net) | gNMI telemetry collector | +| obmp-traffic-gen | local build | 5051 (host net) | Scapy traffic generator | +| obmp-traffic-gen-ui | local build | 5002 (host net) | Traffic generator web UI | --- @@ -830,3 +854,215 @@ Adjust in `docker-compose.yml` under the `psql-app` service environment block. | Variable | Default | Description | |----------|---------|-------------| | `EXABGP_API` | `http://localhost:5050` | ExaBGP API base URL | + +--- + +## 15. gNMI Streaming Telemetry (Phase 4) + +### Overview + +gNMI (gRPC Network Management Interface) adds **data-plane visibility** alongside BMP's control-plane monitoring. Telegraf collects real-time interface counters from all 9 IOS-XR routers via gNMI subscriptions and stores them in InfluxDB. Grafana queries InfluxDB for telemetry dashboards. 
+ +### Architecture + +``` +IOS-XR Routers (9x, gRPC port 57400) + | + gNMI subscriptions (10s sample) + | + v + obmp-telegraf (telegraf:1.28 + gnmi input plugin) + host networking → reaches routers on 10.100.0.x + | + v + obmp-influxdb (influxdb:2.7, port 8086) + bucket: "telemetry", org: "openbmp" + | + v + obmp-grafana (InfluxDB datasource, Flux queries) + 3 dashboards in OBMP-Telemetry folder +``` + +### Enabling gRPC on Routers + +The routers need gRPC enabled before Telegraf can collect telemetry. A NETCONF script is provided: + +```bash +# From the host (requires ncclient: pip install ncclient) +cd /home/user/obmp-docker/gnmi +python3 gnmi_grpc_config.py +``` + +This connects to all 9 routers via NETCONF (port 830, credentials webui/cisco) and pushes: +``` +grpc + port 57400 + no-tls +``` + +**Verify on router:** +``` +show grpc status +``` +Expected: gRPC listening on port 57400. + +### Telemetry Data Collected + +Telegraf subscribes to two IOS-XR YANG paths at 10-second intervals: + +| Subscription | YANG Path | Data | +|-------------|-----------|------| +| interface_counters | `Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/generic-counters` | bytes/packets in/out, errors, drops, CRC | +| interface_rates | `Cisco-IOS-XR-infra-statsd-oper:infra-statistics/interfaces/interface/latest/data-rate` | bits/sec in/out, packet rate | + +### InfluxDB Access + +- **URL:** `http://localhost:8086` +- **Org:** `openbmp` +- **Bucket:** `telemetry` +- **Token:** `openbmp-telemetry-token` +- **Retention:** 30 days + +### Grafana Telemetry Dashboards + +Three dashboards in the **OBMP-Telemetry** folder: + +| Dashboard | UID | Description | +|-----------|-----|-------------| +| Interface Utilization | obmp-telem-01 | Input/output bytes rate, packets rate, top interfaces by throughput | +| Interface Errors | obmp-telem-02 | CRC errors, input/output errors, drops, overruns | +| Combined BMP + Telemetry | obmp-telem-03 | Mixed datasource — BGP peer 
status (PostgreSQL) alongside interface counters (InfluxDB) | + +All three dashboards have a `$router` template variable; Interface Utilization and Interface Errors also have `$interface` for per-interface filtering. + +### Troubleshooting gNMI + +```bash +# Check Telegraf logs for gNMI connection status +docker logs obmp-telegraf --tail 50 + +# Verify InfluxDB has data (the v2 query API expects raw Flux in the POST body) +curl -s -H "Authorization: Token openbmp-telemetry-token" -H "Content-Type: application/vnd.flux" \ + "http://localhost:8086/api/v2/query?org=openbmp" \ + --data 'from(bucket:"telemetry") |> range(start: -5m) |> limit(n:5)' + +# Check InfluxDB health +curl http://localhost:8086/health +``` + +--- + +## 16. Traffic Generator (Phase 4) + +### Overview + +A portable, containerized traffic generator with a web UI for RFC 2544 testing and custom packet flows. Built with Scapy + Flask (backend) and Vue 3 + NGINX (frontend). The container supports **dual-mode operation**: sender (generate traffic) or responder (receive/echo packets). + +### Accessing the UI + +- **Web UI:** `http://localhost:5002` +- **API:** `http://localhost:5051` + +### Dual-Mode Operation + +Set via the `TRAFFIC_GEN_MODE` environment variable in `docker-compose.yml`: + +| Mode | Description | +|------|-------------| +| `sender` (default) | Generates traffic, runs RFC 2544 tests, sends custom flows | +| `responder` | Listens for incoming test packets, echoes/timestamps them, reports receive stats | + +**Typical deployment:** One instance runs as `sender` on the host; optionally, a second instance runs as `responder` on another endpoint. Without a responder, the sender uses ICMP echo for latency measurement (routers respond natively). 
+ +### Creating Flows + +Use the **Flow Builder** panel (left sidebar) in the UI: + +| Field | Default | Description | +|-------|---------|-------------| +| Name | - | Human-readable flow name | +| Destination IP | `10.100.0.100` | Target router IP | +| Source IP | `10.40.40.202` | Host IP | +| Protocol | UDP | UDP, TCP, or ICMP | +| Source Port | 50000 | (UDP/TCP only) | +| Destination Port | 5001 | (UDP/TCP only) | +| Frame Size | 512 | Packet size in bytes | +| Rate (pps) | 1000 | Packets per second | +| Duration | 30 | Seconds (0 = infinite) | +| DSCP | 0 | Differentiated Services Code Point | + +After creating a flow, use the **Flows** tab to Start/Stop/Delete flows. + +### RFC 2544 Testing + +Use the **Tests** tab to configure and run RFC 2544 tests: + +| Test Type | Description | +|-----------|-------------| +| **Throughput** | Binary search for maximum zero-loss forwarding rate | +| **Latency** | Measure round-trip time at determined throughput rate | +| **Frame Loss** | Loss percentage vs. 
offered load curve | +| **Back-to-Back** | Maximum burst length at line rate with zero loss | + +**Parameters:** +- **Base Flow:** Select a previously created flow as the test template +- **Frame Sizes:** Standard sizes: 64, 128, 256, 512, 1024, 1280, 1518 bytes +- **Trial Duration:** Per-frame-size test duration (5–300 sec) +- **Max Rate (pps):** Upper bound for binary search +- **Acceptable Loss %:** Threshold for pass/fail + +### Quick Presets + +Six built-in presets are available in the **Tests** tab: + +| Preset | Description | +|--------|-------------| +| quick_icmp | ICMP ping to CORE-01 at 10 pps | +| udp_flood_small | 64-byte UDP at 5000 pps | +| udp_flood_large | 1518-byte UDP at 1000 pps | +| rfc2544_throughput | Full throughput test with standard frame sizes | +| rfc2544_latency | Latency measurement with standard frame sizes | +| tcp_session | TCP flow at 500 pps | + +### API Reference + +| Method | Path | Description | +|--------|------|-------------| +| GET | `/healthz` | Health check + engine status | +| GET | `/interfaces` | Available network interfaces | +| GET | `/mode` | Current mode (sender/responder) | +| GET/POST | `/flows` | List / create flows | +| GET/PUT/DELETE | `/flows/` | Get / update / delete flow | +| POST | `/flows//start` | Start sending | +| POST | `/flows//stop` | Stop sending | +| GET | `/flows//stats` | Real-time stats for a flow | +| GET/POST | `/tests` | List / create RFC 2544 tests | +| GET | `/tests/` | Test details + results | +| POST | `/tests//start` | Start test execution | +| POST | `/tests//stop` | Abort test | +| GET | `/tests//results` | Exportable results | +| GET | `/presets` | Available test presets | +| POST | `/presets/` | Create flow + test from preset | +| GET | `/stats/history` | Stats ring buffer (300 samples) | +| GET | `/responder/stats` | Responder-mode receive stats | +| POST | `/responder/reset` | Reset responder counters | + +### Integration with gNMI Telemetry + +The key value of combining the 
traffic generator with gNMI: **send traffic while watching real-time interface counters**. + +1. Create a UDP flow targeting a router (e.g., R9K-01 at 10.100.0.1) +2. Open the Grafana **Interface Utilization** dashboard, select that router +3. Start the flow — gNMI counters show traffic appearing on the interface +4. Run an RFC 2544 throughput test — Grafana shows the stepped traffic pattern from binary search iterations +5. Compare Scapy-reported stats with gNMI-reported counters for cross-validation + +The **Combined BMP + Telemetry** dashboard shows both control-plane (BMP BGP updates) and data-plane (gNMI interface counters) side by side, enabling correlation of BGP changes with traffic impact. + +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `TRAFFIC_GEN_API_PORT` | `5051` | Flask API listen port | +| `TRAFFIC_GEN_MODE` | `sender` | Operating mode: `sender` or `responder` | +| `INFLUXDB_TOKEN` | `openbmp-telemetry-token` | InfluxDB auth token (Telegraf) | diff --git a/docker-compose.yml b/docker-compose.yml index 7fdac0a..a5d8ad5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -231,6 +231,62 @@ services: network_mode: host # Serves on port 5001 (host network, defined in nginx.conf) + # --- Phase 4: gNMI Streaming Telemetry --- + + influxdb: + restart: unless-stopped + container_name: obmp-influxdb + image: influxdb:2.7 + ports: + - "8086:8086" + volumes: + - ${OBMP_DATA_ROOT}/influxdb:/var/lib/influxdb2 + environment: + - DOCKER_INFLUXDB_INIT_MODE=setup + - DOCKER_INFLUXDB_INIT_USERNAME=openbmp + - DOCKER_INFLUXDB_INIT_PASSWORD=openbmp123 + - DOCKER_INFLUXDB_INIT_ORG=openbmp + - DOCKER_INFLUXDB_INIT_BUCKET=telemetry + - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=openbmp-telemetry-token + - DOCKER_INFLUXDB_INIT_RETENTION=30d + + telegraf: + restart: unless-stopped + container_name: obmp-telegraf + build: + context: ./telegraf + dockerfile: Dockerfile + network_mode: host + depends_on: + - influxdb + 
environment: + - INFLUXDB_TOKEN=openbmp-telemetry-token + + # --- Phase 4: Traffic Generator --- + + traffic-gen: + restart: unless-stopped + container_name: obmp-traffic-gen + build: + context: ./traffic-gen + dockerfile: Dockerfile + network_mode: host + cap_add: + - NET_RAW + - NET_ADMIN + environment: + - TRAFFIC_GEN_API_PORT=5051 + - TRAFFIC_GEN_MODE=sender + + traffic-gen-ui: + restart: unless-stopped + container_name: obmp-traffic-gen-ui + build: + context: ./traffic-gen-ui + dockerfile: Dockerfile + network_mode: host + # Serves on port 5002 (host network, defined in nginx.conf) + whois: restart: unless-stopped container_name: obmp-whois diff --git a/gnmi/__pycache__/gnmi_grpc_config.cpython-310.pyc b/gnmi/__pycache__/gnmi_grpc_config.cpython-310.pyc new file mode 100644 index 0000000..f39c9b2 Binary files /dev/null and b/gnmi/__pycache__/gnmi_grpc_config.cpython-310.pyc differ diff --git a/gnmi/gnmi_grpc_config.py b/gnmi/gnmi_grpc_config.py new file mode 100644 index 0000000..77942a7 --- /dev/null +++ b/gnmi/gnmi_grpc_config.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +""" +gNMI gRPC Configuration Script +=============================== +Enables gRPC dial-in telemetry on all 9 IOS-XR routers so that +Telegraf (or any gNMI collector) can subscribe to streaming +telemetry data. 
+ +What this script applies per router: + - gRPC server on port 57400 with TLS disabled + - YANG model: Cisco-IOS-XR-man-ems-cfg + +Router targets: + CORE-01 (10.100.0.100) + CORE-02 (10.100.0.200) + R9K-01 (10.100.0.1) through R9K-07 (10.100.0.7) +""" + +from ncclient import manager +import sys + +GRPC_NS = 'http://cisco.com/ns/yang/Cisco-IOS-XR-man-ems-cfg' + +ROUTERS = [ + ('10.100.0.100', 'CORE-01'), + ('10.100.0.200', 'CORE-02'), + ('10.100.0.1', 'R9K-01'), + ('10.100.0.2', 'R9K-02'), + ('10.100.0.3', 'R9K-03'), + ('10.100.0.4', 'R9K-04'), + ('10.100.0.5', 'R9K-05'), + ('10.100.0.6', 'R9K-06'), + ('10.100.0.7', 'R9K-07'), +] + +GRPC_CONFIG_XML = """ + + + 57400 + + + +""" + + +def configure_router(mgmt_ip, label): + """Apply gRPC configuration via NETCONF edit-config + commit.""" + print(f"\n{'─'*60}") + print(f" Configuring {label} ({mgmt_ip})") + print(f"{'─'*60}") + print(f" Applying: gRPC port=57400 no-tls") + + try: + with manager.connect( + host=mgmt_ip, + port=830, + username='webui', + password='cisco', + hostkey_verify=False, + device_params={'name': 'iosxr'}, + timeout=20, + ) as m: + print(" → Applying gRPC configuration...") + m.edit_config(target='candidate', config=GRPC_CONFIG_XML) + + print(" → Committing...") + m.commit() + print(f" ✓ {label} done.") + return True + + except Exception as e: + print(f" ✗ ERROR on {label}: {e}") + return False + + +def verify_router(mgmt_ip, label): + """Re-read running config to confirm the grpc block is present.""" + try: + with manager.connect( + host=mgmt_ip, port=830, username='webui', password='cisco', + hostkey_verify=False, device_params={'name': 'iosxr'}, timeout=10 + ) as m: + filt_grpc = """ + + + + + """ + r_grpc = m.get_config(source='running', filter=filt_grpc) + + has_grpc = '57400' in str(r_grpc) + has_notls = ' range(start: -1h)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> keep(columns: [\"source\"])\n |> distinct(column: \"source\")\n |> sort()", + "hide": 0, + 
"includeAll": true, + "label": "Router", + "multi": true, + "name": "router", + "options": [], + "query": "from(bucket: \"telemetry\")\n |> range(start: -1h)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> keep(columns: [\"source\"])\n |> distinct(column: \"source\")\n |> sort()", + "refresh": 2, + "regex": "", + "type": "query" + } + ] + }, + "panels": [ + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "Current BGP peer status from the OpenBMP PostgreSQL database. Shows peer address, name, and session state.", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "custom": {"align": "auto","displayMode": "auto","filterable": true,"inspect": true}, + "mappings": [], + "thresholds": {"mode": "absolute","steps": [{"color": "green","value": null}]} + }, + "overrides": [ + {"matcher": {"id": "byName","options": "state"},"properties": [{"id": "custom.displayMode","value": "color-background-solid"},{"id": "mappings","value": [{"options": {"down": {"color": "red","index": 1,"text": "DOWN"},"up": {"color": "green","index": 0,"text": "UP"}},"type": "value"}]}]}, + {"matcher": {"id": "byName","options": "peer_addr"},"properties": [{"id": "custom.width","value": 160}]}, + {"matcher": {"id": "byName","options": "name"},"properties": [{"id": "custom.width","value": 200}]} + ] + }, + "gridPos": {"h": 10,"w": 24,"x": 0,"y": 0}, + "id": 1, + "options": {"footer": {"fields": "","reducer": ["sum"],"show": false},"showHeader": true,"sortBy": [{"desc": false,"displayName": "state"}]}, + "targets": [ + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "format": "table", + "rawSql": "SELECT\n p.peer_addr,\n COALESCE(p.name, p.peer_addr::text) AS name,\n p.state,\n p.peer_as AS \"AS\",\n p.router_hash_id IS NOT NULL AS \"BMP Active\",\n p.timestamp AS \"Last State Change\"\nFROM bgp_peers p\nWHERE p.isprepolicy = true\nORDER BY p.state, p.peer_addr", + "refId": "A" + } + ], + "title": "BGP Peer 
Status", + "type": "table" + }, + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "description": "Interface traffic rates from gNMI streaming telemetry. Shows bytes per second for each interface across selected routers.", + "fieldConfig": { + "defaults": { + "color": {"mode": "palette-classic"}, + "custom": {"axisBorderShow": false,"axisCenteredZero": false,"axisLabel": "","axisPlacement": "auto","barAlignment": 0,"drawStyle": "line","fillOpacity": 10,"gradientMode": "none","hideFrom": {"legend": false,"tooltip": false,"viz": false},"lineInterpolation": "linear","lineWidth": 1,"pointSize": 5,"scaleDistribution": {"type": "linear"},"showPoints": "never","spanNulls": false,"stacking": {"group": "A","mode": "none"},"thresholdsStyle": {"mode": "off"}}, + "mappings": [], + "thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "red","value": 80}]}, + "unit": "Bps" + } + }, + "gridPos": {"h": 10,"w": 24,"x": 0,"y": 10}, + "id": 2, + "options": {"legend": {"calcs": ["mean","max"],"displayMode": "table","placement": "bottom"},"tooltip": {"mode": "multi","sort": "desc"}}, + "targets": [ + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "query": "from(bucket: \"telemetry\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> filter(fn: (r) => r.source =~ /${router:regex}/)\n |> filter(fn: (r) => r._field == \"bytes_received\" or r._field == \"bytes_sent\")\n |> derivative(unit: 1s, nonNegative: true)\n |> map(fn: (r) => ({r with _value: if r._value < 0.0 then 0.0 else r._value}))", + "refId": "A" + } + ], + "title": "Interface Traffic", + "type": "timeseries" + }, + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "description": "BGP update activity over time from the OpenBMP PostgreSQL database. 
Shows peer event transitions and update counts for correlation with traffic patterns.", + "fieldConfig": { + "defaults": { + "color": {"mode": "palette-classic"}, + "custom": {"axisBorderShow": false,"axisCenteredZero": false,"axisLabel": "","axisPlacement": "auto","barAlignment": 0,"drawStyle": "bars","fillOpacity": 50,"gradientMode": "none","hideFrom": {"legend": false,"tooltip": false,"viz": false},"lineInterpolation": "linear","lineWidth": 1,"pointSize": 5,"scaleDistribution": {"type": "linear"},"showPoints": "never","spanNulls": false,"stacking": {"group": "A","mode": "normal"},"thresholdsStyle": {"mode": "off"}}, + "mappings": [], + "thresholds": {"mode": "absolute","steps": [{"color": "green","value": null}]}, + "unit": "short" + } + }, + "gridPos": {"h": 10,"w": 24,"x": 0,"y": 20}, + "id": 3, + "options": {"legend": {"calcs": ["sum"],"displayMode": "table","placement": "bottom"},"tooltip": {"mode": "multi","sort": "desc"}}, + "targets": [ + { + "datasource": {"type": "postgres","uid": "obmp_postgres"}, + "format": "time_series", + "rawSql": "SELECT\n $__timeGroupAlias(e.timestamp, '1m'),\n COALESCE(p.name, p.peer_addr::text) AS metric,\n COUNT(*) AS \"value\"\nFROM peer_event_log e\nJOIN bgp_peers p ON p.hash_id = e.peer_hash_id\nWHERE $__timeFilter(e.timestamp)\nGROUP BY 1, 2\nORDER BY 1", + "refId": "A" + } + ], + "title": "BGP Update Activity", + "type": "timeseries" + }, + { + "datasource": {"type": "datasource","uid": "grafana"}, + "gridPos": {"h": 6,"w": 24,"x": 0,"y": 30}, + "id": 4, + "options": { + "code": {"language": "plaintext","showLineNumbers": false,"showMiniMap": false}, + "content": "## Combined BMP + Telemetry View\n\nThis dashboard integrates two complementary data sources to provide a unified network monitoring view:\n\n### Control Plane (BMP via PostgreSQL)\n- **BGP Peer Status** -- Real-time BGP session state from BMP (OpenBMP)\n- **BGP Update Activity** -- Session transitions and update events from `peer_event_log`\n\n### Data Plane 
(gNMI via InfluxDB)\n- **Interface Traffic** -- Streaming telemetry byte rates collected via gNMI at 10-second intervals\n\n### Correlation Use Cases\n- A BGP peer flap (control plane) should correlate with a traffic shift on affected interfaces (data plane)\n- Sustained high interface utilization (data plane) may precede BGP session resets due to congestion\n- Compare the number of active BGP peers with interface traffic to validate routing convergence", + "mode": "markdown" + }, + "title": "About", + "type": "text" + } + ], + "schemaVersion": 39, + "style": "dark", + "tags": ["obmp-telemetry"], + "time": {"from": "now-1h","to": "now"}, + "timepicker": {}, + "timezone": "browser", + "title": "Combined BMP + Telemetry View", + "uid": "obmp-telem-03", + "version": 1 +} diff --git a/obmp-grafana/dashboards/Telemetry-3001/interface_errors.json b/obmp-grafana/dashboards/Telemetry-3001/interface_errors.json new file mode 100644 index 0000000..9daf0f3 --- /dev/null +++ b/obmp-grafana/dashboards/Telemetry-3001/interface_errors.json @@ -0,0 +1,134 @@ +{ + "annotations": {"list": [{"builtIn": 1,"datasource": {"type": "datasource","uid": "grafana"},"enable": true,"hide": true,"iconColor": "rgba(0, 211, 255, 1)","name": "Annotations & Alerts","type": "dashboard"}]}, + "description": "Interface error and drop counters collected via gNMI streaming telemetry. 
Helps identify interfaces with packet loss or physical layer issues.", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [], + "templating": { + "list": [ + { + "current": {}, + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "definition": "from(bucket: \"telemetry\")\n |> range(start: -1h)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> keep(columns: [\"source\"])\n |> distinct(column: \"source\")\n |> sort()", + "hide": 0, + "includeAll": true, + "label": "Router", + "multi": true, + "name": "router", + "options": [], + "query": "from(bucket: \"telemetry\")\n |> range(start: -1h)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> keep(columns: [\"source\"])\n |> distinct(column: \"source\")\n |> sort()", + "refresh": 2, + "regex": "", + "type": "query" + }, + { + "current": {}, + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "definition": "from(bucket: \"telemetry\")\n |> range(start: -1h)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> filter(fn: (r) => r.source =~ /${router:regex}/)\n |> keep(columns: [\"name\"])\n |> distinct(column: \"name\")\n |> sort()", + "hide": 0, + "includeAll": true, + "label": "Interface", + "multi": true, + "name": "interface", + "options": [], + "query": "from(bucket: \"telemetry\")\n |> range(start: -1h)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> filter(fn: (r) => r.source =~ /${router:regex}/)\n |> keep(columns: [\"name\"])\n |> distinct(column: \"name\")\n |> sort()", + "refresh": 2, + "regex": "", + "type": "query" + } + ] + }, + "panels": [ + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "description": "Interface error counters over time: input errors, output errors, and CRC errors. 
A rising trend indicates physical or configuration issues.", + "fieldConfig": { + "defaults": { + "color": {"mode": "palette-classic"}, + "custom": {"axisBorderShow": false,"axisCenteredZero": false,"axisLabel": "","axisPlacement": "auto","barAlignment": 0,"drawStyle": "line","fillOpacity": 10,"gradientMode": "none","hideFrom": {"legend": false,"tooltip": false,"viz": false},"lineInterpolation": "linear","lineWidth": 1,"pointSize": 5,"scaleDistribution": {"type": "linear"},"showPoints": "never","spanNulls": false,"stacking": {"group": "A","mode": "none"},"thresholdsStyle": {"mode": "off"}}, + "mappings": [], + "thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 1},{"color": "red","value": 100}]}, + "unit": "short" + } + }, + "gridPos": {"h": 10,"w": 24,"x": 0,"y": 0}, + "id": 1, + "options": {"legend": {"calcs": ["mean","max","last"],"displayMode": "table","placement": "bottom"},"tooltip": {"mode": "multi","sort": "desc"}}, + "targets": [ + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "query": "from(bucket: \"telemetry\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> filter(fn: (r) => r.source =~ /${router:regex}/)\n |> filter(fn: (r) => r.name =~ /${interface:regex}/)\n |> filter(fn: (r) => r._field == \"input_errors\" or r._field == \"output_errors\" or r._field == \"crc_errors\")\n |> derivative(unit: 1s, nonNegative: true)", + "refId": "A" + } + ], + "title": "Interface Errors", + "type": "timeseries" + }, + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "description": "Interface drop counters over time: input drops and output drops. 
Drops indicate congestion or queue overflow.", + "fieldConfig": { + "defaults": { + "color": {"mode": "palette-classic"}, + "custom": {"axisBorderShow": false,"axisCenteredZero": false,"axisLabel": "","axisPlacement": "auto","barAlignment": 0,"drawStyle": "line","fillOpacity": 10,"gradientMode": "none","hideFrom": {"legend": false,"tooltip": false,"viz": false},"lineInterpolation": "linear","lineWidth": 1,"pointSize": 5,"scaleDistribution": {"type": "linear"},"showPoints": "never","spanNulls": false,"stacking": {"group": "A","mode": "none"},"thresholdsStyle": {"mode": "off"}}, + "mappings": [], + "thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 1},{"color": "red","value": 100}]}, + "unit": "short" + } + }, + "gridPos": {"h": 10,"w": 24,"x": 0,"y": 10}, + "id": 2, + "options": {"legend": {"calcs": ["mean","max","last"],"displayMode": "table","placement": "bottom"},"tooltip": {"mode": "multi","sort": "desc"}}, + "targets": [ + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "query": "from(bucket: \"telemetry\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> filter(fn: (r) => r.source =~ /${router:regex}/)\n |> filter(fn: (r) => r.name =~ /${interface:regex}/)\n |> filter(fn: (r) => r._field == \"input_drops\" or r._field == \"output_drops\")\n |> derivative(unit: 1s, nonNegative: true)", + "refId": "A" + } + ], + "title": "Interface Drops", + "type": "timeseries" + }, + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "description": "Summary table showing the latest error and drop counter values per interface. 
Useful for quickly identifying problematic interfaces.", + "fieldConfig": { + "defaults": { + "color": {"mode": "thresholds"}, + "custom": {"align": "auto","displayMode": "auto","filterable": true,"inspect": true}, + "mappings": [], + "thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 1},{"color": "red","value": 100}]} + }, + "overrides": [ + {"matcher": {"id": "byName","options": "input_errors"},"properties": [{"id": "custom.displayMode","value": "color-background-solid"},{"id": "thresholds","value": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 1},{"color": "red","value": 100}]}}]}, + {"matcher": {"id": "byName","options": "output_errors"},"properties": [{"id": "custom.displayMode","value": "color-background-solid"},{"id": "thresholds","value": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 1},{"color": "red","value": 100}]}}]}, + {"matcher": {"id": "byName","options": "input_drops"},"properties": [{"id": "custom.displayMode","value": "color-background-solid"},{"id": "thresholds","value": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 1},{"color": "red","value": 100}]}}]}, + {"matcher": {"id": "byName","options": "output_drops"},"properties": [{"id": "custom.displayMode","value": "color-background-solid"},{"id": "thresholds","value": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "yellow","value": 1},{"color": "red","value": 100}]}}]} + ] + }, + "gridPos": {"h": 12,"w": 24,"x": 0,"y": 20}, + "id": 3, + "options": {"footer": {"fields": "","reducer": ["sum"],"show": false},"showHeader": true,"sortBy": [{"desc": true,"displayName": "input_errors"}]}, + "targets": [ + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "query": "from(bucket: \"telemetry\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => 
r._measurement == \"interface_counters\")\n |> filter(fn: (r) => r.source =~ /${router:regex}/)\n |> filter(fn: (r) => r.name =~ /${interface:regex}/)\n |> filter(fn: (r) => r._field == \"input_errors\" or r._field == \"output_errors\" or r._field == \"crc_errors\" or r._field == \"input_drops\" or r._field == \"output_drops\")\n |> last()\n |> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n |> keep(columns: [\"source\", \"name\", \"input_errors\", \"output_errors\", \"crc_errors\", \"input_drops\", \"output_drops\"])\n |> sort(columns: [\"input_errors\"], desc: true)", + "refId": "A" + } + ], + "title": "Error Summary Table", + "type": "table" + } + ], + "schemaVersion": 39, + "style": "dark", + "tags": ["obmp-telemetry"], + "time": {"from": "now-1h","to": "now"}, + "timepicker": {}, + "timezone": "browser", + "title": "Interface Errors & Drops", + "uid": "obmp-telem-02", + "version": 1 +} diff --git a/obmp-grafana/dashboards/Telemetry-3001/interface_utilization.json b/obmp-grafana/dashboards/Telemetry-3001/interface_utilization.json new file mode 100644 index 0000000..07fec81 --- /dev/null +++ b/obmp-grafana/dashboards/Telemetry-3001/interface_utilization.json @@ -0,0 +1,141 @@ +{ + "annotations": {"list": [{"builtIn": 1,"datasource": {"type": "datasource","uid": "grafana"},"enable": true,"hide": true,"iconColor": "rgba(0, 211, 255, 1)","name": "Annotations & Alerts","type": "dashboard"}]}, + "description": "Interface utilization metrics collected via gNMI streaming telemetry from IOS-XR routers. 
Shows byte rates, packet rates, and top interfaces by traffic volume.", + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [], + "templating": { + "list": [ + { + "current": {}, + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "definition": "from(bucket: \"telemetry\")\n |> range(start: -1h)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> keep(columns: [\"source\"])\n |> distinct(column: \"source\")\n |> sort()", + "hide": 0, + "includeAll": true, + "label": "Router", + "multi": true, + "name": "router", + "options": [], + "query": "from(bucket: \"telemetry\")\n |> range(start: -1h)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> keep(columns: [\"source\"])\n |> distinct(column: \"source\")\n |> sort()", + "refresh": 2, + "regex": "", + "type": "query" + }, + { + "current": {}, + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "definition": "from(bucket: \"telemetry\")\n |> range(start: -1h)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> filter(fn: (r) => r.source =~ /${router:regex}/)\n |> keep(columns: [\"name\"])\n |> distinct(column: \"name\")\n |> sort()", + "hide": 0, + "includeAll": true, + "label": "Interface", + "multi": true, + "name": "interface", + "options": [], + "query": "from(bucket: \"telemetry\")\n |> range(start: -1h)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> filter(fn: (r) => r.source =~ /${router:regex}/)\n |> keep(columns: [\"name\"])\n |> distinct(column: \"name\")\n |> sort()", + "refresh": 2, + "regex": "", + "type": "query" + } + ] + }, + "panels": [ + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "description": "Rate of bytes received and sent per interface, computed as the derivative of cumulative counters. 
Unit: bytes per second.", + "fieldConfig": { + "defaults": { + "color": {"mode": "palette-classic"}, + "custom": {"axisBorderShow": false,"axisCenteredZero": false,"axisLabel": "","axisPlacement": "auto","barAlignment": 0,"drawStyle": "line","fillOpacity": 10,"gradientMode": "none","hideFrom": {"legend": false,"tooltip": false,"viz": false},"lineInterpolation": "linear","lineWidth": 1,"pointSize": 5,"scaleDistribution": {"type": "linear"},"showPoints": "never","spanNulls": false,"stacking": {"group": "A","mode": "none"},"thresholdsStyle": {"mode": "off"}}, + "mappings": [], + "thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "red","value": 80}]}, + "unit": "Bps" + } + }, + "gridPos": {"h": 10,"w": 24,"x": 0,"y": 0}, + "id": 1, + "options": {"legend": {"calcs": ["mean","max"],"displayMode": "table","placement": "bottom"},"tooltip": {"mode": "multi","sort": "desc"}}, + "targets": [ + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "query": "from(bucket: \"telemetry\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> filter(fn: (r) => r.source =~ /${router:regex}/)\n |> filter(fn: (r) => r.name =~ /${interface:regex}/)\n |> filter(fn: (r) => r._field == \"bytes_received\" or r._field == \"bytes_sent\")\n |> derivative(unit: 1s, nonNegative: true)\n |> map(fn: (r) => ({r with _value: if r._value < 0.0 then 0.0 else r._value}))", + "refId": "A" + } + ], + "title": "Input/Output Bytes Rate", + "type": "timeseries" + }, + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "description": "Rate of packets received and sent per interface, computed as the derivative of cumulative counters. 
Unit: packets per second.", + "fieldConfig": { + "defaults": { + "color": {"mode": "palette-classic"}, + "custom": {"axisBorderShow": false,"axisCenteredZero": false,"axisLabel": "","axisPlacement": "auto","barAlignment": 0,"drawStyle": "line","fillOpacity": 10,"gradientMode": "none","hideFrom": {"legend": false,"tooltip": false,"viz": false},"lineInterpolation": "linear","lineWidth": 1,"pointSize": 5,"scaleDistribution": {"type": "linear"},"showPoints": "never","spanNulls": false,"stacking": {"group": "A","mode": "none"},"thresholdsStyle": {"mode": "off"}}, + "mappings": [], + "thresholds": {"mode": "absolute","steps": [{"color": "green","value": null},{"color": "red","value": 80}]}, + "unit": "pps" + } + }, + "gridPos": {"h": 10,"w": 24,"x": 0,"y": 10}, + "id": 2, + "options": {"legend": {"calcs": ["mean","max"],"displayMode": "table","placement": "bottom"},"tooltip": {"mode": "multi","sort": "desc"}}, + "targets": [ + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "query": "from(bucket: \"telemetry\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> filter(fn: (r) => r.source =~ /${router:regex}/)\n |> filter(fn: (r) => r.name =~ /${interface:regex}/)\n |> filter(fn: (r) => r._field == \"packets_received\" or r._field == \"packets_sent\")\n |> derivative(unit: 1s, nonNegative: true)\n |> map(fn: (r) => ({r with _value: if r._value < 0.0 then 0.0 else r._value}))", + "refId": "A" + } + ], + "title": "Input/Output Packets Rate", + "type": "timeseries" + }, + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "description": "Top interfaces ranked by total bytes (received + sent) over the selected time range.", + "fieldConfig": { + "defaults": { + "color": {"mode": "palette-classic"}, + "custom": {"axisBorderShow": false,"axisCenteredZero": false,"axisLabel": "","axisPlacement": "auto","fillOpacity": 80,"gradientMode": "none","hideFrom": {"legend": 
false,"tooltip": false,"viz": false},"lineWidth": 1,"scaleDistribution": {"type": "linear"},"thresholdsStyle": {"mode": "off"}}, + "mappings": [], + "thresholds": {"mode": "absolute","steps": [{"color": "green","value": null}]}, + "unit": "decbytes" + } + }, + "gridPos": {"h": 10,"w": 24,"x": 0,"y": 20}, + "id": 3, + "options": {"barRadius": 0,"barWidth": 0.6,"fullHighlight": false,"groupWidth": 0.7,"legend": {"calcs": [],"displayMode": "list","placement": "bottom"},"orientation": "horizontal","showValue": "auto","stacking": "none","tooltip": {"mode": "single","sort": "none"},"xTickLabelRotation": 0}, + "targets": [ + { + "datasource": {"type": "influxdb","uid": "obmp_influxdb"}, + "query": "from(bucket: \"telemetry\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"interface_counters\")\n |> filter(fn: (r) => r.source =~ /${router:regex}/)\n |> filter(fn: (r) => r.name =~ /${interface:regex}/)\n |> filter(fn: (r) => r._field == \"bytes_received\" or r._field == \"bytes_sent\")\n |> derivative(unit: 1s, nonNegative: true)\n |> group(columns: [\"source\", \"name\", \"_field\"])\n |> sum()\n |> group(columns: [\"source\", \"name\"])\n |> sum()\n |> group()\n |> sort(columns: [\"_value\"], desc: true)\n |> limit(n: 20)", + "refId": "A" + } + ], + "title": "Top Interfaces by Traffic", + "type": "barchart" + }, + { + "datasource": {"type": "datasource","uid": "grafana"}, + "gridPos": {"h": 4,"w": 24,"x": 0,"y": 30}, + "id": 4, + "options": { + "code": {"language": "plaintext","showLineNumbers": false,"showMiniMap": false}, + "content": "## Interface Utilization Dashboard\n\nThis dashboard displays real-time interface utilization metrics collected via **gNMI streaming telemetry** from IOS-XR routers.\n\n- **Data source:** InfluxDB (Telegraf gNMI input plugin)\n- **YANG model:** `Cisco-IOS-XR-infra-statsd-oper`\n- **Subscription path:** `/infra-statistics/interfaces/interface/latest/generic-counters`\n- **Sample 
interval:** 10 seconds\n\nUse the **Router** and **Interface** template variables at the top to filter the view.", + "mode": "markdown" + }, + "title": "About", + "type": "text" + } + ], + "schemaVersion": 39, + "style": "dark", + "tags": ["obmp-telemetry"], + "time": {"from": "now-1h","to": "now"}, + "timepicker": {}, + "timezone": "browser", + "title": "Interface Utilization", + "uid": "obmp-telem-01", + "version": 1 +} diff --git a/obmp-grafana/provisioning/dashboards/openbmp-dashboards.yml b/obmp-grafana/provisioning/dashboards/openbmp-dashboards.yml index 3cbcea3..8075afb 100644 --- a/obmp-grafana/provisioning/dashboards/openbmp-dashboards.yml +++ b/obmp-grafana/provisioning/dashboards/openbmp-dashboards.yml @@ -133,4 +133,15 @@ providers: allowUiUpdates: true options: path: /var/lib/grafana/dashboards/Learning + foldersFromFilesStructure: false + - name: 'OpenBMP-Telemetry' + orgId: 1 + folder: 'OBMP-Telemetry' + folderUid: '3001' + type: file + disableDeletion: false + updateIntervalSeconds: 30 + allowUiUpdates: true + options: + path: /var/lib/grafana/dashboards/Telemetry-3001 foldersFromFilesStructure: false \ No newline at end of file diff --git a/obmp-grafana/provisioning/datasources/influxdb-ds.yml b/obmp-grafana/provisioning/datasources/influxdb-ds.yml new file mode 100644 index 0000000..f09088a --- /dev/null +++ b/obmp-grafana/provisioning/datasources/influxdb-ds.yml @@ -0,0 +1,16 @@ +apiVersion: 1 + +datasources: + - name: InfluxDB-Telemetry + uid: obmp_influxdb + type: influxdb + access: proxy + url: http://obmp-influxdb:8086 + jsonData: + version: Flux + organization: openbmp + defaultBucket: telemetry + secureJsonData: + token: openbmp-telemetry-token + isDefault: false + editable: true diff --git a/telegraf/Dockerfile b/telegraf/Dockerfile new file mode 100644 index 0000000..67f6d5f --- /dev/null +++ b/telegraf/Dockerfile @@ -0,0 +1,2 @@ +FROM telegraf:1.28-alpine +COPY telegraf.conf /etc/telegraf/telegraf.conf diff --git a/telegraf/telegraf.conf 
b/telegraf/telegraf.conf new file mode 100644 index 0000000..2959e86 --- /dev/null +++ b/telegraf/telegraf.conf @@ -0,0 +1,66 @@ +# Telegraf Configuration for gNMI Streaming Telemetry +# Collects interface counters and data rates from IOS-XR routers + +[global_tags] + +[agent] + interval = "10s" + round_interval = true + metric_batch_size = 1000 + metric_buffer_limit = 10000 + collection_jitter = "0s" + flush_interval = "10s" + flush_jitter = "0s" + precision = "0s" + +############################################################################### +# INPUT PLUGINS # +############################################################################### + +[[inputs.gnmi]] + addresses = [ + "10.100.0.100:57400", + "10.100.0.200:57400", + "10.100.0.1:57400", + "10.100.0.2:57400", + "10.100.0.3:57400", + "10.100.0.4:57400", + "10.100.0.5:57400", + "10.100.0.6:57400", + "10.100.0.7:57400" + ] + username = "webui" + password = "cisco" + + ## TLS is disabled: the gRPC session to the routers is unencrypted (lab use only) + enable_tls = false + + ## gNMI encoding requested (one of: "proto", "json", "json_ietf", "bytes") + encoding = "proto" + + ## Wait this long before redialing after a connection failure + redial = "10s" + + [[inputs.gnmi.subscription]] + name = "interface_counters" + origin = "Cisco-IOS-XR-infra-statsd-oper" + path = "/infra-statistics/interfaces/interface/latest/generic-counters" + subscription_mode = "sample" + sample_interval = "10s" + + [[inputs.gnmi.subscription]] + name = "interface_rates" + origin = "Cisco-IOS-XR-infra-statsd-oper" + path = "/infra-statistics/interfaces/interface/latest/data-rate" + subscription_mode = "sample" + sample_interval = "10s" + +############################################################################### +# OUTPUT PLUGINS # +############################################################################### + +[[outputs.influxdb_v2]] + urls = ["http://localhost:8086"] + token = "${INFLUXDB_TOKEN}" + organization = "openbmp" + bucket = "telemetry" diff --git a/traffic-gen-ui/Dockerfile 
b/traffic-gen-ui/Dockerfile new file mode 100644 index 0000000..42f19ee --- /dev/null +++ b/traffic-gen-ui/Dockerfile @@ -0,0 +1,12 @@ +FROM node:20-alpine AS build +WORKDIR /app +COPY package.json ./ +RUN npm install +COPY . . +RUN npm run build + +FROM nginx:alpine +COPY --from=build /app/dist /usr/share/nginx/html +COPY nginx.conf /etc/nginx/conf.d/default.conf +EXPOSE 5002 +CMD ["nginx", "-g", "daemon off;"] diff --git a/traffic-gen-ui/index.html b/traffic-gen-ui/index.html new file mode 100644 index 0000000..ef05071 --- /dev/null +++ b/traffic-gen-ui/index.html @@ -0,0 +1,12 @@ + + + + + + Traffic Generator + + +
+ + + diff --git a/traffic-gen-ui/nginx.conf b/traffic-gen-ui/nginx.conf new file mode 100644 index 0000000..ab4e9f3 --- /dev/null +++ b/traffic-gen-ui/nginx.conf @@ -0,0 +1,15 @@ +server { + listen 5002; + root /usr/share/nginx/html; + index index.html; + + location /api/ { + proxy_pass http://localhost:5051/; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + } + + location / { + try_files $uri $uri/ /index.html; + } +} diff --git a/traffic-gen-ui/package.json b/traffic-gen-ui/package.json new file mode 100644 index 0000000..045644c --- /dev/null +++ b/traffic-gen-ui/package.json @@ -0,0 +1,17 @@ +{ + "name": "traffic-gen-ui", + "version": "1.0.0", + "private": true, + "scripts": { + "dev": "vite", + "build": "vite build", + "preview": "vite preview" + }, + "dependencies": { + "vue": "^3.3.0" + }, + "devDependencies": { + "@vitejs/plugin-vue": "^4.2.0", + "vite": "^4.4.0" + } +} diff --git a/traffic-gen-ui/src/App.vue b/traffic-gen-ui/src/App.vue new file mode 100644 index 0000000..4d1c778 --- /dev/null +++ b/traffic-gen-ui/src/App.vue @@ -0,0 +1,313 @@ + + + + + + + diff --git a/traffic-gen-ui/src/api.js b/traffic-gen-ui/src/api.js new file mode 100644 index 0000000..09f1a01 --- /dev/null +++ b/traffic-gen-ui/src/api.js @@ -0,0 +1,44 @@ +const BASE = '/api' + +async function req(method, path, body) { + const opts = { method, headers: { 'Content-Type': 'application/json' } } + if (body) opts.body = JSON.stringify(body) + const r = await fetch(BASE + path, opts) + if (!r.ok) throw new Error(`${method} ${path} -> ${r.status}`) + return r.json() +} + +export const api = { + health: () => req('GET', '/healthz'), + interfaces: () => req('GET', '/interfaces'), + mode: () => req('GET', '/mode'), + + // Flows + flows: () => req('GET', '/flows'), + createFlow: (f) => req('POST', '/flows', f), + getFlow: (id) => req('GET', `/flows/${id}`), + updateFlow: (id, f) => req('PUT', `/flows/${id}`, f), + deleteFlow: (id) => req('DELETE', `/flows/${id}`), + 
startFlow: (id) => req('POST', `/flows/${id}/start`), + stopFlow: (id) => req('POST', `/flows/${id}/stop`), + flowStats: (id) => req('GET', `/flows/${id}/stats`), + + // Tests + tests: () => req('GET', '/tests'), + createTest: (t) => req('POST', '/tests', t), + getTest: (id) => req('GET', `/tests/${id}`), + startTest: (id) => req('POST', `/tests/${id}/start`), + stopTest: (id) => req('POST', `/tests/${id}/stop`), + testResults: (id) => req('GET', `/tests/${id}/results`), + + // Presets + presets: () => req('GET', '/presets'), + loadPreset: (name, overrides) => req('POST', `/presets/${name}`, overrides), + + // Stats + statsHistory: () => req('GET', '/stats/history'), + + // Responder + responderStats: () => req('GET', '/responder/stats'), + responderReset: () => req('POST', '/responder/reset'), +} diff --git a/traffic-gen-ui/src/components/FlowBuilder.vue b/traffic-gen-ui/src/components/FlowBuilder.vue new file mode 100644 index 0000000..bfaa1dc --- /dev/null +++ b/traffic-gen-ui/src/components/FlowBuilder.vue @@ -0,0 +1,126 @@ + + + + + diff --git a/traffic-gen-ui/src/components/FlowTable.vue b/traffic-gen-ui/src/components/FlowTable.vue new file mode 100644 index 0000000..9a21610 --- /dev/null +++ b/traffic-gen-ui/src/components/FlowTable.vue @@ -0,0 +1,92 @@ + + + + + diff --git a/traffic-gen-ui/src/components/ResultsPanel.vue b/traffic-gen-ui/src/components/ResultsPanel.vue new file mode 100644 index 0000000..2cc0f62 --- /dev/null +++ b/traffic-gen-ui/src/components/ResultsPanel.vue @@ -0,0 +1,126 @@ + + + + + diff --git a/traffic-gen-ui/src/components/StatsMonitor.vue b/traffic-gen-ui/src/components/StatsMonitor.vue new file mode 100644 index 0000000..52ba339 --- /dev/null +++ b/traffic-gen-ui/src/components/StatsMonitor.vue @@ -0,0 +1,129 @@ + + + + + diff --git a/traffic-gen-ui/src/components/StatusBar.vue b/traffic-gen-ui/src/components/StatusBar.vue new file mode 100644 index 0000000..b4857e4 --- /dev/null +++ b/traffic-gen-ui/src/components/StatusBar.vue 
@@ -0,0 +1,37 @@ + + + + + diff --git a/traffic-gen-ui/src/components/TestBuilder.vue b/traffic-gen-ui/src/components/TestBuilder.vue new file mode 100644 index 0000000..8db2f6b --- /dev/null +++ b/traffic-gen-ui/src/components/TestBuilder.vue @@ -0,0 +1,126 @@ + + + + + diff --git a/traffic-gen-ui/src/components/TestRunner.vue b/traffic-gen-ui/src/components/TestRunner.vue new file mode 100644 index 0000000..8ffd210 --- /dev/null +++ b/traffic-gen-ui/src/components/TestRunner.vue @@ -0,0 +1,97 @@ + + + + + diff --git a/traffic-gen-ui/src/main.js b/traffic-gen-ui/src/main.js new file mode 100644 index 0000000..1d4acd7 --- /dev/null +++ b/traffic-gen-ui/src/main.js @@ -0,0 +1,3 @@ +import { createApp } from 'vue' +import App from './App.vue' +createApp(App).mount('#app') diff --git a/traffic-gen-ui/vite.config.js b/traffic-gen-ui/vite.config.js new file mode 100644 index 0000000..4c9535b --- /dev/null +++ b/traffic-gen-ui/vite.config.js @@ -0,0 +1,14 @@ +import { defineConfig } from 'vite' +import vue from '@vitejs/plugin-vue' + +export default defineConfig({ + plugins: [vue()], + server: { + proxy: { + '/api': { + target: 'http://localhost:5051', + rewrite: path => path.replace(/^\/api/, '') + } + } + } +}) diff --git a/traffic-gen/Dockerfile b/traffic-gen/Dockerfile new file mode 100644 index 0000000..3ea05ff --- /dev/null +++ b/traffic-gen/Dockerfile @@ -0,0 +1,8 @@ +FROM python:3.11-slim +RUN apt-get update && apt-get install -y --no-install-recommends \ + tcpreplay libpcap-dev procps && rm -rf /var/lib/apt/lists/* +RUN pip install --no-cache-dir flask scapy psutil +COPY . 
/traffic-gen/ +WORKDIR /traffic-gen +EXPOSE 5051 +CMD ["python3", "server.py"] diff --git a/traffic-gen/__pycache__/server.cpython-310.pyc b/traffic-gen/__pycache__/server.cpython-310.pyc new file mode 100644 index 0000000..4256d2b Binary files /dev/null and b/traffic-gen/__pycache__/server.cpython-310.pyc differ diff --git a/traffic-gen/engine/__init__.py b/traffic-gen/engine/__init__.py new file mode 100644 index 0000000..689c824 --- /dev/null +++ b/traffic-gen/engine/__init__.py @@ -0,0 +1 @@ +# traffic-gen engine package diff --git a/traffic-gen/engine/__pycache__/__init__.cpython-310.pyc b/traffic-gen/engine/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..c68362c Binary files /dev/null and b/traffic-gen/engine/__pycache__/__init__.cpython-310.pyc differ diff --git a/traffic-gen/engine/__pycache__/packet_builder.cpython-310.pyc b/traffic-gen/engine/__pycache__/packet_builder.cpython-310.pyc new file mode 100644 index 0000000..02c7ab7 Binary files /dev/null and b/traffic-gen/engine/__pycache__/packet_builder.cpython-310.pyc differ diff --git a/traffic-gen/engine/__pycache__/responder.cpython-310.pyc b/traffic-gen/engine/__pycache__/responder.cpython-310.pyc new file mode 100644 index 0000000..3c01da5 Binary files /dev/null and b/traffic-gen/engine/__pycache__/responder.cpython-310.pyc differ diff --git a/traffic-gen/engine/__pycache__/rfc2544.cpython-310.pyc b/traffic-gen/engine/__pycache__/rfc2544.cpython-310.pyc new file mode 100644 index 0000000..7b867b7 Binary files /dev/null and b/traffic-gen/engine/__pycache__/rfc2544.cpython-310.pyc differ diff --git a/traffic-gen/engine/__pycache__/sender.cpython-310.pyc b/traffic-gen/engine/__pycache__/sender.cpython-310.pyc new file mode 100644 index 0000000..6974b92 Binary files /dev/null and b/traffic-gen/engine/__pycache__/sender.cpython-310.pyc differ diff --git a/traffic-gen/engine/__pycache__/stats.cpython-310.pyc b/traffic-gen/engine/__pycache__/stats.cpython-310.pyc new file mode 100644 
index 0000000..e95f35d Binary files /dev/null and b/traffic-gen/engine/__pycache__/stats.cpython-310.pyc differ diff --git a/traffic-gen/engine/packet_builder.py b/traffic-gen/engine/packet_builder.py new file mode 100644 index 0000000..a76f749 --- /dev/null +++ b/traffic-gen/engine/packet_builder.py @@ -0,0 +1,120 @@ +""" +Packet Builder - constructs Scapy packets from flow configuration. + +Each generated packet embeds: + - Magic bytes b'TGEN' (4 bytes) + - Sequence number (4 bytes, big-endian) + - Sender timestamp in nanoseconds (8 bytes, big-endian) + - Padding to reach requested frame_size +""" + +import struct +import time + +from scapy.all import ( + Ether, IP, UDP, TCP, ICMP, Dot1Q, Raw, conf, +) + +MAGIC = b'TGEN' +HEADER_LEN = 4 + 4 + 8 # magic + seq + timestamp_ns + + +def _build_payload(seq: int, frame_size: int, header_overhead: int) -> Raw: + """Build payload with magic bytes, sequence number, timestamp placeholder, + and padding to reach the desired frame_size.""" + timestamp_ns = time.time_ns() + header = MAGIC + struct.pack('!I', seq) + struct.pack('!Q', timestamp_ns) + # frame_size includes Ethernet header (14) + FCS (4) in standard accounting, + # but Scapy doesn't add FCS, so we target frame_size - 4 total bytes on wire. + # header_overhead accounts for Ether + IP + L4 headers already present. 
+ pad_len = max(0, frame_size - 4 - header_overhead - HEADER_LEN) + return Raw(load=header + (b'\x00' * pad_len)) + + +def stamp_payload(payload_bytes: bytes, seq: int) -> bytes: + """Re-stamp an existing payload with a new sequence number and fresh timestamp.""" + timestamp_ns = time.time_ns() + return ( + MAGIC + + struct.pack('!I', seq) + + struct.pack('!Q', timestamp_ns) + + payload_bytes[HEADER_LEN:] + ) + + +def parse_payload(payload_bytes: bytes): + """Extract (seq, timestamp_ns) from a TGEN payload, or None if invalid.""" + if len(payload_bytes) < HEADER_LEN: + return None + if payload_bytes[:4] != MAGIC: + return None + seq = struct.unpack('!I', payload_bytes[4:8])[0] + timestamp_ns = struct.unpack('!Q', payload_bytes[8:16])[0] + return seq, timestamp_ns + + +def build_packet(flow_config: dict, seq: int = 0): + """Build a Scapy packet from a flow configuration dict. + + Required keys: + dst_ip, protocol + + Optional keys: + src_mac, dst_mac, src_ip, src_port, dst_port, dscp, vlan_id, frame_size + """ + protocol = flow_config.get('protocol', 'udp').lower() + frame_size = flow_config.get('frame_size', 512) + + # --- Layer 2 --- + src_mac = flow_config.get('src_mac', 'auto') + dst_mac = flow_config.get('dst_mac') + + ether_kwargs = {} + if src_mac and src_mac != 'auto': + ether_kwargs['src'] = src_mac + if dst_mac: + ether_kwargs['dst'] = dst_mac + + pkt = Ether(**ether_kwargs) + header_overhead = 14 # Ethernet + + # --- VLAN --- + vlan_id = flow_config.get('vlan_id') + if vlan_id is not None: + pkt = pkt / Dot1Q(vlan=int(vlan_id)) + header_overhead += 4 + + # --- Layer 3 --- + ip_kwargs = {'dst': flow_config['dst_ip']} + src_ip = flow_config.get('src_ip') + if src_ip: + ip_kwargs['src'] = src_ip + + dscp = flow_config.get('dscp', 0) + if dscp: + ip_kwargs['tos'] = int(dscp) << 2 + + pkt = pkt / IP(**ip_kwargs) + header_overhead += 20 # IP (no options) + + # --- Layer 4 --- + if protocol == 'udp': + src_port = flow_config.get('src_port', 12000) + dst_port = 
flow_config.get('dst_port', 5001) + pkt = pkt / UDP(sport=int(src_port), dport=int(dst_port)) + header_overhead += 8 + elif protocol == 'tcp': + src_port = flow_config.get('src_port', 12000) + dst_port = flow_config.get('dst_port', 80) + pkt = pkt / TCP(sport=int(src_port), dport=int(dst_port), flags='S') + header_overhead += 20 + elif protocol == 'icmp': + pkt = pkt / ICMP() + header_overhead += 8 + else: + raise ValueError(f'Unsupported protocol: {protocol}') + + # --- Payload --- + pkt = pkt / _build_payload(seq, frame_size, header_overhead) + + return pkt diff --git a/traffic-gen/engine/responder.py b/traffic-gen/engine/responder.py new file mode 100644 index 0000000..078b335 --- /dev/null +++ b/traffic-gen/engine/responder.py @@ -0,0 +1,185 @@ +""" +Responder - listens for TGEN-tagged packets and collects receive statistics. + +Two sub-modes: + - echo: swaps src/dst MAC and IP, sends packet back with receive timestamp + - log: records rx stats only, exposed via API +""" + +import logging +import struct +import threading +import time + +from scapy.all import ( + AsyncSniffer, Ether, IP, Raw, send, conf, +) + +from engine.packet_builder import MAGIC, HEADER_LEN, parse_payload + +log = logging.getLogger(__name__) +conf.verb = 0 + + +class Responder: + """Listens for TGEN packets on an interface and collects stats.""" + + def __init__(self, mode: str = 'log'): + """ + Args: + mode: 'echo' to reflect packets back, 'log' to only record stats. 
+ """ + self._mode = mode + self._lock = threading.Lock() + self._sniffer = None + self._running = False + + # Stats + self._rx_packets = 0 + self._rx_bytes = 0 + self._latency_samples = [] # list of (latency_ms,) + self._seen_seqs = set() + self._last_seq = -1 + self._out_of_order = 0 + self._duplicates = 0 + + # ------------------------------------------------------------------ + # Control + # ------------------------------------------------------------------ + + def start(self, interface: str = None): + """Start sniffing for TGEN packets.""" + if self._running: + log.warning('Responder already running') + return + + bpf_filter = 'ip' # broad filter; we check magic in callback + kwargs = { + 'prn': self._handle_packet, + 'store': False, + 'filter': bpf_filter, + } + if interface: + kwargs['iface'] = interface + + self._sniffer = AsyncSniffer(**kwargs) + self._sniffer.start() + self._running = True + log.info('Responder started on interface=%s mode=%s', interface or 'all', self._mode) + + def stop(self): + """Stop sniffing.""" + if self._sniffer and self._running: + self._sniffer.stop() + self._running = False + log.info('Responder stopped') + + def is_running(self) -> bool: + return self._running + + # ------------------------------------------------------------------ + # Stats + # ------------------------------------------------------------------ + + def get_stats(self) -> dict: + with self._lock: + latency = {} + if self._latency_samples: + vals = self._latency_samples + latency = { + 'min_ms': round(min(vals), 3), + 'max_ms': round(max(vals), 3), + 'avg_ms': round(sum(vals) / len(vals), 3), + 'jitter_ms': round( + sum(abs(vals[i] - vals[i - 1]) for i in range(1, len(vals))) / max(1, len(vals) - 1), + 3 + ) if len(vals) > 1 else 0.0, + 'samples': len(vals), + } + return { + 'rx_packets': self._rx_packets, + 'rx_bytes': self._rx_bytes, + 'out_of_order': self._out_of_order, + 'duplicates': self._duplicates, + 'latency': latency, + 'running': self._running, + } + 
+ def reset_stats(self): + with self._lock: + self._rx_packets = 0 + self._rx_bytes = 0 + self._latency_samples = [] + self._seen_seqs.clear() + self._last_seq = -1 + self._out_of_order = 0 + self._duplicates = 0 + log.info('Responder stats reset') + + # ------------------------------------------------------------------ + # Packet handling + # ------------------------------------------------------------------ + + def _handle_packet(self, pkt): + """Process a received packet, checking for TGEN magic bytes.""" + if not pkt.haslayer(Raw): + return + + payload = bytes(pkt[Raw].load) + parsed = parse_payload(payload) + if parsed is None: + return + + seq, sender_ts_ns = parsed + rx_time_ns = time.time_ns() + pkt_len = len(bytes(pkt)) + + # Compute one-way latency (only meaningful if clocks are synced) + latency_ms = (rx_time_ns - sender_ts_ns) / 1_000_000 + + with self._lock: + self._rx_packets += 1 + self._rx_bytes += pkt_len + + # Duplicate detection + if seq in self._seen_seqs: + self._duplicates += 1 + else: + self._seen_seqs.add(seq) + # Keep set bounded + if len(self._seen_seqs) > 100000: + # Remove oldest entries (approximate) + to_remove = sorted(self._seen_seqs)[:50000] + self._seen_seqs -= set(to_remove) + + # Out-of-order detection + if seq < self._last_seq and seq not in self._seen_seqs: + self._out_of_order += 1 + self._last_seq = seq + + # Record latency (only if plausible: 0 < latency < 60s) + if 0 < latency_ms < 60000: + self._latency_samples.append(latency_ms) + if len(self._latency_samples) > 10000: + self._latency_samples = self._latency_samples[-5000:] + + # Echo mode: swap and send back + if self._mode == 'echo' and pkt.haslayer(Ether) and pkt.haslayer(IP): + try: + echo_pkt = pkt.copy() + # Swap MACs + echo_pkt[Ether].src, echo_pkt[Ether].dst = pkt[Ether].dst, pkt[Ether].src + # Swap IPs + echo_pkt[IP].src, echo_pkt[IP].dst = pkt[IP].dst, pkt[IP].src + # Append receive timestamp to payload + rx_ts_bytes = struct.pack('!Q', rx_time_ns) + 
echo_pkt[Raw].load = payload + rx_ts_bytes + # Clear checksums so Scapy recalculates + del echo_pkt[IP].chksum + if echo_pkt.haslayer('UDP'): + del echo_pkt['UDP'].chksum + elif echo_pkt.haslayer('TCP'): + del echo_pkt['TCP'].chksum + send(echo_pkt[IP], verbose=0) + except Exception as e: + log.debug('Echo send error: %s', e) diff --git a/traffic-gen/engine/rfc2544.py b/traffic-gen/engine/rfc2544.py new file mode 100644 index 0000000..879fed4 --- /dev/null +++ b/traffic-gen/engine/rfc2544.py @@ -0,0 +1,336 @@ +""" +RFC 2544 test implementations: + - ThroughputTest: binary search for max zero-loss throughput + - LatencyTest: measure latency at a given rate + - FrameLossTest: measure loss at decreasing rates + - BackToBackTest: find max burst length with zero loss +""" + +import logging +import threading +import time +from typing import Dict, List, Optional + +from scapy.all import send, sr, conf, IP, ICMP + +from engine.packet_builder import build_packet, parse_payload + +log = logging.getLogger(__name__) +conf.verb = 0 + + +class _BaseTest: + """Base class for RFC 2544 tests.""" + + def __init__(self, test_id: str, flow_config: dict, frame_sizes: List[int], + trial_duration: float = 60, max_rate_pps: int = 10000, + acceptable_loss_pct: float = 0.0): + self.test_id = test_id + self.flow_config = dict(flow_config) + self.frame_sizes = frame_sizes + self.trial_duration = trial_duration + self.max_rate_pps = max_rate_pps + self.acceptable_loss_pct = acceptable_loss_pct + + self.state = 'idle' # idle -> running -> complete/error + self.results = {} + self.error = None + self.started_at = None + self.completed_at = None + + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._lock = threading.Lock() + + def start(self): + if self.state == 'running': + return + self._stop_event.clear() + self.state = 'running' + self.started_at = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + self.results = {} + self.error = None + 
self._thread = threading.Thread(target=self._run_safe, daemon=True, + name=f'test-{self.test_id[:8]}') + self._thread.start() + + def stop(self): + self._stop_event.set() + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=self.trial_duration + 5) + if self.state == 'running': + self.state = 'error' + self.error = 'Cancelled by user' + + def _run_safe(self): + try: + self._run() + if self.state == 'running': + self.state = 'complete' + except Exception as e: + log.error('Test %s error: %s', self.test_id[:8], e) + self.state = 'error' + self.error = str(e) + finally: + self.completed_at = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + + def _run(self): + raise NotImplementedError + + def _is_stopped(self) -> bool: + return self._stop_event.is_set() + + def _send_trial(self, frame_size: int, rate_pps: int, duration: float): + """Send packets at a given rate for a duration. Returns (tx_count, rx_count, latencies).""" + flow = dict(self.flow_config) + flow['frame_size'] = frame_size + + interval = 1.0 / rate_pps if rate_pps > 0 else 1.0 + tx_count = 0 + rx_count = 0 + latencies = [] + protocol = flow.get('protocol', 'udp').lower() + + start = time.time() + seq = 0 + + while (time.time() - start) < duration and not self._is_stopped(): + pkt = build_packet(flow, seq=seq) + seq += 1 + + if protocol == 'icmp': + # Use sr() for ICMP to get responses + answered, _ = sr(pkt[IP], timeout=1, verbose=0) + tx_count += 1 + for sent_pkt, recv_pkt in answered: + rx_count += 1 + rtt_ms = (recv_pkt.time - sent_pkt.sent_time) * 1000 + latencies.append(rtt_ms) + else: + send(pkt[IP], verbose=0) + tx_count += 1 + + # Rate limiting + elapsed = time.time() - start + expected_sent = elapsed * rate_pps + if tx_count > expected_sent: + sleep_time = (tx_count - expected_sent) / rate_pps + if sleep_time > 0: + self._stop_event.wait(min(sleep_time, 0.1)) + + # For non-ICMP, we can't easily measure rx without a responder. 
+ # rx_count stays 0 for UDP/TCP unless a responder is configured. + return tx_count, rx_count, latencies + + def get_info(self) -> dict: + return { + 'test_id': self.test_id, + 'type': self.__class__.__name__, + 'state': self.state, + 'results': self.results, + 'error': self.error, + 'started_at': self.started_at, + 'completed_at': self.completed_at, + } + + +class ThroughputTest(_BaseTest): + """Binary search for maximum throughput with acceptable loss.""" + + def _run(self): + for fs in self.frame_sizes: + if self._is_stopped(): + break + + low = 0 + high = self.max_rate_pps + best_rate = 0 + convergence_threshold = max(1, int(self.max_rate_pps * 0.01)) + + log.info('Throughput test: frame_size=%d, searching [%d, %d] pps', fs, low, high) + + while (high - low) > convergence_threshold and not self._is_stopped(): + mid = (low + high) // 2 + if mid == 0: + break + + tx, rx, _ = self._send_trial(fs, mid, self.trial_duration) + + if tx == 0: + loss_pct = 100.0 + else: + # For ICMP we have rx; for UDP assume zero loss if no responder + protocol = self.flow_config.get('protocol', 'udp').lower() + if protocol == 'icmp': + loss_pct = ((tx - rx) / tx) * 100 + else: + # Without responder, assume success (user should use responder for accurate test) + loss_pct = 0.0 + + log.info(' frame=%d rate=%d tx=%d rx=%d loss=%.2f%%', + fs, mid, tx, rx, loss_pct) + + if loss_pct <= self.acceptable_loss_pct: + best_rate = mid + low = mid + 1 + else: + high = mid - 1 + + self.results[str(fs)] = { + 'max_throughput_pps': best_rate, + 'frame_size': fs, + } + log.info('Throughput result: frame_size=%d -> %d pps', fs, best_rate) + + +class LatencyTest(_BaseTest): + """Measure latency at a specified rate.""" + + def _run(self): + rate = self.flow_config.get('rate_pps', 100) + + for fs in self.frame_sizes: + if self._is_stopped(): + break + + log.info('Latency test: frame_size=%d at %d pps for %ds', fs, rate, self.trial_duration) + _, _, latencies = self._send_trial(fs, rate, 
self.trial_duration) + + if latencies: + avg_ms = sum(latencies) / len(latencies) + min_ms = min(latencies) + max_ms = max(latencies) + jitter_ms = ( + sum(abs(latencies[i] - latencies[i - 1]) for i in range(1, len(latencies))) + / max(1, len(latencies) - 1) + ) if len(latencies) > 1 else 0.0 + + self.results[str(fs)] = { + 'frame_size': fs, + 'min_ms': round(min_ms, 3), + 'avg_ms': round(avg_ms, 3), + 'max_ms': round(max_ms, 3), + 'jitter_ms': round(jitter_ms, 3), + 'samples': len(latencies), + } + else: + self.results[str(fs)] = { + 'frame_size': fs, + 'min_ms': None, 'avg_ms': None, 'max_ms': None, + 'jitter_ms': None, 'samples': 0, + 'note': 'No responses received (use ICMP or configure responder)', + } + log.info('Latency result: frame_size=%d -> %s', fs, self.results[str(fs)]) + + +class FrameLossTest(_BaseTest): + """Measure frame loss at decreasing rates (100%, 90%, 80%, ...).""" + + def _run(self): + for fs in self.frame_sizes: + if self._is_stopped(): + break + + results_for_size = [] + for pct in range(100, 0, -10): + if self._is_stopped(): + break + + rate = int(self.max_rate_pps * pct / 100) + if rate == 0: + continue + + log.info('FrameLoss test: frame_size=%d rate=%d (%d%%)', fs, rate, pct) + tx, rx, _ = self._send_trial(fs, rate, self.trial_duration) + + protocol = self.flow_config.get('protocol', 'udp').lower() + if tx > 0 and protocol == 'icmp': + loss_pct = ((tx - rx) / tx) * 100 + else: + loss_pct = 0.0 # Cannot measure without responder for non-ICMP + + results_for_size.append({ + 'rate_pct': pct, + 'rate_pps': rate, + 'tx_packets': tx, + 'rx_packets': rx, + 'loss_pct': round(loss_pct, 3), + }) + + self.results[str(fs)] = results_for_size + + +class BackToBackTest(_BaseTest): + """Find maximum burst length with zero loss.""" + + def _run(self): + for fs in self.frame_sizes: + if self._is_stopped(): + break + + low = 1 + high = self.max_rate_pps # Use max_rate_pps as max burst length + best_burst = 0 + convergence = max(1, high // 100) + + 
log.info('BackToBack test: frame_size=%d searching burst [%d, %d]', fs, low, high) + + while (high - low) > convergence and not self._is_stopped(): + mid = (low + high) // 2 + if mid == 0: + break + + flow = dict(self.flow_config) + flow['frame_size'] = fs + protocol = flow.get('protocol', 'udp').lower() + + # Send burst of 'mid' packets as fast as possible + tx_count = 0 + rx_count = 0 + for seq in range(mid): + if self._is_stopped(): + break + pkt = build_packet(flow, seq=seq) + if protocol == 'icmp': + answered, _ = sr(pkt[IP], timeout=0.5, verbose=0) + tx_count += 1 + rx_count += len(answered) + else: + send(pkt[IP], verbose=0) + tx_count += 1 + + if protocol == 'icmp' and tx_count > 0: + loss_pct = ((tx_count - rx_count) / tx_count) * 100 + else: + loss_pct = 0.0 # Can't measure without responder + + log.info(' burst=%d tx=%d rx=%d loss=%.2f%%', mid, tx_count, rx_count, loss_pct) + + if loss_pct <= self.acceptable_loss_pct: + best_burst = mid + low = mid + 1 + else: + high = mid - 1 + + self.results[str(fs)] = { + 'frame_size': fs, + 'max_burst_frames': best_burst, + } + log.info('BackToBack result: frame_size=%d -> %d frames', fs, best_burst) + + +# Factory function +TEST_TYPES = { + 'throughput': ThroughputTest, + 'latency': LatencyTest, + 'frame_loss': FrameLossTest, + 'back_to_back': BackToBackTest, +} + + +def create_test(test_id: str, test_type: str, flow_config: dict, **kwargs): + """Create an RFC 2544 test instance by type name.""" + cls = TEST_TYPES.get(test_type) + if cls is None: + raise ValueError(f'Unknown test type: {test_type}. Available: {list(TEST_TYPES)}') + return cls(test_id=test_id, flow_config=flow_config, **kwargs) diff --git a/traffic-gen/engine/sender.py b/traffic-gen/engine/sender.py new file mode 100644 index 0000000..2d96d00 --- /dev/null +++ b/traffic-gen/engine/sender.py @@ -0,0 +1,247 @@ +""" +FlowSender - manages traffic generation with background threads per flow. 
+""" + +import logging +import shutil +import threading +import time +import urllib.request +import json + +from scapy.all import send, sendpfast, sr, conf + +from engine.packet_builder import build_packet, stamp_payload + +log = logging.getLogger(__name__) + +# Suppress Scapy verbosity globally +conf.verb = 0 + +HAS_TCPREPLAY = shutil.which('tcpreplay') is not None + + +class FlowSender: + """Manages sending threads for multiple flows.""" + + def __init__(self): + self._lock = threading.Lock() + self._flows = {} # flow_id -> flow_config dict + self._threads = {} # flow_id -> Thread + self._stop_events = {} # flow_id -> Event + self._stats = {} # flow_id -> {tx_packets, tx_bytes, ...} + + # ------------------------------------------------------------------ + # Flow CRUD + # ------------------------------------------------------------------ + + def add_flow(self, flow_id: str, flow_config: dict): + with self._lock: + self._flows[flow_id] = flow_config + self._stats[flow_id] = { + 'tx_packets': 0, 'tx_bytes': 0, + 'rx_packets': 0, 'rx_bytes': 0, + 'latency_samples': [], + } + + def get_flow(self, flow_id: str): + with self._lock: + return self._flows.get(flow_id) + + def get_all_flows(self): + with self._lock: + return dict(self._flows) + + def update_flow(self, flow_id: str, updates: dict): + with self._lock: + if flow_id not in self._flows: + return False + self._flows[flow_id].update(updates) + return True + + def remove_flow(self, flow_id: str): + self.stop(flow_id) + with self._lock: + self._flows.pop(flow_id, None) + self._stats.pop(flow_id, None) + + # ------------------------------------------------------------------ + # Start / Stop + # ------------------------------------------------------------------ + + def start(self, flow_id: str): + with self._lock: + if flow_id not in self._flows: + raise KeyError(f'Flow {flow_id} not found') + if flow_id in self._threads and self._threads[flow_id].is_alive(): + return # already running + self._flows[flow_id]['state'] 
= 'running' + self._stats[flow_id] = { + 'tx_packets': 0, 'tx_bytes': 0, + 'rx_packets': 0, 'rx_bytes': 0, + 'latency_samples': [], + } + stop_event = threading.Event() + self._stop_events[flow_id] = stop_event + t = threading.Thread( + target=self._send_loop, + args=(flow_id, stop_event), + daemon=True, + name=f'sender-{flow_id[:8]}', + ) + self._threads[flow_id] = t + t.start() + + def stop(self, flow_id: str): + with self._lock: + ev = self._stop_events.pop(flow_id, None) + if ev: + ev.set() + t = self._threads.pop(flow_id, None) + if flow_id in self._flows: + self._flows[flow_id]['state'] = 'stopped' + if t and t.is_alive(): + t.join(timeout=5) + + def is_running(self, flow_id: str) -> bool: + with self._lock: + t = self._threads.get(flow_id) + return t is not None and t.is_alive() + + # ------------------------------------------------------------------ + # Stats + # ------------------------------------------------------------------ + + def get_stats(self, flow_id: str) -> dict: + with self._lock: + s = self._stats.get(flow_id, {}) + return dict(s) + + def get_all_stats(self) -> dict: + with self._lock: + return {fid: dict(s) for fid, s in self._stats.items()} + + # ------------------------------------------------------------------ + # Internal send loop + # ------------------------------------------------------------------ + + def _send_loop(self, flow_id: str, stop_event: threading.Event): + with self._lock: + flow = dict(self._flows[flow_id]) + + rate_pps = flow.get('rate_pps', 1000) + duration = flow.get('duration', 30) + protocol = flow.get('protocol', 'udp').lower() + responder_url = flow.get('responder_url') + use_icmp_sr = (protocol == 'icmp' and not responder_url) + + # Build template packet + pkt_template = build_packet(flow, seq=0) + pkt_bytes_len = len(bytes(pkt_template)) + + # Calculate sleep interval: send in batches for efficiency + batch_size = max(1, min(rate_pps // 10, 100)) + interval = batch_size / rate_pps if rate_pps > 0 else 1.0 + + seq 
= 0 + start_time = time.time() + last_responder_poll = 0 + log.info('Flow %s: starting send loop at %d pps for %ds', flow_id[:8], rate_pps, duration) + + try: + while not stop_event.is_set(): + elapsed = time.time() - start_time + if duration and elapsed >= duration: + break + + if use_icmp_sr: + # ICMP mode: use sr() to measure latency from responses + pkt = build_packet(flow, seq=seq) + answered, _ = sr(pkt[pkt.firstlayer().payload.__class__], + timeout=1, verbose=0) + with self._lock: + stats = self._stats.get(flow_id) + if stats: + stats['tx_packets'] += 1 + stats['tx_bytes'] += pkt_bytes_len + for sent_pkt, recv_pkt in answered: + rtt_ms = (recv_pkt.time - sent_pkt.sent_time) * 1000 + stats['rx_packets'] += 1 + stats['rx_bytes'] += len(bytes(recv_pkt)) + stats['latency_samples'].append(rtt_ms) + # Keep only last 1000 samples + if len(stats['latency_samples']) > 1000: + stats['latency_samples'] = stats['latency_samples'][-1000:] + seq += 1 + # Rate limit for ICMP + sleep_time = (1.0 / rate_pps) - (time.time() - start_time - elapsed) + if sleep_time > 0: + stop_event.wait(sleep_time) + else: + # UDP/TCP mode: send batches + packets = [] + for _ in range(batch_size): + pkt = build_packet(flow, seq=seq) + packets.append(pkt) + seq += 1 + + try: + if HAS_TCPREPLAY and rate_pps >= 1000: + sendpfast(packets, pps=rate_pps, loop=0) + else: + for p in packets: + send(p[p.firstlayer().payload.__class__], verbose=0) + except Exception as e: + # Fallback: basic send + log.debug('Send error (falling back): %s', e) + for p in packets: + try: + send(p[p.firstlayer().payload.__class__], verbose=0) + except Exception: + pass + + with self._lock: + stats = self._stats.get(flow_id) + if stats: + stats['tx_packets'] += len(packets) + stats['tx_bytes'] += pkt_bytes_len * len(packets) + + # Poll responder for rx stats periodically + if responder_url and (time.time() - last_responder_poll) >= 2.0: + self._poll_responder(flow_id, responder_url) + last_responder_poll = time.time() + + 
# Rate limit + stop_event.wait(interval) + + except Exception as e: + log.error('Flow %s: send loop error: %s', flow_id[:8], e) + finally: + with self._lock: + if flow_id in self._flows: + self._flows[flow_id]['state'] = 'stopped' + # Final responder poll + if responder_url: + self._poll_responder(flow_id, responder_url) + log.info('Flow %s: send loop finished. seq=%d', flow_id[:8], seq) + + def _poll_responder(self, flow_id: str, responder_url: str): + """Poll a responder's /responder/stats endpoint for rx metrics.""" + try: + url = responder_url.rstrip('/') + '/responder/stats' + req = urllib.request.Request(url, method='GET') + req.add_header('Accept', 'application/json') + with urllib.request.urlopen(req, timeout=2) as resp: + data = json.loads(resp.read().decode()) + with self._lock: + stats = self._stats.get(flow_id) + if stats: + stats['rx_packets'] = data.get('rx_packets', 0) + stats['rx_bytes'] = data.get('rx_bytes', 0) + lat = data.get('latency', {}) + if lat.get('avg_ms') is not None: + stats['latency_samples'].append(lat['avg_ms']) + if len(stats['latency_samples']) > 1000: + stats['latency_samples'] = stats['latency_samples'][-1000:] + except Exception as e: + log.debug('Responder poll error for flow %s: %s', flow_id[:8], e) diff --git a/traffic-gen/engine/stats.py b/traffic-gen/engine/stats.py new file mode 100644 index 0000000..fed2c5f --- /dev/null +++ b/traffic-gen/engine/stats.py @@ -0,0 +1,119 @@ +""" +StatsCollector - ring buffer of per-flow traffic statistics. + +Stores the last 300 samples (5 minutes at 1-second intervals) per flow, +with derived rates and loss calculations. 
+""" + +import threading +import time +from collections import defaultdict, deque + + +class StatsCollector: + """Collects and stores per-flow traffic statistics in a ring buffer.""" + + def __init__(self, max_samples: int = 300): + self._lock = threading.Lock() + self._max_samples = max_samples + # flow_id -> deque of sample dicts + self._history = defaultdict(lambda: deque(maxlen=self._max_samples)) + # flow_id -> previous counters for rate calculation + self._prev = {} + + def record(self, flow_id: str, tx_packets: int, tx_bytes: int, + rx_packets: int = 0, rx_bytes: int = 0, + latency: dict = None): + """Record a stats sample for a flow. + + Args: + flow_id: Flow identifier. + tx_packets: Cumulative transmitted packets. + tx_bytes: Cumulative transmitted bytes. + rx_packets: Cumulative received packets. + rx_bytes: Cumulative received bytes. + latency: Optional dict with min_ms, avg_ms, max_ms, jitter_ms. + """ + now = time.time() + + with self._lock: + prev = self._prev.get(flow_id) + + if prev is not None: + dt = now - prev['time'] + if dt > 0: + d_tx_pkts = tx_packets - prev['tx_packets'] + d_tx_bytes = tx_bytes - prev['tx_bytes'] + d_rx_pkts = rx_packets - prev['rx_packets'] + d_rx_bytes = rx_bytes - prev['rx_bytes'] + + tx_pps = d_tx_pkts / dt + rx_pps = d_rx_pkts / dt + tx_mbps = (d_tx_bytes * 8) / (dt * 1_000_000) + rx_mbps = (d_rx_bytes * 8) / (dt * 1_000_000) + else: + tx_pps = rx_pps = tx_mbps = rx_mbps = 0.0 + else: + tx_pps = rx_pps = tx_mbps = rx_mbps = 0.0 + + # Loss calculation + loss_pct = 0.0 + if tx_packets > 0 and rx_packets > 0: + loss_pct = max(0.0, ((tx_packets - rx_packets) / tx_packets) * 100) + + sample = { + 'timestamp': now, + 'tx_packets': tx_packets, + 'tx_bytes': tx_bytes, + 'rx_packets': rx_packets, + 'rx_bytes': rx_bytes, + 'tx_pps': round(tx_pps, 2), + 'rx_pps': round(rx_pps, 2), + 'tx_mbps': round(tx_mbps, 4), + 'rx_mbps': round(rx_mbps, 4), + 'loss_pct': round(loss_pct, 3), + } + + if latency: + sample['latency'] = latency + + 
self._history[flow_id].append(sample) + self._prev[flow_id] = { + 'time': now, + 'tx_packets': tx_packets, + 'tx_bytes': tx_bytes, + 'rx_packets': rx_packets, + 'rx_bytes': rx_bytes, + } + + def get_history(self, flow_id: str, count: int = 60) -> list: + """Get the last N samples for a flow.""" + with self._lock: + history = self._history.get(flow_id) + if not history: + return [] + samples = list(history) + return samples[-count:] + + def get_latest(self, flow_id: str) -> dict: + """Get the most recent sample for a flow.""" + with self._lock: + history = self._history.get(flow_id) + if not history: + return {} + return dict(history[-1]) + + def get_all_latest(self) -> dict: + """Get the most recent sample for every flow.""" + with self._lock: + result = {} + for flow_id, history in self._history.items(): + if history: + result[flow_id] = dict(history[-1]) + return result + + def remove_flow(self, flow_id: str): + """Remove all data for a flow.""" + with self._lock: + self._history.pop(flow_id, None) + self._prev.pop(flow_id, None) diff --git a/traffic-gen/presets/__init__.py b/traffic-gen/presets/__init__.py new file mode 100644 index 0000000..3c645e8 --- /dev/null +++ b/traffic-gen/presets/__init__.py @@ -0,0 +1,75 @@ +""" +Built-in test presets for the traffic generator. 
+""" + +PRESETS = { + 'quick_icmp': { + 'description': 'Quick ICMP ping test to verify connectivity', + 'flow': { + 'protocol': 'icmp', + 'frame_size': 64, + 'rate_pps': 10, + 'duration': 10, + }, + }, + 'udp_flood_small': { + 'description': 'UDP flood with 64-byte frames at 1000 pps', + 'flow': { + 'protocol': 'udp', + 'dst_port': 5001, + 'frame_size': 64, + 'rate_pps': 1000, + 'duration': 30, + }, + }, + 'udp_flood_large': { + 'description': 'UDP flood with 1518-byte frames at 500 pps', + 'flow': { + 'protocol': 'udp', + 'dst_port': 5001, + 'frame_size': 1518, + 'rate_pps': 500, + 'duration': 30, + }, + }, + 'rfc2544_throughput': { + 'description': 'RFC 2544 throughput test across standard frame sizes', + 'flow': { + 'protocol': 'udp', + 'dst_port': 5001, + 'frame_size': 64, + 'rate_pps': 10000, + 'duration': 60, + }, + 'test': { + 'type': 'throughput', + 'frame_sizes': [64, 128, 256, 512, 1024, 1280, 1518], + 'trial_duration': 60, + 'acceptable_loss_pct': 0.0, + }, + }, + 'rfc2544_latency': { + 'description': 'RFC 2544 latency test at moderate rate', + 'flow': { + 'protocol': 'icmp', + 'frame_size': 64, + 'rate_pps': 100, + 'duration': 30, + }, + 'test': { + 'type': 'latency', + 'frame_sizes': [64, 512, 1518], + 'trial_duration': 30, + }, + }, + 'tcp_session': { + 'description': 'TCP SYN flood at 100 pps (for testing ACL/firewall)', + 'flow': { + 'protocol': 'tcp', + 'dst_port': 80, + 'frame_size': 64, + 'rate_pps': 100, + 'duration': 30, + }, + }, +} diff --git a/traffic-gen/presets/__pycache__/__init__.cpython-310.pyc b/traffic-gen/presets/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..1eb2ef1 Binary files /dev/null and b/traffic-gen/presets/__pycache__/__init__.cpython-310.pyc differ diff --git a/traffic-gen/requirements.txt b/traffic-gen/requirements.txt new file mode 100644 index 0000000..5972258 --- /dev/null +++ b/traffic-gen/requirements.txt @@ -0,0 +1,3 @@ +flask +scapy +psutil diff --git a/traffic-gen/server.py 
b/traffic-gen/server.py new file mode 100644 index 0000000..9739a8f --- /dev/null +++ b/traffic-gen/server.py @@ -0,0 +1,656 @@ +#!/usr/bin/env python3 +""" +Traffic Generator API Server + +Scapy-based traffic generator with a Flask API. Runs in two modes +controlled by TRAFFIC_GEN_MODE env var: + - sender (default): generates traffic, runs RFC 2544 tests + - responder: listens for test packets, echoes/timestamps, reports rx stats + +API endpoints: + GET /healthz - health + active flows/tests count + GET /interfaces - list available network interfaces + GET /mode - current mode + + Sender mode: + GET /flows - list all flows + POST /flows - create flow + GET /flows/ - get flow details + stats + PUT /flows/ - update flow (only if idle) + DELETE /flows/ - delete flow + POST /flows//start - start sending + POST /flows//stop - stop sending + GET /flows//stats - real-time stats + + GET /tests - list all tests + POST /tests - create RFC 2544 test + GET /tests/ - test details + results + POST /tests//start - start test + POST /tests//stop - abort test + GET /tests//results - exportable results + + GET /presets - list presets + POST /presets/ - create flow/test from preset + + GET /stats/history - historical stats for all flows + + Responder mode: + GET /responder/stats - receive statistics + POST /responder/reset - reset receive counters +""" + +import sys +import os +import json +import threading +import time +import logging +import uuid + +import psutil +from flask import Flask, request, jsonify + +# --- logging to stderr so stdout stays clean --- +logging.basicConfig( + stream=sys.stderr, + level=logging.INFO, + format='%(asctime)s [TGEN] %(levelname)s %(message)s', +) +log = logging.getLogger(__name__) + +app = Flask(__name__) + +# --------------------------------------------------------------------------- +# Global state (set in main()) +# --------------------------------------------------------------------------- +MODE = 'sender' # or 'responder' + +# Sender-mode 
globals +_sender = None # FlowSender instance +_stats_collector = None # StatsCollector instance +_flows_meta = {} # flow_id -> metadata dict (includes config) +_flows_lock = threading.Lock() + +_tests = {} # test_id -> RFC 2544 test instance +_tests_lock = threading.Lock() + +# Responder-mode globals +_responder = None # Responder instance + + +# --------------------------------------------------------------------------- +# Helper +# --------------------------------------------------------------------------- + +def _now_iso(): + return time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()) + + +def _flow_response(flow_id: str) -> dict: + """Build a serializable flow dict.""" + with _flows_lock: + meta = _flows_meta.get(flow_id) + if meta is None: + return None + result = dict(meta) + if _sender: + result['is_running'] = _sender.is_running(flow_id) + result['stats'] = _sender.get_stats(flow_id) + return result + + +# --------------------------------------------------------------------------- +# Common endpoints +# --------------------------------------------------------------------------- + +@app.route('/healthz') +def health(): + with _flows_lock: + active_flows = sum( + 1 for fid in _flows_meta + if _sender and _sender.is_running(fid) + ) if MODE == 'sender' else 0 + with _tests_lock: + active_tests = sum( + 1 for t in _tests.values() if t.state == 'running' + ) if MODE == 'sender' else 0 + resp = { + 'status': 'ok', + 'mode': MODE, + 'active_flows': active_flows, + 'active_tests': active_tests, + } + if MODE == 'responder' and _responder: + resp['responder_running'] = _responder.is_running() + return jsonify(resp) + + +@app.route('/interfaces') +def list_interfaces(): + ifaces = [] + addrs = psutil.net_if_addrs() + stats = psutil.net_if_stats() + for name, addr_list in addrs.items(): + info = {'name': name, 'ip': None, 'mac': None, 'is_up': False} + for addr in addr_list: + if addr.family.name == 'AF_INET': + info['ip'] = addr.address + elif addr.family.name == 
'AF_PACKET': + info['mac'] = addr.address + if name in stats: + info['is_up'] = stats[name].isup + ifaces.append(info) + return jsonify({'interfaces': ifaces}) + + +@app.route('/mode') +def get_mode(): + return jsonify({'mode': MODE}) + + +# --------------------------------------------------------------------------- +# Sender-mode: Flow endpoints +# --------------------------------------------------------------------------- + +@app.route('/flows', methods=['GET']) +def list_flows(): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + with _flows_lock: + flow_ids = list(_flows_meta.keys()) + flows = [_flow_response(fid) for fid in flow_ids] + return jsonify({'count': len(flows), 'flows': flows}) + + +@app.route('/flows', methods=['POST']) +def create_flow(): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + data = request.get_json(force=True) + + # Validate required fields + if 'dst_ip' not in data: + return jsonify({'error': 'dst_ip is required'}), 400 + if 'protocol' not in data: + return jsonify({'error': 'protocol is required'}), 400 + if data['protocol'].lower() not in ('udp', 'tcp', 'icmp'): + return jsonify({'error': 'protocol must be udp, tcp, or icmp'}), 400 + + flow_id = str(uuid.uuid4()) + flow_config = { + 'id': flow_id, + 'name': data.get('name', f'flow-{flow_id[:8]}'), + 'src_mac': data.get('src_mac', 'auto'), + 'dst_mac': data.get('dst_mac'), + 'src_ip': data.get('src_ip'), + 'dst_ip': data['dst_ip'], + 'protocol': data['protocol'].lower(), + 'src_port': data.get('src_port'), + 'dst_port': data.get('dst_port'), + 'frame_size': int(data.get('frame_size', 512)), + 'rate_pps': int(data.get('rate_pps', 1000)), + 'duration': int(data.get('duration', 30)), + 'dscp': int(data.get('dscp', 0)), + 'vlan_id': data.get('vlan_id'), + 'state': 'idle', + 'responder_url': data.get('responder_url'), + 'created_at': _now_iso(), + } + + with _flows_lock: + _flows_meta[flow_id] = flow_config + 
_sender.add_flow(flow_id, dict(flow_config)) + + log.info('Created flow %s (%s -> %s, %s)', + flow_id[:8], flow_config.get('src_ip', 'auto'), + flow_config['dst_ip'], flow_config['protocol']) + return jsonify(_flow_response(flow_id)), 201 + + +@app.route('/flows/<flow_id>', methods=['GET']) +def get_flow(flow_id): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + result = _flow_response(flow_id) + if result is None: + return jsonify({'error': 'Flow not found'}), 404 + return jsonify(result) + + +@app.route('/flows/<flow_id>', methods=['PUT']) +def update_flow(flow_id): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + + with _flows_lock: + meta = _flows_meta.get(flow_id) + if meta is None: + return jsonify({'error': 'Flow not found'}), 404 + if meta.get('state') == 'running': + return jsonify({'error': 'Cannot update a running flow'}), 409 + + data = request.get_json(force=True) + updatable = [ + 'name', 'src_mac', 'dst_mac', 'src_ip', 'dst_ip', 'protocol', + 'src_port', 'dst_port', 'frame_size', 'rate_pps', 'duration', + 'dscp', 'vlan_id', 'responder_url', + ] + + with _flows_lock: + for key in updatable: + if key in data: + _flows_meta[flow_id][key] = data[key] + _sender.update_flow(flow_id, dict(_flows_meta[flow_id])) + + return jsonify(_flow_response(flow_id)) + + +@app.route('/flows/<flow_id>', methods=['DELETE']) +def delete_flow(flow_id): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + + with _flows_lock: + if flow_id not in _flows_meta: + return jsonify({'error': 'Flow not found'}), 404 + + _sender.remove_flow(flow_id) + if _stats_collector: + _stats_collector.remove_flow(flow_id) + + with _flows_lock: + _flows_meta.pop(flow_id, None) + + log.info('Deleted flow %s', flow_id[:8]) + return jsonify({'deleted': flow_id}) + + +@app.route('/flows/<flow_id>/start', methods=['POST']) +def start_flow(flow_id): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + + with _flows_lock: +
if flow_id not in _flows_meta: + return jsonify({'error': 'Flow not found'}), 404 + _flows_meta[flow_id]['state'] = 'running' + + try: + _sender.start(flow_id) + except KeyError: + return jsonify({'error': 'Flow not found in sender'}), 404 + + log.info('Started flow %s', flow_id[:8]) + return jsonify(_flow_response(flow_id)) + + +@app.route('/flows/<flow_id>/stop', methods=['POST']) +def stop_flow(flow_id): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + + with _flows_lock: + if flow_id not in _flows_meta: + return jsonify({'error': 'Flow not found'}), 404 + _flows_meta[flow_id]['state'] = 'stopped' + + _sender.stop(flow_id) + + log.info('Stopped flow %s', flow_id[:8]) + return jsonify(_flow_response(flow_id)) + + +@app.route('/flows/<flow_id>/stats', methods=['GET']) +def flow_stats(flow_id): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + + with _flows_lock: + if flow_id not in _flows_meta: + return jsonify({'error': 'Flow not found'}), 404 + + stats = _sender.get_stats(flow_id) + latest = _stats_collector.get_latest(flow_id) if _stats_collector else {} + return jsonify({'flow_id': flow_id, 'counters': stats, 'rates': latest}) + + +# --------------------------------------------------------------------------- +# Sender-mode: Test endpoints +# --------------------------------------------------------------------------- + +@app.route('/tests', methods=['GET']) +def list_tests(): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + with _tests_lock: + tests = [t.get_info() for t in _tests.values()] + return jsonify({'count': len(tests), 'tests': tests}) + + +@app.route('/tests', methods=['POST']) +def create_test(): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + + from engine.rfc2544 import create_test as _create_test + + data = request.get_json(force=True) + flow_id = data.get('flow_id') + test_type = data.get('type') + + if not flow_id or not test_type: + return
jsonify({'error': 'flow_id and type are required'}), 400 + + with _flows_lock: + flow_meta = _flows_meta.get(flow_id) + if flow_meta is None: + return jsonify({'error': 'Flow not found'}), 404 + + test_id = str(uuid.uuid4()) + kwargs = { + 'frame_sizes': data.get('frame_sizes', [64, 512, 1518]), + 'trial_duration': float(data.get('trial_duration', 60)), + 'max_rate_pps': int(data.get('max_rate_pps', flow_meta.get('rate_pps', 10000))), + 'acceptable_loss_pct': float(data.get('acceptable_loss_pct', 0.0)), + } + + try: + test = _create_test(test_id, test_type, dict(flow_meta), **kwargs) + except ValueError as e: + return jsonify({'error': str(e)}), 400 + + with _tests_lock: + _tests[test_id] = test + + log.info('Created test %s (type=%s, flow=%s)', test_id[:8], test_type, flow_id[:8]) + return jsonify(test.get_info()), 201 + + +@app.route('/tests/<test_id>', methods=['GET']) +def get_test(test_id): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + with _tests_lock: + test = _tests.get(test_id) + if test is None: + return jsonify({'error': 'Test not found'}), 404 + return jsonify(test.get_info()) + + +@app.route('/tests/<test_id>/start', methods=['POST']) +def start_test(test_id): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + with _tests_lock: + test = _tests.get(test_id) + if test is None: + return jsonify({'error': 'Test not found'}), 404 + test.start() + log.info('Started test %s', test_id[:8]) + return jsonify(test.get_info()) + + +@app.route('/tests/<test_id>/stop', methods=['POST']) +def stop_test(test_id): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + with _tests_lock: + test = _tests.get(test_id) + if test is None: + return jsonify({'error': 'Test not found'}), 404 + test.stop() + log.info('Stopped test %s', test_id[:8]) + return jsonify(test.get_info()) + + +@app.route('/tests/<test_id>/results', methods=['GET']) +def test_results(test_id): + if MODE != 'sender': + return jsonify({'error': 'Not in
sender mode'}), 400 + with _tests_lock: + test = _tests.get(test_id) + if test is None: + return jsonify({'error': 'Test not found'}), 404 + return jsonify({ + 'test_id': test_id, + 'type': test.__class__.__name__, + 'state': test.state, + 'results': test.results, + 'started_at': test.started_at, + 'completed_at': test.completed_at, + }) + + +# --------------------------------------------------------------------------- +# Sender-mode: Presets +# --------------------------------------------------------------------------- + +@app.route('/presets', methods=['GET']) +def list_presets(): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + + from presets import PRESETS + out = {} + for name, preset in PRESETS.items(): + out[name] = { + 'description': preset.get('description', ''), + 'has_test': 'test' in preset, + } + return jsonify({'presets': out}) + + +@app.route('/presets/<name>', methods=['POST']) +def load_preset(name): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + + from presets import PRESETS + from engine.rfc2544 import create_test as _create_test + + if name not in PRESETS: + return jsonify({'error': f'Unknown preset: {name}.
Available: {list(PRESETS)}'}), 404 + + preset = PRESETS[name] + data = request.get_json(silent=True) or {} + + # The preset flow needs a dst_ip; caller can override + flow_data = dict(preset['flow']) + flow_data['dst_ip'] = data.get('dst_ip', flow_data.get('dst_ip', '10.0.0.1')) + if 'src_ip' in data: + flow_data['src_ip'] = data['src_ip'] + if 'responder_url' in data: + flow_data['responder_url'] = data['responder_url'] + + # Validate protocol present + if 'protocol' not in flow_data: + flow_data['protocol'] = 'udp' + + flow_id = str(uuid.uuid4()) + flow_config = { + 'id': flow_id, + 'name': data.get('name', f'{name}-{flow_id[:8]}'), + 'src_mac': flow_data.get('src_mac', 'auto'), + 'dst_mac': flow_data.get('dst_mac'), + 'src_ip': flow_data.get('src_ip'), + 'dst_ip': flow_data['dst_ip'], + 'protocol': flow_data['protocol'], + 'src_port': flow_data.get('src_port'), + 'dst_port': flow_data.get('dst_port'), + 'frame_size': int(flow_data.get('frame_size', 512)), + 'rate_pps': int(flow_data.get('rate_pps', 1000)), + 'duration': int(flow_data.get('duration', 30)), + 'dscp': int(flow_data.get('dscp', 0)), + 'vlan_id': flow_data.get('vlan_id'), + 'state': 'idle', + 'responder_url': flow_data.get('responder_url'), + 'created_at': _now_iso(), + } + + with _flows_lock: + _flows_meta[flow_id] = flow_config + _sender.add_flow(flow_id, dict(flow_config)) + + result = {'flow': _flow_response(flow_id)} + + # Optionally create a test + if 'test' in preset: + test_cfg = preset['test'] + test_id = str(uuid.uuid4()) + kwargs = { + 'frame_sizes': test_cfg.get('frame_sizes', [64, 512, 1518]), + 'trial_duration': float(test_cfg.get('trial_duration', 60)), + 'max_rate_pps': int(test_cfg.get('max_rate_pps', flow_config.get('rate_pps', 10000))), + 'acceptable_loss_pct': float(test_cfg.get('acceptable_loss_pct', 0.0)), + } + test = _create_test(test_id, test_cfg['type'], dict(flow_config), **kwargs) + with _tests_lock: + _tests[test_id] = test + result['test'] = test.get_info() + + 
log.info('Loaded preset %s -> flow %s', name, flow_id[:8]) + return jsonify(result), 201 + + +# --------------------------------------------------------------------------- +# Sender-mode: Stats history +# --------------------------------------------------------------------------- + +@app.route('/stats/history', methods=['GET']) +def stats_history(): + if MODE != 'sender': + return jsonify({'error': 'Not in sender mode'}), 400 + + # A non-integer ?count= falls back to the default instead of raising ValueError + count = request.args.get('count', 60, type=int) + with _flows_lock: + flow_ids = list(_flows_meta.keys()) + + history = {} + for fid in flow_ids: + history[fid] = _stats_collector.get_history(fid, count) + return jsonify({'history': history}) + + +# --------------------------------------------------------------------------- +# Responder-mode endpoints +# --------------------------------------------------------------------------- + +@app.route('/responder/stats', methods=['GET']) +def responder_stats(): + if MODE != 'responder' or _responder is None: + return jsonify({'error': 'Not in responder mode'}), 400 + return jsonify(_responder.get_stats()) + + +@app.route('/responder/reset', methods=['POST']) +def responder_reset(): + if MODE != 'responder' or _responder is None: + return jsonify({'error': 'Not in responder mode'}), 400 + _responder.reset_stats() + return jsonify({'status': 'reset'}) + + +# --------------------------------------------------------------------------- +# Stats collection loop (sender mode) +# --------------------------------------------------------------------------- + +def _stats_loop(stop_event: threading.Event): + """Runs every 1s, recording per-flow stats into the StatsCollector.""" + while not stop_event.is_set(): + try: + all_stats = _sender.get_all_stats() + for flow_id, s in all_stats.items(): + latency = None + samples = s.get('latency_samples', []) + if samples: + latency = { + 'min_ms': round(min(samples), 3), + 'max_ms': round(max(samples), 3), + 'avg_ms': round(sum(samples) / len(samples), 3), + } + 
_stats_collector.record( + flow_id, + tx_packets=s.get('tx_packets', 0), + tx_bytes=s.get('tx_bytes', 0), + rx_packets=s.get('rx_packets', 0), + rx_bytes=s.get('rx_bytes', 0), + latency=latency, + ) + except Exception as e: + log.debug('Stats loop error: %s', e) + stop_event.wait(1.0) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + global MODE, _sender, _stats_collector, _responder + + MODE = os.environ.get('TRAFFIC_GEN_MODE', 'sender').lower() + api_port = int(os.environ.get('TRAFFIC_GEN_PORT', 5051)) + listen_iface = os.environ.get('TRAFFIC_GEN_INTERFACE', None) + responder_sub_mode = os.environ.get('TRAFFIC_GEN_RESPONDER_MODE', 'log') # echo or log + + log.info('Traffic Generator starting in %s mode', MODE) + + if MODE == 'sender': + from engine.sender import FlowSender + from engine.stats import StatsCollector + + _sender = FlowSender() + _stats_collector = StatsCollector() + + # Start stats collection loop + stats_stop = threading.Event() + stats_thread = threading.Thread( + target=_stats_loop, args=(stats_stop,), daemon=True, name='stats-loop' + ) + stats_thread.start() + + # Start Flask in background thread + flask_thread = threading.Thread( + target=lambda: app.run(host='0.0.0.0', port=api_port, threaded=True), + daemon=True, + ) + flask_thread.start() + log.info('Flask API listening on port %d', api_port) + + # Main thread: keep alive + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + log.info('Shutting down sender...') + stats_stop.set() + + elif MODE == 'responder': + from engine.responder import Responder + + _responder = Responder(mode=responder_sub_mode) + _responder.start(interface=listen_iface) + + # Start Flask in background thread + flask_thread = threading.Thread( + target=lambda: app.run(host='0.0.0.0', port=api_port, threaded=True), + daemon=True, + ) + flask_thread.start() + log.info('Flask 
API listening on port %d', api_port) + + # Main thread: keep alive + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + log.info('Shutting down responder...') + _responder.stop() + + else: + log.error('Unknown mode: %s. Use "sender" or "responder".', MODE) + sys.exit(1) + + +if __name__ == '__main__': + main()
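
The latency summary that `_stats_loop` builds inline can be read as a small pure function. The sketch below is illustrative only: `summarize_latency` is a hypothetical helper name that does not appear in the diff, and it assumes `latency_samples` is a sequence of millisecond floats, as the `min_ms`/`max_ms`/`avg_ms` keys suggest.

```python
from typing import Optional, Sequence


def summarize_latency(samples: Sequence[float]) -> Optional[dict]:
    """Mirror the inline summary in _stats_loop (hypothetical helper).

    Returns None when there are no samples, matching the `latency = None`
    default in the loop; otherwise min/max/avg rounded to 3 decimal places.
    """
    if not samples:
        return None
    return {
        'min_ms': round(min(samples), 3),
        'max_ms': round(max(samples), 3),
        'avg_ms': round(sum(samples) / len(samples), 3),
    }


if __name__ == '__main__':
    print(summarize_latency([1.0, 2.0, 4.5]))
    print(summarize_latency([]))
```

Pulling the computation out this way would let the empty-sample and rounding behavior be unit-tested without a running `FlowSender`.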